diff options
Diffstat (limited to 'scripts/external_libs/zmq/sugar/tracker.py')
-rw-r--r-- | scripts/external_libs/zmq/sugar/tracker.py | 120 |
1 files changed, 120 insertions, 0 deletions
diff --git a/scripts/external_libs/zmq/sugar/tracker.py b/scripts/external_libs/zmq/sugar/tracker.py new file mode 100644 index 00000000..fb8c007f --- /dev/null +++ b/scripts/external_libs/zmq/sugar/tracker.py @@ -0,0 +1,120 @@ +"""Tracker for zero-copy messages with 0MQ.""" + +# Copyright (C) PyZMQ Developers +# Distributed under the terms of the Modified BSD License. + +import time + +try: + # below 3.3 + from threading import _Event as Event +except (ImportError, AttributeError): + # python throws ImportError, cython throws AttributeError + from threading import Event + +from zmq.error import NotDone +from zmq.backend import Frame + +class MessageTracker(object): + """MessageTracker(*towatch) + + A class for tracking if 0MQ is done using one or more messages. + + When you send a 0MQ message, it is not sent immediately. The 0MQ IO thread + sends the message at some later time. Often you want to know when 0MQ has + actually sent the message though. This is complicated by the fact that + a single 0MQ message can be sent multiple times using different sockets. + This class allows you to track all of the 0MQ usages of a message. + + Parameters + ---------- + *towatch : tuple of Event, MessageTracker, Message instances. + This list of objects to track. This class can track the low-level + Events used by the Message class, other MessageTrackers or + actual Messages. + """ + events = None + peers = None + + def __init__(self, *towatch): + """MessageTracker(*towatch) + + Create a message tracker to track a set of mesages. + + Parameters + ---------- + *towatch : tuple of Event, MessageTracker, Message instances. + This list of objects to track. This class can track the low-level + Events used by the Message class, other MessageTrackers or + actual Messages. + """ + self.events = set() + self.peers = set() + for obj in towatch: + if isinstance(obj, Event): + self.events.add(obj) + elif isinstance(obj, MessageTracker): + self.peers.add(obj) + elif isinstance(obj, Frame): + if not obj.tracker: + raise ValueError("Not a tracked message") + self.peers.add(obj.tracker) + else: + raise TypeError("Require Events or Message Frames, not %s"%type(obj)) + + @property + def done(self): + """Is 0MQ completely done with the message(s) being tracked?""" + for evt in self.events: + if not evt.is_set(): + return False + for pm in self.peers: + if not pm.done: + return False + return True + + def wait(self, timeout=-1): + """mt.wait(timeout=-1) + + Wait for 0MQ to be done with the message or until `timeout`. + + Parameters + ---------- + timeout : float [default: -1, wait forever] + Maximum time in (s) to wait before raising NotDone. + + Returns + ------- + None + if done before `timeout` + + Raises + ------ + NotDone + if `timeout` reached before I am done. + """ + tic = time.time() + if timeout is False or timeout < 0: + remaining = 3600*24*7 # a week + else: + remaining = timeout + done = False + for evt in self.events: + if remaining < 0: + raise NotDone + evt.wait(timeout=remaining) + if not evt.is_set(): + raise NotDone + toc = time.time() + remaining -= (toc-tic) + tic = toc + + for peer in self.peers: + if remaining < 0: + raise NotDone + peer.wait(timeout=remaining) + toc = time.time() + remaining -= (toc-tic) + tic = toc + +__all__ = ['MessageTracker']
\ No newline at end of file |