You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 

53 lines
3.2 KiB

import ssl
from tornado_py3 import httputil as httputil, iostream as iostream, netutil as netutil
from tornado_py3.escape import native_str as native_str
from tornado_py3.http1connection import HTTP1ConnectionParameters as HTTP1ConnectionParameters, HTTP1ServerConnection as HTTP1ServerConnection
from tornado_py3.tcpserver import TCPServer as TCPServer
from tornado_py3.util import Configurable as Configurable
from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple, Type, Union
class HTTPServer(TCPServer, Configurable, httputil.HTTPServerConnectionDelegate):
    """Type stub for ``HTTPServer``.

    The class derives from ``TCPServer``, ``Configurable`` and
    ``httputil.HTTPServerConnectionDelegate``.  Only signatures are declared
    here: every body and every default value is elided with ``...``.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None: ...

    # Attributes declared on the class; this stub types all of them as ``Any``.
    request_callback: Any = ...
    xheaders: Any = ...
    protocol: Any = ...
    conn_params: Any = ...
    trusted_downstream: Any = ...

    # ``request_callback`` is the only required argument: either a connection
    # delegate or a callable that takes an ``HTTPServerRequest`` and returns
    # ``None``.  All remaining parameters have defaults.
    # NOTE(review): the concrete defaults are not visible in this stub; confirm
    # against the implementation whether ``None`` is accepted for any of them.
    def initialize(
        self,
        request_callback: Union[
            httputil.HTTPServerConnectionDelegate,
            Callable[[httputil.HTTPServerRequest], None],
        ],
        no_keep_alive: bool = ...,
        xheaders: bool = ...,
        ssl_options: Union[Dict[str, Any], ssl.SSLContext] = ...,
        protocol: str = ...,
        decompress_request: bool = ...,
        chunk_size: int = ...,
        max_header_size: int = ...,
        idle_connection_timeout: float = ...,
        body_timeout: float = ...,
        max_body_size: int = ...,
        max_buffer_size: int = ...,
        trusted_downstream: List[str] = ...,
    ) -> None: ...

    # Both class methods return a ``Configurable`` subclass (a type, not an
    # instance).
    @classmethod
    def configurable_base(cls: Any) -> Type[Configurable]: ...

    @classmethod
    def configurable_default(cls: Any) -> Type[Configurable]: ...

    # Declared ``async``: calling it yields an awaitable that resolves to None.
    async def close_all_connections(self) -> None: ...

    # Takes an ``IOStream`` plus an address tuple (element types unspecified).
    def handle_stream(self, stream: iostream.IOStream, address: Tuple) -> None: ...

    # Returns the ``HTTPMessageDelegate`` for a request arriving on
    # ``request_conn``; ``server_conn`` is typed only as ``object``.
    def start_request(
        self, server_conn: object, request_conn: httputil.HTTPConnection
    ) -> httputil.HTTPMessageDelegate: ...

    def on_close(self, server_conn: object) -> None: ...
class _CallableAdapter(httputil.HTTPMessageDelegate):
    """Type stub for a private ``HTTPMessageDelegate`` subclass.

    Instances are constructed from a request callback (a callable taking an
    ``HTTPServerRequest``) and the ``HTTPConnection`` the request arrived on.
    Bodies are elided with ``...``.
    """

    # Attributes declared on the class; this stub types all of them as ``Any``.
    connection: Any = ...
    request_callback: Any = ...
    request: Any = ...
    delegate: Any = ...

    def __init__(
        self,
        request_callback: Callable[[httputil.HTTPServerRequest], None],
        request_conn: httputil.HTTPConnection,
    ) -> None: ...

    # May return ``None`` or an awaitable resolving to ``None``.
    def headers_received(
        self,
        start_line: Union[httputil.RequestStartLine, httputil.ResponseStartLine],
        headers: httputil.HTTPHeaders,
    ) -> Optional[Awaitable[None]]: ...

    # May return ``None`` or an awaitable resolving to ``None``.
    def data_received(self, chunk: bytes) -> Optional[Awaitable[None]]: ...

    def finish(self) -> None: ...

    def on_connection_close(self) -> None: ...
class _HTTPRequestContext:
    """Type stub for a private per-connection context object.

    Declares the attributes below and a constructor taking the stream, the
    peer address, an optional protocol string and a list of trusted
    downstream entries.  The body is elided with ``...``.
    """

    # Attributes declared on the class; this stub types all of them as ``Any``.
    address: Any = ...
    address_family: Any = ...
    remote_ip: Any = ...
    protocol: Any = ...
    trusted_downstream: Any = ...

    # ``protocol`` is required but may be ``None``; ``trusted_downstream`` has
    # a default whose value is not shown in this stub.
    def __init__(
        self,
        stream: iostream.IOStream,
        address: Tuple,
        protocol: Optional[str],
        trusted_downstream: List[str] = ...,
    ) -> None: ...
class _ProxyAdapter(httputil.HTTPMessageDelegate):
    """Type stub for a private ``HTTPMessageDelegate`` subclass.

    Instances are constructed from another ``HTTPMessageDelegate`` and the
    ``HTTPConnection`` the request arrived on.  Bodies are elided with ``...``.
    """

    # Attributes declared on the class; this stub types both as ``Any``.
    connection: Any = ...
    delegate: Any = ...

    def __init__(
        self,
        delegate: httputil.HTTPMessageDelegate,
        request_conn: httputil.HTTPConnection,
    ) -> None: ...

    # May return ``None`` or an awaitable resolving to ``None``.
    def headers_received(
        self,
        start_line: Union[httputil.RequestStartLine, httputil.ResponseStartLine],
        headers: httputil.HTTPHeaders,
    ) -> Optional[Awaitable[None]]: ...

    # May return ``None`` or an awaitable resolving to ``None``.
    def data_received(self, chunk: bytes) -> Optional[Awaitable[None]]: ...

    def finish(self) -> None: ...

    def on_connection_close(self) -> None: ...
# Module-level alias: ``HTTPRequest`` names the same class as
# ``httputil.HTTPServerRequest``.
# NOTE(review): presumably kept for backward compatibility — confirm.
HTTPRequest = httputil.HTTPServerRequest