|
|
@ -43,8 +43,21 @@ be returned when they are all finished:: |
|
|
|
response3 = response_dict['response3'] |
|
|
|
response4 = response_dict['response4'] |
|
|
|
|
|
|
|
If the `~functools.singledispatch` library is available (standard in |
|
|
|
Python 3.4, available via the `singledispatch |
|
|
|
<https://pypi.python.org/pypi/singledispatch>`_ package on older |
|
|
|
versions), additional types of objects may be yielded. Tornado includes |
|
|
|
support for ``asyncio.Future`` and Twisted's ``Deferred`` class when |
|
|
|
``tornado.platform.asyncio`` and ``tornado.platform.twisted`` are imported. |
|
|
|
See the `convert_yielded` function to extend this mechanism. |
|
|
|
|
|
|
|
.. versionchanged:: 3.2 |
|
|
|
Dict support added. |
|
|
|
|
|
|
|
.. versionchanged:: 4.1 |
|
|
|
Support added for yielding ``asyncio`` Futures and Twisted Deferreds |
|
|
|
via ``singledispatch``. |
|
|
|
|
|
|
|
""" |
|
|
|
from __future__ import absolute_import, division, print_function, with_statement |
|
|
|
|
|
|
@ -53,11 +66,21 @@ import functools |
|
|
|
import itertools |
|
|
|
import sys |
|
|
|
import types |
|
|
|
import weakref |
|
|
|
|
|
|
|
from tornado.concurrent import Future, TracebackFuture, is_future, chain_future |
|
|
|
from tornado.ioloop import IOLoop |
|
|
|
from tornado.log import app_log |
|
|
|
from tornado import stack_context |
|
|
|
|
|
|
|
try: |
|
|
|
from functools import singledispatch # py34+ |
|
|
|
except ImportError as e: |
|
|
|
try: |
|
|
|
from singledispatch import singledispatch # backport |
|
|
|
except ImportError: |
|
|
|
singledispatch = None |
|
|
|
|
|
|
|
|
|
|
|
class KeyReuseError(Exception): |
|
|
|
pass |
|
|
@ -240,6 +263,106 @@ class Return(Exception): |
|
|
|
super(Return, self).__init__() |
|
|
|
self.value = value |
|
|
|
|
|
|
|
class WaitIterator(object): |
|
|
|
"""Provides an iterator to yield the results of futures as they finish. |
|
|
|
|
|
|
|
Yielding a set of futures like this: |
|
|
|
|
|
|
|
``results = yield [future1, future2]`` |
|
|
|
|
|
|
|
pauses the coroutine until both ``future1`` and ``future2`` |
|
|
|
return, and then restarts the coroutine with the results of both |
|
|
|
futures. If either future is an exception, the expression will |
|
|
|
raise that exception and all the results will be lost. |
|
|
|
|
|
|
|
If you need to get the result of each future as soon as possible, |
|
|
|
or if you need the result of some futures even if others produce |
|
|
|
errors, you can use ``WaitIterator``: |
|
|
|
|
|
|
|
:: |
|
|
|
|
|
|
|
wait_iterator = gen.WaitIterator(future1, future2) |
|
|
|
while not wait_iterator.done(): |
|
|
|
try: |
|
|
|
result = yield wait_iterator.next() |
|
|
|
except Exception as e: |
|
|
|
print "Error {} from {}".format(e, wait_iterator.current_future) |
|
|
|
else: |
|
|
|
print "Result {} recieved from {} at {}".format( |
|
|
|
result, wait_iterator.current_future, |
|
|
|
wait_iterator.current_index) |
|
|
|
|
|
|
|
Because results are returned as soon as they are available the |
|
|
|
output from the iterator *will not be in the same order as the |
|
|
|
input arguments*. If you need to know which future produced the |
|
|
|
current result, you can use the attributes |
|
|
|
``WaitIterator.current_future``, or ``WaitIterator.current_index`` |
|
|
|
to get the index of the future from the input list. (if keyword |
|
|
|
arguments were used in the construction of the `WaitIterator`, |
|
|
|
``current_index`` will use the corresponding keyword). |
|
|
|
|
|
|
|
.. versionadded:: 4.1 |
|
|
|
""" |
|
|
|
def __init__(self, *args, **kwargs): |
|
|
|
if args and kwargs: |
|
|
|
raise ValueError( |
|
|
|
"You must provide args or kwargs, not both") |
|
|
|
|
|
|
|
if kwargs: |
|
|
|
self._unfinished = dict((f, k) for (k, f) in kwargs.items()) |
|
|
|
futures = list(kwargs.values()) |
|
|
|
else: |
|
|
|
self._unfinished = dict((f, i) for (i, f) in enumerate(args)) |
|
|
|
futures = args |
|
|
|
|
|
|
|
self._finished = collections.deque() |
|
|
|
self.current_index = self.current_future = None |
|
|
|
self._running_future = None |
|
|
|
|
|
|
|
self_ref = weakref.ref(self) |
|
|
|
for future in futures: |
|
|
|
future.add_done_callback(functools.partial( |
|
|
|
self._done_callback, self_ref)) |
|
|
|
|
|
|
|
def done(self): |
|
|
|
"""Returns True if this iterator has no more results.""" |
|
|
|
if self._finished or self._unfinished: |
|
|
|
return False |
|
|
|
# Clear the 'current' values when iteration is done. |
|
|
|
self.current_index = self.current_future = None |
|
|
|
return True |
|
|
|
|
|
|
|
def next(self): |
|
|
|
"""Returns a `.Future` that will yield the next available result. |
|
|
|
|
|
|
|
Note that this `.Future` will not be the same object as any of |
|
|
|
the inputs. |
|
|
|
""" |
|
|
|
self._running_future = TracebackFuture() |
|
|
|
|
|
|
|
if self._finished: |
|
|
|
self._return_result(self._finished.popleft()) |
|
|
|
|
|
|
|
return self._running_future |
|
|
|
|
|
|
|
@staticmethod |
|
|
|
def _done_callback(self_ref, done): |
|
|
|
self = self_ref() |
|
|
|
if self is not None: |
|
|
|
if self._running_future and not self._running_future.done(): |
|
|
|
self._return_result(done) |
|
|
|
else: |
|
|
|
self._finished.append(done) |
|
|
|
|
|
|
|
def _return_result(self, done): |
|
|
|
"""Called set the returned future's state that of the future |
|
|
|
we yielded, and set the current future for the iterator. |
|
|
|
""" |
|
|
|
chain_future(done, self._running_future) |
|
|
|
|
|
|
|
self.current_future = done |
|
|
|
self.current_index = self._unfinished.pop(done) |
|
|
|
|
|
|
|
|
|
|
|
class YieldPoint(object): |
|
|
|
"""Base class for objects that may be yielded from the generator. |
|
|
@ -371,6 +494,11 @@ def Task(func, *args, **kwargs): |
|
|
|
|
|
|
|
class YieldFuture(YieldPoint): |
|
|
|
def __init__(self, future, io_loop=None): |
|
|
|
"""Adapts a `.Future` to the `YieldPoint` interface. |
|
|
|
|
|
|
|
.. versionchanged:: 4.1 |
|
|
|
The ``io_loop`` argument is deprecated. |
|
|
|
""" |
|
|
|
self.future = future |
|
|
|
self.io_loop = io_loop or IOLoop.current() |
|
|
|
|
|
|
@ -504,7 +632,7 @@ def maybe_future(x): |
|
|
|
return fut |
|
|
|
|
|
|
|
|
|
|
|
def with_timeout(timeout, future, io_loop=None): |
|
|
|
def with_timeout(timeout, future, io_loop=None, quiet_exceptions=()): |
|
|
|
"""Wraps a `.Future` in a timeout. |
|
|
|
|
|
|
|
Raises `TimeoutError` if the input future does not complete before |
|
|
@ -512,9 +640,17 @@ def with_timeout(timeout, future, io_loop=None): |
|
|
|
`.IOLoop.add_timeout` (i.e. a `datetime.timedelta` or an absolute time |
|
|
|
relative to `.IOLoop.time`) |
|
|
|
|
|
|
|
If the wrapped `.Future` fails after it has timed out, the exception |
|
|
|
will be logged unless it is of a type contained in ``quiet_exceptions`` |
|
|
|
(which may be an exception type or a sequence of types). |
|
|
|
|
|
|
|
Currently only supports Futures, not other `YieldPoint` classes. |
|
|
|
|
|
|
|
.. versionadded:: 4.0 |
|
|
|
|
|
|
|
.. versionchanged:: 4.1 |
|
|
|
Added the ``quiet_exceptions`` argument and the logging of unhandled |
|
|
|
exceptions. |
|
|
|
""" |
|
|
|
# TODO: allow yield points in addition to futures? |
|
|
|
# Tricky to do with stack_context semantics. |
|
|
@ -528,9 +664,19 @@ def with_timeout(timeout, future, io_loop=None): |
|
|
|
chain_future(future, result) |
|
|
|
if io_loop is None: |
|
|
|
io_loop = IOLoop.current() |
|
|
|
def error_callback(future): |
|
|
|
try: |
|
|
|
future.result() |
|
|
|
except Exception as e: |
|
|
|
if not isinstance(e, quiet_exceptions): |
|
|
|
app_log.error("Exception in Future %r after timeout", |
|
|
|
future, exc_info=True) |
|
|
|
def timeout_callback(): |
|
|
|
result.set_exception(TimeoutError("Timeout")) |
|
|
|
# In case the wrapped future goes on to fail, log it. |
|
|
|
future.add_done_callback(error_callback) |
|
|
|
timeout_handle = io_loop.add_timeout( |
|
|
|
timeout, |
|
|
|
lambda: result.set_exception(TimeoutError("Timeout"))) |
|
|
|
timeout, timeout_callback) |
|
|
|
if isinstance(future, Future): |
|
|
|
# We know this future will resolve on the IOLoop, so we don't |
|
|
|
# need the extra thread-safety of IOLoop.add_future (and we also |
|
|
@ -545,6 +691,25 @@ def with_timeout(timeout, future, io_loop=None): |
|
|
|
return result |
|
|
|
|
|
|
|
|
|
|
|
def sleep(duration): |
|
|
|
"""Return a `.Future` that resolves after the given number of seconds. |
|
|
|
|
|
|
|
When used with ``yield`` in a coroutine, this is a non-blocking |
|
|
|
analogue to `time.sleep` (which should not be used in coroutines |
|
|
|
because it is blocking):: |
|
|
|
|
|
|
|
yield gen.sleep(0.5) |
|
|
|
|
|
|
|
Note that calling this function on its own does nothing; you must |
|
|
|
wait on the `.Future` it returns (usually by yielding it). |
|
|
|
|
|
|
|
.. versionadded:: 4.1 |
|
|
|
""" |
|
|
|
f = Future() |
|
|
|
IOLoop.current().call_later(duration, lambda: f.set_result(None)) |
|
|
|
return f |
|
|
|
|
|
|
|
|
|
|
|
_null_future = Future() |
|
|
|
_null_future.set_result(None) |
|
|
|
|
|
|
@ -678,18 +843,18 @@ class Runner(object): |
|
|
|
self.running = False |
|
|
|
|
|
|
|
def handle_yield(self, yielded): |
|
|
|
if isinstance(yielded, list): |
|
|
|
if all(is_future(f) for f in yielded): |
|
|
|
yielded = multi_future(yielded) |
|
|
|
else: |
|
|
|
yielded = Multi(yielded) |
|
|
|
elif isinstance(yielded, dict): |
|
|
|
if all(is_future(f) for f in yielded.values()): |
|
|
|
yielded = multi_future(yielded) |
|
|
|
else: |
|
|
|
yielded = Multi(yielded) |
|
|
|
# Lists containing YieldPoints require stack contexts; |
|
|
|
# other lists are handled via multi_future in convert_yielded. |
|
|
|
if (isinstance(yielded, list) and |
|
|
|
any(isinstance(f, YieldPoint) for f in yielded)): |
|
|
|
yielded = Multi(yielded) |
|
|
|
elif (isinstance(yielded, dict) and |
|
|
|
any(isinstance(f, YieldPoint) for f in yielded.values())): |
|
|
|
yielded = Multi(yielded) |
|
|
|
|
|
|
|
if isinstance(yielded, YieldPoint): |
|
|
|
# YieldPoints are too closely coupled to the Runner to go |
|
|
|
# through the generic convert_yielded mechanism. |
|
|
|
self.future = TracebackFuture() |
|
|
|
def start_yield_point(): |
|
|
|
try: |
|
|
@ -702,6 +867,7 @@ class Runner(object): |
|
|
|
except Exception: |
|
|
|
self.future = TracebackFuture() |
|
|
|
self.future.set_exc_info(sys.exc_info()) |
|
|
|
|
|
|
|
if self.stack_context_deactivate is None: |
|
|
|
# Start a stack context if this is the first |
|
|
|
# YieldPoint we've seen. |
|
|
@ -715,16 +881,17 @@ class Runner(object): |
|
|
|
return False |
|
|
|
else: |
|
|
|
start_yield_point() |
|
|
|
elif is_future(yielded): |
|
|
|
self.future = yielded |
|
|
|
if not self.future.done() or self.future is moment: |
|
|
|
self.io_loop.add_future( |
|
|
|
self.future, lambda f: self.run()) |
|
|
|
return False |
|
|
|
else: |
|
|
|
self.future = TracebackFuture() |
|
|
|
self.future.set_exception(BadYieldError( |
|
|
|
"yielded unknown object %r" % (yielded,))) |
|
|
|
try: |
|
|
|
self.future = convert_yielded(yielded) |
|
|
|
except BadYieldError: |
|
|
|
self.future = TracebackFuture() |
|
|
|
self.future.set_exc_info(sys.exc_info()) |
|
|
|
|
|
|
|
if not self.future.done() or self.future is moment: |
|
|
|
self.io_loop.add_future( |
|
|
|
self.future, lambda f: self.run()) |
|
|
|
return False |
|
|
|
return True |
|
|
|
|
|
|
|
def result_callback(self, key): |
|
|
@ -763,3 +930,30 @@ def _argument_adapter(callback): |
|
|
|
else: |
|
|
|
callback(None) |
|
|
|
return wrapper |
|
|
|
|
|
|
|
|
|
|
|
def convert_yielded(yielded): |
|
|
|
"""Convert a yielded object into a `.Future`. |
|
|
|
|
|
|
|
The default implementation accepts lists, dictionaries, and Futures. |
|
|
|
|
|
|
|
If the `~functools.singledispatch` library is available, this function |
|
|
|
may be extended to support additional types. For example:: |
|
|
|
|
|
|
|
@convert_yielded.register(asyncio.Future) |
|
|
|
def _(asyncio_future): |
|
|
|
return tornado.platform.asyncio.to_tornado_future(asyncio_future) |
|
|
|
|
|
|
|
.. versionadded:: 4.1 |
|
|
|
""" |
|
|
|
# Lists and dicts containing YieldPoints were handled separately |
|
|
|
# via Multi(). |
|
|
|
if isinstance(yielded, (list, dict)): |
|
|
|
return multi_future(yielded) |
|
|
|
elif is_future(yielded): |
|
|
|
return yielded |
|
|
|
else: |
|
|
|
raise BadYieldError("yielded unknown object %r" % (yielded,)) |
|
|
|
|
|
|
|
if singledispatch is not None: |
|
|
|
convert_yielded = singledispatch(convert_yielded) |
|
|
|