diff options
Diffstat (limited to 'external_libs/python/zmq/tests/test_monitor.py')
-rw-r--r-- | external_libs/python/zmq/tests/test_monitor.py | 71 |
1 files changed, 0 insertions, 71 deletions
diff --git a/external_libs/python/zmq/tests/test_monitor.py b/external_libs/python/zmq/tests/test_monitor.py deleted file mode 100644 index 4f035388..00000000 --- a/external_libs/python/zmq/tests/test_monitor.py +++ /dev/null @@ -1,71 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright (C) PyZMQ Developers -# Distributed under the terms of the Modified BSD License. - - -import sys -import time -import struct - -from unittest import TestCase - -import zmq -from zmq.tests import BaseZMQTestCase, skip_if, skip_pypy -from zmq.utils.monitor import recv_monitor_message - -skip_lt_4 = skip_if(zmq.zmq_version_info() < (4,), "requires zmq >= 4") - -class TestSocketMonitor(BaseZMQTestCase): - - @skip_lt_4 - def test_monitor(self): - """Test monitoring interface for sockets.""" - s_rep = self.context.socket(zmq.REP) - s_req = self.context.socket(zmq.REQ) - self.sockets.extend([s_rep, s_req]) - s_req.bind("tcp://127.0.0.1:6666") - # try monitoring the REP socket - - s_rep.monitor("inproc://monitor.rep", zmq.EVENT_ALL) - # create listening socket for monitor - s_event = self.context.socket(zmq.PAIR) - self.sockets.append(s_event) - s_event.connect("inproc://monitor.rep") - s_event.linger = 0 - # test receive event for connect event - s_rep.connect("tcp://127.0.0.1:6666") - m = recv_monitor_message(s_event) - if m['event'] == zmq.EVENT_CONNECT_DELAYED: - self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6666") - # test receive event for connected event - m = recv_monitor_message(s_event) - self.assertEqual(m['event'], zmq.EVENT_CONNECTED) - self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6666") - - # test monitor can be disabled. - s_rep.disable_monitor() - m = recv_monitor_message(s_event) - self.assertEqual(m['event'], zmq.EVENT_MONITOR_STOPPED) - - - @skip_lt_4 - def test_monitor_connected(self): - """Test connected monitoring socket.""" - s_rep = self.context.socket(zmq.REP) - s_req = self.context.socket(zmq.REQ) - self.sockets.extend([s_rep, s_req]) - s_req.bind("tcp://127.0.0.1:6667") - # try monitoring the REP socket - # create listening socket for monitor - s_event = s_rep.get_monitor_socket() - s_event.linger = 0 - self.sockets.append(s_event) - # test receive event for connect event - s_rep.connect("tcp://127.0.0.1:6667") - m = recv_monitor_message(s_event) - if m['event'] == zmq.EVENT_CONNECT_DELAYED: - self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667") - # test receive event for connected event - m = recv_monitor_message(s_event) - self.assertEqual(m['event'], zmq.EVENT_CONNECTED) - self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667") |