From 25be508c9922f558552b950fb25599826b1b8308 Mon Sep 17 00:00:00 2001 From: imarom Date: Mon, 28 Nov 2016 16:47:41 +0200 Subject: self code review Signed-off-by: imarom --- .../stl/trex_stl_lib/trex_stl_client.py | 155 ++++----- .../stl/trex_stl_lib/trex_stl_jsonrpc_client.py | 14 +- .../stl/trex_stl_lib/trex_stl_port.py | 358 +++------------------ .../stl/trex_stl_lib/trex_stl_rx_features.py | 255 +++++++++++++++ 4 files changed, 371 insertions(+), 411 deletions(-) create mode 100644 scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_rx_features.py (limited to 'scripts/automation/trex_control_plane/stl/trex_stl_lib') diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py index 546298ce..c72244a6 100755 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py @@ -12,7 +12,7 @@ from .trex_stl_types import * from .trex_stl_async_client import CTRexAsyncClient from .utils import parsing_opts, text_tables, common -from .utils.common import list_intersect, list_difference, is_sub_list, PassiveTimer, is_valid_ipv4, is_valid_mac, list_remove_dup +from .utils.common import * from .utils.text_opts import * from functools import wraps @@ -1793,61 +1793,15 @@ class STLClient(object): if not rc: raise STLError(rc) - def test (self): - self.resolve() - return - - #rc = self.ports[0].resolve() - #if not rc: - # raise STLError(rc) - #return - - self.reset(ports = [0]) - - attr = self.ports[0].get_ts_attr() - src_ipv4 = attr['src_ipv4'] - src_mac = attr['src_mac'] - dest = attr['dest'] - print(src_ipv4, src_mac, dest) - #self.set_port_attr(ports = [0, 1], ipv4 = ['5.5.5.5', '6.6.6.6']) - return - - self.set_rx_queue(ports = [0], size = 1000, rxf = 'all') - - #base_pkt = Ether()/ARP()/('x'*50) - base_pkt = Ether(dst="ff:ff:ff:ff:ff:ff")/ARP(psrc = '1.1.1.2',pdst = '1.1.1.1', hwsrc = 'a0:36:9f:20:e6:ce') - #base_pkt = Ether(dst="ff:ff:ff:ff:ff:ff")/ICMP() - - print('Sending ARP request on port 0:\n') - base_pkt.show2() - - # send some traffic - x = STLStream( packet = STLPktBuilder(pkt = base_pkt), mode = STLTXSingleBurst(total_pkts = 1) ) - - self.add_streams(streams = [x], ports = [0]) - self.start(ports = [0], mult = '100%') - self.wait_on_traffic(ports = [0]) - time.sleep(1) - - pkts = self.get_rx_queue_pkts(ports = [0]) - - print('got back on port 0:\n') - for pkt in pkts[0]: - Ether(pkt).show2() - - self.remove_rx_queue(ports = [1]) - self.set_port_attr(ports = [1], rxf = 'hw') - #for pkt in pkts[1]: - # Ether(pkt).show2() @__api_check(True) - def ping(self): + def ping_rpc_server(self): """ - Pings the server + Pings the RPC server :parameters: - none + None :raises: + :exc:`STLError` @@ -1865,9 +1819,9 @@ class STLClient(object): @__api_check(True) - def ip_ping (self, src_port, dst_ipv4, pkt_size = 64, count = 5): + def ping_ip (self, src_port, dst_ipv4, pkt_size = 64, count = 5): """ - Pings an IP address + Pings an IP address through a port :parameters: src_port - on which port_id to send the ICMP PING request @@ -1888,11 +1842,12 @@ class STLClient(object): with self.logger.supress(level = LoggerApi.VERBOSE_REGULAR_SYNC): self.logger.log('') for i in range(count): - rc = self.ports[src_port].ping(ping_ipv4 = dst_ipv4, pkt_size = pkt_size, retries = 0) - if rc: - self.logger.log(rc.data()) - else: + rc = self.ports[src_port].ping(ping_ipv4 = dst_ipv4, pkt_size = pkt_size) + if not rc: raise STLError(rc) + + self.logger.log(rc.data()) + if i != (count - 1): time.sleep(1) @@ -2003,18 +1958,33 @@ class STLClient(object): ports = ports if ports is not None else self.get_all_ports() ports = self._validate_port_list(ports) - # force take the port and ignore any streams on it - self.acquire(ports, force = True, sync_streams = False) - self.stop(ports, rx_delay_ms = 0) - self.remove_all_streams(ports) - self.clear_stats(ports) - self.set_port_attr(ports, - promiscuous = False, - link_up = True if restart else None, - rxf = 'hw') - self.remove_rx_sniffer(ports) - self.remove_rx_queue(ports) + + if restart: + self.logger.pre_cmd("Hard resetting ports {0}:".format(ports)) + else: + self.logger.pre_cmd("Resetting ports {0}:".format(ports)) + + + try: + with self.logger.supress(): + # force take the port and ignore any streams on it + self.acquire(ports, force = True, sync_streams = False) + self.stop(ports, rx_delay_ms = 0) + self.remove_all_streams(ports) + self.clear_stats(ports) + self.set_port_attr(ports, + promiscuous = False, + link_up = True if restart else None, + rxf = 'hw') + self.remove_rx_sniffer(ports) + self.remove_rx_queue(ports) + + except STLError as e: + self.logger.post_cmd(False) + raise e + + self.logger.post_cmd(RC_OK()) @__api_check(True) @@ -2159,11 +2129,11 @@ class STLClient(object): # verify link status ports_link_down = [port_id for port_id in ports if not self.ports[port_id].is_up()] - if not force and ports_link_down: + if ports_link_down and not force: raise STLError("Port(s) %s - link DOWN - check the connection or specify 'force'" % ports_link_down) # verify ports are stopped or force stop them - active_ports = list(set(self.get_active_ports()).intersection(ports)) + active_ports = [port_id for port_id in ports if self.ports[port_id].is_active()] if active_ports and not force: raise STLError("Port(s) {0} are active - please stop them or specify 'force'".format(active_ports)) @@ -2228,6 +2198,7 @@ class STLClient(object): validate_type('core_mask', core_mask, (int, list)) + # some sanity checks before attempting start self.__pre_start_check(ports, force) ######################### @@ -2285,7 +2256,7 @@ class STLClient(object): return ports = self._validate_port_list(ports) - + self.logger.pre_cmd("Stopping traffic on port(s) {0}:".format(ports)) rc = self.__stop(ports) self.logger.post_cmd(rc) @@ -2844,7 +2815,7 @@ class STLClient(object): cmn_attr_dict['led_status'] = led_on cmn_attr_dict['flow_ctrl_mode'] = flow_ctrl cmn_attr_dict['rx_filter_mode'] = rxf - + # each port starts with a set of the common attributes attr_dict = [dict(cmn_attr_dict) for _ in ports] @@ -2908,37 +2879,37 @@ class STLClient(object): if not ports: raise STLError('No ports configured with destination as IPv4') - active_ports = list(set(self.get_active_ports()).intersection(ports)) + ports = self._validate_port_list(ports) + + active_ports = list_intersect(ports, self.get_active_ports()) if active_ports: raise STLError('Port(s) {0} are active, please stop them before resolving'.format(active_ports)) - ports = self._validate_port_list(ports) - self.logger.pre_cmd("Resolving destination on port(s) {0}:".format(ports)) + self.logger.pre_cmd('Resolving destination on port(s) {0}:'.format(ports)) with self.logger.supress(): rc = self.__resolve(ports, retries) self.logger.post_cmd(rc) - if rc: - self.logger.log(rc) - if not rc: raise STLError(rc) - + + # print the ARP transaction + self.logger.log(rc) + self.logger.log('') @__api_check(True) - def set_rx_sniffer (self, ports = None, base_filename = 'rx_capture', limit = 1000, rxf = None): + def set_rx_sniffer (self, ports = None, base_filename = 'rx.pcap', limit = 1000): """ - Sets RX sniffer for port(s) written to a PCAP file + Sets a RX sniffer for port(s) written to a PCAP file :parameters: ports - for which ports to apply a unique sniffer (each port gets a unique file) - base_filename - filename will be appended with '-' + base_filename - filename will be appended with '-', e.g. rx.pcap --> rx-0.pcap, rx-1.pcap etc. limit - limit how many packets will be written - rxf - RX filter mode to use: 'hw' or 'all' :raises: + :exe:'STLError' @@ -2952,10 +2923,6 @@ class STLClient(object): if limit <= 0: raise STLError("'limit' must be a positive value") - # change RX filter mode if asked - if rxf: - self.set_port_attr(ports, rxf = rxf) - self.logger.pre_cmd("Setting RX sniffers on port(s) {0}:".format(ports)) rc = self.__set_rx_sniffer(ports, base_filename, limit) self.logger.post_cmd(rc) @@ -2987,7 +2954,7 @@ class STLClient(object): @__api_check(True) - def set_rx_queue (self, ports = None, size = 1000, rxf = None): + def set_rx_queue (self, ports = None, size = 1000): """ Sets RX queue for port(s) The queue is cyclic and will hold last 'size' packets @@ -2995,7 +2962,6 @@ class STLClient(object): :parameters: ports - for which ports to apply a queue size - size of the queue - rxf - which RX filter to use on those ports: 'hw' or 'all' :raises: + :exe:'STLError' @@ -3008,10 +2974,6 @@ class STLClient(object): if size <= 0: raise STLError("'size' must be a positive value") - # change RX filter mode if asked - if rxf: - self.set_port_attr(ports, rxf = rxf) - self.logger.pre_cmd("Setting RX queue on port(s) {0}:".format(ports)) rc = self.__set_rx_queue(ports, size) self.logger.post_cmd(rc) @@ -3112,7 +3074,7 @@ class STLClient(object): # no parameters - so ping server if not line: - self.ping() + self.ping_rpc_server() return True parser = parsing_opts.gen_parser(self, @@ -3128,7 +3090,7 @@ class STLClient(object): return opts # IP ping - self.ip_ping(opts.source_port, opts.ping_ipv4, opts.pkt_size, opts.count) + self.ping_ip(opts.source_port, opts.ping_ipv4, opts.pkt_size, opts.count) @__console @@ -3722,7 +3684,10 @@ class STLClient(object): rxf = 'all' if opts.all else None - self.set_rx_sniffer(opts.ports, opts.output_filename, opts.limit, rxf) + if rxf: + self.set_port_attr(opts.ports, rxf = rxf) + + self.set_rx_sniffer(opts.ports, opts.output_filename, opts.limit) @__console diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_jsonrpc_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_jsonrpc_client.py index 1461fcec..4ebfa0be 100644 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_jsonrpc_client.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_jsonrpc_client.py @@ -172,6 +172,10 @@ class JsonRpcClient(object): self.disconnect() return RC_ERR("*** [RPC] - Failed to send message to server") + except KeyboardInterrupt as e: + # must restore the socket to a sane state + self.reconnect() + raise e tries = 0 while True: @@ -184,6 +188,10 @@ class JsonRpcClient(object): self.disconnect() return RC_ERR("*** [RPC] - Failed to get server response from {0}".format(self.transport)) + except KeyboardInterrupt as e: + # must restore the socket to a sane state + self.reconnect() + raise e return response @@ -267,12 +275,6 @@ class JsonRpcClient(object): # connect using current values return self.connect() - if not self.connected: - return RC_ERR("Not connected to server") - - # reconnect - return self.connect(self.server, self.port) - def is_connected(self): return self.connected diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py index d225c31c..d0092533 100644 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py @@ -4,17 +4,14 @@ from collections import namedtuple, OrderedDict from .trex_stl_packet_builder_scapy import STLPktBuilder from .trex_stl_streams import STLStream, STLTXSingleBurst from .trex_stl_types import * +from .trex_stl_rx_features import ARPResolver, PingResolver from . import trex_stl_stats from .utils.constants import FLOW_CTRL_DICT_REVERSED -from scapy.layers.l2 import Ether, ARP -from scapy.layers.inet import IP, ICMP - import base64 import copy from datetime import datetime, timedelta import threading -import time StreamOnPort = namedtuple('StreamOnPort', ['compiled_stream', 'metadata']) @@ -70,7 +67,6 @@ class Port(object): self.profile = None self.session_id = session_id self.status = {} - self.__attr = {} self.port_stats = trex_stl_stats.CPortStats(self) @@ -81,6 +77,7 @@ class Port(object): self.owner = '' self.last_factor_type = None + self.__attr = {} self.attr_lock = threading.Lock() # decorator to verify port is up @@ -100,9 +97,6 @@ class Port(object): def func_wrapper(*args, **kwargs): port = args[0] - if not port.is_up(): - return port.err("{0} - port is down".format(func.__name__)) - if not port.is_acquired(): return port.err("{0} - port is not owned".format(func.__name__)) @@ -116,9 +110,6 @@ class Port(object): def func_wrapper(*args, **kwargs): port = args[0] - if not port.is_up(): - return port.err("{0} - port is down".format(func.__name__)) - if not port.is_acquired(): return port.err("{0} - port is not owned".format(func.__name__)) @@ -511,33 +502,36 @@ class Port(object): return self.ok() + @owned - def set_arp_resolution (self, ipv4, mac): - + def remove_rx_sniffer (self): params = {"handler": self.handler, "port_id": self.port_id, - "ipv4": ipv4, - "mac": mac} + "type": "capture", + "enabled": False} - rc = self.transmit("set_arp_resolution", params) + rc = self.transmit("set_rx_feature", params) if rc.bad(): return self.err(rc.err()) return self.ok() - - + + @owned - def remove_rx_sniffer (self): + def set_arp_resolution (self, ipv4, mac): + params = {"handler": self.handler, "port_id": self.port_id, - "type": "capture", - "enabled": False} + "ipv4": ipv4, + "mac": mac} - rc = self.transmit("set_rx_feature", params) + rc = self.transmit("set_arp_resolution", params) if rc.bad(): return self.err(rc.err()) return self.ok() + + @owned @@ -579,7 +573,7 @@ class Port(object): pkts = rc.data()['pkts'] - # decode the packets + # decode the packets from base64 to binary for i in range(len(pkts)): pkts[i]['binary'] = base64.b64decode(pkts[i]['binary']) @@ -735,7 +729,7 @@ class Port(object): dest = self.__attr['dest'] if dest['type'] != 'mac': - return self.set_attr(dest = dest['addr']) + return self.set_attr(dest = dest['ipv4']) else: return self.ok() @@ -851,42 +845,32 @@ class Port(object): info['src_ipv4'] = 'Not Configured' # dest - dest = attr.get('dest', {}) - info['dest'] = dest.get('addr', 'N/A') - + dest = attr['dest'] if dest['type'] == 'mac': - info['arp'] = '-' - else: - info['arp'] = dest.get('arp', 'N/A') - - - if info['dest'] is None: - info['dest'] = 'Not Configured' - + info['dest'] = dest['mac'] + info['arp'] = '-' - if info['arp'] is None: - info['arp'] = 'unresolved' - + elif dest['type'] == 'ipv4': + info['dest'] = dest['ipv4'] + info['arp'] = dest['arp'] + + elif dest['type'] == 'ipv4_u': + info['dest'] = dest['ipv4'] + info['arp'] = 'unresolved' # RX info rx_info = self.status['rx_info'] # RX sniffer - if 'sniffer' in rx_info: - sniffer = rx_info['sniffer'] - info['rx_sniffer'] = '{0}\n[{1} / {2}]'.format(sniffer['pcap_filename'], sniffer['count'], sniffer['limit']) if sniffer['is_active'] else 'off' - else: - info['rx_sniffer'] = 'N/A' - + sniffer = rx_info['sniffer'] + info['rx_sniffer'] = '{0}\n[{1} / {2}]'.format(sniffer['pcap_filename'], sniffer['count'], sniffer['limit']) if sniffer['is_active'] else 'off' + # RX queue - if 'queue' in rx_info: - queue = rx_info['queue'] - info['rx_queue'] = '[{0} / {1}]'.format(queue['count'], queue['size']) if queue['is_active'] else 'off' - else: - info['rx_queue'] = 'off' - + queue = rx_info['queue'] + info['rx_queue'] = '[{0} / {1}]'.format(queue['count'], queue['size']) if queue['is_active'] else 'off' + return info @@ -895,7 +879,7 @@ class Port(object): return self.STATES_MAP.get(self.state, "Unknown") def get_src_addr (self): - src_mac = self.__attr['src_mac'] + src_mac = self.__attr['src_mac'] src_ipv4 = self.__attr['src_ipv4'] return {'mac': src_mac, 'ipv4': src_ipv4} @@ -904,36 +888,34 @@ class Port(object): def get_dst_addr (self): dest = self.__attr['dest'] - dst_ipv4 = None - dst_mac = None - if dest['type'] == 'mac': - dst_mac = dest['addr'] + return {'ipv4': None, 'mac': dest['mac']} + elif dest['type'] == 'ipv4': - dst_ipv4 = dest['addr'] - dst_mac = dest['arp'] + return {'ipv4': dest['ipv4'], 'mac': dest['arp']} + + elif dest['type'] == 'ipv4_u': + return {'ipv4': dest['ipv4'], 'mac': None} + else: assert(0) - - - return {'ipv4': dst_ipv4, 'mac' : dst_mac} - - # return True if the port is resolved (either has MAC as dest of ARP resolution) + + # return True if the port is resolved def is_resolved (self): - return (self.get_dst_addr()['mac'] != None) + return (self.get_dst_addr()['mac'] is not None) # return True if the port is valid for resolve (has an IPv4 address as dest) def is_resolvable (self): - return (self.get_dst_addr()['ipv4'] != None) + return (self.get_dst_addr()['ipv4'] is not None) @writeable def arp_resolve (self, retries): return ARPResolver(self).resolve(retries) @writeable - def ping (self, ping_ipv4, pkt_size, retries): - return PingResolver(self, ping_ipv4, pkt_size).resolve(retries) + def ping (self, ping_ipv4, pkt_size): + return PingResolver(self, ping_ipv4, pkt_size).resolve() ################# stats handler ###################### @@ -1075,248 +1057,4 @@ class Port(object): def async_event_released (self): self.owner = '' - -# a generic abstract class for resolving using the server -class Resolver(object): - def __init__ (self, port, queue_size = 100): - self.port = port - - # code to execute before sending any request - return RC object - def pre_send (self): - raise NotImplementedError() - - # return a list of streams for request - def generate_request (self): - raise NotImplementedError() - - # return None for more packets otherwise RC object - def on_pkt_rx (self, pkt): - raise NotImplementedError() - - # return value in case of timeout - def on_timeout_err (self, retries): - raise NotImplementedError() - - ##################### API ###################### - def resolve (self, retries = 0): - - # first cleanup - rc = self.port.remove_all_streams() - if not rc: - return rc - - # call the specific class implementation - rc = self.pre_send() - if not rc: - return rc - - # start the iteration - try: - - # add the stream(s) - self.port.add_streams(self.generate_request()) - - rc = self.port.set_attr(rx_filter_mode = 'all') - if not rc: - return rc - - rc = self.port.set_rx_queue(size = 100) - if not rc: - return rc - - return self.resolve_wrapper(retries) - - finally: - # best effort restore - self.port.set_attr(rx_filter_mode = 'hw') - self.port.remove_rx_queue() - self.port.remove_all_streams() - - - # main resolve function - def resolve_wrapper (self, retries): - - # retry for 'retries' - index = 0 - while True: - rc = self.resolve_iteration() - if rc is not None: - return rc - - if index >= retries: - return self.on_timeout_err(retries) - - index += 1 - time.sleep(0.1) - - - - def resolve_iteration (self): - - mult = {'op': 'abs', 'type' : 'percentage', 'value': 100} - rc = self.port.start(mul = mult, force = False, duration = -1, mask = 0xffffffff) - if not rc: - return rc - - # save the start timestamp - self.start_ts = rc.data()['ts'] - - # block until traffic finishes - while self.port.is_active(): - time.sleep(0.01) - - return self.wait_for_rx_response() - - - def wait_for_rx_response (self): - - # we try to fetch response for 5 times - polling = 5 - - while polling > 0: - # fetch the queue - rx_pkts = self.port.get_rx_queue_pkts() - - # for each packet - examine it - for pkt in rx_pkts: - rc = self.on_pkt_rx(pkt) - if rc is not None: - return rc - - if polling == 0: - return None - - polling -= 1 - time.sleep(0.1) - - - - - -class ARPResolver(Resolver): - def __init__ (self, port_id): - super(ARPResolver, self).__init__(port_id) - - # before resolve - def pre_send (self): - dst = self.port.get_dst_addr() - src = self.port.get_src_addr() - - if dst['ipv4'] is None: - return self.port.err("Port has a non-IPv4 destination: '{0}'".format(dst['mac'])) - - if src['ipv4'] is None: - return self.port.err('Port must have an IPv4 source address configured') - - # invalidate the current ARP resolution (if exists) - return self.port.invalidate_arp() - - - # return a list of streams for request - def generate_request (self): - - dst = self.port.get_dst_addr() - src = self.port.get_src_addr() - - base_pkt = Ether(dst="ff:ff:ff:ff:ff:ff")/ARP(psrc = src['ipv4'], pdst = dst['ipv4'], hwsrc = src['mac']) - s1 = STLStream( packet = STLPktBuilder(pkt = base_pkt), mode = STLTXSingleBurst(total_pkts = 1) ) - - return [s1] - - - # return None in case more packets are needed else the status rc - def on_pkt_rx (self, pkt): - scapy_pkt = Ether(pkt['binary']) - if not 'ARP' in scapy_pkt: - return None - - arp = scapy_pkt['ARP'] - dst = self.port.get_dst_addr() - - # check this is the right ARP (ARP reply with the address) - if (arp.op != 2) or (arp.psrc != dst['ipv4']): - return None - - - rc = self.port.set_arp_resolution(arp.psrc, arp.hwsrc) - if not rc: - return rc - - return self.port.ok('Port {0} - Recieved ARP reply from: {1}, hw: {2}'.format(self.port.port_id, arp.psrc, arp.hwsrc)) - - def on_timeout_err (self, retries): - return self.port.err('failed to receive ARP response ({0} retries)'.format(retries)) - - - - - #################### ping resolver #################### - -class PingResolver(Resolver): - def __init__ (self, port, ping_ip, pkt_size): - super(PingResolver, self).__init__(port) - self.ping_ip = ping_ip - self.pkt_size = pkt_size - - def pre_send (self): - - src = self.port.get_src_addr() - dst = self.port.get_dst_addr() - if src['ipv4'] is None: - return self.port.err('Ping - port does not have an IPv4 address configured') - - if dst['mac'] is None: - return self.port.err('Ping - port has an unresolved destination, cannot determine next hop MAC address') - - if self.ping_ip == src['ipv4']: - return self.port.err('Ping - cannot ping own IP') - - return self.port.ok() - - - # return a list of streams for request - def generate_request (self): - - src = self.port.get_src_addr() - dst = self.port.get_dst_addr() - - base_pkt = Ether(dst = dst['mac'])/IP(src = src['ipv4'], dst = self.ping_ip)/ICMP(type = 8) - pad = max(0, self.pkt_size - len(base_pkt)) - - base_pkt = base_pkt / (pad * 'x') - - #base_pkt.show2() - s1 = STLStream( packet = STLPktBuilder(pkt = base_pkt), mode = STLTXSingleBurst(total_pkts = 1) ) - - return [s1] - - # return None for more packets otherwise RC object - def on_pkt_rx (self, pkt): - scapy_pkt = Ether(pkt['binary']) - if not 'ICMP' in scapy_pkt: - return None - - #scapy_pkt.show2() - ip = scapy_pkt['IP'] - - icmp = scapy_pkt['ICMP'] - - dt = pkt['ts'] - self.start_ts - - if icmp.type == 0: - # echo reply - return self.port.ok('Reply from {0}: bytes={1}, time={2:.2f}ms, TTL={3}'.format(ip.src, len(pkt['binary']), dt * 1000, ip.ttl)) - - # unreachable - elif icmp.type == 3: - return self.port.ok('Reply from {0}: Destination host unreachable'.format(icmp.src)) - else: - scapy_pkt.show2() - return self.port.err('unknown ICMP reply') - - - - # return the str of a timeout err - def on_timeout_err (self, retries): - return self.port.ok('Request timed out.') diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_rx_features.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_rx_features.py new file mode 100644 index 00000000..f9f6a499 --- /dev/null +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_rx_features.py @@ -0,0 +1,255 @@ + +from .trex_stl_streams import STLStream, STLTXSingleBurst +from .trex_stl_packet_builder_scapy import STLPktBuilder + +from scapy.layers.l2 import Ether, ARP +from scapy.layers.inet import IP, ICMP + +import time + +# a generic abstract class for resolving using the server +class Resolver(object): + def __init__ (self, port, queue_size = 100): + self.port = port + + # code to execute before sending any request - return RC object + def pre_send (self): + raise NotImplementedError() + + # return a list of streams for request + def generate_request (self): + raise NotImplementedError() + + # return None for more packets otherwise RC object + def on_pkt_rx (self, pkt): + raise NotImplementedError() + + # return value in case of timeout + def on_timeout_err (self, retries): + raise NotImplementedError() + + ##################### API ###################### + def resolve (self, retries = 0): + + # first cleanup + rc = self.port.remove_all_streams() + if not rc: + return rc + + # call the specific class implementation + rc = self.pre_send() + if not rc: + return rc + + # start the iteration + try: + + # add the stream(s) + self.port.add_streams(self.generate_request()) + + rc = self.port.set_attr(rx_filter_mode = 'all') + if not rc: + return rc + + rc = self.port.set_rx_queue(size = 100) + if not rc: + return rc + + return self.resolve_wrapper(retries) + + finally: + # best effort restore + self.port.set_attr(rx_filter_mode = 'hw') + self.port.remove_rx_queue() + self.port.remove_all_streams() + + + # main resolve function + def resolve_wrapper (self, retries): + + # retry for 'retries' + index = 0 + while True: + rc = self.resolve_iteration() + if rc is not None: + return rc + + if index >= retries: + return self.on_timeout_err(retries) + + index += 1 + time.sleep(0.1) + + + + def resolve_iteration (self): + + mult = {'op': 'abs', 'type' : 'percentage', 'value': 100} + rc = self.port.start(mul = mult, force = False, duration = -1, mask = 0xffffffff) + if not rc: + return rc + + # save the start timestamp + self.start_ts = rc.data()['ts'] + + # block until traffic finishes + while self.port.is_active(): + time.sleep(0.01) + + return self.wait_for_rx_response() + + + def wait_for_rx_response (self): + + # we try to fetch response for 5 times + polling = 5 + + while polling > 0: + + # fetch the queue + rx_pkts = self.port.get_rx_queue_pkts() + + # might be an error + if not rx_pkts: + return rx_pkts + + # for each packet - examine it + for pkt in rx_pkts: + rc = self.on_pkt_rx(pkt) + if rc is not None: + return rc + + if polling == 0: + return None + + polling -= 1 + time.sleep(0.1) + + + + + +class ARPResolver(Resolver): + def __init__ (self, port_id): + super(ARPResolver, self).__init__(port_id) + + # before resolve + def pre_send (self): + self.dst = self.port.get_dst_addr() + self.src = self.port.get_src_addr() + + if self.dst['ipv4'] is None: + return self.port.err("Port has a non-IPv4 destination: '{0}'".format(dst['mac'])) + + if self.src['ipv4'] is None: + return self.port.err('Port must have an IPv4 source address configured') + + # invalidate the current ARP resolution (if exists) + return self.port.invalidate_arp() + + + # return a list of streams for request + def generate_request (self): + + base_pkt = Ether(dst="ff:ff:ff:ff:ff:ff")/ARP(psrc = self.src['ipv4'], pdst = self.dst['ipv4'], hwsrc = self.src['mac']) + s1 = STLStream( packet = STLPktBuilder(pkt = base_pkt), mode = STLTXSingleBurst(total_pkts = 1) ) + + return [s1] + + + # return None in case more packets are needed else the status rc + def on_pkt_rx (self, pkt): + scapy_pkt = Ether(pkt['binary']) + if not 'ARP' in scapy_pkt: + return None + + arp = scapy_pkt['ARP'] + + # check this is the right ARP (ARP reply with the address) + if (arp.op != 2) or (arp.psrc != self.dst['ipv4']): + return None + + + rc = self.port.set_arp_resolution(arp.psrc, arp.hwsrc) + if not rc: + return rc + + return self.port.ok('Port {0} - Recieved ARP reply from: {1}, hw: {2}'.format(self.port.port_id, arp.psrc, arp.hwsrc)) + + + def on_timeout_err (self, retries): + return self.port.err('failed to receive ARP response ({0} retries)'.format(retries)) + + + + + #################### ping resolver #################### + +class PingResolver(Resolver): + def __init__ (self, port, ping_ip, pkt_size): + super(PingResolver, self).__init__(port) + self.ping_ip = ping_ip + self.pkt_size = pkt_size + + def pre_send (self): + + self.src = self.port.get_src_addr() + self.dst = self.port.get_dst_addr() + + if self.src['ipv4'] is None: + return self.port.err('Ping - port does not have an IPv4 address configured') + + if self.dst['mac'] is None: + return self.port.err('Ping - port has an unresolved destination, cannot determine next hop MAC address') + + if self.ping_ip == self.src['ipv4']: + return self.port.err('Ping - cannot ping own IP') + + return self.port.ok() + + + # return a list of streams for request + def generate_request (self): + + src = self.port.get_src_addr() + dst = self.port.get_dst_addr() + + base_pkt = Ether(dst = dst['mac'])/IP(src = src['ipv4'], dst = self.ping_ip)/ICMP(type = 8) + pad = max(0, self.pkt_size - len(base_pkt)) + + base_pkt = base_pkt / (pad * 'x') + + #base_pkt.show2() + s1 = STLStream( packet = STLPktBuilder(pkt = base_pkt), mode = STLTXSingleBurst(total_pkts = 1) ) + + return [s1] + + # return None for more packets otherwise RC object + def on_pkt_rx (self, pkt): + scapy_pkt = Ether(pkt['binary']) + if not 'ICMP' in scapy_pkt: + return None + + #scapy_pkt.show2() + ip = scapy_pkt['IP'] + + icmp = scapy_pkt['ICMP'] + + dt = pkt['ts'] - self.start_ts + + if icmp.type == 0: + # echo reply + return self.port.ok('Reply from {0}: bytes={1}, time={2:.2f}ms, TTL={3}'.format(ip.src, len(pkt['binary']), dt * 1000, ip.ttl)) + + # unreachable + elif icmp.type == 3: + return self.port.ok('Reply from {0}: Destination host unreachable'.format(icmp.src)) + else: + scapy_pkt.show2() + return self.port.err('unknown ICMP reply') + + + + # return the str of a timeout err + def on_timeout_err (self, retries): + return self.port.ok('Request timed out.') -- cgit 1.2.3-korg