1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
|
#!/usr/bin/env python
from framework import VppTestCase
from ipaddress import IPv4Address
from ipaddress import IPv6Address
from scapy.contrib.gtp import *
from scapy.all import *
class TestSRv6EndMGTP4E(VppTestCase):
""" SRv6 End.M.GTP4.E (SRv6 -> GTP-U) """
@classmethod
def setUpClass(cls):
super(TestSRv6EndMGTP4E, cls).setUpClass()
try:
cls.create_pg_interfaces(range(2))
cls.pg_if_i = cls.pg_interfaces[0]
cls.pg_if_o = cls.pg_interfaces[1]
cls.pg_if_i.config_ip6()
cls.pg_if_o.config_ip4()
cls.ip4_dst = cls.pg_if_o.remote_ip4
# cls.ip4_src = cls.pg_if_o.local_ip4
cls.ip4_src = "192.168.192.10"
for pg_if in cls.pg_interfaces:
pg_if.admin_up()
pg_if.resolve_arp()
except Exception:
super(TestSRv6EndMGTP4E, cls).tearDownClass()
raise
def create_packets(self, inner):
ip4_dst = IPv4Address(str(self.ip4_dst))
# 32bit prefix + 32bit IPv4 DA + 8bit + 32bit TEID + 24bit
dst = b'\xaa' * 4 + ip4_dst.packed + \
b'\x11' + b'\xbb' * 4 + b'\x11' * 3
ip6_dst = IPv6Address(dst)
ip4_src = IPv4Address(str(self.ip4_src))
# 64bit prefix + 32bit IPv4 SA + 16 bit port + 16bit
src = b'\xcc' * 8 + ip4_src.packed + \
b'\xdd' * 2 + b'\x11' * 2
ip6_src = IPv6Address(src)
self.logger.info("ip4 dst: {}".format(ip4_dst))
self.logger.info("ip4 src: {}".format(ip4_src))
self.logger.info("ip6 dst (remote srgw): {}".format(ip6_dst))
self.logger.info("ip6 src (local srgw): {}".format(ip6_src))
pkts = list()
for d, s in inner:
pkt = (Ether() /
IPv6(dst=str(ip6_dst), src=str(ip6_src)) /
IPv6ExtHdrSegmentRouting() /
IPv6(dst=d, src=s) /
UDP(sport=1000, dport=23))
self.logger.info(pkt.show2(dump=True))
pkts.append(pkt)
return pkts
def test_srv6_end(self):
""" test_srv6_end """
pkts = self.create_packets([("A::1", "B::1"), ("C::1", "D::1")])
self.vapi.cli(
"sr localsid address {} behavior end.m.gtp4.e v4src_position 64"
.format(pkts[0]['IPv6'].dst))
self.logger.info(self.vapi.cli("show sr localsids"))
self.vapi.cli("clear errors")
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
self.logger.info(self.vapi.cli("show errors"))
self.logger.info(self.vapi.cli("show int address"))
capture = self.pg1.get_capture(len(pkts))
for pkt in capture:
self.logger.info(pkt.show2(dump=True))
self.assertEqual(pkt[IP].dst, self.ip4_dst)
self.assertEqual(pkt[IP].src, self.ip4_src)
self.assertEqual(pkt[GTP_U_Header].teid, 0xbbbbbbbb)
class TestSRv6TMTmap(VppTestCase):
""" SRv6 T.M.Tmap (GTP-U -> SRv6) """
@classmethod
def setUpClass(cls):
super(TestSRv6TMTmap, cls).setUpClass()
try:
cls.create_pg_interfaces(range(2))
cls.pg_if_i = cls.pg_interfaces[0]
cls.pg_if_o = cls.pg_interfaces[1]
cls.pg_if_i.config_ip4()
cls.pg_if_o.config_ip6()
for pg_if in cls.pg_interfaces:
pg_if.admin_up()
pg_if.resolve_arp()
except Exception:
super(TestSRv6TMTmap, cls).tearDownClass()
raise
class TestSRv6EndMGTP6E(VppTestCase):
""" SRv6 End.M.GTP6.E """
@classmethod
def setUpClass(cls):
super(TestSRv6EndMGTP6E, cls).setUpClass()
try:
cls.create_pg_interfaces(range(2))
cls.pg_if_i = cls.pg_interfaces[0]
cls.pg_if_o = cls.pg_interfaces[1]
cls.pg_if_i.config_ip4()
cls.pg_if_o.config_ip6()
for pg_if in cls.pg_interfaces:
pg_if.admin_up()
pg_if.resolve_arp()
except Exception:
super(TestSRv6EndMGTP6E, cls).tearDownClass()
raise
class TestSRv6EndMGTP6D(VppTestCase):
""" SRv6 End.M.GTP6.D """
@classmethod
def setUpClass(cls):
super(TestSRv6EndMGTP6D, cls).setUpClass()
try:
cls.create_pg_interfaces(range(2))
cls.pg_if_i = cls.pg_interfaces[0]
cls.pg_if_o = cls.pg_interfaces[1]
cls.pg_if_i.config_ip4()
cls.pg_if_o.config_ip6()
for pg_if in cls.pg_interfaces:
pg_if.admin_up()
pg_if.resolve_arp()
except Exception:
super(TestSRv6EndMGTP6D, cls).tearDownClass()
raise
|