summaryrefslogtreecommitdiffstats
path: root/external_libs/python/zmq/tests
diff options
context:
space:
mode:
authorDan Klein <danklei@cisco.com>2015-08-24 10:51:13 +0300
committerDan Klein <danklei@cisco.com>2015-08-24 10:51:13 +0300
commitd3f26ece7d4383df0b22fe9c3cb3e695381ec737 (patch)
treeba42ddb547d363e92b1846df8a8712433981ddac /external_libs/python/zmq/tests
parent651a7d779551e193bd9dbadbe8b2a02bdab231b4 (diff)
Initial push to external_lib migration
Diffstat (limited to 'external_libs/python/zmq/tests')
-rw-r--r--external_libs/python/zmq/tests/__init__.py211
-rw-r--r--external_libs/python/zmq/tests/test_auth.py431
-rw-r--r--external_libs/python/zmq/tests/test_cffi_backend.py310
-rw-r--r--external_libs/python/zmq/tests/test_constants.py104
-rw-r--r--external_libs/python/zmq/tests/test_context.py257
-rw-r--r--external_libs/python/zmq/tests/test_device.py146
-rw-r--r--external_libs/python/zmq/tests/test_error.py43
-rw-r--r--external_libs/python/zmq/tests/test_etc.py15
-rw-r--r--external_libs/python/zmq/tests/test_imports.py62
-rw-r--r--external_libs/python/zmq/tests/test_ioloop.py113
-rw-r--r--external_libs/python/zmq/tests/test_log.py116
-rw-r--r--external_libs/python/zmq/tests/test_message.py362
-rw-r--r--external_libs/python/zmq/tests/test_monitor.py71
-rw-r--r--external_libs/python/zmq/tests/test_monqueue.py227
-rw-r--r--external_libs/python/zmq/tests/test_multipart.py35
-rw-r--r--external_libs/python/zmq/tests/test_pair.py53
-rw-r--r--external_libs/python/zmq/tests/test_poll.py229
-rw-r--r--external_libs/python/zmq/tests/test_pubsub.py41
-rw-r--r--external_libs/python/zmq/tests/test_reqrep.py62
-rw-r--r--external_libs/python/zmq/tests/test_security.py212
-rw-r--r--external_libs/python/zmq/tests/test_socket.py450
-rw-r--r--external_libs/python/zmq/tests/test_stopwatch.py42
-rw-r--r--external_libs/python/zmq/tests/test_version.py44
-rw-r--r--external_libs/python/zmq/tests/test_win32_shim.py56
-rw-r--r--external_libs/python/zmq/tests/test_z85.py63
-rw-r--r--external_libs/python/zmq/tests/test_zmqstream.py34
26 files changed, 3789 insertions, 0 deletions
diff --git a/external_libs/python/zmq/tests/__init__.py b/external_libs/python/zmq/tests/__init__.py
new file mode 100644
index 00000000..325a3f19
--- /dev/null
+++ b/external_libs/python/zmq/tests/__init__.py
@@ -0,0 +1,211 @@
+# Copyright (c) PyZMQ Developers.
+# Distributed under the terms of the Modified BSD License.
+
+import functools
+import sys
+import time
+from threading import Thread
+
+from unittest import TestCase
+
+import zmq
+from zmq.utils import jsonapi
+
+try:
+ import gevent
+ from zmq import green as gzmq
+ have_gevent = True
+except ImportError:
+ have_gevent = False
+
+try:
+ from unittest import SkipTest
+except ImportError:
+ try:
+ from nose import SkipTest
+ except ImportError:
+ class SkipTest(Exception):
+ pass
+
+PYPY = 'PyPy' in sys.version
+
+#-----------------------------------------------------------------------------
+# skip decorators (directly from unittest)
+#-----------------------------------------------------------------------------
+
+_id = lambda x: x
+
+def skip(reason):
+ """
+ Unconditionally skip a test.
+ """
+ def decorator(test_item):
+ if not (isinstance(test_item, type) and issubclass(test_item, TestCase)):
+ @functools.wraps(test_item)
+ def skip_wrapper(*args, **kwargs):
+ raise SkipTest(reason)
+ test_item = skip_wrapper
+
+ test_item.__unittest_skip__ = True
+ test_item.__unittest_skip_why__ = reason
+ return test_item
+ return decorator
+
+def skip_if(condition, reason="Skipped"):
+ """
+ Skip a test if the condition is true.
+ """
+ if condition:
+ return skip(reason)
+ return _id
+
+skip_pypy = skip_if(PYPY, "Doesn't work on PyPy")
+
+#-----------------------------------------------------------------------------
+# Base test class
+#-----------------------------------------------------------------------------
+
+class BaseZMQTestCase(TestCase):
+ green = False
+
+ @property
+ def Context(self):
+ if self.green:
+ return gzmq.Context
+ else:
+ return zmq.Context
+
+ def socket(self, socket_type):
+ s = self.context.socket(socket_type)
+ self.sockets.append(s)
+ return s
+
+ def setUp(self):
+ if self.green and not have_gevent:
+ raise SkipTest("requires gevent")
+ self.context = self.Context.instance()
+ self.sockets = []
+
+ def tearDown(self):
+ contexts = set([self.context])
+ while self.sockets:
+ sock = self.sockets.pop()
+ contexts.add(sock.context) # in case additional contexts are created
+ sock.close(0)
+ for ctx in contexts:
+ t = Thread(target=ctx.term)
+ t.daemon = True
+ t.start()
+ t.join(timeout=2)
+ if t.is_alive():
+ # reset Context.instance, so the failure to term doesn't corrupt subsequent tests
+ zmq.sugar.context.Context._instance = None
+ raise RuntimeError("context could not terminate, open sockets likely remain in test")
+
+ def create_bound_pair(self, type1=zmq.PAIR, type2=zmq.PAIR, interface='tcp://127.0.0.1'):
+ """Create a bound socket pair using a random port."""
+ s1 = self.context.socket(type1)
+ s1.setsockopt(zmq.LINGER, 0)
+ port = s1.bind_to_random_port(interface)
+ s2 = self.context.socket(type2)
+ s2.setsockopt(zmq.LINGER, 0)
+ s2.connect('%s:%s' % (interface, port))
+ self.sockets.extend([s1,s2])
+ return s1, s2
+
+ def ping_pong(self, s1, s2, msg):
+ s1.send(msg)
+ msg2 = s2.recv()
+ s2.send(msg2)
+ msg3 = s1.recv()
+ return msg3
+
+ def ping_pong_json(self, s1, s2, o):
+ if jsonapi.jsonmod is None:
+ raise SkipTest("No json library")
+ s1.send_json(o)
+ o2 = s2.recv_json()
+ s2.send_json(o2)
+ o3 = s1.recv_json()
+ return o3
+
+ def ping_pong_pyobj(self, s1, s2, o):
+ s1.send_pyobj(o)
+ o2 = s2.recv_pyobj()
+ s2.send_pyobj(o2)
+ o3 = s1.recv_pyobj()
+ return o3
+
+ def assertRaisesErrno(self, errno, func, *args, **kwargs):
+ try:
+ func(*args, **kwargs)
+ except zmq.ZMQError as e:
+ self.assertEqual(e.errno, errno, "wrong error raised, expected '%s' \
+got '%s'" % (zmq.ZMQError(errno), zmq.ZMQError(e.errno)))
+ else:
+ self.fail("Function did not raise any error")
+
+ def _select_recv(self, multipart, socket, **kwargs):
+ """call recv[_multipart] in a way that raises if there is nothing to receive"""
+ if zmq.zmq_version_info() >= (3,1,0):
+ # zmq 3.1 has a bug, where poll can return false positives,
+ # so we wait a little bit just in case
+ # See LIBZMQ-280 on JIRA
+ time.sleep(0.1)
+
+ r,w,x = zmq.select([socket], [], [], timeout=5)
+ assert len(r) > 0, "Should have received a message"
+ kwargs['flags'] = zmq.DONTWAIT | kwargs.get('flags', 0)
+
+ recv = socket.recv_multipart if multipart else socket.recv
+ return recv(**kwargs)
+
+ def recv(self, socket, **kwargs):
+ """call recv in a way that raises if there is nothing to receive"""
+ return self._select_recv(False, socket, **kwargs)
+
+ def recv_multipart(self, socket, **kwargs):
+ """call recv_multipart in a way that raises if there is nothing to receive"""
+ return self._select_recv(True, socket, **kwargs)
+
+
+class PollZMQTestCase(BaseZMQTestCase):
+ pass
+
+class GreenTest:
+ """Mixin for making green versions of test classes"""
+ green = True
+
+ def assertRaisesErrno(self, errno, func, *args, **kwargs):
+ if errno == zmq.EAGAIN:
+ raise SkipTest("Skipping because we're green.")
+ try:
+ func(*args, **kwargs)
+ except zmq.ZMQError:
+ e = sys.exc_info()[1]
+ self.assertEqual(e.errno, errno, "wrong error raised, expected '%s' \
+got '%s'" % (zmq.ZMQError(errno), zmq.ZMQError(e.errno)))
+ else:
+ self.fail("Function did not raise any error")
+
+ def tearDown(self):
+ contexts = set([self.context])
+ while self.sockets:
+ sock = self.sockets.pop()
+ contexts.add(sock.context) # in case additional contexts are created
+ sock.close()
+ try:
+ gevent.joinall([gevent.spawn(ctx.term) for ctx in contexts], timeout=2, raise_error=True)
+ except gevent.Timeout:
+ raise RuntimeError("context could not terminate, open sockets likely remain in test")
+
+ def skip_green(self):
+ raise SkipTest("Skipping because we are green")
+
+def skip_green(f):
+ def skipping_test(self, *args, **kwargs):
+ if self.green:
+ raise SkipTest("Skipping because we are green")
+ else:
+ return f(self, *args, **kwargs)
+ return skipping_test
diff --git a/external_libs/python/zmq/tests/test_auth.py b/external_libs/python/zmq/tests/test_auth.py
new file mode 100644
index 00000000..d350f61f
--- /dev/null
+++ b/external_libs/python/zmq/tests/test_auth.py
@@ -0,0 +1,431 @@
+# -*- coding: utf8 -*-
+
+# Copyright (C) PyZMQ Developers
+# Distributed under the terms of the Modified BSD License.
+
+import logging
+import os
+import shutil
+import sys
+import tempfile
+
+import zmq.auth
+from zmq.auth.ioloop import IOLoopAuthenticator
+from zmq.auth.thread import ThreadAuthenticator
+
+from zmq.eventloop import ioloop, zmqstream
+from zmq.tests import (BaseZMQTestCase, SkipTest)
+
+class BaseAuthTestCase(BaseZMQTestCase):
+ def setUp(self):
+ if zmq.zmq_version_info() < (4,0):
+ raise SkipTest("security is new in libzmq 4.0")
+ try:
+ zmq.curve_keypair()
+ except zmq.ZMQError:
+ raise SkipTest("security requires libzmq to be linked against libsodium")
+ super(BaseAuthTestCase, self).setUp()
+ # enable debug logging while we run tests
+ logging.getLogger('zmq.auth').setLevel(logging.DEBUG)
+ self.auth = self.make_auth()
+ self.auth.start()
+ self.base_dir, self.public_keys_dir, self.secret_keys_dir = self.create_certs()
+
+ def make_auth(self):
+ raise NotImplementedError()
+
+ def tearDown(self):
+ if self.auth:
+ self.auth.stop()
+ self.auth = None
+ self.remove_certs(self.base_dir)
+ super(BaseAuthTestCase, self).tearDown()
+
+ def create_certs(self):
+ """Create CURVE certificates for a test"""
+
+ # Create temporary CURVE keypairs for this test run. We create all keys in a
+ # temp directory and then move them into the appropriate private or public
+ # directory.
+
+ base_dir = tempfile.mkdtemp()
+ keys_dir = os.path.join(base_dir, 'certificates')
+ public_keys_dir = os.path.join(base_dir, 'public_keys')
+ secret_keys_dir = os.path.join(base_dir, 'private_keys')
+
+ os.mkdir(keys_dir)
+ os.mkdir(public_keys_dir)
+ os.mkdir(secret_keys_dir)
+
+ server_public_file, server_secret_file = zmq.auth.create_certificates(keys_dir, "server")
+ client_public_file, client_secret_file = zmq.auth.create_certificates(keys_dir, "client")
+
+ for key_file in os.listdir(keys_dir):
+ if key_file.endswith(".key"):
+ shutil.move(os.path.join(keys_dir, key_file),
+ os.path.join(public_keys_dir, '.'))
+
+ for key_file in os.listdir(keys_dir):
+ if key_file.endswith(".key_secret"):
+ shutil.move(os.path.join(keys_dir, key_file),
+ os.path.join(secret_keys_dir, '.'))
+
+ return (base_dir, public_keys_dir, secret_keys_dir)
+
+ def remove_certs(self, base_dir):
+ """Remove certificates for a test"""
+ shutil.rmtree(base_dir)
+
+ def load_certs(self, secret_keys_dir):
+ """Return server and client certificate keys"""
+ server_secret_file = os.path.join(secret_keys_dir, "server.key_secret")
+ client_secret_file = os.path.join(secret_keys_dir, "client.key_secret")
+
+ server_public, server_secret = zmq.auth.load_certificate(server_secret_file)
+ client_public, client_secret = zmq.auth.load_certificate(client_secret_file)
+
+ return server_public, server_secret, client_public, client_secret
+
+
+class TestThreadAuthentication(BaseAuthTestCase):
+ """Test authentication running in a thread"""
+
+ def make_auth(self):
+ return ThreadAuthenticator(self.context)
+
+ def can_connect(self, server, client):
+ """Check if client can connect to server using tcp transport"""
+ result = False
+ iface = 'tcp://127.0.0.1'
+ port = server.bind_to_random_port(iface)
+ client.connect("%s:%i" % (iface, port))
+ msg = [b"Hello World"]
+ server.send_multipart(msg)
+ if client.poll(1000):
+ rcvd_msg = client.recv_multipart()
+ self.assertEqual(rcvd_msg, msg)
+ result = True
+ return result
+
+ def test_null(self):
+ """threaded auth - NULL"""
+ # A default NULL connection should always succeed, and not
+ # go through our authentication infrastructure at all.
+ self.auth.stop()
+ self.auth = None
+
+ server = self.socket(zmq.PUSH)
+ client = self.socket(zmq.PULL)
+ self.assertTrue(self.can_connect(server, client))
+
+ # By setting a domain we switch on authentication for NULL sockets,
+ # though no policies are configured yet. The client connection
+ # should still be allowed.
+ server = self.socket(zmq.PUSH)
+ server.zap_domain = b'global'
+ client = self.socket(zmq.PULL)
+ self.assertTrue(self.can_connect(server, client))
+
+ def test_blacklist(self):
+ """threaded auth - Blacklist"""
+ # Blacklist 127.0.0.1, connection should fail
+ self.auth.deny('127.0.0.1')
+ server = self.socket(zmq.PUSH)
+ # By setting a domain we switch on authentication for NULL sockets,
+ # though no policies are configured yet.
+ server.zap_domain = b'global'
+ client = self.socket(zmq.PULL)
+ self.assertFalse(self.can_connect(server, client))
+
+ def test_whitelist(self):
+ """threaded auth - Whitelist"""
+ # Whitelist 127.0.0.1, connection should pass"
+ self.auth.allow('127.0.0.1')
+ server = self.socket(zmq.PUSH)
+ # By setting a domain we switch on authentication for NULL sockets,
+ # though no policies are configured yet.
+ server.zap_domain = b'global'
+ client = self.socket(zmq.PULL)
+ self.assertTrue(self.can_connect(server, client))
+
+ def test_plain(self):
+ """threaded auth - PLAIN"""
+
+ # Try PLAIN authentication - without configuring server, connection should fail
+ server = self.socket(zmq.PUSH)
+ server.plain_server = True
+ client = self.socket(zmq.PULL)
+ client.plain_username = b'admin'
+ client.plain_password = b'Password'
+ self.assertFalse(self.can_connect(server, client))
+
+ # Try PLAIN authentication - with server configured, connection should pass
+ server = self.socket(zmq.PUSH)
+ server.plain_server = True
+ client = self.socket(zmq.PULL)
+ client.plain_username = b'admin'
+ client.plain_password = b'Password'
+ self.auth.configure_plain(domain='*', passwords={'admin': 'Password'})
+ self.assertTrue(self.can_connect(server, client))
+
+ # Try PLAIN authentication - with bogus credentials, connection should fail
+ server = self.socket(zmq.PUSH)
+ server.plain_server = True
+ client = self.socket(zmq.PULL)
+ client.plain_username = b'admin'
+ client.plain_password = b'Bogus'
+ self.assertFalse(self.can_connect(server, client))
+
+ # Remove authenticator and check that a normal connection works
+ self.auth.stop()
+ self.auth = None
+
+ server = self.socket(zmq.PUSH)
+ client = self.socket(zmq.PULL)
+ self.assertTrue(self.can_connect(server, client))
+ client.close()
+ server.close()
+
+ def test_curve(self):
+ """threaded auth - CURVE"""
+ self.auth.allow('127.0.0.1')
+ certs = self.load_certs(self.secret_keys_dir)
+ server_public, server_secret, client_public, client_secret = certs
+
+ #Try CURVE authentication - without configuring server, connection should fail
+ server = self.socket(zmq.PUSH)
+ server.curve_publickey = server_public
+ server.curve_secretkey = server_secret
+ server.curve_server = True
+ client = self.socket(zmq.PULL)
+ client.curve_publickey = client_public
+ client.curve_secretkey = client_secret
+ client.curve_serverkey = server_public
+ self.assertFalse(self.can_connect(server, client))
+
+ #Try CURVE authentication - with server configured to CURVE_ALLOW_ANY, connection should pass
+ self.auth.configure_curve(domain='*', location=zmq.auth.CURVE_ALLOW_ANY)
+ server = self.socket(zmq.PUSH)
+ server.curve_publickey = server_public
+ server.curve_secretkey = server_secret
+ server.curve_server = True
+ client = self.socket(zmq.PULL)
+ client.curve_publickey = client_public
+ client.curve_secretkey = client_secret
+ client.curve_serverkey = server_public
+ self.assertTrue(self.can_connect(server, client))
+
+ # Try CURVE authentication - with server configured, connection should pass
+ self.auth.configure_curve(domain='*', location=self.public_keys_dir)
+ server = self.socket(zmq.PUSH)
+ server.curve_publickey = server_public
+ server.curve_secretkey = server_secret
+ server.curve_server = True
+ client = self.socket(zmq.PULL)
+ client.curve_publickey = client_public
+ client.curve_secretkey = client_secret
+ client.curve_serverkey = server_public
+ self.assertTrue(self.can_connect(server, client))
+
+ # Remove authenticator and check that a normal connection works
+ self.auth.stop()
+ self.auth = None
+
+ # Try connecting using NULL and no authentication enabled, connection should pass
+ server = self.socket(zmq.PUSH)
+ client = self.socket(zmq.PULL)
+ self.assertTrue(self.can_connect(server, client))
+
+
+def with_ioloop(method, expect_success=True):
+ """decorator for running tests with an IOLoop"""
+ def test_method(self):
+ r = method(self)
+
+ loop = self.io_loop
+ if expect_success:
+ self.pullstream.on_recv(self.on_message_succeed)
+ else:
+ self.pullstream.on_recv(self.on_message_fail)
+
+ t = loop.time()
+ loop.add_callback(self.attempt_connection)
+ loop.add_callback(self.send_msg)
+ if expect_success:
+ loop.add_timeout(t + 1, self.on_test_timeout_fail)
+ else:
+ loop.add_timeout(t + 1, self.on_test_timeout_succeed)
+
+ loop.start()
+ if self.fail_msg:
+ self.fail(self.fail_msg)
+
+ return r
+ return test_method
+
+def should_auth(method):
+ return with_ioloop(method, True)
+
+def should_not_auth(method):
+ return with_ioloop(method, False)
+
+class TestIOLoopAuthentication(BaseAuthTestCase):
+ """Test authentication running in ioloop"""
+
+ def setUp(self):
+ self.fail_msg = None
+ self.io_loop = ioloop.IOLoop()
+ super(TestIOLoopAuthentication, self).setUp()
+ self.server = self.socket(zmq.PUSH)
+ self.client = self.socket(zmq.PULL)
+ self.pushstream = zmqstream.ZMQStream(self.server, self.io_loop)
+ self.pullstream = zmqstream.ZMQStream(self.client, self.io_loop)
+
+ def make_auth(self):
+ return IOLoopAuthenticator(self.context, io_loop=self.io_loop)
+
+ def tearDown(self):
+ if self.auth:
+ self.auth.stop()
+ self.auth = None
+ self.io_loop.close(all_fds=True)
+ super(TestIOLoopAuthentication, self).tearDown()
+
+ def attempt_connection(self):
+ """Check if client can connect to server using tcp transport"""
+ iface = 'tcp://127.0.0.1'
+ port = self.server.bind_to_random_port(iface)
+ self.client.connect("%s:%i" % (iface, port))
+
+ def send_msg(self):
+ """Send a message from server to a client"""
+ msg = [b"Hello World"]
+ self.pushstream.send_multipart(msg)
+
+ def on_message_succeed(self, frames):
+ """A message was received, as expected."""
+ if frames != [b"Hello World"]:
+ self.fail_msg = "Unexpected message received"
+ self.io_loop.stop()
+
+ def on_message_fail(self, frames):
+ """A message was received, unexpectedly."""
+ self.fail_msg = 'Received messaged unexpectedly, security failed'
+ self.io_loop.stop()
+
+ def on_test_timeout_succeed(self):
+ """Test timer expired, indicates test success"""
+ self.io_loop.stop()
+
+ def on_test_timeout_fail(self):
+ """Test timer expired, indicates test failure"""
+ self.fail_msg = 'Test timed out'
+ self.io_loop.stop()
+
+ @should_auth
+ def test_none(self):
+ """ioloop auth - NONE"""
+ # A default NULL connection should always succeed, and not
+ # go through our authentication infrastructure at all.
+ # no auth should be running
+ self.auth.stop()
+ self.auth = None
+
+ @should_auth
+ def test_null(self):
+ """ioloop auth - NULL"""
+ # By setting a domain we switch on authentication for NULL sockets,
+ # though no policies are configured yet. The client connection
+ # should still be allowed.
+ self.server.zap_domain = b'global'
+
+ @should_not_auth
+ def test_blacklist(self):
+ """ioloop auth - Blacklist"""
+ # Blacklist 127.0.0.1, connection should fail
+ self.auth.deny('127.0.0.1')
+ self.server.zap_domain = b'global'
+
+ @should_auth
+ def test_whitelist(self):
+ """ioloop auth - Whitelist"""
+ # Whitelist 127.0.0.1, which overrides the blacklist, connection should pass"
+ self.auth.allow('127.0.0.1')
+
+ self.server.setsockopt(zmq.ZAP_DOMAIN, b'global')
+
+ @should_not_auth
+ def test_plain_unconfigured_server(self):
+ """ioloop auth - PLAIN, unconfigured server"""
+ self.client.plain_username = b'admin'
+ self.client.plain_password = b'Password'
+ # Try PLAIN authentication - without configuring server, connection should fail
+ self.server.plain_server = True
+
+ @should_auth
+ def test_plain_configured_server(self):
+ """ioloop auth - PLAIN, configured server"""
+ self.client.plain_username = b'admin'
+ self.client.plain_password = b'Password'
+ # Try PLAIN authentication - with server configured, connection should pass
+ self.server.plain_server = True
+ self.auth.configure_plain(domain='*', passwords={'admin': 'Password'})
+
+ @should_not_auth
+ def test_plain_bogus_credentials(self):
+ """ioloop auth - PLAIN, bogus credentials"""
+ self.client.plain_username = b'admin'
+ self.client.plain_password = b'Bogus'
+ self.server.plain_server = True
+
+ self.auth.configure_plain(domain='*', passwords={'admin': 'Password'})
+
+ @should_not_auth
+ def test_curve_unconfigured_server(self):
+ """ioloop auth - CURVE, unconfigured server"""
+ certs = self.load_certs(self.secret_keys_dir)
+ server_public, server_secret, client_public, client_secret = certs
+
+ self.auth.allow('127.0.0.1')
+
+ self.server.curve_publickey = server_public
+ self.server.curve_secretkey = server_secret
+ self.server.curve_server = True
+
+ self.client.curve_publickey = client_public
+ self.client.curve_secretkey = client_secret
+ self.client.curve_serverkey = server_public
+
+ @should_auth
+ def test_curve_allow_any(self):
+ """ioloop auth - CURVE, CURVE_ALLOW_ANY"""
+ certs = self.load_certs(self.secret_keys_dir)
+ server_public, server_secret, client_public, client_secret = certs
+
+ self.auth.allow('127.0.0.1')
+ self.auth.configure_curve(domain='*', location=zmq.auth.CURVE_ALLOW_ANY)
+
+ self.server.curve_publickey = server_public
+ self.server.curve_secretkey = server_secret
+ self.server.curve_server = True
+
+ self.client.curve_publickey = client_public
+ self.client.curve_secretkey = client_secret
+ self.client.curve_serverkey = server_public
+
+ @should_auth
+ def test_curve_configured_server(self):
+ """ioloop auth - CURVE, configured server"""
+ self.auth.allow('127.0.0.1')
+ certs = self.load_certs(self.secret_keys_dir)
+ server_public, server_secret, client_public, client_secret = certs
+
+ self.auth.configure_curve(domain='*', location=self.public_keys_dir)
+
+ self.server.curve_publickey = server_public
+ self.server.curve_secretkey = server_secret
+ self.server.curve_server = True
+
+ self.client.curve_publickey = client_public
+ self.client.curve_secretkey = client_secret
+ self.client.curve_serverkey = server_public
diff --git a/external_libs/python/zmq/tests/test_cffi_backend.py b/external_libs/python/zmq/tests/test_cffi_backend.py
new file mode 100644
index 00000000..1f85eebf
--- /dev/null
+++ b/external_libs/python/zmq/tests/test_cffi_backend.py
@@ -0,0 +1,310 @@
+# -*- coding: utf8 -*-
+
+import sys
+import time
+
+from unittest import TestCase
+
+from zmq.tests import BaseZMQTestCase, SkipTest
+
+try:
+ from zmq.backend.cffi import (
+ zmq_version_info,
+ PUSH, PULL, IDENTITY,
+ REQ, REP, POLLIN, POLLOUT,
+ )
+ from zmq.backend.cffi._cffi import ffi, C
+ have_ffi_backend = True
+except ImportError:
+ have_ffi_backend = False
+
+
+class TestCFFIBackend(TestCase):
+
+ def setUp(self):
+ if not have_ffi_backend or not 'PyPy' in sys.version:
+ raise SkipTest('PyPy Tests Only')
+
+ def test_zmq_version_info(self):
+ version = zmq_version_info()
+
+ assert version[0] in range(2,11)
+
+ def test_zmq_ctx_new_destroy(self):
+ ctx = C.zmq_ctx_new()
+
+ assert ctx != ffi.NULL
+ assert 0 == C.zmq_ctx_destroy(ctx)
+
+ def test_zmq_socket_open_close(self):
+ ctx = C.zmq_ctx_new()
+ socket = C.zmq_socket(ctx, PUSH)
+
+ assert ctx != ffi.NULL
+ assert ffi.NULL != socket
+ assert 0 == C.zmq_close(socket)
+ assert 0 == C.zmq_ctx_destroy(ctx)
+
+ def test_zmq_setsockopt(self):
+ ctx = C.zmq_ctx_new()
+ socket = C.zmq_socket(ctx, PUSH)
+
+ identity = ffi.new('char[3]', 'zmq')
+ ret = C.zmq_setsockopt(socket, IDENTITY, ffi.cast('void*', identity), 3)
+
+ assert ret == 0
+ assert ctx != ffi.NULL
+ assert ffi.NULL != socket
+ assert 0 == C.zmq_close(socket)
+ assert 0 == C.zmq_ctx_destroy(ctx)
+
+ def test_zmq_getsockopt(self):
+ ctx = C.zmq_ctx_new()
+ socket = C.zmq_socket(ctx, PUSH)
+
+ identity = ffi.new('char[]', 'zmq')
+ ret = C.zmq_setsockopt(socket, IDENTITY, ffi.cast('void*', identity), 3)
+ assert ret == 0
+
+ option_len = ffi.new('size_t*', 3)
+ option = ffi.new('char*')
+ ret = C.zmq_getsockopt(socket,
+ IDENTITY,
+ ffi.cast('void*', option),
+ option_len)
+
+ assert ret == 0
+ assert ffi.string(ffi.cast('char*', option))[0] == "z"
+ assert ffi.string(ffi.cast('char*', option))[1] == "m"
+ assert ffi.string(ffi.cast('char*', option))[2] == "q"
+ assert ctx != ffi.NULL
+ assert ffi.NULL != socket
+ assert 0 == C.zmq_close(socket)
+ assert 0 == C.zmq_ctx_destroy(ctx)
+
+ def test_zmq_bind(self):
+ ctx = C.zmq_ctx_new()
+ socket = C.zmq_socket(ctx, 8)
+
+ assert 0 == C.zmq_bind(socket, 'tcp://*:4444')
+ assert ctx != ffi.NULL
+ assert ffi.NULL != socket
+ assert 0 == C.zmq_close(socket)
+ assert 0 == C.zmq_ctx_destroy(ctx)
+
+ def test_zmq_bind_connect(self):
+ ctx = C.zmq_ctx_new()
+
+ socket1 = C.zmq_socket(ctx, PUSH)
+ socket2 = C.zmq_socket(ctx, PULL)
+
+ assert 0 == C.zmq_bind(socket1, 'tcp://*:4444')
+ assert 0 == C.zmq_connect(socket2, 'tcp://127.0.0.1:4444')
+ assert ctx != ffi.NULL
+ assert ffi.NULL != socket1
+ assert ffi.NULL != socket2
+ assert 0 == C.zmq_close(socket1)
+ assert 0 == C.zmq_close(socket2)
+ assert 0 == C.zmq_ctx_destroy(ctx)
+
+ def test_zmq_msg_init_close(self):
+ zmq_msg = ffi.new('zmq_msg_t*')
+
+ assert ffi.NULL != zmq_msg
+ assert 0 == C.zmq_msg_init(zmq_msg)
+ assert 0 == C.zmq_msg_close(zmq_msg)
+
+ def test_zmq_msg_init_size(self):
+ zmq_msg = ffi.new('zmq_msg_t*')
+
+ assert ffi.NULL != zmq_msg
+ assert 0 == C.zmq_msg_init_size(zmq_msg, 10)
+ assert 0 == C.zmq_msg_close(zmq_msg)
+
+ def test_zmq_msg_init_data(self):
+ zmq_msg = ffi.new('zmq_msg_t*')
+ message = ffi.new('char[5]', 'Hello')
+
+ assert 0 == C.zmq_msg_init_data(zmq_msg,
+ ffi.cast('void*', message),
+ 5,
+ ffi.NULL,
+ ffi.NULL)
+
+ assert ffi.NULL != zmq_msg
+ assert 0 == C.zmq_msg_close(zmq_msg)
+
+ def test_zmq_msg_data(self):
+ zmq_msg = ffi.new('zmq_msg_t*')
+ message = ffi.new('char[]', 'Hello')
+ assert 0 == C.zmq_msg_init_data(zmq_msg,
+ ffi.cast('void*', message),
+ 5,
+ ffi.NULL,
+ ffi.NULL)
+
+ data = C.zmq_msg_data(zmq_msg)
+
+ assert ffi.NULL != zmq_msg
+ assert ffi.string(ffi.cast("char*", data)) == 'Hello'
+ assert 0 == C.zmq_msg_close(zmq_msg)
+
+
+ def test_zmq_send(self):
+ ctx = C.zmq_ctx_new()
+
+ sender = C.zmq_socket(ctx, REQ)
+ receiver = C.zmq_socket(ctx, REP)
+
+ assert 0 == C.zmq_bind(receiver, 'tcp://*:7777')
+ assert 0 == C.zmq_connect(sender, 'tcp://127.0.0.1:7777')
+
+ time.sleep(0.1)
+
+ zmq_msg = ffi.new('zmq_msg_t*')
+ message = ffi.new('char[5]', 'Hello')
+
+ C.zmq_msg_init_data(zmq_msg,
+ ffi.cast('void*', message),
+ ffi.cast('size_t', 5),
+ ffi.NULL,
+ ffi.NULL)
+
+ assert 5 == C.zmq_msg_send(zmq_msg, sender, 0)
+ assert 0 == C.zmq_msg_close(zmq_msg)
+ assert C.zmq_close(sender) == 0
+ assert C.zmq_close(receiver) == 0
+ assert C.zmq_ctx_destroy(ctx) == 0
+
+ def test_zmq_recv(self):
+ ctx = C.zmq_ctx_new()
+
+ sender = C.zmq_socket(ctx, REQ)
+ receiver = C.zmq_socket(ctx, REP)
+
+ assert 0 == C.zmq_bind(receiver, 'tcp://*:2222')
+ assert 0 == C.zmq_connect(sender, 'tcp://127.0.0.1:2222')
+
+ time.sleep(0.1)
+
+ zmq_msg = ffi.new('zmq_msg_t*')
+ message = ffi.new('char[5]', 'Hello')
+
+ C.zmq_msg_init_data(zmq_msg,
+ ffi.cast('void*', message),
+ ffi.cast('size_t', 5),
+ ffi.NULL,
+ ffi.NULL)
+
+ zmq_msg2 = ffi.new('zmq_msg_t*')
+ C.zmq_msg_init(zmq_msg2)
+
+ assert 5 == C.zmq_msg_send(zmq_msg, sender, 0)
+ assert 5 == C.zmq_msg_recv(zmq_msg2, receiver, 0)
+ assert 5 == C.zmq_msg_size(zmq_msg2)
+ assert b"Hello" == ffi.buffer(C.zmq_msg_data(zmq_msg2),
+ C.zmq_msg_size(zmq_msg2))[:]
+ assert C.zmq_close(sender) == 0
+ assert C.zmq_close(receiver) == 0
+ assert C.zmq_ctx_destroy(ctx) == 0
+
+ def test_zmq_poll(self):
+ ctx = C.zmq_ctx_new()
+
+ sender = C.zmq_socket(ctx, REQ)
+ receiver = C.zmq_socket(ctx, REP)
+
+ r1 = C.zmq_bind(receiver, 'tcp://*:3333')
+ r2 = C.zmq_connect(sender, 'tcp://127.0.0.1:3333')
+
+ zmq_msg = ffi.new('zmq_msg_t*')
+ message = ffi.new('char[5]', 'Hello')
+
+ C.zmq_msg_init_data(zmq_msg,
+ ffi.cast('void*', message),
+ ffi.cast('size_t', 5),
+ ffi.NULL,
+ ffi.NULL)
+
+ receiver_pollitem = ffi.new('zmq_pollitem_t*')
+ receiver_pollitem.socket = receiver
+ receiver_pollitem.fd = 0
+ receiver_pollitem.events = POLLIN | POLLOUT
+ receiver_pollitem.revents = 0
+
+ ret = C.zmq_poll(ffi.NULL, 0, 0)
+ assert ret == 0
+
+ ret = C.zmq_poll(receiver_pollitem, 1, 0)
+ assert ret == 0
+
+ ret = C.zmq_msg_send(zmq_msg, sender, 0)
+ print(ffi.string(C.zmq_strerror(C.zmq_errno())))
+ assert ret == 5
+
+ time.sleep(0.2)
+
+ ret = C.zmq_poll(receiver_pollitem, 1, 0)
+ assert ret == 1
+
+ assert int(receiver_pollitem.revents) & POLLIN
+ assert not int(receiver_pollitem.revents) & POLLOUT
+
+ zmq_msg2 = ffi.new('zmq_msg_t*')
+ C.zmq_msg_init(zmq_msg2)
+
+ ret_recv = C.zmq_msg_recv(zmq_msg2, receiver, 0)
+ assert ret_recv == 5
+
+ assert 5 == C.zmq_msg_size(zmq_msg2)
+ assert b"Hello" == ffi.buffer(C.zmq_msg_data(zmq_msg2),
+ C.zmq_msg_size(zmq_msg2))[:]
+
+ sender_pollitem = ffi.new('zmq_pollitem_t*')
+ sender_pollitem.socket = sender
+ sender_pollitem.fd = 0
+ sender_pollitem.events = POLLIN | POLLOUT
+ sender_pollitem.revents = 0
+
+ ret = C.zmq_poll(sender_pollitem, 1, 0)
+ assert ret == 0
+
+ zmq_msg_again = ffi.new('zmq_msg_t*')
+ message_again = ffi.new('char[11]', 'Hello Again')
+
+ C.zmq_msg_init_data(zmq_msg_again,
+ ffi.cast('void*', message_again),
+ ffi.cast('size_t', 11),
+ ffi.NULL,
+ ffi.NULL)
+
+ assert 11 == C.zmq_msg_send(zmq_msg_again, receiver, 0)
+
+ time.sleep(0.2)
+
+ assert 0 <= C.zmq_poll(sender_pollitem, 1, 0)
+ assert int(sender_pollitem.revents) & POLLIN
+ assert 11 == C.zmq_msg_recv(zmq_msg2, sender, 0)
+ assert 11 == C.zmq_msg_size(zmq_msg2)
+ assert b"Hello Again" == ffi.buffer(C.zmq_msg_data(zmq_msg2),
+ int(C.zmq_msg_size(zmq_msg2)))[:]
+ assert 0 == C.zmq_close(sender)
+ assert 0 == C.zmq_close(receiver)
+ assert 0 == C.zmq_ctx_destroy(ctx)
+ assert 0 == C.zmq_msg_close(zmq_msg)
+ assert 0 == C.zmq_msg_close(zmq_msg2)
+ assert 0 == C.zmq_msg_close(zmq_msg_again)
+
+ def test_zmq_stopwatch_functions(self):
+ stopwatch = C.zmq_stopwatch_start()
+ ret = C.zmq_stopwatch_stop(stopwatch)
+
+ assert ffi.NULL != stopwatch
+ assert 0 < int(ret)
+
+ def test_zmq_sleep(self):
+ try:
+ C.zmq_sleep(1)
+ except Exception as e:
+ raise AssertionError("Error executing zmq_sleep(int)")
+
diff --git a/external_libs/python/zmq/tests/test_constants.py b/external_libs/python/zmq/tests/test_constants.py
new file mode 100644
index 00000000..d32b2b48
--- /dev/null
+++ b/external_libs/python/zmq/tests/test_constants.py
@@ -0,0 +1,104 @@
+# Copyright (C) PyZMQ Developers
+# Distributed under the terms of the Modified BSD License.
+
+import json
+from unittest import TestCase
+
+import zmq
+
+from zmq.utils import constant_names
+from zmq.sugar import constants as sugar_constants
+from zmq.backend import constants as backend_constants
+
+all_set = set(constant_names.all_names)
+
+class TestConstants(TestCase):
+
+ def _duplicate_test(self, namelist, listname):
+ """test that a given list has no duplicates"""
+ dupes = {}
+ for name in set(namelist):
+ cnt = namelist.count(name)
+ if cnt > 1:
+ dupes[name] = cnt
+ if dupes:
+ self.fail("The following names occur more than once in %s: %s" % (listname, json.dumps(dupes, indent=2)))
+
+ def test_duplicate_all(self):
+ return self._duplicate_test(constant_names.all_names, "all_names")
+
+ def _change_key(self, change, version):
+ """return changed-in key"""
+ return "%s-in %d.%d.%d" % tuple([change] + list(version))
+
+ def test_duplicate_changed(self):
+ all_changed = []
+ for change in ("new", "removed"):
+ d = getattr(constant_names, change + "_in")
+ for version, namelist in d.items():
+ all_changed.extend(namelist)
+ self._duplicate_test(namelist, self._change_key(change, version))
+
+ self._duplicate_test(all_changed, "all-changed")
+
+ def test_changed_in_all(self):
+ missing = {}
+ for change in ("new", "removed"):
+ d = getattr(constant_names, change + "_in")
+ for version, namelist in d.items():
+ key = self._change_key(change, version)
+ for name in namelist:
+ if name not in all_set:
+ if key not in missing:
+ missing[key] = []
+ missing[key].append(name)
+
+ if missing:
+ self.fail(
+ "The following names are missing in `all_names`: %s" % json.dumps(missing, indent=2)
+ )
+
+ def test_no_negative_constants(self):
+ for name in sugar_constants.__all__:
+ self.assertNotEqual(getattr(zmq, name), sugar_constants._UNDEFINED)
+
+ def test_undefined_constants(self):
+ all_aliases = []
+ for alias_group in sugar_constants.aliases:
+ all_aliases.extend(alias_group)
+
+ for name in all_set.difference(all_aliases):
+ raw = getattr(backend_constants, name)
+ if raw == sugar_constants._UNDEFINED:
+ self.assertRaises(AttributeError, getattr, zmq, name)
+ else:
+ self.assertEqual(getattr(zmq, name), raw)
+
+ def test_new(self):
+ zmq_version = zmq.zmq_version_info()
+ for version, new_names in constant_names.new_in.items():
+ should_have = zmq_version >= version
+ for name in new_names:
+ try:
+ value = getattr(zmq, name)
+ except AttributeError:
+ if should_have:
+ self.fail("AttributeError: zmq.%s" % name)
+ else:
+ if not should_have:
+ self.fail("Shouldn't have: zmq.%s=%s" % (name, value))
+
+ def test_removed(self):
+ zmq_version = zmq.zmq_version_info()
+ for version, new_names in constant_names.removed_in.items():
+ should_have = zmq_version < version
+ for name in new_names:
+ try:
+ value = getattr(zmq, name)
+ except AttributeError:
+ if should_have:
+ self.fail("AttributeError: zmq.%s" % name)
+ else:
+ if not should_have:
+ self.fail("Shouldn't have: zmq.%s=%s" % (name, value))
+
diff --git a/external_libs/python/zmq/tests/test_context.py b/external_libs/python/zmq/tests/test_context.py
new file mode 100644
index 00000000..e3280778
--- /dev/null
+++ b/external_libs/python/zmq/tests/test_context.py
@@ -0,0 +1,257 @@
+# Copyright (C) PyZMQ Developers
+# Distributed under the terms of the Modified BSD License.
+
+import gc
+import sys
+import time
+from threading import Thread, Event
+
+import zmq
+from zmq.tests import (
+ BaseZMQTestCase, have_gevent, GreenTest, skip_green, PYPY, SkipTest,
+)
+
+
+class TestContext(BaseZMQTestCase):
+
+ def test_init(self):
+ c1 = self.Context()
+ self.assert_(isinstance(c1, self.Context))
+ del c1
+ c2 = self.Context()
+ self.assert_(isinstance(c2, self.Context))
+ del c2
+ c3 = self.Context()
+ self.assert_(isinstance(c3, self.Context))
+ del c3
+
+ def test_dir(self):
+ ctx = self.Context()
+ self.assertTrue('socket' in dir(ctx))
+ if zmq.zmq_version_info() > (3,):
+ self.assertTrue('IO_THREADS' in dir(ctx))
+ ctx.term()
+
+ def test_term(self):
+ c = self.Context()
+ c.term()
+ self.assert_(c.closed)
+
+ def test_context_manager(self):
+ with self.Context() as c:
+ pass
+ self.assert_(c.closed)
+
+ def test_fail_init(self):
+ self.assertRaisesErrno(zmq.EINVAL, self.Context, -1)
+
+ def test_term_hang(self):
+ rep,req = self.create_bound_pair(zmq.ROUTER, zmq.DEALER)
+ req.setsockopt(zmq.LINGER, 0)
+ req.send(b'hello', copy=False)
+ req.close()
+ rep.close()
+ self.context.term()
+
+ def test_instance(self):
+ ctx = self.Context.instance()
+ c2 = self.Context.instance(io_threads=2)
+ self.assertTrue(c2 is ctx)
+ c2.term()
+ c3 = self.Context.instance()
+ c4 = self.Context.instance()
+ self.assertFalse(c3 is c2)
+ self.assertFalse(c3.closed)
+ self.assertTrue(c3 is c4)
+
+ def test_many_sockets(self):
+ """opening and closing many sockets shouldn't cause problems"""
+ ctx = self.Context()
+ for i in range(16):
+ sockets = [ ctx.socket(zmq.REP) for i in range(65) ]
+ [ s.close() for s in sockets ]
+ # give the reaper a chance
+ time.sleep(1e-2)
+ ctx.term()
+
+ def test_sockopts(self):
+ """setting socket options with ctx attributes"""
+ ctx = self.Context()
+ ctx.linger = 5
+ self.assertEqual(ctx.linger, 5)
+ s = ctx.socket(zmq.REQ)
+ self.assertEqual(s.linger, 5)
+ self.assertEqual(s.getsockopt(zmq.LINGER), 5)
+ s.close()
+ # check that subscribe doesn't get set on sockets that don't subscribe:
+ ctx.subscribe = b''
+ s = ctx.socket(zmq.REQ)
+ s.close()
+
+ ctx.term()
+
+
+ def test_destroy(self):
+ """Context.destroy should close sockets"""
+ ctx = self.Context()
+ sockets = [ ctx.socket(zmq.REP) for i in range(65) ]
+
+ # close half of the sockets
+ [ s.close() for s in sockets[::2] ]
+
+ ctx.destroy()
+ # reaper is not instantaneous
+ time.sleep(1e-2)
+ for s in sockets:
+ self.assertTrue(s.closed)
+
+ def test_destroy_linger(self):
+ """Context.destroy should set linger on closing sockets"""
+ req,rep = self.create_bound_pair(zmq.REQ, zmq.REP)
+ req.send(b'hi')
+ time.sleep(1e-2)
+ self.context.destroy(linger=0)
+ # reaper is not instantaneous
+ time.sleep(1e-2)
+ for s in (req,rep):
+ self.assertTrue(s.closed)
+
+ def test_term_noclose(self):
+ """Context.term won't close sockets"""
+ ctx = self.Context()
+ s = ctx.socket(zmq.REQ)
+ self.assertFalse(s.closed)
+ t = Thread(target=ctx.term)
+ t.start()
+ t.join(timeout=0.1)
+ self.assertTrue(t.is_alive(), "Context should be waiting")
+ s.close()
+ t.join(timeout=0.1)
+ self.assertFalse(t.is_alive(), "Context should have closed")
+
+ def test_gc(self):
+ """test close&term by garbage collection alone"""
+ if PYPY:
+ raise SkipTest("GC doesn't work ")
+
+ # test credit @dln (GH #137):
+ def gcf():
+ def inner():
+ ctx = self.Context()
+ s = ctx.socket(zmq.PUSH)
+ inner()
+ gc.collect()
+ t = Thread(target=gcf)
+ t.start()
+ t.join(timeout=1)
+ self.assertFalse(t.is_alive(), "Garbage collection should have cleaned up context")
+
+ def test_cyclic_destroy(self):
+ """ctx.destroy should succeed when cyclic ref prevents gc"""
+ # test credit @dln (GH #137):
+ class CyclicReference(object):
+ def __init__(self, parent=None):
+ self.parent = parent
+
+ def crash(self, sock):
+ self.sock = sock
+ self.child = CyclicReference(self)
+
+ def crash_zmq():
+ ctx = self.Context()
+ sock = ctx.socket(zmq.PULL)
+ c = CyclicReference()
+ c.crash(sock)
+ ctx.destroy()
+
+ crash_zmq()
+
+ def test_term_thread(self):
+ """ctx.term should not crash active threads (#139)"""
+ ctx = self.Context()
+ evt = Event()
+ evt.clear()
+
+ def block():
+ s = ctx.socket(zmq.REP)
+ s.bind_to_random_port('tcp://127.0.0.1')
+ evt.set()
+ try:
+ s.recv()
+ except zmq.ZMQError as e:
+ self.assertEqual(e.errno, zmq.ETERM)
+ return
+ finally:
+ s.close()
+ self.fail("recv should have been interrupted with ETERM")
+ t = Thread(target=block)
+ t.start()
+
+ evt.wait(1)
+ self.assertTrue(evt.is_set(), "sync event never fired")
+ time.sleep(0.01)
+ ctx.term()
+ t.join(timeout=1)
+ self.assertFalse(t.is_alive(), "term should have interrupted s.recv()")
+
+ def test_destroy_no_sockets(self):
+ ctx = self.Context()
+ s = ctx.socket(zmq.PUB)
+ s.bind_to_random_port('tcp://127.0.0.1')
+ s.close()
+ ctx.destroy()
+ assert s.closed
+ assert ctx.closed
+
+ def test_ctx_opts(self):
+ if zmq.zmq_version_info() < (3,):
+ raise SkipTest("context options require libzmq 3")
+ ctx = self.Context()
+ ctx.set(zmq.MAX_SOCKETS, 2)
+ self.assertEqual(ctx.get(zmq.MAX_SOCKETS), 2)
+ ctx.max_sockets = 100
+ self.assertEqual(ctx.max_sockets, 100)
+ self.assertEqual(ctx.get(zmq.MAX_SOCKETS), 100)
+
+ def test_shadow(self):
+ ctx = self.Context()
+ ctx2 = self.Context.shadow(ctx.underlying)
+ self.assertEqual(ctx.underlying, ctx2.underlying)
+ s = ctx.socket(zmq.PUB)
+ s.close()
+ del ctx2
+ self.assertFalse(ctx.closed)
+ s = ctx.socket(zmq.PUB)
+ ctx2 = self.Context.shadow(ctx.underlying)
+ s2 = ctx2.socket(zmq.PUB)
+ s.close()
+ s2.close()
+ ctx.term()
+ self.assertRaisesErrno(zmq.EFAULT, ctx2.socket, zmq.PUB)
+ del ctx2
+
+ def test_shadow_pyczmq(self):
+ try:
+ from pyczmq import zctx, zsocket, zstr
+ except Exception:
+ raise SkipTest("Requires pyczmq")
+
+ ctx = zctx.new()
+ a = zsocket.new(ctx, zmq.PUSH)
+ zsocket.bind(a, "inproc://a")
+ ctx2 = self.Context.shadow_pyczmq(ctx)
+ b = ctx2.socket(zmq.PULL)
+ b.connect("inproc://a")
+ zstr.send(a, b'hi')
+ rcvd = self.recv(b)
+ self.assertEqual(rcvd, b'hi')
+ b.close()
+
+
+if False: # disable green context tests
+ class TestContextGreen(GreenTest, TestContext):
+ """gevent subclass of context tests"""
+ # skip tests that use real threads:
+ test_gc = GreenTest.skip_green
+ test_term_thread = GreenTest.skip_green
+ test_destroy_linger = GreenTest.skip_green
diff --git a/external_libs/python/zmq/tests/test_device.py b/external_libs/python/zmq/tests/test_device.py
new file mode 100644
index 00000000..f8305074
--- /dev/null
+++ b/external_libs/python/zmq/tests/test_device.py
@@ -0,0 +1,146 @@
+# Copyright (C) PyZMQ Developers
+# Distributed under the terms of the Modified BSD License.
+
+import time
+
+import zmq
+from zmq import devices
+from zmq.tests import BaseZMQTestCase, SkipTest, have_gevent, GreenTest, PYPY
+from zmq.utils.strtypes import (bytes,unicode,basestring)
+
+if PYPY:
+ # cleanup of shared Context doesn't work on PyPy
+ devices.Device.context_factory = zmq.Context
+
+class TestDevice(BaseZMQTestCase):
+
+ def test_device_types(self):
+ for devtype in (zmq.STREAMER, zmq.FORWARDER, zmq.QUEUE):
+ dev = devices.Device(devtype, zmq.PAIR, zmq.PAIR)
+ self.assertEqual(dev.device_type, devtype)
+ del dev
+
+ def test_device_attributes(self):
+ dev = devices.Device(zmq.QUEUE, zmq.SUB, zmq.PUB)
+ self.assertEqual(dev.in_type, zmq.SUB)
+ self.assertEqual(dev.out_type, zmq.PUB)
+ self.assertEqual(dev.device_type, zmq.QUEUE)
+ self.assertEqual(dev.daemon, True)
+ del dev
+
+ def test_tsdevice_attributes(self):
+ dev = devices.Device(zmq.QUEUE, zmq.SUB, zmq.PUB)
+ self.assertEqual(dev.in_type, zmq.SUB)
+ self.assertEqual(dev.out_type, zmq.PUB)
+ self.assertEqual(dev.device_type, zmq.QUEUE)
+ self.assertEqual(dev.daemon, True)
+ del dev
+
+
+ def test_single_socket_forwarder_connect(self):
+ dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
+ req = self.context.socket(zmq.REQ)
+ port = req.bind_to_random_port('tcp://127.0.0.1')
+ dev.connect_in('tcp://127.0.0.1:%i'%port)
+ dev.start()
+ time.sleep(.25)
+ msg = b'hello'
+ req.send(msg)
+ self.assertEqual(msg, self.recv(req))
+ del dev
+ req.close()
+ dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
+ req = self.context.socket(zmq.REQ)
+ port = req.bind_to_random_port('tcp://127.0.0.1')
+ dev.connect_out('tcp://127.0.0.1:%i'%port)
+ dev.start()
+ time.sleep(.25)
+ msg = b'hello again'
+ req.send(msg)
+ self.assertEqual(msg, self.recv(req))
+ del dev
+ req.close()
+
+ def test_single_socket_forwarder_bind(self):
+ dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
+ # select random port:
+ binder = self.context.socket(zmq.REQ)
+ port = binder.bind_to_random_port('tcp://127.0.0.1')
+ binder.close()
+ time.sleep(0.1)
+ req = self.context.socket(zmq.REQ)
+ req.connect('tcp://127.0.0.1:%i'%port)
+ dev.bind_in('tcp://127.0.0.1:%i'%port)
+ dev.start()
+ time.sleep(.25)
+ msg = b'hello'
+ req.send(msg)
+ self.assertEqual(msg, self.recv(req))
+ del dev
+ req.close()
+ dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
+ # select random port:
+ binder = self.context.socket(zmq.REQ)
+ port = binder.bind_to_random_port('tcp://127.0.0.1')
+ binder.close()
+ time.sleep(0.1)
+ req = self.context.socket(zmq.REQ)
+ req.connect('tcp://127.0.0.1:%i'%port)
+ dev.bind_in('tcp://127.0.0.1:%i'%port)
+ dev.start()
+ time.sleep(.25)
+ msg = b'hello again'
+ req.send(msg)
+ self.assertEqual(msg, self.recv(req))
+ del dev
+ req.close()
+
+ def test_proxy(self):
+ if zmq.zmq_version_info() < (3,2):
+ raise SkipTest("Proxies only in libzmq >= 3")
+ dev = devices.ThreadProxy(zmq.PULL, zmq.PUSH, zmq.PUSH)
+ binder = self.context.socket(zmq.REQ)
+ iface = 'tcp://127.0.0.1'
+ port = binder.bind_to_random_port(iface)
+ port2 = binder.bind_to_random_port(iface)
+ port3 = binder.bind_to_random_port(iface)
+ binder.close()
+ time.sleep(0.1)
+ dev.bind_in("%s:%i" % (iface, port))
+ dev.bind_out("%s:%i" % (iface, port2))
+ dev.bind_mon("%s:%i" % (iface, port3))
+ dev.start()
+ time.sleep(0.25)
+ msg = b'hello'
+ push = self.context.socket(zmq.PUSH)
+ push.connect("%s:%i" % (iface, port))
+ pull = self.context.socket(zmq.PULL)
+ pull.connect("%s:%i" % (iface, port2))
+ mon = self.context.socket(zmq.PULL)
+ mon.connect("%s:%i" % (iface, port3))
+ push.send(msg)
+ self.sockets.extend([push, pull, mon])
+ self.assertEqual(msg, self.recv(pull))
+ self.assertEqual(msg, self.recv(mon))
+
+if have_gevent:
+ import gevent
+ import zmq.green
+
+ class TestDeviceGreen(GreenTest, BaseZMQTestCase):
+
+ def test_green_device(self):
+ rep = self.context.socket(zmq.REP)
+ req = self.context.socket(zmq.REQ)
+ self.sockets.extend([req, rep])
+ port = rep.bind_to_random_port('tcp://127.0.0.1')
+ g = gevent.spawn(zmq.green.device, zmq.QUEUE, rep, rep)
+ req.connect('tcp://127.0.0.1:%i' % port)
+ req.send(b'hi')
+ timeout = gevent.Timeout(3)
+ timeout.start()
+ receiver = gevent.spawn(req.recv)
+ self.assertEqual(receiver.get(2), b'hi')
+ timeout.cancel()
+ g.kill(block=True)
+
diff --git a/external_libs/python/zmq/tests/test_error.py b/external_libs/python/zmq/tests/test_error.py
new file mode 100644
index 00000000..a2eee14a
--- /dev/null
+++ b/external_libs/python/zmq/tests/test_error.py
@@ -0,0 +1,43 @@
+# -*- coding: utf8 -*-
+# Copyright (C) PyZMQ Developers
+# Distributed under the terms of the Modified BSD License.
+
+import sys
+import time
+
+import zmq
+from zmq import ZMQError, strerror, Again, ContextTerminated
+from zmq.tests import BaseZMQTestCase
+
+if sys.version_info[0] >= 3:
+ long = int
+
+class TestZMQError(BaseZMQTestCase):
+
+ def test_strerror(self):
+ """test that strerror gets the right type."""
+ for i in range(10):
+ e = strerror(i)
+ self.assertTrue(isinstance(e, str))
+
+ def test_zmqerror(self):
+ for errno in range(10):
+ e = ZMQError(errno)
+ self.assertEqual(e.errno, errno)
+ self.assertEqual(str(e), strerror(errno))
+
+ def test_again(self):
+ s = self.context.socket(zmq.REP)
+ self.assertRaises(Again, s.recv, zmq.NOBLOCK)
+ self.assertRaisesErrno(zmq.EAGAIN, s.recv, zmq.NOBLOCK)
+ s.close()
+
+ def atest_ctxterm(self):
+ s = self.context.socket(zmq.REP)
+ t = Thread(target=self.context.term)
+ t.start()
+ self.assertRaises(ContextTerminated, s.recv, zmq.NOBLOCK)
+ self.assertRaisesErrno(zmq.TERM, s.recv, zmq.NOBLOCK)
+ s.close()
+ t.join()
+
diff --git a/external_libs/python/zmq/tests/test_etc.py b/external_libs/python/zmq/tests/test_etc.py
new file mode 100644
index 00000000..ad224064
--- /dev/null
+++ b/external_libs/python/zmq/tests/test_etc.py
@@ -0,0 +1,15 @@
+# Copyright (c) PyZMQ Developers.
+# Distributed under the terms of the Modified BSD License.
+
+import sys
+
+import zmq
+
+from . import skip_if
+
+@skip_if(zmq.zmq_version_info() < (4,1), "libzmq < 4.1")
+def test_has():
+ assert not zmq.has('something weird')
+ has_ipc = zmq.has('ipc')
+ not_windows = not sys.platform.startswith('win')
+ assert has_ipc == not_windows
diff --git a/external_libs/python/zmq/tests/test_imports.py b/external_libs/python/zmq/tests/test_imports.py
new file mode 100644
index 00000000..c0ddfaac
--- /dev/null
+++ b/external_libs/python/zmq/tests/test_imports.py
@@ -0,0 +1,62 @@
+# Copyright (C) PyZMQ Developers
+# Distributed under the terms of the Modified BSD License.
+
+import sys
+from unittest import TestCase
+
+class TestImports(TestCase):
+ """Test Imports - the quickest test to ensure that we haven't
+ introduced version-incompatible syntax errors."""
+
+ def test_toplevel(self):
+ """test toplevel import"""
+ import zmq
+
+ def test_core(self):
+ """test core imports"""
+ from zmq import Context
+ from zmq import Socket
+ from zmq import Poller
+ from zmq import Frame
+ from zmq import constants
+ from zmq import device, proxy
+ from zmq import Stopwatch
+ from zmq import (
+ zmq_version,
+ zmq_version_info,
+ pyzmq_version,
+ pyzmq_version_info,
+ )
+
+ def test_devices(self):
+ """test device imports"""
+ import zmq.devices
+ from zmq.devices import basedevice
+ from zmq.devices import monitoredqueue
+ from zmq.devices import monitoredqueuedevice
+
+ def test_log(self):
+ """test log imports"""
+ import zmq.log
+ from zmq.log import handlers
+
+ def test_eventloop(self):
+ """test eventloop imports"""
+ import zmq.eventloop
+ from zmq.eventloop import ioloop
+ from zmq.eventloop import zmqstream
+ from zmq.eventloop.minitornado.platform import auto
+ from zmq.eventloop.minitornado import ioloop
+
+ def test_utils(self):
+ """test util imports"""
+ import zmq.utils
+ from zmq.utils import strtypes
+ from zmq.utils import jsonapi
+
+ def test_ssh(self):
+ """test ssh imports"""
+ from zmq.ssh import tunnel
+
+
+
diff --git a/external_libs/python/zmq/tests/test_ioloop.py b/external_libs/python/zmq/tests/test_ioloop.py
new file mode 100644
index 00000000..2a8b1153
--- /dev/null
+++ b/external_libs/python/zmq/tests/test_ioloop.py
@@ -0,0 +1,113 @@
+# Copyright (C) PyZMQ Developers
+# Distributed under the terms of the Modified BSD License.
+
+import time
+import os
+import threading
+
+import zmq
+from zmq.tests import BaseZMQTestCase
+from zmq.eventloop import ioloop
+from zmq.eventloop.minitornado.ioloop import _Timeout
+try:
+ from tornado.ioloop import PollIOLoop, IOLoop as BaseIOLoop
+except ImportError:
+ from zmq.eventloop.minitornado.ioloop import IOLoop as BaseIOLoop
+
+
+def printer():
+ os.system("say hello")
+ raise Exception
+ print (time.time())
+
+
+class Delay(threading.Thread):
+ def __init__(self, f, delay=1):
+ self.f=f
+ self.delay=delay
+ self.aborted=False
+ self.cond=threading.Condition()
+ super(Delay, self).__init__()
+
+ def run(self):
+ self.cond.acquire()
+ self.cond.wait(self.delay)
+ self.cond.release()
+ if not self.aborted:
+ self.f()
+
+ def abort(self):
+ self.aborted=True
+ self.cond.acquire()
+ self.cond.notify()
+ self.cond.release()
+
+
+class TestIOLoop(BaseZMQTestCase):
+
+ def test_simple(self):
+ """simple IOLoop creation test"""
+ loop = ioloop.IOLoop()
+ dc = ioloop.PeriodicCallback(loop.stop, 200, loop)
+ pc = ioloop.PeriodicCallback(lambda : None, 10, loop)
+ pc.start()
+ dc.start()
+ t = Delay(loop.stop,1)
+ t.start()
+ loop.start()
+ if t.isAlive():
+ t.abort()
+ else:
+ self.fail("IOLoop failed to exit")
+
+ def test_timeout_compare(self):
+ """test timeout comparisons"""
+ loop = ioloop.IOLoop()
+ t = _Timeout(1, 2, loop)
+ t2 = _Timeout(1, 3, loop)
+ self.assertEqual(t < t2, id(t) < id(t2))
+ t2 = _Timeout(2,1, loop)
+ self.assertTrue(t < t2)
+
+ def test_poller_events(self):
+ """Tornado poller implementation maps events correctly"""
+ req,rep = self.create_bound_pair(zmq.REQ, zmq.REP)
+ poller = ioloop.ZMQPoller()
+ poller.register(req, ioloop.IOLoop.READ)
+ poller.register(rep, ioloop.IOLoop.READ)
+ events = dict(poller.poll(0))
+ self.assertEqual(events.get(rep), None)
+ self.assertEqual(events.get(req), None)
+
+ poller.register(req, ioloop.IOLoop.WRITE)
+ poller.register(rep, ioloop.IOLoop.WRITE)
+ events = dict(poller.poll(1))
+ self.assertEqual(events.get(req), ioloop.IOLoop.WRITE)
+ self.assertEqual(events.get(rep), None)
+
+ poller.register(rep, ioloop.IOLoop.READ)
+ req.send(b'hi')
+ events = dict(poller.poll(1))
+ self.assertEqual(events.get(rep), ioloop.IOLoop.READ)
+ self.assertEqual(events.get(req), None)
+
+ def test_instance(self):
+ """Test IOLoop.instance returns the right object"""
+ loop = ioloop.IOLoop.instance()
+ self.assertEqual(loop.__class__, ioloop.IOLoop)
+ loop = BaseIOLoop.instance()
+ self.assertEqual(loop.__class__, ioloop.IOLoop)
+
+ def test_close_all(self):
+ """Test close(all_fds=True)"""
+ loop = ioloop.IOLoop.instance()
+ req,rep = self.create_bound_pair(zmq.REQ, zmq.REP)
+ loop.add_handler(req, lambda msg: msg, ioloop.IOLoop.READ)
+ loop.add_handler(rep, lambda msg: msg, ioloop.IOLoop.READ)
+ self.assertEqual(req.closed, False)
+ self.assertEqual(rep.closed, False)
+ loop.close(all_fds=True)
+ self.assertEqual(req.closed, True)
+ self.assertEqual(rep.closed, True)
+
+
diff --git a/external_libs/python/zmq/tests/test_log.py b/external_libs/python/zmq/tests/test_log.py
new file mode 100644
index 00000000..9206f095
--- /dev/null
+++ b/external_libs/python/zmq/tests/test_log.py
@@ -0,0 +1,116 @@
+# encoding: utf-8
+
+# Copyright (C) PyZMQ Developers
+# Distributed under the terms of the Modified BSD License.
+
+
+import logging
+import time
+from unittest import TestCase
+
+import zmq
+from zmq.log import handlers
+from zmq.utils.strtypes import b, u
+from zmq.tests import BaseZMQTestCase
+
+
+class TestPubLog(BaseZMQTestCase):
+
+ iface = 'inproc://zmqlog'
+ topic= 'zmq'
+
+ @property
+ def logger(self):
+ # print dir(self)
+ logger = logging.getLogger('zmqtest')
+ logger.setLevel(logging.DEBUG)
+ return logger
+
+ def connect_handler(self, topic=None):
+ topic = self.topic if topic is None else topic
+ logger = self.logger
+ pub,sub = self.create_bound_pair(zmq.PUB, zmq.SUB)
+ handler = handlers.PUBHandler(pub)
+ handler.setLevel(logging.DEBUG)
+ handler.root_topic = topic
+ logger.addHandler(handler)
+ sub.setsockopt(zmq.SUBSCRIBE, b(topic))
+ time.sleep(0.1)
+ return logger, handler, sub
+
+ def test_init_iface(self):
+ logger = self.logger
+ ctx = self.context
+ handler = handlers.PUBHandler(self.iface)
+ self.assertFalse(handler.ctx is ctx)
+ self.sockets.append(handler.socket)
+ # handler.ctx.term()
+ handler = handlers.PUBHandler(self.iface, self.context)
+ self.sockets.append(handler.socket)
+ self.assertTrue(handler.ctx is ctx)
+ handler.setLevel(logging.DEBUG)
+ handler.root_topic = self.topic
+ logger.addHandler(handler)
+ sub = ctx.socket(zmq.SUB)
+ self.sockets.append(sub)
+ sub.setsockopt(zmq.SUBSCRIBE, b(self.topic))
+ sub.connect(self.iface)
+ import time; time.sleep(0.25)
+ msg1 = 'message'
+ logger.info(msg1)
+
+ (topic, msg2) = sub.recv_multipart()
+ self.assertEqual(topic, b'zmq.INFO')
+ self.assertEqual(msg2, b(msg1)+b'\n')
+ logger.removeHandler(handler)
+
+ def test_init_socket(self):
+ pub,sub = self.create_bound_pair(zmq.PUB, zmq.SUB)
+ logger = self.logger
+ handler = handlers.PUBHandler(pub)
+ handler.setLevel(logging.DEBUG)
+ handler.root_topic = self.topic
+ logger.addHandler(handler)
+
+ self.assertTrue(handler.socket is pub)
+ self.assertTrue(handler.ctx is pub.context)
+ self.assertTrue(handler.ctx is self.context)
+ sub.setsockopt(zmq.SUBSCRIBE, b(self.topic))
+ import time; time.sleep(0.1)
+ msg1 = 'message'
+ logger.info(msg1)
+
+ (topic, msg2) = sub.recv_multipart()
+ self.assertEqual(topic, b'zmq.INFO')
+ self.assertEqual(msg2, b(msg1)+b'\n')
+ logger.removeHandler(handler)
+
+ def test_root_topic(self):
+ logger, handler, sub = self.connect_handler()
+ handler.socket.bind(self.iface)
+ sub2 = sub.context.socket(zmq.SUB)
+ self.sockets.append(sub2)
+ sub2.connect(self.iface)
+ sub2.setsockopt(zmq.SUBSCRIBE, b'')
+ handler.root_topic = b'twoonly'
+ msg1 = 'ignored'
+ logger.info(msg1)
+ self.assertRaisesErrno(zmq.EAGAIN, sub.recv, zmq.NOBLOCK)
+ topic,msg2 = sub2.recv_multipart()
+ self.assertEqual(topic, b'twoonly.INFO')
+ self.assertEqual(msg2, b(msg1)+b'\n')
+
+ logger.removeHandler(handler)
+
+ def test_unicode_message(self):
+ logger, handler, sub = self.connect_handler()
+ base_topic = b(self.topic + '.INFO')
+ for msg, expected in [
+ (u('hello'), [base_topic, b('hello\n')]),
+ (u('héllo'), [base_topic, b('héllo\n')]),
+ (u('tøpic::héllo'), [base_topic + b('.tøpic'), b('héllo\n')]),
+ ]:
+ logger.info(msg)
+ received = sub.recv_multipart()
+ self.assertEqual(received, expected)
+
diff --git a/external_libs/python/zmq/tests/test_message.py b/external_libs/python/zmq/tests/test_message.py
new file mode 100644
index 00000000..d8770bdf
--- /dev/null
+++ b/external_libs/python/zmq/tests/test_message.py
@@ -0,0 +1,362 @@
+# -*- coding: utf8 -*-
+# Copyright (C) PyZMQ Developers
+# Distributed under the terms of the Modified BSD License.
+
+
+import copy
+import sys
+try:
+ from sys import getrefcount as grc
+except ImportError:
+ grc = None
+
+import time
+from pprint import pprint
+from unittest import TestCase
+
+import zmq
+from zmq.tests import BaseZMQTestCase, SkipTest, skip_pypy, PYPY
+from zmq.utils.strtypes import unicode, bytes, b, u
+
+
+# some useful constants:
+
+x = b'x'
+
+try:
+ view = memoryview
+except NameError:
+ view = buffer
+
+if grc:
+ rc0 = grc(x)
+ v = view(x)
+ view_rc = grc(x) - rc0
+
+def await_gc(obj, rc):
+ """wait for refcount on an object to drop to an expected value
+
+ Necessary because of the zero-copy gc thread,
+ which can take some time to receive its DECREF message.
+ """
+ for i in range(50):
+ # rc + 2 because of the refs in this function
+ if grc(obj) <= rc + 2:
+ return
+ time.sleep(0.05)
+
+class TestFrame(BaseZMQTestCase):
+
+ @skip_pypy
+ def test_above_30(self):
+ """Message above 30 bytes are never copied by 0MQ."""
+ for i in range(5, 16): # 32, 64,..., 65536
+ s = (2**i)*x
+ self.assertEqual(grc(s), 2)
+ m = zmq.Frame(s)
+ self.assertEqual(grc(s), 4)
+ del m
+ await_gc(s, 2)
+ self.assertEqual(grc(s), 2)
+ del s
+
+ def test_str(self):
+ """Test the str representations of the Frames."""
+ for i in range(16):
+ s = (2**i)*x
+ m = zmq.Frame(s)
+ m_str = str(m)
+ m_str_b = b(m_str) # py3compat
+ self.assertEqual(s, m_str_b)
+
+ def test_bytes(self):
+ """Test the Frame.bytes property."""
+ for i in range(1,16):
+ s = (2**i)*x
+ m = zmq.Frame(s)
+ b = m.bytes
+ self.assertEqual(s, m.bytes)
+ if not PYPY:
+ # check that it copies
+ self.assert_(b is not s)
+ # check that it copies only once
+ self.assert_(b is m.bytes)
+
+ def test_unicode(self):
+ """Test the unicode representations of the Frames."""
+ s = u('asdf')
+ self.assertRaises(TypeError, zmq.Frame, s)
+ for i in range(16):
+ s = (2**i)*u('§')
+ m = zmq.Frame(s.encode('utf8'))
+ self.assertEqual(s, unicode(m.bytes,'utf8'))
+
+ def test_len(self):
+ """Test the len of the Frames."""
+ for i in range(16):
+ s = (2**i)*x
+ m = zmq.Frame(s)
+ self.assertEqual(len(s), len(m))
+
+ @skip_pypy
+ def test_lifecycle1(self):
+ """Run through a ref counting cycle with a copy."""
+ for i in range(5, 16): # 32, 64,..., 65536
+ s = (2**i)*x
+ rc = 2
+ self.assertEqual(grc(s), rc)
+ m = zmq.Frame(s)
+ rc += 2
+ self.assertEqual(grc(s), rc)
+ m2 = copy.copy(m)
+ rc += 1
+ self.assertEqual(grc(s), rc)
+ buf = m2.buffer
+
+ rc += view_rc
+ self.assertEqual(grc(s), rc)
+
+ self.assertEqual(s, b(str(m)))
+ self.assertEqual(s, bytes(m2))
+ self.assertEqual(s, m.bytes)
+ # self.assert_(s is str(m))
+ # self.assert_(s is str(m2))
+ del m2
+ rc -= 1
+ self.assertEqual(grc(s), rc)
+ rc -= view_rc
+ del buf
+ self.assertEqual(grc(s), rc)
+ del m
+ rc -= 2
+ await_gc(s, rc)
+ self.assertEqual(grc(s), rc)
+ self.assertEqual(rc, 2)
+ del s
+
+ @skip_pypy
+ def test_lifecycle2(self):
+ """Run through a different ref counting cycle with a copy."""
+ for i in range(5, 16): # 32, 64,..., 65536
+ s = (2**i)*x
+ rc = 2
+ self.assertEqual(grc(s), rc)
+ m = zmq.Frame(s)
+ rc += 2
+ self.assertEqual(grc(s), rc)
+ m2 = copy.copy(m)
+ rc += 1
+ self.assertEqual(grc(s), rc)
+ buf = m.buffer
+ rc += view_rc
+ self.assertEqual(grc(s), rc)
+ self.assertEqual(s, b(str(m)))
+ self.assertEqual(s, bytes(m2))
+ self.assertEqual(s, m2.bytes)
+ self.assertEqual(s, m.bytes)
+ # self.assert_(s is str(m))
+ # self.assert_(s is str(m2))
+ del buf
+ self.assertEqual(grc(s), rc)
+ del m
+ # m.buffer is kept until m is del'd
+ rc -= view_rc
+ rc -= 1
+ self.assertEqual(grc(s), rc)
+ del m2
+ rc -= 2
+ await_gc(s, rc)
+ self.assertEqual(grc(s), rc)
+ self.assertEqual(rc, 2)
+ del s
+
+ @skip_pypy
+ def test_tracker(self):
+ m = zmq.Frame(b'asdf', track=True)
+ self.assertFalse(m.tracker.done)
+ pm = zmq.MessageTracker(m)
+ self.assertFalse(pm.done)
+ del m
+ for i in range(10):
+ if pm.done:
+ break
+ time.sleep(0.1)
+ self.assertTrue(pm.done)
+
+ def test_no_tracker(self):
+ m = zmq.Frame(b'asdf', track=False)
+ self.assertEqual(m.tracker, None)
+ m2 = copy.copy(m)
+ self.assertEqual(m2.tracker, None)
+ self.assertRaises(ValueError, zmq.MessageTracker, m)
+
+ @skip_pypy
+ def test_multi_tracker(self):
+ m = zmq.Frame(b'asdf', track=True)
+ m2 = zmq.Frame(b'whoda', track=True)
+ mt = zmq.MessageTracker(m,m2)
+ self.assertFalse(m.tracker.done)
+ self.assertFalse(mt.done)
+ self.assertRaises(zmq.NotDone, mt.wait, 0.1)
+ del m
+ time.sleep(0.1)
+ self.assertRaises(zmq.NotDone, mt.wait, 0.1)
+ self.assertFalse(mt.done)
+ del m2
+ self.assertTrue(mt.wait() is None)
+ self.assertTrue(mt.done)
+
+
+ def test_buffer_in(self):
+ """test using a buffer as input"""
+ ins = b("§§¶•ªº˜µ¬˚…∆˙åß∂©œ∑´†≈ç√")
+ m = zmq.Frame(view(ins))
+
+ def test_bad_buffer_in(self):
+ """test using a bad object"""
+ self.assertRaises(TypeError, zmq.Frame, 5)
+ self.assertRaises(TypeError, zmq.Frame, object())
+
+ def test_buffer_out(self):
+ """receiving buffered output"""
+ ins = b("§§¶•ªº˜µ¬˚…∆˙åß∂©œ∑´†≈ç√")
+ m = zmq.Frame(ins)
+ outb = m.buffer
+ self.assertTrue(isinstance(outb, view))
+ self.assert_(outb is m.buffer)
+ self.assert_(m.buffer is m.buffer)
+
+ def test_multisend(self):
+ """ensure that a message remains intact after multiple sends"""
+ a,b = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
+ s = b"message"
+ m = zmq.Frame(s)
+ self.assertEqual(s, m.bytes)
+
+ a.send(m, copy=False)
+ time.sleep(0.1)
+ self.assertEqual(s, m.bytes)
+ a.send(m, copy=False)
+ time.sleep(0.1)
+ self.assertEqual(s, m.bytes)
+ a.send(m, copy=True)
+ time.sleep(0.1)
+ self.assertEqual(s, m.bytes)
+ a.send(m, copy=True)
+ time.sleep(0.1)
+ self.assertEqual(s, m.bytes)
+ for i in range(4):
+ r = b.recv()
+ self.assertEqual(s,r)
+ self.assertEqual(s, m.bytes)
+
+ def test_buffer_numpy(self):
+ """test non-copying numpy array messages"""
+ try:
+ import numpy
+ except ImportError:
+ raise SkipTest("numpy required")
+ rand = numpy.random.randint
+ shapes = [ rand(2,16) for i in range(5) ]
+ for i in range(1,len(shapes)+1):
+ shape = shapes[:i]
+ A = numpy.random.random(shape)
+ m = zmq.Frame(A)
+ if view.__name__ == 'buffer':
+ self.assertEqual(A.data, m.buffer)
+ B = numpy.frombuffer(m.buffer,dtype=A.dtype).reshape(A.shape)
+ else:
+ self.assertEqual(memoryview(A), m.buffer)
+ B = numpy.array(m.buffer,dtype=A.dtype).reshape(A.shape)
+ self.assertEqual((A==B).all(), True)
+
+ def test_memoryview(self):
+ """test messages from memoryview"""
+ major,minor = sys.version_info[:2]
+ if not (major >= 3 or (major == 2 and minor >= 7)):
+ raise SkipTest("memoryviews only in python >= 2.7")
+
+ s = b'carrotjuice'
+ v = memoryview(s)
+ m = zmq.Frame(s)
+ buf = m.buffer
+ s2 = buf.tobytes()
+ self.assertEqual(s2,s)
+ self.assertEqual(m.bytes,s)
+
+ def test_noncopying_recv(self):
+ """check for clobbering message buffers"""
+ null = b'\0'*64
+ sa,sb = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
+ for i in range(32):
+ # try a few times
+ sb.send(null, copy=False)
+ m = sa.recv(copy=False)
+ mb = m.bytes
+ # buf = view(m)
+ buf = m.buffer
+ del m
+ for i in range(5):
+ ff=b'\xff'*(40 + i*10)
+ sb.send(ff, copy=False)
+ m2 = sa.recv(copy=False)
+ if view.__name__ == 'buffer':
+ b = bytes(buf)
+ else:
+ b = buf.tobytes()
+ self.assertEqual(b, null)
+ self.assertEqual(mb, null)
+ self.assertEqual(m2.bytes, ff)
+
+ @skip_pypy
+ def test_buffer_numpy(self):
+ """test non-copying numpy array messages"""
+ try:
+ import numpy
+ except ImportError:
+ raise SkipTest("requires numpy")
+ if sys.version_info < (2,7):
+ raise SkipTest("requires new-style buffer interface (py >= 2.7)")
+ rand = numpy.random.randint
+ shapes = [ rand(2,5) for i in range(5) ]
+ a,b = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
+ dtypes = [int, float, '>i4', 'B']
+ for i in range(1,len(shapes)+1):
+ shape = shapes[:i]
+ for dt in dtypes:
+ A = numpy.empty(shape, dtype=dt)
+ while numpy.isnan(A).any():
+ # don't let nan sneak in
+ A = numpy.ndarray(shape, dtype=dt)
+ a.send(A, copy=False)
+ msg = b.recv(copy=False)
+
+ B = numpy.frombuffer(msg, A.dtype).reshape(A.shape)
+ self.assertEqual(A.shape, B.shape)
+ self.assertTrue((A==B).all())
+ A = numpy.empty(shape, dtype=[('a', int), ('b', float), ('c', 'a32')])
+ A['a'] = 1024
+ A['b'] = 1e9
+ A['c'] = 'hello there'
+ a.send(A, copy=False)
+ msg = b.recv(copy=False)
+
+ B = numpy.frombuffer(msg, A.dtype).reshape(A.shape)
+ self.assertEqual(A.shape, B.shape)
+ self.assertTrue((A==B).all())
+
+ def test_frame_more(self):
+ """test Frame.more attribute"""
+ frame = zmq.Frame(b"hello")
+ self.assertFalse(frame.more)
+ sa,sb = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
+ sa.send_multipart([b'hi', b'there'])
+ frame = self.recv(sb, copy=False)
+ self.assertTrue(frame.more)
+ if zmq.zmq_version_info()[0] >= 3 and not PYPY:
+ self.assertTrue(frame.get(zmq.MORE))
+ frame = self.recv(sb, copy=False)
+ self.assertFalse(frame.more)
+ if zmq.zmq_version_info()[0] >= 3 and not PYPY:
+ self.assertFalse(frame.get(zmq.MORE))
+
diff --git a/external_libs/python/zmq/tests/test_monitor.py b/external_libs/python/zmq/tests/test_monitor.py
new file mode 100644
index 00000000..4f035388
--- /dev/null
+++ b/external_libs/python/zmq/tests/test_monitor.py
@@ -0,0 +1,71 @@
+# -*- coding: utf-8 -*-
+# Copyright (C) PyZMQ Developers
+# Distributed under the terms of the Modified BSD License.
+
+
+import sys
+import time
+import struct
+
+from unittest import TestCase
+
+import zmq
+from zmq.tests import BaseZMQTestCase, skip_if, skip_pypy
+from zmq.utils.monitor import recv_monitor_message
+
+skip_lt_4 = skip_if(zmq.zmq_version_info() < (4,), "requires zmq >= 4")
+
+class TestSocketMonitor(BaseZMQTestCase):
+
+ @skip_lt_4
+ def test_monitor(self):
+ """Test monitoring interface for sockets."""
+ s_rep = self.context.socket(zmq.REP)
+ s_req = self.context.socket(zmq.REQ)
+ self.sockets.extend([s_rep, s_req])
+ s_req.bind("tcp://127.0.0.1:6666")
+ # try monitoring the REP socket
+
+ s_rep.monitor("inproc://monitor.rep", zmq.EVENT_ALL)
+ # create listening socket for monitor
+ s_event = self.context.socket(zmq.PAIR)
+ self.sockets.append(s_event)
+ s_event.connect("inproc://monitor.rep")
+ s_event.linger = 0
+ # test receive event for connect event
+ s_rep.connect("tcp://127.0.0.1:6666")
+ m = recv_monitor_message(s_event)
+ if m['event'] == zmq.EVENT_CONNECT_DELAYED:
+ self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6666")
+ # test receive event for connected event
+ m = recv_monitor_message(s_event)
+ self.assertEqual(m['event'], zmq.EVENT_CONNECTED)
+ self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6666")
+
+ # test monitor can be disabled.
+ s_rep.disable_monitor()
+ m = recv_monitor_message(s_event)
+ self.assertEqual(m['event'], zmq.EVENT_MONITOR_STOPPED)
+
+
+ @skip_lt_4
+ def test_monitor_connected(self):
+ """Test connected monitoring socket."""
+ s_rep = self.context.socket(zmq.REP)
+ s_req = self.context.socket(zmq.REQ)
+ self.sockets.extend([s_rep, s_req])
+ s_req.bind("tcp://127.0.0.1:6667")
+ # try monitoring the REP socket
+ # create listening socket for monitor
+ s_event = s_rep.get_monitor_socket()
+ s_event.linger = 0
+ self.sockets.append(s_event)
+ # test receive event for connect event
+ s_rep.connect("tcp://127.0.0.1:6667")
+ m = recv_monitor_message(s_event)
+ if m['event'] == zmq.EVENT_CONNECT_DELAYED:
+ self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667")
+ # test receive event for connected event
+ m = recv_monitor_message(s_event)
+ self.assertEqual(m['event'], zmq.EVENT_CONNECTED)
+ self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667")
diff --git a/external_libs/python/zmq/tests/test_monqueue.py b/external_libs/python/zmq/tests/test_monqueue.py
new file mode 100644
index 00000000..e855602e
--- /dev/null
+++ b/external_libs/python/zmq/tests/test_monqueue.py
@@ -0,0 +1,227 @@
+# Copyright (C) PyZMQ Developers
+# Distributed under the terms of the Modified BSD License.
+
+import time
+from unittest import TestCase
+
+import zmq
+from zmq import devices
+
+from zmq.tests import BaseZMQTestCase, SkipTest, PYPY
+from zmq.utils.strtypes import unicode
+
+
+if PYPY or zmq.zmq_version_info() >= (4,1):
+ # cleanup of shared Context doesn't work on PyPy
+ # there also seems to be a bug in cleanup in libzmq-4.1 (zeromq/libzmq#1052)
+ devices.Device.context_factory = zmq.Context
+
+
+class TestMonitoredQueue(BaseZMQTestCase):
+
+ sockets = []
+
+ def build_device(self, mon_sub=b"", in_prefix=b'in', out_prefix=b'out'):
+ self.device = devices.ThreadMonitoredQueue(zmq.PAIR, zmq.PAIR, zmq.PUB,
+ in_prefix, out_prefix)
+ alice = self.context.socket(zmq.PAIR)
+ bob = self.context.socket(zmq.PAIR)
+ mon = self.context.socket(zmq.SUB)
+
+ aport = alice.bind_to_random_port('tcp://127.0.0.1')
+ bport = bob.bind_to_random_port('tcp://127.0.0.1')
+ mport = mon.bind_to_random_port('tcp://127.0.0.1')
+ mon.setsockopt(zmq.SUBSCRIBE, mon_sub)
+
+ self.device.connect_in("tcp://127.0.0.1:%i"%aport)
+ self.device.connect_out("tcp://127.0.0.1:%i"%bport)
+ self.device.connect_mon("tcp://127.0.0.1:%i"%mport)
+ self.device.start()
+ time.sleep(.2)
+ try:
+ # this is currenlty necessary to ensure no dropped monitor messages
+ # see LIBZMQ-248 for more info
+ mon.recv_multipart(zmq.NOBLOCK)
+ except zmq.ZMQError:
+ pass
+ self.sockets.extend([alice, bob, mon])
+ return alice, bob, mon
+
+
+ def teardown_device(self):
+ for socket in self.sockets:
+ socket.close()
+ del socket
+ del self.device
+
+ def test_reply(self):
+ alice, bob, mon = self.build_device()
+ alices = b"hello bob".split()
+ alice.send_multipart(alices)
+ bobs = self.recv_multipart(bob)
+ self.assertEqual(alices, bobs)
+ bobs = b"hello alice".split()
+ bob.send_multipart(bobs)
+ alices = self.recv_multipart(alice)
+ self.assertEqual(alices, bobs)
+ self.teardown_device()
+
+ def test_queue(self):
+ alice, bob, mon = self.build_device()
+ alices = b"hello bob".split()
+ alice.send_multipart(alices)
+ alices2 = b"hello again".split()
+ alice.send_multipart(alices2)
+ alices3 = b"hello again and again".split()
+ alice.send_multipart(alices3)
+ bobs = self.recv_multipart(bob)
+ self.assertEqual(alices, bobs)
+ bobs = self.recv_multipart(bob)
+ self.assertEqual(alices2, bobs)
+ bobs = self.recv_multipart(bob)
+ self.assertEqual(alices3, bobs)
+ bobs = b"hello alice".split()
+ bob.send_multipart(bobs)
+ alices = self.recv_multipart(alice)
+ self.assertEqual(alices, bobs)
+ self.teardown_device()
+
+ def test_monitor(self):
+ alice, bob, mon = self.build_device()
+ alices = b"hello bob".split()
+ alice.send_multipart(alices)
+ alices2 = b"hello again".split()
+ alice.send_multipart(alices2)
+ alices3 = b"hello again and again".split()
+ alice.send_multipart(alices3)
+ bobs = self.recv_multipart(bob)
+ self.assertEqual(alices, bobs)
+ mons = self.recv_multipart(mon)
+ self.assertEqual([b'in']+bobs, mons)
+ bobs = self.recv_multipart(bob)
+ self.assertEqual(alices2, bobs)
+ bobs = self.recv_multipart(bob)
+ self.assertEqual(alices3, bobs)
+ mons = self.recv_multipart(mon)
+ self.assertEqual([b'in']+alices2, mons)
+ bobs = b"hello alice".split()
+ bob.send_multipart(bobs)
+ alices = self.recv_multipart(alice)
+ self.assertEqual(alices, bobs)
+ mons = self.recv_multipart(mon)
+ self.assertEqual([b'in']+alices3, mons)
+ mons = self.recv_multipart(mon)
+ self.assertEqual([b'out']+bobs, mons)
+ self.teardown_device()
+
+ def test_prefix(self):
+ alice, bob, mon = self.build_device(b"", b'foo', b'bar')
+ alices = b"hello bob".split()
+ alice.send_multipart(alices)
+ alices2 = b"hello again".split()
+ alice.send_multipart(alices2)
+ alices3 = b"hello again and again".split()
+ alice.send_multipart(alices3)
+ bobs = self.recv_multipart(bob)
+ self.assertEqual(alices, bobs)
+ mons = self.recv_multipart(mon)
+ self.assertEqual([b'foo']+bobs, mons)
+ bobs = self.recv_multipart(bob)
+ self.assertEqual(alices2, bobs)
+ bobs = self.recv_multipart(bob)
+ self.assertEqual(alices3, bobs)
+ mons = self.recv_multipart(mon)
+ self.assertEqual([b'foo']+alices2, mons)
+ bobs = b"hello alice".split()
+ bob.send_multipart(bobs)
+ alices = self.recv_multipart(alice)
+ self.assertEqual(alices, bobs)
+ mons = self.recv_multipart(mon)
+ self.assertEqual([b'foo']+alices3, mons)
+ mons = self.recv_multipart(mon)
+ self.assertEqual([b'bar']+bobs, mons)
+ self.teardown_device()
+
+ def test_monitor_subscribe(self):
+ alice, bob, mon = self.build_device(b"out")
+ alices = b"hello bob".split()
+ alice.send_multipart(alices)
+ alices2 = b"hello again".split()
+ alice.send_multipart(alices2)
+ alices3 = b"hello again and again".split()
+ alice.send_multipart(alices3)
+ bobs = self.recv_multipart(bob)
+ self.assertEqual(alices, bobs)
+ bobs = self.recv_multipart(bob)
+ self.assertEqual(alices2, bobs)
+ bobs = self.recv_multipart(bob)
+ self.assertEqual(alices3, bobs)
+ bobs = b"hello alice".split()
+ bob.send_multipart(bobs)
+ alices = self.recv_multipart(alice)
+ self.assertEqual(alices, bobs)
+ mons = self.recv_multipart(mon)
+ self.assertEqual([b'out']+bobs, mons)
+ self.teardown_device()
+
+ def test_router_router(self):
+ """test router-router MQ devices"""
+ dev = devices.ThreadMonitoredQueue(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
+ self.device = dev
+ dev.setsockopt_in(zmq.LINGER, 0)
+ dev.setsockopt_out(zmq.LINGER, 0)
+ dev.setsockopt_mon(zmq.LINGER, 0)
+
+ binder = self.context.socket(zmq.DEALER)
+ porta = binder.bind_to_random_port('tcp://127.0.0.1')
+ portb = binder.bind_to_random_port('tcp://127.0.0.1')
+ binder.close()
+ time.sleep(0.1)
+ a = self.context.socket(zmq.DEALER)
+ a.identity = b'a'
+ b = self.context.socket(zmq.DEALER)
+ b.identity = b'b'
+ self.sockets.extend([a, b])
+
+ a.connect('tcp://127.0.0.1:%i'%porta)
+ dev.bind_in('tcp://127.0.0.1:%i'%porta)
+ b.connect('tcp://127.0.0.1:%i'%portb)
+ dev.bind_out('tcp://127.0.0.1:%i'%portb)
+ dev.start()
+ time.sleep(0.2)
+ if zmq.zmq_version_info() >= (3,1,0):
+ # flush erroneous poll state, due to LIBZMQ-280
+ ping_msg = [ b'ping', b'pong' ]
+ for s in (a,b):
+ s.send_multipart(ping_msg)
+ try:
+ s.recv(zmq.NOBLOCK)
+ except zmq.ZMQError:
+ pass
+ msg = [ b'hello', b'there' ]
+ a.send_multipart([b'b']+msg)
+ bmsg = self.recv_multipart(b)
+ self.assertEqual(bmsg, [b'a']+msg)
+ b.send_multipart(bmsg)
+ amsg = self.recv_multipart(a)
+ self.assertEqual(amsg, [b'b']+msg)
+ self.teardown_device()
+
+ def test_default_mq_args(self):
+ self.device = dev = devices.ThreadMonitoredQueue(zmq.ROUTER, zmq.DEALER, zmq.PUB)
+ dev.setsockopt_in(zmq.LINGER, 0)
+ dev.setsockopt_out(zmq.LINGER, 0)
+ dev.setsockopt_mon(zmq.LINGER, 0)
+ # this will raise if default args are wrong
+ dev.start()
+ self.teardown_device()
+
+ def test_mq_check_prefix(self):
+ ins = self.context.socket(zmq.ROUTER)
+ outs = self.context.socket(zmq.DEALER)
+ mons = self.context.socket(zmq.PUB)
+ self.sockets.extend([ins, outs, mons])
+
+ ins = unicode('in')
+ outs = unicode('out')
+ self.assertRaises(TypeError, devices.monitoredqueue, ins, outs, mons)
diff --git a/external_libs/python/zmq/tests/test_multipart.py b/external_libs/python/zmq/tests/test_multipart.py
new file mode 100644
index 00000000..24d41be0
--- /dev/null
+++ b/external_libs/python/zmq/tests/test_multipart.py
@@ -0,0 +1,35 @@
+# Copyright (C) PyZMQ Developers
+# Distributed under the terms of the Modified BSD License.
+
+
+import zmq
+
+
+from zmq.tests import BaseZMQTestCase, SkipTest, have_gevent, GreenTest
+
+
+class TestMultipart(BaseZMQTestCase):
+
+ def test_router_dealer(self):
+ router, dealer = self.create_bound_pair(zmq.ROUTER, zmq.DEALER)
+
+ msg1 = b'message1'
+ dealer.send(msg1)
+ ident = self.recv(router)
+ more = router.rcvmore
+ self.assertEqual(more, True)
+ msg2 = self.recv(router)
+ self.assertEqual(msg1, msg2)
+ more = router.rcvmore
+ self.assertEqual(more, False)
+
+ def test_basic_multipart(self):
+ a,b = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
+ msg = [ b'hi', b'there', b'b']
+ a.send_multipart(msg)
+ recvd = b.recv_multipart()
+ self.assertEqual(msg, recvd)
+
+if have_gevent:
+ class TestMultipartGreen(GreenTest, TestMultipart):
+ pass
diff --git a/external_libs/python/zmq/tests/test_pair.py b/external_libs/python/zmq/tests/test_pair.py
new file mode 100644
index 00000000..e88c1e8b
--- /dev/null
+++ b/external_libs/python/zmq/tests/test_pair.py
@@ -0,0 +1,53 @@
+# Copyright (C) PyZMQ Developers
+# Distributed under the terms of the Modified BSD License.
+
+
+import zmq
+
+
+from zmq.tests import BaseZMQTestCase, have_gevent, GreenTest
+
+
+x = b' '
+class TestPair(BaseZMQTestCase):
+
+ def test_basic(self):
+ s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
+
+ msg1 = b'message1'
+ msg2 = self.ping_pong(s1, s2, msg1)
+ self.assertEqual(msg1, msg2)
+
+ def test_multiple(self):
+ s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
+
+ for i in range(10):
+ msg = i*x
+ s1.send(msg)
+
+ for i in range(10):
+ msg = i*x
+ s2.send(msg)
+
+ for i in range(10):
+ msg = s1.recv()
+ self.assertEqual(msg, i*x)
+
+ for i in range(10):
+ msg = s2.recv()
+ self.assertEqual(msg, i*x)
+
+ def test_json(self):
+ s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
+ o = dict(a=10,b=list(range(10)))
+ o2 = self.ping_pong_json(s1, s2, o)
+
+ def test_pyobj(self):
+ s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
+ o = dict(a=10,b=range(10))
+ o2 = self.ping_pong_pyobj(s1, s2, o)
+
+if have_gevent:
+ class TestReqRepGreen(GreenTest, TestPair):
+ pass
+
diff --git a/external_libs/python/zmq/tests/test_poll.py b/external_libs/python/zmq/tests/test_poll.py
new file mode 100644
index 00000000..57346c89
--- /dev/null
+++ b/external_libs/python/zmq/tests/test_poll.py
@@ -0,0 +1,229 @@
+# Copyright (C) PyZMQ Developers
+# Distributed under the terms of the Modified BSD License.
+
+
+import time
+from unittest import TestCase
+
+import zmq
+
+from zmq.tests import PollZMQTestCase, have_gevent, GreenTest
+
+def wait():
+ time.sleep(.25)
+
+
+class TestPoll(PollZMQTestCase):
+
+ Poller = zmq.Poller
+
+ # This test is failing due to this issue:
+ # http://github.com/sustrik/zeromq2/issues#issue/26
+ def test_pair(self):
+ s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
+
+ # Sleep to allow sockets to connect.
+ wait()
+
+ poller = self.Poller()
+ poller.register(s1, zmq.POLLIN|zmq.POLLOUT)
+ poller.register(s2, zmq.POLLIN|zmq.POLLOUT)
+ # Poll result should contain both sockets
+ socks = dict(poller.poll())
+ # Now make sure that both are send ready.
+ self.assertEqual(socks[s1], zmq.POLLOUT)
+ self.assertEqual(socks[s2], zmq.POLLOUT)
+ # Now do a send on both, wait and test for zmq.POLLOUT|zmq.POLLIN
+ s1.send(b'msg1')
+ s2.send(b'msg2')
+ wait()
+ socks = dict(poller.poll())
+ self.assertEqual(socks[s1], zmq.POLLOUT|zmq.POLLIN)
+ self.assertEqual(socks[s2], zmq.POLLOUT|zmq.POLLIN)
+ # Make sure that both are in POLLOUT after recv.
+ s1.recv()
+ s2.recv()
+ socks = dict(poller.poll())
+ self.assertEqual(socks[s1], zmq.POLLOUT)
+ self.assertEqual(socks[s2], zmq.POLLOUT)
+
+ poller.unregister(s1)
+ poller.unregister(s2)
+
+ # Wait for everything to finish.
+ wait()
+
+ def test_reqrep(self):
+ s1, s2 = self.create_bound_pair(zmq.REP, zmq.REQ)
+
+ # Sleep to allow sockets to connect.
+ wait()
+
+ poller = self.Poller()
+ poller.register(s1, zmq.POLLIN|zmq.POLLOUT)
+ poller.register(s2, zmq.POLLIN|zmq.POLLOUT)
+
+ # Make sure that s1 is in state 0 and s2 is in POLLOUT
+ socks = dict(poller.poll())
+ self.assertEqual(s1 in socks, 0)
+ self.assertEqual(socks[s2], zmq.POLLOUT)
+
+ # Make sure that s2 goes immediately into state 0 after send.
+ s2.send(b'msg1')
+ socks = dict(poller.poll())
+ self.assertEqual(s2 in socks, 0)
+
+ # Make sure that s1 goes into POLLIN state after a time.sleep().
+ time.sleep(0.5)
+ socks = dict(poller.poll())
+ self.assertEqual(socks[s1], zmq.POLLIN)
+
+ # Make sure that s1 goes into POLLOUT after recv.
+ s1.recv()
+ socks = dict(poller.poll())
+ self.assertEqual(socks[s1], zmq.POLLOUT)
+
+ # Make sure s1 goes into state 0 after send.
+ s1.send(b'msg2')
+ socks = dict(poller.poll())
+ self.assertEqual(s1 in socks, 0)
+
+ # Wait and then see that s2 is in POLLIN.
+ time.sleep(0.5)
+ socks = dict(poller.poll())
+ self.assertEqual(socks[s2], zmq.POLLIN)
+
+ # Make sure that s2 is in POLLOUT after recv.
+ s2.recv()
+ socks = dict(poller.poll())
+ self.assertEqual(socks[s2], zmq.POLLOUT)
+
+ poller.unregister(s1)
+ poller.unregister(s2)
+
+ # Wait for everything to finish.
+ wait()
+
+ def test_no_events(self):
+ s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
+ poller = self.Poller()
+ poller.register(s1, zmq.POLLIN|zmq.POLLOUT)
+ poller.register(s2, 0)
+ self.assertTrue(s1 in poller)
+ self.assertFalse(s2 in poller)
+ poller.register(s1, 0)
+ self.assertFalse(s1 in poller)
+
+ def test_pubsub(self):
+ s1, s2 = self.create_bound_pair(zmq.PUB, zmq.SUB)
+ s2.setsockopt(zmq.SUBSCRIBE, b'')
+
+ # Sleep to allow sockets to connect.
+ wait()
+
+ poller = self.Poller()
+ poller.register(s1, zmq.POLLIN|zmq.POLLOUT)
+ poller.register(s2, zmq.POLLIN)
+
+ # Now make sure that both are send ready.
+ socks = dict(poller.poll())
+ self.assertEqual(socks[s1], zmq.POLLOUT)
+ self.assertEqual(s2 in socks, 0)
+ # Make sure that s1 stays in POLLOUT after a send.
+ s1.send(b'msg1')
+ socks = dict(poller.poll())
+ self.assertEqual(socks[s1], zmq.POLLOUT)
+
+ # Make sure that s2 is POLLIN after waiting.
+ wait()
+ socks = dict(poller.poll())
+ self.assertEqual(socks[s2], zmq.POLLIN)
+
+ # Make sure that s2 goes into 0 after recv.
+ s2.recv()
+ socks = dict(poller.poll())
+ self.assertEqual(s2 in socks, 0)
+
+ poller.unregister(s1)
+ poller.unregister(s2)
+
+ # Wait for everything to finish.
+ wait()
+ def test_timeout(self):
+ """make sure Poller.poll timeout has the right units (milliseconds)."""
+ s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
+ poller = self.Poller()
+ poller.register(s1, zmq.POLLIN)
+ tic = time.time()
+ evt = poller.poll(.005)
+ toc = time.time()
+ self.assertTrue(toc-tic < 0.1)
+ tic = time.time()
+ evt = poller.poll(5)
+ toc = time.time()
+ self.assertTrue(toc-tic < 0.1)
+ self.assertTrue(toc-tic > .001)
+ tic = time.time()
+ evt = poller.poll(500)
+ toc = time.time()
+ self.assertTrue(toc-tic < 1)
+ self.assertTrue(toc-tic > 0.1)
+
+class TestSelect(PollZMQTestCase):
+
+ def test_pair(self):
+ s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
+
+ # Sleep to allow sockets to connect.
+ wait()
+
+ rlist, wlist, xlist = zmq.select([s1, s2], [s1, s2], [s1, s2])
+ self.assert_(s1 in wlist)
+ self.assert_(s2 in wlist)
+ self.assert_(s1 not in rlist)
+ self.assert_(s2 not in rlist)
+
+ def test_timeout(self):
+ """make sure select timeout has the right units (seconds)."""
+ s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
+ tic = time.time()
+ r,w,x = zmq.select([s1,s2],[],[],.005)
+ toc = time.time()
+ self.assertTrue(toc-tic < 1)
+ self.assertTrue(toc-tic > 0.001)
+ tic = time.time()
+ r,w,x = zmq.select([s1,s2],[],[],.25)
+ toc = time.time()
+ self.assertTrue(toc-tic < 1)
+ self.assertTrue(toc-tic > 0.1)
+
+
+if have_gevent:
+ import gevent
+ from zmq import green as gzmq
+
+ class TestPollGreen(GreenTest, TestPoll):
+ Poller = gzmq.Poller
+
+ def test_wakeup(self):
+ s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
+ poller = self.Poller()
+ poller.register(s2, zmq.POLLIN)
+
+ tic = time.time()
+ r = gevent.spawn(lambda: poller.poll(10000))
+ s = gevent.spawn(lambda: s1.send(b'msg1'))
+ r.join()
+ toc = time.time()
+ self.assertTrue(toc-tic < 1)
+
+ def test_socket_poll(self):
+ s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
+
+ tic = time.time()
+ r = gevent.spawn(lambda: s2.poll(10000))
+ s = gevent.spawn(lambda: s1.send(b'msg1'))
+ r.join()
+ toc = time.time()
+ self.assertTrue(toc-tic < 1)
+
diff --git a/external_libs/python/zmq/tests/test_pubsub.py b/external_libs/python/zmq/tests/test_pubsub.py
new file mode 100644
index 00000000..a3ee22aa
--- /dev/null
+++ b/external_libs/python/zmq/tests/test_pubsub.py
@@ -0,0 +1,41 @@
+# Copyright (C) PyZMQ Developers
+# Distributed under the terms of the Modified BSD License.
+
+
+import time
+from unittest import TestCase
+
+import zmq
+
+from zmq.tests import BaseZMQTestCase, have_gevent, GreenTest
+
+
+class TestPubSub(BaseZMQTestCase):
+
+ pass
+
+ # We are disabling this test while an issue is being resolved.
+ def test_basic(self):
+ s1, s2 = self.create_bound_pair(zmq.PUB, zmq.SUB)
+ s2.setsockopt(zmq.SUBSCRIBE,b'')
+ time.sleep(0.1)
+ msg1 = b'message'
+ s1.send(msg1)
+ msg2 = s2.recv() # This is blocking!
+ self.assertEqual(msg1, msg2)
+
+ def test_topic(self):
+ s1, s2 = self.create_bound_pair(zmq.PUB, zmq.SUB)
+ s2.setsockopt(zmq.SUBSCRIBE, b'x')
+ time.sleep(0.1)
+ msg1 = b'message'
+ s1.send(msg1)
+ self.assertRaisesErrno(zmq.EAGAIN, s2.recv, zmq.NOBLOCK)
+ msg1 = b'xmessage'
+ s1.send(msg1)
+ msg2 = s2.recv()
+ self.assertEqual(msg1, msg2)
+
+if have_gevent:
+ class TestPubSubGreen(GreenTest, TestPubSub):
+ pass
diff --git a/external_libs/python/zmq/tests/test_reqrep.py b/external_libs/python/zmq/tests/test_reqrep.py
new file mode 100644
index 00000000..de17f2b3
--- /dev/null
+++ b/external_libs/python/zmq/tests/test_reqrep.py
@@ -0,0 +1,62 @@
+# Copyright (C) PyZMQ Developers
+# Distributed under the terms of the Modified BSD License.
+
+
+from unittest import TestCase
+
+import zmq
+from zmq.tests import BaseZMQTestCase, have_gevent, GreenTest
+
+
+class TestReqRep(BaseZMQTestCase):
+
+ def test_basic(self):
+ s1, s2 = self.create_bound_pair(zmq.REQ, zmq.REP)
+
+ msg1 = b'message 1'
+ msg2 = self.ping_pong(s1, s2, msg1)
+ self.assertEqual(msg1, msg2)
+
+ def test_multiple(self):
+ s1, s2 = self.create_bound_pair(zmq.REQ, zmq.REP)
+
+ for i in range(10):
+ msg1 = i*b' '
+ msg2 = self.ping_pong(s1, s2, msg1)
+ self.assertEqual(msg1, msg2)
+
+ def test_bad_send_recv(self):
+ s1, s2 = self.create_bound_pair(zmq.REQ, zmq.REP)
+
+ if zmq.zmq_version() != '2.1.8':
+ # this doesn't work on 2.1.8
+ for copy in (True,False):
+ self.assertRaisesErrno(zmq.EFSM, s1.recv, copy=copy)
+ self.assertRaisesErrno(zmq.EFSM, s2.send, b'asdf', copy=copy)
+
+ # I have to have this or we die on an Abort trap.
+ msg1 = b'asdf'
+ msg2 = self.ping_pong(s1, s2, msg1)
+ self.assertEqual(msg1, msg2)
+
+ def test_json(self):
+ s1, s2 = self.create_bound_pair(zmq.REQ, zmq.REP)
+ o = dict(a=10,b=list(range(10)))
+ o2 = self.ping_pong_json(s1, s2, o)
+
+ def test_pyobj(self):
+ s1, s2 = self.create_bound_pair(zmq.REQ, zmq.REP)
+ o = dict(a=10,b=range(10))
+ o2 = self.ping_pong_pyobj(s1, s2, o)
+
+ def test_large_msg(self):
+ s1, s2 = self.create_bound_pair(zmq.REQ, zmq.REP)
+ msg1 = 10000*b'X'
+
+ for i in range(10):
+ msg2 = self.ping_pong(s1, s2, msg1)
+ self.assertEqual(msg1, msg2)
+
+if have_gevent:
+ class TestReqRepGreen(GreenTest, TestReqRep):
+ pass
diff --git a/external_libs/python/zmq/tests/test_security.py b/external_libs/python/zmq/tests/test_security.py
new file mode 100644
index 00000000..687b7e0f
--- /dev/null
+++ b/external_libs/python/zmq/tests/test_security.py
@@ -0,0 +1,212 @@
+"""Test libzmq security (libzmq >= 3.3.0)"""
+# -*- coding: utf8 -*-
+
+# Copyright (C) PyZMQ Developers
+# Distributed under the terms of the Modified BSD License.
+
+import os
+from threading import Thread
+
+import zmq
+from zmq.tests import (
+ BaseZMQTestCase, SkipTest, PYPY
+)
+from zmq.utils import z85
+
+
+USER = b"admin"
+PASS = b"password"
+
+class TestSecurity(BaseZMQTestCase):
+
+ def setUp(self):
+ if zmq.zmq_version_info() < (4,0):
+ raise SkipTest("security is new in libzmq 4.0")
+ try:
+ zmq.curve_keypair()
+ except zmq.ZMQError:
+ raise SkipTest("security requires libzmq to be linked against libsodium")
+ super(TestSecurity, self).setUp()
+
+
+ def zap_handler(self):
+ socket = self.context.socket(zmq.REP)
+ socket.bind("inproc://zeromq.zap.01")
+ try:
+ msg = self.recv_multipart(socket)
+
+ version, sequence, domain, address, identity, mechanism = msg[:6]
+ if mechanism == b'PLAIN':
+ username, password = msg[6:]
+ elif mechanism == b'CURVE':
+ key = msg[6]
+
+ self.assertEqual(version, b"1.0")
+ self.assertEqual(identity, b"IDENT")
+ reply = [version, sequence]
+ if mechanism == b'CURVE' or \
+ (mechanism == b'PLAIN' and username == USER and password == PASS) or \
+ (mechanism == b'NULL'):
+ reply.extend([
+ b"200",
+ b"OK",
+ b"anonymous",
+ b"\5Hello\0\0\0\5World",
+ ])
+ else:
+ reply.extend([
+ b"400",
+ b"Invalid username or password",
+ b"",
+ b"",
+ ])
+ socket.send_multipart(reply)
+ finally:
+ socket.close()
+
+ def start_zap(self):
+ self.zap_thread = Thread(target=self.zap_handler)
+ self.zap_thread.start()
+
+ def stop_zap(self):
+ self.zap_thread.join()
+
+ def bounce(self, server, client, test_metadata=True):
+ msg = [os.urandom(64), os.urandom(64)]
+ client.send_multipart(msg)
+ frames = self.recv_multipart(server, copy=False)
+ recvd = list(map(lambda x: x.bytes, frames))
+
+ try:
+ if test_metadata and not PYPY:
+ for frame in frames:
+ self.assertEqual(frame.get('User-Id'), 'anonymous')
+ self.assertEqual(frame.get('Hello'), 'World')
+ self.assertEqual(frame['Socket-Type'], 'DEALER')
+ except zmq.ZMQVersionError:
+ pass
+
+ self.assertEqual(recvd, msg)
+ server.send_multipart(recvd)
+ msg2 = self.recv_multipart(client)
+ self.assertEqual(msg2, msg)
+
+ def test_null(self):
+ """test NULL (default) security"""
+ server = self.socket(zmq.DEALER)
+ client = self.socket(zmq.DEALER)
+ self.assertEqual(client.MECHANISM, zmq.NULL)
+ self.assertEqual(server.mechanism, zmq.NULL)
+ self.assertEqual(client.plain_server, 0)
+ self.assertEqual(server.plain_server, 0)
+ iface = 'tcp://127.0.0.1'
+ port = server.bind_to_random_port(iface)
+ client.connect("%s:%i" % (iface, port))
+ self.bounce(server, client, False)
+
+ def test_plain(self):
+ """test PLAIN authentication"""
+ server = self.socket(zmq.DEALER)
+ server.identity = b'IDENT'
+ client = self.socket(zmq.DEALER)
+ self.assertEqual(client.plain_username, b'')
+ self.assertEqual(client.plain_password, b'')
+ client.plain_username = USER
+ client.plain_password = PASS
+ self.assertEqual(client.getsockopt(zmq.PLAIN_USERNAME), USER)
+ self.assertEqual(client.getsockopt(zmq.PLAIN_PASSWORD), PASS)
+ self.assertEqual(client.plain_server, 0)
+ self.assertEqual(server.plain_server, 0)
+ server.plain_server = True
+ self.assertEqual(server.mechanism, zmq.PLAIN)
+ self.assertEqual(client.mechanism, zmq.PLAIN)
+
+ assert not client.plain_server
+ assert server.plain_server
+
+ self.start_zap()
+
+ iface = 'tcp://127.0.0.1'
+ port = server.bind_to_random_port(iface)
+ client.connect("%s:%i" % (iface, port))
+ self.bounce(server, client)
+ self.stop_zap()
+
+ def skip_plain_inauth(self):
+ """test PLAIN failed authentication"""
+ server = self.socket(zmq.DEALER)
+ server.identity = b'IDENT'
+ client = self.socket(zmq.DEALER)
+ self.sockets.extend([server, client])
+ client.plain_username = USER
+ client.plain_password = b'incorrect'
+ server.plain_server = True
+ self.assertEqual(server.mechanism, zmq.PLAIN)
+ self.assertEqual(client.mechanism, zmq.PLAIN)
+
+ self.start_zap()
+
+ iface = 'tcp://127.0.0.1'
+ port = server.bind_to_random_port(iface)
+ client.connect("%s:%i" % (iface, port))
+ client.send(b'ping')
+ server.rcvtimeo = 250
+ self.assertRaisesErrno(zmq.EAGAIN, server.recv)
+ self.stop_zap()
+
+ def test_keypair(self):
+ """test curve_keypair"""
+ try:
+ public, secret = zmq.curve_keypair()
+ except zmq.ZMQError:
+ raise SkipTest("CURVE unsupported")
+
+ self.assertEqual(type(secret), bytes)
+ self.assertEqual(type(public), bytes)
+ self.assertEqual(len(secret), 40)
+ self.assertEqual(len(public), 40)
+
+ # verify that it is indeed Z85
+ bsecret, bpublic = [ z85.decode(key) for key in (public, secret) ]
+ self.assertEqual(type(bsecret), bytes)
+ self.assertEqual(type(bpublic), bytes)
+ self.assertEqual(len(bsecret), 32)
+ self.assertEqual(len(bpublic), 32)
+
+
+ def test_curve(self):
+ """test CURVE encryption"""
+ server = self.socket(zmq.DEALER)
+ server.identity = b'IDENT'
+ client = self.socket(zmq.DEALER)
+ self.sockets.extend([server, client])
+ try:
+ server.curve_server = True
+ except zmq.ZMQError as e:
+ # will raise EINVAL if not linked against libsodium
+ if e.errno == zmq.EINVAL:
+ raise SkipTest("CURVE unsupported")
+
+ server_public, server_secret = zmq.curve_keypair()
+ client_public, client_secret = zmq.curve_keypair()
+
+ server.curve_secretkey = server_secret
+ server.curve_publickey = server_public
+ client.curve_serverkey = server_public
+ client.curve_publickey = client_public
+ client.curve_secretkey = client_secret
+
+ self.assertEqual(server.mechanism, zmq.CURVE)
+ self.assertEqual(client.mechanism, zmq.CURVE)
+
+ self.assertEqual(server.get(zmq.CURVE_SERVER), True)
+ self.assertEqual(client.get(zmq.CURVE_SERVER), False)
+
+ self.start_zap()
+
+ iface = 'tcp://127.0.0.1'
+ port = server.bind_to_random_port(iface)
+ client.connect("%s:%i" % (iface, port))
+ self.bounce(server, client)
+ self.stop_zap()
+
diff --git a/external_libs/python/zmq/tests/test_socket.py b/external_libs/python/zmq/tests/test_socket.py
new file mode 100644
index 00000000..5c842edc
--- /dev/null
+++ b/external_libs/python/zmq/tests/test_socket.py
@@ -0,0 +1,450 @@
+# -*- coding: utf8 -*-
+# Copyright (C) PyZMQ Developers
+# Distributed under the terms of the Modified BSD License.
+
+import time
+import warnings
+
+import zmq
+from zmq.tests import (
+ BaseZMQTestCase, SkipTest, have_gevent, GreenTest, skip_pypy, skip_if
+)
+from zmq.utils.strtypes import bytes, unicode
+
+
+class TestSocket(BaseZMQTestCase):
+
+ def test_create(self):
+ ctx = self.Context()
+ s = ctx.socket(zmq.PUB)
+ # Superluminal protocol not yet implemented
+ self.assertRaisesErrno(zmq.EPROTONOSUPPORT, s.bind, 'ftl://a')
+ self.assertRaisesErrno(zmq.EPROTONOSUPPORT, s.connect, 'ftl://a')
+ self.assertRaisesErrno(zmq.EINVAL, s.bind, 'tcp://')
+ s.close()
+ del ctx
+
+ def test_context_manager(self):
+ url = 'inproc://a'
+ with self.Context() as ctx:
+ with ctx.socket(zmq.PUSH) as a:
+ a.bind(url)
+ with ctx.socket(zmq.PULL) as b:
+ b.connect(url)
+ msg = b'hi'
+ a.send(msg)
+ rcvd = self.recv(b)
+ self.assertEqual(rcvd, msg)
+ self.assertEqual(b.closed, True)
+ self.assertEqual(a.closed, True)
+ self.assertEqual(ctx.closed, True)
+
+ def test_dir(self):
+ ctx = self.Context()
+ s = ctx.socket(zmq.PUB)
+ self.assertTrue('send' in dir(s))
+ self.assertTrue('IDENTITY' in dir(s))
+ self.assertTrue('AFFINITY' in dir(s))
+ self.assertTrue('FD' in dir(s))
+ s.close()
+ ctx.term()
+
+ def test_bind_unicode(self):
+ s = self.socket(zmq.PUB)
+ p = s.bind_to_random_port(unicode("tcp://*"))
+
+ def test_connect_unicode(self):
+ s = self.socket(zmq.PUB)
+ s.connect(unicode("tcp://127.0.0.1:5555"))
+
+ def test_bind_to_random_port(self):
+ # Check that bind_to_random_port do not hide usefull exception
+ ctx = self.Context()
+ c = ctx.socket(zmq.PUB)
+ # Invalid format
+ try:
+ c.bind_to_random_port('tcp:*')
+ except zmq.ZMQError as e:
+ self.assertEqual(e.errno, zmq.EINVAL)
+ # Invalid protocol
+ try:
+ c.bind_to_random_port('rand://*')
+ except zmq.ZMQError as e:
+ self.assertEqual(e.errno, zmq.EPROTONOSUPPORT)
+
+ def test_identity(self):
+ s = self.context.socket(zmq.PULL)
+ self.sockets.append(s)
+ ident = b'identity\0\0'
+ s.identity = ident
+ self.assertEqual(s.get(zmq.IDENTITY), ident)
+
+ def test_unicode_sockopts(self):
+ """test setting/getting sockopts with unicode strings"""
+ topic = "tést"
+ if str is not unicode:
+ topic = topic.decode('utf8')
+ p,s = self.create_bound_pair(zmq.PUB, zmq.SUB)
+ self.assertEqual(s.send_unicode, s.send_unicode)
+ self.assertEqual(p.recv_unicode, p.recv_unicode)
+ self.assertRaises(TypeError, s.setsockopt, zmq.SUBSCRIBE, topic)
+ self.assertRaises(TypeError, s.setsockopt, zmq.IDENTITY, topic)
+ s.setsockopt_unicode(zmq.IDENTITY, topic, 'utf16')
+ self.assertRaises(TypeError, s.setsockopt, zmq.AFFINITY, topic)
+ s.setsockopt_unicode(zmq.SUBSCRIBE, topic)
+ self.assertRaises(TypeError, s.getsockopt_unicode, zmq.AFFINITY)
+ self.assertRaisesErrno(zmq.EINVAL, s.getsockopt_unicode, zmq.SUBSCRIBE)
+
+ identb = s.getsockopt(zmq.IDENTITY)
+ identu = identb.decode('utf16')
+ identu2 = s.getsockopt_unicode(zmq.IDENTITY, 'utf16')
+ self.assertEqual(identu, identu2)
+ time.sleep(0.1) # wait for connection/subscription
+ p.send_unicode(topic,zmq.SNDMORE)
+ p.send_unicode(topic*2, encoding='latin-1')
+ self.assertEqual(topic, s.recv_unicode())
+ self.assertEqual(topic*2, s.recv_unicode(encoding='latin-1'))
+
+ def test_int_sockopts(self):
+ "test integer sockopts"
+ v = zmq.zmq_version_info()
+ if v < (3,0):
+ default_hwm = 0
+ else:
+ default_hwm = 1000
+ p,s = self.create_bound_pair(zmq.PUB, zmq.SUB)
+ p.setsockopt(zmq.LINGER, 0)
+ self.assertEqual(p.getsockopt(zmq.LINGER), 0)
+ p.setsockopt(zmq.LINGER, -1)
+ self.assertEqual(p.getsockopt(zmq.LINGER), -1)
+ self.assertEqual(p.hwm, default_hwm)
+ p.hwm = 11
+ self.assertEqual(p.hwm, 11)
+ # p.setsockopt(zmq.EVENTS, zmq.POLLIN)
+ self.assertEqual(p.getsockopt(zmq.EVENTS), zmq.POLLOUT)
+ self.assertRaisesErrno(zmq.EINVAL, p.setsockopt,zmq.EVENTS, 2**7-1)
+ self.assertEqual(p.getsockopt(zmq.TYPE), p.socket_type)
+ self.assertEqual(p.getsockopt(zmq.TYPE), zmq.PUB)
+ self.assertEqual(s.getsockopt(zmq.TYPE), s.socket_type)
+ self.assertEqual(s.getsockopt(zmq.TYPE), zmq.SUB)
+
+ # check for overflow / wrong type:
+ errors = []
+ backref = {}
+ constants = zmq.constants
+ for name in constants.__all__:
+ value = getattr(constants, name)
+ if isinstance(value, int):
+ backref[value] = name
+ for opt in zmq.constants.int_sockopts.union(zmq.constants.int64_sockopts):
+ sopt = backref[opt]
+ if sopt.startswith((
+ 'ROUTER', 'XPUB', 'TCP', 'FAIL',
+ 'REQ_', 'CURVE_', 'PROBE_ROUTER',
+ 'IPC_FILTER', 'GSSAPI',
+ )):
+ # some sockopts are write-only
+ continue
+ try:
+ n = p.getsockopt(opt)
+ except zmq.ZMQError as e:
+ errors.append("getsockopt(zmq.%s) raised '%s'."%(sopt, e))
+ else:
+ if n > 2**31:
+ errors.append("getsockopt(zmq.%s) returned a ridiculous value."
+ " It is probably the wrong type."%sopt)
+ if errors:
+ self.fail('\n'.join([''] + errors))
+
+ def test_bad_sockopts(self):
+ """Test that appropriate errors are raised on bad socket options"""
+ s = self.context.socket(zmq.PUB)
+ self.sockets.append(s)
+ s.setsockopt(zmq.LINGER, 0)
+ # unrecognized int sockopts pass through to libzmq, and should raise EINVAL
+ self.assertRaisesErrno(zmq.EINVAL, s.setsockopt, 9999, 5)
+ self.assertRaisesErrno(zmq.EINVAL, s.getsockopt, 9999)
+ # but only int sockopts are allowed through this way, otherwise raise a TypeError
+ self.assertRaises(TypeError, s.setsockopt, 9999, b"5")
+ # some sockopts are valid in general, but not on every socket:
+ self.assertRaisesErrno(zmq.EINVAL, s.setsockopt, zmq.SUBSCRIBE, b'hi')
+
+ def test_sockopt_roundtrip(self):
+ "test set/getsockopt roundtrip."
+ p = self.context.socket(zmq.PUB)
+ self.sockets.append(p)
+ p.setsockopt(zmq.LINGER, 11)
+ self.assertEqual(p.getsockopt(zmq.LINGER), 11)
+
+ def test_send_unicode(self):
+ "test sending unicode objects"
+ a,b = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
+ self.sockets.extend([a,b])
+ u = "çπ§"
+ if str is not unicode:
+ u = u.decode('utf8')
+ self.assertRaises(TypeError, a.send, u,copy=False)
+ self.assertRaises(TypeError, a.send, u,copy=True)
+ a.send_unicode(u)
+ s = b.recv()
+ self.assertEqual(s,u.encode('utf8'))
+ self.assertEqual(s.decode('utf8'),u)
+ a.send_unicode(u,encoding='utf16')
+ s = b.recv_unicode(encoding='utf16')
+ self.assertEqual(s,u)
+
+ @skip_pypy
+ def test_tracker(self):
+ "test the MessageTracker object for tracking when zmq is done with a buffer"
+ addr = 'tcp://127.0.0.1'
+ a = self.context.socket(zmq.PUB)
+ port = a.bind_to_random_port(addr)
+ a.close()
+ iface = "%s:%i"%(addr,port)
+ a = self.context.socket(zmq.PAIR)
+ # a.setsockopt(zmq.IDENTITY, b"a")
+ b = self.context.socket(zmq.PAIR)
+ self.sockets.extend([a,b])
+ a.connect(iface)
+ time.sleep(0.1)
+ p1 = a.send(b'something', copy=False, track=True)
+ self.assertTrue(isinstance(p1, zmq.MessageTracker))
+ self.assertFalse(p1.done)
+ p2 = a.send_multipart([b'something', b'else'], copy=False, track=True)
+ self.assert_(isinstance(p2, zmq.MessageTracker))
+ self.assertEqual(p2.done, False)
+ self.assertEqual(p1.done, False)
+
+ b.bind(iface)
+ msg = b.recv_multipart()
+ for i in range(10):
+ if p1.done:
+ break
+ time.sleep(0.1)
+ self.assertEqual(p1.done, True)
+ self.assertEqual(msg, [b'something'])
+ msg = b.recv_multipart()
+ for i in range(10):
+ if p2.done:
+ break
+ time.sleep(0.1)
+ self.assertEqual(p2.done, True)
+ self.assertEqual(msg, [b'something', b'else'])
+ m = zmq.Frame(b"again", track=True)
+ self.assertEqual(m.tracker.done, False)
+ p1 = a.send(m, copy=False)
+ p2 = a.send(m, copy=False)
+ self.assertEqual(m.tracker.done, False)
+ self.assertEqual(p1.done, False)
+ self.assertEqual(p2.done, False)
+ msg = b.recv_multipart()
+ self.assertEqual(m.tracker.done, False)
+ self.assertEqual(msg, [b'again'])
+ msg = b.recv_multipart()
+ self.assertEqual(m.tracker.done, False)
+ self.assertEqual(msg, [b'again'])
+ self.assertEqual(p1.done, False)
+ self.assertEqual(p2.done, False)
+ pm = m.tracker
+ del m
+ for i in range(10):
+ if p1.done:
+ break
+ time.sleep(0.1)
+ self.assertEqual(p1.done, True)
+ self.assertEqual(p2.done, True)
+ m = zmq.Frame(b'something', track=False)
+ self.assertRaises(ValueError, a.send, m, copy=False, track=True)
+
+
+ def test_close(self):
+ ctx = self.Context()
+ s = ctx.socket(zmq.PUB)
+ s.close()
+ self.assertRaisesErrno(zmq.ENOTSOCK, s.bind, b'')
+ self.assertRaisesErrno(zmq.ENOTSOCK, s.connect, b'')
+ self.assertRaisesErrno(zmq.ENOTSOCK, s.setsockopt, zmq.SUBSCRIBE, b'')
+ self.assertRaisesErrno(zmq.ENOTSOCK, s.send, b'asdf')
+ self.assertRaisesErrno(zmq.ENOTSOCK, s.recv)
+ del ctx
+
+ def test_attr(self):
+ """set setting/getting sockopts as attributes"""
+ s = self.context.socket(zmq.DEALER)
+ self.sockets.append(s)
+ linger = 10
+ s.linger = linger
+ self.assertEqual(linger, s.linger)
+ self.assertEqual(linger, s.getsockopt(zmq.LINGER))
+ self.assertEqual(s.fd, s.getsockopt(zmq.FD))
+
+ def test_bad_attr(self):
+ s = self.context.socket(zmq.DEALER)
+ self.sockets.append(s)
+ try:
+ s.apple='foo'
+ except AttributeError:
+ pass
+ else:
+ self.fail("bad setattr should have raised AttributeError")
+ try:
+ s.apple
+ except AttributeError:
+ pass
+ else:
+ self.fail("bad getattr should have raised AttributeError")
+
+ def test_subclass(self):
+ """subclasses can assign attributes"""
+ class S(zmq.Socket):
+ a = None
+ def __init__(self, *a, **kw):
+ self.a=-1
+ super(S, self).__init__(*a, **kw)
+
+ s = S(self.context, zmq.REP)
+ self.sockets.append(s)
+ self.assertEqual(s.a, -1)
+ s.a=1
+ self.assertEqual(s.a, 1)
+ a=s.a
+ self.assertEqual(a, 1)
+
+ def test_recv_multipart(self):
+ a,b = self.create_bound_pair()
+ msg = b'hi'
+ for i in range(3):
+ a.send(msg)
+ time.sleep(0.1)
+ for i in range(3):
+ self.assertEqual(b.recv_multipart(), [msg])
+
+ def test_close_after_destroy(self):
+ """s.close() after ctx.destroy() should be fine"""
+ ctx = self.Context()
+ s = ctx.socket(zmq.REP)
+ ctx.destroy()
+ # reaper is not instantaneous
+ time.sleep(1e-2)
+ s.close()
+ self.assertTrue(s.closed)
+
+ def test_poll(self):
+ a,b = self.create_bound_pair()
+ tic = time.time()
+ evt = a.poll(50)
+ self.assertEqual(evt, 0)
+ evt = a.poll(50, zmq.POLLOUT)
+ self.assertEqual(evt, zmq.POLLOUT)
+ msg = b'hi'
+ a.send(msg)
+ evt = b.poll(50)
+ self.assertEqual(evt, zmq.POLLIN)
+ msg2 = self.recv(b)
+ evt = b.poll(50)
+ self.assertEqual(evt, 0)
+ self.assertEqual(msg2, msg)
+
+ def test_ipc_path_max_length(self):
+ """IPC_PATH_MAX_LEN is a sensible value"""
+ if zmq.IPC_PATH_MAX_LEN == 0:
+ raise SkipTest("IPC_PATH_MAX_LEN undefined")
+
+ msg = "Surprising value for IPC_PATH_MAX_LEN: %s" % zmq.IPC_PATH_MAX_LEN
+ self.assertTrue(zmq.IPC_PATH_MAX_LEN > 30, msg)
+ self.assertTrue(zmq.IPC_PATH_MAX_LEN < 1025, msg)
+
+ def test_ipc_path_max_length_msg(self):
+ if zmq.IPC_PATH_MAX_LEN == 0:
+ raise SkipTest("IPC_PATH_MAX_LEN undefined")
+
+ s = self.context.socket(zmq.PUB)
+ self.sockets.append(s)
+ try:
+ s.bind('ipc://{0}'.format('a' * (zmq.IPC_PATH_MAX_LEN + 1)))
+ except zmq.ZMQError as e:
+ self.assertTrue(str(zmq.IPC_PATH_MAX_LEN) in e.strerror)
+
+ def test_hwm(self):
+ zmq3 = zmq.zmq_version_info()[0] >= 3
+ for stype in (zmq.PUB, zmq.ROUTER, zmq.SUB, zmq.REQ, zmq.DEALER):
+ s = self.context.socket(stype)
+ s.hwm = 100
+ self.assertEqual(s.hwm, 100)
+ if zmq3:
+ try:
+ self.assertEqual(s.sndhwm, 100)
+ except AttributeError:
+ pass
+ try:
+ self.assertEqual(s.rcvhwm, 100)
+ except AttributeError:
+ pass
+ s.close()
+
+ def test_shadow(self):
+ p = self.socket(zmq.PUSH)
+ p.bind("tcp://127.0.0.1:5555")
+ p2 = zmq.Socket.shadow(p.underlying)
+ self.assertEqual(p.underlying, p2.underlying)
+ s = self.socket(zmq.PULL)
+ s2 = zmq.Socket.shadow(s.underlying)
+ self.assertNotEqual(s.underlying, p.underlying)
+ self.assertEqual(s.underlying, s2.underlying)
+ s2.connect("tcp://127.0.0.1:5555")
+ sent = b'hi'
+ p2.send(sent)
+ rcvd = self.recv(s2)
+ self.assertEqual(rcvd, sent)
+
+ def test_shadow_pyczmq(self):
+ try:
+ from pyczmq import zctx, zsocket
+ except Exception:
+ raise SkipTest("Requires pyczmq")
+
+ ctx = zctx.new()
+ ca = zsocket.new(ctx, zmq.PUSH)
+ cb = zsocket.new(ctx, zmq.PULL)
+ a = zmq.Socket.shadow(ca)
+ b = zmq.Socket.shadow(cb)
+ a.bind("inproc://a")
+ b.connect("inproc://a")
+ a.send(b'hi')
+ rcvd = self.recv(b)
+ self.assertEqual(rcvd, b'hi')
+
+
+if have_gevent:
+ import gevent
+
+ class TestSocketGreen(GreenTest, TestSocket):
+ test_bad_attr = GreenTest.skip_green
+ test_close_after_destroy = GreenTest.skip_green
+
+ def test_timeout(self):
+ a,b = self.create_bound_pair()
+ g = gevent.spawn_later(0.5, lambda: a.send(b'hi'))
+ timeout = gevent.Timeout(0.1)
+ timeout.start()
+ self.assertRaises(gevent.Timeout, b.recv)
+ g.kill()
+
+ @skip_if(not hasattr(zmq, 'RCVTIMEO'))
+ def test_warn_set_timeo(self):
+ s = self.context.socket(zmq.REQ)
+ with warnings.catch_warnings(record=True) as w:
+ s.rcvtimeo = 5
+ s.close()
+ self.assertEqual(len(w), 1)
+ self.assertEqual(w[0].category, UserWarning)
+
+
+ @skip_if(not hasattr(zmq, 'SNDTIMEO'))
+ def test_warn_get_timeo(self):
+ s = self.context.socket(zmq.REQ)
+ with warnings.catch_warnings(record=True) as w:
+ s.sndtimeo
+ s.close()
+ self.assertEqual(len(w), 1)
+ self.assertEqual(w[0].category, UserWarning)
diff --git a/external_libs/python/zmq/tests/test_stopwatch.py b/external_libs/python/zmq/tests/test_stopwatch.py
new file mode 100644
index 00000000..49fb79f2
--- /dev/null
+++ b/external_libs/python/zmq/tests/test_stopwatch.py
@@ -0,0 +1,42 @@
+# -*- coding: utf8 -*-
+# Copyright (C) PyZMQ Developers
+# Distributed under the terms of the Modified BSD License.
+
+
+import sys
+import time
+
+from unittest import TestCase
+
+from zmq import Stopwatch, ZMQError
+
+if sys.version_info[0] >= 3:
+ long = int
+
+class TestStopWatch(TestCase):
+
+ def test_stop_long(self):
+ """Ensure stop returns a long int."""
+ watch = Stopwatch()
+ watch.start()
+ us = watch.stop()
+ self.assertTrue(isinstance(us, long))
+
+ def test_stop_microseconds(self):
+ """Test that stop/sleep have right units."""
+ watch = Stopwatch()
+ watch.start()
+ tic = time.time()
+ watch.sleep(1)
+ us = watch.stop()
+ toc = time.time()
+ self.assertAlmostEqual(us/1e6,(toc-tic),places=0)
+
+ def test_double_stop(self):
+ """Test error raised on multiple calls to stop."""
+ watch = Stopwatch()
+ watch.start()
+ watch.stop()
+ self.assertRaises(ZMQError, watch.stop)
+ self.assertRaises(ZMQError, watch.stop)
+
diff --git a/external_libs/python/zmq/tests/test_version.py b/external_libs/python/zmq/tests/test_version.py
new file mode 100644
index 00000000..6ebebf30
--- /dev/null
+++ b/external_libs/python/zmq/tests/test_version.py
@@ -0,0 +1,44 @@
+# Copyright (C) PyZMQ Developers
+# Distributed under the terms of the Modified BSD License.
+
+
+from unittest import TestCase
+import zmq
+from zmq.sugar import version
+
+
+class TestVersion(TestCase):
+
+ def test_pyzmq_version(self):
+ vs = zmq.pyzmq_version()
+ vs2 = zmq.__version__
+ self.assertTrue(isinstance(vs, str))
+ if zmq.__revision__:
+ self.assertEqual(vs, '@'.join(vs2, zmq.__revision__))
+ else:
+ self.assertEqual(vs, vs2)
+ if version.VERSION_EXTRA:
+ self.assertTrue(version.VERSION_EXTRA in vs)
+ self.assertTrue(version.VERSION_EXTRA in vs2)
+
+ def test_pyzmq_version_info(self):
+ info = zmq.pyzmq_version_info()
+ self.assertTrue(isinstance(info, tuple))
+ for n in info[:3]:
+ self.assertTrue(isinstance(n, int))
+ if version.VERSION_EXTRA:
+ self.assertEqual(len(info), 4)
+ self.assertEqual(info[-1], float('inf'))
+ else:
+ self.assertEqual(len(info), 3)
+
+ def test_zmq_version_info(self):
+ info = zmq.zmq_version_info()
+ self.assertTrue(isinstance(info, tuple))
+ for n in info[:3]:
+ self.assertTrue(isinstance(n, int))
+
+ def test_zmq_version(self):
+ v = zmq.zmq_version()
+ self.assertTrue(isinstance(v, str))
+
diff --git a/external_libs/python/zmq/tests/test_win32_shim.py b/external_libs/python/zmq/tests/test_win32_shim.py
new file mode 100644
index 00000000..55657bda
--- /dev/null
+++ b/external_libs/python/zmq/tests/test_win32_shim.py
@@ -0,0 +1,56 @@
+from __future__ import print_function
+
+import os
+
+from functools import wraps
+from zmq.tests import BaseZMQTestCase
+from zmq.utils.win32 import allow_interrupt
+
+
+def count_calls(f):
+ @wraps(f)
+ def _(*args, **kwds):
+ try:
+ return f(*args, **kwds)
+ finally:
+ _.__calls__ += 1
+ _.__calls__ = 0
+ return _
+
+
+class TestWindowsConsoleControlHandler(BaseZMQTestCase):
+
+ def test_handler(self):
+ @count_calls
+ def interrupt_polling():
+ print('Caught CTRL-C!')
+
+ if os.name == 'nt':
+ from ctypes import windll
+ from ctypes.wintypes import BOOL, DWORD
+
+ kernel32 = windll.LoadLibrary('kernel32')
+
+ # <http://msdn.microsoft.com/en-us/library/ms683155.aspx>
+ GenerateConsoleCtrlEvent = kernel32.GenerateConsoleCtrlEvent
+ GenerateConsoleCtrlEvent.argtypes = (DWORD, DWORD)
+ GenerateConsoleCtrlEvent.restype = BOOL
+
+ try:
+ # Simulate CTRL-C event while handler is active.
+ with allow_interrupt(interrupt_polling):
+ result = GenerateConsoleCtrlEvent(0, 0)
+ if result == 0:
+ raise WindowsError
+ except KeyboardInterrupt:
+ pass
+ else:
+ self.fail('Expecting `KeyboardInterrupt` exception!')
+
+ # Make sure our handler was called.
+ self.assertEqual(interrupt_polling.__calls__, 1)
+ else:
+ # On non-Windows systems, this utility is just a no-op!
+ with allow_interrupt(interrupt_polling):
+ pass
+ self.assertEqual(interrupt_polling.__calls__, 0)
diff --git a/external_libs/python/zmq/tests/test_z85.py b/external_libs/python/zmq/tests/test_z85.py
new file mode 100644
index 00000000..8a73cb4d
--- /dev/null
+++ b/external_libs/python/zmq/tests/test_z85.py
@@ -0,0 +1,63 @@
+# -*- coding: utf8 -*-
+"""Test Z85 encoding
+
+confirm values and roundtrip with test values from the reference implementation.
+"""
+
+# Copyright (C) PyZMQ Developers
+# Distributed under the terms of the Modified BSD License.
+
+from unittest import TestCase
+from zmq.utils import z85
+
+
+class TestZ85(TestCase):
+
+ def test_client_public(self):
+ client_public = \
+ b"\xBB\x88\x47\x1D\x65\xE2\x65\x9B" \
+ b"\x30\xC5\x5A\x53\x21\xCE\xBB\x5A" \
+ b"\xAB\x2B\x70\xA3\x98\x64\x5C\x26" \
+ b"\xDC\xA2\xB2\xFC\xB4\x3F\xC5\x18"
+ encoded = z85.encode(client_public)
+
+ self.assertEqual(encoded, b"Yne@$w-vo<fVvi]a<NY6T1ed:M$fCG*[IaLV{hID")
+ decoded = z85.decode(encoded)
+ self.assertEqual(decoded, client_public)
+
+ def test_client_secret(self):
+ client_secret = \
+ b"\x7B\xB8\x64\xB4\x89\xAF\xA3\x67" \
+ b"\x1F\xBE\x69\x10\x1F\x94\xB3\x89" \
+ b"\x72\xF2\x48\x16\xDF\xB0\x1B\x51" \
+ b"\x65\x6B\x3F\xEC\x8D\xFD\x08\x88"
+ encoded = z85.encode(client_secret)
+
+ self.assertEqual(encoded, b"D:)Q[IlAW!ahhC2ac:9*A}h:p?([4%wOTJ%JR%cs")
+ decoded = z85.decode(encoded)
+ self.assertEqual(decoded, client_secret)
+
+ def test_server_public(self):
+ server_public = \
+ b"\x54\xFC\xBA\x24\xE9\x32\x49\x96" \
+ b"\x93\x16\xFB\x61\x7C\x87\x2B\xB0" \
+ b"\xC1\xD1\xFF\x14\x80\x04\x27\xC5" \
+ b"\x94\xCB\xFA\xCF\x1B\xC2\xD6\x52"
+ encoded = z85.encode(server_public)
+
+ self.assertEqual(encoded, b"rq:rM>}U?@Lns47E1%kR.o@n%FcmmsL/@{H8]yf7")
+ decoded = z85.decode(encoded)
+ self.assertEqual(decoded, server_public)
+
+ def test_server_secret(self):
+ server_secret = \
+ b"\x8E\x0B\xDD\x69\x76\x28\xB9\x1D" \
+ b"\x8F\x24\x55\x87\xEE\x95\xC5\xB0" \
+ b"\x4D\x48\x96\x3F\x79\x25\x98\x77" \
+ b"\xB4\x9C\xD9\x06\x3A\xEA\xD3\xB7"
+ encoded = z85.encode(server_secret)
+
+ self.assertEqual(encoded, b"JTKVSB%%)wK0E.X)V>+}o?pNmC{O&4W4b!Ni{Lh6")
+ decoded = z85.decode(encoded)
+ self.assertEqual(decoded, server_secret)
+
diff --git a/external_libs/python/zmq/tests/test_zmqstream.py b/external_libs/python/zmq/tests/test_zmqstream.py
new file mode 100644
index 00000000..cdb3a171
--- /dev/null
+++ b/external_libs/python/zmq/tests/test_zmqstream.py
@@ -0,0 +1,34 @@
+# -*- coding: utf8 -*-
+# Copyright (C) PyZMQ Developers
+# Distributed under the terms of the Modified BSD License.
+
+
+import sys
+import time
+
+from unittest import TestCase
+
+import zmq
+from zmq.eventloop import ioloop, zmqstream
+
+class TestZMQStream(TestCase):
+
+ def setUp(self):
+ self.context = zmq.Context()
+ self.socket = self.context.socket(zmq.REP)
+ self.loop = ioloop.IOLoop.instance()
+ self.stream = zmqstream.ZMQStream(self.socket)
+
+ def tearDown(self):
+ self.socket.close()
+ self.context.term()
+
+ def test_callable_check(self):
+ """Ensure callable check works (py3k)."""
+
+ self.stream.on_send(lambda *args: None)
+ self.stream.on_recv(lambda *args: None)
+ self.assertRaises(AssertionError, self.stream.on_recv, 1)
+ self.assertRaises(AssertionError, self.stream.on_send, 1)
+ self.assertRaises(AssertionError, self.stream.on_recv, zmq)
+