diff options
author | Piotr Bronowski <piotrx.bronowski@intel.com> | 2022-07-08 12:45:51 +0000 |
---|---|---|
committer | Fan Zhang <roy.fan.zhang@intel.com> | 2022-07-15 12:45:34 +0000 |
commit | 651cc01b642f0fb373ce15533bf820196b1b5d8f (patch) | |
tree | a2e58c393cfecd8a200df41c5cfd393b66bb30bc /test/template_ipsec.py | |
parent | 86f8208af43efaa71b8e1c0a5ff86fc233456d9d (diff) |
tests: add fast path ipv6 python tests for outbound policy matching
This patch introduces set of python tests for fast path ipv6, based on
ipv4 tests. Some missing parts of ipsec framework has been added
in order to test ipv6 implementation.
Type: feature
Signed-off-by: Piotr Bronowski <piotrx.bronowski@intel.com>
Change-Id: Icc13322787d76485c08106bad2cb071947ad9846
Diffstat (limited to 'test/template_ipsec.py')
-rw-r--r-- | test/template_ipsec.py | 195 |
1 files changed, 195 insertions, 0 deletions
diff --git a/test/template_ipsec.py b/test/template_ipsec.py index 1b1c9aaa25d..2295b7579c5 100644 --- a/test/template_ipsec.py +++ b/test/template_ipsec.py @@ -2088,5 +2088,200 @@ class SpdFlowCacheTemplate(IPSecIPv4Fwd): return False +class IPSecIPv6Fwd(VppTestCase): + """Test IPSec by capturing and verifying IPv6 forwarded pkts""" + + @classmethod + def setUpConstants(cls): + super(IPSecIPv6Fwd, cls).setUpConstants() + + def setUp(self): + super(IPSecIPv6Fwd, self).setUp() + # store SPD objects so we can remove configs on tear down + self.spd_objs = [] + self.spd_policies = [] + + def tearDown(self): + # remove SPD policies + for obj in self.spd_policies: + obj.remove_vpp_config() + self.spd_policies = [] + # remove SPD items (interface bindings first, then SPD) + for obj in reversed(self.spd_objs): + obj.remove_vpp_config() + self.spd_objs = [] + # close down pg intfs + for pg in self.pg_interfaces: + pg.unconfig_ip6() + pg.admin_down() + super(IPSecIPv6Fwd, self).tearDown() + + def create_interfaces(self, num_ifs=2): + # create interfaces pg0 ... pg<num_ifs> + self.create_pg_interfaces(range(num_ifs)) + for pg in self.pg_interfaces: + # put the interface up + pg.admin_up() + # configure IPv6 address on the interface + pg.config_ip6() + pg.resolve_ndp() + self.logger.info(self.vapi.ppcli("show int addr")) + + def spd_create_and_intf_add(self, spd_id, pg_list): + spd = VppIpsecSpd(self, spd_id) + spd.add_vpp_config() + self.spd_objs.append(spd) + for pg in pg_list: + spdItf = VppIpsecSpdItfBinding(self, spd, pg) + spdItf.add_vpp_config() + self.spd_objs.append(spdItf) + + def get_policy(self, policy_type): + e = VppEnum.vl_api_ipsec_spd_action_t + if policy_type == "protect": + return e.IPSEC_API_SPD_ACTION_PROTECT + elif policy_type == "bypass": + return e.IPSEC_API_SPD_ACTION_BYPASS + elif policy_type == "discard": + return e.IPSEC_API_SPD_ACTION_DISCARD + else: + raise Exception("Invalid policy type: %s", policy_type) + + def spd_add_rem_policy( + self, + spd_id, + src_if, + dst_if, + proto, + is_out, + priority, + policy_type, + remove=False, + all_ips=False, + ip_range=False, + local_ip_start=ip_address("0::0"), + local_ip_stop=ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"), + remote_ip_start=ip_address("0::0"), + remote_ip_stop=ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"), + remote_port_start=0, + remote_port_stop=65535, + local_port_start=0, + local_port_stop=65535, + ): + spd = VppIpsecSpd(self, spd_id) + + if all_ips: + src_range_low = ip_address("0::0") + src_range_high = ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff") + dst_range_low = ip_address("0::0") + dst_range_high = ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff") + + elif ip_range: + src_range_low = local_ip_start + src_range_high = local_ip_stop + dst_range_low = remote_ip_start + dst_range_high = remote_ip_stop + + else: + src_range_low = src_if.remote_ip6 + src_range_high = src_if.remote_ip6 + dst_range_low = dst_if.remote_ip6 + dst_range_high = dst_if.remote_ip6 + + spdEntry = VppIpsecSpdEntry( + self, + spd, + 0, + src_range_low, + src_range_high, + dst_range_low, + dst_range_high, + proto, + priority=priority, + policy=self.get_policy(policy_type), + is_outbound=is_out, + remote_port_start=remote_port_start, + remote_port_stop=remote_port_stop, + local_port_start=local_port_start, + local_port_stop=local_port_stop, + ) + + if remove is False: + spdEntry.add_vpp_config() + self.spd_policies.append(spdEntry) + else: + spdEntry.remove_vpp_config() + self.spd_policies.remove(spdEntry) + self.logger.info(self.vapi.ppcli("show ipsec all")) + return spdEntry + + def create_stream(self, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678): + packets = [] + for i in range(pkt_count): + # create packet info stored in the test case instance + info = self.create_packet_info(src_if, dst_if) + # convert the info into packet payload + payload = self.info_to_payload(info) + # create the packet itself + p = ( + Ether(dst=src_if.local_mac, src=src_if.remote_mac) + / IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6) + / UDP(sport=src_prt, dport=dst_prt) + / Raw(payload) + ) + # store a copy of the packet in the packet info + info.data = p.copy() + # append the packet to the list + packets.append(p) + # return the created packet list + return packets + + def verify_capture(self, src_if, dst_if, capture): + packet_info = None + for packet in capture: + try: + ip = packet[IPv6] + udp = packet[UDP] + # convert the payload to packet info object + payload_info = self.payload_to_info(packet) + # make sure the indexes match + self.assert_equal( + payload_info.src, src_if.sw_if_index, "source sw_if_index" + ) + self.assert_equal( + payload_info.dst, dst_if.sw_if_index, "destination sw_if_index" + ) + packet_info = self.get_next_packet_info_for_interface2( + src_if.sw_if_index, dst_if.sw_if_index, packet_info + ) + # make sure we didn't run out of saved packets + self.assertIsNotNone(packet_info) + self.assert_equal( + payload_info.index, packet_info.index, "packet info index" + ) + saved_packet = packet_info.data # fetch the saved packet + # assert the values match + self.assert_equal(ip.src, saved_packet[IPv6].src, "IP source address") + # ... more assertions here + self.assert_equal(udp.sport, saved_packet[UDP].sport, "UDP source port") + except Exception as e: + self.logger.error(ppp("Unexpected or invalid packet:", packet)) + raise + remaining_packet = self.get_next_packet_info_for_interface2( + src_if.sw_if_index, dst_if.sw_if_index, packet_info + ) + self.assertIsNone( + remaining_packet, + "Interface %s: Packet expected from interface " + "%s didn't arrive" % (dst_if.name, src_if.name), + ) + + def verify_policy_match(self, pkt_count, spdEntry): + self.logger.info("XXXX %s %s", str(spdEntry), str(spdEntry.get_stats())) + matched_pkts = spdEntry.get_stats().get("packets") + self.logger.info("Policy %s matched: %d pkts", str(spdEntry), matched_pkts) + self.assert_equal(pkt_count, matched_pkts) + + if __name__ == "__main__": unittest.main(testRunner=VppTestRunner) |