1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
import stl_path
from trex_stl_lib.api import *
import argparse
import sys
def create_vm (ip_start, ip_end):
vm =[
# dest
STLVmFlowVar(name="dst", min_value = ip_start, max_value = ip_end, size = 4, op = "inc"),
STLVmWrFlowVar(fv_name="dst",pkt_offset= "IP.dst"),
# checksum
STLVmFixIpv4(offset = "IP")
]
return vm
# warning: might make test slow
def alter_streams(streams, remove_fcs, vlan_id):
for stream in streams:
packet = Ether(stream.pkt)
if vlan_id >= 0 and vlan_id <= 4096:
packet_l3 = packet.payload
packet = Ether() / Dot1Q(vlan = vlan_id) / packet_l3
if remove_fcs and packet.lastlayer().name == 'Padding':
packet.lastlayer().underlayer.remove_payload()
packet = STLPktBuilder(packet)
stream.fields['packet'] = packet.dump_pkt()
stream.pkt = base64.b64decode(stream.fields['packet']['binary'])
def inject_pcap (pcap_file, server, port, loop_count, ipg_usec, use_vm, remove_fcs, vlan_id):
# create client
c = STLClient(server = server)
try:
if use_vm:
vm = create_vm("10.0.0.1", "10.0.0.254")
else:
vm = None
profile = STLProfile.load_pcap(pcap_file, ipg_usec = ipg_usec, loop_count = loop_count, vm = vm)
print("Loaded pcap {0} with {1} packets...\n".format(pcap_file, len(profile)))
streams = profile.get_streams()
if remove_fcs or (vlan_id >= 0 and vlan_id <= 4096):
alter_streams(streams, remove_fcs, vlan_id)
# uncomment this for simulator run
#STLSim().run(profile.get_streams(), outfile = '/auto/srg-sce-swinfra-usr/emb/users/ybrustin/out.pcap')
c.connect()
c.reset(ports = [port])
stream_ids = c.add_streams(streams, ports = [port])
c.clear_stats()
c.start()
c.wait_on_traffic(ports = [port])
stats = c.get_stats()
opackets = stats[port]['opackets']
print("{0} packets were Tx on port {1}\n".format(opackets, port))
except STLError as e:
print(e)
sys.exit(1)
finally:
c.disconnect()
def setParserOptions():
parser = argparse.ArgumentParser(prog="stl_pcap.py")
parser.add_argument("-f", "--file", help = "pcap file to inject",
dest = "pcap",
required = True,
type = str)
parser.add_argument("-s", "--server", help = "TRex server address",
dest = "server",
default = 'localhost',
type = str)
parser.add_argument("-p", "--port", help = "port to inject on",
dest = "port",
required = True,
type = int)
parser.add_argument("-n", "--number", help = "How many times to inject pcap [default is 1, 0 means forever]",
dest = "loop_count",
default = 1,
type = int)
parser.add_argument("-i", help = "IPG in usec",
dest = "ipg",
default = 10.0,
type = float)
parser.add_argument("-x", help = "Iterate over IP dest",
dest = "use_vm",
default = False,
action = "store_true")
parser.add_argument("-r", "--remove-fcs", help = "Remove FCS if exists. Limited by Scapy capabilities.",
dest = "remove",
default = False,
action = "store_true")
parser.add_argument("-v", "--vlan", help = "Add VLAN header with this ID. Limited by Scapy capabilities.",
dest = "vlan",
default = -1,
type = int)
return parser
def main ():
parser = setParserOptions()
options = parser.parse_args()
inject_pcap(options.pcap, options.server, options.port, options.loop_count, options.ipg, options.use_vm, options.remove, options.vlan)
# inject pcap
if __name__ == '__main__':
main()
|