diff options
Diffstat (limited to 'test/template_ipsec.py')
-rw-r--r-- | test/template_ipsec.py | 211 |
1 files changed, 211 insertions, 0 deletions
diff --git a/test/template_ipsec.py b/test/template_ipsec.py index e4797353ecd..d9a9d1b78c1 100644 --- a/test/template_ipsec.py +++ b/test/template_ipsec.py @@ -14,6 +14,12 @@ from framework import VppTestCase, VppTestRunner from util import ppp, reassemble4, fragment_rfc791, fragment_rfc8200 from vpp_papi import VppEnum +from vpp_ipsec import VppIpsecSpd, VppIpsecSpdEntry, \ + VppIpsecSpdItfBinding +from ipaddress import ip_address +from re import search +from os import popen + class IPsecIPv4Params: @@ -1571,5 +1577,210 @@ class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests): pass +class SpdFlowCacheTemplate(VppTestCase): + @classmethod + def setUpConstants(cls): + super(SpdFlowCacheTemplate, cls).setUpConstants() + # Override this method with required cmdline parameters e.g. + # cls.vpp_cmdline.extend(["ipsec", "{", + # "ipv4-outbound-spd-flow-cache on", + # "}"]) + # cls.logger.info("VPP modified cmdline is %s" % " " + # .join(cls.vpp_cmdline)) + + def setUp(self): + super(SpdFlowCacheTemplate, self).setUp() + # store SPD objects so we can remove configs on tear down + self.spd_objs = [] + self.spd_policies = [] + + def tearDown(self): + # remove SPD policies + for obj in self.spd_policies: + obj.remove_vpp_config() + self.spd_policies = [] + # remove SPD items (interface bindings first, then SPD) + for obj in reversed(self.spd_objs): + obj.remove_vpp_config() + self.spd_objs = [] + # close down pg intfs + for pg in self.pg_interfaces: + pg.unconfig_ip4() + pg.admin_down() + super(SpdFlowCacheTemplate, self).tearDown() + + def create_interfaces(self, num_ifs=2): + # create interfaces pg0 ... pg<num_ifs> + self.create_pg_interfaces(range(num_ifs)) + for pg in self.pg_interfaces: + # put the interface up + pg.admin_up() + # configure IPv4 address on the interface + pg.config_ip4() + # resolve ARP, so that we know VPP MAC + pg.resolve_arp() + self.logger.info(self.vapi.ppcli("show int addr")) + + def spd_create_and_intf_add(self, spd_id, pg_list): + spd = VppIpsecSpd(self, spd_id) + spd.add_vpp_config() + self.spd_objs.append(spd) + for pg in pg_list: + spdItf = VppIpsecSpdItfBinding(self, spd, pg) + spdItf.add_vpp_config() + self.spd_objs.append(spdItf) + + def get_policy(self, policy_type): + e = VppEnum.vl_api_ipsec_spd_action_t + if policy_type == "protect": + return e.IPSEC_API_SPD_ACTION_PROTECT + elif policy_type == "bypass": + return e.IPSEC_API_SPD_ACTION_BYPASS + elif policy_type == "discard": + return e.IPSEC_API_SPD_ACTION_DISCARD + else: + raise Exception("Invalid policy type: %s", policy_type) + + def spd_add_rem_policy(self, spd_id, src_if, dst_if, + proto, is_out, priority, policy_type, + remove=False, all_ips=False): + spd = VppIpsecSpd(self, spd_id) + + if all_ips: + src_range_low = ip_address("0.0.0.0") + src_range_high = ip_address("255.255.255.255") + dst_range_low = ip_address("0.0.0.0") + dst_range_high = ip_address("255.255.255.255") + else: + src_range_low = src_if.remote_ip4 + src_range_high = src_if.remote_ip4 + dst_range_low = dst_if.remote_ip4 + dst_range_high = dst_if.remote_ip4 + + spdEntry = VppIpsecSpdEntry(self, spd, 0, + src_range_low, + src_range_high, + dst_range_low, + dst_range_high, + proto, + priority=priority, + policy=self.get_policy(policy_type), + is_outbound=is_out) + + if(remove is False): + spdEntry.add_vpp_config() + self.spd_policies.append(spdEntry) + else: + spdEntry.remove_vpp_config() + self.spd_policies.remove(spdEntry) + self.logger.info(self.vapi.ppcli("show ipsec all")) + return spdEntry + + def create_stream(self, src_if, dst_if, pkt_count, + src_prt=1234, dst_prt=5678): + packets = [] + for i in range(pkt_count): + # create packet info stored in the test case instance + info = self.create_packet_info(src_if, dst_if) + # convert the info into packet payload + payload = self.info_to_payload(info) + # create the packet itself + p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) / + IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) / + UDP(sport=src_prt, dport=dst_prt) / + Raw(payload)) + # store a copy of the packet in the packet info + info.data = p.copy() + # append the packet to the list + packets.append(p) + # return the created packet list + return packets + + def verify_capture(self, src_if, dst_if, capture): + packet_info = None + for packet in capture: + try: + ip = packet[IP] + udp = packet[UDP] + # convert the payload to packet info object + payload_info = self.payload_to_info(packet) + # make sure the indexes match + self.assert_equal(payload_info.src, src_if.sw_if_index, + "source sw_if_index") + self.assert_equal(payload_info.dst, dst_if.sw_if_index, + "destination sw_if_index") + packet_info = self.get_next_packet_info_for_interface2( + src_if.sw_if_index, + dst_if.sw_if_index, + packet_info) + # make sure we didn't run out of saved packets + self.assertIsNotNone(packet_info) + self.assert_equal(payload_info.index, packet_info.index, + "packet info index") + saved_packet = packet_info.data # fetch the saved packet + # assert the values match + self.assert_equal(ip.src, saved_packet[IP].src, + "IP source address") + # ... more assertions here + self.assert_equal(udp.sport, saved_packet[UDP].sport, + "UDP source port") + except Exception as e: + self.logger.error(ppp("Unexpected or invalid packet:", + packet)) + raise + remaining_packet = self.get_next_packet_info_for_interface2( + src_if.sw_if_index, + dst_if.sw_if_index, + packet_info) + self.assertIsNone(remaining_packet, + "Interface %s: Packet expected from interface " + "%s didn't arrive" % (dst_if.name, src_if.name)) + + def verify_policy_match(self, pkt_count, spdEntry): + self.logger.info( + "XXXX %s %s", str(spdEntry), str(spdEntry.get_stats())) + matched_pkts = spdEntry.get_stats().get('packets') + self.logger.info( + "Policy %s matched: %d pkts", str(spdEntry), matched_pkts) + self.assert_equal(pkt_count, matched_pkts) + + def get_spd_flow_cache_entries(self): + """ 'show ipsec spd' output: + ip4-outbound-spd-flow-cache-entries: 0 + """ + show_ipsec_reply = self.vapi.cli("show ipsec spd") + # match the relevant section of 'show ipsec spd' output + regex_match = re.search( + 'ip4-outbound-spd-flow-cache-entries: (.*)', + show_ipsec_reply, re.DOTALL) + if regex_match is None: + raise Exception("Unable to find spd flow cache entries \ + in \'show ipsec spd\' CLI output - regex failed to match") + else: + try: + num_entries = int(regex_match.group(1)) + except ValueError: + raise Exception("Unable to get spd flow cache entries \ + from \'show ipsec spd\' string: %s", regex_match.group(0)) + self.logger.info("%s", regex_match.group(0)) + return num_entries + + def verify_num_outbound_flow_cache_entries(self, expected_elements): + self.assertEqual(self.get_spd_flow_cache_entries(), expected_elements) + + def crc32_supported(self): + # lscpu is part of util-linux package, available on all Linux Distros + stream = os.popen('lscpu') + cpu_info = stream.read() + # feature/flag "crc32" on Aarch64 and "sse4_2" on x86 + # see vppinfra/crc32.h + if "crc32" or "sse4_2" in cpu_info: + self.logger.info("\ncrc32 supported:\n" + cpu_info) + return True + else: + self.logger.info("\ncrc32 NOT supported:\n" + cpu_info) + return False + + if __name__ == '__main__': unittest.main(testRunner=VppTestRunner) |