Source code for tornado.platform.asyncio

"""Bridges between the `asyncio` module and Tornado IOLoop.

.. versionadded:: 3.2

This module integrates Tornado with the ``asyncio`` module introduced
in Python 3.4. This makes it possible to combine the two libraries on
the same event loop.

.. deprecated:: 5.0

   While the code in this module is still used, it is now enabled
   automatically when `asyncio` is available, so applications should
   no longer need to refer to this module directly.

.. note::

   Tornado requires the `~asyncio.AbstractEventLoop.add_reader` family of
   methods, so it is not compatible with the `~asyncio.ProactorEventLoop` on
   Windows. Use the `~asyncio.SelectorEventLoop` instead.
"""

from __future__ import absolute_import, division, print_function
import functools

from tornado.gen import convert_yielded
from tornado.ioloop import IOLoop
from tornado import stack_context

import asyncio


class BaseAsyncIOLoop(IOLoop):
    def initialize(self, asyncio_loop, **kwargs):
        self.asyncio_loop = asyncio_loop
        # Maps fd to (fileobj, handler function) pair (as in IOLoop.add_handler)
        self.handlers = {}
        # Set of fds listening for reads/writes
        self.readers = set()
        self.writers = set()
        self.closing = False
        # If an asyncio loop was closed through an asyncio interface
        # instead of IOLoop.close(), we'd never hear about it and may
        # have left a dangling reference in our map. In case an
        # application (or, more likely, a test suite) creates and
        # destroys a lot of event loops in this way, check here to
        # ensure that we don't have a lot of dead loops building up in
        # the map.
        #
        # TODO(bdarnell): consider making self.asyncio_loop a weakref
        # for AsyncIOMainLoop and make _ioloop_for_asyncio a
        # WeakKeyDictionary.
        for loop in list(IOLoop._ioloop_for_asyncio):
            if loop.is_closed():
                del IOLoop._ioloop_for_asyncio[loop]
        IOLoop._ioloop_for_asyncio[asyncio_loop] = self
        super(BaseAsyncIOLoop, self).initialize(**kwargs)

    def close(self, all_fds=False):
        self.closing = True
        for fd in list(self.handlers):
            fileobj, handler_func = self.handlers[fd]
            self.remove_handler(fd)
            if all_fds:
                self.close_fd(fileobj)
        self.asyncio_loop.close()
        del IOLoop._ioloop_for_asyncio[self.asyncio_loop]

    def add_handler(self, fd, handler, events):
        fd, fileobj = self.split_fd(fd)
        if fd in self.handlers:
            raise ValueError("fd %s added twice" % fd)
        self.handlers[fd] = (fileobj, stack_context.wrap(handler))
        if events & IOLoop.READ:
            self.asyncio_loop.add_reader(
                fd, self._handle_events, fd, IOLoop.READ)
            self.readers.add(fd)
        if events & IOLoop.WRITE:
            self.asyncio_loop.add_writer(
                fd, self._handle_events, fd, IOLoop.WRITE)
            self.writers.add(fd)

    def update_handler(self, fd, events):
        fd, fileobj = self.split_fd(fd)
        if events & IOLoop.READ:
            if fd not in self.readers:
                self.asyncio_loop.add_reader(
                    fd, self._handle_events, fd, IOLoop.READ)
                self.readers.add(fd)
        else:
            if fd in self.readers:
                self.asyncio_loop.remove_reader(fd)
                self.readers.remove(fd)
        if events & IOLoop.WRITE:
            if fd not in self.writers:
                self.asyncio_loop.add_writer(
                    fd, self._handle_events, fd, IOLoop.WRITE)
                self.writers.add(fd)
        else:
            if fd in self.writers:
                self.asyncio_loop.remove_writer(fd)
                self.writers.remove(fd)

    def remove_handler(self, fd):
        fd, fileobj = self.split_fd(fd)
        if fd not in self.handlers:
            return
        if fd in self.readers:
            self.asyncio_loop.remove_reader(fd)
            self.readers.remove(fd)
        if fd in self.writers:
            self.asyncio_loop.remove_writer(fd)
            self.writers.remove(fd)
        del self.handlers[fd]

    def _handle_events(self, fd, events):
        fileobj, handler_func = self.handlers[fd]
        handler_func(fileobj, events)

    def start(self):
        try:
            old_loop = asyncio.get_event_loop()
        except (RuntimeError, AssertionError):
            old_loop = None
        try:
            self._setup_logging()
            asyncio.set_event_loop(self.asyncio_loop)
            self.asyncio_loop.run_forever()
        finally:
            asyncio.set_event_loop(old_loop)

    def stop(self):
        self.asyncio_loop.stop()

    def call_at(self, when, callback, *args, **kwargs):
        # asyncio.call_at supports *args but not **kwargs, so bind them here.
        # We do not synchronize self.time and asyncio_loop.time, so
        # convert from absolute to relative.
        return self.asyncio_loop.call_later(
            max(0, when - self.time()), self._run_callback,
            functools.partial(stack_context.wrap(callback), *args, **kwargs))

    def remove_timeout(self, timeout):
        timeout.cancel()

    def add_callback(self, callback, *args, **kwargs):
        try:
            self.asyncio_loop.call_soon_threadsafe(
                self._run_callback,
                functools.partial(stack_context.wrap(callback), *args, **kwargs))
        except RuntimeError:
            # "Event loop is closed". Swallow the exception for
            # consistency with PollIOLoop (and logical consistency
            # with the fact that we can't guarantee that an
            # add_callback that completes without error will
            # eventually execute).
            pass

    add_callback_from_signal = add_callback

    def run_in_executor(self, executor, func, *args):
        return self.asyncio_loop.run_in_executor(executor, func, *args)

    def set_default_executor(self, executor):
        return self.asyncio_loop.set_default_executor(executor)


