diff options
Diffstat (limited to 'src/console/zmq/sugar/socket.py')
-rwxr-xr-x | src/console/zmq/sugar/socket.py | 495 |
1 files changed, 0 insertions, 495 deletions
diff --git a/src/console/zmq/sugar/socket.py b/src/console/zmq/sugar/socket.py deleted file mode 100755 index c91589d7..00000000 --- a/src/console/zmq/sugar/socket.py +++ /dev/null @@ -1,495 +0,0 @@ -# coding: utf-8 -"""0MQ Socket pure Python methods.""" - -# Copyright (C) PyZMQ Developers -# Distributed under the terms of the Modified BSD License. - - -import codecs -import random -import warnings - -import zmq -from zmq.backend import Socket as SocketBase -from .poll import Poller -from . import constants -from .attrsettr import AttributeSetter -from zmq.error import ZMQError, ZMQBindError -from zmq.utils import jsonapi -from zmq.utils.strtypes import bytes,unicode,basestring -from zmq.utils.interop import cast_int_addr - -from .constants import ( - SNDMORE, ENOTSUP, POLLIN, - int64_sockopt_names, - int_sockopt_names, - bytes_sockopt_names, - fd_sockopt_names, -) -try: - import cPickle - pickle = cPickle -except: - cPickle = None - import pickle - -try: - DEFAULT_PROTOCOL = pickle.DEFAULT_PROTOCOL -except AttributeError: - DEFAULT_PROTOCOL = pickle.HIGHEST_PROTOCOL - - -class Socket(SocketBase, AttributeSetter): - """The ZMQ socket object - - To create a Socket, first create a Context:: - - ctx = zmq.Context.instance() - - then call ``ctx.socket(socket_type)``:: - - s = ctx.socket(zmq.ROUTER) - - """ - _shadow = False - - def __del__(self): - if not self._shadow: - self.close() - - # socket as context manager: - def __enter__(self): - """Sockets are context managers - - .. versionadded:: 14.4 - """ - return self - - def __exit__(self, *args, **kwargs): - self.close() - - #------------------------------------------------------------------------- - # Socket creation - #------------------------------------------------------------------------- - - @classmethod - def shadow(cls, address): - """Shadow an existing libzmq socket - - address is the integer address of the libzmq socket - or an FFI pointer to it. - - .. versionadded:: 14.1 - """ - address = cast_int_addr(address) - return cls(shadow=address) - - #------------------------------------------------------------------------- - # Deprecated aliases - #------------------------------------------------------------------------- - - @property - def socket_type(self): - warnings.warn("Socket.socket_type is deprecated, use Socket.type", - DeprecationWarning - ) - return self.type - - #------------------------------------------------------------------------- - # Hooks for sockopt completion - #------------------------------------------------------------------------- - - def __dir__(self): - keys = dir(self.__class__) - for collection in ( - bytes_sockopt_names, - int_sockopt_names, - int64_sockopt_names, - fd_sockopt_names, - ): - keys.extend(collection) - return keys - - #------------------------------------------------------------------------- - # Getting/Setting options - #------------------------------------------------------------------------- - setsockopt = SocketBase.set - getsockopt = SocketBase.get - - def set_string(self, option, optval, encoding='utf-8'): - """set socket options with a unicode object - - This is simply a wrapper for setsockopt to protect from encoding ambiguity. - - See the 0MQ documentation for details on specific options. - - Parameters - ---------- - option : int - The name of the option to set. Can be any of: SUBSCRIBE, - UNSUBSCRIBE, IDENTITY - optval : unicode string (unicode on py2, str on py3) - The value of the option to set. - encoding : str - The encoding to be used, default is utf8 - """ - if not isinstance(optval, unicode): - raise TypeError("unicode strings only") - return self.set(option, optval.encode(encoding)) - - setsockopt_unicode = setsockopt_string = set_string - - def get_string(self, option, encoding='utf-8'): - """get the value of a socket option - - See the 0MQ documentation for details on specific options. - - Parameters - ---------- - option : int - The option to retrieve. - - Returns - ------- - optval : unicode string (unicode on py2, str on py3) - The value of the option as a unicode string. - """ - - if option not in constants.bytes_sockopts: - raise TypeError("option %i will not return a string to be decoded"%option) - return self.getsockopt(option).decode(encoding) - - getsockopt_unicode = getsockopt_string = get_string - - def bind_to_random_port(self, addr, min_port=49152, max_port=65536, max_tries=100): - """bind this socket to a random port in a range - - Parameters - ---------- - addr : str - The address string without the port to pass to ``Socket.bind()``. - min_port : int, optional - The minimum port in the range of ports to try (inclusive). - max_port : int, optional - The maximum port in the range of ports to try (exclusive). - max_tries : int, optional - The maximum number of bind attempts to make. - - Returns - ------- - port : int - The port the socket was bound to. - - Raises - ------ - ZMQBindError - if `max_tries` reached before successful bind - """ - for i in range(max_tries): - try: - port = random.randrange(min_port, max_port) - self.bind('%s:%s' % (addr, port)) - except ZMQError as exception: - if not exception.errno == zmq.EADDRINUSE: - raise - else: - return port - raise ZMQBindError("Could not bind socket to random port.") - - def get_hwm(self): - """get the High Water Mark - - On libzmq ≥ 3, this gets SNDHWM if available, otherwise RCVHWM - """ - major = zmq.zmq_version_info()[0] - if major >= 3: - # return sndhwm, fallback on rcvhwm - try: - return self.getsockopt(zmq.SNDHWM) - except zmq.ZMQError as e: - pass - - return self.getsockopt(zmq.RCVHWM) - else: - return self.getsockopt(zmq.HWM) - - def set_hwm(self, value): - """set the High Water Mark - - On libzmq ≥ 3, this sets both SNDHWM and RCVHWM - """ - major = zmq.zmq_version_info()[0] - if major >= 3: - raised = None - try: - self.sndhwm = value - except Exception as e: - raised = e - try: - self.rcvhwm = value - except Exception: - raised = e - - if raised: - raise raised - else: - return self.setsockopt(zmq.HWM, value) - - hwm = property(get_hwm, set_hwm, - """property for High Water Mark - - Setting hwm sets both SNDHWM and RCVHWM as appropriate. - It gets SNDHWM if available, otherwise RCVHWM. - """ - ) - - #------------------------------------------------------------------------- - # Sending and receiving messages - #------------------------------------------------------------------------- - - def send_multipart(self, msg_parts, flags=0, copy=True, track=False): - """send a sequence of buffers as a multipart message - - The zmq.SNDMORE flag is added to all msg parts before the last. - - Parameters - ---------- - msg_parts : iterable - A sequence of objects to send as a multipart message. Each element - can be any sendable object (Frame, bytes, buffer-providers) - flags : int, optional - SNDMORE is handled automatically for frames before the last. - copy : bool, optional - Should the frame(s) be sent in a copying or non-copying manner. - track : bool, optional - Should the frame(s) be tracked for notification that ZMQ has - finished with it (ignored if copy=True). - - Returns - ------- - None : if copy or not track - MessageTracker : if track and not copy - a MessageTracker object, whose `pending` property will - be True until the last send is completed. - """ - for msg in msg_parts[:-1]: - self.send(msg, SNDMORE|flags, copy=copy, track=track) - # Send the last part without the extra SNDMORE flag. - return self.send(msg_parts[-1], flags, copy=copy, track=track) - - def recv_multipart(self, flags=0, copy=True, track=False): - """receive a multipart message as a list of bytes or Frame objects - - Parameters - ---------- - flags : int, optional - Any supported flag: NOBLOCK. If NOBLOCK is set, this method - will raise a ZMQError with EAGAIN if a message is not ready. - If NOBLOCK is not set, then this method will block until a - message arrives. - copy : bool, optional - Should the message frame(s) be received in a copying or non-copying manner? - If False a Frame object is returned for each part, if True a copy of - the bytes is made for each frame. - track : bool, optional - Should the message frame(s) be tracked for notification that ZMQ has - finished with it? (ignored if copy=True) - - Returns - ------- - msg_parts : list - A list of frames in the multipart message; either Frames or bytes, - depending on `copy`. - - """ - parts = [self.recv(flags, copy=copy, track=track)] - # have first part already, only loop while more to receive - while self.getsockopt(zmq.RCVMORE): - part = self.recv(flags, copy=copy, track=track) - parts.append(part) - - return parts - - def send_string(self, u, flags=0, copy=True, encoding='utf-8'): - """send a Python unicode string as a message with an encoding - - 0MQ communicates with raw bytes, so you must encode/decode - text (unicode on py2, str on py3) around 0MQ. - - Parameters - ---------- - u : Python unicode string (unicode on py2, str on py3) - The unicode string to send. - flags : int, optional - Any valid send flag. - encoding : str [default: 'utf-8'] - The encoding to be used - """ - if not isinstance(u, basestring): - raise TypeError("unicode/str objects only") - return self.send(u.encode(encoding), flags=flags, copy=copy) - - send_unicode = send_string - - def recv_string(self, flags=0, encoding='utf-8'): - """receive a unicode string, as sent by send_string - - Parameters - ---------- - flags : int - Any valid recv flag. - encoding : str [default: 'utf-8'] - The encoding to be used - - Returns - ------- - s : unicode string (unicode on py2, str on py3) - The Python unicode string that arrives as encoded bytes. - """ - b = self.recv(flags=flags) - return b.decode(encoding) - - recv_unicode = recv_string - - def send_pyobj(self, obj, flags=0, protocol=DEFAULT_PROTOCOL): - """send a Python object as a message using pickle to serialize - - Parameters - ---------- - obj : Python object - The Python object to send. - flags : int - Any valid send flag. - protocol : int - The pickle protocol number to use. The default is pickle.DEFAULT_PROTOCOl - where defined, and pickle.HIGHEST_PROTOCOL elsewhere. - """ - msg = pickle.dumps(obj, protocol) - return self.send(msg, flags) - - def recv_pyobj(self, flags=0): - """receive a Python object as a message using pickle to serialize - - Parameters - ---------- - flags : int - Any valid recv flag. - - Returns - ------- - obj : Python object - The Python object that arrives as a message. - """ - s = self.recv(flags) - return pickle.loads(s) - - def send_json(self, obj, flags=0, **kwargs): - """send a Python object as a message using json to serialize - - Keyword arguments are passed on to json.dumps - - Parameters - ---------- - obj : Python object - The Python object to send - flags : int - Any valid send flag - """ - msg = jsonapi.dumps(obj, **kwargs) - return self.send(msg, flags) - - def recv_json(self, flags=0, **kwargs): - """receive a Python object as a message using json to serialize - - Keyword arguments are passed on to json.loads - - Parameters - ---------- - flags : int - Any valid recv flag. - - Returns - ------- - obj : Python object - The Python object that arrives as a message. - """ - msg = self.recv(flags) - return jsonapi.loads(msg, **kwargs) - - _poller_class = Poller - - def poll(self, timeout=None, flags=POLLIN): - """poll the socket for events - - The default is to poll forever for incoming - events. Timeout is in milliseconds, if specified. - - Parameters - ---------- - timeout : int [default: None] - The timeout (in milliseconds) to wait for an event. If unspecified - (or specified None), will wait forever for an event. - flags : bitfield (int) [default: POLLIN] - The event flags to poll for (any combination of POLLIN|POLLOUT). - The default is to check for incoming events (POLLIN). - - Returns - ------- - events : bitfield (int) - The events that are ready and waiting. Will be 0 if no events were ready - by the time timeout was reached. - """ - - if self.closed: - raise ZMQError(ENOTSUP) - - p = self._poller_class() - p.register(self, flags) - evts = dict(p.poll(timeout)) - # return 0 if no events, otherwise return event bitfield - return evts.get(self, 0) - - def get_monitor_socket(self, events=None, addr=None): - """Return a connected PAIR socket ready to receive the event notifications. - - .. versionadded:: libzmq-4.0 - .. versionadded:: 14.0 - - Parameters - ---------- - events : bitfield (int) [default: ZMQ_EVENTS_ALL] - The bitmask defining which events are wanted. - addr : string [default: None] - The optional endpoint for the monitoring sockets. - - Returns - ------- - socket : (PAIR) - The socket is already connected and ready to receive messages. - """ - # safe-guard, method only available on libzmq >= 4 - if zmq.zmq_version_info() < (4,): - raise NotImplementedError("get_monitor_socket requires libzmq >= 4, have %s" % zmq.zmq_version()) - if addr is None: - # create endpoint name from internal fd - addr = "inproc://monitor.s-%d" % self.FD - if events is None: - # use all events - events = zmq.EVENT_ALL - # attach monitoring socket - self.monitor(addr, events) - # create new PAIR socket and connect it - ret = self.context.socket(zmq.PAIR) - ret.connect(addr) - return ret - - def disable_monitor(self): - """Shutdown the PAIR socket (created using get_monitor_socket) - that is serving socket events. - - .. versionadded:: 14.4 - """ - self.monitor(None, 0) - - -__all__ = ['Socket'] |