diff options
Diffstat (limited to 'scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py')
-rw-r--r-- | scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py | 426 |
1 files changed, 369 insertions, 57 deletions
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py index cec3761f..9eefc177 100644 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py @@ -4,12 +4,14 @@ from collections import namedtuple, OrderedDict from .trex_stl_packet_builder_scapy import STLPktBuilder from .trex_stl_streams import STLStream from .trex_stl_types import * +from .trex_stl_rx_features import ARPResolver, PingResolver from . import trex_stl_stats from .utils.constants import FLOW_CTRL_DICT_REVERSED import base64 import copy from datetime import datetime, timedelta +import threading StreamOnPort = namedtuple('StreamOnPort', ['compiled_stream', 'metadata']) @@ -50,7 +52,9 @@ class Port(object): def __init__ (self, port_id, user, comm_link, session_id, info): self.port_id = port_id + self.state = self.STATE_IDLE + self.handler = None self.comm_link = comm_link self.transmit = comm_link.transmit @@ -62,7 +66,7 @@ class Port(object): self.streams = {} self.profile = None self.session_id = session_id - self.attr = {} + self.status = {} self.port_stats = trex_stl_stats.CPortStats(self) @@ -72,31 +76,31 @@ class Port(object): self.owner = '' self.last_factor_type = None - + + self.__attr = {} + self.attr_lock = threading.Lock() + # decorator to verify port is up def up(func): - def func_wrapper(*args): + def func_wrapper(*args, **kwargs): port = args[0] if not port.is_up(): return port.err("{0} - port is down".format(func.__name__)) - return func(*args) + return func(*args, **kwargs) return func_wrapper # owned def owned(func): - def func_wrapper(*args): + def func_wrapper(*args, **kwargs): port = args[0] - if not port.is_up(): - return port.err("{0} - port is down".format(func.__name__)) - if not port.is_acquired(): return port.err("{0} - port is not owned".format(func.__name__)) - return func(*args) + return func(*args, **kwargs) return func_wrapper @@ -106,14 +110,11 @@ class Port(object): def func_wrapper(*args, **kwargs): port = args[0] - if not port.is_up(): - return port.err("{0} - port is down".format(func.__name__)) - if not port.is_acquired(): return port.err("{0} - port is not owned".format(func.__name__)) if not port.is_writeable(): - return port.err("{0} - port is not in a writeable state".format(func.__name__)) + return port.err("{0} - port is active, please stop the port before executing command".format(func.__name__)) return func(*args, **kwargs) @@ -122,22 +123,22 @@ class Port(object): def err(self, msg): - return RC_ERR("port {0} : {1}\n".format(self.port_id, msg)) + return RC_ERR("port {0} : *** {1}".format(self.port_id, msg)) def ok(self, data = ""): return RC_OK(data) def get_speed_bps (self): - return (self.info['speed'] * 1000 * 1000 * 1000) + return (self.get_speed_gbps() * 1000 * 1000 * 1000) - def get_formatted_speed (self): - return "{0} Gbps".format(self.info['speed']) + def get_speed_gbps (self): + return self.__attr['speed'] def is_acquired(self): return (self.handler != None) def is_up (self): - return (self.state != self.STATE_DOWN) + return self.__attr['link']['up'] def is_active(self): return (self.state == self.STATE_TX ) or (self.state == self.STATE_PAUSE) or (self.state == self.STATE_PCAP_TX) @@ -165,7 +166,6 @@ class Port(object): # take the port - @up def acquire(self, force = False, sync_streams = True): params = {"port_id": self.port_id, "user": self.user, @@ -185,7 +185,6 @@ class Port(object): # sync all the streams with the server - @up def sync_streams (self): params = {"port_id": self.port_id} @@ -201,7 +200,6 @@ class Port(object): return self.ok() # release the port - @up def release(self): params = {"port_id": self.port_id, "handler": self.handler} @@ -219,7 +217,6 @@ class Port(object): - @up def sync(self): params = {"port_id": self.port_id} @@ -250,10 +247,10 @@ class Port(object): self.next_available_id = int(rc.data()['max_stream_id']) + 1 - # attributes - self.attr = rc.data()['attr'] - if 'speed' in rc.data(): - self.info['speed'] = rc.data()['speed'] // 1000 + self.status = rc.data() + + # replace the attributes in a thread safe manner + self.set_ts_attr(rc.data()['attr']) return self.ok() @@ -424,8 +421,8 @@ class Port(object): # save this for TUI self.last_factor_type = mul['type'] - - return self.ok() + + return rc # stop traffic @@ -445,8 +442,9 @@ class Port(object): return self.err(rc.err()) self.state = self.STATE_STREAMS + self.last_factor_type = None - + # timestamp for last tx self.tx_stopped_ts = datetime.now() @@ -487,6 +485,122 @@ class Port(object): return self.ok() + + @owned + def set_rx_sniffer (self, pcap_filename, limit): + + if not self.is_service_mode_on(): + return self.err('port service mode must be enabled for performing RX capturing. Please enable service mode') + + params = {"handler": self.handler, + "port_id": self.port_id, + "type": "capture", + "enabled": True, + "pcap_filename": pcap_filename, + "limit": limit} + + rc = self.transmit("set_rx_feature", params) + if rc.bad(): + return self.err(rc.err()) + + return self.ok() + + + @owned + def remove_rx_sniffer (self): + params = {"handler": self.handler, + "port_id": self.port_id, + "type": "capture", + "enabled": False} + + rc = self.transmit("set_rx_feature", params) + if rc.bad(): + return self.err(rc.err()) + + return self.ok() + + @writeable + def set_l2_mode (self, dst_mac): + if not self.is_service_mode_on(): + return self.err('port service mode must be enabled for configuring L2 mode. Please enable service mode') + + params = {"handler": self.handler, + "port_id": self.port_id, + "dst_mac": dst_mac} + + rc = self.transmit("set_l2", params) + if rc.bad(): + return self.err(rc.err()) + + return self.sync() + + + @writeable + def set_l3_mode (self, src_addr, dest_addr, resolved_mac = None): + if not self.is_service_mode_on(): + return self.err('port service mode must be enabled for configuring L3 mode. Please enable service mode') + + params = {"handler": self.handler, + "port_id": self.port_id, + "src_addr": src_addr, + "dst_addr": dest_addr} + + if resolved_mac: + params["resolved_mac"] = resolved_mac + + rc = self.transmit("set_l3", params) + if rc.bad(): + return self.err(rc.err()) + + return self.sync() + + + @owned + def set_rx_queue (self, size): + + params = {"handler": self.handler, + "port_id": self.port_id, + "type": "queue", + "enabled": True, + "size": size} + + rc = self.transmit("set_rx_feature", params) + if rc.bad(): + return self.err(rc.err()) + + return self.ok() + + @owned + def remove_rx_queue (self): + params = {"handler": self.handler, + "port_id": self.port_id, + "type": "queue", + "enabled": False} + + rc = self.transmit("set_rx_feature", params) + if rc.bad(): + return self.err(rc.err()) + + return self.ok() + + @owned + def get_rx_queue_pkts (self): + params = {"handler": self.handler, + "port_id": self.port_id} + + rc = self.transmit("get_rx_queue_pkts", params) + if rc.bad(): + return self.err(rc.err()) + + pkts = rc.data()['pkts'] + + # decode the packets from base64 to binary + for i in range(len(pkts)): + pkts[i]['binary'] = base64.b64decode(pkts[i]['binary']) + + return RC_OK(pkts) + + @owned def pause (self): @@ -568,23 +682,60 @@ class Port(object): @owned - def set_attr (self, attr_dict): + def set_attr (self, **kwargs): + + json_attr = {} + + if kwargs.get('promiscuous') is not None: + json_attr['promiscuous'] = {'enabled': kwargs.get('promiscuous')} + + if kwargs.get('link_status') is not None: + json_attr['link_status'] = {'up': kwargs.get('link_status')} + + if kwargs.get('led_status') is not None: + json_attr['led_status'] = {'on': kwargs.get('led_status')} + + if kwargs.get('flow_ctrl_mode') is not None: + json_attr['flow_ctrl_mode'] = {'mode': kwargs.get('flow_ctrl_mode')} + + if kwargs.get('rx_filter_mode') is not None: + json_attr['rx_filter_mode'] = {'mode': kwargs.get('rx_filter_mode')} + params = {"handler": self.handler, "port_id": self.port_id, - "attr": attr_dict} + "attr": json_attr} rc = self.transmit("set_port_attr", params) if rc.bad(): return self.err(rc.err()) + # update the dictionary from the server explicitly + return self.sync() - #self.attr.update(attr_dict) - + + @owned + def set_service_mode (self, enabled): + rc = self.set_attr(rx_filter_mode = 'all' if enabled else 'hw') + if not rc: + return rc + + if not enabled: + rc = self.remove_rx_queue() + if not rc: + return rc + + rc = self.remove_rx_sniffer() + if not rc: + return rc + return self.ok() + def is_service_mode_on (self): + return self.get_rx_filter_mode() == 'all' + @writeable - def push_remote (self, pcap_filename, ipg_usec, speedup, count, duration, is_dual, slave_handler): + def push_remote (self, pcap_filename, ipg_usec, speedup, count, duration, is_dual, slave_handler, min_ipg_usec): params = {"handler": self.handler, "port_id": self.port_id, @@ -594,7 +745,8 @@ class Port(object): "count": count, "duration": duration, "is_dual": is_dual, - "slave_handler": slave_handler} + "slave_handler": slave_handler, + "min_ipg_usec": min_ipg_usec if min_ipg_usec else 0} rc = self.transmit("push_remote", params) if rc.bad(): @@ -607,7 +759,16 @@ class Port(object): def get_profile (self): return self.profile - + # invalidates the current ARP + def invalidate_arp (self): + dest = self.__attr['dest'] + + if dest['type'] != 'mac': + return self.set_attr(dest = dest['ipv4']) + else: + return self.ok() + + def print_profile (self, mult, duration): if not self.get_profile(): return @@ -648,24 +809,32 @@ class Port(object): format_time(exp_time_factor_sec))) print("\n") - # generate port info - def get_info (self): + # generate formatted (console friendly) port info + def get_formatted_info (self, sync = True): + + # sync the status + if sync: + self.sync() + + # get a copy of the current attribute set (safe against manipulation) + attr = self.get_ts_attr() + info = dict(self.info) info['status'] = self.get_port_state_name() - if 'link' in self.attr: - info['link'] = 'UP' if self.attr['link']['up'] else 'DOWN' + if 'link' in attr: + info['link'] = 'UP' if attr['link']['up'] else 'DOWN' else: info['link'] = 'N/A' - if 'fc' in self.attr: - info['fc'] = FLOW_CTRL_DICT_REVERSED.get(self.attr['fc']['mode'], 'N/A') + if 'fc' in attr: + info['fc'] = FLOW_CTRL_DICT_REVERSED.get(attr['fc']['mode'], 'N/A') else: info['fc'] = 'N/A' - if 'promiscuous' in self.attr: - info['prom'] = "on" if self.attr['promiscuous']['enabled'] else "off" + if 'promiscuous' in attr: + info['prom'] = "on" if attr['promiscuous']['enabled'] else "off" else: info['prom'] = "N/A" @@ -692,34 +861,139 @@ class Port(object): else: info['is_virtual'] = 'N/A' + # speed + info['speed'] = self.get_speed_gbps() + + # RX filter mode + info['rx_filter_mode'] = 'hardware match' if attr['rx_filter_mode'] == 'hw' else 'fetch all' + + # src MAC and IPv4 + info['src_mac'] = attr['src_mac'] + info['src_ipv4'] = attr['src_ipv4'] + + if info['src_ipv4'] is None: + info['src_ipv4'] = '-' + + # dest + dest = attr['dest'] + if dest['type'] == 'mac': + info['dest'] = dest['mac'] + info['arp'] = '-' + + elif dest['type'] == 'ipv4': + info['dest'] = dest['ipv4'] + info['arp'] = dest['arp'] + + elif dest['type'] == 'ipv4_u': + info['dest'] = dest['ipv4'] + info['arp'] = 'unresolved' + + + # RX info + rx_info = self.status['rx_info'] + + # RX sniffer + sniffer = rx_info['sniffer'] + info['rx_sniffer'] = '{0}\n[{1} / {2}]'.format(sniffer['pcap_filename'], sniffer['count'], sniffer['limit']) if sniffer['is_active'] else 'off' + + + # RX queue + queue = rx_info['queue'] + info['rx_queue'] = '[{0} / {1}]'.format(queue['count'], queue['size']) if queue['is_active'] else 'off' + + # Grat ARP + grat_arp = rx_info['grat_arp'] + if grat_arp['is_active']: + info['grat_arp'] = "every {0} seconds".format(grat_arp['interval_sec']) + else: + info['grat_arp'] = "off" + + return info def get_port_state_name(self): return self.STATES_MAP.get(self.state, "Unknown") + def get_src_addr (self): + src_mac = self.__attr['src_mac'] + src_ipv4 = self.__attr['src_ipv4'] + + return {'mac': src_mac, 'ipv4': src_ipv4} + + def get_rx_filter_mode (self): + return self.__attr['rx_filter_mode'] + + def get_dst_addr (self): + dest = self.__attr['dest'] + + if dest['type'] == 'mac': + return {'ipv4': None, 'mac': dest['mac']} + + elif dest['type'] == 'ipv4': + return {'ipv4': dest['ipv4'], 'mac': dest['arp']} + + elif dest['type'] == 'ipv4_u': + return {'ipv4': dest['ipv4'], 'mac': None} + + else: + assert(0) + + + # port is considered resolved if it's dest is either MAC or resolved IPv4 + def is_resolved (self): + return (self.get_dst_addr()['mac'] is not None) + + # return True if the port is valid for resolve (has an IPv4 address as dest) + def is_resolvable (self): + return (self.get_dst_addr()['ipv4'] is not None) + + @writeable + def arp_resolve (self, retries): + if not self.is_service_mode_on(): + return self.err('port service mode must be enabled for performing ARP resolution. Please enable service mode') + + return ARPResolver(self).resolve(retries) + + @writeable + def ping (self, ping_ipv4, pkt_size): + if not self.is_service_mode_on(): + return self.err('port service mode must be enabled for performing ping. Please enable service mode') + + return PingResolver(self, ping_ipv4, pkt_size).resolve() + + ################# stats handler ###################### def generate_port_stats(self): return self.port_stats.generate_stats() def generate_port_status(self): - info = self.get_info() + info = self.get_formatted_info() - return {"driver": info['driver'], - "description": info.get('description', 'N/A')[:18], - "HW src mac": info['hw_macaddr'], - "SW src mac": info['src_macaddr'], - "SW dst mac": info['dst_macaddr'], - "PCI Address": info['pci_addr'], - "NUMA Node": info['numa'], + return {"driver": info['driver'], + "description": info.get('description', 'N/A')[:18], + "src MAC": info['src_mac'], + "src IPv4": info['src_ipv4'], + "Destination": info['dest'], + "ARP Resolution": format_text("{0}".format(info['arp']), 'bold', 'red') if info['arp'] == 'unresolved' else info['arp'], + "PCI Address": info['pci_addr'], + "NUMA Node": info['numa'], "--": "", "---": "", - "link speed": "{speed} Gb/s".format(speed=info['speed']), + "----": "", + "-----": "", + "link speed": "%g Gb/s" % info['speed'], "port status": info['status'], "link status": info['link'], "promiscuous" : info['prom'], "flow ctrl" : info['fc'], + + "RX Filter Mode": info['rx_filter_mode'], + "RX Queueing": info['rx_queue'], + "RX sniffer": info['rx_sniffer'], + "Grat ARP": info['grat_arp'], + } def clear_stats(self): @@ -756,17 +1030,54 @@ class Port(object): return {"streams" : OrderedDict(sorted(data.items())) } - + ######## attributes are a complex type (dict) that might be manipulated through the async thread ############# + + # get in a thread safe manner a duplication of attributes + def get_ts_attr (self): + with self.attr_lock: + return dict(self.__attr) + + # set in a thread safe manner a new dict of attributes + def set_ts_attr (self, new_attr): + with self.attr_lock: + self.__attr = new_attr + + ################# events handler ###################### def async_event_port_job_done (self): # until thread is locked - order is important self.tx_stopped_ts = datetime.now() self.state = self.STATE_STREAMS + self.last_factor_type = None - def async_event_port_attr_changed (self, attr): - self.info['speed'] = attr['speed'] // 1000 - self.attr = attr + def async_event_port_attr_changed (self, new_attr): + + # get a thread safe duplicate + cur_attr = self.get_ts_attr() + + # check if anything changed + if new_attr == cur_attr: + return None + + # generate before + before = self.get_formatted_info(sync = False) + + # update + self.set_ts_attr(new_attr) + + # generate after + after = self.get_formatted_info(sync = False) + + # return diff + diff = {} + for key, new_value in after.items(): + old_value = before.get(key, 'N/A') + if new_value != old_value: + diff[key] = (old_value, new_value) + + return diff + # rest of the events are used for TUI / read only sessions def async_event_port_stopped (self): @@ -792,3 +1103,4 @@ class Port(object): def async_event_released (self): self.owner = '' + |