summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHanoh Haim <hhaim@cisco.com>2016-05-10 17:49:25 +0300
committerHanoh Haim <hhaim@cisco.com>2016-05-10 17:49:25 +0300
commit2d37b9f98020a4458aaad1f3fd05ca5e408213e0 (patch)
tree3a8cd16eb748711b72df37c6f7eea4842d73290a
parent996f2451dba01f534420418eaac2856510682757 (diff)
parent63bf6aba10075a03fe6609369c1c7008afb85ba7 (diff)
merge from master
-rw-r--r--.gitignore3
-rwxr-xr-xscripts/automation/regression/stateful_tests/trex_general_test.py11
-rwxr-xr-xscripts/automation/regression/stateless_tests/stl_benchmark_test.py21
-rw-r--r--scripts/automation/regression/stateless_tests/stl_client_test.py6
-rwxr-xr-xscripts/automation/trex_control_plane/stl/console/trex_console.py6
-rw-r--r--scripts/automation/trex_control_plane/stl/examples/stl_pcap.py58
-rw-r--r--scripts/automation/trex_control_plane/stl/examples/stl_pcap_remote.py123
-rw-r--r--scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_async_client.py9
-rwxr-xr-xscripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py210
-rwxr-xr-xscripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_hltapi.py11
-rwxr-xr-xscripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_packet_builder_scapy.py23
-rw-r--r--scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py234
-rw-r--r--scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_stats.py7
-rwxr-xr-xscripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_streams.py24
-rwxr-xr-xscripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py6
-rw-r--r--scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/text_opts.py12
-rw-r--r--scripts/exp/pcap_remote_basic-0-ex.erfbin0 -> 125664 bytes
-rw-r--r--scripts/exp/pcap_remote_duration-0-ex.erfbin0 -> 528 bytes
-rw-r--r--scripts/exp/pcap_remote_loop-0-ex.erfbin0 -> 376992 bytes
-rw-r--r--scripts/exp/remote_test.capbin0 -> 108552 bytes
-rwxr-xr-xsrc/bp_sim.cpp413
-rwxr-xr-xsrc/bp_sim.h35
-rwxr-xr-xsrc/common/captureFile.cpp16
-rwxr-xr-xsrc/common/captureFile.h6
-rwxr-xr-xsrc/common/pcap.cpp3
-rw-r--r--src/gtest/trex_stateless_gtest.cpp64
-rw-r--r--src/main_dpdk.cpp242
-rw-r--r--src/publisher/trex_publisher.h1
-rw-r--r--src/rpc-server/commands/trex_rpc_cmd_general.cpp29
-rw-r--r--src/rpc-server/commands/trex_rpc_cmds.h2
-rw-r--r--src/rpc-server/trex_rpc_cmds_table.cpp1
-rw-r--r--src/stateless/cp/trex_dp_port_events.cpp14
-rw-r--r--src/stateless/cp/trex_dp_port_events.h13
-rw-r--r--src/stateless/cp/trex_stateless_port.cpp78
-rw-r--r--src/stateless/cp/trex_stateless_port.h27
-rw-r--r--src/stateless/dp/trex_stateless_dp_core.cpp192
-rw-r--r--src/stateless/dp/trex_stateless_dp_core.h29
-rw-r--r--src/stateless/dp/trex_stream_node.h172
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.cpp32
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.h52
40 files changed, 1593 insertions, 592 deletions
diff --git a/.gitignore b/.gitignore
index 80456922..ac511561 100644
--- a/.gitignore
+++ b/.gitignore
@@ -37,6 +37,9 @@ scripts/stl/exportedFile.pcap
scripts/exp/stl_multi_burst1-0.erf
scripts/exp/stl_multi_pkt1-0.erf
scripts/exp/stl_multi_pkt2-0.erf
+scripts/exp/pcap_remote_basic-0.erf
+scripts/exp/pcap_remote_duration-0.erf
+scripts/exp/pcap_remote_loop-0.erf
scripts/exp/stl_single_pkt_burst1-0.erf
scripts/exp/stl_single_sctp_pkt-0.erf
scripts/exp/stl_single_stream-0.erf
diff --git a/scripts/automation/regression/stateful_tests/trex_general_test.py b/scripts/automation/regression/stateful_tests/trex_general_test.py
index 42720f70..010ceb36 100755
--- a/scripts/automation/regression/stateful_tests/trex_general_test.py
+++ b/scripts/automation/regression/stateful_tests/trex_general_test.py
@@ -165,10 +165,13 @@ class CTRexGeneral_Test(unittest.TestCase):
# report benchmarks
if self.GAManager:
- setup_test = '%s.%s' % (CTRexScenario.setup_name, self.get_name())
- self.GAManager.gaAddAction(Event = 'stateful_test', action = setup_test, label = 'bw_per_core', value = int(test_norm_cpu))
- self.GAManager.gaAddAction(Event = 'stateful_test', action = setup_test, label = 'bw_per_core_exp', value = int(expected_norm_cpu))
- self.GAManager.emptyAndReportQ()
+ try:
+ setup_test = '%s.%s' % (CTRexScenario.setup_name, self.get_name())
+ self.GAManager.gaAddAction(Event = 'stateful_test', action = setup_test, label = 'bw_per_core', value = int(test_norm_cpu))
+ self.GAManager.gaAddAction(Event = 'stateful_test', action = setup_test, label = 'bw_per_core_exp', value = int(expected_norm_cpu))
+ self.GAManager.emptyAndReportQ()
+ except Exception as e:
+ print('Sending GA failed: %s' % e)
def check_results_gt (self, res, name, val):
if res is None:
diff --git a/scripts/automation/regression/stateless_tests/stl_benchmark_test.py b/scripts/automation/regression/stateless_tests/stl_benchmark_test.py
index ef4c435f..c2c11cc7 100755
--- a/scripts/automation/regression/stateless_tests/stl_benchmark_test.py
+++ b/scripts/automation/regression/stateless_tests/stl_benchmark_test.py
@@ -51,15 +51,18 @@ class STLBenchmark_Test(CStlGeneral_Test):
# report benchmarks
if self.GAManager:
- profile_repr = '%s.%s %s' % (CTRexScenario.setup_name,
- os.path.basename(profile_bench['name']),
- repr(kwargs).replace("'", ''))
- self.GAManager.gaAddAction(Event = 'stateless_test', action = profile_repr,
- label = 'bw_per_core', value = int(agv_bw_per_core))
- # TODO: report expected once acquired
- #self.GAManager.gaAddAction(Event = 'stateless_test', action = profile_repr,
- # label = 'bw_per_core_exp', value = int(expected_norm_cpu))
- self.GAManager.emptyAndReportQ()
+ try:
+ profile_repr = '%s.%s %s' % (CTRexScenario.setup_name,
+ os.path.basename(profile_bench['name']),
+ repr(kwargs).replace("'", ''))
+ self.GAManager.gaAddAction(Event = 'stateless_test', action = profile_repr,
+ label = 'bw_per_core', value = int(agv_bw_per_core))
+ # TODO: report expected once acquired
+ #self.GAManager.gaAddAction(Event = 'stateless_test', action = profile_repr,
+ # label = 'bw_per_core_exp', value = int(expected_norm_cpu))
+ self.GAManager.emptyAndReportQ()
+ except Exception as e:
+ print('Sending GA failed: %s' % e)
def tearDown(self):
self.stl_trex.reset()
diff --git a/scripts/automation/regression/stateless_tests/stl_client_test.py b/scripts/automation/regression/stateless_tests/stl_client_test.py
index e7c9bb5d..10f56b3f 100644
--- a/scripts/automation/regression/stateless_tests/stl_client_test.py
+++ b/scripts/automation/regression/stateless_tests/stl_client_test.py
@@ -84,9 +84,6 @@ class STLClient_Test(CStlGeneral_Test):
self.c.clear_stats()
self.c.start(ports = [self.tx_port, self.rx_port])
- assert self.c.ports[self.tx_port].is_transmitting(), 'port should be active'
- assert self.c.ports[self.rx_port].is_transmitting(), 'port should be active'
-
self.c.wait_on_traffic(ports = [self.tx_port, self.rx_port])
stats = self.c.get_stats()
@@ -124,9 +121,6 @@ class STLClient_Test(CStlGeneral_Test):
self.c.clear_stats()
self.c.start(ports = [self.tx_port, self.rx_port])
- assert self.c.ports[self.tx_port].is_transmitting(), 'port should be active'
- assert self.c.ports[self.rx_port].is_transmitting(), 'port should be active'
-
self.c.wait_on_traffic(ports = [self.tx_port, self.rx_port])
stats = self.c.get_stats()
diff --git a/scripts/automation/trex_control_plane/stl/console/trex_console.py b/scripts/automation/trex_control_plane/stl/console/trex_console.py
index f8161dcb..ab70d357 100755
--- a/scripts/automation/trex_control_plane/stl/console/trex_console.py
+++ b/scripts/automation/trex_control_plane/stl/console/trex_console.py
@@ -319,9 +319,13 @@ class TRexConsole(TRexGeneralCmd):
return self.do_history(line)
def do_push (self, line):
- '''Push a PCAP file\n'''
+ '''Push a local PCAP file\n'''
return self.stateless_client.push_line(line)
+ #def do_push_remote (self, line):
+ # '''Push a remote accessible PCAP file\n'''
+ # return self.stateless_client.push_remote_line(line)
+
def help_push (self):
return self.do_push("-h")
diff --git a/scripts/automation/trex_control_plane/stl/examples/stl_pcap.py b/scripts/automation/trex_control_plane/stl/examples/stl_pcap.py
index 45ddc24b..eae0f18b 100644
--- a/scripts/automation/trex_control_plane/stl/examples/stl_pcap.py
+++ b/scripts/automation/trex_control_plane/stl/examples/stl_pcap.py
@@ -3,62 +3,52 @@ from trex_stl_lib.api import *
import argparse
import sys
-def create_vm (ip_start, ip_end):
- vm =[
- # dest
- STLVmFlowVar(name="dst", min_value = ip_start, max_value = ip_end, size = 4, op = "inc"),
- STLVmWrFlowVar(fv_name="dst",pkt_offset= "IP.dst"),
+def packet_hook_generator (remove_fcs, vlan_id):
- # checksum
- STLVmFixIpv4(offset = "IP")
+ def packet_hook (packet):
+ packet = Ether(packet)
- ]
-
- return vm
-
-# warning: might make test slow
-def alter_streams(streams, remove_fcs, vlan_id):
- for stream in streams:
- packet = Ether(stream.pkt)
if vlan_id >= 0 and vlan_id <= 4096:
packet_l3 = packet.payload
packet = Ether() / Dot1Q(vlan = vlan_id) / packet_l3
+
if remove_fcs and packet.lastlayer().name == 'Padding':
packet.lastlayer().underlayer.remove_payload()
- packet = STLPktBuilder(packet)
- stream.fields['packet'] = packet.dump_pkt()
- stream.pkt = base64.b64decode(stream.fields['packet']['binary'])
+
+ return str(packet)
+
+ return packet_hook
+
def inject_pcap (pcap_file, server, port, loop_count, ipg_usec, use_vm, remove_fcs, vlan_id):
# create client
c = STLClient(server = server)
-
- try:
- if use_vm:
- vm = create_vm("10.0.0.1", "10.0.0.254")
- else:
- vm = None
- profile = STLProfile.load_pcap(pcap_file, ipg_usec = ipg_usec, loop_count = loop_count, vm = vm)
+ if remove_fcs or vlan_id:
+ packet_hook = packet_hook_generator(remove_fcs, vlan_id)
+ else:
+ packet_hook = None
- print("Loaded pcap {0} with {1} packets...\n".format(pcap_file, len(profile)))
- streams = profile.get_streams()
- if remove_fcs or (vlan_id >= 0 and vlan_id <= 4096):
- alter_streams(streams, remove_fcs, vlan_id)
+ try:
- # uncomment this for simulator run
- #STLSim().run(profile.get_streams(), outfile = '/auto/srg-sce-swinfra-usr/emb/users/ybrustin/out.pcap')
+ vm = STLIPRange(dst = {'start': '10.0.0.1', 'end': '10.0.0.254', 'step' : 1}) if use_vm else None
c.connect()
c.reset(ports = [port])
- stream_ids = c.add_streams(streams, ports = [port])
c.clear_stats()
+ d = c.push_pcap(pcap_file,
+ ipg_usec = ipg_usec,
+ count = loop_count,
+ vm = vm,
+ packet_hook = packet_hook)
+
+ STLSim().run(d, outfile = 'test.cap')
+
+ c.wait_on_traffic()
- c.start()
- c.wait_on_traffic(ports = [port])
stats = c.get_stats()
opackets = stats[port]['opackets']
diff --git a/scripts/automation/trex_control_plane/stl/examples/stl_pcap_remote.py b/scripts/automation/trex_control_plane/stl/examples/stl_pcap_remote.py
new file mode 100644
index 00000000..c47eee31
--- /dev/null
+++ b/scripts/automation/trex_control_plane/stl/examples/stl_pcap_remote.py
@@ -0,0 +1,123 @@
+import stl_path
+from trex_stl_lib.api import *
+import argparse
+import sys
+
+
+def inject_pcap (c, pcap_file, port, loop_count, ipg_usec, duration):
+
+ pcap_file = os.path.abspath(pcap_file)
+
+ c.reset(ports = [port])
+ c.push_remote(pcap_file, ports = [port], ipg_usec = ipg_usec, speedup = 1.0, count = loop_count, duration = duration)
+ # assume 100 seconds is enough - but can be more
+ c.wait_on_traffic(ports = [port], timeout = 100)
+
+ stats = c.get_stats()
+ opackets = stats[port]['opackets']
+
+ return opackets
+ #print("{0} packets were Tx on port {1}\n".format(opackets, port))
+
+
+
+def setParserOptions():
+ parser = argparse.ArgumentParser(prog="stl_pcap.py")
+
+ parser.add_argument("-f", "--file", help = "pcap file to inject",
+ dest = "pcap",
+ required = True,
+ type = str)
+
+ parser.add_argument("-s", "--server", help = "TRex server address",
+ dest = "server",
+ default = 'localhost',
+ type = str)
+
+ parser.add_argument("-p", "--port", help = "port to inject on",
+ dest = "port",
+ required = True,
+ type = int)
+
+ parser.add_argument("-n", "--number", help = "How many times to inject pcap [default is 1, 0 means forever]",
+ dest = "loop_count",
+ default = 1,
+ type = int)
+
+ parser.add_argument("-i", help = "IPG in usec",
+ dest = "ipg",
+ default = None,
+ type = float)
+
+ parser.add_argument("-d", help = "duration in seconds",
+ dest = "duration",
+ default = -1,
+ type = float)
+
+ return parser
+
+def sizeof_fmt(num, suffix='B'):
+ for unit in ['','Ki','Mi','Gi','Ti','Pi','Ei','Zi']:
+ if abs(num) < 1024.0:
+ return "%3.1f%s%s" % (num, unit, suffix)
+ num /= 1024.0
+ return "%.1f%s%s" % (num, 'Yi', suffix)
+
+
+def read_txt_file (filename):
+
+ with open(filename) as f:
+ lines = f.readlines()
+
+ caps = []
+ for raw in lines:
+ raw = raw.rstrip()
+ if raw[0] == '#':
+ continue
+ ext=os.path.splitext(raw)[1]
+ if ext not in ['.cap', '.pcap', '.erf']:
+ # skip unknown format
+ continue
+
+ caps.append(raw)
+
+ return caps
+
+
+def start (args):
+
+ parser = setParserOptions()
+ options = parser.parse_args(args)
+
+ ext = os.path.splitext(options.pcap)[1]
+ if ext == '.txt':
+ caps = read_txt_file(options.pcap)
+ elif ext in ['.cap', '.pcap']:
+ caps = [options.pcap]
+ else:
+ print("unknown file extension for file {0}".format(options.pcap))
+ return
+
+ c = STLClient(server = options.server)
+ try:
+ c.connect()
+ for i, cap in enumerate(caps, start = 1):
+ before = time.time()
+ print ("{:} CAP {:} @ {:} - ".format(i, cap, sizeof_fmt(os.path.getsize(cap)))),
+ injected = inject_pcap(c, cap, options.port, options.loop_count, options.ipg, options.duration)
+ print("took {:.2f} seconds for {:} packets").format(time.time() - before, injected)
+
+ except STLError as e:
+ print(e)
+ return
+
+ finally:
+ c.disconnect()
+
+def main ():
+ start(sys.argv[1:])
+
+# inject pcap
+if __name__ == '__main__':
+ main()
+
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_async_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_async_client.py
index 022077a9..5c9faf0f 100644
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_async_client.py
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_async_client.py
@@ -194,11 +194,13 @@ class CTRexAsyncClient():
if not self.connected:
return
+ # mark for join
+ self.active = False
+
# signal that the context was destroyed (exit the thread loop)
self.context.term()
- # mark for join and join
- self.active = False
+ # join
self.t.join()
# done
@@ -239,6 +241,7 @@ class CTRexAsyncClient():
except zmq.ContextTerminated:
# outside thread signaled us to exit
+ assert(not self.active)
break
msg = json.loads(line)
@@ -256,6 +259,8 @@ class CTRexAsyncClient():
# closing of socket must be from the same thread
self.socket.close(linger = 0)
+ def is_thread_alive (self):
+ return self.t.is_alive()
# did we get info for the last 3 seconds ?
def is_alive (self):
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py
index 862a9979..010d966c 100755
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py
@@ -313,6 +313,10 @@ class EventsHandler(object):
if session_id != self.client.session_id:
self.__async_event_port_released(port_id)
+ elif (type == 7):
+ port_id = int(data['port_id'])
+ ev = "port {0} job failed".format(port_id)
+ show_event = True
# server stopped
elif (type == 100):
@@ -331,30 +335,37 @@ class EventsHandler(object):
# private functions
+ # on rare cases events may come on a non existent prot
+ # (server was re-run with different config)
def __async_event_port_job_done (self, port_id):
- self.client.ports[port_id].async_event_port_job_done()
+ if port_id in self.client.ports:
+ self.client.ports[port_id].async_event_port_job_done()
def __async_event_port_stopped (self, port_id):
- self.client.ports[port_id].async_event_port_stopped()
+ if port_id in self.client.ports:
+ self.client.ports[port_id].async_event_port_stopped()
def __async_event_port_started (self, port_id):
- self.client.ports[port_id].async_event_port_started()
-
+ if port_id in self.client.ports:
+ self.client.ports[port_id].async_event_port_started()
def __async_event_port_paused (self, port_id):
- self.client.ports[port_id].async_event_port_paused()
+ if port_id in self.client.ports:
+ self.client.ports[port_id].async_event_port_paused()
def __async_event_port_resumed (self, port_id):
- self.client.ports[port_id].async_event_port_resumed()
-
+ if port_id in self.client.ports:
+ self.client.ports[port_id].async_event_port_resumed()
def __async_event_port_acquired (self, port_id, who):
- self.client.ports[port_id].async_event_acquired(who)
+ if port_id in self.client.ports:
+ self.client.ports[port_id].async_event_acquired(who)
def __async_event_port_released (self, port_id):
- self.client.ports[port_id].async_event_released()
+ if port_id in self.client.ports:
+ self.client.ports[port_id].async_event_released()
def __async_event_server_stopped (self):
self.client.connected = False
@@ -711,6 +722,17 @@ class STLClient(object):
return rc
+ def __push_remote (self, pcap_filename, port_id_list, ipg_usec, speedup, count, duration):
+
+ port_id_list = self.__ports(port_id_list)
+ rc = RC()
+
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].push_remote(pcap_filename, ipg_usec, speedup, count, duration))
+
+ return rc
+
+
def __validate (self, port_id_list = None):
port_id_list = self.__ports(port_id_list)
@@ -832,6 +854,9 @@ class STLClient(object):
# clear stats
def __clear_stats(self, port_id_list, clear_global, clear_flow_stats):
+ # we must be sync with the server
+ self.async_client.barrier()
+
for port_id in port_id_list:
self.ports[port_id].clear_stats()
@@ -1852,7 +1877,136 @@ class STLClient(object):
@__api_check(True)
- def validate (self, ports = None, mult = "1", duration = "-1", total = False):
+ def push_remote (self,
+ pcap_filename,
+ ports = None,
+ ipg_usec = None,
+ speedup = 1.0,
+ count = 1,
+ duration = -1):
+ """
+ Push a remote server-reachable PCAP file
+ the path must be fullpath accessible to the server
+
+ :parameters:
+ pcap_filename : str
+ PCAP file name in full path and accessible to the server
+
+ ports : list
+ Ports on which to execute the command
+
+ ipg_usec : float
+ Inter-packet gap in microseconds
+
+ speedup : float
+ A factor to adjust IPG. effectively IPG = IPG / speedup
+
+ count: int
+ How many times to transmit the cap
+
+ duration: float
+ Limit runtime by duration in seconds
+ :raises:
+ + :exc:`STLError`
+
+ """
+ ports = ports if ports is not None else self.get_acquired_ports()
+ ports = self._validate_port_list(ports)
+
+ validate_type('pcap_filename', pcap_filename, str)
+ validate_type('ipg_usec', ipg_usec, (float, int, type(None)))
+ validate_type('speedup', speedup, (float, int))
+ validate_type('count', count, int)
+ validate_type('duration', duration, (float, int))
+
+ self.logger.pre_cmd("Pushing remote PCAP on port(s) {0}:".format(ports))
+ rc = self.__push_remote(pcap_filename, ports, ipg_usec, speedup, count, duration)
+ self.logger.post_cmd(rc)
+
+ if not rc:
+ raise STLError(rc)
+
+
+ @__api_check(True)
+ def push_pcap (self,
+ pcap_filename,
+ ports = None,
+ ipg_usec = None,
+ speedup = 1.0,
+ count = 1,
+ duration = -1,
+ force = False,
+ vm = None,
+ packet_hook = None):
+ """
+ Push a local PCAP to the server
+ This is equivalent to loading a PCAP file to a profile
+ and attaching the profile to port(s)
+
+ file size is limited to 1MB
+
+ :parameters:
+ pcap_filename : str
+ PCAP filename (accessible locally)
+
+ ports : list
+ Ports on which to execute the command
+
+ ipg_usec : float
+ Inter-packet gap in microseconds
+
+ speedup : float
+ A factor to adjust IPG. effectively IPG = IPG / speedup
+
+ count: int
+ How many times to transmit the cap
+
+ duration: float
+ Limit runtime by duration in seconds
+
+ force: bool
+ Ignore file size limit - push any file size to the server
+
+ vm: list of VM instructions
+ VM instructions to apply for every packet
+
+ packet_hook : Callable or function
+ Will be applied to every packet
+
+ :raises:
+ + :exc:`STLError`
+
+ """
+ ports = ports if ports is not None else self.get_acquired_ports()
+ ports = self._validate_port_list(ports)
+
+ validate_type('pcap_filename', pcap_filename, str)
+ validate_type('ipg_usec', ipg_usec, (float, int, type(None)))
+ validate_type('speedup', speedup, (float, int))
+ validate_type('count', count, int)
+ validate_type('duration', duration, (float, int))
+ validate_type('vm', vm, (list, type(None)))
+
+ # no support for > 1MB PCAP - use push remote
+ if not force and os.path.getsize(pcap_filename) > (1024 * 1024):
+ raise STLError("PCAP size of {:} is too big for local push - consider using remote push or provide 'force'".format(format_num(os.path.getsize(pcap_filename), suffix = 'B')))
+
+ self.remove_all_streams(ports = ports)
+
+ profile = STLProfile.load_pcap(pcap_filename,
+ ipg_usec,
+ speedup,
+ count,
+ vm = vm,
+ packet_hook = packet_hook)
+
+ id_list = self.add_streams(profile.get_streams(), ports)
+
+ return self.start(ports = ports, duration = duration)
+
+
+ @__api_check(True)
+ def validate (self, ports = None, mult = "1", duration = -1, total = False):
"""
Validate port(s) configuration
@@ -1899,6 +2053,8 @@ class STLClient(object):
rc = self.__validate(ports)
self.logger.post_cmd(rc)
+ if not rc:
+ raise STLError(rc)
for port in ports:
self.ports[port].print_profile(mult_obj, duration)
@@ -1931,7 +2087,6 @@ class STLClient(object):
if not type(clear_global) is bool:
raise STLArgumentError('clear_global', clear_global)
-
rc = self.__clear_stats(ports, clear_global, clear_flow_stats)
if not rc:
raise STLError(rc)
@@ -1995,6 +2150,11 @@ class STLClient(object):
# wait while any of the required ports are active
while set(self.get_active_ports()).intersection(ports):
+
+ # make sure ASYNC thread is still alive - otherwise we will be stuck forever
+ if not self.async_client.is_thread_alive():
+ raise STLError("subscriber thread is dead")
+
time.sleep(0.01)
if time.time() > expr:
raise STLTimeoutError(timeout)
@@ -2519,6 +2679,7 @@ class STLClient(object):
"push",
self.push_line.__doc__,
parsing_opts.FILE_PATH,
+ parsing_opts.REMOTE_FILE,
parsing_opts.PORT_LIST_WITH_ALL,
parsing_opts.COUNT,
parsing_opts.DURATION,
@@ -2540,16 +2701,25 @@ class STLClient(object):
else:
self.stop(active_ports)
- # pcap injection removes all previous streams from the ports
- self.remove_all_streams(ports = opts.ports)
-
- profile = STLProfile.load_pcap(opts.file[0],
- opts.ipg_usec,
- opts.speedup,
- opts.count)
- id_list = self.add_streams(profile.get_streams(), opts.ports)
- self.start(ports = opts.ports, duration = opts.duration, force = opts.force)
+ if opts.remote:
+ self.push_remote(opts.file[0],
+ ports = opts.ports,
+ ipg_usec = opts.ipg_usec,
+ speedup = opts.speedup,
+ count = opts.count,
+ duration = opts.duration)
+
+ else:
+ self.push_pcap(opts.file[0],
+ ports = opts.ports,
+ ipg_usec = opts.ipg_usec,
+ speedup = opts.speedup,
+ count = opts.count,
+ duration = opts.duration,
+ force = opts.force)
+
+
return True
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_hltapi.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_hltapi.py
index 0afeff20..33a7b3af 100755
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_hltapi.py
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_hltapi.py
@@ -23,7 +23,9 @@ traffic_config_kwargs = {
'split_by_cores': 'split', # ( split | duplicate | single ) TRex extention: split = split traffic by cores, duplicate = duplicate traffic for all cores, single = run only with sinle core (not implemented yet)
'load_profile': None, # TRex extention: path to filename with stream profile (stream builder parameters will be ignored, limitation: modify)
'consistent_random': False, # TRex extention: False (default) = random sequence will be different every run, True = random sequence will be same every run
- 'ignore_macs': False, # TRex extention: True = use MACs from server configuration , no MAC VM (workaround on lack of ARP)
+ 'ignore_macs': False, # TRex extention: True = use MACs from server configuration, no MAC VM (workaround on lack of ARP)
+ 'disable_flow_stats': False, # TRex extention: True = don't use flow stats for this stream, (workaround for limitation on type of packet for flow_stats)
+ 'flow_stats_id': None, # TRex extention: uint, for use of STLHltStream, specifies id for flow stats (see stateless manual for flow_stats details)
'port_handle': None,
'port_handle2': None,
'bidirectional': False,
@@ -446,7 +448,7 @@ class CTRexHltApi(object):
kwargs = merge_kwargs(traffic_config_kwargs, user_kwargs)
stream_id = kwargs['stream_id']
mode = kwargs['mode']
- pg_id = None
+ pg_id = kwargs['flow_stats_id']
port_handle = port_list = self._parse_port_list(kwargs['port_handle'])
ALLOWED_MODES = ['create', 'modify', 'remove', 'enable', 'disable', 'reset']
@@ -864,7 +866,10 @@ def STLHltStream(**user_kwargs):
raise STLError('Could not create transmit_mode object %s: %s' % (transmit_mode, e if isinstance(e, STLError) else traceback.format_exc()))
try:
- pg_id = kwargs.get('pg_id')
+ if kwargs['l3_protocol'] == 'ipv4' and not kwargs['disable_flow_stats']:
+ pg_id = kwargs.get('pg_id', kwargs.get('flow_stats_id'))
+ else:
+ pg_id = None
stream = STLStream(packet = packet,
random_seed = 1 if is_true(kwargs['consistent_random']) else 0,
#enabled = True,
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_packet_builder_scapy.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_packet_builder_scapy.py
index f06d5d70..e12efaf9 100755
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_packet_builder_scapy.py
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_packet_builder_scapy.py
@@ -1492,4 +1492,27 @@ class STLPktBuilder(CTrexPktBuilderInterface):
pass;
+def STLIPRange (src = None,
+ dst = None,
+ fix_chksum = True):
+
+ vm = []
+
+ if src:
+ vm += [
+ STLVmFlowVar(name="src", min_value = src['start'], max_value = src['end'], size = 4, op = "inc", step = src['step']),
+ STLVmWrFlowVar(fv_name="src",pkt_offset= "IP.src")
+ ]
+
+ if dst:
+ vm += [
+ STLVmFlowVar(name="dst", min_value = dst['start'], max_value = dst['end'], size = 4, op = "inc", step = dst['step']),
+ STLVmWrFlowVar(fv_name="dst",pkt_offset= "IP.dst")
+ ]
+
+ if fix_chksum:
+ vm.append( STLVmFixIpv4(offset = "IP"))
+
+
+ return vm
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py
index e8f89b27..391b2076 100644
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py
@@ -29,17 +29,20 @@ def mult_to_factor (mult, max_bps_l2, max_pps, line_util):
# describes a single port
class Port(object):
- STATE_DOWN = 0
- STATE_IDLE = 1
- STATE_STREAMS = 2
- STATE_TX = 3
- STATE_PAUSE = 4
+ STATE_DOWN = 0
+ STATE_IDLE = 1
+ STATE_STREAMS = 2
+ STATE_TX = 3
+ STATE_PAUSE = 4
+ STATE_PCAP_TX = 5
+
PortState = namedtuple('PortState', ['state_id', 'state_name'])
STATES_MAP = {STATE_DOWN: "DOWN",
STATE_IDLE: "IDLE",
STATE_STREAMS: "IDLE",
STATE_TX: "ACTIVE",
- STATE_PAUSE: "PAUSE"}
+ STATE_PAUSE: "PAUSE",
+ STATE_PCAP_TX : "ACTIVE"}
def __init__ (self, port_id, user, comm_link, session_id, info):
@@ -67,6 +70,54 @@ class Port(object):
self.owner = ''
+ # decorator to verify port is up
+ def up(func):
+ def func_wrapper(*args):
+ port = args[0]
+
+ if not port.is_up():
+ return port.err("{0} - port is down".format(func.__name__))
+
+ return func(*args)
+
+ return func_wrapper
+
+ # owned
+ def owned(func):
+ def func_wrapper(*args):
+ port = args[0]
+
+ if not port.is_up():
+ return port.err("{0} - port is down".format(func.__name__))
+
+ if not port.is_acquired():
+ return port.err("{0} - port is not owned".format(func.__name__))
+
+ return func(*args)
+
+ return func_wrapper
+
+
+ # decorator to check server is readable (port not down and etc.)
+ def writeable(func):
+ def func_wrapper(*args):
+ port = args[0]
+
+ if not port.is_up():
+ return port.err("{0} - port is down".format(func.__name__))
+
+ if not port.is_acquired():
+ return port.err("{0} - port is not owned".format(func.__name__))
+
+ if not port.is_writeable():
+ return port.err("{0} - port is not in a writeable state".format(func.__name__))
+
+ return func(*args)
+
+ return func_wrapper
+
+
+
def err(self, msg):
return RC_ERR("port {0} : {1}\n".format(self.port_id, msg))
@@ -79,7 +130,39 @@ class Port(object):
def get_formatted_speed (self):
return "{0} Gbps".format(self.info['speed'])
+ def is_acquired(self):
+ return (self.handler != None)
+
+ def is_up (self):
+ return (self.state != self.STATE_DOWN)
+
+ def is_active(self):
+ return (self.state == self.STATE_TX ) or (self.state == self.STATE_PAUSE) or (self.state == self.STATE_PCAP_TX)
+
+ def is_transmitting (self):
+ return (self.state == self.STATE_TX) or (self.state == self.STATE_PCAP_TX)
+
+ def is_paused (self):
+ return (self.state == self.STATE_PAUSE)
+
+ def is_writeable (self):
+ # operations on port can be done on state idle or state streams
+ return ((self.state == self.STATE_IDLE) or (self.state == self.STATE_STREAMS))
+
+ def get_owner (self):
+ if self.is_acquired():
+ return self.user
+ else:
+ return self.owner
+
+ def __allocate_stream_id (self):
+ id = self.next_available_id
+ self.next_available_id += 1
+ return id
+
+
# take the port
+ @up
def acquire(self, force = False, sync_streams = True):
params = {"port_id": self.port_id,
"user": self.user,
@@ -99,6 +182,7 @@ class Port(object):
# sync all the streams with the server
+ @up
def sync_streams (self):
params = {"port_id": self.port_id}
@@ -114,6 +198,7 @@ class Port(object):
return self.ok()
# release the port
+ @up
def release(self):
params = {"port_id": self.port_id,
"handler": self.handler}
@@ -129,25 +214,11 @@ class Port(object):
else:
return self.err(rc.err())
- def is_acquired(self):
- return (self.handler != None)
-
- def is_active(self):
- return(self.state == self.STATE_TX ) or (self.state == self.STATE_PAUSE)
-
- def is_transmitting (self):
- return (self.state == self.STATE_TX)
-
- def is_paused (self):
- return (self.state == self.STATE_PAUSE)
-
- def get_owner (self):
- if self.is_acquired():
- return self.user
- else:
- return self.owner
+
+ @up
def sync(self):
+
params = {"port_id": self.port_id}
rc = self.transmit("get_port_status", params)
@@ -167,6 +238,8 @@ class Port(object):
self.state = self.STATE_TX
elif port_state == "PAUSE":
self.state = self.STATE_PAUSE
+ elif port_state == "PCAP_TX":
+ self.state = self.STATE_PCAP_TX
else:
raise Exception("port {0}: bad state received from server '{1}'".format(self.port_id, port_state))
@@ -181,27 +254,11 @@ class Port(object):
return self.ok()
- # return TRUE if write commands
- def is_port_writable (self):
- # operations on port can be done on state idle or state streams
- return ((self.state == self.STATE_IDLE) or (self.state == self.STATE_STREAMS))
-
-
- def __allocate_stream_id (self):
- id = self.next_available_id
- self.next_available_id += 1
- return id
-
# add streams
+ @writeable
def add_streams (self, streams_list):
- if not self.is_acquired():
- return self.err("port is not owned")
-
- if not self.is_port_writable():
- return self.err("Please stop port before attempting to add streams")
-
# listify
streams_list = streams_list if isinstance(streams_list, list) else [streams_list]
@@ -273,14 +330,9 @@ class Port(object):
# remove stream from port
+ @writeable
def remove_streams (self, stream_id_list):
- if not self.is_acquired():
- return self.err("port is not owned")
-
- if not self.is_port_writable():
- return self.err("Please stop port before attempting to remove streams")
-
# single element to list
stream_id_list = stream_id_list if isinstance(stream_id_list, list) else [stream_id_list]
@@ -314,14 +366,9 @@ class Port(object):
# remove all the streams
+ @writeable
def remove_all_streams (self):
- if not self.is_acquired():
- return self.err("port is not owned")
-
- if not self.is_port_writable():
- return self.err("Please stop port before attempting to remove streams")
-
params = {"handler": self.handler,
"port_id": self.port_id}
@@ -348,19 +395,11 @@ class Port(object):
return self.streams
- # start traffic
+ @writeable
def start (self, mul, duration, force):
- if not self.is_acquired():
- return self.err("port is not owned")
-
- if self.state == self.STATE_DOWN:
- return self.err("Unable to start traffic - port is down")
if self.state == self.STATE_IDLE:
- return self.err("Unable to start traffic - no streams attached to port")
-
- if self.state == self.STATE_TX:
- return self.err("Unable to start traffic - port is already transmitting")
+ return self.err("unable to start traffic - no streams attached to port")
params = {"handler": self.handler,
"port_id": self.port_id,
@@ -378,15 +417,12 @@ class Port(object):
# stop traffic
# with force ignores the cached state and sends the command
+ @owned
def stop (self, force = False):
- if not self.is_acquired():
- return self.err("port is not owned")
-
- # port is already stopped
- if not force:
- if (self.state == self.STATE_IDLE) or (self.state == self.state == self.STATE_STREAMS):
- return self.ok()
+ # if not is not active and not force - go back
+ if not self.is_active() and not force:
+ return self.ok()
params = {"handler": self.handler,
"port_id": self.port_id}
@@ -420,19 +456,10 @@ class Port(object):
return not self.tx_stopped_ts or (datetime.now() - self.tx_stopped_ts) > timedelta(milliseconds = rx_delay_ms)
-
+ @writeable
def remove_rx_filters (self):
assert(self.has_rx_enabled())
- if not self.is_acquired():
- return self.err("port is not owned")
-
- if self.state == self.STATE_DOWN:
- return self.err("Unable to remove RX filters - port is down")
-
- if self.state == self.STATE_TX:
- return self.err("Unable to remove RX filters - port is transmitting")
-
if self.state == self.STATE_IDLE:
return self.ok()
@@ -446,11 +473,11 @@ class Port(object):
return self.ok()
-
+ @owned
def pause (self):
- if not self.is_acquired():
- return self.err("port is not owned")
+ if (self.state == self.STATE_PCAP_TX) :
+ return self.err("pause is not supported during PCAP TX")
if (self.state != self.STATE_TX) :
return self.err("port is not transmitting")
@@ -466,12 +493,9 @@ class Port(object):
return self.ok()
-
+ @owned
def resume (self):
- if not self.is_acquired():
- return self.err("port is not owned")
-
if (self.state != self.STATE_PAUSE) :
return self.err("port is not in pause mode")
@@ -488,11 +512,11 @@ class Port(object):
return self.ok()
-
+ @owned
def update (self, mul, force):
- if not self.is_acquired():
- return self.err("port is not owned")
+ if (self.state == self.STATE_PCAP_TX) :
+ return self.err("update is not supported during PCAP TX")
if (self.state != self.STATE_TX) :
return self.err("port is not transmitting")
@@ -508,15 +532,9 @@ class Port(object):
return self.ok()
-
+ @owned
def validate (self):
- if not self.is_acquired():
- return self.err("port is not owned")
-
- if (self.state == self.STATE_DOWN):
- return self.err("port is down")
-
if (self.state == self.STATE_IDLE):
return self.err("no streams attached to port")
@@ -532,12 +550,8 @@ class Port(object):
return self.ok()
+ @owned
def set_attr (self, attr_dict):
- if not self.is_acquired():
- return self.err("port is not owned")
-
- if (self.state == self.STATE_DOWN):
- return self.err("port is down")
params = {"handler": self.handler,
"port_id": self.port_id,
@@ -552,6 +566,24 @@ class Port(object):
return self.ok()
+ @writeable
+ def push_remote (self, pcap_filename, ipg_usec, speedup, count, duration):
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id,
+ "pcap_filename": pcap_filename,
+ "ipg_usec": ipg_usec if ipg_usec is not None else -1,
+ "speedup": speedup,
+ "count": count,
+ "duration": duration}
+
+ rc = self.transmit("push_remote", params)
+ if rc.bad():
+ return self.err(rc.err())
+
+ self.state = self.STATE_PCAP_TX
+ return self.ok()
+
def get_profile (self):
return self.profile
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_stats.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_stats.py
index c7513144..61122e79 100644
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_stats.py
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_stats.py
@@ -28,6 +28,9 @@ SS_COMPAT = [GLOBAL_STATS, STREAMS_STATS]
ExportableStats = namedtuple('ExportableStats', ['raw_data', 'text_table'])
+def round_float (f):
+ return float("%.2f" % f)
+
# deep mrege of dicts dst = src + dst
def deep_merge_dicts (dst, src):
for k, v in src.items():
@@ -633,10 +636,10 @@ class CGlobalStats(CTRexStats):
("version", "{ver}, UUID: {uuid}".format(ver=self.server_version.get("version", "N/A"),
uuid="N/A")),
- ("cpu_util", "{0}% {1}".format( format_threshold(self.get("m_cpu_util"), [85, 100], [0, 85]),
+ ("cpu_util", "{0}% {1}".format( format_threshold(round_float(self.get("m_cpu_util")), [85, 100], [0, 85]),
self.get_trend_gui("m_cpu_util", use_raw = True))),
- ("rx_cpu_util", "{0}% {1}".format( format_threshold(self.get("m_rx_cpu_util"), [85, 100], [0, 85]),
+ ("rx_cpu_util", "{0}% {1}".format( format_threshold(round_float(self.get("m_rx_cpu_util")), [85, 100], [0, 85]),
self.get_trend_gui("m_rx_cpu_util", use_raw = True))),
(" ", ""),
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_streams.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_streams.py
index 165942d8..a7fd3026 100755
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_streams.py
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_streams.py
@@ -7,6 +7,7 @@ from .trex_stl_packet_builder_scapy import STLPktBuilder, Ether, IP, UDP, TCP, R
from collections import OrderedDict, namedtuple
from scapy.utils import ltoa
+from scapy.error import Scapy_Exception
import random
import yaml
import base64
@@ -927,7 +928,7 @@ class STLProfile(object):
# loop_count = 0 means loop forever
@staticmethod
- def load_pcap (pcap_file, ipg_usec = None, speedup = 1.0, loop_count = 1, vm = None):
+ def load_pcap (pcap_file, ipg_usec = None, speedup = 1.0, loop_count = 1, vm = None, packet_hook = None):
""" Convert a pcap file with a number of packets to a list of connected streams.
packet1->packet2->packet3 etc
@@ -938,7 +939,7 @@ class STLProfile(object):
Name of the pcap file
ipg_usec : float
- Inter packet gap in usec. If IPG=0, IPG is taken from pcap file
+ Inter packet gap in usec. If IPG is None, IPG is taken from pcap file
speedup : float
When reading the pcap file, divide IPG by this "speedup" factor. Resulting IPG is sped up by this factor.
@@ -949,6 +950,9 @@ class STLProfile(object):
vm : list
List of Field engine instructions
+ packet_hook : Callable or function
+ will be applied to every packet
+
:return: STLProfile
"""
@@ -958,8 +962,8 @@ class STLProfile(object):
raise STLError("file '{0}' does not exists".format(pcap_file))
# make sure IPG is not less than 1 usec
- if ipg_usec is not None and ipg_usec < 1:
- raise STLError("ipg_usec cannot be less than 1 usec: '{0}'".format(ipg_usec))
+ if ipg_usec is not None and ipg_usec < 0.001:
+ raise STLError("ipg_usec cannot be less than 0.001 usec: '{0}'".format(ipg_usec))
if loop_count < 0:
raise STLError("'loop_count' cannot be negative")
@@ -967,8 +971,15 @@ class STLProfile(object):
streams = []
last_ts_usec = 0
- pkts = RawPcapReader(pcap_file).read_all()
-
+ try:
+ pkts = RawPcapReader(pcap_file).read_all()
+ except Scapy_Exception as e:
+ raise STLError("failed to open PCAP file '{0}'".format(pcap_file))
+
+ if packet_hook:
+ pkts = [(packet_hook(cap), meta) for (cap, meta) in pkts]
+
+
for i, (cap, meta) in enumerate(pkts, start = 1):
# IPG - if not provided, take from cap
if ipg_usec == None:
@@ -984,7 +995,6 @@ class STLProfile(object):
next = i + 1
action_count = 0
-
streams.append(STLStream(name = i,
packet = STLPktBuilder(pkt_buffer = cap, vm = vm),
mode = STLTXSingleBurst(total_pkts = 1, percentage = 100),
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py
index ad46625b..98e3ca6a 100755
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py
@@ -32,6 +32,7 @@ PROMISCUOUS = 19
NO_PROMISCUOUS = 20
PROMISCUOUS_SWITCH = 21
TUNABLES = 22
+REMOTE_FILE = 23
GLOBAL_STATS = 50
PORT_STATS = 51
@@ -290,6 +291,11 @@ OPTIONS_DB = {MULTIPLIER: ArgumentPack(['-m', '--multiplier'],
'default': False,
'help': "Set if you want to stop active ports before appyling command."}),
+ REMOTE_FILE: ArgumentPack(['-r', '--remote'],
+ {"action": "store_true",
+ 'default': False,
+ 'help': "file path should be interpeted by the server (remote file)"}),
+
FILE_PATH: ArgumentPack(['-f'],
{'metavar': 'FILE',
'dest': 'file',
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/text_opts.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/text_opts.py
index 5c0dfb14..7e0bf9e4 100644
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/text_opts.py
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/text_opts.py
@@ -150,11 +150,15 @@ def format_text(text, *args):
def format_threshold (value, red_zone, green_zone):
- if value >= red_zone[0] and value <= red_zone[1]:
- return format_text("{0}".format(value), 'red')
+ try:
+ if value >= red_zone[0] and value <= red_zone[1]:
+ return format_text("{0}".format(value), 'red')
- if value >= green_zone[0] and value <= green_zone[1]:
- return format_text("{0}".format(value), 'green')
+ if value >= green_zone[0] and value <= green_zone[1]:
+ return format_text("{0}".format(value), 'green')
+ except TypeError:
+ # if value is not comparable or not a number - skip this
+ pass
return "{0}".format(value)
diff --git a/scripts/exp/pcap_remote_basic-0-ex.erf b/scripts/exp/pcap_remote_basic-0-ex.erf
new file mode 100644
index 00000000..3c626419
--- /dev/null
+++ b/scripts/exp/pcap_remote_basic-0-ex.erf
Binary files differ
diff --git a/scripts/exp/pcap_remote_duration-0-ex.erf b/scripts/exp/pcap_remote_duration-0-ex.erf
new file mode 100644
index 00000000..3731f735
--- /dev/null
+++ b/scripts/exp/pcap_remote_duration-0-ex.erf
Binary files differ
diff --git a/scripts/exp/pcap_remote_loop-0-ex.erf b/scripts/exp/pcap_remote_loop-0-ex.erf
new file mode 100644
index 00000000..c5f3d8be
--- /dev/null
+++ b/scripts/exp/pcap_remote_loop-0-ex.erf
Binary files differ
diff --git a/scripts/exp/remote_test.cap b/scripts/exp/remote_test.cap
new file mode 100644
index 00000000..05462b28
--- /dev/null
+++ b/scripts/exp/remote_test.cap
Binary files differ
diff --git a/src/bp_sim.cpp b/src/bp_sim.cpp
index 76bd6ec7..f9cb3220 100755
--- a/src/bp_sim.cpp
+++ b/src/bp_sim.cpp
@@ -3858,187 +3858,86 @@ int CNodeGenerator::flush_file(dsec_t max_time,
-#if 0
-int CNodeGenerator::flush_file(dsec_t max_time,
- dsec_t d_time,
- bool always,
- CFlowGenListPerThread * thread,
- double &old_offset){
- CGenNode * node;
- #ifdef TREX_SIM
- dsec_t flush_time=now_sec();
- #endif
- dsec_t offset=0.0;
- #ifdef TREX_SIM
- dsec_t n_time;
- #endif
- if (always) {
- offset=old_offset;
- }
- #ifdef TREX_SIM
- uint32_t events=0;
- #endif
- bool done=false;
-
- thread->m_cpu_dp_u.start_work1();
-
- /**
- * if a positive value was given to max time
- * schedule an exit node
- */
- if ( (max_time > 0) && (!always) ) {
- CGenNode *exit_node = thread->create_node();
-
- exit_node->m_type = CGenNode::EXIT_SCHED;
- exit_node->m_time = max_time;
- add_node(exit_node);
- }
-
- while (true) {
-
- node = m_p_queue.top();
- #ifdef TREX_SIM
- n_time = node->m_time + offset;
-
- events++;
+void CNodeGenerator::handle_flow_pkt(CGenNode *node, CFlowGenListPerThread *thread) {
+
+ /*repeat and NAT is not supported */
+ if ( node->is_nat_first_state() ) {
+ node->set_nat_wait_state();
+ flush_one_node_to_file(node);
+ #ifdef _DEBUG
+ update_stats(node);
#endif
-/*#ifdef VALG
- if (events > 1 ) {
- CALLGRIND_START_INSTRUMENTATION;
- }
-#endif*/
-
- thread->m_cpu_dp_u.commit1();
- thread->m_cpu_dp_u.start_work1();
-
- #ifdef TREX_SIM
-
- if ( likely ( m_is_realtime ) ){
- dsec_t dt ;
- thread->m_cpu_dp_u.commit1();
-
- while ( true ) {
- dt = now_sec() - n_time ;
-
- if (dt> (-0.00003)) {
- break;
- }
-
- rte_pause();
- }
- thread->m_cpu_dp_u.start_work1();
+ } else {
+ if ( node->is_nat_wait_state() ) {
+ if (node->is_responder_pkt()) {
+ m_p_queue.pop();
+ /* time out, need to free the flow and remove the association , we didn't get convertion yet*/
+ thread->terminate_nat_flows(node);
+ return;
- /* add offset in case of faliures more than 100usec */
- if ( unlikely( dt > 0.000100 ) ) {
- offset += dt;
- }
- /* update histogram */
- if ( unlikely( events % 16 ) ==0 ) {
- m_realtime_his.Add(dt);
- }
- /* flush evey 10 usec */
- if ( now_sec() - flush_time > 0.00001 ){
- m_v_if->flush_tx_queue();
- flush_time=now_sec();
+ } else {
+ flush_one_node_to_file(node);
+ #ifdef _DEBUG
+ update_stats(node);
+ #endif
}
+ } else {
+ assert(0);
}
- #endif
+ }
+ m_p_queue.pop();
+ if ( node->is_last_in_flow() ) {
+ thread->free_last_flow_node( node);
+ } else {
+ node->update_next_pkt_in_flow();
+ m_p_queue.push(node);
+ }
+}
+void CNodeGenerator::handle_flow_sync(CGenNode *node, CFlowGenListPerThread *thread, bool &exit_scheduler) {
+ /* flow sync message is a sync point for time */
+ thread->m_cur_time_sec = node->m_time;
- uint8_t type=node->m_type;
+ /* first pop the node */
+ m_p_queue.pop();
- if ( type == CGenNode::STATELESS_PKT ) {
- m_p_queue.pop();
- CGenNodeStateless *node_sl = (CGenNodeStateless *)node;
+ thread->check_msgs(); /* check messages */
+ m_v_if->flush_tx_queue(); /* flush pkt each timeout */
- /* if the stream has been deactivated - end */
- if ( unlikely( node_sl->is_mask_for_free() ) ) {
- thread->free_node(node);
- } else {
+ /* exit in case this is the last node*/
+ if ( m_p_queue.size() == m_parent->m_non_active_nodes ) {
+ thread->free_node(node);
+ exit_scheduler = true;
+ } else {
+ /* schedule for next maintenace */
+ node->m_time += SYNC_TIME_OUT;
+ m_p_queue.push(node);
+ }
- /* count before handle - node might be destroyed */
- #ifdef TREX_SIM
- update_stl_stats(node_sl);
- #endif
+}
- node_sl->handle(thread);
+void CNodeGenerator::handle_command(CGenNode *node, CFlowGenListPerThread *thread, bool &exit_scheduler) {
+ m_p_queue.pop();
+ CGenNodeCommand *node_cmd = (CGenNodeCommand *)node;
+ TrexStatelessCpToDpMsgBase * cmd=node_cmd->m_cmd;
+ cmd->handle(&thread->m_stateless_dp_info);
+ exit_scheduler = cmd->is_quit();
+ thread->free_node((CGenNode *)node_cmd);/* free the node */
+}
- #ifdef TREX_SIM
- if (has_limit_reached()) {
- thread->m_stateless_dp_info.stop_traffic(node_sl->get_port_id(), false, 0);
- }
- #endif
+void CNodeGenerator::handle_pcap_pkt(CGenNode *node, CFlowGenListPerThread *thread) {
+ m_p_queue.pop();
- }
-
-
- }else{
- if ( likely( type == CGenNode::FLOW_PKT ) ) {
- /* PKT */
- if ( !(node->is_repeat_flow()) || (always==false)) {
- flush_one_node_to_file(node);
- #ifdef _DEBUG
- update_stats(node);
- #endif
- }
- m_p_queue.pop();
- if ( node->is_last_in_flow() ) {
- if ((node->is_repeat_flow()) && (always==false)) {
- /* Flow is repeated, reschedule it */
- thread->reschedule_flow( node);
- }else{
- /* Flow will not be repeated, so free node */
- thread->free_last_flow_node( node);
- }
- }else{
- node->update_next_pkt_in_flow();
- m_p_queue.push(node);
- }
- }else{
- if ((type == CGenNode::FLOW_FIF)) {
- /* callback to our method */
- m_p_queue.pop();
- if ( always == false) {
- thread->m_cur_time_sec = node->m_time ;
-
- if ( thread->generate_flows_roundrobin(&done) <0){
- break;
- }
- if (!done) {
- node->m_time +=d_time;
- m_p_queue.push(node);
- }else{
- thread->free_node(node);
- }
- }else{
- thread->free_node(node);
- }
-
- }else{
- bool exit_sccheduler = handle_slow_messages(type,node,thread,always);
- if (exit_sccheduler) {
- break;
- }
- }
- }
- }
- }
+ CGenNodePCAP *node_pcap = (CGenNodePCAP *)node;
- if ( thread->is_terminated_by_master() ) {
- return (0);
- }
-
- if (!always) {
- old_offset =offset;
- }else{
- // free the left other
- thread->handler_defer_job_flush();
+ /* might have been marked for free */
+ if ( unlikely( node_pcap->is_marked_for_free() ) ) {
+ thread->free_node(node);
+ } else {
+ node_pcap->handle(thread);
}
- return (0);
}
-#endif
-
bool
CNodeGenerator::handle_slow_messages(uint8_t type,
CGenNode * node,
@@ -4048,89 +3947,42 @@ CNodeGenerator::handle_slow_messages(uint8_t type,
/* should we continue after */
bool exit_scheduler = false;
- if (unlikely (type == CGenNode::FLOW_DEFER_PORT_RELEASE) ) {
+ switch (type) {
+ case CGenNode::PCAP_PKT:
+ handle_pcap_pkt(node, thread);
+ break;
+
+ case CGenNode::FLOW_DEFER_PORT_RELEASE:
m_p_queue.pop();
thread->handler_defer_job(node);
thread->free_node(node);
+ break;
- } else if (type == CGenNode::FLOW_PKT_NAT) {
- /*repeat and NAT is not supported */
- if ( node->is_nat_first_state() ){
- node->set_nat_wait_state();
- flush_one_node_to_file(node);
- #ifdef _DEBUG
- update_stats(node);
- #endif
- }else{
- if ( node->is_nat_wait_state() ) {
- if (node->is_responder_pkt()) {
- m_p_queue.pop();
- /* time out, need to free the flow and remove the association , we didn't get convertion yet*/
- thread->terminate_nat_flows(node);
- return (exit_scheduler);
-
- }else{
- flush_one_node_to_file(node);
- #ifdef _DEBUG
- update_stats(node);
- #endif
- }
- }else{
- assert(0);
- }
- }
- m_p_queue.pop();
- if ( node->is_last_in_flow() ) {
- thread->free_last_flow_node( node);
- }else{
- node->update_next_pkt_in_flow();
- m_p_queue.push(node);
- }
-
- } else if ( type == CGenNode::FLOW_SYNC ) {
+ case CGenNode::FLOW_PKT_NAT:
+ handle_flow_pkt(node, thread);
+ break;
- /* flow sync message is a sync point for time */
- thread->m_cur_time_sec = node->m_time;
+ case CGenNode::FLOW_SYNC:
+ handle_flow_sync(node, thread, exit_scheduler);
+ break;
- /* first pop the node */
- m_p_queue.pop();
+ case CGenNode::EXIT_SCHED:
+ m_p_queue.pop();
+ thread->free_node(node);
+ exit_scheduler = true;
+ break;
- thread->check_msgs(); /* check messages */
- m_v_if->flush_tx_queue(); /* flush pkt each timeout */
- /* exit in case this is the last node*/
- if ( m_p_queue.size() == m_parent->m_non_active_nodes ) {
- thread->free_node(node);
- exit_scheduler = true;
- } else {
- /* schedule for next maintenace */
- node->m_time += SYNC_TIME_OUT;
- m_p_queue.push(node);
- }
+ case CGenNode::COMMAND:
+ handle_command(node, thread, exit_scheduler);
+ break;
+ default:
+ assert(0);
+ }
- } else if ( type == CGenNode::EXIT_SCHED ) {
- m_p_queue.pop();
- thread->free_node(node);
- exit_scheduler = true;
-
- } else {
- if ( type == CGenNode::COMMAND) {
- m_p_queue.pop();
- CGenNodeCommand *node_cmd = (CGenNodeCommand *)node;
- {
- TrexStatelessCpToDpMsgBase * cmd=node_cmd->m_cmd;
- cmd->handle(&thread->m_stateless_dp_info);
- exit_scheduler = cmd->is_quit();
- thread->free_node((CGenNode *)node_cmd);/* free the node */
- }
- }else{
- printf(" ERROR type is not valid %d \n",type);
- assert(0);
- }
- }
+ return (exit_scheduler);
- return exit_scheduler;
}
@@ -4408,8 +4260,6 @@ void CFlowGenListPerThread::check_msgs(void) {
}
}
-//void delay(int msec);
-
void CFlowGenListPerThread::start_stateless_simulation_file(std::string erf_file_name,
@@ -5123,39 +4973,66 @@ int CErfIFStl::update_mac_addr_from_global_cfg(pkt_dir_t dir, uint8_t * p){
}
-int CErfIFStl::send_node(CGenNode * _no_to_use){
-
- if ( m_preview_mode->getFileWrite() ){
-
- CGenNodeStateless * node_sl=(CGenNodeStateless *) _no_to_use;
+int CErfIFStl::send_sl_node(CGenNodeStateless *node_sl) {
+ pkt_dir_t dir=(pkt_dir_t)node_sl->get_mbuf_cache_dir();
- pkt_dir_t dir=(pkt_dir_t)node_sl->get_mbuf_cache_dir();
-
- rte_mbuf_t * m;
- if ( likely(node_sl->is_cache_mbuf_array()) ) {
- m=node_sl->cache_mbuf_array_get_cur();
- fill_raw_packet(m,_no_to_use,dir);
+ rte_mbuf_t * m;
+ if ( likely(node_sl->is_cache_mbuf_array()) ) {
+ m=node_sl->cache_mbuf_array_get_cur();
+ fill_raw_packet(m,(CGenNode *)node_sl,dir);
+ }else{
+ m=node_sl->get_cache_mbuf();
+ if (m) {
+ /* cache packet */
+ fill_raw_packet(m,(CGenNode *)node_sl,dir);
+ /* can't free the m, it is cached*/
}else{
- m=node_sl->get_cache_mbuf();
- if (m) {
- /* cache packet */
- fill_raw_packet(m,_no_to_use,dir);
- /* can't free the m, it is cached*/
- }else{
-
- m=node_sl->alloc_node_with_vm();
- assert(m);
- fill_raw_packet(m,_no_to_use,dir);
- rte_pktmbuf_free(m);
- }
+ m=node_sl->alloc_node_with_vm();
+ assert(m);
+ fill_raw_packet(m,(CGenNode *)node_sl,dir);
+ rte_pktmbuf_free(m);
}
+ }
/* check that we have mbuf */
+ int rc = write_pkt(m_raw);
+ BP_ASSERT(rc == 0);
+
+ return (rc);
+}
- int rc = write_pkt(m_raw);
- BP_ASSERT(rc == 0);
+
+int CErfIFStl::send_pcap_node(CGenNodePCAP *pcap_node) {
+ rte_mbuf_t *m = pcap_node->get_pkt();
+ if (!m) {
+ return (-1);
}
+ pkt_dir_t dir = (pkt_dir_t)pcap_node->get_mbuf_dir();
+ fill_raw_packet(m, (CGenNode*)pcap_node, dir);
+ rte_pktmbuf_free(m);
+
+ int rc = write_pkt(m_raw);
+ BP_ASSERT(rc == 0);
+
+ return (rc);
+}
+
+int CErfIFStl::send_node(CGenNode * _no_to_use){
+
+ if ( m_preview_mode->getFileWrite() ) {
+
+ switch (_no_to_use->m_type) {
+ case CGenNode::STATELESS_PKT:
+ return send_sl_node((CGenNodeStateless *) _no_to_use);
+
+ case CGenNode::PCAP_PKT:
+ return send_pcap_node((CGenNodePCAP *) _no_to_use);
+
+ default:
+ assert(0);
+ }
+ }
return (0);
}
@@ -6523,10 +6400,18 @@ void CGenNodeBase::free_base(){
CGenNodeStateless* p=(CGenNodeStateless*)this;
p->free_stl_node();
return;
+ }
+
+ if (m_type == PCAP_PKT) {
+ CGenNodePCAP *p = (CGenNodePCAP *)this;
+ p->destroy();
+ return;
}
+
if ( m_type == COMMAND ) {
CGenNodeCommand* p=(CGenNodeCommand*)this;
p->free_command();
}
}
+
diff --git a/src/bp_sim.h b/src/bp_sim.h
index 0a7e8bda..bdca7703 100755
--- a/src/bp_sim.h
+++ b/src/bp_sim.h
@@ -61,6 +61,8 @@ limitations under the License.
#include <trex_stateless_dp_core.h>
+class CGenNodePCAP;
+
#undef NAT_TRACE_
#define FORCE_NO_INLINE __attribute__ ((noinline))
@@ -1423,7 +1425,9 @@ public:
EXIT_SCHED =6,
COMMAND =7,
- EXIT_PORT_SCHED =8
+ EXIT_PORT_SCHED =8,
+
+ PCAP_PKT =9,
};
@@ -1441,6 +1445,7 @@ public:
NODE_FLAGS_INIT_START_FROM_SERVER_SIDE = 0x40,
NODE_FLAGS_ALL_FLOW_SAME_PORT_SIDE = 0x80,
NODE_FLAGS_INIT_START_FROM_SERVER_SIDE_SERVER_ADDR = 0x100, /* init packet start from server side with server addr */
+ NODE_FLAGS_SLOW_PATH = 0x200 /* used by the nodes to differ between fast path nodes and slow path nodes */
};
@@ -1479,6 +1484,17 @@ public:
return ( m_socket_id );
}
+ inline void set_slow_path(bool enable) {
+ if (enable) {
+ m_flags |= NODE_FLAGS_SLOW_PATH;
+ } else {
+ m_flags &= ~NODE_FLAGS_SLOW_PATH;
+ }
+ }
+
+ inline bool get_is_slow_path() const {
+ return ( (m_flags & NODE_FLAGS_SLOW_PATH) ? true : false);
+ }
void free_base();
};
@@ -1581,6 +1597,7 @@ public:
return ( (m_flags &NODE_FLAGS_ALL_FLOW_SAME_PORT_SIDE)?true:false );
}
+
/* direction for ip addr */
inline pkt_dir_t cur_pkt_ip_addr_dir();
@@ -1873,6 +1890,9 @@ public:
virtual pkt_dir_t port_id_to_dir(uint8_t port_id);
+private:
+ int send_sl_node(CGenNodeStateless * node_sl);
+ int send_pcap_node(CGenNodePCAP * pcap_node);
};
@@ -2085,7 +2105,11 @@ private:
bool always,
CFlowGenListPerThread * thread,
double &old_offset);
-
+private:
+ void handle_command(CGenNode *node, CFlowGenListPerThread *thread, bool &exit_scheduler);
+ void handle_flow_pkt(CGenNode *node, CFlowGenListPerThread *thread);
+ void handle_flow_sync(CGenNode *node, CFlowGenListPerThread *thread, bool &exit_scheduler);
+ void handle_pcap_pkt(CGenNode *node, CFlowGenListPerThread *thread);
public:
pqueue_t m_p_queue;
@@ -3628,10 +3652,14 @@ public :
inline CGenNode * create_node(void);
+
inline CGenNodeStateless * create_node_sl(void){
return ((CGenNodeStateless*)create_node() );
}
+ inline CGenNodePCAP * allocate_pcap_node(void) {
+ return ((CGenNodePCAP*)create_node());
+ }
inline void free_node(CGenNode *p);
inline void free_last_flow_node(CGenNode *p);
@@ -3653,6 +3681,9 @@ public:
bool set_stateless_next_node( CGenNodeStateless * cur_node,
CGenNodeStateless * next_node);
+ void stop_stateless_traffic(uint8_t port_id) {
+ m_stateless_dp_info.stop_traffic(port_id, false, 0);
+ }
void Dump(FILE *fd);
void DumpCsv(FILE *fd);
diff --git a/src/common/captureFile.cpp b/src/common/captureFile.cpp
index e73c37ad..4c50bcb2 100755
--- a/src/common/captureFile.cpp
+++ b/src/common/captureFile.cpp
@@ -244,28 +244,23 @@ bool CErfCmp::compare(std::string f1, std::string f2 ){
return (res);
}
-
-
/**
* try to create type by type
* @param name
*
* @return CCapReaderBase*
*/
-CCapReaderBase * CCapReaderFactory::CreateReader(char * name, int loops)
+CCapReaderBase * CCapReaderFactory::CreateReader(char * name, int loops, std::ostream &err)
{
- if (name == NULL) {
- printf("Got null file name\n");
- return NULL;
- }
+ assert(name);
/* make sure we have a file */
FILE * f = CAP_FOPEN_64(name, "rb");
if (f == NULL) {
if (errno == ENOENT) {
- printf("\nERROR: Cap file not found %s\n\n",name);
+ err << "CAP file '" << name << "' not found";
} else {
- printf("\nERROR: Failed to open cap file '%s' with errno %d\n\n", name, errno);
+ err << "failed to open CAP file '" << name << "' with errno " << errno;
}
return NULL;
}
@@ -281,8 +276,7 @@ CCapReaderBase * CCapReaderFactory::CreateReader(char * name, int loops)
delete next;
}
- printf("\nERROR: file %s format not supported",name);
- printf("\nERROR: formats supported are LIBPCAP and ERF. other formats are deprecated\n\n");
+ err << "unsupported CAP format (not PCAP or ERF): " << name << "\n";
return NULL;
}
diff --git a/src/common/captureFile.h b/src/common/captureFile.h
index 3be83432..32b98272 100755
--- a/src/common/captureFile.h
+++ b/src/common/captureFile.h
@@ -24,6 +24,8 @@ limitations under the License.
#include <math.h>
#include <stdlib.h>
#include <string>
+#include <iostream>
+
#ifdef WIN32
#pragma warning(disable:4786)
#endif
@@ -201,11 +203,13 @@ public:
* @param name - cature file name
* @param loops - number of loops for the same capture. use 0
* for one time transmition
+ * @param err - IO stream to print error
+ *
* @return CCapReaderBase* - pointer to new instance (allocated
* by the function). the user should release the
* instance once it has no use any more.
*/
- static CCapReaderBase * CreateReader(char * name, int loops = 0);
+ static CCapReaderBase * CreateReader(char * name, int loops = 0, std::ostream &err = std::cout);
private:
diff --git a/src/common/pcap.cpp b/src/common/pcap.cpp
index 9b360a3e..a68fb620 100755
--- a/src/common/pcap.cpp
+++ b/src/common/pcap.cpp
@@ -156,8 +156,7 @@ bool LibPCapReader::ReadPacket(CCapPktRaw *lpPacket)
}
if (pkt_header.len > READER_MAX_PACKET_SIZE) {
/* cannot read this packet */
- printf("ERROR packet is too big, bigger than %d \n",READER_MAX_PACKET_SIZE);
- exit(-1);
+ //printf("ERROR packet is too big, bigger than %d \n",READER_MAX_PACKET_SIZE);
return false;
}
diff --git a/src/gtest/trex_stateless_gtest.cpp b/src/gtest/trex_stateless_gtest.cpp
index 11070f5c..50f8e5ec 100644
--- a/src/gtest/trex_stateless_gtest.cpp
+++ b/src/gtest/trex_stateless_gtest.cpp
@@ -3611,6 +3611,70 @@ TEST_F(basic_stl, vm_split_client_var) {
}
+TEST_F(basic_stl, pcap_remote_basic) {
+
+ CBasicStl t1;
+ CParserOption * po =&CGlobalInfo::m_options;
+ po->preview.setVMode(7);
+ po->preview.setFileWrite(true);
+ po->out_file ="exp/pcap_remote_basic";
+
+ TrexStatelessCpToDpMsgBase *push_msg = new TrexStatelessDpPushPCAP(0,
+ 0,
+ "exp/remote_test.cap",
+ 10,
+ 1,
+ 1,
+ -1);
+ t1.m_msg = push_msg;
+
+ bool res = t1.init();
+ EXPECT_EQ_UINT32(1, res?1:0)<< "pass";
+}
+
+TEST_F(basic_stl, pcap_remote_loop) {
+
+ CBasicStl t1;
+ CParserOption * po =&CGlobalInfo::m_options;
+ po->preview.setVMode(7);
+ po->preview.setFileWrite(true);
+ po->out_file ="exp/pcap_remote_loop";
+
+ TrexStatelessCpToDpMsgBase *push_msg = new TrexStatelessDpPushPCAP(0,
+ 0,
+ "exp/remote_test.cap",
+ 1,
+ 1,
+ 3,
+ -1);
+ t1.m_msg = push_msg;
+
+ bool res = t1.init();
+ EXPECT_EQ_UINT32(1, res?1:0)<< "pass";
+}
+
+TEST_F(basic_stl, pcap_remote_duration) {
+
+ CBasicStl t1;
+ CParserOption * po =&CGlobalInfo::m_options;
+ po->preview.setVMode(7);
+ po->preview.setFileWrite(true);
+ po->out_file ="exp/pcap_remote_duration";
+
+ TrexStatelessCpToDpMsgBase *push_msg = new TrexStatelessDpPushPCAP(0,
+ 0,
+ "exp/remote_test.cap",
+ 100000,
+ 1,
+ 0,
+ 0.5);
+ t1.m_msg = push_msg;
+
+ bool res = t1.init();
+ EXPECT_EQ_UINT32(1, res?1:0)<< "pass";
+}
+
+
/********************************************* Itay Tests End *************************************/
class rx_stat_pkt_parse : public testing::Test {
protected:
diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp
index 3fa3ca68..092bd133 100644
--- a/src/main_dpdk.cpp
+++ b/src/main_dpdk.cpp
@@ -1773,6 +1773,9 @@ protected:
class CCoreEthIFStateless : public CCoreEthIF {
public:
virtual int send_node(CGenNode * node);
+protected:
+ int handle_slow_path_node(CGenNode *node);
+ int send_pcap_node(CGenNodePCAP *pcap_node);
};
bool CCoreEthIF::Create(uint8_t core_id,
@@ -1991,7 +1994,14 @@ void CCoreEthIF::update_mac_addr(CGenNode * node,uint8_t *p){
-int CCoreEthIFStateless::send_node(CGenNode * no){
+int CCoreEthIFStateless::send_node(CGenNode * no) {
+
+ /* if a node is marked as slow path - single IF to redirect it to slow path */
+ if (no->get_is_slow_path()) {
+ return handle_slow_path_node(no);
+ }
+
+
CGenNodeStateless * node_sl=(CGenNodeStateless *) no;
/* check that we have mbuf */
rte_mbuf_t * m;
@@ -2027,6 +2037,33 @@ int CCoreEthIFStateless::send_node(CGenNode * no){
return (0);
};
+int CCoreEthIFStateless::send_pcap_node(CGenNodePCAP *pcap_node) {
+ rte_mbuf_t *m = pcap_node->get_pkt();
+ if (!m) {
+ return (-1);
+ }
+
+ pkt_dir_t dir = (pkt_dir_t)pcap_node->get_mbuf_dir();
+ CCorePerPort *lp_port=&m_ports[dir];
+ CVirtualIFPerSideStats *lp_stats = &m_stats[dir];
+
+ send_pkt(lp_port, m, lp_stats);
+
+ return (0);
+}
+
+/**
+ * slow path code goes here
+ *
+ */
+int CCoreEthIFStateless::handle_slow_path_node(CGenNode * no) {
+
+ if (no->m_type == CGenNode::PCAP_PKT) {
+ return send_pcap_node((CGenNodePCAP *)no);
+ }
+
+ return (-1);
+}
int CCoreEthIF::send_node(CGenNode * node){
@@ -2649,6 +2686,10 @@ public:
}
int run_in_rx_core();
int run_in_master();
+
+ bool handle_fast_path();
+ bool handle_slow_path(bool &was_stopped);
+
int stop_master();
/* return the minimum number of dp cores needed to support the active ports
this is for c==1 or m_cores_mul==1
@@ -3693,125 +3734,156 @@ CGlobalTRex::publish_async_barrier(uint32_t key) {
m_zmq_publisher.publish_barrier(key);
}
-int CGlobalTRex::run_in_master() {
- bool was_stopped=false;
-
- if ( get_is_stateless() ) {
- m_trex_stateless->launch_control_plane();
- }
-
- /* exception and scope safe */
- std::unique_lock<std::mutex> cp_lock(m_cp_lock);
-
- while ( true ) {
- m_stats_cnt+=1;
+bool
+CGlobalTRex::handle_slow_path(bool &was_stopped) {
+ m_stats_cnt+=1;
- if ( CGlobalInfo::m_options.preview.get_no_keyboard() ==false ){
- if ( m_io_modes.handle_io_modes() ){
- was_stopped=true;
- break;
- }
- }
- if ( sanity_check() ){
- printf(" Test was stopped \n");
+ if ( CGlobalInfo::m_options.preview.get_no_keyboard() ==false ) {
+ if ( m_io_modes.handle_io_modes() ) {
was_stopped=true;
- break;
+ return false;
}
- if (m_io_modes.m_g_mode != CTrexGlobalIoMode::gDISABLE ) {
+ }
+
+ if ( sanity_check() ) {
+ printf(" Test was stopped \n");
+ was_stopped=true;
+ return false;
+ }
+ if (m_io_modes.m_g_mode != CTrexGlobalIoMode::gDISABLE ) {
+ fprintf(stdout,"\033[2J");
+ fprintf(stdout,"\033[2H");
+
+ } else {
+ if ( m_io_modes.m_g_disable_first ) {
+ m_io_modes.m_g_disable_first=false;
fprintf(stdout,"\033[2J");
fprintf(stdout,"\033[2H");
-
- }else{
- if ( m_io_modes.m_g_disable_first ){
- m_io_modes.m_g_disable_first=false;
- fprintf(stdout,"\033[2J");
- fprintf(stdout,"\033[2H");
- printf("clean !!!\n");
- fflush(stdout);
- }
+ printf("clean !!!\n");
+ fflush(stdout);
}
+ }
- if (m_io_modes.m_g_mode == CTrexGlobalIoMode::gHELP ) {
- m_io_modes.DumpHelp(stdout);
- }
+ if (m_io_modes.m_g_mode == CTrexGlobalIoMode::gHELP ) {
+ m_io_modes.DumpHelp(stdout);
+ }
- dump_stats(stdout,CGlobalStats::dmpTABLE);
+ dump_stats(stdout,CGlobalStats::dmpTABLE);
- if (m_io_modes.m_g_mode == CTrexGlobalIoMode::gNORMAL ) {
- fprintf (stdout," current time : %.1f sec \n",now_sec());
- float d= CGlobalInfo::m_options.m_duration - now_sec();
- if (d<0) {
- d=0;
+ if (m_io_modes.m_g_mode == CTrexGlobalIoMode::gNORMAL ) {
+ fprintf (stdout," current time : %.1f sec \n",now_sec());
+ float d= CGlobalInfo::m_options.m_duration - now_sec();
+ if (d<0) {
+ d=0;
- }
- fprintf (stdout," test duration : %.1f sec \n",d);
}
+ fprintf (stdout," test duration : %.1f sec \n",d);
+ }
- if (m_io_modes.m_g_mode == CTrexGlobalIoMode::gMem) {
+ if (m_io_modes.m_g_mode == CTrexGlobalIoMode::gMem) {
- if ( m_stats_cnt%4==0){
- fprintf (stdout," %s \n",CGlobalInfo::dump_pool_as_json().c_str());
- }
+ if ( m_stats_cnt%4==0) {
+ fprintf (stdout," %s \n",CGlobalInfo::dump_pool_as_json().c_str());
}
+ }
- if ( CGlobalInfo::m_options.is_rx_enabled() ){
- m_mg.update();
+ if ( CGlobalInfo::m_options.is_rx_enabled() ) {
+ m_mg.update();
+
+ if ( m_io_modes.m_g_mode == CTrexGlobalIoMode::gNORMAL ) {
+ switch (m_io_modes.m_l_mode) {
+ case CTrexGlobalIoMode::lDISABLE:
+ fprintf(stdout,"\n+Latency stats disabled \n");
+ break;
+ case CTrexGlobalIoMode::lENABLE:
+ fprintf(stdout,"\n-Latency stats enabled \n");
+ m_mg.DumpShort(stdout);
+ break;
+ case CTrexGlobalIoMode::lENABLE_Extended:
+ fprintf(stdout,"\n-Latency stats extended \n");
+ m_mg.Dump(stdout);
+ break;
+ }
- if ( m_io_modes.m_g_mode == CTrexGlobalIoMode::gNORMAL ){
- switch (m_io_modes.m_l_mode) {
- case CTrexGlobalIoMode::lDISABLE:
- fprintf(stdout,"\n+Latency stats disabled \n");
+ if ( get_is_rx_check_mode() ) {
+
+ switch (m_io_modes.m_rc_mode) {
+ case CTrexGlobalIoMode::rcDISABLE:
+ fprintf(stdout,"\n+Rx Check stats disabled \n");
break;
- case CTrexGlobalIoMode::lENABLE:
- fprintf(stdout,"\n-Latency stats enabled \n");
- m_mg.DumpShort(stdout);
+ case CTrexGlobalIoMode::rcENABLE:
+ fprintf(stdout,"\n-Rx Check stats enabled \n");
+ m_mg.DumpShortRxCheck(stdout);
break;
- case CTrexGlobalIoMode::lENABLE_Extended:
- fprintf(stdout,"\n-Latency stats extended \n");
- m_mg.Dump(stdout);
+ case CTrexGlobalIoMode::rcENABLE_Extended:
+ fprintf(stdout,"\n-Rx Check stats enhanced \n");
+ m_mg.DumpRxCheck(stdout);
break;
}
- if ( get_is_rx_check_mode() ) {
-
- switch (m_io_modes.m_rc_mode) {
- case CTrexGlobalIoMode::rcDISABLE:
- fprintf(stdout,"\n+Rx Check stats disabled \n");
- break;
- case CTrexGlobalIoMode::rcENABLE:
- fprintf(stdout,"\n-Rx Check stats enabled \n");
- m_mg.DumpShortRxCheck(stdout);
- break;
- case CTrexGlobalIoMode::rcENABLE_Extended:
- fprintf(stdout,"\n-Rx Check stats enhanced \n");
- m_mg.DumpRxCheck(stdout);
- break;
- }
-
- }
-
}
+
}
+ }
+ /* publish data */
+ publish_async_data(false);
+ return true;
+}
- /* publish data */
- publish_async_data(false);
- /* check from messages from DP */
- check_for_dp_messages();
+bool
+CGlobalTRex::handle_fast_path() {
+ /* check from messages from DP */
+ check_for_dp_messages();
- cp_lock.unlock();
- delay(500);
- cp_lock.lock();
+ if ( is_all_cores_finished() ) {
+ return false;
+ }
+
+ return true;
+}
+
+int CGlobalTRex::run_in_master() {
+ bool was_stopped=false;
- if ( is_all_cores_finished() ) {
+ if ( get_is_stateless() ) {
+ m_trex_stateless->launch_control_plane();
+ }
+
+ /* exception and scope safe */
+ std::unique_lock<std::mutex> cp_lock(m_cp_lock);
+
+ uint32_t slow_path_counter = 0;
+
+ const int FASTPATH_DELAY_MS = 20;
+ const int SLOWPATH_DELAY_MS = 500;
+
+ while ( true ) {
+
+ /* fast path */
+ if (!handle_fast_path()) {
break;
}
+
+ /* slow path */
+ if (slow_path_counter >= SLOWPATH_DELAY_MS) {
+ if (!handle_slow_path(was_stopped)) {
+ break;
+ }
+ slow_path_counter = 0;
+ }
+
+
+ cp_lock.unlock();
+ delay(FASTPATH_DELAY_MS);
+ slow_path_counter += FASTPATH_DELAY_MS;
+ cp_lock.lock();
}
/* on exit release the lock */
diff --git a/src/publisher/trex_publisher.h b/src/publisher/trex_publisher.h
index f8843758..1d283478 100644
--- a/src/publisher/trex_publisher.h
+++ b/src/publisher/trex_publisher.h
@@ -48,6 +48,7 @@ public:
EVENT_PORT_FINISHED_TX = 4,
EVENT_PORT_ACQUIRED = 5,
EVENT_PORT_RELEASED = 6,
+ EVENT_PORT_ERROR = 7,
EVENT_SERVER_STOPPED = 100,
diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp
index f7a23188..27376fe4 100644
--- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp
+++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp
@@ -451,3 +451,32 @@ TrexRpcPublishNow::_run(const Json::Value &params, Json::Value &result) {
return (TREX_RPC_CMD_OK);
}
+
+
+/**
+ * push a remote PCAP on a port
+ *
+ */
+trex_rpc_cmd_rc_e
+TrexRpcCmdPushRemote::_run(const Json::Value &params, Json::Value &result) {
+
+ uint8_t port_id = parse_port(params, result);
+ std::string pcap_filename = parse_string(params, "pcap_filename", result);
+ double ipg_usec = parse_double(params, "ipg_usec", result);
+ double speedup = parse_double(params, "speedup", result);
+ uint32_t count = parse_uint32(params, "count", result);
+ double duration = parse_double(params, "duration", result);
+
+ TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
+
+ try {
+ port->push_remote(pcap_filename, ipg_usec, speedup, count, duration);
+ } catch (const TrexException &ex) {
+ generate_execute_err(result, ex.what());
+ }
+
+ result["result"] = Json::objectValue;
+ return (TREX_RPC_CMD_OK);
+
+}
+
diff --git a/src/rpc-server/commands/trex_rpc_cmds.h b/src/rpc-server/commands/trex_rpc_cmds.h
index 428bdd7b..affa65c1 100644
--- a/src/rpc-server/commands/trex_rpc_cmds.h
+++ b/src/rpc-server/commands/trex_rpc_cmds.h
@@ -130,5 +130,7 @@ TREX_RPC_CMD_DEFINE(TrexRpcCmdUpdateTraffic, "update_traffic", 3, true, APIClass
TREX_RPC_CMD_DEFINE(TrexRpcCmdValidate, "validate", 2, false, APIClass::API_CLASS_TYPE_CORE);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdPushRemote, "push_remote", 6, true, APIClass::API_CLASS_TYPE_CORE);
+
#endif /* __TREX_RPC_CMD_H__ */
diff --git a/src/rpc-server/trex_rpc_cmds_table.cpp b/src/rpc-server/trex_rpc_cmds_table.cpp
index 924503f2..7104792e 100644
--- a/src/rpc-server/trex_rpc_cmds_table.cpp
+++ b/src/rpc-server/trex_rpc_cmds_table.cpp
@@ -65,6 +65,7 @@ TrexRpcCommandsTable::TrexRpcCommandsTable() {
register_command(new TrexRpcCmdRemoveRXFilters());
register_command(new TrexRpcCmdValidate());
+ register_command(new TrexRpcCmdPushRemote());
}
diff --git a/src/stateless/cp/trex_dp_port_events.cpp b/src/stateless/cp/trex_dp_port_events.cpp
index 1321a362..fc96e00a 100644
--- a/src/stateless/cp/trex_dp_port_events.cpp
+++ b/src/stateless/cp/trex_dp_port_events.cpp
@@ -78,6 +78,9 @@ protected:
virtual void on_event() {
/* do nothing */
}
+ virtual void on_error(int thread_id) {
+ /* do nothing */
+ }
};
void
@@ -105,14 +108,14 @@ TrexDpPortEvents::barrier() {
*
*/
void
-TrexDpPortEvents::on_core_reporting_in(int event_id, int thread_id) {
+TrexDpPortEvents::on_core_reporting_in(int event_id, int thread_id, bool status) {
TrexDpPortEvent *event = lookup(event_id);
/* event might have been deleted */
if (!event) {
return;
}
- bool done = event->on_core_reporting_in(thread_id);
+ bool done = event->on_core_reporting_in(thread_id, status);
if (done) {
destroy_event(event_id);
@@ -150,7 +153,7 @@ TrexDpPortEvent::init(TrexStatelessPort *port, int event_id, int timeout_ms) {
}
bool
-TrexDpPortEvent::on_core_reporting_in(int thread_id) {
+TrexDpPortEvent::on_core_reporting_in(int thread_id, bool status) {
/* mark sure no double signal */
if (m_signal.at(thread_id)) {
std::stringstream err;
@@ -163,6 +166,11 @@ TrexDpPortEvent::on_core_reporting_in(int thread_id) {
m_signal.at(thread_id) = true;
m_pending_cnt--;
+ /* if any core reported an error - mark as a failure */
+ if (!status) {
+ on_error(thread_id);
+ }
+
/* event occured */
if (m_pending_cnt == 0) {
on_event();
diff --git a/src/stateless/cp/trex_dp_port_events.h b/src/stateless/cp/trex_dp_port_events.h
index 3b8c8633..681e47ab 100644
--- a/src/stateless/cp/trex_dp_port_events.h
+++ b/src/stateless/cp/trex_dp_port_events.h
@@ -48,13 +48,22 @@ protected:
*/
virtual void on_event() = 0;
+ /**
+ * when a thread ID encounter an error
+ *
+ * @author imarom (20-Apr-16)
+ *
+ * @param thread_id
+ */
+ virtual void on_error(int thread_id) = 0;
+
TrexStatelessPort *get_port() {
return m_port;
}
private:
void init(TrexStatelessPort *port, int event_id, int timeout_ms);
- bool on_core_reporting_in(int thread_id);
+ bool on_core_reporting_in(int thread_id, bool status = true);
std::unordered_map<int, bool> m_signal;
int m_pending_cnt;
@@ -98,7 +107,7 @@ public:
/**
* a core has reached the event
*/
- void on_core_reporting_in(int event_id, int thread_id);
+ void on_core_reporting_in(int event_id, int thread_id, bool status = true);
private:
TrexDpPortEvent *lookup(int event_id);
diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp
index 90142d9b..360cc7d6 100644
--- a/src/stateless/cp/trex_stateless_port.cpp
+++ b/src/stateless/cp/trex_stateless_port.cpp
@@ -24,6 +24,7 @@ limitations under the License.
#include <trex_stateless_messaging.h>
#include <trex_streams_compiler.h>
#include <common/basic_utils.h>
+#include <common/captureFile.h>
#include <string>
@@ -70,6 +71,20 @@ protected:
assert(get_port()->m_pending_async_stop_event != TrexDpPortEvents::INVALID_ID);
get_port()->m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID;
}
+
+ /**
+ * when a DP core encountered an error
+ *
+ * @author imarom (20-Apr-16)
+ */
+ virtual void on_error(int thread_id) {
+ Json::Value data;
+
+ data["port_id"] = get_port()->get_port_id();
+ data["thread_id"] = thread_id;
+
+ get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_ERROR, data);
+ }
};
/***************************
@@ -240,6 +255,13 @@ TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration,
}
+bool TrexStatelessPort::is_active() const {
+ return ( (m_port_state == PORT_STATE_TX)
+ || (m_port_state == PORT_STATE_PAUSE)
+ || (m_port_state == PORT_STATE_PCAP_TX)
+ );
+}
+
/**
* stop traffic on port
*
@@ -249,9 +271,7 @@ TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration,
*/
void
TrexStatelessPort::stop_traffic(void) {
-
- if (!( (m_port_state == PORT_STATE_TX)
- || (m_port_state == PORT_STATE_PAUSE) )) {
+ if (!is_active()) {
return;
}
@@ -395,6 +415,55 @@ TrexStatelessPort::update_traffic(const TrexPortMultiplier &mul, bool force) {
}
+void
+TrexStatelessPort::push_remote(const std::string &pcap_filename,
+ double ipg_usec,
+ double speedup,
+ uint32_t count,
+ double duration) {
+
+ /* command allowed only on state stream */
+ verify_state(PORT_STATE_IDLE | PORT_STATE_STREAMS);
+
+ /* check that file exists */
+ CCapReaderBase *reader;
+ std::stringstream ss;
+ reader = CCapReaderFactory::CreateReader((char *)pcap_filename.c_str(), 0, ss);
+ if (!reader) {
+ throw TrexException(ss.str());
+ }
+ delete reader;
+
+ /* only one core gets to play */
+ int tx_core = m_cores_id_list[0];
+
+ /* create async event */
+ assert(m_pending_async_stop_event == TrexDpPortEvents::INVALID_ID);
+ m_pending_async_stop_event = m_dp_events.create_event(new AsyncStopEvent());
+
+ /* mark all other cores as done */
+ for (int index = 1; index < m_cores_id_list.size(); index++) {
+ /* mimic an end event */
+ m_dp_events.on_core_reporting_in(m_pending_async_stop_event, m_cores_id_list[index]);
+ }
+
+ /* send a message to core */
+ change_state(PORT_STATE_PCAP_TX);
+ TrexStatelessCpToDpMsgBase *push_msg = new TrexStatelessDpPushPCAP(m_port_id,
+ m_pending_async_stop_event,
+ pcap_filename,
+ ipg_usec,
+ speedup,
+ count,
+ duration);
+ send_message_to_dp(tx_core, push_msg);
+
+ /* update subscribers */
+ Json::Value data;
+ data["port_id"] = m_port_id;
+ get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_STARTED, data);
+}
+
std::string
TrexStatelessPort::get_state_as_string() const {
@@ -413,6 +482,9 @@ TrexStatelessPort::get_state_as_string() const {
case PORT_STATE_PAUSE:
return "PAUSE";
+
+ case PORT_STATE_PCAP_TX:
+ return "PCAP_TX";
}
return "UNKNOWN";
diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h
index 520940d8..8856b429 100644
--- a/src/stateless/cp/trex_stateless_port.h
+++ b/src/stateless/cp/trex_stateless_port.h
@@ -129,11 +129,12 @@ public:
* port state
*/
enum port_state_e {
- PORT_STATE_DOWN = 0x1,
- PORT_STATE_IDLE = 0x2,
- PORT_STATE_STREAMS = 0x4,
- PORT_STATE_TX = 0x8,
- PORT_STATE_PAUSE = 0x10,
+ PORT_STATE_DOWN = 0x1,
+ PORT_STATE_IDLE = 0x2,
+ PORT_STATE_STREAMS = 0x4,
+ PORT_STATE_TX = 0x8,
+ PORT_STATE_PAUSE = 0x10,
+ PORT_STATE_PCAP_TX = 0x20,
};
/**
@@ -212,6 +213,16 @@ public:
void update_traffic(const TrexPortMultiplier &mul, bool force);
/**
+ * push a PCAP file onto the port
+ *
+ */
+ void push_remote(const std::string &pcap_filename,
+ double ipg_usec,
+ double speedup,
+ uint32_t count,
+ double duration);
+
+ /**
* get the port state
*
*/
@@ -220,6 +231,12 @@ public:
}
/**
+ * return true if the port is active
+ * (paused is considered active)
+ */
+ bool is_active() const;
+
+ /**
* port state as string
*
*/
diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp
index 13bf5a5d..c5963625 100644
--- a/src/stateless/dp/trex_stateless_dp_core.cpp
+++ b/src/stateless/dp/trex_stateless_dp_core.cpp
@@ -358,6 +358,51 @@ bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){
return (true);
}
+bool TrexStatelessDpPerPort::push_pcap(uint8_t port_id,
+ const std::string &pcap_filename,
+ double ipg_usec,
+ double speedup,
+ uint32_t count) {
+
+ /* push pcap can only happen on an idle port from the core prespective */
+ assert(m_state == TrexStatelessDpPerPort::ppSTATE_IDLE);
+
+ CGenNodePCAP *pcap_node = m_core->allocate_pcap_node();
+ if (!pcap_node) {
+ return (false);
+ }
+
+ pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(port_id);
+ socket_id_t socket_id = m_core->m_node_gen.m_socket_id;
+
+ uint8_t mac_addr[12];
+ m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, mac_addr);
+
+ bool rc = pcap_node->create(port_id,
+ dir,
+ socket_id,
+ mac_addr,
+ pcap_filename,
+ ipg_usec,
+ speedup,
+ count);
+ if (!rc) {
+ m_core->free_node((CGenNode *)pcap_node);
+ return (false);
+ }
+
+ /* schedule the node for now */
+ pcap_node->m_time = m_core->m_cur_time_sec;
+ m_core->m_node_gen.add_node((CGenNode *)pcap_node);
+
+ /* hold a pointer to the node */
+ assert(m_active_pcap_node == NULL);
+ m_active_pcap_node = pcap_node;
+
+ m_state = TrexStatelessDpPerPort::ppSTATE_PCAP_TX;
+ return (true);
+}
+
bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id,
bool stop_on_id,
@@ -390,6 +435,19 @@ bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id,
}
}
+ /* check for active PCAP node */
+ if (m_active_pcap_node) {
+ /* when got async stop from outside or duration */
+ if (m_active_pcap_node->is_active()) {
+ m_active_pcap_node->mark_for_free();
+ } else {
+ /* graceful stop - node was put out by the scheduler */
+ m_core->free_node( (CGenNode *)m_active_pcap_node);
+ }
+
+ m_active_pcap_node = NULL;
+ }
+
/* active stream should be zero */
assert(m_active_streams==0);
m_active_nodes.clear();
@@ -401,9 +459,9 @@ bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id,
void TrexStatelessDpPerPort::create(CFlowGenListPerThread * core){
m_core=core;
m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
- m_port_id=0;
m_active_streams=0;
m_active_nodes.clear();
+ m_active_pcap_node = NULL;
}
@@ -709,6 +767,7 @@ void
TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port,
TrexStream * stream,
TrexStreamsCompiledObj *comp) {
+
CGenNodeStateless *node = m_core->create_node_sl();
node->cache_mbuf_array_init();
@@ -977,6 +1036,42 @@ TrexStatelessDpCore::pause_traffic(uint8_t port_id){
lp_port->pause_traffic(port_id);
}
+
+void
+TrexStatelessDpCore::push_pcap(uint8_t port_id,
+ int event_id,
+ const std::string &pcap_filename,
+ double ipg_usec,
+ double speedup,
+ uint32_t count,
+ double duration) {
+
+ TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
+
+ lp_port->set_event_id(event_id);
+
+ /* delegate the command to the port */
+ bool rc = lp_port->push_pcap(port_id, pcap_filename, ipg_usec, speedup, count);
+ if (!rc) {
+ /* report back that we stopped */
+ CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
+ TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
+ port_id,
+ event_id,
+ false);
+ ring->Enqueue((CGenNode *)event_msg);
+ return;
+ }
+
+
+ if (duration > 0.0) {
+ add_port_duration(duration, port_id, event_id);
+ }
+
+ m_state = TrexStatelessDpCore::STATE_PCAP_TX;
+}
+
+
void
TrexStatelessDpCore::update_traffic(uint8_t port_id, double factor) {
@@ -995,21 +1090,10 @@ TrexStatelessDpCore::stop_traffic(uint8_t port_id,
the scheduler invokes it, it will be free */
TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
-
if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){
- /* nothing to do ! already stopped */
- //printf(" skip .. %f\n",m_core->m_cur_time_sec);
return;
}
- /* inform the control plane we stopped - this might be a async stop
- (streams ended)
- */
- #if 0
- if ( are_all_ports_idle() ) {
- /* just a place holder if we will need to do somthing in that case */
- }
- #endif
CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
@@ -1038,3 +1122,87 @@ TrexStatelessDpCore::barrier(uint8_t port_id, int event_id) {
event_id);
ring->Enqueue((CGenNode *)event_msg);
}
+
+
+/**
+ * PCAP node
+ */
+bool CGenNodePCAP::create(uint8_t port_id,
+ pkt_dir_t dir,
+ socket_id_t socket_id,
+ const uint8_t *mac_addr,
+ const std::string &pcap_filename,
+ double ipg_usec,
+ double speedup,
+ uint32_t count) {
+ std::stringstream ss;
+
+ m_type = CGenNode::PCAP_PKT;
+ m_flags = 0;
+ m_src_port = 0;
+ m_port_id = port_id;
+ m_count = count;
+
+ /* mark this node as slow path */
+ set_slow_path(true);
+
+ if (ipg_usec != -1) {
+ /* fixed IPG */
+ m_ipg_sec = usec_to_sec(ipg_usec / speedup);
+ m_speedup = 0;
+ } else {
+ /* packet IPG */
+ m_ipg_sec = -1;
+ m_speedup = speedup;
+ }
+
+ /* copy MAC addr info */
+ memcpy(m_mac_addr, mac_addr, 12);
+
+ /* set the dir */
+ set_mbuf_dir(dir);
+ set_socket_id(socket_id);
+
+ /* create the PCAP reader */
+ m_reader = CCapReaderFactory::CreateReader((char *)pcap_filename.c_str(), 0, ss);
+ if (!m_reader) {
+ return false;
+ }
+
+ m_raw_packet = new CCapPktRaw();
+ if ( m_reader->ReadPacket(m_raw_packet) == false ){
+ /* handle error */
+ delete m_reader;
+ return (false);
+ }
+
+ /* this is the reference time */
+ //m_base_time = m_raw_packet->get_time();
+ m_last_pkt_time = m_raw_packet->get_time();
+
+ /* ready */
+ m_state = PCAP_ACTIVE;
+
+ return true;
+}
+
+/**
+ * cleanup for PCAP node
+ *
+ * @author imarom (08-May-16)
+ */
+void CGenNodePCAP::destroy() {
+
+ if (m_raw_packet) {
+ delete m_raw_packet;
+ m_raw_packet = NULL;
+ }
+
+ if (m_reader) {
+ delete m_reader;
+ m_reader = NULL;
+ }
+
+ m_state = PCAP_INVALID;
+}
+
diff --git a/src/stateless/dp/trex_stateless_dp_core.h b/src/stateless/dp/trex_stateless_dp_core.h
index bdf84cfd..af2187ae 100644
--- a/src/stateless/dp/trex_stateless_dp_core.h
+++ b/src/stateless/dp/trex_stateless_dp_core.h
@@ -33,7 +33,7 @@ class CFlowGenListPerThread;
class CGenNodeStateless;
class TrexStreamsCompiledObj;
class TrexStream;
-
+class CGenNodePCAP;
class CDpOneStream {
public:
@@ -54,7 +54,8 @@ public:
enum state_e {
ppSTATE_IDLE,
ppSTATE_TRANSMITTING,
- ppSTATE_PAUSE
+ ppSTATE_PAUSE,
+ ppSTATE_PCAP_TX,
};
@@ -70,6 +71,12 @@ public:
bool update_traffic(uint8_t port_id, double factor);
+ bool push_pcap(uint8_t port_id,
+ const std::string &pcap_filename,
+ double ipg_usec,
+ double speedup,
+ uint32_t count);
+
bool stop_traffic(uint8_t port_id,
bool stop_on_id,
int event_id);
@@ -91,11 +98,11 @@ public:
public:
state_e m_state;
- uint8_t m_port_id;
uint32_t m_active_streams; /* how many active streams on this port */
std::vector<CDpOneStream> m_active_nodes; /* holds the current active nodes */
+ CGenNodePCAP *m_active_pcap_node;
CFlowGenListPerThread * m_core ;
int m_event_id;
};
@@ -113,6 +120,7 @@ public:
enum state_e {
STATE_IDLE,
STATE_TRANSMITTING,
+ STATE_PCAP_TX,
STATE_TERMINATE
};
@@ -151,7 +159,7 @@ public:
*/
void start_traffic(TrexStreamsCompiledObj *obj,
double duration,
- int m_event_id);
+ int event_id);
/* pause the streams, work only if all are continues */
@@ -163,6 +171,19 @@ public:
/**
+ * push a PCAP file on port
+ *
+ */
+ void push_pcap(uint8_t port_id,
+ int event_id,
+ const std::string &pcap_filename,
+ double ipg_usec,
+ double speedup,
+ uint32_t count,
+ double duration);
+
+
+ /**
* update current traffic rate
*
* @author imarom (25-Nov-15)
diff --git a/src/stateless/dp/trex_stream_node.h b/src/stateless/dp/trex_stream_node.h
index e9e5cf5b..b5395e78 100644
--- a/src/stateless/dp/trex_stream_node.h
+++ b/src/stateless/dp/trex_stream_node.h
@@ -26,6 +26,8 @@ limitations under the License.
#include <stdio.h>
class TrexStatelessDpCore;
+class TrexStatelessDpPerPort;
+
#include <trex_stream.h>
class TrexStatelessCpToDpMsgBase;
@@ -448,6 +450,176 @@ private:
static_assert(sizeof(CGenNodeStateless) == sizeof(CGenNode), "sizeof(CGenNodeStateless) != sizeof(CGenNode)" );
+/* this is a event for PCAP transmitting */
+struct CGenNodePCAP : public CGenNodeBase {
+friend class TrexStatelessDpPerPort;
+
+public:
+
+ /**
+ * creates a node from a PCAP file
+ */
+ bool create(uint8_t port_id,
+ pkt_dir_t dir,
+ socket_id_t socket_id,
+ const uint8_t *mac_addr,
+ const std::string &pcap_filename,
+ double ipg_usec,
+ double speedup,
+ uint32_t count);
+ /**
+ * destroy the node cleaning up any data
+ *
+ */
+ void destroy();
+
+ /**
+ * advance - will read the next packet
+ *
+ * @author imarom (03-May-16)
+ */
+ void next() {
+ assert(is_active());
+
+ /* save the previous packet time */
+ m_last_pkt_time = m_raw_packet->get_time();
+
+ /* advance */
+ if ( m_reader->ReadPacket(m_raw_packet) == false ){
+ m_count--;
+
+ /* if its the end - go home... */
+ if (m_count == 0) {
+ m_state = PCAP_INACTIVE;
+ return;
+ }
+
+ /* rewind and load the first packet */
+ m_reader->Rewind();
+ if (!m_reader->ReadPacket(m_raw_packet)) {
+ m_state = PCAP_INACTIVE;
+ return;
+ }
+ }
+
+ }
+
+ /**
+ * return the time for the next scheduling for a packet
+ *
+ */
+ inline double get_ipg() {
+ assert(m_state != PCAP_INVALID);
+
+ /* fixed IPG */
+ if (m_ipg_sec != -1) {
+ return m_ipg_sec;
+ } else {
+ return ((m_raw_packet->get_time() - m_last_pkt_time) / m_speedup);
+ }
+ }
+
+ /**
+ * get the current packet as MBUF
+ *
+ */
+ inline rte_mbuf_t *get_pkt() {
+ assert(m_state != PCAP_INVALID);
+
+ rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc( get_socket_id(), m_raw_packet->getTotalLen());
+ assert(m);
+
+ char *p = rte_pktmbuf_append(m, m_raw_packet->getTotalLen());
+ assert(p);
+
+ /* copy the packet */
+ memcpy(p, m_raw_packet->raw, m_raw_packet->getTotalLen());
+
+ /* fix the MAC */
+ memcpy(p, m_mac_addr, 12);
+
+ return (m);
+ }
+
+
+ inline void handle(CFlowGenListPerThread *thread) {
+ assert(m_state != PCAP_INVALID);
+ thread->m_node_gen.m_v_if->send_node( (CGenNode *)this);
+
+ // read the next packet
+ next();
+
+ if (is_active()) {
+ m_time += get_ipg();
+ thread->m_node_gen.m_p_queue.push((CGenNode *)this);
+
+ } else {
+ thread->stop_stateless_traffic(get_port_id());
+ }
+ }
+
+ void set_mbuf_dir(pkt_dir_t dir) {
+ if (dir) {
+ m_flags |=NODE_FLAGS_DIR;
+ }else{
+ m_flags &=~NODE_FLAGS_DIR;
+ }
+ }
+
+ inline pkt_dir_t get_mbuf_dir(){
+ return ((pkt_dir_t)( m_flags &1));
+ }
+
+ uint8_t get_port_id() {
+ return m_port_id;
+ }
+
+ void mark_for_free() {
+ m_state = PCAP_MARKED_FOR_FREE;
+ }
+
+ bool is_active() {
+ return (m_state == PCAP_ACTIVE);
+ }
+
+ bool is_marked_for_free() {
+ return (m_state == PCAP_MARKED_FOR_FREE);
+ }
+
+private:
+
+ enum {
+ PCAP_INVALID = 0,
+ PCAP_ACTIVE,
+ PCAP_INACTIVE,
+ PCAP_MARKED_FOR_FREE
+ };
+
+ /* cache line 0 */
+ /* important stuff here */
+ uint8_t m_mac_addr[12];
+ uint8_t m_state;
+
+ double m_last_pkt_time;
+ double m_speedup;
+ double m_ipg_sec;
+ uint32_t m_count;
+
+ double m_next_time_offset; /* in sec */
+
+ CCapReaderBase *m_reader;
+ CCapPktRaw *m_raw_packet;
+
+ uint8_t m_port_id;
+
+ /* pad to match the size of CGenNode */
+ uint8_t m_pad_end[33];
+
+} __rte_cache_aligned;
+
+
+static_assert(sizeof(CGenNodePCAP) == sizeof(CGenNode), "sizeof(CGenNodePCAP) != sizeof(CGenNode)" );
#endif /* __TREX_STREAM_NODE_H__ */
+
diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp
index 7edf0f13..1cbacb6f 100644
--- a/src/stateless/messaging/trex_stateless_messaging.cpp
+++ b/src/stateless/messaging/trex_stateless_messaging.cpp
@@ -181,6 +181,36 @@ TrexStatelessDpUpdate::clone() {
return new_msg;
}
+
+/*************************
+ push PCAP message
+ ************************/
+bool
+TrexStatelessDpPushPCAP::handle(TrexStatelessDpCore *dp_core) {
+ dp_core->push_pcap(m_port_id,
+ m_event_id,
+ m_pcap_filename,
+ m_ipg_usec,
+ m_speedup,
+ m_count,
+ m_duration);
+ return true;
+}
+
+TrexStatelessCpToDpMsgBase *
+TrexStatelessDpPushPCAP::clone() {
+ TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpPushPCAP(m_port_id,
+ m_event_id,
+ m_pcap_filename,
+ m_ipg_usec,
+ m_speedup,
+ m_count,
+ m_duration);
+
+ return new_msg;
+}
+
+
/*************************
barrier message
************************/
@@ -203,7 +233,7 @@ TrexStatelessDpBarrier::clone() {
bool
TrexDpPortEventMsg::handle() {
TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(m_port_id);
- port->get_dp_events().on_core_reporting_in(m_event_id, m_thread_id);
+ port->get_dp_events().on_core_reporting_in(m_event_id, m_thread_id, get_status());
return (true);
}
diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h
index 0eed01bd..9b1f2e31 100644
--- a/src/stateless/messaging/trex_stateless_messaging.h
+++ b/src/stateless/messaging/trex_stateless_messaging.h
@@ -246,6 +246,43 @@ private:
double m_factor;
};
+
+/**
+ * psuh a PCAP message
+ */
+class TrexStatelessDpPushPCAP : public TrexStatelessCpToDpMsgBase {
+public:
+
+ TrexStatelessDpPushPCAP(uint8_t port_id,
+ int event_id,
+ const std::string &pcap_filename,
+ double ipg_usec,
+ double speedup,
+ uint32_t count,
+ double duration) : m_pcap_filename(pcap_filename) {
+ m_port_id = port_id;
+ m_event_id = event_id;
+ m_ipg_usec = ipg_usec;
+ m_speedup = speedup;
+ m_count = count;
+ m_duration = duration;
+ }
+
+ virtual bool handle(TrexStatelessDpCore *dp_core);
+
+ virtual TrexStatelessCpToDpMsgBase * clone();
+
+private:
+ std::string m_pcap_filename;
+ int m_event_id;
+ double m_ipg_usec;
+ double m_speedup;
+ double m_duration;
+ uint32_t m_count;
+ uint8_t m_port_id;
+};
+
+
/**
* barrier message for DP core
*
@@ -267,6 +304,7 @@ private:
int m_event_id;
};
+
/************************* messages from DP to CP **********************/
/**
@@ -303,10 +341,11 @@ public:
class TrexDpPortEventMsg : public TrexStatelessDpToCpMsgBase {
public:
- TrexDpPortEventMsg(int thread_id, uint8_t port_id, int event_id) {
- m_thread_id = thread_id;
- m_port_id = port_id;
- m_event_id = event_id;
+ TrexDpPortEventMsg(int thread_id, uint8_t port_id, int event_id, bool status = true) {
+ m_thread_id = thread_id;
+ m_port_id = port_id;
+ m_event_id = event_id;
+ m_status = status;
}
virtual bool handle();
@@ -323,10 +362,15 @@ public:
return m_event_id;
}
+ bool get_status() {
+ return m_status;
+ }
+
private:
int m_thread_id;
uint8_t m_port_id;
int m_event_id;
+ bool m_status;
};