aboutsummaryrefslogtreecommitdiffstats
path: root/apps/hiperf/src/client.cc
diff options
context:
space:
mode:
Diffstat (limited to 'apps/hiperf/src/client.cc')
-rw-r--r--apps/hiperf/src/client.cc1448
1 files changed, 715 insertions, 733 deletions
diff --git a/apps/hiperf/src/client.cc b/apps/hiperf/src/client.cc
index 820ebf0ce..1ce5b4c55 100644
--- a/apps/hiperf/src/client.cc
+++ b/apps/hiperf/src/client.cc
@@ -14,8 +14,7 @@
*/
#include <client.h>
-#include <forwarder_config.h>
-#include <forwarder_interface.h>
+#include <hicn/transport/portability/endianess.h>
#include <libconfig.h++>
@@ -27,874 +26,857 @@ namespace hiperf {
class RTCCallback;
class Callback;
+using transport::auth::CryptoHashType;
+using transport::core::Packet;
+using transport::core::Prefix;
+using transport::interface::ConsumerCallbacksOptions;
+using transport::interface::ConsumerSocket;
+using transport::interface::GeneralTransportOptions;
+using transport::interface::ProducerSocket;
+using transport::interface::ProductionProtocolAlgorithms;
+using transport::interface::RaaqmTransportOptions;
+using transport::interface::RtcTransportOptions;
+using transport::interface::RtcTransportRecoveryStrategies;
+using transport::interface::StrategyCallback;
+using transport::interface::TransportStatistics;
+
/**
* Hiperf client class: configure and setup an hicn consumer following the
* ClientConfiguration.
*/
-class HIperfClient::Impl
-#ifdef FORWARDER_INTERFACE
- : ForwarderInterface::ICallback
-#endif
-{
- typedef std::chrono::time_point<std::chrono::steady_clock> Time;
- typedef std::chrono::microseconds TimeDuration;
-
+class HIperfClient::Impl {
friend class Callback;
friend class RTCCallback;
- struct nack_packet_t {
- uint64_t timestamp;
- uint32_t prod_rate;
- uint32_t prod_seg;
-
- inline uint64_t getTimestamp() const { return _ntohll(&timestamp); }
- inline void setTimestamp(uint64_t time) { timestamp = _htonll(&time); }
-
- inline uint32_t getProductionRate() const { return ntohl(prod_rate); }
- inline void setProductionRate(uint32_t rate) { prod_rate = htonl(rate); }
-
- inline uint32_t getProductionSegement() const { return ntohl(prod_seg); }
- inline void setProductionSegement(uint32_t seg) { prod_seg = htonl(seg); }
- };
-
- public:
- Impl(const hiperf::ClientConfiguration &conf)
- : configuration_(conf),
- total_duration_milliseconds_(0),
- old_bytes_value_(0),
- old_interest_tx_value_(0),
- old_fec_interest_tx_value_(0),
- old_fec_data_rx_value_(0),
- old_lost_data_value_(0),
- old_bytes_recovered_value_(0),
- old_definitely_lost_data_value_(0),
- old_retx_value_(0),
- old_sent_int_value_(0),
- old_received_nacks_value_(0),
- old_fec_pkt_(0),
- avg_data_delay_(0),
- delay_sample_(0),
- received_bytes_(0),
- received_data_pkt_(0),
- data_delays_(""),
- signals_(io_service_),
- rtc_callback_(*this),
- callback_(*this),
- socket_(io_service_),
- done_(false),
- switch_threshold_(~0)
-#ifdef FORWARDER_INTERFACE
- ,
- forwarder_interface_(io_service_, this)
-#endif
- {
+ static inline constexpr uint16_t klog2_header_counter() { return 4; }
+ static inline constexpr uint16_t kheader_counter_mask() {
+ return (1 << klog2_header_counter()) - 1;
}
- ~Impl() {}
+ class ConsumerContext
+ : public Base<ConsumerContext, ClientConfiguration, Impl>,
+ private ConsumerSocket::ReadCallback {
+ static inline const std::size_t kmtu = HIPERF_MTU;
- void checkReceivedRtcContent(ConsumerSocket &c,
- const ContentObject &contentObject) {}
+ public:
+ using ConfType = ClientConfiguration;
+ using ParentType = typename HIperfClient::Impl;
+ static inline auto getContextType() -> std::string {
+ return "ConsumerContext";
+ }
+
+ ConsumerContext(Impl &client, int consumer_identifier)
+ : Base(client, client.io_service_, consumer_identifier),
+ receive_buffer_(
+ utils::MemBuf::create(client.config_.receive_buffer_size_)),
+ socket_(client.io_service_),
+ payload_size_max_(PayloadSize(client.config_.packet_format_)
+ .getPayloadSizeMax(RTC_HEADER_SIZE)),
+ nb_iterations_(client.config_.nb_iterations_) {}
+
+ ConsumerContext(ConsumerContext &&other) noexcept
+ : Base(std::move(other)),
+ receive_buffer_(std::move(other.receive_buffer_)),
+ socket_(std::move(other.socket_)),
+ payload_size_max_(other.payload_size_max_),
+ remote_(std::move(other.remote_)),
+ nb_iterations_(other.nb_iterations_),
+ saved_stats_(std::move(other.saved_stats_)),
+ header_counter_(other.header_counter_),
+ first_(other.first_),
+ consumer_socket_(std::move(other.consumer_socket_)),
+ producer_socket_(std::move(other.producer_socket_)) {}
+
+ ~ConsumerContext() override = default;
+
+ /***************************************************************
+ * ConsumerSocket::ReadCallback implementation
+ ***************************************************************/
- void processLeavingInterest(ConsumerSocket &c, const Interest &interest) {}
+ bool isBufferMovable() noexcept override { return false; }
- void addFace(const std::string &local_address, uint16_t local_port,
- const std::string &remote_address, uint16_t remote_port,
- std::string interface);
+ void getReadBuffer(uint8_t **application_buffer,
+ size_t *max_length) override {
+ *application_buffer = receive_buffer_->writableData();
- void handleTimerExpiration(ConsumerSocket &c,
- const TransportStatistics &stats) {
- const char separator = ' ';
- const int width = 15;
+ if (configuration_.rtc_) {
+ *max_length = kmtu;
+ } else {
+ *max_length = configuration_.receive_buffer_size_;
+ }
+ }
- utils::TimePoint t2 = utils::SteadyClock::now();
- auto exact_duration =
- std::chrono::duration_cast<utils::Milliseconds>(t2 - t_stats_);
+ void readBufferAvailable(
+ std::unique_ptr<utils::MemBuf> &&buffer) noexcept override {
+ // Nothing to do here
+ auto ret = std::move(buffer);
+ }
- std::stringstream interval;
- interval << total_duration_milliseconds_ / 1000 << "-"
- << total_duration_milliseconds_ / 1000 +
- exact_duration.count() / 1000;
+ void readDataAvailable(std::size_t length) noexcept override {
+ if (configuration_.rtc_) {
+ saved_stats_.received_bytes_ += length;
+ saved_stats_.received_data_pkt_++;
- std::stringstream bytes_transferred;
- bytes_transferred << std::fixed << std::setprecision(3)
- << (stats.getBytesRecv() - old_bytes_value_) / 1000000.0
- << std::setfill(separator) << "[MB]";
+ // collecting delay stats. Just for performance testing
+ auto senderTimeStamp =
+ *reinterpret_cast<uint64_t *>(receive_buffer_->writableData());
- std::stringstream bandwidth;
- bandwidth << ((stats.getBytesRecv() - old_bytes_value_) * 8) /
- (exact_duration.count()) / 1000.0
- << std::setfill(separator) << "[Mbps]";
+ auto now = utils::SystemTime::nowMs().count();
+ auto new_delay = double(now - senderTimeStamp);
- std::stringstream window;
- window << stats.getAverageWindowSize() << std::setfill(separator)
- << "[Int]";
+ if (senderTimeStamp > now)
+ new_delay = -1 * double(senderTimeStamp - now);
- std::stringstream avg_rtt;
- avg_rtt << stats.getAverageRtt() << std::setfill(separator) << "[ms]";
+ saved_stats_.delay_sample_++;
+ saved_stats_.avg_data_delay_ =
+ saved_stats_.avg_data_delay_ +
+ (double(new_delay) - saved_stats_.avg_data_delay_) /
+ saved_stats_.delay_sample_;
- if (configuration_.rtc_) {
- // we get rtc stats more often, thus we need ms in the interval
- std::stringstream interval_ms;
- interval_ms << total_duration_milliseconds_ << "-"
- << total_duration_milliseconds_ + exact_duration.count();
-
- std::stringstream lost_data;
- lost_data << stats.getLostData() - old_lost_data_value_
- << std::setfill(separator) << "[pkt]";
-
- std::stringstream bytes_recovered_data;
- bytes_recovered_data << stats.getBytesRecoveredData() -
- old_bytes_recovered_value_
- << std::setfill(separator) << "[pkt]";
-
- std::stringstream definitely_lost_data;
- definitely_lost_data << stats.getDefinitelyLostData() -
- old_definitely_lost_data_value_
- << std::setfill(separator) << "[pkt]";
-
- std::stringstream data_delay;
- data_delay << avg_data_delay_ << std::setfill(separator) << "[ms]";
-
- std::stringstream received_data_pkt;
- received_data_pkt << received_data_pkt_ << std::setfill(separator)
- << "[pkt]";
-
- std::stringstream goodput;
- goodput << (received_bytes_ * 8.0) / (exact_duration.count()) / 1000.0
- << std::setfill(separator) << "[Mbps]";
-
- std::stringstream loss_rate;
- loss_rate << std::fixed << std::setprecision(2)
- << stats.getLossRatio() * 100.0 << std::setfill(separator)
- << "[%]";
-
- std::stringstream retx_sent;
- retx_sent << stats.getRetxCount() - old_retx_value_
- << std::setfill(separator) << "[pkt]";
-
- std::stringstream interest_sent;
- interest_sent << stats.getInterestTx() - old_sent_int_value_
- << std::setfill(separator) << "[pkt]";
-
- std::stringstream nacks;
- nacks << stats.getReceivedNacks() - old_received_nacks_value_
- << std::setfill(separator) << "[pkt]";
-
- std::stringstream fec_pkt;
- fec_pkt << stats.getReceivedFEC() - old_fec_pkt_
- << std::setfill(separator) << "[pkt]";
-
- std::stringstream queuing_delay;
- queuing_delay << stats.getQueuingDelay() << std::setfill(separator)
- << "[ms]";
-
-#ifdef FORWARDER_INTERFACE
- if (!done_ && stats.getQueuingDelay() >= switch_threshold_ &&
- total_duration_milliseconds_ > 1000) {
- std::cout << "Switching due to queuing delay" << std::endl;
- forwarder_interface_.createFaceAndRoutes(backup_routes_);
- forwarder_interface_.deleteFaceAndRoutes(main_routes_);
- std::swap(backup_routes_, main_routes_);
- done_ = true;
- }
-#endif
-
- // statistics not yet available in the transport
- // std::stringstream interest_fec_tx;
- // interest_fec_tx << stats.getInterestFecTxCount() -
- // old_fec_interest_tx_value_ << std::setfill(separator) << "[pkt]";
- // std::stringstream bytes_fec_recv;
- // bytes_fec_recv << stats.getBytesFecRecv() - old_fec_data_rx_value_
- // << std::setfill(separator) << "[bytes]";
- std::cout << std::left << std::setw(width) << "Interval";
- std::cout << std::left << std::setw(width) << "RecvData";
- std::cout << std::left << std::setw(width) << "Bandwidth";
- std::cout << std::left << std::setw(width) << "Goodput";
- std::cout << std::left << std::setw(width) << "LossRate";
- std::cout << std::left << std::setw(width) << "Retr";
- std::cout << std::left << std::setw(width) << "InterestSent";
- std::cout << std::left << std::setw(width) << "ReceivedNacks";
- std::cout << std::left << std::setw(width) << "SyncWnd";
- std::cout << std::left << std::setw(width) << "MinRtt";
- std::cout << std::left << std::setw(width) << "QueuingDelay";
- std::cout << std::left << std::setw(width) << "LostData";
- std::cout << std::left << std::setw(width) << "RecoveredData";
- std::cout << std::left << std::setw(width) << "DefinitelyLost";
- std::cout << std::left << std::setw(width) << "State";
- std::cout << std::left << std::setw(width) << "DataDelay";
- std::cout << std::left << std::setw(width) << "FecPkt" << std::endl;
-
- std::cout << std::left << std::setw(width) << interval_ms.str();
- std::cout << std::left << std::setw(width) << received_data_pkt.str();
- std::cout << std::left << std::setw(width) << bandwidth.str();
- std::cout << std::left << std::setw(width) << goodput.str();
- std::cout << std::left << std::setw(width) << loss_rate.str();
- std::cout << std::left << std::setw(width) << retx_sent.str();
- std::cout << std::left << std::setw(width) << interest_sent.str();
- std::cout << std::left << std::setw(width) << nacks.str();
- std::cout << std::left << std::setw(width) << window.str();
- std::cout << std::left << std::setw(width) << avg_rtt.str();
- std::cout << std::left << std::setw(width) << queuing_delay.str();
- std::cout << std::left << std::setw(width) << lost_data.str();
- std::cout << std::left << std::setw(width) << bytes_recovered_data.str();
- std::cout << std::left << std::setw(width) << definitely_lost_data.str();
- std::cout << std::left << std::setw(width) << stats.getCCStatus();
- std::cout << std::left << std::setw(width) << data_delay.str();
- std::cout << std::left << std::setw(width) << fec_pkt.str();
- std::cout << std::endl;
-
- if (configuration_.test_mode_) {
- if (data_delays_.size() > 0) data_delays_.pop_back();
-
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::system_clock::now().time_since_epoch())
- .count();
- std::cout << now << " DATA-DELAYS:[" << data_delays_ << "]"
- << std::endl;
- }
-
- // statistics not yet available in the transport
- // std::cout << std::left << std::setw(width) << interest_fec_tx.str();
- // std::cout << std::left << std::setw(width) << bytes_fec_recv.str();
- } else {
- std::cout << std::left << std::setw(width) << "Interval";
- std::cout << std::left << std::setw(width) << "Transfer";
- std::cout << std::left << std::setw(width) << "Bandwidth";
- std::cout << std::left << std::setw(width) << "Retr";
- std::cout << std::left << std::setw(width) << "Cwnd";
- std::cout << std::left << std::setw(width) << "AvgRtt" << std::endl;
-
- std::cout << std::left << std::setw(width) << interval.str();
- std::cout << std::left << std::setw(width) << bytes_transferred.str();
- std::cout << std::left << std::setw(width) << bandwidth.str();
- std::cout << std::left << std::setw(width) << stats.getRetxCount();
- std::cout << std::left << std::setw(width) << window.str();
- std::cout << std::left << std::setw(width) << avg_rtt.str() << std::endl;
- std::cout << std::endl;
- }
- total_duration_milliseconds_ += (uint32_t)exact_duration.count();
- old_bytes_value_ = stats.getBytesRecv();
- old_lost_data_value_ = stats.getLostData();
- old_bytes_recovered_value_ = stats.getBytesRecoveredData();
- old_definitely_lost_data_value_ = stats.getDefinitelyLostData();
- old_fec_interest_tx_value_ = stats.getInterestFecTxCount();
- old_fec_data_rx_value_ = stats.getBytesFecRecv();
- old_retx_value_ = stats.getRetxCount();
- old_sent_int_value_ = stats.getInterestTx();
- old_received_nacks_value_ = stats.getReceivedNacks();
- old_fec_pkt_ = stats.getReceivedFEC();
- delay_sample_ = 0;
- avg_data_delay_ = 0;
- received_bytes_ = 0;
- received_data_pkt_ = 0;
- data_delays_ = "";
-
- t_stats_ = utils::SteadyClock::now();
- }
+ if (configuration_.test_mode_) {
+ saved_stats_.data_delays_ += std::to_string(int(new_delay));
+ saved_stats_.data_delays_ += ",";
+ }
- bool parseConfig(const char *conf_file) {
- using namespace libconfig;
- Config cfg;
-
- try {
- cfg.readFile(conf_file);
- } catch (const FileIOException &fioex) {
- std::cerr << "I/O error while reading file." << std::endl;
- return false;
- } catch (const ParseException &pex) {
- std::cerr << "Parse error at " << pex.getFile() << ":" << pex.getLine()
- << " - " << pex.getError() << std::endl;
- return false;
+ if (configuration_.relay_ && configuration_.parallel_flows_ == 1) {
+ producer_socket_->produceDatagram(
+ configuration_.relay_name_.makeName(),
+ receive_buffer_->writableData(),
+ length < payload_size_max_ ? length : payload_size_max_);
+ }
+ if (configuration_.output_stream_mode_ &&
+ configuration_.parallel_flows_ == 1) {
+ const uint8_t *start = receive_buffer_->writableData();
+ start += sizeof(uint64_t);
+ std::size_t pkt_len = length - sizeof(uint64_t);
+ socket_.send_to(asio::buffer(start, pkt_len), remote_);
+ }
+ }
}
- Setting &config = cfg.getRoot();
+ size_t maxBufferSize() const override {
+ return configuration_.rtc_ ? kmtu : configuration_.receive_buffer_size_;
+ }
- if (config.exists("switch_threshold")) {
- unsigned threshold;
- config.lookupValue("switch_threshold", threshold);
- switch_threshold_ = threshold;
+ void readError(const std::error_code &ec) noexcept override {
+ getOutputStream() << "Error " << ec.message()
+ << " while reading from socket" << std::endl;
+ parent_.io_service_.stop();
}
- // listeners
- if (config.exists("listeners")) {
- // get path where looking for modules
- const Setting &listeners = config.lookup("listeners");
- auto count = listeners.getLength();
+ void readSuccess(std::size_t total_size) noexcept override {
+ if (configuration_.rtc_) {
+ getOutputStream() << "Data successfully read" << std::endl;
+ } else {
+ auto t2 = utils::SteadyTime::now();
+ auto dt =
+ utils::SteadyTime::getDurationUs(saved_stats_.t_download_, t2);
+ auto usec = dt.count();
- for (int i = 0; i < count; i++) {
- const Setting &listener = listeners[i];
- ListenerConfig list;
- unsigned port;
- std::string interface;
+ getOutputStream() << "Content retrieved. Size: " << total_size
+ << " [Bytes]" << std::endl;
- list.name = listener.getName();
- listener.lookupValue("local_address", list.address);
- listener.lookupValue("local_port", port);
- listener.lookupValue("interface", list.interface);
- list.port = (uint16_t)(port);
+ getOutputStream() << "Elapsed Time: " << usec / 1000000.0
+ << " seconds -- "
+ << double(total_size * 8) * 1.0 / double(usec) * 1.0
+ << " [Mbps]" << std::endl;
- std::cout << "Adding listener " << list.name << ", ( " << list.address
- << ":" << list.port << ")" << std::endl;
- config_.addListener(std::move(list));
+ parent_.io_service_.stop();
}
}
- // connectors
- if (config.exists("connectors")) {
- // get path where looking for modules
- const Setting &connectors = config.lookup("connectors");
- auto count = connectors.getLength();
-
- for (int i = 0; i < count; i++) {
- const Setting &connector = connectors[i];
- ConnectorConfig conn;
-
- conn.name = connector.getName();
- unsigned port = 0;
-
- if (!connector.lookupValue("local_address", conn.local_address)) {
- conn.local_address = "";
- }
+ /***************************************************************
+ * End of ConsumerSocket::ReadCallback implementation
+ ***************************************************************/
- if (!connector.lookupValue("local_port", port)) {
- port = 0;
- }
-
- conn.local_port = (uint16_t)(port);
+ private:
+ struct SavedStatistics {
+ utils::SteadyTime::TimePoint t_stats_{};
+ utils::SteadyTime::TimePoint t_download_{};
+ uint32_t total_duration_milliseconds_{0};
+ uint64_t old_bytes_value_{0};
+ uint64_t old_interest_tx_value_{0};
+ uint64_t old_fec_interest_tx_value_{0};
+ uint64_t old_fec_data_rx_value_{0};
+ uint64_t old_lost_data_value_{0};
+ uint64_t old_bytes_recovered_value_{0};
+ uint64_t old_definitely_lost_data_value_{0};
+ uint64_t old_retx_value_{0};
+ uint64_t old_sent_int_value_{0};
+ uint64_t old_received_nacks_value_{0};
+ uint32_t old_fec_pkt_{0};
+ // IMPORTANT: to be used only for performance testing, when consumer and
+ // producer are synchronized. Used for rtc only at the moment
+ double avg_data_delay_{0};
+ uint32_t delay_sample_{0};
+ uint32_t received_bytes_{0};
+ uint32_t received_data_pkt_{0};
+ uint32_t auth_alerts_{0};
+ std::string data_delays_{""};
+ };
+
+ /***************************************************************
+ * Transport callbacks
+ ***************************************************************/
+
+ void checkReceivedRtcContent(
+ [[maybe_unused]] const ConsumerSocket &c,
+ [[maybe_unused]] const transport::core::ContentObject &content_object)
+ const {
+ // Nothing to do here
+ }
+
+ void processLeavingInterest(
+ const ConsumerSocket & /*c*/,
+ const transport::core::Interest & /*interest*/) const {
+ // Nothing to do here
+ }
+
+ transport::auth::VerificationPolicy onAuthFailed(
+ transport::auth::Suffix /*suffix*/,
+ transport::auth::VerificationPolicy /*policy*/) {
+ saved_stats_.auth_alerts_++;
+ return transport::auth::VerificationPolicy::ACCEPT;
+ }
+
+ void handleTimerExpiration([[maybe_unused]] const ConsumerSocket &c,
+ const TransportStatistics &stats) {
+ const char separator = ' ';
+ const int width = 18;
+
+ utils::SteadyTime::TimePoint t2 = utils::SteadyTime::Clock::now();
+ auto exact_duration =
+ utils::SteadyTime::getDurationMs(saved_stats_.t_stats_, t2);
- if (!connector.lookupValue("remote_address", conn.remote_address)) {
- std::cerr
- << "Error in configuration file: remote_address is a mandatory "
- "field of Connectors."
- << std::endl;
- return false;
+ std::stringstream interval_ms;
+ interval_ms << saved_stats_.total_duration_milliseconds_ << "-"
+ << saved_stats_.total_duration_milliseconds_ +
+ exact_duration.count();
+
+ std::stringstream bytes_transferred;
+ bytes_transferred << std::fixed << std::setprecision(3)
+ << double(stats.getBytesRecv() -
+ saved_stats_.old_bytes_value_) /
+ 1000000.0
+ << std::setfill(separator);
+
+ std::stringstream bandwidth;
+ bandwidth << (double(stats.getBytesRecv() -
+ saved_stats_.old_bytes_value_) *
+ 8) /
+ (exact_duration.count()) / 1000.0
+ << std::setfill(separator);
+
+ std::stringstream window;
+ window << stats.getAverageWindowSize() << std::setfill(separator);
+
+ std::stringstream avg_rtt;
+ avg_rtt << std::setprecision(3) << std::fixed << stats.getAverageRtt()
+ << std::setfill(separator);
+
+ if (configuration_.rtc_) {
+ std::stringstream lost_data;
+ lost_data << stats.getLostData() - saved_stats_.old_lost_data_value_
+ << std::setfill(separator);
+
+ std::stringstream bytes_recovered_data;
+ bytes_recovered_data << stats.getBytesRecoveredData() -
+ saved_stats_.old_bytes_recovered_value_
+ << std::setfill(separator);
+
+ std::stringstream definitely_lost_data;
+ definitely_lost_data << stats.getDefinitelyLostData() -
+ saved_stats_.old_definitely_lost_data_value_
+ << std::setfill(separator);
+
+ std::stringstream data_delay;
+ data_delay << std::fixed << std::setprecision(3)
+ << saved_stats_.avg_data_delay_ << std::setfill(separator);
+
+ std::stringstream received_data_pkt;
+ received_data_pkt << saved_stats_.received_data_pkt_
+ << std::setfill(separator);
+
+ std::stringstream goodput;
+ goodput << std::fixed << std::setprecision(3)
+ << (saved_stats_.received_bytes_ * 8.0) /
+ (exact_duration.count()) / 1000.0
+ << std::setfill(separator);
+
+ std::stringstream loss_rate;
+ loss_rate << std::fixed << std::setprecision(2)
+ << stats.getLossRatio() * 100.0 << std::setfill(separator);
+
+ std::stringstream retx_sent;
+ retx_sent << stats.getRetxCount() - saved_stats_.old_retx_value_
+ << std::setfill(separator);
+
+ std::stringstream interest_sent;
+ interest_sent << stats.getInterestTx() -
+ saved_stats_.old_sent_int_value_
+ << std::setfill(separator);
+
+ std::stringstream nacks;
+ nacks << stats.getReceivedNacks() -
+ saved_stats_.old_received_nacks_value_
+ << std::setfill(separator);
+
+ std::stringstream fec_pkt;
+ fec_pkt << stats.getReceivedFEC() - saved_stats_.old_fec_pkt_
+ << std::setfill(separator);
+
+ std::stringstream queuing_delay;
+ queuing_delay << std::fixed << std::setprecision(3)
+ << stats.getQueuingDelay() << std::setfill(separator);
+
+ std::stringstream residual_losses;
+ double rl_perc = stats.getResidualLossRate() * 100;
+ residual_losses << std::fixed << std::setprecision(2) << rl_perc
+ << std::setfill(separator);
+
+ std::stringstream quality_score;
+ quality_score << std::fixed << (int)stats.getQualityScore()
+ << std::setfill(separator);
+
+ std::stringstream alerts;
+ alerts << stats.getAlerts() << std::setfill(separator);
+
+ std::stringstream auth_alerts;
+ auth_alerts << saved_stats_.auth_alerts_ << std::setfill(separator);
+
+ if ((header_counter_ == 0 && configuration_.print_headers_) || first_) {
+ getOutputStream() << std::right << std::setw(width) << "Interval[ms]";
+ getOutputStream()
+ << std::right << std::setw(width) << "RecvData[pkt]";
+ getOutputStream()
+ << std::right << std::setw(width) << "Bandwidth[Mbps]";
+ getOutputStream()
+ << std::right << std::setw(width) << "Goodput[Mbps]";
+ getOutputStream() << std::right << std::setw(width) << "LossRate[%]";
+ getOutputStream() << std::right << std::setw(width) << "Retr[pkt]";
+ getOutputStream() << std::right << std::setw(width) << "InterestSent";
+ getOutputStream()
+ << std::right << std::setw(width) << "ReceivedNacks";
+ getOutputStream() << std::right << std::setw(width) << "SyncWnd[pkt]";
+ getOutputStream() << std::right << std::setw(width) << "MinRtt[ms]";
+ getOutputStream()
+ << std::right << std::setw(width) << "QueuingDelay[ms]";
+ getOutputStream()
+ << std::right << std::setw(width) << "LostData[pkt]";
+ getOutputStream()
+ << std::right << std::setw(width) << "RecoveredData";
+ getOutputStream()
+ << std::right << std::setw(width) << "DefinitelyLost";
+ getOutputStream() << std::right << std::setw(width) << "State";
+ getOutputStream()
+ << std::right << std::setw(width) << "DataDelay[ms]";
+ getOutputStream() << std::right << std::setw(width) << "FecPkt";
+ getOutputStream() << std::right << std::setw(width) << "Congestion";
+ getOutputStream()
+ << std::right << std::setw(width) << "ResidualLosses";
+ getOutputStream() << std::right << std::setw(width) << "QualityScore";
+ getOutputStream() << std::right << std::setw(width) << "Alerts";
+ getOutputStream()
+ << std::right << std::setw(width) << "AuthAlerts" << std::endl;
+
+ first_ = false;
}
- if (!connector.lookupValue("remote_port", port)) {
- std::cerr << "Error in configuration file: remote_port is a "
- "mandatory field of Connectors."
- << std::endl;
- return false;
+ getOutputStream() << std::right << std::setw(width)
+ << interval_ms.str();
+ getOutputStream() << std::right << std::setw(width)
+ << received_data_pkt.str();
+ getOutputStream() << std::right << std::setw(width) << bandwidth.str();
+ getOutputStream() << std::right << std::setw(width) << goodput.str();
+ getOutputStream() << std::right << std::setw(width) << loss_rate.str();
+ getOutputStream() << std::right << std::setw(width) << retx_sent.str();
+ getOutputStream() << std::right << std::setw(width)
+ << interest_sent.str();
+ getOutputStream() << std::right << std::setw(width) << nacks.str();
+ getOutputStream() << std::right << std::setw(width) << window.str();
+ getOutputStream() << std::right << std::setw(width) << avg_rtt.str();
+ getOutputStream() << std::right << std::setw(width)
+ << queuing_delay.str();
+ getOutputStream() << std::right << std::setw(width) << lost_data.str();
+ getOutputStream() << std::right << std::setw(width)
+ << bytes_recovered_data.str();
+ getOutputStream() << std::right << std::setw(width)
+ << definitely_lost_data.str();
+ getOutputStream() << std::right << std::setw(width)
+ << stats.getCCStatus();
+ getOutputStream() << std::right << std::setw(width) << data_delay.str();
+ getOutputStream() << std::right << std::setw(width) << fec_pkt.str();
+ getOutputStream() << std::right << std::setw(width)
+ << stats.isCongested();
+ getOutputStream() << std::right << std::setw(width)
+ << residual_losses.str();
+ getOutputStream() << std::right << std::setw(width)
+ << quality_score.str();
+ getOutputStream() << std::right << std::setw(width) << alerts.str();
+ getOutputStream() << std::right << std::setw(width) << auth_alerts.str()
+ << std::endl;
+
+ if (configuration_.test_mode_) {
+ if (saved_stats_.data_delays_.size() > 0)
+ saved_stats_.data_delays_.pop_back();
+
+ auto now = utils::SteadyTime::nowMs();
+ getOutputStream() << std::fixed << std::setprecision(0) << now.count()
+ << " DATA-DELAYS:[" << saved_stats_.data_delays_
+ << "]" << std::endl;
}
-
- if (!connector.lookupValue("interface", conn.interface)) {
- std::cerr << "Error in configuration file: interface is a "
- "mandatory field of Connectors."
- << std::endl;
- return false;
+ } else {
+ if ((header_counter_ == 0 && configuration_.print_headers_) || first_) {
+ getOutputStream() << std::right << std::setw(width) << "Interval[ms]";
+ getOutputStream() << std::right << std::setw(width) << "Transfer[MB]";
+ getOutputStream()
+ << std::right << std::setw(width) << "Bandwidth[Mbps]";
+ getOutputStream() << std::right << std::setw(width) << "Retr[pkt]";
+ getOutputStream() << std::right << std::setw(width) << "Cwnd[Int]";
+ getOutputStream()
+ << std::right << std::setw(width) << "AvgRtt[ms]" << std::endl;
+
+ first_ = false;
}
- conn.remote_port = (uint16_t)(port);
+ getOutputStream() << std::right << std::setw(width)
+ << interval_ms.str();
+ getOutputStream() << std::right << std::setw(width)
+ << bytes_transferred.str();
+ getOutputStream() << std::right << std::setw(width) << bandwidth.str();
+ getOutputStream() << std::right << std::setw(width)
+ << stats.getRetxCount();
+ getOutputStream() << std::right << std::setw(width) << window.str();
+ getOutputStream() << std::right << std::setw(width) << avg_rtt.str()
+ << std::endl;
+ }
- std::cout << "Adding connector " << conn.name << ", ("
- << conn.local_address << ":" << conn.local_port << " "
- << conn.remote_address << ":" << conn.remote_port << ")"
- << std::endl;
- config_.addConnector(conn.name, std::move(conn));
+ saved_stats_.total_duration_milliseconds_ +=
+ (uint32_t)exact_duration.count();
+ saved_stats_.old_bytes_value_ = stats.getBytesRecv();
+ saved_stats_.old_lost_data_value_ = stats.getLostData();
+ saved_stats_.old_bytes_recovered_value_ = stats.getBytesRecoveredData();
+ saved_stats_.old_definitely_lost_data_value_ =
+ stats.getDefinitelyLostData();
+ saved_stats_.old_fec_interest_tx_value_ = stats.getInterestFecTxCount();
+ saved_stats_.old_fec_data_rx_value_ = stats.getBytesFecRecv();
+ saved_stats_.old_retx_value_ = stats.getRetxCount();
+ saved_stats_.old_sent_int_value_ = stats.getInterestTx();
+ saved_stats_.old_received_nacks_value_ = stats.getReceivedNacks();
+ saved_stats_.old_fec_pkt_ = stats.getReceivedFEC();
+ saved_stats_.delay_sample_ = 0;
+ saved_stats_.avg_data_delay_ = 0;
+ saved_stats_.received_bytes_ = 0;
+ saved_stats_.received_data_pkt_ = 0;
+ saved_stats_.data_delays_ = "";
+ saved_stats_.t_stats_ = utils::SteadyTime::Clock::now();
+
+ header_counter_ = (header_counter_ + 1) & kheader_counter_mask();
+
+ if (--nb_iterations_ == 0) {
+ // We reached the maximum nb of runs. Stop now.
+ parent_.io_service_.stop();
}
}
- // Routes
- if (config.exists("routes")) {
- const Setting &routes = config.lookup("routes");
- auto count = routes.getLength();
+ /***************************************************************
+ * Setup functions
+ ***************************************************************/
- for (int i = 0; i < count; i++) {
- const Setting &route = routes[i];
- RouteConfig r;
- unsigned weight;
+ int setupRTCSocket() {
+ int ret = ERROR_SUCCESS;
- r.name = route.getName();
- route.lookupValue("prefix", r.prefix);
- route.lookupValue("weight", weight);
- route.lookupValue("main_connector", r.main_connector);
- route.lookupValue("backup_connector", r.backup_connector);
- r.weight = (uint16_t)(weight);
+ configuration_.transport_protocol_ = transport::interface::RTC;
- std::cout << "Adding route " << r.name << " " << r.prefix << " ("
- << r.main_connector << " " << r.backup_connector << " "
- << r.weight << ")" << std::endl;
- config_.addRoute(std::move(r));
+ if (configuration_.relay_ && configuration_.parallel_flows_ == 1) {
+ int production_protocol = ProductionProtocolAlgorithms::RTC_PROD;
+ producer_socket_ =
+ std::make_unique<ProducerSocket>(production_protocol);
+ producer_socket_->registerPrefix(configuration_.relay_name_);
+ producer_socket_->connect();
+ producer_socket_->start();
}
- }
- std::cout << "Ok" << std::endl;
-
- return true;
- }
-
- bool splitRoute(std::string route, std::string &prefix,
- uint8_t &prefix_length) {
- std::string delimiter = "/";
-
- size_t pos = 0;
- if ((pos = route.find(delimiter)) != std::string::npos) {
- prefix = route.substr(0, pos);
- route.erase(0, pos + delimiter.length());
- } else {
- return false;
- }
+ if (configuration_.output_stream_mode_ &&
+ configuration_.parallel_flows_ == 1) {
+ remote_ = asio::ip::udp::endpoint(
+ asio::ip::address::from_string("127.0.0.1"), configuration_.port_);
+ socket_.open(asio::ip::udp::v4());
+ }
- prefix_length = std::stoul(route.substr(0));
- return true;
- }
+ consumer_socket_ =
+ std::make_unique<ConsumerSocket>(configuration_.transport_protocol_);
-#ifdef FORWARDER_INTERFACE
- void onHicnServiceReady() override {
- std::cout << "Successfully connected to local forwarder!" << std::endl;
+ RtcTransportRecoveryStrategies recovery_strategy =
+ RtcTransportRecoveryStrategies::RTX_ONLY;
+ switch (configuration_.recovery_strategy_) {
+ case 1:
+ recovery_strategy = RtcTransportRecoveryStrategies::RECOVERY_OFF;
+ break;
+ case 2:
+ recovery_strategy = RtcTransportRecoveryStrategies::RTX_ONLY;
+ break;
+ case 3:
+ recovery_strategy = RtcTransportRecoveryStrategies::FEC_ONLY;
+ break;
+ case 4:
+ recovery_strategy = RtcTransportRecoveryStrategies::DELAY_BASED;
+ break;
+ case 5:
+ recovery_strategy = RtcTransportRecoveryStrategies::LOW_RATE;
+ break;
+ case 6:
+ recovery_strategy =
+ RtcTransportRecoveryStrategies::LOW_RATE_AND_BESTPATH;
+ break;
+ case 7:
+ recovery_strategy =
+ RtcTransportRecoveryStrategies::LOW_RATE_AND_REPLICATION;
+ break;
+ case 8:
+ recovery_strategy =
+ RtcTransportRecoveryStrategies::LOW_RATE_AND_ALL_FWD_STRATEGIES;
+ break;
+ case 9:
+ recovery_strategy =
+ RtcTransportRecoveryStrategies::FEC_ONLY_LOW_RES_LOSSES;
+ break;
+ case 10:
+ recovery_strategy =
+ RtcTransportRecoveryStrategies::DELAY_AND_BESTPATH;
+ break;
+ case 11:
+ recovery_strategy =
+ RtcTransportRecoveryStrategies::DELAY_AND_REPLICATION;
+ break;
+ default:
+ break;
+ }
- std::cout << "Setting up listeners" << std::endl;
- const char *config = getenv("FORWARDER_CONFIG");
+ ret = consumer_socket_->setSocketOption(
+ RtcTransportOptions::RECOVERY_STRATEGY,
+ static_cast<uint32_t>(recovery_strategy));
- if (config) {
- if (!parseConfig(config)) {
- return;
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
}
- // Create faces and route using first face in the list.
- auto &routes = config_.getRoutes();
- auto &connectors = config_.getConnectors();
+ ret = consumer_socket_->setSocketOption(
+ RtcTransportOptions::AGGREGATED_DATA,
+ configuration_.aggregated_data_);
- if (routes.size() == 0 || connectors.size() == 0) {
- std::cerr << "Nothing to configure" << std::endl;
- return;
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
}
- for (auto &route : routes) {
- auto the_connector_it = connectors.find(route.main_connector);
- if (the_connector_it == connectors.end()) {
- std::cerr << "No valid main connector found for route " << route.name
- << std::endl;
- continue;
- }
+ ret = consumer_socket_->setSocketOption(
+ RtcTransportOptions::CONTENT_SHARING_MODE,
+ configuration_.content_sharing_mode_);
- auto &the_connector = the_connector_it->second;
- auto route_info = std::make_shared<ForwarderInterface::RouteInfo>();
- route_info->family = AF_INET;
- route_info->local_addr = the_connector.local_address;
- route_info->local_port = the_connector.local_port;
- route_info->remote_addr = the_connector.remote_address;
- route_info->remote_port = the_connector.remote_port;
- route_info->interface = the_connector.interface;
- route_info->name = the_connector.name;
-
- std::string prefix;
- uint8_t prefix_length;
- auto ret = splitRoute(route.prefix, prefix, prefix_length);
-
- if (!ret) {
- std::cerr << "Error parsing route" << std::endl;
- return;
- }
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
- route_info->route_addr = prefix;
- route_info->route_len = prefix_length;
-
- main_routes_.emplace_back(route_info);
-
- if (!route.backup_connector.empty()) {
- // Add also the backup route
- auto the_backup_connector_it =
- connectors.find(route.backup_connector);
- if (the_backup_connector_it == connectors.end()) {
- std::cerr << "No valid backup connector found for route "
- << route.name << std::endl;
- continue;
- }
-
- auto &the_backup_connector = the_backup_connector_it->second;
- auto backup_route_info =
- std::make_shared<ForwarderInterface::RouteInfo>();
- backup_route_info->family = AF_INET;
- backup_route_info->local_addr = the_backup_connector.local_address;
- backup_route_info->local_port = the_backup_connector.local_port;
- backup_route_info->remote_addr = the_backup_connector.remote_address;
- backup_route_info->remote_port = the_backup_connector.remote_port;
- backup_route_info->interface = the_backup_connector.interface;
- backup_route_info->name = the_backup_connector.name;
-
- std::string prefix;
- uint8_t prefix_length;
- auto ret = splitRoute(route.prefix, prefix, prefix_length);
-
- if (!ret) {
- std::cerr << "Error parsing route" << std::endl;
- return;
- }
-
- backup_route_info->route_addr = prefix;
- backup_route_info->route_len = prefix_length;
-
- backup_routes_.emplace_back(backup_route_info);
- }
+ ret = consumer_socket_->setSocketOption(
+ ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT,
+ (transport::interface::ConsumerContentObjectCallback)std::bind(
+ &Impl::ConsumerContext::checkReceivedRtcContent, this,
+ std::placeholders::_1, std::placeholders::_2));
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
}
- // Create main routes
- std::cout << "Creating main routes" << std::endl;
- forwarder_interface_.createFaceAndRoutes(main_routes_);
- }
- }
+ std::shared_ptr<TransportStatistics> transport_stats;
+ ret = consumer_socket_->getSocketOption(
+ transport::interface::OtherOptions::STATISTICS,
+ (TransportStatistics **)&transport_stats);
+ transport_stats->setAlpha(0.0);
- void onRouteConfigured(
- std::vector<ForwarderInterface::RouteInfoPtr> &route_info) override {
- std::cout << "Routes successfully configured!" << std::endl;
- }
-#endif
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
- int setup() {
- int ret;
-
- if (configuration_.rtc_) {
- configuration_.transport_protocol_ = RTC;
- } else if (configuration_.window < 0) {
- configuration_.transport_protocol_ = RAAQM;
- } else {
- configuration_.transport_protocol_ = CBR;
+ return ERROR_SUCCESS;
}
- if (configuration_.relay_ && configuration_.rtc_) {
- int production_protocol = ProductionProtocolAlgorithms::RTC_PROD;
- producer_socket_ = std::make_unique<ProducerSocket>(production_protocol);
- producer_socket_->registerPrefix(configuration_.relay_name_);
- producer_socket_->connect();
- }
+ int setupRAAQMSocket() {
+ int ret = ERROR_SUCCESS;
- if (configuration_.output_stream_mode_ && configuration_.rtc_) {
- remote_ = asio::ip::udp::endpoint(
- asio::ip::address::from_string("127.0.0.1"), configuration_.port_);
- socket_.open(asio::ip::udp::v4());
- }
+ configuration_.transport_protocol_ = transport::interface::RAAQM;
- if (configuration_.secure_) {
- consumer_socket_ = std::make_unique<P2PSecureConsumerSocket>(
- RAAQM, configuration_.transport_protocol_);
- if (configuration_.producer_prefix_.getPrefixLength() == 0) {
- std::cerr << "ERROR -- Missing producer prefix on which perform the "
- "handshake."
- << std::endl;
- } else {
- P2PSecureConsumerSocket &secure_consumer_socket =
- *(static_cast<P2PSecureConsumerSocket *>(consumer_socket_.get()));
- secure_consumer_socket.registerPrefix(configuration_.producer_prefix_);
- }
- } else {
consumer_socket_ =
std::make_unique<ConsumerSocket>(configuration_.transport_protocol_);
- }
- consumer_socket_->setSocketOption(
- GeneralTransportOptions::INTEREST_LIFETIME,
- configuration_.interest_lifetime_);
-
-#if defined(DEBUG) && defined(__linux__)
- std::shared_ptr<transport::BasePortal> portal;
- consumer_socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal);
- signals_ =
- std::make_unique<asio::signal_set>(portal->getIoService(), SIGUSR1);
- signals_->async_wait([this](const std::error_code &, const int &) {
- std::cout << "Signal SIGUSR1!" << std::endl;
- mtrace();
- });
-#endif
-
- if (consumer_socket_->setSocketOption(CURRENT_WINDOW_SIZE,
- configuration_.window) ==
- SOCKET_OPTION_NOT_SET) {
- std::cerr << "ERROR -- Impossible to set the size of the window."
- << std::endl;
- return ERROR_SETUP;
- }
-
- if (configuration_.transport_protocol_ == RAAQM &&
- configuration_.beta != -1.f) {
- if (consumer_socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE,
- configuration_.beta) ==
- SOCKET_OPTION_NOT_SET) {
- return ERROR_SETUP;
+ if (configuration_.beta_ != -1.f) {
+ ret = consumer_socket_->setSocketOption(
+ RaaqmTransportOptions::BETA_VALUE, configuration_.beta_);
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
}
- }
- if (configuration_.transport_protocol_ == RAAQM &&
- configuration_.drop_factor != -1.f) {
- if (consumer_socket_->setSocketOption(RaaqmTransportOptions::DROP_FACTOR,
- configuration_.drop_factor) ==
- SOCKET_OPTION_NOT_SET) {
- return ERROR_SETUP;
+ if (configuration_.drop_factor_ != -1.f) {
+ ret = consumer_socket_->setSocketOption(
+ RaaqmTransportOptions::DROP_FACTOR, configuration_.drop_factor_);
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
}
- }
- if (!configuration_.producer_certificate.empty()) {
- std::shared_ptr<Verifier> verifier = std::make_shared<AsymmetricVerifier>(
- configuration_.producer_certificate);
- if (consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER,
- verifier) == SOCKET_OPTION_NOT_SET)
- return ERROR_SETUP;
+ return ERROR_SUCCESS;
}
- if (!configuration_.passphrase.empty()) {
- std::shared_ptr<Verifier> verifier =
- std::make_shared<SymmetricVerifier>(configuration_.passphrase);
- if (consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER,
- verifier) == SOCKET_OPTION_NOT_SET)
- return ERROR_SETUP;
- }
+ int setupCBRSocket() {
+ configuration_.transport_protocol_ = transport::interface::CBR;
- ret = consumer_socket_->setSocketOption(
- ConsumerCallbacksOptions::INTEREST_OUTPUT,
- (ConsumerInterestCallback)std::bind(&Impl::processLeavingInterest, this,
- std::placeholders::_1,
- std::placeholders::_2));
+ consumer_socket_ =
+ std::make_unique<ConsumerSocket>(configuration_.transport_protocol_);
- if (ret == SOCKET_OPTION_NOT_SET) {
- return ERROR_SETUP;
+ return ERROR_SUCCESS;
}
- if (!configuration_.rtc_) {
- ret = consumer_socket_->setSocketOption(
- ConsumerCallbacksOptions::READ_CALLBACK, &callback_);
- } else {
- ret = consumer_socket_->setSocketOption(
- ConsumerCallbacksOptions::READ_CALLBACK, &rtc_callback_);
- }
+ public:
+ int setup() {
+ int ret;
+ std::shared_ptr<transport::auth::Verifier> verifier =
+ std::make_shared<transport::auth::VoidVerifier>();
+
+ if (configuration_.rtc_) {
+ ret = setupRTCSocket();
+ } else if (configuration_.window_ < 0) {
+ ret = setupRAAQMSocket();
+ } else {
+ ret = setupCBRSocket();
+ }
- if (ret == SOCKET_OPTION_NOT_SET) {
- return ERROR_SETUP;
- }
+ if (ret != ERROR_SUCCESS) {
+ return ret;
+ }
- if (configuration_.rtc_) {
ret = consumer_socket_->setSocketOption(
- ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT,
- (ConsumerContentObjectCallback)std::bind(
- &Impl::checkReceivedRtcContent, this, std::placeholders::_1,
- std::placeholders::_2));
+ GeneralTransportOptions::INTEREST_LIFETIME,
+ configuration_.interest_lifetime_);
if (ret == SOCKET_OPTION_NOT_SET) {
return ERROR_SETUP;
}
- }
-
- if (configuration_.rtc_) {
- std::shared_ptr<TransportStatistics> transport_stats;
- consumer_socket_->getSocketOption(
- OtherOptions::STATISTICS, (TransportStatistics **)&transport_stats);
- transport_stats->setAlpha(0.0);
- }
- ret = consumer_socket_->setSocketOption(
- ConsumerCallbacksOptions::STATS_SUMMARY,
- (ConsumerTimerCallback)std::bind(&Impl::handleTimerExpiration, this,
- std::placeholders::_1,
- std::placeholders::_2));
+ ret = consumer_socket_->setSocketOption(
+ GeneralTransportOptions::MANIFEST_FACTOR_RELEVANT,
+ configuration_.manifest_factor_relevant_);
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
- if (ret == SOCKET_OPTION_NOT_SET) {
- return ERROR_SETUP;
- }
+ ret = consumer_socket_->setSocketOption(
+ GeneralTransportOptions::MANIFEST_FACTOR_ALERT,
+ configuration_.manifest_factor_alert_);
- if (consumer_socket_->setSocketOption(
- GeneralTransportOptions::STATS_INTERVAL,
- configuration_.report_interval_milliseconds_) ==
- SOCKET_OPTION_NOT_SET) {
- return ERROR_SETUP;
- }
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
- consumer_socket_->connect();
+ ret = consumer_socket_->setSocketOption(
+ GeneralTransportOptions::PACKET_FORMAT,
+ configuration_.packet_format_);
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ getOutputStream() << "ERROR -- Impossible to set the packet format."
+ << std::endl;
+ return ERROR_SETUP;
+ }
- return ERROR_SUCCESS;
- }
+ ret = consumer_socket_->setSocketOption(
+ ConsumerCallbacksOptions::FWD_STRATEGY_CHANGE,
+ (StrategyCallback)[](
+ [[maybe_unused]] transport::interface::notification::Strategy
+ strategy){
+ // nothing to do
+ });
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
- int run() {
- std::cout << "Starting download of " << configuration_.name << std::endl;
+ ret = consumer_socket_->setSocketOption(
+ ConsumerCallbacksOptions::REC_STRATEGY_CHANGE,
+ (StrategyCallback)[](
+ [[maybe_unused]] transport::interface::notification::Strategy
+ strategy){
+ // nothing to do
+ });
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
- signals_.add(SIGINT);
- signals_.async_wait(
- [this](const std::error_code &, const int &) { io_service_.stop(); });
+ ret = consumer_socket_->setSocketOption(
+ transport::interface::CURRENT_WINDOW_SIZE, configuration_.window_);
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ getOutputStream()
+ << "ERROR -- Impossible to set the size of the window."
+ << std::endl;
+ return ERROR_SETUP;
+ }
- t_download_ = t_stats_ = std::chrono::steady_clock::now();
- consumer_socket_->asyncConsume(configuration_.name);
- io_service_.run();
+ if (!configuration_.producer_certificate_.empty()) {
+ verifier = std::make_shared<transport::auth::AsymmetricVerifier>(
+ configuration_.producer_certificate_);
+ }
- consumer_socket_->stop();
+ if (!configuration_.passphrase_.empty()) {
+ verifier = std::make_shared<transport::auth::SymmetricVerifier>(
+ configuration_.passphrase_);
+ }
- return ERROR_SUCCESS;
- }
+ verifier->setVerificationFailedCallback(
+ std::bind(&HIperfClient::Impl::ConsumerContext::onAuthFailed, this,
+ std::placeholders::_1, std::placeholders::_2));
- private:
- class RTCCallback : public ConsumerSocket::ReadCallback {
- static constexpr std::size_t mtu = 1500;
+ ret = consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER,
+ verifier);
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
- public:
- RTCCallback(Impl &hiperf_client) : client_(hiperf_client) {
- client_.configuration_.receive_buffer = utils::MemBuf::create(mtu);
- }
+ // Signer for aggregatd interests
+ std::shared_ptr<transport::auth::Signer> signer =
+ std::make_shared<transport::auth::VoidSigner>();
+ if (!configuration_.aggr_interest_passphrase_.empty()) {
+ signer = std::make_shared<transport::auth::SymmetricSigner>(
+ transport::auth::CryptoSuite::HMAC_SHA256,
+ configuration_.aggr_interest_passphrase_);
+ }
+ ret = consumer_socket_->setSocketOption(GeneralTransportOptions::SIGNER,
+ signer);
+ if (ret == SOCKET_OPTION_NOT_SET) return ERROR_SETUP;
- bool isBufferMovable() noexcept override { return false; }
+ if (configuration_.aggregated_interests_) {
+ ret = consumer_socket_->setSocketOption(
+ RtcTransportOptions::AGGREGATED_INTERESTS, true);
- void getReadBuffer(uint8_t **application_buffer,
- size_t *max_length) override {
- *application_buffer =
- client_.configuration_.receive_buffer->writableData();
- *max_length = mtu;
- }
+ if (ret == SOCKET_OPTION_NOT_SET) return ERROR_SETUP;
+ }
- void readDataAvailable(std::size_t length) noexcept override {
- client_.received_bytes_ += length;
- client_.received_data_pkt_++;
+ ret = consumer_socket_->setSocketOption(
+ ConsumerCallbacksOptions::INTEREST_OUTPUT,
+ (transport::interface::ConsumerInterestCallback)std::bind(
+ &ConsumerContext::processLeavingInterest, this,
+ std::placeholders::_1, std::placeholders::_2));
- // collecting delay stats. Just for performance testing
- uint64_t *senderTimeStamp =
- (uint64_t *)client_.configuration_.receive_buffer->writableData();
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::system_clock::now().time_since_epoch())
- .count();
- double new_delay = (double)(now - *senderTimeStamp);
+ ret = consumer_socket_->setSocketOption(
+ ConsumerCallbacksOptions::READ_CALLBACK, this);
- if (*senderTimeStamp > now)
- new_delay = -1 * (double)(*senderTimeStamp - now);
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
- client_.delay_sample_++;
- client_.avg_data_delay_ =
- client_.avg_data_delay_ +
- (new_delay - client_.avg_data_delay_) / client_.delay_sample_;
+ ret = consumer_socket_->setSocketOption(
+ ConsumerCallbacksOptions::STATS_SUMMARY,
+ (transport::interface::ConsumerTimerCallback)std::bind(
+ &Impl::ConsumerContext::handleTimerExpiration, this,
+ std::placeholders::_1, std::placeholders::_2));
- if (client_.configuration_.test_mode_) {
- client_.data_delays_ += std::to_string(int(new_delay));
- client_.data_delays_ += ",";
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
}
- if (client_.configuration_.relay_) {
- client_.producer_socket_->produceDatagram(
- client_.configuration_.relay_name_.getName(),
- client_.configuration_.receive_buffer->writableData(),
- length < 1400 ? length : 1400);
- }
- if (client_.configuration_.output_stream_mode_) {
- uint8_t *start =
- (uint8_t *)client_.configuration_.receive_buffer->writableData();
- start += sizeof(uint64_t);
- std::size_t pkt_len = length - sizeof(uint64_t);
- client_.socket_.send_to(asio::buffer(start, pkt_len), client_.remote_);
+ if (consumer_socket_->setSocketOption(
+ GeneralTransportOptions::STATS_INTERVAL,
+ configuration_.report_interval_milliseconds_) ==
+ SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
}
- }
- size_t maxBufferSize() const override { return mtu; }
+ consumer_socket_->connect();
- void readError(const std::error_code ec) noexcept override {
- std::cerr << "Error while reading from RTC socket" << std::endl;
- client_.io_service_.stop();
+ return ERROR_SUCCESS;
}
- void readSuccess(std::size_t total_size) noexcept override {
- std::cout << "Data successfully read" << std::endl;
- }
+ /***************************************************************
+ * Run functions
+ ***************************************************************/
- private:
- Impl &client_;
- };
+ int run() {
+ getOutputStream() << "Starting download of " << flow_name_ << std::endl;
- class Callback : public ConsumerSocket::ReadCallback {
- public:
- Callback(Impl &hiperf_client) : client_(hiperf_client) {
- client_.configuration_.receive_buffer =
- utils::MemBuf::create(client_.configuration_.receive_buffer_size_);
+ saved_stats_.t_download_ = saved_stats_.t_stats_ =
+ utils::SteadyTime::now();
+ consumer_socket_->consume(flow_name_);
+
+ return ERROR_SUCCESS;
}
- bool isBufferMovable() noexcept override { return false; }
+ // Members initialized by the constructor
+ std::shared_ptr<utils::MemBuf> receive_buffer_;
+ asio::ip::udp::socket socket_;
+ std::size_t payload_size_max_;
+ asio::ip::udp::endpoint remote_;
+ std::uint32_t nb_iterations_;
- void getReadBuffer(uint8_t **application_buffer,
- size_t *max_length) override {
- *application_buffer =
- client_.configuration_.receive_buffer->writableData();
- *max_length = client_.configuration_.receive_buffer_size_;
- }
+ // Members initialized by in-class initializer
+ SavedStatistics saved_stats_{};
+ uint16_t header_counter_{0};
+ bool first_{true};
+ std::unique_ptr<ConsumerSocket> consumer_socket_;
+ std::unique_ptr<ProducerSocket> producer_socket_;
+ };
- void readDataAvailable(std::size_t length) noexcept override {}
+ public:
+ explicit Impl(const hiperf::ClientConfiguration &conf)
+ : config_(conf), signals_(io_service_) {}
- void readBufferAvailable(
- std::unique_ptr<utils::MemBuf> &&buffer) noexcept override {}
+ virtual ~Impl() = default;
- size_t maxBufferSize() const override {
- return client_.configuration_.receive_buffer_size_;
+ int setup() {
+ int ret = ensureFlows(config_.name_, config_.parallel_flows_);
+ if (ret != ERROR_SUCCESS) {
+ return ret;
}
- void readError(const std::error_code ec) noexcept override {
- std::cerr << "Error " << ec.message() << " while reading from socket"
- << std::endl;
- client_.io_service_.stop();
- }
+ consumer_contexts_.reserve(config_.parallel_flows_);
+ for (uint32_t i = 0; i < config_.parallel_flows_; i++) {
+ auto &ctx = consumer_contexts_.emplace_back(*this, i);
+ ret = ctx.setup();
- void readSuccess(std::size_t total_size) noexcept override {
- Time t2 = std::chrono::steady_clock::now();
- TimeDuration dt =
- std::chrono::duration_cast<TimeDuration>(t2 - client_.t_download_);
- long usec = (long)dt.count();
+ if (ret) {
+ break;
+ }
+ }
- std::cout << "Content retrieved. Size: " << total_size << " [Bytes]"
- << std::endl;
+ return ret;
+ }
- std::cerr << "Elapsed Time: " << usec / 1000000.0 << " seconds -- "
- << (total_size * 8) * 1.0 / usec * 1.0 << " [Mbps]"
- << std::endl;
+ int run() {
+ signals_.add(SIGINT);
+ signals_.async_wait(
+ [this](const std::error_code &, const int &) { io_service_.stop(); });
- client_.io_service_.stop();
+ for (auto &consumer_context : consumer_contexts_) {
+ consumer_context.run();
}
- private:
- Impl &client_;
- };
+ io_service_.run();
+
+ return ERROR_SUCCESS;
+ }
- hiperf::ClientConfiguration configuration_;
- Time t_stats_;
- Time t_download_;
- uint32_t total_duration_milliseconds_;
- uint64_t old_bytes_value_;
- uint64_t old_interest_tx_value_;
- uint64_t old_fec_interest_tx_value_;
- uint64_t old_fec_data_rx_value_;
- uint64_t old_lost_data_value_;
- uint64_t old_bytes_recovered_value_;
- uint64_t old_definitely_lost_data_value_;
- uint32_t old_retx_value_;
- uint32_t old_sent_int_value_;
- uint32_t old_received_nacks_value_;
- uint32_t old_fec_pkt_;
-
- // IMPORTANT: to be used only for performance testing, when consumer and
- // producer are synchronized. Used for rtc only at the moment
- double avg_data_delay_;
- uint32_t delay_sample_;
-
- uint32_t received_bytes_;
- uint32_t received_data_pkt_;
-
- std::string data_delays_;
+ ClientConfiguration &getConfig() { return config_; }
+ private:
asio::io_service io_service_;
+ hiperf::ClientConfiguration config_;
asio::signal_set signals_;
- RTCCallback rtc_callback_;
- Callback callback_;
- std::unique_ptr<ConsumerSocket> consumer_socket_;
- std::unique_ptr<ProducerSocket> producer_socket_;
- asio::ip::udp::socket socket_;
- asio::ip::udp::endpoint remote_;
-
- ForwarderConfiguration config_;
- uint16_t switch_threshold_; /* ms */
- bool done_;
- std::vector<ForwarderInterface::RouteInfoPtr> main_routes_;
- std::vector<ForwarderInterface::RouteInfoPtr> backup_routes_;
-#ifdef FORWARDER_INTERFACE
- ForwarderInterface forwarder_interface_;
-#endif
+ std::vector<ConsumerContext> consumer_contexts_;
};
-HIperfClient::HIperfClient(const ClientConfiguration &conf) {
- impl_ = new Impl(conf);
-}
+HIperfClient::HIperfClient(const ClientConfiguration &conf)
+ : impl_(std::make_unique<Impl>(conf)) {}
-HIperfClient::~HIperfClient() { delete impl_; }
+HIperfClient::~HIperfClient() = default;
-int HIperfClient::setup() { return impl_->setup(); }
+int HIperfClient::setup() const { return impl_->setup(); }
-void HIperfClient::run() { impl_->run(); }
+void HIperfClient::run() const { impl_->run(); }
} // namespace hiperf