summaryrefslogtreecommitdiffstats
path: root/src/console/zmq/eventloop
diff options
context:
space:
mode:
Diffstat (limited to 'src/console/zmq/eventloop')
-rwxr-xr-xsrc/console/zmq/eventloop/__init__.py5
-rwxr-xr-xsrc/console/zmq/eventloop/ioloop.py193
-rw-r--r--src/console/zmq/eventloop/minitornado/__init__.py0
-rwxr-xr-xsrc/console/zmq/eventloop/minitornado/concurrent.py11
-rwxr-xr-xsrc/console/zmq/eventloop/minitornado/ioloop.py829
-rwxr-xr-xsrc/console/zmq/eventloop/minitornado/log.py6
-rw-r--r--src/console/zmq/eventloop/minitornado/platform/__init__.py0
-rwxr-xr-xsrc/console/zmq/eventloop/minitornado/platform/auto.py45
-rwxr-xr-xsrc/console/zmq/eventloop/minitornado/platform/common.py91
-rwxr-xr-xsrc/console/zmq/eventloop/minitornado/platform/interface.py63
-rwxr-xr-xsrc/console/zmq/eventloop/minitornado/platform/posix.py70
-rwxr-xr-xsrc/console/zmq/eventloop/minitornado/platform/windows.py20
-rwxr-xr-xsrc/console/zmq/eventloop/minitornado/stack_context.py376
-rwxr-xr-xsrc/console/zmq/eventloop/minitornado/util.py184
-rwxr-xr-xsrc/console/zmq/eventloop/zmqstream.py529
15 files changed, 2422 insertions, 0 deletions
diff --git a/src/console/zmq/eventloop/__init__.py b/src/console/zmq/eventloop/__init__.py
new file mode 100755
index 00000000..568e8e8d
--- /dev/null
+++ b/src/console/zmq/eventloop/__init__.py
@@ -0,0 +1,5 @@
+"""A Tornado based event loop for PyZMQ."""
+
+from zmq.eventloop.ioloop import IOLoop
+
+__all__ = ['IOLoop'] \ No newline at end of file
diff --git a/src/console/zmq/eventloop/ioloop.py b/src/console/zmq/eventloop/ioloop.py
new file mode 100755
index 00000000..35f4c418
--- /dev/null
+++ b/src/console/zmq/eventloop/ioloop.py
@@ -0,0 +1,193 @@
+# coding: utf-8
+"""tornado IOLoop API with zmq compatibility
+
+If you have tornado ≥ 3.0, this is a subclass of tornado's IOLoop,
+otherwise we ship a minimal subset of tornado in zmq.eventloop.minitornado.
+
+The minimal shipped version of tornado's IOLoop does not include
+support for concurrent futures - this will only be available if you
+have tornado ≥ 3.0.
+"""
+
+# Copyright (C) PyZMQ Developers
+# Distributed under the terms of the Modified BSD License.
+
+from __future__ import absolute_import, division, with_statement
+
+import os
+import time
+import warnings
+
+from zmq import (
+ Poller,
+ POLLIN, POLLOUT, POLLERR,
+ ZMQError, ETERM,
+)
+
+try:
+ import tornado
+ tornado_version = tornado.version_info
+except (ImportError, AttributeError):
+ tornado_version = ()
+
+try:
+ # tornado ≥ 3
+ from tornado.ioloop import PollIOLoop, PeriodicCallback
+ from tornado.log import gen_log
+except ImportError:
+ from .minitornado.ioloop import PollIOLoop, PeriodicCallback
+ from .minitornado.log import gen_log
+
+
+class DelayedCallback(PeriodicCallback):
+ """Schedules the given callback to be called once.
+
+ The callback is called once, after callback_time milliseconds.
+
+ `start` must be called after the DelayedCallback is created.
+
+ The timeout is calculated from when `start` is called.
+ """
+ def __init__(self, callback, callback_time, io_loop=None):
+ # PeriodicCallback require callback_time to be positive
+ warnings.warn("""DelayedCallback is deprecated.
+ Use loop.add_timeout instead.""", DeprecationWarning)
+ callback_time = max(callback_time, 1e-3)
+ super(DelayedCallback, self).__init__(callback, callback_time, io_loop)
+
+ def start(self):
+ """Starts the timer."""
+ self._running = True
+ self._firstrun = True
+ self._next_timeout = time.time() + self.callback_time / 1000.0
+ self.io_loop.add_timeout(self._next_timeout, self._run)
+
+ def _run(self):
+ if not self._running: return
+ self._running = False
+ try:
+ self.callback()
+ except Exception:
+ gen_log.error("Error in delayed callback", exc_info=True)
+
+
+class ZMQPoller(object):
+ """A poller that can be used in the tornado IOLoop.
+
+ This simply wraps a regular zmq.Poller, scaling the timeout
+ by 1000, so that it is in seconds rather than milliseconds.
+ """
+
+ def __init__(self):
+ self._poller = Poller()
+
+ @staticmethod
+ def _map_events(events):
+ """translate IOLoop.READ/WRITE/ERROR event masks into zmq.POLLIN/OUT/ERR"""
+ z_events = 0
+ if events & IOLoop.READ:
+ z_events |= POLLIN
+ if events & IOLoop.WRITE:
+ z_events |= POLLOUT
+ if events & IOLoop.ERROR:
+ z_events |= POLLERR
+ return z_events
+
+ @staticmethod
+ def _remap_events(z_events):
+ """translate zmq.POLLIN/OUT/ERR event masks into IOLoop.READ/WRITE/ERROR"""
+ events = 0
+ if z_events & POLLIN:
+ events |= IOLoop.READ
+ if z_events & POLLOUT:
+ events |= IOLoop.WRITE
+ if z_events & POLLERR:
+ events |= IOLoop.ERROR
+ return events
+
+ def register(self, fd, events):
+ return self._poller.register(fd, self._map_events(events))
+
+ def modify(self, fd, events):
+ return self._poller.modify(fd, self._map_events(events))
+
+ def unregister(self, fd):
+ return self._poller.unregister(fd)
+
+ def poll(self, timeout):
+ """poll in seconds rather than milliseconds.
+
+ Event masks will be IOLoop.READ/WRITE/ERROR
+ """
+ z_events = self._poller.poll(1000*timeout)
+ return [ (fd,self._remap_events(evt)) for (fd,evt) in z_events ]
+
+ def close(self):
+ pass
+
+
+class ZMQIOLoop(PollIOLoop):
+ """ZMQ subclass of tornado's IOLoop"""
+ def initialize(self, impl=None, **kwargs):
+ impl = ZMQPoller() if impl is None else impl
+ super(ZMQIOLoop, self).initialize(impl=impl, **kwargs)
+
+ @staticmethod
+ def instance():
+ """Returns a global `IOLoop` instance.
+
+ Most applications have a single, global `IOLoop` running on the
+ main thread. Use this method to get this instance from
+ another thread. To get the current thread's `IOLoop`, use `current()`.
+ """
+ # install ZMQIOLoop as the active IOLoop implementation
+ # when using tornado 3
+ if tornado_version >= (3,):
+ PollIOLoop.configure(ZMQIOLoop)
+ return PollIOLoop.instance()
+
+ def start(self):
+ try:
+ super(ZMQIOLoop, self).start()
+ except ZMQError as e:
+ if e.errno == ETERM:
+ # quietly return on ETERM
+ pass
+ else:
+ raise e
+
+
+if tornado_version >= (3,0) and tornado_version < (3,1):
+ def backport_close(self, all_fds=False):
+ """backport IOLoop.close to 3.0 from 3.1 (supports fd.close() method)"""
+ from zmq.eventloop.minitornado.ioloop import PollIOLoop as mini_loop
+ return mini_loop.close.__get__(self)(all_fds)
+ ZMQIOLoop.close = backport_close
+
+
+# public API name
+IOLoop = ZMQIOLoop
+
+
+def install():
+ """set the tornado IOLoop instance with the pyzmq IOLoop.
+
+ After calling this function, tornado's IOLoop.instance() and pyzmq's
+ IOLoop.instance() will return the same object.
+
+ An assertion error will be raised if tornado's IOLoop has been initialized
+ prior to calling this function.
+ """
+ from tornado import ioloop
+ # check if tornado's IOLoop is already initialized to something other
+ # than the pyzmq IOLoop instance:
+ assert (not ioloop.IOLoop.initialized()) or \
+ ioloop.IOLoop.instance() is IOLoop.instance(), "tornado IOLoop already initialized"
+
+ if tornado_version >= (3,):
+ # tornado 3 has an official API for registering new defaults, yay!
+ ioloop.IOLoop.configure(ZMQIOLoop)
+ else:
+ # we have to set the global instance explicitly
+ ioloop.IOLoop._instance = IOLoop.instance()
+
diff --git a/src/console/zmq/eventloop/minitornado/__init__.py b/src/console/zmq/eventloop/minitornado/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/src/console/zmq/eventloop/minitornado/__init__.py
diff --git a/src/console/zmq/eventloop/minitornado/concurrent.py b/src/console/zmq/eventloop/minitornado/concurrent.py
new file mode 100755
index 00000000..519b23d5
--- /dev/null
+++ b/src/console/zmq/eventloop/minitornado/concurrent.py
@@ -0,0 +1,11 @@
+"""pyzmq does not ship tornado's futures,
+this just raises informative NotImplementedErrors to avoid having to change too much code.
+"""
+
+class NotImplementedFuture(object):
+ def __init__(self, *args, **kwargs):
+ raise NotImplementedError("pyzmq does not ship tornado's Futures, "
+ "install tornado >= 3.0 for future support."
+ )
+
+Future = TracebackFuture = NotImplementedFuture
diff --git a/src/console/zmq/eventloop/minitornado/ioloop.py b/src/console/zmq/eventloop/minitornado/ioloop.py
new file mode 100755
index 00000000..710a3ecb
--- /dev/null
+++ b/src/console/zmq/eventloop/minitornado/ioloop.py
@@ -0,0 +1,829 @@
+#!/usr/bin/env python
+#
+# Copyright 2009 Facebook
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""An I/O event loop for non-blocking sockets.
+
+Typical applications will use a single `IOLoop` object, in the
+`IOLoop.instance` singleton. The `IOLoop.start` method should usually
+be called at the end of the ``main()`` function. Atypical applications may
+use more than one `IOLoop`, such as one `IOLoop` per thread, or per `unittest`
+case.
+
+In addition to I/O events, the `IOLoop` can also schedule time-based events.
+`IOLoop.add_timeout` is a non-blocking alternative to `time.sleep`.
+"""
+
+from __future__ import absolute_import, division, print_function, with_statement
+
+import datetime
+import errno
+import functools
+import heapq
+import logging
+import numbers
+import os
+import select
+import sys
+import threading
+import time
+import traceback
+
+from .concurrent import Future, TracebackFuture
+from .log import app_log, gen_log
+from . import stack_context
+from .util import Configurable
+
+try:
+ import signal
+except ImportError:
+ signal = None
+
+try:
+ import thread # py2
+except ImportError:
+ import _thread as thread # py3
+
+from .platform.auto import set_close_exec, Waker
+
+
+class TimeoutError(Exception):
+ pass
+
+
+class IOLoop(Configurable):
+ """A level-triggered I/O loop.
+
+ We use ``epoll`` (Linux) or ``kqueue`` (BSD and Mac OS X) if they
+ are available, or else we fall back on select(). If you are
+ implementing a system that needs to handle thousands of
+ simultaneous connections, you should use a system that supports
+ either ``epoll`` or ``kqueue``.
+
+ Example usage for a simple TCP server::
+
+ import errno
+ import functools
+ import ioloop
+ import socket
+
+ def connection_ready(sock, fd, events):
+ while True:
+ try:
+ connection, address = sock.accept()
+ except socket.error, e:
+ if e.args[0] not in (errno.EWOULDBLOCK, errno.EAGAIN):
+ raise
+ return
+ connection.setblocking(0)
+ handle_connection(connection, address)
+
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ sock.setblocking(0)
+ sock.bind(("", port))
+ sock.listen(128)
+
+ io_loop = ioloop.IOLoop.instance()
+ callback = functools.partial(connection_ready, sock)
+ io_loop.add_handler(sock.fileno(), callback, io_loop.READ)
+ io_loop.start()
+
+ """
+ # Constants from the epoll module
+ _EPOLLIN = 0x001
+ _EPOLLPRI = 0x002
+ _EPOLLOUT = 0x004
+ _EPOLLERR = 0x008
+ _EPOLLHUP = 0x010
+ _EPOLLRDHUP = 0x2000
+ _EPOLLONESHOT = (1 << 30)
+ _EPOLLET = (1 << 31)
+
+ # Our events map exactly to the epoll events
+ NONE = 0
+ READ = _EPOLLIN
+ WRITE = _EPOLLOUT
+ ERROR = _EPOLLERR | _EPOLLHUP
+
+ # Global lock for creating global IOLoop instance
+ _instance_lock = threading.Lock()
+
+ _current = threading.local()
+
+ @staticmethod
+ def instance():
+ """Returns a global `IOLoop` instance.
+
+ Most applications have a single, global `IOLoop` running on the
+ main thread. Use this method to get this instance from
+ another thread. To get the current thread's `IOLoop`, use `current()`.
+ """
+ if not hasattr(IOLoop, "_instance"):
+ with IOLoop._instance_lock:
+ if not hasattr(IOLoop, "_instance"):
+ # New instance after double check
+ IOLoop._instance = IOLoop()
+ return IOLoop._instance
+
+ @staticmethod
+ def initialized():
+ """Returns true if the singleton instance has been created."""
+ return hasattr(IOLoop, "_instance")
+
+ def install(self):
+ """Installs this `IOLoop` object as the singleton instance.
+
+ This is normally not necessary as `instance()` will create
+ an `IOLoop` on demand, but you may want to call `install` to use
+ a custom subclass of `IOLoop`.
+ """
+ assert not IOLoop.initialized()
+ IOLoop._instance = self
+
+ @staticmethod
+ def current():
+ """Returns the current thread's `IOLoop`.
+
+ If an `IOLoop` is currently running or has been marked as current
+ by `make_current`, returns that instance. Otherwise returns
+ `IOLoop.instance()`, i.e. the main thread's `IOLoop`.
+
+ A common pattern for classes that depend on ``IOLoops`` is to use
+ a default argument to enable programs with multiple ``IOLoops``
+ but not require the argument for simpler applications::
+
+ class MyClass(object):
+ def __init__(self, io_loop=None):
+ self.io_loop = io_loop or IOLoop.current()
+
+ In general you should use `IOLoop.current` as the default when
+ constructing an asynchronous object, and use `IOLoop.instance`
+ when you mean to communicate to the main thread from a different
+ one.
+ """
+ current = getattr(IOLoop._current, "instance", None)
+ if current is None:
+ return IOLoop.instance()
+ return current
+
+ def make_current(self):
+ """Makes this the `IOLoop` for the current thread.
+
+ An `IOLoop` automatically becomes current for its thread
+ when it is started, but it is sometimes useful to call
+ `make_current` explictly before starting the `IOLoop`,
+ so that code run at startup time can find the right
+ instance.
+ """
+ IOLoop._current.instance = self
+
+ @staticmethod
+ def clear_current():
+ IOLoop._current.instance = None
+
+ @classmethod
+ def configurable_base(cls):
+ return IOLoop
+
+ @classmethod
+ def configurable_default(cls):
+ # this is the only patch to IOLoop:
+ from zmq.eventloop.ioloop import ZMQIOLoop
+ return ZMQIOLoop
+ # the remainder of this method is unused,
+ # but left for preservation reasons
+ if hasattr(select, "epoll"):
+ from tornado.platform.epoll import EPollIOLoop
+ return EPollIOLoop
+ if hasattr(select, "kqueue"):
+ # Python 2.6+ on BSD or Mac
+ from tornado.platform.kqueue import KQueueIOLoop
+ return KQueueIOLoop
+ from tornado.platform.select import SelectIOLoop
+ return SelectIOLoop
+
+ def initialize(self):
+ pass
+
+ def close(self, all_fds=False):
+ """Closes the `IOLoop`, freeing any resources used.
+
+ If ``all_fds`` is true, all file descriptors registered on the
+ IOLoop will be closed (not just the ones created by the
+ `IOLoop` itself).
+
+ Many applications will only use a single `IOLoop` that runs for the
+ entire lifetime of the process. In that case closing the `IOLoop`
+ is not necessary since everything will be cleaned up when the
+ process exits. `IOLoop.close` is provided mainly for scenarios
+ such as unit tests, which create and destroy a large number of
+ ``IOLoops``.
+
+ An `IOLoop` must be completely stopped before it can be closed. This
+ means that `IOLoop.stop()` must be called *and* `IOLoop.start()` must
+ be allowed to return before attempting to call `IOLoop.close()`.
+ Therefore the call to `close` will usually appear just after
+ the call to `start` rather than near the call to `stop`.
+
+ .. versionchanged:: 3.1
+ If the `IOLoop` implementation supports non-integer objects
+ for "file descriptors", those objects will have their
+ ``close`` method when ``all_fds`` is true.
+ """
+ raise NotImplementedError()
+
+ def add_handler(self, fd, handler, events):
+ """Registers the given handler to receive the given events for fd.
+
+ The ``events`` argument is a bitwise or of the constants
+ ``IOLoop.READ``, ``IOLoop.WRITE``, and ``IOLoop.ERROR``.
+
+ When an event occurs, ``handler(fd, events)`` will be run.
+ """
+ raise NotImplementedError()
+
+ def update_handler(self, fd, events):
+ """Changes the events we listen for fd."""
+ raise NotImplementedError()
+
+ def remove_handler(self, fd):
+ """Stop listening for events on fd."""
+ raise NotImplementedError()
+
+ def set_blocking_signal_threshold(self, seconds, action):
+ """Sends a signal if the `IOLoop` is blocked for more than
+ ``s`` seconds.
+
+ Pass ``seconds=None`` to disable. Requires Python 2.6 on a unixy
+ platform.
+
+ The action parameter is a Python signal handler. Read the
+ documentation for the `signal` module for more information.
+ If ``action`` is None, the process will be killed if it is
+ blocked for too long.
+ """
+ raise NotImplementedError()
+
+ def set_blocking_log_threshold(self, seconds):
+ """Logs a stack trace if the `IOLoop` is blocked for more than
+ ``s`` seconds.
+
+ Equivalent to ``set_blocking_signal_threshold(seconds,
+ self.log_stack)``
+ """
+ self.set_blocking_signal_threshold(seconds, self.log_stack)
+
+ def log_stack(self, signal, frame):
+ """Signal handler to log the stack trace of the current thread.
+
+ For use with `set_blocking_signal_threshold`.
+ """
+ gen_log.warning('IOLoop blocked for %f seconds in\n%s',
+ self._blocking_signal_threshold,
+ ''.join(traceback.format_stack(frame)))
+
+ def start(self):
+ """Starts the I/O loop.
+
+ The loop will run until one of the callbacks calls `stop()`, which
+ will make the loop stop after the current event iteration completes.
+ """
+ raise NotImplementedError()
+
+ def stop(self):
+ """Stop the I/O loop.
+
+ If the event loop is not currently running, the next call to `start()`
+ will return immediately.
+
+ To use asynchronous methods from otherwise-synchronous code (such as
+ unit tests), you can start and stop the event loop like this::
+
+ ioloop = IOLoop()
+ async_method(ioloop=ioloop, callback=ioloop.stop)
+ ioloop.start()
+
+ ``ioloop.start()`` will return after ``async_method`` has run
+ its callback, whether that callback was invoked before or
+ after ``ioloop.start``.
+
+ Note that even after `stop` has been called, the `IOLoop` is not
+ completely stopped until `IOLoop.start` has also returned.
+ Some work that was scheduled before the call to `stop` may still
+ be run before the `IOLoop` shuts down.
+ """
+ raise NotImplementedError()
+
+ def run_sync(self, func, timeout=None):
+ """Starts the `IOLoop`, runs the given function, and stops the loop.
+
+ If the function returns a `.Future`, the `IOLoop` will run
+ until the future is resolved. If it raises an exception, the
+ `IOLoop` will stop and the exception will be re-raised to the
+ caller.
+
+ The keyword-only argument ``timeout`` may be used to set
+ a maximum duration for the function. If the timeout expires,
+ a `TimeoutError` is raised.
+
+ This method is useful in conjunction with `tornado.gen.coroutine`
+ to allow asynchronous calls in a ``main()`` function::
+
+ @gen.coroutine
+ def main():
+ # do stuff...
+
+ if __name__ == '__main__':
+ IOLoop.instance().run_sync(main)
+ """
+ future_cell = [None]
+
+ def run():
+ try:
+ result = func()
+ except Exception:
+ future_cell[0] = TracebackFuture()
+ future_cell[0].set_exc_info(sys.exc_info())
+ else:
+ if isinstance(result, Future):
+ future_cell[0] = result
+ else:
+ future_cell[0] = Future()
+ future_cell[0].set_result(result)
+ self.add_future(future_cell[0], lambda future: self.stop())
+ self.add_callback(run)
+ if timeout is not None:
+ timeout_handle = self.add_timeout(self.time() + timeout, self.stop)
+ self.start()
+ if timeout is not None:
+ self.remove_timeout(timeout_handle)
+ if not future_cell[0].done():
+ raise TimeoutError('Operation timed out after %s seconds' % timeout)
+ return future_cell[0].result()
+
+ def time(self):
+ """Returns the current time according to the `IOLoop`'s clock.
+
+ The return value is a floating-point number relative to an
+ unspecified time in the past.
+
+ By default, the `IOLoop`'s time function is `time.time`. However,
+ it may be configured to use e.g. `time.monotonic` instead.
+ Calls to `add_timeout` that pass a number instead of a
+ `datetime.timedelta` should use this function to compute the
+ appropriate time, so they can work no matter what time function
+ is chosen.
+ """
+ return time.time()
+
+ def add_timeout(self, deadline, callback):
+ """Runs the ``callback`` at the time ``deadline`` from the I/O loop.
+
+ Returns an opaque handle that may be passed to
+ `remove_timeout` to cancel.
+
+ ``deadline`` may be a number denoting a time (on the same
+ scale as `IOLoop.time`, normally `time.time`), or a
+ `datetime.timedelta` object for a deadline relative to the
+ current time.
+
+ Note that it is not safe to call `add_timeout` from other threads.
+ Instead, you must use `add_callback` to transfer control to the
+ `IOLoop`'s thread, and then call `add_timeout` from there.
+ """
+ raise NotImplementedError()
+
+ def remove_timeout(self, timeout):
+ """Cancels a pending timeout.
+
+ The argument is a handle as returned by `add_timeout`. It is
+ safe to call `remove_timeout` even if the callback has already
+ been run.
+ """
+ raise NotImplementedError()
+
+ def add_callback(self, callback, *args, **kwargs):
+ """Calls the given callback on the next I/O loop iteration.
+
+ It is safe to call this method from any thread at any time,
+ except from a signal handler. Note that this is the **only**
+ method in `IOLoop` that makes this thread-safety guarantee; all
+ other interaction with the `IOLoop` must be done from that
+ `IOLoop`'s thread. `add_callback()` may be used to transfer
+ control from other threads to the `IOLoop`'s thread.
+
+ To add a callback from a signal handler, see
+ `add_callback_from_signal`.
+ """
+ raise NotImplementedError()
+
+ def add_callback_from_signal(self, callback, *args, **kwargs):
+ """Calls the given callback on the next I/O loop iteration.
+
+ Safe for use from a Python signal handler; should not be used
+ otherwise.
+
+ Callbacks added with this method will be run without any
+ `.stack_context`, to avoid picking up the context of the function
+ that was interrupted by the signal.
+ """
+ raise NotImplementedError()
+
+ def add_future(self, future, callback):
+ """Schedules a callback on the ``IOLoop`` when the given
+ `.Future` is finished.
+
+ The callback is invoked with one argument, the
+ `.Future`.
+ """
+ assert isinstance(future, Future)
+ callback = stack_context.wrap(callback)
+ future.add_done_callback(
+ lambda future: self.add_callback(callback, future))
+
+ def _run_callback(self, callback):
+ """Runs a callback with error handling.
+
+ For use in subclasses.
+ """
+ try:
+ callback()
+ except Exception:
+ self.handle_callback_exception(callback)
+
+ def handle_callback_exception(self, callback):
+ """This method is called whenever a callback run by the `IOLoop`
+ throws an exception.
+
+ By default simply logs the exception as an error. Subclasses
+ may override this method to customize reporting of exceptions.
+
+ The exception itself is not passed explicitly, but is available
+ in `sys.exc_info`.
+ """
+ app_log.error("Exception in callback %r", callback, exc_info=True)
+
+
+class PollIOLoop(IOLoop):
+ """Base class for IOLoops built around a select-like function.
+
+ For concrete implementations, see `tornado.platform.epoll.EPollIOLoop`
+ (Linux), `tornado.platform.kqueue.KQueueIOLoop` (BSD and Mac), or
+ `tornado.platform.select.SelectIOLoop` (all platforms).
+ """
+ def initialize(self, impl, time_func=None):
+ super(PollIOLoop, self).initialize()
+ self._impl = impl
+ if hasattr(self._impl, 'fileno'):
+ set_close_exec(self._impl.fileno())
+ self.time_func = time_func or time.time
+ self._handlers = {}
+ self._events = {}
+ self._callbacks = []
+ self._callback_lock = threading.Lock()
+ self._timeouts = []
+ self._cancellations = 0
+ self._running = False
+ self._stopped = False
+ self._closing = False
+ self._thread_ident = None
+ self._blocking_signal_threshold = None
+
+ # Create a pipe that we send bogus data to when we want to wake
+ # the I/O loop when it is idle
+ self._waker = Waker()
+ self.add_handler(self._waker.fileno(),
+ lambda fd, events: self._waker.consume(),
+ self.READ)
+
+ def close(self, all_fds=False):
+ with self._callback_lock:
+ self._closing = True
+ self.remove_handler(self._waker.fileno())
+ if all_fds:
+ for fd in self._handlers.keys():
+ try:
+ close_method = getattr(fd, 'close', None)
+ if close_method is not None:
+ close_method()
+ else:
+ os.close(fd)
+ except Exception:
+ gen_log.debug("error closing fd %s", fd, exc_info=True)
+ self._waker.close()
+ self._impl.close()
+
+ def add_handler(self, fd, handler, events):
+ self._handlers[fd] = stack_context.wrap(handler)
+ self._impl.register(fd, events | self.ERROR)
+
+ def update_handler(self, fd, events):
+ self._impl.modify(fd, events | self.ERROR)
+
+ def remove_handler(self, fd):
+ self._handlers.pop(fd, None)
+ self._events.pop(fd, None)
+ try:
+ self._impl.unregister(fd)
+ except Exception:
+ gen_log.debug("Error deleting fd from IOLoop", exc_info=True)
+
+ def set_blocking_signal_threshold(self, seconds, action):
+ if not hasattr(signal, "setitimer"):
+ gen_log.error("set_blocking_signal_threshold requires a signal module "
+ "with the setitimer method")
+ return
+ self._blocking_signal_threshold = seconds
+ if seconds is not None:
+ signal.signal(signal.SIGALRM,
+ action if action is not None else signal.SIG_DFL)
+
+ def start(self):
+ if not logging.getLogger().handlers:
+ # The IOLoop catches and logs exceptions, so it's
+ # important that log output be visible. However, python's
+ # default behavior for non-root loggers (prior to python
+ # 3.2) is to print an unhelpful "no handlers could be
+ # found" message rather than the actual log entry, so we
+ # must explicitly configure logging if we've made it this
+ # far without anything.
+ logging.basicConfig()
+ if self._stopped:
+ self._stopped = False
+ return
+ old_current = getattr(IOLoop._current, "instance", None)
+ IOLoop._current.instance = self
+ self._thread_ident = thread.get_ident()
+ self._running = True
+
+ # signal.set_wakeup_fd closes a race condition in event loops:
+ # a signal may arrive at the beginning of select/poll/etc
+ # before it goes into its interruptible sleep, so the signal
+ # will be consumed without waking the select. The solution is
+ # for the (C, synchronous) signal handler to write to a pipe,
+ # which will then be seen by select.
+ #
+ # In python's signal handling semantics, this only matters on the
+ # main thread (fortunately, set_wakeup_fd only works on the main
+ # thread and will raise a ValueError otherwise).
+ #
+ # If someone has already set a wakeup fd, we don't want to
+ # disturb it. This is an issue for twisted, which does its
+ # SIGCHILD processing in response to its own wakeup fd being
+ # written to. As long as the wakeup fd is registered on the IOLoop,
+ # the loop will still wake up and everything should work.
+ old_wakeup_fd = None
+ if hasattr(signal, 'set_wakeup_fd') and os.name == 'posix':
+ # requires python 2.6+, unix. set_wakeup_fd exists but crashes
+ # the python process on windows.
+ try:
+ old_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno())
+ if old_wakeup_fd != -1:
+ # Already set, restore previous value. This is a little racy,
+ # but there's no clean get_wakeup_fd and in real use the
+ # IOLoop is just started once at the beginning.
+ signal.set_wakeup_fd(old_wakeup_fd)
+ old_wakeup_fd = None
+ except ValueError: # non-main thread
+ pass
+
+ while True:
+ poll_timeout = 3600.0
+
+ # Prevent IO event starvation by delaying new callbacks
+ # to the next iteration of the event loop.
+ with self._callback_lock:
+ callbacks = self._callbacks
+ self._callbacks = []
+ for callback in callbacks:
+ self._run_callback(callback)
+
+ if self._timeouts:
+ now = self.time()
+ while self._timeouts:
+ if self._timeouts[0].callback is None:
+ # the timeout was cancelled
+ heapq.heappop(self._timeouts)
+ self._cancellations -= 1
+ elif self._timeouts[0].deadline <= now:
+ timeout = heapq.heappop(self._timeouts)
+ self._run_callback(timeout.callback)
+ else:
+ seconds = self._timeouts[0].deadline - now
+ poll_timeout = min(seconds, poll_timeout)
+ break
+ if (self._cancellations > 512
+ and self._cancellations > (len(self._timeouts) >> 1)):
+ # Clean up the timeout queue when it gets large and it's
+ # more than half cancellations.
+ self._cancellations = 0
+ self._timeouts = [x for x in self._timeouts
+ if x.callback is not None]
+ heapq.heapify(self._timeouts)
+
+ if self._callbacks:
+ # If any callbacks or timeouts called add_callback,
+ # we don't want to wait in poll() before we run them.
+ poll_timeout = 0.0
+
+ if not self._running:
+ break
+
+ if self._blocking_signal_threshold is not None:
+ # clear alarm so it doesn't fire while poll is waiting for
+ # events.
+ signal.setitimer(signal.ITIMER_REAL, 0, 0)
+
+ try:
+ event_pairs = self._impl.poll(poll_timeout)
+ except Exception as e:
+ # Depending on python version and IOLoop implementation,
+ # different exception types may be thrown and there are
+ # two ways EINTR might be signaled:
+ # * e.errno == errno.EINTR
+ # * e.args is like (errno.EINTR, 'Interrupted system call')
+ if (getattr(e, 'errno', None) == errno.EINTR or
+ (isinstance(getattr(e, 'args', None), tuple) and
+ len(e.args) == 2 and e.args[0] == errno.EINTR)):
+ continue
+ else:
+ raise
+
+ if self._blocking_signal_threshold is not None:
+ signal.setitimer(signal.ITIMER_REAL,
+ self._blocking_signal_threshold, 0)
+
+ # Pop one fd at a time from the set of pending fds and run
+ # its handler. Since that handler may perform actions on
+ # other file descriptors, there may be reentrant calls to
+ # this IOLoop that update self._events
+ self._events.update(event_pairs)
+ while self._events:
+ fd, events = self._events.popitem()
+ try:
+ self._handlers[fd](fd, events)
+ except (OSError, IOError) as e:
+ if e.args[0] == errno.EPIPE:
+ # Happens when the client closes the connection
+ pass
+ else:
+ app_log.error("Exception in I/O handler for fd %s",
+ fd, exc_info=True)
+ except Exception:
+ app_log.error("Exception in I/O handler for fd %s",
+ fd, exc_info=True)
+ # reset the stopped flag so another start/stop pair can be issued
+ self._stopped = False
+ if self._blocking_signal_threshold is not None:
+ signal.setitimer(signal.ITIMER_REAL, 0, 0)
+ IOLoop._current.instance = old_current
+ if old_wakeup_fd is not None:
+ signal.set_wakeup_fd(old_wakeup_fd)
+
+ def stop(self):
+ self._running = False
+ self._stopped = True
+ self._waker.wake()
+
+ def time(self):
+ return self.time_func()
+
+ def add_timeout(self, deadline, callback):
+ timeout = _Timeout(deadline, stack_context.wrap(callback), self)
+ heapq.heappush(self._timeouts, timeout)
+ return timeout
+
+ def remove_timeout(self, timeout):
+ # Removing from a heap is complicated, so just leave the defunct
+ # timeout object in the queue (see discussion in
+ # http://docs.python.org/library/heapq.html).
+ # If this turns out to be a problem, we could add a garbage
+ # collection pass whenever there are too many dead timeouts.
+ timeout.callback = None
+ self._cancellations += 1
+
+ def add_callback(self, callback, *args, **kwargs):
+ with self._callback_lock:
+ if self._closing:
+ raise RuntimeError("IOLoop is closing")
+ list_empty = not self._callbacks
+ self._callbacks.append(functools.partial(
+ stack_context.wrap(callback), *args, **kwargs))
+ if list_empty and thread.get_ident() != self._thread_ident:
+ # If we're in the IOLoop's thread, we know it's not currently
+ # polling. If we're not, and we added the first callback to an
+ # empty list, we may need to wake it up (it may wake up on its
+ # own, but an occasional extra wake is harmless). Waking
+ # up a polling IOLoop is relatively expensive, so we try to
+ # avoid it when we can.
+ self._waker.wake()
+
+ def add_callback_from_signal(self, callback, *args, **kwargs):
+ with stack_context.NullContext():
+ if thread.get_ident() != self._thread_ident:
+ # if the signal is handled on another thread, we can add
+ # it normally (modulo the NullContext)
+ self.add_callback(callback, *args, **kwargs)
+ else:
+ # If we're on the IOLoop's thread, we cannot use
+ # the regular add_callback because it may deadlock on
+ # _callback_lock. Blindly insert into self._callbacks.
+ # This is safe because the GIL makes list.append atomic.
+ # One subtlety is that if the signal interrupted the
+ # _callback_lock block in IOLoop.start, we may modify
+ # either the old or new version of self._callbacks,
+ # but either way will work.
+ self._callbacks.append(functools.partial(
+ stack_context.wrap(callback), *args, **kwargs))
+
+
+class _Timeout(object):
+ """An IOLoop timeout, a UNIX timestamp and a callback"""
+
+ # Reduce memory overhead when there are lots of pending callbacks
+ __slots__ = ['deadline', 'callback']
+
+ def __init__(self, deadline, callback, io_loop):
+ if isinstance(deadline, numbers.Real):
+ self.deadline = deadline
+ elif isinstance(deadline, datetime.timedelta):
+ self.deadline = io_loop.time() + _Timeout.timedelta_to_seconds(deadline)
+ else:
+ raise TypeError("Unsupported deadline %r" % deadline)
+ self.callback = callback
+
+ @staticmethod
+ def timedelta_to_seconds(td):
+ """Equivalent to td.total_seconds() (introduced in python 2.7)."""
+ return (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10 ** 6) / float(10 ** 6)
+
+ # Comparison methods to sort by deadline, with object id as a tiebreaker
+ # to guarantee a consistent ordering. The heapq module uses __le__
+ # in python2.5, and __lt__ in 2.6+ (sort() and most other comparisons
+ # use __lt__).
+ def __lt__(self, other):
+ return ((self.deadline, id(self)) <
+ (other.deadline, id(other)))
+
+ def __le__(self, other):
+ return ((self.deadline, id(self)) <=
+ (other.deadline, id(other)))
+
+
+class PeriodicCallback(object):
+ """Schedules the given callback to be called periodically.
+
+ The callback is called every ``callback_time`` milliseconds.
+
+ `start` must be called after the `PeriodicCallback` is created.
+ """
+ def __init__(self, callback, callback_time, io_loop=None):
+ self.callback = callback
+ if callback_time <= 0:
+ raise ValueError("Periodic callback must have a positive callback_time")
+ self.callback_time = callback_time
+ self.io_loop = io_loop or IOLoop.current()
+ self._running = False
+ self._timeout = None
+
+ def start(self):
+ """Starts the timer."""
+ self._running = True
+ self._next_timeout = self.io_loop.time()
+ self._schedule_next()
+
+ def stop(self):
+ """Stops the timer."""
+ self._running = False
+ if self._timeout is not None:
+ self.io_loop.remove_timeout(self._timeout)
+ self._timeout = None
+
+ def _run(self):
+ if not self._running:
+ return
+ try:
+ self.callback()
+ except Exception:
+ app_log.error("Error in periodic callback", exc_info=True)
+ self._schedule_next()
+
+ def _schedule_next(self):
+ if self._running:
+ current_time = self.io_loop.time()
+ while self._next_timeout <= current_time:
+ self._next_timeout += self.callback_time / 1000.0
+ self._timeout = self.io_loop.add_timeout(self._next_timeout, self._run)
diff --git a/src/console/zmq/eventloop/minitornado/log.py b/src/console/zmq/eventloop/minitornado/log.py
new file mode 100755
index 00000000..49051e89
--- /dev/null
+++ b/src/console/zmq/eventloop/minitornado/log.py
@@ -0,0 +1,6 @@
+"""minimal subset of tornado.log for zmq.eventloop.minitornado"""
+
+import logging
+
+app_log = logging.getLogger("tornado.application")
+gen_log = logging.getLogger("tornado.general")
diff --git a/src/console/zmq/eventloop/minitornado/platform/__init__.py b/src/console/zmq/eventloop/minitornado/platform/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/src/console/zmq/eventloop/minitornado/platform/__init__.py
diff --git a/src/console/zmq/eventloop/minitornado/platform/auto.py b/src/console/zmq/eventloop/minitornado/platform/auto.py
new file mode 100755
index 00000000..b40ccd94
--- /dev/null
+++ b/src/console/zmq/eventloop/minitornado/platform/auto.py
@@ -0,0 +1,45 @@
+#!/usr/bin/env python
+#
+# Copyright 2011 Facebook
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Implementation of platform-specific functionality.
+
+For each function or class described in `tornado.platform.interface`,
+the appropriate platform-specific implementation exists in this module.
+Most code that needs access to this functionality should do e.g.::
+
+ from tornado.platform.auto import set_close_exec
+"""
+
+from __future__ import absolute_import, division, print_function, with_statement
+
+import os
+
+if os.name == 'nt':
+ from .common import Waker
+ from .windows import set_close_exec
+else:
+ from .posix import set_close_exec, Waker
+
+try:
+ # monotime monkey-patches the time module to have a monotonic function
+ # in versions of python before 3.3.
+ import monotime
+except ImportError:
+ pass
+try:
+ from time import monotonic as monotonic_time
+except ImportError:
+ monotonic_time = None
diff --git a/src/console/zmq/eventloop/minitornado/platform/common.py b/src/console/zmq/eventloop/minitornado/platform/common.py
new file mode 100755
index 00000000..2d75dc1e
--- /dev/null
+++ b/src/console/zmq/eventloop/minitornado/platform/common.py
@@ -0,0 +1,91 @@
+"""Lowest-common-denominator implementations of platform functionality."""
+from __future__ import absolute_import, division, print_function, with_statement
+
+import errno
+import socket
+
+from . import interface
+
+
+class Waker(interface.Waker):
+ """Create an OS independent asynchronous pipe.
+
+ For use on platforms that don't have os.pipe() (or where pipes cannot
+ be passed to select()), but do have sockets. This includes Windows
+ and Jython.
+ """
+ def __init__(self):
+ # Based on Zope async.py: http://svn.zope.org/zc.ngi/trunk/src/zc/ngi/async.py
+
+ self.writer = socket.socket()
+ # Disable buffering -- pulling the trigger sends 1 byte,
+ # and we want that sent immediately, to wake up ASAP.
+ self.writer.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+
+ count = 0
+ while 1:
+ count += 1
+ # Bind to a local port; for efficiency, let the OS pick
+ # a free port for us.
+ # Unfortunately, stress tests showed that we may not
+ # be able to connect to that port ("Address already in
+ # use") despite that the OS picked it. This appears
+ # to be a race bug in the Windows socket implementation.
+ # So we loop until a connect() succeeds (almost always
+ # on the first try). See the long thread at
+ # http://mail.zope.org/pipermail/zope/2005-July/160433.html
+ # for hideous details.
+ a = socket.socket()
+ a.bind(("127.0.0.1", 0))
+ a.listen(1)
+ connect_address = a.getsockname() # assigned (host, port) pair
+ try:
+ self.writer.connect(connect_address)
+ break # success
+ except socket.error as detail:
+ if (not hasattr(errno, 'WSAEADDRINUSE') or
+ detail[0] != errno.WSAEADDRINUSE):
+ # "Address already in use" is the only error
+ # I've seen on two WinXP Pro SP2 boxes, under
+ # Pythons 2.3.5 and 2.4.1.
+ raise
+ # (10048, 'Address already in use')
+ # assert count <= 2 # never triggered in Tim's tests
+ if count >= 10: # I've never seen it go above 2
+ a.close()
+ self.writer.close()
+ raise socket.error("Cannot bind trigger!")
+ # Close `a` and try again. Note: I originally put a short
+ # sleep() here, but it didn't appear to help or hurt.
+ a.close()
+
+ self.reader, addr = a.accept()
+ self.reader.setblocking(0)
+ self.writer.setblocking(0)
+ a.close()
+ self.reader_fd = self.reader.fileno()
+
+ def fileno(self):
+ return self.reader.fileno()
+
+ def write_fileno(self):
+ return self.writer.fileno()
+
+ def wake(self):
+ try:
+ self.writer.send(b"x")
+ except (IOError, socket.error):
+ pass
+
+ def consume(self):
+ try:
+ while True:
+ result = self.reader.recv(1024)
+ if not result:
+ break
+ except (IOError, socket.error):
+ pass
+
+ def close(self):
+ self.reader.close()
+ self.writer.close()
diff --git a/src/console/zmq/eventloop/minitornado/platform/interface.py b/src/console/zmq/eventloop/minitornado/platform/interface.py
new file mode 100755
index 00000000..07da6bab
--- /dev/null
+++ b/src/console/zmq/eventloop/minitornado/platform/interface.py
@@ -0,0 +1,63 @@
+#!/usr/bin/env python
+#
+# Copyright 2011 Facebook
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Interfaces for platform-specific functionality.
+
+This module exists primarily for documentation purposes and as base classes
+for other tornado.platform modules. Most code should import the appropriate
+implementation from `tornado.platform.auto`.
+"""
+
+from __future__ import absolute_import, division, print_function, with_statement
+
+
+def set_close_exec(fd):
+ """Sets the close-on-exec bit (``FD_CLOEXEC``)for a file descriptor."""
+ raise NotImplementedError()
+
+
+class Waker(object):
+ """A socket-like object that can wake another thread from ``select()``.
+
+ The `~tornado.ioloop.IOLoop` will add the Waker's `fileno()` to
+ its ``select`` (or ``epoll`` or ``kqueue``) calls. When another
+ thread wants to wake up the loop, it calls `wake`. Once it has woken
+ up, it will call `consume` to do any necessary per-wake cleanup. When
+ the ``IOLoop`` is closed, it closes its waker too.
+ """
+ def fileno(self):
+ """Returns the read file descriptor for this waker.
+
+ Must be suitable for use with ``select()`` or equivalent on the
+ local platform.
+ """
+ raise NotImplementedError()
+
+ def write_fileno(self):
+ """Returns the write file descriptor for this waker."""
+ raise NotImplementedError()
+
+ def wake(self):
+ """Triggers activity on the waker's file descriptor."""
+ raise NotImplementedError()
+
+ def consume(self):
+ """Called after the listen has woken up to do any necessary cleanup."""
+ raise NotImplementedError()
+
+ def close(self):
+ """Closes the waker's file descriptor(s)."""
+ raise NotImplementedError()
diff --git a/src/console/zmq/eventloop/minitornado/platform/posix.py b/src/console/zmq/eventloop/minitornado/platform/posix.py
new file mode 100755
index 00000000..ccffbb66
--- /dev/null
+++ b/src/console/zmq/eventloop/minitornado/platform/posix.py
@@ -0,0 +1,70 @@
+#!/usr/bin/env python
+#
+# Copyright 2011 Facebook
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Posix implementations of platform-specific functionality."""
+
+from __future__ import absolute_import, division, print_function, with_statement
+
+import fcntl
+import os
+
+from . import interface
+
+
+def set_close_exec(fd):
+ flags = fcntl.fcntl(fd, fcntl.F_GETFD)
+ fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
+
+
+def _set_nonblocking(fd):
+ flags = fcntl.fcntl(fd, fcntl.F_GETFL)
+ fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
+
+
+class Waker(interface.Waker):
+ def __init__(self):
+ r, w = os.pipe()
+ _set_nonblocking(r)
+ _set_nonblocking(w)
+ set_close_exec(r)
+ set_close_exec(w)
+ self.reader = os.fdopen(r, "rb", 0)
+ self.writer = os.fdopen(w, "wb", 0)
+
+ def fileno(self):
+ return self.reader.fileno()
+
+ def write_fileno(self):
+ return self.writer.fileno()
+
+ def wake(self):
+ try:
+ self.writer.write(b"x")
+ except IOError:
+ pass
+
+ def consume(self):
+ try:
+ while True:
+ result = self.reader.read()
+ if not result:
+ break
+ except IOError:
+ pass
+
+ def close(self):
+ self.reader.close()
+ self.writer.close()
diff --git a/src/console/zmq/eventloop/minitornado/platform/windows.py b/src/console/zmq/eventloop/minitornado/platform/windows.py
new file mode 100755
index 00000000..817bdca1
--- /dev/null
+++ b/src/console/zmq/eventloop/minitornado/platform/windows.py
@@ -0,0 +1,20 @@
+# NOTE: win32 support is currently experimental, and not recommended
+# for production use.
+
+
+from __future__ import absolute_import, division, print_function, with_statement
+import ctypes
+import ctypes.wintypes
+
+# See: http://msdn.microsoft.com/en-us/library/ms724935(VS.85).aspx
+SetHandleInformation = ctypes.windll.kernel32.SetHandleInformation
+SetHandleInformation.argtypes = (ctypes.wintypes.HANDLE, ctypes.wintypes.DWORD, ctypes.wintypes.DWORD)
+SetHandleInformation.restype = ctypes.wintypes.BOOL
+
+HANDLE_FLAG_INHERIT = 0x00000001
+
+
+def set_close_exec(fd):
+ success = SetHandleInformation(fd, HANDLE_FLAG_INHERIT, 0)
+ if not success:
+ raise ctypes.GetLastError()
diff --git a/src/console/zmq/eventloop/minitornado/stack_context.py b/src/console/zmq/eventloop/minitornado/stack_context.py
new file mode 100755
index 00000000..226d8042
--- /dev/null
+++ b/src/console/zmq/eventloop/minitornado/stack_context.py
@@ -0,0 +1,376 @@
+#!/usr/bin/env python
+#
+# Copyright 2010 Facebook
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""`StackContext` allows applications to maintain threadlocal-like state
+that follows execution as it moves to other execution contexts.
+
+The motivating examples are to eliminate the need for explicit
+``async_callback`` wrappers (as in `tornado.web.RequestHandler`), and to
+allow some additional context to be kept for logging.
+
+This is slightly magic, but it's an extension of the idea that an
+exception handler is a kind of stack-local state and when that stack
+is suspended and resumed in a new context that state needs to be
+preserved. `StackContext` shifts the burden of restoring that state
+from each call site (e.g. wrapping each `.AsyncHTTPClient` callback
+in ``async_callback``) to the mechanisms that transfer control from
+one context to another (e.g. `.AsyncHTTPClient` itself, `.IOLoop`,
+thread pools, etc).
+
+Example usage::
+
+ @contextlib.contextmanager
+ def die_on_error():
+ try:
+ yield
+ except Exception:
+ logging.error("exception in asynchronous operation",exc_info=True)
+ sys.exit(1)
+
+ with StackContext(die_on_error):
+ # Any exception thrown here *or in callback and its desendents*
+ # will cause the process to exit instead of spinning endlessly
+ # in the ioloop.
+ http_client.fetch(url, callback)
+ ioloop.start()
+
+Most applications shouln't have to work with `StackContext` directly.
+Here are a few rules of thumb for when it's necessary:
+
+* If you're writing an asynchronous library that doesn't rely on a
+ stack_context-aware library like `tornado.ioloop` or `tornado.iostream`
+ (for example, if you're writing a thread pool), use
+ `.stack_context.wrap()` before any asynchronous operations to capture the
+ stack context from where the operation was started.
+
+* If you're writing an asynchronous library that has some shared
+ resources (such as a connection pool), create those shared resources
+ within a ``with stack_context.NullContext():`` block. This will prevent
+ ``StackContexts`` from leaking from one request to another.
+
+* If you want to write something like an exception handler that will
+ persist across asynchronous calls, create a new `StackContext` (or
+ `ExceptionStackContext`), and make your asynchronous calls in a ``with``
+ block that references your `StackContext`.
+"""
+
+from __future__ import absolute_import, division, print_function, with_statement
+
+import sys
+import threading
+
+from .util import raise_exc_info
+
+
+class StackContextInconsistentError(Exception):
+ pass
+
+
+class _State(threading.local):
+ def __init__(self):
+ self.contexts = (tuple(), None)
+_state = _State()
+
+
+class StackContext(object):
+ """Establishes the given context as a StackContext that will be transferred.
+
+ Note that the parameter is a callable that returns a context
+ manager, not the context itself. That is, where for a
+ non-transferable context manager you would say::
+
+ with my_context():
+
+ StackContext takes the function itself rather than its result::
+
+ with StackContext(my_context):
+
+ The result of ``with StackContext() as cb:`` is a deactivation
+ callback. Run this callback when the StackContext is no longer
+ needed to ensure that it is not propagated any further (note that
+ deactivating a context does not affect any instances of that
+ context that are currently pending). This is an advanced feature
+ and not necessary in most applications.
+ """
+ def __init__(self, context_factory):
+ self.context_factory = context_factory
+ self.contexts = []
+ self.active = True
+
+ def _deactivate(self):
+ self.active = False
+
+ # StackContext protocol
+ def enter(self):
+ context = self.context_factory()
+ self.contexts.append(context)
+ context.__enter__()
+
+ def exit(self, type, value, traceback):
+ context = self.contexts.pop()
+ context.__exit__(type, value, traceback)
+
+ # Note that some of this code is duplicated in ExceptionStackContext
+ # below. ExceptionStackContext is more common and doesn't need
+ # the full generality of this class.
+ def __enter__(self):
+ self.old_contexts = _state.contexts
+ self.new_contexts = (self.old_contexts[0] + (self,), self)
+ _state.contexts = self.new_contexts
+
+ try:
+ self.enter()
+ except:
+ _state.contexts = self.old_contexts
+ raise
+
+ return self._deactivate
+
+ def __exit__(self, type, value, traceback):
+ try:
+ self.exit(type, value, traceback)
+ finally:
+ final_contexts = _state.contexts
+ _state.contexts = self.old_contexts
+
+ # Generator coroutines and with-statements with non-local
+ # effects interact badly. Check here for signs of
+ # the stack getting out of sync.
+ # Note that this check comes after restoring _state.context
+ # so that if it fails things are left in a (relatively)
+ # consistent state.
+ if final_contexts is not self.new_contexts:
+ raise StackContextInconsistentError(
+ 'stack_context inconsistency (may be caused by yield '
+ 'within a "with StackContext" block)')
+
+ # Break up a reference to itself to allow for faster GC on CPython.
+ self.new_contexts = None
+
+
+class ExceptionStackContext(object):
+ """Specialization of StackContext for exception handling.
+
+ The supplied ``exception_handler`` function will be called in the
+ event of an uncaught exception in this context. The semantics are
+ similar to a try/finally clause, and intended use cases are to log
+ an error, close a socket, or similar cleanup actions. The
+ ``exc_info`` triple ``(type, value, traceback)`` will be passed to the
+ exception_handler function.
+
+ If the exception handler returns true, the exception will be
+ consumed and will not be propagated to other exception handlers.
+ """
+ def __init__(self, exception_handler):
+ self.exception_handler = exception_handler
+ self.active = True
+
+ def _deactivate(self):
+ self.active = False
+
+ def exit(self, type, value, traceback):
+ if type is not None:
+ return self.exception_handler(type, value, traceback)
+
+ def __enter__(self):
+ self.old_contexts = _state.contexts
+ self.new_contexts = (self.old_contexts[0], self)
+ _state.contexts = self.new_contexts
+
+ return self._deactivate
+
+ def __exit__(self, type, value, traceback):
+ try:
+ if type is not None:
+ return self.exception_handler(type, value, traceback)
+ finally:
+ final_contexts = _state.contexts
+ _state.contexts = self.old_contexts
+
+ if final_contexts is not self.new_contexts:
+ raise StackContextInconsistentError(
+ 'stack_context inconsistency (may be caused by yield '
+ 'within a "with StackContext" block)')
+
+ # Break up a reference to itself to allow for faster GC on CPython.
+ self.new_contexts = None
+
+
+class NullContext(object):
+ """Resets the `StackContext`.
+
+ Useful when creating a shared resource on demand (e.g. an
+ `.AsyncHTTPClient`) where the stack that caused the creating is
+ not relevant to future operations.
+ """
+ def __enter__(self):
+ self.old_contexts = _state.contexts
+ _state.contexts = (tuple(), None)
+
+ def __exit__(self, type, value, traceback):
+ _state.contexts = self.old_contexts
+
+
+def _remove_deactivated(contexts):
+ """Remove deactivated handlers from the chain"""
+ # Clean ctx handlers
+ stack_contexts = tuple([h for h in contexts[0] if h.active])
+
+ # Find new head
+ head = contexts[1]
+ while head is not None and not head.active:
+ head = head.old_contexts[1]
+
+ # Process chain
+ ctx = head
+ while ctx is not None:
+ parent = ctx.old_contexts[1]
+
+ while parent is not None:
+ if parent.active:
+ break
+ ctx.old_contexts = parent.old_contexts
+ parent = parent.old_contexts[1]
+
+ ctx = parent
+
+ return (stack_contexts, head)
+
+
+def wrap(fn):
+ """Returns a callable object that will restore the current `StackContext`
+ when executed.
+
+ Use this whenever saving a callback to be executed later in a
+ different execution context (either in a different thread or
+ asynchronously in the same thread).
+ """
+ # Check if function is already wrapped
+ if fn is None or hasattr(fn, '_wrapped'):
+ return fn
+
+ # Capture current stack head
+ # TODO: Any other better way to store contexts and update them in wrapped function?
+ cap_contexts = [_state.contexts]
+
+ def wrapped(*args, **kwargs):
+ ret = None
+ try:
+ # Capture old state
+ current_state = _state.contexts
+
+ # Remove deactivated items
+ cap_contexts[0] = contexts = _remove_deactivated(cap_contexts[0])
+
+ # Force new state
+ _state.contexts = contexts
+
+ # Current exception
+ exc = (None, None, None)
+ top = None
+
+ # Apply stack contexts
+ last_ctx = 0
+ stack = contexts[0]
+
+ # Apply state
+ for n in stack:
+ try:
+ n.enter()
+ last_ctx += 1
+ except:
+ # Exception happened. Record exception info and store top-most handler
+ exc = sys.exc_info()
+ top = n.old_contexts[1]
+
+ # Execute callback if no exception happened while restoring state
+ if top is None:
+ try:
+ ret = fn(*args, **kwargs)
+ except:
+ exc = sys.exc_info()
+ top = contexts[1]
+
+ # If there was exception, try to handle it by going through the exception chain
+ if top is not None:
+ exc = _handle_exception(top, exc)
+ else:
+ # Otherwise take shorter path and run stack contexts in reverse order
+ while last_ctx > 0:
+ last_ctx -= 1
+ c = stack[last_ctx]
+
+ try:
+ c.exit(*exc)
+ except:
+ exc = sys.exc_info()
+ top = c.old_contexts[1]
+ break
+ else:
+ top = None
+
+ # If if exception happened while unrolling, take longer exception handler path
+ if top is not None:
+ exc = _handle_exception(top, exc)
+
+ # If exception was not handled, raise it
+ if exc != (None, None, None):
+ raise_exc_info(exc)
+ finally:
+ _state.contexts = current_state
+ return ret
+
+ wrapped._wrapped = True
+ return wrapped
+
+
+def _handle_exception(tail, exc):
+ while tail is not None:
+ try:
+ if tail.exit(*exc):
+ exc = (None, None, None)
+ except:
+ exc = sys.exc_info()
+
+ tail = tail.old_contexts[1]
+
+ return exc
+
+
+def run_with_stack_context(context, func):
+ """Run a coroutine ``func`` in the given `StackContext`.
+
+ It is not safe to have a ``yield`` statement within a ``with StackContext``
+ block, so it is difficult to use stack context with `.gen.coroutine`.
+ This helper function runs the function in the correct context while
+ keeping the ``yield`` and ``with`` statements syntactically separate.
+
+ Example::
+
+ @gen.coroutine
+ def incorrect():
+ with StackContext(ctx):
+ # ERROR: this will raise StackContextInconsistentError
+ yield other_coroutine()
+
+ @gen.coroutine
+ def correct():
+ yield run_with_stack_context(StackContext(ctx), other_coroutine)
+
+ .. versionadded:: 3.1
+ """
+ with context:
+ return func()
diff --git a/src/console/zmq/eventloop/minitornado/util.py b/src/console/zmq/eventloop/minitornado/util.py
new file mode 100755
index 00000000..c1e2eb95
--- /dev/null
+++ b/src/console/zmq/eventloop/minitornado/util.py
@@ -0,0 +1,184 @@
+"""Miscellaneous utility functions and classes.
+
+This module is used internally by Tornado. It is not necessarily expected
+that the functions and classes defined here will be useful to other
+applications, but they are documented here in case they are.
+
+The one public-facing part of this module is the `Configurable` class
+and its `~Configurable.configure` method, which becomes a part of the
+interface of its subclasses, including `.AsyncHTTPClient`, `.IOLoop`,
+and `.Resolver`.
+"""
+
+from __future__ import absolute_import, division, print_function, with_statement
+
+import sys
+
+
+def import_object(name):
+ """Imports an object by name.
+
+ import_object('x') is equivalent to 'import x'.
+ import_object('x.y.z') is equivalent to 'from x.y import z'.
+
+ >>> import tornado.escape
+ >>> import_object('tornado.escape') is tornado.escape
+ True
+ >>> import_object('tornado.escape.utf8') is tornado.escape.utf8
+ True
+ >>> import_object('tornado') is tornado
+ True
+ >>> import_object('tornado.missing_module')
+ Traceback (most recent call last):
+ ...
+ ImportError: No module named missing_module
+ """
+ if name.count('.') == 0:
+ return __import__(name, None, None)
+
+ parts = name.split('.')
+ obj = __import__('.'.join(parts[:-1]), None, None, [parts[-1]], 0)
+ try:
+ return getattr(obj, parts[-1])
+ except AttributeError:
+ raise ImportError("No module named %s" % parts[-1])
+
+
+# Fake unicode literal support: Python 3.2 doesn't have the u'' marker for
+# literal strings, and alternative solutions like "from __future__ import
+# unicode_literals" have other problems (see PEP 414). u() can be applied
+# to ascii strings that include \u escapes (but they must not contain
+# literal non-ascii characters).
+if type('') is not type(b''):
+ def u(s):
+ return s
+ bytes_type = bytes
+ unicode_type = str
+ basestring_type = str
+else:
+ def u(s):
+ return s.decode('unicode_escape')
+ bytes_type = str
+ unicode_type = unicode
+ basestring_type = basestring
+
+
+if sys.version_info > (3,):
+ exec("""
+def raise_exc_info(exc_info):
+ raise exc_info[1].with_traceback(exc_info[2])
+
+def exec_in(code, glob, loc=None):
+ if isinstance(code, str):
+ code = compile(code, '<string>', 'exec', dont_inherit=True)
+ exec(code, glob, loc)
+""")
+else:
+ exec("""
+def raise_exc_info(exc_info):
+ raise exc_info[0], exc_info[1], exc_info[2]
+
+def exec_in(code, glob, loc=None):
+ if isinstance(code, basestring):
+ # exec(string) inherits the caller's future imports; compile
+ # the string first to prevent that.
+ code = compile(code, '<string>', 'exec', dont_inherit=True)
+ exec code in glob, loc
+""")
+
+
+class Configurable(object):
+ """Base class for configurable interfaces.
+
+ A configurable interface is an (abstract) class whose constructor
+ acts as a factory function for one of its implementation subclasses.
+ The implementation subclass as well as optional keyword arguments to
+ its initializer can be set globally at runtime with `configure`.
+
+ By using the constructor as the factory method, the interface
+ looks like a normal class, `isinstance` works as usual, etc. This
+ pattern is most useful when the choice of implementation is likely
+ to be a global decision (e.g. when `~select.epoll` is available,
+ always use it instead of `~select.select`), or when a
+ previously-monolithic class has been split into specialized
+ subclasses.
+
+ Configurable subclasses must define the class methods
+ `configurable_base` and `configurable_default`, and use the instance
+ method `initialize` instead of ``__init__``.
+ """
+ __impl_class = None
+ __impl_kwargs = None
+
+ def __new__(cls, **kwargs):
+ base = cls.configurable_base()
+ args = {}
+ if cls is base:
+ impl = cls.configured_class()
+ if base.__impl_kwargs:
+ args.update(base.__impl_kwargs)
+ else:
+ impl = cls
+ args.update(kwargs)
+ instance = super(Configurable, cls).__new__(impl)
+ # initialize vs __init__ chosen for compatiblity with AsyncHTTPClient
+ # singleton magic. If we get rid of that we can switch to __init__
+ # here too.
+ instance.initialize(**args)
+ return instance
+
+ @classmethod
+ def configurable_base(cls):
+ """Returns the base class of a configurable hierarchy.
+
+ This will normally return the class in which it is defined.
+ (which is *not* necessarily the same as the cls classmethod parameter).
+ """
+ raise NotImplementedError()
+
+ @classmethod
+ def configurable_default(cls):
+ """Returns the implementation class to be used if none is configured."""
+ raise NotImplementedError()
+
+ def initialize(self):
+ """Initialize a `Configurable` subclass instance.
+
+ Configurable classes should use `initialize` instead of ``__init__``.
+ """
+
+ @classmethod
+ def configure(cls, impl, **kwargs):
+ """Sets the class to use when the base class is instantiated.
+
+ Keyword arguments will be saved and added to the arguments passed
+ to the constructor. This can be used to set global defaults for
+ some parameters.
+ """
+ base = cls.configurable_base()
+ if isinstance(impl, (unicode_type, bytes_type)):
+ impl = import_object(impl)
+ if impl is not None and not issubclass(impl, cls):
+ raise ValueError("Invalid subclass of %s" % cls)
+ base.__impl_class = impl
+ base.__impl_kwargs = kwargs
+
+ @classmethod
+ def configured_class(cls):
+ """Returns the currently configured class."""
+ base = cls.configurable_base()
+ if cls.__impl_class is None:
+ base.__impl_class = cls.configurable_default()
+ return base.__impl_class
+
+ @classmethod
+ def _save_configuration(cls):
+ base = cls.configurable_base()
+ return (base.__impl_class, base.__impl_kwargs)
+
+ @classmethod
+ def _restore_configuration(cls, saved):
+ base = cls.configurable_base()
+ base.__impl_class = saved[0]
+ base.__impl_kwargs = saved[1]
+
diff --git a/src/console/zmq/eventloop/zmqstream.py b/src/console/zmq/eventloop/zmqstream.py
new file mode 100755
index 00000000..86a97e44
--- /dev/null
+++ b/src/console/zmq/eventloop/zmqstream.py
@@ -0,0 +1,529 @@
+#
+# Copyright 2009 Facebook
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""A utility class to send to and recv from a non-blocking socket."""
+
+from __future__ import with_statement
+
+import sys
+
+import zmq
+from zmq.utils import jsonapi
+
+try:
+ import cPickle as pickle
+except ImportError:
+ import pickle
+
+from .ioloop import IOLoop
+
+try:
+ # gen_log will only import from >= 3.0
+ from tornado.log import gen_log
+ from tornado import stack_context
+except ImportError:
+ from .minitornado.log import gen_log
+ from .minitornado import stack_context
+
+try:
+ from queue import Queue
+except ImportError:
+ from Queue import Queue
+
+from zmq.utils.strtypes import bytes, unicode, basestring
+
+try:
+ callable
+except NameError:
+ callable = lambda obj: hasattr(obj, '__call__')
+
+
+class ZMQStream(object):
+ """A utility class to register callbacks when a zmq socket sends and receives
+
+ For use with zmq.eventloop.ioloop
+
+ There are three main methods
+
+ Methods:
+
+ * **on_recv(callback, copy=True):**
+ register a callback to be run every time the socket has something to receive
+ * **on_send(callback):**
+ register a callback to be run every time you call send
+ * **send(self, msg, flags=0, copy=False, callback=None):**
+ perform a send that will trigger the callback
+ if callback is passed, on_send is also called.
+
+ There are also send_multipart(), send_json(), send_pyobj()
+
+ Three other methods for deactivating the callbacks:
+
+ * **stop_on_recv():**
+ turn off the recv callback
+ * **stop_on_send():**
+ turn off the send callback
+
+ which simply call ``on_<evt>(None)``.
+
+ The entire socket interface, excluding direct recv methods, is also
+ provided, primarily through direct-linking the methods.
+ e.g.
+
+ >>> stream.bind is stream.socket.bind
+ True
+
+ """
+
+ socket = None
+ io_loop = None
+ poller = None
+
+ def __init__(self, socket, io_loop=None):
+ self.socket = socket
+ self.io_loop = io_loop or IOLoop.instance()
+ self.poller = zmq.Poller()
+
+ self._send_queue = Queue()
+ self._recv_callback = None
+ self._send_callback = None
+ self._close_callback = None
+ self._recv_copy = False
+ self._flushed = False
+
+ self._state = self.io_loop.ERROR
+ self._init_io_state()
+
+ # shortcircuit some socket methods
+ self.bind = self.socket.bind
+ self.bind_to_random_port = self.socket.bind_to_random_port
+ self.connect = self.socket.connect
+ self.setsockopt = self.socket.setsockopt
+ self.getsockopt = self.socket.getsockopt
+ self.setsockopt_string = self.socket.setsockopt_string
+ self.getsockopt_string = self.socket.getsockopt_string
+ self.setsockopt_unicode = self.socket.setsockopt_unicode
+ self.getsockopt_unicode = self.socket.getsockopt_unicode
+
+
+ def stop_on_recv(self):
+ """Disable callback and automatic receiving."""
+ return self.on_recv(None)
+
+ def stop_on_send(self):
+ """Disable callback on sending."""
+ return self.on_send(None)
+
+ def stop_on_err(self):
+ """DEPRECATED, does nothing"""
+ gen_log.warn("on_err does nothing, and will be removed")
+
+ def on_err(self, callback):
+ """DEPRECATED, does nothing"""
+ gen_log.warn("on_err does nothing, and will be removed")
+
+ def on_recv(self, callback, copy=True):
+ """Register a callback for when a message is ready to recv.
+
+ There can be only one callback registered at a time, so each
+ call to `on_recv` replaces previously registered callbacks.
+
+ on_recv(None) disables recv event polling.
+
+ Use on_recv_stream(callback) instead, to register a callback that will receive
+ both this ZMQStream and the message, instead of just the message.
+
+ Parameters
+ ----------
+
+ callback : callable
+ callback must take exactly one argument, which will be a
+ list, as returned by socket.recv_multipart()
+ if callback is None, recv callbacks are disabled.
+ copy : bool
+ copy is passed directly to recv, so if copy is False,
+ callback will receive Message objects. If copy is True,
+ then callback will receive bytes/str objects.
+
+ Returns : None
+ """
+
+ self._check_closed()
+ assert callback is None or callable(callback)
+ self._recv_callback = stack_context.wrap(callback)
+ self._recv_copy = copy
+ if callback is None:
+ self._drop_io_state(self.io_loop.READ)
+ else:
+ self._add_io_state(self.io_loop.READ)
+
+ def on_recv_stream(self, callback, copy=True):
+ """Same as on_recv, but callback will get this stream as first argument
+
+ callback must take exactly two arguments, as it will be called as::
+
+ callback(stream, msg)
+
+ Useful when a single callback should be used with multiple streams.
+ """
+ if callback is None:
+ self.stop_on_recv()
+ else:
+ self.on_recv(lambda msg: callback(self, msg), copy=copy)
+
+ def on_send(self, callback):
+ """Register a callback to be called on each send
+
+ There will be two arguments::
+
+ callback(msg, status)
+
+ * `msg` will be the list of sendable objects that was just sent
+ * `status` will be the return result of socket.send_multipart(msg) -
+ MessageTracker or None.
+
+ Non-copying sends return a MessageTracker object whose
+ `done` attribute will be True when the send is complete.
+ This allows users to track when an object is safe to write to
+ again.
+
+ The second argument will always be None if copy=True
+ on the send.
+
+ Use on_send_stream(callback) to register a callback that will be passed
+ this ZMQStream as the first argument, in addition to the other two.
+
+ on_send(None) disables recv event polling.
+
+ Parameters
+ ----------
+
+ callback : callable
+ callback must take exactly two arguments, which will be
+ the message being sent (always a list),
+ and the return result of socket.send_multipart(msg) -
+ MessageTracker or None.
+
+ if callback is None, send callbacks are disabled.
+ """
+
+ self._check_closed()
+ assert callback is None or callable(callback)
+ self._send_callback = stack_context.wrap(callback)
+
+
+ def on_send_stream(self, callback):
+ """Same as on_send, but callback will get this stream as first argument
+
+ Callback will be passed three arguments::
+
+ callback(stream, msg, status)
+
+ Useful when a single callback should be used with multiple streams.
+ """
+ if callback is None:
+ self.stop_on_send()
+ else:
+ self.on_send(lambda msg, status: callback(self, msg, status))
+
+
+ def send(self, msg, flags=0, copy=True, track=False, callback=None):
+ """Send a message, optionally also register a new callback for sends.
+ See zmq.socket.send for details.
+ """
+ return self.send_multipart([msg], flags=flags, copy=copy, track=track, callback=callback)
+
+ def send_multipart(self, msg, flags=0, copy=True, track=False, callback=None):
+ """Send a multipart message, optionally also register a new callback for sends.
+ See zmq.socket.send_multipart for details.
+ """
+ kwargs = dict(flags=flags, copy=copy, track=track)
+ self._send_queue.put((msg, kwargs))
+ callback = callback or self._send_callback
+ if callback is not None:
+ self.on_send(callback)
+ else:
+ # noop callback
+ self.on_send(lambda *args: None)
+ self._add_io_state(self.io_loop.WRITE)
+
+ def send_string(self, u, flags=0, encoding='utf-8', callback=None):
+ """Send a unicode message with an encoding.
+ See zmq.socket.send_unicode for details.
+ """
+ if not isinstance(u, basestring):
+ raise TypeError("unicode/str objects only")
+ return self.send(u.encode(encoding), flags=flags, callback=callback)
+
+ send_unicode = send_string
+
+ def send_json(self, obj, flags=0, callback=None):
+ """Send json-serialized version of an object.
+ See zmq.socket.send_json for details.
+ """
+ if jsonapi is None:
+ raise ImportError('jsonlib{1,2}, json or simplejson library is required.')
+ else:
+ msg = jsonapi.dumps(obj)
+ return self.send(msg, flags=flags, callback=callback)
+
+ def send_pyobj(self, obj, flags=0, protocol=-1, callback=None):
+ """Send a Python object as a message using pickle to serialize.
+
+ See zmq.socket.send_json for details.
+ """
+ msg = pickle.dumps(obj, protocol)
+ return self.send(msg, flags, callback=callback)
+
+ def _finish_flush(self):
+ """callback for unsetting _flushed flag."""
+ self._flushed = False
+
+ def flush(self, flag=zmq.POLLIN|zmq.POLLOUT, limit=None):
+ """Flush pending messages.
+
+ This method safely handles all pending incoming and/or outgoing messages,
+ bypassing the inner loop, passing them to the registered callbacks.
+
+ A limit can be specified, to prevent blocking under high load.
+
+ flush will return the first time ANY of these conditions are met:
+ * No more events matching the flag are pending.
+ * the total number of events handled reaches the limit.
+
+ Note that if ``flag|POLLIN != 0``, recv events will be flushed even if no callback
+ is registered, unlike normal IOLoop operation. This allows flush to be
+ used to remove *and ignore* incoming messages.
+
+ Parameters
+ ----------
+ flag : int, default=POLLIN|POLLOUT
+ 0MQ poll flags.
+ If flag|POLLIN, recv events will be flushed.
+ If flag|POLLOUT, send events will be flushed.
+ Both flags can be set at once, which is the default.
+ limit : None or int, optional
+ The maximum number of messages to send or receive.
+ Both send and recv count against this limit.
+
+ Returns
+ -------
+ int : count of events handled (both send and recv)
+ """
+ self._check_closed()
+ # unset self._flushed, so callbacks will execute, in case flush has
+ # already been called this iteration
+ already_flushed = self._flushed
+ self._flushed = False
+ # initialize counters
+ count = 0
+ def update_flag():
+ """Update the poll flag, to prevent registering POLLOUT events
+ if we don't have pending sends."""
+ return flag & zmq.POLLIN | (self.sending() and flag & zmq.POLLOUT)
+ flag = update_flag()
+ if not flag:
+ # nothing to do
+ return 0
+ self.poller.register(self.socket, flag)
+ events = self.poller.poll(0)
+ while events and (not limit or count < limit):
+ s,event = events[0]
+ if event & zmq.POLLIN: # receiving
+ self._handle_recv()
+ count += 1
+ if self.socket is None:
+ # break if socket was closed during callback
+ break
+ if event & zmq.POLLOUT and self.sending():
+ self._handle_send()
+ count += 1
+ if self.socket is None:
+ # break if socket was closed during callback
+ break
+
+ flag = update_flag()
+ if flag:
+ self.poller.register(self.socket, flag)
+ events = self.poller.poll(0)
+ else:
+ events = []
+ if count: # only bypass loop if we actually flushed something
+ # skip send/recv callbacks this iteration
+ self._flushed = True
+ # reregister them at the end of the loop
+ if not already_flushed: # don't need to do it again
+ self.io_loop.add_callback(self._finish_flush)
+ elif already_flushed:
+ self._flushed = True
+
+ # update ioloop poll state, which may have changed
+ self._rebuild_io_state()
+ return count
+
+ def set_close_callback(self, callback):
+ """Call the given callback when the stream is closed."""
+ self._close_callback = stack_context.wrap(callback)
+
+ def close(self, linger=None):
+ """Close this stream."""
+ if self.socket is not None:
+ self.io_loop.remove_handler(self.socket)
+ self.socket.close(linger)
+ self.socket = None
+ if self._close_callback:
+ self._run_callback(self._close_callback)
+
+ def receiving(self):
+ """Returns True if we are currently receiving from the stream."""
+ return self._recv_callback is not None
+
+ def sending(self):
+ """Returns True if we are currently sending to the stream."""
+ return not self._send_queue.empty()
+
+ def closed(self):
+ return self.socket is None
+
+ def _run_callback(self, callback, *args, **kwargs):
+ """Wrap running callbacks in try/except to allow us to
+ close our socket."""
+ try:
+ # Use a NullContext to ensure that all StackContexts are run
+ # inside our blanket exception handler rather than outside.
+ with stack_context.NullContext():
+ callback(*args, **kwargs)
+ except:
+ gen_log.error("Uncaught exception, closing connection.",
+ exc_info=True)
+ # Close the socket on an uncaught exception from a user callback
+ # (It would eventually get closed when the socket object is
+ # gc'd, but we don't want to rely on gc happening before we
+ # run out of file descriptors)
+ self.close()
+ # Re-raise the exception so that IOLoop.handle_callback_exception
+ # can see it and log the error
+ raise
+
+ def _handle_events(self, fd, events):
+ """This method is the actual handler for IOLoop, that gets called whenever
+ an event on my socket is posted. It dispatches to _handle_recv, etc."""
+ # print "handling events"
+ if not self.socket:
+ gen_log.warning("Got events for closed stream %s", fd)
+ return
+ try:
+ # dispatch events:
+ if events & IOLoop.ERROR:
+ gen_log.error("got POLLERR event on ZMQStream, which doesn't make sense")
+ return
+ if events & IOLoop.READ:
+ self._handle_recv()
+ if not self.socket:
+ return
+ if events & IOLoop.WRITE:
+ self._handle_send()
+ if not self.socket:
+ return
+
+ # rebuild the poll state
+ self._rebuild_io_state()
+ except:
+ gen_log.error("Uncaught exception, closing connection.",
+ exc_info=True)
+ self.close()
+ raise
+
+ def _handle_recv(self):
+ """Handle a recv event."""
+ if self._flushed:
+ return
+ try:
+ msg = self.socket.recv_multipart(zmq.NOBLOCK, copy=self._recv_copy)
+ except zmq.ZMQError as e:
+ if e.errno == zmq.EAGAIN:
+ # state changed since poll event
+ pass
+ else:
+ gen_log.error("RECV Error: %s"%zmq.strerror(e.errno))
+ else:
+ if self._recv_callback:
+ callback = self._recv_callback
+ # self._recv_callback = None
+ self._run_callback(callback, msg)
+
+ # self.update_state()
+
+
+ def _handle_send(self):
+ """Handle a send event."""
+ if self._flushed:
+ return
+ if not self.sending():
+ gen_log.error("Shouldn't have handled a send event")
+ return
+
+ msg, kwargs = self._send_queue.get()
+ try:
+ status = self.socket.send_multipart(msg, **kwargs)
+ except zmq.ZMQError as e:
+ gen_log.error("SEND Error: %s", e)
+ status = e
+ if self._send_callback:
+ callback = self._send_callback
+ self._run_callback(callback, msg, status)
+
+ # self.update_state()
+
+ def _check_closed(self):
+ if not self.socket:
+ raise IOError("Stream is closed")
+
+ def _rebuild_io_state(self):
+ """rebuild io state based on self.sending() and receiving()"""
+ if self.socket is None:
+ return
+ state = self.io_loop.ERROR
+ if self.receiving():
+ state |= self.io_loop.READ
+ if self.sending():
+ state |= self.io_loop.WRITE
+ if state != self._state:
+ self._state = state
+ self._update_handler(state)
+
+ def _add_io_state(self, state):
+ """Add io_state to poller."""
+ if not self._state & state:
+ self._state = self._state | state
+ self._update_handler(self._state)
+
+ def _drop_io_state(self, state):
+ """Stop poller from watching an io_state."""
+ if self._state & state:
+ self._state = self._state & (~state)
+ self._update_handler(self._state)
+
+ def _update_handler(self, state):
+ """Update IOLoop handler with state."""
+ if self.socket is None:
+ return
+ self.io_loop.update_handler(self.socket, state)
+
+ def _init_io_state(self):
+ """initialize the ioloop event handler"""
+ with stack_context.NullContext():
+ self.io_loop.add_handler(self.socket, self._handle_events, self._state)
+