summaryrefslogtreecommitdiffstats
path: root/src/console/zmq/tests/test_poll.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/console/zmq/tests/test_poll.py')
-rwxr-xr-xsrc/console/zmq/tests/test_poll.py229
1 files changed, 229 insertions, 0 deletions
diff --git a/src/console/zmq/tests/test_poll.py b/src/console/zmq/tests/test_poll.py
new file mode 100755
index 00000000..57346c89
--- /dev/null
+++ b/src/console/zmq/tests/test_poll.py
@@ -0,0 +1,229 @@
+# Copyright (C) PyZMQ Developers
+# Distributed under the terms of the Modified BSD License.
+
+
+import time
+from unittest import TestCase
+
+import zmq
+
+from zmq.tests import PollZMQTestCase, have_gevent, GreenTest
+
+def wait():
+ time.sleep(.25)
+
+
+class TestPoll(PollZMQTestCase):
+
+ Poller = zmq.Poller
+
+ # This test is failing due to this issue:
+ # http://github.com/sustrik/zeromq2/issues#issue/26
+ def test_pair(self):
+ s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
+
+ # Sleep to allow sockets to connect.
+ wait()
+
+ poller = self.Poller()
+ poller.register(s1, zmq.POLLIN|zmq.POLLOUT)
+ poller.register(s2, zmq.POLLIN|zmq.POLLOUT)
+ # Poll result should contain both sockets
+ socks = dict(poller.poll())
+ # Now make sure that both are send ready.
+ self.assertEqual(socks[s1], zmq.POLLOUT)
+ self.assertEqual(socks[s2], zmq.POLLOUT)
+ # Now do a send on both, wait and test for zmq.POLLOUT|zmq.POLLIN
+ s1.send(b'msg1')
+ s2.send(b'msg2')
+ wait()
+ socks = dict(poller.poll())
+ self.assertEqual(socks[s1], zmq.POLLOUT|zmq.POLLIN)
+ self.assertEqual(socks[s2], zmq.POLLOUT|zmq.POLLIN)
+ # Make sure that both are in POLLOUT after recv.
+ s1.recv()
+ s2.recv()
+ socks = dict(poller.poll())
+ self.assertEqual(socks[s1], zmq.POLLOUT)
+ self.assertEqual(socks[s2], zmq.POLLOUT)
+
+ poller.unregister(s1)
+ poller.unregister(s2)
+
+ # Wait for everything to finish.
+ wait()
+
+ def test_reqrep(self):
+ s1, s2 = self.create_bound_pair(zmq.REP, zmq.REQ)
+
+ # Sleep to allow sockets to connect.
+ wait()
+
+ poller = self.Poller()
+ poller.register(s1, zmq.POLLIN|zmq.POLLOUT)
+ poller.register(s2, zmq.POLLIN|zmq.POLLOUT)
+
+ # Make sure that s1 is in state 0 and s2 is in POLLOUT
+ socks = dict(poller.poll())
+ self.assertEqual(s1 in socks, 0)
+ self.assertEqual(socks[s2], zmq.POLLOUT)
+
+ # Make sure that s2 goes immediately into state 0 after send.
+ s2.send(b'msg1')
+ socks = dict(poller.poll())
+ self.assertEqual(s2 in socks, 0)
+
+ # Make sure that s1 goes into POLLIN state after a time.sleep().
+ time.sleep(0.5)
+ socks = dict(poller.poll())
+ self.assertEqual(socks[s1], zmq.POLLIN)
+
+ # Make sure that s1 goes into POLLOUT after recv.
+ s1.recv()
+ socks = dict(poller.poll())
+ self.assertEqual(socks[s1], zmq.POLLOUT)
+
+ # Make sure s1 goes into state 0 after send.
+ s1.send(b'msg2')
+ socks = dict(poller.poll())
+ self.assertEqual(s1 in socks, 0)
+
+ # Wait and then see that s2 is in POLLIN.
+ time.sleep(0.5)
+ socks = dict(poller.poll())
+ self.assertEqual(socks[s2], zmq.POLLIN)
+
+ # Make sure that s2 is in POLLOUT after recv.
+ s2.recv()
+ socks = dict(poller.poll())
+ self.assertEqual(socks[s2], zmq.POLLOUT)
+
+ poller.unregister(s1)
+ poller.unregister(s2)
+
+ # Wait for everything to finish.
+ wait()
+
+ def test_no_events(self):
+ s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
+ poller = self.Poller()
+ poller.register(s1, zmq.POLLIN|zmq.POLLOUT)
+ poller.register(s2, 0)
+ self.assertTrue(s1 in poller)
+ self.assertFalse(s2 in poller)
+ poller.register(s1, 0)
+ self.assertFalse(s1 in poller)
+
+ def test_pubsub(self):
+ s1, s2 = self.create_bound_pair(zmq.PUB, zmq.SUB)
+ s2.setsockopt(zmq.SUBSCRIBE, b'')
+
+ # Sleep to allow sockets to connect.
+ wait()
+
+ poller = self.Poller()
+ poller.register(s1, zmq.POLLIN|zmq.POLLOUT)
+ poller.register(s2, zmq.POLLIN)
+
+ # Now make sure that both are send ready.
+ socks = dict(poller.poll())
+ self.assertEqual(socks[s1], zmq.POLLOUT)
+ self.assertEqual(s2 in socks, 0)
+ # Make sure that s1 stays in POLLOUT after a send.
+ s1.send(b'msg1')
+ socks = dict(poller.poll())
+ self.assertEqual(socks[s1], zmq.POLLOUT)
+
+ # Make sure that s2 is POLLIN after waiting.
+ wait()
+ socks = dict(poller.poll())
+ self.assertEqual(socks[s2], zmq.POLLIN)
+
+ # Make sure that s2 goes into 0 after recv.
+ s2.recv()
+ socks = dict(poller.poll())
+ self.assertEqual(s2 in socks, 0)
+
+ poller.unregister(s1)
+ poller.unregister(s2)
+
+ # Wait for everything to finish.
+ wait()
+ def test_timeout(self):
+ """make sure Poller.poll timeout has the right units (milliseconds)."""
+ s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
+ poller = self.Poller()
+ poller.register(s1, zmq.POLLIN)
+ tic = time.time()
+ evt = poller.poll(.005)
+ toc = time.time()
+ self.assertTrue(toc-tic < 0.1)
+ tic = time.time()
+ evt = poller.poll(5)
+ toc = time.time()
+ self.assertTrue(toc-tic < 0.1)
+ self.assertTrue(toc-tic > .001)
+ tic = time.time()
+ evt = poller.poll(500)
+ toc = time.time()
+ self.assertTrue(toc-tic < 1)
+ self.assertTrue(toc-tic > 0.1)
+
+class TestSelect(PollZMQTestCase):
+
+ def test_pair(self):
+ s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
+
+ # Sleep to allow sockets to connect.
+ wait()
+
+ rlist, wlist, xlist = zmq.select([s1, s2], [s1, s2], [s1, s2])
+ self.assert_(s1 in wlist)
+ self.assert_(s2 in wlist)
+ self.assert_(s1 not in rlist)
+ self.assert_(s2 not in rlist)
+
+ def test_timeout(self):
+ """make sure select timeout has the right units (seconds)."""
+ s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
+ tic = time.time()
+ r,w,x = zmq.select([s1,s2],[],[],.005)
+ toc = time.time()
+ self.assertTrue(toc-tic < 1)
+ self.assertTrue(toc-tic > 0.001)
+ tic = time.time()
+ r,w,x = zmq.select([s1,s2],[],[],.25)
+ toc = time.time()
+ self.assertTrue(toc-tic < 1)
+ self.assertTrue(toc-tic > 0.1)
+
+
+if have_gevent:
+ import gevent
+ from zmq import green as gzmq
+
+ class TestPollGreen(GreenTest, TestPoll):
+ Poller = gzmq.Poller
+
+ def test_wakeup(self):
+ s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
+ poller = self.Poller()
+ poller.register(s2, zmq.POLLIN)
+
+ tic = time.time()
+ r = gevent.spawn(lambda: poller.poll(10000))
+ s = gevent.spawn(lambda: s1.send(b'msg1'))
+ r.join()
+ toc = time.time()
+ self.assertTrue(toc-tic < 1)
+
+ def test_socket_poll(self):
+ s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
+
+ tic = time.time()
+ r = gevent.spawn(lambda: s2.poll(10000))
+ s = gevent.spawn(lambda: s1.send(b'msg1'))
+ r.join()
+ toc = time.time()
+ self.assertTrue(toc-tic < 1)
+