diff options
Diffstat (limited to 'test/test_linux_cp.py')
-rw-r--r-- | test/test_linux_cp.py | 228 |
1 files changed, 124 insertions, 104 deletions
diff --git a/test/test_linux_cp.py b/test/test_linux_cp.py index b683954e1da..2d7669b717a 100644 --- a/test/test_linux_cp.py +++ b/test/test_linux_cp.py @@ -10,10 +10,20 @@ from util import reassemble4 from vpp_object import VppObject from framework import VppTestCase, VppTestRunner from vpp_ipip_tun_interface import VppIpIpTunInterface -from template_ipsec import TemplateIpsec, IpsecTun4Tests, \ - IpsecTun4, mk_scapy_crypt_key, config_tun_params -from template_ipsec import TemplateIpsec, IpsecTun4Tests, \ - IpsecTun4, mk_scapy_crypt_key, config_tun_params +from template_ipsec import ( + TemplateIpsec, + IpsecTun4Tests, + IpsecTun4, + mk_scapy_crypt_key, + config_tun_params, +) +from template_ipsec import ( + TemplateIpsec, + IpsecTun4Tests, + IpsecTun4, + mk_scapy_crypt_key, + config_tun_params, +) from test_ipsec_tun_if_esp import TemplateIpsecItf4 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect, VppIpsecInterface @@ -25,39 +35,43 @@ class VppLcpPair(VppObject): self.host = host def add_vpp_config(self): - self._test.vapi.cli("test lcp add phy %s host %s" % - (self.phy, self.host)) + self._test.vapi.cli("test lcp add phy %s host %s" % (self.phy, self.host)) self._test.registry.register(self, self._test.logger) return self def remove_vpp_config(self): - self._test.vapi.cli("test lcp del phy %s host %s" % - (self.phy, self.host)) + self._test.vapi.cli("test lcp del phy %s host %s" % (self.phy, self.host)) def object_id(self): - return "lcp:%d:%d" % (self.phy.sw_if_index, - self.host.sw_if_index) + return "lcp:%d:%d" % (self.phy.sw_if_index, self.host.sw_if_index) def query_vpp_config(self): - pairs = list(self._test.vapi.vpp.details_iter( - self._test.vapi.lcp_itf_pair_get)) + pairs = list(self._test.vapi.vpp.details_iter(self._test.vapi.lcp_itf_pair_get)) for p in pairs: - if p.phy_sw_if_index == self.phy.sw_if_index and \ - p.host_sw_if_index == self.host.sw_if_index: + if ( + p.phy_sw_if_index == self.phy.sw_if_index + and p.host_sw_if_index == self.host.sw_if_index + ): return True return False class TestLinuxCP(VppTestCase): - """ Linux Control Plane """ - - extra_vpp_plugin_config = ["plugin", - "linux_cp_plugin.so", - "{", "enable", "}", - "plugin", - "linux_cp_unittest_plugin.so", - "{", "enable", "}"] + """Linux Control Plane""" + + extra_vpp_plugin_config = [ + "plugin", + "linux_cp_plugin.so", + "{", + "enable", + "}", + "plugin", + "linux_cp_unittest_plugin.so", + "{", + "enable", + "}", + ] @classmethod def setUpClass(cls): @@ -86,7 +100,7 @@ class TestLinuxCP(VppTestCase): super(TestLinuxCP, self).tearDown() def test_linux_cp_tap(self): - """ Linux CP TAP """ + """Linux CP TAP""" # # Setup @@ -117,12 +131,12 @@ class TestLinuxCP(VppTestCase): # hosts to phys for phy, host in zip(phys, hosts): for j in range(N_HOSTS): - p = (Ether(src=phy.local_mac, - dst=phy.remote_hosts[j].mac) / - IP(src=phy.local_ip4, - dst=phy.remote_hosts[j].ip4) / - UDP(sport=1234, dport=1234) / - Raw()) + p = ( + Ether(src=phy.local_mac, dst=phy.remote_hosts[j].mac) + / IP(src=phy.local_ip4, dst=phy.remote_hosts[j].ip4) + / UDP(sport=1234, dport=1234) + / Raw() + ) rxs = self.send_and_expect(host, [p], phy) @@ -131,13 +145,13 @@ class TestLinuxCP(VppTestCase): self.assertEqual(p.show2(True), rx.show2(True)) # ARPs x-connect to phy - p = (Ether(dst="ff:ff:ff:ff:ff:ff", - src=phy.remote_hosts[j].mac) / - ARP(op="who-has", - hwdst=phy.remote_hosts[j].mac, - hwsrc=phy.local_mac, - psrc=phy.local_ip4, - pdst=phy.remote_hosts[j].ip4)) + p = Ether(dst="ff:ff:ff:ff:ff:ff", src=phy.remote_hosts[j].mac) / ARP( + op="who-has", + hwdst=phy.remote_hosts[j].mac, + hwsrc=phy.local_mac, + psrc=phy.local_ip4, + pdst=phy.remote_hosts[j].ip4, + ) rxs = self.send_and_expect(host, [p], phy) @@ -148,12 +162,12 @@ class TestLinuxCP(VppTestCase): # phy to host for phy, host in zip(phys, hosts): for j in range(N_HOSTS): - p = (Ether(dst=phy.local_mac, - src=phy.remote_hosts[j].mac) / - IP(dst=phy.local_ip4, - src=phy.remote_hosts[j].ip4) / - UDP(sport=1234, dport=1234) / - Raw()) + p = ( + Ether(dst=phy.local_mac, src=phy.remote_hosts[j].mac) + / IP(dst=phy.local_ip4, src=phy.remote_hosts[j].ip4) + / UDP(sport=1234, dport=1234) + / Raw() + ) rxs = self.send_and_expect(phy, [p], host) @@ -162,13 +176,13 @@ class TestLinuxCP(VppTestCase): self.assertEqual(p.show2(True), rx.show2(True)) # ARPs rx'd on the phy are sent to the host - p = (Ether(dst="ff:ff:ff:ff:ff:ff", - src=phy.remote_hosts[j].mac) / - ARP(op="is-at", - hwsrc=phy.remote_hosts[j].mac, - hwdst=phy.local_mac, - pdst=phy.local_ip4, - psrc=phy.remote_hosts[j].ip4)) + p = Ether(dst="ff:ff:ff:ff:ff:ff", src=phy.remote_hosts[j].mac) / ARP( + op="is-at", + hwsrc=phy.remote_hosts[j].mac, + hwdst=phy.local_mac, + pdst=phy.local_ip4, + psrc=phy.remote_hosts[j].ip4, + ) rxs = self.send_and_expect(phy, [p], host) @@ -181,7 +195,7 @@ class TestLinuxCP(VppTestCase): phy.unconfig_ip4() def test_linux_cp_tun(self): - """ Linux CP TUN """ + """Linux CP TUN""" # # Setup @@ -198,15 +212,11 @@ class TestLinuxCP(VppTestCase): phy.resolve_ndp() tun4 = VppIpIpTunInterface( - self, - phy, - phy.local_ip4, - phy.remote_ip4).add_vpp_config() + self, phy, phy.local_ip4, phy.remote_ip4 + ).add_vpp_config() tun6 = VppIpIpTunInterface( - self, - phy, - phy.local_ip6, - phy.remote_ip6).add_vpp_config() + self, phy, phy.local_ip6, phy.remote_ip6 + ).add_vpp_config() tuns = [tun4, tun6] tun4.admin_up() @@ -226,9 +236,7 @@ class TestLinuxCP(VppTestCase): # # host to phy for v4 - p = (IP(src=tun4.local_ip4, dst="2.2.2.2") / - UDP(sport=1234, dport=1234) / - Raw()) + p = IP(src=tun4.local_ip4, dst="2.2.2.2") / UDP(sport=1234, dport=1234) / Raw() rxs = self.send_and_expect(self.pg4, p * N_PKTS, phy) @@ -242,9 +250,7 @@ class TestLinuxCP(VppTestCase): self.assertEqual(inner.dst, "2.2.2.2") # host to phy for v6 - p = (IPv6(src=tun6.local_ip6, dst="2::2") / - UDP(sport=1234, dport=1234) / - Raw()) + p = IPv6(src=tun6.local_ip6, dst="2::2") / UDP(sport=1234, dport=1234) / Raw() rxs = self.send_and_expect(self.pg5, p * N_PKTS, phy) @@ -257,11 +263,13 @@ class TestLinuxCP(VppTestCase): self.assertEqual(inner.dst, "2::2") # phy to host v4 - p = (Ether(dst=phy.local_mac, src=phy.remote_mac) / - IP(dst=phy.local_ip4, src=phy.remote_ip4) / - IP(dst=tun4.local_ip4, src=tun4.remote_ip4) / - UDP(sport=1234, dport=1234) / - Raw()) + p = ( + Ether(dst=phy.local_mac, src=phy.remote_mac) + / IP(dst=phy.local_ip4, src=phy.remote_ip4) + / IP(dst=tun4.local_ip4, src=tun4.remote_ip4) + / UDP(sport=1234, dport=1234) + / Raw() + ) rxs = self.send_and_expect(phy, p * N_PKTS, self.pg4) for rx in rxs: @@ -270,11 +278,13 @@ class TestLinuxCP(VppTestCase): self.assertEqual(rx[IP].src, tun4.remote_ip4) # phy to host v6 - p = (Ether(dst=phy.local_mac, src=phy.remote_mac) / - IPv6(dst=phy.local_ip6, src=phy.remote_ip6) / - IPv6(dst=tun6.local_ip6, src=tun6.remote_ip6) / - UDP(sport=1234, dport=1234) / - Raw()) + p = ( + Ether(dst=phy.local_mac, src=phy.remote_mac) + / IPv6(dst=phy.local_ip6, src=phy.remote_ip6) + / IPv6(dst=tun6.local_ip6, src=tun6.remote_ip6) + / UDP(sport=1234, dport=1234) + / Raw() + ) rxs = self.send_and_expect(phy, p * N_PKTS, self.pg5) for rx in rxs: @@ -290,17 +300,21 @@ class TestLinuxCP(VppTestCase): tun6.unconfig_ip6() -class TestLinuxCPIpsec(TemplateIpsec, - TemplateIpsecItf4, - IpsecTun4): - """ IPsec Interface IPv4 """ +class TestLinuxCPIpsec(TemplateIpsec, TemplateIpsecItf4, IpsecTun4): + """IPsec Interface IPv4""" - extra_vpp_plugin_config = ["plugin", - "linux_cp_plugin.so", - "{", "enable", "}", - "plugin", - "linux_cp_unittest_plugin.so", - "{", "enable", "}"] + extra_vpp_plugin_config = [ + "plugin", + "linux_cp_plugin.so", + "{", + "enable", + "}", + "plugin", + "linux_cp_unittest_plugin.so", + "{", + "enable", + "}", + ] def setUp(self): super(TestLinuxCPIpsec, self).setUp() @@ -347,16 +361,19 @@ class TestLinuxCPIpsec(TemplateIpsec, self.assert_equal(rx[IP].dst, p.tun_if.local_ip4) self.assert_packet_checksums_valid(rx) - def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, - payload_size=54): - return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) / - sa.encrypt(IP(src=src, dst=dst) / - UDP(sport=1111, dport=2222) / - Raw(b'X' * payload_size)) - for i in range(count)] + def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=54): + return [ + Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) + / sa.encrypt( + IP(src=src, dst=dst) + / UDP(sport=1111, dport=2222) + / Raw(b"X" * payload_size) + ) + for i in range(count) + ] def test_linux_cp_ipsec4_tun(self): - """ Linux CP Ipsec TUN """ + """Linux CP Ipsec TUN""" # # Setup @@ -370,9 +387,7 @@ class TestLinuxCPIpsec(TemplateIpsec, p = self.ipv4_params self.config_network(p) - self.config_sa_tun(p, - self.pg0.local_ip4, - self.pg0.remote_ip4) + self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4) self.config_protect(p) pair = VppLcpPair(self, p.tun_if, self.host).add_vpp_config() @@ -386,19 +401,24 @@ class TestLinuxCPIpsec(TemplateIpsec, # # host to phy for v4 - pkt = (IP(src=p.tun_if.local_ip4, - dst=p.tun_if.remote_ip4) / - UDP(sport=1234, dport=1234) / - Raw()) + pkt = ( + IP(src=p.tun_if.local_ip4, dst=p.tun_if.remote_ip4) + / UDP(sport=1234, dport=1234) + / Raw() + ) rxs = self.send_and_expect(self.host, pkt * N_PKTS, self.tun_if) self.verify_encrypted(p, p.vpp_tun_sa, rxs) # phy to host for v4 - pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if, - src=p.tun_if.remote_ip4, - dst=p.tun_if.local_ip4, - count=N_PKTS) + pkts = self.gen_encrypt_pkts( + p, + p.scapy_tun_sa, + self.tun_if, + src=p.tun_if.remote_ip4, + dst=p.tun_if.local_ip4, + count=N_PKTS, + ) rxs = self.send_and_expect(self.tun_if, pkts, self.host) self.verify_decrypted(p, rxs) @@ -409,5 +429,5 @@ class TestLinuxCPIpsec(TemplateIpsec, self.unconfig_network(p) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main(testRunner=VppTestRunner) |