diff --git a/libs/tornado/__init__.py b/libs/tornado/__init__.py index 0e39f84..6f4f47d 100755 --- a/libs/tornado/__init__.py +++ b/libs/tornado/__init__.py @@ -25,5 +25,5 @@ from __future__ import absolute_import, division, print_function, with_statement # is zero for an official release, positive for a development branch, # or negative for a release candidate or beta (after the base version # number has been incremented) -version = "4.1.dev1" -version_info = (4, 1, 0, -100) +version = "4.1" +version_info = (4, 1, 0, 0) diff --git a/libs/tornado/autoreload.py b/libs/tornado/autoreload.py index 3982579..a548cf0 100755 --- a/libs/tornado/autoreload.py +++ b/libs/tornado/autoreload.py @@ -108,7 +108,11 @@ _io_loops = weakref.WeakKeyDictionary() def start(io_loop=None, check_time=500): - """Begins watching source files for changes using the given `.IOLoop`. """ + """Begins watching source files for changes. + + .. versionchanged:: 4.1 + The ``io_loop`` argument is deprecated. + """ io_loop = io_loop or ioloop.IOLoop.current() if io_loop in _io_loops: return diff --git a/libs/tornado/concurrent.py b/libs/tornado/concurrent.py index 6bab5d2..acfbcd8 100755 --- a/libs/tornado/concurrent.py +++ b/libs/tornado/concurrent.py @@ -25,11 +25,13 @@ module. from __future__ import absolute_import, division, print_function, with_statement import functools +import platform +import traceback import sys +from tornado.log import app_log from tornado.stack_context import ExceptionStackContext, wrap from tornado.util import raise_exc_info, ArgReplacer -from tornado.log import app_log try: from concurrent import futures @@ -37,9 +39,88 @@ except ImportError: futures = None +# Can the garbage collector handle cycles that include __del__ methods? +# This is true in cpython beginning with version 3.4 (PEP 442). +_GC_CYCLE_FINALIZERS = (platform.python_implementation() == 'CPython' and + sys.version_info >= (3, 4)) + class ReturnValueIgnoredError(Exception): pass +# This class and associated code in the future object is derived +# from the Trollius project, a backport of asyncio to Python 2.x - 3.x + +class _TracebackLogger(object): + """Helper to log a traceback upon destruction if not cleared. + + This solves a nasty problem with Futures and Tasks that have an + exception set: if nobody asks for the exception, the exception is + never logged. This violates the Zen of Python: 'Errors should + never pass silently. Unless explicitly silenced.' + + However, we don't want to log the exception as soon as + set_exception() is called: if the calling code is written + properly, it will get the exception and handle it properly. But + we *do* want to log it if result() or exception() was never called + -- otherwise developers waste a lot of time wondering why their + buggy code fails silently. + + An earlier attempt added a __del__() method to the Future class + itself, but this backfired because the presence of __del__() + prevents garbage collection from breaking cycles. A way out of + this catch-22 is to avoid having a __del__() method on the Future + class itself, but instead to have a reference to a helper object + with a __del__() method that logs the traceback, where we ensure + that the helper object doesn't participate in cycles, and only the + Future has a reference to it. + + The helper object is added when set_exception() is called. When + the Future is collected, and the helper is present, the helper + object is also collected, and its __del__() method will log the + traceback. When the Future's result() or exception() method is + called (and a helper object is present), it removes the the helper + object, after calling its clear() method to prevent it from + logging. + + One downside is that we do a fair amount of work to extract the + traceback from the exception, even when it is never logged. It + would seem cheaper to just store the exception object, but that + references the traceback, which references stack frames, which may + reference the Future, which references the _TracebackLogger, and + then the _TracebackLogger would be included in a cycle, which is + what we're trying to avoid! As an optimization, we don't + immediately format the exception; we only do the work when + activate() is called, which call is delayed until after all the + Future's callbacks have run. Since usually a Future has at least + one callback (typically set by 'yield From') and usually that + callback extracts the callback, thereby removing the need to + format the exception. + + PS. I don't claim credit for this solution. I first heard of it + in a discussion about closing files when they are collected. + """ + + __slots__ = ('exc_info', 'formatted_tb') + + def __init__(self, exc_info): + self.exc_info = exc_info + self.formatted_tb = None + + def activate(self): + exc_info = self.exc_info + if exc_info is not None: + self.exc_info = None + self.formatted_tb = traceback.format_exception(*exc_info) + + def clear(self): + self.exc_info = None + self.formatted_tb = None + + def __del__(self): + if self.formatted_tb: + app_log.error('Future exception was never retrieved: %s', + ''.join(self.formatted_tb).rstrip()) + class Future(object): """Placeholder for an asynchronous result. @@ -68,12 +149,23 @@ class Future(object): if that package was available and fall back to the thread-unsafe implementation if it was not. + .. versionchanged:: 4.1 + If a `.Future` contains an error but that error is never observed + (by calling ``result()``, ``exception()``, or ``exc_info()``), + a stack trace will be logged when the `.Future` is garbage collected. + This normally indicates an error in the application, but in cases + where it results in undesired logging it may be necessary to + suppress the logging by ensuring that the exception is observed: + ``f.add_done_callback(lambda f: f.exception())``. """ def __init__(self): self._done = False self._result = None - self._exception = None self._exc_info = None + + self._log_traceback = False # Used for Python >= 3.4 + self._tb_logger = None # Used for Python <= 3.3 + self._callbacks = [] def cancel(self): @@ -100,16 +192,21 @@ class Future(object): """Returns True if the future has finished running.""" return self._done + def _clear_tb_log(self): + self._log_traceback = False + if self._tb_logger is not None: + self._tb_logger.clear() + self._tb_logger = None + def result(self, timeout=None): """If the operation succeeded, return its result. If it failed, re-raise its exception. """ + self._clear_tb_log() if self._result is not None: return self._result if self._exc_info is not None: raise_exc_info(self._exc_info) - elif self._exception is not None: - raise self._exception self._check_done() return self._result @@ -117,8 +214,9 @@ class Future(object): """If the operation raised an exception, return the `Exception` object. Otherwise returns None. """ - if self._exception is not None: - return self._exception + self._clear_tb_log() + if self._exc_info is not None: + return self._exc_info[1] else: self._check_done() return None @@ -147,14 +245,17 @@ class Future(object): def set_exception(self, exception): """Sets the exception of a ``Future.``""" - self._exception = exception - self._set_done() + self.set_exc_info( + (exception.__class__, + exception, + getattr(exception, '__traceback__', None))) def exc_info(self): """Returns a tuple in the same format as `sys.exc_info` or None. .. versionadded:: 4.0 """ + self._clear_tb_log() return self._exc_info def set_exc_info(self, exc_info): @@ -165,7 +266,18 @@ class Future(object): .. versionadded:: 4.0 """ self._exc_info = exc_info - self.set_exception(exc_info[1]) + self._log_traceback = True + if not _GC_CYCLE_FINALIZERS: + self._tb_logger = _TracebackLogger(exc_info) + + try: + self._set_done() + finally: + # Activate the logger after all callbacks have had a + # chance to call result() or exception(). + if self._log_traceback and self._tb_logger is not None: + self._tb_logger.activate() + self._exc_info = exc_info def _check_done(self): if not self._done: @@ -181,6 +293,21 @@ class Future(object): cb, self) self._callbacks = None + # On Python 3.3 or older, objects with a destructor part of a reference + # cycle are never destroyed. It's no longer the case on Python 3.4 thanks to + # the PEP 442. + if _GC_CYCLE_FINALIZERS: + def __del__(self): + if not self._log_traceback: + # set_exception() was not called, or result() or exception() + # has consumed the exception + return + + tb = traceback.format_exception(*self._exc_info) + + app_log.error('Future %r exception was never retrieved: %s', + self, ''.join(tb).rstrip()) + TracebackFuture = Future if futures is None: @@ -293,7 +420,7 @@ def return_future(f): # If the initial synchronous part of f() raised an exception, # go ahead and raise it to the caller directly without waiting # for them to inspect the Future. - raise_exc_info(exc_info) + future.result() # If the caller passed in a callback, schedule it to be called # when the future resolves. It is important that this happens diff --git a/libs/tornado/curl_httpclient.py b/libs/tornado/curl_httpclient.py index 68047cc..ebbe0e8 100755 --- a/libs/tornado/curl_httpclient.py +++ b/libs/tornado/curl_httpclient.py @@ -28,12 +28,12 @@ from io import BytesIO from tornado import httputil from tornado import ioloop -from tornado.log import gen_log from tornado import stack_context from tornado.escape import utf8, native_str from tornado.httpclient import HTTPResponse, HTTPError, AsyncHTTPClient, main +curl_log = logging.getLogger('tornado.curl_httpclient') class CurlAsyncHTTPClient(AsyncHTTPClient): def initialize(self, io_loop, max_clients=10, defaults=None): @@ -257,7 +257,7 @@ class CurlAsyncHTTPClient(AsyncHTTPClient): def _curl_create(self): curl = pycurl.Curl() - if gen_log.isEnabledFor(logging.DEBUG): + if curl_log.isEnabledFor(logging.DEBUG): curl.setopt(pycurl.VERBOSE, 1) curl.setopt(pycurl.DEBUGFUNCTION, self._curl_debug) return curl @@ -403,11 +403,11 @@ class CurlAsyncHTTPClient(AsyncHTTPClient): raise ValueError("Unsupported auth_mode %s" % request.auth_mode) curl.setopt(pycurl.USERPWD, native_str(userpwd)) - gen_log.debug("%s %s (username: %r)", request.method, request.url, + curl_log.debug("%s %s (username: %r)", request.method, request.url, request.auth_username) else: curl.unsetopt(pycurl.USERPWD) - gen_log.debug("%s %s", request.method, request.url) + curl_log.debug("%s %s", request.method, request.url) if request.client_cert is not None: curl.setopt(pycurl.SSLCERT, request.client_cert) @@ -448,12 +448,12 @@ class CurlAsyncHTTPClient(AsyncHTTPClient): def _curl_debug(self, debug_type, debug_msg): debug_types = ('I', '<', '>', '<', '>') if debug_type == 0: - gen_log.debug('%s', debug_msg.strip()) + curl_log.debug('%s', debug_msg.strip()) elif debug_type in (1, 2): for line in debug_msg.splitlines(): - gen_log.debug('%s %s', debug_types[debug_type], line) + curl_log.debug('%s %s', debug_types[debug_type], line) elif debug_type == 4: - gen_log.debug('%s %r', debug_types[debug_type], debug_msg) + curl_log.debug('%s %r', debug_types[debug_type], debug_msg) class CurlError(HTTPError): diff --git a/libs/tornado/gen.py b/libs/tornado/gen.py index 2fc9b0c..86fe2f1 100755 --- a/libs/tornado/gen.py +++ b/libs/tornado/gen.py @@ -43,8 +43,21 @@ be returned when they are all finished:: response3 = response_dict['response3'] response4 = response_dict['response4'] +If the `~functools.singledispatch` library is available (standard in +Python 3.4, available via the `singledispatch +`_ package on older +versions), additional types of objects may be yielded. Tornado includes +support for ``asyncio.Future`` and Twisted's ``Deferred`` class when +``tornado.platform.asyncio`` and ``tornado.platform.twisted`` are imported. +See the `convert_yielded` function to extend this mechanism. + .. versionchanged:: 3.2 Dict support added. + +.. versionchanged:: 4.1 + Support added for yielding ``asyncio`` Futures and Twisted Deferreds + via ``singledispatch``. + """ from __future__ import absolute_import, division, print_function, with_statement @@ -53,11 +66,21 @@ import functools import itertools import sys import types +import weakref from tornado.concurrent import Future, TracebackFuture, is_future, chain_future from tornado.ioloop import IOLoop +from tornado.log import app_log from tornado import stack_context +try: + from functools import singledispatch # py34+ +except ImportError as e: + try: + from singledispatch import singledispatch # backport + except ImportError: + singledispatch = None + class KeyReuseError(Exception): pass @@ -240,6 +263,106 @@ class Return(Exception): super(Return, self).__init__() self.value = value +class WaitIterator(object): + """Provides an iterator to yield the results of futures as they finish. + + Yielding a set of futures like this: + + ``results = yield [future1, future2]`` + + pauses the coroutine until both ``future1`` and ``future2`` + return, and then restarts the coroutine with the results of both + futures. If either future is an exception, the expression will + raise that exception and all the results will be lost. + + If you need to get the result of each future as soon as possible, + or if you need the result of some futures even if others produce + errors, you can use ``WaitIterator``: + + :: + + wait_iterator = gen.WaitIterator(future1, future2) + while not wait_iterator.done(): + try: + result = yield wait_iterator.next() + except Exception as e: + print "Error {} from {}".format(e, wait_iterator.current_future) + else: + print "Result {} recieved from {} at {}".format( + result, wait_iterator.current_future, + wait_iterator.current_index) + + Because results are returned as soon as they are available the + output from the iterator *will not be in the same order as the + input arguments*. If you need to know which future produced the + current result, you can use the attributes + ``WaitIterator.current_future``, or ``WaitIterator.current_index`` + to get the index of the future from the input list. (if keyword + arguments were used in the construction of the `WaitIterator`, + ``current_index`` will use the corresponding keyword). + + .. versionadded:: 4.1 + """ + def __init__(self, *args, **kwargs): + if args and kwargs: + raise ValueError( + "You must provide args or kwargs, not both") + + if kwargs: + self._unfinished = dict((f, k) for (k, f) in kwargs.items()) + futures = list(kwargs.values()) + else: + self._unfinished = dict((f, i) for (i, f) in enumerate(args)) + futures = args + + self._finished = collections.deque() + self.current_index = self.current_future = None + self._running_future = None + + self_ref = weakref.ref(self) + for future in futures: + future.add_done_callback(functools.partial( + self._done_callback, self_ref)) + + def done(self): + """Returns True if this iterator has no more results.""" + if self._finished or self._unfinished: + return False + # Clear the 'current' values when iteration is done. + self.current_index = self.current_future = None + return True + + def next(self): + """Returns a `.Future` that will yield the next available result. + + Note that this `.Future` will not be the same object as any of + the inputs. + """ + self._running_future = TracebackFuture() + + if self._finished: + self._return_result(self._finished.popleft()) + + return self._running_future + + @staticmethod + def _done_callback(self_ref, done): + self = self_ref() + if self is not None: + if self._running_future and not self._running_future.done(): + self._return_result(done) + else: + self._finished.append(done) + + def _return_result(self, done): + """Called set the returned future's state that of the future + we yielded, and set the current future for the iterator. + """ + chain_future(done, self._running_future) + + self.current_future = done + self.current_index = self._unfinished.pop(done) + class YieldPoint(object): """Base class for objects that may be yielded from the generator. @@ -371,6 +494,11 @@ def Task(func, *args, **kwargs): class YieldFuture(YieldPoint): def __init__(self, future, io_loop=None): + """Adapts a `.Future` to the `YieldPoint` interface. + + .. versionchanged:: 4.1 + The ``io_loop`` argument is deprecated. + """ self.future = future self.io_loop = io_loop or IOLoop.current() @@ -504,7 +632,7 @@ def maybe_future(x): return fut -def with_timeout(timeout, future, io_loop=None): +def with_timeout(timeout, future, io_loop=None, quiet_exceptions=()): """Wraps a `.Future` in a timeout. Raises `TimeoutError` if the input future does not complete before @@ -512,9 +640,17 @@ def with_timeout(timeout, future, io_loop=None): `.IOLoop.add_timeout` (i.e. a `datetime.timedelta` or an absolute time relative to `.IOLoop.time`) + If the wrapped `.Future` fails after it has timed out, the exception + will be logged unless it is of a type contained in ``quiet_exceptions`` + (which may be an exception type or a sequence of types). + Currently only supports Futures, not other `YieldPoint` classes. .. versionadded:: 4.0 + + .. versionchanged:: 4.1 + Added the ``quiet_exceptions`` argument and the logging of unhandled + exceptions. """ # TODO: allow yield points in addition to futures? # Tricky to do with stack_context semantics. @@ -528,9 +664,19 @@ def with_timeout(timeout, future, io_loop=None): chain_future(future, result) if io_loop is None: io_loop = IOLoop.current() + def error_callback(future): + try: + future.result() + except Exception as e: + if not isinstance(e, quiet_exceptions): + app_log.error("Exception in Future %r after timeout", + future, exc_info=True) + def timeout_callback(): + result.set_exception(TimeoutError("Timeout")) + # In case the wrapped future goes on to fail, log it. + future.add_done_callback(error_callback) timeout_handle = io_loop.add_timeout( - timeout, - lambda: result.set_exception(TimeoutError("Timeout"))) + timeout, timeout_callback) if isinstance(future, Future): # We know this future will resolve on the IOLoop, so we don't # need the extra thread-safety of IOLoop.add_future (and we also @@ -545,6 +691,25 @@ def with_timeout(timeout, future, io_loop=None): return result +def sleep(duration): + """Return a `.Future` that resolves after the given number of seconds. + + When used with ``yield`` in a coroutine, this is a non-blocking + analogue to `time.sleep` (which should not be used in coroutines + because it is blocking):: + + yield gen.sleep(0.5) + + Note that calling this function on its own does nothing; you must + wait on the `.Future` it returns (usually by yielding it). + + .. versionadded:: 4.1 + """ + f = Future() + IOLoop.current().call_later(duration, lambda: f.set_result(None)) + return f + + _null_future = Future() _null_future.set_result(None) @@ -678,18 +843,18 @@ class Runner(object): self.running = False def handle_yield(self, yielded): - if isinstance(yielded, list): - if all(is_future(f) for f in yielded): - yielded = multi_future(yielded) - else: - yielded = Multi(yielded) - elif isinstance(yielded, dict): - if all(is_future(f) for f in yielded.values()): - yielded = multi_future(yielded) - else: - yielded = Multi(yielded) + # Lists containing YieldPoints require stack contexts; + # other lists are handled via multi_future in convert_yielded. + if (isinstance(yielded, list) and + any(isinstance(f, YieldPoint) for f in yielded)): + yielded = Multi(yielded) + elif (isinstance(yielded, dict) and + any(isinstance(f, YieldPoint) for f in yielded.values())): + yielded = Multi(yielded) if isinstance(yielded, YieldPoint): + # YieldPoints are too closely coupled to the Runner to go + # through the generic convert_yielded mechanism. self.future = TracebackFuture() def start_yield_point(): try: @@ -702,6 +867,7 @@ class Runner(object): except Exception: self.future = TracebackFuture() self.future.set_exc_info(sys.exc_info()) + if self.stack_context_deactivate is None: # Start a stack context if this is the first # YieldPoint we've seen. @@ -715,16 +881,17 @@ class Runner(object): return False else: start_yield_point() - elif is_future(yielded): - self.future = yielded - if not self.future.done() or self.future is moment: - self.io_loop.add_future( - self.future, lambda f: self.run()) - return False else: - self.future = TracebackFuture() - self.future.set_exception(BadYieldError( - "yielded unknown object %r" % (yielded,))) + try: + self.future = convert_yielded(yielded) + except BadYieldError: + self.future = TracebackFuture() + self.future.set_exc_info(sys.exc_info()) + + if not self.future.done() or self.future is moment: + self.io_loop.add_future( + self.future, lambda f: self.run()) + return False return True def result_callback(self, key): @@ -763,3 +930,30 @@ def _argument_adapter(callback): else: callback(None) return wrapper + + +def convert_yielded(yielded): + """Convert a yielded object into a `.Future`. + + The default implementation accepts lists, dictionaries, and Futures. + + If the `~functools.singledispatch` library is available, this function + may be extended to support additional types. For example:: + + @convert_yielded.register(asyncio.Future) + def _(asyncio_future): + return tornado.platform.asyncio.to_tornado_future(asyncio_future) + + .. versionadded:: 4.1 + """ + # Lists and dicts containing YieldPoints were handled separately + # via Multi(). + if isinstance(yielded, (list, dict)): + return multi_future(yielded) + elif is_future(yielded): + return yielded + else: + raise BadYieldError("yielded unknown object %r" % (yielded,)) + +if singledispatch is not None: + convert_yielded = singledispatch(convert_yielded) diff --git a/libs/tornado/http1connection.py b/libs/tornado/http1connection.py index 90895cc..181319c 100644 --- a/libs/tornado/http1connection.py +++ b/libs/tornado/http1connection.py @@ -162,7 +162,8 @@ class HTTP1Connection(httputil.HTTPConnection): header_data = yield gen.with_timeout( self.stream.io_loop.time() + self.params.header_timeout, header_future, - io_loop=self.stream.io_loop) + io_loop=self.stream.io_loop, + quiet_exceptions=iostream.StreamClosedError) except gen.TimeoutError: self.close() raise gen.Return(False) @@ -221,7 +222,8 @@ class HTTP1Connection(httputil.HTTPConnection): try: yield gen.with_timeout( self.stream.io_loop.time() + self._body_timeout, - body_future, self.stream.io_loop) + body_future, self.stream.io_loop, + quiet_exceptions=iostream.StreamClosedError) except gen.TimeoutError: gen_log.info("Timeout reading body from %s", self.context) @@ -326,8 +328,10 @@ class HTTP1Connection(httputil.HTTPConnection): def write_headers(self, start_line, headers, chunk=None, callback=None): """Implements `.HTTPConnection.write_headers`.""" + lines = [] if self.is_client: self._request_start_line = start_line + lines.append(utf8('%s %s HTTP/1.1' % (start_line[0], start_line[1]))) # Client requests with a non-empty body must have either a # Content-Length or a Transfer-Encoding. self._chunking_output = ( @@ -336,6 +340,7 @@ class HTTP1Connection(httputil.HTTPConnection): 'Transfer-Encoding' not in headers) else: self._response_start_line = start_line + lines.append(utf8('HTTP/1.1 %s %s' % (start_line[1], start_line[2]))) self._chunking_output = ( # TODO: should this use # self._request_start_line.version or @@ -365,7 +370,6 @@ class HTTP1Connection(httputil.HTTPConnection): self._expected_content_remaining = int(headers['Content-Length']) else: self._expected_content_remaining = None - lines = [utf8("%s %s %s" % start_line)] lines.extend([utf8(n) + b": " + utf8(v) for n, v in headers.get_all()]) for line in lines: if b'\n' in line: @@ -374,6 +378,7 @@ class HTTP1Connection(httputil.HTTPConnection): if self.stream.closed(): future = self._write_future = Future() future.set_exception(iostream.StreamClosedError()) + future.exception() else: if callback is not None: self._write_callback = stack_context.wrap(callback) @@ -412,6 +417,7 @@ class HTTP1Connection(httputil.HTTPConnection): if self.stream.closed(): future = self._write_future = Future() self._write_future.set_exception(iostream.StreamClosedError()) + self._write_future.exception() else: if callback is not None: self._write_callback = stack_context.wrap(callback) @@ -451,6 +457,9 @@ class HTTP1Connection(httputil.HTTPConnection): self._pending_write.add_done_callback(self._finish_request) def _on_write_complete(self, future): + exc = future.exception() + if exc is not None and not isinstance(exc, iostream.StreamClosedError): + future.result() if self._write_callback is not None: callback = self._write_callback self._write_callback = None @@ -491,8 +500,9 @@ class HTTP1Connection(httputil.HTTPConnection): # we SHOULD ignore at least one empty line before the request. # http://tools.ietf.org/html/rfc7230#section-3.5 data = native_str(data.decode('latin1')).lstrip("\r\n") - eol = data.find("\r\n") - start_line = data[:eol] + # RFC 7230 section allows for both CRLF and bare LF. + eol = data.find("\n") + start_line = data[:eol].rstrip("\r") try: headers = httputil.HTTPHeaders.parse(data[eol:]) except ValueError: diff --git a/libs/tornado/httpclient.py b/libs/tornado/httpclient.py index 6ea872d..0ae9e48 100755 --- a/libs/tornado/httpclient.py +++ b/libs/tornado/httpclient.py @@ -137,6 +137,9 @@ class AsyncHTTPClient(Configurable): # or with force_instance: client = AsyncHTTPClient(force_instance=True, defaults=dict(user_agent="MyUserAgent")) + + .. versionchanged:: 4.1 + The ``io_loop`` argument is deprecated. """ @classmethod def configurable_base(cls): diff --git a/libs/tornado/httpserver.py b/libs/tornado/httpserver.py index 47c7472..e470e0e 100755 --- a/libs/tornado/httpserver.py +++ b/libs/tornado/httpserver.py @@ -114,6 +114,11 @@ class HTTPServer(TCPServer, httputil.HTTPServerConnectionDelegate): ``idle_connection_timeout``, ``body_timeout``, ``max_body_size`` arguments. Added support for `.HTTPServerConnectionDelegate` instances as ``request_callback``. + + .. versionchanged:: 4.1 + `.HTTPServerConnectionDelegate.start_request` is now called with + two arguments ``(server_conn, request_conn)`` (in accordance with the + documentation) instead of one ``(request_conn)``. """ def __init__(self, request_callback, no_keep_alive=False, io_loop=None, xheaders=False, ssl_options=None, protocol=None, @@ -153,7 +158,7 @@ class HTTPServer(TCPServer, httputil.HTTPServerConnectionDelegate): conn.start_serving(self) def start_request(self, server_conn, request_conn): - return _ServerRequestAdapter(self, request_conn) + return _ServerRequestAdapter(self, server_conn, request_conn) def on_close(self, server_conn): self._connections.remove(server_conn) @@ -226,13 +231,14 @@ class _ServerRequestAdapter(httputil.HTTPMessageDelegate): """Adapts the `HTTPMessageDelegate` interface to the interface expected by our clients. """ - def __init__(self, server, connection): + def __init__(self, server, server_conn, request_conn): self.server = server - self.connection = connection + self.connection = request_conn self.request = None if isinstance(server.request_callback, httputil.HTTPServerConnectionDelegate): - self.delegate = server.request_callback.start_request(connection) + self.delegate = server.request_callback.start_request( + server_conn, request_conn) self._chunks = None else: self.delegate = None diff --git a/libs/tornado/httputil.py b/libs/tornado/httputil.py index 88389fe..9c99b3e 100755 --- a/libs/tornado/httputil.py +++ b/libs/tornado/httputil.py @@ -62,6 +62,11 @@ except ImportError: pass +# RFC 7230 section 3.5: a recipient MAY recognize a single LF as a line +# terminator and ignore any preceding CR. +_CRLF_RE = re.compile(r'\r?\n') + + class _NormalizedHeaderCache(dict): """Dynamic cached mapping of header names to Http-Header-Case. @@ -193,7 +198,7 @@ class HTTPHeaders(dict): [('Content-Length', '42'), ('Content-Type', 'text/html')] """ h = cls() - for line in headers.splitlines(): + for line in _CRLF_RE.split(headers): if line: h.parse_line(line) return h @@ -543,6 +548,8 @@ class HTTPConnection(object): headers. :arg callback: a callback to be run when the write is complete. + The ``version`` field of ``start_line`` is ignored. + Returns a `.Future` if no callback is given. """ raise NotImplementedError() @@ -689,14 +696,17 @@ def parse_body_arguments(content_type, body, arguments, files, headers=None): if values: arguments.setdefault(name, []).extend(values) elif content_type.startswith("multipart/form-data"): - fields = content_type.split(";") - for field in fields: - k, sep, v = field.strip().partition("=") - if k == "boundary" and v: - parse_multipart_form_data(utf8(v), body, arguments, files) - break - else: - gen_log.warning("Invalid multipart/form-data") + try: + fields = content_type.split(";") + for field in fields: + k, sep, v = field.strip().partition("=") + if k == "boundary" and v: + parse_multipart_form_data(utf8(v), body, arguments, files) + break + else: + raise ValueError("multipart boundary not found") + except Exception as e: + gen_log.warning("Invalid multipart/form-data: %s", e) def parse_multipart_form_data(boundary, data, arguments, files): @@ -782,7 +792,7 @@ def parse_request_start_line(line): method, path, version = line.split(" ") except ValueError: raise HTTPInputError("Malformed HTTP request line") - if not version.startswith("HTTP/"): + if not re.match(r"^HTTP/1\.[0-9]$", version): raise HTTPInputError( "Malformed HTTP version in HTTP Request-Line: %r" % version) return RequestStartLine(method, path, version) @@ -801,7 +811,7 @@ def parse_response_start_line(line): ResponseStartLine(version='HTTP/1.1', code=200, reason='OK') """ line = native_str(line) - match = re.match("(HTTP/1.[01]) ([0-9]+) ([^\r]*)", line) + match = re.match("(HTTP/1.[0-9]) ([0-9]+) ([^\r]*)", line) if not match: raise HTTPInputError("Error parsing response start line") return ResponseStartLine(match.group(1), int(match.group(2)), @@ -878,6 +888,8 @@ def split_host_and_port(netloc): """Returns ``(host, port)`` tuple from ``netloc``. Returned ``port`` will be ``None`` if not present. + + .. versionadded:: 4.1 """ match = re.match(r'^(.+):(\d+)$', netloc) if match: diff --git a/libs/tornado/ioloop.py b/libs/tornado/ioloop.py index 0319386..680dc40 100755 --- a/libs/tornado/ioloop.py +++ b/libs/tornado/ioloop.py @@ -167,28 +167,26 @@ class IOLoop(Configurable): del IOLoop._instance @staticmethod - def current(): + def current(instance=True): """Returns the current thread's `IOLoop`. - If an `IOLoop` is currently running or has been marked as current - by `make_current`, returns that instance. Otherwise returns - `IOLoop.instance()`, i.e. the main thread's `IOLoop`. - - A common pattern for classes that depend on ``IOLoops`` is to use - a default argument to enable programs with multiple ``IOLoops`` - but not require the argument for simpler applications:: - - class MyClass(object): - def __init__(self, io_loop=None): - self.io_loop = io_loop or IOLoop.current() + If an `IOLoop` is currently running or has been marked as + current by `make_current`, returns that instance. If there is + no current `IOLoop`, returns `IOLoop.instance()` (i.e. the + main thread's `IOLoop`, creating one if necessary) if ``instance`` + is true. In general you should use `IOLoop.current` as the default when constructing an asynchronous object, and use `IOLoop.instance` when you mean to communicate to the main thread from a different one. + + .. versionchanged:: 4.1 + Added ``instance`` argument to control the + """ current = getattr(IOLoop._current, "instance", None) - if current is None: + if current is None and instance: return IOLoop.instance() return current @@ -200,6 +198,10 @@ class IOLoop(Configurable): `make_current` explicitly before starting the `IOLoop`, so that code run at startup time can find the right instance. + + .. versionchanged:: 4.1 + An `IOLoop` created while there is no current `IOLoop` + will automatically become current. """ IOLoop._current.instance = self @@ -224,7 +226,8 @@ class IOLoop(Configurable): return SelectIOLoop def initialize(self): - pass + if IOLoop.current(instance=False) is None: + self.make_current() def close(self, all_fds=False): """Closes the `IOLoop`, freeing any resources used. @@ -946,6 +949,9 @@ class PeriodicCallback(object): The callback is called every ``callback_time`` milliseconds. `start` must be called after the `PeriodicCallback` is created. + + .. versionchanged:: 4.1 + The ``io_loop`` argument is deprecated. """ def __init__(self, callback, callback_time, io_loop=None): self.callback = callback @@ -969,6 +975,13 @@ class PeriodicCallback(object): self.io_loop.remove_timeout(self._timeout) self._timeout = None + def is_running(self): + """Return True if this `.PeriodicCallback` has been started. + + .. versionadded:: 4.1 + """ + return self._running + def _run(self): if not self._running: return diff --git a/libs/tornado/iostream.py b/libs/tornado/iostream.py index 2d5df99..cdb6250 100755 --- a/libs/tornado/iostream.py +++ b/libs/tornado/iostream.py @@ -68,6 +68,14 @@ _ERRNO_CONNRESET = (errno.ECONNRESET, errno.ECONNABORTED, errno.EPIPE, if hasattr(errno, "WSAECONNRESET"): _ERRNO_CONNRESET += (errno.WSAECONNRESET, errno.WSAECONNABORTED, errno.WSAETIMEDOUT) +if sys.platform == 'darwin': + # OSX appears to have a race condition that causes send(2) to return + # EPROTOTYPE if called while a socket is being torn down: + # http://erickt.github.io/blog/2014/11/19/adventures-in-debugging-a-potential-osx-kernel-bug/ + # Since the socket is being closed anyway, treat this as an ECONNRESET + # instead of an unexpected error. + _ERRNO_CONNRESET += (errno.EPROTOTYPE,) + # More non-portable errnos: _ERRNO_INPROGRESS = (errno.EINPROGRESS,) @@ -122,6 +130,7 @@ class BaseIOStream(object): """`BaseIOStream` constructor. :arg io_loop: The `.IOLoop` to use; defaults to `.IOLoop.current`. + Deprecated since Tornado 4.1. :arg max_buffer_size: Maximum amount of incoming data to buffer; defaults to 100MB. :arg read_chunk_size: Amount of data to read at one time from the @@ -230,6 +239,12 @@ class BaseIOStream(object): gen_log.info("Unsatisfiable read, closing connection: %s" % e) self.close(exc_info=True) return future + except: + if future is not None: + # Ensure that the future doesn't log an error because its + # failure was never examined. + future.add_done_callback(lambda f: f.exception()) + raise return future def read_until(self, delimiter, callback=None, max_bytes=None): @@ -257,6 +272,10 @@ class BaseIOStream(object): gen_log.info("Unsatisfiable read, closing connection: %s" % e) self.close(exc_info=True) return future + except: + if future is not None: + future.add_done_callback(lambda f: f.exception()) + raise return future def read_bytes(self, num_bytes, callback=None, streaming_callback=None, @@ -281,7 +300,12 @@ class BaseIOStream(object): self._read_bytes = num_bytes self._read_partial = partial self._streaming_callback = stack_context.wrap(streaming_callback) - self._try_inline_read() + try: + self._try_inline_read() + except: + if future is not None: + future.add_done_callback(lambda f: f.exception()) + raise return future def read_until_close(self, callback=None, streaming_callback=None): @@ -305,7 +329,11 @@ class BaseIOStream(object): self._run_read_callback(self._read_buffer_size, False) return future self._read_until_close = True - self._try_inline_read() + try: + self._try_inline_read() + except: + future.add_done_callback(lambda f: f.exception()) + raise return future def write(self, data, callback=None): @@ -344,6 +372,7 @@ class BaseIOStream(object): future = None else: future = self._write_future = TracebackFuture() + future.add_done_callback(lambda f: f.exception()) if not self._connecting: self._handle_write() if self._write_buffer: @@ -1010,8 +1039,9 @@ class IOStream(BaseIOStream): # reported later in _handle_connect. if (errno_from_exception(e) not in _ERRNO_INPROGRESS and errno_from_exception(e) not in _ERRNO_WOULDBLOCK): - gen_log.warning("Connect error on fd %s: %s", - self.socket.fileno(), e) + if future is None: + gen_log.warning("Connect error on fd %s: %s", + self.socket.fileno(), e) self.close(exc_info=True) return future self._add_io_state(self.io_loop.WRITE) @@ -1058,7 +1088,9 @@ class IOStream(BaseIOStream): socket = self.socket self.io_loop.remove_handler(socket) self.socket = None - socket = ssl_wrap_socket(socket, ssl_options, server_side=server_side, + socket = ssl_wrap_socket(socket, ssl_options, + server_hostname=server_hostname, + server_side=server_side, do_handshake_on_connect=False) orig_close_callback = self._close_callback self._close_callback = None diff --git a/libs/tornado/netutil.py b/libs/tornado/netutil.py index e85f62b..17e9580 100755 --- a/libs/tornado/netutil.py +++ b/libs/tornado/netutil.py @@ -187,6 +187,9 @@ def add_accept_handler(sock, callback, io_loop=None): address of the other end of the connection). Note that this signature is different from the ``callback(fd, events)`` signature used for `.IOLoop` handlers. + + .. versionchanged:: 4.1 + The ``io_loop`` argument is deprecated. """ if io_loop is None: io_loop = IOLoop.current() @@ -301,6 +304,9 @@ class ExecutorResolver(Resolver): The executor will be shut down when the resolver is closed unless ``close_resolver=False``; use this if you want to reuse the same executor elsewhere. + + .. versionchanged:: 4.1 + The ``io_loop`` argument is deprecated. """ def initialize(self, io_loop=None, executor=None, close_executor=True): self.io_loop = io_loop or IOLoop.current() diff --git a/libs/tornado/platform/asyncio.py b/libs/tornado/platform/asyncio.py index dd6722a..bc68517 100644 --- a/libs/tornado/platform/asyncio.py +++ b/libs/tornado/platform/asyncio.py @@ -12,6 +12,8 @@ unfinished callbacks on the event loop that fail when it resumes) from __future__ import absolute_import, division, print_function, with_statement import functools +import tornado.concurrent +from tornado.gen import convert_yielded from tornado.ioloop import IOLoop from tornado import stack_context @@ -138,3 +140,18 @@ class AsyncIOLoop(BaseAsyncIOLoop): def initialize(self): super(AsyncIOLoop, self).initialize(asyncio.new_event_loop(), close_loop=True) + +def to_tornado_future(asyncio_future): + """Convert an ``asyncio.Future`` to a `tornado.concurrent.Future`.""" + tf = tornado.concurrent.Future() + tornado.concurrent.chain_future(asyncio_future, tf) + return tf + +def to_asyncio_future(tornado_future): + """Convert a `tornado.concurrent.Future` to an ``asyncio.Future``.""" + af = asyncio.Future() + tornado.concurrent.chain_future(tornado_future, af) + return af + +if hasattr(convert_yielded, 'register'): + convert_yielded.register(asyncio.Future, to_tornado_future) diff --git a/libs/tornado/platform/caresresolver.py b/libs/tornado/platform/caresresolver.py index c4648c2..5559614 100755 --- a/libs/tornado/platform/caresresolver.py +++ b/libs/tornado/platform/caresresolver.py @@ -18,6 +18,9 @@ class CaresResolver(Resolver): so it is only recommended for use in ``AF_INET`` (i.e. IPv4). This is the default for ``tornado.simple_httpclient``, but other libraries may default to ``AF_UNSPEC``. + + .. versionchanged:: 4.1 + The ``io_loop`` argument is deprecated. """ def initialize(self, io_loop=None): self.io_loop = io_loop or IOLoop.current() diff --git a/libs/tornado/platform/twisted.py b/libs/tornado/platform/twisted.py index 27d991c..09b3283 100755 --- a/libs/tornado/platform/twisted.py +++ b/libs/tornado/platform/twisted.py @@ -70,8 +70,10 @@ import datetime import functools import numbers import socket +import sys import twisted.internet.abstract +from twisted.internet.defer import Deferred from twisted.internet.posixbase import PosixReactorBase from twisted.internet.interfaces import \ IReactorFDSet, IDelayedCall, IReactorTime, IReadDescriptor, IWriteDescriptor @@ -84,6 +86,7 @@ import twisted.names.resolve from zope.interface import implementer +from tornado.concurrent import Future from tornado.escape import utf8 from tornado import gen import tornado.ioloop @@ -147,6 +150,9 @@ class TornadoReactor(PosixReactorBase): We override `mainLoop` instead of `doIteration` and must implement timed call functionality on top of `IOLoop.add_timeout` rather than using the implementation in `PosixReactorBase`. + + .. versionchanged:: 4.1 + The ``io_loop`` argument is deprecated. """ def __init__(self, io_loop=None): if not io_loop: @@ -356,7 +362,11 @@ class _TestReactor(TornadoReactor): def install(io_loop=None): - """Install this package as the default Twisted reactor.""" + """Install this package as the default Twisted reactor. + + .. versionchanged:: 4.1 + The ``io_loop`` argument is deprecated. + """ if not io_loop: io_loop = tornado.ioloop.IOLoop.current() reactor = TornadoReactor(io_loop) @@ -512,6 +522,9 @@ class TwistedResolver(Resolver): ``socket.AF_UNSPEC``. Requires Twisted 12.1 or newer. + + .. versionchanged:: 4.1 + The ``io_loop`` argument is deprecated. """ def initialize(self, io_loop=None): self.io_loop = io_loop or IOLoop.current() @@ -554,3 +567,17 @@ class TwistedResolver(Resolver): (resolved_family, (resolved, port)), ] raise gen.Return(result) + +if hasattr(gen.convert_yielded, 'register'): + @gen.convert_yielded.register(Deferred) + def _(d): + f = Future() + def errback(failure): + try: + failure.raiseException() + # Should never happen, but just in case + raise Exception("errback called without error") + except: + f.set_exc_info(sys.exc_info()) + d.addCallbacks(f.set_result, errback) + return f diff --git a/libs/tornado/process.py b/libs/tornado/process.py index cea3dbd..3790ca0 100755 --- a/libs/tornado/process.py +++ b/libs/tornado/process.py @@ -191,6 +191,9 @@ class Subprocess(object): ``tornado.process.Subprocess.STREAM``, which will make the corresponding attribute of the resulting Subprocess a `.PipeIOStream`. * A new keyword argument ``io_loop`` may be used to pass in an IOLoop. + + .. versionchanged:: 4.1 + The ``io_loop`` argument is deprecated. """ STREAM = object() @@ -263,6 +266,9 @@ class Subprocess(object): Note that the `.IOLoop` used for signal handling need not be the same one used by individual Subprocess objects (as long as the ``IOLoops`` are each running in separate threads). + + .. versionchanged:: 4.1 + The ``io_loop`` argument is deprecated. """ if cls._initialized: return diff --git a/libs/tornado/simple_httpclient.py b/libs/tornado/simple_httpclient.py index 7c915e9..31d076e 100755 --- a/libs/tornado/simple_httpclient.py +++ b/libs/tornado/simple_httpclient.py @@ -345,7 +345,7 @@ class _HTTPConnection(httputil.HTTPMessageDelegate): decompress=self.request.decompress_response), self._sockaddr) start_line = httputil.RequestStartLine(self.request.method, - req_path, 'HTTP/1.1') + req_path, '') self.connection.write_headers(start_line, self.request.headers) if self.request.expect_100_continue: self._read_response() diff --git a/libs/tornado/tcpclient.py b/libs/tornado/tcpclient.py index 0abbea2..f594d91 100644 --- a/libs/tornado/tcpclient.py +++ b/libs/tornado/tcpclient.py @@ -111,6 +111,7 @@ class _Connector(object): if self.timeout is not None: # If the first attempt failed, don't wait for the # timeout to try an address from the secondary queue. + self.io_loop.remove_timeout(self.timeout) self.on_timeout() return self.clear_timeout() @@ -135,6 +136,9 @@ class _Connector(object): class TCPClient(object): """A non-blocking TCP connection factory. + + .. versionchanged:: 4.1 + The ``io_loop`` argument is deprecated. """ def __init__(self, resolver=None, io_loop=None): self.io_loop = io_loop or IOLoop.current() diff --git a/libs/tornado/tcpserver.py b/libs/tornado/tcpserver.py index 427acec..a02b36f 100755 --- a/libs/tornado/tcpserver.py +++ b/libs/tornado/tcpserver.py @@ -95,7 +95,7 @@ class TCPServer(object): self._pending_sockets = [] self._started = False self.max_buffer_size = max_buffer_size - self.read_chunk_size = None + self.read_chunk_size = read_chunk_size # Verify the SSL options. Otherwise we don't get errors until clients # connect. This doesn't verify that the keys are legitimate, but diff --git a/libs/tornado/testing.py b/libs/tornado/testing.py index 4511863..3d3bcf7 100755 --- a/libs/tornado/testing.py +++ b/libs/tornado/testing.py @@ -543,6 +543,9 @@ class LogTrapTestCase(unittest.TestCase): `logging.basicConfig` and the "pretty logging" configured by `tornado.options`. It is not compatible with other log buffering mechanisms, such as those provided by some test runners. + + .. deprecated:: 4.1 + Use the unittest module's ``--buffer`` option instead, or `.ExpectLog`. """ def run(self, result=None): logger = logging.getLogger() diff --git a/libs/tornado/web.py b/libs/tornado/web.py index 2d1dac0..52bfce3 100755 --- a/libs/tornado/web.py +++ b/libs/tornado/web.py @@ -268,6 +268,7 @@ class RequestHandler(object): if _has_stream_request_body(self.__class__): if not self.request.body.done(): self.request.body.set_exception(iostream.StreamClosedError()) + self.request.body.exception() def clear(self): """Resets all headers and content for this response.""" @@ -840,7 +841,7 @@ class RequestHandler(object): for cookie in self._new_cookie.values(): self.add_header("Set-Cookie", cookie.OutputString(None)) - start_line = httputil.ResponseStartLine(self.request.version, + start_line = httputil.ResponseStartLine('', self._status_code, self._reason) return self.request.connection.write_headers( @@ -1120,28 +1121,36 @@ class RequestHandler(object): """Convert a cookie string into a the tuple form returned by _get_raw_xsrf_token. """ - m = _signed_value_version_re.match(utf8(cookie)) - if m: - version = int(m.group(1)) - if version == 2: - _, mask, masked_token, timestamp = cookie.split("|") - mask = binascii.a2b_hex(utf8(mask)) - token = _websocket_mask( - mask, binascii.a2b_hex(utf8(masked_token))) - timestamp = int(timestamp) - return version, token, timestamp + + try: + m = _signed_value_version_re.match(utf8(cookie)) + + if m: + version = int(m.group(1)) + if version == 2: + _, mask, masked_token, timestamp = cookie.split("|") + + mask = binascii.a2b_hex(utf8(mask)) + token = _websocket_mask( + mask, binascii.a2b_hex(utf8(masked_token))) + timestamp = int(timestamp) + return version, token, timestamp + else: + # Treat unknown versions as not present instead of failing. + raise Exception("Unknown xsrf cookie version") else: - # Treat unknown versions as not present instead of failing. - return None, None, None - else: - version = 1 - try: - token = binascii.a2b_hex(utf8(cookie)) - except (binascii.Error, TypeError): - token = utf8(cookie) - # We don't have a usable timestamp in older versions. - timestamp = int(time.time()) - return (version, token, timestamp) + version = 1 + try: + token = binascii.a2b_hex(utf8(cookie)) + except (binascii.Error, TypeError): + token = utf8(cookie) + # We don't have a usable timestamp in older versions. + timestamp = int(time.time()) + return (version, token, timestamp) + except Exception: + # Catch exceptions and return nothing instead of failing. + gen_log.debug("Uncaught exception in _decode_xsrf_token", exc_info=True) + return None, None, None def check_xsrf_cookie(self): """Verifies that the ``_xsrf`` cookie matches the ``_xsrf`` argument. @@ -1771,9 +1780,9 @@ class Application(httputil.HTTPServerConnectionDelegate): except TypeError: pass - def start_request(self, connection): + def start_request(self, server_conn, request_conn): # Modern HTTPServer interface - return _RequestDispatcher(self, connection) + return _RequestDispatcher(self, request_conn) def __call__(self, request): # Legacy HTTPServer interface @@ -1915,8 +1924,10 @@ class _RequestDispatcher(httputil.HTTPMessageDelegate): # trapped in the Future it returns (which we are ignoring here). # However, that shouldn't happen because _execute has a blanket # except handler, and we cannot easily access the IOLoop here to - # call add_future. - self.handler._execute(transforms, *self.path_args, **self.path_kwargs) + # call add_future (because of the requirement to remain compatible + # with WSGI) + f = self.handler._execute(transforms, *self.path_args, **self.path_kwargs) + f.add_done_callback(lambda f: f.exception()) # If we are streaming the request body, then execute() is finished # when the handler has prepared to receive the body. If not, # it doesn't matter when execute() finishes (so we return None) @@ -2622,6 +2633,8 @@ class UIModule(object): UI modules often execute additional queries, and they can include additional CSS and JavaScript that will be included in the output page, which is automatically inserted on page render. + + Subclasses of UIModule must override the `render` method. """ def __init__(self, handler): self.handler = handler @@ -2634,31 +2647,43 @@ class UIModule(object): return self.handler.current_user def render(self, *args, **kwargs): - """Overridden in subclasses to return this module's output.""" + """Override in subclasses to return this module's output.""" raise NotImplementedError() def embedded_javascript(self): - """Returns a JavaScript string that will be embedded in the page.""" + """Override to return a JavaScript string to be embedded in the page.""" return None def javascript_files(self): - """Returns a list of JavaScript files required by this module.""" + """Override to return a list of JavaScript files needed by this module. + + If the return values are relative paths, they will be passed to + `RequestHandler.static_url`; otherwise they will be used as-is. + """ return None def embedded_css(self): - """Returns a CSS string that will be embedded in the page.""" + """Override to return a CSS string that will be embedded in the page.""" return None def css_files(self): - """Returns a list of CSS files required by this module.""" + """Override to returns a list of CSS files required by this module. + + If the return values are relative paths, they will be passed to + `RequestHandler.static_url`; otherwise they will be used as-is. + """ return None def html_head(self): - """Returns a CSS string that will be put in the element""" + """Override to return an HTML string that will be put in the + element. + """ return None def html_body(self): - """Returns an HTML string that will be put in the element""" + """Override to return an HTML string that will be put at the end of + the element. + """ return None def render_string(self, path, **kwargs): diff --git a/libs/tornado/websocket.py b/libs/tornado/websocket.py index 5c762ad..c009225 100755 --- a/libs/tornado/websocket.py +++ b/libs/tornado/websocket.py @@ -129,6 +129,7 @@ class WebSocketHandler(tornado.web.RequestHandler): self.close_code = None self.close_reason = None self.stream = None + self._on_close_called = False @tornado.web.asynchronous def get(self, *args, **kwargs): @@ -171,18 +172,16 @@ class WebSocketHandler(tornado.web.RequestHandler): self.stream = self.request.connection.detach() self.stream.set_close_callback(self.on_connection_close) - if self.request.headers.get("Sec-WebSocket-Version") in ("7", "8", "13"): - self.ws_connection = WebSocketProtocol13( - self, compression_options=self.get_compression_options()) + self.ws_connection = self.get_websocket_protocol() + if self.ws_connection: self.ws_connection.accept_connection() else: if not self.stream.closed(): self.stream.write(tornado.escape.utf8( "HTTP/1.1 426 Upgrade Required\r\n" - "Sec-WebSocket-Version: 8\r\n\r\n")) + "Sec-WebSocket-Version: 7, 8, 13\r\n\r\n")) self.stream.close() - def write_message(self, message, binary=False): """Sends the given message to the client of this Web Socket. @@ -350,6 +349,8 @@ class WebSocketHandler(tornado.web.RequestHandler): if self.ws_connection: self.ws_connection.on_connection_close() self.ws_connection = None + if not self._on_close_called: + self._on_close_called self.on_close() def send_error(self, *args, **kwargs): @@ -362,6 +363,13 @@ class WebSocketHandler(tornado.web.RequestHandler): # we can close the connection more gracefully. self.stream.close() + def get_websocket_protocol(self): + websocket_version = self.request.headers.get("Sec-WebSocket-Version") + if websocket_version in ("7", "8", "13"): + return WebSocketProtocol13( + self, compression_options=self.get_compression_options()) + + def _wrap_method(method): def _disallow_for_websocket(self, *args, **kwargs): if self.stream is None: @@ -852,12 +860,15 @@ class WebSocketClientConnection(simple_httpclient._HTTPConnection): This class should not be instantiated directly; use the `websocket_connect` function instead. """ - def __init__(self, io_loop, request, compression_options=None): + def __init__(self, io_loop, request, on_message_callback=None, + compression_options=None): self.compression_options = compression_options self.connect_future = TracebackFuture() + self.protocol = None self.read_future = None self.read_queue = collections.deque() self.key = base64.b64encode(os.urandom(16)) + self._on_message_callback = on_message_callback scheme, sep, rest = request.url.partition(':') scheme = {'ws': 'http', 'wss': 'https'}[scheme] @@ -919,9 +930,7 @@ class WebSocketClientConnection(simple_httpclient._HTTPConnection): start_line, headers) self.headers = headers - self.protocol = WebSocketProtocol13( - self, mask_outgoing=True, - compression_options=self.compression_options) + self.protocol = self.get_websocket_protocol() self.protocol._process_server_headers(self.key, self.headers) self.protocol._receive_frame() @@ -946,6 +955,9 @@ class WebSocketClientConnection(simple_httpclient._HTTPConnection): def read_message(self, callback=None): """Reads a message from the WebSocket server. + If on_message_callback was specified at WebSocket + initialization, this function will never return messages + Returns a future whose result is the message, or None if the connection is closed. If a callback argument is given it will be called with the future when it is @@ -962,7 +974,9 @@ class WebSocketClientConnection(simple_httpclient._HTTPConnection): return future def on_message(self, message): - if self.read_future is not None: + if self._on_message_callback: + self._on_message_callback(message) + elif self.read_future is not None: self.read_future.set_result(message) self.read_future = None else: @@ -971,9 +985,13 @@ class WebSocketClientConnection(simple_httpclient._HTTPConnection): def on_pong(self, data): pass + def get_websocket_protocol(self): + return WebSocketProtocol13(self, mask_outgoing=True, + compression_options=self.compression_options) + def websocket_connect(url, io_loop=None, callback=None, connect_timeout=None, - compression_options=None): + on_message_callback=None, compression_options=None): """Client-side websocket support. Takes a url and returns a Future whose result is a @@ -982,11 +1000,26 @@ def websocket_connect(url, io_loop=None, callback=None, connect_timeout=None, ``compression_options`` is interpreted in the same way as the return value of `.WebSocketHandler.get_compression_options`. + The connection supports two styles of operation. In the coroutine + style, the application typically calls + `~.WebSocketClientConnection.read_message` in a loop:: + + conn = yield websocket_connection(loop) + while True: + msg = yield conn.read_message() + if msg is None: break + # Do something with msg + + In the callback style, pass an ``on_message_callback`` to + ``websocket_connect``. In both styles, a message of ``None`` + indicates that the connection has been closed. + .. versionchanged:: 3.2 Also accepts ``HTTPRequest`` objects in place of urls. .. versionchanged:: 4.1 - Added ``compression_options``. + Added ``compression_options`` and ``on_message_callback``. + The ``io_loop`` argument is deprecated. """ if io_loop is None: io_loop = IOLoop.current() @@ -1000,7 +1033,9 @@ def websocket_connect(url, io_loop=None, callback=None, connect_timeout=None, request = httpclient.HTTPRequest(url, connect_timeout=connect_timeout) request = httpclient._RequestProxy( request, httpclient.HTTPRequest._DEFAULTS) - conn = WebSocketClientConnection(io_loop, request, compression_options) + conn = WebSocketClientConnection(io_loop, request, + on_message_callback=on_message_callback, + compression_options=compression_options) if callback is not None: io_loop.add_future(conn.connect_future, callback) return conn.connect_future