summaryrefslogtreecommitdiffstats
path: root/scripts/automation/trex_control_plane/stl/examples/hlt_udp_simple.py
blob: bdec9999f5403d4024d87556fa48dce2d0b341a6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
#!/usr/bin/python

""" 
Sample HLTAPI application (for loopback).
Connects to TRex.
Sends UDP packets of a specific length.
Each direction has its own IP range.
"""

import sys
import argparse
import stl_path
from trex_stl_lib.api import *
from trex_stl_lib.trex_stl_hltapi import *


def build_arg_parser():
    """Build the command-line parser for this example.

    The options are --server, -s/--frame-size, -d/--duration, --rate-pps,
    --src and --dst.  Only the frame size and the duration are converted
    to int; the rate is handed on exactly as typed (a string), which is
    also the form of its default.

    Returns:
        argparse.ArgumentParser: the configured parser.
    """
    parser = argparse.ArgumentParser(usage=""" 
    Connect to TRex and send bidirectional continuous traffic

    examples:

     hlt_udp_simple.py --server <hostname/ip>

     hlt_udp_simple.py -s 300 -d 30 --rate-pps 5000000 --src <MAC> --dst <MAC>

     then run the simulator on the output 
       ./stl-sim -f example.yaml -o a.pcap  ==> a.pcap include the packet

    """,
    description="Example for TRex HLTAPI",
    epilog=" based on hhaim's stl_run_udp_simple example")

    parser.add_argument("--server",
                        dest="server",
                        help='Remote trex address',
                        default="127.0.0.1",
                        type=str)

    parser.add_argument("-s", "--frame-size",
                        dest="frame_size",
                        help='L2 frame size in bytes without FCS',
                        default=60,
                        type=int)

    parser.add_argument('-d', '--duration',
                        dest='duration',
                        help='duration in second ',
                        default=10,
                        type=int)

    # No type= here on purpose: the value stays a string, as in the default.
    parser.add_argument('--rate-pps',
                        dest='rate_pps',
                        help='speed in pps',
                        default="100")

    parser.add_argument('--src',
                        dest='src_mac',
                        help='src MAC',
                        default='00:50:56:b9:de:75')

    parser.add_argument('--dst',
                        dest='dst_mac',
                        help='dst MAC',
                        default='00:50:56:b9:34:f3')

    return parser


def main():
    """Connect to TRex, run bidirectional UDP traffic, print stats, clean up.

    Steps: connect to ports 0 and 1 of the server, create one bidirectional
    IPv4/UDP stream between the first two acquired ports, run it for the
    requested duration, stop it, print aggregate statistics and finally
    clean the session up.

    check_res, error, wait_with_progress and print_brief_stats are not
    defined in this file; they are presumably supplied by the star imports
    at the top of the file -- TODO confirm.
    """
    args = build_arg_parser().parse_args()

    hltapi = CTRexHltApi()
    print('Connecting to TRex')
    res = hltapi.connect(device=args.server, port_list=[0, 1], reset=True, break_locks=True)
    check_res(res)

    # Everything after a successful connect is guarded so that the session
    # is cleaned up even if a later step fails or the user hits Ctrl-C
    # while the traffic is running.
    try:
        ports = res['port_handle']
        if len(ports) < 2:
            error('Should have at least 2 ports for this test')
        print('Connected, acquired ports: %s' % ports)

        print('Creating traffic')

        # The second direction uses the same MAC pair, swapped.
        res = hltapi.traffic_config(mode='create', bidirectional=True,
                                    port_handle=ports[0], port_handle2=ports[1],
                                    frame_size=args.frame_size,
                                    mac_src=args.src_mac, mac_dst=args.dst_mac,
                                    mac_src2=args.dst_mac, mac_dst2=args.src_mac,
                                    l3_protocol='ipv4',
                                    ip_src_addr='10.0.0.1', ip_src_mode='increment', ip_src_count=254,
                                    ip_dst_addr='8.0.0.1', ip_dst_mode='increment', ip_dst_count=254,
                                    l4_protocol='udp',
                                    udp_dst_port=12, udp_src_port=1025,
                                    rate_pps=args.rate_pps,
                                    )
        check_res(res)

        print('Starting traffic')
        res = hltapi.traffic_control(action='run', port_handle=ports[:2])
        check_res(res)
        wait_with_progress(args.duration)

        print('Stopping traffic')
        res = hltapi.traffic_control(action='stop', port_handle=ports[:2])
        check_res(res)

        res = hltapi.traffic_stats(mode='aggregate', port_handle=ports[:2])
        check_res(res)
        print_brief_stats(res)
    finally:
        # The result is validated only on the success path below, so that a
        # cleanup problem never hides the exception that brought us here.
        res = hltapi.cleanup_session(port_handle='all')
    check_res(res)

    print('Done')


if __name__ == "__main__":
    main()
