Source code for tornado.netutil

#!/usr/bin/env python
# Copyright 2011 Facebook
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""Miscellaneous network utility code."""

from __future__ import absolute_import, division, with_statement

import errno
import logging
import os
import socket
import stat

from tornado import process
from tornado.ioloop import IOLoop
from tornado.iostream import IOStream, SSLIOStream
from import set_close_exec

    import ssl  # Python 2.6+
except ImportError:
    ssl = None

class TCPServer(object):
    r"""A non-blocking, single-threaded TCP server.

    To use `TCPServer`, define a subclass which overrides the `handle_stream`
    method.

    `TCPServer` can serve SSL traffic with Python 2.6+ and OpenSSL.
    To make this server serve SSL traffic, send the ssl_options dictionary
    argument with the arguments required for the `ssl.wrap_socket` method,
    including "certfile" and "keyfile"::

       TCPServer(ssl_options={
           "certfile": os.path.join(data_dir, "mydomain.crt"),
           "keyfile": os.path.join(data_dir, "mydomain.key"),
       })

    `TCPServer` initialization follows one of three patterns:

    1. `listen`: simple single-process::

            server = TCPServer()
            server.listen(8888)
            IOLoop.instance().start()

    2. `bind`/`start`: simple multi-process::

            server = TCPServer()
            server.bind(8888)
            server.start(0)  # Forks multiple sub-processes
            IOLoop.instance().start()

       When using this interface, an `IOLoop` must *not* be passed
       to the `TCPServer` constructor.  `start` will always start
       the server on the default singleton `IOLoop`.

    3. `add_sockets`: advanced multi-process::

            sockets = bind_sockets(8888)
            tornado.process.fork_processes(0)
            server = TCPServer()
            server.add_sockets(sockets)
            IOLoop.instance().start()

       The `add_sockets` interface is more complicated, but it can be
       used with `tornado.process.fork_processes` to give you more
       flexibility in when the fork happens.  `add_sockets` can
       also be used in single-process servers if you want to create
       your listening sockets in some way other than
       `bind_sockets`.
    """
    def __init__(self, io_loop=None, ssl_options=None):
        """Stores the configuration and validates ``ssl_options``.

        Raises `KeyError` if ``ssl_options`` is given without a
        ``"certfile"`` entry, and `ValueError` if the certfile (or the
        keyfile, when one is given) does not exist on disk.
        """
        self.io_loop = io_loop
        self.ssl_options = ssl_options
        self._sockets = {}  # fd -> socket object
        self._pending_sockets = []
        self._started = False

        # Verify the SSL options. Otherwise we don't get errors until clients
        # connect. This doesn't verify that the keys are legitimate, but
        # the SSL module doesn't do that until there is a connected socket
        # which seems like too much work
        if self.ssl_options is not None:
            # Only certfile is required: it can contain both keys
            if 'certfile' not in self.ssl_options:
                raise KeyError('missing key "certfile" in ssl_options')

            if not os.path.exists(self.ssl_options['certfile']):
                raise ValueError('certfile "%s" does not exist' %
                                 self.ssl_options['certfile'])
            if ('keyfile' in self.ssl_options and
                    not os.path.exists(self.ssl_options['keyfile'])):
                raise ValueError('keyfile "%s" does not exist' %
                                 self.ssl_options['keyfile'])

    def listen(self, port, address=""):
        """Starts accepting connections on the given port.

        This method may be called more than once to listen on multiple ports.
        `listen` takes effect immediately; it is not necessary to call
        `TCPServer.start` afterwards.  It is, however, necessary to start
        the `IOLoop`.
        """
        sockets = bind_sockets(port, address=address)
        self.add_sockets(sockets)

    def add_sockets(self, sockets):
        """Makes this server start accepting connections on the given sockets.

        The ``sockets`` parameter is a list of socket objects such as
        those returned by `bind_sockets`.
        `add_sockets` is typically used in combination with that
        method and `tornado.process.fork_processes` to provide greater
        control over the initialization of a multi-process server.
        """
        # Resolve the loop lazily so that forking (see `start`) can happen
        # before any IOLoop is created.
        if self.io_loop is None:
            self.io_loop = IOLoop.instance()

        for sock in sockets:
            self._sockets[sock.fileno()] = sock
            add_accept_handler(sock, self._handle_connection,
                               io_loop=self.io_loop)

    def add_socket(self, socket):
        """Singular version of `add_sockets`.  Takes a single socket object."""
        self.add_sockets([socket])

    def bind(self, port, address=None, family=socket.AF_UNSPEC, backlog=128):
        """Binds this server to the given port on the given address.

        To start the server, call `start`. If you want to run this server
        in a single process, you can call `listen` as a shortcut to the
        sequence of `bind` and `start` calls.

        Address may be either an IP address or hostname.  If it's a hostname,
        the server will listen on all IP addresses associated with the
        name.  Address may be an empty string or None to listen on all
        available interfaces.  Family may be set to either ``socket.AF_INET``
        or ``socket.AF_INET6`` to restrict to ipv4 or ipv6 addresses, otherwise
        both will be used if available.

        The ``backlog`` argument has the same meaning as for
        `socket.listen`.

        This method may be called multiple times prior to `start` to listen
        on multiple ports or interfaces.
        """
        sockets = bind_sockets(port, address=address, family=family,
                               backlog=backlog)
        if self._started:
            self.add_sockets(sockets)
        else:
            # Defer registration until `start`, which may fork first.
            self._pending_sockets.extend(sockets)

    def start(self, num_processes=1):
        """Starts this server in the IOLoop.

        By default, we run the server in this process and do not fork any
        additional child process.

        If num_processes is ``None`` or <= 0, we detect the number of cores
        available on this machine and fork that number of child
        processes. If num_processes is given and > 1, we fork that
        specific number of sub-processes.

        Since we use processes and not threads, there is no shared memory
        between any server code.

        Note that multiple processes are not compatible with the autoreload
        module (or the ``debug=True`` option to `tornado.web.Application`).
        When using multiple processes, no IOLoops can be created or
        referenced until after the call to ``TCPServer.start(n)``.
        """
        assert not self._started
        self._started = True
        if num_processes != 1:
            process.fork_processes(num_processes)
        sockets = self._pending_sockets
        self._pending_sockets = []
        self.add_sockets(sockets)

    def stop(self):
        """Stops listening for new connections.

        Requests currently in progress may still continue after the
        server is stopped.
        """
        # ``items()`` (rather than the Python 2-only ``iteritems()``) works
        # on every supported interpreter.
        for fd, sock in self._sockets.items():
            self.io_loop.remove_handler(fd)
            sock.close()

    def handle_stream(self, stream, address):
        """Override to handle a new `IOStream` from an incoming connection."""
        raise NotImplementedError()

    def _handle_connection(self, connection, address):
        """Wraps an accepted socket in a stream and calls `handle_stream`.

        When ``ssl_options`` is set the socket is first wrapped with
        `ssl.wrap_socket` (handshake deferred).  A wrap failure with
        ``SSL_ERROR_EOF`` or ``ECONNABORTED`` closes the connection quietly;
        any other wrap error is re-raised.  Exceptions raised while creating
        the stream or inside `handle_stream` are logged, not propagated.
        """
        if self.ssl_options is not None:
            assert ssl, "Python 2.6+ and OpenSSL required for SSL"
            try:
                connection = ssl.wrap_socket(connection,
                                             server_side=True,
                                             do_handshake_on_connect=False,
                                             **self.ssl_options)
            except ssl.SSLError as err:
                if err.args[0] == ssl.SSL_ERROR_EOF:
                    return connection.close()
                else:
                    raise
            except socket.error as err:
                if err.args[0] == errno.ECONNABORTED:
                    return connection.close()
                else:
                    raise
        try:
            if self.ssl_options is not None:
                stream = SSLIOStream(connection, io_loop=self.io_loop)
            else:
                stream = IOStream(connection, io_loop=self.io_loop)
            self.handle_stream(stream, address)
        except Exception:
            logging.error("Error in connection callback", exc_info=True)
def bind_sockets(port, address=None, family=socket.AF_UNSPEC, backlog=128):
    """Creates listening sockets bound to the given port and address.

    Returns a list of socket objects (multiple sockets are returned if
    the given address maps to multiple IP addresses, which is most common
    for mixed IPv4 and IPv6 use).

    Address may be either an IP address or hostname.  If it's a hostname,
    the server will listen on all IP addresses associated with the
    name.  Address may be an empty string or None to listen on all
    available interfaces.  Family may be set to either socket.AF_INET
    or socket.AF_INET6 to restrict to ipv4 or ipv6 addresses, otherwise
    both will be used if available.

    The ``backlog`` argument has the same meaning as for
    ``socket.listen()``.

    If creating, configuring, binding or listening on any socket fails,
    every socket created so far is closed and the error is re-raised.
    """
    sockets = []
    if address == "":
        address = None
    flags = socket.AI_PASSIVE
    if hasattr(socket, "AI_ADDRCONFIG"):
        # AI_ADDRCONFIG ensures that we only try to bind on ipv6
        # if the system is configured for it, but the flag doesn't
        # exist on some platforms (specifically WinXP, although
        # newer versions of windows have it)
        flags |= socket.AI_ADDRCONFIG
    try:
        for res in set(socket.getaddrinfo(address, port, family,
                                          socket.SOCK_STREAM, 0, flags)):
            af, socktype, proto, canonname, sockaddr = res
            sock = socket.socket(af, socktype, proto)
            # Track the socket before configuring it so the cleanup below
            # also covers a failure part-way through its setup.
            sockets.append(sock)
            set_close_exec(sock.fileno())
            if os.name != 'nt':
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            if af == socket.AF_INET6:
                # On linux, ipv6 sockets accept ipv4 too by default,
                # but this makes it impossible to bind to both
                # 0.0.0.0 in ipv4 and :: in ipv6.  On other systems,
                # separate sockets *must* be used to listen for both ipv4
                # and ipv6.  For consistency, always disable ipv4 on our
                # ipv6 sockets and use a separate ipv4 socket when needed.
                #
                # Python 2.x on windows doesn't have IPPROTO_IPV6.
                if hasattr(socket, "IPPROTO_IPV6"):
                    sock.setsockopt(socket.IPPROTO_IPV6,
                                    socket.IPV6_V6ONLY, 1)
            sock.setblocking(0)
            sock.bind(sockaddr)
            sock.listen(backlog)
    except Exception:
        # The caller never receives the partial list, so release the
        # descriptors here instead of leaking them.
        for opened in sockets:
            opened.close()
        raise
    return sockets
if hasattr(socket, 'AF_UNIX'):
    def bind_unix_socket(file, mode=0o600, backlog=128):
        """Creates a listening unix socket.

        If a socket with the given name already exists, it will be deleted.
        If any other file with that name exists, an exception will be
        raised.

        ``mode`` is applied to the socket file with `os.chmod` after
        binding, and ``backlog`` is passed to ``socket.listen()``.

        Returns a socket object (not a list of socket objects like
        `bind_sockets`).  Raises `ValueError` if ``file`` exists and is not
        a socket; on any failure the new socket is closed before the error
        propagates.
        """
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            set_close_exec(sock.fileno())
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.setblocking(0)
            try:
                st = os.stat(file)
            except OSError as err:
                # A missing path is the normal case; anything else is fatal.
                if err.errno != errno.ENOENT:
                    raise
            else:
                if stat.S_ISSOCK(st.st_mode):
                    os.remove(file)
                else:
                    raise ValueError(
                        "File %s exists and is not a socket" % file)
            sock.bind(file)
            os.chmod(file, mode)
            sock.listen(backlog)
        except Exception:
            # Don't leak the descriptor when setup fails part-way.
            sock.close()
            raise
        return sock
def add_accept_handler(sock, callback, io_loop=None):
    """Adds an ``IOLoop`` event handler to accept new connections on ``sock``.

    When a connection is accepted, ``callback(connection, address)`` will
    be run (``connection`` is a socket object, and ``address`` is the
    address of the other end of the connection).  Note that this signature
    is different from the ``callback(fd, events)`` signature used for
    ``IOLoop`` handlers.

    If ``io_loop`` is None, ``IOLoop.instance()`` is used.
    """
    if io_loop is None:
        io_loop = IOLoop.instance()

    def accept_handler(fd, events):
        # Drain the accept queue: keep accepting until the non-blocking
        # socket reports that nothing is pending.
        while True:
            try:
                connection, address = sock.accept()
            except socket.error as e:
                if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                    return
                if e.args[0] == errno.ECONNABORTED:
                    # The peer went away while still queued; there is
                    # nothing to hand to the callback, so move on.
                    continue
                raise
            callback(connection, address)
    io_loop.add_handler(sock.fileno(), accept_handler, IOLoop.READ)