usenetbinary-newsreaderquickboxtraktkodistabletvshowsqnaptautullifanartsickbeardtvseriesplexswizzinembyseedboxtvdbnzbgetsubtitlewebui
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
102 lines
4.8 KiB
102 lines
4.8 KiB
import datetime
|
|
import ssl
|
|
from io import BytesIO
|
|
from tornado_py3 import gen as gen, httputil as httputil
|
|
from tornado_py3.concurrent import Future as Future, future_set_exception_unless_cancelled as future_set_exception_unless_cancelled, future_set_result_unless_cancelled as future_set_result_unless_cancelled
|
|
from tornado_py3.escape import native_str as native_str, utf8 as utf8
|
|
from tornado_py3.ioloop import IOLoop as IOLoop
|
|
from tornado_py3.util import Configurable as Configurable
|
|
from typing import Any, Awaitable, Callable, Dict, Optional, Type, Union
|
|
|
|
class HTTPClient:
|
|
def __init__(self, async_client_class: Type[AsyncHTTPClient]=..., **kwargs: Any) -> None: ...
|
|
def __del__(self) -> None: ...
|
|
def close(self) -> None: ...
|
|
def fetch(self, request: Union[HTTPRequest, str], **kwargs: Any) -> HTTPResponse: ...
|
|
|
|
class AsyncHTTPClient(Configurable):
|
|
@classmethod
|
|
def configurable_base(cls: Any) -> Type[Configurable]: ...
|
|
@classmethod
|
|
def configurable_default(cls: Any) -> Type[Configurable]: ...
|
|
def __new__(cls: Any, force_instance: bool=..., **kwargs: Any) -> AsyncHTTPClient: ...
|
|
io_loop: Any = ...
|
|
defaults: Any = ...
|
|
def initialize(self, defaults: Dict[str, Any]=...) -> None: ...
|
|
def close(self) -> None: ...
|
|
def fetch(self, request: Union[str, HTTPRequest], raise_error: bool=..., **kwargs: Any) -> Awaitable[HTTPResponse]: ...
|
|
def fetch_impl(self, request: HTTPRequest, callback: Callable[[HTTPResponse], None]) -> None: ...
|
|
@classmethod
|
|
def configure(cls: Any, impl: Union[None, str, Type[Configurable]], **kwargs: Any) -> None: ...
|
|
|
|
class HTTPRequest:
|
|
proxy_host: Any = ...
|
|
proxy_port: Any = ...
|
|
proxy_username: Any = ...
|
|
proxy_password: Any = ...
|
|
proxy_auth_mode: Any = ...
|
|
url: Any = ...
|
|
method: Any = ...
|
|
body_producer: Any = ...
|
|
auth_username: Any = ...
|
|
auth_password: Any = ...
|
|
auth_mode: Any = ...
|
|
connect_timeout: Any = ...
|
|
request_timeout: Any = ...
|
|
follow_redirects: Any = ...
|
|
max_redirects: Any = ...
|
|
user_agent: Any = ...
|
|
decompress_response: Any = ...
|
|
network_interface: Any = ...
|
|
streaming_callback: Any = ...
|
|
header_callback: Any = ...
|
|
prepare_curl_callback: Any = ...
|
|
allow_nonstandard_methods: Any = ...
|
|
validate_cert: Any = ...
|
|
ca_certs: Any = ...
|
|
allow_ipv6: Any = ...
|
|
client_key: Any = ...
|
|
client_cert: Any = ...
|
|
ssl_options: Any = ...
|
|
expect_100_continue: Any = ...
|
|
start_time: Any = ...
|
|
def __init__(self, url: str, method: str=..., headers: Union[Dict[str, str], httputil.HTTPHeaders]=..., body: Union[bytes, str]=..., auth_username: str=..., auth_password: str=..., auth_mode: str=..., connect_timeout: float=..., request_timeout: float=..., if_modified_since: Union[float, datetime.datetime]=..., follow_redirects: bool=..., max_redirects: int=..., user_agent: str=..., use_gzip: bool=..., network_interface: str=..., streaming_callback: Callable[[bytes], None]=..., header_callback: Callable[[str], None]=..., prepare_curl_callback: Callable[[Any], None]=..., proxy_host: str=..., proxy_port: int=..., proxy_username: str=..., proxy_password: str=..., proxy_auth_mode: str=..., allow_nonstandard_methods: bool=..., validate_cert: bool=..., ca_certs: str=..., allow_ipv6: bool=..., client_key: str=..., client_cert: str=..., body_producer: Callable[[Callable[[bytes], None]], Future[None]]=..., expect_100_continue: bool=..., decompress_response: bool=..., ssl_options: Union[Dict[str, Any], ssl.SSLContext]=...) -> None: ...
|
|
@property
|
|
def headers(self) -> httputil.HTTPHeaders: ...
|
|
@headers.setter
|
|
def headers(self, value: Union[Dict[str, str], httputil.HTTPHeaders]) -> None: ...
|
|
@property
|
|
def body(self) -> bytes: ...
|
|
@body.setter
|
|
def body(self, value: Union[bytes, str]) -> None: ...
|
|
|
|
class HTTPResponse:
|
|
error: Optional[BaseException] = ...
|
|
request: HTTPRequest = ...
|
|
code: Any = ...
|
|
reason: Any = ...
|
|
headers: Any = ...
|
|
buffer: Any = ...
|
|
effective_url: Any = ...
|
|
start_time: Any = ...
|
|
request_time: Any = ...
|
|
time_info: Any = ...
|
|
def __init__(self, request: HTTPRequest, code: int, headers: httputil.HTTPHeaders=..., buffer: BytesIO=..., effective_url: str=..., error: BaseException=..., request_time: float=..., time_info: Dict[str, float]=..., reason: str=..., start_time: float=...) -> None: ...
|
|
@property
|
|
def body(self) -> bytes: ...
|
|
def rethrow(self) -> None: ...
|
|
|
|
class HTTPClientError(Exception):
|
|
code: Any = ...
|
|
message: Any = ...
|
|
response: Any = ...
|
|
def __init__(self, code: int, message: str=..., response: HTTPResponse=...) -> None: ...
|
|
HTTPError = HTTPClientError
|
|
|
|
class _RequestProxy:
|
|
request: Any = ...
|
|
defaults: Any = ...
|
|
def __init__(self, request: HTTPRequest, defaults: Optional[Dict[str, Any]]) -> None: ...
|
|
def __getattr__(self, name: str) -> Any: ...
|
|
|
|
def main() -> None: ...
|
|
|