aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_linux_cp.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_linux_cp.py')
-rw-r--r--test/test_linux_cp.py237
1 files changed, 124 insertions, 113 deletions
diff --git a/test/test_linux_cp.py b/test/test_linux_cp.py
index 4cbcf6c5a07..a9ff242e215 100644
--- a/test/test_linux_cp.py
+++ b/test/test_linux_cp.py
@@ -4,18 +4,22 @@ import unittest
from scapy.layers.inet import IP, UDP
from scapy.layers.inet6 import IPv6, Raw
-from scapy.layers.l2 import Ether, ARP, Dot1Q
+from scapy.layers.l2 import Ether, ARP
from util import reassemble4
from vpp_object import VppObject
-from framework import VppTestCase, VppTestRunner
+from framework import VppTestCase
+from asfframework import VppTestRunner
from vpp_ipip_tun_interface import VppIpIpTunInterface
-from template_ipsec import TemplateIpsec, IpsecTun4Tests, \
- IpsecTun4, mk_scapy_crypt_key, config_tun_params
-from template_ipsec import TemplateIpsec, IpsecTun4Tests, \
- IpsecTun4, mk_scapy_crypt_key, config_tun_params
+from template_ipsec import (
+ TemplateIpsec,
+ IpsecTun4,
+)
+from template_ipsec import (
+ TemplateIpsec,
+ IpsecTun4,
+)
from test_ipsec_tun_if_esp import TemplateIpsecItf4
-from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect, VppIpsecInterface
class VppLcpPair(VppObject):
@@ -25,39 +29,43 @@ class VppLcpPair(VppObject):
self.host = host
def add_vpp_config(self):
- self._test.vapi.cli("test lcp add phy %s host %s" %
- (self.phy, self.host))
+ self._test.vapi.cli("test lcp add phy %s host %s" % (self.phy, self.host))
self._test.registry.register(self, self._test.logger)
return self
def remove_vpp_config(self):
- self._test.vapi.cli("test lcp del phy %s host %s" %
- (self.phy, self.host))
+ self._test.vapi.cli("test lcp del phy %s host %s" % (self.phy, self.host))
def object_id(self):
- return "lcp:%d:%d" % (self.phy.sw_if_index,
- self.host.sw_if_index)
+ return "lcp:%d:%d" % (self.phy.sw_if_index, self.host.sw_if_index)
def query_vpp_config(self):
- pairs = list(self._test.vapi.vpp.details_iter(
- self._test.vapi.lcp_itf_pair_get))
+ pairs = list(self._test.vapi.vpp.details_iter(self._test.vapi.lcp_itf_pair_get))
for p in pairs:
- if p.phy_sw_if_index == self.phy.sw_if_index and \
- p.host_sw_if_index == self.host.sw_if_index:
+ if (
+ p.phy_sw_if_index == self.phy.sw_if_index
+ and p.host_sw_if_index == self.host.sw_if_index
+ ):
return True
return False
class TestLinuxCP(VppTestCase):
- """ Linux Control Plane """
-
- extra_vpp_plugin_config = ["plugin",
- "linux_cp_plugin.so",
- "{", "enable", "}",
- "plugin",
- "linux_cp_unittest_plugin.so",
- "{", "enable", "}"]
+ """Linux Control Plane"""
+
+ extra_vpp_plugin_config = [
+ "plugin",
+ "linux_cp_plugin.so",
+ "{",
+ "enable",
+ "}",
+ "plugin",
+ "linux_cp_unittest_plugin.so",
+ "{",
+ "enable",
+ "}",
+ ]
@classmethod
def setUpClass(cls):
@@ -86,7 +94,7 @@ class TestLinuxCP(VppTestCase):
super(TestLinuxCP, self).tearDown()
def test_linux_cp_tap(self):
- """ Linux CP TAP """
+ """Linux CP TAP"""
#
# Setup
@@ -117,12 +125,12 @@ class TestLinuxCP(VppTestCase):
# hosts to phys
for phy, host in zip(phys, hosts):
for j in range(N_HOSTS):
- p = (Ether(src=phy.local_mac,
- dst=phy.remote_hosts[j].mac) /
- IP(src=phy.local_ip4,
- dst=phy.remote_hosts[j].ip4) /
- UDP(sport=1234, dport=1234) /
- Raw())
+ p = (
+ Ether(src=phy.local_mac, dst=phy.remote_hosts[j].mac)
+ / IP(src=phy.local_ip4, dst=phy.remote_hosts[j].ip4)
+ / UDP(sport=1234, dport=1234)
+ / Raw()
+ )
rxs = self.send_and_expect(host, [p], phy)
@@ -131,13 +139,13 @@ class TestLinuxCP(VppTestCase):
self.assertEqual(p.show2(True), rx.show2(True))
# ARPs x-connect to phy
- p = (Ether(dst="ff:ff:ff:ff:ff:ff",
- src=phy.remote_hosts[j].mac) /
- ARP(op="who-has",
- hwdst=phy.remote_hosts[j].mac,
- hwsrc=phy.local_mac,
- psrc=phy.local_ip4,
- pdst=phy.remote_hosts[j].ip4))
+ p = Ether(dst="ff:ff:ff:ff:ff:ff", src=phy.remote_hosts[j].mac) / ARP(
+ op="who-has",
+ hwdst=phy.remote_hosts[j].mac,
+ hwsrc=phy.local_mac,
+ psrc=phy.local_ip4,
+ pdst=phy.remote_hosts[j].ip4,
+ )
rxs = self.send_and_expect(host, [p], phy)
@@ -148,12 +156,12 @@ class TestLinuxCP(VppTestCase):
# phy to host
for phy, host in zip(phys, hosts):
for j in range(N_HOSTS):
- p = (Ether(dst=phy.local_mac,
- src=phy.remote_hosts[j].mac) /
- IP(dst=phy.local_ip4,
- src=phy.remote_hosts[j].ip4) /
- UDP(sport=1234, dport=1234) /
- Raw())
+ p = (
+ Ether(dst=phy.local_mac, src=phy.remote_hosts[j].mac)
+ / IP(dst=phy.local_ip4, src=phy.remote_hosts[j].ip4)
+ / UDP(sport=1234, dport=1234)
+ / Raw()
+ )
rxs = self.send_and_expect(phy, [p], host)
@@ -162,13 +170,13 @@ class TestLinuxCP(VppTestCase):
self.assertEqual(p.show2(True), rx.show2(True))
# ARPs rx'd on the phy are sent to the host
- p = (Ether(dst="ff:ff:ff:ff:ff:ff",
- src=phy.remote_hosts[j].mac) /
- ARP(op="is-at",
- hwsrc=phy.remote_hosts[j].mac,
- hwdst=phy.local_mac,
- pdst=phy.local_ip4,
- psrc=phy.remote_hosts[j].ip4))
+ p = Ether(dst="ff:ff:ff:ff:ff:ff", src=phy.remote_hosts[j].mac) / ARP(
+ op="is-at",
+ hwsrc=phy.remote_hosts[j].mac,
+ hwdst=phy.local_mac,
+ pdst=phy.local_ip4,
+ psrc=phy.remote_hosts[j].ip4,
+ )
rxs = self.send_and_expect(phy, [p], host)
@@ -181,7 +189,7 @@ class TestLinuxCP(VppTestCase):
phy.unconfig_ip4()
def test_linux_cp_tun(self):
- """ Linux CP TUN """
+ """Linux CP TUN"""
#
# Setup
@@ -198,15 +206,11 @@ class TestLinuxCP(VppTestCase):
phy.resolve_ndp()
tun4 = VppIpIpTunInterface(
- self,
- phy,
- phy.local_ip4,
- phy.remote_ip4).add_vpp_config()
+ self, phy, phy.local_ip4, phy.remote_ip4
+ ).add_vpp_config()
tun6 = VppIpIpTunInterface(
- self,
- phy,
- phy.local_ip6,
- phy.remote_ip6).add_vpp_config()
+ self, phy, phy.local_ip6, phy.remote_ip6
+ ).add_vpp_config()
tuns = [tun4, tun6]
tun4.admin_up()
@@ -226,9 +230,7 @@ class TestLinuxCP(VppTestCase):
#
# host to phy for v4
- p = (IP(src=tun4.local_ip4, dst="2.2.2.2") /
- UDP(sport=1234, dport=1234) /
- Raw())
+ p = IP(src=tun4.local_ip4, dst="2.2.2.2") / UDP(sport=1234, dport=1234) / Raw()
rxs = self.send_and_expect(self.pg4, p * N_PKTS, phy)
@@ -242,9 +244,7 @@ class TestLinuxCP(VppTestCase):
self.assertEqual(inner.dst, "2.2.2.2")
# host to phy for v6
- p = (IPv6(src=tun6.local_ip6, dst="2::2") /
- UDP(sport=1234, dport=1234) /
- Raw())
+ p = IPv6(src=tun6.local_ip6, dst="2::2") / UDP(sport=1234, dport=1234) / Raw()
rxs = self.send_and_expect(self.pg5, p * N_PKTS, phy)
@@ -257,11 +257,13 @@ class TestLinuxCP(VppTestCase):
self.assertEqual(inner.dst, "2::2")
# phy to host v4
- p = (Ether(dst=phy.local_mac, src=phy.remote_mac) /
- IP(dst=phy.local_ip4, src=phy.remote_ip4) /
- IP(dst=tun4.local_ip4, src=tun4.remote_ip4) /
- UDP(sport=1234, dport=1234) /
- Raw())
+ p = (
+ Ether(dst=phy.local_mac, src=phy.remote_mac)
+ / IP(dst=phy.local_ip4, src=phy.remote_ip4)
+ / IP(dst=tun4.local_ip4, src=tun4.remote_ip4)
+ / UDP(sport=1234, dport=1234)
+ / Raw()
+ )
rxs = self.send_and_expect(phy, p * N_PKTS, self.pg4)
for rx in rxs:
@@ -270,11 +272,13 @@ class TestLinuxCP(VppTestCase):
self.assertEqual(rx[IP].src, tun4.remote_ip4)
# phy to host v6
- p = (Ether(dst=phy.local_mac, src=phy.remote_mac) /
- IPv6(dst=phy.local_ip6, src=phy.remote_ip6) /
- IPv6(dst=tun6.local_ip6, src=tun6.remote_ip6) /
- UDP(sport=1234, dport=1234) /
- Raw())
+ p = (
+ Ether(dst=phy.local_mac, src=phy.remote_mac)
+ / IPv6(dst=phy.local_ip6, src=phy.remote_ip6)
+ / IPv6(dst=tun6.local_ip6, src=tun6.remote_ip6)
+ / UDP(sport=1234, dport=1234)
+ / Raw()
+ )
rxs = self.send_and_expect(phy, p * N_PKTS, self.pg5)
for rx in rxs:
@@ -290,17 +294,21 @@ class TestLinuxCP(VppTestCase):
tun6.unconfig_ip6()
-class TestLinuxCPIpsec(TemplateIpsec,
- TemplateIpsecItf4,
- IpsecTun4):
- """ IPsec Interface IPv4 """
+class TestLinuxCPIpsec(TemplateIpsec, TemplateIpsecItf4, IpsecTun4):
+ """IPsec Interface IPv4"""
- extra_vpp_plugin_config = ["plugin",
- "linux_cp_plugin.so",
- "{", "enable", "}",
- "plugin",
- "linux_cp_unittest_plugin.so",
- "{", "enable", "}"]
+ extra_vpp_plugin_config = [
+ "plugin",
+ "linux_cp_plugin.so",
+ "{",
+ "enable",
+ "}",
+ "plugin",
+ "linux_cp_unittest_plugin.so",
+ "{",
+ "enable",
+ "}",
+ ]
def setUp(self):
super(TestLinuxCPIpsec, self).setUp()
@@ -347,16 +355,19 @@ class TestLinuxCPIpsec(TemplateIpsec,
self.assert_equal(rx[IP].dst, p.tun_if.local_ip4)
self.assert_packet_checksums_valid(rx)
- def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
- payload_size=54):
- return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
- sa.encrypt(IP(src=src, dst=dst) /
- UDP(sport=1111, dport=2222) /
- Raw(b'X' * payload_size))
- for i in range(count)]
+ def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=54):
+ return [
+ Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
+ / sa.encrypt(
+ IP(src=src, dst=dst)
+ / UDP(sport=1111, dport=2222)
+ / Raw(b"X" * payload_size)
+ )
+ for i in range(count)
+ ]
def test_linux_cp_ipsec4_tun(self):
- """ Linux CP Ipsec TUN """
+ """Linux CP Ipsec TUN"""
#
# Setup
@@ -370,14 +381,12 @@ class TestLinuxCPIpsec(TemplateIpsec,
p = self.ipv4_params
self.config_network(p)
- self.config_sa_tun(p,
- self.pg0.local_ip4,
- self.pg0.remote_ip4)
+ self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
self.config_protect(p)
pair = VppLcpPair(self, p.tun_if, self.host).add_vpp_config()
- self.logger.error(self.vapi.cli("sh int addr"))
+ self.logger.info(self.vapi.cli("sh int addr"))
self.logger.info(self.vapi.cli("sh lcp"))
self.logger.info(self.vapi.cli("sh ip punt redirect"))
@@ -386,24 +395,26 @@ class TestLinuxCPIpsec(TemplateIpsec,
#
# host to phy for v4
- pkt = (IP(src=p.tun_if.local_ip4,
- dst=p.tun_if.remote_ip4) /
- UDP(sport=1234, dport=1234) /
- Raw())
+ pkt = (
+ IP(src=p.tun_if.local_ip4, dst=p.tun_if.remote_ip4)
+ / UDP(sport=1234, dport=1234)
+ / Raw()
+ )
rxs = self.send_and_expect(self.host, pkt * N_PKTS, self.tun_if)
self.verify_encrypted(p, p.vpp_tun_sa, rxs)
# phy to host for v4
- pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
- src=p.tun_if.remote_ip4,
- dst=p.tun_if.local_ip4,
- count=N_PKTS)
- try:
- rxs = self.send_and_expect(self.tun_if, pkts, self.host)
- self.verify_decrypted(p, rxs)
- finally:
- self.logger.error(self.vapi.cli("sh trace"))
+ pkts = self.gen_encrypt_pkts(
+ p,
+ p.scapy_tun_sa,
+ self.tun_if,
+ src=p.tun_if.remote_ip4,
+ dst=p.tun_if.local_ip4,
+ count=N_PKTS,
+ )
+ rxs = self.send_and_expect(self.tun_if, pkts, self.host)
+ self.verify_decrypted(p, rxs)
# cleanup
pair.remove_vpp_config()
@@ -412,5 +423,5 @@ class TestLinuxCPIpsec(TemplateIpsec,
self.unconfig_network(p)
-if __name__ == '__main__':
+if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)