summaryrefslogtreecommitdiffstats
path: root/external_libs/python/pyzmq-14.7.0/examples/heartbeat/heartbeater.py
blob: 180828a679ff196a7b8cf602caec24d56a20d0e0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
#!/usr/bin/env python
"""

For use with heart.py

A basic heartbeater using PUB and ROUTER sockets. Pings are sent out on the PUB socket,
and hearts are tracked based on their DEALER identities.

You can start many hearts with heart.py, and the heartbeater will monitor all of them, and notice when they stop responding.

Authors
-------
* MinRK
"""

import time
import zmq
from zmq.eventloop import ioloop, zmqstream


class HeartBeater(object):
    """Ping hearts periodically and track which of them answer.

    pingstream: a PUB stream on which the current ping payload is sent
    pongstream: a ROUTER stream on which hearts echo the payload back

    On every beat the set of known hearts is reconciled with the set of
    identities that answered the previous ping: unknown responders are
    registered as new hearts, and known hearts that stayed silent are
    reported as failed and dropped.
    """

    def __init__(self, loop, pingstream, pongstream, period=1000):
        """Register the pong handler and start the periodic beat.

        loop: the IOLoop the periodic callback is attached to
        pingstream: stream used to publish pings
        pongstream: stream whose incoming messages go to ``handle_pong``
        period: interval handed to ``ioloop.PeriodicCallback``
        """
        self.loop = loop
        self.period = period

        self.pingstream = pingstream
        self.pongstream = pongstream
        self.pongstream.on_recv(self.handle_pong)

        self.hearts = set()
        self.responses = set()
        self.lifetime = 0
        self.tic = time.time()

        # NOTE(review): the explicit loop argument matches the IOLoop API
        # bundled with this pyzmq release; newer tornado versions may not
        # accept it -- confirm before upgrading.
        self.caller = ioloop.PeriodicCallback(self.beat, period, self.loop)
        self.caller.start()

    def _ping_payload(self):
        """Return the current lifetime as the byte string used as ping body.

        Both ``beat`` (sending) and ``handle_pong`` (checking) go through
        this helper so the two sides always agree, and so the payload is
        ``bytes`` on Python 3 (plain ``str`` on Python 2).
        """
        return str(self.lifetime).encode("ascii")

    def beat(self):
        """Reconcile the last round of responses, then send the next ping."""
        toc = time.time()
        self.lifetime += toc - self.tic
        self.tic = toc
        print(self.lifetime)

        goodhearts = self.hearts.intersection(self.responses)
        heartfailures = self.hearts.difference(goodhearts)
        newhearts = self.responses.difference(goodhearts)

        # Explicit loops: ``map`` is lazy on Python 3, so using it purely
        # for side effects would silently skip every handler call.
        for heart in newhearts:
            self.handle_new_heart(heart)
        for heart in heartfailures:
            self.handle_heart_failure(heart)

        self.responses = set()
        print("%i beating hearts: %s" % (len(self.hearts), self.hearts))
        self.pingstream.send(self._ping_payload())

    def handle_new_heart(self, heart):
        """Start tracking a heart that answered for the first time."""
        print("yay, got new heart %s!" % heart)
        self.hearts.add(heart)

    def handle_heart_failure(self, heart):
        """Stop tracking a heart that did not answer the last ping."""
        print("Heart %s failed :(" % heart)
        self.hearts.remove(heart)

    def handle_pong(self, msg):
        """Record a heart's reply if it echoes the current ping payload.

        msg: list of message frames; ``msg[0]`` is used as the heart's
        identity and ``msg[1]`` as the echoed payload.
        """
        if len(msg) < 2:
            # A truncated message must not raise inside the event loop
            # callback; report it and carry on.
            print("got malformed heartbeat (expected 2 frames): %r" % (msg,))
            return
        if msg[1] == self._ping_payload():
            self.responses.add(msg[0])
        else:
            print("got bad heartbeat (possibly old?): %s" % msg[1])
        
# sub.setsockopt(zmq.SUBSCRIBE)


if __name__ == '__main__':
    # One event loop and one zmq context drive the whole example.
    loop = ioloop.IOLoop()
    context = zmq.Context()

    # Pings go out on the PUB socket; replies come back on the ROUTER socket.
    pub = context.socket(zmq.PUB)
    router = context.socket(zmq.ROUTER)
    pub.bind('tcp://127.0.0.1:5555')
    router.bind('tcp://127.0.0.1:5556')

    # Wrap both sockets in streams bound to the loop and hand them over.
    outstream = zmqstream.ZMQStream(pub, loop)
    instream = zmqstream.ZMQStream(router, loop)
    hb = HeartBeater(loop, outstream, instream)

    loop.start()