diff options
Diffstat (limited to 'src/console/zmq/devices')
-rwxr-xr-x | src/console/zmq/devices/__init__.py | 16 | ||||
-rwxr-xr-x | src/console/zmq/devices/basedevice.py | 229 | ||||
-rw-r--r-- | src/console/zmq/devices/monitoredqueue.pxd | 177 | ||||
-rwxr-xr-x | src/console/zmq/devices/monitoredqueue.py | 7 | ||||
-rwxr-xr-x | src/console/zmq/devices/monitoredqueuedevice.py | 66 | ||||
-rwxr-xr-x | src/console/zmq/devices/proxydevice.py | 90 |
6 files changed, 585 insertions, 0 deletions
diff --git a/src/console/zmq/devices/__init__.py b/src/console/zmq/devices/__init__.py new file mode 100755 index 00000000..23715963 --- /dev/null +++ b/src/console/zmq/devices/__init__.py @@ -0,0 +1,16 @@ +"""0MQ Device classes for running in background threads or processes.""" + +# Copyright (C) PyZMQ Developers +# Distributed under the terms of the Modified BSD License. + +from zmq import device +from zmq.devices import basedevice, proxydevice, monitoredqueue, monitoredqueuedevice + +from zmq.devices.basedevice import * +from zmq.devices.proxydevice import * +from zmq.devices.monitoredqueue import * +from zmq.devices.monitoredqueuedevice import * + +__all__ = ['device'] +for submod in (basedevice, proxydevice, monitoredqueue, monitoredqueuedevice): + __all__.extend(submod.__all__) diff --git a/src/console/zmq/devices/basedevice.py b/src/console/zmq/devices/basedevice.py new file mode 100755 index 00000000..7ba1b7ac --- /dev/null +++ b/src/console/zmq/devices/basedevice.py @@ -0,0 +1,229 @@ +"""Classes for running 0MQ Devices in the background.""" + +# Copyright (C) PyZMQ Developers +# Distributed under the terms of the Modified BSD License. + + +import time +from threading import Thread +from multiprocessing import Process + +from zmq import device, QUEUE, Context, ETERM, ZMQError + + +class Device: + """A 0MQ Device to be run in the background. + + You do not pass Socket instances to this, but rather Socket types:: + + Device(device_type, in_socket_type, out_socket_type) + + For instance:: + + dev = Device(zmq.QUEUE, zmq.DEALER, zmq.ROUTER) + + Similar to zmq.device, but socket types instead of sockets themselves are + passed, and the sockets are created in the work thread, to avoid issues + with thread safety. As a result, additional bind_{in|out} and + connect_{in|out} methods and setsockopt_{in|out} allow users to specify + connections for the sockets. + + Parameters + ---------- + device_type : int + The 0MQ Device type + {in|out}_type : int + zmq socket types, to be passed later to context.socket(). e.g. + zmq.PUB, zmq.SUB, zmq.REQ. If out_type is < 0, then in_socket is used + for both in_socket and out_socket. + + Methods + ------- + bind_{in_out}(iface) + passthrough for ``{in|out}_socket.bind(iface)``, to be called in the thread + connect_{in_out}(iface) + passthrough for ``{in|out}_socket.connect(iface)``, to be called in the + thread + setsockopt_{in_out}(opt,value) + passthrough for ``{in|out}_socket.setsockopt(opt, value)``, to be called in + the thread + + Attributes + ---------- + daemon : int + sets whether the thread should be run as a daemon + Default is true, because if it is false, the thread will not + exit unless it is killed + context_factory : callable (class attribute) + Function for creating the Context. This will be Context.instance + in ThreadDevices, and Context in ProcessDevices. The only reason + it is not instance() in ProcessDevices is that there may be a stale + Context instance already initialized, and the forked environment + should *never* try to use it. + """ + + context_factory = Context.instance + """Callable that returns a context. Typically either Context.instance or Context, + depending on whether the device should share the global instance or not. + """ + + def __init__(self, device_type=QUEUE, in_type=None, out_type=None): + self.device_type = device_type + if in_type is None: + raise TypeError("in_type must be specified") + if out_type is None: + raise TypeError("out_type must be specified") + self.in_type = in_type + self.out_type = out_type + self._in_binds = [] + self._in_connects = [] + self._in_sockopts = [] + self._out_binds = [] + self._out_connects = [] + self._out_sockopts = [] + self.daemon = True + self.done = False + + def bind_in(self, addr): + """Enqueue ZMQ address for binding on in_socket. + + See zmq.Socket.bind for details. + """ + self._in_binds.append(addr) + + def connect_in(self, addr): + """Enqueue ZMQ address for connecting on in_socket. + + See zmq.Socket.connect for details. + """ + self._in_connects.append(addr) + + def setsockopt_in(self, opt, value): + """Enqueue setsockopt(opt, value) for in_socket + + See zmq.Socket.setsockopt for details. + """ + self._in_sockopts.append((opt, value)) + + def bind_out(self, addr): + """Enqueue ZMQ address for binding on out_socket. + + See zmq.Socket.bind for details. + """ + self._out_binds.append(addr) + + def connect_out(self, addr): + """Enqueue ZMQ address for connecting on out_socket. + + See zmq.Socket.connect for details. + """ + self._out_connects.append(addr) + + def setsockopt_out(self, opt, value): + """Enqueue setsockopt(opt, value) for out_socket + + See zmq.Socket.setsockopt for details. + """ + self._out_sockopts.append((opt, value)) + + def _setup_sockets(self): + ctx = self.context_factory() + + self._context = ctx + + # create the sockets + ins = ctx.socket(self.in_type) + if self.out_type < 0: + outs = ins + else: + outs = ctx.socket(self.out_type) + + # set sockopts (must be done first, in case of zmq.IDENTITY) + for opt,value in self._in_sockopts: + ins.setsockopt(opt, value) + for opt,value in self._out_sockopts: + outs.setsockopt(opt, value) + + for iface in self._in_binds: + ins.bind(iface) + for iface in self._out_binds: + outs.bind(iface) + + for iface in self._in_connects: + ins.connect(iface) + for iface in self._out_connects: + outs.connect(iface) + + return ins,outs + + def run_device(self): + """The runner method. + + Do not call me directly, instead call ``self.start()``, just like a Thread. + """ + ins,outs = self._setup_sockets() + device(self.device_type, ins, outs) + + def run(self): + """wrap run_device in try/catch ETERM""" + try: + self.run_device() + except ZMQError as e: + if e.errno == ETERM: + # silence TERM errors, because this should be a clean shutdown + pass + else: + raise + finally: + self.done = True + + def start(self): + """Start the device. Override me in subclass for other launchers.""" + return self.run() + + def join(self,timeout=None): + """wait for me to finish, like Thread.join. + + Reimplemented appropriately by subclasses.""" + tic = time.time() + toc = tic + while not self.done and not (timeout is not None and toc-tic > timeout): + time.sleep(.001) + toc = time.time() + + +class BackgroundDevice(Device): + """Base class for launching Devices in background processes and threads.""" + + launcher=None + _launch_class=None + + def start(self): + self.launcher = self._launch_class(target=self.run) + self.launcher.daemon = self.daemon + return self.launcher.start() + + def join(self, timeout=None): + return self.launcher.join(timeout=timeout) + + +class ThreadDevice(BackgroundDevice): + """A Device that will be run in a background Thread. + + See Device for details. + """ + _launch_class=Thread + +class ProcessDevice(BackgroundDevice): + """A Device that will be run in a background Process. + + See Device for details. + """ + _launch_class=Process + context_factory = Context + """Callable that returns a context. Typically either Context.instance or Context, + depending on whether the device should share the global instance or not. + """ + + +__all__ = ['Device', 'ThreadDevice', 'ProcessDevice'] diff --git a/src/console/zmq/devices/monitoredqueue.pxd b/src/console/zmq/devices/monitoredqueue.pxd new file mode 100644 index 00000000..1e26ed86 --- /dev/null +++ b/src/console/zmq/devices/monitoredqueue.pxd @@ -0,0 +1,177 @@ +"""MonitoredQueue class declarations. + +Authors +------- +* MinRK +* Brian Granger +""" + +# +# Copyright (c) 2010 Min Ragan-Kelley, Brian Granger +# +# This file is part of pyzmq, but is derived and adapted from zmq_queue.cpp +# originally from libzmq-2.1.6, used under LGPLv3 +# +# pyzmq is free software; you can redistribute it and/or modify it under +# the terms of the Lesser GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# pyzmq is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Lesser GNU General Public License for more details. +# +# You should have received a copy of the Lesser GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +#----------------------------------------------------------------------------- +# Imports +#----------------------------------------------------------------------------- + +from libzmq cimport * + +#----------------------------------------------------------------------------- +# MonitoredQueue C functions +#----------------------------------------------------------------------------- + +cdef inline int _relay(void *insocket_, void *outsocket_, void *sidesocket_, + zmq_msg_t msg, zmq_msg_t side_msg, zmq_msg_t id_msg, + bint swap_ids) nogil: + cdef int rc + cdef int64_t flag_2 + cdef int flag_3 + cdef int flags + cdef bint more + cdef size_t flagsz + cdef void * flag_ptr + + if ZMQ_VERSION_MAJOR < 3: + flagsz = sizeof (int64_t) + flag_ptr = &flag_2 + else: + flagsz = sizeof (int) + flag_ptr = &flag_3 + + if swap_ids:# both router, must send second identity first + # recv two ids into msg, id_msg + rc = zmq_msg_recv(&msg, insocket_, 0) + if rc < 0: return rc + + rc = zmq_msg_recv(&id_msg, insocket_, 0) + if rc < 0: return rc + + # send second id (id_msg) first + #!!!! always send a copy before the original !!!! + rc = zmq_msg_copy(&side_msg, &id_msg) + if rc < 0: return rc + rc = zmq_msg_send(&side_msg, outsocket_, ZMQ_SNDMORE) + if rc < 0: return rc + rc = zmq_msg_send(&id_msg, sidesocket_, ZMQ_SNDMORE) + if rc < 0: return rc + # send first id (msg) second + rc = zmq_msg_copy(&side_msg, &msg) + if rc < 0: return rc + rc = zmq_msg_send(&side_msg, outsocket_, ZMQ_SNDMORE) + if rc < 0: return rc + rc = zmq_msg_send(&msg, sidesocket_, ZMQ_SNDMORE) + if rc < 0: return rc + while (True): + rc = zmq_msg_recv(&msg, insocket_, 0) + if rc < 0: return rc + # assert (rc == 0) + rc = zmq_getsockopt (insocket_, ZMQ_RCVMORE, flag_ptr, &flagsz) + if rc < 0: return rc + flags = 0 + if ZMQ_VERSION_MAJOR < 3: + if flag_2: + flags |= ZMQ_SNDMORE + else: + if flag_3: + flags |= ZMQ_SNDMORE + # LABEL has been removed: + # rc = zmq_getsockopt (insocket_, ZMQ_RCVLABEL, flag_ptr, &flagsz) + # if flag_3: + # flags |= ZMQ_SNDLABEL + # assert (rc == 0) + + rc = zmq_msg_copy(&side_msg, &msg) + if rc < 0: return rc + if flags: + rc = zmq_msg_send(&side_msg, outsocket_, flags) + if rc < 0: return rc + # only SNDMORE for side-socket + rc = zmq_msg_send(&msg, sidesocket_, ZMQ_SNDMORE) + if rc < 0: return rc + else: + rc = zmq_msg_send(&side_msg, outsocket_, 0) + if rc < 0: return rc + rc = zmq_msg_send(&msg, sidesocket_, 0) + if rc < 0: return rc + break + return rc + +# the MonitoredQueue C function, adapted from zmq::queue.cpp : +cdef inline int c_monitored_queue (void *insocket_, void *outsocket_, + void *sidesocket_, zmq_msg_t *in_msg_ptr, + zmq_msg_t *out_msg_ptr, int swap_ids) nogil: + """The actual C function for a monitored queue device. + + See ``monitored_queue()`` for details. + """ + + cdef zmq_msg_t msg + cdef int rc = zmq_msg_init (&msg) + cdef zmq_msg_t id_msg + rc = zmq_msg_init (&id_msg) + if rc < 0: return rc + cdef zmq_msg_t side_msg + rc = zmq_msg_init (&side_msg) + if rc < 0: return rc + + cdef zmq_pollitem_t items [2] + items [0].socket = insocket_ + items [0].fd = 0 + items [0].events = ZMQ_POLLIN + items [0].revents = 0 + items [1].socket = outsocket_ + items [1].fd = 0 + items [1].events = ZMQ_POLLIN + items [1].revents = 0 + # I don't think sidesocket should be polled? + # items [2].socket = sidesocket_ + # items [2].fd = 0 + # items [2].events = ZMQ_POLLIN + # items [2].revents = 0 + + while (True): + + # // Wait while there are either requests or replies to process. + rc = zmq_poll (&items [0], 2, -1) + if rc < 0: return rc + # // The algorithm below asumes ratio of request and replies processed + # // under full load to be 1:1. Although processing requests replies + # // first is tempting it is suspectible to DoS attacks (overloading + # // the system with unsolicited replies). + # + # // Process a request. + if (items [0].revents & ZMQ_POLLIN): + # send in_prefix to side socket + rc = zmq_msg_copy(&side_msg, in_msg_ptr) + if rc < 0: return rc + rc = zmq_msg_send(&side_msg, sidesocket_, ZMQ_SNDMORE) + if rc < 0: return rc + # relay the rest of the message + rc = _relay(insocket_, outsocket_, sidesocket_, msg, side_msg, id_msg, swap_ids) + if rc < 0: return rc + if (items [1].revents & ZMQ_POLLIN): + # send out_prefix to side socket + rc = zmq_msg_copy(&side_msg, out_msg_ptr) + if rc < 0: return rc + rc = zmq_msg_send(&side_msg, sidesocket_, ZMQ_SNDMORE) + if rc < 0: return rc + # relay the rest of the message + rc = _relay(outsocket_, insocket_, sidesocket_, msg, side_msg, id_msg, swap_ids) + if rc < 0: return rc + return rc diff --git a/src/console/zmq/devices/monitoredqueue.py b/src/console/zmq/devices/monitoredqueue.py new file mode 100755 index 00000000..6d714e51 --- /dev/null +++ b/src/console/zmq/devices/monitoredqueue.py @@ -0,0 +1,7 @@ +def __bootstrap__(): + global __bootstrap__, __loader__, __file__ + import sys, pkg_resources, imp + __file__ = pkg_resources.resource_filename(__name__,'monitoredqueue.so') + __loader__ = None; del __bootstrap__, __loader__ + imp.load_dynamic(__name__,__file__) +__bootstrap__() diff --git a/src/console/zmq/devices/monitoredqueuedevice.py b/src/console/zmq/devices/monitoredqueuedevice.py new file mode 100755 index 00000000..9723f866 --- /dev/null +++ b/src/console/zmq/devices/monitoredqueuedevice.py @@ -0,0 +1,66 @@ +"""MonitoredQueue classes and functions.""" + +# Copyright (C) PyZMQ Developers +# Distributed under the terms of the Modified BSD License. + + +from zmq import ZMQError, PUB +from zmq.devices.proxydevice import ProxyBase, Proxy, ThreadProxy, ProcessProxy +from zmq.devices.monitoredqueue import monitored_queue + + +class MonitoredQueueBase(ProxyBase): + """Base class for overriding methods.""" + + _in_prefix = b'' + _out_prefix = b'' + + def __init__(self, in_type, out_type, mon_type=PUB, in_prefix=b'in', out_prefix=b'out'): + + ProxyBase.__init__(self, in_type=in_type, out_type=out_type, mon_type=mon_type) + + self._in_prefix = in_prefix + self._out_prefix = out_prefix + + def run_device(self): + ins,outs,mons = self._setup_sockets() + monitored_queue(ins, outs, mons, self._in_prefix, self._out_prefix) + + +class MonitoredQueue(MonitoredQueueBase, Proxy): + """Class for running monitored_queue in the background. + + See zmq.devices.Device for most of the spec. MonitoredQueue differs from Proxy, + only in that it adds a ``prefix`` to messages sent on the monitor socket, + with a different prefix for each direction. + + MQ also supports ROUTER on both sides, which zmq.proxy does not. + + If a message arrives on `in_sock`, it will be prefixed with `in_prefix` on the monitor socket. + If it arrives on out_sock, it will be prefixed with `out_prefix`. + + A PUB socket is the most logical choice for the mon_socket, but it is not required. + """ + pass + + +class ThreadMonitoredQueue(MonitoredQueueBase, ThreadProxy): + """Run zmq.monitored_queue in a background thread. + + See MonitoredQueue and Proxy for details. + """ + pass + + +class ProcessMonitoredQueue(MonitoredQueueBase, ProcessProxy): + """Run zmq.monitored_queue in a background thread. + + See MonitoredQueue and Proxy for details. + """ + + +__all__ = [ + 'MonitoredQueue', + 'ThreadMonitoredQueue', + 'ProcessMonitoredQueue' +] diff --git a/src/console/zmq/devices/proxydevice.py b/src/console/zmq/devices/proxydevice.py new file mode 100755 index 00000000..68be3f15 --- /dev/null +++ b/src/console/zmq/devices/proxydevice.py @@ -0,0 +1,90 @@ +"""Proxy classes and functions.""" + +# Copyright (C) PyZMQ Developers +# Distributed under the terms of the Modified BSD License. + +import zmq +from zmq.devices.basedevice import Device, ThreadDevice, ProcessDevice + + +class ProxyBase(object): + """Base class for overriding methods.""" + + def __init__(self, in_type, out_type, mon_type=zmq.PUB): + + Device.__init__(self, in_type=in_type, out_type=out_type) + self.mon_type = mon_type + self._mon_binds = [] + self._mon_connects = [] + self._mon_sockopts = [] + + def bind_mon(self, addr): + """Enqueue ZMQ address for binding on mon_socket. + + See zmq.Socket.bind for details. + """ + self._mon_binds.append(addr) + + def connect_mon(self, addr): + """Enqueue ZMQ address for connecting on mon_socket. + + See zmq.Socket.bind for details. + """ + self._mon_connects.append(addr) + + def setsockopt_mon(self, opt, value): + """Enqueue setsockopt(opt, value) for mon_socket + + See zmq.Socket.setsockopt for details. + """ + self._mon_sockopts.append((opt, value)) + + def _setup_sockets(self): + ins,outs = Device._setup_sockets(self) + ctx = self._context + mons = ctx.socket(self.mon_type) + + # set sockopts (must be done first, in case of zmq.IDENTITY) + for opt,value in self._mon_sockopts: + mons.setsockopt(opt, value) + + for iface in self._mon_binds: + mons.bind(iface) + + for iface in self._mon_connects: + mons.connect(iface) + + return ins,outs,mons + + def run_device(self): + ins,outs,mons = self._setup_sockets() + zmq.proxy(ins, outs, mons) + +class Proxy(ProxyBase, Device): + """Threadsafe Proxy object. + + See zmq.devices.Device for most of the spec. This subclass adds a + <method>_mon version of each <method>_{in|out} method, for configuring the + monitor socket. + + A Proxy is a 3-socket ZMQ Device that functions just like a + QUEUE, except each message is also sent out on the monitor socket. + + A PUB socket is the most logical choice for the mon_socket, but it is not required. + """ + pass + +class ThreadProxy(ProxyBase, ThreadDevice): + """Proxy in a Thread. See Proxy for more.""" + pass + +class ProcessProxy(ProxyBase, ProcessDevice): + """Proxy in a Process. See Proxy for more.""" + pass + + +__all__ = [ + 'Proxy', + 'ThreadProxy', + 'ProcessProxy', +] |