summaryrefslogtreecommitdiffstats
path: root/src/console/zmq/tests
diff options
context:
space:
mode:
Diffstat (limited to 'src/console/zmq/tests')
-rwxr-xr-xsrc/console/zmq/tests/__init__.py211
-rwxr-xr-xsrc/console/zmq/tests/test_auth.py431
-rwxr-xr-xsrc/console/zmq/tests/test_cffi_backend.py310
-rwxr-xr-xsrc/console/zmq/tests/test_constants.py104
-rwxr-xr-xsrc/console/zmq/tests/test_context.py257
-rwxr-xr-xsrc/console/zmq/tests/test_device.py146
-rwxr-xr-xsrc/console/zmq/tests/test_error.py43
-rwxr-xr-xsrc/console/zmq/tests/test_etc.py15
-rwxr-xr-xsrc/console/zmq/tests/test_imports.py62
-rwxr-xr-xsrc/console/zmq/tests/test_ioloop.py113
-rwxr-xr-xsrc/console/zmq/tests/test_log.py116
-rwxr-xr-xsrc/console/zmq/tests/test_message.py362
-rwxr-xr-xsrc/console/zmq/tests/test_monitor.py71
-rwxr-xr-xsrc/console/zmq/tests/test_monqueue.py227
-rwxr-xr-xsrc/console/zmq/tests/test_multipart.py35
-rwxr-xr-xsrc/console/zmq/tests/test_pair.py53
-rwxr-xr-xsrc/console/zmq/tests/test_poll.py229
-rwxr-xr-xsrc/console/zmq/tests/test_pubsub.py41
-rwxr-xr-xsrc/console/zmq/tests/test_reqrep.py62
-rwxr-xr-xsrc/console/zmq/tests/test_security.py212
-rwxr-xr-xsrc/console/zmq/tests/test_socket.py450
-rwxr-xr-xsrc/console/zmq/tests/test_stopwatch.py42
-rwxr-xr-xsrc/console/zmq/tests/test_version.py44
-rwxr-xr-xsrc/console/zmq/tests/test_win32_shim.py56
-rwxr-xr-xsrc/console/zmq/tests/test_z85.py63
-rwxr-xr-xsrc/console/zmq/tests/test_zmqstream.py34
26 files changed, 0 insertions, 3789 deletions
diff --git a/src/console/zmq/tests/__init__.py b/src/console/zmq/tests/__init__.py
deleted file mode 100755
index 325a3f19..00000000
--- a/src/console/zmq/tests/__init__.py
+++ /dev/null
@@ -1,211 +0,0 @@
-# Copyright (c) PyZMQ Developers.
-# Distributed under the terms of the Modified BSD License.
-
-import functools
-import sys
-import time
-from threading import Thread
-
-from unittest import TestCase
-
-import zmq
-from zmq.utils import jsonapi
-
-try:
- import gevent
- from zmq import green as gzmq
- have_gevent = True
-except ImportError:
- have_gevent = False
-
-try:
- from unittest import SkipTest
-except ImportError:
- try:
- from nose import SkipTest
- except ImportError:
- class SkipTest(Exception):
- pass
-
-PYPY = 'PyPy' in sys.version
-
-#-----------------------------------------------------------------------------
-# skip decorators (directly from unittest)
-#-----------------------------------------------------------------------------
-
-_id = lambda x: x
-
-def skip(reason):
- """
- Unconditionally skip a test.
- """
- def decorator(test_item):
- if not (isinstance(test_item, type) and issubclass(test_item, TestCase)):
- @functools.wraps(test_item)
- def skip_wrapper(*args, **kwargs):
- raise SkipTest(reason)
- test_item = skip_wrapper
-
- test_item.__unittest_skip__ = True
- test_item.__unittest_skip_why__ = reason
- return test_item
- return decorator
-
-def skip_if(condition, reason="Skipped"):
- """
- Skip a test if the condition is true.
- """
- if condition:
- return skip(reason)
- return _id
-
-skip_pypy = skip_if(PYPY, "Doesn't work on PyPy")
-
-#-----------------------------------------------------------------------------
-# Base test class
-#-----------------------------------------------------------------------------
-
-class BaseZMQTestCase(TestCase):
- green = False
-
- @property
- def Context(self):
- if self.green:
- return gzmq.Context
- else:
- return zmq.Context
-
- def socket(self, socket_type):
- s = self.context.socket(socket_type)
- self.sockets.append(s)
- return s
-
- def setUp(self):
- if self.green and not have_gevent:
- raise SkipTest("requires gevent")
- self.context = self.Context.instance()
- self.sockets = []
-
- def tearDown(self):
- contexts = set([self.context])
- while self.sockets:
- sock = self.sockets.pop()
- contexts.add(sock.context) # in case additional contexts are created
- sock.close(0)
- for ctx in contexts:
- t = Thread(target=ctx.term)
- t.daemon = True
- t.start()
- t.join(timeout=2)
- if t.is_alive():
- # reset Context.instance, so the failure to term doesn't corrupt subsequent tests
- zmq.sugar.context.Context._instance = None
- raise RuntimeError("context could not terminate, open sockets likely remain in test")
-
- def create_bound_pair(self, type1=zmq.PAIR, type2=zmq.PAIR, interface='tcp://127.0.0.1'):
- """Create a bound socket pair using a random port."""
- s1 = self.context.socket(type1)
- s1.setsockopt(zmq.LINGER, 0)
- port = s1.bind_to_random_port(interface)
- s2 = self.context.socket(type2)
- s2.setsockopt(zmq.LINGER, 0)
- s2.connect('%s:%s' % (interface, port))
- self.sockets.extend([s1,s2])
- return s1, s2
-
- def ping_pong(self, s1, s2, msg):
- s1.send(msg)
- msg2 = s2.recv()
- s2.send(msg2)
- msg3 = s1.recv()
- return msg3
-
- def ping_pong_json(self, s1, s2, o):
- if jsonapi.jsonmod is None:
- raise SkipTest("No json library")
- s1.send_json(o)
- o2 = s2.recv_json()
- s2.send_json(o2)
- o3 = s1.recv_json()
- return o3
-
- def ping_pong_pyobj(self, s1, s2, o):
- s1.send_pyobj(o)
- o2 = s2.recv_pyobj()
- s2.send_pyobj(o2)
- o3 = s1.recv_pyobj()
- return o3
-
- def assertRaisesErrno(self, errno, func, *args, **kwargs):
- try:
- func(*args, **kwargs)
- except zmq.ZMQError as e:
- self.assertEqual(e.errno, errno, "wrong error raised, expected '%s' \
-got '%s'" % (zmq.ZMQError(errno), zmq.ZMQError(e.errno)))
- else:
- self.fail("Function did not raise any error")
-
- def _select_recv(self, multipart, socket, **kwargs):
- """call recv[_multipart] in a way that raises if there is nothing to receive"""
- if zmq.zmq_version_info() >= (3,1,0):
- # zmq 3.1 has a bug, where poll can return false positives,
- # so we wait a little bit just in case
- # See LIBZMQ-280 on JIRA
- time.sleep(0.1)
-
- r,w,x = zmq.select([socket], [], [], timeout=5)
- assert len(r) > 0, "Should have received a message"
- kwargs['flags'] = zmq.DONTWAIT | kwargs.get('flags', 0)
-
- recv = socket.recv_multipart if multipart else socket.recv
- return recv(**kwargs)
-
- def recv(self, socket, **kwargs):
- """call recv in a way that raises if there is nothing to receive"""
- return self._select_recv(False, socket, **kwargs)
-
- def recv_multipart(self, socket, **kwargs):
- """call recv_multipart in a way that raises if there is nothing to receive"""
- return self._select_recv(True, socket, **kwargs)
-
-
-class PollZMQTestCase(BaseZMQTestCase):
- pass
-
-class GreenTest:
- """Mixin for making green versions of test classes"""
- green = True
-
- def assertRaisesErrno(self, errno, func, *args, **kwargs):
- if errno == zmq.EAGAIN:
- raise SkipTest("Skipping because we're green.")
- try:
- func(*args, **kwargs)
- except zmq.ZMQError:
- e = sys.exc_info()[1]
- self.assertEqual(e.errno, errno, "wrong error raised, expected '%s' \
-got '%s'" % (zmq.ZMQError(errno), zmq.ZMQError(e.errno)))
- else:
- self.fail("Function did not raise any error")
-
- def tearDown(self):
- contexts = set([self.context])
- while self.sockets:
- sock = self.sockets.pop()
- contexts.add(sock.context) # in case additional contexts are created
- sock.close()
- try:
- gevent.joinall([gevent.spawn(ctx.term) for ctx in contexts], timeout=2, raise_error=True)
- except gevent.Timeout:
- raise RuntimeError("context could not terminate, open sockets likely remain in test")
-
- def skip_green(self):
- raise SkipTest("Skipping because we are green")
-
-def skip_green(f):
- def skipping_test(self, *args, **kwargs):
- if self.green:
- raise SkipTest("Skipping because we are green")
- else:
- return f(self, *args, **kwargs)
- return skipping_test
diff --git a/src/console/zmq/tests/test_auth.py b/src/console/zmq/tests/test_auth.py
deleted file mode 100755
index d350f61f..00000000
--- a/src/console/zmq/tests/test_auth.py
+++ /dev/null
@@ -1,431 +0,0 @@
-# -*- coding: utf8 -*-
-
-# Copyright (C) PyZMQ Developers
-# Distributed under the terms of the Modified BSD License.
-
-import logging
-import os
-import shutil
-import sys
-import tempfile
-
-import zmq.auth
-from zmq.auth.ioloop import IOLoopAuthenticator
-from zmq.auth.thread import ThreadAuthenticator
-
-from zmq.eventloop import ioloop, zmqstream
-from zmq.tests import (BaseZMQTestCase, SkipTest)
-
-class BaseAuthTestCase(BaseZMQTestCase):
- def setUp(self):
- if zmq.zmq_version_info() < (4,0):
- raise SkipTest("security is new in libzmq 4.0")
- try:
- zmq.curve_keypair()
- except zmq.ZMQError:
- raise SkipTest("security requires libzmq to be linked against libsodium")
- super(BaseAuthTestCase, self).setUp()
- # enable debug logging while we run tests
- logging.getLogger('zmq.auth').setLevel(logging.DEBUG)
- self.auth = self.make_auth()
- self.auth.start()
- self.base_dir, self.public_keys_dir, self.secret_keys_dir = self.create_certs()
-
- def make_auth(self):
- raise NotImplementedError()
-
- def tearDown(self):
- if self.auth:
- self.auth.stop()
- self.auth = None
- self.remove_certs(self.base_dir)
- super(BaseAuthTestCase, self).tearDown()
-
- def create_certs(self):
- """Create CURVE certificates for a test"""
-
- # Create temporary CURVE keypairs for this test run. We create all keys in a
- # temp directory and then move them into the appropriate private or public
- # directory.
-
- base_dir = tempfile.mkdtemp()
- keys_dir = os.path.join(base_dir, 'certificates')
- public_keys_dir = os.path.join(base_dir, 'public_keys')
- secret_keys_dir = os.path.join(base_dir, 'private_keys')
-
- os.mkdir(keys_dir)
- os.mkdir(public_keys_dir)
- os.mkdir(secret_keys_dir)
-
- server_public_file, server_secret_file = zmq.auth.create_certificates(keys_dir, "server")
- client_public_file, client_secret_file = zmq.auth.create_certificates(keys_dir, "client")
-
- for key_file in os.listdir(keys_dir):
- if key_file.endswith(".key"):
- shutil.move(os.path.join(keys_dir, key_file),
- os.path.join(public_keys_dir, '.'))
-
- for key_file in os.listdir(keys_dir):
- if key_file.endswith(".key_secret"):
- shutil.move(os.path.join(keys_dir, key_file),
- os.path.join(secret_keys_dir, '.'))
-
- return (base_dir, public_keys_dir, secret_keys_dir)
-
- def remove_certs(self, base_dir):
- """Remove certificates for a test"""
- shutil.rmtree(base_dir)
-
- def load_certs(self, secret_keys_dir):
- """Return server and client certificate keys"""
- server_secret_file = os.path.join(secret_keys_dir, "server.key_secret")
- client_secret_file = os.path.join(secret_keys_dir, "client.key_secret")
-
- server_public, server_secret = zmq.auth.load_certificate(server_secret_file)
- client_public, client_secret = zmq.auth.load_certificate(client_secret_file)
-
- return server_public, server_secret, client_public, client_secret
-
-
-class TestThreadAuthentication(BaseAuthTestCase):
- """Test authentication running in a thread"""
-
- def make_auth(self):
- return ThreadAuthenticator(self.context)
-
- def can_connect(self, server, client):
- """Check if client can connect to server using tcp transport"""
- result = False
- iface = 'tcp://127.0.0.1'
- port = server.bind_to_random_port(iface)
- client.connect("%s:%i" % (iface, port))
- msg = [b"Hello World"]
- server.send_multipart(msg)
- if client.poll(1000):
- rcvd_msg = client.recv_multipart()
- self.assertEqual(rcvd_msg, msg)
- result = True
- return result
-
- def test_null(self):
- """threaded auth - NULL"""
- # A default NULL connection should always succeed, and not
- # go through our authentication infrastructure at all.
- self.auth.stop()
- self.auth = None
-
- server = self.socket(zmq.PUSH)
- client = self.socket(zmq.PULL)
- self.assertTrue(self.can_connect(server, client))
-
- # By setting a domain we switch on authentication for NULL sockets,
- # though no policies are configured yet. The client connection
- # should still be allowed.
- server = self.socket(zmq.PUSH)
- server.zap_domain = b'global'
- client = self.socket(zmq.PULL)
- self.assertTrue(self.can_connect(server, client))
-
- def test_blacklist(self):
- """threaded auth - Blacklist"""
- # Blacklist 127.0.0.1, connection should fail
- self.auth.deny('127.0.0.1')
- server = self.socket(zmq.PUSH)
- # By setting a domain we switch on authentication for NULL sockets,
- # though no policies are configured yet.
- server.zap_domain = b'global'
- client = self.socket(zmq.PULL)
- self.assertFalse(self.can_connect(server, client))
-
- def test_whitelist(self):
- """threaded auth - Whitelist"""
- # Whitelist 127.0.0.1, connection should pass"
- self.auth.allow('127.0.0.1')
- server = self.socket(zmq.PUSH)
- # By setting a domain we switch on authentication for NULL sockets,
- # though no policies are configured yet.
- server.zap_domain = b'global'
- client = self.socket(zmq.PULL)
- self.assertTrue(self.can_connect(server, client))
-
- def test_plain(self):
- """threaded auth - PLAIN"""
-
- # Try PLAIN authentication - without configuring server, connection should fail
- server = self.socket(zmq.PUSH)
- server.plain_server = True
- client = self.socket(zmq.PULL)
- client.plain_username = b'admin'
- client.plain_password = b'Password'
- self.assertFalse(self.can_connect(server, client))
-
- # Try PLAIN authentication - with server configured, connection should pass
- server = self.socket(zmq.PUSH)
- server.plain_server = True
- client = self.socket(zmq.PULL)
- client.plain_username = b'admin'
- client.plain_password = b'Password'
- self.auth.configure_plain(domain='*', passwords={'admin': 'Password'})
- self.assertTrue(self.can_connect(server, client))
-
- # Try PLAIN authentication - with bogus credentials, connection should fail
- server = self.socket(zmq.PUSH)
- server.plain_server = True
- client = self.socket(zmq.PULL)
- client.plain_username = b'admin'
- client.plain_password = b'Bogus'
- self.assertFalse(self.can_connect(server, client))
-
- # Remove authenticator and check that a normal connection works
- self.auth.stop()
- self.auth = None
-
- server = self.socket(zmq.PUSH)
- client = self.socket(zmq.PULL)
- self.assertTrue(self.can_connect(server, client))
- client.close()
- server.close()
-
- def test_curve(self):
- """threaded auth - CURVE"""
- self.auth.allow('127.0.0.1')
- certs = self.load_certs(self.secret_keys_dir)
- server_public, server_secret, client_public, client_secret = certs
-
- #Try CURVE authentication - without configuring server, connection should fail
- server = self.socket(zmq.PUSH)
- server.curve_publickey = server_public
- server.curve_secretkey = server_secret
- server.curve_server = True
- client = self.socket(zmq.PULL)
- client.curve_publickey = client_public
- client.curve_secretkey = client_secret
- client.curve_serverkey = server_public
- self.assertFalse(self.can_connect(server, client))
-
- #Try CURVE authentication - with server configured to CURVE_ALLOW_ANY, connection should pass
- self.auth.configure_curve(domain='*', location=zmq.auth.CURVE_ALLOW_ANY)
- server = self.socket(zmq.PUSH)
- server.curve_publickey = server_public
- server.curve_secretkey = server_secret
- server.curve_server = True
- client = self.socket(zmq.PULL)
- client.curve_publickey = client_public
- client.curve_secretkey = client_secret
- client.curve_serverkey = server_public
- self.assertTrue(self.can_connect(server, client))
-
- # Try CURVE authentication - with server configured, connection should pass
- self.auth.configure_curve(domain='*', location=self.public_keys_dir)
- server = self.socket(zmq.PUSH)
- server.curve_publickey = server_public
- server.curve_secretkey = server_secret
- server.curve_server = True
- client = self.socket(zmq.PULL)
- client.curve_publickey = client_public
- client.curve_secretkey = client_secret
- client.curve_serverkey = server_public
- self.assertTrue(self.can_connect(server, client))
-
- # Remove authenticator and check that a normal connection works
- self.auth.stop()
- self.auth = None
-
- # Try connecting using NULL and no authentication enabled, connection should pass
- server = self.socket(zmq.PUSH)
- client = self.socket(zmq.PULL)
- self.assertTrue(self.can_connect(server, client))
-
-
-def with_ioloop(method, expect_success=True):
- """decorator for running tests with an IOLoop"""
- def test_method(self):
- r = method(self)
-
- loop = self.io_loop
- if expect_success:
- self.pullstream.on_recv(self.on_message_succeed)
- else:
- self.pullstream.on_recv(self.on_message_fail)
-
- t = loop.time()
- loop.add_callback(self.attempt_connection)
- loop.add_callback(self.send_msg)
- if expect_success:
- loop.add_timeout(t + 1, self.on_test_timeout_fail)
- else:
- loop.add_timeout(t + 1, self.on_test_timeout_succeed)
-
- loop.start()
- if self.fail_msg:
- self.fail(self.fail_msg)
-
- return r
- return test_method
-
-def should_auth(method):
- return with_ioloop(method, True)
-
-def should_not_auth(method):
- return with_ioloop(method, False)
-
-class TestIOLoopAuthentication(BaseAuthTestCase):
- """Test authentication running in ioloop"""
-
- def setUp(self):
- self.fail_msg = None
- self.io_loop = ioloop.IOLoop()
- super(TestIOLoopAuthentication, self).setUp()
- self.server = self.socket(zmq.PUSH)
- self.client = self.socket(zmq.PULL)
- self.pushstream = zmqstream.ZMQStream(self.server, self.io_loop)
- self.pullstream = zmqstream.ZMQStream(self.client, self.io_loop)
-
- def make_auth(self):
- return IOLoopAuthenticator(self.context, io_loop=self.io_loop)
-
- def tearDown(self):
- if self.auth:
- self.auth.stop()
- self.auth = None
- self.io_loop.close(all_fds=True)
- super(TestIOLoopAuthentication, self).tearDown()
-
- def attempt_connection(self):
- """Check if client can connect to server using tcp transport"""
- iface = 'tcp://127.0.0.1'
- port = self.server.bind_to_random_port(iface)
- self.client.connect("%s:%i" % (iface, port))
-
- def send_msg(self):
- """Send a message from server to a client"""
- msg = [b"Hello World"]
- self.pushstream.send_multipart(msg)
-
- def on_message_succeed(self, frames):
- """A message was received, as expected."""
- if frames != [b"Hello World"]:
- self.fail_msg = "Unexpected message received"
- self.io_loop.stop()
-
- def on_message_fail(self, frames):
- """A message was received, unexpectedly."""
- self.fail_msg = 'Received messaged unexpectedly, security failed'
- self.io_loop.stop()
-
- def on_test_timeout_succeed(self):
- """Test timer expired, indicates test success"""
- self.io_loop.stop()
-
- def on_test_timeout_fail(self):
- """Test timer expired, indicates test failure"""
- self.fail_msg = 'Test timed out'
- self.io_loop.stop()
-
- @should_auth
- def test_none(self):
- """ioloop auth - NONE"""
- # A default NULL connection should always succeed, and not
- # go through our authentication infrastructure at all.
- # no auth should be running
- self.auth.stop()
- self.auth = None
-
- @should_auth
- def test_null(self):
- """ioloop auth - NULL"""
- # By setting a domain we switch on authentication for NULL sockets,
- # though no policies are configured yet. The client connection
- # should still be allowed.
- self.server.zap_domain = b'global'
-
- @should_not_auth
- def test_blacklist(self):
- """ioloop auth - Blacklist"""
- # Blacklist 127.0.0.1, connection should fail
- self.auth.deny('127.0.0.1')
- self.server.zap_domain = b'global'
-
- @should_auth
- def test_whitelist(self):
- """ioloop auth - Whitelist"""
- # Whitelist 127.0.0.1, which overrides the blacklist, connection should pass"
- self.auth.allow('127.0.0.1')
-
- self.server.setsockopt(zmq.ZAP_DOMAIN, b'global')
-
- @should_not_auth
- def test_plain_unconfigured_server(self):
- """ioloop auth - PLAIN, unconfigured server"""
- self.client.plain_username = b'admin'
- self.client.plain_password = b'Password'
- # Try PLAIN authentication - without configuring server, connection should fail
- self.server.plain_server = True
-
- @should_auth
- def test_plain_configured_server(self):
- """ioloop auth - PLAIN, configured server"""
- self.client.plain_username = b'admin'
- self.client.plain_password = b'Password'
- # Try PLAIN authentication - with server configured, connection should pass
- self.server.plain_server = True
- self.auth.configure_plain(domain='*', passwords={'admin': 'Password'})
-
- @should_not_auth
- def test_plain_bogus_credentials(self):
- """ioloop auth - PLAIN, bogus credentials"""
- self.client.plain_username = b'admin'
- self.client.plain_password = b'Bogus'
- self.server.plain_server = True
-
- self.auth.configure_plain(domain='*', passwords={'admin': 'Password'})
-
- @should_not_auth
- def test_curve_unconfigured_server(self):
- """ioloop auth - CURVE, unconfigured server"""
- certs = self.load_certs(self.secret_keys_dir)
- server_public, server_secret, client_public, client_secret = certs
-
- self.auth.allow('127.0.0.1')
-
- self.server.curve_publickey = server_public
- self.server.curve_secretkey = server_secret
- self.server.curve_server = True
-
- self.client.curve_publickey = client_public
- self.client.curve_secretkey = client_secret
- self.client.curve_serverkey = server_public
-
- @should_auth
- def test_curve_allow_any(self):
- """ioloop auth - CURVE, CURVE_ALLOW_ANY"""
- certs = self.load_certs(self.secret_keys_dir)
- server_public, server_secret, client_public, client_secret = certs
-
- self.auth.allow('127.0.0.1')
- self.auth.configure_curve(domain='*', location=zmq.auth.CURVE_ALLOW_ANY)
-
- self.server.curve_publickey = server_public
- self.server.curve_secretkey = server_secret
- self.server.curve_server = True
-
- self.client.curve_publickey = client_public
- self.client.curve_secretkey = client_secret
- self.client.curve_serverkey = server_public
-
- @should_auth
- def test_curve_configured_server(self):
- """ioloop auth - CURVE, configured server"""
- self.auth.allow('127.0.0.1')
- certs = self.load_certs(self.secret_keys_dir)
- server_public, server_secret, client_public, client_secret = certs
-
- self.auth.configure_curve(domain='*', location=self.public_keys_dir)
-
- self.server.curve_publickey = server_public
- self.server.curve_secretkey = server_secret
- self.server.curve_server = True
-
- self.client.curve_publickey = client_public
- self.client.curve_secretkey = client_secret
- self.client.curve_serverkey = server_public
diff --git a/src/console/zmq/tests/test_cffi_backend.py b/src/console/zmq/tests/test_cffi_backend.py
deleted file mode 100755
index 1f85eebf..00000000
--- a/src/console/zmq/tests/test_cffi_backend.py
+++ /dev/null
@@ -1,310 +0,0 @@
-# -*- coding: utf8 -*-
-
-import sys
-import time
-
-from unittest import TestCase
-
-from zmq.tests import BaseZMQTestCase, SkipTest
-
-try:
- from zmq.backend.cffi import (
- zmq_version_info,
- PUSH, PULL, IDENTITY,
- REQ, REP, POLLIN, POLLOUT,
- )
- from zmq.backend.cffi._cffi import ffi, C
- have_ffi_backend = True
-except ImportError:
- have_ffi_backend = False
-
-
-class TestCFFIBackend(TestCase):
-
- def setUp(self):
- if not have_ffi_backend or not 'PyPy' in sys.version:
- raise SkipTest('PyPy Tests Only')
-
- def test_zmq_version_info(self):
- version = zmq_version_info()
-
- assert version[0] in range(2,11)
-
- def test_zmq_ctx_new_destroy(self):
- ctx = C.zmq_ctx_new()
-
- assert ctx != ffi.NULL
- assert 0 == C.zmq_ctx_destroy(ctx)
-
- def test_zmq_socket_open_close(self):
- ctx = C.zmq_ctx_new()
- socket = C.zmq_socket(ctx, PUSH)
-
- assert ctx != ffi.NULL
- assert ffi.NULL != socket
- assert 0 == C.zmq_close(socket)
- assert 0 == C.zmq_ctx_destroy(ctx)
-
- def test_zmq_setsockopt(self):
- ctx = C.zmq_ctx_new()
- socket = C.zmq_socket(ctx, PUSH)
-
- identity = ffi.new('char[3]', 'zmq')
- ret = C.zmq_setsockopt(socket, IDENTITY, ffi.cast('void*', identity), 3)
-
- assert ret == 0
- assert ctx != ffi.NULL
- assert ffi.NULL != socket
- assert 0 == C.zmq_close(socket)
- assert 0 == C.zmq_ctx_destroy(ctx)
-
- def test_zmq_getsockopt(self):
- ctx = C.zmq_ctx_new()
- socket = C.zmq_socket(ctx, PUSH)
-
- identity = ffi.new('char[]', 'zmq')
- ret = C.zmq_setsockopt(socket, IDENTITY, ffi.cast('void*', identity), 3)
- assert ret == 0
-
- option_len = ffi.new('size_t*', 3)
- option = ffi.new('char*')
- ret = C.zmq_getsockopt(socket,
- IDENTITY,
- ffi.cast('void*', option),
- option_len)
-
- assert ret == 0
- assert ffi.string(ffi.cast('char*', option))[0] == "z"
- assert ffi.string(ffi.cast('char*', option))[1] == "m"
- assert ffi.string(ffi.cast('char*', option))[2] == "q"
- assert ctx != ffi.NULL
- assert ffi.NULL != socket
- assert 0 == C.zmq_close(socket)
- assert 0 == C.zmq_ctx_destroy(ctx)
-
- def test_zmq_bind(self):
- ctx = C.zmq_ctx_new()
- socket = C.zmq_socket(ctx, 8)
-
- assert 0 == C.zmq_bind(socket, 'tcp://*:4444')
- assert ctx != ffi.NULL
- assert ffi.NULL != socket
- assert 0 == C.zmq_close(socket)
- assert 0 == C.zmq_ctx_destroy(ctx)
-
- def test_zmq_bind_connect(self):
- ctx = C.zmq_ctx_new()
-
- socket1 = C.zmq_socket(ctx, PUSH)
- socket2 = C.zmq_socket(ctx, PULL)
-
- assert 0 == C.zmq_bind(socket1, 'tcp://*:4444')
- assert 0 == C.zmq_connect(socket2, 'tcp://127.0.0.1:4444')
- assert ctx != ffi.NULL
- assert ffi.NULL != socket1
- assert ffi.NULL != socket2
- assert 0 == C.zmq_close(socket1)
- assert 0 == C.zmq_close(socket2)
- assert 0 == C.zmq_ctx_destroy(ctx)
-
- def test_zmq_msg_init_close(self):
- zmq_msg = ffi.new('zmq_msg_t*')
-
- assert ffi.NULL != zmq_msg
- assert 0 == C.zmq_msg_init(zmq_msg)
- assert 0 == C.zmq_msg_close(zmq_msg)
-
- def test_zmq_msg_init_size(self):
- zmq_msg = ffi.new('zmq_msg_t*')
-
- assert ffi.NULL != zmq_msg
- assert 0 == C.zmq_msg_init_size(zmq_msg, 10)
- assert 0 == C.zmq_msg_close(zmq_msg)
-
- def test_zmq_msg_init_data(self):
- zmq_msg = ffi.new('zmq_msg_t*')
- message = ffi.new('char[5]', 'Hello')
-
- assert 0 == C.zmq_msg_init_data(zmq_msg,
- ffi.cast('void*', message),
- 5,
- ffi.NULL,
- ffi.NULL)
-
- assert ffi.NULL != zmq_msg
- assert 0 == C.zmq_msg_close(zmq_msg)
-
- def test_zmq_msg_data(self):
- zmq_msg = ffi.new('zmq_msg_t*')
- message = ffi.new('char[]', 'Hello')
- assert 0 == C.zmq_msg_init_data(zmq_msg,
- ffi.cast('void*', message),
- 5,
- ffi.NULL,
- ffi.NULL)
-
- data = C.zmq_msg_data(zmq_msg)
-
- assert ffi.NULL != zmq_msg
- assert ffi.string(ffi.cast("char*", data)) == 'Hello'
- assert 0 == C.zmq_msg_close(zmq_msg)
-
-
- def test_zmq_send(self):
- ctx = C.zmq_ctx_new()
-
- sender = C.zmq_socket(ctx, REQ)
- receiver = C.zmq_socket(ctx, REP)
-
- assert 0 == C.zmq_bind(receiver, 'tcp://*:7777')
- assert 0 == C.zmq_connect(sender, 'tcp://127.0.0.1:7777')
-
- time.sleep(0.1)
-
- zmq_msg = ffi.new('zmq_msg_t*')
- message = ffi.new('char[5]', 'Hello')
-
- C.zmq_msg_init_data(zmq_msg,
- ffi.cast('void*', message),
- ffi.cast('size_t', 5),
- ffi.NULL,
- ffi.NULL)
-
- assert 5 == C.zmq_msg_send(zmq_msg, sender, 0)
- assert 0 == C.zmq_msg_close(zmq_msg)
- assert C.zmq_close(sender) == 0
- assert C.zmq_close(receiver) == 0
- assert C.zmq_ctx_destroy(ctx) == 0
-
- def test_zmq_recv(self):
- ctx = C.zmq_ctx_new()
-
- sender = C.zmq_socket(ctx, REQ)
- receiver = C.zmq_socket(ctx, REP)
-
- assert 0 == C.zmq_bind(receiver, 'tcp://*:2222')
- assert 0 == C.zmq_connect(sender, 'tcp://127.0.0.1:2222')
-
- time.sleep(0.1)
-
- zmq_msg = ffi.new('zmq_msg_t*')
- message = ffi.new('char[5]', 'Hello')
-
- C.zmq_msg_init_data(zmq_msg,
- ffi.cast('void*', message),
- ffi.cast('size_t', 5),
- ffi.NULL,
- ffi.NULL)
-
- zmq_msg2 = ffi.new('zmq_msg_t*')
- C.zmq_msg_init(zmq_msg2)
-
- assert 5 == C.zmq_msg_send(zmq_msg, sender, 0)
- assert 5 == C.zmq_msg_recv(zmq_msg2, receiver, 0)
- assert 5 == C.zmq_msg_size(zmq_msg2)
- assert b"Hello" == ffi.buffer(C.zmq_msg_data(zmq_msg2),
- C.zmq_msg_size(zmq_msg2))[:]
- assert C.zmq_close(sender) == 0
- assert C.zmq_close(receiver) == 0
- assert C.zmq_ctx_destroy(ctx) == 0
-
- def test_zmq_poll(self):
- ctx = C.zmq_ctx_new()
-
- sender = C.zmq_socket(ctx, REQ)
- receiver = C.zmq_socket(ctx, REP)
-
- r1 = C.zmq_bind(receiver, 'tcp://*:3333')
- r2 = C.zmq_connect(sender, 'tcp://127.0.0.1:3333')
-
- zmq_msg = ffi.new('zmq_msg_t*')
- message = ffi.new('char[5]', 'Hello')
-
- C.zmq_msg_init_data(zmq_msg,
- ffi.cast('void*', message),
- ffi.cast('size_t', 5),
- ffi.NULL,
- ffi.NULL)
-
- receiver_pollitem = ffi.new('zmq_pollitem_t*')
- receiver_pollitem.socket = receiver
- receiver_pollitem.fd = 0
- receiver_pollitem.events = POLLIN | POLLOUT
- receiver_pollitem.revents = 0
-
- ret = C.zmq_poll(ffi.NULL, 0, 0)
- assert ret == 0
-
- ret = C.zmq_poll(receiver_pollitem, 1, 0)
- assert ret == 0
-
- ret = C.zmq_msg_send(zmq_msg, sender, 0)
- print(ffi.string(C.zmq_strerror(C.zmq_errno())))
- assert ret == 5
-
- time.sleep(0.2)
-
- ret = C.zmq_poll(receiver_pollitem, 1, 0)
- assert ret == 1
-
- assert int(receiver_pollitem.revents) & POLLIN
- assert not int(receiver_pollitem.revents) & POLLOUT
-
- zmq_msg2 = ffi.new('zmq_msg_t*')
- C.zmq_msg_init(zmq_msg2)
-
- ret_recv = C.zmq_msg_recv(zmq_msg2, receiver, 0)
- assert ret_recv == 5
-
- assert 5 == C.zmq_msg_size(zmq_msg2)
- assert b"Hello" == ffi.buffer(C.zmq_msg_data(zmq_msg2),
- C.zmq_msg_size(zmq_msg2))[:]
-
- sender_pollitem = ffi.new('zmq_pollitem_t*')
- sender_pollitem.socket = sender
- sender_pollitem.fd = 0
- sender_pollitem.events = POLLIN | POLLOUT
- sender_pollitem.revents = 0
-
- ret = C.zmq_poll(sender_pollitem, 1, 0)
- assert ret == 0
-
- zmq_msg_again = ffi.new('zmq_msg_t*')
- message_again = ffi.new('char[11]', 'Hello Again')
-
- C.zmq_msg_init_data(zmq_msg_again,
- ffi.cast('void*', message_again),
- ffi.cast('size_t', 11),
- ffi.NULL,
- ffi.NULL)
-
- assert 11 == C.zmq_msg_send(zmq_msg_again, receiver, 0)
-
- time.sleep(0.2)
-
- assert 0 <= C.zmq_poll(sender_pollitem, 1, 0)
- assert int(sender_pollitem.revents) & POLLIN
- assert 11 == C.zmq_msg_recv(zmq_msg2, sender, 0)
- assert 11 == C.zmq_msg_size(zmq_msg2)
- assert b"Hello Again" == ffi.buffer(C.zmq_msg_data(zmq_msg2),
- int(C.zmq_msg_size(zmq_msg2)))[:]
- assert 0 == C.zmq_close(sender)
- assert 0 == C.zmq_close(receiver)
- assert 0 == C.zmq_ctx_destroy(ctx)
- assert 0 == C.zmq_msg_close(zmq_msg)
- assert 0 == C.zmq_msg_close(zmq_msg2)
- assert 0 == C.zmq_msg_close(zmq_msg_again)
-
- def test_zmq_stopwatch_functions(self):
- stopwatch = C.zmq_stopwatch_start()
- ret = C.zmq_stopwatch_stop(stopwatch)
-
- assert ffi.NULL != stopwatch
- assert 0 < int(ret)
-
- def test_zmq_sleep(self):
- try:
- C.zmq_sleep(1)
- except Exception as e:
- raise AssertionError("Error executing zmq_sleep(int)")
-
diff --git a/src/console/zmq/tests/test_constants.py b/src/console/zmq/tests/test_constants.py
deleted file mode 100755
index d32b2b48..00000000
--- a/src/console/zmq/tests/test_constants.py
+++ /dev/null
@@ -1,104 +0,0 @@
-# Copyright (C) PyZMQ Developers
-# Distributed under the terms of the Modified BSD License.
-
-import json
-from unittest import TestCase
-
-import zmq
-
-from zmq.utils import constant_names
-from zmq.sugar import constants as sugar_constants
-from zmq.backend import constants as backend_constants
-
-all_set = set(constant_names.all_names)
-
-class TestConstants(TestCase):
-
- def _duplicate_test(self, namelist, listname):
- """test that a given list has no duplicates"""
- dupes = {}
- for name in set(namelist):
- cnt = namelist.count(name)
- if cnt > 1:
- dupes[name] = cnt
- if dupes:
- self.fail("The following names occur more than once in %s: %s" % (listname, json.dumps(dupes, indent=2)))
-
- def test_duplicate_all(self):
- return self._duplicate_test(constant_names.all_names, "all_names")
-
- def _change_key(self, change, version):
- """return changed-in key"""
- return "%s-in %d.%d.%d" % tuple([change] + list(version))
-
- def test_duplicate_changed(self):
- all_changed = []
- for change in ("new", "removed"):
- d = getattr(constant_names, change + "_in")
- for version, namelist in d.items():
- all_changed.extend(namelist)
- self._duplicate_test(namelist, self._change_key(change, version))
-
- self._duplicate_test(all_changed, "all-changed")
-
- def test_changed_in_all(self):
- missing = {}
- for change in ("new", "removed"):
- d = getattr(constant_names, change + "_in")
- for version, namelist in d.items():
- key = self._change_key(change, version)
- for name in namelist:
- if name not in all_set:
- if key not in missing:
- missing[key] = []
- missing[key].append(name)
-
- if missing:
- self.fail(
- "The following names are missing in `all_names`: %s" % json.dumps(missing, indent=2)
- )
-
- def test_no_negative_constants(self):
- for name in sugar_constants.__all__:
- self.assertNotEqual(getattr(zmq, name), sugar_constants._UNDEFINED)
-
- def test_undefined_constants(self):
- all_aliases = []
- for alias_group in sugar_constants.aliases:
- all_aliases.extend(alias_group)
-
- for name in all_set.difference(all_aliases):
- raw = getattr(backend_constants, name)
- if raw == sugar_constants._UNDEFINED:
- self.assertRaises(AttributeError, getattr, zmq, name)
- else:
- self.assertEqual(getattr(zmq, name), raw)
-
- def test_new(self):
- zmq_version = zmq.zmq_version_info()
- for version, new_names in constant_names.new_in.items():
- should_have = zmq_version >= version
- for name in new_names:
- try:
- value = getattr(zmq, name)
- except AttributeError:
- if should_have:
- self.fail("AttributeError: zmq.%s" % name)
- else:
- if not should_have:
- self.fail("Shouldn't have: zmq.%s=%s" % (name, value))
-
- def test_removed(self):
- zmq_version = zmq.zmq_version_info()
- for version, new_names in constant_names.removed_in.items():
- should_have = zmq_version < version
- for name in new_names:
- try:
- value = getattr(zmq, name)
- except AttributeError:
- if should_have:
- self.fail("AttributeError: zmq.%s" % name)
- else:
- if not should_have:
- self.fail("Shouldn't have: zmq.%s=%s" % (name, value))
-
diff --git a/src/console/zmq/tests/test_context.py b/src/console/zmq/tests/test_context.py
deleted file mode 100755
index e3280778..00000000
--- a/src/console/zmq/tests/test_context.py
+++ /dev/null
@@ -1,257 +0,0 @@
-# Copyright (C) PyZMQ Developers
-# Distributed under the terms of the Modified BSD License.
-
-import gc
-import sys
-import time
-from threading import Thread, Event
-
-import zmq
-from zmq.tests import (
- BaseZMQTestCase, have_gevent, GreenTest, skip_green, PYPY, SkipTest,
-)
-
-
-class TestContext(BaseZMQTestCase):
-
- def test_init(self):
- c1 = self.Context()
- self.assert_(isinstance(c1, self.Context))
- del c1
- c2 = self.Context()
- self.assert_(isinstance(c2, self.Context))
- del c2
- c3 = self.Context()
- self.assert_(isinstance(c3, self.Context))
- del c3
-
- def test_dir(self):
- ctx = self.Context()
- self.assertTrue('socket' in dir(ctx))
- if zmq.zmq_version_info() > (3,):
- self.assertTrue('IO_THREADS' in dir(ctx))
- ctx.term()
-
- def test_term(self):
- c = self.Context()
- c.term()
- self.assert_(c.closed)
-
- def test_context_manager(self):
- with self.Context() as c:
- pass
- self.assert_(c.closed)
-
- def test_fail_init(self):
- self.assertRaisesErrno(zmq.EINVAL, self.Context, -1)
-
- def test_term_hang(self):
- rep,req = self.create_bound_pair(zmq.ROUTER, zmq.DEALER)
- req.setsockopt(zmq.LINGER, 0)
- req.send(b'hello', copy=False)
- req.close()
- rep.close()
- self.context.term()
-
- def test_instance(self):
- ctx = self.Context.instance()
- c2 = self.Context.instance(io_threads=2)
- self.assertTrue(c2 is ctx)
- c2.term()
- c3 = self.Context.instance()
- c4 = self.Context.instance()
- self.assertFalse(c3 is c2)
- self.assertFalse(c3.closed)
- self.assertTrue(c3 is c4)
-
- def test_many_sockets(self):
- """opening and closing many sockets shouldn't cause problems"""
- ctx = self.Context()
- for i in range(16):
- sockets = [ ctx.socket(zmq.REP) for i in range(65) ]
- [ s.close() for s in sockets ]
- # give the reaper a chance
- time.sleep(1e-2)
- ctx.term()
-
- def test_sockopts(self):
- """setting socket options with ctx attributes"""
- ctx = self.Context()
- ctx.linger = 5
- self.assertEqual(ctx.linger, 5)
- s = ctx.socket(zmq.REQ)
- self.assertEqual(s.linger, 5)
- self.assertEqual(s.getsockopt(zmq.LINGER), 5)
- s.close()
- # check that subscribe doesn't get set on sockets that don't subscribe:
- ctx.subscribe = b''
- s = ctx.socket(zmq.REQ)
- s.close()
-
- ctx.term()
-
-
- def test_destroy(self):
- """Context.destroy should close sockets"""
- ctx = self.Context()
- sockets = [ ctx.socket(zmq.REP) for i in range(65) ]
-
- # close half of the sockets
- [ s.close() for s in sockets[::2] ]
-
- ctx.destroy()
- # reaper is not instantaneous
- time.sleep(1e-2)
- for s in sockets:
- self.assertTrue(s.closed)
-
- def test_destroy_linger(self):
- """Context.destroy should set linger on closing sockets"""
- req,rep = self.create_bound_pair(zmq.REQ, zmq.REP)
- req.send(b'hi')
- time.sleep(1e-2)
- self.context.destroy(linger=0)
- # reaper is not instantaneous
- time.sleep(1e-2)
- for s in (req,rep):
- self.assertTrue(s.closed)
-
- def test_term_noclose(self):
- """Context.term won't close sockets"""
- ctx = self.Context()
- s = ctx.socket(zmq.REQ)
- self.assertFalse(s.closed)
- t = Thread(target=ctx.term)
- t.start()
- t.join(timeout=0.1)
- self.assertTrue(t.is_alive(), "Context should be waiting")
- s.close()
- t.join(timeout=0.1)
- self.assertFalse(t.is_alive(), "Context should have closed")
-
- def test_gc(self):
- """test close&term by garbage collection alone"""
- if PYPY:
- raise SkipTest("GC doesn't work ")
-
- # test credit @dln (GH #137):
- def gcf():
- def inner():
- ctx = self.Context()
- s = ctx.socket(zmq.PUSH)
- inner()
- gc.collect()
- t = Thread(target=gcf)
- t.start()
- t.join(timeout=1)
- self.assertFalse(t.is_alive(), "Garbage collection should have cleaned up context")
-
- def test_cyclic_destroy(self):
- """ctx.destroy should succeed when cyclic ref prevents gc"""
- # test credit @dln (GH #137):
- class CyclicReference(object):
- def __init__(self, parent=None):
- self.parent = parent
-
- def crash(self, sock):
- self.sock = sock
- self.child = CyclicReference(self)
-
- def crash_zmq():
- ctx = self.Context()
- sock = ctx.socket(zmq.PULL)
- c = CyclicReference()
- c.crash(sock)
- ctx.destroy()
-
- crash_zmq()
-
- def test_term_thread(self):
- """ctx.term should not crash active threads (#139)"""
- ctx = self.Context()
- evt = Event()
- evt.clear()
-
- def block():
- s = ctx.socket(zmq.REP)
- s.bind_to_random_port('tcp://127.0.0.1')
- evt.set()
- try:
- s.recv()
- except zmq.ZMQError as e:
- self.assertEqual(e.errno, zmq.ETERM)
- return
- finally:
- s.close()
- self.fail("recv should have been interrupted with ETERM")
- t = Thread(target=block)
- t.start()
-
- evt.wait(1)
- self.assertTrue(evt.is_set(), "sync event never fired")
- time.sleep(0.01)
- ctx.term()
- t.join(timeout=1)
- self.assertFalse(t.is_alive(), "term should have interrupted s.recv()")
-
- def test_destroy_no_sockets(self):
- ctx = self.Context()
- s = ctx.socket(zmq.PUB)
- s.bind_to_random_port('tcp://127.0.0.1')
- s.close()
- ctx.destroy()
- assert s.closed
- assert ctx.closed
-
- def test_ctx_opts(self):
- if zmq.zmq_version_info() < (3,):
- raise SkipTest("context options require libzmq 3")
- ctx = self.Context()
- ctx.set(zmq.MAX_SOCKETS, 2)
- self.assertEqual(ctx.get(zmq.MAX_SOCKETS), 2)
- ctx.max_sockets = 100
- self.assertEqual(ctx.max_sockets, 100)
- self.assertEqual(ctx.get(zmq.MAX_SOCKETS), 100)
-
- def test_shadow(self):
- ctx = self.Context()
- ctx2 = self.Context.shadow(ctx.underlying)
- self.assertEqual(ctx.underlying, ctx2.underlying)
- s = ctx.socket(zmq.PUB)
- s.close()
- del ctx2
- self.assertFalse(ctx.closed)
- s = ctx.socket(zmq.PUB)
- ctx2 = self.Context.shadow(ctx.underlying)
- s2 = ctx2.socket(zmq.PUB)
- s.close()
- s2.close()
- ctx.term()
- self.assertRaisesErrno(zmq.EFAULT, ctx2.socket, zmq.PUB)
- del ctx2
-
- def test_shadow_pyczmq(self):
- try:
- from pyczmq import zctx, zsocket, zstr
- except Exception:
- raise SkipTest("Requires pyczmq")
-
- ctx = zctx.new()
- a = zsocket.new(ctx, zmq.PUSH)
- zsocket.bind(a, "inproc://a")
- ctx2 = self.Context.shadow_pyczmq(ctx)
- b = ctx2.socket(zmq.PULL)
- b.connect("inproc://a")
- zstr.send(a, b'hi')
- rcvd = self.recv(b)
- self.assertEqual(rcvd, b'hi')
- b.close()
-
-
-if False: # disable green context tests
- class TestContextGreen(GreenTest, TestContext):
- """gevent subclass of context tests"""
- # skip tests that use real threads:
- test_gc = GreenTest.skip_green
- test_term_thread = GreenTest.skip_green
- test_destroy_linger = GreenTest.skip_green
diff --git a/src/console/zmq/tests/test_device.py b/src/console/zmq/tests/test_device.py
deleted file mode 100755
index f8305074..00000000
--- a/src/console/zmq/tests/test_device.py
+++ /dev/null
@@ -1,146 +0,0 @@
-# Copyright (C) PyZMQ Developers
-# Distributed under the terms of the Modified BSD License.
-
-import time
-
-import zmq
-from zmq import devices
-from zmq.tests import BaseZMQTestCase, SkipTest, have_gevent, GreenTest, PYPY
-from zmq.utils.strtypes import (bytes,unicode,basestring)
-
-if PYPY:
- # cleanup of shared Context doesn't work on PyPy
- devices.Device.context_factory = zmq.Context
-
-class TestDevice(BaseZMQTestCase):
-
- def test_device_types(self):
- for devtype in (zmq.STREAMER, zmq.FORWARDER, zmq.QUEUE):
- dev = devices.Device(devtype, zmq.PAIR, zmq.PAIR)
- self.assertEqual(dev.device_type, devtype)
- del dev
-
- def test_device_attributes(self):
- dev = devices.Device(zmq.QUEUE, zmq.SUB, zmq.PUB)
- self.assertEqual(dev.in_type, zmq.SUB)
- self.assertEqual(dev.out_type, zmq.PUB)
- self.assertEqual(dev.device_type, zmq.QUEUE)
- self.assertEqual(dev.daemon, True)
- del dev
-
- def test_tsdevice_attributes(self):
- dev = devices.Device(zmq.QUEUE, zmq.SUB, zmq.PUB)
- self.assertEqual(dev.in_type, zmq.SUB)
- self.assertEqual(dev.out_type, zmq.PUB)
- self.assertEqual(dev.device_type, zmq.QUEUE)
- self.assertEqual(dev.daemon, True)
- del dev
-
-
- def test_single_socket_forwarder_connect(self):
- dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
- req = self.context.socket(zmq.REQ)
- port = req.bind_to_random_port('tcp://127.0.0.1')
- dev.connect_in('tcp://127.0.0.1:%i'%port)
- dev.start()
- time.sleep(.25)
- msg = b'hello'
- req.send(msg)
- self.assertEqual(msg, self.recv(req))
- del dev
- req.close()
- dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
- req = self.context.socket(zmq.REQ)
- port = req.bind_to_random_port('tcp://127.0.0.1')
- dev.connect_out('tcp://127.0.0.1:%i'%port)
- dev.start()
- time.sleep(.25)
- msg = b'hello again'
- req.send(msg)
- self.assertEqual(msg, self.recv(req))
- del dev
- req.close()
-
- def test_single_socket_forwarder_bind(self):
- dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
- # select random port:
- binder = self.context.socket(zmq.REQ)
- port = binder.bind_to_random_port('tcp://127.0.0.1')
- binder.close()
- time.sleep(0.1)
- req = self.context.socket(zmq.REQ)
- req.connect('tcp://127.0.0.1:%i'%port)
- dev.bind_in('tcp://127.0.0.1:%i'%port)
- dev.start()
- time.sleep(.25)
- msg = b'hello'
- req.send(msg)
- self.assertEqual(msg, self.recv(req))
- del dev
- req.close()
- dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
- # select random port:
- binder = self.context.socket(zmq.REQ)
- port = binder.bind_to_random_port('tcp://127.0.0.1')
- binder.close()
- time.sleep(0.1)
- req = self.context.socket(zmq.REQ)
- req.connect('tcp://127.0.0.1:%i'%port)
- dev.bind_in('tcp://127.0.0.1:%i'%port)
- dev.start()
- time.sleep(.25)
- msg = b'hello again'
- req.send(msg)
- self.assertEqual(msg, self.recv(req))
- del dev
- req.close()
-
- def test_proxy(self):
- if zmq.zmq_version_info() < (3,2):
- raise SkipTest("Proxies only in libzmq >= 3")
- dev = devices.ThreadProxy(zmq.PULL, zmq.PUSH, zmq.PUSH)
- binder = self.context.socket(zmq.REQ)
- iface = 'tcp://127.0.0.1'
- port = binder.bind_to_random_port(iface)
- port2 = binder.bind_to_random_port(iface)
- port3 = binder.bind_to_random_port(iface)
- binder.close()
- time.sleep(0.1)
- dev.bind_in("%s:%i" % (iface, port))
- dev.bind_out("%s:%i" % (iface, port2))
- dev.bind_mon("%s:%i" % (iface, port3))
- dev.start()
- time.sleep(0.25)
- msg = b'hello'
- push = self.context.socket(zmq.PUSH)
- push.connect("%s:%i" % (iface, port))
- pull = self.context.socket(zmq.PULL)
- pull.connect("%s:%i" % (iface, port2))
- mon = self.context.socket(zmq.PULL)
- mon.connect("%s:%i" % (iface, port3))
- push.send(msg)
- self.sockets.extend([push, pull, mon])
- self.assertEqual(msg, self.recv(pull))
- self.assertEqual(msg, self.recv(mon))
-
-if have_gevent:
- import gevent
- import zmq.green
-
- class TestDeviceGreen(GreenTest, BaseZMQTestCase):
-
- def test_green_device(self):
- rep = self.context.socket(zmq.REP)
- req = self.context.socket(zmq.REQ)
- self.sockets.extend([req, rep])
- port = rep.bind_to_random_port('tcp://127.0.0.1')
- g = gevent.spawn(zmq.green.device, zmq.QUEUE, rep, rep)
- req.connect('tcp://127.0.0.1:%i' % port)
- req.send(b'hi')
- timeout = gevent.Timeout(3)
- timeout.start()
- receiver = gevent.spawn(req.recv)
- self.assertEqual(receiver.get(2), b'hi')
- timeout.cancel()
- g.kill(block=True)
-
diff --git a/src/console/zmq/tests/test_error.py b/src/console/zmq/tests/test_error.py
deleted file mode 100755
index a2eee14a..00000000
--- a/src/console/zmq/tests/test_error.py
+++ /dev/null
@@ -1,43 +0,0 @@
-# -*- coding: utf8 -*-
-# Copyright (C) PyZMQ Developers
-# Distributed under the terms of the Modified BSD License.
-
-import sys
-import time
-
-import zmq
-from zmq import ZMQError, strerror, Again, ContextTerminated
-from zmq.tests import BaseZMQTestCase
-
-if sys.version_info[0] >= 3:
- long = int
-
-class TestZMQError(BaseZMQTestCase):
-
- def test_strerror(self):
- """test that strerror gets the right type."""
- for i in range(10):
- e = strerror(i)
- self.assertTrue(isinstance(e, str))
-
- def test_zmqerror(self):
- for errno in range(10):
- e = ZMQError(errno)
- self.assertEqual(e.errno, errno)
- self.assertEqual(str(e), strerror(errno))
-
- def test_again(self):
- s = self.context.socket(zmq.REP)
- self.assertRaises(Again, s.recv, zmq.NOBLOCK)
- self.assertRaisesErrno(zmq.EAGAIN, s.recv, zmq.NOBLOCK)
- s.close()
-
- def atest_ctxterm(self):
- s = self.context.socket(zmq.REP)
- t = Thread(target=self.context.term)
- t.start()
- self.assertRaises(ContextTerminated, s.recv, zmq.NOBLOCK)
- self.assertRaisesErrno(zmq.TERM, s.recv, zmq.NOBLOCK)
- s.close()
- t.join()
-
diff --git a/src/console/zmq/tests/test_etc.py b/src/console/zmq/tests/test_etc.py
deleted file mode 100755
index ad224064..00000000
--- a/src/console/zmq/tests/test_etc.py
+++ /dev/null
@@ -1,15 +0,0 @@
-# Copyright (c) PyZMQ Developers.
-# Distributed under the terms of the Modified BSD License.
-
-import sys
-
-import zmq
-
-from . import skip_if
-
-@skip_if(zmq.zmq_version_info() < (4,1), "libzmq < 4.1")
-def test_has():
- assert not zmq.has('something weird')
- has_ipc = zmq.has('ipc')
- not_windows = not sys.platform.startswith('win')
- assert has_ipc == not_windows
diff --git a/src/console/zmq/tests/test_imports.py b/src/console/zmq/tests/test_imports.py
deleted file mode 100755
index c0ddfaac..00000000
--- a/src/console/zmq/tests/test_imports.py
+++ /dev/null
@@ -1,62 +0,0 @@
-# Copyright (C) PyZMQ Developers
-# Distributed under the terms of the Modified BSD License.
-
-import sys
-from unittest import TestCase
-
-class TestImports(TestCase):
- """Test Imports - the quickest test to ensure that we haven't
- introduced version-incompatible syntax errors."""
-
- def test_toplevel(self):
- """test toplevel import"""
- import zmq
-
- def test_core(self):
- """test core imports"""
- from zmq import Context
- from zmq import Socket
- from zmq import Poller
- from zmq import Frame
- from zmq import constants
- from zmq import device, proxy
- from zmq import Stopwatch
- from zmq import (
- zmq_version,
- zmq_version_info,
- pyzmq_version,
- pyzmq_version_info,
- )
-
- def test_devices(self):
- """test device imports"""
- import zmq.devices
- from zmq.devices import basedevice
- from zmq.devices import monitoredqueue
- from zmq.devices import monitoredqueuedevice
-
- def test_log(self):
- """test log imports"""
- import zmq.log
- from zmq.log import handlers
-
- def test_eventloop(self):
- """test eventloop imports"""
- import zmq.eventloop
- from zmq.eventloop import ioloop
- from zmq.eventloop import zmqstream
- from zmq.eventloop.minitornado.platform import auto
- from zmq.eventloop.minitornado import ioloop
-
- def test_utils(self):
- """test util imports"""
- import zmq.utils
- from zmq.utils import strtypes
- from zmq.utils import jsonapi
-
- def test_ssh(self):
- """test ssh imports"""
- from zmq.ssh import tunnel
-
-
-
diff --git a/src/console/zmq/tests/test_ioloop.py b/src/console/zmq/tests/test_ioloop.py
deleted file mode 100755
index 2a8b1153..00000000
--- a/src/console/zmq/tests/test_ioloop.py
+++ /dev/null
@@ -1,113 +0,0 @@
-# Copyright (C) PyZMQ Developers
-# Distributed under the terms of the Modified BSD License.
-
-import time
-import os
-import threading
-
-import zmq
-from zmq.tests import BaseZMQTestCase
-from zmq.eventloop import ioloop
-from zmq.eventloop.minitornado.ioloop import _Timeout
-try:
- from tornado.ioloop import PollIOLoop, IOLoop as BaseIOLoop
-except ImportError:
- from zmq.eventloop.minitornado.ioloop import IOLoop as BaseIOLoop
-
-
-def printer():
- os.system("say hello")
- raise Exception
- print (time.time())
-
-
-class Delay(threading.Thread):
- def __init__(self, f, delay=1):
- self.f=f
- self.delay=delay
- self.aborted=False
- self.cond=threading.Condition()
- super(Delay, self).__init__()
-
- def run(self):
- self.cond.acquire()
- self.cond.wait(self.delay)
- self.cond.release()
- if not self.aborted:
- self.f()
-
- def abort(self):
- self.aborted=True
- self.cond.acquire()
- self.cond.notify()
- self.cond.release()
-
-
-class TestIOLoop(BaseZMQTestCase):
-
- def test_simple(self):
- """simple IOLoop creation test"""
- loop = ioloop.IOLoop()
- dc = ioloop.PeriodicCallback(loop.stop, 200, loop)
- pc = ioloop.PeriodicCallback(lambda : None, 10, loop)
- pc.start()
- dc.start()
- t = Delay(loop.stop,1)
- t.start()
- loop.start()
- if t.isAlive():
- t.abort()
- else:
- self.fail("IOLoop failed to exit")
-
- def test_timeout_compare(self):
- """test timeout comparisons"""
- loop = ioloop.IOLoop()
- t = _Timeout(1, 2, loop)
- t2 = _Timeout(1, 3, loop)
- self.assertEqual(t < t2, id(t) < id(t2))
- t2 = _Timeout(2,1, loop)
- self.assertTrue(t < t2)
-
- def test_poller_events(self):
- """Tornado poller implementation maps events correctly"""
- req,rep = self.create_bound_pair(zmq.REQ, zmq.REP)
- poller = ioloop.ZMQPoller()
- poller.register(req, ioloop.IOLoop.READ)
- poller.register(rep, ioloop.IOLoop.READ)
- events = dict(poller.poll(0))
- self.assertEqual(events.get(rep), None)
- self.assertEqual(events.get(req), None)
-
- poller.register(req, ioloop.IOLoop.WRITE)
- poller.register(rep, ioloop.IOLoop.WRITE)
- events = dict(poller.poll(1))
- self.assertEqual(events.get(req), ioloop.IOLoop.WRITE)
- self.assertEqual(events.get(rep), None)
-
- poller.register(rep, ioloop.IOLoop.READ)
- req.send(b'hi')
- events = dict(poller.poll(1))
- self.assertEqual(events.get(rep), ioloop.IOLoop.READ)
- self.assertEqual(events.get(req), None)
-
- def test_instance(self):
- """Test IOLoop.instance returns the right object"""
- loop = ioloop.IOLoop.instance()
- self.assertEqual(loop.__class__, ioloop.IOLoop)
- loop = BaseIOLoop.instance()
- self.assertEqual(loop.__class__, ioloop.IOLoop)
-
- def test_close_all(self):
- """Test close(all_fds=True)"""
- loop = ioloop.IOLoop.instance()
- req,rep = self.create_bound_pair(zmq.REQ, zmq.REP)
- loop.add_handler(req, lambda msg: msg, ioloop.IOLoop.READ)
- loop.add_handler(rep, lambda msg: msg, ioloop.IOLoop.READ)
- self.assertEqual(req.closed, False)
- self.assertEqual(rep.closed, False)
- loop.close(all_fds=True)
- self.assertEqual(req.closed, True)
- self.assertEqual(rep.closed, True)
-
-
diff --git a/src/console/zmq/tests/test_log.py b/src/console/zmq/tests/test_log.py
deleted file mode 100755
index 9206f095..00000000
--- a/src/console/zmq/tests/test_log.py
+++ /dev/null
@@ -1,116 +0,0 @@
-# encoding: utf-8
-
-# Copyright (C) PyZMQ Developers
-# Distributed under the terms of the Modified BSD License.
-
-
-import logging
-import time
-from unittest import TestCase
-
-import zmq
-from zmq.log import handlers
-from zmq.utils.strtypes import b, u
-from zmq.tests import BaseZMQTestCase
-
-
-class TestPubLog(BaseZMQTestCase):
-
- iface = 'inproc://zmqlog'
- topic= 'zmq'
-
- @property
- def logger(self):
- # print dir(self)
- logger = logging.getLogger('zmqtest')
- logger.setLevel(logging.DEBUG)
- return logger
-
- def connect_handler(self, topic=None):
- topic = self.topic if topic is None else topic
- logger = self.logger
- pub,sub = self.create_bound_pair(zmq.PUB, zmq.SUB)
- handler = handlers.PUBHandler(pub)
- handler.setLevel(logging.DEBUG)
- handler.root_topic = topic
- logger.addHandler(handler)
- sub.setsockopt(zmq.SUBSCRIBE, b(topic))
- time.sleep(0.1)
- return logger, handler, sub
-
- def test_init_iface(self):
- logger = self.logger
- ctx = self.context
- handler = handlers.PUBHandler(self.iface)
- self.assertFalse(handler.ctx is ctx)
- self.sockets.append(handler.socket)
- # handler.ctx.term()
- handler = handlers.PUBHandler(self.iface, self.context)
- self.sockets.append(handler.socket)
- self.assertTrue(handler.ctx is ctx)
- handler.setLevel(logging.DEBUG)
- handler.root_topic = self.topic
- logger.addHandler(handler)
- sub = ctx.socket(zmq.SUB)
- self.sockets.append(sub)
- sub.setsockopt(zmq.SUBSCRIBE, b(self.topic))
- sub.connect(self.iface)
- import time; time.sleep(0.25)
- msg1 = 'message'
- logger.info(msg1)
-
- (topic, msg2) = sub.recv_multipart()
- self.assertEqual(topic, b'zmq.INFO')
- self.assertEqual(msg2, b(msg1)+b'\n')
- logger.removeHandler(handler)
-
- def test_init_socket(self):
- pub,sub = self.create_bound_pair(zmq.PUB, zmq.SUB)
- logger = self.logger
- handler = handlers.PUBHandler(pub)
- handler.setLevel(logging.DEBUG)
- handler.root_topic = self.topic
- logger.addHandler(handler)
-
- self.assertTrue(handler.socket is pub)
- self.assertTrue(handler.ctx is pub.context)
- self.assertTrue(handler.ctx is self.context)
- sub.setsockopt(zmq.SUBSCRIBE, b(self.topic))
- import time; time.sleep(0.1)
- msg1 = 'message'
- logger.info(msg1)
-
- (topic, msg2) = sub.recv_multipart()
- self.assertEqual(topic, b'zmq.INFO')
- self.assertEqual(msg2, b(msg1)+b'\n')
- logger.removeHandler(handler)
-
- def test_root_topic(self):
- logger, handler, sub = self.connect_handler()
- handler.socket.bind(self.iface)
- sub2 = sub.context.socket(zmq.SUB)
- self.sockets.append(sub2)
- sub2.connect(self.iface)
- sub2.setsockopt(zmq.SUBSCRIBE, b'')
- handler.root_topic = b'twoonly'
- msg1 = 'ignored'
- logger.info(msg1)
- self.assertRaisesErrno(zmq.EAGAIN, sub.recv, zmq.NOBLOCK)
- topic,msg2 = sub2.recv_multipart()
- self.assertEqual(topic, b'twoonly.INFO')
- self.assertEqual(msg2, b(msg1)+b'\n')
-
- logger.removeHandler(handler)
-
- def test_unicode_message(self):
- logger, handler, sub = self.connect_handler()
- base_topic = b(self.topic + '.INFO')
- for msg, expected in [
- (u('hello'), [base_topic, b('hello\n')]),
- (u('héllo'), [base_topic, b('héllo\n')]),
- (u('tøpic::héllo'), [base_topic + b('.tøpic'), b('héllo\n')]),
- ]:
- logger.info(msg)
- received = sub.recv_multipart()
- self.assertEqual(received, expected)
-
diff --git a/src/console/zmq/tests/test_message.py b/src/console/zmq/tests/test_message.py
deleted file mode 100755
index d8770bdf..00000000
--- a/src/console/zmq/tests/test_message.py
+++ /dev/null
@@ -1,362 +0,0 @@
-# -*- coding: utf8 -*-
-# Copyright (C) PyZMQ Developers
-# Distributed under the terms of the Modified BSD License.
-
-
-import copy
-import sys
-try:
- from sys import getrefcount as grc
-except ImportError:
- grc = None
-
-import time
-from pprint import pprint
-from unittest import TestCase
-
-import zmq
-from zmq.tests import BaseZMQTestCase, SkipTest, skip_pypy, PYPY
-from zmq.utils.strtypes import unicode, bytes, b, u
-
-
-# some useful constants:
-
-x = b'x'
-
-try:
- view = memoryview
-except NameError:
- view = buffer
-
-if grc:
- rc0 = grc(x)
- v = view(x)
- view_rc = grc(x) - rc0
-
-def await_gc(obj, rc):
- """wait for refcount on an object to drop to an expected value
-
- Necessary because of the zero-copy gc thread,
- which can take some time to receive its DECREF message.
- """
- for i in range(50):
- # rc + 2 because of the refs in this function
- if grc(obj) <= rc + 2:
- return
- time.sleep(0.05)
-
-class TestFrame(BaseZMQTestCase):
-
- @skip_pypy
- def test_above_30(self):
- """Message above 30 bytes are never copied by 0MQ."""
- for i in range(5, 16): # 32, 64,..., 65536
- s = (2**i)*x
- self.assertEqual(grc(s), 2)
- m = zmq.Frame(s)
- self.assertEqual(grc(s), 4)
- del m
- await_gc(s, 2)
- self.assertEqual(grc(s), 2)
- del s
-
- def test_str(self):
- """Test the str representations of the Frames."""
- for i in range(16):
- s = (2**i)*x
- m = zmq.Frame(s)
- m_str = str(m)
- m_str_b = b(m_str) # py3compat
- self.assertEqual(s, m_str_b)
-
- def test_bytes(self):
- """Test the Frame.bytes property."""
- for i in range(1,16):
- s = (2**i)*x
- m = zmq.Frame(s)
- b = m.bytes
- self.assertEqual(s, m.bytes)
- if not PYPY:
- # check that it copies
- self.assert_(b is not s)
- # check that it copies only once
- self.assert_(b is m.bytes)
-
- def test_unicode(self):
- """Test the unicode representations of the Frames."""
- s = u('asdf')
- self.assertRaises(TypeError, zmq.Frame, s)
- for i in range(16):
- s = (2**i)*u('§')
- m = zmq.Frame(s.encode('utf8'))
- self.assertEqual(s, unicode(m.bytes,'utf8'))
-
- def test_len(self):
- """Test the len of the Frames."""
- for i in range(16):
- s = (2**i)*x
- m = zmq.Frame(s)
- self.assertEqual(len(s), len(m))
-
- @skip_pypy
- def test_lifecycle1(self):
- """Run through a ref counting cycle with a copy."""
- for i in range(5, 16): # 32, 64,..., 65536
- s = (2**i)*x
- rc = 2
- self.assertEqual(grc(s), rc)
- m = zmq.Frame(s)
- rc += 2
- self.assertEqual(grc(s), rc)
- m2 = copy.copy(m)
- rc += 1
- self.assertEqual(grc(s), rc)
- buf = m2.buffer
-
- rc += view_rc
- self.assertEqual(grc(s), rc)
-
- self.assertEqual(s, b(str(m)))
- self.assertEqual(s, bytes(m2))
- self.assertEqual(s, m.bytes)
- # self.assert_(s is str(m))
- # self.assert_(s is str(m2))
- del m2
- rc -= 1
- self.assertEqual(grc(s), rc)
- rc -= view_rc
- del buf
- self.assertEqual(grc(s), rc)
- del m
- rc -= 2
- await_gc(s, rc)
- self.assertEqual(grc(s), rc)
- self.assertEqual(rc, 2)
- del s
-
- @skip_pypy
- def test_lifecycle2(self):
- """Run through a different ref counting cycle with a copy."""
- for i in range(5, 16): # 32, 64,..., 65536
- s = (2**i)*x
- rc = 2
- self.assertEqual(grc(s), rc)
- m = zmq.Frame(s)
- rc += 2
- self.assertEqual(grc(s), rc)
- m2 = copy.copy(m)
- rc += 1
- self.assertEqual(grc(s), rc)
- buf = m.buffer
- rc += view_rc
- self.assertEqual(grc(s), rc)
- self.assertEqual(s, b(str(m)))
- self.assertEqual(s, bytes(m2))
- self.assertEqual(s, m2.bytes)
- self.assertEqual(s, m.bytes)
- # self.assert_(s is str(m))
- # self.assert_(s is str(m2))
- del buf
- self.assertEqual(grc(s), rc)
- del m
- # m.buffer is kept until m is del'd
- rc -= view_rc
- rc -= 1
- self.assertEqual(grc(s), rc)
- del m2
- rc -= 2
- await_gc(s, rc)
- self.assertEqual(grc(s), rc)
- self.assertEqual(rc, 2)
- del s
-
- @skip_pypy
- def test_tracker(self):
- m = zmq.Frame(b'asdf', track=True)
- self.assertFalse(m.tracker.done)
- pm = zmq.MessageTracker(m)
- self.assertFalse(pm.done)
- del m
- for i in range(10):
- if pm.done:
- break
- time.sleep(0.1)
- self.assertTrue(pm.done)
-
- def test_no_tracker(self):
- m = zmq.Frame(b'asdf', track=False)
- self.assertEqual(m.tracker, None)
- m2 = copy.copy(m)
- self.assertEqual(m2.tracker, None)
- self.assertRaises(ValueError, zmq.MessageTracker, m)
-
- @skip_pypy
- def test_multi_tracker(self):
- m = zmq.Frame(b'asdf', track=True)
- m2 = zmq.Frame(b'whoda', track=True)
- mt = zmq.MessageTracker(m,m2)
- self.assertFalse(m.tracker.done)
- self.assertFalse(mt.done)
- self.assertRaises(zmq.NotDone, mt.wait, 0.1)
- del m
- time.sleep(0.1)
- self.assertRaises(zmq.NotDone, mt.wait, 0.1)
- self.assertFalse(mt.done)
- del m2
- self.assertTrue(mt.wait() is None)
- self.assertTrue(mt.done)
-
-
- def test_buffer_in(self):
- """test using a buffer as input"""
- ins = b("§§¶•ªº˜µ¬˚…∆˙åß∂©œ∑´†≈ç√")
- m = zmq.Frame(view(ins))
-
- def test_bad_buffer_in(self):
- """test using a bad object"""
- self.assertRaises(TypeError, zmq.Frame, 5)
- self.assertRaises(TypeError, zmq.Frame, object())
-
- def test_buffer_out(self):
- """receiving buffered output"""
- ins = b("§§¶•ªº˜µ¬˚…∆˙åß∂©œ∑´†≈ç√")
- m = zmq.Frame(ins)
- outb = m.buffer
- self.assertTrue(isinstance(outb, view))
- self.assert_(outb is m.buffer)
- self.assert_(m.buffer is m.buffer)
-
- def test_multisend(self):
- """ensure that a message remains intact after multiple sends"""
- a,b = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
- s = b"message"
- m = zmq.Frame(s)
- self.assertEqual(s, m.bytes)
-
- a.send(m, copy=False)
- time.sleep(0.1)
- self.assertEqual(s, m.bytes)
- a.send(m, copy=False)
- time.sleep(0.1)
- self.assertEqual(s, m.bytes)
- a.send(m, copy=True)
- time.sleep(0.1)
- self.assertEqual(s, m.bytes)
- a.send(m, copy=True)
- time.sleep(0.1)
- self.assertEqual(s, m.bytes)
- for i in range(4):
- r = b.recv()
- self.assertEqual(s,r)
- self.assertEqual(s, m.bytes)
-
- def test_buffer_numpy(self):
- """test non-copying numpy array messages"""
- try:
- import numpy
- except ImportError:
- raise SkipTest("numpy required")
- rand = numpy.random.randint
- shapes = [ rand(2,16) for i in range(5) ]
- for i in range(1,len(shapes)+1):
- shape = shapes[:i]
- A = numpy.random.random(shape)
- m = zmq.Frame(A)
- if view.__name__ == 'buffer':
- self.assertEqual(A.data, m.buffer)
- B = numpy.frombuffer(m.buffer,dtype=A.dtype).reshape(A.shape)
- else:
- self.assertEqual(memoryview(A), m.buffer)
- B = numpy.array(m.buffer,dtype=A.dtype).reshape(A.shape)
- self.assertEqual((A==B).all(), True)
-
- def test_memoryview(self):
- """test messages from memoryview"""
- major,minor = sys.version_info[:2]
- if not (major >= 3 or (major == 2 and minor >= 7)):
- raise SkipTest("memoryviews only in python >= 2.7")
-
- s = b'carrotjuice'
- v = memoryview(s)
- m = zmq.Frame(s)
- buf = m.buffer
- s2 = buf.tobytes()
- self.assertEqual(s2,s)
- self.assertEqual(m.bytes,s)
-
- def test_noncopying_recv(self):
- """check for clobbering message buffers"""
- null = b'\0'*64
- sa,sb = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
- for i in range(32):
- # try a few times
- sb.send(null, copy=False)
- m = sa.recv(copy=False)
- mb = m.bytes
- # buf = view(m)
- buf = m.buffer
- del m
- for i in range(5):
- ff=b'\xff'*(40 + i*10)
- sb.send(ff, copy=False)
- m2 = sa.recv(copy=False)
- if view.__name__ == 'buffer':
- b = bytes(buf)
- else:
- b = buf.tobytes()
- self.assertEqual(b, null)
- self.assertEqual(mb, null)
- self.assertEqual(m2.bytes, ff)
-
- @skip_pypy
- def test_buffer_numpy(self):
- """test non-copying numpy array messages"""
- try:
- import numpy
- except ImportError:
- raise SkipTest("requires numpy")
- if sys.version_info < (2,7):
- raise SkipTest("requires new-style buffer interface (py >= 2.7)")
- rand = numpy.random.randint
- shapes = [ rand(2,5) for i in range(5) ]
- a,b = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
- dtypes = [int, float, '>i4', 'B']
- for i in range(1,len(shapes)+1):
- shape = shapes[:i]
- for dt in dtypes:
- A = numpy.empty(shape, dtype=dt)
- while numpy.isnan(A).any():
- # don't let nan sneak in
- A = numpy.ndarray(shape, dtype=dt)
- a.send(A, copy=False)
- msg = b.recv(copy=False)
-
- B = numpy.frombuffer(msg, A.dtype).reshape(A.shape)
- self.assertEqual(A.shape, B.shape)
- self.assertTrue((A==B).all())
- A = numpy.empty(shape, dtype=[('a', int), ('b', float), ('c', 'a32')])
- A['a'] = 1024
- A['b'] = 1e9
- A['c'] = 'hello there'
- a.send(A, copy=False)
- msg = b.recv(copy=False)
-
- B = numpy.frombuffer(msg, A.dtype).reshape(A.shape)
- self.assertEqual(A.shape, B.shape)
- self.assertTrue((A==B).all())
-
- def test_frame_more(self):
- """test Frame.more attribute"""
- frame = zmq.Frame(b"hello")
- self.assertFalse(frame.more)
- sa,sb = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
- sa.send_multipart([b'hi', b'there'])
- frame = self.recv(sb, copy=False)
- self.assertTrue(frame.more)
- if zmq.zmq_version_info()[0] >= 3 and not PYPY:
- self.assertTrue(frame.get(zmq.MORE))
- frame = self.recv(sb, copy=False)
- self.assertFalse(frame.more)
- if zmq.zmq_version_info()[0] >= 3 and not PYPY:
- self.assertFalse(frame.get(zmq.MORE))
-
diff --git a/src/console/zmq/tests/test_monitor.py b/src/console/zmq/tests/test_monitor.py
deleted file mode 100755
index 4f035388..00000000
--- a/src/console/zmq/tests/test_monitor.py
+++ /dev/null
@@ -1,71 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright (C) PyZMQ Developers
-# Distributed under the terms of the Modified BSD License.
-
-
-import sys
-import time
-import struct
-
-from unittest import TestCase
-
-import zmq
-from zmq.tests import BaseZMQTestCase, skip_if, skip_pypy
-from zmq.utils.monitor import recv_monitor_message
-
-skip_lt_4 = skip_if(zmq.zmq_version_info() < (4,), "requires zmq >= 4")
-
-class TestSocketMonitor(BaseZMQTestCase):
-
- @skip_lt_4
- def test_monitor(self):
- """Test monitoring interface for sockets."""
- s_rep = self.context.socket(zmq.REP)
- s_req = self.context.socket(zmq.REQ)
- self.sockets.extend([s_rep, s_req])
- s_req.bind("tcp://127.0.0.1:6666")
- # try monitoring the REP socket
-
- s_rep.monitor("inproc://monitor.rep", zmq.EVENT_ALL)
- # create listening socket for monitor
- s_event = self.context.socket(zmq.PAIR)
- self.sockets.append(s_event)
- s_event.connect("inproc://monitor.rep")
- s_event.linger = 0
- # test receive event for connect event
- s_rep.connect("tcp://127.0.0.1:6666")
- m = recv_monitor_message(s_event)
- if m['event'] == zmq.EVENT_CONNECT_DELAYED:
- self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6666")
- # test receive event for connected event
- m = recv_monitor_message(s_event)
- self.assertEqual(m['event'], zmq.EVENT_CONNECTED)
- self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6666")
-
- # test monitor can be disabled.
- s_rep.disable_monitor()
- m = recv_monitor_message(s_event)
- self.assertEqual(m['event'], zmq.EVENT_MONITOR_STOPPED)
-
-
- @skip_lt_4
- def test_monitor_connected(self):
- """Test connected monitoring socket."""
- s_rep = self.context.socket(zmq.REP)
- s_req = self.context.socket(zmq.REQ)
- self.sockets.extend([s_rep, s_req])
- s_req.bind("tcp://127.0.0.1:6667")
- # try monitoring the REP socket
- # create listening socket for monitor
- s_event = s_rep.get_monitor_socket()
- s_event.linger = 0
- self.sockets.append(s_event)
- # test receive event for connect event
- s_rep.connect("tcp://127.0.0.1:6667")
- m = recv_monitor_message(s_event)
- if m['event'] == zmq.EVENT_CONNECT_DELAYED:
- self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667")
- # test receive event for connected event
- m = recv_monitor_message(s_event)
- self.assertEqual(m['event'], zmq.EVENT_CONNECTED)
- self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667")
diff --git a/src/console/zmq/tests/test_monqueue.py b/src/console/zmq/tests/test_monqueue.py
deleted file mode 100755
index e855602e..00000000
--- a/src/console/zmq/tests/test_monqueue.py
+++ /dev/null
@@ -1,227 +0,0 @@
-# Copyright (C) PyZMQ Developers
-# Distributed under the terms of the Modified BSD License.
-
-import time
-from unittest import TestCase
-
-import zmq
-from zmq import devices
-
-from zmq.tests import BaseZMQTestCase, SkipTest, PYPY
-from zmq.utils.strtypes import unicode
-
-
-if PYPY or zmq.zmq_version_info() >= (4,1):
- # cleanup of shared Context doesn't work on PyPy
- # there also seems to be a bug in cleanup in libzmq-4.1 (zeromq/libzmq#1052)
- devices.Device.context_factory = zmq.Context
-
-
-class TestMonitoredQueue(BaseZMQTestCase):
-
- sockets = []
-
- def build_device(self, mon_sub=b"", in_prefix=b'in', out_prefix=b'out'):
- self.device = devices.ThreadMonitoredQueue(zmq.PAIR, zmq.PAIR, zmq.PUB,
- in_prefix, out_prefix)
- alice = self.context.socket(zmq.PAIR)
- bob = self.context.socket(zmq.PAIR)
- mon = self.context.socket(zmq.SUB)
-
- aport = alice.bind_to_random_port('tcp://127.0.0.1')
- bport = bob.bind_to_random_port('tcp://127.0.0.1')
- mport = mon.bind_to_random_port('tcp://127.0.0.1')
- mon.setsockopt(zmq.SUBSCRIBE, mon_sub)
-
- self.device.connect_in("tcp://127.0.0.1:%i"%aport)
- self.device.connect_out("tcp://127.0.0.1:%i"%bport)
- self.device.connect_mon("tcp://127.0.0.1:%i"%mport)
- self.device.start()
- time.sleep(.2)
- try:
- # this is currenlty necessary to ensure no dropped monitor messages
- # see LIBZMQ-248 for more info
- mon.recv_multipart(zmq.NOBLOCK)
- except zmq.ZMQError:
- pass
- self.sockets.extend([alice, bob, mon])
- return alice, bob, mon
-
-
- def teardown_device(self):
- for socket in self.sockets:
- socket.close()
- del socket
- del self.device
-
- def test_reply(self):
- alice, bob, mon = self.build_device()
- alices = b"hello bob".split()
- alice.send_multipart(alices)
- bobs = self.recv_multipart(bob)
- self.assertEqual(alices, bobs)
- bobs = b"hello alice".split()
- bob.send_multipart(bobs)
- alices = self.recv_multipart(alice)
- self.assertEqual(alices, bobs)
- self.teardown_device()
-
- def test_queue(self):
- alice, bob, mon = self.build_device()
- alices = b"hello bob".split()
- alice.send_multipart(alices)
- alices2 = b"hello again".split()
- alice.send_multipart(alices2)
- alices3 = b"hello again and again".split()
- alice.send_multipart(alices3)
- bobs = self.recv_multipart(bob)
- self.assertEqual(alices, bobs)
- bobs = self.recv_multipart(bob)
- self.assertEqual(alices2, bobs)
- bobs = self.recv_multipart(bob)
- self.assertEqual(alices3, bobs)
- bobs = b"hello alice".split()
- bob.send_multipart(bobs)
- alices = self.recv_multipart(alice)
- self.assertEqual(alices, bobs)
- self.teardown_device()
-
- def test_monitor(self):
- alice, bob, mon = self.build_device()
- alices = b"hello bob".split()
- alice.send_multipart(alices)
- alices2 = b"hello again".split()
- alice.send_multipart(alices2)
- alices3 = b"hello again and again".split()
- alice.send_multipart(alices3)
- bobs = self.recv_multipart(bob)
- self.assertEqual(alices, bobs)
- mons = self.recv_multipart(mon)
- self.assertEqual([b'in']+bobs, mons)
- bobs = self.recv_multipart(bob)
- self.assertEqual(alices2, bobs)
- bobs = self.recv_multipart(bob)
- self.assertEqual(alices3, bobs)
- mons = self.recv_multipart(mon)
- self.assertEqual([b'in']+alices2, mons)
- bobs = b"hello alice".split()
- bob.send_multipart(bobs)
- alices = self.recv_multipart(alice)
- self.assertEqual(alices, bobs)
- mons = self.recv_multipart(mon)
- self.assertEqual([b'in']+alices3, mons)
- mons = self.recv_multipart(mon)
- self.assertEqual([b'out']+bobs, mons)
- self.teardown_device()
-
- def test_prefix(self):
- alice, bob, mon = self.build_device(b"", b'foo', b'bar')
- alices = b"hello bob".split()
- alice.send_multipart(alices)
- alices2 = b"hello again".split()
- alice.send_multipart(alices2)
- alices3 = b"hello again and again".split()
- alice.send_multipart(alices3)
- bobs = self.recv_multipart(bob)
- self.assertEqual(alices, bobs)
- mons = self.recv_multipart(mon)
- self.assertEqual([b'foo']+bobs, mons)
- bobs = self.recv_multipart(bob)
- self.assertEqual(alices2, bobs)
- bobs = self.recv_multipart(bob)
- self.assertEqual(alices3, bobs)
- mons = self.recv_multipart(mon)
- self.assertEqual([b'foo']+alices2, mons)
- bobs = b"hello alice".split()
- bob.send_multipart(bobs)
- alices = self.recv_multipart(alice)
- self.assertEqual(alices, bobs)
- mons = self.recv_multipart(mon)
- self.assertEqual([b'foo']+alices3, mons)
- mons = self.recv_multipart(mon)
- self.assertEqual([b'bar']+bobs, mons)
- self.teardown_device()
-
- def test_monitor_subscribe(self):
- alice, bob, mon = self.build_device(b"out")
- alices = b"hello bob".split()
- alice.send_multipart(alices)
- alices2 = b"hello again".split()
- alice.send_multipart(alices2)
- alices3 = b"hello again and again".split()
- alice.send_multipart(alices3)
- bobs = self.recv_multipart(bob)
- self.assertEqual(alices, bobs)
- bobs = self.recv_multipart(bob)
- self.assertEqual(alices2, bobs)
- bobs = self.recv_multipart(bob)
- self.assertEqual(alices3, bobs)
- bobs = b"hello alice".split()
- bob.send_multipart(bobs)
- alices = self.recv_multipart(alice)
- self.assertEqual(alices, bobs)
- mons = self.recv_multipart(mon)
- self.assertEqual([b'out']+bobs, mons)
- self.teardown_device()
-
- def test_router_router(self):
- """test router-router MQ devices"""
- dev = devices.ThreadMonitoredQueue(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
- self.device = dev
- dev.setsockopt_in(zmq.LINGER, 0)
- dev.setsockopt_out(zmq.LINGER, 0)
- dev.setsockopt_mon(zmq.LINGER, 0)
-
- binder = self.context.socket(zmq.DEALER)
- porta = binder.bind_to_random_port('tcp://127.0.0.1')
- portb = binder.bind_to_random_port('tcp://127.0.0.1')
- binder.close()
- time.sleep(0.1)
- a = self.context.socket(zmq.DEALER)
- a.identity = b'a'
- b = self.context.socket(zmq.DEALER)
- b.identity = b'b'
- self.sockets.extend([a, b])
-
- a.connect('tcp://127.0.0.1:%i'%porta)
- dev.bind_in('tcp://127.0.0.1:%i'%porta)
- b.connect('tcp://127.0.0.1:%i'%portb)
- dev.bind_out('tcp://127.0.0.1:%i'%portb)
- dev.start()
- time.sleep(0.2)
- if zmq.zmq_version_info() >= (3,1,0):
- # flush erroneous poll state, due to LIBZMQ-280
- ping_msg = [ b'ping', b'pong' ]
- for s in (a,b):
- s.send_multipart(ping_msg)
- try:
- s.recv(zmq.NOBLOCK)
- except zmq.ZMQError:
- pass
- msg = [ b'hello', b'there' ]
- a.send_multipart([b'b']+msg)
- bmsg = self.recv_multipart(b)
- self.assertEqual(bmsg, [b'a']+msg)
- b.send_multipart(bmsg)
- amsg = self.recv_multipart(a)
- self.assertEqual(amsg, [b'b']+msg)
- self.teardown_device()
-
- def test_default_mq_args(self):
- self.device = dev = devices.ThreadMonitoredQueue(zmq.ROUTER, zmq.DEALER, zmq.PUB)
- dev.setsockopt_in(zmq.LINGER, 0)
- dev.setsockopt_out(zmq.LINGER, 0)
- dev.setsockopt_mon(zmq.LINGER, 0)
- # this will raise if default args are wrong
- dev.start()
- self.teardown_device()
-
- def test_mq_check_prefix(self):
- ins = self.context.socket(zmq.ROUTER)
- outs = self.context.socket(zmq.DEALER)
- mons = self.context.socket(zmq.PUB)
- self.sockets.extend([ins, outs, mons])
-
- ins = unicode('in')
- outs = unicode('out')
- self.assertRaises(TypeError, devices.monitoredqueue, ins, outs, mons)
diff --git a/src/console/zmq/tests/test_multipart.py b/src/console/zmq/tests/test_multipart.py
deleted file mode 100755
index 24d41be0..00000000
--- a/src/console/zmq/tests/test_multipart.py
+++ /dev/null
@@ -1,35 +0,0 @@
-# Copyright (C) PyZMQ Developers
-# Distributed under the terms of the Modified BSD License.
-
-
-import zmq
-
-
-from zmq.tests import BaseZMQTestCase, SkipTest, have_gevent, GreenTest
-
-
-class TestMultipart(BaseZMQTestCase):
-
- def test_router_dealer(self):
- router, dealer = self.create_bound_pair(zmq.ROUTER, zmq.DEALER)
-
- msg1 = b'message1'
- dealer.send(msg1)
- ident = self.recv(router)
- more = router.rcvmore
- self.assertEqual(more, True)
- msg2 = self.recv(router)
- self.assertEqual(msg1, msg2)
- more = router.rcvmore
- self.assertEqual(more, False)
-
- def test_basic_multipart(self):
- a,b = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
- msg = [ b'hi', b'there', b'b']
- a.send_multipart(msg)
- recvd = b.recv_multipart()
- self.assertEqual(msg, recvd)
-
-if have_gevent:
- class TestMultipartGreen(GreenTest, TestMultipart):
- pass
diff --git a/src/console/zmq/tests/test_pair.py b/src/console/zmq/tests/test_pair.py
deleted file mode 100755
index e88c1e8b..00000000
--- a/src/console/zmq/tests/test_pair.py
+++ /dev/null
@@ -1,53 +0,0 @@
-# Copyright (C) PyZMQ Developers
-# Distributed under the terms of the Modified BSD License.
-
-
-import zmq
-
-
-from zmq.tests import BaseZMQTestCase, have_gevent, GreenTest
-
-
-x = b' '
-class TestPair(BaseZMQTestCase):
-
- def test_basic(self):
- s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
-
- msg1 = b'message1'
- msg2 = self.ping_pong(s1, s2, msg1)
- self.assertEqual(msg1, msg2)
-
- def test_multiple(self):
- s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
-
- for i in range(10):
- msg = i*x
- s1.send(msg)
-
- for i in range(10):
- msg = i*x
- s2.send(msg)
-
- for i in range(10):
- msg = s1.recv()
- self.assertEqual(msg, i*x)
-
- for i in range(10):
- msg = s2.recv()
- self.assertEqual(msg, i*x)
-
- def test_json(self):
- s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
- o = dict(a=10,b=list(range(10)))
- o2 = self.ping_pong_json(s1, s2, o)
-
- def test_pyobj(self):
- s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
- o = dict(a=10,b=range(10))
- o2 = self.ping_pong_pyobj(s1, s2, o)
-
-if have_gevent:
- class TestReqRepGreen(GreenTest, TestPair):
- pass
-
diff --git a/src/console/zmq/tests/test_poll.py b/src/console/zmq/tests/test_poll.py
deleted file mode 100755
index 57346c89..00000000
--- a/src/console/zmq/tests/test_poll.py
+++ /dev/null
@@ -1,229 +0,0 @@
-# Copyright (C) PyZMQ Developers
-# Distributed under the terms of the Modified BSD License.
-
-
-import time
-from unittest import TestCase
-
-import zmq
-
-from zmq.tests import PollZMQTestCase, have_gevent, GreenTest
-
-def wait():
- time.sleep(.25)
-
-
-class TestPoll(PollZMQTestCase):
-
- Poller = zmq.Poller
-
- # This test is failing due to this issue:
- # http://github.com/sustrik/zeromq2/issues#issue/26
- def test_pair(self):
- s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
-
- # Sleep to allow sockets to connect.
- wait()
-
- poller = self.Poller()
- poller.register(s1, zmq.POLLIN|zmq.POLLOUT)
- poller.register(s2, zmq.POLLIN|zmq.POLLOUT)
- # Poll result should contain both sockets
- socks = dict(poller.poll())
- # Now make sure that both are send ready.
- self.assertEqual(socks[s1], zmq.POLLOUT)
- self.assertEqual(socks[s2], zmq.POLLOUT)
- # Now do a send on both, wait and test for zmq.POLLOUT|zmq.POLLIN
- s1.send(b'msg1')
- s2.send(b'msg2')
- wait()
- socks = dict(poller.poll())
- self.assertEqual(socks[s1], zmq.POLLOUT|zmq.POLLIN)
- self.assertEqual(socks[s2], zmq.POLLOUT|zmq.POLLIN)
- # Make sure that both are in POLLOUT after recv.
- s1.recv()
- s2.recv()
- socks = dict(poller.poll())
- self.assertEqual(socks[s1], zmq.POLLOUT)
- self.assertEqual(socks[s2], zmq.POLLOUT)
-
- poller.unregister(s1)
- poller.unregister(s2)
-
- # Wait for everything to finish.
- wait()
-
- def test_reqrep(self):
- s1, s2 = self.create_bound_pair(zmq.REP, zmq.REQ)
-
- # Sleep to allow sockets to connect.
- wait()
-
- poller = self.Poller()
- poller.register(s1, zmq.POLLIN|zmq.POLLOUT)
- poller.register(s2, zmq.POLLIN|zmq.POLLOUT)
-
- # Make sure that s1 is in state 0 and s2 is in POLLOUT
- socks = dict(poller.poll())
- self.assertEqual(s1 in socks, 0)
- self.assertEqual(socks[s2], zmq.POLLOUT)
-
- # Make sure that s2 goes immediately into state 0 after send.
- s2.send(b'msg1')
- socks = dict(poller.poll())
- self.assertEqual(s2 in socks, 0)
-
- # Make sure that s1 goes into POLLIN state after a time.sleep().
- time.sleep(0.5)
- socks = dict(poller.poll())
- self.assertEqual(socks[s1], zmq.POLLIN)
-
- # Make sure that s1 goes into POLLOUT after recv.
- s1.recv()
- socks = dict(poller.poll())
- self.assertEqual(socks[s1], zmq.POLLOUT)
-
- # Make sure s1 goes into state 0 after send.
- s1.send(b'msg2')
- socks = dict(poller.poll())
- self.assertEqual(s1 in socks, 0)
-
- # Wait and then see that s2 is in POLLIN.
- time.sleep(0.5)
- socks = dict(poller.poll())
- self.assertEqual(socks[s2], zmq.POLLIN)
-
- # Make sure that s2 is in POLLOUT after recv.
- s2.recv()
- socks = dict(poller.poll())
- self.assertEqual(socks[s2], zmq.POLLOUT)
-
- poller.unregister(s1)
- poller.unregister(s2)
-
- # Wait for everything to finish.
- wait()
-
- def test_no_events(self):
- s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
- poller = self.Poller()
- poller.register(s1, zmq.POLLIN|zmq.POLLOUT)
- poller.register(s2, 0)
- self.assertTrue(s1 in poller)
- self.assertFalse(s2 in poller)
- poller.register(s1, 0)
- self.assertFalse(s1 in poller)
-
- def test_pubsub(self):
- s1, s2 = self.create_bound_pair(zmq.PUB, zmq.SUB)
- s2.setsockopt(zmq.SUBSCRIBE, b'')
-
- # Sleep to allow sockets to connect.
- wait()
-
- poller = self.Poller()
- poller.register(s1, zmq.POLLIN|zmq.POLLOUT)
- poller.register(s2, zmq.POLLIN)
-
- # Now make sure that both are send ready.
- socks = dict(poller.poll())
- self.assertEqual(socks[s1], zmq.POLLOUT)
- self.assertEqual(s2 in socks, 0)
- # Make sure that s1 stays in POLLOUT after a send.
- s1.send(b'msg1')
- socks = dict(poller.poll())
- self.assertEqual(socks[s1], zmq.POLLOUT)
-
- # Make sure that s2 is POLLIN after waiting.
- wait()
- socks = dict(poller.poll())
- self.assertEqual(socks[s2], zmq.POLLIN)
-
- # Make sure that s2 goes into 0 after recv.
- s2.recv()
- socks = dict(poller.poll())
- self.assertEqual(s2 in socks, 0)
-
- poller.unregister(s1)
- poller.unregister(s2)
-
- # Wait for everything to finish.
- wait()
- def test_timeout(self):
- """make sure Poller.poll timeout has the right units (milliseconds)."""
- s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
- poller = self.Poller()
- poller.register(s1, zmq.POLLIN)
- tic = time.time()
- evt = poller.poll(.005)
- toc = time.time()
- self.assertTrue(toc-tic < 0.1)
- tic = time.time()
- evt = poller.poll(5)
- toc = time.time()
- self.assertTrue(toc-tic < 0.1)
- self.assertTrue(toc-tic > .001)
- tic = time.time()
- evt = poller.poll(500)
- toc = time.time()
- self.assertTrue(toc-tic < 1)
- self.assertTrue(toc-tic > 0.1)
-
-class TestSelect(PollZMQTestCase):
-
- def test_pair(self):
- s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
-
- # Sleep to allow sockets to connect.
- wait()
-
- rlist, wlist, xlist = zmq.select([s1, s2], [s1, s2], [s1, s2])
- self.assert_(s1 in wlist)
- self.assert_(s2 in wlist)
- self.assert_(s1 not in rlist)
- self.assert_(s2 not in rlist)
-
- def test_timeout(self):
- """make sure select timeout has the right units (seconds)."""
- s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
- tic = time.time()
- r,w,x = zmq.select([s1,s2],[],[],.005)
- toc = time.time()
- self.assertTrue(toc-tic < 1)
- self.assertTrue(toc-tic > 0.001)
- tic = time.time()
- r,w,x = zmq.select([s1,s2],[],[],.25)
- toc = time.time()
- self.assertTrue(toc-tic < 1)
- self.assertTrue(toc-tic > 0.1)
-
-
-if have_gevent:
- import gevent
- from zmq import green as gzmq
-
- class TestPollGreen(GreenTest, TestPoll):
- Poller = gzmq.Poller
-
- def test_wakeup(self):
- s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
- poller = self.Poller()
- poller.register(s2, zmq.POLLIN)
-
- tic = time.time()
- r = gevent.spawn(lambda: poller.poll(10000))
- s = gevent.spawn(lambda: s1.send(b'msg1'))
- r.join()
- toc = time.time()
- self.assertTrue(toc-tic < 1)
-
- def test_socket_poll(self):
- s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
-
- tic = time.time()
- r = gevent.spawn(lambda: s2.poll(10000))
- s = gevent.spawn(lambda: s1.send(b'msg1'))
- r.join()
- toc = time.time()
- self.assertTrue(toc-tic < 1)
-
diff --git a/src/console/zmq/tests/test_pubsub.py b/src/console/zmq/tests/test_pubsub.py
deleted file mode 100755
index a3ee22aa..00000000
--- a/src/console/zmq/tests/test_pubsub.py
+++ /dev/null
@@ -1,41 +0,0 @@
-# Copyright (C) PyZMQ Developers
-# Distributed under the terms of the Modified BSD License.
-
-
-import time
-from unittest import TestCase
-
-import zmq
-
-from zmq.tests import BaseZMQTestCase, have_gevent, GreenTest
-
-
-class TestPubSub(BaseZMQTestCase):
-
- pass
-
- # We are disabling this test while an issue is being resolved.
- def test_basic(self):
- s1, s2 = self.create_bound_pair(zmq.PUB, zmq.SUB)
- s2.setsockopt(zmq.SUBSCRIBE,b'')
- time.sleep(0.1)
- msg1 = b'message'
- s1.send(msg1)
- msg2 = s2.recv() # This is blocking!
- self.assertEqual(msg1, msg2)
-
- def test_topic(self):
- s1, s2 = self.create_bound_pair(zmq.PUB, zmq.SUB)
- s2.setsockopt(zmq.SUBSCRIBE, b'x')
- time.sleep(0.1)
- msg1 = b'message'
- s1.send(msg1)
- self.assertRaisesErrno(zmq.EAGAIN, s2.recv, zmq.NOBLOCK)
- msg1 = b'xmessage'
- s1.send(msg1)
- msg2 = s2.recv()
- self.assertEqual(msg1, msg2)
-
-if have_gevent:
- class TestPubSubGreen(GreenTest, TestPubSub):
- pass
diff --git a/src/console/zmq/tests/test_reqrep.py b/src/console/zmq/tests/test_reqrep.py
deleted file mode 100755
index de17f2b3..00000000
--- a/src/console/zmq/tests/test_reqrep.py
+++ /dev/null
@@ -1,62 +0,0 @@
-# Copyright (C) PyZMQ Developers
-# Distributed under the terms of the Modified BSD License.
-
-
-from unittest import TestCase
-
-import zmq
-from zmq.tests import BaseZMQTestCase, have_gevent, GreenTest
-
-
-class TestReqRep(BaseZMQTestCase):
-
- def test_basic(self):
- s1, s2 = self.create_bound_pair(zmq.REQ, zmq.REP)
-
- msg1 = b'message 1'
- msg2 = self.ping_pong(s1, s2, msg1)
- self.assertEqual(msg1, msg2)
-
- def test_multiple(self):
- s1, s2 = self.create_bound_pair(zmq.REQ, zmq.REP)
-
- for i in range(10):
- msg1 = i*b' '
- msg2 = self.ping_pong(s1, s2, msg1)
- self.assertEqual(msg1, msg2)
-
- def test_bad_send_recv(self):
- s1, s2 = self.create_bound_pair(zmq.REQ, zmq.REP)
-
- if zmq.zmq_version() != '2.1.8':
- # this doesn't work on 2.1.8
- for copy in (True,False):
- self.assertRaisesErrno(zmq.EFSM, s1.recv, copy=copy)
- self.assertRaisesErrno(zmq.EFSM, s2.send, b'asdf', copy=copy)
-
- # I have to have this or we die on an Abort trap.
- msg1 = b'asdf'
- msg2 = self.ping_pong(s1, s2, msg1)
- self.assertEqual(msg1, msg2)
-
- def test_json(self):
- s1, s2 = self.create_bound_pair(zmq.REQ, zmq.REP)
- o = dict(a=10,b=list(range(10)))
- o2 = self.ping_pong_json(s1, s2, o)
-
- def test_pyobj(self):
- s1, s2 = self.create_bound_pair(zmq.REQ, zmq.REP)
- o = dict(a=10,b=range(10))
- o2 = self.ping_pong_pyobj(s1, s2, o)
-
- def test_large_msg(self):
- s1, s2 = self.create_bound_pair(zmq.REQ, zmq.REP)
- msg1 = 10000*b'X'
-
- for i in range(10):
- msg2 = self.ping_pong(s1, s2, msg1)
- self.assertEqual(msg1, msg2)
-
-if have_gevent:
- class TestReqRepGreen(GreenTest, TestReqRep):
- pass
diff --git a/src/console/zmq/tests/test_security.py b/src/console/zmq/tests/test_security.py
deleted file mode 100755
index 687b7e0f..00000000
--- a/src/console/zmq/tests/test_security.py
+++ /dev/null
@@ -1,212 +0,0 @@
-"""Test libzmq security (libzmq >= 3.3.0)"""
-# -*- coding: utf8 -*-
-
-# Copyright (C) PyZMQ Developers
-# Distributed under the terms of the Modified BSD License.
-
-import os
-from threading import Thread
-
-import zmq
-from zmq.tests import (
- BaseZMQTestCase, SkipTest, PYPY
-)
-from zmq.utils import z85
-
-
-USER = b"admin"
-PASS = b"password"
-
-class TestSecurity(BaseZMQTestCase):
-
- def setUp(self):
- if zmq.zmq_version_info() < (4,0):
- raise SkipTest("security is new in libzmq 4.0")
- try:
- zmq.curve_keypair()
- except zmq.ZMQError:
- raise SkipTest("security requires libzmq to be linked against libsodium")
- super(TestSecurity, self).setUp()
-
-
- def zap_handler(self):
- socket = self.context.socket(zmq.REP)
- socket.bind("inproc://zeromq.zap.01")
- try:
- msg = self.recv_multipart(socket)
-
- version, sequence, domain, address, identity, mechanism = msg[:6]
- if mechanism == b'PLAIN':
- username, password = msg[6:]
- elif mechanism == b'CURVE':
- key = msg[6]
-
- self.assertEqual(version, b"1.0")
- self.assertEqual(identity, b"IDENT")
- reply = [version, sequence]
- if mechanism == b'CURVE' or \
- (mechanism == b'PLAIN' and username == USER and password == PASS) or \
- (mechanism == b'NULL'):
- reply.extend([
- b"200",
- b"OK",
- b"anonymous",
- b"\5Hello\0\0\0\5World",
- ])
- else:
- reply.extend([
- b"400",
- b"Invalid username or password",
- b"",
- b"",
- ])
- socket.send_multipart(reply)
- finally:
- socket.close()
-
- def start_zap(self):
- self.zap_thread = Thread(target=self.zap_handler)
- self.zap_thread.start()
-
- def stop_zap(self):
- self.zap_thread.join()
-
- def bounce(self, server, client, test_metadata=True):
- msg = [os.urandom(64), os.urandom(64)]
- client.send_multipart(msg)
- frames = self.recv_multipart(server, copy=False)
- recvd = list(map(lambda x: x.bytes, frames))
-
- try:
- if test_metadata and not PYPY:
- for frame in frames:
- self.assertEqual(frame.get('User-Id'), 'anonymous')
- self.assertEqual(frame.get('Hello'), 'World')
- self.assertEqual(frame['Socket-Type'], 'DEALER')
- except zmq.ZMQVersionError:
- pass
-
- self.assertEqual(recvd, msg)
- server.send_multipart(recvd)
- msg2 = self.recv_multipart(client)
- self.assertEqual(msg2, msg)
-
- def test_null(self):
- """test NULL (default) security"""
- server = self.socket(zmq.DEALER)
- client = self.socket(zmq.DEALER)
- self.assertEqual(client.MECHANISM, zmq.NULL)
- self.assertEqual(server.mechanism, zmq.NULL)
- self.assertEqual(client.plain_server, 0)
- self.assertEqual(server.plain_server, 0)
- iface = 'tcp://127.0.0.1'
- port = server.bind_to_random_port(iface)
- client.connect("%s:%i" % (iface, port))
- self.bounce(server, client, False)
-
- def test_plain(self):
- """test PLAIN authentication"""
- server = self.socket(zmq.DEALER)
- server.identity = b'IDENT'
- client = self.socket(zmq.DEALER)
- self.assertEqual(client.plain_username, b'')
- self.assertEqual(client.plain_password, b'')
- client.plain_username = USER
- client.plain_password = PASS
- self.assertEqual(client.getsockopt(zmq.PLAIN_USERNAME), USER)
- self.assertEqual(client.getsockopt(zmq.PLAIN_PASSWORD), PASS)
- self.assertEqual(client.plain_server, 0)
- self.assertEqual(server.plain_server, 0)
- server.plain_server = True
- self.assertEqual(server.mechanism, zmq.PLAIN)
- self.assertEqual(client.mechanism, zmq.PLAIN)
-
- assert not client.plain_server
- assert server.plain_server
-
- self.start_zap()
-
- iface = 'tcp://127.0.0.1'
- port = server.bind_to_random_port(iface)
- client.connect("%s:%i" % (iface, port))
- self.bounce(server, client)
- self.stop_zap()
-
- def skip_plain_inauth(self):
- """test PLAIN failed authentication"""
- server = self.socket(zmq.DEALER)
- server.identity = b'IDENT'
- client = self.socket(zmq.DEALER)
- self.sockets.extend([server, client])
- client.plain_username = USER
- client.plain_password = b'incorrect'
- server.plain_server = True
- self.assertEqual(server.mechanism, zmq.PLAIN)
- self.assertEqual(client.mechanism, zmq.PLAIN)
-
- self.start_zap()
-
- iface = 'tcp://127.0.0.1'
- port = server.bind_to_random_port(iface)
- client.connect("%s:%i" % (iface, port))
- client.send(b'ping')
- server.rcvtimeo = 250
- self.assertRaisesErrno(zmq.EAGAIN, server.recv)
- self.stop_zap()
-
- def test_keypair(self):
- """test curve_keypair"""
- try:
- public, secret = zmq.curve_keypair()
- except zmq.ZMQError:
- raise SkipTest("CURVE unsupported")
-
- self.assertEqual(type(secret), bytes)
- self.assertEqual(type(public), bytes)
- self.assertEqual(len(secret), 40)
- self.assertEqual(len(public), 40)
-
- # verify that it is indeed Z85
- bsecret, bpublic = [ z85.decode(key) for key in (public, secret) ]
- self.assertEqual(type(bsecret), bytes)
- self.assertEqual(type(bpublic), bytes)
- self.assertEqual(len(bsecret), 32)
- self.assertEqual(len(bpublic), 32)
-
-
- def test_curve(self):
- """test CURVE encryption"""
- server = self.socket(zmq.DEALER)
- server.identity = b'IDENT'
- client = self.socket(zmq.DEALER)
- self.sockets.extend([server, client])
- try:
- server.curve_server = True
- except zmq.ZMQError as e:
- # will raise EINVAL if not linked against libsodium
- if e.errno == zmq.EINVAL:
- raise SkipTest("CURVE unsupported")
-
- server_public, server_secret = zmq.curve_keypair()
- client_public, client_secret = zmq.curve_keypair()
-
- server.curve_secretkey = server_secret
- server.curve_publickey = server_public
- client.curve_serverkey = server_public
- client.curve_publickey = client_public
- client.curve_secretkey = client_secret
-
- self.assertEqual(server.mechanism, zmq.CURVE)
- self.assertEqual(client.mechanism, zmq.CURVE)
-
- self.assertEqual(server.get(zmq.CURVE_SERVER), True)
- self.assertEqual(client.get(zmq.CURVE_SERVER), False)
-
- self.start_zap()
-
- iface = 'tcp://127.0.0.1'
- port = server.bind_to_random_port(iface)
- client.connect("%s:%i" % (iface, port))
- self.bounce(server, client)
- self.stop_zap()
-
diff --git a/src/console/zmq/tests/test_socket.py b/src/console/zmq/tests/test_socket.py
deleted file mode 100755
index 5c842edc..00000000
--- a/src/console/zmq/tests/test_socket.py
+++ /dev/null
@@ -1,450 +0,0 @@
-# -*- coding: utf8 -*-
-# Copyright (C) PyZMQ Developers
-# Distributed under the terms of the Modified BSD License.
-
-import time
-import warnings
-
-import zmq
-from zmq.tests import (
- BaseZMQTestCase, SkipTest, have_gevent, GreenTest, skip_pypy, skip_if
-)
-from zmq.utils.strtypes import bytes, unicode
-
-
-class TestSocket(BaseZMQTestCase):
-
- def test_create(self):
- ctx = self.Context()
- s = ctx.socket(zmq.PUB)
- # Superluminal protocol not yet implemented
- self.assertRaisesErrno(zmq.EPROTONOSUPPORT, s.bind, 'ftl://a')
- self.assertRaisesErrno(zmq.EPROTONOSUPPORT, s.connect, 'ftl://a')
- self.assertRaisesErrno(zmq.EINVAL, s.bind, 'tcp://')
- s.close()
- del ctx
-
- def test_context_manager(self):
- url = 'inproc://a'
- with self.Context() as ctx:
- with ctx.socket(zmq.PUSH) as a:
- a.bind(url)
- with ctx.socket(zmq.PULL) as b:
- b.connect(url)
- msg = b'hi'
- a.send(msg)
- rcvd = self.recv(b)
- self.assertEqual(rcvd, msg)
- self.assertEqual(b.closed, True)
- self.assertEqual(a.closed, True)
- self.assertEqual(ctx.closed, True)
-
- def test_dir(self):
- ctx = self.Context()
- s = ctx.socket(zmq.PUB)
- self.assertTrue('send' in dir(s))
- self.assertTrue('IDENTITY' in dir(s))
- self.assertTrue('AFFINITY' in dir(s))
- self.assertTrue('FD' in dir(s))
- s.close()
- ctx.term()
-
- def test_bind_unicode(self):
- s = self.socket(zmq.PUB)
- p = s.bind_to_random_port(unicode("tcp://*"))
-
- def test_connect_unicode(self):
- s = self.socket(zmq.PUB)
- s.connect(unicode("tcp://127.0.0.1:5555"))
-
- def test_bind_to_random_port(self):
- # Check that bind_to_random_port do not hide usefull exception
- ctx = self.Context()
- c = ctx.socket(zmq.PUB)
- # Invalid format
- try:
- c.bind_to_random_port('tcp:*')
- except zmq.ZMQError as e:
- self.assertEqual(e.errno, zmq.EINVAL)
- # Invalid protocol
- try:
- c.bind_to_random_port('rand://*')
- except zmq.ZMQError as e:
- self.assertEqual(e.errno, zmq.EPROTONOSUPPORT)
-
- def test_identity(self):
- s = self.context.socket(zmq.PULL)
- self.sockets.append(s)
- ident = b'identity\0\0'
- s.identity = ident
- self.assertEqual(s.get(zmq.IDENTITY), ident)
-
- def test_unicode_sockopts(self):
- """test setting/getting sockopts with unicode strings"""
- topic = "tést"
- if str is not unicode:
- topic = topic.decode('utf8')
- p,s = self.create_bound_pair(zmq.PUB, zmq.SUB)
- self.assertEqual(s.send_unicode, s.send_unicode)
- self.assertEqual(p.recv_unicode, p.recv_unicode)
- self.assertRaises(TypeError, s.setsockopt, zmq.SUBSCRIBE, topic)
- self.assertRaises(TypeError, s.setsockopt, zmq.IDENTITY, topic)
- s.setsockopt_unicode(zmq.IDENTITY, topic, 'utf16')
- self.assertRaises(TypeError, s.setsockopt, zmq.AFFINITY, topic)
- s.setsockopt_unicode(zmq.SUBSCRIBE, topic)
- self.assertRaises(TypeError, s.getsockopt_unicode, zmq.AFFINITY)
- self.assertRaisesErrno(zmq.EINVAL, s.getsockopt_unicode, zmq.SUBSCRIBE)
-
- identb = s.getsockopt(zmq.IDENTITY)
- identu = identb.decode('utf16')
- identu2 = s.getsockopt_unicode(zmq.IDENTITY, 'utf16')
- self.assertEqual(identu, identu2)
- time.sleep(0.1) # wait for connection/subscription
- p.send_unicode(topic,zmq.SNDMORE)
- p.send_unicode(topic*2, encoding='latin-1')
- self.assertEqual(topic, s.recv_unicode())
- self.assertEqual(topic*2, s.recv_unicode(encoding='latin-1'))
-
- def test_int_sockopts(self):
- "test integer sockopts"
- v = zmq.zmq_version_info()
- if v < (3,0):
- default_hwm = 0
- else:
- default_hwm = 1000
- p,s = self.create_bound_pair(zmq.PUB, zmq.SUB)
- p.setsockopt(zmq.LINGER, 0)
- self.assertEqual(p.getsockopt(zmq.LINGER), 0)
- p.setsockopt(zmq.LINGER, -1)
- self.assertEqual(p.getsockopt(zmq.LINGER), -1)
- self.assertEqual(p.hwm, default_hwm)
- p.hwm = 11
- self.assertEqual(p.hwm, 11)
- # p.setsockopt(zmq.EVENTS, zmq.POLLIN)
- self.assertEqual(p.getsockopt(zmq.EVENTS), zmq.POLLOUT)
- self.assertRaisesErrno(zmq.EINVAL, p.setsockopt,zmq.EVENTS, 2**7-1)
- self.assertEqual(p.getsockopt(zmq.TYPE), p.socket_type)
- self.assertEqual(p.getsockopt(zmq.TYPE), zmq.PUB)
- self.assertEqual(s.getsockopt(zmq.TYPE), s.socket_type)
- self.assertEqual(s.getsockopt(zmq.TYPE), zmq.SUB)
-
- # check for overflow / wrong type:
- errors = []
- backref = {}
- constants = zmq.constants
- for name in constants.__all__:
- value = getattr(constants, name)
- if isinstance(value, int):
- backref[value] = name
- for opt in zmq.constants.int_sockopts.union(zmq.constants.int64_sockopts):
- sopt = backref[opt]
- if sopt.startswith((
- 'ROUTER', 'XPUB', 'TCP', 'FAIL',
- 'REQ_', 'CURVE_', 'PROBE_ROUTER',
- 'IPC_FILTER', 'GSSAPI',
- )):
- # some sockopts are write-only
- continue
- try:
- n = p.getsockopt(opt)
- except zmq.ZMQError as e:
- errors.append("getsockopt(zmq.%s) raised '%s'."%(sopt, e))
- else:
- if n > 2**31:
- errors.append("getsockopt(zmq.%s) returned a ridiculous value."
- " It is probably the wrong type."%sopt)
- if errors:
- self.fail('\n'.join([''] + errors))
-
- def test_bad_sockopts(self):
- """Test that appropriate errors are raised on bad socket options"""
- s = self.context.socket(zmq.PUB)
- self.sockets.append(s)
- s.setsockopt(zmq.LINGER, 0)
- # unrecognized int sockopts pass through to libzmq, and should raise EINVAL
- self.assertRaisesErrno(zmq.EINVAL, s.setsockopt, 9999, 5)
- self.assertRaisesErrno(zmq.EINVAL, s.getsockopt, 9999)
- # but only int sockopts are allowed through this way, otherwise raise a TypeError
- self.assertRaises(TypeError, s.setsockopt, 9999, b"5")
- # some sockopts are valid in general, but not on every socket:
- self.assertRaisesErrno(zmq.EINVAL, s.setsockopt, zmq.SUBSCRIBE, b'hi')
-
- def test_sockopt_roundtrip(self):
- "test set/getsockopt roundtrip."
- p = self.context.socket(zmq.PUB)
- self.sockets.append(p)
- p.setsockopt(zmq.LINGER, 11)
- self.assertEqual(p.getsockopt(zmq.LINGER), 11)
-
- def test_send_unicode(self):
- "test sending unicode objects"
- a,b = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
- self.sockets.extend([a,b])
- u = "çπ§"
- if str is not unicode:
- u = u.decode('utf8')
- self.assertRaises(TypeError, a.send, u,copy=False)
- self.assertRaises(TypeError, a.send, u,copy=True)
- a.send_unicode(u)
- s = b.recv()
- self.assertEqual(s,u.encode('utf8'))
- self.assertEqual(s.decode('utf8'),u)
- a.send_unicode(u,encoding='utf16')
- s = b.recv_unicode(encoding='utf16')
- self.assertEqual(s,u)
-
- @skip_pypy
- def test_tracker(self):
- "test the MessageTracker object for tracking when zmq is done with a buffer"
- addr = 'tcp://127.0.0.1'
- a = self.context.socket(zmq.PUB)
- port = a.bind_to_random_port(addr)
- a.close()
- iface = "%s:%i"%(addr,port)
- a = self.context.socket(zmq.PAIR)
- # a.setsockopt(zmq.IDENTITY, b"a")
- b = self.context.socket(zmq.PAIR)
- self.sockets.extend([a,b])
- a.connect(iface)
- time.sleep(0.1)
- p1 = a.send(b'something', copy=False, track=True)
- self.assertTrue(isinstance(p1, zmq.MessageTracker))
- self.assertFalse(p1.done)
- p2 = a.send_multipart([b'something', b'else'], copy=False, track=True)
- self.assert_(isinstance(p2, zmq.MessageTracker))
- self.assertEqual(p2.done, False)
- self.assertEqual(p1.done, False)
-
- b.bind(iface)
- msg = b.recv_multipart()
- for i in range(10):
- if p1.done:
- break
- time.sleep(0.1)
- self.assertEqual(p1.done, True)
- self.assertEqual(msg, [b'something'])
- msg = b.recv_multipart()
- for i in range(10):
- if p2.done:
- break
- time.sleep(0.1)
- self.assertEqual(p2.done, True)
- self.assertEqual(msg, [b'something', b'else'])
- m = zmq.Frame(b"again", track=True)
- self.assertEqual(m.tracker.done, False)
- p1 = a.send(m, copy=False)
- p2 = a.send(m, copy=False)
- self.assertEqual(m.tracker.done, False)
- self.assertEqual(p1.done, False)
- self.assertEqual(p2.done, False)
- msg = b.recv_multipart()
- self.assertEqual(m.tracker.done, False)
- self.assertEqual(msg, [b'again'])
- msg = b.recv_multipart()
- self.assertEqual(m.tracker.done, False)
- self.assertEqual(msg, [b'again'])
- self.assertEqual(p1.done, False)
- self.assertEqual(p2.done, False)
- pm = m.tracker
- del m
- for i in range(10):
- if p1.done:
- break
- time.sleep(0.1)
- self.assertEqual(p1.done, True)
- self.assertEqual(p2.done, True)
- m = zmq.Frame(b'something', track=False)
- self.assertRaises(ValueError, a.send, m, copy=False, track=True)
-
-
- def test_close(self):
- ctx = self.Context()
- s = ctx.socket(zmq.PUB)
- s.close()
- self.assertRaisesErrno(zmq.ENOTSOCK, s.bind, b'')
- self.assertRaisesErrno(zmq.ENOTSOCK, s.connect, b'')
- self.assertRaisesErrno(zmq.ENOTSOCK, s.setsockopt, zmq.SUBSCRIBE, b'')
- self.assertRaisesErrno(zmq.ENOTSOCK, s.send, b'asdf')
- self.assertRaisesErrno(zmq.ENOTSOCK, s.recv)
- del ctx
-
- def test_attr(self):
- """set setting/getting sockopts as attributes"""
- s = self.context.socket(zmq.DEALER)
- self.sockets.append(s)
- linger = 10
- s.linger = linger
- self.assertEqual(linger, s.linger)
- self.assertEqual(linger, s.getsockopt(zmq.LINGER))
- self.assertEqual(s.fd, s.getsockopt(zmq.FD))
-
- def test_bad_attr(self):
- s = self.context.socket(zmq.DEALER)
- self.sockets.append(s)
- try:
- s.apple='foo'
- except AttributeError:
- pass
- else:
- self.fail("bad setattr should have raised AttributeError")
- try:
- s.apple
- except AttributeError:
- pass
- else:
- self.fail("bad getattr should have raised AttributeError")
-
- def test_subclass(self):
- """subclasses can assign attributes"""
- class S(zmq.Socket):
- a = None
- def __init__(self, *a, **kw):
- self.a=-1
- super(S, self).__init__(*a, **kw)
-
- s = S(self.context, zmq.REP)
- self.sockets.append(s)
- self.assertEqual(s.a, -1)
- s.a=1
- self.assertEqual(s.a, 1)
- a=s.a
- self.assertEqual(a, 1)
-
- def test_recv_multipart(self):
- a,b = self.create_bound_pair()
- msg = b'hi'
- for i in range(3):
- a.send(msg)
- time.sleep(0.1)
- for i in range(3):
- self.assertEqual(b.recv_multipart(), [msg])
-
- def test_close_after_destroy(self):
- """s.close() after ctx.destroy() should be fine"""
- ctx = self.Context()
- s = ctx.socket(zmq.REP)
- ctx.destroy()
- # reaper is not instantaneous
- time.sleep(1e-2)
- s.close()
- self.assertTrue(s.closed)
-
- def test_poll(self):
- a,b = self.create_bound_pair()
- tic = time.time()
- evt = a.poll(50)
- self.assertEqual(evt, 0)
- evt = a.poll(50, zmq.POLLOUT)
- self.assertEqual(evt, zmq.POLLOUT)
- msg = b'hi'
- a.send(msg)
- evt = b.poll(50)
- self.assertEqual(evt, zmq.POLLIN)
- msg2 = self.recv(b)
- evt = b.poll(50)
- self.assertEqual(evt, 0)
- self.assertEqual(msg2, msg)
-
- def test_ipc_path_max_length(self):
- """IPC_PATH_MAX_LEN is a sensible value"""
- if zmq.IPC_PATH_MAX_LEN == 0:
- raise SkipTest("IPC_PATH_MAX_LEN undefined")
-
- msg = "Surprising value for IPC_PATH_MAX_LEN: %s" % zmq.IPC_PATH_MAX_LEN
- self.assertTrue(zmq.IPC_PATH_MAX_LEN > 30, msg)
- self.assertTrue(zmq.IPC_PATH_MAX_LEN < 1025, msg)
-
- def test_ipc_path_max_length_msg(self):
- if zmq.IPC_PATH_MAX_LEN == 0:
- raise SkipTest("IPC_PATH_MAX_LEN undefined")
-
- s = self.context.socket(zmq.PUB)
- self.sockets.append(s)
- try:
- s.bind('ipc://{0}'.format('a' * (zmq.IPC_PATH_MAX_LEN + 1)))
- except zmq.ZMQError as e:
- self.assertTrue(str(zmq.IPC_PATH_MAX_LEN) in e.strerror)
-
- def test_hwm(self):
- zmq3 = zmq.zmq_version_info()[0] >= 3
- for stype in (zmq.PUB, zmq.ROUTER, zmq.SUB, zmq.REQ, zmq.DEALER):
- s = self.context.socket(stype)
- s.hwm = 100
- self.assertEqual(s.hwm, 100)
- if zmq3:
- try:
- self.assertEqual(s.sndhwm, 100)
- except AttributeError:
- pass
- try:
- self.assertEqual(s.rcvhwm, 100)
- except AttributeError:
- pass
- s.close()
-
- def test_shadow(self):
- p = self.socket(zmq.PUSH)
- p.bind("tcp://127.0.0.1:5555")
- p2 = zmq.Socket.shadow(p.underlying)
- self.assertEqual(p.underlying, p2.underlying)
- s = self.socket(zmq.PULL)
- s2 = zmq.Socket.shadow(s.underlying)
- self.assertNotEqual(s.underlying, p.underlying)
- self.assertEqual(s.underlying, s2.underlying)
- s2.connect("tcp://127.0.0.1:5555")
- sent = b'hi'
- p2.send(sent)
- rcvd = self.recv(s2)
- self.assertEqual(rcvd, sent)
-
- def test_shadow_pyczmq(self):
- try:
- from pyczmq import zctx, zsocket
- except Exception:
- raise SkipTest("Requires pyczmq")
-
- ctx = zctx.new()
- ca = zsocket.new(ctx, zmq.PUSH)
- cb = zsocket.new(ctx, zmq.PULL)
- a = zmq.Socket.shadow(ca)
- b = zmq.Socket.shadow(cb)
- a.bind("inproc://a")
- b.connect("inproc://a")
- a.send(b'hi')
- rcvd = self.recv(b)
- self.assertEqual(rcvd, b'hi')
-
-
-if have_gevent:
- import gevent
-
- class TestSocketGreen(GreenTest, TestSocket):
- test_bad_attr = GreenTest.skip_green
- test_close_after_destroy = GreenTest.skip_green
-
- def test_timeout(self):
- a,b = self.create_bound_pair()
- g = gevent.spawn_later(0.5, lambda: a.send(b'hi'))
- timeout = gevent.Timeout(0.1)
- timeout.start()
- self.assertRaises(gevent.Timeout, b.recv)
- g.kill()
-
- @skip_if(not hasattr(zmq, 'RCVTIMEO'))
- def test_warn_set_timeo(self):
- s = self.context.socket(zmq.REQ)
- with warnings.catch_warnings(record=True) as w:
- s.rcvtimeo = 5
- s.close()
- self.assertEqual(len(w), 1)
- self.assertEqual(w[0].category, UserWarning)
-
-
- @skip_if(not hasattr(zmq, 'SNDTIMEO'))
- def test_warn_get_timeo(self):
- s = self.context.socket(zmq.REQ)
- with warnings.catch_warnings(record=True) as w:
- s.sndtimeo
- s.close()
- self.assertEqual(len(w), 1)
- self.assertEqual(w[0].category, UserWarning)
diff --git a/src/console/zmq/tests/test_stopwatch.py b/src/console/zmq/tests/test_stopwatch.py
deleted file mode 100755
index 49fb79f2..00000000
--- a/src/console/zmq/tests/test_stopwatch.py
+++ /dev/null
@@ -1,42 +0,0 @@
-# -*- coding: utf8 -*-
-# Copyright (C) PyZMQ Developers
-# Distributed under the terms of the Modified BSD License.
-
-
-import sys
-import time
-
-from unittest import TestCase
-
-from zmq import Stopwatch, ZMQError
-
-if sys.version_info[0] >= 3:
- long = int
-
-class TestStopWatch(TestCase):
-
- def test_stop_long(self):
- """Ensure stop returns a long int."""
- watch = Stopwatch()
- watch.start()
- us = watch.stop()
- self.assertTrue(isinstance(us, long))
-
- def test_stop_microseconds(self):
- """Test that stop/sleep have right units."""
- watch = Stopwatch()
- watch.start()
- tic = time.time()
- watch.sleep(1)
- us = watch.stop()
- toc = time.time()
- self.assertAlmostEqual(us/1e6,(toc-tic),places=0)
-
- def test_double_stop(self):
- """Test error raised on multiple calls to stop."""
- watch = Stopwatch()
- watch.start()
- watch.stop()
- self.assertRaises(ZMQError, watch.stop)
- self.assertRaises(ZMQError, watch.stop)
-
diff --git a/src/console/zmq/tests/test_version.py b/src/console/zmq/tests/test_version.py
deleted file mode 100755
index 6ebebf30..00000000
--- a/src/console/zmq/tests/test_version.py
+++ /dev/null
@@ -1,44 +0,0 @@
-# Copyright (C) PyZMQ Developers
-# Distributed under the terms of the Modified BSD License.
-
-
-from unittest import TestCase
-import zmq
-from zmq.sugar import version
-
-
-class TestVersion(TestCase):
-
- def test_pyzmq_version(self):
- vs = zmq.pyzmq_version()
- vs2 = zmq.__version__
- self.assertTrue(isinstance(vs, str))
- if zmq.__revision__:
- self.assertEqual(vs, '@'.join(vs2, zmq.__revision__))
- else:
- self.assertEqual(vs, vs2)
- if version.VERSION_EXTRA:
- self.assertTrue(version.VERSION_EXTRA in vs)
- self.assertTrue(version.VERSION_EXTRA in vs2)
-
- def test_pyzmq_version_info(self):
- info = zmq.pyzmq_version_info()
- self.assertTrue(isinstance(info, tuple))
- for n in info[:3]:
- self.assertTrue(isinstance(n, int))
- if version.VERSION_EXTRA:
- self.assertEqual(len(info), 4)
- self.assertEqual(info[-1], float('inf'))
- else:
- self.assertEqual(len(info), 3)
-
- def test_zmq_version_info(self):
- info = zmq.zmq_version_info()
- self.assertTrue(isinstance(info, tuple))
- for n in info[:3]:
- self.assertTrue(isinstance(n, int))
-
- def test_zmq_version(self):
- v = zmq.zmq_version()
- self.assertTrue(isinstance(v, str))
-
diff --git a/src/console/zmq/tests/test_win32_shim.py b/src/console/zmq/tests/test_win32_shim.py
deleted file mode 100755
index 55657bda..00000000
--- a/src/console/zmq/tests/test_win32_shim.py
+++ /dev/null
@@ -1,56 +0,0 @@
-from __future__ import print_function
-
-import os
-
-from functools import wraps
-from zmq.tests import BaseZMQTestCase
-from zmq.utils.win32 import allow_interrupt
-
-
-def count_calls(f):
- @wraps(f)
- def _(*args, **kwds):
- try:
- return f(*args, **kwds)
- finally:
- _.__calls__ += 1
- _.__calls__ = 0
- return _
-
-
-class TestWindowsConsoleControlHandler(BaseZMQTestCase):
-
- def test_handler(self):
- @count_calls
- def interrupt_polling():
- print('Caught CTRL-C!')
-
- if os.name == 'nt':
- from ctypes import windll
- from ctypes.wintypes import BOOL, DWORD
-
- kernel32 = windll.LoadLibrary('kernel32')
-
- # <http://msdn.microsoft.com/en-us/library/ms683155.aspx>
- GenerateConsoleCtrlEvent = kernel32.GenerateConsoleCtrlEvent
- GenerateConsoleCtrlEvent.argtypes = (DWORD, DWORD)
- GenerateConsoleCtrlEvent.restype = BOOL
-
- try:
- # Simulate CTRL-C event while handler is active.
- with allow_interrupt(interrupt_polling):
- result = GenerateConsoleCtrlEvent(0, 0)
- if result == 0:
- raise WindowsError
- except KeyboardInterrupt:
- pass
- else:
- self.fail('Expecting `KeyboardInterrupt` exception!')
-
- # Make sure our handler was called.
- self.assertEqual(interrupt_polling.__calls__, 1)
- else:
- # On non-Windows systems, this utility is just a no-op!
- with allow_interrupt(interrupt_polling):
- pass
- self.assertEqual(interrupt_polling.__calls__, 0)
diff --git a/src/console/zmq/tests/test_z85.py b/src/console/zmq/tests/test_z85.py
deleted file mode 100755
index 8a73cb4d..00000000
--- a/src/console/zmq/tests/test_z85.py
+++ /dev/null
@@ -1,63 +0,0 @@
-# -*- coding: utf8 -*-
-"""Test Z85 encoding
-
-confirm values and roundtrip with test values from the reference implementation.
-"""
-
-# Copyright (C) PyZMQ Developers
-# Distributed under the terms of the Modified BSD License.
-
-from unittest import TestCase
-from zmq.utils import z85
-
-
-class TestZ85(TestCase):
-
- def test_client_public(self):
- client_public = \
- b"\xBB\x88\x47\x1D\x65\xE2\x65\x9B" \
- b"\x30\xC5\x5A\x53\x21\xCE\xBB\x5A" \
- b"\xAB\x2B\x70\xA3\x98\x64\x5C\x26" \
- b"\xDC\xA2\xB2\xFC\xB4\x3F\xC5\x18"
- encoded = z85.encode(client_public)
-
- self.assertEqual(encoded, b"Yne@$w-vo<fVvi]a<NY6T1ed:M$fCG*[IaLV{hID")
- decoded = z85.decode(encoded)
- self.assertEqual(decoded, client_public)
-
- def test_client_secret(self):
- client_secret = \
- b"\x7B\xB8\x64\xB4\x89\xAF\xA3\x67" \
- b"\x1F\xBE\x69\x10\x1F\x94\xB3\x89" \
- b"\x72\xF2\x48\x16\xDF\xB0\x1B\x51" \
- b"\x65\x6B\x3F\xEC\x8D\xFD\x08\x88"
- encoded = z85.encode(client_secret)
-
- self.assertEqual(encoded, b"D:)Q[IlAW!ahhC2ac:9*A}h:p?([4%wOTJ%JR%cs")
- decoded = z85.decode(encoded)
- self.assertEqual(decoded, client_secret)
-
- def test_server_public(self):
- server_public = \
- b"\x54\xFC\xBA\x24\xE9\x32\x49\x96" \
- b"\x93\x16\xFB\x61\x7C\x87\x2B\xB0" \
- b"\xC1\xD1\xFF\x14\x80\x04\x27\xC5" \
- b"\x94\xCB\xFA\xCF\x1B\xC2\xD6\x52"
- encoded = z85.encode(server_public)
-
- self.assertEqual(encoded, b"rq:rM>}U?@Lns47E1%kR.o@n%FcmmsL/@{H8]yf7")
- decoded = z85.decode(encoded)
- self.assertEqual(decoded, server_public)
-
- def test_server_secret(self):
- server_secret = \
- b"\x8E\x0B\xDD\x69\x76\x28\xB9\x1D" \
- b"\x8F\x24\x55\x87\xEE\x95\xC5\xB0" \
- b"\x4D\x48\x96\x3F\x79\x25\x98\x77" \
- b"\xB4\x9C\xD9\x06\x3A\xEA\xD3\xB7"
- encoded = z85.encode(server_secret)
-
- self.assertEqual(encoded, b"JTKVSB%%)wK0E.X)V>+}o?pNmC{O&4W4b!Ni{Lh6")
- decoded = z85.decode(encoded)
- self.assertEqual(decoded, server_secret)
-
diff --git a/src/console/zmq/tests/test_zmqstream.py b/src/console/zmq/tests/test_zmqstream.py
deleted file mode 100755
index cdb3a171..00000000
--- a/src/console/zmq/tests/test_zmqstream.py
+++ /dev/null
@@ -1,34 +0,0 @@
-# -*- coding: utf8 -*-
-# Copyright (C) PyZMQ Developers
-# Distributed under the terms of the Modified BSD License.
-
-
-import sys
-import time
-
-from unittest import TestCase
-
-import zmq
-from zmq.eventloop import ioloop, zmqstream
-
-class TestZMQStream(TestCase):
-
- def setUp(self):
- self.context = zmq.Context()
- self.socket = self.context.socket(zmq.REP)
- self.loop = ioloop.IOLoop.instance()
- self.stream = zmqstream.ZMQStream(self.socket)
-
- def tearDown(self):
- self.socket.close()
- self.context.term()
-
- def test_callable_check(self):
- """Ensure callable check works (py3k)."""
-
- self.stream.on_send(lambda *args: None)
- self.stream.on_recv(lambda *args: None)
- self.assertRaises(AssertionError, self.stream.on_recv, 1)
- self.assertRaises(AssertionError, self.stream.on_send, 1)
- self.assertRaises(AssertionError, self.stream.on_recv, zmq)
-