#!/usr/bin/env python

import unittest
import socket

from framework import VppTestCase, VppTestRunner
from vpp_ip import DpoProto
from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute, \
    VppMplsIpBind, VppIpMRoute, VppMRoutePath, \
    MRouteItfFlags, MRouteEntryFlags, VppIpTable, VppMplsTable, \
    VppMplsLabel, MplsLspMode, find_mpls_route
from vpp_mpls_tunnel_interface import VppMPLSTunnelInterface

import scapy.compat
from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP, ICMP
from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded
from scapy.contrib.mpls import MPLS


def verify_filter(capture, sent):
    """Strip IPv6 packets from ``capture`` when its length differs from
    ``sent``.

    The capture is only touched when the two lengths disagree; in that case
    every packet carrying an IPv6 layer is removed (the intent is to discard
    unsolicited IPv6 router advertisements).

    :param capture: received packets; modified in place.
    :param sent: packets that were transmitted; only its length is used.
    :returns: the same ``capture`` object.
    """
    if len(capture) != len(sent):
        # filter out any IPv6 RAs from the capture.  Walk a snapshot:
        # removing from the sequence being iterated skips the element that
        # follows each removal, so back-to-back RAs would survive.
        for p in list(capture):
            if p.haslayer(IPv6):
                capture.remove(p)
    return capture


def verify_mpls_stack(tst, rx, mpls_labels):
    """Assert that ``rx`` carries exactly the label stack ``mpls_labels``.

    :param tst: test case object providing ``assertEqual``.
    :param rx: received packet; must have Ether and MPLS layers.
    :param mpls_labels: expected labels, outermost first; each item exposes
        ``value``, ``exp`` and ``ttl``.
    """
    # the rx'd packet must still be MPLS encapsulated
    eth = rx[Ether]
    tst.assertEqual(eth.type, 0x8847)

    rx_mpls = rx[MPLS]
    last = len(mpls_labels) - 1

    for ii, label in enumerate(mpls_labels):
        tst.assertEqual(rx_mpls.label, label.value)
        tst.assertEqual(rx_mpls.cos, label.exp)
        tst.assertEqual(rx_mpls.ttl, label.ttl)

        if ii == last:
            # bottom of stack
            tst.assertEqual(rx_mpls.s, 1)
        else:
            # not end of stack
            tst.assertEqual(rx_mpls.s, 0)
            # pop the label to expose the next
            rx_mpls = rx_mpls[MPLS].payload


