summaryrefslogtreecommitdiffstats
path: root/src/console/zmq/tests/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/console/zmq/tests/__init__.py')
-rwxr-xr-xsrc/console/zmq/tests/__init__.py211
1 files changed, 0 insertions, 211 deletions
diff --git a/src/console/zmq/tests/__init__.py b/src/console/zmq/tests/__init__.py
deleted file mode 100755
index 325a3f19..00000000
--- a/src/console/zmq/tests/__init__.py
+++ /dev/null
@@ -1,211 +0,0 @@
-# Copyright (c) PyZMQ Developers.
-# Distributed under the terms of the Modified BSD License.
-
-import functools
-import sys
-import time
-from threading import Thread
-
-from unittest import TestCase
-
-import zmq
-from zmq.utils import jsonapi
-
-try:
- import gevent
- from zmq import green as gzmq
- have_gevent = True
-except ImportError:
- have_gevent = False
-
-try:
- from unittest import SkipTest
-except ImportError:
- try:
- from nose import SkipTest
- except ImportError:
- class SkipTest(Exception):
- pass
-
-PYPY = 'PyPy' in sys.version
-
-#-----------------------------------------------------------------------------
-# skip decorators (directly from unittest)
-#-----------------------------------------------------------------------------
-
-_id = lambda x: x
-
-def skip(reason):
- """
- Unconditionally skip a test.
- """
- def decorator(test_item):
- if not (isinstance(test_item, type) and issubclass(test_item, TestCase)):
- @functools.wraps(test_item)
- def skip_wrapper(*args, **kwargs):
- raise SkipTest(reason)
- test_item = skip_wrapper
-
- test_item.__unittest_skip__ = True
- test_item.__unittest_skip_why__ = reason
- return test_item
- return decorator
-
-def skip_if(condition, reason="Skipped"):
- """
- Skip a test if the condition is true.
- """
- if condition:
- return skip(reason)
- return _id
-
-skip_pypy = skip_if(PYPY, "Doesn't work on PyPy")
-
-#-----------------------------------------------------------------------------
-# Base test class
-#-----------------------------------------------------------------------------
-
-class BaseZMQTestCase(TestCase):
- green = False
-
- @property
- def Context(self):
- if self.green:
- return gzmq.Context
- else:
- return zmq.Context
-
- def socket(self, socket_type):
- s = self.context.socket(socket_type)
- self.sockets.append(s)
- return s
-
- def setUp(self):
- if self.green and not have_gevent:
- raise SkipTest("requires gevent")
- self.context = self.Context.instance()
- self.sockets = []
-
- def tearDown(self):
- contexts = set([self.context])
- while self.sockets:
- sock = self.sockets.pop()
- contexts.add(sock.context) # in case additional contexts are created
- sock.close(0)
- for ctx in contexts:
- t = Thread(target=ctx.term)
- t.daemon = True
- t.start()
- t.join(timeout=2)
- if t.is_alive():
- # reset Context.instance, so the failure to term doesn't corrupt subsequent tests
- zmq.sugar.context.Context._instance = None
- raise RuntimeError("context could not terminate, open sockets likely remain in test")
-
- def create_bound_pair(self, type1=zmq.PAIR, type2=zmq.PAIR, interface='tcp://127.0.0.1'):
- """Create a bound socket pair using a random port."""
- s1 = self.context.socket(type1)
- s1.setsockopt(zmq.LINGER, 0)
- port = s1.bind_to_random_port(interface)
- s2 = self.context.socket(type2)
- s2.setsockopt(zmq.LINGER, 0)
- s2.connect('%s:%s' % (interface, port))
- self.sockets.extend([s1,s2])
- return s1, s2
-
- def ping_pong(self, s1, s2, msg):
- s1.send(msg)
- msg2 = s2.recv()
- s2.send(msg2)
- msg3 = s1.recv()
- return msg3
-
- def ping_pong_json(self, s1, s2, o):
- if jsonapi.jsonmod is None:
- raise SkipTest("No json library")
- s1.send_json(o)
- o2 = s2.recv_json()
- s2.send_json(o2)
- o3 = s1.recv_json()
- return o3
-
- def ping_pong_pyobj(self, s1, s2, o):
- s1.send_pyobj(o)
- o2 = s2.recv_pyobj()
- s2.send_pyobj(o2)
- o3 = s1.recv_pyobj()
- return o3
-
- def assertRaisesErrno(self, errno, func, *args, **kwargs):
- try:
- func(*args, **kwargs)
- except zmq.ZMQError as e:
- self.assertEqual(e.errno, errno, "wrong error raised, expected '%s' \
-got '%s'" % (zmq.ZMQError(errno), zmq.ZMQError(e.errno)))
- else:
- self.fail("Function did not raise any error")
-
- def _select_recv(self, multipart, socket, **kwargs):
- """call recv[_multipart] in a way that raises if there is nothing to receive"""
- if zmq.zmq_version_info() >= (3,1,0):
- # zmq 3.1 has a bug, where poll can return false positives,
- # so we wait a little bit just in case
- # See LIBZMQ-280 on JIRA
- time.sleep(0.1)
-
- r,w,x = zmq.select([socket], [], [], timeout=5)
- assert len(r) > 0, "Should have received a message"
- kwargs['flags'] = zmq.DONTWAIT | kwargs.get('flags', 0)
-
- recv = socket.recv_multipart if multipart else socket.recv
- return recv(**kwargs)
-
- def recv(self, socket, **kwargs):
- """call recv in a way that raises if there is nothing to receive"""
- return self._select_recv(False, socket, **kwargs)
-
- def recv_multipart(self, socket, **kwargs):
- """call recv_multipart in a way that raises if there is nothing to receive"""
- return self._select_recv(True, socket, **kwargs)
-
-
-class PollZMQTestCase(BaseZMQTestCase):
- pass
-
-class GreenTest:
- """Mixin for making green versions of test classes"""
- green = True
-
- def assertRaisesErrno(self, errno, func, *args, **kwargs):
- if errno == zmq.EAGAIN:
- raise SkipTest("Skipping because we're green.")
- try:
- func(*args, **kwargs)
- except zmq.ZMQError:
- e = sys.exc_info()[1]
- self.assertEqual(e.errno, errno, "wrong error raised, expected '%s' \
-got '%s'" % (zmq.ZMQError(errno), zmq.ZMQError(e.errno)))
- else:
- self.fail("Function did not raise any error")
-
- def tearDown(self):
- contexts = set([self.context])
- while self.sockets:
- sock = self.sockets.pop()
- contexts.add(sock.context) # in case additional contexts are created
- sock.close()
- try:
- gevent.joinall([gevent.spawn(ctx.term) for ctx in contexts], timeout=2, raise_error=True)
- except gevent.Timeout:
- raise RuntimeError("context could not terminate, open sockets likely remain in test")
-
- def skip_green(self):
- raise SkipTest("Skipping because we are green")
-
-def skip_green(f):
- def skipping_test(self, *args, **kwargs):
- if self.green:
- raise SkipTest("Skipping because we are green")
- else:
- return f(self, *args, **kwargs)
- return skipping_test