# Copyright (c) 2018 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applic
#!/usr/bin/env python3
"""Policy 1:1 NAT functional tests"""

import unittest
from scapy.layers.inet import Ether, IP, UDP, ICMP
from framework import VppTestCase, VppTestRunner
from vpp_papi import VppEnum


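class TestPNAT(VppTestCase):
    """PNAT Test Case

    Functional tests for policy NAT (PNAT) bindings on two packet-generator
    interfaces. A traffic case adds a match/rewrite binding, attaches it to an
    interface, sends one packet into pg0 and compares what arrives on pg1 with
    the expected rewritten packet.

    Mask values as used by the cases below: 0x1 selects the source address,
    0x2 the destination address and 0x8 the destination port. 0x40 (only in
    the v2 cases) is presumably the protocol bit, PNAT_PROTO - confirm against
    the pnat API definition.
    """

    maxDiff = None

    @classmethod
    def setUpClass(cls):
        """Create the two packet-generator interfaces shared by all tests."""
        super().setUpClass()
        cls.create_pg_interfaces(range(2))
        cls.interfaces = list(cls.pg_interfaces)

    @classmethod
    def tearDownClass(cls):
        """Defer class-level cleanup to the framework."""
        super().tearDownClass()

    def setUp(self):
        """Bring every interface up with an IPv4 address and resolved ARP."""
        super().setUp()
        for intf in self.interfaces:
            intf.admin_up()
            intf.config_ip4()
            intf.resolve_arp()

    def tearDown(self):
        """Remove the IPv4 configuration unless VPP has already died."""
        super().tearDown()
        if self.vpp_dead:
            return
        for intf in self.pg_interfaces:
            intf.unconfig_ip4()
            intf.admin_down()

    def validate(self, rx, expected):
        """Assert that the received packet equals the expected one.

        The expected packet is re-created through its own class first,
        presumably so that scapy fills in computed fields (lengths, checksums)
        before the comparison - confirm against scapy's Packet constructor.
        """
        self.assertEqual(rx, expected.__class__(expected))

    def validate_bytes(self, rx, expected):
        """Assert that two values compare equal as they are."""
        self.assertEqual(rx, expected)

    def ping_check(self):
        """Verify non matching traffic works."""
        p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)

        icmpecho = IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) / ICMP()
        reply = IP(src=self.pg0.local_ip4, dst=self.pg0.remote_ip4) / ICMP(
            type="echo-reply"
        )
        rx = self.send_and_expect(self.pg0, p_ether / icmpecho * 1, self.pg0)
        for p in rx:
            # The IP id of the reply is not known up front; take it from the
            # received packet so only the remaining fields are compared.
            reply[IP].id = p[IP].id
            self.validate(p[1], reply)

    def _bind(self, case, binding_add):
        """Create the binding described by *case*, attach it, return its index.

        binding_add is the API call that creates the binding
        (pnat_binding_add or pnat_binding_add_v2).
        """
        rv = binding_add(match=case["match"], rewrite=case["rewrite"])
        self.vapi.pnat_binding_attach(
            sw_if_index=case["sw_if_index"],
            attachment=case["input"],
            binding_index=rv.binding_index,
        )
        return rv.binding_index

    def _unbind(self, case, binding_index):
        """Detach and delete a binding created by _bind()."""
        if self.vpp_dead:
            # Same guard as tearDown(): no API calls once VPP is gone.
            return
        self.vapi.pnat_binding_detach(
            sw_if_index=case["sw_if_index"],
            attachment=case["input"],
            binding_index=binding_index,
        )
        self.vapi.pnat_binding_del(binding_index=binding_index)

    def _check_translations(self, tests, binding_add):
        """Run every traffic case: bind, send on pg0, compare on pg1, unbind.

        The binding is removed in a finally block so that a failed assertion
        does not leave a translation attached to pg0/pg1, where it could
        rewrite the traffic of later cases and hide or cause failures.

        NOTE(review): the cases only assert the translated packet while the
        binding is attached; nothing checks that translation stops once the
        binding has been detached and deleted.
        """
        p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
        for case in tests:
            binding_index = self._bind(case, binding_add)
            try:
                expected = case["reply"]
                # The packet is forwarded from pg0 to pg1, so it is expected
                # with its TTL lowered by one.
                expected[IP].ttl -= 1
                rx = self.send_and_expect(
                    self.pg0, p_ether / case["send"] * 1, self.pg1
                )
                for p in rx:
                    self.validate(p[1], expected)

                # Traffic that matches no binding must be left alone.
                self.ping_check()
            finally:
                self._unbind(case, binding_index)

    def test_pnat(self):
        """PNAT test"""

        PNAT_IP4_INPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_INPUT
        PNAT_IP4_OUTPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_OUTPUT

        tests = [
            # Input feature on pg0: rewrite the destination address.
            {
                "input": PNAT_IP4_INPUT,
                "sw_if_index": self.pg0.sw_if_index,
                "match": {
                    "mask": 0xA,
                    "dst": "10.10.10.10",
                    "proto": 17,
                    "dport": 6871,
                },
                "rewrite": {"mask": 0x2, "dst": self.pg1.remote_ip4},
                "send": (
                    IP(src=self.pg0.remote_ip4, dst="10.10.10.10") / UDP(dport=6871)
                ),
                "reply": (
                    IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
                    / UDP(dport=6871)
                ),
            },
            # Output feature on pg1: rewrite the source address.
            {
                "input": PNAT_IP4_OUTPUT,
                "sw_if_index": self.pg1.sw_if_index,
                "match": {
                    "mask": 0x9,
                    "src": self.pg0.remote_ip4,
                    "proto": 17,
                    "dport": 6871,
                },
                "rewrite": {"mask": 0x1, "src": "11.11.11.11"},
                "send": (
                    IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
                    / UDP(dport=6871)
                ),
                "reply": (
                    IP(src="11.11.11.11", dst=self.pg1.remote_ip4) / UDP(dport=6871)
                ),
            },
            # Rewrite destination address and destination port together; the
            # source port must pass through unchanged.
            {
                "input": PNAT_IP4_INPUT,
                "sw_if_index": self.pg0.sw_if_index,
                "match": {
                    "mask": 0xA,
                    "dst": "10.10.10.10",
                    "proto": 17,
                    "dport": 6871,
                },
                "rewrite": {"mask": 0xA, "dst": self.pg1.remote_ip4, "dport": 5555},
                "send": (
                    IP(src=self.pg0.remote_ip4, dst="10.10.10.10")
                    / UDP(sport=65530, dport=6871)
                ),
                "reply": (
                    IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
                    / UDP(sport=65530, dport=5555)
                ),
            },
            # Rewrite only the destination port of a UDP packet sent with a
            # zero checksum; the expected packet keeps chksum=0.
            {
                "input": PNAT_IP4_INPUT,
                "sw_if_index": self.pg0.sw_if_index,
                "match": {
                    "mask": 0xA,
                    "dst": self.pg1.remote_ip4,
                    "proto": 17,
                    "dport": 6871,
                },
                "rewrite": {"mask": 0x8, "dport": 5555},
                "send": (
                    IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
                    / UDP(dport=6871, chksum=0)
                ),
                "reply": (
                    IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
                    / UDP(dport=5555, chksum=0)
                ),
            },
            # ICMP (proto 1): rewrite the source address.
            {
                "input": PNAT_IP4_INPUT,
                "sw_if_index": self.pg0.sw_if_index,
                "match": {"mask": 0x2, "dst": self.pg1.remote_ip4, "proto": 1},
                "rewrite": {"mask": 0x1, "src": "8.8.8.8"},
                "send": (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / ICMP()),
                "reply": IP(src="8.8.8.8", dst=self.pg1.remote_ip4) / ICMP(),
            },
        ]

        self._check_translations(tests, self.vapi.pnat_binding_add)

    def test_pnat_show(self):
        """PNAT show tests"""

        PNAT_IP4_INPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_INPUT
        PNAT_IP4_OUTPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_OUTPUT

        # No traffic is sent here, so the cases carry no packets.
        tests = [
            {
                "input": PNAT_IP4_INPUT,
                "sw_if_index": self.pg0.sw_if_index,
                "match": {
                    "mask": 0xA,
                    "dst": "10.10.10.10",
                    "proto": 17,
                    "dport": 6871,
                },
                "rewrite": {"mask": 0x2, "dst": self.pg1.remote_ip4},
            },
            {
                "input": PNAT_IP4_OUTPUT,
                "sw_if_index": self.pg1.sw_if_index,
                "match": {
                    "mask": 0x9,
                    "src": self.pg0.remote_ip4,
                    "proto": 17,
                    "dport": 6871,
                },
                "rewrite": {"mask": 0x1, "src": "11.11.11.11"},
            },
        ]

        attached = []
        try:
            for case in tests:
                binding_index = self._bind(case, self.vapi.pnat_binding_add)
                attached.append((case, binding_index))

            _, bindings = self.vapi.pnat_bindings_get()
            self.assertEqual(len(bindings), len(tests))

            # One binding is attached to pg0 and one to pg1.
            _, interfaces = self.vapi.pnat_interfaces_get()
            self.assertEqual(len(interfaces), 2)

            self.logger.info(self.vapi.cli("show pnat translations"))
            self.logger.info(self.vapi.cli("show pnat interfaces"))
        finally:
            # Remove whatever was attached, even if a dump assertion failed.
            for case, binding_index in attached:
                self._unbind(case, binding_index)

    def test_pnat_wildcard_proto(self):
        """
        PNAT test wildcard IP protocol, PNAT_PROTO for mask should be set by
        handler
        """

        PNAT_IP4_INPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_INPUT
        PNAT_IP4_OUTPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_OUTPUT

        # The match entries name no "proto" and the packets carry no
        # upper-layer header.
        tests = [
            {
                "input": PNAT_IP4_INPUT,
                "sw_if_index": self.pg0.sw_if_index,
                "match": {"mask": 0x2, "dst": "10.10.10.10"},
                "rewrite": {"mask": 0x2, "dst": self.pg1.remote_ip4},
                "send": (IP(src=self.pg0.remote_ip4, dst="10.10.10.10")),
                "reply": (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)),
            },
            {
                "input": PNAT_IP4_OUTPUT,
                "sw_if_index": self.pg1.sw_if_index,
                "match": {"mask": 0x1, "src": self.pg0.remote_ip4},
                "rewrite": {"mask": 0x1, "src": "11.11.11.11"},
                "send": (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)),
                "reply": (IP(src="11.11.11.11", dst=self.pg1.remote_ip4)),
            },
        ]

        self._check_translations(tests, self.vapi.pnat_binding_add)

    def test_pnat_wildcard_proto_v2(self):
        """PNAT test wildcard IP protocol using pnat_binding_add_v2"""

        PNAT_IP4_INPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_INPUT
        PNAT_IP4_OUTPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_OUTPUT

        # Same cases as test_pnat_wildcard_proto, but every mask also has
        # 0x40 set by the caller (presumably PNAT_PROTO - confirm).
        tests = [
            {
                "input": PNAT_IP4_INPUT,
                "sw_if_index": self.pg0.sw_if_index,
                "match": {"mask": 0x42, "dst": "10.10.10.10"},
                "rewrite": {"mask": 0x42, "dst": self.pg1.remote_ip4},
                "send": (IP(src=self.pg0.remote_ip4, dst="10.10.10.10")),
                "reply": (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)),
            },
            {
                "input": PNAT_IP4_OUTPUT,
                "sw_if_index": self.pg1.sw_if_index,
                "match": {"mask": 0x41, "src": self.pg0.remote_ip4},
                "rewrite": {"mask": 0x41, "src": "11.11.11.11"},
                "send": (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)),
                "reply": (IP(src="11.11.11.11", dst=self.pg1.remote_ip4)),
            },
        ]

        self._check_translations(tests, self.vapi.pnat_binding_add_v2)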


if __name__ == "__main__":
    # Run this module's tests directly, using the VPP test runner.
    unittest.main(testRunner=VppTestRunner)