1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
|
import socket
from scapy.layers.inet import IP, UDP, ICMP
from scapy.layers.inet6 import IPv6
from scapy.layers.l2 import Ether, GRE
from scapy.packet import Raw
from framework import VppTestCase
from util import ppp
""" TestPing is a subclass of VPPTestCase classes.
Basic test for sanity check of the ping.
"""
class TestPing(VppTestCase):
""" Ping Test Case """
@classmethod
def setUpClass(cls):
super(TestPing, cls).setUpClass()
try:
cls.create_pg_interfaces(range(2))
cls.interfaces = list(cls.pg_interfaces)
for i in cls.interfaces:
i.admin_up()
i.config_ip4()
i.config_ip6()
i.disable_ipv6_ra()
i.resolve_arp()
i.resolve_ndp()
except Exception:
super(TestPing, cls).tearDownClass()
raise
def tearDown(self):
super(TestPing, self).tearDown()
if not self.vpp_dead:
self.vapi.cli("show hardware")
def test_ping_basic(self):
""" basic ping test """
try:
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
self.logger.info(self.vapi.cli("show ip arp"))
self.logger.info(self.vapi.cli("show ip6 neighbors"))
remote_ip4 = self.pg1.remote_ip4
ping_cmd = "ping " + remote_ip4 + " interval 0.01 repeat 10"
ret = self.vapi.cli(ping_cmd)
self.logger.info(ret)
out = self.pg1.get_capture(10)
icmp_id = None
icmp_seq = 1
for p in out:
ip = p[IP]
self.assertEqual(ip.version, 4)
self.assertEqual(ip.flags, 0)
self.assertEqual(ip.src, self.pg1.local_ip4)
self.assertEqual(ip.dst, self.pg1.remote_ip4)
self.assertEqual(ip.proto, 1)
self.assertEqual(len(ip.options), 0)
self.assertGreaterEqual(ip.ttl, 254)
icmp = p[ICMP]
self.assertEqual(icmp.type, 8)
self.assertEqual(icmp.code, 0)
self.assertEqual(icmp.seq, icmp_seq)
icmp_seq = icmp_seq + 1
if icmp_id is None:
icmp_id = icmp.id
else:
self.assertEqual(icmp.id, icmp_id)
finally:
self.vapi.cli("show error")
def test_ping_burst(self):
""" burst ping test """
try:
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
self.logger.info(self.vapi.cli("show ip arp"))
self.logger.info(self.vapi.cli("show ip6 neighbors"))
remote_ip4 = self.pg1.remote_ip4
ping_cmd = "ping " + remote_ip4 + " interval 0.01 burst 3"
ret = self.vapi.cli(ping_cmd)
self.logger.info(ret)
out = self.pg1.get_capture(3*5)
icmp_id = None
icmp_seq = 1
count = 0
for p in out:
ip = p[IP]
self.assertEqual(ip.version, 4)
self.assertEqual(ip.flags, 0)
self.assertEqual(ip.src, self.pg1.local_ip4)
self.assertEqual(ip.dst, self.pg1.remote_ip4)
self.assertEqual(ip.proto, 1)
self.assertEqual(len(ip.options), 0)
self.assertGreaterEqual(ip.ttl, 254)
icmp = p[ICMP]
self.assertEqual(icmp.type, 8)
self.assertEqual(icmp.code, 0)
self.assertEqual(icmp.seq, icmp_seq)
count = count + 1
if count >= 3:
icmp_seq = icmp_seq + 1
count = 0
if icmp_id is None:
icmp_id = icmp.id
else:
self.assertEqual(icmp.id, icmp_id)
finally:
self.vapi.cli("show error")
Verify receipt of encapsulated frames on pg0
"""
self.pg1.add_stream([self.frame_reply])
self.pg0.enable_capture()
self.pg_start()
# Pick first received frame and check if it's corectly encapsulated.
out = self.pg0.get_capture(1)
pkt = out[0]
self.check_encapsulation(pkt, self.single_tunnel_bd)
# payload = self.decapsulate(pkt)
# self.assert_eq_pkts(payload, self.frame_reply)
def test_ucast_flood(self):
""" Unicast flood test
Send frames from pg3
Verify receipt of encapsulated frames on pg0
"""
self.pg3.add_stream([self.frame_reply])
self.pg0.enable_capture()
self.pg_start()
# Get packet from each tunnel and assert it's corectly encapsulated.
out = self.pg0.get_capture(self.n_ucast_tunnels)
for pkt in out:
self.check_encapsulation(pkt, self.ucast_flood_bd, True)
# payload = self.decapsulate(pkt)
# self.assert_eq_pkts(payload, self.frame_reply)
def test_mcast_flood(self):
""" Multicast flood test
Send frames from pg2
Verify receipt of encapsulated frames on pg0
"""
self.pg2.add_stream([self.frame_reply])
self.pg0.enable_capture()
self.pg_start()
# Pick first received frame and check if it's corectly encapsulated.
out = self.pg0.get_capture(1)
pkt = out[0]
self.check_encapsulation(pkt, self.mcast_flood_bd,
local_only=False, mcast_pkt=True)
# payload = self.decapsulate(pkt)
# self.assert_eq_pkts(payload, self.frame_reply)
@classmethod
def create_gtpu_flood_test_bd(cls, teid, n_ucast_tunnels):
# Create 10 ucast gtpu tunnels under bd
ip_range_start = 10
ip_range_end = ip_range_start + n_ucast_tunnels
next_hop_address = cls.pg0.remote_ip4n
for dest_ip4n in ip4n_range(next_hop_address, ip_range_start,
ip_range_end):
# add host route so dest_ip4n will not be resolved
cls.vapi.ip_add_del_route(dst_address=dest_ip4n,
dst_address_length=32,
next_hop_address=next_hop_address)
r = cls.vapi.gtpu_add_del_tunnel(
src_addr=cls.pg0.local_ip4n,
dst_addr=dest_ip4n,
teid=teid)
cls.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=r.sw_if_index,
bd_id=teid)
@classmethod
def add_del_shared_mcast_dst_load(cls, is_add):
"""
add or del tunnels sharing the same mcast dst
to test gtpu ref_count mechanism
"""
n_shared_dst_tunnels = 20
teid_start = 1000
teid_end = teid_start + n_shared_dst_tunnels
for teid in range(teid_start, teid_end):
r = cls.vapi.gtpu_add_del_tunnel(
src_addr=cls.pg0.local_ip4n,
dst_addr=cls.mcast_ip4n,
mcast_sw_if_index=1,
teid=teid,
is_add=is_add)
if r.sw_if_index == 0xffffffff:
raise ValueError("bad sw_if_index: ~0")
@classmethod
def add_shared_mcast_dst_load(cls):
cls.add_del_shared_mcast_dst_load(is_add=1)
@classmethod
def del_shared_mcast_dst_load(cls):
cls.add_del_shared_mcast_dst_load(is_add=0)
@classmethod
def add_del_mcast_tunnels_load(cls, is_add):
"""
add or del tunnels to test gtpu stability
"""
n_distinct_dst_tunnels = 20
ip_range_start = 10
ip_range_end = ip_range_start + n_distinct_dst_tunnels
for dest_ip4n in ip4n_range(cls.mcast_ip4n, ip_range_start,
ip_range_end):
teid = bytearray(dest_ip4n)[3]
cls.vapi.gtpu_add_del_tunnel(
src_addr=cls.pg0.local_ip4n,
dst_addr=dest_ip4n,
mcast_sw_if_index=1,
teid=teid,
is_add=is_add)
@classmethod
def add_mcast_tunnels_load(cls):
cls.add_del_mcast_tunnels_load(is_add=1)
@classmethod
def del_mcast_tunnels_load(cls):
cls.add_del_mcast_tunnels_load(is_add=0)
# Class method to start the GTPU test case.
# Overrides setUpClass method in VppTestCase class.
# Python try..except statement is used to ensure that the tear down of
# the class will be executed even if exception is raised.
# @param cls The class pointer.
@classmethod
def setUpClass(cls):
super(TestGtpu, cls).setUpClass()
try:
cls.dport = 2152
cls.gtp_type = 0xff
# Create 2 pg interfaces.
cls.create_pg_interfaces(range(4))
for pg in cls.pg_interfaces:
pg.admin_up()
# Configure IPv4 addresses on VPP pg0.
cls.pg0.config_ip4()
# Resolve MAC address for VPP's IP address on pg0.
cls.pg0.resolve_arp()
# Our Multicast address
cls.mcast_ip4 = '239.1.1.1'
cls.mcast_ip4n = socket.inet_pton(socket.AF_INET, cls.mcast_ip4)
iplong = atol(cls.mcast_ip4)
cls.mcast_mac = "01:00:5e:%02x:%02x:%02x" % (
(iplong >> 16) & 0x7F, (iplong >> 8) & 0xFF, iplong & 0xFF)
# Create GTPU VTEP on VPP pg0, and put gtpu_tunnel0 and pg1
# into BD.
cls.single_tunnel_bd = 11
r = cls.vapi.gtpu_add_del_tunnel(
src_addr=cls.pg0.local_ip4n,
dst_addr=cls.pg0.remote_ip4n,
teid=cls.single_tunnel_bd)
cls.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=r.sw_if_index,
bd_id=cls.single_tunnel_bd)
cls.vapi.sw_interface_set_l2_bridge(
rx_sw_if_index=cls.pg1.sw_if_index, bd_id=cls.single_tunnel_bd)
# Setup teid 2 to test multicast flooding
cls.n_ucast_tunnels = 10
cls.mcast_flood_bd = 12
cls.create_gtpu_flood_test_bd(cls.mcast_flood_bd,
cls.n_ucast_tunnels)
r = cls.vapi.gtpu_add_del_tunnel(
src_addr=cls.pg0.local_ip4n,
dst_addr=cls.mcast_ip4n,
mcast_sw_if_index=1,
teid=cls.mcast_flood_bd)
cls.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=r.sw_if_index,
bd_id=cls.mcast_flood_bd)
cls.vapi.sw_interface_set_l2_bridge(
rx_sw_if_index=cls.pg2.sw_if_index, bd_id=cls.mcast_flood_bd)
# Add and delete mcast tunnels to check stability
cls.add_shared_mcast_dst_load()
cls.add_mcast_tunnels_load()
cls.del_shared_mcast_dst_load()
cls.del_mcast_tunnels_load()
# Setup teid 3 to test unicast flooding
cls.ucast_flood_bd = 13
cls.create_gtpu_flood_test_bd(cls.ucast_flood_bd,
cls.n_ucast_tunnels)
cls.vapi.sw_interface_set_l2_bridge(
rx_sw_if_index=cls.pg3.sw_if_index, bd_id=cls.ucast_flood_bd)
except Exception:
super(TestGtpu, cls).tearDownClass()
raise
# Method to define VPP actions before tear down of the test case.
# Overrides tearDown method in VppTestCase class.
# @param self The object pointer.
def tearDown(self):
super(TestGtpu, self).tearDown()
if not self.vpp_dead:
self.logger.info(self.vapi.cli("show bridge-domain 11 detail"))
self.logger.info(self.vapi.cli("show bridge-domain 12 detail"))
self.logger.info(self.vapi.cli("show bridge-domain 13 detail"))
self.logger.info(self.vapi.cli("show int"))
self.logger.info(self.vapi.cli("show gtpu tunnel"))
self.logger.info(self.vapi.cli("show trace"))
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)
|