diff options
author | Dan Klein <danklei@cisco.com> | 2015-08-26 18:16:09 +0300 |
---|---|---|
committer | Dan Klein <danklei@cisco.com> | 2015-08-26 18:16:09 +0300 |
commit | f8ac9d14a989c8cf1535e16165551dfa370b0b74 (patch) | |
tree | 43e396eb5d096ad74ec02afeccf8995a4d241a0f /external_libs/python/zmq/tests/test_auth.py | |
parent | cdcc62972d42f009f55e6aeb2ca5c60c3acd75eb (diff) | |
parent | 53f0e28d7f30c7175cbb15884c309613593859d8 (diff) |
Merge branch 'master' into dan_stateless
Diffstat (limited to 'external_libs/python/zmq/tests/test_auth.py')
-rw-r--r-- | external_libs/python/zmq/tests/test_auth.py | 431 |
1 files changed, 0 insertions, 431 deletions
diff --git a/external_libs/python/zmq/tests/test_auth.py b/external_libs/python/zmq/tests/test_auth.py deleted file mode 100644 index d350f61f..00000000 --- a/external_libs/python/zmq/tests/test_auth.py +++ /dev/null @@ -1,431 +0,0 @@ -# -*- coding: utf8 -*- - -# Copyright (C) PyZMQ Developers -# Distributed under the terms of the Modified BSD License. - -import logging -import os -import shutil -import sys -import tempfile - -import zmq.auth -from zmq.auth.ioloop import IOLoopAuthenticator -from zmq.auth.thread import ThreadAuthenticator - -from zmq.eventloop import ioloop, zmqstream -from zmq.tests import (BaseZMQTestCase, SkipTest) - -class BaseAuthTestCase(BaseZMQTestCase): - def setUp(self): - if zmq.zmq_version_info() < (4,0): - raise SkipTest("security is new in libzmq 4.0") - try: - zmq.curve_keypair() - except zmq.ZMQError: - raise SkipTest("security requires libzmq to be linked against libsodium") - super(BaseAuthTestCase, self).setUp() - # enable debug logging while we run tests - logging.getLogger('zmq.auth').setLevel(logging.DEBUG) - self.auth = self.make_auth() - self.auth.start() - self.base_dir, self.public_keys_dir, self.secret_keys_dir = self.create_certs() - - def make_auth(self): - raise NotImplementedError() - - def tearDown(self): - if self.auth: - self.auth.stop() - self.auth = None - self.remove_certs(self.base_dir) - super(BaseAuthTestCase, self).tearDown() - - def create_certs(self): - """Create CURVE certificates for a test""" - - # Create temporary CURVE keypairs for this test run. We create all keys in a - # temp directory and then move them into the appropriate private or public - # directory. - - base_dir = tempfile.mkdtemp() - keys_dir = os.path.join(base_dir, 'certificates') - public_keys_dir = os.path.join(base_dir, 'public_keys') - secret_keys_dir = os.path.join(base_dir, 'private_keys') - - os.mkdir(keys_dir) - os.mkdir(public_keys_dir) - os.mkdir(secret_keys_dir) - - server_public_file, server_secret_file = zmq.auth.create_certificates(keys_dir, "server") - client_public_file, client_secret_file = zmq.auth.create_certificates(keys_dir, "client") - - for key_file in os.listdir(keys_dir): - if key_file.endswith(".key"): - shutil.move(os.path.join(keys_dir, key_file), - os.path.join(public_keys_dir, '.')) - - for key_file in os.listdir(keys_dir): - if key_file.endswith(".key_secret"): - shutil.move(os.path.join(keys_dir, key_file), - os.path.join(secret_keys_dir, '.')) - - return (base_dir, public_keys_dir, secret_keys_dir) - - def remove_certs(self, base_dir): - """Remove certificates for a test""" - shutil.rmtree(base_dir) - - def load_certs(self, secret_keys_dir): - """Return server and client certificate keys""" - server_secret_file = os.path.join(secret_keys_dir, "server.key_secret") - client_secret_file = os.path.join(secret_keys_dir, "client.key_secret") - - server_public, server_secret = zmq.auth.load_certificate(server_secret_file) - client_public, client_secret = zmq.auth.load_certificate(client_secret_file) - - return server_public, server_secret, client_public, client_secret - - -class TestThreadAuthentication(BaseAuthTestCase): - """Test authentication running in a thread""" - - def make_auth(self): - return ThreadAuthenticator(self.context) - - def can_connect(self, server, client): - """Check if client can connect to server using tcp transport""" - result = False - iface = 'tcp://127.0.0.1' - port = server.bind_to_random_port(iface) - client.connect("%s:%i" % (iface, port)) - msg = [b"Hello World"] - server.send_multipart(msg) - if client.poll(1000): - rcvd_msg = client.recv_multipart() - self.assertEqual(rcvd_msg, msg) - result = True - return result - - def test_null(self): - """threaded auth - NULL""" - # A default NULL connection should always succeed, and not - # go through our authentication infrastructure at all. - self.auth.stop() - self.auth = None - - server = self.socket(zmq.PUSH) - client = self.socket(zmq.PULL) - self.assertTrue(self.can_connect(server, client)) - - # By setting a domain we switch on authentication for NULL sockets, - # though no policies are configured yet. The client connection - # should still be allowed. - server = self.socket(zmq.PUSH) - server.zap_domain = b'global' - client = self.socket(zmq.PULL) - self.assertTrue(self.can_connect(server, client)) - - def test_blacklist(self): - """threaded auth - Blacklist""" - # Blacklist 127.0.0.1, connection should fail - self.auth.deny('127.0.0.1') - server = self.socket(zmq.PUSH) - # By setting a domain we switch on authentication for NULL sockets, - # though no policies are configured yet. - server.zap_domain = b'global' - client = self.socket(zmq.PULL) - self.assertFalse(self.can_connect(server, client)) - - def test_whitelist(self): - """threaded auth - Whitelist""" - # Whitelist 127.0.0.1, connection should pass" - self.auth.allow('127.0.0.1') - server = self.socket(zmq.PUSH) - # By setting a domain we switch on authentication for NULL sockets, - # though no policies are configured yet. - server.zap_domain = b'global' - client = self.socket(zmq.PULL) - self.assertTrue(self.can_connect(server, client)) - - def test_plain(self): - """threaded auth - PLAIN""" - - # Try PLAIN authentication - without configuring server, connection should fail - server = self.socket(zmq.PUSH) - server.plain_server = True - client = self.socket(zmq.PULL) - client.plain_username = b'admin' - client.plain_password = b'Password' - self.assertFalse(self.can_connect(server, client)) - - # Try PLAIN authentication - with server configured, connection should pass - server = self.socket(zmq.PUSH) - server.plain_server = True - client = self.socket(zmq.PULL) - client.plain_username = b'admin' - client.plain_password = b'Password' - self.auth.configure_plain(domain='*', passwords={'admin': 'Password'}) - self.assertTrue(self.can_connect(server, client)) - - # Try PLAIN authentication - with bogus credentials, connection should fail - server = self.socket(zmq.PUSH) - server.plain_server = True - client = self.socket(zmq.PULL) - client.plain_username = b'admin' - client.plain_password = b'Bogus' - self.assertFalse(self.can_connect(server, client)) - - # Remove authenticator and check that a normal connection works - self.auth.stop() - self.auth = None - - server = self.socket(zmq.PUSH) - client = self.socket(zmq.PULL) - self.assertTrue(self.can_connect(server, client)) - client.close() - server.close() - - def test_curve(self): - """threaded auth - CURVE""" - self.auth.allow('127.0.0.1') - certs = self.load_certs(self.secret_keys_dir) - server_public, server_secret, client_public, client_secret = certs - - #Try CURVE authentication - without configuring server, connection should fail - server = self.socket(zmq.PUSH) - server.curve_publickey = server_public - server.curve_secretkey = server_secret - server.curve_server = True - client = self.socket(zmq.PULL) - client.curve_publickey = client_public - client.curve_secretkey = client_secret - client.curve_serverkey = server_public - self.assertFalse(self.can_connect(server, client)) - - #Try CURVE authentication - with server configured to CURVE_ALLOW_ANY, connection should pass - self.auth.configure_curve(domain='*', location=zmq.auth.CURVE_ALLOW_ANY) - server = self.socket(zmq.PUSH) - server.curve_publickey = server_public - server.curve_secretkey = server_secret - server.curve_server = True - client = self.socket(zmq.PULL) - client.curve_publickey = client_public - client.curve_secretkey = client_secret - client.curve_serverkey = server_public - self.assertTrue(self.can_connect(server, client)) - - # Try CURVE authentication - with server configured, connection should pass - self.auth.configure_curve(domain='*', location=self.public_keys_dir) - server = self.socket(zmq.PUSH) - server.curve_publickey = server_public - server.curve_secretkey = server_secret - server.curve_server = True - client = self.socket(zmq.PULL) - client.curve_publickey = client_public - client.curve_secretkey = client_secret - client.curve_serverkey = server_public - self.assertTrue(self.can_connect(server, client)) - - # Remove authenticator and check that a normal connection works - self.auth.stop() - self.auth = None - - # Try connecting using NULL and no authentication enabled, connection should pass - server = self.socket(zmq.PUSH) - client = self.socket(zmq.PULL) - self.assertTrue(self.can_connect(server, client)) - - -def with_ioloop(method, expect_success=True): - """decorator for running tests with an IOLoop""" - def test_method(self): - r = method(self) - - loop = self.io_loop - if expect_success: - self.pullstream.on_recv(self.on_message_succeed) - else: - self.pullstream.on_recv(self.on_message_fail) - - t = loop.time() - loop.add_callback(self.attempt_connection) - loop.add_callback(self.send_msg) - if expect_success: - loop.add_timeout(t + 1, self.on_test_timeout_fail) - else: - loop.add_timeout(t + 1, self.on_test_timeout_succeed) - - loop.start() - if self.fail_msg: - self.fail(self.fail_msg) - - return r - return test_method - -def should_auth(method): - return with_ioloop(method, True) - -def should_not_auth(method): - return with_ioloop(method, False) - -class TestIOLoopAuthentication(BaseAuthTestCase): - """Test authentication running in ioloop""" - - def setUp(self): - self.fail_msg = None - self.io_loop = ioloop.IOLoop() - super(TestIOLoopAuthentication, self).setUp() - self.server = self.socket(zmq.PUSH) - self.client = self.socket(zmq.PULL) - self.pushstream = zmqstream.ZMQStream(self.server, self.io_loop) - self.pullstream = zmqstream.ZMQStream(self.client, self.io_loop) - - def make_auth(self): - return IOLoopAuthenticator(self.context, io_loop=self.io_loop) - - def tearDown(self): - if self.auth: - self.auth.stop() - self.auth = None - self.io_loop.close(all_fds=True) - super(TestIOLoopAuthentication, self).tearDown() - - def attempt_connection(self): - """Check if client can connect to server using tcp transport""" - iface = 'tcp://127.0.0.1' - port = self.server.bind_to_random_port(iface) - self.client.connect("%s:%i" % (iface, port)) - - def send_msg(self): - """Send a message from server to a client""" - msg = [b"Hello World"] - self.pushstream.send_multipart(msg) - - def on_message_succeed(self, frames): - """A message was received, as expected.""" - if frames != [b"Hello World"]: - self.fail_msg = "Unexpected message received" - self.io_loop.stop() - - def on_message_fail(self, frames): - """A message was received, unexpectedly.""" - self.fail_msg = 'Received messaged unexpectedly, security failed' - self.io_loop.stop() - - def on_test_timeout_succeed(self): - """Test timer expired, indicates test success""" - self.io_loop.stop() - - def on_test_timeout_fail(self): - """Test timer expired, indicates test failure""" - self.fail_msg = 'Test timed out' - self.io_loop.stop() - - @should_auth - def test_none(self): - """ioloop auth - NONE""" - # A default NULL connection should always succeed, and not - # go through our authentication infrastructure at all. - # no auth should be running - self.auth.stop() - self.auth = None - - @should_auth - def test_null(self): - """ioloop auth - NULL""" - # By setting a domain we switch on authentication for NULL sockets, - # though no policies are configured yet. The client connection - # should still be allowed. - self.server.zap_domain = b'global' - - @should_not_auth - def test_blacklist(self): - """ioloop auth - Blacklist""" - # Blacklist 127.0.0.1, connection should fail - self.auth.deny('127.0.0.1') - self.server.zap_domain = b'global' - - @should_auth - def test_whitelist(self): - """ioloop auth - Whitelist""" - # Whitelist 127.0.0.1, which overrides the blacklist, connection should pass" - self.auth.allow('127.0.0.1') - - self.server.setsockopt(zmq.ZAP_DOMAIN, b'global') - - @should_not_auth - def test_plain_unconfigured_server(self): - """ioloop auth - PLAIN, unconfigured server""" - self.client.plain_username = b'admin' - self.client.plain_password = b'Password' - # Try PLAIN authentication - without configuring server, connection should fail - self.server.plain_server = True - - @should_auth - def test_plain_configured_server(self): - """ioloop auth - PLAIN, configured server""" - self.client.plain_username = b'admin' - self.client.plain_password = b'Password' - # Try PLAIN authentication - with server configured, connection should pass - self.server.plain_server = True - self.auth.configure_plain(domain='*', passwords={'admin': 'Password'}) - - @should_not_auth - def test_plain_bogus_credentials(self): - """ioloop auth - PLAIN, bogus credentials""" - self.client.plain_username = b'admin' - self.client.plain_password = b'Bogus' - self.server.plain_server = True - - self.auth.configure_plain(domain='*', passwords={'admin': 'Password'}) - - @should_not_auth - def test_curve_unconfigured_server(self): - """ioloop auth - CURVE, unconfigured server""" - certs = self.load_certs(self.secret_keys_dir) - server_public, server_secret, client_public, client_secret = certs - - self.auth.allow('127.0.0.1') - - self.server.curve_publickey = server_public - self.server.curve_secretkey = server_secret - self.server.curve_server = True - - self.client.curve_publickey = client_public - self.client.curve_secretkey = client_secret - self.client.curve_serverkey = server_public - - @should_auth - def test_curve_allow_any(self): - """ioloop auth - CURVE, CURVE_ALLOW_ANY""" - certs = self.load_certs(self.secret_keys_dir) - server_public, server_secret, client_public, client_secret = certs - - self.auth.allow('127.0.0.1') - self.auth.configure_curve(domain='*', location=zmq.auth.CURVE_ALLOW_ANY) - - self.server.curve_publickey = server_public - self.server.curve_secretkey = server_secret - self.server.curve_server = True - - self.client.curve_publickey = client_public - self.client.curve_secretkey = client_secret - self.client.curve_serverkey = server_public - - @should_auth - def test_curve_configured_server(self): - """ioloop auth - CURVE, configured server""" - self.auth.allow('127.0.0.1') - certs = self.load_certs(self.secret_keys_dir) - server_public, server_secret, client_public, client_secret = certs - - self.auth.configure_curve(domain='*', location=self.public_keys_dir) - - self.server.curve_publickey = server_public - self.server.curve_secretkey = server_secret - self.server.curve_server = True - - self.client.curve_publickey = client_public - self.client.curve_secretkey = client_secret - self.client.curve_serverkey = server_public |