diff options
author | Hanoh Haim <hhaim@cisco.com> | 2015-08-26 15:50:06 +0300 |
---|---|---|
committer | Hanoh Haim <hhaim@cisco.com> | 2015-08-26 15:50:06 +0300 |
commit | 53f0e28d7f30c7175cbb15884c309613593859d8 (patch) | |
tree | e75ff499233519baa6c50c60fa0c726da7b42e03 /external_libs/python/zmq/eventloop/minitornado/ioloop.py | |
parent | a628a35b10fbd38211c353f506a8c49c2cc3dd7e (diff) | |
parent | 00d6f001971b324bb9c884aaf0384a4cee076550 (diff) |
Merge branch 'lib_fix'
Diffstat (limited to 'external_libs/python/zmq/eventloop/minitornado/ioloop.py')
-rw-r--r-- | external_libs/python/zmq/eventloop/minitornado/ioloop.py | 829 |
1 files changed, 0 insertions, 829 deletions
diff --git a/external_libs/python/zmq/eventloop/minitornado/ioloop.py b/external_libs/python/zmq/eventloop/minitornado/ioloop.py deleted file mode 100644 index 710a3ecb..00000000 --- a/external_libs/python/zmq/eventloop/minitornado/ioloop.py +++ /dev/null @@ -1,829 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2009 Facebook -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -"""An I/O event loop for non-blocking sockets. - -Typical applications will use a single `IOLoop` object, in the -`IOLoop.instance` singleton. The `IOLoop.start` method should usually -be called at the end of the ``main()`` function. Atypical applications may -use more than one `IOLoop`, such as one `IOLoop` per thread, or per `unittest` -case. - -In addition to I/O events, the `IOLoop` can also schedule time-based events. -`IOLoop.add_timeout` is a non-blocking alternative to `time.sleep`. -""" - -from __future__ import absolute_import, division, print_function, with_statement - -import datetime -import errno -import functools -import heapq -import logging -import numbers -import os -import select -import sys -import threading -import time -import traceback - -from .concurrent import Future, TracebackFuture -from .log import app_log, gen_log -from . import stack_context -from .util import Configurable - -try: - import signal -except ImportError: - signal = None - -try: - import thread # py2 -except ImportError: - import _thread as thread # py3 - -from .platform.auto import set_close_exec, Waker - - -class TimeoutError(Exception): - pass - - -class IOLoop(Configurable): - """A level-triggered I/O loop. - - We use ``epoll`` (Linux) or ``kqueue`` (BSD and Mac OS X) if they - are available, or else we fall back on select(). If you are - implementing a system that needs to handle thousands of - simultaneous connections, you should use a system that supports - either ``epoll`` or ``kqueue``. - - Example usage for a simple TCP server:: - - import errno - import functools - import ioloop - import socket - - def connection_ready(sock, fd, events): - while True: - try: - connection, address = sock.accept() - except socket.error, e: - if e.args[0] not in (errno.EWOULDBLOCK, errno.EAGAIN): - raise - return - connection.setblocking(0) - handle_connection(connection, address) - - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - sock.setblocking(0) - sock.bind(("", port)) - sock.listen(128) - - io_loop = ioloop.IOLoop.instance() - callback = functools.partial(connection_ready, sock) - io_loop.add_handler(sock.fileno(), callback, io_loop.READ) - io_loop.start() - - """ - # Constants from the epoll module - _EPOLLIN = 0x001 - _EPOLLPRI = 0x002 - _EPOLLOUT = 0x004 - _EPOLLERR = 0x008 - _EPOLLHUP = 0x010 - _EPOLLRDHUP = 0x2000 - _EPOLLONESHOT = (1 << 30) - _EPOLLET = (1 << 31) - - # Our events map exactly to the epoll events - NONE = 0 - READ = _EPOLLIN - WRITE = _EPOLLOUT - ERROR = _EPOLLERR | _EPOLLHUP - - # Global lock for creating global IOLoop instance - _instance_lock = threading.Lock() - - _current = threading.local() - - @staticmethod - def instance(): - """Returns a global `IOLoop` instance. - - Most applications have a single, global `IOLoop` running on the - main thread. Use this method to get this instance from - another thread. To get the current thread's `IOLoop`, use `current()`. - """ - if not hasattr(IOLoop, "_instance"): - with IOLoop._instance_lock: - if not hasattr(IOLoop, "_instance"): - # New instance after double check - IOLoop._instance = IOLoop() - return IOLoop._instance - - @staticmethod - def initialized(): - """Returns true if the singleton instance has been created.""" - return hasattr(IOLoop, "_instance") - - def install(self): - """Installs this `IOLoop` object as the singleton instance. - - This is normally not necessary as `instance()` will create - an `IOLoop` on demand, but you may want to call `install` to use - a custom subclass of `IOLoop`. - """ - assert not IOLoop.initialized() - IOLoop._instance = self - - @staticmethod - def current(): - """Returns the current thread's `IOLoop`. - - If an `IOLoop` is currently running or has been marked as current - by `make_current`, returns that instance. Otherwise returns - `IOLoop.instance()`, i.e. the main thread's `IOLoop`. - - A common pattern for classes that depend on ``IOLoops`` is to use - a default argument to enable programs with multiple ``IOLoops`` - but not require the argument for simpler applications:: - - class MyClass(object): - def __init__(self, io_loop=None): - self.io_loop = io_loop or IOLoop.current() - - In general you should use `IOLoop.current` as the default when - constructing an asynchronous object, and use `IOLoop.instance` - when you mean to communicate to the main thread from a different - one. - """ - current = getattr(IOLoop._current, "instance", None) - if current is None: - return IOLoop.instance() - return current - - def make_current(self): - """Makes this the `IOLoop` for the current thread. - - An `IOLoop` automatically becomes current for its thread - when it is started, but it is sometimes useful to call - `make_current` explictly before starting the `IOLoop`, - so that code run at startup time can find the right - instance. - """ - IOLoop._current.instance = self - - @staticmethod - def clear_current(): - IOLoop._current.instance = None - - @classmethod - def configurable_base(cls): - return IOLoop - - @classmethod - def configurable_default(cls): - # this is the only patch to IOLoop: - from zmq.eventloop.ioloop import ZMQIOLoop - return ZMQIOLoop - # the remainder of this method is unused, - # but left for preservation reasons - if hasattr(select, "epoll"): - from tornado.platform.epoll import EPollIOLoop - return EPollIOLoop - if hasattr(select, "kqueue"): - # Python 2.6+ on BSD or Mac - from tornado.platform.kqueue import KQueueIOLoop - return KQueueIOLoop - from tornado.platform.select import SelectIOLoop - return SelectIOLoop - - def initialize(self): - pass - - def close(self, all_fds=False): - """Closes the `IOLoop`, freeing any resources used. - - If ``all_fds`` is true, all file descriptors registered on the - IOLoop will be closed (not just the ones created by the - `IOLoop` itself). - - Many applications will only use a single `IOLoop` that runs for the - entire lifetime of the process. In that case closing the `IOLoop` - is not necessary since everything will be cleaned up when the - process exits. `IOLoop.close` is provided mainly for scenarios - such as unit tests, which create and destroy a large number of - ``IOLoops``. - - An `IOLoop` must be completely stopped before it can be closed. This - means that `IOLoop.stop()` must be called *and* `IOLoop.start()` must - be allowed to return before attempting to call `IOLoop.close()`. - Therefore the call to `close` will usually appear just after - the call to `start` rather than near the call to `stop`. - - .. versionchanged:: 3.1 - If the `IOLoop` implementation supports non-integer objects - for "file descriptors", those objects will have their - ``close`` method when ``all_fds`` is true. - """ - raise NotImplementedError() - - def add_handler(self, fd, handler, events): - """Registers the given handler to receive the given events for fd. - - The ``events`` argument is a bitwise or of the constants - ``IOLoop.READ``, ``IOLoop.WRITE``, and ``IOLoop.ERROR``. - - When an event occurs, ``handler(fd, events)`` will be run. - """ - raise NotImplementedError() - - def update_handler(self, fd, events): - """Changes the events we listen for fd.""" - raise NotImplementedError() - - def remove_handler(self, fd): - """Stop listening for events on fd.""" - raise NotImplementedError() - - def set_blocking_signal_threshold(self, seconds, action): - """Sends a signal if the `IOLoop` is blocked for more than - ``s`` seconds. - - Pass ``seconds=None`` to disable. Requires Python 2.6 on a unixy - platform. - - The action parameter is a Python signal handler. Read the - documentation for the `signal` module for more information. - If ``action`` is None, the process will be killed if it is - blocked for too long. - """ - raise NotImplementedError() - - def set_blocking_log_threshold(self, seconds): - """Logs a stack trace if the `IOLoop` is blocked for more than - ``s`` seconds. - - Equivalent to ``set_blocking_signal_threshold(seconds, - self.log_stack)`` - """ - self.set_blocking_signal_threshold(seconds, self.log_stack) - - def log_stack(self, signal, frame): - """Signal handler to log the stack trace of the current thread. - - For use with `set_blocking_signal_threshold`. - """ - gen_log.warning('IOLoop blocked for %f seconds in\n%s', - self._blocking_signal_threshold, - ''.join(traceback.format_stack(frame))) - - def start(self): - """Starts the I/O loop. - - The loop will run until one of the callbacks calls `stop()`, which - will make the loop stop after the current event iteration completes. - """ - raise NotImplementedError() - - def stop(self): - """Stop the I/O loop. - - If the event loop is not currently running, the next call to `start()` - will return immediately. - - To use asynchronous methods from otherwise-synchronous code (such as - unit tests), you can start and stop the event loop like this:: - - ioloop = IOLoop() - async_method(ioloop=ioloop, callback=ioloop.stop) - ioloop.start() - - ``ioloop.start()`` will return after ``async_method`` has run - its callback, whether that callback was invoked before or - after ``ioloop.start``. - - Note that even after `stop` has been called, the `IOLoop` is not - completely stopped until `IOLoop.start` has also returned. - Some work that was scheduled before the call to `stop` may still - be run before the `IOLoop` shuts down. - """ - raise NotImplementedError() - - def run_sync(self, func, timeout=None): - """Starts the `IOLoop`, runs the given function, and stops the loop. - - If the function returns a `.Future`, the `IOLoop` will run - until the future is resolved. If it raises an exception, the - `IOLoop` will stop and the exception will be re-raised to the - caller. - - The keyword-only argument ``timeout`` may be used to set - a maximum duration for the function. If the timeout expires, - a `TimeoutError` is raised. - - This method is useful in conjunction with `tornado.gen.coroutine` - to allow asynchronous calls in a ``main()`` function:: - - @gen.coroutine - def main(): - # do stuff... - - if __name__ == '__main__': - IOLoop.instance().run_sync(main) - """ - future_cell = [None] - - def run(): - try: - result = func() - except Exception: - future_cell[0] = TracebackFuture() - future_cell[0].set_exc_info(sys.exc_info()) - else: - if isinstance(result, Future): - future_cell[0] = result - else: - future_cell[0] = Future() - future_cell[0].set_result(result) - self.add_future(future_cell[0], lambda future: self.stop()) - self.add_callback(run) - if timeout is not None: - timeout_handle = self.add_timeout(self.time() + timeout, self.stop) - self.start() - if timeout is not None: - self.remove_timeout(timeout_handle) - if not future_cell[0].done(): - raise TimeoutError('Operation timed out after %s seconds' % timeout) - return future_cell[0].result() - - def time(self): - """Returns the current time according to the `IOLoop`'s clock. - - The return value is a floating-point number relative to an - unspecified time in the past. - - By default, the `IOLoop`'s time function is `time.time`. However, - it may be configured to use e.g. `time.monotonic` instead. - Calls to `add_timeout` that pass a number instead of a - `datetime.timedelta` should use this function to compute the - appropriate time, so they can work no matter what time function - is chosen. - """ - return time.time() - - def add_timeout(self, deadline, callback): - """Runs the ``callback`` at the time ``deadline`` from the I/O loop. - - Returns an opaque handle that may be passed to - `remove_timeout` to cancel. - - ``deadline`` may be a number denoting a time (on the same - scale as `IOLoop.time`, normally `time.time`), or a - `datetime.timedelta` object for a deadline relative to the - current time. - - Note that it is not safe to call `add_timeout` from other threads. - Instead, you must use `add_callback` to transfer control to the - `IOLoop`'s thread, and then call `add_timeout` from there. - """ - raise NotImplementedError() - - def remove_timeout(self, timeout): - """Cancels a pending timeout. - - The argument is a handle as returned by `add_timeout`. It is - safe to call `remove_timeout` even if the callback has already - been run. - """ - raise NotImplementedError() - - def add_callback(self, callback, *args, **kwargs): - """Calls the given callback on the next I/O loop iteration. - - It is safe to call this method from any thread at any time, - except from a signal handler. Note that this is the **only** - method in `IOLoop` that makes this thread-safety guarantee; all - other interaction with the `IOLoop` must be done from that - `IOLoop`'s thread. `add_callback()` may be used to transfer - control from other threads to the `IOLoop`'s thread. - - To add a callback from a signal handler, see - `add_callback_from_signal`. - """ - raise NotImplementedError() - - def add_callback_from_signal(self, callback, *args, **kwargs): - """Calls the given callback on the next I/O loop iteration. - - Safe for use from a Python signal handler; should not be used - otherwise. - - Callbacks added with this method will be run without any - `.stack_context`, to avoid picking up the context of the function - that was interrupted by the signal. - """ - raise NotImplementedError() - - def add_future(self, future, callback): - """Schedules a callback on the ``IOLoop`` when the given - `.Future` is finished. - - The callback is invoked with one argument, the - `.Future`. - """ - assert isinstance(future, Future) - callback = stack_context.wrap(callback) - future.add_done_callback( - lambda future: self.add_callback(callback, future)) - - def _run_callback(self, callback): - """Runs a callback with error handling. - - For use in subclasses. - """ - try: - callback() - except Exception: - self.handle_callback_exception(callback) - - def handle_callback_exception(self, callback): - """This method is called whenever a callback run by the `IOLoop` - throws an exception. - - By default simply logs the exception as an error. Subclasses - may override this method to customize reporting of exceptions. - - The exception itself is not passed explicitly, but is available - in `sys.exc_info`. - """ - app_log.error("Exception in callback %r", callback, exc_info=True) - - -class PollIOLoop(IOLoop): - """Base class for IOLoops built around a select-like function. - - For concrete implementations, see `tornado.platform.epoll.EPollIOLoop` - (Linux), `tornado.platform.kqueue.KQueueIOLoop` (BSD and Mac), or - `tornado.platform.select.SelectIOLoop` (all platforms). - """ - def initialize(self, impl, time_func=None): - super(PollIOLoop, self).initialize() - self._impl = impl - if hasattr(self._impl, 'fileno'): - set_close_exec(self._impl.fileno()) - self.time_func = time_func or time.time - self._handlers = {} - self._events = {} - self._callbacks = [] - self._callback_lock = threading.Lock() - self._timeouts = [] - self._cancellations = 0 - self._running = False - self._stopped = False - self._closing = False - self._thread_ident = None - self._blocking_signal_threshold = None - - # Create a pipe that we send bogus data to when we want to wake - # the I/O loop when it is idle - self._waker = Waker() - self.add_handler(self._waker.fileno(), - lambda fd, events: self._waker.consume(), - self.READ) - - def close(self, all_fds=False): - with self._callback_lock: - self._closing = True - self.remove_handler(self._waker.fileno()) - if all_fds: - for fd in self._handlers.keys(): - try: - close_method = getattr(fd, 'close', None) - if close_method is not None: - close_method() - else: - os.close(fd) - except Exception: - gen_log.debug("error closing fd %s", fd, exc_info=True) - self._waker.close() - self._impl.close() - - def add_handler(self, fd, handler, events): - self._handlers[fd] = stack_context.wrap(handler) - self._impl.register(fd, events | self.ERROR) - - def update_handler(self, fd, events): - self._impl.modify(fd, events | self.ERROR) - - def remove_handler(self, fd): - self._handlers.pop(fd, None) - self._events.pop(fd, None) - try: - self._impl.unregister(fd) - except Exception: - gen_log.debug("Error deleting fd from IOLoop", exc_info=True) - - def set_blocking_signal_threshold(self, seconds, action): - if not hasattr(signal, "setitimer"): - gen_log.error("set_blocking_signal_threshold requires a signal module " - "with the setitimer method") - return - self._blocking_signal_threshold = seconds - if seconds is not None: - signal.signal(signal.SIGALRM, - action if action is not None else signal.SIG_DFL) - - def start(self): - if not logging.getLogger().handlers: - # The IOLoop catches and logs exceptions, so it's - # important that log output be visible. However, python's - # default behavior for non-root loggers (prior to python - # 3.2) is to print an unhelpful "no handlers could be - # found" message rather than the actual log entry, so we - # must explicitly configure logging if we've made it this - # far without anything. - logging.basicConfig() - if self._stopped: - self._stopped = False - return - old_current = getattr(IOLoop._current, "instance", None) - IOLoop._current.instance = self - self._thread_ident = thread.get_ident() - self._running = True - - # signal.set_wakeup_fd closes a race condition in event loops: - # a signal may arrive at the beginning of select/poll/etc - # before it goes into its interruptible sleep, so the signal - # will be consumed without waking the select. The solution is - # for the (C, synchronous) signal handler to write to a pipe, - # which will then be seen by select. - # - # In python's signal handling semantics, this only matters on the - # main thread (fortunately, set_wakeup_fd only works on the main - # thread and will raise a ValueError otherwise). - # - # If someone has already set a wakeup fd, we don't want to - # disturb it. This is an issue for twisted, which does its - # SIGCHILD processing in response to its own wakeup fd being - # written to. As long as the wakeup fd is registered on the IOLoop, - # the loop will still wake up and everything should work. - old_wakeup_fd = None - if hasattr(signal, 'set_wakeup_fd') and os.name == 'posix': - # requires python 2.6+, unix. set_wakeup_fd exists but crashes - # the python process on windows. - try: - old_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno()) - if old_wakeup_fd != -1: - # Already set, restore previous value. This is a little racy, - # but there's no clean get_wakeup_fd and in real use the - # IOLoop is just started once at the beginning. - signal.set_wakeup_fd(old_wakeup_fd) - old_wakeup_fd = None - except ValueError: # non-main thread - pass - - while True: - poll_timeout = 3600.0 - - # Prevent IO event starvation by delaying new callbacks - # to the next iteration of the event loop. - with self._callback_lock: - callbacks = self._callbacks - self._callbacks = [] - for callback in callbacks: - self._run_callback(callback) - - if self._timeouts: - now = self.time() - while self._timeouts: - if self._timeouts[0].callback is None: - # the timeout was cancelled - heapq.heappop(self._timeouts) - self._cancellations -= 1 - elif self._timeouts[0].deadline <= now: - timeout = heapq.heappop(self._timeouts) - self._run_callback(timeout.callback) - else: - seconds = self._timeouts[0].deadline - now - poll_timeout = min(seconds, poll_timeout) - break - if (self._cancellations > 512 - and self._cancellations > (len(self._timeouts) >> 1)): - # Clean up the timeout queue when it gets large and it's - # more than half cancellations. - self._cancellations = 0 - self._timeouts = [x for x in self._timeouts - if x.callback is not None] - heapq.heapify(self._timeouts) - - if self._callbacks: - # If any callbacks or timeouts called add_callback, - # we don't want to wait in poll() before we run them. - poll_timeout = 0.0 - - if not self._running: - break - - if self._blocking_signal_threshold is not None: - # clear alarm so it doesn't fire while poll is waiting for - # events. - signal.setitimer(signal.ITIMER_REAL, 0, 0) - - try: - event_pairs = self._impl.poll(poll_timeout) - except Exception as e: - # Depending on python version and IOLoop implementation, - # different exception types may be thrown and there are - # two ways EINTR might be signaled: - # * e.errno == errno.EINTR - # * e.args is like (errno.EINTR, 'Interrupted system call') - if (getattr(e, 'errno', None) == errno.EINTR or - (isinstance(getattr(e, 'args', None), tuple) and - len(e.args) == 2 and e.args[0] == errno.EINTR)): - continue - else: - raise - - if self._blocking_signal_threshold is not None: - signal.setitimer(signal.ITIMER_REAL, - self._blocking_signal_threshold, 0) - - # Pop one fd at a time from the set of pending fds and run - # its handler. Since that handler may perform actions on - # other file descriptors, there may be reentrant calls to - # this IOLoop that update self._events - self._events.update(event_pairs) - while self._events: - fd, events = self._events.popitem() - try: - self._handlers[fd](fd, events) - except (OSError, IOError) as e: - if e.args[0] == errno.EPIPE: - # Happens when the client closes the connection - pass - else: - app_log.error("Exception in I/O handler for fd %s", - fd, exc_info=True) - except Exception: - app_log.error("Exception in I/O handler for fd %s", - fd, exc_info=True) - # reset the stopped flag so another start/stop pair can be issued - self._stopped = False - if self._blocking_signal_threshold is not None: - signal.setitimer(signal.ITIMER_REAL, 0, 0) - IOLoop._current.instance = old_current - if old_wakeup_fd is not None: - signal.set_wakeup_fd(old_wakeup_fd) - - def stop(self): - self._running = False - self._stopped = True - self._waker.wake() - - def time(self): - return self.time_func() - - def add_timeout(self, deadline, callback): - timeout = _Timeout(deadline, stack_context.wrap(callback), self) - heapq.heappush(self._timeouts, timeout) - return timeout - - def remove_timeout(self, timeout): - # Removing from a heap is complicated, so just leave the defunct - # timeout object in the queue (see discussion in - # http://docs.python.org/library/heapq.html). - # If this turns out to be a problem, we could add a garbage - # collection pass whenever there are too many dead timeouts. - timeout.callback = None - self._cancellations += 1 - - def add_callback(self, callback, *args, **kwargs): - with self._callback_lock: - if self._closing: - raise RuntimeError("IOLoop is closing") - list_empty = not self._callbacks - self._callbacks.append(functools.partial( - stack_context.wrap(callback), *args, **kwargs)) - if list_empty and thread.get_ident() != self._thread_ident: - # If we're in the IOLoop's thread, we know it's not currently - # polling. If we're not, and we added the first callback to an - # empty list, we may need to wake it up (it may wake up on its - # own, but an occasional extra wake is harmless). Waking - # up a polling IOLoop is relatively expensive, so we try to - # avoid it when we can. - self._waker.wake() - - def add_callback_from_signal(self, callback, *args, **kwargs): - with stack_context.NullContext(): - if thread.get_ident() != self._thread_ident: - # if the signal is handled on another thread, we can add - # it normally (modulo the NullContext) - self.add_callback(callback, *args, **kwargs) - else: - # If we're on the IOLoop's thread, we cannot use - # the regular add_callback because it may deadlock on - # _callback_lock. Blindly insert into self._callbacks. - # This is safe because the GIL makes list.append atomic. - # One subtlety is that if the signal interrupted the - # _callback_lock block in IOLoop.start, we may modify - # either the old or new version of self._callbacks, - # but either way will work. - self._callbacks.append(functools.partial( - stack_context.wrap(callback), *args, **kwargs)) - - -class _Timeout(object): - """An IOLoop timeout, a UNIX timestamp and a callback""" - - # Reduce memory overhead when there are lots of pending callbacks - __slots__ = ['deadline', 'callback'] - - def __init__(self, deadline, callback, io_loop): - if isinstance(deadline, numbers.Real): - self.deadline = deadline - elif isinstance(deadline, datetime.timedelta): - self.deadline = io_loop.time() + _Timeout.timedelta_to_seconds(deadline) - else: - raise TypeError("Unsupported deadline %r" % deadline) - self.callback = callback - - @staticmethod - def timedelta_to_seconds(td): - """Equivalent to td.total_seconds() (introduced in python 2.7).""" - return (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10 ** 6) / float(10 ** 6) - - # Comparison methods to sort by deadline, with object id as a tiebreaker - # to guarantee a consistent ordering. The heapq module uses __le__ - # in python2.5, and __lt__ in 2.6+ (sort() and most other comparisons - # use __lt__). - def __lt__(self, other): - return ((self.deadline, id(self)) < - (other.deadline, id(other))) - - def __le__(self, other): - return ((self.deadline, id(self)) <= - (other.deadline, id(other))) - - -class PeriodicCallback(object): - """Schedules the given callback to be called periodically. - - The callback is called every ``callback_time`` milliseconds. - - `start` must be called after the `PeriodicCallback` is created. - """ - def __init__(self, callback, callback_time, io_loop=None): - self.callback = callback - if callback_time <= 0: - raise ValueError("Periodic callback must have a positive callback_time") - self.callback_time = callback_time - self.io_loop = io_loop or IOLoop.current() - self._running = False - self._timeout = None - - def start(self): - """Starts the timer.""" - self._running = True - self._next_timeout = self.io_loop.time() - self._schedule_next() - - def stop(self): - """Stops the timer.""" - self._running = False - if self._timeout is not None: - self.io_loop.remove_timeout(self._timeout) - self._timeout = None - - def _run(self): - if not self._running: - return - try: - self.callback() - except Exception: - app_log.error("Error in periodic callback", exc_info=True) - self._schedule_next() - - def _schedule_next(self): - if self._running: - current_time = self.io_loop.time() - while self._next_timeout <= current_time: - self._next_timeout += self.callback_time / 1000.0 - self._timeout = self.io_loop.add_timeout(self._next_timeout, self._run) |