1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
|
# coding: utf-8
"""zmq Context class"""
# Copyright (C) PyZMQ Developers
# Distributed under the terms of the Modified BSD License.
import weakref
from ._cffi import C, ffi
from .socket import *
from .constants import *
from zmq.error import ZMQError, _check_rc
class Context(object):
_zmq_ctx = None
_iothreads = None
_closed = None
_sockets = None
_shadow = False
def __init__(self, io_threads=1, shadow=None):
if shadow:
self._zmq_ctx = ffi.cast("void *", shadow)
self._shadow = True
else:
self._shadow = False
if not io_threads >= 0:
raise ZMQError(EINVAL)
self._zmq_ctx = C.zmq_ctx_new()
if self._zmq_ctx == ffi.NULL:
raise ZMQError(C.zmq_errno())
if not shadow:
C.zmq_ctx_set(self._zmq_ctx, IO_THREADS, io_threads)
self._closed = False
self._sockets = set()
@property
def underlying(self):
"""The address of the underlying libzmq context"""
return int(ffi.cast('size_t', self._zmq_ctx))
@property
def closed(self):
return self._closed
def _add_socket(self, socket):
ref = weakref.ref(socket)
self._sockets.add(ref)
return ref
def _rm_socket(self, ref):
if ref in self._sockets:
self._sockets.remove(ref)
def set(self, option, value):
"""set a context option
see zmq_ctx_set
"""
rc = C.zmq_ctx_set(self._zmq_ctx, option, value)
_check_rc(rc)
def get(self, option):
"""get context option
see zmq_ctx_get
"""
rc = C.zmq_ctx_get(self._zmq_ctx, option)
_check_rc(rc)
return rc
def term(self):
if self.closed:
return
C.zmq_ctx_destroy(self._zmq_ctx)
self._zmq_ctx = None
self._closed = True
def destroy(self, linger=None):
if self.closed:
return
sockets = self._sockets
self._sockets = set()
for s in sockets:
s = s()
if s and not s.closed:
if linger:
s.setsockopt(LINGER, linger)
s.close()
self.term()
__all__ = ['Context']
|