summaryrefslogtreecommitdiffstats
path: root/test/test_interface_crud.py
diff options
context:
space:
mode:
authorMatej Klotton <mklotton@cisco.com>2016-12-22 11:06:56 +0100
committerDamjan Marion <dmarion.lists@gmail.com>2017-01-09 13:33:39 +0000
commit8d8a1da52907246c3218ea7c60ab8ca0569d0a83 (patch)
treed988a4071df8bcf65d604587dfa69c4d6c331b25 /test/test_interface_crud.py
parent0120e235ad0103c1318b6be5065d79d372439bba (diff)
make test: Loopback interface CRUD test
Change-Id: I0581da7a682bfe4dd6520ecf1b2ea6bd8c20b1b3 Signed-off-by: Matej Klotton <mklotton@cisco.com>
Diffstat (limited to 'test/test_interface_crud.py')
-rw-r--r--test/test_interface_crud.py151
1 files changed, 151 insertions, 0 deletions
diff --git a/test/test_interface_crud.py b/test/test_interface_crud.py
new file mode 100644
index 00000000000..63917047ed4
--- /dev/null
+++ b/test/test_interface_crud.py
@@ -0,0 +1,151 @@
+#!/usr/bin/env python
+"""CRUD tests of APIs (Create, Read, Update, Delete) HLD:
+
+- interface up/down/add/delete - interface type:
+ - pg (TBD)
+ - loopback
+ - vhostuser (TBD)
+ - af_packet (TBD)
+ - netmap (TBD)
+ - tuntap (root privileges needed)
+ - vxlan (TBD)
+"""
+
+import unittest
+
+from scapy.layers.inet import IP, ICMP
+from scapy.layers.l2 import Ether
+
+from framework import VppTestCase, VppTestRunner
+
+
+class TestLoopbackInterfaceCRUD(VppTestCase):
+ """CRUD Loopback
+
+ """
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestLoopbackInterfaceCRUD, cls).setUpClass()
+ try:
+ cls.create_pg_interfaces(range(1))
+ for i in cls.pg_interfaces:
+ i.config_ip4()
+ i.resolve_arp()
+ except:
+ cls.tearDownClass()
+ raise
+
+ @staticmethod
+ def create_icmp_stream(src_if, dst_ifs):
+ """
+
+ :param VppInterface src_if: Packets are send to this interface,
+ using this interfaces remote host.
+ :param list dst_ifs: IPv4 ICMP requests are send to interfaces
+ addresses.
+ :return: List of generated packets.
+ """
+ pkts = []
+ for i in dst_ifs:
+ p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
+ IP(src=src_if.remote_ip4, dst=i.local_ip4) /
+ ICMP(id=i.sw_if_index, type='echo-request'))
+ pkts.append(p)
+ return pkts
+
+ def verify_icmp(self, capture, request_src_if, dst_ifs):
+ """
+
+ :param capture: Capture to verify.
+ :param VppInterface request_src_if: Interface where was send packets.
+ :param list dst_ifs: Interfaces where was generated IPv4 ICMP requests.
+ """
+ rcvd_icmp_pkts = []
+ for pkt in capture:
+ try:
+ ip = pkt[IP]
+ icmp = pkt[ICMP]
+ except IndexError:
+ pass
+ else:
+ info = (ip.src, ip.dst, icmp.type, icmp.id)
+ rcvd_icmp_pkts.append(info)
+
+ for i in dst_ifs:
+ # 0 - icmp echo response
+ info = (i.local_ip4, request_src_if.remote_ip4, 0, i.sw_if_index)
+ self.assertIn(info, rcvd_icmp_pkts)
+
+ def test_crud(self):
+ # create
+ loopbacks = self.create_loopback_interfaces(range(20))
+ for i in loopbacks:
+ i.local_ip4_prefix_len = 32
+ i.config_ip4()
+ i.admin_up()
+
+ # read (check sw if dump, ip4 fib, ip6 fib)
+ if_dump = self.vapi.sw_interface_dump()
+ fib4_dump = self.vapi.ip_fib_dump()
+ for i in loopbacks:
+ self.assertTrue(i.is_interface_config_in_dump(if_dump))
+ self.assertTrue(i.is_ip4_entry_in_fib_dump(fib4_dump))
+
+ # check ping
+ stream = self.create_icmp_stream(self.pg0, loopbacks)
+ self.pg0.add_stream(stream)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(expected_count=len(stream))
+
+ self.verify_icmp(capture, self.pg0, loopbacks)
+
+ # delete
+ for i in loopbacks:
+ i.remove_vpp_config()
+
+ # read (check not in sw if dump, ip4 fib, ip6 fib)
+ if_dump = self.vapi.sw_interface_dump()
+ fib4_dump = self.vapi.ip_fib_dump()
+ for i in loopbacks:
+ self.assertFalse(i.is_interface_config_in_dump(if_dump))
+ self.assertFalse(i.is_ip4_entry_in_fib_dump(fib4_dump))
+
+ # check not ping
+ stream = self.create_icmp_stream(self.pg0, loopbacks)
+ self.pg0.add_stream(stream)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg0.assert_nothing_captured()
+
+ def test_down(self):
+ # create
+ loopbacks = self.create_loopback_interfaces(range(20))
+ for i in loopbacks:
+ i.local_ip4_prefix_len = 32
+ i.config_ip4()
+ i.admin_up()
+
+ # disable
+ for i in loopbacks:
+ i.admin_down()
+ i.unconfig_ip4()
+
+ # read (check not in sw if dump, ip4 fib, ip6 fib)
+ if_dump = self.vapi.sw_interface_dump()
+ fib4_dump = self.vapi.ip_fib_dump()
+ for i in loopbacks:
+ self.assertTrue(i.is_interface_config_in_dump(if_dump))
+ self.assertFalse(i.is_ip4_entry_in_fib_dump(fib4_dump))
+
+ # check not ping
+ stream = self.create_icmp_stream(self.pg0, loopbacks)
+ self.pg0.add_stream(stream)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg0.assert_nothing_captured()
+
+
+if __name__ == '__main__':
+ unittest.main(testRunner=VppTestRunner)