diff options
45 files changed, 2218 insertions, 1045 deletions
diff --git a/linux/ws_main.py b/linux/ws_main.py index eac46ac7..951d89b1 100755 --- a/linux/ws_main.py +++ b/linux/ws_main.py @@ -91,12 +91,20 @@ def configure(conf): conf.load('g++') verify_cc_version(conf.env) +bp_sim_main = SrcGroup(dir='src', + src_list=['main.cpp']) + +bp_sim_gtest = SrcGroup(dir='src', + src_list=[ + 'bp_gtest.cpp', + 'gtest/tuple_gen_test.cpp', + 'gtest/nat_test.cpp', + 'gtest/trex_stateless_gtest.cpp' + ]) main_src = SrcGroup(dir='src', src_list=[ - 'main.cpp', 'bp_sim.cpp', - 'bp_gtest.cpp', 'os_time.cpp', 'rx_check.cpp', 'tuple_gen.cpp', @@ -110,9 +118,6 @@ main_src = SrcGroup(dir='src', 'utl_cpuu.cpp', 'msg_manager.cpp', - 'gtest/tuple_gen_test.cpp', - 'gtest/nat_test.cpp', - 'gtest/trex_stateless_gtest.cpp', 'pal/linux/pal_utl.cpp', 'pal/linux/mbuf.cpp' @@ -146,7 +151,9 @@ stateless_src = SrcGroup(dir='src/stateless/', 'cp/trex_stream_vm.cpp', 'cp/trex_stateless.cpp', 'cp/trex_stateless_port.cpp', - 'dp/trex_stateless_dp_core.cpp' + 'cp/trex_streams_compiler.cpp', + 'dp/trex_stateless_dp_core.cpp', + 'messaging/trex_stateless_messaging.cpp', ]) # RPC code rpc_server_src = SrcGroup(dir='src/rpc-server/', @@ -168,9 +175,8 @@ rpc_server_src = SrcGroup(dir='src/rpc-server/', rpc_server_mock_src = SrcGroup(dir='src/mock/', src_list=[ 'trex_rpc_server_mock.cpp', + 'trex_platform_api_mock.cpp', '../gtest/rpc_test.cpp', - '../pal/linux/mbuf.cpp', - '../os_time.cpp', ]) # JSON package @@ -179,12 +185,6 @@ json_src = SrcGroup(dir='external_libs/json', 'jsoncpp.cpp' ]) -rpc_server_mock = SrcGroups([cmn_src, - rpc_server_src, - rpc_server_mock_src, - stateless_src, - json_src - ]) yaml_src = SrcGroup(dir='external_libs/yaml-cpp/src/', src_list=[ @@ -216,11 +216,31 @@ yaml_src = SrcGroup(dir='external_libs/yaml-cpp/src/', 'stream.cpp', 'tag.cpp']); + +rpc_server_mock = SrcGroups([ + main_src, + cmn_src, + rpc_server_src, + rpc_server_mock_src, + stateless_src, + json_src, + yaml_src, + net_src, + ]) + +# REMOVE ME - need to decide if stateless is part of bp sim or not +bp_hack_for_compile = SrcGroup(dir='/src/stub/', + src_list=['trex_stateless_stub.cpp' + ]) + bp =SrcGroups([ + bp_sim_main, + bp_sim_gtest, main_src, cmn_src , net_src , yaml_src, + bp_hack_for_compile, ]); @@ -242,6 +262,7 @@ includes_path =''' ../src/pal/linux/ ../src/rpc-server/ ../src/stateless/cp/ ../src/stateless/dp/ + ../src/stateless/messaging/ ../external_libs/json/ ../external_libs/zmq/include/ ../external_libs/yaml-cpp/include/ @@ -372,13 +393,11 @@ class build_option: build_types = [ - #build_option(name = "bp-sim", src = bp, debug_mode= DEBUG_, platform = PLATFORM_32, is_pie = False), build_option(name = "bp-sim", src = bp, debug_mode= DEBUG_, platform = PLATFORM_64, is_pie = False), - #build_option(name = "bp-sim", src = bp, debug_mode= RELEASE_,platform = PLATFORM_32, is_pie = False), build_option(name = "bp-sim", src = bp, debug_mode= RELEASE_,platform = PLATFORM_64, is_pie = False), build_option(name = "mock-rpc-server", use = ['zmq'], src = rpc_server_mock, debug_mode= DEBUG_,platform = PLATFORM_64, is_pie = False, - flags = ['-DTREX_RPC_MOCK_SERVER', '-Wall', '-Wno-sign-compare', '-Werror'], + flags = ['-DTREX_RPC_MOCK_SERVER', '-Wall', '-Wno-sign-compare'], rpath = ['.']), ] diff --git a/linux_dpdk/ws_main.py b/linux_dpdk/ws_main.py index 61a9d4f3..3f3c0950 100755 --- a/linux_dpdk/ws_main.py +++ b/linux_dpdk/ws_main.py @@ -158,7 +158,9 @@ stateless_src = SrcGroup(dir='src/stateless/', 'cp/trex_stream_vm.cpp', 'cp/trex_stateless.cpp', 'cp/trex_stateless_port.cpp', - 'dp/trex_stateless_dp_core.cpp' + 'cp/trex_streams_compiler.cpp', + 'dp/trex_stateless_dp_core.cpp', + 'messaging/trex_stateless_messaging.cpp' ]) # JSON package json_src = SrcGroup(dir='external_libs/json', @@ -415,6 +417,7 @@ includes_path =''' ../src/pal/linux_dpdk/ ../src/rpc-server/ ../src/stateless/cp/ ../src/stateless/dp/ + ../src/stateless/messaging/ ../external_libs/yaml-cpp/include/ ../external_libs/zmq/include/ diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py new file mode 100644 index 00000000..49ef9506 --- /dev/null +++ b/scripts/automation/trex_control_plane/client/trex_async_client.py @@ -0,0 +1,184 @@ +#!/router/bin/python + +try: + # support import for Python 2 + import outer_packages +except ImportError: + # support import for Python 3 + import client.outer_packages +from client_utils.jsonrpc_client import JsonRpcClient, BatchMessage + +import json +import threading +import time +import datetime +import zmq +import re + +from common.trex_stats import * +from common.trex_streams import * + +# basic async stats class +class TrexAsyncStats(object): + def __init__ (self): + self.ref_point = None + self.current = {} + self.last_update_ts = datetime.datetime.now() + + def update (self, snapshot): + + #update + self.last_update_ts = datetime.datetime.now() + + self.current = snapshot + + if self.ref_point == None: + self.ref_point = self.current + + + def get (self, field): + + if not field in self.current: + return 0 + + return self.current[field] + + def get_rel (self, field): + if not field in self.current: + return 0 + + return self.current[field] - self.ref_point[field] + + + # return true if new data has arrived in the past 2 seconds + def is_online (self): + delta_ms = (datetime.datetime.now() - self.last_update_ts).total_seconds() * 1000 + return (delta_ms < 2000) + +# describes the general stats provided by TRex +class TrexAsyncStatsGeneral(TrexAsyncStats): + def __init__ (self): + super(TrexAsyncStatsGeneral, self).__init__() + + +# per port stats +class TrexAsyncStatsPort(TrexAsyncStats): + def __init__ (self): + super(TrexAsyncStatsPort, self).__init__() + + +# stats manager +class TrexAsyncStatsManager(): + def __init__ (self): + + self.general_stats = TrexAsyncStatsGeneral() + self.port_stats = {} + + + def get_general_stats (self): + return self.general_stats + + def get_port_stats (self, port_id): + + if not port_id in self.port_stats: + return None + + return self.port_stats[port_id] + + + def update (self, snapshot): + + if snapshot['name'] == 'trex-global': + self.__handle_snapshot(snapshot['data']) + else: + # for now ignore the rest + return + + def __handle_snapshot (self, snapshot): + + general_stats = {} + port_stats = {} + + # filter the values per port and general + for key, value in snapshot.iteritems(): + + # match a pattern of ports + m = re.search('.*\-([0-8])', key) + if m: + port_id = m.group(1) + + if not port_id in port_stats: + port_stats[port_id] = {} + + port_stats[port_id][key] = value + + else: + # no port match - general stats + general_stats[key] = value + + # update the general object with the snapshot + self.general_stats.update(general_stats) + + # update all ports + for port_id, data in port_stats.iteritems(): + + if not port_id in self.port_stats: + self.port_stats[port_id] = TrexAsyncStatsPort() + + self.port_stats[port_id].update(data) + + + + + +class CTRexAsyncClient(): + def __init__ (self, port): + + self.port = port + + self.raw_snapshot = {} + + self.stats = TrexAsyncStatsManager() + + + self.tr = "tcp://localhost:{0}".format(self.port) + print "\nConnecting To ZMQ Publisher At {0}".format(self.tr) + + self.active = True + self.t = threading.Thread(target = self._run) + + # kill this thread on exit and don't add it to the join list + self.t.setDaemon(True) + self.t.start() + + + def _run (self): + + # Socket to talk to server + self.context = zmq.Context() + self.socket = self.context.socket(zmq.SUB) + + self.socket.connect(self.tr) + self.socket.setsockopt(zmq.SUBSCRIBE, '') + + while self.active: + msg = json.loads(self.socket.recv_string()) + + key = msg['name'] + self.raw_snapshot[key] = msg['data'] + + self.stats.update(msg) + + + def get_stats (self): + return self.stats + + def get_raw_snapshot (self): + #return str(self.stats.global_stats.get('m_total_tx_bytes')) + " / " + str(self.stats.global_stats.get_rel('m_total_tx_bytes')) + return self.raw_snapshot + + + def stop (self): + self.active = False + self.t.join() + diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py index 9e1c7cf3..6b1b7b94 100755 --- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py +++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py @@ -13,6 +13,8 @@ from common.trex_stats import * from common.trex_streams import * from collections import namedtuple +from trex_async_client import CTRexAsyncClient + RpcCmdData = namedtuple('RpcCmdData', ['method', 'params']) class RpcResponseStatus(namedtuple('RpcResponseStatus', ['success', 'id', 'msg'])): @@ -27,10 +29,10 @@ class RpcResponseStatus(namedtuple('RpcResponseStatus', ['success', 'id', 'msg'] class CTRexStatelessClient(object): """docstring for CTRexStatelessClient""" - def __init__(self, username, server="localhost", port=5050, virtual=False): + def __init__(self, username, server="localhost", sync_port=5050, async_port = 4500, virtual=False): super(CTRexStatelessClient, self).__init__() self.user = username - self.comm_link = CTRexStatelessClient.CCommLink(server, port, virtual) + self.comm_link = CTRexStatelessClient.CCommLink(server, sync_port, virtual) self.verbose = False self._conn_handler = {} self._active_ports = set() @@ -39,6 +41,9 @@ class CTRexStatelessClient(object): self._server_version = None self.__err_log = None + self._async_client = CTRexAsyncClient(async_port) + + # ----- decorator methods ----- # def force_status(owned=True, active_and_owned=False): def wrapper(func): @@ -100,6 +105,12 @@ class CTRexStatelessClient(object): return rc, err return self._init_sync() + def get_stats_async (self): + return self._async_client.get_stats() + + def get_connection_port (self): + return self.comm_link.port + def disconnect(self): return self.comm_link.disconnect() @@ -125,6 +136,10 @@ class CTRexStatelessClient(object): else: return port_ids + def sync_user(self): + return self.transmit("sync_user") + + def get_acquired_ports(self): return self._conn_handler.keys() @@ -264,7 +279,7 @@ class CTRexStatelessClient(object): if isinstance(port_id, list) or isinstance(port_id, set): # handle as batch mode port_ids = set(port_id) # convert to set to avoid duplications - commands = [RpcCmdData("start_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) + commands = [RpcCmdData("start_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id, "mul": 1.0}) for p_id in port_ids] rc, resp_list = self.transmit_batch(commands) if rc: @@ -272,7 +287,8 @@ class CTRexStatelessClient(object): success_test=self.ack_success_test) else: params = {"handler": self._conn_handler.get(port_id), - "port_id": port_id} + "port_id": port_id, + "mul": 1.0} command = RpcCmdData("start_traffic", params) return self._handle_start_traffic_response(command, self.transmit(command.method, command.params), @@ -299,10 +315,10 @@ class CTRexStatelessClient(object): self.transmit(command.method, command.params), self.ack_success_test) - def get_global_stats(self): - command = RpcCmdData("get_global_stats", {}) - return self._handle_get_global_stats_response(command, self.transmit(command.method, command.params)) - # return self.transmit("get_global_stats") +# def get_global_stats(self): +# command = RpcCmdData("get_global_stats", {}) +# return self._handle_get_global_stats_response(command, self.transmit(command.method, command.params)) +# # return self.transmit("get_global_stats") @force_status(owned=True, active_and_owned=True) def get_port_stats(self, port_id=None): diff --git a/scripts/automation/trex_control_plane/client_utils/general_utils.py b/scripts/automation/trex_control_plane/client_utils/general_utils.py index 3c025608..69ad14b2 100755 --- a/scripts/automation/trex_control_plane/client_utils/general_utils.py +++ b/scripts/automation/trex_control_plane/client_utils/general_utils.py @@ -24,7 +24,7 @@ def user_input(): def get_current_user(): if pwd: - return pwd.getpwuid( os.geteuid() ).pw_name + return pwd.getpwuid(os.geteuid()).pw_name else: return getpass.getuser() diff --git a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py index fe94e5ef..58491aba 100755 --- a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py +++ b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py @@ -514,3 +514,6 @@ class TrexStatelessClient(JsonRpcClient): return True, resp_list # return self.invoke_rpc_method('add_stream', params = params) + +if __name__ == "__main__": + pass
\ No newline at end of file diff --git a/scripts/automation/trex_control_plane/console/line_parsing.py b/scripts/automation/trex_control_plane/console/line_parsing.py new file mode 100644 index 00000000..34776424 --- /dev/null +++ b/scripts/automation/trex_control_plane/console/line_parsing.py @@ -0,0 +1,5 @@ +__author__ = 'danklei' + + +if __name__ == "__main__": + pass
\ No newline at end of file diff --git a/scripts/automation/trex_control_plane/console/trex_console.py b/scripts/automation/trex_control_plane/console/trex_console.py index 51a1f8cc..e707a9e1 100755 --- a/scripts/automation/trex_control_plane/console/trex_console.py +++ b/scripts/automation/trex_control_plane/console/trex_console.py @@ -31,10 +31,8 @@ import trex_root_path from common.trex_streams import * from client.trex_stateless_client import CTRexStatelessClient from common.text_opts import * -from client_utils.general_utils import user_input +from client_utils.general_utils import user_input, get_current_user - -from client_utils.jsonrpc_client import TrexStatelessClient import trex_status from collections import namedtuple @@ -177,6 +175,7 @@ class TRexConsole(cmd.Cmd): dotext = 'do_'+text return [a[3:]+' ' for a in self.get_names() if a.startswith(dotext)] + # set verbose on / off def do_verbose (self, line): '''Shows or set verbose mode\n''' @@ -393,9 +392,9 @@ class TRexConsole(cmd.Cmd): print "Example: rpc test_add {'x': 12, 'y': 17}\n" return - res_ok, msg = self.stateless_client.invoke_rpc_method(method, params) + res_ok, msg = self.stateless_client.transmit(method, params) if res_ok: - print "\nServer Response:\n\n" + self.stateless_client.pretty_json(json.dumps(msg)) + "\n" + print "\nServer Response:\n\n" + pretty_json(json.dumps(msg)) + "\n" else: print "\n*** " + msg + "\n" #print "Please try 'reconnect' to reconnect to server" @@ -409,6 +408,10 @@ class TRexConsole(cmd.Cmd): def do_status (self, line): '''Shows a graphical console\n''' + if not self.stateless_client.is_connected(): + print "Not connected to server\n" + return + self.do_verbose('off') trex_status.show_trex_status(self.stateless_client) @@ -770,8 +773,12 @@ def setParserOptions(): default = 5050, type = int) - parser.add_argument("-u", "--user", help = "User Name [default is random generated]\n", - default = 'user_' + ''.join(random.choice(string.digits) for _ in range(5)), + parser.add_argument("-z", "--pub", help = "TRex Async Publisher Port [default is 4500]\n", + default = 4500, + type = int) + + parser.add_argument("-u", "--user", help = "User Name [default is currently logged in user]\n", + default = get_current_user(), type = str) parser.add_argument("--verbose", dest="verbose", @@ -782,11 +789,10 @@ def setParserOptions(): def main(): parser = setParserOptions() - options = parser.parse_args()#sys.argv[1:]) + options = parser.parse_args() # Stateless client connection - # stateless_client = TrexStatelessClient(options.server, options.port, options.user) - stateless_client = CTRexStatelessClient(options.user, options.server, options.port) + stateless_client = CTRexStatelessClient(options.user, options.server, options.port, options.pub) # console try: diff --git a/scripts/automation/trex_control_plane/console/trex_status.py b/scripts/automation/trex_control_plane/console/trex_status.py index 2c5a648f..4e73e0bb 100644 --- a/scripts/automation/trex_control_plane/console/trex_status.py +++ b/scripts/automation/trex_control_plane/console/trex_status.py @@ -11,6 +11,8 @@ import datetime g_curses_active = False +################### utils ################# + # simple percetange show def percentage (a, total): x = int ((float(a) / total) * 100) @@ -18,15 +20,25 @@ def percentage (a, total): # simple float to human readable def float_to_human_readable (size, suffix = "bps"): - for unit in ['','K','M','G']: - if abs(size) < 1024.0: - return "%3.1f %s%s" % (size, unit, suffix) - size /= 1024.0 + for unit in ['','K','M','G','T']: + if abs(size) < 1000.0: + return "%3.2f %s%s" % (size, unit, suffix) + size /= 1000.0 return "NaN" + +################### panels ################# + # panel object class TrexStatusPanel(object): - def __init__ (self, h, l, y, x, headline): + def __init__ (self, h, l, y, x, headline, status_obj): + + self.status_obj = status_obj + + self.log = status_obj.log + self.stateless_client = status_obj.stateless_client + self.general_stats = status_obj.general_stats + self.h = h self.l = l self.y = y @@ -53,64 +65,26 @@ class TrexStatusPanel(object): return self.win -# total stats (ports + global) -class Stats(): - def __init__ (self, rpc_client, port_list, interval = 100): - - self.rpc_client = rpc_client - - self.port_list = port_list - self.port_stats = {} - - self.interval = interval - self.delay_count = 0 - - def get_port_stats (self, port_id): - if self.port_stats.get(port_id): - return self.port_stats[port_id] - else: - return None - - def query_sync (self): - self.delay_count += 1 - if self.delay_count < self.interval: - return - - self.delay_count = 0 - - # query global stats - - # query port stats - - rc, resp_list = self.rpc_client.get_port_stats(self.port_list) - if not rc: - return - - for i, rc in enumerate(resp_list): - if rc[0]: - self.port_stats[self.port_list[i]] = rc[1] - - # various kinds of panels # Server Info Panel class ServerInfoPanel(TrexStatusPanel): def __init__ (self, h, l, y, x, status_obj): - super(ServerInfoPanel, self).__init__(h, l, y ,x ,"Server Info:") - - self.status_obj = status_obj + super(ServerInfoPanel, self).__init__(h, l, y ,x ,"Server Info:", status_obj) def draw (self): - if self.status_obj.server_version == None: + if not self.status_obj.server_version : + return + + if not self.status_obj.server_sys_info: return - self.clear() - connection_details = self.status_obj.rpc_client.get_connection_details() + self.clear() - self.getwin().addstr(3, 2, "{:<30} {:30}".format("Server:",self.status_obj.server_sys_info["hostname"] + ":" + str(connection_details['port']))) + self.getwin().addstr(3, 2, "{:<30} {:30}".format("Server:",self.status_obj.server_sys_info["hostname"] + ":" + str(self.stateless_client.get_connection_port()))) self.getwin().addstr(4, 2, "{:<30} {:30}".format("Version:", self.status_obj.server_version["version"])) self.getwin().addstr(5, 2, "{:<30} {:30}".format("Build:", self.status_obj.server_version["build_date"] + " @ " + @@ -123,7 +97,7 @@ class ServerInfoPanel(TrexStatusPanel): self.getwin().addstr(9, 2, "{:<30} {:<30}".format("Ports Count:", self.status_obj.server_sys_info["port_count"])) - ports_owned = " ".join(str(x) for x in self.status_obj.rpc_client.get_owned_ports()) + ports_owned = " ".join(str(x) for x in self.status_obj.owned_ports) if not ports_owned: ports_owned = "None" @@ -134,26 +108,43 @@ class ServerInfoPanel(TrexStatusPanel): class GeneralInfoPanel(TrexStatusPanel): def __init__ (self, h, l, y, x, status_obj): - super(GeneralInfoPanel, self).__init__(h, l, y ,x ,"General Info:") - - self.status_obj = status_obj + super(GeneralInfoPanel, self).__init__(h, l, y ,x ,"General Info:", status_obj) def draw (self): - pass + self.clear() + + if not self.general_stats.is_online(): + self.getwin().addstr(3, 2, "No Published Data From TRex Server") + return + + self.getwin().addstr(3, 2, "{:<30} {:0.2f} %".format("CPU util.:", self.general_stats.get("m_cpu_util"))) + + self.getwin().addstr(5, 2, "{:<30} {:} / {:}".format("Total Tx. rate:", + float_to_human_readable(self.general_stats.get("m_tx_bps")), + float_to_human_readable(self.general_stats.get("m_tx_pps"), suffix = "pps"))) + + # missing RX field + #self.getwin().addstr(5, 2, "{:<30} {:} / {:}".format("Total Rx. rate:", + # float_to_human_readable(self.general_stats.get("m_rx_bps")), + # float_to_human_readable(self.general_stats.get("m_rx_pps"), suffix = "pps"))) + + self.getwin().addstr(7, 2, "{:<30} {:} / {:}".format("Total Tx:", + float_to_human_readable(self.general_stats.get_rel("m_total_tx_bytes"), suffix = "B"), + float_to_human_readable(self.general_stats.get_rel("m_total_tx_pkts"), suffix = "pkts"))) # all ports stats class PortsStatsPanel(TrexStatusPanel): def __init__ (self, h, l, y, x, status_obj): - super(PortsStatsPanel, self).__init__(h, l, y ,x ,"Trex Ports:") + super(PortsStatsPanel, self).__init__(h, l, y ,x ,"Trex Ports:", status_obj) - self.status_obj = status_obj def draw (self): self.clear() + return - owned_ports = self.status_obj.rpc_client.get_owned_ports() + owned_ports = self.status_obj.owned_ports if not owned_ports: self.getwin().addstr(3, 2, "No Owned Ports - Please Acquire One Or More Ports") return @@ -193,33 +184,23 @@ class PortsStatsPanel(TrexStatusPanel): class ControlPanel(TrexStatusPanel): def __init__ (self, h, l, y, x, status_obj): - super(ControlPanel, self).__init__(h, l, y, x, "") + super(ControlPanel, self).__init__(h, l, y, x, "", status_obj) - self.status_obj = status_obj def draw (self): self.clear() self.getwin().addstr(1, 2, "'g' - general, '0-{0}' - specific port, 'f' - freeze, 'c' - clear stats, 'p' - ping server, 'q' - quit" - .format(self.status_obj.rpc_client.get_port_count() - 1)) + .format(self.status_obj.stateless_client.get_port_count() - 1)) - index = 3 - - cut = len(self.status_obj.log) - 4 - if cut < 0: - cut = 0 - - for l in self.status_obj.log[cut:]: - self.getwin().addstr(index, 2, l) - index += 1 + self.log.draw(self.getwin(), 2, 3) # specific ports panels class SinglePortPanel(TrexStatusPanel): def __init__ (self, h, l, y, x, status_obj, port_id): - super(SinglePortPanel, self).__init__(h, l, y, x, "Port {0}".format(port_id)) + super(SinglePortPanel, self).__init__(h, l, y, x, "Port {0}".format(port_id), status_obj) - self.status_obj = status_obj self.port_id = port_id def draw (self): @@ -227,7 +208,7 @@ class SinglePortPanel(TrexStatusPanel): self.clear() - if not self.port_id in self.status_obj.rpc_client.get_owned_ports(): + if not self.port_id in self.status_obj.stateless_client.get_owned_ports(): self.getwin().addstr(y, 2, "Port {0} is not owned by you, please acquire the port for more info".format(self.port_id)) return @@ -292,96 +273,122 @@ class SinglePortPanel(TrexStatusPanel): y += 2 -# status object -class TrexStatus(): - def __init__ (self, stdscr, rpc_client): - self.stdscr = stdscr +################### main objects ################# + +# status log +class TrexStatusLog(): + def __init__ (self): self.log = [] - self.rpc_client = rpc_client - self.snapshot = self.rpc_client.snapshot() + def add_event (self, msg): + self.log.append("[{0}] {1}".format(str(datetime.datetime.now().time()), msg)) + + def draw (self, window, x, y, max_lines = 4): + index = y - # fetch server info - self.get_server_info() + cut = len(self.log) - max_lines + if cut < 0: + cut = 0 + + for msg in self.log[cut:]: + window.addstr(index, x, msg) + index += 1 - # create stats objects - self.stats = Stats(rpc_client, self.rpc_client.get_owned_ports()) +# status commands +class TrexStatusCommands(): + def __init__ (self, status_object): + + self.status_object = status_object + + self.stateless_client = status_object.stateless_client + self.log = self.status_object.log - # register actions self.actions = {} - self.actions[ord('q')] = self.action_quit - self.actions[ord('p')] = self.action_ping - self.actions[ord('f')] = self.action_freeze + self.actions[ord('q')] = self._quit + self.actions[ord('p')] = self._ping + self.actions[ord('f')] = self._freeze - self.actions[ord('g')] = self.action_show_ports_stats + self.actions[ord('g')] = self._show_ports_stats - for port_id in xrange(0, self.rpc_client.get_port_count()): - self.actions[ord('0') + port_id] = self.action_show_port_generator(port_id) + # register all the available ports shortcuts + for port_id in xrange(0, self.stateless_client.get_port_count()): + self.actions[ord('0') + port_id] = self._show_port_generator(port_id) + + + # handle a key pressed + def handle (self, ch): + if ch in self.actions: + return self.actions[ch]() + else: + self.log.add_event("Unknown key pressed, please see legend") + return True + + # show all ports + def _show_ports_stats (self): + self.log.add_event("Switching to all ports view") + self.status_object.stats_panel = self.status_object.ports_stats_panel - - # all ports stats - def action_show_ports_stats (self): - self.add_log_event("Switching to all ports view") - self.stats_panel = self.ports_stats_panel - return True - # function generator for different ports requests - def action_show_port_generator (self, port_id): - def action_show_port(): - self.add_log_event("Switching panel to port {0}".format(port_id)) - self.stats_panel = self.ports_panels[port_id] + + # function generator for different ports requests + def _show_port_generator (self, port_id): + def _show_port(): + self.log.add_event("Switching panel to port {0}".format(port_id)) + self.status_object.stats_panel = self.status_object.ports_panels[port_id] return True - return action_show_port + return _show_port - def action_freeze (self): - self.update_active = not self.update_active - self.add_log_event("Update continued" if self.update_active else "Update stopped") + def _freeze (self): + self.status_object.update_active = not self.status_object.update_active + self.log.add_event("Update continued" if self.status_object.update_active else "Update stopped") return True - def action_quit(self): + def _quit(self): return False - def action_ping (self): - self.add_log_event("Pinging RPC server") + def _ping (self): + self.log.add_event("Pinging RPC server") - rc, msg = self.rpc_client.ping_rpc_server() + rc, msg = self.stateless_client.ping() if rc: - self.add_log_event("Server replied: '{0}'".format(msg)) + self.log.add_event("Server replied: '{0}'".format(msg)) else: - self.add_log_event("Failed to get reply") + self.log.add_event("Failed to get reply") return True - def get_server_info (self): - - self.server_version = self.rpc_client.get_rpc_server_version() - self.server_sys_info = self.rpc_client.get_system_info() - +# status object +# +# +# +class TrexStatus(): + def __init__ (self, stdscr, stateless_client): + self.stdscr = stdscr - def add_log_event (self, msg): - self.log.append("[{0}] {1}".format(str(datetime.datetime.now().time()), msg)) + self.stateless_client = stateless_client - # control panel - def update_control (self): - self.control_panel.clear() + self.log = TrexStatusLog() + self.cmds = TrexStatusCommands(self) - self.control_panel.getwin().addstr(1, 2, "'g' - general, '0-{0}' - specific port, 'f' - freeze, 'c' - clear stats, 'p' - ping server, 'q' - quit" - .format(self.rpc_client.get_port_count() - 1)) + self.general_stats = stateless_client.get_stats_async().get_general_stats() - index = 3 + # fetch server info + rc, self.server_sys_info = self.stateless_client.get_system_info() + if not rc: + return - cut = len(self.log) - 4 - if cut < 0: - cut = 0 + rc, self.server_version = self.stateless_client.get_version() + if not rc: + return - for l in self.log[cut:]: - self.control_panel.getwin().addstr(index, 2, l) - index += 1 + self.owned_ports = self.stateless_client.get_acquired_ports() + + def generate_layout (self): self.max_y = self.stdscr.getmaxyx()[0] self.max_x = self.stdscr.getmaxyx()[1] @@ -394,7 +401,7 @@ class TrexStatus(): self.ports_stats_panel = PortsStatsPanel(int(self.max_y * 0.8), self.max_x / 2, 0, 0, self) self.ports_panels = {} - for i in xrange(0, self.rpc_client.get_port_count()): + for i in xrange(0, self.stateless_client.get_port_count()): self.ports_panels[i] = SinglePortPanel(int(self.max_y * 0.8), self.max_x / 2, 0, 0, self, i) # at start time we point to the main one @@ -411,14 +418,8 @@ class TrexStatus(): # no key , continue if ch == curses.ERR: return True - - # check for registered function - if ch in self.actions: - return self.actions[ch]() - else: - self.add_log_event("Unknown key pressed, please see legend") - - return True + + return self.cmds.handle(ch) # main run entry point def run (self): @@ -454,14 +455,14 @@ class TrexStatus(): sleep(0.01) -def show_trex_status_internal (stdscr, rpc_client): - trex_status = TrexStatus(stdscr, rpc_client) +def show_trex_status_internal (stdscr, stateless_client): + trex_status = TrexStatus(stdscr, stateless_client) trex_status.run() -def show_trex_status (rpc_client): +def show_trex_status (stateless_client): try: - curses.wrapper(show_trex_status_internal, rpc_client) + curses.wrapper(show_trex_status_internal, stateless_client) except KeyboardInterrupt: curses.endwin() diff --git a/src/bp_gtest.cpp b/src/bp_gtest.cpp index a529d637..e3145f2a 100755 --- a/src/bp_gtest.cpp +++ b/src/bp_gtest.cpp @@ -205,7 +205,7 @@ public: } } - lpt->generate_erf(buf,CGlobalInfo::m_options.preview); + lpt->start_generate_stateful(buf,CGlobalInfo::m_options.preview); lpt->m_node_gen.DumpHist(stdout); cmp.d_sec = m_time_diff; @@ -2031,6 +2031,10 @@ public: virtual int send_node(CGenNode * node); + virtual int update_mac_addr_from_global_cfg(pkt_dir_t dir, rte_mbuf_t *m){ + return (0); + } + /** * flush all pending packets into the stream @@ -2105,7 +2109,7 @@ public: int i; for (i=0; i<m_threads; i++) { lpt=fl.m_threads_info[i]; - lpt->generate_erf("t1",CGlobalInfo::m_options.preview); + lpt->start_generate_stateful("t1",CGlobalInfo::m_options.preview); } fl.Delete(); return (true); @@ -2755,7 +2759,7 @@ TEST_F(gt_ring, ring1) { TEST_F(gt_ring, ring2) { CMessagingManager ringmg; - ringmg.Create(8); + ringmg.Create(8, "test"); int i; for (i=0; i<8; i++) { diff --git a/src/bp_sim.cpp b/src/bp_sim.cpp index c3581c55..92beab91 100755 --- a/src/bp_sim.cpp +++ b/src/bp_sim.cpp @@ -25,6 +25,7 @@ limitations under the License. #include "msg_manager.h" #include <common/basic_utils.h> +#include <trex_stream_node.h> #undef VALG @@ -279,7 +280,7 @@ void CPlatformSocketInfoConfig::dump(FILE *fd){ fprintf(fd," \n"); fprintf(fd," active sockets : %d \n",max_num_active_sockets()); - fprintf(fd," ports_sockets : \n",max_num_active_sockets()); + fprintf(fd," ports_sockets : %d \n",max_num_active_sockets()); for (i=0; i<(MAX_LATENCY_PORTS); i++) { fprintf(fd,"%d,",port_to_socket(i)); @@ -3114,10 +3115,6 @@ int CNodeGenerator::close_file(CFlowGenListPerThread * thread){ return (0); } -int CNodeGenerator::flush_one_node_to_file(CGenNode * node){ - BP_ASSERT(m_v_if); - return (m_v_if->send_node(node)); -} int CNodeGenerator::update_stats(CGenNode * node){ if ( m_preview_mode.getVMode() >2 ){ @@ -3204,6 +3201,10 @@ bool CFlowGenListPerThread::Create(uint32_t thread_id, assert(m_ring_from_rx); assert(m_ring_to_rx); + + /* create the info required for stateless DP core */ + m_stateless_dp_info = new TrexStatelessDpCore(thread_id, this); + return (true); } @@ -3351,6 +3352,9 @@ void CFlowGenListPerThread::Delete(){ m_node_gen.Delete(); Clean(); m_cpu_cp_u.Delete(); + + delete m_stateless_dp_info; + m_stateless_dp_info = NULL; } @@ -3416,7 +3420,8 @@ int CNodeGenerator::flush_file(dsec_t max_time, if ( likely ( m_is_realtime ) ){ dsec_t dt ; thread->m_cpu_dp_u.commit(); - bool once=false; + + thread->check_msgs(); while ( true ) { dt = now_sec() - n_time ; @@ -3425,12 +3430,6 @@ int CNodeGenerator::flush_file(dsec_t max_time, break; } - if (!once) { - /* check the msg queue once */ - thread->check_msgs(); - once=true; - } - rte_pause(); } thread->m_cpu_dp_u.start_work(); @@ -3449,55 +3448,69 @@ int CNodeGenerator::flush_file(dsec_t max_time, flush_time=now_sec(); } } + #ifndef RTE_DPDK - thread->check_msgs(); + thread->check_msgs(); #endif uint8_t type=node->m_type; - if ( likely( type == CGenNode::FLOW_PKT ) ) { - /* PKT */ - if ( !(node->is_repeat_flow()) || (always==false)) { - flush_one_node_to_file(node); - #ifdef _DEBUG - update_stats(node); - #endif - } - m_p_queue.pop(); - if ( node->is_last_in_flow() ) { - if ((node->is_repeat_flow()) && (always==false)) { - /* Flow is repeated, reschedule it */ - thread->reschedule_flow( node); - }else{ - /* Flow will not be repeated, so free node */ - thread->free_last_flow_node( node); - } - }else{ - node->update_next_pkt_in_flow(); - m_p_queue.push(node); - } + if ( type == CGenNode::STATELESS_PKT ) { + m_p_queue.pop(); + CGenNodeStateless *node_sl = (CGenNodeStateless *)node; + + /* if the stream has been deactivated - end */ + if (unlikely(!node_sl->is_active())) { + thread->free_node(node); + } else { + node_sl->handle(thread); + } + }else{ - if ((type == CGenNode::FLOW_FIF)) { - /* callback to our method */ + if ( likely( type == CGenNode::FLOW_PKT ) ) { + /* PKT */ + if ( !(node->is_repeat_flow()) || (always==false)) { + flush_one_node_to_file(node); + #ifdef _DEBUG + update_stats(node); + #endif + } m_p_queue.pop(); - if ( always == false) { - thread->m_cur_time_sec = node->m_time ; - - if ( thread->generate_flows_roundrobin(&done) <0){ - break; + if ( node->is_last_in_flow() ) { + if ((node->is_repeat_flow()) && (always==false)) { + /* Flow is repeated, reschedule it */ + thread->reschedule_flow( node); + }else{ + /* Flow will not be repeated, so free node */ + thread->free_last_flow_node( node); } - if (!done) { - node->m_time +=d_time; - m_p_queue.push(node); + }else{ + node->update_next_pkt_in_flow(); + m_p_queue.push(node); + } + }else{ + if ((type == CGenNode::FLOW_FIF)) { + /* callback to our method */ + m_p_queue.pop(); + if ( always == false) { + thread->m_cur_time_sec = node->m_time ; + + if ( thread->generate_flows_roundrobin(&done) <0){ + break; + } + if (!done) { + node->m_time +=d_time; + m_p_queue.push(node); + }else{ + thread->free_node(node); + } }else{ thread->free_node(node); } + }else{ - thread->free_node(node); + handle_slow_messages(type,node,thread,always); } - - }else{ - handle_slow_messages(type,node,thread,always); } } } @@ -3513,16 +3526,16 @@ int CNodeGenerator::flush_file(dsec_t max_time, } void CNodeGenerator::handle_slow_messages(uint8_t type, - CGenNode * node, - CFlowGenListPerThread * thread, + CGenNode * node, + CFlowGenListPerThread * thread, bool always){ if (unlikely (type == CGenNode::FLOW_DEFER_PORT_RELEASE) ) { m_p_queue.pop(); thread->handler_defer_job(node); thread->free_node(node); - }else{ - if (type == CGenNode::FLOW_PKT_NAT) { + + } else if (type == CGenNode::FLOW_PKT_NAT) { /*repeat and NAT is not supported */ if ( node->is_nat_first_state() ){ node->set_nat_wait_state(); @@ -3556,24 +3569,28 @@ void CNodeGenerator::handle_slow_messages(uint8_t type, m_p_queue.push(node); } - }else{ - if ( type == CGenNode::FLOW_SYNC ){ - thread->check_msgs(); /* check messages */ - m_v_if->flush_tx_queue(); /* flush pkt each timeout */ - m_p_queue.pop(); - if ( always == false) { - node->m_time += SYNC_TIME_OUT; - m_p_queue.push(node); - }else{ - thread->free_node(node); - } + } else if ( type == CGenNode::FLOW_SYNC ) { + + m_p_queue.pop(); + thread->check_msgs(); /* check messages */ + m_v_if->flush_tx_queue(); /* flush pkt each timeout */ + + if (always == false) { + node->m_time += SYNC_TIME_OUT; + m_p_queue.push(node); }else{ - printf(" ERROR type is not valid %d \n",type); - assert(0); + thread->free_node(node); } + + /* must be the last section of processing */ + } else if ( type == CGenNode::EXIT_SCHED ) { + remove_all(thread); + + } else { + printf(" ERROR type is not valid %d \n",type); + assert(0); } - } } @@ -3809,11 +3826,15 @@ void CFlowGenListPerThread::handel_nat_msg(CGenNodeNatInfo * msg){ } } +void CFlowGenListPerThread::check_msgs(void) { -void CFlowGenListPerThread::check_msgs(void){ - if ( likely ( m_ring_from_rx->isEmpty() ) ){ + /* inlined for performance */ + m_stateless_dp_info->periodic_check_for_cp_messages(); + + if ( likely ( m_ring_from_rx->isEmpty() ) ) { return; } + #ifdef NAT_TRACE_ printf(" %.03f got message from RX \n",now_sec()); #endif @@ -3833,9 +3854,11 @@ void CFlowGenListPerThread::check_msgs(void){ case CGenNodeMsgBase::NAT_FIRST: handel_nat_msg((CGenNodeNatInfo * )msg); break; + case CGenNodeMsgBase::LATENCY_PKT: handel_latecy_pkt_msg((CGenNodeLatencyPktInfo *) msg); break; + default: printf("ERROR pkt-thread message type is not valid %d \n",msg_type); assert(0); @@ -3845,8 +3868,48 @@ void CFlowGenListPerThread::check_msgs(void){ } } +void delay(int msec); + + +const uint8_t test_udp_pkt[]={ + 0x00,0x00,0x00,0x01,0x00,0x00, + 0x00,0x00,0x00,0x01,0x00,0x00, + 0x08,0x00, + + 0x45,0x00,0x00,0x81, + 0xaf,0x7e,0x00,0x00, + 0x12,0x11,0xd9,0x23, + 0x01,0x01,0x01,0x01, + 0x3d,0xad,0x72,0x1b, + + 0x11,0x11, + 0x11,0x11, + + 0x00,0x6d, + 0x00,0x00, + + 0x64,0x31,0x3a,0x61, + 0x64,0x32,0x3a,0x69,0x64, + 0x32,0x30,0x3a,0xd0,0x0e, + 0xa1,0x4b,0x7b,0xbd,0xbd, + 0x16,0xc6,0xdb,0xc4,0xbb,0x43, + 0xf9,0x4b,0x51,0x68,0x33,0x72, + 0x20,0x39,0x3a,0x69,0x6e,0x66,0x6f, + 0x5f,0x68,0x61,0x73,0x68,0x32,0x30,0x3a,0xee,0xc6,0xa3, + 0xd3,0x13,0xa8,0x43,0x06,0x03,0xd8,0x9e,0x3f,0x67,0x6f, + 0xe7,0x0a,0xfd,0x18,0x13,0x8d,0x65,0x31,0x3a,0x71,0x39, + 0x3a,0x67,0x65,0x74,0x5f,0x70,0x65,0x65,0x72,0x73,0x31, + 0x3a,0x74,0x38,0x3a,0x3d,0xeb,0x0c,0xbf,0x0d,0x6a,0x0d, + 0xa5,0x31,0x3a,0x79,0x31,0x3a,0x71,0x65,0x87,0xa6,0x7d, + 0xe7 +}; + +void CFlowGenListPerThread::start_stateless_daemon(){ + m_stateless_dp_info->start(); +} + -void CFlowGenListPerThread::generate_erf(std::string erf_file_name, +void CFlowGenListPerThread::start_generate_stateful(std::string erf_file_name, CPreviewMode & preview){ /* now we are ready to generate*/ if ( m_cap_gen.size()==0 ){ @@ -3963,6 +4026,7 @@ int CFlowGenList::load_from_mac_file(std::string file_name) { exit(-1); } + return (0); } diff --git a/src/bp_sim.h b/src/bp_sim.h index b7cfb20b..af084757 100755 --- a/src/bp_sim.h +++ b/src/bp_sim.h @@ -57,6 +57,8 @@ limitations under the License. #include <arpa/inet.h> #include "platform_cfg.h" +#include <trex_stateless_dp_core.h> + #undef NAT_TRACE_ @@ -360,6 +362,16 @@ public: */ virtual int flush_tx_queue(void)=0; + + /** + * update the source and destination mac-addr of a given mbuf by global database + * + * @param dir + * @param m + * + * @return + */ + virtual int update_mac_addr_from_global_cfg(pkt_dir_t dir, rte_mbuf_t *m)=0; public: @@ -721,6 +733,7 @@ public: m_run_mode = RUN_MODE_INVALID; } + CPreviewMode preview; float m_factor; float m_duration; @@ -741,9 +754,10 @@ public: uint16_t m_run_flags; uint8_t m_mac_splitter; uint8_t m_pad; - trex_run_mode_e m_run_mode; + + std::string cfg_file; std::string mac_file; std::string platform_cfg_file; @@ -778,6 +792,10 @@ public: return ( m_latency_rate == 0 ?true:false); } + bool is_stateless(){ + return (m_run_mode == RUN_MODE_INTERACTIVE ?true:false); + } + bool is_latency_enabled(){ return ( !is_latency_disabled() ); } @@ -1181,12 +1199,15 @@ public: public: static CRteMemPool m_mem_pool[MAX_SOCKETS_SUPPORTED]; - static uint32_t m_nodes_pool_size; - static CParserOption m_options; - static CGlobalMemory m_memory_cfg; - static CPlatformSocketInfo m_socket; + static uint32_t m_nodes_pool_size; + static CParserOption m_options; + static CGlobalMemory m_memory_cfg; + static CPlatformSocketInfo m_socket; }; +static inline int get_is_stateless(){ + return (CGlobalInfo::m_options.is_stateless() ); +} static inline int get_is_rx_check_mode(){ return (CGlobalInfo::m_options.preview.get_is_rx_check_enable() ?1:0); @@ -1333,15 +1354,18 @@ class CCapFileFlowInfo ; /* this is a simple struct, do not add constructor and destractor here! we are optimizing the allocation dealocation !!! */ -struct CGenNode { + +struct CGenNodeBase { public: enum { - FLOW_PKT=0, - FLOW_FIF=1, - FLOW_DEFER_PORT_RELEASE=2, - FLOW_PKT_NAT=3, - FLOW_SYNC=4 /* called evey 1 msec */ + FLOW_PKT =0, + FLOW_FIF =1, + FLOW_DEFER_PORT_RELEASE =2, + FLOW_PKT_NAT =3, + FLOW_SYNC =4, /* called evey 1 msec */ + STATELESS_PKT =5, + EXIT_SCHED =6 }; @@ -1349,7 +1373,7 @@ public: enum { NODE_FLAGS_DIR =1, NODE_FLAGS_MBUF_CACHE =2, - NODE_FLAGS_SAMPLE_RX_CHECK =4, + NODE_FLAGS_SAMPLE_RX_CHECK =4, NODE_FLAGS_LEARN_MODE =8, /* bits 3,4 MASK 0x18 wait for second direction packet */ NODE_FLAGS_LEARN_MSG_PROCESSED =0x10, /* got NAT msg */ @@ -1360,19 +1384,49 @@ public: NODE_FLAGS_INIT_START_FROM_SERVER_SIDE_SERVER_ADDR = 0x100 /* init packet start from server side with server addr */ }; + public: - /* C1 */ + /*********************************************/ + /* C1 must */ uint8_t m_type; uint8_t m_thread_id; /* zero base */ uint8_t m_socket_id; - uint8_t m_pad2; + uint8_t m_pad2; uint16_t m_src_port; uint16_t m_flags; /* BIT 0 - DIR , BIT 1 - mbug_cache BIT 2 - SAMPLE DUPLICATE */ - double m_time; + double m_time; /* can't change this header - size 16 bytes*/ + +public: + bool operator <(const CGenNodeBase * rsh ) const { + return (m_time<rsh->m_time); + } + bool operator ==(const CGenNodeBase * rsh ) const { + return (m_time==rsh->m_time); + } + bool operator >(const CGenNodeBase * rsh ) const { + return (m_time>rsh->m_time); + } + +public: + void set_socket_id(socket_id_t socket){ + m_socket_id=socket; + } + + socket_id_t get_socket_id(){ + return ( m_socket_id ); + } + + +}; + + +struct CGenNode : public CGenNodeBase { + +public: uint32_t m_src_ip; /* client ip */ uint32_t m_dest_ip; /* server ip */ @@ -1402,25 +1456,8 @@ public: uint32_t m_end_of_cache_line[6]; public: - bool operator <(const CGenNode * rsh ) const { - return (m_time<rsh->m_time); - } - bool operator ==(const CGenNode * rsh ) const { - return (m_time==rsh->m_time); - } - bool operator >(const CGenNode * rsh ) const { - return (m_time>rsh->m_time); - } -public: void Dump(FILE *fd); - void set_socket_id(socket_id_t socket){ - m_socket_id=socket; - } - - socket_id_t get_socket_id(){ - return ( m_socket_id ); - } static void DumpHeader(FILE *fd); @@ -1603,6 +1640,7 @@ public: } __rte_cache_aligned; + #if __x86_64__ /* size of 64 bytes */ #define DEFER_CLIENTS_NUM (16) @@ -1647,19 +1685,29 @@ public: need to clean this up and derive this objects from base object but require too much refactoring right now hhaim */ + +#define COMPARE_NODE_OBJECT(NODE_NAME) if ( sizeof(NODE_NAME) != sizeof(CGenNode) ) { \ + printf("ERROR sizeof(%s) %lu != sizeof(CGenNode) %lu must be the same size \n",#NODE_NAME,sizeof(NODE_NAME),sizeof(CGenNode)); \ + assert(0); \ + }\ + if ( (int)offsetof(struct NODE_NAME,m_type)!=offsetof(struct CGenNodeBase,m_type) ){\ + printf("ERROR offsetof(struct %s,m_type)!=offsetof(struct CGenNodeBase,m_type) \n",#NODE_NAME);\ + assert(0);\ + }\ + if ( (int)offsetof(struct CGenNodeDeferPort,m_time)!=offsetof(struct CGenNodeBase,m_time) ){\ + printf("ERROR offsetof(struct %s,m_time)!=offsetof(struct CGenNodeBase,m_time) \n",#NODE_NAME);\ + assert(0);\ + } + +#define COMPARE_NODE_OBJECT_SIZE(NODE_NAME) if ( sizeof(NODE_NAME) != sizeof(CGenNode) ) { \ + printf("ERROR sizeof(%s) %lu != sizeof(CGenNode) %lu must be the same size \n",#NODE_NAME,sizeof(NODE_NAME),sizeof(CGenNode)); \ + assert(0); \ + } + + + inline int check_objects_sizes(void){ - if ( sizeof(CGenNodeDeferPort) != sizeof(CGenNode) ) { - printf("ERROR sizeof(CGenNodeDeferPort) %lu != sizeof(CGenNode) %lu must be the same size \n",sizeof(CGenNodeDeferPort),sizeof(CGenNode)); - assert(0); - } - if ( (int)offsetof(struct CGenNodeDeferPort,m_type)!=offsetof(struct CGenNode,m_type) ){ - printf("ERROR offsetof(struct CGenNodeDeferPort,m_type)!=offsetof(struct CGenNode,m_type) \n"); - assert(0); - } - if ( (int)offsetof(struct CGenNodeDeferPort,m_time)!=offsetof(struct CGenNode,m_time) ){ - printf("ERROR offsetof(struct CGenNodeDeferPort,m_time)!=offsetof(struct CGenNode,m_time) \n"); - assert(0); - } + COMPARE_NODE_OBJECT(CGenNodeDeferPort); return (0); } @@ -1718,6 +1766,11 @@ public: virtual int write_pkt(CCapPktRaw *pkt_raw); virtual int close_file(void); + virtual int update_mac_addr_from_global_cfg(pkt_dir_t dir, rte_mbuf_t *m){ + return (0); + } + + /** * send one packet @@ -1779,6 +1832,10 @@ public: return (0); } + virtual int update_mac_addr_from_global_cfg(pkt_dir_t dir, rte_mbuf_t *m){ + return (0); + } + virtual int send_node(CGenNode * node); @@ -1799,6 +1856,7 @@ public: CFlowGenListPerThread * Parent(){ return (m_parent); } + public: void add_node(CGenNode * mynode); void remove_all(CFlowGenListPerThread * thread); @@ -1830,7 +1888,9 @@ public: private: - int flush_one_node_to_file(CGenNode * node); + inline int flush_one_node_to_file(CGenNode * node){ + return (m_v_if->send_node(node)); + } int update_stats(CGenNode * node); FORCE_NO_INLINE void handle_slow_messages(uint8_t type, CGenNode * node, @@ -3325,13 +3385,21 @@ public : inline CGenNode * create_node(void); + inline CGenNodeStateless * create_node_sl(void){ + return ((CGenNodeStateless*)create_node() ); + } + + inline void free_node(CGenNode *p); inline void free_last_flow_node(CGenNode *p); public: void Clean(); - void generate_erf(std::string erf_file_name,CPreviewMode &preview); + void start_generate_stateful(std::string erf_file_name,CPreviewMode &preview); + void start_stateless_daemon(); + + void Dump(FILE *fd); void DumpCsv(FILE *fd); void DumpStats(FILE *fd); @@ -3344,6 +3412,7 @@ public: private: void check_msgs(void); + void handel_nat_msg(CGenNodeNatInfo * msg); void handel_latecy_pkt_msg(CGenNodeLatencyPktInfo * msg); @@ -3421,7 +3490,12 @@ private: CNodeRing * m_ring_to_rx; /* ring dp -> latency thread */ flow_id_node_t m_flow_id_to_node_lookup; -}; + + TrexStatelessDpCore *m_stateless_dp_info; + +private: + uint8_t m_cacheline_pad[RTE_CACHE_LINE_SIZE][19]; // improve prefech +} __rte_cache_aligned ; inline CGenNode * CFlowGenListPerThread::create_node(void){ CGenNode * res; diff --git a/src/gtest/rpc_test.cpp b/src/gtest/rpc_test.cpp index 250d5342..6b8e3eff 100644 --- a/src/gtest/rpc_test.cpp +++ b/src/gtest/rpc_test.cpp @@ -30,6 +30,8 @@ limitations under the License. using namespace std; +uint16_t gtest_get_mock_server_port(); + class RpcTest : public testing::Test { protected: @@ -44,7 +46,12 @@ protected: m_context = zmq_ctx_new (); m_socket = zmq_socket (m_context, ZMQ_REQ); - zmq_connect (m_socket, "tcp://localhost:5050"); + + std::stringstream ss; + ss << "tcp://localhost:"; + ss << gtest_get_mock_server_port(); + + zmq_connect (m_socket, ss.str().c_str()); } diff --git a/src/internal_api/trex_platform_api.h b/src/internal_api/trex_platform_api.h new file mode 100644 index 00000000..5c2d42d2 --- /dev/null +++ b/src/internal_api/trex_platform_api.h @@ -0,0 +1,130 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2015 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#ifndef __TREX_PLATFORM_API_H__ +#define __TREX_PLATFORM_API_H__ + +#include <stdint.h> + +/** + * Global stats + * + * @author imarom (06-Oct-15) + */ +class TrexPlatformGlobalStats { +public: + TrexPlatformGlobalStats() { + m_stats = {0}; + } + + struct { + double m_cpu_util; + + double m_tx_bps; + double m_rx_bps; + + double m_tx_pps; + double m_rx_pps; + + uint64_t m_total_tx_pkts; + uint64_t m_total_rx_pkts; + + uint64_t m_total_tx_bytes; + uint64_t m_total_rx_bytes; + + uint64_t m_tx_rx_errors; + } m_stats; +}; + +/** + * Per Interface stats + * + * @author imarom (26-Oct-15) + */ +class TrexPlatformInterfaceStats { + +public: + TrexPlatformInterfaceStats() { + m_stats = {0}; + } + +public: + + struct { + + double m_tx_bps; + double m_rx_bps; + + double m_tx_pps; + double m_rx_pps; + + uint64_t m_total_tx_pkts; + uint64_t m_total_rx_pkts; + + uint64_t m_total_tx_bytes; + uint64_t m_total_rx_bytes; + + uint64_t m_tx_rx_errors; + } m_stats; +}; + + +/** + * low level API interface + * can be implemented by DPDK or mock + * + * @author imarom (25-Oct-15) + */ + +class TrexPlatformApi { +public: + virtual void get_global_stats(TrexPlatformGlobalStats &stats) const = 0; + virtual void get_interface_stats(uint8_t interface_id, TrexPlatformInterfaceStats &stats) const = 0; + virtual uint8_t get_dp_core_count() const = 0; + virtual ~TrexPlatformApi() {} +}; + + +/** + * DPDK implementation of the platform API + * + * @author imarom (26-Oct-15) + */ +class TrexDpdkPlatformApi : public TrexPlatformApi { +public: + void get_global_stats(TrexPlatformGlobalStats &stats) const; + void get_interface_stats(uint8_t interface_id, TrexPlatformInterfaceStats &stats) const; + uint8_t get_dp_core_count() const; +}; + +/** + * MOCK implementation of the platform API + * + * @author imarom (26-Oct-15) + */ +class TrexMockPlatformApi : public TrexPlatformApi { +public: + void get_global_stats(TrexPlatformGlobalStats &stats) const; + void get_interface_stats(uint8_t interface_id, TrexPlatformInterfaceStats &stats) const; + uint8_t get_dp_core_count() const; +}; + +#endif /* __TREX_PLATFORM_API_H__ */ diff --git a/src/main.cpp b/src/main.cpp index 96789cdd..bd64c5a4 100755 --- a/src/main.cpp +++ b/src/main.cpp @@ -185,25 +185,6 @@ int curent_time(){ #include <pthread.h> -void delay(int msec){ - - if (msec == 0) - {//user that requested that probebly wanted the minimal delay - //but because of scaling problem he have got 0 so we will give the min delay - //printf("\n\n\nERROR-Task delay ticks == 0 found in task %s task id = %d\n\n\n\n", - // SANB_TaskName(SANB_TaskIdSelf()), SANB_TaskIdSelf()); - msec =1; - - } - - struct timespec time1, remain; // 2 sec max delay - time1.tv_sec=msec/1000; - time1.tv_nsec=(msec - (time1.tv_sec*1000))*1000000; - - nanosleep(&time1,&remain); -} - - struct per_thread_t { pthread_t tid; }; @@ -234,7 +215,7 @@ void * thread_task(void *info){ char buf[100]; sprintf(buf,"my%d.erf",obj->thread_id); volatile int i; - lpt->generate_erf(buf,*obj->preview_info); + lpt->start_generate_stateful(buf,*obj->preview_info); lpt->m_node_gen.DumpHist(stdout); printf("end thread %d \n",obj->thread_id); } @@ -325,7 +306,7 @@ void test_load_list_of_cap_files(CParserOption * op){ lpt=fl.m_threads_info[i]; char buf[100]; sprintf(buf,"my%d.erf",i); - lpt->generate_erf(buf,op->preview); + lpt->start_generate_stateful(buf,op->preview); lpt->m_node_gen.DumpHist(stdout); } //sprintf(buf,"my%d.erf",7); @@ -353,7 +334,7 @@ int load_list_of_cap_files(CParserOption * op){ lpt->set_vif(&erf_vif); if ( (op->preview.getVMode() >1) || op->preview.getFileWrite() ) { - lpt->generate_erf(op->out_file,op->preview); + lpt->start_generate_stateful(op->out_file,op->preview); } lpt->m_node_gen.DumpHist(stdout); diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp index a0af9fdf..d4e07ef2 100755 --- a/src/main_dpdk.cpp +++ b/src/main_dpdk.cpp @@ -55,7 +55,10 @@ limitations under the License. #include <common/arg/SimpleGlob.h> #include <common/arg/SimpleOpt.h> #include <common/basic_utils.h> + #include <stateless/cp/trex_stateless.h> +#include <stateless/dp/trex_stream_node.h> + #include <../linux_dpdk/version.h> extern "C" { @@ -73,6 +76,7 @@ extern "C" { #include "msg_manager.h" #include "platform_cfg.h" +#include <internal_api/trex_platform_api.h> #define RX_CHECK_MIX_SAMPLE_RATE 8 #define RX_CHECK_MIX_SAMPLE_RATE_1G 2 @@ -104,8 +108,6 @@ extern "C" int vmxnet3_xmit_set_callback(rte_mbuf_convert_to_one_seg_t cb); #define RTE_TEST_TX_DESC_DEFAULT 512 #define RTE_TEST_RX_DESC_DROP 0 - - static inline int get_vm_one_queue_enable(){ return (CGlobalInfo::m_options.preview.get_vm_one_queue_enable() ?1:0); } @@ -928,42 +930,32 @@ static int parse_options(int argc, char *argv[], CParserOption* po, bool first_t } } - return 0; -} - - -int main_test(int argc , char * argv[]); - - - - -void delay(int msec){ - - if (msec == 0) - {//user that requested that probebly wanted the minimal delay - //but because of scaling problem he have got 0 so we will give the min delay - //printf("\n\n\nERROR-Task delay ticks == 0 found in task %s task id = %d\n\n\n\n", - // SANB_TaskName(SANB_TaskIdSelf()), SANB_TaskIdSelf()); - msec =1; + if ( get_is_stateless() ) { + if ( po->preview.get_is_rx_check_enable() ) { + parse_err("Rx check is not supported with interactive mode "); + } - } + if ( (! po->is_latency_disabled()) || (po->preview.getOnlyLatency()) ){ + parse_err("Latecny check is not supported with interactive mode "); + } - struct timespec time1, remain; // 2 sec max delay - time1.tv_sec=msec/1000; - time1.tv_nsec=(msec - (time1.tv_sec*1000))*1000000; + if ( po->preview.getSingleCore() ){ + parse_err("single core is not supported with interactive mode "); + } - nanosleep(&time1,&remain); + } + return 0; } +int main_test(int argc , char * argv[]); + static const char * default_argv[] = {"xx","-c", "0x7", "-n","2","-b","0000:0b:01.01"}; static int argv_num = 7; - - #define RX_PTHRESH 8 /**< Default values of RX prefetch threshold reg. */ #define RX_HTHRESH 8 /**< Default values of RX host threshold reg. */ #define RX_WTHRESH 4 /**< Default values of RX write-back threshold reg. */ @@ -1855,6 +1847,8 @@ public: bool process_rx_pkt(pkt_dir_t dir,rte_mbuf_t * m); + virtual int update_mac_addr_from_global_cfg(pkt_dir_t dir, rte_mbuf_t *m); + public: void GetCoreCounters(CVirtualIFPerSideStats *stats); @@ -1867,7 +1861,7 @@ public: return ( CGlobalInfo::m_socket.port_to_socket( m_ports[0].m_port->get_port_id() ) ); } -private: +protected: int send_burst(CCorePerPort * lp_port, uint16_t len, @@ -1878,13 +1872,21 @@ private: -private: +protected: uint8_t m_core_id; uint16_t m_mbuf_cache; CCorePerPort m_ports[CS_NUM]; /* each core has 2 tx queues 1. client side and server side */ CNodeRing * m_ring_to_rx; + +} __rte_cache_aligned; ; + +class CCoreEthIFStateless : public CCoreEthIF { +public: + virtual int send_node(CGenNode * node); }; + + bool CCoreEthIF::Create(uint8_t core_id, uint16_t tx_client_queue_id, CPhyEthIF * tx_client_port, @@ -1985,6 +1987,7 @@ void CCoreEthIF::flush_rx_queue(void){ } } + int CCoreEthIF::flush_tx_queue(void){ /* flush both sides */ pkt_dir_t dir ; @@ -2140,6 +2143,23 @@ void CCoreEthIF::update_mac_addr(CGenNode * node,uint8_t *p){ } + +int CCoreEthIFStateless::send_node(CGenNode * no){ + CGenNodeStateless * node_sl=(CGenNodeStateless *) no; + + /* check that we have mbuf */ + rte_mbuf_t * m=node_sl->get_cache_mbuf(); + assert( m ); + pkt_dir_t dir=(pkt_dir_t)node_sl->get_mbuf_cache_dir(); + CCorePerPort * lp_port=&m_ports[dir]; + CVirtualIFPerSideStats * lp_stats = &m_stats[dir]; + rte_pktmbuf_refcnt_update(m,1); + send_pkt(lp_port,m,lp_stats); + return (0); +}; + + + int CCoreEthIF::send_node(CGenNode * node){ if ( unlikely( node->get_cache_mbuf() !=NULL ) ) { @@ -2233,6 +2253,19 @@ int CCoreEthIF::send_node(CGenNode * node){ } +int CCoreEthIF::update_mac_addr_from_global_cfg(pkt_dir_t dir, + rte_mbuf_t *m){ + assert(m); + assert(dir<2); + CCorePerPort * lp_port=&m_ports[dir]; + uint8_t *p=rte_pktmbuf_mtod(m, uint8_t*); + uint8_t p_id=lp_port->m_port->get_port_id(); + + memcpy(p,CGlobalInfo::m_options.get_dst_src_mac_addr(p_id),12); + return (0); +} + + class CLatencyHWPort : public CPortLatencyHWBase { public: @@ -2719,10 +2752,10 @@ void CGlobalStats::Dump(FILE *fd,DumpFormat mode){ -struct CGlobalPortCfg { +struct CGlobalTRex { public: - CGlobalPortCfg (){ + CGlobalTRex (){ m_max_ports=4; m_max_cores=1; m_cores_to_dual_ports=0; @@ -2732,10 +2765,11 @@ public: m_expected_pps=0.0; m_expected_cps=0.0; m_expected_bps=0.0; + m_trex_stateless = NULL; } public: - bool Create(bool is_stateless); + bool Create(); void Delete(); int ixgbe_prob_init(); @@ -2754,6 +2788,8 @@ public: public: int start_send_master(); + int start_master_stateless(); + int run_in_core(virtual_thread_id_t virt_core_id); int stop_core(virtual_thread_id_t virt_core_id); @@ -2800,7 +2836,6 @@ public: - int test_send1(); int rcv_send(int port,int queue_id); int rcv_send_all(int queue_id); @@ -2866,7 +2901,9 @@ public: CPhyEthIF m_ports[BP_MAX_PORTS]; - CCoreEthIF m_cores_vif[BP_MAX_CORES]; /* counted from 1 , 2,3 core zero is reserve*/ + CCoreEthIF m_cores_vif_sf[BP_MAX_CORES]; /* counted from 1 , 2,3 core zero is reserve - stateful */ + CCoreEthIFStateless m_cores_vif_sl[BP_MAX_CORES]; /* counted from 1 , 2,3 core zero is reserve - stateless*/ + CCoreEthIF * m_cores_vif[BP_MAX_CORES]; CParserOption m_po ; @@ -2889,54 +2926,14 @@ private: CLatencyPktInfo m_latency_pkt; CZMqPublisher m_zmq_publisher; -}; - +public: + TrexStateless *m_trex_stateless; +}; -int CGlobalPortCfg::test_send1(){ - - CParserOption po ; - CFlowGenList fl; - - po.cfg_file = "cap2/dns.yaml"; - //po.cfg_file = "cap2/sfr3.yaml"; - //po.cfg_file = "cap2/sfr4.yaml"; - //po.cfg_file = "cap2/sfr.yaml"; - - po.preview.setVMode(3); - po.preview.setFileWrite(true); - - fl.Create(); - - fl.load_from_yaml(po.cfg_file,1); - //fl.DumpPktSize(); - - fl.generate_p_thread_info(1); - CFlowGenListPerThread * lpt; - - int i; - for (i=0; i<1; i++) { - lpt = fl.m_threads_info[i]; - //CNullIF * erf_vif = new CNullIF(); - CVirtualIF * erf_vif = &m_cores_vif[0]; - lpt->set_vif(erf_vif); - lpt->generate_erf("hey",po.preview); - lpt->m_node_gen.DumpHist(stdout); - lpt->DumpStats(stdout); - } - - m_cores_vif[0].flush_tx_queue(); - delay(1000); - //fprintf(stdout," drop : %llu \n",m_test_drop); - - m_cores_vif[0].DumpCoreStats(stdout); - m_cores_vif[0].DumpIfStats(stdout); - - fl.Delete(); -} -int CGlobalPortCfg::rcv_send(int port,int queue_id){ +int CGlobalTRex::rcv_send(int port,int queue_id){ CPhyEthIF * lp=&m_ports[port]; rte_mbuf_t * rx_pkts[32]; @@ -2955,7 +2952,7 @@ int CGlobalPortCfg::rcv_send(int port,int queue_id){ return (0); } -int CGlobalPortCfg::rcv_send_all(int queue_id){ +int CGlobalTRex::rcv_send_all(int queue_id){ int i; for (i=0; i<m_max_ports; i++) { rcv_send(i,queue_id); @@ -2966,7 +2963,7 @@ int CGlobalPortCfg::rcv_send_all(int queue_id){ -int CGlobalPortCfg::test_send(){ +int CGlobalTRex::test_send(){ int i; CPhyEthIF * lp=&m_ports[0]; @@ -3118,7 +3115,7 @@ const uint8_t sctp_pkt1[]={ -int CGlobalPortCfg::create_pkt(uint8_t *pkt,int pkt_size){ +int CGlobalTRex::create_pkt(uint8_t *pkt,int pkt_size){ rte_mempool_t * mp= CGlobalInfo::m_mem_pool[0].m_big_mbuf_pool ; rte_mbuf_t * m=rte_pktmbuf_alloc(mp); @@ -3138,17 +3135,17 @@ int CGlobalPortCfg::create_pkt(uint8_t *pkt,int pkt_size){ return (0); } -int CGlobalPortCfg::create_udp_pkt(){ +int CGlobalTRex::create_udp_pkt(){ return (create_pkt((uint8_t*)udp_pkt,sizeof(udp_pkt))); } -int CGlobalPortCfg::create_sctp_pkt(){ +int CGlobalTRex::create_sctp_pkt(){ return (create_pkt((uint8_t*)sctp_pkt1,sizeof(sctp_pkt1))); } /* test by sending 10 packets ...*/ -int CGlobalPortCfg::test_send_pkts(uint16_t queue_id, +int CGlobalTRex::test_send_pkts(uint16_t queue_id, int pkt, int port){ @@ -3174,7 +3171,7 @@ int CGlobalPortCfg::test_send_pkts(uint16_t queue_id, -int CGlobalPortCfg::set_promisc_all(bool enable){ +int CGlobalTRex::set_promisc_all(bool enable){ int i; for (i=0; i<m_max_ports; i++) { CPhyEthIF * _if=&m_ports[i]; @@ -3184,7 +3181,7 @@ int CGlobalPortCfg::set_promisc_all(bool enable){ -int CGlobalPortCfg::reset_counters(){ +int CGlobalTRex::reset_counters(){ int i; for (i=0; i<m_max_ports; i++) { CPhyEthIF * _if=&m_ports[i]; @@ -3193,7 +3190,7 @@ int CGlobalPortCfg::reset_counters(){ } -bool CGlobalPortCfg::is_all_links_are_up(bool dump){ +bool CGlobalTRex::is_all_links_are_up(bool dump){ bool all_link_are=true; int i; for (i=0; i<m_max_ports; i++) { @@ -3212,7 +3209,7 @@ bool CGlobalPortCfg::is_all_links_are_up(bool dump){ -int CGlobalPortCfg::ixgbe_rx_queue_flush(){ +int CGlobalTRex::ixgbe_rx_queue_flush(){ int i; for (i=0; i<m_max_ports; i++) { CPhyEthIF * _if=&m_ports[i]; @@ -3222,7 +3219,7 @@ int CGlobalPortCfg::ixgbe_rx_queue_flush(){ } -int CGlobalPortCfg::ixgbe_configure_mg(void){ +int CGlobalTRex::ixgbe_configure_mg(void){ int i; CLatencyManagerCfg mg_cfg; mg_cfg.m_max_ports = m_max_ports; @@ -3265,7 +3262,7 @@ int CGlobalPortCfg::ixgbe_configure_mg(void){ } -int CGlobalPortCfg::ixgbe_start(void){ +int CGlobalTRex::ixgbe_start(void){ int i; for (i=0; i<m_max_ports; i++) { @@ -3388,7 +3385,12 @@ int CGlobalPortCfg::ixgbe_start(void){ for (i=0; i<get_cores_tx(); i++) { int j=(i+1); int queue_id=((j-1)/get_base_num_cores() ); /* for the first min core queue 0 , then queue 1 etc */ - m_cores_vif[j].Create(j, + if ( get_is_stateless() ){ + m_cores_vif[j]=&m_cores_vif_sl[j]; + }else{ + m_cores_vif[j]=&m_cores_vif_sf[j]; + } + m_cores_vif[j]->Create(j, queue_id, &m_ports[port_offset], /* 0,2*/ queue_id, @@ -3403,28 +3405,37 @@ int CGlobalPortCfg::ixgbe_start(void){ fprintf(stdout," -------------------------------\n"); CCoreEthIF::DumpIfCfgHeader(stdout); for (i=0; i<get_cores_tx(); i++) { - m_cores_vif[i+1].DumpIfCfg(stdout); + m_cores_vif[i+1]->DumpIfCfg(stdout); } fprintf(stdout," -------------------------------\n"); } -bool CGlobalPortCfg::Create(bool is_stateless){ +bool CGlobalTRex::Create(){ + CFlowsYamlInfo pre_yaml_info; - /* hack - need to refactor */ - if (!is_stateless) { - if ( !m_zmq_publisher.Create( CGlobalInfo::m_options.m_zmq_port, - !CGlobalInfo::m_options.preview.get_zmq_publish_enable() ) ){ - return (false); - } + if (get_is_stateless()) { + + TrexStatelessCfg cfg; + + TrexRpcServerConfig rpc_req_resp_cfg(TrexRpcServerConfig::RPC_PROT_TCP, global_platform_cfg_info.m_zmq_rpc_port); + + cfg.m_port_count = CGlobalInfo::m_options.m_expected_portd; + cfg.m_rpc_req_resp_cfg = &rpc_req_resp_cfg; + cfg.m_rpc_async_cfg = NULL; + cfg.m_rpc_server_verbose = false; + cfg.m_platform_api = new TrexDpdkPlatformApi(); + + m_trex_stateless = new TrexStateless(cfg); + + } else { + pre_yaml_info.load_from_yaml_file(CGlobalInfo::m_options.cfg_file); } - /* We load the YAML twice, - this is the first time. to update global flags */ - CFlowsYamlInfo pre_yaml_info; - if (!is_stateless) { - pre_yaml_info.load_from_yaml_file(CGlobalInfo::m_options.cfg_file); - } + if ( !m_zmq_publisher.Create( CGlobalInfo::m_options.m_zmq_port, + !CGlobalInfo::m_options.preview.get_zmq_publish_enable() ) ){ + return (false); + } if ( pre_yaml_info.m_vlan_info.m_enable ){ CGlobalInfo::m_options.preview.set_vlan_mode_enable(true); @@ -3434,13 +3445,14 @@ bool CGlobalPortCfg::Create(bool is_stateless){ ixgbe_prob_init(); cores_prob_init(); queues_prob_init(); - /* allocate rings */ - assert( CMsgIns::Ins()->Create(get_cores_tx()) ); - if ( sizeof(CGenNodeNatInfo) != sizeof(CGenNode) ) { - printf("ERROR sizeof(CGenNodeNatInfo) %d != sizeof(CGenNode) %d must be the same size \n",sizeof(CGenNodeNatInfo),sizeof(CGenNode)); - assert(0); - } + /* allocate rings */ + assert( CMsgIns::Ins()->Create(get_cores_tx()) ); + + if ( sizeof(CGenNodeNatInfo) != sizeof(CGenNode) ) { + printf("ERROR sizeof(CGenNodeNatInfo) %d != sizeof(CGenNode) %d must be the same size \n",sizeof(CGenNodeNatInfo),sizeof(CGenNode)); + assert(0); + } if ( sizeof(CGenNodeLatencyPktInfo) != sizeof(CGenNode) ) { printf("ERROR sizeof(CGenNodeLatencyPktInfo) %d != sizeof(CGenNode) %d must be the same size \n",sizeof(CGenNodeLatencyPktInfo),sizeof(CGenNode)); @@ -3463,13 +3475,13 @@ bool CGlobalPortCfg::Create(bool is_stateless){ return (true); } -void CGlobalPortCfg::Delete(){ +void CGlobalTRex::Delete(){ m_zmq_publisher.Delete(); } -int CGlobalPortCfg::ixgbe_prob_init(void){ +int CGlobalTRex::ixgbe_prob_init(void){ uint8_t nb_ports; @@ -3561,13 +3573,13 @@ int CGlobalPortCfg::ixgbe_prob_init(void){ return (0); } -int CGlobalPortCfg::cores_prob_init(){ +int CGlobalTRex::cores_prob_init(){ m_max_cores = rte_lcore_count(); assert(m_max_cores>0); return (0); } -int CGlobalPortCfg::queues_prob_init(){ +int CGlobalTRex::queues_prob_init(){ if (m_max_cores < 2) { rte_exit(EXIT_FAILURE, "number of cores should be at least 3 \n"); @@ -3608,7 +3620,7 @@ int CGlobalPortCfg::queues_prob_init(){ } -void CGlobalPortCfg::dump_config(FILE *fd){ +void CGlobalTRex::dump_config(FILE *fd){ fprintf(fd," number of ports : %u \n",m_max_ports); fprintf(fd," max cores for 2 ports : %u \n",m_cores_to_dual_ports); fprintf(fd," max queue per port : %u \n",m_max_queues_per_port); @@ -3616,7 +3628,7 @@ void CGlobalPortCfg::dump_config(FILE *fd){ -void CGlobalPortCfg::dump_post_test_stats(FILE *fd){ +void CGlobalTRex::dump_post_test_stats(FILE *fd){ uint64_t pkt_out=0; uint64_t pkt_out_bytes=0; uint64_t pkt_in_bytes=0; @@ -3627,7 +3639,7 @@ void CGlobalPortCfg::dump_post_test_stats(FILE *fd){ int i; for (i=0; i<get_cores_tx(); i++) { - CCoreEthIF * erf_vif = &m_cores_vif[i+1]; + CCoreEthIF * erf_vif = m_cores_vif[i+1]; CVirtualIFPerSideStats stats; erf_vif->GetCoreCounters(&stats); sw_pkt_out += stats.m_tx_pkt; @@ -3675,7 +3687,7 @@ void CGlobalPortCfg::dump_post_test_stats(FILE *fd){ } -void CGlobalPortCfg::update_stats(){ +void CGlobalTRex::update_stats(){ int i; for (i=0; i<m_max_ports; i++) { @@ -3696,7 +3708,7 @@ void CGlobalPortCfg::update_stats(){ } -void CGlobalPortCfg::get_stats(CGlobalStats & stats){ +void CGlobalTRex::get_stats(CGlobalStats & stats){ int i; float total_tx=0.0; @@ -3798,7 +3810,13 @@ void CGlobalPortCfg::get_stats(CGlobalStats & stats){ stats.m_total_clients = total_clients; stats.m_total_servers = total_servers; stats.m_active_sockets = active_sockets; - stats.m_socket_util =100.0*(double)active_sockets/(double)total_sockets; + + if (total_sockets != 0) { + stats.m_socket_util =100.0*(double)active_sockets/(double)total_sockets; + } else { + stats.m_socket_util = 0; + } + float drop_rate=total_tx-total_rx; @@ -3822,7 +3840,7 @@ void CGlobalPortCfg::get_stats(CGlobalStats & stats){ stats.m_tx_expected_bps = m_expected_bps*pf; } -bool CGlobalPortCfg::sanity_check(){ +bool CGlobalTRex::sanity_check(){ CFlowGenListPerThread * lpt; uint32_t errors=0; @@ -3842,7 +3860,7 @@ bool CGlobalPortCfg::sanity_check(){ /* dump the template info */ -void CGlobalPortCfg::dump_template_info(std::string & json){ +void CGlobalTRex::dump_template_info(std::string & json){ CFlowGenListPerThread * lpt = m_fl.m_threads_info[0]; CFlowsYamlInfo * yaml_info=&lpt->m_yaml_info; @@ -3857,7 +3875,7 @@ void CGlobalPortCfg::dump_template_info(std::string & json){ json+="]}" ; } -void CGlobalPortCfg::dump_stats(FILE *fd,std::string & json, +void CGlobalTRex::dump_stats(FILE *fd,std::string & json, CGlobalStats::DumpFormat format){ CGlobalStats stats; update_stats(); @@ -3897,11 +3915,13 @@ void CGlobalPortCfg::dump_stats(FILE *fd,std::string & json, } -int CGlobalPortCfg::run_in_master(){ +int CGlobalTRex::run_in_master(){ std::string json; bool was_stopped=false; + m_trex_stateless->launch_control_plane(); + while ( true ) { if ( CGlobalInfo::m_options.preview.get_no_keyboard() ==false ){ @@ -3953,10 +3973,10 @@ int CGlobalPortCfg::run_in_master(){ m_fl.m_threads_info[0]->m_node_gen.dump_json(json); m_zmq_publisher.publish_json(json); - dump_template_info(json); - m_zmq_publisher.publish_json(json); - - + if ( !get_is_stateless() ){ + dump_template_info(json); + m_zmq_publisher.publish_json(json); + } if ( !CGlobalInfo::m_options.is_latency_disabled() ){ m_mg.update(); @@ -4009,6 +4029,10 @@ int CGlobalPortCfg::run_in_master(){ } + /* stateless info */ + m_trex_stateless->generate_publish_snapshot(json); + m_zmq_publisher.publish_json(json); + delay(500); if ( is_all_cores_finished() ) { @@ -4027,7 +4051,7 @@ int CGlobalPortCfg::run_in_master(){ -int CGlobalPortCfg::run_in_laterncy_core(void){ +int CGlobalTRex::run_in_laterncy_core(void){ if ( !CGlobalInfo::m_options.is_latency_disabled() ){ m_mg.start(0); } @@ -4035,12 +4059,12 @@ int CGlobalPortCfg::run_in_laterncy_core(void){ } -int CGlobalPortCfg::stop_core(virtual_thread_id_t virt_core_id){ +int CGlobalTRex::stop_core(virtual_thread_id_t virt_core_id){ m_signal[virt_core_id]=1; return (0); } -int CGlobalPortCfg::run_in_core(virtual_thread_id_t virt_core_id){ +int CGlobalTRex::run_in_core(virtual_thread_id_t virt_core_id){ CPreviewMode *lp=&CGlobalInfo::m_options.preview; if ( lp->getSingleCore() && @@ -4051,19 +4075,23 @@ int CGlobalPortCfg::run_in_core(virtual_thread_id_t virt_core_id){ return (0); } + assert(m_fl_was_init); CFlowGenListPerThread * lpt; lpt = m_fl.m_threads_info[virt_core_id-1]; - lpt->generate_erf(CGlobalInfo::m_options.out_file,*lp); - //lpt->m_node_gen.DumpHist(stdout); - //lpt->DumpStats(stdout); + + if (get_is_stateless()) { + lpt->start_stateless_daemon(); + }else{ + lpt->start_generate_stateful(CGlobalInfo::m_options.out_file,*lp); + } m_signal[virt_core_id]=1; return (0); } -int CGlobalPortCfg::stop_master(){ +int CGlobalTRex::stop_master(){ delay(1000); std::string json; @@ -4084,7 +4112,7 @@ int CGlobalPortCfg::stop_master(){ int i; for (i=0; i<get_cores_tx(); i++) { lpt = m_fl.m_threads_info[i]; - CCoreEthIF * erf_vif = &m_cores_vif[i+1]; + CCoreEthIF * erf_vif = m_cores_vif[i+1]; erf_vif->DumpCoreStats(stdout); erf_vif->DumpIfStats(stdout); @@ -4117,7 +4145,7 @@ int CGlobalPortCfg::stop_master(){ } -bool CGlobalPortCfg::is_all_cores_finished(){ +bool CGlobalTRex::is_all_cores_finished(){ int i; for (i=0; i<get_cores_tx(); i++) { if ( m_signal[i+1]==0){ @@ -4128,8 +4156,32 @@ bool CGlobalPortCfg::is_all_cores_finished(){ } +int CGlobalTRex::start_master_stateless(){ + int i; + for (i=0; i<BP_MAX_CORES; i++) { + m_signal[i]=0; + } + m_fl.Create(); + m_expected_pps = 0; + m_expected_cps = 0; + m_expected_bps = 0; + + m_fl.generate_p_thread_info(get_cores_tx()); + CFlowGenListPerThread * lpt; + + for (i=0; i<get_cores_tx(); i++) { + lpt = m_fl.m_threads_info[i]; + CVirtualIF * erf_vif = m_cores_vif[i+1]; + lpt->set_vif(erf_vif); + lpt->m_node_gen.m_socket_id =m_cores_vif[i+1]->get_socket_id(); + } + m_fl_was_init=true; +} + + + -int CGlobalPortCfg::start_send_master(){ +int CGlobalTRex::start_send_master(){ int i; for (i=0; i<BP_MAX_CORES; i++) { m_signal[i]=0; @@ -4174,10 +4226,10 @@ int CGlobalPortCfg::start_send_master(){ for (i=0; i<get_cores_tx(); i++) { lpt = m_fl.m_threads_info[i]; //CNullIF * erf_vif = new CNullIF(); - CVirtualIF * erf_vif = &m_cores_vif[i+1]; + CVirtualIF * erf_vif = m_cores_vif[i+1]; lpt->set_vif(erf_vif); /* socket id */ - lpt->m_node_gen.m_socket_id =m_cores_vif[i+1].get_socket_id(); + lpt->m_node_gen.m_socket_id =m_cores_vif[i+1]->get_socket_id(); } m_fl_was_init=true; @@ -4187,7 +4239,12 @@ int CGlobalPortCfg::start_send_master(){ //////////////////////////////////////////// -static CGlobalPortCfg ports_cfg; +static CGlobalTRex g_trex; + + +TrexStateless * get_stateless_obj() { + return g_trex.m_trex_stateless; +} static int latency_one_lcore(__attribute__((unused)) void *dummy) { @@ -4196,34 +4253,22 @@ static int latency_one_lcore(__attribute__((unused)) void *dummy) if ( lpsock->thread_phy_is_latency( phy_id ) ){ - ports_cfg.run_in_laterncy_core(); + g_trex.run_in_laterncy_core(); }else{ if ( lpsock->thread_phy_is_master( phy_id ) ) { - ports_cfg.run_in_master(); + g_trex.run_in_master(); delay(1); }else{ delay((uint32_t)(1000.0*CGlobalInfo::m_options.m_duration)); /* this core has stopped */ - ports_cfg.m_signal[ lpsock->thread_phy_to_virt( phy_id ) ]=1; + g_trex.m_signal[ lpsock->thread_phy_to_virt( phy_id ) ]=1; } } return 0; } -static int stateless_entry(__attribute__((unused)) void *dummy) { - CPlatformSocketInfo * lpsock=&CGlobalInfo::m_socket; - physical_thread_id_t phy_id = rte_lcore_id(); - - if (lpsock->thread_phy_is_master( phy_id )) { - TrexStateless::get_instance().launch_control_plane(); - } else { - TrexStateless::get_instance().launch_on_dp_core(phy_id); - } - - return (0); -} static int slave_one_lcore(__attribute__((unused)) void *dummy) { @@ -4232,13 +4277,13 @@ static int slave_one_lcore(__attribute__((unused)) void *dummy) if ( lpsock->thread_phy_is_latency( phy_id ) ){ - ports_cfg.run_in_laterncy_core(); + g_trex.run_in_laterncy_core(); }else{ if ( lpsock->thread_phy_is_master( phy_id ) ) { - ports_cfg.run_in_master(); + g_trex.run_in_master(); delay(1); }else{ - ports_cfg.run_in_core( lpsock->thread_phy_to_virt( phy_id ) ); + g_trex.run_in_core( lpsock->thread_phy_to_virt( phy_id ) ); } } return 0; @@ -4418,7 +4463,7 @@ int sim_load_list_of_cap_files(CParserOption * op){ lpt->set_vif(&erf_vif); if ( (op->preview.getVMode() >1) || op->preview.getFileWrite() ) { - lpt->generate_erf(op->out_file,op->preview); + lpt->start_generate_stateful(op->out_file,op->preview); } lpt->m_node_gen.DumpHist(stdout); @@ -4430,42 +4475,6 @@ int sim_load_list_of_cap_files(CParserOption * op){ } - - -static int -launch_stateless_trex() { - CPlatformSocketInfo *lpsock=&CGlobalInfo::m_socket; - CParserOption *lpop= &CGlobalInfo::m_options; - CPlatformYamlInfo *cg=&global_platform_cfg_info; - - TrexStatelessCfg cfg; - - TrexRpcServerConfig rpc_req_resp_cfg(TrexRpcServerConfig::RPC_PROT_TCP, 5050); - TrexRpcServerConfig rpc_async_cfg(TrexRpcServerConfig::RPC_PROT_TCP, 5051); - - cfg.m_dp_core_count = lpop->preview.getCores(); - cfg.m_port_count = lpop->m_expected_portd; - cfg.m_rpc_req_resp_cfg = &rpc_req_resp_cfg; - cfg.m_rpc_async_cfg = &rpc_async_cfg; - cfg.m_rpc_server_verbose = true; - - TrexStateless::configure(cfg); - - printf("\nStarting T-Rex Stateless\n"); - printf("Starting RPC Server...\n\n"); - - rte_eal_mp_remote_launch(stateless_entry, NULL, CALL_MASTER); - - unsigned lcore_id; - RTE_LCORE_FOREACH_SLAVE(lcore_id) { - if (rte_eal_wait_lcore(lcore_id) < 0) - return -1; - } - return (0); -} - - - int main_test(int argc , char * argv[]){ utl_termio_init(); @@ -4525,78 +4534,62 @@ int main_test(int argc , char * argv[]){ return ( sim_load_list_of_cap_files(&CGlobalInfo::m_options) ); } - bool is_stateless = (CGlobalInfo::m_options.m_run_mode == CParserOption::RUN_MODE_INTERACTIVE); - - if ( !ports_cfg.Create(is_stateless) ){ + if ( !g_trex.Create() ){ exit(1); } - /* patch here */ - if (is_stateless) { - return launch_stateless_trex(); - } - - if (po->preview.get_is_rx_check_enable() && (po->m_rx_check_sampe< get_min_sample_rate()) ) { po->m_rx_check_sampe = get_min_sample_rate(); printf("Warning rx check sample rate should be lower than %d setting it to %d\n",get_min_sample_rate(),get_min_sample_rate()); } /* set dump mode */ - ports_cfg.m_io_modes.set_mode((CTrexGlobalIoMode::CliDumpMode)CGlobalInfo::m_options.m_io_mode); + g_trex.m_io_modes.set_mode((CTrexGlobalIoMode::CliDumpMode)CGlobalInfo::m_options.m_io_mode); if ( !CGlobalInfo::m_options.is_latency_disabled() && (CGlobalInfo::m_options.m_latency_prev>0) ){ uint32_t pkts = CGlobalInfo::m_options.m_latency_prev* CGlobalInfo::m_options.m_latency_rate; printf("Start prev latency check - hack for Keren for %d sec \n",CGlobalInfo::m_options.m_latency_prev); - ports_cfg.m_mg.start(pkts); + g_trex.m_mg.start(pkts); printf("Delay now you can call command \n"); delay(CGlobalInfo::m_options.m_latency_prev* 1000); printf("Finish wating \n"); - ports_cfg.m_mg.reset(); - ports_cfg.reset_counters(); + g_trex.m_mg.reset(); + g_trex.reset_counters(); } - ports_cfg.start_send_master(); + if ( get_is_stateless() ) { + g_trex.start_master_stateless(); - // TBD remove - //ports_cfg.test_latency(); - /* test seding */ - //while (1) { - //} + }else{ + g_trex.start_send_master(); + } /* TBD_FDIR */ #if 0 printf(" test_send \n"); - ports_cfg.test_send(); + g_trex.test_send(); while (1) { delay(10000); } #endif - - - - //ports_cfg.test_latency(); - //return (0); - - if ( CGlobalInfo::m_options.preview.getOnlyLatency() ){ rte_eal_mp_remote_launch(latency_one_lcore, NULL, CALL_MASTER); RTE_LCORE_FOREACH_SLAVE(lcore_id) { if (rte_eal_wait_lcore(lcore_id) < 0) return -1; } - ports_cfg.stop_master(); + g_trex.stop_master(); return (0); } if ( CGlobalInfo::m_options.preview.getSingleCore() ) { - ports_cfg.run_in_core(1); - ports_cfg.stop_master(); + g_trex.run_in_core(1); + g_trex.stop_master(); return (0); } @@ -4606,8 +4599,8 @@ int main_test(int argc , char * argv[]){ return -1; } - ports_cfg.stop_master(); - ports_cfg.Delete(); + g_trex.stop_master(); + g_trex.Delete(); utl_termio_reset(); return (0); @@ -5146,3 +5139,36 @@ struct rte_mbuf * rte_mbuf_convert_to_one_seg(struct rte_mbuf *m){ } +/*********************************************************** + * platfrom API object + * TODO: REMOVE THIS TO A SEPERATE FILE + * + **********************************************************/ +void +TrexDpdkPlatformApi::get_global_stats(TrexPlatformGlobalStats &stats) const { + CGlobalStats trex_stats; + g_trex.get_stats(trex_stats); + + stats.m_stats.m_cpu_util = trex_stats.m_cpu_util; + + stats.m_stats.m_tx_bps = trex_stats.m_tx_bps; + stats.m_stats.m_tx_pps = trex_stats.m_tx_pps; + stats.m_stats.m_total_tx_pkts = trex_stats.m_total_tx_pkts; + stats.m_stats.m_total_tx_bytes = trex_stats.m_total_tx_bytes; + + stats.m_stats.m_rx_bps = trex_stats.m_rx_bps; + stats.m_stats.m_rx_pps = /*trex_stats.m_rx_pps*/ 0; /* missing */ + stats.m_stats.m_total_rx_pkts = trex_stats.m_total_rx_pkts; + stats.m_stats.m_total_rx_bytes = trex_stats.m_total_rx_bytes; +} + +void +TrexDpdkPlatformApi::get_interface_stats(uint8_t interface_id, TrexPlatformInterfaceStats &stats) const { + +} + +uint8_t +TrexDpdkPlatformApi::get_dp_core_count() const { + return CGlobalInfo::m_options.preview.getCores(); +} + diff --git a/src/mock/trex_platform_api_mock.cpp b/src/mock/trex_platform_api_mock.cpp new file mode 100644 index 00000000..54f71e10 --- /dev/null +++ b/src/mock/trex_platform_api_mock.cpp @@ -0,0 +1,49 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2015 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#include <internal_api/trex_platform_api.h> + +void +TrexMockPlatformApi::get_global_stats(TrexPlatformGlobalStats &stats) const { + + stats.m_stats.m_cpu_util = 0; + + stats.m_stats.m_tx_bps = 0; + stats.m_stats.m_tx_pps = 0; + stats.m_stats.m_total_tx_pkts = 0; + stats.m_stats.m_total_tx_bytes = 0; + + stats.m_stats.m_rx_bps = 0; + stats.m_stats.m_rx_pps = 0; + stats.m_stats.m_total_rx_pkts = 0; + stats.m_stats.m_total_rx_bytes = 0; +} + +void +TrexMockPlatformApi::get_interface_stats(uint8_t interface_id, TrexPlatformInterfaceStats &stats) const { + +} + +uint8_t +TrexMockPlatformApi::get_dp_core_count() const { + return (1); +} + diff --git a/src/mock/trex_rpc_server_mock.cpp b/src/mock/trex_rpc_server_mock.cpp index de43f92f..0bdf6cf1 100644 --- a/src/mock/trex_rpc_server_mock.cpp +++ b/src/mock/trex_rpc_server_mock.cpp @@ -21,12 +21,71 @@ limitations under the License. #include <trex_rpc_server_api.h> #include <trex_stateless.h> +#include <trex_stateless_dp_core.h> + +#include <msg_manager.h> #include <iostream> +#include <sstream> #include <unistd.h> +#include <string.h> +#include <zmq.h> +#include <bp_sim.h> using namespace std; +static TrexStateless *g_trex_stateless; +static uint16_t g_rpc_port; + +static bool +verify_tcp_port_is_free(uint16_t port) { + void *m_context = zmq_ctx_new(); + void *m_socket = zmq_socket (m_context, ZMQ_REP); + std::stringstream ss; + ss << "tcp://*:"; + ss << port; + + int rc = zmq_bind (m_socket, ss.str().c_str()); + + zmq_close(m_socket); + zmq_term(m_context); + + return (rc == 0); +} + +static uint16_t +find_free_tcp_port(uint16_t start_port = 5050) { + void *m_context = zmq_ctx_new(); + void *m_socket = zmq_socket (m_context, ZMQ_REP); + + uint16_t port = start_port; + while (true) { + std::stringstream ss; + ss << "tcp://*:"; + ss << port; + + int rc = zmq_bind (m_socket, ss.str().c_str()); + if (rc == 0) { + break; + } + + port++; + } + + zmq_close(m_socket); + zmq_term(m_context); + + return port; +} + +TrexStateless * get_stateless_obj() { + return g_trex_stateless; +} + +uint16_t gtest_get_mock_server_port() { + return g_rpc_port; +} + /** * on simulation this is not rebuild every version * (improved stub) @@ -42,44 +101,87 @@ extern "C" const char * get_build_time(void){ int gtest_main(int argc, char **argv); -int main(int argc, char *argv[]) { +static bool parse_uint16(const string arg, uint16_t &port) { + stringstream ss(arg); + + bool x = (ss >> port); + + return (x); +} + +static void +run_dummy_core() { + //TODO: connect this to the scheduler + + //CFlowGenList fl; + //fl.Create(); + //CFlowGenListPerThread *lp = new CFlowGenListPerThread(); + //lp->Create(0, 0, NULL, 0); + //TrexStatelessDpCore dummy_core(0, lp); + //lp->start_stateless_daemon(); +} +int main(int argc, char *argv[]) { bool is_gtest = false; + time_init(); + CGlobalInfo::m_socket.Create(0); + + CGlobalInfo::init_pools(1000); + assert( CMsgIns::Ins()->Create(1)); + + std::thread *m_thread = new std::thread(run_dummy_core); + (void)m_thread; + // gtest ? if (argc > 1) { - if (string(argv[1]) != "--ut") { - cout << "\n[Usage] " << argv[0] << ": " << " [--ut]\n\n"; + string arg = string(argv[1]); + + if (arg == "--ut") { + g_rpc_port = find_free_tcp_port(); + is_gtest = true; + } else if (parse_uint16(arg, g_rpc_port)) { + bool rc = verify_tcp_port_is_free(g_rpc_port); + if (!rc) { + cout << "port " << g_rpc_port << " is not available to use\n"; + exit(-1); + } + } else { + + cout << "\n[Usage] " << argv[0] << ": " << " [--ut] or [port number < 65535]\n\n"; exit(-1); } - is_gtest = true; + + } else { + g_rpc_port = find_free_tcp_port(); } /* configure the stateless object with 4 ports */ TrexStatelessCfg cfg; - TrexRpcServerConfig rpc_req_resp_cfg(TrexRpcServerConfig::RPC_PROT_TCP, 5050); - TrexRpcServerConfig rpc_async_cfg(TrexRpcServerConfig::RPC_PROT_TCP, 5051); + TrexRpcServerConfig rpc_req_resp_cfg(TrexRpcServerConfig::RPC_PROT_TCP, g_rpc_port); + //TrexRpcServerConfig rpc_async_cfg(TrexRpcServerConfig::RPC_PROT_TCP, 5051); cfg.m_port_count = 4; - cfg.m_dp_core_count = 2; cfg.m_rpc_req_resp_cfg = &rpc_req_resp_cfg; - cfg.m_rpc_async_cfg = &rpc_async_cfg; + cfg.m_rpc_async_cfg = NULL; cfg.m_rpc_server_verbose = (is_gtest ? false : true); + cfg.m_platform_api = new TrexMockPlatformApi(); - TrexStateless::configure(cfg); + g_trex_stateless = new TrexStateless(cfg); - TrexStateless::get_instance().launch_control_plane(); + g_trex_stateless->launch_control_plane(); /* gtest handling */ if (is_gtest) { int rc = gtest_main(argc, argv); - TrexStateless::destroy(); + delete g_trex_stateless; + g_trex_stateless = NULL; return rc; } cout << "\n-= Starting RPC Server Mock =-\n\n"; - cout << "Listening on tcp://localhost:5050 [ZMQ]\n\n"; + cout << "Listening on tcp://localhost:" << g_rpc_port << " [ZMQ]\n\n"; cout << "Server Started\n\n"; @@ -87,6 +189,7 @@ int main(int argc, char *argv[]) { sleep(1); } - TrexStateless::destroy(); + delete g_trex_stateless; + g_trex_stateless = NULL; } diff --git a/src/msg_manager.cpp b/src/msg_manager.cpp index 4db96583..9f41d08c 100755 --- a/src/msg_manager.cpp +++ b/src/msg_manager.cpp @@ -26,7 +26,7 @@ limitations under the License. /*TBD: need to fix socket_id for NUMA */ -bool CMessagingManager::Create(uint8_t num_dp_threads){ +bool CMessagingManager::Create(uint8_t num_dp_threads,std::string a_name){ m_num_dp_threads=num_dp_threads; assert(m_dp_to_cp==0); assert(m_cp_to_dp==0); @@ -38,11 +38,11 @@ bool CMessagingManager::Create(uint8_t num_dp_threads){ char name[100]; lp=getRingCpToDp(i); - sprintf(name,"cp_to_dp_%d",i); + sprintf(name,"%s_to_%d",(char *)a_name.c_str(),i); assert(lp->Create(std::string(name),1024,0)==true); lp=getRingDpToCp(i); - sprintf(name,"dp_to_cp_%d",i); + sprintf(name,"%s_from_%d",(char *)a_name.c_str(),i); assert(lp->Create(std::string(name),1024,0)==true); } @@ -89,7 +89,12 @@ CMsgIns * CMsgIns::Ins(void){ } bool CMsgIns::Create(uint8_t num_threads){ - return ( m_rx_dp.Create(num_threads) ); + + bool res = m_cp_dp.Create(num_threads,"cp_dp"); + if (!res) { + return (res); + } + return (m_rx_dp.Create(num_threads,"rx_dp")); } diff --git a/src/msg_manager.h b/src/msg_manager.h index b25660bb..8958f826 100755 --- a/src/msg_manager.h +++ b/src/msg_manager.h @@ -23,12 +23,20 @@ limitations under the License. #include "CRing.h" +#include <string> /* messages from CP->DP Ids */ -#define NAT_MSG (7) -#define LATENCY_PKT_SEND_MSG (8) +struct CGenNodeMsgBase { + enum { + NAT_FIRST = 7, + LATENCY_PKT = 8, + } msg_types; + +public: + uint8_t m_msg_type; /* msg type */ +}; /* @@ -71,7 +79,7 @@ public: m_dp_to_cp=0; m_num_dp_threads=0; } - bool Create(uint8_t num_dp_threads); + bool Create(uint8_t num_dp_threads,std::string name); void Delete(); CNodeRing * getRingCpToDp(uint8_t thread_id); CNodeRing * getRingDpToCp(uint8_t thread_id); @@ -94,12 +102,18 @@ public: CMessagingManager * getRxDp(){ return (&m_rx_dp); } + CMessagingManager * getCpDp(){ + return (&m_cp_dp); + } + uint8_t get_num_threads(){ return (m_rx_dp.get_num_threads()); } private: CMessagingManager m_rx_dp; + CMessagingManager m_cp_dp; + private: /* one instance */ diff --git a/src/nat_check.h b/src/nat_check.h index b67c523c..a500ddaf 100755 --- a/src/nat_check.h +++ b/src/nat_check.h @@ -59,16 +59,6 @@ struct CNatFlowInfo { this struct should be in the same size of CGenNode beacuse allocator is global . */ -struct CGenNodeMsgBase { - enum { - NAT_FIRST = NAT_MSG, - LATENCY_PKT = LATENCY_PKT_SEND_MSG - } msg_types; - -public: - uint8_t m_msg_type; /* msg type */ -}; - struct CGenNodeNatInfo : public CGenNodeMsgBase { uint8_t m_pad; diff --git a/src/os_time.h b/src/os_time.h index 153ee3e3..0e732abf 100755 --- a/src/os_time.h +++ b/src/os_time.h @@ -22,6 +22,7 @@ limitations under the License. */ #include <stdint.h> +#include <time.h> typedef uint64_t hr_time_t; // time in high res tick typedef uint32_t hr_time_32_t; // time in high res tick @@ -129,6 +130,25 @@ static inline dsec_t now_sec(void){ } +static inline +void delay(int msec){ + + if (msec == 0) + {//user that requested that probebly wanted the minimal delay + //but because of scaling problem he have got 0 so we will give the min delay + //printf("\n\n\nERROR-Task delay ticks == 0 found in task %s task id = %d\n\n\n\n", + // SANB_TaskName(SANB_TaskIdSelf()), SANB_TaskIdSelf()); + msec =1; + + } + + struct timespec time1, remain; // 2 sec max delay + time1.tv_sec=msec/1000; + time1.tv_nsec=(msec - (time1.tv_sec*1000))*1000000; + + nanosleep(&time1,&remain); +} + #endif diff --git a/src/pal/linux/mbuf.h b/src/pal/linux/mbuf.h index 693b095a..35a442bf 100755 --- a/src/pal/linux/mbuf.h +++ b/src/pal/linux/mbuf.h @@ -185,8 +185,9 @@ static inline void utl_rte_pktmbuf_add_last(rte_mbuf_t *m,rte_mbuf_t *m_last){ #define __rte_cache_aligned -#define CACHE_LINE_SIZE 64 +#define CACHE_LINE_SIZE 64 +#define RTE_CACHE_LINE_SIZE 64 #define SOCKET_ID_ANY 0 #endif diff --git a/src/platform_cfg.cpp b/src/platform_cfg.cpp index f0911611..92ffefbd 100755 --- a/src/platform_cfg.cpp +++ b/src/platform_cfg.cpp @@ -300,6 +300,10 @@ void operator >> (const YAML::Node& node, CPlatformYamlInfo & plat_info) { plat_info.m_telnet_exist=true; } + if ( node.FindValue("zmq_rpc_port") ){ + node["zmq_rpc_port"] >> plat_info.m_zmq_rpc_port; + } + if ( node.FindValue("port_bandwidth_gb") ){ node["port_bandwidth_gb"] >> plat_info.m_port_bandwidth_gb; } diff --git a/src/platform_cfg.h b/src/platform_cfg.h index 2f335471..b4b03b10 100755 --- a/src/platform_cfg.h +++ b/src/platform_cfg.h @@ -184,6 +184,7 @@ public: m_telnet_exist=false; m_telnet_port=4502 ; + m_zmq_rpc_port = 5050; m_mac_info_exist=false; m_port_bandwidth_gb = 10; @@ -209,15 +210,17 @@ public: std::string m_limit_memory; uint32_t m_thread_per_dual_if; - uint32_t m_port_bandwidth_gb; + uint32_t m_port_bandwidth_gb; - bool m_enable_zmq_pub_exist; - bool m_enable_zmq_pub; - uint16_t m_zmq_pub_port; + bool m_enable_zmq_pub_exist; + bool m_enable_zmq_pub; + uint16_t m_zmq_pub_port; - bool m_telnet_exist; - uint16_t m_telnet_port; + bool m_telnet_exist; + uint16_t m_telnet_port; + + uint16_t m_zmq_rpc_port; bool m_mac_info_exist; std::vector <CMacYamlInfo> m_mac_info; diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp index ae87d749..b40e996f 100644 --- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp +++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp @@ -145,7 +145,7 @@ trex_rpc_cmd_rc_e TrexRpcCmdGetSysInfo::_run(const Json::Value ¶ms, Json::Value &result) { string hostname; - TrexStateless & instance = TrexStateless::get_instance(); + TrexStateless * main = get_stateless_obj(); Json::Value §ion = result["result"]; @@ -155,21 +155,21 @@ TrexRpcCmdGetSysInfo::_run(const Json::Value ¶ms, Json::Value &result) { section["uptime"] = TrexRpcServer::get_server_uptime(); /* FIXME: core count */ - section["dp_core_count"] = instance.get_dp_core_count(); + section["dp_core_count"] = main->get_dp_core_count(); section["core_type"] = get_cpu_model(); /* ports */ - section["port_count"] = instance.get_port_count(); + section["port_count"] = main->get_port_count(); section["ports"] = Json::arrayValue; - for (int i = 0; i < instance.get_port_count(); i++) { + for (int i = 0; i < main->get_port_count(); i++) { string driver; string speed; - TrexStatelessPort *port = instance.get_port_by_id(i); + TrexStatelessPort *port = main->get_port_by_id(i); port->get_properties(driver, speed); section["ports"][i]["index"] = i; @@ -201,7 +201,7 @@ TrexRpcCmdGetOwner::_run(const Json::Value ¶ms, Json::Value &result) { uint8_t port_id = parse_port(params, result); - TrexStatelessPort *port = TrexStateless::get_instance().get_port_by_id(port_id); + TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id); section["owner"] = port->get_owner(); return (TREX_RPC_CMD_OK); @@ -220,7 +220,7 @@ TrexRpcCmdAcquire::_run(const Json::Value ¶ms, Json::Value &result) { bool force = parse_bool(params, "force", result); /* if not free and not you and not force - fail */ - TrexStatelessPort *port = TrexStateless::get_instance().get_port_by_id(port_id); + TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id); if ( (!port->is_free_to_aquire()) && (port->get_owner() != new_owner) && (!force)) { generate_execute_err(result, "port is already taken by '" + port->get_owner() + "'"); @@ -242,7 +242,7 @@ TrexRpcCmdRelease::_run(const Json::Value ¶ms, Json::Value &result) { uint8_t port_id = parse_port(params, result); - TrexStatelessPort *port = TrexStateless::get_instance().get_port_by_id(port_id); + TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id); if (port->get_state() == TrexStatelessPort::PORT_STATE_TRANSMITTING) { generate_execute_err(result, "cannot release a port during transmission"); @@ -264,7 +264,7 @@ TrexRpcCmdGetPortStats::_run(const Json::Value ¶ms, Json::Value &result) { uint8_t port_id = parse_port(params, result); - TrexStatelessPort *port = TrexStateless::get_instance().get_port_by_id(port_id); + TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id); if (port->get_state() == TrexStatelessPort::PORT_STATE_DOWN) { generate_execute_err(result, "cannot get stats - port is down"); @@ -277,3 +277,42 @@ TrexRpcCmdGetPortStats::_run(const Json::Value ¶ms, Json::Value &result) { return (TREX_RPC_CMD_OK); } +/** + * request the server a sync about a specific user + * + */ +trex_rpc_cmd_rc_e +TrexRpcCmdSyncUser::_run(const Json::Value ¶ms, Json::Value &result) { + + const string &user = parse_string(params, "user", result); + bool sync_streams = parse_bool(params, "sync_streams", result); + + result["result"] = Json::arrayValue; + + for (auto port : get_stateless_obj()->get_port_list()) { + if (port->get_owner() == user) { + + Json::Value owned_port; + + owned_port["port_id"] = port->get_port_id(); + owned_port["handler"] = port->get_owner_handler(); + owned_port["state"] = port->get_state_as_string(); + + /* if sync streams was asked - sync all the streams */ + if (sync_streams) { + owned_port["streams"] = Json::arrayValue; + + std::vector <TrexStream *> streams; + port->get_stream_table()->get_object_list(streams); + + for (auto stream : streams) { + owned_port["streams"].append(stream->get_stream_json()); + } + } + + result["result"].append(owned_port); + } + } + + return (TREX_RPC_CMD_OK); +} diff --git a/src/rpc-server/commands/trex_rpc_cmd_stream.cpp b/src/rpc-server/commands/trex_rpc_cmd_stream.cpp index 20107411..9854cad7 100644 --- a/src/rpc-server/commands/trex_rpc_cmd_stream.cpp +++ b/src/rpc-server/commands/trex_rpc_cmd_stream.cpp @@ -114,7 +114,7 @@ TrexRpcCmdAddStream::_run(const Json::Value ¶ms, Json::Value &result) { /* make sure this is a valid stream to add */ validate_stream(stream, result); - TrexStatelessPort *port = TrexStateless::get_instance().get_port_by_id(stream->m_port_id); + TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(stream->m_port_id); port->get_stream_table()->add_stream(stream); result["result"] = "ACK"; @@ -282,15 +282,15 @@ TrexRpcCmdAddStream::validate_stream(const TrexStream *stream, Json::Value &resu } /* port id should be between 0 and count - 1 */ - if (stream->m_port_id >= TrexStateless::get_instance().get_port_count()) { + if (stream->m_port_id >= get_stateless_obj()->get_port_count()) { std::stringstream ss; - ss << "invalid port id - should be between 0 and " << (int)TrexStateless::get_instance().get_port_count() - 1; + ss << "invalid port id - should be between 0 and " << (int)get_stateless_obj()->get_port_count() - 1; delete stream; generate_execute_err(result, ss.str()); } /* add the stream to the port's stream table */ - TrexStatelessPort * port = TrexStateless::get_instance().get_port_by_id(stream->m_port_id); + TrexStatelessPort * port = get_stateless_obj()->get_port_by_id(stream->m_port_id); /* does such a stream exists ? */ if (port->get_stream_table()->get_stream_by_id(stream->m_stream_id)) { @@ -312,13 +312,13 @@ TrexRpcCmdRemoveStream::_run(const Json::Value ¶ms, Json::Value &result) { uint32_t stream_id = parse_int(params, "stream_id", result); - if (port_id >= TrexStateless::get_instance().get_port_count()) { + if (port_id >= get_stateless_obj()->get_port_count()) { std::stringstream ss; - ss << "invalid port id - should be between 0 and " << (int)TrexStateless::get_instance().get_port_count() - 1; + ss << "invalid port id - should be between 0 and " << (int)get_stateless_obj()->get_port_count() - 1; generate_execute_err(result, ss.str()); } - TrexStatelessPort *port = TrexStateless::get_instance().get_port_by_id(port_id); + TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id); TrexStream *stream = port->get_stream_table()->get_stream_by_id(stream_id); if (!stream) { @@ -344,13 +344,13 @@ trex_rpc_cmd_rc_e TrexRpcCmdRemoveAllStreams::_run(const Json::Value ¶ms, Json::Value &result) { uint8_t port_id = parse_byte(params, "port_id", result); - if (port_id >= TrexStateless::get_instance().get_port_count()) { + if (port_id >= get_stateless_obj()->get_port_count()) { std::stringstream ss; - ss << "invalid port id - should be between 0 and " << (int)TrexStateless::get_instance().get_port_count() - 1; + ss << "invalid port id - should be between 0 and " << (int)get_stateless_obj()->get_port_count() - 1; generate_execute_err(result, ss.str()); } - TrexStatelessPort *port = TrexStateless::get_instance().get_port_by_id(port_id); + TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id); port->get_stream_table()->remove_and_delete_all_streams(); result["result"] = "ACK"; @@ -369,15 +369,15 @@ TrexRpcCmdGetStreamList::_run(const Json::Value ¶ms, Json::Value &result) { uint8_t port_id = parse_byte(params, "port_id", result); - if (port_id >= TrexStateless::get_instance().get_port_count()) { + if (port_id >= get_stateless_obj()->get_port_count()) { std::stringstream ss; - ss << "invalid port id - should be between 0 and " << (int)TrexStateless::get_instance().get_port_count() - 1; + ss << "invalid port id - should be between 0 and " << (int)get_stateless_obj()->get_port_count() - 1; generate_execute_err(result, ss.str()); } - TrexStatelessPort *port = TrexStateless::get_instance().get_port_by_id(port_id); + TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id); - port->get_stream_table()->get_stream_list(stream_list); + port->get_stream_table()->get_id_list(stream_list); Json::Value json_list = Json::arrayValue; @@ -401,13 +401,13 @@ TrexRpcCmdGetStream::_run(const Json::Value ¶ms, Json::Value &result) { uint32_t stream_id = parse_int(params, "stream_id", result); - if (port_id >= TrexStateless::get_instance().get_port_count()) { + if (port_id >= get_stateless_obj()->get_port_count()) { std::stringstream ss; - ss << "invalid port id - should be between 0 and " << (int)TrexStateless::get_instance().get_port_count() - 1; + ss << "invalid port id - should be between 0 and " << (int)get_stateless_obj()->get_port_count() - 1; generate_execute_err(result, ss.str()); } - TrexStatelessPort *port = TrexStateless::get_instance().get_port_by_id(port_id); + TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id); TrexStream *stream = port->get_stream_table()->get_stream_by_id(stream_id); @@ -432,16 +432,17 @@ trex_rpc_cmd_rc_e TrexRpcCmdStartTraffic::_run(const Json::Value ¶ms, Json::Value &result) { uint8_t port_id = parse_byte(params, "port_id", result); + double mul = parse_double(params, "mul", result); - if (port_id >= TrexStateless::get_instance().get_port_count()) { + if (port_id >= get_stateless_obj()->get_port_count()) { std::stringstream ss; - ss << "invalid port id - should be between 0 and " << (int)TrexStateless::get_instance().get_port_count() - 1; + ss << "invalid port id - should be between 0 and " << (int)get_stateless_obj()->get_port_count() - 1; generate_execute_err(result, ss.str()); } - TrexStatelessPort *port = TrexStateless::get_instance().get_port_by_id(port_id); + TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id); - TrexStatelessPort::rc_e rc = port->start_traffic(); + TrexStatelessPort::rc_e rc = port->start_traffic(mul); if (rc == TrexStatelessPort::RC_OK) { result["result"] = "ACK"; @@ -462,7 +463,7 @@ TrexRpcCmdStartTraffic::_run(const Json::Value ¶ms, Json::Value &result) { generate_execute_err(result, ss.str()); } - return (TREX_RPC_CMD_OK); + return (TREX_RPC_CMD_OK); } /*************************** @@ -473,13 +474,13 @@ trex_rpc_cmd_rc_e TrexRpcCmdStopTraffic::_run(const Json::Value ¶ms, Json::Value &result) { uint8_t port_id = parse_byte(params, "port_id", result); - if (port_id >= TrexStateless::get_instance().get_port_count()) { + if (port_id >= get_stateless_obj()->get_port_count()) { std::stringstream ss; - ss << "invalid port id - should be between 0 and " << (int)TrexStateless::get_instance().get_port_count() - 1; + ss << "invalid port id - should be between 0 and " << (int)get_stateless_obj()->get_port_count() - 1; generate_execute_err(result, ss.str()); } - TrexStatelessPort *port = TrexStateless::get_instance().get_port_by_id(port_id); + TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id); port->stop_traffic(); result["result"] = "ACK"; diff --git a/src/rpc-server/commands/trex_rpc_cmds.h b/src/rpc-server/commands/trex_rpc_cmds.h index 5926a8d8..91c29548 100644 --- a/src/rpc-server/commands/trex_rpc_cmds.h +++ b/src/rpc-server/commands/trex_rpc_cmds.h @@ -102,8 +102,10 @@ TREX_RPC_CMD_DEFINE(TrexRpcCmdGetStreamList, "get_stream_list", 1, true); TREX_RPC_CMD_DEFINE(TrexRpcCmdGetStream, "get_stream", 2, true); -TREX_RPC_CMD_DEFINE(TrexRpcCmdStartTraffic, "start_traffic", 1, true); +TREX_RPC_CMD_DEFINE(TrexRpcCmdStartTraffic, "start_traffic", 2, true); TREX_RPC_CMD_DEFINE(TrexRpcCmdStopTraffic, "stop_traffic", 1, true); +TREX_RPC_CMD_DEFINE(TrexRpcCmdSyncUser, "sync_user", 2, false); + #endif /* __TREX_RPC_CMD_H__ */ diff --git a/src/rpc-server/trex_rpc_async_server.cpp b/src/rpc-server/trex_rpc_async_server.cpp index f4d21f2f..46fe499b 100644 --- a/src/rpc-server/trex_rpc_async_server.cpp +++ b/src/rpc-server/trex_rpc_async_server.cpp @@ -79,7 +79,7 @@ TrexRpcServerAsync::_rpc_thread_cb() { } /* trigger a full update for stats */ - TrexStateless::get_instance().update_stats(); + //get_stateless_obj()->update_stats(); /* done with the lock */ if (m_lock) { @@ -87,7 +87,7 @@ TrexRpcServerAsync::_rpc_thread_cb() { } /* encode them to JSON */ - TrexStateless::get_instance().encode_stats(snapshot); + get_stateless_obj()->encode_stats(snapshot); /* write to string and publish */ std::string snapshot_str = writer.write(snapshot); diff --git a/src/rpc-server/trex_rpc_cmd.cpp b/src/rpc-server/trex_rpc_cmd.cpp index 920a8d30..af0db3f4 100644 --- a/src/rpc-server/trex_rpc_cmd.cpp +++ b/src/rpc-server/trex_rpc_cmd.cpp @@ -61,7 +61,7 @@ TrexRpcCommand::verify_ownership(const Json::Value ¶ms, Json::Value &result) std::string handler = parse_string(params, "handler", result); uint8_t port_id = parse_port(params, result); - TrexStatelessPort *port = TrexStateless::get_instance().get_port_by_id(port_id); + TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id); if (!port->verify_owner_handler(handler)) { generate_execute_err(result, "invalid handler provided. please pass the handler given when calling 'acquire' or take ownership"); @@ -78,9 +78,9 @@ TrexRpcCommand::parse_port(const Json::Value ¶ms, Json::Value &result) { void TrexRpcCommand::validate_port_id(uint8_t port_id, Json::Value &result) { - if (port_id >= TrexStateless::get_instance().get_port_count()) { + if (port_id >= get_stateless_obj()->get_port_count()) { std::stringstream ss; - ss << "invalid port id - should be between 0 and " << (int)TrexStateless::get_instance().get_port_count() - 1; + ss << "invalid port id - should be between 0 and " << (int)get_stateless_obj()->get_port_count() - 1; generate_execute_err(result, ss.str()); } } diff --git a/src/rpc-server/trex_rpc_cmds_table.cpp b/src/rpc-server/trex_rpc_cmds_table.cpp index c1c546f3..46281aff 100644 --- a/src/rpc-server/trex_rpc_cmds_table.cpp +++ b/src/rpc-server/trex_rpc_cmds_table.cpp @@ -42,6 +42,8 @@ TrexRpcCommandsTable::TrexRpcCommandsTable() { register_command(new TrexRpcCmdRelease()); register_command(new TrexRpcCmdGetPortStats()); + register_command(new TrexRpcCmdSyncUser()); + /* stream commands */ register_command(new TrexRpcCmdAddStream()); register_command(new TrexRpcCmdRemoveStream()); diff --git a/src/stateless/cp/trex_stateless.cpp b/src/stateless/cp/trex_stateless.cpp index 72762e26..e0e95450 100644 --- a/src/stateless/cp/trex_stateless.cpp +++ b/src/stateless/cp/trex_stateless.cpp @@ -31,55 +31,58 @@ using namespace std; * Trex stateless object * **********************************************************/ -TrexStateless::TrexStateless() { - m_is_configured = false; -} - /** - * configure the singleton stateless object * */ -void TrexStateless::configure(const TrexStatelessCfg &cfg) { - - TrexStateless& instance = get_instance_internal(); - - /* check status */ - if (instance.m_is_configured) { - throw TrexException("re-configuration of stateless object is not allowed"); - } +TrexStateless::TrexStateless(const TrexStatelessCfg &cfg) { /* create RPC servers */ /* set both servers to mutex each other */ - instance.m_rpc_server = new TrexRpcServer(cfg.m_rpc_req_resp_cfg, cfg.m_rpc_async_cfg, &instance.m_global_cp_lock); - instance.m_rpc_server->set_verbose(cfg.m_rpc_server_verbose); + m_rpc_server = new TrexRpcServer(cfg.m_rpc_req_resp_cfg, cfg.m_rpc_async_cfg, &m_global_cp_lock); + m_rpc_server->set_verbose(cfg.m_rpc_server_verbose); /* configure ports */ + m_port_count = cfg.m_port_count; - instance.m_port_count = cfg.m_port_count; - - for (int i = 0; i < instance.m_port_count; i++) { - instance.m_ports.push_back(new TrexStatelessPort(i)); + for (int i = 0; i < m_port_count; i++) { + m_ports.push_back(new TrexStatelessPort(i)); } - /* cores */ - instance.m_dp_core_count = cfg.m_dp_core_count; - for (int i = 0; i < instance.m_dp_core_count; i++) { - instance.m_dp_cores.push_back(new TrexStatelessDpCore(i)); + m_platform_api = cfg.m_platform_api; +} + +/** + * release all memory + * + * @author imarom (08-Oct-15) + */ +TrexStateless::~TrexStateless() { + + /* release memory for ports */ + for (auto port : m_ports) { + delete port; } + m_ports.clear(); + + /* stops the RPC server */ + m_rpc_server->stop(); + delete m_rpc_server; + + m_rpc_server = NULL; - /* done */ - instance.m_is_configured = true; + delete m_platform_api; + m_platform_api = NULL; } + /** * starts the control plane side * */ void TrexStateless::launch_control_plane() { - //std::cout << "\n on control/master core \n"; /* pin this process to the current running CPU any new thread will be called on the same CPU @@ -94,39 +97,6 @@ TrexStateless::launch_control_plane() { m_rpc_server->start(); } -void -TrexStateless::launch_on_dp_core(uint8_t core_id) { - m_dp_cores[core_id - 1]->run(); -} - -/** - * destroy the singleton and release all memory - * - * @author imarom (08-Oct-15) - */ -void -TrexStateless::destroy() { - TrexStateless& instance = get_instance_internal(); - - if (!instance.m_is_configured) { - return; - } - - /* release memory for ports */ - for (auto port : instance.m_ports) { - delete port; - } - instance.m_ports.clear(); - - /* stops the RPC server */ - instance.m_rpc_server->stop(); - delete instance.m_rpc_server; - - instance.m_rpc_server = NULL; - - /* done */ - instance.m_is_configured = false; -} /** * fetch a port by ID @@ -148,57 +118,32 @@ TrexStateless::get_port_count() { uint8_t TrexStateless::get_dp_core_count() { - return m_dp_core_count; -} - -void -TrexStateless::update_stats() { - - /* update CPU util. - TODO - */ - m_stats.m_stats.m_cpu_util = 0; - - /* for every port update and accumulate */ - for (uint8_t i = 0; i < m_port_count; i++) { - m_ports[i]->update_stats(); - - const TrexPortStats & port_stats = m_ports[i]->get_stats(); - - m_stats.m_stats.m_tx_bps += port_stats.m_stats.m_tx_bps; - m_stats.m_stats.m_rx_bps += port_stats.m_stats.m_rx_bps; - - m_stats.m_stats.m_tx_pps += port_stats.m_stats.m_tx_pps; - m_stats.m_stats.m_rx_pps += port_stats.m_stats.m_rx_pps; - - m_stats.m_stats.m_total_tx_pkts += port_stats.m_stats.m_total_tx_pkts; - m_stats.m_stats.m_total_rx_pkts += port_stats.m_stats.m_total_rx_pkts; - - m_stats.m_stats.m_total_tx_bytes += port_stats.m_stats.m_total_tx_bytes; - m_stats.m_stats.m_total_rx_bytes += port_stats.m_stats.m_total_rx_bytes; - - m_stats.m_stats.m_tx_rx_errors += port_stats.m_stats.m_tx_rx_errors; - } + return m_platform_api->get_dp_core_count(); } void TrexStateless::encode_stats(Json::Value &global) { - global["cpu_util"] = m_stats.m_stats.m_cpu_util; + const TrexPlatformApi *api = get_stateless_obj()->get_platform_api(); + + TrexPlatformGlobalStats stats; + api->get_global_stats(stats); - global["tx_bps"] = m_stats.m_stats.m_tx_bps; - global["rx_bps"] = m_stats.m_stats.m_rx_bps; + global["cpu_util"] = stats.m_stats.m_cpu_util; - global["tx_pps"] = m_stats.m_stats.m_tx_pps; - global["rx_pps"] = m_stats.m_stats.m_rx_pps; + global["tx_bps"] = stats.m_stats.m_tx_bps; + global["rx_bps"] = stats.m_stats.m_rx_bps; - global["total_tx_pkts"] = Json::Value::UInt64(m_stats.m_stats.m_total_tx_pkts); - global["total_rx_pkts"] = Json::Value::UInt64(m_stats.m_stats.m_total_rx_pkts); + global["tx_pps"] = stats.m_stats.m_tx_pps; + global["rx_pps"] = stats.m_stats.m_rx_pps; - global["total_tx_bytes"] = Json::Value::UInt64(m_stats.m_stats.m_total_tx_bytes); - global["total_rx_bytes"] = Json::Value::UInt64(m_stats.m_stats.m_total_rx_bytes); + global["total_tx_pkts"] = Json::Value::UInt64(stats.m_stats.m_total_tx_pkts); + global["total_rx_pkts"] = Json::Value::UInt64(stats.m_stats.m_total_rx_pkts); - global["tx_rx_errors"] = Json::Value::UInt64(m_stats.m_stats.m_tx_rx_errors); + global["total_tx_bytes"] = Json::Value::UInt64(stats.m_stats.m_total_tx_bytes); + global["total_rx_bytes"] = Json::Value::UInt64(stats.m_stats.m_total_rx_bytes); + + global["tx_rx_errors"] = Json::Value::UInt64(stats.m_stats.m_tx_rx_errors); for (uint8_t i = 0; i < m_port_count; i++) { std::stringstream ss; @@ -210,3 +155,20 @@ TrexStateless::encode_stats(Json::Value &global) { } } +/** + * generate a snapshot for publish (async publish) + * + */ +void +TrexStateless::generate_publish_snapshot(std::string &snapshot) { + Json::FastWriter writer; + Json::Value root; + + root["name"] = "trex-stateless-info"; + root["type"] = 0; + + /* stateless specific info goes here */ + root["data"] = Json::nullValue; + + snapshot = writer.write(root); +} diff --git a/src/stateless/cp/trex_stateless.h b/src/stateless/cp/trex_stateless.h index 649b25dd..57c6ef1d 100644 --- a/src/stateless/cp/trex_stateless.h +++ b/src/stateless/cp/trex_stateless.h @@ -29,9 +29,10 @@ limitations under the License. #include <trex_stream.h> #include <trex_stateless_port.h> -#include <trex_stateless_dp_core.h> #include <trex_rpc_server_api.h> +#include <internal_api/trex_platform_api.h> + /** * generic exception for errors * TODO: move this to a better place @@ -88,17 +89,17 @@ public: /* default values */ TrexStatelessCfg() { m_port_count = 0; - m_dp_core_count = 0; m_rpc_req_resp_cfg = NULL; m_rpc_async_cfg = NULL; - m_rpc_server_verbose = false; + m_rpc_server_verbose = false; + m_platform_api = NULL; } const TrexRpcServerConfig *m_rpc_req_resp_cfg; const TrexRpcServerConfig *m_rpc_async_cfg; + const TrexPlatformApi *m_platform_api; bool m_rpc_server_verbose; uint8_t m_port_count; - uint8_t m_dp_core_count; }; /** @@ -113,27 +114,8 @@ public: * reconfiguration is not allowed * an exception will be thrown */ - static void configure(const TrexStatelessCfg &cfg); - - /** - * destroy the instance - * - */ - static void destroy(); - - /** - * singleton public get instance - * - */ - static TrexStateless& get_instance() { - TrexStateless& instance = get_instance_internal(); - - if (!instance.m_is_configured) { - throw TrexException("object is not configured"); - } - - return instance; - } + TrexStateless(const TrexStatelessCfg &cfg); + ~TrexStateless(); /** * starts the control plane side @@ -152,12 +134,6 @@ public: uint8_t get_dp_core_count(); - /** - * update all the stats (deep update) - * (include all the ports and global stats) - * - */ - void update_stats(); /** * fetch all the stats @@ -165,22 +141,25 @@ public: */ void encode_stats(Json::Value &global); + /** + * generate a snapshot for publish + */ + void generate_publish_snapshot(std::string &snapshot); -protected: - TrexStateless(); + const TrexPlatformApi * get_platform_api() { + return (m_platform_api); + } - static TrexStateless& get_instance_internal () { - static TrexStateless instance; - return instance; + const std::vector <TrexStatelessPort *> get_port_list() { + return m_ports; } - /* c++ 2011 style singleton */ +protected: + + /* no copy or assignment */ TrexStateless(TrexStateless const&) = delete; void operator=(TrexStateless const&) = delete; - /* status */ - bool m_is_configured; - /* RPC server array */ TrexRpcServer *m_rpc_server; @@ -188,15 +167,20 @@ protected: std::vector <TrexStatelessPort *> m_ports; uint8_t m_port_count; - /* cores */ - std::vector <TrexStatelessDpCore *> m_dp_cores; - uint8_t m_dp_core_count; - - /* stats */ - TrexStatelessStats m_stats; + /* platform API */ + const TrexPlatformApi *m_platform_api; std::mutex m_global_cp_lock; }; +/** + * an anchor function + * + * @author imarom (25-Oct-15) + * + * @return TrexStateless& + */ +TrexStateless * get_stateless_obj(); + #endif /* __TREX_STATELESS_H__ */ diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp index a31847a5..a0b57b63 100644 --- a/src/stateless/cp/trex_stateless_port.cpp +++ b/src/stateless/cp/trex_stateless_port.cpp @@ -18,8 +18,12 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ + #include <trex_stateless.h> #include <trex_stateless_port.h> +#include <trex_stateless_messaging.h> +#include <trex_streams_compiler.h> + #include <string> #ifndef TREX_RPC_MOCK_SERVER @@ -49,7 +53,7 @@ TrexStatelessPort::TrexStatelessPort(uint8_t port_id) : m_port_id(port_id) { * */ TrexStatelessPort::rc_e -TrexStatelessPort::start_traffic(void) { +TrexStatelessPort::start_traffic(double mul) { if (m_port_state != PORT_STATE_UP_IDLE) { return (RC_ERR_BAD_STATE_FOR_OP); @@ -59,19 +63,52 @@ TrexStatelessPort::start_traffic(void) { return (RC_ERR_NO_STREAMS); } + /* fetch all the streams from the table */ + vector<TrexStream *> streams; + get_stream_table()->get_object_list(streams); + + /* compiler it */ + TrexStreamsCompiler compiler; + TrexStreamsCompiledObj *compiled_obj = new TrexStreamsCompiledObj(m_port_id, mul); + + bool rc = compiler.compile(streams, *compiled_obj); + if (!rc) { + return (RC_ERR_FAILED_TO_COMPILE_STREAMS); + } + + /* generate a message to all the relevant DP cores to start transmitting */ + TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(compiled_obj); + + // FIXME (add the right core list) + CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingCpToDp(0); + + ring->Enqueue((CGenNode *)start_msg); + + /* move the state to transmiting */ m_port_state = PORT_STATE_TRANSMITTING; - /* real code goes here */ return (RC_OK); } -void +TrexStatelessPort::rc_e TrexStatelessPort::stop_traffic(void) { /* real code goes here */ - if (m_port_state == PORT_STATE_TRANSMITTING) { - m_port_state = PORT_STATE_UP_IDLE; + if (m_port_state != PORT_STATE_TRANSMITTING) { + return (RC_ERR_BAD_STATE_FOR_OP); } + + /* generate a message to all the relevant DP cores to start transmitting */ + TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpStop(m_port_id); + + // FIXME (add the right core list) + CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingCpToDp(0); + + ring->Enqueue((CGenNode *)stop_msg); + + m_port_state = PORT_STATE_UP_IDLE; + + return (RC_OK); } /** @@ -130,99 +167,27 @@ TrexStatelessPort::generate_handler() { return (ss.str()); } -/** - * update stats for the port - * - */ -void -TrexStatelessPort::update_stats() { - struct rte_eth_stats stats; - rte_eth_stats_get(m_port_id, &stats); - - /* copy straight values */ - m_stats.m_stats.m_total_tx_bytes = stats.obytes; - m_stats.m_stats.m_total_rx_bytes = stats.ibytes; - - m_stats.m_stats.m_total_tx_pkts = stats.opackets; - m_stats.m_stats.m_total_rx_pkts = stats.ipackets; - - /* calculate stats */ - m_stats.m_stats.m_tx_bps = m_stats.m_bw_tx_bps.add(stats.obytes); - m_stats.m_stats.m_rx_bps = m_stats.m_bw_rx_bps.add(stats.ibytes); - - m_stats.m_stats.m_tx_pps = m_stats.m_bw_tx_pps.add(stats.opackets); - m_stats.m_stats.m_rx_pps = m_stats.m_bw_rx_pps.add(stats.ipackets); - -} - -const TrexPortStats & -TrexStatelessPort::get_stats() { - return m_stats; -} void TrexStatelessPort::encode_stats(Json::Value &port) { - port["tx_bps"] = m_stats.m_stats.m_tx_bps; - port["rx_bps"] = m_stats.m_stats.m_rx_bps; - - port["tx_pps"] = m_stats.m_stats.m_tx_pps; - port["rx_pps"] = m_stats.m_stats.m_rx_pps; - - port["total_tx_pkts"] = Json::Value::UInt64(m_stats.m_stats.m_total_tx_pkts); - port["total_rx_pkts"] = Json::Value::UInt64(m_stats.m_stats.m_total_rx_pkts); - - port["total_tx_bytes"] = Json::Value::UInt64(m_stats.m_stats.m_total_tx_bytes); - port["total_rx_bytes"] = Json::Value::UInt64(m_stats.m_stats.m_total_rx_bytes); - - port["tx_rx_errors"] = Json::Value::UInt64(m_stats.m_stats.m_tx_rx_errors); -} - + const TrexPlatformApi *api = get_stateless_obj()->get_platform_api(); + TrexPlatformInterfaceStats stats; + api->get_interface_stats(m_port_id, stats); -/*************************** - * BW measurement - * - **************************/ -/* TODO: move this to a common place */ -BWMeasure::BWMeasure() { - reset(); -} - -void BWMeasure::reset(void) { - m_start=false; - m_last_time_msec=0; - m_last_bytes=0; - m_last_result=0.0; -}; - -double BWMeasure::calc_MBsec(uint32_t dtime_msec, - uint64_t dbytes){ - double rate=0.000008*( ( (double)dbytes*(double)os_get_time_freq())/((double)dtime_msec) ); - return(rate); -} + port["tx_bps"] = stats.m_stats.m_tx_bps; + port["rx_bps"] = stats.m_stats.m_rx_bps; -double BWMeasure::add(uint64_t size) { - if ( false == m_start ) { - m_start=true; - m_last_time_msec = os_get_time_msec() ; - m_last_bytes=size; - return(0.0); - } - - uint32_t ctime=os_get_time_msec(); - if ((ctime - m_last_time_msec) <os_get_time_freq() ) { - return(m_last_result); - } + port["tx_pps"] = stats.m_stats.m_tx_pps; + port["rx_pps"] = stats.m_stats.m_rx_pps; - uint32_t dtime_msec = ctime-m_last_time_msec; - uint64_t dbytes = size - m_last_bytes; + port["total_tx_pkts"] = Json::Value::UInt64(stats.m_stats.m_total_tx_pkts); + port["total_rx_pkts"] = Json::Value::UInt64(stats.m_stats.m_total_rx_pkts); - m_last_time_msec = ctime; - m_last_bytes = size; - - m_last_result= 0.5*calc_MBsec(dtime_msec,dbytes) +0.5*(m_last_result); - return( m_last_result ); + port["total_tx_bytes"] = Json::Value::UInt64(stats.m_stats.m_total_tx_bytes); + port["total_rx_bytes"] = Json::Value::UInt64(stats.m_stats.m_total_rx_bytes); + + port["tx_rx_errors"] = Json::Value::UInt64(stats.m_stats.m_tx_rx_errors); } - diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h index 428d5aee..3e071954 100644 --- a/src/stateless/cp/trex_stateless_port.h +++ b/src/stateless/cp/trex_stateless_port.h @@ -24,71 +24,6 @@ limitations under the License. #include <trex_stream.h> /** - * bandwidth measurement class - * - */ -class BWMeasure { -public: - BWMeasure(); - void reset(void); - double add(uint64_t size); - -private: - double calc_MBsec(uint32_t dtime_msec, - uint64_t dbytes); - -public: - bool m_start; - uint32_t m_last_time_msec; - uint64_t m_last_bytes; - double m_last_result; -}; - -/** - * TRex stateless port stats - * - * @author imarom (24-Sep-15) - */ -class TrexPortStats { - -public: - TrexPortStats() { - m_stats = {0}; - - m_bw_tx_bps.reset(); - m_bw_rx_bps.reset(); - - m_bw_tx_pps.reset(); - m_bw_rx_pps.reset(); - } - -public: - - BWMeasure m_bw_tx_bps; - BWMeasure m_bw_rx_bps; - - BWMeasure m_bw_tx_pps; - BWMeasure m_bw_rx_pps; - - struct { - - double m_tx_bps; - double m_rx_bps; - - double m_tx_pps; - double m_rx_pps; - - uint64_t m_total_tx_pkts; - uint64_t m_total_rx_pkts; - - uint64_t m_total_tx_bytes; - uint64_t m_total_rx_bytes; - - uint64_t m_tx_rx_errors; - } m_stats; -}; - -/** * describes a stateless port * * @author imarom (31-Aug-15) @@ -121,13 +56,13 @@ public: * start traffic * */ - rc_e start_traffic(void); + rc_e start_traffic(double mul); /** * stop traffic * */ - void stop_traffic(void); + rc_e stop_traffic(void); /** * access the stream table @@ -203,18 +138,14 @@ public: } /** - * update the values of the stats - * - */ - void update_stats(); - - const TrexPortStats & get_stats(); - - /** * encode stats as JSON */ void encode_stats(Json::Value &port); + uint8_t get_port_id() { + return m_port_id; + } + private: std::string generate_handler(); @@ -224,7 +155,6 @@ private: port_state_e m_port_state; std::string m_owner; std::string m_owner_handler; - TrexPortStats m_stats; }; #endif /* __TREX_STATELESS_PORT_H__ */ diff --git a/src/stateless/cp/trex_stream.cpp b/src/stateless/cp/trex_stream.cpp index 182036f1..ba306137 100644 --- a/src/stateless/cp/trex_stream.cpp +++ b/src/stateless/cp/trex_stream.cpp @@ -20,6 +20,7 @@ limitations under the License. */ #include <trex_stream.h> #include <cstddef> +#include <string.h> /************************************** * stream @@ -103,14 +104,24 @@ TrexStream * TrexStreamTable::get_stream_by_id(uint32_t stream_id) { } } -void TrexStreamTable::get_stream_list(std::vector<uint32_t> &stream_list) { - stream_list.clear(); +void TrexStreamTable::get_id_list(std::vector<uint32_t> &id_list) { + id_list.clear(); for (auto stream : m_stream_table) { - stream_list.push_back(stream.first); + id_list.push_back(stream.first); } } +void TrexStreamTable::get_object_list(std::vector<TrexStream *> &object_list) { + object_list.clear(); + + for (auto stream : m_stream_table) { + object_list.push_back(stream.second); + } + +} + int TrexStreamTable::size() { return m_stream_table.size(); } + diff --git a/src/stateless/cp/trex_stream.h b/src/stateless/cp/trex_stream.h index f5bc96ef..c8a15240 100644 --- a/src/stateless/cp/trex_stream.h +++ b/src/stateless/cp/trex_stream.h @@ -37,10 +37,6 @@ class TrexRpcCmdAddStream; * */ class TrexStream { - /* provide the RPC parser a way to access private fields */ - friend class TrexRpcCmdAddStream; - friend class TrexRpcCmdGetStream; - friend class TrexStreamTable; public: TrexStream(uint8_t port_id, uint32_t stream_id); @@ -56,7 +52,7 @@ public: /* access the stream json */ const Json::Value & get_stream_json(); -protected: +public: /* basic */ uint8_t m_port_id; uint32_t m_stream_id; @@ -189,7 +185,13 @@ public: * * @param stream_list */ - void get_stream_list(std::vector<uint32_t> &stream_list); + void get_id_list(std::vector<uint32_t> &id_list); + + /** + * populate a list with all the stream objects + * + */ + void get_object_list(std::vector<TrexStream *> &object_list); /** * get the table size @@ -197,6 +199,9 @@ public: */ int size(); + std::unordered_map<int, TrexStream *>::iterator begin() {return m_stream_table.begin();} + std::unordered_map<int, TrexStream *>::iterator end() {return m_stream_table.end();} + private: /** * holds all the stream in a hash table by stream id diff --git a/src/stateless/cp/trex_streams_compiler.cpp b/src/stateless/cp/trex_streams_compiler.cpp new file mode 100644 index 00000000..5e2602ec --- /dev/null +++ b/src/stateless/cp/trex_streams_compiler.cpp @@ -0,0 +1,85 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2015 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#include <string.h> +#include <trex_streams_compiler.h> +#include <trex_stream.h> + +/************************************** + * stream compiled object + *************************************/ +TrexStreamsCompiledObj::TrexStreamsCompiledObj(uint8_t port_id, double mul) : m_port_id(port_id), m_mul(mul) { +} + +TrexStreamsCompiledObj::~TrexStreamsCompiledObj() { + for (auto &obj : m_objs) { + delete obj.m_pkt; + } + m_objs.clear(); +} + +void +TrexStreamsCompiledObj::add_compiled_stream(double pps, uint8_t *pkt, uint16_t pkt_len) { + obj_st obj; + + obj.m_port_id = m_port_id; + obj.m_pps = pps * m_mul; + obj.m_pkt_len = pkt_len; + + obj.m_pkt = new uint8_t[pkt_len]; + memcpy(obj.m_pkt, pkt, pkt_len); + + m_objs.push_back(obj); +} + +/************************************** + * stream compiler + *************************************/ +bool +TrexStreamsCompiler::compile(const std::vector<TrexStream *> &streams, TrexStreamsCompiledObj &obj) { + /* for now we do something trivial, */ + for (auto stream : streams) { + + /* skip non-enabled streams */ + if (!stream->m_enabled) { + continue; + } + + /* for now skip also non self started streams */ + if (!stream->m_self_start) { + continue; + } + + /* for now support only continous ... */ + TrexStreamContinuous *cont_stream = dynamic_cast<TrexStreamContinuous *>(stream); + if (!cont_stream) { + continue; + } + + /* add it */ + obj.add_compiled_stream(cont_stream->get_pps(), + cont_stream->m_pkt.binary, + cont_stream->m_pkt.len); + } + + return true; +} + diff --git a/src/stateless/cp/trex_streams_compiler.h b/src/stateless/cp/trex_streams_compiler.h new file mode 100644 index 00000000..06f992ed --- /dev/null +++ b/src/stateless/cp/trex_streams_compiler.h @@ -0,0 +1,72 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2015 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +#ifndef __TREX_STREAMS_COMPILER_H__ +#define __TREX_STREAMS_COMPILER_H__ + +#include <stdint.h> +#include <vector> + +class TrexStreamsCompiler; +class TrexStream; + +/** + * compiled object for a table of streams + * + * @author imarom (28-Oct-15) + */ +class TrexStreamsCompiledObj { + friend class TrexStreamsCompiler; +public: + + TrexStreamsCompiledObj(uint8_t port_id, double m_mul); + ~TrexStreamsCompiledObj(); + + struct obj_st { + double m_pps; + uint8_t *m_pkt; + uint16_t m_pkt_len; + uint8_t m_port_id; + }; + + const std::vector<obj_st> & get_objects() { + return m_objs; + } + +private: + void add_compiled_stream(double pps, uint8_t *pkt, uint16_t pkt_len); + std::vector<obj_st> m_objs; + + uint8_t m_port_id; + double m_mul; +}; + +class TrexStreamsCompiler { +public: + /** + * compiles a vector of streams to an object passable to the DP + * + * @author imarom (28-Oct-15) + * + */ + bool compile(const std::vector<TrexStream *> &streams, TrexStreamsCompiledObj &obj); +}; + +#endif /* __TREX_STREAMS_COMPILER_H__ */ diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp index 3755b82c..306b23d0 100644 --- a/src/stateless/dp/trex_stateless_dp_core.cpp +++ b/src/stateless/dp/trex_stateless_dp_core.cpp @@ -18,118 +18,165 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ - #include <trex_stateless_dp_core.h> -#include <stdio.h> -#include <unistd.h> -#include <trex_stateless.h> +#include <trex_stateless_messaging.h> +#include <trex_streams_compiler.h> +#include <trex_stream_node.h> #include <bp_sim.h> -#ifndef TREX_RPC_MOCK_SERVER +TrexStatelessDpCore::TrexStatelessDpCore(uint8_t thread_id, CFlowGenListPerThread *core) { + m_thread_id = thread_id; + m_core = core; + + CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp(); -// DPDK c++ issue -#define UINT8_MAX 255 -#define UINT16_MAX 0xFFFF -// DPDK c++ issue -#endif + m_ring_from_cp = cp_dp->getRingCpToDp(thread_id); + m_ring_to_cp = cp_dp->getRingDpToCp(thread_id); -#include <rte_ethdev.h> -#include "mbuf.h" + m_state = STATE_IDLE; +} /** - * TEST + * in idle state loop, the processor most of the time sleeps + * and periodically checks for messages * + * @author imarom (01-Nov-15) */ -static const uint8_t udp_pkt[]={ - 0x00,0x00,0x00,0x01,0x00,0x00, - 0x00,0x00,0x00,0x01,0x00,0x00, - 0x08,0x00, - - 0x45,0x00,0x00,0x81, - 0xaf,0x7e,0x00,0x00, - 0x12,0x11,0xd9,0x23, - 0x01,0x01,0x01,0x01, - 0x3d,0xad,0x72,0x1b, - - 0x11,0x11, - 0x11,0x11, - - 0x00,0x6d, - 0x00,0x00, - - 0x64,0x31,0x3a,0x61, - 0x64,0x32,0x3a,0x69,0x64, - 0x32,0x30,0x3a,0xd0,0x0e, - 0xa1,0x4b,0x7b,0xbd,0xbd, - 0x16,0xc6,0xdb,0xc4,0xbb,0x43, - 0xf9,0x4b,0x51,0x68,0x33,0x72, - 0x20,0x39,0x3a,0x69,0x6e,0x66,0x6f, - 0x5f,0x68,0x61,0x73,0x68,0x32,0x30,0x3a,0xee,0xc6,0xa3, - 0xd3,0x13,0xa8,0x43,0x06,0x03,0xd8,0x9e,0x3f,0x67,0x6f, - 0xe7,0x0a,0xfd,0x18,0x13,0x8d,0x65,0x31,0x3a,0x71,0x39, - 0x3a,0x67,0x65,0x74,0x5f,0x70,0x65,0x65,0x72,0x73,0x31, - 0x3a,0x74,0x38,0x3a,0x3d,0xeb,0x0c,0xbf,0x0d,0x6a,0x0d, - 0xa5,0x31,0x3a,0x79,0x31,0x3a,0x71,0x65,0x87,0xa6,0x7d, - 0xe7 -}; - -static int -test_inject_pkt(uint8_t *pkt, uint32_t pkt_size) { - - #ifndef TREX_RPC_MOCK_SERVER - rte_mempool_t * mp= CGlobalInfo::m_mem_pool[0].m_big_mbuf_pool ; - #else - rte_mempool_t * mp = NULL; - #endif - - rte_mbuf_t *m = rte_pktmbuf_alloc(mp); - if ( unlikely(m==0) ) { - printf("ERROR no packets \n"); - return (-1); +void +TrexStatelessDpCore::idle_state_loop() { + + while (m_state == STATE_IDLE) { + periodic_check_for_cp_messages(); + delay(200); } +} + +/** + * scehduler runs when traffic exists + * it will return when no more transmitting is done on this + * core + * + * @author imarom (01-Nov-15) + */ +void +TrexStatelessDpCore::start_scheduler() { + /* creates a maintenace job using the scheduler */ + CGenNode * node_sync = m_core->create_node() ; + node_sync->m_type = CGenNode::FLOW_SYNC; + node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT; + m_core->m_node_gen.add_node(node_sync); + + double old_offset = 0.0; + m_core->m_node_gen.flush_file(100000000, 0.0, false, m_core, old_offset); +} + +void +TrexStatelessDpCore::start() { + + while (true) { + idle_state_loop(); + + start_scheduler(); + } +} + +void +TrexStatelessDpCore::add_cont_stream(double pps, const uint8_t *pkt, uint16_t pkt_len) { + CGenNodeStateless *node = m_core->create_node_sl(); + + /* add periodic */ + node->m_type = CGenNode::STATELESS_PKT; + node->m_time = m_core->m_cur_time_sec + 0.0 /* STREAM ISG */; + node->m_flags = 0; + + /* set socket id */ + node->set_socket_id(m_core->m_node_gen.m_socket_id); + + /* build a mbuf from a packet */ + uint16_t pkt_size = pkt_len; + const uint8_t *stream_pkt = pkt; + + node->m_next_time_offset = 1.0 / pps; + node->m_is_stream_active = 1; + + /* allocate const mbuf */ + rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size); + assert(m); + char *p = rte_pktmbuf_append(m, pkt_size); assert(p); - /* set pkt data */ - memcpy(p,pkt,pkt_size); + /* copy the packet */ + memcpy(p,stream_pkt,pkt_size); - rte_mbuf_t *tx_pkts[32]; - tx_pkts[0] = m; - uint8_t nb_pkts = 1; - uint16_t ret = rte_eth_tx_burst(0, 0, tx_pkts, nb_pkts); - (void)ret; - rte_pktmbuf_free(m); + /* set dir 0 or 1 client or server */ + pkt_dir_t dir = 0; + node->set_mbuf_cache_dir(dir); - return (0); -} + /* TBD repace the mac if req we should add flag */ + m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, m); + + /* set the packet as a readonly */ + node->set_cache_mbuf(m); + + /* keep track */ + m_active_nodes.push_back(node); + + /* schedule */ + m_core->m_node_gen.add_node((CGenNode *)node); + + m_state = TrexStatelessDpCore::STATE_TRANSMITTING; -static int -test_inject_udp_pkt(){ - return (test_inject_pkt((uint8_t*)udp_pkt,sizeof(udp_pkt))); } void -TrexStatelessDpCore::test_inject_dummy_pkt() { - test_inject_udp_pkt(); +TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj) { + for (auto single_stream : obj->get_objects()) { + add_cont_stream(single_stream.m_pps, single_stream.m_pkt, single_stream.m_pkt_len); + } } -/*************************** - * DP core - * - **************************/ -TrexStatelessDpCore::TrexStatelessDpCore(uint8_t core_id) : m_core_id(core_id) { +void +TrexStatelessDpCore::stop_traffic(uint8_t port_id) { + /* we cannot remove nodes not from the top of the queue so + for every active node - make sure next time + the scheduler invokes it, it will be free */ + for (auto node : m_active_nodes) { + if (node->m_port_id == port_id) { + node->m_is_stream_active = 0; + } + } + + /* remove all the non active nodes */ + auto pred = std::remove_if(m_active_nodes.begin(), + m_active_nodes.end(), + [](CGenNodeStateless *node) { return (!node->m_is_stream_active); }); + + m_active_nodes.erase(pred, m_active_nodes.end()); + + if (m_active_nodes.size() == 0) { + m_state = STATE_IDLE; + /* stop the scheduler */ + + CGenNode *node = m_core->create_node() ; + + node->m_type = CGenNode::EXIT_SCHED; + + /* make sure it will be scheduled after the current node */ + node->m_time = m_core->m_node_gen.m_p_queue.top()->m_time; + + m_core->m_node_gen.add_node(node); + } + } /** - * main function for DP core + * handle a message from CP to DP * */ -void -TrexStatelessDpCore::run() { - printf("\nOn DP core %d\n", m_core_id); - while (true) { - test_inject_dummy_pkt(); - rte_pause(); - } +void +TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) { + msg->handle(this); + delete msg; } diff --git a/src/stateless/dp/trex_stateless_dp_core.h b/src/stateless/dp/trex_stateless_dp_core.h index 4b09b752..698cac2f 100644 --- a/src/stateless/dp/trex_stateless_dp_core.h +++ b/src/stateless/dp/trex_stateless_dp_core.h @@ -21,23 +21,113 @@ limitations under the License. #ifndef __TREX_STATELESS_DP_CORE_H__ #define __TREX_STATELESS_DP_CORE_H__ -#include <stdint.h> +#include <vector> + +#include <msg_manager.h> +#include <pal_utl.h> + +class TrexStatelessCpToDpMsgBase; +class TrexStatelessDpStart; +class CFlowGenListPerThread; +class CGenNodeStateless; +class TrexStreamsCompiledObj; -/** - * stateless DP core object - * - */ class TrexStatelessDpCore { + public: - TrexStatelessDpCore(uint8_t core_id); + /* states */ + enum state_e { + STATE_IDLE, + STATE_TRANSMITTING + }; + + TrexStatelessDpCore(uint8_t thread_id, CFlowGenListPerThread *core); + + /** + * launch the stateless DP core code + * + */ + void start(); + + /** + * dummy traffic creator + * + * @author imarom (27-Oct-15) + * + * @param pkt + * @param pkt_len + */ + void start_traffic(TrexStreamsCompiledObj *obj); + + /** + * stop all traffic for this core + * + */ + void stop_traffic(uint8_t port_id); + + /** + * check for and handle messages from CP + * + * @author imarom (27-Oct-15) + */ + void periodic_check_for_cp_messages() { + // doing this inline for performance reasons + + /* fast path */ + if ( likely ( m_ring_from_cp->isEmpty() ) ) { + return; + } - /* starts the DP core run */ - void run(); + while ( true ) { + CGenNode * node = NULL; + if (m_ring_from_cp->Dequeue(node) != 0) { + break; + } + assert(node); + + TrexStatelessCpToDpMsgBase * msg = (TrexStatelessCpToDpMsgBase *)node; + handle_cp_msg(msg); + } + + } private: - void test_inject_dummy_pkt(); - uint8_t m_core_id; + /** + * in idle state loop, the processor most of the time sleeps + * and periodically checks for messages + * + */ + void idle_state_loop(); + + /** + * real job is done when scheduler is launched + * + */ + void start_scheduler(); + + /** + * handles a CP to DP message + * + * @author imarom (27-Oct-15) + * + * @param msg + */ + void handle_cp_msg(TrexStatelessCpToDpMsgBase *msg); + + void add_cont_stream(double pps, const uint8_t *pkt, uint16_t pkt_len); + + uint8_t m_thread_id; + state_e m_state; + CNodeRing *m_ring_from_cp; + CNodeRing *m_ring_to_cp; + + /* holds the current active nodes */ + std::vector<CGenNodeStateless *> m_active_nodes; + + /* pointer to the main object */ + CFlowGenListPerThread *m_core; }; #endif /* __TREX_STATELESS_DP_CORE_H__ */ + diff --git a/src/stateless/dp/trex_stream_node.h b/src/stateless/dp/trex_stream_node.h new file mode 100644 index 00000000..92b428ab --- /dev/null +++ b/src/stateless/dp/trex_stream_node.h @@ -0,0 +1,105 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2015 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +#ifndef __TREX_STREAM_NODE_H__ +#define __TREX_STREAM_NODE_H__ + +#include <bp_sim.h> + +class TrexStatelessDpCore; + +/* this is a event for stateless */ +struct CGenNodeStateless : public CGenNodeBase { +friend class TrexStatelessDpCore; + +private: + void * m_cache_mbuf; + + double m_next_time_offset; + uint8_t m_is_stream_active; + uint8_t m_port_id; + + /* pad to match the size of CGenNode */ + uint8_t m_pad_end[87]; + + +public: + + inline bool is_active() { + return m_is_stream_active; + } + + + /** + * main function to handle an event of a packet tx + * + */ + inline void handle(CFlowGenListPerThread *thread) { + + thread->m_node_gen.m_v_if->send_node( (CGenNode *)this); + + /* in case of continues */ + m_time += m_next_time_offset; + + /* insert a new event */ + thread->m_node_gen.m_p_queue.push( (CGenNode *)this); + } + + void set_socket_id(socket_id_t socket){ + m_socket_id=socket; + } + + socket_id_t get_socket_id(){ + return ( m_socket_id ); + } + + inline void set_mbuf_cache_dir(pkt_dir_t dir){ + if (dir) { + m_flags |=NODE_FLAGS_DIR; + }else{ + m_flags &=~NODE_FLAGS_DIR; + } + } + + inline pkt_dir_t get_mbuf_cache_dir(){ + return ((pkt_dir_t)( m_flags &1)); + } + + + + inline void set_cache_mbuf(rte_mbuf_t * m){ + m_cache_mbuf=(void *)m; + m_flags |= NODE_FLAGS_MBUF_CACHE; + } + + inline rte_mbuf_t * get_cache_mbuf(){ + if ( m_flags &NODE_FLAGS_MBUF_CACHE ) { + return ((rte_mbuf_t *)m_cache_mbuf); + }else{ + return ((rte_mbuf_t *)0); + } + } + + +} __rte_cache_aligned; + +static_assert(sizeof(CGenNodeStateless) == sizeof(CGenNode), "sizeof(CGenNodeStateless) != sizeof(CGenNode)"); + +#endif /* __TREX_STREAM_NODE_H__ */ diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp new file mode 100644 index 00000000..3e754649 --- /dev/null +++ b/src/stateless/messaging/trex_stateless_messaging.cpp @@ -0,0 +1,53 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2015 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +#include <trex_stateless_messaging.h> +#include <trex_stateless_dp_core.h> +#include <trex_streams_compiler.h> +#include <string.h> + +/************************* + start traffic message + ************************/ +TrexStatelessDpStart::TrexStatelessDpStart(TrexStreamsCompiledObj *obj) : m_obj(obj) { +} + +TrexStatelessDpStart::~TrexStatelessDpStart() { + if (m_obj) { + delete m_obj; + } +} + +bool +TrexStatelessDpStart::handle(TrexStatelessDpCore *dp_core) { + + dp_core->start_traffic(m_obj); + return true; +} + +/************************* + stop traffic message + ************************/ +bool +TrexStatelessDpStop::handle(TrexStatelessDpCore *dp_core) { + dp_core->stop_traffic(m_port_id); + return true; +} + diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h new file mode 100644 index 00000000..381e146d --- /dev/null +++ b/src/stateless/messaging/trex_stateless_messaging.h @@ -0,0 +1,86 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2015 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +#ifndef __TREX_STATELESS_MESSAGING_H__ +#define __TREX_STATELESS_MESSAGING_H__ + +#include <msg_manager.h> + +class TrexStatelessDpCore; +class TrexStreamsCompiledObj; + +/** + * defines the base class for CP to DP messages + * + * @author imarom (27-Oct-15) + */ +class TrexStatelessCpToDpMsgBase { +public: + + TrexStatelessCpToDpMsgBase() { + } + + virtual ~TrexStatelessCpToDpMsgBase() { + } + + /** + * virtual function to handle a message + * + */ + virtual bool handle(TrexStatelessDpCore *dp_core) = 0; +}; + +/** + * a message to start traffic + * + * @author imarom (27-Oct-15) + */ +class TrexStatelessDpStart : public TrexStatelessCpToDpMsgBase { +public: + + TrexStatelessDpStart(TrexStreamsCompiledObj *obj); + + ~TrexStatelessDpStart(); + + virtual bool handle(TrexStatelessDpCore *dp_core); + +private: + TrexStreamsCompiledObj *m_obj; +}; + +/** + * a message to stop traffic + * + * @author imarom (27-Oct-15) + */ +class TrexStatelessDpStop : public TrexStatelessCpToDpMsgBase { +public: + + TrexStatelessDpStop(uint8_t port_id) : m_port_id(port_id) { + } + + virtual bool handle(TrexStatelessDpCore *dp_core); + +private: + uint8_t m_port_id; +}; + + +#endif /* __TREX_STATELESS_MESSAGING_H__ */ diff --git a/src/stub/trex_stateless_stub.cpp b/src/stub/trex_stateless_stub.cpp new file mode 100644 index 00000000..de56e57a --- /dev/null +++ b/src/stub/trex_stateless_stub.cpp @@ -0,0 +1,22 @@ + +#include <trex_stateless_dp_core.h> + +class CFlowGenListPerThread; +class TrexStatelessCpToDpMsgBase; + +TrexStatelessDpCore::TrexStatelessDpCore(unsigned char, CFlowGenListPerThread*) { + m_thread_id = 0; + m_core = NULL; + + m_state = STATE_IDLE; + + CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp(); + + m_ring_from_cp = cp_dp->getRingCpToDp(0); + m_ring_to_cp = cp_dp->getRingDpToCp(0); +} + +void TrexStatelessDpCore::start(){} + +void TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase*) {} + |