# Stubs for tornado_py3.websocket (Python 3)
#
# NOTE: This dynamically typed stub was automatically generated by stubgen.

import abc
import tornado_py3.web
from tornado_py3 import httpclient, httputil, simple_httpclient
from tornado_py3.concurrent import Future
from tornado_py3.iostream import IOStream
from types import TracebackType
from typing import Any, Awaitable, Callable, Dict, List, Optional, Type, Union


# Interface of a compressor object: bytes in, bytes out, plus a flush step
# that takes an integer mode.
class _Compressor:
    def compress(self, data: bytes) -> bytes: ...
    def flush(self, mode: int) -> bytes: ...


# Interface of a decompressor object that exposes an ``unconsumed_tail``
# bytes attribute and a length-bounded ``decompress``.
class _Decompressor:
    unconsumed_tail: bytes = ...

    def decompress(self, data: bytes, max_length: int) -> bytes: ...


# Callback interface; this is the type WebSocketProtocol.__init__ and
# WebSocketProtocol13.__init__ accept as their ``handler`` argument.
class _WebSocketDelegate:
    def on_ws_connection_close(
        self,
        close_code: Optional[int] = ...,
        close_reason: Optional[str] = ...,
    ) -> None: ...

    # May return None or an awaitable resolving to None.
    def on_message(
        self, message: Union[str, bytes]
    ) -> Optional[Awaitable[None]]: ...

    def on_ping(self, data: bytes) -> None: ...
    def on_pong(self, data: bytes) -> None: ...

    def log_exception(
        self,
        typ: Optional[Type[BaseException]],
        value: Optional[BaseException],
        tb: Optional[TracebackType],
    ) -> None: ...


# Exception hierarchy: WebSocketClosedError derives from WebSocketError;
# _DecompressTooLargeError derives directly from Exception.
class WebSocketError(Exception): ...


class WebSocketClosedError(WebSocketError): ...


class _DecompressTooLargeError(Exception): ...


# Container for the four connection options passed to __init__.
# The attributes are typed ``Any`` in this generated stub.
class _WebSocketParams:
    ping_interval: Any = ...
    ping_timeout: Any = ...
    max_message_size: Any = ...
    compression_options: Any = ...

    def __init__(
        self,
        ping_interval: Optional[float] = ...,
        ping_timeout: Optional[float] = ...,
        max_message_size: int = ...,
        compression_options: Optional[Dict[str, Any]] = ...,
    ) -> None: ...


# Request handler subclass (tornado_py3.web.RequestHandler) carrying the
# websocket-specific attributes and methods declared below.
class WebSocketHandler(tornado_py3.web.RequestHandler):
    ws_connection: Any = ...
    close_code: Any = ...
    close_reason: Any = ...
    stream: Any = ...

    def __init__(
        self,
        application: tornado_py3.web.Application,
        request: httputil.HTTPServerRequest,
        **kwargs: Any
    ) -> None: ...

    open_args: Any = ...
    open_kwargs: Any = ...

    async def get(self, *args: Any, **kwargs: Any) -> None: ...

    # Read-only configuration properties.
    @property
    def ping_interval(self) -> Optional[float]: ...
    @property
    def ping_timeout(self) -> Optional[float]: ...
    @property
    def max_message_size(self) -> int: ...

    # Accepts bytes, str, or a dict; returns a Future.
    def write_message(
        self,
        message: Union[bytes, str, Dict[str, Any]],
        binary: bool = ...,
    ) -> Future[None]: ...

    def select_subprotocol(self, subprotocols: List[str]) -> Optional[str]: ...

    @property
    def selected_subprotocol(self) -> Optional[str]: ...

    def get_compression_options(self) -> Optional[Dict[str, Any]]: ...

    # ``open`` and ``on_message`` may return None or an awaitable.
    def open(self, *args: str, **kwargs: str) -> Optional[Awaitable[None]]: ...

    def on_message(
        self, message: Union[str, bytes]
    ) -> Optional[Awaitable[None]]: ...

    def ping(self, data: Union[str, bytes] = ...) -> None: ...
    def on_pong(self, data: bytes) -> None: ...
    def on_ping(self, data: bytes) -> None: ...
    def on_close(self) -> None: ...

    def close(
        self,
        code: Optional[int] = ...,
        reason: Optional[str] = ...,
    ) -> None: ...

    def check_origin(self, origin: str) -> bool: ...
    def set_nodelay(self, value: bool) -> None: ...
    def on_connection_close(self) -> None: ...

    def on_ws_connection_close(
        self,
        close_code: Optional[int] = ...,
        close_reason: Optional[str] = ...,
    ) -> None: ...

    def send_error(self, *args: Any, **kwargs: Any) -> None: ...

    # Forward reference: WebSocketProtocol is declared further down.
    def get_websocket_protocol(self) -> Optional[WebSocketProtocol]: ...


