diff options
Diffstat (limited to 'src/console/zmq/tests/test_poll.py')
-rwxr-xr-x | src/console/zmq/tests/test_poll.py | 229 |
1 files changed, 0 insertions, 229 deletions
diff --git a/src/console/zmq/tests/test_poll.py b/src/console/zmq/tests/test_poll.py deleted file mode 100755 index 57346c89..00000000 --- a/src/console/zmq/tests/test_poll.py +++ /dev/null @@ -1,229 +0,0 @@ -# Copyright (C) PyZMQ Developers -# Distributed under the terms of the Modified BSD License. - - -import time -from unittest import TestCase - -import zmq - -from zmq.tests import PollZMQTestCase, have_gevent, GreenTest - -def wait(): - time.sleep(.25) - - -class TestPoll(PollZMQTestCase): - - Poller = zmq.Poller - - # This test is failing due to this issue: - # http://github.com/sustrik/zeromq2/issues#issue/26 - def test_pair(self): - s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR) - - # Sleep to allow sockets to connect. - wait() - - poller = self.Poller() - poller.register(s1, zmq.POLLIN|zmq.POLLOUT) - poller.register(s2, zmq.POLLIN|zmq.POLLOUT) - # Poll result should contain both sockets - socks = dict(poller.poll()) - # Now make sure that both are send ready. - self.assertEqual(socks[s1], zmq.POLLOUT) - self.assertEqual(socks[s2], zmq.POLLOUT) - # Now do a send on both, wait and test for zmq.POLLOUT|zmq.POLLIN - s1.send(b'msg1') - s2.send(b'msg2') - wait() - socks = dict(poller.poll()) - self.assertEqual(socks[s1], zmq.POLLOUT|zmq.POLLIN) - self.assertEqual(socks[s2], zmq.POLLOUT|zmq.POLLIN) - # Make sure that both are in POLLOUT after recv. - s1.recv() - s2.recv() - socks = dict(poller.poll()) - self.assertEqual(socks[s1], zmq.POLLOUT) - self.assertEqual(socks[s2], zmq.POLLOUT) - - poller.unregister(s1) - poller.unregister(s2) - - # Wait for everything to finish. - wait() - - def test_reqrep(self): - s1, s2 = self.create_bound_pair(zmq.REP, zmq.REQ) - - # Sleep to allow sockets to connect. - wait() - - poller = self.Poller() - poller.register(s1, zmq.POLLIN|zmq.POLLOUT) - poller.register(s2, zmq.POLLIN|zmq.POLLOUT) - - # Make sure that s1 is in state 0 and s2 is in POLLOUT - socks = dict(poller.poll()) - self.assertEqual(s1 in socks, 0) - self.assertEqual(socks[s2], zmq.POLLOUT) - - # Make sure that s2 goes immediately into state 0 after send. - s2.send(b'msg1') - socks = dict(poller.poll()) - self.assertEqual(s2 in socks, 0) - - # Make sure that s1 goes into POLLIN state after a time.sleep(). - time.sleep(0.5) - socks = dict(poller.poll()) - self.assertEqual(socks[s1], zmq.POLLIN) - - # Make sure that s1 goes into POLLOUT after recv. - s1.recv() - socks = dict(poller.poll()) - self.assertEqual(socks[s1], zmq.POLLOUT) - - # Make sure s1 goes into state 0 after send. - s1.send(b'msg2') - socks = dict(poller.poll()) - self.assertEqual(s1 in socks, 0) - - # Wait and then see that s2 is in POLLIN. - time.sleep(0.5) - socks = dict(poller.poll()) - self.assertEqual(socks[s2], zmq.POLLIN) - - # Make sure that s2 is in POLLOUT after recv. - s2.recv() - socks = dict(poller.poll()) - self.assertEqual(socks[s2], zmq.POLLOUT) - - poller.unregister(s1) - poller.unregister(s2) - - # Wait for everything to finish. - wait() - - def test_no_events(self): - s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR) - poller = self.Poller() - poller.register(s1, zmq.POLLIN|zmq.POLLOUT) - poller.register(s2, 0) - self.assertTrue(s1 in poller) - self.assertFalse(s2 in poller) - poller.register(s1, 0) - self.assertFalse(s1 in poller) - - def test_pubsub(self): - s1, s2 = self.create_bound_pair(zmq.PUB, zmq.SUB) - s2.setsockopt(zmq.SUBSCRIBE, b'') - - # Sleep to allow sockets to connect. - wait() - - poller = self.Poller() - poller.register(s1, zmq.POLLIN|zmq.POLLOUT) - poller.register(s2, zmq.POLLIN) - - # Now make sure that both are send ready. - socks = dict(poller.poll()) - self.assertEqual(socks[s1], zmq.POLLOUT) - self.assertEqual(s2 in socks, 0) - # Make sure that s1 stays in POLLOUT after a send. - s1.send(b'msg1') - socks = dict(poller.poll()) - self.assertEqual(socks[s1], zmq.POLLOUT) - - # Make sure that s2 is POLLIN after waiting. - wait() - socks = dict(poller.poll()) - self.assertEqual(socks[s2], zmq.POLLIN) - - # Make sure that s2 goes into 0 after recv. - s2.recv() - socks = dict(poller.poll()) - self.assertEqual(s2 in socks, 0) - - poller.unregister(s1) - poller.unregister(s2) - - # Wait for everything to finish. - wait() - def test_timeout(self): - """make sure Poller.poll timeout has the right units (milliseconds).""" - s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR) - poller = self.Poller() - poller.register(s1, zmq.POLLIN) - tic = time.time() - evt = poller.poll(.005) - toc = time.time() - self.assertTrue(toc-tic < 0.1) - tic = time.time() - evt = poller.poll(5) - toc = time.time() - self.assertTrue(toc-tic < 0.1) - self.assertTrue(toc-tic > .001) - tic = time.time() - evt = poller.poll(500) - toc = time.time() - self.assertTrue(toc-tic < 1) - self.assertTrue(toc-tic > 0.1) - -class TestSelect(PollZMQTestCase): - - def test_pair(self): - s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR) - - # Sleep to allow sockets to connect. - wait() - - rlist, wlist, xlist = zmq.select([s1, s2], [s1, s2], [s1, s2]) - self.assert_(s1 in wlist) - self.assert_(s2 in wlist) - self.assert_(s1 not in rlist) - self.assert_(s2 not in rlist) - - def test_timeout(self): - """make sure select timeout has the right units (seconds).""" - s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR) - tic = time.time() - r,w,x = zmq.select([s1,s2],[],[],.005) - toc = time.time() - self.assertTrue(toc-tic < 1) - self.assertTrue(toc-tic > 0.001) - tic = time.time() - r,w,x = zmq.select([s1,s2],[],[],.25) - toc = time.time() - self.assertTrue(toc-tic < 1) - self.assertTrue(toc-tic > 0.1) - - -if have_gevent: - import gevent - from zmq import green as gzmq - - class TestPollGreen(GreenTest, TestPoll): - Poller = gzmq.Poller - - def test_wakeup(self): - s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR) - poller = self.Poller() - poller.register(s2, zmq.POLLIN) - - tic = time.time() - r = gevent.spawn(lambda: poller.poll(10000)) - s = gevent.spawn(lambda: s1.send(b'msg1')) - r.join() - toc = time.time() - self.assertTrue(toc-tic < 1) - - def test_socket_poll(self): - s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR) - - tic = time.time() - r = gevent.spawn(lambda: s2.poll(10000)) - s = gevent.spawn(lambda: s1.send(b'msg1')) - r.join() - toc = time.time() - self.assertTrue(toc-tic < 1) - |