summaryrefslogtreecommitdiffstats
path: root/external_libs/python/pyzmq-14.7.0/examples/heartbeat/heartbeater.py
diff options
context:
space:
mode:
Diffstat (limited to 'external_libs/python/pyzmq-14.7.0/examples/heartbeat/heartbeater.py')
-rw-r--r--external_libs/python/pyzmq-14.7.0/examples/heartbeat/heartbeater.py90
1 files changed, 90 insertions, 0 deletions
diff --git a/external_libs/python/pyzmq-14.7.0/examples/heartbeat/heartbeater.py b/external_libs/python/pyzmq-14.7.0/examples/heartbeat/heartbeater.py
new file mode 100644
index 00000000..180828a6
--- /dev/null
+++ b/external_libs/python/pyzmq-14.7.0/examples/heartbeat/heartbeater.py
@@ -0,0 +1,90 @@
+#!/usr/bin/env python
+"""
+
+For use with heart.py
+
+A basic heartbeater using PUB and ROUTER sockets. pings are sent out on the PUB, and hearts
+are tracked based on their DEALER identities.
+
+You can start many hearts with heart.py, and the heartbeater will monitor all of them, and notice when they stop responding.
+
+Authors
+-------
+* MinRK
+"""
+
+import time
+import zmq
+from zmq.eventloop import ioloop, zmqstream
+
+
+class HeartBeater(object):
+ """A basic HeartBeater class
+ pingstream: a PUB stream
+ pongstream: an ROUTER stream"""
+
+ def __init__(self, loop, pingstream, pongstream, period=1000):
+ self.loop = loop
+ self.period = period
+
+ self.pingstream = pingstream
+ self.pongstream = pongstream
+ self.pongstream.on_recv(self.handle_pong)
+
+ self.hearts = set()
+ self.responses = set()
+ self.lifetime = 0
+ self.tic = time.time()
+
+ self.caller = ioloop.PeriodicCallback(self.beat, period, self.loop)
+ self.caller.start()
+
+ def beat(self):
+ toc = time.time()
+ self.lifetime += toc-self.tic
+ self.tic = toc
+ print self.lifetime
+ # self.message = str(self.lifetime)
+ goodhearts = self.hearts.intersection(self.responses)
+ heartfailures = self.hearts.difference(goodhearts)
+ newhearts = self.responses.difference(goodhearts)
+ # print newhearts, goodhearts, heartfailures
+ map(self.handle_new_heart, newhearts)
+ map(self.handle_heart_failure, heartfailures)
+ self.responses = set()
+ print "%i beating hearts: %s"%(len(self.hearts),self.hearts)
+ self.pingstream.send(str(self.lifetime))
+
+ def handle_new_heart(self, heart):
+ print "yay, got new heart %s!"%heart
+ self.hearts.add(heart)
+
+ def handle_heart_failure(self, heart):
+ print "Heart %s failed :("%heart
+ self.hearts.remove(heart)
+
+
+ def handle_pong(self, msg):
+ "if heart is beating"
+ if msg[1] == str(self.lifetime):
+ self.responses.add(msg[0])
+ else:
+ print "got bad heartbeat (possibly old?): %s"%msg[1]
+
+# sub.setsockopt(zmq.SUBSCRIBE)
+
+
+if __name__ == '__main__':
+ loop = ioloop.IOLoop()
+ context = zmq.Context()
+ pub = context.socket(zmq.PUB)
+ pub.bind('tcp://127.0.0.1:5555')
+ router = context.socket(zmq.ROUTER)
+ router.bind('tcp://127.0.0.1:5556')
+
+ outstream = zmqstream.ZMQStream(pub, loop)
+ instream = zmqstream.ZMQStream(router, loop)
+
+ hb = HeartBeater(loop, outstream, instream)
+
+ loop.start()