summaryrefslogtreecommitdiffstats
path: root/scripts/automation/trex_control_plane/stl/trex_stl_lib/rx_services/trex_stl_rx_service_icmp.py
blob: 8d4e2f574b412832da9696640c6ee142845c3878 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
from .trex_stl_rx_service_api import RXServiceAPI

from ..trex_stl_streams import STLStream, STLTXSingleBurst
from ..trex_stl_packet_builder_scapy import STLPktBuilder

from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, ICMP


class RXServiceICMP(RXServiceAPI):
    """RX service that sends one ICMP echo request and interprets the answer.

    The outcome is stored in ``self.result`` (a dict that always carries
    ``formatted_string`` and ``status``) and is returned wrapped by
    ``self.port.ok()``.
    """

    def __init__ (self, port, ping_ip, pkt_size, *a, **k):
        """
        :param port: port object the service runs on (forwarded to RXServiceAPI)
        :param ping_ip: destination IPv4 address of the echo request
        :param pkt_size: requested length of the generated packet in bytes;
                         a shorter base packet is padded up to this length
        :param a: extra positional arguments forwarded to RXServiceAPI
        :param k: extra keyword arguments forwarded to RXServiceAPI
        """
        super(RXServiceICMP, self).__init__(port, layer_mode = RXServiceAPI.LAYER_MODE_L3, *a, **k)
        self.ping_ip  = ping_ip
        self.pkt_size = pkt_size
        self.result = {}

    def get_name (self):
        """Return the display name of this service."""
        return "PING"

    def pre_execute (self):
        """Validate the port and snapshot its layer configuration.

        Returns ``self.port.err(...)`` when the port destination is not
        resolved, otherwise ``self.port.ok()``.
        """
        if not self.port.is_resolved():
            return self.port.err('ping - port has an unresolved destination, cannot determine next hop MAC address')

        # keep a private (shallow) copy of the layer configuration
        self.layer_cfg = dict(self.port.get_layer_cfg())

        return self.port.ok()


    # return a list of streams for request
    def generate_request (self):
        """Build the echo-request packet and return it as a one-packet burst stream.

        The packet is remembered in ``self.base_pkt`` so that ``on_pkt_rx``
        can match answers against its ICMP sequence number.
        """
        base_pkt = Ether(dst = self.layer_cfg['ether']['dst'])/IP(src = self.layer_cfg['ipv4']['src'], dst = self.ping_ip)/ICMP(type = 8)

        # pad up to the requested size; never truncate the headers
        pad = max(0, self.pkt_size - len(base_pkt))
        base_pkt = base_pkt / (pad * 'x')

        s1 = STLStream( packet = STLPktBuilder(pkt = base_pkt), mode = STLTXSingleBurst(total_pkts = 1) )

        self.base_pkt = base_pkt

        return [s1]

    def on_pkt_rx (self, pkt, start_ts):
        """Inspect a received packet and decide whether it answers our request.

        :param pkt: dict holding the raw frame under ``'binary'`` and its
                    receive timestamp under ``'ts'``
        :param start_ts: timestamp the RTT is measured from
        :return: ``None`` when the packet is unrelated and should be skipped,
                 otherwise ``self.port.ok(self.result)``
        """
        scapy_pkt = Ether(pkt['binary'])
        if 'ICMP' not in scapy_pkt:
            return None

        ip = scapy_pkt['IP']
        if ip.dst != self.layer_cfg['ipv4']['src']:
            return None

        icmp = scapy_pkt['ICMP']
        expected_seq = self.base_pkt['ICMP'].seq

        dt = pkt['ts'] - start_ts

        # echo reply
        if icmp.type == 0:
            # check seq
            if icmp.seq != expected_seq:
                return None
            self.result['formatted_string'] = 'Reply from {0}: bytes={1}, time={2:.2f}ms, TTL={3}'.format(ip.src, len(pkt['binary']), dt * 1000, ip.ttl)
            self.result['src_ip'] = ip.src
            self.result['rtt'] = dt * 1000
            self.result['ttl'] = ip.ttl
            self.result['status'] = 'success'
            return self.port.ok(self.result)

        # unreachable
        elif icmp.type == 3:
            # check seq of the quoted (original) datagram; it may not be an
            # ICMP echo at all (e.g. an error generated for other traffic),
            # in which case there is no 'seq' field and the packet is not ours
            quoted_seq = getattr(icmp.payload, 'seq', None)
            if quoted_seq != expected_seq:
                return None

            # report the node that sent the error (outer IP source).
            # 'icmp.src' is not an ICMP field - scapy would resolve it through
            # the payload, i.e. the quoted original datagram, whose source is
            # this port itself
            self.result['formatted_string'] = 'Reply from {0}: Destination host unreachable'.format(ip.src)
            self.result['src_ip'] = ip.src
            self.result['status'] = 'unreachable'
            return self.port.ok(self.result)

        else:
            # skip any other types
            return None



    # return the str of a timeout err
    def on_timeout(self):
        """Record a timeout in ``self.result`` and return it via ``self.port.ok()``."""
        self.result['formatted_string'] = 'Request timed out.'
        self.result['status'] = 'timeout'
        return self.port.ok(self.result)