diff options
-rw-r--r--scripts/automation/regression/setups/trex03/benchmark.yaml (renamed from scripts/automation/regression/setups/trex15/benchmark.yaml)0
-rw-r--r--scripts/automation/regression/setups/trex03/config.yaml (renamed from scripts/automation/regression/setups/trex15/config.yaml)2
-rw-r--r--scripts/automation/regression/setups/trex06/benchmark.yaml (renamed from scripts/automation/regression/setups/trex17/benchmark.yaml)2
-rw-r--r--scripts/automation/regression/setups/trex06/config.yaml (renamed from scripts/automation/regression/setups/trex17/config.yaml)2
74 files changed, 4738 insertions, 985 deletions
diff --git a/doc/ b/doc/
index e1210772..10619532 100755
--- a/doc/
+++ b/doc/
@@ -38,7 +38,7 @@ def initialize_analyticsreporting():
return analytics
-def get_report(analytics,start_date='2016-11-06',end_date='2016-11-27'):
+def get_report(analytics,start_date='2016-11-27',end_date='2016-11-27'):
# Use the Analytics Service Object to query the Analytics Reporting API V4.
return analytics.reports().batchGet(
@@ -80,31 +80,9 @@ def print_response(response):
print metricHeader.get('name') + ': ' + value
-def export_to_dict(response):
- df = {'Test_name':[],'State':[],'Setup':[],'Test_type':[],'MPPS':[],'MPPS-Golden min':[],'MPPS-Golden max':[]}
- for report in response.get('reports', []):
- rows = report.get('data', {}).get('rows', [])
- for row in rows:
- dimensions = row.get('dimensions', [])
- # print 'this is dimensions'
- # print dimensions
- df['Test_name'].append(dimensions[1])
- df['State'].append(dimensions[2])
- df['Setup'].append(dimensions[3])
- df['Test_type'].append(dimensions[4])
- dateRangeValues = row.get('metrics', [])
- value = dateRangeValues[0].get('values',[])[0]
- golden_min = dateRangeValues[0].get('values',[])[1]
- golden_max = dateRangeValues[0].get('values',[])[2]
- # print value
- df['MPPS'].append(value)
- df['MPPS-Golden min'].append(golden_min)
- df['MPPS-Golden max'].append(golden_max)
- return df
def export_to_tuples(response):
- counter = 0
+ # counter = 0
setups = set()
df = {}
for report in response.get('reports', []):
@@ -125,7 +103,6 @@ def export_to_tuples(response):
- counter+=1
if dimensions[3] in setups:
if dimensions[1] in df[dimensions[3]]:
@@ -135,7 +112,7 @@ def export_to_tuples(response):
df[dimensions[3]] = {}
df[dimensions[3]][dimensions[1]] = [tuple(data)]
- print 'counter is: %d' % counter
+ # print 'counter is: %d' % counter
return df, setups
@@ -143,9 +120,72 @@ def main():
analytics = initialize_analyticsreporting()
response = get_report(analytics)
df, setups = export_to_tuples(response)
+ # pprint(df)
+ return df,setups
- #pprint(response)
if __name__ == '__main__':
- main()
+ main()
+response structure (when fetched with "export to tuples"):
+{ 'setup1': {'test_name1': [(test_res1),(test_res2),...],
+ 'test_name2': [(test_res1),(test_res2),...]
+ },
+ 'setup2': {'test_name1': [(test_res1),(test_res2),...],
+ 'test_name2': [(test_res1),(test_res2),...]
+ },
+ .
+ .
+ .
+ .
+{u'kiwi02': {u'VM - 64 bytes, multi CPU, cache size 1024': [(u'VM - 64 bytes, multi CPU, cache size 1024',
+ u'stl',
+ u'performance',
+ u'19.711146',
+ u'19.0',
+ u'22.0'),
+ (u'VM - 64 bytes, multi CPU, cache size 1024',
+ u'stl',
+ u'performance',
+ u'19.581567',
+ u'19.0',
+ u'22.0')],
+ u'VM - 64 bytes, multi CPUs': [(u'VM - 64 bytes, multi CPUs',
+ u'stl',
+ u'performance',
+ u'10.398847',
+ u'9.7',
+ u'12.5'),
+ (u'VM - 64 bytes, multi CPUs',
+ u'stl',
+ u'performance',
+ u'10.925308',
+ u'9.7',
+ u'12.5')
+ ]
+ }
+ u'trex07': {u'VM - 64 bytes, multi CPU, cache size 1024': [(u'VM - 64 bytes, multi CPU, cache size 1024',
+ u'stl',
+ u'performance',
+ u'25.078212',
+ u'9.0',
+ u'15.0')
+ ]
+ u'VM - 64 bytes, multi CPUs': [(u'VM - 64 bytes, multi CPUs',
+ u'stl',
+ u'performance',
+ u'9.469138',
+ u'8.5',
+ u'12.0')
+ ]
+ }
diff --git a/doc/ b/doc/
index 182d8367..bd4a9a2b 100755
--- a/doc/
+++ b/doc/
@@ -3,6 +3,7 @@ import sys
import AnalyticsConnect as ac
import TRexDataAnalysis as tr
import time
+import datetime
def main(verbose = False):
@@ -10,12 +11,14 @@ def main(verbose = False):
print('Retrieving data from Google Analytics')
analytics = ac.initialize_analyticsreporting()
current_date = time.strftime("%Y-%m-%d")
- response = ac.get_report(analytics, '2016-11-06', current_date)
+ k_days_ago = - datetime.timedelta(days=15)
+ start_date = str(
+ response = ac.get_report(analytics, start_date, current_date)
ga_all_data_dict, setups = ac.export_to_tuples(response)
dest_path = os.path.join(os.getcwd(), 'build', 'images')
if verbose:
print('Saving data to %s' % dest_path)
- tr.create_all_data(ga_all_data_dict, setups, '2016-11-06', current_date, save_path = dest_path,
+ tr.create_all_data(ga_all_data_dict, setups, start_date, current_date, save_path = dest_path,
if verbose:
print('Done without errors.')
diff --git a/doc/ b/doc/
index f8cefc19..fb855a16 100755
--- a/doc/
+++ b/doc/
@@ -42,7 +42,7 @@ def generate_dframe_arr_and_stats_of_tests_per_setup(date, setup_name, setup_dic
def create_plot_for_dframe_arr(dframe_arr, setup_name, start_date, end_date, show='no', save_path='',
- file_name='trend_graph'):
+ file_name='_trend_graph'):
dframe_all = pd.concat(dframe_arr, axis=1)
dframe_all = dframe_all.astype(float)
diff --git a/doc/trex-analytics-howto.asciidoc b/doc/trex-analytics-howto.asciidoc
new file mode 100755
index 00000000..6ad6c216
--- /dev/null
+++ b/doc/trex-analytics-howto.asciidoc
@@ -0,0 +1,58 @@
+TRex Analytics How-To Guide
+:local_web_server_url: csi-wiki-01:8181/trex
+:toclevels: 6
+:tabledef-default.subs: normal,callouts
+// PDF version - image width variable
+:p_width: 450
+= Requirements
+Google Analytics Integration: Google API Python client library: link:[here] +
+Data Analysis: using NumPy, MatPlotLib and Pandas from Anaconda 4.2.0 link:[here]
+= Setup fetch-analysis-publishing routine
+== Build an Analytic Environment
+1. Create a Google Analytics account and property using this link:[link]
+2. Using the Google Analytics tracking guide, send test results to your property. link:[here]
+3. Set up your account to properly fetch the data from Google Analytics, using this guide: link:[here]
+== Fetch and organize
+1. Fetch the data into dictionary of this structure: +
+{ 'setup1': {'test_name1': [(test_res1),(test_res2),...],
+ 'test_name2': [(test_res1),(test_res2),...]
+ },
+ 'setup2': {'test_name1': [(test_res1),(test_res2),...],
+ 'test_name2': [(test_res1),(test_res2),...]
+ },
+ .
+ .
+ .
+ .
+test_res should maintain this structure:
+('VM - 64 bytes, multi CPU, cache size 1024','stl','performance','19.711146','19.0','22.0')
+== Analize and generate plots and data tables
+Use the script to create the plots and data tables for your test results: +
+1. run create_all_data with the entire data dictionary that was fetched, to create plots and tables for each setup provided in the dictionary. +
+provide a "save_path" to save the graphs and plots to your desired location +
+== Build the Document
diff --git a/doc/trex_analytics.asciidoc b/doc/trex_analytics.asciidoc
index 72d4876e..35c3a3e4 100755
--- a/doc/trex_analytics.asciidoc
+++ b/doc/trex_analytics.asciidoc
@@ -42,7 +42,7 @@ include::build/images/trex07_latest_test_runs_stats.csv[]
=== Trend: Analysis Over Time
-image:images/trex07trend_graph.png[title="trex07trend_graph",align="left",width={p_width}, link="images/trex07trend_graph.png"]
+image:images/trex07_trend_graph.png[title="trex07_trend_graph",align="left",width={p_width}, link="images/trex07_trend_graph.png"]
[format="csv", options="header",halign='center']
@@ -68,7 +68,7 @@ include::build/images/trex08_latest_test_runs_stats.csv[]
=== Trend: Analysis Over Time
-image:images/trex08trend_graph.png[title="trex08trend_graph",align="left",width={p_width}, link="images/trex08trend_graph.png"]
+image:images/trex08_trend_graph.png[title="trex08_trend_graph",align="left",width={p_width}, link="images/trex08_trend_graph.png"]
[format="csv", options="header",halign='center']
@@ -96,7 +96,7 @@ include::build/images/trex09_latest_test_runs_stats.csv[]
=== Trend: Analysis Over Time
-image:images/trex09trend_graph.png[title="trex09trend_graph",align="left",width={p_width}, link="images/trex09trend_graph.png"]
+image:images/trex09_trend_graph.png[title="trex09_trend_graph",align="left",width={p_width}, link="images/trex09_trend_graph.png"]
[format="csv", options="header",halign='center']
@@ -120,7 +120,7 @@ include::build/images/trex11_latest_test_runs_stats.csv[]
=== Trend: Analysis Over Time
-image:images/trex11trend_graph.png[title="trex11trend_graph",align="left",width={p_width}, link="images/trex11trend_graph.png"]
+image:images/trex11_trend_graph.png[title="trex11_trend_graph",align="left",width={p_width}, link="images/trex11_trend_graph.png"]
[format="csv", options="header",halign='center']
@@ -148,7 +148,7 @@ include::build/images/kiwi02_latest_test_runs_stats.csv[]
=== Trend: Analysis Over Time
-image:images/kiwi02trend_graph.png[title="kiwi02trend_graph",align="left",width={p_width}, link="images/kiwi02trend_graph.png"]
+image:images/kiwi02_trend_graph.png[title="kiwi02_trend_graph",align="left",width={p_width}, link="images/kiwi02_trend_graph.png"]
[format="csv", options="header",halign='center']
diff --git a/doc/trex_book.asciidoc b/doc/trex_book.asciidoc
index ec661908..ad924360 100755
--- a/doc/trex_book.asciidoc
+++ b/doc/trex_book.asciidoc
@@ -869,30 +869,28 @@ asr1k(config)#ipv6 route 5000::/64 3001::2
=== Client clustering configuration
-TRex supports testing complex topologies, using a feature called "client clustering".
-This feature allows more detailed clustering of clients.
+TRex supports testing complex topologies, with more than one DUT, using a feature called "client clustering".
+This feature allows specifying the distribution of clients TRex emulates.
Let's look at the following topology:
.Topology Example
image:images/topology.png[title="Client Clustering",width=850]
+We have two clusters of DUTs.
+Using config file, you can partition TRex emulated clients to groups, and define
+how they will be spread between the DUT clusters.
-We would like to configure two clusters and direct traffic to them.
-Using config file, you can instruct TRex to generate clients
-with specific configuration per cluster.
-Cluster configuration includes:
+Group configuration includes:
* IP start range.
* IP end range.
-* Initiator side configuration.
-* Responder side configuration.
+* Initiator side configuration. - These are the parameters affecting packets sent from client side.
+* Responder side configuration. - These are the parameters affecting packets sent from server side.
It is important to understand that this is *complimentary* to the client generator
-configured per profile - it only defines how the generator will be clustered.
+configured per profile - it only defines how the clients will be spread between clusters.
Let's look at an example.
@@ -908,11 +906,7 @@ $cat cap2/dns.yaml
clients_end : ""
servers_start : ""
servers_end : ""
- clients_per_gb : 201
- min_clients : 101
dual_port_mask : ""
- tcp_aging : 1
- udp_aging : 1
cap_info :
- name: cap2/dns.pcap
cps : 1.0
@@ -921,8 +915,10 @@ $cat cap2/dns.yaml
w : 1
-We want to create two clusters with 4 devices each.
-We also want to divide *80%* of the traffic to the upper cluster and *20%* to the lower cluster.
+We want to create two clusters with 4 and 3 devices respectively.
+We also want to send *80%* of the traffic to the upper cluster and *20%* to the lower cluster.
+We can specify to which DUT the packet will be sent by MAC address or IP. We will present a MAC
+based example, and then see how to change to be IP based.
We will create the following cluster configuration file.
@@ -938,13 +934,11 @@ We will create the following cluster configuration file.
# 'groups' - each client group must contain range of IPs
# and initiator and responder section
-# 'count' represents the number of different MACs
-# addresses in the group.
-# initiator and responder can contain 'vlan', 'src_mac', 'dst_mac'
+# 'count' represents the number of different DUTs
+# in the group.
-# each group contains a double way VLAN configuration
+# 'true' means each group must contain VLAN configuration. 'false' means no VLAN config allowed.
vlan: true
@@ -956,7 +950,7 @@ groups:
dst_mac : "00:00:00:01:00:00"
responder :
vlan : 200
- dst_mac : "00:00:00:01:00:00"
+ dst_mac : "00:00:00:02:00:00"
count : 4
@@ -964,43 +958,74 @@ groups:
ip_end :
initiator :
vlan : 101
- dst_mac : "01:00:00:00:01:01"
+ dst_mac : "00:00:01:00:00:00"
vlan : 201
- dst_mac : "01:00:00:00:02:01"
+ dst_mac : "00:00:02:00:00:00"
- count : 4
+ count : 3
-The above configuration will divide the generator range of 255 clients to two clusters,
-each with 4 devices and VLAN in both directions.
+The above configuration will divide the generator range of 255 clients to two clusters. The range
+of IPs in all groups in the client config file together, must cover the entire range of client IPs
+from the traffic profile file.
-MACs will be allocated incrementaly, with a wrap around.
+MACs will be allocated incrementally, with a wrap around after count addresses.
-*Initiator side:*
+*Initiator side: (packets with source in 16.x.x.x net)*
-* --> dst_mac: 00:00:00:01:00:00 valn: 100
-* --> dst_mac: 00:00:00:01:00:01 valn: 100
-* --> dst_mac: 00:00:00:01:00:02 valn: 100
-* --> dst_mac: 00:00:00:01:00:03 valn: 100
-* --> dst_mac: 00:00:00:01:00:00 valn: 100
-* --> dst_mac: 00:00:00:01:00:01 valn: 100
+* -> 48.x.x.x - dst_mac: 00:00:00:01:00:00 vlan: 100
+* -> 48.x.x.x - dst_mac: 00:00:00:01:00:01 vlan: 100
+* -> 48.x.x.x - dst_mac: 00:00:00:01:00:02 vlan: 100
+* -> 48.x.x.x - dst_mac: 00:00:00:01:00:03 vlan: 100
+* -> 48.x.x.x - dst_mac: 00:00:00:01:00:00 vlan: 100
+* -> 48.x.x.x - dst_mac: 00:00:00:01:00:01 vlan: 100
-*responder side:*
+*responder side: (packets with source in 48.x.x.x net)*
+* 48.x.x.x -> - dst_mac(from responder) : "00:00:00:02:00:00" , vlan:200
+* 48.x.x.x -> - dst_mac(from responder) : "00:00:00:02:00:01" , vlan:200
+and so on. +
+This means that the DUT MACs of each cluster has to be changed to be sequential. Other option is to
+specify instead of ``dst_mac'', ip address, using ``next_hop''. +
+For example, config file first group will look like:
-* server -> dst_mac(from responder) : "01:00:00:00:02:01" , valn:201
-* server -> dst_mac(from responder) : "01:00:00:00:02:02" , valn:201
+- ip_start :
+ ip_end :
+ initiator :
+ vlan : 100
+ next_hop :
+ src_ip :
+ responder :
+ vlan : 200
+ next_hop :
+ src_ip :
-and so on.
+ count : 4
+In this case, TRex will try to resolve using ARP requests the addresses
+,,, (and the range If not all IPs are resolved,
+TRex will exit with an error message. ``src_ip'' will be used for sending gratitues ARP, and
+for filling relevant fields in ARP request. If no ``src_ip'' given, TRex will look for source
+IP in the relevant port section in the platform config file (/etc/trex_cfg.yaml). If none is found, TRex
+will exit with an error message. +
+If client config file is given, the ``dest_mac'' and ``default_gw'' parameters from the platform config
+file are ignored.
-With this model every client (e.g. will always have the same path, e.g.
-c->s side will always have initiator VLAN and init-destination MAC and in the response side (s->c) alway responder-VLAN and responder-MAC
+It is important to understand that the ip to MAC coupling (both with MAC based config or IP based)
+is done at the beginning and never changes. Meaning, for example, for the MAC case, packets
+with source IP will always have VLAN 100 and dst MAC 00:00:00:01:00:01.
+Packets with destination IP will always have VLAN 200 and dst MAC "00:00:00:02:00:01.
+This way, you can predict exactly which packet (and how many packets) will go to each DUT.
diff --git a/doc/ b/doc/
index 58f6e98f..9ba1d295 100755
--- a/doc/
+++ b/doc/
@@ -987,6 +987,9 @@ def build(bld):
source='trex_scapy_rpc_server.asciidoc waf.css', target='trex_scapy_rpc_server.html',scan=ascii_doc_scan);
+ bld(rule=convert_to_html_toc_book,
+ source='trex-analytics-howto.asciidoc waf.css', target='trex-analytics-howto.html',scan=ascii_doc_scan);
bld(rule='${ASCIIDOC} -a stylesheet=${SRC[1].abspath()} -a icons=true -a toc2 -a max-width=55em -o ${TGT} ${SRC[0].abspath()}',
source='vm_doc.asciidoc waf.css', target='vm_doc.html', scan=ascii_doc_scan)
@@ -1073,16 +1076,51 @@ def release(bld):
os.system('cp -rv build/release_notes.* '+ release_dir)
+def rsync_int(bld, src, dst):
+ cmd = 'rsync -av --del --rsh=ssh build/{src} {host}:{dir}/{dst}'.format(
+ src = src,
+ host = Env().get_local_web_server(),
+ dir = Env().get_remote_release_path() + '../doc',
+ dst = dst)
+ ret = os.system(cmd)
+ if ret:
+ bld.fatal("cmd '%s' exited with return status" % (cmd, ret))
+def rsync_ext(bld, src, dst):
+ cmd = 'rsync -avz --del -e "ssh -i {key}" --rsync-path=/usr/bin/rsync build/{src} {user}@{host}:{dir}/doc/{dst}'.format(
+ key = Env().get_trex_ex_web_key(),
+ src = src,
+ user = Env().get_trex_ex_web_user(),
+ host = Env().get_trex_ex_web_srv(),
+ dir = Env().get_trex_ex_web_path(),
+ dst = dst)
+ ret = os.system(cmd)
+ if ret:
+ bld.fatal("cmd '%s' exited with return status" % (cmd, ret))
def publish(bld):
- # copy all the files to our web server
- remote_dir = "%s:%s" % ( Env().get_local_web_server(), Env().get_remote_release_path ()+'../doc/')
- os.system('rsync -av --del --rsh=ssh build/ %s' % (remote_dir))
+ # copy all the files to internal web server
+ rsync_int(bld, '', '')
def publish_ext(bld):
- from_ = 'build/'
- os.system('rsync -avz --del -e "ssh -i %s" --rsync-path=/usr/bin/rsync %s %s@%s:%s/doc/' % (Env().get_trex_ex_web_key(),from_, Env().get_trex_ex_web_user(),Env().get_trex_ex_web_srv(),Env().get_trex_ex_web_path() ) )
+ # copy all the files to external web server
+ rsync_ext(bld, '', '')
+def publish_perf(bld):
+ # copy performance files to internal and external servers
+ rsync_int(bld, 'trex_analytics.html', '')
+ rsync_ext(bld, 'trex_analytics.html', '')
+ rsync_int(bld, 'images/*_latest_test_*', 'images/')
+ rsync_ext(bld, 'images/*_latest_test_*', 'images/')
+ rsync_int(bld, 'images/*_trend_graph.*', 'images/')
+ rsync_ext(bld, 'images/*_trend_graph.*', 'images/')
+ rsync_int(bld, 'images/*_trend_stats.*', 'images/')
+ rsync_ext(bld, 'images/*_trend_stats.*', 'images/')
def publish_test(bld):
# copy all the files to our web server
diff --git a/doc/wscript b/doc/wscript
index 187a5528..f574e69b 100755
--- a/doc/wscript
+++ b/doc/wscript
@@ -52,6 +52,9 @@ def publish_ext(bld):
def publish_web(bld):
+def publish_perf(bld):
+ ws_main.publish_perf(bld)
def sync(bld):
diff --git a/linux/ b/linux/
index c989bb50..31d6b979 100755
--- a/linux/
+++ b/linux/
@@ -121,6 +121,7 @@ main_src = SrcGroup(dir='src',
+ 'trex_port_attr.cpp',
@@ -169,6 +170,8 @@ stateless_src = SrcGroup(dir='src/stateless/',
+ 'rx/trex_stateless_rx_core.cpp',
+ 'rx/trex_stateless_rx_port_mngr.cpp'
# RPC code
rpc_server_src = SrcGroup(dir='src/rpc-server/',
diff --git a/linux_dpdk/ b/linux_dpdk/
index 28a8f4c1..fa427fee 100755
--- a/linux_dpdk/
+++ b/linux_dpdk/
@@ -13,6 +13,7 @@ import re
import uuid
import subprocess
import platform
+from waflib import Logs
# these variables are mandatory ('/' are converted automatically)
top = '../'
@@ -108,8 +109,12 @@ def check_ibverbs_deps(bld):
if ret or not out:
bld.fatal("Command of checking libraries '%s' failed.\nReturn status: %s\nOutput: %s" % (cmd, ret, out))
if '=> not found' in out:
+ Logs.pprint('YELLOW', 'Could not find dependency libraries of')
+ for line in out.splitlines():
+ if '=> not found' in line:
+ Logs.pprint('YELLOW', line)
dumy_libs_path = os.path.abspath(top + 'scripts/dumy_libs')
- print('Adding rpath %s' % dumy_libs_path)
+ Logs.pprint('YELLOW', 'Adding rpath of %s' % dumy_libs_path)
@@ -134,6 +139,10 @@ def configure(conf):
conf.check_cxx(lib = 'z', errmsg = missing_pkg_msg(fedora = 'zlib-devel', ubuntu = 'zlib1g-dev'))
+ try:
+ conf.check_cxx(lib = 'ibverbs', errmsg = 'Could not find library ibverbs, will try internal version.')
+ except:
+ pass
def getstatusoutput(cmd):
@@ -177,6 +186,7 @@ main_src = SrcGroup(dir='src',
+ 'trex_port_attr.cpp',
@@ -247,7 +257,8 @@ stateless_src = SrcGroup(dir='src/stateless/',
- 'rx/trex_stateless_rx_core.cpp'
+ 'rx/trex_stateless_rx_core.cpp',
+ 'rx/trex_stateless_rx_port_mngr.cpp'
# JSON package
json_src = SrcGroup(dir='external_libs/json',
@@ -832,12 +843,15 @@ def post_build(bld):
def build(bld):
- check_ibverbs_deps(bld)
bld.read_shlib( name='zmq' , paths=[top+zmq_lib_path] )
- ibverbs_lib_path='external_libs/ibverbs/'
- bld.read_shlib( name='ibverbs' , paths=[top+ibverbs_lib_path] )
+ if bld.env['LIB_IBVERBS']:
+ bld.read_shlib(name='ibverbs')
+ else:
+ ibverbs_lib_path='external_libs/ibverbs/'
+ bld.read_shlib( name='ibverbs' , paths=[top+ibverbs_lib_path] )
+ check_ibverbs_deps(bld)
for obj in build_types:
diff --git a/scripts/automation/regression/cfg/client_cfg.yaml b/scripts/automation/regression/cfg/client_cfg.yaml
new file mode 100644
index 00000000..5c3d19ef
--- /dev/null
+++ b/scripts/automation/regression/cfg/client_cfg.yaml
@@ -0,0 +1,48 @@
+#vlan: true
+vlan: false
+- ip_start :
+ ip_end :
+ initiator :
+ next_hop:
+ src_ip :
+ responder :
+ next_hop:
+ src_ip :
+ count : 1
+- ip_start :
+ ip_end :
+ initiator :
+ next_hop:
+ src_ip :
+ responder :
+ next_hop:
+ src_ip :
+ count : 1
+- ip_start :
+ ip_end :
+ initiator :
+ next_hop:
+ src_ip :
+ responder :
+ next_hop:
+ src_ip :
+ count : 1
+- ip_start :
+ ip_end :
+ initiator :
+ next_hop:
+ src_ip :
+ responder :
+ next_hop:
+ src_ip :
+ count : 1
diff --git a/scripts/automation/regression/setups/trex15/benchmark.yaml b/scripts/automation/regression/setups/trex03/benchmark.yaml
index b366b3fb..b366b3fb 100644
--- a/scripts/automation/regression/setups/trex15/benchmark.yaml
+++ b/scripts/automation/regression/setups/trex03/benchmark.yaml
diff --git a/scripts/automation/regression/setups/trex15/config.yaml b/scripts/automation/regression/setups/trex03/config.yaml
index c5fc3b22..17c4c91e 100644
--- a/scripts/automation/regression/setups/trex15/config.yaml
+++ b/scripts/automation/regression/setups/trex03/config.yaml
@@ -34,6 +34,6 @@
# expected_bw - the "golden" bandwidth (in Gbps) results planned on receiving from the test
- hostname : csi-trex-15
+ hostname : csi-trex-03
cores : 1
modes : [loopback, virt_nics, VM]
diff --git a/scripts/automation/regression/setups/trex17/benchmark.yaml b/scripts/automation/regression/setups/trex06/benchmark.yaml
index 8bc9d29c..2f51a3fd 100644
--- a/scripts/automation/regression/setups/trex17/benchmark.yaml
+++ b/scripts/automation/regression/setups/trex06/benchmark.yaml
@@ -140,7 +140,7 @@ test_CPU_benchmark:
bw_per_core : 1
- name : stl/
- kwargs : {ipg_usec: 2, loop_count: 0}
+ kwargs : {ipg_usec: 3, loop_count: 0}
cpu_util : 1
bw_per_core : 1
diff --git a/scripts/automation/regression/setups/trex17/config.yaml b/scripts/automation/regression/setups/trex06/config.yaml
index 7ad6a20a..da0eb2dd 100644
--- a/scripts/automation/regression/setups/trex17/config.yaml
+++ b/scripts/automation/regression/setups/trex06/config.yaml
@@ -34,6 +34,6 @@
# expected_bw - the "golden" bandwidth (in Gbps) results planned on receiving from the test
- hostname : csi-trex-17
+ hostname : csi-trex-06
cores : 1
modes : [loopback, virt_nics, VM]
diff --git a/scripts/automation/regression/setups/trex11/backup/benchmark.yaml b/scripts/automation/regression/setups/trex11/backup/benchmark.yaml
new file mode 100644
index 00000000..b366b3fb
--- /dev/null
+++ b/scripts/automation/regression/setups/trex11/backup/benchmark.yaml
@@ -0,0 +1,155 @@
+#### TRex benchmark configuration file ####
+### stateful ###
+ multiplier : 2.8
+ cores : 1
+ bw_per_core : 106.652
+ multiplier : 0.5
+ cores : 1
+ bw_per_core : 11.577
+ multiplier : 28
+ cores : 1
+ bw_per_core : 2.030
+ multiplier : 0.8
+ cores : 1
+ bw_per_core : 13.742
+### stateless ###
+ profiles:
+ - name : stl/
+ kwargs : {packet_len: 64}
+ cpu_util : 1
+ bw_per_core : 1
+ - name : stl/
+ kwargs : {packet_len: 64, stream_count: 10}
+ cpu_util : 1
+ bw_per_core : 1
+ - name : stl/
+ kwargs : {packet_len: 64, stream_count: 100}
+ cpu_util : 1
+ bw_per_core : 1
+# causes queue full
+# - name : stl/
+# kwargs : {packet_len: 64, stream_count: 1000}
+# cpu_util : 1
+# bw_per_core : 1
+ - name : stl/
+ kwargs : {packet_len: 128}
+ cpu_util : 1
+ bw_per_core : 1
+ - name : stl/
+ kwargs : {packet_len: 256}
+ cpu_util : 1
+ bw_per_core : 1
+ - name : stl/
+ kwargs : {packet_len: 512}
+ cpu_util : 1
+ bw_per_core : 1
+ - name : stl/
+ kwargs : {packet_len: 1500}
+ cpu_util : 1
+ bw_per_core : 1
+ - name : stl/
+ kwargs : {packet_len: 4000}
+ cpu_util : 1
+ bw_per_core : 1
+ - name : stl/
+ kwargs : {packet_len: 9000}
+ cpu_util : 1
+ bw_per_core : 1
+ - name : stl/
+ kwargs : {packet_len: 9000, stream_count: 10}
+ cpu_util : 1
+ bw_per_core : 1
+ - name : stl/
+ kwargs : {packet_len: 9000, stream_count: 100}
+ cpu_util : 1
+ bw_per_core : 1
+# not enough memory + queue full if memory increase
+# - name : stl/
+# kwargs : {packet_len: 9000, stream_count: 1000}
+# cpu_util : 1
+# bw_per_core : 1
+ - name : stl/
+ cpu_util : 1
+ bw_per_core : 1
+ - name : stl/
+ kwargs : {packet_len: 64}
+ cpu_util : 1
+ bw_per_core : 1
+ - name : stl/
+ kwargs : {packet_len: 128}
+ cpu_util : 1
+ bw_per_core : 1
+ - name : stl/
+ kwargs : {packet_len: 256}
+ cpu_util : 1
+ bw_per_core : 1
+ - name : stl/
+ kwargs : {packet_len: 512}
+ cpu_util : 1
+ bw_per_core : 1
+ - name : stl/
+ kwargs : {packet_len: 1500}
+ cpu_util : 1
+ bw_per_core : 1
+ - name : stl/
+ kwargs : {packet_len: 4000}
+ cpu_util : 1
+ bw_per_core : 1
+ - name : stl/
+ kwargs : {packet_len: 9000}
+ cpu_util : 1
+ bw_per_core : 1
+ - name : stl/
+ kwargs : {ipg_usec: 4, loop_count: 0}
+ cpu_util : 1
+ bw_per_core : 1
+ - name : stl/
+ cpu_util : 1
+ bw_per_core : 1
+ - name : stl/hlt/
+ cpu_util : 1
+ bw_per_core : 1
diff --git a/scripts/automation/regression/setups/trex11/backup/config.yaml b/scripts/automation/regression/setups/trex11/backup/config.yaml
new file mode 100644
index 00000000..782b7542
--- /dev/null
+++ b/scripts/automation/regression/setups/trex11/backup/config.yaml
@@ -0,0 +1,38 @@
+#### TRex nightly test configuration file ####
+### TRex configuration:
+# hostname - can be DNS name or IP for the TRex machine for ssh to the box
+# password - root password for TRex machine
+# is_dual - should the TRex inject with -p ?
+# version_path - path to the TRex version and executable
+# cores - how many cores should be used
+# latency - rate of latency packets injected by the TRex
+# modes - list of modes (tagging) of this setup (loopback, virtual etc.)
+# * loopback - Trex works via loopback. Router and TFTP configurations may be skipped.
+# * virtual - virtual OS (accept low CPU utilization in tests)
+### Router configuration:
+# hostname - the router hostname as apears in ______# cli prefix
+# ip_address - the router's ip that can be used to communicate with
+# image - the desired imaged wished to be loaded as the router's running config
+# line_password - router password when access via Telent
+# en_password - router password when changing to "enable" mode
+# interfaces - an array of client-server pairs, representing the interfaces configurations of the router
+# configurations - an array of configurations that could possibly loaded into the router during the test.
+# The "clean" configuration is a mandatory configuration the router will load with to run the basic test bench
+### TFTP configuration:
+# hostname - the tftp hostname
+# ip_address - the tftp's ip address
+# images_path - the tftp's relative path in which the router's images are located
+### Test_misc configuration:
+# expected_bw - the "golden" bandwidth (in Gbps) results planned on receiving from the test
+ hostname : csi-trex-11
+ cores : 1
+ modes : ['loopback', 'VM', 'virt_nics']
diff --git a/scripts/automation/regression/setups/trex11/benchmark.yaml b/scripts/automation/regression/setups/trex11/benchmark.yaml
index b366b3fb..a4969d2d 100644
--- a/scripts/automation/regression/setups/trex11/benchmark.yaml
+++ b/scripts/automation/regression/setups/trex11/benchmark.yaml
@@ -1,31 +1,44 @@
#### TRex benchmark configuration file ####
-### stateful ###
+#### common templates ###
- multiplier : 2.8
- cores : 1
- bw_per_core : 106.652
+# multiplier : 2.8
+# cores : 1
+# bw_per_core : 962.464
multiplier : 0.5
cores : 1
- bw_per_core : 11.577
+ bw_per_core : 48.130
multiplier : 28
cores : 1
- bw_per_core : 2.030
+ bw_per_core : 12.699
- multiplier : 0.8
+ multiplier : 0.5
cores : 1
- bw_per_core : 13.742
+ bw_per_core : 50.561
+ multiplier : 0.5
+ cores : 1
+ bw_per_core : 19.5
+test_rx_check_http: &rx_http
+ multiplier : 1000
+ cores : 1
+ rx_sample_rate : 128
+ bw_per_core : 49.464
@@ -140,16 +153,21 @@ test_CPU_benchmark:
bw_per_core : 1
- name : stl/
- kwargs : {ipg_usec: 4, loop_count: 0}
+ kwargs : {ipg_usec: 2, loop_count: 0}
cpu_util : 1
bw_per_core : 1
- - name : stl/
- cpu_util : 1
- bw_per_core : 1
+ #- name : stl/
+ # cpu_util : 1
+ # bw_per_core : 1
- - name : stl/hlt/
- cpu_util : 1
- bw_per_core : 1
+ #- name : stl/hlt/
+ # cpu_util : 1
+ # bw_per_core : 1
+test_all_profiles :
+ mult : "5%"
+ skip : ['',''] # due to VIC 9K defect trex-282
diff --git a/scripts/automation/regression/setups/trex11/config.yaml b/scripts/automation/regression/setups/trex11/config.yaml
index 782b7542..393c8749 100644
--- a/scripts/automation/regression/setups/trex11/config.yaml
+++ b/scripts/automation/regression/setups/trex11/config.yaml
@@ -4,15 +4,16 @@
### TRex configuration:
-# hostname - can be DNS name or IP for the TRex machine for ssh to the box
-# password - root password for TRex machine
-# is_dual - should the TRex inject with -p ?
-# version_path - path to the TRex version and executable
-# cores - how many cores should be used
-# latency - rate of latency packets injected by the TRex
-# modes - list of modes (tagging) of this setup (loopback, virtual etc.)
-# * loopback - Trex works via loopback. Router and TFTP configurations may be skipped.
-# * virtual - virtual OS (accept low CPU utilization in tests)
+# hostname - can be DNS name or IP for the TRex machine for ssh to the box
+# password - root password for TRex machine
+# is_dual - should the TRex inject with -p ?
+# version_path - path to the TRex version and executable
+# cores - how many cores should be used
+# latency - rate of latency packets injected by the TRex
+# modes - list of modes (tagging) of this setup (loopback etc.)
+# * loopback - Trex works via loopback. Router and TFTP configurations may be skipped.
+# * VM - Virtual OS (accept low CPU utilization in tests, latency can get spikes)
+# * virt_nics - NICs are virtual (VMXNET3 etc.)
### Router configuration:
# hostname - the router hostname as apears in ______# cli prefix
@@ -35,4 +36,6 @@
hostname : csi-trex-11
cores : 1
- modes : ['loopback', 'VM', 'virt_nics']
+ modes : ['loopback']
diff --git a/scripts/automation/regression/stateful_tests/ b/scripts/automation/regression/stateful_tests/
new file mode 100644
index 00000000..852e745d
--- /dev/null
+++ b/scripts/automation/regression/stateful_tests/
@@ -0,0 +1,52 @@
+from .trex_general_test import CTRexGeneral_Test, CTRexScenario
+from CPlatform import CStaticRouteConfig
+from .tests_exceptions import *
+#import sys
+import time
+from import nottest
+# Testing client cfg ARP resolve. Actually, just need to check that TRex run finished with no errors.
+# If resolve will fail, TRex will exit with exit code != 0
+class CTRexClientCfg_Test(CTRexGeneral_Test):
+ """This class defines the IMIX testcase of the TRex traffic generator"""
+ def __init__(self, *args, **kwargs):
+ # super(CTRexClientCfg_Test, self).__init__()
+ CTRexGeneral_Test.__init__(self, *args, **kwargs)
+ def setUp(self):
+ if CTRexScenario.setup_name == 'kiwi02':
+ self.skip("Can't run currently on kiwi02")
+ super(CTRexClientCfg_Test, self).setUp() # launch super test class setUp process
+ pass
+ def test_client_cfg(self):
+ # test initializtion
+ if self.is_loopback:
+ return
+ else:
+ self.router.configure_basic_interfaces()
+ self.router.config_pbr(mode = "config")
+ ret = self.trex.start_trex(
+ c = 1,
+ m = 1,
+ d = 10,
+ f = 'cap2/dns.yaml',
+ v = 3,
+ client_cfg = 'automation/regression/cfg/client_cfg.yaml',
+ l = 1000)
+ trex_res = self.trex.sample_to_run_finish()
+ print(trex_res)
+ self.check_general_scenario_results(trex_res)
+ def tearDown(self):
+ CTRexGeneral_Test.tearDown(self)
+ pass
+if __name__ == "__main__":
+ pass
diff --git a/scripts/automation/regression/stateless_tests/ b/scripts/automation/regression/stateless_tests/
index acf5dc61..73dac734 100644
--- a/scripts/automation/regression/stateless_tests/
+++ b/scripts/automation/regression/stateless_tests/
@@ -241,6 +241,7 @@ class STLClient_Test(CStlGeneral_Test):
default_mult = self.get_benchmark_param('mult',default="30%")
+ skip_tests = self.get_benchmark_param('skip',default=[])
@@ -248,6 +249,17 @@ class STLClient_Test(CStlGeneral_Test):
for profile in self.profiles:
+ skip=False
+ if skip_tests:
+ for skip_test in skip_tests:
+ if skip_test in profile:
+ skip=True;
+ break;
+ if skip:
+ print("skipping testing profile due to config file {0}...\n".format(profile))
+ continue;
print("now testing profile {0}...\n".format(profile))
p1 = STLProfile.load(profile, port_id = self.tx_port)
diff --git a/scripts/automation/regression/stateless_tests/ b/scripts/automation/regression/stateless_tests/
index d28fca54..4dad712f 100644
--- a/scripts/automation/regression/stateless_tests/
+++ b/scripts/automation/regression/stateless_tests/
@@ -61,6 +61,13 @@ class STLRX_Test(CStlGeneral_Test):
'latency_9k_max_latency': 250,
+ 'rte_enic_pmd': {
+ 'rate_percent': 1,
+ 'total_pkts': 50,
+ 'rate_latency': 1,
+ 'latency_9k_enable': False,
+ },
diff --git a/scripts/automation/trex_control_plane/stf/trex_stf_lib/ b/scripts/automation/trex_control_plane/stf/trex_stf_lib/
index e9d2b8a0..5d992c6e 100755
--- a/scripts/automation/trex_control_plane/stf/trex_stf_lib/
+++ b/scripts/automation/trex_control_plane/stf/trex_stf_lib/
@@ -150,10 +150,8 @@ class CTRexClient(object):
user = user or self.__default_user
d = int(d)
- if d < 30 and not trex_development: # test duration should be at least 30 seconds, unless trex_development flag is specified.
- raise ValueError
except ValueError:
- raise ValueError('d parameter must be integer, specifying how long TRex run, and must be larger than 30 secs.')
+ raise ValueError('d parameter must be integer, specifying how long TRex run.')
trex_cmd_options.update( {'f' : f, 'd' : d} )
if not trex_cmd_options.get('l'):
diff --git a/scripts/automation/trex_control_plane/stl/console/ b/scripts/automation/trex_control_plane/stl/console/
index b23b5f1f..b33b0447 100755
--- a/scripts/automation/trex_control_plane/stl/console/
+++ b/scripts/automation/trex_control_plane/stl/console/
@@ -202,7 +202,7 @@ class TRexConsole(TRexGeneralCmd):
func_name = f.__name__
if func_name.startswith("do_"):
func_name = func_name[3:]
if not inst.stateless_client.is_connected():
print(format_text("\n'{0}' cannot be executed on offline mode\n".format(func_name), 'bold'))
@@ -313,6 +313,7 @@ class TRexConsole(TRexGeneralCmd):
def do_shell (self, line):
+ @verify_connected
def do_push (self, line):
'''Push a local PCAP file\n'''
@@ -320,6 +321,7 @@ class TRexConsole(TRexGeneralCmd):
def help_push (self):
+ @verify_connected
def do_portattr (self, line):
'''Change/show port(s) attributes\n'''
@@ -328,6 +330,22 @@ class TRexConsole(TRexGeneralCmd):
+ def do_set_rx_sniffer (self, line):
+ '''Sets a port sniffer on RX channel as PCAP recorder'''
+ self.stateless_client.set_rx_sniffer_line(line)
+ def help_sniffer (self):
+ self.do_set_rx_sniffer("-h")
+ @verify_connected
+ def do_resolve (self, line):
+ '''Resolve ARP for ports'''
+ self.stateless_client.resolve_line(line)
+ def help_sniffer (self):
+ self.do_resolve("-h")
+ @verify_connected
def do_map (self, line):
'''Maps ports topology\n'''
ports = self.stateless_client.get_acquired_ports()
@@ -416,6 +434,7 @@ class TRexConsole(TRexGeneralCmd):
'''Release ports\n'''
+ @verify_connected
def do_reacquire (self, line):
'''reacquire all the ports under your logged user name'''
diff --git a/scripts/automation/trex_control_plane/stl/examples/ b/scripts/automation/trex_control_plane/stl/examples/
new file mode 100644
index 00000000..22cceb8f
--- /dev/null
+++ b/scripts/automation/trex_control_plane/stl/examples/
@@ -0,0 +1,123 @@
+import stl_path
+from trex_stl_lib.api import *
+import imp
+import time
+import json
+from pprint import pprint
+import argparse
+# IMIX test
+# it maps the ports to sides
+# then it load a predefind profile 'IMIX'
+# and attach it to both sides and inject
+# at a certain rate for some time
+# finally it checks that all packets arrived
+def imix_test (server):
+ # create client
+ c = STLClient(server = server)
+ passed = True
+ try:
+ # connect to server
+ c.connect()
+ # take all the ports
+ c.reset()
+ dir_0 = [0]
+ dir_1 = [1]
+ print "Mapped ports to sides {0} <--> {1}".format(dir_0, dir_1)
+ # load IMIX profile
+ profile_file = os.path.join(stl_path.STL_PROFILES_PATH, '')
+ profile1 = STLProfile.load_py(profile_file, direction=0)
+ profile2 = STLProfile.load_py(profile_file, direction=1)
+ stream1 = profile1.get_streams()
+ stream2 = profile2.get_streams()
+ # add both streams to ports
+ c.add_streams(stream1, ports = dir_0)
+ c.add_streams(stream2, ports = dir_1)
+ # clear the stats before injecting
+ c.clear_stats()
+ c.start(ports = (dir_0 + dir_1), mult = "100kpps", total = True)
+ while True:
+ for rate in range(200,3100,10):
+ # choose rate and start traffic for 10 seconds on 5 mpps
+ #mult = "30%"
+ my_mult = ("%dkpps"%rate)
+ print "Injecting {0} <--> {1} on total rate of '{2}' ".format(dir_0, dir_1, my_mult)
+ c.clear_stats()
+ c.update(ports = (dir_0 + dir_1), mult = my_mult)
+ #time.sleep(1);
+ # block until done
+ #c.wait_on_traffic(ports = (dir_0 + dir_1))
+ # read the stats after the test
+ stats = c.get_stats()
+ # use this for debug info on all the stats
+ pprint(stats)
+ # sum dir 0
+ dir_0_opackets = sum([stats[i]["opackets"] for i in dir_0])
+ dir_0_ipackets = sum([stats[i]["ipackets"] for i in dir_0])
+ # sum dir 1
+ dir_1_opackets = sum([stats[i]["opackets"] for i in dir_1])
+ dir_1_ipackets = sum([stats[i]["ipackets"] for i in dir_1])
+ lost_0 = dir_0_opackets - dir_1_ipackets
+ lost_1 = dir_1_opackets - dir_0_ipackets
+ print "\nPackets injected from {0}: {1:,}".format(dir_0, dir_0_opackets)
+ print "Packets injected from {0}: {1:,}".format(dir_1, dir_1_opackets)
+ print "\npackets lost from {0} --> {1}: {2:,} pkts".format(dir_0, dir_1, lost_0)
+ print "packets lost from {0} --> {1}: {2:,} pkts".format(dir_1, dir_0, lost_1)
+ if (lost_0 <= 0) and (lost_1 <= 0): # less or equal because we might have incoming arps etc.
+ passed = True
+ else:
+ passed = False
+ except STLError as e:
+ passed = False
+ print e
+ finally:
+ c.disconnect()
+ if passed:
+ print "\nTest has passed :-)\n"
+ else:
+ print "\nTest has failed :-(\n"
+parser = argparse.ArgumentParser(description="Example for TRex Stateless, sending IMIX traffic")
+parser.add_argument('-s', '--server',
+ dest='server',
+ help='Remote trex address',
+ default='',
+ type = str)
+args = parser.parse_args()
+# run the tests
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/
index 6b53e67e..cc20e088 100755
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/
@@ -12,7 +12,7 @@ from .trex_stl_types import *
from .trex_stl_async_client import CTRexAsyncClient
from .utils import parsing_opts, text_tables, common
-from .utils.common import list_intersect, list_difference, is_sub_list, PassiveTimer
+from .utils.common import *
from .utils.text_opts import *
from functools import wraps
@@ -24,6 +24,8 @@ import re
import random
import json
import traceback
+import os.path
############################ logger #############################
############################ #############################
@@ -32,9 +34,10 @@ import traceback
# logger API for the client
class LoggerApi(object):
# verbose levels
def __init__(self):
self.level = LoggerApi.VERBOSE_REGULAR
@@ -62,7 +65,7 @@ class LoggerApi(object):
# simple log message with verbose
- def log (self, msg, level = VERBOSE_REGULAR, newline = True):
+ def log (self, msg, level = VERBOSE_REGULAR_SYNC, newline = True):
if not self.check_verbose(level):
@@ -90,19 +93,20 @@ class LoggerApi(object):
# supress object getter
- def supress (self):
+ def supress (self, level = VERBOSE_QUIET):
class Supress(object):
- def __init__ (self, logger):
+ def __init__ (self, logger, level):
self.logger = logger
+ self.level = level
def __enter__ (self):
self.saved_level = self.logger.get_verbose()
- self.logger.set_verbose(LoggerApi.VERBOSE_QUIET)
+ self.logger.set_verbose(self.level)
def __exit__ (self, type, value, traceback):
- return Supress(self)
+ return Supress(self, level)
@@ -322,22 +326,25 @@ class EventsHandler(object):
# port attr changed
elif (event_type == 8):
port_id = int(data['port_id'])
- if data['attr'] == self.client.ports[port_id].attr:
- return # false alarm
- old_info = self.client.ports[port_id].get_info()
- self.__async_event_port_attr_changed(port_id, data['attr'])
- new_info = self.client.ports[port_id].get_info()
+ diff = self.__async_event_port_attr_changed(port_id, data['attr'])
+ if not diff:
+ return
ev = "port {0} attributes changed".format(port_id)
- for key, old_val in old_info.items():
- new_val = new_info[key]
- if old_val != new_val:
- ev += '\n {key}: {old} -> {new}'.format(
- key = key,
- old = old_val.lower() if type(old_val) is str else old_val,
- new = new_val.lower() if type(new_val) is str else new_val)
+ for key, (old_val, new_val) in diff.items():
+ ev += '\n {key}: {old} -> {new}'.format(
+ key = key,
+ old = old_val.lower() if type(old_val) is str else old_val,
+ new = new_val.lower() if type(new_val) is str else new_val)
show_event = True
# server stopped
elif (event_type == 100):
ev = "Server has stopped"
@@ -394,7 +401,7 @@ class EventsHandler(object):
def __async_event_port_attr_changed (self, port_id, attr):
if port_id in self.client.ports:
- self.client.ports[port_id].async_event_port_attr_changed(attr)
+ return self.client.ports[port_id].async_event_port_attr_changed(attr)
# add event to log
def __add_event_log (self, origin, ev_type, msg, show = False):
@@ -652,7 +659,7 @@ class STLClient(object):
return rc
def __add_streams(self, stream_list, port_id_list = None):
port_id_list = self.__ports(port_id_list)
@@ -801,16 +808,75 @@ class STLClient(object):
return rc
+ def __resolve (self, port_id_list = None, retries = 0):
+ port_id_list = self.__ports(port_id_list)
+ rc = RC()
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].arp_resolve(retries))
+ return rc
def __set_port_attr (self, port_id_list = None, attr_dict = None):
port_id_list = self.__ports(port_id_list)
rc = RC()
+ for port_id, port_attr_dict in zip(port_id_list, attr_dict):
+ rc.add(self.ports[port_id].set_attr(**port_attr_dict))
+ return rc
+ def __set_rx_sniffer (self, port_id_list, base_filename, limit):
+ port_id_list = self.__ports(port_id_list)
+ rc = RC()
+ for port_id in port_id_list:
+ head, tail = os.path.splitext(base_filename)
+ filename = "{0}-{1}{2}".format(head, port_id, tail)
+ rc.add(self.ports[port_id].set_rx_sniffer(filename, limit))
+ return rc
+ def __remove_rx_sniffer (self, port_id_list):
+ port_id_list = self.__ports(port_id_list)
+ rc = RC()
for port_id in port_id_list:
- rc.add(self.ports[port_id].set_attr(attr_dict))
+ rc.add(self.ports[port_id].remove_rx_sniffer())
return rc
+ def __set_rx_queue (self, port_id_list, size):
+ port_id_list = self.__ports(port_id_list)
+ rc = RC()
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].set_rx_queue(size))
+ return rc
+ def __remove_rx_queue (self, port_id_list):
+ port_id_list = self.__ports(port_id_list)
+ rc = RC()
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].remove_rx_queue())
+ return rc
+ def __get_rx_queue_pkts (self, port_id_list):
+ port_id_list = self.__ports(port_id_list)
+ rc = RC()
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].get_rx_queue_pkts())
+ return rc
# connect to server
@@ -1010,7 +1076,7 @@ class STLClient(object):
if not port_id in valid_ports:
raise STLError("Port ID '{0}' is not a valid port ID - valid values: {1}".format(port_id, valid_ports))
- return port_id_list
+ return list_remove_dup(port_id_list)
# transmit request on the RPC link
@@ -1315,6 +1381,12 @@ class STLClient(object):
if port_obj.is_active()]
+ def get_resolvable_ports (self):
+ return [port_id
+ for port_id, port_obj in self.ports.items()
+ if port_obj.is_acquired() and port_obj.get_dst_addr()['ipv4'] is not None]
# get paused ports
def get_paused_ports (self, owned = True):
if owned:
@@ -1340,6 +1412,7 @@ class STLClient(object):
# get stats
+ @__api_check(True)
def get_stats (self, ports = None, sync_now = True):
Return dictionary containing statistics information gathered from the server.
@@ -1587,7 +1660,7 @@ class STLClient(object):
ports = ports if ports is not None else self.get_all_ports()
ports = self._validate_port_list(ports)
- return [self.ports[port_id].get_info() for port_id in ports]
+ return [self.ports[port_id].get_formatted_info() for port_id in ports]
############################ Commands #############################
@@ -1702,6 +1775,9 @@ class STLClient(object):
raise STLError(rc)
+ for port_id in ports:
+ if not self.ports[port_id].is_resolved():
+ self.logger.log(format_text('*** Warning - Port {0} destination is unresolved ***'.format(port_id), 'bold'))
def release (self, ports = None):
@@ -1727,30 +1803,80 @@ class STLClient(object):
if not rc:
raise STLError(rc)
- def ping(self):
+ def ping_rpc_server(self):
- Pings the server
+ Pings the RPC server
- None
+ None
+ :exc:`STLError`
self.logger.pre_cmd("Pinging the server on '{0}' port '{1}': ".format(self.connection_info['server'],
rc = self._transmit("ping", api_class = None)
if not rc:
raise STLError(rc)
+ def ping_ip (self, src_port, dst_ipv4, pkt_size = 64, count = 5):
+ """
+ Pings an IP address through a port
+ :parameters:
+ src_port - on which port_id to send the ICMP PING request
+ dst_ipv4 - which IP to ping
+ pkt_size - packet size to use
+ count - how many times to ping
+ :raises:
+ + :exc:`STLError`
+ """
+ # validate src port
+ validate_type('src_port', src_port, int)
+ if src_port not in self.get_all_ports():
+ raise STLError("src port is not a valid port id")
+ if not is_valid_ipv4(dst_ipv4):
+ raise STLError("dst_ipv4 is not a valid IPv4 address: '{0}'".format(dst_ipv4))
+ if (pkt_size < 64) or (pkt_size > 9216):
+ raise STLError("pkt_size should be a value between 64 and 9216: '{0}'".format(pkt_size))
+ validate_type('count', count, int)
+ self.logger.pre_cmd("Pinging {0} from port {1} with {2} bytes of data:".format(dst_ipv4,
+ src_port,
+ pkt_size))
+ # no async messages
+ with self.logger.supress(level = LoggerApi.VERBOSE_REGULAR_SYNC):
+ self.logger.log('')
+ for i in range(count):
+ rc = self.ports[src_port].ping(ping_ipv4 = dst_ipv4, pkt_size = pkt_size)
+ if not rc:
+ raise STLError(rc)
+ self.logger.log(
+ if i != (count - 1):
+ time.sleep(1)
+ @__api_check(True)
def server_shutdown (self, force = False):
Sends the server a request for total shutdown
@@ -1835,7 +1961,7 @@ class STLClient(object):
- def reset(self, ports = None):
+ def reset(self, ports = None, restart = False):
Force acquire ports, stop the traffic, remove all streams and clear stats
@@ -1843,7 +1969,9 @@ class STLClient(object):
ports : list
Ports on which to execute the command
+ restart: bool
+ Restart the NICs (link down / up)
+ :exc:`STLError`
@@ -1853,12 +1981,34 @@ class STLClient(object):
ports = ports if ports is not None else self.get_all_ports()
ports = self._validate_port_list(ports)
- # force take the port and ignore any streams on it
- self.acquire(ports, force = True, sync_streams = False)
- self.stop(ports, rx_delay_ms = 0)
- self.remove_all_streams(ports)
- self.clear_stats(ports)
+ if restart:
+ self.logger.pre_cmd("Hard resetting ports {0}:".format(ports))
+ else:
+ self.logger.pre_cmd("Resetting ports {0}:".format(ports))
+ try:
+ with self.logger.supress():
+ # force take the port and ignore any streams on it
+ self.acquire(ports, force = True, sync_streams = False)
+ self.stop(ports, rx_delay_ms = 0)
+ self.remove_all_streams(ports)
+ self.clear_stats(ports)
+ self.set_port_attr(ports,
+ promiscuous = False,
+ link_up = True if restart else None,
+ rxf = 'hw')
+ self.remove_rx_sniffer(ports)
+ self.remove_rx_queue(ports)
+ except STLError as e:
+ self.logger.post_cmd(False)
+ raise e
+ self.logger.post_cmd(RC_OK())
def remove_all_streams (self, ports = None):
@@ -1997,7 +2147,25 @@ class STLClient(object):
raise STLError(rc)
+ # common checks for start API
+ def __pre_start_check (self, ports, force):
+ # verify link status
+ ports_link_down = [port_id for port_id in ports if not self.ports[port_id].is_up()]
+ if ports_link_down and not force:
+ raise STLError("Port(s) %s - link DOWN - check the connection or specify 'force'" % ports_link_down)
+ # verify ports are stopped or force stop them
+ active_ports = [port_id for port_id in ports if self.ports[port_id].is_active()]
+ if active_ports and not force:
+ raise STLError("Port(s) {0} are active - please stop them or specify 'force'".format(active_ports))
+ # warn if ports are not resolved
+ unresolved_ports = [port_id for port_id in ports if not self.ports[port_id].is_resolved()]
+ if unresolved_ports and not force:
+ raise STLError("Port(s) {0} have unresolved destination addresses - please resolve them or specify 'force'".format(unresolved_ports))
def start (self,
ports = None,
@@ -2052,11 +2220,10 @@ class STLClient(object):
validate_type('total', total, bool)
validate_type('core_mask', core_mask, (int, list))
- # verify link status
- ports_link_down = [port_id for port_id in ports if self.ports[port_id].attr.get('link',{}).get('up') == False]
- if not force and ports_link_down:
- raise STLError("Port(s) %s - link DOWN - check the connection or specify 'force'" % ports_link_down)
+ # some sanity checks before attempting start
+ self.__pre_start_check(ports, force)
# decode core mask argument
decoded_mask = self.__decode_core_mask(ports, core_mask)
@@ -2070,17 +2237,12 @@ class STLClient(object):
raise STLArgumentError('mult', mult)
- # verify ports are stopped or force stop them
+ # stop active ports if needed
active_ports = list(set(self.get_active_ports()).intersection(ports))
- if active_ports:
- if not force:
- raise STLError("Port(s) {0} are active - please stop them or specify 'force'".format(active_ports))
- else:
- rc = self.stop(active_ports)
- if not rc:
- raise STLError(rc)
+ if active_ports and force:
+ self.stop(active_ports)
# start traffic
self.logger.pre_cmd("Starting traffic on port(s) {0}:".format(ports))
rc = self.__start(mult_obj, duration, ports, force, decoded_mask)
@@ -2117,7 +2279,7 @@ class STLClient(object):
ports = self._validate_port_list(ports)
self.logger.pre_cmd("Stopping traffic on port(s) {0}:".format(ports))
rc = self.__stop(ports)
@@ -2629,16 +2791,28 @@ class STLClient(object):
- def set_port_attr (self, ports = None, promiscuous = None, link_up = None, led_on = None, flow_ctrl = None):
+ def set_port_attr (self,
+ ports = None,
+ promiscuous = None,
+ link_up = None,
+ led_on = None,
+ flow_ctrl = None,
+ rxf = None,
+ ipv4 = None,
+ dest = None,
+ resolve = True):
Set port attributes
- promiscuous - True or False
- link_up - True or False
- led_on - True or False
- flow_ctrl - 0: disable all, 1: enable tx side, 2: enable rx side, 3: full enable
+ promiscuous - True or False
+ link_up - True or False
+ led_on - True or False
+ flow_ctrl - 0: disable all, 1: enable tx side, 2: enable rx side, 3: full enable
+ rxf - 'hw' for hardware rules matching packets only or 'all' all packets
+ ipv4 - configure IPv4 address for port(s). for multiple ports should be a list of IPv4 addresses in the same length of the ports array
+ dest - configure destination address for port(s) in either IPv4 or MAC format. for multiple ports should be a list in the same length of the ports array
+ resolve - if true, in case a destination address is configured as IPv4 try to resolve it
+ :exe:'STLError'
@@ -2652,29 +2826,229 @@ class STLClient(object):
validate_type('link_up', link_up, (bool, type(None)))
validate_type('led_on', led_on, (bool, type(None)))
validate_type('flow_ctrl', flow_ctrl, (int, type(None)))
- # build attributes
- attr_dict = {}
- if promiscuous is not None:
- attr_dict['promiscuous'] = {'enabled': promiscuous}
- if link_up is not None:
- attr_dict['link_status'] = {'up': link_up}
- if led_on is not None:
- attr_dict['led_status'] = {'on': led_on}
- if flow_ctrl is not None:
- attr_dict['flow_ctrl_mode'] = {'mode': flow_ctrl}
+ validate_choice('rxf', rxf, ['hw', 'all'])
+ # common attributes for all ports
+ cmn_attr_dict = {}
+ cmn_attr_dict['promiscuous'] = promiscuous
+ cmn_attr_dict['link_status'] = link_up
+ cmn_attr_dict['led_status'] = led_on
+ cmn_attr_dict['flow_ctrl_mode'] = flow_ctrl
+ cmn_attr_dict['rx_filter_mode'] = rxf
+ # each port starts with a set of the common attributes
+ attr_dict = [dict(cmn_attr_dict) for _ in ports]
+ # default value for IPv4 / dest is none for all ports
+ if ipv4 is None:
+ ipv4 = [None] * len(ports)
+ if dest is None:
+ dest = [None] * len(ports)
+ ipv4 = listify(ipv4)
+ if len(ipv4) != len(ports):
+ raise STLError("'ipv4' must be a list in the same length of ports - 'ports': {0}, 'ip': {1}".format(ports, ipv4))
+ dest = listify(dest)
+ if len(dest) != len(ports):
+ raise STLError("'dest' must be a list in the same length of ports - 'ports': {0}, 'dest': {1}".format(ports, dest))
+ # update each port attribute with ipv4
+ for addr, port_attr in zip(ipv4, attr_dict):
+ port_attr['ipv4'] = addr
+ # update each port attribute with dest
+ for addr, port_attr in zip(dest, attr_dict):
+ port_attr['dest'] = addr
- # no attributes to set
- if not attr_dict:
- return
self.logger.pre_cmd("Applying attributes on port(s) {0}:".format(ports))
rc = self.__set_port_attr(ports, attr_dict)
+ if not rc:
+ raise STLError(rc)
+ # automatic resolve
+ if resolve:
+ # find any port with a dest configured as IPv4
+ resolve_ports = [port_id for port_id, port_dest in zip(ports, dest) if is_valid_ipv4(port_dest)]
+ if resolve_ports:
+ self.resolve(ports = resolve_ports)
+ @__api_check(True)
+ def resolve (self, ports = None, retries = 0):
+ """
+ Resolves ports (ARP resolution)
+ :parameters:
+ ports - for which ports to apply a unique sniffer (each port gets a unique file)
+ retires - how many times to retry on each port (intervals of 100 milliseconds)
+ :raises:
+ + :exe:'STLError'
+ """
+ # by default - resolve all the ports that are configured with IPv4 dest
+ ports = ports if ports is not None else self.get_resolvable_ports()
+ ports = self._validate_port_list(ports)
+ active_ports = list_intersect(ports, self.get_active_ports())
+ if active_ports:
+ raise STLError('Port(s) {0} are active, please stop them before resolving'.format(active_ports))
+ self.logger.pre_cmd('Resolving destination on port(s) {0}:'.format(ports))
+ with self.logger.supress():
+ rc = self.__resolve(ports, retries)
+ self.logger.post_cmd(rc)
+ if not rc:
+ raise STLError(rc)
+ # print the ARP transaction
+ self.logger.log(rc)
+ self.logger.log('')
+ @__api_check(True)
+ def set_rx_sniffer (self, ports = None, base_filename = 'rx.pcap', limit = 1000):
+ """
+ Sets a RX sniffer for port(s) written to a PCAP file
+ :parameters:
+ ports - for which ports to apply a unique sniffer (each port gets a unique file)
+ base_filename - filename will be appended with '-<port_number>', e.g. rx.pcap --> rx-0.pcap, rx-1.pcap etc.
+ limit - limit how many packets will be written
+ :raises:
+ + :exe:'STLError'
+ """
+ ports = ports if ports is not None else self.get_acquired_ports()
+ ports = self._validate_port_list(ports)
+ # check arguments
+ validate_type('base_filename', base_filename, basestring)
+ validate_type('limit', limit, (int))
+ if limit <= 0:
+ raise STLError("'limit' must be a positive value")
+ self.logger.pre_cmd("Setting RX sniffers on port(s) {0}:".format(ports))
+ rc = self.__set_rx_sniffer(ports, base_filename, limit)
+ self.logger.post_cmd(rc)
+ if not rc:
+ raise STLError(rc)
+ @__api_check(True)
+ def remove_rx_sniffer (self, ports = None):
+ """
+ Removes RX sniffer from port(s)
+ :raises:
+ + :exe:'STLError'
+ """
+ ports = ports if ports is not None else self.get_acquired_ports()
+ ports = self._validate_port_list(ports)
+ self.logger.pre_cmd("Removing RX sniffers on port(s) {0}:".format(ports))
+ rc = self.__remove_rx_sniffer(ports)
+ self.logger.post_cmd(rc)
+ if not rc:
+ raise STLError(rc)
+ @__api_check(True)
+ def set_rx_queue (self, ports = None, size = 1000):
+ """
+ Sets RX queue for port(s)
+ The queue is cyclic and will hold last 'size' packets
+ :parameters:
+ ports - for which ports to apply a queue
+ size - size of the queue
+ :raises:
+ + :exe:'STLError'
+ """
+ ports = ports if ports is not None else self.get_acquired_ports()
+ ports = self._validate_port_list(ports)
+ # check arguments
+ validate_type('size', size, (int))
+ if size <= 0:
+ raise STLError("'size' must be a positive value")
+ self.logger.pre_cmd("Setting RX queue on port(s) {0}:".format(ports))
+ rc = self.__set_rx_queue(ports, size)
+ self.logger.post_cmd(rc)
+ if not rc:
+ raise STLError(rc)
+ @__api_check(True)
+ def remove_rx_queue (self, ports = None):
+ """
+ Removes RX queue from port(s)
+ :parameters:
+ ports - for which ports to remove the RX queue
+ :raises:
+ + :exe:'STLError'
+ """
+ ports = ports if ports is not None else self.get_acquired_ports()
+ ports = self._validate_port_list(ports)
+ self.logger.pre_cmd("Removing RX queue on port(s) {0}:".format(ports))
+ rc = self.__remove_rx_queue(ports)
+ self.logger.post_cmd(rc)
if not rc:
raise STLError(rc)
+ @__api_check(True)
+ def get_rx_queue_pkts (self, ports = None):
+ """
+ Returns any packets queued on the RX side by the server
+ return value is a dictonary per port
+ :parameters:
+ ports - for which ports to fetch
+ """
+ ports = ports if ports is not None else self.get_acquired_ports()
+ ports = self._validate_port_list(ports)
+ rc = self.__get_rx_queue_pkts(ports)
+ if not rc:
+ raise STLError(rc)
+ # decode the data back to the user
+ result = {}
+ for port, r in zip(ports,
+ result[port] = r
+ return result
def clear_events (self):
Clear all events
@@ -2718,10 +3092,29 @@ class STLClient(object):
def ping_line (self, line):
- '''pings the server'''
- return RC_OK()
+ '''pings the server / specific IP'''
+ # no parameters - so ping server
+ if not line:
+ self.ping_rpc_server()
+ return True
+ parser = parsing_opts.gen_parser(self,
+ "ping",
+ self.ping_line.__doc__,
+ parsing_opts.SOURCE_PORT,
+ parsing_opts.PING_IPV4,
+ parsing_opts.PKT_SIZE,
+ parsing_opts.PING_COUNT)
+ opts = parser.parse_args(line.split())
+ if not opts:
+ return opts
+ # IP ping
+ self.ping_ip(opts.source_port, opts.ping_ipv4, opts.pkt_size, opts.count)
def shutdown_line (self, line):
'''shutdown the server'''
@@ -2849,13 +3242,14 @@ class STLClient(object):
parser = parsing_opts.gen_parser(self,
- parsing_opts.PORT_LIST_WITH_ALL)
+ parsing_opts.PORT_LIST_WITH_ALL,
+ parsing_opts.PORT_RESTART)
opts = parser.parse_args(line.split(), default_ports = self.get_acquired_ports(), verify_acquired = True)
if not opts:
return opts
- self.reset(ports = opts.ports)
+ self.reset(ports = opts.ports, restart = opts.restart)
return RC_OK()
@@ -2891,14 +3285,19 @@ class STLClient(object):
# just for sanity - will be checked on the API as well
self.__decode_core_mask(opts.ports, core_mask)
+ # for better use experience - check this first
+ try:
+ self.__pre_start_check(opts.ports, opts.force)
+ except STLError as e:
+ msg = e.brief()
+ self.logger.log(format_text(msg, 'bold'))
+ return RC_ERR(msg)
+ # stop ports if needed
active_ports = list_intersect(self.get_active_ports(), opts.ports)
- if active_ports:
- if not opts.force:
- msg = "Port(s) {0} are active - please stop them or add '--force'\n".format(active_ports)
- self.logger.log(format_text(msg, 'bold'))
- return RC_ERR(msg)
- else:
- self.stop(active_ports)
+ if active_ports and opts.force:
+ self.stop(active_ports)
# process tunables
@@ -3240,7 +3639,7 @@ class STLClient(object):
'''Sets port attributes '''
parser = parsing_opts.gen_parser(self,
- "port_attr",
+ "portattr",
@@ -3248,24 +3647,27 @@ class STLClient(object):
+ parsing_opts.RX_FILTER_MODE,
+ parsing_opts.IPV4,
+ parsing_opts.DEST
opts = parser.parse_args(line.split(), default_ports = self.get_acquired_ports(), verify_acquired = True)
if not opts:
return opts
- opts.prom = parsing_opts.ON_OFF_DICT.get(opts.prom)
- = parsing_opts.UP_DOWN_DICT.get(
- opts.led = parsing_opts.ON_OFF_DICT.get(opts.led)
- opts.flow_ctrl = parsing_opts.FLOW_CTRL_DICT.get(opts.flow_ctrl)
+ opts.prom = parsing_opts.ON_OFF_DICT.get(opts.prom)
+ = parsing_opts.UP_DOWN_DICT.get(
+ opts.led = parsing_opts.ON_OFF_DICT.get(opts.led)
+ opts.flow_ctrl = parsing_opts.FLOW_CTRL_DICT.get(opts.flow_ctrl)
# if no attributes - fall back to printing the status
- if not filter(lambda x:x is not None, [opts.prom,, opts.led, opts.flow_ctrl, opts.supp]):
+ if not list(filter(lambda x:x is not None, [opts.prom,, opts.led, opts.flow_ctrl, opts.supp, opts.rx_filter_mode, opts.ipv4, opts.dest])):
self.show_stats_line("--ps --port {0}".format(' '.join(str(port) for port in opts.ports)))
if opts.supp:
- info = self.ports[0].get_info() # assume for now all ports are same
+ info = self.ports[0].get_formatted_info() # assume for now all ports are same
print('Supported attributes for current NICs:')
print(' Promiscuous: yes')
@@ -3274,15 +3676,79 @@ class STLClient(object):
print(' Flow control: %s' % info['fc_supported'])
- return self.set_port_attr(opts.ports, opts.prom,, opts.led, opts.flow_ctrl)
+ self.set_port_attr(opts.ports,
+ opts.prom,
+ opts.led,
+ opts.flow_ctrl,
+ opts.rx_filter_mode,
+ opts.ipv4,
+ opts.dest)
+ @__console
+ def set_rx_sniffer_line (self, line):
+ '''Sets a port sniffer on RX channel in form of a PCAP file'''
+ parser = parsing_opts.gen_parser(self,
+ "set_rx_sniffer",
+ self.set_rx_sniffer_line.__doc__,
+ parsing_opts.PORT_LIST_WITH_ALL,
+ parsing_opts.OUTPUT_FILENAME,
+ parsing_opts.LIMIT,
+ parsing_opts.ALL_FILES)
+ opts = parser.parse_args(line.split(), default_ports = self.get_acquired_ports(), verify_acquired = True)
+ if not opts:
+ return opts
+ rxf = 'all' if opts.all else None
+ if rxf:
+ self.set_port_attr(opts.ports, rxf = rxf)
+ self.set_rx_sniffer(opts.ports, opts.output_filename, opts.limit)
+ return RC_OK()
+ def resolve_line (self, line):
+ '''Performs a port ARP resolution'''
+ parser = parsing_opts.gen_parser(self,
+ "resolve",
+ self.resolve_line.__doc__,
+ parsing_opts.PORT_LIST_WITH_ALL,
+ parsing_opts.RETRIES)
+ opts = parser.parse_args(line.split(), default_ports = self.get_resolvable_ports(), verify_acquired = True)
+ if not opts:
+ return opts
+ ports = list_intersect(opts.ports, self.get_resolvable_ports())
+ if not ports:
+ if not opts.ports:
+ msg = 'resolve - no ports with IPv4 destination'
+ else:
+ msg = 'pause - none of ports {0} are configured with IPv4 destination'.format(opts.ports)
+ self.logger.log(msg)
+ return RC_ERR(msg)
+ self.resolve(ports = ports, retries = opts.retries)
+ return RC_OK()
+ @__console
def show_profile_line (self, line):
'''Shows profile information'''
parser = parsing_opts.gen_parser(self,
- "port",
+ "profile",
@@ -3378,3 +3844,6 @@ class STLClient(object):
return "{0} {1}>".format(prefix, self.get_acquired_ports())
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/
index 1461fcec..93a930e4 100644
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/
@@ -172,6 +172,10 @@ class JsonRpcClient(object):
return RC_ERR("*** [RPC] - Failed to send message to server")
+ except KeyboardInterrupt as e:
+ # must restore the socket to a sane state
+ self.reconnect()
+ raise e
tries = 0
while True:
@@ -184,6 +188,10 @@ class JsonRpcClient(object):
return RC_ERR("*** [RPC] - Failed to get server response from {0}".format(self.transport))
+ except KeyboardInterrupt as e:
+ # must restore the socket to a sane state
+ self.reconnect()
+ raise e
return response
@@ -255,11 +263,6 @@ class JsonRpcClient(object):
self.connected = True
- rc = self.invoke_rpc_method('ping', api_class = None)
- if not rc:
- self.connected = False
- return rc
return RC_OK()
@@ -267,12 +270,6 @@ class JsonRpcClient(object):
# connect using current values
return self.connect()
- if not self.connected:
- return RC_ERR("Not connected to server")
- # reconnect
- return self.connect(self.server, self.port)
def is_connected(self):
return self.connected
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/
index cec3761f..ef74a85e 100644
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/
@@ -4,12 +4,14 @@ from collections import namedtuple, OrderedDict
from .trex_stl_packet_builder_scapy import STLPktBuilder
from .trex_stl_streams import STLStream
from .trex_stl_types import *
+from .trex_stl_rx_features import ARPResolver, PingResolver
from . import trex_stl_stats
from .utils.constants import FLOW_CTRL_DICT_REVERSED
import base64
import copy
from datetime import datetime, timedelta
+import threading
StreamOnPort = namedtuple('StreamOnPort', ['compiled_stream', 'metadata'])
@@ -50,7 +52,9 @@ class Port(object):
def __init__ (self, port_id, user, comm_link, session_id, info):
self.port_id = port_id
self.state = self.STATE_IDLE
self.handler = None
self.comm_link = comm_link
self.transmit = comm_link.transmit
@@ -62,7 +66,7 @@ class Port(object):
self.streams = {}
self.profile = None
self.session_id = session_id
- self.attr = {}
+ self.status = {}
self.port_stats = trex_stl_stats.CPortStats(self)
@@ -72,31 +76,31 @@ class Port(object):
self.owner = ''
self.last_factor_type = None
+ self.__attr = {}
+ self.attr_lock = threading.Lock()
# decorator to verify port is up
def up(func):
- def func_wrapper(*args):
+ def func_wrapper(*args, **kwargs):
port = args[0]
if not port.is_up():
return port.err("{0} - port is down".format(func.__name__))
- return func(*args)
+ return func(*args, **kwargs)
return func_wrapper
# owned
def owned(func):
- def func_wrapper(*args):
+ def func_wrapper(*args, **kwargs):
port = args[0]
- if not port.is_up():
- return port.err("{0} - port is down".format(func.__name__))
if not port.is_acquired():
return port.err("{0} - port is not owned".format(func.__name__))
- return func(*args)
+ return func(*args, **kwargs)
return func_wrapper
@@ -106,9 +110,6 @@ class Port(object):
def func_wrapper(*args, **kwargs):
port = args[0]
- if not port.is_up():
- return port.err("{0} - port is down".format(func.__name__))
if not port.is_acquired():
return port.err("{0} - port is not owned".format(func.__name__))
@@ -128,16 +129,16 @@ class Port(object):
return RC_OK(data)
def get_speed_bps (self):
- return (['speed'] * 1000 * 1000 * 1000)
+ return (self.get_speed_gbps() * 1000 * 1000 * 1000)
- def get_formatted_speed (self):
- return "{0} Gbps".format(['speed'])
+ def get_speed_gbps (self):
+ return self.__attr['speed']
def is_acquired(self):
return (self.handler != None)
def is_up (self):
- return (self.state != self.STATE_DOWN)
+ return self.__attr['link']['up']
def is_active(self):
return (self.state == self.STATE_TX ) or (self.state == self.STATE_PAUSE) or (self.state == self.STATE_PCAP_TX)
@@ -165,7 +166,6 @@ class Port(object):
# take the port
- @up
def acquire(self, force = False, sync_streams = True):
params = {"port_id": self.port_id,
"user": self.user,
@@ -185,7 +185,6 @@ class Port(object):
# sync all the streams with the server
- @up
def sync_streams (self):
params = {"port_id": self.port_id}
@@ -201,7 +200,6 @@ class Port(object):
return self.ok()
# release the port
- @up
def release(self):
params = {"port_id": self.port_id,
"handler": self.handler}
@@ -219,7 +217,6 @@ class Port(object):
- @up
def sync(self):
params = {"port_id": self.port_id}
@@ -250,10 +247,10 @@ class Port(object):
self.next_available_id = int(['max_stream_id']) + 1
- # attributes
- self.attr =['attr']
- if 'speed' in
-['speed'] =['speed'] // 1000
+ self.status =
+ # replace the attributes in a thread safe manner
+ self.set_ts_attr(['attr'])
return self.ok()
@@ -424,8 +421,8 @@ class Port(object):
# save this for TUI
self.last_factor_type = mul['type']
- return self.ok()
+ return rc
# stop traffic
@@ -445,8 +442,9 @@ class Port(object):
return self.err(rc.err())
self.state = self.STATE_STREAMS
self.last_factor_type = None
# timestamp for last tx
self.tx_stopped_ts =
@@ -487,6 +485,101 @@ class Port(object):
return self.ok()
+ @owned
+ def set_rx_sniffer (self, pcap_filename, limit):
+ params = {"handler": self.handler,
+ "port_id": self.port_id,
+ "type": "capture",
+ "enabled": True,
+ "pcap_filename": pcap_filename,
+ "limit": limit}
+ rc = self.transmit("set_rx_feature", params)
+ if rc.bad():
+ return self.err(rc.err())
+ return self.ok()
+ @owned
+ def remove_rx_sniffer (self):
+ params = {"handler": self.handler,
+ "port_id": self.port_id,
+ "type": "capture",
+ "enabled": False}
+ rc = self.transmit("set_rx_feature", params)
+ if rc.bad():
+ return self.err(rc.err())
+ return self.ok()
+ @owned
+ def set_arp_resolution (self, ipv4, mac):
+ params = {"handler": self.handler,
+ "port_id": self.port_id,
+ "ipv4": ipv4,
+ "mac": mac}
+ rc = self.transmit("set_arp_resolution", params)
+ if rc.bad():
+ return self.err(rc.err())
+ return self.ok()
+ @owned
+ def set_rx_queue (self, size):
+ params = {"handler": self.handler,
+ "port_id": self.port_id,
+ "type": "queue",
+ "enabled": True,
+ "size": size}
+ rc = self.transmit("set_rx_feature", params)
+ if rc.bad():
+ return self.err(rc.err())
+ return self.ok()
+ @owned
+ def remove_rx_queue (self):
+ params = {"handler": self.handler,
+ "port_id": self.port_id,
+ "type": "queue",
+ "enabled": False}
+ rc = self.transmit("set_rx_feature", params)
+ if rc.bad():
+ return self.err(rc.err())
+ return self.ok()
+ @owned
+ def get_rx_queue_pkts (self):
+ params = {"handler": self.handler,
+ "port_id": self.port_id}
+ rc = self.transmit("get_rx_queue_pkts", params)
+ if rc.bad():
+ return self.err(rc.err())
+ pkts =['pkts']
+ # decode the packets from base64 to binary
+ for i in range(len(pkts)):
+ pkts[i]['binary'] = base64.b64decode(pkts[i]['binary'])
+ return RC_OK(pkts)
def pause (self):
@@ -568,21 +661,44 @@ class Port(object):
- def set_attr (self, attr_dict):
+ def set_attr (self, **kwargs):
+ json_attr = {}
+ if kwargs.get('promiscuous') is not None:
+ json_attr['promiscuous'] = {'enabled': kwargs.get('promiscuous')}
+ if kwargs.get('link_status') is not None:
+ json_attr['link_status'] = {'up': kwargs.get('link_status')}
+ if kwargs.get('led_status') is not None:
+ json_attr['led_status'] = {'on': kwargs.get('led_status')}
+ if kwargs.get('flow_ctrl_mode') is not None:
+ json_attr['flow_ctrl_mode'] = {'on': kwargs.get('flow_ctrl_mode')}
+ if kwargs.get('rx_filter_mode') is not None:
+ json_attr['rx_filter_mode'] = {'mode': kwargs.get('rx_filter_mode')}
+ if kwargs.get('ipv4') is not None:
+ json_attr['ipv4'] = {'addr': kwargs.get('ipv4')}
+ if kwargs.get('dest') is not None:
+ json_attr['dest'] = {'addr': kwargs.get('dest')}
params = {"handler": self.handler,
"port_id": self.port_id,
- "attr": attr_dict}
+ "attr": json_attr}
rc = self.transmit("set_port_attr", params)
if rc.bad():
return self.err(rc.err())
+ # update the dictionary from the server explicitly
+ return self.sync()
- #self.attr.update(attr_dict)
- return self.ok()
def push_remote (self, pcap_filename, ipg_usec, speedup, count, duration, is_dual, slave_handler):
@@ -607,7 +723,16 @@ class Port(object):
def get_profile (self):
return self.profile
+ # invalidates the current ARP
+ def invalidate_arp (self):
+ dest = self.__attr['dest']
+ if dest['type'] != 'mac':
+ return self.set_attr(dest = dest['ipv4'])
+ else:
+ return self.ok()
def print_profile (self, mult, duration):
if not self.get_profile():
@@ -648,24 +773,32 @@ class Port(object):
- # generate port info
- def get_info (self):
+ # generate formatted (console friendly) port info
+ def get_formatted_info (self, sync = True):
+ # sync the status
+ if sync:
+ self.sync()
+ # get a copy of the current attribute set (safe against manipulation)
+ attr = self.get_ts_attr()
info = dict(
info['status'] = self.get_port_state_name()
- if 'link' in self.attr:
- info['link'] = 'UP' if self.attr['link']['up'] else 'DOWN'
+ if 'link' in attr:
+ info['link'] = 'UP' if attr['link']['up'] else 'DOWN'
info['link'] = 'N/A'
- if 'fc' in self.attr:
- info['fc'] = FLOW_CTRL_DICT_REVERSED.get(self.attr['fc']['mode'], 'N/A')
+ if 'fc' in attr:
+ info['fc'] = FLOW_CTRL_DICT_REVERSED.get(attr['fc']['mode'], 'N/A')
info['fc'] = 'N/A'
- if 'promiscuous' in self.attr:
- info['prom'] = "on" if self.attr['promiscuous']['enabled'] else "off"
+ if 'promiscuous' in attr:
+ info['prom'] = "on" if attr['promiscuous']['enabled'] else "off"
info['prom'] = "N/A"
@@ -692,34 +825,122 @@ class Port(object):
info['is_virtual'] = 'N/A'
+ # speed
+ info['speed'] = self.get_speed_gbps()
+ # RX filter mode
+ info['rx_filter_mode'] = 'hardware match' if attr['rx_filter_mode'] == 'hw' else 'fetch all'
+ # src MAC and IPv4
+ info['src_mac'] = attr['src_mac']
+ info['src_ipv4'] = attr['src_ipv4']
+ if info['src_ipv4'] is None:
+ info['src_ipv4'] = 'Not Configured'
+ # dest
+ dest = attr['dest']
+ if dest['type'] == 'mac':
+ info['dest'] = dest['mac']
+ info['arp'] = '-'
+ elif dest['type'] == 'ipv4':
+ info['dest'] = dest['ipv4']
+ info['arp'] = dest['arp']
+ elif dest['type'] == 'ipv4_u':
+ info['dest'] = dest['ipv4']
+ info['arp'] = 'unresolved'
+ # RX info
+ rx_info = self.status['rx_info']
+ # RX sniffer
+ sniffer = rx_info['sniffer']
+ info['rx_sniffer'] = '{0}\n[{1} / {2}]'.format(sniffer['pcap_filename'], sniffer['count'], sniffer['limit']) if sniffer['is_active'] else 'off'
+ # RX queue
+ queue = rx_info['queue']
+ info['rx_queue'] = '[{0} / {1}]'.format(queue['count'], queue['size']) if queue['is_active'] else 'off'
return info
def get_port_state_name(self):
return self.STATES_MAP.get(self.state, "Unknown")
+ def get_src_addr (self):
+ src_mac = self.__attr['src_mac']
+ src_ipv4 = self.__attr['src_ipv4']
+ return {'mac': src_mac, 'ipv4': src_ipv4}
+ def get_dst_addr (self):
+ dest = self.__attr['dest']
+ if dest['type'] == 'mac':
+ return {'ipv4': None, 'mac': dest['mac']}
+ elif dest['type'] == 'ipv4':
+ return {'ipv4': dest['ipv4'], 'mac': dest['arp']}
+ elif dest['type'] == 'ipv4_u':
+ return {'ipv4': dest['ipv4'], 'mac': None}
+ else:
+ assert(0)
+ # port is considered resolved if it's dest is either MAC or resolved IPv4
+ def is_resolved (self):
+ return (self.get_dst_addr()['mac'] is not None)
+ # return True if the port is valid for resolve (has an IPv4 address as dest)
+ def is_resolvable (self):
+ return (self.get_dst_addr()['ipv4'] is not None)
+ @writeable
+ def arp_resolve (self, retries):
+ return ARPResolver(self).resolve(retries)
+ @writeable
+ def ping (self, ping_ipv4, pkt_size):
+ return PingResolver(self, ping_ipv4, pkt_size).resolve()
################# stats handler ######################
def generate_port_stats(self):
return self.port_stats.generate_stats()
def generate_port_status(self):
- info = self.get_info()
+ info = self.get_formatted_info()
- return {"driver": info['driver'],
- "description": info.get('description', 'N/A')[:18],
- "HW src mac": info['hw_macaddr'],
- "SW src mac": info['src_macaddr'],
- "SW dst mac": info['dst_macaddr'],
- "PCI Address": info['pci_addr'],
- "NUMA Node": info['numa'],
+ return {"driver": info['driver'],
+ "description": info.get('description', 'N/A')[:18],
+ "src MAC": info['src_mac'],
+ "src IPv4": info['src_ipv4'],
+ "Destination": info['dest'],
+ "ARP Resolution": info['arp'],
+ "PCI Address": info['pci_addr'],
+ "NUMA Node": info['numa'],
"--": "",
"---": "",
- "link speed": "{speed} Gb/s".format(speed=info['speed']),
+ "----": "",
+ "-----": "",
+ "link speed": info['speed'],
"port status": info['status'],
"link status": info['link'],
"promiscuous" : info['prom'],
"flow ctrl" : info['fc'],
+ "RX Filter Mode": info['rx_filter_mode'],
+ "RX Queueing": info['rx_queue'],
+ "RX sniffer": info['rx_sniffer'],
def clear_stats(self):
@@ -756,17 +977,54 @@ class Port(object):
return {"streams" : OrderedDict(sorted(data.items())) }
+ ######## attributes are a complex type (dict) that might be manipulated through the async thread #############
+ # get in a thread safe manner a duplication of attributes
+ def get_ts_attr (self):
+ with self.attr_lock:
+ return dict(self.__attr)
+ # set in a thread safe manner a new dict of attributes
+ def set_ts_attr (self, new_attr):
+ with self.attr_lock:
+ self.__attr = new_attr
################# events handler ######################
def async_event_port_job_done (self):
# until thread is locked - order is important
self.tx_stopped_ts =
self.state = self.STATE_STREAMS
self.last_factor_type = None
- def async_event_port_attr_changed (self, attr):
-['speed'] = attr['speed'] // 1000
- self.attr = attr
+ def async_event_port_attr_changed (self, new_attr):
+ # get a thread safe duplicate
+ cur_attr = self.get_ts_attr()
+ # check if anything changed
+ if new_attr == cur_attr:
+ return None
+ # generate before
+ before = self.get_formatted_info(sync = False)
+ # update
+ self.set_ts_attr(new_attr)
+ # generate after
+ after = self.get_formatted_info(sync = False)
+ # return diff
+ diff = {}
+ for key, new_value in after.items():
+ old_value = before.get(key, 'N/A')
+ if new_value != old_value:
+ diff[key] = (old_value, new_value)
+ return diff
# rest of the events are used for TUI / read only sessions
def async_event_port_stopped (self):
@@ -792,3 +1050,4 @@ class Port(object):
def async_event_released (self):
self.owner = ''
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/
new file mode 100644
index 00000000..3754e608
--- /dev/null
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/
@@ -0,0 +1,255 @@
+from .trex_stl_streams import STLStream, STLTXSingleBurst
+from .trex_stl_packet_builder_scapy import STLPktBuilder
+from scapy.layers.l2 import Ether, ARP
+from scapy.layers.inet import IP, ICMP
+import time
+# a generic abstract class for resolving using the server
+class Resolver(object):
+ def __init__ (self, port, queue_size = 100):
+ self.port = port
+ # code to execute before sending any request - return RC object
+ def pre_send (self):
+ raise NotImplementedError()
+ # return a list of streams for request
+ def generate_request (self):
+ raise NotImplementedError()
+ # return None for more packets otherwise RC object
+ def on_pkt_rx (self, pkt):
+ raise NotImplementedError()
+ # return value in case of timeout
+ def on_timeout_err (self, retries):
+ raise NotImplementedError()
+ ##################### API ######################
+ def resolve (self, retries = 0):
+ # first cleanup
+ rc = self.port.remove_all_streams()
+ if not rc:
+ return rc
+ # call the specific class implementation
+ rc = self.pre_send()
+ if not rc:
+ return rc
+ # start the iteration
+ try:
+ # add the stream(s)
+ self.port.add_streams(self.generate_request())
+ rc = self.port.set_attr(rx_filter_mode = 'all')
+ if not rc:
+ return rc
+ rc = self.port.set_rx_queue(size = 100)
+ if not rc:
+ return rc
+ return self.resolve_wrapper(retries)
+ finally:
+ # best effort restore
+ self.port.set_attr(rx_filter_mode = 'hw')
+ self.port.remove_rx_queue()
+ self.port.remove_all_streams()
+ # main resolve function
+ def resolve_wrapper (self, retries):
+ # retry for 'retries'
+ index = 0
+ while True:
+ rc = self.resolve_iteration()
+ if rc is not None:
+ return rc
+ if index >= retries:
+ return self.on_timeout_err(retries)
+ index += 1
+ time.sleep(0.1)
+ def resolve_iteration (self):
+ mult = {'op': 'abs', 'type' : 'percentage', 'value': 100}
+ rc = self.port.start(mul = mult, force = False, duration = -1, mask = 0xffffffff)
+ if not rc:
+ return rc
+ # save the start timestamp
+ self.start_ts =['ts']
+ # block until traffic finishes
+ while self.port.is_active():
+ time.sleep(0.01)
+ return self.wait_for_rx_response()
+ def wait_for_rx_response (self):
+ # we try to fetch response for 5 times
+ polling = 5
+ while polling > 0:
+ # fetch the queue
+ rx_pkts = self.port.get_rx_queue_pkts()
+ # might be an error
+ if not rx_pkts:
+ return rx_pkts
+ # for each packet - examine it
+ for pkt in
+ rc = self.on_pkt_rx(pkt)
+ if rc is not None:
+ return rc
+ if polling == 0:
+ return None
+ polling -= 1
+ time.sleep(0.1)
+class ARPResolver(Resolver):
+ def __init__ (self, port_id):
+ super(ARPResolver, self).__init__(port_id)
+ # before resolve
+ def pre_send (self):
+ self.dst = self.port.get_dst_addr()
+ self.src = self.port.get_src_addr()
+ if self.dst['ipv4'] is None:
+ return self.port.err("Port has a non-IPv4 destination: '{0}'".format(self.dst['mac']))
+ if self.src['ipv4'] is None:
+ return self.port.err('Port must have an IPv4 source address configured')
+ # invalidate the current ARP resolution (if exists)
+ return self.port.invalidate_arp()
+ # return a list of streams for request
+ def generate_request (self):
+ base_pkt = Ether(dst="ff:ff:ff:ff:ff:ff")/ARP(psrc = self.src['ipv4'], pdst = self.dst['ipv4'], hwsrc = self.src['mac'])
+ s1 = STLStream( packet = STLPktBuilder(pkt = base_pkt), mode = STLTXSingleBurst(total_pkts = 1) )
+ return [s1]
+ # return None in case more packets are needed else the status rc
+ def on_pkt_rx (self, pkt):
+ scapy_pkt = Ether(pkt['binary'])
+ if not 'ARP' in scapy_pkt:
+ return None
+ arp = scapy_pkt['ARP']
+ # check this is the right ARP (ARP reply with the address)
+ if (arp.op != 2) or (arp.psrc != self.dst['ipv4']):
+ return None
+ rc = self.port.set_arp_resolution(arp.psrc, arp.hwsrc)
+ if not rc:
+ return rc
+ return self.port.ok('Port {0} - Recieved ARP reply from: {1}, hw: {2}'.format(self.port.port_id, arp.psrc, arp.hwsrc))
+ def on_timeout_err (self, retries):
+ return self.port.err('failed to receive ARP response ({0} retries)'.format(retries))
+ #################### ping resolver ####################
+class PingResolver(Resolver):
+ def __init__ (self, port, ping_ip, pkt_size):
+ super(PingResolver, self).__init__(port)
+ self.ping_ip = ping_ip
+ self.pkt_size = pkt_size
+ def pre_send (self):
+ self.src = self.port.get_src_addr()
+ self.dst = self.port.get_dst_addr()
+ if self.src['ipv4'] is None:
+ return self.port.err('Ping - port does not have an IPv4 address configured')
+ if self.dst['mac'] is None:
+ return self.port.err('Ping - port has an unresolved destination, cannot determine next hop MAC address')
+ if self.ping_ip == self.src['ipv4']:
+ return self.port.err('Ping - cannot ping own IP')
+ return self.port.ok()
+ # return a list of streams for request
+ def generate_request (self):
+ src = self.port.get_src_addr()
+ dst = self.port.get_dst_addr()
+ base_pkt = Ether(dst = dst['mac'])/IP(src = src['ipv4'], dst = self.ping_ip)/ICMP(type = 8)
+ pad = max(0, self.pkt_size - len(base_pkt))
+ base_pkt = base_pkt / (pad * 'x')
+ #base_pkt.show2()
+ s1 = STLStream( packet = STLPktBuilder(pkt = base_pkt), mode = STLTXSingleBurst(total_pkts = 1) )
+ return [s1]
+ # return None for more packets otherwise RC object
+ def on_pkt_rx (self, pkt):
+ scapy_pkt = Ether(pkt['binary'])
+ if not 'ICMP' in scapy_pkt:
+ return None
+ #scapy_pkt.show2()
+ ip = scapy_pkt['IP']
+ icmp = scapy_pkt['ICMP']
+ dt = pkt['ts'] - self.start_ts
+ if icmp.type == 0:
+ # echo reply
+ return self.port.ok('Reply from {0}: bytes={1}, time={2:.2f}ms, TTL={3}'.format(ip.src, len(pkt['binary']), dt * 1000, ip.ttl))
+ # unreachable
+ elif icmp.type == 3:
+ return self.port.ok('Reply from {0}: Destination host unreachable'.format(icmp.src))
+ else:
+ scapy_pkt.show2()
+ return self.port.err('unknown ICMP reply')
+ # return the str of a timeout err
+ def on_timeout_err (self, retries):
+ return self.port.ok('Request timed out.')
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/
index 9f601484..6a59126f 100644
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/
@@ -670,12 +670,18 @@ class CTRexInfoGenerator(object):
("promiscuous", []),
("flow ctrl", []),
("--", []),
- ("HW src mac", []),
- ("SW src mac", []),
- ("SW dst mac", []),
+ ("src IPv4", []),
+ ("src MAC", []),
("---", []),
+ ("Destination", []),
+ ("ARP Resolution", []),
+ ("----", []),
("PCI Address", []),
("NUMA Node", []),
+ ("-----", []),
+ ("RX Filter Mode", []),
+ ("RX Queueing", []),
+ ("RX sniffer", []),
@@ -1103,13 +1109,7 @@ class CPortStats(CTRexStats):
port_state = format_text(port_state, 'bold')
if self._port_obj:
- if 'link' in self._port_obj.attr:
- if self._port_obj.attr.get('link', {}).get('up') == False:
- link_state = format_text('DOWN', 'red', 'bold')
- else:
- link_state = 'UP'
- else:
- link_state = 'N/A'
+ link_state = 'UP' if self._port_obj.is_up() else format_text('DOWN', 'red', 'bold')
link_state = ''
@@ -1130,7 +1130,7 @@ class CPortStats(CTRexStats):
return {"owner": owner,
"state": "{0}".format(port_state),
'link': link_state,
- "speed": self._port_obj.get_formatted_speed() if self._port_obj else '',
+ "speed": "%g Gb/s" % self._port_obj.get_speed_gbps() if self._port_obj else '',
"CPU util.": "{0} {1}%".format(self.get_trend_gui("m_cpu_util", use_raw = True),
format_threshold(round_float(self.get("m_cpu_util")), [85, 100], [0, 85])) if self._port_obj else '' ,
"--": " ",
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/
index aa6c4218..81015ddc 100644
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/
@@ -135,6 +135,12 @@ def validate_type(arg_name, arg, valid_types):
raise STLError('validate_type: valid_types should be type or list or tuple of types')
+def validate_choice (arg_name, arg, choices):
+ if arg is not None and not arg in choices:
+ raise STLError("validate_choice: argument '{0}' can only be one of '{1}'".format(arg_name, choices))
# throws STLError if not exactly one argument is present
def verify_exclusive_arg (args_list):
if not (len(list(filter(lambda x: x is not None, args_list))) == 1):
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/
index 72ee8972..cbbacb27 100644
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/
@@ -3,6 +3,8 @@ import sys
import string
import random
import time
+import socket
+import re
import pwd
@@ -86,3 +88,23 @@ class PassiveTimer(object):
return (time.time() > self.expr_sec)
+def is_valid_ipv4 (addr):
+ try:
+ socket.inet_pton(socket.AF_INET, addr)
+ return True
+ except (socket.error, TypeError):
+ return False
+def is_valid_mac (mac):
+ return bool(re.match("[0-9a-f]{2}([-:])[0-9a-f]{2}(\\1[0-9a-f]{2}){4}$", mac.lower()))
+def list_remove_dup (l):
+ tmp = list()
+ for x in l:
+ if not x in tmp:
+ tmp.append(x)
+ return tmp
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/
index 34cafd79..7ae22e89 100755
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/
@@ -1,6 +1,6 @@
import argparse
from collections import namedtuple, OrderedDict
-from .common import list_intersect, list_difference
+from .common import list_intersect, list_difference, is_valid_ipv4, is_valid_mac, list_remove_dup
from .text_opts import format_text
from ..trex_stl_types import *
from .constants import ON_OFF_DICT, UP_DOWN_DICT, FLOW_CTRL_DICT
@@ -45,6 +45,21 @@ FLOW_CTRL = 28
+LIMIT = 33
+IPV4 = 35
+DEST = 36
+PING_IPV4 = 40
+PKT_SIZE = 42
@@ -218,8 +233,30 @@ def is_valid_file(filename):
return filename
+def check_ipv4_addr (ipv4_str):
+ if not is_valid_ipv4(ipv4_str):
+ raise argparse.ArgumentTypeError("invalid IPv4 address: '{0}'".format(ipv4_str))
+ return ipv4_str
+def check_pkt_size (pkt_size):
+ try:
+ pkt_size = int(pkt_size)
+ except ValueError:
+ raise argparse.ArgumentTypeError("invalid packet size type: '{0}'".format(pkt_size))
+ if (pkt_size < 64) or (pkt_size > 9216):
+ raise argparse.ArgumentTypeError("invalid packet size: '{0}' - valid range is 64 to 9216".format(pkt_size))
+ return pkt_size
+def check_dest_addr (addr):
+ if not (is_valid_ipv4(addr) or is_valid_mac(addr)):
+ raise argparse.ArgumentTypeError("not a valid IPv4 or MAC address: '{0}'".format(addr))
+ return addr
def decode_tunables (tunable_str):
tunables = {}
@@ -304,6 +341,62 @@ OPTIONS_DB = {MULTIPLIER: ArgumentPack(['-m', '--multiplier'],
'dest': 'flow_ctrl',
'choices': FLOW_CTRL_DICT}),
+ RX_FILTER_MODE: ArgumentPack(['--rxf'],
+ {'help': 'Set RX filtering mode',
+ 'dest': 'rx_filter_mode',
+ 'choices': ['hw', 'all']}),
+ IPV4: ArgumentPack(['--ipv4'],
+ {'help': 'IPv4 address(s) for the port(s)',
+ 'dest': 'ipv4',
+ 'nargs': '+',
+ 'default': None,
+ 'type': check_ipv4_addr}),
+ DEST: ArgumentPack(['--dest'],
+ {'help': 'Destination address(s) for the port(s) in either IPv4 or MAC format',
+ 'dest': 'dest',
+ 'nargs': '+',
+ 'default': None,
+ 'type': check_dest_addr}),
+ RETRIES: ArgumentPack(['-r', '--retries'],
+ {'help': 'retries count [default is zero]',
+ 'dest': 'retries',
+ 'default': 0,
+ 'type': int}),
+ OUTPUT_FILENAME: ArgumentPack(['-o', '--output'],
+ {'help': 'Output PCAP filename',
+ 'dest': 'output_filename',
+ 'default': None,
+ 'required': True,
+ 'type': str}),
+ PORT_RESTART: ArgumentPack(['-r', '--restart'],
+ {'help': 'hard restart port(s)',
+ 'dest': 'restart',
+ 'default': False,
+ 'action': 'store_true'}),
+ ALL_FILES: ArgumentPack(['--all'],
+ {'help': 'change RX port filter to fetch all packets',
+ 'dest': 'all',
+ 'default': False,
+ 'action': "store_true"}),
+ LIMIT: ArgumentPack(['-l', '--limit'],
+ {'help': 'Limit the packet count to be written to the file',
+ 'dest': 'limit',
+ 'default': 1000,
+ 'type': int}),
SUPPORTED: ArgumentPack(['--supp'],
{'help': 'Show which attributes are supported by current NICs',
'default': None,
@@ -326,6 +419,32 @@ OPTIONS_DB = {MULTIPLIER: ArgumentPack(['-m', '--multiplier'],
'help': "A list of ports on which to apply the command",
'default': []}),
+ SOURCE_PORT: ArgumentPack(['--port', '-p'],
+ {'dest':'source_port',
+ 'type': int,
+ 'help': 'source port for the action',
+ 'required': True}),
+ PING_IPV4: ArgumentPack(['-d'],
+ {'help': 'which IPv4 to ping',
+ 'dest': 'ping_ipv4',
+ 'required': True,
+ 'type': check_ipv4_addr}),
+ PING_COUNT: ArgumentPack(['-n', '--count'],
+ {'help': 'How many times to ping [default is 5]',
+ 'dest': 'count',
+ 'default': 5,
+ 'type': int}),
+ PKT_SIZE: ArgumentPack(['-s'],
+ {'dest':'pkt_size',
+ 'help': 'packet size to use',
+ 'default': 64,
+ 'type': check_pkt_size}),
ALL_PORTS: ArgumentPack(['-a'],
{"action": "store_true",
"dest": "all_ports",
@@ -461,6 +580,7 @@ OPTIONS_DB = {MULTIPLIER: ArgumentPack(['-m', '--multiplier'],
{'required': False}),
{'required': True}),
@@ -531,6 +651,8 @@ class CCmdArgParser(argparse.ArgumentParser):
opts.ports = default_ports
+ opts.ports = list_remove_dup(opts.ports)
# so maybe we have ports configured
invalid_ports = list_difference(opts.ports, self.stateless_client.get_all_ports())
if invalid_ports:
diff --git a/scripts/t-rex-64 b/scripts/t-rex-64
index 5515ba03..fc8318a5 100755
--- a/scripts/t-rex-64
+++ b/scripts/t-rex-64
@@ -26,7 +26,7 @@ cd $(dirname $0)
#Add dummy lib in case we don't find it, e.g. there is no OFED installed
-if ldd _t-rex-64 | grep "" | grep -q "not found"; then
+if ldd _$(basename $0) | grep "" | grep -q "not found"; then
export LD_LIBRARY_PATH=$PWD:$PWD/dumy_libs
diff --git a/scripts/t-rex-64-valgrind b/scripts/t-rex-64-valgrind
new file mode 100755
index 00000000..d11491ba
--- /dev/null
+++ b/scripts/t-rex-64-valgrind
@@ -0,0 +1,69 @@
+#! /bin/bash
+if [ "$(id -u)" != 0 ]; then
+ echo 'Error: Please run as root (sudo etc.)'
+ exit -1
+INPUT_ARGS=${@//[–—]/-} # replace bizarre minuses with normal one
+./trex-cfg $INPUT_ARGS
+if [ $RESULT -ne 0 ]; then
+ exit $RESULT
+pci_desc_re='^(\S+) - (.+)$'
+while read line
+ if [[ "$line" =~ $pci_desc_re ]]; then
+ pci_name="pci$(echo ${BASH_REMATCH[1]} | tr ':' '_' | tr '.' '_')" # make alphanumeric name
+ export $pci_name="${BASH_REMATCH[2]}"
+ fi
+done <<< "$($PYTHON --dump-pci-description)"
+cd $(dirname $0)
+#Add dummy lib in case we don't find it, e.g. there is no OFED installed
+if ldd ./_t-rex-64 | grep "" | grep -q "not found"; then
+export LD_LIBRARY_PATH=$PWD:$PWD/dumy_libs
+export VALGRIND_LIB=/auto/proj-pcube-b/apps/PL-b/tools/valgrind-dpdk/lib/valgrind
+export VALGRIND_BIN="/auto/proj-pcube-b/apps/PL-b/tools/valgrind-dpdk/bin/valgrind --leak-check=full --error-exitcode=1 --suppressions=valgrind.sup"
+if [ -t 0 ] && [ -t 1 ]; then
+ export is_tty=true
+ saveterm="$(stty -g)"
+ export is_tty=false
+# if we have a new core run optimized trex
+if grep -q avx /proc/cpuinfo ; then
+ if [ $RESULT -eq 132 ]; then
+ echo " WARNING this program is optimized for the new Intel processors. "
+ echo " try the ./t-rex-64-o application that should work for any Intel processor but might be slower. "
+ echo " try to run t-rex-64-o .. "
+ fi
+if $is_tty; then
+ stty $saveterm
+if [ $RESULT -ne 0 ]; then
+ exit $RESULT
diff --git a/scripts/ b/scripts/
index fabe6d68..1824d073 100755
--- a/scripts/
+++ b/scripts/
@@ -58,8 +58,8 @@ def isnum (x):
def find_trex_pid ():
procs = [x for x in os.listdir('/proc/') if isnum(x)]
for proc in procs:
- cmd = open('/proc/{0}/{1}'.format(proc, 'cmdline')).readline()
- if '_t-rex' in cmd:
+ cmd = open('/proc/{0}/{1}'.format(proc, 'comm')).readline()
+ if cmd.startswith('_t-rex-64'):
return proc
return None
diff --git a/scripts/valgrind.sup b/scripts/valgrind.sup
new file mode 100644
index 00000000..b6bcc883
--- /dev/null
+++ b/scripts/valgrind.sup
@@ -0,0 +1,57 @@
+ DL issue
+ Memcheck:Cond
+ fun:index
+ fun:expand_dynamic_string_token
+ fun:fillin_rpath
+ fun:_dl_init_paths
+ fun:dl_main
+ fun:_dl_sysdep_start
+ fun:_dl_start_final
+ fun:_dl_start
+ obj:/lib/x86_64-linux-gnu/
+ obj:*
+ obj:*
+ obj:*
+ obj:*
+ DPDK threads
+ Memcheck:Leak
+ match-leak-kinds: possible
+ fun:calloc
+ fun:allocate_dtv
+ fun:_dl_allocate_tls
+ fun:allocate_stack
+ fun:pthread_create@@GLIBC_2.2.5
+ fun:rte_eal_init
+ fun:_Z9main_testiPPc
+ fun:(below main)
+ DPDK interrupt thread
+ Memcheck:Leak
+ match-leak-kinds: possible
+ fun:calloc
+ fun:allocate_dtv
+ fun:_dl_allocate_tls
+ fun:allocate_stack
+ fun:pthread_create@@GLIBC_2.2.5
+ fun:rte_eal_intr_init
+ fun:rte_eal_init
+ fun:_Z9main_testiPPc
+ fun:(below main)
+ DPDK epoll ctl
+ Memcheck:Param
+ epoll_ctl(event)
+ fun:epoll_ctl
+ fun:eal_intr_thread_main
+ fun:start_thread
+ fun:clone
diff --git a/src/common/basic_utils.cpp b/src/common/basic_utils.cpp
index 04e7aaba..fded49ec 100755
--- a/src/common/basic_utils.cpp
+++ b/src/common/basic_utils.cpp
@@ -20,6 +20,10 @@ limitations under the License.
#include <sstream>
#include <sys/resource.h>
+#include "pal_utl.h"
+int my_inet_pton4(const char *src, unsigned char *dst);
bool utl_is_file_exists (const std::string& name) {
if (FILE *file = fopen(name.c_str(), "r")) {
@@ -174,13 +178,6 @@ void TestDump(void){
utl_DumpBuffer2(stdout,buffer,31,1,4,SHOW_BUFFER_ADDR_EN |SHOW_BUFFER_CHAR);
-utl_macaddr_to_str(const uint8_t *mac) {
- std::string tmp;
- utl_macaddr_to_str(mac, tmp);
- return tmp;
void utl_macaddr_to_str(const uint8_t *macaddr, std::string &output) {
for (int i = 0; i < 6; i++) {
@@ -197,6 +194,26 @@ void utl_macaddr_to_str(const uint8_t *macaddr, std::string &output) {
+std::string utl_macaddr_to_str(const uint8_t *macaddr) {
+ std::string tmp;
+ utl_macaddr_to_str(macaddr, tmp);
+ return tmp;
+bool utl_str_to_macaddr(const std::string &s, uint8_t *mac) {
+ int last = -1;
+ int rc = sscanf(s.c_str(), "%hhx:%hhx:%hhx:%hhx:%hhx:%hhx%n",
+ mac + 0, mac + 1, mac + 2, mac + 3, mac + 4, mac + 5,
+ &last);
+ if ( (rc != 6) || (s.size() != last) ) {
+ return false;
+ }
+ return true;
* generate a random connection handler
@@ -255,3 +272,23 @@ void utl_set_coredump_size(long size, bool map_huge_pages) {
fprintf(fp, "%08x\n", mask);
+bool utl_ipv4_to_uint32(const char *ipv4_str, uint32_t &ipv4_num) {
+ uint32_t tmp;
+ int rc = my_inet_pton4(ipv4_str, (unsigned char *)&tmp);
+ if (!rc) {
+ return false;
+ }
+ ipv4_num = PAL_NTOHL(tmp);
+ return true;
+std::string utl_uint32_to_ipv4(uint32_t ipv4_addr) {
+ std::stringstream ss;
+ ss << ((ipv4_addr >> 24) & 0xff) << "." << ((ipv4_addr >> 16) & 0xff) << "." << ((ipv4_addr >> 8) & 0xff) << "." << (ipv4_addr & 0xff);
+ return ss.str();
diff --git a/src/common/basic_utils.h b/src/common/basic_utils.h
index 1079ecfc..36f9db85 100755
--- a/src/common/basic_utils.h
+++ b/src/common/basic_utils.h
@@ -87,6 +87,8 @@ bool utl_is_file_exists (const std::string& name) ;
void utl_macaddr_to_str(const uint8_t *macaddr, std::string &output);
std::string utl_macaddr_to_str(const uint8_t *macaddr);
+bool utl_str_to_macaddr(const std::string &s, uint8_t *mac);
std::string utl_generate_random_str(unsigned int &seed, int len);
@@ -99,6 +101,9 @@ std::string utl_generate_random_str(unsigned int &seed, int len);
void utl_set_coredump_size(long size, bool map_huge_pages = false);
+bool utl_ipv4_to_uint32(const char *ipv4_str, uint32_t &ipv4_num);
+std::string utl_uint32_to_ipv4(uint32_t ipv4_addr);
diff --git a/src/common/captureFile.h b/src/common/captureFile.h
index fefa62bd..d87e57b6 100755
--- a/src/common/captureFile.h
+++ b/src/common/captureFile.h
@@ -227,9 +227,10 @@ class CFileWriterBase {
- virtual ~CFileWriterBase(){};
- virtual bool Create(char * name) = 0;
+ virtual ~CFileWriterBase(){};
+ virtual bool Create(char * name) = 0;
virtual bool write_packet(CCapPktRaw * lpPacket)=0;
+ virtual void flush_to_disk() = 0;
diff --git a/src/common/erf.cpp b/src/common/erf.cpp
index 76945b01..f872a281 100755
--- a/src/common/erf.cpp
+++ b/src/common/erf.cpp
@@ -280,7 +280,11 @@ bool CErfFileWriter::write_packet(CCapPktRaw * lpPacket){
return true;
+void CErfFileWriter::flush_to_disk() {
+ if (m_fd) {
+ fflush(m_fd);
+ }
bool CPcapFileWriter::Create(char *file_name){
diff --git a/src/common/erf.h b/src/common/erf.h
index e1b83e46..bec94872 100755
--- a/src/common/erf.h
+++ b/src/common/erf.h
@@ -224,6 +224,13 @@ public:
virtual bool Create(char *file_name);
void Delete();
virtual bool write_packet(CCapPktRaw * lpPacket);
+ /**
+ * flush all packets to disk
+ *
+ */
+ void flush_to_disk();
FILE *m_fd;
int m_cnt;
diff --git a/src/common/pcap.cpp b/src/common/pcap.cpp
index 8e9bf0ac..b976aed7 100755
--- a/src/common/pcap.cpp
+++ b/src/common/pcap.cpp
@@ -223,11 +223,18 @@ bool LibPCapWriter::Create(char * name)
printf(" ERROR create file \n");
/* prepare the write counter */
m_pkt_count = 0;
return init();
+void LibPCapWriter::flush_to_disk() {
+ if (m_is_open) {
+ fflush(m_file_handler);
+ }
* Write the libpcap header.
diff --git a/src/common/pcap.h b/src/common/pcap.h
index 3f8dfd21..265ea17b 100755
--- a/src/common/pcap.h
+++ b/src/common/pcap.h
@@ -1,5 +1,5 @@
-#ifndef __LIBPCAP_H__
-#define __LIBPCAP_H__
+#ifndef __TREX_LIBPCAP_H__
+#define __TREX_LIBPCAP_H__
Copyright (c) 2015-2015 Cisco Systems, Inc.
@@ -143,6 +143,13 @@ public:
void Close();
+ /**
+ * flush all packets to disk
+ *
+ * @author imarom (11/24/2016)
+ */
+ void flush_to_disk();
bool init();
@@ -151,4 +158,5 @@ private:
bool m_is_open;
uint32_t m_pkt_count;
diff --git a/src/debug.cpp b/src/debug.cpp
index d0b7cf11..e272424c 100644
--- a/src/debug.cpp
+++ b/src/debug.cpp
@@ -142,7 +142,7 @@ rte_mbuf_t *CTrexDebug::create_test_pkt(int ip_ver, uint16_t l4_proto, uint8_t t
pkt = CTestPktGen::create_test_pkt(l3_type, l4_proto, ttl, ip_id, flags, 1000, pkt_size);
- /* DEBUG print the packet
+ /* DEBUG print the packet
utl_k12_pkt_format(stdout,pkt, pkt_size) ;
@@ -370,6 +370,7 @@ int CTrexDebug::verify_hw_rules(bool recv_all) {
case STL:
if ( CGlobalInfo::m_options.is_stateless() ) {
exp_q = MAIN_DPDK_RX_Q;
+ pkt_flags |= DPF_TOS_1;
} else {
@@ -379,6 +380,7 @@ int CTrexDebug::verify_hw_rules(bool recv_all) {
} else {
exp_q = MAIN_DPDK_RX_Q;
+ pkt_flags |= DPF_TOS_1;
diff --git a/src/dpdk/drivers/net/enic/enic_ethdev.c b/src/dpdk/drivers/net/enic/enic_ethdev.c
index c05476b2..6a86e23f 100644
--- a/src/dpdk/drivers/net/enic/enic_ethdev.c
+++ b/src/dpdk/drivers/net/enic/enic_ethdev.c
@@ -436,22 +436,6 @@ static void enicpmd_dev_stats_reset(struct rte_eth_dev *eth_dev)
-int enicpmd_dev_get_fw_support(int port_id,
- uint32_t *ver){
- struct rte_eth_dev *dev;
- dev = &rte_eth_devices[port_id];
- *ver=0;
- struct enic *enic = pmd_priv(dev);
- enic->adv_filters;
- if ( enic->adv_filters ==0 ) {
- return (-1);
- }
- return (0);
static void enicpmd_dev_info_get(struct rte_eth_dev *eth_dev,
diff --git a/src/dpdk/drivers/net/enic/enic_main.c b/src/dpdk/drivers/net/enic/enic_main.c
index 889bc692..473bfc3c 100644
--- a/src/dpdk/drivers/net/enic/enic_main.c
+++ b/src/dpdk/drivers/net/enic/enic_main.c
@@ -166,6 +166,7 @@ void enic_dev_stats_get(struct enic *enic, struct rte_eth_stats *r_stats)
/* The number of truncated packets can only be calculated by
* subtracting a hardware counter from error packets received by
* the driver. Note: this causes transient inaccuracies in the
@@ -180,7 +181,7 @@ void enic_dev_stats_get(struct enic *enic, struct rte_eth_stats *r_stats)
r_stats->ipackets = stats->rx.rx_frames_ok - rx_truncated;
r_stats->opackets = stats->tx.tx_frames_ok;
- r_stats->ibytes = stats->rx.rx_bytes_ok;
+ r_stats->ibytes = stats->rx.rx_unicast_bytes_ok+stats->rx.rx_multicast_bytes_ok+stats->rx.rx_broadcast_bytes_ok;
r_stats->obytes = stats->tx.tx_bytes_ok;
r_stats->ierrors = stats->rx.rx_errors + stats->rx.rx_drop;
diff --git a/src/dpdk/drivers/net/ixgbe/ixgbe_ethdev.c b/src/dpdk/drivers/net/ixgbe/ixgbe_ethdev.c
index d478a159..72963a89 100644
--- a/src/dpdk/drivers/net/ixgbe/ixgbe_ethdev.c
+++ b/src/dpdk/drivers/net/ixgbe/ixgbe_ethdev.c
@@ -5784,13 +5784,17 @@ ixgbe_add_del_ethertype_filter(struct rte_eth_dev *dev,
if (filter->queue >= IXGBE_MAX_RX_QUEUE_NUM)
return -EINVAL;
+#define TREX_PATCH
+#ifndef TREX_PATCH
+ // no real reason to block this.
+ // We configure rules using FDIR and ethertype that point to same queue, so there are no race condition issues.
if (filter->ether_type == ETHER_TYPE_IPv4 ||
filter->ether_type == ETHER_TYPE_IPv6) {
PMD_DRV_LOG(ERR, "unsupported ether_type(0x%04x) in"
" ethertype filter.", filter->ether_type);
return -EINVAL;
if (filter->flags & RTE_ETHTYPE_FLAGS_MAC) {
PMD_DRV_LOG(ERR, "mac compare is unsupported.");
diff --git a/src/flow_stat.cpp b/src/flow_stat.cpp
index 84be590f..92cbca6e 100644
--- a/src/flow_stat.cpp
+++ b/src/flow_stat.cpp
@@ -461,6 +461,7 @@ CFlowStatRuleMgr::CFlowStatRuleMgr() {
memset(m_rx_cant_count_err, 0, sizeof(m_rx_cant_count_err));
memset(m_tx_cant_count_err, 0, sizeof(m_tx_cant_count_err));
m_num_ports = 0; // need to call create to init
CFlowStatRuleMgr::~CFlowStatRuleMgr() {
@@ -802,11 +803,14 @@ int CFlowStatRuleMgr::start_stream(TrexStream * stream) {
if (m_num_started_streams == 0) {
send_start_stop_msg_to_rx(true); // First transmitting stream. Rx core should start reading packets;
//also good time to zero global counters
memset(m_rx_cant_count_err, 0, sizeof(m_rx_cant_count_err));
memset(m_tx_cant_count_err, 0, sizeof(m_tx_cant_count_err));
+ #if 0
// wait to make sure that message is acknowledged. RX core might be in deep sleep mode, and we want to
// start transmitting packets only after it is working, otherwise, packets will get lost.
if (m_rx_core) { // in simulation, m_rx_core will be NULL
@@ -819,6 +823,8 @@ int CFlowStatRuleMgr::start_stream(TrexStream * stream) {
+ #endif
} else {
// make sure rx core is working. If not, we got really confused somehow.
if (m_rx_core)
@@ -966,13 +972,24 @@ int CFlowStatRuleMgr::set_mode(enum flow_stat_mode_e mode) {
extern bool rx_should_stop;
void CFlowStatRuleMgr::send_start_stop_msg_to_rx(bool is_start) {
TrexStatelessCpToRxMsgBase *msg;
if (is_start) {
- msg = new TrexStatelessRxStartMsg();
+ static MsgReply<bool> reply;
+ reply.reset();
+ msg = new TrexStatelessRxEnableLatency(reply);
+ m_ring_to_rx->Enqueue((CGenNode *)msg);
+ /* hold until message was ack'ed - otherwise we might lose packets */
+ if (m_rx_core) {
+ reply.wait_for_reply();
+ assert(m_rx_core->is_working());
+ }
} else {
- msg = new TrexStatelessRxStopMsg();
+ msg = new TrexStatelessRxDisableLatency();
+ m_ring_to_rx->Enqueue((CGenNode *)msg);
- m_ring_to_rx->Enqueue((CGenNode *)msg);
// return false if no counters changed since last run. true otherwise
diff --git a/src/internal_api/trex_platform_api.h b/src/internal_api/trex_platform_api.h
index 631f9a3e..5723503c 100644
--- a/src/internal_api/trex_platform_api.h
+++ b/src/internal_api/trex_platform_api.h
@@ -28,6 +28,7 @@ limitations under the License.
#include <string.h>
#include "flow_stat_parser.h"
#include "trex_defs.h"
+#include "trex_stateless_rx_defs.h"
#include "trex_port_attr.h"
#include <json/json.h>
@@ -112,19 +113,13 @@ public:
IF_STAT_RX_BYTES_COUNT = 8, // Card support counting rx bytes
- struct mac_cfg_st {
- uint8_t hw_macaddr[6];
- uint8_t src_macaddr[6];
- uint8_t dst_macaddr[6];
- };
* interface static info
struct intf_info_st {
std::string driver_name;
- mac_cfg_st mac_info;
+ uint8_t hw_macaddr[6];
std::string pci_addr;
int numa_node;
bool has_crc;
@@ -234,7 +229,7 @@ public:
info.has_crc = true;
info.numa_node = 0;
- memset(&info.mac_info, 0, sizeof(info.mac_info));
+ memset(&info.hw_macaddr, 0, sizeof(info.hw_macaddr));
virtual void get_interface_stats(uint8_t interface_id, TrexPlatformInterfaceStats &stats) const {
diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp
index 6f18b0b8..c9e0c6a1 100644
--- a/src/main_dpdk.cpp
+++ b/src/main_dpdk.cpp
@@ -132,7 +132,7 @@ static char global_master_id_str[10];
class CTRexExtendedDriverBase {
/* by default NIC driver adds CRC */
virtual bool has_crc_added() {
return true;
@@ -168,6 +168,7 @@ public:
virtual CFlowStatParser *get_flow_stat_parser();
virtual int set_rcv_all(CPhyEthIF * _if, bool set_on)=0;
virtual TRexPortAttr * create_port_attr(uint8_t port_id) = 0;
+ virtual uint8_t get_num_crc_fix_bytes() {return 0;}
/* Does this NIC type support automatic packet dropping in case of a link down?
in case it is supported the packets will be dropped, else there would be a back pressure to tx queues
@@ -277,6 +278,18 @@ public:
virtual int set_rcv_all(CPhyEthIF * _if, bool set_on) {return 0;}
+class CTRexExtendedDriverBaseE1000 : public CTRexExtendedDriverBase1GVm {
+ CTRexExtendedDriverBaseE1000() {
+ // E1000 driver is only relevant in VM in our case
+ CGlobalInfo::m_options.preview.set_vm_one_queue_enable(true);
+ }
+ static CTRexExtendedDriverBase * create() {
+ return ( new CTRexExtendedDriverBaseE1000() );
+ }
+ // e1000 driver handing us packets with ethernet CRC, so we need to chop them
+ virtual uint8_t get_num_crc_fix_bytes() {return 4;}
class CTRexExtendedDriverBase10G : public CTRexExtendedDriverBase {
@@ -316,7 +329,8 @@ public:
| TrexPlatformApi::IF_STAT_PAYLOAD;
virtual CFlowStatParser *get_flow_stat_parser();
- virtual int set_rcv_all(CPhyEthIF * _if, bool set_on) {return 0;}
+ int add_del_eth_filter(CPhyEthIF * _if, bool is_add, uint16_t ethertype);
+ virtual int set_rcv_all(CPhyEthIF * _if, bool set_on);
class CTRexExtendedDriverBase40G : public CTRexExtendedDriverBase10G {
@@ -379,7 +393,7 @@ private:
uint8_t m_if_per_card;
-class CTRexExtendedDriverBaseVIC : public CTRexExtendedDriverBase40G {
+class CTRexExtendedDriverBaseVIC : public CTRexExtendedDriverBase {
@@ -396,6 +410,22 @@ public:
virtual bool is_hardware_filter_is_supported(){
return (true);
+ virtual void update_global_config_fdir(port_cfg_t * cfg){
+ }
+ virtual bool is_hardware_support_drop_queue(){
+ return(true);
+ }
+ void clear_extended_stats(CPhyEthIF * _if);
+ void get_extended_stats(CPhyEthIF * _if,CPhyEthIFStats *stats);
+ virtual int get_min_sample_rate(void){
+ }
virtual int verify_fw_ver(int i);
@@ -556,7 +586,7 @@ private:
/* virtual devices */
- register_driver(std::string("rte_em_pmd"),CTRexExtendedDriverBase1GVm::create);
+ register_driver(std::string("rte_em_pmd"),CTRexExtendedDriverBaseE1000::create);
@@ -1078,7 +1108,7 @@ static int parse_options(int argc, char *argv[], CParserOption* po, bool first_t
if (po->preview.get_is_rx_check_enable() || po->is_latency_enabled() || CGlobalInfo::is_learn_mode()
- || (CGlobalInfo::m_options.m_arp_ref_per != 0)) {
+ || (CGlobalInfo::m_options.m_arp_ref_per != 0) || get_vm_one_queue_enable()) {
@@ -1627,6 +1657,7 @@ int DpdkTRexPortAttr::set_led(bool on){
int DpdkTRexPortAttr::get_flow_ctrl(int &mode) {
int ret = rte_eth_dev_flow_ctrl_get(m_port_id, &fc_conf_tmp);
if (ret) {
+ mode = -1;
return ret;
mode = (int) fc_conf_tmp.mode;
@@ -1723,12 +1754,19 @@ bool DpdkTRexPortAttr::update_link_status_nowait(){
rte_eth_link new_link;
bool changed = false;
rte_eth_link_get_nowait(m_port_id, &new_link);
if (new_link.link_speed != m_link.link_speed ||
new_link.link_duplex != m_link.link_duplex ||
new_link.link_autoneg != m_link.link_autoneg ||
new_link.link_status != m_link.link_status) {
changed = true;
+ /* in case of link status change - notify the dest object */
+ if (new_link.link_status != m_link.link_status) {
+ get_dest().on_link_down();
+ }
m_link = new_link;
return changed;
@@ -1770,7 +1808,7 @@ bool DpdkTRexPortAttr::get_promiscuous(){
-void DpdkTRexPortAttr::macaddr_get(struct ether_addr *mac_addr){
+void DpdkTRexPortAttr::get_hw_src_mac(struct ether_addr *mac_addr){
rte_eth_macaddr_get(m_port_id , mac_addr);
@@ -1961,7 +1999,6 @@ public:
virtual int send_node(CGenNode * node);
virtual void send_one_pkt(pkt_dir_t dir, rte_mbuf_t *m);
virtual int flush_tx_queue(void);
- __attribute__ ((noinline)) void handle_rx_queue();
__attribute__ ((noinline)) void handle_slowpath_features(CGenNode *node, rte_mbuf_t *m, uint8_t *p, pkt_dir_t dir);
void apply_client_cfg(const ClientCfgBase *cfg, rte_mbuf_t *m, pkt_dir_t dir, uint8_t *p);
@@ -2037,46 +2074,6 @@ bool CCoreEthIF::Create(uint8_t core_id,
return (true);
-// This function is only relevant if we are in VM. In this case, we only have one rx queue. Can't have
-// rules to drop queue 0 packets, and pass queue 1 packets to RX core, like in other cases.
-// We receive all packets in the same core that transmitted, and handle them to RX core.
-void CCoreEthIF::handle_rx_queue(void) {
- if ( likely( ! get_vm_one_queue_enable() ) ) {
- return;
- }
- pkt_dir_t dir;
- bool is_rx = get_is_rx_thread_enabled();
- for (dir=CLIENT_SIDE; dir<CS_NUM; dir++) {
- CCorePerPort * lp_port=&m_ports[dir];
- CPhyEthIF * lp=lp_port->m_port;
- rte_mbuf_t * rx_pkts[32];
- int j=0;
- while (true) {
- j++;
- uint16_t cnt =lp->rx_burst(0,rx_pkts,32);
- if ( cnt ) {
- int i;
- for (i=0; i<(int)cnt;i++) {
- rte_mbuf_t * m=rx_pkts[i];
- if ( is_rx ){
- if (!process_rx_pkt(dir,m)){
- rte_pktmbuf_free(m);
- }
- }else{
- rte_pktmbuf_free(m);
- }
- }
- }
- if ((cnt<5) || j>10 ) {
- break;
- }
- }
- }
int CCoreEthIF::flush_tx_queue(void){
/* flush both sides */
pkt_dir_t dir;
@@ -2089,8 +2086,6 @@ int CCoreEthIF::flush_tx_queue(void){
- handle_rx_queue();
return 0;
@@ -2580,10 +2575,11 @@ private:
class CLatencyVmPort : public CPortLatencyHWBase {
void Create(uint8_t port_index,CNodeRing * ring,
- CLatencyManager * mgr){
+ CLatencyManager * mgr, CPhyEthIF * p) {
m_dir = (port_index%2);
m_ring_to_dp = ring;
m_mgr = mgr;
+ m_port = p;
virtual int tx(rte_mbuf_t * m){
@@ -2610,17 +2606,23 @@ public:
return (-1);
- virtual rte_mbuf_t * rx(){
- return (0);
+ virtual rte_mbuf_t * rx() {
+ rte_mbuf_t * rx_pkts[1];
+ uint16_t cnt = m_port->rx_burst(0, rx_pkts, 1);
+ if (cnt) {
+ return (rx_pkts[0]);
+ } else {
+ return (0);
+ }
- virtual uint16_t rx_burst(struct rte_mbuf **rx_pkts,
- uint16_t nb_pkts){
- return (0);
+ virtual uint16_t rx_burst(struct rte_mbuf **rx_pkts, uint16_t nb_pkts) {
+ uint16_t cnt = m_port->rx_burst(0, rx_pkts, nb_pkts);
+ return (cnt);
+ CPhyEthIF * m_port;
uint8_t m_dir;
CNodeRing * m_ring_to_dp; /* ring dp -> latency thread */
CLatencyManager * m_mgr;
@@ -3128,9 +3130,9 @@ public:
uint32_t m_max_ports; /* active number of ports supported options are 2,4,8,10,12 */
uint32_t m_max_cores; /* current number of cores , include master and latency ==> ( master)1+c*(m_max_ports>>1)+1( latency ) */
uint32_t m_cores_mul; /* how cores multipler given c=4 ==> m_cores_mul */
- uint32_t m_max_queues_per_port;
- uint32_t m_cores_to_dual_ports; /* number of ports that will handle dual ports */
- uint16_t m_latency_tx_queue_id;
+ uint32_t m_max_queues_per_port; // Number of TX queues per port
+ uint32_t m_cores_to_dual_ports; /* number of TX cores allocated for each port pair */
+ uint16_t m_rx_core_tx_q_id; /* TX q used by rx core */
// statistic
CPPSMeasure m_cps;
float m_expected_pps;
@@ -3317,11 +3319,20 @@ void CGlobalTRex::pre_test() {
// we don't have dest MAC. Get it from what we resolved.
uint32_t ip = CGlobalInfo::m_options.m_ip_cfg[port_id].get_def_gw();
uint16_t vlan = CGlobalInfo::m_options.m_ip_cfg[port_id].get_vlan();
- if (! pretest.get_mac(port_id, ip, vlan, mac)) {
+ if (!pretest.get_mac(port_id, ip, vlan, mac)) {
fprintf(stderr, "Failed resolving dest MAC for default gateway:%d.%d.%d.%d on port %d\n"
, (ip >> 24) & 0xFF, (ip >> 16) & 0xFF, (ip >> 8) & 0xFF, ip & 0xFF, port_id);
- exit(1);
+ if (get_is_stateless()) {
+ continue;
+ } else {
+ exit(1);
+ }
memcpy(CGlobalInfo::m_options.m_mac_addr[port_id].u.m_mac.dest, mac, ETHER_ADDR_LEN);
// if port is connected in loopback, no need to send gratuitous ARP. It will only confuse our ingress counters.
if (need_grat_arp[port_id] && (! pretest.is_loopback(port_id))) {
@@ -3340,6 +3351,16 @@ void CGlobalTRex::pre_test() {
// Configure port back to normal mode. Only relevant packets handled by software.
CTRexExtendedDriverDb::Ins()->get_drv()->set_rcv_all(pif, false);
+ /* set resolved IPv4 */
+ uint32_t dg = CGlobalInfo::m_options.m_ip_cfg[port_id].get_def_gw();
+ const uint8_t *dst_mac = CGlobalInfo::m_options.m_mac_addr[port_id].u.m_mac.dest;
+ if (dg) {
+ m_ports[port_id].get_port_attr()->get_dest().set_dest(dg, dst_mac);
+ } else {
+ m_ports[port_id].get_port_attr()->get_dest().set_dest(dst_mac);
+ }
@@ -3408,12 +3429,14 @@ bool CGlobalTRex::is_all_links_are_up(bool dump){
void CGlobalTRex::try_stop_all_cores(){
TrexStatelessDpQuit * dp_msg= new TrexStatelessDpQuit();
- TrexStatelessRxQuit * rx_msg= new TrexStatelessRxQuit();
+ delete dp_msg;
if (get_is_stateless()) {
+ TrexStatelessRxQuit * rx_msg= new TrexStatelessRxQuit();
- delete dp_msg;
// no need to delete rx_msg. Deleted by receiver
bool all_core_finished = false;
int i;
@@ -3463,6 +3486,7 @@ int CGlobalTRex::ixgbe_rx_queue_flush(){
+// init stateful rx core
void CGlobalTRex::ixgbe_configure_mg(void) {
int i;
CLatencyManagerCfg mg_cfg;
@@ -3485,13 +3509,13 @@ void CGlobalTRex::ixgbe_configure_mg(void) {
if ( get_vm_one_queue_enable() ) {
/* vm mode, indirect queues */
for (i=0; i<m_max_ports; i++) {
+ CPhyEthIF * _if = &m_ports[i];
CMessagingManager * rx_dp=CMsgIns::Ins()->getRxDp();
uint8_t thread_id = (i>>1);
CNodeRing * r = rx_dp->getRingCpToDp(thread_id);
- m_latency_vm_vports[i].Create((uint8_t)i,r,&m_mg);
+ m_latency_vm_vports[i].Create((uint8_t)i, r, &m_mg, _if);
mg_cfg.m_ports[i] =&m_latency_vm_vports[i];
@@ -3500,7 +3524,7 @@ void CGlobalTRex::ixgbe_configure_mg(void) {
for (i=0; i<m_max_ports; i++) {
CPhyEthIF * _if=&m_ports[i];
- m_latency_vports[i].Create(_if,m_latency_tx_queue_id,1);
+ m_latency_vports[i].Create(_if, m_rx_core_tx_q_id, 1);
mg_cfg.m_ports[i] =&m_latency_vports[i];
@@ -3517,20 +3541,22 @@ void CGlobalTRex::rx_sl_configure(void) {
int i;
rx_sl_cfg.m_max_ports = m_max_ports;
+ rx_sl_cfg.m_num_crc_fix_bytes = get_ex_drv()->get_num_crc_fix_bytes();
if ( get_vm_one_queue_enable() ) {
/* vm mode, indirect queues */
for (i=0; i < m_max_ports; i++) {
+ CPhyEthIF * _if = &m_ports[i];
CMessagingManager * rx_dp = CMsgIns::Ins()->getRxDp();
uint8_t thread_id = (i >> 1);
CNodeRing * r = rx_dp->getRingCpToDp(thread_id);
- m_latency_vm_vports[i].Create((uint8_t)i, r, &m_mg);
+ m_latency_vm_vports[i].Create(i, r, &m_mg, _if);
rx_sl_cfg.m_ports[i] = &m_latency_vm_vports[i];
} else {
for (i = 0; i < m_max_ports; i++) {
CPhyEthIF * _if = &m_ports[i];
- m_latency_vports[i].Create(_if, m_latency_tx_queue_id, 1);
+ m_latency_vports[i].Create(_if, m_rx_core_tx_q_id, 1);
rx_sl_cfg.m_ports[i] = &m_latency_vports[i];
@@ -3541,88 +3567,53 @@ void CGlobalTRex::rx_sl_configure(void) {
int CGlobalTRex::ixgbe_start(void){
int i;
for (i=0; i<m_max_ports; i++) {
+ socket_id_t socket_id = CGlobalInfo::m_socket.port_to_socket((port_id_t)i);
+ assert(CGlobalInfo::m_mem_pool[socket_id].m_mbuf_pool_2048);
CPhyEthIF * _if=&m_ports[i];
- /* last TX queue if for latency check */
- if ( get_vm_one_queue_enable() ) {
- /* one tx one rx */
+ if ( get_vm_one_queue_enable() ) {
/* VMXNET3 does claim to support 16K but somehow does not work */
/* reduce to 2000 */
m_port_cfg.m_port_conf.rxmode.max_rx_pkt_len = 2000;
- _if->configure(1,
- 1,
- &m_port_cfg.m_port_conf);
- /* will not be used */
- m_latency_tx_queue_id= m_cores_to_dual_ports;
- socket_id_t socket_id = CGlobalInfo::m_socket.port_to_socket((port_id_t)i);
- assert(CGlobalInfo::m_mem_pool[socket_id].m_mbuf_pool_2048);
+ /* In VM case, there is one tx q and one rx q */
+ _if->configure(1, 1, &m_port_cfg.m_port_conf);
+ // Only 1 rx queue, so use it for everything
+ m_rx_core_tx_q_id = 0;
- _if->rx_queue_setup(0,
- socket_id,
- &m_port_cfg.m_rx_conf,
+ _if->rx_queue_setup(0, RTE_TEST_RX_DESC_VM_DEFAULT, socket_id, &m_port_cfg.m_rx_conf,
+ // 1 TX queue in VM case
+ _if->tx_queue_setup(0, RTE_TEST_TX_DESC_VM_DEFAULT, socket_id, &m_port_cfg.m_tx_conf);
+ } else {
+ // 2 rx queues.
+ // TX queues: 1 for each core handling the port pair + 1 for latency pkts + 1 for use by RX core
+ _if->configure(2, m_cores_to_dual_ports + 2, &m_port_cfg.m_port_conf);
+ m_rx_core_tx_q_id = m_cores_to_dual_ports;
- int qid;
- for ( qid=0; qid<(m_max_queues_per_port); qid++) {
- _if->tx_queue_setup((uint16_t)qid,
- socket_id,
- &m_port_cfg.m_tx_conf);
- }
- }else{
- _if->configure(2,
- m_cores_to_dual_ports+1,
- &m_port_cfg.m_port_conf);
- /* the latency queue for latency measurement packets */
- m_latency_tx_queue_id= m_cores_to_dual_ports;
- socket_id_t socket_id = CGlobalInfo::m_socket.port_to_socket((port_id_t)i);
- assert(CGlobalInfo::m_mem_pool[socket_id].m_mbuf_pool_2048);
- /* drop queue */
- _if->rx_queue_setup(0,
+ // setup RX drop queue
+ _if->rx_queue_setup(MAIN_DPDK_DATA_Q,
- /* set the filter queue */
- _if->set_rx_queue(1);
- /* latency measurement ring is 1 */
- _if->rx_queue_setup(1,
+ // setup RX filter queue
+ _if->set_rx_queue(MAIN_DPDK_RX_Q);
+ _if->rx_queue_setup(MAIN_DPDK_RX_Q,
- int qid;
- for ( qid=0; qid<(m_max_queues_per_port+1); qid++) {
+ // setup TX queues
+ for (int qid = 0; qid < m_max_queues_per_port; qid++) {
@@ -3647,8 +3638,7 @@ int CGlobalTRex::ixgbe_start(void){
printf(" WARNING : there is no link on one of the ports, driver support auto drop in case of link down - continue\n");
- rte_exit(EXIT_FAILURE, " "
- " one of the link is down \n");
+ rte_exit(EXIT_FAILURE, " One of the links is down \n");
} else {
@@ -3679,7 +3669,7 @@ int CGlobalTRex::ixgbe_start(void){
if ( get_vm_one_queue_enable() ) {
lat_q_id = 0;
} else {
- lat_q_id = get_cores_tx() / get_base_num_cores();
+ lat_q_id = get_cores_tx() / get_base_num_cores() + 1;
for (i=0; i<get_cores_tx(); i++) {
int j=(i+1);
@@ -3704,6 +3694,7 @@ int CGlobalTRex::ixgbe_start(void){
fprintf(stdout," -------------------------------\n");
+ fprintf(stdout, "RX core uses TX queue number %d on all ports\n", m_rx_core_tx_q_id);
for (i=0; i<get_cores_tx(); i++) {
@@ -3807,7 +3798,14 @@ bool CGlobalTRex::Create(){
void CGlobalTRex::Delete(){
+ m_fl.Delete();
+ if (m_trex_stateless) {
+ delete m_trex_stateless;
+ m_trex_stateless = NULL;
+ }
@@ -3930,12 +3928,12 @@ int CGlobalTRex::queues_prob_init(){
m_cores_to_dual_ports = 2;
- /* number of queue - 1 per core for dual ports*/
- m_max_queues_per_port = m_cores_to_dual_ports;
+ // One q for each core allowed to send on this port + 1 for latency q (Used in stateless) + 1 for RX core.
+ m_max_queues_per_port = m_cores_to_dual_ports + 2;
if (m_max_queues_per_port > BP_MAX_TX_QUEUE) {
- "maximum number of queue should be maximum %d \n",BP_MAX_TX_QUEUE);
+ "Error: Number of TX queues exceeds %d. Try running with lower -c <val> \n",BP_MAX_TX_QUEUE);
@@ -4405,18 +4403,9 @@ CGlobalTRex:: publish_async_port_attr_changed(uint8_t port_id) {
Json::Value data;
data["port_id"] = port_id;
TRexPortAttr * _attr = m_ports[port_id].get_port_attr();
- /* attributes */
- data["attr"]["speed"] = _attr->get_link_speed();
- data["attr"]["promiscuous"]["enabled"] = _attr->get_promiscuous();
- data["attr"]["link"]["up"] = _attr->is_link_up();
- int mode;
- int ret = _attr->get_flow_ctrl(mode);
- if (ret != 0) {
- mode = -1;
- }
- data["attr"]["fc"]["mode"] = mode;
+ _attr->to_json(data["attr"]);
m_zmq_publisher.publish_event(TrexPublisher::EVENT_PORT_ATTR_CHANGED, data);
@@ -4617,8 +4606,11 @@ void CGlobalTRex::shutdown() {
for (int i = 0; i < m_max_ports; i++) {
if (m_mark_for_shutdown != SHUTDOWN_TEST_ENDED) {
/* we should stop latency and exit to stop agents */
+ Delete();
+ utl_termio_reset();
@@ -4782,7 +4774,6 @@ int CGlobalTRex::stop_master(){
- m_fl.Delete();
return (0);
@@ -4922,6 +4913,18 @@ bool CPhyEthIF::Create(uint8_t portid) {
m_last_tx_pps = 0.0;
m_port_attr = g_trex.m_drv->create_port_attr(portid);
+ uint32_t src_ipv4 = CGlobalInfo::m_options.m_ip_cfg[m_port_id].get_ip();
+ if (src_ipv4) {
+ m_port_attr->set_src_ipv4(src_ipv4);
+ }
+ /* for now set as unresolved IPv4 destination */
+ uint32_t dest_ipv4 = CGlobalInfo::m_options.m_ip_cfg[m_port_id].get_def_gw();
+ if (dest_ipv4) {
+ m_port_attr->get_dest().set_dest(dest_ipv4);
+ }
return true;
@@ -5623,7 +5626,7 @@ int main_test(int argc , char * argv[]){
return (0);
@@ -6001,6 +6004,7 @@ void CTRexExtendedDriverBase10G::update_configuration(port_cfg_t * cfg){
int CTRexExtendedDriverBase10G::configure_rx_filter_rules(CPhyEthIF * _if) {
+ set_rcv_all(_if, false);
if ( get_is_stateless() ) {
return configure_rx_filter_rules_stateless(_if);
} else {
@@ -6031,7 +6035,7 @@ int CTRexExtendedDriverBase10G::configure_rx_filter_rules_stateless(CPhyEthIF *
res = rte_eth_dev_filter_ctrl(port_id, RTE_ETH_FILTER_FDIR, RTE_ETH_FILTER_ADD, &fdir_filter);
if (res != 0) {
- rte_exit(EXIT_FAILURE, " ERROR rte_eth_dev_filter_ctrl : %d\n",res);
+ rte_exit(EXIT_FAILURE, "Error: rte_eth_dev_filter_ctrl in configure_rx_filter_rules_stateless: %d\n",res);
@@ -6100,12 +6104,51 @@ int CTRexExtendedDriverBase10G::configure_rx_filter_rules_statefull(CPhyEthIF *
res = rte_eth_dev_filter_ctrl(port_id, RTE_ETH_FILTER_FDIR, RTE_ETH_FILTER_ADD, &fdir_filter);
if (res != 0) {
- rte_exit(EXIT_FAILURE, " ERROR rte_eth_dev_filter_ctrl : %d\n",res);
+ rte_exit(EXIT_FAILURE, "Error: rte_eth_dev_filter_ctrl in configure_rx_filter_rules_statefull: %d\n",res);
return (0);
+int CTRexExtendedDriverBase10G::add_del_eth_filter(CPhyEthIF * _if, bool is_add, uint16_t ethertype) {
+ int res = 0;
+ uint8_t port_id=_if->get_rte_port_id();
+ struct rte_eth_ethertype_filter filter;
+ enum rte_filter_op op;
+ memset(&filter, 0, sizeof(filter));
+ filter.ether_type = ethertype;
+ res = rte_eth_dev_filter_ctrl(port_id, RTE_ETH_FILTER_ETHERTYPE, RTE_ETH_FILTER_GET, &filter);
+ if (is_add && (res >= 0))
+ return 0;
+ if ((! is_add) && (res == -ENOENT))
+ return 0;
+ if (is_add) {
+ } else {
+ }
+ filter.queue = 1;
+ res = rte_eth_dev_filter_ctrl(port_id, RTE_ETH_FILTER_ETHERTYPE, op, &filter);
+ if (res != 0) {
+ printf("Error: %s L2 filter for ethertype 0x%04x returned %d\n", is_add ? "Adding":"Deleting", ethertype, res);
+ exit(1);
+ }
+ return 0;
+int CTRexExtendedDriverBase10G::set_rcv_all(CPhyEthIF * _if, bool set_on) {
+ int res = 0;
+ res = add_del_eth_filter(_if, set_on, ETHER_TYPE_ARP);
+ res |= add_del_eth_filter(_if, set_on, ETHER_TYPE_IPv4);
+ res |= add_del_eth_filter(_if, set_on, ETHER_TYPE_IPv6);
+ return res;
void CTRexExtendedDriverBase10G::get_extended_stats(CPhyEthIF * _if,CPhyEthIFStats *stats){
int i;
@@ -6852,26 +6895,58 @@ int CTRexExtendedDriverBaseVIC::configure_rx_filter_rules_statefull(CPhyEthIF *
return 0;
-extern "C" int enicpmd_dev_get_fw_support(int port_id,
- uint32_t *ver);
+void CTRexExtendedDriverBaseVIC::clear_extended_stats(CPhyEthIF * _if){
+ rte_eth_stats_reset(_if->get_port_id());
+void CTRexExtendedDriverBaseVIC::get_extended_stats(CPhyEthIF * _if,CPhyEthIFStats *stats) {
+ struct rte_eth_stats stats1;
+ struct rte_eth_stats *prev_stats = &stats->m_prev_stats;
+ rte_eth_stats_get(_if->get_port_id(), &stats1);
+ stats->ipackets += stats1.ipackets - prev_stats->ipackets;
+ stats->ibytes += stats1.ibytes - prev_stats->ibytes
+ - ((stats1.ipackets << 2) - (prev_stats->ipackets << 2));
+ stats->opackets += stats1.opackets - prev_stats->opackets;
+ stats->obytes += stats1.obytes - prev_stats->obytes;
+ stats->f_ipackets += 0;
+ stats->f_ibytes += 0;
+ stats->ierrors += stats1.imissed + stats1.ierrors + stats1.rx_nombuf
+ - prev_stats->imissed - prev_stats->ierrors - prev_stats->rx_nombuf;
+ stats->oerrors += stats1.oerrors - prev_stats->oerrors;
+ stats->imcasts += 0;
+ stats->rx_nombuf += stats1.rx_nombuf - prev_stats->rx_nombuf;
+ prev_stats->ipackets = stats1.ipackets;
+ prev_stats->ibytes = stats1.ibytes;
+ prev_stats->opackets = stats1.opackets;
+ prev_stats->obytes = stats1.obytes;
+ prev_stats->imissed = stats1.imissed;
+ prev_stats->oerrors = stats1.oerrors;
+ prev_stats->ierrors = stats1.ierrors;
+ prev_stats->rx_nombuf = stats1.rx_nombuf;
int CTRexExtendedDriverBaseVIC::verify_fw_ver(int port_id) {
- uint32_t ver;
- int ret=enicpmd_dev_get_fw_support(port_id,&ver);
+ struct rte_eth_fdir_info fdir_info;
- if (ret==0) {
- if (CGlobalInfo::m_options.preview.getVMode() >= 1) {
- printf("VIC port %d: FW support advanced filtering \n", port_id);
+ if ( rte_eth_dev_filter_ctrl(port_id,RTE_ETH_FILTER_FDIR, RTE_ETH_FILTER_INFO,(void *)&fdir_info) == 0 ){
+ if ( fdir_info.flow_types_mask[0] & (1<< RTE_ETH_FLOW_NONFRAG_IPV4_OTHER) ) {
+ /* support new features */
+ if (CGlobalInfo::m_options.preview.getVMode() >= 1) {
+ printf("VIC port %d: FW support advanced filtering \n", port_id);
+ }
+ return (0);
- }else{
- printf("Error: VIC firmware should upgrade to support advanced filtering \n");
- printf(" Please refer to %s for upgrade instructions\n",
- "");
- exit(1);
- return (0);
+ printf("Error: VIC firmware should upgrade to support advanced filtering \n");
+ printf(" Please refer to %s for upgrade instructions\n",
+ "");
+ exit(1);
@@ -7110,19 +7185,10 @@ TrexDpdkPlatformApi::get_interface_info(uint8_t interface_id, intf_info_st &info
/* mac INFO */
/* hardware */
- g_trex.m_ports[interface_id].get_port_attr()->macaddr_get(&rte_mac_addr);
+ g_trex.m_ports[interface_id].get_port_attr()->get_hw_src_mac(&rte_mac_addr);
assert(ETHER_ADDR_LEN == 6);
- /* software */
- uint8_t sw_macaddr[12];
- memcpy(sw_macaddr, CGlobalInfo::m_options.get_dst_src_mac_addr(interface_id), 12);
- for (int i = 0; i < 6; i++) {
- info.mac_info.hw_macaddr[i] = rte_mac_addr.addr_bytes[i];
- info.mac_info.dst_macaddr[i] = sw_macaddr[i];
- info.mac_info.src_macaddr[i] = sw_macaddr[6 + i];
- }
+ memcpy(info.hw_macaddr, rte_mac_addr.addr_bytes, 6);
info.numa_node = g_trex.m_ports[interface_id].m_dev_info.pci_dev->numa_node;
struct rte_pci_addr *loc = &g_trex.m_ports[interface_id].m_dev_info.pci_dev->addr;
@@ -7228,6 +7294,25 @@ TRexPortAttr *TrexDpdkPlatformApi::getPortAttrObj(uint8_t port_id) const {
return g_trex.m_ports[port_id].get_port_attr();
+int DpdkTRexPortAttr::set_rx_filter_mode(rx_filter_mode_e rx_filter_mode) {
+ if (rx_filter_mode == m_rx_filter_mode) {
+ return (0);
+ }
+ CPhyEthIF *_if = &g_trex.m_ports[m_port_id];
+ bool recv_all = (rx_filter_mode == RX_FILTER_MODE_ALL);
+ int rc = CTRexExtendedDriverDb::Ins()->get_drv()->set_rcv_all(_if, recv_all);
+ if (rc != 0) {
+ return (rc);
+ }
+ m_rx_filter_mode = rx_filter_mode;
+ return (0);
* marks the control plane for a total server shutdown
@@ -7236,3 +7321,6 @@ TRexPortAttr *TrexDpdkPlatformApi::getPortAttrObj(uint8_t port_id) const {
void TrexDpdkPlatformApi::mark_for_shutdown() const {
diff --git a/src/pkt_gen.cpp b/src/pkt_gen.cpp
index 45e3a298..656b1b06 100644
--- a/src/pkt_gen.cpp
+++ b/src/pkt_gen.cpp
@@ -222,7 +222,7 @@ char *CTestPktGen::create_test_pkt(uint16_t l3_type, uint16_t l4_proto, uint8_t
switch(l3_type) {
case EthernetHeader::Protocol::IP:
+ if (flags & DPF_TOS_1) {
@@ -232,11 +232,11 @@ char *CTestPktGen::create_test_pkt(uint16_t l3_type, uint16_t l4_proto, uint8_t
case EthernetHeader::Protocol::IPv6:
+ if (flags & DPF_TOS_1) {
- }
+ }
diff --git a/src/pkt_gen.h b/src/pkt_gen.h
index 309e02b9..8dcba624 100644
--- a/src/pkt_gen.h
+++ b/src/pkt_gen.h
@@ -37,7 +37,8 @@ enum {
enum {
DPF_VLAN = 0x1,
+ DPF_RXCHECK = 0x4,
+ DPF_TOS_1 = 0x8,
class CTestPktGen {
diff --git a/src/pre_test.cpp b/src/pre_test.cpp
index df753be5..7127645d 100644
--- a/src/pre_test.cpp
+++ b/src/pre_test.cpp
@@ -35,6 +35,15 @@ CPretestOnePortInfo::CPretestOnePortInfo() {
+CPretestOnePortInfo::~CPretestOnePortInfo() {
+ for (std::vector<COneIPInfo *>::iterator it = m_src_info.begin(); it != m_src_info.end(); ++it) {
+ delete *it;
+ }
+ for (std::vector<COneIPInfo *>::iterator it = m_dst_info.begin(); it != m_dst_info.end(); ++it) {
+ delete *it;
+ }
void CPretestOnePortInfo::add_src(uint32_t ip, uint16_t vlan, MacAddress mac) {
COneIPv4Info *one_ip = new COneIPv4Info(ip, vlan, mac);
diff --git a/src/pre_test.h b/src/pre_test.h
index 9573ff02..14b444cf 100644
--- a/src/pre_test.h
+++ b/src/pre_test.h
@@ -49,6 +49,7 @@ class CPretestOnePortInfo {
+ ~CPretestOnePortInfo();
void add_src(uint32_t ip, uint16_t vlan, MacAddress mac);
void add_dst(uint32_t ip, uint16_t vlan);
void add_src(uint16_t ip[8], uint16_t vlan, MacAddress mac);
diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp
index 109cc1a4..3d541fe5 100644
--- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp
+++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp
@@ -27,6 +27,8 @@ limitations under the License.
#include <internal_api/trex_platform_api.h>
+#include "trex_stateless_rx_core.h"
#include <fstream>
#include <iostream>
#include <unistd.h>
@@ -289,19 +291,15 @@ TrexRpcCmdGetSysInfo::_run(const Json::Value &params, Json::Value &result) {
section["ports"] = Json::arrayValue;
for (int i = 0; i < main->get_port_count(); i++) {
- uint32_t speed;
string driver;
- string hw_macaddr;
- string src_macaddr;
- string dst_macaddr;
string pci_addr;
string description;
supp_speeds_t supp_speeds;
int numa;
TrexStatelessPort *port = main->get_port_by_id(i);
- port->get_properties(driver, speed);
- port->get_macaddr(hw_macaddr, src_macaddr, dst_macaddr);
+ port->get_properties(driver);
port->get_pci_info(pci_addr, numa);
@@ -311,9 +309,6 @@ TrexRpcCmdGetSysInfo::_run(const Json::Value &params, Json::Value &result) {
section["ports"][i]["driver"] = driver;
section["ports"][i]["description"] = description;
- section["ports"][i]["hw_macaddr"] = hw_macaddr;
- section["ports"][i]["src_macaddr"] = src_macaddr;
- section["ports"][i]["dst_macaddr"] = dst_macaddr;
section["ports"][i]["pci_addr"] = pci_addr;
section["ports"][i]["numa"] = numa;
@@ -330,7 +325,6 @@ TrexRpcCmdGetSysInfo::_run(const Json::Value &params, Json::Value &result) {
section["ports"][i]["rx"]["counters"] = port->get_rx_count_num();
- section["ports"][i]["speed"] = (uint16_t) speed / 1000;
section["ports"][i]["is_fc_supported"] = get_stateless_obj()->get_platform_api()->getPortAttrObj(i)->is_fc_change_supported();
section["ports"][i]["is_led_supported"] = get_stateless_obj()->get_platform_api()->getPortAttrObj(i)->is_led_change_supported();
section["ports"][i]["is_link_supported"] = get_stateless_obj()->get_platform_api()->getPortAttrObj(i)->is_link_change_supported();
@@ -345,6 +339,69 @@ TrexRpcCmdGetSysInfo::_run(const Json::Value &params, Json::Value &result) {
return (TREX_RPC_CMD_OK);
+TrexRpcCmdSetPortAttr::parse_rx_filter_mode(const Json::Value &msg, uint8_t port_id, Json::Value &result) {
+ const std::string type = parse_choice(msg, "mode", {"hw", "all"}, result);
+ rx_filter_mode_e filter_mode;
+ if (type == "hw") {
+ filter_mode = RX_FILTER_MODE_HW;
+ } else if (type == "all") {
+ filter_mode = RX_FILTER_MODE_ALL;
+ } else {
+ /* can't happen - parsed choice */
+ assert(0);
+ }
+ return get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id)->set_rx_filter_mode(filter_mode);
+TrexRpcCmdSetPortAttr::parse_ipv4(const Json::Value &msg, uint8_t port_id, Json::Value &result) {
+ const std::string ipv4_str = parse_string(msg, "addr", result);
+ uint32_t ipv4_addr;
+ if (!utl_ipv4_to_uint32(ipv4_str.c_str(), ipv4_addr)) {
+ std::stringstream ss;
+ ss << "invalid IPv4 address: '" << ipv4_str << "'";
+ generate_parse_err(result, ss.str());
+ }
+ get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id)->set_src_ipv4(ipv4_addr);
+ return (0);
+TrexRpcCmdSetPortAttr::parse_dest(const Json::Value &msg, uint8_t port_id, Json::Value &result) {
+ /* can be either IPv4 or MAC */
+ const std::string addr = parse_string(msg, "addr", result);
+ TRexPortAttr *port_attr = get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id);
+ /* try IPv4 */
+ uint32_t ipv4_addr;
+ uint8_t mac[6];
+ if (utl_ipv4_to_uint32(addr.c_str(), ipv4_addr)) {
+ port_attr->get_dest().set_dest(ipv4_addr);
+ } else if (utl_str_to_macaddr(addr, mac)) {
+ port_attr->get_dest().set_dest(mac);
+ } else {
+ std::stringstream ss;
+ ss << "'dest' is not an IPv4 address or a MAC address: '" << addr << "'";
+ generate_parse_err(result, ss.str());
+ }
+ return (0);
* set port commands
@@ -361,46 +418,64 @@ TrexRpcCmdSetPortAttr::_run(const Json::Value &params, Json::Value &result) {
uint8_t port_id = parse_port(params, result);
const Json::Value &attr = parse_object(params, "attr", result);
int ret = 0;
- bool changed = false;
/* iterate over all attributes in the dict */
for (const std::string &name : attr.getMemberNames()) {
if (name == "promiscuous") {
bool enabled = parse_bool(attr[name], "enabled", result);
ret = get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id)->set_promiscuous(enabled);
else if (name == "link_status") {
bool up = parse_bool(attr[name], "up", result);
ret = get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id)->set_link_up(up);
else if (name == "led_status") {
bool on = parse_bool(attr[name], "on", result);
ret = get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id)->set_led(on);
- } else if (name == "flow_ctrl_mode") {
+ }
+ else if (name == "flow_ctrl_mode") {
int mode = parse_int(attr[name], "mode", result);
ret = get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id)->set_flow_ctrl(mode);
- } else {
- generate_execute_err(result, "Not recognized attribute: " + name);
- break;
- if (ret != 0){
- if ( ret == -ENOTSUP ) {
- generate_execute_err(result, "Error applying " + name + ": operation is not supported for this NIC.");
- }
- else if (ret) {
- generate_execute_err(result, "Error applying " + name + " attribute, return value: " + to_string(ret));
- }
+ else if (name == "rx_filter_mode") {
+ const Json::Value &rx = parse_object(attr, name, result);
+ ret = parse_rx_filter_mode(rx, port_id, result);
+ }
+ else if (name == "ipv4") {
+ const Json::Value &ipv4 = parse_object(attr, name, result);
+ ret = parse_ipv4(ipv4, port_id, result);
+ }
+ else if (name == "dest") {
+ const Json::Value &dest = parse_object(attr, name, result);
+ ret = parse_dest(dest, port_id, result);
+ }
+ /* unknown attribute */
+ else {
+ generate_execute_err(result, "unknown attribute type: '" + name + "'");
- } else {
- changed = true;
- }
- if (changed) {
- get_stateless_obj()->get_platform_api()->publish_async_port_attr_changed(port_id);
- }
+ /* check error code */
+ if ( ret == -ENOTSUP ) {
+ generate_execute_err(result, "Error applying " + name + ": operation is not supported for this NIC.");
+ } else if (ret) {
+ generate_execute_err(result, "Error applying " + name + " attribute, return value: " + to_string(ret));
+ }
+ }
result["result"] = Json::objectValue;
return (TREX_RPC_CMD_OK);
@@ -568,18 +643,17 @@ TrexRpcCmdGetPortStatus::_run(const Json::Value &params, Json::Value &result) {
result["result"]["owner"] = (port->get_owner().is_free() ? "" : port->get_owner().get_name());
result["result"]["state"] = port->get_state_as_string();
result["result"]["max_stream_id"] = port->get_max_stream_id();
- result["result"]["speed"] = get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id)->get_link_speed();
/* attributes */
- result["result"]["attr"]["promiscuous"]["enabled"] = get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id)->get_promiscuous();
- result["result"]["attr"]["link"]["up"] = get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id)->is_link_up();
- int mode;
- int ret = get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id)->get_flow_ctrl(mode);
- if (ret != 0) {
- mode = -1;
+ get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id)->to_json(result["result"]["attr"]);
+ /* RX info */
+ try {
+ result["result"]["rx_info"] = port->rx_features_to_json();
+ } catch (const TrexException &ex) {
+ generate_execute_err(result, ex.what());
- result["result"]["attr"]["fc"]["mode"] = mode;
return (TREX_RPC_CMD_OK);
@@ -640,3 +714,151 @@ TrexRpcCmdPushRemote::_run(const Json::Value &params, Json::Value &result) {
+ * set on/off RX software receive mode
+ *
+ */
+TrexRpcCmdSetRxFeature::_run(const Json::Value &params, Json::Value &result) {
+ uint8_t port_id = parse_port(params, result);
+ TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
+ /* decide which feature is being set */
+ const std::string type = parse_choice(params, "type", {"capture", "queue", "server"}, result);
+ if (type == "capture") {
+ parse_capture_msg(params, port, result);
+ } else if (type == "queue") {
+ parse_queue_msg(params, port, result);
+ } else if (type == "server") {
+ parse_server_msg(params, port, result);
+ } else {
+ assert(0);
+ }
+ result["result"] = Json::objectValue;
+ return (TREX_RPC_CMD_OK);
+TrexRpcCmdSetRxFeature::parse_capture_msg(const Json::Value &msg, TrexStatelessPort *port, Json::Value &result) {
+ bool enabled = parse_bool(msg, "enabled", result);
+ if (enabled) {
+ std::string pcap_filename = parse_string(msg, "pcap_filename", result);
+ uint64_t limit = parse_uint32(msg, "limit", result);
+ if (limit == 0) {
+ generate_parse_err(result, "limit cannot be zero");
+ }
+ try {
+ port->start_rx_capture(pcap_filename, limit);
+ } catch (const TrexException &ex) {
+ generate_execute_err(result, ex.what());
+ }
+ } else {
+ try {
+ port->stop_rx_capture();
+ } catch (const TrexException &ex) {
+ generate_execute_err(result, ex.what());
+ }
+ }
+TrexRpcCmdSetRxFeature::parse_queue_msg(const Json::Value &msg, TrexStatelessPort *port, Json::Value &result) {
+ bool enabled = parse_bool(msg, "enabled", result);
+ if (enabled) {
+ uint64_t size = parse_uint32(msg, "size", result);
+ if (size == 0) {
+ generate_parse_err(result, "queue size cannot be zero");
+ }
+ try {
+ port->start_rx_queue(size);
+ } catch (const TrexException &ex) {
+ generate_execute_err(result, ex.what());
+ }
+ } else {
+ try {
+ port->stop_rx_queue();
+ } catch (const TrexException &ex) {
+ generate_execute_err(result, ex.what());
+ }
+ }
+TrexRpcCmdSetRxFeature::parse_server_msg(const Json::Value &msg, TrexStatelessPort *port, Json::Value &result) {
+TrexRpcCmdGetRxQueuePkts::_run(const Json::Value &params, Json::Value &result) {
+ uint8_t port_id = parse_port(params, result);
+ TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
+ try {
+ const RXPacketBuffer *pkt_buffer = port->get_rx_queue_pkts();
+ if (pkt_buffer) {
+ result["result"]["pkts"] = pkt_buffer->to_json();
+ delete pkt_buffer;
+ } else {
+ result["result"]["pkts"] = Json::arrayValue;
+ }
+ } catch (const TrexException &ex) {
+ generate_execute_err(result, ex.what());
+ }
+ return (TREX_RPC_CMD_OK);
+TrexRpcCmdSetARPRes::_run(const Json::Value &params, Json::Value &result) {
+ uint8_t port_id = parse_port(params, result);
+ TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
+ const std::string ipv4_str = parse_string(params, "ipv4", result);
+ const std::string mac_str = parse_string(params, "mac", result);
+ uint32_t ipv4_addr;
+ if (!utl_ipv4_to_uint32(ipv4_str.c_str(), ipv4_addr)) {
+ std::stringstream ss;
+ ss << "invalid IPv4 address: '" << ipv4_str << "'";
+ generate_parse_err(result, ss.str());
+ }
+ uint8_t mac[6];
+ if (!utl_str_to_macaddr(mac_str, mac)) {
+ std::stringstream ss;
+ ss << "'invalid MAC address: '" << mac_str << "'";
+ generate_parse_err(result, ss.str());
+ }
+ port->getPortAttrObj()->get_dest().set_dest(ipv4_addr, mac);
+ return (TREX_RPC_CMD_OK);
diff --git a/src/rpc-server/commands/trex_rpc_cmd_stream.cpp b/src/rpc-server/commands/trex_rpc_cmd_stream.cpp
index c950e011..9a57c5f9 100644
--- a/src/rpc-server/commands/trex_rpc_cmd_stream.cpp
+++ b/src/rpc-server/commands/trex_rpc_cmd_stream.cpp
@@ -670,6 +670,7 @@ TrexRpcCmdStartTraffic::_run(const Json::Value &params, Json::Value &result) {
generate_parse_err(result, "start message can only specify absolute speed rate");
+ dsec_t ts = now_sec();
TrexPortMultiplier mul(type, op, value);
try {
@@ -680,7 +681,8 @@ TrexRpcCmdStartTraffic::_run(const Json::Value &params, Json::Value &result) {
result["result"]["multiplier"] = port->get_multiplier();
+ result["result"]["ts"] = ts;
return (TREX_RPC_CMD_OK);
diff --git a/src/rpc-server/commands/trex_rpc_cmds.h b/src/rpc-server/commands/trex_rpc_cmds.h
index 5fde1d0c..2b2178e2 100644
--- a/src/rpc-server/commands/trex_rpc_cmds.h
+++ b/src/rpc-server/commands/trex_rpc_cmds.h
@@ -27,6 +27,7 @@ limitations under the License.
#include <memory>
class TrexStream;
+class TrexStatelessPort;
/* all the RPC commands decl. goes here */
@@ -89,10 +90,17 @@ TREX_RPC_CMD_DEFINE(TrexRpcCmdRelease, "release", 1, true, APIClass:
TREX_RPC_CMD_DEFINE(TrexRpcCmdGetPortStats, "get_port_stats", 1, false, APIClass::API_CLASS_TYPE_CORE);
TREX_RPC_CMD_DEFINE(TrexRpcCmdGetPortStatus, "get_port_status", 1, false, APIClass::API_CLASS_TYPE_CORE);
-TREX_RPC_CMD_DEFINE(TrexRpcCmdSetPortAttr, "set_port_attr", 2, true, APIClass::API_CLASS_TYPE_CORE);
TREX_RPC_CMD_DEFINE(TrexRpcCmdGetPortXStatsValues, "get_port_xstats_values", 1, false, APIClass::API_CLASS_TYPE_CORE);
TREX_RPC_CMD_DEFINE(TrexRpcCmdGetPortXStatsNames, "get_port_xstats_names", 1, false, APIClass::API_CLASS_TYPE_CORE);
+TREX_RPC_CMD_DEFINE_EXTENDED(TrexRpcCmdSetPortAttr, "set_port_attr", 2, true, APIClass::API_CLASS_TYPE_CORE,
+ int parse_rx_filter_mode(const Json::Value &msg, uint8_t port_id, Json::Value &result);
+ int parse_ipv4(const Json::Value &msg, uint8_t port_id, Json::Value &result);
+ int parse_dest(const Json::Value &msg, uint8_t port_id, Json::Value &result);
* stream cmds
@@ -144,5 +152,15 @@ TREX_RPC_CMD_DEFINE(TrexRpcCmdPushRemote, "push_remote", 6, true, APIClass::API_
TREX_RPC_CMD_DEFINE(TrexRpcCmdShutdown, "shutdown", 2, false, APIClass::API_CLASS_TYPE_CORE);
+TREX_RPC_CMD_DEFINE_EXTENDED(TrexRpcCmdSetRxFeature, "set_rx_feature", 3, false, APIClass::API_CLASS_TYPE_CORE,
+ void parse_capture_msg(const Json::Value &msg, TrexStatelessPort *port, Json::Value &result);
+ void parse_queue_msg(const Json::Value &msg, TrexStatelessPort *port, Json::Value &result);
+ void parse_server_msg(const Json::Value &msg, TrexStatelessPort *port, Json::Value &result);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdGetRxQueuePkts, "get_rx_queue_pkts", 2, false, APIClass::API_CLASS_TYPE_CORE);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdSetARPRes, "set_arp_resolution", 2, false, APIClass::API_CLASS_TYPE_CORE);
#endif /* __TREX_RPC_CMD_H__ */
diff --git a/src/rpc-server/trex_rpc_cmds_table.cpp b/src/rpc-server/trex_rpc_cmds_table.cpp
index cddf19b9..919be1f1 100644
--- a/src/rpc-server/trex_rpc_cmds_table.cpp
+++ b/src/rpc-server/trex_rpc_cmds_table.cpp
@@ -71,6 +71,11 @@ TrexRpcCommandsTable::TrexRpcCommandsTable() {
register_command(new TrexRpcCmdPushRemote());
register_command(new TrexRpcCmdShutdown());
+ register_command(new TrexRpcCmdSetRxFeature());
+ register_command(new TrexRpcCmdGetRxQueuePkts());
+ register_command(new TrexRpcCmdSetARPRes());
diff --git a/src/stateful_rx_core.cpp b/src/stateful_rx_core.cpp
index 7ee802df..65d0a17d 100644
--- a/src/stateful_rx_core.cpp
+++ b/src/stateful_rx_core.cpp
@@ -677,58 +677,6 @@ void CLatencyManager::handle_rx_pkt(CLatencyManagerPerPort * lp,
-// In VM, we receive the RX packets in DP core, and send message to RX core with the packet
-void CLatencyManager::handle_latency_pkt_msg(uint8_t thread_id, CGenNodeLatencyPktInfo * msg) {
- assert(msg->m_latency_offset==0xdead);
- uint8_t rx_port_index=(thread_id<<1)+(msg->m_dir&1);
- assert( rx_port_index <m_max_ports ) ;
- CLatencyManagerPerPort * lp=&m_ports[rx_port_index];
- handle_rx_pkt(lp,(rte_mbuf_t *)msg->m_pkt);
-void CLatencyManager::run_rx_queue_msgs(uint8_t thread_id,
- CNodeRing * r){
- while ( true ) {
- CGenNode * node;
- if ( r->Dequeue(node)!=0 ){
- break;
- }
- assert(node);
- CGenNodeMsgBase * msg=(CGenNodeMsgBase *)node;
- uint8_t msg_type = msg->m_msg_type;
- switch (msg_type ) {
- case CGenNodeMsgBase::LATENCY_PKT:
- handle_latency_pkt_msg(thread_id,(CGenNodeLatencyPktInfo *) msg);
- break;
- default:
- printf("ERROR latency-thread message type is not valid %d \n",msg_type);
- assert(0);
- }
- CGlobalInfo::free_node(node);
- }
-// VM mode function. Handle messages from DP
-void CLatencyManager::try_rx_queues(){
- CMessagingManager * rx_dp = CMsgIns::Ins()->getRxDp();
- uint8_t threads=CMsgIns::Ins()->get_num_threads();
- int ti;
- for (ti=0; ti<(int)threads; ti++) {
- CNodeRing * r = rx_dp->getRingDpToCp(ti);
- if ( !r->isEmpty() ){
- run_rx_queue_msgs((uint8_t)ti,r);
- }
- }
void CLatencyManager::try_rx(){
rte_mbuf_t * rx_pkts[64];
int i;
@@ -790,8 +738,6 @@ void CLatencyManager::start(int iter, bool activate_watchdog) {
- bool do_try_rx_queue = CGlobalInfo::m_options.preview.get_vm_one_queue_enable() ? true : false;
if (activate_watchdog) {
m_monitor.create("STF RX CORE", 1);
@@ -807,9 +753,6 @@ void CLatencyManager::start(int iter, bool activate_watchdog) {
if (dt> (0.0)) {
- if (do_try_rx_queue){
- try_rx_queues();
- }
diff --git a/src/stateful_rx_core.h b/src/stateful_rx_core.h
index 48fbeb97..2df406de 100644
--- a/src/stateful_rx_core.h
+++ b/src/stateful_rx_core.h
@@ -355,12 +355,8 @@ private:
double grat_arp_timeout();
void send_one_grat_arp();
void try_rx();
- void try_rx_queues();
- void run_rx_queue_msgs(uint8_t thread_id, CNodeRing * r);
void wait_for_rx_dump();
void handle_rx_pkt(CLatencyManagerPerPort * lp, rte_mbuf_t * m);
- /* messages handlers */
- void handle_latency_pkt_msg(uint8_t thread_id, CGenNodeLatencyPktInfo * msg);
pqueue_t m_p_queue; /* priorty queue */
diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp
index 9bb20990..7edf1a31 100644
--- a/src/stateless/cp/trex_stateless_port.cpp
+++ b/src/stateless/cp/trex_stateless_port.cpp
@@ -25,6 +25,7 @@ limitations under the License.
#include <trex_streams_compiler.h>
#include <common/basic_utils.h>
#include <common/captureFile.h>
+#include "trex_stateless_rx_defs.h"
#include <string>
@@ -156,9 +157,9 @@ private:
TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api) : m_dp_events(this) {
std::vector<std::pair<uint8_t, uint8_t>> core_pair_list;
- m_port_id = port_id;
- m_port_state = PORT_STATE_IDLE;
- m_platform_api = api;
+ m_port_id = port_id;
+ m_port_state = PORT_STATE_IDLE;
+ m_platform_api = api;
/* get the platform specific data */
api->get_interface_info(port_id, m_api_info);
@@ -584,10 +585,9 @@ TrexStatelessPort::get_max_stream_id() const {
-TrexStatelessPort::get_properties(std::string &driver, uint32_t &speed) {
+TrexStatelessPort::get_properties(std::string &driver) {
driver = m_api_info.driver_name;
- speed = m_platform_api->getPortAttrObj(m_port_id)->get_link_speed();
@@ -888,16 +888,6 @@ TrexStatelessPort::get_port_effective_rate(double &pps,
-TrexStatelessPort::get_macaddr(std::string &hw_macaddr,
- std::string &src_macaddr,
- std::string &dst_macaddr) {
- utl_macaddr_to_str(m_api_info.mac_info.hw_macaddr, hw_macaddr);
- utl_macaddr_to_str(m_api_info.mac_info.src_macaddr, src_macaddr);
- utl_macaddr_to_str(m_api_info.mac_info.dst_macaddr, dst_macaddr);
TrexStatelessPort::get_pci_info(std::string &pci_addr, int &numa_node) {
pci_addr = m_api_info.pci_addr;
numa_node = m_api_info.numa_node;
@@ -944,6 +934,71 @@ TrexStatelessPort::remove_and_delete_all_streams() {
+TrexStatelessPort::start_rx_capture(const std::string &pcap_filename, uint64_t limit) {
+ static MsgReply<bool> reply;
+ reply.reset();
+ TrexStatelessRxStartCapture *msg = new TrexStatelessRxStartCapture(m_port_id, pcap_filename, limit, reply);
+ send_message_to_rx((TrexStatelessCpToRxMsgBase *)msg);
+ /* as below, must wait for ACK from RX core before returning ACK */
+ reply.wait_for_reply();
+TrexStatelessPort::stop_rx_capture() {
+ TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStopCapture(m_port_id);
+ send_message_to_rx(msg);
+TrexStatelessPort::start_rx_queue(uint64_t size) {
+ static MsgReply<bool> reply;
+ reply.reset();
+ TrexStatelessRxStartQueue *msg = new TrexStatelessRxStartQueue(m_port_id, size, reply);
+ send_message_to_rx( (TrexStatelessCpToRxMsgBase *)msg );
+ /* we cannot return ACK to the user until the RX core has approved
+ this might cause the user to lose some packets from the queue
+ */
+ reply.wait_for_reply();
+TrexStatelessPort::stop_rx_queue() {
+ TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStopQueue(m_port_id);
+ send_message_to_rx(msg);
+const RXPacketBuffer *
+TrexStatelessPort::get_rx_queue_pkts() {
+ static MsgReply<const RXPacketBuffer *> reply;
+ reply.reset();
+ TrexStatelessRxQueueGetPkts *msg = new TrexStatelessRxQueueGetPkts(m_port_id, reply);
+ send_message_to_rx( (TrexStatelessCpToRxMsgBase *)msg );
+ return reply.wait_for_reply();
+TrexStatelessPort::rx_features_to_json() {
+ static MsgReply<Json::Value> reply;
+ reply.reset();
+ TrexStatelessRxFeaturesToJson *msg = new TrexStatelessRxFeaturesToJson(m_port_id, reply);
+ send_message_to_rx( (TrexStatelessCpToRxMsgBase *)msg );
+ return reply.wait_for_reply();
/************* Trex Port Owner **************/
TrexPortOwner::TrexPortOwner() {
diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h
index e2a2aeba..74ab17f1 100644
--- a/src/stateless/cp/trex_stateless_port.h
+++ b/src/stateless/cp/trex_stateless_port.h
@@ -24,12 +24,15 @@ limitations under the License.
#include "common/basic_utils.h"
#include "internal_api/trex_platform_api.h"
#include "trex_dp_port_events.h"
+#include "trex_stateless_rx_defs.h"
#include "trex_stream.h"
class TrexStatelessCpToDpMsgBase;
class TrexStatelessCpToRxMsgBase;
class TrexStreamsGraphObj;
class TrexPortMultiplier;
+class RXPacketBuffer;
* TRex port owner can perform
@@ -255,11 +258,8 @@ public:
* @author imarom (16-Sep-15)
* @param driver
- * @param speed
- void get_properties(std::string &driver, uint32_t &speed);
+ void get_properties(std::string &driver);
* encode stats as JSON
@@ -362,14 +362,58 @@ public:
double &bps_L2,
double &percentage);
+ void get_pci_info(std::string &pci_addr, int &numa_node);
- void get_macaddr(std::string &hw_macaddr,
- std::string &src_macaddr,
- std::string &dst_macaddr);
- void get_pci_info(std::string &pci_addr, int &numa_node);
+ /**
+ * enable RX capture on port
+ *
+ */
+ void start_rx_capture(const std::string &pcap_filename, uint64_t limit);
+ /**
+ * disable RX capture if on
+ *
+ */
+ void stop_rx_capture();
+ /**
+ * start RX queueing of packets
+ *
+ * @author imarom (11/7/2016)
+ *
+ * @param limit
+ */
+ void start_rx_queue(uint64_t limit);
+ /**
+ * stop RX queueing
+ *
+ * @author imarom (11/7/2016)
+ */
+ void stop_rx_queue();
+ /**
+ * fetch the RX queue packets from the queue
+ *
+ */
+ const RXPacketBuffer *get_rx_queue_pkts();
+ /**
+ * generate a JSON describing the status
+ * of the RX features
+ *
+ */
+ Json::Value rx_features_to_json();
+ /**
+ * return the port attribute object
+ *
+ */
+ TRexPortAttr *getPortAttrObj() {
+ return m_platform_api->getPortAttrObj(m_port_id);
+ }
bool is_core_active(int core_id);
@@ -401,7 +445,7 @@ private:
void send_message_to_rx(TrexStatelessCpToRxMsgBase *msg);
* when a port stops, perform various actions
@@ -456,6 +500,7 @@ private:
TrexPortOwner m_owner;
int m_pending_async_stop_event;
@@ -502,9 +547,9 @@ public:
static const std::initializer_list<std::string> g_types;
static const std::initializer_list<std::string> g_ops;
- mul_type_e m_type;
- mul_op_e m_op;
- double m_value;
+ mul_type_e m_type;
+ mul_op_e m_op;
+ double m_value;
#endif /* __TREX_STATELESS_PORT_H__ */
diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp
index 95613b41..17acb21e 100644
--- a/src/stateless/messaging/trex_stateless_messaging.cpp
+++ b/src/stateless/messaging/trex_stateless_messaging.cpp
@@ -241,13 +241,15 @@ TrexDpPortEventMsg::handle() {
/************************* messages from CP to RX **********************/
-bool TrexStatelessRxStartMsg::handle (CRxCoreStateless *rx_core) {
- rx_core->work();
+bool TrexStatelessRxEnableLatency::handle (CRxCoreStateless *rx_core) {
+ rx_core->enable_latency();
+ m_reply.set_reply(true);
return true;
-bool TrexStatelessRxStopMsg::handle (CRxCoreStateless *rx_core) {
- rx_core->idle();
+bool TrexStatelessRxDisableLatency::handle (CRxCoreStateless *rx_core) {
+ rx_core->disable_latency();
return true;
@@ -255,3 +257,62 @@ bool TrexStatelessRxQuit::handle (CRxCoreStateless *rx_core) {
return true;
+TrexStatelessRxStartCapture::handle(CRxCoreStateless *rx_core) {
+ rx_core->start_recorder(m_port_id, m_pcap_filename, m_limit);
+ /* mark as done */
+ m_reply.set_reply(true);
+ return true;
+TrexStatelessRxStopCapture::handle(CRxCoreStateless *rx_core) {
+ rx_core->stop_recorder(m_port_id);
+ return true;
+TrexStatelessRxStartQueue::handle(CRxCoreStateless *rx_core) {
+ rx_core->start_queue(m_port_id, m_size);
+ /* mark as done */
+ m_reply.set_reply(true);
+ return true;
+TrexStatelessRxStopQueue::handle(CRxCoreStateless *rx_core) {
+ rx_core->stop_queue(m_port_id);
+ return true;
+TrexStatelessRxQueueGetPkts::handle(CRxCoreStateless *rx_core) {
+ const RXPacketBuffer *pkt_buffer = rx_core->get_rx_queue_pkts(m_port_id);
+ /* set the reply */
+ m_reply.set_reply(pkt_buffer);
+ return true;
+TrexStatelessRxFeaturesToJson::handle(CRxCoreStateless *rx_core) {
+ Json::Value output = rx_core->get_rx_port_mngr(m_port_id).to_json();
+ /* set the reply */
+ m_reply.set_reply(output);
+ return true;
diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h
index fb2c27ab..79a6bf08 100644
--- a/src/stateless/messaging/trex_stateless_messaging.h
+++ b/src/stateless/messaging/trex_stateless_messaging.h
@@ -24,11 +24,66 @@ limitations under the License.
#include "msg_manager.h"
#include "trex_dp_port_events.h"
+#include "trex_exception.h"
+#include "trex_stateless_rx_defs.h"
+#include "os_time.h"
class TrexStatelessDpCore;
class CRxCoreStateless;
class TrexStreamsCompiledObj;
class CFlowGenListPerThread;
+class RXPacketBuffer;
+ * Generic message reply object
+ *
+ * @author imarom (11/27/2016)
+ */
+template<typename T> class MsgReply {
+ MsgReply() {
+ reset();
+ }
+ void reset() {
+ m_pending = true;
+ }
+ bool is_pending() const {
+ return m_pending;
+ }
+ void set_reply(const T &reply) {
+ m_reply = reply;
+ /* before marking as done make sure all stores are committed */
+ asm volatile("mfence" ::: "memory");
+ m_pending = false;
+ }
+ T wait_for_reply(int timeout_ms = 500, int backoff_ms = 1) {
+ int guard = timeout_ms;
+ while (is_pending()) {
+ guard -= backoff_ms;
+ if (guard < 0) {
+ throw TrexException("timeout: failed to get reply from core");
+ }
+ delay(backoff_ms);
+ }
+ return m_reply;
+ }
+ volatile bool m_pending;
+ T m_reply;
* defines the base class for CP to DP messages
@@ -312,7 +367,7 @@ private:
/************************* messages from DP to CP **********************/
- * defines the base class for CP to DP messages
+ * defines the base class for DP to CP messages
* @author imarom (27-Oct-15)
@@ -404,11 +459,19 @@ public:
-class TrexStatelessRxStartMsg : public TrexStatelessCpToRxMsgBase {
+class TrexStatelessRxEnableLatency : public TrexStatelessCpToRxMsgBase {
+ TrexStatelessRxEnableLatency(MsgReply<bool> &reply) : m_reply(reply) {
+ }
bool handle (CRxCoreStateless *rx_core);
+ MsgReply<bool> &m_reply;
-class TrexStatelessRxStopMsg : public TrexStatelessCpToRxMsgBase {
+class TrexStatelessRxDisableLatency : public TrexStatelessCpToRxMsgBase {
bool handle (CRxCoreStateless *rx_core);
@@ -416,4 +479,114 @@ class TrexStatelessRxQuit : public TrexStatelessCpToRxMsgBase {
bool handle (CRxCoreStateless *rx_core);
+class TrexStatelessRxStartCapture : public TrexStatelessCpToRxMsgBase {
+ TrexStatelessRxStartCapture(uint8_t port_id,
+ const std::string &pcap_filename,
+ uint64_t limit,
+ MsgReply<bool> &reply) : m_reply(reply) {
+ m_port_id = port_id;
+ m_limit = limit;
+ m_pcap_filename = pcap_filename;
+ }
+ virtual bool handle(CRxCoreStateless *rx_core);
+ uint8_t m_port_id;
+ std::string m_pcap_filename;
+ uint64_t m_limit;
+ MsgReply<bool> &m_reply;
+class TrexStatelessRxStopCapture : public TrexStatelessCpToRxMsgBase {
+ TrexStatelessRxStopCapture(uint8_t port_id) {
+ m_port_id = port_id;
+ }
+ virtual bool handle(CRxCoreStateless *rx_core);
+ uint8_t m_port_id;
+class TrexStatelessRxStartQueue : public TrexStatelessCpToRxMsgBase {
+ TrexStatelessRxStartQueue(uint8_t port_id,
+ uint64_t size,
+ MsgReply<bool> &reply) : m_reply(reply) {
+ m_port_id = port_id;
+ m_size = size;
+ }
+ virtual bool handle(CRxCoreStateless *rx_core);
+ uint8_t m_port_id;
+ uint64_t m_size;
+ MsgReply<bool> &m_reply;
+class TrexStatelessRxStopQueue : public TrexStatelessCpToRxMsgBase {
+ TrexStatelessRxStopQueue(uint8_t port_id) {
+ m_port_id = port_id;
+ }
+ virtual bool handle(CRxCoreStateless *rx_core);
+ uint8_t m_port_id;
+class TrexStatelessRxQueueGetPkts : public TrexStatelessCpToRxMsgBase {
+ TrexStatelessRxQueueGetPkts(uint8_t port_id, MsgReply<const RXPacketBuffer *> &reply) : m_reply(reply) {
+ m_port_id = port_id;
+ }
+ /**
+ * virtual function to handle a message
+ *
+ */
+ virtual bool handle(CRxCoreStateless *rx_core);
+ uint8_t m_port_id;
+ MsgReply<const RXPacketBuffer *> &m_reply;
+ * a request from RX core to dump to Json the RX features
+ */
+class TrexStatelessRxFeaturesToJson : public TrexStatelessCpToRxMsgBase {
+ TrexStatelessRxFeaturesToJson(uint8_t port_id, MsgReply<Json::Value> &reply) : m_reply(reply) {
+ m_port_id = port_id;
+ }
+ /**
+ * virtual function to handle a message
+ *
+ */
+ virtual bool handle(CRxCoreStateless *rx_core);
+ uint8_t m_port_id;
+ MsgReply<Json::Value> &m_reply;
diff --git a/src/stateless/rx/trex_stateless_rx_core.cpp b/src/stateless/rx/trex_stateless_rx_core.cpp
index d162c5b3..f2061bf7 100644
--- a/src/stateless/rx/trex_stateless_rx_core.cpp
+++ b/src/stateless/rx/trex_stateless_rx_core.cpp
@@ -26,6 +26,8 @@
#include "pal/linux/sanb_atomic.h"
#include "trex_stateless_messaging.h"
#include "trex_stateless_rx_core.h"
+#include "trex_stateless.h"
void CRFC2544Info::create() {
@@ -64,15 +66,7 @@ void CRFC2544Info::export_data(rfc2544_info_t_ &obj) {
-void CCPortLatencyStl::reset() {
- for (int i = 0; i < MAX_FLOW_STATS; i++) {
- m_rx_pg_stat[i].clear();
- m_rx_pg_stat_payload[i].clear();
- }
void CRxCoreStateless::create(const CRxSlCfg &cfg) {
- m_rcv_all = false;
m_capture = false;
m_max_ports = cfg.m_max_ports;
@@ -82,15 +76,19 @@ void CRxCoreStateless::create(const CRxSlCfg &cfg) {
m_ring_to_cp = cp_rx->getRingDpToCp(0);
m_state = STATE_IDLE;
- for (int i = 0; i < m_max_ports; i++) {
- CLatencyManagerPerPortStl * lp = &m_ports[i];
- lp->m_io = cfg.m_ports[i];
- lp->m_port.reset();
+ for (int i = 0; i < MAX_FLOW_STATS_PAYLOAD; i++) {
+ m_rfc2544[i].create();
- for (int i = 0; i < MAX_FLOW_STATS_PAYLOAD; i++) {
- m_rfc2544[i].create();
+ /* create per port manager */
+ for (int i = 0; i < m_max_ports; i++) {
+ m_rx_port_mngr[i].create(cfg.m_ports[i],
+ m_rfc2544,
+ &m_err_cntrs,
+ &m_cpu_dp_u,
+ cfg.m_num_crc_fix_bytes);
@@ -124,10 +122,31 @@ bool CRxCoreStateless::periodic_check_for_cp_messages() {
+ /* a message might result in a change of state */
+ recalculate_next_state();
return true;
+void CRxCoreStateless::recalculate_next_state() {
+ if (m_state == STATE_QUIT) {
+ return;
+ }
+ /* next state is determine by the question are there any ports with active features ? */
+ m_state = (are_any_features_active() ? STATE_WORKING : STATE_IDLE);
+bool CRxCoreStateless::are_any_features_active() {
+ for (int i = 0; i < m_max_ports; i++) {
+ if (m_rx_port_mngr[i].has_features_set()) {
+ return true;
+ }
+ }
+ return false;
void CRxCoreStateless::idle_state_loop() {
const int SHORT_DELAY_MS = 2;
const int LONG_DELAY_MS = 50;
@@ -141,7 +160,7 @@ void CRxCoreStateless::idle_state_loop() {
counter = 0;
} else {
- flush_rx();
+ flush_all_pending_pkts();
/* enter deep sleep only if enough time had passed */
@@ -154,284 +173,88 @@ void CRxCoreStateless::idle_state_loop() {
-void CRxCoreStateless::start() {
- int count = 0;
- int i = 0;
- bool do_try_rx_queue =CGlobalInfo::m_options.preview.get_vm_one_queue_enable() ? true : false;
- /* register a watchdog handle on current core */
- m_monitor.create("STL RX CORE", 1);
- TrexWatchDog::getInstance().register_monitor(&m_monitor);
- while (true) {
- if (m_state == STATE_WORKING) {
- i++;
- if (i == 100000) { // approx 10msec
- i = 0;
- periodic_check_for_cp_messages(); // m_state might change in here
- }
- } else {
- if (m_state == STATE_QUIT)
- break;
- count = 0;
- i = 0;
- set_working_msg_ack(false);
- idle_state_loop();
- set_working_msg_ack(true);
- }
- if (do_try_rx_queue) {
- try_rx_queues();
- }
- count += try_rx();
+ * for each port give a tick (for flushing if needed)
+ *
+ */
+void CRxCoreStateless::port_manager_tick() {
+ for (int i = 0; i < m_max_ports; i++) {
+ m_rx_port_mngr[i].tick();
- rte_pause();
- m_monitor.disable();
-void CRxCoreStateless::handle_rx_pkt(CLatencyManagerPerPortStl *lp, rte_mbuf_t *m) {
- CFlowStatParser parser;
- if (m_rcv_all || parser.parse(rte_pktmbuf_mtod(m, uint8_t *), m->pkt_len) == 0) {
- uint32_t ip_id;
- if (m_rcv_all || (parser.get_ip_id(ip_id) == 0)) {
- if (m_rcv_all || is_flow_stat_id(ip_id)) {
- uint16_t hw_id;
- if (m_rcv_all || is_flow_stat_payload_id(ip_id)) {
- bool good_packet = true;
- uint8_t *p = rte_pktmbuf_mtod(m, uint8_t*);
- struct flow_stat_payload_header *fsp_head = (struct flow_stat_payload_header *)
- (p + m->pkt_len - sizeof(struct flow_stat_payload_header));
- hw_id = fsp_head->hw_id;
- CRFC2544Info *curr_rfc2544;
- if (unlikely(fsp_head->magic != FLOW_STAT_PAYLOAD_MAGIC) || hw_id >= MAX_FLOW_STATS_PAYLOAD) {
- good_packet = false;
- if (!m_rcv_all)
- m_err_cntrs.m_bad_header++;
- } else {
- curr_rfc2544 = &m_rfc2544[hw_id];
- if (fsp_head->flow_seq != curr_rfc2544->get_exp_flow_seq()) {
- // bad flow seq num
- // Might be the first packet of a new flow, packet from an old flow, or garbage.
- if (fsp_head->flow_seq == curr_rfc2544->get_prev_flow_seq()) {
- // packet from previous flow using this hw_id that arrived late
- good_packet = false;
- m_err_cntrs.m_old_flow++;
- } else {
- if (curr_rfc2544->no_flow_seq()) {
- // first packet we see from this flow
- good_packet = true;
- curr_rfc2544->set_exp_flow_seq(fsp_head->flow_seq);
- } else {
- // garbage packet
- good_packet = false;
- m_err_cntrs.m_bad_header++;
- }
- }
- }
- }
- if (good_packet) {
- uint32_t pkt_seq = fsp_head->seq;
- uint32_t exp_seq = curr_rfc2544->get_seq();
- if (unlikely(pkt_seq != exp_seq)) {
- if (pkt_seq < exp_seq) {
- if (exp_seq - pkt_seq > 100000) {
- // packet loss while we had wrap around
- curr_rfc2544->inc_seq_err(pkt_seq - exp_seq);
- curr_rfc2544->inc_seq_err_too_big();
- curr_rfc2544->set_seq(pkt_seq + 1);
- } else {
- if (pkt_seq == (exp_seq - 1)) {
- curr_rfc2544->inc_dup();
- } else {
- curr_rfc2544->inc_ooo();
- // We thought it was lost, but it was just out of order
- curr_rfc2544->dec_seq_err();
- }
- curr_rfc2544->inc_seq_err_too_low();
- }
- } else {
- if (unlikely (pkt_seq - exp_seq > 100000)) {
- // packet reorder while we had wrap around
- if (pkt_seq == (exp_seq - 1)) {
- curr_rfc2544->inc_dup();
- } else {
- curr_rfc2544->inc_ooo();
- // We thought it was lost, but it was just out of order
- curr_rfc2544->dec_seq_err();
- }
- curr_rfc2544->inc_seq_err_too_low();
- } else {
- // seq > curr_rfc2544->seq. Assuming lost packets
- curr_rfc2544->inc_seq_err(pkt_seq - exp_seq);
- curr_rfc2544->inc_seq_err_too_big();
- curr_rfc2544->set_seq(pkt_seq + 1);
- }
- }
- } else {
- curr_rfc2544->set_seq(pkt_seq + 1);
- }
- lp->m_port.m_rx_pg_stat_payload[hw_id].add_pkts(1);
- lp->m_port.m_rx_pg_stat_payload[hw_id].add_bytes(m->pkt_len + 4); // +4 for ethernet CRC
- uint64_t d = (os_get_hr_tick_64() - fsp_head->time_stamp );
- dsec_t ctime = ptime_convert_hr_dsec(d);
- curr_rfc2544->add_sample(ctime);
- }
- } else {
- hw_id = get_hw_id(ip_id);
- if (hw_id < MAX_FLOW_STATS) {
- lp->m_port.m_rx_pg_stat[hw_id].add_pkts(1);
- lp->m_port.m_rx_pg_stat[hw_id].add_bytes(m->pkt_len + 4); // +4 for ethernet CRC
- }
- }
- }
+void CRxCoreStateless::handle_work_stage() {
+ /* set the next sync time to */
+ dsec_t sync_time_sec = now_sec() + (1.0 / 1000);
+ while (m_state == STATE_WORKING) {
+ process_all_pending_pkts();
+ dsec_t now = now_sec();
+ if ( (now - sync_time_sec) > 0 ) {
+ periodic_check_for_cp_messages();
+ port_manager_tick();
+ sync_time_sec = now + (1.0 / 1000);
+ rte_pause();
-void CRxCoreStateless::capture_pkt(rte_mbuf_t *m) {
+void CRxCoreStateless::start() {
+ /* register a watchdog handle on current core */
+ m_monitor.create("STL RX CORE", 1);
+ TrexWatchDog::getInstance().register_monitor(&m_monitor);
-// In VM setup, handle packets coming as messages from DP cores.
-void CRxCoreStateless::handle_rx_queue_msgs(uint8_t thread_id, CNodeRing * r) {
- while ( true ) {
- CGenNode * node;
- if ( r->Dequeue(node) != 0 ) {
+ while (m_state != STATE_QUIT) {
+ switch (m_state) {
+ case STATE_IDLE:
+ idle_state_loop();
- }
- assert(node);
- CGenNodeMsgBase * msg = (CGenNodeMsgBase *)node;
- CGenNodeLatencyPktInfo * l_msg;
- uint8_t msg_type = msg->m_msg_type;
- uint8_t rx_port_index;
- CLatencyManagerPerPortStl * lp;
- switch (msg_type) {
- case CGenNodeMsgBase::LATENCY_PKT:
- l_msg = (CGenNodeLatencyPktInfo *)msg;
- assert(l_msg->m_latency_offset == 0xdead);
- rx_port_index = (thread_id << 1) + (l_msg->m_dir & 1);
- assert( rx_port_index < m_max_ports );
- lp = &m_ports[rx_port_index];
- handle_rx_pkt(lp, (rte_mbuf_t *)l_msg->m_pkt);
- if (m_capture)
- capture_pkt((rte_mbuf_t *)l_msg->m_pkt);
- rte_pktmbuf_free((rte_mbuf_t *)l_msg->m_pkt);
+ handle_work_stage();
- printf("ERROR latency-thread message type is not valid %d \n", msg_type);
+ break;
- CGlobalInfo::free_node(node);
-// VM mode function. Handle messages from DP
-void CRxCoreStateless::try_rx_queues() {
- CMessagingManager * rx_dp = CMsgIns::Ins()->getRxDp();
- uint8_t threads=CMsgIns::Ins()->get_num_threads();
- int ti;
- for (ti = 0; ti < (int)threads; ti++) {
- CNodeRing * r = rx_dp->getRingDpToCp(ti);
- if ( ! r->isEmpty() ) {
- handle_rx_queue_msgs((uint8_t)ti, r);
- }
- }
+ m_monitor.disable();
-// exactly the same as try_rx, without the handle_rx_pkt
-// purpose is to flush rx queues when core is in idle state
-void CRxCoreStateless::flush_rx() {
- rte_mbuf_t * rx_pkts[64];
- int i, total_pkts = 0;
- for (i = 0; i < m_max_ports; i++) {
- CLatencyManagerPerPortStl * lp = &m_ports[i];
- rte_mbuf_t * m;
- /* try to read 64 packets clean up the queue */
- uint16_t cnt_p = lp->m_io->rx_burst(rx_pkts, 64);
- total_pkts += cnt_p;
- if (cnt_p) {
- m_cpu_dp_u.start_work1();
- int j;
- for (j = 0; j < cnt_p; j++) {
- m = rx_pkts[j];
- rte_pktmbuf_free(m);
- }
- /* commit only if there was work to do ! */
- m_cpu_dp_u.commit1();
- }/* if work */
- }// all ports
+void CRxCoreStateless::capture_pkt(rte_mbuf_t *m) {
-int CRxCoreStateless::try_rx() {
- rte_mbuf_t * rx_pkts[64];
- int i, total_pkts = 0;
- for (i = 0; i < m_max_ports; i++) {
- CLatencyManagerPerPortStl * lp = &m_ports[i];
- rte_mbuf_t * m;
- /* try to read 64 packets clean up the queue */
- uint16_t cnt_p = lp->m_io->rx_burst(rx_pkts, 64);
- total_pkts += cnt_p;
- if (cnt_p) {
- m_cpu_dp_u.start_work1();
- int j;
- for (j = 0; j < cnt_p; j++) {
- m = rx_pkts[j];
- handle_rx_pkt(lp, m);
- rte_pktmbuf_free(m);
- }
- /* commit only if there was work to do ! */
- m_cpu_dp_u.commit1();
- }/* if work */
- }// all ports
- return total_pkts;
-bool CRxCoreStateless::is_flow_stat_id(uint32_t id) {
- if ((id & 0x000fff00) == IP_ID_RESERVE_BASE) return true;
- return false;
+int CRxCoreStateless::process_all_pending_pkts(bool flush_rx) {
-bool CRxCoreStateless::is_flow_stat_payload_id(uint32_t id) {
- if (id == FLOW_STAT_PAYLOAD_IP_ID) return true;
- return false;
+ int total_pkts = 0;
+ for (int i = 0; i < m_max_ports; i++) {
+ total_pkts += m_rx_port_mngr[i].process_all_pending_pkts(flush_rx);
+ }
+ return total_pkts;
-uint16_t CRxCoreStateless::get_hw_id(uint16_t id) {
- return (0x00ff & id);
void CRxCoreStateless::reset_rx_stats(uint8_t port_id) {
- for (int hw_id = 0; hw_id < MAX_FLOW_STATS; hw_id++) {
- m_ports[port_id].m_port.m_rx_pg_stat[hw_id].clear();
- }
+ m_rx_port_mngr[port_id].clear_stats();
int CRxCoreStateless::get_rx_stats(uint8_t port_id, rx_per_flow_t *rx_stats, int min, int max
, bool reset, TrexPlatformApi::driver_stat_cap_e type) {
- for (int hw_id = min; hw_id <= max; hw_id++) {
- if (type == TrexPlatformApi::IF_STAT_PAYLOAD) {
- rx_stats[hw_id - min] = m_ports[port_id].m_port.m_rx_pg_stat_payload[hw_id];
- } else {
- rx_stats[hw_id - min] = m_ports[port_id].m_port.m_rx_pg_stat[hw_id];
- }
- if (reset) {
- if (type == TrexPlatformApi::IF_STAT_PAYLOAD) {
- m_ports[port_id].m_port.m_rx_pg_stat_payload[hw_id].clear();
- } else {
- m_ports[port_id].m_port.m_rx_pg_stat[hw_id].clear();
- }
- }
- }
- return 0;
+ /* for now only latency stats */
+ m_rx_port_mngr[port_id].get_latency_stats(rx_stats, min, max, reset, type);
+ return (0);
int CRxCoreStateless::get_rfc2544_info(rfc2544_info_t *rfc2544_info, int min, int max, bool reset) {
@@ -458,13 +281,6 @@ int CRxCoreStateless::get_rx_err_cntrs(CRxCoreErrCntrs *rx_err) {
return 0;
-void CRxCoreStateless::set_working_msg_ack(bool val) {
- sanb_smp_memory_barrier();
- m_ack_start_work_msg = val;
- sanb_smp_memory_barrier();
void CRxCoreStateless::update_cpu_util(){
@@ -472,3 +288,53 @@ void CRxCoreStateless::update_cpu_util(){
double CRxCoreStateless::get_cpu_util() {
return m_cpu_cp_u.GetVal();
+CRxCoreStateless::start_recorder(uint8_t port_id, const std::string &pcap_filename, uint64_t limit) {
+ m_rx_port_mngr[port_id].start_recorder(pcap_filename, limit);
+ recalculate_next_state();
+CRxCoreStateless::stop_recorder(uint8_t port_id) {
+ m_rx_port_mngr[port_id].stop_recorder();
+ recalculate_next_state();
+CRxCoreStateless::start_queue(uint8_t port_id, uint64_t size) {
+ m_rx_port_mngr[port_id].start_queue(size);
+ recalculate_next_state();
+CRxCoreStateless::stop_queue(uint8_t port_id) {
+ m_rx_port_mngr[port_id].stop_queue();
+ recalculate_next_state();
+CRxCoreStateless::enable_latency() {
+ for (int i = 0; i < m_max_ports; i++) {
+ m_rx_port_mngr[i].enable_latency();
+ }
+ recalculate_next_state();
+CRxCoreStateless::disable_latency() {
+ for (int i = 0; i < m_max_ports; i++) {
+ m_rx_port_mngr[i].disable_latency();
+ }
+ recalculate_next_state();
+const RXPortManager &
+CRxCoreStateless::get_rx_port_mngr(uint8_t port_id) {
+ assert(port_id < m_max_ports);
+ return m_rx_port_mngr[port_id];
diff --git a/src/stateless/rx/trex_stateless_rx_core.h b/src/stateless/rx/trex_stateless_rx_core.h
index 3f9fb6cc..cd16bb8a 100644
--- a/src/stateless/rx/trex_stateless_rx_core.h
+++ b/src/stateless/rx/trex_stateless_rx_core.h
@@ -25,6 +25,7 @@
#include "os_time.h"
#include "pal/linux/sanb_atomic.h"
#include "utl_cpuu.h"
+#include "trex_stateless_rx_port_mngr.h"
class TrexStatelessCpToRxMsgBase;
@@ -37,25 +38,6 @@ class CCPortLatencyStl {
rx_per_flow_t m_rx_pg_stat_payload[MAX_FLOW_STATS_PAYLOAD];
-class CLatencyManagerPerPortStl {
- CCPortLatencyStl m_port;
- CPortLatencyHWBase * m_io;
-class CRxSlCfg {
- public:
- CRxSlCfg (){
- m_max_ports = 0;
- m_cps = 0.0;
- }
- public:
- uint32_t m_max_ports;
- double m_cps;
- CPortLatencyHWBase * m_ports[TREX_MAX_PORTS];
class CRFC2544Info {
void create();
@@ -109,12 +91,20 @@ class CRxCoreErrCntrs {
m_old_flow = 0;
- private:
+ public:
uint64_t m_bad_header;
uint64_t m_old_flow;
+ * stateless RX core
+ *
+ */
class CRxCoreStateless {
+ /**
+ * core states
+ */
enum state_e {
@@ -129,47 +119,78 @@ class CRxCoreStateless {
, TrexPlatformApi::driver_stat_cap_e type);
int get_rfc2544_info(rfc2544_info_t *rfc2544_info, int min, int max, bool reset);
int get_rx_err_cntrs(CRxCoreErrCntrs *rx_err);
- void work() {
- m_state = STATE_WORKING;
- m_err_cntrs.reset(); // When starting to work, reset global counters
- }
- void idle() {m_state = STATE_IDLE;}
void quit() {m_state = STATE_QUIT;}
- bool is_working() const {return (m_ack_start_work_msg == true);}
- void set_working_msg_ack(bool val);
+ bool is_working() const {return (m_state == STATE_WORKING);}
double get_cpu_util();
void update_cpu_util();
+ const RXPacketBuffer *get_rx_queue_pkts(uint8_t port_id) {
+ return m_rx_port_mngr[port_id].get_pkt_buffer();
+ }
+ /**
+ * start capturing of RX packets on a specific port
+ *
+ */
+ void start_recorder(uint8_t port_id, const std::string &pcap_filename, uint64_t limit);
+ void stop_recorder(uint8_t port_id);
+ /**
+ * start RX queueing of packets
+ *
+ */
+ void start_queue(uint8_t port_id, uint64_t size);
+ void stop_queue(uint8_t port_id);
+ /**
+ * enable latency feature for RX packets
+ * will be apply to all ports
+ */
+ void enable_latency();
+ void disable_latency();
+ const RXPortManager &get_rx_port_mngr(uint8_t port_id);
void handle_cp_msg(TrexStatelessCpToRxMsgBase *msg);
bool periodic_check_for_cp_messages();
void tickle();
void idle_state_loop();
- void handle_rx_pkt(CLatencyManagerPerPortStl * lp, rte_mbuf_t * m);
+ void recalculate_next_state();
+ bool are_any_features_active();
void capture_pkt(rte_mbuf_t *m);
void handle_rx_queue_msgs(uint8_t thread_id, CNodeRing * r);
- void flush_rx();
- int try_rx();
- void try_rx_queues();
- bool is_flow_stat_id(uint32_t id);
- bool is_flow_stat_payload_id(uint32_t id);
- uint16_t get_hw_id(uint16_t id);
+ void handle_work_stage();
+ void port_manager_tick();
+ int process_all_pending_pkts(bool flush_rx = false);
+ void flush_all_pending_pkts() {
+ process_all_pending_pkts(true);
+ }
+ void try_rx_queues();
- TrexMonitor m_monitor;
- uint32_t m_max_ports;
- bool m_capture;
- bool m_rcv_all;
- CLatencyManagerPerPortStl m_ports[TREX_MAX_PORTS];
- state_e m_state;
- CNodeRing *m_ring_from_cp;
- CNodeRing *m_ring_to_cp;
- CCpuUtlDp m_cpu_dp_u;
- CCpuUtlCp m_cpu_cp_u;
+ TrexMonitor m_monitor;
+ uint32_t m_max_ports;
+ bool m_capture;
+ state_e m_state;
+ CNodeRing *m_ring_from_cp;
+ CNodeRing *m_ring_to_cp;
+ CCpuUtlDp m_cpu_dp_u;
+ CCpuUtlCp m_cpu_cp_u;
// Used for acking "work" (go out of idle) messages from cp
volatile bool m_ack_start_work_msg __rte_cache_aligned;
CRxCoreErrCntrs m_err_cntrs;
+ RXPortManager m_rx_port_mngr[TREX_MAX_PORTS];
diff --git a/src/stateless/rx/trex_stateless_rx_defs.h b/src/stateless/rx/trex_stateless_rx_defs.h
new file mode 100644
index 00000000..aefcc133
--- /dev/null
+++ b/src/stateless/rx/trex_stateless_rx_defs.h
@@ -0,0 +1,62 @@
+ Itay Marom
+ Cisco Systems, Inc.
+ Copyright (c) 2016-2016 Cisco Systems, Inc.
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ See the License for the specific language governing permissions and
+ limitations under the License.
+#include "trex_defs.h"
+#include <json/json.h>
+class CPortLatencyHWBase;
+ * general SL cfg
+ *
+ */
+class CRxSlCfg {
+ public:
+ CRxSlCfg (){
+ m_max_ports = 0;
+ m_cps = 0.0;
+ m_num_crc_fix_bytes = 0;
+ }
+ public:
+ uint32_t m_max_ports;
+ double m_cps;
+ CPortLatencyHWBase * m_ports[TREX_MAX_PORTS];
+ uint8_t m_num_crc_fix_bytes;
+ * describes the filter type applied to the RX
+ * RX_FILTER_MODE_HW - only hardware filtered traffic will
+ * reach the RX core
+ *
+ */
+typedef enum rx_filter_mode_ {
+} rx_filter_mode_e;
+#endif /* __TREX_STATELESS_RX_DEFS_H__ */
diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.cpp b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp
new file mode 100644
index 00000000..afc6827c
--- /dev/null
+++ b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp
@@ -0,0 +1,515 @@
+ Itay Marom
+ Cisco Systems, Inc.
+ Copyright (c) 2016-2016 Cisco Systems, Inc.
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ See the License for the specific language governing permissions and
+ limitations under the License.
+#include "bp_sim.h"
+#include "trex_stateless_rx_port_mngr.h"
+#include "common/captureFile.h"
+#include "trex_stateless_rx_core.h"
+ * latency RX feature
+ *
+ *************************************/
+RXLatency::RXLatency() {
+ m_rcv_all = false;
+ m_rfc2544 = NULL;
+ m_err_cntrs = NULL;
+ for (int i = 0; i < MAX_FLOW_STATS; i++) {
+ m_rx_pg_stat[i].clear();
+ m_rx_pg_stat_payload[i].clear();
+ }
+RXLatency::create(CRFC2544Info *rfc2544, CRxCoreErrCntrs *err_cntrs) {
+ m_rfc2544 = rfc2544;
+ m_err_cntrs = err_cntrs;
+RXLatency::handle_pkt(const rte_mbuf_t *m) {
+ CFlowStatParser parser;
+ if (m_rcv_all || parser.parse(rte_pktmbuf_mtod(m, uint8_t *), m->pkt_len) == 0) {
+ uint32_t ip_id;
+ if (m_rcv_all || (parser.get_ip_id(ip_id) == 0)) {
+ if (m_rcv_all || is_flow_stat_id(ip_id)) {
+ uint16_t hw_id;
+ if (m_rcv_all || is_flow_stat_payload_id(ip_id)) {
+ bool good_packet = true;
+ uint8_t *p = rte_pktmbuf_mtod(m, uint8_t*);
+ struct flow_stat_payload_header *fsp_head = (struct flow_stat_payload_header *)
+ (p + m->pkt_len - sizeof(struct flow_stat_payload_header));
+ hw_id = fsp_head->hw_id;
+ CRFC2544Info *curr_rfc2544;
+ if (unlikely(fsp_head->magic != FLOW_STAT_PAYLOAD_MAGIC) || hw_id >= MAX_FLOW_STATS_PAYLOAD) {
+ good_packet = false;
+ if (!m_rcv_all)
+ m_err_cntrs->m_bad_header++;
+ } else {
+ curr_rfc2544 = &m_rfc2544[hw_id];
+ if (fsp_head->flow_seq != curr_rfc2544->get_exp_flow_seq()) {
+ // bad flow seq num
+ // Might be the first packet of a new flow, packet from an old flow, or garbage.
+ if (fsp_head->flow_seq == curr_rfc2544->get_prev_flow_seq()) {
+ // packet from previous flow using this hw_id that arrived late
+ good_packet = false;
+ m_err_cntrs->m_old_flow++;
+ } else {
+ if (curr_rfc2544->no_flow_seq()) {
+ // first packet we see from this flow
+ good_packet = true;
+ curr_rfc2544->set_exp_flow_seq(fsp_head->flow_seq);
+ } else {
+ // garbage packet
+ good_packet = false;
+ m_err_cntrs->m_bad_header++;
+ }
+ }
+ }
+ }
+ if (good_packet) {
+ uint32_t pkt_seq = fsp_head->seq;
+ uint32_t exp_seq = curr_rfc2544->get_seq();
+ if (unlikely(pkt_seq != exp_seq)) {
+ if (pkt_seq < exp_seq) {
+ if (exp_seq - pkt_seq > 100000) {
+ // packet loss while we had wrap around
+ curr_rfc2544->inc_seq_err(pkt_seq - exp_seq);
+ curr_rfc2544->inc_seq_err_too_big();
+ curr_rfc2544->set_seq(pkt_seq + 1);
+ } else {
+ if (pkt_seq == (exp_seq - 1)) {
+ curr_rfc2544->inc_dup();
+ } else {
+ curr_rfc2544->inc_ooo();
+ // We thought it was lost, but it was just out of order
+ curr_rfc2544->dec_seq_err();
+ }
+ curr_rfc2544->inc_seq_err_too_low();
+ }
+ } else {
+ if (unlikely (pkt_seq - exp_seq > 100000)) {
+ // packet reorder while we had wrap around
+ if (pkt_seq == (exp_seq - 1)) {
+ curr_rfc2544->inc_dup();
+ } else {
+ curr_rfc2544->inc_ooo();
+ // We thought it was lost, but it was just out of order
+ curr_rfc2544->dec_seq_err();
+ }
+ curr_rfc2544->inc_seq_err_too_low();
+ } else {
+ // seq > curr_rfc2544->seq. Assuming lost packets
+ curr_rfc2544->inc_seq_err(pkt_seq - exp_seq);
+ curr_rfc2544->inc_seq_err_too_big();
+ curr_rfc2544->set_seq(pkt_seq + 1);
+ }
+ }
+ } else {
+ curr_rfc2544->set_seq(pkt_seq + 1);
+ }
+ m_rx_pg_stat_payload[hw_id].add_pkts(1);
+ m_rx_pg_stat_payload[hw_id].add_bytes(m->pkt_len + 4); // +4 for ethernet CRC
+ uint64_t d = (os_get_hr_tick_64() - fsp_head->time_stamp );
+ dsec_t ctime = ptime_convert_hr_dsec(d);
+ curr_rfc2544->add_sample(ctime);
+ }
+ } else {
+ hw_id = get_hw_id(ip_id);
+ if (hw_id < MAX_FLOW_STATS) {
+ m_rx_pg_stat[hw_id].add_pkts(1);
+ m_rx_pg_stat[hw_id].add_bytes(m->pkt_len + 4); // +4 for ethernet CRC
+ }
+ }
+ }
+ }
+ }
+RXLatency::reset_stats() {
+ for (int hw_id = 0; hw_id < MAX_FLOW_STATS; hw_id++) {
+ m_rx_pg_stat[hw_id].clear();
+ }
+RXLatency::get_stats(rx_per_flow_t *rx_stats,
+ int min,
+ int max,
+ bool reset,
+ TrexPlatformApi::driver_stat_cap_e type) {
+ for (int hw_id = min; hw_id <= max; hw_id++) {
+ if (type == TrexPlatformApi::IF_STAT_PAYLOAD) {
+ rx_stats[hw_id - min] = m_rx_pg_stat_payload[hw_id];
+ } else {
+ rx_stats[hw_id - min] = m_rx_pg_stat[hw_id];
+ }
+ if (reset) {
+ if (type == TrexPlatformApi::IF_STAT_PAYLOAD) {
+ m_rx_pg_stat_payload[hw_id].clear();
+ } else {
+ m_rx_pg_stat[hw_id].clear();
+ }
+ }
+ }
+RXLatency::to_json() const {
+ return Json::objectValue;
+ * RX feature queue
+ *
+ *************************************/
+RXPacketBuffer::RXPacketBuffer(uint64_t size) {
+ m_buffer = nullptr;
+ m_head = 0;
+ m_tail = 0;
+ m_size = (size + 1); // for the empty/full difference 1 slot reserved
+ /* generate queue */
+ m_buffer = new RXPacket*[m_size](); // zeroed
+RXPacketBuffer::~RXPacketBuffer() {
+ assert(m_buffer);
+ while (!is_empty()) {
+ RXPacket *pkt = pop();
+ delete pkt;
+ }
+ delete [] m_buffer;
+RXPacketBuffer::push(const rte_mbuf_t *m) {
+ /* if full - pop the oldest */
+ if (is_full()) {
+ delete pop();
+ }
+ /* push packet */
+ m_buffer[m_head] = new RXPacket(m);
+ m_head = next(m_head);
+RXPacket *
+RXPacketBuffer::pop() {
+ assert(!is_empty());
+ RXPacket *pkt = m_buffer[m_tail];
+ m_tail = next(m_tail);
+ return pkt;
+RXPacketBuffer::get_element_count() const {
+ if (m_head >= m_tail) {
+ return (m_head - m_tail);
+ } else {
+ return ( get_capacity() - (m_tail - m_head - 1) );
+ }
+RXPacketBuffer::to_json() const {
+ Json::Value output = Json::arrayValue;
+ int tmp = m_tail;
+ while (tmp != m_head) {
+ RXPacket *pkt = m_buffer[tmp];
+ output.append(pkt->to_json());
+ tmp = next(tmp);
+ }
+ return output;
+RXQueue::start(uint64_t size) {
+ if (m_pkt_buffer) {
+ delete m_pkt_buffer;
+ }
+ m_pkt_buffer = new RXPacketBuffer(size);
+RXQueue::stop() {
+ if (m_pkt_buffer) {
+ delete m_pkt_buffer;
+ m_pkt_buffer = NULL;
+ }
+const RXPacketBuffer *
+RXQueue::fetch() {
+ /* if no buffer or the buffer is empty - give a NULL one */
+ if ( (!m_pkt_buffer) || (m_pkt_buffer->get_element_count() == 0) ) {
+ return nullptr;
+ }
+ /* hold a pointer to the old one */
+ RXPacketBuffer *old_buffer = m_pkt_buffer;
+ /* replace the old one with a new one and freeze the old */
+ m_pkt_buffer = new RXPacketBuffer(old_buffer->get_capacity());
+ return old_buffer;
+RXQueue::handle_pkt(const rte_mbuf_t *m) {
+ m_pkt_buffer->push(m);
+RXQueue::to_json() const {
+ assert(m_pkt_buffer != NULL);
+ Json::Value output = Json::objectValue;
+ output["size"] = Json::UInt64(m_pkt_buffer->get_capacity());
+ output["count"] = Json::UInt64(m_pkt_buffer->get_element_count());
+ return output;
+ * RX feature recorder
+ *
+ *************************************/
+RXPacketRecorder::RXPacketRecorder() {
+ m_writer = NULL;
+ m_count = 0;
+ m_limit = 0;
+ m_epoch = -1;
+ m_pending_flush = false;
+RXPacketRecorder::start(const std::string &pcap, uint64_t limit) {
+ m_writer = CCapWriterFactory::CreateWriter(LIBPCAP, (char *)pcap.c_str());
+ if (m_writer == NULL) {
+ std::stringstream ss;
+ ss << "unable to create PCAP file: " << pcap;
+ throw TrexException(ss.str());
+ }
+ assert(limit > 0);
+ m_limit = limit;
+ m_count = 0;
+ m_pending_flush = false;
+ m_pcap_filename = pcap;
+RXPacketRecorder::stop() {
+ if (!m_writer) {
+ return;
+ }
+ delete m_writer;
+ m_writer = NULL;
+RXPacketRecorder::flush_to_disk() {
+ if (m_writer && m_pending_flush) {
+ m_writer->flush_to_disk();
+ m_pending_flush = false;
+ }
+RXPacketRecorder::handle_pkt(const rte_mbuf_t *m) {
+ if (!m_writer) {
+ return;
+ }
+ dsec_t now = now_sec();
+ if (m_epoch < 0) {
+ m_epoch = now;
+ }
+ dsec_t dt = now - m_epoch;
+ CPktNsecTimeStamp t_c(dt);
+ m_pkt.time_nsec = t_c.m_time_nsec;
+ m_pkt.time_sec = t_c.m_time_sec;
+ const uint8_t *p = rte_pktmbuf_mtod(m, uint8_t *);
+ m_pkt.pkt_len = m->pkt_len;
+ memcpy(m_pkt.raw, p, m->pkt_len);
+ m_writer->write_packet(&m_pkt);
+ m_count++;
+ m_pending_flush = true;
+ if (m_count == m_limit) {
+ stop();
+ }
+RXPacketRecorder::to_json() const {
+ Json::Value output = Json::objectValue;
+ output["pcap_filename"] = m_pcap_filename;
+ output["limit"] = Json::UInt64(m_limit);
+ output["count"] = Json::UInt64(m_count);
+ return output;
+ * Port manager
+ *
+ *************************************/
+RXPortManager::RXPortManager() {
+ clear_all_features();
+ m_io = NULL;
+ m_cpu_dp_u = NULL;
+RXPortManager::create(CPortLatencyHWBase *io,
+ CRFC2544Info *rfc2544,
+ CRxCoreErrCntrs *err_cntrs,
+ CCpuUtlDp *cpu_util,
+ uint8_t crc_bytes_num) {
+ m_io = io;
+ m_cpu_dp_u = cpu_util;
+ m_num_crc_fix_bytes = crc_bytes_num;
+ /* init features */
+ m_latency.create(rfc2544, err_cntrs);
+void RXPortManager::handle_pkt(const rte_mbuf_t *m) {
+ /* handle features */
+ if (is_feature_set(LATENCY)) {
+ m_latency.handle_pkt(m);
+ }
+ if (is_feature_set(RECORDER)) {
+ m_recorder.handle_pkt(m);
+ }
+ if (is_feature_set(QUEUE)) {
+ m_queue.handle_pkt(m);
+ }
+int RXPortManager::process_all_pending_pkts(bool flush_rx) {
+ rte_mbuf_t *rx_pkts[64];
+ /* try to read 64 packets clean up the queue */
+ uint16_t cnt_p = m_io->rx_burst(rx_pkts, 64);
+ if (cnt_p == 0) {
+ return cnt_p;
+ }
+ m_cpu_dp_u->start_work1();
+ for (int j = 0; j < cnt_p; j++) {
+ rte_mbuf_t *m = rx_pkts[j];
+ if (!flush_rx) {
+ // patch relevant only for e1000 driver
+ if (m_num_crc_fix_bytes) {
+ rte_pktmbuf_trim(m, m_num_crc_fix_bytes);
+ }
+ handle_pkt(m);
+ }
+ rte_pktmbuf_free(m);
+ }
+ /* commit only if there was work to do ! */
+ m_cpu_dp_u->commit1();
+ return cnt_p;
+RXPortManager::tick() {
+ if (is_feature_set(RECORDER)) {
+ m_recorder.flush_to_disk();
+ }
+RXPortManager::to_json() const {
+ Json::Value output = Json::objectValue;
+ if (is_feature_set(LATENCY)) {
+ output["latency"] = m_latency.to_json();
+ output["latency"]["is_active"] = true;
+ } else {
+ output["latency"]["is_active"] = false;
+ }
+ if (is_feature_set(RECORDER)) {
+ output["sniffer"] = m_recorder.to_json();
+ output["sniffer"]["is_active"] = true;
+ } else {
+ output["sniffer"]["is_active"] = false;
+ }
+ if (is_feature_set(QUEUE)) {
+ output["queue"] = m_queue.to_json();
+ output["queue"]["is_active"] = true;
+ } else {
+ output["queue"]["is_active"] = false;
+ }
+ return output;
diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.h b/src/stateless/rx/trex_stateless_rx_port_mngr.h
new file mode 100644
index 00000000..c049cb56
--- /dev/null
+++ b/src/stateless/rx/trex_stateless_rx_port_mngr.h
@@ -0,0 +1,416 @@
+ Itay Marom
+ Cisco Systems, Inc.
+ Copyright (c) 2016-2016 Cisco Systems, Inc.
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ See the License for the specific language governing permissions and
+ limitations under the License.
+#include <stdint.h>
+#include "common/base64.h"
+#include "common/captureFile.h"
+class CPortLatencyHWBase;
+class CRFC2544Info;
+class CRxCoreErrCntrs;
+ * RX feature latency
+ *
+ *************************************/
+class RXLatency {
+ RXLatency();
+ void create(CRFC2544Info *rfc2544, CRxCoreErrCntrs *err_cntrs);
+ void handle_pkt(const rte_mbuf_t *m);
+ Json::Value to_json() const;
+ void get_stats(rx_per_flow_t *rx_stats,
+ int min,
+ int max,
+ bool reset,
+ TrexPlatformApi::driver_stat_cap_e type);
+ void reset_stats();
+ bool is_flow_stat_id(uint32_t id) {
+ if ((id & 0x000fff00) == IP_ID_RESERVE_BASE) return true;
+ return false;
+ }
+ bool is_flow_stat_payload_id(uint32_t id) {
+ if (id == FLOW_STAT_PAYLOAD_IP_ID) return true;
+ return false;
+ }
+ uint16_t get_hw_id(uint16_t id) {
+ return (0x00ff & id);
+ rx_per_flow_t m_rx_pg_stat[MAX_FLOW_STATS];
+ rx_per_flow_t m_rx_pg_stat_payload[MAX_FLOW_STATS_PAYLOAD];
+ bool m_rcv_all;
+ CRFC2544Info *m_rfc2544;
+ CRxCoreErrCntrs *m_err_cntrs;
+ * describes a single saved RX packet
+ *
+ */
+class RXPacket {
+ RXPacket(const rte_mbuf_t *m) {
+ /* assume single part packet */
+ assert(m->nb_segs == 1);
+ m_size = m->pkt_len;
+ const uint8_t *p = rte_pktmbuf_mtod(m, uint8_t *);
+ m_raw = new uint8_t[m_size];
+ memcpy(m_raw, p, m_size);
+ /* generate a packet timestamp */
+ m_timestamp = now_sec();
+ }
+ /* slow path and also RVO - pass by value is ok */
+ Json::Value to_json() {
+ Json::Value output;
+ output["ts"] = m_timestamp;
+ output["binary"] = base64_encode(m_raw, m_size);
+ return output;
+ }
+ ~RXPacket() {
+ if (m_raw) {
+ delete [] m_raw;
+ }
+ }
+ uint8_t *m_raw;
+ uint16_t m_size;
+ dsec_t m_timestamp;
+ * RX feature queue
+ *
+ *************************************/
+class RXPacketBuffer {
+ RXPacketBuffer(uint64_t size);
+ ~RXPacketBuffer();
+ /**
+ * push a packet to the buffer
+ *
+ */
+ void push(const rte_mbuf_t *m);
+ /**
+ * generate a JSON output of the queue
+ *
+ */
+ Json::Value to_json() const;
+ bool is_empty() const {
+ return (m_head == m_tail);
+ }
+ bool is_full() const {
+ return ( next(m_head) == m_tail);
+ }
+ /**
+ * return the total amount of space possible
+ */
+ uint64_t get_capacity() const {
+ /* one slot is used for diff between full/empty */
+ return (m_size - 1);
+ }
+ /**
+ * returns how many elements are in the queue
+ */
+ uint64_t get_element_count() const;
+ int next(int v) const {
+ return ( (v + 1) % m_size );
+ }
+ /* pop in case of full queue - internal usage */
+ RXPacket * pop();
+ int m_head;
+ int m_tail;
+ int m_size;
+ RXPacket **m_buffer;
+class RXQueue {
+ RXQueue() {
+ m_pkt_buffer = nullptr;
+ }
+ ~RXQueue() {
+ stop();
+ }
+ /**
+ * start RX queue
+ *
+ */
+ void start(uint64_t size);
+ /**
+ * fetch the current buffer
+ * return NULL if no packets
+ */
+ const RXPacketBuffer * fetch();
+ /**
+ * stop RX queue
+ *
+ */
+ void stop();
+ void handle_pkt(const rte_mbuf_t *m);
+ Json::Value to_json() const;
+ RXPacketBuffer *m_pkt_buffer;
+ * RX feature PCAP recorder
+ *
+ *************************************/
+class RXPacketRecorder {
+ RXPacketRecorder();
+ ~RXPacketRecorder() {
+ stop();
+ }
+ void start(const std::string &pcap, uint64_t limit);
+ void stop();
+ void handle_pkt(const rte_mbuf_t *m);
+ /**
+ * flush any cached packets to disk
+ *
+ */
+ void flush_to_disk();
+ Json::Value to_json() const;
+ CFileWriterBase *m_writer;
+ std::string m_pcap_filename;
+ CCapPktRaw m_pkt;
+ dsec_t m_epoch;
+ uint64_t m_limit;
+ uint64_t m_count;
+ bool m_pending_flush;
+/************************ manager ***************************/
+ * per port RX features manager
+ *
+ * @author imarom (10/30/2016)
+ */
+class RXPortManager {
+ enum feature_t {
+ NO_FEATURES = 0x0,
+ LATENCY = 0x1,
+ RECORDER = 0x2,
+ QUEUE = 0x4
+ };
+ RXPortManager();
+ void create(CPortLatencyHWBase *io,
+ CRFC2544Info *rfc2544,
+ CRxCoreErrCntrs *err_cntrs,
+ CCpuUtlDp *cpu_util,
+ uint8_t crc_bytes_num);
+ void clear_stats() {
+ m_latency.reset_stats();
+ }
+ void get_latency_stats(rx_per_flow_t *rx_stats,
+ int min,
+ int max,
+ bool reset,
+ TrexPlatformApi::driver_stat_cap_e type) {
+ return m_latency.get_stats(rx_stats, min, max, reset, type);
+ }
+ RXLatency & get_latency() {
+ return m_latency;
+ }
+ /* latency */
+ void enable_latency() {
+ set_feature(LATENCY);
+ }
+ void disable_latency() {
+ unset_feature(LATENCY);
+ }
+ /* recorder */
+ void start_recorder(const std::string &pcap, uint64_t limit_pkts) {
+ m_recorder.start(pcap, limit_pkts);
+ set_feature(RECORDER);
+ }
+ void stop_recorder() {
+ m_recorder.stop();
+ unset_feature(RECORDER);
+ }
+ /* queue */
+ void start_queue(uint32_t size) {
+ m_queue.start(size);
+ set_feature(QUEUE);
+ }
+ void stop_queue() {
+ m_queue.stop();
+ unset_feature(QUEUE);
+ }
+ const RXPacketBuffer *get_pkt_buffer() {
+ if (!is_feature_set(QUEUE)) {
+ return nullptr;
+ }
+ return m_queue.fetch();
+ }
+ /**
+ * fetch and process all packets
+ *
+ */
+ int process_all_pending_pkts(bool flush_rx = false);
+ /**
+ * flush all pending packets without processing them
+ *
+ */
+ void flush_all_pending_pkts() {
+ process_all_pending_pkts(true);
+ }
+ /**
+ * handle a single packet
+ *
+ */
+ void handle_pkt(const rte_mbuf_t *m);
+ /**
+ * maintenance
+ *
+ * @author imarom (11/24/2016)
+ */
+ void tick();
+ bool has_features_set() {
+ return (m_features != NO_FEATURES);
+ }
+ bool no_features_set() {
+ return (!has_features_set());
+ }
+ /**
+ * write the status to a JSON format
+ */
+ Json::Value to_json() const;
+ void clear_all_features() {
+ m_features = NO_FEATURES;
+ }
+ void set_feature(feature_t feature) {
+ m_features |= feature;
+ }
+ void unset_feature(feature_t feature) {
+ m_features &= (~feature);
+ }
+ bool is_feature_set(feature_t feature) const {
+ return ( (m_features & feature) == feature );
+ }
+ uint32_t m_features;
+ RXLatency m_latency;
+ RXPacketRecorder m_recorder;
+ RXQueue m_queue;
+ // compensate for the fact that hardware send us packets without Ethernet CRC, and we report with it
+ uint8_t m_num_crc_fix_bytes;
+ CCpuUtlDp *m_cpu_dp_u;
+ CPortLatencyHWBase *m_io;
diff --git a/src/trex_port_attr.cpp b/src/trex_port_attr.cpp
new file mode 100644
index 00000000..61e88585
--- /dev/null
+++ b/src/trex_port_attr.cpp
@@ -0,0 +1,145 @@
+Copyright (c) 2015-2015 Cisco Systems, Inc.
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+See the License for the specific language governing permissions and
+limitations under the License.
+#include "trex_port_attr.h"
+#include "bp_sim.h"
+DestAttr::DestAttr(uint8_t port_id) {
+ m_port_id = port_id;
+ m_mac = CGlobalInfo::m_options.m_mac_addr[port_id].u.m_mac.dest;
+ m_type = DEST_TYPE_MAC;
+ /* save the default */
+ memcpy(m_default_mac, m_mac, 6);
+ * set dest as an IPv4 unresolved
+ */
+DestAttr::set_dest(uint32_t ipv4) {
+ assert(ipv4 != 0);
+ m_ipv4 = ipv4;
+ memset(m_mac, 0, 6); // just to be on the safe side
+ * set dest as a resolved IPv4
+ */
+DestAttr::set_dest(uint32_t ipv4, const uint8_t *mac) {
+ assert(ipv4 != 0);
+ m_ipv4 = ipv4;
+ /* source might be the same as dest (this shadows the datapath memory) */
+ memmove(m_mac, mac, 6);
+ m_type = DEST_TYPE_IPV4;
+ * dest dest as MAC
+ *
+ */
+DestAttr::set_dest(const uint8_t *mac) {
+ m_ipv4 = 0;
+ /* source might be the same as dest (this shadows the datapath memory) */
+ memmove(m_mac, mac, 6);
+ m_type = DEST_TYPE_MAC;
+DestAttr::to_json(Json::Value &output) const {
+ switch (m_type) {
+ case DEST_TYPE_IPV4:
+ output["type"] = "ipv4";
+ output["ipv4"] = utl_uint32_to_ipv4(m_ipv4);
+ output["arp"] = utl_macaddr_to_str(m_mac);
+ break;
+ output["type"] = "ipv4_u";
+ output["ipv4"] = utl_uint32_to_ipv4(m_ipv4);
+ break;
+ output["type"] = "mac";
+ output["mac"] = utl_macaddr_to_str(m_mac);
+ break;
+ default:
+ assert(0);
+ }
+const uint8_t *
+TRexPortAttr::get_src_mac() const {
+ return CGlobalInfo::m_options.get_src_mac_addr(m_port_id);
+TRexPortAttr::get_rx_filter_mode() const {
+ switch (m_rx_filter_mode) {
+ return "all";
+ return "hw";
+ default:
+ assert(0);
+ }
+TRexPortAttr::to_json(Json::Value &output) {
+ output["src_mac"] = utl_macaddr_to_str(get_src_mac());
+ output["promiscuous"]["enabled"] = get_promiscuous();
+ output["link"]["up"] = is_link_up();
+ output["speed"] = get_link_speed() / 1000; // make sure we have no cards of less than 1 Gbps
+ output["rx_filter_mode"] = get_rx_filter_mode();
+ if (get_src_ipv4() != 0) {
+ output["src_ipv4"] = utl_uint32_to_ipv4(get_src_ipv4());
+ } else {
+ output["src_ipv4"] = Json::nullValue;
+ }
+ int mode;
+ get_flow_ctrl(mode);
+ output["fc"]["mode"] = mode;
+ m_dest.to_json(output["dest"]);
+TRexPortAttr::update_src_dst_mac(uint8_t *raw_pkt) {
+ memcpy(raw_pkt, get_dest().get_dest_mac(), 6);
+ memcpy(raw_pkt + 6, get_src_mac(), 6);
diff --git a/src/trex_port_attr.h b/src/trex_port_attr.h
index 9231e263..3cb9beff 100755
--- a/src/trex_port_attr.h
+++ b/src/trex_port_attr.h
@@ -21,10 +21,93 @@ limitations under the License.
#include <vector>
#include "rte_ethdev_includes.h"
#include "trex_defs.h"
+#include "common/basic_utils.h"
+#include <json/json.h>
+#include "trex_stateless_rx_defs.h"
+#include <string.h>
+ * destination port attribute
+ *
+ */
+class DestAttr {
+ DestAttr(uint8_t port_id);
+ /**
+ * dest can be either MAC IPv4, or IPv4 unresolved
+ */
+ enum dest_type_e {
+ };
+ /**
+ * set dest as an IPv4 unresolved
+ */
+ void set_dest(uint32_t ipv4);
+ /**
+ * set dest as a resolved IPv4
+ */
+ void set_dest(uint32_t ipv4, const uint8_t *mac);
+ /**
+ * set dest as a plain MAC
+ */
+ void set_dest(const uint8_t *mac);
+ /**
+ * return true if destination is resolved
+ */
+ bool is_resolved() const {
+ return (m_type != DEST_TYPE_IPV4_UNRESOLVED);
+ }
+ /**
+ * get the dest mac
+ * if the dest is not resolved
+ * it will return the default MAC
+ */
+ const uint8_t *get_dest_mac() {
+ return m_mac;
+ }
+ /**
+ * when link gets down - this should be called
+ *
+ */
+ void on_link_down() {
+ if (m_type == DEST_TYPE_IPV4) {
+ /* reset the IPv4 dest with no resolution */
+ set_dest(m_ipv4);
+ }
+ }
+ void to_json(Json::Value &output) const;
+ uint32_t m_ipv4;
+ uint8_t *m_mac;
+ dest_type_e m_type;
+ uint8_t m_port_id;
+ uint8_t m_default_mac[6];
class TRexPortAttr {
+ TRexPortAttr(uint8_t port_id) : m_dest(port_id) {
+ m_src_ipv4 = 0;
+ }
virtual ~TRexPortAttr(){}
@@ -33,10 +116,10 @@ public:
virtual void update_device_info() = 0;
virtual void reset_xstats() = 0;
virtual void update_description() = 0;
virtual bool get_promiscuous() = 0;
- virtual void macaddr_get(struct ether_addr *mac_addr) = 0;
+ virtual void get_hw_src_mac(struct ether_addr *mac_addr) = 0;
virtual uint32_t get_link_speed() { return m_link.link_speed; } // L1 Mbps
virtual bool is_link_duplex() { return (m_link.link_duplex ? true : false); }
virtual bool is_link_autoneg() { return (m_link.link_autoneg ? true : false); }
@@ -51,24 +134,50 @@ public:
virtual void get_description(std::string &description) { description = intf_info_st.description; }
virtual void get_supported_speeds(supp_speeds_t &supp_speeds) = 0;
+ uint32_t get_src_ipv4() {return m_src_ipv4;}
+ DestAttr & get_dest() {return m_dest;}
+ const uint8_t *get_src_mac() const;
+ std::string get_rx_filter_mode() const;
+ /* for a raw packet, write the src/dst MACs */
+ void update_src_dst_mac(uint8_t *raw_pkt);
virtual int set_promiscuous(bool enabled) = 0;
virtual int add_mac(char * mac) = 0;
virtual int set_link_up(bool up) = 0;
virtual int set_flow_ctrl(int mode) = 0;
virtual int set_led(bool on) = 0;
-/* DUMPS */
+ virtual int set_rx_filter_mode(rx_filter_mode_e mode) = 0;
+ void set_src_ipv4(uint32_t addr) {
+ m_src_ipv4 = addr;
+ }
+ /* DUMPS */
virtual void dump_link(FILE *fd) = 0;
+ /* dump object status to JSON */
+ void to_json(Json::Value &output);
- uint8_t m_port_id;
- rte_eth_link m_link;
- struct rte_eth_dev_info dev_info;
- bool flag_is_virtual;
- bool flag_is_fc_change_supported;
- bool flag_is_led_change_supported;
- bool flag_is_link_change_supported;
+ uint8_t m_port_id;
+ rte_eth_link m_link;
+ uint32_t m_src_ipv4;
+ DestAttr m_dest;
+ struct rte_eth_dev_info dev_info;
+ rx_filter_mode_e m_rx_filter_mode;
+ bool flag_is_virtual;
+ bool flag_is_fc_change_supported;
+ bool flag_is_led_change_supported;
+ bool flag_is_link_change_supported;
struct intf_info_st {
std::string pci_addr;
@@ -81,8 +190,11 @@ protected:
class DpdkTRexPortAttr : public TRexPortAttr {
- DpdkTRexPortAttr(uint8_t port_id, bool is_virtual, bool fc_change_allowed) {
+ DpdkTRexPortAttr(uint8_t port_id, bool is_virtual, bool fc_change_allowed) : TRexPortAttr(port_id) {
m_port_id = port_id;
+ m_rx_filter_mode = RX_FILTER_MODE_HW;
flag_is_virtual = is_virtual;
int tmp;
flag_is_fc_change_supported = fc_change_allowed && (get_flow_ctrl(tmp) != -ENOTSUP);
@@ -101,7 +213,7 @@ public:
virtual bool get_promiscuous();
- virtual void macaddr_get(struct ether_addr *mac_addr);
+ virtual void get_hw_src_mac(struct ether_addr *mac_addr);
virtual int get_xstats_values(xstats_values_t &xstats_values);
virtual int get_xstats_names(xstats_names_t &xstats_names);
virtual int get_flow_ctrl(int &mode);
@@ -114,6 +226,7 @@ public:
virtual int set_flow_ctrl(int mode);
virtual int set_led(bool on);
+ virtual int set_rx_filter_mode(rx_filter_mode_e mode);
/* DUMPS */
virtual void dump_link(FILE *fd);
@@ -128,7 +241,7 @@ private:
class SimTRexPortAttr : public TRexPortAttr {
- SimTRexPortAttr() {
+ SimTRexPortAttr() : TRexPortAttr(0) {
m_link.link_speed = 10000;
m_link.link_duplex = 1;
m_link.link_autoneg = 0;
@@ -146,7 +259,7 @@ public:
void reset_xstats() {}
void update_description() {}
bool get_promiscuous() { return false; }
- void macaddr_get(struct ether_addr *mac_addr) {}
+ void get_hw_src_mac(struct ether_addr *mac_addr) {}
int get_xstats_values(xstats_values_t &xstats_values) { return -ENOTSUP; }
int get_xstats_names(xstats_names_t &xstats_names) { return -ENOTSUP; }
int get_flow_ctrl(int &mode) { return -ENOTSUP; }
@@ -158,6 +271,7 @@ public:
int set_flow_ctrl(int mode) { return -ENOTSUP; }
int set_led(bool on) { return -ENOTSUP; }
void dump_link(FILE *fd) {}
+ int set_rx_filter_mode(rx_filter_mode_e mode) { return -ENOTSUP; }
diff --git a/src/utl_ip.h b/src/utl_ip.h
index 95db588a..27bb6c81 100644
--- a/src/utl_ip.h
+++ b/src/utl_ip.h
@@ -105,6 +105,7 @@ class COneIPInfo {
} COneIPInfo_ip_types;
+ virtual ~COneIPInfo() {}
virtual void get_mac(uint8_t *mac) const {
@@ -154,6 +155,7 @@ class COneIPv4Info : public COneIPInfo {
COneIPv4Info(uint32_t ip, uint16_t vlan, MacAddress mac, uint8_t port) : COneIPInfo(vlan, mac, port) {
m_ip = ip;
+ ~COneIPv4Info() {};
uint32_t get_ip() {return m_ip;}
virtual uint8_t ip_ver() const {return IP4_VER;}
virtual uint32_t get_arp_req_len() const {return 60;}
@@ -194,6 +196,7 @@ class COneIPv6Info : public COneIPInfo {
COneIPv6Info(uint16_t ip[8], uint16_t vlan, MacAddress mac, uint8_t port) : COneIPInfo(vlan, mac, port) {
memcpy(m_ip, ip, sizeof(m_ip));
+ ~COneIPv6Info() {}
const uint8_t *get_ipv6() {return (uint8_t *)m_ip;}
virtual uint8_t ip_ver() const {return IP6_VER;}
diff --git a/src/utl_term_io.cpp b/src/utl_term_io.cpp
index 8e561188..e45aeebd 100755
--- a/src/utl_term_io.cpp
+++ b/src/utl_term_io.cpp
@@ -78,6 +78,13 @@ int utl_termio_init(){
+ /* stdout is non-blocking */
+ int fd = fileno(stdout);
+ int f = fcntl(fd, F_GETFL, 0);
+ f |= O_NONBLOCK;
+ fcntl(fd, F_SETFL, f);
return (0);