summaryrefslogtreecommitdiffstats
path: root/external_libs/python/pyzmq-14.7.0/examples/security/ioloop-ironhouse.py
blob: fbde3062529194f08ae9747195c7bfbee0ad0852 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#!/usr/bin/env python

'''
Ironhouse extends Stonehouse with client public key authentication.

This is the strongest security model we have today, protecting against every
attack we know about, except end-point attacks (where an attacker plants
spyware on a machine to capture data before it's encrypted, or after it's
decrypted).

This example demonstrates using the IOLoopAuthenticator.

Author: Chris Laws
'''

import logging
import os
import sys

import zmq
import zmq.auth
from zmq.auth.ioloop import IOLoopAuthenticator
from zmq.eventloop import ioloop, zmqstream

def echo(server, msg):
    """Send *msg* back through *server* with an extra b'World' frame appended.

    server: object exposing ``send_multipart`` (the stream that received
        the message when this is registered with ``on_recv_stream``).
    msg: list of message frames; it is not modified, a new list is sent.
    """
    logging.debug("server recvd %s", msg)
    world_frame = [b'World']
    response = msg + world_frame
    logging.debug("server sending %s", response)
    server.send_multipart(response)
    
def setup_server(server_secret_file, endpoint='tcp://127.0.0.1:9000'):
    """Build the CURVE-secured ROUTER echo server and return its ZMQStream.

    server_secret_file: certificate file from which the server's public and
        secret keys are loaded.
    endpoint: address the ROUTER socket is bound to.
    """
    context = zmq.Context.instance()
    router = context.socket(zmq.ROUTER)

    public_key, secret_key = zmq.auth.load_certificate(server_secret_file)
    router.curve_publickey = public_key
    router.curve_secretkey = secret_key
    # Server mode has to be switched on before the socket is bound.
    router.curve_server = True
    router.bind(endpoint)

    # Wrap the socket for the IOLoop; every received message goes to echo().
    stream = zmqstream.ZMQStream(router)
    stream.on_recv_stream(echo)
    return stream

def client_msg_recvd(msg):
    """Log the reply received by the client and stop the IOLoop."""
    logging.debug("client recvd %s", msg)
    logging.info("Ironhouse test OK")
    # The reply is all the example waits for, so shut the event loop down.
    loop = ioloop.IOLoop.instance()
    loop.stop()

def setup_client(client_secret_file, server_public_file, endpoint='tcp://127.0.0.1:9000'):
    """Build the CURVE-secured DEALER client and return its ZMQStream.

    client_secret_file: certificate file holding the client's own key pair.
    server_public_file: certificate file holding the server's public key.
    endpoint: address of the server to connect to.
    """
    context = zmq.Context.instance()
    dealer = context.socket(zmq.DEALER)

    # Two certificates are involved: the client's own key pair ...
    own_public, own_secret = zmq.auth.load_certificate(client_secret_file)
    dealer.curve_publickey = own_public
    dealer.curve_secretkey = own_secret

    # ... and the server's public key, which the client must know in order
    # to make a CURVE connection. The second element is not needed here.
    server_key, _unused = zmq.auth.load_certificate(server_public_file)
    dealer.curve_serverkey = server_key
    dealer.connect(endpoint)

    # Wrap the socket for the IOLoop; replies are handed to client_msg_recvd().
    stream = zmqstream.ZMQStream(dealer)
    stream.on_recv(client_msg_recvd)
    return stream
    

def run():
    '''Run Ironhouse example.

    Checks that the certificate directories exist next to this script
    (exiting with status 1 if they do not), starts the authenticator, then
    creates the echo server and the client, sends b'Hello' and runs the
    IOLoop until it is stopped. The streams and the authenticator are closed
    before returning.
    '''

    # These directories are generated by the generate_certificates script
    base_dir = os.path.dirname(__file__)
    keys_dir = os.path.join(base_dir, 'certificates')
    public_keys_dir = os.path.join(base_dir, 'public_keys')
    secret_keys_dir = os.path.join(base_dir, 'private_keys')

    if not (os.path.exists(keys_dir) and
            os.path.exists(public_keys_dir) and
            os.path.exists(secret_keys_dir)):
        logging.critical("Certificates are missing - run generate_certificates script first")
        sys.exit(1)

    # Start an authenticator for this context.
    auth = IOLoopAuthenticator()
    auth.allow('127.0.0.1')
    # Tell authenticator to use the certificate in a directory
    auth.configure_curve(domain='*', location=public_keys_dir)
    # The ZAP handler must be in place before any CURVE socket is bound or
    # connected: the handshake runs in libzmq's background I/O thread, and a
    # handshake that completes before the handler exists is not checked
    # against the rules configured above.
    auth.start()

    server = None
    client = None
    try:
        server_secret_file = os.path.join(secret_keys_dir, "server.key_secret")
        server = setup_server(server_secret_file)
        server_public_file = os.path.join(public_keys_dir, "server.key")
        client_secret_file = os.path.join(secret_keys_dir, "client.key_secret")
        client = setup_client(client_secret_file, server_public_file)
        client.send(b'Hello')

        ioloop.IOLoop.instance().start()
    finally:
        # Release the sockets and the ZAP handler however the loop ended.
        for stream in (client, server):
            if stream is not None:
                stream.close()
        auth.stop()

if __name__ == '__main__':
    # Refuse to run against a libzmq older than 4.0.
    if zmq.zmq_version_info() < (4,0):
        raise RuntimeError("Security is not supported in libzmq version < 4.0. libzmq version {0}".format(zmq.zmq_version()))

    # Passing -v anywhere on the command line switches on debug logging.
    verbose = '-v' in sys.argv
    logging.basicConfig(
        level=logging.DEBUG if verbose else logging.INFO,
        format="[%(levelname)s] %(message)s",
    )

    run()