#!/usr/bin/env python import unittest from logging import * from framework import VppTestCase, VppTestRunner from vpp_sub_interface import VppDot1QSubint from vpp_gre_interface import VppGreInterface, VppGre6Interface from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto, VppIpTable from vpp_papi_provider import L2_VTR_OP from scapy.packet import Raw from scapy.layers.l2 import Ether, Dot1Q, GRE from scapy.layers.inet import IP, UDP from scapy.layers.inet6 import IPv6 from scapy.volatile import RandMAC, RandIP from util import ppp, ppc class GreTunnelTypes: TT_L3 = 0 TT_TEB = 1 TT_ERSPAN = 2 class TestGRE(VppTestCase): """ GRE Test Case """ @classmethod def setUpClass(cls): super(TestGRE, cls).setUpClass() def setUp(self): super(TestGRE, self).setUp() # create 3 pg interfaces - set one in a non-default table. self.create_pg_interfaces(range(3)) self.tbl = VppIpTable(self, 1) self.tbl.add_vpp_config() self.pg1.set_table_ip4(1) for i in self.pg_interfaces: i.admin_up() self.pg0.config_ip4() self.pg0.resolve_arp() self.pg1.config_ip4() self.pg1.resolve_arp() self.pg2.config_ip6() self.pg2.resolve_ndp() def tearDown(self): for i in self.pg_interfaces: i.unconfig_ip4() i.unconfig_ip6() i.admin_down() self.pg1.set_table_ip4(0) super(TestGRE, self).tearDown() def create_stream_ip4(self, src_if, src_ip, dst_ip): pkts = [] for i in range(0, 257): info = self.create_packet_info(src_if, src_if) payload = self.info_to_payload(info) p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) / IP(src=src_ip, dst=dst_ip) / UDP(sport=1234, dport=1234) / Raw(payload)) info.data = p.copy() pkts.append(p) return pkts def create_stream_ip6(self, src_if, src_ip, dst_ip): pkts = [] for i in range(0, 257): info = self.create_packet_info(src_if, src_if) payload = self.info_to_payload(info) p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) / IPv6(src=src_ip, dst=dst_ip) / UDP(sport=1234, dport=1234) / Raw(payload)) info.data = p.copy() pkts.append(p) return pkts def 
create_tunnel_stream_4o4(self, src_if, tunnel_src, tunnel_dst, src_ip, dst_ip): pkts = [] for i in range(0, 257): info = self.create_packet_info(src_if, src_if) payload = self.info_to_payload(info) p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) / IP(src=tunnel_src, dst=tunnel_dst) / GRE() / IP(src=src_ip, dst=dst_ip) / UDP(sport=1234, dport=1234) / Raw(payload)) info.data = p.copy() pkts.append(p) return pkts def create_tunnel_stream_6o4(self, src_if, tunnel_src, tunnel_dst, src_ip, dst_ip): pkts = [] for i in range(0, 257): info = self.create_packet_info(src_if, src_if) payload = self.info_to_payload(info) p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) / IP(src=tunnel_src, dst=tunnel_dst) / GRE() / IPv6(src=src_ip, dst=dst_ip) / UDP(sport=1234, dport=1234) / Raw(payload)) info.data = p.copy() pkts.append(p) return pkts def create_tunnel_stream_6o6(self, src_if, tunnel_src, tunnel_dst, src_ip, dst_ip): pkts = [] for i in range(0, 257): info = self.create_packet_info(src_if, src_if) payload = self.info_to_payload(info) p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) / IPv6(src=tunnel_src, dst=tunnel_dst) / GRE() / IPv6(src=src_ip, dst=dst_ip) / UDP(sport=1234, dport=1234) / Raw(payload)) info.data = p.copy() pkts.append(p) return pkts def create_tunnel_stream_l2o4(self, src_if, tunnel_src, tunnel_dst): pkts = [] for i in range(0, 257): info = self.create_packet_info(src_if, src_if) payload = self.info_to_payload(info) p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) / IP(src=tunnel_src, dst=tunnel_dst) / GRE() / Ether(dst=RandMAC('*:*:*:*:*:*'), src=RandMAC('*:*:*:*:*:*')) / IP(src=str(RandIP()), dst=str(RandIP())) / UDP(sport=1234, dport=1234) / Raw(payload)) info.data = p.copy() pkts.append(p) return pkts def create_tunnel_stream_vlano4(self, src_if, tunnel_src, tunnel_dst, vlan): pkts = [] for i in range(0, 257): info = self.create_packet_info(src_if, src_if) payload = self.info_to_payload(info) p = (Ether(dst=src_if.local_mac, 
src=src_if.remote_mac) / IP(src=tunnel_src, dst=tunnel_dst) / GRE() / Ether(dst=RandMAC('*:*:*:*:*:*'), src=RandMAC('*:*:*:*:*:*')) / Dot1Q(vlan=vlan) / IP(src=str(RandIP()), dst=str(RandIP())) / UDP(sport=1234, dport=1234) / Raw(payload)) info.data = p.copy() pkts.append(p) return pkts def verify_tunneled_4o4(self, src_if, capture, sent, tunnel_src, tunnel_dst): self.assertEqual(len(capture), len(sent)) for i in range(len(capture)): try: tx = sent[i] rx = capture[i] tx_ip = tx[IP] rx_ip = rx[IP] self.assertEqual(rx_i
# Process this file with autoconf to produce a configure script.
AC_INIT([libvppinfra], [1.0])
AC_CONFIG_AUX_DIR([config])
AC_CONFIG_HEADERS([config/config.h])
AC_CANONICAL_BUILD
AC_CANONICAL_HOST
AM_INIT_AUTOMAKE([gnu no-dist-gzip dist-bzip2])
AM_SILENT_RULES([yes])

######################################################################
dnl ------------
dnl Check CFLAGS, CC
dnl ------------
dnl The overrides below are deliberately processed before the program
dnl checks.  The compiler, assembler and libtool probes all read CC,
dnl CFLAGS and LDFLAGS, so the user supplied values have to be in place
dnl first, otherwise the probes run against a different toolchain than
dnl the one that ends up in the generated Makefile.
AC_ARG_WITH([cc],
            [AS_HELP_STRING([--with-cc],[Set CC for use as C compiler.])],
            [CC="$with_cc"])

AC_ARG_WITH([cflags],
            [AS_HELP_STRING([--with-cflags],[Set CFLAGS for use by C compiler.])],
            [CFLAGS="$with_cflags"])

AC_ARG_WITH([ldflags],
            [AS_HELP_STRING([--with-ldflags],[Set LDFLAGS for linking.])],
            [LDFLAGS="$with_ldflags"])

# Checks for programs.
AC_PROG_CC
AM_PROG_AS
AM_PROG_LIBTOOL
######################################################################
dnl Unit tests are off by default and switched on with --enable-tests.
dnl The action-if-given branch runs for --enable-tests, --enable-tests=ARG
dnl and --disable-tests alike, so the decision has to be taken from the
dnl value in enableval and not from the mere presence of the option.
AC_ARG_ENABLE([tests],
              [AS_HELP_STRING([--enable-tests],[Enable unit tests])],
              [AS_IF([test "x$enableval" != "xno"],
                     [enable_tests=1],
                     [enable_tests=0])],
              [enable_tests=0])
AM_CONDITIONAL([ENABLE_TESTS], [test "$enable_tests" = "1"])
dnl Select the unix flavour of clib.  An explicit --with-unix or
dnl --without-unix is taken as given; when the option is absent the
dnl default is derived from host_os (yes on darwin and linux, no elsewhere).
AC_ARG_WITH([unix],
            [AS_HELP_STRING([--with-unix],[Compile unix version of clib])],
            [],
            [AS_CASE([$host_os],
                     [darwin*|linux*], [with_unix=yes],
                     [with_unix=no])])
AM_CONDITIONAL([WITH_UNIX], [test "$with_unix" = "yes"])
## Enable 64-bit vector lengths
dnl with_vec64 is normalised to 1 or 0 because it is pasted straight into
dnl the -DCLIB_VEC64 define below.  The action-if-given branch also runs
dnl for --without-vec64, so withval must be inspected to keep that
dnl spelling from switching the feature on.
AC_ARG_WITH([vec64],
            [AS_HELP_STRING([--with-vec64],[Enable 64-bit vector lengths])],
            [AS_IF([test "x$withval" != "xno"],
                   [with_vec64=1],
                   [with_vec64=0])],
            [with_vec64=0])
AC_SUBST([VEC64], [-DCLIB_VEC64=${with_vec64}])
# Register Makefile as the only generated output file, then finish the
# script: the output step must stay last so every check above has run.
AC_CONFIG_FILES([Makefile])
AC_OUTPUT