diff options
author | 2015-08-24 13:22:48 +0300 | |
---|---|---|
committer | 2015-08-24 13:22:48 +0300 | |
commit | dab741a80699f86e86c91718872a052cca9bbb25 (patch) | |
tree | 1959c4a2cea440170a5113dcb067796cb20ffb64 /external_libs/python/pyzmq-14.7.0/examples/pubsub | |
parent | d3f26ece7d4383df0b22fe9c3cb3e695381ec737 (diff) |
Fixed dependencies of Control Plane to use external_lib sources
Diffstat (limited to 'external_libs/python/pyzmq-14.7.0/examples/pubsub')
4 files changed, 251 insertions, 0 deletions
diff --git a/external_libs/python/pyzmq-14.7.0/examples/pubsub/publisher.py b/external_libs/python/pyzmq-14.7.0/examples/pubsub/publisher.py new file mode 100644 index 00000000..a2ce6c9c --- /dev/null +++ b/external_libs/python/pyzmq-14.7.0/examples/pubsub/publisher.py @@ -0,0 +1,57 @@ +"""A test that publishes NumPy arrays. + +Uses REQ/REP (on PUB/SUB socket + 1) to synchronize +""" + +#----------------------------------------------------------------------------- +# Copyright (c) 2010 Brian Granger +# +# Distributed under the terms of the New BSD License. The full license is in +# the file COPYING.BSD, distributed as part of this software. +#----------------------------------------------------------------------------- + +import sys +import time + +import zmq +import numpy + +def sync(bind_to): + # use bind socket + 1 + sync_with = ':'.join(bind_to.split(':')[:-1] + + [str(int(bind_to.split(':')[-1]) + 1)]) + ctx = zmq.Context.instance() + s = ctx.socket(zmq.REP) + s.bind(sync_with) + print "Waiting for subscriber to connect..." + s.recv() + print " Done." + s.send('GO') + +def main(): + if len (sys.argv) != 4: + print 'usage: publisher <bind-to> <array-size> <array-count>' + sys.exit (1) + + try: + bind_to = sys.argv[1] + array_size = int(sys.argv[2]) + array_count = int (sys.argv[3]) + except (ValueError, OverflowError), e: + print 'array-size and array-count must be integers' + sys.exit (1) + + ctx = zmq.Context() + s = ctx.socket(zmq.PUB) + s.bind(bind_to) + + sync(bind_to) + + print "Sending arrays..." + for i in range(array_count): + a = numpy.random.rand(array_size, array_size) + s.send_pyobj(a) + print " Done." + +if __name__ == "__main__": + main() diff --git a/external_libs/python/pyzmq-14.7.0/examples/pubsub/subscriber.py b/external_libs/python/pyzmq-14.7.0/examples/pubsub/subscriber.py new file mode 100644 index 00000000..b996ad8d --- /dev/null +++ b/external_libs/python/pyzmq-14.7.0/examples/pubsub/subscriber.py @@ -0,0 +1,74 @@ +"""A test that subscribes to NumPy arrays. + +Uses REQ/REP (on PUB/SUB socket + 1) to synchronize +""" + +#----------------------------------------------------------------------------- +# Copyright (c) 2010 Brian Granger +# +# Distributed under the terms of the New BSD License. The full license is in +# the file COPYING.BSD, distributed as part of this software. +#----------------------------------------------------------------------------- + + +import sys +import time + +import zmq +import numpy + +def sync(connect_to): + # use connect socket + 1 + sync_with = ':'.join(connect_to.split(':')[:-1] + + [str(int(connect_to.split(':')[-1]) + 1)] + ) + ctx = zmq.Context.instance() + s = ctx.socket(zmq.REQ) + s.connect(sync_with) + s.send('READY') + s.recv() + +def main(): + if len (sys.argv) != 3: + print 'usage: subscriber <connect_to> <array-count>' + sys.exit (1) + + try: + connect_to = sys.argv[1] + array_count = int (sys.argv[2]) + except (ValueError, OverflowError), e: + print 'array-count must be integers' + sys.exit (1) + + ctx = zmq.Context() + s = ctx.socket(zmq.SUB) + s.connect(connect_to) + s.setsockopt(zmq.SUBSCRIBE,'') + + sync(connect_to) + + start = time.clock() + + print "Receiving arrays..." + for i in range(array_count): + a = s.recv_pyobj() + print " Done." + + end = time.clock() + + elapsed = (end - start) * 1000000 + if elapsed == 0: + elapsed = 1 + throughput = (1000000.0 * float (array_count)) / float (elapsed) + message_size = a.nbytes + megabits = float (throughput * message_size * 8) / 1000000 + + print "message size: %.0f [B]" % (message_size, ) + print "array count: %.0f" % (array_count, ) + print "mean throughput: %.0f [msg/s]" % (throughput, ) + print "mean throughput: %.3f [Mb/s]" % (megabits, ) + + time.sleep(1.0) + +if __name__ == "__main__": + main() diff --git a/external_libs/python/pyzmq-14.7.0/examples/pubsub/topics_pub.py b/external_libs/python/pyzmq-14.7.0/examples/pubsub/topics_pub.py new file mode 100644 index 00000000..73b3d1c5 --- /dev/null +++ b/external_libs/python/pyzmq-14.7.0/examples/pubsub/topics_pub.py @@ -0,0 +1,64 @@ +#!/usr/bin/env python +"""Simple example of publish/subscribe illustrating topics. + +Publisher and subscriber can be started in any order, though if publisher +starts first, any messages sent before subscriber starts are lost. More than +one subscriber can listen, and they can listen to different topics. + +Topic filtering is done simply on the start of the string, e.g. listening to +'s' will catch 'sports...' and 'stocks' while listening to 'w' is enough to +catch 'weather'. +""" + +#----------------------------------------------------------------------------- +# Copyright (c) 2010 Brian Granger +# +# Distributed under the terms of the New BSD License. The full license is in +# the file COPYING.BSD, distributed as part of this software. +#----------------------------------------------------------------------------- + +import itertools +import sys +import time + +import zmq + +def main(): + if len (sys.argv) != 2: + print 'usage: publisher <bind-to>' + sys.exit (1) + + bind_to = sys.argv[1] + + all_topics = ['sports.general','sports.football','sports.basketball', + 'stocks.general','stocks.GOOG','stocks.AAPL', + 'weather'] + + ctx = zmq.Context() + s = ctx.socket(zmq.PUB) + s.bind(bind_to) + + print "Starting broadcast on topics:" + print " %s" % all_topics + print "Hit Ctrl-C to stop broadcasting." + print "Waiting so subscriber sockets can connect..." + print + time.sleep(1.0) + + msg_counter = itertools.count() + try: + for topic in itertools.cycle(all_topics): + msg_body = str(msg_counter.next()) + print ' Topic: %s, msg:%s' % (topic, msg_body) + s.send_multipart([topic, msg_body]) + # short wait so we don't hog the cpu + time.sleep(0.1) + except KeyboardInterrupt: + pass + + print "Waiting for message queues to flush..." + time.sleep(0.5) + print "Done." + +if __name__ == "__main__": + main() diff --git a/external_libs/python/pyzmq-14.7.0/examples/pubsub/topics_sub.py b/external_libs/python/pyzmq-14.7.0/examples/pubsub/topics_sub.py new file mode 100644 index 00000000..4a61fb55 --- /dev/null +++ b/external_libs/python/pyzmq-14.7.0/examples/pubsub/topics_sub.py @@ -0,0 +1,56 @@ +#!/usr/bin/env python +"""Simple example of publish/subscribe illustrating topics. + +Publisher and subscriber can be started in any order, though if publisher +starts first, any messages sent before subscriber starts are lost. More than +one subscriber can listen, and they can listen to different topics. + +Topic filtering is done simply on the start of the string, e.g. listening to +'s' will catch 'sports...' and 'stocks' while listening to 'w' is enough to +catch 'weather'. +""" + +#----------------------------------------------------------------------------- +# Copyright (c) 2010 Brian Granger, Fernando Perez +# +# Distributed under the terms of the New BSD License. The full license is in +# the file COPYING.BSD, distributed as part of this software. +#----------------------------------------------------------------------------- + +import sys +import time + +import zmq +import numpy + +def main(): + if len (sys.argv) < 2: + print 'usage: subscriber <connect_to> [topic topic ...]' + sys.exit (1) + + connect_to = sys.argv[1] + topics = sys.argv[2:] + + ctx = zmq.Context() + s = ctx.socket(zmq.SUB) + s.connect(connect_to) + + # manage subscriptions + if not topics: + print "Receiving messages on ALL topics..." + s.setsockopt(zmq.SUBSCRIBE,'') + else: + print "Receiving messages on topics: %s ..." % topics + for t in topics: + s.setsockopt(zmq.SUBSCRIBE,t) + print + try: + while True: + topic, msg = s.recv_multipart() + print ' Topic: %s, msg:%s' % (topic, msg) + except KeyboardInterrupt: + pass + print "Done." + +if __name__ == "__main__": + main() |