From 0901331fc21088307fc4a264d5b38089a1ce7f1a Mon Sep 17 00:00:00 2001 From: imarom Date: Tue, 22 Dec 2015 02:44:01 -0500 Subject: support for VM split - 1st phase --- linux/ws_main.py | 1 + linux_dpdk/ws_main.py | 1 + scripts/exp/stl_vm_split_flow_var_1.erf-0-ex.erf | Bin 0 -> 11000 bytes scripts/exp/stl_vm_split_flow_var_1.erf-0.erf | Bin 0 -> 11000 bytes src/gtest/trex_stateless_gtest.cpp | 158 +++++++++++------------ src/rpc-server/commands/trex_rpc_cmd_stream.cpp | 3 +- src/stateless/cp/trex_stream.cpp | 31 ++--- src/stateless/cp/trex_stream.h | 77 +++++------ src/stateless/cp/trex_stream_vm.cpp | 59 ++++++++- src/stateless/cp/trex_stream_vm.h | 133 +++++++++++++++---- src/stateless/cp/trex_streams_compiler.cpp | 31 +++-- src/stateless/cp/trex_streams_compiler.h | 2 - src/stateless/cp/trex_vm_splitter.cpp | 146 +++++++++++++++++++++ src/stateless/cp/trex_vm_splitter.h | 54 ++++++++ src/stateless/dp/trex_stateless_dp_core.cpp | 24 ++-- 15 files changed, 519 insertions(+), 201 deletions(-) create mode 100644 scripts/exp/stl_vm_split_flow_var_1.erf-0-ex.erf create mode 100644 scripts/exp/stl_vm_split_flow_var_1.erf-0.erf create mode 100644 src/stateless/cp/trex_vm_splitter.cpp create mode 100644 src/stateless/cp/trex_vm_splitter.h diff --git a/linux/ws_main.py b/linux/ws_main.py index b0499842..deaeeb27 100755 --- a/linux/ws_main.py +++ b/linux/ws_main.py @@ -153,6 +153,7 @@ stateless_src = SrcGroup(dir='src/stateless/', 'cp/trex_stateless.cpp', 'cp/trex_stateless_port.cpp', 'cp/trex_streams_compiler.cpp', + 'cp/trex_vm_splitter.cpp', 'cp/trex_dp_port_events.cpp', 'dp/trex_stateless_dp_core.cpp', 'messaging/trex_stateless_messaging.cpp', diff --git a/linux_dpdk/ws_main.py b/linux_dpdk/ws_main.py index 8c7c02ab..4fd30d4e 100755 --- a/linux_dpdk/ws_main.py +++ b/linux_dpdk/ws_main.py @@ -162,6 +162,7 @@ stateless_src = SrcGroup(dir='src/stateless/', 'cp/trex_stateless.cpp', 'cp/trex_stateless_port.cpp', 'cp/trex_streams_compiler.cpp', + 'cp/trex_vm_splitter.cpp', 'cp/trex_dp_port_events.cpp', 'dp/trex_stateless_dp_core.cpp', 'messaging/trex_stateless_messaging.cpp' diff --git a/scripts/exp/stl_vm_split_flow_var_1.erf-0-ex.erf b/scripts/exp/stl_vm_split_flow_var_1.erf-0-ex.erf new file mode 100644 index 00000000..cc612d1d Binary files /dev/null and b/scripts/exp/stl_vm_split_flow_var_1.erf-0-ex.erf differ diff --git a/scripts/exp/stl_vm_split_flow_var_1.erf-0.erf b/scripts/exp/stl_vm_split_flow_var_1.erf-0.erf new file mode 100644 index 00000000..cc612d1d Binary files /dev/null and b/scripts/exp/stl_vm_split_flow_var_1.erf-0.erf differ diff --git a/src/gtest/trex_stateless_gtest.cpp b/src/gtest/trex_stateless_gtest.cpp index 1626ac25..e7564350 100644 --- a/src/gtest/trex_stateless_gtest.cpp +++ b/src/gtest/trex_stateless_gtest.cpp @@ -181,9 +181,7 @@ TEST_F(basic_vm, vm1) { ); vm.add_instruction( new StreamVmInstructionFixChecksumIpv4(14) ); - vm.set_packet_size(128); - - vm.compile(); + vm.compile(128); uint32_t program_size=vm.get_dp_instruction_buffer()->get_program_size(); @@ -206,9 +204,7 @@ TEST_F(basic_vm, vm2) { ); //vm.add_instruction( new StreamVmInstructionFixChecksumIpv4(14) ); - vm.set_packet_size(128); - - vm.compile(); + vm.compile(128); uint32_t program_size=vm.get_dp_instruction_buffer()->get_program_size(); @@ -284,9 +280,7 @@ TEST_F(basic_vm, vm3) { ); //vm.add_instruction( new StreamVmInstructionFixChecksumIpv4(14) ); - vm.set_packet_size(128); - - vm.compile(); + vm.compile(128); uint32_t program_size=vm.get_dp_instruction_buffer()->get_program_size(); @@ -369,9 +363,7 @@ TEST_F(basic_vm, vm4) { ); //vm.add_instruction( new StreamVmInstructionFixChecksumIpv4(14) ); - vm.set_packet_size(128); - - vm.compile(); + vm.compile(128); uint32_t program_size=vm.get_dp_instruction_buffer()->get_program_size(); @@ -466,9 +458,7 @@ TEST_F(basic_vm, vm5) { vm.add_instruction( new StreamVmInstructionFixChecksumIpv4(14) ); - vm.set_packet_size(128); - - vm.compile(); + vm.compile(128); uint32_t program_size=vm.get_dp_instruction_buffer()->get_program_size(); @@ -608,9 +598,7 @@ TEST_F(basic_vm, vm6) { vm.add_instruction( new StreamVmInstructionFixChecksumIpv4(14) ); - vm.set_packet_size(128); - - vm.compile(); + vm.compile(128); uint32_t program_size=vm.get_dp_instruction_buffer()->get_program_size(); @@ -675,10 +663,7 @@ TEST_F(basic_vm, vm7) { vm.add_instruction( new StreamVmInstructionWriteToPkt( "cl1.port",34, 0,true) ); - - vm.set_packet_size(128); - - vm.compile(); + vm.compile(128); uint32_t program_size=vm.get_dp_instruction_buffer()->get_program_size(); @@ -742,9 +727,7 @@ TEST_F(basic_vm, vm8) { ); - vm.set_packet_size(128); - - vm.compile(); + vm.compile(128); uint32_t program_size=vm.get_dp_instruction_buffer()->get_program_size(); @@ -805,11 +788,8 @@ static void vm_build_program_seq(StreamVm & vm, vm.add_instruction( new StreamVmInstructionWriteToPkt( "tuple_gen.port",34, 0,true) ); - - vm.set_packet_size(packet_size); - if (should_compile) { - vm.compile(); + vm.compile(packet_size); } } @@ -873,7 +853,7 @@ TEST_F(basic_vm, vm10) { EXPECT_EQ(36,vm.get_max_packet_update_offset()); - StreamVmDp * lpDpVm =vm.cloneAsVmDp(); + StreamVmDp * lpDpVm =vm.generate_dp_object(); EXPECT_EQ(lpDpVm->get_bss_size(),vm.get_bss_size()); @@ -2638,76 +2618,86 @@ TEST_F(basic_stl, graph_generator2) { delete obj; } -/* stress test */ -#if 0 -TEST_F(basic_stl, graph_generator2) { - std::vector streams; - TrexStreamsGraph graph; - TrexStream *stream; +static +void vm_split_test(const char *erf_filename, + TrexStream::STREAM_TYPE stream_type, + double pps, + StreamVmInstructionFlowMan::flow_var_op_e op, + uint8_t dp_core_count, + uint8_t dp_core_to_check) { - /* add some multi burst streams */ - stream = new TrexStream(TrexStream::stMULTI_BURST, 0, 1); - stream->m_enabled = true; - stream->m_self_start = true; - stream->m_isg_usec = 100; - - stream->set_pps(20); - stream->set_multi_burst(4918, 321312, 15); - stream->m_next_stream_id = -1; - stream->m_pkt.len = 64; + TrexStreamsCompiler compile; + std::vector objs; + std::vector streams; - streams.push_back(stream); + TrexStream *stream = new TrexStream(stream_type, 0, 1); - stream = new TrexStream(TrexStream::stMULTI_BURST, 0, 2); - stream->m_enabled = true; - stream->m_self_start = true; - stream->m_isg_usec = 59281; + stream->set_single_burst(pps); + stream->set_pps(pps); - stream->set_pps(30); - stream->set_multi_burst(4918, 51040, 27); - stream->m_next_stream_id = -1; - stream->m_pkt.len = 64; + stream->m_enabled = true; + stream->m_self_start = true; - streams.push_back(stream); + CPcapLoader pcap; + pcap.load_pcap_file("cap2/udp_64B.pcap",0); + pcap.update_ip_src(0x10000001); + pcap.clone_packet_into_stream(stream); + - stream = new TrexStream(TrexStream::stMULTI_BURST, 0, 3); - stream->m_enabled = true; - stream->m_self_start = true; - stream->m_isg_usec = 59281492; + StreamVm &vm = stream->m_vm; - stream->set_pps(40); - stream->set_multi_burst(4918, 412312, 2917); - stream->m_next_stream_id = -1; - stream->m_pkt.len = 64; + StreamVmInstruction *flow_var = new StreamVmInstructionFlowMan("var1", + 1, + op, + 0, + 0, + 255); - streams.push_back(stream); + vm.add_instruction(flow_var); + vm.add_instruction(new StreamVmInstructionWriteToPkt( "var1", 59, 0,true)); + vm.add_instruction(new StreamVmInstructionFixChecksumIpv4(14)); + vm.set_split_instruction(flow_var); + streams.push_back(stream); - /* stream 3 */ - stream = new TrexStream(TrexStream::stCONTINUOUS, 0, 4); - stream->m_enabled = true; - stream->m_self_start = true; + /* compiling for 8 cores */ + assert(compile.compile(0, streams, objs, dp_core_count)); + for (auto stream : streams) { + delete stream; + } - stream->m_isg_usec = 50; - stream->set_pps(30); - stream->m_next_stream_id = -1; - stream->m_pkt.len = 1512; + /* choose one DP object */ + TrexStatelessDpStart *lpStartCmd = new TrexStatelessDpStart(0, 0, objs[dp_core_to_check], 1 /*sec */ ); + objs[dp_core_to_check] = NULL; + /* free all the non used DP objects */ + for (auto obj : objs) { + if (obj) { + delete obj; + } + } - streams.push_back(stream); + CParserOption * po =&CGlobalInfo::m_options; + po->preview.setVMode(7); + po->preview.setFileWrite(true); + po->out_file = erf_filename; - const TrexStreamsGraphObj &obj = graph.generate(streams); - printf("event_count is: %lu, max BPS: %f, max PPS: %f\n", obj.get_events().size(), obj.get_max_bps(), obj.get_max_pps()); + CBasicStl t1; + t1.m_msg = lpStartCmd; + bool res=t1.init(); + EXPECT_EQ_UINT32(1, res?1:0); +} -// for (const TrexStreamsGraphObj::rate_event_st &ev : obj.get_events()) { -// printf("time: %f, diff bps: %f, diff pps: %f\n", ev.time, ev.diff_bps, ev.diff_pps); -// } +TEST_F(basic_stl, vm_split_flow_var_1) { - for (auto stream : streams) { - delete stream; - } -} + vm_split_test("exp/stl_vm_split_flow_var_1.erf", + TrexStream::stSINGLE_BURST, + 1000, + StreamVmInstructionFlowMan::FLOW_VAR_OP_INC, + 8, + 4); + +} -#endif /********************************************* Itay Tests End *************************************/ diff --git a/src/rpc-server/commands/trex_rpc_cmd_stream.cpp b/src/rpc-server/commands/trex_rpc_cmd_stream.cpp index d8f7e772..e0186493 100644 --- a/src/rpc-server/commands/trex_rpc_cmd_stream.cpp +++ b/src/rpc-server/commands/trex_rpc_cmd_stream.cpp @@ -230,8 +230,7 @@ TrexRpcCmdAddStream::parse_vm_instr_flow_var(const Json::Value &inst, TrexStream op_type, init_value, min_value, - max_value - )); + max_value)); } void diff --git a/src/stateless/cp/trex_stream.cpp b/src/stateless/cp/trex_stream.cpp index 02f43a3a..976cfa07 100644 --- a/src/stateless/cp/trex_stream.cpp +++ b/src/stateless/cp/trex_stream.cpp @@ -54,32 +54,19 @@ std::string TrexStream::get_stream_type_str(stream_type_t stream_type){ void -TrexStream::compile() { +TrexStream::vm_compile() { /* in case there are no instructions - nothing to do */ if (m_vm.is_vm_empty()) { - m_has_vm = false; return; } - m_has_vm = true; + /* compile */ + m_vm.compile(m_pkt.len); - m_vm.set_packet_size(m_pkt.len); + /* create DP object */ + m_vm_dp = m_vm.generate_dp_object(); - m_vm.compile(); - - #if 0 - m_vm.Dump(stdout); - #endif - - m_vm_dp = m_vm.cloneAsVmDp(); - - /* calc m_vm_prefix_size which is the size of the writable packet */ - uint16_t max_pkt_offset = m_vm_dp->get_max_packet_update_offset(); - uint16_t pkt_size = m_pkt.len; - - /* calculate the mbuf size that we should allocate */ - m_vm_prefix_size = calc_writable_mbuf_size(max_pkt_offset, pkt_size); } @@ -133,8 +120,6 @@ TrexStream::TrexStream(uint8_t type, m_pkt.binary = NULL; m_pkt.len = 0; - m_has_vm = false; - m_vm_prefix_size = 0; m_rx_check.m_enable = false; @@ -150,9 +135,9 @@ TrexStream::~TrexStream() { if (m_pkt.binary) { delete [] m_pkt.binary; } - if ( m_vm_dp ){ - delete m_vm_dp; - m_vm_dp=NULL; + if (m_vm_dp){ + delete m_vm_dp; + m_vm_dp = NULL; } } diff --git a/src/stateless/cp/trex_stream.h b/src/stateless/cp/trex_stream.h index 720246f6..80368e4c 100644 --- a/src/stateless/cp/trex_stream.h +++ b/src/stateless/cp/trex_stream.h @@ -65,7 +65,7 @@ static inline uint16_t get_log2_size(uint16_t size){ * */ static inline uint16_t calc_writable_mbuf_size(uint16_t max_offset_writable, - uint16_t pkt_size){ + uint16_t pkt_size){ if ( pkt_size<=64 ){ return (pkt_size); @@ -177,34 +177,33 @@ public: } /* create new stream */ - TrexStream * clone_as_dp() const { - - TrexStream *dp = new TrexStream(m_type,m_port_id,m_stream_id); - dp->m_has_vm = m_has_vm; - if (m_vm_dp) { - /* should have vm */ - assert(m_has_vm); - dp->m_vm_dp = m_vm_dp->clone(); - }else{ - dp->m_vm_dp = NULL; - } - dp->m_vm_prefix_size = m_vm_prefix_size; - - dp->m_isg_usec = m_isg_usec; - dp->m_next_stream_id = m_next_stream_id; - - dp->m_enabled = m_enabled; - dp->m_self_start = m_self_start; - - /* deep copy */ - dp->m_pkt.clone(m_pkt.binary,m_pkt.len); - - dp->m_rx_check = m_rx_check; - dp->m_pps = m_pps; - dp->m_burst_total_pkts = m_burst_total_pkts; - dp->m_num_bursts = m_num_bursts; - dp->m_ibg_usec = m_ibg_usec ; - return (dp); + TrexStream * clone() const { + + /* not all fields will be cloned */ + + TrexStream *dp = new TrexStream(m_type,m_port_id,m_stream_id); + if (m_vm_dp) { + dp->m_vm_dp = m_vm_dp->clone(); + } else { + dp->m_vm_dp = NULL; + } + + dp->m_isg_usec = m_isg_usec; + dp->m_next_stream_id = m_next_stream_id; + + dp->m_enabled = m_enabled; + dp->m_self_start = m_self_start; + + /* deep copy */ + dp->m_pkt.clone(m_pkt.binary,m_pkt.len); + + dp->m_rx_check = m_rx_check; + dp->m_pps = m_pps; + dp->m_burst_total_pkts = m_burst_total_pkts; + dp->m_num_bursts = m_num_bursts; + dp->m_ibg_usec = m_ibg_usec; + + return(dp); } @@ -219,27 +218,20 @@ public: void Dump(FILE *fd); - bool is_vm(){ - return ( m_has_vm ); + StreamVmDp * getDpVm(){ + return (m_vm_dp); } - StreamVmDp * getDpVm(){ - return ( m_vm_dp); - } - - void post_vm_compile(); - /** * internal compilation of stream (for DP) * */ - void compile(); + void vm_compile(); public: /* basic */ uint8_t m_type; uint8_t m_port_id; - uint16_t m_vm_prefix_size; /* writeable mbuf size */ uint32_t m_stream_id; /* id from RPC can be anything */ @@ -250,16 +242,15 @@ public: /* indicators */ bool m_enabled; bool m_self_start; - bool m_has_vm; /* do we have instructions to run */ - StreamVmDp * m_vm_dp; /* compile VM */ + /* VM CP and DP */ + StreamVm m_vm; + StreamVmDp *m_vm_dp; CStreamPktData m_pkt; /* pkt */ - /* VM */ - StreamVm m_vm; /* RX check */ struct { diff --git a/src/stateless/cp/trex_stream_vm.cpp b/src/stateless/cp/trex_stream_vm.cpp index e10e1a81..b086e21b 100644 --- a/src/stateless/cp/trex_stream_vm.cpp +++ b/src/stateless/cp/trex_stream_vm.cpp @@ -593,9 +593,61 @@ void StreamVm::build_bss() { } } +/** + * set the VM split instruction + * instr is a pointer to an instruction inside + * the VM program + * + */ +void +StreamVm::set_split_instruction(StreamVmInstruction *instr) { + if (!instr->is_splitable()) { + throw TrexException("non splitable instruction"); + return; + } + + m_split_instr = instr; +} + +/** + * copy instructions from this VM to 'other' + * + * @author imarom (22-Dec-15) + * + * @param other + */ +void +StreamVm::copy_instructions(StreamVm &other) const { + /* clear previous if any exists */ + for (auto instr : other.m_inst_list) { + delete instr; + } + + other.m_inst_list.clear(); + + for (auto instr : m_inst_list) { + StreamVmInstruction *new_instr = instr->clone(); + other.m_inst_list.push_back(new_instr); + /* for the split instruction - find the right one */ + if (instr == m_split_instr) { + other.m_split_instr = new_instr; + } + } + +} + +/** + * actual work - compile the VM + * + */ +void StreamVm::compile(uint16_t pkt_len) { + + if (is_vm_empty()) { + return; + } -void StreamVm::compile() { + m_pkt_size = pkt_len; /* build flow var offset table */ build_flow_var_table() ; @@ -610,6 +662,11 @@ void StreamVm::compile() { ss << "maximum offset is" << get_max_packet_update_offset() << " bigger than maximum " <get_program(), get_dp_instruction_buffer()->get_program_size(), - get_max_packet_update_offset() + get_max_packet_update_offset(), + get_prefix_size() ); assert(lp); return (lp); } + /** + * clone VM instructions + * + */ + void copy_instructions(StreamVm &other) const; + + bool is_vm_empty() { return (m_inst_list.size() == 0); } @@ -958,14 +1037,20 @@ public: return ( m_max_field_update ); } + uint16_t get_prefix_size() { + return m_prefix_size; + } + bool is_compiled() { + return m_is_compiled; + } /** * compile the VM * return true of success, o.w false * */ - void compile(); + void compile(uint16_t pkt_len); ~StreamVm(); @@ -1002,6 +1087,8 @@ private: void add_field_cnt(uint16_t new_cnt); private: + bool m_is_compiled; + uint16_t m_prefix_size; uint16_t m_pkt_size; uint16_t m_cur_var_offset; uint16_t m_max_field_update; /* the location of the last byte that is going to be changed in the packet */ @@ -1012,6 +1099,8 @@ private: StreamDPVmInstructions m_instructions; + StreamVmInstruction *m_split_instr; + }; #endif /* __TREX_STREAM_VM_API_H__ */ diff --git a/src/stateless/cp/trex_streams_compiler.cpp b/src/stateless/cp/trex_streams_compiler.cpp index c4900e66..24b14469 100644 --- a/src/stateless/cp/trex_streams_compiler.cpp +++ b/src/stateless/cp/trex_streams_compiler.cpp @@ -21,12 +21,16 @@ limitations under the License. #include #include -#include -#include #include -#include #include +#include +#include +#include +#include + + + /** * describes a graph node in the pre compile check * @@ -175,7 +179,7 @@ TrexStreamsCompiledObj::clone() { * clone each element */ for (auto obj : m_objs) { - TrexStream *new_stream = obj.m_stream->clone_as_dp(); + TrexStream *new_stream = obj.m_stream->clone(); new_compiled_obj->add_compiled_stream(new_stream); } @@ -463,19 +467,16 @@ TrexStreamsCompiler::compile_stream(const TrexStream *stream, new_next_id = nodes.get(stream->m_next_stream_id)->m_compressed_stream_id; } + std::vector core_streams(dp_core_count); + /* calculate rate */ double per_core_rate = (stream->m_pps * (factor / dp_core_count)); int per_core_burst_total_pkts = (stream->m_burst_total_pkts / dp_core_count); - /* compile VM */ - /* fix this const away problem */ - ((TrexStream *)stream)->compile(); - - std::vector per_core_streams(dp_core_count); /* for each core - creates its own version of the stream */ for (uint8_t i = 0; i < dp_core_count; i++) { - TrexStream *dp_stream = stream->clone_as_dp(); + TrexStream *dp_stream = stream->clone(); /* fix stream ID */ dp_stream->fix_dp_stream_id(new_id, new_next_id); @@ -485,16 +486,20 @@ TrexStreamsCompiler::compile_stream(const TrexStream *stream, dp_stream->m_pps = per_core_rate; dp_stream->m_burst_total_pkts = per_core_burst_total_pkts; - per_core_streams[i] = dp_stream; + core_streams[i] = dp_stream; } /* take care of remainder from a burst */ int burst_remainder = stream->m_burst_total_pkts - (per_core_burst_total_pkts * dp_core_count); - per_core_streams[0]->m_burst_total_pkts += burst_remainder; + core_streams[0]->m_burst_total_pkts += burst_remainder; + + /* handle VM (split if needed) */ + TrexVmSplitter vm_splitter; + vm_splitter.split( (TrexStream *)stream, core_streams); /* attach the compiled stream of every core to its object */ for (uint8_t i = 0; i < dp_core_count; i++) { - objs[i]->add_compiled_stream(per_core_streams[i]); + objs[i]->add_compiled_stream(core_streams[i]); } diff --git a/src/stateless/cp/trex_streams_compiler.h b/src/stateless/cp/trex_streams_compiler.h index d2b0cd1d..4b61dcfa 100644 --- a/src/stateless/cp/trex_streams_compiler.h +++ b/src/stateless/cp/trex_streams_compiler.h @@ -125,8 +125,6 @@ private: std::vector &objs, GraphNodeMap &nodes); - void compile_stream_vm(TrexStream *stream); - std::vector m_warnings; }; diff --git a/src/stateless/cp/trex_vm_splitter.cpp b/src/stateless/cp/trex_vm_splitter.cpp new file mode 100644 index 00000000..489e3b75 --- /dev/null +++ b/src/stateless/cp/trex_vm_splitter.cpp @@ -0,0 +1,146 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2015 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#include +#include + +void +TrexVmSplitter::split(TrexStream *stream, std::vector core_streams) { + + /* nothing to do if no VM */ + if (stream->m_vm.is_vm_empty()) { + return; + } + + /* prepare some vars */ + m_dp_core_count = core_streams.size(); + m_core_streams = &core_streams; + m_stream = stream; + + /* if we cannot split - compile the main and duplicate */ + bool rc = split_internal(); + if (!rc) { + + /* compile the stream and simply clone it to all streams */ + m_stream->vm_compile(); + + /* for every core - simply clone the DP object */ + for (TrexStream *core_stream : *m_core_streams) { + core_stream->m_vm_dp = m_stream->m_vm_dp->clone(); + } + + /* no need for the reference stream DP object */ + delete m_stream->m_vm_dp; + m_stream->m_vm_dp = NULL; + } +} + +bool +TrexVmSplitter::split_internal() { + + const StreamVmInstruction *split_instr = m_stream->m_vm.get_split_instruction(); + if (split_instr == NULL) { + return false; + } + + if (split_instr->get_instruction_type() == StreamVmInstruction::itFLOW_MAN) { + return split_by_flow_var( (const StreamVmInstructionFlowMan *)split_instr ); + + } else if (split_instr->get_instruction_type() == StreamVmInstruction::itFLOW_CLIENT) { + return split_by_flow_client_var( (const StreamVmInstructionFlowClient *)split_instr ); + + } else { + throw TrexException("VM splitter : cannot split by instruction which is not flow var or flow client var"); + } + +} + +/** + * split VM by flow var + * + * @author imarom (20-Dec-15) + * + * @param instr + * + * @return bool + */ +bool +TrexVmSplitter::split_by_flow_var(const StreamVmInstructionFlowMan *instr) { + /* no point in splitting random */ + if (instr->m_op == StreamVmInstructionFlowMan::FLOW_VAR_OP_RANDOM) { + return false; + } + + /* if the range is too small - multiply */ + if (instr->get_range() < m_dp_core_count) { + // FIXME + return false; + } + + /* we need to split - duplicate VM now */ + duplicate_vm(); + + /* calculate range splitting */ + uint64_t range = instr->get_range(); + + uint64_t range_part = range / m_dp_core_count; + uint64_t leftover = range % m_dp_core_count; + + /* first core handles a bit more */ + uint64_t start = instr->m_min_value; + uint64_t end = start + range_part + leftover - 1; + + + /* do work */ + for (TrexStream *core_stream : *m_core_streams) { + + /* get the per-core instruction to split */ + StreamVmInstructionFlowMan *per_core_instr = (StreamVmInstructionFlowMan *)core_stream->m_vm.get_split_instruction(); + + per_core_instr->m_min_value = start; + per_core_instr->m_max_value = end; + + /* init value is the max value because the VM program's first iteration */ + per_core_instr->m_init_value = end; + + core_stream->vm_compile(); + + start = end + 1; + end = start + range_part - 1; + } + + return true; +} + + +bool +TrexVmSplitter::split_by_flow_client_var(const StreamVmInstructionFlowClient *instr) { + return false; +} + + +void +TrexVmSplitter::duplicate_vm() { + /* for each core - duplicate the instructions */ + for (TrexStream *core_stream : *m_core_streams) { + m_stream->m_vm.copy_instructions(core_stream->m_vm); + } +} diff --git a/src/stateless/cp/trex_vm_splitter.h b/src/stateless/cp/trex_vm_splitter.h new file mode 100644 index 00000000..37c61599 --- /dev/null +++ b/src/stateless/cp/trex_vm_splitter.h @@ -0,0 +1,54 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2015 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +#ifndef __TREX_VM_SPLITTER_H__ +#define __TREX_VM_SPLITTER_H__ + +#include + + +class TrexVmSplitter { + +public: + + TrexVmSplitter() { + m_dp_core_count = 0; + } + + /** + * split a stream's VM to per core streams + */ + void split(TrexStream *stream, std::vector core_streams); + + +private: + bool split_internal(); + bool split_by_flow_var(const StreamVmInstructionFlowMan *instr); + bool split_by_flow_client_var(const StreamVmInstructionFlowClient *instr); + + void duplicate_vm(); + + TrexStream *m_stream; + std::vector *m_core_streams; + uint8_t m_dp_core_count; +}; + + +#endif /* __TREX_VM_SPLITTER_H__ */ diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp index 585ff2c7..e0378cfb 100644 --- a/src/stateless/dp/trex_stateless_dp_core.cpp +++ b/src/stateless/dp/trex_stateless_dp_core.cpp @@ -470,7 +470,8 @@ TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port, node->m_cache_mbuf=0; node->m_type = CGenNode::STATELESS_PKT; - node->m_ref_stream_info = stream->clone_as_dp(); + /* clone the stream from control plane memory to DP memory */ + node->m_ref_stream_info = stream->clone(); node->m_next_stream=0; /* will be fixed later */ @@ -539,7 +540,7 @@ TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port, node->set_mbuf_cache_dir(dir); - if (stream->is_vm() == false ) { + if (node->m_ref_stream_info->getDpVm() == NULL) { /* no VM */ node->m_vm_flow_var = NULL; @@ -569,15 +570,15 @@ TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port, StreamVmDp * lpDpVm = local_mem_stream->getDpVm(); - node->m_vm_flow_var = lpDpVm->clone_bss(); /* clone the flow var */ - node->m_vm_program = lpDpVm->get_program(); /* same ref to the program */ - node->m_vm_program_size =lpDpVm->get_program_size(); + node->m_vm_flow_var = lpDpVm->clone_bss(); /* clone the flow var */ + node->m_vm_program = lpDpVm->get_program(); /* same ref to the program */ + node->m_vm_program_size = lpDpVm->get_program_size(); /* we need to copy the object */ - if ( pkt_size > stream->m_vm_prefix_size ) { + if ( pkt_size > lpDpVm->get_prefix_size() ) { /* we need const packet */ - uint16_t const_pkt_size = pkt_size - stream->m_vm_prefix_size ; + uint16_t const_pkt_size = pkt_size - lpDpVm->get_prefix_size() ; rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), const_pkt_size ); assert(m); @@ -585,17 +586,18 @@ TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port, assert(p); /* copy packet data */ - memcpy(p,(stream_pkt+ stream->m_vm_prefix_size),const_pkt_size); + memcpy(p,(stream_pkt + lpDpVm->get_prefix_size()),const_pkt_size); node->set_const_mbuf(m); } - if (stream->m_vm_prefix_size > pkt_size ) { - stream->m_vm_prefix_size = pkt_size; + if (lpDpVm->get_prefix_size() > pkt_size ) { + lpDpVm->set_prefix_size(pkt_size); } + /* copy the headr */ - uint16_t header_size = stream->m_vm_prefix_size; + uint16_t header_size = lpDpVm->get_prefix_size(); assert(header_size); node->alloc_prefix_header(header_size); uint8_t *p=node->m_original_packet_data_prefix; -- cgit 1.2.3-korg