1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
|
"""Proxy classes and functions."""
# Copyright (C) PyZMQ Developers
# Distributed under the terms of the Modified BSD License.
import zmq
from zmq.devices.basedevice import Device, ThreadDevice, ProcessDevice
class ProxyBase(object):
"""Base class for overriding methods."""
def __init__(self, in_type, out_type, mon_type=zmq.PUB):
Device.__init__(self, in_type=in_type, out_type=out_type)
self.mon_type = mon_type
self._mon_binds = []
self._mon_connects = []
self._mon_sockopts = []
def bind_mon(self, addr):
"""Enqueue ZMQ address for binding on mon_socket.
See zmq.Socket.bind for details.
"""
self._mon_binds.append(addr)
def connect_mon(self, addr):
"""Enqueue ZMQ address for connecting on mon_socket.
See zmq.Socket.bind for details.
"""
self._mon_connects.append(addr)
def setsockopt_mon(self, opt, value):
"""Enqueue setsockopt(opt, value) for mon_socket
See zmq.Socket.setsockopt for details.
"""
self._mon_sockopts.append((opt, value))
def _setup_sockets(self):
ins,outs = Device._setup_sockets(self)
ctx = self._context
mons = ctx.socket(self.mon_type)
# set sockopts (must be done first, in case of zmq.IDENTITY)
for opt,value in self._mon_sockopts:
mons.setsockopt(opt, value)
for iface in self._mon_binds:
mons.bind(iface)
for iface in self._mon_connects:
mons.connect(iface)
return ins,outs,mons
def run_device(self):
ins,outs,mons = self._setup_sockets()
zmq.proxy(ins, outs, mons)
class Proxy(ProxyBase, Device):
"""Threadsafe Proxy object.
See zmq.devices.Device for most of the spec. This subclass adds a
<method>_mon version of each <method>_{in|out} method, for configuring the
monitor socket.
A Proxy is a 3-socket ZMQ Device that functions just like a
QUEUE, except each message is also sent out on the monitor socket.
A PUB socket is the most logical choice for the mon_socket, but it is not required.
"""
pass
class ThreadProxy(ProxyBase, ThreadDevice):
"""Proxy in a Thread. See Proxy for more."""
pass
class ProcessProxy(ProxyBase, ProcessDevice):
"""Proxy in a Process. See Proxy for more."""
pass
__all__ = [
'Proxy',
'ThreadProxy',
'ProcessProxy',
]
|