class TestMPLS(VppTestCase):
    """ MPLS Test Case """

    def setUp(self):
        """Create four pg interfaces, each bound to its own IP table and
        enabled for MPLS."""
        super(TestMPLS, self).setUp()

        # create 4 pg interfaces
        self.create_pg_interfaces(range(4))

        # setup all the interfaces
        # assign them different tables.
        table_id = 0
        self.tables = []

        tbl = VppMplsTable(self, 0)
        tbl.add_vpp_config()
        self.tables.append(tbl)

        for i in self.pg_interfaces:
            i.admin_up()

            # table 0 is never created here; only the non-zero IPv4/IPv6
            # tables are added.
            if table_id != 0:
                tbl = VppIpTable(self, table_id)
                tbl.add_vpp_config()
                self.tables.append(tbl)
                tbl = VppIpTable(self, table_id, is_ip6=1)
                tbl.add_vpp_config()
                self.tables.append(tbl)

            i.set_table_ip4(table_id)
            i.set_table_ip6(table_id)
            i.config_ip4()
            i.resolve_arp()
            i.config_ip6()
            i.resolve_ndp()
            i.enable_mpls()
            table_id += 1

    def tearDown(self):
        """Unconfigure every pg interface and move it back to table 0."""
        for i in self.pg_interfaces:
            i.unconfig_ip4()
            i.unconfig_ip6()
            i.ip6_disable()
            i.set_table_ip4(0)
            i.set_table_ip6(0)
            i.disable_mpls()
            i.admin_down()
        super(TestMPLS, self).tearDown()

    # the default of 64 matches the IP packet TTL default
    def create_stream_labelled_ip4(
            self,
            src_if,
            mpls_labels,
            ping=0,
            ip_itf=None,
            dst_ip=None,
            chksum=None,
            ip_ttl=64,
            n=257):
        """Build ``n`` MPLS-labelled IPv4 packets to inject on ``src_if``.

        :param src_if: interface whose MAC/IPv4 addresses fill the headers.
        :param mpls_labels: labels to push, outermost first; each item
            exposes ``value``, ``ttl`` and ``exp``.
        :param ping: when truthy the payload is an ICMP packet addressed
            from ``ip_itf.remote_ip4`` to ``ip_itf.local_ip4``; otherwise it
            is UDP (ports 1234) carrying the packet-info payload.
        :param ip_itf: interface supplying the addresses for the ping case.
        :param dst_ip: IPv4 destination for the UDP case; when falsy
            ``src_if.remote_ip4`` is used.
        :param chksum: when truthy, written into the IP header checksum.
        :param ip_ttl: TTL placed in the IP header.
        :param n: number of packets to create.
        :returns: list of the created packets.
        """
        self.reset_packet_infos()
        pkts = []
        for _ in range(n):
            info = self.create_packet_info(src_if, src_if)
            payload = self.info_to_payload(info)
            p = Ether(dst=src_if.local_mac, src=src_if.remote_mac)

            for label in mpls_labels:
                p = p / MPLS(label=label.value,
                             ttl=label.ttl,
                             cos=label.exp)

            if not ping:
                # no (or empty) destination: target the interface's peer
                dst = dst_ip if dst_ip else src_if.remote_ip4
                p = (p / IP(src=src_if.local_ip4, dst=dst, ttl=ip_ttl) /
                     UDP(sport=1234, dport=1234) /
                     Raw(payload))
            else:
                p = (p / IP(src=ip_itf.remote_ip4,
                            dst=ip_itf.local_ip4,
                            ttl=ip_ttl) /
                     ICMP())

            if chksum:
                p[IP].chksum = chksum

            info.data = p.copy()
            pkts.append(p)
        return pkts

    def create_stream_ip4(self, src_if, dst_ip, ip_ttl=64, ip_dscp=0):
        """Build 257 unlabelled IPv4/UDP packets from ``src_if``'s remote
        address to ``dst_ip``.

        :param ip_ttl: TTL placed in the IP header.
        :param ip_dscp: value written to the IP header's ``tos`` field.
        :returns: list of the created packets.
        """
        self.reset_packet_infos()
        pkts = []
        for _ in range(257):
            info = self.create_packet_info(src_if, src_if)
            payload = self.info_to_payload(info)
            p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
                 IP(src=src_if.remote_ip4, dst=dst_ip,
                    ttl=ip_ttl, tos=ip_dscp) /
                 UDP(sport=1234, dport=1234) /
                 Raw(payload))
            info.data = p.copy()
            pkts.append(p)
        return pkts

    def create_stream_ip6(self, src_if, dst_ip, ip_ttl=64, ip_dscp=0):
        """Build 257 unlabelled IPv6/UDP packets from ``src_if``'s remote
        address to ``dst_ip``.

        :param ip_ttl: value written to the IPv6 hop limit.
        :param ip_dscp: value written to the IPv6 ``tc`` field.
        :returns: list of the created packets.
        """
        self.reset_packet_infos()
        pkts = []
        for _ in range(257):
            info = self.create_packet_info(src_if, src_if)
            payload = self.info_to_payload(info)
            p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
                 IPv6(src=src_if.remote_ip6, dst=dst_ip,
                      hlim=ip_ttl, tc=ip_dscp) /
                 UDP(sport=1234, dport=1234) /
                 Raw(payload))
            info.data = p.copy()
            pkts.append(p)
        return pkts

    def create_stream_labelled_ip6(self, src_if, mpls_labels,
                                   hlim=64, dst_ip=None):
        """Build 257 MPLS-labelled IPv6/UDP packets to inject on ``src_if``.

        :param mpls_labels: labels to push, outermost first; each item
            exposes ``value``, ``ttl`` and ``exp``.
        :param hlim: IPv6 hop limit.
        :param dst_ip: IPv6 destination; ``src_if.remote_ip6`` when None.
        :returns: list of the created packets.
        """
        if dst_ip is None:
            dst_ip = src_if.remote_ip6
        self.reset_packet_infos()
        pkts = []
        for _ in range(257):
            info = self.create_packet_info(src_if, src_if)
            payload = self.info_to_payload(info)
            p = Ether(dst=src_if.local_mac, src=src_if.remote_mac)
            for label in mpls_labels:
                p = p / MPLS(label=label.value, ttl=label.ttl, cos=label.exp)

            p = p / (IPv6(src=src_if.remote_ip6, dst=dst_ip, hlim=hlim) /
                     UDP(sport=1234, dport=1234) /
                     Raw(payload))
            info.data = p.copy()
            pkts.append(p)
        return pkts

    def verify_capture_ip4(self, src_if, capture, sent, ping_resp=0,
                           ip_ttl=None, ip_dscp=0):
        try:
            capture = verify_filter(capture, sent)

            self.assertEqual(len(capture), len(sent))

            for i in range(len(capture)):
                tx = sent[i]
                rx = capture[i]

                # the rx'd packet has the MPLS label popped
                eth = rx[Ether]
                self.assertEqual(eth.type, 0x800)

                tx_ip = tx[IP]
                rx_ip = rx[IP]

                if not ping_resp:
                    self.assertEqual(rx_ip.src, tx_ip.src)
        # NOTE(review): the source is cut off at this point, inside the
        # ``try``.  The remaining checks and the original handler are not
        # visible and have NOT been reconstructed.  The no-op ``finally``
        # only closes the visible fragment syntactically - restore the real
        # tail of this method when it is available.
        finally:
            pass
