summaryrefslogtreecommitdiffstats
path: root/src/console/zmq/tests/test_monitor.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/console/zmq/tests/test_monitor.py')
-rwxr-xr-xsrc/console/zmq/tests/test_monitor.py71
1 files changed, 71 insertions, 0 deletions
diff --git a/src/console/zmq/tests/test_monitor.py b/src/console/zmq/tests/test_monitor.py
new file mode 100755
index 00000000..4f035388
--- /dev/null
+++ b/src/console/zmq/tests/test_monitor.py
@@ -0,0 +1,71 @@
+# -*- coding: utf-8 -*-
+# Copyright (C) PyZMQ Developers
+# Distributed under the terms of the Modified BSD License.
+
+
+import sys
+import time
+import struct
+
+from unittest import TestCase
+
+import zmq
+from zmq.tests import BaseZMQTestCase, skip_if, skip_pypy
+from zmq.utils.monitor import recv_monitor_message
+
+skip_lt_4 = skip_if(zmq.zmq_version_info() < (4,), "requires zmq >= 4")
+
+class TestSocketMonitor(BaseZMQTestCase):
+
+ @skip_lt_4
+ def test_monitor(self):
+ """Test monitoring interface for sockets."""
+ s_rep = self.context.socket(zmq.REP)
+ s_req = self.context.socket(zmq.REQ)
+ self.sockets.extend([s_rep, s_req])
+ s_req.bind("tcp://127.0.0.1:6666")
+ # try monitoring the REP socket
+
+ s_rep.monitor("inproc://monitor.rep", zmq.EVENT_ALL)
+ # create listening socket for monitor
+ s_event = self.context.socket(zmq.PAIR)
+ self.sockets.append(s_event)
+ s_event.connect("inproc://monitor.rep")
+ s_event.linger = 0
+ # test receive event for connect event
+ s_rep.connect("tcp://127.0.0.1:6666")
+ m = recv_monitor_message(s_event)
+ if m['event'] == zmq.EVENT_CONNECT_DELAYED:
+ self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6666")
+ # test receive event for connected event
+ m = recv_monitor_message(s_event)
+ self.assertEqual(m['event'], zmq.EVENT_CONNECTED)
+ self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6666")
+
+ # test monitor can be disabled.
+ s_rep.disable_monitor()
+ m = recv_monitor_message(s_event)
+ self.assertEqual(m['event'], zmq.EVENT_MONITOR_STOPPED)
+
+
+ @skip_lt_4
+ def test_monitor_connected(self):
+ """Test connected monitoring socket."""
+ s_rep = self.context.socket(zmq.REP)
+ s_req = self.context.socket(zmq.REQ)
+ self.sockets.extend([s_rep, s_req])
+ s_req.bind("tcp://127.0.0.1:6667")
+ # try monitoring the REP socket
+ # create listening socket for monitor
+ s_event = s_rep.get_monitor_socket()
+ s_event.linger = 0
+ self.sockets.append(s_event)
+ # test receive event for connect event
+ s_rep.connect("tcp://127.0.0.1:6667")
+ m = recv_monitor_message(s_event)
+ if m['event'] == zmq.EVENT_CONNECT_DELAYED:
+ self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667")
+ # test receive event for connected event
+ m = recv_monitor_message(s_event)
+ self.assertEqual(m['event'], zmq.EVENT_CONNECTED)
+ self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667")