1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
# coding: utf-8
"""miscellaneous zmq_utils wrapping"""
# Copyright (C) PyZMQ Developers
# Distributed under the terms of the Modified BSD License.
from ._cffi import ffi, C
from zmq.error import ZMQError, _check_rc, _check_version
from zmq.utils.strtypes import unicode
def has(capability):
    """Report whether libzmq supports a capability, looked up by name.

    Parameters
    ----------
    capability : str or bytes
        Capability name (e.g. 'ipc', 'curve'). A unicode name is encoded as
        UTF-8 before it is passed to ``zmq_has``; anything else is passed
        through unchanged.

    Returns
    -------
    bool
        The result of ``zmq_has`` coerced to a boolean.

    .. versionadded:: libzmq-4.1
    .. versionadded:: 14.1
    """
    _check_version((4, 1), 'zmq.has')
    name = (
        capability.encode('utf8')
        if isinstance(capability, unicode)
        else capability
    )
    return bool(C.zmq_has(name))
def curve_keypair():
    """Generate a Z85 keypair for use with zmq.CURVE security.

    Requires libzmq (≥ 4.0) to have been linked with libsodium.

    Returns
    -------
    (public, secret) : two bytestrings
        The public and private keypair as 40 byte z85-encoded bytestrings.

    Raises
    ------
    Whatever ``_check_version`` raises when the linked libzmq is older than
    4.0, and whatever ``_check_rc`` raises when ``zmq_curve_keypair`` returns
    a non-zero code.
    """
    # Gate on the release that introduced zmq_curve_keypair and name this
    # feature, so the version error tells the caller what is unsupported.
    _check_version((4, 0), "curve_keypair")
    # Output buffers are oversized on purpose; only the first 40 bytes of
    # each are returned to the caller.
    public = ffi.new('char[64]')
    private = ffi.new('char[64]')
    rc = C.zmq_curve_keypair(public, private)
    _check_rc(rc)
    return ffi.buffer(public)[:40], ffi.buffer(private)[:40]
class Stopwatch(object):
    """Thin wrapper around libzmq's ``zmq_stopwatch_*`` helpers.

    ``self.watch`` holds the handle returned by ``zmq_stopwatch_start`` while
    the stopwatch is running, and ``ffi.NULL`` when it is not.
    """

    def __init__(self):
        self.watch = ffi.NULL

    def __del__(self):
        # A stopwatch that was started but never stopped still owns a native
        # handle; hand it back to libzmq so it is not leaked when this
        # wrapper is garbage collected.
        watch = getattr(self, 'watch', None)
        if watch is None or ffi is None or C is None:
            # __init__ never completed, or module globals were already torn
            # down during interpreter shutdown: nothing safe to do.
            return
        if watch != ffi.NULL:
            C.zmq_stopwatch_stop(watch)
            self.watch = ffi.NULL

    def start(self):
        """Start the stopwatch.

        Raises
        ------
        ZMQError
            If the stopwatch has already been started and not yet stopped.
        """
        if self.watch != ffi.NULL:
            raise ZMQError('Stopwatch is already running.')
        self.watch = C.zmq_stopwatch_start()

    def stop(self):
        """Stop the stopwatch and return what ``zmq_stopwatch_stop`` reports.

        The handle is reset to ``ffi.NULL`` afterwards, so the stopwatch can
        be started again.

        Raises
        ------
        ZMQError
            If the stopwatch has not been started.
        """
        if self.watch == ffi.NULL:
            raise ZMQError('Must start the Stopwatch before calling stop.')
        elapsed = C.zmq_stopwatch_stop(self.watch)
        self.watch = ffi.NULL
        return elapsed

    def sleep(self, seconds):
        """Call ``zmq_sleep`` with `seconds`."""
        C.zmq_sleep(seconds)
# Public names exported by a star-import of this module.
__all__ = [
    'has',
    'curve_keypair',
    'Stopwatch',
]
|