summaryrefslogtreecommitdiffstats
path: root/external_libs/python/zmq/tests/test_monqueue.py
diff options
context:
space:
mode:
Diffstat (limited to 'external_libs/python/zmq/tests/test_monqueue.py')
-rw-r--r--external_libs/python/zmq/tests/test_monqueue.py227
1 files changed, 0 insertions, 227 deletions
diff --git a/external_libs/python/zmq/tests/test_monqueue.py b/external_libs/python/zmq/tests/test_monqueue.py
deleted file mode 100644
index e855602e..00000000
--- a/external_libs/python/zmq/tests/test_monqueue.py
+++ /dev/null
@@ -1,227 +0,0 @@
-# Copyright (C) PyZMQ Developers
-# Distributed under the terms of the Modified BSD License.
-
-import time
-from unittest import TestCase
-
-import zmq
-from zmq import devices
-
-from zmq.tests import BaseZMQTestCase, SkipTest, PYPY
-from zmq.utils.strtypes import unicode
-
-
-if PYPY or zmq.zmq_version_info() >= (4,1):
- # cleanup of shared Context doesn't work on PyPy
- # there also seems to be a bug in cleanup in libzmq-4.1 (zeromq/libzmq#1052)
- devices.Device.context_factory = zmq.Context
-
-
-class TestMonitoredQueue(BaseZMQTestCase):
-
- sockets = []
-
- def build_device(self, mon_sub=b"", in_prefix=b'in', out_prefix=b'out'):
- self.device = devices.ThreadMonitoredQueue(zmq.PAIR, zmq.PAIR, zmq.PUB,
- in_prefix, out_prefix)
- alice = self.context.socket(zmq.PAIR)
- bob = self.context.socket(zmq.PAIR)
- mon = self.context.socket(zmq.SUB)
-
- aport = alice.bind_to_random_port('tcp://127.0.0.1')
- bport = bob.bind_to_random_port('tcp://127.0.0.1')
- mport = mon.bind_to_random_port('tcp://127.0.0.1')
- mon.setsockopt(zmq.SUBSCRIBE, mon_sub)
-
- self.device.connect_in("tcp://127.0.0.1:%i"%aport)
- self.device.connect_out("tcp://127.0.0.1:%i"%bport)
- self.device.connect_mon("tcp://127.0.0.1:%i"%mport)
- self.device.start()
- time.sleep(.2)
- try:
- # this is currenlty necessary to ensure no dropped monitor messages
- # see LIBZMQ-248 for more info
- mon.recv_multipart(zmq.NOBLOCK)
- except zmq.ZMQError:
- pass
- self.sockets.extend([alice, bob, mon])
- return alice, bob, mon
-
-
- def teardown_device(self):
- for socket in self.sockets:
- socket.close()
- del socket
- del self.device
-
- def test_reply(self):
- alice, bob, mon = self.build_device()
- alices = b"hello bob".split()
- alice.send_multipart(alices)
- bobs = self.recv_multipart(bob)
- self.assertEqual(alices, bobs)
- bobs = b"hello alice".split()
- bob.send_multipart(bobs)
- alices = self.recv_multipart(alice)
- self.assertEqual(alices, bobs)
- self.teardown_device()
-
- def test_queue(self):
- alice, bob, mon = self.build_device()
- alices = b"hello bob".split()
- alice.send_multipart(alices)
- alices2 = b"hello again".split()
- alice.send_multipart(alices2)
- alices3 = b"hello again and again".split()
- alice.send_multipart(alices3)
- bobs = self.recv_multipart(bob)
- self.assertEqual(alices, bobs)
- bobs = self.recv_multipart(bob)
- self.assertEqual(alices2, bobs)
- bobs = self.recv_multipart(bob)
- self.assertEqual(alices3, bobs)
- bobs = b"hello alice".split()
- bob.send_multipart(bobs)
- alices = self.recv_multipart(alice)
- self.assertEqual(alices, bobs)
- self.teardown_device()
-
- def test_monitor(self):
- alice, bob, mon = self.build_device()
- alices = b"hello bob".split()
- alice.send_multipart(alices)
- alices2 = b"hello again".split()
- alice.send_multipart(alices2)
- alices3 = b"hello again and again".split()
- alice.send_multipart(alices3)
- bobs = self.recv_multipart(bob)
- self.assertEqual(alices, bobs)
- mons = self.recv_multipart(mon)
- self.assertEqual([b'in']+bobs, mons)
- bobs = self.recv_multipart(bob)
- self.assertEqual(alices2, bobs)
- bobs = self.recv_multipart(bob)
- self.assertEqual(alices3, bobs)
- mons = self.recv_multipart(mon)
- self.assertEqual([b'in']+alices2, mons)
- bobs = b"hello alice".split()
- bob.send_multipart(bobs)
- alices = self.recv_multipart(alice)
- self.assertEqual(alices, bobs)
- mons = self.recv_multipart(mon)
- self.assertEqual([b'in']+alices3, mons)
- mons = self.recv_multipart(mon)
- self.assertEqual([b'out']+bobs, mons)
- self.teardown_device()
-
- def test_prefix(self):
- alice, bob, mon = self.build_device(b"", b'foo', b'bar')
- alices = b"hello bob".split()
- alice.send_multipart(alices)
- alices2 = b"hello again".split()
- alice.send_multipart(alices2)
- alices3 = b"hello again and again".split()
- alice.send_multipart(alices3)
- bobs = self.recv_multipart(bob)
- self.assertEqual(alices, bobs)
- mons = self.recv_multipart(mon)
- self.assertEqual([b'foo']+bobs, mons)
- bobs = self.recv_multipart(bob)
- self.assertEqual(alices2, bobs)
- bobs = self.recv_multipart(bob)
- self.assertEqual(alices3, bobs)
- mons = self.recv_multipart(mon)
- self.assertEqual([b'foo']+alices2, mons)
- bobs = b"hello alice".split()
- bob.send_multipart(bobs)
- alices = self.recv_multipart(alice)
- self.assertEqual(alices, bobs)
- mons = self.recv_multipart(mon)
- self.assertEqual([b'foo']+alices3, mons)
- mons = self.recv_multipart(mon)
- self.assertEqual([b'bar']+bobs, mons)
- self.teardown_device()
-
- def test_monitor_subscribe(self):
- alice, bob, mon = self.build_device(b"out")
- alices = b"hello bob".split()
- alice.send_multipart(alices)
- alices2 = b"hello again".split()
- alice.send_multipart(alices2)
- alices3 = b"hello again and again".split()
- alice.send_multipart(alices3)
- bobs = self.recv_multipart(bob)
- self.assertEqual(alices, bobs)
- bobs = self.recv_multipart(bob)
- self.assertEqual(alices2, bobs)
- bobs = self.recv_multipart(bob)
- self.assertEqual(alices3, bobs)
- bobs = b"hello alice".split()
- bob.send_multipart(bobs)
- alices = self.recv_multipart(alice)
- self.assertEqual(alices, bobs)
- mons = self.recv_multipart(mon)
- self.assertEqual([b'out']+bobs, mons)
- self.teardown_device()
-
- def test_router_router(self):
- """test router-router MQ devices"""
- dev = devices.ThreadMonitoredQueue(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
- self.device = dev
- dev.setsockopt_in(zmq.LINGER, 0)
- dev.setsockopt_out(zmq.LINGER, 0)
- dev.setsockopt_mon(zmq.LINGER, 0)
-
- binder = self.context.socket(zmq.DEALER)
- porta = binder.bind_to_random_port('tcp://127.0.0.1')
- portb = binder.bind_to_random_port('tcp://127.0.0.1')
- binder.close()
- time.sleep(0.1)
- a = self.context.socket(zmq.DEALER)
- a.identity = b'a'
- b = self.context.socket(zmq.DEALER)
- b.identity = b'b'
- self.sockets.extend([a, b])
-
- a.connect('tcp://127.0.0.1:%i'%porta)
- dev.bind_in('tcp://127.0.0.1:%i'%porta)
- b.connect('tcp://127.0.0.1:%i'%portb)
- dev.bind_out('tcp://127.0.0.1:%i'%portb)
- dev.start()
- time.sleep(0.2)
- if zmq.zmq_version_info() >= (3,1,0):
- # flush erroneous poll state, due to LIBZMQ-280
- ping_msg = [ b'ping', b'pong' ]
- for s in (a,b):
- s.send_multipart(ping_msg)
- try:
- s.recv(zmq.NOBLOCK)
- except zmq.ZMQError:
- pass
- msg = [ b'hello', b'there' ]
- a.send_multipart([b'b']+msg)
- bmsg = self.recv_multipart(b)
- self.assertEqual(bmsg, [b'a']+msg)
- b.send_multipart(bmsg)
- amsg = self.recv_multipart(a)
- self.assertEqual(amsg, [b'b']+msg)
- self.teardown_device()
-
- def test_default_mq_args(self):
- self.device = dev = devices.ThreadMonitoredQueue(zmq.ROUTER, zmq.DEALER, zmq.PUB)
- dev.setsockopt_in(zmq.LINGER, 0)
- dev.setsockopt_out(zmq.LINGER, 0)
- dev.setsockopt_mon(zmq.LINGER, 0)
- # this will raise if default args are wrong
- dev.start()
- self.teardown_device()
-
- def test_mq_check_prefix(self):
- ins = self.context.socket(zmq.ROUTER)
- outs = self.context.socket(zmq.DEALER)
- mons = self.context.socket(zmq.PUB)
- self.sockets.extend([ins, outs, mons])
-
- ins = unicode('in')
- outs = unicode('out')
- self.assertRaises(TypeError, devices.monitoredqueue, ins, outs, mons)