# Abstract base class for protocol implementations. Everything marked
# @abc.abstractmethod must be supplied by a concrete subclass.
class WebSocketProtocol(abc.ABC, metaclass=abc.ABCMeta):
    handler: Any = ...
    stream: Any = ...
    client_terminated: bool = ...
    server_terminated: bool = ...

    def __init__(self, handler: _WebSocketDelegate) -> None: ...
    def on_connection_close(self) -> None: ...

    @abc.abstractmethod
    def close(
        self,
        code: Optional[int] = ...,
        reason: Optional[str] = ...,
    ) -> None: ...

    @abc.abstractmethod
    def is_closing(self) -> bool: ...

    @abc.abstractmethod
    async def accept_connection(self, handler: WebSocketHandler) -> None: ...

    @abc.abstractmethod
    def write_message(
        self,
        message: Union[str, bytes],
        binary: bool = ...,
    ) -> Future[None]: ...

    @property
    @abc.abstractmethod
    def selected_subprotocol(self) -> Optional[str]: ...

    @abc.abstractmethod
    def write_ping(self, data: bytes) -> None: ...

    @abc.abstractmethod
    def start_pinging(self) -> None: ...

    @abc.abstractmethod
    def set_nodelay(self, x: bool) -> None: ...


# Per-message compressor; construction takes a persistence flag, an optional
# max_wbits value and optional compression options.
class _PerMessageDeflateCompressor:
    def __init__(
        self,
        persistent: bool,
        max_wbits: Optional[int],
        compression_options: Optional[Dict[str, Any]] = ...,
    ) -> None: ...

    def compress(self, data: bytes) -> bytes: ...


# Per-message decompressor; same constructor shape as the compressor plus a
# required max_message_size.
class _PerMessageDeflateDecompressor:
    def __init__(
        self,
        persistent: bool,
        max_wbits: Optional[int],
        max_message_size: int,
        compression_options: Optional[Dict[str, Any]] = ...,
    ) -> None: ...

    def decompress(self, data: bytes) -> bytes: ...


# Concrete WebSocketProtocol subclass. It overrides each abstract member of
# the base class and adds class-level integer constants.
class WebSocketProtocol13(WebSocketProtocol):
    # Integer constants (RSV_MASK is typed ``Any`` in this generated stub).
    FIN: int = ...
    RSV1: int = ...
    RSV2: int = ...
    RSV3: int = ...
    RSV_MASK: Any = ...
    OPCODE_MASK: int = ...

    # Narrows the base-class ``stream: Any`` to IOStream.
    stream: IOStream = ...
    mask_outgoing: Any = ...
    params: Any = ...
    ping_callback: Any = ...
    last_ping: float = ...
    last_pong: float = ...
    close_code: Any = ...
    close_reason: Any = ...

    def __init__(
        self,
        handler: _WebSocketDelegate,
        mask_outgoing: bool,
        params: _WebSocketParams,
    ) -> None: ...

    # Unlike the base class, this property also has a setter.
    @property
    def selected_subprotocol(self) -> Optional[str]: ...
    @selected_subprotocol.setter
    def selected_subprotocol(self, value: Optional[str]) -> None: ...

    async def accept_connection(self, handler: WebSocketHandler) -> None: ...

    @staticmethod
    def compute_accept_value(key: Union[str, bytes]) -> str: ...

    def write_message(
        self,
        message: Union[str, bytes],
        binary: bool = ...,
    ) -> Future[None]: ...

    def write_ping(self, data: bytes) -> None: ...

    # Re-declared here although WebSocketProtocol already declares it.
    server_terminated: bool = ...

    def close(
        self,
        code: Optional[int] = ...,
        reason: Optional[str] = ...,
    ) -> None: ...

    def is_closing(self) -> bool: ...

    @property
    def ping_interval(self) -> Optional[float]: ...
    @property
    def ping_timeout(self) -> Optional[float]: ...

    def start_pinging(self) -> None: ...
    def periodic_ping(self) -> None: ...
    def set_nodelay(self, x: bool) -> None: ...


