aboutsummaryrefslogtreecommitdiffstats
path: root/apps/hiperf
diff options
context:
space:
mode:
Diffstat (limited to 'apps/hiperf')
-rw-r--r--apps/hiperf/CMakeLists.txt41
-rw-r--r--apps/hiperf/src/client.cc524
-rw-r--r--apps/hiperf/src/client.h6
-rw-r--r--apps/hiperf/src/common.h80
-rw-r--r--apps/hiperf/src/forwarder_config.h2
-rw-r--r--apps/hiperf/src/forwarder_interface.cc122
-rw-r--r--apps/hiperf/src/forwarder_interface.h18
-rw-r--r--apps/hiperf/src/main.cc145
-rw-r--r--apps/hiperf/src/server.cc209
-rw-r--r--apps/hiperf/src/server.h3
10 files changed, 804 insertions, 346 deletions
diff --git a/apps/hiperf/CMakeLists.txt b/apps/hiperf/CMakeLists.txt
index 564525e67..6986c90aa 100644
--- a/apps/hiperf/CMakeLists.txt
+++ b/apps/hiperf/CMakeLists.txt
@@ -1,4 +1,4 @@
-# Copyright (c) 2017-2019 Cisco and/or its affiliates.
+# Copyright (c) 2021-2022 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
@@ -12,6 +12,9 @@
# limitations under the License.
if (NOT DISABLE_EXECUTABLES)
+##############################################################
+# Source files
+##############################################################
list(APPEND HIPERF_SRC
${CMAKE_CURRENT_SOURCE_DIR}/src/client.cc
${CMAKE_CURRENT_SOURCE_DIR}/src/main.cc
@@ -19,26 +22,40 @@ if (NOT DISABLE_EXECUTABLES)
${CMAKE_CURRENT_SOURCE_DIR}/src/forwarder_interface.cc
)
+
+##############################################################
+# Libraries
+##############################################################
list (APPEND HIPERF_LIBRARIES
- ${LIBTRANSPORT_LIBRARIES}
- ${LIBHICNCTRL_LIBRARIES}
- ${LIBHICN_LIBRARIES}
- ${CMAKE_THREAD_LIBS_INIT}
- ${LIBCONFIG_CPP_LIBRARIES}
- ${WSOCK32_LIBRARY}
- ${WS2_32_LIBRARY}
+ PRIVATE ${LIBTRANSPORT_LIBRARIES}
+ PRIVATE ${LIBHICNCTRL_LIBRARIES}
+ PRIVATE ${LIBHICN_LIBRARIES}
+ PRIVATE ${CMAKE_THREAD_LIBS_INIT}
+ PRIVATE ${LIBCONFIG_CPP_LIBRARIES}
+ PRIVATE ${WSOCK32_LIBRARY}
+ PRIVATE ${WS2_32_LIBRARY}
+ )
+
+##############################################################
+# Compiler options
+##############################################################
+ set(COMPILER_OPTIONS
+ ${DEFAULT_COMPILER_OPTIONS}
)
+
+##############################################################
+# Build hiperf
+##############################################################
build_executable(hiperf
SOURCES ${HIPERF_SRC}
LINK_LIBRARIES ${HIPERF_LIBRARIES}
INCLUDE_DIRS
- ${CMAKE_CURRENT_SOURCE_DIR}/src
- ${LIBTRANSPORT_INCLUDE_DIRS}
- ${LIBHICNCTRL_INCLUDE_DIRS}
- ${LIBCONFIG_CPP_INCLUDE_DIRS}
+ PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/src
+ PRIVATE ${LIBCONFIG_CPP_INCLUDE_DIRS}
DEPENDS ${DEPENDENCIES}
COMPONENT ${HICN_APPS}
LINK_FLAGS ${LINK_FLAGS}
+ COMPILE_OPTIONS ${COMPILER_OPTIONS}
)
endif() \ No newline at end of file
diff --git a/apps/hiperf/src/client.cc b/apps/hiperf/src/client.cc
index 820ebf0ce..319fa82ab 100644
--- a/apps/hiperf/src/client.cc
+++ b/apps/hiperf/src/client.cc
@@ -31,17 +31,12 @@ class Callback;
* Hiperf client class: configure and setup an hicn consumer following the
* ClientConfiguration.
*/
-class HIperfClient::Impl
-#ifdef FORWARDER_INTERFACE
- : ForwarderInterface::ICallback
-#endif
-{
- typedef std::chrono::time_point<std::chrono::steady_clock> Time;
- typedef std::chrono::microseconds TimeDuration;
-
+class HIperfClient::Impl : ForwarderInterface::ICallback {
friend class Callback;
friend class RTCCallback;
+ static const constexpr uint16_t log2_header_counter = 4;
+
struct nack_packet_t {
uint64_t timestamp;
uint32_t prod_rate;
@@ -76,21 +71,29 @@ class HIperfClient::Impl
delay_sample_(0),
received_bytes_(0),
received_data_pkt_(0),
+ auth_alerts_(0),
data_delays_(""),
signals_(io_service_),
rtc_callback_(*this),
callback_(*this),
socket_(io_service_),
- done_(false),
- switch_threshold_(~0)
-#ifdef FORWARDER_INTERFACE
- ,
- forwarder_interface_(io_service_, this)
-#endif
- {
+ // switch_threshold_(~0),
+ fwd_connected_(false),
+ use_bestpath_(false),
+ rtt_threshold_(~0),
+ loss_threshold_(~0),
+ prefix_name_(""),
+ prefix_len_(0),
+ // done_(false),
+ header_counter_mask_((1 << log2_header_counter) - 1),
+ header_counter_(0),
+ print_headers_(configuration_.print_headers_),
+ first_(true),
+ forwarder_interface_(io_service_) {
+ setForwarderConnection(conf.forwarder_type_);
}
- ~Impl() {}
+ virtual ~Impl() {}
void checkReceivedRtcContent(ConsumerSocket &c,
const ContentObject &contentObject) {}
@@ -104,174 +107,187 @@ class HIperfClient::Impl
void handleTimerExpiration(ConsumerSocket &c,
const TransportStatistics &stats) {
const char separator = ' ';
- const int width = 15;
+ const int width = 18;
- utils::TimePoint t2 = utils::SteadyClock::now();
- auto exact_duration =
- std::chrono::duration_cast<utils::Milliseconds>(t2 - t_stats_);
+ utils::SteadyTime::TimePoint t2 = utils::SteadyTime::Clock::now();
+ auto exact_duration = utils::SteadyTime::getDurationMs(t_stats_, t2);
- std::stringstream interval;
- interval << total_duration_milliseconds_ / 1000 << "-"
- << total_duration_milliseconds_ / 1000 +
- exact_duration.count() / 1000;
+ std::stringstream interval_ms;
+ interval_ms << total_duration_milliseconds_ << "-"
+ << total_duration_milliseconds_ + exact_duration.count();
std::stringstream bytes_transferred;
bytes_transferred << std::fixed << std::setprecision(3)
<< (stats.getBytesRecv() - old_bytes_value_) / 1000000.0
- << std::setfill(separator) << "[MB]";
+ << std::setfill(separator);
std::stringstream bandwidth;
bandwidth << ((stats.getBytesRecv() - old_bytes_value_) * 8) /
(exact_duration.count()) / 1000.0
- << std::setfill(separator) << "[Mbps]";
+ << std::setfill(separator);
std::stringstream window;
- window << stats.getAverageWindowSize() << std::setfill(separator)
- << "[Int]";
+ window << stats.getAverageWindowSize() << std::setfill(separator);
std::stringstream avg_rtt;
- avg_rtt << stats.getAverageRtt() << std::setfill(separator) << "[ms]";
+ avg_rtt << stats.getAverageRtt() << std::setfill(separator);
if (configuration_.rtc_) {
- // we get rtc stats more often, thus we need ms in the interval
- std::stringstream interval_ms;
- interval_ms << total_duration_milliseconds_ << "-"
- << total_duration_milliseconds_ + exact_duration.count();
-
std::stringstream lost_data;
lost_data << stats.getLostData() - old_lost_data_value_
- << std::setfill(separator) << "[pkt]";
+ << std::setfill(separator);
std::stringstream bytes_recovered_data;
bytes_recovered_data << stats.getBytesRecoveredData() -
old_bytes_recovered_value_
- << std::setfill(separator) << "[pkt]";
+ << std::setfill(separator);
std::stringstream definitely_lost_data;
definitely_lost_data << stats.getDefinitelyLostData() -
old_definitely_lost_data_value_
- << std::setfill(separator) << "[pkt]";
+ << std::setfill(separator);
std::stringstream data_delay;
- data_delay << avg_data_delay_ << std::setfill(separator) << "[ms]";
+ data_delay << std::fixed << std::setprecision(3) << avg_data_delay_
+ << std::setfill(separator);
std::stringstream received_data_pkt;
- received_data_pkt << received_data_pkt_ << std::setfill(separator)
- << "[pkt]";
+ received_data_pkt << received_data_pkt_ << std::setfill(separator);
std::stringstream goodput;
- goodput << (received_bytes_ * 8.0) / (exact_duration.count()) / 1000.0
- << std::setfill(separator) << "[Mbps]";
+ goodput << std::fixed << std::setprecision(3)
+ << (received_bytes_ * 8.0) / (exact_duration.count()) / 1000.0
+ << std::setfill(separator);
std::stringstream loss_rate;
loss_rate << std::fixed << std::setprecision(2)
- << stats.getLossRatio() * 100.0 << std::setfill(separator)
- << "[%]";
+ << stats.getLossRatio() * 100.0 << std::setfill(separator);
std::stringstream retx_sent;
retx_sent << stats.getRetxCount() - old_retx_value_
- << std::setfill(separator) << "[pkt]";
+ << std::setfill(separator);
std::stringstream interest_sent;
interest_sent << stats.getInterestTx() - old_sent_int_value_
- << std::setfill(separator) << "[pkt]";
+ << std::setfill(separator);
std::stringstream nacks;
nacks << stats.getReceivedNacks() - old_received_nacks_value_
- << std::setfill(separator) << "[pkt]";
+ << std::setfill(separator);
std::stringstream fec_pkt;
fec_pkt << stats.getReceivedFEC() - old_fec_pkt_
- << std::setfill(separator) << "[pkt]";
+ << std::setfill(separator);
std::stringstream queuing_delay;
- queuing_delay << stats.getQueuingDelay() << std::setfill(separator)
- << "[ms]";
+ queuing_delay << std::fixed << std::setprecision(3)
+ << stats.getQueuingDelay() << std::setfill(separator);
-#ifdef FORWARDER_INTERFACE
- if (!done_ && stats.getQueuingDelay() >= switch_threshold_ &&
- total_duration_milliseconds_ > 1000) {
- std::cout << "Switching due to queuing delay" << std::endl;
- forwarder_interface_.createFaceAndRoutes(backup_routes_);
- forwarder_interface_.deleteFaceAndRoutes(main_routes_);
- std::swap(backup_routes_, main_routes_);
- done_ = true;
+ std::stringstream residual_losses;
+ double rl_perc = stats.getResidualLossRate() * 100;
+ residual_losses << std::fixed << std::setprecision(2) << rl_perc
+ << std::setfill(separator);
+
+ std::stringstream quality_score;
+ quality_score << std::fixed << (int)stats.getQualityScore()
+ << std::setfill(separator);
+
+ std::stringstream alerts;
+ alerts << stats.getAlerts() << std::setfill(separator);
+
+ std::stringstream auth_alerts;
+ auth_alerts << auth_alerts_ << std::setfill(separator);
+
+ if (fwd_connected_ && use_bestpath_ &&
+ ((stats.getAverageRtt() > rtt_threshold_) ||
+ ((stats.getResidualLossRate() * 100) > loss_threshold_))) {
+ forwarder_interface_.setStrategy(prefix_name_, prefix_len_, "bestpath");
}
-#endif
- // statistics not yet available in the transport
- // std::stringstream interest_fec_tx;
- // interest_fec_tx << stats.getInterestFecTxCount() -
- // old_fec_interest_tx_value_ << std::setfill(separator) << "[pkt]";
- // std::stringstream bytes_fec_recv;
- // bytes_fec_recv << stats.getBytesFecRecv() - old_fec_data_rx_value_
- // << std::setfill(separator) << "[bytes]";
- std::cout << std::left << std::setw(width) << "Interval";
- std::cout << std::left << std::setw(width) << "RecvData";
- std::cout << std::left << std::setw(width) << "Bandwidth";
- std::cout << std::left << std::setw(width) << "Goodput";
- std::cout << std::left << std::setw(width) << "LossRate";
- std::cout << std::left << std::setw(width) << "Retr";
- std::cout << std::left << std::setw(width) << "InterestSent";
- std::cout << std::left << std::setw(width) << "ReceivedNacks";
- std::cout << std::left << std::setw(width) << "SyncWnd";
- std::cout << std::left << std::setw(width) << "MinRtt";
- std::cout << std::left << std::setw(width) << "QueuingDelay";
- std::cout << std::left << std::setw(width) << "LostData";
- std::cout << std::left << std::setw(width) << "RecoveredData";
- std::cout << std::left << std::setw(width) << "DefinitelyLost";
- std::cout << std::left << std::setw(width) << "State";
- std::cout << std::left << std::setw(width) << "DataDelay";
- std::cout << std::left << std::setw(width) << "FecPkt" << std::endl;
-
- std::cout << std::left << std::setw(width) << interval_ms.str();
- std::cout << std::left << std::setw(width) << received_data_pkt.str();
- std::cout << std::left << std::setw(width) << bandwidth.str();
- std::cout << std::left << std::setw(width) << goodput.str();
- std::cout << std::left << std::setw(width) << loss_rate.str();
- std::cout << std::left << std::setw(width) << retx_sent.str();
- std::cout << std::left << std::setw(width) << interest_sent.str();
- std::cout << std::left << std::setw(width) << nacks.str();
- std::cout << std::left << std::setw(width) << window.str();
- std::cout << std::left << std::setw(width) << avg_rtt.str();
- std::cout << std::left << std::setw(width) << queuing_delay.str();
- std::cout << std::left << std::setw(width) << lost_data.str();
- std::cout << std::left << std::setw(width) << bytes_recovered_data.str();
- std::cout << std::left << std::setw(width) << definitely_lost_data.str();
- std::cout << std::left << std::setw(width) << stats.getCCStatus();
- std::cout << std::left << std::setw(width) << data_delay.str();
- std::cout << std::left << std::setw(width) << fec_pkt.str();
+ if ((header_counter_ == 0 && print_headers_) || first_) {
+ std::cout << std::right << std::setw(width) << "Interval[ms]";
+ std::cout << std::right << std::setw(width) << "RecvData[pkt]";
+ std::cout << std::right << std::setw(width) << "Bandwidth[Mbps]";
+ std::cout << std::right << std::setw(width) << "Goodput[Mbps]";
+ std::cout << std::right << std::setw(width) << "LossRate[%]";
+ std::cout << std::right << std::setw(width) << "Retr[pkt]";
+ std::cout << std::right << std::setw(width) << "InterestSent";
+ std::cout << std::right << std::setw(width) << "ReceivedNacks";
+ std::cout << std::right << std::setw(width) << "SyncWnd[pkt]";
+ std::cout << std::right << std::setw(width) << "MinRtt[ms]";
+ std::cout << std::right << std::setw(width) << "QueuingDelay[ms]";
+ std::cout << std::right << std::setw(width) << "LostData[pkt]";
+ std::cout << std::right << std::setw(width) << "RecoveredData";
+ std::cout << std::right << std::setw(width) << "DefinitelyLost";
+ std::cout << std::right << std::setw(width) << "State";
+ std::cout << std::right << std::setw(width) << "DataDelay[ms]";
+ std::cout << std::right << std::setw(width) << "FecPkt";
+ std::cout << std::right << std::setw(width) << "Congestion";
+ std::cout << std::right << std::setw(width) << "ResidualLosses";
+ std::cout << std::right << std::setw(width) << "QualityScore";
+ std::cout << std::right << std::setw(width) << "Alerts";
+ std::cout << std::right << std::setw(width) << "AuthAlerts"
+ << std::endl;
+
+ first_ = false;
+ }
+
+ std::cout << std::right << std::setw(width) << interval_ms.str();
+ std::cout << std::right << std::setw(width) << received_data_pkt.str();
+ std::cout << std::right << std::setw(width) << bandwidth.str();
+ std::cout << std::right << std::setw(width) << goodput.str();
+ std::cout << std::right << std::setw(width) << loss_rate.str();
+ std::cout << std::right << std::setw(width) << retx_sent.str();
+ std::cout << std::right << std::setw(width) << interest_sent.str();
+ std::cout << std::right << std::setw(width) << nacks.str();
+ std::cout << std::right << std::setw(width) << window.str();
+ std::cout << std::right << std::setw(width) << avg_rtt.str();
+ std::cout << std::right << std::setw(width) << queuing_delay.str();
+ std::cout << std::right << std::setw(width) << lost_data.str();
+ std::cout << std::right << std::setw(width) << bytes_recovered_data.str();
+ std::cout << std::right << std::setw(width) << definitely_lost_data.str();
+ std::cout << std::right << std::setw(width) << stats.getCCStatus();
+ std::cout << std::right << std::setw(width) << data_delay.str();
+ std::cout << std::right << std::setw(width) << fec_pkt.str();
+ std::cout << std::right << std::setw(width) << stats.isCongested();
+ std::cout << std::right << std::setw(width) << residual_losses.str();
+ std::cout << std::right << std::setw(width) << quality_score.str();
+ std::cout << std::right << std::setw(width) << alerts.str();
+ std::cout << std::right << std::setw(width) << auth_alerts.str();
std::cout << std::endl;
if (configuration_.test_mode_) {
if (data_delays_.size() > 0) data_delays_.pop_back();
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::system_clock::now().time_since_epoch())
- .count();
- std::cout << now << " DATA-DELAYS:[" << data_delays_ << "]"
- << std::endl;
+ auto now = utils::SteadyTime::nowMs();
+ std::cout << std::fixed << std::setprecision(0) << now.count()
+ << " DATA-DELAYS:[" << data_delays_ << "]" << std::endl;
}
// statistics not yet available in the transport
- // std::cout << std::left << std::setw(width) << interest_fec_tx.str();
- // std::cout << std::left << std::setw(width) << bytes_fec_recv.str();
+ // std::cout << std::right << std::setw(width) << interest_fec_tx.str();
+ // std::cout << std::right << std::setw(width) << bytes_fec_recv.str();
} else {
- std::cout << std::left << std::setw(width) << "Interval";
- std::cout << std::left << std::setw(width) << "Transfer";
- std::cout << std::left << std::setw(width) << "Bandwidth";
- std::cout << std::left << std::setw(width) << "Retr";
- std::cout << std::left << std::setw(width) << "Cwnd";
- std::cout << std::left << std::setw(width) << "AvgRtt" << std::endl;
-
- std::cout << std::left << std::setw(width) << interval.str();
- std::cout << std::left << std::setw(width) << bytes_transferred.str();
- std::cout << std::left << std::setw(width) << bandwidth.str();
- std::cout << std::left << std::setw(width) << stats.getRetxCount();
- std::cout << std::left << std::setw(width) << window.str();
- std::cout << std::left << std::setw(width) << avg_rtt.str() << std::endl;
- std::cout << std::endl;
+ if ((header_counter_ == 0 && print_headers_) || first_) {
+ std::cout << std::right << std::setw(width) << "Interval[ms]";
+ std::cout << std::right << std::setw(width) << "Transfer[MB]";
+ std::cout << std::right << std::setw(width) << "Bandwidth[Mbps]";
+ std::cout << std::right << std::setw(width) << "Retr[pkt]";
+ std::cout << std::right << std::setw(width) << "Cwnd[Int]";
+ std::cout << std::right << std::setw(width) << "AvgRtt[ms]"
+ << std::endl;
+
+ first_ = false;
+ }
+
+ std::cout << std::right << std::setw(width) << interval_ms.str();
+ std::cout << std::right << std::setw(width) << bytes_transferred.str();
+ std::cout << std::right << std::setw(width) << bandwidth.str();
+ std::cout << std::right << std::setw(width) << stats.getRetxCount();
+ std::cout << std::right << std::setw(width) << window.str();
+ std::cout << std::right << std::setw(width) << avg_rtt.str() << std::endl;
}
+
total_duration_milliseconds_ += (uint32_t)exact_duration.count();
old_bytes_value_ = stats.getBytesRecv();
old_lost_data_value_ = stats.getLostData();
@@ -289,9 +305,95 @@ class HIperfClient::Impl
received_data_pkt_ = 0;
data_delays_ = "";
- t_stats_ = utils::SteadyClock::now();
+ t_stats_ = utils::SteadyTime::Clock::now();
+
+ header_counter_ = (header_counter_ + 1) & header_counter_mask_;
+
+ if (--configuration_.nb_iterations_ == 0) {
+ // We reached the maximum nb of runs. Stop now.
+ io_service_.stop();
+ }
}
+ bool setForwarderConnection(forwarder_type_t forwarder_type) {
+ using namespace libconfig;
+ Config cfg;
+
+ const char *conf_file = getenv("FORWARDER_CONFIG");
+ if (!conf_file) return false;
+
+ if ((forwarder_type != HICNLIGHT) && (forwarder_type != HICNLIGHT_NG))
+ return false;
+
+ try {
+ cfg.readFile(conf_file);
+ } catch (const FileIOException &fioex) {
+ std::cerr << "I/O error while reading file." << std::endl;
+ return false;
+ } catch (const ParseException &pex) {
+ std::cerr << "Parse error at " << pex.getFile() << ":" << pex.getLine()
+ << " - " << pex.getError() << std::endl;
+ return false;
+ }
+
+ Setting &config = cfg.getRoot();
+
+ /* conf file example
+ *
+ * use_bestpath = "ON | OFF"
+ * rtt_threshold = 200 //ms
+ * loss_threshold = 20 //%
+ * name = "b001::/16"
+ */
+
+ if (config.exists("use_bestpath")) {
+ std::string val;
+ config.lookupValue("use_bestpath", val);
+ if (val.compare("ON") == 0) use_bestpath_ = true;
+ }
+
+ if (config.exists("rtt_threshold")) {
+ unsigned val;
+ config.lookupValue("rtt_threshold", val);
+ rtt_threshold_ = val;
+ }
+
+ if (config.exists("loss_threshold")) {
+ unsigned val;
+ config.lookupValue("loss_threshold", val);
+ loss_threshold_ = val;
+ }
+
+ if (config.exists("name")) {
+ std::string route;
+ config.lookupValue("name", route);
+
+ std::string delimiter = "/";
+ size_t pos = 0;
+
+ if ((pos = route.find(delimiter)) != std::string::npos) {
+ prefix_name_ = route.substr(0, pos);
+ route.erase(0, pos + delimiter.length());
+ prefix_len_ = std::stoul(route.substr(0));
+ }
+ }
+
+ forwarder_interface_.initForwarderInterface(this, forwarder_type);
+
+ return true;
+ }
+
+ void onHicnServiceReady() override {
+ std::cout << "Successfully connected to local forwarder!" << std::endl;
+ fwd_connected_ = true;
+ }
+
+ void onRouteConfigured(
+ std::vector<ForwarderInterface::RouteInfoPtr> &route_info) override {
+ std::cout << "Routes successfully configured!" << std::endl;
+ }
+
+#ifdef FORWARDER_INTERFACE
bool parseConfig(const char *conf_file) {
using namespace libconfig;
Config cfg;
@@ -439,7 +541,6 @@ class HIperfClient::Impl
return true;
}
-#ifdef FORWARDER_INTERFACE
void onHicnServiceReady() override {
std::cout << "Successfully connected to local forwarder!" << std::endl;
@@ -541,6 +642,13 @@ class HIperfClient::Impl
}
#endif
+ transport::auth::VerificationPolicy onAuthFailed(
+ transport::auth::Suffix suffix,
+ transport::auth::VerificationPolicy policy) {
+ auth_alerts_++;
+ return transport::auth::VerificationPolicy::ACCEPT;
+ }
+
int setup() {
int ret;
@@ -557,6 +665,7 @@ class HIperfClient::Impl
producer_socket_ = std::make_unique<ProducerSocket>(production_protocol);
producer_socket_->registerPrefix(configuration_.relay_name_);
producer_socket_->connect();
+ producer_socket_->start();
}
if (configuration_.output_stream_mode_ && configuration_.rtc_) {
@@ -586,6 +695,17 @@ class HIperfClient::Impl
GeneralTransportOptions::INTEREST_LIFETIME,
configuration_.interest_lifetime_);
+ consumer_socket_->setSocketOption(
+ GeneralTransportOptions::MAX_UNVERIFIED_TIME,
+ configuration_.unverified_delay_);
+
+ if (consumer_socket_->setSocketOption(
+ GeneralTransportOptions::PACKET_FORMAT,
+ configuration_.packet_format_) == SOCKET_OPTION_NOT_SET) {
+ std::cerr << "ERROR -- Impossible to set the packet format." << std::endl;
+ return ERROR_SETUP;
+ }
+
#if defined(DEBUG) && defined(__linux__)
std::shared_ptr<transport::BasePortal> portal;
consumer_socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal);
@@ -623,20 +743,24 @@ class HIperfClient::Impl
}
}
+ std::shared_ptr<Verifier> verifier = std::make_shared<VoidVerifier>();
+
if (!configuration_.producer_certificate.empty()) {
- std::shared_ptr<Verifier> verifier = std::make_shared<AsymmetricVerifier>(
+ verifier = std::make_shared<AsymmetricVerifier>(
configuration_.producer_certificate);
- if (consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER,
- verifier) == SOCKET_OPTION_NOT_SET)
- return ERROR_SETUP;
}
if (!configuration_.passphrase.empty()) {
- std::shared_ptr<Verifier> verifier =
- std::make_shared<SymmetricVerifier>(configuration_.passphrase);
- if (consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER,
- verifier) == SOCKET_OPTION_NOT_SET)
- return ERROR_SETUP;
+ verifier = std::make_shared<SymmetricVerifier>(configuration_.passphrase);
+ }
+
+ verifier->setVerificationFailedCallback(
+ std::bind(&HIperfClient::Impl::onAuthFailed, this,
+ std::placeholders::_1, std::placeholders::_2));
+
+ if (consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER,
+ verifier) == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
}
ret = consumer_socket_->setSocketOption(
@@ -662,6 +786,65 @@ class HIperfClient::Impl
}
if (configuration_.rtc_) {
+ if (configuration_.recovery_strategy_ == 1) { // unreliable
+ ret = consumer_socket_->setSocketOption(
+ RtcTransportOptions::RECOVERY_STRATEGY,
+ (uint32_t)RtcTransportRecoveryStrategies::RECOVERY_OFF);
+ } else if (configuration_.recovery_strategy_ == 2) { // rtx only
+ ret = consumer_socket_->setSocketOption(
+ RtcTransportOptions::RECOVERY_STRATEGY,
+ (uint32_t)RtcTransportRecoveryStrategies::RTX_ONLY);
+ } else if (configuration_.recovery_strategy_ == 3) { // fec only
+ ret = consumer_socket_->setSocketOption(
+ RtcTransportOptions::RECOVERY_STRATEGY,
+ (uint32_t)RtcTransportRecoveryStrategies::FEC_ONLY);
+ } else if (configuration_.recovery_strategy_ == 4) { // delay based
+ ret = consumer_socket_->setSocketOption(
+ RtcTransportOptions::RECOVERY_STRATEGY,
+ (uint32_t)RtcTransportRecoveryStrategies::DELAY_BASED);
+ } else if (configuration_.recovery_strategy_ == 5) { // low rate flow
+ ret = consumer_socket_->setSocketOption(
+ RtcTransportOptions::RECOVERY_STRATEGY,
+ (uint32_t)RtcTransportRecoveryStrategies::LOW_RATE);
+ } else if (configuration_.recovery_strategy_ ==
+ 6) { // low rate + bestpath
+ ret = consumer_socket_->setSocketOption(
+ RtcTransportOptions::RECOVERY_STRATEGY,
+ (uint32_t)RtcTransportRecoveryStrategies::LOW_RATE_AND_BESTPATH);
+ } else if (configuration_.recovery_strategy_ ==
+ 7) { // low rate + replication
+ ret = consumer_socket_->setSocketOption(
+ RtcTransportOptions::RECOVERY_STRATEGY,
+ (uint32_t)RtcTransportRecoveryStrategies::LOW_RATE_AND_REPLICATION);
+ } else if (configuration_.recovery_strategy_ ==
+ 8) { // low rate + bestpath or replication
+ ret = consumer_socket_->setSocketOption(
+ RtcTransportOptions::RECOVERY_STRATEGY,
+ (uint32_t)RtcTransportRecoveryStrategies::
+ LOW_RATE_AND_ALL_FWD_STRATEGIES);
+ } else {
+ // default
+ ret = consumer_socket_->setSocketOption(
+ RtcTransportOptions::RECOVERY_STRATEGY,
+ (uint32_t)RtcTransportRecoveryStrategies::RTX_ONLY);
+ }
+
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+ }
+
+ if (configuration_.rtc_) {
+ ret = consumer_socket_->setSocketOption(
+ RtcTransportOptions::AGGREGATED_DATA,
+ configuration_.aggregated_data_);
+
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+ }
+
+ if (configuration_.rtc_) {
ret = consumer_socket_->setSocketOption(
ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT,
(ConsumerContentObjectCallback)std::bind(
@@ -673,6 +856,15 @@ class HIperfClient::Impl
}
if (configuration_.rtc_) {
+ ret = consumer_socket_->setSocketOption(GeneralTransportOptions::FEC_TYPE,
+ configuration_.fec_type_);
+
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+ }
+
+ if (configuration_.rtc_) {
std::shared_ptr<TransportStatistics> transport_stats;
consumer_socket_->getSocketOption(
OtherOptions::STATISTICS, (TransportStatistics **)&transport_stats);
@@ -708,10 +900,10 @@ class HIperfClient::Impl
signals_.async_wait(
[this](const std::error_code &, const int &) { io_service_.stop(); });
- t_download_ = t_stats_ = std::chrono::steady_clock::now();
- consumer_socket_->asyncConsume(configuration_.name);
- io_service_.run();
+ t_download_ = t_stats_ = utils::SteadyTime::now();
+ consumer_socket_->consume(configuration_.name);
+ io_service_.run();
consumer_socket_->stop();
return ERROR_SUCCESS;
@@ -719,11 +911,15 @@ class HIperfClient::Impl
private:
class RTCCallback : public ConsumerSocket::ReadCallback {
- static constexpr std::size_t mtu = 1500;
+ static constexpr std::size_t mtu = HIPERF_MTU;
public:
RTCCallback(Impl &hiperf_client) : client_(hiperf_client) {
client_.configuration_.receive_buffer = utils::MemBuf::create(mtu);
+ Packet::Format format =
+ PayloadSize::getFormatFromName(client_.configuration_.name, false);
+ payload_size_max_ =
+ PayloadSize(format).getPayloadSizeMax(RTC_HEADER_SIZE);
}
bool isBufferMovable() noexcept override { return false; }
@@ -743,9 +939,7 @@ class HIperfClient::Impl
uint64_t *senderTimeStamp =
(uint64_t *)client_.configuration_.receive_buffer->writableData();
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::system_clock::now().time_since_epoch())
- .count();
+ auto now = utils::SystemTime::nowMs().count();
double new_delay = (double)(now - *senderTimeStamp);
if (*senderTimeStamp > now)
@@ -765,7 +959,7 @@ class HIperfClient::Impl
client_.producer_socket_->produceDatagram(
client_.configuration_.relay_name_.getName(),
client_.configuration_.receive_buffer->writableData(),
- length < 1400 ? length : 1400);
+ length < payload_size_max_ ? length : payload_size_max_);
}
if (client_.configuration_.output_stream_mode_) {
uint8_t *start =
@@ -778,7 +972,7 @@ class HIperfClient::Impl
size_t maxBufferSize() const override { return mtu; }
- void readError(const std::error_code ec) noexcept override {
+ void readError(const std::error_code &ec) noexcept override {
std::cerr << "Error while reading from RTC socket" << std::endl;
client_.io_service_.stop();
}
@@ -789,6 +983,7 @@ class HIperfClient::Impl
private:
Impl &client_;
+ std::size_t payload_size_max_;
};
class Callback : public ConsumerSocket::ReadCallback {
@@ -816,16 +1011,15 @@ class HIperfClient::Impl
return client_.configuration_.receive_buffer_size_;
}
- void readError(const std::error_code ec) noexcept override {
+ void readError(const std::error_code &ec) noexcept override {
std::cerr << "Error " << ec.message() << " while reading from socket"
<< std::endl;
client_.io_service_.stop();
}
void readSuccess(std::size_t total_size) noexcept override {
- Time t2 = std::chrono::steady_clock::now();
- TimeDuration dt =
- std::chrono::duration_cast<TimeDuration>(t2 - client_.t_download_);
+ auto t2 = utils::SteadyTime::now();
+ auto dt = utils::SteadyTime::getDurationUs(client_.t_download_, t2);
long usec = (long)dt.count();
std::cout << "Content retrieved. Size: " << total_size << " [Bytes]"
@@ -843,8 +1037,8 @@ class HIperfClient::Impl
};
hiperf::ClientConfiguration configuration_;
- Time t_stats_;
- Time t_download_;
+ utils::SteadyTime::TimePoint t_stats_;
+ utils::SteadyTime::TimePoint t_download_;
uint32_t total_duration_milliseconds_;
uint64_t old_bytes_value_;
uint64_t old_interest_tx_value_;
@@ -865,6 +1059,7 @@ class HIperfClient::Impl
uint32_t received_bytes_;
uint32_t received_data_pkt_;
+ uint32_t auth_alerts_;
std::string data_delays_;
@@ -878,19 +1073,44 @@ class HIperfClient::Impl
asio::ip::udp::endpoint remote_;
ForwarderConfiguration config_;
- uint16_t switch_threshold_; /* ms */
- bool done_;
+ // uint16_t switch_threshold_; /* ms */
+ bool fwd_connected_;
+ bool use_bestpath_;
+ uint32_t rtt_threshold_; /* ms */
+ uint32_t loss_threshold_; /* ms */
+ std::string prefix_name_; // bestpath route
+ uint32_t prefix_len_;
+ // bool done_;
+
std::vector<ForwarderInterface::RouteInfoPtr> main_routes_;
std::vector<ForwarderInterface::RouteInfoPtr> backup_routes_;
-#ifdef FORWARDER_INTERFACE
+ uint16_t header_counter_mask_;
+ uint16_t header_counter_;
+
+ bool print_headers_;
+ bool first_;
+
ForwarderInterface forwarder_interface_;
-#endif
};
HIperfClient::HIperfClient(const ClientConfiguration &conf) {
impl_ = new Impl(conf);
}
+HIperfClient::HIperfClient(HIperfClient &&other) {
+ impl_ = other.impl_;
+ other.impl_ = nullptr;
+}
+
+HIperfClient &HIperfClient::operator=(HIperfClient &&other) {
+ if (this != &other) {
+ impl_ = other.impl_;
+ other.impl_ = nullptr;
+ }
+
+ return *this;
+}
+
HIperfClient::~HIperfClient() { delete impl_; }
int HIperfClient::setup() { return impl_->setup(); }
diff --git a/apps/hiperf/src/client.h b/apps/hiperf/src/client.h
index f45b9af43..bc80c874c 100644
--- a/apps/hiperf/src/client.h
+++ b/apps/hiperf/src/client.h
@@ -16,12 +16,16 @@
#pragma once
#include <common.h>
+#include <hicn/transport/utils/noncopyable.h>
namespace hiperf {
-class HIperfClient {
+class HIperfClient : ::utils::NonCopyable {
public:
HIperfClient(const ClientConfiguration &conf);
+ HIperfClient(HIperfClient &&other);
+ HIperfClient &operator=(HIperfClient &&other);
+
~HIperfClient();
int setup();
void run();
diff --git a/apps/hiperf/src/common.h b/apps/hiperf/src/common.h
index e6ba526f9..13c9dcc1d 100644
--- a/apps/hiperf/src/common.h
+++ b/apps/hiperf/src/common.h
@@ -15,7 +15,7 @@
#pragma once
-#include <hicn/transport/auth/identity.h>
+#include <forwarder_interface.h>
#include <hicn/transport/auth/signer.h>
#include <hicn/transport/config.h>
#include <hicn/transport/core/content_object.h>
@@ -45,6 +45,9 @@
#endif
#define ERROR_SETUP -5
#define MIN_PROBE_SEQ 0xefffffff
+#define RTC_HEADER_SIZE 12
+#define FEC_HEADER_MAX_SIZE 36
+#define HIPERF_MTU 1500
using namespace transport::interface;
using namespace transport::auth;
@@ -73,11 +76,51 @@ static inline uint64_t _htonll(const uint64_t *input) {
namespace hiperf {
/**
+ * Class to retrieve the maximum payload size given the MTU and packet headers.
+ */
+class PayloadSize {
+ public:
+ PayloadSize(Packet::Format format, std::size_t mtu = HIPERF_MTU)
+ : mtu_(mtu), format_(format) {}
+
+ std::size_t getPayloadSizeMax(std::size_t transport_size = 0,
+ std::size_t fec_size = 0,
+ std::size_t signature_size = 0) {
+ return mtu_ - Packet::getHeaderSizeFromFormat(format_, signature_size) -
+ transport_size - fec_size;
+ }
+
+ static Packet::Format getFormatFromName(Name name, bool ah = false) {
+ switch (name.getAddressFamily()) {
+ case AF_INET:
+ return ah ? HF_INET_TCP_AH : HF_INET_TCP;
+ case AF_INET6:
+ return ah ? HF_INET6_TCP_AH : HF_INET6_TCP;
+ default:
+ return HF_UNSPEC;
+ }
+ }
+
+ private:
+ std::size_t mtu_;
+ Packet::Format format_;
+};
+
+/**
* Class for handling the production rate for the RTC producer.
*/
class Rate {
public:
Rate() : rate_kbps_(0) {}
+ ~Rate() {}
+
+ Rate &operator=(const Rate &other) {
+ if (this != &other) {
+ rate_kbps_ = other.rate_kbps_;
+ }
+
+ return *this;
+ }
Rate(const std::string &rate) {
std::size_t found = rate.find("kbps");
@@ -137,9 +180,16 @@ struct ClientConfiguration {
secure_(false),
producer_prefix_(),
interest_lifetime_(500),
+ unverified_delay_(2000),
relay_name_("c001::abcd/64"),
output_stream_mode_(false),
- port_(0) {}
+ port_(0),
+ recovery_strategy_(4),
+ aggregated_data_(false),
+ fec_type_(""),
+ packet_format_(default_values::packet_format),
+ print_headers_(true),
+ nb_iterations_(std::numeric_limits<decltype(nb_iterations_)>::max()) {}
Name name;
double beta;
@@ -158,9 +208,17 @@ struct ClientConfiguration {
bool secure_;
Prefix producer_prefix_;
uint32_t interest_lifetime_;
+ uint32_t unverified_delay_;
Prefix relay_name_;
bool output_stream_mode_;
uint16_t port_;
+ uint32_t recovery_strategy_;
+ bool aggregated_data_;
+ std::string fec_type_;
+ Packet::Format packet_format_;
+ bool print_headers_;
+ std::uint32_t nb_iterations_;
+ forwarder_type_t forwarder_type_;
};
/**
@@ -170,11 +228,11 @@ struct ServerConfiguration {
ServerConfiguration()
: name("b001::abcd/64"),
virtual_producer(true),
- manifest(false),
+ manifest(0),
live_production(false),
content_lifetime(600000000_U32),
download_size(20 * 1024 * 1024),
- hash_algorithm(CryptoHashType::SHA256),
+ hash_algorithm_(CryptoHashType::SHA256),
keystore_name(""),
passphrase(""),
keystore_password("cisco"),
@@ -185,18 +243,21 @@ struct ServerConfiguration {
trace_index_(0),
trace_file_(nullptr),
production_rate_(std::string("2048kbps")),
- payload_size_(1400),
+ payload_size_(1384),
secure_(false),
input_stream_mode_(false),
- port_(0) {}
+ port_(0),
+ aggregated_data_(false),
+ fec_type_(""),
+ packet_format_(default_values::packet_format) {}
Prefix name;
bool virtual_producer;
- bool manifest;
+ std::uint32_t manifest;
bool live_production;
std::uint32_t content_lifetime;
std::uint32_t download_size;
- CryptoHashType hash_algorithm;
+ CryptoHashType hash_algorithm_;
std::string keystore_name;
std::string passphrase;
std::string keystore_password;
@@ -212,6 +273,9 @@ struct ServerConfiguration {
bool input_stream_mode_;
uint16_t port_;
std::vector<struct packet_t> trace_;
+ bool aggregated_data_;
+ std::string fec_type_;
+ Packet::Format packet_format_;
};
} // namespace hiperf
diff --git a/apps/hiperf/src/forwarder_config.h b/apps/hiperf/src/forwarder_config.h
index 655ac3b66..aaac14839 100644
--- a/apps/hiperf/src/forwarder_config.h
+++ b/apps/hiperf/src/forwarder_config.h
@@ -94,4 +94,4 @@ class ForwarderConfiguration {
std::size_t n_threads_;
};
-} \ No newline at end of file
+} // namespace hiperf \ No newline at end of file
diff --git a/apps/hiperf/src/forwarder_interface.cc b/apps/hiperf/src/forwarder_interface.cc
index 864208239..e87a5953d 100644
--- a/apps/hiperf/src/forwarder_interface.cc
+++ b/apps/hiperf/src/forwarder_interface.cc
@@ -25,6 +25,7 @@
extern "C" {
#include <hicn/error.h>
#include <hicn/util/ip_address.h>
+#include <hicn/util/sstrncpy.h>
}
// XXX the main listener should be retrieve in this class at initialization, aka
@@ -36,27 +37,14 @@ extern "C" {
namespace hiperf {
+ForwarderInterface::ForwarderInterface(asio::io_service &io_service)
+ : external_ioservice_(io_service), timer_(io_service) {}
+
ForwarderInterface::ForwarderInterface(asio::io_service &io_service,
- ICallback *callback)
- : external_ioservice_(io_service),
- forwarder_interface_callback_(callback),
- work_(std::make_unique<asio::io_service::work>(internal_ioservice_)),
- sock_(nullptr),
- thread_(std::make_unique<std::thread>([this]() {
- std::cout << "Starting Forwarder Interface thread" << std::endl;
- internal_ioservice_.run();
- std::cout << "Stopping Forwarder Interface thread" << std::endl;
- })),
- // set_route_callback_(std::forward<Callback &&>(setRouteCallback)),
- check_routes_timer_(nullptr),
- pending_add_route_counter_(0),
- hicn_listen_port_(9695),
- /* We start in disabled state even when a forwarder is always available */
- state_(State::Disabled),
- timer_(io_service),
- num_reattempts(0) {
- std::cout << "Forwarder interface created... connecting to forwarder...\n";
- internal_ioservice_.post([this]() { onHicnServiceAvailable(true); });
+ ICallback *callback,
+ forwarder_type_t fwd_type)
+ : external_ioservice_(io_service), timer_(io_service) {
+ initForwarderInterface(callback, fwd_type);
}
ForwarderInterface::~ForwarderInterface() {
@@ -72,8 +60,27 @@ ForwarderInterface::~ForwarderInterface() {
thread_->join();
}
+}
- std::cout << "ForwarderInterface::~ForwarderInterface" << std::endl;
+void ForwarderInterface::initForwarderInterface(ICallback *callback,
+ forwarder_type_t fwd_type) {
+ forwarder_interface_callback_ = callback;
+ work_ = std::make_unique<asio::io_service::work>(internal_ioservice_);
+ sock_ = nullptr;
+ thread_ = std::make_unique<std::thread>([this]() {
+ std::cout << "Starting Forwarder Interface thread" << std::endl;
+ internal_ioservice_.run();
+ std::cout << "Stopping Forwarder Interface thread" << std::endl;
+ });
+ check_routes_timer_ = nullptr;
+ pending_add_route_counter_ = 0;
+ hicn_listen_port_ = 9695;
+ /* We start in disabled state even when a forwarder is always available */
+ state_ = State::Disabled;
+ fwd_type_ = fwd_type;
+ num_reattempts = 0;
+ std::cout << "Forwarder interface created... connecting to forwarder...\n";
+ internal_ioservice_.post([this]() { onHicnServiceAvailable(true); });
}
void ForwarderInterface::onHicnServiceAvailable(bool flag) {
@@ -93,26 +100,15 @@ void ForwarderInterface::onHicnServiceAvailable(bool flag) {
std::cout << "Connected to forwarder... cancelling reconnection timer"
<< std::endl;
+
timer_.cancel();
num_reattempts = 0;
- // case State::Connected:
- // checkListener();
-
- // if (state_ != State::Ready) {
- // std::cout << "Listener not found" << std::endl;
- // goto REATTEMPT;
- // }
- // state_ = State::Ready;
-
- // timer_.cancel();
- // num_reattempts = 0;
-
std::cout << "Forwarder interface is ready... communicate to controller"
<< std::endl;
forwarder_interface_callback_->onHicnServiceReady();
-
+ case State::Connected:
case State::Ready:
break;
}
@@ -136,14 +132,14 @@ REATTEMPT:
std::chrono::milliseconds(ForwarderInterface::REATTEMPT_DELAY_MS));
// timer_.async_wait(std::bind(&ForwarderInterface::onHicnServiceAvailable,
// this, flag, std::placeholders::_1));
- timer_.async_wait([this, flag](std::error_code ec) {
+ timer_.async_wait([this, flag](const std::error_code &ec) {
if (ec) return;
onHicnServiceAvailable(flag);
});
}
int ForwarderInterface::connectToForwarder() {
- sock_ = hc_sock_create();
+ sock_ = hc_sock_create_forwarder(fwd_type_);
if (!sock_) {
std::cout << "Could not create socket" << std::endl;
goto ERR_SOCK;
@@ -256,6 +252,27 @@ void ForwarderInterface::deleteFaceAndRoutes(
});
}
+void ForwarderInterface::setStrategy(std::string prefix, uint32_t prefix_len,
+ std::string strategy) {
+ if (!sock_) return;
+
+ ip_address_t ip_prefix;
+ if (ip_address_pton(prefix.c_str(), &ip_prefix) < 0) {
+ return;
+ }
+
+ strategy_type_t strategy_type = strategy_type_from_str(strategy.c_str());
+ if (strategy_type == STRATEGY_TYPE_UNDEFINED) return;
+
+ hc_strategy_t strategy_conf;
+ strategy_conf.address = ip_prefix;
+ strategy_conf.len = prefix_len;
+ strategy_conf.family = AF_INET6;
+ strategy_conf.type = strategy_type;
+
+ hc_strategy_set(sock_, &strategy_conf);
+}
+
void ForwarderInterface::internalDeleteFaceAndRoute(
const RouteInfoPtr &route_info) {
if (!sock_) return;
@@ -346,10 +363,11 @@ void ForwarderInterface::internalCreateFaceAndRoutes(
}
max_try--;
timer->expires_from_now(std::chrono::milliseconds(500));
- timer->async_wait([this, failed, max_try, timer](std::error_code ec) {
- if (ec) return;
- internalCreateFaceAndRoutes(failed, max_try, timer);
- });
+ timer->async_wait(
+ [this, failed, max_try, timer](const std::error_code &ec) {
+ if (ec) return;
+ internalCreateFaceAndRoutes(failed, max_try, timer);
+ });
return;
}
@@ -422,16 +440,18 @@ int ForwarderInterface::tryToCreateFace(RouteInfo *route_info,
std::string name = "l_" + route_info->name;
listener.local_addr = local_address;
- listener.type = CONNECTION_TYPE_UDP;
+ listener.type = FACE_TYPE_UDP;
listener.family = AF_INET;
listener.local_port = route_info->local_port;
- strncpy(listener.name, name.c_str(), sizeof(listener.name));
- strncpy(listener.interface_name, route_info->interface.c_str(),
- sizeof(listener.interface_name));
+ int ret = strcpy_s(listener.name, SYMBOLIC_NAME_LEN - 1, name.c_str());
+ if (ret < EOK) goto ERR;
+ ret = strcpy_s(listener.interface_name, INTERFACE_LEN - 1,
+ route_info->interface.c_str());
+ if (ret < EOK) goto ERR;
std::cout << "------------> " << route_info->interface << std::endl;
- int ret = hc_listener_create(sock_, &listener);
+ ret = hc_listener_create(sock_, &listener);
if (ret < 0) {
std::cerr << "Error creating listener." << std::endl;
@@ -520,7 +540,7 @@ int ForwarderInterface::tryToCreateRoute(RouteInfo *route_info,
#if 0 // not used
void ForwarderInterface::checkRoutesLoop() {
check_routes_timer_->expires_from_now(std::chrono::milliseconds(1000));
- check_routes_timer_->async_wait([this](std::error_code ec) {
+ check_routes_timer_->async_wait([this](const std::error_code &ec) {
if (ec) return;
if (pending_add_route_counter_ == 0) checkRoutes();
});
@@ -580,7 +600,7 @@ void ForwarderInterface::checkRoutes() {
} else {
// XXX This should be moved somewhere else
getMainListener(
- [this](std::error_code ec, uint32_t hicn_listen_port) {
+ [this](const std::error_code &ec, uint32_t hicn_listen_port) {
if (!ec)
{
hicn_listen_port_ = hicn_listen_port;
@@ -597,7 +617,7 @@ void ForwarderInterface::checkRoutes() {
tryToConnectToForwarder();
}
private:
- void doGetMainListener(std::error_code ec)
+ void doGetMainListener(const std::error_code &ec)
{
if (!ec)
{
@@ -607,7 +627,7 @@ void ForwarderInterface::checkRoutes() {
{
// Since without the main listener of the forwarder the proxy cannot
// work, we can stop the program here until we get the listener port.
- std::cout <<
+ std::cout <<
"Could not retrieve main listener port from the forwarder. "
"Retrying.";
@@ -635,7 +655,7 @@ void ForwarderInterface::checkRoutes() {
doTryToConnectToForwarder(std::make_error_code(std::errc(0)));
}
- void doTryToConnectToForwarder(std::error_code ec)
+ void doTryToConnectToForwarder(const std::error_code &ec)
{
if (!ec)
{
@@ -673,4 +693,4 @@ void ForwarderInterface::checkRoutes() {
constexpr uint32_t ForwarderInterface::REATTEMPT_DELAY_MS;
constexpr uint32_t ForwarderInterface::MAX_REATTEMPT;
-} // namespace hiperf \ No newline at end of file
+} // namespace hiperf
diff --git a/apps/hiperf/src/forwarder_interface.h b/apps/hiperf/src/forwarder_interface.h
index 7591ea257..e58989295 100644
--- a/apps/hiperf/src/forwarder_interface.h
+++ b/apps/hiperf/src/forwarder_interface.h
@@ -15,6 +15,8 @@
#pragma once
+#include <hicn/transport/utils/noncopyable.h>
+
extern "C" {
#ifndef WITH_POLICY
#define WITH_POLICY
@@ -27,14 +29,13 @@ extern "C" {
#define ASIO_STANDALONE
#endif
#include <asio.hpp>
-
#include <functional>
#include <thread>
#include <unordered_map>
namespace hiperf {
-class ForwarderInterface {
+class ForwarderInterface : ::utils::NonCopyable {
static const uint32_t REATTEMPT_DELAY_MS = 500;
static const uint32_t MAX_REATTEMPT = 10;
@@ -68,10 +69,14 @@ class ForwarderInterface {
};
public:
- ForwarderInterface(asio::io_service &io_service, ICallback *callback);
+ explicit ForwarderInterface(asio::io_service &io_service);
+ explicit ForwarderInterface(asio::io_service &io_service, ICallback *callback,
+ forwarder_type_t fwd_type);
~ForwarderInterface();
+ void initForwarderInterface(ICallback *callback, forwarder_type_t fwd_type);
+
State getState();
void setState(State state);
@@ -88,11 +93,16 @@ class ForwarderInterface {
void deleteFaceAndRoute(const RouteInfoPtr &route_info);
+ void setStrategy(std::string prefix, uint32_t prefix_len,
+ std::string strategy);
+
void close();
uint16_t getHicnListenerPort() { return hicn_listen_port_; }
private:
+ ForwarderInterface &operator=(const ForwarderInterface &other) = delete;
+
int connectToForwarder();
int checkListener();
@@ -123,6 +133,8 @@ class ForwarderInterface {
State state_;
+ forwarder_type_t fwd_type_;
+
/* Reattempt timer */
asio::steady_timer timer_;
unsigned num_reattempts;
diff --git a/apps/hiperf/src/main.cc b/apps/hiperf/src/main.cc
index b2d99c4a4..b69392de8 100644
--- a/apps/hiperf/src/main.cc
+++ b/apps/hiperf/src/main.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -14,13 +14,13 @@
*/
#include <client.h>
-#include <server.h>
#include <forwarder_interface.h>
+#include <server.h>
namespace hiperf {
void usage() {
- std::cerr << "HIPERF - A tool for performing network throughput "
+ std::cerr << "HIPERF - Instrumentation tool for performing active network"
"measurements with hICN"
<< std::endl;
std::cerr << "usage: hiperf [-S|-C] [options] [prefix|name]" << std::endl;
@@ -34,19 +34,27 @@ void usage() {
std::cerr << "-f\t<filename>\t\t\t"
<< "Log file" << std::endl;
std::cerr << "-z\t<io_module>\t\t\t"
- << "IO module to use. Default: hicnlight_module" << std::endl;
+ << "IO module to use. Default: hicnlightng_module" << std::endl;
+ std::cerr << "-F\t<conf_file>\t\t\t"
+ << "Path to optional configuration file for libtransport"
+ << std::endl;
+ std::cerr << "-a\t\t\t\t\t"
+ << "Enables data packet aggregation. "
+ << "Works only in RTC mode" << std::endl;
+ std::cerr << "-X\t<param>\t\t\t\t"
+ << "Set FEC params. Options are Rely_K#_N# or RS_K#_N#"
+ << std::endl;
#endif
std::cerr << std::endl;
std::cerr << "SERVER SPECIFIC:" << std::endl;
std::cerr << "-A\t<content_size>\t\t\t"
- "Size of the content to publish. This "
- "is not the size of the packet (see -s for it)."
- << std::endl;
- std::cerr << "-s\t<packet_size>\t\t\tSize of the payload of each data packet."
+ "Sends an application data unit in bytes that is published once "
+ "before exit"
<< std::endl;
+ std::cerr << "-s\t<packet_size>\t\t\tData packet payload size." << std::endl;
std::cerr << "-r\t\t\t\t\t"
<< "Produce real content of <content_size> bytes" << std::endl;
- std::cerr << "-m\t\t\t\t\t"
+ std::cerr << "-m\t<manifest_capacity>\t\t"
<< "Produce transport manifest" << std::endl;
std::cerr << "-l\t\t\t\t\t"
<< "Start producing content upon the reception of the "
@@ -60,20 +68,17 @@ void usage() {
<< "String from which a 128-bit symmetric key will be "
"derived for signing packets"
<< std::endl;
+ std::cerr << "-p\t<password>\t\t\t"
+ << "Password for p12 keystore" << std::endl;
std::cerr << "-y\t<hash_algorithm>\t\t"
<< "Use the selected hash algorithm for "
- "calculating manifest digests"
+ "computing manifest digests (default: SHA256)"
<< std::endl;
- std::cerr << "-p\t<password>\t\t\t"
- << "Password for p12 keystore" << std::endl;
std::cerr << "-x\t\t\t\t\t"
- << "Produce a content of <content_size>, then after downloading "
- "it produce a new content of"
- << "\n\t\t\t\t\t<content_size> without resetting "
- "the suffix to 0."
- << std::endl;
+ << "Produces application data units of size <content_size> "
+ << "without resetting the name suffix to 0." << std::endl;
std::cerr << "-B\t<bitrate>\t\t\t"
- << "Bitrate for RTC producer, to be used with the -R option."
+ << "RTC producer data bitrate, to be used with the -R option."
<< std::endl;
#ifndef _WIN32
std::cerr << "-I\t\t\t\t\t"
@@ -92,8 +97,8 @@ void usage() {
"file containing the "
"crypto material used for the TLS handshake"
<< std::endl;
- std::cerr << "-G\t<port>\t\t\t"
- << "input stream from localhost at the specified port" << std::endl;
+ std::cerr << "-G\t<port>\t\t\t\t"
+ << "Input stream from localhost at the specified port" << std::endl;
#endif
std::cerr << std::endl;
std::cerr << "CLIENT SPECIFIC:" << std::endl;
@@ -105,6 +110,8 @@ void usage() {
<< std::endl;
std::cerr << "-L\t<interest lifetime>\t\t"
<< "Set interest lifetime." << std::endl;
+ std::cerr << "-u\t<delay>\t\t\t\t"
+ << "Set max lifetime of unverified packets." << std::endl;
std::cerr << "-M\t<input_buffer_size>\t\t"
<< "Size of consumer input buffer. If 0, reassembly of packets "
"will be disabled."
@@ -127,17 +134,31 @@ void usage() {
std::cerr << "-t\t\t\t\t\t"
"Test mode, check if the client is receiving the "
"correct data. This is an RTC specific option, to be "
- "used with the -R (default false)"
+ "used with the -R (default: false)"
<< std::endl;
std::cerr << "-P\t\t\t\t\t"
<< "Prefix of the producer where to do the handshake" << std::endl;
std::cerr << "-j\t<relay_name>\t\t\t"
- << "Publish the received content under the name relay_name."
+ << "Publish received content under the name relay_name."
"This is an RTC specific option, to be "
- "used with the -R (default false)"
+ "used with the -R (default: false)"
+ << std::endl;
+ std::cerr << "-g\t<port>\t\t\t\t"
+ << "Output stream to localhost at the specified port" << std::endl;
+ std::cerr << "-e\t<strategy>\t\t\t"
+ << "Enance the network with a realiability strategy. Options 1:"
+ << " unreliable, 2: rtx only, 3: fec only, "
+ << "4: delay based, 5: low rate, 6: low rate and best path "
+ << "7: low rate and replication, 8: low rate and best"
+ << " path/replication"
+ << "(default: 2 = rtx only) " << std::endl;
+ std::cerr << "-H\t\t\t\t\t"
+ << "Disable periodic print headers in stats report." << std::endl;
+ std::cerr << "-n\t<nb_iterations>\t\t\t"
+ << "Print the stats report <nb_iterations> times and exit.\n"
+ << "\t\t\t\t\tThis option limits the duration of the run to "
+ "<nb_iterations> * <stats_interval> milliseconds."
<< std::endl;
- std::cerr << "-g\t<port>\t\t\t"
- << "output stream to localhost at the specified port" << std::endl;
}
int main(int argc, char *argv[]) {
@@ -156,7 +177,7 @@ int main(int argc, char *argv[]) {
char *log_file = nullptr;
transport::interface::global_config::IoModuleConfiguration config;
std::string conf_file;
- config.name = "hicnlight_module";
+ config.name = "hicnlightng_module";
// Consumer
ClientConfiguration client_configuration;
@@ -166,10 +187,9 @@ int main(int argc, char *argv[]) {
int opt;
#ifndef _WIN32
- while ((opt = getopt(
- argc, argv,
- "DSCf:b:d:W:RM:c:vA:s:rmlK:k:y:p:hi:xE:P:B:ItL:z:T:F:j:g:G:")) !=
- -1) {
+ while ((opt = getopt(argc, argv,
+ "DSCf:b:d:W:RM:c:vA:s:rm:lK:k:y:p:hi:xE:P:B:ItL:z:T:F:j:"
+ "g:G:e:awHn:X:u:")) != -1) {
switch (opt) {
// Common
case 'D': {
@@ -202,9 +222,11 @@ int main(int argc, char *argv[]) {
break;
}
#else
- while ((opt = getopt(argc, argv,
- "SCf:b:d:W:RM:c:vA:s:rmlK:k:y:p:hi:xB:E:P:tL:z:F:j:")) !=
- -1) {
+ while (
+ (opt = getopt(
+ argc, argv,
+ "SCf:b:d:W:RM:c:vA:s:rm:lK:k:y:p:hi:xB:E:P:tL:z:F:j:e:awHn:X:u:")) !=
+ -1) {
switch (opt) {
#endif
case 'f': {
@@ -216,6 +238,21 @@ int main(int argc, char *argv[]) {
server_configuration.rtc_ = true;
break;
}
+ case 'a': {
+ client_configuration.aggregated_data_ = true;
+ server_configuration.aggregated_data_ = true;
+ break;
+ }
+ case 'X': {
+ client_configuration.fec_type_ = std::string(optarg);
+ server_configuration.fec_type_ = std::string(optarg);
+ break;
+ }
+ case 'w': {
+ client_configuration.packet_format_ = Packet::Format::HF_INET6_UDP;
+ server_configuration.packet_format_ = Packet::Format::HF_INET6_UDP;
+ break;
+ }
case 'z': {
config.name = optarg;
break;
@@ -286,12 +323,27 @@ int main(int argc, char *argv[]) {
options = 1;
break;
}
+ case 'u': {
+ client_configuration.unverified_delay_ = std::stoul(optarg);
+ options = 1;
+ break;
+ }
case 'j': {
client_configuration.relay_ = true;
client_configuration.relay_name_ = Prefix(optarg);
options = 1;
break;
}
+ case 'H': {
+ client_configuration.print_headers_ = false;
+ options = 1;
+ break;
+ }
+ case 'n': {
+ client_configuration.nb_iterations_ = std::stoul(optarg);
+ options = 1;
+ break;
+ }
// Server specific
case 'A': {
server_configuration.download_size = std::stoul(optarg);
@@ -309,7 +361,7 @@ int main(int argc, char *argv[]) {
break;
}
case 'm': {
- server_configuration.manifest = true;
+ server_configuration.manifest = std::stoul(optarg);
options = -1;
break;
}
@@ -324,18 +376,19 @@ int main(int argc, char *argv[]) {
break;
}
case 'y': {
+ CryptoHashType hash_algorithm = CryptoHashType::SHA256;
if (strncasecmp(optarg, "sha256", 6) == 0) {
- server_configuration.hash_algorithm = CryptoHashType::SHA256;
+ hash_algorithm = CryptoHashType::SHA256;
} else if (strncasecmp(optarg, "sha512", 6) == 0) {
- server_configuration.hash_algorithm = CryptoHashType::SHA512;
+ hash_algorithm = CryptoHashType::SHA512;
} else if (strncasecmp(optarg, "blake2b512", 10) == 0) {
- server_configuration.hash_algorithm = CryptoHashType::BLAKE2B512;
+ hash_algorithm = CryptoHashType::BLAKE2B512;
} else if (strncasecmp(optarg, "blake2s256", 10) == 0) {
- server_configuration.hash_algorithm = CryptoHashType::BLAKE2S256;
+ hash_algorithm = CryptoHashType::BLAKE2S256;
} else {
- std::cerr << "Ignored unknown hash algorithm. Using SHA 256."
- << std::endl;
+ std::cerr << "Unknown hash algorithm. Using SHA 256." << std::endl;
}
+ server_configuration.hash_algorithm_ = hash_algorithm;
options = -1;
break;
}
@@ -361,6 +414,11 @@ int main(int argc, char *argv[]) {
server_configuration.secure_ = true;
break;
}
+ case 'e': {
+ client_configuration.recovery_strategy_ = std::stoul(optarg);
+ options = 1;
+ break;
+ }
case 'h':
default:
usage();
@@ -430,6 +488,13 @@ int main(int argc, char *argv[]) {
transport::interface::global_config::parseConfigurationFile(conf_file);
if (role > 0) {
+ // set forwarder type
+ client_configuration.forwarder_type_ = UNDEFINED;
+ if (config.name.compare("hicnlightng_module") == 0)
+ client_configuration.forwarder_type_ = HICNLIGHT;
+ else if (config.name.compare("hicnlightng_module") == 0)
+ client_configuration.forwarder_type_ = HICNLIGHT_NG;
+
HIperfClient c(client_configuration);
if (c.setup() != ERROR_SETUP) {
c.run();
diff --git a/apps/hiperf/src/server.cc b/apps/hiperf/src/server.cc
index 968d42e2c..7101e7a4a 100644
--- a/apps/hiperf/src/server.cc
+++ b/apps/hiperf/src/server.cc
@@ -21,7 +21,7 @@ namespace hiperf {
* Hiperf server class: configure and setup an hicn producer following the
* ServerConfiguration.
*/
-class HIperfServer::Impl {
+class HIperfServer::Impl : public ProducerSocket::Callback {
const std::size_t log2_content_object_buffer_size = 8;
public:
@@ -35,11 +35,8 @@ class HIperfServer::Impl {
mask_((std::uint16_t)(1 << log2_content_object_buffer_size) - 1),
last_segment_(0),
#ifndef _WIN32
- ptr_last_segment_(&last_segment_),
input_(io_service_),
rtc_running_(false),
-#else
- ptr_last_segment_(&last_segment_),
#endif
flow_name_(configuration_.name.getName()),
socket_(io_service_),
@@ -54,14 +51,16 @@ class HIperfServer::Impl {
#endif
for (int i = 0; i < (1 << log2_content_object_buffer_size); i++) {
- content_objects_[i] = ContentObject::Ptr(
- new ContentObject(conf.name.getName(), HF_INET6_TCP, 0,
- (const uint8_t *)buffer.data(), buffer.size()));
+ content_objects_[i] = ContentObject::Ptr(new ContentObject(
+ configuration_.name.getName(), configuration_.packet_format_, 0,
+ (const uint8_t *)buffer.data(), buffer.size()));
content_objects_[i]->setLifetime(
default_values::content_object_expiry_time);
}
}
+ virtual ~Impl() {}
+
void virtualProcessInterest(ProducerSocket &p, const Interest &interest) {
content_objects_[content_objects_index_ & mask_]->setName(
interest.getName());
@@ -91,16 +90,14 @@ class HIperfServer::Impl {
if (suffix == 0) {
last_segment_ = 0;
- ptr_last_segment_ = &last_segment_;
unsatisfied_interests_.clear();
}
- // The suffix will either be the one from the received interest or the
- // smallest suffix of a previous interest not satisfed
+ // The suffix will either come from the received interest or will be set to
+ // the smallest suffix of a previous interest not satisfied
if (!unsatisfied_interests_.empty()) {
- auto it =
- std::lower_bound(unsatisfied_interests_.begin(),
- unsatisfied_interests_.end(), *ptr_last_segment_);
+ auto it = std::lower_bound(unsatisfied_interests_.begin(),
+ unsatisfied_interests_.end(), last_segment_);
if (it != unsatisfied_interests_.end()) {
suffix = *it;
}
@@ -121,27 +118,28 @@ class HIperfServer::Impl {
b->append(configuration_.download_size);
uint32_t total;
- utils::TimePoint t0 = utils::SteadyClock::now();
+ utils::SteadyTime::TimePoint t0 = utils::SteadyTime::Clock::now();
total = p.produceStream(content_name, std::move(b),
!configuration_.multiphase_produce_, suffix);
- utils::TimePoint t1 = utils::SteadyClock::now();
+ utils::SteadyTime::TimePoint t1 = utils::SteadyTime::Clock::now();
- std::cout
- << "Written " << total
- << " data packets in output buffer (Segmentation time: "
- << std::chrono::duration_cast<utils::Microseconds>(t1 - t0).count()
- << " us)" << std::endl;
+ std::cout << "Written " << total
+ << " data packets in output buffer (Segmentation time: "
+ << utils::SteadyTime::getDurationUs(t0, t1).count() << " us)"
+ << std::endl;
}
void produceContentAsync(ProducerSocket &p, Name content_name,
uint32_t suffix) {
- auto b = utils::MemBuf::create(configuration_.download_size);
- std::memset(b->writableData(), '?', configuration_.download_size);
- b->append(configuration_.download_size);
-
- p.asyncProduce(content_name, std::move(b),
- !configuration_.multiphase_produce_, suffix,
- &ptr_last_segment_);
+ produce_thread_.add([this, suffix, content_name]() {
+ auto b = utils::MemBuf::create(configuration_.download_size);
+ std::memset(b->writableData(), '?', configuration_.download_size);
+ b->append(configuration_.download_size);
+
+ last_segment_ = suffix + producer_socket_->produceStream(
+ content_name, std::move(b),
+ !configuration_.multiphase_produce_, suffix);
+ });
}
void cacheMiss(ProducerSocket &p, const Interest &interest) {
@@ -156,27 +154,22 @@ class HIperfServer::Impl {
std::placeholders::_1, std::placeholders::_2));
}
- std::shared_ptr<Identity> getProducerIdentity(std::string &keystore_path,
- std::string &keystore_pwd,
- CryptoHashType &hash_type) {
- if (access(keystore_path.c_str(), F_OK) != -1) {
- return std::make_shared<Identity>(keystore_path, keystore_pwd, hash_type);
- }
- return std::make_shared<Identity>(keystore_path, keystore_pwd,
- CryptoSuite::RSA_SHA256, 1024, 365,
- "producer-test");
+ void produceError(const std::error_code &err) noexcept override {
+ std::cerr << "Error from producer transport: " << err.message()
+ << std::endl;
+ producer_socket_->stop();
+ io_service_.stop();
}
int setup() {
int ret;
int production_protocol;
+ std::shared_ptr<Signer> signer = std::make_shared<VoidSigner>();
if (configuration_.secure_) {
- auto identity = getProducerIdentity(configuration_.keystore_name,
- configuration_.keystore_password,
- configuration_.hash_algorithm);
producer_socket_ = std::make_unique<P2PSecureProducerSocket>(
- configuration_.rtc_, identity);
+ configuration_.rtc_, configuration_.keystore_name,
+ configuration_.keystore_password);
} else {
if (!configuration_.rtc_) {
production_protocol = ProductionProtocolAlgorithms::BYTE_STREAM;
@@ -188,36 +181,79 @@ class HIperfServer::Impl {
}
if (producer_socket_->setSocketOption(
+ ProducerCallbacksOptions::PRODUCER_CALLBACK, this) ==
+ SOCKET_OPTION_NOT_SET) {
+ std::cerr << "Failed to set producer callback." << std::endl;
+ return ERROR_SETUP;
+ }
+
+ if (producer_socket_->setSocketOption(
GeneralTransportOptions::MAKE_MANIFEST, configuration_.manifest) ==
SOCKET_OPTION_NOT_SET) {
return ERROR_SETUP;
}
+ if (producer_socket_->setSocketOption(
+ GeneralTransportOptions::HASH_ALGORITHM,
+ configuration_.hash_algorithm_) == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+
+ if (producer_socket_->setSocketOption(PACKET_FORMAT,
+ configuration_.packet_format_) ==
+ SOCKET_OPTION_NOT_SET) {
+ std::cerr << "ERROR -- Impossible to set the packet format." << std::endl;
+ return ERROR_SETUP;
+ }
+
if (!configuration_.passphrase.empty()) {
- std::shared_ptr<Signer> signer = std::make_shared<SymmetricSigner>(
- CryptoSuite::HMAC_SHA256, configuration_.passphrase);
- producer_socket_->setSocketOption(GeneralTransportOptions::SIGNER,
- signer);
+ signer = std::make_shared<SymmetricSigner>(CryptoSuite::HMAC_SHA256,
+ configuration_.passphrase);
}
if (!configuration_.keystore_name.empty()) {
- auto identity = getProducerIdentity(configuration_.keystore_name,
- configuration_.keystore_password,
- configuration_.hash_algorithm);
- std::shared_ptr<Signer> signer = identity->getSigner();
- producer_socket_->setSocketOption(GeneralTransportOptions::SIGNER,
- signer);
+ signer = std::make_shared<AsymmetricSigner>(
+ configuration_.keystore_name, configuration_.keystore_password);
+ }
+
+ producer_socket_->setSocketOption(GeneralTransportOptions::SIGNER, signer);
+
+ // Compute maximum payload size
+ Packet::Format format = PayloadSize::getFormatFromName(
+ configuration_.name.getName(), !configuration_.manifest);
+ payload_size_max_ = PayloadSize(format).getPayloadSizeMax(
+ configuration_.rtc_ ? RTC_HEADER_SIZE : 0,
+ configuration_.fec_type_.empty() ? 0 : FEC_HEADER_MAX_SIZE,
+ !configuration_.manifest ? signer->getSignatureFieldSize() : 0);
+
+ if (configuration_.payload_size_ > payload_size_max_) {
+ std::cerr << "WARNING: Payload has size " << configuration_.payload_size_
+ << ", maximum is " << payload_size_max_
+ << ". Payload will be truncated to fit." << std::endl;
+ }
+
+ if (configuration_.rtc_) {
+ ret = producer_socket_->setSocketOption(
+ RtcTransportOptions::AGGREGATED_DATA,
+ configuration_.aggregated_data_);
+
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+ }
+
+ if (configuration_.rtc_) {
+ ret = producer_socket_->setSocketOption(GeneralTransportOptions::FEC_TYPE,
+ configuration_.fec_type_);
+
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
}
- uint32_t rtc_header_size = 0;
- if (configuration_.rtc_) rtc_header_size = 12;
- producer_socket_->setSocketOption(
- GeneralTransportOptions::DATA_PACKET_SIZE,
- (uint32_t)(
- configuration_.payload_size_ + rtc_header_size +
- (configuration_.name.getAddressFamily() == AF_INET ? 40 : 60)));
producer_socket_->registerPrefix(configuration_.name);
producer_socket_->connect();
+ producer_socket_->start();
if (configuration_.rtc_) {
std::cout << "Running RTC producer: the prefix length will be ignored."
@@ -239,6 +275,13 @@ class HIperfServer::Impl {
return ERROR_SETUP;
}
+ if (producer_socket_->setSocketOption(
+ GeneralTransportOptions::MAX_SEGMENT_SIZE,
+ static_cast<uint32_t>(configuration_.payload_size_)) ==
+ SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+
if (!configuration_.live_production) {
produceContent(*producer_socket_, configuration_.name.getName(), 0);
} else {
@@ -283,7 +326,7 @@ class HIperfServer::Impl {
void receiveStream() {
socket_.async_receive_from(
asio::buffer(recv_buffer_.first, recv_buffer_.second), remote_,
- [this](std::error_code ec, std::size_t length) {
+ [this](const std::error_code &ec, std::size_t length) {
if (ec) return;
sendRTCContentFromStream(recv_buffer_.first, length);
receiveStream();
@@ -296,9 +339,8 @@ class HIperfServer::Impl {
// this is used to compute the data packet delay
// Used only for performance evaluation
// It requires clock synchronization between producer and consumer
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::system_clock::now().time_since_epoch())
- .count();
+ uint64_t now = utils::SystemTime::nowMs().count();
+
uint8_t *start = (uint8_t *)payload->writableData();
std::memcpy(start, &now, sizeof(uint64_t));
std::memcpy(start + sizeof(uint64_t), buff, len);
@@ -306,7 +348,7 @@ class HIperfServer::Impl {
len + sizeof(uint64_t));
}
- void sendRTCContentObjectCallback(std::error_code ec) {
+ void sendRTCContentObjectCallback(const std::error_code &ec) {
if (ec) return;
rtc_timer_.expires_from_now(
configuration_.production_rate_.getMicrosecondsForPacket(
@@ -319,18 +361,17 @@ class HIperfServer::Impl {
// this is used to compute the data packet delay
// Used only for performance evaluation
// It requires clock synchronization between producer and consumer
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::system_clock::now().time_since_epoch())
- .count();
+ uint64_t now = utils::SystemTime::nowMs().count();
std::memcpy(payload->writableData(), &now, sizeof(uint64_t));
- producer_socket_->produceDatagram(
- flow_name_, payload->data(),
- payload->length() < 1400 ? payload->length() : 1400);
+ producer_socket_->produceDatagram(flow_name_, payload->data(),
+ payload->length() < payload_size_max_
+ ? payload->length()
+ : payload_size_max_);
}
- void sendRTCContentObjectCallbackWithTrace(std::error_code ec) {
+ void sendRTCContentObjectCallbackWithTrace(const std::error_code &ec) {
if (ec) return;
auto payload =
@@ -342,14 +383,11 @@ class HIperfServer::Impl {
// this is used to compute the data packet delay
// used only for performance evaluation
// it requires clock synchronization between producer and consumer
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::system_clock::now().time_since_epoch())
- .count();
-
+ uint64_t now = utils::SystemTime::nowMs().count();
std::memcpy(payload->writableData(), &now, sizeof(uint64_t));
if (packet_len > payload->length()) packet_len = payload->length();
- if (packet_len > 1400) packet_len = 1400;
+ if (packet_len > payload_size_max_) packet_len = payload_size_max_;
producer_socket_->produceDatagram(flow_name_, payload->data(), packet_len);
@@ -450,13 +488,13 @@ class HIperfServer::Impl {
std::placeholders::_1));
} else if (configuration_.input_stream_mode_) {
rtc_running_ = true;
- // crate socket
+ // create socket
remote_ = asio::ip::udp::endpoint(
asio::ip::address::from_string("127.0.0.1"), configuration_.port_);
socket_.open(asio::ip::udp::v4());
socket_.bind(remote_);
- recv_buffer_.first = (uint8_t *)malloc(1500);
- recv_buffer_.second = 1500;
+ recv_buffer_.first = (uint8_t *)malloc(HIPERF_MTU);
+ recv_buffer_.second = HIPERF_MTU;
receiveStream();
} else {
rtc_running_ = true;
@@ -490,8 +528,9 @@ class HIperfServer::Impl {
std::uint16_t content_objects_index_;
std::uint16_t mask_;
std::uint32_t last_segment_;
- std::uint32_t *ptr_last_segment_;
std::unique_ptr<ProducerSocket> producer_socket_;
+ ::utils::EventThread produce_thread_;
+ std::size_t payload_size_max_;
#ifndef _WIN32
asio::posix::stream_descriptor input_;
asio::streambuf input_buffer_;
@@ -507,6 +546,20 @@ HIperfServer::HIperfServer(const ServerConfiguration &conf) {
impl_ = new Impl(conf);
}
+HIperfServer::HIperfServer(HIperfServer &&other) {
+ impl_ = other.impl_;
+ other.impl_ = nullptr;
+}
+
+HIperfServer &HIperfServer::operator=(HIperfServer &&other) {
+ if (this != &other) {
+ impl_ = other.impl_;
+ other.impl_ = nullptr;
+ }
+
+ return *this;
+}
+
HIperfServer::~HIperfServer() { delete impl_; }
int HIperfServer::setup() { return impl_->setup(); }
diff --git a/apps/hiperf/src/server.h b/apps/hiperf/src/server.h
index 05407a807..73ac72123 100644
--- a/apps/hiperf/src/server.h
+++ b/apps/hiperf/src/server.h
@@ -22,6 +22,9 @@ namespace hiperf {
class HIperfServer {
public:
HIperfServer(const ServerConfiguration &conf);
+ HIperfServer(HIperfServer &&other);
+ HIperfServer &operator=(HIperfServer &&other);
+
~HIperfServer();
int setup();
void run();