# Copyright 2012 Facebook
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Utilities for working with threads and ``Futures``.
``Futures`` are a pattern for concurrent programming introduced in
Python 3.2 in the `concurrent.futures` package (this package has also
been backported to older versions of Python and can be installed with
``pip install futures``). Tornado will use `concurrent.futures.Future` if
it is available; otherwise it will use a compatible class defined in this
from __future__ import absolute_import, division, print_function, with_statement
from tornado.stack_context import ExceptionStackContext, wrap
from tornado.util import raise_exc_info, ArgReplacer
from concurrent import futures
futures = None
"""Placeholder for an asynchronous result.
Similar to `concurrent.futures.Future`, but not thread-safe (and
therefore faster for use with single-threaded event loops.
In addition to ``exception`` and ``set_exception``, methods ``exc_info``
and ``set_exc_info`` are supported to capture tracebacks in Python 2.
The traceback is automatically available in Python 3, but in the
Python 2 futures backport this information is discarded.
This functionality was previously available in a separate class
``TracebackFuture``, which is now a deprecated alias for this class.
self._done = False
self._result = None
self._exception = None
self._exc_info = None
self._callbacks = 
return not self._done
TracebackFuture = Future
if futures is None:
FUTURES = Future
FUTURES = (futures.Future, Future)
return isinstance(x, FUTURES)
def submit(self, fn, *args, **kwargs):
future = TracebackFuture()
def shutdown(self, wait=True):
dummy_executor = DummyExecutor()
[docs] def done(self):
[docs] def result(self, timeout=None):
if self._result is not None:
if self._exc_info is not None:
elif self._exception is not None:
[docs] def exception(self, timeout=None):
if self._exception is not None:
[docs] def add_done_callback(self, fn):
def set_result(self, result):
self._result = result
def set_exception(self, exception):
self._exception = exception
[docs] def set_exc_info(self, exc_info):
"""Traceback-aware replacement for
self._exc_info = exc_info
if not self._done:
raise Exception("DummyFuture does not support blocking for results")
self._done = True
for cb in self._callbacks:
# TODO: error handling
self._callbacks = None
"""Decorator to run a synchronous method asynchronously on an executor.
The decorated method may be called with a ``callback`` keyword
argument and returns a future.
This decorator should be used only on methods of objects with attributes
``executor`` and ``io_loop``.
def wrapper(self, *args, **kwargs):
callback = kwargs.pop("callback", None)
future = self.executor.submit(fn, self, *args, **kwargs)
lambda future: callback(future.result()))
_NO_RESULT = object()
"""Decorator to make a function that returns via callback return a
The wrapped function should take a ``callback`` keyword argument
and invoke it with one argument when it has finished. To signal failure,
the function can simply raise an exception (which will be
captured by the `.StackContext` and passed along to the ``Future``).
From the caller's perspective, the callback argument is optional.
If one is given, it will be invoked when the function is complete
with `Future.result()` as an argument. If the function fails, the
callback will not be run and an exception will be raised into the
If no callback is given, the caller should use the ``Future`` to
wait for the function to complete (perhaps by yielding it in a
`.gen.engine` function, or passing it to `.IOLoop.add_future`).
def future_func(arg1, arg2, callback):
# Do stuff (possibly asynchronous)
yield future_func(arg1, arg2)
Note that ``@return_future`` and ``@gen.engine`` can be applied to the
same function, provided ``@return_future`` appears first. However,
consider using ``@gen.coroutine`` instead of this combination.
replacer = ArgReplacer(f, 'callback')
def wrapper(*args, **kwargs):
future = TracebackFuture()
callback, args, kwargs = replacer.replace(
lambda value=_NO_RESULT: future.set_result(value),
def handle_error(typ, value, tb):
future.set_exc_info((typ, value, tb))
exc_info = None
result = f(*args, **kwargs)
if result is not None:
"@return_future should not be used with functions "
"that return values")
exc_info = sys.exc_info()
if exc_info is not None:
# If the initial synchronous part of f() raised an exception,
# go ahead and raise it to the caller directly without waiting
# for them to inspect the Future.
# If the caller passed in a callback, schedule it to be called
# when the future resolves. It is important that this happens
# just before we return the future, or else we risk confusing
# stack contexts with multiple exceptions (one here with the
# immediate exception, and again when the future resolves and
# the callback triggers its exception by calling future.result()).
if callback is not None:
result = future.result()
if result is _NO_RESULT:
[docs]def chain_future(a, b):
"""Chain two futures together so that when one completes, so does the other.
The result (success or failure) of ``a`` will be copied to ``b``.
assert future is a
if (isinstance(a, TracebackFuture) and isinstance(b, TracebackFuture)
and a.exc_info() is not None):
elif a.exception() is not None: