#!/usr/bin/env python3 """IP{4,6} over IP{v,6} tunnel functional tests""" import unittest from scapy.layers.inet6 import IPv6, Ether, IP, UDP, IPv6ExtHdrFragment, Raw from scapy.all import fragment, fragment6, RandShort, defragment6 from framework import VppTestCase, VppTestRunner from vpp_ip import DpoProto from vpp_ip_route import VppIpRoute, VppRoutePath, VppIpTable, FibPathProto from vpp_ipip_tun_interface import VppIpIpTunInterface from vpp_teib import VppTeib from vpp_papi import VppEnum from socket import AF_INET, AF_INET6, inet_pton from util import reassemble4 """ Testipip is a subclass of VPPTestCase classes. IPIP tests. """ def ipip_add_tunnel(test, src, dst, table_id=0, dscp=0x0, flags=0): """ Add a IPIP tunnel """ return test.vapi.ipip_add_tunnel( tunnel={ 'src': src, 'dst': dst, 'table_id': table_id, 'instance': 0xffffffff, 'dscp': dscp, 'flags': flags } ) # the number of packets to send when injecting traffic. # a multiple of 8 minus one, so we test all by 8/4/2/1 loops N_PACKETS = 64 - 1 class TestIPIP(VppTestCase): """ IPIP Test Case """ @classmethod def setUpClass(cls): super(TestIPIP, cls).setUpClass() cls.create_pg_interfaces(range(2)) cls.interfaces = list(cls.pg_interfaces) @classmethod def tearDownClass(cls): super(TestIPIP, cls).tearDownClass() def setUp(self): super(TestIPIP, self).setUp() for i in self.interfaces: i.admin_up() i.config_ip4() i.config_ip6() i.disable_ipv6_ra() i.resolve_arp() i.resolve_ndp() def tearDown(self): super(TestIPIP, self).tearDown() if not self.vpp_dead: for i in self.pg_interfaces: i.unconfig_ip4() i.unconfig_ip6() i.admin_down() def validate(self, rx, expected): self.assertEqual(rx, expected.__class__(expected)) def generate_ip4_frags(self, payload_length, fragment_size): p_ether = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) p_payload = UDP(sport=1234, dport=1234) / self.payload(payload_length) p_ip4 = IP(src="1.2.3.4", dst=self.pg0.remote_ip4) outer_ip4 = (p_ether / IP(src=self.pg1.remote_ip4, 
id=RandShort(), dst=self.pg0.local_ip4) / p_ip4 / p_payload) frags = fragment(outer_ip4, fragment_size) p4_reply = (p_ip4 / p_payload) p4_reply.ttl -= 1 return frags, p4_reply def verify_ip4ip4_encaps(self, a, p_ip4s, p_ip4_encaps): for i, p_ip4 in enumerate(p_ip4s): p_ip4.dst = a p4 = (self.p_ether / p_ip4 / self.p_payload) p_ip4_inner = p_ip4 p_ip4_inner.ttl -= 1 p4_reply = (p_ip4_encaps[i] / p_ip4_inner / self.p_payload) p4_reply.ttl -= 1 p4_reply.id = 0 rx = self.send_and_expect(self.pg0, p4 * N_PACKETS, self.pg1) for p in rx: self.validate(p[1], p4_reply) self.assert_packet_checksums_valid(p) def verify_ip6ip4_encaps(self, a, p_ip6s, p_ip4_encaps): for i, p_ip6 in enumerate(p_ip6s): p_ip6.dst = a p6 = (self.p_ether / p_ip6 / self.p_payload) p_inner_ip6 = p_ip6 p_inner_ip6.hlim -= 1 p6_reply = (p_ip4_encaps[i] / p_inner_ip6 / self.p_payload) p6_reply.ttl -= 1 rx = self.send_and_expect(self.pg0, p6 * N_PACKETS, self.pg1) for p in rx: self.validate(p[1], p6_reply) self.assert_packet_checksums_valid(p) def test_ipip4(self): """ ip{v4,v6} over ip4 test """ self.pg1.generate_remote_hosts(5) self.pg1.configure_ipv4_neighbors() e = VppEnum.vl_api_tunnel_encap_decap_flags_t d = VppEnum.vl_api_ip_dscp_t self.p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) self.p_payload = UDP(sport=1234, dport=1234) / Raw(b'X' * 100) # create a TOS byte by shifting a DSCP code point 2 bits. those 2 bits # are for the ECN. 
dscp = d.IP_API_DSCP_AF31 << 2 ecn = 3 dscp_ecn = d.IP_API_DSCP_AF31 << 2 | ecn # IPv4 transport that copies the DCSP from the payload tun_dscp = VppIpIpTunInterface( self, self.pg0, self.pg0.local_ip4, self.pg1.remote_hosts[0].ip4, flags=e.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_DSCP) tun_dscp.add_vpp_config() # IPv4 transport that copies the DCSP and ECN from the payload tun_dscp_ecn = VppIpIpTunInterface( self, self.pg0, self.pg0.local_ip4, self.pg1.remote_hosts[1].ip4, flags=(e.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_DSCP | e.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_ECN)) tun_dscp_ecn.add_vpp_config() # IPv4 transport that copies the ECN from the payload and sets the # DF bit on encap. copies the ECN on decap tun_ecn = VppIpIpTunInterface( self, self.pg0, self.pg0.local_ip4, self.pg1.remote_hosts[2].ip4, flags=(e.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_ECN | e.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_SET_DF | e.TUNNEL_API_ENCAP_DECAP_FLAG_DECAP_COPY_ECN)) tun_ecn.add_vpp_config() # IPv4 transport that sets a fixed DSCP in the encap and copies # the DF bit tun = VppIpIpTunInterface( self, self.pg0, self.pg0.local_ip4, self.pg1.remote_hosts[3].ip4, dscp=d.IP_API_DSCP_AF11, flags=e.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_DF) tun.add_vpp_config() # array of all the tunnels tuns = [tun_dscp, tun_dscp_ecn, tun_ecn, tun] # addresses for prefixes routed via each tunnel a4s = ["" for i in range(len(tuns))] a6s = ["" for i in range(len(tuns))] # IP headers with each combination of DSCp/ECN tested p_ip6s = [IPv6(src="1::1", dst="DEAD::1", nh='UDP', tc=dscp), IPv6(src="1::1", dst="DEAD::1", nh='UDP', tc=dscp_ecn), IPv6(src="1::1", dst="DEAD::1", nh='UDP', tc=ecn), IPv6(src="1::1", dst="DEAD::1", nh='UDP', tc=0xff)] p_ip4s = [IP(src="1.2.3.4", dst="130.67.0.1", tos=dscp, flags='DF'), IP(src="1.2.3.4", dst="130.67.0.1", tos=dscp_ecn), IP(src="1.2.3.4", dst="130.67.0.1", tos=ecn), IP(src="1.2.3.4", dst="130.67.0.1", tos=0xff)] # Configure each tunnel for i, t in enumerate(tuns): # Set 
interface up and enable IP on it self.vapi.sw_interface_set_flags(t.sw_if_index, 1) self.vapi.sw_interface_set_unnumbered( sw_if_index=self.pg0.sw_if_index, unnumbered_sw_if_index=t.sw_if_index) # prefix for route / destination address for packets a4s[i] = "130.67.%d.0" % i a6s[i] = "dead:%d::" % i # Add IPv4 and IPv6 routes via tunnel interface ip4_via_tunnel = VppIpRoute( self, a4s[i], 24, [VppRoutePath("0.0.0.0", t.sw_if_index, proto=FibPathProto.FIB_PATH_NH_PROTO_IP4)]) ip4_via_tunnel.add_vpp_config() ip6_via_tunnel = VppIpRoute( self, a6s[i], 64, [VppRoutePath("::", t.sw_if_index, proto=FibPathProto.FIB_PATH_NH_PROTO_IP6)]) ip6_via_tunnel.add_vpp_config() # # Encapsulation # # tun_dscp copies only the dscp # expected TC values are thus only the DCSP value is present from the # inner exp_tcs = [dscp, dscp, 0, 0xfc] p_ip44_encaps = [IP(src=self.pg0.local_ip4, dst=tun_dscp.dst, tos=tc) for tc in exp_tcs] p_ip64_encaps = [IP(src=self.pg0.local_ip4, dst=tun_dscp.dst, proto='ipv6', id=0, tos=tc) for tc in exp_tcs] # IPv4 in to IPv4 tunnel self.verify_ip4ip4_encaps(a4s[0], p_ip4s, p_ip44_encaps) # IPv6 in to IPv4 tunnel self.verify_ip6ip4_encaps(a6s[0], p_ip6s, p_ip64_encaps) # tun_dscp_ecn copies the dscp and the ecn exp_tcs = [dscp, dscp_ecn, ecn, 0xff] p_ip44_encaps = [IP(src=self.pg0.local_ip4, dst=tun_dscp_ecn.dst, tos=tc) for tc in exp_tcs] p_ip64_encaps = [IP(src=self.pg0.local_ip4, dst=tun_dscp_ecn.dst, proto='ipv6', id=0, tos=tc) for tc in exp_tcs] self.verify_ip4ip4_encaps(a4s[1], p_ip4s, p_ip44_encaps) self.verify_ip6ip4_encaps(a6s[1], p_ip6s, p_ip64_encaps) # tun_ecn copies only the ecn and always sets DF exp_tcs = [0, ecn, ecn, ecn] p_ip44_encaps = [IP(src=self.pg0.local_ip4, dst=tun_ecn.dst, flags='DF', tos=tc) for tc in exp_tcs] p_ip64_encaps = [IP(src=self.pg0.local_ip4, dst=tun_ecn.dst, flags='DF', proto='ipv6', id=0, tos=tc) for tc in exp_tcs] self.verify_ip4ip4_encaps(a4s[2], p_ip4s, p_ip44_encaps) self.verify_ip6ip4_encaps(a6s[2], p_ip6s, 
p_ip64_encaps) # tun sets a fixed dscp and copies DF fixed_dscp = tun.dscp << 2 flags = ['DF', 0, 0, 0] p_ip44_encaps = [IP(src=self.pg0.local_ip4, dst=tun.dst, flags=f, tos=fixed_dscp) for f in flags] p_ip64_encaps = [IP(src=self.pg0.local_ip4, dst=tun.dst, proto='ipv6', id=0, tos=fixed_dscp) for i in range(len(p_ip4s))] self.verify_ip4ip4_encaps(a4s[3], p_ip4s, p_ip44_encaps) self.verify_ip6ip4_encaps(a6s[3], p_ip6s, p_ip64_encaps) # # Decapsulation # n_packets_decapped = 0 self.p_ether = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) # IPv4 tunnel to IPv4 tcs = [0, dscp, dscp_ecn, ecn] # one overlay packet and all combinations of its encap p_ip4 = IP(src="1.2.3.4", dst=self.pg0.remote_ip4) p_ip4_encaps = [IP(src=tun.dst,
/*
 * Copyright (c) 2015 Cisco and/or its affiliates.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
/*
  Copyright (c) 2005 Eliot Dresselhaus

  Permission is hereby granted, free of charge, to any person obtaining
  a copy of this software and associated documentation files (the
  "Software"), to deal in the Software without restriction, including
  without limitation the rights to use, copy, modify, merge, publish,
  distribute, sublicense, and/or sell copies of the Software, and to
  permit persons to whom the Software is furnished to do so, subject to
  the following conditions:

  The above copyright notice and this permission notice shall be
  included in all copies or substantial portions of the Software.

  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
  EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
  MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
  NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
  LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
  OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
  WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

#ifndef included_phash_h
#define included_phash_h

#include <vppinfra/hash.h>	/* for Bob's mixing functions */

/* A single key together with the two-part (A, B) hash code associated
   with it.  phash_main_t.keys is a vector of these. */
typedef struct
{
  /* Maybe either pointer to vector or inline word.
     NOTE(review): how the word is interpreted is presumably decided by
     the key callbacks in phash_main_t -- confirm against phash.c. */
  uword key;

  /* Hash code (A, B). */
  u32 a, b;
} phash_key_t;

/* Table indexed by B: one entry per value of the B half of the hash
   code.  Entries are reset with phash_tabb_free. */
typedef struct
{
  /* Vector of key indices with this same value of B.
     This is a vppinfra vector; phash_tabb_free releases it with vec_free.
     NOTE(review): the indices presumably refer to phash_main_t.keys --
     confirm against phash.c. */
  u32 *keys;

  /* hash=a^tabb[b].val_b */
  u32 val_b;

  /* High watermark of who has visited this map node. */
  u32 water_b;
} phash_tabb_t;

/* Reset one tabb entry: release the key-index vector it owns and clear
   both of its scalar fields (val_b and water_b) to zero. */
always_inline void
phash_tabb_free (phash_tabb_t * b)
{
  vec_free (b->keys);
  b->water_b = 0;
  b->val_b = 0;
}

/* One element of the queue indexed by q (see phash_main_t.tabq).  Each
   element records which B occupies a hash and how a parent queue entry's
   tab[b] value would have to change to take that hash over.
   NOTE(review): the queue is presumably the work list of the search in
   phash_find_perfect_hash -- confirm against phash.c. */
typedef struct
{
  /* b that currently occupies this hash */
  u32 b_q;

  /* Queue position of parent that could use this hash. */
  u32 parent_q;

  /* What to change parent tab[b] to use this hash. */
  u32 newval_q;

  /* Original value of tab[b]. */
  u32 oldval_q;
} phash_tabq_t;

/* State for computing and using a perfect hash over a set of keys.
   It bundles the tables that outlive the search (tab, and scramble when
   PHASH_FLAG_USE_SCRAMBLE is set), the caller supplied keys and key
   callbacks, and the scratch tables that
   phash_main_free_working_memory releases (tabb, tabq, tabh,
   tabb_sort). */
typedef struct
{
  /* NOTE(review): presumably the bit widths of the A, B and scramble (S)
     index ranges, plus the shift used to extract A -- confirm against
     phash.c. */
  u8 a_bits, b_bits, s_bits, a_shift;
  /* NOTE(review): presumably the mask used to extract B -- confirm. */
  u32 b_mask;
  /* Vector released only by phash_main_free. */
  u32 *tab;
  /* Vector released by phash_main_free_working_memory unless
     PHASH_FLAG_USE_SCRAMBLE is set in flags. */
  u32 *scramble;

  /* Seed value for hash mixer. */
  u64 hash_seed;

  /* Bitwise OR of the PHASH_FLAG_* values defined below. */
  u32 flags;

  /* Key functions want 64 bit keys.
     Use hash_mix64 rather than hash_mix32.
     PHASH_FLAG_MIX32 evaluates to 0: it only names the default and
     cannot be tested with &. */
#define PHASH_FLAG_MIX64		(1 << 0)
#define PHASH_FLAG_MIX32		(0 << 0)

  /* When b_bits is large enough (>= 12) we scramble. */
#define PHASH_FLAG_USE_SCRAMBLE		(1 << 1)

  /* Slow mode gives smaller tables but at the expense of more run time.
     PHASH_FLAG_SLOW_MODE evaluates to 0 (the default when the fast-mode
     bit is clear). */
#define PHASH_FLAG_SLOW_MODE		(0 << 2)
#define PHASH_FLAG_FAST_MODE		(1 << 2)

  /* Generate minimal perfect hash instead of perfect hash.
     PHASH_FLAG_NON_MINIMAL evaluates to 0 (the default when the minimal
     bit is clear). */
#define PHASH_FLAG_NON_MINIMAL		(0 << 3)
#define PHASH_FLAG_MINIMAL		(1 << 3)

  /* vec_len (keys) for minimal hash;
     1 << s_bits for non-minimal hash. */
  u32 hash_max;

  /* Vector of keys. */
  phash_key_t *keys;

  /* Used by callbacks to identify keys.
     Passed as the first argument of each callback below. */
  void *private;

  /* Key comparison callback. */
  int (*key_is_equal) (void *private, uword key1, uword key2);

  /* Callback to reduce single key -> hash seeds. */
  void (*key_seed1) (void *private, uword key, void *seed);

  /* Callback to reduce two keys -> hash seeds. */
  void (*key_seed2) (void *private, uword key1, uword key2, void *seed);

  /* Stuff used to compute perfect hash. */
  u32 random_seed;

  /* Stuff indexed by B. */
  phash_tabb_t *tabb;

  /* Table of B ordered by number of keys in tabb[b]. */
  u32 *tabb_sort;

  /* Unique key (or ~0 if none) for a given hash
     H = A ^ scramble[tab[B].val_b]. */
  u32 *tabh;

  /* Stuff indexed by q. */
  phash_tabq_t *tabq;

  /* Stats.
     NOTE(review): presumably counts of seed attempts and of calls to the
     perfect-hash search routine -- confirm against phash.c. */
  u32 n_seed_trials, n_perfect_calls;
} phash_main_t;

/* Release the scratch vectors of PM: tabb, tabq, tabh and tabb_sort.
   The scramble vector is released too, except when
   PHASH_FLAG_USE_SCRAMBLE is set in pm->flags, in which case it is left
   allocated.  pm->tab and pm->keys are not touched.

   NOTE(review): only the tabb vector itself is freed here; the keys
   vector inside each tabb entry is not.  Presumably callers run
   phash_tabb_free on every entry first -- confirm against phash.c. */
always_inline void
phash_main_free_working_memory (phash_main_t * pm)
{
  int keep_scramble = (pm->flags & PHASH_FLAG_USE_SCRAMBLE) != 0;

  vec_free (pm->tabb);
  vec_free (pm->tabq);
  vec_free (pm->tabh);
  vec_free (pm->tabb_sort);

  if (keep_scramble)
    return;

  vec_free (pm->scramble);
}

/* Release every vector owned by PM and reset the whole structure to
   zero, leaving it ready for reuse.

   PM must point to a valid phash_main_t; its vector members may be
   null. */
always_inline void
phash_main_free (phash_main_t * pm)
{
  phash_main_free_working_memory (pm);

  /* phash_main_free_working_memory keeps the scramble vector when
     PHASH_FLAG_USE_SCRAMBLE is set.  Free it here, otherwise the memset
     below would wipe the only pointer to it and leak the vector.  When
     it was already released the pointer is null and vec_free does
     nothing. */
  vec_free (pm->scramble);

  vec_free (pm->tab);
  vec_free (pm->keys);

  /* Clears flags, callbacks, seeds and statistics as well. */
  memset (pm, 0, sizeof (pm[0]));
}

/* Slow hash computation for general keys.
   Takes the hash state PM and a key word; returns the hash as a uword. */
uword phash_hash_slow (phash_main_t * pm, uword key);

/* Main routine to compute perfect hash.
   Returns a clib_error_t pointer.
   NOTE(review): presumably 0 on success, per the usual vppinfra
   convention -- confirm against phash.c. */
clib_error_t *phash_find_perfect_hash (phash_main_t * pm);

/* Validates that hash is indeed perfect.
   Returns a clib_error_t pointer (presumably 0 when the hash is valid --
   confirm against phash.c). */
clib_error_t *phash_validate (phash_main_t * pm);

/* Unit test.
   Reads its arguments from INPUT; the meaning of the int result is not
   visible from this header. */
int phash_test_main (unformat_input_t * input);

#endif /* included_phash_h */

/*
 * fd.io coding-style-patch-verification: ON
 *
 * Local Variables:
 * eval: (c-set-style "gnu")
 * End:
 */
e(len(p_ip4s))] self.verify_ip4ip6_encaps(a4s[3], p_ip4s, p_ip6_encaps) self.verify_ip6ip6_encaps(a6s[3], p_ip6s, p_ip6_encaps) # # Decapsulation # n_packets_decapped = self.statistics.get_err_counter( '/err/ipip6-input/packets decapsulated') self.p_ether = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) # IPv6 tunnel to IPv4 tcs = [0, dscp, dscp_ecn, ecn] # one overlay packet and all combinations of its encap p_ip4 = IP(src="1.2.3.4", dst=self.pg0.remote_ip4) p_ip6_encaps = [IPv6(src=tun.dst, dst=self.pg0.local_ip6, tc=tc) for tc in tcs] # for each encap tun will produce the same inner packet because it does # not copy up fields from the payload for p_ip6_encap in p_ip6_encaps: p6 = (self.p_ether / p_ip6_encap / p_ip4 / self.p_payload) p4_reply = (p_ip4 / self.p_payload) p4_reply.ttl -= 1 rx = self.send_and_expect(self.pg1, p6 * N_PACKETS, self.pg0) n_packets_decapped += N_PACKETS for p in rx: self.validate(p[1], p4_reply) self.assert_packet_checksums_valid(p) err = self.statistics.get_err_counter( '/err/ipip6-input/packets decapsulated') self.assertEqual(err, n_packets_decapped) # tun_ecn copies the ECN bits from the encap to the inner p_ip6_encaps = [IPv6(src=tun_ecn.dst, dst=self.pg0.local_ip6, tc=tc) for tc in tcs] p_ip4_replys = [p_ip4.copy() for i in range(len(p_ip6_encaps))] p_ip4_replys[2].tos = ecn p_ip4_replys[3].tos = ecn for i, p_ip6_encap in enumerate(p_ip6_encaps): p6 = (self.p_ether / p_ip6_encap / p_ip4 / self.p_payload) p4_reply = (p_ip4_replys[i] / self.p_payload) p4_reply.ttl -= 1 rx = self.send_and_expect(self.pg1, p6 * N_PACKETS, self.pg0) n_packets_decapped += N_PACKETS for p in rx: self.validate(p[1], p4_reply) self.assert_packet_checksums_valid(p) err = self.statistics.get_err_counter( '/err/ipip6-input/packets decapsulated') self.assertEqual(err, n_packets_decapped) # IPv6 tunnel to IPv6 # for each encap tun will produce the same inner packet because it does # not copy up fields from the payload p_ip6_encaps = [IPv6(src=tun.dst, 
dst=self.pg0.local_ip6, tc=tc) for tc in tcs] p_ip6 = IPv6(src="1:2:3::4", dst=self.pg0.remote_ip6) for p_ip6_encap in p_ip6_encaps: p6 = (self.p_ether / p_ip6_encap / p_ip6 / self.p_payload) p6_reply = (p_ip6 / self.p_payload) p6_reply.hlim = 63 rx = self.send_and_expect(self.pg1, p6 * N_PACKETS, self.pg0) n_packets_decapped += N_PACKETS for p in rx: self.validate(p[1], p6_reply) self.assert_packet_checksums_valid(p) err = self.statistics.get_err_counter( '/err/ipip6-input/packets decapsulated') self.assertEqual(err, n_packets_decapped) # IPv6 tunnel to IPv6 # tun_ecn copies the ECN bits from the encap to the inner p_ip6_encaps = [IPv6(src=tun_ecn.dst, dst=self.pg0.local_ip6, tc=tc) for tc in tcs] p_ip6 = IPv6(src="1:2:3::4", dst=self.pg0.remote_ip6) p_ip6_replys = [p_ip6.copy() for i in range(len(p_ip6_encaps))] p_ip6_replys[2].tc = ecn p_ip6_replys[3].tc = ecn for i, p_ip6_encap in enumerate(p_ip6_encaps): p6 = (self.p_ether / p_ip6_encap / p_ip6 / self.p_payload) p6_reply = (p_ip6_replys[i] / self.p_payload) p6_reply.hlim = 63 rx = self.send_and_expect(self.pg1, p6 * N_PACKETS, self.pg0) n_packets_decapped += N_PACKETS for p in rx: self.validate(p[1], p6_reply) self.assert_packet_checksums_valid(p) err = self.statistics.get_err_counter( '/err/ipip6-input/packets decapsulated') self.assertEqual(err, n_packets_decapped) def test_frag(self): """ ip{v4,v6} over ip6 test frag """ p_ether = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) p_ip6 = IPv6(src="1::1", dst="DEAD::1", tc=42, nh='UDP') p_ip4 = IP(src="1.2.3.4", dst=self.pg0.remote_ip4) p_payload = UDP(sport=1234, dport=1234) # # Fragmentation / Reassembly and Re-fragmentation # rv = self.vapi.ip_reassembly_enable_disable( sw_if_index=self.pg1.sw_if_index, enable_ip6=1) self.vapi.ip_reassembly_set(timeout_ms=1000, max_reassemblies=1000, max_reassembly_length=1000, expire_walk_interval_ms=10000, is_ip6=1) # Send lots of fragments, verify reassembled packet before_cnt = self.statistics.get_err_counter( 
'/err/ipip6-input/packets decapsulated') frags, p6_reply = self.generate_ip6_frags(3131, 1400) f = [] for i in range(0, 1000): f.extend(frags) self.pg1.add_stream(f) self.pg_enable_capture() self.pg_start() rx = self.pg0.get_capture(1000) for p in rx: self.validate(p[1], p6_reply) cnt = self.statistics.get_err_counter( '/err/ipip6-input/packets decapsulated') self.assertEqual(cnt, before_cnt + 1000) f = [] r = [] # TODO: Check out why reassembly of atomic fragments don't work for i in range(10, 90): frags, p6_reply = self.generate_ip6_frags(i * 100, 1000) f.extend(frags) r.extend(p6_reply) self.pg_enable_capture() self.pg1.add_stream(f) self.pg_start() rx = self.pg0.get_capture(80) i = 0 for p in rx: self.validate(p[1], r[i]) i += 1 # Simple fragmentation p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [1280, 0, 0, 0]) # IPv6 in to IPv6 tunnel p_payload = UDP(sport=1234, dport=1234) / self.payload(1300) p6 = (p_ether / p_ip6 / p_payload) p6_reply = (IPv6(src=self.pg0.local_ip6, dst=self.pg1.remote_ip6, hlim=63) / p_ip6 / p_payload) p6_reply[1].hlim -= 1 self.pg_enable_capture() self.pg0.add_stream(p6) self.pg_start() rx = self.pg1.get_capture(2) # Scapy defragment doesn't deal well with multiple layers # of same type / Ethernet header first f = [p[1] for p in rx] reass_pkt = defragment6(f) self.validate(reass_pkt, p6_reply) # Now try with re-fragmentation # # Send large fragments to tunnel head-end, for the tunnel head end # to reassemble and then refragment out the tunnel again. 
# Hair-pinning # self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [1280, 0, 0, 0]) frags, p6_reply = self.generate_ip6_hairpin_frags(8000, 1200) self.pg_enable_capture() self.pg1.add_stream(frags) self.pg_start() rx = self.pg1.get_capture(7) f = [p[1] for p in rx] reass_pkt = defragment6(f) p6_reply.id = 256 self.validate(reass_pkt, p6_reply) def test_ipip_create(self): """ ipip create / delete interface test """ rv = ipip_add_tunnel(self, '1.2.3.4', '2.3.4.5') sw_if_index = rv.sw_if_index self.vapi.ipip_del_tunnel(sw_if_index) def test_ipip_vrf_create(self): """ ipip create / delete interface VRF test """ t = VppIpTable(self, 20) t.add_vpp_config() rv = ipip_add_tunnel(self, '1.2.3.4', '2.3.4.5', table_id=20) sw_if_index = rv.sw_if_index self.vapi.ipip_del_tunnel(sw_if_index) def payload(self, len): return 'x' * len if __name__ == '__main__': unittest.main(testRunner=VppTestRunner)