summaryrefslogtreecommitdiffstats
path: root/external_libs/python/zmq/green
diff options
context:
space:
mode:
Diffstat (limited to 'external_libs/python/zmq/green')
-rw-r--r--external_libs/python/zmq/green/__init__.py40
-rw-r--r--external_libs/python/zmq/green/core.py287
-rw-r--r--external_libs/python/zmq/green/device.py32
-rw-r--r--external_libs/python/zmq/green/eventloop/__init__.py3
-rw-r--r--external_libs/python/zmq/green/eventloop/ioloop.py33
-rw-r--r--external_libs/python/zmq/green/eventloop/zmqstream.py11
-rw-r--r--external_libs/python/zmq/green/poll.py95
7 files changed, 0 insertions, 501 deletions
diff --git a/external_libs/python/zmq/green/__init__.py b/external_libs/python/zmq/green/__init__.py
deleted file mode 100644
index ff7e5965..00000000
--- a/external_libs/python/zmq/green/__init__.py
+++ /dev/null
@@ -1,40 +0,0 @@
-# -*- coding: utf-8 -*-
-#-----------------------------------------------------------------------------
-# Copyright (C) 2011-2012 Travis Cline
-#
-# This file is part of pyzmq
-# It is adapted from upstream project zeromq_gevent under the New BSD License
-#
-# Distributed under the terms of the New BSD License. The full license is in
-# the file COPYING.BSD, distributed as part of this software.
-#-----------------------------------------------------------------------------
-
-"""zmq.green - gevent compatibility with zeromq.
-
-Usage
------
-
-Instead of importing zmq directly, do so in the following manner:
-
-..
-
- import zmq.green as zmq
-
-
-Any calls that would have blocked the current thread will now only block the
-current green thread.
-
-This compatibility is accomplished by ensuring the nonblocking flag is set
-before any blocking operation and the ØMQ file descriptor is polled internally
-to trigger needed events.
-"""
-
-from zmq import *
-from zmq.green.core import _Context, _Socket
-from zmq.green.poll import _Poller
-Context = _Context
-Socket = _Socket
-Poller = _Poller
-
-from zmq.green.device import device
-
diff --git a/external_libs/python/zmq/green/core.py b/external_libs/python/zmq/green/core.py
deleted file mode 100644
index 9fc73e32..00000000
--- a/external_libs/python/zmq/green/core.py
+++ /dev/null
@@ -1,287 +0,0 @@
-#-----------------------------------------------------------------------------
-# Copyright (C) 2011-2012 Travis Cline
-#
-# This file is part of pyzmq
-# It is adapted from upstream project zeromq_gevent under the New BSD License
-#
-# Distributed under the terms of the New BSD License. The full license is in
-# the file COPYING.BSD, distributed as part of this software.
-#-----------------------------------------------------------------------------
-
-"""This module wraps the :class:`Socket` and :class:`Context` found in :mod:`pyzmq <zmq>` to be non blocking
-"""
-
-from __future__ import print_function
-
-import sys
-import time
-import warnings
-
-import zmq
-
-from zmq import Context as _original_Context
-from zmq import Socket as _original_Socket
-from .poll import _Poller
-
-import gevent
-from gevent.event import AsyncResult
-from gevent.hub import get_hub
-
-if hasattr(zmq, 'RCVTIMEO'):
- TIMEOS = (zmq.RCVTIMEO, zmq.SNDTIMEO)
-else:
- TIMEOS = ()
-
-def _stop(evt):
- """simple wrapper for stopping an Event, allowing for method rename in gevent 1.0"""
- try:
- evt.stop()
- except AttributeError as e:
- # gevent<1.0 compat
- evt.cancel()
-
-class _Socket(_original_Socket):
- """Green version of :class:`zmq.Socket`
-
- The following methods are overridden:
-
- * send
- * recv
-
- To ensure that the ``zmq.NOBLOCK`` flag is set and that sending or receiving
- is deferred to the hub if a ``zmq.EAGAIN`` (retry) error is raised.
-
- The `__state_changed` method is triggered when the zmq.FD for the socket is
- marked as readable and triggers the necessary read and write events (which
- are waited for in the recv and send methods).
-
- Some double underscore prefixes are used to minimize pollution of
- :class:`zmq.Socket`'s namespace.
- """
- __in_send_multipart = False
- __in_recv_multipart = False
- __writable = None
- __readable = None
- _state_event = None
- _gevent_bug_timeout = 11.6 # timeout for not trusting gevent
- _debug_gevent = False # turn on if you think gevent is missing events
- _poller_class = _Poller
-
- def __init__(self, context, socket_type):
- _original_Socket.__init__(self, context, socket_type)
- self.__in_send_multipart = False
- self.__in_recv_multipart = False
- self.__setup_events()
-
-
- def __del__(self):
- self.close()
-
- def close(self, linger=None):
- super(_Socket, self).close(linger)
- self.__cleanup_events()
-
- def __cleanup_events(self):
- # close the _state_event event, keeps the number of active file descriptors down
- if getattr(self, '_state_event', None):
- _stop(self._state_event)
- self._state_event = None
- # if the socket has entered a close state resume any waiting greenlets
- self.__writable.set()
- self.__readable.set()
-
- def __setup_events(self):
- self.__readable = AsyncResult()
- self.__writable = AsyncResult()
- self.__readable.set()
- self.__writable.set()
-
- try:
- self._state_event = get_hub().loop.io(self.getsockopt(zmq.FD), 1) # read state watcher
- self._state_event.start(self.__state_changed)
- except AttributeError:
- # for gevent<1.0 compatibility
- from gevent.core import read_event
- self._state_event = read_event(self.getsockopt(zmq.FD), self.__state_changed, persist=True)
-
- def __state_changed(self, event=None, _evtype=None):
- if self.closed:
- self.__cleanup_events()
- return
- try:
- # avoid triggering __state_changed from inside __state_changed
- events = super(_Socket, self).getsockopt(zmq.EVENTS)
- except zmq.ZMQError as exc:
- self.__writable.set_exception(exc)
- self.__readable.set_exception(exc)
- else:
- if events & zmq.POLLOUT:
- self.__writable.set()
- if events & zmq.POLLIN:
- self.__readable.set()
-
- def _wait_write(self):
- assert self.__writable.ready(), "Only one greenlet can be waiting on this event"
- self.__writable = AsyncResult()
- # timeout is because libzmq cannot be trusted to properly signal a new send event:
- # this is effectively a maximum poll interval of 1s
- tic = time.time()
- dt = self._gevent_bug_timeout
- if dt:
- timeout = gevent.Timeout(seconds=dt)
- else:
- timeout = None
- try:
- if timeout:
- timeout.start()
- self.__writable.get(block=True)
- except gevent.Timeout as t:
- if t is not timeout:
- raise
- toc = time.time()
- # gevent bug: get can raise timeout even on clean return
- # don't display zmq bug warning for gevent bug (this is getting ridiculous)
- if self._debug_gevent and timeout and toc-tic > dt and \
- self.getsockopt(zmq.EVENTS) & zmq.POLLOUT:
- print("BUG: gevent may have missed a libzmq send event on %i!" % self.FD, file=sys.stderr)
- finally:
- if timeout:
- timeout.cancel()
- self.__writable.set()
-
- def _wait_read(self):
- assert self.__readable.ready(), "Only one greenlet can be waiting on this event"
- self.__readable = AsyncResult()
- # timeout is because libzmq cannot always be trusted to play nice with libevent.
- # I can only confirm that this actually happens for send, but lets be symmetrical
- # with our dirty hacks.
- # this is effectively a maximum poll interval of 1s
- tic = time.time()
- dt = self._gevent_bug_timeout
- if dt:
- timeout = gevent.Timeout(seconds=dt)
- else:
- timeout = None
- try:
- if timeout:
- timeout.start()
- self.__readable.get(block=True)
- except gevent.Timeout as t:
- if t is not timeout:
- raise
- toc = time.time()
- # gevent bug: get can raise timeout even on clean return
- # don't display zmq bug warning for gevent bug (this is getting ridiculous)
- if self._debug_gevent and timeout and toc-tic > dt and \
- self.getsockopt(zmq.EVENTS) & zmq.POLLIN:
- print("BUG: gevent may have missed a libzmq recv event on %i!" % self.FD, file=sys.stderr)
- finally:
- if timeout:
- timeout.cancel()
- self.__readable.set()
-
- def send(self, data, flags=0, copy=True, track=False):
- """send, which will only block current greenlet
-
- state_changed always fires exactly once (success or fail) at the
- end of this method.
- """
-
- # if we're given the NOBLOCK flag act as normal and let the EAGAIN get raised
- if flags & zmq.NOBLOCK:
- try:
- msg = super(_Socket, self).send(data, flags, copy, track)
- finally:
- if not self.__in_send_multipart:
- self.__state_changed()
- return msg
- # ensure the zmq.NOBLOCK flag is part of flags
- flags |= zmq.NOBLOCK
- while True: # Attempt to complete this operation indefinitely, blocking the current greenlet
- try:
- # attempt the actual call
- msg = super(_Socket, self).send(data, flags, copy, track)
- except zmq.ZMQError as e:
- # if the raised ZMQError is not EAGAIN, reraise
- if e.errno != zmq.EAGAIN:
- if not self.__in_send_multipart:
- self.__state_changed()
- raise
- else:
- if not self.__in_send_multipart:
- self.__state_changed()
- return msg
- # defer to the event loop until we're notified the socket is writable
- self._wait_write()
-
- def recv(self, flags=0, copy=True, track=False):
- """recv, which will only block current greenlet
-
- state_changed always fires exactly once (success or fail) at the
- end of this method.
- """
- if flags & zmq.NOBLOCK:
- try:
- msg = super(_Socket, self).recv(flags, copy, track)
- finally:
- if not self.__in_recv_multipart:
- self.__state_changed()
- return msg
-
- flags |= zmq.NOBLOCK
- while True:
- try:
- msg = super(_Socket, self).recv(flags, copy, track)
- except zmq.ZMQError as e:
- if e.errno != zmq.EAGAIN:
- if not self.__in_recv_multipart:
- self.__state_changed()
- raise
- else:
- if not self.__in_recv_multipart:
- self.__state_changed()
- return msg
- self._wait_read()
-
- def send_multipart(self, *args, **kwargs):
- """wrap send_multipart to prevent state_changed on each partial send"""
- self.__in_send_multipart = True
- try:
- msg = super(_Socket, self).send_multipart(*args, **kwargs)
- finally:
- self.__in_send_multipart = False
- self.__state_changed()
- return msg
-
- def recv_multipart(self, *args, **kwargs):
- """wrap recv_multipart to prevent state_changed on each partial recv"""
- self.__in_recv_multipart = True
- try:
- msg = super(_Socket, self).recv_multipart(*args, **kwargs)
- finally:
- self.__in_recv_multipart = False
- self.__state_changed()
- return msg
-
- def get(self, opt):
- """trigger state_changed on getsockopt(EVENTS)"""
- if opt in TIMEOS:
- warnings.warn("TIMEO socket options have no effect in zmq.green", UserWarning)
- optval = super(_Socket, self).get(opt)
- if opt == zmq.EVENTS:
- self.__state_changed()
- return optval
-
- def set(self, opt, val):
- """set socket option"""
- if opt in TIMEOS:
- warnings.warn("TIMEO socket options have no effect in zmq.green", UserWarning)
- return super(_Socket, self).set(opt, val)
-
-
-class _Context(_original_Context):
- """Replacement for :class:`zmq.Context`
-
- Ensures that the greened Socket above is used in calls to `socket`.
- """
- _socket_class = _Socket
diff --git a/external_libs/python/zmq/green/device.py b/external_libs/python/zmq/green/device.py
deleted file mode 100644
index 4b070237..00000000
--- a/external_libs/python/zmq/green/device.py
+++ /dev/null
@@ -1,32 +0,0 @@
-# Copyright (C) PyZMQ Developers
-# Distributed under the terms of the Modified BSD License.
-
-import zmq
-from zmq.green import Poller
-
-def device(device_type, isocket, osocket):
- """Start a zeromq device (gevent-compatible).
-
- Unlike the true zmq.device, this does not release the GIL.
-
- Parameters
- ----------
- device_type : (QUEUE, FORWARDER, STREAMER)
- The type of device to start (ignored).
- isocket : Socket
- The Socket instance for the incoming traffic.
- osocket : Socket
- The Socket instance for the outbound traffic.
- """
- p = Poller()
- if osocket == -1:
- osocket = isocket
- p.register(isocket, zmq.POLLIN)
- p.register(osocket, zmq.POLLIN)
-
- while True:
- events = dict(p.poll())
- if isocket in events:
- osocket.send_multipart(isocket.recv_multipart())
- if osocket in events:
- isocket.send_multipart(osocket.recv_multipart())
diff --git a/external_libs/python/zmq/green/eventloop/__init__.py b/external_libs/python/zmq/green/eventloop/__init__.py
deleted file mode 100644
index c5150efe..00000000
--- a/external_libs/python/zmq/green/eventloop/__init__.py
+++ /dev/null
@@ -1,3 +0,0 @@
-from zmq.green.eventloop.ioloop import IOLoop
-
-__all__ = ['IOLoop'] \ No newline at end of file
diff --git a/external_libs/python/zmq/green/eventloop/ioloop.py b/external_libs/python/zmq/green/eventloop/ioloop.py
deleted file mode 100644
index e12fd5e9..00000000
--- a/external_libs/python/zmq/green/eventloop/ioloop.py
+++ /dev/null
@@ -1,33 +0,0 @@
-from zmq.eventloop.ioloop import *
-from zmq.green import Poller
-
-RealIOLoop = IOLoop
-RealZMQPoller = ZMQPoller
-
-class IOLoop(RealIOLoop):
-
- def initialize(self, impl=None):
- impl = _poll() if impl is None else impl
- super(IOLoop, self).initialize(impl)
-
- @staticmethod
- def instance():
- """Returns a global `IOLoop` instance.
-
- Most applications have a single, global `IOLoop` running on the
- main thread. Use this method to get this instance from
- another thread. To get the current thread's `IOLoop`, use `current()`.
- """
- # install this class as the active IOLoop implementation
- # when using tornado 3
- if tornado_version >= (3,):
- PollIOLoop.configure(IOLoop)
- return PollIOLoop.instance()
-
-
-class ZMQPoller(RealZMQPoller):
- """gevent-compatible version of ioloop.ZMQPoller"""
- def __init__(self):
- self._poller = Poller()
-
-_poll = ZMQPoller
diff --git a/external_libs/python/zmq/green/eventloop/zmqstream.py b/external_libs/python/zmq/green/eventloop/zmqstream.py
deleted file mode 100644
index 90fbd1f5..00000000
--- a/external_libs/python/zmq/green/eventloop/zmqstream.py
+++ /dev/null
@@ -1,11 +0,0 @@
-from zmq.eventloop.zmqstream import *
-
-from zmq.green.eventloop.ioloop import IOLoop
-
-RealZMQStream = ZMQStream
-
-class ZMQStream(RealZMQStream):
-
- def __init__(self, socket, io_loop=None):
- io_loop = io_loop or IOLoop.instance()
- super(ZMQStream, self).__init__(socket, io_loop=io_loop)
diff --git a/external_libs/python/zmq/green/poll.py b/external_libs/python/zmq/green/poll.py
deleted file mode 100644
index 8f016129..00000000
--- a/external_libs/python/zmq/green/poll.py
+++ /dev/null
@@ -1,95 +0,0 @@
-import zmq
-import gevent
-from gevent import select
-
-from zmq import Poller as _original_Poller
-
-
-class _Poller(_original_Poller):
- """Replacement for :class:`zmq.Poller`
-
- Ensures that the greened Poller below is used in calls to
- :meth:`zmq.Poller.poll`.
- """
- _gevent_bug_timeout = 1.33 # minimum poll interval, for working around gevent bug
-
- def _get_descriptors(self):
- """Returns three elements tuple with socket descriptors ready
- for gevent.select.select
- """
- rlist = []
- wlist = []
- xlist = []
-
- for socket, flags in self.sockets:
- if isinstance(socket, zmq.Socket):
- rlist.append(socket.getsockopt(zmq.FD))
- continue
- elif isinstance(socket, int):
- fd = socket
- elif hasattr(socket, 'fileno'):
- try:
- fd = int(socket.fileno())
- except:
- raise ValueError('fileno() must return an valid integer fd')
- else:
- raise TypeError('Socket must be a 0MQ socket, an integer fd '
- 'or have a fileno() method: %r' % socket)
-
- if flags & zmq.POLLIN:
- rlist.append(fd)
- if flags & zmq.POLLOUT:
- wlist.append(fd)
- if flags & zmq.POLLERR:
- xlist.append(fd)
-
- return (rlist, wlist, xlist)
-
- def poll(self, timeout=-1):
- """Overridden method to ensure that the green version of
- Poller is used.
-
- Behaves the same as :meth:`zmq.core.Poller.poll`
- """
-
- if timeout is None:
- timeout = -1
-
- if timeout < 0:
- timeout = -1
-
- rlist = None
- wlist = None
- xlist = None
-
- if timeout > 0:
- tout = gevent.Timeout.start_new(timeout/1000.0)
-
- try:
- # Loop until timeout or events available
- rlist, wlist, xlist = self._get_descriptors()
- while True:
- events = super(_Poller, self).poll(0)
- if events or timeout == 0:
- return events
-
- # wait for activity on sockets in a green way
- # set a minimum poll frequency,
- # because gevent < 1.0 cannot be trusted to catch edge-triggered FD events
- _bug_timeout = gevent.Timeout.start_new(self._gevent_bug_timeout)
- try:
- select.select(rlist, wlist, xlist)
- except gevent.Timeout as t:
- if t is not _bug_timeout:
- raise
- finally:
- _bug_timeout.cancel()
-
- except gevent.Timeout as t:
- if t is not tout:
- raise
- return []
- finally:
- if timeout > 0:
- tout.cancel()
-