# Author: Ovidiu Predescu # Date: July 2011 # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. # Note: This module's docs are not currently extracted automatically, # so changes must be made manually to twisted.rst # TODO: refactor doc build process to use an appropriate virtualenv """A Twisted reactor built on the Tornado IOLoop. This module lets you run applications and libraries written for Twisted in a Tornado application. To use it, simply call `install` at the beginning of the application:: import tornado.platform.twisted tornado.platform.twisted.install() from twisted.internet import reactor When the app is ready to start, call `IOLoop.instance().start()` instead of `reactor.run()`. This will allow you to use a mixture of Twisted and Tornado code in the same process. It is also possible to create a non-global reactor by calling `tornado.platform.twisted.TornadoReactor(io_loop)`. However, if the `IOLoop` and reactor are to be short-lived (such as those used in unit tests), additional cleanup may be required. Specifically, it is recommended to call:: reactor.fireSystemEvent('shutdown') reactor.disconnectAll() before closing the `IOLoop`. This module has been tested with Twisted versions 11.0.0 and 11.1.0. """ from __future__ import with_statement, absolute_import import functools import logging import time from twisted.internet.posixbase import PosixReactorBase from twisted.internet.interfaces import \ IReactorFDSet, IDelayedCall, IReactorTime from twisted.python import failure, log from twisted.internet import error from zope.interface import implements import tornado import tornado.ioloop from tornado.stack_context import NullContext from tornado.ioloop import IOLoop class TornadoDelayedCall(object): """DelayedCall object for Tornado.""" implements(IDelayedCall) def __init__(self, reactor, seconds, f, *args, **kw): self._reactor = reactor self._func = functools.partial(f, *args, **kw) self._time = self._reactor.seconds() + seconds self._timeout = self._reactor._io_loop.add_timeout(self._time, self._called) self._active = True def _called(self): self._active = False self._reactor._removeDelayedCall(self) try: self._func() except: logging.error("_called caught exception", exc_info=True) def getTime(self): return self._time def cancel(self): self._active = False self._reactor._io_loop.remove_timeout(self._timeout) self._reactor._removeDelayedCall(self) def delay(self, seconds): self._reactor._io_loop.remove_timeout(self._timeout) self._time += seconds self._timeout = self._reactor._io_loop.add_timeout(self._time, self._called) def reset(self, seconds): self._reactor._io_loop.remove_timeout(self._timeout) self._time = self._reactor.seconds() + seconds self._timeout = self._reactor._io_loop.add_timeout(self._time, self._called) def active(self): return self._active class TornadoReactor(PosixReactorBase): """Twisted reactor built on the Tornado IOLoop. Since it is intented to be used in applications where the top-level event loop is ``io_loop.start()`` rather than ``reactor.run()``, it is implemented a little differently than other Twisted reactors. We override `mainLoop` instead of `doIteration` and must implement timed call functionality on top of `IOLoop.add_timeout` rather than using the implementation in `PosixReactorBase`. """ implements(IReactorTime, IReactorFDSet) def __init__(self, io_loop=None): if not io_loop: io_loop = tornado.ioloop.IOLoop.instance() self._io_loop = io_loop self._readers = {} # map of reader objects to fd self._writers = {} # map of writer objects to fd self._fds = {} # a map of fd to a (reader, writer) tuple self._delayedCalls = {} PosixReactorBase.__init__(self) # IOLoop.start() bypasses some of the reactor initialization. # Fire off the necessary events if they weren't already triggered # by reactor.run(). def start_if_necessary(): if not self._started: self.fireSystemEvent('startup') self._io_loop.add_callback(start_if_necessary) # IReactorTime def seconds(self): return time.time() def callLater(self, seconds, f, *args, **kw): dc = TornadoDelayedCall(self, seconds, f, *args, **kw) self._delayedCalls[dc] = True return dc def getDelayedCalls(self): return [x for x in self._delayedCalls if x._active] def _removeDelayedCall(self, dc): if dc in self._delayedCalls: del self._delayedCalls[dc] # IReactorThreads def callFromThread(self, f, *args, **kw): """See `twisted.internet.interfaces.IReactorThreads.callFromThread`""" assert callable(f), "%s is not callable" % f p = functools.partial(f, *args, **kw) self._io_loop.add_callback(p) # We don't need the waker code from the super class, Tornado uses # its own waker. def installWaker(self): pass def wakeUp(self): pass # IReactorFDSet def _invoke_callback(self, fd, events): (reader, writer) = self._fds[fd] if reader: err = None if reader.fileno() == -1: err = error.ConnectionLost() elif events & IOLoop.READ: err = log.callWithLogger(reader, reader.doRead) if err is None and events & IOLoop.ERROR: err = error.ConnectionLost() if err is not None: self.removeReader(reader) reader.readConnectionLost(failure.Failure(err)) if writer: err = None if writer.fileno() == -1: err = error.ConnectionLost() elif events & IOLoop.WRITE: err = log.callWithLogger(writer, writer.doWrite) if err is None and events & IOLoop.ERROR: err = error.ConnectionLost() if err is not None: self.removeWriter(writer) writer.writeConnectionLost(failure.Failure(err)) def addReader(self, reader): """Add a FileDescriptor for notification of data available to read.""" if reader in self._readers: # Don't add the reader if it's already there return fd = reader.fileno() self._readers[reader] = fd if fd in self._fds: (_, writer) = self._fds[fd] self._fds[fd] = (reader, writer) if writer: # We already registered this fd for write events, # update it for read events as well. self._io_loop.update_handler(fd, IOLoop.READ | IOLoop.WRITE) else: with NullContext(): self._fds[fd] = (reader, None) self._io_loop.add_handler(fd, self._invoke_callback, IOLoop.READ) def addWriter(self, writer): """Add a FileDescriptor for notification of data available to write.""" if writer in self._writers: return fd = writer.fileno() self._writers[writer] = fd if fd in self._fds: (reader, _) = self._fds[fd] self._fds[fd] = (reader, writer) if reader: # We already registered this fd for read events, # update it for write events as well. self._io_loop.update_handler(fd, IOLoop.READ | IOLoop.WRITE) else: with NullContext(): self._fds[fd] = (None, writer) self._io_loop.add_handler(fd, self._invoke_callback, IOLoop.WRITE) def removeReader(self, reader): """Remove a Selectable for notification of data available to read.""" if reader in self._readers: fd = self._readers.pop(reader) (_, writer) = self._fds[fd] if writer: # We have a writer so we need to update the IOLoop for # write events only. self._fds[fd] = (None, writer) self._io_loop.update_handler(fd, IOLoop.WRITE) else: # Since we have no writer registered, we remove the # entry from _fds and unregister the handler from the # IOLoop del self._fds[fd] self._io_loop.remove_handler(fd) def removeWriter(self, writer): """Remove a Selectable for notification of data available to write.""" if writer in self._writers: fd = self._writers.pop(writer) (reader, _) = self._fds[fd] if reader: # We have a reader so we need to update the IOLoop for # read events only. self._fds[fd] = (reader, None) self._io_loop.update_handler(fd, IOLoop.READ) else: # Since we have no reader registered, we remove the # entry from the _fds and unregister the handler from # the IOLoop. del self._fds[fd] self._io_loop.remove_handler(fd) def removeAll(self): return self._removeAll(self._readers, self._writers) def getReaders(self): return self._readers.keys() def getWriters(self): return self._writers.keys() # The following functions are mainly used in twisted-style test cases; # it is expected that most users of the TornadoReactor will call # IOLoop.start() instead of Reactor.run(). def stop(self): PosixReactorBase.stop(self) self._io_loop.stop() def crash(self): PosixReactorBase.crash(self) self._io_loop.stop() def doIteration(self, delay): raise NotImplementedError("doIteration") def mainLoop(self): self._io_loop.start() if self._stopped: self.fireSystemEvent("shutdown") class _TestReactor(TornadoReactor): """Subclass of TornadoReactor for use in unittests. This can't go in the test.py file because of import-order dependencies with the Twisted reactor test builder. """ def __init__(self): # always use a new ioloop super(_TestReactor, self).__init__(IOLoop()) def listenTCP(self, port, factory, backlog=50, interface=''): # default to localhost to avoid firewall prompts on the mac if not interface: interface = '127.0.0.1' return super(_TestReactor, self).listenTCP( port, factory, backlog=backlog, interface=interface) def listenUDP(self, port, protocol, interface='', maxPacketSize=8192): if not interface: interface = '127.0.0.1' return super(_TestReactor, self).listenUDP( port, protocol, interface=interface, maxPacketSize=maxPacketSize) def install(io_loop=None): """Install this package as the default Twisted reactor.""" if not io_loop: io_loop = tornado.ioloop.IOLoop.instance() reactor = TornadoReactor(io_loop) from twisted.internet.main import installReactor installReactor(reactor) return reactor