You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

102 lines
4.8 KiB

import datetime
import ssl
from io import BytesIO
from tornado_py3 import gen as gen, httputil as httputil
from tornado_py3.concurrent import Future as Future, future_set_exception_unless_cancelled as future_set_exception_unless_cancelled, future_set_result_unless_cancelled as future_set_result_unless_cancelled
from tornado_py3.escape import native_str as native_str, utf8 as utf8
from tornado_py3.ioloop import IOLoop as IOLoop
from tornado_py3.util import Configurable as Configurable
from typing import Any, Awaitable, Callable, Dict, Optional, Type, Union
class HTTPClient:
def __init__(self, async_client_class: Type[AsyncHTTPClient]=..., **kwargs: Any) -> None: ...
def __del__(self) -> None: ...
def close(self) -> None: ...
def fetch(self, request: Union[HTTPRequest, str], **kwargs: Any) -> HTTPResponse: ...
class AsyncHTTPClient(Configurable):
@classmethod
def configurable_base(cls: Any) -> Type[Configurable]: ...
@classmethod
def configurable_default(cls: Any) -> Type[Configurable]: ...
def __new__(cls: Any, force_instance: bool=..., **kwargs: Any) -> AsyncHTTPClient: ...
io_loop: Any = ...
defaults: Any = ...
def initialize(self, defaults: Dict[str, Any]=...) -> None: ...
def close(self) -> None: ...
def fetch(self, request: Union[str, HTTPRequest], raise_error: bool=..., **kwargs: Any) -> Awaitable[HTTPResponse]: ...
def fetch_impl(self, request: HTTPRequest, callback: Callable[[HTTPResponse], None]) -> None: ...
@classmethod
def configure(cls: Any, impl: Union[None, str, Type[Configurable]], **kwargs: Any) -> None: ...
class HTTPRequest:
proxy_host: Any = ...
proxy_port: Any = ...
proxy_username: Any = ...
proxy_password: Any = ...
proxy_auth_mode: Any = ...
url: Any = ...
method: Any = ...
body_producer: Any = ...
auth_username: Any = ...
auth_password: Any = ...
auth_mode: Any = ...
connect_timeout: Any = ...
request_timeout: Any = ...
follow_redirects: Any = ...
max_redirects: Any = ...
user_agent: Any = ...
decompress_response: Any = ...
network_interface: Any = ...
streaming_callback: Any = ...
header_callback: Any = ...
prepare_curl_callback: Any = ...
allow_nonstandard_methods: Any = ...
validate_cert: Any = ...
ca_certs: Any = ...
allow_ipv6: Any = ...
client_key: Any = ...
client_cert: Any = ...
ssl_options: Any = ...
expect_100_continue: Any = ...
start_time: Any = ...
def __init__(self, url: str, method: str=..., headers: Union[Dict[str, str], httputil.HTTPHeaders]=..., body: Union[bytes, str]=..., auth_username: str=..., auth_password: str=..., auth_mode: str=..., connect_timeout: float=..., request_timeout: float=..., if_modified_since: Union[float, datetime.datetime]=..., follow_redirects: bool=..., max_redirects: int=..., user_agent: str=..., use_gzip: bool=..., network_interface: str=..., streaming_callback: Callable[[bytes], None]=..., header_callback: Callable[[str], None]=..., prepare_curl_callback: Callable[[Any], None]=..., proxy_host: str=..., proxy_port: int=..., proxy_username: str=..., proxy_password: str=..., proxy_auth_mode: str=..., allow_nonstandard_methods: bool=..., validate_cert: bool=..., ca_certs: str=..., allow_ipv6: bool=..., client_key: str=..., client_cert: str=..., body_producer: Callable[[Callable[[bytes], None]], Future[None]]=..., expect_100_continue: bool=..., decompress_response: bool=..., ssl_options: Union[Dict[str, Any], ssl.SSLContext]=...) -> None: ...
@property
def headers(self) -> httputil.HTTPHeaders: ...
@headers.setter
def headers(self, value: Union[Dict[str, str], httputil.HTTPHeaders]) -> None: ...
@property
def body(self) -> bytes: ...
@body.setter
def body(self, value: Union[bytes, str]) -> None: ...
class HTTPResponse:
error: Optional[BaseException] = ...
request: HTTPRequest = ...
code: Any = ...
reason: Any = ...
headers: Any = ...
buffer: Any = ...
effective_url: Any = ...
start_time: Any = ...
request_time: Any = ...
time_info: Any = ...
def __init__(self, request: HTTPRequest, code: int, headers: httputil.HTTPHeaders=..., buffer: BytesIO=..., effective_url: str=..., error: BaseException=..., request_time: float=..., time_info: Dict[str, float]=..., reason: str=..., start_time: float=...) -> None: ...
@property
def body(self) -> bytes: ...
def rethrow(self) -> None: ...
class HTTPClientError(Exception):
code: Any = ...
message: Any = ...
response: Any = ...
def __init__(self, code: int, message: str=..., response: HTTPResponse=...) -> None: ...
HTTPError = HTTPClientError
class _RequestProxy:
request: Any = ...
defaults: Any = ...
def __init__(self, request: HTTPRequest, defaults: Optional[Dict[str, Any]]) -> None: ...
def __getattr__(self, name: str) -> Any: ...
def main() -> None: ...