1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
|
import zmq
import gevent
from gevent import select
from zmq import Poller as _original_Poller
class _Poller(_original_Poller):
"""Replacement for :class:`zmq.Poller`
Ensures that the greened Poller below is used in calls to
:meth:`zmq.Poller.poll`.
"""
_gevent_bug_timeout = 1.33 # minimum poll interval, for working around gevent bug
def _get_descriptors(self):
"""Returns three elements tuple with socket descriptors ready
for gevent.select.select
"""
rlist = []
wlist = []
xlist = []
for socket, flags in self.sockets:
if isinstance(socket, zmq.Socket):
rlist.append(socket.getsockopt(zmq.FD))
continue
elif isinstance(socket, int):
fd = socket
elif hasattr(socket, 'fileno'):
try:
fd = int(socket.fileno())
except:
raise ValueError('fileno() must return an valid integer fd')
else:
raise TypeError('Socket must be a 0MQ socket, an integer fd '
'or have a fileno() method: %r' % socket)
if flags & zmq.POLLIN:
rlist.append(fd)
if flags & zmq.POLLOUT:
wlist.append(fd)
if flags & zmq.POLLERR:
xlist.append(fd)
return (rlist, wlist, xlist)
def poll(self, timeout=-1):
"""Overridden method to ensure that the green version of
Poller is used.
Behaves the same as :meth:`zmq.core.Poller.poll`
"""
if timeout is None:
timeout = -1
if timeout < 0:
timeout = -1
rlist = None
wlist = None
xlist = None
if timeout > 0:
tout = gevent.Timeout.start_new(timeout/1000.0)
try:
# Loop until timeout or events available
rlist, wlist, xlist = self._get_descriptors()
while True:
events = super(_Poller, self).poll(0)
if events or timeout == 0:
return events
# wait for activity on sockets in a green way
# set a minimum poll frequency,
# because gevent < 1.0 cannot be trusted to catch edge-triggered FD events
_bug_timeout = gevent.Timeout.start_new(self._gevent_bug_timeout)
try:
select.select(rlist, wlist, xlist)
except gevent.Timeout as t:
if t is not _bug_timeout:
raise
finally:
_bug_timeout.cancel()
except gevent.Timeout as t:
if t is not tout:
raise
return []
finally:
if timeout > 0:
tout.cancel()
|