1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
|
# -*- coding: utf8 -*-
# Copyright (C) PyZMQ Developers
# Distributed under the terms of the Modified BSD License.
import logging
import os
import shutil
import sys
import tempfile
import zmq.auth
from zmq.auth.ioloop import IOLoopAuthenticator
from zmq.auth.thread import ThreadAuthenticator
from zmq.eventloop import ioloop, zmqstream
from zmq.tests import (BaseZMQTestCase, SkipTest)
class BaseAuthTestCase(BaseZMQTestCase):
def setUp(self):
if zmq.zmq_version_info() < (4,0):
raise SkipTest("security is new in libzmq 4.0")
try:
zmq.curve_keypair()
except zmq.ZMQError:
raise SkipTest("security requires libzmq to be linked against libsodium")
super(BaseAuthTestCase, self).setUp()
# enable debug logging while we run tests
logging.getLogger('zmq.auth').setLevel(logging.DEBUG)
self.auth = self.make_auth()
self.auth.start()
self.base_dir, self.public_keys_dir, self.secret_keys_dir = self.create_certs()
def make_auth(self):
raise NotImplementedError()
def tearDown(self):
if self.auth:
self.auth.stop()
self.auth = None
self.remove_certs(self.base_dir)
super(BaseAuthTestCase, self).tearDown()
def create_certs(self):
"""Create CURVE certificates for a test"""
# Create temporary CURVE keypairs for this test run. We create all keys in a
# temp directory and then move them into the appropriate private or public
# directory.
base_dir = tempfile.mkdtemp()
keys_dir = os.path.join(base_dir, 'certificates')
public_keys_dir = os.path.join(base_dir, 'public_keys')
secret_keys_dir = os.path.join(base_dir, 'private_keys')
os.mkdir(keys_dir)
os.mkdir(public_keys_dir)
os.mkdir(secret_keys_dir)
server_public_file, server_secret_file = zmq.auth.create_certificates(keys_dir, "server")
client_public_file, client_secret_file = zmq.auth.create_certificates(keys_dir, "client")
for key_file in os.listdir(keys_dir):
if key_file.endswith(".key"):
shutil.move(os.path.join(keys_dir, key_file),
os.path.join(public_keys_dir, '.'))
for key_file in os.listdir(keys_dir):
if key_file.endswith(".key_secret"):
shutil.move(os.path.join(keys_dir, key_file),
os.path.join(secret_keys_dir, '.'))
return (base_dir, public_keys_dir, secret_keys_dir)
def remove_certs(self, base_dir):
"""Remove certificates for a test"""
shutil.rmtree(base_dir)
def load_certs(self, secret_keys_dir):
"""Return server and client certificate keys"""
server_secret_file = os.path.join(secret_keys_dir, "server.key_secret")
client_secret_file = os.path.join(secret_keys_dir, "client.key_secret")
server_public, server_secret = zmq.auth.load_certificate(server_secret_file)
client_public, client_secret = zmq.auth.load_certificate(client_secret_file)
return server_public, server_secret, client_public, client_secret
class TestThreadAuthentication(BaseAuthTestCase):
"""Test authentication running in a thread"""
def make_auth(self):
return ThreadAuthenticator(self.context)
def can_connect(self, server, client):
"""Check if client can connect to server using tcp transport"""
result = False
iface = 'tcp://127.0.0.1'
port = server.bind_to_random_port(iface)
client.connect("%s:%i" % (iface, port))
msg = [b"Hello World"]
server.send_multipart(msg)
if client.poll(1000):
rcvd_msg = client.recv_multipart()
self.assertEqual(rcvd_msg, msg)
result = True
return result
def test_null(self):
"""threaded auth - NULL"""
# A default NULL connection should always succeed, and not
# go through our authentication infrastructure at all.
self.auth.stop()
self.auth = None
server = self.socket(zmq.PUSH)
client = self.socket(zmq.PULL)
self.assertTrue(self.can_connect(server, client))
# By setting a domain we switch on authentication for NULL sockets,
# though no policies are configured yet. The client connection
# should still be allowed.
server = self.socket(zmq.PUSH)
server.zap_domain = b'global'
client = self.socket(zmq.PULL)
self.assertTrue(self.can_connect(server, client))
def test_blacklist(self):
"""threaded auth - Blacklist"""
# Blacklist 127.0.0.1, connection should fail
self.auth.deny('127.0.0.1')
server = self.socket(zmq.PUSH)
# By setting a domain we switch on authentication for NULL sockets,
# though no policies are configured yet.
server.zap_domain = b'global'
client = self.socket(zmq.PULL)
self.assertFalse(self.can_connect(server, client))
def test_whitelist(self):
"""threaded auth - Whitelist"""
# Whitelist 127.0.0.1, connection should pass"
self.auth.allow('127.0.0.1')
server = self.socket(zmq.PUSH)
# By setting a domain we switch on authentication for NULL sockets,
# though no policies are configured yet.
server.zap_domain = b'global'
client = self.socket(zmq.PULL)
self.assertTrue(self.can_connect(server, client))
def test_plain(self):
"""threaded auth - PLAIN"""
# Try PLAIN authentication - without configuring server, connection should fail
server = self.socket(zmq.PUSH)
server.plain_server = True
client = self.socket(zmq.PULL)
client.plain_username = b'admin'
client.plain_password = b'Password'
self.assertFalse(self.can_connect(server, client))
# Try PLAIN authentication - with server configured, connection should pass
server = self.socket(zmq.PUSH)
server.plain_server = True
client = self.socket(zmq.PULL)
client.plain_username = b'admin'
client.plain_password = b'Password'
self.auth.configure_plain(domain='*', passwords={'admin': 'Password'})
self.assertTrue(self.can_connect(server, client))
# Try PLAIN authentication - with bogus credentials, connection should fail
server = self.socket(zmq.PUSH)
server.plain_server = True
client = self.socket(zmq.PULL)
client.plain_username = b'admin'
client.plain_password = b'Bogus'
self.assertFalse(self.can_connect(server, client))
# Remove authenticator and check that a normal connection works
self.auth.stop()
self.auth = None
server = self.socket(zmq.PUSH)
client = self.socket(zmq.PULL)
self.assertTrue(self.can_connect(server, client))
client.close()
server.close()
def test_curve(self):
"""threaded auth - CURVE"""
self.auth.allow('127.0.0.1')
certs = self.load_certs(self.secret_keys_dir)
server_public, server_secret, client_public, client_secret = certs
#Try CURVE authentication - without configuring server, connection should fail
server = self.socket(zmq.PUSH)
server.curve_publickey = server_public
server.curve_secretkey = server_secret
server.curve_server = True
client = self.socket(zmq.PULL)
client.curve_publickey = client_public
client.curve_secretkey = client_secret
client.curve_serverkey = server_public
self.assertFalse(self.can_connect(server, client))
#Try CURVE authentication - with server configured to CURVE_ALLOW_ANY, connection should pass
self.auth.configure_curve(domain='*', location=zmq.auth.CURVE_ALLOW_ANY)
server = self.socket(zmq.PUSH)
server.curve_publickey = server_public
server.curve_secretkey = server_secret
server.curve_server = True
client = self.socket(zmq.PULL)
client.curve_publickey = client_public
client.curve_secretkey = client_secret
client.curve_serverkey = server_public
self.assertTrue(self.can_connect(server, client))
# Try CURVE authentication - with server configured, connection should pass
self.auth.configure_curve(domain='*', location=self.public_keys_dir)
server = self.socket(zmq.PUSH)
server.curve_publickey = server_public
server.curve_secretkey = server_secret
server.curve_server = True
client = self.socket(zmq.PULL)
client.curve_publickey = client_public
client.curve_secretkey = client_secret
client.curve_serverkey = server_public
self.assertTrue(self.can_connect(server, client))
# Remove authenticator and check that a normal connection works
self.auth.stop()
self.auth = None
# Try connecting using NULL and no authentication enabled, connection should pass
server = self.socket(zmq.PUSH)
client = self.socket(zmq.PULL)
self.assertTrue(self.can_connect(server, client))
def with_ioloop(method, expect_success=True):
"""decorator for running tests with an IOLoop"""
def test_method(self):
r = method(self)
loop = self.io_loop
if expect_success:
self.pullstream.on_recv(self.on_message_succeed)
else:
self.pullstream.on_recv(self.on_message_fail)
t = loop.time()
loop.add_callback(self.attempt_connection)
loop.add_callback(self.send_msg)
if expect_success:
loop.add_timeout(t + 1, self.on_test_timeout_fail)
else:
loop.add_timeout(t + 1, self.on_test_timeout_succeed)
loop.start()
if self.fail_msg:
self.fail(self.fail_msg)
return r
return test_method
def should_auth(method):
return with_ioloop(method, True)
def should_not_auth(method):
return with_ioloop(method, False)
class TestIOLoopAuthentication(BaseAuthTestCase):
"""Test authentication running in ioloop"""
def setUp(self):
self.fail_msg = None
self.io_loop = ioloop.IOLoop()
super(TestIOLoopAuthentication, self).setUp()
self.server = self.socket(zmq.PUSH)
self.client = self.socket(zmq.PULL)
self.pushstream = zmqstream.ZMQStream(self.server, self.io_loop)
self.pullstream = zmqstream.ZMQStream(self.client, self.io_loop)
def make_auth(self):
return IOLoopAuthenticator(self.context, io_loop=self.io_loop)
def tearDown(self):
if self.auth:
self.auth.stop()
self.auth = None
self.io_loop.close(all_fds=True)
super(TestIOLoopAuthentication, self).tearDown()
def attempt_connection(self):
"""Check if client can connect to server using tcp transport"""
iface = 'tcp://127.0.0.1'
port = self.server.bind_to_random_port(iface)
self.client.connect("%s:%i" % (iface, port))
def send_msg(self):
"""Send a message from server to a client"""
msg = [b"Hello World"]
self.pushstream.send_multipart(msg)
def on_message_succeed(self, frames):
"""A message was received, as expected."""
if frames != [b"Hello World"]:
self.fail_msg = "Unexpected message received"
self.io_loop.stop()
def on_message_fail(self, frames):
"""A message was received, unexpectedly."""
self.fail_msg = 'Received messaged unexpectedly, security failed'
self.io_loop.stop()
def on_test_timeout_succeed(self):
"""Test timer expired, indicates test success"""
self.io_loop.stop()
def on_test_timeout_fail(self):
"""Test timer expired, indicates test failure"""
self.fail_msg = 'Test timed out'
self.io_loop.stop()
@should_auth
def test_none(self):
"""ioloop auth - NONE"""
# A default NULL connection should always succeed, and not
# go through our authentication infrastructure at all.
# no auth should be running
self.auth.stop()
self.auth = None
@should_auth
def test_null(self):
"""ioloop auth - NULL"""
# By setting a domain we switch on authentication for NULL sockets,
# though no policies are configured yet. The client connection
# should still be allowed.
self.server.zap_domain = b'global'
@should_not_auth
def test_blacklist(self):
"""ioloop auth - Blacklist"""
# Blacklist 127.0.0.1, connection should fail
self.auth.deny('127.0.0.1')
self.server.zap_domain = b'global'
@should_auth
def test_whitelist(self):
"""ioloop auth - Whitelist"""
# Whitelist 127.0.0.1, which overrides the blacklist, connection should pass"
self.auth.allow('127.0.0.1')
self.server.setsockopt(zmq.ZAP_DOMAIN, b'global')
@should_not_auth
def test_plain_unconfigured_server(self):
"""ioloop auth - PLAIN, unconfigured server"""
self.client.plain_username = b'admin'
self.client.plain_password = b'Password'
# Try PLAIN authentication - without configuring server, connection should fail
self.server.plain_server = True
@should_auth
def test_plain_configured_server(self):
"""ioloop auth - PLAIN, configured server"""
self.client.plain_username = b'admin'
self.client.plain_password = b'Password'
# Try PLAIN authentication - with server configured, connection should pass
self.server.plain_server = True
self.auth.configure_plain(domain='*', passwords={'admin': 'Password'})
@should_not_auth
def test_plain_bogus_credentials(self):
"""ioloop auth - PLAIN, bogus credentials"""
self.client.plain_username = b'admin'
self.client.plain_password = b'Bogus'
self.server.plain_server = True
self.auth.configure_plain(domain='*', passwords={'admin': 'Password'})
@should_not_auth
def test_curve_unconfigured_server(self):
"""ioloop auth - CURVE, unconfigured server"""
certs = self.load_certs(self.secret_keys_dir)
server_public, server_secret, client_public, client_secret = certs
self.auth.allow('127.0.0.1')
self.server.curve_publickey = server_public
self.server.curve_secretkey = server_secret
self.server.curve_server = True
self.client.curve_publickey = client_public
self.client.curve_secretkey = client_secret
self.client.curve_serverkey = server_public
@should_auth
def test_curve_allow_any(self):
"""ioloop auth - CURVE, CURVE_ALLOW_ANY"""
certs = self.load_certs(self.secret_keys_dir)
server_public, server_secret, client_public, client_secret = certs
self.auth.allow('127.0.0.1')
self.auth.configure_curve(domain='*', location=zmq.auth.CURVE_ALLOW_ANY)
self.server.curve_publickey = server_public
self.server.curve_secretkey = server_secret
self.server.curve_server = True
self.client.curve_publickey = client_public
self.client.curve_secretkey = client_secret
self.client.curve_serverkey = server_public
@should_auth
def test_curve_configured_server(self):
"""ioloop auth - CURVE, configured server"""
self.auth.allow('127.0.0.1')
certs = self.load_certs(self.secret_keys_dir)
server_public, server_secret, client_public, client_secret = certs
self.auth.configure_curve(domain='*', location=self.public_keys_dir)
self.server.curve_publickey = server_public
self.server.curve_secretkey = server_secret
self.server.curve_server = True
self.client.curve_publickey = client_public
self.client.curve_secretkey = client_secret
self.client.curve_serverkey = server_public
|