summaryrefslogtreecommitdiffstats
path: root/external_libs/python/zmq/green
diff options
context:
space:
mode:
Diffstat (limited to 'external_libs/python/zmq/green')
-rw-r--r--external_libs/python/zmq/green/__init__.py40
-rw-r--r--external_libs/python/zmq/green/core.py287
-rw-r--r--external_libs/python/zmq/green/device.py32
-rw-r--r--external_libs/python/zmq/green/eventloop/__init__.py3
-rw-r--r--external_libs/python/zmq/green/eventloop/ioloop.py33
-rw-r--r--external_libs/python/zmq/green/eventloop/zmqstream.py11
-rw-r--r--external_libs/python/zmq/green/poll.py95
7 files changed, 501 insertions, 0 deletions
diff --git a/external_libs/python/zmq/green/__init__.py b/external_libs/python/zmq/green/__init__.py
new file mode 100644
index 00000000..ff7e5965
--- /dev/null
+++ b/external_libs/python/zmq/green/__init__.py
@@ -0,0 +1,40 @@
+# -*- coding: utf-8 -*-
+#-----------------------------------------------------------------------------
+# Copyright (C) 2011-2012 Travis Cline
+#
+# This file is part of pyzmq
+# It is adapted from upstream project zeromq_gevent under the New BSD License
+#
+# Distributed under the terms of the New BSD License. The full license is in
+# the file COPYING.BSD, distributed as part of this software.
+#-----------------------------------------------------------------------------
+
+"""zmq.green - gevent compatibility with zeromq.
+
+Usage
+-----
+
+Instead of importing zmq directly, do so in the following manner:
+
+..
+
+ import zmq.green as zmq
+
+
+Any calls that would have blocked the current thread will now only block the
+current green thread.
+
+This compatibility is accomplished by ensuring the nonblocking flag is set
+before any blocking operation and the ØMQ file descriptor is polled internally
+to trigger needed events.
+"""
+
+from zmq import *
+from zmq.green.core import _Context, _Socket
+from zmq.green.poll import _Poller
+Context = _Context
+Socket = _Socket
+Poller = _Poller
+
+from zmq.green.device import device
+
diff --git a/external_libs/python/zmq/green/core.py b/external_libs/python/zmq/green/core.py
new file mode 100644
index 00000000..9fc73e32
--- /dev/null
+++ b/external_libs/python/zmq/green/core.py
@@ -0,0 +1,287 @@
+#-----------------------------------------------------------------------------
+# Copyright (C) 2011-2012 Travis Cline
+#
+# This file is part of pyzmq
+# It is adapted from upstream project zeromq_gevent under the New BSD License
+#
+# Distributed under the terms of the New BSD License. The full license is in
+# the file COPYING.BSD, distributed as part of this software.
+#-----------------------------------------------------------------------------
+
+"""This module wraps the :class:`Socket` and :class:`Context` found in :mod:`pyzmq <zmq>` to be non blocking
+"""
+
+from __future__ import print_function
+
+import sys
+import time
+import warnings
+
+import zmq
+
+from zmq import Context as _original_Context
+from zmq import Socket as _original_Socket
+from .poll import _Poller
+
+import gevent
+from gevent.event import AsyncResult
+from gevent.hub import get_hub
+
+if hasattr(zmq, 'RCVTIMEO'):
+ TIMEOS = (zmq.RCVTIMEO, zmq.SNDTIMEO)
+else:
+ TIMEOS = ()
+
+def _stop(evt):
+ """simple wrapper for stopping an Event, allowing for method rename in gevent 1.0"""
+ try:
+ evt.stop()
+ except AttributeError as e:
+ # gevent<1.0 compat
+ evt.cancel()
+
+class _Socket(_original_Socket):
+ """Green version of :class:`zmq.Socket`
+
+ The following methods are overridden:
+
+ * send
+ * recv
+
+ To ensure that the ``zmq.NOBLOCK`` flag is set and that sending or receiving
+ is deferred to the hub if a ``zmq.EAGAIN`` (retry) error is raised.
+
+ The `__state_changed` method is triggered when the zmq.FD for the socket is
+ marked as readable and triggers the necessary read and write events (which
+ are waited for in the recv and send methods).
+
+ Some double underscore prefixes are used to minimize pollution of
+ :class:`zmq.Socket`'s namespace.
+ """
+ __in_send_multipart = False
+ __in_recv_multipart = False
+ __writable = None
+ __readable = None
+ _state_event = None
+ _gevent_bug_timeout = 11.6 # timeout for not trusting gevent
+ _debug_gevent = False # turn on if you think gevent is missing events
+ _poller_class = _Poller
+
+ def __init__(self, context, socket_type):
+ _original_Socket.__init__(self, context, socket_type)
+ self.__in_send_multipart = False
+ self.__in_recv_multipart = False
+ self.__setup_events()
+
+
+ def __del__(self):
+ self.close()
+
+ def close(self, linger=None):
+ super(_Socket, self).close(linger)
+ self.__cleanup_events()
+
+ def __cleanup_events(self):
+ # close the _state_event event, keeps the number of active file descriptors down
+ if getattr(self, '_state_event', None):
+ _stop(self._state_event)
+ self._state_event = None
+ # if the socket has entered a close state resume any waiting greenlets
+ self.__writable.set()
+ self.__readable.set()
+
+ def __setup_events(self):
+ self.__readable = AsyncResult()
+ self.__writable = AsyncResult()
+ self.__readable.set()
+ self.__writable.set()
+
+ try:
+ self._state_event = get_hub().loop.io(self.getsockopt(zmq.FD), 1) # read state watcher
+ self._state_event.start(self.__state_changed)
+ except AttributeError:
+ # for gevent<1.0 compatibility
+ from gevent.core import read_event
+ self._state_event = read_event(self.getsockopt(zmq.FD), self.__state_changed, persist=True)
+
+ def __state_changed(self, event=None, _evtype=None):
+ if self.closed:
+ self.__cleanup_events()
+ return
+ try:
+ # avoid triggering __state_changed from inside __state_changed
+ events = super(_Socket, self).getsockopt(zmq.EVENTS)
+ except zmq.ZMQError as exc:
+ self.__writable.set_exception(exc)
+ self.__readable.set_exception(exc)
+ else:
+ if events & zmq.POLLOUT:
+ self.__writable.set()
+ if events & zmq.POLLIN:
+ self.__readable.set()
+
+ def _wait_write(self):
+ assert self.__writable.ready(), "Only one greenlet can be waiting on this event"
+ self.__writable = AsyncResult()
+ # timeout is because libzmq cannot be trusted to properly signal a new send event:
+ # this is effectively a maximum poll interval of 1s
+ tic = time.time()
+ dt = self._gevent_bug_timeout
+ if dt:
+ timeout = gevent.Timeout(seconds=dt)
+ else:
+ timeout = None
+ try:
+ if timeout:
+ timeout.start()
+ self.__writable.get(block=True)
+ except gevent.Timeout as t:
+ if t is not timeout:
+ raise
+ toc = time.time()
+ # gevent bug: get can raise timeout even on clean return
+ # don't display zmq bug warning for gevent bug (this is getting ridiculous)
+ if self._debug_gevent and timeout and toc-tic > dt and \
+ self.getsockopt(zmq.EVENTS) & zmq.POLLOUT:
+ print("BUG: gevent may have missed a libzmq send event on %i!" % self.FD, file=sys.stderr)
+ finally:
+ if timeout:
+ timeout.cancel()
+ self.__writable.set()
+
+ def _wait_read(self):
+ assert self.__readable.ready(), "Only one greenlet can be waiting on this event"
+ self.__readable = AsyncResult()
+ # timeout is because libzmq cannot always be trusted to play nice with libevent.
+ # I can only confirm that this actually happens for send, but lets be symmetrical
+ # with our dirty hacks.
+ # this is effectively a maximum poll interval of 1s
+ tic = time.time()
+ dt = self._gevent_bug_timeout
+ if dt:
+ timeout = gevent.Timeout(seconds=dt)
+ else:
+ timeout = None
+ try:
+ if timeout:
+ timeout.start()
+ self.__readable.get(block=True)
+ except gevent.Timeout as t:
+ if t is not timeout:
+ raise
+ toc = time.time()
+ # gevent bug: get can raise timeout even on clean return
+ # don't display zmq bug warning for gevent bug (this is getting ridiculous)
+ if self._debug_gevent and timeout and toc-tic > dt and \
+ self.getsockopt(zmq.EVENTS) & zmq.POLLIN:
+ print("BUG: gevent may have missed a libzmq recv event on %i!" % self.FD, file=sys.stderr)
+ finally:
+ if timeout:
+ timeout.cancel()
+ self.__readable.set()
+
+ def send(self, data, flags=0, copy=True, track=False):
+ """send, which will only block current greenlet
+
+ state_changed always fires exactly once (success or fail) at the
+ end of this method.
+ """
+
+ # if we're given the NOBLOCK flag act as normal and let the EAGAIN get raised
+ if flags & zmq.NOBLOCK:
+ try:
+ msg = super(_Socket, self).send(data, flags, copy, track)
+ finally:
+ if not self.__in_send_multipart:
+ self.__state_changed()
+ return msg
+ # ensure the zmq.NOBLOCK flag is part of flags
+ flags |= zmq.NOBLOCK
+ while True: # Attempt to complete this operation indefinitely, blocking the current greenlet
+ try:
+ # attempt the actual call
+ msg = super(_Socket, self).send(data, flags, copy, track)
+ except zmq.ZMQError as e:
+ # if the raised ZMQError is not EAGAIN, reraise
+ if e.errno != zmq.EAGAIN:
+ if not self.__in_send_multipart:
+ self.__state_changed()
+ raise
+ else:
+ if not self.__in_send_multipart:
+ self.__state_changed()
+ return msg
+ # defer to the event loop until we're notified the socket is writable
+ self._wait_write()
+
+ def recv(self, flags=0, copy=True, track=False):
+ """recv, which will only block current greenlet
+
+ state_changed always fires exactly once (success or fail) at the
+ end of this method.
+ """
+ if flags & zmq.NOBLOCK:
+ try:
+ msg = super(_Socket, self).recv(flags, copy, track)
+ finally:
+ if not self.__in_recv_multipart:
+ self.__state_changed()
+ return msg
+
+ flags |= zmq.NOBLOCK
+ while True:
+ try:
+ msg = super(_Socket, self).recv(flags, copy, track)
+ except zmq.ZMQError as e:
+ if e.errno != zmq.EAGAIN:
+ if not self.__in_recv_multipart:
+ self.__state_changed()
+ raise
+ else:
+ if not self.__in_recv_multipart:
+ self.__state_changed()
+ return msg
+ self._wait_read()
+
+ def send_multipart(self, *args, **kwargs):
+ """wrap send_multipart to prevent state_changed on each partial send"""
+ self.__in_send_multipart = True
+ try:
+ msg = super(_Socket, self).send_multipart(*args, **kwargs)
+ finally:
+ self.__in_send_multipart = False
+ self.__state_changed()
+ return msg
+
+ def recv_multipart(self, *args, **kwargs):
+ """wrap recv_multipart to prevent state_changed on each partial recv"""
+ self.__in_recv_multipart = True
+ try:
+ msg = super(_Socket, self).recv_multipart(*args, **kwargs)
+ finally:
+ self.__in_recv_multipart = False
+ self.__state_changed()
+ return msg
+
+ def get(self, opt):
+ """trigger state_changed on getsockopt(EVENTS)"""
+ if opt in TIMEOS:
+ warnings.warn("TIMEO socket options have no effect in zmq.green", UserWarning)
+ optval = super(_Socket, self).get(opt)
+ if opt == zmq.EVENTS:
+ self.__state_changed()
+ return optval
+
+ def set(self, opt, val):
+ """set socket option"""
+ if opt in TIMEOS:
+ warnings.warn("TIMEO socket options have no effect in zmq.green", UserWarning)
+ return super(_Socket, self).set(opt, val)
+
+
+class _Context(_original_Context):
+ """Replacement for :class:`zmq.Context`
+
+ Ensures that the greened Socket above is used in calls to `socket`.
+ """
+ _socket_class = _Socket
diff --git a/external_libs/python/zmq/green/device.py b/external_libs/python/zmq/green/device.py
new file mode 100644
index 00000000..4b070237
--- /dev/null
+++ b/external_libs/python/zmq/green/device.py
@@ -0,0 +1,32 @@
+# Copyright (C) PyZMQ Developers
+# Distributed under the terms of the Modified BSD License.
+
+import zmq
+from zmq.green import Poller
+
+def device(device_type, isocket, osocket):
+ """Start a zeromq device (gevent-compatible).
+
+ Unlike the true zmq.device, this does not release the GIL.
+
+ Parameters
+ ----------
+ device_type : (QUEUE, FORWARDER, STREAMER)
+ The type of device to start (ignored).
+ isocket : Socket
+ The Socket instance for the incoming traffic.
+ osocket : Socket
+ The Socket instance for the outbound traffic.
+ """
+ p = Poller()
+ if osocket == -1:
+ osocket = isocket
+ p.register(isocket, zmq.POLLIN)
+ p.register(osocket, zmq.POLLIN)
+
+ while True:
+ events = dict(p.poll())
+ if isocket in events:
+ osocket.send_multipart(isocket.recv_multipart())
+ if osocket in events:
+ isocket.send_multipart(osocket.recv_multipart())
diff --git a/external_libs/python/zmq/green/eventloop/__init__.py b/external_libs/python/zmq/green/eventloop/__init__.py
new file mode 100644
index 00000000..c5150efe
--- /dev/null
+++ b/external_libs/python/zmq/green/eventloop/__init__.py
@@ -0,0 +1,3 @@
+from zmq.green.eventloop.ioloop import IOLoop
+
+__all__ = ['IOLoop'] \ No newline at end of file
diff --git a/external_libs/python/zmq/green/eventloop/ioloop.py b/external_libs/python/zmq/green/eventloop/ioloop.py
new file mode 100644
index 00000000..e12fd5e9
--- /dev/null
+++ b/external_libs/python/zmq/green/eventloop/ioloop.py
@@ -0,0 +1,33 @@
+from zmq.eventloop.ioloop import *
+from zmq.green import Poller
+
+RealIOLoop = IOLoop
+RealZMQPoller = ZMQPoller
+
+class IOLoop(RealIOLoop):
+
+ def initialize(self, impl=None):
+ impl = _poll() if impl is None else impl
+ super(IOLoop, self).initialize(impl)
+
+ @staticmethod
+ def instance():
+ """Returns a global `IOLoop` instance.
+
+ Most applications have a single, global `IOLoop` running on the
+ main thread. Use this method to get this instance from
+ another thread. To get the current thread's `IOLoop`, use `current()`.
+ """
+ # install this class as the active IOLoop implementation
+ # when using tornado 3
+ if tornado_version >= (3,):
+ PollIOLoop.configure(IOLoop)
+ return PollIOLoop.instance()
+
+
+class ZMQPoller(RealZMQPoller):
+ """gevent-compatible version of ioloop.ZMQPoller"""
+ def __init__(self):
+ self._poller = Poller()
+
+_poll = ZMQPoller
diff --git a/external_libs/python/zmq/green/eventloop/zmqstream.py b/external_libs/python/zmq/green/eventloop/zmqstream.py
new file mode 100644
index 00000000..90fbd1f5
--- /dev/null
+++ b/external_libs/python/zmq/green/eventloop/zmqstream.py
@@ -0,0 +1,11 @@
+from zmq.eventloop.zmqstream import *
+
+from zmq.green.eventloop.ioloop import IOLoop
+
+RealZMQStream = ZMQStream
+
+class ZMQStream(RealZMQStream):
+
+ def __init__(self, socket, io_loop=None):
+ io_loop = io_loop or IOLoop.instance()
+ super(ZMQStream, self).__init__(socket, io_loop=io_loop)
diff --git a/external_libs/python/zmq/green/poll.py b/external_libs/python/zmq/green/poll.py
new file mode 100644
index 00000000..8f016129
--- /dev/null
+++ b/external_libs/python/zmq/green/poll.py
@@ -0,0 +1,95 @@
+import zmq
+import gevent
+from gevent import select
+
+from zmq import Poller as _original_Poller
+
+
+class _Poller(_original_Poller):
+ """Replacement for :class:`zmq.Poller`
+
+ Ensures that the greened Poller below is used in calls to
+ :meth:`zmq.Poller.poll`.
+ """
+ _gevent_bug_timeout = 1.33 # minimum poll interval, for working around gevent bug
+
+ def _get_descriptors(self):
+ """Returns three elements tuple with socket descriptors ready
+ for gevent.select.select
+ """
+ rlist = []
+ wlist = []
+ xlist = []
+
+ for socket, flags in self.sockets:
+ if isinstance(socket, zmq.Socket):
+ rlist.append(socket.getsockopt(zmq.FD))
+ continue
+ elif isinstance(socket, int):
+ fd = socket
+ elif hasattr(socket, 'fileno'):
+ try:
+ fd = int(socket.fileno())
+ except:
+ raise ValueError('fileno() must return an valid integer fd')
+ else:
+ raise TypeError('Socket must be a 0MQ socket, an integer fd '
+ 'or have a fileno() method: %r' % socket)
+
+ if flags & zmq.POLLIN:
+ rlist.append(fd)
+ if flags & zmq.POLLOUT:
+ wlist.append(fd)
+ if flags & zmq.POLLERR:
+ xlist.append(fd)
+
+ return (rlist, wlist, xlist)
+
+ def poll(self, timeout=-1):
+ """Overridden method to ensure that the green version of
+ Poller is used.
+
+ Behaves the same as :meth:`zmq.core.Poller.poll`
+ """
+
+ if timeout is None:
+ timeout = -1
+
+ if timeout < 0:
+ timeout = -1
+
+ rlist = None
+ wlist = None
+ xlist = None
+
+ if timeout > 0:
+ tout = gevent.Timeout.start_new(timeout/1000.0)
+
+ try:
+ # Loop until timeout or events available
+ rlist, wlist, xlist = self._get_descriptors()
+ while True:
+ events = super(_Poller, self).poll(0)
+ if events or timeout == 0:
+ return events
+
+ # wait for activity on sockets in a green way
+ # set a minimum poll frequency,
+ # because gevent < 1.0 cannot be trusted to catch edge-triggered FD events
+ _bug_timeout = gevent.Timeout.start_new(self._gevent_bug_timeout)
+ try:
+ select.select(rlist, wlist, xlist)
+ except gevent.Timeout as t:
+ if t is not _bug_timeout:
+ raise
+ finally:
+ _bug_timeout.cancel()
+
+ except gevent.Timeout as t:
+ if t is not tout:
+ raise
+ return []
+ finally:
+ if timeout > 0:
+ tout.cancel()
+