diff options
Diffstat (limited to 'apps/hiperf/src')
-rw-r--r-- | apps/hiperf/src/client.cc | 524 | ||||
-rw-r--r-- | apps/hiperf/src/client.h | 6 | ||||
-rw-r--r-- | apps/hiperf/src/common.h | 80 | ||||
-rw-r--r-- | apps/hiperf/src/forwarder_config.h | 2 | ||||
-rw-r--r-- | apps/hiperf/src/forwarder_interface.cc | 122 | ||||
-rw-r--r-- | apps/hiperf/src/forwarder_interface.h | 18 | ||||
-rw-r--r-- | apps/hiperf/src/main.cc | 145 | ||||
-rw-r--r-- | apps/hiperf/src/server.cc | 209 | ||||
-rw-r--r-- | apps/hiperf/src/server.h | 3 |
9 files changed, 775 insertions, 334 deletions
diff --git a/apps/hiperf/src/client.cc b/apps/hiperf/src/client.cc index 820ebf0ce..319fa82ab 100644 --- a/apps/hiperf/src/client.cc +++ b/apps/hiperf/src/client.cc @@ -31,17 +31,12 @@ class Callback; * Hiperf client class: configure and setup an hicn consumer following the * ClientConfiguration. */ -class HIperfClient::Impl -#ifdef FORWARDER_INTERFACE - : ForwarderInterface::ICallback -#endif -{ - typedef std::chrono::time_point<std::chrono::steady_clock> Time; - typedef std::chrono::microseconds TimeDuration; - +class HIperfClient::Impl : ForwarderInterface::ICallback { friend class Callback; friend class RTCCallback; + static const constexpr uint16_t log2_header_counter = 4; + struct nack_packet_t { uint64_t timestamp; uint32_t prod_rate; @@ -76,21 +71,29 @@ class HIperfClient::Impl delay_sample_(0), received_bytes_(0), received_data_pkt_(0), + auth_alerts_(0), data_delays_(""), signals_(io_service_), rtc_callback_(*this), callback_(*this), socket_(io_service_), - done_(false), - switch_threshold_(~0) -#ifdef FORWARDER_INTERFACE - , - forwarder_interface_(io_service_, this) -#endif - { + // switch_threshold_(~0), + fwd_connected_(false), + use_bestpath_(false), + rtt_threshold_(~0), + loss_threshold_(~0), + prefix_name_(""), + prefix_len_(0), + // done_(false), + header_counter_mask_((1 << log2_header_counter) - 1), + header_counter_(0), + print_headers_(configuration_.print_headers_), + first_(true), + forwarder_interface_(io_service_) { + setForwarderConnection(conf.forwarder_type_); } - ~Impl() {} + virtual ~Impl() {} void checkReceivedRtcContent(ConsumerSocket &c, const ContentObject &contentObject) {} @@ -104,174 +107,187 @@ class HIperfClient::Impl void handleTimerExpiration(ConsumerSocket &c, const TransportStatistics &stats) { const char separator = ' '; - const int width = 15; + const int width = 18; - utils::TimePoint t2 = utils::SteadyClock::now(); - auto exact_duration = - std::chrono::duration_cast<utils::Milliseconds>(t2 - t_stats_); + utils::SteadyTime::TimePoint t2 = utils::SteadyTime::Clock::now(); + auto exact_duration = utils::SteadyTime::getDurationMs(t_stats_, t2); - std::stringstream interval; - interval << total_duration_milliseconds_ / 1000 << "-" - << total_duration_milliseconds_ / 1000 + - exact_duration.count() / 1000; + std::stringstream interval_ms; + interval_ms << total_duration_milliseconds_ << "-" + << total_duration_milliseconds_ + exact_duration.count(); std::stringstream bytes_transferred; bytes_transferred << std::fixed << std::setprecision(3) << (stats.getBytesRecv() - old_bytes_value_) / 1000000.0 - << std::setfill(separator) << "[MB]"; + << std::setfill(separator); std::stringstream bandwidth; bandwidth << ((stats.getBytesRecv() - old_bytes_value_) * 8) / (exact_duration.count()) / 1000.0 - << std::setfill(separator) << "[Mbps]"; + << std::setfill(separator); std::stringstream window; - window << stats.getAverageWindowSize() << std::setfill(separator) - << "[Int]"; + window << stats.getAverageWindowSize() << std::setfill(separator); std::stringstream avg_rtt; - avg_rtt << stats.getAverageRtt() << std::setfill(separator) << "[ms]"; + avg_rtt << stats.getAverageRtt() << std::setfill(separator); if (configuration_.rtc_) { - // we get rtc stats more often, thus we need ms in the interval - std::stringstream interval_ms; - interval_ms << total_duration_milliseconds_ << "-" - << total_duration_milliseconds_ + exact_duration.count(); - std::stringstream lost_data; lost_data << stats.getLostData() - old_lost_data_value_ - << std::setfill(separator) << "[pkt]"; + << std::setfill(separator); std::stringstream bytes_recovered_data; bytes_recovered_data << stats.getBytesRecoveredData() - old_bytes_recovered_value_ - << std::setfill(separator) << "[pkt]"; + << std::setfill(separator); std::stringstream definitely_lost_data; definitely_lost_data << stats.getDefinitelyLostData() - old_definitely_lost_data_value_ - << std::setfill(separator) << "[pkt]"; + << std::setfill(separator); std::stringstream data_delay; - data_delay << avg_data_delay_ << std::setfill(separator) << "[ms]"; + data_delay << std::fixed << std::setprecision(3) << avg_data_delay_ + << std::setfill(separator); std::stringstream received_data_pkt; - received_data_pkt << received_data_pkt_ << std::setfill(separator) - << "[pkt]"; + received_data_pkt << received_data_pkt_ << std::setfill(separator); std::stringstream goodput; - goodput << (received_bytes_ * 8.0) / (exact_duration.count()) / 1000.0 - << std::setfill(separator) << "[Mbps]"; + goodput << std::fixed << std::setprecision(3) + << (received_bytes_ * 8.0) / (exact_duration.count()) / 1000.0 + << std::setfill(separator); std::stringstream loss_rate; loss_rate << std::fixed << std::setprecision(2) - << stats.getLossRatio() * 100.0 << std::setfill(separator) - << "[%]"; + << stats.getLossRatio() * 100.0 << std::setfill(separator); std::stringstream retx_sent; retx_sent << stats.getRetxCount() - old_retx_value_ - << std::setfill(separator) << "[pkt]"; + << std::setfill(separator); std::stringstream interest_sent; interest_sent << stats.getInterestTx() - old_sent_int_value_ - << std::setfill(separator) << "[pkt]"; + << std::setfill(separator); std::stringstream nacks; nacks << stats.getReceivedNacks() - old_received_nacks_value_ - << std::setfill(separator) << "[pkt]"; + << std::setfill(separator); std::stringstream fec_pkt; fec_pkt << stats.getReceivedFEC() - old_fec_pkt_ - << std::setfill(separator) << "[pkt]"; + << std::setfill(separator); std::stringstream queuing_delay; - queuing_delay << stats.getQueuingDelay() << std::setfill(separator) - << "[ms]"; + queuing_delay << std::fixed << std::setprecision(3) + << stats.getQueuingDelay() << std::setfill(separator); -#ifdef FORWARDER_INTERFACE - if (!done_ && stats.getQueuingDelay() >= switch_threshold_ && - total_duration_milliseconds_ > 1000) { - std::cout << "Switching due to queuing delay" << std::endl; - forwarder_interface_.createFaceAndRoutes(backup_routes_); - forwarder_interface_.deleteFaceAndRoutes(main_routes_); - std::swap(backup_routes_, main_routes_); - done_ = true; + std::stringstream residual_losses; + double rl_perc = stats.getResidualLossRate() * 100; + residual_losses << std::fixed << std::setprecision(2) << rl_perc + << std::setfill(separator); + + std::stringstream quality_score; + quality_score << std::fixed << (int)stats.getQualityScore() + << std::setfill(separator); + + std::stringstream alerts; + alerts << stats.getAlerts() << std::setfill(separator); + + std::stringstream auth_alerts; + auth_alerts << auth_alerts_ << std::setfill(separator); + + if (fwd_connected_ && use_bestpath_ && + ((stats.getAverageRtt() > rtt_threshold_) || + ((stats.getResidualLossRate() * 100) > loss_threshold_))) { + forwarder_interface_.setStrategy(prefix_name_, prefix_len_, "bestpath"); } -#endif - // statistics not yet available in the transport - // std::stringstream interest_fec_tx; - // interest_fec_tx << stats.getInterestFecTxCount() - - // old_fec_interest_tx_value_ << std::setfill(separator) << "[pkt]"; - // std::stringstream bytes_fec_recv; - // bytes_fec_recv << stats.getBytesFecRecv() - old_fec_data_rx_value_ - // << std::setfill(separator) << "[bytes]"; - std::cout << std::left << std::setw(width) << "Interval"; - std::cout << std::left << std::setw(width) << "RecvData"; - std::cout << std::left << std::setw(width) << "Bandwidth"; - std::cout << std::left << std::setw(width) << "Goodput"; - std::cout << std::left << std::setw(width) << "LossRate"; - std::cout << std::left << std::setw(width) << "Retr"; - std::cout << std::left << std::setw(width) << "InterestSent"; - std::cout << std::left << std::setw(width) << "ReceivedNacks"; - std::cout << std::left << std::setw(width) << "SyncWnd"; - std::cout << std::left << std::setw(width) << "MinRtt"; - std::cout << std::left << std::setw(width) << "QueuingDelay"; - std::cout << std::left << std::setw(width) << "LostData"; - std::cout << std::left << std::setw(width) << "RecoveredData"; - std::cout << std::left << std::setw(width) << "DefinitelyLost"; - std::cout << std::left << std::setw(width) << "State"; - std::cout << std::left << std::setw(width) << "DataDelay"; - std::cout << std::left << std::setw(width) << "FecPkt" << std::endl; - - std::cout << std::left << std::setw(width) << interval_ms.str(); - std::cout << std::left << std::setw(width) << received_data_pkt.str(); - std::cout << std::left << std::setw(width) << bandwidth.str(); - std::cout << std::left << std::setw(width) << goodput.str(); - std::cout << std::left << std::setw(width) << loss_rate.str(); - std::cout << std::left << std::setw(width) << retx_sent.str(); - std::cout << std::left << std::setw(width) << interest_sent.str(); - std::cout << std::left << std::setw(width) << nacks.str(); - std::cout << std::left << std::setw(width) << window.str(); - std::cout << std::left << std::setw(width) << avg_rtt.str(); - std::cout << std::left << std::setw(width) << queuing_delay.str(); - std::cout << std::left << std::setw(width) << lost_data.str(); - std::cout << std::left << std::setw(width) << bytes_recovered_data.str(); - std::cout << std::left << std::setw(width) << definitely_lost_data.str(); - std::cout << std::left << std::setw(width) << stats.getCCStatus(); - std::cout << std::left << std::setw(width) << data_delay.str(); - std::cout << std::left << std::setw(width) << fec_pkt.str(); + if ((header_counter_ == 0 && print_headers_) || first_) { + std::cout << std::right << std::setw(width) << "Interval[ms]"; + std::cout << std::right << std::setw(width) << "RecvData[pkt]"; + std::cout << std::right << std::setw(width) << "Bandwidth[Mbps]"; + std::cout << std::right << std::setw(width) << "Goodput[Mbps]"; + std::cout << std::right << std::setw(width) << "LossRate[%]"; + std::cout << std::right << std::setw(width) << "Retr[pkt]"; + std::cout << std::right << std::setw(width) << "InterestSent"; + std::cout << std::right << std::setw(width) << "ReceivedNacks"; + std::cout << std::right << std::setw(width) << "SyncWnd[pkt]"; + std::cout << std::right << std::setw(width) << "MinRtt[ms]"; + std::cout << std::right << std::setw(width) << "QueuingDelay[ms]"; + std::cout << std::right << std::setw(width) << "LostData[pkt]"; + std::cout << std::right << std::setw(width) << "RecoveredData"; + std::cout << std::right << std::setw(width) << "DefinitelyLost"; + std::cout << std::right << std::setw(width) << "State"; + std::cout << std::right << std::setw(width) << "DataDelay[ms]"; + std::cout << std::right << std::setw(width) << "FecPkt"; + std::cout << std::right << std::setw(width) << "Congestion"; + std::cout << std::right << std::setw(width) << "ResidualLosses"; + std::cout << std::right << std::setw(width) << "QualityScore"; + std::cout << std::right << std::setw(width) << "Alerts"; + std::cout << std::right << std::setw(width) << "AuthAlerts" + << std::endl; + + first_ = false; + } + + std::cout << std::right << std::setw(width) << interval_ms.str(); + std::cout << std::right << std::setw(width) << received_data_pkt.str(); + std::cout << std::right << std::setw(width) << bandwidth.str(); + std::cout << std::right << std::setw(width) << goodput.str(); + std::cout << std::right << std::setw(width) << loss_rate.str(); + std::cout << std::right << std::setw(width) << retx_sent.str(); + std::cout << std::right << std::setw(width) << interest_sent.str(); + std::cout << std::right << std::setw(width) << nacks.str(); + std::cout << std::right << std::setw(width) << window.str(); + std::cout << std::right << std::setw(width) << avg_rtt.str(); + std::cout << std::right << std::setw(width) << queuing_delay.str(); + std::cout << std::right << std::setw(width) << lost_data.str(); + std::cout << std::right << std::setw(width) << bytes_recovered_data.str(); + std::cout << std::right << std::setw(width) << definitely_lost_data.str(); + std::cout << std::right << std::setw(width) << stats.getCCStatus(); + std::cout << std::right << std::setw(width) << data_delay.str(); + std::cout << std::right << std::setw(width) << fec_pkt.str(); + std::cout << std::right << std::setw(width) << stats.isCongested(); + std::cout << std::right << std::setw(width) << residual_losses.str(); + std::cout << std::right << std::setw(width) << quality_score.str(); + std::cout << std::right << std::setw(width) << alerts.str(); + std::cout << std::right << std::setw(width) << auth_alerts.str(); std::cout << std::endl; if (configuration_.test_mode_) { if (data_delays_.size() > 0) data_delays_.pop_back(); - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::system_clock::now().time_since_epoch()) - .count(); - std::cout << now << " DATA-DELAYS:[" << data_delays_ << "]" - << std::endl; + auto now = utils::SteadyTime::nowMs(); + std::cout << std::fixed << std::setprecision(0) << now.count() + << " DATA-DELAYS:[" << data_delays_ << "]" << std::endl; } // statistics not yet available in the transport - // std::cout << std::left << std::setw(width) << interest_fec_tx.str(); - // std::cout << std::left << std::setw(width) << bytes_fec_recv.str(); + // std::cout << std::right << std::setw(width) << interest_fec_tx.str(); + // std::cout << std::right << std::setw(width) << bytes_fec_recv.str(); } else { - std::cout << std::left << std::setw(width) << "Interval"; - std::cout << std::left << std::setw(width) << "Transfer"; - std::cout << std::left << std::setw(width) << "Bandwidth"; - std::cout << std::left << std::setw(width) << "Retr"; - std::cout << std::left << std::setw(width) << "Cwnd"; - std::cout << std::left << std::setw(width) << "AvgRtt" << std::endl; - - std::cout << std::left << std::setw(width) << interval.str(); - std::cout << std::left << std::setw(width) << bytes_transferred.str(); - std::cout << std::left << std::setw(width) << bandwidth.str(); - std::cout << std::left << std::setw(width) << stats.getRetxCount(); - std::cout << std::left << std::setw(width) << window.str(); - std::cout << std::left << std::setw(width) << avg_rtt.str() << std::endl; - std::cout << std::endl; + if ((header_counter_ == 0 && print_headers_) || first_) { + std::cout << std::right << std::setw(width) << "Interval[ms]"; + std::cout << std::right << std::setw(width) << "Transfer[MB]"; + std::cout << std::right << std::setw(width) << "Bandwidth[Mbps]"; + std::cout << std::right << std::setw(width) << "Retr[pkt]"; + std::cout << std::right << std::setw(width) << "Cwnd[Int]"; + std::cout << std::right << std::setw(width) << "AvgRtt[ms]" + << std::endl; + + first_ = false; + } + + std::cout << std::right << std::setw(width) << interval_ms.str(); + std::cout << std::right << std::setw(width) << bytes_transferred.str(); + std::cout << std::right << std::setw(width) << bandwidth.str(); + std::cout << std::right << std::setw(width) << stats.getRetxCount(); + std::cout << std::right << std::setw(width) << window.str(); + std::cout << std::right << std::setw(width) << avg_rtt.str() << std::endl; } + total_duration_milliseconds_ += (uint32_t)exact_duration.count(); old_bytes_value_ = stats.getBytesRecv(); old_lost_data_value_ = stats.getLostData(); @@ -289,9 +305,95 @@ class HIperfClient::Impl received_data_pkt_ = 0; data_delays_ = ""; - t_stats_ = utils::SteadyClock::now(); + t_stats_ = utils::SteadyTime::Clock::now(); + + header_counter_ = (header_counter_ + 1) & header_counter_mask_; + + if (--configuration_.nb_iterations_ == 0) { + // We reached the maximum nb of runs. Stop now. + io_service_.stop(); + } } + bool setForwarderConnection(forwarder_type_t forwarder_type) { + using namespace libconfig; + Config cfg; + + const char *conf_file = getenv("FORWARDER_CONFIG"); + if (!conf_file) return false; + + if ((forwarder_type != HICNLIGHT) && (forwarder_type != HICNLIGHT_NG)) + return false; + + try { + cfg.readFile(conf_file); + } catch (const FileIOException &fioex) { + std::cerr << "I/O error while reading file." << std::endl; + return false; + } catch (const ParseException &pex) { + std::cerr << "Parse error at " << pex.getFile() << ":" << pex.getLine() + << " - " << pex.getError() << std::endl; + return false; + } + + Setting &config = cfg.getRoot(); + + /* conf file example + * + * use_bestpath = "ON | OFF" + * rtt_threshold = 200 //ms + * loss_threshold = 20 //% + * name = "b001::/16" + */ + + if (config.exists("use_bestpath")) { + std::string val; + config.lookupValue("use_bestpath", val); + if (val.compare("ON") == 0) use_bestpath_ = true; + } + + if (config.exists("rtt_threshold")) { + unsigned val; + config.lookupValue("rtt_threshold", val); + rtt_threshold_ = val; + } + + if (config.exists("loss_threshold")) { + unsigned val; + config.lookupValue("loss_threshold", val); + loss_threshold_ = val; + } + + if (config.exists("name")) { + std::string route; + config.lookupValue("name", route); + + std::string delimiter = "/"; + size_t pos = 0; + + if ((pos = route.find(delimiter)) != std::string::npos) { + prefix_name_ = route.substr(0, pos); + route.erase(0, pos + delimiter.length()); + prefix_len_ = std::stoul(route.substr(0)); + } + } + + forwarder_interface_.initForwarderInterface(this, forwarder_type); + + return true; + } + + void onHicnServiceReady() override { + std::cout << "Successfully connected to local forwarder!" << std::endl; + fwd_connected_ = true; + } + + void onRouteConfigured( + std::vector<ForwarderInterface::RouteInfoPtr> &route_info) override { + std::cout << "Routes successfully configured!" << std::endl; + } + +#ifdef FORWARDER_INTERFACE bool parseConfig(const char *conf_file) { using namespace libconfig; Config cfg; @@ -439,7 +541,6 @@ class HIperfClient::Impl return true; } -#ifdef FORWARDER_INTERFACE void onHicnServiceReady() override { std::cout << "Successfully connected to local forwarder!" << std::endl; @@ -541,6 +642,13 @@ class HIperfClient::Impl } #endif + transport::auth::VerificationPolicy onAuthFailed( + transport::auth::Suffix suffix, + transport::auth::VerificationPolicy policy) { + auth_alerts_++; + return transport::auth::VerificationPolicy::ACCEPT; + } + int setup() { int ret; @@ -557,6 +665,7 @@ class HIperfClient::Impl producer_socket_ = std::make_unique<ProducerSocket>(production_protocol); producer_socket_->registerPrefix(configuration_.relay_name_); producer_socket_->connect(); + producer_socket_->start(); } if (configuration_.output_stream_mode_ && configuration_.rtc_) { @@ -586,6 +695,17 @@ class HIperfClient::Impl GeneralTransportOptions::INTEREST_LIFETIME, configuration_.interest_lifetime_); + consumer_socket_->setSocketOption( + GeneralTransportOptions::MAX_UNVERIFIED_TIME, + configuration_.unverified_delay_); + + if (consumer_socket_->setSocketOption( + GeneralTransportOptions::PACKET_FORMAT, + configuration_.packet_format_) == SOCKET_OPTION_NOT_SET) { + std::cerr << "ERROR -- Impossible to set the packet format." << std::endl; + return ERROR_SETUP; + } + #if defined(DEBUG) && defined(__linux__) std::shared_ptr<transport::BasePortal> portal; consumer_socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal); @@ -623,20 +743,24 @@ class HIperfClient::Impl } } + std::shared_ptr<Verifier> verifier = std::make_shared<VoidVerifier>(); + if (!configuration_.producer_certificate.empty()) { - std::shared_ptr<Verifier> verifier = std::make_shared<AsymmetricVerifier>( + verifier = std::make_shared<AsymmetricVerifier>( configuration_.producer_certificate); - if (consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER, - verifier) == SOCKET_OPTION_NOT_SET) - return ERROR_SETUP; } if (!configuration_.passphrase.empty()) { - std::shared_ptr<Verifier> verifier = - std::make_shared<SymmetricVerifier>(configuration_.passphrase); - if (consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER, - verifier) == SOCKET_OPTION_NOT_SET) - return ERROR_SETUP; + verifier = std::make_shared<SymmetricVerifier>(configuration_.passphrase); + } + + verifier->setVerificationFailedCallback( + std::bind(&HIperfClient::Impl::onAuthFailed, this, + std::placeholders::_1, std::placeholders::_2)); + + if (consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER, + verifier) == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; } ret = consumer_socket_->setSocketOption( @@ -662,6 +786,65 @@ class HIperfClient::Impl } if (configuration_.rtc_) { + if (configuration_.recovery_strategy_ == 1) { // unreliable + ret = consumer_socket_->setSocketOption( + RtcTransportOptions::RECOVERY_STRATEGY, + (uint32_t)RtcTransportRecoveryStrategies::RECOVERY_OFF); + } else if (configuration_.recovery_strategy_ == 2) { // rtx only + ret = consumer_socket_->setSocketOption( + RtcTransportOptions::RECOVERY_STRATEGY, + (uint32_t)RtcTransportRecoveryStrategies::RTX_ONLY); + } else if (configuration_.recovery_strategy_ == 3) { // fec only + ret = consumer_socket_->setSocketOption( + RtcTransportOptions::RECOVERY_STRATEGY, + (uint32_t)RtcTransportRecoveryStrategies::FEC_ONLY); + } else if (configuration_.recovery_strategy_ == 4) { // delay based + ret = consumer_socket_->setSocketOption( + RtcTransportOptions::RECOVERY_STRATEGY, + (uint32_t)RtcTransportRecoveryStrategies::DELAY_BASED); + } else if (configuration_.recovery_strategy_ == 5) { // low rate flow + ret = consumer_socket_->setSocketOption( + RtcTransportOptions::RECOVERY_STRATEGY, + (uint32_t)RtcTransportRecoveryStrategies::LOW_RATE); + } else if (configuration_.recovery_strategy_ == + 6) { // low rate + bestpath + ret = consumer_socket_->setSocketOption( + RtcTransportOptions::RECOVERY_STRATEGY, + (uint32_t)RtcTransportRecoveryStrategies::LOW_RATE_AND_BESTPATH); + } else if (configuration_.recovery_strategy_ == + 7) { // low rate + replication + ret = consumer_socket_->setSocketOption( + RtcTransportOptions::RECOVERY_STRATEGY, + (uint32_t)RtcTransportRecoveryStrategies::LOW_RATE_AND_REPLICATION); + } else if (configuration_.recovery_strategy_ == + 8) { // low rate + bestpath or replication + ret = consumer_socket_->setSocketOption( + RtcTransportOptions::RECOVERY_STRATEGY, + (uint32_t)RtcTransportRecoveryStrategies:: + LOW_RATE_AND_ALL_FWD_STRATEGIES); + } else { + // default + ret = consumer_socket_->setSocketOption( + RtcTransportOptions::RECOVERY_STRATEGY, + (uint32_t)RtcTransportRecoveryStrategies::RTX_ONLY); + } + + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + } + + if (configuration_.rtc_) { + ret = consumer_socket_->setSocketOption( + RtcTransportOptions::AGGREGATED_DATA, + configuration_.aggregated_data_); + + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + } + + if (configuration_.rtc_) { ret = consumer_socket_->setSocketOption( ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT, (ConsumerContentObjectCallback)std::bind( @@ -673,6 +856,15 @@ class HIperfClient::Impl } if (configuration_.rtc_) { + ret = consumer_socket_->setSocketOption(GeneralTransportOptions::FEC_TYPE, + configuration_.fec_type_); + + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + } + + if (configuration_.rtc_) { std::shared_ptr<TransportStatistics> transport_stats; consumer_socket_->getSocketOption( OtherOptions::STATISTICS, (TransportStatistics **)&transport_stats); @@ -708,10 +900,10 @@ class HIperfClient::Impl signals_.async_wait( [this](const std::error_code &, const int &) { io_service_.stop(); }); - t_download_ = t_stats_ = std::chrono::steady_clock::now(); - consumer_socket_->asyncConsume(configuration_.name); - io_service_.run(); + t_download_ = t_stats_ = utils::SteadyTime::now(); + consumer_socket_->consume(configuration_.name); + io_service_.run(); consumer_socket_->stop(); return ERROR_SUCCESS; @@ -719,11 +911,15 @@ class HIperfClient::Impl private: class RTCCallback : public ConsumerSocket::ReadCallback { - static constexpr std::size_t mtu = 1500; + static constexpr std::size_t mtu = HIPERF_MTU; public: RTCCallback(Impl &hiperf_client) : client_(hiperf_client) { client_.configuration_.receive_buffer = utils::MemBuf::create(mtu); + Packet::Format format = + PayloadSize::getFormatFromName(client_.configuration_.name, false); + payload_size_max_ = + PayloadSize(format).getPayloadSizeMax(RTC_HEADER_SIZE); } bool isBufferMovable() noexcept override { return false; } @@ -743,9 +939,7 @@ class HIperfClient::Impl uint64_t *senderTimeStamp = (uint64_t *)client_.configuration_.receive_buffer->writableData(); - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::system_clock::now().time_since_epoch()) - .count(); + auto now = utils::SystemTime::nowMs().count(); double new_delay = (double)(now - *senderTimeStamp); if (*senderTimeStamp > now) @@ -765,7 +959,7 @@ class HIperfClient::Impl client_.producer_socket_->produceDatagram( client_.configuration_.relay_name_.getName(), client_.configuration_.receive_buffer->writableData(), - length < 1400 ? length : 1400); + length < payload_size_max_ ? length : payload_size_max_); } if (client_.configuration_.output_stream_mode_) { uint8_t *start = @@ -778,7 +972,7 @@ class HIperfClient::Impl size_t maxBufferSize() const override { return mtu; } - void readError(const std::error_code ec) noexcept override { + void readError(const std::error_code &ec) noexcept override { std::cerr << "Error while reading from RTC socket" << std::endl; client_.io_service_.stop(); } @@ -789,6 +983,7 @@ class HIperfClient::Impl private: Impl &client_; + std::size_t payload_size_max_; }; class Callback : public ConsumerSocket::ReadCallback { @@ -816,16 +1011,15 @@ class HIperfClient::Impl return client_.configuration_.receive_buffer_size_; } - void readError(const std::error_code ec) noexcept override { + void readError(const std::error_code &ec) noexcept override { std::cerr << "Error " << ec.message() << " while reading from socket" << std::endl; client_.io_service_.stop(); } void readSuccess(std::size_t total_size) noexcept override { - Time t2 = std::chrono::steady_clock::now(); - TimeDuration dt = - std::chrono::duration_cast<TimeDuration>(t2 - client_.t_download_); + auto t2 = utils::SteadyTime::now(); + auto dt = utils::SteadyTime::getDurationUs(client_.t_download_, t2); long usec = (long)dt.count(); std::cout << "Content retrieved. Size: " << total_size << " [Bytes]" @@ -843,8 +1037,8 @@ class HIperfClient::Impl }; hiperf::ClientConfiguration configuration_; - Time t_stats_; - Time t_download_; + utils::SteadyTime::TimePoint t_stats_; + utils::SteadyTime::TimePoint t_download_; uint32_t total_duration_milliseconds_; uint64_t old_bytes_value_; uint64_t old_interest_tx_value_; @@ -865,6 +1059,7 @@ class HIperfClient::Impl uint32_t received_bytes_; uint32_t received_data_pkt_; + uint32_t auth_alerts_; std::string data_delays_; @@ -878,19 +1073,44 @@ class HIperfClient::Impl asio::ip::udp::endpoint remote_; ForwarderConfiguration config_; - uint16_t switch_threshold_; /* ms */ - bool done_; + // uint16_t switch_threshold_; /* ms */ + bool fwd_connected_; + bool use_bestpath_; + uint32_t rtt_threshold_; /* ms */ + uint32_t loss_threshold_; /* ms */ + std::string prefix_name_; // bestpath route + uint32_t prefix_len_; + // bool done_; + std::vector<ForwarderInterface::RouteInfoPtr> main_routes_; std::vector<ForwarderInterface::RouteInfoPtr> backup_routes_; -#ifdef FORWARDER_INTERFACE + uint16_t header_counter_mask_; + uint16_t header_counter_; + + bool print_headers_; + bool first_; + ForwarderInterface forwarder_interface_; -#endif }; HIperfClient::HIperfClient(const ClientConfiguration &conf) { impl_ = new Impl(conf); } +HIperfClient::HIperfClient(HIperfClient &&other) { + impl_ = other.impl_; + other.impl_ = nullptr; +} + +HIperfClient &HIperfClient::operator=(HIperfClient &&other) { + if (this != &other) { + impl_ = other.impl_; + other.impl_ = nullptr; + } + + return *this; +} + HIperfClient::~HIperfClient() { delete impl_; } int HIperfClient::setup() { return impl_->setup(); } diff --git a/apps/hiperf/src/client.h b/apps/hiperf/src/client.h index f45b9af43..bc80c874c 100644 --- a/apps/hiperf/src/client.h +++ b/apps/hiperf/src/client.h @@ -16,12 +16,16 @@ #pragma once #include <common.h> +#include <hicn/transport/utils/noncopyable.h> namespace hiperf { -class HIperfClient { +class HIperfClient : ::utils::NonCopyable { public: HIperfClient(const ClientConfiguration &conf); + HIperfClient(HIperfClient &&other); + HIperfClient &operator=(HIperfClient &&other); + ~HIperfClient(); int setup(); void run(); diff --git a/apps/hiperf/src/common.h b/apps/hiperf/src/common.h index e6ba526f9..13c9dcc1d 100644 --- a/apps/hiperf/src/common.h +++ b/apps/hiperf/src/common.h @@ -15,7 +15,7 @@ #pragma once -#include <hicn/transport/auth/identity.h> +#include <forwarder_interface.h> #include <hicn/transport/auth/signer.h> #include <hicn/transport/config.h> #include <hicn/transport/core/content_object.h> @@ -45,6 +45,9 @@ #endif #define ERROR_SETUP -5 #define MIN_PROBE_SEQ 0xefffffff +#define RTC_HEADER_SIZE 12 +#define FEC_HEADER_MAX_SIZE 36 +#define HIPERF_MTU 1500 using namespace transport::interface; using namespace transport::auth; @@ -73,11 +76,51 @@ static inline uint64_t _htonll(const uint64_t *input) { namespace hiperf { /** + * Class to retrieve the maximum payload size given the MTU and packet headers. + */ +class PayloadSize { + public: + PayloadSize(Packet::Format format, std::size_t mtu = HIPERF_MTU) + : mtu_(mtu), format_(format) {} + + std::size_t getPayloadSizeMax(std::size_t transport_size = 0, + std::size_t fec_size = 0, + std::size_t signature_size = 0) { + return mtu_ - Packet::getHeaderSizeFromFormat(format_, signature_size) - + transport_size - fec_size; + } + + static Packet::Format getFormatFromName(Name name, bool ah = false) { + switch (name.getAddressFamily()) { + case AF_INET: + return ah ? HF_INET_TCP_AH : HF_INET_TCP; + case AF_INET6: + return ah ? HF_INET6_TCP_AH : HF_INET6_TCP; + default: + return HF_UNSPEC; + } + } + + private: + std::size_t mtu_; + Packet::Format format_; +}; + +/** * Class for handling the production rate for the RTC producer. */ class Rate { public: Rate() : rate_kbps_(0) {} + ~Rate() {} + + Rate &operator=(const Rate &other) { + if (this != &other) { + rate_kbps_ = other.rate_kbps_; + } + + return *this; + } Rate(const std::string &rate) { std::size_t found = rate.find("kbps"); @@ -137,9 +180,16 @@ struct ClientConfiguration { secure_(false), producer_prefix_(), interest_lifetime_(500), + unverified_delay_(2000), relay_name_("c001::abcd/64"), output_stream_mode_(false), - port_(0) {} + port_(0), + recovery_strategy_(4), + aggregated_data_(false), + fec_type_(""), + packet_format_(default_values::packet_format), + print_headers_(true), + nb_iterations_(std::numeric_limits<decltype(nb_iterations_)>::max()) {} Name name; double beta; @@ -158,9 +208,17 @@ struct ClientConfiguration { bool secure_; Prefix producer_prefix_; uint32_t interest_lifetime_; + uint32_t unverified_delay_; Prefix relay_name_; bool output_stream_mode_; uint16_t port_; + uint32_t recovery_strategy_; + bool aggregated_data_; + std::string fec_type_; + Packet::Format packet_format_; + bool print_headers_; + std::uint32_t nb_iterations_; + forwarder_type_t forwarder_type_; }; /** @@ -170,11 +228,11 @@ struct ServerConfiguration { ServerConfiguration() : name("b001::abcd/64"), virtual_producer(true), - manifest(false), + manifest(0), live_production(false), content_lifetime(600000000_U32), download_size(20 * 1024 * 1024), - hash_algorithm(CryptoHashType::SHA256), + hash_algorithm_(CryptoHashType::SHA256), keystore_name(""), passphrase(""), keystore_password("cisco"), @@ -185,18 +243,21 @@ struct ServerConfiguration { trace_index_(0), trace_file_(nullptr), production_rate_(std::string("2048kbps")), - payload_size_(1400), + payload_size_(1384), secure_(false), input_stream_mode_(false), - port_(0) {} + port_(0), + aggregated_data_(false), + fec_type_(""), + packet_format_(default_values::packet_format) {} Prefix name; bool virtual_producer; - bool manifest; + std::uint32_t manifest; bool live_production; std::uint32_t content_lifetime; std::uint32_t download_size; - CryptoHashType hash_algorithm; + CryptoHashType hash_algorithm_; std::string keystore_name; std::string passphrase; std::string keystore_password; @@ -212,6 +273,9 @@ struct ServerConfiguration { bool input_stream_mode_; uint16_t port_; std::vector<struct packet_t> trace_; + bool aggregated_data_; + std::string fec_type_; + Packet::Format packet_format_; }; } // namespace hiperf diff --git a/apps/hiperf/src/forwarder_config.h b/apps/hiperf/src/forwarder_config.h index 655ac3b66..aaac14839 100644 --- a/apps/hiperf/src/forwarder_config.h +++ b/apps/hiperf/src/forwarder_config.h @@ -94,4 +94,4 @@ class ForwarderConfiguration { std::size_t n_threads_; }; -}
\ No newline at end of file +} // namespace hiperf
\ No newline at end of file diff --git a/apps/hiperf/src/forwarder_interface.cc b/apps/hiperf/src/forwarder_interface.cc index 864208239..e87a5953d 100644 --- a/apps/hiperf/src/forwarder_interface.cc +++ b/apps/hiperf/src/forwarder_interface.cc @@ -25,6 +25,7 @@ extern "C" { #include <hicn/error.h> #include <hicn/util/ip_address.h> +#include <hicn/util/sstrncpy.h> } // XXX the main listener should be retrieve in this class at initialization, aka @@ -36,27 +37,14 @@ extern "C" { namespace hiperf { +ForwarderInterface::ForwarderInterface(asio::io_service &io_service) + : external_ioservice_(io_service), timer_(io_service) {} + ForwarderInterface::ForwarderInterface(asio::io_service &io_service, - ICallback *callback) - : external_ioservice_(io_service), - forwarder_interface_callback_(callback), - work_(std::make_unique<asio::io_service::work>(internal_ioservice_)), - sock_(nullptr), - thread_(std::make_unique<std::thread>([this]() { - std::cout << "Starting Forwarder Interface thread" << std::endl; - internal_ioservice_.run(); - std::cout << "Stopping Forwarder Interface thread" << std::endl; - })), - // set_route_callback_(std::forward<Callback &&>(setRouteCallback)), - check_routes_timer_(nullptr), - pending_add_route_counter_(0), - hicn_listen_port_(9695), - /* We start in disabled state even when a forwarder is always available */ - state_(State::Disabled), - timer_(io_service), - num_reattempts(0) { - std::cout << "Forwarder interface created... connecting to forwarder...\n"; - internal_ioservice_.post([this]() { onHicnServiceAvailable(true); }); + ICallback *callback, + forwarder_type_t fwd_type) + : external_ioservice_(io_service), timer_(io_service) { + initForwarderInterface(callback, fwd_type); } ForwarderInterface::~ForwarderInterface() { @@ -72,8 +60,27 @@ ForwarderInterface::~ForwarderInterface() { thread_->join(); } +} - std::cout << "ForwarderInterface::~ForwarderInterface" << std::endl; +void ForwarderInterface::initForwarderInterface(ICallback *callback, + forwarder_type_t fwd_type) { + forwarder_interface_callback_ = callback; + work_ = std::make_unique<asio::io_service::work>(internal_ioservice_); + sock_ = nullptr; + thread_ = std::make_unique<std::thread>([this]() { + std::cout << "Starting Forwarder Interface thread" << std::endl; + internal_ioservice_.run(); + std::cout << "Stopping Forwarder Interface thread" << std::endl; + }); + check_routes_timer_ = nullptr; + pending_add_route_counter_ = 0; + hicn_listen_port_ = 9695; + /* We start in disabled state even when a forwarder is always available */ + state_ = State::Disabled; + fwd_type_ = fwd_type; + num_reattempts = 0; + std::cout << "Forwarder interface created... connecting to forwarder...\n"; + internal_ioservice_.post([this]() { onHicnServiceAvailable(true); }); } void ForwarderInterface::onHicnServiceAvailable(bool flag) { @@ -93,26 +100,15 @@ void ForwarderInterface::onHicnServiceAvailable(bool flag) { std::cout << "Connected to forwarder... cancelling reconnection timer" << std::endl; + timer_.cancel(); num_reattempts = 0; - // case State::Connected: - // checkListener(); - - // if (state_ != State::Ready) { - // std::cout << "Listener not found" << std::endl; - // goto REATTEMPT; - // } - // state_ = State::Ready; - - // timer_.cancel(); - // num_reattempts = 0; - std::cout << "Forwarder interface is ready... communicate to controller" << std::endl; forwarder_interface_callback_->onHicnServiceReady(); - + case State::Connected: case State::Ready: break; } @@ -136,14 +132,14 @@ REATTEMPT: std::chrono::milliseconds(ForwarderInterface::REATTEMPT_DELAY_MS)); // timer_.async_wait(std::bind(&ForwarderInterface::onHicnServiceAvailable, // this, flag, std::placeholders::_1)); - timer_.async_wait([this, flag](std::error_code ec) { + timer_.async_wait([this, flag](const std::error_code &ec) { if (ec) return; onHicnServiceAvailable(flag); }); } int ForwarderInterface::connectToForwarder() { - sock_ = hc_sock_create(); + sock_ = hc_sock_create_forwarder(fwd_type_); if (!sock_) { std::cout << "Could not create socket" << std::endl; goto ERR_SOCK; @@ -256,6 +252,27 @@ void ForwarderInterface::deleteFaceAndRoutes( }); } +void ForwarderInterface::setStrategy(std::string prefix, uint32_t prefix_len, + std::string strategy) { + if (!sock_) return; + + ip_address_t ip_prefix; + if (ip_address_pton(prefix.c_str(), &ip_prefix) < 0) { + return; + } + + strategy_type_t strategy_type = strategy_type_from_str(strategy.c_str()); + if (strategy_type == STRATEGY_TYPE_UNDEFINED) return; + + hc_strategy_t strategy_conf; + strategy_conf.address = ip_prefix; + strategy_conf.len = prefix_len; + strategy_conf.family = AF_INET6; + strategy_conf.type = strategy_type; + + hc_strategy_set(sock_, &strategy_conf); +} + void ForwarderInterface::internalDeleteFaceAndRoute( const RouteInfoPtr &route_info) { if (!sock_) return; @@ -346,10 +363,11 @@ void ForwarderInterface::internalCreateFaceAndRoutes( } max_try--; timer->expires_from_now(std::chrono::milliseconds(500)); - timer->async_wait([this, failed, max_try, timer](std::error_code ec) { - if (ec) return; - internalCreateFaceAndRoutes(failed, max_try, timer); - }); + timer->async_wait( + [this, failed, max_try, timer](const std::error_code &ec) { + if (ec) return; + internalCreateFaceAndRoutes(failed, max_try, timer); + }); return; } @@ -422,16 +440,18 @@ int ForwarderInterface::tryToCreateFace(RouteInfo *route_info, std::string name = "l_" + route_info->name; listener.local_addr = local_address; - listener.type = CONNECTION_TYPE_UDP; + listener.type = FACE_TYPE_UDP; listener.family = AF_INET; listener.local_port = route_info->local_port; - strncpy(listener.name, name.c_str(), sizeof(listener.name)); - strncpy(listener.interface_name, route_info->interface.c_str(), - sizeof(listener.interface_name)); + int ret = strcpy_s(listener.name, SYMBOLIC_NAME_LEN - 1, name.c_str()); + if (ret < EOK) goto ERR; + ret = strcpy_s(listener.interface_name, INTERFACE_LEN - 1, + route_info->interface.c_str()); + if (ret < EOK) goto ERR; std::cout << "------------> " << route_info->interface << std::endl; - int ret = hc_listener_create(sock_, &listener); + ret = hc_listener_create(sock_, &listener); if (ret < 0) { std::cerr << "Error creating listener." << std::endl; @@ -520,7 +540,7 @@ int ForwarderInterface::tryToCreateRoute(RouteInfo *route_info, #if 0 // not used void ForwarderInterface::checkRoutesLoop() { check_routes_timer_->expires_from_now(std::chrono::milliseconds(1000)); - check_routes_timer_->async_wait([this](std::error_code ec) { + check_routes_timer_->async_wait([this](const std::error_code &ec) { if (ec) return; if (pending_add_route_counter_ == 0) checkRoutes(); }); @@ -580,7 +600,7 @@ void ForwarderInterface::checkRoutes() { } else { // XXX This should be moved somewhere else getMainListener( - [this](std::error_code ec, uint32_t hicn_listen_port) { + [this](const std::error_code &ec, uint32_t hicn_listen_port) { if (!ec) { hicn_listen_port_ = hicn_listen_port; @@ -597,7 +617,7 @@ void ForwarderInterface::checkRoutes() { tryToConnectToForwarder(); } private: - void doGetMainListener(std::error_code ec) + void doGetMainListener(const std::error_code &ec) { if (!ec) { @@ -607,7 +627,7 @@ void ForwarderInterface::checkRoutes() { { // Since without the main listener of the forwarder the proxy cannot // work, we can stop the program here until we get the listener port. - std::cout << + std::cout << "Could not retrieve main listener port from the forwarder. " "Retrying."; @@ -635,7 +655,7 @@ void ForwarderInterface::checkRoutes() { doTryToConnectToForwarder(std::make_error_code(std::errc(0))); } - void doTryToConnectToForwarder(std::error_code ec) + void doTryToConnectToForwarder(const std::error_code &ec) { if (!ec) { @@ -673,4 +693,4 @@ void ForwarderInterface::checkRoutes() { constexpr uint32_t ForwarderInterface::REATTEMPT_DELAY_MS; constexpr uint32_t ForwarderInterface::MAX_REATTEMPT; -} // namespace hiperf
\ No newline at end of file +} // namespace hiperf diff --git a/apps/hiperf/src/forwarder_interface.h b/apps/hiperf/src/forwarder_interface.h index 7591ea257..e58989295 100644 --- a/apps/hiperf/src/forwarder_interface.h +++ b/apps/hiperf/src/forwarder_interface.h @@ -15,6 +15,8 @@ #pragma once +#include <hicn/transport/utils/noncopyable.h> + extern "C" { #ifndef WITH_POLICY #define WITH_POLICY @@ -27,14 +29,13 @@ extern "C" { #define ASIO_STANDALONE #endif #include <asio.hpp> - #include <functional> #include <thread> #include <unordered_map> namespace hiperf { -class ForwarderInterface { +class ForwarderInterface : ::utils::NonCopyable { static const uint32_t REATTEMPT_DELAY_MS = 500; static const uint32_t MAX_REATTEMPT = 10; @@ -68,10 +69,14 @@ class ForwarderInterface { }; public: - ForwarderInterface(asio::io_service &io_service, ICallback *callback); + explicit ForwarderInterface(asio::io_service &io_service); + explicit ForwarderInterface(asio::io_service &io_service, ICallback *callback, + forwarder_type_t fwd_type); ~ForwarderInterface(); + void initForwarderInterface(ICallback *callback, forwarder_type_t fwd_type); + State getState(); void setState(State state); @@ -88,11 +93,16 @@ class ForwarderInterface { void deleteFaceAndRoute(const RouteInfoPtr &route_info); + void setStrategy(std::string prefix, uint32_t prefix_len, + std::string strategy); + void close(); uint16_t getHicnListenerPort() { return hicn_listen_port_; } private: + ForwarderInterface &operator=(const ForwarderInterface &other) = delete; + int connectToForwarder(); int checkListener(); @@ -123,6 +133,8 @@ class ForwarderInterface { State state_; + forwarder_type_t fwd_type_; + /* Reattempt timer */ asio::steady_timer timer_; unsigned num_reattempts; diff --git a/apps/hiperf/src/main.cc b/apps/hiperf/src/main.cc index b2d99c4a4..b69392de8 100644 --- a/apps/hiperf/src/main.cc +++ b/apps/hiperf/src/main.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -14,13 +14,13 @@ */ #include <client.h> -#include <server.h> #include <forwarder_interface.h> +#include <server.h> namespace hiperf { void usage() { - std::cerr << "HIPERF - A tool for performing network throughput " + std::cerr << "HIPERF - Instrumentation tool for performing active network" "measurements with hICN" << std::endl; std::cerr << "usage: hiperf [-S|-C] [options] [prefix|name]" << std::endl; @@ -34,19 +34,27 @@ void usage() { std::cerr << "-f\t<filename>\t\t\t" << "Log file" << std::endl; std::cerr << "-z\t<io_module>\t\t\t" - << "IO module to use. Default: hicnlight_module" << std::endl; + << "IO module to use. Default: hicnlightng_module" << std::endl; + std::cerr << "-F\t<conf_file>\t\t\t" + << "Path to optional configuration file for libtransport" + << std::endl; + std::cerr << "-a\t\t\t\t\t" + << "Enables data packet aggregation. " + << "Works only in RTC mode" << std::endl; + std::cerr << "-X\t<param>\t\t\t\t" + << "Set FEC params. Options are Rely_K#_N# or RS_K#_N#" + << std::endl; #endif std::cerr << std::endl; std::cerr << "SERVER SPECIFIC:" << std::endl; std::cerr << "-A\t<content_size>\t\t\t" - "Size of the content to publish. This " - "is not the size of the packet (see -s for it)." - << std::endl; - std::cerr << "-s\t<packet_size>\t\t\tSize of the payload of each data packet." + "Sends an application data unit in bytes that is published once " + "before exit" << std::endl; + std::cerr << "-s\t<packet_size>\t\t\tData packet payload size." << std::endl; std::cerr << "-r\t\t\t\t\t" << "Produce real content of <content_size> bytes" << std::endl; - std::cerr << "-m\t\t\t\t\t" + std::cerr << "-m\t<manifest_capacity>\t\t" << "Produce transport manifest" << std::endl; std::cerr << "-l\t\t\t\t\t" << "Start producing content upon the reception of the " @@ -60,20 +68,17 @@ void usage() { << "String from which a 128-bit symmetric key will be " "derived for signing packets" << std::endl; + std::cerr << "-p\t<password>\t\t\t" + << "Password for p12 keystore" << std::endl; std::cerr << "-y\t<hash_algorithm>\t\t" << "Use the selected hash algorithm for " - "calculating manifest digests" + "computing manifest digests (default: SHA256)" << std::endl; - std::cerr << "-p\t<password>\t\t\t" - << "Password for p12 keystore" << std::endl; std::cerr << "-x\t\t\t\t\t" - << "Produce a content of <content_size>, then after downloading " - "it produce a new content of" - << "\n\t\t\t\t\t<content_size> without resetting " - "the suffix to 0." - << std::endl; + << "Produces application data units of size <content_size> " + << "without resetting the name suffix to 0." << std::endl; std::cerr << "-B\t<bitrate>\t\t\t" - << "Bitrate for RTC producer, to be used with the -R option." + << "RTC producer data bitrate, to be used with the -R option." << std::endl; #ifndef _WIN32 std::cerr << "-I\t\t\t\t\t" @@ -92,8 +97,8 @@ void usage() { "file containing the " "crypto material used for the TLS handshake" << std::endl; - std::cerr << "-G\t<port>\t\t\t" - << "input stream from localhost at the specified port" << std::endl; + std::cerr << "-G\t<port>\t\t\t\t" + << "Input stream from localhost at the specified port" << std::endl; #endif std::cerr << std::endl; std::cerr << "CLIENT SPECIFIC:" << std::endl; @@ -105,6 +110,8 @@ void usage() { << std::endl; std::cerr << "-L\t<interest lifetime>\t\t" << "Set interest lifetime." << std::endl; + std::cerr << "-u\t<delay>\t\t\t\t" + << "Set max lifetime of unverified packets." << std::endl; std::cerr << "-M\t<input_buffer_size>\t\t" << "Size of consumer input buffer. If 0, reassembly of packets " "will be disabled." @@ -127,17 +134,31 @@ void usage() { std::cerr << "-t\t\t\t\t\t" "Test mode, check if the client is receiving the " "correct data. This is an RTC specific option, to be " - "used with the -R (default false)" + "used with the -R (default: false)" << std::endl; std::cerr << "-P\t\t\t\t\t" << "Prefix of the producer where to do the handshake" << std::endl; std::cerr << "-j\t<relay_name>\t\t\t" - << "Publish the received content under the name relay_name." + << "Publish received content under the name relay_name." "This is an RTC specific option, to be " - "used with the -R (default false)" + "used with the -R (default: false)" + << std::endl; + std::cerr << "-g\t<port>\t\t\t\t" + << "Output stream to localhost at the specified port" << std::endl; + std::cerr << "-e\t<strategy>\t\t\t" + << "Enance the network with a realiability strategy. Options 1:" + << " unreliable, 2: rtx only, 3: fec only, " + << "4: delay based, 5: low rate, 6: low rate and best path " + << "7: low rate and replication, 8: low rate and best" + << " path/replication" + << "(default: 2 = rtx only) " << std::endl; + std::cerr << "-H\t\t\t\t\t" + << "Disable periodic print headers in stats report." << std::endl; + std::cerr << "-n\t<nb_iterations>\t\t\t" + << "Print the stats report <nb_iterations> times and exit.\n" + << "\t\t\t\t\tThis option limits the duration of the run to " + "<nb_iterations> * <stats_interval> milliseconds." << std::endl; - std::cerr << "-g\t<port>\t\t\t" - << "output stream to localhost at the specified port" << std::endl; } int main(int argc, char *argv[]) { @@ -156,7 +177,7 @@ int main(int argc, char *argv[]) { char *log_file = nullptr; transport::interface::global_config::IoModuleConfiguration config; std::string conf_file; - config.name = "hicnlight_module"; + config.name = "hicnlightng_module"; // Consumer ClientConfiguration client_configuration; @@ -166,10 +187,9 @@ int main(int argc, char *argv[]) { int opt; #ifndef _WIN32 - while ((opt = getopt( - argc, argv, - "DSCf:b:d:W:RM:c:vA:s:rmlK:k:y:p:hi:xE:P:B:ItL:z:T:F:j:g:G:")) != - -1) { + while ((opt = getopt(argc, argv, + "DSCf:b:d:W:RM:c:vA:s:rm:lK:k:y:p:hi:xE:P:B:ItL:z:T:F:j:" + "g:G:e:awHn:X:u:")) != -1) { switch (opt) { // Common case 'D': { @@ -202,9 +222,11 @@ int main(int argc, char *argv[]) { break; } #else - while ((opt = getopt(argc, argv, - "SCf:b:d:W:RM:c:vA:s:rmlK:k:y:p:hi:xB:E:P:tL:z:F:j:")) != - -1) { + while ( + (opt = getopt( + argc, argv, + "SCf:b:d:W:RM:c:vA:s:rm:lK:k:y:p:hi:xB:E:P:tL:z:F:j:e:awHn:X:u:")) != + -1) { switch (opt) { #endif case 'f': { @@ -216,6 +238,21 @@ int main(int argc, char *argv[]) { server_configuration.rtc_ = true; break; } + case 'a': { + client_configuration.aggregated_data_ = true; + server_configuration.aggregated_data_ = true; + break; + } + case 'X': { + client_configuration.fec_type_ = std::string(optarg); + server_configuration.fec_type_ = std::string(optarg); + break; + } + case 'w': { + client_configuration.packet_format_ = Packet::Format::HF_INET6_UDP; + server_configuration.packet_format_ = Packet::Format::HF_INET6_UDP; + break; + } case 'z': { config.name = optarg; break; @@ -286,12 +323,27 @@ int main(int argc, char *argv[]) { options = 1; break; } + case 'u': { + client_configuration.unverified_delay_ = std::stoul(optarg); + options = 1; + break; + } case 'j': { client_configuration.relay_ = true; client_configuration.relay_name_ = Prefix(optarg); options = 1; break; } + case 'H': { + client_configuration.print_headers_ = false; + options = 1; + break; + } + case 'n': { + client_configuration.nb_iterations_ = std::stoul(optarg); + options = 1; + break; + } // Server specific case 'A': { server_configuration.download_size = std::stoul(optarg); @@ -309,7 +361,7 @@ int main(int argc, char *argv[]) { break; } case 'm': { - server_configuration.manifest = true; + server_configuration.manifest = std::stoul(optarg); options = -1; break; } @@ -324,18 +376,19 @@ int main(int argc, char *argv[]) { break; } case 'y': { + CryptoHashType hash_algorithm = CryptoHashType::SHA256; if (strncasecmp(optarg, "sha256", 6) == 0) { - server_configuration.hash_algorithm = CryptoHashType::SHA256; + hash_algorithm = CryptoHashType::SHA256; } else if (strncasecmp(optarg, "sha512", 6) == 0) { - server_configuration.hash_algorithm = CryptoHashType::SHA512; + hash_algorithm = CryptoHashType::SHA512; } else if (strncasecmp(optarg, "blake2b512", 10) == 0) { - server_configuration.hash_algorithm = CryptoHashType::BLAKE2B512; + hash_algorithm = CryptoHashType::BLAKE2B512; } else if (strncasecmp(optarg, "blake2s256", 10) == 0) { - server_configuration.hash_algorithm = CryptoHashType::BLAKE2S256; + hash_algorithm = CryptoHashType::BLAKE2S256; } else { - std::cerr << "Ignored unknown hash algorithm. Using SHA 256." - << std::endl; + std::cerr << "Unknown hash algorithm. Using SHA 256." << std::endl; } + server_configuration.hash_algorithm_ = hash_algorithm; options = -1; break; } @@ -361,6 +414,11 @@ int main(int argc, char *argv[]) { server_configuration.secure_ = true; break; } + case 'e': { + client_configuration.recovery_strategy_ = std::stoul(optarg); + options = 1; + break; + } case 'h': default: usage(); @@ -430,6 +488,13 @@ int main(int argc, char *argv[]) { transport::interface::global_config::parseConfigurationFile(conf_file); if (role > 0) { + // set forwarder type + client_configuration.forwarder_type_ = UNDEFINED; + if (config.name.compare("hicnlightng_module") == 0) + client_configuration.forwarder_type_ = HICNLIGHT; + else if (config.name.compare("hicnlightng_module") == 0) + client_configuration.forwarder_type_ = HICNLIGHT_NG; + HIperfClient c(client_configuration); if (c.setup() != ERROR_SETUP) { c.run(); diff --git a/apps/hiperf/src/server.cc b/apps/hiperf/src/server.cc index 968d42e2c..7101e7a4a 100644 --- a/apps/hiperf/src/server.cc +++ b/apps/hiperf/src/server.cc @@ -21,7 +21,7 @@ namespace hiperf { * Hiperf server class: configure and setup an hicn producer following the * ServerConfiguration. */ -class HIperfServer::Impl { +class HIperfServer::Impl : public ProducerSocket::Callback { const std::size_t log2_content_object_buffer_size = 8; public: @@ -35,11 +35,8 @@ class HIperfServer::Impl { mask_((std::uint16_t)(1 << log2_content_object_buffer_size) - 1), last_segment_(0), #ifndef _WIN32 - ptr_last_segment_(&last_segment_), input_(io_service_), rtc_running_(false), -#else - ptr_last_segment_(&last_segment_), #endif flow_name_(configuration_.name.getName()), socket_(io_service_), @@ -54,14 +51,16 @@ class HIperfServer::Impl { #endif for (int i = 0; i < (1 << log2_content_object_buffer_size); i++) { - content_objects_[i] = ContentObject::Ptr( - new ContentObject(conf.name.getName(), HF_INET6_TCP, 0, - (const uint8_t *)buffer.data(), buffer.size())); + content_objects_[i] = ContentObject::Ptr(new ContentObject( + configuration_.name.getName(), configuration_.packet_format_, 0, + (const uint8_t *)buffer.data(), buffer.size())); content_objects_[i]->setLifetime( default_values::content_object_expiry_time); } } + virtual ~Impl() {} + void virtualProcessInterest(ProducerSocket &p, const Interest &interest) { content_objects_[content_objects_index_ & mask_]->setName( interest.getName()); @@ -91,16 +90,14 @@ class HIperfServer::Impl { if (suffix == 0) { last_segment_ = 0; - ptr_last_segment_ = &last_segment_; unsatisfied_interests_.clear(); } - // The suffix will either be the one from the received interest or the - // smallest suffix of a previous interest not satisfed + // The suffix will either come from the received interest or will be set to + // the smallest suffix of a previous interest not satisfied if (!unsatisfied_interests_.empty()) { - auto it = - std::lower_bound(unsatisfied_interests_.begin(), - unsatisfied_interests_.end(), *ptr_last_segment_); + auto it = std::lower_bound(unsatisfied_interests_.begin(), + unsatisfied_interests_.end(), last_segment_); if (it != unsatisfied_interests_.end()) { suffix = *it; } @@ -121,27 +118,28 @@ class HIperfServer::Impl { b->append(configuration_.download_size); uint32_t total; - utils::TimePoint t0 = utils::SteadyClock::now(); + utils::SteadyTime::TimePoint t0 = utils::SteadyTime::Clock::now(); total = p.produceStream(content_name, std::move(b), !configuration_.multiphase_produce_, suffix); - utils::TimePoint t1 = utils::SteadyClock::now(); + utils::SteadyTime::TimePoint t1 = utils::SteadyTime::Clock::now(); - std::cout - << "Written " << total - << " data packets in output buffer (Segmentation time: " - << std::chrono::duration_cast<utils::Microseconds>(t1 - t0).count() - << " us)" << std::endl; + std::cout << "Written " << total + << " data packets in output buffer (Segmentation time: " + << utils::SteadyTime::getDurationUs(t0, t1).count() << " us)" + << std::endl; } void produceContentAsync(ProducerSocket &p, Name content_name, uint32_t suffix) { - auto b = utils::MemBuf::create(configuration_.download_size); - std::memset(b->writableData(), '?', configuration_.download_size); - b->append(configuration_.download_size); - - p.asyncProduce(content_name, std::move(b), - !configuration_.multiphase_produce_, suffix, - &ptr_last_segment_); + produce_thread_.add([this, suffix, content_name]() { + auto b = utils::MemBuf::create(configuration_.download_size); + std::memset(b->writableData(), '?', configuration_.download_size); + b->append(configuration_.download_size); + + last_segment_ = suffix + producer_socket_->produceStream( + content_name, std::move(b), + !configuration_.multiphase_produce_, suffix); + }); } void cacheMiss(ProducerSocket &p, const Interest &interest) { @@ -156,27 +154,22 @@ class HIperfServer::Impl { std::placeholders::_1, std::placeholders::_2)); } - std::shared_ptr<Identity> getProducerIdentity(std::string &keystore_path, - std::string &keystore_pwd, - CryptoHashType &hash_type) { - if (access(keystore_path.c_str(), F_OK) != -1) { - return std::make_shared<Identity>(keystore_path, keystore_pwd, hash_type); - } - return std::make_shared<Identity>(keystore_path, keystore_pwd, - CryptoSuite::RSA_SHA256, 1024, 365, - "producer-test"); + void produceError(const std::error_code &err) noexcept override { + std::cerr << "Error from producer transport: " << err.message() + << std::endl; + producer_socket_->stop(); + io_service_.stop(); } int setup() { int ret; int production_protocol; + std::shared_ptr<Signer> signer = std::make_shared<VoidSigner>(); if (configuration_.secure_) { - auto identity = getProducerIdentity(configuration_.keystore_name, - configuration_.keystore_password, - configuration_.hash_algorithm); producer_socket_ = std::make_unique<P2PSecureProducerSocket>( - configuration_.rtc_, identity); + configuration_.rtc_, configuration_.keystore_name, + configuration_.keystore_password); } else { if (!configuration_.rtc_) { production_protocol = ProductionProtocolAlgorithms::BYTE_STREAM; @@ -188,36 +181,79 @@ class HIperfServer::Impl { } if (producer_socket_->setSocketOption( + ProducerCallbacksOptions::PRODUCER_CALLBACK, this) == + SOCKET_OPTION_NOT_SET) { + std::cerr << "Failed to set producer callback." << std::endl; + return ERROR_SETUP; + } + + if (producer_socket_->setSocketOption( GeneralTransportOptions::MAKE_MANIFEST, configuration_.manifest) == SOCKET_OPTION_NOT_SET) { return ERROR_SETUP; } + if (producer_socket_->setSocketOption( + GeneralTransportOptions::HASH_ALGORITHM, + configuration_.hash_algorithm_) == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + if (producer_socket_->setSocketOption(PACKET_FORMAT, + configuration_.packet_format_) == + SOCKET_OPTION_NOT_SET) { + std::cerr << "ERROR -- Impossible to set the packet format." << std::endl; + return ERROR_SETUP; + } + if (!configuration_.passphrase.empty()) { - std::shared_ptr<Signer> signer = std::make_shared<SymmetricSigner>( - CryptoSuite::HMAC_SHA256, configuration_.passphrase); - producer_socket_->setSocketOption(GeneralTransportOptions::SIGNER, - signer); + signer = std::make_shared<SymmetricSigner>(CryptoSuite::HMAC_SHA256, + configuration_.passphrase); } if (!configuration_.keystore_name.empty()) { - auto identity = getProducerIdentity(configuration_.keystore_name, - configuration_.keystore_password, - configuration_.hash_algorithm); - std::shared_ptr<Signer> signer = identity->getSigner(); - producer_socket_->setSocketOption(GeneralTransportOptions::SIGNER, - signer); + signer = std::make_shared<AsymmetricSigner>( + configuration_.keystore_name, configuration_.keystore_password); + } + + producer_socket_->setSocketOption(GeneralTransportOptions::SIGNER, signer); + + // Compute maximum payload size + Packet::Format format = PayloadSize::getFormatFromName( + configuration_.name.getName(), !configuration_.manifest); + payload_size_max_ = PayloadSize(format).getPayloadSizeMax( + configuration_.rtc_ ? RTC_HEADER_SIZE : 0, + configuration_.fec_type_.empty() ? 0 : FEC_HEADER_MAX_SIZE, + !configuration_.manifest ? signer->getSignatureFieldSize() : 0); + + if (configuration_.payload_size_ > payload_size_max_) { + std::cerr << "WARNING: Payload has size " << configuration_.payload_size_ + << ", maximum is " << payload_size_max_ + << ". Payload will be truncated to fit." << std::endl; + } + + if (configuration_.rtc_) { + ret = producer_socket_->setSocketOption( + RtcTransportOptions::AGGREGATED_DATA, + configuration_.aggregated_data_); + + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + } + + if (configuration_.rtc_) { + ret = producer_socket_->setSocketOption(GeneralTransportOptions::FEC_TYPE, + configuration_.fec_type_); + + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } } - uint32_t rtc_header_size = 0; - if (configuration_.rtc_) rtc_header_size = 12; - producer_socket_->setSocketOption( - GeneralTransportOptions::DATA_PACKET_SIZE, - (uint32_t)( - configuration_.payload_size_ + rtc_header_size + - (configuration_.name.getAddressFamily() == AF_INET ? 40 : 60))); producer_socket_->registerPrefix(configuration_.name); producer_socket_->connect(); + producer_socket_->start(); if (configuration_.rtc_) { std::cout << "Running RTC producer: the prefix length will be ignored." @@ -239,6 +275,13 @@ class HIperfServer::Impl { return ERROR_SETUP; } + if (producer_socket_->setSocketOption( + GeneralTransportOptions::MAX_SEGMENT_SIZE, + static_cast<uint32_t>(configuration_.payload_size_)) == + SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + if (!configuration_.live_production) { produceContent(*producer_socket_, configuration_.name.getName(), 0); } else { @@ -283,7 +326,7 @@ class HIperfServer::Impl { void receiveStream() { socket_.async_receive_from( asio::buffer(recv_buffer_.first, recv_buffer_.second), remote_, - [this](std::error_code ec, std::size_t length) { + [this](const std::error_code &ec, std::size_t length) { if (ec) return; sendRTCContentFromStream(recv_buffer_.first, length); receiveStream(); @@ -296,9 +339,8 @@ class HIperfServer::Impl { // this is used to compute the data packet delay // Used only for performance evaluation // It requires clock synchronization between producer and consumer - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::system_clock::now().time_since_epoch()) - .count(); + uint64_t now = utils::SystemTime::nowMs().count(); + uint8_t *start = (uint8_t *)payload->writableData(); std::memcpy(start, &now, sizeof(uint64_t)); std::memcpy(start + sizeof(uint64_t), buff, len); @@ -306,7 +348,7 @@ class HIperfServer::Impl { len + sizeof(uint64_t)); } - void sendRTCContentObjectCallback(std::error_code ec) { + void sendRTCContentObjectCallback(const std::error_code &ec) { if (ec) return; rtc_timer_.expires_from_now( configuration_.production_rate_.getMicrosecondsForPacket( @@ -319,18 +361,17 @@ class HIperfServer::Impl { // this is used to compute the data packet delay // Used only for performance evaluation // It requires clock synchronization between producer and consumer - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::system_clock::now().time_since_epoch()) - .count(); + uint64_t now = utils::SystemTime::nowMs().count(); std::memcpy(payload->writableData(), &now, sizeof(uint64_t)); - producer_socket_->produceDatagram( - flow_name_, payload->data(), - payload->length() < 1400 ? payload->length() : 1400); + producer_socket_->produceDatagram(flow_name_, payload->data(), + payload->length() < payload_size_max_ + ? payload->length() + : payload_size_max_); } - void sendRTCContentObjectCallbackWithTrace(std::error_code ec) { + void sendRTCContentObjectCallbackWithTrace(const std::error_code &ec) { if (ec) return; auto payload = @@ -342,14 +383,11 @@ class HIperfServer::Impl { // this is used to compute the data packet delay // used only for performance evaluation // it requires clock synchronization between producer and consumer - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::system_clock::now().time_since_epoch()) - .count(); - + uint64_t now = utils::SystemTime::nowMs().count(); std::memcpy(payload->writableData(), &now, sizeof(uint64_t)); if (packet_len > payload->length()) packet_len = payload->length(); - if (packet_len > 1400) packet_len = 1400; + if (packet_len > payload_size_max_) packet_len = payload_size_max_; producer_socket_->produceDatagram(flow_name_, payload->data(), packet_len); @@ -450,13 +488,13 @@ class HIperfServer::Impl { std::placeholders::_1)); } else if (configuration_.input_stream_mode_) { rtc_running_ = true; - // crate socket + // create socket remote_ = asio::ip::udp::endpoint( asio::ip::address::from_string("127.0.0.1"), configuration_.port_); socket_.open(asio::ip::udp::v4()); socket_.bind(remote_); - recv_buffer_.first = (uint8_t *)malloc(1500); - recv_buffer_.second = 1500; + recv_buffer_.first = (uint8_t *)malloc(HIPERF_MTU); + recv_buffer_.second = HIPERF_MTU; receiveStream(); } else { rtc_running_ = true; @@ -490,8 +528,9 @@ class HIperfServer::Impl { std::uint16_t content_objects_index_; std::uint16_t mask_; std::uint32_t last_segment_; - std::uint32_t *ptr_last_segment_; std::unique_ptr<ProducerSocket> producer_socket_; + ::utils::EventThread produce_thread_; + std::size_t payload_size_max_; #ifndef _WIN32 asio::posix::stream_descriptor input_; asio::streambuf input_buffer_; @@ -507,6 +546,20 @@ HIperfServer::HIperfServer(const ServerConfiguration &conf) { impl_ = new Impl(conf); } +HIperfServer::HIperfServer(HIperfServer &&other) { + impl_ = other.impl_; + other.impl_ = nullptr; +} + +HIperfServer &HIperfServer::operator=(HIperfServer &&other) { + if (this != &other) { + impl_ = other.impl_; + other.impl_ = nullptr; + } + + return *this; +} + HIperfServer::~HIperfServer() { delete impl_; } int HIperfServer::setup() { return impl_->setup(); } diff --git a/apps/hiperf/src/server.h b/apps/hiperf/src/server.h index 05407a807..73ac72123 100644 --- a/apps/hiperf/src/server.h +++ b/apps/hiperf/src/server.h @@ -22,6 +22,9 @@ namespace hiperf { class HIperfServer { public: HIperfServer(const ServerConfiguration &conf); + HIperfServer(HIperfServer &&other); + HIperfServer &operator=(HIperfServer &&other); + ~HIperfServer(); int setup(); void run(); |