# # Copyright 2009 Facebook # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. """A non-blocking, single-threaded HTTP server. Typical applications have little direct interaction with the `HTTPServer` class except to start a server at the beginning of the process (and even that is often done indirectly via `tornado.web.Application.listen`). .. versionchanged:: 4.0 The ``HTTPRequest`` class that used to live in this module has been moved to `tornado.httputil.HTTPServerRequest`. The old name remains as an alias. """ import socket import ssl from tornado_py3.escape import native_str from tornado_py3.http1connection import HTTP1ServerConnection, HTTP1ConnectionParameters from tornado_py3 import httputil from tornado_py3 import iostream from tornado_py3 import netutil from tornado_py3.tcpserver import TCPServer from tornado_py3.util import Configurable import typing from typing import Union, Any, Dict, Callable, List, Type, Tuple, Optional, Awaitable if typing.TYPE_CHECKING: from typing import Set # noqa: F401 class HTTPServer(TCPServer, Configurable, httputil.HTTPServerConnectionDelegate): r"""A non-blocking, single-threaded HTTP server. A server is defined by a subclass of `.HTTPServerConnectionDelegate`, or, for backwards compatibility, a callback that takes an `.HTTPServerRequest` as an argument. The delegate is usually a `tornado.web.Application`. `HTTPServer` supports keep-alive connections by default (automatically for HTTP/1.1, or for HTTP/1.0 when the client requests ``Connection: keep-alive``). If ``xheaders`` is ``True``, we support the ``X-Real-Ip``/``X-Forwarded-For`` and ``X-Scheme``/``X-Forwarded-Proto`` headers, which override the remote IP and URI scheme/protocol for all requests. These headers are useful when running Tornado behind a reverse proxy or load balancer. The ``protocol`` argument can also be set to ``https`` if Tornado is run behind an SSL-decoding proxy that does not set one of the supported ``xheaders``. By default, when parsing the ``X-Forwarded-For`` header, Tornado will select the last (i.e., the closest) address on the list of hosts as the remote host IP address. To select the next server in the chain, a list of trusted downstream hosts may be passed as the ``trusted_downstream`` argument. These hosts will be skipped when parsing the ``X-Forwarded-For`` header. To make this server serve SSL traffic, send the ``ssl_options`` keyword argument with an `ssl.SSLContext` object. For compatibility with older versions of Python ``ssl_options`` may also be a dictionary of keyword arguments for the `ssl.wrap_socket` method.:: ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) ssl_ctx.load_cert_chain(os.path.join(data_dir, "mydomain.crt"), os.path.join(data_dir, "mydomain.key")) HTTPServer(application, ssl_options=ssl_ctx) `HTTPServer` initialization follows one of three patterns (the initialization methods are defined on `tornado.tcpserver.TCPServer`): 1. `~tornado.tcpserver.TCPServer.listen`: simple single-process:: server = HTTPServer(app) server.listen(8888) IOLoop.current().start() In many cases, `tornado.web.Application.listen` can be used to avoid the need to explicitly create the `HTTPServer`. 2. `~tornado.tcpserver.TCPServer.bind`/`~tornado.tcpserver.TCPServer.start`: simple multi-process:: server = HTTPServer(app) server.bind(8888) server.start(0) # Forks multiple sub-processes IOLoop.current().start() When using this interface, an `.IOLoop` must *not* be passed to the `HTTPServer` constructor. `~.TCPServer.start` will always start the server on the default singleton `.IOLoop`. 3. `~tornado.tcpserver.TCPServer.add_sockets`: advanced multi-process:: sockets = tornado.netutil.bind_sockets(8888) tornado.process.fork_processes(0) server = HTTPServer(app) server.add_sockets(sockets) IOLoop.current().start() The `~.TCPServer.add_sockets` interface is more complicated, but it can be used with `tornado.process.fork_processes` to give you more flexibility in when the fork happens. `~.TCPServer.add_sockets` can also be used in single-process servers if you want to create your listening sockets in some way other than `tornado.netutil.bind_sockets`. .. versionchanged:: 4.0 Added ``decompress_request``, ``chunk_size``, ``max_header_size``, ``idle_connection_timeout``, ``body_timeout``, ``max_body_size`` arguments. Added support for `.HTTPServerConnectionDelegate` instances as ``request_callback``. .. versionchanged:: 4.1 `.HTTPServerConnectionDelegate.start_request` is now called with two arguments ``(server_conn, request_conn)`` (in accordance with the documentation) instead of one ``(request_conn)``. .. versionchanged:: 4.2 `HTTPServer` is now a subclass of `tornado.util.Configurable`. .. versionchanged:: 4.5 Added the ``trusted_downstream`` argument. .. versionchanged:: 5.0 The ``io_loop`` argument has been removed. """ def __init__(self, *args: Any, **kwargs: Any) -> None: # Ignore args to __init__; real initialization belongs in # initialize since we're Configurable. (there's something # weird in initialization order between this class, # Configurable, and TCPServer so we can't leave __init__ out # completely) pass def initialize( self, request_callback: Union[ httputil.HTTPServerConnectionDelegate, Callable[[httputil.HTTPServerRequest], None], ], no_keep_alive: bool = False, xheaders: bool = False, ssl_options: Optional[Union[Dict[str, Any], ssl.SSLContext]] = None, protocol: Optional[str] = None, decompress_request: bool = False, chunk_size: Optional[int] = None, max_header_size: Optional[int] = None, idle_connection_timeout: Optional[float] = None, body_timeout: Optional[float] = None, max_body_size: Optional[int] = None, max_buffer_size: Optional[int] = None, trusted_downstream: Optional[List[str]] = None, ) -> None: # This method's signature is not extracted with autodoc # because we want its arguments to appear on the class # constructor. When changing this signature, also update the # copy in httpserver.rst. self.request_callback = request_callback self.xheaders = xheaders self.protocol = protocol self.conn_params = HTTP1ConnectionParameters( decompress=decompress_request, chunk_size=chunk_size, max_header_size=max_header_size, header_timeout=idle_connection_timeout or 3600, max_body_size=max_body_size, body_timeout=body_timeout, no_keep_alive=no_keep_alive, ) TCPServer.__init__( self, ssl_options=ssl_options, max_buffer_size=max_buffer_size, read_chunk_size=chunk_size, ) self._connections = set() # type: Set[HTTP1ServerConnection] self.trusted_downstream = trusted_downstream @classmethod def configurable_base(cls) -> Type[Configurable]: return HTTPServer @classmethod def configurable_default(cls) -> Type[Configurable]: return HTTPServer async def close_all_connections(self) -> None: """Close all open connections and asynchronously wait for them to finish. This method is used in combination with `~.TCPServer.stop` to support clean shutdowns (especially for unittests). Typical usage would call ``stop()`` first to stop accepting new connections, then ``await close_all_connections()`` to wait for existing connections to finish. This method does not currently close open websocket connections. Note that this method is a coroutine and must be called with ``await``. """ while self._connections: # Peek at an arbitrary element of the set conn = next(iter(self._connections)) await conn.close() def handle_stream(self, stream: iostream.IOStream, address: Tuple) -> None: context = _HTTPRequestContext( stream, address, self.protocol, self.trusted_downstream ) conn = HTTP1ServerConnection(stream, self.conn_params, context) self._connections.add(conn) conn.start_serving(self) def start_request( self, server_conn: object, request_conn: httputil.HTTPConnection ) -> httputil.HTTPMessageDelegate: if isinstance(self.request_callback, httputil.HTTPServerConnectionDelegate): delegate = self.request_callback.start_request(server_conn, request_conn) else: delegate = _CallableAdapter(self.request_callback, request_conn) if self.xheaders: delegate = _ProxyAdapter(delegate, request_conn) return delegate def on_close(self, server_conn: object) -> None: self._connections.remove(typing.cast(HTTP1ServerConnection, server_conn)) class _CallableAdapter(httputil.HTTPMessageDelegate): def __init__( self, request_callback: Callable[[httputil.HTTPServerRequest], None], request_conn: httputil.HTTPConnection, ) -> None: self.connection = request_conn self.request_callback = request_callback self.request = None # type: Optional[httputil.HTTPServerRequest] self.delegate = None self._chunks = [] # type: List[bytes] def headers_received( self, start_line: Union[httputil.RequestStartLine, httputil.ResponseStartLine], headers: httputil.HTTPHeaders, ) -> Optional[Awaitable[None]]: self.request = httputil.HTTPServerRequest( connection=self.connection, start_line=typing.cast(httputil.RequestStartLine, start_line), headers=headers, ) return None def data_received(self, chunk: bytes) -> Optional[Awaitable[None]]: self._chunks.append(chunk) return None def finish(self) -> None: assert self.request is not None self.request.body = b"".join(self._chunks) self.request._parse_body() self.request_callback(self.request) def on_connection_close(self) -> None: del self._chunks class _HTTPRequestContext(object): def __init__( self, stream: iostream.IOStream, address: Tuple, protocol: Optional[str], trusted_downstream: Optional[List[str]] = None, ) -> None: self.address = address # Save the socket's address family now so we know how to # interpret self.address even after the stream is closed # and its socket attribute replaced with None. if stream.socket is not None: self.address_family = stream.socket.family else: self.address_family = None # In HTTPServerRequest we want an IP, not a full socket address. if ( self.address_family in (socket.AF_INET, socket.AF_INET6) and address is not None ): self.remote_ip = address[0] else: # Unix (or other) socket; fake the remote address. self.remote_ip = "0.0.0.0" if protocol: self.protocol = protocol elif isinstance(stream, iostream.SSLIOStream): self.protocol = "https" else: self.protocol = "http" self._orig_remote_ip = self.remote_ip self._orig_protocol = self.protocol self.trusted_downstream = set(trusted_downstream or []) def __str__(self) -> str: if self.address_family in (socket.AF_INET, socket.AF_INET6): return self.remote_ip elif isinstance(self.address, bytes): # Python 3 with the -bb option warns about str(bytes), # so convert it explicitly. # Unix socket addresses are str on mac but bytes on linux. return native_str(self.address) else: return str(self.address) def _apply_xheaders(self, headers: httputil.HTTPHeaders) -> None: """Rewrite the ``remote_ip`` and ``protocol`` fields.""" # Squid uses X-Forwarded-For, others use X-Real-Ip ip = headers.get("X-Forwarded-For", self.remote_ip) # Skip trusted downstream hosts in X-Forwarded-For list for ip in (cand.strip() for cand in reversed(ip.split(","))): if ip not in self.trusted_downstream: break ip = headers.get("X-Real-Ip", ip) if netutil.is_valid_ip(ip): self.remote_ip = ip # AWS uses X-Forwarded-Proto proto_header = headers.get( "X-Scheme", headers.get("X-Forwarded-Proto", self.protocol) ) if proto_header: # use only the last proto entry if there is more than one # TODO: support trusting multiple layers of proxied protocol proto_header = proto_header.split(",")[-1].strip() if proto_header in ("http", "https"): self.protocol = proto_header def _unapply_xheaders(self) -> None: """Undo changes from `_apply_xheaders`. Xheaders are per-request so they should not leak to the next request on the same connection. """ self.remote_ip = self._orig_remote_ip self.protocol = self._orig_protocol class _ProxyAdapter(httputil.HTTPMessageDelegate): def __init__( self, delegate: httputil.HTTPMessageDelegate, request_conn: httputil.HTTPConnection, ) -> None: self.connection = request_conn self.delegate = delegate def headers_received( self, start_line: Union[httputil.RequestStartLine, httputil.ResponseStartLine], headers: httputil.HTTPHeaders, ) -> Optional[Awaitable[None]]: # TODO: either make context an official part of the # HTTPConnection interface or figure out some other way to do this. self.connection.context._apply_xheaders(headers) # type: ignore return self.delegate.headers_received(start_line, headers) def data_received(self, chunk: bytes) -> Optional[Awaitable[None]]: return self.delegate.data_received(chunk) def finish(self) -> None: self.delegate.finish() self._cleanup() def on_connection_close(self) -> None: self.delegate.on_connection_close() self._cleanup() def _cleanup(self) -> None: self.connection.context._unapply_xheaders() # type: ignore HTTPRequest = httputil.HTTPServerRequest