1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
|
from .trex_stl_rx_service_api import RXServiceAPI
from ..trex_stl_streams import STLStream, STLTXSingleBurst
from ..trex_stl_packet_builder_scapy import STLPktBuilder
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, ICMP
class RXServiceICMP(RXServiceAPI):
def __init__ (self, port, ping_ip, pkt_size, *a, **k):
super(RXServiceICMP, self).__init__(port, layer_mode = RXServiceAPI.LAYER_MODE_L3, *a, **k)
self.ping_ip = ping_ip
self.pkt_size = pkt_size
self.result = {}
def get_name (self):
return "PING"
def pre_execute (self):
if not self.port.is_resolved():
return self.port.err('ping - port has an unresolved destination, cannot determine next hop MAC address')
self.layer_cfg = dict(self.port.get_layer_cfg())
return self.port.ok()
# return a list of streams for request
def generate_request (self):
base_pkt = Ether(dst = self.layer_cfg['ether']['dst'])/IP(src = self.layer_cfg['ipv4']['src'], dst = self.ping_ip)/ICMP(type = 8)
pad = max(0, self.pkt_size - len(base_pkt))
base_pkt = base_pkt / (pad * 'x')
s1 = STLStream( packet = STLPktBuilder(pkt = base_pkt), mode = STLTXSingleBurst(total_pkts = 1) )
self.base_pkt = base_pkt
return [s1]
def on_pkt_rx (self, pkt, start_ts):
scapy_pkt = Ether(pkt['binary'])
if not 'ICMP' in scapy_pkt:
return None
ip = scapy_pkt['IP']
if ip.dst != self.layer_cfg['ipv4']['src']:
return None
icmp = scapy_pkt['ICMP']
dt = pkt['ts'] - start_ts
# echo reply
if icmp.type == 0:
# check seq
if icmp.seq != self.base_pkt['ICMP'].seq:
return None
self.result['formatted_string'] = 'Reply from {0}: bytes={1}, time={2:.2f}ms, TTL={3}'.format(ip.src, len(pkt['binary']), dt * 1000, ip.ttl)
self.result['src_ip'] = ip.src
self.result['rtt'] = dt * 1000
self.result['ttl'] = ip.ttl
self.result['status'] = 'success'
return self.port.ok(self.result)
# unreachable
elif icmp.type == 3:
# check seq
if icmp.payload.seq != self.base_pkt['ICMP'].seq:
return None
self.result['formatted_string'] = 'Reply from {0}: Destination host unreachable'.format(icmp.src)
self.result['status'] = 'unreachable'
return self.port.ok(self.result)
else:
# skip any other types
#scapy_pkt.show2()
return None
# return the str of a timeout err
def on_timeout(self):
self.result['formatted_string'] = 'Request timed out.'
self.result['status'] = 'timeout'
return self.port.ok(self.result)
|