diff options
Diffstat (limited to 'src/console/zmq/tests/__init__.py')
-rwxr-xr-x | src/console/zmq/tests/__init__.py | 211 |
1 files changed, 211 insertions, 0 deletions
diff --git a/src/console/zmq/tests/__init__.py b/src/console/zmq/tests/__init__.py new file mode 100755 index 00000000..325a3f19 --- /dev/null +++ b/src/console/zmq/tests/__init__.py @@ -0,0 +1,211 @@ +# Copyright (c) PyZMQ Developers. +# Distributed under the terms of the Modified BSD License. + +import functools +import sys +import time +from threading import Thread + +from unittest import TestCase + +import zmq +from zmq.utils import jsonapi + +try: + import gevent + from zmq import green as gzmq + have_gevent = True +except ImportError: + have_gevent = False + +try: + from unittest import SkipTest +except ImportError: + try: + from nose import SkipTest + except ImportError: + class SkipTest(Exception): + pass + +PYPY = 'PyPy' in sys.version + +#----------------------------------------------------------------------------- +# skip decorators (directly from unittest) +#----------------------------------------------------------------------------- + +_id = lambda x: x + +def skip(reason): + """ + Unconditionally skip a test. + """ + def decorator(test_item): + if not (isinstance(test_item, type) and issubclass(test_item, TestCase)): + @functools.wraps(test_item) + def skip_wrapper(*args, **kwargs): + raise SkipTest(reason) + test_item = skip_wrapper + + test_item.__unittest_skip__ = True + test_item.__unittest_skip_why__ = reason + return test_item + return decorator + +def skip_if(condition, reason="Skipped"): + """ + Skip a test if the condition is true. + """ + if condition: + return skip(reason) + return _id + +skip_pypy = skip_if(PYPY, "Doesn't work on PyPy") + +#----------------------------------------------------------------------------- +# Base test class +#----------------------------------------------------------------------------- + +class BaseZMQTestCase(TestCase): + green = False + + @property + def Context(self): + if self.green: + return gzmq.Context + else: + return zmq.Context + + def socket(self, socket_type): + s = self.context.socket(socket_type) + self.sockets.append(s) + return s + + def setUp(self): + if self.green and not have_gevent: + raise SkipTest("requires gevent") + self.context = self.Context.instance() + self.sockets = [] + + def tearDown(self): + contexts = set([self.context]) + while self.sockets: + sock = self.sockets.pop() + contexts.add(sock.context) # in case additional contexts are created + sock.close(0) + for ctx in contexts: + t = Thread(target=ctx.term) + t.daemon = True + t.start() + t.join(timeout=2) + if t.is_alive(): + # reset Context.instance, so the failure to term doesn't corrupt subsequent tests + zmq.sugar.context.Context._instance = None + raise RuntimeError("context could not terminate, open sockets likely remain in test") + + def create_bound_pair(self, type1=zmq.PAIR, type2=zmq.PAIR, interface='tcp://127.0.0.1'): + """Create a bound socket pair using a random port.""" + s1 = self.context.socket(type1) + s1.setsockopt(zmq.LINGER, 0) + port = s1.bind_to_random_port(interface) + s2 = self.context.socket(type2) + s2.setsockopt(zmq.LINGER, 0) + s2.connect('%s:%s' % (interface, port)) + self.sockets.extend([s1,s2]) + return s1, s2 + + def ping_pong(self, s1, s2, msg): + s1.send(msg) + msg2 = s2.recv() + s2.send(msg2) + msg3 = s1.recv() + return msg3 + + def ping_pong_json(self, s1, s2, o): + if jsonapi.jsonmod is None: + raise SkipTest("No json library") + s1.send_json(o) + o2 = s2.recv_json() + s2.send_json(o2) + o3 = s1.recv_json() + return o3 + + def ping_pong_pyobj(self, s1, s2, o): + s1.send_pyobj(o) + o2 = s2.recv_pyobj() + s2.send_pyobj(o2) + o3 = s1.recv_pyobj() + return o3 + + def assertRaisesErrno(self, errno, func, *args, **kwargs): + try: + func(*args, **kwargs) + except zmq.ZMQError as e: + self.assertEqual(e.errno, errno, "wrong error raised, expected '%s' \ +got '%s'" % (zmq.ZMQError(errno), zmq.ZMQError(e.errno))) + else: + self.fail("Function did not raise any error") + + def _select_recv(self, multipart, socket, **kwargs): + """call recv[_multipart] in a way that raises if there is nothing to receive""" + if zmq.zmq_version_info() >= (3,1,0): + # zmq 3.1 has a bug, where poll can return false positives, + # so we wait a little bit just in case + # See LIBZMQ-280 on JIRA + time.sleep(0.1) + + r,w,x = zmq.select([socket], [], [], timeout=5) + assert len(r) > 0, "Should have received a message" + kwargs['flags'] = zmq.DONTWAIT | kwargs.get('flags', 0) + + recv = socket.recv_multipart if multipart else socket.recv + return recv(**kwargs) + + def recv(self, socket, **kwargs): + """call recv in a way that raises if there is nothing to receive""" + return self._select_recv(False, socket, **kwargs) + + def recv_multipart(self, socket, **kwargs): + """call recv_multipart in a way that raises if there is nothing to receive""" + return self._select_recv(True, socket, **kwargs) + + +class PollZMQTestCase(BaseZMQTestCase): + pass + +class GreenTest: + """Mixin for making green versions of test classes""" + green = True + + def assertRaisesErrno(self, errno, func, *args, **kwargs): + if errno == zmq.EAGAIN: + raise SkipTest("Skipping because we're green.") + try: + func(*args, **kwargs) + except zmq.ZMQError: + e = sys.exc_info()[1] + self.assertEqual(e.errno, errno, "wrong error raised, expected '%s' \ +got '%s'" % (zmq.ZMQError(errno), zmq.ZMQError(e.errno))) + else: + self.fail("Function did not raise any error") + + def tearDown(self): + contexts = set([self.context]) + while self.sockets: + sock = self.sockets.pop() + contexts.add(sock.context) # in case additional contexts are created + sock.close() + try: + gevent.joinall([gevent.spawn(ctx.term) for ctx in contexts], timeout=2, raise_error=True) + except gevent.Timeout: + raise RuntimeError("context could not terminate, open sockets likely remain in test") + + def skip_green(self): + raise SkipTest("Skipping because we are green") + +def skip_green(f): + def skipping_test(self, *args, **kwargs): + if self.green: + raise SkipTest("Skipping because we are green") + else: + return f(self, *args, **kwargs) + return skipping_test |