diff options
author | Luca Muscariello <lumuscar@cisco.com> | 2022-03-30 22:29:28 +0200 |
---|---|---|
committer | Mauro Sardara <msardara@cisco.com> | 2022-03-31 19:51:47 +0200 |
commit | c46e5df56b67bb8ea7a068d39324c640084ead2b (patch) | |
tree | eddeb17785938e09bc42eec98ee09b8a28846de6 /apps/hiperf | |
parent | 18fa668f25d3cc5463417ce7df6637e31578e898 (diff) |
feat: boostrap hicn 22.02
The current patch provides several new features, improvements,
bug fixes and also complete rewrite of entire components.
- lib
The hicn packet parser has been improved with a new packet
format fully based on UDP. The TCP header is still temporarily
supported but the UDP header will replace completely the new hicn
packet format. Improvements have been made to make sure every
packet parsing operation is made via this library. The current
new header can be used as header between the payload and the
UDP header or as trailer in the UDP surplus area to be tested
when UDP options will start to be used.
- hicn-light
The portable packet forwarder has been completely rewritten from
scratch with the twofold objective to improve performance and
code size but also to drop dependencies such as libparc which is
now removed by the current implementation.
- hicn control
the control library is the agent that is used to program the
packet forwarders via their binary API. This component has
benefited from significant improvements in terms of interaction
model which is now event driven and more robust to failures.
- VPP plugin has been updated to support VPP 22.02
- transport
Major improvement have been made to the RTC protocol, to the
support of IO modules and to the security sub system. Signed
manifests are the default data authenticity and integrity framework.
Confidentiality can be enabled by sharing the encryption key to the
prod/cons layer. The library has been tested with group key based
applications such as broadcast/multicast and real-time on-line
meetings with trusted server keys or MLS.
- testing
Unit testing has been introduced using GoogleTest. One third of
the code base is covered by unit testing with priority on
critical features. Functional testing has also been introduce
using Docker, linux bridging and Robot Framework to define
test with Less Code techniques to facilitate the extension
of the coverage.
Co-authored-by: Mauro Sardara <msardara@cisco.com>
Co-authored-by: Jordan Augé <jordan.auge+fdio@cisco.com>
Co-authored-by: Michele Papalini <micpapal@cisco.com>
Co-authored-by: Angelo Mantellini <manangel@cisco.com>
Co-authored-by: Jacques Samain <jsamain@cisco.com>
Co-authored-by: Olivier Roques <oroques+fdio@cisco.com>
Co-authored-by: Enrico Loparco <eloparco@cisco.com>
Co-authored-by: Giulio Grassi <gigrassi@cisco.com>
Change-Id: I75d0ef70f86d921e3ef503c99271216ff583c215
Signed-off-by: Luca Muscariello <muscariello@ieee.org>
Signed-off-by: Mauro Sardara <msardara@cisco.com>
Diffstat (limited to 'apps/hiperf')
-rw-r--r-- | apps/hiperf/CMakeLists.txt | 41 | ||||
-rw-r--r-- | apps/hiperf/src/client.cc | 524 | ||||
-rw-r--r-- | apps/hiperf/src/client.h | 6 | ||||
-rw-r--r-- | apps/hiperf/src/common.h | 80 | ||||
-rw-r--r-- | apps/hiperf/src/forwarder_config.h | 2 | ||||
-rw-r--r-- | apps/hiperf/src/forwarder_interface.cc | 122 | ||||
-rw-r--r-- | apps/hiperf/src/forwarder_interface.h | 18 | ||||
-rw-r--r-- | apps/hiperf/src/main.cc | 145 | ||||
-rw-r--r-- | apps/hiperf/src/server.cc | 209 | ||||
-rw-r--r-- | apps/hiperf/src/server.h | 3 |
10 files changed, 804 insertions, 346 deletions
diff --git a/apps/hiperf/CMakeLists.txt b/apps/hiperf/CMakeLists.txt index 564525e67..6986c90aa 100644 --- a/apps/hiperf/CMakeLists.txt +++ b/apps/hiperf/CMakeLists.txt @@ -1,4 +1,4 @@ -# Copyright (c) 2017-2019 Cisco and/or its affiliates. +# Copyright (c) 2021-2022 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: @@ -12,6 +12,9 @@ # limitations under the License. if (NOT DISABLE_EXECUTABLES) +############################################################## +# Source files +############################################################## list(APPEND HIPERF_SRC ${CMAKE_CURRENT_SOURCE_DIR}/src/client.cc ${CMAKE_CURRENT_SOURCE_DIR}/src/main.cc @@ -19,26 +22,40 @@ if (NOT DISABLE_EXECUTABLES) ${CMAKE_CURRENT_SOURCE_DIR}/src/forwarder_interface.cc ) + +############################################################## +# Libraries +############################################################## list (APPEND HIPERF_LIBRARIES - ${LIBTRANSPORT_LIBRARIES} - ${LIBHICNCTRL_LIBRARIES} - ${LIBHICN_LIBRARIES} - ${CMAKE_THREAD_LIBS_INIT} - ${LIBCONFIG_CPP_LIBRARIES} - ${WSOCK32_LIBRARY} - ${WS2_32_LIBRARY} + PRIVATE ${LIBTRANSPORT_LIBRARIES} + PRIVATE ${LIBHICNCTRL_LIBRARIES} + PRIVATE ${LIBHICN_LIBRARIES} + PRIVATE ${CMAKE_THREAD_LIBS_INIT} + PRIVATE ${LIBCONFIG_CPP_LIBRARIES} + PRIVATE ${WSOCK32_LIBRARY} + PRIVATE ${WS2_32_LIBRARY} + ) + +############################################################## +# Compiler options +############################################################## + set(COMPILER_OPTIONS + ${DEFAULT_COMPILER_OPTIONS} ) + +############################################################## +# Build hiperf +############################################################## build_executable(hiperf SOURCES ${HIPERF_SRC} LINK_LIBRARIES ${HIPERF_LIBRARIES} INCLUDE_DIRS - ${CMAKE_CURRENT_SOURCE_DIR}/src - ${LIBTRANSPORT_INCLUDE_DIRS} - ${LIBHICNCTRL_INCLUDE_DIRS} - ${LIBCONFIG_CPP_INCLUDE_DIRS} + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/src + PRIVATE ${LIBCONFIG_CPP_INCLUDE_DIRS} DEPENDS ${DEPENDENCIES} COMPONENT ${HICN_APPS} LINK_FLAGS ${LINK_FLAGS} + COMPILE_OPTIONS ${COMPILER_OPTIONS} ) endif()
\ No newline at end of file diff --git a/apps/hiperf/src/client.cc b/apps/hiperf/src/client.cc index 820ebf0ce..319fa82ab 100644 --- a/apps/hiperf/src/client.cc +++ b/apps/hiperf/src/client.cc @@ -31,17 +31,12 @@ class Callback; * Hiperf client class: configure and setup an hicn consumer following the * ClientConfiguration. */ -class HIperfClient::Impl -#ifdef FORWARDER_INTERFACE - : ForwarderInterface::ICallback -#endif -{ - typedef std::chrono::time_point<std::chrono::steady_clock> Time; - typedef std::chrono::microseconds TimeDuration; - +class HIperfClient::Impl : ForwarderInterface::ICallback { friend class Callback; friend class RTCCallback; + static const constexpr uint16_t log2_header_counter = 4; + struct nack_packet_t { uint64_t timestamp; uint32_t prod_rate; @@ -76,21 +71,29 @@ class HIperfClient::Impl delay_sample_(0), received_bytes_(0), received_data_pkt_(0), + auth_alerts_(0), data_delays_(""), signals_(io_service_), rtc_callback_(*this), callback_(*this), socket_(io_service_), - done_(false), - switch_threshold_(~0) -#ifdef FORWARDER_INTERFACE - , - forwarder_interface_(io_service_, this) -#endif - { + // switch_threshold_(~0), + fwd_connected_(false), + use_bestpath_(false), + rtt_threshold_(~0), + loss_threshold_(~0), + prefix_name_(""), + prefix_len_(0), + // done_(false), + header_counter_mask_((1 << log2_header_counter) - 1), + header_counter_(0), + print_headers_(configuration_.print_headers_), + first_(true), + forwarder_interface_(io_service_) { + setForwarderConnection(conf.forwarder_type_); } - ~Impl() {} + virtual ~Impl() {} void checkReceivedRtcContent(ConsumerSocket &c, const ContentObject &contentObject) {} @@ -104,174 +107,187 @@ class HIperfClient::Impl void handleTimerExpiration(ConsumerSocket &c, const TransportStatistics &stats) { const char separator = ' '; - const int width = 15; + const int width = 18; - utils::TimePoint t2 = utils::SteadyClock::now(); - auto exact_duration = - std::chrono::duration_cast<utils::Milliseconds>(t2 - t_stats_); + utils::SteadyTime::TimePoint t2 = utils::SteadyTime::Clock::now(); + auto exact_duration = utils::SteadyTime::getDurationMs(t_stats_, t2); - std::stringstream interval; - interval << total_duration_milliseconds_ / 1000 << "-" - << total_duration_milliseconds_ / 1000 + - exact_duration.count() / 1000; + std::stringstream interval_ms; + interval_ms << total_duration_milliseconds_ << "-" + << total_duration_milliseconds_ + exact_duration.count(); std::stringstream bytes_transferred; bytes_transferred << std::fixed << std::setprecision(3) << (stats.getBytesRecv() - old_bytes_value_) / 1000000.0 - << std::setfill(separator) << "[MB]"; + << std::setfill(separator); std::stringstream bandwidth; bandwidth << ((stats.getBytesRecv() - old_bytes_value_) * 8) / (exact_duration.count()) / 1000.0 - << std::setfill(separator) << "[Mbps]"; + << std::setfill(separator); std::stringstream window; - window << stats.getAverageWindowSize() << std::setfill(separator) - << "[Int]"; + window << stats.getAverageWindowSize() << std::setfill(separator); std::stringstream avg_rtt; - avg_rtt << stats.getAverageRtt() << std::setfill(separator) << "[ms]"; + avg_rtt << stats.getAverageRtt() << std::setfill(separator); if (configuration_.rtc_) { - // we get rtc stats more often, thus we need ms in the interval - std::stringstream interval_ms; - interval_ms << total_duration_milliseconds_ << "-" - << total_duration_milliseconds_ + exact_duration.count(); - std::stringstream lost_data; lost_data << stats.getLostData() - old_lost_data_value_ - << std::setfill(separator) << "[pkt]"; + << std::setfill(separator); std::stringstream bytes_recovered_data; bytes_recovered_data << stats.getBytesRecoveredData() - old_bytes_recovered_value_ - << std::setfill(separator) << "[pkt]"; + << std::setfill(separator); std::stringstream definitely_lost_data; definitely_lost_data << stats.getDefinitelyLostData() - old_definitely_lost_data_value_ - << std::setfill(separator) << "[pkt]"; + << std::setfill(separator); std::stringstream data_delay; - data_delay << avg_data_delay_ << std::setfill(separator) << "[ms]"; + data_delay << std::fixed << std::setprecision(3) << avg_data_delay_ + << std::setfill(separator); std::stringstream received_data_pkt; - received_data_pkt << received_data_pkt_ << std::setfill(separator) - << "[pkt]"; + received_data_pkt << received_data_pkt_ << std::setfill(separator); std::stringstream goodput; - goodput << (received_bytes_ * 8.0) / (exact_duration.count()) / 1000.0 - << std::setfill(separator) << "[Mbps]"; + goodput << std::fixed << std::setprecision(3) + << (received_bytes_ * 8.0) / (exact_duration.count()) / 1000.0 + << std::setfill(separator); std::stringstream loss_rate; loss_rate << std::fixed << std::setprecision(2) - << stats.getLossRatio() * 100.0 << std::setfill(separator) - << "[%]"; + << stats.getLossRatio() * 100.0 << std::setfill(separator); std::stringstream retx_sent; retx_sent << stats.getRetxCount() - old_retx_value_ - << std::setfill(separator) << "[pkt]"; + << std::setfill(separator); std::stringstream interest_sent; interest_sent << stats.getInterestTx() - old_sent_int_value_ - << std::setfill(separator) << "[pkt]"; + << std::setfill(separator); std::stringstream nacks; nacks << stats.getReceivedNacks() - old_received_nacks_value_ - << std::setfill(separator) << "[pkt]"; + << std::setfill(separator); std::stringstream fec_pkt; fec_pkt << stats.getReceivedFEC() - old_fec_pkt_ - << std::setfill(separator) << "[pkt]"; + << std::setfill(separator); std::stringstream queuing_delay; - queuing_delay << stats.getQueuingDelay() << std::setfill(separator) - << "[ms]"; + queuing_delay << std::fixed << std::setprecision(3) + << stats.getQueuingDelay() << std::setfill(separator); -#ifdef FORWARDER_INTERFACE - if (!done_ && stats.getQueuingDelay() >= switch_threshold_ && - total_duration_milliseconds_ > 1000) { - std::cout << "Switching due to queuing delay" << std::endl; - forwarder_interface_.createFaceAndRoutes(backup_routes_); - forwarder_interface_.deleteFaceAndRoutes(main_routes_); - std::swap(backup_routes_, main_routes_); - done_ = true; + std::stringstream residual_losses; + double rl_perc = stats.getResidualLossRate() * 100; + residual_losses << std::fixed << std::setprecision(2) << rl_perc + << std::setfill(separator); + + std::stringstream quality_score; + quality_score << std::fixed << (int)stats.getQualityScore() + << std::setfill(separator); + + std::stringstream alerts; + alerts << stats.getAlerts() << std::setfill(separator); + + std::stringstream auth_alerts; + auth_alerts << auth_alerts_ << std::setfill(separator); + + if (fwd_connected_ && use_bestpath_ && + ((stats.getAverageRtt() > rtt_threshold_) || + ((stats.getResidualLossRate() * 100) > loss_threshold_))) { + forwarder_interface_.setStrategy(prefix_name_, prefix_len_, "bestpath"); } -#endif - // statistics not yet available in the transport - // std::stringstream interest_fec_tx; - // interest_fec_tx << stats.getInterestFecTxCount() - - // old_fec_interest_tx_value_ << std::setfill(separator) << "[pkt]"; - // std::stringstream bytes_fec_recv; - // bytes_fec_recv << stats.getBytesFecRecv() - old_fec_data_rx_value_ - // << std::setfill(separator) << "[bytes]"; - std::cout << std::left << std::setw(width) << "Interval"; - std::cout << std::left << std::setw(width) << "RecvData"; - std::cout << std::left << std::setw(width) << "Bandwidth"; - std::cout << std::left << std::setw(width) << "Goodput"; - std::cout << std::left << std::setw(width) << "LossRate"; - std::cout << std::left << std::setw(width) << "Retr"; - std::cout << std::left << std::setw(width) << "InterestSent"; - std::cout << std::left << std::setw(width) << "ReceivedNacks"; - std::cout << std::left << std::setw(width) << "SyncWnd"; - std::cout << std::left << std::setw(width) << "MinRtt"; - std::cout << std::left << std::setw(width) << "QueuingDelay"; - std::cout << std::left << std::setw(width) << "LostData"; - std::cout << std::left << std::setw(width) << "RecoveredData"; - std::cout << std::left << std::setw(width) << "DefinitelyLost"; - std::cout << std::left << std::setw(width) << "State"; - std::cout << std::left << std::setw(width) << "DataDelay"; - std::cout << std::left << std::setw(width) << "FecPkt" << std::endl; - - std::cout << std::left << std::setw(width) << interval_ms.str(); - std::cout << std::left << std::setw(width) << received_data_pkt.str(); - std::cout << std::left << std::setw(width) << bandwidth.str(); - std::cout << std::left << std::setw(width) << goodput.str(); - std::cout << std::left << std::setw(width) << loss_rate.str(); - std::cout << std::left << std::setw(width) << retx_sent.str(); - std::cout << std::left << std::setw(width) << interest_sent.str(); - std::cout << std::left << std::setw(width) << nacks.str(); - std::cout << std::left << std::setw(width) << window.str(); - std::cout << std::left << std::setw(width) << avg_rtt.str(); - std::cout << std::left << std::setw(width) << queuing_delay.str(); - std::cout << std::left << std::setw(width) << lost_data.str(); - std::cout << std::left << std::setw(width) << bytes_recovered_data.str(); - std::cout << std::left << std::setw(width) << definitely_lost_data.str(); - std::cout << std::left << std::setw(width) << stats.getCCStatus(); - std::cout << std::left << std::setw(width) << data_delay.str(); - std::cout << std::left << std::setw(width) << fec_pkt.str(); + if ((header_counter_ == 0 && print_headers_) || first_) { + std::cout << std::right << std::setw(width) << "Interval[ms]"; + std::cout << std::right << std::setw(width) << "RecvData[pkt]"; + std::cout << std::right << std::setw(width) << "Bandwidth[Mbps]"; + std::cout << std::right << std::setw(width) << "Goodput[Mbps]"; + std::cout << std::right << std::setw(width) << "LossRate[%]"; + std::cout << std::right << std::setw(width) << "Retr[pkt]"; + std::cout << std::right << std::setw(width) << "InterestSent"; + std::cout << std::right << std::setw(width) << "ReceivedNacks"; + std::cout << std::right << std::setw(width) << "SyncWnd[pkt]"; + std::cout << std::right << std::setw(width) << "MinRtt[ms]"; + std::cout << std::right << std::setw(width) << "QueuingDelay[ms]"; + std::cout << std::right << std::setw(width) << "LostData[pkt]"; + std::cout << std::right << std::setw(width) << "RecoveredData"; + std::cout << std::right << std::setw(width) << "DefinitelyLost"; + std::cout << std::right << std::setw(width) << "State"; + std::cout << std::right << std::setw(width) << "DataDelay[ms]"; + std::cout << std::right << std::setw(width) << "FecPkt"; + std::cout << std::right << std::setw(width) << "Congestion"; + std::cout << std::right << std::setw(width) << "ResidualLosses"; + std::cout << std::right << std::setw(width) << "QualityScore"; + std::cout << std::right << std::setw(width) << "Alerts"; + std::cout << std::right << std::setw(width) << "AuthAlerts" + << std::endl; + + first_ = false; + } + + std::cout << std::right << std::setw(width) << interval_ms.str(); + std::cout << std::right << std::setw(width) << received_data_pkt.str(); + std::cout << std::right << std::setw(width) << bandwidth.str(); + std::cout << std::right << std::setw(width) << goodput.str(); + std::cout << std::right << std::setw(width) << loss_rate.str(); + std::cout << std::right << std::setw(width) << retx_sent.str(); + std::cout << std::right << std::setw(width) << interest_sent.str(); + std::cout << std::right << std::setw(width) << nacks.str(); + std::cout << std::right << std::setw(width) << window.str(); + std::cout << std::right << std::setw(width) << avg_rtt.str(); + std::cout << std::right << std::setw(width) << queuing_delay.str(); + std::cout << std::right << std::setw(width) << lost_data.str(); + std::cout << std::right << std::setw(width) << bytes_recovered_data.str(); + std::cout << std::right << std::setw(width) << definitely_lost_data.str(); + std::cout << std::right << std::setw(width) << stats.getCCStatus(); + std::cout << std::right << std::setw(width) << data_delay.str(); + std::cout << std::right << std::setw(width) << fec_pkt.str(); + std::cout << std::right << std::setw(width) << stats.isCongested(); + std::cout << std::right << std::setw(width) << residual_losses.str(); + std::cout << std::right << std::setw(width) << quality_score.str(); + std::cout << std::right << std::setw(width) << alerts.str(); + std::cout << std::right << std::setw(width) << auth_alerts.str(); std::cout << std::endl; if (configuration_.test_mode_) { if (data_delays_.size() > 0) data_delays_.pop_back(); - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::system_clock::now().time_since_epoch()) - .count(); - std::cout << now << " DATA-DELAYS:[" << data_delays_ << "]" - << std::endl; + auto now = utils::SteadyTime::nowMs(); + std::cout << std::fixed << std::setprecision(0) << now.count() + << " DATA-DELAYS:[" << data_delays_ << "]" << std::endl; } // statistics not yet available in the transport - // std::cout << std::left << std::setw(width) << interest_fec_tx.str(); - // std::cout << std::left << std::setw(width) << bytes_fec_recv.str(); + // std::cout << std::right << std::setw(width) << interest_fec_tx.str(); + // std::cout << std::right << std::setw(width) << bytes_fec_recv.str(); } else { - std::cout << std::left << std::setw(width) << "Interval"; - std::cout << std::left << std::setw(width) << "Transfer"; - std::cout << std::left << std::setw(width) << "Bandwidth"; - std::cout << std::left << std::setw(width) << "Retr"; - std::cout << std::left << std::setw(width) << "Cwnd"; - std::cout << std::left << std::setw(width) << "AvgRtt" << std::endl; - - std::cout << std::left << std::setw(width) << interval.str(); - std::cout << std::left << std::setw(width) << bytes_transferred.str(); - std::cout << std::left << std::setw(width) << bandwidth.str(); - std::cout << std::left << std::setw(width) << stats.getRetxCount(); - std::cout << std::left << std::setw(width) << window.str(); - std::cout << std::left << std::setw(width) << avg_rtt.str() << std::endl; - std::cout << std::endl; + if ((header_counter_ == 0 && print_headers_) || first_) { + std::cout << std::right << std::setw(width) << "Interval[ms]"; + std::cout << std::right << std::setw(width) << "Transfer[MB]"; + std::cout << std::right << std::setw(width) << "Bandwidth[Mbps]"; + std::cout << std::right << std::setw(width) << "Retr[pkt]"; + std::cout << std::right << std::setw(width) << "Cwnd[Int]"; + std::cout << std::right << std::setw(width) << "AvgRtt[ms]" + << std::endl; + + first_ = false; + } + + std::cout << std::right << std::setw(width) << interval_ms.str(); + std::cout << std::right << std::setw(width) << bytes_transferred.str(); + std::cout << std::right << std::setw(width) << bandwidth.str(); + std::cout << std::right << std::setw(width) << stats.getRetxCount(); + std::cout << std::right << std::setw(width) << window.str(); + std::cout << std::right << std::setw(width) << avg_rtt.str() << std::endl; } + total_duration_milliseconds_ += (uint32_t)exact_duration.count(); old_bytes_value_ = stats.getBytesRecv(); old_lost_data_value_ = stats.getLostData(); @@ -289,9 +305,95 @@ class HIperfClient::Impl received_data_pkt_ = 0; data_delays_ = ""; - t_stats_ = utils::SteadyClock::now(); + t_stats_ = utils::SteadyTime::Clock::now(); + + header_counter_ = (header_counter_ + 1) & header_counter_mask_; + + if (--configuration_.nb_iterations_ == 0) { + // We reached the maximum nb of runs. Stop now. + io_service_.stop(); + } } + bool setForwarderConnection(forwarder_type_t forwarder_type) { + using namespace libconfig; + Config cfg; + + const char *conf_file = getenv("FORWARDER_CONFIG"); + if (!conf_file) return false; + + if ((forwarder_type != HICNLIGHT) && (forwarder_type != HICNLIGHT_NG)) + return false; + + try { + cfg.readFile(conf_file); + } catch (const FileIOException &fioex) { + std::cerr << "I/O error while reading file." << std::endl; + return false; + } catch (const ParseException &pex) { + std::cerr << "Parse error at " << pex.getFile() << ":" << pex.getLine() + << " - " << pex.getError() << std::endl; + return false; + } + + Setting &config = cfg.getRoot(); + + /* conf file example + * + * use_bestpath = "ON | OFF" + * rtt_threshold = 200 //ms + * loss_threshold = 20 //% + * name = "b001::/16" + */ + + if (config.exists("use_bestpath")) { + std::string val; + config.lookupValue("use_bestpath", val); + if (val.compare("ON") == 0) use_bestpath_ = true; + } + + if (config.exists("rtt_threshold")) { + unsigned val; + config.lookupValue("rtt_threshold", val); + rtt_threshold_ = val; + } + + if (config.exists("loss_threshold")) { + unsigned val; + config.lookupValue("loss_threshold", val); + loss_threshold_ = val; + } + + if (config.exists("name")) { + std::string route; + config.lookupValue("name", route); + + std::string delimiter = "/"; + size_t pos = 0; + + if ((pos = route.find(delimiter)) != std::string::npos) { + prefix_name_ = route.substr(0, pos); + route.erase(0, pos + delimiter.length()); + prefix_len_ = std::stoul(route.substr(0)); + } + } + + forwarder_interface_.initForwarderInterface(this, forwarder_type); + + return true; + } + + void onHicnServiceReady() override { + std::cout << "Successfully connected to local forwarder!" << std::endl; + fwd_connected_ = true; + } + + void onRouteConfigured( + std::vector<ForwarderInterface::RouteInfoPtr> &route_info) override { + std::cout << "Routes successfully configured!" << std::endl; + } + +#ifdef FORWARDER_INTERFACE bool parseConfig(const char *conf_file) { using namespace libconfig; Config cfg; @@ -439,7 +541,6 @@ class HIperfClient::Impl return true; } -#ifdef FORWARDER_INTERFACE void onHicnServiceReady() override { std::cout << "Successfully connected to local forwarder!" << std::endl; @@ -541,6 +642,13 @@ class HIperfClient::Impl } #endif + transport::auth::VerificationPolicy onAuthFailed( + transport::auth::Suffix suffix, + transport::auth::VerificationPolicy policy) { + auth_alerts_++; + return transport::auth::VerificationPolicy::ACCEPT; + } + int setup() { int ret; @@ -557,6 +665,7 @@ class HIperfClient::Impl producer_socket_ = std::make_unique<ProducerSocket>(production_protocol); producer_socket_->registerPrefix(configuration_.relay_name_); producer_socket_->connect(); + producer_socket_->start(); } if (configuration_.output_stream_mode_ && configuration_.rtc_) { @@ -586,6 +695,17 @@ class HIperfClient::Impl GeneralTransportOptions::INTEREST_LIFETIME, configuration_.interest_lifetime_); + consumer_socket_->setSocketOption( + GeneralTransportOptions::MAX_UNVERIFIED_TIME, + configuration_.unverified_delay_); + + if (consumer_socket_->setSocketOption( + GeneralTransportOptions::PACKET_FORMAT, + configuration_.packet_format_) == SOCKET_OPTION_NOT_SET) { + std::cerr << "ERROR -- Impossible to set the packet format." << std::endl; + return ERROR_SETUP; + } + #if defined(DEBUG) && defined(__linux__) std::shared_ptr<transport::BasePortal> portal; consumer_socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal); @@ -623,20 +743,24 @@ class HIperfClient::Impl } } + std::shared_ptr<Verifier> verifier = std::make_shared<VoidVerifier>(); + if (!configuration_.producer_certificate.empty()) { - std::shared_ptr<Verifier> verifier = std::make_shared<AsymmetricVerifier>( + verifier = std::make_shared<AsymmetricVerifier>( configuration_.producer_certificate); - if (consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER, - verifier) == SOCKET_OPTION_NOT_SET) - return ERROR_SETUP; } if (!configuration_.passphrase.empty()) { - std::shared_ptr<Verifier> verifier = - std::make_shared<SymmetricVerifier>(configuration_.passphrase); - if (consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER, - verifier) == SOCKET_OPTION_NOT_SET) - return ERROR_SETUP; + verifier = std::make_shared<SymmetricVerifier>(configuration_.passphrase); + } + + verifier->setVerificationFailedCallback( + std::bind(&HIperfClient::Impl::onAuthFailed, this, + std::placeholders::_1, std::placeholders::_2)); + + if (consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER, + verifier) == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; } ret = consumer_socket_->setSocketOption( @@ -662,6 +786,65 @@ class HIperfClient::Impl } if (configuration_.rtc_) { + if (configuration_.recovery_strategy_ == 1) { // unreliable + ret = consumer_socket_->setSocketOption( + RtcTransportOptions::RECOVERY_STRATEGY, + (uint32_t)RtcTransportRecoveryStrategies::RECOVERY_OFF); + } else if (configuration_.recovery_strategy_ == 2) { // rtx only + ret = consumer_socket_->setSocketOption( + RtcTransportOptions::RECOVERY_STRATEGY, + (uint32_t)RtcTransportRecoveryStrategies::RTX_ONLY); + } else if (configuration_.recovery_strategy_ == 3) { // fec only + ret = consumer_socket_->setSocketOption( + RtcTransportOptions::RECOVERY_STRATEGY, + (uint32_t)RtcTransportRecoveryStrategies::FEC_ONLY); + } else if (configuration_.recovery_strategy_ == 4) { // delay based + ret = consumer_socket_->setSocketOption( + RtcTransportOptions::RECOVERY_STRATEGY, + (uint32_t)RtcTransportRecoveryStrategies::DELAY_BASED); + } else if (configuration_.recovery_strategy_ == 5) { // low rate flow + ret = consumer_socket_->setSocketOption( + RtcTransportOptions::RECOVERY_STRATEGY, + (uint32_t)RtcTransportRecoveryStrategies::LOW_RATE); + } else if (configuration_.recovery_strategy_ == + 6) { // low rate + bestpath + ret = consumer_socket_->setSocketOption( + RtcTransportOptions::RECOVERY_STRATEGY, + (uint32_t)RtcTransportRecoveryStrategies::LOW_RATE_AND_BESTPATH); + } else if (configuration_.recovery_strategy_ == + 7) { // low rate + replication + ret = consumer_socket_->setSocketOption( + RtcTransportOptions::RECOVERY_STRATEGY, + (uint32_t)RtcTransportRecoveryStrategies::LOW_RATE_AND_REPLICATION); + } else if (configuration_.recovery_strategy_ == + 8) { // low rate + bestpath or replication + ret = consumer_socket_->setSocketOption( + RtcTransportOptions::RECOVERY_STRATEGY, + (uint32_t)RtcTransportRecoveryStrategies:: + LOW_RATE_AND_ALL_FWD_STRATEGIES); + } else { + // default + ret = consumer_socket_->setSocketOption( + RtcTransportOptions::RECOVERY_STRATEGY, + (uint32_t)RtcTransportRecoveryStrategies::RTX_ONLY); + } + + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + } + + if (configuration_.rtc_) { + ret = consumer_socket_->setSocketOption( + RtcTransportOptions::AGGREGATED_DATA, + configuration_.aggregated_data_); + + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + } + + if (configuration_.rtc_) { ret = consumer_socket_->setSocketOption( ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT, (ConsumerContentObjectCallback)std::bind( @@ -673,6 +856,15 @@ class HIperfClient::Impl } if (configuration_.rtc_) { + ret = consumer_socket_->setSocketOption(GeneralTransportOptions::FEC_TYPE, + configuration_.fec_type_); + + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + } + + if (configuration_.rtc_) { std::shared_ptr<TransportStatistics> transport_stats; consumer_socket_->getSocketOption( OtherOptions::STATISTICS, (TransportStatistics **)&transport_stats); @@ -708,10 +900,10 @@ class HIperfClient::Impl signals_.async_wait( [this](const std::error_code &, const int &) { io_service_.stop(); }); - t_download_ = t_stats_ = std::chrono::steady_clock::now(); - consumer_socket_->asyncConsume(configuration_.name); - io_service_.run(); + t_download_ = t_stats_ = utils::SteadyTime::now(); + consumer_socket_->consume(configuration_.name); + io_service_.run(); consumer_socket_->stop(); return ERROR_SUCCESS; @@ -719,11 +911,15 @@ class HIperfClient::Impl private: class RTCCallback : public ConsumerSocket::ReadCallback { - static constexpr std::size_t mtu = 1500; + static constexpr std::size_t mtu = HIPERF_MTU; public: RTCCallback(Impl &hiperf_client) : client_(hiperf_client) { client_.configuration_.receive_buffer = utils::MemBuf::create(mtu); + Packet::Format format = + PayloadSize::getFormatFromName(client_.configuration_.name, false); + payload_size_max_ = + PayloadSize(format).getPayloadSizeMax(RTC_HEADER_SIZE); } bool isBufferMovable() noexcept override { return false; } @@ -743,9 +939,7 @@ class HIperfClient::Impl uint64_t *senderTimeStamp = (uint64_t *)client_.configuration_.receive_buffer->writableData(); - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::system_clock::now().time_since_epoch()) - .count(); + auto now = utils::SystemTime::nowMs().count(); double new_delay = (double)(now - *senderTimeStamp); if (*senderTimeStamp > now) @@ -765,7 +959,7 @@ class HIperfClient::Impl client_.producer_socket_->produceDatagram( client_.configuration_.relay_name_.getName(), client_.configuration_.receive_buffer->writableData(), - length < 1400 ? length : 1400); + length < payload_size_max_ ? length : payload_size_max_); } if (client_.configuration_.output_stream_mode_) { uint8_t *start = @@ -778,7 +972,7 @@ class HIperfClient::Impl size_t maxBufferSize() const override { return mtu; } - void readError(const std::error_code ec) noexcept override { + void readError(const std::error_code &ec) noexcept override { std::cerr << "Error while reading from RTC socket" << std::endl; client_.io_service_.stop(); } @@ -789,6 +983,7 @@ class HIperfClient::Impl private: Impl &client_; + std::size_t payload_size_max_; }; class Callback : public ConsumerSocket::ReadCallback { @@ -816,16 +1011,15 @@ class HIperfClient::Impl return client_.configuration_.receive_buffer_size_; } - void readError(const std::error_code ec) noexcept override { + void readError(const std::error_code &ec) noexcept override { std::cerr << "Error " << ec.message() << " while reading from socket" << std::endl; client_.io_service_.stop(); } void readSuccess(std::size_t total_size) noexcept override { - Time t2 = std::chrono::steady_clock::now(); - TimeDuration dt = - std::chrono::duration_cast<TimeDuration>(t2 - client_.t_download_); + auto t2 = utils::SteadyTime::now(); + auto dt = utils::SteadyTime::getDurationUs(client_.t_download_, t2); long usec = (long)dt.count(); std::cout << "Content retrieved. Size: " << total_size << " [Bytes]" @@ -843,8 +1037,8 @@ class HIperfClient::Impl }; hiperf::ClientConfiguration configuration_; - Time t_stats_; - Time t_download_; + utils::SteadyTime::TimePoint t_stats_; + utils::SteadyTime::TimePoint t_download_; uint32_t total_duration_milliseconds_; uint64_t old_bytes_value_; uint64_t old_interest_tx_value_; @@ -865,6 +1059,7 @@ class HIperfClient::Impl uint32_t received_bytes_; uint32_t received_data_pkt_; + uint32_t auth_alerts_; std::string data_delays_; @@ -878,19 +1073,44 @@ class HIperfClient::Impl asio::ip::udp::endpoint remote_; ForwarderConfiguration config_; - uint16_t switch_threshold_; /* ms */ - bool done_; + // uint16_t switch_threshold_; /* ms */ + bool fwd_connected_; + bool use_bestpath_; + uint32_t rtt_threshold_; /* ms */ + uint32_t loss_threshold_; /* ms */ + std::string prefix_name_; // bestpath route + uint32_t prefix_len_; + // bool done_; + std::vector<ForwarderInterface::RouteInfoPtr> main_routes_; std::vector<ForwarderInterface::RouteInfoPtr> backup_routes_; -#ifdef FORWARDER_INTERFACE + uint16_t header_counter_mask_; + uint16_t header_counter_; + + bool print_headers_; + bool first_; + ForwarderInterface forwarder_interface_; -#endif }; HIperfClient::HIperfClient(const ClientConfiguration &conf) { impl_ = new Impl(conf); } +HIperfClient::HIperfClient(HIperfClient &&other) { + impl_ = other.impl_; + other.impl_ = nullptr; +} + +HIperfClient &HIperfClient::operator=(HIperfClient &&other) { + if (this != &other) { + impl_ = other.impl_; + other.impl_ = nullptr; + } + + return *this; +} + HIperfClient::~HIperfClient() { delete impl_; } int HIperfClient::setup() { return impl_->setup(); } diff --git a/apps/hiperf/src/client.h b/apps/hiperf/src/client.h index f45b9af43..bc80c874c 100644 --- a/apps/hiperf/src/client.h +++ b/apps/hiperf/src/client.h @@ -16,12 +16,16 @@ #pragma once #include <common.h> +#include <hicn/transport/utils/noncopyable.h> namespace hiperf { -class HIperfClient { +class HIperfClient : ::utils::NonCopyable { public: HIperfClient(const ClientConfiguration &conf); + HIperfClient(HIperfClient &&other); + HIperfClient &operator=(HIperfClient &&other); + ~HIperfClient(); int setup(); void run(); diff --git a/apps/hiperf/src/common.h b/apps/hiperf/src/common.h index e6ba526f9..13c9dcc1d 100644 --- a/apps/hiperf/src/common.h +++ b/apps/hiperf/src/common.h @@ -15,7 +15,7 @@ #pragma once -#include <hicn/transport/auth/identity.h> +#include <forwarder_interface.h> #include <hicn/transport/auth/signer.h> #include <hicn/transport/config.h> #include <hicn/transport/core/content_object.h> @@ -45,6 +45,9 @@ #endif #define ERROR_SETUP -5 #define MIN_PROBE_SEQ 0xefffffff +#define RTC_HEADER_SIZE 12 +#define FEC_HEADER_MAX_SIZE 36 +#define HIPERF_MTU 1500 using namespace transport::interface; using namespace transport::auth; @@ -73,11 +76,51 @@ static inline uint64_t _htonll(const uint64_t *input) { namespace hiperf { /** + * Class to retrieve the maximum payload size given the MTU and packet headers. + */ +class PayloadSize { + public: + PayloadSize(Packet::Format format, std::size_t mtu = HIPERF_MTU) + : mtu_(mtu), format_(format) {} + + std::size_t getPayloadSizeMax(std::size_t transport_size = 0, + std::size_t fec_size = 0, + std::size_t signature_size = 0) { + return mtu_ - Packet::getHeaderSizeFromFormat(format_, signature_size) - + transport_size - fec_size; + } + + static Packet::Format getFormatFromName(Name name, bool ah = false) { + switch (name.getAddressFamily()) { + case AF_INET: + return ah ? HF_INET_TCP_AH : HF_INET_TCP; + case AF_INET6: + return ah ? HF_INET6_TCP_AH : HF_INET6_TCP; + default: + return HF_UNSPEC; + } + } + + private: + std::size_t mtu_; + Packet::Format format_; +}; + +/** * Class for handling the production rate for the RTC producer. */ class Rate { public: Rate() : rate_kbps_(0) {} + ~Rate() {} + + Rate &operator=(const Rate &other) { + if (this != &other) { + rate_kbps_ = other.rate_kbps_; + } + + return *this; + } Rate(const std::string &rate) { std::size_t found = rate.find("kbps"); @@ -137,9 +180,16 @@ struct ClientConfiguration { secure_(false), producer_prefix_(), interest_lifetime_(500), + unverified_delay_(2000), relay_name_("c001::abcd/64"), output_stream_mode_(false), - port_(0) {} + port_(0), + recovery_strategy_(4), + aggregated_data_(false), + fec_type_(""), + packet_format_(default_values::packet_format), + print_headers_(true), + nb_iterations_(std::numeric_limits<decltype(nb_iterations_)>::max()) {} Name name; double beta; @@ -158,9 +208,17 @@ struct ClientConfiguration { bool secure_; Prefix producer_prefix_; uint32_t interest_lifetime_; + uint32_t unverified_delay_; Prefix relay_name_; bool output_stream_mode_; uint16_t port_; + uint32_t recovery_strategy_; + bool aggregated_data_; + std::string fec_type_; + Packet::Format packet_format_; + bool print_headers_; + std::uint32_t nb_iterations_; + forwarder_type_t forwarder_type_; }; /** @@ -170,11 +228,11 @@ struct ServerConfiguration { ServerConfiguration() : name("b001::abcd/64"), virtual_producer(true), - manifest(false), + manifest(0), live_production(false), content_lifetime(600000000_U32), download_size(20 * 1024 * 1024), - hash_algorithm(CryptoHashType::SHA256), + hash_algorithm_(CryptoHashType::SHA256), keystore_name(""), passphrase(""), keystore_password("cisco"), @@ -185,18 +243,21 @@ struct ServerConfiguration { trace_index_(0), trace_file_(nullptr), production_rate_(std::string("2048kbps")), - payload_size_(1400), + payload_size_(1384), secure_(false), input_stream_mode_(false), - port_(0) {} + port_(0), + aggregated_data_(false), + fec_type_(""), + packet_format_(default_values::packet_format) {} Prefix name; bool virtual_producer; - bool manifest; + std::uint32_t manifest; bool live_production; std::uint32_t content_lifetime; std::uint32_t download_size; - CryptoHashType hash_algorithm; + CryptoHashType hash_algorithm_; std::string keystore_name; std::string passphrase; std::string keystore_password; @@ -212,6 +273,9 @@ struct ServerConfiguration { bool input_stream_mode_; uint16_t port_; std::vector<struct packet_t> trace_; + bool aggregated_data_; + std::string fec_type_; + Packet::Format packet_format_; }; } // namespace hiperf diff --git a/apps/hiperf/src/forwarder_config.h b/apps/hiperf/src/forwarder_config.h index 655ac3b66..aaac14839 100644 --- a/apps/hiperf/src/forwarder_config.h +++ b/apps/hiperf/src/forwarder_config.h @@ -94,4 +94,4 @@ class ForwarderConfiguration { std::size_t n_threads_; }; -}
\ No newline at end of file +} // namespace hiperf
\ No newline at end of file diff --git a/apps/hiperf/src/forwarder_interface.cc b/apps/hiperf/src/forwarder_interface.cc index 864208239..e87a5953d 100644 --- a/apps/hiperf/src/forwarder_interface.cc +++ b/apps/hiperf/src/forwarder_interface.cc @@ -25,6 +25,7 @@ extern "C" { #include <hicn/error.h> #include <hicn/util/ip_address.h> +#include <hicn/util/sstrncpy.h> } // XXX the main listener should be retrieve in this class at initialization, aka @@ -36,27 +37,14 @@ extern "C" { namespace hiperf { +ForwarderInterface::ForwarderInterface(asio::io_service &io_service) + : external_ioservice_(io_service), timer_(io_service) {} + ForwarderInterface::ForwarderInterface(asio::io_service &io_service, - ICallback *callback) - : external_ioservice_(io_service), - forwarder_interface_callback_(callback), - work_(std::make_unique<asio::io_service::work>(internal_ioservice_)), - sock_(nullptr), - thread_(std::make_unique<std::thread>([this]() { - std::cout << "Starting Forwarder Interface thread" << std::endl; - internal_ioservice_.run(); - std::cout << "Stopping Forwarder Interface thread" << std::endl; - })), - // set_route_callback_(std::forward<Callback &&>(setRouteCallback)), - check_routes_timer_(nullptr), - pending_add_route_counter_(0), - hicn_listen_port_(9695), - /* We start in disabled state even when a forwarder is always available */ - state_(State::Disabled), - timer_(io_service), - num_reattempts(0) { - std::cout << "Forwarder interface created... connecting to forwarder...\n"; - internal_ioservice_.post([this]() { onHicnServiceAvailable(true); }); + ICallback *callback, + forwarder_type_t fwd_type) + : external_ioservice_(io_service), timer_(io_service) { + initForwarderInterface(callback, fwd_type); } ForwarderInterface::~ForwarderInterface() { @@ -72,8 +60,27 @@ ForwarderInterface::~ForwarderInterface() { thread_->join(); } +} - std::cout << "ForwarderInterface::~ForwarderInterface" << std::endl; +void ForwarderInterface::initForwarderInterface(ICallback *callback, + forwarder_type_t fwd_type) { + forwarder_interface_callback_ = callback; + work_ = std::make_unique<asio::io_service::work>(internal_ioservice_); + sock_ = nullptr; + thread_ = std::make_unique<std::thread>([this]() { + std::cout << "Starting Forwarder Interface thread" << std::endl; + internal_ioservice_.run(); + std::cout << "Stopping Forwarder Interface thread" << std::endl; + }); + check_routes_timer_ = nullptr; + pending_add_route_counter_ = 0; + hicn_listen_port_ = 9695; + /* We start in disabled state even when a forwarder is always available */ + state_ = State::Disabled; + fwd_type_ = fwd_type; + num_reattempts = 0; + std::cout << "Forwarder interface created... connecting to forwarder...\n"; + internal_ioservice_.post([this]() { onHicnServiceAvailable(true); }); } void ForwarderInterface::onHicnServiceAvailable(bool flag) { @@ -93,26 +100,15 @@ void ForwarderInterface::onHicnServiceAvailable(bool flag) { std::cout << "Connected to forwarder... cancelling reconnection timer" << std::endl; + timer_.cancel(); num_reattempts = 0; - // case State::Connected: - // checkListener(); - - // if (state_ != State::Ready) { - // std::cout << "Listener not found" << std::endl; - // goto REATTEMPT; - // } - // state_ = State::Ready; - - // timer_.cancel(); - // num_reattempts = 0; - std::cout << "Forwarder interface is ready... communicate to controller" << std::endl; forwarder_interface_callback_->onHicnServiceReady(); - + case State::Connected: case State::Ready: break; } @@ -136,14 +132,14 @@ REATTEMPT: std::chrono::milliseconds(ForwarderInterface::REATTEMPT_DELAY_MS)); // timer_.async_wait(std::bind(&ForwarderInterface::onHicnServiceAvailable, // this, flag, std::placeholders::_1)); - timer_.async_wait([this, flag](std::error_code ec) { + timer_.async_wait([this, flag](const std::error_code &ec) { if (ec) return; onHicnServiceAvailable(flag); }); } int ForwarderInterface::connectToForwarder() { - sock_ = hc_sock_create(); + sock_ = hc_sock_create_forwarder(fwd_type_); if (!sock_) { std::cout << "Could not create socket" << std::endl; goto ERR_SOCK; @@ -256,6 +252,27 @@ void ForwarderInterface::deleteFaceAndRoutes( }); } +void ForwarderInterface::setStrategy(std::string prefix, uint32_t prefix_len, + std::string strategy) { + if (!sock_) return; + + ip_address_t ip_prefix; + if (ip_address_pton(prefix.c_str(), &ip_prefix) < 0) { + return; + } + + strategy_type_t strategy_type = strategy_type_from_str(strategy.c_str()); + if (strategy_type == STRATEGY_TYPE_UNDEFINED) return; + + hc_strategy_t strategy_conf; + strategy_conf.address = ip_prefix; + strategy_conf.len = prefix_len; + strategy_conf.family = AF_INET6; + strategy_conf.type = strategy_type; + + hc_strategy_set(sock_, &strategy_conf); +} + void ForwarderInterface::internalDeleteFaceAndRoute( const RouteInfoPtr &route_info) { if (!sock_) return; @@ -346,10 +363,11 @@ void ForwarderInterface::internalCreateFaceAndRoutes( } max_try--; timer->expires_from_now(std::chrono::milliseconds(500)); - timer->async_wait([this, failed, max_try, timer](std::error_code ec) { - if (ec) return; - internalCreateFaceAndRoutes(failed, max_try, timer); - }); + timer->async_wait( + [this, failed, max_try, timer](const std::error_code &ec) { + if (ec) return; + internalCreateFaceAndRoutes(failed, max_try, timer); + }); return; } @@ -422,16 +440,18 @@ int ForwarderInterface::tryToCreateFace(RouteInfo *route_info, std::string name = "l_" + route_info->name; listener.local_addr = local_address; - listener.type = CONNECTION_TYPE_UDP; + listener.type = FACE_TYPE_UDP; listener.family = AF_INET; listener.local_port = route_info->local_port; - strncpy(listener.name, name.c_str(), sizeof(listener.name)); - strncpy(listener.interface_name, route_info->interface.c_str(), - sizeof(listener.interface_name)); + int ret = strcpy_s(listener.name, SYMBOLIC_NAME_LEN - 1, name.c_str()); + if (ret < EOK) goto ERR; + ret = strcpy_s(listener.interface_name, INTERFACE_LEN - 1, + route_info->interface.c_str()); + if (ret < EOK) goto ERR; std::cout << "------------> " << route_info->interface << std::endl; - int ret = hc_listener_create(sock_, &listener); + ret = hc_listener_create(sock_, &listener); if (ret < 0) { std::cerr << "Error creating listener." << std::endl; @@ -520,7 +540,7 @@ int ForwarderInterface::tryToCreateRoute(RouteInfo *route_info, #if 0 // not used void ForwarderInterface::checkRoutesLoop() { check_routes_timer_->expires_from_now(std::chrono::milliseconds(1000)); - check_routes_timer_->async_wait([this](std::error_code ec) { + check_routes_timer_->async_wait([this](const std::error_code &ec) { if (ec) return; if (pending_add_route_counter_ == 0) checkRoutes(); }); @@ -580,7 +600,7 @@ void ForwarderInterface::checkRoutes() { } else { // XXX This should be moved somewhere else getMainListener( - [this](std::error_code ec, uint32_t hicn_listen_port) { + [this](const std::error_code &ec, uint32_t hicn_listen_port) { if (!ec) { hicn_listen_port_ = hicn_listen_port; @@ -597,7 +617,7 @@ void ForwarderInterface::checkRoutes() { tryToConnectToForwarder(); } private: - void doGetMainListener(std::error_code ec) + void doGetMainListener(const std::error_code &ec) { if (!ec) { @@ -607,7 +627,7 @@ void ForwarderInterface::checkRoutes() { { // Since without the main listener of the forwarder the proxy cannot // work, we can stop the program here until we get the listener port. - std::cout << + std::cout << "Could not retrieve main listener port from the forwarder. " "Retrying."; @@ -635,7 +655,7 @@ void ForwarderInterface::checkRoutes() { doTryToConnectToForwarder(std::make_error_code(std::errc(0))); } - void doTryToConnectToForwarder(std::error_code ec) + void doTryToConnectToForwarder(const std::error_code &ec) { if (!ec) { @@ -673,4 +693,4 @@ void ForwarderInterface::checkRoutes() { constexpr uint32_t ForwarderInterface::REATTEMPT_DELAY_MS; constexpr uint32_t ForwarderInterface::MAX_REATTEMPT; -} // namespace hiperf
\ No newline at end of file +} // namespace hiperf diff --git a/apps/hiperf/src/forwarder_interface.h b/apps/hiperf/src/forwarder_interface.h index 7591ea257..e58989295 100644 --- a/apps/hiperf/src/forwarder_interface.h +++ b/apps/hiperf/src/forwarder_interface.h @@ -15,6 +15,8 @@ #pragma once +#include <hicn/transport/utils/noncopyable.h> + extern "C" { #ifndef WITH_POLICY #define WITH_POLICY @@ -27,14 +29,13 @@ extern "C" { #define ASIO_STANDALONE #endif #include <asio.hpp> - #include <functional> #include <thread> #include <unordered_map> namespace hiperf { -class ForwarderInterface { +class ForwarderInterface : ::utils::NonCopyable { static const uint32_t REATTEMPT_DELAY_MS = 500; static const uint32_t MAX_REATTEMPT = 10; @@ -68,10 +69,14 @@ class ForwarderInterface { }; public: - ForwarderInterface(asio::io_service &io_service, ICallback *callback); + explicit ForwarderInterface(asio::io_service &io_service); + explicit ForwarderInterface(asio::io_service &io_service, ICallback *callback, + forwarder_type_t fwd_type); ~ForwarderInterface(); + void initForwarderInterface(ICallback *callback, forwarder_type_t fwd_type); + State getState(); void setState(State state); @@ -88,11 +93,16 @@ class ForwarderInterface { void deleteFaceAndRoute(const RouteInfoPtr &route_info); + void setStrategy(std::string prefix, uint32_t prefix_len, + std::string strategy); + void close(); uint16_t getHicnListenerPort() { return hicn_listen_port_; } private: + ForwarderInterface &operator=(const ForwarderInterface &other) = delete; + int connectToForwarder(); int checkListener(); @@ -123,6 +133,8 @@ class ForwarderInterface { State state_; + forwarder_type_t fwd_type_; + /* Reattempt timer */ asio::steady_timer timer_; unsigned num_reattempts; diff --git a/apps/hiperf/src/main.cc b/apps/hiperf/src/main.cc index b2d99c4a4..b69392de8 100644 --- a/apps/hiperf/src/main.cc +++ b/apps/hiperf/src/main.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -14,13 +14,13 @@ */ #include <client.h> -#include <server.h> #include <forwarder_interface.h> +#include <server.h> namespace hiperf { void usage() { - std::cerr << "HIPERF - A tool for performing network throughput " + std::cerr << "HIPERF - Instrumentation tool for performing active network" "measurements with hICN" << std::endl; std::cerr << "usage: hiperf [-S|-C] [options] [prefix|name]" << std::endl; @@ -34,19 +34,27 @@ void usage() { std::cerr << "-f\t<filename>\t\t\t" << "Log file" << std::endl; std::cerr << "-z\t<io_module>\t\t\t" - << "IO module to use. Default: hicnlight_module" << std::endl; + << "IO module to use. Default: hicnlightng_module" << std::endl; + std::cerr << "-F\t<conf_file>\t\t\t" + << "Path to optional configuration file for libtransport" + << std::endl; + std::cerr << "-a\t\t\t\t\t" + << "Enables data packet aggregation. " + << "Works only in RTC mode" << std::endl; + std::cerr << "-X\t<param>\t\t\t\t" + << "Set FEC params. Options are Rely_K#_N# or RS_K#_N#" + << std::endl; #endif std::cerr << std::endl; std::cerr << "SERVER SPECIFIC:" << std::endl; std::cerr << "-A\t<content_size>\t\t\t" - "Size of the content to publish. This " - "is not the size of the packet (see -s for it)." - << std::endl; - std::cerr << "-s\t<packet_size>\t\t\tSize of the payload of each data packet." + "Sends an application data unit in bytes that is published once " + "before exit" << std::endl; + std::cerr << "-s\t<packet_size>\t\t\tData packet payload size." << std::endl; std::cerr << "-r\t\t\t\t\t" << "Produce real content of <content_size> bytes" << std::endl; - std::cerr << "-m\t\t\t\t\t" + std::cerr << "-m\t<manifest_capacity>\t\t" << "Produce transport manifest" << std::endl; std::cerr << "-l\t\t\t\t\t" << "Start producing content upon the reception of the " @@ -60,20 +68,17 @@ void usage() { << "String from which a 128-bit symmetric key will be " "derived for signing packets" << std::endl; + std::cerr << "-p\t<password>\t\t\t" + << "Password for p12 keystore" << std::endl; std::cerr << "-y\t<hash_algorithm>\t\t" << "Use the selected hash algorithm for " - "calculating manifest digests" + "computing manifest digests (default: SHA256)" << std::endl; - std::cerr << "-p\t<password>\t\t\t" - << "Password for p12 keystore" << std::endl; std::cerr << "-x\t\t\t\t\t" - << "Produce a content of <content_size>, then after downloading " - "it produce a new content of" - << "\n\t\t\t\t\t<content_size> without resetting " - "the suffix to 0." - << std::endl; + << "Produces application data units of size <content_size> " + << "without resetting the name suffix to 0." << std::endl; std::cerr << "-B\t<bitrate>\t\t\t" - << "Bitrate for RTC producer, to be used with the -R option." + << "RTC producer data bitrate, to be used with the -R option." << std::endl; #ifndef _WIN32 std::cerr << "-I\t\t\t\t\t" @@ -92,8 +97,8 @@ void usage() { "file containing the " "crypto material used for the TLS handshake" << std::endl; - std::cerr << "-G\t<port>\t\t\t" - << "input stream from localhost at the specified port" << std::endl; + std::cerr << "-G\t<port>\t\t\t\t" + << "Input stream from localhost at the specified port" << std::endl; #endif std::cerr << std::endl; std::cerr << "CLIENT SPECIFIC:" << std::endl; @@ -105,6 +110,8 @@ void usage() { << std::endl; std::cerr << "-L\t<interest lifetime>\t\t" << "Set interest lifetime." << std::endl; + std::cerr << "-u\t<delay>\t\t\t\t" + << "Set max lifetime of unverified packets." << std::endl; std::cerr << "-M\t<input_buffer_size>\t\t" << "Size of consumer input buffer. If 0, reassembly of packets " "will be disabled." @@ -127,17 +134,31 @@ void usage() { std::cerr << "-t\t\t\t\t\t" "Test mode, check if the client is receiving the " "correct data. This is an RTC specific option, to be " - "used with the -R (default false)" + "used with the -R (default: false)" << std::endl; std::cerr << "-P\t\t\t\t\t" << "Prefix of the producer where to do the handshake" << std::endl; std::cerr << "-j\t<relay_name>\t\t\t" - << "Publish the received content under the name relay_name." + << "Publish received content under the name relay_name." "This is an RTC specific option, to be " - "used with the -R (default false)" + "used with the -R (default: false)" + << std::endl; + std::cerr << "-g\t<port>\t\t\t\t" + << "Output stream to localhost at the specified port" << std::endl; + std::cerr << "-e\t<strategy>\t\t\t" + << "Enance the network with a realiability strategy. Options 1:" + << " unreliable, 2: rtx only, 3: fec only, " + << "4: delay based, 5: low rate, 6: low rate and best path " + << "7: low rate and replication, 8: low rate and best" + << " path/replication" + << "(default: 2 = rtx only) " << std::endl; + std::cerr << "-H\t\t\t\t\t" + << "Disable periodic print headers in stats report." << std::endl; + std::cerr << "-n\t<nb_iterations>\t\t\t" + << "Print the stats report <nb_iterations> times and exit.\n" + << "\t\t\t\t\tThis option limits the duration of the run to " + "<nb_iterations> * <stats_interval> milliseconds." << std::endl; - std::cerr << "-g\t<port>\t\t\t" - << "output stream to localhost at the specified port" << std::endl; } int main(int argc, char *argv[]) { @@ -156,7 +177,7 @@ int main(int argc, char *argv[]) { char *log_file = nullptr; transport::interface::global_config::IoModuleConfiguration config; std::string conf_file; - config.name = "hicnlight_module"; + config.name = "hicnlightng_module"; // Consumer ClientConfiguration client_configuration; @@ -166,10 +187,9 @@ int main(int argc, char *argv[]) { int opt; #ifndef _WIN32 - while ((opt = getopt( - argc, argv, - "DSCf:b:d:W:RM:c:vA:s:rmlK:k:y:p:hi:xE:P:B:ItL:z:T:F:j:g:G:")) != - -1) { + while ((opt = getopt(argc, argv, + "DSCf:b:d:W:RM:c:vA:s:rm:lK:k:y:p:hi:xE:P:B:ItL:z:T:F:j:" + "g:G:e:awHn:X:u:")) != -1) { switch (opt) { // Common case 'D': { @@ -202,9 +222,11 @@ int main(int argc, char *argv[]) { break; } #else - while ((opt = getopt(argc, argv, - "SCf:b:d:W:RM:c:vA:s:rmlK:k:y:p:hi:xB:E:P:tL:z:F:j:")) != - -1) { + while ( + (opt = getopt( + argc, argv, + "SCf:b:d:W:RM:c:vA:s:rm:lK:k:y:p:hi:xB:E:P:tL:z:F:j:e:awHn:X:u:")) != + -1) { switch (opt) { #endif case 'f': { @@ -216,6 +238,21 @@ int main(int argc, char *argv[]) { server_configuration.rtc_ = true; break; } + case 'a': { + client_configuration.aggregated_data_ = true; + server_configuration.aggregated_data_ = true; + break; + } + case 'X': { + client_configuration.fec_type_ = std::string(optarg); + server_configuration.fec_type_ = std::string(optarg); + break; + } + case 'w': { + client_configuration.packet_format_ = Packet::Format::HF_INET6_UDP; + server_configuration.packet_format_ = Packet::Format::HF_INET6_UDP; + break; + } case 'z': { config.name = optarg; break; @@ -286,12 +323,27 @@ int main(int argc, char *argv[]) { options = 1; break; } + case 'u': { + client_configuration.unverified_delay_ = std::stoul(optarg); + options = 1; + break; + } case 'j': { client_configuration.relay_ = true; client_configuration.relay_name_ = Prefix(optarg); options = 1; break; } + case 'H': { + client_configuration.print_headers_ = false; + options = 1; + break; + } + case 'n': { + client_configuration.nb_iterations_ = std::stoul(optarg); + options = 1; + break; + } // Server specific case 'A': { server_configuration.download_size = std::stoul(optarg); @@ -309,7 +361,7 @@ int main(int argc, char *argv[]) { break; } case 'm': { - server_configuration.manifest = true; + server_configuration.manifest = std::stoul(optarg); options = -1; break; } @@ -324,18 +376,19 @@ int main(int argc, char *argv[]) { break; } case 'y': { + CryptoHashType hash_algorithm = CryptoHashType::SHA256; if (strncasecmp(optarg, "sha256", 6) == 0) { - server_configuration.hash_algorithm = CryptoHashType::SHA256; + hash_algorithm = CryptoHashType::SHA256; } else if (strncasecmp(optarg, "sha512", 6) == 0) { - server_configuration.hash_algorithm = CryptoHashType::SHA512; + hash_algorithm = CryptoHashType::SHA512; } else if (strncasecmp(optarg, "blake2b512", 10) == 0) { - server_configuration.hash_algorithm = CryptoHashType::BLAKE2B512; + hash_algorithm = CryptoHashType::BLAKE2B512; } else if (strncasecmp(optarg, "blake2s256", 10) == 0) { - server_configuration.hash_algorithm = CryptoHashType::BLAKE2S256; + hash_algorithm = CryptoHashType::BLAKE2S256; } else { - std::cerr << "Ignored unknown hash algorithm. Using SHA 256." - << std::endl; + std::cerr << "Unknown hash algorithm. Using SHA 256." << std::endl; } + server_configuration.hash_algorithm_ = hash_algorithm; options = -1; break; } @@ -361,6 +414,11 @@ int main(int argc, char *argv[]) { server_configuration.secure_ = true; break; } + case 'e': { + client_configuration.recovery_strategy_ = std::stoul(optarg); + options = 1; + break; + } case 'h': default: usage(); @@ -430,6 +488,13 @@ int main(int argc, char *argv[]) { transport::interface::global_config::parseConfigurationFile(conf_file); if (role > 0) { + // set forwarder type + client_configuration.forwarder_type_ = UNDEFINED; + if (config.name.compare("hicnlightng_module") == 0) + client_configuration.forwarder_type_ = HICNLIGHT; + else if (config.name.compare("hicnlightng_module") == 0) + client_configuration.forwarder_type_ = HICNLIGHT_NG; + HIperfClient c(client_configuration); if (c.setup() != ERROR_SETUP) { c.run(); diff --git a/apps/hiperf/src/server.cc b/apps/hiperf/src/server.cc index 968d42e2c..7101e7a4a 100644 --- a/apps/hiperf/src/server.cc +++ b/apps/hiperf/src/server.cc @@ -21,7 +21,7 @@ namespace hiperf { * Hiperf server class: configure and setup an hicn producer following the * ServerConfiguration. */ -class HIperfServer::Impl { +class HIperfServer::Impl : public ProducerSocket::Callback { const std::size_t log2_content_object_buffer_size = 8; public: @@ -35,11 +35,8 @@ class HIperfServer::Impl { mask_((std::uint16_t)(1 << log2_content_object_buffer_size) - 1), last_segment_(0), #ifndef _WIN32 - ptr_last_segment_(&last_segment_), input_(io_service_), rtc_running_(false), -#else - ptr_last_segment_(&last_segment_), #endif flow_name_(configuration_.name.getName()), socket_(io_service_), @@ -54,14 +51,16 @@ class HIperfServer::Impl { #endif for (int i = 0; i < (1 << log2_content_object_buffer_size); i++) { - content_objects_[i] = ContentObject::Ptr( - new ContentObject(conf.name.getName(), HF_INET6_TCP, 0, - (const uint8_t *)buffer.data(), buffer.size())); + content_objects_[i] = ContentObject::Ptr(new ContentObject( + configuration_.name.getName(), configuration_.packet_format_, 0, + (const uint8_t *)buffer.data(), buffer.size())); content_objects_[i]->setLifetime( default_values::content_object_expiry_time); } } + virtual ~Impl() {} + void virtualProcessInterest(ProducerSocket &p, const Interest &interest) { content_objects_[content_objects_index_ & mask_]->setName( interest.getName()); @@ -91,16 +90,14 @@ class HIperfServer::Impl { if (suffix == 0) { last_segment_ = 0; - ptr_last_segment_ = &last_segment_; unsatisfied_interests_.clear(); } - // The suffix will either be the one from the received interest or the - // smallest suffix of a previous interest not satisfed + // The suffix will either come from the received interest or will be set to + // the smallest suffix of a previous interest not satisfied if (!unsatisfied_interests_.empty()) { - auto it = - std::lower_bound(unsatisfied_interests_.begin(), - unsatisfied_interests_.end(), *ptr_last_segment_); + auto it = std::lower_bound(unsatisfied_interests_.begin(), + unsatisfied_interests_.end(), last_segment_); if (it != unsatisfied_interests_.end()) { suffix = *it; } @@ -121,27 +118,28 @@ class HIperfServer::Impl { b->append(configuration_.download_size); uint32_t total; - utils::TimePoint t0 = utils::SteadyClock::now(); + utils::SteadyTime::TimePoint t0 = utils::SteadyTime::Clock::now(); total = p.produceStream(content_name, std::move(b), !configuration_.multiphase_produce_, suffix); - utils::TimePoint t1 = utils::SteadyClock::now(); + utils::SteadyTime::TimePoint t1 = utils::SteadyTime::Clock::now(); - std::cout - << "Written " << total - << " data packets in output buffer (Segmentation time: " - << std::chrono::duration_cast<utils::Microseconds>(t1 - t0).count() - << " us)" << std::endl; + std::cout << "Written " << total + << " data packets in output buffer (Segmentation time: " + << utils::SteadyTime::getDurationUs(t0, t1).count() << " us)" + << std::endl; } void produceContentAsync(ProducerSocket &p, Name content_name, uint32_t suffix) { - auto b = utils::MemBuf::create(configuration_.download_size); - std::memset(b->writableData(), '?', configuration_.download_size); - b->append(configuration_.download_size); - - p.asyncProduce(content_name, std::move(b), - !configuration_.multiphase_produce_, suffix, - &ptr_last_segment_); + produce_thread_.add([this, suffix, content_name]() { + auto b = utils::MemBuf::create(configuration_.download_size); + std::memset(b->writableData(), '?', configuration_.download_size); + b->append(configuration_.download_size); + + last_segment_ = suffix + producer_socket_->produceStream( + content_name, std::move(b), + !configuration_.multiphase_produce_, suffix); + }); } void cacheMiss(ProducerSocket &p, const Interest &interest) { @@ -156,27 +154,22 @@ class HIperfServer::Impl { std::placeholders::_1, std::placeholders::_2)); } - std::shared_ptr<Identity> getProducerIdentity(std::string &keystore_path, - std::string &keystore_pwd, - CryptoHashType &hash_type) { - if (access(keystore_path.c_str(), F_OK) != -1) { - return std::make_shared<Identity>(keystore_path, keystore_pwd, hash_type); - } - return std::make_shared<Identity>(keystore_path, keystore_pwd, - CryptoSuite::RSA_SHA256, 1024, 365, - "producer-test"); + void produceError(const std::error_code &err) noexcept override { + std::cerr << "Error from producer transport: " << err.message() + << std::endl; + producer_socket_->stop(); + io_service_.stop(); } int setup() { int ret; int production_protocol; + std::shared_ptr<Signer> signer = std::make_shared<VoidSigner>(); if (configuration_.secure_) { - auto identity = getProducerIdentity(configuration_.keystore_name, - configuration_.keystore_password, - configuration_.hash_algorithm); producer_socket_ = std::make_unique<P2PSecureProducerSocket>( - configuration_.rtc_, identity); + configuration_.rtc_, configuration_.keystore_name, + configuration_.keystore_password); } else { if (!configuration_.rtc_) { production_protocol = ProductionProtocolAlgorithms::BYTE_STREAM; @@ -188,36 +181,79 @@ class HIperfServer::Impl { } if (producer_socket_->setSocketOption( + ProducerCallbacksOptions::PRODUCER_CALLBACK, this) == + SOCKET_OPTION_NOT_SET) { + std::cerr << "Failed to set producer callback." << std::endl; + return ERROR_SETUP; + } + + if (producer_socket_->setSocketOption( GeneralTransportOptions::MAKE_MANIFEST, configuration_.manifest) == SOCKET_OPTION_NOT_SET) { return ERROR_SETUP; } + if (producer_socket_->setSocketOption( + GeneralTransportOptions::HASH_ALGORITHM, + configuration_.hash_algorithm_) == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + if (producer_socket_->setSocketOption(PACKET_FORMAT, + configuration_.packet_format_) == + SOCKET_OPTION_NOT_SET) { + std::cerr << "ERROR -- Impossible to set the packet format." << std::endl; + return ERROR_SETUP; + } + if (!configuration_.passphrase.empty()) { - std::shared_ptr<Signer> signer = std::make_shared<SymmetricSigner>( - CryptoSuite::HMAC_SHA256, configuration_.passphrase); - producer_socket_->setSocketOption(GeneralTransportOptions::SIGNER, - signer); + signer = std::make_shared<SymmetricSigner>(CryptoSuite::HMAC_SHA256, + configuration_.passphrase); } if (!configuration_.keystore_name.empty()) { - auto identity = getProducerIdentity(configuration_.keystore_name, - configuration_.keystore_password, - configuration_.hash_algorithm); - std::shared_ptr<Signer> signer = identity->getSigner(); - producer_socket_->setSocketOption(GeneralTransportOptions::SIGNER, - signer); + signer = std::make_shared<AsymmetricSigner>( + configuration_.keystore_name, configuration_.keystore_password); + } + + producer_socket_->setSocketOption(GeneralTransportOptions::SIGNER, signer); + + // Compute maximum payload size + Packet::Format format = PayloadSize::getFormatFromName( + configuration_.name.getName(), !configuration_.manifest); + payload_size_max_ = PayloadSize(format).getPayloadSizeMax( + configuration_.rtc_ ? RTC_HEADER_SIZE : 0, + configuration_.fec_type_.empty() ? 0 : FEC_HEADER_MAX_SIZE, + !configuration_.manifest ? signer->getSignatureFieldSize() : 0); + + if (configuration_.payload_size_ > payload_size_max_) { + std::cerr << "WARNING: Payload has size " << configuration_.payload_size_ + << ", maximum is " << payload_size_max_ + << ". Payload will be truncated to fit." << std::endl; + } + + if (configuration_.rtc_) { + ret = producer_socket_->setSocketOption( + RtcTransportOptions::AGGREGATED_DATA, + configuration_.aggregated_data_); + + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + } + + if (configuration_.rtc_) { + ret = producer_socket_->setSocketOption(GeneralTransportOptions::FEC_TYPE, + configuration_.fec_type_); + + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } } - uint32_t rtc_header_size = 0; - if (configuration_.rtc_) rtc_header_size = 12; - producer_socket_->setSocketOption( - GeneralTransportOptions::DATA_PACKET_SIZE, - (uint32_t)( - configuration_.payload_size_ + rtc_header_size + - (configuration_.name.getAddressFamily() == AF_INET ? 40 : 60))); producer_socket_->registerPrefix(configuration_.name); producer_socket_->connect(); + producer_socket_->start(); if (configuration_.rtc_) { std::cout << "Running RTC producer: the prefix length will be ignored." @@ -239,6 +275,13 @@ class HIperfServer::Impl { return ERROR_SETUP; } + if (producer_socket_->setSocketOption( + GeneralTransportOptions::MAX_SEGMENT_SIZE, + static_cast<uint32_t>(configuration_.payload_size_)) == + SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + if (!configuration_.live_production) { produceContent(*producer_socket_, configuration_.name.getName(), 0); } else { @@ -283,7 +326,7 @@ class HIperfServer::Impl { void receiveStream() { socket_.async_receive_from( asio::buffer(recv_buffer_.first, recv_buffer_.second), remote_, - [this](std::error_code ec, std::size_t length) { + [this](const std::error_code &ec, std::size_t length) { if (ec) return; sendRTCContentFromStream(recv_buffer_.first, length); receiveStream(); @@ -296,9 +339,8 @@ class HIperfServer::Impl { // this is used to compute the data packet delay // Used only for performance evaluation // It requires clock synchronization between producer and consumer - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::system_clock::now().time_since_epoch()) - .count(); + uint64_t now = utils::SystemTime::nowMs().count(); + uint8_t *start = (uint8_t *)payload->writableData(); std::memcpy(start, &now, sizeof(uint64_t)); std::memcpy(start + sizeof(uint64_t), buff, len); @@ -306,7 +348,7 @@ class HIperfServer::Impl { len + sizeof(uint64_t)); } - void sendRTCContentObjectCallback(std::error_code ec) { + void sendRTCContentObjectCallback(const std::error_code &ec) { if (ec) return; rtc_timer_.expires_from_now( configuration_.production_rate_.getMicrosecondsForPacket( @@ -319,18 +361,17 @@ class HIperfServer::Impl { // this is used to compute the data packet delay // Used only for performance evaluation // It requires clock synchronization between producer and consumer - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::system_clock::now().time_since_epoch()) - .count(); + uint64_t now = utils::SystemTime::nowMs().count(); std::memcpy(payload->writableData(), &now, sizeof(uint64_t)); - producer_socket_->produceDatagram( - flow_name_, payload->data(), - payload->length() < 1400 ? payload->length() : 1400); + producer_socket_->produceDatagram(flow_name_, payload->data(), + payload->length() < payload_size_max_ + ? payload->length() + : payload_size_max_); } - void sendRTCContentObjectCallbackWithTrace(std::error_code ec) { + void sendRTCContentObjectCallbackWithTrace(const std::error_code &ec) { if (ec) return; auto payload = @@ -342,14 +383,11 @@ class HIperfServer::Impl { // this is used to compute the data packet delay // used only for performance evaluation // it requires clock synchronization between producer and consumer - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::system_clock::now().time_since_epoch()) - .count(); - + uint64_t now = utils::SystemTime::nowMs().count(); std::memcpy(payload->writableData(), &now, sizeof(uint64_t)); if (packet_len > payload->length()) packet_len = payload->length(); - if (packet_len > 1400) packet_len = 1400; + if (packet_len > payload_size_max_) packet_len = payload_size_max_; producer_socket_->produceDatagram(flow_name_, payload->data(), packet_len); @@ -450,13 +488,13 @@ class HIperfServer::Impl { std::placeholders::_1)); } else if (configuration_.input_stream_mode_) { rtc_running_ = true; - // crate socket + // create socket remote_ = asio::ip::udp::endpoint( asio::ip::address::from_string("127.0.0.1"), configuration_.port_); socket_.open(asio::ip::udp::v4()); socket_.bind(remote_); - recv_buffer_.first = (uint8_t *)malloc(1500); - recv_buffer_.second = 1500; + recv_buffer_.first = (uint8_t *)malloc(HIPERF_MTU); + recv_buffer_.second = HIPERF_MTU; receiveStream(); } else { rtc_running_ = true; @@ -490,8 +528,9 @@ class HIperfServer::Impl { std::uint16_t content_objects_index_; std::uint16_t mask_; std::uint32_t last_segment_; - std::uint32_t *ptr_last_segment_; std::unique_ptr<ProducerSocket> producer_socket_; + ::utils::EventThread produce_thread_; + std::size_t payload_size_max_; #ifndef _WIN32 asio::posix::stream_descriptor input_; asio::streambuf input_buffer_; @@ -507,6 +546,20 @@ HIperfServer::HIperfServer(const ServerConfiguration &conf) { impl_ = new Impl(conf); } +HIperfServer::HIperfServer(HIperfServer &&other) { + impl_ = other.impl_; + other.impl_ = nullptr; +} + +HIperfServer &HIperfServer::operator=(HIperfServer &&other) { + if (this != &other) { + impl_ = other.impl_; + other.impl_ = nullptr; + } + + return *this; +} + HIperfServer::~HIperfServer() { delete impl_; } int HIperfServer::setup() { return impl_->setup(); } diff --git a/apps/hiperf/src/server.h b/apps/hiperf/src/server.h index 05407a807..73ac72123 100644 --- a/apps/hiperf/src/server.h +++ b/apps/hiperf/src/server.h @@ -22,6 +22,9 @@ namespace hiperf { class HIperfServer { public: HIperfServer(const ServerConfiguration &conf); + HIperfServer(HIperfServer &&other); + HIperfServer &operator=(HIperfServer &&other); + ~HIperfServer(); int setup(); void run(); |