diff options
28 files changed, 2454 insertions, 761 deletions
diff --git a/linux_dpdk/ws_main.py b/linux_dpdk/ws_main.py index 29588622..4273dfe7 100755 --- a/linux_dpdk/ws_main.py +++ b/linux_dpdk/ws_main.py @@ -297,7 +297,9 @@ stateless_src = SrcGroup(dir='src/stateless/', 'dp/trex_stateless_dp_core.cpp', 'messaging/trex_stateless_messaging.cpp', 'rx/trex_stateless_rx_core.cpp', - 'rx/trex_stateless_rx_port_mngr.cpp' + 'rx/trex_stateless_rx_port_mngr.cpp', + 'rx/trex_stateless_capture.cpp', + 'common/trex_stateless_pkt.cpp', ]) # JSON package json_src = SrcGroup(dir='external_libs/json', @@ -571,6 +573,7 @@ includes_path =''' ../src/pal/linux_dpdk/ ../src/stateless/cp/ ../src/stateless/dp/ ../src/stateless/rx/ + ../src/stateless/common/ ../src/stateless/messaging/ ../external_libs/yaml-cpp/include/ diff --git a/scripts/automation/trex_control_plane/stl/console/trex_capture.py b/scripts/automation/trex_control_plane/stl/console/trex_capture.py new file mode 100644 index 00000000..dfd7f0a4 --- /dev/null +++ b/scripts/automation/trex_control_plane/stl/console/trex_capture.py @@ -0,0 +1,574 @@ +from trex_stl_lib.api import * +from trex_stl_lib.utils import parsing_opts, text_tables +import threading +import tempfile +import select + +class CaptureMonitorWriter(object): + def init (self, start_ts): + raise NotImplementedError + + def deinit(self): + raise NotImplementedError + + def handle_pkts (self, pkts): + raise NotImplementedError + + def periodic_check (self): + raise NotImplementedError + + +class CaptureMonitorWriterStdout(CaptureMonitorWriter): + def __init__ (self, logger, is_brief): + self.logger = logger + self.is_brief = is_brief + + self.RX_ARROW = u'\u25c0\u2500\u2500' + self.TX_ARROW = u'\u25b6\u2500\u2500' + + def init (self, start_ts): + self.start_ts = start_ts + + self.logger.pre_cmd("Starting stdout capture monitor - verbose: '{0}'".format('low' if self.is_brief else 'high')) + self.logger.post_cmd(RC_OK) + + self.logger.log(format_text("\n*** use 'capture monitor stop' to abort capturing... ***\n", 'bold')) + + + def deinit (self): + pass + + + def periodic_check (self): + return RC_OK() + + def handle_pkts (self, pkts): + byte_count = 0 + + for pkt in pkts: + byte_count += self.__handle_pkt(pkt) + + self.logger.prompt_redraw() + return RC_OK(byte_count) + + + def get_scapy_name (self, pkt_scapy): + layer = pkt_scapy + while layer.payload and layer.payload.name not in('Padding', 'Raw'): + layer = layer.payload + + return layer.name + + + def format_origin (self, origin): + if origin == 'RX': + return u'{0} {1}'.format(self.RX_ARROW, 'RX') + elif origin == 'TX': + return u'{0} {1}'.format(self.TX_ARROW, 'TX') + else: + return '{0}'.format(origin) + + + def __handle_pkt (self, pkt): + pkt_bin = base64.b64decode(pkt['binary']) + + pkt_scapy = Ether(pkt_bin) + self.logger.log(format_text(u'\n\n#{} Port: {} {}\n'.format(pkt['index'], pkt['port'], self.format_origin(pkt['origin'])), 'bold', '')) + self.logger.log(format_text(' Type: {}, Size: {} B, TS: {:.2f} [sec]\n'.format(self.get_scapy_name(pkt_scapy), len(pkt_bin), pkt['ts'] - self.start_ts), 'bold')) + + + if self.is_brief: + self.logger.log(' {0}'.format(pkt_scapy.command())) + else: + pkt_scapy.show(label_lvl = ' ') + self.logger.log('') + + return len(pkt_bin) + +# +class CaptureMonitorWriterPipe(CaptureMonitorWriter): + def __init__ (self, logger): + self.logger = logger + + def init (self, start_ts): + self.start_ts = start_ts + self.fifo_name = tempfile.mktemp() + + try: + self.logger.pre_cmd('Starting pipe capture monitor') + os.mkfifo(self.fifo_name) + self.logger.post_cmd(RC_OK) + + self.logger.log(format_text("*** Please run 'wireshark -k -i {0}' ***".format(self.fifo_name), 'bold')) + + self.logger.pre_cmd("Waiting for Wireshark pipe connection") + self.fifo = os.open(self.fifo_name, os.O_WRONLY) + self.logger.post_cmd(RC_OK()) + + self.logger.log(format_text('\n*** Capture monitoring started ***\n', 'bold')) + + self.writer = RawPcapWriter(self.fifo_name, linktype = 1, sync = True) + self.writer._write_header(None) + + # register a poller + self.poll = select.poll() + self.poll.register(self.fifo, select.EPOLLERR) + + except KeyboardInterrupt as e: + self.logger.post_cmd(RC_ERR("")) + raise STLError("*** pipe monitor aborted...cleaning up") + + except OSError as e: + self.logger.post_cmd(RC_ERR("")) + raise STLError("failed to create pipe {0}\n{1}".format(self.fifo_name, str(e))) + + + def deinit (self): + try: + os.unlink(self.fifo_name) + except OSError: + pass + + + def periodic_check (self): + return self.check_pipe() + + + def check_pipe (self): + if self.poll.poll(0): + return RC_ERR('*** pipe has been disconnected - aborting monitoring ***') + + return RC_OK() + + + def handle_pkts (self, pkts): + rc = self.check_pipe() + if not rc: + return rc + + byte_count = 0 + + for pkt in pkts: + pkt_bin = base64.b64decode(pkt['binary']) + ts = pkt['ts'] + sec = int(ts) + usec = int( (ts - sec) * 1e6 ) + + try: + self.writer._write_packet(pkt_bin, sec = sec, usec = usec) + except IOError: + return RC_ERR("*** failed to write packet to pipe ***") + + byte_count += len(pkt_bin) + + return RC_OK(byte_count) + + +class CaptureMonitor(object): + def __init__ (self, client, cmd_lock): + self.client = client + self.cmd_lock = cmd_lock + self.active = False + self.capture_id = None + self.logger = client.logger + self.writer = None + + def is_active (self): + return self.active + + + def get_capture_id (self): + return self.capture_id + + + def start (self, tx_port_list, rx_port_list, rate_pps, mon_type): + try: + self.start_internal(tx_port_list, rx_port_list, rate_pps, mon_type) + except Exception as e: + self.__stop() + raise e + + def start_internal (self, tx_port_list, rx_port_list, rate_pps, mon_type): + # stop any previous monitors + if self.active: + self.stop() + + self.tx_port_list = tx_port_list + self.rx_port_list = rx_port_list + + if mon_type == 'compact': + self.writer = CaptureMonitorWriterStdout(self.logger, is_brief = True) + elif mon_type == 'verbose': + self.writer = CaptureMonitorWriterStdout(self.logger, is_brief = False) + elif mon_type == 'pipe': + self.writer = CaptureMonitorWriterPipe(self.logger) + else: + raise STLError('unknown writer type') + + + with self.logger.supress(): + data = self.client.start_capture(tx_port_list, rx_port_list, limit = rate_pps) + + self.capture_id = data['id'] + self.start_ts = data['ts'] + + self.writer.init(self.start_ts) + + + self.t = threading.Thread(target = self.__thread_cb) + self.t.setDaemon(True) + + try: + self.active = True + self.t.start() + except Exception as e: + self.active = False + self.stop() + raise e + + # entry point stop + def stop (self): + + if self.active: + self.stop_logged() + else: + self.__stop() + + # wraps stop with a logging + def stop_logged (self): + self.logger.pre_cmd("Stopping capture monitor") + + try: + self.__stop() + except Exception as e: + self.logger.post_cmd(RC_ERR("")) + raise e + + self.logger.post_cmd(RC_OK()) + + # internal stop + def __stop (self): + + # shutdown thread + if self.active: + self.active = False + self.t.join() + + # deinit the writer + if self.writer is not None: + self.writer.deinit() + self.writer = None + + # cleanup capture ID if possible + if self.capture_id is None: + return + + capture_id = self.capture_id + self.capture_id = None + + # if we are disconnected - we cannot cleanup the capture + if not self.client.is_connected(): + return + + try: + captures = [x['id'] for x in self.client.get_capture_status()] + if capture_id not in captures: + return + + with self.logger.supress(): + self.client.stop_capture(capture_id) + + except STLError as e: + self.logger.post_cmd(RC_ERR("")) + raise e + + + def get_mon_row (self): + if not self.is_active(): + return None + + return [self.capture_id, + self.pkt_count, + format_num(self.byte_count, suffix = 'B'), + ', '.join([str(x) for x in self.tx_port_list] if self.tx_port_list else '-'), + ', '.join([str(x) for x in self.rx_port_list] if self.rx_port_list else '-') + ] + + + # sleeps with high freq checks for active + def __sleep (self): + for _ in range(5): + if not self.active: + return False + + time.sleep(0.1) + + return True + + def __lock (self): + while True: + rc = self.cmd_lock.acquire(False) + if rc: + return True + + if not self.active: + return False + time.sleep(0.1) + + def __unlock (self): + self.cmd_lock.release() + + + def __thread_cb (self): + try: + rc = self.__thread_main_loop() + finally: + pass + + if not rc: + self.logger.log(str(rc)) + self.logger.log(format_text('\n*** monitor is inactive - please restart the monitor ***\n', 'bold')) + self.logger.prompt_redraw() + + + def __thread_main_loop (self): + self.pkt_count = 0 + self.byte_count = 0 + + while self.active: + + # sleep + if not self.__sleep(): + break + + # check that the writer is ok + rc = self.writer.periodic_check() + if not rc: + return rc + + # try to lock + if not self.__lock(): + break + + try: + if not self.client.is_connected(): + return RC_ERR('*** client has been disconnected, aborting monitoring ***') + rc = self.client._transmit("capture", params = {'command': 'fetch', 'capture_id': self.capture_id, 'pkt_limit': 10}) + if not rc: + return rc + + finally: + self.__unlock() + + + pkts = rc.data()['pkts'] + if not pkts: + continue + + rc = self.writer.handle_pkts(pkts) + if not rc: + return rc + + self.pkt_count += len(pkts) + self.byte_count += rc.data() + + # graceful shutdown + return RC_OK() + + + +# main class +class CaptureManager(object): + def __init__ (self, client, cmd_lock): + self.c = client + self.cmd_lock = cmd_lock + self.monitor = CaptureMonitor(client, cmd_lock) + self.logger = client.logger + + # install parsers + + self.parser = parsing_opts.gen_parser(self, "capture", self.parse_line_internal.__doc__) + self.subparsers = self.parser.add_subparsers(title = "commands", dest="commands") + + self.install_record_parser() + self.install_monitor_parser() + + # show + self.show_parser = self.subparsers.add_parser('show', help = "show all active captures") + + # reset + self.clear_parser = self.subparsers.add_parser('clear', help = "remove all active captures") + + # register handlers + self.cmds = {'record': self.parse_record, 'monitor' : self.parse_monitor, 'clear': self.parse_clear, 'show' : self.parse_show} + + + def install_record_parser (self): + # recording + self.record_parser = self.subparsers.add_parser('record', help = "PCAP recording") + record_sub = self.record_parser.add_subparsers(title = 'commands', dest = 'record_cmd') + self.record_start_parser = record_sub.add_parser('start', help = "starts a new buffered capture") + self.record_stop_parser = record_sub.add_parser('stop', help = "stops an active buffered capture") + + # start + self.record_start_parser.add_arg_list(parsing_opts.TX_PORT_LIST, + parsing_opts.RX_PORT_LIST, + parsing_opts.LIMIT) + + # stop + self.record_stop_parser.add_arg_list(parsing_opts.CAPTURE_ID, + parsing_opts.OUTPUT_FILENAME) + + + + def install_monitor_parser (self): + # monitor + self.monitor_parser = self.subparsers.add_parser('monitor', help = 'live monitoring') + monitor_sub = self.monitor_parser.add_subparsers(title = 'commands', dest = 'mon_cmd') + self.monitor_start_parser = monitor_sub.add_parser('start', help = 'starts a monitor') + self.monitor_stop_parser = monitor_sub.add_parser('stop', help = 'stops an active monitor') + + self.monitor_start_parser.add_arg_list(parsing_opts.TX_PORT_LIST, + parsing_opts.RX_PORT_LIST, + parsing_opts.MONITOR_TYPE) + + + + def stop (self): + self.monitor.stop() + + + # main entry point for parsing commands from console + def parse_line (self, line): + try: + self.parse_line_internal(line) + except STLError as e: + self.logger.log("\nAction has failed with the following error:\n" + format_text(e.brief() + "\n", 'bold')) + return RC_ERR(e.brief()) + + + def parse_line_internal (self, line): + '''Manage PCAP recorders''' + + # default + if not line: + line = "show" + + opts = self.parser.parse_args(line.split()) + if not opts: + return opts + + # call the handler + self.cmds[opts.commands](opts) + + + # record methods + def parse_record (self, opts): + if opts.record_cmd == 'start': + self.parse_record_start(opts) + elif opts.record_cmd == 'stop': + self.parse_record_stop(opts) + else: + self.record_parser.formatted_error("too few arguments") + + + def parse_record_start (self, opts): + if not opts.tx_port_list and not opts.rx_port_list: + self.record_start_parser.formatted_error('please provide either --tx or --rx') + return + + rc = self.c.start_capture(opts.tx_port_list, opts.rx_port_list, opts.limit) + + self.logger.log(format_text("*** Capturing ID is set to '{0}' ***".format(rc['id']), 'bold')) + self.logger.log(format_text("*** Please call 'capture record stop --id {0} -o <out.pcap>' when done ***\n".format(rc['id']), 'bold')) + + + def parse_record_stop (self, opts): + captures = self.c.get_capture_status() + ids = [c['id'] for c in captures] + + if opts.capture_id == self.monitor.get_capture_id(): + self.record_stop_parser.formatted_error("'{0}' is a monitor, please use 'capture monitor stop'".format(opts.capture_id)) + return + + if opts.capture_id not in ids: + self.record_stop_parser.formatted_error("'{0}' is not an active capture ID".format(opts.capture_id)) + return + + self.c.stop_capture(opts.capture_id, opts.output_filename) + + + # monitor methods + def parse_monitor (self, opts): + if opts.mon_cmd == 'start': + self.parse_monitor_start(opts) + elif opts.mon_cmd == 'stop': + self.parse_monitor_stop(opts) + else: + self.monitor_parser.formatted_error("too few arguments") + + + def parse_monitor_start (self, opts): + mon_type = 'compact' + + if opts.verbose: + mon_type = 'verbose' + elif opts.pipe: + mon_type = 'pipe' + + if not opts.tx_port_list and not opts.rx_port_list: + self.monitor_start_parser.formatted_error('please provide either --tx or --rx') + return + + self.monitor.stop() + self.monitor.start(opts.tx_port_list, opts.rx_port_list, 100, mon_type) + + def parse_monitor_stop (self, opts): + self.monitor.stop() + + def parse_clear (self, opts): + self.monitor.stop() + self.c.remove_all_captures() + + + + def parse_show (self, opts): + data = self.c.get_capture_status() + + # captures + cap_table = text_tables.TRexTextTable() + cap_table.set_cols_align(["c"] * 6) + cap_table.set_cols_width([15] * 6) + + # monitor + mon_table = text_tables.TRexTextTable() + mon_table.set_cols_align(["c"] * 5) + mon_table.set_cols_width([15] * 5) + + for elem in data: + id = elem['id'] + + if self.monitor.get_capture_id() == id: + row = self.monitor.get_mon_row() + mon_table.add_rows([row], header=False) + + else: + row = [id, + format_text(elem['state'], 'bold'), + '[{0}/{1}]'.format(elem['count'], elem['limit']), + format_num(elem['bytes'], suffix = 'B'), + bitfield_to_str(elem['filter']['tx']), + bitfield_to_str(elem['filter']['rx'])] + + cap_table.add_rows([row], header=False) + + cap_table.header(['ID', 'Status', 'Packets', 'Bytes', 'TX Ports', 'RX Ports']) + mon_table.header(['ID', 'Packets Seen', 'Bytes Seen', 'TX Ports', 'RX Ports']) + + if cap_table._rows: + text_tables.print_table_with_header(cap_table, '\nActive Recorders') + + if mon_table._rows: + text_tables.print_table_with_header(mon_table, '\nActive Monitor') + + diff --git a/scripts/automation/trex_control_plane/stl/console/trex_console.py b/scripts/automation/trex_control_plane/stl/console/trex_console.py index c9956472..83f36820 100755 --- a/scripts/automation/trex_control_plane/stl/console/trex_console.py +++ b/scripts/automation/trex_control_plane/stl/console/trex_console.py @@ -29,6 +29,8 @@ import string import os import sys import tty, termios +from threading import Lock +import threading try: import stl_path @@ -39,6 +41,7 @@ from trex_stl_lib.api import * from trex_stl_lib.utils.text_opts import * from trex_stl_lib.utils.common import user_input, get_current_user from trex_stl_lib.utils import parsing_opts +from .trex_capture import CaptureManager try: import trex_tui @@ -172,6 +175,8 @@ class TRexConsole(TRexGeneralCmd): def __init__(self, stateless_client, verbose = False): + self.cmd_lock = Lock() + self.stateless_client = stateless_client TRexGeneralCmd.__init__(self) @@ -184,8 +189,11 @@ class TRexConsole(TRexGeneralCmd): self.intro = "\n-=TRex Console v{ver}=-\n".format(ver=__version__) self.intro += "\nType 'help' or '?' for supported actions\n" + self.cap_mngr = CaptureManager(stateless_client, self.cmd_lock) + self.postcmd(False, "") + ################### internal section ######################## @@ -231,6 +239,7 @@ class TRexConsole(TRexGeneralCmd): lines = line.split(';') try: + self.cmd_lock.acquire() for line in lines: stop = self.onecmd(line) stop = self.postcmd(stop, line) @@ -238,10 +247,15 @@ class TRexConsole(TRexGeneralCmd): return "quit" return "" + except STLError as e: print(e) return '' + finally: + self.cmd_lock.release() + + def postcmd(self, stop, line): self.prompt = self.stateless_client.generate_prompt(prefix = 'trex') @@ -347,12 +361,12 @@ class TRexConsole(TRexGeneralCmd): @verify_connected - def do_set_rx_sniffer (self, line): - '''Sets a port sniffer on RX channel as PCAP recorder''' - self.stateless_client.set_rx_sniffer_line(line) + def do_capture (self, line): + '''Manage PCAP captures''' + self.cap_mngr.parse_line(line) - def help_sniffer (self): - self.do_set_rx_sniffer("-h") + def help_capture (self): + self.do_capture("-h") @verify_connected def do_resolve (self, line): @@ -443,7 +457,9 @@ class TRexConsole(TRexGeneralCmd): def do_disconnect (self, line): '''Disconnect from the server\n''' - + + # stop any monitors before disconnecting + self.cap_mngr.stop() self.stateless_client.disconnect_line(line) @@ -688,19 +704,24 @@ class TRexConsole(TRexGeneralCmd): l=help.splitlines() print("{:<30} {:<30}".format(cmd + " - ",l[0] )) + # a custorm cmdloop wrapper def start(self): - while True: - try: - self.cmdloop() - break - except KeyboardInterrupt as e: - if not readline.get_line_buffer(): - raise KeyboardInterrupt - else: - print("") - self.intro = None - continue + try: + while True: + try: + self.cmdloop() + break + except KeyboardInterrupt as e: + if not readline.get_line_buffer(): + raise KeyboardInterrupt + else: + print("") + self.intro = None + continue + + finally: + self.cap_mngr.stop() if self.terminal: self.terminal.kill() @@ -933,6 +954,8 @@ def main(): with stateless_client.logger.supress(): stateless_client.disconnect(stop_traffic = False) + + if __name__ == '__main__': main() diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py index 21ae42f1..f7432107 100755 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py @@ -17,15 +17,18 @@ from .utils.text_opts import * from functools import wraps from texttable import ansi_len + from collections import namedtuple from yaml import YAMLError import time import datetime +import threading import re import random import json import traceback import os.path +import argparse ############################ logger ############################# ############################ ############################# @@ -600,12 +603,10 @@ class STLClient(object): self.util_stats, self.xstats, self.async_client.monitor) - - - - + ############# private functions - used by the class itself ########### + # some preprocessing for port argument def __ports (self, port_id_list): @@ -832,27 +833,6 @@ class STLClient(object): return rc - def __set_rx_sniffer (self, port_id_list, base_filename, limit): - port_id_list = self.__ports(port_id_list) - rc = RC() - - for port_id in port_id_list: - head, tail = os.path.splitext(base_filename) - filename = "{0}-{1}{2}".format(head, port_id, tail) - rc.add(self.ports[port_id].set_rx_sniffer(filename, limit)) - - return rc - - - def __remove_rx_sniffer (self, port_id_list): - port_id_list = self.__ports(port_id_list) - rc = RC() - - for port_id in port_id_list: - rc.add(self.ports[port_id].remove_rx_sniffer()) - - return rc - def __set_rx_queue (self, port_id_list, size): port_id_list = self.__ports(port_id_list) rc = RC() @@ -1071,7 +1051,7 @@ class STLClient(object): ############ functions used by other classes but not users ############## - def _validate_port_list (self, port_id_list): + def _validate_port_list (self, port_id_list, allow_empty = False): # listfiy single int if isinstance(port_id_list, int): port_id_list = [port_id_list] @@ -1080,7 +1060,7 @@ class STLClient(object): if not isinstance(port_id_list, list): raise STLTypeError('port_id_list', type(port_id_list), list) - if not port_id_list: + if not port_id_list and not allow_empty: raise STLError('No ports provided') valid_ports = self.get_all_ports() @@ -2084,9 +2064,9 @@ class STLClient(object): self.set_port_attr(ports, promiscuous = False, link_up = True if restart else None) - self.set_service_mode(ports, False) - self.remove_rx_sniffer(ports) self.remove_rx_queue(ports) + self.set_service_mode(ports, False) + except STLError as e: self.logger.post_cmd(False) @@ -2996,7 +2976,7 @@ class STLClient(object): Resolves ports (ARP resolution) :parameters: - ports - for which ports to apply a unique sniffer (each port gets a unique file) + ports - which ports to resolve retries - how many times to retry on each port (intervals of 100 milliseconds) verbose - log for each request the response :raises: @@ -3025,57 +3005,166 @@ class STLClient(object): @__api_check(True) - def set_rx_sniffer (self, ports = None, base_filename = 'rx.pcap', limit = 1000): + def start_capture (self, tx_ports, rx_ports, limit = 1000): """ - Sets a RX sniffer for port(s) written to a PCAP file + Starts a capture to PCAP on port(s) :parameters: - ports - for which ports to apply a unique sniffer (each port gets a unique file) - base_filename - filename will be appended with '-<port_number>', e.g. rx.pcap --> rx-0.pcap, rx-1.pcap etc. + tx_ports - on which ports to capture TX + rx_ports - on which ports to capture RX limit - limit how many packets will be written + + :returns: + returns a dictionary containing + {'id: <new_id>, 'ts': <starting timestamp>} + :raises: + :exe:'STLError' """ - ports = ports if ports is not None else self.get_acquired_ports() - ports = self._validate_port_list(ports) - + + tx_ports = self._validate_port_list(tx_ports, allow_empty = True) + rx_ports = self._validate_port_list(rx_ports, allow_empty = True) + merge_ports = set(tx_ports + rx_ports) + + if not merge_ports: + raise STLError("start_capture - must get at least one port to capture") + # check arguments - validate_type('base_filename', base_filename, basestring) validate_type('limit', limit, (int)) if limit <= 0: raise STLError("'limit' must be a positive value") - self.logger.pre_cmd("Setting RX sniffers on port(s) {0}:".format(ports)) - rc = self.__set_rx_sniffer(ports, base_filename, limit) + non_service_ports = list_difference(set(tx_ports + rx_ports), self.get_service_enabled_ports()) + if non_service_ports: + raise STLError("Port(s) {0} are not under service mode. PCAP capturing requires all ports to be in service mode".format(non_service_ports)) + + + self.logger.pre_cmd("Starting PCAP capturing up to {0} packets".format(limit)) + + rc = self._transmit("capture", params = {'command': 'start', 'limit': limit, 'tx': tx_ports, 'rx': rx_ports}) self.logger.post_cmd(rc) if not rc: raise STLError(rc) + return {'id': rc.data()['capture_id'], 'ts': rc.data()['ts']} + + + def __fetch_capture_packets (self, capture_id, output_filename, pkt_count): + self.logger.pre_cmd("Writing {0} packets to '{1}'".format(pkt_count, output_filename)) + + # create a PCAP file + writer = RawPcapWriter(output_filename, linktype = 1) + writer._write_header(None) + + # fetch + pending = pkt_count + rc = RC_OK() + while pending > 0: + rc = self._transmit("capture", params = {'command': 'fetch', 'capture_id': capture_id, 'pkt_limit': 50}) + if not rc: + self.logger.post_cmd(rc) + raise STLError(rc) + + pkts = rc.data()['pkts'] + pending = rc.data()['pending'] + start_ts = rc.data()['start_ts'] + + for pkt in pkts: + ts = pkt['ts'] - start_ts + ts_sec = int(ts) + ts_usec = int( (ts - ts_sec) * 1e6 ) + + pkt_bin = base64.b64decode(pkt['binary']) + writer._write_packet(pkt_bin, sec = ts_sec, usec = ts_usec) + + + + + self.logger.post_cmd(rc) + + @__api_check(True) - def remove_rx_sniffer (self, ports = None): + def stop_capture (self, capture_id, output_filename = None): """ - Removes RX sniffer from port(s) + Stops an active capture + + :parameters: + capture_id - an active capture ID to stop + output_filename - output filename to save capture :raises: + :exe:'STLError' """ - ports = ports if ports is not None else self.get_acquired_ports() - ports = self._validate_port_list(ports) - - self.logger.pre_cmd("Removing RX sniffers on port(s) {0}:".format(ports)) - rc = self.__remove_rx_sniffer(ports) + + # stopping a capture requires: + # 1. stopping + # 2. fetching + # 3. saving to file + + # stop + + self.logger.pre_cmd("Stopping PCAP capture {0}".format(capture_id)) + rc = self._transmit("capture", params = {'command': 'stop', 'capture_id': capture_id}) self.logger.post_cmd(rc) + if not rc: + raise STLError(rc) + + # pkt count + pkt_count = rc.data()['pkt_count'] + + # fetch packets + if output_filename: + self.__fetch_capture_packets(capture_id, output_filename, pkt_count) + + # remove + self.logger.pre_cmd("Removing PCAP capture {0} from server".format(capture_id)) + rc = self._transmit("capture", params = {'command': 'remove', 'capture_id': capture_id}) + self.logger.post_cmd(rc) + if not rc: + raise STLError(rc) + + + @__api_check(True) + def get_capture_status (self): + """ + returns a list of all active captures + each element in the list is an object containing + info about the capture + + """ + + rc = self._transmit("capture", params = {'command': 'status'}) if not rc: raise STLError(rc) - + return rc.data() + + @__api_check(True) + def remove_all_captures (self): + """ + Removes any existing captures + """ + captures = self.get_capture_status() + + self.logger.pre_cmd("Removing all PCAP captures from server") + + for c in captures: + # remove + rc = self._transmit("capture", params = {'command': 'remove', 'capture_id': c['id']}) + if not rc: + raise STLError(rc) + + self.logger.post_cmd(RC_OK()) + + + @__api_check(True) def set_rx_queue (self, ports = None, size = 1000): """ @@ -3196,6 +3285,7 @@ class STLClient(object): return wrap + @__console def ping_line (self, line): '''pings the server / specific IP''' @@ -3789,30 +3879,10 @@ class STLClient(object): opts.link, opts.led, opts.flow_ctrl) - + + - - @__console - def set_rx_sniffer_line (self, line): - '''Sets a port sniffer on RX channel in form of a PCAP file''' - - parser = parsing_opts.gen_parser(self, - "set_rx_sniffer", - self.set_rx_sniffer_line.__doc__, - parsing_opts.PORT_LIST_WITH_ALL, - parsing_opts.OUTPUT_FILENAME, - parsing_opts.LIMIT) - - opts = parser.parse_args(line.split(), default_ports = self.get_acquired_ports(), verify_acquired = True) - if not opts: - return opts - - self.set_rx_sniffer(opts.ports, opts.output_filename, opts.limit) - - return RC_OK() - - @__console def resolve_line (self, line): '''Performs a port ARP resolution''' @@ -4010,3 +4080,4 @@ class STLClient(object): self.set_service_mode(ports = opts.ports, enabled = opts.enabled) + diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py index a9509ee9..31d752af 100644 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py @@ -56,7 +56,8 @@ class Port(object): def __init__ (self, port_id, user, comm_link, session_id, info): self.port_id = port_id - self.state = self.STATE_IDLE + self.state = self.STATE_IDLE + self.service_mode = False self.handler = None self.comm_link = comm_link @@ -247,14 +248,16 @@ class Port(object): raise Exception("port {0}: bad state received from server '{1}'".format(self.port_id, port_state)) self.owner = rc.data()['owner'] - + self.next_available_id = int(rc.data()['max_stream_id']) + 1 self.status = rc.data() - + # replace the attributes in a thread safe manner self.set_ts_attr(rc.data()['attr']) - + + self.service_mode = rc.data()['service'] + return self.ok() @@ -490,33 +493,17 @@ class Port(object): @owned - def set_rx_sniffer (self, pcap_filename, limit): + def start_capture (self, pcap_filename, mode, limit): - if not self.is_service_mode_on(): + if mode != 'tx' and not self.is_service_mode_on(): return self.err('port service mode must be enabled for performing RX capturing. Please enable service mode') params = {"handler": self.handler, "port_id": self.port_id, - "type": "capture", - "enabled": True, - "pcap_filename": pcap_filename, + "mode": mode, "limit": limit} - rc = self.transmit("set_rx_feature", params) - if rc.bad(): - return self.err(rc.err()) - - return self.ok() - - - @owned - def remove_rx_sniffer (self): - params = {"handler": self.handler, - "port_id": self.port_id, - "type": "capture", - "enabled": False} - - rc = self.transmit("set_rx_feature", params) + rc = self.transmit("start_capture", params) if rc.bad(): return self.err(rc.err()) @@ -719,23 +706,21 @@ class Port(object): @owned def set_service_mode (self, enabled): - rc = self.set_attr(rx_filter_mode = 'all' if enabled else 'hw') - if not rc: - return rc - - if not enabled: - rc = self.remove_rx_queue() - if not rc: - return rc - - rc = self.remove_rx_sniffer() - if not rc: - return rc - + params = {"handler": self.handler, + "port_id": self.port_id, + "enabled": enabled} + + rc = self.transmit("service", params) + if rc.bad(): + return self.err(rc.err()) + + self.service_mode = enabled return self.ok() + def is_service_mode_on (self): - return self.get_rx_filter_mode() == 'all' + return self.service_mode + @writeable def push_remote (self, pcap_filename, ipg_usec, speedup, count, duration, is_dual, slave_handler, min_ipg_usec): @@ -902,11 +887,6 @@ class Port(object): # RX info rx_info = self.status['rx_info'] - # RX sniffer - sniffer = rx_info['sniffer'] - info['rx_sniffer'] = '{0}\n[{1} / {2}]'.format(sniffer['pcap_filename'], sniffer['count'], sniffer['limit']) if sniffer['is_active'] else 'off' - - # RX queue queue = rx_info['queue'] info['rx_queue'] = '[{0} / {1}]'.format(queue['count'], queue['size']) if queue['is_active'] else 'off' @@ -928,8 +908,6 @@ class Port(object): def get_layer_cfg (self): return self.__attr['layer_cfg'] - def get_rx_filter_mode (self): - return self.__attr['rx_filter_mode'] def is_virtual(self): return self.info.get('is_virtual') @@ -1005,7 +983,6 @@ class Port(object): "layer mode": format_text(info['layer_mode'], 'green' if info['layer_mode'] == 'IPv4' else 'magenta'), "RX Filter Mode": info['rx_filter_mode'], "RX Queueing": info['rx_queue'], - "RX sniffer": info['rx_sniffer'], "Grat ARP": info['grat_arp'], } diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_stats.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_stats.py index 38726062..21c9af87 100644 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_stats.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_stats.py @@ -682,7 +682,6 @@ class CTRexInfoGenerator(object): ("-----", []), ("RX Filter Mode", []), ("RX Queueing", []), - ("RX sniffer", []), ("Grat ARP", []), ] ) diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/common.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/common.py index cbbacb27..c386451b 100644 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/common.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/common.py @@ -107,4 +107,18 @@ def list_remove_dup (l): return tmp - +def bitfield_to_list (bf): + rc = [] + bitpos = 0 + + while bf > 0: + if bf & 0x1: + rc.append(bitpos) + bitpos += 1 + bf = bf >> 1 + + return rc + +def bitfield_to_str (bf): + lst = bitfield_to_list(bf) + return "-" if not lst else ', '.join([str(x) for x in lst]) diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py index f5dab30c..8d3aedbe 100755 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py @@ -63,9 +63,14 @@ PKT_SIZE SERVICE_OFF +TX_PORT_LIST +RX_PORT_LIST + SRC_IPV4 DST_IPV4 +CAPTURE_ID + GLOBAL_STATS PORT_STATS PORT_STATUS @@ -78,12 +83,18 @@ EXTENDED_INC_ZERO_STATS STREAMS_MASK CORE_MASK_GROUP +CAPTURE_PORTS_GROUP + +MONITOR_TYPE_VERBOSE +MONITOR_TYPE_PIPE +MONITOR_TYPE # ALL_STREAMS # STREAM_LIST_WITH_ALL # list of ArgumentGroup types MUTEX +NON_MUTEX ''' @@ -389,7 +400,6 @@ OPTIONS_DB = {MULTIPLIER: ArgumentPack(['-m', '--multiplier'], {'help': 'Output PCAP filename', 'dest': 'output_filename', 'default': None, - 'required': True, 'type': str}), @@ -591,6 +601,45 @@ OPTIONS_DB = {MULTIPLIER: ArgumentPack(['-m', '--multiplier'], 'default': True, 'help': 'Deactivates services on port(s)'}), + TX_PORT_LIST: ArgumentPack(['--tx'], + {'nargs': '+', + 'dest':'tx_port_list', + 'metavar': 'TX', + 'action': 'merge', + 'type': int, + 'help': 'A list of ports to capture on the TX side', + 'default': []}), + + + RX_PORT_LIST: ArgumentPack(['--rx'], + {'nargs': '+', + 'dest':'rx_port_list', + 'metavar': 'RX', + 'action': 'merge', + 'type': int, + 'help': 'A list of ports to capture on the RX side', + 'default': []}), + + + MONITOR_TYPE_VERBOSE: ArgumentPack(['-v', '--verbose'], + {'action': 'store_true', + 'dest': 'verbose', + 'default': False, + 'help': 'output to screen as verbose'}), + + MONITOR_TYPE_PIPE: ArgumentPack(['-p', '--pipe'], + {'action': 'store_true', + 'dest': 'pipe', + 'default': False, + 'help': 'forward packets to a pipe'}), + + + CAPTURE_ID: ArgumentPack(['-i', '--id'], + {'help': "capture ID to remove", + 'dest': "capture_id", + 'type': int, + 'required': True}), + # advanced options PORT_LIST_WITH_ALL: ArgumentGroup(MUTEX, [PORT_LIST, ALL_PORTS], @@ -615,6 +664,13 @@ OPTIONS_DB = {MULTIPLIER: ArgumentPack(['-m', '--multiplier'], CORE_MASK], {'required': False}), + CAPTURE_PORTS_GROUP: ArgumentGroup(NON_MUTEX, [TX_PORT_LIST, RX_PORT_LIST], {}), + + + MONITOR_TYPE: ArgumentGroup(MUTEX, [MONITOR_TYPE_VERBOSE, + MONITOR_TYPE_PIPE], + {'required': False}), + } class _MergeAction(argparse._AppendAction): @@ -633,12 +689,30 @@ class _MergeAction(argparse._AppendAction): class CCmdArgParser(argparse.ArgumentParser): - def __init__(self, stateless_client, *args, **kwargs): + def __init__(self, stateless_client = None, x = None, *args, **kwargs): super(CCmdArgParser, self).__init__(*args, **kwargs) self.stateless_client = stateless_client self.cmd_name = kwargs.get('prog') self.register('action', 'merge', _MergeAction) + + def add_arg_list (self, *args): + populate_parser(self, *args) + + def add_subparsers(self, *args, **kwargs): + sub = super(CCmdArgParser, self).add_subparsers(*args, **kwargs) + + add_parser = sub.add_parser + stateless_client = self.stateless_client + + def add_parser_hook (self, *args, **kwargs): + parser = add_parser(self, *args, **kwargs) + parser.stateless_client = stateless_client + return parser + + sub.add_parser = add_parser_hook + return sub + # hook this to the logger def _print_message(self, message, file=None): self.stateless_client.logger.log(message) @@ -709,13 +783,15 @@ class CCmdArgParser(argparse.ArgumentParser): # recover from system exit scenarios, such as "help", or bad arguments. return RC_ERR("'{0}' - {1}".format(self.cmd_name, "no action")) + def formatted_error (self, msg): + self.print_usage() + self._print_message(('%s: error: %s\n') % (self.prog, msg)) + def get_flags (opt): return OPTIONS_DB[opt].name_or_flags -def gen_parser(stateless_client, op_name, description, *args): - parser = CCmdArgParser(stateless_client, prog=op_name, conflict_handler='resolve', - description=description) +def populate_parser (parser, *args): for param in args: try: @@ -731,6 +807,12 @@ def gen_parser(stateless_client, op_name, description, *args): for sub_argument in argument.args: group.add_argument(*OPTIONS_DB[sub_argument].name_or_flags, **OPTIONS_DB[sub_argument].options) + + elif argument.type == NON_MUTEX: + group = parser.add_argument_group(**argument.options) + for sub_argument in argument.args: + group.add_argument(*OPTIONS_DB[sub_argument].name_or_flags, + **OPTIONS_DB[sub_argument].options) else: # ignore invalid objects continue @@ -743,6 +825,12 @@ def gen_parser(stateless_client, op_name, description, *args): except KeyError as e: cause = e.args[0] raise KeyError("The attribute '{0}' is missing as a field of the {1} option.\n".format(cause, param)) + +def gen_parser(stateless_client, op_name, description, *args): + parser = CCmdArgParser(stateless_client, prog=op_name, conflict_handler='resolve', + description=description) + + populate_parser(parser, *args) return parser diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/text_opts.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/text_opts.py index 63b05bf4..3ffd07e2 100644 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/text_opts.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/text_opts.py @@ -133,12 +133,16 @@ def underline(text): # apply attribute on each non-empty line def text_attribute(text, attribute): - return '\n'.join(['{start}{txt}{end}'.format( - start = TEXT_CODES[attribute]['start'], - txt = line, - end = TEXT_CODES[attribute]['end']) - if line else '' for line in ('%s' % text).split('\n')]) - + if isinstance(text, str): + return "{start}{txt}{stop}".format(start=TEXT_CODES[attribute]['start'], + txt=text, + stop=TEXT_CODES[attribute]['end']) + elif isinstance(text, unicode): + return u"{start}{txt}{stop}".format(start=TEXT_CODES[attribute]['start'], + txt=text, + stop=TEXT_CODES[attribute]['end']) + else: + raise Exception("not a string") FUNC_DICT = {'blue': blue, 'bold': bold, diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp index aa31cf0b..1d315fa7 100644 --- a/src/main_dpdk.cpp +++ b/src/main_dpdk.cpp @@ -3659,8 +3659,9 @@ void CGlobalTRex::rx_sl_configure(void) { int i; rx_sl_cfg.m_max_ports = m_max_ports; + rx_sl_cfg.m_tx_cores = get_cores_tx(); rx_sl_cfg.m_num_crc_fix_bytes = get_ex_drv()->get_num_crc_fix_bytes(); - + if ( get_vm_one_queue_enable() ) { /* vm mode, indirect queues */ for (i=0; i < m_max_ports; i++) { diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp index 82cbaca1..6f0ab09a 100644 --- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp +++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp @@ -28,6 +28,8 @@ limitations under the License. #include <internal_api/trex_platform_api.h> #include "trex_stateless_rx_core.h" +#include "trex_stateless_capture.h" +#include "trex_stateless_messaging.h" #include <fstream> #include <iostream> @@ -339,24 +341,6 @@ TrexRpcCmdGetSysInfo::_run(const Json::Value ¶ms, Json::Value &result) { return (TREX_RPC_CMD_OK); } - -int -TrexRpcCmdSetPortAttr::parse_rx_filter_mode(const Json::Value &msg, uint8_t port_id, Json::Value &result) { - const std::string type = parse_choice(msg, "mode", {"hw", "all"}, result); - - rx_filter_mode_e filter_mode; - if (type == "hw") { - filter_mode = RX_FILTER_MODE_HW; - } else if (type == "all") { - filter_mode = RX_FILTER_MODE_ALL; - } else { - /* can't happen - parsed choice */ - assert(0); - } - - return get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id)->set_rx_filter_mode(filter_mode); -} - /** * set port commands * @@ -399,11 +383,6 @@ TrexRpcCmdSetPortAttr::_run(const Json::Value ¶ms, Json::Value &result) { ret = get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id)->set_flow_ctrl(mode); } - else if (name == "rx_filter_mode") { - const Json::Value &rx = parse_object(attr, name, result); - ret = parse_rx_filter_mode(rx, port_id, result); - } - /* unknown attribute */ else { generate_execute_err(result, "unknown attribute type: '" + name + "'"); @@ -588,7 +567,8 @@ TrexRpcCmdGetPortStatus::_run(const Json::Value ¶ms, Json::Value &result) { result["result"]["owner"] = (port->get_owner().is_free() ? "" : port->get_owner().get_name()); result["result"]["state"] = port->get_state_as_string(); result["result"]["max_stream_id"] = port->get_max_stream_id(); - + result["result"]["service"] = port->is_service_mode_on(); + /* attributes */ get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id)->to_json(result["result"]["attr"]); @@ -671,6 +651,27 @@ TrexRpcCmdPushRemote::_run(const Json::Value ¶ms, Json::Value &result) { } + +/** + * set service mode on/off + * + */ +trex_rpc_cmd_rc_e +TrexRpcCmdSetServiceMode::_run(const Json::Value ¶ms, Json::Value &result) { + uint8_t port_id = parse_port(params, result); + bool enabled = parse_bool(params, "enabled", result); + + TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id); + try { + port->set_service_mode(enabled); + } catch (TrexException &ex) { + generate_execute_err(result, ex.what()); + } + + result["result"] = Json::objectValue; + return (TREX_RPC_CMD_OK); +} + /** * set on/off RX software receive mode * @@ -681,12 +682,14 @@ TrexRpcCmdSetRxFeature::_run(const Json::Value ¶ms, Json::Value &result) { uint8_t port_id = parse_port(params, result); TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id); + if (!port->is_service_mode_on()) { + generate_execute_err(result, "rx_feature - available only under service mode"); + } + /* decide which feature is being set */ - const std::string type = parse_choice(params, "type", {"capture", "queue", "server"}, result); + const std::string type = parse_choice(params, "type", {"queue", "server"}, result); - if (type == "capture") { - parse_capture_msg(params, port, result); - } else if (type == "queue") { + if (type == "queue") { parse_queue_msg(params, port, result); } else if (type == "server") { parse_server_msg(params, port, result); @@ -700,38 +703,6 @@ TrexRpcCmdSetRxFeature::_run(const Json::Value ¶ms, Json::Value &result) { } void -TrexRpcCmdSetRxFeature::parse_capture_msg(const Json::Value &msg, TrexStatelessPort *port, Json::Value &result) { - - bool enabled = parse_bool(msg, "enabled", result); - - if (enabled) { - - std::string pcap_filename = parse_string(msg, "pcap_filename", result); - uint64_t limit = parse_uint32(msg, "limit", result); - - if (limit == 0) { - generate_parse_err(result, "limit cannot be zero"); - } - - try { - port->start_rx_capture(pcap_filename, limit); - } catch (const TrexException &ex) { - generate_execute_err(result, ex.what()); - } - - } else { - - try { - port->stop_rx_capture(); - } catch (const TrexException &ex) { - generate_execute_err(result, ex.what()); - } - - } - -} - -void TrexRpcCmdSetRxFeature::parse_queue_msg(const Json::Value &msg, TrexStatelessPort *port, Json::Value &result) { bool enabled = parse_bool(msg, "enabled", result); @@ -773,8 +744,13 @@ TrexRpcCmdGetRxQueuePkts::_run(const Json::Value ¶ms, Json::Value &result) { TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id); + if (!port->is_service_mode_on()) { + generate_execute_err(result, "get_rx_queue_pkts - available only under service mode"); + } + + try { - const RXPacketBuffer *pkt_buffer = port->get_rx_queue_pkts(); + const TrexPktBuffer *pkt_buffer = port->get_rx_queue_pkts(); if (pkt_buffer) { result["result"]["pkts"] = pkt_buffer->to_json(); delete pkt_buffer; @@ -817,6 +793,7 @@ TrexRpcCmdSetL2::_run(const Json::Value ¶ms, Json::Value &result) { generate_execute_err(result, ex.what()); } + result["result"] = Json::objectValue; return (TREX_RPC_CMD_OK); } @@ -874,7 +851,186 @@ TrexRpcCmdSetL3::_run(const Json::Value ¶ms, Json::Value &result) { } } - + + result["result"] = Json::objectValue; return (TREX_RPC_CMD_OK); } + + +/** + * capture command tree + * + */ +trex_rpc_cmd_rc_e +TrexRpcCmdCapture::_run(const Json::Value ¶ms, Json::Value &result) { + const std::string cmd = parse_choice(params, "command", {"start", "stop", "fetch", "status", "remove"}, result); + + if (cmd == "start") { + parse_cmd_start(params, result); + } else if (cmd == "stop") { + parse_cmd_stop(params, result); + } else if (cmd == "fetch") { + parse_cmd_fetch(params, result); + } else if (cmd == "status") { + parse_cmd_status(params, result); + } else if (cmd == "remove") { + parse_cmd_remove(params, result); + } else { + /* can't happen */ + assert(0); + } + + return TREX_RPC_CMD_OK; +} + +/** + * starts PCAP capturing + * + */ +void +TrexRpcCmdCapture::parse_cmd_start(const Json::Value ¶ms, Json::Value &result) { + + uint32_t limit = parse_uint32(params, "limit", result); + const Json::Value &tx_json = parse_array(params, "tx", result); + const Json::Value &rx_json = parse_array(params, "rx", result); + CaptureFilter filter; + + std::set<uint8_t> ports; + + /* populate the filter */ + for (int i = 0; i < tx_json.size(); i++) { + uint8_t tx_port = parse_byte(tx_json, i, result); + filter.add_tx(tx_port); + ports.insert(tx_port); + } + + for (int i = 0; i < rx_json.size(); i++) { + uint8_t rx_port = parse_byte(rx_json, i, result); + filter.add_rx(rx_port); + ports.insert(rx_port); + } + + /* check that all ports are under service mode */ + for (uint8_t port_id : ports) { + TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id); + if (!port->is_service_mode_on()) { + generate_parse_err(result, "start_capture is available only under service mode"); + } + } + + static MsgReply<TrexCaptureRCStart> reply; + reply.reset(); + + TrexStatelessRxCaptureStart *start_msg = new TrexStatelessRxCaptureStart(filter, limit, reply); + get_stateless_obj()->send_msg_to_rx(start_msg); + + TrexCaptureRCStart rc = reply.wait_for_reply(); + if (!rc) { + generate_execute_err(result, rc.get_err()); + } + + result["result"]["capture_id"] = rc.get_new_id(); + result["result"]["ts"] = now_sec(); +} + +/** + * stops PCAP capturing + * + */ +void +TrexRpcCmdCapture::parse_cmd_stop(const Json::Value ¶ms, Json::Value &result) { + + uint32_t capture_id = parse_uint32(params, "capture_id", result); + + static MsgReply<TrexCaptureRCStop> reply; + reply.reset(); + + TrexStatelessRxCaptureStop *stop_msg = new TrexStatelessRxCaptureStop(capture_id, reply); + get_stateless_obj()->send_msg_to_rx(stop_msg); + + TrexCaptureRCStop rc = reply.wait_for_reply(); + if (!rc) { + generate_execute_err(result, rc.get_err()); + } + + result["result"]["pkt_count"] = rc.get_pkt_count(); +} + +/** + * gets the status of all captures in the system + * + */ +void +TrexRpcCmdCapture::parse_cmd_status(const Json::Value ¶ms, Json::Value &result) { + + /* generate a status command */ + + static MsgReply<TrexCaptureRCStatus> reply; + reply.reset(); + + TrexStatelessRxCaptureStatus *status_msg = new TrexStatelessRxCaptureStatus(reply); + get_stateless_obj()->send_msg_to_rx(status_msg); + + TrexCaptureRCStatus rc = reply.wait_for_reply(); + if (!rc) { + generate_execute_err(result, rc.get_err()); + } + + result["result"] = rc.get_status(); +} + +/** + * fetch packets from a capture + * + */ +void +TrexRpcCmdCapture::parse_cmd_fetch(const Json::Value ¶ms, Json::Value &result) { + + uint32_t capture_id = parse_uint32(params, "capture_id", result); + uint32_t pkt_limit = parse_uint32(params, "pkt_limit", result); + + /* generate a fetch command */ + + static MsgReply<TrexCaptureRCFetch> reply; + reply.reset(); + + TrexStatelessRxCaptureFetch *fetch_msg = new TrexStatelessRxCaptureFetch(capture_id, pkt_limit, reply); + get_stateless_obj()->send_msg_to_rx(fetch_msg); + + TrexCaptureRCFetch rc = reply.wait_for_reply(); + if (!rc) { + generate_execute_err(result, rc.get_err()); + } + + const TrexPktBuffer *pkt_buffer = rc.get_pkt_buffer(); + + result["result"]["pending"] = rc.get_pending(); + result["result"]["start_ts"] = rc.get_start_ts(); + result["result"]["pkts"] = pkt_buffer->to_json(); + + /* delete the buffer */ + delete pkt_buffer; +} + +void +TrexRpcCmdCapture::parse_cmd_remove(const Json::Value ¶ms, Json::Value &result) { + + uint32_t capture_id = parse_uint32(params, "capture_id", result); + + /* generate a remove command */ + + static MsgReply<TrexCaptureRCRemove> reply; + reply.reset(); + + TrexStatelessRxCaptureRemove *remove_msg = new TrexStatelessRxCaptureRemove(capture_id, reply); + get_stateless_obj()->send_msg_to_rx(remove_msg); + + TrexCaptureRCRemove rc = reply.wait_for_reply(); + if (!rc) { + generate_execute_err(result, rc.get_err()); + } + + result["result"] = Json::objectValue; +} + diff --git a/src/rpc-server/commands/trex_rpc_cmds.h b/src/rpc-server/commands/trex_rpc_cmds.h index 6639be7b..54797bdf 100644 --- a/src/rpc-server/commands/trex_rpc_cmds.h +++ b/src/rpc-server/commands/trex_rpc_cmds.h @@ -94,8 +94,6 @@ TREX_RPC_CMD_DEFINE(TrexRpcCmdGetPortXStatsValues, "get_port_xstats_values", 1, TREX_RPC_CMD_DEFINE(TrexRpcCmdGetPortXStatsNames, "get_port_xstats_names", 1, false, APIClass::API_CLASS_TYPE_CORE); TREX_RPC_CMD_DEFINE_EXTENDED(TrexRpcCmdSetPortAttr, "set_port_attr", 2, true, APIClass::API_CLASS_TYPE_CORE, - - int parse_rx_filter_mode(const Json::Value &msg, uint8_t port_id, Json::Value &result); ); @@ -150,16 +148,26 @@ TREX_RPC_CMD_DEFINE(TrexRpcCmdPushRemote, "push_remote", 6, true, APIClass::API_ TREX_RPC_CMD_DEFINE(TrexRpcCmdShutdown, "shutdown", 2, false, APIClass::API_CLASS_TYPE_CORE); -TREX_RPC_CMD_DEFINE_EXTENDED(TrexRpcCmdSetRxFeature, "set_rx_feature", 3, false, APIClass::API_CLASS_TYPE_CORE, - void parse_capture_msg(const Json::Value &msg, TrexStatelessPort *port, Json::Value &result); +TREX_RPC_CMD_DEFINE_EXTENDED(TrexRpcCmdSetRxFeature, "set_rx_feature", 3, true, APIClass::API_CLASS_TYPE_CORE, void parse_queue_msg(const Json::Value &msg, TrexStatelessPort *port, Json::Value &result); void parse_server_msg(const Json::Value &msg, TrexStatelessPort *port, Json::Value &result); ); -TREX_RPC_CMD_DEFINE(TrexRpcCmdSetL2, "set_l2", 2, false, APIClass::API_CLASS_TYPE_CORE); -TREX_RPC_CMD_DEFINE(TrexRpcCmdSetL3, "set_l3", 3, false, APIClass::API_CLASS_TYPE_CORE); -TREX_RPC_CMD_DEFINE(TrexRpcCmdGetRxQueuePkts, "get_rx_queue_pkts", 2, false, APIClass::API_CLASS_TYPE_CORE); +TREX_RPC_CMD_DEFINE(TrexRpcCmdSetL2, "set_l2", 2, true, APIClass::API_CLASS_TYPE_CORE); +TREX_RPC_CMD_DEFINE(TrexRpcCmdSetL3, "set_l3", 3, true, APIClass::API_CLASS_TYPE_CORE); +TREX_RPC_CMD_DEFINE(TrexRpcCmdGetRxQueuePkts, "get_rx_queue_pkts", 1, true, APIClass::API_CLASS_TYPE_CORE); + +TREX_RPC_CMD_DEFINE(TrexRpcCmdSetServiceMode, "service", 2, true, APIClass::API_CLASS_TYPE_CORE); + +TREX_RPC_CMD_DEFINE_EXTENDED(TrexRpcCmdCapture, "capture", 1, false, APIClass::API_CLASS_TYPE_CORE, + void parse_cmd_start(const Json::Value &msg, Json::Value &result); + void parse_cmd_stop(const Json::Value &msg, Json::Value &result); + void parse_cmd_status(const Json::Value &msg, Json::Value &result); + void parse_cmd_fetch(const Json::Value &msg, Json::Value &result); + void parse_cmd_remove(const Json::Value ¶ms, Json::Value &result); +); + #endif /* __TREX_RPC_CMD_H__ */ diff --git a/src/rpc-server/trex_rpc_cmds_table.cpp b/src/rpc-server/trex_rpc_cmds_table.cpp index 94a3e1b9..2af9f4f5 100644 --- a/src/rpc-server/trex_rpc_cmds_table.cpp +++ b/src/rpc-server/trex_rpc_cmds_table.cpp @@ -75,8 +75,11 @@ TrexRpcCommandsTable::TrexRpcCommandsTable() { register_command(new TrexRpcCmdSetRxFeature()); register_command(new TrexRpcCmdGetRxQueuePkts()); + register_command(new TrexRpcCmdSetServiceMode()); register_command(new TrexRpcCmdSetL2()); register_command(new TrexRpcCmdSetL3()); + + register_command(new TrexRpcCmdCapture()); } diff --git a/src/stateless/common/trex_stateless_pkt.cpp b/src/stateless/common/trex_stateless_pkt.cpp new file mode 100644 index 00000000..f7d47ec0 --- /dev/null +++ b/src/stateless/common/trex_stateless_pkt.cpp @@ -0,0 +1,182 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* + Copyright (c) 2016-2016 Cisco Systems, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#include "trex_stateless_pkt.h" +#include <assert.h> + + +/** + * copy MBUF to a flat buffer + * + * @author imarom (12/20/2016) + * + * @param dest - buffer with at least rte_pktmbuf_pkt_len(m) + * bytes + * @param m - MBUF to copy + * + * @return uint8_t* + */ +void copy_mbuf(uint8_t *dest, const rte_mbuf_t *m) { + + int index = 0; + for (const rte_mbuf_t *it = m; it != NULL; it = it->next) { + const uint8_t *src = rte_pktmbuf_mtod(it, const uint8_t *); + memcpy(dest + index, src, it->data_len); + index += it->data_len; + } +} + +/************************************** + * TRex packet + * + *************************************/ +TrexPkt::TrexPkt(const rte_mbuf_t *m, int port, origin_e origin, uint64_t index) { + + /* allocate buffer */ + m_size = m->pkt_len; + m_raw = new uint8_t[m_size]; + + /* copy data */ + copy_mbuf(m_raw, m); + + /* generate a packet timestamp */ + m_timestamp = now_sec(); + + m_port = port; + m_origin = origin; + m_index = index; +} + +TrexPkt::TrexPkt(const TrexPkt &other) { + m_size = other.m_size; + memcpy(m_raw, other.m_raw, m_size); + + m_timestamp = other.m_timestamp; + + m_port = other.m_port; + m_origin = other.m_origin; + m_index = other.m_index; +} + +TrexPktBuffer::TrexPktBuffer(uint64_t size, mode_e mode) { + m_mode = mode; + m_buffer = nullptr; + m_head = 0; + m_tail = 0; + m_bytes = 0; + m_size = (size + 1); // for the empty/full difference 1 slot reserved + + /* generate queue */ + m_buffer = new const TrexPkt*[m_size](); // zeroed +} + +TrexPktBuffer::~TrexPktBuffer() { + assert(m_buffer); + + while (!is_empty()) { + const TrexPkt *pkt = pop(); + delete pkt; + } + delete [] m_buffer; +} + +/** + * packet will be copied to an internal object + */ +void +TrexPktBuffer::push(const rte_mbuf_t *m, int port, TrexPkt::origin_e origin, uint64_t pkt_index) { + + /* if full - decide by the policy */ + if (is_full()) { + if (m_mode == MODE_DROP_HEAD) { + delete pop(); + } else { + /* drop the tail (current packet) */ + return; + } + } + + /* push packet */ + m_buffer[m_head] = new TrexPkt(m, port, origin, pkt_index); + m_bytes += m_buffer[m_head]->get_size(); + + m_head = next(m_head); + +} + +/** + * packet will be handled internally + */ +void +TrexPktBuffer::push(const TrexPkt *pkt) { + /* if full - decide by the policy */ + if (is_full()) { + if (m_mode == MODE_DROP_HEAD) { + delete pop(); + } else { + /* drop the tail (current packet) */ + delete pkt; + return; + } + } + + /* push packet */ + m_buffer[m_head] = pkt; + m_head = next(m_head); +} + +const TrexPkt * +TrexPktBuffer::pop() { + assert(!is_empty()); + + const TrexPkt *pkt = m_buffer[m_tail]; + m_tail = next(m_tail); + + m_bytes -= pkt->get_size(); + + return pkt; +} + +uint32_t +TrexPktBuffer::get_element_count() const { + if (m_head >= m_tail) { + return (m_head - m_tail); + } else { + return ( get_capacity() - (m_tail - m_head - 1) ); + } +} + +Json::Value +TrexPktBuffer::to_json() const { + + Json::Value output = Json::arrayValue; + + int tmp = m_tail; + while (tmp != m_head) { + const TrexPkt *pkt = m_buffer[tmp]; + output.append(pkt->to_json()); + tmp = next(tmp); + } + + return output; +} + + diff --git a/src/stateless/common/trex_stateless_pkt.h b/src/stateless/common/trex_stateless_pkt.h new file mode 100644 index 00000000..1b6bd2f8 --- /dev/null +++ b/src/stateless/common/trex_stateless_pkt.h @@ -0,0 +1,201 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* + Copyright (c) 2016-2016 Cisco Systems, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#ifndef __TREX_STATELESS_PKT_H__ +#define __TREX_STATELESS_PKT_H__ + +#include <stdint.h> +#include <json/json.h> +#include "mbuf.h" +#include "common/base64.h" +#include "os_time.h" + + +/** + * copies MBUF to a flat buffer + * + * @author imarom (1/1/2017) + * + * @param dest + * @param m + */ +void copy_mbuf(uint8_t *dest, const rte_mbuf_t *m); + +/** + * describes a single saved packet + * + */ +class TrexPkt { +public: + + enum origin_e { + ORIGIN_NONE = 1, + ORIGIN_TX, + ORIGIN_RX + }; + + TrexPkt(const rte_mbuf_t *m, int port = -1, origin_e origin = ORIGIN_NONE, uint64_t index = 0); + TrexPkt(const TrexPkt &other); + + void set_index(uint64_t index) { + m_index = index; + } + + /* slow path and also RVO - pass by value is ok */ + Json::Value to_json() const { + Json::Value output; + output["ts"] = m_timestamp; + output["binary"] = base64_encode(m_raw, m_size); + output["port"] = m_port; + output["index"] = Json::UInt64(m_index); + + switch (m_origin) { + case ORIGIN_TX: + output["origin"] = "TX"; + break; + case ORIGIN_RX: + output["origin"] = "RX"; + break; + default: + output["origin"] = "NONE"; + break; + } + + return output; + } + + ~TrexPkt() { + if (m_raw) { + delete [] m_raw; + } + } + + origin_e get_origin() const { + return m_origin; + } + + int get_port() const { + return m_port; + } + + uint16_t get_size() const { + return m_size; + } + + dsec_t get_ts() const { + return m_timestamp; + } + +private: + + uint8_t *m_raw; + uint16_t m_size; + dsec_t m_timestamp; + origin_e m_origin; + int m_port; + uint64_t m_index; +}; + + +class TrexPktBuffer { +public: + + /** + * two modes for operations: + * + * MODE_DROP_HEAD - when the buffer is full, packets will be + * dropped from the head (the oldest packet) + * + * MODE_DROP_TAIL - when the buffer is full, packets will be + * dropped from the tail (the current packet) + */ + enum mode_e { + MODE_DROP_HEAD = 1, + MODE_DROP_TAIL = 2, + }; + + TrexPktBuffer(uint64_t size, mode_e mode = MODE_DROP_TAIL); + ~TrexPktBuffer(); + + /** + * push a packet to the buffer + * + */ + void push(const rte_mbuf_t *m, int port = -1, TrexPkt::origin_e origin = TrexPkt::ORIGIN_NONE, uint64_t pkt_index = 0); + void push(const TrexPkt *pkt); + + /** + * pops a packet from the buffer + * usually for internal usage + */ + const TrexPkt * pop(); + + /** + * generate a JSON output of the queue + * + */ + Json::Value to_json() const; + + + bool is_empty() const { + return (m_head == m_tail); + } + + bool is_full() const { + return ( next(m_head) == m_tail); + } + + /** + * return the total amount of space possible + */ + uint32_t get_capacity() const { + /* one slot is used for diff between full/empty */ + return (m_size - 1); + } + + mode_e get_mode() const { + return m_mode; + } + + /** + * returns how many elements are in the queue + */ + uint32_t get_element_count() const; + + uint32_t get_bytes() const { + return m_bytes; + } + +private: + int next(int v) const { + return ( (v + 1) % m_size ); + } + + mode_e m_mode; + int m_head; + int m_tail; + int m_size; + uint32_t m_bytes; + const TrexPkt **m_buffer; +}; + + +#endif /* __TREX_STATELESS_PKT_H__*/ diff --git a/src/stateless/cp/trex_stateless.cpp b/src/stateless/cp/trex_stateless.cpp index c31ba0a5..6ab9b417 100644 --- a/src/stateless/cp/trex_stateless.cpp +++ b/src/stateless/cp/trex_stateless.cpp @@ -18,13 +18,14 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -#include <trex_stateless.h> -#include <trex_stateless_port.h> -#include <sched.h> #include <iostream> #include <unistd.h> +#include "trex_stateless.h" +#include "trex_stateless_port.h" +#include "trex_stateless_messaging.h" + using namespace std; /*********************************************************** @@ -141,53 +142,10 @@ TrexStateless::get_dp_core_count() { } void -TrexStateless::encode_stats(Json::Value &global) { - - TrexPlatformGlobalStats stats; - m_platform_api->get_global_stats(stats); - - global["cpu_util"] = stats.m_stats.m_cpu_util; - global["rx_cpu_util"] = stats.m_stats.m_rx_cpu_util; - - global["tx_bps"] = stats.m_stats.m_tx_bps; - global["rx_bps"] = stats.m_stats.m_rx_bps; - - global["tx_pps"] = stats.m_stats.m_tx_pps; - global["rx_pps"] = stats.m_stats.m_rx_pps; - - global["total_tx_pkts"] = Json::Value::UInt64(stats.m_stats.m_total_tx_pkts); - global["total_rx_pkts"] = Json::Value::UInt64(stats.m_stats.m_total_rx_pkts); - - global["total_tx_bytes"] = Json::Value::UInt64(stats.m_stats.m_total_tx_bytes); - global["total_rx_bytes"] = Json::Value::UInt64(stats.m_stats.m_total_rx_bytes); - - global["tx_rx_errors"] = Json::Value::UInt64(stats.m_stats.m_tx_rx_errors); - - for (uint8_t i = 0; i < m_port_count; i++) { - std::stringstream ss; - - ss << "port " << i; - Json::Value &port_section = global[ss.str()]; +TrexStateless::send_msg_to_rx(TrexStatelessCpToRxMsgBase *msg) const { - m_ports[i]->encode_stats(port_section); - } + CNodeRing *ring = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0); + ring->Enqueue((CGenNode *)msg); } -/** - * generate a snapshot for publish (async publish) - * - */ -void -TrexStateless::generate_publish_snapshot(std::string &snapshot) { - Json::FastWriter writer; - Json::Value root; - - root["name"] = "trex-stateless-info"; - root["type"] = 0; - - /* stateless specific info goes here */ - root["data"] = Json::nullValue; - - snapshot = writer.write(root); -} diff --git a/src/stateless/cp/trex_stateless.h b/src/stateless/cp/trex_stateless.h index 3a1a2c24..87d227f6 100644 --- a/src/stateless/cp/trex_stateless.h +++ b/src/stateless/cp/trex_stateless.h @@ -132,32 +132,14 @@ public: /** - * shutdown the server + * send a message to the RX core */ - void shutdown(); - + void send_msg_to_rx(TrexStatelessCpToRxMsgBase *msg) const; + /** - * fetch xstats names (keys of dict) - * - */ - void encode_xstats_names(Json::Value &global); - - /** - * fetch xstats values - * - */ - void encode_xstats_values(Json::Value &global); - - /** - * fetch all the stats - * - */ - void encode_stats(Json::Value &global); - - /** - * generate a snapshot for publish + * shutdown the server */ - void generate_publish_snapshot(std::string &snapshot); + void shutdown(); const TrexPlatformApi * get_platform_api() { return (m_platform_api); diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp index 7d331c6e..9cf048b0 100644 --- a/src/stateless/cp/trex_stateless_port.cpp +++ b/src/stateless/cp/trex_stateless_port.cpp @@ -162,7 +162,7 @@ private: * trex stateless port * **************************/ -TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api) : m_dp_events(this) { +TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api) : m_dp_events(this), m_service_mode(port_id, api) { std::vector<std::pair<uint8_t, uint8_t>> core_pair_list; m_port_id = port_id; @@ -948,24 +948,6 @@ TrexStatelessPort::remove_and_delete_all_streams() { } } -void -TrexStatelessPort::start_rx_capture(const std::string &pcap_filename, uint64_t limit) { - static MsgReply<bool> reply; - - reply.reset(); - - TrexStatelessRxStartCapture *msg = new TrexStatelessRxStartCapture(m_port_id, pcap_filename, limit, reply); - send_message_to_rx((TrexStatelessCpToRxMsgBase *)msg); - - /* as below, must wait for ACK from RX core before returning ACK */ - reply.wait_for_reply(); -} - -void -TrexStatelessPort::stop_rx_capture() { - TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStopCapture(m_port_id); - send_message_to_rx(msg); -} void TrexStatelessPort::start_rx_queue(uint64_t size) { @@ -980,18 +962,22 @@ TrexStatelessPort::start_rx_queue(uint64_t size) { this might cause the user to lose some packets from the queue */ reply.wait_for_reply(); + + m_service_mode.set_rx_queue(); } void TrexStatelessPort::stop_rx_queue() { TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStopQueue(m_port_id); send_message_to_rx(msg); + + m_service_mode.unset_rx_queue(); } -const RXPacketBuffer * +const TrexPktBuffer * TrexStatelessPort::get_rx_queue_pkts() { - static MsgReply<const RXPacketBuffer *> reply; + static MsgReply<const TrexPktBuffer *> reply; reply.reset(); diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h index d4ac4018..0ef8ae60 100644 --- a/src/stateless/cp/trex_stateless_port.h +++ b/src/stateless/cp/trex_stateless_port.h @@ -26,12 +26,14 @@ limitations under the License. #include "trex_dp_port_events.h" #include "trex_stateless_rx_defs.h" #include "trex_stream.h" +#include "trex_exception.h" +#include "trex_stateless_capture.h" class TrexStatelessCpToDpMsgBase; class TrexStatelessCpToRxMsgBase; class TrexStreamsGraphObj; class TrexPortMultiplier; -class RXPacketBuffer; +class TrexPktBuffer; /** @@ -113,6 +115,56 @@ private: static const std::string g_unowned_handler; }; +/** + * enforces in/out from service mode + * + * @author imarom (1/4/2017) + */ +class TrexServiceMode { +public: + TrexServiceMode(uint8_t port_id, const TrexPlatformApi *api) { + m_is_enabled = false; + m_has_rx_queue = false; + m_port_id = port_id; + m_port_attr = api->getPortAttrObj(port_id); + } + + void enable() { + m_port_attr->set_rx_filter_mode(RX_FILTER_MODE_ALL); + m_is_enabled = true; + } + + void disable() { + if (m_has_rx_queue) { + throw TrexException("unable to disable service mode - please remove RX queue"); + } + + if (TrexStatelessCaptureMngr::getInstance().is_active(m_port_id)) { + throw TrexException("unable to disable service mode - an active capture on port " + std::to_string(m_port_id) + " exists"); + } + + m_port_attr->set_rx_filter_mode(RX_FILTER_MODE_HW); + m_is_enabled = false; + } + + bool is_enabled() const { + return m_is_enabled; + } + + void set_rx_queue() { + m_has_rx_queue = true; + } + + void unset_rx_queue() { + m_has_rx_queue = false; + } + +private: + bool m_is_enabled; + bool m_has_rx_queue; + TRexPortAttr *m_port_attr; + uint8_t m_port_id; +}; class AsyncStopEvent; @@ -150,7 +202,15 @@ public: RC_ERR_FAILED_TO_COMPILE_STREAMS }; - + /** + * port capture mode + */ + enum capture_mode_e { + PORT_CAPTURE_NONE = 0, + PORT_CAPTURE_RX, + PORT_CAPTURE_ALL + }; + TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api); ~TrexStatelessPort(); @@ -227,6 +287,20 @@ public: double duration, bool is_dual); + /** + * moves port to / out service mode + */ + void set_service_mode(bool enabled) { + if (enabled) { + m_service_mode.enable(); + } else { + m_service_mode.disable(); + } + } + bool is_service_mode_on() const { + return m_service_mode.is_enabled(); + } + /** * get the port state * @@ -365,19 +439,6 @@ public: void get_pci_info(std::string &pci_addr, int &numa_node); - - /** - * enable RX capture on port - * - */ - void start_rx_capture(const std::string &pcap_filename, uint64_t limit); - - /** - * disable RX capture if on - * - */ - void stop_rx_capture(); - /** * start RX queueing of packets * @@ -398,7 +459,7 @@ public: * fetch the RX queue packets from the queue * */ - const RXPacketBuffer *get_rx_queue_pkts(); + const TrexPktBuffer *get_rx_queue_pkts(); /** * configures port for L2 mode @@ -429,7 +490,9 @@ public: } private: - + void set_service_mode_on(); + void set_service_mode_off(); + bool is_core_active(int core_id); const std::vector<uint8_t> get_core_id_list () { @@ -514,6 +577,9 @@ private: TrexPortOwner m_owner; int m_pending_async_stop_event; + + TrexServiceMode m_service_mode; + static const uint32_t MAX_STREAMS = 20000; }; diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp index aeb1e677..2452487c 100644 --- a/src/stateless/messaging/trex_stateless_messaging.cpp +++ b/src/stateless/messaging/trex_stateless_messaging.cpp @@ -262,23 +262,72 @@ bool TrexStatelessRxQuit::handle (CRxCoreStateless *rx_core) { bool -TrexStatelessRxStartCapture::handle(CRxCoreStateless *rx_core) { - rx_core->start_recorder(m_port_id, m_pcap_filename, m_limit); +TrexStatelessRxCaptureStart::handle(CRxCoreStateless *rx_core) { + + TrexCaptureRCStart start_rc; + + TrexStatelessCaptureMngr::getInstance().start(m_filter, m_limit, start_rc); + + /* mark as done */ + m_reply.set_reply(start_rc); + + return true; +} +bool +TrexStatelessRxCaptureStop::handle(CRxCoreStateless *rx_core) { + + TrexCaptureRCStop stop_rc; + + TrexStatelessCaptureMngr::getInstance().stop(m_capture_id, stop_rc); + /* mark as done */ - m_reply.set_reply(true); + m_reply.set_reply(stop_rc); return true; } bool -TrexStatelessRxStopCapture::handle(CRxCoreStateless *rx_core) { - rx_core->stop_recorder(m_port_id); +TrexStatelessRxCaptureFetch::handle(CRxCoreStateless *rx_core) { + + TrexCaptureRCFetch fetch_rc; + + TrexStatelessCaptureMngr::getInstance().fetch(m_capture_id, m_pkt_limit, fetch_rc); + + /* mark as done */ + m_reply.set_reply(fetch_rc); + + return true; +} +bool +TrexStatelessRxCaptureStatus::handle(CRxCoreStateless *rx_core) { + + TrexCaptureRCStatus status_rc; + + status_rc.set_status(TrexStatelessCaptureMngr::getInstance().to_json()); + + /* mark as done */ + m_reply.set_reply(status_rc); + return true; } bool +TrexStatelessRxCaptureRemove::handle(CRxCoreStateless *rx_core) { + + TrexCaptureRCRemove remove_rc; + + TrexStatelessCaptureMngr::getInstance().remove(m_capture_id, remove_rc); + + /* mark as done */ + m_reply.set_reply(remove_rc); + + return true; +} + + +bool TrexStatelessRxStartQueue::handle(CRxCoreStateless *rx_core) { rx_core->start_queue(m_port_id, m_size); @@ -299,7 +348,7 @@ TrexStatelessRxStopQueue::handle(CRxCoreStateless *rx_core) { bool TrexStatelessRxQueueGetPkts::handle(CRxCoreStateless *rx_core) { - const RXPacketBuffer *pkt_buffer = rx_core->get_rx_queue_pkts(m_port_id); + const TrexPktBuffer *pkt_buffer = rx_core->get_rx_queue_pkts(m_port_id); /* set the reply */ m_reply.set_reply(pkt_buffer); diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h index 72b92d11..3535ad4f 100644 --- a/src/stateless/messaging/trex_stateless_messaging.h +++ b/src/stateless/messaging/trex_stateless_messaging.h @@ -28,12 +28,13 @@ limitations under the License. #include "trex_stateless_rx_defs.h" #include "os_time.h" #include "utl_ip.h" +#include "trex_stateless_capture.h" class TrexStatelessDpCore; class CRxCoreStateless; class TrexStreamsCompiledObj; class CFlowGenListPerThread; -class RXPacketBuffer; +class TrexPktBuffer; /** * Generic message reply object @@ -484,39 +485,85 @@ class TrexStatelessRxQuit : public TrexStatelessCpToRxMsgBase { }; +class TrexStatelessRxCapture : public TrexStatelessCpToRxMsgBase { +public: + virtual bool handle (CRxCoreStateless *rx_core) = 0; +}; -class TrexStatelessRxStartCapture : public TrexStatelessCpToRxMsgBase { +class TrexStatelessRxCaptureStart : public TrexStatelessRxCapture { public: - TrexStatelessRxStartCapture(uint8_t port_id, - const std::string &pcap_filename, + TrexStatelessRxCaptureStart(const CaptureFilter& filter, uint64_t limit, - MsgReply<bool> &reply) : m_reply(reply) { + MsgReply<TrexCaptureRCStart> &reply) : m_reply(reply) { - m_port_id = port_id; - m_limit = limit; - m_pcap_filename = pcap_filename; + m_limit = limit; + m_filter = filter; } virtual bool handle(CRxCoreStateless *rx_core); private: - uint8_t m_port_id; - std::string m_pcap_filename; - uint64_t m_limit; - MsgReply<bool> &m_reply; + uint8_t m_port_id; + uint64_t m_limit; + CaptureFilter m_filter; + MsgReply<TrexCaptureRCStart> &m_reply; }; -class TrexStatelessRxStopCapture : public TrexStatelessCpToRxMsgBase { +class TrexStatelessRxCaptureStop : public TrexStatelessRxCapture { public: - TrexStatelessRxStopCapture(uint8_t port_id) { - m_port_id = port_id; + TrexStatelessRxCaptureStop(capture_id_t capture_id, MsgReply<TrexCaptureRCStop> &reply) : m_reply(reply) { + m_capture_id = capture_id; } virtual bool handle(CRxCoreStateless *rx_core); private: - uint8_t m_port_id; + capture_id_t m_capture_id; + MsgReply<TrexCaptureRCStop> &m_reply; +}; + + +class TrexStatelessRxCaptureFetch : public TrexStatelessRxCapture { +public: + TrexStatelessRxCaptureFetch(capture_id_t capture_id, uint32_t pkt_limit, MsgReply<TrexCaptureRCFetch> &reply) : m_reply(reply) { + m_capture_id = capture_id; + m_pkt_limit = pkt_limit; + } + + virtual bool handle(CRxCoreStateless *rx_core); + +private: + capture_id_t m_capture_id; + uint32_t m_pkt_limit; + MsgReply<TrexCaptureRCFetch> &m_reply; +}; + + +class TrexStatelessRxCaptureStatus : public TrexStatelessRxCapture { +public: + TrexStatelessRxCaptureStatus(MsgReply<TrexCaptureRCStatus> &reply) : m_reply(reply) { + } + + virtual bool handle(CRxCoreStateless *rx_core); + +private: + MsgReply<TrexCaptureRCStatus> &m_reply; +}; + + + +class TrexStatelessRxCaptureRemove : public TrexStatelessRxCapture { +public: + TrexStatelessRxCaptureRemove(capture_id_t capture_id, MsgReply<TrexCaptureRCRemove> &reply) : m_reply(reply) { + m_capture_id = capture_id; + } + + virtual bool handle(CRxCoreStateless *rx_core); + +private: + capture_id_t m_capture_id; + MsgReply<TrexCaptureRCRemove> &m_reply; }; @@ -556,7 +603,7 @@ private: class TrexStatelessRxQueueGetPkts : public TrexStatelessCpToRxMsgBase { public: - TrexStatelessRxQueueGetPkts(uint8_t port_id, MsgReply<const RXPacketBuffer *> &reply) : m_reply(reply) { + TrexStatelessRxQueueGetPkts(uint8_t port_id, MsgReply<const TrexPktBuffer *> &reply) : m_reply(reply) { m_port_id = port_id; } @@ -568,7 +615,7 @@ public: private: uint8_t m_port_id; - MsgReply<const RXPacketBuffer *> &m_reply; + MsgReply<const TrexPktBuffer *> &m_reply; }; diff --git a/src/stateless/rx/trex_stateless_capture.cpp b/src/stateless/rx/trex_stateless_capture.cpp new file mode 100644 index 00000000..f0d4e806 --- /dev/null +++ b/src/stateless/rx/trex_stateless_capture.cpp @@ -0,0 +1,258 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2016 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +#include "trex_stateless_capture.h" +#include "trex_exception.h" + +TrexStatelessCapture::TrexStatelessCapture(capture_id_t id, uint64_t limit, const CaptureFilter &filter) { + m_id = id; + m_pkt_buffer = new TrexPktBuffer(limit, TrexPktBuffer::MODE_DROP_TAIL); + m_filter = filter; + m_state = STATE_ACTIVE; + m_start_ts = now_sec(); + m_pkt_index = 0; +} + +TrexStatelessCapture::~TrexStatelessCapture() { + if (m_pkt_buffer) { + delete m_pkt_buffer; + } +} + +void +TrexStatelessCapture::handle_pkt_tx(TrexPkt *pkt) { + + if (m_state != STATE_ACTIVE) { + delete pkt; + return; + } + + /* if not in filter - back off */ + if (!m_filter.in_filter(pkt)) { + delete pkt; + return; + } + + if (pkt->get_ts() < m_start_ts) { + delete pkt; + return; + } + + pkt->set_index(++m_pkt_index); + m_pkt_buffer->push(pkt); +} + +void +TrexStatelessCapture::handle_pkt_rx(const rte_mbuf_t *m, int port) { + + if (m_state != STATE_ACTIVE) { + return; + } + + if (!m_filter.in_rx(port)) { + return; + } + + m_pkt_buffer->push(m, port, TrexPkt::ORIGIN_RX, ++m_pkt_index); +} + + +Json::Value +TrexStatelessCapture::to_json() const { + Json::Value output = Json::objectValue; + + output["id"] = Json::UInt64(m_id); + output["filter"] = m_filter.to_json(); + output["count"] = m_pkt_buffer->get_element_count(); + output["bytes"] = m_pkt_buffer->get_bytes(); + output["limit"] = m_pkt_buffer->get_capacity(); + + switch (m_state) { + case STATE_ACTIVE: + output["state"] = "ACTIVE"; + break; + + case STATE_STOPPED: + output["state"] = "STOPPED"; + break; + + default: + assert(0); + } + + return output; +} + +TrexPktBuffer * +TrexStatelessCapture::fetch(uint32_t pkt_limit, uint32_t &pending) { + + /* if the total sum of packets is within the limit range - take it */ + if (m_pkt_buffer->get_element_count() <= pkt_limit) { + TrexPktBuffer *current = m_pkt_buffer; + m_pkt_buffer = new TrexPktBuffer(m_pkt_buffer->get_capacity(), m_pkt_buffer->get_mode()); + pending = 0; + return current; + } + + /* harder part - partial fetch */ + TrexPktBuffer *partial = new TrexPktBuffer(pkt_limit); + for (int i = 0; i < pkt_limit; i++) { + const TrexPkt *pkt = m_pkt_buffer->pop(); + partial->push(pkt); + } + + pending = m_pkt_buffer->get_element_count(); + + return partial; +} + +void +TrexStatelessCaptureMngr::update_global_filter() { + CaptureFilter new_filter; + + for (TrexStatelessCapture *capture : m_captures) { + new_filter += capture->get_filter(); + } + + m_global_filter = new_filter; +} + +TrexStatelessCapture * +TrexStatelessCaptureMngr::lookup(capture_id_t capture_id) { + + for (int i = 0; i < m_captures.size(); i++) { + if (m_captures[i]->get_id() == capture_id) { + return m_captures[i]; + } + } + + /* does not exist */ + return nullptr; +} + +void +TrexStatelessCaptureMngr::start(const CaptureFilter &filter, uint64_t limit, TrexCaptureRCStart &rc) { + + if (m_captures.size() > MAX_CAPTURE_SIZE) { + rc.set_err(TrexCaptureRC::RC_CAPTURE_LIMIT_REACHED); + return; + } + + + int new_id = m_id_counter++; + TrexStatelessCapture *new_buffer = new TrexStatelessCapture(new_id, limit, filter); + m_captures.push_back(new_buffer); + + /* update global filter */ + update_global_filter(); + + /* result */ + rc.set_new_id(new_id); +} + +void +TrexStatelessCaptureMngr::stop(capture_id_t capture_id, TrexCaptureRCStop &rc) { + TrexStatelessCapture *capture = lookup(capture_id); + if (!capture) { + rc.set_err(TrexCaptureRC::RC_CAPTURE_NOT_FOUND); + return; + } + + capture->stop(); + rc.set_count(capture->get_pkt_count()); +} + +void +TrexStatelessCaptureMngr::fetch(capture_id_t capture_id, uint32_t pkt_limit, TrexCaptureRCFetch &rc) { + TrexStatelessCapture *capture = lookup(capture_id); + if (!capture) { + rc.set_err(TrexCaptureRC::RC_CAPTURE_NOT_FOUND); + return; + } + + uint32_t pending = 0; + TrexPktBuffer *pkt_buffer = capture->fetch(pkt_limit, pending); + + rc.set_pkt_buffer(pkt_buffer, pending, capture->get_start_ts()); +} + +void +TrexStatelessCaptureMngr::remove(capture_id_t capture_id, TrexCaptureRCRemove &rc) { + + int index = -1; + for (int i = 0; i < m_captures.size(); i++) { + if (m_captures[i]->get_id() == capture_id) { + index = i; + break; + } + } + + /* does not exist */ + if (index == -1) { + rc.set_err(TrexCaptureRC::RC_CAPTURE_NOT_FOUND); + return; + } + + TrexStatelessCapture *capture = m_captures[index]; + m_captures.erase(m_captures.begin() + index); + + /* free memory */ + delete capture; + + /* update global filter */ + update_global_filter(); + + rc.set_ok(); +} + +void +TrexStatelessCaptureMngr::reset() { + TrexCaptureRCRemove dummy; + + while (m_captures.size() > 0) { + remove(m_captures[0]->get_id(), dummy); + } +} + +void +TrexStatelessCaptureMngr::handle_pkt_tx(TrexPkt *pkt) { + for (TrexStatelessCapture *capture : m_captures) { + capture->handle_pkt_tx(pkt); + } +} + +void +TrexStatelessCaptureMngr::handle_pkt_rx_slow_path(const rte_mbuf_t *m, int port) { + for (TrexStatelessCapture *capture : m_captures) { + capture->handle_pkt_rx(m, port); + } +} + +Json::Value +TrexStatelessCaptureMngr::to_json() const { + Json::Value lst = Json::arrayValue; + + for (TrexStatelessCapture *capture : m_captures) { + lst.append(capture->to_json()); + } + + return lst; +} + diff --git a/src/stateless/rx/trex_stateless_capture.h b/src/stateless/rx/trex_stateless_capture.h new file mode 100644 index 00000000..bc1b88c5 --- /dev/null +++ b/src/stateless/rx/trex_stateless_capture.h @@ -0,0 +1,397 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2016 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +#ifndef __TREX_STATELESS_CAPTURE_H__ +#define __TREX_STATELESS_CAPTURE_H__ + +#include <stdint.h> +#include <assert.h> + +#include "trex_stateless_pkt.h" +#include "trex_stateless_capture_msg.h" + +typedef int32_t capture_id_t; + +class TrexCaptureRC { +public: + + TrexCaptureRC() { + m_rc = RC_INVALID; + m_pkt_buffer = NULL; + } + + enum rc_e { + RC_INVALID = 0, + RC_OK = 1, + RC_CAPTURE_NOT_FOUND, + RC_CAPTURE_LIMIT_REACHED, + RC_CAPTURE_FETCH_UNDER_ACTIVE + }; + + bool operator !() const { + return (m_rc != RC_OK); + } + + std::string get_err() const { + assert(m_rc != RC_INVALID); + + switch (m_rc) { + case RC_OK: + return ""; + case RC_CAPTURE_LIMIT_REACHED: + return "capture limit has reached"; + case RC_CAPTURE_NOT_FOUND: + return "capture ID not found"; + case RC_CAPTURE_FETCH_UNDER_ACTIVE: + return "fetch command cannot be executed on an active capture"; + default: + assert(0); + } + } + + void set_err(rc_e rc) { + m_rc = rc; + } + + Json::Value get_json() const { + return m_json_rc; + } + +public: + rc_e m_rc; + capture_id_t m_capture_id; + TrexPktBuffer *m_pkt_buffer; + Json::Value m_json_rc; +}; + +class TrexCaptureRCStart : public TrexCaptureRC { +public: + + void set_new_id(capture_id_t new_id) { + m_capture_id = new_id; + m_rc = RC_OK; + } + + capture_id_t get_new_id() const { + return m_capture_id; + } + +private: + capture_id_t m_capture_id; +}; + + +class TrexCaptureRCStop : public TrexCaptureRC { +public: + void set_count(uint32_t pkt_count) { + m_pkt_count = pkt_count; + m_rc = RC_OK; + } + + uint32_t get_pkt_count() const { + return m_pkt_count; + } + +private: + uint32_t m_pkt_count; +}; + +class TrexCaptureRCFetch : public TrexCaptureRC { +public: + + TrexCaptureRCFetch() { + m_pkt_buffer = nullptr; + m_pending = 0; + } + + void set_pkt_buffer(const TrexPktBuffer *pkt_buffer, uint32_t pending, dsec_t start_ts) { + m_pkt_buffer = pkt_buffer; + m_pending = pending; + m_start_ts = start_ts; + m_rc = RC_OK; + } + + const TrexPktBuffer *get_pkt_buffer() const { + return m_pkt_buffer; + } + + uint32_t get_pending() const { + return m_pending; + } + + dsec_t get_start_ts() const { + return m_start_ts; + } + +private: + const TrexPktBuffer *m_pkt_buffer; + uint32_t m_pending; + dsec_t m_start_ts; +}; + +class TrexCaptureRCRemove : public TrexCaptureRC { +public: + void set_ok() { + m_rc = RC_OK; + } +}; + +class TrexCaptureRCStatus : public TrexCaptureRC { +public: + + void set_status(const Json::Value &json) { + m_json = json; + m_rc = RC_OK; + } + + const Json::Value & get_status() const { + return m_json; + } + +private: + Json::Value m_json; +}; + +/** + * capture filter + * specify which ports to capture and if TX/RX or both + */ +class CaptureFilter { +public: + CaptureFilter() { + m_tx_active = 0; + m_rx_active = 0; + } + + void add_tx(uint8_t port_id) { + m_tx_active |= (1LL << port_id); + } + + void add_rx(uint8_t port_id) { + m_rx_active |= (1LL << port_id); + } + + void add(uint8_t port_id) { + add_tx(port_id); + add_rx(port_id); + } + + bool in_filter(const TrexPkt *pkt) { + switch (pkt->get_origin()) { + case TrexPkt::ORIGIN_TX: + return in_tx(pkt->get_port()); + + case TrexPkt::ORIGIN_RX: + return in_rx(pkt->get_port()); + + default: + return false; + } + } + + bool in_rx(uint8_t port_id) const { + uint64_t bit = (1LL << port_id); + return ((m_rx_active & bit) == bit); + } + + bool in_tx(uint8_t port_id) const { + uint64_t bit = (1LL << port_id); + return ((m_tx_active & bit) == bit); + } + + bool in_any(uint8_t port_id) const { + return ( in_tx(port_id) || in_rx(port_id) ); + } + + CaptureFilter& operator +=(const CaptureFilter &other) { + m_tx_active |= other.m_tx_active; + m_rx_active |= other.m_rx_active; + + return *this; + } + + Json::Value to_json() const { + Json::Value output = Json::objectValue; + output["tx"] = Json::UInt64(m_tx_active); + output["rx"] = Json::UInt64(m_rx_active); + + return output; + } + +private: + + uint64_t m_tx_active; + uint64_t m_rx_active; +}; + + +class TrexStatelessCapture { +public: + enum state_e { + STATE_ACTIVE, + STATE_STOPPED, + }; + + TrexStatelessCapture(capture_id_t id, uint64_t limit, const CaptureFilter &filter); + + void handle_pkt_tx(TrexPkt *pkt); + void handle_pkt_rx(const rte_mbuf_t *m, int port); + + ~TrexStatelessCapture(); + + uint64_t get_id() const { + return m_id; + } + + const CaptureFilter & get_filter() const { + return m_filter; + } + + Json::Value to_json() const; + + void stop() { + m_state = STATE_STOPPED; + } + + TrexPktBuffer * fetch(uint32_t pkt_limit, uint32_t &pending); + + bool is_active() const { + return m_state == STATE_ACTIVE; + } + + uint32_t get_pkt_count() const { + return m_pkt_buffer->get_element_count(); + } + + dsec_t get_start_ts() const { + return m_start_ts; + } + +private: + state_e m_state; + TrexPktBuffer *m_pkt_buffer; + dsec_t m_start_ts; + CaptureFilter m_filter; + uint64_t m_id; + uint64_t m_pkt_index; +}; + +class TrexStatelessCaptureMngr { + +public: + + static TrexStatelessCaptureMngr& getInstance() { + static TrexStatelessCaptureMngr instance; + + return instance; + } + + + ~TrexStatelessCaptureMngr() { + reset(); + } + + /** + * starts a new capture + */ + void start(const CaptureFilter &filter, uint64_t limit, TrexCaptureRCStart &rc); + + /** + * stops an existing capture + * + */ + void stop(capture_id_t capture_id, TrexCaptureRCStop &rc); + + /** + * fetch packets from an existing capture + * + */ + void fetch(capture_id_t capture_id, uint32_t pkt_limit, TrexCaptureRCFetch &rc); + + /** + * removes an existing capture + * all packets captured will be detroyed + */ + void remove(capture_id_t capture_id, TrexCaptureRCRemove &rc); + + + /** + * removes all captures + * + */ + void reset(); + + + /** + * return true if any filter is active + * + * @author imarom (1/3/2017) + * + * @return bool + */ + bool is_active(uint8_t port) const { + return m_global_filter.in_any(port); + } + + /** + * handle packet from TX + */ + void handle_pkt_tx(TrexPkt *pkt); + + /** + * handle packet from RX + */ + void handle_pkt_rx(const rte_mbuf_t *m, int port) { + /* fast path */ + if (!is_active(port)) { + return; + } + + /* slow path */ + handle_pkt_rx_slow_path(m, port); + } + + Json::Value to_json() const; + +private: + + TrexStatelessCaptureMngr() { + /* init this to 1 */ + m_id_counter = 1; + } + + + TrexStatelessCapture * lookup(capture_id_t capture_id); + + void handle_pkt_rx_slow_path(const rte_mbuf_t *m, int port); + void update_global_filter(); + + std::vector<TrexStatelessCapture *> m_captures; + + capture_id_t m_id_counter; + + /* a union of all the filters curently active */ + CaptureFilter m_global_filter; + + static const int MAX_CAPTURE_SIZE = 10; +}; + +#endif /* __TREX_STATELESS_CAPTURE_H__ */ + diff --git a/src/stateless/rx/trex_stateless_rx_core.cpp b/src/stateless/rx/trex_stateless_rx_core.cpp index d27485de..00c18082 100644 --- a/src/stateless/rx/trex_stateless_rx_core.cpp +++ b/src/stateless/rx/trex_stateless_rx_core.cpp @@ -69,11 +69,11 @@ void CRFC2544Info::export_data(rfc2544_info_t_ &obj) { void CRxCoreStateless::create(const CRxSlCfg &cfg) { m_capture = false; m_max_ports = cfg.m_max_ports; - + m_tx_cores = cfg.m_tx_cores; + CMessagingManager * cp_rx = CMsgIns::Ins()->getCpRx(); m_ring_from_cp = cp_rx->getRingCpToDp(0); - m_ring_to_cp = cp_rx->getRingDpToCp(0); m_state = STATE_IDLE; for (int i = 0; i < MAX_FLOW_STATS_PAYLOAD; i++) { @@ -130,6 +130,36 @@ bool CRxCoreStateless::periodic_check_for_cp_messages() { } +void +CRxCoreStateless::periodic_check_for_dp_messages() { + + for (int i = 0; i < m_tx_cores; i++) { + periodic_check_for_dp_messages_core(i); + } + +} + +void +CRxCoreStateless::periodic_check_for_dp_messages_core(uint32_t core_id) { + + CNodeRing *ring = CMsgIns::Ins()->getRxDp()->getRingDpToCp(core_id); + + /* fast path */ + if ( likely ( ring->isEmpty() ) ) { + return; + } + + while (true) { + CGenNode *node = NULL; + + if (ring->Dequeue(node) != 0) { + break; + } + + //assert(node); + } +} + void CRxCoreStateless::recalculate_next_state() { if (m_state == STATE_QUIT) { return; @@ -176,16 +206,6 @@ void CRxCoreStateless::idle_state_loop() { } /** - * for each port give a tick (for flushing if needed) - * - */ -void CRxCoreStateless::port_manager_tick() { - for (int i = 0; i < m_max_ports; i++) { - m_rx_port_mngr[i].tick(); - } -} - -/** * for each port handle the grat ARP mechansim * */ @@ -199,7 +219,6 @@ void CRxCoreStateless::handle_work_stage() { /* set the next sync time to */ dsec_t sync_time_sec = now_sec() + (1.0 / 1000); - dsec_t tick_time_sec = now_sec() + 1.0; dsec_t grat_arp_sec = now_sec() + (double)CGlobalInfo::m_options.m_arp_ref_per; while (m_state == STATE_WORKING) { @@ -211,14 +230,10 @@ void CRxCoreStateless::handle_work_stage() { if ( (now - sync_time_sec) > 0 ) { periodic_check_for_cp_messages(); + //periodic_check_for_dp_messages(); sync_time_sec = now + (1.0 / 1000); } - if ( (now - tick_time_sec) > 0) { - port_manager_tick(); - tick_time_sec = now + 1.0; - } - if ( (now - grat_arp_sec) > 0) { handle_grat_arp(); grat_arp_sec = now + (double)CGlobalInfo::m_options.m_arp_ref_per; @@ -255,10 +270,6 @@ void CRxCoreStateless::start() { m_monitor.disable(); } -void CRxCoreStateless::capture_pkt(rte_mbuf_t *m) { - -} - int CRxCoreStateless::process_all_pending_pkts(bool flush_rx) { int total_pkts = 0; @@ -318,18 +329,6 @@ double CRxCoreStateless::get_cpu_util() { void -CRxCoreStateless::start_recorder(uint8_t port_id, const std::string &pcap_filename, uint64_t limit) { - m_rx_port_mngr[port_id].start_recorder(pcap_filename, limit); - recalculate_next_state(); -} - -void -CRxCoreStateless::stop_recorder(uint8_t port_id) { - m_rx_port_mngr[port_id].stop_recorder(); - recalculate_next_state(); -} - -void CRxCoreStateless::start_queue(uint8_t port_id, uint64_t size) { m_rx_port_mngr[port_id].start_queue(size); recalculate_next_state(); diff --git a/src/stateless/rx/trex_stateless_rx_core.h b/src/stateless/rx/trex_stateless_rx_core.h index 4eed59a1..954a5f04 100644 --- a/src/stateless/rx/trex_stateless_rx_core.h +++ b/src/stateless/rx/trex_stateless_rx_core.h @@ -27,6 +27,7 @@ #include "pal/linux/sanb_atomic.h" #include "utl_cpuu.h" #include "trex_stateless_rx_port_mngr.h" +#include "trex_stateless_capture.h" class TrexStatelessCpToRxMsgBase; @@ -127,17 +128,10 @@ class CRxCoreStateless { double get_cpu_util(); void update_cpu_util(); - const RXPacketBuffer *get_rx_queue_pkts(uint8_t port_id) { + const TrexPktBuffer *get_rx_queue_pkts(uint8_t port_id) { return m_rx_port_mngr[port_id].get_pkt_buffer(); } - - /** - * start capturing of RX packets on a specific port - * - */ - void start_recorder(uint8_t port_id, const std::string &pcap_filename, uint64_t limit); - void stop_recorder(uint8_t port_id); - + /** * start RX queueing of packets * @@ -162,17 +156,20 @@ class CRxCoreStateless { private: void handle_cp_msg(TrexStatelessCpToRxMsgBase *msg); + bool periodic_check_for_cp_messages(); + + void periodic_check_for_dp_messages(); + void periodic_check_for_dp_messages_core(uint32_t core_id); + void tickle(); void idle_state_loop(); void recalculate_next_state(); bool are_any_features_active(); - void capture_pkt(rte_mbuf_t *m); void handle_rx_queue_msgs(uint8_t thread_id, CNodeRing * r); void handle_work_stage(); - void port_manager_tick(); void handle_grat_arp(); int process_all_pending_pkts(bool flush_rx = false); @@ -186,10 +183,10 @@ class CRxCoreStateless { private: TrexMonitor m_monitor; uint32_t m_max_ports; + uint32_t m_tx_cores; bool m_capture; state_e m_state; CNodeRing *m_ring_from_cp; - CNodeRing *m_ring_to_cp; CCpuUtlDp m_cpu_dp_u; CCpuUtlCp m_cpu_cp_u; diff --git a/src/stateless/rx/trex_stateless_rx_defs.h b/src/stateless/rx/trex_stateless_rx_defs.h index aefcc133..367cf4e3 100644 --- a/src/stateless/rx/trex_stateless_rx_defs.h +++ b/src/stateless/rx/trex_stateless_rx_defs.h @@ -38,10 +38,12 @@ class CRxSlCfg { m_max_ports = 0; m_cps = 0.0; m_num_crc_fix_bytes = 0; + m_tx_cores = 0; } public: uint32_t m_max_ports; + uint32_t m_tx_cores; double m_cps; CPortLatencyHWBase * m_ports[TREX_MAX_PORTS]; uint8_t m_num_crc_fix_bytes; diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.cpp b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp index caed2bee..ede86062 100644 --- a/src/stateless/rx/trex_stateless_rx_port_mngr.cpp +++ b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp @@ -20,48 +20,10 @@ */ #include "bp_sim.h" #include "trex_stateless_rx_port_mngr.h" -#include "common/captureFile.h" #include "trex_stateless_rx_core.h" #include "common/Network/Packet/Arp.h" #include "pkt_gen.h" - -/** - * copy MBUF to a flat buffer - * - * @author imarom (12/20/2016) - * - * @param dest - buffer with at least rte_pktmbuf_pkt_len(m) - * bytes - * @param m - MBUF to copy - * - * @return uint8_t* - */ -void copy_mbuf(uint8_t *dest, const rte_mbuf_t *m) { - - int index = 0; - for (const rte_mbuf_t *it = m; it != NULL; it = it->next) { - const uint8_t *src = rte_pktmbuf_mtod(it, const uint8_t *); - memcpy(dest + index, src, it->data_len); - index += it->data_len; - } -} - -/************************************** - * RX packet - * - *************************************/ -RXPacket::RXPacket(const rte_mbuf_t *m) { - - /* allocate buffer */ - m_size = m->pkt_len; - m_raw = new uint8_t[m_size]; - - /* copy data */ - copy_mbuf(m_raw, m); - - /* generate a packet timestamp */ - m_timestamp = now_sec(); -} +#include "trex_stateless_capture.h" /************************************** * latency RX feature @@ -231,79 +193,12 @@ RXLatency::to_json() const { * *************************************/ -RXPacketBuffer::RXPacketBuffer(uint64_t size) { - m_buffer = nullptr; - m_head = 0; - m_tail = 0; - m_size = (size + 1); // for the empty/full difference 1 slot reserved - - /* generate queue */ - m_buffer = new RXPacket*[m_size](); // zeroed -} - -RXPacketBuffer::~RXPacketBuffer() { - assert(m_buffer); - - while (!is_empty()) { - RXPacket *pkt = pop(); - delete pkt; - } - delete [] m_buffer; -} - -void -RXPacketBuffer::push(const rte_mbuf_t *m) { - /* if full - pop the oldest */ - if (is_full()) { - delete pop(); - } - - /* push packet */ - m_buffer[m_head] = new RXPacket(m); - m_head = next(m_head); -} - -RXPacket * -RXPacketBuffer::pop() { - assert(!is_empty()); - - RXPacket *pkt = m_buffer[m_tail]; - m_tail = next(m_tail); - - return pkt; -} - -uint64_t -RXPacketBuffer::get_element_count() const { - if (m_head >= m_tail) { - return (m_head - m_tail); - } else { - return ( get_capacity() - (m_tail - m_head - 1) ); - } -} - -Json::Value -RXPacketBuffer::to_json() const { - - Json::Value output = Json::arrayValue; - - int tmp = m_tail; - while (tmp != m_head) { - RXPacket *pkt = m_buffer[tmp]; - output.append(pkt->to_json()); - tmp = next(tmp); - } - - return output; -} - - void RXQueue::start(uint64_t size) { if (m_pkt_buffer) { delete m_pkt_buffer; } - m_pkt_buffer = new RXPacketBuffer(size); + m_pkt_buffer = new TrexPktBuffer(size, TrexPktBuffer::MODE_DROP_HEAD); } void @@ -314,7 +209,7 @@ RXQueue::stop() { } } -const RXPacketBuffer * +const TrexPktBuffer * RXQueue::fetch() { /* if no buffer or the buffer is empty - give a NULL one */ @@ -323,10 +218,10 @@ RXQueue::fetch() { } /* hold a pointer to the old one */ - RXPacketBuffer *old_buffer = m_pkt_buffer; + TrexPktBuffer *old_buffer = m_pkt_buffer; /* replace the old one with a new one and freeze the old */ - m_pkt_buffer = new RXPacketBuffer(old_buffer->get_capacity()); + m_pkt_buffer = new TrexPktBuffer(old_buffer->get_capacity(), old_buffer->get_mode()); return old_buffer; } @@ -348,97 +243,6 @@ RXQueue::to_json() const { return output; } -/************************************** - * RX feature recorder - * - *************************************/ - -RXPacketRecorder::RXPacketRecorder() { - m_writer = NULL; - m_count = 0; - m_limit = 0; - m_epoch = -1; - - m_pending_flush = false; -} - -void -RXPacketRecorder::start(const std::string &pcap, uint64_t limit) { - m_writer = CCapWriterFactory::CreateWriter(LIBPCAP, (char *)pcap.c_str()); - if (m_writer == NULL) { - std::stringstream ss; - ss << "unable to create PCAP file: " << pcap; - throw TrexException(ss.str()); - } - - assert(limit > 0); - - m_limit = limit; - m_count = 0; - m_pending_flush = false; - m_pcap_filename = pcap; -} - -void -RXPacketRecorder::stop() { - if (!m_writer) { - return; - } - - delete m_writer; - m_writer = NULL; -} - -void -RXPacketRecorder::flush_to_disk() { - - if (m_writer && m_pending_flush) { - m_writer->flush_to_disk(); - m_pending_flush = false; - } -} - -void -RXPacketRecorder::handle_pkt(const rte_mbuf_t *m) { - if (!m_writer) { - return; - } - - dsec_t now = now_sec(); - if (m_epoch < 0) { - m_epoch = now; - } - - dsec_t dt = now - m_epoch; - - CPktNsecTimeStamp t_c(dt); - m_pkt.time_nsec = t_c.m_time_nsec; - m_pkt.time_sec = t_c.m_time_sec; - - copy_mbuf((uint8_t *)m_pkt.raw, m); - m_pkt.pkt_len = m->pkt_len; - - m_writer->write_packet(&m_pkt); - m_count++; - m_pending_flush = true; - - if (m_count == m_limit) { - stop(); - } - -} - -Json::Value -RXPacketRecorder::to_json() const { - Json::Value output = Json::objectValue; - - output["pcap_filename"] = m_pcap_filename; - output["limit"] = Json::UInt64(m_limit); - output["count"] = Json::UInt64(m_count); - - return output; -} - /************************************** * RX feature server (ARP, ICMP) and etc. @@ -789,10 +593,6 @@ void RXPortManager::handle_pkt(const rte_mbuf_t *m) { m_latency.handle_pkt(m); } - if (is_feature_set(RECORDER)) { - m_recorder.handle_pkt(m); - } - if (is_feature_set(QUEUE)) { m_queue.handle_pkt(m); } @@ -800,6 +600,9 @@ void RXPortManager::handle_pkt(const rte_mbuf_t *m) { if (is_feature_set(SERVER)) { m_server.handle_pkt(m); } + + /* capture */ + TrexStatelessCaptureMngr::getInstance().handle_pkt_rx(m, m_port_id); } int RXPortManager::process_all_pending_pkts(bool flush_rx) { @@ -838,13 +641,6 @@ int RXPortManager::process_all_pending_pkts(bool flush_rx) { } void -RXPortManager::tick() { - if (is_feature_set(RECORDER)) { - m_recorder.flush_to_disk(); - } -} - -void RXPortManager::send_next_grat_arp() { if (is_feature_set(GRAT_ARP)) { m_grat_arp.send_next_grat_arp(); @@ -890,13 +686,6 @@ RXPortManager::to_json() const { output["latency"]["is_active"] = false; } - if (is_feature_set(RECORDER)) { - output["sniffer"] = m_recorder.to_json(); - output["sniffer"]["is_active"] = true; - } else { - output["sniffer"]["is_active"] = false; - } - if (is_feature_set(QUEUE)) { output["queue"] = m_queue.to_json(); output["queue"]["is_active"] = true; diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.h b/src/stateless/rx/trex_stateless_rx_port_mngr.h index 6efdae64..0cc60716 100644 --- a/src/stateless/rx/trex_stateless_rx_port_mngr.h +++ b/src/stateless/rx/trex_stateless_rx_port_mngr.h @@ -25,8 +25,7 @@ #include <stdint.h> #include "common/base64.h" -#include "common/captureFile.h" - +#include "trex_stateless_pkt.h" class CPortLatencyHWBase; class CRFC2544Info; @@ -80,97 +79,12 @@ public: CRxCoreErrCntrs *m_err_cntrs; }; -/** - * describes a single saved RX packet - * - */ -class RXPacket { -public: - - RXPacket(const rte_mbuf_t *m); - - /* slow path and also RVO - pass by value is ok */ - Json::Value to_json() { - Json::Value output; - output["ts"] = m_timestamp; - output["binary"] = base64_encode(m_raw, m_size); - return output; - } - - ~RXPacket() { - if (m_raw) { - delete [] m_raw; - } - } - -private: - - uint8_t *m_raw; - uint16_t m_size; - dsec_t m_timestamp; -}; - /************************************** * RX feature queue * *************************************/ -class RXPacketBuffer { -public: - - RXPacketBuffer(uint64_t size); - ~RXPacketBuffer(); - - /** - * push a packet to the buffer - * - */ - void push(const rte_mbuf_t *m); - - /** - * generate a JSON output of the queue - * - */ - Json::Value to_json() const; - - - bool is_empty() const { - return (m_head == m_tail); - } - - bool is_full() const { - return ( next(m_head) == m_tail); - } - - /** - * return the total amount of space possible - */ - uint64_t get_capacity() const { - /* one slot is used for diff between full/empty */ - return (m_size - 1); - } - - /** - * returns how many elements are in the queue - */ - uint64_t get_element_count() const; - -private: - int next(int v) const { - return ( (v + 1) % m_size ); - } - - /* pop in case of full queue - internal usage */ - RXPacket * pop(); - - int m_head; - int m_tail; - int m_size; - RXPacket **m_buffer; -}; - - class RXQueue { public: RXQueue() { @@ -191,7 +105,7 @@ public: * fetch the current buffer * return NULL if no packets */ - const RXPacketBuffer * fetch(); + const TrexPktBuffer * fetch(); /** * stop RX queue @@ -204,42 +118,7 @@ public: Json::Value to_json() const; private: - RXPacketBuffer *m_pkt_buffer; -}; - -/************************************** - * RX feature PCAP recorder - * - *************************************/ - -class RXPacketRecorder { -public: - RXPacketRecorder(); - - ~RXPacketRecorder() { - stop(); - } - - void start(const std::string &pcap, uint64_t limit); - void stop(); - void handle_pkt(const rte_mbuf_t *m); - - /** - * flush any cached packets to disk - * - */ - void flush_to_disk(); - - Json::Value to_json() const; - -private: - CFileWriterBase *m_writer; - std::string m_pcap_filename; - CCapPktRaw m_pkt; - dsec_t m_epoch; - uint64_t m_limit; - uint64_t m_count; - bool m_pending_flush; + TrexPktBuffer *m_pkt_buffer; }; @@ -311,7 +190,6 @@ public: enum feature_t { NO_FEATURES = 0x0, LATENCY = 0x1, - RECORDER = 0x2, QUEUE = 0x4, SERVER = 0x8, GRAT_ARP = 0x10, @@ -354,17 +232,6 @@ public: unset_feature(LATENCY); } - /* recorder */ - void start_recorder(const std::string &pcap, uint64_t limit_pkts) { - m_recorder.start(pcap, limit_pkts); - set_feature(RECORDER); - } - - void stop_recorder() { - m_recorder.stop(); - unset_feature(RECORDER); - } - /* queue */ void start_queue(uint32_t size) { m_queue.start(size); @@ -376,7 +243,7 @@ public: unset_feature(QUEUE); } - const RXPacketBuffer *get_pkt_buffer() { + const TrexPktBuffer *get_pkt_buffer() { if (!is_feature_set(QUEUE)) { return nullptr; } @@ -415,13 +282,6 @@ public: void handle_pkt(const rte_mbuf_t *m); /** - * maintenance - * - * @author imarom (11/24/2016) - */ - void tick(); - - /** * send next grat arp (if on) * * @author imarom (12/13/2016) @@ -482,7 +342,6 @@ private: uint32_t m_features; uint8_t m_port_id; RXLatency m_latency; - RXPacketRecorder m_recorder; RXQueue m_queue; RXServer m_server; RXGratARP m_grat_arp; |