summaryrefslogtreecommitdiffstats
path: root/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py')
-rw-r--r--scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py426
1 files changed, 369 insertions, 57 deletions
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py
index cec3761f..9eefc177 100644
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py
@@ -4,12 +4,14 @@ from collections import namedtuple, OrderedDict
from .trex_stl_packet_builder_scapy import STLPktBuilder
from .trex_stl_streams import STLStream
from .trex_stl_types import *
+from .trex_stl_rx_features import ARPResolver, PingResolver
from . import trex_stl_stats
from .utils.constants import FLOW_CTRL_DICT_REVERSED
import base64
import copy
from datetime import datetime, timedelta
+import threading
StreamOnPort = namedtuple('StreamOnPort', ['compiled_stream', 'metadata'])
@@ -50,7 +52,9 @@ class Port(object):
def __init__ (self, port_id, user, comm_link, session_id, info):
self.port_id = port_id
+
self.state = self.STATE_IDLE
+
self.handler = None
self.comm_link = comm_link
self.transmit = comm_link.transmit
@@ -62,7 +66,7 @@ class Port(object):
self.streams = {}
self.profile = None
self.session_id = session_id
- self.attr = {}
+ self.status = {}
self.port_stats = trex_stl_stats.CPortStats(self)
@@ -72,31 +76,31 @@ class Port(object):
self.owner = ''
self.last_factor_type = None
-
+
+ self.__attr = {}
+ self.attr_lock = threading.Lock()
+
# decorator to verify port is up
def up(func):
- def func_wrapper(*args):
+ def func_wrapper(*args, **kwargs):
port = args[0]
if not port.is_up():
return port.err("{0} - port is down".format(func.__name__))
- return func(*args)
+ return func(*args, **kwargs)
return func_wrapper
# owned
def owned(func):
- def func_wrapper(*args):
+ def func_wrapper(*args, **kwargs):
port = args[0]
- if not port.is_up():
- return port.err("{0} - port is down".format(func.__name__))
-
if not port.is_acquired():
return port.err("{0} - port is not owned".format(func.__name__))
- return func(*args)
+ return func(*args, **kwargs)
return func_wrapper
@@ -106,14 +110,11 @@ class Port(object):
def func_wrapper(*args, **kwargs):
port = args[0]
- if not port.is_up():
- return port.err("{0} - port is down".format(func.__name__))
-
if not port.is_acquired():
return port.err("{0} - port is not owned".format(func.__name__))
if not port.is_writeable():
- return port.err("{0} - port is not in a writeable state".format(func.__name__))
+ return port.err("{0} - port is active, please stop the port before executing command".format(func.__name__))
return func(*args, **kwargs)
@@ -122,22 +123,22 @@ class Port(object):
def err(self, msg):
- return RC_ERR("port {0} : {1}\n".format(self.port_id, msg))
+ return RC_ERR("port {0} : *** {1}".format(self.port_id, msg))
def ok(self, data = ""):
return RC_OK(data)
def get_speed_bps (self):
- return (self.info['speed'] * 1000 * 1000 * 1000)
+ return (self.get_speed_gbps() * 1000 * 1000 * 1000)
- def get_formatted_speed (self):
- return "{0} Gbps".format(self.info['speed'])
+ def get_speed_gbps (self):
+ return self.__attr['speed']
def is_acquired(self):
return (self.handler != None)
def is_up (self):
- return (self.state != self.STATE_DOWN)
+ return self.__attr['link']['up']
def is_active(self):
return (self.state == self.STATE_TX ) or (self.state == self.STATE_PAUSE) or (self.state == self.STATE_PCAP_TX)
@@ -165,7 +166,6 @@ class Port(object):
# take the port
- @up
def acquire(self, force = False, sync_streams = True):
params = {"port_id": self.port_id,
"user": self.user,
@@ -185,7 +185,6 @@ class Port(object):
# sync all the streams with the server
- @up
def sync_streams (self):
params = {"port_id": self.port_id}
@@ -201,7 +200,6 @@ class Port(object):
return self.ok()
# release the port
- @up
def release(self):
params = {"port_id": self.port_id,
"handler": self.handler}
@@ -219,7 +217,6 @@ class Port(object):
- @up
def sync(self):
params = {"port_id": self.port_id}
@@ -250,10 +247,10 @@ class Port(object):
self.next_available_id = int(rc.data()['max_stream_id']) + 1
- # attributes
- self.attr = rc.data()['attr']
- if 'speed' in rc.data():
- self.info['speed'] = rc.data()['speed'] // 1000
+ self.status = rc.data()
+
+ # replace the attributes in a thread safe manner
+ self.set_ts_attr(rc.data()['attr'])
return self.ok()
@@ -424,8 +421,8 @@ class Port(object):
# save this for TUI
self.last_factor_type = mul['type']
-
- return self.ok()
+
+ return rc
# stop traffic
@@ -445,8 +442,9 @@ class Port(object):
return self.err(rc.err())
self.state = self.STATE_STREAMS
+
self.last_factor_type = None
-
+
# timestamp for last tx
self.tx_stopped_ts = datetime.now()
@@ -487,6 +485,122 @@ class Port(object):
return self.ok()
+
+ @owned
+ def set_rx_sniffer (self, pcap_filename, limit):
+
+ if not self.is_service_mode_on():
+ return self.err('port service mode must be enabled for performing RX capturing. Please enable service mode')
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id,
+ "type": "capture",
+ "enabled": True,
+ "pcap_filename": pcap_filename,
+ "limit": limit}
+
+ rc = self.transmit("set_rx_feature", params)
+ if rc.bad():
+ return self.err(rc.err())
+
+ return self.ok()
+
+
+ @owned
+ def remove_rx_sniffer (self):
+ params = {"handler": self.handler,
+ "port_id": self.port_id,
+ "type": "capture",
+ "enabled": False}
+
+ rc = self.transmit("set_rx_feature", params)
+ if rc.bad():
+ return self.err(rc.err())
+
+ return self.ok()
+
+ @writeable
+ def set_l2_mode (self, dst_mac):
+ if not self.is_service_mode_on():
+ return self.err('port service mode must be enabled for configuring L2 mode. Please enable service mode')
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id,
+ "dst_mac": dst_mac}
+
+ rc = self.transmit("set_l2", params)
+ if rc.bad():
+ return self.err(rc.err())
+
+ return self.sync()
+
+
+ @writeable
+ def set_l3_mode (self, src_addr, dest_addr, resolved_mac = None):
+ if not self.is_service_mode_on():
+ return self.err('port service mode must be enabled for configuring L3 mode. Please enable service mode')
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id,
+ "src_addr": src_addr,
+ "dst_addr": dest_addr}
+
+ if resolved_mac:
+ params["resolved_mac"] = resolved_mac
+
+ rc = self.transmit("set_l3", params)
+ if rc.bad():
+ return self.err(rc.err())
+
+ return self.sync()
+
+
+ @owned
+ def set_rx_queue (self, size):
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id,
+ "type": "queue",
+ "enabled": True,
+ "size": size}
+
+ rc = self.transmit("set_rx_feature", params)
+ if rc.bad():
+ return self.err(rc.err())
+
+ return self.ok()
+
+ @owned
+ def remove_rx_queue (self):
+ params = {"handler": self.handler,
+ "port_id": self.port_id,
+ "type": "queue",
+ "enabled": False}
+
+ rc = self.transmit("set_rx_feature", params)
+ if rc.bad():
+ return self.err(rc.err())
+
+ return self.ok()
+
+ @owned
+ def get_rx_queue_pkts (self):
+ params = {"handler": self.handler,
+ "port_id": self.port_id}
+
+ rc = self.transmit("get_rx_queue_pkts", params)
+ if rc.bad():
+ return self.err(rc.err())
+
+ pkts = rc.data()['pkts']
+
+ # decode the packets from base64 to binary
+ for i in range(len(pkts)):
+ pkts[i]['binary'] = base64.b64decode(pkts[i]['binary'])
+
+ return RC_OK(pkts)
+
+
@owned
def pause (self):
@@ -568,23 +682,60 @@ class Port(object):
@owned
- def set_attr (self, attr_dict):
+ def set_attr (self, **kwargs):
+
+ json_attr = {}
+
+ if kwargs.get('promiscuous') is not None:
+ json_attr['promiscuous'] = {'enabled': kwargs.get('promiscuous')}
+
+ if kwargs.get('link_status') is not None:
+ json_attr['link_status'] = {'up': kwargs.get('link_status')}
+
+ if kwargs.get('led_status') is not None:
+ json_attr['led_status'] = {'on': kwargs.get('led_status')}
+
+ if kwargs.get('flow_ctrl_mode') is not None:
+ json_attr['flow_ctrl_mode'] = {'mode': kwargs.get('flow_ctrl_mode')}
+
+ if kwargs.get('rx_filter_mode') is not None:
+ json_attr['rx_filter_mode'] = {'mode': kwargs.get('rx_filter_mode')}
+
params = {"handler": self.handler,
"port_id": self.port_id,
- "attr": attr_dict}
+ "attr": json_attr}
rc = self.transmit("set_port_attr", params)
if rc.bad():
return self.err(rc.err())
+ # update the dictionary from the server explicitly
+ return self.sync()
- #self.attr.update(attr_dict)
-
+
+ @owned
+ def set_service_mode (self, enabled):
+ rc = self.set_attr(rx_filter_mode = 'all' if enabled else 'hw')
+ if not rc:
+ return rc
+
+ if not enabled:
+ rc = self.remove_rx_queue()
+ if not rc:
+ return rc
+
+ rc = self.remove_rx_sniffer()
+ if not rc:
+ return rc
+
return self.ok()
+ def is_service_mode_on (self):
+ return self.get_rx_filter_mode() == 'all'
+
@writeable
- def push_remote (self, pcap_filename, ipg_usec, speedup, count, duration, is_dual, slave_handler):
+ def push_remote (self, pcap_filename, ipg_usec, speedup, count, duration, is_dual, slave_handler, min_ipg_usec):
params = {"handler": self.handler,
"port_id": self.port_id,
@@ -594,7 +745,8 @@ class Port(object):
"count": count,
"duration": duration,
"is_dual": is_dual,
- "slave_handler": slave_handler}
+ "slave_handler": slave_handler,
+ "min_ipg_usec": min_ipg_usec if min_ipg_usec else 0}
rc = self.transmit("push_remote", params)
if rc.bad():
@@ -607,7 +759,16 @@ class Port(object):
def get_profile (self):
return self.profile
-
+ # invalidates the current ARP
+ def invalidate_arp (self):
+ dest = self.__attr['dest']
+
+ if dest['type'] != 'mac':
+ return self.set_attr(dest = dest['ipv4'])
+ else:
+ return self.ok()
+
+
def print_profile (self, mult, duration):
if not self.get_profile():
return
@@ -648,24 +809,32 @@ class Port(object):
format_time(exp_time_factor_sec)))
print("\n")
- # generate port info
- def get_info (self):
+ # generate formatted (console friendly) port info
+ def get_formatted_info (self, sync = True):
+
+ # sync the status
+ if sync:
+ self.sync()
+
+ # get a copy of the current attribute set (safe against manipulation)
+ attr = self.get_ts_attr()
+
info = dict(self.info)
info['status'] = self.get_port_state_name()
- if 'link' in self.attr:
- info['link'] = 'UP' if self.attr['link']['up'] else 'DOWN'
+ if 'link' in attr:
+ info['link'] = 'UP' if attr['link']['up'] else 'DOWN'
else:
info['link'] = 'N/A'
- if 'fc' in self.attr:
- info['fc'] = FLOW_CTRL_DICT_REVERSED.get(self.attr['fc']['mode'], 'N/A')
+ if 'fc' in attr:
+ info['fc'] = FLOW_CTRL_DICT_REVERSED.get(attr['fc']['mode'], 'N/A')
else:
info['fc'] = 'N/A'
- if 'promiscuous' in self.attr:
- info['prom'] = "on" if self.attr['promiscuous']['enabled'] else "off"
+ if 'promiscuous' in attr:
+ info['prom'] = "on" if attr['promiscuous']['enabled'] else "off"
else:
info['prom'] = "N/A"
@@ -692,34 +861,139 @@ class Port(object):
else:
info['is_virtual'] = 'N/A'
+ # speed
+ info['speed'] = self.get_speed_gbps()
+
+ # RX filter mode
+ info['rx_filter_mode'] = 'hardware match' if attr['rx_filter_mode'] == 'hw' else 'fetch all'
+
+ # src MAC and IPv4
+ info['src_mac'] = attr['src_mac']
+ info['src_ipv4'] = attr['src_ipv4']
+
+ if info['src_ipv4'] is None:
+ info['src_ipv4'] = '-'
+
+ # dest
+ dest = attr['dest']
+ if dest['type'] == 'mac':
+ info['dest'] = dest['mac']
+ info['arp'] = '-'
+
+ elif dest['type'] == 'ipv4':
+ info['dest'] = dest['ipv4']
+ info['arp'] = dest['arp']
+
+ elif dest['type'] == 'ipv4_u':
+ info['dest'] = dest['ipv4']
+ info['arp'] = 'unresolved'
+
+
+ # RX info
+ rx_info = self.status['rx_info']
+
+ # RX sniffer
+ sniffer = rx_info['sniffer']
+ info['rx_sniffer'] = '{0}\n[{1} / {2}]'.format(sniffer['pcap_filename'], sniffer['count'], sniffer['limit']) if sniffer['is_active'] else 'off'
+
+
+ # RX queue
+ queue = rx_info['queue']
+ info['rx_queue'] = '[{0} / {1}]'.format(queue['count'], queue['size']) if queue['is_active'] else 'off'
+
+ # Grat ARP
+ grat_arp = rx_info['grat_arp']
+ if grat_arp['is_active']:
+ info['grat_arp'] = "every {0} seconds".format(grat_arp['interval_sec'])
+ else:
+ info['grat_arp'] = "off"
+
+
return info
def get_port_state_name(self):
return self.STATES_MAP.get(self.state, "Unknown")
+ def get_src_addr (self):
+ src_mac = self.__attr['src_mac']
+ src_ipv4 = self.__attr['src_ipv4']
+
+ return {'mac': src_mac, 'ipv4': src_ipv4}
+
+ def get_rx_filter_mode (self):
+ return self.__attr['rx_filter_mode']
+
+ def get_dst_addr (self):
+ dest = self.__attr['dest']
+
+ if dest['type'] == 'mac':
+ return {'ipv4': None, 'mac': dest['mac']}
+
+ elif dest['type'] == 'ipv4':
+ return {'ipv4': dest['ipv4'], 'mac': dest['arp']}
+
+ elif dest['type'] == 'ipv4_u':
+ return {'ipv4': dest['ipv4'], 'mac': None}
+
+ else:
+ assert(0)
+
+
+ # port is considered resolved if it's dest is either MAC or resolved IPv4
+ def is_resolved (self):
+ return (self.get_dst_addr()['mac'] is not None)
+
+ # return True if the port is valid for resolve (has an IPv4 address as dest)
+ def is_resolvable (self):
+ return (self.get_dst_addr()['ipv4'] is not None)
+
+ @writeable
+ def arp_resolve (self, retries):
+ if not self.is_service_mode_on():
+ return self.err('port service mode must be enabled for performing ARP resolution. Please enable service mode')
+
+ return ARPResolver(self).resolve(retries)
+
+ @writeable
+ def ping (self, ping_ipv4, pkt_size):
+ if not self.is_service_mode_on():
+ return self.err('port service mode must be enabled for performing ping. Please enable service mode')
+
+ return PingResolver(self, ping_ipv4, pkt_size).resolve()
+
+
################# stats handler ######################
def generate_port_stats(self):
return self.port_stats.generate_stats()
def generate_port_status(self):
- info = self.get_info()
+ info = self.get_formatted_info()
- return {"driver": info['driver'],
- "description": info.get('description', 'N/A')[:18],
- "HW src mac": info['hw_macaddr'],
- "SW src mac": info['src_macaddr'],
- "SW dst mac": info['dst_macaddr'],
- "PCI Address": info['pci_addr'],
- "NUMA Node": info['numa'],
+ return {"driver": info['driver'],
+ "description": info.get('description', 'N/A')[:18],
+ "src MAC": info['src_mac'],
+ "src IPv4": info['src_ipv4'],
+ "Destination": info['dest'],
+ "ARP Resolution": format_text("{0}".format(info['arp']), 'bold', 'red') if info['arp'] == 'unresolved' else info['arp'],
+ "PCI Address": info['pci_addr'],
+ "NUMA Node": info['numa'],
"--": "",
"---": "",
- "link speed": "{speed} Gb/s".format(speed=info['speed']),
+ "----": "",
+ "-----": "",
+ "link speed": "%g Gb/s" % info['speed'],
"port status": info['status'],
"link status": info['link'],
"promiscuous" : info['prom'],
"flow ctrl" : info['fc'],
+
+ "RX Filter Mode": info['rx_filter_mode'],
+ "RX Queueing": info['rx_queue'],
+ "RX sniffer": info['rx_sniffer'],
+ "Grat ARP": info['grat_arp'],
+
}
def clear_stats(self):
@@ -756,17 +1030,54 @@ class Port(object):
return {"streams" : OrderedDict(sorted(data.items())) }
-
+ ######## attributes are a complex type (dict) that might be manipulated through the async thread #############
+
+ # get in a thread safe manner a duplication of attributes
+ def get_ts_attr (self):
+ with self.attr_lock:
+ return dict(self.__attr)
+
+ # set in a thread safe manner a new dict of attributes
+ def set_ts_attr (self, new_attr):
+ with self.attr_lock:
+ self.__attr = new_attr
+
+
################# events handler ######################
def async_event_port_job_done (self):
# until thread is locked - order is important
self.tx_stopped_ts = datetime.now()
self.state = self.STATE_STREAMS
+
self.last_factor_type = None
- def async_event_port_attr_changed (self, attr):
- self.info['speed'] = attr['speed'] // 1000
- self.attr = attr
+ def async_event_port_attr_changed (self, new_attr):
+
+ # get a thread safe duplicate
+ cur_attr = self.get_ts_attr()
+
+ # check if anything changed
+ if new_attr == cur_attr:
+ return None
+
+ # generate before
+ before = self.get_formatted_info(sync = False)
+
+ # update
+ self.set_ts_attr(new_attr)
+
+ # generate after
+ after = self.get_formatted_info(sync = False)
+
+ # return diff
+ diff = {}
+ for key, new_value in after.items():
+ old_value = before.get(key, 'N/A')
+ if new_value != old_value:
+ diff[key] = (old_value, new_value)
+
+ return diff
+
# rest of the events are used for TUI / read only sessions
def async_event_port_stopped (self):
@@ -792,3 +1103,4 @@ class Port(object):
def async_event_released (self):
self.owner = ''
+