1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
"""0MQ utils."""
#
# Copyright (c) 2010-2011 Brian E. Granger & Min Ragan-Kelley
#
# This file is part of pyzmq.
#
# pyzmq is free software; you can redistribute it and/or modify it under
# the terms of the Lesser GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# pyzmq is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# Lesser GNU General Public License for more details.
#
# You should have received a copy of the Lesser GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from libzmq cimport (
zmq_stopwatch_start, zmq_stopwatch_stop, zmq_sleep, zmq_curve_keypair,
zmq_has, const_char_ptr
)
from zmq.error import ZMQError, _check_rc, _check_version
from zmq.utils.strtypes import unicode
def has(capability):
    """Report whether the linked libzmq provides a named capability.

    The name (e.g. 'ipc', 'curve') may be given as text or bytes; text is
    encoded as UTF-8 before being handed to ``zmq_has``.

    .. versionadded:: libzmq-4.1
    .. versionadded:: 14.1

    Parameters
    ----------
    capability : str or bytes
        Name of the capability to query.

    Returns
    -------
    bool
        Truth value of the ``zmq_has`` result.
    """
    cdef bytes name_bytes

    # zmq_has is only called after the (4,1) version check has passed.
    _check_version((4,1), 'zmq.has')

    if isinstance(capability, unicode):
        name_bytes = capability.encode('utf8')
    else:
        name_bytes = capability
    return bool(zmq_has(name_bytes))
def curve_keypair():
    """Create a new Z85-encoded keypair for zmq.CURVE security.

    Requires libzmq (>= 4.0) to have been linked with libsodium.

    .. versionadded:: libzmq-4.0
    .. versionadded:: 14.0

    Returns
    -------
    (public, secret) : two bytestrings
        The public and private keypair as 40 byte z85-encoded bytestrings.
    """
    # Output buffers filled in by zmq_curve_keypair; 64 bytes leaves room
    # beyond the 40-character Z85 encoding.
    cdef char[64] z85_public
    cdef char[64] z85_secret
    cdef int status

    _check_version((4,0), "curve_keypair")

    status = zmq_curve_keypair(z85_public, z85_secret)
    # The return code is handed to _check_rc before the keys are returned.
    _check_rc(status)

    # The C char arrays are converted to Python objects on return.
    return (z85_public, z85_secret)
cdef class Stopwatch:
    """Stopwatch()

    Minimal timer built on zmq_stopwatch_start / zmq_stopwatch_stop.

    Intended for benchmarking and timing 0MQ code. ``self.watch`` holds the
    handle of the running stopwatch, or NULL when no measurement is active.
    """

    def __cinit__(self):
        # No measurement in progress until start() is called.
        self.watch = NULL

    def __dealloc__(self):
        # Same cleanup as stop(), duplicated here on purpose: object methods
        # must not be called during deallocation because the instance may
        # already be partially torn down.
        if self.watch != NULL:
            zmq_stopwatch_stop(self.watch)
            self.watch = NULL

    def start(self):
        """s.start()

        Begin timing.

        Raises
        ------
        ZMQError
            If the stopwatch is already running.
        """
        if self.watch != NULL:
            raise ZMQError('Stopwatch is already running.')
        self.watch = zmq_stopwatch_start()

    def stop(self):
        """s.stop()

        End timing and reset the stopwatch so it can be started again.

        Returns
        -------
        t : unsigned long int
            the number of microseconds since ``start()`` was called.

        Raises
        ------
        ZMQError
            If ``start()`` has not been called first.
        """
        cdef unsigned long elapsed

        if self.watch == NULL:
            raise ZMQError('Must start the Stopwatch before calling stop.')

        elapsed = zmq_stopwatch_stop(self.watch)
        # Clear the handle so a later start() is accepted and __dealloc__
        # does not stop the same stopwatch a second time.
        self.watch = NULL
        return elapsed

    def sleep(self, int seconds):
        """s.sleep(seconds)

        Block for a whole number of seconds using ``zmq_sleep``.

        The GIL is released for the duration of the call.
        """
        with nogil:
            zmq_sleep(seconds)
# Names exported by this module via ``from <module> import *``.
__all__ = ['has', 'curve_keypair', 'Stopwatch']
|