diff options
Diffstat (limited to 'external_libs/python/zmq/auth/base.py')
-rw-r--r-- | external_libs/python/zmq/auth/base.py | 272 |
1 files changed, 0 insertions, 272 deletions
diff --git a/external_libs/python/zmq/auth/base.py b/external_libs/python/zmq/auth/base.py deleted file mode 100644 index 9b4aaed7..00000000 --- a/external_libs/python/zmq/auth/base.py +++ /dev/null @@ -1,272 +0,0 @@ -"""Base implementation of 0MQ authentication.""" - -# Copyright (C) PyZMQ Developers -# Distributed under the terms of the Modified BSD License. - -import logging - -import zmq -from zmq.utils import z85 -from zmq.utils.strtypes import bytes, unicode, b, u -from zmq.error import _check_version - -from .certs import load_certificates - - -CURVE_ALLOW_ANY = '*' -VERSION = b'1.0' - -class Authenticator(object): - """Implementation of ZAP authentication for zmq connections. - - Note: - - libzmq provides four levels of security: default NULL (which the Authenticator does - not see), and authenticated NULL, PLAIN, and CURVE, which the Authenticator can see. - - until you add policies, all incoming NULL connections are allowed - (classic ZeroMQ behavior), and all PLAIN and CURVE connections are denied. - """ - - def __init__(self, context=None, encoding='utf-8', log=None): - _check_version((4,0), "security") - self.context = context or zmq.Context.instance() - self.encoding = encoding - self.allow_any = False - self.zap_socket = None - self.whitelist = set() - self.blacklist = set() - # passwords is a dict keyed by domain and contains values - # of dicts with username:password pairs. - self.passwords = {} - # certs is dict keyed by domain and contains values - # of dicts keyed by the public keys from the specified location. - self.certs = {} - self.log = log or logging.getLogger('zmq.auth') - - def start(self): - """Create and bind the ZAP socket""" - self.zap_socket = self.context.socket(zmq.REP) - self.zap_socket.linger = 1 - self.zap_socket.bind("inproc://zeromq.zap.01") - - def stop(self): - """Close the ZAP socket""" - if self.zap_socket: - self.zap_socket.close() - self.zap_socket = None - - def allow(self, *addresses): - """Allow (whitelist) IP address(es). - - Connections from addresses not in the whitelist will be rejected. - - - For NULL, all clients from this address will be accepted. - - For PLAIN and CURVE, they will be allowed to continue with authentication. - - whitelist is mutually exclusive with blacklist. - """ - if self.blacklist: - raise ValueError("Only use a whitelist or a blacklist, not both") - self.whitelist.update(addresses) - - def deny(self, *addresses): - """Deny (blacklist) IP address(es). - - Addresses not in the blacklist will be allowed to continue with authentication. - - Blacklist is mutually exclusive with whitelist. - """ - if self.whitelist: - raise ValueError("Only use a whitelist or a blacklist, not both") - self.blacklist.update(addresses) - - def configure_plain(self, domain='*', passwords=None): - """Configure PLAIN authentication for a given domain. - - PLAIN authentication uses a plain-text password file. - To cover all domains, use "*". - You can modify the password file at any time; it is reloaded automatically. - """ - if passwords: - self.passwords[domain] = passwords - - def configure_curve(self, domain='*', location=None): - """Configure CURVE authentication for a given domain. - - CURVE authentication uses a directory that holds all public client certificates, - i.e. their public keys. - - To cover all domains, use "*". - - You can add and remove certificates in that directory at any time. - - To allow all client keys without checking, specify CURVE_ALLOW_ANY for the location. - """ - # If location is CURVE_ALLOW_ANY then allow all clients. Otherwise - # treat location as a directory that holds the certificates. - if location == CURVE_ALLOW_ANY: - self.allow_any = True - else: - self.allow_any = False - try: - self.certs[domain] = load_certificates(location) - except Exception as e: - self.log.error("Failed to load CURVE certs from %s: %s", location, e) - - def handle_zap_message(self, msg): - """Perform ZAP authentication""" - if len(msg) < 6: - self.log.error("Invalid ZAP message, not enough frames: %r", msg) - if len(msg) < 2: - self.log.error("Not enough information to reply") - else: - self._send_zap_reply(msg[1], b"400", b"Not enough frames") - return - - version, request_id, domain, address, identity, mechanism = msg[:6] - credentials = msg[6:] - - domain = u(domain, self.encoding, 'replace') - address = u(address, self.encoding, 'replace') - - if (version != VERSION): - self.log.error("Invalid ZAP version: %r", msg) - self._send_zap_reply(request_id, b"400", b"Invalid version") - return - - self.log.debug("version: %r, request_id: %r, domain: %r," - " address: %r, identity: %r, mechanism: %r", - version, request_id, domain, - address, identity, mechanism, - ) - - - # Is address is explicitly whitelisted or blacklisted? - allowed = False - denied = False - reason = b"NO ACCESS" - - if self.whitelist: - if address in self.whitelist: - allowed = True - self.log.debug("PASSED (whitelist) address=%s", address) - else: - denied = True - reason = b"Address not in whitelist" - self.log.debug("DENIED (not in whitelist) address=%s", address) - - elif self.blacklist: - if address in self.blacklist: - denied = True - reason = b"Address is blacklisted" - self.log.debug("DENIED (blacklist) address=%s", address) - else: - allowed = True - self.log.debug("PASSED (not in blacklist) address=%s", address) - - # Perform authentication mechanism-specific checks if necessary - username = u("user") - if not denied: - - if mechanism == b'NULL' and not allowed: - # For NULL, we allow if the address wasn't blacklisted - self.log.debug("ALLOWED (NULL)") - allowed = True - - elif mechanism == b'PLAIN': - # For PLAIN, even a whitelisted address must authenticate - if len(credentials) != 2: - self.log.error("Invalid PLAIN credentials: %r", credentials) - self._send_zap_reply(request_id, b"400", b"Invalid credentials") - return - username, password = [ u(c, self.encoding, 'replace') for c in credentials ] - allowed, reason = self._authenticate_plain(domain, username, password) - - elif mechanism == b'CURVE': - # For CURVE, even a whitelisted address must authenticate - if len(credentials) != 1: - self.log.error("Invalid CURVE credentials: %r", credentials) - self._send_zap_reply(request_id, b"400", b"Invalid credentials") - return - key = credentials[0] - allowed, reason = self._authenticate_curve(domain, key) - - if allowed: - self._send_zap_reply(request_id, b"200", b"OK", username) - else: - self._send_zap_reply(request_id, b"400", reason) - - def _authenticate_plain(self, domain, username, password): - """PLAIN ZAP authentication""" - allowed = False - reason = b"" - if self.passwords: - # If no domain is not specified then use the default domain - if not domain: - domain = '*' - - if domain in self.passwords: - if username in self.passwords[domain]: - if password == self.passwords[domain][username]: - allowed = True - else: - reason = b"Invalid password" - else: - reason = b"Invalid username" - else: - reason = b"Invalid domain" - - if allowed: - self.log.debug("ALLOWED (PLAIN) domain=%s username=%s password=%s", - domain, username, password, - ) - else: - self.log.debug("DENIED %s", reason) - - else: - reason = b"No passwords defined" - self.log.debug("DENIED (PLAIN) %s", reason) - - return allowed, reason - - def _authenticate_curve(self, domain, client_key): - """CURVE ZAP authentication""" - allowed = False - reason = b"" - if self.allow_any: - allowed = True - reason = b"OK" - self.log.debug("ALLOWED (CURVE allow any client)") - else: - # If no explicit domain is specified then use the default domain - if not domain: - domain = '*' - - if domain in self.certs: - # The certs dict stores keys in z85 format, convert binary key to z85 bytes - z85_client_key = z85.encode(client_key) - if z85_client_key in self.certs[domain] or self.certs[domain] == b'OK': - allowed = True - reason = b"OK" - else: - reason = b"Unknown key" - - status = "ALLOWED" if allowed else "DENIED" - self.log.debug("%s (CURVE) domain=%s client_key=%s", - status, domain, z85_client_key, - ) - else: - reason = b"Unknown domain" - - return allowed, reason - - def _send_zap_reply(self, request_id, status_code, status_text, user_id='user'): - """Send a ZAP reply to finish the authentication.""" - user_id = user_id if status_code == b'200' else b'' - if isinstance(user_id, unicode): - user_id = user_id.encode(self.encoding, 'replace') - metadata = b'' # not currently used - self.log.debug("ZAP reply code=%s text=%s", status_code, status_text) - reply = [VERSION, request_id, status_code, status_text, user_id, metadata] - self.zap_socket.send_multipart(reply) - -__all__ = ['Authenticator', 'CURVE_ALLOW_ANY'] |