tornado.queues – Queues for coroutines

New in version 4.2.

Asynchronous queues for coroutines. These classes are very similar to those provided in the standard library’s asyncio package.

Warning

Unlike the standard library’s queue module, the classes defined here are not thread-safe. To use these queues from another thread, use IOLoop.add_callback to transfer control to the IOLoop thread before calling any queue methods.
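The hand-off described in the warning can be sketched as follows. This is an illustrative example, not part of the library: the `worker` function and the `received` list are hypothetical names, and the sketch assumes a Python version with native coroutines (`async def`). The worker thread never touches the queue directly; it only schedules `q.put_nowait` on the IOLoop thread through `IOLoop.add_callback`.

```python
import threading

from tornado.ioloop import IOLoop
from tornado.queues import Queue

q = Queue()
received = []  # hypothetical: collects items for inspection


def worker(io_loop):
    # Runs in a separate thread, so it must not call queue methods itself.
    for item in range(3):
        # Schedule the put on the IOLoop thread instead.
        io_loop.add_callback(q.put_nowait, item)


async def main():
    thread = threading.Thread(target=worker, args=(IOLoop.current(),))
    thread.start()
    # All queue methods are called here, on the IOLoop thread.
    for _ in range(3):
        received.append(await q.get())
    thread.join()


IOLoop.current().run_sync(main)
print(received)
```

Because the callbacks are scheduled from a single thread, the items arrive in the order they were submitted.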

Classes

Queue

class tornado.queues.Queue(maxsize=0)

Coordinate producer and consumer coroutines.

If maxsize is 0 (the default) the queue size is unbounded.

from tornado import gen
from tornado.ioloop import IOLoop
from tornado.queues import Queue

q = Queue(maxsize=2)

@gen.coroutine
def consumer():
    while True:
        item = yield q.get()
        try:
            print('Doing work on %s' % item)
            yield gen.sleep(0.01)
        finally:
            q.task_done()

@gen.coroutine
def producer():
    for item in range(5):
        yield q.put(item)
        print('Put %s' % item)

@gen.coroutine
def main():
    # Start consumer without waiting (since it never finishes).
    IOLoop.current().spawn_callback(consumer)
    yield producer()     # Wait for producer to put all tasks.
    yield q.join()       # Wait for consumer to finish all tasks.
    print('Done')

IOLoop.current().run_sync(main)

Put 0
Put 1
Doing work on 0
Put 2
Doing work on 1
Put 3
Doing work on 2
Put 4
Doing work on 3
Doing work on 4
Done

On Python 3.5 and later, Queue implements the async iterator protocol, so consumer() can be rewritten as:

async def consumer():
    async for item in q:
        try:
            print('Doing work on %s' % item)
            await gen.sleep(0.01)
        finally:
            q.task_done()

Changed in version 4.3: Added async for support in Python 3.5.

maxsize

Number of items allowed in the queue.

qsize()

Number of items in the queue.
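As a small illustration of the difference between the two, the sketch below fills part of a bounded queue and reads both values. The item values and the `sizes` name are arbitrary choices for the example.

```python
from tornado.queues import Queue

q = Queue(maxsize=3)
q.put_nowait('a')
q.put_nowait('b')

# maxsize is the configured capacity; qsize() is the current item count.
sizes = (q.maxsize, q.qsize())
print(sizes)  # → (3, 2)
```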

put(item, timeout=None)

Put an item into the queue, waiting until there is room if the queue is full.

Returns a Future that resolves once the item has been added, or raises tornado.util.TimeoutError if the timeout expires first.

timeout may be a number denoting an absolute time (on the same scale as tornado.ioloop.IOLoop.time, normally time.time), or a datetime.timedelta object for a deadline relative to the current time.
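The two timeout forms are easy to confuse, so here is a minimal sketch of both against a full queue. The `events` list is a hypothetical helper for the example, and native coroutines (Python 3.5+) are assumed. A bare number is treated as an absolute deadline on the IOLoop clock, so it is computed from `IOLoop.current().time()`.

```python
import datetime

from tornado import util
from tornado.ioloop import IOLoop
from tornado.queues import Queue

q = Queue(maxsize=1)
events = []  # hypothetical: records which puts timed out


async def main():
    await q.put('first')  # There is room, so this completes immediately.

    try:
        # Relative deadline: give up 50 ms from now.
        await q.put('second', timeout=datetime.timedelta(milliseconds=50))
    except util.TimeoutError:
        events.append('relative timeout')

    try:
        # Absolute deadline: a point in time on the IOLoop clock.
        deadline = IOLoop.current().time() + 0.05
        await q.put('third', timeout=deadline)
    except util.TimeoutError:
        events.append('absolute timeout')


IOLoop.current().run_sync(main)
print(events)
```

Neither timed-out item is added, so the queue still holds only the first item afterwards.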

put_nowait(item)

Put an item into the queue without blocking.

If no free slot is immediately available, raise QueueFull.
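A minimal sketch of the failure case, using a queue with room for one item. The `rejected` flag is a hypothetical name used only to record the outcome.

```python
from tornado.queues import Queue, QueueFull

q = Queue(maxsize=1)
q.put_nowait('first')  # Fits: the queue was empty.

try:
    q.put_nowait('second')  # No free slot, so this raises immediately.
except QueueFull:
    rejected = True
else:
    rejected = False

print(rejected, q.qsize())  # → True 1
```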

get(timeout=None)

Remove and return an item from the queue.

Returns a Future which resolves once an item is available, or raises tornado.util.TimeoutError after a timeout.

timeout may be a number denoting an absolute time (on the same scale as tornado.ioloop.IOLoop.time, normally time.time), or a datetime.timedelta object for a deadline relative to the current time.
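One possible use of the timeout is a consumer that stops once the queue has been idle for a while. The sketch below assumes native coroutines (Python 3.5+); `drain_until_idle` and `seen` are hypothetical names, and the 50 ms idle period is an arbitrary choice.

```python
import datetime

from tornado import util
from tornado.ioloop import IOLoop
from tornado.queues import Queue

q = Queue()
seen = []  # hypothetical: items processed by the consumer


async def drain_until_idle():
    while True:
        try:
            # Wait at most 50 ms for the next item.
            item = await q.get(timeout=datetime.timedelta(milliseconds=50))
        except util.TimeoutError:
            break  # Nothing arrived in time: treat the queue as idle.
        seen.append(item)
        q.task_done()


async def main():
    for item in ('a', 'b'):
        await q.put(item)
    await drain_until_idle()


IOLoop.current().run_sync(main)
print(seen)  # → ['a', 'b']
```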

get_nowait()

Remove and return an item from the queue without blocking.

Return an item if one is immediately available, else raise QueueEmpty.
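As an illustration, QueueEmpty can serve as the stop condition when draining whatever is currently buffered. The `drained` list is a hypothetical name for this sketch.

```python
from tornado.queues import Queue, QueueEmpty

q = Queue()
for item in (1, 2, 3):
    q.put_nowait(item)

drained = []
while True:
    try:
        drained.append(q.get_nowait())
    except QueueEmpty:
        break  # Nothing is immediately available.

print(drained)  # → [1, 2, 3]
```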

task_done()

Indicate that a formerly enqueued task is complete.

Used by queue consumers. For each get used to fetch a task, a subsequent call to task_done tells the queue that the processing on the task is complete.

If a join is blocking, it resumes when all items have been processed; that is, when every put is matched by a task_done.

Raises ValueError if called more times than items were put into the queue.
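The accounting can be seen in a short sketch: one put allows exactly one task_done. The `extra_call_rejected` flag is a hypothetical name used to record the outcome.

```python
from tornado.queues import Queue

q = Queue()
q.put_nowait('job')
q.get_nowait()
q.task_done()  # Matches the single put above.

extra_call_rejected = False
try:
    q.task_done()  # One call too many: there is no unfinished task left.
except ValueError:
    extra_call_rejected = True

print(extra_call_rejected)  # → True
```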

join(timeout=None)

Wait until all items in the queue have been processed.

Returns a Future that resolves once every put has been matched by a task_done, or raises tornado.util.TimeoutError if the timeout expires first.
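The following sketch shows both outcomes of join: a timeout while an item is still unprocessed, then a successful join once task_done has been called. It assumes native coroutines (Python 3.5+); the `outcome` list is a hypothetical name.

```python
import datetime

from tornado import util
from tornado.ioloop import IOLoop
from tornado.queues import Queue

q = Queue()
outcome = []  # hypothetical: records what each join did


async def main():
    await q.put('unprocessed')

    try:
        # Nobody has called task_done yet, so this join cannot finish.
        await q.join(timeout=datetime.timedelta(milliseconds=50))
    except util.TimeoutError:
        outcome.append('timed out')

    q.get_nowait()
    q.task_done()  # Now every put is matched by a task_done.
    await q.join()
    outcome.append('joined')


IOLoop.current().run_sync(main)
print(outcome)  # → ['timed out', 'joined']
```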

PriorityQueue

class tornado.queues.PriorityQueue(maxsize=0)

A Queue that retrieves entries in priority order, lowest first.

Entries are typically tuples like (priority number, data).

from tornado.queues import PriorityQueue

q = PriorityQueue()
q.put((1, 'medium-priority item'))
q.put((0, 'high-priority item'))
q.put((10, 'low-priority item'))

print(q.get_nowait())
print(q.get_nowait())
print(q.get_nowait())

(0, 'high-priority item')
(1, 'medium-priority item')
(10, 'low-priority item')
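Because entries are compared as whole tuples, two entries with equal priority fall back to comparing their data, which fails for types that do not support ordering (dicts, for example). One common workaround, sketched here as an assumption rather than something the library prescribes, is to add an increasing counter as a tie-breaker. The `put_job` helper and `counter` are hypothetical names.

```python
import itertools

from tornado.queues import PriorityQueue

q = PriorityQueue()
counter = itertools.count()  # hypothetical tie-breaker sequence


def put_job(priority, job):
    # (priority, sequence, job): the job itself is never compared,
    # because the sequence number is unique.
    q.put_nowait((priority, next(counter), job))


put_job(1, {'name': 'b'})
put_job(1, {'name': 'a'})
put_job(0, {'name': 'urgent'})

order = [q.get_nowait()[2]['name'] for _ in range(3)]
print(order)  # → ['urgent', 'b', 'a']
```

Entries with the same priority come out in insertion order, since the earlier entry has the smaller sequence number.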

LifoQueue

class tornado.queues.LifoQueue(maxsize=0)

A Queue that retrieves the most recently put items first.

from tornado.queues import LifoQueue

q = LifoQueue()
q.put(3)
q.put(2)
q.put(1)

print(q.get_nowait())
print(q.get_nowait())
print(q.get_nowait())

1
2
3

Exceptions

QueueEmpty

exception tornado.queues.QueueEmpty

Raised by Queue.get_nowait when the queue has no items.

QueueFull

exception tornado.queues.QueueFull

Raised by Queue.put_nowait when a queue is at its maximum size.