#!/usr/bin/env python3 import unittest from random import shuffle, choice, randrange from framework import VppTestCase, VppTestRunner import scapy.compat from scapy.packet import Raw from scapy.layers.l2 import Ether, GRE from scapy.layers.inet import IP, UDP, ICMP, icmptypes from scapy.layers.inet6 import HBHOptUnknown, ICMPv6ParamProblem,\ ICMPv6TimeExceeded, IPv6, IPv6ExtHdrFragment,\ IPv6ExtHdrHopByHop, IPv6ExtHdrDestOpt, PadN, ICMPv6EchoRequest,\ ICMPv6EchoReply from framework import VppTestCase, VppTestRunner from util import ppp, ppc, fragment_rfc791, fragment_rfc8200 from vpp_gre_interface import VppGreInterface from vpp_ip import DpoProto from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathProto from vpp_papi import VppEnum # 35 is enough to have >257 400-byte fragments test_packet_count = 35 class TestIPv4Reassembly(VppTestCase): """ IPv4 Reassembly """ @classmethod def setUpClass(cls): super().setUpClass() cls.create_pg_interfaces([0, 1]) cls.src_if = cls.pg0 cls.dst_if = cls.pg1 # setup all interfaces for i in cls.pg_interfaces: i.admin_up() i.config_ip4() i.resolve_arp() # packet sizes cls.packet_sizes = [64, 512, 1518, 9018] cls.padding = " abcdefghijklmn" cls.create_stream(cls.packet_sizes) cls.create_fragments() @classmethod def tearDownClass(cls): super().tearDownClass() def setUp(self): """ Test setup - force timeout on existing reassemblies """ super().setUp() self.vapi.ip_reassembly_enable_disable( sw_if_index=self.src_if.sw_if_index, enable_ip4=True) self.vapi.ip_reassembly_set(timeout_ms=0, max_reassemblies=1000, max_reassembly_length=1000, expire_walk_interval_ms=10) self.virtual_sleep(.25) self.vapi.ip_reassembly_set(timeout_ms=1000000, max_reassemblies=1000, max_reassembly_length=1000, expire_walk_interval_ms=10000) def tearDown(self): self.vapi.ip_reassembly_enable_disable( sw_if_index=self.src_if.sw_if_index, enable_ip4=False) super().tearDown() def show_commands_at_teardown(self): self.logger.debug(self.vapi.ppcli("show ip4-full-reassembly details")) self.logger.debug(self.vapi.ppcli("show buffers")) @classmethod def create_stream(cls, packet_sizes, packet_count=test_packet_count): """Create input packet stream :param list packet_sizes: Required packet sizes. """ for i in range(0, packet_count): info = cls.create_packet_info(cls.src_if, cls.src_if) payload = cls.info_to_payload(info) p = (Ether(dst=cls.src_if.local_mac, src=cls.src_if.remote_mac) / IP(id=info.index, src=cls.src_if.remote_ip4, dst=cls.dst_if.remote_ip4) / UDP(sport=1234, dport=5678) / Raw(payload)) size = packet_sizes[(i // 2) % len(packet_sizes)] cls.extend_packet(p, size, cls.padding) info.data = p @classmethod def create_fragments(cls): infos = cls._packet_infos cls.pkt_infos = [] for index, info in infos.items(): p = info.data # cls.logger.debug(ppp("Packet:", # p.__class__(scapy.compat.raw(p)))) fragments_400 = fragment_rfc791(p, 400) fragments_300 = fragment_rfc791(p, 300) fragments_200 = [ x for f in fragments_400 for x in fragment_rfc791(f, 200)] cls.pkt_infos.append( (index, fragments_400, fragments_300, fragments_200)) cls.fragments_400 = [ x for (_, frags, _, _) in cls.pkt_infos for x in frags] cls.fragments_300 = [ x for (_, _, frags, _) in cls.pkt_infos for x in frags] cls.fragments_200 = [ x for (_, _, _, frags) in cls.pkt_infos for x in frags] cls.logger.debug("Fragmented %s packets into %s 400-byte fragments, " "%s 300-byte fragments and %s 200-byte fragments" % (len(infos), len(cls.fragments_400), len(cls.fragments_300), len(cls.fragments_200))) def verify_capture(self, capture, dropped_packet_indexes=[]): """Verify captured packet stream. :param list capture: Captured packet stream. """ info = None seen = set() for packet in capture: try: self.logger.debug(ppp("Got packet:", packet)) ip = packet[IP] udp = packet[UDP] payload_info = self.payload_to_info(packet[Raw]) packet_index = payload_info.index self.assertTrue( packet_index not in dropped_packet_indexes, ppp("Packet received, but should be dropped:", packet)) if packet_index in seen: raise Exception(ppp("Duplicate packet received", packet)) seen.add(packet_index) self.assertEqual(payload_info.dst, self.src_if.sw_if_index) info = self._packet_infos[packet_index] self.assertTrue(info is not None) self.assertEqual(packet_index, info.index) saved_packet = info.data self.assertEqual(ip.src, saved_packet[IP].src) self.assertEqual(ip.dst, saved_packet[IP].dst) self.assertEqual(udp.payload, saved_packet[UDP].payload) except Exception: self.logger.error(ppp("Unexpected or invalid packet:", packet)) raise for index in self._packet_infos: self.assertTrue(index in seen or index in dropped_packet_indexes, "Packet with packet_index %d not received" % index) def test_reassembly(self): """ basic reassembly """ self.pg_enable_capture() self.src_if.add_stream(self.fragments_200) self.pg_start() packets = self.dst_if.get_capture(len(self.pkt_infos)) self.verify_capture(packets) self.src_if.assert_nothing_captured() # run it all again to verify correctness self.pg_enable_capture() self.src_if.add_stream(self.fragments_200) self.pg_start() packets = self.dst_if.get_capture(len(self.pkt_infos)) self.verify_capture(packets) self.src_if.assert_nothing_captured() def test_verify_clear_trace_mid_reassembly(self): """ verify clear trace works mid-reassembly """ self.pg_enable_capture() self.src_if.add_stream(self.fragments_200[0:-1]) self.pg_start() self.logger.debug(self.vapi.cli("show trace")) self.vapi.cli("clear trace") self.src_if.add_stream(self.fragments_200[-1]) self.pg_start() packets = self.dst_if.get_capture(len(self.pkt_infos)) self.verify_capture(packets) def test_reversed(self): """ reverse order reassembly """ fragments = list(self.fragments_200) fragments.reverse() self.pg_enable_capture() self.src_if.add_stream(fragments) self.pg_start() packets = self.dst_if.get_capture(len(self.packet_infos)) self.verify_capture(packets) self.src_if.assert_nothing_captured() # run it all again to verify correctness self.pg_enable_capture() self.src_if.add_stream(fragments) self.pg_start() packets = self.dst_if.get_capture(len(self.packet_infos)) self.verify_capture(packets) self.src_if.assert_nothing_captured() def test_long_fragment_chain(self): """ long fragment chain """ error_cnt_str = \ "/err/ip4-full-reassembly-feature/fragment chain too long (drop)" error_cnt = self.statistics.get_err_counter(error_cnt_str) self.vapi.ip_reassembly_set(timeout_ms=100, max_reassemblies=1000, max_reassembly_length=3, expire_walk_interval_ms=50) p1 = (Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) / IP(id=1000, src=self.src_if.remote_ip4, dst=self.dst_if.remote_ip4) / UDP(sport=1234, dport=5678) / Raw(b"X" * 1000)) p2 = (Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) / IP(id=1001, src=self.src_if.remote_ip4, dst=self.dst_if.remote_ip4) / UDP(sport=1234, dport=5678) / Raw(b"X" * 1000)) frags = fragment_rfc791(p1, 200) + fragment_rfc791(p2, 500) self.pg_enable_capture() self.src_if.add_stream(frags) self.pg_start() self.dst_if.get_capture(1) self.assert_error_counter_equal(error_cnt_str, error_cnt + 1) def test_5737(self): """ fragment length + ip header size > 65535 """ self.vapi.cli("clear errors") raw = b'''E\x00\x00\x88,\xf8\x1f\xfe@\x01\x98\x00\xc0\xa8\n-\xc0\xa8\n\ \x01\x08\x00\xf0J\xed\xcb\xf1\xf5Test-group: IPv4.IPv4.ipv4-message.\ Ethernet-Payload.IPv4-Packet.IPv4-Header.Fragment-Offset; Test-case: 5737''' malformed_packet = (Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) / IP(raw)) p = (Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) / IP(id=1000, src=self.src_if.remote_ip4, dst=self.dst_if.remote_ip4) / UDP(sport=1234, dport=5678) / Raw(b"X" * 1000)) valid_fragments = fragment_rfc791(p, 400) counter = "/err/ip4-full-reassembly-feature/malformed packets" error_counter = self.statistics.get_err_counter(counter) self.pg_enable_capture() self.src_if.add_stream([malformed_packet] + valid_fragments) self.pg_start() self.dst_if.get_capture(1) self.logger.debug(self.vapi.ppcli("show error")) self.assertEqual(self.statistics.get_err_counter(counter), error_counter + 1) def test_44924(self): """ compress tiny fragments """ packets = [(Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) / IP(id=24339, flags="MF", frag=0, ttl=64, src=self.src_if.remote_ip4, dst=self.dst_if.remote_ip4) / ICMP(type="echo-request", code=0, id=0x1fe6, seq=0x2407) / Raw(load='Test-group: IPv4')), (Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) / IP(id=24339, flags="MF", frag=3, ttl=64, src=self.src_if.remote_ip4, dst=self.dst_if.remote_ip4) / ICMP(type="echo-request", code=0, id=0x1fe6, seq=0x2407) / Raw(load='.IPv4.Fragmentation.vali')), (Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) / IP(id=24339, frag=6, ttl=64, src=self.src_if.remote_ip4, dst=self.dst_if.remote_ip4) / ICMP(type="echo-request", code=0, id=0x1fe6, seq=0x2407) / Raw(load='d; Test-case: 44924')) ] self.pg_enable_capture() self.src_if.add_stream(packets) self.pg_start() self.dst_if.get_capture(1) def test_frag_1(self): """ fragment of size 1 """ self.vapi.cli("clear errors") malformed_packets = [(Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) / IP(id=7, len=21, flags="MF", frag=0, ttl=64, src=self.src_if.remote_ip4, dst=self.dst_if.remote_ip4) / ICMP(type="echo-request")),
#!/usr/bin/env python

