summaryrefslogtreecommitdiffstats
path: root/scripts/external_libs/platform/cel59/zmq/devices/monitoredqueue.py
diff options
context:
space:
mode:
authorDan Klein <danklein10@gmail.com>2016-01-04 23:31:31 +0200
committerDan Klein <danklein10@gmail.com>2016-01-04 23:31:31 +0200
commit629b54c4c9df9c718d818a004ecf15c2cf6c770a (patch)
tree7dfc3c64c7561032d690ce6188130e80d344054e /scripts/external_libs/platform/cel59/zmq/devices/monitoredqueue.py
parent3757099103ed1bf56f85ccf5bb861a331287cbbb (diff)
parent857bdcf05a920b99e1cf180c700176b04801da00 (diff)
Merge branch 'master' into dan_stateless
Diffstat (limited to 'scripts/external_libs/platform/cel59/zmq/devices/monitoredqueue.py')
-rw-r--r--scripts/external_libs/platform/cel59/zmq/devices/monitoredqueue.py37
1 files changed, 37 insertions, 0 deletions
diff --git a/scripts/external_libs/platform/cel59/zmq/devices/monitoredqueue.py b/scripts/external_libs/platform/cel59/zmq/devices/monitoredqueue.py
new file mode 100644
index 00000000..c6d91429
--- /dev/null
+++ b/scripts/external_libs/platform/cel59/zmq/devices/monitoredqueue.py
@@ -0,0 +1,37 @@
+"""pure Python monitored_queue function
+
+For use when Cython extension is unavailable (PyPy).
+
+Authors
+-------
+* MinRK
+"""
+
+# Copyright (C) PyZMQ Developers
+# Distributed under the terms of the Modified BSD License.
+
+import zmq
+
+def _relay(ins, outs, sides, prefix, swap_ids):
+ msg = ins.recv_multipart()
+ if swap_ids:
+ msg[:2] = msg[:2][::-1]
+ outs.send_multipart(msg)
+ sides.send_multipart([prefix] + msg)
+
+def monitored_queue(in_socket, out_socket, mon_socket,
+ in_prefix=b'in', out_prefix=b'out'):
+
+ swap_ids = in_socket.type == zmq.ROUTER and out_socket.type == zmq.ROUTER
+
+ poller = zmq.Poller()
+ poller.register(in_socket, zmq.POLLIN)
+ poller.register(out_socket, zmq.POLLIN)
+ while True:
+ events = dict(poller.poll())
+ if in_socket in events:
+ _relay(in_socket, out_socket, mon_socket, in_prefix, swap_ids)
+ if out_socket in events:
+ _relay(out_socket, in_socket, mon_socket, out_prefix, swap_ids)
+
+__all__ = ['monitored_queue']