summaryrefslogtreecommitdiffstats
path: root/src/plugins/nat/test
diff options
context:
space:
mode:
authorOle Troan <ot@cisco.com>2021-01-12 21:49:38 +0100
committerNeale Ranns <neale@graphiant.com>2021-02-05 13:27:48 +0000
commit18327be5d458f9f73c12d76e677ee5a068ec6b10 (patch)
treebac6dbc08280e5bd6d5749ea56c862e6cdc38434 /src/plugins/nat/test
parent490b92738f3cc1c8d534abd6dee8dba942cb652d (diff)
nat: 1:1 policy NAT
A NAT sub-plugin doing statically configured match/rewrite on IP4 input or output. It's stateless (no connection tracking). Currently it supports rewriting of SA, DA and TCP/UDP ports. It should be simple to add new rewrites if required. API: pnat_binding_add, pnat_binding_del, pnat_bindings_get, pnat_interfaces_get CLI: set pnat translation interface <name> match <5-tuple> rewrite <5-tuple> {in|out} [del] show pnat translations show pnat interfaces Trying a new C based unit testing scheme. Where the graph node is tested in isolation. See pnat/pnat_test.c. Also added new cmake targets to generate coverage directly. E.g.: make test_pnat-ccov-report File '/vpp/sdnat/src/plugins/nat/pnat/pnat.c': Name Regions Miss Cover Lines Miss Cover ------------------------------------------------------------------------------------ pnat_interface_by_sw_if_index 39 8 79.49% 13 0 100.00% pnat_instructions_from_mask 9 0 100.00% 13 0 100.00% pnat_binding_add 64 8 87.50% 31 2 93.55% pnat_flow_lookup 4 4 0.00% 10 10 0.00% pnat_binding_attach 104 75 27.88% 33 6 81.82% pnat_binding_detach 30 5 83.33% 23 2 91.30% pnat_binding_del 97 33 65.98% 17 3 82.35% pnat.c:pnat_calc_key_from_5tuple 9 1 88.89% 14 1 92.86% pnat.c:pnat_interface_check_mask 10 2 80.00% 11 2 81.82% pnat.c:pnat_enable 5 0 100.00% 11 0 100.00% pnat.c:pnat_enable_interface 107 26 75.70% 60 15 75.00% pnat.c:pnat_disable_interface 91 30 67.03% 32 7 78.12% pnat.c:pnat_disable 7 2 71.43% 13 7 46.15% ------------------------------------------------------------------------------------ TOTAL 576 194 66.32% 281 55 80.43% File '/vpp/sdnat/src/plugins/nat/pnat/pnat_node.h': Name Regions Miss Cover Lines Miss Cover ------------------------------------------------------------------------------------ pnat_test.c:pnat_node_inline 67 11 83.58% 115 1 99.13% pnat_test.c:pnat_calc_key 9 2 77.78% 14 2 85.71% pnat_test.c:pnat_rewrite_ip4 55 11 80.00% 60 12 80.00% pnat_test.c:format_pnat_trace 1 1 0.00% 12 12 0.00% pnat_node.c:pnat_node_inline 63 63 0.00% 115 115 0.00% pnat_node.c:pnat_calc_key 9 9 0.00% 14 14 0.00% pnat_node.c:pnat_rewrite_ip4 55 55 0.00% 60 60 0.00% pnat_node.c:format_pnat_trace 5 5 0.00% 12 12 0.00% ------------------------------------------------------------------------------------ TOTAL 264 157 40.53% 402 228 43.28% Type: feature Change-Id: I9c897f833603054a8303e7369ebff6512517c9e0 Signed-off-by: Ole Troan <ot@cisco.com>
Diffstat (limited to 'src/plugins/nat/test')
-rw-r--r--src/plugins/nat/test/test_pnat.py203
1 files changed, 203 insertions, 0 deletions
diff --git a/src/plugins/nat/test/test_pnat.py b/src/plugins/nat/test/test_pnat.py
new file mode 100644
index 00000000000..5e52fa9f135
--- /dev/null
+++ b/src/plugins/nat/test/test_pnat.py
@@ -0,0 +1,203 @@
+#!/usr/bin/env python3
+"""Policy 1:1 NAT functional tests"""
+
+import unittest
+from scapy.layers.inet import Ether, IP, UDP, ICMP
+from framework import VppTestCase, VppTestRunner
+from vpp_papi import VppEnum
+
+
+class TestPNAT(VppTestCase):
+ """ PNAT Test Case """
+ maxDiff = None
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestPNAT, cls).setUpClass()
+ cls.create_pg_interfaces(range(2))
+ cls.interfaces = list(cls.pg_interfaces)
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestPNAT, cls).tearDownClass()
+
+ def setUp(self):
+ super(TestPNAT, self).setUp()
+ for i in self.interfaces:
+ i.admin_up()
+ i.config_ip4()
+ i.resolve_arp()
+
+ def tearDown(self):
+ super(TestPNAT, self).tearDown()
+ if not self.vpp_dead:
+ for i in self.pg_interfaces:
+ i.unconfig_ip4()
+ i.admin_down()
+
+ def validate(self, rx, expected):
+ self.assertEqual(rx, expected.__class__(expected))
+
+ def validate_bytes(self, rx, expected):
+ self.assertEqual(rx, expected)
+
+ def ping_check(self):
+ """ Verify non matching traffic works. """
+ p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+
+ icmpecho = (IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
+ ICMP())
+ reply = (IP(src=self.pg0.local_ip4, dst=self.pg0.remote_ip4) /
+ ICMP(type='echo-reply'))
+ rx = self.send_and_expect(self.pg0, p_ether/icmpecho * 1, self.pg0)
+ for p in rx:
+ reply[IP].id = p[IP].id
+ self.validate(p[1], reply)
+
+ def test_pnat(self):
+ """ PNAT test """
+
+ PNAT_IP4_INPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_INPUT
+ PNAT_IP4_OUTPUT = \
+ VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_OUTPUT
+
+ tests = [
+ {
+ 'input': PNAT_IP4_INPUT,
+ 'sw_if_index': self.pg0.sw_if_index,
+ 'match': {'mask': 0xa, 'dst': '10.10.10.10', 'proto': 17,
+ 'dport': 6871},
+ 'rewrite': {'mask': 0x2, 'dst': self.pg1.remote_ip4},
+ 'send': (IP(src=self.pg0.remote_ip4, dst='10.10.10.10') /
+ UDP(dport=6871)),
+ 'reply': (IP(src=self.pg0.remote_ip4,
+ dst=self.pg1.remote_ip4) /
+ UDP(dport=6871))
+ },
+ {
+ 'input': PNAT_IP4_OUTPUT,
+ 'sw_if_index': self.pg1.sw_if_index,
+ 'match': {'mask': 0x9, 'src': self.pg0.remote_ip4, 'proto': 17,
+ 'dport': 6871},
+ 'rewrite': {'mask': 0x1, 'src': '11.11.11.11'},
+ 'send': (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ UDP(dport=6871)),
+ 'reply': (IP(src='11.11.11.11', dst=self.pg1.remote_ip4) /
+ UDP(dport=6871))
+ },
+ {
+ 'input': PNAT_IP4_INPUT,
+ 'sw_if_index': self.pg0.sw_if_index,
+ 'match': {'mask': 0xa, 'dst': '10.10.10.10', 'proto': 17,
+ 'dport': 6871},
+ 'rewrite': {'mask': 0xa, 'dst': self.pg1.remote_ip4,
+ 'dport': 5555},
+ 'send': (IP(src=self.pg0.remote_ip4, dst='10.10.10.10') /
+ UDP(dport=6871)),
+ 'reply': (IP(src=self.pg0.remote_ip4,
+ dst=self.pg1.remote_ip4) /
+ UDP(dport=5555))
+ },
+ {
+ 'input': PNAT_IP4_INPUT,
+ 'sw_if_index': self.pg0.sw_if_index,
+ 'match': {'mask': 0xa, 'dst': self.pg1.remote_ip4, 'proto': 17,
+ 'dport': 6871},
+ 'rewrite': {'mask': 0x8, 'dport': 5555},
+ 'send': (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ UDP(dport=6871, chksum=0)),
+ 'reply': (IP(src=self.pg0.remote_ip4,
+ dst=self.pg1.remote_ip4) /
+ UDP(dport=5555, chksum=0))
+ },
+ {
+ 'input': PNAT_IP4_INPUT,
+ 'sw_if_index': self.pg0.sw_if_index,
+ 'match': {'mask': 0x2, 'dst': self.pg1.remote_ip4, 'proto': 1},
+ 'rewrite': {'mask': 0x1, 'src': '8.8.8.8'},
+ 'send': (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ ICMP()),
+ 'reply': IP(src='8.8.8.8', dst=self.pg1.remote_ip4)/ICMP(),
+ },
+ ]
+
+ p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ for t in tests:
+ rv = self.vapi.pnat_binding_add(match=t['match'],
+ rewrite=t['rewrite'])
+ self.vapi.pnat_binding_attach(sw_if_index=t['sw_if_index'],
+ attachment=t['input'],
+ binding_index=rv.binding_index)
+
+ reply = t['reply']
+ reply[IP].ttl -= 1
+ rx = self.send_and_expect(self.pg0, p_ether/t['send']*1, self.pg1)
+ for p in rx:
+ # p.show2()
+ self.validate(p[1], reply)
+
+ self.ping_check()
+
+ self.vapi.pnat_binding_detach(sw_if_index=t['sw_if_index'],
+ attachment=t['input'],
+ binding_index=rv.binding_index)
+ self.vapi.pnat_binding_del(binding_index=rv.binding_index)
+
+ def test_pnat_show(self):
+ """ PNAT show tests """
+
+ PNAT_IP4_INPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_INPUT
+ PNAT_IP4_OUTPUT = \
+ VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_OUTPUT
+
+ tests = [
+ {
+ 'input': PNAT_IP4_INPUT,
+ 'sw_if_index': self.pg0.sw_if_index,
+ 'match': {'mask': 0xa, 'dst': '10.10.10.10', 'proto': 17,
+ 'dport': 6871},
+ 'rewrite': {'mask': 0x2, 'dst': self.pg1.remote_ip4},
+ 'send': (IP(src=self.pg0.remote_ip4, dst='10.10.10.10') /
+ UDP(dport=6871)),
+ 'reply': (IP(src=self.pg0.remote_ip4,
+ dst=self.pg1.remote_ip4) /
+ UDP(dport=6871))
+ },
+ {
+ 'input': PNAT_IP4_OUTPUT,
+ 'sw_if_index': self.pg1.sw_if_index,
+ 'match': {'mask': 0x9, 'src': self.pg0.remote_ip4, 'proto': 17,
+ 'dport': 6871},
+ 'rewrite': {'mask': 0x1, 'src': '11.11.11.11'},
+ 'send': (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ UDP(dport=6871)),
+ 'reply': (IP(src='11.11.11.11', dst=self.pg1.remote_ip4) /
+ UDP(dport=6871))
+ },
+ ]
+ binding_index = []
+ for t in tests:
+ rv = self.vapi.pnat_binding_add(match=t['match'],
+ rewrite=t['rewrite'])
+ binding_index.append(rv.binding_index)
+ self.vapi.pnat_binding_attach(sw_if_index=t['sw_if_index'],
+ attachment=t['input'],
+ binding_index=rv.binding_index)
+
+ rv, l = self.vapi.pnat_bindings_get()
+ self.assertEqual(len(l), len(tests))
+
+ rv, l = self.vapi.pnat_interfaces_get()
+ self.assertEqual(len(l), 2)
+
+ self.logger.info(self.vapi.cli("show pnat translations"))
+ self.logger.info(self.vapi.cli("show pnat interfaces"))
+
+ for i, t in enumerate(tests):
+ self.vapi.pnat_binding_detach(sw_if_index=t['sw_if_index'],
+ attachment=t['input'],
+ binding_index=binding_index[i])
+ self.vapi.pnat_binding_del(binding_index=binding_index[i])
+
+if __name__ == '__main__':
+ unittest.main(testRunner=VppTestRunner)