diff options
Diffstat (limited to 'src/plugins/nat/test/test_pnat.py')
-rw-r--r-- | src/plugins/nat/test/test_pnat.py | 203 |
1 files changed, 203 insertions, 0 deletions
diff --git a/src/plugins/nat/test/test_pnat.py b/src/plugins/nat/test/test_pnat.py new file mode 100644 index 00000000000..5e52fa9f135 --- /dev/null +++ b/src/plugins/nat/test/test_pnat.py @@ -0,0 +1,203 @@ +#!/usr/bin/env python3 +"""Policy 1:1 NAT functional tests""" + +import unittest +from scapy.layers.inet import Ether, IP, UDP, ICMP +from framework import VppTestCase, VppTestRunner +from vpp_papi import VppEnum + + +class TestPNAT(VppTestCase): + """ PNAT Test Case """ + maxDiff = None + + @classmethod + def setUpClass(cls): + super(TestPNAT, cls).setUpClass() + cls.create_pg_interfaces(range(2)) + cls.interfaces = list(cls.pg_interfaces) + + @classmethod + def tearDownClass(cls): + super(TestPNAT, cls).tearDownClass() + + def setUp(self): + super(TestPNAT, self).setUp() + for i in self.interfaces: + i.admin_up() + i.config_ip4() + i.resolve_arp() + + def tearDown(self): + super(TestPNAT, self).tearDown() + if not self.vpp_dead: + for i in self.pg_interfaces: + i.unconfig_ip4() + i.admin_down() + + def validate(self, rx, expected): + self.assertEqual(rx, expected.__class__(expected)) + + def validate_bytes(self, rx, expected): + self.assertEqual(rx, expected) + + def ping_check(self): + """ Verify non matching traffic works. """ + p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + + icmpecho = (IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) / + ICMP()) + reply = (IP(src=self.pg0.local_ip4, dst=self.pg0.remote_ip4) / + ICMP(type='echo-reply')) + rx = self.send_and_expect(self.pg0, p_ether/icmpecho * 1, self.pg0) + for p in rx: + reply[IP].id = p[IP].id + self.validate(p[1], reply) + + def test_pnat(self): + """ PNAT test """ + + PNAT_IP4_INPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_INPUT + PNAT_IP4_OUTPUT = \ + VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_OUTPUT + + tests = [ + { + 'input': PNAT_IP4_INPUT, + 'sw_if_index': self.pg0.sw_if_index, + 'match': {'mask': 0xa, 'dst': '10.10.10.10', 'proto': 17, + 'dport': 6871}, + 'rewrite': {'mask': 0x2, 'dst': self.pg1.remote_ip4}, + 'send': (IP(src=self.pg0.remote_ip4, dst='10.10.10.10') / + UDP(dport=6871)), + 'reply': (IP(src=self.pg0.remote_ip4, + dst=self.pg1.remote_ip4) / + UDP(dport=6871)) + }, + { + 'input': PNAT_IP4_OUTPUT, + 'sw_if_index': self.pg1.sw_if_index, + 'match': {'mask': 0x9, 'src': self.pg0.remote_ip4, 'proto': 17, + 'dport': 6871}, + 'rewrite': {'mask': 0x1, 'src': '11.11.11.11'}, + 'send': (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / + UDP(dport=6871)), + 'reply': (IP(src='11.11.11.11', dst=self.pg1.remote_ip4) / + UDP(dport=6871)) + }, + { + 'input': PNAT_IP4_INPUT, + 'sw_if_index': self.pg0.sw_if_index, + 'match': {'mask': 0xa, 'dst': '10.10.10.10', 'proto': 17, + 'dport': 6871}, + 'rewrite': {'mask': 0xa, 'dst': self.pg1.remote_ip4, + 'dport': 5555}, + 'send': (IP(src=self.pg0.remote_ip4, dst='10.10.10.10') / + UDP(dport=6871)), + 'reply': (IP(src=self.pg0.remote_ip4, + dst=self.pg1.remote_ip4) / + UDP(dport=5555)) + }, + { + 'input': PNAT_IP4_INPUT, + 'sw_if_index': self.pg0.sw_if_index, + 'match': {'mask': 0xa, 'dst': self.pg1.remote_ip4, 'proto': 17, + 'dport': 6871}, + 'rewrite': {'mask': 0x8, 'dport': 5555}, + 'send': (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / + UDP(dport=6871, chksum=0)), + 'reply': (IP(src=self.pg0.remote_ip4, + dst=self.pg1.remote_ip4) / + UDP(dport=5555, chksum=0)) + }, + { + 'input': PNAT_IP4_INPUT, + 'sw_if_index': self.pg0.sw_if_index, + 'match': {'mask': 0x2, 'dst': self.pg1.remote_ip4, 'proto': 1}, + 'rewrite': {'mask': 0x1, 'src': '8.8.8.8'}, + 'send': (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / + ICMP()), + 'reply': IP(src='8.8.8.8', dst=self.pg1.remote_ip4)/ICMP(), + }, + ] + + p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + for t in tests: + rv = self.vapi.pnat_binding_add(match=t['match'], + rewrite=t['rewrite']) + self.vapi.pnat_binding_attach(sw_if_index=t['sw_if_index'], + attachment=t['input'], + binding_index=rv.binding_index) + + reply = t['reply'] + reply[IP].ttl -= 1 + rx = self.send_and_expect(self.pg0, p_ether/t['send']*1, self.pg1) + for p in rx: + # p.show2() + self.validate(p[1], reply) + + self.ping_check() + + self.vapi.pnat_binding_detach(sw_if_index=t['sw_if_index'], + attachment=t['input'], + binding_index=rv.binding_index) + self.vapi.pnat_binding_del(binding_index=rv.binding_index) + + def test_pnat_show(self): + """ PNAT show tests """ + + PNAT_IP4_INPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_INPUT + PNAT_IP4_OUTPUT = \ + VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_OUTPUT + + tests = [ + { + 'input': PNAT_IP4_INPUT, + 'sw_if_index': self.pg0.sw_if_index, + 'match': {'mask': 0xa, 'dst': '10.10.10.10', 'proto': 17, + 'dport': 6871}, + 'rewrite': {'mask': 0x2, 'dst': self.pg1.remote_ip4}, + 'send': (IP(src=self.pg0.remote_ip4, dst='10.10.10.10') / + UDP(dport=6871)), + 'reply': (IP(src=self.pg0.remote_ip4, + dst=self.pg1.remote_ip4) / + UDP(dport=6871)) + }, + { + 'input': PNAT_IP4_OUTPUT, + 'sw_if_index': self.pg1.sw_if_index, + 'match': {'mask': 0x9, 'src': self.pg0.remote_ip4, 'proto': 17, + 'dport': 6871}, + 'rewrite': {'mask': 0x1, 'src': '11.11.11.11'}, + 'send': (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / + UDP(dport=6871)), + 'reply': (IP(src='11.11.11.11', dst=self.pg1.remote_ip4) / + UDP(dport=6871)) + }, + ] + binding_index = [] + for t in tests: + rv = self.vapi.pnat_binding_add(match=t['match'], + rewrite=t['rewrite']) + binding_index.append(rv.binding_index) + self.vapi.pnat_binding_attach(sw_if_index=t['sw_if_index'], + attachment=t['input'], + binding_index=rv.binding_index) + + rv, l = self.vapi.pnat_bindings_get() + self.assertEqual(len(l), len(tests)) + + rv, l = self.vapi.pnat_interfaces_get() + self.assertEqual(len(l), 2) + + self.logger.info(self.vapi.cli("show pnat translations")) + self.logger.info(self.vapi.cli("show pnat interfaces")) + + for i, t in enumerate(tests): + self.vapi.pnat_binding_detach(sw_if_index=t['sw_if_index'], + attachment=t['input'], + binding_index=binding_index[i]) + self.vapi.pnat_binding_del(binding_index=binding_index[i]) + +if __name__ == '__main__': + unittest.main(testRunner=VppTestRunner) |