tornado.queues – Queues for coroutines
New in version 4.2.
Asynchronous queues for coroutines. These classes are very similar to those provided in the standard library’s asyncio package.
Warning
Unlike the standard library’s queue module, the classes defined here are not thread-safe. To use these queues from another thread, use IOLoop.add_callback to transfer control to the IOLoop thread before calling any queue methods.
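The hand-off described in the warning can be sketched roughly as follows. This is only an illustration: the `worker` function and the three integer items are made up for the example, and it assumes Tornado 5 or later so that the coroutine can be driven by `asyncio.run`.

```python
import asyncio
import threading

from tornado.ioloop import IOLoop
from tornado.queues import Queue


async def main():
    q = Queue()
    loop = IOLoop.current()  # the IOLoop that is running this coroutine

    def worker():
        # Hypothetical producer running in a separate thread.
        # It never touches the queue directly; add_callback schedules
        # q.put_nowait(i) to run later on the IOLoop thread.
        for i in range(3):
            loop.add_callback(q.put_nowait, i)

    thread = threading.Thread(target=worker)
    thread.start()

    # Back on the IOLoop thread, consume the items as they arrive.
    received = [await q.get() for _ in range(3)]
    thread.join()
    return received


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The queue in this sketch is unbounded, so `put_nowait` cannot raise `QueueFull`; with a bounded queue the scheduled callback would need to handle that case.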
Classes
Queue
class tornado.queues.Queue(maxsize: int = 0)
Coordinate producer and consumer coroutines.
If maxsize is 0 (the default) the queue size is unbounded.
    from tornado import gen
    from tornado.ioloop import IOLoop
    from tornado.queues import Queue

    q = Queue(maxsize=2)

    async def consumer():
        async for item in q:
            try:
                print('Doing work on %s' % item)
                await gen.sleep(0.01)
            finally:
                q.task_done()

    async def producer():
        for item in range(5):
            await q.put(item)
            print('Put %s' % item)

    async def main():
        # Start consumer without waiting (since it never finishes).
        IOLoop.current().spawn_callback(consumer)
        await producer()     # Wait for producer to put all tasks.
        await q.join()       # Wait for consumer to finish all tasks.
        print('Done')

    IOLoop.current().run_sync(main)

Output:

    Put 0
    Put 1
    Doing work on 0
    Put 2
    Doing work on 1
    Put 3
    Doing work on 2
    Put 4
    Doing work on 3
    Doing work on 4
    Done
In versions of Python without native coroutines (before 3.5), consumer() could be written as:

    @gen.coroutine
    def consumer():
        while True:
            item = yield q.get()
            try:
                print('Doing work on %s' % item)
                yield gen.sleep(0.01)
            finally:
                q.task_done()
Changed in version 4.3: Added async for support in Python 3.5.
maxsize
Number of items allowed in the queue.
put(item: _T, timeout: Union[float, datetime.timedelta, None] = None) → Future[None]
Put an item into the queue, perhaps waiting until there is room.
Returns a Future, which raises tornado.util.TimeoutError after a timeout.
timeout may be a number denoting a time (on the same scale as tornado.ioloop.IOLoop.time, normally time.time), or a datetime.timedelta object for a deadline relative to the current time.
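As a rough illustration of the timeout behaviour, the sketch below fills a one-slot queue and then tries to put a second item with a relative timeout. The 50 ms value and the returned strings are arbitrary choices for the example, and it assumes Tornado 5 or later so that `asyncio.run` can drive the coroutine.

```python
import asyncio
from datetime import timedelta

from tornado import util
from tornado.queues import Queue


async def main():
    q = Queue(maxsize=1)
    await q.put("first")  # fills the only slot

    try:
        # Relative timeout: give up if no slot frees up within 50 ms.
        await q.put("second", timeout=timedelta(milliseconds=50))
    except util.TimeoutError:
        # No consumer removed an item in time.
        return "timed out"
    return "stored"


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Nothing consumes from the queue in this sketch, so the second `put` is expected to time out.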
put_nowait(item: _T) → None
Put an item into the queue without blocking.
If no free slot is immediately available, raise QueueFull.
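One way to use put_nowait is to treat QueueFull as a signal to drop or defer work. The following is a minimal sketch; `try_put` is a hypothetical helper name, not part of Tornado.

```python
from tornado.queues import Queue, QueueFull


def try_put(q, item):
    """Hypothetical helper: enqueue item if a slot is free, else return False."""
    try:
        q.put_nowait(item)
    except QueueFull:
        return False
    return True


q = Queue(maxsize=2)
# Two slots are available, so the third attempt is rejected.
results = [try_put(q, n) for n in range(3)]
print(results)
```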
get(timeout: Union[float, datetime.timedelta, None] = None) → Awaitable[_T]
Remove and return an item from the queue.
Returns an awaitable which resolves once an item is available, or raises tornado.util.TimeoutError after a timeout.
timeout may be a number denoting a time (on the same scale as tornado.ioloop.IOLoop.time, normally time.time), or a datetime.timedelta object for a deadline relative to the current time.
Note
The timeout argument of this method differs from that of the standard library’s queue.Queue.get. That method interprets numeric values as relative timeouts; this one interprets them as absolute deadlines and requires timedelta objects for relative timeouts (consistent with other timeouts in Tornado).
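The two timeout styles described in the note can be compared side by side. The sketch below waits on an empty queue twice, once with a timedelta and once with an absolute deadline computed from IOLoop.time. The 20 ms durations and the result strings are arbitrary, and it assumes Tornado 5 or later so that `asyncio.run` can drive the coroutine.

```python
import asyncio
from datetime import timedelta

from tornado import util
from tornado.ioloop import IOLoop
from tornado.queues import Queue


async def main():
    q = Queue()  # stays empty, so both calls below are expected to time out
    outcomes = []

    # Relative timeout: a timedelta measured from now.
    try:
        await q.get(timeout=timedelta(milliseconds=20))
    except util.TimeoutError:
        outcomes.append("relative timed out")

    # Absolute deadline: a number on the IOLoop.time() scale.
    deadline = IOLoop.current().time() + 0.02
    try:
        await q.get(timeout=deadline)
    except util.TimeoutError:
        outcomes.append("absolute timed out")

    return outcomes


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Under this interpretation, a bare number such as `timeout=5` would presumably be read as a moment far in the past rather than "five seconds from now", so the call would time out almost at once; `timedelta(seconds=5)` expresses the relative wait.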
get_nowait() → _T
Remove and return an item from the queue without blocking.
Return an item if one is immediately available, else raise QueueEmpty.
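A common use of get_nowait is to collect whatever is already queued without waiting for more. The following is a small sketch; `drain` is a hypothetical helper name, not part of Tornado.

```python
from tornado.queues import Queue, QueueEmpty


def drain(q):
    """Hypothetical helper: return all items currently in q without waiting."""
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except QueueEmpty:
            # Nothing left that is immediately available.
            return items


q = Queue()
for n in (1, 2, 3):
    q.put_nowait(n)

drained = drain(q)
print(drained)
```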
task_done() → None
Indicate that a formerly enqueued task is complete.
Used by queue consumers. For each get used to fetch a task, a subsequent call to task_done tells the queue that the processing on the task is complete.
If a join is blocking, it resumes when all items have been processed; that is, when every put is matched by a task_done.
Raises ValueError if called more times than put.
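The bookkeeping rule above (one task_done per put) can be seen in a short synchronous sketch. The item value and variable names are made up for the example.

```python
from tornado.queues import Queue

q = Queue()
q.put_nowait("job")      # one put, so one task_done is allowed
item = q.get_nowait()
q.task_done()            # matches the single put

try:
    q.task_done()        # one call too many
    extra_call_rejected = False
except ValueError:
    extra_call_rejected = True

print(extra_call_rejected)
```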
join(timeout: Union[float, datetime.timedelta, None] = None) → Awaitable[None]
Block until all items in the queue are processed.
Returns an awaitable, which raises tornado.util.TimeoutError after a timeout.
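A brief sketch of join with and without an outstanding task follows. The 20 ms timeout and the result strings are arbitrary choices, and the example assumes Tornado 5 or later so that `asyncio.run` can drive the coroutine.

```python
import asyncio
from datetime import timedelta

from tornado import util
from tornado.queues import Queue


async def main():
    q = Queue()
    q.put_nowait("job")  # one unfinished task

    # Nobody has called task_done yet, so this join should time out.
    try:
        await q.join(timeout=timedelta(milliseconds=20))
        first_join = "completed"
    except util.TimeoutError:
        first_join = "timed out"

    # Process the item and mark it done; now join has nothing to wait for.
    q.get_nowait()
    q.task_done()
    await q.join()
    second_join = "completed"

    return first_join, second_join


if __name__ == "__main__":
    print(asyncio.run(main()))
```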
PriorityQueue
class tornado.queues.PriorityQueue(maxsize: int = 0)
A Queue that retrieves entries in priority order, lowest first.
Entries are typically tuples like (priority number, data).

    from tornado.queues import PriorityQueue

    q = PriorityQueue()
    q.put((1, 'medium-priority item'))
    q.put((0, 'high-priority item'))
    q.put((10, 'low-priority item'))

    print(q.get_nowait())
    print(q.get_nowait())
    print(q.get_nowait())

Output:

    (0, 'high-priority item')
    (1, 'medium-priority item')
    (10, 'low-priority item')
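When two entries share a priority number, tuple comparison falls through to the data, which fails if the data is not orderable (dicts, for example). One common workaround, sketched below, is to add a running sequence number as a tie-breaker. This is a general tuple-ordering idiom, not something the Tornado documentation prescribes; `put_job` and the payload dicts are hypothetical.

```python
import itertools

from tornado.queues import PriorityQueue

q = PriorityQueue()
counter = itertools.count()  # increasing tie-breaker


def put_job(priority, payload):
    """Hypothetical helper: enqueue (priority, sequence, payload)."""
    # The sequence number is unique, so payloads are never compared and
    # entries with equal priority come out in insertion order.
    q.put_nowait((priority, next(counter), payload))


put_job(1, {"name": "b"})
put_job(0, {"name": "a"})
put_job(1, {"name": "c"})

order = [q.get_nowait()[2]["name"] for _ in range(3)]
print(order)
```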
Exceptions
QueueEmpty
exception tornado.queues.QueueEmpty
Raised by Queue.get_nowait when the queue has no items.
QueueFull
exception tornado.queues.QueueFull
Raised by Queue.put_nowait when a queue is at its maximum size.