# Interface declarations only: every function body and default value below is
# elided as ``...``, so nothing here carries runtime behaviour.
#
# The ``name as name`` import form is kept deliberately; in stub files it is
# the conventional way to mark an import as an explicit re-export.
import ssl
from tornado_py3 import (
    httputil as httputil,
    iostream as iostream,
    netutil as netutil,
)
from tornado_py3.escape import native_str as native_str
from tornado_py3.http1connection import (
    HTTP1ConnectionParameters as HTTP1ConnectionParameters,
    HTTP1ServerConnection as HTTP1ServerConnection,
)
from tornado_py3.tcpserver import TCPServer as TCPServer
from tornado_py3.util import Configurable as Configurable
from typing import (
    Any,
    Awaitable,
    Callable,
    Dict,
    List,
    Optional,
    Tuple,
    Type,
    Union,
)


class HTTPServer(TCPServer, Configurable, httputil.HTTPServerConnectionDelegate):
    """HTTP server interface.

    Combines ``TCPServer``, ``Configurable`` and
    ``httputil.HTTPServerConnectionDelegate``.  The constructor accepts
    arbitrary positional and keyword arguments; the named configuration
    options are declared on ``initialize``.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Accept any positional and keyword arguments."""
        ...

    # Instance attributes.  All are typed ``Any`` here; their names mirror
    # parameters of ``initialize`` except ``conn_params``.
    # NOTE(review): presumably ``conn_params`` holds an
    # ``HTTP1ConnectionParameters`` -- confirm against the implementation.
    request_callback: Any = ...
    xheaders: Any = ...
    protocol: Any = ...
    conn_params: Any = ...
    trusted_downstream: Any = ...

    def initialize(
        self,
        request_callback: Union[
            httputil.HTTPServerConnectionDelegate,
            Callable[[httputil.HTTPServerRequest], None],
        ],
        no_keep_alive: bool = ...,
        xheaders: bool = ...,
        ssl_options: Union[Dict[str, Any], ssl.SSLContext] = ...,
        protocol: str = ...,
        decompress_request: bool = ...,
        chunk_size: int = ...,
        max_header_size: int = ...,
        idle_connection_timeout: float = ...,
        body_timeout: float = ...,
        max_body_size: int = ...,
        max_buffer_size: int = ...,
        trusted_downstream: List[str] = ...,
    ) -> None:
        """Declare the server's configuration options.

        ``request_callback`` is the only required argument and may be either
        an ``httputil.HTTPServerConnectionDelegate`` or a callable taking an
        ``httputil.HTTPServerRequest``.  Every other parameter has a default
        whose actual value is not shown in this declaration.
        """
        ...

    @classmethod
    def configurable_base(cls: Any) -> Type[Configurable]:
        """Return a ``Configurable`` subclass."""
        ...

    @classmethod
    def configurable_default(cls: Any) -> Type[Configurable]:
        """Return a ``Configurable`` subclass."""
        ...

    async def close_all_connections(self) -> None:
        """Coroutine taking no arguments and producing no value."""
        ...

    def handle_stream(self, stream: iostream.IOStream, address: Tuple) -> None:
        """Take an ``IOStream`` together with its address tuple."""
        ...

    def start_request(
        self,
        server_conn: object,
        request_conn: httputil.HTTPConnection,
    ) -> httputil.HTTPMessageDelegate:
        """Return the ``HTTPMessageDelegate`` for a request on ``request_conn``."""
        ...

    def on_close(self, server_conn: object) -> None:
        """Take the server connection object; returns nothing."""
        ...


class _CallableAdapter(httputil.HTTPMessageDelegate):
    """Private ``HTTPMessageDelegate`` built from a request callback.

    Constructed from a callable that takes an ``HTTPServerRequest`` and the
    ``HTTPConnection`` the request arrives on.
    """

    # Instance attributes, all typed ``Any`` in this declaration.
    connection: Any = ...
    request_callback: Any = ...
    request: Any = ...
    delegate: Any = ...

    def __init__(
        self,
        request_callback: Callable[[httputil.HTTPServerRequest], None],
        request_conn: httputil.HTTPConnection,
    ) -> None:
        """Take the request callback and the request's connection."""
        ...

    def headers_received(
        self,
        start_line: Union[httputil.RequestStartLine, httputil.ResponseStartLine],
        headers: httputil.HTTPHeaders,
    ) -> Optional[Awaitable[None]]:
        """Take the start line and headers; may return an awaitable or ``None``."""
        ...

    def data_received(self, chunk: bytes) -> Optional[Awaitable[None]]:
        """Take a ``bytes`` chunk; may return an awaitable or ``None``."""
        ...

    def finish(self) -> None:
        """Take no arguments and return nothing."""
        ...

    def on_connection_close(self) -> None:
        """Take no arguments and return nothing."""
        ...


class _HTTPRequestContext:
    """Private holder of per-connection address and protocol information."""

    # Instance attributes, all typed ``Any`` in this declaration.
    address: Any = ...
    address_family: Any = ...
    remote_ip: Any = ...
    protocol: Any = ...
    trusted_downstream: Any = ...

    def __init__(
        self,
        stream: iostream.IOStream,
        address: Tuple,
        protocol: Optional[str],
        trusted_downstream: List[str] = ...,
    ) -> None:
        """Take the stream, its address tuple and an optional protocol name.

        ``trusted_downstream`` is the only parameter with a default; its
        value is not shown in this declaration.
        """
        ...


class _ProxyAdapter(httputil.HTTPMessageDelegate):
    """Private ``HTTPMessageDelegate`` constructed around another delegate."""

    # Instance attributes, all typed ``Any`` in this declaration.
    connection: Any = ...
    delegate: Any = ...

    def __init__(
        self,
        delegate: httputil.HTTPMessageDelegate,
        request_conn: httputil.HTTPConnection,
    ) -> None:
        """Take the wrapped delegate and the request's connection."""
        ...

    def headers_received(
        self,
        start_line: Union[httputil.RequestStartLine, httputil.ResponseStartLine],
        headers: httputil.HTTPHeaders,
    ) -> Optional[Awaitable[None]]:
        """Take the start line and headers; may return an awaitable or ``None``."""
        ...

    def data_received(self, chunk: bytes) -> Optional[Awaitable[None]]:
        """Take a ``bytes`` chunk; may return an awaitable or ``None``."""
        ...

    def finish(self) -> None:
        """Take no arguments and return nothing."""
        ...

    def on_connection_close(self) -> None:
        """Take no arguments and return nothing."""
        ...


# Module-level alias: ``HTTPRequest`` is the same object as
# ``httputil.HTTPServerRequest``.
HTTPRequest = httputil.HTTPServerRequest