import socket

from scapy.layers.l2 import Ether
from scapy.layers.inet import ICMP, IP, TCP, UDP
from scapy.layers.ipsec import SecurityAssociation, ESP
from util import ppp, ppc
from template_ipsec import TemplateIpsec


class IPSecNATTestCase(TemplateIpsec):
    """ IPSec/NAT
    TUNNEL MODE:


     public network  |   private network
     ---   encrypt  ---   plain   ---
    |pg0| <------- |VPP| <------ |pg1|
     ---            ---           ---

     ---   decrypt  ---   plain   ---
    |pg0| -------> |VPP| ------> |pg1|
     ---            ---           ---
    """

    tcp_port_in = 6303
    tcp_port_out = 6303
    udp_port_in = 6304
    udp_port_out = 6304
    icmp_id_in = 6305
    icmp_id_out = 6305

    @classmethod
    def setUpClass(cls):
        super(IPSecNATTestCase, cls).setUpClass()
        cls.tun_if = cls.pg0
        cls.vapi.ipsec_spd_add_del(cls.tun_spd_id)
        cls.vapi.ipsec_interface_add_del_spd(cls.tun_spd_id,
                                             cls.tun_if.sw_if_index)
        p = cls.ipv4_params
        cls.config_esp_tun(p)
        cls.logger.info(cls.vapi.ppcli("show ipsec"))
        src = socket.inet_pton(p.addr_type, p.remote_tun_if_host)
        cls.vapi.ip_add_del_route(src, p.addr_len,
                                  cls.tun_if.remote_addr_n[p.addr_type],
                                  is_ipv6=p.is_ipv6)

    def create_stream_plain(self, src_mac, dst_mac, src_ip, dst_ip):
        return [
            # TCP
            Ether(src=src_mac, dst=dst_mac) /
            IP(src=src_ip, dst=dst_ip) /
            TCP(sport=self.tcp_port_in, dport=20),
            # UDP
            Ether(src=src_mac, dst=dst_mac) /
            IP(src=src_ip, dst=dst_ip) /
            UDP(sport=self.udp_port_in, dport=20),
            # ICMP
            Ether(src=src_mac, dst=dst_mac) /
            IP(src=src_ip, dst=dst_ip) /
            ICMP(id=self.icmp_id_in, type='echo-request')
        ]

    def create_stream_encrypted(self, src_mac, dst_mac, src_ip, dst_ip, sa):
        return [
            # TCP
            Ether(src=src_mac, dst=dst_mac) /
            sa.encrypt(IP(src=src_ip, dst=dst_ip) /
                       TCP(dport=self.tcp_port_out, sport=20)),
            # UDP
            Ether(src=src_mac, dst=dst_mac) /
            sa.encrypt(IP(src=src_ip, dst=dst_ip) /
                       UDP(dport=self.udp_port_out, sport=20)),
            # ICMP
            Ether(src=src_mac, dst=dst_mac) /
            sa.encrypt(IP(src=src_ip, dst=dst_ip) /
                       ICMP(id=self.icmp_id_out, type='echo-request'))
        ]

    def verify_capture_plain(self, capture):
        for packet in capture:
            try:
                self.assert_packet_checksums_valid(packet)
                self.assert_equal(packet[IP].src, self.tun_if.remote_ip4,
                                  "decrypted packet source address")
                self.assert_equal(packet[IP].dst, self.pg1.remote_ip4,
                                  "decrypted packet destination address")
                if packet.haslayer(TCP):
                    self.assertFalse(
                        packet.haslayer(UDP),
                        "unexpected UDP header in decrypted packet")
                    self.assert_equal(packet[TCP].dport, self.tcp_port_in,
                                      "decrypted packet TCP destination port")
                elif packet.haslayer(UDP):
                    if packet[UDP].payload:
                        self.assertFalse(
                            packet[UDP][1].haslayer(UDP),
                            "unexpected UDP header in decrypted packet")
                    self.assert_equal(packet[UDP].dport, self.udp_port_in,
                                      "decrypted packet UDP destination port")
                else:
                    self.assertFalse(
                        packet.haslayer(UDP),
                        "unexpected UDP header in decrypted packet")
                    self.assert_equal(packet[ICMP].id, self.icmp_id_in,
                                      "decrypted packet ICMP ID")
            except Exception:
                self.logger.error(
                    ppp("Unexpected or invalid plain packet:", packet))
                raise

    def verify_capture_encrypted(self, capture, sa):
        for packet in capture:
            try:
                copy = packet.__class__(str(packet))
                del copy[UDP].len
                copy = packet.__class__(str(copy))
                self.assert_equal(packet[UDP].len, copy[UDP].len,
                                  "UDP header length")
                self.assert_packet_checksums_valid(packet)
                self.assertIn(ESP, packet[IP])
                decrypt_pkt = sa.decrypt(packet[IP])
                self.assert_packet_checksums_valid(decrypt_pkt)
                self.assert_equal(decrypt_pkt[IP].src, self.pg1.remote_ip4,
                                  "encrypted packet source address")
                self.assert_equal(decrypt_pkt[IP].dst, self.tun_if.remote_ip4,
                                  "encrypted packet destination address")
            except Exception:
                self.logger.error(
                    ppp("Unexpected or invalid encrypted packet:", packet))
                raise

    @classmethod
    def config_esp_tun(cls, params):
        addr_type = params.addr_type
        scapy_tun_sa_id = params.scapy_tun_sa_id
        scapy_tun_spi = params.scapy_tun_spi
        vpp_tun_sa_id = params.vpp_tun_sa_id
        vpp_tun_spi = params.vpp_tun_spi
        auth_algo_vpp_id = params.auth_algo_vpp_id
        auth_key = params.auth_key
        crypt_algo_vpp_id = params.crypt_algo_vpp_id
        crypt_key = params.crypt_key
        addr_any = params.addr_any
        addr_bcast = params.addr_bcast
        cls.vapi.ipsec_sad_add_del_entry(scapy_tun_sa_id, scapy_tun_spi,
                                         auth_algo_vpp_id, auth_key,
                                         crypt_algo_vpp_id, crypt_key,
                                         cls.vpp_esp_protocol,
                                         cls.pg1.remote_addr_n[addr_type],
                                         cls.tun_if.remote_addr_n[addr_type],
                                         udp_encap=1)
        cls.vapi.ipsec_sad_add_del_entry(vpp_tun_sa_id, vpp_tun_spi,
                                         auth_algo_vpp_id, auth_key,
                                         crypt_algo_vpp_id, crypt_key,
                                         cls.vpp_esp_protocol,
                                         cls.tun_if.remote_addr_n[addr_type],
                                         cls.pg1.remote_addr_n[addr_type],
                                         udp_encap=1)
        l_startaddr = r_startaddr = socket.inet_pton(addr_type, addr_any)
        l_stopaddr = r_stopaddr = socket.inet_pton(addr_type, addr_bcast)
        cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, scapy_tun_sa_id,
                                         l_startaddr, l_stopaddr, r_startaddr,
                                         r_stopaddr,
                                         protocol=socket.IPPROTO_ESP)
        cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, scapy_tun_sa_id,
                                         l_startaddr, l_stopaddr, r_startaddr,
                                         r_stopaddr, is_outbound=0,
                                         protocol=socket.IPPROTO_ESP)
        cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, scapy_tun_sa_id,
                                         l_startaddr, l_stopaddr, r_startaddr,
                                         r_stopaddr, remote_port_start=4500,
                                         remote_port_stop=4500,
                                         protocol=socket.IPPROTO_UDP)
        cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, scapy_tun_sa_id,
                                         l_startaddr, l_stopaddr, r_startaddr,
                                         r_stopaddr, remote_port_start=4500,
                                         remote_port_stop=4500,
                                         protocol=socket.IPPROTO_UDP,
                                         is_outbound=0)
        l_startaddr = l_stopaddr = cls.tun_if.remote_addr_n[addr_type]
        r_startaddr = r_stopaddr = cls.pg1.remote_addr_n[addr_type]
        cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, vpp_tun_sa_id,
                                         l_startaddr, l_stopaddr, r_startaddr,
                                         r_stopaddr, priority=10, policy=3,
                                         is_outbound=0)
        cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, scapy_tun_sa_id,
                                         r_startaddr, r_stopaddr, l_startaddr,
                                         l_stopaddr, priority=10, policy=3)

    def test_ipsec_nat_tun(self):
        """ IPSec/NAT tunnel test case """
        p = self.ipv4_params
        scapy_tun_sa = SecurityAssociation(ESP, spi=p.scapy_tun_spi,
                                           crypt_algo=p.crypt_algo,
                                           crypt_key=p.crypt_key,
                                           auth_algo=p.auth_algo,
                                           auth_key=p.auth_key,
                                           tunnel_header=IP(
                                               src=self.pg1.remote_ip4,
                                               dst=self.tun_if.remote_ip4),
                                           nat_t_header=UDP(
                                               sport=4500,
                                               dport=4500))
        # in2out - from private network to public
        pkts = self.create_stream_plain(
            self.pg1.remote_mac, self.pg1.local_mac,
            self.pg1.remote_ip4, self.tun_if.remote_ip4)
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.tun_if.get_capture(len(pkts))
        self.verify_capture_encrypted(capture, scapy_tun_sa)

        vpp_tun_sa = SecurityAssociation(ESP,
                                         spi=p.vpp_tun_spi,
                                         crypt_algo=p.crypt_algo,
                                         crypt_key=p.crypt_key,
                                         auth_algo=p.auth_algo,
                                         auth_key=p.auth_key,
                                         tunnel_header=IP(
                                             src=self.tun_if.remote_ip4,
                                             dst=self.pg1.remote_ip4),
                                         nat_t_header=UDP(
                                             sport=4500,
                                             dport=4500))

        # out2in - from public network to private
        pkts = self.create_stream_encrypted(
            self.tun_if.remote_mac, self.tun_if.local_mac,
            self.tun_if.remote_ip4, self.pg1.remote_ip4, vpp_tun_sa)
        self.logger.info(ppc("Sending packets:", pkts))
        self.tun_if.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(len(pkts))
        self.verify_capture_plain(capture)
