summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xlinux_dpdk/ws_main.py5
-rw-r--r--scripts/automation/trex_control_plane/stl/console/trex_capture.py574
-rwxr-xr-xscripts/automation/trex_control_plane/stl/console/trex_console.py57
-rwxr-xr-xscripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py211
-rw-r--r--scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py69
-rw-r--r--scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_stats.py1
-rw-r--r--scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/common.py16
-rwxr-xr-xscripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py98
-rw-r--r--scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/text_opts.py16
-rw-r--r--src/main_dpdk.cpp3
-rw-r--r--src/rpc-server/commands/trex_rpc_cmd_general.cpp280
-rw-r--r--src/rpc-server/commands/trex_rpc_cmds.h22
-rw-r--r--src/rpc-server/trex_rpc_cmds_table.cpp3
-rw-r--r--src/stateless/common/trex_stateless_pkt.cpp182
-rw-r--r--src/stateless/common/trex_stateless_pkt.h201
-rw-r--r--src/stateless/cp/trex_stateless.cpp56
-rw-r--r--src/stateless/cp/trex_stateless.h28
-rw-r--r--src/stateless/cp/trex_stateless_port.cpp28
-rw-r--r--src/stateless/cp/trex_stateless_port.h100
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.cpp61
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.h83
-rw-r--r--src/stateless/rx/trex_stateless_capture.cpp258
-rw-r--r--src/stateless/rx/trex_stateless_capture.h397
-rw-r--r--src/stateless/rx/trex_stateless_rx_core.cpp67
-rw-r--r--src/stateless/rx/trex_stateless_rx_core.h21
-rw-r--r--src/stateless/rx/trex_stateless_rx_defs.h2
-rw-r--r--src/stateless/rx/trex_stateless_rx_port_mngr.cpp227
-rw-r--r--src/stateless/rx/trex_stateless_rx_port_mngr.h149
28 files changed, 2454 insertions, 761 deletions
diff --git a/linux_dpdk/ws_main.py b/linux_dpdk/ws_main.py
index 29588622..4273dfe7 100755
--- a/linux_dpdk/ws_main.py
+++ b/linux_dpdk/ws_main.py
@@ -297,7 +297,9 @@ stateless_src = SrcGroup(dir='src/stateless/',
'dp/trex_stateless_dp_core.cpp',
'messaging/trex_stateless_messaging.cpp',
'rx/trex_stateless_rx_core.cpp',
- 'rx/trex_stateless_rx_port_mngr.cpp'
+ 'rx/trex_stateless_rx_port_mngr.cpp',
+ 'rx/trex_stateless_capture.cpp',
+ 'common/trex_stateless_pkt.cpp',
])
# JSON package
json_src = SrcGroup(dir='external_libs/json',
@@ -571,6 +573,7 @@ includes_path =''' ../src/pal/linux_dpdk/
../src/stateless/cp/
../src/stateless/dp/
../src/stateless/rx/
+ ../src/stateless/common/
../src/stateless/messaging/
../external_libs/yaml-cpp/include/
diff --git a/scripts/automation/trex_control_plane/stl/console/trex_capture.py b/scripts/automation/trex_control_plane/stl/console/trex_capture.py
new file mode 100644
index 00000000..dfd7f0a4
--- /dev/null
+++ b/scripts/automation/trex_control_plane/stl/console/trex_capture.py
@@ -0,0 +1,574 @@
+from trex_stl_lib.api import *
+from trex_stl_lib.utils import parsing_opts, text_tables
+import threading
+import tempfile
+import select
+
+class CaptureMonitorWriter(object):
+ def init (self, start_ts):
+ raise NotImplementedError
+
+ def deinit(self):
+ raise NotImplementedError
+
+ def handle_pkts (self, pkts):
+ raise NotImplementedError
+
+ def periodic_check (self):
+ raise NotImplementedError
+
+
+class CaptureMonitorWriterStdout(CaptureMonitorWriter):
+ def __init__ (self, logger, is_brief):
+ self.logger = logger
+ self.is_brief = is_brief
+
+ self.RX_ARROW = u'\u25c0\u2500\u2500'
+ self.TX_ARROW = u'\u25b6\u2500\u2500'
+
+ def init (self, start_ts):
+ self.start_ts = start_ts
+
+ self.logger.pre_cmd("Starting stdout capture monitor - verbose: '{0}'".format('low' if self.is_brief else 'high'))
+ self.logger.post_cmd(RC_OK)
+
+ self.logger.log(format_text("\n*** use 'capture monitor stop' to abort capturing... ***\n", 'bold'))
+
+
+ def deinit (self):
+ pass
+
+
+ def periodic_check (self):
+ return RC_OK()
+
+ def handle_pkts (self, pkts):
+ byte_count = 0
+
+ for pkt in pkts:
+ byte_count += self.__handle_pkt(pkt)
+
+ self.logger.prompt_redraw()
+ return RC_OK(byte_count)
+
+
+ def get_scapy_name (self, pkt_scapy):
+ layer = pkt_scapy
+ while layer.payload and layer.payload.name not in('Padding', 'Raw'):
+ layer = layer.payload
+
+ return layer.name
+
+
+ def format_origin (self, origin):
+ if origin == 'RX':
+ return u'{0} {1}'.format(self.RX_ARROW, 'RX')
+ elif origin == 'TX':
+ return u'{0} {1}'.format(self.TX_ARROW, 'TX')
+ else:
+ return '{0}'.format(origin)
+
+
+ def __handle_pkt (self, pkt):
+ pkt_bin = base64.b64decode(pkt['binary'])
+
+ pkt_scapy = Ether(pkt_bin)
+ self.logger.log(format_text(u'\n\n#{} Port: {} {}\n'.format(pkt['index'], pkt['port'], self.format_origin(pkt['origin'])), 'bold', ''))
+ self.logger.log(format_text(' Type: {}, Size: {} B, TS: {:.2f} [sec]\n'.format(self.get_scapy_name(pkt_scapy), len(pkt_bin), pkt['ts'] - self.start_ts), 'bold'))
+
+
+ if self.is_brief:
+ self.logger.log(' {0}'.format(pkt_scapy.command()))
+ else:
+ pkt_scapy.show(label_lvl = ' ')
+ self.logger.log('')
+
+ return len(pkt_bin)
+
+#
+class CaptureMonitorWriterPipe(CaptureMonitorWriter):
+ def __init__ (self, logger):
+ self.logger = logger
+
+ def init (self, start_ts):
+ self.start_ts = start_ts
+ self.fifo_name = tempfile.mktemp()
+
+ try:
+ self.logger.pre_cmd('Starting pipe capture monitor')
+ os.mkfifo(self.fifo_name)
+ self.logger.post_cmd(RC_OK)
+
+ self.logger.log(format_text("*** Please run 'wireshark -k -i {0}' ***".format(self.fifo_name), 'bold'))
+
+ self.logger.pre_cmd("Waiting for Wireshark pipe connection")
+ self.fifo = os.open(self.fifo_name, os.O_WRONLY)
+ self.logger.post_cmd(RC_OK())
+
+ self.logger.log(format_text('\n*** Capture monitoring started ***\n', 'bold'))
+
+ self.writer = RawPcapWriter(self.fifo_name, linktype = 1, sync = True)
+ self.writer._write_header(None)
+
+ # register a poller
+ self.poll = select.poll()
+ self.poll.register(self.fifo, select.EPOLLERR)
+
+ except KeyboardInterrupt as e:
+ self.logger.post_cmd(RC_ERR(""))
+ raise STLError("*** pipe monitor aborted...cleaning up")
+
+ except OSError as e:
+ self.logger.post_cmd(RC_ERR(""))
+ raise STLError("failed to create pipe {0}\n{1}".format(self.fifo_name, str(e)))
+
+
+ def deinit (self):
+ try:
+ os.unlink(self.fifo_name)
+ except OSError:
+ pass
+
+
+ def periodic_check (self):
+ return self.check_pipe()
+
+
+ def check_pipe (self):
+ if self.poll.poll(0):
+ return RC_ERR('*** pipe has been disconnected - aborting monitoring ***')
+
+ return RC_OK()
+
+
+ def handle_pkts (self, pkts):
+ rc = self.check_pipe()
+ if not rc:
+ return rc
+
+ byte_count = 0
+
+ for pkt in pkts:
+ pkt_bin = base64.b64decode(pkt['binary'])
+ ts = pkt['ts']
+ sec = int(ts)
+ usec = int( (ts - sec) * 1e6 )
+
+ try:
+ self.writer._write_packet(pkt_bin, sec = sec, usec = usec)
+ except IOError:
+ return RC_ERR("*** failed to write packet to pipe ***")
+
+ byte_count += len(pkt_bin)
+
+ return RC_OK(byte_count)
+
+
+class CaptureMonitor(object):
+ def __init__ (self, client, cmd_lock):
+ self.client = client
+ self.cmd_lock = cmd_lock
+ self.active = False
+ self.capture_id = None
+ self.logger = client.logger
+ self.writer = None
+
+ def is_active (self):
+ return self.active
+
+
+ def get_capture_id (self):
+ return self.capture_id
+
+
+ def start (self, tx_port_list, rx_port_list, rate_pps, mon_type):
+ try:
+ self.start_internal(tx_port_list, rx_port_list, rate_pps, mon_type)
+ except Exception as e:
+ self.__stop()
+ raise e
+
+ def start_internal (self, tx_port_list, rx_port_list, rate_pps, mon_type):
+ # stop any previous monitors
+ if self.active:
+ self.stop()
+
+ self.tx_port_list = tx_port_list
+ self.rx_port_list = rx_port_list
+
+ if mon_type == 'compact':
+ self.writer = CaptureMonitorWriterStdout(self.logger, is_brief = True)
+ elif mon_type == 'verbose':
+ self.writer = CaptureMonitorWriterStdout(self.logger, is_brief = False)
+ elif mon_type == 'pipe':
+ self.writer = CaptureMonitorWriterPipe(self.logger)
+ else:
+ raise STLError('unknown writer type')
+
+
+ with self.logger.supress():
+ data = self.client.start_capture(tx_port_list, rx_port_list, limit = rate_pps)
+
+ self.capture_id = data['id']
+ self.start_ts = data['ts']
+
+ self.writer.init(self.start_ts)
+
+
+ self.t = threading.Thread(target = self.__thread_cb)
+ self.t.setDaemon(True)
+
+ try:
+ self.active = True
+ self.t.start()
+ except Exception as e:
+ self.active = False
+ self.stop()
+ raise e
+
+ # entry point stop
+ def stop (self):
+
+ if self.active:
+ self.stop_logged()
+ else:
+ self.__stop()
+
+ # wraps stop with a logging
+ def stop_logged (self):
+ self.logger.pre_cmd("Stopping capture monitor")
+
+ try:
+ self.__stop()
+ except Exception as e:
+ self.logger.post_cmd(RC_ERR(""))
+ raise e
+
+ self.logger.post_cmd(RC_OK())
+
+ # internal stop
+ def __stop (self):
+
+ # shutdown thread
+ if self.active:
+ self.active = False
+ self.t.join()
+
+ # deinit the writer
+ if self.writer is not None:
+ self.writer.deinit()
+ self.writer = None
+
+ # cleanup capture ID if possible
+ if self.capture_id is None:
+ return
+
+ capture_id = self.capture_id
+ self.capture_id = None
+
+ # if we are disconnected - we cannot cleanup the capture
+ if not self.client.is_connected():
+ return
+
+ try:
+ captures = [x['id'] for x in self.client.get_capture_status()]
+ if capture_id not in captures:
+ return
+
+ with self.logger.supress():
+ self.client.stop_capture(capture_id)
+
+ except STLError as e:
+ self.logger.post_cmd(RC_ERR(""))
+ raise e
+
+
+ def get_mon_row (self):
+ if not self.is_active():
+ return None
+
+ return [self.capture_id,
+ self.pkt_count,
+ format_num(self.byte_count, suffix = 'B'),
+ ', '.join([str(x) for x in self.tx_port_list] if self.tx_port_list else '-'),
+ ', '.join([str(x) for x in self.rx_port_list] if self.rx_port_list else '-')
+ ]
+
+
+ # sleeps with high freq checks for active
+ def __sleep (self):
+ for _ in range(5):
+ if not self.active:
+ return False
+
+ time.sleep(0.1)
+
+ return True
+
+ def __lock (self):
+ while True:
+ rc = self.cmd_lock.acquire(False)
+ if rc:
+ return True
+
+ if not self.active:
+ return False
+ time.sleep(0.1)
+
+ def __unlock (self):
+ self.cmd_lock.release()
+
+
+ def __thread_cb (self):
+ try:
+ rc = self.__thread_main_loop()
+ finally:
+ pass
+
+ if not rc:
+ self.logger.log(str(rc))
+ self.logger.log(format_text('\n*** monitor is inactive - please restart the monitor ***\n', 'bold'))
+ self.logger.prompt_redraw()
+
+
+ def __thread_main_loop (self):
+ self.pkt_count = 0
+ self.byte_count = 0
+
+ while self.active:
+
+ # sleep
+ if not self.__sleep():
+ break
+
+ # check that the writer is ok
+ rc = self.writer.periodic_check()
+ if not rc:
+ return rc
+
+ # try to lock
+ if not self.__lock():
+ break
+
+ try:
+ if not self.client.is_connected():
+ return RC_ERR('*** client has been disconnected, aborting monitoring ***')
+ rc = self.client._transmit("capture", params = {'command': 'fetch', 'capture_id': self.capture_id, 'pkt_limit': 10})
+ if not rc:
+ return rc
+
+ finally:
+ self.__unlock()
+
+
+ pkts = rc.data()['pkts']
+ if not pkts:
+ continue
+
+ rc = self.writer.handle_pkts(pkts)
+ if not rc:
+ return rc
+
+ self.pkt_count += len(pkts)
+ self.byte_count += rc.data()
+
+ # graceful shutdown
+ return RC_OK()
+
+
+
+# main class
+class CaptureManager(object):
+ def __init__ (self, client, cmd_lock):
+ self.c = client
+ self.cmd_lock = cmd_lock
+ self.monitor = CaptureMonitor(client, cmd_lock)
+ self.logger = client.logger
+
+ # install parsers
+
+ self.parser = parsing_opts.gen_parser(self, "capture", self.parse_line_internal.__doc__)
+ self.subparsers = self.parser.add_subparsers(title = "commands", dest="commands")
+
+ self.install_record_parser()
+ self.install_monitor_parser()
+
+ # show
+ self.show_parser = self.subparsers.add_parser('show', help = "show all active captures")
+
+ # reset
+ self.clear_parser = self.subparsers.add_parser('clear', help = "remove all active captures")
+
+ # register handlers
+ self.cmds = {'record': self.parse_record, 'monitor' : self.parse_monitor, 'clear': self.parse_clear, 'show' : self.parse_show}
+
+
+ def install_record_parser (self):
+ # recording
+ self.record_parser = self.subparsers.add_parser('record', help = "PCAP recording")
+ record_sub = self.record_parser.add_subparsers(title = 'commands', dest = 'record_cmd')
+ self.record_start_parser = record_sub.add_parser('start', help = "starts a new buffered capture")
+ self.record_stop_parser = record_sub.add_parser('stop', help = "stops an active buffered capture")
+
+ # start
+ self.record_start_parser.add_arg_list(parsing_opts.TX_PORT_LIST,
+ parsing_opts.RX_PORT_LIST,
+ parsing_opts.LIMIT)
+
+ # stop
+ self.record_stop_parser.add_arg_list(parsing_opts.CAPTURE_ID,
+ parsing_opts.OUTPUT_FILENAME)
+
+
+
+ def install_monitor_parser (self):
+ # monitor
+ self.monitor_parser = self.subparsers.add_parser('monitor', help = 'live monitoring')
+ monitor_sub = self.monitor_parser.add_subparsers(title = 'commands', dest = 'mon_cmd')
+ self.monitor_start_parser = monitor_sub.add_parser('start', help = 'starts a monitor')
+ self.monitor_stop_parser = monitor_sub.add_parser('stop', help = 'stops an active monitor')
+
+ self.monitor_start_parser.add_arg_list(parsing_opts.TX_PORT_LIST,
+ parsing_opts.RX_PORT_LIST,
+ parsing_opts.MONITOR_TYPE)
+
+
+
+ def stop (self):
+ self.monitor.stop()
+
+
+ # main entry point for parsing commands from console
+ def parse_line (self, line):
+ try:
+ self.parse_line_internal(line)
+ except STLError as e:
+ self.logger.log("\nAction has failed with the following error:\n" + format_text(e.brief() + "\n", 'bold'))
+ return RC_ERR(e.brief())
+
+
+ def parse_line_internal (self, line):
+ '''Manage PCAP recorders'''
+
+ # default
+ if not line:
+ line = "show"
+
+ opts = self.parser.parse_args(line.split())
+ if not opts:
+ return opts
+
+ # call the handler
+ self.cmds[opts.commands](opts)
+
+
+ # record methods
+ def parse_record (self, opts):
+ if opts.record_cmd == 'start':
+ self.parse_record_start(opts)
+ elif opts.record_cmd == 'stop':
+ self.parse_record_stop(opts)
+ else:
+ self.record_parser.formatted_error("too few arguments")
+
+
+ def parse_record_start (self, opts):
+ if not opts.tx_port_list and not opts.rx_port_list:
+ self.record_start_parser.formatted_error('please provide either --tx or --rx')
+ return
+
+ rc = self.c.start_capture(opts.tx_port_list, opts.rx_port_list, opts.limit)
+
+ self.logger.log(format_text("*** Capturing ID is set to '{0}' ***".format(rc['id']), 'bold'))
+ self.logger.log(format_text("*** Please call 'capture record stop --id {0} -o <out.pcap>' when done ***\n".format(rc['id']), 'bold'))
+
+
+ def parse_record_stop (self, opts):
+ captures = self.c.get_capture_status()
+ ids = [c['id'] for c in captures]
+
+ if opts.capture_id == self.monitor.get_capture_id():
+ self.record_stop_parser.formatted_error("'{0}' is a monitor, please use 'capture monitor stop'".format(opts.capture_id))
+ return
+
+ if opts.capture_id not in ids:
+ self.record_stop_parser.formatted_error("'{0}' is not an active capture ID".format(opts.capture_id))
+ return
+
+ self.c.stop_capture(opts.capture_id, opts.output_filename)
+
+
+ # monitor methods
+ def parse_monitor (self, opts):
+ if opts.mon_cmd == 'start':
+ self.parse_monitor_start(opts)
+ elif opts.mon_cmd == 'stop':
+ self.parse_monitor_stop(opts)
+ else:
+ self.monitor_parser.formatted_error("too few arguments")
+
+
+ def parse_monitor_start (self, opts):
+ mon_type = 'compact'
+
+ if opts.verbose:
+ mon_type = 'verbose'
+ elif opts.pipe:
+ mon_type = 'pipe'
+
+ if not opts.tx_port_list and not opts.rx_port_list:
+ self.monitor_start_parser.formatted_error('please provide either --tx or --rx')
+ return
+
+ self.monitor.stop()
+ self.monitor.start(opts.tx_port_list, opts.rx_port_list, 100, mon_type)
+
+ def parse_monitor_stop (self, opts):
+ self.monitor.stop()
+
+ def parse_clear (self, opts):
+ self.monitor.stop()
+ self.c.remove_all_captures()
+
+
+
+ def parse_show (self, opts):
+ data = self.c.get_capture_status()
+
+ # captures
+ cap_table = text_tables.TRexTextTable()
+ cap_table.set_cols_align(["c"] * 6)
+ cap_table.set_cols_width([15] * 6)
+
+ # monitor
+ mon_table = text_tables.TRexTextTable()
+ mon_table.set_cols_align(["c"] * 5)
+ mon_table.set_cols_width([15] * 5)
+
+ for elem in data:
+ id = elem['id']
+
+ if self.monitor.get_capture_id() == id:
+ row = self.monitor.get_mon_row()
+ mon_table.add_rows([row], header=False)
+
+ else:
+ row = [id,
+ format_text(elem['state'], 'bold'),
+ '[{0}/{1}]'.format(elem['count'], elem['limit']),
+ format_num(elem['bytes'], suffix = 'B'),
+ bitfield_to_str(elem['filter']['tx']),
+ bitfield_to_str(elem['filter']['rx'])]
+
+ cap_table.add_rows([row], header=False)
+
+ cap_table.header(['ID', 'Status', 'Packets', 'Bytes', 'TX Ports', 'RX Ports'])
+ mon_table.header(['ID', 'Packets Seen', 'Bytes Seen', 'TX Ports', 'RX Ports'])
+
+ if cap_table._rows:
+ text_tables.print_table_with_header(cap_table, '\nActive Recorders')
+
+ if mon_table._rows:
+ text_tables.print_table_with_header(mon_table, '\nActive Monitor')
+
+
diff --git a/scripts/automation/trex_control_plane/stl/console/trex_console.py b/scripts/automation/trex_control_plane/stl/console/trex_console.py
index c9956472..83f36820 100755
--- a/scripts/automation/trex_control_plane/stl/console/trex_console.py
+++ b/scripts/automation/trex_control_plane/stl/console/trex_console.py
@@ -29,6 +29,8 @@ import string
import os
import sys
import tty, termios
+from threading import Lock
+import threading
try:
import stl_path
@@ -39,6 +41,7 @@ from trex_stl_lib.api import *
from trex_stl_lib.utils.text_opts import *
from trex_stl_lib.utils.common import user_input, get_current_user
from trex_stl_lib.utils import parsing_opts
+from .trex_capture import CaptureManager
try:
import trex_tui
@@ -172,6 +175,8 @@ class TRexConsole(TRexGeneralCmd):
def __init__(self, stateless_client, verbose = False):
+ self.cmd_lock = Lock()
+
self.stateless_client = stateless_client
TRexGeneralCmd.__init__(self)
@@ -184,8 +189,11 @@ class TRexConsole(TRexGeneralCmd):
self.intro = "\n-=TRex Console v{ver}=-\n".format(ver=__version__)
self.intro += "\nType 'help' or '?' for supported actions\n"
+ self.cap_mngr = CaptureManager(stateless_client, self.cmd_lock)
+
self.postcmd(False, "")
+
################### internal section ########################
@@ -231,6 +239,7 @@ class TRexConsole(TRexGeneralCmd):
lines = line.split(';')
try:
+ self.cmd_lock.acquire()
for line in lines:
stop = self.onecmd(line)
stop = self.postcmd(stop, line)
@@ -238,10 +247,15 @@ class TRexConsole(TRexGeneralCmd):
return "quit"
return ""
+
except STLError as e:
print(e)
return ''
+ finally:
+ self.cmd_lock.release()
+
+
def postcmd(self, stop, line):
self.prompt = self.stateless_client.generate_prompt(prefix = 'trex')
@@ -347,12 +361,12 @@ class TRexConsole(TRexGeneralCmd):
@verify_connected
- def do_set_rx_sniffer (self, line):
- '''Sets a port sniffer on RX channel as PCAP recorder'''
- self.stateless_client.set_rx_sniffer_line(line)
+ def do_capture (self, line):
+ '''Manage PCAP captures'''
+ self.cap_mngr.parse_line(line)
- def help_sniffer (self):
- self.do_set_rx_sniffer("-h")
+ def help_capture (self):
+ self.do_capture("-h")
@verify_connected
def do_resolve (self, line):
@@ -443,7 +457,9 @@ class TRexConsole(TRexGeneralCmd):
def do_disconnect (self, line):
'''Disconnect from the server\n'''
-
+
+ # stop any monitors before disconnecting
+ self.cap_mngr.stop()
self.stateless_client.disconnect_line(line)
@@ -688,19 +704,24 @@ class TRexConsole(TRexGeneralCmd):
l=help.splitlines()
print("{:<30} {:<30}".format(cmd + " - ",l[0] ))
+
# a custorm cmdloop wrapper
def start(self):
- while True:
- try:
- self.cmdloop()
- break
- except KeyboardInterrupt as e:
- if not readline.get_line_buffer():
- raise KeyboardInterrupt
- else:
- print("")
- self.intro = None
- continue
+ try:
+ while True:
+ try:
+ self.cmdloop()
+ break
+ except KeyboardInterrupt as e:
+ if not readline.get_line_buffer():
+ raise KeyboardInterrupt
+ else:
+ print("")
+ self.intro = None
+ continue
+
+ finally:
+ self.cap_mngr.stop()
if self.terminal:
self.terminal.kill()
@@ -933,6 +954,8 @@ def main():
with stateless_client.logger.supress():
stateless_client.disconnect(stop_traffic = False)
+
+
if __name__ == '__main__':
main()
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py
index 21ae42f1..f7432107 100755
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py
@@ -17,15 +17,18 @@ from .utils.text_opts import *
from functools import wraps
from texttable import ansi_len
+
from collections import namedtuple
from yaml import YAMLError
import time
import datetime
+import threading
import re
import random
import json
import traceback
import os.path
+import argparse
############################ logger #############################
############################ #############################
@@ -600,12 +603,10 @@ class STLClient(object):
self.util_stats,
self.xstats,
self.async_client.monitor)
-
-
-
-
+
############# private functions - used by the class itself ###########
+
# some preprocessing for port argument
def __ports (self, port_id_list):
@@ -832,27 +833,6 @@ class STLClient(object):
return rc
- def __set_rx_sniffer (self, port_id_list, base_filename, limit):
- port_id_list = self.__ports(port_id_list)
- rc = RC()
-
- for port_id in port_id_list:
- head, tail = os.path.splitext(base_filename)
- filename = "{0}-{1}{2}".format(head, port_id, tail)
- rc.add(self.ports[port_id].set_rx_sniffer(filename, limit))
-
- return rc
-
-
- def __remove_rx_sniffer (self, port_id_list):
- port_id_list = self.__ports(port_id_list)
- rc = RC()
-
- for port_id in port_id_list:
- rc.add(self.ports[port_id].remove_rx_sniffer())
-
- return rc
-
def __set_rx_queue (self, port_id_list, size):
port_id_list = self.__ports(port_id_list)
rc = RC()
@@ -1071,7 +1051,7 @@ class STLClient(object):
############ functions used by other classes but not users ##############
- def _validate_port_list (self, port_id_list):
+ def _validate_port_list (self, port_id_list, allow_empty = False):
# listfiy single int
if isinstance(port_id_list, int):
port_id_list = [port_id_list]
@@ -1080,7 +1060,7 @@ class STLClient(object):
if not isinstance(port_id_list, list):
raise STLTypeError('port_id_list', type(port_id_list), list)
- if not port_id_list:
+ if not port_id_list and not allow_empty:
raise STLError('No ports provided')
valid_ports = self.get_all_ports()
@@ -2084,9 +2064,9 @@ class STLClient(object):
self.set_port_attr(ports,
promiscuous = False,
link_up = True if restart else None)
- self.set_service_mode(ports, False)
- self.remove_rx_sniffer(ports)
self.remove_rx_queue(ports)
+ self.set_service_mode(ports, False)
+
except STLError as e:
self.logger.post_cmd(False)
@@ -2996,7 +2976,7 @@ class STLClient(object):
Resolves ports (ARP resolution)
:parameters:
- ports - for which ports to apply a unique sniffer (each port gets a unique file)
+ ports - which ports to resolve
retries - how many times to retry on each port (intervals of 100 milliseconds)
verbose - log for each request the response
:raises:
@@ -3025,57 +3005,166 @@ class STLClient(object):
@__api_check(True)
- def set_rx_sniffer (self, ports = None, base_filename = 'rx.pcap', limit = 1000):
+ def start_capture (self, tx_ports, rx_ports, limit = 1000):
"""
- Sets a RX sniffer for port(s) written to a PCAP file
+ Starts a capture to PCAP on port(s)
:parameters:
- ports - for which ports to apply a unique sniffer (each port gets a unique file)
- base_filename - filename will be appended with '-<port_number>', e.g. rx.pcap --> rx-0.pcap, rx-1.pcap etc.
+ tx_ports - on which ports to capture TX
+ rx_ports - on which ports to capture RX
limit - limit how many packets will be written
+
+ :returns:
+ returns a dictionary containing
+ {'id: <new_id>, 'ts': <starting timestamp>}
+
:raises:
+ :exe:'STLError'
"""
- ports = ports if ports is not None else self.get_acquired_ports()
- ports = self._validate_port_list(ports)
-
+
+ tx_ports = self._validate_port_list(tx_ports, allow_empty = True)
+ rx_ports = self._validate_port_list(rx_ports, allow_empty = True)
+ merge_ports = set(tx_ports + rx_ports)
+
+ if not merge_ports:
+ raise STLError("start_capture - must get at least one port to capture")
+
# check arguments
- validate_type('base_filename', base_filename, basestring)
validate_type('limit', limit, (int))
if limit <= 0:
raise STLError("'limit' must be a positive value")
- self.logger.pre_cmd("Setting RX sniffers on port(s) {0}:".format(ports))
- rc = self.__set_rx_sniffer(ports, base_filename, limit)
+ non_service_ports = list_difference(set(tx_ports + rx_ports), self.get_service_enabled_ports())
+ if non_service_ports:
+ raise STLError("Port(s) {0} are not under service mode. PCAP capturing requires all ports to be in service mode".format(non_service_ports))
+
+
+ self.logger.pre_cmd("Starting PCAP capturing up to {0} packets".format(limit))
+
+ rc = self._transmit("capture", params = {'command': 'start', 'limit': limit, 'tx': tx_ports, 'rx': rx_ports})
self.logger.post_cmd(rc)
if not rc:
raise STLError(rc)
+ return {'id': rc.data()['capture_id'], 'ts': rc.data()['ts']}
+
+
+ def __fetch_capture_packets (self, capture_id, output_filename, pkt_count):
+ self.logger.pre_cmd("Writing {0} packets to '{1}'".format(pkt_count, output_filename))
+
+ # create a PCAP file
+ writer = RawPcapWriter(output_filename, linktype = 1)
+ writer._write_header(None)
+
+ # fetch
+ pending = pkt_count
+ rc = RC_OK()
+ while pending > 0:
+ rc = self._transmit("capture", params = {'command': 'fetch', 'capture_id': capture_id, 'pkt_limit': 50})
+ if not rc:
+ self.logger.post_cmd(rc)
+ raise STLError(rc)
+
+ pkts = rc.data()['pkts']
+ pending = rc.data()['pending']
+ start_ts = rc.data()['start_ts']
+
+ for pkt in pkts:
+ ts = pkt['ts'] - start_ts
+ ts_sec = int(ts)
+ ts_usec = int( (ts - ts_sec) * 1e6 )
+
+ pkt_bin = base64.b64decode(pkt['binary'])
+ writer._write_packet(pkt_bin, sec = ts_sec, usec = ts_usec)
+
+
+
+
+ self.logger.post_cmd(rc)
+
+
@__api_check(True)
- def remove_rx_sniffer (self, ports = None):
+ def stop_capture (self, capture_id, output_filename = None):
"""
- Removes RX sniffer from port(s)
+ Stops an active capture
+
+ :parameters:
+ capture_id - an active capture ID to stop
+ output_filename - output filename to save capture
:raises:
+ :exe:'STLError'
"""
- ports = ports if ports is not None else self.get_acquired_ports()
- ports = self._validate_port_list(ports)
-
- self.logger.pre_cmd("Removing RX sniffers on port(s) {0}:".format(ports))
- rc = self.__remove_rx_sniffer(ports)
+
+ # stopping a capture requires:
+ # 1. stopping
+ # 2. fetching
+ # 3. saving to file
+
+ # stop
+
+ self.logger.pre_cmd("Stopping PCAP capture {0}".format(capture_id))
+ rc = self._transmit("capture", params = {'command': 'stop', 'capture_id': capture_id})
self.logger.post_cmd(rc)
+ if not rc:
+ raise STLError(rc)
+
+ # pkt count
+ pkt_count = rc.data()['pkt_count']
+
+ # fetch packets
+ if output_filename:
+ self.__fetch_capture_packets(capture_id, output_filename, pkt_count)
+
+ # remove
+ self.logger.pre_cmd("Removing PCAP capture {0} from server".format(capture_id))
+ rc = self._transmit("capture", params = {'command': 'remove', 'capture_id': capture_id})
+ self.logger.post_cmd(rc)
+ if not rc:
+ raise STLError(rc)
+
+
+ @__api_check(True)
+ def get_capture_status (self):
+ """
+ returns a list of all active captures
+ each element in the list is an object containing
+ info about the capture
+
+ """
+
+ rc = self._transmit("capture", params = {'command': 'status'})
if not rc:
raise STLError(rc)
-
+ return rc.data()
+
+ @__api_check(True)
+ def remove_all_captures (self):
+ """
+ Removes any existing captures
+ """
+ captures = self.get_capture_status()
+
+ self.logger.pre_cmd("Removing all PCAP captures from server")
+
+ for c in captures:
+ # remove
+ rc = self._transmit("capture", params = {'command': 'remove', 'capture_id': c['id']})
+ if not rc:
+ raise STLError(rc)
+
+ self.logger.post_cmd(RC_OK())
+
+
+
@__api_check(True)
def set_rx_queue (self, ports = None, size = 1000):
"""
@@ -3196,6 +3285,7 @@ class STLClient(object):
return wrap
+
@__console
def ping_line (self, line):
'''pings the server / specific IP'''
@@ -3789,30 +3879,10 @@ class STLClient(object):
opts.link,
opts.led,
opts.flow_ctrl)
-
+
+
-
- @__console
- def set_rx_sniffer_line (self, line):
- '''Sets a port sniffer on RX channel in form of a PCAP file'''
-
- parser = parsing_opts.gen_parser(self,
- "set_rx_sniffer",
- self.set_rx_sniffer_line.__doc__,
- parsing_opts.PORT_LIST_WITH_ALL,
- parsing_opts.OUTPUT_FILENAME,
- parsing_opts.LIMIT)
-
- opts = parser.parse_args(line.split(), default_ports = self.get_acquired_ports(), verify_acquired = True)
- if not opts:
- return opts
-
- self.set_rx_sniffer(opts.ports, opts.output_filename, opts.limit)
-
- return RC_OK()
-
-
@__console
def resolve_line (self, line):
'''Performs a port ARP resolution'''
@@ -4010,3 +4080,4 @@ class STLClient(object):
self.set_service_mode(ports = opts.ports, enabled = opts.enabled)
+
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py
index a9509ee9..31d752af 100644
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py
@@ -56,7 +56,8 @@ class Port(object):
def __init__ (self, port_id, user, comm_link, session_id, info):
self.port_id = port_id
- self.state = self.STATE_IDLE
+ self.state = self.STATE_IDLE
+ self.service_mode = False
self.handler = None
self.comm_link = comm_link
@@ -247,14 +248,16 @@ class Port(object):
raise Exception("port {0}: bad state received from server '{1}'".format(self.port_id, port_state))
self.owner = rc.data()['owner']
-
+
self.next_available_id = int(rc.data()['max_stream_id']) + 1
self.status = rc.data()
-
+
# replace the attributes in a thread safe manner
self.set_ts_attr(rc.data()['attr'])
-
+
+ self.service_mode = rc.data()['service']
+
return self.ok()
@@ -490,33 +493,17 @@ class Port(object):
@owned
- def set_rx_sniffer (self, pcap_filename, limit):
+ def start_capture (self, pcap_filename, mode, limit):
- if not self.is_service_mode_on():
+ if mode != 'tx' and not self.is_service_mode_on():
return self.err('port service mode must be enabled for performing RX capturing. Please enable service mode')
params = {"handler": self.handler,
"port_id": self.port_id,
- "type": "capture",
- "enabled": True,
- "pcap_filename": pcap_filename,
+ "mode": mode,
"limit": limit}
- rc = self.transmit("set_rx_feature", params)
- if rc.bad():
- return self.err(rc.err())
-
- return self.ok()
-
-
- @owned
- def remove_rx_sniffer (self):
- params = {"handler": self.handler,
- "port_id": self.port_id,
- "type": "capture",
- "enabled": False}
-
- rc = self.transmit("set_rx_feature", params)
+ rc = self.transmit("start_capture", params)
if rc.bad():
return self.err(rc.err())
@@ -719,23 +706,21 @@ class Port(object):
@owned
def set_service_mode (self, enabled):
- rc = self.set_attr(rx_filter_mode = 'all' if enabled else 'hw')
- if not rc:
- return rc
-
- if not enabled:
- rc = self.remove_rx_queue()
- if not rc:
- return rc
-
- rc = self.remove_rx_sniffer()
- if not rc:
- return rc
-
+ params = {"handler": self.handler,
+ "port_id": self.port_id,
+ "enabled": enabled}
+
+ rc = self.transmit("service", params)
+ if rc.bad():
+ return self.err(rc.err())
+
+ self.service_mode = enabled
return self.ok()
+
def is_service_mode_on (self):
- return self.get_rx_filter_mode() == 'all'
+ return self.service_mode
+
@writeable
def push_remote (self, pcap_filename, ipg_usec, speedup, count, duration, is_dual, slave_handler, min_ipg_usec):
@@ -902,11 +887,6 @@ class Port(object):
# RX info
rx_info = self.status['rx_info']
- # RX sniffer
- sniffer = rx_info['sniffer']
- info['rx_sniffer'] = '{0}\n[{1} / {2}]'.format(sniffer['pcap_filename'], sniffer['count'], sniffer['limit']) if sniffer['is_active'] else 'off'
-
-
# RX queue
queue = rx_info['queue']
info['rx_queue'] = '[{0} / {1}]'.format(queue['count'], queue['size']) if queue['is_active'] else 'off'
@@ -928,8 +908,6 @@ class Port(object):
def get_layer_cfg (self):
return self.__attr['layer_cfg']
- def get_rx_filter_mode (self):
- return self.__attr['rx_filter_mode']
def is_virtual(self):
return self.info.get('is_virtual')
@@ -1005,7 +983,6 @@ class Port(object):
"layer mode": format_text(info['layer_mode'], 'green' if info['layer_mode'] == 'IPv4' else 'magenta'),
"RX Filter Mode": info['rx_filter_mode'],
"RX Queueing": info['rx_queue'],
- "RX sniffer": info['rx_sniffer'],
"Grat ARP": info['grat_arp'],
}
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_stats.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_stats.py
index 38726062..21c9af87 100644
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_stats.py
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_stats.py
@@ -682,7 +682,6 @@ class CTRexInfoGenerator(object):
("-----", []),
("RX Filter Mode", []),
("RX Queueing", []),
- ("RX sniffer", []),
("Grat ARP", []),
]
)
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/common.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/common.py
index cbbacb27..c386451b 100644
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/common.py
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/common.py
@@ -107,4 +107,18 @@ def list_remove_dup (l):
return tmp
-
+def bitfield_to_list (bf):
+ rc = []
+ bitpos = 0
+
+ while bf > 0:
+ if bf & 0x1:
+ rc.append(bitpos)
+ bitpos += 1
+ bf = bf >> 1
+
+ return rc
+
+def bitfield_to_str (bf):
+ lst = bitfield_to_list(bf)
+ return "-" if not lst else ', '.join([str(x) for x in lst])
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py
index f5dab30c..8d3aedbe 100755
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py
@@ -63,9 +63,14 @@ PKT_SIZE
SERVICE_OFF
+TX_PORT_LIST
+RX_PORT_LIST
+
SRC_IPV4
DST_IPV4
+CAPTURE_ID
+
GLOBAL_STATS
PORT_STATS
PORT_STATUS
@@ -78,12 +83,18 @@ EXTENDED_INC_ZERO_STATS
STREAMS_MASK
CORE_MASK_GROUP
+CAPTURE_PORTS_GROUP
+
+MONITOR_TYPE_VERBOSE
+MONITOR_TYPE_PIPE
+MONITOR_TYPE
# ALL_STREAMS
# STREAM_LIST_WITH_ALL
# list of ArgumentGroup types
MUTEX
+NON_MUTEX
'''
@@ -389,7 +400,6 @@ OPTIONS_DB = {MULTIPLIER: ArgumentPack(['-m', '--multiplier'],
{'help': 'Output PCAP filename',
'dest': 'output_filename',
'default': None,
- 'required': True,
'type': str}),
@@ -591,6 +601,45 @@ OPTIONS_DB = {MULTIPLIER: ArgumentPack(['-m', '--multiplier'],
'default': True,
'help': 'Deactivates services on port(s)'}),
+ TX_PORT_LIST: ArgumentPack(['--tx'],
+ {'nargs': '+',
+ 'dest':'tx_port_list',
+ 'metavar': 'TX',
+ 'action': 'merge',
+ 'type': int,
+ 'help': 'A list of ports to capture on the TX side',
+ 'default': []}),
+
+
+ RX_PORT_LIST: ArgumentPack(['--rx'],
+ {'nargs': '+',
+ 'dest':'rx_port_list',
+ 'metavar': 'RX',
+ 'action': 'merge',
+ 'type': int,
+ 'help': 'A list of ports to capture on the RX side',
+ 'default': []}),
+
+
+ MONITOR_TYPE_VERBOSE: ArgumentPack(['-v', '--verbose'],
+ {'action': 'store_true',
+ 'dest': 'verbose',
+ 'default': False,
+ 'help': 'output to screen as verbose'}),
+
+ MONITOR_TYPE_PIPE: ArgumentPack(['-p', '--pipe'],
+ {'action': 'store_true',
+ 'dest': 'pipe',
+ 'default': False,
+ 'help': 'forward packets to a pipe'}),
+
+
+ CAPTURE_ID: ArgumentPack(['-i', '--id'],
+ {'help': "capture ID to remove",
+ 'dest': "capture_id",
+ 'type': int,
+ 'required': True}),
+
# advanced options
PORT_LIST_WITH_ALL: ArgumentGroup(MUTEX, [PORT_LIST,
ALL_PORTS],
@@ -615,6 +664,13 @@ OPTIONS_DB = {MULTIPLIER: ArgumentPack(['-m', '--multiplier'],
CORE_MASK],
{'required': False}),
+ CAPTURE_PORTS_GROUP: ArgumentGroup(NON_MUTEX, [TX_PORT_LIST, RX_PORT_LIST], {}),
+
+
+ MONITOR_TYPE: ArgumentGroup(MUTEX, [MONITOR_TYPE_VERBOSE,
+ MONITOR_TYPE_PIPE],
+ {'required': False}),
+
}
class _MergeAction(argparse._AppendAction):
@@ -633,12 +689,30 @@ class _MergeAction(argparse._AppendAction):
class CCmdArgParser(argparse.ArgumentParser):
- def __init__(self, stateless_client, *args, **kwargs):
+ def __init__(self, stateless_client = None, x = None, *args, **kwargs):
super(CCmdArgParser, self).__init__(*args, **kwargs)
self.stateless_client = stateless_client
self.cmd_name = kwargs.get('prog')
self.register('action', 'merge', _MergeAction)
+
+ def add_arg_list (self, *args):
+ populate_parser(self, *args)
+
+ def add_subparsers(self, *args, **kwargs):
+ sub = super(CCmdArgParser, self).add_subparsers(*args, **kwargs)
+
+ add_parser = sub.add_parser
+ stateless_client = self.stateless_client
+
+ def add_parser_hook (self, *args, **kwargs):
+ parser = add_parser(self, *args, **kwargs)
+ parser.stateless_client = stateless_client
+ return parser
+
+ sub.add_parser = add_parser_hook
+ return sub
+
# hook this to the logger
def _print_message(self, message, file=None):
self.stateless_client.logger.log(message)
@@ -709,13 +783,15 @@ class CCmdArgParser(argparse.ArgumentParser):
# recover from system exit scenarios, such as "help", or bad arguments.
return RC_ERR("'{0}' - {1}".format(self.cmd_name, "no action"))
+ def formatted_error (self, msg):
+ self.print_usage()
+ self._print_message(('%s: error: %s\n') % (self.prog, msg))
+
def get_flags (opt):
return OPTIONS_DB[opt].name_or_flags
-def gen_parser(stateless_client, op_name, description, *args):
- parser = CCmdArgParser(stateless_client, prog=op_name, conflict_handler='resolve',
- description=description)
+def populate_parser (parser, *args):
for param in args:
try:
@@ -731,6 +807,12 @@ def gen_parser(stateless_client, op_name, description, *args):
for sub_argument in argument.args:
group.add_argument(*OPTIONS_DB[sub_argument].name_or_flags,
**OPTIONS_DB[sub_argument].options)
+
+ elif argument.type == NON_MUTEX:
+ group = parser.add_argument_group(**argument.options)
+ for sub_argument in argument.args:
+ group.add_argument(*OPTIONS_DB[sub_argument].name_or_flags,
+ **OPTIONS_DB[sub_argument].options)
else:
# ignore invalid objects
continue
@@ -743,6 +825,12 @@ def gen_parser(stateless_client, op_name, description, *args):
except KeyError as e:
cause = e.args[0]
raise KeyError("The attribute '{0}' is missing as a field of the {1} option.\n".format(cause, param))
+
+def gen_parser(stateless_client, op_name, description, *args):
+ parser = CCmdArgParser(stateless_client, prog=op_name, conflict_handler='resolve',
+ description=description)
+
+ populate_parser(parser, *args)
return parser
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/text_opts.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/text_opts.py
index 63b05bf4..3ffd07e2 100644
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/text_opts.py
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/text_opts.py
@@ -133,12 +133,16 @@ def underline(text):
# apply attribute on each non-empty line
def text_attribute(text, attribute):
- return '\n'.join(['{start}{txt}{end}'.format(
- start = TEXT_CODES[attribute]['start'],
- txt = line,
- end = TEXT_CODES[attribute]['end'])
- if line else '' for line in ('%s' % text).split('\n')])
-
+ if isinstance(text, str):
+ return "{start}{txt}{stop}".format(start=TEXT_CODES[attribute]['start'],
+ txt=text,
+ stop=TEXT_CODES[attribute]['end'])
+ elif isinstance(text, unicode):
+ return u"{start}{txt}{stop}".format(start=TEXT_CODES[attribute]['start'],
+ txt=text,
+ stop=TEXT_CODES[attribute]['end'])
+ else:
+ raise Exception("not a string")
FUNC_DICT = {'blue': blue,
'bold': bold,
diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp
index aa31cf0b..1d315fa7 100644
--- a/src/main_dpdk.cpp
+++ b/src/main_dpdk.cpp
@@ -3659,8 +3659,9 @@ void CGlobalTRex::rx_sl_configure(void) {
int i;
rx_sl_cfg.m_max_ports = m_max_ports;
+ rx_sl_cfg.m_tx_cores = get_cores_tx();
rx_sl_cfg.m_num_crc_fix_bytes = get_ex_drv()->get_num_crc_fix_bytes();
-
+
if ( get_vm_one_queue_enable() ) {
/* vm mode, indirect queues */
for (i=0; i < m_max_ports; i++) {
diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp
index 82cbaca1..6f0ab09a 100644
--- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp
+++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp
@@ -28,6 +28,8 @@ limitations under the License.
#include <internal_api/trex_platform_api.h>
#include "trex_stateless_rx_core.h"
+#include "trex_stateless_capture.h"
+#include "trex_stateless_messaging.h"
#include <fstream>
#include <iostream>
@@ -339,24 +341,6 @@ TrexRpcCmdGetSysInfo::_run(const Json::Value &params, Json::Value &result) {
return (TREX_RPC_CMD_OK);
}
-
-int
-TrexRpcCmdSetPortAttr::parse_rx_filter_mode(const Json::Value &msg, uint8_t port_id, Json::Value &result) {
- const std::string type = parse_choice(msg, "mode", {"hw", "all"}, result);
-
- rx_filter_mode_e filter_mode;
- if (type == "hw") {
- filter_mode = RX_FILTER_MODE_HW;
- } else if (type == "all") {
- filter_mode = RX_FILTER_MODE_ALL;
- } else {
- /* can't happen - parsed choice */
- assert(0);
- }
-
- return get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id)->set_rx_filter_mode(filter_mode);
-}
-
/**
* set port commands
*
@@ -399,11 +383,6 @@ TrexRpcCmdSetPortAttr::_run(const Json::Value &params, Json::Value &result) {
ret = get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id)->set_flow_ctrl(mode);
}
- else if (name == "rx_filter_mode") {
- const Json::Value &rx = parse_object(attr, name, result);
- ret = parse_rx_filter_mode(rx, port_id, result);
- }
-
/* unknown attribute */
else {
generate_execute_err(result, "unknown attribute type: '" + name + "'");
@@ -588,7 +567,8 @@ TrexRpcCmdGetPortStatus::_run(const Json::Value &params, Json::Value &result) {
result["result"]["owner"] = (port->get_owner().is_free() ? "" : port->get_owner().get_name());
result["result"]["state"] = port->get_state_as_string();
result["result"]["max_stream_id"] = port->get_max_stream_id();
-
+ result["result"]["service"] = port->is_service_mode_on();
+
/* attributes */
get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id)->to_json(result["result"]["attr"]);
@@ -671,6 +651,27 @@ TrexRpcCmdPushRemote::_run(const Json::Value &params, Json::Value &result) {
}
+
+/**
+ * set service mode on/off
+ *
+ */
+trex_rpc_cmd_rc_e
+TrexRpcCmdSetServiceMode::_run(const Json::Value &params, Json::Value &result) {
+ uint8_t port_id = parse_port(params, result);
+ bool enabled = parse_bool(params, "enabled", result);
+
+ TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
+ try {
+ port->set_service_mode(enabled);
+ } catch (TrexException &ex) {
+ generate_execute_err(result, ex.what());
+ }
+
+ result["result"] = Json::objectValue;
+ return (TREX_RPC_CMD_OK);
+}
+
/**
* set on/off RX software receive mode
*
@@ -681,12 +682,14 @@ TrexRpcCmdSetRxFeature::_run(const Json::Value &params, Json::Value &result) {
uint8_t port_id = parse_port(params, result);
TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
+ if (!port->is_service_mode_on()) {
+ generate_execute_err(result, "rx_feature - available only under service mode");
+ }
+
/* decide which feature is being set */
- const std::string type = parse_choice(params, "type", {"capture", "queue", "server"}, result);
+ const std::string type = parse_choice(params, "type", {"queue", "server"}, result);
- if (type == "capture") {
- parse_capture_msg(params, port, result);
- } else if (type == "queue") {
+ if (type == "queue") {
parse_queue_msg(params, port, result);
} else if (type == "server") {
parse_server_msg(params, port, result);
@@ -700,38 +703,6 @@ TrexRpcCmdSetRxFeature::_run(const Json::Value &params, Json::Value &result) {
}
void
-TrexRpcCmdSetRxFeature::parse_capture_msg(const Json::Value &msg, TrexStatelessPort *port, Json::Value &result) {
-
- bool enabled = parse_bool(msg, "enabled", result);
-
- if (enabled) {
-
- std::string pcap_filename = parse_string(msg, "pcap_filename", result);
- uint64_t limit = parse_uint32(msg, "limit", result);
-
- if (limit == 0) {
- generate_parse_err(result, "limit cannot be zero");
- }
-
- try {
- port->start_rx_capture(pcap_filename, limit);
- } catch (const TrexException &ex) {
- generate_execute_err(result, ex.what());
- }
-
- } else {
-
- try {
- port->stop_rx_capture();
- } catch (const TrexException &ex) {
- generate_execute_err(result, ex.what());
- }
-
- }
-
-}
-
-void
TrexRpcCmdSetRxFeature::parse_queue_msg(const Json::Value &msg, TrexStatelessPort *port, Json::Value &result) {
bool enabled = parse_bool(msg, "enabled", result);
@@ -773,8 +744,13 @@ TrexRpcCmdGetRxQueuePkts::_run(const Json::Value &params, Json::Value &result) {
TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
+ if (!port->is_service_mode_on()) {
+ generate_execute_err(result, "get_rx_queue_pkts - available only under service mode");
+ }
+
+
try {
- const RXPacketBuffer *pkt_buffer = port->get_rx_queue_pkts();
+ const TrexPktBuffer *pkt_buffer = port->get_rx_queue_pkts();
if (pkt_buffer) {
result["result"]["pkts"] = pkt_buffer->to_json();
delete pkt_buffer;
@@ -817,6 +793,7 @@ TrexRpcCmdSetL2::_run(const Json::Value &params, Json::Value &result) {
generate_execute_err(result, ex.what());
}
+ result["result"] = Json::objectValue;
return (TREX_RPC_CMD_OK);
}
@@ -874,7 +851,186 @@ TrexRpcCmdSetL3::_run(const Json::Value &params, Json::Value &result) {
}
}
-
+
+ result["result"] = Json::objectValue;
return (TREX_RPC_CMD_OK);
}
+
+
+/**
+ * capture command tree
+ *
+ */
+trex_rpc_cmd_rc_e
+TrexRpcCmdCapture::_run(const Json::Value &params, Json::Value &result) {
+ const std::string cmd = parse_choice(params, "command", {"start", "stop", "fetch", "status", "remove"}, result);
+
+ if (cmd == "start") {
+ parse_cmd_start(params, result);
+ } else if (cmd == "stop") {
+ parse_cmd_stop(params, result);
+ } else if (cmd == "fetch") {
+ parse_cmd_fetch(params, result);
+ } else if (cmd == "status") {
+ parse_cmd_status(params, result);
+ } else if (cmd == "remove") {
+ parse_cmd_remove(params, result);
+ } else {
+ /* can't happen */
+ assert(0);
+ }
+
+ return TREX_RPC_CMD_OK;
+}
+
+/**
+ * starts PCAP capturing
+ *
+ */
+void
+TrexRpcCmdCapture::parse_cmd_start(const Json::Value &params, Json::Value &result) {
+
+ uint32_t limit = parse_uint32(params, "limit", result);
+ const Json::Value &tx_json = parse_array(params, "tx", result);
+ const Json::Value &rx_json = parse_array(params, "rx", result);
+ CaptureFilter filter;
+
+ std::set<uint8_t> ports;
+
+ /* populate the filter */
+ for (int i = 0; i < tx_json.size(); i++) {
+ uint8_t tx_port = parse_byte(tx_json, i, result);
+ filter.add_tx(tx_port);
+ ports.insert(tx_port);
+ }
+
+ for (int i = 0; i < rx_json.size(); i++) {
+ uint8_t rx_port = parse_byte(rx_json, i, result);
+ filter.add_rx(rx_port);
+ ports.insert(rx_port);
+ }
+
+ /* check that all ports are under service mode */
+ for (uint8_t port_id : ports) {
+ TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
+ if (!port->is_service_mode_on()) {
+ generate_parse_err(result, "start_capture is available only under service mode");
+ }
+ }
+
+ static MsgReply<TrexCaptureRCStart> reply;
+ reply.reset();
+
+ TrexStatelessRxCaptureStart *start_msg = new TrexStatelessRxCaptureStart(filter, limit, reply);
+ get_stateless_obj()->send_msg_to_rx(start_msg);
+
+ TrexCaptureRCStart rc = reply.wait_for_reply();
+ if (!rc) {
+ generate_execute_err(result, rc.get_err());
+ }
+
+ result["result"]["capture_id"] = rc.get_new_id();
+ result["result"]["ts"] = now_sec();
+}
+
+/**
+ * stops PCAP capturing
+ *
+ */
+void
+TrexRpcCmdCapture::parse_cmd_stop(const Json::Value &params, Json::Value &result) {
+
+ uint32_t capture_id = parse_uint32(params, "capture_id", result);
+
+ static MsgReply<TrexCaptureRCStop> reply;
+ reply.reset();
+
+ TrexStatelessRxCaptureStop *stop_msg = new TrexStatelessRxCaptureStop(capture_id, reply);
+ get_stateless_obj()->send_msg_to_rx(stop_msg);
+
+ TrexCaptureRCStop rc = reply.wait_for_reply();
+ if (!rc) {
+ generate_execute_err(result, rc.get_err());
+ }
+
+ result["result"]["pkt_count"] = rc.get_pkt_count();
+}
+
+/**
+ * gets the status of all captures in the system
+ *
+ */
+void
+TrexRpcCmdCapture::parse_cmd_status(const Json::Value &params, Json::Value &result) {
+
+ /* generate a status command */
+
+ static MsgReply<TrexCaptureRCStatus> reply;
+ reply.reset();
+
+ TrexStatelessRxCaptureStatus *status_msg = new TrexStatelessRxCaptureStatus(reply);
+ get_stateless_obj()->send_msg_to_rx(status_msg);
+
+ TrexCaptureRCStatus rc = reply.wait_for_reply();
+ if (!rc) {
+ generate_execute_err(result, rc.get_err());
+ }
+
+ result["result"] = rc.get_status();
+}
+
+/**
+ * fetch packets from a capture
+ *
+ */
+void
+TrexRpcCmdCapture::parse_cmd_fetch(const Json::Value &params, Json::Value &result) {
+
+ uint32_t capture_id = parse_uint32(params, "capture_id", result);
+ uint32_t pkt_limit = parse_uint32(params, "pkt_limit", result);
+
+ /* generate a fetch command */
+
+ static MsgReply<TrexCaptureRCFetch> reply;
+ reply.reset();
+
+ TrexStatelessRxCaptureFetch *fetch_msg = new TrexStatelessRxCaptureFetch(capture_id, pkt_limit, reply);
+ get_stateless_obj()->send_msg_to_rx(fetch_msg);
+
+ TrexCaptureRCFetch rc = reply.wait_for_reply();
+ if (!rc) {
+ generate_execute_err(result, rc.get_err());
+ }
+
+ const TrexPktBuffer *pkt_buffer = rc.get_pkt_buffer();
+
+ result["result"]["pending"] = rc.get_pending();
+ result["result"]["start_ts"] = rc.get_start_ts();
+ result["result"]["pkts"] = pkt_buffer->to_json();
+
+ /* delete the buffer */
+ delete pkt_buffer;
+}
+
+void
+TrexRpcCmdCapture::parse_cmd_remove(const Json::Value &params, Json::Value &result) {
+
+ uint32_t capture_id = parse_uint32(params, "capture_id", result);
+
+ /* generate a remove command */
+
+ static MsgReply<TrexCaptureRCRemove> reply;
+ reply.reset();
+
+ TrexStatelessRxCaptureRemove *remove_msg = new TrexStatelessRxCaptureRemove(capture_id, reply);
+ get_stateless_obj()->send_msg_to_rx(remove_msg);
+
+ TrexCaptureRCRemove rc = reply.wait_for_reply();
+ if (!rc) {
+ generate_execute_err(result, rc.get_err());
+ }
+
+ result["result"] = Json::objectValue;
+}
+
diff --git a/src/rpc-server/commands/trex_rpc_cmds.h b/src/rpc-server/commands/trex_rpc_cmds.h
index 6639be7b..54797bdf 100644
--- a/src/rpc-server/commands/trex_rpc_cmds.h
+++ b/src/rpc-server/commands/trex_rpc_cmds.h
@@ -94,8 +94,6 @@ TREX_RPC_CMD_DEFINE(TrexRpcCmdGetPortXStatsValues, "get_port_xstats_values", 1,
TREX_RPC_CMD_DEFINE(TrexRpcCmdGetPortXStatsNames, "get_port_xstats_names", 1, false, APIClass::API_CLASS_TYPE_CORE);
TREX_RPC_CMD_DEFINE_EXTENDED(TrexRpcCmdSetPortAttr, "set_port_attr", 2, true, APIClass::API_CLASS_TYPE_CORE,
-
- int parse_rx_filter_mode(const Json::Value &msg, uint8_t port_id, Json::Value &result);
);
@@ -150,16 +148,26 @@ TREX_RPC_CMD_DEFINE(TrexRpcCmdPushRemote, "push_remote", 6, true, APIClass::API_
TREX_RPC_CMD_DEFINE(TrexRpcCmdShutdown, "shutdown", 2, false, APIClass::API_CLASS_TYPE_CORE);
-TREX_RPC_CMD_DEFINE_EXTENDED(TrexRpcCmdSetRxFeature, "set_rx_feature", 3, false, APIClass::API_CLASS_TYPE_CORE,
- void parse_capture_msg(const Json::Value &msg, TrexStatelessPort *port, Json::Value &result);
+TREX_RPC_CMD_DEFINE_EXTENDED(TrexRpcCmdSetRxFeature, "set_rx_feature", 3, true, APIClass::API_CLASS_TYPE_CORE,
void parse_queue_msg(const Json::Value &msg, TrexStatelessPort *port, Json::Value &result);
void parse_server_msg(const Json::Value &msg, TrexStatelessPort *port, Json::Value &result);
);
-TREX_RPC_CMD_DEFINE(TrexRpcCmdSetL2, "set_l2", 2, false, APIClass::API_CLASS_TYPE_CORE);
-TREX_RPC_CMD_DEFINE(TrexRpcCmdSetL3, "set_l3", 3, false, APIClass::API_CLASS_TYPE_CORE);
-TREX_RPC_CMD_DEFINE(TrexRpcCmdGetRxQueuePkts, "get_rx_queue_pkts", 2, false, APIClass::API_CLASS_TYPE_CORE);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdSetL2, "set_l2", 2, true, APIClass::API_CLASS_TYPE_CORE);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdSetL3, "set_l3", 3, true, APIClass::API_CLASS_TYPE_CORE);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdGetRxQueuePkts, "get_rx_queue_pkts", 1, true, APIClass::API_CLASS_TYPE_CORE);
+
+TREX_RPC_CMD_DEFINE(TrexRpcCmdSetServiceMode, "service", 2, true, APIClass::API_CLASS_TYPE_CORE);
+
+TREX_RPC_CMD_DEFINE_EXTENDED(TrexRpcCmdCapture, "capture", 1, false, APIClass::API_CLASS_TYPE_CORE,
+ void parse_cmd_start(const Json::Value &msg, Json::Value &result);
+ void parse_cmd_stop(const Json::Value &msg, Json::Value &result);
+ void parse_cmd_status(const Json::Value &msg, Json::Value &result);
+ void parse_cmd_fetch(const Json::Value &msg, Json::Value &result);
+ void parse_cmd_remove(const Json::Value &params, Json::Value &result);
+);
+
#endif /* __TREX_RPC_CMD_H__ */
diff --git a/src/rpc-server/trex_rpc_cmds_table.cpp b/src/rpc-server/trex_rpc_cmds_table.cpp
index 94a3e1b9..2af9f4f5 100644
--- a/src/rpc-server/trex_rpc_cmds_table.cpp
+++ b/src/rpc-server/trex_rpc_cmds_table.cpp
@@ -75,8 +75,11 @@ TrexRpcCommandsTable::TrexRpcCommandsTable() {
register_command(new TrexRpcCmdSetRxFeature());
register_command(new TrexRpcCmdGetRxQueuePkts());
+ register_command(new TrexRpcCmdSetServiceMode());
register_command(new TrexRpcCmdSetL2());
register_command(new TrexRpcCmdSetL3());
+
+ register_command(new TrexRpcCmdCapture());
}
diff --git a/src/stateless/common/trex_stateless_pkt.cpp b/src/stateless/common/trex_stateless_pkt.cpp
new file mode 100644
index 00000000..f7d47ec0
--- /dev/null
+++ b/src/stateless/common/trex_stateless_pkt.cpp
@@ -0,0 +1,182 @@
+/*
+ Itay Marom
+ Cisco Systems, Inc.
+*/
+
+/*
+ Copyright (c) 2016-2016 Cisco Systems, Inc.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+#include "trex_stateless_pkt.h"
+#include <assert.h>
+
+
+/**
+ * copy MBUF to a flat buffer
+ *
+ * @author imarom (12/20/2016)
+ *
+ * @param dest - buffer with at least rte_pktmbuf_pkt_len(m)
+ * bytes
+ * @param m - MBUF to copy
+ *
+ * @return uint8_t*
+ */
+void copy_mbuf(uint8_t *dest, const rte_mbuf_t *m) {
+
+ int index = 0;
+ for (const rte_mbuf_t *it = m; it != NULL; it = it->next) {
+ const uint8_t *src = rte_pktmbuf_mtod(it, const uint8_t *);
+ memcpy(dest + index, src, it->data_len);
+ index += it->data_len;
+ }
+}
+
+/**************************************
+ * TRex packet
+ *
+ *************************************/
+TrexPkt::TrexPkt(const rte_mbuf_t *m, int port, origin_e origin, uint64_t index) {
+
+ /* allocate buffer */
+ m_size = m->pkt_len;
+ m_raw = new uint8_t[m_size];
+
+ /* copy data */
+ copy_mbuf(m_raw, m);
+
+ /* generate a packet timestamp */
+ m_timestamp = now_sec();
+
+ m_port = port;
+ m_origin = origin;
+ m_index = index;
+}
+
+TrexPkt::TrexPkt(const TrexPkt &other) {
+ m_size = other.m_size;
+ memcpy(m_raw, other.m_raw, m_size);
+
+ m_timestamp = other.m_timestamp;
+
+ m_port = other.m_port;
+ m_origin = other.m_origin;
+ m_index = other.m_index;
+}
+
+TrexPktBuffer::TrexPktBuffer(uint64_t size, mode_e mode) {
+ m_mode = mode;
+ m_buffer = nullptr;
+ m_head = 0;
+ m_tail = 0;
+ m_bytes = 0;
+ m_size = (size + 1); // for the empty/full difference 1 slot reserved
+
+ /* generate queue */
+ m_buffer = new const TrexPkt*[m_size](); // zeroed
+}
+
+TrexPktBuffer::~TrexPktBuffer() {
+ assert(m_buffer);
+
+ while (!is_empty()) {
+ const TrexPkt *pkt = pop();
+ delete pkt;
+ }
+ delete [] m_buffer;
+}
+
+/**
+ * packet will be copied to an internal object
+ */
+void
+TrexPktBuffer::push(const rte_mbuf_t *m, int port, TrexPkt::origin_e origin, uint64_t pkt_index) {
+
+ /* if full - decide by the policy */
+ if (is_full()) {
+ if (m_mode == MODE_DROP_HEAD) {
+ delete pop();
+ } else {
+ /* drop the tail (current packet) */
+ return;
+ }
+ }
+
+ /* push packet */
+ m_buffer[m_head] = new TrexPkt(m, port, origin, pkt_index);
+ m_bytes += m_buffer[m_head]->get_size();
+
+ m_head = next(m_head);
+
+}
+
+/**
+ * packet will be handled internally
+ */
+void
+TrexPktBuffer::push(const TrexPkt *pkt) {
+ /* if full - decide by the policy */
+ if (is_full()) {
+ if (m_mode == MODE_DROP_HEAD) {
+ delete pop();
+ } else {
+ /* drop the tail (current packet) */
+ delete pkt;
+ return;
+ }
+ }
+
+ /* push packet */
+ m_buffer[m_head] = pkt;
+ m_head = next(m_head);
+}
+
+const TrexPkt *
+TrexPktBuffer::pop() {
+ assert(!is_empty());
+
+ const TrexPkt *pkt = m_buffer[m_tail];
+ m_tail = next(m_tail);
+
+ m_bytes -= pkt->get_size();
+
+ return pkt;
+}
+
+uint32_t
+TrexPktBuffer::get_element_count() const {
+ if (m_head >= m_tail) {
+ return (m_head - m_tail);
+ } else {
+ return ( get_capacity() - (m_tail - m_head - 1) );
+ }
+}
+
+Json::Value
+TrexPktBuffer::to_json() const {
+
+ Json::Value output = Json::arrayValue;
+
+ int tmp = m_tail;
+ while (tmp != m_head) {
+ const TrexPkt *pkt = m_buffer[tmp];
+ output.append(pkt->to_json());
+ tmp = next(tmp);
+ }
+
+ return output;
+}
+
+
diff --git a/src/stateless/common/trex_stateless_pkt.h b/src/stateless/common/trex_stateless_pkt.h
new file mode 100644
index 00000000..1b6bd2f8
--- /dev/null
+++ b/src/stateless/common/trex_stateless_pkt.h
@@ -0,0 +1,201 @@
+/*
+ Itay Marom
+ Cisco Systems, Inc.
+*/
+
+/*
+ Copyright (c) 2016-2016 Cisco Systems, Inc.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+#ifndef __TREX_STATELESS_PKT_H__
+#define __TREX_STATELESS_PKT_H__
+
+#include <stdint.h>
+#include <json/json.h>
+#include "mbuf.h"
+#include "common/base64.h"
+#include "os_time.h"
+
+
+/**
+ * copies MBUF to a flat buffer
+ *
+ * @author imarom (1/1/2017)
+ *
+ * @param dest
+ * @param m
+ */
+void copy_mbuf(uint8_t *dest, const rte_mbuf_t *m);
+
+/**
+ * describes a single saved packet
+ *
+ */
+class TrexPkt {
+public:
+
+ enum origin_e {
+ ORIGIN_NONE = 1,
+ ORIGIN_TX,
+ ORIGIN_RX
+ };
+
+ TrexPkt(const rte_mbuf_t *m, int port = -1, origin_e origin = ORIGIN_NONE, uint64_t index = 0);
+ TrexPkt(const TrexPkt &other);
+
+ void set_index(uint64_t index) {
+ m_index = index;
+ }
+
+ /* slow path and also RVO - pass by value is ok */
+ Json::Value to_json() const {
+ Json::Value output;
+ output["ts"] = m_timestamp;
+ output["binary"] = base64_encode(m_raw, m_size);
+ output["port"] = m_port;
+ output["index"] = Json::UInt64(m_index);
+
+ switch (m_origin) {
+ case ORIGIN_TX:
+ output["origin"] = "TX";
+ break;
+ case ORIGIN_RX:
+ output["origin"] = "RX";
+ break;
+ default:
+ output["origin"] = "NONE";
+ break;
+ }
+
+ return output;
+ }
+
+ ~TrexPkt() {
+ if (m_raw) {
+ delete [] m_raw;
+ }
+ }
+
+ origin_e get_origin() const {
+ return m_origin;
+ }
+
+ int get_port() const {
+ return m_port;
+ }
+
+ uint16_t get_size() const {
+ return m_size;
+ }
+
+ dsec_t get_ts() const {
+ return m_timestamp;
+ }
+
+private:
+
+ uint8_t *m_raw;
+ uint16_t m_size;
+ dsec_t m_timestamp;
+ origin_e m_origin;
+ int m_port;
+ uint64_t m_index;
+};
+
+
+class TrexPktBuffer {
+public:
+
+ /**
+ * two modes for operations:
+ *
+ * MODE_DROP_HEAD - when the buffer is full, packets will be
+ * dropped from the head (the oldest packet)
+ *
+ * MODE_DROP_TAIL - when the buffer is full, packets will be
+ * dropped from the tail (the current packet)
+ */
+ enum mode_e {
+ MODE_DROP_HEAD = 1,
+ MODE_DROP_TAIL = 2,
+ };
+
+ TrexPktBuffer(uint64_t size, mode_e mode = MODE_DROP_TAIL);
+ ~TrexPktBuffer();
+
+ /**
+ * push a packet to the buffer
+ *
+ */
+ void push(const rte_mbuf_t *m, int port = -1, TrexPkt::origin_e origin = TrexPkt::ORIGIN_NONE, uint64_t pkt_index = 0);
+ void push(const TrexPkt *pkt);
+
+ /**
+ * pops a packet from the buffer
+ * usually for internal usage
+ */
+ const TrexPkt * pop();
+
+ /**
+ * generate a JSON output of the queue
+ *
+ */
+ Json::Value to_json() const;
+
+
+ bool is_empty() const {
+ return (m_head == m_tail);
+ }
+
+ bool is_full() const {
+ return ( next(m_head) == m_tail);
+ }
+
+ /**
+ * return the total amount of space possible
+ */
+ uint32_t get_capacity() const {
+ /* one slot is used for diff between full/empty */
+ return (m_size - 1);
+ }
+
+ mode_e get_mode() const {
+ return m_mode;
+ }
+
+ /**
+ * returns how many elements are in the queue
+ */
+ uint32_t get_element_count() const;
+
+ uint32_t get_bytes() const {
+ return m_bytes;
+ }
+
+private:
+ int next(int v) const {
+ return ( (v + 1) % m_size );
+ }
+
+ mode_e m_mode;
+ int m_head;
+ int m_tail;
+ int m_size;
+ uint32_t m_bytes;
+ const TrexPkt **m_buffer;
+};
+
+
+#endif /* __TREX_STATELESS_PKT_H__*/
diff --git a/src/stateless/cp/trex_stateless.cpp b/src/stateless/cp/trex_stateless.cpp
index c31ba0a5..6ab9b417 100644
--- a/src/stateless/cp/trex_stateless.cpp
+++ b/src/stateless/cp/trex_stateless.cpp
@@ -18,13 +18,14 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
-#include <trex_stateless.h>
-#include <trex_stateless_port.h>
-#include <sched.h>
#include <iostream>
#include <unistd.h>
+#include "trex_stateless.h"
+#include "trex_stateless_port.h"
+#include "trex_stateless_messaging.h"
+
using namespace std;
/***********************************************************
@@ -141,53 +142,10 @@ TrexStateless::get_dp_core_count() {
}
void
-TrexStateless::encode_stats(Json::Value &global) {
-
- TrexPlatformGlobalStats stats;
- m_platform_api->get_global_stats(stats);
-
- global["cpu_util"] = stats.m_stats.m_cpu_util;
- global["rx_cpu_util"] = stats.m_stats.m_rx_cpu_util;
-
- global["tx_bps"] = stats.m_stats.m_tx_bps;
- global["rx_bps"] = stats.m_stats.m_rx_bps;
-
- global["tx_pps"] = stats.m_stats.m_tx_pps;
- global["rx_pps"] = stats.m_stats.m_rx_pps;
-
- global["total_tx_pkts"] = Json::Value::UInt64(stats.m_stats.m_total_tx_pkts);
- global["total_rx_pkts"] = Json::Value::UInt64(stats.m_stats.m_total_rx_pkts);
-
- global["total_tx_bytes"] = Json::Value::UInt64(stats.m_stats.m_total_tx_bytes);
- global["total_rx_bytes"] = Json::Value::UInt64(stats.m_stats.m_total_rx_bytes);
-
- global["tx_rx_errors"] = Json::Value::UInt64(stats.m_stats.m_tx_rx_errors);
-
- for (uint8_t i = 0; i < m_port_count; i++) {
- std::stringstream ss;
-
- ss << "port " << i;
- Json::Value &port_section = global[ss.str()];
+TrexStateless::send_msg_to_rx(TrexStatelessCpToRxMsgBase *msg) const {
- m_ports[i]->encode_stats(port_section);
- }
+ CNodeRing *ring = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0);
+ ring->Enqueue((CGenNode *)msg);
}
-/**
- * generate a snapshot for publish (async publish)
- *
- */
-void
-TrexStateless::generate_publish_snapshot(std::string &snapshot) {
- Json::FastWriter writer;
- Json::Value root;
-
- root["name"] = "trex-stateless-info";
- root["type"] = 0;
-
- /* stateless specific info goes here */
- root["data"] = Json::nullValue;
-
- snapshot = writer.write(root);
-}
diff --git a/src/stateless/cp/trex_stateless.h b/src/stateless/cp/trex_stateless.h
index 3a1a2c24..87d227f6 100644
--- a/src/stateless/cp/trex_stateless.h
+++ b/src/stateless/cp/trex_stateless.h
@@ -132,32 +132,14 @@ public:
/**
- * shutdown the server
+ * send a message to the RX core
*/
- void shutdown();
-
+ void send_msg_to_rx(TrexStatelessCpToRxMsgBase *msg) const;
+
/**
- * fetch xstats names (keys of dict)
- *
- */
- void encode_xstats_names(Json::Value &global);
-
- /**
- * fetch xstats values
- *
- */
- void encode_xstats_values(Json::Value &global);
-
- /**
- * fetch all the stats
- *
- */
- void encode_stats(Json::Value &global);
-
- /**
- * generate a snapshot for publish
+ * shutdown the server
*/
- void generate_publish_snapshot(std::string &snapshot);
+ void shutdown();
const TrexPlatformApi * get_platform_api() {
return (m_platform_api);
diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp
index 7d331c6e..9cf048b0 100644
--- a/src/stateless/cp/trex_stateless_port.cpp
+++ b/src/stateless/cp/trex_stateless_port.cpp
@@ -162,7 +162,7 @@ private:
* trex stateless port
*
**************************/
-TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api) : m_dp_events(this) {
+TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api) : m_dp_events(this), m_service_mode(port_id, api) {
std::vector<std::pair<uint8_t, uint8_t>> core_pair_list;
m_port_id = port_id;
@@ -948,24 +948,6 @@ TrexStatelessPort::remove_and_delete_all_streams() {
}
}
-void
-TrexStatelessPort::start_rx_capture(const std::string &pcap_filename, uint64_t limit) {
- static MsgReply<bool> reply;
-
- reply.reset();
-
- TrexStatelessRxStartCapture *msg = new TrexStatelessRxStartCapture(m_port_id, pcap_filename, limit, reply);
- send_message_to_rx((TrexStatelessCpToRxMsgBase *)msg);
-
- /* as below, must wait for ACK from RX core before returning ACK */
- reply.wait_for_reply();
-}
-
-void
-TrexStatelessPort::stop_rx_capture() {
- TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStopCapture(m_port_id);
- send_message_to_rx(msg);
-}
void
TrexStatelessPort::start_rx_queue(uint64_t size) {
@@ -980,18 +962,22 @@ TrexStatelessPort::start_rx_queue(uint64_t size) {
this might cause the user to lose some packets from the queue
*/
reply.wait_for_reply();
+
+ m_service_mode.set_rx_queue();
}
void
TrexStatelessPort::stop_rx_queue() {
TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStopQueue(m_port_id);
send_message_to_rx(msg);
+
+ m_service_mode.unset_rx_queue();
}
-const RXPacketBuffer *
+const TrexPktBuffer *
TrexStatelessPort::get_rx_queue_pkts() {
- static MsgReply<const RXPacketBuffer *> reply;
+ static MsgReply<const TrexPktBuffer *> reply;
reply.reset();
diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h
index d4ac4018..0ef8ae60 100644
--- a/src/stateless/cp/trex_stateless_port.h
+++ b/src/stateless/cp/trex_stateless_port.h
@@ -26,12 +26,14 @@ limitations under the License.
#include "trex_dp_port_events.h"
#include "trex_stateless_rx_defs.h"
#include "trex_stream.h"
+#include "trex_exception.h"
+#include "trex_stateless_capture.h"
class TrexStatelessCpToDpMsgBase;
class TrexStatelessCpToRxMsgBase;
class TrexStreamsGraphObj;
class TrexPortMultiplier;
-class RXPacketBuffer;
+class TrexPktBuffer;
/**
@@ -113,6 +115,56 @@ private:
static const std::string g_unowned_handler;
};
+/**
+ * enforces in/out from service mode
+ *
+ * @author imarom (1/4/2017)
+ */
+class TrexServiceMode {
+public:
+ TrexServiceMode(uint8_t port_id, const TrexPlatformApi *api) {
+ m_is_enabled = false;
+ m_has_rx_queue = false;
+ m_port_id = port_id;
+ m_port_attr = api->getPortAttrObj(port_id);
+ }
+
+ void enable() {
+ m_port_attr->set_rx_filter_mode(RX_FILTER_MODE_ALL);
+ m_is_enabled = true;
+ }
+
+ void disable() {
+ if (m_has_rx_queue) {
+ throw TrexException("unable to disable service mode - please remove RX queue");
+ }
+
+ if (TrexStatelessCaptureMngr::getInstance().is_active(m_port_id)) {
+ throw TrexException("unable to disable service mode - an active capture on port " + std::to_string(m_port_id) + " exists");
+ }
+
+ m_port_attr->set_rx_filter_mode(RX_FILTER_MODE_HW);
+ m_is_enabled = false;
+ }
+
+ bool is_enabled() const {
+ return m_is_enabled;
+ }
+
+ void set_rx_queue() {
+ m_has_rx_queue = true;
+ }
+
+ void unset_rx_queue() {
+ m_has_rx_queue = false;
+ }
+
+private:
+ bool m_is_enabled;
+ bool m_has_rx_queue;
+ TRexPortAttr *m_port_attr;
+ uint8_t m_port_id;
+};
class AsyncStopEvent;
@@ -150,7 +202,15 @@ public:
RC_ERR_FAILED_TO_COMPILE_STREAMS
};
-
+ /**
+ * port capture mode
+ */
+ enum capture_mode_e {
+ PORT_CAPTURE_NONE = 0,
+ PORT_CAPTURE_RX,
+ PORT_CAPTURE_ALL
+ };
+
TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api);
~TrexStatelessPort();
@@ -227,6 +287,20 @@ public:
double duration,
bool is_dual);
+ /**
+ * moves port to / out service mode
+ */
+ void set_service_mode(bool enabled) {
+ if (enabled) {
+ m_service_mode.enable();
+ } else {
+ m_service_mode.disable();
+ }
+ }
+ bool is_service_mode_on() const {
+ return m_service_mode.is_enabled();
+ }
+
/**
* get the port state
*
@@ -365,19 +439,6 @@ public:
void get_pci_info(std::string &pci_addr, int &numa_node);
-
- /**
- * enable RX capture on port
- *
- */
- void start_rx_capture(const std::string &pcap_filename, uint64_t limit);
-
- /**
- * disable RX capture if on
- *
- */
- void stop_rx_capture();
-
/**
* start RX queueing of packets
*
@@ -398,7 +459,7 @@ public:
* fetch the RX queue packets from the queue
*
*/
- const RXPacketBuffer *get_rx_queue_pkts();
+ const TrexPktBuffer *get_rx_queue_pkts();
/**
* configures port for L2 mode
@@ -429,7 +490,9 @@ public:
}
private:
-
+ void set_service_mode_on();
+ void set_service_mode_off();
+
bool is_core_active(int core_id);
const std::vector<uint8_t> get_core_id_list () {
@@ -514,6 +577,9 @@ private:
TrexPortOwner m_owner;
int m_pending_async_stop_event;
+
+ TrexServiceMode m_service_mode;
+
static const uint32_t MAX_STREAMS = 20000;
};
diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp
index aeb1e677..2452487c 100644
--- a/src/stateless/messaging/trex_stateless_messaging.cpp
+++ b/src/stateless/messaging/trex_stateless_messaging.cpp
@@ -262,23 +262,72 @@ bool TrexStatelessRxQuit::handle (CRxCoreStateless *rx_core) {
bool
-TrexStatelessRxStartCapture::handle(CRxCoreStateless *rx_core) {
- rx_core->start_recorder(m_port_id, m_pcap_filename, m_limit);
+TrexStatelessRxCaptureStart::handle(CRxCoreStateless *rx_core) {
+
+ TrexCaptureRCStart start_rc;
+
+ TrexStatelessCaptureMngr::getInstance().start(m_filter, m_limit, start_rc);
+
+ /* mark as done */
+ m_reply.set_reply(start_rc);
+
+ return true;
+}
+bool
+TrexStatelessRxCaptureStop::handle(CRxCoreStateless *rx_core) {
+
+ TrexCaptureRCStop stop_rc;
+
+ TrexStatelessCaptureMngr::getInstance().stop(m_capture_id, stop_rc);
+
/* mark as done */
- m_reply.set_reply(true);
+ m_reply.set_reply(stop_rc);
return true;
}
bool
-TrexStatelessRxStopCapture::handle(CRxCoreStateless *rx_core) {
- rx_core->stop_recorder(m_port_id);
+TrexStatelessRxCaptureFetch::handle(CRxCoreStateless *rx_core) {
+
+ TrexCaptureRCFetch fetch_rc;
+
+ TrexStatelessCaptureMngr::getInstance().fetch(m_capture_id, m_pkt_limit, fetch_rc);
+
+ /* mark as done */
+ m_reply.set_reply(fetch_rc);
+
+ return true;
+}
+bool
+TrexStatelessRxCaptureStatus::handle(CRxCoreStateless *rx_core) {
+
+ TrexCaptureRCStatus status_rc;
+
+ status_rc.set_status(TrexStatelessCaptureMngr::getInstance().to_json());
+
+ /* mark as done */
+ m_reply.set_reply(status_rc);
+
return true;
}
bool
+TrexStatelessRxCaptureRemove::handle(CRxCoreStateless *rx_core) {
+
+ TrexCaptureRCRemove remove_rc;
+
+ TrexStatelessCaptureMngr::getInstance().remove(m_capture_id, remove_rc);
+
+ /* mark as done */
+ m_reply.set_reply(remove_rc);
+
+ return true;
+}
+
+
+bool
TrexStatelessRxStartQueue::handle(CRxCoreStateless *rx_core) {
rx_core->start_queue(m_port_id, m_size);
@@ -299,7 +348,7 @@ TrexStatelessRxStopQueue::handle(CRxCoreStateless *rx_core) {
bool
TrexStatelessRxQueueGetPkts::handle(CRxCoreStateless *rx_core) {
- const RXPacketBuffer *pkt_buffer = rx_core->get_rx_queue_pkts(m_port_id);
+ const TrexPktBuffer *pkt_buffer = rx_core->get_rx_queue_pkts(m_port_id);
/* set the reply */
m_reply.set_reply(pkt_buffer);
diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h
index 72b92d11..3535ad4f 100644
--- a/src/stateless/messaging/trex_stateless_messaging.h
+++ b/src/stateless/messaging/trex_stateless_messaging.h
@@ -28,12 +28,13 @@ limitations under the License.
#include "trex_stateless_rx_defs.h"
#include "os_time.h"
#include "utl_ip.h"
+#include "trex_stateless_capture.h"
class TrexStatelessDpCore;
class CRxCoreStateless;
class TrexStreamsCompiledObj;
class CFlowGenListPerThread;
-class RXPacketBuffer;
+class TrexPktBuffer;
/**
* Generic message reply object
@@ -484,39 +485,85 @@ class TrexStatelessRxQuit : public TrexStatelessCpToRxMsgBase {
};
+class TrexStatelessRxCapture : public TrexStatelessCpToRxMsgBase {
+public:
+ virtual bool handle (CRxCoreStateless *rx_core) = 0;
+};
-class TrexStatelessRxStartCapture : public TrexStatelessCpToRxMsgBase {
+class TrexStatelessRxCaptureStart : public TrexStatelessRxCapture {
public:
- TrexStatelessRxStartCapture(uint8_t port_id,
- const std::string &pcap_filename,
+ TrexStatelessRxCaptureStart(const CaptureFilter& filter,
uint64_t limit,
- MsgReply<bool> &reply) : m_reply(reply) {
+ MsgReply<TrexCaptureRCStart> &reply) : m_reply(reply) {
- m_port_id = port_id;
- m_limit = limit;
- m_pcap_filename = pcap_filename;
+ m_limit = limit;
+ m_filter = filter;
}
virtual bool handle(CRxCoreStateless *rx_core);
private:
- uint8_t m_port_id;
- std::string m_pcap_filename;
- uint64_t m_limit;
- MsgReply<bool> &m_reply;
+ uint8_t m_port_id;
+ uint64_t m_limit;
+ CaptureFilter m_filter;
+ MsgReply<TrexCaptureRCStart> &m_reply;
};
-class TrexStatelessRxStopCapture : public TrexStatelessCpToRxMsgBase {
+class TrexStatelessRxCaptureStop : public TrexStatelessRxCapture {
public:
- TrexStatelessRxStopCapture(uint8_t port_id) {
- m_port_id = port_id;
+ TrexStatelessRxCaptureStop(capture_id_t capture_id, MsgReply<TrexCaptureRCStop> &reply) : m_reply(reply) {
+ m_capture_id = capture_id;
}
virtual bool handle(CRxCoreStateless *rx_core);
private:
- uint8_t m_port_id;
+ capture_id_t m_capture_id;
+ MsgReply<TrexCaptureRCStop> &m_reply;
+};
+
+
+class TrexStatelessRxCaptureFetch : public TrexStatelessRxCapture {
+public:
+ TrexStatelessRxCaptureFetch(capture_id_t capture_id, uint32_t pkt_limit, MsgReply<TrexCaptureRCFetch> &reply) : m_reply(reply) {
+ m_capture_id = capture_id;
+ m_pkt_limit = pkt_limit;
+ }
+
+ virtual bool handle(CRxCoreStateless *rx_core);
+
+private:
+ capture_id_t m_capture_id;
+ uint32_t m_pkt_limit;
+ MsgReply<TrexCaptureRCFetch> &m_reply;
+};
+
+
+class TrexStatelessRxCaptureStatus : public TrexStatelessRxCapture {
+public:
+ TrexStatelessRxCaptureStatus(MsgReply<TrexCaptureRCStatus> &reply) : m_reply(reply) {
+ }
+
+ virtual bool handle(CRxCoreStateless *rx_core);
+
+private:
+ MsgReply<TrexCaptureRCStatus> &m_reply;
+};
+
+
+
+class TrexStatelessRxCaptureRemove : public TrexStatelessRxCapture {
+public:
+ TrexStatelessRxCaptureRemove(capture_id_t capture_id, MsgReply<TrexCaptureRCRemove> &reply) : m_reply(reply) {
+ m_capture_id = capture_id;
+ }
+
+ virtual bool handle(CRxCoreStateless *rx_core);
+
+private:
+ capture_id_t m_capture_id;
+ MsgReply<TrexCaptureRCRemove> &m_reply;
};
@@ -556,7 +603,7 @@ private:
class TrexStatelessRxQueueGetPkts : public TrexStatelessCpToRxMsgBase {
public:
- TrexStatelessRxQueueGetPkts(uint8_t port_id, MsgReply<const RXPacketBuffer *> &reply) : m_reply(reply) {
+ TrexStatelessRxQueueGetPkts(uint8_t port_id, MsgReply<const TrexPktBuffer *> &reply) : m_reply(reply) {
m_port_id = port_id;
}
@@ -568,7 +615,7 @@ public:
private:
uint8_t m_port_id;
- MsgReply<const RXPacketBuffer *> &m_reply;
+ MsgReply<const TrexPktBuffer *> &m_reply;
};
diff --git a/src/stateless/rx/trex_stateless_capture.cpp b/src/stateless/rx/trex_stateless_capture.cpp
new file mode 100644
index 00000000..f0d4e806
--- /dev/null
+++ b/src/stateless/rx/trex_stateless_capture.cpp
@@ -0,0 +1,258 @@
+/*
+ Itay Marom
+ Cisco Systems, Inc.
+*/
+
+/*
+Copyright (c) 2015-2016 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+#include "trex_stateless_capture.h"
+#include "trex_exception.h"
+
+TrexStatelessCapture::TrexStatelessCapture(capture_id_t id, uint64_t limit, const CaptureFilter &filter) {
+ m_id = id;
+ m_pkt_buffer = new TrexPktBuffer(limit, TrexPktBuffer::MODE_DROP_TAIL);
+ m_filter = filter;
+ m_state = STATE_ACTIVE;
+ m_start_ts = now_sec();
+ m_pkt_index = 0;
+}
+
+TrexStatelessCapture::~TrexStatelessCapture() {
+ if (m_pkt_buffer) {
+ delete m_pkt_buffer;
+ }
+}
+
+void
+TrexStatelessCapture::handle_pkt_tx(TrexPkt *pkt) {
+
+ if (m_state != STATE_ACTIVE) {
+ delete pkt;
+ return;
+ }
+
+ /* if not in filter - back off */
+ if (!m_filter.in_filter(pkt)) {
+ delete pkt;
+ return;
+ }
+
+ if (pkt->get_ts() < m_start_ts) {
+ delete pkt;
+ return;
+ }
+
+ pkt->set_index(++m_pkt_index);
+ m_pkt_buffer->push(pkt);
+}
+
+void
+TrexStatelessCapture::handle_pkt_rx(const rte_mbuf_t *m, int port) {
+
+ if (m_state != STATE_ACTIVE) {
+ return;
+ }
+
+ if (!m_filter.in_rx(port)) {
+ return;
+ }
+
+ m_pkt_buffer->push(m, port, TrexPkt::ORIGIN_RX, ++m_pkt_index);
+}
+
+
+Json::Value
+TrexStatelessCapture::to_json() const {
+ Json::Value output = Json::objectValue;
+
+ output["id"] = Json::UInt64(m_id);
+ output["filter"] = m_filter.to_json();
+ output["count"] = m_pkt_buffer->get_element_count();
+ output["bytes"] = m_pkt_buffer->get_bytes();
+ output["limit"] = m_pkt_buffer->get_capacity();
+
+ switch (m_state) {
+ case STATE_ACTIVE:
+ output["state"] = "ACTIVE";
+ break;
+
+ case STATE_STOPPED:
+ output["state"] = "STOPPED";
+ break;
+
+ default:
+ assert(0);
+ }
+
+ return output;
+}
+
+TrexPktBuffer *
+TrexStatelessCapture::fetch(uint32_t pkt_limit, uint32_t &pending) {
+
+ /* if the total sum of packets is within the limit range - take it */
+ if (m_pkt_buffer->get_element_count() <= pkt_limit) {
+ TrexPktBuffer *current = m_pkt_buffer;
+ m_pkt_buffer = new TrexPktBuffer(m_pkt_buffer->get_capacity(), m_pkt_buffer->get_mode());
+ pending = 0;
+ return current;
+ }
+
+ /* harder part - partial fetch */
+ TrexPktBuffer *partial = new TrexPktBuffer(pkt_limit);
+ for (int i = 0; i < pkt_limit; i++) {
+ const TrexPkt *pkt = m_pkt_buffer->pop();
+ partial->push(pkt);
+ }
+
+ pending = m_pkt_buffer->get_element_count();
+
+ return partial;
+}
+
+void
+TrexStatelessCaptureMngr::update_global_filter() {
+ CaptureFilter new_filter;
+
+ for (TrexStatelessCapture *capture : m_captures) {
+ new_filter += capture->get_filter();
+ }
+
+ m_global_filter = new_filter;
+}
+
+TrexStatelessCapture *
+TrexStatelessCaptureMngr::lookup(capture_id_t capture_id) {
+
+ for (int i = 0; i < m_captures.size(); i++) {
+ if (m_captures[i]->get_id() == capture_id) {
+ return m_captures[i];
+ }
+ }
+
+ /* does not exist */
+ return nullptr;
+}
+
+void
+TrexStatelessCaptureMngr::start(const CaptureFilter &filter, uint64_t limit, TrexCaptureRCStart &rc) {
+
+ if (m_captures.size() > MAX_CAPTURE_SIZE) {
+ rc.set_err(TrexCaptureRC::RC_CAPTURE_LIMIT_REACHED);
+ return;
+ }
+
+
+ int new_id = m_id_counter++;
+ TrexStatelessCapture *new_buffer = new TrexStatelessCapture(new_id, limit, filter);
+ m_captures.push_back(new_buffer);
+
+ /* update global filter */
+ update_global_filter();
+
+ /* result */
+ rc.set_new_id(new_id);
+}
+
+void
+TrexStatelessCaptureMngr::stop(capture_id_t capture_id, TrexCaptureRCStop &rc) {
+ TrexStatelessCapture *capture = lookup(capture_id);
+ if (!capture) {
+ rc.set_err(TrexCaptureRC::RC_CAPTURE_NOT_FOUND);
+ return;
+ }
+
+ capture->stop();
+ rc.set_count(capture->get_pkt_count());
+}
+
+void
+TrexStatelessCaptureMngr::fetch(capture_id_t capture_id, uint32_t pkt_limit, TrexCaptureRCFetch &rc) {
+ TrexStatelessCapture *capture = lookup(capture_id);
+ if (!capture) {
+ rc.set_err(TrexCaptureRC::RC_CAPTURE_NOT_FOUND);
+ return;
+ }
+
+ uint32_t pending = 0;
+ TrexPktBuffer *pkt_buffer = capture->fetch(pkt_limit, pending);
+
+ rc.set_pkt_buffer(pkt_buffer, pending, capture->get_start_ts());
+}
+
+void
+TrexStatelessCaptureMngr::remove(capture_id_t capture_id, TrexCaptureRCRemove &rc) {
+
+ int index = -1;
+ for (int i = 0; i < m_captures.size(); i++) {
+ if (m_captures[i]->get_id() == capture_id) {
+ index = i;
+ break;
+ }
+ }
+
+ /* does not exist */
+ if (index == -1) {
+ rc.set_err(TrexCaptureRC::RC_CAPTURE_NOT_FOUND);
+ return;
+ }
+
+ TrexStatelessCapture *capture = m_captures[index];
+ m_captures.erase(m_captures.begin() + index);
+
+ /* free memory */
+ delete capture;
+
+ /* update global filter */
+ update_global_filter();
+
+ rc.set_ok();
+}
+
+void
+TrexStatelessCaptureMngr::reset() {
+ TrexCaptureRCRemove dummy;
+
+ while (m_captures.size() > 0) {
+ remove(m_captures[0]->get_id(), dummy);
+ }
+}
+
+void
+TrexStatelessCaptureMngr::handle_pkt_tx(TrexPkt *pkt) {
+ for (TrexStatelessCapture *capture : m_captures) {
+ capture->handle_pkt_tx(pkt);
+ }
+}
+
+void
+TrexStatelessCaptureMngr::handle_pkt_rx_slow_path(const rte_mbuf_t *m, int port) {
+ for (TrexStatelessCapture *capture : m_captures) {
+ capture->handle_pkt_rx(m, port);
+ }
+}
+
+Json::Value
+TrexStatelessCaptureMngr::to_json() const {
+ Json::Value lst = Json::arrayValue;
+
+ for (TrexStatelessCapture *capture : m_captures) {
+ lst.append(capture->to_json());
+ }
+
+ return lst;
+}
+
diff --git a/src/stateless/rx/trex_stateless_capture.h b/src/stateless/rx/trex_stateless_capture.h
new file mode 100644
index 00000000..bc1b88c5
--- /dev/null
+++ b/src/stateless/rx/trex_stateless_capture.h
@@ -0,0 +1,397 @@
+/*
+ Itay Marom
+ Cisco Systems, Inc.
+*/
+
+/*
+Copyright (c) 2015-2016 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+#ifndef __TREX_STATELESS_CAPTURE_H__
+#define __TREX_STATELESS_CAPTURE_H__
+
+#include <stdint.h>
+#include <assert.h>
+
+#include "trex_stateless_pkt.h"
+#include "trex_stateless_capture_msg.h"
+
+typedef int32_t capture_id_t;
+
+class TrexCaptureRC {
+public:
+
+ TrexCaptureRC() {
+ m_rc = RC_INVALID;
+ m_pkt_buffer = NULL;
+ }
+
+ enum rc_e {
+ RC_INVALID = 0,
+ RC_OK = 1,
+ RC_CAPTURE_NOT_FOUND,
+ RC_CAPTURE_LIMIT_REACHED,
+ RC_CAPTURE_FETCH_UNDER_ACTIVE
+ };
+
+ bool operator !() const {
+ return (m_rc != RC_OK);
+ }
+
+ std::string get_err() const {
+ assert(m_rc != RC_INVALID);
+
+ switch (m_rc) {
+ case RC_OK:
+ return "";
+ case RC_CAPTURE_LIMIT_REACHED:
+ return "capture limit has reached";
+ case RC_CAPTURE_NOT_FOUND:
+ return "capture ID not found";
+ case RC_CAPTURE_FETCH_UNDER_ACTIVE:
+ return "fetch command cannot be executed on an active capture";
+ default:
+ assert(0);
+ }
+ }
+
+ void set_err(rc_e rc) {
+ m_rc = rc;
+ }
+
+ Json::Value get_json() const {
+ return m_json_rc;
+ }
+
+public:
+ rc_e m_rc;
+ capture_id_t m_capture_id;
+ TrexPktBuffer *m_pkt_buffer;
+ Json::Value m_json_rc;
+};
+
+class TrexCaptureRCStart : public TrexCaptureRC {
+public:
+
+ void set_new_id(capture_id_t new_id) {
+ m_capture_id = new_id;
+ m_rc = RC_OK;
+ }
+
+ capture_id_t get_new_id() const {
+ return m_capture_id;
+ }
+
+private:
+ capture_id_t m_capture_id;
+};
+
+
+class TrexCaptureRCStop : public TrexCaptureRC {
+public:
+ void set_count(uint32_t pkt_count) {
+ m_pkt_count = pkt_count;
+ m_rc = RC_OK;
+ }
+
+ uint32_t get_pkt_count() const {
+ return m_pkt_count;
+ }
+
+private:
+ uint32_t m_pkt_count;
+};
+
+class TrexCaptureRCFetch : public TrexCaptureRC {
+public:
+
+ TrexCaptureRCFetch() {
+ m_pkt_buffer = nullptr;
+ m_pending = 0;
+ }
+
+ void set_pkt_buffer(const TrexPktBuffer *pkt_buffer, uint32_t pending, dsec_t start_ts) {
+ m_pkt_buffer = pkt_buffer;
+ m_pending = pending;
+ m_start_ts = start_ts;
+ m_rc = RC_OK;
+ }
+
+ const TrexPktBuffer *get_pkt_buffer() const {
+ return m_pkt_buffer;
+ }
+
+ uint32_t get_pending() const {
+ return m_pending;
+ }
+
+ dsec_t get_start_ts() const {
+ return m_start_ts;
+ }
+
+private:
+ const TrexPktBuffer *m_pkt_buffer;
+ uint32_t m_pending;
+ dsec_t m_start_ts;
+};
+
+class TrexCaptureRCRemove : public TrexCaptureRC {
+public:
+ void set_ok() {
+ m_rc = RC_OK;
+ }
+};
+
+class TrexCaptureRCStatus : public TrexCaptureRC {
+public:
+
+ void set_status(const Json::Value &json) {
+ m_json = json;
+ m_rc = RC_OK;
+ }
+
+ const Json::Value & get_status() const {
+ return m_json;
+ }
+
+private:
+ Json::Value m_json;
+};
+
+/**
+ * capture filter
+ * specify which ports to capture and if TX/RX or both
+ */
+class CaptureFilter {
+public:
+ CaptureFilter() {
+ m_tx_active = 0;
+ m_rx_active = 0;
+ }
+
+ void add_tx(uint8_t port_id) {
+ m_tx_active |= (1LL << port_id);
+ }
+
+ void add_rx(uint8_t port_id) {
+ m_rx_active |= (1LL << port_id);
+ }
+
+ void add(uint8_t port_id) {
+ add_tx(port_id);
+ add_rx(port_id);
+ }
+
+ bool in_filter(const TrexPkt *pkt) {
+ switch (pkt->get_origin()) {
+ case TrexPkt::ORIGIN_TX:
+ return in_tx(pkt->get_port());
+
+ case TrexPkt::ORIGIN_RX:
+ return in_rx(pkt->get_port());
+
+ default:
+ return false;
+ }
+ }
+
+ bool in_rx(uint8_t port_id) const {
+ uint64_t bit = (1LL << port_id);
+ return ((m_rx_active & bit) == bit);
+ }
+
+ bool in_tx(uint8_t port_id) const {
+ uint64_t bit = (1LL << port_id);
+ return ((m_tx_active & bit) == bit);
+ }
+
+ bool in_any(uint8_t port_id) const {
+ return ( in_tx(port_id) || in_rx(port_id) );
+ }
+
+ CaptureFilter& operator +=(const CaptureFilter &other) {
+ m_tx_active |= other.m_tx_active;
+ m_rx_active |= other.m_rx_active;
+
+ return *this;
+ }
+
+ Json::Value to_json() const {
+ Json::Value output = Json::objectValue;
+ output["tx"] = Json::UInt64(m_tx_active);
+ output["rx"] = Json::UInt64(m_rx_active);
+
+ return output;
+ }
+
+private:
+
+ uint64_t m_tx_active;
+ uint64_t m_rx_active;
+};
+
+
+class TrexStatelessCapture {
+public:
+ enum state_e {
+ STATE_ACTIVE,
+ STATE_STOPPED,
+ };
+
+ TrexStatelessCapture(capture_id_t id, uint64_t limit, const CaptureFilter &filter);
+
+ void handle_pkt_tx(TrexPkt *pkt);
+ void handle_pkt_rx(const rte_mbuf_t *m, int port);
+
+ ~TrexStatelessCapture();
+
+ uint64_t get_id() const {
+ return m_id;
+ }
+
+ const CaptureFilter & get_filter() const {
+ return m_filter;
+ }
+
+ Json::Value to_json() const;
+
+ void stop() {
+ m_state = STATE_STOPPED;
+ }
+
+ TrexPktBuffer * fetch(uint32_t pkt_limit, uint32_t &pending);
+
+ bool is_active() const {
+ return m_state == STATE_ACTIVE;
+ }
+
+ uint32_t get_pkt_count() const {
+ return m_pkt_buffer->get_element_count();
+ }
+
+ dsec_t get_start_ts() const {
+ return m_start_ts;
+ }
+
+private:
+ state_e m_state;
+ TrexPktBuffer *m_pkt_buffer;
+ dsec_t m_start_ts;
+ CaptureFilter m_filter;
+ uint64_t m_id;
+ uint64_t m_pkt_index;
+};
+
+class TrexStatelessCaptureMngr {
+
+public:
+
+ static TrexStatelessCaptureMngr& getInstance() {
+ static TrexStatelessCaptureMngr instance;
+
+ return instance;
+ }
+
+
+ ~TrexStatelessCaptureMngr() {
+ reset();
+ }
+
+ /**
+ * starts a new capture
+ */
+ void start(const CaptureFilter &filter, uint64_t limit, TrexCaptureRCStart &rc);
+
+ /**
+ * stops an existing capture
+ *
+ */
+ void stop(capture_id_t capture_id, TrexCaptureRCStop &rc);
+
+ /**
+ * fetch packets from an existing capture
+ *
+ */
+ void fetch(capture_id_t capture_id, uint32_t pkt_limit, TrexCaptureRCFetch &rc);
+
+ /**
+ * removes an existing capture
+ * all packets captured will be detroyed
+ */
+ void remove(capture_id_t capture_id, TrexCaptureRCRemove &rc);
+
+
+ /**
+ * removes all captures
+ *
+ */
+ void reset();
+
+
+ /**
+ * return true if any filter is active
+ *
+ * @author imarom (1/3/2017)
+ *
+ * @return bool
+ */
+ bool is_active(uint8_t port) const {
+ return m_global_filter.in_any(port);
+ }
+
+ /**
+ * handle packet from TX
+ */
+ void handle_pkt_tx(TrexPkt *pkt);
+
+ /**
+ * handle packet from RX
+ */
+ void handle_pkt_rx(const rte_mbuf_t *m, int port) {
+ /* fast path */
+ if (!is_active(port)) {
+ return;
+ }
+
+ /* slow path */
+ handle_pkt_rx_slow_path(m, port);
+ }
+
+ Json::Value to_json() const;
+
+private:
+
+ TrexStatelessCaptureMngr() {
+ /* init this to 1 */
+ m_id_counter = 1;
+ }
+
+
+ TrexStatelessCapture * lookup(capture_id_t capture_id);
+
+ void handle_pkt_rx_slow_path(const rte_mbuf_t *m, int port);
+ void update_global_filter();
+
+ std::vector<TrexStatelessCapture *> m_captures;
+
+ capture_id_t m_id_counter;
+
+ /* a union of all the filters curently active */
+ CaptureFilter m_global_filter;
+
+ static const int MAX_CAPTURE_SIZE = 10;
+};
+
+#endif /* __TREX_STATELESS_CAPTURE_H__ */
+
diff --git a/src/stateless/rx/trex_stateless_rx_core.cpp b/src/stateless/rx/trex_stateless_rx_core.cpp
index d27485de..00c18082 100644
--- a/src/stateless/rx/trex_stateless_rx_core.cpp
+++ b/src/stateless/rx/trex_stateless_rx_core.cpp
@@ -69,11 +69,11 @@ void CRFC2544Info::export_data(rfc2544_info_t_ &obj) {
void CRxCoreStateless::create(const CRxSlCfg &cfg) {
m_capture = false;
m_max_ports = cfg.m_max_ports;
-
+ m_tx_cores = cfg.m_tx_cores;
+
CMessagingManager * cp_rx = CMsgIns::Ins()->getCpRx();
m_ring_from_cp = cp_rx->getRingCpToDp(0);
- m_ring_to_cp = cp_rx->getRingDpToCp(0);
m_state = STATE_IDLE;
for (int i = 0; i < MAX_FLOW_STATS_PAYLOAD; i++) {
@@ -130,6 +130,36 @@ bool CRxCoreStateless::periodic_check_for_cp_messages() {
}
+void
+CRxCoreStateless::periodic_check_for_dp_messages() {
+
+ for (int i = 0; i < m_tx_cores; i++) {
+ periodic_check_for_dp_messages_core(i);
+ }
+
+}
+
+void
+CRxCoreStateless::periodic_check_for_dp_messages_core(uint32_t core_id) {
+
+ CNodeRing *ring = CMsgIns::Ins()->getRxDp()->getRingDpToCp(core_id);
+
+ /* fast path */
+ if ( likely ( ring->isEmpty() ) ) {
+ return;
+ }
+
+ while (true) {
+ CGenNode *node = NULL;
+
+ if (ring->Dequeue(node) != 0) {
+ break;
+ }
+
+ //assert(node);
+ }
+}
+
void CRxCoreStateless::recalculate_next_state() {
if (m_state == STATE_QUIT) {
return;
@@ -176,16 +206,6 @@ void CRxCoreStateless::idle_state_loop() {
}
/**
- * for each port give a tick (for flushing if needed)
- *
- */
-void CRxCoreStateless::port_manager_tick() {
- for (int i = 0; i < m_max_ports; i++) {
- m_rx_port_mngr[i].tick();
- }
-}
-
-/**
* for each port handle the grat ARP mechansim
*
*/
@@ -199,7 +219,6 @@ void CRxCoreStateless::handle_work_stage() {
/* set the next sync time to */
dsec_t sync_time_sec = now_sec() + (1.0 / 1000);
- dsec_t tick_time_sec = now_sec() + 1.0;
dsec_t grat_arp_sec = now_sec() + (double)CGlobalInfo::m_options.m_arp_ref_per;
while (m_state == STATE_WORKING) {
@@ -211,14 +230,10 @@ void CRxCoreStateless::handle_work_stage() {
if ( (now - sync_time_sec) > 0 ) {
periodic_check_for_cp_messages();
+ //periodic_check_for_dp_messages();
sync_time_sec = now + (1.0 / 1000);
}
- if ( (now - tick_time_sec) > 0) {
- port_manager_tick();
- tick_time_sec = now + 1.0;
- }
-
if ( (now - grat_arp_sec) > 0) {
handle_grat_arp();
grat_arp_sec = now + (double)CGlobalInfo::m_options.m_arp_ref_per;
@@ -255,10 +270,6 @@ void CRxCoreStateless::start() {
m_monitor.disable();
}
-void CRxCoreStateless::capture_pkt(rte_mbuf_t *m) {
-
-}
-
int CRxCoreStateless::process_all_pending_pkts(bool flush_rx) {
int total_pkts = 0;
@@ -318,18 +329,6 @@ double CRxCoreStateless::get_cpu_util() {
void
-CRxCoreStateless::start_recorder(uint8_t port_id, const std::string &pcap_filename, uint64_t limit) {
- m_rx_port_mngr[port_id].start_recorder(pcap_filename, limit);
- recalculate_next_state();
-}
-
-void
-CRxCoreStateless::stop_recorder(uint8_t port_id) {
- m_rx_port_mngr[port_id].stop_recorder();
- recalculate_next_state();
-}
-
-void
CRxCoreStateless::start_queue(uint8_t port_id, uint64_t size) {
m_rx_port_mngr[port_id].start_queue(size);
recalculate_next_state();
diff --git a/src/stateless/rx/trex_stateless_rx_core.h b/src/stateless/rx/trex_stateless_rx_core.h
index 4eed59a1..954a5f04 100644
--- a/src/stateless/rx/trex_stateless_rx_core.h
+++ b/src/stateless/rx/trex_stateless_rx_core.h
@@ -27,6 +27,7 @@
#include "pal/linux/sanb_atomic.h"
#include "utl_cpuu.h"
#include "trex_stateless_rx_port_mngr.h"
+#include "trex_stateless_capture.h"
class TrexStatelessCpToRxMsgBase;
@@ -127,17 +128,10 @@ class CRxCoreStateless {
double get_cpu_util();
void update_cpu_util();
- const RXPacketBuffer *get_rx_queue_pkts(uint8_t port_id) {
+ const TrexPktBuffer *get_rx_queue_pkts(uint8_t port_id) {
return m_rx_port_mngr[port_id].get_pkt_buffer();
}
-
- /**
- * start capturing of RX packets on a specific port
- *
- */
- void start_recorder(uint8_t port_id, const std::string &pcap_filename, uint64_t limit);
- void stop_recorder(uint8_t port_id);
-
+
/**
* start RX queueing of packets
*
@@ -162,17 +156,20 @@ class CRxCoreStateless {
private:
void handle_cp_msg(TrexStatelessCpToRxMsgBase *msg);
+
bool periodic_check_for_cp_messages();
+
+ void periodic_check_for_dp_messages();
+ void periodic_check_for_dp_messages_core(uint32_t core_id);
+
void tickle();
void idle_state_loop();
void recalculate_next_state();
bool are_any_features_active();
- void capture_pkt(rte_mbuf_t *m);
void handle_rx_queue_msgs(uint8_t thread_id, CNodeRing * r);
void handle_work_stage();
- void port_manager_tick();
void handle_grat_arp();
int process_all_pending_pkts(bool flush_rx = false);
@@ -186,10 +183,10 @@ class CRxCoreStateless {
private:
TrexMonitor m_monitor;
uint32_t m_max_ports;
+ uint32_t m_tx_cores;
bool m_capture;
state_e m_state;
CNodeRing *m_ring_from_cp;
- CNodeRing *m_ring_to_cp;
CCpuUtlDp m_cpu_dp_u;
CCpuUtlCp m_cpu_cp_u;
diff --git a/src/stateless/rx/trex_stateless_rx_defs.h b/src/stateless/rx/trex_stateless_rx_defs.h
index aefcc133..367cf4e3 100644
--- a/src/stateless/rx/trex_stateless_rx_defs.h
+++ b/src/stateless/rx/trex_stateless_rx_defs.h
@@ -38,10 +38,12 @@ class CRxSlCfg {
m_max_ports = 0;
m_cps = 0.0;
m_num_crc_fix_bytes = 0;
+ m_tx_cores = 0;
}
public:
uint32_t m_max_ports;
+ uint32_t m_tx_cores;
double m_cps;
CPortLatencyHWBase * m_ports[TREX_MAX_PORTS];
uint8_t m_num_crc_fix_bytes;
diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.cpp b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp
index caed2bee..ede86062 100644
--- a/src/stateless/rx/trex_stateless_rx_port_mngr.cpp
+++ b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp
@@ -20,48 +20,10 @@
*/
#include "bp_sim.h"
#include "trex_stateless_rx_port_mngr.h"
-#include "common/captureFile.h"
#include "trex_stateless_rx_core.h"
#include "common/Network/Packet/Arp.h"
#include "pkt_gen.h"
-
-/**
- * copy MBUF to a flat buffer
- *
- * @author imarom (12/20/2016)
- *
- * @param dest - buffer with at least rte_pktmbuf_pkt_len(m)
- * bytes
- * @param m - MBUF to copy
- *
- * @return uint8_t*
- */
-void copy_mbuf(uint8_t *dest, const rte_mbuf_t *m) {
-
- int index = 0;
- for (const rte_mbuf_t *it = m; it != NULL; it = it->next) {
- const uint8_t *src = rte_pktmbuf_mtod(it, const uint8_t *);
- memcpy(dest + index, src, it->data_len);
- index += it->data_len;
- }
-}
-
-/**************************************
- * RX packet
- *
- *************************************/
-RXPacket::RXPacket(const rte_mbuf_t *m) {
-
- /* allocate buffer */
- m_size = m->pkt_len;
- m_raw = new uint8_t[m_size];
-
- /* copy data */
- copy_mbuf(m_raw, m);
-
- /* generate a packet timestamp */
- m_timestamp = now_sec();
-}
+#include "trex_stateless_capture.h"
/**************************************
* latency RX feature
@@ -231,79 +193,12 @@ RXLatency::to_json() const {
*
*************************************/
-RXPacketBuffer::RXPacketBuffer(uint64_t size) {
- m_buffer = nullptr;
- m_head = 0;
- m_tail = 0;
- m_size = (size + 1); // for the empty/full difference 1 slot reserved
-
- /* generate queue */
- m_buffer = new RXPacket*[m_size](); // zeroed
-}
-
-RXPacketBuffer::~RXPacketBuffer() {
- assert(m_buffer);
-
- while (!is_empty()) {
- RXPacket *pkt = pop();
- delete pkt;
- }
- delete [] m_buffer;
-}
-
-void
-RXPacketBuffer::push(const rte_mbuf_t *m) {
- /* if full - pop the oldest */
- if (is_full()) {
- delete pop();
- }
-
- /* push packet */
- m_buffer[m_head] = new RXPacket(m);
- m_head = next(m_head);
-}
-
-RXPacket *
-RXPacketBuffer::pop() {
- assert(!is_empty());
-
- RXPacket *pkt = m_buffer[m_tail];
- m_tail = next(m_tail);
-
- return pkt;
-}
-
-uint64_t
-RXPacketBuffer::get_element_count() const {
- if (m_head >= m_tail) {
- return (m_head - m_tail);
- } else {
- return ( get_capacity() - (m_tail - m_head - 1) );
- }
-}
-
-Json::Value
-RXPacketBuffer::to_json() const {
-
- Json::Value output = Json::arrayValue;
-
- int tmp = m_tail;
- while (tmp != m_head) {
- RXPacket *pkt = m_buffer[tmp];
- output.append(pkt->to_json());
- tmp = next(tmp);
- }
-
- return output;
-}
-
-
void
RXQueue::start(uint64_t size) {
if (m_pkt_buffer) {
delete m_pkt_buffer;
}
- m_pkt_buffer = new RXPacketBuffer(size);
+ m_pkt_buffer = new TrexPktBuffer(size, TrexPktBuffer::MODE_DROP_HEAD);
}
void
@@ -314,7 +209,7 @@ RXQueue::stop() {
}
}
-const RXPacketBuffer *
+const TrexPktBuffer *
RXQueue::fetch() {
/* if no buffer or the buffer is empty - give a NULL one */
@@ -323,10 +218,10 @@ RXQueue::fetch() {
}
/* hold a pointer to the old one */
- RXPacketBuffer *old_buffer = m_pkt_buffer;
+ TrexPktBuffer *old_buffer = m_pkt_buffer;
/* replace the old one with a new one and freeze the old */
- m_pkt_buffer = new RXPacketBuffer(old_buffer->get_capacity());
+ m_pkt_buffer = new TrexPktBuffer(old_buffer->get_capacity(), old_buffer->get_mode());
return old_buffer;
}
@@ -348,97 +243,6 @@ RXQueue::to_json() const {
return output;
}
-/**************************************
- * RX feature recorder
- *
- *************************************/
-
-RXPacketRecorder::RXPacketRecorder() {
- m_writer = NULL;
- m_count = 0;
- m_limit = 0;
- m_epoch = -1;
-
- m_pending_flush = false;
-}
-
-void
-RXPacketRecorder::start(const std::string &pcap, uint64_t limit) {
- m_writer = CCapWriterFactory::CreateWriter(LIBPCAP, (char *)pcap.c_str());
- if (m_writer == NULL) {
- std::stringstream ss;
- ss << "unable to create PCAP file: " << pcap;
- throw TrexException(ss.str());
- }
-
- assert(limit > 0);
-
- m_limit = limit;
- m_count = 0;
- m_pending_flush = false;
- m_pcap_filename = pcap;
-}
-
-void
-RXPacketRecorder::stop() {
- if (!m_writer) {
- return;
- }
-
- delete m_writer;
- m_writer = NULL;
-}
-
-void
-RXPacketRecorder::flush_to_disk() {
-
- if (m_writer && m_pending_flush) {
- m_writer->flush_to_disk();
- m_pending_flush = false;
- }
-}
-
-void
-RXPacketRecorder::handle_pkt(const rte_mbuf_t *m) {
- if (!m_writer) {
- return;
- }
-
- dsec_t now = now_sec();
- if (m_epoch < 0) {
- m_epoch = now;
- }
-
- dsec_t dt = now - m_epoch;
-
- CPktNsecTimeStamp t_c(dt);
- m_pkt.time_nsec = t_c.m_time_nsec;
- m_pkt.time_sec = t_c.m_time_sec;
-
- copy_mbuf((uint8_t *)m_pkt.raw, m);
- m_pkt.pkt_len = m->pkt_len;
-
- m_writer->write_packet(&m_pkt);
- m_count++;
- m_pending_flush = true;
-
- if (m_count == m_limit) {
- stop();
- }
-
-}
-
-Json::Value
-RXPacketRecorder::to_json() const {
- Json::Value output = Json::objectValue;
-
- output["pcap_filename"] = m_pcap_filename;
- output["limit"] = Json::UInt64(m_limit);
- output["count"] = Json::UInt64(m_count);
-
- return output;
-}
-
/**************************************
* RX feature server (ARP, ICMP) and etc.
@@ -789,10 +593,6 @@ void RXPortManager::handle_pkt(const rte_mbuf_t *m) {
m_latency.handle_pkt(m);
}
- if (is_feature_set(RECORDER)) {
- m_recorder.handle_pkt(m);
- }
-
if (is_feature_set(QUEUE)) {
m_queue.handle_pkt(m);
}
@@ -800,6 +600,9 @@ void RXPortManager::handle_pkt(const rte_mbuf_t *m) {
if (is_feature_set(SERVER)) {
m_server.handle_pkt(m);
}
+
+ /* capture */
+ TrexStatelessCaptureMngr::getInstance().handle_pkt_rx(m, m_port_id);
}
int RXPortManager::process_all_pending_pkts(bool flush_rx) {
@@ -838,13 +641,6 @@ int RXPortManager::process_all_pending_pkts(bool flush_rx) {
}
void
-RXPortManager::tick() {
- if (is_feature_set(RECORDER)) {
- m_recorder.flush_to_disk();
- }
-}
-
-void
RXPortManager::send_next_grat_arp() {
if (is_feature_set(GRAT_ARP)) {
m_grat_arp.send_next_grat_arp();
@@ -890,13 +686,6 @@ RXPortManager::to_json() const {
output["latency"]["is_active"] = false;
}
- if (is_feature_set(RECORDER)) {
- output["sniffer"] = m_recorder.to_json();
- output["sniffer"]["is_active"] = true;
- } else {
- output["sniffer"]["is_active"] = false;
- }
-
if (is_feature_set(QUEUE)) {
output["queue"] = m_queue.to_json();
output["queue"]["is_active"] = true;
diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.h b/src/stateless/rx/trex_stateless_rx_port_mngr.h
index 6efdae64..0cc60716 100644
--- a/src/stateless/rx/trex_stateless_rx_port_mngr.h
+++ b/src/stateless/rx/trex_stateless_rx_port_mngr.h
@@ -25,8 +25,7 @@
#include <stdint.h>
#include "common/base64.h"
-#include "common/captureFile.h"
-
+#include "trex_stateless_pkt.h"
class CPortLatencyHWBase;
class CRFC2544Info;
@@ -80,97 +79,12 @@ public:
CRxCoreErrCntrs *m_err_cntrs;
};
-/**
- * describes a single saved RX packet
- *
- */
-class RXPacket {
-public:
-
- RXPacket(const rte_mbuf_t *m);
-
- /* slow path and also RVO - pass by value is ok */
- Json::Value to_json() {
- Json::Value output;
- output["ts"] = m_timestamp;
- output["binary"] = base64_encode(m_raw, m_size);
- return output;
- }
-
- ~RXPacket() {
- if (m_raw) {
- delete [] m_raw;
- }
- }
-
-private:
-
- uint8_t *m_raw;
- uint16_t m_size;
- dsec_t m_timestamp;
-};
-
/**************************************
* RX feature queue
*
*************************************/
-class RXPacketBuffer {
-public:
-
- RXPacketBuffer(uint64_t size);
- ~RXPacketBuffer();
-
- /**
- * push a packet to the buffer
- *
- */
- void push(const rte_mbuf_t *m);
-
- /**
- * generate a JSON output of the queue
- *
- */
- Json::Value to_json() const;
-
-
- bool is_empty() const {
- return (m_head == m_tail);
- }
-
- bool is_full() const {
- return ( next(m_head) == m_tail);
- }
-
- /**
- * return the total amount of space possible
- */
- uint64_t get_capacity() const {
- /* one slot is used for diff between full/empty */
- return (m_size - 1);
- }
-
- /**
- * returns how many elements are in the queue
- */
- uint64_t get_element_count() const;
-
-private:
- int next(int v) const {
- return ( (v + 1) % m_size );
- }
-
- /* pop in case of full queue - internal usage */
- RXPacket * pop();
-
- int m_head;
- int m_tail;
- int m_size;
- RXPacket **m_buffer;
-};
-
-
class RXQueue {
public:
RXQueue() {
@@ -191,7 +105,7 @@ public:
* fetch the current buffer
* return NULL if no packets
*/
- const RXPacketBuffer * fetch();
+ const TrexPktBuffer * fetch();
/**
* stop RX queue
@@ -204,42 +118,7 @@ public:
Json::Value to_json() const;
private:
- RXPacketBuffer *m_pkt_buffer;
-};
-
-/**************************************
- * RX feature PCAP recorder
- *
- *************************************/
-
-class RXPacketRecorder {
-public:
- RXPacketRecorder();
-
- ~RXPacketRecorder() {
- stop();
- }
-
- void start(const std::string &pcap, uint64_t limit);
- void stop();
- void handle_pkt(const rte_mbuf_t *m);
-
- /**
- * flush any cached packets to disk
- *
- */
- void flush_to_disk();
-
- Json::Value to_json() const;
-
-private:
- CFileWriterBase *m_writer;
- std::string m_pcap_filename;
- CCapPktRaw m_pkt;
- dsec_t m_epoch;
- uint64_t m_limit;
- uint64_t m_count;
- bool m_pending_flush;
+ TrexPktBuffer *m_pkt_buffer;
};
@@ -311,7 +190,6 @@ public:
enum feature_t {
NO_FEATURES = 0x0,
LATENCY = 0x1,
- RECORDER = 0x2,
QUEUE = 0x4,
SERVER = 0x8,
GRAT_ARP = 0x10,
@@ -354,17 +232,6 @@ public:
unset_feature(LATENCY);
}
- /* recorder */
- void start_recorder(const std::string &pcap, uint64_t limit_pkts) {
- m_recorder.start(pcap, limit_pkts);
- set_feature(RECORDER);
- }
-
- void stop_recorder() {
- m_recorder.stop();
- unset_feature(RECORDER);
- }
-
/* queue */
void start_queue(uint32_t size) {
m_queue.start(size);
@@ -376,7 +243,7 @@ public:
unset_feature(QUEUE);
}
- const RXPacketBuffer *get_pkt_buffer() {
+ const TrexPktBuffer *get_pkt_buffer() {
if (!is_feature_set(QUEUE)) {
return nullptr;
}
@@ -415,13 +282,6 @@ public:
void handle_pkt(const rte_mbuf_t *m);
/**
- * maintenance
- *
- * @author imarom (11/24/2016)
- */
- void tick();
-
- /**
* send next grat arp (if on)
*
* @author imarom (12/13/2016)
@@ -482,7 +342,6 @@ private:
uint32_t m_features;
uint8_t m_port_id;
RXLatency m_latency;
- RXPacketRecorder m_recorder;
RXQueue m_queue;
RXServer m_server;
RXGratARP m_grat_arp;