#!/bin/sh
#
# Copy a directory tree into another directory, transforming some files on
# the way.  Work is done in three passes:
#   1. regular files are copied with tar (permissions preserved);
#   2. *.copyimgspp files are run through spp; the output drops the suffix;
#   3. *.copyimgsh files are sourced from inside the matching destination
#      directory (they are not copied themselves).
#
# Limitation: file and directory names are gathered into whitespace
# separated lists, so names containing whitespace or glob characters are
# not supported.

if [ $# -lt 2 ]; then
    cat - <<EOF
$0 FROM-DIR TO-DIR ENVIRONMENT
Copies files from one directory to another with possible
transformations.
Files named FILE.copyimgspp will be transformed via the spp preprocessor
subject to environment definitions. Source FILE.copyimgspp results in
destination file FILE in the corresponding destination directory.
Files named FILE.copyimgsh are run as shell scripts in (i.e. via chdir)
the corresponding destination directory (and not copied).
First regular files are copied. Then transformations are performed.
Finally, shell scripts are run.
EOF
    exit 1
fi

FROM_DIR=$1
TO_DIR=$2

# Without this check a bad FROM_DIR would make the scans below walk the
# current directory instead.
if [ ! -d "$FROM_DIR" ]; then
    echo "$0: source directory '$FROM_DIR' does not exist" >&2
    exit 1
fi

# find(1) predicates that skip editor backups and VCS metadata.  The
# patterns carry their own quotes, which is why find is run through eval.
# NOTE(review): CVS keeps its metadata in directories named "CVS", which
# the '*/.CVS*' pattern does not match - confirm whether that is intended.
FILTER=" -and -not -name '*~'"
FILTER="$FILTER -and -not -name '.*~'"
FILTER="$FILTER -and -not -path '*/.git*'"
FILTER="$FILTER -and -not -path '*/.svn*'"
FILTER="$FILTER -and -not -path '*/.CVS*'"

# 'cd && find' (not 'cd; find') so a failed cd can never scan the wrong tree.
FROM_FILES=$(cd "$FROM_DIR" && eval "find . -not -type d $FILTER") || exit 1
FROM_DIRS=$(cd "$FROM_DIR" && eval "find . -type d $FILTER") || exit 1

# Sort the files into the three processing classes.
COPY_FILES=
SPP_FILES=
SH_FILES=
for f in $FROM_FILES; do
    case $f in
        *.copyimgspp) SPP_FILES="$SPP_FILES $f" ;;
        *.copyimgsh)  SH_FILES="$SH_FILES $f" ;;
        *)            COPY_FILES="$COPY_FILES $f" ;;
    esac
done

# Make destination directories.
mkdir -p "$TO_DIR" || exit 1
for d in $FROM_DIRS; do
    mkdir -p "$TO_DIR/$d" || exit 1
done

# Copy files.  $COPY_FILES is deliberately unquoted: it is a list of paths.
if [ -n "$COPY_FILES" ]; then
    tar -cf - -C "$FROM_DIR" $COPY_FILES \
        | tar --preserve-permissions -xf - -C "$TO_DIR" || exit 1
fi

# Use spp to transform any spp files
for f in $SPP_FILES; do
    d=$(dirname "$f")
    b=$(basename "$f" .copyimgspp)
    mkdir -p "$TO_DIR/$d" || exit 1
    t=$TO_DIR/$d/$b
    spp -o "$t" "$FROM_DIR/$f" || exit 1
done

# Now that all files have been copied/created we run any shell scripts.
# The scripts are sourced after a chdir, so they need an absolute path.
ABS_FROM_DIR=$(cd "$FROM_DIR" && pwd) || exit 1
if [ -n "$SH_FILES" ]; then
    # Allow directory to define some functions
    if [ -f "$FROM_DIR/copyimgsh-functions.sh" ]; then
        . "$FROM_DIR/copyimgsh-functions.sh"
    fi
    for f in $SH_FILES; do
        d=$(dirname "$f")
        b=$(basename "$f")
        mkdir -p "$TO_DIR/$d" || exit 1
        # Subshell keeps the chdir local; '&&' prevents the script from
        # running in the wrong directory when the chdir fails.
        (cd "$TO_DIR/$d" && . "$ABS_FROM_DIR/$d/$b") || exit 1
    done
fi