an class="n">TestIpIrb, self).tearDown() if not self.vpp_dead: self.logger.info(self.vapi.cli("show l2patch")) self.logger.info(self.vapi.cli("show l2fib verbose")) self.logger.info(self.vapi.cli("show bridge-domain %s detail" % self.bd_id)) self.logger.info(self.vapi.cli("show ip arp")) def create_stream(self, src_ip_if, dst_ip_if, packet_sizes): pkts = [] for i in range(0, 257): remote_dst_host = choice(dst_ip_if.remote_hosts) info = self.create_packet_info( src_ip_if.sw_if_index, dst_ip_if.sw_if_index) payload = self.info_to_payload(info) p = (Ether(dst=src_ip_if.local_mac, src=src_ip_if.remote_mac) / IP(src=src_ip_if.remote_ip4, dst=remote_dst_host.ip4) / UDP(sport=1234, dport=1234) / Raw(payload)) info.data = p.copy() size = packet_sizes[(i // 2) % len(packet_sizes)] self.extend_packet(p, size) pkts.append(p) return pkts def create_stream_l2_to_ip(self, src_l2_if, src_ip_if, dst_ip_if, packet_sizes): pkts = [] for i in range(0, 257): info = self.create_packet_info( src_ip_if.sw_if_index, dst_ip_if.sw_if_index) payload = self.info_to_payload(info) host = choice(src_l2_if.remote_hosts) p = (Ether(src=host.mac, dst = src_ip_if.local_mac) / IP(src=host.ip4, dst=dst_ip_if.remote_ip4) / UDP(sport=1234, dport=1234) / Raw(payload)) info.data = p.copy() size = packet_sizes[(i // 2) % len(packet_sizes)] self.extend_packet(p, size) pkts.append(p) return pkts def verify_capture_l2_to_ip(self, dst_ip_if, src_ip_if, capture): last_info = dict() for i in self.interfaces: last_info[i.sw_if_index] = None dst_ip_sw_if_index = dst_ip_if.sw_if_index for packet in capture: ip = packet[IP] udp = packet[IP][UDP] payload_info = self.payload_to_info(str(packet[IP][UDP][Raw])) packet_index = payload_info.index self.assertEqual(payload_info.dst, dst_ip_sw_if_index) next_info = self.get_next_packet_info_for_interface2( payload_info.src, dst_ip_sw_if_index, last_info[payload_info.src]) last_info[payload_info.src] = next_info self.assertTrue(next_info is not None) saved_packet = 
next_info.data self.assertTrue(next_info is not None) # MAC: src, dst self.assertEqual(packet.src, dst_ip_if.local_mac) self.assertEqual(packet.dst, dst_ip_if.remote_mac) # IP: src, dst host = src_ip_if.host_by_ip4(ip.src) self.assertIsNotNone(host) self.assertEqual(ip.dst, saved_packet[IP].dst) self.assertEqual(ip.dst, dst_ip_if.remote_ip4) # UDP: self.assertEqual(udp.sport, saved_packet[UDP].sport) self.assertEqual(udp.dport, saved_packet[UDP].dport) def verify_capture(self, dst_ip_if, src_ip_if, capture): last_info = dict() for i in self.interfaces: last_info[i.sw_if_index] = None dst_ip_sw_if_index = dst_ip_if.sw_if_index for packet in capture: ip = packet[IP] udp = packet[IP][UDP] payload_info = self.payload_to_info(str(packet[IP][UDP][Raw])) packet_index = payload_info.index self.assertEqual(payload_info.dst, dst_ip_sw_if_index) next_info = self.get_next_packet_info_for_interface2( payload_info.src, dst_ip_sw_if_index, last_info[payload_info.src]) last_info[payload_info.src] = next_info self.assertTrue(next_info is not None) self.assertEqual(packet_index, next_info.index) saved_packet = next_info.data self.assertTrue(next_info is not None) # MAC: src, dst self.assertEqual(packet.src, dst_ip_if.local_mac) host = dst_ip_if.host_by_mac(packet.dst) # IP: src, dst self.assertEqual(ip.src, src_ip_if.remote_ip4) self.assertEqual(ip.dst, saved_packet[IP].dst) self.assertEqual(ip.dst, host.ip4) # UDP: self.assertEqual(udp.sport, saved_packet[UDP].sport) self.assertEqual(udp.dport, saved_packet[UDP].dport) def test_ip4_irb_1(self): """ IPv4 IRB test 1 Test scenario: - ip traffic from pg2 interface must ends in both pg0 and pg1 - arp entry present in loop0 interface for destination IP - no l2 entree configured, pg0 and pg1 are same """ stream = self.create_stream( self.pg2, self.loop0, self.pg_if_packet_sizes) self.pg2.add_stream(stream) self.pg_enable_capture(self.pg_interfaces) self.pg_start() rcvd1 = self.pg0.get_capture() rcvd2 = self.pg1.get_capture() 
self.verify_capture(self.loop0, self.pg2, rcvd1) self.verify_capture(self.loop0, self.pg2, rcvd2) self.assertListEqual(rcvd1.res, rcvd2.res) def test_ip4_irb_2(self): """ IPv4 IRB test 2 Test scenario: - ip traffic from pg0 and pg1 ends on pg2 """ stream1 = self.create_stream_l2_to_ip( self.pg0, self.loop0, self.pg2, self.pg_if_packet_sizes) stream2 = self.create_stream_l2_to_ip( self.pg1, self.loop0, self.pg2, self.pg_if_packet_sizes) self.pg0.add_stream(stream1) self.pg1.add_stream(stream2) self.pg_enable_capture(self.pg_interfaces) self.pg_start() rcvd = self.pg2.get_capture() self.verify_capture_l2_to_ip(self.pg2, self.loop0, rcvd) self.assertEqual(len(stream1) + len(stream2), len(rcvd.res)) if __name__ == '__main__': unittest.main(testRunner=VppTestRunner)