Source code for tornado.httputil

#!/usr/bin/env python
# Copyright 2009 Facebook
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""HTTP utility code shared by clients and servers."""

from __future__ import absolute_import, division, print_function, with_statement

import datetime
import numbers
import re
import time

from tornado.escape import native_str, parse_qs_bytes, utf8
from tornado.log import gen_log
from tornado.util import ObjectDict

    from httplib import responses  # py2
except ImportError:
    from http.client import responses  # py3

# responses is unused in this file, but we re-export it to other files.
# Reference it so pyflakes doesn't complain.

    from urllib import urlencode  # py2
except ImportError:
    from urllib.parse import urlencode  # py3

[docs]class HTTPHeaders(dict): """A dictionary that maintains ``Http-Header-Case`` for all keys. Supports multiple values per key via a pair of new methods, `add()` and `get_list()`. The regular dictionary interface returns a single value per key, with multiple values joined by a comma. >>> h = HTTPHeaders({"content-type": "text/html"}) >>> list(h.keys()) ['Content-Type'] >>> h["Content-Type"] 'text/html' >>> h.add("Set-Cookie", "A=B") >>> h.add("Set-Cookie", "C=D") >>> h["set-cookie"] 'A=B,C=D' >>> h.get_list("set-cookie") ['A=B', 'C=D'] >>> for (k,v) in sorted(h.get_all()): ... print('%s: %s' % (k,v)) ... Content-Type: text/html Set-Cookie: A=B Set-Cookie: C=D """ def __init__(self, *args, **kwargs): # Don't pass args or kwargs to dict.__init__, as it will bypass # our __setitem__ dict.__init__(self) self._as_list = {} self._last_key = None if (len(args) == 1 and len(kwargs) == 0 and isinstance(args[0], HTTPHeaders)): # Copy constructor for k, v in args[0].get_all(): self.add(k, v) else: # Dict-style initialization self.update(*args, **kwargs) # new public methods
[docs] def add(self, name, value): """Adds a new value for the given key.""" norm_name = HTTPHeaders._normalize_name(name) self._last_key = norm_name if norm_name in self: # bypass our override of __setitem__ since it modifies _as_list dict.__setitem__(self, norm_name, native_str(self[norm_name]) + ',' + native_str(value)) self._as_list[norm_name].append(value) else: self[norm_name] = value
[docs] def get_list(self, name): """Returns all values for the given header as a list.""" norm_name = HTTPHeaders._normalize_name(name) return self._as_list.get(norm_name, [])
[docs] def get_all(self): """Returns an iterable of all (name, value) pairs. If a header has multiple values, multiple pairs will be returned with the same name. """ for name, values in self._as_list.items(): for value in values: yield (name, value)
[docs] def parse_line(self, line): """Updates the dictionary with a single header line. >>> h = HTTPHeaders() >>> h.parse_line("Content-Type: text/html") >>> h.get('content-type') 'text/html' """ if line[0].isspace(): # continuation of a multi-line header new_part = ' ' + line.lstrip() self._as_list[self._last_key][-1] += new_part dict.__setitem__(self, self._last_key, self[self._last_key] + new_part) else: name, value = line.split(":", 1) self.add(name, value.strip())
[docs] def parse(cls, headers): """Returns a dictionary from HTTP header text. >>> h = HTTPHeaders.parse("Content-Type: text/html\\r\\nContent-Length: 42\\r\\n") >>> sorted(h.items()) [('Content-Length', '42'), ('Content-Type', 'text/html')] """ h = cls() for line in headers.splitlines(): if line: h.parse_line(line) return h # dict implementation overrides
def __setitem__(self, name, value): norm_name = HTTPHeaders._normalize_name(name) dict.__setitem__(self, norm_name, value) self._as_list[norm_name] = [value] def __getitem__(self, name): return dict.__getitem__(self, HTTPHeaders._normalize_name(name)) def __delitem__(self, name): norm_name = HTTPHeaders._normalize_name(name) dict.__delitem__(self, norm_name) del self._as_list[norm_name] def __contains__(self, name): norm_name = HTTPHeaders._normalize_name(name) return dict.__contains__(self, norm_name) def get(self, name, default=None): return dict.get(self, HTTPHeaders._normalize_name(name), default) def update(self, *args, **kwargs): # dict.update bypasses our __setitem__ for k, v in dict(*args, **kwargs).items(): self[k] = v def copy(self): # default implementation returns dict(self), not the subclass return HTTPHeaders(self) _NORMALIZED_HEADER_RE = re.compile(r'^[A-Z0-9][a-z0-9]*(-[A-Z0-9][a-z0-9]*)*$') _normalized_headers = {} @staticmethod def _normalize_name(name): """Converts a name to Http-Header-Case. >>> HTTPHeaders._normalize_name("coNtent-TYPE") 'Content-Type' """ try: return HTTPHeaders._normalized_headers[name] except KeyError: if HTTPHeaders._NORMALIZED_HEADER_RE.match(name): normalized = name else: normalized = "-".join([w.capitalize() for w in name.split("-")]) HTTPHeaders._normalized_headers[name] = normalized return normalized
[docs]def url_concat(url, args): """Concatenate url and argument dictionary regardless of whether url has existing query parameters. >>> url_concat("", dict(c="d")) '' """ if not args: return url if url[-1] not in ('?', '&'): url += '&' if ('?' in url) else '?' return url + urlencode(args)
[docs]class HTTPFile(ObjectDict): """Represents a file uploaded via a form. For backwards compatibility, its instance attributes are also accessible as dictionary keys. * ``filename`` * ``body`` * ``content_type`` """ pass
[docs]def parse_body_arguments(content_type, body, arguments, files): """Parses a form request body. Supports ``application/x-www-form-urlencoded`` and ``multipart/form-data``. The ``content_type`` parameter should be a string and ``body`` should be a byte string. The ``arguments`` and ``files`` parameters are dictionaries that will be updated with the parsed contents. """ if content_type.startswith("application/x-www-form-urlencoded"): uri_arguments = parse_qs_bytes(native_str(body), keep_blank_values=True) for name, values in uri_arguments.items(): if values: arguments.setdefault(name, []).extend(values) elif content_type.startswith("multipart/form-data"): fields = content_type.split(";") for field in fields: k, sep, v = field.strip().partition("=") if k == "boundary" and v: parse_multipart_form_data(utf8(v), body, arguments, files) break else: gen_log.warning("Invalid multipart/form-data")
[docs]def parse_multipart_form_data(boundary, data, arguments, files): """Parses a ``multipart/form-data`` body. The ``boundary`` and ``data`` parameters are both byte strings. The dictionaries given in the arguments and files parameters will be updated with the contents of the body. """ # The standard allows for the boundary to be quoted in the header, # although it's rare (it happens at least for google app engine # xmpp). I think we're also supposed to handle backslash-escapes # here but I'll save that until we see a client that uses them # in the wild. if boundary.startswith(b'"') and boundary.endswith(b'"'): boundary = boundary[1:-1] final_boundary_index = data.rfind(b"--" + boundary + b"--") if final_boundary_index == -1: gen_log.warning("Invalid multipart/form-data: no final boundary") return parts = data[:final_boundary_index].split(b"--" + boundary + b"\r\n") for part in parts: if not part: continue eoh = part.find(b"\r\n\r\n") if eoh == -1: gen_log.warning("multipart/form-data missing headers") continue headers = HTTPHeaders.parse(part[:eoh].decode("utf-8")) disp_header = headers.get("Content-Disposition", "") disposition, disp_params = _parse_header(disp_header) if disposition != "form-data" or not part.endswith(b"\r\n"): gen_log.warning("Invalid multipart/form-data") continue value = part[eoh + 4:-2] if not disp_params.get("name"): gen_log.warning("multipart/form-data value missing name") continue name = disp_params["name"] if disp_params.get("filename"): ctype = headers.get("Content-Type", "application/unknown") files.setdefault(name, []).append(HTTPFile( filename=disp_params["filename"], body=value, content_type=ctype)) else: arguments.setdefault(name, []).append(value)
[docs]def format_timestamp(ts): """Formats a timestamp in the format used by HTTP. The argument may be a numeric timestamp as returned by `time.time`, a time tuple as returned by `time.gmtime`, or a `datetime.datetime` object. >>> format_timestamp(1359312200) 'Sun, 27 Jan 2013 18:43:20 GMT' """ if isinstance(ts, (tuple, time.struct_time)): pass elif isinstance(ts, datetime.datetime): ts = ts.utctimetuple() elif isinstance(ts, numbers.Real): ts = time.gmtime(ts) else: raise TypeError("unknown timestamp type: %r" % ts) return time.strftime("%a, %d %b %Y %H:%M:%S GMT", ts) # _parseparam and _parse_header are copied and modified from python2.7's # The original 2.7 version of this code did not correctly support some # combinations of semicolons and double quotes.
def _parseparam(s): while s[:1] == ';': s = s[1:] end = s.find(';') while end > 0 and (s.count('"', 0, end) - s.count('\\"', 0, end)) % 2: end = s.find(';', end + 1) if end < 0: end = len(s) f = s[:end] yield f.strip() s = s[end:] def _parse_header(line): """Parse a Content-type like header. Return the main content-type and a dictionary of options. """ parts = _parseparam(';' + line) key = next(parts) pdict = {} for p in parts: i = p.find('=') if i >= 0: name = p[:i].strip().lower() value = p[i + 1:].strip() if len(value) >= 2 and value[0] == value[-1] == '"': value = value[1:-1] value = value.replace('\\\\', '\\').replace('\\"', '"') pdict[name] = value return key, pdict def doctests(): import doctest return doctest.DocTestSuite()

