diff options
Diffstat (limited to 'apps/hiperf/src/client.cc')
-rw-r--r-- | apps/hiperf/src/client.cc | 1612 |
1 files changed, 672 insertions, 940 deletions
diff --git a/apps/hiperf/src/client.cc b/apps/hiperf/src/client.cc index ba36cd20e..0e1f596c5 100644 --- a/apps/hiperf/src/client.cc +++ b/apps/hiperf/src/client.cc @@ -14,8 +14,7 @@ */ #include <client.h> -#include <forwarder_config.h> -#include <forwarder_interface.h> +#include <hicn/transport/portability/endianess.h> #include <libconfig.h++> @@ -31,1098 +30,831 @@ class Callback; * Hiperf client class: configure and setup an hicn consumer following the * ClientConfiguration. */ -class HIperfClient::Impl : ForwarderInterface::ICallback { +class HIperfClient::Impl { friend class Callback; friend class RTCCallback; - static const constexpr uint16_t log2_header_counter = 4; - - struct nack_packet_t { - uint64_t timestamp; - uint32_t prod_rate; - uint32_t prod_seg; - - inline uint64_t getTimestamp() const { return _ntohll(×tamp); } - inline void setTimestamp(uint64_t time) { timestamp = _htonll(&time); } - - inline uint32_t getProductionRate() const { return ntohl(prod_rate); } - inline void setProductionRate(uint32_t rate) { prod_rate = htonl(rate); } - - inline uint32_t getProductionSegement() const { return ntohl(prod_seg); } - inline void setProductionSegement(uint32_t seg) { prod_seg = htonl(seg); } - }; - - public: - Impl(const hiperf::ClientConfiguration &conf) - : configuration_(conf), - total_duration_milliseconds_(0), - old_bytes_value_(0), - old_interest_tx_value_(0), - old_fec_interest_tx_value_(0), - old_fec_data_rx_value_(0), - old_lost_data_value_(0), - old_bytes_recovered_value_(0), - old_definitely_lost_data_value_(0), - old_retx_value_(0), - old_sent_int_value_(0), - old_received_nacks_value_(0), - old_fec_pkt_(0), - avg_data_delay_(0), - delay_sample_(0), - received_bytes_(0), - received_data_pkt_(0), - auth_alerts_(0), - data_delays_(""), - signals_(io_service_), - rtc_callback_(*this), - callback_(*this), - socket_(io_service_), - // switch_threshold_(~0), - fwd_connected_(false), - use_bestpath_(false), - rtt_threshold_(~0), - loss_threshold_(~0), - prefix_name_(""), - prefix_len_(0), - // done_(false), - header_counter_mask_((1 << log2_header_counter) - 1), - header_counter_(0), - print_headers_(configuration_.print_headers_), - first_(true), - forwarder_interface_(io_service_) { - setForwarderConnection(conf.forwarder_type_); + static inline constexpr uint16_t klog2_header_counter() { return 4; } + static inline constexpr uint16_t kheader_counter_mask() { + return (1 << klog2_header_counter()) - 1; } - virtual ~Impl() {} - - void checkReceivedRtcContent(ConsumerSocket &c, - const ContentObject &contentObject) {} - - void processLeavingInterest(ConsumerSocket &c, const Interest &interest) {} - - void addFace(const std::string &local_address, uint16_t local_port, - const std::string &remote_address, uint16_t remote_port, - std::string interface); - - void handleTimerExpiration(ConsumerSocket &c, - const TransportStatistics &stats) { - const char separator = ' '; - const int width = 18; - - utils::SteadyTime::TimePoint t2 = utils::SteadyTime::Clock::now(); - auto exact_duration = utils::SteadyTime::getDurationMs(t_stats_, t2); - - std::stringstream interval_ms; - interval_ms << total_duration_milliseconds_ << "-" - << total_duration_milliseconds_ + exact_duration.count(); - - std::stringstream bytes_transferred; - bytes_transferred << std::fixed << std::setprecision(3) - << (stats.getBytesRecv() - old_bytes_value_) / 1000000.0 - << std::setfill(separator); - - std::stringstream bandwidth; - bandwidth << ((stats.getBytesRecv() - old_bytes_value_) * 8) / - (exact_duration.count()) / 1000.0 - << std::setfill(separator); - - std::stringstream window; - window << stats.getAverageWindowSize() << std::setfill(separator); + class ConsumerContext + : public Base<ConsumerContext, ClientConfiguration, Impl>, + private ConsumerSocket::ReadCallback { + static inline const std::size_t kmtu = HIPERF_MTU; - std::stringstream avg_rtt; - avg_rtt << stats.getAverageRtt() << std::setfill(separator); - - if (configuration_.rtc_) { - std::stringstream lost_data; - lost_data << stats.getLostData() - old_lost_data_value_ - << std::setfill(separator); - - std::stringstream bytes_recovered_data; - bytes_recovered_data << stats.getBytesRecoveredData() - - old_bytes_recovered_value_ - << std::setfill(separator); - - std::stringstream definitely_lost_data; - definitely_lost_data << stats.getDefinitelyLostData() - - old_definitely_lost_data_value_ - << std::setfill(separator); - - std::stringstream data_delay; - data_delay << std::fixed << std::setprecision(3) << avg_data_delay_ - << std::setfill(separator); - - std::stringstream received_data_pkt; - received_data_pkt << received_data_pkt_ << std::setfill(separator); - - std::stringstream goodput; - goodput << std::fixed << std::setprecision(3) - << (received_bytes_ * 8.0) / (exact_duration.count()) / 1000.0 - << std::setfill(separator); - - std::stringstream loss_rate; - loss_rate << std::fixed << std::setprecision(2) - << stats.getLossRatio() * 100.0 << std::setfill(separator); - - std::stringstream retx_sent; - retx_sent << stats.getRetxCount() - old_retx_value_ - << std::setfill(separator); - - std::stringstream interest_sent; - interest_sent << stats.getInterestTx() - old_sent_int_value_ - << std::setfill(separator); + public: + using ConfType = ClientConfiguration; + using ParentType = typename HIperfClient::Impl; + static inline auto getContextType() -> std::string { + return "ConsumerContext"; + } + + ConsumerContext(Impl &client, int consumer_identifier) + : Base(client, client.io_service_, consumer_identifier), + receive_buffer_( + utils::MemBuf::create(client.config_.receive_buffer_size_)), + socket_(client.io_service_), + payload_size_max_(PayloadSize(client.config_.packet_format_) + .getPayloadSizeMax(RTC_HEADER_SIZE)), + nb_iterations_(client.config_.nb_iterations_) {} + + ConsumerContext(ConsumerContext &&other) noexcept + : Base(std::move(other)), + receive_buffer_(std::move(other.receive_buffer_)), + socket_(std::move(other.socket_)), + payload_size_max_(other.payload_size_max_), + remote_(std::move(other.remote_)), + nb_iterations_(other.nb_iterations_), + saved_stats_(std::move(other.saved_stats_)), + header_counter_(other.header_counter_), + first_(other.first_), + consumer_socket_(std::move(other.consumer_socket_)), + producer_socket_(std::move(other.producer_socket_)) {} + + ~ConsumerContext() override = default; + + /*************************************************************** + * ConsumerSocket::ReadCallback implementation + ***************************************************************/ - std::stringstream nacks; - nacks << stats.getReceivedNacks() - old_received_nacks_value_ - << std::setfill(separator); + bool isBufferMovable() noexcept override { return false; } - std::stringstream fec_pkt; - fec_pkt << stats.getReceivedFEC() - old_fec_pkt_ - << std::setfill(separator); + void getReadBuffer(uint8_t **application_buffer, + size_t *max_length) override { + *application_buffer = receive_buffer_->writableData(); - std::stringstream queuing_delay; - queuing_delay << std::fixed << std::setprecision(3) - << stats.getQueuingDelay() << std::setfill(separator); + if (configuration_.rtc_) { + *max_length = kmtu; + } else { + *max_length = configuration_.receive_buffer_size_; + } + } - std::stringstream residual_losses; - double rl_perc = stats.getResidualLossRate() * 100; - residual_losses << std::fixed << std::setprecision(2) << rl_perc - << std::setfill(separator); + void readBufferAvailable( + std::unique_ptr<utils::MemBuf> &&buffer) noexcept override { + // Nothing to do here + auto ret = std::move(buffer); + } - std::stringstream quality_score; - quality_score << std::fixed << (int)stats.getQualityScore() - << std::setfill(separator); + void readDataAvailable(std::size_t length) noexcept override { + if (configuration_.rtc_) { + saved_stats_.received_bytes_ += length; + saved_stats_.received_data_pkt_++; - std::stringstream alerts; - alerts << stats.getAlerts() << std::setfill(separator); + // collecting delay stats. Just for performance testing + auto senderTimeStamp = + *reinterpret_cast<uint64_t *>(receive_buffer_->writableData()); - std::stringstream auth_alerts; - auth_alerts << auth_alerts_ << std::setfill(separator); + auto now = utils::SystemTime::nowMs().count(); + auto new_delay = double(now - senderTimeStamp); - if (fwd_connected_ && use_bestpath_ && - ((stats.getAverageRtt() > rtt_threshold_) || - ((stats.getResidualLossRate() * 100) > loss_threshold_))) { - forwarder_interface_.setStrategy(prefix_name_, prefix_len_, "bestpath"); - } + if (senderTimeStamp > now) + new_delay = -1 * double(senderTimeStamp - now); - if ((header_counter_ == 0 && print_headers_) || first_) { - std::cout << std::right << std::setw(width) << "Interval[ms]"; - std::cout << std::right << std::setw(width) << "RecvData[pkt]"; - std::cout << std::right << std::setw(width) << "Bandwidth[Mbps]"; - std::cout << std::right << std::setw(width) << "Goodput[Mbps]"; - std::cout << std::right << std::setw(width) << "LossRate[%]"; - std::cout << std::right << std::setw(width) << "Retr[pkt]"; - std::cout << std::right << std::setw(width) << "InterestSent"; - std::cout << std::right << std::setw(width) << "ReceivedNacks"; - std::cout << std::right << std::setw(width) << "SyncWnd[pkt]"; - std::cout << std::right << std::setw(width) << "MinRtt[ms]"; - std::cout << std::right << std::setw(width) << "QueuingDelay[ms]"; - std::cout << std::right << std::setw(width) << "LostData[pkt]"; - std::cout << std::right << std::setw(width) << "RecoveredData"; - std::cout << std::right << std::setw(width) << "DefinitelyLost"; - std::cout << std::right << std::setw(width) << "State"; - std::cout << std::right << std::setw(width) << "DataDelay[ms]"; - std::cout << std::right << std::setw(width) << "FecPkt"; - std::cout << std::right << std::setw(width) << "Congestion"; - std::cout << std::right << std::setw(width) << "ResidualLosses"; - std::cout << std::right << std::setw(width) << "QualityScore"; - std::cout << std::right << std::setw(width) << "Alerts"; - std::cout << std::right << std::setw(width) << "AuthAlerts" - << std::endl; - - first_ = false; - } + saved_stats_.delay_sample_++; + saved_stats_.avg_data_delay_ = + saved_stats_.avg_data_delay_ + + (double(new_delay) - saved_stats_.avg_data_delay_) / + saved_stats_.delay_sample_; - std::cout << std::right << std::setw(width) << interval_ms.str(); - std::cout << std::right << std::setw(width) << received_data_pkt.str(); - std::cout << std::right << std::setw(width) << bandwidth.str(); - std::cout << std::right << std::setw(width) << goodput.str(); - std::cout << std::right << std::setw(width) << loss_rate.str(); - std::cout << std::right << std::setw(width) << retx_sent.str(); - std::cout << std::right << std::setw(width) << interest_sent.str(); - std::cout << std::right << std::setw(width) << nacks.str(); - std::cout << std::right << std::setw(width) << window.str(); - std::cout << std::right << std::setw(width) << avg_rtt.str(); - std::cout << std::right << std::setw(width) << queuing_delay.str(); - std::cout << std::right << std::setw(width) << lost_data.str(); - std::cout << std::right << std::setw(width) << bytes_recovered_data.str(); - std::cout << std::right << std::setw(width) << definitely_lost_data.str(); - std::cout << std::right << std::setw(width) << stats.getCCStatus(); - std::cout << std::right << std::setw(width) << data_delay.str(); - std::cout << std::right << std::setw(width) << fec_pkt.str(); - std::cout << std::right << std::setw(width) << stats.isCongested(); - std::cout << std::right << std::setw(width) << residual_losses.str(); - std::cout << std::right << std::setw(width) << quality_score.str(); - std::cout << std::right << std::setw(width) << alerts.str(); - std::cout << std::right << std::setw(width) << auth_alerts.str(); - std::cout << std::endl; - - if (configuration_.test_mode_) { - if (data_delays_.size() > 0) data_delays_.pop_back(); - - auto now = utils::SteadyTime::nowMs(); - std::cout << std::fixed << std::setprecision(0) << now.count() - << " DATA-DELAYS:[" << data_delays_ << "]" << std::endl; - } + if (configuration_.test_mode_) { + saved_stats_.data_delays_ += std::to_string(int(new_delay)); + saved_stats_.data_delays_ += ","; + } - // statistics not yet available in the transport - // std::cout << std::right << std::setw(width) << interest_fec_tx.str(); - // std::cout << std::right << std::setw(width) << bytes_fec_recv.str(); - } else { - if ((header_counter_ == 0 && print_headers_) || first_) { - std::cout << std::right << std::setw(width) << "Interval[ms]"; - std::cout << std::right << std::setw(width) << "Transfer[MB]"; - std::cout << std::right << std::setw(width) << "Bandwidth[Mbps]"; - std::cout << std::right << std::setw(width) << "Retr[pkt]"; - std::cout << std::right << std::setw(width) << "Cwnd[Int]"; - std::cout << std::right << std::setw(width) << "AvgRtt[ms]" - << std::endl; - - first_ = false; + if (configuration_.relay_ && configuration_.parallel_flows_ == 1) { + producer_socket_->produceDatagram( + configuration_.relay_name_.makeName(), + receive_buffer_->writableData(), + length < payload_size_max_ ? length : payload_size_max_); + } + if (configuration_.output_stream_mode_ && + configuration_.parallel_flows_ == 1) { + const uint8_t *start = receive_buffer_->writableData(); + start += sizeof(uint64_t); + std::size_t pkt_len = length - sizeof(uint64_t); + socket_.send_to(asio::buffer(start, pkt_len), remote_); + } } - - std::cout << std::right << std::setw(width) << interval_ms.str(); - std::cout << std::right << std::setw(width) << bytes_transferred.str(); - std::cout << std::right << std::setw(width) << bandwidth.str(); - std::cout << std::right << std::setw(width) << stats.getRetxCount(); - std::cout << std::right << std::setw(width) << window.str(); - std::cout << std::right << std::setw(width) << avg_rtt.str() << std::endl; - } - - total_duration_milliseconds_ += (uint32_t)exact_duration.count(); - old_bytes_value_ = stats.getBytesRecv(); - old_lost_data_value_ = stats.getLostData(); - old_bytes_recovered_value_ = stats.getBytesRecoveredData(); - old_definitely_lost_data_value_ = stats.getDefinitelyLostData(); - old_fec_interest_tx_value_ = stats.getInterestFecTxCount(); - old_fec_data_rx_value_ = stats.getBytesFecRecv(); - old_retx_value_ = stats.getRetxCount(); - old_sent_int_value_ = stats.getInterestTx(); - old_received_nacks_value_ = stats.getReceivedNacks(); - old_fec_pkt_ = stats.getReceivedFEC(); - delay_sample_ = 0; - avg_data_delay_ = 0; - received_bytes_ = 0; - received_data_pkt_ = 0; - data_delays_ = ""; - - t_stats_ = utils::SteadyTime::Clock::now(); - - header_counter_ = (header_counter_ + 1) & header_counter_mask_; - - if (--configuration_.nb_iterations_ == 0) { - // We reached the maximum nb of runs. Stop now. - io_service_.stop(); - } - } - - bool setForwarderConnection(forwarder_type_t forwarder_type) { - using namespace libconfig; - Config cfg; - - const char *conf_file = getenv("FORWARDER_CONFIG"); - if (!conf_file) return false; - - if ((forwarder_type != HICNLIGHT) && (forwarder_type != HICNLIGHT_NG)) - return false; - - try { - cfg.readFile(conf_file); - } catch (const FileIOException &fioex) { - std::cerr << "I/O error while reading file." << std::endl; - return false; - } catch (const ParseException &pex) { - std::cerr << "Parse error at " << pex.getFile() << ":" << pex.getLine() - << " - " << pex.getError() << std::endl; - return false; } - Setting &config = cfg.getRoot(); - - /* conf file example - * - * use_bestpath = "ON | OFF" - * rtt_threshold = 200 //ms - * loss_threshold = 20 //% - * name = "b001::/16" - */ - - if (config.exists("use_bestpath")) { - std::string val; - config.lookupValue("use_bestpath", val); - if (val.compare("ON") == 0) use_bestpath_ = true; + size_t maxBufferSize() const override { + return configuration_.rtc_ ? kmtu : configuration_.receive_buffer_size_; } - if (config.exists("rtt_threshold")) { - unsigned val; - config.lookupValue("rtt_threshold", val); - rtt_threshold_ = val; + void readError(const std::error_code &ec) noexcept override { + getOutputStream() << "Error " << ec.message() + << " while reading from socket" << std::endl; + parent_.io_service_.stop(); } - if (config.exists("loss_threshold")) { - unsigned val; - config.lookupValue("loss_threshold", val); - loss_threshold_ = val; - } + void readSuccess(std::size_t total_size) noexcept override { + if (configuration_.rtc_) { + getOutputStream() << "Data successfully read" << std::endl; + } else { + auto t2 = utils::SteadyTime::now(); + auto dt = + utils::SteadyTime::getDurationUs(saved_stats_.t_download_, t2); + auto usec = dt.count(); - if (config.exists("name")) { - std::string route; - config.lookupValue("name", route); + getOutputStream() << "Content retrieved. Size: " << total_size + << " [Bytes]" << std::endl; - std::string delimiter = "/"; - size_t pos = 0; + getOutputStream() << "Elapsed Time: " << usec / 1000000.0 + << " seconds -- " + << double(total_size * 8) * 1.0 / double(usec) * 1.0 + << " [Mbps]" << std::endl; - if ((pos = route.find(delimiter)) != std::string::npos) { - prefix_name_ = route.substr(0, pos); - route.erase(0, pos + delimiter.length()); - prefix_len_ = std::stoul(route.substr(0)); + parent_.io_service_.stop(); } } - forwarder_interface_.initForwarderInterface(this, forwarder_type); - - return true; - } + /*************************************************************** + * End of ConsumerSocket::ReadCallback implementation + ***************************************************************/ - void onHicnServiceReady() override { - std::cout << "Successfully connected to local forwarder!" << std::endl; - fwd_connected_ = true; - } + private: + struct SavedStatistics { + utils::SteadyTime::TimePoint t_stats_{}; + utils::SteadyTime::TimePoint t_download_{}; + uint32_t total_duration_milliseconds_{0}; + uint64_t old_bytes_value_{0}; + uint64_t old_interest_tx_value_{0}; + uint64_t old_fec_interest_tx_value_{0}; + uint64_t old_fec_data_rx_value_{0}; + uint64_t old_lost_data_value_{0}; + uint64_t old_bytes_recovered_value_{0}; + uint64_t old_definitely_lost_data_value_{0}; + uint64_t old_retx_value_{0}; + uint64_t old_sent_int_value_{0}; + uint64_t old_received_nacks_value_{0}; + uint32_t old_fec_pkt_{0}; + // IMPORTANT: to be used only for performance testing, when consumer and + // producer are synchronized. Used for rtc only at the moment + double avg_data_delay_{0}; + uint32_t delay_sample_{0}; + uint32_t received_bytes_{0}; + uint32_t received_data_pkt_{0}; + uint32_t auth_alerts_{0}; + std::string data_delays_{""}; + }; + + /*************************************************************** + * Transport callbacks + ***************************************************************/ + + void checkReceivedRtcContent( + [[maybe_unused]] const ConsumerSocket &c, + [[maybe_unused]] const ContentObject &content_object) const { + // Nothing to do here + } + + void processLeavingInterest(const ConsumerSocket & /*c*/, + const Interest & /*interest*/) const { + // Nothing to do here + } + + transport::auth::VerificationPolicy onAuthFailed( + transport::auth::Suffix /*suffix*/, + transport::auth::VerificationPolicy /*policy*/) { + saved_stats_.auth_alerts_++; + return transport::auth::VerificationPolicy::ACCEPT; + } + + void handleTimerExpiration([[maybe_unused]] const ConsumerSocket &c, + const TransportStatistics &stats) { + const char separator = ' '; + const int width = 18; + + utils::SteadyTime::TimePoint t2 = utils::SteadyTime::Clock::now(); + auto exact_duration = + utils::SteadyTime::getDurationMs(saved_stats_.t_stats_, t2); + + std::stringstream interval_ms; + interval_ms << saved_stats_.total_duration_milliseconds_ << "-" + << saved_stats_.total_duration_milliseconds_ + + exact_duration.count(); + + std::stringstream bytes_transferred; + bytes_transferred << std::fixed << std::setprecision(3) + << double(stats.getBytesRecv() - + saved_stats_.old_bytes_value_) / + 1000000.0 + << std::setfill(separator); + + std::stringstream bandwidth; + bandwidth << (double(stats.getBytesRecv() - + saved_stats_.old_bytes_value_) * + 8) / + (exact_duration.count()) / 1000.0 + << std::setfill(separator); - void onRouteConfigured( - std::vector<ForwarderInterface::RouteInfoPtr> &route_info) override { - std::cout << "Routes successfully configured!" << std::endl; - } + std::stringstream window; + window << stats.getAverageWindowSize() << std::setfill(separator); -#ifdef FORWARDER_INTERFACE - bool parseConfig(const char *conf_file) { - using namespace libconfig; - Config cfg; - - try { - cfg.readFile(conf_file); - } catch (const FileIOException &fioex) { - std::cerr << "I/O error while reading file." << std::endl; - return false; - } catch (const ParseException &pex) { - std::cerr << "Parse error at " << pex.getFile() << ":" << pex.getLine() - << " - " << pex.getError() << std::endl; - return false; - } + std::stringstream avg_rtt; + avg_rtt << std::setprecision(3) << std::fixed << stats.getAverageRtt() + << std::setfill(separator); - Setting &config = cfg.getRoot(); + if (configuration_.rtc_) { + std::stringstream lost_data; + lost_data << stats.getLostData() - saved_stats_.old_lost_data_value_ + << std::setfill(separator); + + std::stringstream bytes_recovered_data; + bytes_recovered_data << stats.getBytesRecoveredData() - + saved_stats_.old_bytes_recovered_value_ + << std::setfill(separator); + + std::stringstream definitely_lost_data; + definitely_lost_data << stats.getDefinitelyLostData() - + saved_stats_.old_definitely_lost_data_value_ + << std::setfill(separator); + + std::stringstream data_delay; + data_delay << std::fixed << std::setprecision(3) + << saved_stats_.avg_data_delay_ << std::setfill(separator); + + std::stringstream received_data_pkt; + received_data_pkt << saved_stats_.received_data_pkt_ + << std::setfill(separator); + + std::stringstream goodput; + goodput << std::fixed << std::setprecision(3) + << (saved_stats_.received_bytes_ * 8.0) / + (exact_duration.count()) / 1000.0 + << std::setfill(separator); - if (config.exists("switch_threshold")) { - unsigned threshold; - config.lookupValue("switch_threshold", threshold); - switch_threshold_ = threshold; - } + std::stringstream loss_rate; + loss_rate << std::fixed << std::setprecision(2) + << stats.getLossRatio() * 100.0 << std::setfill(separator); - // listeners - if (config.exists("listeners")) { - // get path where looking for modules - const Setting &listeners = config.lookup("listeners"); - auto count = listeners.getLength(); - - for (int i = 0; i < count; i++) { - const Setting &listener = listeners[i]; - ListenerConfig list; - unsigned port; - std::string interface; - - list.name = listener.getName(); - listener.lookupValue("local_address", list.address); - listener.lookupValue("local_port", port); - listener.lookupValue("interface", list.interface); - list.port = (uint16_t)(port); - - std::cout << "Adding listener " << list.name << ", ( " << list.address - << ":" << list.port << ")" << std::endl; - config_.addListener(std::move(list)); - } - } + std::stringstream retx_sent; + retx_sent << stats.getRetxCount() - saved_stats_.old_retx_value_ + << std::setfill(separator); - // connectors - if (config.exists("connectors")) { - // get path where looking for modules - const Setting &connectors = config.lookup("connectors"); - auto count = connectors.getLength(); + std::stringstream interest_sent; + interest_sent << stats.getInterestTx() - + saved_stats_.old_sent_int_value_ + << std::setfill(separator); - for (int i = 0; i < count; i++) { - const Setting &connector = connectors[i]; - ConnectorConfig conn; + std::stringstream nacks; + nacks << stats.getReceivedNacks() - + saved_stats_.old_received_nacks_value_ + << std::setfill(separator); - conn.name = connector.getName(); - unsigned port = 0; + std::stringstream fec_pkt; + fec_pkt << stats.getReceivedFEC() - saved_stats_.old_fec_pkt_ + << std::setfill(separator); - if (!connector.lookupValue("local_address", conn.local_address)) { - conn.local_address = ""; - } + std::stringstream queuing_delay; + queuing_delay << std::fixed << std::setprecision(3) + << stats.getQueuingDelay() << std::setfill(separator); - if (!connector.lookupValue("local_port", port)) { - port = 0; - } + std::stringstream residual_losses; + double rl_perc = stats.getResidualLossRate() * 100; + residual_losses << std::fixed << std::setprecision(2) << rl_perc + << std::setfill(separator); - conn.local_port = (uint16_t)(port); + std::stringstream quality_score; + quality_score << std::fixed << (int)stats.getQualityScore() + << std::setfill(separator); - if (!connector.lookupValue("remote_address", conn.remote_address)) { - std::cerr - << "Error in configuration file: remote_address is a mandatory " - "field of Connectors." - << std::endl; - return false; + std::stringstream alerts; + alerts << stats.getAlerts() << std::setfill(separator); + + std::stringstream auth_alerts; + auth_alerts << saved_stats_.auth_alerts_ << std::setfill(separator); + + if ((header_counter_ == 0 && configuration_.print_headers_) || first_) { + getOutputStream() << std::right << std::setw(width) << "Interval[ms]"; + getOutputStream() + << std::right << std::setw(width) << "RecvData[pkt]"; + getOutputStream() + << std::right << std::setw(width) << "Bandwidth[Mbps]"; + getOutputStream() + << std::right << std::setw(width) << "Goodput[Mbps]"; + getOutputStream() << std::right << std::setw(width) << "LossRate[%]"; + getOutputStream() << std::right << std::setw(width) << "Retr[pkt]"; + getOutputStream() << std::right << std::setw(width) << "InterestSent"; + getOutputStream() + << std::right << std::setw(width) << "ReceivedNacks"; + getOutputStream() << std::right << std::setw(width) << "SyncWnd[pkt]"; + getOutputStream() << std::right << std::setw(width) << "MinRtt[ms]"; + getOutputStream() + << std::right << std::setw(width) << "QueuingDelay[ms]"; + getOutputStream() + << std::right << std::setw(width) << "LostData[pkt]"; + getOutputStream() + << std::right << std::setw(width) << "RecoveredData"; + getOutputStream() + << std::right << std::setw(width) << "DefinitelyLost"; + getOutputStream() << std::right << std::setw(width) << "State"; + getOutputStream() + << std::right << std::setw(width) << "DataDelay[ms]"; + getOutputStream() << std::right << std::setw(width) << "FecPkt"; + getOutputStream() << std::right << std::setw(width) << "Congestion"; + getOutputStream() + << std::right << std::setw(width) << "ResidualLosses"; + getOutputStream() << std::right << std::setw(width) << "QualityScore"; + getOutputStream() << std::right << std::setw(width) << "Alerts"; + getOutputStream() + << std::right << std::setw(width) << "AuthAlerts" << std::endl; + + first_ = false; } - if (!connector.lookupValue("remote_port", port)) { - std::cerr << "Error in configuration file: remote_port is a " - "mandatory field of Connectors." - << std::endl; - return false; + getOutputStream() << std::right << std::setw(width) + << interval_ms.str(); + getOutputStream() << std::right << std::setw(width) + << received_data_pkt.str(); + getOutputStream() << std::right << std::setw(width) << bandwidth.str(); + getOutputStream() << std::right << std::setw(width) << goodput.str(); + getOutputStream() << std::right << std::setw(width) << loss_rate.str(); + getOutputStream() << std::right << std::setw(width) << retx_sent.str(); + getOutputStream() << std::right << std::setw(width) + << interest_sent.str(); + getOutputStream() << std::right << std::setw(width) << nacks.str(); + getOutputStream() << std::right << std::setw(width) << window.str(); + getOutputStream() << std::right << std::setw(width) << avg_rtt.str(); + getOutputStream() << std::right << std::setw(width) + << queuing_delay.str(); + getOutputStream() << std::right << std::setw(width) << lost_data.str(); + getOutputStream() << std::right << std::setw(width) + << bytes_recovered_data.str(); + getOutputStream() << std::right << std::setw(width) + << definitely_lost_data.str(); + getOutputStream() << std::right << std::setw(width) + << stats.getCCStatus(); + getOutputStream() << std::right << std::setw(width) << data_delay.str(); + getOutputStream() << std::right << std::setw(width) << fec_pkt.str(); + getOutputStream() << std::right << std::setw(width) + << stats.isCongested(); + getOutputStream() << std::right << std::setw(width) + << residual_losses.str(); + getOutputStream() << std::right << std::setw(width) + << quality_score.str(); + getOutputStream() << std::right << std::setw(width) << alerts.str(); + getOutputStream() << std::right << std::setw(width) << auth_alerts.str() + << std::endl; + + if (configuration_.test_mode_) { + if (saved_stats_.data_delays_.size() > 0) + saved_stats_.data_delays_.pop_back(); + + auto now = utils::SteadyTime::nowMs(); + getOutputStream() << std::fixed << std::setprecision(0) << now.count() + << " DATA-DELAYS:[" << saved_stats_.data_delays_ + << "]" << std::endl; } - - if (!connector.lookupValue("interface", conn.interface)) { - std::cerr << "Error in configuration file: interface is a " - "mandatory field of Connectors." - << std::endl; - return false; + } else { + if ((header_counter_ == 0 && configuration_.print_headers_) || first_) { + getOutputStream() << std::right << std::setw(width) << "Interval[ms]"; + getOutputStream() << std::right << std::setw(width) << "Transfer[MB]"; + getOutputStream() + << std::right << std::setw(width) << "Bandwidth[Mbps]"; + getOutputStream() << std::right << std::setw(width) << "Retr[pkt]"; + getOutputStream() << std::right << std::setw(width) << "Cwnd[Int]"; + getOutputStream() + << std::right << std::setw(width) << "AvgRtt[ms]" << std::endl; + + first_ = false; } - conn.remote_port = (uint16_t)(port); - - std::cout << "Adding connector " << conn.name << ", (" - << conn.local_address << ":" << conn.local_port << " " - << conn.remote_address << ":" << conn.remote_port << ")" - << std::endl; - config_.addConnector(conn.name, std::move(conn)); + getOutputStream() << std::right << std::setw(width) + << interval_ms.str(); + getOutputStream() << std::right << std::setw(width) + << bytes_transferred.str(); + getOutputStream() << std::right << std::setw(width) << bandwidth.str(); + getOutputStream() << std::right << std::setw(width) + << stats.getRetxCount(); + getOutputStream() << std::right << std::setw(width) << window.str(); + getOutputStream() << std::right << std::setw(width) << avg_rtt.str() + << std::endl; } - } - // Routes - if (config.exists("routes")) { - const Setting &routes = config.lookup("routes"); - auto count = routes.getLength(); - - for (int i = 0; i < count; i++) { - const Setting &route = routes[i]; - RouteConfig r; - unsigned weight; - - r.name = route.getName(); - route.lookupValue("prefix", r.prefix); - route.lookupValue("weight", weight); - route.lookupValue("main_connector", r.main_connector); - route.lookupValue("backup_connector", r.backup_connector); - r.weight = (uint16_t)(weight); - - std::cout << "Adding route " << r.name << " " << r.prefix << " (" - << r.main_connector << " " << r.backup_connector << " " - << r.weight << ")" << std::endl; - config_.addRoute(std::move(r)); + saved_stats_.total_duration_milliseconds_ += + (uint32_t)exact_duration.count(); + saved_stats_.old_bytes_value_ = stats.getBytesRecv(); + saved_stats_.old_lost_data_value_ = stats.getLostData(); + saved_stats_.old_bytes_recovered_value_ = stats.getBytesRecoveredData(); + saved_stats_.old_definitely_lost_data_value_ = + stats.getDefinitelyLostData(); + saved_stats_.old_fec_interest_tx_value_ = stats.getInterestFecTxCount(); + saved_stats_.old_fec_data_rx_value_ = stats.getBytesFecRecv(); + saved_stats_.old_retx_value_ = stats.getRetxCount(); + saved_stats_.old_sent_int_value_ = stats.getInterestTx(); + saved_stats_.old_received_nacks_value_ = stats.getReceivedNacks(); + saved_stats_.old_fec_pkt_ = stats.getReceivedFEC(); + saved_stats_.delay_sample_ = 0; + saved_stats_.avg_data_delay_ = 0; + saved_stats_.received_bytes_ = 0; + saved_stats_.received_data_pkt_ = 0; + saved_stats_.data_delays_ = ""; + saved_stats_.t_stats_ = utils::SteadyTime::Clock::now(); + + header_counter_ = (header_counter_ + 1) & kheader_counter_mask(); + + if (--nb_iterations_ == 0) { + // We reached the maximum nb of runs. Stop now. + parent_.io_service_.stop(); } } - std::cout << "Ok" << std::endl; + /*************************************************************** + * Setup functions + ***************************************************************/ - return true; - } + int setupRTCSocket() { + int ret = ERROR_SUCCESS; - bool splitRoute(std::string route, std::string &prefix, - uint8_t &prefix_length) { - std::string delimiter = "/"; - - size_t pos = 0; - if ((pos = route.find(delimiter)) != std::string::npos) { - prefix = route.substr(0, pos); - route.erase(0, pos + delimiter.length()); - } else { - return false; - } + configuration_.transport_protocol_ = RTC; - prefix_length = std::stoul(route.substr(0)); - return true; - } + if (configuration_.relay_ && configuration_.parallel_flows_ == 1) { + int production_protocol = ProductionProtocolAlgorithms::RTC_PROD; + producer_socket_ = + std::make_unique<ProducerSocket>(production_protocol); + producer_socket_->registerPrefix(configuration_.relay_name_); + producer_socket_->connect(); + producer_socket_->start(); + } - void onHicnServiceReady() override { - std::cout << "Successfully connected to local forwarder!" << std::endl; + if (configuration_.output_stream_mode_ && + configuration_.parallel_flows_ == 1) { + remote_ = asio::ip::udp::endpoint( + asio::ip::address::from_string("127.0.0.1"), configuration_.port_); + socket_.open(asio::ip::udp::v4()); + } - std::cout << "Setting up listeners" << std::endl; - const char *config = getenv("FORWARDER_CONFIG"); + consumer_socket_ = + std::make_unique<ConsumerSocket>(configuration_.transport_protocol_); - if (config) { - if (!parseConfig(config)) { - return; + RtcTransportRecoveryStrategies recovery_strategy = + RtcTransportRecoveryStrategies::RTX_ONLY; + switch (configuration_.recovery_strategy_) { + case 1: + recovery_strategy = RtcTransportRecoveryStrategies::RECOVERY_OFF; + break; + case 2: + recovery_strategy = RtcTransportRecoveryStrategies::RTX_ONLY; + break; + case 3: + recovery_strategy = RtcTransportRecoveryStrategies::FEC_ONLY; + break; + case 4: + recovery_strategy = RtcTransportRecoveryStrategies::DELAY_BASED; + break; + case 5: + recovery_strategy = RtcTransportRecoveryStrategies::LOW_RATE; + break; + case 6: + recovery_strategy = + RtcTransportRecoveryStrategies::LOW_RATE_AND_BESTPATH; + break; + case 7: + recovery_strategy = + RtcTransportRecoveryStrategies::LOW_RATE_AND_REPLICATION; + break; + case 8: + recovery_strategy = + RtcTransportRecoveryStrategies::LOW_RATE_AND_ALL_FWD_STRATEGIES; + break; + case 9: + recovery_strategy = + RtcTransportRecoveryStrategies::FEC_ONLY_LOW_RES_LOSSES; + break; + case 10: + recovery_strategy = + RtcTransportRecoveryStrategies::DELAY_AND_BESTPATH; + break; + case 11: + recovery_strategy = + RtcTransportRecoveryStrategies::DELAY_AND_REPLICATION; + break; + default: + break; } - // Create faces and route using first face in the list. - auto &routes = config_.getRoutes(); - auto &connectors = config_.getConnectors(); + ret = consumer_socket_->setSocketOption( + RtcTransportOptions::RECOVERY_STRATEGY, + static_cast<uint32_t>(recovery_strategy)); - if (routes.size() == 0 || connectors.size() == 0) { - std::cerr << "Nothing to configure" << std::endl; - return; + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; } - for (auto &route : routes) { - auto the_connector_it = connectors.find(route.main_connector); - if (the_connector_it == connectors.end()) { - std::cerr << "No valid main connector found for route " << route.name - << std::endl; - continue; - } - - auto &the_connector = the_connector_it->second; - auto route_info = std::make_shared<ForwarderInterface::RouteInfo>(); - route_info->family = AF_INET; - route_info->local_addr = the_connector.local_address; - route_info->local_port = the_connector.local_port; - route_info->remote_addr = the_connector.remote_address; - route_info->remote_port = the_connector.remote_port; - route_info->interface = the_connector.interface; - route_info->name = the_connector.name; - - std::string prefix; - uint8_t prefix_length; - auto ret = splitRoute(route.prefix, prefix, prefix_length); - - if (!ret) { - std::cerr << "Error parsing route" << std::endl; - return; - } + ret = consumer_socket_->setSocketOption( + RtcTransportOptions::AGGREGATED_DATA, + configuration_.aggregated_data_); - route_info->route_addr = prefix; - route_info->route_len = prefix_length; - - main_routes_.emplace_back(route_info); - - if (!route.backup_connector.empty()) { - // Add also the backup route - auto the_backup_connector_it = - connectors.find(route.backup_connector); - if (the_backup_connector_it == connectors.end()) { - std::cerr << "No valid backup connector found for route " - << route.name << std::endl; - continue; - } - - auto &the_backup_connector = the_backup_connector_it->second; - auto backup_route_info = - std::make_shared<ForwarderInterface::RouteInfo>(); - backup_route_info->family = AF_INET; - backup_route_info->local_addr = the_backup_connector.local_address; - backup_route_info->local_port = the_backup_connector.local_port; - backup_route_info->remote_addr = the_backup_connector.remote_address; - backup_route_info->remote_port = the_backup_connector.remote_port; - backup_route_info->interface = the_backup_connector.interface; - backup_route_info->name = the_backup_connector.name; - - std::string prefix; - uint8_t prefix_length; - auto ret = splitRoute(route.prefix, prefix, prefix_length); - - if (!ret) { - std::cerr << "Error parsing route" << std::endl; - return; - } - - backup_route_info->route_addr = prefix; - backup_route_info->route_len = prefix_length; - - backup_routes_.emplace_back(backup_route_info); - } + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; } - // Create main routes - std::cout << "Creating main routes" << std::endl; - forwarder_interface_.createFaceAndRoutes(main_routes_); - } - } + ret = consumer_socket_->setSocketOption( + RtcTransportOptions::CONTENT_SHARING_MODE, + configuration_.content_sharing_mode_); - void onRouteConfigured( - std::vector<ForwarderInterface::RouteInfoPtr> &route_info) override { - std::cout << "Routes successfully configured!" << std::endl; - } -#endif + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } - transport::auth::VerificationPolicy onAuthFailed( - transport::auth::Suffix suffix, - transport::auth::VerificationPolicy policy) { - auth_alerts_++; - return transport::auth::VerificationPolicy::ACCEPT; - } + ret = consumer_socket_->setSocketOption( + ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT, + (ConsumerContentObjectCallback)std::bind( + &Impl::ConsumerContext::checkReceivedRtcContent, this, + std::placeholders::_1, std::placeholders::_2)); + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } - int setup() { - int ret; + std::shared_ptr<TransportStatistics> transport_stats; + ret = consumer_socket_->getSocketOption( + OtherOptions::STATISTICS, (TransportStatistics **)&transport_stats); + transport_stats->setAlpha(0.0); - if (configuration_.rtc_) { - configuration_.transport_protocol_ = RTC; - } else if (configuration_.window < 0) { - configuration_.transport_protocol_ = RAAQM; - } else { - configuration_.transport_protocol_ = CBR; - } + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } - if (configuration_.relay_ && configuration_.rtc_) { - int production_protocol = ProductionProtocolAlgorithms::RTC_PROD; - producer_socket_ = std::make_unique<ProducerSocket>(production_protocol); - producer_socket_->registerPrefix(configuration_.relay_name_); - producer_socket_->connect(); - producer_socket_->start(); + return ERROR_SUCCESS; } - if (configuration_.output_stream_mode_ && configuration_.rtc_) { - remote_ = asio::ip::udp::endpoint( - asio::ip::address::from_string("127.0.0.1"), configuration_.port_); - socket_.open(asio::ip::udp::v4()); - } + int setupRAAQMSocket() { + int ret = ERROR_SUCCESS; + + configuration_.transport_protocol_ = RAAQM; - if (configuration_.secure_) { - consumer_socket_ = std::make_unique<P2PSecureConsumerSocket>( - RAAQM, configuration_.transport_protocol_); - if (configuration_.producer_prefix_.getPrefixLength() == 0) { - std::cerr << "ERROR -- Missing producer prefix on which perform the " - "handshake." - << std::endl; - } else { - P2PSecureConsumerSocket &secure_consumer_socket = - *(static_cast<P2PSecureConsumerSocket *>(consumer_socket_.get())); - secure_consumer_socket.registerPrefix(configuration_.producer_prefix_); - } - } else { consumer_socket_ = std::make_unique<ConsumerSocket>(configuration_.transport_protocol_); - } - consumer_socket_->setSocketOption( - GeneralTransportOptions::INTEREST_LIFETIME, - configuration_.interest_lifetime_); - - consumer_socket_->setSocketOption( - GeneralTransportOptions::UNVERIFIED_INTERVAL, - configuration_.unverified_interval_); - - consumer_socket_->setSocketOption(GeneralTransportOptions::UNVERIFIED_RATIO, - configuration_.unverified_ratio_); - - if (consumer_socket_->setSocketOption( - GeneralTransportOptions::PACKET_FORMAT, - configuration_.packet_format_) == SOCKET_OPTION_NOT_SET) { - std::cerr << "ERROR -- Impossible to set the packet format." << std::endl; - return ERROR_SETUP; - } - -#if defined(DEBUG) && defined(__linux__) - std::shared_ptr<transport::BasePortal> portal; - consumer_socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal); - signals_ = - std::make_unique<asio::signal_set>(portal->getIoService(), SIGUSR1); - signals_->async_wait([this](const std::error_code &, const int &) { - std::cout << "Signal SIGUSR1!" << std::endl; - mtrace(); - }); - - ret = consumer_socket_->setSocketOption( - ConsumerCallbacksOptions::FWD_STRATEGY_CHANGE, - [this](notification::Strategy strategy) { - std::cout << "Forwarder strategy callback" << std::endl; - }); - if (ret == SOCKET_OPTION_NOT_SET) return ERROR_SETUP; - - ret = consumer_socket_->setSocketOption( - ConsumerCallbacksOptions::REC_STRATEGY_CHANGE, - [this](notification::Strategy strategy) { - std::cout << "Recovery strategy callback" << std::endl; - }); - if (ret == SOCKET_OPTION_NOT_SET) return ERROR_SETUP; -#endif - - if (consumer_socket_->setSocketOption(CURRENT_WINDOW_SIZE, - configuration_.window) == - SOCKET_OPTION_NOT_SET) { - std::cerr << "ERROR -- Impossible to set the size of the window." - << std::endl; - return ERROR_SETUP; - } - - if (configuration_.transport_protocol_ == RAAQM && - configuration_.beta != -1.f) { - if (consumer_socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE, - configuration_.beta) == - SOCKET_OPTION_NOT_SET) { - return ERROR_SETUP; + if (configuration_.beta_ != -1.f) { + ret = consumer_socket_->setSocketOption( + RaaqmTransportOptions::BETA_VALUE, configuration_.beta_); + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } } - } - if (configuration_.transport_protocol_ == RAAQM && - configuration_.drop_factor != -1.f) { - if (consumer_socket_->setSocketOption(RaaqmTransportOptions::DROP_FACTOR, - configuration_.drop_factor) == - SOCKET_OPTION_NOT_SET) { - return ERROR_SETUP; + if (configuration_.drop_factor_ != -1.f) { + ret = consumer_socket_->setSocketOption( + RaaqmTransportOptions::DROP_FACTOR, configuration_.drop_factor_); + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } } - } - - std::shared_ptr<Verifier> verifier = std::make_shared<VoidVerifier>(); - if (!configuration_.producer_certificate.empty()) { - verifier = std::make_shared<AsymmetricVerifier>( - configuration_.producer_certificate); + return ERROR_SUCCESS; } - if (!configuration_.passphrase.empty()) { - verifier = std::make_shared<SymmetricVerifier>(configuration_.passphrase); - } + int setupCBRSocket() { + configuration_.transport_protocol_ = CBR; - verifier->setVerificationFailedCallback( - std::bind(&HIperfClient::Impl::onAuthFailed, this, - std::placeholders::_1, std::placeholders::_2)); + consumer_socket_ = + std::make_unique<ConsumerSocket>(configuration_.transport_protocol_); - if (consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER, - verifier) == SOCKET_OPTION_NOT_SET) { - return ERROR_SETUP; + return ERROR_SUCCESS; } - ret = consumer_socket_->setSocketOption( - ConsumerCallbacksOptions::INTEREST_OUTPUT, - (ConsumerInterestCallback)std::bind(&Impl::processLeavingInterest, this, - std::placeholders::_1, - std::placeholders::_2)); + public: + int setup() { + int ret; + std::shared_ptr<Verifier> verifier = std::make_shared<VoidVerifier>(); + + if (configuration_.rtc_) { + ret = setupRTCSocket(); + } else if (configuration_.window_ < 0) { + ret = setupRAAQMSocket(); + } else { + ret = setupCBRSocket(); + } - if (ret == SOCKET_OPTION_NOT_SET) { - return ERROR_SETUP; - } + if (ret != ERROR_SUCCESS) { + return ret; + } - if (!configuration_.rtc_) { ret = consumer_socket_->setSocketOption( - ConsumerCallbacksOptions::READ_CALLBACK, &callback_); - } else { - ret = consumer_socket_->setSocketOption( - ConsumerCallbacksOptions::READ_CALLBACK, &rtc_callback_); - } - - if (ret == SOCKET_OPTION_NOT_SET) { - return ERROR_SETUP; - } - - if (configuration_.rtc_) { - if (configuration_.recovery_strategy_ == 1) { // unreliable - ret = consumer_socket_->setSocketOption( - RtcTransportOptions::RECOVERY_STRATEGY, - (uint32_t)RtcTransportRecoveryStrategies::RECOVERY_OFF); - } else if (configuration_.recovery_strategy_ == 2) { // rtx only - ret = consumer_socket_->setSocketOption( - RtcTransportOptions::RECOVERY_STRATEGY, - (uint32_t)RtcTransportRecoveryStrategies::RTX_ONLY); - } else if (configuration_.recovery_strategy_ == 3) { // fec only - ret = consumer_socket_->setSocketOption( - RtcTransportOptions::RECOVERY_STRATEGY, - (uint32_t)RtcTransportRecoveryStrategies::FEC_ONLY); - } else if (configuration_.recovery_strategy_ == 4) { // delay based - ret = consumer_socket_->setSocketOption( - RtcTransportOptions::RECOVERY_STRATEGY, - (uint32_t)RtcTransportRecoveryStrategies::DELAY_BASED); - } else if (configuration_.recovery_strategy_ == 5) { // low rate flow - ret = consumer_socket_->setSocketOption( - RtcTransportOptions::RECOVERY_STRATEGY, - (uint32_t)RtcTransportRecoveryStrategies::LOW_RATE); - } else if (configuration_.recovery_strategy_ == - 6) { // low rate + bestpath - ret = consumer_socket_->setSocketOption( - RtcTransportOptions::RECOVERY_STRATEGY, - (uint32_t)RtcTransportRecoveryStrategies::LOW_RATE_AND_BESTPATH); - } else if (configuration_.recovery_strategy_ == - 7) { // low rate + replication - ret = consumer_socket_->setSocketOption( - RtcTransportOptions::RECOVERY_STRATEGY, - (uint32_t)RtcTransportRecoveryStrategies::LOW_RATE_AND_REPLICATION); - } else if (configuration_.recovery_strategy_ == - 8) { // low rate + bestpath or replication - ret = consumer_socket_->setSocketOption( - RtcTransportOptions::RECOVERY_STRATEGY, - (uint32_t)RtcTransportRecoveryStrategies:: - LOW_RATE_AND_ALL_FWD_STRATEGIES); - } else { - // default - ret = consumer_socket_->setSocketOption( - RtcTransportOptions::RECOVERY_STRATEGY, - (uint32_t)RtcTransportRecoveryStrategies::RTX_ONLY); + GeneralTransportOptions::INTEREST_LIFETIME, + configuration_.interest_lifetime_); + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; } + ret = consumer_socket_->setSocketOption( + GeneralTransportOptions::MANIFEST_FACTOR_RELEVANT, + configuration_.manifest_factor_relevant_); if (ret == SOCKET_OPTION_NOT_SET) { return ERROR_SETUP; } - } - if (configuration_.rtc_) { ret = consumer_socket_->setSocketOption( - RtcTransportOptions::AGGREGATED_DATA, - configuration_.aggregated_data_); + GeneralTransportOptions::MANIFEST_FACTOR_ALERT, + configuration_.manifest_factor_alert_); if (ret == SOCKET_OPTION_NOT_SET) { return ERROR_SETUP; } - } - if (configuration_.rtc_) { ret = consumer_socket_->setSocketOption( - ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT, - (ConsumerContentObjectCallback)std::bind( - &Impl::checkReceivedRtcContent, this, std::placeholders::_1, - std::placeholders::_2)); + GeneralTransportOptions::PACKET_FORMAT, + configuration_.packet_format_); if (ret == SOCKET_OPTION_NOT_SET) { + getOutputStream() << "ERROR -- Impossible to set the packet format." + << std::endl; return ERROR_SETUP; } - } - - if (configuration_.rtc_) { - std::shared_ptr<TransportStatistics> transport_stats; - consumer_socket_->getSocketOption( - OtherOptions::STATISTICS, (TransportStatistics **)&transport_stats); - transport_stats->setAlpha(0.0); - } - - ret = consumer_socket_->setSocketOption( - ConsumerCallbacksOptions::STATS_SUMMARY, - (ConsumerTimerCallback)std::bind(&Impl::handleTimerExpiration, this, - std::placeholders::_1, - std::placeholders::_2)); - - if (ret == SOCKET_OPTION_NOT_SET) { - return ERROR_SETUP; - } - - if (consumer_socket_->setSocketOption( - GeneralTransportOptions::STATS_INTERVAL, - configuration_.report_interval_milliseconds_) == - SOCKET_OPTION_NOT_SET) { - return ERROR_SETUP; - } - - consumer_socket_->connect(); - return ERROR_SUCCESS; - } + ret = consumer_socket_->setSocketOption( + ConsumerCallbacksOptions::FWD_STRATEGY_CHANGE, + (StrategyCallback)[]( + [[maybe_unused]] notification::Strategy strategy){ + // nothing to do + }); + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } - int run() { - std::cout << "Starting download of " << configuration_.name << std::endl; + ret = consumer_socket_->setSocketOption( + ConsumerCallbacksOptions::REC_STRATEGY_CHANGE, + (StrategyCallback)[]( + [[maybe_unused]] notification::Strategy strategy){ + // nothing to do + }); + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } - signals_.add(SIGINT); - signals_.async_wait( - [this](const std::error_code &, const int &) { io_service_.stop(); }); + ret = consumer_socket_->setSocketOption(CURRENT_WINDOW_SIZE, + configuration_.window_); + if (ret == SOCKET_OPTION_NOT_SET) { + getOutputStream() + << "ERROR -- Impossible to set the size of the window." + << std::endl; + return ERROR_SETUP; + } - t_download_ = t_stats_ = utils::SteadyTime::now(); - consumer_socket_->consume(configuration_.name); + if (!configuration_.producer_certificate_.empty()) { + verifier = std::make_shared<AsymmetricVerifier>( + configuration_.producer_certificate_); + } - io_service_.run(); - consumer_socket_->stop(); + if (!configuration_.passphrase_.empty()) { + verifier = + std::make_shared<SymmetricVerifier>(configuration_.passphrase_); + } - return ERROR_SUCCESS; - } + verifier->setVerificationFailedCallback( + std::bind(&HIperfClient::Impl::ConsumerContext::onAuthFailed, this, + std::placeholders::_1, std::placeholders::_2)); - private: - class RTCCallback : public ConsumerSocket::ReadCallback { - static constexpr std::size_t mtu = HIPERF_MTU; + ret = consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER, + verifier); + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } - public: - RTCCallback(Impl &hiperf_client) : client_(hiperf_client) { - client_.configuration_.receive_buffer = utils::MemBuf::create(mtu); - Packet::Format format = - PayloadSize::getFormatFromName(client_.configuration_.name, false); - payload_size_max_ = - PayloadSize(format).getPayloadSizeMax(RTC_HEADER_SIZE); - } + // Signer for aggregatd interests + std::shared_ptr<Signer> signer = std::make_shared<VoidSigner>(); + if (!configuration_.aggr_interest_passphrase_.empty()) { + signer = std::make_shared<SymmetricSigner>( + CryptoSuite::HMAC_SHA256, configuration_.aggr_interest_passphrase_); + } + ret = consumer_socket_->setSocketOption(GeneralTransportOptions::SIGNER, + signer); + if (ret == SOCKET_OPTION_NOT_SET) return ERROR_SETUP; - bool isBufferMovable() noexcept override { return false; } + if (configuration_.aggregated_interests_) { + ret = consumer_socket_->setSocketOption( + RtcTransportOptions::AGGREGATED_INTERESTS, true); - void getReadBuffer(uint8_t **application_buffer, - size_t *max_length) override { - *application_buffer = - client_.configuration_.receive_buffer->writableData(); - *max_length = mtu; - } + if (ret == SOCKET_OPTION_NOT_SET) return ERROR_SETUP; + } - void readDataAvailable(std::size_t length) noexcept override { - client_.received_bytes_ += length; - client_.received_data_pkt_++; + ret = consumer_socket_->setSocketOption( + ConsumerCallbacksOptions::INTEREST_OUTPUT, + (ConsumerInterestCallback)std::bind( + &ConsumerContext::processLeavingInterest, this, + std::placeholders::_1, std::placeholders::_2)); - // collecting delay stats. Just for performance testing - uint64_t *senderTimeStamp = - (uint64_t *)client_.configuration_.receive_buffer->writableData(); + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } - auto now = utils::SystemTime::nowMs().count(); - double new_delay = (double)(now - *senderTimeStamp); + ret = consumer_socket_->setSocketOption( + ConsumerCallbacksOptions::READ_CALLBACK, this); - if (*senderTimeStamp > now) - new_delay = -1 * (double)(*senderTimeStamp - now); + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } - client_.delay_sample_++; - client_.avg_data_delay_ = - client_.avg_data_delay_ + - (new_delay - client_.avg_data_delay_) / client_.delay_sample_; + ret = consumer_socket_->setSocketOption( + ConsumerCallbacksOptions::STATS_SUMMARY, + (ConsumerTimerCallback)std::bind( + &Impl::ConsumerContext::handleTimerExpiration, this, + std::placeholders::_1, std::placeholders::_2)); - if (client_.configuration_.test_mode_) { - client_.data_delays_ += std::to_string(int(new_delay)); - client_.data_delays_ += ","; + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; } - if (client_.configuration_.relay_) { - client_.producer_socket_->produceDatagram( - client_.configuration_.relay_name_.getName(), - client_.configuration_.receive_buffer->writableData(), - length < payload_size_max_ ? length : payload_size_max_); - } - if (client_.configuration_.output_stream_mode_) { - uint8_t *start = - (uint8_t *)client_.configuration_.receive_buffer->writableData(); - start += sizeof(uint64_t); - std::size_t pkt_len = length - sizeof(uint64_t); - client_.socket_.send_to(asio::buffer(start, pkt_len), client_.remote_); + if (consumer_socket_->setSocketOption( + GeneralTransportOptions::STATS_INTERVAL, + configuration_.report_interval_milliseconds_) == + SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; } - } - size_t maxBufferSize() const override { return mtu; } + consumer_socket_->connect(); - void readError(const std::error_code &ec) noexcept override { - std::cerr << "Error while reading from RTC socket" << std::endl; - client_.io_service_.stop(); + return ERROR_SUCCESS; } - void readSuccess(std::size_t total_size) noexcept override { - std::cout << "Data successfully read" << std::endl; + /*************************************************************** + * Run functions + ***************************************************************/ + + int run() { + getOutputStream() << "Starting download of " << flow_name_ << std::endl; + + saved_stats_.t_download_ = saved_stats_.t_stats_ = + utils::SteadyTime::now(); + consumer_socket_->consume(flow_name_); + + return ERROR_SUCCESS; } - private: - Impl &client_; + // Members initialized by the constructor + std::shared_ptr<utils::MemBuf> receive_buffer_; + asio::ip::udp::socket socket_; std::size_t payload_size_max_; + asio::ip::udp::endpoint remote_; + std::uint32_t nb_iterations_; + + // Members initialized by in-class initializer + SavedStatistics saved_stats_{}; + uint16_t header_counter_{0}; + bool first_{true}; + std::unique_ptr<ConsumerSocket> consumer_socket_; + std::unique_ptr<ProducerSocket> producer_socket_; }; - class Callback : public ConsumerSocket::ReadCallback { - public: - Callback(Impl &hiperf_client) : client_(hiperf_client) { - client_.configuration_.receive_buffer = - utils::MemBuf::create(client_.configuration_.receive_buffer_size_); - } + public: + explicit Impl(const hiperf::ClientConfiguration &conf) + : config_(conf), signals_(io_service_) {} - bool isBufferMovable() noexcept override { return false; } + virtual ~Impl() = default; - void getReadBuffer(uint8_t **application_buffer, - size_t *max_length) override { - *application_buffer = - client_.configuration_.receive_buffer->writableData(); - *max_length = client_.configuration_.receive_buffer_size_; + int setup() { + int ret = ensureFlows(config_.name_, config_.parallel_flows_); + if (ret != ERROR_SUCCESS) { + return ret; } - void readDataAvailable(std::size_t length) noexcept override {} - - void readBufferAvailable( - std::unique_ptr<utils::MemBuf> &&buffer) noexcept override {} + consumer_contexts_.reserve(config_.parallel_flows_); + for (uint32_t i = 0; i < config_.parallel_flows_; i++) { + auto &ctx = consumer_contexts_.emplace_back(*this, i); + ret = ctx.setup(); - size_t maxBufferSize() const override { - return client_.configuration_.receive_buffer_size_; - } - - void readError(const std::error_code &ec) noexcept override { - std::cerr << "Error " << ec.message() << " while reading from socket" - << std::endl; - client_.io_service_.stop(); + if (ret) { + break; + } } - void readSuccess(std::size_t total_size) noexcept override { - auto t2 = utils::SteadyTime::now(); - auto dt = utils::SteadyTime::getDurationUs(client_.t_download_, t2); - long usec = (long)dt.count(); - - std::cout << "Content retrieved. Size: " << total_size << " [Bytes]" - << std::endl; + return ret; + } - std::cerr << "Elapsed Time: " << usec / 1000000.0 << " seconds -- " - << (total_size * 8) * 1.0 / usec * 1.0 << " [Mbps]" - << std::endl; + int run() { + signals_.add(SIGINT); + signals_.async_wait( + [this](const std::error_code &, const int &) { io_service_.stop(); }); - client_.io_service_.stop(); + for (auto &consumer_context : consumer_contexts_) { + consumer_context.run(); } - private: - Impl &client_; - }; + io_service_.run(); - hiperf::ClientConfiguration configuration_; - utils::SteadyTime::TimePoint t_stats_; - utils::SteadyTime::TimePoint t_download_; - uint32_t total_duration_milliseconds_; - uint64_t old_bytes_value_; - uint64_t old_interest_tx_value_; - uint64_t old_fec_interest_tx_value_; - uint64_t old_fec_data_rx_value_; - uint64_t old_lost_data_value_; - uint64_t old_bytes_recovered_value_; - uint64_t old_definitely_lost_data_value_; - uint32_t old_retx_value_; - uint32_t old_sent_int_value_; - uint32_t old_received_nacks_value_; - uint32_t old_fec_pkt_; - - // IMPORTANT: to be used only for performance testing, when consumer and - // producer are synchronized. Used for rtc only at the moment - double avg_data_delay_; - uint32_t delay_sample_; - - uint32_t received_bytes_; - uint32_t received_data_pkt_; - uint32_t auth_alerts_; - - std::string data_delays_; + return ERROR_SUCCESS; + } + ClientConfiguration &getConfig() { return config_; } + + private: asio::io_service io_service_; + hiperf::ClientConfiguration config_; asio::signal_set signals_; - RTCCallback rtc_callback_; - Callback callback_; - std::unique_ptr<ConsumerSocket> consumer_socket_; - std::unique_ptr<ProducerSocket> producer_socket_; - asio::ip::udp::socket socket_; - asio::ip::udp::endpoint remote_; - - ForwarderConfiguration config_; - // uint16_t switch_threshold_; /* ms */ - bool fwd_connected_; - bool use_bestpath_; - uint32_t rtt_threshold_; /* ms */ - uint32_t loss_threshold_; /* ms */ - std::string prefix_name_; // bestpath route - uint32_t prefix_len_; - // bool done_; - - std::vector<ForwarderInterface::RouteInfoPtr> main_routes_; - std::vector<ForwarderInterface::RouteInfoPtr> backup_routes_; - uint16_t header_counter_mask_; - uint16_t header_counter_; - - bool print_headers_; - bool first_; - - ForwarderInterface forwarder_interface_; + std::vector<ConsumerContext> consumer_contexts_; }; -HIperfClient::HIperfClient(const ClientConfiguration &conf) { - impl_ = new Impl(conf); -} - -HIperfClient::HIperfClient(HIperfClient &&other) { - impl_ = other.impl_; - other.impl_ = nullptr; -} - -HIperfClient &HIperfClient::operator=(HIperfClient &&other) { - if (this != &other) { - impl_ = other.impl_; - other.impl_ = nullptr; - } - - return *this; -} +HIperfClient::HIperfClient(const ClientConfiguration &conf) + : impl_(std::make_unique<Impl>(conf)) {} -HIperfClient::~HIperfClient() { delete impl_; } +HIperfClient::~HIperfClient() = default; -int HIperfClient::setup() { return impl_->setup(); } +int HIperfClient::setup() const { return impl_->setup(); } -void HIperfClient::run() { impl_->run(); } +void HIperfClient::run() const { impl_->run(); } } // namespace hiperf |