aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
authorNeale Ranns <nranns@cisco.com>2017-10-21 09:37:55 -0700
committerDamjan Marion <dmarion.lists@gmail.com>2017-10-31 08:06:03 +0000
commit4c7c8e55b03e21787cafb11cd49b9488c5ffef70 (patch)
tree7e853bdaf7c4e1cc9cb6c15e121b93944892be5e /test
parentb3d1b203579226ca5136b9d6a2744577d07cfcc6 (diff)
Refactor IP input checks for re-use at MPLS disposition
Change-Id: I7aafdecd6f370411138e6ab67b2ff72cda6e0666 Signed-off-by: Neale Ranns <nranns@cisco.com>
Diffstat (limited to 'test')
-rw-r--r--test/test_ip4.py135
-rw-r--r--test/test_ip6.py78
-rw-r--r--test/test_mpls.py23
3 files changed, 233 insertions, 3 deletions
diff --git a/test/test_ip4.py b/test/test_ip4.py
index 42fd1164a5f..b05635f95ee 100644
--- a/test/test_ip4.py
+++ b/test/test_ip4.py
@@ -1227,5 +1227,140 @@ class TestIPDeag(VppTestCase):
self.send_and_expect(self.pg0, pkts_src, self.pg2)
+class TestIPInput(VppTestCase):
+ """ IPv4 Input Exceptions """
+
+ def setUp(self):
+ super(TestIPInput, self).setUp()
+
+ self.create_pg_interfaces(range(2))
+
+ for i in self.pg_interfaces:
+ i.admin_up()
+ i.config_ip4()
+ i.resolve_arp()
+
+ def tearDown(self):
+ super(TestIPInput, self).tearDown()
+ for i in self.pg_interfaces:
+ i.unconfig_ip4()
+ i.admin_down()
+
+ def send_and_expect(self, input, pkts, output):
+ self.vapi.cli("clear trace")
+ input.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ rx = output.get_capture(len(pkts))
+ return rx
+
+ def send_and_assert_no_replies(self, intf, pkts, remark):
+ self.vapi.cli("clear trace")
+ intf.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ for i in self.pg_interfaces:
+ i.get_capture(0)
+ i.assert_nothing_captured(remark=remark)
+
+ def test_ip_input(self):
+ """ IP Input Exceptions """
+
+ # i can't find a way in scapy to construct an IP packet
+ # with a length less than the IP header length
+
+ #
+ # Packet too short - this is forwarded
+ #
+ p_short = (Ether(src=self.pg0.remote_mac,
+ dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4,
+ dst=self.pg1.remote_ip4,
+ len=40) /
+ UDP(sport=1234, dport=1234) /
+ Raw('\xa5' * 100))
+
+ rx = self.send_and_expect(self.pg0, p_short * 65, self.pg1)
+
+ #
+ # Packet too long - this is dropped
+ #
+ p_long = (Ether(src=self.pg0.remote_mac,
+ dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4,
+ dst=self.pg1.remote_ip4,
+ len=400) /
+ UDP(sport=1234, dport=1234) /
+ Raw('\xa5' * 100))
+
+ rx = self.send_and_assert_no_replies(self.pg0, p_long * 65,
+ "too long")
+
+ #
+ # bad chksum - this is dropped
+ #
+ p_chksum = (Ether(src=self.pg0.remote_mac,
+ dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4,
+ dst=self.pg1.remote_ip4,
+ chksum=400) /
+ UDP(sport=1234, dport=1234) /
+ Raw('\xa5' * 100))
+
+ rx = self.send_and_assert_no_replies(self.pg0, p_chksum * 65,
+ "bad checksum")
+
+ #
+ # bad version - this is dropped
+ #
+ p_ver = (Ether(src=self.pg0.remote_mac,
+ dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4,
+ dst=self.pg1.remote_ip4,
+ version=3) /
+ UDP(sport=1234, dport=1234) /
+ Raw('\xa5' * 100))
+
+ rx = self.send_and_assert_no_replies(self.pg0, p_ver * 65,
+ "funky version")
+
+ #
+ # fragment offset 1 - this is dropped
+ #
+ p_frag = (Ether(src=self.pg0.remote_mac,
+ dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4,
+ dst=self.pg1.remote_ip4,
+ frag=1) /
+ UDP(sport=1234, dport=1234) /
+ Raw('\xa5' * 100))
+
+ rx = self.send_and_assert_no_replies(self.pg0, p_frag * 65,
+ "frag offset")
+
+ #
+ # TTL expired packet
+ #
+ p_ttl = (Ether(src=self.pg0.remote_mac,
+ dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4,
+ dst=self.pg1.remote_ip4,
+ ttl=1) /
+ UDP(sport=1234, dport=1234) /
+ Raw('\xa5' * 100))
+
+ rx = self.send_and_expect(self.pg0, p_ttl * 65, self.pg0)
+
+ rx = rx[0]
+ icmp = rx[ICMP]
+
+ self.assertEqual(icmptypes[icmp.type], "time-exceeded")
+ self.assertEqual(icmpcodes[icmp.type][icmp.code],
+ "ttl-zero-during-transit")
+ self.assertEqual(icmp.src, self.pg0.remote_ip4)
+ self.assertEqual(icmp.dst, self.pg1.remote_ip4)
+
+ self.logger.error(self.vapi.cli("sh error"))
+
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)
diff --git a/test/test_ip6.py b/test/test_ip6.py
index dbe87465f7b..0a0d56cbb8d 100644
--- a/test/test_ip6.py
+++ b/test/test_ip6.py
@@ -16,7 +16,8 @@ from scapy.layers.l2 import Ether, Dot1Q
from scapy.layers.inet6 import IPv6, UDP, TCP, ICMPv6ND_NS, ICMPv6ND_RS, \
ICMPv6ND_RA, ICMPv6NDOptSrcLLAddr, getmacbyip6, ICMPv6MRD_Solicitation, \
ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo, \
- ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types
+ ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types, \
+ ICMPv6TimeExceeded
from util import ppp
from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \
@@ -1618,5 +1619,80 @@ class TestIP6Punt(VppTestCase):
is_add=0,
is_ip6=1)
+
+class TestIP6Input(VppTestCase):
+ """ IPv6 Input Exceptions """
+
+ def setUp(self):
+ super(TestIP6Input, self).setUp()
+
+ self.create_pg_interfaces(range(2))
+
+ for i in self.pg_interfaces:
+ i.admin_up()
+ i.config_ip6()
+ i.resolve_ndp()
+
+ def tearDown(self):
+ super(TestIP6Input, self).tearDown()
+ for i in self.pg_interfaces:
+ i.unconfig_ip6()
+ i.admin_down()
+
+ def send_and_expect(self, input, pkts, output):
+ self.vapi.cli("clear trace")
+ input.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ rx = output.get_capture(len(pkts))
+ return rx
+
+ def send_and_assert_no_replies(self, intf, pkts, remark):
+ self.vapi.cli("clear trace")
+ intf.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ for i in self.pg_interfaces:
+ i.get_capture(0)
+ i.assert_nothing_captured(remark=remark)
+
+ def test_ip_input(self):
+ """ IP6 Input Exceptions """
+
+ #
+ # bad version - this is dropped
+ #
+ p_version = (Ether(src=self.pg0.remote_mac,
+ dst=self.pg0.local_mac) /
+ IPv6(src=self.pg0.remote_ip6,
+ dst=self.pg1.remote_ip6,
+ version=3) /
+ UDP(sport=1234, dport=1234) /
+ Raw('\xa5' * 100))
+
+ self.send_and_assert_no_replies(self.pg0, p_version * 65,
+ "funky version")
+
+ #
+ # hop limit - IMCP replies
+ #
+ p_version = (Ether(src=self.pg0.remote_mac,
+ dst=self.pg0.local_mac) /
+ IPv6(src=self.pg0.remote_ip6,
+ dst=self.pg1.remote_ip6,
+ hlim=1) /
+ UDP(sport=1234, dport=1234) /
+ Raw('\xa5' * 100))
+
+ rx = self.send_and_expect(self.pg0, p_version * 65, self.pg0)
+ rx = rx[0]
+ icmp = rx[ICMPv6TimeExceeded]
+ self.assertEqual(icmp.type, 3)
+ # 0: "hop limit exceeded in transit",
+ self.assertEqual(icmp.code, 0)
+
+ self.logger.error(self.vapi.cli("sh error"))
+
+
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)
diff --git a/test/test_mpls.py b/test/test_mpls.py
index 460a32d1fc1..d265e85ed5e 100644
--- a/test/test_mpls.py
+++ b/test/test_mpls.py
@@ -106,6 +106,7 @@ class TestMPLS(VppTestCase):
ping=0,
ip_itf=None,
dst_ip=None,
+ chksum=None,
n=257):
self.reset_packet_infos()
pkts = []
@@ -133,6 +134,8 @@ class TestMPLS(VppTestCase):
dst=ip_itf.local_ip4) /
ICMP())
+ if chksum:
+ p[IP].chksum = chksum
info.data = p.copy()
pkts.append(p)
return pkts
@@ -152,7 +155,7 @@ class TestMPLS(VppTestCase):
return pkts
def create_stream_labelled_ip6(self, src_if, mpls_label, mpls_ttl,
- dst_ip=None):
+ dst_ip=None, hlim=64):
if dst_ip is None:
dst_ip = src_if.remote_ip6
self.reset_packet_infos()
@@ -162,7 +165,7 @@ class TestMPLS(VppTestCase):
payload = self.info_to_payload(info)
p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
MPLS(label=mpls_label, ttl=mpls_ttl) /
- IPv6(src=src_if.remote_ip6, dst=dst_ip) /
+ IPv6(src=src_if.remote_ip6, dst=dst_ip, hlim=hlim) /
UDP(sport=1234, dport=1234) /
Raw(payload))
info.data = p.copy()
@@ -1026,12 +1029,21 @@ class TestMPLS(VppTestCase):
self.verify_capture_ip4(self.pg1, rx, tx)
#
+ # disposed packets have an invalid IPv4 checkusm
+ #
+ tx = self.create_stream_labelled_ip4(self.pg0, [34],
+ dst_ip="232.1.1.1", n=65,
+ chksum=1)
+ self.send_and_assert_no_replies(self.pg0, tx, "Invalid Checksum")
+
+ #
# set the RPF-ID of the enrtry to not match the input packet's
#
route_232_1_1_1.update_rpf_id(56)
tx = self.create_stream_labelled_ip4(self.pg0, [34],
dst_ip="232.1.1.1")
self.send_and_assert_no_replies(self.pg0, tx, "RPF-ID drop 56")
+ self.logger.error(self.vapi.cli("sh error"))
def test_mcast_ip6_tail(self):
""" MPLS IPv6 Multicast Tail """
@@ -1092,6 +1104,13 @@ class TestMPLS(VppTestCase):
self.verify_capture_ip6(self.pg1, rx, tx)
#
+ # disposed packets have hop-limit = 1
+ #
+ tx = self.create_stream_labelled_ip6(self.pg0, [34], 255,
+ dst_ip="ff01::1", hlim=1)
+ self.send_and_assert_no_replies(self.pg0, tx, "Hop Limt Expired")
+
+ #
# set the RPF-ID of the enrtry to not match the input packet's
#
route_ff.update_rpf_id(56)