From c48c89a97ad070b8f79ac746b6b83aab1cc6f177 Mon Sep 17 00:00:00 2001 From: imarom Date: Sun, 10 Apr 2016 10:15:54 +0300 Subject: multicore scheduling --- .../stl/trex_stl_lib/trex_stl_sim.py | 141 +++++++++++++++++++-- src/stateless/cp/trex_stream.cpp | 3 +- src/stateless/cp/trex_stream.h | 13 +- src/stateless/cp/trex_streams_compiler.cpp | 43 ++++--- src/stateless/dp/trex_stateless_dp_core.cpp | 2 +- src/stateless/dp/trex_stream_node.h | 7 +- 6 files changed, 169 insertions(+), 40 deletions(-) diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_sim.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_sim.py index 1d89a599..2f6b1571 100644 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_sim.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_sim.py @@ -22,6 +22,7 @@ from .trex_stl_streams import * from .utils import parsing_opts from .trex_stl_client import STLClient from .utils import pcap +from trex_stl_lib.trex_stl_packet_builder_scapy import RawPcapReader, RawPcapWriter, hexdump from yaml import YAMLError @@ -291,13 +292,13 @@ class STLSim(object): return - print("Mering cores output to a single pcap file...\n") + if not self.silent: + print("Mering cores output to a single pcap file...\n") inputs = ["{0}-{1}".format(self.outfile, index) for index in range(0, self.dp_core_count)] pcap.merge_cap_files(inputs, self.outfile, delete_src = True) - def is_valid_file(filename): if not os.path.isfile(filename): raise argparse.ArgumentTypeError("The file '%s' does not exist" % filename) @@ -421,6 +422,11 @@ def setParserOptions(): action = "store_true", default = False) + group.add_argument("--test_multi_core", + help = "runs the profile with c=1-8", + action = "store_true", + default = False) + return parser @@ -435,6 +441,110 @@ def validate_args (parser, options): parser.error("limit cannot be lower than number of DP cores") +# a more flexible check +def compare_caps (cap1, cap2, max_diff_sec = (5 * 1e-6)): + pkts1 = list(RawPcapReader(cap1)) + pkts2 = list(RawPcapReader(cap2)) + + if len(pkts1) != len(pkts2): + print('{0} contains {1} packets vs. {1} contains {2} packets'.format(cap1, len(pkts1), cap2, len(pkts2))) + return False + + # to be less strict we define equality if all packets from cap1 exists and in cap2 + # and vice versa + # 'exists' means the same packet with abs(TS1-TS2) < 5nsec + # its O(n^2) but who cares, right ? + for i, pkt1 in enumerate(pkts1): + ts1 = float(pkt1[1][0]) + (float(pkt1[1][1]) / 1e6) + found = None + for j, pkt2 in enumerate(pkts2): + ts2 = float(pkt2[1][0]) + (float(pkt2[1][1]) / 1e6) + + if abs(ts1-ts2) > max_diff_sec: + break + + if pkt1[0] == pkt2[0]: + found = j + break + + + if found is None: + print(format_text("cannot find packet #{0} from {1} in {2}\n".format(i, cap1, cap2), 'bold')) + return False + else: + del pkts2[found] + + return True + + + + +# a more strict comparsion 1 <--> 1 +def compare_caps_strict (cap1, cap2, max_diff_sec = (5 * 1e-6)): + pkts1 = list(RawPcapReader(cap1)) + pkts2 = list(RawPcapReader(cap2)) + + if len(pkts1) != len(pkts2): + print('{0} contains {1} packets vs. {1} contains {2} packets'.format(cap1, len(pkts1), cap2, len(pkts2))) + return False + + # a strict check + for pkt1, pkt2, i in zip(pkts1, pkts2, range(1, len(pkts1))): + ts1 = float(pkt1[1][0]) + (float(pkt1[1][1]) / 1e6) + ts2 = float(pkt2[1][0]) + (float(pkt2[1][1]) / 1e6) + + if abs(ts1-ts2) > 0.000005: # 5 nsec + print(format_text("TS error: cap files '{0}', '{1}' differ in cap #{2} - '{3}' vs. '{4}'\n".format(cap1, cap2, i, ts1, ts2), 'bold')) + return False + + if pkt1[0] != pkt2[0]: + print(format_text("RAW error: cap files '{0}', '{1}' differ in cap #{2}\n".format(cap1, cap2, i), 'bold')) + print(hexdump(pkt1[0])) + print("") + print(hexdump(pkt2[0])) + print("") + return False + + return True + +# +def test_multi_core (r, options): + + for core_count in [1, 2, 4, 6, 8]: + r.run(input_list = options.input_file, + outfile = '{0}.cap'.format(core_count), + dp_core_count = core_count, + is_debug = (not options.release), + pkt_limit = options.limit, + mult = options.mult, + duration = options.duration, + mode = 'none', + silent = True, + tunables = options.tunables) + + print("") + + print(format_text("comparing 2 cores to 1 core:\n", 'underline')) + rc = compare_caps('1.cap', '2.cap') + if rc: + print("[Passed]\n") + + print(format_text("comparing 4 cores to 1 core:\n", 'underline')) + rc = compare_caps('1.cap', '4.cap') + if rc: + print("[Passed]\n") + + print(format_text("comparing 6 cores to 1 core:\n", 'underline')) + rc = compare_caps('1.cap', '6.cap') + if rc: + print("[Passed]\n") + + print(format_text("comparing 8 cores to 1 core:\n", 'underline')) + rc = compare_caps('1.cap', '8.cap') + if rc: + print("[Passed]\n") + + def main (args = None): parser = setParserOptions() options = parser.parse_args(args = args) @@ -455,23 +565,28 @@ def main (args = None): mode = 'native' elif options.pkt: mode = 'pkt' + elif options.test_multi_core: + mode = 'test_multi_core' else: mode = 'none' try: r = STLSim(bp_sim_path = options.bp_sim_path, port_id = options.port_id) - r.run(input_list = options.input_file, - outfile = options.output_file, - dp_core_count = options.dp_core_count, - dp_core_index = options.dp_core_index, - is_debug = (not options.release), - pkt_limit = options.limit, - mult = options.mult, - duration = options.duration, - mode = mode, - silent = options.silent, - tunables = options.tunables) + if mode == 'test_multi_core': + test_multi_core(r, options) + else: + r.run(input_list = options.input_file, + outfile = options.output_file, + dp_core_count = options.dp_core_count, + dp_core_index = options.dp_core_index, + is_debug = (not options.release), + pkt_limit = options.limit, + mult = options.mult, + duration = options.duration, + mode = mode, + silent = options.silent, + tunables = options.tunables) except KeyboardInterrupt as e: print("\n\n*** Caught Ctrl + C... Exiting...\n\n") diff --git a/src/stateless/cp/trex_stream.cpp b/src/stateless/cp/trex_stream.cpp index 6e4478d6..4325858c 100644 --- a/src/stateless/cp/trex_stream.cpp +++ b/src/stateless/cp/trex_stream.cpp @@ -135,7 +135,8 @@ TrexStream::TrexStream(uint8_t type, m_enabled = false; m_self_start = false; - m_delay_next_stream_sec = 0; + m_mc_phase_pre_sec = 0; + m_mc_phase_post_sec = 0; m_pkt.binary = NULL; m_pkt.len = 0; diff --git a/src/stateless/cp/trex_stream.h b/src/stateless/cp/trex_stream.h index e4ce914e..1914026b 100644 --- a/src/stateless/cp/trex_stream.h +++ b/src/stateless/cp/trex_stream.h @@ -401,6 +401,7 @@ public: set_multi_burst(burst_total_pkts,1,0.0); } + /* create new stream */ TrexStream * clone(bool full = false) const { @@ -422,7 +423,11 @@ public: } dp->m_isg_usec = m_isg_usec; - dp->m_delay_next_stream_sec = m_delay_next_stream_sec; + + /* multi core phase paramters */ + dp->m_mc_phase_pre_sec = m_mc_phase_pre_sec; + dp->m_mc_phase_post_sec = m_mc_phase_post_sec; + dp->m_next_stream_id = m_next_stream_id; dp->m_enabled = m_enabled; @@ -457,7 +462,7 @@ public: return ( (m_burst_total_pkts / get_pps()) * 1000 * 1000); } - double get_ipg() { + double get_ipg_sec() { return (1.0 / get_pps()); } @@ -502,7 +507,9 @@ public: /* config fields */ - double m_delay_next_stream_sec; + double m_mc_phase_pre_sec; + double m_mc_phase_post_sec; + double m_isg_usec; int m_next_stream_id; diff --git a/src/stateless/cp/trex_streams_compiler.cpp b/src/stateless/cp/trex_streams_compiler.cpp index da2b9c9b..c4ae0bed 100644 --- a/src/stateless/cp/trex_streams_compiler.cpp +++ b/src/stateless/cp/trex_streams_compiler.cpp @@ -586,6 +586,11 @@ TrexStreamsCompiler::compile_stream_on_all_cores(TrexStream *stream, int per_core_burst_total_pkts = (stream->m_burst_total_pkts / dp_core_count); int burst_remainder = (stream->m_burst_total_pkts % dp_core_count); + bool has_remainder = (burst_remainder > 0); + int remainder_left = burst_remainder; + + double base_ipg_sec = factor * stream->get_ipg_sec(); + /* for each core - creates its own version of the stream */ for (uint8_t i = 0; i < dp_core_count; i++) { @@ -594,30 +599,32 @@ TrexStreamsCompiler::compile_stream_on_all_cores(TrexStream *stream, /* fix stream ID */ dp_stream->fix_dp_stream_id(new_id, new_next_id); + /* some phase */ + dp_stream->m_mc_phase_pre_sec = base_ipg_sec * i; + /* each core gets a share of the packets */ dp_stream->m_burst_total_pkts = per_core_burst_total_pkts; + + /* rate is slower * dp_core_count */ + dp_stream->update_rate_factor(factor / dp_core_count); + - /* core 0 also gets the remainder */ - if (i == 0) { - dp_stream->m_burst_total_pkts += burst_remainder; - } - - /* for continous the rate is divided by the cores */ - if (stream->m_type == TrexStream::stCONTINUOUS) { - dp_stream->update_rate_factor(factor / dp_core_count); + /* allocate the rest of the packets */ + if (has_remainder) { + if (remainder_left > 0) { + dp_stream->m_burst_total_pkts++; + remainder_left--; + dp_stream->m_mc_phase_post_sec = base_ipg_sec * remainder_left; + } else { + /* a delay slot if no packets left */ + dp_stream->m_mc_phase_post_sec = base_ipg_sec * (dp_core_count - 1 - i + burst_remainder); + } + } else { - /* rate is according to the share of the packetes the core got */ - dp_stream->update_rate_factor(factor * (dp_stream->m_burst_total_pkts / double(stream->m_burst_total_pkts))); + /* if no remainder (or continous) simply add a reverse phase */ + dp_stream->m_mc_phase_post_sec = base_ipg_sec * (dp_core_count - 1 - i); } - - - //dp_stream->m_pkt.binary[14 + 20] = 0; - //dp_stream->m_pkt.binary[14 + 21] = i; - - /* some phase */ - dp_stream->m_isg_usec += (stream->get_ipg() * i) * 1e6; - dp_stream->m_delay_next_stream_sec = stream->get_ipg() * (dp_core_count - 1 - i); core_streams[i] = dp_stream; } diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp index 42ff9e24..2a4a384b 100644 --- a/src/stateless/dp/trex_stateless_dp_core.cpp +++ b/src/stateless/dp/trex_stateless_dp_core.cpp @@ -602,7 +602,7 @@ TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port, node->m_state =CGenNodeStateless::ss_INACTIVE; } - node->m_time = m_core->m_cur_time_sec + usec_to_sec(stream->m_isg_usec); + node->m_time = m_core->m_cur_time_sec + usec_to_sec(stream->m_isg_usec) + stream->m_mc_phase_pre_sec; pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id); node->m_flags = 0; diff --git a/src/stateless/dp/trex_stream_node.h b/src/stateless/dp/trex_stream_node.h index fa6fd8bd..d756ba43 100644 --- a/src/stateless/dp/trex_stream_node.h +++ b/src/stateless/dp/trex_stream_node.h @@ -144,7 +144,7 @@ public: /* we restart the stream, schedule it using stream isg */ inline void update_refresh_time(double cur_time){ - m_time = cur_time + usec_to_sec(m_ref_stream_info->m_isg_usec); + m_time = cur_time + usec_to_sec(m_ref_stream_info->m_isg_usec) + m_ref_stream_info->m_mc_phase_pre_sec; } inline bool is_mask_for_free(){ @@ -236,8 +236,7 @@ public: set_state(CGenNodeStateless::ss_INACTIVE); if ( thread->set_stateless_next_node(this,m_next_stream) ){ /* update the next stream time using isg */ - //m_next_stream->update_refresh_time(m_time + m_next_time_offset); - m_next_stream->update_refresh_time(m_time + m_ref_stream_info->m_delay_next_stream_sec); + m_next_stream->update_refresh_time(m_time + m_ref_stream_info->m_mc_phase_post_sec); thread->m_node_gen.m_p_queue.push( (CGenNode *)m_next_stream); }else{ @@ -246,7 +245,7 @@ public: } }else{ - m_time += get_multi_ibg_sec(); + m_time += get_multi_ibg_sec() + m_ref_stream_info->m_mc_phase_post_sec + m_ref_stream_info->m_mc_phase_pre_sec; m_single_burst = m_single_burst_refill; thread->m_node_gen.m_p_queue.push( (CGenNode *)this); } -- cgit 1.2.3-korg