summaryrefslogtreecommitdiffstats
path: root/src/console/zmq/devices
diff options
context:
space:
mode:
Diffstat (limited to 'src/console/zmq/devices')
-rwxr-xr-xsrc/console/zmq/devices/__init__.py16
-rwxr-xr-xsrc/console/zmq/devices/basedevice.py229
-rw-r--r--src/console/zmq/devices/monitoredqueue.pxd177
-rwxr-xr-xsrc/console/zmq/devices/monitoredqueue.py7
-rwxr-xr-xsrc/console/zmq/devices/monitoredqueuedevice.py66
-rwxr-xr-xsrc/console/zmq/devices/proxydevice.py90
6 files changed, 585 insertions, 0 deletions
diff --git a/src/console/zmq/devices/__init__.py b/src/console/zmq/devices/__init__.py
new file mode 100755
index 00000000..23715963
--- /dev/null
+++ b/src/console/zmq/devices/__init__.py
@@ -0,0 +1,16 @@
+"""0MQ Device classes for running in background threads or processes."""
+
+# Copyright (C) PyZMQ Developers
+# Distributed under the terms of the Modified BSD License.
+
+from zmq import device
+from zmq.devices import basedevice, proxydevice, monitoredqueue, monitoredqueuedevice
+
+from zmq.devices.basedevice import *
+from zmq.devices.proxydevice import *
+from zmq.devices.monitoredqueue import *
+from zmq.devices.monitoredqueuedevice import *
+
+__all__ = ['device']
+for submod in (basedevice, proxydevice, monitoredqueue, monitoredqueuedevice):
+ __all__.extend(submod.__all__)
diff --git a/src/console/zmq/devices/basedevice.py b/src/console/zmq/devices/basedevice.py
new file mode 100755
index 00000000..7ba1b7ac
--- /dev/null
+++ b/src/console/zmq/devices/basedevice.py
@@ -0,0 +1,229 @@
+"""Classes for running 0MQ Devices in the background."""
+
+# Copyright (C) PyZMQ Developers
+# Distributed under the terms of the Modified BSD License.
+
+
+import time
+from threading import Thread
+from multiprocessing import Process
+
+from zmq import device, QUEUE, Context, ETERM, ZMQError
+
+
+class Device:
+ """A 0MQ Device to be run in the background.
+
+ You do not pass Socket instances to this, but rather Socket types::
+
+ Device(device_type, in_socket_type, out_socket_type)
+
+ For instance::
+
+ dev = Device(zmq.QUEUE, zmq.DEALER, zmq.ROUTER)
+
+ Similar to zmq.device, but socket types instead of sockets themselves are
+ passed, and the sockets are created in the work thread, to avoid issues
+ with thread safety. As a result, additional bind_{in|out} and
+ connect_{in|out} methods and setsockopt_{in|out} allow users to specify
+ connections for the sockets.
+
+ Parameters
+ ----------
+ device_type : int
+ The 0MQ Device type
+ {in|out}_type : int
+ zmq socket types, to be passed later to context.socket(). e.g.
+ zmq.PUB, zmq.SUB, zmq.REQ. If out_type is < 0, then in_socket is used
+ for both in_socket and out_socket.
+
+ Methods
+ -------
+ bind_{in_out}(iface)
+ passthrough for ``{in|out}_socket.bind(iface)``, to be called in the thread
+ connect_{in_out}(iface)
+ passthrough for ``{in|out}_socket.connect(iface)``, to be called in the
+ thread
+ setsockopt_{in_out}(opt,value)
+ passthrough for ``{in|out}_socket.setsockopt(opt, value)``, to be called in
+ the thread
+
+ Attributes
+ ----------
+ daemon : int
+ sets whether the thread should be run as a daemon
+ Default is true, because if it is false, the thread will not
+ exit unless it is killed
+ context_factory : callable (class attribute)
+ Function for creating the Context. This will be Context.instance
+ in ThreadDevices, and Context in ProcessDevices. The only reason
+ it is not instance() in ProcessDevices is that there may be a stale
+ Context instance already initialized, and the forked environment
+ should *never* try to use it.
+ """
+
+ context_factory = Context.instance
+ """Callable that returns a context. Typically either Context.instance or Context,
+ depending on whether the device should share the global instance or not.
+ """
+
+ def __init__(self, device_type=QUEUE, in_type=None, out_type=None):
+ self.device_type = device_type
+ if in_type is None:
+ raise TypeError("in_type must be specified")
+ if out_type is None:
+ raise TypeError("out_type must be specified")
+ self.in_type = in_type
+ self.out_type = out_type
+ self._in_binds = []
+ self._in_connects = []
+ self._in_sockopts = []
+ self._out_binds = []
+ self._out_connects = []
+ self._out_sockopts = []
+ self.daemon = True
+ self.done = False
+
+ def bind_in(self, addr):
+ """Enqueue ZMQ address for binding on in_socket.
+
+ See zmq.Socket.bind for details.
+ """
+ self._in_binds.append(addr)
+
+ def connect_in(self, addr):
+ """Enqueue ZMQ address for connecting on in_socket.
+
+ See zmq.Socket.connect for details.
+ """
+ self._in_connects.append(addr)
+
+ def setsockopt_in(self, opt, value):
+ """Enqueue setsockopt(opt, value) for in_socket
+
+ See zmq.Socket.setsockopt for details.
+ """
+ self._in_sockopts.append((opt, value))
+
+ def bind_out(self, addr):
+ """Enqueue ZMQ address for binding on out_socket.
+
+ See zmq.Socket.bind for details.
+ """
+ self._out_binds.append(addr)
+
+ def connect_out(self, addr):
+ """Enqueue ZMQ address for connecting on out_socket.
+
+ See zmq.Socket.connect for details.
+ """
+ self._out_connects.append(addr)
+
+ def setsockopt_out(self, opt, value):
+ """Enqueue setsockopt(opt, value) for out_socket
+
+ See zmq.Socket.setsockopt for details.
+ """
+ self._out_sockopts.append((opt, value))
+
+ def _setup_sockets(self):
+ ctx = self.context_factory()
+
+ self._context = ctx
+
+ # create the sockets
+ ins = ctx.socket(self.in_type)
+ if self.out_type < 0:
+ outs = ins
+ else:
+ outs = ctx.socket(self.out_type)
+
+ # set sockopts (must be done first, in case of zmq.IDENTITY)
+ for opt,value in self._in_sockopts:
+ ins.setsockopt(opt, value)
+ for opt,value in self._out_sockopts:
+ outs.setsockopt(opt, value)
+
+ for iface in self._in_binds:
+ ins.bind(iface)
+ for iface in self._out_binds:
+ outs.bind(iface)
+
+ for iface in self._in_connects:
+ ins.connect(iface)
+ for iface in self._out_connects:
+ outs.connect(iface)
+
+ return ins,outs
+
+ def run_device(self):
+ """The runner method.
+
+ Do not call me directly, instead call ``self.start()``, just like a Thread.
+ """
+ ins,outs = self._setup_sockets()
+ device(self.device_type, ins, outs)
+
+ def run(self):
+ """wrap run_device in try/catch ETERM"""
+ try:
+ self.run_device()
+ except ZMQError as e:
+ if e.errno == ETERM:
+ # silence TERM errors, because this should be a clean shutdown
+ pass
+ else:
+ raise
+ finally:
+ self.done = True
+
+ def start(self):
+ """Start the device. Override me in subclass for other launchers."""
+ return self.run()
+
+ def join(self,timeout=None):
+ """wait for me to finish, like Thread.join.
+
+ Reimplemented appropriately by subclasses."""
+ tic = time.time()
+ toc = tic
+ while not self.done and not (timeout is not None and toc-tic > timeout):
+ time.sleep(.001)
+ toc = time.time()
+
+
+class BackgroundDevice(Device):
+ """Base class for launching Devices in background processes and threads."""
+
+ launcher=None
+ _launch_class=None
+
+ def start(self):
+ self.launcher = self._launch_class(target=self.run)
+ self.launcher.daemon = self.daemon
+ return self.launcher.start()
+
+ def join(self, timeout=None):
+ return self.launcher.join(timeout=timeout)
+
+
+class ThreadDevice(BackgroundDevice):
+ """A Device that will be run in a background Thread.
+
+ See Device for details.
+ """
+ _launch_class=Thread
+
+class ProcessDevice(BackgroundDevice):
+ """A Device that will be run in a background Process.
+
+ See Device for details.
+ """
+ _launch_class=Process
+ context_factory = Context
+ """Callable that returns a context. Typically either Context.instance or Context,
+ depending on whether the device should share the global instance or not.
+ """
+
+
+__all__ = ['Device', 'ThreadDevice', 'ProcessDevice']
diff --git a/src/console/zmq/devices/monitoredqueue.pxd b/src/console/zmq/devices/monitoredqueue.pxd
new file mode 100644
index 00000000..1e26ed86
--- /dev/null
+++ b/src/console/zmq/devices/monitoredqueue.pxd
@@ -0,0 +1,177 @@
+"""MonitoredQueue class declarations.
+
+Authors
+-------
+* MinRK
+* Brian Granger
+"""
+
+#
+# Copyright (c) 2010 Min Ragan-Kelley, Brian Granger
+#
+# This file is part of pyzmq, but is derived and adapted from zmq_queue.cpp
+# originally from libzmq-2.1.6, used under LGPLv3
+#
+# pyzmq is free software; you can redistribute it and/or modify it under
+# the terms of the Lesser GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# pyzmq is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# Lesser GNU General Public License for more details.
+#
+# You should have received a copy of the Lesser GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+#-----------------------------------------------------------------------------
+# Imports
+#-----------------------------------------------------------------------------
+
+from libzmq cimport *
+
+#-----------------------------------------------------------------------------
+# MonitoredQueue C functions
+#-----------------------------------------------------------------------------
+
+cdef inline int _relay(void *insocket_, void *outsocket_, void *sidesocket_,
+ zmq_msg_t msg, zmq_msg_t side_msg, zmq_msg_t id_msg,
+ bint swap_ids) nogil:
+ cdef int rc
+ cdef int64_t flag_2
+ cdef int flag_3
+ cdef int flags
+ cdef bint more
+ cdef size_t flagsz
+ cdef void * flag_ptr
+
+ if ZMQ_VERSION_MAJOR < 3:
+ flagsz = sizeof (int64_t)
+ flag_ptr = &flag_2
+ else:
+ flagsz = sizeof (int)
+ flag_ptr = &flag_3
+
+ if swap_ids:# both router, must send second identity first
+ # recv two ids into msg, id_msg
+ rc = zmq_msg_recv(&msg, insocket_, 0)
+ if rc < 0: return rc
+
+ rc = zmq_msg_recv(&id_msg, insocket_, 0)
+ if rc < 0: return rc
+
+ # send second id (id_msg) first
+ #!!!! always send a copy before the original !!!!
+ rc = zmq_msg_copy(&side_msg, &id_msg)
+ if rc < 0: return rc
+ rc = zmq_msg_send(&side_msg, outsocket_, ZMQ_SNDMORE)
+ if rc < 0: return rc
+ rc = zmq_msg_send(&id_msg, sidesocket_, ZMQ_SNDMORE)
+ if rc < 0: return rc
+ # send first id (msg) second
+ rc = zmq_msg_copy(&side_msg, &msg)
+ if rc < 0: return rc
+ rc = zmq_msg_send(&side_msg, outsocket_, ZMQ_SNDMORE)
+ if rc < 0: return rc
+ rc = zmq_msg_send(&msg, sidesocket_, ZMQ_SNDMORE)
+ if rc < 0: return rc
+ while (True):
+ rc = zmq_msg_recv(&msg, insocket_, 0)
+ if rc < 0: return rc
+ # assert (rc == 0)
+ rc = zmq_getsockopt (insocket_, ZMQ_RCVMORE, flag_ptr, &flagsz)
+ if rc < 0: return rc
+ flags = 0
+ if ZMQ_VERSION_MAJOR < 3:
+ if flag_2:
+ flags |= ZMQ_SNDMORE
+ else:
+ if flag_3:
+ flags |= ZMQ_SNDMORE
+ # LABEL has been removed:
+ # rc = zmq_getsockopt (insocket_, ZMQ_RCVLABEL, flag_ptr, &flagsz)
+ # if flag_3:
+ # flags |= ZMQ_SNDLABEL
+ # assert (rc == 0)
+
+ rc = zmq_msg_copy(&side_msg, &msg)
+ if rc < 0: return rc
+ if flags:
+ rc = zmq_msg_send(&side_msg, outsocket_, flags)
+ if rc < 0: return rc
+ # only SNDMORE for side-socket
+ rc = zmq_msg_send(&msg, sidesocket_, ZMQ_SNDMORE)
+ if rc < 0: return rc
+ else:
+ rc = zmq_msg_send(&side_msg, outsocket_, 0)
+ if rc < 0: return rc
+ rc = zmq_msg_send(&msg, sidesocket_, 0)
+ if rc < 0: return rc
+ break
+ return rc
+
+# the MonitoredQueue C function, adapted from zmq::queue.cpp :
+cdef inline int c_monitored_queue (void *insocket_, void *outsocket_,
+ void *sidesocket_, zmq_msg_t *in_msg_ptr,
+ zmq_msg_t *out_msg_ptr, int swap_ids) nogil:
+ """The actual C function for a monitored queue device.
+
+ See ``monitored_queue()`` for details.
+ """
+
+ cdef zmq_msg_t msg
+ cdef int rc = zmq_msg_init (&msg)
+ cdef zmq_msg_t id_msg
+ rc = zmq_msg_init (&id_msg)
+ if rc < 0: return rc
+ cdef zmq_msg_t side_msg
+ rc = zmq_msg_init (&side_msg)
+ if rc < 0: return rc
+
+ cdef zmq_pollitem_t items [2]
+ items [0].socket = insocket_
+ items [0].fd = 0
+ items [0].events = ZMQ_POLLIN
+ items [0].revents = 0
+ items [1].socket = outsocket_
+ items [1].fd = 0
+ items [1].events = ZMQ_POLLIN
+ items [1].revents = 0
+ # I don't think sidesocket should be polled?
+ # items [2].socket = sidesocket_
+ # items [2].fd = 0
+ # items [2].events = ZMQ_POLLIN
+ # items [2].revents = 0
+
+ while (True):
+
+ # // Wait while there are either requests or replies to process.
+ rc = zmq_poll (&items [0], 2, -1)
+ if rc < 0: return rc
+ # // The algorithm below asumes ratio of request and replies processed
+ # // under full load to be 1:1. Although processing requests replies
+ # // first is tempting it is suspectible to DoS attacks (overloading
+ # // the system with unsolicited replies).
+ #
+ # // Process a request.
+ if (items [0].revents & ZMQ_POLLIN):
+ # send in_prefix to side socket
+ rc = zmq_msg_copy(&side_msg, in_msg_ptr)
+ if rc < 0: return rc
+ rc = zmq_msg_send(&side_msg, sidesocket_, ZMQ_SNDMORE)
+ if rc < 0: return rc
+ # relay the rest of the message
+ rc = _relay(insocket_, outsocket_, sidesocket_, msg, side_msg, id_msg, swap_ids)
+ if rc < 0: return rc
+ if (items [1].revents & ZMQ_POLLIN):
+ # send out_prefix to side socket
+ rc = zmq_msg_copy(&side_msg, out_msg_ptr)
+ if rc < 0: return rc
+ rc = zmq_msg_send(&side_msg, sidesocket_, ZMQ_SNDMORE)
+ if rc < 0: return rc
+ # relay the rest of the message
+ rc = _relay(outsocket_, insocket_, sidesocket_, msg, side_msg, id_msg, swap_ids)
+ if rc < 0: return rc
+ return rc
diff --git a/src/console/zmq/devices/monitoredqueue.py b/src/console/zmq/devices/monitoredqueue.py
new file mode 100755
index 00000000..6d714e51
--- /dev/null
+++ b/src/console/zmq/devices/monitoredqueue.py
@@ -0,0 +1,7 @@
+def __bootstrap__():
+ global __bootstrap__, __loader__, __file__
+ import sys, pkg_resources, imp
+ __file__ = pkg_resources.resource_filename(__name__,'monitoredqueue.so')
+ __loader__ = None; del __bootstrap__, __loader__
+ imp.load_dynamic(__name__,__file__)
+__bootstrap__()
diff --git a/src/console/zmq/devices/monitoredqueuedevice.py b/src/console/zmq/devices/monitoredqueuedevice.py
new file mode 100755
index 00000000..9723f866
--- /dev/null
+++ b/src/console/zmq/devices/monitoredqueuedevice.py
@@ -0,0 +1,66 @@
+"""MonitoredQueue classes and functions."""
+
+# Copyright (C) PyZMQ Developers
+# Distributed under the terms of the Modified BSD License.
+
+
+from zmq import ZMQError, PUB
+from zmq.devices.proxydevice import ProxyBase, Proxy, ThreadProxy, ProcessProxy
+from zmq.devices.monitoredqueue import monitored_queue
+
+
+class MonitoredQueueBase(ProxyBase):
+ """Base class for overriding methods."""
+
+ _in_prefix = b''
+ _out_prefix = b''
+
+ def __init__(self, in_type, out_type, mon_type=PUB, in_prefix=b'in', out_prefix=b'out'):
+
+ ProxyBase.__init__(self, in_type=in_type, out_type=out_type, mon_type=mon_type)
+
+ self._in_prefix = in_prefix
+ self._out_prefix = out_prefix
+
+ def run_device(self):
+ ins,outs,mons = self._setup_sockets()
+ monitored_queue(ins, outs, mons, self._in_prefix, self._out_prefix)
+
+
+class MonitoredQueue(MonitoredQueueBase, Proxy):
+ """Class for running monitored_queue in the background.
+
+ See zmq.devices.Device for most of the spec. MonitoredQueue differs from Proxy,
+ only in that it adds a ``prefix`` to messages sent on the monitor socket,
+ with a different prefix for each direction.
+
+ MQ also supports ROUTER on both sides, which zmq.proxy does not.
+
+ If a message arrives on `in_sock`, it will be prefixed with `in_prefix` on the monitor socket.
+ If it arrives on out_sock, it will be prefixed with `out_prefix`.
+
+ A PUB socket is the most logical choice for the mon_socket, but it is not required.
+ """
+ pass
+
+
+class ThreadMonitoredQueue(MonitoredQueueBase, ThreadProxy):
+ """Run zmq.monitored_queue in a background thread.
+
+ See MonitoredQueue and Proxy for details.
+ """
+ pass
+
+
+class ProcessMonitoredQueue(MonitoredQueueBase, ProcessProxy):
+ """Run zmq.monitored_queue in a background thread.
+
+ See MonitoredQueue and Proxy for details.
+ """
+
+
+__all__ = [
+ 'MonitoredQueue',
+ 'ThreadMonitoredQueue',
+ 'ProcessMonitoredQueue'
+]
diff --git a/src/console/zmq/devices/proxydevice.py b/src/console/zmq/devices/proxydevice.py
new file mode 100755
index 00000000..68be3f15
--- /dev/null
+++ b/src/console/zmq/devices/proxydevice.py
@@ -0,0 +1,90 @@
+"""Proxy classes and functions."""
+
+# Copyright (C) PyZMQ Developers
+# Distributed under the terms of the Modified BSD License.
+
+import zmq
+from zmq.devices.basedevice import Device, ThreadDevice, ProcessDevice
+
+
+class ProxyBase(object):
+ """Base class for overriding methods."""
+
+ def __init__(self, in_type, out_type, mon_type=zmq.PUB):
+
+ Device.__init__(self, in_type=in_type, out_type=out_type)
+ self.mon_type = mon_type
+ self._mon_binds = []
+ self._mon_connects = []
+ self._mon_sockopts = []
+
+ def bind_mon(self, addr):
+ """Enqueue ZMQ address for binding on mon_socket.
+
+ See zmq.Socket.bind for details.
+ """
+ self._mon_binds.append(addr)
+
+ def connect_mon(self, addr):
+ """Enqueue ZMQ address for connecting on mon_socket.
+
+ See zmq.Socket.bind for details.
+ """
+ self._mon_connects.append(addr)
+
+ def setsockopt_mon(self, opt, value):
+ """Enqueue setsockopt(opt, value) for mon_socket
+
+ See zmq.Socket.setsockopt for details.
+ """
+ self._mon_sockopts.append((opt, value))
+
+ def _setup_sockets(self):
+ ins,outs = Device._setup_sockets(self)
+ ctx = self._context
+ mons = ctx.socket(self.mon_type)
+
+ # set sockopts (must be done first, in case of zmq.IDENTITY)
+ for opt,value in self._mon_sockopts:
+ mons.setsockopt(opt, value)
+
+ for iface in self._mon_binds:
+ mons.bind(iface)
+
+ for iface in self._mon_connects:
+ mons.connect(iface)
+
+ return ins,outs,mons
+
+ def run_device(self):
+ ins,outs,mons = self._setup_sockets()
+ zmq.proxy(ins, outs, mons)
+
+class Proxy(ProxyBase, Device):
+ """Threadsafe Proxy object.
+
+ See zmq.devices.Device for most of the spec. This subclass adds a
+ <method>_mon version of each <method>_{in|out} method, for configuring the
+ monitor socket.
+
+ A Proxy is a 3-socket ZMQ Device that functions just like a
+ QUEUE, except each message is also sent out on the monitor socket.
+
+ A PUB socket is the most logical choice for the mon_socket, but it is not required.
+ """
+ pass
+
+class ThreadProxy(ProxyBase, ThreadDevice):
+ """Proxy in a Thread. See Proxy for more."""
+ pass
+
+class ProcessProxy(ProxyBase, ProcessDevice):
+ """Proxy in a Process. See Proxy for more."""
+ pass
+
+
+__all__ = [
+ 'Proxy',
+ 'ThreadProxy',
+ 'ProcessProxy',
+]