summaryrefslogtreecommitdiffstats
path: root/test/test_ipsec_nat.py
blob: 7a5aca6bb7b677c007cf635ad00b97c784c9f007 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

@media only all and (prefers-color-scheme: dark) {
.highlight .hll { background-color: #49483e }
.highlight .c { color: #75715e } /* Comment */
.highlight .err { color: #960050; background-color: #1e0010 } /* Error */
.highlight .k { color: #66d9ef } /* Keyword */
.highlight .l { color: #ae81ff } /* Literal */
.highlight .n { color: #f8f8f2 } /* Name */
.highlight .o { color: #f92672 } /* Operator */
.highlight .p { color: #f8f8f2 } /* Punctuation */
.highlight .ch { color: #75715e } /* Comment.Hashbang */
.highlight .cm { color: #75715e } /* Comment.Multiline */
.highlight .cp { color: #75715e } /* Comment.Preproc */
.highlight .cpf { color: #75715e } /* Comment.PreprocFile */
.highlight .c1 { color: #75715e } /* Comment.Single */
.highlight .cs { color: #75715e } /* Comment.Special */
.highlight .gd { color: #f92672 } /* Generic.Deleted */
.highlight .ge { font-style: italic } /* Generic.Emph */
.highlight .gi { color: #a6e22e } /* Generic.Inserted */
.highlight .gs { font-weight: bold } /* Generic.Strong */
.highlight .gu { color: #75715e } /* Generic.Subheading */
.highlight .kc { color: #66d9ef } /* Keyword.Constant */
.highlight .kd { color: #66d9ef } /* Keyword.Declaration */
.highlight .kn { color: #f92672 } /* Keyword.Namespace */
.highlight .kp { color: #66d9ef } /* Keyword.Pseudo */
.highlight .kr { color: #66d9ef } /* Keyword.Reserved */
.highlight .kt { color: #66d9ef } /* Keyword.Type */
.highlight .ld { color: #e6db74 } /* Literal.Date */
.highlight .m { color: #ae81ff } /* Literal.Number */
.highlight .s { color: #e6db74 } /* Literal.String */
.highlight .na { color: #a6e22e } /* Name.Attribute */
.highlight .nb { color: #f8f8f2 } /* Name.Builtin */
.highlight .nc { color: #a6e22e } /* Name.Class */
.highlight .no { color: #66d9ef } /* Name.Constant */
.highlight .nd { color: #a6e22e } /* Name.Decorator */
.highlight .ni { color: #f8f8f2 } /* Name.Entity */
.highlight .ne { color: #a6e22e } /* Name.Exception */
.highlight .nf { color: #a6e22e } /* Name.Function */
.highlight .nl { color: #f8f8f2 } /* Name.Label */
.highlight .nn { color: #f8f8f2 } /* Name.Namespace */
.highlight .nx { color: #a6e22e } /* Name.Other */
.highlight .py { color: #f8f8f2 } /* Name.Property */
.highlight .nt { color: #f92672 } /* Name.Tag */
.highlight .nv { color: #f8f8f2 } /* Name.Variable */
.highlight .ow { color: #f92672 } /* Operator.Word */
.highlight .w { color: #f8f8f2 } /* Text.Whitespace */
.highlight .mb { color: #ae81ff } /* Literal.Number.Bin */
.highlight .mf { color: #ae81ff } /* Literal.Number.Float */
.highlight .mh { color: #ae81ff } /* Literal.Number.Hex */
.highlight .mi { color: #ae81ff } /* Literal.Number.Integer */
.highlight .mo { color: #ae81ff } /* Literal.Number.Oct */
.highlight .sa { color: #e6db74 } /* Literal.String.Affix */
.highlight .sb { color: #e6db74 } /* Literal.String.Backtick */
.highlight .sc { color: #e6db74 } /* Literal.String.Char */
.highlight .dl { color: #e6db74 } /* Literal.String.Delimiter */
.highlight .sd { color: #e6db74 } /* Literal.String.Doc */
.highlight .s2 { color: #e6db74 } /* Literal.String.Double */
.highlight .se { color: #ae81ff } /* Literal.String.Escape */
.highlight .sh { color: #e6db74 } /* Literal.String.Heredoc */
.highlight .si { color: #e6db74 } /* Literal.String.Interpol */
.highlight .sx { color: #e6db74 } /* Literal.String.Other */
.highlight .sr { color: #e6db74 } /* Literal.String.Regex */
.highlight .s1 { color: #e6db74 } /* Literal.String.Single */
.highlight .ss { color: #e6db74 } /* Literal.String.Symbol */
.highlight .bp { color: #f8f8f2 } /* Name.Builtin.Pseudo */
.highlight .fm { color: #a6e22e } /* Name.Function.Magic */
.highlight .vc { color: #f8f8f2 } /* Name.Variable.Class */
.highlight .vg { color: #f8f8f2 } /* Name.Variable.Global */
.highlight .vi { color: #f8f8f2 } /* Name.Variable.Instance */
.highlight .vm { color: #f8f8f2 } /* Name.Variable.Magic */
.highlight .il { color: #ae81ff } /* Literal.Number.Integer.Long */
}
@media (prefers-color-scheme: light) {
.highlight .hll { background-color: #ffffcc }
.highlight .c { color: #888888 } /* Comment */
.highlight .err { color: #a61717; background-color: #e3d2d2 } /* Error */
.highlight .k { color: #008800; font-weight: bold } /* Keyword */
.highlight .ch { color: #888888 } /* Comment.Hashbang */
.highlight .cm { color: #888888 } /* Comment.Multiline */
.highlight .cp { color: #cc0000; font-weight: bold } /* Comment.Preproc */
.highlight .cpf { color: #888888 } /* Comment.PreprocFile */
.highlight .c1 { color: #888888 } /* Comment.Single */
.highlight .cs { color: #cc0000; font-weight: bold; background-color: #fff0f0 } /* Comment.Special */
.highlight .gd { color: #000000; background-color: #ffdddd } /* Generic.Deleted */
.highlight .ge { font-style: italic } /* Generic.Emph */
.highlight .gr { color: #aa0000 } /* Generic.Error */
.highlight .gh { color: #333333 } /* Generic.Heading */
.highlight .gi { color: #000000; background-color: #ddffdd } /* Generic.Inserted */
.highlight .go { color: #888888 } /* Generic.Output */
.highlight .gp { color: #555555 } /* Generic.Prompt */
.highlight .gs { font-weight: bold } /* Generic.Strong */
.highlight .gu { color: #666666 } /* Generic.Subheading */
.highlight .gt { color: #aa0000 } /* Generic.Traceback */
.highlight .kc { color: #008800; font-weight: bold } /* Keyword.Constant */
.highlight .kd { color: #008800; font-weight: bold } /* Keyword.Declaration */
.highlight .kn { color: #008800; font-weight: bold } /* Keyword.Namespace */
.highlight .kp { color: #008800 } /* Keyword.Pseudo */
.highlight .kr { color: #008800; font-weight: bold } /* Keyword.Reserved */
.highlight .kt { color: #888888; font-weight: bold } /* Keyword.Type */
.highlight .m { color: #0000DD; font-weight: bold } /* Literal.Number */
.highlight .s { color: #dd2200; background-color: #fff0f0 } /* Literal.String */
.highlight .na { color: #336699 } /* Name.Attribute */
.highlight .nb { color: #003388 } /* Name.Builtin */
.highlight .nc { color: #bb0066; font-weight: bold } /* Name.Class */
.highlight .no { color: #003366; font-weight: bold } /* Name.Constant */
.highlight .nd { color: #555555 } /* Name.Decorator */
.highlight .ne { color: #bb0066; font-weight: bold } /* Name.Exception */
.highlight .nf { color: #0066bb; font-weight: bold } /* Name.Function */
.highlight .nl { color: #336699; font-style: italic } /* Name.Label */
.highlight .nn { color: #bb0066; font-weight: bold } /* Name.Namespace */
.highlight .py { color: #336699; font-weight: bold } /* Name.Property */
.highlight .nt { color: #bb0066; font-weight: bold } /* Name.Tag */
.highlight .nv { color: #336699 } /* Name.Variable */
.highlight .ow { color: #008800 } /* Operator.Word */
.highlight .w { color: #bbbbbb } /* Text.Whitespace */
.highlight .mb { color: #0000DD; font-weight: bold } /* Literal.Number.Bin */
.highlight .mf { color: #0000DD; font-weight: bold } /* Literal.Number.Float */
.highlight .mh { color: #0000DD; font-weight: bold } /* Literal.Number.Hex */
.highlight .mi { color: #0000DD; font-weight: bold } /* Literal.Number.Integer */
.highlight .mo { color: #0000DD; font-weight: bo
#!/usr/bin/env python

import socket

from scapy.layers.l2 import Ether
from scapy.layers.inet import ICMP, IP, TCP, UDP
from scapy.layers.ipsec import SecurityAssociation, ESP
from util import ppp, ppc
from framework import VppTestCase


class IPSecNATTestCase(VppTestCase):
    """ IPSec/NAT

    TRANSPORT MODE:

     ---   encrypt   ---
    |pg2| <-------> |VPP|
     ---   decrypt   ---

    TUNNEL MODE:


     public network  |   private network
     ---   encrypt  ---   plain   ---
    |pg0| <------- |VPP| <------ |pg1|
     ---            ---           ---

     ---   decrypt  ---   plain   ---
    |pg0| -------> |VPP| ------> |pg1|
     ---            ---           ---
    """

    remote_pg0_client_addr = '1.1.1.1'

    @classmethod
    def setUpClass(cls):
        super(IPSecNATTestCase, cls).setUpClass()
        cls.create_pg_interfaces(range(2))
        for i in cls.pg_interfaces:
            i.configure_ipv4_neighbors()
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()

        cls.tcp_port_in = 6303
        cls.tcp_port_out = 6303
        cls.udp_port_in = 6304
        cls.udp_port_out = 6304
        cls.icmp_id_in = 6305
        cls.icmp_id_out = 6305
        cls.config_esp_tun()
        cls.logger.info(cls.vapi.ppcli("show ipsec"))

    def create_stream_plain(self, src_mac, dst_mac, src_ip, dst_ip):
        return [
            # TCP
            Ether(src=src_mac, dst=dst_mac) /
            IP(src=src_ip, dst=dst_ip) /
            TCP(sport=self.tcp_port_in, dport=20),
            # UDP
            Ether(src=src_mac, dst=dst_mac) /
            IP(src=src_ip, dst=dst_ip) /
            UDP(sport=self.udp_port_in, dport=20),
            # ICMP
            Ether(src=src_mac, dst=dst_mac) /
            IP(src=src_ip, dst=dst_ip) /
            ICMP(id=self.icmp_id_in, type='echo-request')
        ]

    def create_stream_encrypted(self, src_mac, dst_mac, src_ip, dst_ip, sa):
        return [
            # TCP
            Ether(src=src_mac, dst=dst_mac) /
            sa.encrypt(IP(src=src_ip, dst=dst_ip) /
                       TCP(dport=self.tcp_port_out, sport=20)),
            # UDP
            Ether(src=src_mac, dst=dst_mac) /
            sa.encrypt(IP(src=src_ip, dst=dst_ip) /
                       UDP(dport=self.udp_port_out, sport=20)),
            # ICMP
            Ether(src=src_mac, dst=dst_mac) /
            sa.encrypt(IP(src=src_ip, dst=dst_ip) /
                       ICMP(id=self.icmp_id_out, type='echo-request'))
        ]

    def verify_capture_plain(self, capture):
        for packet in capture:
            try:
                self.assert_packet_checksums_valid(packet)
                self.assert_equal(packet[IP].src, self.pg0.remote_ip4,
                                  "decrypted packet source address")
                self.assert_equal(packet[IP].dst, self.pg1.remote_ip4,
                                  "decrypted packet destination address")
                if packet.haslayer(TCP):
                    self.assertFalse(
                        packet.haslayer(UDP),
                        "unexpected UDP header in decrypted packet")
                    self.assert_equal(packet[TCP].dport, self.tcp_port_in,
                                      "decrypted packet TCP destination port")
                elif packet.haslayer(UDP):
                    if packet[UDP].payload:
                        self.assertFalse(
                            packet[UDP][1].haslayer(UDP),
                            "unexpected UDP header in decrypted packet")
                    self.assert_equal(packet[UDP].dport, self.udp_port_in,
                                      "decrypted packet UDP destination port")
                else:
                    self.assertFalse(
                        packet.haslayer(UDP),
                        "unexpected UDP header in decrypted packet")
                    self.assert_equal(packet[ICMP].id, self.icmp_id_in,
                                      "decrypted packet ICMP ID")
            except Exception:
                self.logger.error(
                    ppp("Unexpected or invalid plain packet:", packet))
                raise

    def verify_capture_encrypted(self, capture, sa):
        for packet in capture:
            try:
                self.assertIn(ESP, packet[IP])
                decrypt_pkt = sa.decrypt(packet[IP])
                self.assert_equal(decrypt_pkt[IP].src, self.pg1.remote_ip4,
                                  "encrypted packet source address")
                self.assert_equal(decrypt_pkt[IP].dst, self.pg0.remote_ip4,
                                  "encrypted packet destination address")
                # if decrypt_pkt.haslayer(TCP):
                #     self.tcp_port_out = decrypt_pkt[TCP].sport
                # elif decrypt_pkt.haslayer(UDP):
                #     self.udp_port_out = decrypt_pkt[UDP].sport
                # else:
                #     self.icmp_id_out = decrypt_pkt[ICMP].id
            except Exception:
                self.logger.error(
                    ppp("Unexpected or invalid encrypted packet:", packet))
                raise

    @classmethod
    def config_esp_tun(cls):
        spd_id = 1
        remote_sa_id = 10
        local_sa_id = 20
        scapy_tun_spi = 1001
        vpp_tun_spi = 1000
        client = socket.inet_pton(socket.AF_INET, cls.remote_pg0_client_addr)
        cls.vapi.ip_add_del_route(client, 32, cls.pg0.remote_ip4n)
        cls.vapi.ipsec_sad_add_del_entry(remote_sa_id, scapy_tun_spi,
                                         cls.pg1.remote_ip4n,
                                         cls.pg0.remote_ip4n,
                                         integrity_key_length=20,
                                         crypto_key_length=16,
                                         protocol=1, udp_encap=1)
        cls.vapi.ipsec_sad_add_del_entry(local_sa_id, vpp_tun_spi,
                                         cls.pg0.remote_ip4n,
                                         cls.pg1.remote_ip4n,
                                         integrity_key_length=20,
                                         crypto_key_length=16,
                                         protocol=1, udp_encap=1)
        cls.vapi.ipsec_spd_add_del(spd_id)
        cls.vapi.ipsec_interface_add_del_spd(spd_id, cls.pg0.sw_if_index)
        l_startaddr = r_startaddr = socket.inet_pton(socket.AF_INET,
                                                     "0.0.0.0")
        l_stopaddr = r_stopaddr = socket.inet_pton(socket.AF_INET,
                                                   "255.255.255.255")
        cls.vapi.ipsec_spd_add_del_entry(spd_id, l_startaddr, l_stopaddr,
                                         r_startaddr, r_stopaddr,
                                         protocol=socket.IPPROTO_ESP)
        cls.vapi.ipsec_spd_add_del_entry(spd_id, l_startaddr, l_stopaddr,
                                         r_startaddr, r_stopaddr,
                                         protocol=socket.IPPROTO_ESP,
                                         is_outbound=0)
        cls.vapi.ipsec_spd_add_del_entry(spd_id, l_startaddr, l_stopaddr,
                                         r_startaddr, r_stopaddr,
                                         remote_port_start=4500,
                                         remote_port_stop=4500,
                                         protocol=socket.IPPROTO_UDP)
        cls.vapi.ipsec_spd_add_del_entry(spd_id, l_startaddr, l_stopaddr,
                                         r_startaddr, r_stopaddr,
                                         remote_port_start=4500,
                                         remote_port_stop=4500,
                                         protocol=socket.IPPROTO_UDP,
                                         is_outbound=0)
        l_startaddr = l_stopaddr = cls.pg0.remote_ip4n
        r_startaddr = r_stopaddr = cls.pg1.remote_ip4n
        cls.vapi.ipsec_spd_add_del_entry(spd_id, l_startaddr, l_stopaddr,
                                         r_startaddr, r_stopaddr,
                                         priority=10, policy=3,
                                         is_outbound=0, sa_id=local_sa_id)
        cls.vapi.ipsec_spd_add_del_entry(spd_id, r_startaddr, r_stopaddr,
                                         l_startaddr, l_stopaddr,
                                         priority=10, policy=3,
                                         sa_id=remote_sa_id)

    def test_ipsec_nat_tun(self):
        """ IPSec/NAT tunnel test case """
        local_tun_sa = SecurityAssociation(ESP, spi=0x000003e9,
                                           crypt_algo='AES-CBC',
                                           crypt_key='JPjyOWBeVEQiMe7h',
                                           auth_algo='HMAC-SHA1-96',
                                           auth_key='C91KUR9GYMm5GfkEvNjX',
                                           tunnel_header=IP(
                                               src=self.pg1.remote_ip4,
                                               dst=self.pg0.remote_ip4),
                                           nat_t_header=UDP(
                                               sport=4500,
                                               dport=4500))
        # in2out - from private network to public
        pkts = self.create_stream_plain(
            self.pg1.remote_mac, self.pg1.local_mac,
            self.pg1.remote_ip4, self.pg0.remote_ip4)
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(len(pkts))
        self.verify_capture_encrypted(capture, local_tun_sa)

        remote_tun_sa = SecurityAssociation(ESP, spi=0x000003e8,
                                            crypt_algo='AES-CBC',
                                            crypt_key='JPjyOWBeVEQiMe7h',
                                            auth_algo='HMAC-SHA1-96',
                                            auth_key='C91KUR9GYMm5GfkEvNjX',
                                            tunnel_header=IP(
                                                src=self.pg0.remote_ip4,
                                                dst=self.pg1.remote_ip4),
                                            nat_t_header=UDP(
                                                sport=4500,
                                                dport=4500))

        # out2in - from public network to private
        pkts = self.create_stream_encrypted(
            self.pg0.remote_mac, self.pg0.local_mac,
            self.pg0.remote_ip4, self.pg1.remote_ip4, remote_tun_sa)
        self.logger.info(ppc("Sending packets:", pkts))
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(len(pkts))
        self.verify_capture_plain(capture)