1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
|
#!/usr/bin/env python3
import abc
import six
from scapy.layers.l2 import Ether, Raw
from scapy.layers.inet import IP, UDP
from util import ip4_range
@six.add_metaclass(abc.ABCMeta)
class BridgeDomain(object):
    """ Bridge domain abstraction

    Abstract mix-in holding encapsulation-agnostic bridge domain tests.
    A concrete class supplies the encapsulation specifics by implementing
    the abstract methods declared below.

    The tests use attributes that are NOT defined in this class and must be
    provided by the concrete class: the ``pg0`` .. ``pg3`` interface objects,
    ``pg_start()``, ``assertEqual()``, ``single_tunnel_bd``,
    ``ucast_flood_bd``, ``mcast_flood_bd`` and ``n_ucast_tunnels``.
    """

    @property
    def frame_request(self):
        """ Ethernet frame modeling a generic request """
        # Bytes literal: the payload is the same 100 raw 0xa5 octets under
        # both Python 2 and Python 3 (no text encoding step involved).
        return (Ether(src='00:00:00:00:00:01', dst='00:00:00:00:00:02') /
                IP(src='1.2.3.4', dst='4.3.2.1') /
                UDP(sport=10000, dport=20000) /
                Raw(b'\xa5' * 100))

    @property
    def frame_reply(self):
        """ Ethernet frame modeling a generic reply

        Mirror image of :attr:`frame_request`: MAC addresses, IP addresses
        and UDP ports are swapped.
        """
        # Bytes literal for the same reason as in frame_request.
        return (Ether(src='00:00:00:00:00:02', dst='00:00:00:00:00:01') /
                IP(src='4.3.2.1', dst='1.2.3.4') /
                UDP(sport=20000, dport=10000) /
                Raw(b'\xa5' * 100))

    @abc.abstractmethod
    def ip_range(self, start, end):
        """ range of remote ip's

        :param start: first index of the range.
        :param end: end index of the range.
        :returns: an iterable; test_mcast_rcv passes every item as the
            ``src_ip`` argument of :meth:`encap_mcast`.
        """
        pass

    @abc.abstractmethod
    def encap_mcast(self, pkt, src_ip, src_mac, vni):
        """ Encapsulate mcast packet

        :param pkt: frame to encapsulate.
        :param src_ip: source IP, one item produced by :meth:`ip_range`.
        :param src_mac: source MAC (test_mcast_rcv passes pg0.remote_mac).
        :param vni: bridge domain / tunnel identifier.
        :returns: the encapsulated packet, suitable for ``add_stream()``.
        """
        pass

    @abc.abstractmethod
    def encapsulate(self, pkt, vni):
        """ Encapsulate packet

        :param pkt: frame to encapsulate.
        :param vni: bridge domain / tunnel identifier.
        :returns: the encapsulated packet, suitable for ``add_stream()``.
        """
        pass

    @abc.abstractmethod
    def decapsulate(self, pkt):
        """ Decapsulate packet

        :param pkt: captured, encapsulated packet.
        :returns: the inner packet; it must expose the Ether, IP, UDP and
            Raw layers so it can be compared by :meth:`assert_eq_pkts`.
        """
        pass

    @abc.abstractmethod
    def check_encapsulation(self, pkt, vni, local_only=False,
                            mcast_pkt=False):
        """ Verify the encapsulation

        :param pkt: captured, encapsulated packet.
        :param vni: bridge domain / tunnel identifier the packet should
            carry.
        :param local_only: flag forwarded by the flood tests; its exact
            meaning is defined by the implementation.
        :param mcast_pkt: set by test_mcast_flood when the packet was sent
            through the multicast tunnel.  Declared here so that the
            abstract contract matches the way the tests call this method.
        """
        pass

    def assert_eq_pkts(self, pkt1, pkt2):
        """ Verify the Ether, IP, UDP, payload are equal in both
        packets
        """
        self.assertEqual(pkt1[Ether].src, pkt2[Ether].src)
        self.assertEqual(pkt1[Ether].dst, pkt2[Ether].dst)
        self.assertEqual(pkt1[IP].src, pkt2[IP].src)
        self.assertEqual(pkt1[IP].dst, pkt2[IP].dst)
        self.assertEqual(pkt1[UDP].sport, pkt2[UDP].sport)
        self.assertEqual(pkt1[UDP].dport, pkt2[UDP].dport)
        self.assertEqual(pkt1[Raw], pkt2[Raw])

    def test_decap(self):
        """ Decapsulation test
        Send encapsulated frames from pg0
        Verify receipt of decapsulated frames on pg1
        """
        encapsulated_pkt = self.encapsulate(self.frame_request,
                                            self.single_tunnel_bd)

        self.pg0.add_stream([encapsulated_pkt, ])
        self.pg1.enable_capture()
        self.pg_start()

        # Pick first received frame and check if it's the non-encapsulated
        # frame
        out = self.pg1.get_capture(1)
        pkt = out[0]
        self.assert_eq_pkts(pkt, self.frame_request)

    def test_encap(self):
        """ Encapsulation test
        Send frames from pg1
        Verify receipt of encapsulated frames on pg0
        """
        self.pg1.add_stream([self.frame_reply])
        self.pg0.enable_capture()
        self.pg_start()

        # Pick first received frame and check if it's correctly encapsulated.
        out = self.pg0.get_capture(1)
        pkt = out[0]
        self.check_encapsulation(pkt, self.single_tunnel_bd)

        payload = self.decapsulate(pkt)
        self.assert_eq_pkts(payload, self.frame_reply)

    def test_ucast_flood(self):
        """ Unicast flood test
        Send frames from pg3
        Verify receipt of encapsulated frames on pg0
        """
        self.pg3.add_stream([self.frame_reply])
        self.pg0.enable_capture()
        self.pg_start()

        # Get packet from each tunnel and assert it's correctly encapsulated.
        out = self.pg0.get_capture(self.n_ucast_tunnels)
        for pkt in out:
            self.check_encapsulation(pkt, self.ucast_flood_bd, True)
            payload = self.decapsulate(pkt)
            self.assert_eq_pkts(payload, self.frame_reply)

    def test_mcast_flood(self):
        """ Multicast flood test
        Send frames from pg2
        Verify receipt of encapsulated frames on pg0
        """
        self.pg2.add_stream([self.frame_reply])
        self.pg0.enable_capture()
        self.pg_start()

        # Pick first received frame and check if it's correctly encapsulated.
        out = self.pg0.get_capture(1)
        pkt = out[0]
        self.check_encapsulation(pkt, self.mcast_flood_bd,
                                 local_only=False, mcast_pkt=True)

        payload = self.decapsulate(pkt)
        self.assert_eq_pkts(payload, self.frame_reply)

    def test_mcast_rcv(self):
        """ Multicast receive test
        Send 20 encapsulated frames from pg0 only 10 match unicast tunnels
        Verify receipt of 10 decap frames on pg2
        """
        mac = self.pg0.remote_mac
        ip_range_start = 10
        ip_range_end = 30
        # One encapsulated copy of the request per source IP in the range.
        mcast_stream = [
            self.encap_mcast(self.frame_request, ip, mac, self.mcast_flood_bd)
            for ip in self.ip_range(ip_range_start, ip_range_end)]
        self.pg0.add_stream(mcast_stream)
        self.pg2.enable_capture()
        self.pg_start()

        # Exactly 10 frames are expected to be decapsulated and forwarded.
        out = self.pg2.get_capture(10)
        for pkt in out:
            self.assert_eq_pkts(pkt, self.frame_request)
|