|
@ -26,6 +26,7 @@ import os |
|
|
import struct |
|
|
import struct |
|
|
import tornado.escape |
|
|
import tornado.escape |
|
|
import tornado.web |
|
|
import tornado.web |
|
|
|
|
|
import zlib |
|
|
|
|
|
|
|
|
from tornado.concurrent import TracebackFuture |
|
|
from tornado.concurrent import TracebackFuture |
|
|
from tornado.escape import utf8, native_str, to_unicode |
|
|
from tornado.escape import utf8, native_str, to_unicode |
|
@ -35,7 +36,7 @@ from tornado.iostream import StreamClosedError |
|
|
from tornado.log import gen_log, app_log |
|
|
from tornado.log import gen_log, app_log |
|
|
from tornado import simple_httpclient |
|
|
from tornado import simple_httpclient |
|
|
from tornado.tcpclient import TCPClient |
|
|
from tornado.tcpclient import TCPClient |
|
|
from tornado.util import bytes_type, _websocket_mask |
|
|
from tornado.util import _websocket_mask |
|
|
|
|
|
|
|
|
try: |
|
|
try: |
|
|
from urllib.parse import urlparse # py2 |
|
|
from urllib.parse import urlparse # py2 |
|
@ -171,13 +172,15 @@ class WebSocketHandler(tornado.web.RequestHandler): |
|
|
self.stream.set_close_callback(self.on_connection_close) |
|
|
self.stream.set_close_callback(self.on_connection_close) |
|
|
|
|
|
|
|
|
if self.request.headers.get("Sec-WebSocket-Version") in ("7", "8", "13"): |
|
|
if self.request.headers.get("Sec-WebSocket-Version") in ("7", "8", "13"): |
|
|
self.ws_connection = WebSocketProtocol13(self) |
|
|
self.ws_connection = WebSocketProtocol13( |
|
|
|
|
|
self, compression_options=self.get_compression_options()) |
|
|
self.ws_connection.accept_connection() |
|
|
self.ws_connection.accept_connection() |
|
|
else: |
|
|
else: |
|
|
self.stream.write(tornado.escape.utf8( |
|
|
if not self.stream.closed(): |
|
|
"HTTP/1.1 426 Upgrade Required\r\n" |
|
|
self.stream.write(tornado.escape.utf8( |
|
|
"Sec-WebSocket-Version: 8\r\n\r\n")) |
|
|
"HTTP/1.1 426 Upgrade Required\r\n" |
|
|
self.stream.close() |
|
|
"Sec-WebSocket-Version: 8\r\n\r\n")) |
|
|
|
|
|
self.stream.close() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def write_message(self, message, binary=False): |
|
|
def write_message(self, message, binary=False): |
|
@ -213,6 +216,19 @@ class WebSocketHandler(tornado.web.RequestHandler): |
|
|
""" |
|
|
""" |
|
|
return None |
|
|
return None |
|
|
|
|
|
|
|
|
|
|
|
def get_compression_options(self): |
|
|
|
|
|
"""Override to return compression options for the connection. |
|
|
|
|
|
|
|
|
|
|
|
If this method returns None (the default), compression will |
|
|
|
|
|
be disabled. If it returns a dict (even an empty one), it |
|
|
|
|
|
will be enabled. The contents of the dict may be used to |
|
|
|
|
|
control the memory and CPU usage of the compression, |
|
|
|
|
|
but no such options are currently implemented. |
|
|
|
|
|
|
|
|
|
|
|
.. versionadded:: 4.1 |
|
|
|
|
|
""" |
|
|
|
|
|
return None |
|
|
|
|
|
|
|
|
def open(self): |
|
|
def open(self): |
|
|
"""Invoked when a new WebSocket is opened. |
|
|
"""Invoked when a new WebSocket is opened. |
|
|
|
|
|
|
|
@ -336,6 +352,15 @@ class WebSocketHandler(tornado.web.RequestHandler): |
|
|
self.ws_connection = None |
|
|
self.ws_connection = None |
|
|
self.on_close() |
|
|
self.on_close() |
|
|
|
|
|
|
|
|
|
|
|
def send_error(self, *args, **kwargs): |
|
|
|
|
|
if self.stream is None: |
|
|
|
|
|
super(WebSocketHandler, self).send_error(*args, **kwargs) |
|
|
|
|
|
else: |
|
|
|
|
|
# If we get an uncaught exception during the handshake, |
|
|
|
|
|
# we have no choice but to abruptly close the connection. |
|
|
|
|
|
# TODO: for uncaught exceptions after the handshake, |
|
|
|
|
|
# we can close the connection more gracefully. |
|
|
|
|
|
self.stream.close() |
|
|
|
|
|
|
|
|
def _wrap_method(method): |
|
|
def _wrap_method(method): |
|
|
def _disallow_for_websocket(self, *args, **kwargs): |
|
|
def _disallow_for_websocket(self, *args, **kwargs): |
|
@ -344,7 +369,7 @@ def _wrap_method(method): |
|
|
else: |
|
|
else: |
|
|
raise RuntimeError("Method not supported for Web Sockets") |
|
|
raise RuntimeError("Method not supported for Web Sockets") |
|
|
return _disallow_for_websocket |
|
|
return _disallow_for_websocket |
|
|
for method in ["write", "redirect", "set_header", "send_error", "set_cookie", |
|
|
for method in ["write", "redirect", "set_header", "set_cookie", |
|
|
"set_status", "flush", "finish"]: |
|
|
"set_status", "flush", "finish"]: |
|
|
setattr(WebSocketHandler, method, |
|
|
setattr(WebSocketHandler, method, |
|
|
_wrap_method(getattr(WebSocketHandler, method))) |
|
|
_wrap_method(getattr(WebSocketHandler, method))) |
|
@ -383,13 +408,68 @@ class WebSocketProtocol(object): |
|
|
self.close() # let the subclass cleanup |
|
|
self.close() # let the subclass cleanup |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class _PerMessageDeflateCompressor(object): |
|
|
|
|
|
def __init__(self, persistent, max_wbits): |
|
|
|
|
|
if max_wbits is None: |
|
|
|
|
|
max_wbits = zlib.MAX_WBITS |
|
|
|
|
|
# There is no symbolic constant for the minimum wbits value. |
|
|
|
|
|
if not (8 <= max_wbits <= zlib.MAX_WBITS): |
|
|
|
|
|
raise ValueError("Invalid max_wbits value %r; allowed range 8-%d", |
|
|
|
|
|
max_wbits, zlib.MAX_WBITS) |
|
|
|
|
|
self._max_wbits = max_wbits |
|
|
|
|
|
if persistent: |
|
|
|
|
|
self._compressor = self._create_compressor() |
|
|
|
|
|
else: |
|
|
|
|
|
self._compressor = None |
|
|
|
|
|
|
|
|
|
|
|
def _create_compressor(self): |
|
|
|
|
|
return zlib.compressobj(-1, zlib.DEFLATED, -self._max_wbits) |
|
|
|
|
|
|
|
|
|
|
|
def compress(self, data): |
|
|
|
|
|
compressor = self._compressor or self._create_compressor() |
|
|
|
|
|
data = (compressor.compress(data) + |
|
|
|
|
|
compressor.flush(zlib.Z_SYNC_FLUSH)) |
|
|
|
|
|
assert data.endswith(b'\x00\x00\xff\xff') |
|
|
|
|
|
return data[:-4] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class _PerMessageDeflateDecompressor(object): |
|
|
|
|
|
def __init__(self, persistent, max_wbits): |
|
|
|
|
|
if max_wbits is None: |
|
|
|
|
|
max_wbits = zlib.MAX_WBITS |
|
|
|
|
|
if not (8 <= max_wbits <= zlib.MAX_WBITS): |
|
|
|
|
|
raise ValueError("Invalid max_wbits value %r; allowed range 8-%d", |
|
|
|
|
|
max_wbits, zlib.MAX_WBITS) |
|
|
|
|
|
self._max_wbits = max_wbits |
|
|
|
|
|
if persistent: |
|
|
|
|
|
self._decompressor = self._create_decompressor() |
|
|
|
|
|
else: |
|
|
|
|
|
self._decompressor = None |
|
|
|
|
|
|
|
|
|
|
|
def _create_decompressor(self): |
|
|
|
|
|
return zlib.decompressobj(-self._max_wbits) |
|
|
|
|
|
|
|
|
|
|
|
def decompress(self, data): |
|
|
|
|
|
decompressor = self._decompressor or self._create_decompressor() |
|
|
|
|
|
return decompressor.decompress(data + b'\x00\x00\xff\xff') |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class WebSocketProtocol13(WebSocketProtocol): |
|
|
class WebSocketProtocol13(WebSocketProtocol): |
|
|
"""Implementation of the WebSocket protocol from RFC 6455. |
|
|
"""Implementation of the WebSocket protocol from RFC 6455. |
|
|
|
|
|
|
|
|
This class supports versions 7 and 8 of the protocol in addition to the |
|
|
This class supports versions 7 and 8 of the protocol in addition to the |
|
|
final version 13. |
|
|
final version 13. |
|
|
""" |
|
|
""" |
|
|
def __init__(self, handler, mask_outgoing=False): |
|
|
# Bit masks for the first byte of a frame. |
|
|
|
|
|
FIN = 0x80 |
|
|
|
|
|
RSV1 = 0x40 |
|
|
|
|
|
RSV2 = 0x20 |
|
|
|
|
|
RSV3 = 0x10 |
|
|
|
|
|
RSV_MASK = RSV1 | RSV2 | RSV3 |
|
|
|
|
|
OPCODE_MASK = 0x0f |
|
|
|
|
|
|
|
|
|
|
|
def __init__(self, handler, mask_outgoing=False, |
|
|
|
|
|
compression_options=None): |
|
|
WebSocketProtocol.__init__(self, handler) |
|
|
WebSocketProtocol.__init__(self, handler) |
|
|
self.mask_outgoing = mask_outgoing |
|
|
self.mask_outgoing = mask_outgoing |
|
|
self._final_frame = False |
|
|
self._final_frame = False |
|
@ -400,6 +480,19 @@ class WebSocketProtocol13(WebSocketProtocol): |
|
|
self._fragmented_message_buffer = None |
|
|
self._fragmented_message_buffer = None |
|
|
self._fragmented_message_opcode = None |
|
|
self._fragmented_message_opcode = None |
|
|
self._waiting = None |
|
|
self._waiting = None |
|
|
|
|
|
self._compression_options = compression_options |
|
|
|
|
|
self._decompressor = None |
|
|
|
|
|
self._compressor = None |
|
|
|
|
|
self._frame_compressed = None |
|
|
|
|
|
# The total uncompressed size of all messages received or sent. |
|
|
|
|
|
# Unicode messages are encoded to utf8. |
|
|
|
|
|
# Only for testing; subject to change. |
|
|
|
|
|
self._message_bytes_in = 0 |
|
|
|
|
|
self._message_bytes_out = 0 |
|
|
|
|
|
# The total size of all packets received or sent. Includes |
|
|
|
|
|
# the effect of compression, frame overhead, and control frames. |
|
|
|
|
|
self._wire_bytes_in = 0 |
|
|
|
|
|
self._wire_bytes_out = 0 |
|
|
|
|
|
|
|
|
def accept_connection(self): |
|
|
def accept_connection(self): |
|
|
try: |
|
|
try: |
|
@ -444,24 +537,99 @@ class WebSocketProtocol13(WebSocketProtocol): |
|
|
assert selected in subprotocols |
|
|
assert selected in subprotocols |
|
|
subprotocol_header = "Sec-WebSocket-Protocol: %s\r\n" % selected |
|
|
subprotocol_header = "Sec-WebSocket-Protocol: %s\r\n" % selected |
|
|
|
|
|
|
|
|
|
|
|
extension_header = '' |
|
|
|
|
|
extensions = self._parse_extensions_header(self.request.headers) |
|
|
|
|
|
for ext in extensions: |
|
|
|
|
|
if (ext[0] == 'permessage-deflate' and |
|
|
|
|
|
self._compression_options is not None): |
|
|
|
|
|
# TODO: negotiate parameters if compression_options |
|
|
|
|
|
# specifies limits. |
|
|
|
|
|
self._create_compressors('server', ext[1]) |
|
|
|
|
|
if ('client_max_window_bits' in ext[1] and |
|
|
|
|
|
ext[1]['client_max_window_bits'] is None): |
|
|
|
|
|
# Don't echo an offered client_max_window_bits |
|
|
|
|
|
# parameter with no value. |
|
|
|
|
|
del ext[1]['client_max_window_bits'] |
|
|
|
|
|
extension_header = ('Sec-WebSocket-Extensions: %s\r\n' % |
|
|
|
|
|
httputil._encode_header( |
|
|
|
|
|
'permessage-deflate', ext[1])) |
|
|
|
|
|
break |
|
|
|
|
|
|
|
|
|
|
|
if self.stream.closed(): |
|
|
|
|
|
self._abort() |
|
|
|
|
|
return |
|
|
self.stream.write(tornado.escape.utf8( |
|
|
self.stream.write(tornado.escape.utf8( |
|
|
"HTTP/1.1 101 Switching Protocols\r\n" |
|
|
"HTTP/1.1 101 Switching Protocols\r\n" |
|
|
"Upgrade: websocket\r\n" |
|
|
"Upgrade: websocket\r\n" |
|
|
"Connection: Upgrade\r\n" |
|
|
"Connection: Upgrade\r\n" |
|
|
"Sec-WebSocket-Accept: %s\r\n" |
|
|
"Sec-WebSocket-Accept: %s\r\n" |
|
|
"%s" |
|
|
"%s%s" |
|
|
"\r\n" % (self._challenge_response(), subprotocol_header))) |
|
|
"\r\n" % (self._challenge_response(), |
|
|
|
|
|
subprotocol_header, extension_header))) |
|
|
|
|
|
|
|
|
self._run_callback(self.handler.open, *self.handler.open_args, |
|
|
self._run_callback(self.handler.open, *self.handler.open_args, |
|
|
**self.handler.open_kwargs) |
|
|
**self.handler.open_kwargs) |
|
|
self._receive_frame() |
|
|
self._receive_frame() |
|
|
|
|
|
|
|
|
def _write_frame(self, fin, opcode, data): |
|
|
def _parse_extensions_header(self, headers): |
|
|
|
|
|
extensions = headers.get("Sec-WebSocket-Extensions", '') |
|
|
|
|
|
if extensions: |
|
|
|
|
|
return [httputil._parse_header(e.strip()) |
|
|
|
|
|
for e in extensions.split(',')] |
|
|
|
|
|
return [] |
|
|
|
|
|
|
|
|
|
|
|
def _process_server_headers(self, key, headers): |
|
|
|
|
|
"""Process the headers sent by the server to this client connection. |
|
|
|
|
|
|
|
|
|
|
|
'key' is the websocket handshake challenge/response key. |
|
|
|
|
|
""" |
|
|
|
|
|
assert headers['Upgrade'].lower() == 'websocket' |
|
|
|
|
|
assert headers['Connection'].lower() == 'upgrade' |
|
|
|
|
|
accept = self.compute_accept_value(key) |
|
|
|
|
|
assert headers['Sec-Websocket-Accept'] == accept |
|
|
|
|
|
|
|
|
|
|
|
extensions = self._parse_extensions_header(headers) |
|
|
|
|
|
for ext in extensions: |
|
|
|
|
|
if (ext[0] == 'permessage-deflate' and |
|
|
|
|
|
self._compression_options is not None): |
|
|
|
|
|
self._create_compressors('client', ext[1]) |
|
|
|
|
|
else: |
|
|
|
|
|
raise ValueError("unsupported extension %r", ext) |
|
|
|
|
|
|
|
|
|
|
|
def _get_compressor_options(self, side, agreed_parameters): |
|
|
|
|
|
"""Converts a websocket agreed_parameters set to keyword arguments |
|
|
|
|
|
for our compressor objects. |
|
|
|
|
|
""" |
|
|
|
|
|
options = dict( |
|
|
|
|
|
persistent=(side + '_no_context_takeover') not in agreed_parameters) |
|
|
|
|
|
wbits_header = agreed_parameters.get(side + '_max_window_bits', None) |
|
|
|
|
|
if wbits_header is None: |
|
|
|
|
|
options['max_wbits'] = zlib.MAX_WBITS |
|
|
|
|
|
else: |
|
|
|
|
|
options['max_wbits'] = int(wbits_header) |
|
|
|
|
|
return options |
|
|
|
|
|
|
|
|
|
|
|
def _create_compressors(self, side, agreed_parameters): |
|
|
|
|
|
# TODO: handle invalid parameters gracefully |
|
|
|
|
|
allowed_keys = set(['server_no_context_takeover', |
|
|
|
|
|
'client_no_context_takeover', |
|
|
|
|
|
'server_max_window_bits', |
|
|
|
|
|
'client_max_window_bits']) |
|
|
|
|
|
for key in agreed_parameters: |
|
|
|
|
|
if key not in allowed_keys: |
|
|
|
|
|
raise ValueError("unsupported compression parameter %r" % key) |
|
|
|
|
|
other_side = 'client' if (side == 'server') else 'server' |
|
|
|
|
|
self._compressor = _PerMessageDeflateCompressor( |
|
|
|
|
|
**self._get_compressor_options(side, agreed_parameters)) |
|
|
|
|
|
self._decompressor = _PerMessageDeflateDecompressor( |
|
|
|
|
|
**self._get_compressor_options(other_side, agreed_parameters)) |
|
|
|
|
|
|
|
|
|
|
|
def _write_frame(self, fin, opcode, data, flags=0): |
|
|
if fin: |
|
|
if fin: |
|
|
finbit = 0x80 |
|
|
finbit = self.FIN |
|
|
else: |
|
|
else: |
|
|
finbit = 0 |
|
|
finbit = 0 |
|
|
frame = struct.pack("B", finbit | opcode) |
|
|
frame = struct.pack("B", finbit | opcode | flags) |
|
|
l = len(data) |
|
|
l = len(data) |
|
|
if self.mask_outgoing: |
|
|
if self.mask_outgoing: |
|
|
mask_bit = 0x80 |
|
|
mask_bit = 0x80 |
|
@ -477,7 +645,11 @@ class WebSocketProtocol13(WebSocketProtocol): |
|
|
mask = os.urandom(4) |
|
|
mask = os.urandom(4) |
|
|
data = mask + _websocket_mask(mask, data) |
|
|
data = mask + _websocket_mask(mask, data) |
|
|
frame += data |
|
|
frame += data |
|
|
self.stream.write(frame) |
|
|
self._wire_bytes_out += len(frame) |
|
|
|
|
|
try: |
|
|
|
|
|
self.stream.write(frame) |
|
|
|
|
|
except StreamClosedError: |
|
|
|
|
|
self._abort() |
|
|
|
|
|
|
|
|
def write_message(self, message, binary=False): |
|
|
def write_message(self, message, binary=False): |
|
|
"""Sends the given message to the client of this Web Socket.""" |
|
|
"""Sends the given message to the client of this Web Socket.""" |
|
@ -486,15 +658,17 @@ class WebSocketProtocol13(WebSocketProtocol): |
|
|
else: |
|
|
else: |
|
|
opcode = 0x1 |
|
|
opcode = 0x1 |
|
|
message = tornado.escape.utf8(message) |
|
|
message = tornado.escape.utf8(message) |
|
|
assert isinstance(message, bytes_type) |
|
|
assert isinstance(message, bytes) |
|
|
try: |
|
|
self._message_bytes_out += len(message) |
|
|
self._write_frame(True, opcode, message) |
|
|
flags = 0 |
|
|
except StreamClosedError: |
|
|
if self._compressor: |
|
|
self._abort() |
|
|
message = self._compressor.compress(message) |
|
|
|
|
|
flags |= self.RSV1 |
|
|
|
|
|
self._write_frame(True, opcode, message, flags=flags) |
|
|
|
|
|
|
|
|
def write_ping(self, data): |
|
|
def write_ping(self, data): |
|
|
"""Send ping frame.""" |
|
|
"""Send ping frame.""" |
|
|
assert isinstance(data, bytes_type) |
|
|
assert isinstance(data, bytes) |
|
|
self._write_frame(True, 0x9, data) |
|
|
self._write_frame(True, 0x9, data) |
|
|
|
|
|
|
|
|
def _receive_frame(self): |
|
|
def _receive_frame(self): |
|
@ -504,11 +678,15 @@ class WebSocketProtocol13(WebSocketProtocol): |
|
|
self._abort() |
|
|
self._abort() |
|
|
|
|
|
|
|
|
def _on_frame_start(self, data): |
|
|
def _on_frame_start(self, data): |
|
|
|
|
|
self._wire_bytes_in += len(data) |
|
|
header, payloadlen = struct.unpack("BB", data) |
|
|
header, payloadlen = struct.unpack("BB", data) |
|
|
self._final_frame = header & 0x80 |
|
|
self._final_frame = header & self.FIN |
|
|
reserved_bits = header & 0x70 |
|
|
reserved_bits = header & self.RSV_MASK |
|
|
self._frame_opcode = header & 0xf |
|
|
self._frame_opcode = header & self.OPCODE_MASK |
|
|
self._frame_opcode_is_control = self._frame_opcode & 0x8 |
|
|
self._frame_opcode_is_control = self._frame_opcode & 0x8 |
|
|
|
|
|
if self._decompressor is not None: |
|
|
|
|
|
self._frame_compressed = bool(reserved_bits & self.RSV1) |
|
|
|
|
|
reserved_bits &= ~self.RSV1 |
|
|
if reserved_bits: |
|
|
if reserved_bits: |
|
|
# client is using as-yet-undefined extensions; abort |
|
|
# client is using as-yet-undefined extensions; abort |
|
|
self._abort() |
|
|
self._abort() |
|
@ -534,6 +712,7 @@ class WebSocketProtocol13(WebSocketProtocol): |
|
|
self._abort() |
|
|
self._abort() |
|
|
|
|
|
|
|
|
def _on_frame_length_16(self, data): |
|
|
def _on_frame_length_16(self, data): |
|
|
|
|
|
self._wire_bytes_in += len(data) |
|
|
self._frame_length = struct.unpack("!H", data)[0] |
|
|
self._frame_length = struct.unpack("!H", data)[0] |
|
|
try: |
|
|
try: |
|
|
if self._masked_frame: |
|
|
if self._masked_frame: |
|
@ -544,6 +723,7 @@ class WebSocketProtocol13(WebSocketProtocol): |
|
|
self._abort() |
|
|
self._abort() |
|
|
|
|
|
|
|
|
def _on_frame_length_64(self, data): |
|
|
def _on_frame_length_64(self, data): |
|
|
|
|
|
self._wire_bytes_in += len(data) |
|
|
self._frame_length = struct.unpack("!Q", data)[0] |
|
|
self._frame_length = struct.unpack("!Q", data)[0] |
|
|
try: |
|
|
try: |
|
|
if self._masked_frame: |
|
|
if self._masked_frame: |
|
@ -554,6 +734,7 @@ class WebSocketProtocol13(WebSocketProtocol): |
|
|
self._abort() |
|
|
self._abort() |
|
|
|
|
|
|
|
|
def _on_masking_key(self, data): |
|
|
def _on_masking_key(self, data): |
|
|
|
|
|
self._wire_bytes_in += len(data) |
|
|
self._frame_mask = data |
|
|
self._frame_mask = data |
|
|
try: |
|
|
try: |
|
|
self.stream.read_bytes(self._frame_length, self._on_masked_frame_data) |
|
|
self.stream.read_bytes(self._frame_length, self._on_masked_frame_data) |
|
@ -561,9 +742,11 @@ class WebSocketProtocol13(WebSocketProtocol): |
|
|
self._abort() |
|
|
self._abort() |
|
|
|
|
|
|
|
|
def _on_masked_frame_data(self, data): |
|
|
def _on_masked_frame_data(self, data): |
|
|
|
|
|
# Don't touch _wire_bytes_in; we'll do it in _on_frame_data. |
|
|
self._on_frame_data(_websocket_mask(self._frame_mask, data)) |
|
|
self._on_frame_data(_websocket_mask(self._frame_mask, data)) |
|
|
|
|
|
|
|
|
def _on_frame_data(self, data): |
|
|
def _on_frame_data(self, data): |
|
|
|
|
|
self._wire_bytes_in += len(data) |
|
|
if self._frame_opcode_is_control: |
|
|
if self._frame_opcode_is_control: |
|
|
# control frames may be interleaved with a series of fragmented |
|
|
# control frames may be interleaved with a series of fragmented |
|
|
# data frames, so control frames must not interact with |
|
|
# data frames, so control frames must not interact with |
|
@ -604,8 +787,12 @@ class WebSocketProtocol13(WebSocketProtocol): |
|
|
if self.client_terminated: |
|
|
if self.client_terminated: |
|
|
return |
|
|
return |
|
|
|
|
|
|
|
|
|
|
|
if self._frame_compressed: |
|
|
|
|
|
data = self._decompressor.decompress(data) |
|
|
|
|
|
|
|
|
if opcode == 0x1: |
|
|
if opcode == 0x1: |
|
|
# UTF-8 data |
|
|
# UTF-8 data |
|
|
|
|
|
self._message_bytes_in += len(data) |
|
|
try: |
|
|
try: |
|
|
decoded = data.decode("utf-8") |
|
|
decoded = data.decode("utf-8") |
|
|
except UnicodeDecodeError: |
|
|
except UnicodeDecodeError: |
|
@ -614,6 +801,7 @@ class WebSocketProtocol13(WebSocketProtocol): |
|
|
self._run_callback(self.handler.on_message, decoded) |
|
|
self._run_callback(self.handler.on_message, decoded) |
|
|
elif opcode == 0x2: |
|
|
elif opcode == 0x2: |
|
|
# Binary data |
|
|
# Binary data |
|
|
|
|
|
self._message_bytes_in += len(data) |
|
|
self._run_callback(self.handler.on_message, data) |
|
|
self._run_callback(self.handler.on_message, data) |
|
|
elif opcode == 0x8: |
|
|
elif opcode == 0x8: |
|
|
# Close |
|
|
# Close |
|
@ -664,7 +852,8 @@ class WebSocketClientConnection(simple_httpclient._HTTPConnection): |
|
|
This class should not be instantiated directly; use the |
|
|
This class should not be instantiated directly; use the |
|
|
`websocket_connect` function instead. |
|
|
`websocket_connect` function instead. |
|
|
""" |
|
|
""" |
|
|
def __init__(self, io_loop, request): |
|
|
def __init__(self, io_loop, request, compression_options=None): |
|
|
|
|
|
self.compression_options = compression_options |
|
|
self.connect_future = TracebackFuture() |
|
|
self.connect_future = TracebackFuture() |
|
|
self.read_future = None |
|
|
self.read_future = None |
|
|
self.read_queue = collections.deque() |
|
|
self.read_queue = collections.deque() |
|
@ -679,6 +868,14 @@ class WebSocketClientConnection(simple_httpclient._HTTPConnection): |
|
|
'Sec-WebSocket-Key': self.key, |
|
|
'Sec-WebSocket-Key': self.key, |
|
|
'Sec-WebSocket-Version': '13', |
|
|
'Sec-WebSocket-Version': '13', |
|
|
}) |
|
|
}) |
|
|
|
|
|
if self.compression_options is not None: |
|
|
|
|
|
# Always offer to let the server set our max_wbits (and even though |
|
|
|
|
|
# we don't offer it, we will accept a client_no_context_takeover |
|
|
|
|
|
# from the server). |
|
|
|
|
|
# TODO: set server parameters for deflate extension |
|
|
|
|
|
# if requested in self.compression_options. |
|
|
|
|
|
request.headers['Sec-WebSocket-Extensions'] = ( |
|
|
|
|
|
'permessage-deflate; client_max_window_bits') |
|
|
|
|
|
|
|
|
self.tcp_client = TCPClient(io_loop=io_loop) |
|
|
self.tcp_client = TCPClient(io_loop=io_loop) |
|
|
super(WebSocketClientConnection, self).__init__( |
|
|
super(WebSocketClientConnection, self).__init__( |
|
@ -722,12 +919,10 @@ class WebSocketClientConnection(simple_httpclient._HTTPConnection): |
|
|
start_line, headers) |
|
|
start_line, headers) |
|
|
|
|
|
|
|
|
self.headers = headers |
|
|
self.headers = headers |
|
|
assert self.headers['Upgrade'].lower() == 'websocket' |
|
|
self.protocol = WebSocketProtocol13( |
|
|
assert self.headers['Connection'].lower() == 'upgrade' |
|
|
self, mask_outgoing=True, |
|
|
accept = WebSocketProtocol13.compute_accept_value(self.key) |
|
|
compression_options=self.compression_options) |
|
|
assert self.headers['Sec-Websocket-Accept'] == accept |
|
|
self.protocol._process_server_headers(self.key, self.headers) |
|
|
|
|
|
|
|
|
self.protocol = WebSocketProtocol13(self, mask_outgoing=True) |
|
|
|
|
|
self.protocol._receive_frame() |
|
|
self.protocol._receive_frame() |
|
|
|
|
|
|
|
|
if self._timeout is not None: |
|
|
if self._timeout is not None: |
|
@ -777,14 +972,21 @@ class WebSocketClientConnection(simple_httpclient._HTTPConnection): |
|
|
pass |
|
|
pass |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def websocket_connect(url, io_loop=None, callback=None, connect_timeout=None): |
|
|
def websocket_connect(url, io_loop=None, callback=None, connect_timeout=None, |
|
|
|
|
|
compression_options=None): |
|
|
"""Client-side websocket support. |
|
|
"""Client-side websocket support. |
|
|
|
|
|
|
|
|
Takes a url and returns a Future whose result is a |
|
|
Takes a url and returns a Future whose result is a |
|
|
`WebSocketClientConnection`. |
|
|
`WebSocketClientConnection`. |
|
|
|
|
|
|
|
|
|
|
|
``compression_options`` is interpreted in the same way as the |
|
|
|
|
|
return value of `.WebSocketHandler.get_compression_options`. |
|
|
|
|
|
|
|
|
.. versionchanged:: 3.2 |
|
|
.. versionchanged:: 3.2 |
|
|
Also accepts ``HTTPRequest`` objects in place of urls. |
|
|
Also accepts ``HTTPRequest`` objects in place of urls. |
|
|
|
|
|
|
|
|
|
|
|
.. versionchanged:: 4.1 |
|
|
|
|
|
Added ``compression_options``. |
|
|
""" |
|
|
""" |
|
|
if io_loop is None: |
|
|
if io_loop is None: |
|
|
io_loop = IOLoop.current() |
|
|
io_loop = IOLoop.current() |
|
@ -798,7 +1000,7 @@ def websocket_connect(url, io_loop=None, callback=None, connect_timeout=None): |
|
|
request = httpclient.HTTPRequest(url, connect_timeout=connect_timeout) |
|
|
request = httpclient.HTTPRequest(url, connect_timeout=connect_timeout) |
|
|
request = httpclient._RequestProxy( |
|
|
request = httpclient._RequestProxy( |
|
|
request, httpclient.HTTPRequest._DEFAULTS) |
|
|
request, httpclient.HTTPRequest._DEFAULTS) |
|
|
conn = WebSocketClientConnection(io_loop, request) |
|
|
conn = WebSocketClientConnection(io_loop, request, compression_options) |
|
|
if callback is not None: |
|
|
if callback is not None: |
|
|
io_loop.add_future(conn.connect_future, callback) |
|
|
io_loop.add_future(conn.connect_future, callback) |
|
|
return conn.connect_future |
|
|
return conn.connect_future |
|
|