summaryrefslogtreecommitdiffstats
path: root/scripts/automation/trex_control_plane/stl/examples/stl_functional.py
blob: a74d9efe6545a1d38429b9a0e1f58d43908d9e3c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import stl_path
from trex_stl_lib.api import *

"""
An example of how to use TRex for functional tests.

It can be used for various tasks and can replace simple Pagent/Scapy
low-rate tests.
"""

# test Dot1Q send / capture - takes a connected client object and two ports (RX port first, then TX port)
def test_dot1q (c, rx_port, tx_port):
    """
    Push a single Dot1Q (VLAN 100) packet on tx_port and verify that exactly
    one packet carrying the same VLAN tag is captured on rx_port.

    Arguments:
        c       - a connected client object
        rx_port - port to capture on (placed in service mode for the test)
        tx_port - port the packet is pushed from

    Raises:
        AssertionError - the capture did not hold exactly one Dot1Q packet
                         with VLAN 100
        any error raised by the client calls is propagated after cleanup

    The capture is always stopped and service mode is always disabled on
    rx_port before returning or raising, in that order (the same order used
    on the success path).
    """

    # activate service mode on the RX port
    c.set_service_mode(ports = rx_port)

    rx_pkts = []

    try:
        # generate a simple Dot1Q
        pkt = Ether() / Dot1Q(vlan = 100) / IP()

        # start a capture
        capture = c.start_capture(rx_ports = rx_port)

        try:
            # push the Dot1Q packet to TX port... we need 'force' because this is under service mode
            print('\nSending 1 Dot1Q packet(s) on port {}'.format(tx_port))

            c.push_packets(ports = tx_port, pkts = pkt, force = True)
            c.wait_on_traffic(ports = tx_port)

        finally:
            # stop the capture even if sending failed, so it does not stay active
            c.stop_capture(capture_id = capture['id'], output = rx_pkts)

        print('\nRecived {} packets on port {}:\n'.format(len(rx_pkts), rx_port))

    finally:
        # leave service mode whether or not the steps above succeeded
        c.set_service_mode(ports = rx_port, enabled = False)

    # got back one packet
    assert(len(rx_pkts) == 1)
    rx_scapy_pkt = Ether(rx_pkts[0]['binary'])

    # it's a Dot1Q with the same VLAN
    assert('Dot1Q' in rx_scapy_pkt)
    assert(rx_scapy_pkt.vlan == 100)

    rx_scapy_pkt.show2()

    
# test an echo request / echo reply
def test_ping (c, tx_port, rx_port):
    """
    Push an ICMP echo request from tx_port, addressed to the IPv4 address
    configured on rx_port, and verify that exactly one ICMP echo reply is
    captured back on tx_port.

    Arguments:
        c       - a connected client object
        tx_port - port the request is pushed from and the reply is captured on
        rx_port - port whose configured IPv4 address is the ping destination

    Raises:
        AssertionError - either port is not in IPv4 layer mode, or the capture
                         did not hold exactly one ICMP echo reply (type 0)
        any error raised by the client calls is propagated after cleanup

    Both ports are placed in service mode for the duration of the test and
    both are taken out of it before returning or raising.
    """

    ports = [tx_port, rx_port]

    # activate service mode on both the TX and the RX port
    c.set_service_mode(ports = ports)

    rx_pkts = []

    try:
        # fetch the config
        tx_port_attr = c.get_port_attr(port = tx_port)
        rx_port_attr = c.get_port_attr(port = rx_port)

        assert(tx_port_attr['layer_mode'] == 'IPv4')
        assert(rx_port_attr['layer_mode'] == 'IPv4')

        # ICMP type 8 is an echo request
        pkt = Ether() / IP(src = tx_port_attr['src_ipv4'], dst = rx_port_attr['src_ipv4']) / ICMP(type = 8)

        # start a capture on the sending port (the reply comes back to it)
        capture = c.start_capture(rx_ports = tx_port)

        try:
            print('\nSending ping request on port {}'.format(tx_port))

            # send the ping packet
            c.push_packets(ports = tx_port, pkts = pkt, force = True)
            c.wait_on_traffic(ports = tx_port)

        finally:
            # fetch the packet(s); also stops the capture if sending failed
            c.stop_capture(capture_id = capture['id'], output = rx_pkts)

        print('\nRecived {} packets on port {}:\n'.format(len(rx_pkts), tx_port))

    finally:
        # disable service mode on every port it was enabled on, not only rx_port
        c.set_service_mode(ports = ports, enabled = False)

    # got back one packet
    assert(len(rx_pkts) == 1)
    rx_scapy_pkt = Ether(rx_pkts[0]['binary'])

    # check for ICMP reply (type 0 is an echo reply)
    assert('ICMP' in rx_scapy_pkt)
    assert(rx_scapy_pkt['ICMP'].type == 0)

    rx_scapy_pkt.show2()
    
def main ():
    """
    Connect to the server, pick the first bidirectional port pair, run the
    Dot1Q and ping functional tests on it and always clean up afterwards.

    STLError exceptions are printed rather than propagated; assertion
    failures from the tests propagate to the caller after cleanup.
    """

    # create a client
    c = STLClient()

    try:
        # connect to the server
        c.connect()

        # there should be at least two ports connected
        bi_pairs = stl_map_ports(c)['bi']
        if not bi_pairs:
            # report the problem instead of failing with a bare IndexError
            print('No bidirectional port pair found - at least two connected ports are required')
            return

        tx_port, rx_port = bi_pairs[0]
        c.reset(ports = [tx_port, rx_port])

        # test 1 - keywords are used because test_dot1q takes rx_port before tx_port
        test_dot1q(c, rx_port = rx_port, tx_port = tx_port)

        # test 2
        test_ping(c, tx_port = tx_port, rx_port = rx_port)

    except STLError as e:
        print(e)

    finally:
        # server-side cleanup is only attempted on a live connection, so that
        # a failed connect() is not followed by a second error raised from here
        if c.is_connected():
            c.stop()
            c.remove_all_captures()
            c.set_service_mode(enabled = False)

        c.disconnect()



# run the example only when this file is executed as a script, not when it is imported
if __name__ == '__main__':
    main()