summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xlinux/ws_main.py3
-rwxr-xr-xlinux_dpdk/ws_main.py4
-rwxr-xr-xscripts/automation/trex_control_plane/stl/console/trex_console.py14
-rwxr-xr-xscripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py555
-rw-r--r--scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py505
-rw-r--r--scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_stats.py20
-rw-r--r--scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_types.py6
-rw-r--r--scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/common.py12
-rwxr-xr-xscripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py82
-rwxr-xr-xscripts/trex_show_threads.py4
-rwxr-xr-xsrc/common/basic_utils.cpp44
-rwxr-xr-xsrc/common/basic_utils.h6
-rwxr-xr-xsrc/common/pcap.h5
-rw-r--r--src/flow_stat.cpp4
-rw-r--r--src/internal_api/trex_platform_api.h11
-rw-r--r--src/main_dpdk.cpp90
-rw-r--r--src/rpc-server/commands/trex_rpc_cmd_general.cpp293
-rw-r--r--src/rpc-server/commands/trex_rpc_cmds.h20
-rw-r--r--src/rpc-server/trex_rpc_cmds_table.cpp5
-rw-r--r--src/stateless/cp/trex_stateless_port.cpp70
-rw-r--r--src/stateless/cp/trex_stateless_port.h71
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.cpp47
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.h130
-rw-r--r--src/stateless/rx/trex_stateless_rx_core.cpp327
-rw-r--r--src/stateless/rx/trex_stateless_rx_core.h106
-rw-r--r--src/stateless/rx/trex_stateless_rx_defs.h157
-rw-r--r--src/stateless/rx/trex_stateless_rx_port_mngr.cpp253
-rw-r--r--src/stateless/rx/trex_stateless_rx_port_mngr.h411
-rw-r--r--src/trex_port_attr.cpp77
-rwxr-xr-xsrc/trex_port_attr.h192
30 files changed, 3010 insertions, 514 deletions
diff --git a/linux/ws_main.py b/linux/ws_main.py
index 79e26915..b835e4ce 100755
--- a/linux/ws_main.py
+++ b/linux/ws_main.py
@@ -119,6 +119,7 @@ main_src = SrcGroup(dir='src',
'utl_json.cpp',
'utl_cpuu.cpp',
'msg_manager.cpp',
+ 'trex_port_attr.cpp',
'publisher/trex_publisher.cpp',
'stateful_rx_core.cpp',
'flow_stat.cpp',
@@ -167,6 +168,8 @@ stateless_src = SrcGroup(dir='src/stateless/',
'cp/trex_dp_port_events.cpp',
'dp/trex_stateless_dp_core.cpp',
'messaging/trex_stateless_messaging.cpp',
+ 'rx/trex_stateless_rx_core.cpp',
+ 'rx/trex_stateless_rx_port_mngr.cpp'
])
# RPC code
rpc_server_src = SrcGroup(dir='src/rpc-server/',
diff --git a/linux_dpdk/ws_main.py b/linux_dpdk/ws_main.py
index ce877d6d..7fd73f2e 100755
--- a/linux_dpdk/ws_main.py
+++ b/linux_dpdk/ws_main.py
@@ -142,6 +142,7 @@ main_src = SrcGroup(dir='src',
'nat_check.cpp',
'nat_check_flow_table.cpp',
'msg_manager.cpp',
+ 'trex_port_attr.cpp',
'publisher/trex_publisher.cpp',
'pal/linux_dpdk/pal_utl.cpp',
'pal/linux_dpdk/mbuf.cpp',
@@ -212,7 +213,8 @@ stateless_src = SrcGroup(dir='src/stateless/',
'cp/trex_dp_port_events.cpp',
'dp/trex_stateless_dp_core.cpp',
'messaging/trex_stateless_messaging.cpp',
- 'rx/trex_stateless_rx_core.cpp'
+ 'rx/trex_stateless_rx_core.cpp',
+ 'rx/trex_stateless_rx_port_mngr.cpp'
])
# JSON package
json_src = SrcGroup(dir='external_libs/json',
diff --git a/scripts/automation/trex_control_plane/stl/console/trex_console.py b/scripts/automation/trex_control_plane/stl/console/trex_console.py
index b23b5f1f..f1635b97 100755
--- a/scripts/automation/trex_control_plane/stl/console/trex_console.py
+++ b/scripts/automation/trex_control_plane/stl/console/trex_console.py
@@ -327,6 +327,20 @@ class TRexConsole(TRexGeneralCmd):
def help_portattr (self):
self.do_portattr("-h")
+ def do_set_rx_sniffer (self, line):
+ '''Sets a port sniffer on RX channel as PCAP recorder'''
+ self.stateless_client.set_rx_sniffer_line(line)
+
+ def help_sniffer (self):
+ self.do_set_rx_sniffer("-h")
+
+ def do_resolve (self, line):
+ '''Resolve ARP for ports'''
+ self.stateless_client.resolve_line(line)
+
+ def help_sniffer (self):
+ self.do_resolve("-h")
+
@verify_connected
def do_map (self, line):
'''Maps ports topology\n'''
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py
index 6b53e67e..cf328d2e 100755
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py
@@ -12,7 +12,7 @@ from .trex_stl_types import *
from .trex_stl_async_client import CTRexAsyncClient
from .utils import parsing_opts, text_tables, common
-from .utils.common import list_intersect, list_difference, is_sub_list, PassiveTimer
+from .utils.common import list_intersect, list_difference, is_sub_list, PassiveTimer, is_valid_ipv4, is_valid_mac
from .utils.text_opts import *
from functools import wraps
@@ -24,6 +24,8 @@ import re
import random
import json
import traceback
+import os.path
+
############################ logger #############################
############################ #############################
@@ -322,22 +324,25 @@ class EventsHandler(object):
# port attr changed
elif (event_type == 8):
+
port_id = int(data['port_id'])
- if data['attr'] == self.client.ports[port_id].attr:
- return # false alarm
- old_info = self.client.ports[port_id].get_info()
- self.__async_event_port_attr_changed(port_id, data['attr'])
- new_info = self.client.ports[port_id].get_info()
+
+ diff = self.__async_event_port_attr_changed(port_id, data['attr'])
+ if not diff:
+ return
+
+
ev = "port {0} attributes changed".format(port_id)
- for key, old_val in old_info.items():
- new_val = new_info[key]
- if old_val != new_val:
- ev += '\n {key}: {old} -> {new}'.format(
- key = key,
- old = old_val.lower() if type(old_val) is str else old_val,
- new = new_val.lower() if type(new_val) is str else new_val)
+ for key, (old_val, new_val) in diff.items():
+ ev += '\n {key}: {old} -> {new}'.format(
+ key = key,
+ old = old_val.lower() if type(old_val) is str else old_val,
+ new = new_val.lower() if type(new_val) is str else new_val)
+
show_event = True
-
+
+
+
# server stopped
elif (event_type == 100):
ev = "Server has stopped"
@@ -394,7 +399,7 @@ class EventsHandler(object):
def __async_event_port_attr_changed (self, port_id, attr):
if port_id in self.client.ports:
- self.client.ports[port_id].async_event_port_attr_changed(attr)
+ return self.client.ports[port_id].async_event_port_attr_changed(attr)
# add event to log
def __add_event_log (self, origin, ev_type, msg, show = False):
@@ -652,7 +657,7 @@ class STLClient(object):
return rc
-
+
def __add_streams(self, stream_list, port_id_list = None):
port_id_list = self.__ports(port_id_list)
@@ -801,17 +806,67 @@ class STLClient(object):
return rc
+ def __resolve (self, port_id_list = None, retries = 0):
+ port_id_list = self.__ports(port_id_list)
+
+ rc = RC()
+
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].resolve(retries))
+
+ return rc
+
+
def __set_port_attr (self, port_id_list = None, attr_dict = None):
port_id_list = self.__ports(port_id_list)
rc = RC()
+ for port_id, port_attr_dict in zip(port_id_list, attr_dict):
+ rc.add(self.ports[port_id].set_attr(**port_attr_dict))
+
+ return rc
+
+
+ def __set_rx_sniffer (self, port_id_list, base_filename, limit):
+ port_id_list = self.__ports(port_id_list)
+ rc = RC()
+
for port_id in port_id_list:
- rc.add(self.ports[port_id].set_attr(attr_dict))
+ head, tail = os.path.splitext(base_filename)
+ filename = "{0}-{1}{2}".format(head, port_id, tail)
+ rc.add(self.ports[port_id].set_rx_sniffer(filename, limit))
return rc
+ def __remove_rx_sniffer (self, port_id_list):
+ port_id_list = self.__ports(port_id_list)
+ rc = RC()
+
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].remove_rx_sniffer())
+
+ return rc
+
+ def __set_rx_queue (self, port_id_list, size):
+ port_id_list = self.__ports(port_id_list)
+ rc = RC()
+
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].set_rx_queue(size))
+
+ return rc
+
+ def __remove_rx_queue (self, port_id_list):
+ port_id_list = self.__ports(port_id_list)
+ rc = RC()
+
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].remove_rx_queue())
+
+ return rc
+
# connect to server
def __connect(self):
@@ -1727,6 +1782,54 @@ class STLClient(object):
if not rc:
raise STLError(rc)
+ def test (self):
+ self.resolve()
+ return
+
+ #rc = self.ports[0].resolve()
+ #if not rc:
+ # raise STLError(rc)
+ #return
+
+ self.reset(ports = [0])
+
+ attr = self.ports[0].get_ts_attr()
+ src_ipv4 = attr['src_ipv4']
+ src_mac = attr['src_mac']
+ dest = attr['dest']
+ print(src_ipv4, src_mac, dest)
+ #self.set_port_attr(ports = [0, 1], ipv4 = ['5.5.5.5', '6.6.6.6'])
+ return
+
+ self.set_rx_queue(ports = [0], size = 1000, rxf = 'all')
+
+ #base_pkt = Ether()/ARP()/('x'*50)
+ base_pkt = Ether(dst="ff:ff:ff:ff:ff:ff")/ARP(psrc = '1.1.1.2',pdst = '1.1.1.1', hwsrc = 'a0:36:9f:20:e6:ce')
+ #base_pkt = Ether(dst="ff:ff:ff:ff:ff:ff")/ICMP()
+
+ print('Sending ARP request on port 0:\n')
+ base_pkt.show2()
+
+ # send some traffic
+ x = STLStream( packet = STLPktBuilder(pkt = base_pkt), mode = STLTXSingleBurst(total_pkts = 1) )
+
+ self.add_streams(streams = [x], ports = [0])
+ self.start(ports = [0], mult = '100%')
+ self.wait_on_traffic(ports = [0])
+ time.sleep(1)
+
+ pkts = self.get_rx_queue_pkts(ports = [0])
+
+ print('got back on port 0:\n')
+ for pkt in pkts[0]:
+ Ether(pkt).show2()
+
+ self.remove_rx_queue(ports = [1])
+ self.set_port_attr(ports = [1], rxf = 'hw')
+ #for pkt in pkts[1]:
+ # Ether(pkt).show2()
+
+
@__api_check(True)
def ping(self):
"""
@@ -1740,7 +1843,9 @@ class STLClient(object):
+ :exc:`STLError`
"""
-
+ self.resolve()
+ return
+
self.logger.pre_cmd("Pinging the server on '{0}' port '{1}': ".format(self.connection_info['server'],
self.connection_info['sync_port']))
rc = self._transmit("ping", api_class = None)
@@ -1750,6 +1855,7 @@ class STLClient(object):
if not rc:
raise STLError(rc)
+
@__api_check(True)
def server_shutdown (self, force = False):
"""
@@ -1835,7 +1941,7 @@ class STLClient(object):
@__api_check(True)
- def reset(self, ports = None):
+ def reset(self, ports = None, restart = False):
"""
Force acquire ports, stop the traffic, remove all streams and clear stats
@@ -1843,7 +1949,9 @@ class STLClient(object):
ports : list
Ports on which to execute the command
-
+ restart: bool
+ Restart the NICs (link down / up)
+
:raises:
+ :exc:`STLError`
@@ -1858,7 +1966,14 @@ class STLClient(object):
self.stop(ports, rx_delay_ms = 0)
self.remove_all_streams(ports)
self.clear_stats(ports)
+ self.set_port_attr(ports,
+ promiscuous = False,
+ link_up = True if restart else None,
+ rxf = 'hw')
+ self.remove_rx_sniffer(ports)
+ self.remove_rx_queue(ports)
+
@__api_check(True)
def remove_all_streams (self, ports = None):
@@ -1997,7 +2112,25 @@ class STLClient(object):
raise STLError(rc)
-
+ # common checks for start API
+ def __pre_start_check (self, ports, force):
+
+ # verify link status
+ ports_link_down = [port_id for port_id in ports if not self.ports[port_id].is_up()]
+ if not force and ports_link_down:
+ raise STLError("Port(s) %s - link DOWN - check the connection or specify 'force'" % ports_link_down)
+
+ # verify ports are stopped or force stop them
+ active_ports = list(set(self.get_active_ports()).intersection(ports))
+ if active_ports and not force:
+ raise STLError("Port(s) {0} are active - please stop them or specify 'force'".format(active_ports))
+
+ # warn if ports are not resolved
+ unresolved_ports = [port_id for port_id in ports if not self.ports[port_id].is_resolved()]
+ if unresolved_ports and not force:
+ raise STLError("Port(s) {0} are unresolved - please resolve them or specify 'force'".format(unresolved_ports))
+
+
@__api_check(True)
def start (self,
ports = None,
@@ -2052,11 +2185,9 @@ class STLClient(object):
validate_type('total', total, bool)
validate_type('core_mask', core_mask, (int, list))
- # verify link status
- ports_link_down = [port_id for port_id in ports if self.ports[port_id].attr.get('link',{}).get('up') == False]
- if not force and ports_link_down:
- raise STLError("Port(s) %s - link DOWN - check the connection or specify 'force'" % ports_link_down)
-
+
+ self.__pre_start_check(ports, force)
+
#########################
# decode core mask argument
decoded_mask = self.__decode_core_mask(ports, core_mask)
@@ -2070,17 +2201,12 @@ class STLClient(object):
raise STLArgumentError('mult', mult)
- # verify ports are stopped or force stop them
+ # stop active ports if needed
active_ports = list(set(self.get_active_ports()).intersection(ports))
- if active_ports:
- if not force:
- raise STLError("Port(s) {0} are active - please stop them or specify 'force'".format(active_ports))
- else:
- rc = self.stop(active_ports)
- if not rc:
- raise STLError(rc)
-
+ if active_ports and force:
+ self.stop(active_ports)
+
# start traffic
self.logger.pre_cmd("Starting traffic on port(s) {0}:".format(ports))
rc = self.__start(mult_obj, duration, ports, force, decoded_mask)
@@ -2629,16 +2755,30 @@ class STLClient(object):
@__api_check(True)
- def set_port_attr (self, ports = None, promiscuous = None, link_up = None, led_on = None, flow_ctrl = None):
+ def set_port_attr (self,
+ ports = None,
+ promiscuous = None,
+ link_up = None,
+ led_on = None,
+ flow_ctrl = None,
+ rxf = None,
+ ipv4 = None,
+ dest = None,
+ resolve = True):
"""
Set port attributes
:parameters:
- promiscuous - True or False
- link_up - True or False
- led_on - True or False
- flow_ctrl - 0: disable all, 1: enable tx side, 2: enable rx side, 3: full enable
-
+ promiscuous - True or False
+ link_up - True or False
+ led_on - True or False
+ flow_ctrl - 0: disable all, 1: enable tx side, 2: enable rx side, 3: full enable
+ rxf - 'hw' for hardware rules matching packets only or 'all' all packets
+ ipv4 - configure IPv4 address for port(s). for multiple ports should be a list
+ of IPv4 addresses in the same length of the ports array
+ dest - configure destination address for port(s) in either IPv4 or MAC format.
+ for multiple ports should be a list in the same length of the ports array
+ resolve - if true, in case a destination address is configured as IPv4 try to resolve it
:raises:
+ :exe:'STLError'
@@ -2652,29 +2792,233 @@ class STLClient(object):
validate_type('link_up', link_up, (bool, type(None)))
validate_type('led_on', led_on, (bool, type(None)))
validate_type('flow_ctrl', flow_ctrl, (int, type(None)))
-
- # build attributes
- attr_dict = {}
- if promiscuous is not None:
- attr_dict['promiscuous'] = {'enabled': promiscuous}
- if link_up is not None:
- attr_dict['link_status'] = {'up': link_up}
- if led_on is not None:
- attr_dict['led_status'] = {'on': led_on}
- if flow_ctrl is not None:
- attr_dict['flow_ctrl_mode'] = {'mode': flow_ctrl}
+ validate_choice('rxf', rxf, ['hw', 'all'])
+
+ # common attributes for all ports
+ cmn_attr_dict = {}
+
+ cmn_attr_dict['promiscuous'] = promiscuous
+ cmn_attr_dict['link_status'] = link_up
+ cmn_attr_dict['led_status'] = led_on
+ cmn_attr_dict['flow_ctrl_mode'] = flow_ctrl
+ cmn_attr_dict['rx_filter_mode'] = rxf
+
+ # each port starts with a set of the common attributes
+ attr_dict = [dict(cmn_attr_dict) for _ in ports]
+
+ # default value for IPv4 / dest is none for all ports
+ if ipv4 is None:
+ ipv4 = [None] * len(ports)
+ if dest is None:
+ dest = [None] * len(ports)
+
+ ipv4 = listify(ipv4)
+ if len(ipv4) != len(ports):
+ raise STLError("'ipv4' must be a list in the same length of ports - 'ports': {0}, 'ip': {1}".format(ports, ipv4))
+
+ dest = listify(dest)
+ if len(dest) != len(ports):
+ raise STLError("'dest' must be a list in the same length of ports - 'ports': {0}, 'dest': {1}".format(ports, dest))
+
+ # update each port attribute with ipv4
+ for addr, port_attr in zip(ipv4, attr_dict):
+ port_attr['ipv4'] = addr
+
+ # update each port attribute with dest
+ for addr, port_attr in zip(dest, attr_dict):
+ port_attr['dest'] = addr
+
- # no attributes to set
- if not attr_dict:
- return
-
self.logger.pre_cmd("Applying attributes on port(s) {0}:".format(ports))
rc = self.__set_port_attr(ports, attr_dict)
self.logger.post_cmd(rc)
+
+ if not rc:
+ raise STLError(rc)
+
+
+ # automatic resolve
+ if resolve:
+ # find any port with a dest configured as IPv4
+ resolve_ports = [port_id for port_id, port_dest in zip(ports, dest) if is_valid_ipv4(port_dest)]
+
+ if resolve_ports:
+ self.resolve(ports = resolve_ports)
+
+
+
+
+ @__api_check(True)
+ def resolve (self, ports = None, retries = 0):
+ """
+ Resolves ports (ARP resolution)
+
+ :parameters:
+ ports - for which ports to apply a unique sniffer (each port gets a unique file)
+ retires - how many times to retry on each port (intervals of 100 milliseconds)
+ :raises:
+ + :exe:'STLError'
+
+ """
+ # by default - resolve all the ports that are configured with IPv4 dest
+ if ports is None:
+ ports = [port_id for port_id in self.get_acquired_ports() if self.ports[port_id].get_dest()['type'] == 'ipv4']
+ if not ports:
+ raise STLError('No ports configured with destination as IPv4')
+
+ active_ports = list(set(self.get_active_ports()).intersection(ports))
+ if active_ports:
+ raise STLError('Port(s) {0} are active'.format(active_ports))
+
+ ports = self._validate_port_list(ports)
+
+ self.logger.pre_cmd("Resolving destination on port(s) {0}:".format(ports))
+ with self.logger.supress():
+ rc = self.__resolve(ports, retries)
+ self.logger.post_cmd(rc)
+
+ if not rc:
+ raise STLError(rc)
+
+
+
+
+ @__api_check(True)
+ def set_rx_sniffer (self, ports = None, base_filename = 'rx_capture', limit = 1000, rxf = None):
+ """
+ Sets RX sniffer for port(s) written to a PCAP file
+
+ :parameters:
+ ports - for which ports to apply a unique sniffer (each port gets a unique file)
+ base_filename - filename will be appended with '-<port_number>'
+ limit - limit how many packets will be written
+ rxf - RX filter mode to use: 'hw' or 'all'
+ :raises:
+ + :exe:'STLError'
+
+ """
+ ports = ports if ports is not None else self.get_acquired_ports()
+ ports = self._validate_port_list(ports)
+
+ # check arguments
+ validate_type('base_filename', base_filename, basestring)
+ validate_type('limit', limit, (int))
+ if limit <= 0:
+ raise STLError("'limit' must be a positive value")
+
+ # change RX filter mode if asked
+ if rxf:
+ self.set_port_attr(ports, rxf = rxf)
+
+ self.logger.pre_cmd("Setting RX sniffers on port(s) {0}:".format(ports))
+ rc = self.__set_rx_sniffer(ports, base_filename, limit)
+ self.logger.post_cmd(rc)
+
+
+ if not rc:
+ raise STLError(rc)
+
+
+
+ @__api_check(True)
+ def remove_rx_sniffer (self, ports = None):
+ """
+ Removes RX sniffer from port(s)
+
+ :raises:
+ + :exe:'STLError'
+
+ """
+ ports = ports if ports is not None else self.get_acquired_ports()
+ ports = self._validate_port_list(ports)
+
+ self.logger.pre_cmd("Removing RX sniffers on port(s) {0}:".format(ports))
+ rc = self.__remove_rx_sniffer(ports)
+ self.logger.post_cmd(rc)
+
+ if not rc:
+ raise STLError(rc)
+
+
+ @__api_check(True)
+ def set_rx_queue (self, ports = None, size = 1000, rxf = None):
+ """
+ Sets RX queue for port(s)
+ The queue is cyclic and will hold last 'size' packets
+
+ :parameters:
+ ports - for which ports to apply a queue
+ size - size of the queue
+ rxf - which RX filter to use on those ports: 'hw' or 'all'
+ :raises:
+ + :exe:'STLError'
+
+ """
+ ports = ports if ports is not None else self.get_acquired_ports()
+ ports = self._validate_port_list(ports)
+
+ # check arguments
+ validate_type('size', size, (int))
+ if size <= 0:
+ raise STLError("'size' must be a positive value")
+
+ # change RX filter mode if asked
+ if rxf:
+ self.set_port_attr(ports, rxf = rxf)
+
+ self.logger.pre_cmd("Setting RX queue on port(s) {0}:".format(ports))
+ rc = self.__set_rx_queue(ports, size)
+ self.logger.post_cmd(rc)
+
+
+ if not rc:
+ raise STLError(rc)
+
+
+
+ @__api_check(True)
+ def remove_rx_queue (self, ports = None):
+ """
+ Removes RX queue from port(s)
+
+ :parameters:
+ ports - for which ports to remove the RX queue
+ :raises:
+ + :exe:'STLError'
+
+ """
+ ports = ports if ports is not None else self.get_acquired_ports()
+ ports = self._validate_port_list(ports)
+
+ self.logger.pre_cmd("Removing RX queue on port(s) {0}:".format(ports))
+ rc = self.__remove_rx_queue(ports)
+ self.logger.post_cmd(rc)
if not rc:
raise STLError(rc)
+
+
+ @__api_check(True)
+ def get_rx_queue_pkts (self, ports = None):
+ """
+ Returns any packets queued on the RX side by the server
+ return value is a dictonary per port
+
+ :parameters:
+ ports - for which ports to fetch
+ """
+
+ ports = ports if ports is not None else self.get_acquired_ports()
+ ports = self._validate_port_list(ports)
+
+ result = {}
+ for port in ports:
+ result[port] = self.ports[port].get_rx_queue_pkts()
+
+ return result
+
+
def clear_events (self):
"""
Clear all events
@@ -2849,13 +3193,14 @@ class STLClient(object):
parser = parsing_opts.gen_parser(self,
"reset",
self.reset_line.__doc__,
- parsing_opts.PORT_LIST_WITH_ALL)
+ parsing_opts.PORT_LIST_WITH_ALL,
+ parsing_opts.PORT_RESTART)
opts = parser.parse_args(line.split(), default_ports = self.get_acquired_ports(), verify_acquired = True)
if not opts:
return opts
- self.reset(ports = opts.ports)
+ self.reset(ports = opts.ports, restart = opts.restart)
return RC_OK()
@@ -2891,14 +3236,19 @@ class STLClient(object):
# just for sanity - will be checked on the API as well
self.__decode_core_mask(opts.ports, core_mask)
+ # for better use experience - check this first
+ try:
+ self.__pre_start_check(opts.ports, opts.force)
+ except STLError as e:
+ msg = e.brief()
+ self.logger.log(format_text(msg, 'bold'))
+ return RC_ERR(msg)
+
+
+ # stop ports if needed
active_ports = list_intersect(self.get_active_ports(), opts.ports)
- if active_ports:
- if not opts.force:
- msg = "Port(s) {0} are active - please stop them or add '--force'\n".format(active_ports)
- self.logger.log(format_text(msg, 'bold'))
- return RC_ERR(msg)
- else:
- self.stop(active_ports)
+ if active_ports and opts.force:
+ self.stop(active_ports)
# process tunables
@@ -3240,7 +3590,7 @@ class STLClient(object):
'''Sets port attributes '''
parser = parsing_opts.gen_parser(self,
- "port_attr",
+ "portattr",
self.set_port_attr_line.__doc__,
parsing_opts.PORT_LIST_WITH_ALL,
parsing_opts.PROMISCUOUS,
@@ -3248,24 +3598,27 @@ class STLClient(object):
parsing_opts.LED_STATUS,
parsing_opts.FLOW_CTRL,
parsing_opts.SUPPORTED,
+ parsing_opts.RX_FILTER_MODE,
+ parsing_opts.IPV4,
+ parsing_opts.DEST
)
opts = parser.parse_args(line.split(), default_ports = self.get_acquired_ports(), verify_acquired = True)
if not opts:
return opts
- opts.prom = parsing_opts.ON_OFF_DICT.get(opts.prom)
- opts.link = parsing_opts.UP_DOWN_DICT.get(opts.link)
- opts.led = parsing_opts.ON_OFF_DICT.get(opts.led)
- opts.flow_ctrl = parsing_opts.FLOW_CTRL_DICT.get(opts.flow_ctrl)
+ opts.prom = parsing_opts.ON_OFF_DICT.get(opts.prom)
+ opts.link = parsing_opts.UP_DOWN_DICT.get(opts.link)
+ opts.led = parsing_opts.ON_OFF_DICT.get(opts.led)
+ opts.flow_ctrl = parsing_opts.FLOW_CTRL_DICT.get(opts.flow_ctrl)
# if no attributes - fall back to printing the status
- if not filter(lambda x:x is not None, [opts.prom, opts.link, opts.led, opts.flow_ctrl, opts.supp]):
+ if not list(filter(lambda x:x is not None, [opts.prom, opts.link, opts.led, opts.flow_ctrl, opts.supp, opts.rx_filter_mode, opts.ipv4, opts.dest])):
self.show_stats_line("--ps --port {0}".format(' '.join(str(port) for port in opts.ports)))
return
if opts.supp:
- info = self.ports[0].get_info() # assume for now all ports are same
+ info = self.ports[0].get_formatted_info() # assume for now all ports are same
print('')
print('Supported attributes for current NICs:')
print(' Promiscuous: yes')
@@ -3274,15 +3627,66 @@ class STLClient(object):
print(' Flow control: %s' % info['fc_supported'])
print('')
else:
- return self.set_port_attr(opts.ports, opts.prom, opts.link, opts.led, opts.flow_ctrl)
+ self.set_port_attr(opts.ports,
+ opts.prom,
+ opts.link,
+ opts.led,
+ opts.flow_ctrl,
+ opts.rx_filter_mode,
+ opts.ipv4,
+ opts.dest)
+
+
+
+
+ @__console
+ def set_rx_sniffer_line (self, line):
+ '''Sets a port sniffer on RX channel in form of a PCAP file'''
+
+ parser = parsing_opts.gen_parser(self,
+ "set_rx_sniffer",
+ self.set_rx_sniffer_line.__doc__,
+ parsing_opts.PORT_LIST_WITH_ALL,
+ parsing_opts.OUTPUT_FILENAME,
+ parsing_opts.LIMIT,
+ parsing_opts.ALL_FILES)
+
+ opts = parser.parse_args(line.split(), default_ports = self.get_acquired_ports(), verify_acquired = True)
+ if not opts:
+ return opts
+
+ rxf = 'all' if opts.all else None
+ self.set_rx_sniffer(opts.ports, opts.output_filename, opts.limit, rxf)
+
+ @__console
+ def resolve_line (self, line):
+ '''Performs a port ARP resolution'''
+
+ parser = parsing_opts.gen_parser(self,
+ "resolve",
+ self.resolve_line.__doc__,
+ parsing_opts.PORT_LIST_WITH_ALL,
+ parsing_opts.RETRIES)
+
+ resolvable_ports = [port_id for port_id in self.get_acquired_ports() if self.ports[port_id].get_dest()['type'] == 'ipv4']
+
+ opts = parser.parse_args(line.split(), default_ports = resolvable_ports, verify_acquired = True)
+ if not opts:
+ return opts
+
+
+ self.resolve(ports = opts.ports, retries = opts.retries)
+
+
+
@__console
def show_profile_line (self, line):
'''Shows profile information'''
parser = parsing_opts.gen_parser(self,
- "port",
+ "profile",
self.show_profile_line.__doc__,
parsing_opts.FILE_PATH)
@@ -3378,3 +3782,6 @@ class STLClient(object):
else:
return "{0} {1}>".format(prefix, self.get_acquired_ports())
+
+
+
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py
index cec3761f..f658b7fa 100644
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py
@@ -2,14 +2,18 @@
from collections import namedtuple, OrderedDict
from .trex_stl_packet_builder_scapy import STLPktBuilder
-from .trex_stl_streams import STLStream
+from .trex_stl_streams import STLStream, STLTXSingleBurst
from .trex_stl_types import *
from . import trex_stl_stats
from .utils.constants import FLOW_CTRL_DICT_REVERSED
+from scapy.layers.l2 import Ether, ARP
+
import base64
import copy
from datetime import datetime, timedelta
+import threading
+import time
StreamOnPort = namedtuple('StreamOnPort', ['compiled_stream', 'metadata'])
@@ -50,7 +54,9 @@ class Port(object):
def __init__ (self, port_id, user, comm_link, session_id, info):
self.port_id = port_id
+
self.state = self.STATE_IDLE
+
self.handler = None
self.comm_link = comm_link
self.transmit = comm_link.transmit
@@ -62,7 +68,8 @@ class Port(object):
self.streams = {}
self.profile = None
self.session_id = session_id
- self.attr = {}
+ self.status = {}
+ self.__attr = {}
self.port_stats = trex_stl_stats.CPortStats(self)
@@ -73,21 +80,23 @@ class Port(object):
self.owner = ''
self.last_factor_type = None
+ self.attr_lock = threading.Lock()
+
# decorator to verify port is up
def up(func):
- def func_wrapper(*args):
+ def func_wrapper(*args, **kwargs):
port = args[0]
if not port.is_up():
return port.err("{0} - port is down".format(func.__name__))
- return func(*args)
+ return func(*args, **kwargs)
return func_wrapper
# owned
def owned(func):
- def func_wrapper(*args):
+ def func_wrapper(*args, **kwargs):
port = args[0]
if not port.is_up():
@@ -96,7 +105,7 @@ class Port(object):
if not port.is_acquired():
return port.err("{0} - port is not owned".format(func.__name__))
- return func(*args)
+ return func(*args, **kwargs)
return func_wrapper
@@ -128,16 +137,16 @@ class Port(object):
return RC_OK(data)
def get_speed_bps (self):
- return (self.info['speed'] * 1000 * 1000 * 1000)
+ return (self.__attr['speed'] * 1000 * 1000 * 1000)
def get_formatted_speed (self):
- return "{0} Gbps".format(self.info['speed'])
+ return "%g Gb/s" % (self.__attr['speed'] / 1000)
def is_acquired(self):
return (self.handler != None)
def is_up (self):
- return (self.state != self.STATE_DOWN)
+ return self.__attr['link']['up']
def is_active(self):
return (self.state == self.STATE_TX ) or (self.state == self.STATE_PAUSE) or (self.state == self.STATE_PCAP_TX)
@@ -165,7 +174,6 @@ class Port(object):
# take the port
- @up
def acquire(self, force = False, sync_streams = True):
params = {"port_id": self.port_id,
"user": self.user,
@@ -185,7 +193,6 @@ class Port(object):
# sync all the streams with the server
- @up
def sync_streams (self):
params = {"port_id": self.port_id}
@@ -201,7 +208,6 @@ class Port(object):
return self.ok()
# release the port
- @up
def release(self):
params = {"port_id": self.port_id,
"handler": self.handler}
@@ -219,7 +225,6 @@ class Port(object):
- @up
def sync(self):
params = {"port_id": self.port_id}
@@ -250,10 +255,10 @@ class Port(object):
self.next_available_id = int(rc.data()['max_stream_id']) + 1
- # attributes
- self.attr = rc.data()['attr']
- if 'speed' in rc.data():
- self.info['speed'] = rc.data()['speed'] // 1000
+ self.status = rc.data()
+
+ # replace the attributes in a thread safe manner
+ self.set_ts_attr(rc.data()['attr'])
return self.ok()
@@ -487,6 +492,98 @@ class Port(object):
return self.ok()
+
+ @owned
+ def set_rx_sniffer (self, pcap_filename, limit):
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id,
+ "type": "capture",
+ "enabled": True,
+ "pcap_filename": pcap_filename,
+ "limit": limit}
+
+ rc = self.transmit("set_rx_feature", params)
+ if rc.bad():
+ return self.err(rc.err())
+
+ return self.ok()
+
+ @owned
+ def set_arp_resolution (self, ipv4, mac):
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id,
+ "ipv4": ipv4,
+ "mac": mac}
+
+ rc = self.transmit("set_arp_resolution", params)
+ if rc.bad():
+ return self.err(rc.err())
+
+ return self.ok()
+
+
+ @owned
+ def remove_rx_sniffer (self):
+ params = {"handler": self.handler,
+ "port_id": self.port_id,
+ "type": "capture",
+ "enabled": False}
+
+ rc = self.transmit("set_rx_feature", params)
+ if rc.bad():
+ return self.err(rc.err())
+
+ return self.ok()
+
+
+ @owned
+ def set_rx_queue (self, size):
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id,
+ "type": "queue",
+ "enabled": True,
+ "size": size}
+
+ rc = self.transmit("set_rx_feature", params)
+ if rc.bad():
+ return self.err(rc.err())
+
+ return self.ok()
+
+ @owned
+ def remove_rx_queue (self):
+ params = {"handler": self.handler,
+ "port_id": self.port_id,
+ "type": "queue",
+ "enabled": False}
+
+ rc = self.transmit("set_rx_feature", params)
+ if rc.bad():
+ return self.err(rc.err())
+
+ return self.ok()
+
+ @owned
+ def get_rx_queue_pkts (self):
+ params = {"handler": self.handler,
+ "port_id": self.port_id}
+
+ rc = self.transmit("get_rx_queue_pkts", params)
+ if rc.bad():
+ return self.err(rc.err())
+
+ pkts = rc.data()['pkts']
+
+ # decode the packets
+ for i in range(len(pkts)):
+ pkts[i] = base64.b64decode(pkts[i])
+
+ return pkts
+
+
@owned
def pause (self):
@@ -568,21 +665,44 @@ class Port(object):
@owned
- def set_attr (self, attr_dict):
+ def set_attr (self, **kwargs):
+
+ json_attr = {}
+
+ if kwargs.get('promiscuous') is not None:
+ json_attr['promiscuous'] = {'enabled': kwargs.get('promiscuous')}
+
+ if kwargs.get('link_status') is not None:
+ json_attr['link_status'] = {'up': kwargs.get('link_status')}
+
+ if kwargs.get('led_status') is not None:
+ json_attr['led_status'] = {'on': kwargs.get('led_status')}
+
+ if kwargs.get('flow_ctrl_mode') is not None:
+ json_attr['flow_ctrl_mode'] = {'on': kwargs.get('flow_ctrl_mode')}
+
+ if kwargs.get('rx_filter_mode') is not None:
+ json_attr['rx_filter_mode'] = {'mode': kwargs.get('rx_filter_mode')}
+
+ if kwargs.get('ipv4') is not None:
+ json_attr['ipv4'] = {'addr': kwargs.get('ipv4')}
+
+ if kwargs.get('dest') is not None:
+ json_attr['dest'] = {'addr': kwargs.get('dest')}
+
params = {"handler": self.handler,
"port_id": self.port_id,
- "attr": attr_dict}
+ "attr": json_attr}
rc = self.transmit("set_port_attr", params)
if rc.bad():
return self.err(rc.err())
+ # update the dictionary from the server explicitly
+ return self.sync()
- #self.attr.update(attr_dict)
-
- return self.ok()
-
+
@writeable
def push_remote (self, pcap_filename, ipg_usec, speedup, count, duration, is_dual, slave_handler):
@@ -607,7 +727,35 @@ class Port(object):
def get_profile (self):
return self.profile
+ # invalidates the current ARP
+ def invalidate_arp (self):
+ dest = self.__attr['dest']
+
+ if dest['type'] != 'mac':
+ return self.set_attr(dest = dest['addr'])
+ else:
+ return self.ok()
+
+
+ @writeable
+ def add_arp_request (self):
+ ipv4 = self.__attr['src_ipv4']
+ dest = self.__attr['dest']
+ mac = self.__attr['src_mac']
+
+ if ipv4 == 'none':
+ return self.err('port must have a configured IPv4')
+
+ if dest['type'] == 'mac':
+ return self.err('port must have an IPv4 destination')
+
+
+ base_pkt = Ether(dst="ff:ff:ff:ff:ff:ff")/ARP(psrc = ipv4, pdst = dest['addr'], hwsrc = mac)
+ s1 = STLStream( packet = STLPktBuilder(pkt = base_pkt), mode = STLTXSingleBurst(total_pkts = 1) )
+ return self.add_streams([s1])
+
+
def print_profile (self, mult, duration):
if not self.get_profile():
return
@@ -648,24 +796,32 @@ class Port(object):
format_time(exp_time_factor_sec)))
print("\n")
- # generate port info
- def get_info (self):
+ # generate formatted (console friendly) port info
+ def get_formatted_info (self, sync = True):
+
+ # sync the status
+ if sync:
+ self.sync()
+
+ # get a copy of the current attribute set (safe against manipulation)
+ attr = self.get_ts_attr()
+
info = dict(self.info)
info['status'] = self.get_port_state_name()
- if 'link' in self.attr:
- info['link'] = 'UP' if self.attr['link']['up'] else 'DOWN'
+ if 'link' in attr:
+ info['link'] = 'UP' if attr['link']['up'] else 'DOWN'
else:
info['link'] = 'N/A'
- if 'fc' in self.attr:
- info['fc'] = FLOW_CTRL_DICT_REVERSED.get(self.attr['fc']['mode'], 'N/A')
+ if 'fc' in attr:
+ info['fc'] = FLOW_CTRL_DICT_REVERSED.get(attr['fc']['mode'], 'N/A')
else:
info['fc'] = 'N/A'
- if 'promiscuous' in self.attr:
- info['prom'] = "on" if self.attr['promiscuous']['enabled'] else "off"
+ if 'promiscuous' in attr:
+ info['prom'] = "on" if attr['promiscuous']['enabled'] else "off"
else:
info['prom'] = "N/A"
@@ -692,34 +848,129 @@ class Port(object):
else:
info['is_virtual'] = 'N/A'
+ if 'speed' in attr:
+ info['speed'] = self.get_formatted_speed()
+ else:
+ info['speed'] = 'N/A'
+
+
+ if 'rx_filter_mode' in attr:
+ info['rx_filter_mode'] = 'hardware match' if attr['rx_filter_mode'] == 'hw' else 'fetch all'
+ else:
+ info['rx_filter_mode'] = 'N/A'
+
+ # src MAC and IPv4
+ info['src_mac'] = attr.get('src_mac', 'N/A')
+
+ info['src_ipv4'] = attr.get('src_ipv4', 'N/A')
+ if info['src_ipv4'] == 'none':
+ info['src_ipv4'] = 'Not Configured'
+
+ # dest
+ dest = attr.get('dest', {})
+ info['dest'] = dest.get('addr', 'N/A')
+
+ if dest['type'] == 'mac':
+ info['arp'] = '-'
+ else:
+ info['arp'] = dest.get('arp', 'N/A')
+
+
+ if info['dest'] == 'none':
+ info['dest'] = 'Not Configured'
+
+
+ if info['arp'] == 'none':
+ info['arp'] = 'unresolved'
+
+
+
+ # RX info
+ rx_info = self.status['rx_info']
+
+ # RX sniffer
+ if 'sniffer' in rx_info:
+ sniffer = rx_info['sniffer']
+ info['rx_sniffer'] = '{0}\n[{1} / {2}]'.format(sniffer['pcap_filename'], sniffer['count'], sniffer['limit']) if sniffer['is_active'] else 'off'
+ else:
+ info['rx_sniffer'] = 'N/A'
+
+
+ # RX queue
+ if 'queue' in rx_info:
+ queue = rx_info['queue']
+ info['rx_queue'] = '[{0} / {1}]'.format(queue['count'], queue['size']) if queue['is_active'] else 'off'
+ else:
+ info['rx_queue'] = 'off'
+
+
return info
def get_port_state_name(self):
return self.STATES_MAP.get(self.state, "Unknown")
+ def get_src_ipv4 (self):
+ src_ipv4 = self.__attr['src_ipv4']
+ if src_ipv4 == 'none':
+ src_ipv4 = None
+
+ return src_ipv4
+
+ def get_dest (self):
+ return self.__attr['dest']
+
+ def get_src_mac (self):
+ return self.__attr['src_mac']
+
+
+ def is_resolved (self):
+ dest = self.get_dest()
+
+ if dest['type'] == 'mac':
+ return True
+ elif dest['type'] == 'ipv4':
+ return dest['arp'] != 'none'
+ else:
+ # unsupported type
+ assert(0)
+
+
+ def resolve (self, retries):
+ return ARPResolver(self).resolve(retries)
+
+
+
################# stats handler ######################
def generate_port_stats(self):
return self.port_stats.generate_stats()
def generate_port_status(self):
- info = self.get_info()
+ info = self.get_formatted_info()
- return {"driver": info['driver'],
- "description": info.get('description', 'N/A')[:18],
- "HW src mac": info['hw_macaddr'],
- "SW src mac": info['src_macaddr'],
- "SW dst mac": info['dst_macaddr'],
- "PCI Address": info['pci_addr'],
- "NUMA Node": info['numa'],
+ return {"driver": info['driver'],
+ "description": info.get('description', 'N/A')[:18],
+ "src MAC": info['src_mac'],
+ "src IPv4": info['src_ipv4'],
+ "Destination": info['dest'],
+ "ARP Resolution": info['arp'],
+ "PCI Address": info['pci_addr'],
+ "NUMA Node": info['numa'],
"--": "",
"---": "",
- "link speed": "{speed} Gb/s".format(speed=info['speed']),
+ "----": "",
+ "-----": "",
+ "link speed": info['speed'],
"port status": info['status'],
"link status": info['link'],
"promiscuous" : info['prom'],
"flow ctrl" : info['fc'],
+
+ "RX Filter Mode": info['rx_filter_mode'],
+ "RX Queueing": info['rx_queue'],
+ "RX sniffer": info['rx_sniffer'],
+
}
def clear_stats(self):
@@ -756,7 +1007,19 @@ class Port(object):
return {"streams" : OrderedDict(sorted(data.items())) }
-
+ ######## attributes are a complex type (dict) that might be manipulated through the async thread #############
+
+ # get in a thread safe manner a duplication of attributes
+ def get_ts_attr (self):
+ with self.attr_lock:
+ return dict(self.__attr)
+
+ # set in a thread safe manner a new dict of attributes
+ def set_ts_attr (self, new_attr):
+ with self.attr_lock:
+ self.__attr = new_attr
+
+
################# events handler ######################
def async_event_port_job_done (self):
# until thread is locked - order is important
@@ -764,9 +1027,33 @@ class Port(object):
self.state = self.STATE_STREAMS
self.last_factor_type = None
- def async_event_port_attr_changed (self, attr):
- self.info['speed'] = attr['speed'] // 1000
- self.attr = attr
+ def async_event_port_attr_changed (self, new_attr):
+
+ # get a thread safe duplicate
+ cur_attr = self.get_ts_attr()
+
+ # check if anything changed
+ if new_attr == cur_attr:
+ return None
+
+ # generate before
+ before = self.get_formatted_info(sync = False)
+
+ # update
+ self.set_ts_attr(new_attr)
+
+ # generate after
+ after = self.get_formatted_info(sync = False)
+
+ # return diff
+ diff = {}
+ for key, new_value in after.items():
+ old_value = before.get(key, 'N/A')
+ if new_value != old_value:
+ diff[key] = (old_value, new_value)
+
+ return diff
+
# rest of the events are used for TUI / read only sessions
def async_event_port_stopped (self):
@@ -792,3 +1079,133 @@ class Port(object):
def async_event_released (self):
self.owner = ''
+
+# a class to handle port ARP resolution
+class ARPResolver(object):
+ def __init__ (self, port):
+ self.port = port
+
+ # some sanity checks before resolving
+ def sanity (self):
+ if self.port.get_dest()['type'] == 'mac':
+ return self.port.err('resolve - port does not have an IPv4 as destination')
+
+ if self.port.get_src_ipv4() is None:
+ return self.port.err('resolve - port does not have an IPv4 address configured')
+
+ return self.port.ok()
+
+
+ # safe call - make sure RX filter mode is restored
+ def resolve (self, retries):
+ try:
+ rc = self.port.set_attr(rx_filter_mode = 'all')
+ if not rc:
+ return rc
+ rc = self.port.set_rx_queue(size = 100)
+ if not rc:
+ return rc
+
+ return self.resolve_wrapper(retries)
+ finally:
+ # best effort restore
+ self.port.set_attr(rx_filter_mode = 'hw')
+ self.port.remove_rx_queue()
+
+
+ # main resolve function
+ def resolve_wrapper (self, retries):
+ rc = self.sanity()
+ if not rc:
+ return rc
+
+ # invalidate the current ARP resolution (if exists)
+ rc = self.port.invalidate_arp()
+ if not rc:
+ return rc
+
+
+ rc = self.port.remove_all_streams()
+ if not rc:
+ return rc
+
+
+ rc = self.port.add_arp_request()
+ if not rc:
+ return rc
+
+
+ # retry for 'retries'
+ index = 0
+ while True:
+ response = self.resolve_iteration()
+ if response:
+ break
+
+ if index >= retries:
+ return self.port.err('failed to receive ARP response ({0} retries)'.format(retries))
+
+ index += 1
+ time.sleep(0.1)
+
+
+ # set ARP resolution result
+ rc = self.port.set_arp_resolution(response['ipv4'], response['mac'])
+ if not rc:
+ return rc
+
+ return self.port.ok()
+
+
+ def resolve_iteration (self):
+
+ mult = {'op': 'abs', 'type' : 'percentage', 'value': 100}
+ rc = self.port.start(mul = mult, force = False, duration = -1, mask = 0xffffffff)
+ if not rc:
+ return rc
+
+ # block until traffic finishes
+ while self.port.is_active():
+ time.sleep(0.01)
+
+ return self.wait_for_rx_response()
+
+
+ def wait_for_rx_response (self):
+
+ # we try to fetch response for 5 times
+ polling = 5
+
+ while polling > 0:
+ rx_pkts = self.port.get_rx_queue_pkts()
+ response = self.find_arp_response(rx_pkts)
+
+ if response:
+ return response
+
+ if polling == 0:
+ return None
+
+ polling -= 1
+ time.sleep(0.1)
+
+
+ # search in 'pkts' for an ARP response that matches the dest
+ def find_arp_response (self, pkts):
+
+ for pkt in pkts:
+ scapy_pkt = Ether(pkt)
+ if not 'ARP' in scapy_pkt:
+ continue
+
+ arp = scapy_pkt['ARP']
+ dest = self.port.get_dest()
+
+ # check this is the right ARP (ARP reply with the address)
+ if (arp.op != 2) or (arp.psrc != dest['addr']):
+ continue
+
+ return {'ipv4': arp.psrc, 'mac': arp.hwsrc}
+
+ return None
+
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_stats.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_stats.py
index 9f601484..2efb5a84 100644
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_stats.py
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_stats.py
@@ -670,12 +670,18 @@ class CTRexInfoGenerator(object):
("promiscuous", []),
("flow ctrl", []),
("--", []),
- ("HW src mac", []),
- ("SW src mac", []),
- ("SW dst mac", []),
+ ("src IPv4", []),
+ ("src MAC", []),
("---", []),
+ ("Destination", []),
+ ("ARP Resolution", []),
+ ("----", []),
("PCI Address", []),
("NUMA Node", []),
+ ("-----", []),
+ ("RX Filter Mode", []),
+ ("RX Queueing", []),
+ ("RX sniffer", []),
]
)
@@ -1103,13 +1109,7 @@ class CPortStats(CTRexStats):
port_state = format_text(port_state, 'bold')
if self._port_obj:
- if 'link' in self._port_obj.attr:
- if self._port_obj.attr.get('link', {}).get('up') == False:
- link_state = format_text('DOWN', 'red', 'bold')
- else:
- link_state = 'UP'
- else:
- link_state = 'N/A'
+ link_state = 'UP' if self._port_obj.is_up() else format_text('DOWN', 'red', 'bold')
else:
link_state = ''
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_types.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_types.py
index aa6c4218..81015ddc 100644
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_types.py
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_types.py
@@ -135,6 +135,12 @@ def validate_type(arg_name, arg, valid_types):
else:
raise STLError('validate_type: valid_types should be type or list or tuple of types')
+
+def validate_choice (arg_name, arg, choices):
+ if arg is not None and not arg in choices:
+ raise STLError("validate_choice: argument '{0}' can only be one of '{1}'".format(arg_name, choices))
+
+
# throws STLError if not exactly one argument is present
def verify_exclusive_arg (args_list):
if not (len(list(filter(lambda x: x is not None, args_list))) == 1):
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/common.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/common.py
index 72ee8972..02e13fd7 100644
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/common.py
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/common.py
@@ -3,6 +3,8 @@ import sys
import string
import random
import time
+import socket
+import re
try:
import pwd
@@ -86,3 +88,13 @@ class PassiveTimer(object):
return (time.time() > self.expr_sec)
+def is_valid_ipv4 (addr):
+ try:
+ socket.inet_pton(socket.AF_INET, addr)
+ return True
+ except (socket.error, TypeError):
+ return False
+
+def is_valid_mac (mac):
+ return bool(re.match("[0-9a-f]{2}([-:])[0-9a-f]{2}(\\1[0-9a-f]{2}){4}$", mac.lower()))
+
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py
index 34cafd79..e7f04546 100755
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py
@@ -1,6 +1,6 @@
import argparse
from collections import namedtuple, OrderedDict
-from .common import list_intersect, list_difference
+from .common import list_intersect, list_difference, is_valid_ipv4, is_valid_mac
from .text_opts import format_text
from ..trex_stl_types import *
from .constants import ON_OFF_DICT, UP_DOWN_DICT, FLOW_CTRL_DICT
@@ -45,6 +45,17 @@ FLOW_CTRL = 28
SUPPORTED = 29
FILE_PATH_NO_CHECK = 30
+OUTPUT_FILENAME = 31
+ALL_FILES = 32
+LIMIT = 33
+PORT_RESTART = 34
+
+IPV4 = 35
+DEST = 36
+RETRIES = 37
+
+RX_FILTER_MODE = 38
+
GLOBAL_STATS = 50
PORT_STATS = 51
PORT_STATUS = 52
@@ -218,8 +229,20 @@ def is_valid_file(filename):
return filename
+def check_ipv4_addr (ipv4_str):
+ if not is_valid_ipv4(ipv4_str):
+ raise argparse.ArgumentTypeError("invalid IPv4 address: '{0}'".format(ipv4_str))
+ return ipv4_str
+
+def check_dest_addr (addr):
+ if not (is_valid_ipv4(addr) or is_valid_mac(addr)):
+ raise argparse.ArgumentTypeError("not a valid IPv4 or MAC address: '{0}'".format(addr))
+
+ return addr
+
+
def decode_tunables (tunable_str):
tunables = {}
@@ -304,6 +327,62 @@ OPTIONS_DB = {MULTIPLIER: ArgumentPack(['-m', '--multiplier'],
'dest': 'flow_ctrl',
'choices': FLOW_CTRL_DICT}),
+ RX_FILTER_MODE: ArgumentPack(['--rxf'],
+ {'help': 'Set RX filtering mode',
+ 'dest': 'rx_filter_mode',
+ 'choices': ['hw', 'all']}),
+
+
+ IPV4: ArgumentPack(['--ipv4'],
+ {'help': 'IPv4 address(s) for the port(s)',
+ 'dest': 'ipv4',
+ 'nargs': '+',
+ 'default': None,
+ 'type': check_ipv4_addr}),
+
+ DEST: ArgumentPack(['--dest'],
+ {'help': 'Destination address(s) for the port(s) in either IPv4 or MAC format',
+ 'dest': 'dest',
+ 'nargs': '+',
+ 'default': None,
+ 'type': check_dest_addr}),
+
+ RETRIES: ArgumentPack(['-r', '--retries'],
+ {'help': 'retries count [default is zero]',
+ 'dest': 'retries',
+ 'default': 0,
+ 'type': int}),
+
+
+ OUTPUT_FILENAME: ArgumentPack(['-o', '--output'],
+ {'help': 'Output PCAP filename',
+ 'dest': 'output_filename',
+ 'default': None,
+ 'required': True,
+ 'type': str}),
+
+
+ PORT_RESTART: ArgumentPack(['-r', '--restart'],
+ {'help': 'hard restart port(s)',
+ 'dest': 'restart',
+ 'default': False,
+ 'action': 'store_true'}),
+
+
+ ALL_FILES: ArgumentPack(['--all'],
+ {'help': 'change RX port filter to fetch all packets',
+ 'dest': 'all',
+ 'default': False,
+ 'action': "store_true"}),
+
+
+ LIMIT: ArgumentPack(['-l', '--limit'],
+ {'help': 'Limit the packet count to be written to the file',
+ 'dest': 'limit',
+ 'default': 1000,
+ 'type': int}),
+
+
SUPPORTED: ArgumentPack(['--supp'],
{'help': 'Show which attributes are supported by current NICs',
'default': None,
@@ -461,6 +540,7 @@ OPTIONS_DB = {MULTIPLIER: ArgumentPack(['-m', '--multiplier'],
ALL_PORTS],
{'required': False}),
+
STREAM_FROM_PATH_OR_FILE: ArgumentGroup(MUTEX, [FILE_PATH,
FILE_FROM_DB],
{'required': True}),
diff --git a/scripts/trex_show_threads.py b/scripts/trex_show_threads.py
index fabe6d68..1824d073 100755
--- a/scripts/trex_show_threads.py
+++ b/scripts/trex_show_threads.py
@@ -58,8 +58,8 @@ def isnum (x):
def find_trex_pid ():
procs = [x for x in os.listdir('/proc/') if isnum(x)]
for proc in procs:
- cmd = open('/proc/{0}/{1}'.format(proc, 'cmdline')).readline()
- if '_t-rex' in cmd:
+ cmd = open('/proc/{0}/{1}'.format(proc, 'comm')).readline()
+ if cmd.startswith('_t-rex-64'):
return proc
return None
diff --git a/src/common/basic_utils.cpp b/src/common/basic_utils.cpp
index f169c29f..dfd3b183 100755
--- a/src/common/basic_utils.cpp
+++ b/src/common/basic_utils.cpp
@@ -20,6 +20,10 @@ limitations under the License.
#include <sstream>
#include <sys/resource.h>
+#include "pal_utl.h"
+
+int my_inet_pton4(const char *src, unsigned char *dst);
+
bool utl_is_file_exists (const std::string& name) {
if (FILE *file = fopen(name.c_str(), "r")) {
fclose(file);
@@ -190,6 +194,26 @@ void utl_macaddr_to_str(const uint8_t *macaddr, std::string &output) {
}
+std::string utl_macaddr_to_str(const uint8_t *macaddr) {
+ std::string tmp;
+ utl_macaddr_to_str(macaddr, tmp);
+
+ return tmp;
+}
+
+bool utl_str_to_macaddr(const std::string &s, uint8_t *mac) {
+ int last = -1;
+ int rc = sscanf(s.c_str(), "%hhx:%hhx:%hhx:%hhx:%hhx:%hhx%n",
+ mac + 0, mac + 1, mac + 2, mac + 3, mac + 4, mac + 5,
+ &last);
+
+ if ( (rc != 6) || (s.size() != last) ) {
+ return false;
+ }
+
+ return true;
+}
+
/**
* generate a random connection handler
*
@@ -248,3 +272,23 @@ void utl_set_coredump_size(long size, bool map_huge_pages) {
fprintf(fp, "%08x\n", mask);
fclose(fp);
}
+
+uint32_t utl_ipv4_to_uint32(const char *ipv4_str, uint32_t &ipv4_num) {
+
+ uint32_t tmp;
+
+ int rc = my_inet_pton4(ipv4_str, (unsigned char *)&tmp);
+ if (!rc) {
+ return (0);
+ }
+
+ ipv4_num = PAL_NTOHL(tmp);
+
+ return (1);
+}
+
+std::string utl_uint32_to_ipv4(uint32_t ipv4_addr) {
+ std::stringstream ss;
+ ss << ((ipv4_addr >> 24) & 0xff) << "." << ((ipv4_addr >> 16) & 0xff) << "." << ((ipv4_addr >> 8) & 0xff) << "." << (ipv4_addr & 0xff);
+ return ss.str();
+}
diff --git a/src/common/basic_utils.h b/src/common/basic_utils.h
index f6250a2b..ab0ff1ec 100755
--- a/src/common/basic_utils.h
+++ b/src/common/basic_utils.h
@@ -86,6 +86,9 @@ bool utl_is_file_exists (const std::string& name) ;
void utl_macaddr_to_str(const uint8_t *macaddr, std::string &output);
+std::string utl_macaddr_to_str(const uint8_t *macaddr);
+bool utl_str_to_macaddr(const std::string &s, uint8_t *mac);
+
std::string utl_generate_random_str(unsigned int &seed, int len);
/**
@@ -98,6 +101,9 @@ std::string utl_generate_random_str(unsigned int &seed, int len);
*/
void utl_set_coredump_size(long size, bool map_huge_pages = false);
+uint32_t utl_ipv4_to_uint32(const char *ipv4_str, uint32_t &ipv4_num);
+std::string utl_uint32_to_ipv4(uint32_t ipv4_addr);
+
#endif
diff --git a/src/common/pcap.h b/src/common/pcap.h
index 3f8dfd21..c9139e4c 100755
--- a/src/common/pcap.h
+++ b/src/common/pcap.h
@@ -1,5 +1,5 @@
-#ifndef __LIBPCAP_H__
-#define __LIBPCAP_H__
+#ifndef __TREX_LIBPCAP_H__
+#define __TREX_LIBPCAP_H__
/*
Copyright (c) 2015-2015 Cisco Systems, Inc.
@@ -151,4 +151,5 @@ private:
bool m_is_open;
uint32_t m_pkt_count;
};
+
#endif
diff --git a/src/flow_stat.cpp b/src/flow_stat.cpp
index 84be590f..dae29795 100644
--- a/src/flow_stat.cpp
+++ b/src/flow_stat.cpp
@@ -968,9 +968,9 @@ void CFlowStatRuleMgr::send_start_stop_msg_to_rx(bool is_start) {
TrexStatelessCpToRxMsgBase *msg;
if (is_start) {
- msg = new TrexStatelessRxStartMsg();
+ msg = new TrexStatelessRxEnableLatency();
} else {
- msg = new TrexStatelessRxStopMsg();
+ msg = new TrexStatelessRxDisableLatency();
}
m_ring_to_rx->Enqueue((CGenNode *)msg);
}
diff --git a/src/internal_api/trex_platform_api.h b/src/internal_api/trex_platform_api.h
index 631f9a3e..5723503c 100644
--- a/src/internal_api/trex_platform_api.h
+++ b/src/internal_api/trex_platform_api.h
@@ -28,6 +28,7 @@ limitations under the License.
#include <string.h>
#include "flow_stat_parser.h"
#include "trex_defs.h"
+#include "trex_stateless_rx_defs.h"
#include "trex_port_attr.h"
#include <json/json.h>
@@ -112,19 +113,13 @@ public:
IF_STAT_RX_BYTES_COUNT = 8, // Card support counting rx bytes
};
- struct mac_cfg_st {
- uint8_t hw_macaddr[6];
- uint8_t src_macaddr[6];
- uint8_t dst_macaddr[6];
- };
-
/**
* interface static info
*
*/
struct intf_info_st {
std::string driver_name;
- mac_cfg_st mac_info;
+ uint8_t hw_macaddr[6];
std::string pci_addr;
int numa_node;
bool has_crc;
@@ -234,7 +229,7 @@ public:
info.has_crc = true;
info.numa_node = 0;
- memset(&info.mac_info, 0, sizeof(info.mac_info));
+ memset(&info.hw_macaddr, 0, sizeof(info.hw_macaddr));
}
virtual void get_interface_stats(uint8_t interface_id, TrexPlatformInterfaceStats &stats) const {
diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp
index ab154b67..e799a5bd 100644
--- a/src/main_dpdk.cpp
+++ b/src/main_dpdk.cpp
@@ -316,7 +316,7 @@ public:
| TrexPlatformApi::IF_STAT_PAYLOAD;
}
virtual CFlowStatParser *get_flow_stat_parser();
- virtual int set_rcv_all(CPhyEthIF * _if, bool set_on) {return 0;}
+ virtual int set_rcv_all(CPhyEthIF * _if, bool set_on) {return -ENOTSUP;}
};
class CTRexExtendedDriverBase40G : public CTRexExtendedDriverBase10G {
@@ -1624,6 +1624,7 @@ int DpdkTRexPortAttr::set_led(bool on){
int DpdkTRexPortAttr::get_flow_ctrl(int &mode) {
int ret = rte_eth_dev_flow_ctrl_get(m_port_id, &fc_conf_tmp);
if (ret) {
+ mode = -1;
return ret;
}
mode = (int) fc_conf_tmp.mode;
@@ -1720,12 +1721,20 @@ bool DpdkTRexPortAttr::update_link_status_nowait(){
rte_eth_link new_link;
bool changed = false;
rte_eth_link_get_nowait(m_port_id, &new_link);
+
+ /* if the link got down - update the dest atribute to move to unresolved */
+ if (new_link.link_status != m_link.link_status) {
+ get_dest().on_link_down();
+ changed = true;
+ }
+
+ /* other changes */
if (new_link.link_speed != m_link.link_speed ||
new_link.link_duplex != m_link.link_duplex ||
- new_link.link_autoneg != m_link.link_autoneg ||
- new_link.link_status != m_link.link_status) {
+ new_link.link_autoneg != m_link.link_autoneg) {
changed = true;
}
+
m_link = new_link;
return changed;
}
@@ -1767,7 +1776,7 @@ bool DpdkTRexPortAttr::get_promiscuous(){
}
-void DpdkTRexPortAttr::macaddr_get(struct ether_addr *mac_addr){
+void DpdkTRexPortAttr::get_hw_src_mac(struct ether_addr *mac_addr){
rte_eth_macaddr_get(m_port_id , mac_addr);
}
@@ -3217,6 +3226,7 @@ void CGlobalTRex::pre_test() {
exit(1);
}
memcpy(CGlobalInfo::m_options.m_mac_addr[port_id].u.m_mac.dest, mac, ETHER_ADDR_LEN);
+
// if port is connected in loopback, no need to send gratuitous ARP. It will only confuse our ingress counters.
if (pretest.is_loopback(port_id))
CGlobalInfo::m_options.m_ip_cfg[port_id].set_grat_arp_needed(false);
@@ -3229,6 +3239,18 @@ void CGlobalTRex::pre_test() {
// Configure port back to normal mode. Only relevant packets handled by software.
CTRexExtendedDriverDb::Ins()->get_drv()->set_rcv_all(pif, false);
+
+
+ /* set resolved IPv4 */
+ uint32_t dg = CGlobalInfo::m_options.m_ip_cfg[port_id].get_def_gw();
+ const uint8_t *dst_mac = CGlobalInfo::m_options.m_mac_addr[port_id].u.m_mac.dest;
+ if (dg) {
+ m_ports[port_id].get_port_attr()->get_dest().set_dest_ipv4(dg, dst_mac);
+ } else {
+ m_ports[port_id].get_port_attr()->get_dest().set_dest_mac(dst_mac);
+ }
+
+
}
}
@@ -4293,18 +4315,9 @@ CGlobalTRex:: publish_async_port_attr_changed(uint8_t port_id) {
Json::Value data;
data["port_id"] = port_id;
TRexPortAttr * _attr = m_ports[port_id].get_port_attr();
-
- /* attributes */
- data["attr"]["speed"] = _attr->get_link_speed();
- data["attr"]["promiscuous"]["enabled"] = _attr->get_promiscuous();
- data["attr"]["link"]["up"] = _attr->is_link_up();
- int mode;
- int ret = _attr->get_flow_ctrl(mode);
- if (ret != 0) {
- mode = -1;
- }
- data["attr"]["fc"]["mode"] = mode;
-
+
+ _attr->to_json(data["attr"]);
+
m_zmq_publisher.publish_event(TrexPublisher::EVENT_PORT_ATTR_CHANGED, data);
}
@@ -4808,6 +4821,18 @@ bool CPhyEthIF::Create(uint8_t portid) {
m_last_tx_pps = 0.0;
m_port_attr = g_trex.m_drv->create_port_attr(portid);
+
+ uint32_t src_ipv4 = CGlobalInfo::m_options.m_ip_cfg[m_port_id].get_ip();
+ if (src_ipv4) {
+ m_port_attr->set_src_ipv4(src_ipv4);
+ }
+
+ /* for now set as unresolved IPv4 destination */
+ uint32_t dest_ipv4 = CGlobalInfo::m_options.m_ip_cfg[m_port_id].get_def_gw();
+ if (dest_ipv4) {
+ m_port_attr->get_dest().set_dest_ipv4(dest_ipv4);
+ }
+
return true;
}
@@ -5580,7 +5605,7 @@ CFlowStatParser *CTRexExtendedDriverBase::get_flow_stat_parser() {
// in 1G we need to wait if links became ready to soon
void CTRexExtendedDriverBase1G::wait_after_link_up(){
- wait_x_sec(6 + CGlobalInfo::m_options.m_wait_before_traffic);
+ //wait_x_sec(6 + CGlobalInfo::m_options.m_wait_before_traffic);
}
int CTRexExtendedDriverBase1G::wait_for_stable_link(){
@@ -6993,19 +7018,10 @@ TrexDpdkPlatformApi::get_interface_info(uint8_t interface_id, intf_info_st &info
/* mac INFO */
/* hardware */
- g_trex.m_ports[interface_id].get_port_attr()->macaddr_get(&rte_mac_addr);
+ g_trex.m_ports[interface_id].get_port_attr()->get_hw_src_mac(&rte_mac_addr);
assert(ETHER_ADDR_LEN == 6);
- /* software */
- uint8_t sw_macaddr[12];
- memcpy(sw_macaddr, CGlobalInfo::m_options.get_dst_src_mac_addr(interface_id), 12);
-
- for (int i = 0; i < 6; i++) {
- info.mac_info.hw_macaddr[i] = rte_mac_addr.addr_bytes[i];
- info.mac_info.dst_macaddr[i] = sw_macaddr[i];
- info.mac_info.src_macaddr[i] = sw_macaddr[6 + i];
-
- }
+ memcpy(info.hw_macaddr, rte_mac_addr.addr_bytes, 6);
info.numa_node = g_trex.m_ports[interface_id].m_dev_info.pci_dev->numa_node;
struct rte_pci_addr *loc = &g_trex.m_ports[interface_id].m_dev_info.pci_dev->addr;
@@ -7111,6 +7127,21 @@ TRexPortAttr *TrexDpdkPlatformApi::getPortAttrObj(uint8_t port_id) const {
return g_trex.m_ports[port_id].get_port_attr();
}
+
+int DpdkTRexPortAttr::set_rx_filter_mode(rx_filter_mode_e rx_filter_mode) {
+
+ CPhyEthIF *_if = &g_trex.m_ports[m_port_id];
+ bool recv_all = (rx_filter_mode == RX_FILTER_MODE_ALL);
+ int rc = CTRexExtendedDriverDb::Ins()->get_drv()->set_rcv_all(_if, recv_all);
+ if (rc != 0) {
+ return (rc);
+ }
+
+ m_rx_filter_mode = rx_filter_mode;
+
+ return (0);
+}
+
/**
* marks the control plane for a total server shutdown
*
@@ -7119,3 +7150,6 @@ TRexPortAttr *TrexDpdkPlatformApi::getPortAttrObj(uint8_t port_id) const {
void TrexDpdkPlatformApi::mark_for_shutdown() const {
g_trex.mark_for_shutdown(CGlobalTRex::SHUTDOWN_RPC_REQ);
}
+
+
+
diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp
index 109cc1a4..14b38165 100644
--- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp
+++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp
@@ -27,6 +27,8 @@ limitations under the License.
#include <internal_api/trex_platform_api.h>
+#include "trex_stateless_rx_core.h"
+
#include <fstream>
#include <iostream>
#include <unistd.h>
@@ -289,19 +291,15 @@ TrexRpcCmdGetSysInfo::_run(const Json::Value &params, Json::Value &result) {
section["ports"] = Json::arrayValue;
for (int i = 0; i < main->get_port_count(); i++) {
- uint32_t speed;
string driver;
- string hw_macaddr;
- string src_macaddr;
- string dst_macaddr;
string pci_addr;
string description;
supp_speeds_t supp_speeds;
int numa;
TrexStatelessPort *port = main->get_port_by_id(i);
- port->get_properties(driver, speed);
- port->get_macaddr(hw_macaddr, src_macaddr, dst_macaddr);
+
+ port->get_properties(driver);
port->get_pci_info(pci_addr, numa);
main->get_platform_api()->getPortAttrObj(i)->get_description(description);
@@ -311,12 +309,9 @@ TrexRpcCmdGetSysInfo::_run(const Json::Value &params, Json::Value &result) {
section["ports"][i]["driver"] = driver;
section["ports"][i]["description"] = description;
- section["ports"][i]["hw_macaddr"] = hw_macaddr;
- section["ports"][i]["src_macaddr"] = src_macaddr;
- section["ports"][i]["dst_macaddr"] = dst_macaddr;
- section["ports"][i]["pci_addr"] = pci_addr;
- section["ports"][i]["numa"] = numa;
+ section["ports"][i]["pci_addr"] = pci_addr;
+ section["ports"][i]["numa"] = numa;
uint16_t caps = port->get_rx_caps();
section["ports"][i]["rx"]["caps"] = Json::arrayValue;
@@ -330,7 +325,6 @@ TrexRpcCmdGetSysInfo::_run(const Json::Value &params, Json::Value &result) {
section["ports"][i]["rx"]["caps"].append("rx_bytes");
}
section["ports"][i]["rx"]["counters"] = port->get_rx_count_num();
- section["ports"][i]["speed"] = (uint16_t) speed / 1000;
section["ports"][i]["is_fc_supported"] = get_stateless_obj()->get_platform_api()->getPortAttrObj(i)->is_fc_change_supported();
section["ports"][i]["is_led_supported"] = get_stateless_obj()->get_platform_api()->getPortAttrObj(i)->is_led_change_supported();
section["ports"][i]["is_link_supported"] = get_stateless_obj()->get_platform_api()->getPortAttrObj(i)->is_link_change_supported();
@@ -345,6 +339,68 @@ TrexRpcCmdGetSysInfo::_run(const Json::Value &params, Json::Value &result) {
return (TREX_RPC_CMD_OK);
}
+
+int
+TrexRpcCmdSetPortAttr::parse_rx_filter_mode(const Json::Value &msg, uint8_t port_id, Json::Value &result) {
+ const std::string type = parse_choice(msg, "mode", {"hw", "all"}, result);
+
+ rx_filter_mode_e filter_mode;
+ if (type == "hw") {
+ filter_mode = RX_FILTER_MODE_HW;
+ } else if (type == "all") {
+ filter_mode = RX_FILTER_MODE_ALL;
+ } else {
+ assert(0);
+ }
+
+ return get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id)->set_rx_filter_mode(filter_mode);
+}
+
+int
+TrexRpcCmdSetPortAttr::parse_ipv4(const Json::Value &msg, uint8_t port_id, Json::Value &result) {
+
+ const std::string ipv4_str = parse_string(msg, "addr", result);
+
+ uint32_t ipv4_addr;
+ if (!utl_ipv4_to_uint32(ipv4_str.c_str(), ipv4_addr)) {
+ std::stringstream ss;
+ ss << "invalid IPv4 address: '" << ipv4_str << "'";
+ generate_parse_err(result, ss.str());
+ }
+
+ get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id)->set_src_ipv4(ipv4_addr);
+ return (0);
+}
+
+int
+TrexRpcCmdSetPortAttr::parse_dest(const Json::Value &msg, uint8_t port_id, Json::Value &result) {
+
+ /* can be either IPv4 or MAC */
+ const std::string addr = parse_string(msg, "addr", result);
+
+ TRexPortAttr *port_attr = get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id);
+
+ /* try IPv4 */
+ uint32_t ipv4_addr;
+ uint8_t mac[6];
+
+ if (utl_ipv4_to_uint32(addr.c_str(), ipv4_addr)) {
+ port_attr->get_dest().set_dest_ipv4(ipv4_addr);
+
+ } else if (utl_str_to_macaddr(addr, mac)) {
+ port_attr->get_dest().set_dest_mac(mac);
+
+ } else {
+ std::stringstream ss;
+ ss << "'dest' is not an IPv4 address or a MAC address: '" << addr << "'";
+ generate_parse_err(result, ss.str());
+ }
+
+
+ return (0);
+}
+
+
/**
* set port commands
*
@@ -361,46 +417,64 @@ TrexRpcCmdSetPortAttr::_run(const Json::Value &params, Json::Value &result) {
uint8_t port_id = parse_port(params, result);
const Json::Value &attr = parse_object(params, "attr", result);
+
int ret = 0;
- bool changed = false;
+
/* iterate over all attributes in the dict */
for (const std::string &name : attr.getMemberNames()) {
+
if (name == "promiscuous") {
bool enabled = parse_bool(attr[name], "enabled", result);
ret = get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id)->set_promiscuous(enabled);
}
+
else if (name == "link_status") {
bool up = parse_bool(attr[name], "up", result);
ret = get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id)->set_link_up(up);
}
+
else if (name == "led_status") {
bool on = parse_bool(attr[name], "on", result);
ret = get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id)->set_led(on);
- } else if (name == "flow_ctrl_mode") {
+ }
+
+ else if (name == "flow_ctrl_mode") {
int mode = parse_int(attr[name], "mode", result);
ret = get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id)->set_flow_ctrl(mode);
- } else {
- generate_execute_err(result, "Not recognized attribute: " + name);
- break;
}
- if (ret != 0){
- if ( ret == -ENOTSUP ) {
- generate_execute_err(result, "Error applying " + name + ": operation is not supported for this NIC.");
- }
- else if (ret) {
- generate_execute_err(result, "Error applying " + name + " attribute, return value: " + to_string(ret));
- }
+
+ else if (name == "rx_filter_mode") {
+ const Json::Value &rx = parse_object(attr, name, result);
+ ret = parse_rx_filter_mode(rx, port_id, result);
+ }
+
+ else if (name == "ipv4") {
+ const Json::Value &ipv4 = parse_object(attr, name, result);
+ ret = parse_ipv4(ipv4, port_id, result);
+ }
+
+ else if (name == "dest") {
+ const Json::Value &dest = parse_object(attr, name, result);
+ ret = parse_dest(dest, port_id, result);
+ }
+
+ /* unknown attribute */
+ else {
+ generate_execute_err(result, "unknown attribute type: '" + name + "'");
break;
- } else {
- changed = true;
}
- }
- if (changed) {
- get_stateless_obj()->get_platform_api()->publish_async_port_attr_changed(port_id);
- }
+ /* check error code */
+ if ( ret == -ENOTSUP ) {
+ generate_execute_err(result, "Error applying " + name + ": operation is not supported for this NIC.");
+ } else if (ret) {
+ generate_execute_err(result, "Error applying " + name + " attribute, return value: " + to_string(ret));
+ }
+ }
+
result["result"] = Json::objectValue;
return (TREX_RPC_CMD_OK);
+
}
@@ -568,17 +642,12 @@ TrexRpcCmdGetPortStatus::_run(const Json::Value &params, Json::Value &result) {
result["result"]["owner"] = (port->get_owner().is_free() ? "" : port->get_owner().get_name());
result["result"]["state"] = port->get_state_as_string();
result["result"]["max_stream_id"] = port->get_max_stream_id();
- result["result"]["speed"] = get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id)->get_link_speed();
/* attributes */
- result["result"]["attr"]["promiscuous"]["enabled"] = get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id)->get_promiscuous();
- result["result"]["attr"]["link"]["up"] = get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id)->is_link_up();
- int mode;
- int ret = get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id)->get_flow_ctrl(mode);
- if (ret != 0) {
- mode = -1;
- }
- result["result"]["attr"]["fc"]["mode"] = mode;
+ get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id)->to_json(result["result"]["attr"]);
+
+ /* RX info */
+ port->get_rx_features().to_json(result["result"]["rx_info"]);
return (TREX_RPC_CMD_OK);
}
@@ -640,3 +709,149 @@ TrexRpcCmdPushRemote::_run(const Json::Value &params, Json::Value &result) {
}
+/**
+ * set on/off RX software receive mode
+ *
+ */
+trex_rpc_cmd_rc_e
+TrexRpcCmdSetRxFeature::_run(const Json::Value &params, Json::Value &result) {
+
+ uint8_t port_id = parse_port(params, result);
+ TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
+
+ /* decide which feature is being set */
+ const std::string type = parse_choice(params, "type", {"capture", "queue", "server"}, result);
+
+ if (type == "capture") {
+ parse_capture_msg(params, port, result);
+ } else if (type == "queue") {
+ parse_queue_msg(params, port, result);
+ } else if (type == "server") {
+ parse_server_msg(params, port, result);
+ } else {
+ assert(0);
+ }
+
+ result["result"] = Json::objectValue;
+ return (TREX_RPC_CMD_OK);
+
+}
+
+void
+TrexRpcCmdSetRxFeature::parse_capture_msg(const Json::Value &msg, TrexStatelessPort *port, Json::Value &result) {
+
+ bool enabled = parse_bool(msg, "enabled", result);
+
+ if (enabled) {
+
+ std::string pcap_filename = parse_string(msg, "pcap_filename", result);
+ uint64_t limit = parse_uint32(msg, "limit", result);
+
+ if (limit == 0) {
+ generate_parse_err(result, "limit cannot be zero");
+ }
+
+ try {
+ port->start_rx_capture(pcap_filename, limit);
+ } catch (const TrexException &ex) {
+ generate_execute_err(result, ex.what());
+ }
+
+ } else {
+
+ try {
+ port->stop_rx_capture();
+ } catch (const TrexException &ex) {
+ generate_execute_err(result, ex.what());
+ }
+
+ }
+
+}
+
+void
+TrexRpcCmdSetRxFeature::parse_queue_msg(const Json::Value &msg, TrexStatelessPort *port, Json::Value &result) {
+ bool enabled = parse_bool(msg, "enabled", result);
+
+ if (enabled) {
+
+ uint64_t size = parse_uint32(msg, "size", result);
+
+ if (size == 0) {
+ generate_parse_err(result, "queue size cannot be zero");
+ }
+
+ try {
+ port->start_rx_queue(size);
+ } catch (const TrexException &ex) {
+ generate_execute_err(result, ex.what());
+ }
+
+ } else {
+
+ try {
+ port->stop_rx_queue();
+ } catch (const TrexException &ex) {
+ generate_execute_err(result, ex.what());
+ }
+
+ }
+
+}
+
+void
+TrexRpcCmdSetRxFeature::parse_server_msg(const Json::Value &msg, TrexStatelessPort *port, Json::Value &result) {
+}
+
+
+trex_rpc_cmd_rc_e
+TrexRpcCmdGetRxQueuePkts::_run(const Json::Value &params, Json::Value &result) {
+
+ uint8_t port_id = parse_port(params, result);
+
+ TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
+
+ try {
+ const RxPacketBuffer *pkt_buffer = port->get_rx_queue_pkts();
+ if (pkt_buffer) {
+ result["result"]["pkts"] = pkt_buffer->to_json();
+ } else {
+ result["result"]["pkts"] = Json::arrayValue;
+ }
+
+ } catch (const TrexException &ex) {
+ generate_execute_err(result, ex.what());
+ }
+
+
+ return (TREX_RPC_CMD_OK);
+}
+
+trex_rpc_cmd_rc_e
+TrexRpcCmdSetARPRes::_run(const Json::Value &params, Json::Value &result) {
+ uint8_t port_id = parse_port(params, result);
+
+ TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
+
+ const std::string ipv4_str = parse_string(params, "ipv4", result);
+ const std::string mac_str = parse_string(params, "mac", result);
+
+ uint32_t ipv4_addr;
+ if (!utl_ipv4_to_uint32(ipv4_str.c_str(), ipv4_addr)) {
+ std::stringstream ss;
+ ss << "invalid IPv4 address: '" << ipv4_str << "'";
+ generate_parse_err(result, ss.str());
+ }
+
+ uint8_t mac[6];
+ if (!utl_str_to_macaddr(mac_str, mac)) {
+ std::stringstream ss;
+ ss << "'invalid MAC address: '" << mac_str << "'";
+ generate_parse_err(result, ss.str());
+ }
+
+ port->getPortAttrObj()->get_dest().set_dest_ipv4(ipv4_addr, mac);
+
+ return (TREX_RPC_CMD_OK);
+
+}
diff --git a/src/rpc-server/commands/trex_rpc_cmds.h b/src/rpc-server/commands/trex_rpc_cmds.h
index 5fde1d0c..2b2178e2 100644
--- a/src/rpc-server/commands/trex_rpc_cmds.h
+++ b/src/rpc-server/commands/trex_rpc_cmds.h
@@ -27,6 +27,7 @@ limitations under the License.
#include <memory>
class TrexStream;
+class TrexStatelessPort;
/* all the RPC commands decl. goes here */
@@ -89,10 +90,17 @@ TREX_RPC_CMD_DEFINE(TrexRpcCmdRelease, "release", 1, true, APIClass:
*/
TREX_RPC_CMD_DEFINE(TrexRpcCmdGetPortStats, "get_port_stats", 1, false, APIClass::API_CLASS_TYPE_CORE);
TREX_RPC_CMD_DEFINE(TrexRpcCmdGetPortStatus, "get_port_status", 1, false, APIClass::API_CLASS_TYPE_CORE);
-TREX_RPC_CMD_DEFINE(TrexRpcCmdSetPortAttr, "set_port_attr", 2, true, APIClass::API_CLASS_TYPE_CORE);
TREX_RPC_CMD_DEFINE(TrexRpcCmdGetPortXStatsValues, "get_port_xstats_values", 1, false, APIClass::API_CLASS_TYPE_CORE);
TREX_RPC_CMD_DEFINE(TrexRpcCmdGetPortXStatsNames, "get_port_xstats_names", 1, false, APIClass::API_CLASS_TYPE_CORE);
+TREX_RPC_CMD_DEFINE_EXTENDED(TrexRpcCmdSetPortAttr, "set_port_attr", 2, true, APIClass::API_CLASS_TYPE_CORE,
+
+ int parse_rx_filter_mode(const Json::Value &msg, uint8_t port_id, Json::Value &result);
+ int parse_ipv4(const Json::Value &msg, uint8_t port_id, Json::Value &result);
+ int parse_dest(const Json::Value &msg, uint8_t port_id, Json::Value &result);
+);
+
+
/**
* stream cmds
*/
@@ -144,5 +152,15 @@ TREX_RPC_CMD_DEFINE(TrexRpcCmdPushRemote, "push_remote", 6, true, APIClass::API_
TREX_RPC_CMD_DEFINE(TrexRpcCmdShutdown, "shutdown", 2, false, APIClass::API_CLASS_TYPE_CORE);
+TREX_RPC_CMD_DEFINE_EXTENDED(TrexRpcCmdSetRxFeature, "set_rx_feature", 3, false, APIClass::API_CLASS_TYPE_CORE,
+ void parse_capture_msg(const Json::Value &msg, TrexStatelessPort *port, Json::Value &result);
+ void parse_queue_msg(const Json::Value &msg, TrexStatelessPort *port, Json::Value &result);
+ void parse_server_msg(const Json::Value &msg, TrexStatelessPort *port, Json::Value &result);
+
+);
+
+TREX_RPC_CMD_DEFINE(TrexRpcCmdGetRxQueuePkts, "get_rx_queue_pkts", 2, false, APIClass::API_CLASS_TYPE_CORE);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdSetARPRes, "set_arp_resolution", 2, false, APIClass::API_CLASS_TYPE_CORE);
+
#endif /* __TREX_RPC_CMD_H__ */
diff --git a/src/rpc-server/trex_rpc_cmds_table.cpp b/src/rpc-server/trex_rpc_cmds_table.cpp
index cddf19b9..919be1f1 100644
--- a/src/rpc-server/trex_rpc_cmds_table.cpp
+++ b/src/rpc-server/trex_rpc_cmds_table.cpp
@@ -71,6 +71,11 @@ TrexRpcCommandsTable::TrexRpcCommandsTable() {
register_command(new TrexRpcCmdPushRemote());
register_command(new TrexRpcCmdShutdown());
+
+ register_command(new TrexRpcCmdSetRxFeature());
+ register_command(new TrexRpcCmdGetRxQueuePkts());
+
+ register_command(new TrexRpcCmdSetARPRes());
}
diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp
index 9bb20990..d4bc5c36 100644
--- a/src/stateless/cp/trex_stateless_port.cpp
+++ b/src/stateless/cp/trex_stateless_port.cpp
@@ -25,6 +25,7 @@ limitations under the License.
#include <trex_streams_compiler.h>
#include <common/basic_utils.h>
#include <common/captureFile.h>
+#include "trex_stateless_rx_defs.h"
#include <string>
@@ -156,9 +157,9 @@ private:
TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api) : m_dp_events(this) {
std::vector<std::pair<uint8_t, uint8_t>> core_pair_list;
- m_port_id = port_id;
- m_port_state = PORT_STATE_IDLE;
- m_platform_api = api;
+ m_port_id = port_id;
+ m_port_state = PORT_STATE_IDLE;
+ m_platform_api = api;
/* get the platform specific data */
api->get_interface_info(port_id, m_api_info);
@@ -584,10 +585,9 @@ TrexStatelessPort::get_max_stream_id() const {
}
void
-TrexStatelessPort::get_properties(std::string &driver, uint32_t &speed) {
+TrexStatelessPort::get_properties(std::string &driver) {
driver = m_api_info.driver_name;
- speed = m_platform_api->getPortAttrObj(m_port_id)->get_link_speed();
}
bool
@@ -888,16 +888,6 @@ TrexStatelessPort::get_port_effective_rate(double &pps,
}
void
-TrexStatelessPort::get_macaddr(std::string &hw_macaddr,
- std::string &src_macaddr,
- std::string &dst_macaddr) {
-
- utl_macaddr_to_str(m_api_info.mac_info.hw_macaddr, hw_macaddr);
- utl_macaddr_to_str(m_api_info.mac_info.src_macaddr, src_macaddr);
- utl_macaddr_to_str(m_api_info.mac_info.dst_macaddr, dst_macaddr);
-}
-
-void
TrexStatelessPort::get_pci_info(std::string &pci_addr, int &numa_node) {
pci_addr = m_api_info.pci_addr;
numa_node = m_api_info.numa_node;
@@ -944,6 +934,56 @@ TrexStatelessPort::remove_and_delete_all_streams() {
}
}
+void
+TrexStatelessPort::start_rx_capture(const std::string &pcap_filename, uint64_t limit) {
+
+ m_rx_features_info.m_rx_capture_info.enable(pcap_filename, limit);
+
+ TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStartCapture(m_port_id, m_rx_features_info.m_rx_capture_info);
+ send_message_to_rx(msg);
+}
+
+void
+TrexStatelessPort::stop_rx_capture() {
+ TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStopCapture(m_port_id);
+ send_message_to_rx(msg);
+ m_rx_features_info.m_rx_capture_info.disable();
+}
+
+void
+TrexStatelessPort::start_rx_queue(uint64_t size) {
+
+ m_rx_features_info.m_rx_queue_info.enable(size);
+
+ TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStartQueue(m_port_id, m_rx_features_info.m_rx_queue_info);
+ send_message_to_rx(msg);
+}
+
+void
+TrexStatelessPort::stop_rx_queue() {
+ TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStopQueue(m_port_id);
+ send_message_to_rx(msg);
+ m_rx_features_info.m_rx_queue_info.disable();
+}
+
+
+RxPacketBuffer *
+TrexStatelessPort::get_rx_queue_pkts() {
+
+ if (m_rx_features_info.m_rx_queue_info.is_empty()) {
+ return NULL;
+ }
+
+ /* ask RX core for the pkt queue */
+ TrexStatelessMsgReply<RxPacketBuffer *> msg_reply;
+
+ TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxQueueGetPkts(m_port_id, msg_reply);
+ send_message_to_rx(msg);
+
+ RxPacketBuffer *pkt_buffer = msg_reply.wait_for_reply();
+ return pkt_buffer;
+}
+
/************* Trex Port Owner **************/
TrexPortOwner::TrexPortOwner() {
diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h
index e2a2aeba..cf6b2716 100644
--- a/src/stateless/cp/trex_stateless_port.h
+++ b/src/stateless/cp/trex_stateless_port.h
@@ -24,12 +24,15 @@ limitations under the License.
#include "common/basic_utils.h"
#include "internal_api/trex_platform_api.h"
#include "trex_dp_port_events.h"
+#include "trex_stateless_rx_defs.h"
#include "trex_stream.h"
class TrexStatelessCpToDpMsgBase;
class TrexStatelessCpToRxMsgBase;
class TrexStreamsGraphObj;
class TrexPortMultiplier;
+class RxPacketBuffer;
+
/**
* TRex port owner can perform
@@ -255,11 +258,8 @@ public:
* @author imarom (16-Sep-15)
*
* @param driver
- * @param speed
*/
- void get_properties(std::string &driver, uint32_t &speed);
-
-
+ void get_properties(std::string &driver);
/**
* encode stats as JSON
@@ -362,14 +362,60 @@ public:
double &bps_L2,
double &percentage);
+ void get_pci_info(std::string &pci_addr, int &numa_node);
- void get_macaddr(std::string &hw_macaddr,
- std::string &src_macaddr,
- std::string &dst_macaddr);
- void get_pci_info(std::string &pci_addr, int &numa_node);
+ /**
+ * enable RX capture on port
+ *
+ */
+ void start_rx_capture(const std::string &pcap_filename, uint64_t limit);
+ /**
+ * disable RX capture if on
+ *
+ */
+ void stop_rx_capture();
+
+ /**
+ * start RX queueing of packets
+ *
+ * @author imarom (11/7/2016)
+ *
+ * @param limit
+ */
+ void start_rx_queue(uint64_t limit);
+ /**
+ * stop RX queueing
+ *
+ * @author imarom (11/7/2016)
+ */
+ void stop_rx_queue();
+
+
+ /**
+ * get the RX features info object
+ *
+ */
+ const RXFeaturesInfo &get_rx_features() {
+ return m_rx_features_info;
+ }
+
+ /**
+ * fetch the RX queue packets from the queue
+ *
+ */
+ RxPacketBuffer *get_rx_queue_pkts();
+
+ /**
+ * return the port attribute object
+ *
+ */
+ TRexPortAttr *getPortAttrObj() {
+ return m_platform_api->getPortAttrObj(m_port_id);
+ }
+
private:
bool is_core_active(int core_id);
@@ -456,6 +502,9 @@ private:
TrexPortOwner m_owner;
int m_pending_async_stop_event;
+
+ RXFeaturesInfo m_rx_features_info;
+
};
@@ -502,9 +551,9 @@ public:
static const std::initializer_list<std::string> g_types;
static const std::initializer_list<std::string> g_ops;
- mul_type_e m_type;
- mul_op_e m_op;
- double m_value;
+ mul_type_e m_type;
+ mul_op_e m_op;
+ double m_value;
};
#endif /* __TREX_STATELESS_PORT_H__ */
diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp
index 95613b41..c2182f3c 100644
--- a/src/stateless/messaging/trex_stateless_messaging.cpp
+++ b/src/stateless/messaging/trex_stateless_messaging.cpp
@@ -241,13 +241,13 @@ TrexDpPortEventMsg::handle() {
}
/************************* messages from CP to RX **********************/
-bool TrexStatelessRxStartMsg::handle (CRxCoreStateless *rx_core) {
- rx_core->work();
+bool TrexStatelessRxEnableLatency::handle (CRxCoreStateless *rx_core) {
+ rx_core->enable_latency();
return true;
}
-bool TrexStatelessRxStopMsg::handle (CRxCoreStateless *rx_core) {
- rx_core->idle();
+bool TrexStatelessRxDisableLatency::handle (CRxCoreStateless *rx_core) {
+ rx_core->disable_latency();
return true;
}
@@ -255,3 +255,42 @@ bool TrexStatelessRxQuit::handle (CRxCoreStateless *rx_core) {
rx_core->quit();
return true;
}
+
+
+bool
+TrexStatelessRxStartCapture::handle(CRxCoreStateless *rx_core) {
+ rx_core->start_capture(m_port_id, m_pcap_filename, m_limit, m_shared_counter);
+
+ return true;
+}
+
+bool
+TrexStatelessRxStopCapture::handle(CRxCoreStateless *rx_core) {
+ rx_core->stop_capture(m_port_id);
+
+ return true;
+}
+
+bool
+TrexStatelessRxStartQueue::handle(CRxCoreStateless *rx_core) {
+ rx_core->start_queue(m_port_id, m_size, m_shared_counter);
+
+ return true;
+}
+
+bool
+TrexStatelessRxStopQueue::handle(CRxCoreStateless *rx_core) {
+ rx_core->stop_queue(m_port_id);
+
+ return true;
+}
+
+
+
+bool TrexStatelessRxQueueGetPkts::handle(CRxCoreStateless *rx_core) {
+ RxPacketBuffer *pkt_buffer = rx_core->get_rx_queue_pkts(m_port_id);
+ assert(pkt_buffer);
+ m_reply.set(pkt_buffer);
+
+ return true;
+}
diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h
index fb2c27ab..52b1662e 100644
--- a/src/stateless/messaging/trex_stateless_messaging.h
+++ b/src/stateless/messaging/trex_stateless_messaging.h
@@ -24,11 +24,15 @@ limitations under the License.
#include "msg_manager.h"
#include "trex_dp_port_events.h"
+#include "trex_exception.h"
+#include "trex_stateless_rx_defs.h"
+#include "os_time.h"
class TrexStatelessDpCore;
class CRxCoreStateless;
class TrexStreamsCompiledObj;
class CFlowGenListPerThread;
+class RxPacketBuffer;
/**
* defines the base class for CP to DP messages
@@ -312,7 +316,7 @@ private:
/************************* messages from DP to CP **********************/
/**
- * defines the base class for CP to DP messages
+ * defines the base class for DP to CP messages
*
* @author imarom (27-Oct-15)
*/
@@ -404,11 +408,11 @@ public:
};
-class TrexStatelessRxStartMsg : public TrexStatelessCpToRxMsgBase {
+class TrexStatelessRxEnableLatency : public TrexStatelessCpToRxMsgBase {
bool handle (CRxCoreStateless *rx_core);
};
-class TrexStatelessRxStopMsg : public TrexStatelessCpToRxMsgBase {
+class TrexStatelessRxDisableLatency : public TrexStatelessCpToRxMsgBase {
bool handle (CRxCoreStateless *rx_core);
};
@@ -416,4 +420,124 @@ class TrexStatelessRxQuit : public TrexStatelessCpToRxMsgBase {
bool handle (CRxCoreStateless *rx_core);
};
+
+class TrexStatelessRxStartCapture : public TrexStatelessCpToRxMsgBase {
+public:
+ TrexStatelessRxStartCapture(uint8_t port_id, RXCaptureInfo &rx_capture_info) {
+ m_port_id = port_id;
+ m_limit = rx_capture_info.m_limit;
+ m_pcap_filename = rx_capture_info.m_pcap_filename;
+ m_shared_counter = &rx_capture_info.m_shared_counter;
+ }
+
+ virtual bool handle(CRxCoreStateless *rx_core);
+
+private:
+ uint8_t m_port_id;
+ std::string m_pcap_filename;
+ uint64_t m_limit;
+ uint64_t *m_shared_counter;
+};
+
+
+class TrexStatelessRxStopCapture : public TrexStatelessCpToRxMsgBase {
+public:
+ TrexStatelessRxStopCapture(uint8_t port_id) {
+ m_port_id = port_id;
+ }
+
+ virtual bool handle(CRxCoreStateless *rx_core);
+
+private:
+ uint8_t m_port_id;
+};
+
+class TrexStatelessRxStartQueue : public TrexStatelessCpToRxMsgBase {
+public:
+ TrexStatelessRxStartQueue(uint8_t port_id, RXQueueInfo &rx_queue_info) {
+ m_port_id = port_id;
+ m_size = rx_queue_info.m_size;
+ m_shared_counter = &rx_queue_info.m_shared_counter;
+ }
+
+ virtual bool handle(CRxCoreStateless *rx_core);
+
+private:
+ uint8_t m_port_id;
+ uint64_t m_size;
+ uint64_t *m_shared_counter;
+};
+
+class TrexStatelessRxStopQueue : public TrexStatelessCpToRxMsgBase {
+public:
+ TrexStatelessRxStopQueue(uint8_t port_id) {
+ m_port_id = port_id;
+ }
+
+ virtual bool handle(CRxCoreStateless *rx_core);
+
+private:
+ uint8_t m_port_id;
+};
+
+
+template<typename T> class TrexStatelessMsgReply {
+public:
+ TrexStatelessMsgReply() {
+ m_pending = true;
+ }
+
+ bool is_pending() const {
+ return m_pending;
+ }
+
+ void set(T reply) {
+ m_reply = reply;
+
+ /* before marking as done - memory fence */
+ asm volatile("mfence" ::: "memory");
+ m_pending = false;
+ }
+
+ T wait_for_reply(int timeout_ms = 100, int backoff_ms = 1) {
+ int guard = timeout_ms;
+
+ while (is_pending()) {
+ guard -= backoff_ms;
+ if (guard < 0) {
+ throw TrexException("timeout: RX core has failed to reply");
+ }
+
+ delay(backoff_ms);
+
+ }
+ return m_reply;
+
+ }
+private:
+ bool m_pending;
+ T m_reply;
+};
+
+
+
+class TrexStatelessRxQueueGetPkts : public TrexStatelessCpToRxMsgBase {
+public:
+
+ TrexStatelessRxQueueGetPkts(uint8_t port_id, TrexStatelessMsgReply<RxPacketBuffer *> &reply) : m_reply(reply) {
+ m_port_id = port_id;
+ }
+
+ /**
+ * virtual function to handle a message
+ *
+ */
+ virtual bool handle(CRxCoreStateless *rx_core);
+
+private:
+ uint8_t m_port_id;
+ TrexStatelessMsgReply<RxPacketBuffer*> &m_reply;
+};
+
+
#endif /* __TREX_STATELESS_MESSAGING_H__ */
diff --git a/src/stateless/rx/trex_stateless_rx_core.cpp b/src/stateless/rx/trex_stateless_rx_core.cpp
index d162c5b3..2a678365 100644
--- a/src/stateless/rx/trex_stateless_rx_core.cpp
+++ b/src/stateless/rx/trex_stateless_rx_core.cpp
@@ -26,6 +26,8 @@
#include "pal/linux/sanb_atomic.h"
#include "trex_stateless_messaging.h"
#include "trex_stateless_rx_core.h"
+#include "trex_stateless.h"
+
void CRFC2544Info::create() {
m_latency.Create();
@@ -64,15 +66,7 @@ void CRFC2544Info::export_data(rfc2544_info_t_ &obj) {
obj.set_latency_json(json);
};
-void CCPortLatencyStl::reset() {
- for (int i = 0; i < MAX_FLOW_STATS; i++) {
- m_rx_pg_stat[i].clear();
- m_rx_pg_stat_payload[i].clear();
- }
-}
-
void CRxCoreStateless::create(const CRxSlCfg &cfg) {
- m_rcv_all = false;
m_capture = false;
m_max_ports = cfg.m_max_ports;
@@ -82,15 +76,18 @@ void CRxCoreStateless::create(const CRxSlCfg &cfg) {
m_ring_to_cp = cp_rx->getRingDpToCp(0);
m_state = STATE_IDLE;
- for (int i = 0; i < m_max_ports; i++) {
- CLatencyManagerPerPortStl * lp = &m_ports[i];
- lp->m_io = cfg.m_ports[i];
- lp->m_port.reset();
+ for (int i = 0; i < MAX_FLOW_STATS_PAYLOAD; i++) {
+ m_rfc2544[i].create();
}
+
m_cpu_cp_u.Create(&m_cpu_dp_u);
- for (int i = 0; i < MAX_FLOW_STATS_PAYLOAD; i++) {
- m_rfc2544[i].create();
+ /* init per port manager */
+ for (int i = 0; i < m_max_ports; i++) {
+ m_rx_port_mngr[i].create(cfg.m_ports[i],
+ m_rfc2544,
+ &m_err_cntrs,
+ &m_cpu_dp_u);
}
}
@@ -124,10 +121,30 @@ bool CRxCoreStateless::periodic_check_for_cp_messages() {
handle_cp_msg(msg);
}
+ recalculate_next_state();
return true;
}
+void CRxCoreStateless::recalculate_next_state() {
+ if (m_state == STATE_QUIT) {
+ return;
+ }
+
+ /* next state is determine by the question are there any ports with active features ? */
+ m_state = (are_any_features_active() ? STATE_WORKING : STATE_IDLE);
+}
+
+bool CRxCoreStateless::are_any_features_active() {
+ for (int i = 0; i < m_max_ports; i++) {
+ if (m_rx_port_mngr[i].has_features_set()) {
+ return true;
+ }
+ }
+
+ return false;
+}
+
void CRxCoreStateless::idle_state_loop() {
const int SHORT_DELAY_MS = 2;
const int LONG_DELAY_MS = 50;
@@ -141,7 +158,7 @@ void CRxCoreStateless::idle_state_loop() {
counter = 0;
continue;
} else {
- flush_rx();
+ flush_all_pending_pkts();
}
/* enter deep sleep only if enough time had passed */
@@ -154,143 +171,55 @@ void CRxCoreStateless::idle_state_loop() {
}
}
-void CRxCoreStateless::start() {
- int count = 0;
+void CRxCoreStateless::handle_work_stage(bool do_try_rx_queue) {
int i = 0;
- bool do_try_rx_queue =CGlobalInfo::m_options.preview.get_vm_one_queue_enable() ? true : false;
+
+ while (m_state == STATE_WORKING) {
+
+ if (do_try_rx_queue) {
+ try_rx_queues();
+ }
+
+ process_all_pending_pkts();
+
+ i++;
+ if (i == 100000) { // approx 10msec
+ i = 0;
+ periodic_check_for_cp_messages(); // m_state might change in here
+ }
+
+ rte_pause();
+ }
+}
+
+void CRxCoreStateless::start() {
+ bool do_try_rx_queue = CGlobalInfo::m_options.preview.get_vm_one_queue_enable() ? true : false;
/* register a watchdog handle on current core */
m_monitor.create("STL RX CORE", 1);
TrexWatchDog::getInstance().register_monitor(&m_monitor);
- while (true) {
- if (m_state == STATE_WORKING) {
- i++;
- if (i == 100000) { // approx 10msec
- i = 0;
- periodic_check_for_cp_messages(); // m_state might change in here
- }
- } else {
- if (m_state == STATE_QUIT)
- break;
- count = 0;
- i = 0;
+ while (m_state != STATE_QUIT) {
+
+ switch (m_state) {
+ case STATE_IDLE:
set_working_msg_ack(false);
idle_state_loop();
set_working_msg_ack(true);
- }
- if (do_try_rx_queue) {
- try_rx_queues();
- }
- count += try_rx();
- }
- rte_pause();
+ break;
- m_monitor.disable();
-}
+ case STATE_WORKING:
+ handle_work_stage(do_try_rx_queue);
+ break;
-void CRxCoreStateless::handle_rx_pkt(CLatencyManagerPerPortStl *lp, rte_mbuf_t *m) {
- CFlowStatParser parser;
-
- if (m_rcv_all || parser.parse(rte_pktmbuf_mtod(m, uint8_t *), m->pkt_len) == 0) {
- uint32_t ip_id;
- if (m_rcv_all || (parser.get_ip_id(ip_id) == 0)) {
- if (m_rcv_all || is_flow_stat_id(ip_id)) {
- uint16_t hw_id;
- if (m_rcv_all || is_flow_stat_payload_id(ip_id)) {
- bool good_packet = true;
- uint8_t *p = rte_pktmbuf_mtod(m, uint8_t*);
- struct flow_stat_payload_header *fsp_head = (struct flow_stat_payload_header *)
- (p + m->pkt_len - sizeof(struct flow_stat_payload_header));
- hw_id = fsp_head->hw_id;
- CRFC2544Info *curr_rfc2544;
-
- if (unlikely(fsp_head->magic != FLOW_STAT_PAYLOAD_MAGIC) || hw_id >= MAX_FLOW_STATS_PAYLOAD) {
- good_packet = false;
- if (!m_rcv_all)
- m_err_cntrs.m_bad_header++;
- } else {
- curr_rfc2544 = &m_rfc2544[hw_id];
-
- if (fsp_head->flow_seq != curr_rfc2544->get_exp_flow_seq()) {
- // bad flow seq num
- // Might be the first packet of a new flow, packet from an old flow, or garbage.
-
- if (fsp_head->flow_seq == curr_rfc2544->get_prev_flow_seq()) {
- // packet from previous flow using this hw_id that arrived late
- good_packet = false;
- m_err_cntrs.m_old_flow++;
- } else {
- if (curr_rfc2544->no_flow_seq()) {
- // first packet we see from this flow
- good_packet = true;
- curr_rfc2544->set_exp_flow_seq(fsp_head->flow_seq);
- } else {
- // garbage packet
- good_packet = false;
- m_err_cntrs.m_bad_header++;
- }
- }
- }
- }
-
- if (good_packet) {
- uint32_t pkt_seq = fsp_head->seq;
- uint32_t exp_seq = curr_rfc2544->get_seq();
- if (unlikely(pkt_seq != exp_seq)) {
- if (pkt_seq < exp_seq) {
- if (exp_seq - pkt_seq > 100000) {
- // packet loss while we had wrap around
- curr_rfc2544->inc_seq_err(pkt_seq - exp_seq);
- curr_rfc2544->inc_seq_err_too_big();
- curr_rfc2544->set_seq(pkt_seq + 1);
- } else {
- if (pkt_seq == (exp_seq - 1)) {
- curr_rfc2544->inc_dup();
- } else {
- curr_rfc2544->inc_ooo();
- // We thought it was lost, but it was just out of order
- curr_rfc2544->dec_seq_err();
- }
- curr_rfc2544->inc_seq_err_too_low();
- }
- } else {
- if (unlikely (pkt_seq - exp_seq > 100000)) {
- // packet reorder while we had wrap around
- if (pkt_seq == (exp_seq - 1)) {
- curr_rfc2544->inc_dup();
- } else {
- curr_rfc2544->inc_ooo();
- // We thought it was lost, but it was just out of order
- curr_rfc2544->dec_seq_err();
- }
- curr_rfc2544->inc_seq_err_too_low();
- } else {
- // seq > curr_rfc2544->seq. Assuming lost packets
- curr_rfc2544->inc_seq_err(pkt_seq - exp_seq);
- curr_rfc2544->inc_seq_err_too_big();
- curr_rfc2544->set_seq(pkt_seq + 1);
- }
- }
- } else {
- curr_rfc2544->set_seq(pkt_seq + 1);
- }
- lp->m_port.m_rx_pg_stat_payload[hw_id].add_pkts(1);
- lp->m_port.m_rx_pg_stat_payload[hw_id].add_bytes(m->pkt_len + 4); // +4 for ethernet CRC
- uint64_t d = (os_get_hr_tick_64() - fsp_head->time_stamp );
- dsec_t ctime = ptime_convert_hr_dsec(d);
- curr_rfc2544->add_sample(ctime);
- }
- } else {
- hw_id = get_hw_id(ip_id);
- if (hw_id < MAX_FLOW_STATS) {
- lp->m_port.m_rx_pg_stat[hw_id].add_pkts(1);
- lp->m_port.m_rx_pg_stat[hw_id].add_bytes(m->pkt_len + 4); // +4 for ethernet CRC
- }
- }
- }
+ default:
+ assert(0);
+ break;
}
+
}
+
+ m_monitor.disable();
}
void CRxCoreStateless::capture_pkt(rte_mbuf_t *m) {
@@ -310,7 +239,7 @@ void CRxCoreStateless::handle_rx_queue_msgs(uint8_t thread_id, CNodeRing * r) {
CGenNodeLatencyPktInfo * l_msg;
uint8_t msg_type = msg->m_msg_type;
uint8_t rx_port_index;
- CLatencyManagerPerPortStl * lp;
+
switch (msg_type) {
case CGenNodeMsgBase::LATENCY_PKT:
@@ -318,8 +247,9 @@ void CRxCoreStateless::handle_rx_queue_msgs(uint8_t thread_id, CNodeRing * r) {
assert(l_msg->m_latency_offset == 0xdead);
rx_port_index = (thread_id << 1) + (l_msg->m_dir & 1);
assert( rx_port_index < m_max_ports );
- lp = &m_ports[rx_port_index];
- handle_rx_pkt(lp, (rte_mbuf_t *)l_msg->m_pkt);
+
+ m_rx_port_mngr[rx_port_index].handle_pkt((rte_mbuf_t *)l_msg->m_pkt);
+
if (m_capture)
capture_pkt((rte_mbuf_t *)l_msg->m_pkt);
rte_pktmbuf_free((rte_mbuf_t *)l_msg->m_pkt);
@@ -347,87 +277,38 @@ void CRxCoreStateless::try_rx_queues() {
}
}
-// exactly the same as try_rx, without the handle_rx_pkt
-// purpose is to flush rx queues when core is in idle state
-void CRxCoreStateless::flush_rx() {
- rte_mbuf_t * rx_pkts[64];
- int i, total_pkts = 0;
- for (i = 0; i < m_max_ports; i++) {
- CLatencyManagerPerPortStl * lp = &m_ports[i];
- rte_mbuf_t * m;
- /* try to read 64 packets clean up the queue */
- uint16_t cnt_p = lp->m_io->rx_burst(rx_pkts, 64);
- total_pkts += cnt_p;
- if (cnt_p) {
- m_cpu_dp_u.start_work1();
- int j;
- for (j = 0; j < cnt_p; j++) {
- m = rx_pkts[j];
- rte_pktmbuf_free(m);
- }
- /* commit only if there was work to do ! */
- m_cpu_dp_u.commit1();
- }/* if work */
- }// all ports
-}
+int CRxCoreStateless::process_all_pending_pkts(bool flush_rx) {
+
+ int total_pkts = 0;
+ for (int i = 0; i < m_max_ports; i++) {
+ total_pkts += m_rx_port_mngr[i].process_all_pending_pkts(flush_rx);
+ }
-int CRxCoreStateless::try_rx() {
- rte_mbuf_t * rx_pkts[64];
- int i, total_pkts = 0;
- for (i = 0; i < m_max_ports; i++) {
- CLatencyManagerPerPortStl * lp = &m_ports[i];
- rte_mbuf_t * m;
- /* try to read 64 packets clean up the queue */
- uint16_t cnt_p = lp->m_io->rx_burst(rx_pkts, 64);
- total_pkts += cnt_p;
- if (cnt_p) {
- m_cpu_dp_u.start_work1();
- int j;
- for (j = 0; j < cnt_p; j++) {
- m = rx_pkts[j];
- handle_rx_pkt(lp, m);
- rte_pktmbuf_free(m);
- }
- /* commit only if there was work to do ! */
- m_cpu_dp_u.commit1();
- }/* if work */
- }// all ports
return total_pkts;
-}
-bool CRxCoreStateless::is_flow_stat_id(uint32_t id) {
- if ((id & 0x000fff00) == IP_ID_RESERVE_BASE) return true;
- return false;
}
-bool CRxCoreStateless::is_flow_stat_payload_id(uint32_t id) {
- if (id == FLOW_STAT_PAYLOAD_IP_ID) return true;
- return false;
-}
-
-uint16_t CRxCoreStateless::get_hw_id(uint16_t id) {
- return (0x00ff & id);
-}
void CRxCoreStateless::reset_rx_stats(uint8_t port_id) {
- for (int hw_id = 0; hw_id < MAX_FLOW_STATS; hw_id++) {
- m_ports[port_id].m_port.m_rx_pg_stat[hw_id].clear();
- }
+ m_rx_port_mngr[port_id].clear_stats();
}
int CRxCoreStateless::get_rx_stats(uint8_t port_id, rx_per_flow_t *rx_stats, int min, int max
, bool reset, TrexPlatformApi::driver_stat_cap_e type) {
+
+ RXLatency &latency = m_rx_port_mngr[port_id].get_latency();
+
for (int hw_id = min; hw_id <= max; hw_id++) {
if (type == TrexPlatformApi::IF_STAT_PAYLOAD) {
- rx_stats[hw_id - min] = m_ports[port_id].m_port.m_rx_pg_stat_payload[hw_id];
+ rx_stats[hw_id - min] = latency.m_rx_pg_stat_payload[hw_id];
} else {
- rx_stats[hw_id - min] = m_ports[port_id].m_port.m_rx_pg_stat[hw_id];
+ rx_stats[hw_id - min] = latency.m_rx_pg_stat[hw_id];
}
if (reset) {
if (type == TrexPlatformApi::IF_STAT_PAYLOAD) {
- m_ports[port_id].m_port.m_rx_pg_stat_payload[hw_id].clear();
+ latency.m_rx_pg_stat_payload[hw_id].clear();
} else {
- m_ports[port_id].m_port.m_rx_pg_stat[hw_id].clear();
+ latency.m_rx_pg_stat[hw_id].clear();
}
}
}
@@ -472,3 +353,39 @@ void CRxCoreStateless::update_cpu_util(){
double CRxCoreStateless::get_cpu_util() {
return m_cpu_cp_u.GetVal();
}
+
+
+void
+CRxCoreStateless::start_capture(uint8_t port_id, const std::string &pcap_filename, uint64_t limit, uint64_t *shared_counter) {
+ m_rx_port_mngr[port_id].start_capture(pcap_filename, limit, shared_counter);
+}
+
+void
+CRxCoreStateless::stop_capture(uint8_t port_id) {
+ m_rx_port_mngr[port_id].stop_capture();
+}
+
+void
+CRxCoreStateless::start_queue(uint8_t port_id, uint64_t size, uint64_t *shared_counter) {
+ m_rx_port_mngr[port_id].start_queue(size, shared_counter);
+}
+
+void
+CRxCoreStateless::stop_queue(uint8_t port_id) {
+ m_rx_port_mngr[port_id].stop_queue();
+}
+
+void
+CRxCoreStateless::enable_latency() {
+ for (int i = 0; i < m_max_ports; i++) {
+ m_rx_port_mngr[i].enable_latency();
+ }
+}
+
+void
+CRxCoreStateless::disable_latency() {
+ for (int i = 0; i < m_max_ports; i++) {
+ m_rx_port_mngr[i].disable_latency();
+ }
+}
+
diff --git a/src/stateless/rx/trex_stateless_rx_core.h b/src/stateless/rx/trex_stateless_rx_core.h
index 3f9fb6cc..519724d8 100644
--- a/src/stateless/rx/trex_stateless_rx_core.h
+++ b/src/stateless/rx/trex_stateless_rx_core.h
@@ -25,36 +25,10 @@
#include "os_time.h"
#include "pal/linux/sanb_atomic.h"
#include "utl_cpuu.h"
+#include "trex_stateless_rx_port_mngr.h"
class TrexStatelessCpToRxMsgBase;
-class CCPortLatencyStl {
- public:
- void reset();
-
- public:
- rx_per_flow_t m_rx_pg_stat[MAX_FLOW_STATS];
- rx_per_flow_t m_rx_pg_stat_payload[MAX_FLOW_STATS_PAYLOAD];
-};
-
-class CLatencyManagerPerPortStl {
-public:
- CCPortLatencyStl m_port;
- CPortLatencyHWBase * m_io;
-};
-
-class CRxSlCfg {
- public:
- CRxSlCfg (){
- m_max_ports = 0;
- m_cps = 0.0;
- }
-
- public:
- uint32_t m_max_ports;
- double m_cps;
- CPortLatencyHWBase * m_ports[TREX_MAX_PORTS];
-};
class CRFC2544Info {
public:
@@ -109,7 +83,7 @@ class CRxCoreErrCntrs {
m_old_flow = 0;
}
- private:
+ public:
uint64_t m_bad_header;
uint64_t m_old_flow;
};
@@ -129,47 +103,81 @@ class CRxCoreStateless {
, TrexPlatformApi::driver_stat_cap_e type);
int get_rfc2544_info(rfc2544_info_t *rfc2544_info, int min, int max, bool reset);
int get_rx_err_cntrs(CRxCoreErrCntrs *rx_err);
- void work() {
- m_state = STATE_WORKING;
- m_err_cntrs.reset(); // When starting to work, reset global counters
- }
- void idle() {m_state = STATE_IDLE;}
+
+
void quit() {m_state = STATE_QUIT;}
bool is_working() const {return (m_ack_start_work_msg == true);}
void set_working_msg_ack(bool val);
double get_cpu_util();
void update_cpu_util();
+ RxPacketBuffer *get_rx_queue_pkts(uint8_t port_id) {
+ return m_rx_port_mngr[port_id].get_pkt_buffer();
+ }
+
+ /**
+ * start capturing of RX packets on a specific port
+ *
+ * @author imarom (11/2/2016)
+ *
+ * @param port_id
+ * @param pcap_filename
+ * @param limit
+ */
+ void start_capture(uint8_t port_id, const std::string &pcap_filename, uint64_t limit, uint64_t *shared_counter);
+ void stop_capture(uint8_t port_id);
+
+ /**
+ * start RX queueing of packets
+ *
+ */
+ void start_queue(uint8_t port_id, uint64_t size, uint64_t *shared_counter);
+ void stop_queue(uint8_t port_id);
+
+ /**
+ * enable latency feature for RX packets
+ * will be apply to all ports
+ */
+ void enable_latency();
+ void disable_latency();
private:
void handle_cp_msg(TrexStatelessCpToRxMsgBase *msg);
bool periodic_check_for_cp_messages();
void tickle();
void idle_state_loop();
- void handle_rx_pkt(CLatencyManagerPerPortStl * lp, rte_mbuf_t * m);
+
+ void recalculate_next_state();
+ bool are_any_features_active();
+
void capture_pkt(rte_mbuf_t *m);
void handle_rx_queue_msgs(uint8_t thread_id, CNodeRing * r);
- void flush_rx();
- int try_rx();
+ void handle_work_stage(bool do_try_rx_queue);
+
+ int process_all_pending_pkts(bool flush_rx = false);
+
+ void flush_all_pending_pkts() {
+ process_all_pending_pkts(true);
+ }
+
void try_rx_queues();
- bool is_flow_stat_id(uint32_t id);
- bool is_flow_stat_payload_id(uint32_t id);
- uint16_t get_hw_id(uint16_t id);
private:
- TrexMonitor m_monitor;
- uint32_t m_max_ports;
- bool m_capture;
- bool m_rcv_all;
- CLatencyManagerPerPortStl m_ports[TREX_MAX_PORTS];
- state_e m_state;
- CNodeRing *m_ring_from_cp;
- CNodeRing *m_ring_to_cp;
- CCpuUtlDp m_cpu_dp_u;
- CCpuUtlCp m_cpu_cp_u;
+ TrexMonitor m_monitor;
+ uint32_t m_max_ports;
+ bool m_capture;
+ state_e m_state;
+ CNodeRing *m_ring_from_cp;
+ CNodeRing *m_ring_to_cp;
+ CCpuUtlDp m_cpu_dp_u;
+ CCpuUtlCp m_cpu_cp_u;
+
// Used for acking "work" (go out of idle) messages from cp
volatile bool m_ack_start_work_msg __rte_cache_aligned;
+
CRxCoreErrCntrs m_err_cntrs;
CRFC2544Info m_rfc2544[MAX_FLOW_STATS_PAYLOAD];
+
+ RXPortManager m_rx_port_mngr[TREX_MAX_PORTS];
};
#endif
diff --git a/src/stateless/rx/trex_stateless_rx_defs.h b/src/stateless/rx/trex_stateless_rx_defs.h
new file mode 100644
index 00000000..7b1e0f32
--- /dev/null
+++ b/src/stateless/rx/trex_stateless_rx_defs.h
@@ -0,0 +1,157 @@
+/*
+ Itay Marom
+ Cisco Systems, Inc.
+*/
+
+/*
+ Copyright (c) 2016-2016 Cisco Systems, Inc.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+#ifndef __TREX_STATELESS_RX_DEFS_H__
+#define __TREX_STATELESS_RX_DEFS_H__
+
+#include "trex_defs.h"
+#include <json/json.h>
+
+class CPortLatencyHWBase;
+
+/**
+ * general SL cfg
+ *
+ */
+class CRxSlCfg {
+ public:
+ CRxSlCfg (){
+ m_max_ports = 0;
+ m_cps = 0.0;
+ }
+
+ public:
+ uint32_t m_max_ports;
+ double m_cps;
+ CPortLatencyHWBase * m_ports[TREX_MAX_PORTS];
+};
+
+/**
+ * describes the filter type applied to the RX
+ * RX_FILTER_MODE_HW - only hardware filtered traffic will
+ * reach the RX core
+ *
+ */
+typedef enum rx_filter_mode_ {
+ RX_FILTER_MODE_HW,
+ RX_FILTER_MODE_ALL
+} rx_filter_mode_e;
+
+/**
+ * holds RX capture info
+ *
+ */
+class RXCaptureInfo {
+public:
+ RXCaptureInfo() {
+ m_is_active = false;
+ m_limit = 0;
+ m_shared_counter = 0;
+ }
+
+ void enable(const std::string &pcap_filename, uint64_t limit) {
+ m_pcap_filename = pcap_filename;
+ m_limit = limit;
+ m_is_active = true;
+ }
+
+ void disable() {
+ m_is_active = false;
+ m_pcap_filename = "";
+ m_limit = 0;
+ }
+
+ bool is_empty() const {
+ return (m_shared_counter == 0);
+ }
+
+ void to_json(Json::Value &output) const {
+ output["is_active"] = m_is_active;
+ if (m_is_active) {
+ output["pcap_filename"] = m_pcap_filename;
+ output["limit"] = Json::UInt64(m_limit);
+ output["count"] = Json::UInt64(m_shared_counter);
+ }
+ }
+
+public:
+ bool m_is_active;
+ std::string m_pcap_filename;
+ uint64_t m_limit;
+ uint64_t m_shared_counter;
+};
+
+
+class RXQueueInfo {
+public:
+
+ RXQueueInfo() {
+ m_is_active = false;
+ m_shared_counter = 0;
+ }
+
+ void enable(uint64_t size) {
+ m_size = size;
+ m_is_active = true;
+ }
+
+ void disable() {
+ m_is_active = false;
+ m_size = 0;
+ }
+
+ bool is_empty() const {
+ return (m_shared_counter == 0);
+ }
+
+ void to_json(Json::Value &output) const {
+ output["is_active"] = m_is_active;
+ if (m_is_active) {
+ output["size"] = Json::UInt64(m_size);
+ output["count"] = Json::UInt64(m_shared_counter);
+ }
+ }
+
+public:
+ bool m_is_active;
+ uint64_t m_size;
+ uint64_t m_shared_counter;
+};
+
+
+/**
+ * holds all the RX features info
+ *
+ * @author imarom (11/7/2016)
+ */
+class RXFeaturesInfo {
+public:
+ RXCaptureInfo m_rx_capture_info;
+ RXQueueInfo m_rx_queue_info;
+
+ void to_json(Json::Value &msg) const {
+ m_rx_capture_info.to_json(msg["sniffer"]);
+ m_rx_queue_info.to_json(msg["queue"]);
+ }
+};
+
+#endif /* __TREX_STATELESS_RX_DEFS_H__ */
+
diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.cpp b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp
new file mode 100644
index 00000000..2683dbe1
--- /dev/null
+++ b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp
@@ -0,0 +1,253 @@
+/*
+ Itay Marom
+ Cisco Systems, Inc.
+*/
+
+/*
+ Copyright (c) 2016-2016 Cisco Systems, Inc.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+#include "bp_sim.h"
+#include "trex_stateless_rx_port_mngr.h"
+#include "common/captureFile.h"
+#include "trex_stateless_rx_core.h"
+
+/************************** latency feature ************/
+void RXLatency::handle_pkt(const rte_mbuf_t *m) {
+ CFlowStatParser parser;
+
+ if (m_rcv_all || parser.parse(rte_pktmbuf_mtod(m, uint8_t *), m->pkt_len) == 0) {
+ uint32_t ip_id;
+ if (m_rcv_all || (parser.get_ip_id(ip_id) == 0)) {
+ if (m_rcv_all || is_flow_stat_id(ip_id)) {
+ uint16_t hw_id;
+ if (m_rcv_all || is_flow_stat_payload_id(ip_id)) {
+ bool good_packet = true;
+ uint8_t *p = rte_pktmbuf_mtod(m, uint8_t*);
+ struct flow_stat_payload_header *fsp_head = (struct flow_stat_payload_header *)
+ (p + m->pkt_len - sizeof(struct flow_stat_payload_header));
+ hw_id = fsp_head->hw_id;
+ CRFC2544Info *curr_rfc2544;
+
+ if (unlikely(fsp_head->magic != FLOW_STAT_PAYLOAD_MAGIC) || hw_id >= MAX_FLOW_STATS_PAYLOAD) {
+ good_packet = false;
+ if (!m_rcv_all)
+ m_err_cntrs->m_bad_header++;
+ } else {
+ curr_rfc2544 = &m_rfc2544[hw_id];
+
+ if (fsp_head->flow_seq != curr_rfc2544->get_exp_flow_seq()) {
+ // bad flow seq num
+ // Might be the first packet of a new flow, packet from an old flow, or garbage.
+
+ if (fsp_head->flow_seq == curr_rfc2544->get_prev_flow_seq()) {
+ // packet from previous flow using this hw_id that arrived late
+ good_packet = false;
+ m_err_cntrs->m_old_flow++;
+ } else {
+ if (curr_rfc2544->no_flow_seq()) {
+ // first packet we see from this flow
+ good_packet = true;
+ curr_rfc2544->set_exp_flow_seq(fsp_head->flow_seq);
+ } else {
+ // garbage packet
+ good_packet = false;
+ m_err_cntrs->m_bad_header++;
+ }
+ }
+ }
+ }
+
+ if (good_packet) {
+ uint32_t pkt_seq = fsp_head->seq;
+ uint32_t exp_seq = curr_rfc2544->get_seq();
+ if (unlikely(pkt_seq != exp_seq)) {
+ if (pkt_seq < exp_seq) {
+ if (exp_seq - pkt_seq > 100000) {
+ // packet loss while we had wrap around
+ curr_rfc2544->inc_seq_err(pkt_seq - exp_seq);
+ curr_rfc2544->inc_seq_err_too_big();
+ curr_rfc2544->set_seq(pkt_seq + 1);
+ } else {
+ if (pkt_seq == (exp_seq - 1)) {
+ curr_rfc2544->inc_dup();
+ } else {
+ curr_rfc2544->inc_ooo();
+ // We thought it was lost, but it was just out of order
+ curr_rfc2544->dec_seq_err();
+ }
+ curr_rfc2544->inc_seq_err_too_low();
+ }
+ } else {
+ if (unlikely (pkt_seq - exp_seq > 100000)) {
+ // packet reorder while we had wrap around
+ if (pkt_seq == (exp_seq - 1)) {
+ curr_rfc2544->inc_dup();
+ } else {
+ curr_rfc2544->inc_ooo();
+ // We thought it was lost, but it was just out of order
+ curr_rfc2544->dec_seq_err();
+ }
+ curr_rfc2544->inc_seq_err_too_low();
+ } else {
+ // seq > curr_rfc2544->seq. Assuming lost packets
+ curr_rfc2544->inc_seq_err(pkt_seq - exp_seq);
+ curr_rfc2544->inc_seq_err_too_big();
+ curr_rfc2544->set_seq(pkt_seq + 1);
+ }
+ }
+ } else {
+ curr_rfc2544->set_seq(pkt_seq + 1);
+ }
+ m_rx_pg_stat_payload[hw_id].add_pkts(1);
+ m_rx_pg_stat_payload[hw_id].add_bytes(m->pkt_len + 4); // +4 for ethernet CRC
+ uint64_t d = (os_get_hr_tick_64() - fsp_head->time_stamp );
+ dsec_t ctime = ptime_convert_hr_dsec(d);
+ curr_rfc2544->add_sample(ctime);
+ }
+ } else {
+ hw_id = get_hw_id(ip_id);
+ if (hw_id < MAX_FLOW_STATS) {
+ m_rx_pg_stat[hw_id].add_pkts(1);
+ m_rx_pg_stat[hw_id].add_bytes(m->pkt_len + 4); // +4 for ethernet CRC
+ }
+ }
+ }
+ }
+ }
+}
+
+void
+RXLatency::reset_stats() {
+ for (int hw_id = 0; hw_id < MAX_FLOW_STATS; hw_id++) {
+ m_rx_pg_stat[hw_id].clear();
+ }
+}
+
+/****************************** packet recorder ****************************/
+
+RXPacketRecorder::RXPacketRecorder() {
+ m_writer = NULL;
+ m_shared_counter = NULL;
+ m_limit = 0;
+ m_epoch = -1;
+}
+
+RXPacketRecorder::~RXPacketRecorder() {
+ stop();
+}
+
+void
+RXPacketRecorder::start(const std::string &pcap, uint64_t limit, uint64_t *shared_counter) {
+ m_writer = CCapWriterFactory::CreateWriter(LIBPCAP, (char *)pcap.c_str());
+ if (m_writer == NULL) {
+ std::stringstream ss;
+ ss << "unable to create PCAP file: " << pcap;
+ throw TrexException(ss.str());
+ }
+
+ assert(limit > 0);
+ m_limit = limit;
+ m_shared_counter = shared_counter;
+ (*m_shared_counter) = 0;
+}
+
+void
+RXPacketRecorder::stop() {
+ if (m_writer) {
+ delete m_writer;
+ m_writer = NULL;
+ }
+}
+
+void
+RXPacketRecorder::handle_pkt(const rte_mbuf_t *m) {
+ if (!m_writer) {
+ return;
+ }
+
+ dsec_t now = now_sec();
+ if (m_epoch < 0) {
+ m_epoch = now;
+ }
+
+ dsec_t dt = now - m_epoch;
+
+ CPktNsecTimeStamp t_c(dt);
+ m_pkt.time_nsec = t_c.m_time_nsec;
+ m_pkt.time_sec = t_c.m_time_sec;
+
+ const uint8_t *p = rte_pktmbuf_mtod(m, uint8_t *);
+ m_pkt.pkt_len = m->pkt_len;
+ memcpy(m_pkt.raw, p, m->pkt_len);
+
+ m_writer->write_packet(&m_pkt);
+
+ m_limit--;
+ (*m_shared_counter)++;
+
+ if (m_limit == 0) {
+ stop();
+ }
+}
+
+
+void RXPortManager::handle_pkt(const rte_mbuf_t *m) {
+
+ /* handle features */
+
+ if (is_feature_set(LATENCY)) {
+ m_latency.handle_pkt(m);
+ }
+
+ if (is_feature_set(CAPTURE)) {
+ m_recorder.handle_pkt(m);
+ }
+
+ if (is_feature_set(QUEUE)) {
+ m_pkt_buffer->handle_pkt(m);
+ }
+}
+
+
+int RXPortManager::process_all_pending_pkts(bool flush_rx) {
+
+ rte_mbuf_t *rx_pkts[64];
+
+ /* try to read 64 packets clean up the queue */
+ uint16_t cnt_p = m_io->rx_burst(rx_pkts, 64);
+ if (cnt_p == 0) {
+ return cnt_p;
+ }
+
+
+ m_cpu_dp_u->start_work1();
+
+ for (int j = 0; j < cnt_p; j++) {
+ rte_mbuf_t *m = rx_pkts[j];
+
+ if (!flush_rx) {
+ handle_pkt(m);
+ }
+
+ rte_pktmbuf_free(m);
+ }
+
+ /* commit only if there was work to do ! */
+ m_cpu_dp_u->commit1();
+
+
+ return cnt_p;
+}
+
diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.h b/src/stateless/rx/trex_stateless_rx_port_mngr.h
new file mode 100644
index 00000000..aa8ba8e9
--- /dev/null
+++ b/src/stateless/rx/trex_stateless_rx_port_mngr.h
@@ -0,0 +1,411 @@
+/*
+ Itay Marom
+ Cisco Systems, Inc.
+*/
+
+/*
+ Copyright (c) 2016-2016 Cisco Systems, Inc.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+#ifndef __TREX_STATELESS_RX_PORT_MNGR_H__
+#define __TREX_STATELESS_RX_PORT_MNGR_H__
+
+#include <stdint.h>
+#include "common/base64.h"
+
+#include "common/captureFile.h"
+
+/************************* latency ***********************/
+
+class CPortLatencyHWBase;
+class CRFC2544Info;
+class CRxCoreErrCntrs;
+
+class RXLatency {
+public:
+
+ RXLatency() {
+ m_rcv_all = false;
+ m_rfc2544 = NULL;
+ m_err_cntrs = NULL;
+
+ for (int i = 0; i < MAX_FLOW_STATS; i++) {
+ m_rx_pg_stat[i].clear();
+ m_rx_pg_stat_payload[i].clear();
+ }
+ }
+
+ void create(CRFC2544Info *rfc2544, CRxCoreErrCntrs *err_cntrs) {
+ m_rfc2544 = rfc2544;
+ m_err_cntrs = err_cntrs;
+ }
+
+ void reset_stats();
+
+ void handle_pkt(const rte_mbuf_t *m);
+
+private:
+ bool is_flow_stat_id(uint32_t id) {
+ if ((id & 0x000fff00) == IP_ID_RESERVE_BASE) return true;
+ return false;
+ }
+
+ bool is_flow_stat_payload_id(uint32_t id) {
+ if (id == FLOW_STAT_PAYLOAD_IP_ID) return true;
+ return false;
+ }
+
+ uint16_t get_hw_id(uint16_t id) {
+ return (0x00ff & id);
+}
+
+public:
+
+ rx_per_flow_t m_rx_pg_stat[MAX_FLOW_STATS];
+ rx_per_flow_t m_rx_pg_stat_payload[MAX_FLOW_STATS_PAYLOAD];
+
+ bool m_rcv_all;
+ CRFC2544Info *m_rfc2544;
+ CRxCoreErrCntrs *m_err_cntrs;
+};
+
+/************************ queue ***************************/
+
+/**
+ * describes a single saved RX packet
+ *
+ */
+class RxPacket {
+public:
+
+ RxPacket(const rte_mbuf_t *m) {
+ /* assume single part packet */
+ assert(m->nb_segs == 1);
+
+ m_size = m->pkt_len;
+ const uint8_t *p = rte_pktmbuf_mtod(m, uint8_t *);
+
+ m_raw = (uint8_t *)malloc(m_size);
+ memcpy(m_raw, p, m_size);
+ }
+
+ /* RVO here - no performance impact */
+ const std::string to_base64_str() const {
+ return base64_encode(m_raw, m_size);
+ }
+
+ ~RxPacket() {
+ if (m_raw) {
+ delete m_raw;
+ }
+ }
+
+private:
+
+ uint8_t *m_raw;
+ uint16_t m_size;
+};
+
+/**
+ * a simple cyclic buffer to hold RX packets
+ *
+ */
+class RxPacketBuffer {
+public:
+
+ RxPacketBuffer(uint64_t size, uint64_t *shared_counter) {
+ m_buffer = nullptr;
+ m_head = 0;
+ m_tail = 0;
+ m_size = (size + 1); // for the empty/full difference 1 slot reserved
+ m_shared_counter = shared_counter;
+
+ *m_shared_counter = 0;
+
+ m_buffer = new RxPacket*[m_size](); // zeroed
+
+ m_is_enabled = true;
+ }
+
+ ~RxPacketBuffer() {
+ assert(m_buffer);
+
+ while (!is_empty()) {
+ RxPacket *pkt = pop();
+ delete pkt;
+ }
+ delete [] m_buffer;
+ }
+
+ /* freeze the data structure - no more packets can be pushed / poped */
+ RxPacketBuffer * freeze_and_clone() {
+ /* create a new one */
+ RxPacketBuffer *new_buffer = new RxPacketBuffer(m_size, m_shared_counter);
+
+ /* freeze the current */
+ m_shared_counter = NULL;
+ m_is_enabled = false;
+
+ return new_buffer;
+ }
+
+ bool is_empty() const {
+ return (m_head == m_tail);
+ }
+
+ bool is_full() const {
+ return ( next(m_head) == m_tail);
+ }
+
+ void handle_pkt(const rte_mbuf_t *m) {
+ assert(m_is_enabled);
+
+ /* if full - pop the oldest */
+ if (is_full()) {
+ delete pop();
+ }
+
+ (*m_shared_counter)++;
+
+ m_buffer[m_head] = new RxPacket(m);
+ m_head = next(m_head);
+ }
+
+ /**
+ * generate a JSON output of the queue
+ *
+ */
+ Json::Value to_json() const {
+
+ Json::Value output = Json::arrayValue;
+
+ int tmp = m_tail;
+ while (tmp != m_head) {
+ RxPacket *pkt = m_buffer[tmp];
+ output.append(pkt->to_base64_str());
+ tmp = next(tmp);
+ }
+
+ return output;
+ }
+
+private:
+ int next(int v) const {
+ return ( (v + 1) % m_size );
+ }
+
+ /* pop in case of full queue - internal usage */
+ RxPacket * pop() {
+ assert(m_is_enabled);
+ assert(!is_empty());
+
+ RxPacket *pkt = m_buffer[m_tail];
+ m_tail = next(m_tail);
+ (*m_shared_counter)--;
+ return pkt;
+ }
+
+ int m_head;
+ int m_tail;
+ int m_size;
+ RxPacket **m_buffer;
+ uint64_t *m_shared_counter;
+ bool m_is_enabled;
+};
+
+/************************ recoder ***************************/
+
+/**
+ * RX packet recorder to PCAP file
+ *
+ */
+class RXPacketRecorder {
+public:
+ RXPacketRecorder();
+ ~RXPacketRecorder();
+ void start(const std::string &pcap, uint64_t limit, uint64_t *shared_counter);
+ void stop();
+ void handle_pkt(const rte_mbuf_t *m);
+
+private:
+ CFileWriterBase *m_writer;
+ CCapPktRaw m_pkt;
+ dsec_t m_epoch;
+ uint64_t m_limit;
+ uint64_t *m_shared_counter;
+};
+
+
+/************************ manager ***************************/
+
+/**
+ * per port RX features manager
+ *
+ * @author imarom (10/30/2016)
+ */
+class RXPortManager {
+public:
+ enum features_t {
+ LATENCY = 0x1,
+ CAPTURE = 0x2,
+ QUEUE = 0x4
+ };
+
+ RXPortManager() {
+ m_features = 0;
+ m_pkt_buffer = NULL;
+ m_io = NULL;
+ m_cpu_dp_u = NULL;
+ }
+
+ void create(CPortLatencyHWBase *io,
+ CRFC2544Info *rfc2544,
+ CRxCoreErrCntrs *err_cntrs,
+ CCpuUtlDp *cpu_util) {
+ m_io = io;
+ m_cpu_dp_u = cpu_util;
+ m_latency.create(rfc2544, err_cntrs);
+ }
+
+ void clear_stats() {
+ m_latency.reset_stats();
+ }
+
+ RXLatency & get_latency() {
+ return m_latency;
+ }
+
+ void enable_latency() {
+ set_feature(LATENCY);
+ }
+
+ void disable_latency() {
+ unset_feature(LATENCY);
+ }
+
+ /**
+ * capturing of RX packets
+ *
+ * @author imarom (11/2/2016)
+ *
+ * @param pcap
+ * @param limit_pkts
+ */
+ void start_capture(const std::string &pcap, uint64_t limit_pkts, uint64_t *shared_counter) {
+ m_recorder.start(pcap, limit_pkts, shared_counter);
+ set_feature(CAPTURE);
+ }
+
+ void stop_capture() {
+ m_recorder.stop();
+ unset_feature(CAPTURE);
+ }
+
+ /**
+ * queueing packets
+ *
+ */
+ void start_queue(uint32_t size, uint64_t *shared_counter) {
+ if (m_pkt_buffer) {
+ delete m_pkt_buffer;
+ }
+ m_pkt_buffer = new RxPacketBuffer(size, shared_counter);
+ set_feature(QUEUE);
+ }
+
+ void stop_queue() {
+ if (m_pkt_buffer) {
+ delete m_pkt_buffer;
+ m_pkt_buffer = NULL;
+ }
+ unset_feature(QUEUE);
+ }
+
+ RxPacketBuffer *get_pkt_buffer() {
+ if (!is_feature_set(QUEUE)) {
+ return NULL;
+ }
+
+ assert(m_pkt_buffer);
+
+ /* hold a pointer to the old one */
+ RxPacketBuffer *old_buffer = m_pkt_buffer;
+
+ /* replace the old one with a new one and freeze the old */
+ m_pkt_buffer = old_buffer->freeze_and_clone();
+
+ return old_buffer;
+ }
+
+
+ /**
+ * fetch and process all packets
+ *
+ */
+ int process_all_pending_pkts(bool flush_rx = false);
+
+
+ /**
+ * flush all pending packets without processing them
+ *
+ */
+ void flush_all_pending_pkts() {
+ process_all_pending_pkts(true);
+ }
+
+
+ /**
+ * handle a single packet
+ *
+ */
+ void handle_pkt(const rte_mbuf_t *m);
+
+
+ bool has_features_set() {
+ return (m_features != 0);
+ }
+
+
+ bool no_features_set() {
+ return (!has_features_set());
+ }
+
+private:
+
+
+ void set_feature(features_t feature) {
+ m_features |= feature;
+ }
+
+ void unset_feature(features_t feature) {
+ m_features &= (~feature);
+ }
+
+ bool is_feature_set(features_t feature) {
+ return ( (m_features & feature) == feature );
+ }
+
+ uint32_t m_features;
+ RXPacketRecorder m_recorder;
+ RXLatency m_latency;
+ RxPacketBuffer *m_pkt_buffer;
+ CCpuUtlDp *m_cpu_dp_u;
+ CPortLatencyHWBase *m_io;
+};
+
+
+
+#endif /* __TREX_STATELESS_RX_PORT_MNGR_H__ */
+
diff --git a/src/trex_port_attr.cpp b/src/trex_port_attr.cpp
new file mode 100644
index 00000000..26199e33
--- /dev/null
+++ b/src/trex_port_attr.cpp
@@ -0,0 +1,77 @@
+/*
+Copyright (c) 2015-2015 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+#include "trex_port_attr.h"
+#include "bp_sim.h"
+
+const uint8_t DestAttr::g_dummy_mac[6] = {0x0,0x0,0x0,0x1,0x0,0x0};
+
+
+DestAttr::DestAttr(uint8_t port_id) {
+ m_port_id = port_id;
+
+ m_mac = CGlobalInfo::m_options.m_mac_addr[port_id].u.m_mac.dest;
+}
+
+const uint8_t *
+TRexPortAttr::get_src_mac() const {
+ return CGlobalInfo::m_options.get_src_mac_addr(m_port_id);
+}
+
+
+std::string
+TRexPortAttr::get_rx_filter_mode() const {
+ switch (m_rx_filter_mode) {
+ case RX_FILTER_MODE_ALL:
+ return "all";
+ case RX_FILTER_MODE_HW:
+ return "hw";
+ default:
+ assert(0);
+ }
+}
+
+
+void
+TRexPortAttr::to_json(Json::Value &output) {
+
+ output["src_mac"] = utl_macaddr_to_str(get_src_mac());
+ output["promiscuous"]["enabled"] = get_promiscuous();
+ output["link"]["up"] = is_link_up();
+ output["speed"] = get_link_speed();
+ output["rx_filter_mode"] = get_rx_filter_mode();
+
+ if (get_src_ipv4() != 0) {
+ output["src_ipv4"] = utl_uint32_to_ipv4(get_src_ipv4());
+ } else {
+ output["src_ipv4"] = "none";
+ }
+
+
+ int mode;
+ get_flow_ctrl(mode);
+ output["fc"]["mode"] = mode;
+
+ m_dest.to_json(output["dest"]);
+
+}
+
+void
+TRexPortAttr::update_src_dst_mac(uint8_t *raw_pkt) {
+ memcpy(raw_pkt, get_dest().get_dest_mac(), 6);
+ memcpy(raw_pkt + 6, get_src_mac(), 6);
+}
+
diff --git a/src/trex_port_attr.h b/src/trex_port_attr.h
index 9231e263..eb7c85de 100755
--- a/src/trex_port_attr.h
+++ b/src/trex_port_attr.h
@@ -21,10 +21,141 @@ limitations under the License.
#include <vector>
#include "rte_ethdev_includes.h"
#include "trex_defs.h"
+#include "common/basic_utils.h"
+#include <json/json.h>
+#include "trex_stateless_rx_defs.h"
+#include <string.h>
+
+/**
+ * destination port attribute
+ *
+ */
+class DestAttr {
+private:
+ static const uint8_t g_dummy_mac[6];
+public:
+
+ DestAttr(uint8_t port_id);
+
+ enum dest_type_e {
+ DEST_TYPE_IPV4 = 1,
+ DEST_TYPE_MAC = 2
+ };
+
+ /**
+ * set dest as an IPv4 unresolved
+ */
+ void set_dest_ipv4(uint32_t ipv4) {
+ assert(ipv4 != 0);
+
+ m_src_ipv4 = ipv4;
+ memset(m_mac, 0, 6);
+ m_type = DEST_TYPE_IPV4;
+ }
+
+ /**
+ * set dest as a resolved IPv4
+ */
+ void set_dest_ipv4(uint32_t ipv4, const uint8_t *mac) {
+ assert(ipv4 != 0);
+
+ m_src_ipv4 = ipv4;
+ memcpy(m_mac, mac, 6);
+ m_type = DEST_TYPE_IPV4;
+
+ }
+
+ /**
+ * dest dest as MAC
+ *
+ */
+ void set_dest_mac(const uint8_t *mac) {
+
+ m_src_ipv4 = 0;
+ memcpy(m_mac, mac, 6);
+ m_type = DEST_TYPE_MAC;
+ }
+
+
+ bool is_resolved() const {
+ if (m_type == DEST_TYPE_MAC) {
+ return true;
+ }
+
+ for (int i = 0; i < 6; i++) {
+ if (m_mac[i] != 0) {
+ return true;
+ }
+ }
+
+ /* all zeroes - non resolved */
+ return false;
+ }
+
+ /**
+ * get the dest mac
+ * if no MAC is configured and dest was not resolved
+ * will return a dummy
+ */
+ const uint8_t *get_dest_mac() {
+
+ if (is_resolved()) {
+ return m_mac;
+ } else {
+ return g_dummy_mac;
+ }
+ }
+
+ /**
+ * when link gets down - this should be called
+ *
+ */
+ void on_link_down() {
+ if (m_type == DEST_TYPE_IPV4) {
+ /* reset the IPv4 dest with no resolution */
+ set_dest_ipv4(m_src_ipv4);
+ }
+ }
+
+ void to_json(Json::Value &output) {
+ switch (m_type) {
+
+ case DEST_TYPE_IPV4:
+ output["type"] = "ipv4";
+ output["addr"] = utl_uint32_to_ipv4(m_src_ipv4);
+ if (is_resolved()) {
+ output["arp"] = utl_macaddr_to_str(m_mac);
+ } else {
+ output["arp"] = "none";
+ }
+ break;
+
+ case DEST_TYPE_MAC:
+ output["type"] = "mac";
+ output["addr"] = utl_macaddr_to_str(m_mac);
+ break;
+
+ default:
+ assert(0);
+ }
+
+ }
+
+private:
+ uint32_t m_src_ipv4;
+ uint8_t *m_mac;
+ dest_type_e m_type;
+ uint8_t m_port_id;
+};
class TRexPortAttr {
public:
+
+ TRexPortAttr(uint8_t port_id) : m_dest(port_id) {
+ m_src_ipv4 = 0;
+ }
+
virtual ~TRexPortAttr(){}
/* UPDATES */
@@ -33,10 +164,10 @@ public:
virtual void update_device_info() = 0;
virtual void reset_xstats() = 0;
virtual void update_description() = 0;
-
+
/* GETTERS */
virtual bool get_promiscuous() = 0;
- virtual void macaddr_get(struct ether_addr *mac_addr) = 0;
+ virtual void get_hw_src_mac(struct ether_addr *mac_addr) = 0;
virtual uint32_t get_link_speed() { return m_link.link_speed; } // L1 Mbps
virtual bool is_link_duplex() { return (m_link.link_duplex ? true : false); }
virtual bool is_link_autoneg() { return (m_link.link_autoneg ? true : false); }
@@ -51,24 +182,50 @@ public:
virtual void get_description(std::string &description) { description = intf_info_st.description; }
virtual void get_supported_speeds(supp_speeds_t &supp_speeds) = 0;
+ uint32_t get_src_ipv4() {return m_src_ipv4;}
+ DestAttr & get_dest() {return m_dest;}
+
+ const uint8_t *get_src_mac() const;
+ std::string get_rx_filter_mode() const;
+
+ /* for a raw packet, write the src/dst MACs */
+ void update_src_dst_mac(uint8_t *raw_pkt);
+
/* SETTERS */
virtual int set_promiscuous(bool enabled) = 0;
virtual int add_mac(char * mac) = 0;
virtual int set_link_up(bool up) = 0;
virtual int set_flow_ctrl(int mode) = 0;
virtual int set_led(bool on) = 0;
-
-/* DUMPS */
+ virtual int set_rx_filter_mode(rx_filter_mode_e mode) = 0;
+
+ void set_src_ipv4(uint32_t addr) {
+ m_src_ipv4 = addr;
+ }
+
+ /* DUMPS */
virtual void dump_link(FILE *fd) = 0;
+ /* dump object status to JSON */
+ void to_json(Json::Value &output);
+
+
protected:
- uint8_t m_port_id;
- rte_eth_link m_link;
- struct rte_eth_dev_info dev_info;
- bool flag_is_virtual;
- bool flag_is_fc_change_supported;
- bool flag_is_led_change_supported;
- bool flag_is_link_change_supported;
+
+ uint8_t m_port_id;
+ rte_eth_link m_link;
+ uint32_t m_src_ipv4;
+ DestAttr m_dest;
+
+ struct rte_eth_dev_info dev_info;
+
+ rx_filter_mode_e m_rx_filter_mode;
+
+ bool flag_is_virtual;
+ bool flag_is_fc_change_supported;
+ bool flag_is_led_change_supported;
+ bool flag_is_link_change_supported;
+
struct intf_info_st {
std::string pci_addr;
@@ -81,8 +238,11 @@ protected:
class DpdkTRexPortAttr : public TRexPortAttr {
public:
- DpdkTRexPortAttr(uint8_t port_id, bool is_virtual, bool fc_change_allowed) {
+ DpdkTRexPortAttr(uint8_t port_id, bool is_virtual, bool fc_change_allowed) : TRexPortAttr(port_id) {
+
m_port_id = port_id;
+ m_rx_filter_mode = RX_FILTER_MODE_HW;
+
flag_is_virtual = is_virtual;
int tmp;
flag_is_fc_change_supported = fc_change_allowed && (get_flow_ctrl(tmp) != -ENOTSUP);
@@ -101,7 +261,7 @@ public:
/* GETTERS */
virtual bool get_promiscuous();
- virtual void macaddr_get(struct ether_addr *mac_addr);
+ virtual void get_hw_src_mac(struct ether_addr *mac_addr);
virtual int get_xstats_values(xstats_values_t &xstats_values);
virtual int get_xstats_names(xstats_names_t &xstats_names);
virtual int get_flow_ctrl(int &mode);
@@ -114,6 +274,7 @@ public:
virtual int set_flow_ctrl(int mode);
virtual int set_led(bool on);
+ virtual int set_rx_filter_mode(rx_filter_mode_e mode);
/* DUMPS */
virtual void dump_link(FILE *fd);
@@ -128,7 +289,7 @@ private:
class SimTRexPortAttr : public TRexPortAttr {
public:
- SimTRexPortAttr() {
+ SimTRexPortAttr() : TRexPortAttr(0) {
m_link.link_speed = 10000;
m_link.link_duplex = 1;
m_link.link_autoneg = 0;
@@ -146,7 +307,7 @@ public:
void reset_xstats() {}
void update_description() {}
bool get_promiscuous() { return false; }
- void macaddr_get(struct ether_addr *mac_addr) {}
+ void get_hw_src_mac(struct ether_addr *mac_addr) {}
int get_xstats_values(xstats_values_t &xstats_values) { return -ENOTSUP; }
int get_xstats_names(xstats_names_t &xstats_names) { return -ENOTSUP; }
int get_flow_ctrl(int &mode) { return -ENOTSUP; }
@@ -158,6 +319,7 @@ public:
int set_flow_ctrl(int mode) { return -ENOTSUP; }
int set_led(bool on) { return -ENOTSUP; }
void dump_link(FILE *fd) {}
+ int set_rx_filter_mode(rx_filter_mode_e mode) { return -ENOTSUP; }
};