35 changed files with 3591 additions and 4891 deletions
@@ -0,0 +1,624 @@
#!/usr/bin/env python
#
# Copyright 2014 Facebook
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""Client and server implementations of HTTP/1.x.

.. versionadded:: 3.3
"""

from __future__ import absolute_import, division, print_function, with_statement

from tornado.concurrent import Future
from tornado.escape import native_str, utf8
from tornado import gen
from tornado import httputil
from tornado import iostream
from tornado.log import gen_log, app_log
from tornado import stack_context
from tornado.util import GzipDecompressor


class HTTP1ConnectionParameters(object):
    """Parameters for `.HTTP1Connection` and `.HTTP1ServerConnection`.
    """
    def __init__(self, no_keep_alive=False, chunk_size=None,
                 max_header_size=None, header_timeout=None, max_body_size=None,
                 body_timeout=None, use_gzip=False):
        """
        :arg bool no_keep_alive: If true, always close the connection after
            one request.
        :arg int chunk_size: how much data to read into memory at once
        :arg int max_header_size: maximum amount of data for HTTP headers
        :arg float header_timeout: how long to wait for all headers (seconds)
        :arg int max_body_size: maximum amount of data for body
        :arg float body_timeout: how long to wait while reading body (seconds)
        :arg bool use_gzip: if true, decode incoming ``Content-Encoding: gzip``
        """
        self.no_keep_alive = no_keep_alive
        self.chunk_size = chunk_size or 65536
        self.max_header_size = max_header_size or 65536
        self.header_timeout = header_timeout
        self.max_body_size = max_body_size
        self.body_timeout = body_timeout
        self.use_gzip = use_gzip
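
# Example (a minimal usage sketch; the values below are illustrative, not
# defaults):
#
#     params = HTTP1ConnectionParameters(
#         no_keep_alive=True,         # close after one request
#         header_timeout=10.0,        # seconds to wait for the header block
#         max_body_size=1024 * 1024,  # reject bodies over 1 MB
#         use_gzip=True)              # decode Content-Encoding: gzip bodies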


class HTTP1Connection(httputil.HTTPConnection):
    """Implements the HTTP/1.x protocol.

    This class can be used on its own for clients, or via
    `HTTP1ServerConnection` for servers.
    """
    def __init__(self, stream, is_client, params=None, context=None):
        """
        :arg stream: an `.IOStream`
        :arg bool is_client: client or server
        :arg params: a `.HTTP1ConnectionParameters` instance or ``None``
        :arg context: an opaque application-defined object that can be accessed
            as ``connection.context``.
        """
        self.is_client = is_client
        self.stream = stream
        if params is None:
            params = HTTP1ConnectionParameters()
        self.params = params
        self.context = context
        self.no_keep_alive = params.no_keep_alive
        # The body limits can be altered by the delegate, so save them
        # here instead of just referencing self.params later.
        self._max_body_size = (self.params.max_body_size or
                               self.stream.max_buffer_size)
        self._body_timeout = self.params.body_timeout
        # _write_finished is set to True when finish() has been called,
        # i.e. there will be no more data sent. Data may still be in the
        # stream's write buffer.
        self._write_finished = False
        # True when we have read the entire incoming body.
        self._read_finished = False
        # _finish_future resolves when all data has been written and flushed
        # to the IOStream.
        self._finish_future = Future()
        # If true, the connection should be closed after this request
        # (after the response has been written on the server side,
        # and after it has been read on the client)
        self._disconnect_on_finish = False
        self._clear_callbacks()
        # Save the start lines after we read or write them; they
        # affect later processing (e.g. 304 responses and HEAD methods
        # have content-length but no bodies)
        self._request_start_line = None
        self._response_start_line = None
        self._request_headers = None
        # True if we are writing output with chunked encoding.
        self._chunking_output = None
        # While reading a body with a content-length, this is the
        # amount left to read.
        self._expected_content_remaining = None
        # A Future for our outgoing writes, returned by IOStream.write.
        self._pending_write = None

    def read_response(self, delegate):
        """Read a single HTTP response.

        Typical client-mode usage is to write a request using `write_headers`,
        `write`, and `finish`, and then call ``read_response``.

        :arg delegate: a `.HTTPMessageDelegate`

        Returns a `.Future` that resolves, after the full response has been
        read, to ``True`` if the connection is still usable and ``False``
        otherwise.
        """
        if self.params.use_gzip:
            delegate = _GzipMessageDelegate(delegate, self.params.chunk_size)
        return self._read_message(delegate)
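
    # Example (a minimal client-mode sketch; assumes a connected IOStream and
    # an application-defined HTTPMessageDelegate subclass, neither of which is
    # provided here):
    #
    #     conn = HTTP1Connection(stream, is_client=True)
    #     conn.write_headers(
    #         httputil.RequestStartLine("GET", "/", "HTTP/1.1"),
    #         httputil.HTTPHeaders({"Host": "example.com"}))
    #     conn.finish()  # no body for a GET request
    #     yield conn.read_response(MyDelegate())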

    @gen.coroutine
    def _read_message(self, delegate):
        need_delegate_close = False
        try:
            header_future = self.stream.read_until_regex(
                b"\r?\n\r?\n",
                max_bytes=self.params.max_header_size)
            if self.params.header_timeout is None:
                header_data = yield header_future
            else:
                try:
                    header_data = yield gen.with_timeout(
                        self.stream.io_loop.time() + self.params.header_timeout,
                        header_future,
                        io_loop=self.stream.io_loop)
                except gen.TimeoutError:
                    self.close()
                    raise gen.Return(False)
            start_line, headers = self._parse_headers(header_data)
            if self.is_client:
                start_line = httputil.parse_response_start_line(start_line)
                self._response_start_line = start_line
            else:
                start_line = httputil.parse_request_start_line(start_line)
                self._request_start_line = start_line
                self._request_headers = headers

            self._disconnect_on_finish = not self._can_keep_alive(
                start_line, headers)
            need_delegate_close = True
            header_future = delegate.headers_received(start_line, headers)
            if header_future is not None:
                yield header_future
            if self.stream is None:
                # We've been detached.
                need_delegate_close = False
                raise gen.Return(False)
            skip_body = False
            if self.is_client:
                if (self._request_start_line is not None and
                        self._request_start_line.method == 'HEAD'):
                    skip_body = True
                code = start_line.code
                if code == 304:
                    skip_body = True
                if 100 <= code < 200:
                    # TODO: client delegates will get headers_received twice
                    # in the case of a 100-continue. Document or change?
                    yield self._read_message(delegate)
            else:
                if (headers.get("Expect") == "100-continue" and
                        not self._write_finished):
                    self.stream.write(b"HTTP/1.1 100 (Continue)\r\n\r\n")
            if not skip_body:
                body_future = self._read_body(headers, delegate)
                if body_future is not None:
                    if self._body_timeout is None:
                        yield body_future
                    else:
                        try:
                            yield gen.with_timeout(
                                self.stream.io_loop.time() + self._body_timeout,
                                body_future, self.stream.io_loop)
                        except gen.TimeoutError:
                            gen_log.info("Timeout reading body from %s",
                                         self.context)
                            self.stream.close()
                            raise gen.Return(False)
            self._read_finished = True
            if not self._write_finished or self.is_client:
                need_delegate_close = False
                delegate.finish()
            # If we're waiting for the application to produce an asynchronous
            # response, and we're not detached, register a close callback
            # on the stream (we didn't need one while we were reading)
            if (not self._finish_future.done() and
                    self.stream is not None and
                    not self.stream.closed()):
                self.stream.set_close_callback(self._on_connection_close)
                yield self._finish_future
            if self.is_client and self._disconnect_on_finish:
                self.close()
            if self.stream is None:
                raise gen.Return(False)
        except httputil.HTTPInputException as e:
            gen_log.info("Malformed HTTP message from %s: %s",
                         self.context, e)
            self.close()
            raise gen.Return(False)
        finally:
            if need_delegate_close:
                delegate.on_connection_close()
            self._clear_callbacks()
        raise gen.Return(True)

    def _clear_callbacks(self):
        """Clears the callback attributes.

        This allows the request handler to be garbage collected more
        quickly in CPython by breaking up reference cycles.
        """
        self._write_callback = None
        self._write_future = None
        self._close_callback = None
        if self.stream is not None:
            self.stream.set_close_callback(None)

    def set_close_callback(self, callback):
        """Sets a callback that will be run when the connection is closed.

        .. deprecated:: 3.3
            Use `.HTTPMessageDelegate.on_connection_close` instead.
        """
        self._close_callback = stack_context.wrap(callback)

    def _on_connection_close(self):
        # Note that this callback is only registered on the IOStream
        # when we have finished reading the request and are waiting for
        # the application to produce its response.
        if self._close_callback is not None:
            callback = self._close_callback
            self._close_callback = None
            callback()
        if not self._finish_future.done():
            self._finish_future.set_result(None)
        self._clear_callbacks()

    def close(self):
        if self.stream is not None:
            self.stream.close()
        self._clear_callbacks()
        if not self._finish_future.done():
            self._finish_future.set_result(None)

    def detach(self):
        """Take control of the underlying stream.

        Returns the underlying `.IOStream` object and stops all further
        HTTP processing. May only be called during
        `.HTTPMessageDelegate.headers_received`. Intended for implementing
        protocols like websockets that tunnel over an HTTP handshake.
        """
        self._clear_callbacks()
        stream = self.stream
        self.stream = None
        return stream
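
    # Example (a minimal protocol-upgrade sketch; assumes the delegate stored
    # a reference to its HTTP1Connection as ``self.connection``, which this
    # module does not do for you):
    #
    #     class UpgradeDelegate(httputil.HTTPMessageDelegate):
    #         def headers_received(self, start_line, headers):
    #             # detach() may only be called from headers_received.
    #             stream = self.connection.detach()
    #             # From here on, speak the tunneled protocol (e.g. websocket
    #             # framing) directly on the raw IOStream.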

    def set_body_timeout(self, timeout):
        """Sets the body timeout for a single request.

        Overrides the value from `.HTTP1ConnectionParameters`.
        """
        self._body_timeout = timeout

    def set_max_body_size(self, max_body_size):
        """Sets the body size limit for a single request.

        Overrides the value from `.HTTP1ConnectionParameters`.
        """
        self._max_body_size = max_body_size

    def write_headers(self, start_line, headers, chunk=None, callback=None):
        """Implements `.HTTPConnection.write_headers`."""
        if self.is_client:
            self._request_start_line = start_line
            # Client requests with a non-empty body must have either a
            # Content-Length or a Transfer-Encoding.
            self._chunking_output = (
                start_line.method in ('POST', 'PUT', 'PATCH') and
                'Content-Length' not in headers and
                'Transfer-Encoding' not in headers)
        else:
            self._response_start_line = start_line
            self._chunking_output = (
                # TODO: should this use
                # self._request_start_line.version or
                # start_line.version?
                self._request_start_line.version == 'HTTP/1.1' and
                # 304 responses have no body (not even a zero-length body), and
                # so should not have either Content-Length or Transfer-Encoding
                # headers.
                start_line.code != 304 and
                # No need to chunk the output if a Content-Length is specified.
                'Content-Length' not in headers and
                # Applications are discouraged from touching Transfer-Encoding,
                # but if they do, leave it alone.
                'Transfer-Encoding' not in headers)
            # If a 1.0 client asked for keep-alive, add the header.
            if (self._request_start_line.version == 'HTTP/1.0' and
                    (self._request_headers.get('Connection', '').lower()
                     == 'keep-alive')):
                headers['Connection'] = 'Keep-Alive'
        if self._chunking_output:
            headers['Transfer-Encoding'] = 'chunked'
        if (not self.is_client and
                (self._request_start_line.method == 'HEAD' or
                 start_line.code == 304)):
            self._expected_content_remaining = 0
        elif 'Content-Length' in headers:
            self._expected_content_remaining = int(headers['Content-Length'])
        else:
            self._expected_content_remaining = None
        lines = [utf8("%s %s %s" % start_line)]
        lines.extend([utf8(n) + b": " + utf8(v) for n, v in headers.get_all()])
        for line in lines:
            if b'\n' in line:
                raise ValueError('Newline in header: ' + repr(line))
        future = None
        if self.stream.closed():
            future = self._write_future = Future()
            future.set_exception(iostream.StreamClosedError())
        else:
            if callback is not None:
                self._write_callback = stack_context.wrap(callback)
            else:
                future = self._write_future = Future()
            data = b"\r\n".join(lines) + b"\r\n\r\n"
            if chunk:
                data += self._format_chunk(chunk)
            self._pending_write = self.stream.write(data)
            self._pending_write.add_done_callback(self._on_write_complete)
        return future

    def _format_chunk(self, chunk):
        if self._expected_content_remaining is not None:
            self._expected_content_remaining -= len(chunk)
            if self._expected_content_remaining < 0:
                # Close the stream now to stop further framing errors.
                self.stream.close()
                raise httputil.HTTPOutputException(
                    "Tried to write more data than Content-Length")
        if self._chunking_output and chunk:
            # Don't write out empty chunks because that means END-OF-STREAM
            # with chunked encoding
            return utf8("%x" % len(chunk)) + b"\r\n" + chunk + b"\r\n"
        else:
            return chunk
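
    # Worked example of the chunked framing above (values follow directly
    # from the "%x\r\n<data>\r\n" format; a sketch, not a doctest):
    #
    #     _format_chunk(b"hello")  -> b"5\r\nhello\r\n"                 # 5 bytes
    #     _format_chunk(b"x" * 26) -> b"1a\r\n" + b"x" * 26 + b"\r\n"   # 0x1a == 26
    #
    # The zero-length terminator (b"0\r\n\r\n") is written by finish(), never
    # here; _format_chunk skips empty chunks for exactly that reason.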

    def write(self, chunk, callback=None):
        """Implements `.HTTPConnection.write`.

        For backwards compatibility it is allowed but deprecated to
        skip `write_headers` and instead call `write()` with a
        pre-encoded header block.
        """
        future = None
        if self.stream.closed():
            future = self._write_future = Future()
            self._write_future.set_exception(iostream.StreamClosedError())
        else:
            if callback is not None:
                self._write_callback = stack_context.wrap(callback)
            else:
                future = self._write_future = Future()
            self._pending_write = self.stream.write(self._format_chunk(chunk))
            self._pending_write.add_done_callback(self._on_write_complete)
        return future

    def finish(self):
        """Implements `.HTTPConnection.finish`."""
        if (self._expected_content_remaining is not None and
                self._expected_content_remaining != 0 and
                not self.stream.closed()):
            self.stream.close()
            raise httputil.HTTPOutputException(
                "Tried to write %d bytes less than Content-Length" %
                self._expected_content_remaining)
        if self._chunking_output:
            if not self.stream.closed():
                self._pending_write = self.stream.write(b"0\r\n\r\n")
                self._pending_write.add_done_callback(self._on_write_complete)
        self._write_finished = True
        # If the app finished the request while we're still reading,
        # divert any remaining data away from the delegate and
        # close the connection when we're done sending our response.
        # Closing the connection is the only way to avoid reading the
        # whole input body.
        if not self._read_finished:
            self._disconnect_on_finish = True
        # No more data is coming, so instruct TCP to send any remaining
        # data immediately instead of waiting for a full packet or ack.
        self.stream.set_nodelay(True)
        if self._pending_write is None:
            self._finish_request(None)
        else:
            self._pending_write.add_done_callback(self._finish_request)

    def _on_write_complete(self, future):
        if self._write_callback is not None:
            callback = self._write_callback
            self._write_callback = None
            self.stream.io_loop.add_callback(callback)
        if self._write_future is not None:
            future = self._write_future
            self._write_future = None
            future.set_result(None)

    def _can_keep_alive(self, start_line, headers):
        if self.params.no_keep_alive:
            return False
        connection_header = headers.get("Connection")
        if connection_header is not None:
            connection_header = connection_header.lower()
        if start_line.version == "HTTP/1.1":
            return connection_header != "close"
        elif ("Content-Length" in headers
              or start_line.method in ("HEAD", "GET")):
            return connection_header == "keep-alive"
        return False

    def _finish_request(self, future):
        self._clear_callbacks()
        if not self.is_client and self._disconnect_on_finish:
            self.close()
            return
        # Turn Nagle's algorithm back on, leaving the stream in its
        # default state for the next request.
        self.stream.set_nodelay(False)
        if not self._finish_future.done():
            self._finish_future.set_result(None)

    def _parse_headers(self, data):
        data = native_str(data.decode('latin1'))
        eol = data.find("\r\n")
        start_line = data[:eol]
        try:
            headers = httputil.HTTPHeaders.parse(data[eol:])
        except ValueError:
            # probably from split() if there was no ':' in the line
            raise httputil.HTTPInputException("Malformed HTTP headers: %r" %
                                              data[eol:100])
        return start_line, headers

    def _read_body(self, headers, delegate):
        content_length = headers.get("Content-Length")
        if content_length:
            content_length = int(content_length)
            if content_length > self._max_body_size:
                raise httputil.HTTPInputException("Content-Length too long")
            return self._read_fixed_body(content_length, delegate)
        if headers.get("Transfer-Encoding") == "chunked":
            return self._read_chunked_body(delegate)
        if self.is_client:
            return self._read_body_until_close(delegate)
        return None

    @gen.coroutine
    def _read_fixed_body(self, content_length, delegate):
        while content_length > 0:
            body = yield self.stream.read_bytes(
                min(self.params.chunk_size, content_length), partial=True)
            content_length -= len(body)
            if not self._write_finished or self.is_client:
                yield gen.maybe_future(delegate.data_received(body))

    @gen.coroutine
    def _read_chunked_body(self, delegate):
        # TODO: "chunk extensions" http://tools.ietf.org/html/rfc2616#section-3.6.1
        total_size = 0
        while True:
            chunk_len = yield self.stream.read_until(b"\r\n", max_bytes=64)
            chunk_len = int(chunk_len.strip(), 16)
            if chunk_len == 0:
                return
            total_size += chunk_len
            if total_size > self._max_body_size:
                raise httputil.HTTPInputException("chunked body too large")
            bytes_to_read = chunk_len
            while bytes_to_read:
                chunk = yield self.stream.read_bytes(
                    min(bytes_to_read, self.params.chunk_size), partial=True)
                bytes_to_read -= len(chunk)
                if not self._write_finished or self.is_client:
                    yield gen.maybe_future(
                        delegate.data_received(chunk))
            # chunk ends with \r\n
            crlf = yield self.stream.read_bytes(2)
            assert crlf == b"\r\n"

    @gen.coroutine
    def _read_body_until_close(self, delegate):
        body = yield self.stream.read_until_close()
        if not self._write_finished or self.is_client:
            delegate.data_received(body)


class _GzipMessageDelegate(httputil.HTTPMessageDelegate):
    """Wraps an `HTTPMessageDelegate` to decode ``Content-Encoding: gzip``.
    """
    def __init__(self, delegate, chunk_size):
        self._delegate = delegate
        self._chunk_size = chunk_size
        self._decompressor = None

    def headers_received(self, start_line, headers):
        if headers.get("Content-Encoding") == "gzip":
            self._decompressor = GzipDecompressor()
            # Downstream delegates will only see uncompressed data,
            # so rename the content-encoding header.
            # (but note that curl_httpclient doesn't do this).
            headers.add("X-Consumed-Content-Encoding",
                        headers["Content-Encoding"])
            del headers["Content-Encoding"]
        return self._delegate.headers_received(start_line, headers)

    @gen.coroutine
    def data_received(self, chunk):
        if self._decompressor:
            compressed_data = chunk
            while compressed_data:
                decompressed = self._decompressor.decompress(
                    compressed_data, self._chunk_size)
                if decompressed:
                    yield gen.maybe_future(
                        self._delegate.data_received(decompressed))
                compressed_data = self._decompressor.unconsumed_tail
        else:
            yield gen.maybe_future(self._delegate.data_received(chunk))

    def finish(self):
        if self._decompressor is not None:
            tail = self._decompressor.flush()
            if tail:
                # I believe the tail will always be empty (i.e.
                # decompress will return all it can). The purpose
                # of the flush call is to detect errors such
                # as truncated input. But in case it ever returns
                # anything, treat it as an extra chunk
                self._delegate.data_received(tail)
        return self._delegate.finish()


class HTTP1ServerConnection(object):
    """An HTTP/1.x server."""
    def __init__(self, stream, params=None, context=None):
        """
        :arg stream: an `.IOStream`
        :arg params: a `.HTTP1ConnectionParameters` or None
        :arg context: an opaque application-defined object that is accessible
            as ``connection.context``
        """
        self.stream = stream
        if params is None:
            params = HTTP1ConnectionParameters()
        self.params = params
        self.context = context
        self._serving_future = None

    @gen.coroutine
    def close(self):
        """Closes the connection.

        Returns a `.Future` that resolves after the serving loop has exited.
        """
        self.stream.close()
        # Block until the serving loop is done, but ignore any exceptions
        # (start_serving is already responsible for logging them).
        try:
            yield self._serving_future
        except Exception:
            pass

    def start_serving(self, delegate):
        """Starts serving requests on this connection.

        :arg delegate: a `.HTTPServerConnectionDelegate`
        """
        assert isinstance(delegate, httputil.HTTPServerConnectionDelegate)
        self._serving_future = self._server_request_loop(delegate)
        # Register the future on the IOLoop so its errors get logged.
        self.stream.io_loop.add_future(self._serving_future,
                                       lambda f: f.result())

    @gen.coroutine
    def _server_request_loop(self, delegate):
        try:
            while True:
                conn = HTTP1Connection(self.stream, False,
                                       self.params, self.context)
                request_delegate = delegate.start_request(self, conn)
                try:
                    ret = yield conn.read_response(request_delegate)
                except (iostream.StreamClosedError,
                        iostream.UnsatisfiableReadError):
                    return
                except Exception:
                    # TODO: this is probably too broad; it would be better to
                    # wrap all delegate calls in something that writes to app_log,
                    # and then errors that reach this point can be gen_log.
                    app_log.error("Uncaught exception", exc_info=True)
                    conn.close()
                    return
                if not ret:
                    return
                yield gen.moment
        finally:
            delegate.on_close(self)
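
# Example (a minimal server-side sketch; assumes an accepted IOStream and an
# application-defined HTTPServerConnectionDelegate. Within Tornado itself,
# tornado.httpserver.HTTPServer is the consumer of this class):
#
#     server_conn = HTTP1ServerConnection(
#         stream, HTTP1ConnectionParameters(no_keep_alive=True))
#     server_conn.start_serving(my_delegate)
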
@@ -0,0 +1,179 @@
#!/usr/bin/env python
#
# Copyright 2014 Facebook
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""A non-blocking TCP connection factory.
"""
from __future__ import absolute_import, division, print_function, with_statement

import functools
import socket

from tornado.concurrent import Future
from tornado.ioloop import IOLoop
from tornado.iostream import IOStream
from tornado import gen
from tornado.netutil import Resolver

_INITIAL_CONNECT_TIMEOUT = 0.3


class _Connector(object):
    """A stateless implementation of the "Happy Eyeballs" algorithm.

    "Happy Eyeballs" is documented in RFC6555 as the recommended practice
    for when both IPv4 and IPv6 addresses are available.

    In this implementation, we partition the addresses by family, and
    make the first connection attempt to whichever address was
    returned first by ``getaddrinfo``. If that connection fails or
    times out, we begin a connection in parallel to the first address
    of the other family. If there are additional failures we retry
    with other addresses, keeping one connection attempt per family
    in flight at a time.

    http://tools.ietf.org/html/rfc6555
    """
    def __init__(self, addrinfo, io_loop, connect):
        self.io_loop = io_loop
        self.connect = connect

        self.future = Future()
        self.timeout = None
        self.last_error = None
        self.remaining = len(addrinfo)
        self.primary_addrs, self.secondary_addrs = self.split(addrinfo)

    @staticmethod
    def split(addrinfo):
        """Partition the ``addrinfo`` list by address family.

        Returns two lists. The first list contains the first entry from
        ``addrinfo`` and all others with the same family, and the
        second list contains all other addresses (normally one list will
        be AF_INET and the other AF_INET6, although non-standard resolvers
        may return additional families).
        """
        primary = []
        secondary = []
        primary_af = addrinfo[0][0]
        for af, addr in addrinfo:
            if af == primary_af:
                primary.append((af, addr))
            else:
                secondary.append((af, addr))
        return primary, secondary
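
    # Worked example (a sketch; the addresses are illustrative):
    #
    #     split([(socket.AF_INET6, ("::1", 443)),
    #            (socket.AF_INET, ("127.0.0.1", 443)),
    #            (socket.AF_INET6, ("2001:db8::1", 443))])
    #
    # returns the two AF_INET6 entries as the primary list (matching the
    # family of the first entry) and the single AF_INET entry as the
    # secondary list.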

    def start(self, timeout=_INITIAL_CONNECT_TIMEOUT):
        self.try_connect(iter(self.primary_addrs))
        self.set_timeout(timeout)
        return self.future

    def try_connect(self, addrs):
        try:
            af, addr = next(addrs)
        except StopIteration:
            # We've reached the end of our queue, but the other queue
            # might still be working. Send a final error on the future
            # only when both queues are finished.
            if self.remaining == 0 and not self.future.done():
                self.future.set_exception(self.last_error or
                                          IOError("connection failed"))
            return
        future = self.connect(af, addr)
        future.add_done_callback(functools.partial(self.on_connect_done,
                                                   addrs, af, addr))

    def on_connect_done(self, addrs, af, addr, future):
        self.remaining -= 1
        try:
            stream = future.result()
        except Exception as e:
            if self.future.done():
                return
            # Error: try again (but remember what happened so we have an
            # error to raise in the end)
            self.last_error = e
            self.try_connect(addrs)
            if self.timeout is not None:
                # If the first attempt failed, don't wait for the
                # timeout to try an address from the secondary queue.
                self.on_timeout()
            return
        self.clear_timeout()
        if self.future.done():
            # This is a late arrival; just drop it.
            stream.close()
        else:
            self.future.set_result((af, addr, stream))

    def set_timeout(self, timeout):
        self.timeout = self.io_loop.add_timeout(self.io_loop.time() + timeout,
                                                self.on_timeout)

    def on_timeout(self):
        self.timeout = None
        self.try_connect(iter(self.secondary_addrs))

    def clear_timeout(self):
        if self.timeout is not None:
            self.io_loop.remove_timeout(self.timeout)


class TCPClient(object):
    """A non-blocking TCP connection factory.
    """
    def __init__(self, resolver=None, io_loop=None):
        self.io_loop = io_loop or IOLoop.current()
        if resolver is not None:
            self.resolver = resolver
            self._own_resolver = False
        else:
            self.resolver = Resolver(io_loop=io_loop)
            self._own_resolver = True

    def close(self):
        if self._own_resolver:
            self.resolver.close()

    @gen.coroutine
    def connect(self, host, port, af=socket.AF_UNSPEC, ssl_options=None,
                max_buffer_size=None):
        """Connect to the given host and port.

        Asynchronously returns an `.IOStream` (or `.SSLIOStream` if
        ``ssl_options`` is not None).
        """
        addrinfo = yield self.resolver.resolve(host, port, af)
        connector = _Connector(
            addrinfo, self.io_loop,
            functools.partial(self._create_stream, max_buffer_size))
        af, addr, stream = yield connector.start()
        # TODO: For better performance we could cache the (af, addr)
        # information here and re-use it on subsequent connections to
        # the same host. (http://tools.ietf.org/html/rfc6555#section-4.2)
        if ssl_options is not None:
            stream = yield stream.start_tls(False, ssl_options=ssl_options,
                                            server_hostname=host)
        raise gen.Return(stream)
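
    # Example (a minimal usage sketch inside a coroutine; the host, port, and
    # protocol bytes are illustrative):
    #
    #     @gen.coroutine
    #     def fetch_status_line():
    #         client = TCPClient()
    #         stream = yield client.connect("example.com", 80)
    #         yield stream.write(b"HEAD / HTTP/1.0\r\n\r\n")
    #         status = yield stream.read_until(b"\r\n")
    #         stream.close()
    #         raise gen.Return(status)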

    def _create_stream(self, max_buffer_size, af, addr):
        # Always connect in plaintext; we'll convert to ssl if necessary
        # after one connection has completed.
        stream = IOStream(socket.socket(af),
                          io_loop=self.io_loop,
                          max_buffer_size=max_buffer_size)
        return stream.connect(addr)