diff options
Diffstat (limited to 'src')
-rwxr-xr-x | src/common/basic_utils.cpp | 44 | ||||
-rwxr-xr-x | src/common/basic_utils.h | 6 | ||||
-rwxr-xr-x | src/common/pcap.h | 5 | ||||
-rw-r--r-- | src/flow_stat.cpp | 4 | ||||
-rw-r--r-- | src/internal_api/trex_platform_api.h | 11 | ||||
-rw-r--r-- | src/main_dpdk.cpp | 90 | ||||
-rw-r--r-- | src/rpc-server/commands/trex_rpc_cmd_general.cpp | 293 | ||||
-rw-r--r-- | src/rpc-server/commands/trex_rpc_cmds.h | 20 | ||||
-rw-r--r-- | src/rpc-server/trex_rpc_cmds_table.cpp | 5 | ||||
-rw-r--r-- | src/stateless/cp/trex_stateless_port.cpp | 70 | ||||
-rw-r--r-- | src/stateless/cp/trex_stateless_port.h | 71 | ||||
-rw-r--r-- | src/stateless/messaging/trex_stateless_messaging.cpp | 47 | ||||
-rw-r--r-- | src/stateless/messaging/trex_stateless_messaging.h | 130 | ||||
-rw-r--r-- | src/stateless/rx/trex_stateless_rx_core.cpp | 327 | ||||
-rw-r--r-- | src/stateless/rx/trex_stateless_rx_core.h | 106 | ||||
-rw-r--r-- | src/stateless/rx/trex_stateless_rx_defs.h | 157 | ||||
-rw-r--r-- | src/stateless/rx/trex_stateless_rx_port_mngr.cpp | 253 | ||||
-rw-r--r-- | src/stateless/rx/trex_stateless_rx_port_mngr.h | 411 | ||||
-rw-r--r-- | src/trex_port_attr.cpp | 77 | ||||
-rwxr-xr-x | src/trex_port_attr.h | 192 |
20 files changed, 1937 insertions, 382 deletions
diff --git a/src/common/basic_utils.cpp b/src/common/basic_utils.cpp index f169c29f..dfd3b183 100755 --- a/src/common/basic_utils.cpp +++ b/src/common/basic_utils.cpp @@ -20,6 +20,10 @@ limitations under the License. #include <sstream> #include <sys/resource.h> +#include "pal_utl.h" + +int my_inet_pton4(const char *src, unsigned char *dst); + bool utl_is_file_exists (const std::string& name) { if (FILE *file = fopen(name.c_str(), "r")) { fclose(file); @@ -190,6 +194,26 @@ void utl_macaddr_to_str(const uint8_t *macaddr, std::string &output) { } +std::string utl_macaddr_to_str(const uint8_t *macaddr) { + std::string tmp; + utl_macaddr_to_str(macaddr, tmp); + + return tmp; +} + +bool utl_str_to_macaddr(const std::string &s, uint8_t *mac) { + int last = -1; + int rc = sscanf(s.c_str(), "%hhx:%hhx:%hhx:%hhx:%hhx:%hhx%n", + mac + 0, mac + 1, mac + 2, mac + 3, mac + 4, mac + 5, + &last); + + if ( (rc != 6) || (s.size() != last) ) { + return false; + } + + return true; +} + /** * generate a random connection handler * @@ -248,3 +272,23 @@ void utl_set_coredump_size(long size, bool map_huge_pages) { fprintf(fp, "%08x\n", mask); fclose(fp); } + +uint32_t utl_ipv4_to_uint32(const char *ipv4_str, uint32_t &ipv4_num) { + + uint32_t tmp; + + int rc = my_inet_pton4(ipv4_str, (unsigned char *)&tmp); + if (!rc) { + return (0); + } + + ipv4_num = PAL_NTOHL(tmp); + + return (1); +} + +std::string utl_uint32_to_ipv4(uint32_t ipv4_addr) { + std::stringstream ss; + ss << ((ipv4_addr >> 24) & 0xff) << "." << ((ipv4_addr >> 16) & 0xff) << "." << ((ipv4_addr >> 8) & 0xff) << "." << (ipv4_addr & 0xff); + return ss.str(); +} diff --git a/src/common/basic_utils.h b/src/common/basic_utils.h index f6250a2b..ab0ff1ec 100755 --- a/src/common/basic_utils.h +++ b/src/common/basic_utils.h @@ -86,6 +86,9 @@ bool utl_is_file_exists (const std::string& name) ; void utl_macaddr_to_str(const uint8_t *macaddr, std::string &output); +std::string utl_macaddr_to_str(const uint8_t *macaddr); +bool utl_str_to_macaddr(const std::string &s, uint8_t *mac); + std::string utl_generate_random_str(unsigned int &seed, int len); /** @@ -98,6 +101,9 @@ std::string utl_generate_random_str(unsigned int &seed, int len); */ void utl_set_coredump_size(long size, bool map_huge_pages = false); +uint32_t utl_ipv4_to_uint32(const char *ipv4_str, uint32_t &ipv4_num); +std::string utl_uint32_to_ipv4(uint32_t ipv4_addr); + #endif diff --git a/src/common/pcap.h b/src/common/pcap.h index 3f8dfd21..c9139e4c 100755 --- a/src/common/pcap.h +++ b/src/common/pcap.h @@ -1,5 +1,5 @@ -#ifndef __LIBPCAP_H__ -#define __LIBPCAP_H__ +#ifndef __TREX_LIBPCAP_H__ +#define __TREX_LIBPCAP_H__ /* Copyright (c) 2015-2015 Cisco Systems, Inc. @@ -151,4 +151,5 @@ private: bool m_is_open; uint32_t m_pkt_count; }; + #endif diff --git a/src/flow_stat.cpp b/src/flow_stat.cpp index 84be590f..dae29795 100644 --- a/src/flow_stat.cpp +++ b/src/flow_stat.cpp @@ -968,9 +968,9 @@ void CFlowStatRuleMgr::send_start_stop_msg_to_rx(bool is_start) { TrexStatelessCpToRxMsgBase *msg; if (is_start) { - msg = new TrexStatelessRxStartMsg(); + msg = new TrexStatelessRxEnableLatency(); } else { - msg = new TrexStatelessRxStopMsg(); + msg = new TrexStatelessRxDisableLatency(); } m_ring_to_rx->Enqueue((CGenNode *)msg); } diff --git a/src/internal_api/trex_platform_api.h b/src/internal_api/trex_platform_api.h index 631f9a3e..5723503c 100644 --- a/src/internal_api/trex_platform_api.h +++ b/src/internal_api/trex_platform_api.h @@ -28,6 +28,7 @@ limitations under the License. #include <string.h> #include "flow_stat_parser.h" #include "trex_defs.h" +#include "trex_stateless_rx_defs.h" #include "trex_port_attr.h" #include <json/json.h> @@ -112,19 +113,13 @@ public: IF_STAT_RX_BYTES_COUNT = 8, // Card support counting rx bytes }; - struct mac_cfg_st { - uint8_t hw_macaddr[6]; - uint8_t src_macaddr[6]; - uint8_t dst_macaddr[6]; - }; - /** * interface static info * */ struct intf_info_st { std::string driver_name; - mac_cfg_st mac_info; + uint8_t hw_macaddr[6]; std::string pci_addr; int numa_node; bool has_crc; @@ -234,7 +229,7 @@ public: info.has_crc = true; info.numa_node = 0; - memset(&info.mac_info, 0, sizeof(info.mac_info)); + memset(&info.hw_macaddr, 0, sizeof(info.hw_macaddr)); } virtual void get_interface_stats(uint8_t interface_id, TrexPlatformInterfaceStats &stats) const { diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp index ab154b67..e799a5bd 100644 --- a/src/main_dpdk.cpp +++ b/src/main_dpdk.cpp @@ -316,7 +316,7 @@ public: | TrexPlatformApi::IF_STAT_PAYLOAD; } virtual CFlowStatParser *get_flow_stat_parser(); - virtual int set_rcv_all(CPhyEthIF * _if, bool set_on) {return 0;} + virtual int set_rcv_all(CPhyEthIF * _if, bool set_on) {return -ENOTSUP;} }; class CTRexExtendedDriverBase40G : public CTRexExtendedDriverBase10G { @@ -1624,6 +1624,7 @@ int DpdkTRexPortAttr::set_led(bool on){ int DpdkTRexPortAttr::get_flow_ctrl(int &mode) { int ret = rte_eth_dev_flow_ctrl_get(m_port_id, &fc_conf_tmp); if (ret) { + mode = -1; return ret; } mode = (int) fc_conf_tmp.mode; @@ -1720,12 +1721,20 @@ bool DpdkTRexPortAttr::update_link_status_nowait(){ rte_eth_link new_link; bool changed = false; rte_eth_link_get_nowait(m_port_id, &new_link); + + /* if the link got down - update the dest atribute to move to unresolved */ + if (new_link.link_status != m_link.link_status) { + get_dest().on_link_down(); + changed = true; + } + + /* other changes */ if (new_link.link_speed != m_link.link_speed || new_link.link_duplex != m_link.link_duplex || - new_link.link_autoneg != m_link.link_autoneg || - new_link.link_status != m_link.link_status) { + new_link.link_autoneg != m_link.link_autoneg) { changed = true; } + m_link = new_link; return changed; } @@ -1767,7 +1776,7 @@ bool DpdkTRexPortAttr::get_promiscuous(){ } -void DpdkTRexPortAttr::macaddr_get(struct ether_addr *mac_addr){ +void DpdkTRexPortAttr::get_hw_src_mac(struct ether_addr *mac_addr){ rte_eth_macaddr_get(m_port_id , mac_addr); } @@ -3217,6 +3226,7 @@ void CGlobalTRex::pre_test() { exit(1); } memcpy(CGlobalInfo::m_options.m_mac_addr[port_id].u.m_mac.dest, mac, ETHER_ADDR_LEN); + // if port is connected in loopback, no need to send gratuitous ARP. It will only confuse our ingress counters. if (pretest.is_loopback(port_id)) CGlobalInfo::m_options.m_ip_cfg[port_id].set_grat_arp_needed(false); @@ -3229,6 +3239,18 @@ void CGlobalTRex::pre_test() { // Configure port back to normal mode. Only relevant packets handled by software. CTRexExtendedDriverDb::Ins()->get_drv()->set_rcv_all(pif, false); + + + /* set resolved IPv4 */ + uint32_t dg = CGlobalInfo::m_options.m_ip_cfg[port_id].get_def_gw(); + const uint8_t *dst_mac = CGlobalInfo::m_options.m_mac_addr[port_id].u.m_mac.dest; + if (dg) { + m_ports[port_id].get_port_attr()->get_dest().set_dest_ipv4(dg, dst_mac); + } else { + m_ports[port_id].get_port_attr()->get_dest().set_dest_mac(dst_mac); + } + + } } @@ -4293,18 +4315,9 @@ CGlobalTRex:: publish_async_port_attr_changed(uint8_t port_id) { Json::Value data; data["port_id"] = port_id; TRexPortAttr * _attr = m_ports[port_id].get_port_attr(); - - /* attributes */ - data["attr"]["speed"] = _attr->get_link_speed(); - data["attr"]["promiscuous"]["enabled"] = _attr->get_promiscuous(); - data["attr"]["link"]["up"] = _attr->is_link_up(); - int mode; - int ret = _attr->get_flow_ctrl(mode); - if (ret != 0) { - mode = -1; - } - data["attr"]["fc"]["mode"] = mode; - + + _attr->to_json(data["attr"]); + m_zmq_publisher.publish_event(TrexPublisher::EVENT_PORT_ATTR_CHANGED, data); } @@ -4808,6 +4821,18 @@ bool CPhyEthIF::Create(uint8_t portid) { m_last_tx_pps = 0.0; m_port_attr = g_trex.m_drv->create_port_attr(portid); + + uint32_t src_ipv4 = CGlobalInfo::m_options.m_ip_cfg[m_port_id].get_ip(); + if (src_ipv4) { + m_port_attr->set_src_ipv4(src_ipv4); + } + + /* for now set as unresolved IPv4 destination */ + uint32_t dest_ipv4 = CGlobalInfo::m_options.m_ip_cfg[m_port_id].get_def_gw(); + if (dest_ipv4) { + m_port_attr->get_dest().set_dest_ipv4(dest_ipv4); + } + return true; } @@ -5580,7 +5605,7 @@ CFlowStatParser *CTRexExtendedDriverBase::get_flow_stat_parser() { // in 1G we need to wait if links became ready to soon void CTRexExtendedDriverBase1G::wait_after_link_up(){ - wait_x_sec(6 + CGlobalInfo::m_options.m_wait_before_traffic); + //wait_x_sec(6 + CGlobalInfo::m_options.m_wait_before_traffic); } int CTRexExtendedDriverBase1G::wait_for_stable_link(){ @@ -6993,19 +7018,10 @@ TrexDpdkPlatformApi::get_interface_info(uint8_t interface_id, intf_info_st &info /* mac INFO */ /* hardware */ - g_trex.m_ports[interface_id].get_port_attr()->macaddr_get(&rte_mac_addr); + g_trex.m_ports[interface_id].get_port_attr()->get_hw_src_mac(&rte_mac_addr); assert(ETHER_ADDR_LEN == 6); - /* software */ - uint8_t sw_macaddr[12]; - memcpy(sw_macaddr, CGlobalInfo::m_options.get_dst_src_mac_addr(interface_id), 12); - - for (int i = 0; i < 6; i++) { - info.mac_info.hw_macaddr[i] = rte_mac_addr.addr_bytes[i]; - info.mac_info.dst_macaddr[i] = sw_macaddr[i]; - info.mac_info.src_macaddr[i] = sw_macaddr[6 + i]; - - } + memcpy(info.hw_macaddr, rte_mac_addr.addr_bytes, 6); info.numa_node = g_trex.m_ports[interface_id].m_dev_info.pci_dev->numa_node; struct rte_pci_addr *loc = &g_trex.m_ports[interface_id].m_dev_info.pci_dev->addr; @@ -7111,6 +7127,21 @@ TRexPortAttr *TrexDpdkPlatformApi::getPortAttrObj(uint8_t port_id) const { return g_trex.m_ports[port_id].get_port_attr(); } + +int DpdkTRexPortAttr::set_rx_filter_mode(rx_filter_mode_e rx_filter_mode) { + + CPhyEthIF *_if = &g_trex.m_ports[m_port_id]; + bool recv_all = (rx_filter_mode == RX_FILTER_MODE_ALL); + int rc = CTRexExtendedDriverDb::Ins()->get_drv()->set_rcv_all(_if, recv_all); + if (rc != 0) { + return (rc); + } + + m_rx_filter_mode = rx_filter_mode; + + return (0); +} + /** * marks the control plane for a total server shutdown * @@ -7119,3 +7150,6 @@ TRexPortAttr *TrexDpdkPlatformApi::getPortAttrObj(uint8_t port_id) const { void TrexDpdkPlatformApi::mark_for_shutdown() const { g_trex.mark_for_shutdown(CGlobalTRex::SHUTDOWN_RPC_REQ); } + + + diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp index 109cc1a4..14b38165 100644 --- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp +++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp @@ -27,6 +27,8 @@ limitations under the License. #include <internal_api/trex_platform_api.h> +#include "trex_stateless_rx_core.h" + #include <fstream> #include <iostream> #include <unistd.h> @@ -289,19 +291,15 @@ TrexRpcCmdGetSysInfo::_run(const Json::Value ¶ms, Json::Value &result) { section["ports"] = Json::arrayValue; for (int i = 0; i < main->get_port_count(); i++) { - uint32_t speed; string driver; - string hw_macaddr; - string src_macaddr; - string dst_macaddr; string pci_addr; string description; supp_speeds_t supp_speeds; int numa; TrexStatelessPort *port = main->get_port_by_id(i); - port->get_properties(driver, speed); - port->get_macaddr(hw_macaddr, src_macaddr, dst_macaddr); + + port->get_properties(driver); port->get_pci_info(pci_addr, numa); main->get_platform_api()->getPortAttrObj(i)->get_description(description); @@ -311,12 +309,9 @@ TrexRpcCmdGetSysInfo::_run(const Json::Value ¶ms, Json::Value &result) { section["ports"][i]["driver"] = driver; section["ports"][i]["description"] = description; - section["ports"][i]["hw_macaddr"] = hw_macaddr; - section["ports"][i]["src_macaddr"] = src_macaddr; - section["ports"][i]["dst_macaddr"] = dst_macaddr; - section["ports"][i]["pci_addr"] = pci_addr; - section["ports"][i]["numa"] = numa; + section["ports"][i]["pci_addr"] = pci_addr; + section["ports"][i]["numa"] = numa; uint16_t caps = port->get_rx_caps(); section["ports"][i]["rx"]["caps"] = Json::arrayValue; @@ -330,7 +325,6 @@ TrexRpcCmdGetSysInfo::_run(const Json::Value ¶ms, Json::Value &result) { section["ports"][i]["rx"]["caps"].append("rx_bytes"); } section["ports"][i]["rx"]["counters"] = port->get_rx_count_num(); - section["ports"][i]["speed"] = (uint16_t) speed / 1000; section["ports"][i]["is_fc_supported"] = get_stateless_obj()->get_platform_api()->getPortAttrObj(i)->is_fc_change_supported(); section["ports"][i]["is_led_supported"] = get_stateless_obj()->get_platform_api()->getPortAttrObj(i)->is_led_change_supported(); section["ports"][i]["is_link_supported"] = get_stateless_obj()->get_platform_api()->getPortAttrObj(i)->is_link_change_supported(); @@ -345,6 +339,68 @@ TrexRpcCmdGetSysInfo::_run(const Json::Value ¶ms, Json::Value &result) { return (TREX_RPC_CMD_OK); } + +int +TrexRpcCmdSetPortAttr::parse_rx_filter_mode(const Json::Value &msg, uint8_t port_id, Json::Value &result) { + const std::string type = parse_choice(msg, "mode", {"hw", "all"}, result); + + rx_filter_mode_e filter_mode; + if (type == "hw") { + filter_mode = RX_FILTER_MODE_HW; + } else if (type == "all") { + filter_mode = RX_FILTER_MODE_ALL; + } else { + assert(0); + } + + return get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id)->set_rx_filter_mode(filter_mode); +} + +int +TrexRpcCmdSetPortAttr::parse_ipv4(const Json::Value &msg, uint8_t port_id, Json::Value &result) { + + const std::string ipv4_str = parse_string(msg, "addr", result); + + uint32_t ipv4_addr; + if (!utl_ipv4_to_uint32(ipv4_str.c_str(), ipv4_addr)) { + std::stringstream ss; + ss << "invalid IPv4 address: '" << ipv4_str << "'"; + generate_parse_err(result, ss.str()); + } + + get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id)->set_src_ipv4(ipv4_addr); + return (0); +} + +int +TrexRpcCmdSetPortAttr::parse_dest(const Json::Value &msg, uint8_t port_id, Json::Value &result) { + + /* can be either IPv4 or MAC */ + const std::string addr = parse_string(msg, "addr", result); + + TRexPortAttr *port_attr = get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id); + + /* try IPv4 */ + uint32_t ipv4_addr; + uint8_t mac[6]; + + if (utl_ipv4_to_uint32(addr.c_str(), ipv4_addr)) { + port_attr->get_dest().set_dest_ipv4(ipv4_addr); + + } else if (utl_str_to_macaddr(addr, mac)) { + port_attr->get_dest().set_dest_mac(mac); + + } else { + std::stringstream ss; + ss << "'dest' is not an IPv4 address or a MAC address: '" << addr << "'"; + generate_parse_err(result, ss.str()); + } + + + return (0); +} + + /** * set port commands * @@ -361,46 +417,64 @@ TrexRpcCmdSetPortAttr::_run(const Json::Value ¶ms, Json::Value &result) { uint8_t port_id = parse_port(params, result); const Json::Value &attr = parse_object(params, "attr", result); + int ret = 0; - bool changed = false; + /* iterate over all attributes in the dict */ for (const std::string &name : attr.getMemberNames()) { + if (name == "promiscuous") { bool enabled = parse_bool(attr[name], "enabled", result); ret = get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id)->set_promiscuous(enabled); } + else if (name == "link_status") { bool up = parse_bool(attr[name], "up", result); ret = get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id)->set_link_up(up); } + else if (name == "led_status") { bool on = parse_bool(attr[name], "on", result); ret = get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id)->set_led(on); - } else if (name == "flow_ctrl_mode") { + } + + else if (name == "flow_ctrl_mode") { int mode = parse_int(attr[name], "mode", result); ret = get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id)->set_flow_ctrl(mode); - } else { - generate_execute_err(result, "Not recognized attribute: " + name); - break; } - if (ret != 0){ - if ( ret == -ENOTSUP ) { - generate_execute_err(result, "Error applying " + name + ": operation is not supported for this NIC."); - } - else if (ret) { - generate_execute_err(result, "Error applying " + name + " attribute, return value: " + to_string(ret)); - } + + else if (name == "rx_filter_mode") { + const Json::Value &rx = parse_object(attr, name, result); + ret = parse_rx_filter_mode(rx, port_id, result); + } + + else if (name == "ipv4") { + const Json::Value &ipv4 = parse_object(attr, name, result); + ret = parse_ipv4(ipv4, port_id, result); + } + + else if (name == "dest") { + const Json::Value &dest = parse_object(attr, name, result); + ret = parse_dest(dest, port_id, result); + } + + /* unknown attribute */ + else { + generate_execute_err(result, "unknown attribute type: '" + name + "'"); break; - } else { - changed = true; } - } - if (changed) { - get_stateless_obj()->get_platform_api()->publish_async_port_attr_changed(port_id); - } + /* check error code */ + if ( ret == -ENOTSUP ) { + generate_execute_err(result, "Error applying " + name + ": operation is not supported for this NIC."); + } else if (ret) { + generate_execute_err(result, "Error applying " + name + " attribute, return value: " + to_string(ret)); + } + } + result["result"] = Json::objectValue; return (TREX_RPC_CMD_OK); + } @@ -568,17 +642,12 @@ TrexRpcCmdGetPortStatus::_run(const Json::Value ¶ms, Json::Value &result) { result["result"]["owner"] = (port->get_owner().is_free() ? "" : port->get_owner().get_name()); result["result"]["state"] = port->get_state_as_string(); result["result"]["max_stream_id"] = port->get_max_stream_id(); - result["result"]["speed"] = get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id)->get_link_speed(); /* attributes */ - result["result"]["attr"]["promiscuous"]["enabled"] = get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id)->get_promiscuous(); - result["result"]["attr"]["link"]["up"] = get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id)->is_link_up(); - int mode; - int ret = get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id)->get_flow_ctrl(mode); - if (ret != 0) { - mode = -1; - } - result["result"]["attr"]["fc"]["mode"] = mode; + get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id)->to_json(result["result"]["attr"]); + + /* RX info */ + port->get_rx_features().to_json(result["result"]["rx_info"]); return (TREX_RPC_CMD_OK); } @@ -640,3 +709,149 @@ TrexRpcCmdPushRemote::_run(const Json::Value ¶ms, Json::Value &result) { } +/** + * set on/off RX software receive mode + * + */ +trex_rpc_cmd_rc_e +TrexRpcCmdSetRxFeature::_run(const Json::Value ¶ms, Json::Value &result) { + + uint8_t port_id = parse_port(params, result); + TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id); + + /* decide which feature is being set */ + const std::string type = parse_choice(params, "type", {"capture", "queue", "server"}, result); + + if (type == "capture") { + parse_capture_msg(params, port, result); + } else if (type == "queue") { + parse_queue_msg(params, port, result); + } else if (type == "server") { + parse_server_msg(params, port, result); + } else { + assert(0); + } + + result["result"] = Json::objectValue; + return (TREX_RPC_CMD_OK); + +} + +void +TrexRpcCmdSetRxFeature::parse_capture_msg(const Json::Value &msg, TrexStatelessPort *port, Json::Value &result) { + + bool enabled = parse_bool(msg, "enabled", result); + + if (enabled) { + + std::string pcap_filename = parse_string(msg, "pcap_filename", result); + uint64_t limit = parse_uint32(msg, "limit", result); + + if (limit == 0) { + generate_parse_err(result, "limit cannot be zero"); + } + + try { + port->start_rx_capture(pcap_filename, limit); + } catch (const TrexException &ex) { + generate_execute_err(result, ex.what()); + } + + } else { + + try { + port->stop_rx_capture(); + } catch (const TrexException &ex) { + generate_execute_err(result, ex.what()); + } + + } + +} + +void +TrexRpcCmdSetRxFeature::parse_queue_msg(const Json::Value &msg, TrexStatelessPort *port, Json::Value &result) { + bool enabled = parse_bool(msg, "enabled", result); + + if (enabled) { + + uint64_t size = parse_uint32(msg, "size", result); + + if (size == 0) { + generate_parse_err(result, "queue size cannot be zero"); + } + + try { + port->start_rx_queue(size); + } catch (const TrexException &ex) { + generate_execute_err(result, ex.what()); + } + + } else { + + try { + port->stop_rx_queue(); + } catch (const TrexException &ex) { + generate_execute_err(result, ex.what()); + } + + } + +} + +void +TrexRpcCmdSetRxFeature::parse_server_msg(const Json::Value &msg, TrexStatelessPort *port, Json::Value &result) { +} + + +trex_rpc_cmd_rc_e +TrexRpcCmdGetRxQueuePkts::_run(const Json::Value ¶ms, Json::Value &result) { + + uint8_t port_id = parse_port(params, result); + + TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id); + + try { + const RxPacketBuffer *pkt_buffer = port->get_rx_queue_pkts(); + if (pkt_buffer) { + result["result"]["pkts"] = pkt_buffer->to_json(); + } else { + result["result"]["pkts"] = Json::arrayValue; + } + + } catch (const TrexException &ex) { + generate_execute_err(result, ex.what()); + } + + + return (TREX_RPC_CMD_OK); +} + +trex_rpc_cmd_rc_e +TrexRpcCmdSetARPRes::_run(const Json::Value ¶ms, Json::Value &result) { + uint8_t port_id = parse_port(params, result); + + TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id); + + const std::string ipv4_str = parse_string(params, "ipv4", result); + const std::string mac_str = parse_string(params, "mac", result); + + uint32_t ipv4_addr; + if (!utl_ipv4_to_uint32(ipv4_str.c_str(), ipv4_addr)) { + std::stringstream ss; + ss << "invalid IPv4 address: '" << ipv4_str << "'"; + generate_parse_err(result, ss.str()); + } + + uint8_t mac[6]; + if (!utl_str_to_macaddr(mac_str, mac)) { + std::stringstream ss; + ss << "'invalid MAC address: '" << mac_str << "'"; + generate_parse_err(result, ss.str()); + } + + port->getPortAttrObj()->get_dest().set_dest_ipv4(ipv4_addr, mac); + + return (TREX_RPC_CMD_OK); + +} diff --git a/src/rpc-server/commands/trex_rpc_cmds.h b/src/rpc-server/commands/trex_rpc_cmds.h index 5fde1d0c..2b2178e2 100644 --- a/src/rpc-server/commands/trex_rpc_cmds.h +++ b/src/rpc-server/commands/trex_rpc_cmds.h @@ -27,6 +27,7 @@ limitations under the License. #include <memory> class TrexStream; +class TrexStatelessPort; /* all the RPC commands decl. goes here */ @@ -89,10 +90,17 @@ TREX_RPC_CMD_DEFINE(TrexRpcCmdRelease, "release", 1, true, APIClass: */ TREX_RPC_CMD_DEFINE(TrexRpcCmdGetPortStats, "get_port_stats", 1, false, APIClass::API_CLASS_TYPE_CORE); TREX_RPC_CMD_DEFINE(TrexRpcCmdGetPortStatus, "get_port_status", 1, false, APIClass::API_CLASS_TYPE_CORE); -TREX_RPC_CMD_DEFINE(TrexRpcCmdSetPortAttr, "set_port_attr", 2, true, APIClass::API_CLASS_TYPE_CORE); TREX_RPC_CMD_DEFINE(TrexRpcCmdGetPortXStatsValues, "get_port_xstats_values", 1, false, APIClass::API_CLASS_TYPE_CORE); TREX_RPC_CMD_DEFINE(TrexRpcCmdGetPortXStatsNames, "get_port_xstats_names", 1, false, APIClass::API_CLASS_TYPE_CORE); +TREX_RPC_CMD_DEFINE_EXTENDED(TrexRpcCmdSetPortAttr, "set_port_attr", 2, true, APIClass::API_CLASS_TYPE_CORE, + + int parse_rx_filter_mode(const Json::Value &msg, uint8_t port_id, Json::Value &result); + int parse_ipv4(const Json::Value &msg, uint8_t port_id, Json::Value &result); + int parse_dest(const Json::Value &msg, uint8_t port_id, Json::Value &result); +); + + /** * stream cmds */ @@ -144,5 +152,15 @@ TREX_RPC_CMD_DEFINE(TrexRpcCmdPushRemote, "push_remote", 6, true, APIClass::API_ TREX_RPC_CMD_DEFINE(TrexRpcCmdShutdown, "shutdown", 2, false, APIClass::API_CLASS_TYPE_CORE); +TREX_RPC_CMD_DEFINE_EXTENDED(TrexRpcCmdSetRxFeature, "set_rx_feature", 3, false, APIClass::API_CLASS_TYPE_CORE, + void parse_capture_msg(const Json::Value &msg, TrexStatelessPort *port, Json::Value &result); + void parse_queue_msg(const Json::Value &msg, TrexStatelessPort *port, Json::Value &result); + void parse_server_msg(const Json::Value &msg, TrexStatelessPort *port, Json::Value &result); + +); + +TREX_RPC_CMD_DEFINE(TrexRpcCmdGetRxQueuePkts, "get_rx_queue_pkts", 2, false, APIClass::API_CLASS_TYPE_CORE); +TREX_RPC_CMD_DEFINE(TrexRpcCmdSetARPRes, "set_arp_resolution", 2, false, APIClass::API_CLASS_TYPE_CORE); + #endif /* __TREX_RPC_CMD_H__ */ diff --git a/src/rpc-server/trex_rpc_cmds_table.cpp b/src/rpc-server/trex_rpc_cmds_table.cpp index cddf19b9..919be1f1 100644 --- a/src/rpc-server/trex_rpc_cmds_table.cpp +++ b/src/rpc-server/trex_rpc_cmds_table.cpp @@ -71,6 +71,11 @@ TrexRpcCommandsTable::TrexRpcCommandsTable() { register_command(new TrexRpcCmdPushRemote()); register_command(new TrexRpcCmdShutdown()); + + register_command(new TrexRpcCmdSetRxFeature()); + register_command(new TrexRpcCmdGetRxQueuePkts()); + + register_command(new TrexRpcCmdSetARPRes()); } diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp index 9bb20990..d4bc5c36 100644 --- a/src/stateless/cp/trex_stateless_port.cpp +++ b/src/stateless/cp/trex_stateless_port.cpp @@ -25,6 +25,7 @@ limitations under the License. #include <trex_streams_compiler.h> #include <common/basic_utils.h> #include <common/captureFile.h> +#include "trex_stateless_rx_defs.h" #include <string> @@ -156,9 +157,9 @@ private: TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api) : m_dp_events(this) { std::vector<std::pair<uint8_t, uint8_t>> core_pair_list; - m_port_id = port_id; - m_port_state = PORT_STATE_IDLE; - m_platform_api = api; + m_port_id = port_id; + m_port_state = PORT_STATE_IDLE; + m_platform_api = api; /* get the platform specific data */ api->get_interface_info(port_id, m_api_info); @@ -584,10 +585,9 @@ TrexStatelessPort::get_max_stream_id() const { } void -TrexStatelessPort::get_properties(std::string &driver, uint32_t &speed) { +TrexStatelessPort::get_properties(std::string &driver) { driver = m_api_info.driver_name; - speed = m_platform_api->getPortAttrObj(m_port_id)->get_link_speed(); } bool @@ -888,16 +888,6 @@ TrexStatelessPort::get_port_effective_rate(double &pps, } void -TrexStatelessPort::get_macaddr(std::string &hw_macaddr, - std::string &src_macaddr, - std::string &dst_macaddr) { - - utl_macaddr_to_str(m_api_info.mac_info.hw_macaddr, hw_macaddr); - utl_macaddr_to_str(m_api_info.mac_info.src_macaddr, src_macaddr); - utl_macaddr_to_str(m_api_info.mac_info.dst_macaddr, dst_macaddr); -} - -void TrexStatelessPort::get_pci_info(std::string &pci_addr, int &numa_node) { pci_addr = m_api_info.pci_addr; numa_node = m_api_info.numa_node; @@ -944,6 +934,56 @@ TrexStatelessPort::remove_and_delete_all_streams() { } } +void +TrexStatelessPort::start_rx_capture(const std::string &pcap_filename, uint64_t limit) { + + m_rx_features_info.m_rx_capture_info.enable(pcap_filename, limit); + + TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStartCapture(m_port_id, m_rx_features_info.m_rx_capture_info); + send_message_to_rx(msg); +} + +void +TrexStatelessPort::stop_rx_capture() { + TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStopCapture(m_port_id); + send_message_to_rx(msg); + m_rx_features_info.m_rx_capture_info.disable(); +} + +void +TrexStatelessPort::start_rx_queue(uint64_t size) { + + m_rx_features_info.m_rx_queue_info.enable(size); + + TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStartQueue(m_port_id, m_rx_features_info.m_rx_queue_info); + send_message_to_rx(msg); +} + +void +TrexStatelessPort::stop_rx_queue() { + TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStopQueue(m_port_id); + send_message_to_rx(msg); + m_rx_features_info.m_rx_queue_info.disable(); +} + + +RxPacketBuffer * +TrexStatelessPort::get_rx_queue_pkts() { + + if (m_rx_features_info.m_rx_queue_info.is_empty()) { + return NULL; + } + + /* ask RX core for the pkt queue */ + TrexStatelessMsgReply<RxPacketBuffer *> msg_reply; + + TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxQueueGetPkts(m_port_id, msg_reply); + send_message_to_rx(msg); + + RxPacketBuffer *pkt_buffer = msg_reply.wait_for_reply(); + return pkt_buffer; +} + /************* Trex Port Owner **************/ TrexPortOwner::TrexPortOwner() { diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h index e2a2aeba..cf6b2716 100644 --- a/src/stateless/cp/trex_stateless_port.h +++ b/src/stateless/cp/trex_stateless_port.h @@ -24,12 +24,15 @@ limitations under the License. #include "common/basic_utils.h" #include "internal_api/trex_platform_api.h" #include "trex_dp_port_events.h" +#include "trex_stateless_rx_defs.h" #include "trex_stream.h" class TrexStatelessCpToDpMsgBase; class TrexStatelessCpToRxMsgBase; class TrexStreamsGraphObj; class TrexPortMultiplier; +class RxPacketBuffer; + /** * TRex port owner can perform @@ -255,11 +258,8 @@ public: * @author imarom (16-Sep-15) * * @param driver - * @param speed */ - void get_properties(std::string &driver, uint32_t &speed); - - + void get_properties(std::string &driver); /** * encode stats as JSON @@ -362,14 +362,60 @@ public: double &bps_L2, double &percentage); + void get_pci_info(std::string &pci_addr, int &numa_node); - void get_macaddr(std::string &hw_macaddr, - std::string &src_macaddr, - std::string &dst_macaddr); - void get_pci_info(std::string &pci_addr, int &numa_node); + /** + * enable RX capture on port + * + */ + void start_rx_capture(const std::string &pcap_filename, uint64_t limit); + /** + * disable RX capture if on + * + */ + void stop_rx_capture(); + + /** + * start RX queueing of packets + * + * @author imarom (11/7/2016) + * + * @param limit + */ + void start_rx_queue(uint64_t limit); + /** + * stop RX queueing + * + * @author imarom (11/7/2016) + */ + void stop_rx_queue(); + + + /** + * get the RX features info object + * + */ + const RXFeaturesInfo &get_rx_features() { + return m_rx_features_info; + } + + /** + * fetch the RX queue packets from the queue + * + */ + RxPacketBuffer *get_rx_queue_pkts(); + + /** + * return the port attribute object + * + */ + TRexPortAttr *getPortAttrObj() { + return m_platform_api->getPortAttrObj(m_port_id); + } + private: bool is_core_active(int core_id); @@ -456,6 +502,9 @@ private: TrexPortOwner m_owner; int m_pending_async_stop_event; + + RXFeaturesInfo m_rx_features_info; + }; @@ -502,9 +551,9 @@ public: static const std::initializer_list<std::string> g_types; static const std::initializer_list<std::string> g_ops; - mul_type_e m_type; - mul_op_e m_op; - double m_value; + mul_type_e m_type; + mul_op_e m_op; + double m_value; }; #endif /* __TREX_STATELESS_PORT_H__ */ diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp index 95613b41..c2182f3c 100644 --- a/src/stateless/messaging/trex_stateless_messaging.cpp +++ b/src/stateless/messaging/trex_stateless_messaging.cpp @@ -241,13 +241,13 @@ TrexDpPortEventMsg::handle() { } /************************* messages from CP to RX **********************/ -bool TrexStatelessRxStartMsg::handle (CRxCoreStateless *rx_core) { - rx_core->work(); +bool TrexStatelessRxEnableLatency::handle (CRxCoreStateless *rx_core) { + rx_core->enable_latency(); return true; } -bool TrexStatelessRxStopMsg::handle (CRxCoreStateless *rx_core) { - rx_core->idle(); +bool TrexStatelessRxDisableLatency::handle (CRxCoreStateless *rx_core) { + rx_core->disable_latency(); return true; } @@ -255,3 +255,42 @@ bool TrexStatelessRxQuit::handle (CRxCoreStateless *rx_core) { rx_core->quit(); return true; } + + +bool +TrexStatelessRxStartCapture::handle(CRxCoreStateless *rx_core) { + rx_core->start_capture(m_port_id, m_pcap_filename, m_limit, m_shared_counter); + + return true; +} + +bool +TrexStatelessRxStopCapture::handle(CRxCoreStateless *rx_core) { + rx_core->stop_capture(m_port_id); + + return true; +} + +bool +TrexStatelessRxStartQueue::handle(CRxCoreStateless *rx_core) { + rx_core->start_queue(m_port_id, m_size, m_shared_counter); + + return true; +} + +bool +TrexStatelessRxStopQueue::handle(CRxCoreStateless *rx_core) { + rx_core->stop_queue(m_port_id); + + return true; +} + + + +bool TrexStatelessRxQueueGetPkts::handle(CRxCoreStateless *rx_core) { + RxPacketBuffer *pkt_buffer = rx_core->get_rx_queue_pkts(m_port_id); + assert(pkt_buffer); + m_reply.set(pkt_buffer); + + return true; +} diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h index fb2c27ab..52b1662e 100644 --- a/src/stateless/messaging/trex_stateless_messaging.h +++ b/src/stateless/messaging/trex_stateless_messaging.h @@ -24,11 +24,15 @@ limitations under the License. #include "msg_manager.h" #include "trex_dp_port_events.h" +#include "trex_exception.h" +#include "trex_stateless_rx_defs.h" +#include "os_time.h" class TrexStatelessDpCore; class CRxCoreStateless; class TrexStreamsCompiledObj; class CFlowGenListPerThread; +class RxPacketBuffer; /** * defines the base class for CP to DP messages @@ -312,7 +316,7 @@ private: /************************* messages from DP to CP **********************/ /** - * defines the base class for CP to DP messages + * defines the base class for DP to CP messages * * @author imarom (27-Oct-15) */ @@ -404,11 +408,11 @@ public: }; -class TrexStatelessRxStartMsg : public TrexStatelessCpToRxMsgBase { +class TrexStatelessRxEnableLatency : public TrexStatelessCpToRxMsgBase { bool handle (CRxCoreStateless *rx_core); }; -class TrexStatelessRxStopMsg : public TrexStatelessCpToRxMsgBase { +class TrexStatelessRxDisableLatency : public TrexStatelessCpToRxMsgBase { bool handle (CRxCoreStateless *rx_core); }; @@ -416,4 +420,124 @@ class TrexStatelessRxQuit : public TrexStatelessCpToRxMsgBase { bool handle (CRxCoreStateless *rx_core); }; + +class TrexStatelessRxStartCapture : public TrexStatelessCpToRxMsgBase { +public: + TrexStatelessRxStartCapture(uint8_t port_id, RXCaptureInfo &rx_capture_info) { + m_port_id = port_id; + m_limit = rx_capture_info.m_limit; + m_pcap_filename = rx_capture_info.m_pcap_filename; + m_shared_counter = &rx_capture_info.m_shared_counter; + } + + virtual bool handle(CRxCoreStateless *rx_core); + +private: + uint8_t m_port_id; + std::string m_pcap_filename; + uint64_t m_limit; + uint64_t *m_shared_counter; +}; + + +class TrexStatelessRxStopCapture : public TrexStatelessCpToRxMsgBase { +public: + TrexStatelessRxStopCapture(uint8_t port_id) { + m_port_id = port_id; + } + + virtual bool handle(CRxCoreStateless *rx_core); + +private: + uint8_t m_port_id; +}; + +class TrexStatelessRxStartQueue : public TrexStatelessCpToRxMsgBase { +public: + TrexStatelessRxStartQueue(uint8_t port_id, RXQueueInfo &rx_queue_info) { + m_port_id = port_id; + m_size = rx_queue_info.m_size; + m_shared_counter = &rx_queue_info.m_shared_counter; + } + + virtual bool handle(CRxCoreStateless *rx_core); + +private: + uint8_t m_port_id; + uint64_t m_size; + uint64_t *m_shared_counter; +}; + +class TrexStatelessRxStopQueue : public TrexStatelessCpToRxMsgBase { +public: + TrexStatelessRxStopQueue(uint8_t port_id) { + m_port_id = port_id; + } + + virtual bool handle(CRxCoreStateless *rx_core); + +private: + uint8_t m_port_id; +}; + + +template<typename T> class TrexStatelessMsgReply { +public: + TrexStatelessMsgReply() { + m_pending = true; + } + + bool is_pending() const { + return m_pending; + } + + void set(T reply) { + m_reply = reply; + + /* before marking as done - memory fence */ + asm volatile("mfence" ::: "memory"); + m_pending = false; + } + + T wait_for_reply(int timeout_ms = 100, int backoff_ms = 1) { + int guard = timeout_ms; + + while (is_pending()) { + guard -= backoff_ms; + if (guard < 0) { + throw TrexException("timeout: RX core has failed to reply"); + } + + delay(backoff_ms); + + } + return m_reply; + + } +private: + bool m_pending; + T m_reply; +}; + + + +class TrexStatelessRxQueueGetPkts : public TrexStatelessCpToRxMsgBase { +public: + + TrexStatelessRxQueueGetPkts(uint8_t port_id, TrexStatelessMsgReply<RxPacketBuffer *> &reply) : m_reply(reply) { + m_port_id = port_id; + } + + /** + * virtual function to handle a message + * + */ + virtual bool handle(CRxCoreStateless *rx_core); + +private: + uint8_t m_port_id; + TrexStatelessMsgReply<RxPacketBuffer*> &m_reply; +}; + + #endif /* __TREX_STATELESS_MESSAGING_H__ */ diff --git a/src/stateless/rx/trex_stateless_rx_core.cpp b/src/stateless/rx/trex_stateless_rx_core.cpp index d162c5b3..2a678365 100644 --- a/src/stateless/rx/trex_stateless_rx_core.cpp +++ b/src/stateless/rx/trex_stateless_rx_core.cpp @@ -26,6 +26,8 @@ #include "pal/linux/sanb_atomic.h" #include "trex_stateless_messaging.h" #include "trex_stateless_rx_core.h" +#include "trex_stateless.h" + void CRFC2544Info::create() { m_latency.Create(); @@ -64,15 +66,7 @@ void CRFC2544Info::export_data(rfc2544_info_t_ &obj) { obj.set_latency_json(json); }; -void CCPortLatencyStl::reset() { - for (int i = 0; i < MAX_FLOW_STATS; i++) { - m_rx_pg_stat[i].clear(); - m_rx_pg_stat_payload[i].clear(); - } -} - void CRxCoreStateless::create(const CRxSlCfg &cfg) { - m_rcv_all = false; m_capture = false; m_max_ports = cfg.m_max_ports; @@ -82,15 +76,18 @@ void CRxCoreStateless::create(const CRxSlCfg &cfg) { m_ring_to_cp = cp_rx->getRingDpToCp(0); m_state = STATE_IDLE; - for (int i = 0; i < m_max_ports; i++) { - CLatencyManagerPerPortStl * lp = &m_ports[i]; - lp->m_io = cfg.m_ports[i]; - lp->m_port.reset(); + for (int i = 0; i < MAX_FLOW_STATS_PAYLOAD; i++) { + m_rfc2544[i].create(); } + m_cpu_cp_u.Create(&m_cpu_dp_u); - for (int i = 0; i < MAX_FLOW_STATS_PAYLOAD; i++) { - m_rfc2544[i].create(); + /* init per port manager */ + for (int i = 0; i < m_max_ports; i++) { + m_rx_port_mngr[i].create(cfg.m_ports[i], + m_rfc2544, + &m_err_cntrs, + &m_cpu_dp_u); } } @@ -124,10 +121,30 @@ bool CRxCoreStateless::periodic_check_for_cp_messages() { handle_cp_msg(msg); } + recalculate_next_state(); return true; } +void CRxCoreStateless::recalculate_next_state() { + if (m_state == STATE_QUIT) { + return; + } + + /* next state is determine by the question are there any ports with active features ? */ + m_state = (are_any_features_active() ? STATE_WORKING : STATE_IDLE); +} + +bool CRxCoreStateless::are_any_features_active() { + for (int i = 0; i < m_max_ports; i++) { + if (m_rx_port_mngr[i].has_features_set()) { + return true; + } + } + + return false; +} + void CRxCoreStateless::idle_state_loop() { const int SHORT_DELAY_MS = 2; const int LONG_DELAY_MS = 50; @@ -141,7 +158,7 @@ void CRxCoreStateless::idle_state_loop() { counter = 0; continue; } else { - flush_rx(); + flush_all_pending_pkts(); } /* enter deep sleep only if enough time had passed */ @@ -154,143 +171,55 @@ void CRxCoreStateless::idle_state_loop() { } } -void CRxCoreStateless::start() { - int count = 0; +void CRxCoreStateless::handle_work_stage(bool do_try_rx_queue) { int i = 0; - bool do_try_rx_queue =CGlobalInfo::m_options.preview.get_vm_one_queue_enable() ? true : false; + + while (m_state == STATE_WORKING) { + + if (do_try_rx_queue) { + try_rx_queues(); + } + + process_all_pending_pkts(); + + i++; + if (i == 100000) { // approx 10msec + i = 0; + periodic_check_for_cp_messages(); // m_state might change in here + } + + rte_pause(); + } +} + +void CRxCoreStateless::start() { + bool do_try_rx_queue = CGlobalInfo::m_options.preview.get_vm_one_queue_enable() ? true : false; /* register a watchdog handle on current core */ m_monitor.create("STL RX CORE", 1); TrexWatchDog::getInstance().register_monitor(&m_monitor); - while (true) { - if (m_state == STATE_WORKING) { - i++; - if (i == 100000) { // approx 10msec - i = 0; - periodic_check_for_cp_messages(); // m_state might change in here - } - } else { - if (m_state == STATE_QUIT) - break; - count = 0; - i = 0; + while (m_state != STATE_QUIT) { + + switch (m_state) { + case STATE_IDLE: set_working_msg_ack(false); idle_state_loop(); set_working_msg_ack(true); - } - if (do_try_rx_queue) { - try_rx_queues(); - } - count += try_rx(); - } - rte_pause(); + break; - m_monitor.disable(); -} + case STATE_WORKING: + handle_work_stage(do_try_rx_queue); + break; -void CRxCoreStateless::handle_rx_pkt(CLatencyManagerPerPortStl *lp, rte_mbuf_t *m) { - CFlowStatParser parser; - - if (m_rcv_all || parser.parse(rte_pktmbuf_mtod(m, uint8_t *), m->pkt_len) == 0) { - uint32_t ip_id; - if (m_rcv_all || (parser.get_ip_id(ip_id) == 0)) { - if (m_rcv_all || is_flow_stat_id(ip_id)) { - uint16_t hw_id; - if (m_rcv_all || is_flow_stat_payload_id(ip_id)) { - bool good_packet = true; - uint8_t *p = rte_pktmbuf_mtod(m, uint8_t*); - struct flow_stat_payload_header *fsp_head = (struct flow_stat_payload_header *) - (p + m->pkt_len - sizeof(struct flow_stat_payload_header)); - hw_id = fsp_head->hw_id; - CRFC2544Info *curr_rfc2544; - - if (unlikely(fsp_head->magic != FLOW_STAT_PAYLOAD_MAGIC) || hw_id >= MAX_FLOW_STATS_PAYLOAD) { - good_packet = false; - if (!m_rcv_all) - m_err_cntrs.m_bad_header++; - } else { - curr_rfc2544 = &m_rfc2544[hw_id]; - - if (fsp_head->flow_seq != curr_rfc2544->get_exp_flow_seq()) { - // bad flow seq num - // Might be the first packet of a new flow, packet from an old flow, or garbage. - - if (fsp_head->flow_seq == curr_rfc2544->get_prev_flow_seq()) { - // packet from previous flow using this hw_id that arrived late - good_packet = false; - m_err_cntrs.m_old_flow++; - } else { - if (curr_rfc2544->no_flow_seq()) { - // first packet we see from this flow - good_packet = true; - curr_rfc2544->set_exp_flow_seq(fsp_head->flow_seq); - } else { - // garbage packet - good_packet = false; - m_err_cntrs.m_bad_header++; - } - } - } - } - - if (good_packet) { - uint32_t pkt_seq = fsp_head->seq; - uint32_t exp_seq = curr_rfc2544->get_seq(); - if (unlikely(pkt_seq != exp_seq)) { - if (pkt_seq < exp_seq) { - if (exp_seq - pkt_seq > 100000) { - // packet loss while we had wrap around - curr_rfc2544->inc_seq_err(pkt_seq - exp_seq); - curr_rfc2544->inc_seq_err_too_big(); - curr_rfc2544->set_seq(pkt_seq + 1); - } else { - if (pkt_seq == (exp_seq - 1)) { - curr_rfc2544->inc_dup(); - } else { - curr_rfc2544->inc_ooo(); - // We thought it was lost, but it was just out of order - curr_rfc2544->dec_seq_err(); - } - curr_rfc2544->inc_seq_err_too_low(); - } - } else { - if (unlikely (pkt_seq - exp_seq > 100000)) { - // packet reorder while we had wrap around - if (pkt_seq == (exp_seq - 1)) { - curr_rfc2544->inc_dup(); - } else { - curr_rfc2544->inc_ooo(); - // We thought it was lost, but it was just out of order - curr_rfc2544->dec_seq_err(); - } - curr_rfc2544->inc_seq_err_too_low(); - } else { - // seq > curr_rfc2544->seq. Assuming lost packets - curr_rfc2544->inc_seq_err(pkt_seq - exp_seq); - curr_rfc2544->inc_seq_err_too_big(); - curr_rfc2544->set_seq(pkt_seq + 1); - } - } - } else { - curr_rfc2544->set_seq(pkt_seq + 1); - } - lp->m_port.m_rx_pg_stat_payload[hw_id].add_pkts(1); - lp->m_port.m_rx_pg_stat_payload[hw_id].add_bytes(m->pkt_len + 4); // +4 for ethernet CRC - uint64_t d = (os_get_hr_tick_64() - fsp_head->time_stamp ); - dsec_t ctime = ptime_convert_hr_dsec(d); - curr_rfc2544->add_sample(ctime); - } - } else { - hw_id = get_hw_id(ip_id); - if (hw_id < MAX_FLOW_STATS) { - lp->m_port.m_rx_pg_stat[hw_id].add_pkts(1); - lp->m_port.m_rx_pg_stat[hw_id].add_bytes(m->pkt_len + 4); // +4 for ethernet CRC - } - } - } + default: + assert(0); + break; } + } + + m_monitor.disable(); } void CRxCoreStateless::capture_pkt(rte_mbuf_t *m) { @@ -310,7 +239,7 @@ void CRxCoreStateless::handle_rx_queue_msgs(uint8_t thread_id, CNodeRing * r) { CGenNodeLatencyPktInfo * l_msg; uint8_t msg_type = msg->m_msg_type; uint8_t rx_port_index; - CLatencyManagerPerPortStl * lp; + switch (msg_type) { case CGenNodeMsgBase::LATENCY_PKT: @@ -318,8 +247,9 @@ void CRxCoreStateless::handle_rx_queue_msgs(uint8_t thread_id, CNodeRing * r) { assert(l_msg->m_latency_offset == 0xdead); rx_port_index = (thread_id << 1) + (l_msg->m_dir & 1); assert( rx_port_index < m_max_ports ); - lp = &m_ports[rx_port_index]; - handle_rx_pkt(lp, (rte_mbuf_t *)l_msg->m_pkt); + + m_rx_port_mngr[rx_port_index].handle_pkt((rte_mbuf_t *)l_msg->m_pkt); + if (m_capture) capture_pkt((rte_mbuf_t *)l_msg->m_pkt); rte_pktmbuf_free((rte_mbuf_t *)l_msg->m_pkt); @@ -347,87 +277,38 @@ void CRxCoreStateless::try_rx_queues() { } } -// exactly the same as try_rx, without the handle_rx_pkt -// purpose is to flush rx queues when core is in idle state -void CRxCoreStateless::flush_rx() { - rte_mbuf_t * rx_pkts[64]; - int i, total_pkts = 0; - for (i = 0; i < m_max_ports; i++) { - CLatencyManagerPerPortStl * lp = &m_ports[i]; - rte_mbuf_t * m; - /* try to read 64 packets clean up the queue */ - uint16_t cnt_p = lp->m_io->rx_burst(rx_pkts, 64); - total_pkts += cnt_p; - if (cnt_p) { - m_cpu_dp_u.start_work1(); - int j; - for (j = 0; j < cnt_p; j++) { - m = rx_pkts[j]; - rte_pktmbuf_free(m); - } - /* commit only if there was work to do ! */ - m_cpu_dp_u.commit1(); - }/* if work */ - }// all ports -} +int CRxCoreStateless::process_all_pending_pkts(bool flush_rx) { + + int total_pkts = 0; + for (int i = 0; i < m_max_ports; i++) { + total_pkts += m_rx_port_mngr[i].process_all_pending_pkts(flush_rx); + } -int CRxCoreStateless::try_rx() { - rte_mbuf_t * rx_pkts[64]; - int i, total_pkts = 0; - for (i = 0; i < m_max_ports; i++) { - CLatencyManagerPerPortStl * lp = &m_ports[i]; - rte_mbuf_t * m; - /* try to read 64 packets clean up the queue */ - uint16_t cnt_p = lp->m_io->rx_burst(rx_pkts, 64); - total_pkts += cnt_p; - if (cnt_p) { - m_cpu_dp_u.start_work1(); - int j; - for (j = 0; j < cnt_p; j++) { - m = rx_pkts[j]; - handle_rx_pkt(lp, m); - rte_pktmbuf_free(m); - } - /* commit only if there was work to do ! */ - m_cpu_dp_u.commit1(); - }/* if work */ - }// all ports return total_pkts; -} -bool CRxCoreStateless::is_flow_stat_id(uint32_t id) { - if ((id & 0x000fff00) == IP_ID_RESERVE_BASE) return true; - return false; } -bool CRxCoreStateless::is_flow_stat_payload_id(uint32_t id) { - if (id == FLOW_STAT_PAYLOAD_IP_ID) return true; - return false; -} - -uint16_t CRxCoreStateless::get_hw_id(uint16_t id) { - return (0x00ff & id); -} void CRxCoreStateless::reset_rx_stats(uint8_t port_id) { - for (int hw_id = 0; hw_id < MAX_FLOW_STATS; hw_id++) { - m_ports[port_id].m_port.m_rx_pg_stat[hw_id].clear(); - } + m_rx_port_mngr[port_id].clear_stats(); } int CRxCoreStateless::get_rx_stats(uint8_t port_id, rx_per_flow_t *rx_stats, int min, int max , bool reset, TrexPlatformApi::driver_stat_cap_e type) { + + RXLatency &latency = m_rx_port_mngr[port_id].get_latency(); + for (int hw_id = min; hw_id <= max; hw_id++) { if (type == TrexPlatformApi::IF_STAT_PAYLOAD) { - rx_stats[hw_id - min] = m_ports[port_id].m_port.m_rx_pg_stat_payload[hw_id]; + rx_stats[hw_id - min] = latency.m_rx_pg_stat_payload[hw_id]; } else { - rx_stats[hw_id - min] = m_ports[port_id].m_port.m_rx_pg_stat[hw_id]; + rx_stats[hw_id - min] = latency.m_rx_pg_stat[hw_id]; } if (reset) { if (type == TrexPlatformApi::IF_STAT_PAYLOAD) { - m_ports[port_id].m_port.m_rx_pg_stat_payload[hw_id].clear(); + latency.m_rx_pg_stat_payload[hw_id].clear(); } else { - m_ports[port_id].m_port.m_rx_pg_stat[hw_id].clear(); + latency.m_rx_pg_stat[hw_id].clear(); } } } @@ -472,3 +353,39 @@ void CRxCoreStateless::update_cpu_util(){ double CRxCoreStateless::get_cpu_util() { return m_cpu_cp_u.GetVal(); } + + +void +CRxCoreStateless::start_capture(uint8_t port_id, const std::string &pcap_filename, uint64_t limit, uint64_t *shared_counter) { + m_rx_port_mngr[port_id].start_capture(pcap_filename, limit, shared_counter); +} + +void +CRxCoreStateless::stop_capture(uint8_t port_id) { + m_rx_port_mngr[port_id].stop_capture(); +} + +void +CRxCoreStateless::start_queue(uint8_t port_id, uint64_t size, uint64_t *shared_counter) { + m_rx_port_mngr[port_id].start_queue(size, shared_counter); +} + +void +CRxCoreStateless::stop_queue(uint8_t port_id) { + m_rx_port_mngr[port_id].stop_queue(); +} + +void +CRxCoreStateless::enable_latency() { + for (int i = 0; i < m_max_ports; i++) { + m_rx_port_mngr[i].enable_latency(); + } +} + +void +CRxCoreStateless::disable_latency() { + for (int i = 0; i < m_max_ports; i++) { + m_rx_port_mngr[i].disable_latency(); + } +} + diff --git a/src/stateless/rx/trex_stateless_rx_core.h b/src/stateless/rx/trex_stateless_rx_core.h index 3f9fb6cc..519724d8 100644 --- a/src/stateless/rx/trex_stateless_rx_core.h +++ b/src/stateless/rx/trex_stateless_rx_core.h @@ -25,36 +25,10 @@ #include "os_time.h" #include "pal/linux/sanb_atomic.h" #include "utl_cpuu.h" +#include "trex_stateless_rx_port_mngr.h" class TrexStatelessCpToRxMsgBase; -class CCPortLatencyStl { - public: - void reset(); - - public: - rx_per_flow_t m_rx_pg_stat[MAX_FLOW_STATS]; - rx_per_flow_t m_rx_pg_stat_payload[MAX_FLOW_STATS_PAYLOAD]; -}; - -class CLatencyManagerPerPortStl { -public: - CCPortLatencyStl m_port; - CPortLatencyHWBase * m_io; -}; - -class CRxSlCfg { - public: - CRxSlCfg (){ - m_max_ports = 0; - m_cps = 0.0; - } - - public: - uint32_t m_max_ports; - double m_cps; - CPortLatencyHWBase * m_ports[TREX_MAX_PORTS]; -}; class CRFC2544Info { public: @@ -109,7 +83,7 @@ class CRxCoreErrCntrs { m_old_flow = 0; } - private: + public: uint64_t m_bad_header; uint64_t m_old_flow; }; @@ -129,47 +103,81 @@ class CRxCoreStateless { , TrexPlatformApi::driver_stat_cap_e type); int get_rfc2544_info(rfc2544_info_t *rfc2544_info, int min, int max, bool reset); int get_rx_err_cntrs(CRxCoreErrCntrs *rx_err); - void work() { - m_state = STATE_WORKING; - m_err_cntrs.reset(); // When starting to work, reset global counters - } - void idle() {m_state = STATE_IDLE;} + + void quit() {m_state = STATE_QUIT;} bool is_working() const {return (m_ack_start_work_msg == true);} void set_working_msg_ack(bool val); double get_cpu_util(); void update_cpu_util(); + RxPacketBuffer *get_rx_queue_pkts(uint8_t port_id) { + return m_rx_port_mngr[port_id].get_pkt_buffer(); + } + + /** + * start capturing of RX packets on a specific port + * + * @author imarom (11/2/2016) + * + * @param port_id + * @param pcap_filename + * @param limit + */ + void start_capture(uint8_t port_id, const std::string &pcap_filename, uint64_t limit, uint64_t *shared_counter); + void stop_capture(uint8_t port_id); + + /** + * start RX queueing of packets + * + */ + void start_queue(uint8_t port_id, uint64_t size, uint64_t *shared_counter); + void stop_queue(uint8_t port_id); + + /** + * enable latency feature for RX packets + * will be apply to all ports + */ + void enable_latency(); + void disable_latency(); private: void handle_cp_msg(TrexStatelessCpToRxMsgBase *msg); bool periodic_check_for_cp_messages(); void tickle(); void idle_state_loop(); - void handle_rx_pkt(CLatencyManagerPerPortStl * lp, rte_mbuf_t * m); + + void recalculate_next_state(); + bool are_any_features_active(); + void capture_pkt(rte_mbuf_t *m); void handle_rx_queue_msgs(uint8_t thread_id, CNodeRing * r); - void flush_rx(); - int try_rx(); + void handle_work_stage(bool do_try_rx_queue); + + int process_all_pending_pkts(bool flush_rx = false); + + void flush_all_pending_pkts() { + process_all_pending_pkts(true); + } + void try_rx_queues(); - bool is_flow_stat_id(uint32_t id); - bool is_flow_stat_payload_id(uint32_t id); - uint16_t get_hw_id(uint16_t id); private: - TrexMonitor m_monitor; - uint32_t m_max_ports; - bool m_capture; - bool m_rcv_all; - CLatencyManagerPerPortStl m_ports[TREX_MAX_PORTS]; - state_e m_state; - CNodeRing *m_ring_from_cp; - CNodeRing *m_ring_to_cp; - CCpuUtlDp m_cpu_dp_u; - CCpuUtlCp m_cpu_cp_u; + TrexMonitor m_monitor; + uint32_t m_max_ports; + bool m_capture; + state_e m_state; + CNodeRing *m_ring_from_cp; + CNodeRing *m_ring_to_cp; + CCpuUtlDp m_cpu_dp_u; + CCpuUtlCp m_cpu_cp_u; + // Used for acking "work" (go out of idle) messages from cp volatile bool m_ack_start_work_msg __rte_cache_aligned; + CRxCoreErrCntrs m_err_cntrs; CRFC2544Info m_rfc2544[MAX_FLOW_STATS_PAYLOAD]; + + RXPortManager m_rx_port_mngr[TREX_MAX_PORTS]; }; #endif diff --git a/src/stateless/rx/trex_stateless_rx_defs.h b/src/stateless/rx/trex_stateless_rx_defs.h new file mode 100644 index 00000000..7b1e0f32 --- /dev/null +++ b/src/stateless/rx/trex_stateless_rx_defs.h @@ -0,0 +1,157 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* + Copyright (c) 2016-2016 Cisco Systems, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#ifndef __TREX_STATELESS_RX_DEFS_H__ +#define __TREX_STATELESS_RX_DEFS_H__ + +#include "trex_defs.h" +#include <json/json.h> + +class CPortLatencyHWBase; + +/** + * general SL cfg + * + */ +class CRxSlCfg { + public: + CRxSlCfg (){ + m_max_ports = 0; + m_cps = 0.0; + } + + public: + uint32_t m_max_ports; + double m_cps; + CPortLatencyHWBase * m_ports[TREX_MAX_PORTS]; +}; + +/** + * describes the filter type applied to the RX + * RX_FILTER_MODE_HW - only hardware filtered traffic will + * reach the RX core + * + */ +typedef enum rx_filter_mode_ { + RX_FILTER_MODE_HW, + RX_FILTER_MODE_ALL +} rx_filter_mode_e; + +/** + * holds RX capture info + * + */ +class RXCaptureInfo { +public: + RXCaptureInfo() { + m_is_active = false; + m_limit = 0; + m_shared_counter = 0; + } + + void enable(const std::string &pcap_filename, uint64_t limit) { + m_pcap_filename = pcap_filename; + m_limit = limit; + m_is_active = true; + } + + void disable() { + m_is_active = false; + m_pcap_filename = ""; + m_limit = 0; + } + + bool is_empty() const { + return (m_shared_counter == 0); + } + + void to_json(Json::Value &output) const { + output["is_active"] = m_is_active; + if (m_is_active) { + output["pcap_filename"] = m_pcap_filename; + output["limit"] = Json::UInt64(m_limit); + output["count"] = Json::UInt64(m_shared_counter); + } + } + +public: + bool m_is_active; + std::string m_pcap_filename; + uint64_t m_limit; + uint64_t m_shared_counter; +}; + + +class RXQueueInfo { +public: + + RXQueueInfo() { + m_is_active = false; + m_shared_counter = 0; + } + + void enable(uint64_t size) { + m_size = size; + m_is_active = true; + } + + void disable() { + m_is_active = false; + m_size = 0; + } + + bool is_empty() const { + return (m_shared_counter == 0); + } + + void to_json(Json::Value &output) const { + output["is_active"] = m_is_active; + if (m_is_active) { + output["size"] = Json::UInt64(m_size); + output["count"] = Json::UInt64(m_shared_counter); + } + } + +public: + bool m_is_active; + uint64_t m_size; + uint64_t m_shared_counter; +}; + + +/** + * holds all the RX features info + * + * @author imarom (11/7/2016) + */ +class RXFeaturesInfo { +public: + RXCaptureInfo m_rx_capture_info; + RXQueueInfo m_rx_queue_info; + + void to_json(Json::Value &msg) const { + m_rx_capture_info.to_json(msg["sniffer"]); + m_rx_queue_info.to_json(msg["queue"]); + } +}; + +#endif /* __TREX_STATELESS_RX_DEFS_H__ */ + diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.cpp b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp new file mode 100644 index 00000000..2683dbe1 --- /dev/null +++ b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp @@ -0,0 +1,253 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* + Copyright (c) 2016-2016 Cisco Systems, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ +#include "bp_sim.h" +#include "trex_stateless_rx_port_mngr.h" +#include "common/captureFile.h" +#include "trex_stateless_rx_core.h" + +/************************** latency feature ************/ +void RXLatency::handle_pkt(const rte_mbuf_t *m) { + CFlowStatParser parser; + + if (m_rcv_all || parser.parse(rte_pktmbuf_mtod(m, uint8_t *), m->pkt_len) == 0) { + uint32_t ip_id; + if (m_rcv_all || (parser.get_ip_id(ip_id) == 0)) { + if (m_rcv_all || is_flow_stat_id(ip_id)) { + uint16_t hw_id; + if (m_rcv_all || is_flow_stat_payload_id(ip_id)) { + bool good_packet = true; + uint8_t *p = rte_pktmbuf_mtod(m, uint8_t*); + struct flow_stat_payload_header *fsp_head = (struct flow_stat_payload_header *) + (p + m->pkt_len - sizeof(struct flow_stat_payload_header)); + hw_id = fsp_head->hw_id; + CRFC2544Info *curr_rfc2544; + + if (unlikely(fsp_head->magic != FLOW_STAT_PAYLOAD_MAGIC) || hw_id >= MAX_FLOW_STATS_PAYLOAD) { + good_packet = false; + if (!m_rcv_all) + m_err_cntrs->m_bad_header++; + } else { + curr_rfc2544 = &m_rfc2544[hw_id]; + + if (fsp_head->flow_seq != curr_rfc2544->get_exp_flow_seq()) { + // bad flow seq num + // Might be the first packet of a new flow, packet from an old flow, or garbage. + + if (fsp_head->flow_seq == curr_rfc2544->get_prev_flow_seq()) { + // packet from previous flow using this hw_id that arrived late + good_packet = false; + m_err_cntrs->m_old_flow++; + } else { + if (curr_rfc2544->no_flow_seq()) { + // first packet we see from this flow + good_packet = true; + curr_rfc2544->set_exp_flow_seq(fsp_head->flow_seq); + } else { + // garbage packet + good_packet = false; + m_err_cntrs->m_bad_header++; + } + } + } + } + + if (good_packet) { + uint32_t pkt_seq = fsp_head->seq; + uint32_t exp_seq = curr_rfc2544->get_seq(); + if (unlikely(pkt_seq != exp_seq)) { + if (pkt_seq < exp_seq) { + if (exp_seq - pkt_seq > 100000) { + // packet loss while we had wrap around + curr_rfc2544->inc_seq_err(pkt_seq - exp_seq); + curr_rfc2544->inc_seq_err_too_big(); + curr_rfc2544->set_seq(pkt_seq + 1); + } else { + if (pkt_seq == (exp_seq - 1)) { + curr_rfc2544->inc_dup(); + } else { + curr_rfc2544->inc_ooo(); + // We thought it was lost, but it was just out of order + curr_rfc2544->dec_seq_err(); + } + curr_rfc2544->inc_seq_err_too_low(); + } + } else { + if (unlikely (pkt_seq - exp_seq > 100000)) { + // packet reorder while we had wrap around + if (pkt_seq == (exp_seq - 1)) { + curr_rfc2544->inc_dup(); + } else { + curr_rfc2544->inc_ooo(); + // We thought it was lost, but it was just out of order + curr_rfc2544->dec_seq_err(); + } + curr_rfc2544->inc_seq_err_too_low(); + } else { + // seq > curr_rfc2544->seq. Assuming lost packets + curr_rfc2544->inc_seq_err(pkt_seq - exp_seq); + curr_rfc2544->inc_seq_err_too_big(); + curr_rfc2544->set_seq(pkt_seq + 1); + } + } + } else { + curr_rfc2544->set_seq(pkt_seq + 1); + } + m_rx_pg_stat_payload[hw_id].add_pkts(1); + m_rx_pg_stat_payload[hw_id].add_bytes(m->pkt_len + 4); // +4 for ethernet CRC + uint64_t d = (os_get_hr_tick_64() - fsp_head->time_stamp ); + dsec_t ctime = ptime_convert_hr_dsec(d); + curr_rfc2544->add_sample(ctime); + } + } else { + hw_id = get_hw_id(ip_id); + if (hw_id < MAX_FLOW_STATS) { + m_rx_pg_stat[hw_id].add_pkts(1); + m_rx_pg_stat[hw_id].add_bytes(m->pkt_len + 4); // +4 for ethernet CRC + } + } + } + } + } +} + +void +RXLatency::reset_stats() { + for (int hw_id = 0; hw_id < MAX_FLOW_STATS; hw_id++) { + m_rx_pg_stat[hw_id].clear(); + } +} + +/****************************** packet recorder ****************************/ + +RXPacketRecorder::RXPacketRecorder() { + m_writer = NULL; + m_shared_counter = NULL; + m_limit = 0; + m_epoch = -1; +} + +RXPacketRecorder::~RXPacketRecorder() { + stop(); +} + +void +RXPacketRecorder::start(const std::string &pcap, uint64_t limit, uint64_t *shared_counter) { + m_writer = CCapWriterFactory::CreateWriter(LIBPCAP, (char *)pcap.c_str()); + if (m_writer == NULL) { + std::stringstream ss; + ss << "unable to create PCAP file: " << pcap; + throw TrexException(ss.str()); + } + + assert(limit > 0); + m_limit = limit; + m_shared_counter = shared_counter; + (*m_shared_counter) = 0; +} + +void +RXPacketRecorder::stop() { + if (m_writer) { + delete m_writer; + m_writer = NULL; + } +} + +void +RXPacketRecorder::handle_pkt(const rte_mbuf_t *m) { + if (!m_writer) { + return; + } + + dsec_t now = now_sec(); + if (m_epoch < 0) { + m_epoch = now; + } + + dsec_t dt = now - m_epoch; + + CPktNsecTimeStamp t_c(dt); + m_pkt.time_nsec = t_c.m_time_nsec; + m_pkt.time_sec = t_c.m_time_sec; + + const uint8_t *p = rte_pktmbuf_mtod(m, uint8_t *); + m_pkt.pkt_len = m->pkt_len; + memcpy(m_pkt.raw, p, m->pkt_len); + + m_writer->write_packet(&m_pkt); + + m_limit--; + (*m_shared_counter)++; + + if (m_limit == 0) { + stop(); + } +} + + +void RXPortManager::handle_pkt(const rte_mbuf_t *m) { + + /* handle features */ + + if (is_feature_set(LATENCY)) { + m_latency.handle_pkt(m); + } + + if (is_feature_set(CAPTURE)) { + m_recorder.handle_pkt(m); + } + + if (is_feature_set(QUEUE)) { + m_pkt_buffer->handle_pkt(m); + } +} + + +int RXPortManager::process_all_pending_pkts(bool flush_rx) { + + rte_mbuf_t *rx_pkts[64]; + + /* try to read 64 packets clean up the queue */ + uint16_t cnt_p = m_io->rx_burst(rx_pkts, 64); + if (cnt_p == 0) { + return cnt_p; + } + + + m_cpu_dp_u->start_work1(); + + for (int j = 0; j < cnt_p; j++) { + rte_mbuf_t *m = rx_pkts[j]; + + if (!flush_rx) { + handle_pkt(m); + } + + rte_pktmbuf_free(m); + } + + /* commit only if there was work to do ! */ + m_cpu_dp_u->commit1(); + + + return cnt_p; +} + diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.h b/src/stateless/rx/trex_stateless_rx_port_mngr.h new file mode 100644 index 00000000..aa8ba8e9 --- /dev/null +++ b/src/stateless/rx/trex_stateless_rx_port_mngr.h @@ -0,0 +1,411 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* + Copyright (c) 2016-2016 Cisco Systems, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#ifndef __TREX_STATELESS_RX_PORT_MNGR_H__ +#define __TREX_STATELESS_RX_PORT_MNGR_H__ + +#include <stdint.h> +#include "common/base64.h" + +#include "common/captureFile.h" + +/************************* latency ***********************/ + +class CPortLatencyHWBase; +class CRFC2544Info; +class CRxCoreErrCntrs; + +class RXLatency { +public: + + RXLatency() { + m_rcv_all = false; + m_rfc2544 = NULL; + m_err_cntrs = NULL; + + for (int i = 0; i < MAX_FLOW_STATS; i++) { + m_rx_pg_stat[i].clear(); + m_rx_pg_stat_payload[i].clear(); + } + } + + void create(CRFC2544Info *rfc2544, CRxCoreErrCntrs *err_cntrs) { + m_rfc2544 = rfc2544; + m_err_cntrs = err_cntrs; + } + + void reset_stats(); + + void handle_pkt(const rte_mbuf_t *m); + +private: + bool is_flow_stat_id(uint32_t id) { + if ((id & 0x000fff00) == IP_ID_RESERVE_BASE) return true; + return false; + } + + bool is_flow_stat_payload_id(uint32_t id) { + if (id == FLOW_STAT_PAYLOAD_IP_ID) return true; + return false; + } + + uint16_t get_hw_id(uint16_t id) { + return (0x00ff & id); +} + +public: + + rx_per_flow_t m_rx_pg_stat[MAX_FLOW_STATS]; + rx_per_flow_t m_rx_pg_stat_payload[MAX_FLOW_STATS_PAYLOAD]; + + bool m_rcv_all; + CRFC2544Info *m_rfc2544; + CRxCoreErrCntrs *m_err_cntrs; +}; + +/************************ queue ***************************/ + +/** + * describes a single saved RX packet + * + */ +class RxPacket { +public: + + RxPacket(const rte_mbuf_t *m) { + /* assume single part packet */ + assert(m->nb_segs == 1); + + m_size = m->pkt_len; + const uint8_t *p = rte_pktmbuf_mtod(m, uint8_t *); + + m_raw = (uint8_t *)malloc(m_size); + memcpy(m_raw, p, m_size); + } + + /* RVO here - no performance impact */ + const std::string to_base64_str() const { + return base64_encode(m_raw, m_size); + } + + ~RxPacket() { + if (m_raw) { + delete m_raw; + } + } + +private: + + uint8_t *m_raw; + uint16_t m_size; +}; + +/** + * a simple cyclic buffer to hold RX packets + * + */ +class RxPacketBuffer { +public: + + RxPacketBuffer(uint64_t size, uint64_t *shared_counter) { + m_buffer = nullptr; + m_head = 0; + m_tail = 0; + m_size = (size + 1); // for the empty/full difference 1 slot reserved + m_shared_counter = shared_counter; + + *m_shared_counter = 0; + + m_buffer = new RxPacket*[m_size](); // zeroed + + m_is_enabled = true; + } + + ~RxPacketBuffer() { + assert(m_buffer); + + while (!is_empty()) { + RxPacket *pkt = pop(); + delete pkt; + } + delete [] m_buffer; + } + + /* freeze the data structure - no more packets can be pushed / poped */ + RxPacketBuffer * freeze_and_clone() { + /* create a new one */ + RxPacketBuffer *new_buffer = new RxPacketBuffer(m_size, m_shared_counter); + + /* freeze the current */ + m_shared_counter = NULL; + m_is_enabled = false; + + return new_buffer; + } + + bool is_empty() const { + return (m_head == m_tail); + } + + bool is_full() const { + return ( next(m_head) == m_tail); + } + + void handle_pkt(const rte_mbuf_t *m) { + assert(m_is_enabled); + + /* if full - pop the oldest */ + if (is_full()) { + delete pop(); + } + + (*m_shared_counter)++; + + m_buffer[m_head] = new RxPacket(m); + m_head = next(m_head); + } + + /** + * generate a JSON output of the queue + * + */ + Json::Value to_json() const { + + Json::Value output = Json::arrayValue; + + int tmp = m_tail; + while (tmp != m_head) { + RxPacket *pkt = m_buffer[tmp]; + output.append(pkt->to_base64_str()); + tmp = next(tmp); + } + + return output; + } + +private: + int next(int v) const { + return ( (v + 1) % m_size ); + } + + /* pop in case of full queue - internal usage */ + RxPacket * pop() { + assert(m_is_enabled); + assert(!is_empty()); + + RxPacket *pkt = m_buffer[m_tail]; + m_tail = next(m_tail); + (*m_shared_counter)--; + return pkt; + } + + int m_head; + int m_tail; + int m_size; + RxPacket **m_buffer; + uint64_t *m_shared_counter; + bool m_is_enabled; +}; + +/************************ recoder ***************************/ + +/** + * RX packet recorder to PCAP file + * + */ +class RXPacketRecorder { +public: + RXPacketRecorder(); + ~RXPacketRecorder(); + void start(const std::string &pcap, uint64_t limit, uint64_t *shared_counter); + void stop(); + void handle_pkt(const rte_mbuf_t *m); + +private: + CFileWriterBase *m_writer; + CCapPktRaw m_pkt; + dsec_t m_epoch; + uint64_t m_limit; + uint64_t *m_shared_counter; +}; + + +/************************ manager ***************************/ + +/** + * per port RX features manager + * + * @author imarom (10/30/2016) + */ +class RXPortManager { +public: + enum features_t { + LATENCY = 0x1, + CAPTURE = 0x2, + QUEUE = 0x4 + }; + + RXPortManager() { + m_features = 0; + m_pkt_buffer = NULL; + m_io = NULL; + m_cpu_dp_u = NULL; + } + + void create(CPortLatencyHWBase *io, + CRFC2544Info *rfc2544, + CRxCoreErrCntrs *err_cntrs, + CCpuUtlDp *cpu_util) { + m_io = io; + m_cpu_dp_u = cpu_util; + m_latency.create(rfc2544, err_cntrs); + } + + void clear_stats() { + m_latency.reset_stats(); + } + + RXLatency & get_latency() { + return m_latency; + } + + void enable_latency() { + set_feature(LATENCY); + } + + void disable_latency() { + unset_feature(LATENCY); + } + + /** + * capturing of RX packets + * + * @author imarom (11/2/2016) + * + * @param pcap + * @param limit_pkts + */ + void start_capture(const std::string &pcap, uint64_t limit_pkts, uint64_t *shared_counter) { + m_recorder.start(pcap, limit_pkts, shared_counter); + set_feature(CAPTURE); + } + + void stop_capture() { + m_recorder.stop(); + unset_feature(CAPTURE); + } + + /** + * queueing packets + * + */ + void start_queue(uint32_t size, uint64_t *shared_counter) { + if (m_pkt_buffer) { + delete m_pkt_buffer; + } + m_pkt_buffer = new RxPacketBuffer(size, shared_counter); + set_feature(QUEUE); + } + + void stop_queue() { + if (m_pkt_buffer) { + delete m_pkt_buffer; + m_pkt_buffer = NULL; + } + unset_feature(QUEUE); + } + + RxPacketBuffer *get_pkt_buffer() { + if (!is_feature_set(QUEUE)) { + return NULL; + } + + assert(m_pkt_buffer); + + /* hold a pointer to the old one */ + RxPacketBuffer *old_buffer = m_pkt_buffer; + + /* replace the old one with a new one and freeze the old */ + m_pkt_buffer = old_buffer->freeze_and_clone(); + + return old_buffer; + } + + + /** + * fetch and process all packets + * + */ + int process_all_pending_pkts(bool flush_rx = false); + + + /** + * flush all pending packets without processing them + * + */ + void flush_all_pending_pkts() { + process_all_pending_pkts(true); + } + + + /** + * handle a single packet + * + */ + void handle_pkt(const rte_mbuf_t *m); + + + bool has_features_set() { + return (m_features != 0); + } + + + bool no_features_set() { + return (!has_features_set()); + } + +private: + + + void set_feature(features_t feature) { + m_features |= feature; + } + + void unset_feature(features_t feature) { + m_features &= (~feature); + } + + bool is_feature_set(features_t feature) { + return ( (m_features & feature) == feature ); + } + + uint32_t m_features; + RXPacketRecorder m_recorder; + RXLatency m_latency; + RxPacketBuffer *m_pkt_buffer; + CCpuUtlDp *m_cpu_dp_u; + CPortLatencyHWBase *m_io; +}; + + + +#endif /* __TREX_STATELESS_RX_PORT_MNGR_H__ */ + diff --git a/src/trex_port_attr.cpp b/src/trex_port_attr.cpp new file mode 100644 index 00000000..26199e33 --- /dev/null +++ b/src/trex_port_attr.cpp @@ -0,0 +1,77 @@ +/* +Copyright (c) 2015-2015 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#include "trex_port_attr.h" +#include "bp_sim.h" + +const uint8_t DestAttr::g_dummy_mac[6] = {0x0,0x0,0x0,0x1,0x0,0x0}; + + +DestAttr::DestAttr(uint8_t port_id) { + m_port_id = port_id; + + m_mac = CGlobalInfo::m_options.m_mac_addr[port_id].u.m_mac.dest; +} + +const uint8_t * +TRexPortAttr::get_src_mac() const { + return CGlobalInfo::m_options.get_src_mac_addr(m_port_id); +} + + +std::string +TRexPortAttr::get_rx_filter_mode() const { + switch (m_rx_filter_mode) { + case RX_FILTER_MODE_ALL: + return "all"; + case RX_FILTER_MODE_HW: + return "hw"; + default: + assert(0); + } +} + + +void +TRexPortAttr::to_json(Json::Value &output) { + + output["src_mac"] = utl_macaddr_to_str(get_src_mac()); + output["promiscuous"]["enabled"] = get_promiscuous(); + output["link"]["up"] = is_link_up(); + output["speed"] = get_link_speed(); + output["rx_filter_mode"] = get_rx_filter_mode(); + + if (get_src_ipv4() != 0) { + output["src_ipv4"] = utl_uint32_to_ipv4(get_src_ipv4()); + } else { + output["src_ipv4"] = "none"; + } + + + int mode; + get_flow_ctrl(mode); + output["fc"]["mode"] = mode; + + m_dest.to_json(output["dest"]); + +} + +void +TRexPortAttr::update_src_dst_mac(uint8_t *raw_pkt) { + memcpy(raw_pkt, get_dest().get_dest_mac(), 6); + memcpy(raw_pkt + 6, get_src_mac(), 6); +} + diff --git a/src/trex_port_attr.h b/src/trex_port_attr.h index 9231e263..eb7c85de 100755 --- a/src/trex_port_attr.h +++ b/src/trex_port_attr.h @@ -21,10 +21,141 @@ limitations under the License. #include <vector> #include "rte_ethdev_includes.h" #include "trex_defs.h" +#include "common/basic_utils.h" +#include <json/json.h> +#include "trex_stateless_rx_defs.h" +#include <string.h> + +/** + * destination port attribute + * + */ +class DestAttr { +private: + static const uint8_t g_dummy_mac[6]; +public: + + DestAttr(uint8_t port_id); + + enum dest_type_e { + DEST_TYPE_IPV4 = 1, + DEST_TYPE_MAC = 2 + }; + + /** + * set dest as an IPv4 unresolved + */ + void set_dest_ipv4(uint32_t ipv4) { + assert(ipv4 != 0); + + m_src_ipv4 = ipv4; + memset(m_mac, 0, 6); + m_type = DEST_TYPE_IPV4; + } + + /** + * set dest as a resolved IPv4 + */ + void set_dest_ipv4(uint32_t ipv4, const uint8_t *mac) { + assert(ipv4 != 0); + + m_src_ipv4 = ipv4; + memcpy(m_mac, mac, 6); + m_type = DEST_TYPE_IPV4; + + } + + /** + * dest dest as MAC + * + */ + void set_dest_mac(const uint8_t *mac) { + + m_src_ipv4 = 0; + memcpy(m_mac, mac, 6); + m_type = DEST_TYPE_MAC; + } + + + bool is_resolved() const { + if (m_type == DEST_TYPE_MAC) { + return true; + } + + for (int i = 0; i < 6; i++) { + if (m_mac[i] != 0) { + return true; + } + } + + /* all zeroes - non resolved */ + return false; + } + + /** + * get the dest mac + * if no MAC is configured and dest was not resolved + * will return a dummy + */ + const uint8_t *get_dest_mac() { + + if (is_resolved()) { + return m_mac; + } else { + return g_dummy_mac; + } + } + + /** + * when link gets down - this should be called + * + */ + void on_link_down() { + if (m_type == DEST_TYPE_IPV4) { + /* reset the IPv4 dest with no resolution */ + set_dest_ipv4(m_src_ipv4); + } + } + + void to_json(Json::Value &output) { + switch (m_type) { + + case DEST_TYPE_IPV4: + output["type"] = "ipv4"; + output["addr"] = utl_uint32_to_ipv4(m_src_ipv4); + if (is_resolved()) { + output["arp"] = utl_macaddr_to_str(m_mac); + } else { + output["arp"] = "none"; + } + break; + + case DEST_TYPE_MAC: + output["type"] = "mac"; + output["addr"] = utl_macaddr_to_str(m_mac); + break; + + default: + assert(0); + } + + } + +private: + uint32_t m_src_ipv4; + uint8_t *m_mac; + dest_type_e m_type; + uint8_t m_port_id; +}; class TRexPortAttr { public: + + TRexPortAttr(uint8_t port_id) : m_dest(port_id) { + m_src_ipv4 = 0; + } + virtual ~TRexPortAttr(){} /* UPDATES */ @@ -33,10 +164,10 @@ public: virtual void update_device_info() = 0; virtual void reset_xstats() = 0; virtual void update_description() = 0; - + /* GETTERS */ virtual bool get_promiscuous() = 0; - virtual void macaddr_get(struct ether_addr *mac_addr) = 0; + virtual void get_hw_src_mac(struct ether_addr *mac_addr) = 0; virtual uint32_t get_link_speed() { return m_link.link_speed; } // L1 Mbps virtual bool is_link_duplex() { return (m_link.link_duplex ? true : false); } virtual bool is_link_autoneg() { return (m_link.link_autoneg ? true : false); } @@ -51,24 +182,50 @@ public: virtual void get_description(std::string &description) { description = intf_info_st.description; } virtual void get_supported_speeds(supp_speeds_t &supp_speeds) = 0; + uint32_t get_src_ipv4() {return m_src_ipv4;} + DestAttr & get_dest() {return m_dest;} + + const uint8_t *get_src_mac() const; + std::string get_rx_filter_mode() const; + + /* for a raw packet, write the src/dst MACs */ + void update_src_dst_mac(uint8_t *raw_pkt); + /* SETTERS */ virtual int set_promiscuous(bool enabled) = 0; virtual int add_mac(char * mac) = 0; virtual int set_link_up(bool up) = 0; virtual int set_flow_ctrl(int mode) = 0; virtual int set_led(bool on) = 0; - -/* DUMPS */ + virtual int set_rx_filter_mode(rx_filter_mode_e mode) = 0; + + void set_src_ipv4(uint32_t addr) { + m_src_ipv4 = addr; + } + + /* DUMPS */ virtual void dump_link(FILE *fd) = 0; + /* dump object status to JSON */ + void to_json(Json::Value &output); + + protected: - uint8_t m_port_id; - rte_eth_link m_link; - struct rte_eth_dev_info dev_info; - bool flag_is_virtual; - bool flag_is_fc_change_supported; - bool flag_is_led_change_supported; - bool flag_is_link_change_supported; + + uint8_t m_port_id; + rte_eth_link m_link; + uint32_t m_src_ipv4; + DestAttr m_dest; + + struct rte_eth_dev_info dev_info; + + rx_filter_mode_e m_rx_filter_mode; + + bool flag_is_virtual; + bool flag_is_fc_change_supported; + bool flag_is_led_change_supported; + bool flag_is_link_change_supported; + struct intf_info_st { std::string pci_addr; @@ -81,8 +238,11 @@ protected: class DpdkTRexPortAttr : public TRexPortAttr { public: - DpdkTRexPortAttr(uint8_t port_id, bool is_virtual, bool fc_change_allowed) { + DpdkTRexPortAttr(uint8_t port_id, bool is_virtual, bool fc_change_allowed) : TRexPortAttr(port_id) { + m_port_id = port_id; + m_rx_filter_mode = RX_FILTER_MODE_HW; + flag_is_virtual = is_virtual; int tmp; flag_is_fc_change_supported = fc_change_allowed && (get_flow_ctrl(tmp) != -ENOTSUP); @@ -101,7 +261,7 @@ public: /* GETTERS */ virtual bool get_promiscuous(); - virtual void macaddr_get(struct ether_addr *mac_addr); + virtual void get_hw_src_mac(struct ether_addr *mac_addr); virtual int get_xstats_values(xstats_values_t &xstats_values); virtual int get_xstats_names(xstats_names_t &xstats_names); virtual int get_flow_ctrl(int &mode); @@ -114,6 +274,7 @@ public: virtual int set_flow_ctrl(int mode); virtual int set_led(bool on); + virtual int set_rx_filter_mode(rx_filter_mode_e mode); /* DUMPS */ virtual void dump_link(FILE *fd); @@ -128,7 +289,7 @@ private: class SimTRexPortAttr : public TRexPortAttr { public: - SimTRexPortAttr() { + SimTRexPortAttr() : TRexPortAttr(0) { m_link.link_speed = 10000; m_link.link_duplex = 1; m_link.link_autoneg = 0; @@ -146,7 +307,7 @@ public: void reset_xstats() {} void update_description() {} bool get_promiscuous() { return false; } - void macaddr_get(struct ether_addr *mac_addr) {} + void get_hw_src_mac(struct ether_addr *mac_addr) {} int get_xstats_values(xstats_values_t &xstats_values) { return -ENOTSUP; } int get_xstats_names(xstats_names_t &xstats_names) { return -ENOTSUP; } int get_flow_ctrl(int &mode) { return -ENOTSUP; } @@ -158,6 +319,7 @@ public: int set_flow_ctrl(int mode) { return -ENOTSUP; } int set_led(bool on) { return -ENOTSUP; } void dump_link(FILE *fd) {} + int set_rx_filter_mode(rx_filter_mode_e mode) { return -ENOTSUP; } }; |