summaryrefslogtreecommitdiffstats
path: root/test/test_mpls.py
diff options
context:
space:
mode:
authorNeale Ranns <nranns@cisco.com>2016-11-01 10:05:08 +0000
committerDamjan Marion <dmarion.lists@gmail.com>2016-11-01 19:26:24 +0000
commit8fe8cc21d1e389d8e971a303e53c9e703aaaa0e0 (patch)
treedd42f4a0e7242ba9b2804dc2dd5619d2b837c13f /test/test_mpls.py
parent3b906b0d9b93a892831ce4d54d1d7ec3956ce2b4 (diff)
MPLS Exp-null Tests
Add some 'make test' unit tests for MPLS explicit NULL label handling. Fix the stacking of the MPLS load-balance result form the lookup onto the IPx lookup object. Change-Id: I890d1221b8e3dea99bcc714ed9d0154a5f602c52 Signed-off-by: Neale Ranns <nranns@cisco.com>
Diffstat (limited to 'test/test_mpls.py')
-rw-r--r--test/test_mpls.py209
1 files changed, 209 insertions, 0 deletions
diff --git a/test/test_mpls.py b/test/test_mpls.py
new file mode 100644
index 00000000000..45af470421e
--- /dev/null
+++ b/test/test_mpls.py
@@ -0,0 +1,209 @@
+#!/usr/bin/env python
+
+import unittest
+import socket
+from logging import *
+
+from framework import VppTestCase, VppTestRunner
+from vpp_sub_interface import VppSubInterface, VppDot1QSubint, VppDot1ADSubint
+
+from scapy.packet import Raw
+from scapy.layers.l2 import Ether, Dot1Q, ARP
+from scapy.layers.inet import IP, UDP
+from scapy.layers.inet6 import ICMPv6ND_NS, IPv6, UDP
+from scapy.contrib.mpls import MPLS
+
+class TestMPLS(VppTestCase):
+ """ MPLS Test Case """
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestMPLS, cls).setUpClass()
+
+ def setUp(self):
+ super(TestMPLS, self).setUp()
+
+ # create 2 pg interfaces
+ self.create_pg_interfaces(range(3))
+
+ # setup both interfaces
+ # assign them different tables.
+ table_id = 0
+
+ for i in self.pg_interfaces:
+ i.admin_up()
+ i.set_table_ip4(table_id)
+ i.set_table_ip6(table_id)
+ i.config_ip4()
+ i.config_ip6()
+ i.enable_mpls()
+ i.resolve_arp()
+ i.resolve_ndp()
+ table_id += 1
+
+ def tearDown(self):
+ super(TestMPLS, self).tearDown()
+
+ def create_stream_ip4(self, src_if, mpls_label, mpls_ttl):
+ pkts = []
+ for i in range(0, 257):
+ info = self.create_packet_info(src_if.sw_if_index,
+ src_if.sw_if_index)
+ payload = self.info_to_payload(info)
+ p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
+ MPLS(label=mpls_label, ttl=mpls_ttl) /
+ IP(src=src_if.remote_ip4, dst=src_if.remote_ip4) /
+ UDP(sport=1234, dport=1234) /
+ Raw(payload))
+ info.data = p.copy()
+ pkts.append(p)
+ return pkts
+
+ def create_stream_ip6(self, src_if, mpls_label, mpls_ttl):
+ pkts = []
+ for i in range(0, 257):
+ info = self.create_packet_info(src_if.sw_if_index,
+ src_if.sw_if_index)
+ payload = self.info_to_payload(info)
+ p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
+ MPLS(label=mpls_label, ttl=mpls_ttl) /
+ IPv6(src=src_if.remote_ip6, dst=src_if.remote_ip6) /
+ UDP(sport=1234, dport=1234) /
+ Raw(payload))
+ info.data = p.copy()
+ pkts.append(p)
+ return pkts
+
+ def verify_capture_ip4(self, src_if, capture, sent):
+ try:
+ self.assertEqual(len(capture), len(sent))
+
+ for i in range(len(capture)):
+ tx = sent[i]
+ rx = capture[i]
+
+ # the rx'd packet has the MPLS label popped
+ eth = rx[Ether];
+ self.assertEqual(eth.type, 0x800);
+
+ tx_ip = tx[IP]
+ rx_ip = rx[IP]
+
+ self.assertEqual(rx_ip.src, tx_ip.src)
+ self.assertEqual(rx_ip.dst, tx_ip.dst)
+ # IP processing post pop has decremented the TTL
+ self.assertEqual(rx_ip.ttl+1, tx_ip.ttl)
+
+ except:
+ raise;
+
+ def verify_capture_ip6(self, src_if, capture, sent):
+ try:
+ self.assertEqual(len(capture), len(sent))
+
+ for i in range(len(capture)):
+ tx = sent[i]
+ rx = capture[i]
+
+ # the rx'd packet has the MPLS label popped
+ eth = rx[Ether];
+ self.assertEqual(eth.type, 0x86DD);
+
+ tx_ip = tx[IPv6]
+ rx_ip = rx[IPv6]
+
+ self.assertEqual(rx_ip.src, tx_ip.src)
+ self.assertEqual(rx_ip.dst, tx_ip.dst)
+ # IP processing post pop has decremented the TTL
+ self.assertEqual(rx_ip.hlim + 1, tx_ip.hlim)
+
+ except:
+ raise;
+
+
+ def test_v4_exp_null(self):
+ """ MPLS V4 Explicit NULL test """
+
+ #
+ # The first test case has an MPLS TTL of 0
+ # all packet should be dropped
+ #
+ tx = self.create_stream_ip4(self.pg0, 0, 0)
+ self.pg0.add_stream(tx)
+
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ rx = self.pg0.get_capture()
+
+ try:
+ self.assertEqual(0, len(rx));
+ except:
+ error("MPLS TTL=0 packets forwarded")
+ error(packet.show())
+ raise
+
+ #
+ # a stream with a non-zero MPLS TTL
+ # PG0 is in the default table
+ #
+ self.vapi.cli("clear trace")
+ tx = self.create_stream_ip4(self.pg0, 0, 2)
+ self.pg0.add_stream(tx)
+
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ rx = self.pg0.get_capture()
+ self.verify_capture_ip4(self.pg0, rx, tx)
+
+ #
+ # a stream with a non-zero MPLS TTL
+ # PG1 is in table 1
+ # we are ensuring the post-pop lookup occurs in the VRF table
+ #
+ self.vapi.cli("clear trace")
+ tx = self.create_stream_ip4(self.pg1, 0, 2)
+ self.pg1.add_stream(tx)
+
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ rx = self.pg1.get_capture()
+ self.verify_capture_ip4(self.pg0, rx, tx)
+
+ def test_v6_exp_null(self):
+ """ MPLS V6 Explicit NULL test """
+
+ #
+ # a stream with a non-zero MPLS TTL
+ # PG0 is in the default table
+ #
+ self.vapi.cli("clear trace")
+ tx = self.create_stream_ip6(self.pg0, 2, 2)
+ self.pg0.add_stream(tx)
+
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ rx = self.pg0.get_capture()
+ self.verify_capture_ip6(self.pg0, rx, tx)
+
+ #
+ # a stream with a non-zero MPLS TTL
+ # PG1 is in table 1
+ # we are ensuring the post-pop lookup occurs in the VRF table
+ #
+ self.vapi.cli("clear trace")
+ tx = self.create_stream_ip6(self.pg1, 2, 2)
+ self.pg1.add_stream(tx)
+
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ rx = self.pg1.get_capture()
+ self.verify_capture_ip6(self.pg0, rx, tx)
+
+
+if __name__ == '__main__':
+ unittest.main(testRunner=VppTestRunner)