diff options
Diffstat (limited to 'apps/hiperf/src/client.cc')
-rw-r--r-- | apps/hiperf/src/client.cc | 524 |
1 files changed, 372 insertions, 152 deletions
diff --git a/apps/hiperf/src/client.cc b/apps/hiperf/src/client.cc index 820ebf0ce..319fa82ab 100644 --- a/apps/hiperf/src/client.cc +++ b/apps/hiperf/src/client.cc @@ -31,17 +31,12 @@ class Callback; * Hiperf client class: configure and setup an hicn consumer following the * ClientConfiguration. */ -class HIperfClient::Impl -#ifdef FORWARDER_INTERFACE - : ForwarderInterface::ICallback -#endif -{ - typedef std::chrono::time_point<std::chrono::steady_clock> Time; - typedef std::chrono::microseconds TimeDuration; - +class HIperfClient::Impl : ForwarderInterface::ICallback { friend class Callback; friend class RTCCallback; + static const constexpr uint16_t log2_header_counter = 4; + struct nack_packet_t { uint64_t timestamp; uint32_t prod_rate; @@ -76,21 +71,29 @@ class HIperfClient::Impl delay_sample_(0), received_bytes_(0), received_data_pkt_(0), + auth_alerts_(0), data_delays_(""), signals_(io_service_), rtc_callback_(*this), callback_(*this), socket_(io_service_), - done_(false), - switch_threshold_(~0) -#ifdef FORWARDER_INTERFACE - , - forwarder_interface_(io_service_, this) -#endif - { + // switch_threshold_(~0), + fwd_connected_(false), + use_bestpath_(false), + rtt_threshold_(~0), + loss_threshold_(~0), + prefix_name_(""), + prefix_len_(0), + // done_(false), + header_counter_mask_((1 << log2_header_counter) - 1), + header_counter_(0), + print_headers_(configuration_.print_headers_), + first_(true), + forwarder_interface_(io_service_) { + setForwarderConnection(conf.forwarder_type_); } - ~Impl() {} + virtual ~Impl() {} void checkReceivedRtcContent(ConsumerSocket &c, const ContentObject &contentObject) {} @@ -104,174 +107,187 @@ class HIperfClient::Impl void handleTimerExpiration(ConsumerSocket &c, const TransportStatistics &stats) { const char separator = ' '; - const int width = 15; + const int width = 18; - utils::TimePoint t2 = utils::SteadyClock::now(); - auto exact_duration = - std::chrono::duration_cast<utils::Milliseconds>(t2 - t_stats_); + utils::SteadyTime::TimePoint t2 = utils::SteadyTime::Clock::now(); + auto exact_duration = utils::SteadyTime::getDurationMs(t_stats_, t2); - std::stringstream interval; - interval << total_duration_milliseconds_ / 1000 << "-" - << total_duration_milliseconds_ / 1000 + - exact_duration.count() / 1000; + std::stringstream interval_ms; + interval_ms << total_duration_milliseconds_ << "-" + << total_duration_milliseconds_ + exact_duration.count(); std::stringstream bytes_transferred; bytes_transferred << std::fixed << std::setprecision(3) << (stats.getBytesRecv() - old_bytes_value_) / 1000000.0 - << std::setfill(separator) << "[MB]"; + << std::setfill(separator); std::stringstream bandwidth; bandwidth << ((stats.getBytesRecv() - old_bytes_value_) * 8) / (exact_duration.count()) / 1000.0 - << std::setfill(separator) << "[Mbps]"; + << std::setfill(separator); std::stringstream window; - window << stats.getAverageWindowSize() << std::setfill(separator) - << "[Int]"; + window << stats.getAverageWindowSize() << std::setfill(separator); std::stringstream avg_rtt; - avg_rtt << stats.getAverageRtt() << std::setfill(separator) << "[ms]"; + avg_rtt << stats.getAverageRtt() << std::setfill(separator); if (configuration_.rtc_) { - // we get rtc stats more often, thus we need ms in the interval - std::stringstream interval_ms; - interval_ms << total_duration_milliseconds_ << "-" - << total_duration_milliseconds_ + exact_duration.count(); - std::stringstream lost_data; lost_data << stats.getLostData() - old_lost_data_value_ - << std::setfill(separator) << "[pkt]"; + << std::setfill(separator); std::stringstream bytes_recovered_data; bytes_recovered_data << stats.getBytesRecoveredData() - old_bytes_recovered_value_ - << std::setfill(separator) << "[pkt]"; + << std::setfill(separator); std::stringstream definitely_lost_data; definitely_lost_data << stats.getDefinitelyLostData() - old_definitely_lost_data_value_ - << std::setfill(separator) << "[pkt]"; + << std::setfill(separator); std::stringstream data_delay; - data_delay << avg_data_delay_ << std::setfill(separator) << "[ms]"; + data_delay << std::fixed << std::setprecision(3) << avg_data_delay_ + << std::setfill(separator); std::stringstream received_data_pkt; - received_data_pkt << received_data_pkt_ << std::setfill(separator) - << "[pkt]"; + received_data_pkt << received_data_pkt_ << std::setfill(separator); std::stringstream goodput; - goodput << (received_bytes_ * 8.0) / (exact_duration.count()) / 1000.0 - << std::setfill(separator) << "[Mbps]"; + goodput << std::fixed << std::setprecision(3) + << (received_bytes_ * 8.0) / (exact_duration.count()) / 1000.0 + << std::setfill(separator); std::stringstream loss_rate; loss_rate << std::fixed << std::setprecision(2) - << stats.getLossRatio() * 100.0 << std::setfill(separator) - << "[%]"; + << stats.getLossRatio() * 100.0 << std::setfill(separator); std::stringstream retx_sent; retx_sent << stats.getRetxCount() - old_retx_value_ - << std::setfill(separator) << "[pkt]"; + << std::setfill(separator); std::stringstream interest_sent; interest_sent << stats.getInterestTx() - old_sent_int_value_ - << std::setfill(separator) << "[pkt]"; + << std::setfill(separator); std::stringstream nacks; nacks << stats.getReceivedNacks() - old_received_nacks_value_ - << std::setfill(separator) << "[pkt]"; + << std::setfill(separator); std::stringstream fec_pkt; fec_pkt << stats.getReceivedFEC() - old_fec_pkt_ - << std::setfill(separator) << "[pkt]"; + << std::setfill(separator); std::stringstream queuing_delay; - queuing_delay << stats.getQueuingDelay() << std::setfill(separator) - << "[ms]"; + queuing_delay << std::fixed << std::setprecision(3) + << stats.getQueuingDelay() << std::setfill(separator); -#ifdef FORWARDER_INTERFACE - if (!done_ && stats.getQueuingDelay() >= switch_threshold_ && - total_duration_milliseconds_ > 1000) { - std::cout << "Switching due to queuing delay" << std::endl; - forwarder_interface_.createFaceAndRoutes(backup_routes_); - forwarder_interface_.deleteFaceAndRoutes(main_routes_); - std::swap(backup_routes_, main_routes_); - done_ = true; + std::stringstream residual_losses; + double rl_perc = stats.getResidualLossRate() * 100; + residual_losses << std::fixed << std::setprecision(2) << rl_perc + << std::setfill(separator); + + std::stringstream quality_score; + quality_score << std::fixed << (int)stats.getQualityScore() + << std::setfill(separator); + + std::stringstream alerts; + alerts << stats.getAlerts() << std::setfill(separator); + + std::stringstream auth_alerts; + auth_alerts << auth_alerts_ << std::setfill(separator); + + if (fwd_connected_ && use_bestpath_ && + ((stats.getAverageRtt() > rtt_threshold_) || + ((stats.getResidualLossRate() * 100) > loss_threshold_))) { + forwarder_interface_.setStrategy(prefix_name_, prefix_len_, "bestpath"); } -#endif - // statistics not yet available in the transport - // std::stringstream interest_fec_tx; - // interest_fec_tx << stats.getInterestFecTxCount() - - // old_fec_interest_tx_value_ << std::setfill(separator) << "[pkt]"; - // std::stringstream bytes_fec_recv; - // bytes_fec_recv << stats.getBytesFecRecv() - old_fec_data_rx_value_ - // << std::setfill(separator) << "[bytes]"; - std::cout << std::left << std::setw(width) << "Interval"; - std::cout << std::left << std::setw(width) << "RecvData"; - std::cout << std::left << std::setw(width) << "Bandwidth"; - std::cout << std::left << std::setw(width) << "Goodput"; - std::cout << std::left << std::setw(width) << "LossRate"; - std::cout << std::left << std::setw(width) << "Retr"; - std::cout << std::left << std::setw(width) << "InterestSent"; - std::cout << std::left << std::setw(width) << "ReceivedNacks"; - std::cout << std::left << std::setw(width) << "SyncWnd"; - std::cout << std::left << std::setw(width) << "MinRtt"; - std::cout << std::left << std::setw(width) << "QueuingDelay"; - std::cout << std::left << std::setw(width) << "LostData"; - std::cout << std::left << std::setw(width) << "RecoveredData"; - std::cout << std::left << std::setw(width) << "DefinitelyLost"; - std::cout << std::left << std::setw(width) << "State"; - std::cout << std::left << std::setw(width) << "DataDelay"; - std::cout << std::left << std::setw(width) << "FecPkt" << std::endl; - - std::cout << std::left << std::setw(width) << interval_ms.str(); - std::cout << std::left << std::setw(width) << received_data_pkt.str(); - std::cout << std::left << std::setw(width) << bandwidth.str(); - std::cout << std::left << std::setw(width) << goodput.str(); - std::cout << std::left << std::setw(width) << loss_rate.str(); - std::cout << std::left << std::setw(width) << retx_sent.str(); - std::cout << std::left << std::setw(width) << interest_sent.str(); - std::cout << std::left << std::setw(width) << nacks.str(); - std::cout << std::left << std::setw(width) << window.str(); - std::cout << std::left << std::setw(width) << avg_rtt.str(); - std::cout << std::left << std::setw(width) << queuing_delay.str(); - std::cout << std::left << std::setw(width) << lost_data.str(); - std::cout << std::left << std::setw(width) << bytes_recovered_data.str(); - std::cout << std::left << std::setw(width) << definitely_lost_data.str(); - std::cout << std::left << std::setw(width) << stats.getCCStatus(); - std::cout << std::left << std::setw(width) << data_delay.str(); - std::cout << std::left << std::setw(width) << fec_pkt.str(); + if ((header_counter_ == 0 && print_headers_) || first_) { + std::cout << std::right << std::setw(width) << "Interval[ms]"; + std::cout << std::right << std::setw(width) << "RecvData[pkt]"; + std::cout << std::right << std::setw(width) << "Bandwidth[Mbps]"; + std::cout << std::right << std::setw(width) << "Goodput[Mbps]"; + std::cout << std::right << std::setw(width) << "LossRate[%]"; + std::cout << std::right << std::setw(width) << "Retr[pkt]"; + std::cout << std::right << std::setw(width) << "InterestSent"; + std::cout << std::right << std::setw(width) << "ReceivedNacks"; + std::cout << std::right << std::setw(width) << "SyncWnd[pkt]"; + std::cout << std::right << std::setw(width) << "MinRtt[ms]"; + std::cout << std::right << std::setw(width) << "QueuingDelay[ms]"; + std::cout << std::right << std::setw(width) << "LostData[pkt]"; + std::cout << std::right << std::setw(width) << "RecoveredData"; + std::cout << std::right << std::setw(width) << "DefinitelyLost"; + std::cout << std::right << std::setw(width) << "State"; + std::cout << std::right << std::setw(width) << "DataDelay[ms]"; + std::cout << std::right << std::setw(width) << "FecPkt"; + std::cout << std::right << std::setw(width) << "Congestion"; + std::cout << std::right << std::setw(width) << "ResidualLosses"; + std::cout << std::right << std::setw(width) << "QualityScore"; + std::cout << std::right << std::setw(width) << "Alerts"; + std::cout << std::right << std::setw(width) << "AuthAlerts" + << std::endl; + + first_ = false; + } + + std::cout << std::right << std::setw(width) << interval_ms.str(); + std::cout << std::right << std::setw(width) << received_data_pkt.str(); + std::cout << std::right << std::setw(width) << bandwidth.str(); + std::cout << std::right << std::setw(width) << goodput.str(); + std::cout << std::right << std::setw(width) << loss_rate.str(); + std::cout << std::right << std::setw(width) << retx_sent.str(); + std::cout << std::right << std::setw(width) << interest_sent.str(); + std::cout << std::right << std::setw(width) << nacks.str(); + std::cout << std::right << std::setw(width) << window.str(); + std::cout << std::right << std::setw(width) << avg_rtt.str(); + std::cout << std::right << std::setw(width) << queuing_delay.str(); + std::cout << std::right << std::setw(width) << lost_data.str(); + std::cout << std::right << std::setw(width) << bytes_recovered_data.str(); + std::cout << std::right << std::setw(width) << definitely_lost_data.str(); + std::cout << std::right << std::setw(width) << stats.getCCStatus(); + std::cout << std::right << std::setw(width) << data_delay.str(); + std::cout << std::right << std::setw(width) << fec_pkt.str(); + std::cout << std::right << std::setw(width) << stats.isCongested(); + std::cout << std::right << std::setw(width) << residual_losses.str(); + std::cout << std::right << std::setw(width) << quality_score.str(); + std::cout << std::right << std::setw(width) << alerts.str(); + std::cout << std::right << std::setw(width) << auth_alerts.str(); std::cout << std::endl; if (configuration_.test_mode_) { if (data_delays_.size() > 0) data_delays_.pop_back(); - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::system_clock::now().time_since_epoch()) - .count(); - std::cout << now << " DATA-DELAYS:[" << data_delays_ << "]" - << std::endl; + auto now = utils::SteadyTime::nowMs(); + std::cout << std::fixed << std::setprecision(0) << now.count() + << " DATA-DELAYS:[" << data_delays_ << "]" << std::endl; } // statistics not yet available in the transport - // std::cout << std::left << std::setw(width) << interest_fec_tx.str(); - // std::cout << std::left << std::setw(width) << bytes_fec_recv.str(); + // std::cout << std::right << std::setw(width) << interest_fec_tx.str(); + // std::cout << std::right << std::setw(width) << bytes_fec_recv.str(); } else { - std::cout << std::left << std::setw(width) << "Interval"; - std::cout << std::left << std::setw(width) << "Transfer"; - std::cout << std::left << std::setw(width) << "Bandwidth"; - std::cout << std::left << std::setw(width) << "Retr"; - std::cout << std::left << std::setw(width) << "Cwnd"; - std::cout << std::left << std::setw(width) << "AvgRtt" << std::endl; - - std::cout << std::left << std::setw(width) << interval.str(); - std::cout << std::left << std::setw(width) << bytes_transferred.str(); - std::cout << std::left << std::setw(width) << bandwidth.str(); - std::cout << std::left << std::setw(width) << stats.getRetxCount(); - std::cout << std::left << std::setw(width) << window.str(); - std::cout << std::left << std::setw(width) << avg_rtt.str() << std::endl; - std::cout << std::endl; + if ((header_counter_ == 0 && print_headers_) || first_) { + std::cout << std::right << std::setw(width) << "Interval[ms]"; + std::cout << std::right << std::setw(width) << "Transfer[MB]"; + std::cout << std::right << std::setw(width) << "Bandwidth[Mbps]"; + std::cout << std::right << std::setw(width) << "Retr[pkt]"; + std::cout << std::right << std::setw(width) << "Cwnd[Int]"; + std::cout << std::right << std::setw(width) << "AvgRtt[ms]" + << std::endl; + + first_ = false; + } + + std::cout << std::right << std::setw(width) << interval_ms.str(); + std::cout << std::right << std::setw(width) << bytes_transferred.str(); + std::cout << std::right << std::setw(width) << bandwidth.str(); + std::cout << std::right << std::setw(width) << stats.getRetxCount(); + std::cout << std::right << std::setw(width) << window.str(); + std::cout << std::right << std::setw(width) << avg_rtt.str() << std::endl; } + total_duration_milliseconds_ += (uint32_t)exact_duration.count(); old_bytes_value_ = stats.getBytesRecv(); old_lost_data_value_ = stats.getLostData(); @@ -289,9 +305,95 @@ class HIperfClient::Impl received_data_pkt_ = 0; data_delays_ = ""; - t_stats_ = utils::SteadyClock::now(); + t_stats_ = utils::SteadyTime::Clock::now(); + + header_counter_ = (header_counter_ + 1) & header_counter_mask_; + + if (--configuration_.nb_iterations_ == 0) { + // We reached the maximum nb of runs. Stop now. + io_service_.stop(); + } } + bool setForwarderConnection(forwarder_type_t forwarder_type) { + using namespace libconfig; + Config cfg; + + const char *conf_file = getenv("FORWARDER_CONFIG"); + if (!conf_file) return false; + + if ((forwarder_type != HICNLIGHT) && (forwarder_type != HICNLIGHT_NG)) + return false; + + try { + cfg.readFile(conf_file); + } catch (const FileIOException &fioex) { + std::cerr << "I/O error while reading file." << std::endl; + return false; + } catch (const ParseException &pex) { + std::cerr << "Parse error at " << pex.getFile() << ":" << pex.getLine() + << " - " << pex.getError() << std::endl; + return false; + } + + Setting &config = cfg.getRoot(); + + /* conf file example + * + * use_bestpath = "ON | OFF" + * rtt_threshold = 200 //ms + * loss_threshold = 20 //% + * name = "b001::/16" + */ + + if (config.exists("use_bestpath")) { + std::string val; + config.lookupValue("use_bestpath", val); + if (val.compare("ON") == 0) use_bestpath_ = true; + } + + if (config.exists("rtt_threshold")) { + unsigned val; + config.lookupValue("rtt_threshold", val); + rtt_threshold_ = val; + } + + if (config.exists("loss_threshold")) { + unsigned val; + config.lookupValue("loss_threshold", val); + loss_threshold_ = val; + } + + if (config.exists("name")) { + std::string route; + config.lookupValue("name", route); + + std::string delimiter = "/"; + size_t pos = 0; + + if ((pos = route.find(delimiter)) != std::string::npos) { + prefix_name_ = route.substr(0, pos); + route.erase(0, pos + delimiter.length()); + prefix_len_ = std::stoul(route.substr(0)); + } + } + + forwarder_interface_.initForwarderInterface(this, forwarder_type); + + return true; + } + + void onHicnServiceReady() override { + std::cout << "Successfully connected to local forwarder!" << std::endl; + fwd_connected_ = true; + } + + void onRouteConfigured( + std::vector<ForwarderInterface::RouteInfoPtr> &route_info) override { + std::cout << "Routes successfully configured!" << std::endl; + } + +#ifdef FORWARDER_INTERFACE bool parseConfig(const char *conf_file) { using namespace libconfig; Config cfg; @@ -439,7 +541,6 @@ class HIperfClient::Impl return true; } -#ifdef FORWARDER_INTERFACE void onHicnServiceReady() override { std::cout << "Successfully connected to local forwarder!" << std::endl; @@ -541,6 +642,13 @@ class HIperfClient::Impl } #endif + transport::auth::VerificationPolicy onAuthFailed( + transport::auth::Suffix suffix, + transport::auth::VerificationPolicy policy) { + auth_alerts_++; + return transport::auth::VerificationPolicy::ACCEPT; + } + int setup() { int ret; @@ -557,6 +665,7 @@ class HIperfClient::Impl producer_socket_ = std::make_unique<ProducerSocket>(production_protocol); producer_socket_->registerPrefix(configuration_.relay_name_); producer_socket_->connect(); + producer_socket_->start(); } if (configuration_.output_stream_mode_ && configuration_.rtc_) { @@ -586,6 +695,17 @@ class HIperfClient::Impl GeneralTransportOptions::INTEREST_LIFETIME, configuration_.interest_lifetime_); + consumer_socket_->setSocketOption( + GeneralTransportOptions::MAX_UNVERIFIED_TIME, + configuration_.unverified_delay_); + + if (consumer_socket_->setSocketOption( + GeneralTransportOptions::PACKET_FORMAT, + configuration_.packet_format_) == SOCKET_OPTION_NOT_SET) { + std::cerr << "ERROR -- Impossible to set the packet format." << std::endl; + return ERROR_SETUP; + } + #if defined(DEBUG) && defined(__linux__) std::shared_ptr<transport::BasePortal> portal; consumer_socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal); @@ -623,20 +743,24 @@ class HIperfClient::Impl } } + std::shared_ptr<Verifier> verifier = std::make_shared<VoidVerifier>(); + if (!configuration_.producer_certificate.empty()) { - std::shared_ptr<Verifier> verifier = std::make_shared<AsymmetricVerifier>( + verifier = std::make_shared<AsymmetricVerifier>( configuration_.producer_certificate); - if (consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER, - verifier) == SOCKET_OPTION_NOT_SET) - return ERROR_SETUP; } if (!configuration_.passphrase.empty()) { - std::shared_ptr<Verifier> verifier = - std::make_shared<SymmetricVerifier>(configuration_.passphrase); - if (consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER, - verifier) == SOCKET_OPTION_NOT_SET) - return ERROR_SETUP; + verifier = std::make_shared<SymmetricVerifier>(configuration_.passphrase); + } + + verifier->setVerificationFailedCallback( + std::bind(&HIperfClient::Impl::onAuthFailed, this, + std::placeholders::_1, std::placeholders::_2)); + + if (consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER, + verifier) == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; } ret = consumer_socket_->setSocketOption( @@ -662,6 +786,65 @@ class HIperfClient::Impl } if (configuration_.rtc_) { + if (configuration_.recovery_strategy_ == 1) { // unreliable + ret = consumer_socket_->setSocketOption( + RtcTransportOptions::RECOVERY_STRATEGY, + (uint32_t)RtcTransportRecoveryStrategies::RECOVERY_OFF); + } else if (configuration_.recovery_strategy_ == 2) { // rtx only + ret = consumer_socket_->setSocketOption( + RtcTransportOptions::RECOVERY_STRATEGY, + (uint32_t)RtcTransportRecoveryStrategies::RTX_ONLY); + } else if (configuration_.recovery_strategy_ == 3) { // fec only + ret = consumer_socket_->setSocketOption( + RtcTransportOptions::RECOVERY_STRATEGY, + (uint32_t)RtcTransportRecoveryStrategies::FEC_ONLY); + } else if (configuration_.recovery_strategy_ == 4) { // delay based + ret = consumer_socket_->setSocketOption( + RtcTransportOptions::RECOVERY_STRATEGY, + (uint32_t)RtcTransportRecoveryStrategies::DELAY_BASED); + } else if (configuration_.recovery_strategy_ == 5) { // low rate flow + ret = consumer_socket_->setSocketOption( + RtcTransportOptions::RECOVERY_STRATEGY, + (uint32_t)RtcTransportRecoveryStrategies::LOW_RATE); + } else if (configuration_.recovery_strategy_ == + 6) { // low rate + bestpath + ret = consumer_socket_->setSocketOption( + RtcTransportOptions::RECOVERY_STRATEGY, + (uint32_t)RtcTransportRecoveryStrategies::LOW_RATE_AND_BESTPATH); + } else if (configuration_.recovery_strategy_ == + 7) { // low rate + replication + ret = consumer_socket_->setSocketOption( + RtcTransportOptions::RECOVERY_STRATEGY, + (uint32_t)RtcTransportRecoveryStrategies::LOW_RATE_AND_REPLICATION); + } else if (configuration_.recovery_strategy_ == + 8) { // low rate + bestpath or replication + ret = consumer_socket_->setSocketOption( + RtcTransportOptions::RECOVERY_STRATEGY, + (uint32_t)RtcTransportRecoveryStrategies:: + LOW_RATE_AND_ALL_FWD_STRATEGIES); + } else { + // default + ret = consumer_socket_->setSocketOption( + RtcTransportOptions::RECOVERY_STRATEGY, + (uint32_t)RtcTransportRecoveryStrategies::RTX_ONLY); + } + + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + } + + if (configuration_.rtc_) { + ret = consumer_socket_->setSocketOption( + RtcTransportOptions::AGGREGATED_DATA, + configuration_.aggregated_data_); + + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + } + + if (configuration_.rtc_) { ret = consumer_socket_->setSocketOption( ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT, (ConsumerContentObjectCallback)std::bind( @@ -673,6 +856,15 @@ class HIperfClient::Impl } if (configuration_.rtc_) { + ret = consumer_socket_->setSocketOption(GeneralTransportOptions::FEC_TYPE, + configuration_.fec_type_); + + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + } + + if (configuration_.rtc_) { std::shared_ptr<TransportStatistics> transport_stats; consumer_socket_->getSocketOption( OtherOptions::STATISTICS, (TransportStatistics **)&transport_stats); @@ -708,10 +900,10 @@ class HIperfClient::Impl signals_.async_wait( [this](const std::error_code &, const int &) { io_service_.stop(); }); - t_download_ = t_stats_ = std::chrono::steady_clock::now(); - consumer_socket_->asyncConsume(configuration_.name); - io_service_.run(); + t_download_ = t_stats_ = utils::SteadyTime::now(); + consumer_socket_->consume(configuration_.name); + io_service_.run(); consumer_socket_->stop(); return ERROR_SUCCESS; @@ -719,11 +911,15 @@ class HIperfClient::Impl private: class RTCCallback : public ConsumerSocket::ReadCallback { - static constexpr std::size_t mtu = 1500; + static constexpr std::size_t mtu = HIPERF_MTU; public: RTCCallback(Impl &hiperf_client) : client_(hiperf_client) { client_.configuration_.receive_buffer = utils::MemBuf::create(mtu); + Packet::Format format = + PayloadSize::getFormatFromName(client_.configuration_.name, false); + payload_size_max_ = + PayloadSize(format).getPayloadSizeMax(RTC_HEADER_SIZE); } bool isBufferMovable() noexcept override { return false; } @@ -743,9 +939,7 @@ class HIperfClient::Impl uint64_t *senderTimeStamp = (uint64_t *)client_.configuration_.receive_buffer->writableData(); - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::system_clock::now().time_since_epoch()) - .count(); + auto now = utils::SystemTime::nowMs().count(); double new_delay = (double)(now - *senderTimeStamp); if (*senderTimeStamp > now) @@ -765,7 +959,7 @@ class HIperfClient::Impl client_.producer_socket_->produceDatagram( client_.configuration_.relay_name_.getName(), client_.configuration_.receive_buffer->writableData(), - length < 1400 ? length : 1400); + length < payload_size_max_ ? length : payload_size_max_); } if (client_.configuration_.output_stream_mode_) { uint8_t *start = @@ -778,7 +972,7 @@ class HIperfClient::Impl size_t maxBufferSize() const override { return mtu; } - void readError(const std::error_code ec) noexcept override { + void readError(const std::error_code &ec) noexcept override { std::cerr << "Error while reading from RTC socket" << std::endl; client_.io_service_.stop(); } @@ -789,6 +983,7 @@ class HIperfClient::Impl private: Impl &client_; + std::size_t payload_size_max_; }; class Callback : public ConsumerSocket::ReadCallback { @@ -816,16 +1011,15 @@ class HIperfClient::Impl return client_.configuration_.receive_buffer_size_; } - void readError(const std::error_code ec) noexcept override { + void readError(const std::error_code &ec) noexcept override { std::cerr << "Error " << ec.message() << " while reading from socket" << std::endl; client_.io_service_.stop(); } void readSuccess(std::size_t total_size) noexcept override { - Time t2 = std::chrono::steady_clock::now(); - TimeDuration dt = - std::chrono::duration_cast<TimeDuration>(t2 - client_.t_download_); + auto t2 = utils::SteadyTime::now(); + auto dt = utils::SteadyTime::getDurationUs(client_.t_download_, t2); long usec = (long)dt.count(); std::cout << "Content retrieved. Size: " << total_size << " [Bytes]" @@ -843,8 +1037,8 @@ class HIperfClient::Impl }; hiperf::ClientConfiguration configuration_; - Time t_stats_; - Time t_download_; + utils::SteadyTime::TimePoint t_stats_; + utils::SteadyTime::TimePoint t_download_; uint32_t total_duration_milliseconds_; uint64_t old_bytes_value_; uint64_t old_interest_tx_value_; @@ -865,6 +1059,7 @@ class HIperfClient::Impl uint32_t received_bytes_; uint32_t received_data_pkt_; + uint32_t auth_alerts_; std::string data_delays_; @@ -878,19 +1073,44 @@ class HIperfClient::Impl asio::ip::udp::endpoint remote_; ForwarderConfiguration config_; - uint16_t switch_threshold_; /* ms */ - bool done_; + // uint16_t switch_threshold_; /* ms */ + bool fwd_connected_; + bool use_bestpath_; + uint32_t rtt_threshold_; /* ms */ + uint32_t loss_threshold_; /* ms */ + std::string prefix_name_; // bestpath route + uint32_t prefix_len_; + // bool done_; + std::vector<ForwarderInterface::RouteInfoPtr> main_routes_; std::vector<ForwarderInterface::RouteInfoPtr> backup_routes_; -#ifdef FORWARDER_INTERFACE + uint16_t header_counter_mask_; + uint16_t header_counter_; + + bool print_headers_; + bool first_; + ForwarderInterface forwarder_interface_; -#endif }; HIperfClient::HIperfClient(const ClientConfiguration &conf) { impl_ = new Impl(conf); } +HIperfClient::HIperfClient(HIperfClient &&other) { + impl_ = other.impl_; + other.impl_ = nullptr; +} + +HIperfClient &HIperfClient::operator=(HIperfClient &&other) { + if (this != &other) { + impl_ = other.impl_; + other.impl_ = nullptr; + } + + return *this; +} + HIperfClient::~HIperfClient() { delete impl_; } int HIperfClient::setup() { return impl_->setup(); } |