# Type stub: every function body and default value is an ``...`` placeholder,
# so only the signatures and attribute declarations carry information.

import datetime
import ssl
from io import BytesIO
from typing import Any, Awaitable, Callable, Dict, Optional, Type, Union

# The redundant ``X as X`` aliases are kept as written; in a stub file that
# form marks a name as an explicit re-export for type checkers.
from tornado_py3 import gen as gen, httputil as httputil
from tornado_py3.concurrent import (
    Future as Future,
    future_set_exception_unless_cancelled as future_set_exception_unless_cancelled,
    future_set_result_unless_cancelled as future_set_result_unless_cancelled,
)
from tornado_py3.escape import native_str as native_str, utf8 as utf8
from tornado_py3.ioloop import IOLoop as IOLoop
from tornado_py3.util import Configurable as Configurable


# Client whose ``fetch`` is declared to return an ``HTTPResponse`` directly
# (contrast with ``AsyncHTTPClient.fetch``, which returns an awaitable).
class HTTPClient:
    def __init__(
        self,
        async_client_class: Type[AsyncHTTPClient] = ...,
        **kwargs: Any
    ) -> None: ...
    def __del__(self) -> None: ...
    def close(self) -> None: ...
    def fetch(
        self,
        request: Union[HTTPRequest, str],
        **kwargs: Any
    ) -> HTTPResponse: ...


# ``Configurable`` subclass whose ``fetch`` returns ``Awaitable[HTTPResponse]``.
class AsyncHTTPClient(Configurable):
    @classmethod
    def configurable_base(cls: Any) -> Type[Configurable]: ...
    @classmethod
    def configurable_default(cls: Any) -> Type[Configurable]: ...
    def __new__(
        cls: Any,
        force_instance: bool = ...,
        **kwargs: Any
    ) -> AsyncHTTPClient: ...

    io_loop: Any = ...
    defaults: Any = ...

    def initialize(self, defaults: Dict[str, Any] = ...) -> None: ...
    def close(self) -> None: ...
    def fetch(
        self,
        request: Union[str, HTTPRequest],
        raise_error: bool = ...,
        **kwargs: Any
    ) -> Awaitable[HTTPResponse]: ...
    # Callback-style counterpart of ``fetch``: takes an ``HTTPRequest`` plus a
    # callback receiving the ``HTTPResponse`` and itself returns ``None``.
    def fetch_impl(
        self,
        request: HTTPRequest,
        callback: Callable[[HTTPResponse], None],
    ) -> None: ...
    @classmethod
    def configure(
        cls: Any,
        impl: Union[None, str, Type[Configurable]],
        **kwargs: Any
    ) -> None: ...


# Request description. The instance attributes are all declared as ``Any``;
# the narrower types appear only on the ``__init__`` parameters and on the
# ``headers`` / ``body`` properties.
class HTTPRequest:
    proxy_host: Any = ...
    proxy_port: Any = ...
    proxy_username: Any = ...
    proxy_password: Any = ...
    proxy_auth_mode: Any = ...
    url: Any = ...
    method: Any = ...
    body_producer: Any = ...
    auth_username: Any = ...
    auth_password: Any = ...
    auth_mode: Any = ...
    connect_timeout: Any = ...
    request_timeout: Any = ...
    follow_redirects: Any = ...
    max_redirects: Any = ...
    user_agent: Any = ...
    decompress_response: Any = ...
    network_interface: Any = ...
    streaming_callback: Any = ...
    header_callback: Any = ...
    prepare_curl_callback: Any = ...
    allow_nonstandard_methods: Any = ...
    validate_cert: Any = ...
    ca_certs: Any = ...
    allow_ipv6: Any = ...
    client_key: Any = ...
    client_cert: Any = ...
    ssl_options: Any = ...
    expect_100_continue: Any = ...
    start_time: Any = ...

    # Only ``url`` is required; every other parameter carries a default.
    def __init__(
        self,
        url: str,
        method: str = ...,
        headers: Union[Dict[str, str], httputil.HTTPHeaders] = ...,
        body: Union[bytes, str] = ...,
        auth_username: str = ...,
        auth_password: str = ...,
        auth_mode: str = ...,
        connect_timeout: float = ...,
        request_timeout: float = ...,
        if_modified_since: Union[float, datetime.datetime] = ...,
        follow_redirects: bool = ...,
        max_redirects: int = ...,
        user_agent: str = ...,
        use_gzip: bool = ...,
        network_interface: str = ...,
        streaming_callback: Callable[[bytes], None] = ...,
        header_callback: Callable[[str], None] = ...,
        prepare_curl_callback: Callable[[Any], None] = ...,
        proxy_host: str = ...,
        proxy_port: int = ...,
        proxy_username: str = ...,
        proxy_password: str = ...,
        proxy_auth_mode: str = ...,
        allow_nonstandard_methods: bool = ...,
        validate_cert: bool = ...,
        ca_certs: str = ...,
        allow_ipv6: bool = ...,
        client_key: str = ...,
        client_cert: str = ...,
        body_producer: Callable[[Callable[[bytes], None]], Future[None]] = ...,
        expect_100_continue: bool = ...,
        decompress_response: bool = ...,
        ssl_options: Union[Dict[str, Any], ssl.SSLContext] = ...,
    ) -> None: ...

    # The setter accepts a plain dict or ``HTTPHeaders``; the getter is
    # declared to return ``HTTPHeaders``.
    @property
    def headers(self) -> httputil.HTTPHeaders: ...
    @headers.setter
    def headers(
        self, value: Union[Dict[str, str], httputil.HTTPHeaders]
    ) -> None: ...

    # The setter accepts ``bytes`` or ``str``; the getter is declared to
    # return ``bytes``.
    @property
    def body(self) -> bytes: ...
    @body.setter
    def body(self, value: Union[bytes, str]) -> None: ...


# Response description; ``body`` is a read-only property (no setter declared).
class HTTPResponse:
    error: Optional[BaseException] = ...
    request: HTTPRequest = ...
    code: Any = ...
    reason: Any = ...
    headers: Any = ...
    buffer: Any = ...
    effective_url: Any = ...
    start_time: Any = ...
    request_time: Any = ...
    time_info: Any = ...

    # ``request`` and ``code`` are required; the rest carry defaults.
    def __init__(
        self,
        request: HTTPRequest,
        code: int,
        headers: httputil.HTTPHeaders = ...,
        buffer: BytesIO = ...,
        effective_url: str = ...,
        error: BaseException = ...,
        request_time: float = ...,
        time_info: Dict[str, float] = ...,
        reason: str = ...,
        start_time: float = ...,
    ) -> None: ...
    @property
    def body(self) -> bytes: ...
    def rethrow(self) -> None: ...


# Exception type carrying a status ``code``, a ``message`` and a ``response``.
class HTTPClientError(Exception):
    code: Any = ...
    message: Any = ...
    response: Any = ...

    def __init__(
        self,
        code: int,
        message: str = ...,
        response: HTTPResponse = ...,
    ) -> None: ...


# Second name bound to the same class object as ``HTTPClientError``.
HTTPError = HTTPClientError


# Private helper pairing a request with an optional ``defaults`` mapping;
# ``__getattr__`` means any attribute name type-checks on instances.
class _RequestProxy:
    request: Any = ...
    defaults: Any = ...

    def __init__(
        self,
        request: HTTPRequest,
        defaults: Optional[Dict[str, Any]],
    ) -> None: ...
    def __getattr__(self, name: str) -> Any: ...


def main() -> None: ...