ture() self.send_packets(first_packets) self.send_packets(second_packets) self.send_packets(rest_of_packets) packets = self.dst_if.get_capture(len(self.pkt_infos)) self.verify_capture(packets) for send_if in self.send_ifs: send_if.assert_nothing_captured() self.logger.debug(self.vapi.ppcli("show trace")) self.logger.debug(self.vapi.ppcli("show ip6-full-reassembly details")) self.logger.debug(self.vapi.ppcli("show buffers")) self.vapi.cli("clear trace") self.pg_enable_capture() self.send_packets(first_packets) self.send_packets(second_packets) self.send_packets(rest_of_packets) packets = self.dst_if.get_capture(len(self.pkt_infos)) self.verify_capture(packets) for send_if in self.send_ifs: send_if.assert_nothing_captured() class TestIPv6SVReassembly(VppTestCase): """ IPv6 Shallow Virtual Reassembly """ @classmethod def setUpClass(cls): super().setUpClass() cls.create_pg_interfaces([0, 1]) cls.src_if = cls.pg0 cls.dst_if = cls.pg1 # setup all interfaces for i in cls.pg_interfaces: i.admin_up() i.config_ip6() i.resolve_ndp() def setUp(self): """ Test setup - force timeout on existing reassemblies """ super().setUp() self.vapi.ip_reassembly_enable_disable( sw_if_index=self.src_if.sw_if_index, enable_ip6=True, type=VppEnum.vl_api_ip_reass_type_t.IP_REASS_TYPE_SHALLOW_VIRTUAL) self.vapi.ip_reassembly_set( timeout_ms=0, max_reassemblies=1000, max_reassembly_length=1000, type=VppEnum.vl_api_ip_reass_type_t.IP_REASS_TYPE_SHALLOW_VIRTUAL, expire_walk_interval_ms=10, is_ip6=1) self.virtual_sleep(.25) self.vapi.ip_reassembly_set( timeout_ms=1000000, max_reassemblies=1000, max_reassembly_length=1000, type=VppEnum.vl_api_ip_reass_type_t.IP_REASS_TYPE_SHALLOW_VIRTUAL, expire_walk_interval_ms=10000, is_ip6=1) def tearDown(self): super().tearDown() self.logger.debug(self.vapi.ppcli("show ip6-sv-reassembly details")) self.logger.debug(self.vapi.ppcli("show buffers")) def test_basic(self): """ basic reassembly """ payload_len = 1000 payload = "" counter = 0 while len(payload) < payload_len: payload += "%u " % counter counter += 1 p = (Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) / IPv6(src=self.src_if.remote_ip6, dst=self.dst_if.remote_ip6) / UDP(sport=1234, dport=5678) / Raw(payload)) fragments = fragment_rfc8200(p, 1, payload_len/4) # send fragment #2 - should be cached inside reassembly self.pg_enable_capture() self.src_if.add_stream(fragments[1]) self.pg_start() self.logger.debug(self.vapi.ppcli("show ip6-sv-reassembly details")) self.logger.debug(self.vapi.ppcli("show buffers")) self.logger.debug(self.vapi.ppcli("show trace")) self.dst_if.assert_nothing_captured() # send fragment #1 - reassembly is finished now and both fragments # forwarded self.pg_enable_capture() self.src_if.add_stream(fragments[0]) self.pg_start() self.logger.debug(self.vapi.ppcli("show ip6-sv-reassembly details")) self.logger.debug(self.vapi.ppcli("show buffers")) self.logger.debug(self.vapi.ppcli("show trace")) c = self.dst_if.get_capture(2) for sent, recvd in zip([fragments[1], fragments[0]], c): self.assertEqual(sent[IPv6].src, recvd[IPv6].src) self.assertEqual(sent[IPv6].dst, recvd[IPv6].dst) self.assertEqual(sent[Raw].payload, recvd[Raw].payload) # send rest of fragments - should be immediately forwarded self.pg_enable_capture() self.src_if.add_stream(fragments[2:]) self.pg_start() c = self.dst_if.get_capture(len(fragments[2:])) for sent, recvd in zip(fragments[2:], c): self.assertEqual(sent[IPv6].src, recvd[IPv6].src) self.assertEqual(sent[IPv6].dst, recvd[IPv6].dst) self.assertEqual(sent[Raw].payload, recvd[Raw].payload) def test_verify_clear_trace_mid_reassembly(self): """ verify clear trace works mid-reassembly """ payload_len = 1000 payload = "" counter = 0 while len(payload) < payload_len: payload += "%u " % counter counter += 1 p = (Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) / IPv6(src=self.src_if.remote_ip6, dst=self.dst_if.remote_ip6) / UDP(sport=1234, dport=5678) / Raw(payload)) fragments = fragment_rfc8200(p, 1, payload_len/4) self.pg_enable_capture() self.src_if.add_stream(fragments[1]) self.pg_start() self.logger.debug(self.vapi.cli("show trace")) self.vapi.cli("clear trace") self.pg_enable_capture() self.src_if.add_stream(fragments[0]) self.pg_start() self.dst_if.get_capture(2) self.logger.debug(self.vapi.cli("show trace")) self.vapi.cli("clear trace") self.pg_enable_capture() self.src_if.add_stream(fragments[2:]) self.pg_start() self.dst_if.get_capture(len(fragments[2:])) def test_timeout(self): """ reassembly timeout """ payload_len = 1000 payload = "" counter = 0 while len(payload) < payload_len: payload += "%u " % counter counter += 1 p = (Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) / IPv6(src=self.src_if.remote_ip6, dst=self.dst_if.remote_ip6) / UDP(sport=1234, dport=5678) / Raw(payload)) fragments = fragment_rfc8200(p, 1, payload_len/4) self.vapi.ip_reassembly_set( timeout_ms=100, max_reassemblies=1000, max_reassembly_length=1000, expire_walk_interval_ms=50, is_ip6=1, type=VppEnum.vl_api_ip_reass_type_t.IP_REASS_TYPE_SHALLOW_VIRTUAL) # send fragments #2 and #1 - should be forwarded self.pg_enable_capture() self.src_if.add_stream(fragments[0:2]) self.pg_start() self.logger.debug(self.vapi.ppcli("show ip4-sv-reassembly details")) self.logger.debug(self.vapi.ppcli("show buffers")) self.logger.debug(self.vapi.ppcli("show trace")) c = self.dst_if.get_capture(2) for sent, recvd in zip([fragments[1], fragments[0]], c): self.assertEqual(sent[IPv6].src, recvd[IPv6].src) self.assertEqual(sent[IPv6].dst, recvd[IPv6].dst) self.assertEqual(sent[Raw].payload, recvd[Raw].payload) # wait for cleanup self.virtual_sleep(.25, "wait before sending rest of fragments") # send rest of fragments - shouldn't be forwarded self.pg_enable_capture() self.src_if.add_stream(fragments[2:]) self.pg_start() self.dst_if.assert_nothing_captured() def test_lru(self): """ reassembly reuses LRU element """ self.vapi.ip_reassembly_set( timeout_ms=1000000, max_reassemblies=1, max_reassembly_length=1000, type=VppEnum.vl_api_ip_reass_type_t.IP_REASS_TYPE_SHALLOW_VIRTUAL, is_ip6=1, expire_walk_interval_ms=10000) payload_len = 1000 payload = "" counter = 0 while len(payload) < payload_len: payload += "%u " % counter counter += 1 packet_count = 10 fragments = [f for i in range(packet_count) for p in (Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) / IPv6(src=self.src_if.remote_ip6, dst=self.dst_if.remote_ip6) / UDP(sport=1234, dport=5678) / Raw(payload)) for f in fragment_rfc8200(p, i, payload_len/4)] self.pg_enable_capture() self.src_if.add_stream(fragments) self.pg_start() c = self.dst_if.get_capture(len(fragments)) for sent, recvd in zip(fragments, c): self.assertEqual(sent[IPv6].src, recvd[IPv6].src) self.assertEqual(sent[IPv6].dst, recvd[IPv6].dst) self.assertEqual(sent[Raw].payload, recvd[Raw].payload) def test_one_fragment(self): """ whole packet in one fragment processed independently """ pkt = (Ether(src=self.src_if.local_mac, dst=self.src_if.remote_mac) / IPv6(src=self.src_if.remote_ip6, dst=self.dst_if.remote_ip6) / ICMPv6EchoRequest()/Raw('X' * 1600)) frags = fragment_rfc8200(pkt, 1, 400) # send a fragment with known id self.send_and_expect(self.src_if, [frags[0]], self.dst_if) # send an atomic fragment with same id - should be reassembled pkt = (Ether(src=self.src_if.local_mac, dst=self.src_if.remote_mac) / IPv6(src=self.src_if.remote_ip6, dst=self.dst_if.remote_ip6) / IPv6ExtHdrFragment(id=1)/ICMPv6EchoRequest()) rx = self.send_and_expect(self.src_if, [pkt], self.dst_if) # now forward packets matching original reassembly, should still work rx = self.send_and_expect(self.src_if, frags[1:], self.dst_if) def test_bunch_of_fragments(self): """ valid fragments followed by rogue fragments and atomic fragment""" pkt = (Ether(src=self.src_if.local_mac, dst=self.src_if.remote_mac) / IPv6(src=self.src_if.remote_ip6, dst=self.dst_if.remote_ip6) / ICMPv6EchoRequest()/Raw('X' * 1600)) frags = fragment_rfc8200(pkt, 1, 400) rx = self.send_and_expect(self.src_if, frags, self.dst_if) rogue = (Ether(src=self.src_if.local_mac, dst=self.src_if.remote_mac) / IPv6(src=self.src_if.remote_ip6, dst=self.dst_if.remote_ip6) / IPv6ExtHdrFragment(id=1, nh=58, offset=608)/Raw('X'*308)) self.send_and_expect(self.src_if, rogue*604, self.dst_if) pkt = (Ether(src=self.src_if.local_mac, dst=self.src_if.remote_mac) / IPv6(src=self.src_if.remote_ip6, dst=self.dst_if.remote_ip6) / IPv6ExtHdrFragment(id=1)/ICMPv6EchoRequest()) rx = self.send_and_expect(self.src_if, [pkt], self.dst_if) def test_truncated_fragment(self): """ truncated fragment """ pkt = (Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac) / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6, nh=44, plen=2) / IPv6ExtHdrFragment(nh=6)) self.send_and_assert_no_replies(self.pg0, [pkt], self.pg0) class TestIPv4ReassemblyLocalNode(VppTestCase): """ IPv4 Reassembly for packets coming to ip4-local node """ @classmethod def setUpClass(cls): super().setUpClass() cls.create_pg_interfaces([0]) cls.src_dst_if = cls.pg0 # setup all interfaces for i in cls.pg_interfaces: i.admin_up() i.config_ip4() i.resolve_arp() cls.padding = " abcdefghijklmn" cls.create_stream() cls.create_fragments() @classmethod def tearDownClass(cls): super().tearDownClass() def setUp(self): """ Test setup - force timeout on existing reassemblies """ super().setUp() self.vapi.ip_reassembly_set(timeout_ms=0, max_reassemblies=1000, max_reassembly_length=1000, expire_walk_interval_ms=10) self.virtual_sleep(.25) self.vapi.ip_reassembly_set(timeout_ms=1000000, max_reassemblies=1000, max_reassembly_length=1000, expire_walk_interval_ms=10000) def tearDown(self): super().tearDown() def show_commands_at_teardown(self): self.logger.debug(self.vapi.ppcli("show ip4-full-reassembly details")) self.logger.debug(self.vapi.ppcli("show buffers")) @classmethod def create_stream(cls, packet_count=test_packet_count): """Create input packet stream for defined interface. :param list packet_sizes: Required packet sizes. """ for i in range(0, packet_count): info = cls.create_packet_info(cls.src_dst_if, cls.src_dst_if) payload = cls.info_to_payload(info) p = (Ether(dst=cls.src_dst_if.local_mac, src=cls.src_dst_if.remote_mac) / IP(id=info.index, src=cls.src_dst_if.remote_ip4, dst=cls.src_dst_if.local_ip4) / ICMP(type='echo-request', id=1234) / Raw(payload)) cls.extend_packet(p, 1518, cls.padding) info.data = p @classmethod def create_fragments(cls): infos = cls._packet_infos cls.pkt_infos = [] for index, info in infos.items(): p = info.data # cls.logger.debug(ppp("Packet:", # p.__class__(scapy.compat.raw(p)))) fragments_300 = fragment_rfc791(p, 300) cls.pkt_infos.append((index, fragments_300)) cls.fragments_300 = [x for (_, frags) in cls.pkt_infos for x in frags] cls.logger.debug("Fragmented %s packets into %s 300-byte fragments" % (len(infos), len(cls.fragments_300))) def verify_capture(self, capture): """Verify captured packet stream. :param list capture: Captured packet stream. """ info = None seen = set() for packet in capture: try: self.logger.debug(ppp("Got packet:", packet)) ip = packet[IP] icmp = packet[ICMP] payload_info = self.payload_to_info(packet[Raw]) packet_index = payload_info.index if packet_index in seen: raise Exception(ppp("Duplicate packet received", packet)) seen.add(packet_index) self.assertEqual(payload_info.dst, self.src_dst_if.sw_if_index) info = self._packet_infos[packet_index] self.assertIsNotNone(info) self.assertEqual(packet_index, info.index) saved_packet = info.data self.assertEqual(ip.src, saved_packet[IP].dst) self.assertEqual(ip.dst, saved_packet[IP].src) self.assertEqual(icmp.type, 0) # echo reply self.assertEqual(icmp.id, saved_packet[ICMP].id) self.assertEqual(icmp.payload, saved_packet[ICMP].payload) except Exception: self.logger.error(ppp("Unexpected or invalid packet:", packet)) raise for index in self._packet_infos: self.assertIn(index, seen, "Packet with packet_index %d not received" % index) def test_reassembly(self): """ basic reassembly """ self.pg_enable_capture() self.src_dst_if.add_stream(self.fragments_300) self.pg_start() packets = self.src_dst_if.get_capture(len(self.pkt_infos)) self.verify_capture(packets) # run it all again to verify correctness self.pg_enable_capture() self.src_dst_if.add_stream(self.fragments_300) self.pg_start() packets = self.src_dst_if.get_capture(len(self.pkt_infos)) self.verify_capture(packets) class TestFIFReassembly(VppTestCase): """ Fragments in fragments reassembly """ @classmethod def setUpClass(cls): super().setUpClass() cls.create_pg_interfaces([0, 1]) cls.src_if = cls.pg0 cls.dst_if = cls.pg1 for i in cls.pg_interfaces: i.admin_up() i.config_ip4() i.resolve_arp() i.config_ip6() i.resolve_ndp() cls.packet_sizes = [64, 512, 1518, 9018] cls.padding = " abcdefghijklmn" @classmethod def tearDownClass(cls): super().tearDownClass() def setUp(self): """ Test setup - force timeout on existing reassemblies """ super().setUp() self.vapi.ip_reassembly_enable_disable( sw_if_index=self.src_if.sw_if_index, enable_ip4=True, enable_ip6=True) self.vapi.ip_reassembly_enable_disable( sw_if_index=self.dst_if.sw_if_index, enable_ip4=True, enable_ip6=True) self.vapi.ip_reassembly_set(timeout_ms=0, max_reassemblies=1000, max_reassembly_length=1000, expire_walk_interval_ms=10) self.vapi.ip_reassembly_set(timeout_ms=0, max_reassemblies=1000, max_reassembly_length=1000, expire_walk_interval_ms=10, is_ip6=1) self.virtual_sleep(.25) self.vapi.ip_reassembly_set(timeout_ms=1000000, max_reassemblies=1000, max_reassembly_length=1000, expire_walk_interval_ms=10000) self.vapi.ip_reassembly_set(timeout_ms=1000000, max_reassemblies=1000, max_reassembly_length=1000, expire_walk_interval_ms=10000, is_ip6=1) def tearDown(self): super().tearDown() def show_commands_at_teardown(self): self.logger.debug(self.vapi.ppcli("show ip4-full-reassembly details")) self.logger.debug(self.vapi.ppcli("show ip6-full-reassembly details")) self.logger.debug(self.vapi.ppcli("show buffers")) def verify_capture(self, capture, ip_class, dropped_packet_indexes=[]): """Verify captured packet stream. :param list capture: Captured packet stream. """ info = None seen = set() for packet in capture: try: self.logger.debug(ppp("Got packet:", packet)) ip = packet[ip_class] udp = packet[UDP] payload_info = self.payload_to_info(packet[Raw]) packet_index = payload_info.index self.assertTrue( packet_index not in dropped_packet_indexes, ppp("Packet received, but should be dropped:", packet)) if packet_index in seen: raise Exception(ppp("Duplicate packet received", packet)) seen.add(packet_index) self.assertEqual(payload_info.dst, self.dst_if.sw_if_index) info = self._packet_infos[packet_index] self.assertTrue(info is not None) self.assertEqual(packet_index, info.index) saved_packet = info.data self.assertEqual(ip.src, saved_packet[ip_class].src) self.assertEqual(ip.dst, saved_packet[ip_class].dst) self.assertEqual(udp.payload, saved_packet[UDP].payload) except Exception: self.logger.error(ppp("Unexpected or invalid packet:", packet)) raise for index in self._packet_infos: self.assertTrue(index in seen or index in dropped_packet_indexes, "Packet with packet_index %d not received" % index) def test_fif4(self): """ Fragments in fragments (4o4) """ # TODO this should be ideally in setUpClass, but then we hit a bug # with VppIpRoute incorrectly reporting it's present when it's not # so we need to manually remove the vpp config, thus we cannot have # it shared for multiple test cases self.tun_ip4 = "1.1.1.2" self.gre4 = VppGreInterface(self, self.src_if.local_ip4, self.tun_ip4) self.gre4.add_vpp_config() self.gre4.admin_up() self.gre4.config_ip4() self.vapi.ip_reassembly_enable_disable( sw_if_index=self.gre4.sw_if_index, enable_ip4=True) self.route4 = VppIpRoute(self, self.tun_ip4, 32, [VppRoutePath(self.src_if.remote_ip4, self.src_if.sw_if_index)]) self.route4.add_vpp_config() self.reset_packet_infos() for i in range(test_packet_count): info = self.create_packet_info(self.src_if, self.dst_if) payload = self.info_to_payload(info) # Ethernet header here is only for size calculation, thus it # doesn't matter how it's initialized. This is to ensure that # reassembled packet is not > 9000 bytes, so that it's not dropped p = (Ether() / IP(id=i, src=self.src_if.remote_ip4, dst=self.dst_if.remote_ip4) / UDP(sport=1234, dport=5678) / Raw(payload)) size = self.packet_sizes[(i // 2) % len(self.packet_sizes)] self.extend_packet(p, size, self.padding) info.data = p[IP] # use only IP part, without ethernet header fragments = [x for _, p in self._packet_infos.items() for x in fragment_rfc791(p.data, 400)] encapped_fragments = \ [Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) / IP(src=self.tun_ip4, dst=self.src_if.local_ip4) / GRE() / p for p in fragments] fragmented_encapped_fragments = \ [x for p in encapped_fragments for x in fragment_rfc791(p, 200)] self.src_if.add_stream(fragmented_encapped_fragments) self.pg_enable_capture(self.pg_interfaces) self.pg_start() self.src_if.assert_nothing_captured() packets = self.dst_if.get_capture(len(self._packet_infos)) self.verify_capture(packets, IP) # TODO remove gre vpp config by hand until VppIpRoute gets fixed # so that it's query_vpp_config() works as it should self.gre4.remove_vpp_config() self.logger.debug(self.vapi.ppcli("show interface")) def test_fif6(self): """ Fragments in fragments (6o6) """ # TODO this should be ideally in setUpClass, but then we hit a bug # with VppIpRoute incorrectly reporting it's present when it's not # so we need to manually remove the vpp config, thus we cannot have # it shared for multiple test cases self.tun_ip6 = "1002::1" self.gre6 = VppGreInterface(self, self.src_if.local_ip6, self.tun_ip6) self.gre6.add_vpp_config() self.gre6.admin_up() self.gre6.config_ip6() self.vapi.ip_reassembly_enable_disable( sw_if_index=self.gre6.sw_if_index, enable_ip6=True) self.route6 = VppIpRoute(self, self.tun_ip6, 128, [VppRoutePath( self.src_if.remote_ip6, self.src_if.sw_if_index)]) self.route6.add_vpp_config() self.reset_packet_infos() for i in range(test_packet_count): info = self.create_packet_info(self.src_if, self.dst_if) payload = self.info_to_payload(info) # Ethernet header here is only for size calculation, thus it # doesn't matter how it's initialized. This is to ensure that # reassembled packet is not > 9000 bytes, so that it's not dropped p = (Ether() / IPv6(src=self.src_if.remote_ip6, dst=self.dst_if.remote_ip6) / UDP(sport=1234, dport=5678) / Raw(payload)) size = self.packet_sizes[(i // 2) % len(self.packet_sizes)] self.extend_packet(p, size, self.padding) info.data = p[IPv6] # use only IPv6 part, without ethernet header fragments = [x for _, i in self._packet_infos.items() for x in fragment_rfc8200( i.data, i.index, 400)] encapped_fragments = \ [Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) / IPv6(src=self.tun_ip6, dst=self.src_if.local_ip6) / GRE() / p for p in fragments] fragmented_encapped_fragments = \ [x for p in encapped_fragments for x in ( fragment_rfc8200( p, 2 * len(self._packet_infos) + p[IPv6ExtHdrFragment].id, 200) if IPv6ExtHdrFragment in p else [p] ) ] self.src_if.add_stream(fragmented_encapped_fragments) self.pg_enable_capture(self.pg_interfaces) self.pg_start() self.src_if.assert_nothing_captured() packets = self.dst_if.get_capture(len(self._packet_infos)) self.verify_capture(packets, IPv6) # TODO remove gre vpp config by hand until VppIpRoute gets fixed # so that it's query_vpp_config() works as it should self.gre6.remove_vpp_config() if __name__ == '__main__': unittest.main(testRunner=VppTestRunner)