From 781d71db20b0c5acbe940eff1b1ef2f1b765ce54 Mon Sep 17 00:00:00 2001 From: Yaroslav Brustinov Date: Wed, 1 Feb 2017 21:13:23 +0200 Subject: zmq os independent Change-Id: Iaf5a782be4db26a979a7535454719e8e62b5969a Signed-off-by: Yaroslav Brustinov --- .../python3/cel59/32bit/zmq/green/core.py | 287 --------------------- 1 file changed, 287 deletions(-) delete mode 100644 scripts/external_libs/pyzmq-14.5.0/python3/cel59/32bit/zmq/green/core.py (limited to 'scripts/external_libs/pyzmq-14.5.0/python3/cel59/32bit/zmq/green/core.py') diff --git a/scripts/external_libs/pyzmq-14.5.0/python3/cel59/32bit/zmq/green/core.py b/scripts/external_libs/pyzmq-14.5.0/python3/cel59/32bit/zmq/green/core.py deleted file mode 100644 index 9fc73e32..00000000 --- a/scripts/external_libs/pyzmq-14.5.0/python3/cel59/32bit/zmq/green/core.py +++ /dev/null @@ -1,287 +0,0 @@ -#----------------------------------------------------------------------------- -# Copyright (C) 2011-2012 Travis Cline -# -# This file is part of pyzmq -# It is adapted from upstream project zeromq_gevent under the New BSD License -# -# Distributed under the terms of the New BSD License. The full license is in -# the file COPYING.BSD, distributed as part of this software. -#----------------------------------------------------------------------------- - -"""This module wraps the :class:`Socket` and :class:`Context` found in :mod:`pyzmq ` to be non blocking -""" - -from __future__ import print_function - -import sys -import time -import warnings - -import zmq - -from zmq import Context as _original_Context -from zmq import Socket as _original_Socket -from .poll import _Poller - -import gevent -from gevent.event import AsyncResult -from gevent.hub import get_hub - -if hasattr(zmq, 'RCVTIMEO'): - TIMEOS = (zmq.RCVTIMEO, zmq.SNDTIMEO) -else: - TIMEOS = () - -def _stop(evt): - """simple wrapper for stopping an Event, allowing for method rename in gevent 1.0""" - try: - evt.stop() - except AttributeError as e: - # gevent<1.0 compat - evt.cancel() - -class _Socket(_original_Socket): - """Green version of :class:`zmq.Socket` - - The following methods are overridden: - - * send - * recv - - To ensure that the ``zmq.NOBLOCK`` flag is set and that sending or receiving - is deferred to the hub if a ``zmq.EAGAIN`` (retry) error is raised. - - The `__state_changed` method is triggered when the zmq.FD for the socket is - marked as readable and triggers the necessary read and write events (which - are waited for in the recv and send methods). - - Some double underscore prefixes are used to minimize pollution of - :class:`zmq.Socket`'s namespace. - """ - __in_send_multipart = False - __in_recv_multipart = False - __writable = None - __readable = None - _state_event = None - _gevent_bug_timeout = 11.6 # timeout for not trusting gevent - _debug_gevent = False # turn on if you think gevent is missing events - _poller_class = _Poller - - def __init__(self, context, socket_type): - _original_Socket.__init__(self, context, socket_type) - self.__in_send_multipart = False - self.__in_recv_multipart = False - self.__setup_events() - - - def __del__(self): - self.close() - - def close(self, linger=None): - super(_Socket, self).close(linger) - self.__cleanup_events() - - def __cleanup_events(self): - # close the _state_event event, keeps the number of active file descriptors down - if getattr(self, '_state_event', None): - _stop(self._state_event) - self._state_event = None - # if the socket has entered a close state resume any waiting greenlets - self.__writable.set() - self.__readable.set() - - def __setup_events(self): - self.__readable = AsyncResult() - self.__writable = AsyncResult() - self.__readable.set() - self.__writable.set() - - try: - self._state_event = get_hub().loop.io(self.getsockopt(zmq.FD), 1) # read state watcher - self._state_event.start(self.__state_changed) - except AttributeError: - # for gevent<1.0 compatibility - from gevent.core import read_event - self._state_event = read_event(self.getsockopt(zmq.FD), self.__state_changed, persist=True) - - def __state_changed(self, event=None, _evtype=None): - if self.closed: - self.__cleanup_events() - return - try: - # avoid triggering __state_changed from inside __state_changed - events = super(_Socket, self).getsockopt(zmq.EVENTS) - except zmq.ZMQError as exc: - self.__writable.set_exception(exc) - self.__readable.set_exception(exc) - else: - if events & zmq.POLLOUT: - self.__writable.set() - if events & zmq.POLLIN: - self.__readable.set() - - def _wait_write(self): - assert self.__writable.ready(), "Only one greenlet can be waiting on this event" - self.__writable = AsyncResult() - # timeout is because libzmq cannot be trusted to properly signal a new send event: - # this is effectively a maximum poll interval of 1s - tic = time.time() - dt = self._gevent_bug_timeout - if dt: - timeout = gevent.Timeout(seconds=dt) - else: - timeout = None - try: - if timeout: - timeout.start() - self.__writable.get(block=True) - except gevent.Timeout as t: - if t is not timeout: - raise - toc = time.time() - # gevent bug: get can raise timeout even on clean return - # don't display zmq bug warning for gevent bug (this is getting ridiculous) - if self._debug_gevent and timeout and toc-tic > dt and \ - self.getsockopt(zmq.EVENTS) & zmq.POLLOUT: - print("BUG: gevent may have missed a libzmq send event on %i!" % self.FD, file=sys.stderr) - finally: - if timeout: - timeout.cancel() - self.__writable.set() - - def _wait_read(self): - assert self.__readable.ready(), "Only one greenlet can be waiting on this event" - self.__readable = AsyncResult() - # timeout is because libzmq cannot always be trusted to play nice with libevent. - # I can only confirm that this actually happens for send, but lets be symmetrical - # with our dirty hacks. - # this is effectively a maximum poll interval of 1s - tic = time.time() - dt = self._gevent_bug_timeout - if dt: - timeout = gevent.Timeout(seconds=dt) - else: - timeout = None - try: - if timeout: - timeout.start() - self.__readable.get(block=True) - except gevent.Timeout as t: - if t is not timeout: - raise - toc = time.time() - # gevent bug: get can raise timeout even on clean return - # don't display zmq bug warning for gevent bug (this is getting ridiculous) - if self._debug_gevent and timeout and toc-tic > dt and \ - self.getsockopt(zmq.EVENTS) & zmq.POLLIN: - print("BUG: gevent may have missed a libzmq recv event on %i!" % self.FD, file=sys.stderr) - finally: - if timeout: - timeout.cancel() - self.__readable.set() - - def send(self, data, flags=0, copy=True, track=False): - """send, which will only block current greenlet - - state_changed always fires exactly once (success or fail) at the - end of this method. - """ - - # if we're given the NOBLOCK flag act as normal and let the EAGAIN get raised - if flags & zmq.NOBLOCK: - try: - msg = super(_Socket, self).send(data, flags, copy, track) - finally: - if not self.__in_send_multipart: - self.__state_changed() - return msg - # ensure the zmq.NOBLOCK flag is part of flags - flags |= zmq.NOBLOCK - while True: # Attempt to complete this operation indefinitely, blocking the current greenlet - try: - # attempt the actual call - msg = super(_Socket, self).send(data, flags, copy, track) - except zmq.ZMQError as e: - # if the raised ZMQError is not EAGAIN, reraise - if e.errno != zmq.EAGAIN: - if not self.__in_send_multipart: - self.__state_changed() - raise - else: - if not self.__in_send_multipart: - self.__state_changed() - return msg - # defer to the event loop until we're notified the socket is writable - self._wait_write() - - def recv(self, flags=0, copy=True, track=False): - """recv, which will only block current greenlet - - state_changed always fires exactly once (success or fail) at the - end of this method. - """ - if flags & zmq.NOBLOCK: - try: - msg = super(_Socket, self).recv(flags, copy, track) - finally: - if not self.__in_recv_multipart: - self.__state_changed() - return msg - - flags |= zmq.NOBLOCK - while True: - try: - msg = super(_Socket, self).recv(flags, copy, track) - except zmq.ZMQError as e: - if e.errno != zmq.EAGAIN: - if not self.__in_recv_multipart: - self.__state_changed() - raise - else: - if not self.__in_recv_multipart: - self.__state_changed() - return msg - self._wait_read() - - def send_multipart(self, *args, **kwargs): - """wrap send_multipart to prevent state_changed on each partial send""" - self.__in_send_multipart = True - try: - msg = super(_Socket, self).send_multipart(*args, **kwargs) - finally: - self.__in_send_multipart = False - self.__state_changed() - return msg - - def recv_multipart(self, *args, **kwargs): - """wrap recv_multipart to prevent state_changed on each partial recv""" - self.__in_recv_multipart = True - try: - msg = super(_Socket, self).recv_multipart(*args, **kwargs) - finally: - self.__in_recv_multipart = False - self.__state_changed() - return msg - - def get(self, opt): - """trigger state_changed on getsockopt(EVENTS)""" - if opt in TIMEOS: - warnings.warn("TIMEO socket options have no effect in zmq.green", UserWarning) - optval = super(_Socket, self).get(opt) - if opt == zmq.EVENTS: - self.__state_changed() - return optval - - def set(self, opt, val): - """set socket option""" - if opt in TIMEOS: - warnings.warn("TIMEO socket options have no effect in zmq.green", UserWarning) - return super(_Socket, self).set(opt, val) - - -class _Context(_original_Context): - """Replacement for :class:`zmq.Context` - - Ensures that the greened Socket above is used in calls to `socket`. - """ - _socket_class = _Socket -- cgit 1.2.3-korg