diff options
Diffstat (limited to 'apps/hiperf/src/client.cc')
-rw-r--r-- | apps/hiperf/src/client.cc | 882 |
1 files changed, 882 insertions, 0 deletions
diff --git a/apps/hiperf/src/client.cc b/apps/hiperf/src/client.cc new file mode 100644 index 000000000..1ce5b4c55 --- /dev/null +++ b/apps/hiperf/src/client.cc @@ -0,0 +1,882 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <client.h> +#include <hicn/transport/portability/endianess.h> + +#include <libconfig.h++> + +namespace hiperf { + +/** + * Forward declaration of client Read callbacks. + */ +class RTCCallback; +class Callback; + +using transport::auth::CryptoHashType; +using transport::core::Packet; +using transport::core::Prefix; +using transport::interface::ConsumerCallbacksOptions; +using transport::interface::ConsumerSocket; +using transport::interface::GeneralTransportOptions; +using transport::interface::ProducerSocket; +using transport::interface::ProductionProtocolAlgorithms; +using transport::interface::RaaqmTransportOptions; +using transport::interface::RtcTransportOptions; +using transport::interface::RtcTransportRecoveryStrategies; +using transport::interface::StrategyCallback; +using transport::interface::TransportStatistics; + +/** + * Hiperf client class: configure and setup an hicn consumer following the + * ClientConfiguration. + */ +class HIperfClient::Impl { + friend class Callback; + friend class RTCCallback; + + static inline constexpr uint16_t klog2_header_counter() { return 4; } + static inline constexpr uint16_t kheader_counter_mask() { + return (1 << klog2_header_counter()) - 1; + } + + class ConsumerContext + : public Base<ConsumerContext, ClientConfiguration, Impl>, + private ConsumerSocket::ReadCallback { + static inline const std::size_t kmtu = HIPERF_MTU; + + public: + using ConfType = ClientConfiguration; + using ParentType = typename HIperfClient::Impl; + static inline auto getContextType() -> std::string { + return "ConsumerContext"; + } + + ConsumerContext(Impl &client, int consumer_identifier) + : Base(client, client.io_service_, consumer_identifier), + receive_buffer_( + utils::MemBuf::create(client.config_.receive_buffer_size_)), + socket_(client.io_service_), + payload_size_max_(PayloadSize(client.config_.packet_format_) + .getPayloadSizeMax(RTC_HEADER_SIZE)), + nb_iterations_(client.config_.nb_iterations_) {} + + ConsumerContext(ConsumerContext &&other) noexcept + : Base(std::move(other)), + receive_buffer_(std::move(other.receive_buffer_)), + socket_(std::move(other.socket_)), + payload_size_max_(other.payload_size_max_), + remote_(std::move(other.remote_)), + nb_iterations_(other.nb_iterations_), + saved_stats_(std::move(other.saved_stats_)), + header_counter_(other.header_counter_), + first_(other.first_), + consumer_socket_(std::move(other.consumer_socket_)), + producer_socket_(std::move(other.producer_socket_)) {} + + ~ConsumerContext() override = default; + + /*************************************************************** + * ConsumerSocket::ReadCallback implementation + ***************************************************************/ + + bool isBufferMovable() noexcept override { return false; } + + void getReadBuffer(uint8_t **application_buffer, + size_t *max_length) override { + *application_buffer = receive_buffer_->writableData(); + + if (configuration_.rtc_) { + *max_length = kmtu; + } else { + *max_length = configuration_.receive_buffer_size_; + } + } + + void readBufferAvailable( + std::unique_ptr<utils::MemBuf> &&buffer) noexcept override { + // Nothing to do here + auto ret = std::move(buffer); + } + + void readDataAvailable(std::size_t length) noexcept override { + if (configuration_.rtc_) { + saved_stats_.received_bytes_ += length; + saved_stats_.received_data_pkt_++; + + // collecting delay stats. Just for performance testing + auto senderTimeStamp = + *reinterpret_cast<uint64_t *>(receive_buffer_->writableData()); + + auto now = utils::SystemTime::nowMs().count(); + auto new_delay = double(now - senderTimeStamp); + + if (senderTimeStamp > now) + new_delay = -1 * double(senderTimeStamp - now); + + saved_stats_.delay_sample_++; + saved_stats_.avg_data_delay_ = + saved_stats_.avg_data_delay_ + + (double(new_delay) - saved_stats_.avg_data_delay_) / + saved_stats_.delay_sample_; + + if (configuration_.test_mode_) { + saved_stats_.data_delays_ += std::to_string(int(new_delay)); + saved_stats_.data_delays_ += ","; + } + + if (configuration_.relay_ && configuration_.parallel_flows_ == 1) { + producer_socket_->produceDatagram( + configuration_.relay_name_.makeName(), + receive_buffer_->writableData(), + length < payload_size_max_ ? length : payload_size_max_); + } + if (configuration_.output_stream_mode_ && + configuration_.parallel_flows_ == 1) { + const uint8_t *start = receive_buffer_->writableData(); + start += sizeof(uint64_t); + std::size_t pkt_len = length - sizeof(uint64_t); + socket_.send_to(asio::buffer(start, pkt_len), remote_); + } + } + } + + size_t maxBufferSize() const override { + return configuration_.rtc_ ? kmtu : configuration_.receive_buffer_size_; + } + + void readError(const std::error_code &ec) noexcept override { + getOutputStream() << "Error " << ec.message() + << " while reading from socket" << std::endl; + parent_.io_service_.stop(); + } + + void readSuccess(std::size_t total_size) noexcept override { + if (configuration_.rtc_) { + getOutputStream() << "Data successfully read" << std::endl; + } else { + auto t2 = utils::SteadyTime::now(); + auto dt = + utils::SteadyTime::getDurationUs(saved_stats_.t_download_, t2); + auto usec = dt.count(); + + getOutputStream() << "Content retrieved. Size: " << total_size + << " [Bytes]" << std::endl; + + getOutputStream() << "Elapsed Time: " << usec / 1000000.0 + << " seconds -- " + << double(total_size * 8) * 1.0 / double(usec) * 1.0 + << " [Mbps]" << std::endl; + + parent_.io_service_.stop(); + } + } + + /*************************************************************** + * End of ConsumerSocket::ReadCallback implementation + ***************************************************************/ + + private: + struct SavedStatistics { + utils::SteadyTime::TimePoint t_stats_{}; + utils::SteadyTime::TimePoint t_download_{}; + uint32_t total_duration_milliseconds_{0}; + uint64_t old_bytes_value_{0}; + uint64_t old_interest_tx_value_{0}; + uint64_t old_fec_interest_tx_value_{0}; + uint64_t old_fec_data_rx_value_{0}; + uint64_t old_lost_data_value_{0}; + uint64_t old_bytes_recovered_value_{0}; + uint64_t old_definitely_lost_data_value_{0}; + uint64_t old_retx_value_{0}; + uint64_t old_sent_int_value_{0}; + uint64_t old_received_nacks_value_{0}; + uint32_t old_fec_pkt_{0}; + // IMPORTANT: to be used only for performance testing, when consumer and + // producer are synchronized. Used for rtc only at the moment + double avg_data_delay_{0}; + uint32_t delay_sample_{0}; + uint32_t received_bytes_{0}; + uint32_t received_data_pkt_{0}; + uint32_t auth_alerts_{0}; + std::string data_delays_{""}; + }; + + /*************************************************************** + * Transport callbacks + ***************************************************************/ + + void checkReceivedRtcContent( + [[maybe_unused]] const ConsumerSocket &c, + [[maybe_unused]] const transport::core::ContentObject &content_object) + const { + // Nothing to do here + } + + void processLeavingInterest( + const ConsumerSocket & /*c*/, + const transport::core::Interest & /*interest*/) const { + // Nothing to do here + } + + transport::auth::VerificationPolicy onAuthFailed( + transport::auth::Suffix /*suffix*/, + transport::auth::VerificationPolicy /*policy*/) { + saved_stats_.auth_alerts_++; + return transport::auth::VerificationPolicy::ACCEPT; + } + + void handleTimerExpiration([[maybe_unused]] const ConsumerSocket &c, + const TransportStatistics &stats) { + const char separator = ' '; + const int width = 18; + + utils::SteadyTime::TimePoint t2 = utils::SteadyTime::Clock::now(); + auto exact_duration = + utils::SteadyTime::getDurationMs(saved_stats_.t_stats_, t2); + + std::stringstream interval_ms; + interval_ms << saved_stats_.total_duration_milliseconds_ << "-" + << saved_stats_.total_duration_milliseconds_ + + exact_duration.count(); + + std::stringstream bytes_transferred; + bytes_transferred << std::fixed << std::setprecision(3) + << double(stats.getBytesRecv() - + saved_stats_.old_bytes_value_) / + 1000000.0 + << std::setfill(separator); + + std::stringstream bandwidth; + bandwidth << (double(stats.getBytesRecv() - + saved_stats_.old_bytes_value_) * + 8) / + (exact_duration.count()) / 1000.0 + << std::setfill(separator); + + std::stringstream window; + window << stats.getAverageWindowSize() << std::setfill(separator); + + std::stringstream avg_rtt; + avg_rtt << std::setprecision(3) << std::fixed << stats.getAverageRtt() + << std::setfill(separator); + + if (configuration_.rtc_) { + std::stringstream lost_data; + lost_data << stats.getLostData() - saved_stats_.old_lost_data_value_ + << std::setfill(separator); + + std::stringstream bytes_recovered_data; + bytes_recovered_data << stats.getBytesRecoveredData() - + saved_stats_.old_bytes_recovered_value_ + << std::setfill(separator); + + std::stringstream definitely_lost_data; + definitely_lost_data << stats.getDefinitelyLostData() - + saved_stats_.old_definitely_lost_data_value_ + << std::setfill(separator); + + std::stringstream data_delay; + data_delay << std::fixed << std::setprecision(3) + << saved_stats_.avg_data_delay_ << std::setfill(separator); + + std::stringstream received_data_pkt; + received_data_pkt << saved_stats_.received_data_pkt_ + << std::setfill(separator); + + std::stringstream goodput; + goodput << std::fixed << std::setprecision(3) + << (saved_stats_.received_bytes_ * 8.0) / + (exact_duration.count()) / 1000.0 + << std::setfill(separator); + + std::stringstream loss_rate; + loss_rate << std::fixed << std::setprecision(2) + << stats.getLossRatio() * 100.0 << std::setfill(separator); + + std::stringstream retx_sent; + retx_sent << stats.getRetxCount() - saved_stats_.old_retx_value_ + << std::setfill(separator); + + std::stringstream interest_sent; + interest_sent << stats.getInterestTx() - + saved_stats_.old_sent_int_value_ + << std::setfill(separator); + + std::stringstream nacks; + nacks << stats.getReceivedNacks() - + saved_stats_.old_received_nacks_value_ + << std::setfill(separator); + + std::stringstream fec_pkt; + fec_pkt << stats.getReceivedFEC() - saved_stats_.old_fec_pkt_ + << std::setfill(separator); + + std::stringstream queuing_delay; + queuing_delay << std::fixed << std::setprecision(3) + << stats.getQueuingDelay() << std::setfill(separator); + + std::stringstream residual_losses; + double rl_perc = stats.getResidualLossRate() * 100; + residual_losses << std::fixed << std::setprecision(2) << rl_perc + << std::setfill(separator); + + std::stringstream quality_score; + quality_score << std::fixed << (int)stats.getQualityScore() + << std::setfill(separator); + + std::stringstream alerts; + alerts << stats.getAlerts() << std::setfill(separator); + + std::stringstream auth_alerts; + auth_alerts << saved_stats_.auth_alerts_ << std::setfill(separator); + + if ((header_counter_ == 0 && configuration_.print_headers_) || first_) { + getOutputStream() << std::right << std::setw(width) << "Interval[ms]"; + getOutputStream() + << std::right << std::setw(width) << "RecvData[pkt]"; + getOutputStream() + << std::right << std::setw(width) << "Bandwidth[Mbps]"; + getOutputStream() + << std::right << std::setw(width) << "Goodput[Mbps]"; + getOutputStream() << std::right << std::setw(width) << "LossRate[%]"; + getOutputStream() << std::right << std::setw(width) << "Retr[pkt]"; + getOutputStream() << std::right << std::setw(width) << "InterestSent"; + getOutputStream() + << std::right << std::setw(width) << "ReceivedNacks"; + getOutputStream() << std::right << std::setw(width) << "SyncWnd[pkt]"; + getOutputStream() << std::right << std::setw(width) << "MinRtt[ms]"; + getOutputStream() + << std::right << std::setw(width) << "QueuingDelay[ms]"; + getOutputStream() + << std::right << std::setw(width) << "LostData[pkt]"; + getOutputStream() + << std::right << std::setw(width) << "RecoveredData"; + getOutputStream() + << std::right << std::setw(width) << "DefinitelyLost"; + getOutputStream() << std::right << std::setw(width) << "State"; + getOutputStream() + << std::right << std::setw(width) << "DataDelay[ms]"; + getOutputStream() << std::right << std::setw(width) << "FecPkt"; + getOutputStream() << std::right << std::setw(width) << "Congestion"; + getOutputStream() + << std::right << std::setw(width) << "ResidualLosses"; + getOutputStream() << std::right << std::setw(width) << "QualityScore"; + getOutputStream() << std::right << std::setw(width) << "Alerts"; + getOutputStream() + << std::right << std::setw(width) << "AuthAlerts" << std::endl; + + first_ = false; + } + + getOutputStream() << std::right << std::setw(width) + << interval_ms.str(); + getOutputStream() << std::right << std::setw(width) + << received_data_pkt.str(); + getOutputStream() << std::right << std::setw(width) << bandwidth.str(); + getOutputStream() << std::right << std::setw(width) << goodput.str(); + getOutputStream() << std::right << std::setw(width) << loss_rate.str(); + getOutputStream() << std::right << std::setw(width) << retx_sent.str(); + getOutputStream() << std::right << std::setw(width) + << interest_sent.str(); + getOutputStream() << std::right << std::setw(width) << nacks.str(); + getOutputStream() << std::right << std::setw(width) << window.str(); + getOutputStream() << std::right << std::setw(width) << avg_rtt.str(); + getOutputStream() << std::right << std::setw(width) + << queuing_delay.str(); + getOutputStream() << std::right << std::setw(width) << lost_data.str(); + getOutputStream() << std::right << std::setw(width) + << bytes_recovered_data.str(); + getOutputStream() << std::right << std::setw(width) + << definitely_lost_data.str(); + getOutputStream() << std::right << std::setw(width) + << stats.getCCStatus(); + getOutputStream() << std::right << std::setw(width) << data_delay.str(); + getOutputStream() << std::right << std::setw(width) << fec_pkt.str(); + getOutputStream() << std::right << std::setw(width) + << stats.isCongested(); + getOutputStream() << std::right << std::setw(width) + << residual_losses.str(); + getOutputStream() << std::right << std::setw(width) + << quality_score.str(); + getOutputStream() << std::right << std::setw(width) << alerts.str(); + getOutputStream() << std::right << std::setw(width) << auth_alerts.str() + << std::endl; + + if (configuration_.test_mode_) { + if (saved_stats_.data_delays_.size() > 0) + saved_stats_.data_delays_.pop_back(); + + auto now = utils::SteadyTime::nowMs(); + getOutputStream() << std::fixed << std::setprecision(0) << now.count() + << " DATA-DELAYS:[" << saved_stats_.data_delays_ + << "]" << std::endl; + } + } else { + if ((header_counter_ == 0 && configuration_.print_headers_) || first_) { + getOutputStream() << std::right << std::setw(width) << "Interval[ms]"; + getOutputStream() << std::right << std::setw(width) << "Transfer[MB]"; + getOutputStream() + << std::right << std::setw(width) << "Bandwidth[Mbps]"; + getOutputStream() << std::right << std::setw(width) << "Retr[pkt]"; + getOutputStream() << std::right << std::setw(width) << "Cwnd[Int]"; + getOutputStream() + << std::right << std::setw(width) << "AvgRtt[ms]" << std::endl; + + first_ = false; + } + + getOutputStream() << std::right << std::setw(width) + << interval_ms.str(); + getOutputStream() << std::right << std::setw(width) + << bytes_transferred.str(); + getOutputStream() << std::right << std::setw(width) << bandwidth.str(); + getOutputStream() << std::right << std::setw(width) + << stats.getRetxCount(); + getOutputStream() << std::right << std::setw(width) << window.str(); + getOutputStream() << std::right << std::setw(width) << avg_rtt.str() + << std::endl; + } + + saved_stats_.total_duration_milliseconds_ += + (uint32_t)exact_duration.count(); + saved_stats_.old_bytes_value_ = stats.getBytesRecv(); + saved_stats_.old_lost_data_value_ = stats.getLostData(); + saved_stats_.old_bytes_recovered_value_ = stats.getBytesRecoveredData(); + saved_stats_.old_definitely_lost_data_value_ = + stats.getDefinitelyLostData(); + saved_stats_.old_fec_interest_tx_value_ = stats.getInterestFecTxCount(); + saved_stats_.old_fec_data_rx_value_ = stats.getBytesFecRecv(); + saved_stats_.old_retx_value_ = stats.getRetxCount(); + saved_stats_.old_sent_int_value_ = stats.getInterestTx(); + saved_stats_.old_received_nacks_value_ = stats.getReceivedNacks(); + saved_stats_.old_fec_pkt_ = stats.getReceivedFEC(); + saved_stats_.delay_sample_ = 0; + saved_stats_.avg_data_delay_ = 0; + saved_stats_.received_bytes_ = 0; + saved_stats_.received_data_pkt_ = 0; + saved_stats_.data_delays_ = ""; + saved_stats_.t_stats_ = utils::SteadyTime::Clock::now(); + + header_counter_ = (header_counter_ + 1) & kheader_counter_mask(); + + if (--nb_iterations_ == 0) { + // We reached the maximum nb of runs. Stop now. + parent_.io_service_.stop(); + } + } + + /*************************************************************** + * Setup functions + ***************************************************************/ + + int setupRTCSocket() { + int ret = ERROR_SUCCESS; + + configuration_.transport_protocol_ = transport::interface::RTC; + + if (configuration_.relay_ && configuration_.parallel_flows_ == 1) { + int production_protocol = ProductionProtocolAlgorithms::RTC_PROD; + producer_socket_ = + std::make_unique<ProducerSocket>(production_protocol); + producer_socket_->registerPrefix(configuration_.relay_name_); + producer_socket_->connect(); + producer_socket_->start(); + } + + if (configuration_.output_stream_mode_ && + configuration_.parallel_flows_ == 1) { + remote_ = asio::ip::udp::endpoint( + asio::ip::address::from_string("127.0.0.1"), configuration_.port_); + socket_.open(asio::ip::udp::v4()); + } + + consumer_socket_ = + std::make_unique<ConsumerSocket>(configuration_.transport_protocol_); + + RtcTransportRecoveryStrategies recovery_strategy = + RtcTransportRecoveryStrategies::RTX_ONLY; + switch (configuration_.recovery_strategy_) { + case 1: + recovery_strategy = RtcTransportRecoveryStrategies::RECOVERY_OFF; + break; + case 2: + recovery_strategy = RtcTransportRecoveryStrategies::RTX_ONLY; + break; + case 3: + recovery_strategy = RtcTransportRecoveryStrategies::FEC_ONLY; + break; + case 4: + recovery_strategy = RtcTransportRecoveryStrategies::DELAY_BASED; + break; + case 5: + recovery_strategy = RtcTransportRecoveryStrategies::LOW_RATE; + break; + case 6: + recovery_strategy = + RtcTransportRecoveryStrategies::LOW_RATE_AND_BESTPATH; + break; + case 7: + recovery_strategy = + RtcTransportRecoveryStrategies::LOW_RATE_AND_REPLICATION; + break; + case 8: + recovery_strategy = + RtcTransportRecoveryStrategies::LOW_RATE_AND_ALL_FWD_STRATEGIES; + break; + case 9: + recovery_strategy = + RtcTransportRecoveryStrategies::FEC_ONLY_LOW_RES_LOSSES; + break; + case 10: + recovery_strategy = + RtcTransportRecoveryStrategies::DELAY_AND_BESTPATH; + break; + case 11: + recovery_strategy = + RtcTransportRecoveryStrategies::DELAY_AND_REPLICATION; + break; + default: + break; + } + + ret = consumer_socket_->setSocketOption( + RtcTransportOptions::RECOVERY_STRATEGY, + static_cast<uint32_t>(recovery_strategy)); + + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + ret = consumer_socket_->setSocketOption( + RtcTransportOptions::AGGREGATED_DATA, + configuration_.aggregated_data_); + + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + ret = consumer_socket_->setSocketOption( + RtcTransportOptions::CONTENT_SHARING_MODE, + configuration_.content_sharing_mode_); + + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + ret = consumer_socket_->setSocketOption( + ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT, + (transport::interface::ConsumerContentObjectCallback)std::bind( + &Impl::ConsumerContext::checkReceivedRtcContent, this, + std::placeholders::_1, std::placeholders::_2)); + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + std::shared_ptr<TransportStatistics> transport_stats; + ret = consumer_socket_->getSocketOption( + transport::interface::OtherOptions::STATISTICS, + (TransportStatistics **)&transport_stats); + transport_stats->setAlpha(0.0); + + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + return ERROR_SUCCESS; + } + + int setupRAAQMSocket() { + int ret = ERROR_SUCCESS; + + configuration_.transport_protocol_ = transport::interface::RAAQM; + + consumer_socket_ = + std::make_unique<ConsumerSocket>(configuration_.transport_protocol_); + + if (configuration_.beta_ != -1.f) { + ret = consumer_socket_->setSocketOption( + RaaqmTransportOptions::BETA_VALUE, configuration_.beta_); + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + } + + if (configuration_.drop_factor_ != -1.f) { + ret = consumer_socket_->setSocketOption( + RaaqmTransportOptions::DROP_FACTOR, configuration_.drop_factor_); + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + } + + return ERROR_SUCCESS; + } + + int setupCBRSocket() { + configuration_.transport_protocol_ = transport::interface::CBR; + + consumer_socket_ = + std::make_unique<ConsumerSocket>(configuration_.transport_protocol_); + + return ERROR_SUCCESS; + } + + public: + int setup() { + int ret; + std::shared_ptr<transport::auth::Verifier> verifier = + std::make_shared<transport::auth::VoidVerifier>(); + + if (configuration_.rtc_) { + ret = setupRTCSocket(); + } else if (configuration_.window_ < 0) { + ret = setupRAAQMSocket(); + } else { + ret = setupCBRSocket(); + } + + if (ret != ERROR_SUCCESS) { + return ret; + } + + ret = consumer_socket_->setSocketOption( + GeneralTransportOptions::INTEREST_LIFETIME, + configuration_.interest_lifetime_); + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + ret = consumer_socket_->setSocketOption( + GeneralTransportOptions::MANIFEST_FACTOR_RELEVANT, + configuration_.manifest_factor_relevant_); + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + ret = consumer_socket_->setSocketOption( + GeneralTransportOptions::MANIFEST_FACTOR_ALERT, + configuration_.manifest_factor_alert_); + + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + ret = consumer_socket_->setSocketOption( + GeneralTransportOptions::PACKET_FORMAT, + configuration_.packet_format_); + if (ret == SOCKET_OPTION_NOT_SET) { + getOutputStream() << "ERROR -- Impossible to set the packet format." + << std::endl; + return ERROR_SETUP; + } + + ret = consumer_socket_->setSocketOption( + ConsumerCallbacksOptions::FWD_STRATEGY_CHANGE, + (StrategyCallback)[]( + [[maybe_unused]] transport::interface::notification::Strategy + strategy){ + // nothing to do + }); + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + ret = consumer_socket_->setSocketOption( + ConsumerCallbacksOptions::REC_STRATEGY_CHANGE, + (StrategyCallback)[]( + [[maybe_unused]] transport::interface::notification::Strategy + strategy){ + // nothing to do + }); + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + ret = consumer_socket_->setSocketOption( + transport::interface::CURRENT_WINDOW_SIZE, configuration_.window_); + if (ret == SOCKET_OPTION_NOT_SET) { + getOutputStream() + << "ERROR -- Impossible to set the size of the window." + << std::endl; + return ERROR_SETUP; + } + + if (!configuration_.producer_certificate_.empty()) { + verifier = std::make_shared<transport::auth::AsymmetricVerifier>( + configuration_.producer_certificate_); + } + + if (!configuration_.passphrase_.empty()) { + verifier = std::make_shared<transport::auth::SymmetricVerifier>( + configuration_.passphrase_); + } + + verifier->setVerificationFailedCallback( + std::bind(&HIperfClient::Impl::ConsumerContext::onAuthFailed, this, + std::placeholders::_1, std::placeholders::_2)); + + ret = consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER, + verifier); + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + // Signer for aggregatd interests + std::shared_ptr<transport::auth::Signer> signer = + std::make_shared<transport::auth::VoidSigner>(); + if (!configuration_.aggr_interest_passphrase_.empty()) { + signer = std::make_shared<transport::auth::SymmetricSigner>( + transport::auth::CryptoSuite::HMAC_SHA256, + configuration_.aggr_interest_passphrase_); + } + ret = consumer_socket_->setSocketOption(GeneralTransportOptions::SIGNER, + signer); + if (ret == SOCKET_OPTION_NOT_SET) return ERROR_SETUP; + + if (configuration_.aggregated_interests_) { + ret = consumer_socket_->setSocketOption( + RtcTransportOptions::AGGREGATED_INTERESTS, true); + + if (ret == SOCKET_OPTION_NOT_SET) return ERROR_SETUP; + } + + ret = consumer_socket_->setSocketOption( + ConsumerCallbacksOptions::INTEREST_OUTPUT, + (transport::interface::ConsumerInterestCallback)std::bind( + &ConsumerContext::processLeavingInterest, this, + std::placeholders::_1, std::placeholders::_2)); + + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + ret = consumer_socket_->setSocketOption( + ConsumerCallbacksOptions::READ_CALLBACK, this); + + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + ret = consumer_socket_->setSocketOption( + ConsumerCallbacksOptions::STATS_SUMMARY, + (transport::interface::ConsumerTimerCallback)std::bind( + &Impl::ConsumerContext::handleTimerExpiration, this, + std::placeholders::_1, std::placeholders::_2)); + + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + if (consumer_socket_->setSocketOption( + GeneralTransportOptions::STATS_INTERVAL, + configuration_.report_interval_milliseconds_) == + SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + consumer_socket_->connect(); + + return ERROR_SUCCESS; + } + + /*************************************************************** + * Run functions + ***************************************************************/ + + int run() { + getOutputStream() << "Starting download of " << flow_name_ << std::endl; + + saved_stats_.t_download_ = saved_stats_.t_stats_ = + utils::SteadyTime::now(); + consumer_socket_->consume(flow_name_); + + return ERROR_SUCCESS; + } + + // Members initialized by the constructor + std::shared_ptr<utils::MemBuf> receive_buffer_; + asio::ip::udp::socket socket_; + std::size_t payload_size_max_; + asio::ip::udp::endpoint remote_; + std::uint32_t nb_iterations_; + + // Members initialized by in-class initializer + SavedStatistics saved_stats_{}; + uint16_t header_counter_{0}; + bool first_{true}; + std::unique_ptr<ConsumerSocket> consumer_socket_; + std::unique_ptr<ProducerSocket> producer_socket_; + }; + + public: + explicit Impl(const hiperf::ClientConfiguration &conf) + : config_(conf), signals_(io_service_) {} + + virtual ~Impl() = default; + + int setup() { + int ret = ensureFlows(config_.name_, config_.parallel_flows_); + if (ret != ERROR_SUCCESS) { + return ret; + } + + consumer_contexts_.reserve(config_.parallel_flows_); + for (uint32_t i = 0; i < config_.parallel_flows_; i++) { + auto &ctx = consumer_contexts_.emplace_back(*this, i); + ret = ctx.setup(); + + if (ret) { + break; + } + } + + return ret; + } + + int run() { + signals_.add(SIGINT); + signals_.async_wait( + [this](const std::error_code &, const int &) { io_service_.stop(); }); + + for (auto &consumer_context : consumer_contexts_) { + consumer_context.run(); + } + + io_service_.run(); + + return ERROR_SUCCESS; + } + + ClientConfiguration &getConfig() { return config_; } + + private: + asio::io_service io_service_; + hiperf::ClientConfiguration config_; + asio::signal_set signals_; + std::vector<ConsumerContext> consumer_contexts_; +}; + +HIperfClient::HIperfClient(const ClientConfiguration &conf) + : impl_(std::make_unique<Impl>(conf)) {} + +HIperfClient::~HIperfClient() = default; + +int HIperfClient::setup() const { return impl_->setup(); } + +void HIperfClient::run() const { impl_->run(); } + +} // namespace hiperf |