1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
|
from .trex_stl_streams import *
from .trex_stl_packet_builder_scapy import *
# map ports
# will destroy all streams/data on the ports
def stl_map_ports (client, ports = None):
    """Discover which port receives traffic from which other port.

    Every port transmits a single burst whose size is a distinct power of
    two (multiplied by 3), so the RX packet counter of each port acts as a
    bit mask telling which transmitter it heard.  The ports are reset during
    the procedure, which destroys all streams/data on them.

    :param client: client object used to drive the ports.
    :param ports: ports to map; when ``None`` the result of
        ``client.get_all_ports()`` is used.
    :return: dict with three keys:
        ``'map'``     - ``{rx_port: tx_port}`` (``None`` when nothing matched),
        ``'bi'``      - list of ``(port_a, port_b)`` tuples that map to each other,
        ``'unknown'`` - list of ports for which no transmitter was detected.
    """
    # by default use all ports
    if ports is None:
        ports = client.get_all_ports()

    stl_send_3_pkts(client, ports)

    # burst-size bit -> port that transmits that burst
    tx_pkts = {}
    burst_bit = 1
    base_pkt = STLPktBuilder(pkt = Ether()/IP())
    for port in ports:
        tx_pkts[burst_bit] = port
        # every burst is sent in triplicate; see the division by 3 below
        stream = STLStream(packet = base_pkt,
                           mode = STLTXSingleBurst(pps = 100000, total_pkts = burst_bit * 3))
        client.add_streams(stream, [port])
        burst_bit *= 2

    # inject
    client.clear_stats()
    client.start(ports, mult = "50%")
    client.wait_on_traffic(ports)

    stats = client.get_stats()

    # cleanup
    client.reset(ports = ports)

    table = {'map': {}, 'bi' : [], 'unknown': []}

    # actual mapping
    for port in ports:
        # dividing by 3 and rounding absorbs small random noise in the counter
        ipackets = int(round(stats[port]["ipackets"] / 3.0))
        table['map'][port] = None

        # if several bits are set, the last matching entry wins
        for bit, tx_port in tx_pkts.items():
            if (bit & ipackets) == bit:
                table['map'][port] = tx_port

    unmapped = list(ports)
    while unmapped:
        port_a = unmapped.pop(0)
        port_b = table['map'][port_a]

        # if unknown - add to the unknown list
        if port_b is None:
            table['unknown'].append(port_a)
        # self-loop - reported only through table['map']
        elif port_a == port_b:
            continue
        # bi-directional ports: consume the peer so the pair is listed once
        elif table['map'][port_b] == port_a:
            unmapped.remove(port_b)
            table['bi'].append((port_a, port_b))

    return table
# reset ports and send 3 packets from each acquired port
def stl_send_3_pkts(client, ports = None):
    """Reset the ports and transmit a 3-packet burst on them.

    The ports are reset before the stream is attached and once more after
    the traffic has completed.

    :param client: client object used to drive the ports.
    :param ports: ports to use; passed to the client calls unchanged
        (including the default ``None``).
    """
    probe_pkt = STLPktBuilder(pkt = Ether()/IP())
    burst = STLTXSingleBurst(pps = 100000, total_pkts = 3)
    probe_stream = STLStream(packet = probe_pkt, mode = burst)

    # start from a clean state
    client.reset(ports)

    client.add_streams(probe_stream, ports)
    client.start(ports, mult = "50%")
    client.wait_on_traffic(ports)

    # leave nothing behind on the ports
    client.reset(ports)
|