diff options
Diffstat (limited to 'src/console/zmq/tests/test_poll.py')
-rwxr-xr-x | src/console/zmq/tests/test_poll.py | 229 |
1 files changed, 229 insertions, 0 deletions
diff --git a/src/console/zmq/tests/test_poll.py b/src/console/zmq/tests/test_poll.py new file mode 100755 index 00000000..57346c89 --- /dev/null +++ b/src/console/zmq/tests/test_poll.py @@ -0,0 +1,229 @@ +# Copyright (C) PyZMQ Developers +# Distributed under the terms of the Modified BSD License. + + +import time +from unittest import TestCase + +import zmq + +from zmq.tests import PollZMQTestCase, have_gevent, GreenTest + +def wait(): + time.sleep(.25) + + +class TestPoll(PollZMQTestCase): + + Poller = zmq.Poller + + # This test is failing due to this issue: + # http://github.com/sustrik/zeromq2/issues#issue/26 + def test_pair(self): + s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR) + + # Sleep to allow sockets to connect. + wait() + + poller = self.Poller() + poller.register(s1, zmq.POLLIN|zmq.POLLOUT) + poller.register(s2, zmq.POLLIN|zmq.POLLOUT) + # Poll result should contain both sockets + socks = dict(poller.poll()) + # Now make sure that both are send ready. + self.assertEqual(socks[s1], zmq.POLLOUT) + self.assertEqual(socks[s2], zmq.POLLOUT) + # Now do a send on both, wait and test for zmq.POLLOUT|zmq.POLLIN + s1.send(b'msg1') + s2.send(b'msg2') + wait() + socks = dict(poller.poll()) + self.assertEqual(socks[s1], zmq.POLLOUT|zmq.POLLIN) + self.assertEqual(socks[s2], zmq.POLLOUT|zmq.POLLIN) + # Make sure that both are in POLLOUT after recv. + s1.recv() + s2.recv() + socks = dict(poller.poll()) + self.assertEqual(socks[s1], zmq.POLLOUT) + self.assertEqual(socks[s2], zmq.POLLOUT) + + poller.unregister(s1) + poller.unregister(s2) + + # Wait for everything to finish. + wait() + + def test_reqrep(self): + s1, s2 = self.create_bound_pair(zmq.REP, zmq.REQ) + + # Sleep to allow sockets to connect. + wait() + + poller = self.Poller() + poller.register(s1, zmq.POLLIN|zmq.POLLOUT) + poller.register(s2, zmq.POLLIN|zmq.POLLOUT) + + # Make sure that s1 is in state 0 and s2 is in POLLOUT + socks = dict(poller.poll()) + self.assertEqual(s1 in socks, 0) + self.assertEqual(socks[s2], zmq.POLLOUT) + + # Make sure that s2 goes immediately into state 0 after send. + s2.send(b'msg1') + socks = dict(poller.poll()) + self.assertEqual(s2 in socks, 0) + + # Make sure that s1 goes into POLLIN state after a time.sleep(). + time.sleep(0.5) + socks = dict(poller.poll()) + self.assertEqual(socks[s1], zmq.POLLIN) + + # Make sure that s1 goes into POLLOUT after recv. + s1.recv() + socks = dict(poller.poll()) + self.assertEqual(socks[s1], zmq.POLLOUT) + + # Make sure s1 goes into state 0 after send. + s1.send(b'msg2') + socks = dict(poller.poll()) + self.assertEqual(s1 in socks, 0) + + # Wait and then see that s2 is in POLLIN. + time.sleep(0.5) + socks = dict(poller.poll()) + self.assertEqual(socks[s2], zmq.POLLIN) + + # Make sure that s2 is in POLLOUT after recv. + s2.recv() + socks = dict(poller.poll()) + self.assertEqual(socks[s2], zmq.POLLOUT) + + poller.unregister(s1) + poller.unregister(s2) + + # Wait for everything to finish. + wait() + + def test_no_events(self): + s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR) + poller = self.Poller() + poller.register(s1, zmq.POLLIN|zmq.POLLOUT) + poller.register(s2, 0) + self.assertTrue(s1 in poller) + self.assertFalse(s2 in poller) + poller.register(s1, 0) + self.assertFalse(s1 in poller) + + def test_pubsub(self): + s1, s2 = self.create_bound_pair(zmq.PUB, zmq.SUB) + s2.setsockopt(zmq.SUBSCRIBE, b'') + + # Sleep to allow sockets to connect. + wait() + + poller = self.Poller() + poller.register(s1, zmq.POLLIN|zmq.POLLOUT) + poller.register(s2, zmq.POLLIN) + + # Now make sure that both are send ready. + socks = dict(poller.poll()) + self.assertEqual(socks[s1], zmq.POLLOUT) + self.assertEqual(s2 in socks, 0) + # Make sure that s1 stays in POLLOUT after a send. + s1.send(b'msg1') + socks = dict(poller.poll()) + self.assertEqual(socks[s1], zmq.POLLOUT) + + # Make sure that s2 is POLLIN after waiting. + wait() + socks = dict(poller.poll()) + self.assertEqual(socks[s2], zmq.POLLIN) + + # Make sure that s2 goes into 0 after recv. + s2.recv() + socks = dict(poller.poll()) + self.assertEqual(s2 in socks, 0) + + poller.unregister(s1) + poller.unregister(s2) + + # Wait for everything to finish. + wait() + def test_timeout(self): + """make sure Poller.poll timeout has the right units (milliseconds).""" + s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR) + poller = self.Poller() + poller.register(s1, zmq.POLLIN) + tic = time.time() + evt = poller.poll(.005) + toc = time.time() + self.assertTrue(toc-tic < 0.1) + tic = time.time() + evt = poller.poll(5) + toc = time.time() + self.assertTrue(toc-tic < 0.1) + self.assertTrue(toc-tic > .001) + tic = time.time() + evt = poller.poll(500) + toc = time.time() + self.assertTrue(toc-tic < 1) + self.assertTrue(toc-tic > 0.1) + +class TestSelect(PollZMQTestCase): + + def test_pair(self): + s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR) + + # Sleep to allow sockets to connect. + wait() + + rlist, wlist, xlist = zmq.select([s1, s2], [s1, s2], [s1, s2]) + self.assert_(s1 in wlist) + self.assert_(s2 in wlist) + self.assert_(s1 not in rlist) + self.assert_(s2 not in rlist) + + def test_timeout(self): + """make sure select timeout has the right units (seconds).""" + s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR) + tic = time.time() + r,w,x = zmq.select([s1,s2],[],[],.005) + toc = time.time() + self.assertTrue(toc-tic < 1) + self.assertTrue(toc-tic > 0.001) + tic = time.time() + r,w,x = zmq.select([s1,s2],[],[],.25) + toc = time.time() + self.assertTrue(toc-tic < 1) + self.assertTrue(toc-tic > 0.1) + + +if have_gevent: + import gevent + from zmq import green as gzmq + + class TestPollGreen(GreenTest, TestPoll): + Poller = gzmq.Poller + + def test_wakeup(self): + s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR) + poller = self.Poller() + poller.register(s2, zmq.POLLIN) + + tic = time.time() + r = gevent.spawn(lambda: poller.poll(10000)) + s = gevent.spawn(lambda: s1.send(b'msg1')) + r.join() + toc = time.time() + self.assertTrue(toc-tic < 1) + + def test_socket_poll(self): + s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR) + + tic = time.time() + r = gevent.spawn(lambda: s2.poll(10000)) + s = gevent.spawn(lambda: s1.send(b'msg1')) + r.join() + toc = time.time() + self.assertTrue(toc-tic < 1) + |