summaryrefslogtreecommitdiffstats
path: root/src/console/zmq/tests/test_device.py
diff options
context:
space:
mode:
authorimarom <imarom@cisco.com>2015-08-18 10:26:51 +0300
committerimarom <imarom@cisco.com>2015-08-18 10:26:51 +0300
commitcbc645cba025f2098031350fc1323e6ffff33633 (patch)
treedb867b2eb09d5b38d47e28a9876bc21f51f8cc57 /src/console/zmq/tests/test_device.py
parent396b54ea57308890c29c1d9746f0cce32d990cf8 (diff)
new files for Python console
Diffstat (limited to 'src/console/zmq/tests/test_device.py')
-rwxr-xr-xsrc/console/zmq/tests/test_device.py146
1 files changed, 146 insertions, 0 deletions
diff --git a/src/console/zmq/tests/test_device.py b/src/console/zmq/tests/test_device.py
new file mode 100755
index 00000000..f8305074
--- /dev/null
+++ b/src/console/zmq/tests/test_device.py
@@ -0,0 +1,146 @@
+# Copyright (C) PyZMQ Developers
+# Distributed under the terms of the Modified BSD License.
+
+import time
+
+import zmq
+from zmq import devices
+from zmq.tests import BaseZMQTestCase, SkipTest, have_gevent, GreenTest, PYPY
+from zmq.utils.strtypes import (bytes,unicode,basestring)
+
+if PYPY:
+ # cleanup of shared Context doesn't work on PyPy
+ devices.Device.context_factory = zmq.Context
+
+class TestDevice(BaseZMQTestCase):
+
+ def test_device_types(self):
+ for devtype in (zmq.STREAMER, zmq.FORWARDER, zmq.QUEUE):
+ dev = devices.Device(devtype, zmq.PAIR, zmq.PAIR)
+ self.assertEqual(dev.device_type, devtype)
+ del dev
+
+ def test_device_attributes(self):
+ dev = devices.Device(zmq.QUEUE, zmq.SUB, zmq.PUB)
+ self.assertEqual(dev.in_type, zmq.SUB)
+ self.assertEqual(dev.out_type, zmq.PUB)
+ self.assertEqual(dev.device_type, zmq.QUEUE)
+ self.assertEqual(dev.daemon, True)
+ del dev
+
+ def test_tsdevice_attributes(self):
+ dev = devices.Device(zmq.QUEUE, zmq.SUB, zmq.PUB)
+ self.assertEqual(dev.in_type, zmq.SUB)
+ self.assertEqual(dev.out_type, zmq.PUB)
+ self.assertEqual(dev.device_type, zmq.QUEUE)
+ self.assertEqual(dev.daemon, True)
+ del dev
+
+
+ def test_single_socket_forwarder_connect(self):
+ dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
+ req = self.context.socket(zmq.REQ)
+ port = req.bind_to_random_port('tcp://127.0.0.1')
+ dev.connect_in('tcp://127.0.0.1:%i'%port)
+ dev.start()
+ time.sleep(.25)
+ msg = b'hello'
+ req.send(msg)
+ self.assertEqual(msg, self.recv(req))
+ del dev
+ req.close()
+ dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
+ req = self.context.socket(zmq.REQ)
+ port = req.bind_to_random_port('tcp://127.0.0.1')
+ dev.connect_out('tcp://127.0.0.1:%i'%port)
+ dev.start()
+ time.sleep(.25)
+ msg = b'hello again'
+ req.send(msg)
+ self.assertEqual(msg, self.recv(req))
+ del dev
+ req.close()
+
+ def test_single_socket_forwarder_bind(self):
+ dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
+ # select random port:
+ binder = self.context.socket(zmq.REQ)
+ port = binder.bind_to_random_port('tcp://127.0.0.1')
+ binder.close()
+ time.sleep(0.1)
+ req = self.context.socket(zmq.REQ)
+ req.connect('tcp://127.0.0.1:%i'%port)
+ dev.bind_in('tcp://127.0.0.1:%i'%port)
+ dev.start()
+ time.sleep(.25)
+ msg = b'hello'
+ req.send(msg)
+ self.assertEqual(msg, self.recv(req))
+ del dev
+ req.close()
+ dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
+ # select random port:
+ binder = self.context.socket(zmq.REQ)
+ port = binder.bind_to_random_port('tcp://127.0.0.1')
+ binder.close()
+ time.sleep(0.1)
+ req = self.context.socket(zmq.REQ)
+ req.connect('tcp://127.0.0.1:%i'%port)
+ dev.bind_in('tcp://127.0.0.1:%i'%port)
+ dev.start()
+ time.sleep(.25)
+ msg = b'hello again'
+ req.send(msg)
+ self.assertEqual(msg, self.recv(req))
+ del dev
+ req.close()
+
+ def test_proxy(self):
+ if zmq.zmq_version_info() < (3,2):
+ raise SkipTest("Proxies only in libzmq >= 3")
+ dev = devices.ThreadProxy(zmq.PULL, zmq.PUSH, zmq.PUSH)
+ binder = self.context.socket(zmq.REQ)
+ iface = 'tcp://127.0.0.1'
+ port = binder.bind_to_random_port(iface)
+ port2 = binder.bind_to_random_port(iface)
+ port3 = binder.bind_to_random_port(iface)
+ binder.close()
+ time.sleep(0.1)
+ dev.bind_in("%s:%i" % (iface, port))
+ dev.bind_out("%s:%i" % (iface, port2))
+ dev.bind_mon("%s:%i" % (iface, port3))
+ dev.start()
+ time.sleep(0.25)
+ msg = b'hello'
+ push = self.context.socket(zmq.PUSH)
+ push.connect("%s:%i" % (iface, port))
+ pull = self.context.socket(zmq.PULL)
+ pull.connect("%s:%i" % (iface, port2))
+ mon = self.context.socket(zmq.PULL)
+ mon.connect("%s:%i" % (iface, port3))
+ push.send(msg)
+ self.sockets.extend([push, pull, mon])
+ self.assertEqual(msg, self.recv(pull))
+ self.assertEqual(msg, self.recv(mon))
+
+if have_gevent:
+ import gevent
+ import zmq.green
+
+ class TestDeviceGreen(GreenTest, BaseZMQTestCase):
+
+ def test_green_device(self):
+ rep = self.context.socket(zmq.REP)
+ req = self.context.socket(zmq.REQ)
+ self.sockets.extend([req, rep])
+ port = rep.bind_to_random_port('tcp://127.0.0.1')
+ g = gevent.spawn(zmq.green.device, zmq.QUEUE, rep, rep)
+ req.connect('tcp://127.0.0.1:%i' % port)
+ req.send(b'hi')
+ timeout = gevent.Timeout(3)
+ timeout.start()
+ receiver = gevent.spawn(req.recv)
+ self.assertEqual(receiver.get(2), b'hi')
+ timeout.cancel()
+ g.kill(block=True)
+