summaryrefslogtreecommitdiffstats
path: root/test/test_map_br.py
blob: 3602ddd2e31627602fa1b3742ce716842fc6d706 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
#!/usr/bin/env python3

import ipaddress
import unittest

from framework import VppTestCase, VppTestRunner
from vpp_ip import DpoProto
from vpp_ip_route import VppIpRoute, VppRoutePath
from util import fragment_rfc791, fragment_rfc8200

import scapy.compat
from scapy.layers.l2 import Ether
from scapy.packet import Raw
from scapy.layers.inet import IP, UDP, ICMP, TCP, IPerror, UDPerror
from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded, ICMPv6PacketTooBig
from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply, IPerror6


class TestMAPBR(VppTestCase):
    """ MAP-T Test Cases """

    @classmethod
    def setUpClass(cls):
        """Class-level setup; defers entirely to the parent class."""
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Class-level teardown; defers entirely to the parent class."""
        super().tearDownClass()

    def setUp(self):
        """Prepare the interfaces and the MAP-T BR domain used by every test.

        Creates two packet-generator interfaces (pg0 carries IPv4, pg1
        carries IPv6), stores the addressing/port constants the tests
        compare against, installs an IPv6 route for the MAP prefix via
        pg1, adds the MAP domain, sets the global MAP parameters and
        enables MAP translation on both interfaces.
        """
        super(TestMAPBR, self).setUp()

        #
        # Create 2 pg interfaces.
        # pg0 is IPv4
        # pg1 is IPv6
        #
        self.create_pg_interfaces(range(2))

        self.pg0.admin_up()
        self.pg0.config_ip4()
        # NOTE(review): the IPv4 remote hosts/neighbors are generated on pg1
        # although this is the pg0 (IPv4) bring-up sequence; looks like a
        # copy-paste of the pg1 lines below - confirm before changing, the
        # tests rely on pg0.remote_ip4 as currently generated.
        self.pg1.generate_remote_hosts(20)
        self.pg1.configure_ipv4_neighbors()
        self.pg0.resolve_arp()

        self.pg1.admin_up()
        self.pg1.config_ip6()
        self.pg1.generate_remote_hosts(20)
        self.pg1.configure_ipv6_neighbors()

        #
        # BR configuration parameters used for all tests.
        #
        self.ip4_prefix = '198.18.0.0/24'
        self.ip6_prefix = '2001:db8:f0::/48'
        self.ip6_src = '2001:db8:ffff:ff00::/64'
        self.ea_bits_len = 12
        self.psid_offset = 6
        self.psid_length = 4
        self.mtu = 1500
        self.tag = 'MAP-T BR'

        self.ipv4_internet_address = self.pg0.remote_ip4
        self.ipv4_map_address = "198.18.0.12"
        self.ipv4_udp_or_tcp_internet_port = 65000
        self.ipv4_udp_or_tcp_map_port = 16606

        self.ipv6_cpe_address = "2001:db8:f0:c30:0:c612:c:3"      # 198.18.0.12
        self.ipv6_spoof_address = "2001:db8:f0:c30:0:c612:1c:3"   # 198.18.0.28
        self.ipv6_spoof_prefix = "2001:db8:f0:c30:0:a00:c:3"      # 10.0.0.12
        self.ipv6_spoof_psid = "2001:db8:f0:c30:0:c612:c:4"       # 4
        self.ipv6_spoof_subnet = "2001:db8:f1:c30:0:c612:c:3"     # f1

        self.ipv6_udp_or_tcp_internet_port = 65000
        self.ipv6_udp_or_tcp_map_port = 16606
        self.ipv6_udp_or_tcp_spoof_port = 16862

        # The embedded bytes ac.10.01.02 are 172.16.1.2.
        self.ipv6_map_address = (
            "2001:db8:ffff:ff00:ac:1001:200:0")         # 172.16.1.2
        self.ipv6_map_same_rule_diff_addr = (
            "2001:db8:ffff:ff00:c6:1200:1000:0")        # 198.18.0.16
        self.ipv6_map_same_rule_same_addr = (
            "2001:db8:ffff:ff00:c6:1200:c00:0")         # 198.18.0.12

        self.map_br_prefix = "2001:db8:f0::"
        self.map_br_prefix_len = 48
        self.psid_number = 3

        #
        # Add an IPv6 route to the MAP-BR.
        #
        map_route = VppIpRoute(self,
                               self.map_br_prefix,
                               self.map_br_prefix_len,
                               [VppRoutePath(self.pg1.remote_ip6,
                                             self.pg1.sw_if_index)])
        map_route.add_vpp_config()

        #
        # Add a MAP BR domain that maps from pg0 to pg1.
        #
        self.vapi.map_add_domain(ip4_prefix=self.ip4_prefix,
                                 ip6_prefix=self.ip6_prefix,
                                 ip6_src=self.ip6_src,
                                 ea_bits_len=self.ea_bits_len,
                                 psid_offset=self.psid_offset,
                                 psid_length=self.psid_length,
                                 mtu=self.mtu,
                                 tag=self.tag)

        #
        # Set BR parameters.
        #
        # The original code first set inner=1 and then immediately
        # overwrote it with inner=0; only the effective call is kept.
        self.vapi.map_param_set_fragmentation(inner=0, ignore_df=0)
        self.vapi.map_param_set_icmp(ip4_err_relay_src=self.pg0.local_ip4)
        self.vapi.map_param_set_traffic_class(copy=1)

        #
        # Enable MAP-T on interfaces (once per interface).
        #
        self.vapi.map_if_enable_disable(is_enable=1,
                                        sw_if_index=self.pg0.sw_if_index,
                                        is_translation=1)

        self.vapi.map_if_enable_disable(is_enable=1,
                                        sw_if_index=self.pg1.sw_if_index,
                                        is_translation=1)

    def tearDown(self):
        """Run the parent teardown, then unconfigure and shut each pg."""
        super().tearDown()
        for intf in self.pg_interfaces:
            # Remove both address families before taking the link down.
            intf.unconfig_ip4()
            intf.unconfig_ip6()
            intf.admin_down()

    def v4_address_check(self, pkt):
        """Assert pkt's IPv4 src is the MAP address and dst the internet host.
        """
        ip4_hdr = pkt[IP]
        self.assertEqual(ip4_hdr.src, self.ipv4_map_address)
        self.assertEqual(ip4_hdr.dst, self.ipv4_internet_address)

    def v4_port_check(self, pkt, proto):
        """Assert the `proto` layer of pkt carries the expected IPv4-side ports.

        The source port must be the MAP port and the destination port the
        internet port.
        """
        l4_hdr = pkt[proto]
        self.assertEqual(l4_hdr.sport, self.ipv4_udp_or_tcp_map_port)
        self.assertEqual(l4_hdr.dport, self.ipv4_udp_or_tcp_internet_port)

    def v6_address_check(self, pkt):
        """Assert pkt's IPv6 src is the MAP address and dst the CPE address.
        """
        ip6_hdr = pkt[IPv6]
        self.assertEqual(ip6_hdr.src, self.ipv6_map_address)
        self.assertEqual(ip6_hdr.dst, self.ipv6_cpe_address)

    def v6_port_check(self, pkt, proto):
        """Assert the `proto` layer of pkt carries the expected IPv6-side ports.

        The source port must be the internet port and the destination port
        the MAP port.
        """
        l4_hdr = pkt[proto]
        self.assertEqual(l4_hdr.sport, self.ipv6_udp_or_tcp_internet_port)
        self.assertEqual(l4_hdr.dport, self.ipv6_udp_or_tcp_map_port)

    #
    # Normal translation of UDP packets v4 -> v6 direction
    # Send 128 frame size packet for IPv4/UDP.
    # Received packet should be translated into IPv6 packet with no
    # fragment header.
    #

    def test_map_t_udp_ip4_to_ip6(self):
        """ MAP-T UDP IPv4 -> IPv6 """

        # IPv4/UDP datagram from the internet host towards the MAP address.
        tx_pkt = (Ether(src=self.pg0.remote_mac,
                        dst=self.pg0.local_mac) /
                  IP(src=self.pg0.remote_ip4,
                     dst=self.ipv4_map_address,
                     tos=0) /
                  UDP(sport=self.ipv4_udp_or_tcp_internet_port,
                      dport=self.ipv4_udp_or_tcp_map_port) /
                  ("a" * 82))

        self.pg_send(self.pg0, tx_pkt * 1)

        # Exactly one translated packet is expected on the IPv6 side.
        rx_pkt = self.pg1.get_capture(1)[0]

        self.v6_address_check(rx_pkt)
        self.v6_port_check(rx_pkt, UDP)
        rx_ip6 = rx_pkt[IPv6]
        self.assertEqual(rx_ip6.tc, 0)    # IPv4 ToS passed to v6 TC
        self.assertEqual(rx_ip6.nh, IPv6(nh="UDP").nh)

    #
    # Normal translation of TCP packets v4 -> v6 direction.
    # Send 128 frame size packet for IPv4/TCP.
    # Received packet should be translated into IPv6 packet with no
    # fragment header.
    #

    def test_map_t_tcp_ip4_to_ip6(self):
        """ MAP-T TCP IPv4 -> IPv6 """

        # IPv4/TCP segment from the internet host towards the MAP address.
        tx_pkt = (Ether(src=self.pg0.remote_mac,
                        dst=self.pg0.local_mac) /
                  IP(src=self.pg0.remote_ip4,
                     dst=self.ipv4_map_address,
                     tos=0) /
                  TCP(sport=self.ipv4_udp_or_tcp_internet_port,
                      dport=self.ipv4_udp_or_tcp_map_port) /
                  ("a" * 82))

        self.pg_send(self.pg0, tx_pkt * 1)

        # Exactly one translated packet is expected on the IPv6 side.
        rx_pkt = self.pg1.get_capture(1)[0]

        self.v6_address_check(rx_pkt)
        self.v6_port_check(rx_pkt, TCP)
        rx_ip6 = rx_pkt[IPv6]
        self.assertEqual(rx_ip6.tc, 0)    # IPv4 ToS passed to v6 TC
        self.assertEqual(rx_ip6.nh, IPv6(nh="TCP").nh)

    #
    # Normal translation of UDP packets v6 -> v4 direction
    # Send 128 frame size packet for IPv6/UDP.
    # Received packet should be translated into an IPv4 packet with DF=0
    # (the test asserts that the DF bit is not set).
    #

    def test_map_t_udp_ip6_to_ip4(self):
        """ MAP-T UDP IPv6 -> IPv4 """

        # IPv6/UDP datagram from the CPE towards the MAP-mapped internet host.
        tx_pkt = (Ether(src=self.pg1.remote_mac,
                        dst=self.pg1.local_mac) /
                  IPv6(src=self.ipv6_cpe_address,
                       dst=self.ipv6_map_address) /
                  UDP(sport=self.ipv6_udp_or_tcp_map_port,
                      dport=self.ipv6_udp_or_tcp_internet_port) /
                  ("a" * 82))

        self.pg_send(self.pg1, tx_pkt * 1)

        # Exactly one translated packet is expected on the IPv4 side.
        rx_pkt = self.pg0.get_capture(1)[0]

        self.v4_address_check(rx_pkt)
        self.v4_port_check(rx_pkt, UDP)
        self.assertEqual(rx_pkt[IP].proto, IP(proto="udp").proto)
        self.assertEqual(rx_pkt[IP].tos, 0)    # IPv6 TC passed to v4 ToS
        # The DF flag must not be set on the translated packet.
        df_flag = IP(flags="DF").flags
        self.assertNotEqual(rx_pkt[IP].flags & df_flag, df_flag)

    #
    # Normal translation of TCP packets v6 -> v4 direction
    # Send 128 frame size packet for IPv6/TCP.
    # Received packet should be translated into an IPv4 packet with DF=0
    # (the test asserts that the DF bit is not set).
    #

    def test_map_t_tcp_ip6_to_ip4(self):
        """ MAP-T TCP IPv6 -> IPv4 """

        # IPv6/TCP segment from the CPE towards the MAP-mapped internet host.
        tx_pkt = (Ether(src=self.pg1.remote_mac,
                        dst=self.pg1.local_mac) /
                  IPv6(src=self.ipv6_cpe_address,
                       dst=self.ipv6_map_address) /
                  TCP(sport=self.ipv6_udp_or_tcp_map_port,
                      dport=self.ipv6_udp_or_tcp_internet_port) /
                  ("a" * 82))

        self.pg_send(self.pg1, tx_pkt * 1)

        # Exactly one translated packet is expected on the IPv4 side.
        rx_pkt = self.pg0.get_capture(1)[0]

        self.v4_address_check(rx_pkt)
        self.v4_port_check(rx_pkt, TCP)
        self.assertEqual(rx_pkt[IP].proto, IP(proto="tcp").proto)
        self.assertEqual(rx_pkt[IP].tos, 0)    # IPv6 TC passed to v4 ToS
        # The DF flag must not be set on the translated packet.
        df_flag = IP(flags="DF").flags
        self.assertNotEqual(rx_pkt[IP].flags & df_flag, df_flag)

    #
    # Translation of ICMP Echo Request v4 -> v6 direction
    # Received packet should be translated into an IPv6 Echo Request.
    #

    def test_map_t_echo_request_ip4_to_ip6(self):
        """ MAP-T echo request IPv4 -> IPv6 """

        # The frame is injected on pg0, so the Ethernet header must use
        # pg0's MAC addresses (the original used pg1's, unlike the other
        # IPv4 -> IPv6 tests in this class).
        eth = Ether(src=self.pg0.remote_mac,
                    dst=self.pg0.local_mac)
        ip = IP(src=self.pg0.remote_ip4,
                dst=self.ipv4_map_address)
        # The ICMP id is set to the MAP port value.
        icmp = ICMP(type="echo-request",
                    id=self.ipv6_udp_or_tcp_map_port)
        payload = "H" * 10
        tx_pkt = eth / ip / icmp / payload

        self.pg_send(self.pg0, tx_pkt * 1)

        # Exactly one translated packet is expected on the IPv6 side.
        rx_pkts = self.pg1.get_capture(1)
        rx_pkt = rx_pkts[0]

        self.assertEqual(rx_pkt[IPv6].nh, IPv6(nh="ICMPv6").nh)
        self.assertEqual(rx_pkt[ICMPv6EchoRequest].type,
                         ICMPv6EchoRequest(type="Echo Request").type)
        self.assertEqual(rx_pkt[ICMPv6EchoRequest].code, 0)
        self.assertEqual(rx_pkt[ICMPv6EchoRequest].id,
                         self.ipv6_udp_or_tcp_map_port)

    #
    # Translation of ICMP Echo Reply v4 -> v6 direction
    # Received packet should be translated into an IPv6 Echo Reply.
    #

    def test_map_t_echo_reply_ip4_to_ip6(self):
        """ MAP-T echo reply IPv4 -> IPv6 """

        # The frame is injected on pg0, so the Ethernet header must use
        # pg0's MAC addresses (the original used pg1's, unlike the other
        # IPv4 -> IPv6 tests in this class).
        eth = Ether(src=self.pg0.remote_mac,
                    dst=self.pg0.local_mac)
        ip = IP(src=self.pg0.remote_ip4,
                dst=self.ipv4_map_address)
        # The ICMP id is set to the MAP port value.
        icmp = ICMP(type="echo-reply",
                    id=self.ipv6_udp_or_tcp_map_port)
        payload = "H" * 10
        tx_pkt = eth / ip / icmp / payload

        self.pg_send(self.pg0, tx_pkt * 1)

        # Exactly one translated packet is expected on the IPv6 side.
        rx_pkts = self.pg1.get_capture(1)
        rx_pkt = rx_pkts[0]

        self.assertEqual(rx_pkt[IPv6].nh, IPv6(nh="ICMPv6").nh)
        self.assertEqual(rx_pkt[ICMPv6EchoReply].type,
                         ICMPv6EchoReply(type="Echo Reply").type)
        self.assertEqual(rx_pkt[ICMPv6EchoReply].code, 0)
        self.assertEqual(rx_pkt[ICMPv6EchoReply].id,
                         self.ipv6_udp_or_tcp_map_port)

    #
    # Translation of ICMP Time Exceeded v4 -> v6 direction
    # Received packet should be translated into an IPv6 Time Exceeded.
    #

    def test_map_t_time_exceeded_ip4_to_ip6(self):
        """ MAP-T time exceeded IPv4 -> IPv6 """

        # Outer IPv4 ICMP error sent by the internet host; it embeds the
        # (MAP address -> internet host) UDP datagram that "expired".
        outer = (Ether(src=self.pg0.remote_mac,
                       dst=self.pg0.local_mac) /
                 IP(src=self.pg0.remote_ip4,
                    dst=self.ipv4_map_address) /
                 ICMP(type="time-exceeded", code="ttl-zero-during-transit"))
        embedded = (IP(dst=self.pg0.remote_ip4,
                       src=self.ipv4_map_address, ttl=1) /
                    UDP(sport=self.ipv4_udp_or_tcp_map_port,
                        dport=self.ipv4_udp_or_tcp_internet_port))
        tx_pkt = outer / embedded / ("H" * 10)

        self.pg_send(self.pg0, tx_pkt * 1)

        # Exactly one translated packet is expected on the IPv6 side.
        rx_pkt = self.pg1.get_capture(1)[0]

        self.v6_address_check(rx_pkt)
        self.assertEqual(rx_pkt[IPv6].nh, IPv6(nh="ICMPv6").nh)

        expected_code = ICMPv6TimeExceeded(
            code="hop limit exceeded in transit").code
        self.assertEqual(rx_pkt[ICMPv6TimeExceeded].type,
                         ICMPv6TimeExceeded().type)
        self.assertEqual(rx_pkt[ICMPv6TimeExceeded].code, expected_code)
        self.assertEqual(rx_pkt[ICMPv6TimeExceeded].hlim, tx_pkt[IP][1].ttl)

        # The embedded datagram must be present and mirror the outer
        # addresses, with its ports unchanged.
        self.assertTrue(rx_pkt.haslayer(IPerror6))
        self.assertTrue(rx_pkt.haslayer(UDPerror))
        self.assertEqual(rx_pkt[IPv6].src, rx_pkt[IPerror6].dst)
        self.assertEqual(rx_pkt[IPv6].dst, rx_pkt[IPerror6].src)
        self.assertEqual(rx_pkt[UDPerror].sport, self.ipv6_udp_or_tcp_map_port)
        self.assertEqual(rx_pkt[UDPerror].dport,
                         self.ipv6_udp_or_tcp_internet_port)

    #
    # Translation of ICMP Echo Request v6 -> v4 direction
    # Received packet should be translated into an IPv4 Echo Request.
    #

    def test_map_t_echo_request_ip6_to_ip4(self):
        """ MAP-T echo request IPv6 -> IPv4 """

        # ICMPv6 echo request whose id is the MAP port value.
        echo_req = ICMPv6EchoRequest()
        echo_req.id = self.ipv6_udp_or_tcp_map_port
        tx_pkt = (Ether(src=self.pg1.remote_mac,
                        dst=self.pg1.local_mac) /
                  IPv6(src=self.ipv6_cpe_address,
                       dst=self.ipv6_map_address) /
                  echo_req /
                  ("H" * 10))

        self.pg_send(self.pg1, tx_pkt * 1)

        # Exactly one translated packet is expected on the IPv4 side.
        rx_pkt = self.pg0.get_capture(1)[0]

        self.assertEqual(rx_pkt[IP].proto, IP(proto="icmp").proto)
        rx_icmp = rx_pkt[ICMP]
        self.assertEqual(rx_icmp.type, ICMP(type="echo-request").type)
        self.assertEqual(rx_icmp.code, 0)
        self.assertEqual(rx_icmp.id, self.ipv6_udp_or_tcp_map_port)

    #
    # Translation of ICMP Echo Reply v6 -> v4 direction
    # Received packet should be translated into an IPv4 Echo Reply.
    #

    def test_map_t_echo_reply_ip6_to_ip4(self):
        """ MAP-T echo reply IPv6 -> IPv4 """

        # ICMPv6 echo reply whose id is the MAP port value.
        tx_pkt = (Ether(src=self.pg1.remote_mac,
                        dst=self.pg1.local_mac) /
                  IPv6(src=self.ipv6_cpe_address,
                       dst=self.ipv6_map_address) /
                  ICMPv6EchoReply(id=self.ipv6_udp_or_tcp_map_port) /
                  ("H" * 10))

        self.pg_send(self.pg1, tx_pkt * 1)

        # Exactly one translated packet is expected on the IPv4 side.
        rx_pkt = self.pg0.get_capture(1)[0]

        self.assertEqual(rx_pkt[IP].proto, IP(proto="icmp").proto)
        rx_icmp = rx_pkt[ICMP]
        self.assertEqual(rx_icmp.type, ICMP(type="echo-reply").type)
        self.assertEqual(rx_icmp.code, 0)
        self.assertEqual(rx_icmp.id, self.ipv6_udp_or_tcp_map_port)

    #
    # Translation of ICMP Packet Too Big v6 -> v4 direction
    # Received packet should be translated into an IPv4 Dest Unreachable.
    #

    def test_map_t_packet_too_big_ip6_to_ip4(self):
        """ MAP-T packet too big IPv6 -> IPv4 """

        # Outer ICMPv6 error sent by the CPE; it embeds the
        # (MAP address -> CPE) UDP datagram that was too big.
        outer = (Ether(src=self.pg1.remote_mac,
                       dst=self.pg1.local_mac) /
                 IPv6(src=self.ipv6_cpe_address,
                      dst=self.ipv6_map_address) /
                 ICMPv6PacketTooBig(mtu=1280))
        embedded = (IPv6(src=self.ipv6_map_address,
                         dst=self.ipv6_cpe_address) /
                    UDP(sport=self.ipv6_udp_or_tcp_internet_port,
                        dport=self.ipv6_udp_or_tcp_map_port))
        tx_pkt = outer / embedded / ("H" * 10)

        self.pg_send(self.pg1, tx_pkt * 1)

        # Exactly one translated packet is expected on the IPv4 side.
        rx_pkt = self.pg0.get_capture(1)[0]

        self.v4_address_check(rx_pkt)
        self.assertEqual(rx_pkt[IP].proto, IP(proto="icmp").proto)
        self.assertEqual(rx_pkt[ICMP].type, ICMP(type="dest-unreach").type)
        self.assertEqual(rx_pkt[ICMP].code,
                         ICMP(code="fragmentation-needed").code)
        # The test expects the reported MTU to be 20 less than the ICMPv6 one.
        expected_mtu = tx_pkt[ICMPv6PacketTooBig].mtu - 20
        self.assertEqual(rx_pkt[ICMP].nexthopmtu, expected_mtu)

        # The embedded datagram must be present and mirror the outer
        # addresses, with its ports unchanged.
        self.assertTrue(rx_pkt.haslayer(IPerror))
        self.assertTrue(rx_pkt.haslayer(UDPerror))
        self.assertEqual(rx_pkt[IP].src, rx_pkt[IPerror].dst)
        self.assertEqual(rx_pkt[IP].dst, rx_pkt[IPerror].src)
        self.assertEqual(rx_pkt[UDPerror].sport,
                         self.ipv4_udp_or_tcp_internet_port)
        self.assertEqual(rx_pkt[UDPerror].dport, self.ipv4_udp_or_tcp_map_port)

    #
    # Translation of ICMP Time Exceeded v6 -> v4 direction
    # Received packet should be translated into an IPv4 Time Exceeded.
    #

    def test_map_t_time_exceeded_ip6_to_ip4(self):
        """ MAP-T time exceeded IPv6 -> IPv4 """

        # Outer ICMPv6 error sent by the CPE; it embeds the
        # (MAP address -> CPE) UDP datagram whose hop limit ran out.
        outer = (Ether(src=self.pg1.remote_mac,
                       dst=self.pg1.local_mac) /
                 IPv6(src=self.ipv6_cpe_address,
                      dst=self.ipv6_map_address) /
                 ICMPv6TimeExceeded())
        embedded = (IPv6(src=self.ipv6_map_address,
                         dst=self.ipv6_cpe_address, hlim=1) /
                    UDP(sport=self.ipv6_udp_or_tcp_internet_port,
                        dport=self.ipv6_udp_or_tcp_map_port))
        tx_pkt = outer / embedded / ("H" * 10)

        self.pg_send(self.pg1, tx_pkt * 1)

        # Exactly one translated packet is expected on the IPv4 side.
        rx_pkt = self.pg0.get_capture(1)[0]

        self.v4_address_check(rx_pkt)
        self.assertEqual(rx_pkt[IP].proto, IP(proto="icmp").proto)
        self.assertEqual(rx_pkt[ICMP].type, ICMP(type="time-exceeded").type)
        self.assertEqual(rx_pkt[ICMP].code,
                         ICMP(code="ttl-zero-during-transit").code)
        self.assertEqual(rx_pkt[ICMP].ttl, tx_pkt[IPv6][1].hlim)

        # The embedded datagram must be present and mirror the outer
        # addresses, with its ports unchanged.
        self.assertTrue(rx_pkt.haslayer(IPerror))
        self.assertTrue(rx_pkt.haslayer(UDPerror))
        self.assertEqual(rx_pkt[IP].src, rx_pkt[IPerror].dst)
        self.assertEqual(rx_pkt[IP].dst, rx_pkt[IPerror].src)
        self.assertEqual(rx_pkt[UDPerror].sport,
                         self.ipv4_udp_or_tcp_internet_port)
        self.assertEqual(rx_pkt[UDPerror].dport, self.ipv4_udp_or_tcp_map_port)

    #
    # Spoofed IPv4 Source Address v6 -> v4 direction
    # Send a packet with a wrong IPv4 address embedded in bits 72-103.
    # The BR should either drop the packet, or rewrite the spoofed
    # source IPv4 as the actual source IPv4 address.
    # The BR really should drop the packet.
    #

    def test_map_t_spoof_ipv4_src_addr_ip6_to_ip4(self):
        """ MAP-T spoof ipv4 src addr IPv6 -> IPv4 """

        # Source is the spoofed CPE address (embeds 198.18.0.28).
        tx_pkt = (Ether(src=self.pg1.remote_mac,
                        dst=self.pg1.local_mac) /
                  IPv6(src=self.ipv6_spoof_address,
                       dst=self.ipv6_map_address) /
                  UDP(sport=self.ipv6_udp_or_tcp_map_port,
                      dport=self.ipv6_udp_or_tcp_internet_port) /
                  ("a" * 82))

        self.pg_send(self.pg1, tx_pkt * 1)

        # Nothing may be forwarded to the IPv4 side.
        self.pg0.get_capture(0, timeout=1)
        self.pg0.assert_nothing_captured("Should drop IPv4 spoof address")

    #
    # Spoofed IPv4 Source Prefix v6 -> v4 direction
    # Send a packet with a wrong IPv4 prefix embedded in bits 72-103.
    # The BR should either drop the packet, or rewrite the source IPv4
    # to the prefix that matches the source IPv4 address.
    #

    def test_map_t_spoof_ipv4_src_prefix_ip6_to_ip4(self):
        """ MAP-T spoof ipv4 src prefix IPv6 -> IPv4 """

        # Source embeds an IPv4 address from a foreign prefix (10.0.0.12).
        tx_pkt = (Ether(src=self.pg1.remote_mac,
                        dst=self.pg1.local_mac) /
                  IPv6(src=self.ipv6_spoof_prefix,
                       dst=self.ipv6_map_address) /
                  UDP(sport=self.ipv6_udp_or_tcp_map_port,
                      dport=self.ipv6_udp_or_tcp_internet_port) /
                  ("a" * 82))

        self.pg_send(self.pg1, tx_pkt * 1)

        # Nothing may be forwarded to the IPv4 side.
        self.pg0.get_capture(0, timeout=1)
        self.pg0.assert_nothing_captured("Should drop IPv4 spoof prefix")

    #
    # Spoofed IPv6 PSID v6 -> v4 direction
    # Send a packet with a wrong PSID in the IPv6 source address
    # The BR should drop the packet.
    #

    def test_map_t_spoof_psid_ip6_to_ip4(self):
        """ MAP-T spoof psid IPv6 -> IPv4 """

        # Source address ends in :4 instead of the CPE's :3.
        tx_pkt = (Ether(src=self.pg1.remote_mac,
                        dst=self.pg1.local_mac) /
                  IPv6(src=self.ipv6_spoof_psid,
                       dst=self.ipv6_map_address) /
                  UDP(sport=self.ipv6_udp_or_tcp_map_port,
                      dport=self.ipv6_udp_or_tcp_internet_port) /
                  ("a" * 82))

        self.pg_send(self.pg1, tx_pkt * 1)

        # Nothing may be forwarded to the IPv4 side.
        self.pg0.get_capture(0, timeout=1)
        self.pg0.assert_nothing_captured("Should drop IPv6 spoof PSID")

    #
    # Spoofed IPv6 subnet field v6 -> v4 direction
    # Send a packet with a wrong IPv6 subnet as "2001:db8:f1"
    # The BR should drop the packet.
    #

    def test_map_t_spoof_subnet_ip6_to_ip4(self):
        """ MAP-T spoof subnet IPv6 -> IPv4 """

        # Source address uses subnet 2001:db8:f1 instead of 2001:db8:f0.
        tx_pkt = (Ether(src=self.pg1.remote_mac,
                        dst=self.pg1.local_mac) /
                  IPv6(src=self.ipv6_spoof_subnet,
                       dst=self.ipv6_map_address) /
                  UDP(sport=self.ipv6_udp_or_tcp_map_port,
                      dport=self.ipv6_udp_or_tcp_internet_port) /
                  ("a" * 82))

        self.pg_send(self.pg1, tx_pkt * 1)

        # Nothing may be forwarded to the IPv4 side.
        self.pg0.get_capture(0, timeout=1)
        self.pg0.assert_nothing_captured("Should drop IPv6 spoof subnet")

    #
    # Spoofed IPv6 port PSID v6 -> v4 direction
    # Send a packet with a wrong IPv6 port PSID
    # The BR should drop the packet.
    #

    def test_map_t_spoof_port_psid_ip6_to_ip4(self):
        """ MAP-T spoof port psid IPv6 -> IPv4 """

        # Valid CPE source address, but the UDP source port is the spoof port.
        tx_pkt = (Ether(src=self.pg1.remote_mac,
                        dst=self.pg1.local_mac) /
                  IPv6(src=self.ipv6_cpe_address,
                       dst=self.ipv6_map_address) /
                  UDP(sport=self.ipv6_udp_or_tcp_spoof_port,
                      dport=self.ipv6_udp_or_tcp_internet_port) /
                  ("a" * 82))

        self.pg_send(self.pg1, tx_pkt * 1)

        # Nothing may be forwarded to the IPv4 side.
        self.pg0.get_capture(0, timeout=1)
        self.pg0.assert_nothing_captured("Should drop IPv6 spoof port PSID")

    #
    # Spoofed IPv6 ICMP ID PSID v6 -> v4 direction
    # Send a packet with a wrong IPv6 ICMP ID PSID
    # The BR should drop the packet.
    #

    def test_map_t_spoof_icmp_id_psid_ip6_to_ip4(self):
        """ MAP-T spoof ICMP id psid IPv6 -> IPv4 """

        eth = Ether(src=self.pg1.remote_mac,
                    dst=self.pg1.local_mac)
        ip = IPv6(src=self.ipv6_cpe_address,
                  dst=self.ipv6_map_address)
        # Valid CPE source address, but the echo id is the spoof port value.
        icmp = ICMPv6EchoRequest()
        icmp.id = self.ipv6_udp_or_tcp_spoof_port
        payload = "H" * 10
        tx_pkt = eth / ip / icmp / payload

        self.pg_send(self.pg1, tx_pkt * 1)

        # Nothing may be forwarded to the IPv4 side.  The failure message
        # names the ICMP id case (it was copy-pasted from the port test).
        self.pg0.get_capture(0, timeout=1)
        self.pg0.assert_nothing_captured(
            "Should drop IPv6 spoof ICMP ID PSID")

    #
    # Map to Map - same rule, different address
    #

    @unittest.skip("Fixme: correct behavior needs clarification")
    def test_map_t_same_rule_diff_addr_ip6_to_ip4(self):
        """ MAP-T same rule, diff addr IPv6 -> IPv6 """

        # CPE sends to a MAP address that embeds a different IPv4 address
        # (198.18.0.16) covered by the same rule.
        tx_pkt = (Ether(src=self.pg1.remote_mac,
                        dst=self.pg1.local_mac) /
                  IPv6(src=self.ipv6_cpe_address,
                       dst=self.ipv6_map_same_rule_diff_addr) /
                  UDP(sport=self.ipv6_udp_or_tcp_map_port,
                      dport=1025) /
                  ("a" * 82))

        self.pg_send(self.pg1, tx_pkt * 1)

        # One packet is captured on pg1; no further checks are made yet.
        rx_pkt = self.pg1.get_capture(1)[0]

    #
    # Map to Map - same rule, same address
    #

    @unittest.skip("Fixme: correct behavior needs clarification")
    def test_map_t_same_rule_same_addr_ip6_to_ip4(self):
        """ MAP-T same rule, same addr IPv6 -> IPv6 """

        # CPE sends to a MAP address that embeds its own IPv4 address
        # (198.18.0.12) under the same rule.
        tx_pkt = (Ether(src=self.pg1.remote_mac,
                        dst=self.pg1.local_mac) /
                  IPv6(src=self.ipv6_cpe_address,
                       dst=self.ipv6_map_same_rule_same_addr) /
                  UDP(sport=self.ipv6_udp_or_tcp_map_port,
                      dport=1025) /
                  ("a" * 82))

        self.pg_send(self.pg1, tx_pkt * 1)

        # One packet is captured on pg1; no further checks are made yet.
        rx_pkt = self.pg1.get_capture(1)[0]

# When executed as a script, run the tests in this module with VppTestRunner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)
an class="o">= self.__ports(port_id_list) rc = RC() for port_id in port_id_list: rc.add(self.ports[port_id].update(mult, force)) return rc def __validate (self, port_id_list = None): port_id_list = self.__ports(port_id_list) rc = RC() for port_id in port_id_list: rc.add(self.ports[port_id].validate()) return rc # connect to server def __connect(self): # first disconnect if already connected if self.is_connected(): self.__disconnect() # clear this flag self.connected = False # connect sync channel self.logger.pre_cmd("Connecting to RPC server on {0}:{1}".format(self.connection_info['server'], self.connection_info['sync_port'])) rc = self.comm_link.connect() self.logger.post_cmd(rc) if not rc: return rc # version rc = self._transmit("get_version") if not rc: return rc self.server_version = rc.data() self.global_stats.server_version = rc.data() # cache system info rc = self._transmit("get_system_info") if not rc: return rc self.system_info = rc.data() # cache supported commands rc = self._transmit("get_supported_cmds") if not rc: return rc self.supported_cmds = rc.data() # create ports for port_id in xrange(self.system_info["port_count"]): speed = self.system_info['ports'][port_id]['speed'] driver = self.system_info['ports'][port_id]['driver'] self.ports[port_id] = Port(port_id, speed, driver, self.username, self.comm_link, self.session_id) # sync the ports rc = self.__sync_ports() if not rc: return rc # connect async channel self.logger.pre_cmd("connecting to publisher server on {0}:{1}".format(self.connection_info['server'], self.connection_info['async_port'])) rc = self.async_client.connect() self.logger.post_cmd(rc) if not rc: return rc self.connected = True return RC_OK() # disconenct from server def __disconnect(self, release_ports = True): # release any previous acquired ports if self.is_connected() and release_ports: self.__release(self.get_acquired_ports()) self.comm_link.disconnect() self.async_client.disconnect() self.connected = False return RC_OK() # 
clear stats def __clear_stats(self, port_id_list, clear_global): for port_id in port_id_list: self.ports[port_id].clear_stats() if clear_global: self.global_stats.clear_stats() self.logger.log_cmd("clearing stats on port(s) {0}:".format(port_id_list)) return RC # get stats def __get_stats (self, port_id_list): stats = {} stats['global'] = self.global_stats.get_stats() total = {} for port_id in port_id_list: port_stats = self.ports[port_id].get_stats() stats[port_id] = port_stats for k, v in port_stats.iteritems(): if not k in total: total[k] = v else: total[k] += v stats['total'] = total return stats ############ functions used by other classes but not users ############## def _verify_port_id_list (self, port_id_list): # check arguments if not isinstance(port_id_list, list): return RC_ERR("ports should be an instance of 'list' not {0}".format(type(port_id_list))) # all ports are valid ports if not port_id_list or not all([port_id in self.get_all_ports() for port_id in port_id_list]): return RC_ERR("") return RC_OK() def _validate_port_list(self, port_id_list): if not isinstance(port_id_list, list): return False # check each item of the sequence return (port_id_list and all([port_id in self.get_all_ports() for port_id in port_id_list])) # transmit request on the RPC link def _transmit(self, method_name, params={}): return self.comm_link.transmit(method_name, params) # transmit batch request on the RPC link def _transmit_batch(self, batch_list): return self.comm_link.transmit_batch(batch_list) # stats def _get_formatted_stats(self, port_id_list, stats_mask=set()): stats_opts = trex_stats.ALL_STATS_OPTS.intersection(stats_mask) stats_obj = {} for stats_type in stats_opts: stats_obj.update(self.stats_generator.generate_single_statistic(port_id_list, stats_type)) return stats_obj def _get_streams(self, port_id_list, streams_mask=set()): streams_obj = self.stats_generator.generate_streams_info(port_id_list, streams_mask) return streams_obj def _invalidate_stats (self, 
port_id_list): for port_id in port_id_list: self.ports[port_id].invalidate_stats() self.global_stats.invalidate() return RC_OK() ################################# # ------ private methods ------ # @staticmethod def __get_mask_keys(ok_values={True}, **kwargs): masked_keys = set() for key, val in kwargs.iteritems(): if val in ok_values: masked_keys.add(key) return masked_keys @staticmethod def __filter_namespace_args(namespace, ok_values): return {k: v for k, v in namespace.__dict__.items() if k in ok_values} # API decorator - double wrap because of argument def __api_check(connected = True): def wrap (f): def wrap2(*args, **kwargs): client = args[0] func_name = f.__name__ # check connection if connected and not client.is_connected(): raise STLStateError(func_name, 'disconnected') ret = f(*args, **kwargs) return ret return wrap2 return wrap ############################ API ############################# ############################ ############################# ############################ ############################# def __enter__ (self): self.connect() self.acquire(force = True) self.reset() return self def __exit__ (self, type, value, traceback): if self.get_active_ports(): self.stop(self.get_active_ports()) self.disconnect() ############################ Getters ############################# ############################ ############################# ############################ ############################# # return verbose level of the logger def get_verbose (self): return self.logger.get_verbose() # is the client on read only mode ? def is_all_ports_acquired (self): return not (self.get_all_ports() == self.get_acquired_ports()) # is the client connected ? 
def is_connected (self): return self.connected and self.comm_link.is_connected # get connection info def get_connection_info (self): return self.connection_info # get supported commands by the server def get_server_supported_cmds(self): return self.supported_cmds # get server version def get_server_version(self): return self.server_version # get server system info def get_server_system_info(self): return self.system_info # get port count def get_port_count(self): return len(self.ports) # returns the port object def get_port (self, port_id): port = self.ports.get(port_id, None) if (port != None): return port else: raise STLArgumentError('port id', port_id, valid_values = self.get_all_ports()) # get all ports as IDs def get_all_ports (self): return self.ports.keys() # get all acquired ports def get_acquired_ports(self): return [port_id for port_id, port_obj in self.ports.iteritems() if port_obj.is_acquired()] # get all active ports (TX or pause) def get_active_ports(self): return [port_id for port_id, port_obj in self.ports.iteritems() if port_obj.is_active()] # get paused ports def get_paused_ports (self): return [port_id for port_id, port_obj in self.ports.iteritems() if port_obj.is_paused()] # get all TX ports def get_transmitting_ports (self): return [port_id for port_id, port_obj in self.ports.iteritems() if port_obj.is_transmitting()] # get stats def get_stats (self, ports = None, async_barrier = True): # by default use all ports if ports == None: ports = self.get_acquired_ports() else: ports = self.__ports(ports) # verify valid port id list rc = self._validate_port_list(ports) if not rc: raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) # check async barrier if not type(async_barrier) is bool: raise STLArgumentError('async_barrier', async_barrier) # if the user requested a barrier - use it if async_barrier: rc = self.async_client.barrier() if not rc: raise STLError(rc) return self.__get_stats(ports) # return all async events def 
get_events (self): return self.event_handler.get_events() ############################ Commands ############################# ############################ ############################# ############################ ############################# """ Sets verbose level :parameters: level : str "high" "low" "normal" :raises: None """ def set_verbose (self, level): modes = {'low' : LoggerApi.VERBOSE_QUIET, 'normal': LoggerApi.VERBOSE_REGULAR, 'high': LoggerApi.VERBOSE_HIGH} if not level in modes.keys(): raise STLArgumentError('level', level) self.logger.set_verbose(modes[level]) """ Connects to the TRex server :parameters: None :raises: + :exc:`STLError` """ @__api_check(False) def connect (self): rc = self.__connect() if not rc: raise STLError(rc) """ Disconnects from the server :parameters: stop_traffic : bool tries to stop traffic before disconnecting release_ports : bool tries to release all the acquired ports """ @__api_check(False) def disconnect (self, stop_traffic = True, release_ports = True): # try to stop ports but do nothing if not possible if stop_traffic: try: self.stop() except STLError: pass self.logger.pre_cmd("Disconnecting from server at '{0}':'{1}'".format(self.connection_info['server'], self.connection_info['sync_port'])) rc = self.__disconnect(release_ports) self.logger.post_cmd(rc) """ Acquires ports for executing commands :parameters: ports : list ports to execute the command force : bool force acquire the ports :raises: + :exc:`STLError` """ @__api_check(True) def acquire (self, ports = None, force = False): # by default use all ports if ports == None: ports = self.get_all_ports() # verify ports rc = self._validate_port_list(ports) if not rc: raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) # verify valid port id list if force: self.logger.pre_cmd("Force acquiring ports {0}:".format(ports)) else: self.logger.pre_cmd("Acquiring ports {0}:".format(ports)) rc = self.__acquire(ports, force) self.logger.post_cmd(rc) if not 
rc: # cleanup self.__release(ports) raise STLError(rc) """ Release ports :parameters: ports : list ports to execute the command :raises: + :exc:`STLError` """ @__api_check(True) def release (self, ports = None): # by default use all acquired ports if ports == None: ports = self.get_acquired_ports() # verify ports rc = self._validate_port_list(ports) if not rc: raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) self.logger.pre_cmd("Releasing ports {0}:".format(ports)) rc = self.__release(ports) self.logger.post_cmd(rc) if not rc: raise STLError(rc) """ Pings the server :parameters: None :raises: + :exc:`STLError` """ @__api_check(True) def ping(self): self.logger.pre_cmd( "Pinging the server on '{0}' port '{1}': ".format(self.connection_info['server'], self.connection_info['sync_port'])) rc = self._transmit("ping") self.logger.post_cmd(rc) if not rc: raise STLError(rc) """ force acquire ports, stop the traffic, remove all streams and clear stats :parameters: ports : list ports to execute the command :raises: + :exc:`STLError` """ @__api_check(True) def reset(self, ports = None): # by default use all ports if ports == None: ports = self.get_all_ports() # verify ports rc = self._validate_port_list(ports) if not rc: raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) self.acquire(ports, force = True) self.stop(ports) self.remove_all_streams(ports) self.clear_stats(ports) """ remove all streams from port(s) :parameters: ports : list ports to execute the command :raises: + :exc:`STLError` """ @__api_check(True) def remove_all_streams (self, ports = None): # by default use all ports if ports == None: ports = self.get_acquired_ports() # verify valid port id list rc = self._validate_port_list(ports) if not rc: raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) self.logger.pre_cmd("Removing all streams from port(s) {0}:".format(ports)) rc = self.__remove_all_streams(ports) self.logger.post_cmd(rc) if 
not rc: raise STLError(rc) """ add a list of streams to port(s) :parameters: ports : list ports to execute the command streams: list streams to attach :returns: list of stream IDs in order of the stream list :raises: + :exc:`STLError` """ @__api_check(True) def add_streams (self, streams, ports = None): # by default use all ports if ports == None: ports = self.get_acquired_ports() # verify valid port id list rc = self._validate_port_list(ports) if not rc: raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) # transform single stream if not isinstance(streams, list): streams = [streams] # check streams if not all([isinstance(stream, STLStream) for stream in streams]): raise STLArgumentError('streams', streams) self.logger.pre_cmd("Attaching {0} streams to port(s) {1}:".format(len(streams), ports)) rc = self.__add_streams(streams, ports) self.logger.post_cmd(rc) if not rc: raise STLError(rc) return [stream.get_id() for stream in streams] """ remove a list of streams from ports :parameters: ports : list ports to execute the command stream_id_list: list stream id list to remove :raises: + :exc:`STLError` """ @__api_check(True) def remove_streams (self, stream_id_list, ports = None): # by default use all ports if ports == None: ports = self.get_acquired_ports() # verify valid port id list rc = self._validate_port_list(ports) if not rc: raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) # transform single stream if not isinstance(stream_id_list, list): stream_id_list = [stream_id_list] # check streams if not all([isinstance(stream_id, long) for stream_id in stream_id_list]): raise STLArgumentError('stream_id_list', stream_id_list) # remove streams self.logger.pre_cmd("Removing {0} streams from port(s) {1}:".format(len(stream_id_list), ports)) rc = self.__remove_streams(stream_id_list, ports) self.logger.post_cmd(rc) if not rc: raise STLError(rc) """ load a profile file to port(s) :parameters: filename : str filename to load 
ports : list ports to execute the command :raises: + :exc:`STLError` """ @__api_check(True) def load_profile (self, filename, ports = None): # check filename if not os.path.isfile(filename): raise STLError("file '{0}' does not exists".format(filename)) # by default use all ports if ports == None: ports = self.get_acquired_ports() # verify valid port id list rc = self._validate_port_list(ports) if not rc: raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) streams = None # try YAML try: streams_db = CStreamsDB() stream_list = streams_db.load_yaml_file(filename) # convert to new style stream object streams = [HACKSTLStream(stream) for stream in stream_list.compiled] except YAMLError: # try python try: basedir = os.path.dirname(filename) sys.path.append(basedir) file = os.path.basename(filename).split('.')[0] module = __import__(file, globals(), locals(), [], -1) streams = module.register().get_streams() except (AttributeError, ImportError): raise STLError("bad format input file '{0}'".format(filename)) self.add_streams(streams, ports) """ start traffic on port(s) :parameters: ports : list ports to execute command mult : str multiplier in a form of pps, bps, or line util in % examples: "5kpps", "10gbps", "85%", "32mbps" force : bool imply stopping the port of active and also forces a profile that exceeds the L1 BW duration : int limit the run for time in seconds -1 means unlimited total : bool should the B/W be divided by the ports or duplicated for each :raises: + :exc:`STLError` """ @__api_check(True) def start (self, ports = None, mult = "1", force = False, duration = -1, total = False): # by default use all ports if ports == None: ports = self.get_acquired_ports() # verify valid port id list rc = self._validate_port_list(ports) if not rc: raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) # verify multiplier mult_obj = parsing_opts.decode_multiplier(mult, allow_update = False, divide_count = len(ports) if total else 
1) if not mult_obj: raise STLArgumentError('mult', mult) # some type checkings if not type(force) is bool: raise STLArgumentError('force', force) if not isinstance(duration, (int, float)): raise STLArgumentError('duration', duration) if not type(total) is bool: raise STLArgumentError('total', total) # verify ports are stopped or force stop them active_ports = list(set(self.get_active_ports()).intersection(ports)) if active_ports: if not force: raise STLError("Port(s) {0} are active - please stop them or specify 'force'".format(active_ports)) else: rc = self.stop(active_ports) if not rc: raise STLError(rc) # start traffic self.logger.pre_cmd("Starting traffic on port(s) {0}:".format(ports)) rc = self.__start(mult_obj, duration, ports, force) self.logger.post_cmd(rc) if not rc: raise STLError(rc) """ stop port(s) :parameters: ports : list ports to execute the command :raises: + :exc:`STLError` """ @__api_check(True) def stop (self, ports = None): # by default the user means all the active ports if ports == None: ports = self.get_active_ports() if not ports: return # verify valid port id list rc = self._validate_port_list(ports) if not rc: raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) self.logger.pre_cmd("Stopping traffic on port(s) {0}:".format(ports)) rc = self.__stop(ports) self.logger.post_cmd(rc) if not rc: raise STLError(rc) """ update traffic on port(s) :parameters: ports : list ports to execute command mult : str multiplier in a form of pps, bps, or line util in % and also with +/- examples: "5kpps+", "10gbps-", "85%", "32mbps", "20%+" force : bool forces a profile that exceeds the L1 BW total : bool should the B/W be divided by the ports or duplicated for each :raises: + :exc:`STLError` """ @__api_check(True) def update (self, ports = None, mult = "1", total = False, force = False): # by default the user means all the active ports if ports == None: ports = self.get_active_ports() # verify valid port id list rc = 
self._validate_port_list(ports) if not rc: raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) # verify multiplier mult_obj = parsing_opts.decode_multiplier(mult, allow_update = True, divide_count = len(ports) if total else 1) if not mult_obj: raise STLArgumentError('mult', mult) # verify total if not type(total) is bool: raise STLArgumentError('total', total) # call low level functions self.logger.pre_cmd("Updating traffic on port(s) {0}:".format(ports)) rc = self.__update(mult, ports, force) self.logger.post_cmd(rc) if not rc: raise STLError(rc) """ pause traffic on port(s) :parameters: ports : list ports to execute command :raises: + :exc:`STLError` """ @__api_check(True) def pause (self, ports = None): # by default the user means all the TX ports if ports == None: ports = self.get_transmitting_ports() # verify valid port id list rc = self._validate_port_list(ports) if not rc: raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) self.logger.pre_cmd("Pausing traffic on port(s) {0}:".format(ports)) rc = self.__pause(ports) self.logger.post_cmd(rc) if not rc: raise STLError(rc) """ resume traffic on port(s) :parameters: ports : list ports to execute command :raises: + :exc:`STLError` """ @__api_check(True) def resume (self, ports = None): # by default the user means all the paused ports if ports == None: ports = self.get_paused_ports() # verify valid port id list rc = self._validate_port_list(ports) if not rc: raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) self.logger.pre_cmd("Resume traffic on port(s) {0}:".format(ports)) rc = self.__resume(ports) self.logger.post_cmd(rc) if not rc: raise STLError(rc) """ validate port(s) configuration :parameters: ports : list ports to execute command mult : str multiplier in a form of pps, bps, or line util in % examples: "5kpps", "10gbps", "85%", "32mbps" duration : int limit the run for time in seconds -1 means unlimited total : bool should the B/W be 
divided by the ports or duplicated for each :raises: + :exc:`STLError` """ @__api_check(True) def validate (self, ports = None, mult = "1", duration = "-1", total = False): if ports == None: ports = self.get_acquired_ports() # verify valid port id list rc = self._validate_port_list(ports) if not rc: raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) # verify multiplier mult_obj = parsing_opts.decode_multiplier(mult, allow_update = True, divide_count = len(ports) if total else 1) if not mult_obj: raise STLArgumentError('mult', mult) if not isinstance(duration, (int, float)): raise STLArgumentError('duration', duration) self.logger.pre_cmd("Validating streams on port(s) {0}:".format(ports)) rc = self.__validate(ports) self.logger.post_cmd(rc) for port in ports: self.ports[port].print_profile(mult_obj, duration) """ clear stats on port(s) :parameters: ports : list ports to execute command clear_global : bool clear the global stats :raises: + :exc:`STLError` """ @__api_check(False) def clear_stats (self, ports = None, clear_global = True): # by default use all ports if ports == None: ports = self.get_all_ports() else: ports = self.__ports(ports) # verify valid port id list rc = self._validate_port_list(ports) if not rc: raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) # verify clear global if not type(clear_global) is bool: raise STLArgumentError('clear_global', clear_global) rc = self.__clear_stats(ports, clear_global) if not rc: raise STLError(rc) """ block until specify port(s) traffic has ended :parameters: ports : list ports to execute command timeout : int timeout in seconds :raises: + :exc:`STLTimeoutError` - in case timeout has expired + :exe:'STLError' """ @__api_check(True) def wait_on_traffic (self, ports = None, timeout = 60): # by default use all acquired ports if ports == None: ports = self.get_acquired_ports() # verify valid port id list rc = self._validate_port_list(ports) if not rc: raise 
STLArgumentError('ports', ports, valid_values = self.get_all_ports()) expr = time.time() + timeout # wait while any of the required ports are active while set(self.get_active_ports()).intersection(ports): time.sleep(0.01) if time.time() > expr: raise STLTimeoutError(timeout) """ clear all events :parameters: None :raises: None """ def clear_events (self): self.event_handler.clear_events() ############################ Line ############################# ############################ Commands ############################# ############################ ############################# # console decorator def __console(f): def wrap(*args): client = args[0] time1 = time.time() try: rc = f(*args) except STLError as e: client.logger.log("Log:\n" + format_text(e.brief() + "\n", 'bold')) return # if got true - print time if rc: delta = time.time() - time1 client.logger.log(format_time(delta) + "\n") return wrap @__console def connect_line (self, line): '''Connects to the TRex server''' # define a parser parser = parsing_opts.gen_parser(self, "connect", self.connect_line.__doc__, parsing_opts.FORCE) opts = parser.parse_args(line.split()) if opts is None: return # call the API self.connect() self.acquire(force = opts.force) # true means print time return True @__console def disconnect_line (self, line): self.disconnect() @__console def reset_line (self, line): self.reset() # true means print time return True @__console def start_line (self, line): '''Start selected traffic in specified ports on TRex\n''' # define a parser parser = parsing_opts.gen_parser(self, "start", self.start_line.__doc__, parsing_opts.PORT_LIST_WITH_ALL, parsing_opts.TOTAL, parsing_opts.FORCE, parsing_opts.STREAM_FROM_PATH_OR_FILE, parsing_opts.DURATION, parsing_opts.MULTIPLIER_STRICT, parsing_opts.DRY_RUN) opts = parser.parse_args(line.split()) if opts is None: return active_ports = list(set(self.get_active_ports()).intersection(opts.ports)) if active_ports: if not opts.force: msg = "Port(s) {0} are active - 
please stop them or add '--force'\n".format(active_ports) self.logger.log(format_text(msg, 'bold')) return else: self.stop(active_ports) # remove all streams self.remove_all_streams(opts.ports) # pack the profile self.load_profile(opts.file[0], opts.ports) if opts.dry: self.validate(opts.ports, opts.mult, opts.duration, opts.total) else: self.start(opts.ports, opts.mult, opts.force, opts.duration, opts.total) # true means print time return True @__console def stop_line (self, line): '''Stop active traffic in specified ports on TRex\n''' parser = parsing_opts.gen_parser(self, "stop", self.stop_line.__doc__, parsing_opts.PORT_LIST_WITH_ALL) opts = parser.parse_args(line.split()) if opts is None: return # find the relevant ports ports = list(set(self.get_active_ports()).intersection(opts.ports)) if not ports: self.logger.log(format_text("No active traffic on provided ports\n", 'bold')) return self.stop(ports) # true means print time return True @__console def update_line (self, line): '''Update port(s) speed currently active\n''' parser = parsing_opts.gen_parser(self, "update", self.update_line.__doc__, parsing_opts.PORT_LIST_WITH_ALL, parsing_opts.MULTIPLIER, parsing_opts.TOTAL, parsing_opts.FORCE) opts = parser.parse_args(line.split()) if opts is None: return # find the relevant ports ports = list(set(self.get_active_ports()).intersection(opts.ports)) if not ports: self.logger.log(format_text("No ports in valid state to update\n", 'bold')) return self.update(ports, opts.mult, opts.total, opts.force) # true means print time return True @__console def pause_line (self, line): '''Pause active traffic in specified ports on TRex\n''' parser = parsing_opts.gen_parser(self, "pause", self.pause_line.__doc__, parsing_opts.PORT_LIST_WITH_ALL) opts = parser.parse_args(line.split()) if opts is None: return # find the relevant ports ports = list(set(self.get_transmitting_ports()).intersection(opts.ports)) if not ports: self.logger.log(format_text("No ports in valid state to 
pause\n", 'bold')) return self.pause(ports) # true means print time return True @__console def resume_line (self, line): '''Resume active traffic in specified ports on TRex\n''' parser = parsing_opts.gen_parser(self, "resume", self.resume_line.__doc__, parsing_opts.PORT_LIST_WITH_ALL) opts = parser.parse_args(line.split()) if opts is None: return # find the relevant ports ports = list(set(self.get_paused_ports()).intersection(opts.ports)) if not ports: self.logger.log(format_text("No ports in valid state to resume\n", 'bold')) return return self.resume(ports) # true means print time return True @__console def clear_stats_line (self, line): '''Clear cached local statistics\n''' # define a parser parser = parsing_opts.gen_parser(self, "clear", self.clear_stats_line.__doc__, parsing_opts.PORT_LIST_WITH_ALL) opts = parser.parse_args(line.split()) if opts is None: return self.clear_stats(opts.ports) @__console def show_stats_line (self, line): '''Fetch statistics from TRex server by port\n''' # define a parser parser = parsing_opts.gen_parser(self, "stats", self.show_stats_line.__doc__, parsing_opts.PORT_LIST_WITH_ALL, parsing_opts.STATS_MASK) opts = parser.parse_args(line.split()) if opts is None: return # determine stats mask mask = self.__get_mask_keys(**self.__filter_namespace_args(opts, trex_stats.ALL_STATS_OPTS)) if not mask: # set to show all stats if no filter was given mask = trex_stats.ALL_STATS_OPTS stats_opts = trex_stats.ALL_STATS_OPTS.intersection(mask) stats = self._get_formatted_stats(opts.ports, mask) # print stats to screen for stat_type, stat_data in stats.iteritems(): text_tables.print_table_with_header(stat_data.text_table, stat_type) @__console def show_streams_line(self, line): '''Fetch streams statistics from TRex server by port\n''' # define a parser parser = parsing_opts.gen_parser(self, "streams", self.show_streams_line.__doc__, parsing_opts.PORT_LIST_WITH_ALL, parsing_opts.STREAMS_MASK) opts = parser.parse_args(line.split()) if opts is None: 
return streams = self._get_streams(opts.ports, set(opts.streams)) if not streams: self.logger.log(format_text("No streams found with desired filter.\n", "bold", "magenta")) else: # print stats to screen for stream_hdr, port_streams_data in streams.iteritems(): text_tables.print_table_with_header(port_streams_data.text_table, header= stream_hdr.split(":")[0] + ":", untouched_header= stream_hdr.split(":")[1]) @__console def validate_line (self, line): '''validates port(s) stream configuration\n''' parser = parsing_opts.gen_parser(self, "validate", self.validate_line.__doc__, parsing_opts.PORT_LIST_WITH_ALL) opts = parser.parse_args(line.split()) if opts is None: return self.validate(opts.ports)