diff options
author | Ole Troan <ot@cisco.com> | 2021-01-12 21:49:38 +0100 |
---|---|---|
committer | Neale Ranns <neale@graphiant.com> | 2021-02-05 13:27:48 +0000 |
commit | 18327be5d458f9f73c12d76e677ee5a068ec6b10 (patch) | |
tree | bac6dbc08280e5bd6d5749ea56c862e6cdc38434 /src/plugins/nat/test | |
parent | 490b92738f3cc1c8d534abd6dee8dba942cb652d (diff) |
nat: 1:1 policy NAT
A NAT sub-plugin doing statically configured match/rewrite on IP4 input or output.
It's stateless (no connection tracking).
Currently it supports rewriting of SA, DA and TCP/UDP ports.
It should be simple to add new rewrites if required.
API:
pnat_binding_add, pnat_binding_del, pnat_bindings_get, pnat_interfaces_get
CLI:
set pnat translation interface <name> match <5-tuple> rewrite <5-tuple> {in|out} [del]
show pnat translations
show pnat interfaces
Trying a new C based unit testing scheme. Where the graph node is tested
in isolation. See pnat/pnat_test.c.
Also added new cmake targets to generate coverage directly.
E.g.:
make test_pnat-ccov-report
File '/vpp/sdnat/src/plugins/nat/pnat/pnat.c':
Name Regions Miss Cover Lines Miss Cover
------------------------------------------------------------------------------------
pnat_interface_by_sw_if_index 39 8 79.49% 13 0 100.00%
pnat_instructions_from_mask 9 0 100.00% 13 0 100.00%
pnat_binding_add 64 8 87.50% 31 2 93.55%
pnat_flow_lookup 4 4 0.00% 10 10 0.00%
pnat_binding_attach 104 75 27.88% 33 6 81.82%
pnat_binding_detach 30 5 83.33% 23 2 91.30%
pnat_binding_del 97 33 65.98% 17 3 82.35%
pnat.c:pnat_calc_key_from_5tuple 9 1 88.89% 14 1 92.86%
pnat.c:pnat_interface_check_mask 10 2 80.00% 11 2 81.82%
pnat.c:pnat_enable 5 0 100.00% 11 0 100.00%
pnat.c:pnat_enable_interface 107 26 75.70% 60 15 75.00%
pnat.c:pnat_disable_interface 91 30 67.03% 32 7 78.12%
pnat.c:pnat_disable 7 2 71.43% 13 7 46.15%
------------------------------------------------------------------------------------
TOTAL 576 194 66.32% 281 55 80.43%
File '/vpp/sdnat/src/plugins/nat/pnat/pnat_node.h':
Name Regions Miss Cover Lines Miss Cover
------------------------------------------------------------------------------------
pnat_test.c:pnat_node_inline 67 11 83.58% 115 1 99.13%
pnat_test.c:pnat_calc_key 9 2 77.78% 14 2 85.71%
pnat_test.c:pnat_rewrite_ip4 55 11 80.00% 60 12 80.00%
pnat_test.c:format_pnat_trace 1 1 0.00% 12 12 0.00%
pnat_node.c:pnat_node_inline 63 63 0.00% 115 115 0.00%
pnat_node.c:pnat_calc_key 9 9 0.00% 14 14 0.00%
pnat_node.c:pnat_rewrite_ip4 55 55 0.00% 60 60 0.00%
pnat_node.c:format_pnat_trace 5 5 0.00% 12 12 0.00%
------------------------------------------------------------------------------------
TOTAL 264 157 40.53% 402 228 43.28%
Type: feature
Change-Id: I9c897f833603054a8303e7369ebff6512517c9e0
Signed-off-by: Ole Troan <ot@cisco.com>
Diffstat (limited to 'src/plugins/nat/test')
-rw-r--r-- | src/plugins/nat/test/test_pnat.py | 203 |
1 files changed, 203 insertions, 0 deletions
diff --git a/src/plugins/nat/test/test_pnat.py b/src/plugins/nat/test/test_pnat.py new file mode 100644 index 00000000000..5e52fa9f135 --- /dev/null +++ b/src/plugins/nat/test/test_pnat.py @@ -0,0 +1,203 @@ +#!/usr/bin/env python3 +"""Policy 1:1 NAT functional tests""" + +import unittest +from scapy.layers.inet import Ether, IP, UDP, ICMP +from framework import VppTestCase, VppTestRunner +from vpp_papi import VppEnum + + +class TestPNAT(VppTestCase): + """ PNAT Test Case """ + maxDiff = None + + @classmethod + def setUpClass(cls): + super(TestPNAT, cls).setUpClass() + cls.create_pg_interfaces(range(2)) + cls.interfaces = list(cls.pg_interfaces) + + @classmethod + def tearDownClass(cls): + super(TestPNAT, cls).tearDownClass() + + def setUp(self): + super(TestPNAT, self).setUp() + for i in self.interfaces: + i.admin_up() + i.config_ip4() + i.resolve_arp() + + def tearDown(self): + super(TestPNAT, self).tearDown() + if not self.vpp_dead: + for i in self.pg_interfaces: + i.unconfig_ip4() + i.admin_down() + + def validate(self, rx, expected): + self.assertEqual(rx, expected.__class__(expected)) + + def validate_bytes(self, rx, expected): + self.assertEqual(rx, expected) + + def ping_check(self): + """ Verify non matching traffic works. """ + p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + + icmpecho = (IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) / + ICMP()) + reply = (IP(src=self.pg0.local_ip4, dst=self.pg0.remote_ip4) / + ICMP(type='echo-reply')) + rx = self.send_and_expect(self.pg0, p_ether/icmpecho * 1, self.pg0) + for p in rx: + reply[IP].id = p[IP].id + self.validate(p[1], reply) + + def test_pnat(self): + """ PNAT test """ + + PNAT_IP4_INPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_INPUT + PNAT_IP4_OUTPUT = \ + VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_OUTPUT + + tests = [ + { + 'input': PNAT_IP4_INPUT, + 'sw_if_index': self.pg0.sw_if_index, + 'match': {'mask': 0xa, 'dst': '10.10.10.10', 'proto': 17, + 'dport': 6871}, + 'rewrite': {'mask': 0x2, 'dst': self.pg1.remote_ip4}, + 'send': (IP(src=self.pg0.remote_ip4, dst='10.10.10.10') / + UDP(dport=6871)), + 'reply': (IP(src=self.pg0.remote_ip4, + dst=self.pg1.remote_ip4) / + UDP(dport=6871)) + }, + { + 'input': PNAT_IP4_OUTPUT, + 'sw_if_index': self.pg1.sw_if_index, + 'match': {'mask': 0x9, 'src': self.pg0.remote_ip4, 'proto': 17, + 'dport': 6871}, + 'rewrite': {'mask': 0x1, 'src': '11.11.11.11'}, + 'send': (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / + UDP(dport=6871)), + 'reply': (IP(src='11.11.11.11', dst=self.pg1.remote_ip4) / + UDP(dport=6871)) + }, + { + 'input': PNAT_IP4_INPUT, + 'sw_if_index': self.pg0.sw_if_index, + 'match': {'mask': 0xa, 'dst': '10.10.10.10', 'proto': 17, + 'dport': 6871}, + 'rewrite': {'mask': 0xa, 'dst': self.pg1.remote_ip4, + 'dport': 5555}, + 'send': (IP(src=self.pg0.remote_ip4, dst='10.10.10.10') / + UDP(dport=6871)), + 'reply': (IP(src=self.pg0.remote_ip4, + dst=self.pg1.remote_ip4) / + UDP(dport=5555)) + }, + { + 'input': PNAT_IP4_INPUT, + 'sw_if_index': self.pg0.sw_if_index, + 'match': {'mask': 0xa, 'dst': self.pg1.remote_ip4, 'proto': 17, + 'dport': 6871}, + 'rewrite': {'mask': 0x8, 'dport': 5555}, + 'send': (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / + UDP(dport=6871, chksum=0)), + 'reply': (IP(src=self.pg0.remote_ip4, + dst=self.pg1.remote_ip4) / + UDP(dport=5555, chksum=0)) + }, + { + 'input': PNAT_IP4_INPUT, + 'sw_if_index': self.pg0.sw_if_index, + 'match': {'mask': 0x2, 'dst': self.pg1.remote_ip4, 'proto': 1}, + 'rewrite': {'mask': 0x1, 'src': '8.8.8.8'}, + 'send': (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / + ICMP()), + 'reply': IP(src='8.8.8.8', dst=self.pg1.remote_ip4)/ICMP(), + }, + ] + + p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + for t in tests: + rv = self.vapi.pnat_binding_add(match=t['match'], + rewrite=t['rewrite']) + self.vapi.pnat_binding_attach(sw_if_index=t['sw_if_index'], + attachment=t['input'], + binding_index=rv.binding_index) + + reply = t['reply'] + reply[IP].ttl -= 1 + rx = self.send_and_expect(self.pg0, p_ether/t['send']*1, self.pg1) + for p in rx: + # p.show2() + self.validate(p[1], reply) + + self.ping_check() + + self.vapi.pnat_binding_detach(sw_if_index=t['sw_if_index'], + attachment=t['input'], + binding_index=rv.binding_index) + self.vapi.pnat_binding_del(binding_index=rv.binding_index) + + def test_pnat_show(self): + """ PNAT show tests """ + + PNAT_IP4_INPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_INPUT + PNAT_IP4_OUTPUT = \ + VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_OUTPUT + + tests = [ + { + 'input': PNAT_IP4_INPUT, + 'sw_if_index': self.pg0.sw_if_index, + 'match': {'mask': 0xa, 'dst': '10.10.10.10', 'proto': 17, + 'dport': 6871}, + 'rewrite': {'mask': 0x2, 'dst': self.pg1.remote_ip4}, + 'send': (IP(src=self.pg0.remote_ip4, dst='10.10.10.10') / + UDP(dport=6871)), + 'reply': (IP(src=self.pg0.remote_ip4, + dst=self.pg1.remote_ip4) / + UDP(dport=6871)) + }, + { + 'input': PNAT_IP4_OUTPUT, + 'sw_if_index': self.pg1.sw_if_index, + 'match': {'mask': 0x9, 'src': self.pg0.remote_ip4, 'proto': 17, + 'dport': 6871}, + 'rewrite': {'mask': 0x1, 'src': '11.11.11.11'}, + 'send': (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / + UDP(dport=6871)), + 'reply': (IP(src='11.11.11.11', dst=self.pg1.remote_ip4) / + UDP(dport=6871)) + }, + ] + binding_index = [] + for t in tests: + rv = self.vapi.pnat_binding_add(match=t['match'], + rewrite=t['rewrite']) + binding_index.append(rv.binding_index) + self.vapi.pnat_binding_attach(sw_if_index=t['sw_if_index'], + attachment=t['input'], + binding_index=rv.binding_index) + + rv, l = self.vapi.pnat_bindings_get() + self.assertEqual(len(l), len(tests)) + + rv, l = self.vapi.pnat_interfaces_get() + self.assertEqual(len(l), 2) + + self.logger.info(self.vapi.cli("show pnat translations")) + self.logger.info(self.vapi.cli("show pnat interfaces")) + + for i, t in enumerate(tests): + self.vapi.pnat_binding_detach(sw_if_index=t['sw_if_index'], + attachment=t['input'], + binding_index=binding_index[i]) + self.vapi.pnat_binding_del(binding_index=binding_index[i]) + +if __name__ == '__main__': + unittest.main(testRunner=VppTestRunner) |