# Client-side connection class, derived from simple_httpclient._HTTPConnection.
class WebSocketClientConnection(simple_httpclient._HTTPConnection):
    protocol: WebSocketProtocol = ...
    connect_future: Any = ...
    read_queue: Any = ...
    key: Any = ...
    close_code: Any = ...
    close_reason: Any = ...
    params: Any = ...
    tcp_client: Any = ...

    # ``on_message_callback`` receives None, str, or bytes.
    def __init__(
        self,
        request: httpclient.HTTPRequest,
        on_message_callback: Optional[
            Callable[[Union[None, str, bytes]], None]
        ] = ...,
        compression_options: Optional[Dict[str, Any]] = ...,
        ping_interval: Optional[float] = ...,
        ping_timeout: Optional[float] = ...,
        max_message_size: int = ...,
        subprotocols: Optional[List[str]] = ...,
    ) -> None: ...

    def close(
        self,
        code: Optional[int] = ...,
        reason: Optional[str] = ...,
    ) -> None: ...

    def on_connection_close(self) -> None: ...

    def on_ws_connection_close(
        self,
        close_code: Optional[int] = ...,
        close_reason: Optional[str] = ...,
    ) -> None: ...

    headers: Any = ...
    final_callback: Any = ...

    async def headers_received(
        self,
        start_line: Union[
            httputil.RequestStartLine, httputil.ResponseStartLine
        ],
        headers: httputil.HTTPHeaders,
    ) -> None: ...

    def write_message(
        self,
        message: Union[str, bytes],
        binary: bool = ...,
    ) -> Future[None]: ...

    # The awaited result (and the value inside the Future handed to
    # ``callback``) may be None, str, or bytes.
    def read_message(
        self,
        callback: Optional[
            Callable[[Future[Union[None, str, bytes]]], None]
        ] = ...,
    ) -> Awaitable[Union[None, str, bytes]]: ...

    def on_message(
        self, message: Union[str, bytes]
    ) -> Optional[Awaitable[None]]: ...

    def ping(self, data: bytes = ...) -> None: ...
    def on_pong(self, data: bytes) -> None: ...
    def on_ping(self, data: bytes) -> None: ...

    # Non-optional here, unlike WebSocketHandler.get_websocket_protocol.
    def get_websocket_protocol(self) -> WebSocketProtocol: ...

    @property
    def selected_subprotocol(self) -> Optional[str]: ...

    def log_exception(
        self,
        typ: Optional[Type[BaseException]],
        value: Optional[BaseException],
        tb: Optional[TracebackType],
    ) -> None: ...


# Module-level entry point: takes a URL string or an HTTPRequest and returns
# an awaitable that resolves to a WebSocketClientConnection. If given,
# ``callback`` takes a Future of the connection.
def websocket_connect(
    url: Union[str, httpclient.HTTPRequest],
    callback: Optional[
        Callable[[Future[WebSocketClientConnection]], None]
    ] = ...,
    connect_timeout: Optional[float] = ...,
    on_message_callback: Optional[
        Callable[[Union[None, str, bytes]], None]
    ] = ...,
    compression_options: Optional[Dict[str, Any]] = ...,
    ping_interval: Optional[float] = ...,
    ping_timeout: Optional[float] = ...,
    max_message_size: int = ...,
    subprotocols: Optional[List[str]] = ...,
) -> Awaitable[WebSocketClientConnection]: ...