[docs]class AsyncIOMainLoop(BaseAsyncIOLoop): """``AsyncIOMainLoop`` creates an `.IOLoop` that corresponds to the current ``asyncio`` event loop (i.e. the one returned by ``asyncio.get_event_loop()``). .. deprecated:: 5.0 Now used automatically when appropriate; it is no longer necessary to refer to this class directly. .. versionchanged:: 5.0 Closing an `AsyncIOMainLoop` now closes the underlying asyncio loop. """ def initialize(self, **kwargs): super(AsyncIOMainLoop, self).initialize(asyncio.get_event_loop(), **kwargs) def make_current(self): # AsyncIOMainLoop already refers to the current asyncio loop so # nothing to do here. pass
[docs]class AsyncIOLoop(BaseAsyncIOLoop): """``AsyncIOLoop`` is an `.IOLoop` that runs on an ``asyncio`` event loop. This class follows the usual Tornado semantics for creating new ``IOLoops``; these loops are not necessarily related to the ``asyncio`` default event loop. Each ``AsyncIOLoop`` creates a new ``asyncio.EventLoop``; this object can be accessed with the ``asyncio_loop`` attribute. .. versionchanged:: 5.0 When an ``AsyncIOLoop`` becomes the current `.IOLoop`, it also sets the current `asyncio` event loop. .. deprecated:: 5.0 Now used automatically when appropriate; it is no longer necessary to refer to this class directly. """ def initialize(self, **kwargs): self.is_current = False loop = asyncio.new_event_loop() try: super(AsyncIOLoop, self).initialize(loop, **kwargs) except Exception: # If initialize() does not succeed (taking ownership of the loop), # we have to close it. loop.close() raise def close(self, all_fds=False): if self.is_current: self.clear_current() super(AsyncIOLoop, self).close(all_fds=all_fds) def make_current(self): if not self.is_current: try: self.old_asyncio = asyncio.get_event_loop() except (RuntimeError, AssertionError): self.old_asyncio = None self.is_current = True asyncio.set_event_loop(self.asyncio_loop) def _clear_current_hook(self): if self.is_current: asyncio.set_event_loop(self.old_asyncio) self.is_current = False
[docs]def to_tornado_future(asyncio_future): """Convert an `asyncio.Future` to a `tornado.concurrent.Future`. .. versionadded:: 4.1 .. deprecated:: 5.0 Tornado ``Futures`` have been merged with `asyncio.Future`, so this method is now a no-op. """ return asyncio_future
[docs]def to_asyncio_future(tornado_future): """Convert a Tornado yieldable object to an `asyncio.Future`. .. versionadded:: 4.1 .. versionchanged:: 4.3 Now accepts any yieldable object, not just `tornado.concurrent.Future`. .. deprecated:: 5.0 Tornado ``Futures`` have been merged with `asyncio.Future`, so this method is now equivalent to `tornado.gen.convert_yielded`. """ return convert_yielded(tornado_future)
[docs]class AnyThreadEventLoopPolicy(asyncio.DefaultEventLoopPolicy): """Event loop policy that allows loop creation on any thread. The default `asyncio` event loop policy only automatically creates event loops in the main threads. Other threads must create event loops explicitly or `asyncio.get_event_loop` (and therefore `.IOLoop.current`) will fail. Installing this policy allows event loops to be created automatically on any thread, matching the behavior of Tornado versions prior to 5.0 (or 5.0 on Python 2). Usage:: asyncio.set_event_loop_policy(AnyThreadEventLoopPolicy()) .. versionadded:: 5.0 """ def get_event_loop(self): try: return super().get_event_loop() except (RuntimeError, AssertionError): # This was an AssertionError in python 3.4.2 (which ships with debian jessie) # and changed to a RuntimeError in 3.4.3. # "There is no current event loop in thread %r" loop = self.new_event_loop() self.set_event_loop(loop) return loop