summaryrefslogtreecommitdiffstats
path: root/external_libs/python/pyzmq-14.7.0/perf/perf.py
diff options
context:
space:
mode:
Diffstat (limited to 'external_libs/python/pyzmq-14.7.0/perf/perf.py')
-rw-r--r--external_libs/python/pyzmq-14.7.0/perf/perf.py193
1 files changed, 193 insertions, 0 deletions
diff --git a/external_libs/python/pyzmq-14.7.0/perf/perf.py b/external_libs/python/pyzmq-14.7.0/perf/perf.py
new file mode 100644
index 00000000..a0254b4a
--- /dev/null
+++ b/external_libs/python/pyzmq-14.7.0/perf/perf.py
@@ -0,0 +1,193 @@
+#!/usr/bin/env python
+# coding: utf-8
+
+# Copyright (C) PyZMQ Developers
+# Distributed under the terms of the Modified BSD License.
+#
+# Some original test code Copyright (c) 2007-2010 iMatix Corporation,
+# Used under LGPLv3
+
+import argparse
+import time
+
+from multiprocessing import Process
+
+import zmq
+
+def parse_args(argv=None):
+
+ parser = argparse.ArgumentParser(description='Run a zmq performance test')
+ parser.add_argument('-p', '--poll', action='store_true',
+ help='use a zmq Poller instead of raw send/recv')
+ parser.add_argument('-c', '--copy', action='store_true',
+ help='copy messages instead of using zero-copy')
+ parser.add_argument('-s', '--size', type=int, default=10240,
+ help='size (in bytes) of the test message')
+ parser.add_argument('-n', '--count', type=int, default=10240,
+ help='number of test messages to send')
+ parser.add_argument('--url', dest='url', type=str, default='tcp://127.0.0.1:5555',
+ help='the zmq URL on which to run the test')
+ parser.add_argument(dest='test', type=str, default='lat', choices=['lat', 'thr'],
+ help='which test to run')
+ return parser.parse_args(argv)
+
+def latency_echo(url, count, poll, copy):
+ """echo messages on a REP socket
+
+ Should be started before `latency`
+ """
+ ctx = zmq.Context()
+ s = ctx.socket(zmq.REP)
+
+ if poll:
+ p = zmq.Poller()
+ p.register(s)
+
+ s.bind(url)
+
+ block = zmq.NOBLOCK if poll else 0
+
+ for i in range(count):
+ if poll:
+ res = p.poll()
+ msg = s.recv(block, copy=copy)
+
+ if poll:
+ res = p.poll()
+ s.send(msg, block, copy=copy)
+
+ msg = s.recv()
+ assert msg == b'done'
+
+ s.close()
+ ctx.term()
+
+def latency(url, count, size, poll, copy):
+ """Perform a latency test"""
+ ctx = zmq.Context()
+ s = ctx.socket(zmq.REQ)
+ s.setsockopt(zmq.LINGER, -1)
+ s.connect(url)
+ if poll:
+ p = zmq.Poller()
+ p.register(s)
+
+ msg = b' ' * size
+
+ watch = zmq.Stopwatch()
+
+ block = zmq.NOBLOCK if poll else 0
+ time.sleep(1)
+ watch.start()
+
+ for i in range (0, count):
+ if poll:
+ res = p.poll()
+ assert(res[0][1] & zmq.POLLOUT)
+ s.send(msg, block, copy=copy)
+
+ if poll:
+ res = p.poll()
+ assert(res[0][1] & zmq.POLLIN)
+ msg = s.recv(block, copy=copy)
+
+ assert len(msg) == size
+
+ elapsed = watch.stop()
+
+ s.send(b'done')
+
+ latency = elapsed / (count * 2.)
+
+ print ("message size : %8i [B]" % (size, ))
+ print ("roundtrip count: %8i [msgs]" % (count, ))
+ print ("mean latency : %12.3f [µs]" % (latency, ))
+ print ("test time : %12.3f [s]" % (elapsed * 1e-6, ))
+
+def pusher(url, count, size, copy, poll):
+ """send a bunch of messages on a PUSH socket"""
+ ctx = zmq.Context()
+ s = ctx.socket(zmq.PUSH)
+
+ # Add your socket options here.
+ # For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM.
+
+ if poll:
+ p = zmq.Poller()
+ p.register(s)
+
+ s.connect(url)
+
+ msg = zmq.Message(b' ' * size)
+ block = zmq.NOBLOCK if poll else 0
+
+ for i in range(count):
+ if poll:
+ res = p.poll()
+ assert(res[0][1] & zmq.POLLOUT)
+ s.send(msg, block, copy=copy)
+
+ s.close()
+ ctx.term()
+
+def throughput(url, count, size, poll, copy):
+ """recv a bunch of messages on a PULL socket
+
+ Should be started before `pusher`
+ """
+ ctx = zmq.Context()
+ s = ctx.socket(zmq.PULL)
+
+ # Add your socket options here.
+ # For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM.
+
+ if poll:
+ p = zmq.Poller()
+ p.register(s)
+
+ s.bind(url)
+
+ watch = zmq.Stopwatch()
+ block = zmq.NOBLOCK if poll else 0
+
+ # Wait for the other side to connect.
+ msg = s.recv()
+ assert len (msg) == size
+
+ watch.start()
+ for i in range (count-1):
+ if poll:
+ res = p.poll()
+ msg = s.recv(block, copy=copy)
+ elapsed = watch.stop()
+ if elapsed == 0:
+ elapsed = 1
+
+ throughput = (1e6 * float(count)) / float(elapsed)
+ megabits = float(throughput * size * 8) / 1e6
+
+ print ("message size : %8i [B]" % (size, ))
+ print ("message count : %8i [msgs]" % (count, ))
+ print ("mean throughput: %8.0f [msg/s]" % (throughput, ))
+ print ("mean throughput: %12.3f [Mb/s]" % (megabits, ))
+ print ("test time : %12.3f [s]" % (elapsed * 1e-6, ))
+
+
+def main():
+ args = parse_args()
+ tic = time.time()
+ if args.test == 'lat':
+ bg = Process(target=latency_echo, args=(args.url, args.count, args.poll, args.copy))
+ bg.start()
+ latency(args.url, args.count, args.size, args.poll, args.copy)
+ elif args.test == 'thr':
+ bg = Process(target=throughput, args=(args.url, args.count, args.size, args.poll, args.copy))
+ bg.start()
+ pusher(args.url, args.count, args.size, args.poll, args.copy)
+ bg.join()
+ toc = time.time()
+ if (toc - tic) < 3:
+ print ("For best results, tests should take at least a few seconds.")
+
+if __name__ == '__main__':
+ main()