|
|
@ -46,6 +46,14 @@ try: |
|
|
|
except ImportError: |
|
|
|
_set_nonblocking = None |
|
|
|
|
|
|
|
# These errnos indicate that a non-blocking operation must be retried |
|
|
|
# at a later time. On most platforms they're the same value, but on |
|
|
|
# some they differ. |
|
|
|
_ERRNO_WOULDBLOCK = (errno.EWOULDBLOCK, errno.EAGAIN) |
|
|
|
|
|
|
|
# These errnos indicate that a connection has been abruptly terminated. |
|
|
|
# They should be caught and handled less noisily than other errors. |
|
|
|
_ERRNO_CONNRESET = (errno.ECONNRESET, errno.ECONNABORTED, errno.EPIPE) |
|
|
|
|
|
|
|
class StreamClosedError(IOError): |
|
|
|
"""Exception raised by `IOStream` methods when the stream is closed. |
|
|
@ -257,15 +265,19 @@ class BaseIOStream(object): |
|
|
|
self._maybe_run_close_callback() |
|
|
|
|
|
|
|
def _maybe_run_close_callback(self): |
|
|
|
if (self.closed() and self._close_callback and |
|
|
|
self._pending_callbacks == 0): |
|
|
|
# if there are pending callbacks, don't run the close callback |
|
|
|
# until they're done (see _maybe_add_error_handler) |
|
|
|
cb = self._close_callback |
|
|
|
self._close_callback = None |
|
|
|
self._run_callback(cb) |
|
|
|
# If there are pending callbacks, don't run the close callback |
|
|
|
# until they're done (see _maybe_add_error_handler) |
|
|
|
if self.closed() and self._pending_callbacks == 0: |
|
|
|
if self._close_callback is not None: |
|
|
|
cb = self._close_callback |
|
|
|
self._close_callback = None |
|
|
|
self._run_callback(cb) |
|
|
|
# Delete any unfinished callbacks to break up reference cycles. |
|
|
|
self._read_callback = self._write_callback = None |
|
|
|
# Clear the buffers so they can be cleared immediately even |
|
|
|
# if the IOStream object is kept alive by a reference cycle. |
|
|
|
# TODO: Clear the read buffer too; it currently breaks some tests. |
|
|
|
self._write_buffer = None |
|
|
|
|
|
|
|
def reading(self): |
|
|
|
"""Returns true if we are currently reading from the stream.""" |
|
|
@ -447,7 +459,7 @@ class BaseIOStream(object): |
|
|
|
chunk = self.read_from_fd() |
|
|
|
except (socket.error, IOError, OSError) as e: |
|
|
|
# ssl.SSLError is a subclass of socket.error |
|
|
|
if e.args[0] == errno.ECONNRESET: |
|
|
|
if e.args[0] in _ERRNO_CONNRESET: |
|
|
|
# Treat ECONNRESET as a connection close rather than |
|
|
|
# an error to minimize log spam (the exception will |
|
|
|
# be available on self.error for apps that care). |
|
|
@ -550,12 +562,12 @@ class BaseIOStream(object): |
|
|
|
self._write_buffer_frozen = False |
|
|
|
_merge_prefix(self._write_buffer, num_bytes) |
|
|
|
self._write_buffer.popleft() |
|
|
|
except socket.error as e: |
|
|
|
if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN): |
|
|
|
except (socket.error, IOError, OSError) as e: |
|
|
|
if e.args[0] in _ERRNO_WOULDBLOCK: |
|
|
|
self._write_buffer_frozen = True |
|
|
|
break |
|
|
|
else: |
|
|
|
if e.args[0] not in (errno.EPIPE, errno.ECONNRESET): |
|
|
|
if e.args[0] not in _ERRNO_CONNRESET: |
|
|
|
# Broken pipe errors are usually caused by connection |
|
|
|
# reset, and its better to not log EPIPE errors to |
|
|
|
# minimize log spam |
|
|
@ -682,7 +694,7 @@ class IOStream(BaseIOStream): |
|
|
|
try: |
|
|
|
chunk = self.socket.recv(self.read_chunk_size) |
|
|
|
except socket.error as e: |
|
|
|
if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN): |
|
|
|
if e.args[0] in _ERRNO_WOULDBLOCK: |
|
|
|
return None |
|
|
|
else: |
|
|
|
raise |
|
|
@ -725,7 +737,8 @@ class IOStream(BaseIOStream): |
|
|
|
# returned immediately when attempting to connect to |
|
|
|
# localhost, so handle them the same way as an error |
|
|
|
# reported later in _handle_connect. |
|
|
|
if e.args[0] not in (errno.EINPROGRESS, errno.EWOULDBLOCK): |
|
|
|
if (e.args[0] != errno.EINPROGRESS and |
|
|
|
e.args[0] not in _ERRNO_WOULDBLOCK): |
|
|
|
gen_log.warning("Connect error on fd %d: %s", |
|
|
|
self.socket.fileno(), e) |
|
|
|
self.close(exc_info=True) |
|
|
@ -789,6 +802,17 @@ class SSLIOStream(IOStream): |
|
|
|
self._ssl_connect_callback = None |
|
|
|
self._server_hostname = None |
|
|
|
|
|
|
|
# If the socket is already connected, attempt to start the handshake. |
|
|
|
try: |
|
|
|
self.socket.getpeername() |
|
|
|
except socket.error: |
|
|
|
pass |
|
|
|
else: |
|
|
|
# Indirectly start the handshake, which will run on the next |
|
|
|
# IOLoop iteration and then the real IO state will be set in |
|
|
|
# _handle_events. |
|
|
|
self._add_io_state(self.io_loop.WRITE) |
|
|
|
|
|
|
|
def reading(self): |
|
|
|
return self._handshake_reading or super(SSLIOStream, self).reading() |
|
|
|
|
|
|
@ -821,7 +845,7 @@ class SSLIOStream(IOStream): |
|
|
|
return self.close(exc_info=True) |
|
|
|
raise |
|
|
|
except socket.error as err: |
|
|
|
if err.args[0] in (errno.ECONNABORTED, errno.ECONNRESET): |
|
|
|
if err.args[0] in _ERRNO_CONNRESET: |
|
|
|
return self.close(exc_info=True) |
|
|
|
except AttributeError: |
|
|
|
# On Linux, if the connection was reset before the call to |
|
|
@ -917,7 +941,7 @@ class SSLIOStream(IOStream): |
|
|
|
else: |
|
|
|
raise |
|
|
|
except socket.error as e: |
|
|
|
if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN): |
|
|
|
if e.args[0] in _ERRNO_WOULDBLOCK: |
|
|
|
return None |
|
|
|
else: |
|
|
|
raise |
|
|
@ -953,7 +977,7 @@ class PipeIOStream(BaseIOStream): |
|
|
|
try: |
|
|
|
chunk = os.read(self.fd, self.read_chunk_size) |
|
|
|
except (IOError, OSError) as e: |
|
|
|
if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN): |
|
|
|
if e.args[0] in _ERRNO_WOULDBLOCK: |
|
|
|
return None |
|
|
|
elif e.args[0] == errno.EBADF: |
|
|
|
# If the writing half of a pipe is closed, select will |
|
|
|