summaryrefslogtreecommitdiffstats
path: root/scripts/automation/regression/stateless_tests/stl_capture_test.py
blob: e24fe3bddd92d5f85f159d783a7b903be59137e0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
#!/router/bin/python
from .stl_general_test import CStlGeneral_Test, CTRexScenario
from trex_stl_lib.api import *
import os, sys
import pprint

def ip2num (ip_str):
    return struct.unpack('>L', socket.inet_pton(socket.AF_INET, ip_str))[0]
    
def num2ip (ip_num):
    return socket.inet_ntoa(struct.pack('>L', ip_num))

def ip_add (ip_str, cnt):
    return num2ip(ip2num(ip_str) + cnt)
    
    
class STLCapture_Test(CStlGeneral_Test):
    """Tests for capture packets"""

    def setUp(self):
        CStlGeneral_Test.setUp(self)

        if not self.is_loopback:
            self.skip('capture tests are skipped on a non-loopback machine')
            
        assert 'bi' in CTRexScenario.stl_ports_map

        self.c = CTRexScenario.stl_trex

        self.tx_port, self.rx_port = CTRexScenario.stl_ports_map['bi'][0]

        self.c.connect()
        self.c.reset(ports = [self.tx_port, self.rx_port])

        self.pkt = STLPktBuilder(pkt = Ether()/IP(src="16.0.0.1",dst="48.0.0.1")/UDP(dport=12,sport=1025)/IP()/'a_payload_example')

        self.percentage = 5 if self.is_virt_nics else 50


    @classmethod
    def tearDownClass(cls):
        if CTRexScenario.stl_init_error:
            return
        # connect back at end of tests
        if not cls.is_connected():
            CTRexScenario.stl_trex.connect()


    # a simple capture test - inject packets and see the packets arrived the same
    def test_basic_capture (self):
        pkt_count = 100
        
        try:
            # move to service mode
            self.c.set_service_mode(ports = [self.tx_port, self.rx_port])
            
            # start a capture
            txc = self.c.start_capture(tx_ports = self.tx_port, limit = pkt_count)
            rxc = self.c.start_capture(rx_ports = self.rx_port, limit = pkt_count)
            
            # inject few packets with a VM
            vm = STLScVmRaw( [STLVmFlowVar ( "ip_src",  min_value="16.0.0.0", max_value="16.255.255.255", size=4, step = 7, op = "inc"),
                              STLVmWrFlowVar (fv_name="ip_src", pkt_offset= "IP.src"),
                              STLVmFixIpv4(offset = "IP")
                              ]
                             );
              
            pkt = STLPktBuilder(pkt = Ether()/IP(src="16.0.0.1",dst="48.0.0.1")/UDP(dport=12,sport=1025)/IP()/'a_payload_example',
                                vm = vm)
            
            stream = STLStream(name = 'burst',
                               packet = pkt,
                               mode = STLTXSingleBurst(total_pkts = pkt_count,
                                                       percentage = self.percentage)
                               )
            
            self.c.add_streams(ports = self.tx_port, streams = [stream])
            
            self.c.start(ports = self.tx_port, force = True)
            self.c.wait_on_traffic(ports = self.tx_port)
            
            tx_pkt_list = []
            rx_pkt_list = []
            
            self.c.stop_capture(txc['id'], output = tx_pkt_list)
            self.c.stop_capture(rxc['id'], output = rx_pkt_list)
            
            assert (len(tx_pkt_list) == len(rx_pkt_list) == pkt_count)
            
            # make sure we have the same binaries in both lists
            tx_bin = [pkt['binary'] for pkt in tx_pkt_list]
            rx_bin = [pkt['binary'] for pkt in rx_pkt_list]
            assert(set(tx_bin) == set(rx_bin))
            
            # generate all the values that should be
            expected_src_ips = [ip_add('16.0.0.0', i * 7) for i in range(pkt_count)]
            
            for i, pkt in enumerate(rx_pkt_list):
                pkt_scapy = Ether(pkt['binary'])
                pkt_ts    = pkt['ts']
                
                assert('IP' in pkt_scapy)
                assert(pkt_scapy['IP'].src in expected_src_ips)
                
                # remove the match
                del expected_src_ips[expected_src_ips.index(pkt_scapy['IP'].src)]
                
            
        except STLError as e:
            assert False , '{0}'.format(e)
            
        finally:
            self.c.remove_all_captures()
            self.c.set_service_mode(ports = self.rx_port, enabled = False)

            
    
            
    # in this test we apply captures under traffic multiple times
    def test_stress_capture (self):
        pkt_count = 100
        
        try:
            # move to service mode
            self.c.set_service_mode(ports = self.rx_port)
            
            # start heavy traffic
            pkt = STLPktBuilder(pkt = Ether()/IP(src="16.0.0.1",dst="48.0.0.1")/UDP(dport=12,sport=1025)/IP()/'a_payload_example')
            
            stream = STLStream(name = 'burst',
                               packet = pkt,
                               mode = STLTXCont(percentage = self.percentage)
                               )
            
            self.c.add_streams(ports = self.tx_port, streams = [stream])
            self.c.start(ports = self.tx_port, force = True)
            captures = [{'capture_id': None, 'limit': 50}, {'capture_id': None, 'limit': 80}, {'capture_id': None, 'limit': 100}]
            
            for i in range(0, 100):
                # start a few captures
                for capture in captures:
                    capture['capture_id'] = self.c.start_capture(rx_ports = [self.rx_port], limit = capture['limit'])['id']
                
                # a little time to wait for captures to be full
                wait_iterations = 0
                while True:
                    server_captures = self.c.get_capture_status()
                    counts = ([c['count'] for c in server_captures.values()])
                    if {50, 80, 100} == set(counts):
                        break
                        
                    time.sleep(0.1)
                    wait_iterations += 1
                    assert(wait_iterations <= 5)
                    
                
                for capture in captures:
                    capture_id = capture['capture_id']
                    
                    # make sure the server registers us and we are full
                    assert(capture['capture_id'] in server_captures.keys())
                    assert(server_captures[capture_id]['count'] == capture['limit'])
                    
                    # fetch packets
                    pkt_list = []
                    self.c.stop_capture(capture['capture_id'], pkt_list)
                    assert (len(pkt_list) == capture['limit'])
                
                    # a little sanity per packet
                    for pkt in pkt_list:
                        scapy_pkt = Ether(pkt['binary'])
                        assert(scapy_pkt['IP'].src == '16.0.0.1')
                        assert(scapy_pkt['IP'].dst == '48.0.0.1')
              
        except STLError as e:
            assert False , '{0}'.format(e)
            
        finally:
            self.c.remove_all_captures()
            self.c.set_service_mode(ports = self.rx_port, enabled = False)
            
    
    # in this test we capture and analyze the ARP request / response
    def test_arp_capture (self):
        if self.c.get_port_attr(self.tx_port)['layer_mode'] != 'IPv4':
            return self.skip('skipping ARP capture test for non-ipv4 config on port {0}'.format(self.tx_port))
            
        if self.c.get_port_attr(self.rx_port)['layer_mode'] != 'IPv4':
            return self.skip('skipping ARP capture test for non-ipv4 config on port {0}'.format(self.rx_port))
            
        try:
            # move to service mode
            self.c.set_service_mode(ports = [self.tx_port, self.rx_port])
                                                                        
            # start a capture
            capture_info = self.c.start_capture(rx_ports = [self.tx_port, self.rx_port], limit = 2)
         
            # generate an ARP request
            self.c.arp(ports = self.tx_port)
            
            pkts = []
            self.c.stop_capture(capture_info['id'], output = pkts)
        
            assert len(pkts) == 2
            
            # find the correct order
            if pkts[0]['port'] == self.rx_port:
                request  = pkts[0]
                response = pkts[1]
            else:
                request  = pkts[1]
                response = pkts[0]
            
            assert request['port']  == self.rx_port
            assert response['port'] == self.tx_port
            
            arp_request, arp_response = Ether(request['binary']), Ether(response['binary'])
            assert 'ARP' in arp_request
            assert 'ARP' in arp_response
            
            assert arp_request['ARP'].op  == 1
            assert arp_response['ARP'].op == 2
            
            assert arp_request['ARP'].pdst == arp_response['ARP'].psrc
            
            
        except STLError as e:
            assert False , '{0}'.format(e)
            
        finally:
             self.c.remove_all_captures()
             self.c.set_service_mode(ports = [self.tx_port, self.rx_port], enabled = False)

             
    # test PING
    def test_ping_capture (self):
        if self.c.get_port_attr(self.tx_port)['layer_mode'] != 'IPv4':
            return self.skip('skipping ARP capture test for non-ipv4 config on port {0}'.format(self.tx_port))
            
        if self.c.get_port_attr(self.rx_port)['layer_mode'] != 'IPv4':
            return self.skip('skipping ARP capture test for non-ipv4 config on port {0}'.format(self.rx_port))
            
        try:
            # move to service mode
            self.c.set_service_mode(ports = [self.tx_port, self.rx_port])

            # start a capture
            capture_info = self.c.start_capture(rx_ports = [self.tx_port, self.rx_port], limit = 100)

            # generate an ARP request
            tx_ipv4 = self.c.get_port_attr(port = self.tx_port)['src_ipv4']
            rx_ipv4 = self.c.get_port_attr(port = self.rx_port)['src_ipv4']
            
            count = 50
            
            self.c.ping_ip(src_port = self.tx_port, dst_ip = rx_ipv4, pkt_size = 1500, count = count, interval_sec = 0.01)

            pkts = []
            self.c.stop_capture(capture_info['id'], output = pkts)

            req_pkts = [Ether(pkt['binary']) for pkt in pkts if pkt['port'] == self.rx_port]
            res_pkts = [Ether(pkt['binary']) for pkt in pkts if pkt['port'] == self.tx_port]
            assert len(req_pkts) == count
            assert len(res_pkts) == count
            
            for req_pkt in req_pkts:
                assert 'ICMP' in req_pkt
                assert req_pkt['IP'].src == tx_ipv4
                assert req_pkt['IP'].dst == rx_ipv4
                assert req_pkt['ICMP'].type == 8
                assert len(req_pkt) == 1500
                
            for res_pkt in res_pkts:
                assert 'ICMP' in res_pkt
                assert res_pkt['IP'].src == rx_ipv4
                assert res_pkt['IP'].dst == tx_ipv4
                assert res_pkt['ICMP'].type == 0
                assert len(res_pkt) == 1500

                
        except STLError as e:
            assert False , '{0}'.format(e)

        finally:
             self.c.remove_all_captures()
             self.c.set_service_mode(ports = [self.tx_port, self.rx_port], enabled = False)


             
             
    # in this test we stress TX & RX captures in parallel
    def test_stress_tx_rx (self):
        pkt_count = 100
        
        try:
            # move to service mode
            self.c.set_service_mode(ports = [self.rx_port, self.tx_port])
            
            # start heavy traffic
            pkt = STLPktBuilder(pkt = Ether()/IP(src="16.0.0.1",dst="48.0.0.1")/UDP(dport=12,sport=1025)/IP()/'a_payload_example')
            
            stream = STLStream(name = 'burst',
                               packet = pkt,
                               mode = STLTXCont(percentage = self.percentage)
                               )
            
            self.c.add_streams(ports = self.tx_port, streams = [stream])
            self.c.start(ports = self.tx_port, mult = "50%", force = True)
            
            
            # start a capture on the RX port
            capture_rx = self.c.start_capture(rx_ports = self.rx_port, limit = 1000)
            
            # now under traffic start/stop the TX capture
            for i in range(0, 1000):
                # start a common capture
                capture_txrx = self.c.start_capture(rx_ports = self.rx_port, tx_ports = self.tx_port, limit = 1000)
                self.c.stop_capture(capture_txrx['id'])
                
              
        except STLError as e:
            assert False , '{0}'.format(e)
            
        finally:
            self.c.remove_all_captures()
            self.c.set_service_mode(ports = [self.rx_port, self.tx_port], enabled = False)