diff options
Diffstat (limited to 'test/template_ipsec.py')
-rw-r--r-- | test/template_ipsec.py | 195 |
1 files changed, 195 insertions, 0 deletions
diff --git a/test/template_ipsec.py b/test/template_ipsec.py index 1b1c9aaa25d..2295b7579c5 100644 --- a/test/template_ipsec.py +++ b/test/template_ipsec.py @@ -2088,5 +2088,200 @@ class SpdFlowCacheTemplate(IPSecIPv4Fwd): return False +class IPSecIPv6Fwd(VppTestCase): + """Test IPSec by capturing and verifying IPv6 forwarded pkts""" + + @classmethod + def setUpConstants(cls): + super(IPSecIPv6Fwd, cls).setUpConstants() + + def setUp(self): + super(IPSecIPv6Fwd, self).setUp() + # store SPD objects so we can remove configs on tear down + self.spd_objs = [] + self.spd_policies = [] + + def tearDown(self): + # remove SPD policies + for obj in self.spd_policies: + obj.remove_vpp_config() + self.spd_policies = [] + # remove SPD items (interface bindings first, then SPD) + for obj in reversed(self.spd_objs): + obj.remove_vpp_config() + self.spd_objs = [] + # close down pg intfs + for pg in self.pg_interfaces: + pg.unconfig_ip6() + pg.admin_down() + super(IPSecIPv6Fwd, self).tearDown() + + def create_interfaces(self, num_ifs=2): + # create interfaces pg0 ... pg<num_ifs> + self.create_pg_interfaces(range(num_ifs)) + for pg in self.pg_interfaces: + # put the interface up + pg.admin_up() + # configure IPv6 address on the interface + pg.config_ip6() + pg.resolve_ndp() + self.logger.info(self.vapi.ppcli("show int addr")) + + def spd_create_and_intf_add(self, spd_id, pg_list): + spd = VppIpsecSpd(self, spd_id) + spd.add_vpp_config() + self.spd_objs.append(spd) + for pg in pg_list: + spdItf = VppIpsecSpdItfBinding(self, spd, pg) + spdItf.add_vpp_config() + self.spd_objs.append(spdItf) + + def get_policy(self, policy_type): + e = VppEnum.vl_api_ipsec_spd_action_t + if policy_type == "protect": + return e.IPSEC_API_SPD_ACTION_PROTECT + elif policy_type == "bypass": + return e.IPSEC_API_SPD_ACTION_BYPASS + elif policy_type == "discard": + return e.IPSEC_API_SPD_ACTION_DISCARD + else: + raise Exception("Invalid policy type: %s", policy_type) + + def spd_add_rem_policy( + self, + spd_id, + src_if, + dst_if, + proto, + is_out, + priority, + policy_type, + remove=False, + all_ips=False, + ip_range=False, + local_ip_start=ip_address("0::0"), + local_ip_stop=ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"), + remote_ip_start=ip_address("0::0"), + remote_ip_stop=ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"), + remote_port_start=0, + remote_port_stop=65535, + local_port_start=0, + local_port_stop=65535, + ): + spd = VppIpsecSpd(self, spd_id) + + if all_ips: + src_range_low = ip_address("0::0") + src_range_high = ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff") + dst_range_low = ip_address("0::0") + dst_range_high = ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff") + + elif ip_range: + src_range_low = local_ip_start + src_range_high = local_ip_stop + dst_range_low = remote_ip_start + dst_range_high = remote_ip_stop + + else: + src_range_low = src_if.remote_ip6 + src_range_high = src_if.remote_ip6 + dst_range_low = dst_if.remote_ip6 + dst_range_high = dst_if.remote_ip6 + + spdEntry = VppIpsecSpdEntry( + self, + spd, + 0, + src_range_low, + src_range_high, + dst_range_low, + dst_range_high, + proto, + priority=priority, + policy=self.get_policy(policy_type), + is_outbound=is_out, + remote_port_start=remote_port_start, + remote_port_stop=remote_port_stop, + local_port_start=local_port_start, + local_port_stop=local_port_stop, + ) + + if remove is False: + spdEntry.add_vpp_config() + self.spd_policies.append(spdEntry) + else: + spdEntry.remove_vpp_config() + self.spd_policies.remove(spdEntry) + self.logger.info(self.vapi.ppcli("show ipsec all")) + return spdEntry + + def create_stream(self, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678): + packets = [] + for i in range(pkt_count): + # create packet info stored in the test case instance + info = self.create_packet_info(src_if, dst_if) + # convert the info into packet payload + payload = self.info_to_payload(info) + # create the packet itself + p = ( + Ether(dst=src_if.local_mac, src=src_if.remote_mac) + / IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6) + / UDP(sport=src_prt, dport=dst_prt) + / Raw(payload) + ) + # store a copy of the packet in the packet info + info.data = p.copy() + # append the packet to the list + packets.append(p) + # return the created packet list + return packets + + def verify_capture(self, src_if, dst_if, capture): + packet_info = None + for packet in capture: + try: + ip = packet[IPv6] + udp = packet[UDP] + # convert the payload to packet info object + payload_info = self.payload_to_info(packet) + # make sure the indexes match + self.assert_equal( + payload_info.src, src_if.sw_if_index, "source sw_if_index" + ) + self.assert_equal( + payload_info.dst, dst_if.sw_if_index, "destination sw_if_index" + ) + packet_info = self.get_next_packet_info_for_interface2( + src_if.sw_if_index, dst_if.sw_if_index, packet_info + ) + # make sure we didn't run out of saved packets + self.assertIsNotNone(packet_info) + self.assert_equal( + payload_info.index, packet_info.index, "packet info index" + ) + saved_packet = packet_info.data # fetch the saved packet + # assert the values match + self.assert_equal(ip.src, saved_packet[IPv6].src, "IP source address") + # ... more assertions here + self.assert_equal(udp.sport, saved_packet[UDP].sport, "UDP source port") + except Exception as e: + self.logger.error(ppp("Unexpected or invalid packet:", packet)) + raise + remaining_packet = self.get_next_packet_info_for_interface2( + src_if.sw_if_index, dst_if.sw_if_index, packet_info + ) + self.assertIsNone( + remaining_packet, + "Interface %s: Packet expected from interface " + "%s didn't arrive" % (dst_if.name, src_if.name), + ) + + def verify_policy_match(self, pkt_count, spdEntry): + self.logger.info("XXXX %s %s", str(spdEntry), str(spdEntry.get_stats())) + matched_pkts = spdEntry.get_stats().get("packets") + self.logger.info("Policy %s matched: %d pkts", str(spdEntry), matched_pkts) + self.assert_equal(pkt_count, matched_pkts) + + if __name__ == "__main__": unittest.main(testRunner=VppTestRunner) |