aboutsummaryrefslogtreecommitdiffstats
path: root/apps/hiperf
diff options
context:
space:
mode:
Diffstat (limited to 'apps/hiperf')
-rw-r--r--apps/hiperf/CMakeLists.txt45
-rw-r--r--apps/hiperf/src/client.cc1448
-rw-r--r--apps/hiperf/src/client.h14
-rw-r--r--apps/hiperf/src/common.h315
-rw-r--r--apps/hiperf/src/forwarder_config.h97
-rw-r--r--apps/hiperf/src/forwarder_interface.cc676
-rw-r--r--apps/hiperf/src/forwarder_interface.h131
-rw-r--r--apps/hiperf/src/main.cc435
-rw-r--r--apps/hiperf/src/server.cc802
-rw-r--r--apps/hiperf/src/server.h5
10 files changed, 1711 insertions, 2257 deletions
diff --git a/apps/hiperf/CMakeLists.txt b/apps/hiperf/CMakeLists.txt
index 564525e67..5a0dc3c06 100644
--- a/apps/hiperf/CMakeLists.txt
+++ b/apps/hiperf/CMakeLists.txt
@@ -1,4 +1,4 @@
-# Copyright (c) 2017-2019 Cisco and/or its affiliates.
+# Copyright (c) 2021-2022 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
@@ -12,33 +12,50 @@
# limitations under the License.
if (NOT DISABLE_EXECUTABLES)
+##############################################################
+# Source files
+##############################################################
list(APPEND HIPERF_SRC
${CMAKE_CURRENT_SOURCE_DIR}/src/client.cc
${CMAKE_CURRENT_SOURCE_DIR}/src/main.cc
${CMAKE_CURRENT_SOURCE_DIR}/src/server.cc
- ${CMAKE_CURRENT_SOURCE_DIR}/src/forwarder_interface.cc
)
+
+##############################################################
+# Libraries
+##############################################################
list (APPEND HIPERF_LIBRARIES
- ${LIBTRANSPORT_LIBRARIES}
- ${LIBHICNCTRL_LIBRARIES}
- ${LIBHICN_LIBRARIES}
- ${CMAKE_THREAD_LIBS_INIT}
- ${LIBCONFIG_CPP_LIBRARIES}
- ${WSOCK32_LIBRARY}
- ${WS2_32_LIBRARY}
+ PRIVATE ${LIBTRANSPORT_LIBRARIES}
+ PRIVATE ${LIBHICNCTRL_LIBRARIES}
+ PRIVATE ${LIBHICN_LIBRARIES}
+ PRIVATE ${CMAKE_THREAD_LIBS_INIT}
+ PRIVATE ${LIBCONFIG_CPP_LIBRARIES}
+ PRIVATE ${WSOCK32_LIBRARY}
+ PRIVATE ${WS2_32_LIBRARY}
+ )
+
+##############################################################
+# Compiler options
+##############################################################
+ set(COMPILER_OPTIONS
+ ${DEFAULT_COMPILER_OPTIONS}
)
+
+##############################################################
+# Build hiperf
+##############################################################
build_executable(hiperf
SOURCES ${HIPERF_SRC}
LINK_LIBRARIES ${HIPERF_LIBRARIES}
INCLUDE_DIRS
- ${CMAKE_CURRENT_SOURCE_DIR}/src
- ${LIBTRANSPORT_INCLUDE_DIRS}
- ${LIBHICNCTRL_INCLUDE_DIRS}
- ${LIBCONFIG_CPP_INCLUDE_DIRS}
- DEPENDS ${DEPENDENCIES}
+ PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/src
+ PRIVATE ${LIBCONFIG_CPP_INCLUDE_DIRS}
+ PRIVATE ${THIRD_PARTY_INCLUDE_DIRS} ${COMMON_INCLUDE_DIRS}
+ DEPENDS ${DEPENDENCIES} ${THIRD_PARTY_DEPENDENCIES}
COMPONENT ${HICN_APPS}
LINK_FLAGS ${LINK_FLAGS}
+ COMPILE_OPTIONS ${COMPILER_OPTIONS}
)
endif() \ No newline at end of file
diff --git a/apps/hiperf/src/client.cc b/apps/hiperf/src/client.cc
index 820ebf0ce..1ce5b4c55 100644
--- a/apps/hiperf/src/client.cc
+++ b/apps/hiperf/src/client.cc
@@ -14,8 +14,7 @@
*/
#include <client.h>
-#include <forwarder_config.h>
-#include <forwarder_interface.h>
+#include <hicn/transport/portability/endianess.h>
#include <libconfig.h++>
@@ -27,874 +26,857 @@ namespace hiperf {
class RTCCallback;
class Callback;
+using transport::auth::CryptoHashType;
+using transport::core::Packet;
+using transport::core::Prefix;
+using transport::interface::ConsumerCallbacksOptions;
+using transport::interface::ConsumerSocket;
+using transport::interface::GeneralTransportOptions;
+using transport::interface::ProducerSocket;
+using transport::interface::ProductionProtocolAlgorithms;
+using transport::interface::RaaqmTransportOptions;
+using transport::interface::RtcTransportOptions;
+using transport::interface::RtcTransportRecoveryStrategies;
+using transport::interface::StrategyCallback;
+using transport::interface::TransportStatistics;
+
/**
* Hiperf client class: configure and setup an hicn consumer following the
* ClientConfiguration.
*/
-class HIperfClient::Impl
-#ifdef FORWARDER_INTERFACE
- : ForwarderInterface::ICallback
-#endif
-{
- typedef std::chrono::time_point<std::chrono::steady_clock> Time;
- typedef std::chrono::microseconds TimeDuration;
-
+class HIperfClient::Impl {
friend class Callback;
friend class RTCCallback;
- struct nack_packet_t {
- uint64_t timestamp;
- uint32_t prod_rate;
- uint32_t prod_seg;
-
- inline uint64_t getTimestamp() const { return _ntohll(&timestamp); }
- inline void setTimestamp(uint64_t time) { timestamp = _htonll(&time); }
-
- inline uint32_t getProductionRate() const { return ntohl(prod_rate); }
- inline void setProductionRate(uint32_t rate) { prod_rate = htonl(rate); }
-
- inline uint32_t getProductionSegement() const { return ntohl(prod_seg); }
- inline void setProductionSegement(uint32_t seg) { prod_seg = htonl(seg); }
- };
-
- public:
- Impl(const hiperf::ClientConfiguration &conf)
- : configuration_(conf),
- total_duration_milliseconds_(0),
- old_bytes_value_(0),
- old_interest_tx_value_(0),
- old_fec_interest_tx_value_(0),
- old_fec_data_rx_value_(0),
- old_lost_data_value_(0),
- old_bytes_recovered_value_(0),
- old_definitely_lost_data_value_(0),
- old_retx_value_(0),
- old_sent_int_value_(0),
- old_received_nacks_value_(0),
- old_fec_pkt_(0),
- avg_data_delay_(0),
- delay_sample_(0),
- received_bytes_(0),
- received_data_pkt_(0),
- data_delays_(""),
- signals_(io_service_),
- rtc_callback_(*this),
- callback_(*this),
- socket_(io_service_),
- done_(false),
- switch_threshold_(~0)
-#ifdef FORWARDER_INTERFACE
- ,
- forwarder_interface_(io_service_, this)
-#endif
- {
+ static inline constexpr uint16_t klog2_header_counter() { return 4; }
+ static inline constexpr uint16_t kheader_counter_mask() {
+ return (1 << klog2_header_counter()) - 1;
}
- ~Impl() {}
+ class ConsumerContext
+ : public Base<ConsumerContext, ClientConfiguration, Impl>,
+ private ConsumerSocket::ReadCallback {
+ static inline const std::size_t kmtu = HIPERF_MTU;
- void checkReceivedRtcContent(ConsumerSocket &c,
- const ContentObject &contentObject) {}
+ public:
+ using ConfType = ClientConfiguration;
+ using ParentType = typename HIperfClient::Impl;
+ static inline auto getContextType() -> std::string {
+ return "ConsumerContext";
+ }
+
+ ConsumerContext(Impl &client, int consumer_identifier)
+ : Base(client, client.io_service_, consumer_identifier),
+ receive_buffer_(
+ utils::MemBuf::create(client.config_.receive_buffer_size_)),
+ socket_(client.io_service_),
+ payload_size_max_(PayloadSize(client.config_.packet_format_)
+ .getPayloadSizeMax(RTC_HEADER_SIZE)),
+ nb_iterations_(client.config_.nb_iterations_) {}
+
+ ConsumerContext(ConsumerContext &&other) noexcept
+ : Base(std::move(other)),
+ receive_buffer_(std::move(other.receive_buffer_)),
+ socket_(std::move(other.socket_)),
+ payload_size_max_(other.payload_size_max_),
+ remote_(std::move(other.remote_)),
+ nb_iterations_(other.nb_iterations_),
+ saved_stats_(std::move(other.saved_stats_)),
+ header_counter_(other.header_counter_),
+ first_(other.first_),
+ consumer_socket_(std::move(other.consumer_socket_)),
+ producer_socket_(std::move(other.producer_socket_)) {}
+
+ ~ConsumerContext() override = default;
+
+ /***************************************************************
+ * ConsumerSocket::ReadCallback implementation
+ ***************************************************************/
- void processLeavingInterest(ConsumerSocket &c, const Interest &interest) {}
+ bool isBufferMovable() noexcept override { return false; }
- void addFace(const std::string &local_address, uint16_t local_port,
- const std::string &remote_address, uint16_t remote_port,
- std::string interface);
+ void getReadBuffer(uint8_t **application_buffer,
+ size_t *max_length) override {
+ *application_buffer = receive_buffer_->writableData();
- void handleTimerExpiration(ConsumerSocket &c,
- const TransportStatistics &stats) {
- const char separator = ' ';
- const int width = 15;
+ if (configuration_.rtc_) {
+ *max_length = kmtu;
+ } else {
+ *max_length = configuration_.receive_buffer_size_;
+ }
+ }
- utils::TimePoint t2 = utils::SteadyClock::now();
- auto exact_duration =
- std::chrono::duration_cast<utils::Milliseconds>(t2 - t_stats_);
+ void readBufferAvailable(
+ std::unique_ptr<utils::MemBuf> &&buffer) noexcept override {
+ // Nothing to do here
+ auto ret = std::move(buffer);
+ }
- std::stringstream interval;
- interval << total_duration_milliseconds_ / 1000 << "-"
- << total_duration_milliseconds_ / 1000 +
- exact_duration.count() / 1000;
+ void readDataAvailable(std::size_t length) noexcept override {
+ if (configuration_.rtc_) {
+ saved_stats_.received_bytes_ += length;
+ saved_stats_.received_data_pkt_++;
- std::stringstream bytes_transferred;
- bytes_transferred << std::fixed << std::setprecision(3)
- << (stats.getBytesRecv() - old_bytes_value_) / 1000000.0
- << std::setfill(separator) << "[MB]";
+ // collecting delay stats. Just for performance testing
+ auto senderTimeStamp =
+ *reinterpret_cast<uint64_t *>(receive_buffer_->writableData());
- std::stringstream bandwidth;
- bandwidth << ((stats.getBytesRecv() - old_bytes_value_) * 8) /
- (exact_duration.count()) / 1000.0
- << std::setfill(separator) << "[Mbps]";
+ auto now = utils::SystemTime::nowMs().count();
+ auto new_delay = double(now - senderTimeStamp);
- std::stringstream window;
- window << stats.getAverageWindowSize() << std::setfill(separator)
- << "[Int]";
+ if (senderTimeStamp > now)
+ new_delay = -1 * double(senderTimeStamp - now);
- std::stringstream avg_rtt;
- avg_rtt << stats.getAverageRtt() << std::setfill(separator) << "[ms]";
+ saved_stats_.delay_sample_++;
+ saved_stats_.avg_data_delay_ =
+ saved_stats_.avg_data_delay_ +
+ (double(new_delay) - saved_stats_.avg_data_delay_) /
+ saved_stats_.delay_sample_;
- if (configuration_.rtc_) {
- // we get rtc stats more often, thus we need ms in the interval
- std::stringstream interval_ms;
- interval_ms << total_duration_milliseconds_ << "-"
- << total_duration_milliseconds_ + exact_duration.count();
-
- std::stringstream lost_data;
- lost_data << stats.getLostData() - old_lost_data_value_
- << std::setfill(separator) << "[pkt]";
-
- std::stringstream bytes_recovered_data;
- bytes_recovered_data << stats.getBytesRecoveredData() -
- old_bytes_recovered_value_
- << std::setfill(separator) << "[pkt]";
-
- std::stringstream definitely_lost_data;
- definitely_lost_data << stats.getDefinitelyLostData() -
- old_definitely_lost_data_value_
- << std::setfill(separator) << "[pkt]";
-
- std::stringstream data_delay;
- data_delay << avg_data_delay_ << std::setfill(separator) << "[ms]";
-
- std::stringstream received_data_pkt;
- received_data_pkt << received_data_pkt_ << std::setfill(separator)
- << "[pkt]";
-
- std::stringstream goodput;
- goodput << (received_bytes_ * 8.0) / (exact_duration.count()) / 1000.0
- << std::setfill(separator) << "[Mbps]";
-
- std::stringstream loss_rate;
- loss_rate << std::fixed << std::setprecision(2)
- << stats.getLossRatio() * 100.0 << std::setfill(separator)
- << "[%]";
-
- std::stringstream retx_sent;
- retx_sent << stats.getRetxCount() - old_retx_value_
- << std::setfill(separator) << "[pkt]";
-
- std::stringstream interest_sent;
- interest_sent << stats.getInterestTx() - old_sent_int_value_
- << std::setfill(separator) << "[pkt]";
-
- std::stringstream nacks;
- nacks << stats.getReceivedNacks() - old_received_nacks_value_
- << std::setfill(separator) << "[pkt]";
-
- std::stringstream fec_pkt;
- fec_pkt << stats.getReceivedFEC() - old_fec_pkt_
- << std::setfill(separator) << "[pkt]";
-
- std::stringstream queuing_delay;
- queuing_delay << stats.getQueuingDelay() << std::setfill(separator)
- << "[ms]";
-
-#ifdef FORWARDER_INTERFACE
- if (!done_ && stats.getQueuingDelay() >= switch_threshold_ &&
- total_duration_milliseconds_ > 1000) {
- std::cout << "Switching due to queuing delay" << std::endl;
- forwarder_interface_.createFaceAndRoutes(backup_routes_);
- forwarder_interface_.deleteFaceAndRoutes(main_routes_);
- std::swap(backup_routes_, main_routes_);
- done_ = true;
- }
-#endif
-
- // statistics not yet available in the transport
- // std::stringstream interest_fec_tx;
- // interest_fec_tx << stats.getInterestFecTxCount() -
- // old_fec_interest_tx_value_ << std::setfill(separator) << "[pkt]";
- // std::stringstream bytes_fec_recv;
- // bytes_fec_recv << stats.getBytesFecRecv() - old_fec_data_rx_value_
- // << std::setfill(separator) << "[bytes]";
- std::cout << std::left << std::setw(width) << "Interval";
- std::cout << std::left << std::setw(width) << "RecvData";
- std::cout << std::left << std::setw(width) << "Bandwidth";
- std::cout << std::left << std::setw(width) << "Goodput";
- std::cout << std::left << std::setw(width) << "LossRate";
- std::cout << std::left << std::setw(width) << "Retr";
- std::cout << std::left << std::setw(width) << "InterestSent";
- std::cout << std::left << std::setw(width) << "ReceivedNacks";
- std::cout << std::left << std::setw(width) << "SyncWnd";
- std::cout << std::left << std::setw(width) << "MinRtt";
- std::cout << std::left << std::setw(width) << "QueuingDelay";
- std::cout << std::left << std::setw(width) << "LostData";
- std::cout << std::left << std::setw(width) << "RecoveredData";
- std::cout << std::left << std::setw(width) << "DefinitelyLost";
- std::cout << std::left << std::setw(width) << "State";
- std::cout << std::left << std::setw(width) << "DataDelay";
- std::cout << std::left << std::setw(width) << "FecPkt" << std::endl;
-
- std::cout << std::left << std::setw(width) << interval_ms.str();
- std::cout << std::left << std::setw(width) << received_data_pkt.str();
- std::cout << std::left << std::setw(width) << bandwidth.str();
- std::cout << std::left << std::setw(width) << goodput.str();
- std::cout << std::left << std::setw(width) << loss_rate.str();
- std::cout << std::left << std::setw(width) << retx_sent.str();
- std::cout << std::left << std::setw(width) << interest_sent.str();
- std::cout << std::left << std::setw(width) << nacks.str();
- std::cout << std::left << std::setw(width) << window.str();
- std::cout << std::left << std::setw(width) << avg_rtt.str();
- std::cout << std::left << std::setw(width) << queuing_delay.str();
- std::cout << std::left << std::setw(width) << lost_data.str();
- std::cout << std::left << std::setw(width) << bytes_recovered_data.str();
- std::cout << std::left << std::setw(width) << definitely_lost_data.str();
- std::cout << std::left << std::setw(width) << stats.getCCStatus();
- std::cout << std::left << std::setw(width) << data_delay.str();
- std::cout << std::left << std::setw(width) << fec_pkt.str();
- std::cout << std::endl;
-
- if (configuration_.test_mode_) {
- if (data_delays_.size() > 0) data_delays_.pop_back();
-
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::system_clock::now().time_since_epoch())
- .count();
- std::cout << now << " DATA-DELAYS:[" << data_delays_ << "]"
- << std::endl;
- }
-
- // statistics not yet available in the transport
- // std::cout << std::left << std::setw(width) << interest_fec_tx.str();
- // std::cout << std::left << std::setw(width) << bytes_fec_recv.str();
- } else {
- std::cout << std::left << std::setw(width) << "Interval";
- std::cout << std::left << std::setw(width) << "Transfer";
- std::cout << std::left << std::setw(width) << "Bandwidth";
- std::cout << std::left << std::setw(width) << "Retr";
- std::cout << std::left << std::setw(width) << "Cwnd";
- std::cout << std::left << std::setw(width) << "AvgRtt" << std::endl;
-
- std::cout << std::left << std::setw(width) << interval.str();
- std::cout << std::left << std::setw(width) << bytes_transferred.str();
- std::cout << std::left << std::setw(width) << bandwidth.str();
- std::cout << std::left << std::setw(width) << stats.getRetxCount();
- std::cout << std::left << std::setw(width) << window.str();
- std::cout << std::left << std::setw(width) << avg_rtt.str() << std::endl;
- std::cout << std::endl;
- }
- total_duration_milliseconds_ += (uint32_t)exact_duration.count();
- old_bytes_value_ = stats.getBytesRecv();
- old_lost_data_value_ = stats.getLostData();
- old_bytes_recovered_value_ = stats.getBytesRecoveredData();
- old_definitely_lost_data_value_ = stats.getDefinitelyLostData();
- old_fec_interest_tx_value_ = stats.getInterestFecTxCount();
- old_fec_data_rx_value_ = stats.getBytesFecRecv();
- old_retx_value_ = stats.getRetxCount();
- old_sent_int_value_ = stats.getInterestTx();
- old_received_nacks_value_ = stats.getReceivedNacks();
- old_fec_pkt_ = stats.getReceivedFEC();
- delay_sample_ = 0;
- avg_data_delay_ = 0;
- received_bytes_ = 0;
- received_data_pkt_ = 0;
- data_delays_ = "";
-
- t_stats_ = utils::SteadyClock::now();
- }
+ if (configuration_.test_mode_) {
+ saved_stats_.data_delays_ += std::to_string(int(new_delay));
+ saved_stats_.data_delays_ += ",";
+ }
- bool parseConfig(const char *conf_file) {
- using namespace libconfig;
- Config cfg;
-
- try {
- cfg.readFile(conf_file);
- } catch (const FileIOException &fioex) {
- std::cerr << "I/O error while reading file." << std::endl;
- return false;
- } catch (const ParseException &pex) {
- std::cerr << "Parse error at " << pex.getFile() << ":" << pex.getLine()
- << " - " << pex.getError() << std::endl;
- return false;
+ if (configuration_.relay_ && configuration_.parallel_flows_ == 1) {
+ producer_socket_->produceDatagram(
+ configuration_.relay_name_.makeName(),
+ receive_buffer_->writableData(),
+ length < payload_size_max_ ? length : payload_size_max_);
+ }
+ if (configuration_.output_stream_mode_ &&
+ configuration_.parallel_flows_ == 1) {
+ const uint8_t *start = receive_buffer_->writableData();
+ start += sizeof(uint64_t);
+ std::size_t pkt_len = length - sizeof(uint64_t);
+ socket_.send_to(asio::buffer(start, pkt_len), remote_);
+ }
+ }
}
- Setting &config = cfg.getRoot();
+ size_t maxBufferSize() const override {
+ return configuration_.rtc_ ? kmtu : configuration_.receive_buffer_size_;
+ }
- if (config.exists("switch_threshold")) {
- unsigned threshold;
- config.lookupValue("switch_threshold", threshold);
- switch_threshold_ = threshold;
+ void readError(const std::error_code &ec) noexcept override {
+ getOutputStream() << "Error " << ec.message()
+ << " while reading from socket" << std::endl;
+ parent_.io_service_.stop();
}
- // listeners
- if (config.exists("listeners")) {
- // get path where looking for modules
- const Setting &listeners = config.lookup("listeners");
- auto count = listeners.getLength();
+ void readSuccess(std::size_t total_size) noexcept override {
+ if (configuration_.rtc_) {
+ getOutputStream() << "Data successfully read" << std::endl;
+ } else {
+ auto t2 = utils::SteadyTime::now();
+ auto dt =
+ utils::SteadyTime::getDurationUs(saved_stats_.t_download_, t2);
+ auto usec = dt.count();
- for (int i = 0; i < count; i++) {
- const Setting &listener = listeners[i];
- ListenerConfig list;
- unsigned port;
- std::string interface;
+ getOutputStream() << "Content retrieved. Size: " << total_size
+ << " [Bytes]" << std::endl;
- list.name = listener.getName();
- listener.lookupValue("local_address", list.address);
- listener.lookupValue("local_port", port);
- listener.lookupValue("interface", list.interface);
- list.port = (uint16_t)(port);
+ getOutputStream() << "Elapsed Time: " << usec / 1000000.0
+ << " seconds -- "
+ << double(total_size * 8) * 1.0 / double(usec) * 1.0
+ << " [Mbps]" << std::endl;
- std::cout << "Adding listener " << list.name << ", ( " << list.address
- << ":" << list.port << ")" << std::endl;
- config_.addListener(std::move(list));
+ parent_.io_service_.stop();
}
}
- // connectors
- if (config.exists("connectors")) {
- // get path where looking for modules
- const Setting &connectors = config.lookup("connectors");
- auto count = connectors.getLength();
-
- for (int i = 0; i < count; i++) {
- const Setting &connector = connectors[i];
- ConnectorConfig conn;
-
- conn.name = connector.getName();
- unsigned port = 0;
-
- if (!connector.lookupValue("local_address", conn.local_address)) {
- conn.local_address = "";
- }
+ /***************************************************************
+ * End of ConsumerSocket::ReadCallback implementation
+ ***************************************************************/
- if (!connector.lookupValue("local_port", port)) {
- port = 0;
- }
-
- conn.local_port = (uint16_t)(port);
+ private:
+ struct SavedStatistics {
+ utils::SteadyTime::TimePoint t_stats_{};
+ utils::SteadyTime::TimePoint t_download_{};
+ uint32_t total_duration_milliseconds_{0};
+ uint64_t old_bytes_value_{0};
+ uint64_t old_interest_tx_value_{0};
+ uint64_t old_fec_interest_tx_value_{0};
+ uint64_t old_fec_data_rx_value_{0};
+ uint64_t old_lost_data_value_{0};
+ uint64_t old_bytes_recovered_value_{0};
+ uint64_t old_definitely_lost_data_value_{0};
+ uint64_t old_retx_value_{0};
+ uint64_t old_sent_int_value_{0};
+ uint64_t old_received_nacks_value_{0};
+ uint32_t old_fec_pkt_{0};
+ // IMPORTANT: to be used only for performance testing, when consumer and
+ // producer are synchronized. Used for rtc only at the moment
+ double avg_data_delay_{0};
+ uint32_t delay_sample_{0};
+ uint32_t received_bytes_{0};
+ uint32_t received_data_pkt_{0};
+ uint32_t auth_alerts_{0};
+ std::string data_delays_{""};
+ };
+
+ /***************************************************************
+ * Transport callbacks
+ ***************************************************************/
+
+ void checkReceivedRtcContent(
+ [[maybe_unused]] const ConsumerSocket &c,
+ [[maybe_unused]] const transport::core::ContentObject &content_object)
+ const {
+ // Nothing to do here
+ }
+
+ void processLeavingInterest(
+ const ConsumerSocket & /*c*/,
+ const transport::core::Interest & /*interest*/) const {
+ // Nothing to do here
+ }
+
+ transport::auth::VerificationPolicy onAuthFailed(
+ transport::auth::Suffix /*suffix*/,
+ transport::auth::VerificationPolicy /*policy*/) {
+ saved_stats_.auth_alerts_++;
+ return transport::auth::VerificationPolicy::ACCEPT;
+ }
+
+ void handleTimerExpiration([[maybe_unused]] const ConsumerSocket &c,
+ const TransportStatistics &stats) {
+ const char separator = ' ';
+ const int width = 18;
+
+ utils::SteadyTime::TimePoint t2 = utils::SteadyTime::Clock::now();
+ auto exact_duration =
+ utils::SteadyTime::getDurationMs(saved_stats_.t_stats_, t2);
- if (!connector.lookupValue("remote_address", conn.remote_address)) {
- std::cerr
- << "Error in configuration file: remote_address is a mandatory "
- "field of Connectors."
- << std::endl;
- return false;
+ std::stringstream interval_ms;
+ interval_ms << saved_stats_.total_duration_milliseconds_ << "-"
+ << saved_stats_.total_duration_milliseconds_ +
+ exact_duration.count();
+
+ std::stringstream bytes_transferred;
+ bytes_transferred << std::fixed << std::setprecision(3)
+ << double(stats.getBytesRecv() -
+ saved_stats_.old_bytes_value_) /
+ 1000000.0
+ << std::setfill(separator);
+
+ std::stringstream bandwidth;
+ bandwidth << (double(stats.getBytesRecv() -
+ saved_stats_.old_bytes_value_) *
+ 8) /
+ (exact_duration.count()) / 1000.0
+ << std::setfill(separator);
+
+ std::stringstream window;
+ window << stats.getAverageWindowSize() << std::setfill(separator);
+
+ std::stringstream avg_rtt;
+ avg_rtt << std::setprecision(3) << std::fixed << stats.getAverageRtt()
+ << std::setfill(separator);
+
+ if (configuration_.rtc_) {
+ std::stringstream lost_data;
+ lost_data << stats.getLostData() - saved_stats_.old_lost_data_value_
+ << std::setfill(separator);
+
+ std::stringstream bytes_recovered_data;
+ bytes_recovered_data << stats.getBytesRecoveredData() -
+ saved_stats_.old_bytes_recovered_value_
+ << std::setfill(separator);
+
+ std::stringstream definitely_lost_data;
+ definitely_lost_data << stats.getDefinitelyLostData() -
+ saved_stats_.old_definitely_lost_data_value_
+ << std::setfill(separator);
+
+ std::stringstream data_delay;
+ data_delay << std::fixed << std::setprecision(3)
+ << saved_stats_.avg_data_delay_ << std::setfill(separator);
+
+ std::stringstream received_data_pkt;
+ received_data_pkt << saved_stats_.received_data_pkt_
+ << std::setfill(separator);
+
+ std::stringstream goodput;
+ goodput << std::fixed << std::setprecision(3)
+ << (saved_stats_.received_bytes_ * 8.0) /
+ (exact_duration.count()) / 1000.0
+ << std::setfill(separator);
+
+ std::stringstream loss_rate;
+ loss_rate << std::fixed << std::setprecision(2)
+ << stats.getLossRatio() * 100.0 << std::setfill(separator);
+
+ std::stringstream retx_sent;
+ retx_sent << stats.getRetxCount() - saved_stats_.old_retx_value_
+ << std::setfill(separator);
+
+ std::stringstream interest_sent;
+ interest_sent << stats.getInterestTx() -
+ saved_stats_.old_sent_int_value_
+ << std::setfill(separator);
+
+ std::stringstream nacks;
+ nacks << stats.getReceivedNacks() -
+ saved_stats_.old_received_nacks_value_
+ << std::setfill(separator);
+
+ std::stringstream fec_pkt;
+ fec_pkt << stats.getReceivedFEC() - saved_stats_.old_fec_pkt_
+ << std::setfill(separator);
+
+ std::stringstream queuing_delay;
+ queuing_delay << std::fixed << std::setprecision(3)
+ << stats.getQueuingDelay() << std::setfill(separator);
+
+ std::stringstream residual_losses;
+ double rl_perc = stats.getResidualLossRate() * 100;
+ residual_losses << std::fixed << std::setprecision(2) << rl_perc
+ << std::setfill(separator);
+
+ std::stringstream quality_score;
+ quality_score << std::fixed << (int)stats.getQualityScore()
+ << std::setfill(separator);
+
+ std::stringstream alerts;
+ alerts << stats.getAlerts() << std::setfill(separator);
+
+ std::stringstream auth_alerts;
+ auth_alerts << saved_stats_.auth_alerts_ << std::setfill(separator);
+
+ if ((header_counter_ == 0 && configuration_.print_headers_) || first_) {
+ getOutputStream() << std::right << std::setw(width) << "Interval[ms]";
+ getOutputStream()
+ << std::right << std::setw(width) << "RecvData[pkt]";
+ getOutputStream()
+ << std::right << std::setw(width) << "Bandwidth[Mbps]";
+ getOutputStream()
+ << std::right << std::setw(width) << "Goodput[Mbps]";
+ getOutputStream() << std::right << std::setw(width) << "LossRate[%]";
+ getOutputStream() << std::right << std::setw(width) << "Retr[pkt]";
+ getOutputStream() << std::right << std::setw(width) << "InterestSent";
+ getOutputStream()
+ << std::right << std::setw(width) << "ReceivedNacks";
+ getOutputStream() << std::right << std::setw(width) << "SyncWnd[pkt]";
+ getOutputStream() << std::right << std::setw(width) << "MinRtt[ms]";
+ getOutputStream()
+ << std::right << std::setw(width) << "QueuingDelay[ms]";
+ getOutputStream()
+ << std::right << std::setw(width) << "LostData[pkt]";
+ getOutputStream()
+ << std::right << std::setw(width) << "RecoveredData";
+ getOutputStream()
+ << std::right << std::setw(width) << "DefinitelyLost";
+ getOutputStream() << std::right << std::setw(width) << "State";
+ getOutputStream()
+ << std::right << std::setw(width) << "DataDelay[ms]";
+ getOutputStream() << std::right << std::setw(width) << "FecPkt";
+ getOutputStream() << std::right << std::setw(width) << "Congestion";
+ getOutputStream()
+ << std::right << std::setw(width) << "ResidualLosses";
+ getOutputStream() << std::right << std::setw(width) << "QualityScore";
+ getOutputStream() << std::right << std::setw(width) << "Alerts";
+ getOutputStream()
+ << std::right << std::setw(width) << "AuthAlerts" << std::endl;
+
+ first_ = false;
}
- if (!connector.lookupValue("remote_port", port)) {
- std::cerr << "Error in configuration file: remote_port is a "
- "mandatory field of Connectors."
- << std::endl;
- return false;
+ getOutputStream() << std::right << std::setw(width)
+ << interval_ms.str();
+ getOutputStream() << std::right << std::setw(width)
+ << received_data_pkt.str();
+ getOutputStream() << std::right << std::setw(width) << bandwidth.str();
+ getOutputStream() << std::right << std::setw(width) << goodput.str();
+ getOutputStream() << std::right << std::setw(width) << loss_rate.str();
+ getOutputStream() << std::right << std::setw(width) << retx_sent.str();
+ getOutputStream() << std::right << std::setw(width)
+ << interest_sent.str();
+ getOutputStream() << std::right << std::setw(width) << nacks.str();
+ getOutputStream() << std::right << std::setw(width) << window.str();
+ getOutputStream() << std::right << std::setw(width) << avg_rtt.str();
+ getOutputStream() << std::right << std::setw(width)
+ << queuing_delay.str();
+ getOutputStream() << std::right << std::setw(width) << lost_data.str();
+ getOutputStream() << std::right << std::setw(width)
+ << bytes_recovered_data.str();
+ getOutputStream() << std::right << std::setw(width)
+ << definitely_lost_data.str();
+ getOutputStream() << std::right << std::setw(width)
+ << stats.getCCStatus();
+ getOutputStream() << std::right << std::setw(width) << data_delay.str();
+ getOutputStream() << std::right << std::setw(width) << fec_pkt.str();
+ getOutputStream() << std::right << std::setw(width)
+ << stats.isCongested();
+ getOutputStream() << std::right << std::setw(width)
+ << residual_losses.str();
+ getOutputStream() << std::right << std::setw(width)
+ << quality_score.str();
+ getOutputStream() << std::right << std::setw(width) << alerts.str();
+ getOutputStream() << std::right << std::setw(width) << auth_alerts.str()
+ << std::endl;
+
+ if (configuration_.test_mode_) {
+ if (saved_stats_.data_delays_.size() > 0)
+ saved_stats_.data_delays_.pop_back();
+
+ auto now = utils::SteadyTime::nowMs();
+ getOutputStream() << std::fixed << std::setprecision(0) << now.count()
+ << " DATA-DELAYS:[" << saved_stats_.data_delays_
+ << "]" << std::endl;
}
-
- if (!connector.lookupValue("interface", conn.interface)) {
- std::cerr << "Error in configuration file: interface is a "
- "mandatory field of Connectors."
- << std::endl;
- return false;
+ } else {
+ if ((header_counter_ == 0 && configuration_.print_headers_) || first_) {
+ getOutputStream() << std::right << std::setw(width) << "Interval[ms]";
+ getOutputStream() << std::right << std::setw(width) << "Transfer[MB]";
+ getOutputStream()
+ << std::right << std::setw(width) << "Bandwidth[Mbps]";
+ getOutputStream() << std::right << std::setw(width) << "Retr[pkt]";
+ getOutputStream() << std::right << std::setw(width) << "Cwnd[Int]";
+ getOutputStream()
+ << std::right << std::setw(width) << "AvgRtt[ms]" << std::endl;
+
+ first_ = false;
}
- conn.remote_port = (uint16_t)(port);
+ getOutputStream() << std::right << std::setw(width)
+ << interval_ms.str();
+ getOutputStream() << std::right << std::setw(width)
+ << bytes_transferred.str();
+ getOutputStream() << std::right << std::setw(width) << bandwidth.str();
+ getOutputStream() << std::right << std::setw(width)
+ << stats.getRetxCount();
+ getOutputStream() << std::right << std::setw(width) << window.str();
+ getOutputStream() << std::right << std::setw(width) << avg_rtt.str()
+ << std::endl;
+ }
- std::cout << "Adding connector " << conn.name << ", ("
- << conn.local_address << ":" << conn.local_port << " "
- << conn.remote_address << ":" << conn.remote_port << ")"
- << std::endl;
- config_.addConnector(conn.name, std::move(conn));
+ saved_stats_.total_duration_milliseconds_ +=
+ (uint32_t)exact_duration.count();
+ saved_stats_.old_bytes_value_ = stats.getBytesRecv();
+ saved_stats_.old_lost_data_value_ = stats.getLostData();
+ saved_stats_.old_bytes_recovered_value_ = stats.getBytesRecoveredData();
+ saved_stats_.old_definitely_lost_data_value_ =
+ stats.getDefinitelyLostData();
+ saved_stats_.old_fec_interest_tx_value_ = stats.getInterestFecTxCount();
+ saved_stats_.old_fec_data_rx_value_ = stats.getBytesFecRecv();
+ saved_stats_.old_retx_value_ = stats.getRetxCount();
+ saved_stats_.old_sent_int_value_ = stats.getInterestTx();
+ saved_stats_.old_received_nacks_value_ = stats.getReceivedNacks();
+ saved_stats_.old_fec_pkt_ = stats.getReceivedFEC();
+ saved_stats_.delay_sample_ = 0;
+ saved_stats_.avg_data_delay_ = 0;
+ saved_stats_.received_bytes_ = 0;
+ saved_stats_.received_data_pkt_ = 0;
+ saved_stats_.data_delays_ = "";
+ saved_stats_.t_stats_ = utils::SteadyTime::Clock::now();
+
+ header_counter_ = (header_counter_ + 1) & kheader_counter_mask();
+
+ if (--nb_iterations_ == 0) {
+ // We reached the maximum nb of runs. Stop now.
+ parent_.io_service_.stop();
}
}
- // Routes
- if (config.exists("routes")) {
- const Setting &routes = config.lookup("routes");
- auto count = routes.getLength();
+ /***************************************************************
+ * Setup functions
+ ***************************************************************/
- for (int i = 0; i < count; i++) {
- const Setting &route = routes[i];
- RouteConfig r;
- unsigned weight;
+ int setupRTCSocket() {
+ int ret = ERROR_SUCCESS;
- r.name = route.getName();
- route.lookupValue("prefix", r.prefix);
- route.lookupValue("weight", weight);
- route.lookupValue("main_connector", r.main_connector);
- route.lookupValue("backup_connector", r.backup_connector);
- r.weight = (uint16_t)(weight);
+ configuration_.transport_protocol_ = transport::interface::RTC;
- std::cout << "Adding route " << r.name << " " << r.prefix << " ("
- << r.main_connector << " " << r.backup_connector << " "
- << r.weight << ")" << std::endl;
- config_.addRoute(std::move(r));
+ if (configuration_.relay_ && configuration_.parallel_flows_ == 1) {
+ int production_protocol = ProductionProtocolAlgorithms::RTC_PROD;
+ producer_socket_ =
+ std::make_unique<ProducerSocket>(production_protocol);
+ producer_socket_->registerPrefix(configuration_.relay_name_);
+ producer_socket_->connect();
+ producer_socket_->start();
}
- }
- std::cout << "Ok" << std::endl;
-
- return true;
- }
-
- bool splitRoute(std::string route, std::string &prefix,
- uint8_t &prefix_length) {
- std::string delimiter = "/";
-
- size_t pos = 0;
- if ((pos = route.find(delimiter)) != std::string::npos) {
- prefix = route.substr(0, pos);
- route.erase(0, pos + delimiter.length());
- } else {
- return false;
- }
+ if (configuration_.output_stream_mode_ &&
+ configuration_.parallel_flows_ == 1) {
+ remote_ = asio::ip::udp::endpoint(
+ asio::ip::address::from_string("127.0.0.1"), configuration_.port_);
+ socket_.open(asio::ip::udp::v4());
+ }
- prefix_length = std::stoul(route.substr(0));
- return true;
- }
+ consumer_socket_ =
+ std::make_unique<ConsumerSocket>(configuration_.transport_protocol_);
-#ifdef FORWARDER_INTERFACE
- void onHicnServiceReady() override {
- std::cout << "Successfully connected to local forwarder!" << std::endl;
+ RtcTransportRecoveryStrategies recovery_strategy =
+ RtcTransportRecoveryStrategies::RTX_ONLY;
+ switch (configuration_.recovery_strategy_) {
+ case 1:
+ recovery_strategy = RtcTransportRecoveryStrategies::RECOVERY_OFF;
+ break;
+ case 2:
+ recovery_strategy = RtcTransportRecoveryStrategies::RTX_ONLY;
+ break;
+ case 3:
+ recovery_strategy = RtcTransportRecoveryStrategies::FEC_ONLY;
+ break;
+ case 4:
+ recovery_strategy = RtcTransportRecoveryStrategies::DELAY_BASED;
+ break;
+ case 5:
+ recovery_strategy = RtcTransportRecoveryStrategies::LOW_RATE;
+ break;
+ case 6:
+ recovery_strategy =
+ RtcTransportRecoveryStrategies::LOW_RATE_AND_BESTPATH;
+ break;
+ case 7:
+ recovery_strategy =
+ RtcTransportRecoveryStrategies::LOW_RATE_AND_REPLICATION;
+ break;
+ case 8:
+ recovery_strategy =
+ RtcTransportRecoveryStrategies::LOW_RATE_AND_ALL_FWD_STRATEGIES;
+ break;
+ case 9:
+ recovery_strategy =
+ RtcTransportRecoveryStrategies::FEC_ONLY_LOW_RES_LOSSES;
+ break;
+ case 10:
+ recovery_strategy =
+ RtcTransportRecoveryStrategies::DELAY_AND_BESTPATH;
+ break;
+ case 11:
+ recovery_strategy =
+ RtcTransportRecoveryStrategies::DELAY_AND_REPLICATION;
+ break;
+ default:
+ break;
+ }
- std::cout << "Setting up listeners" << std::endl;
- const char *config = getenv("FORWARDER_CONFIG");
+ ret = consumer_socket_->setSocketOption(
+ RtcTransportOptions::RECOVERY_STRATEGY,
+ static_cast<uint32_t>(recovery_strategy));
- if (config) {
- if (!parseConfig(config)) {
- return;
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
}
- // Create faces and route using first face in the list.
- auto &routes = config_.getRoutes();
- auto &connectors = config_.getConnectors();
+ ret = consumer_socket_->setSocketOption(
+ RtcTransportOptions::AGGREGATED_DATA,
+ configuration_.aggregated_data_);
- if (routes.size() == 0 || connectors.size() == 0) {
- std::cerr << "Nothing to configure" << std::endl;
- return;
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
}
- for (auto &route : routes) {
- auto the_connector_it = connectors.find(route.main_connector);
- if (the_connector_it == connectors.end()) {
- std::cerr << "No valid main connector found for route " << route.name
- << std::endl;
- continue;
- }
+ ret = consumer_socket_->setSocketOption(
+ RtcTransportOptions::CONTENT_SHARING_MODE,
+ configuration_.content_sharing_mode_);
- auto &the_connector = the_connector_it->second;
- auto route_info = std::make_shared<ForwarderInterface::RouteInfo>();
- route_info->family = AF_INET;
- route_info->local_addr = the_connector.local_address;
- route_info->local_port = the_connector.local_port;
- route_info->remote_addr = the_connector.remote_address;
- route_info->remote_port = the_connector.remote_port;
- route_info->interface = the_connector.interface;
- route_info->name = the_connector.name;
-
- std::string prefix;
- uint8_t prefix_length;
- auto ret = splitRoute(route.prefix, prefix, prefix_length);
-
- if (!ret) {
- std::cerr << "Error parsing route" << std::endl;
- return;
- }
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
- route_info->route_addr = prefix;
- route_info->route_len = prefix_length;
-
- main_routes_.emplace_back(route_info);
-
- if (!route.backup_connector.empty()) {
- // Add also the backup route
- auto the_backup_connector_it =
- connectors.find(route.backup_connector);
- if (the_backup_connector_it == connectors.end()) {
- std::cerr << "No valid backup connector found for route "
- << route.name << std::endl;
- continue;
- }
-
- auto &the_backup_connector = the_backup_connector_it->second;
- auto backup_route_info =
- std::make_shared<ForwarderInterface::RouteInfo>();
- backup_route_info->family = AF_INET;
- backup_route_info->local_addr = the_backup_connector.local_address;
- backup_route_info->local_port = the_backup_connector.local_port;
- backup_route_info->remote_addr = the_backup_connector.remote_address;
- backup_route_info->remote_port = the_backup_connector.remote_port;
- backup_route_info->interface = the_backup_connector.interface;
- backup_route_info->name = the_backup_connector.name;
-
- std::string prefix;
- uint8_t prefix_length;
- auto ret = splitRoute(route.prefix, prefix, prefix_length);
-
- if (!ret) {
- std::cerr << "Error parsing route" << std::endl;
- return;
- }
-
- backup_route_info->route_addr = prefix;
- backup_route_info->route_len = prefix_length;
-
- backup_routes_.emplace_back(backup_route_info);
- }
+ ret = consumer_socket_->setSocketOption(
+ ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT,
+ (transport::interface::ConsumerContentObjectCallback)std::bind(
+ &Impl::ConsumerContext::checkReceivedRtcContent, this,
+ std::placeholders::_1, std::placeholders::_2));
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
}
- // Create main routes
- std::cout << "Creating main routes" << std::endl;
- forwarder_interface_.createFaceAndRoutes(main_routes_);
- }
- }
+ std::shared_ptr<TransportStatistics> transport_stats;
+ ret = consumer_socket_->getSocketOption(
+ transport::interface::OtherOptions::STATISTICS,
+ (TransportStatistics **)&transport_stats);
+ transport_stats->setAlpha(0.0);
- void onRouteConfigured(
- std::vector<ForwarderInterface::RouteInfoPtr> &route_info) override {
- std::cout << "Routes successfully configured!" << std::endl;
- }
-#endif
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
- int setup() {
- int ret;
-
- if (configuration_.rtc_) {
- configuration_.transport_protocol_ = RTC;
- } else if (configuration_.window < 0) {
- configuration_.transport_protocol_ = RAAQM;
- } else {
- configuration_.transport_protocol_ = CBR;
+ return ERROR_SUCCESS;
}
- if (configuration_.relay_ && configuration_.rtc_) {
- int production_protocol = ProductionProtocolAlgorithms::RTC_PROD;
- producer_socket_ = std::make_unique<ProducerSocket>(production_protocol);
- producer_socket_->registerPrefix(configuration_.relay_name_);
- producer_socket_->connect();
- }
+ int setupRAAQMSocket() {
+ int ret = ERROR_SUCCESS;
- if (configuration_.output_stream_mode_ && configuration_.rtc_) {
- remote_ = asio::ip::udp::endpoint(
- asio::ip::address::from_string("127.0.0.1"), configuration_.port_);
- socket_.open(asio::ip::udp::v4());
- }
+ configuration_.transport_protocol_ = transport::interface::RAAQM;
- if (configuration_.secure_) {
- consumer_socket_ = std::make_unique<P2PSecureConsumerSocket>(
- RAAQM, configuration_.transport_protocol_);
- if (configuration_.producer_prefix_.getPrefixLength() == 0) {
- std::cerr << "ERROR -- Missing producer prefix on which perform the "
- "handshake."
- << std::endl;
- } else {
- P2PSecureConsumerSocket &secure_consumer_socket =
- *(static_cast<P2PSecureConsumerSocket *>(consumer_socket_.get()));
- secure_consumer_socket.registerPrefix(configuration_.producer_prefix_);
- }
- } else {
consumer_socket_ =
std::make_unique<ConsumerSocket>(configuration_.transport_protocol_);
- }
- consumer_socket_->setSocketOption(
- GeneralTransportOptions::INTEREST_LIFETIME,
- configuration_.interest_lifetime_);
-
-#if defined(DEBUG) && defined(__linux__)
- std::shared_ptr<transport::BasePortal> portal;
- consumer_socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal);
- signals_ =
- std::make_unique<asio::signal_set>(portal->getIoService(), SIGUSR1);
- signals_->async_wait([this](const std::error_code &, const int &) {
- std::cout << "Signal SIGUSR1!" << std::endl;
- mtrace();
- });
-#endif
-
- if (consumer_socket_->setSocketOption(CURRENT_WINDOW_SIZE,
- configuration_.window) ==
- SOCKET_OPTION_NOT_SET) {
- std::cerr << "ERROR -- Impossible to set the size of the window."
- << std::endl;
- return ERROR_SETUP;
- }
-
- if (configuration_.transport_protocol_ == RAAQM &&
- configuration_.beta != -1.f) {
- if (consumer_socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE,
- configuration_.beta) ==
- SOCKET_OPTION_NOT_SET) {
- return ERROR_SETUP;
+ if (configuration_.beta_ != -1.f) {
+ ret = consumer_socket_->setSocketOption(
+ RaaqmTransportOptions::BETA_VALUE, configuration_.beta_);
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
}
- }
- if (configuration_.transport_protocol_ == RAAQM &&
- configuration_.drop_factor != -1.f) {
- if (consumer_socket_->setSocketOption(RaaqmTransportOptions::DROP_FACTOR,
- configuration_.drop_factor) ==
- SOCKET_OPTION_NOT_SET) {
- return ERROR_SETUP;
+ if (configuration_.drop_factor_ != -1.f) {
+ ret = consumer_socket_->setSocketOption(
+ RaaqmTransportOptions::DROP_FACTOR, configuration_.drop_factor_);
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
}
- }
- if (!configuration_.producer_certificate.empty()) {
- std::shared_ptr<Verifier> verifier = std::make_shared<AsymmetricVerifier>(
- configuration_.producer_certificate);
- if (consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER,
- verifier) == SOCKET_OPTION_NOT_SET)
- return ERROR_SETUP;
+ return ERROR_SUCCESS;
}
- if (!configuration_.passphrase.empty()) {
- std::shared_ptr<Verifier> verifier =
- std::make_shared<SymmetricVerifier>(configuration_.passphrase);
- if (consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER,
- verifier) == SOCKET_OPTION_NOT_SET)
- return ERROR_SETUP;
- }
+ int setupCBRSocket() {
+ configuration_.transport_protocol_ = transport::interface::CBR;
- ret = consumer_socket_->setSocketOption(
- ConsumerCallbacksOptions::INTEREST_OUTPUT,
- (ConsumerInterestCallback)std::bind(&Impl::processLeavingInterest, this,
- std::placeholders::_1,
- std::placeholders::_2));
+ consumer_socket_ =
+ std::make_unique<ConsumerSocket>(configuration_.transport_protocol_);
- if (ret == SOCKET_OPTION_NOT_SET) {
- return ERROR_SETUP;
+ return ERROR_SUCCESS;
}
- if (!configuration_.rtc_) {
- ret = consumer_socket_->setSocketOption(
- ConsumerCallbacksOptions::READ_CALLBACK, &callback_);
- } else {
- ret = consumer_socket_->setSocketOption(
- ConsumerCallbacksOptions::READ_CALLBACK, &rtc_callback_);
- }
+ public:
+ int setup() {
+ int ret;
+ std::shared_ptr<transport::auth::Verifier> verifier =
+ std::make_shared<transport::auth::VoidVerifier>();
+
+ if (configuration_.rtc_) {
+ ret = setupRTCSocket();
+ } else if (configuration_.window_ < 0) {
+ ret = setupRAAQMSocket();
+ } else {
+ ret = setupCBRSocket();
+ }
- if (ret == SOCKET_OPTION_NOT_SET) {
- return ERROR_SETUP;
- }
+ if (ret != ERROR_SUCCESS) {
+ return ret;
+ }
- if (configuration_.rtc_) {
ret = consumer_socket_->setSocketOption(
- ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT,
- (ConsumerContentObjectCallback)std::bind(
- &Impl::checkReceivedRtcContent, this, std::placeholders::_1,
- std::placeholders::_2));
+ GeneralTransportOptions::INTEREST_LIFETIME,
+ configuration_.interest_lifetime_);
if (ret == SOCKET_OPTION_NOT_SET) {
return ERROR_SETUP;
}
- }
-
- if (configuration_.rtc_) {
- std::shared_ptr<TransportStatistics> transport_stats;
- consumer_socket_->getSocketOption(
- OtherOptions::STATISTICS, (TransportStatistics **)&transport_stats);
- transport_stats->setAlpha(0.0);
- }
- ret = consumer_socket_->setSocketOption(
- ConsumerCallbacksOptions::STATS_SUMMARY,
- (ConsumerTimerCallback)std::bind(&Impl::handleTimerExpiration, this,
- std::placeholders::_1,
- std::placeholders::_2));
+ ret = consumer_socket_->setSocketOption(
+ GeneralTransportOptions::MANIFEST_FACTOR_RELEVANT,
+ configuration_.manifest_factor_relevant_);
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
- if (ret == SOCKET_OPTION_NOT_SET) {
- return ERROR_SETUP;
- }
+ ret = consumer_socket_->setSocketOption(
+ GeneralTransportOptions::MANIFEST_FACTOR_ALERT,
+ configuration_.manifest_factor_alert_);
- if (consumer_socket_->setSocketOption(
- GeneralTransportOptions::STATS_INTERVAL,
- configuration_.report_interval_milliseconds_) ==
- SOCKET_OPTION_NOT_SET) {
- return ERROR_SETUP;
- }
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
- consumer_socket_->connect();
+ ret = consumer_socket_->setSocketOption(
+ GeneralTransportOptions::PACKET_FORMAT,
+ configuration_.packet_format_);
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ getOutputStream() << "ERROR -- Impossible to set the packet format."
+ << std::endl;
+ return ERROR_SETUP;
+ }
- return ERROR_SUCCESS;
- }
+ ret = consumer_socket_->setSocketOption(
+ ConsumerCallbacksOptions::FWD_STRATEGY_CHANGE,
+ (StrategyCallback)[](
+ [[maybe_unused]] transport::interface::notification::Strategy
+ strategy){
+ // nothing to do
+ });
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
- int run() {
- std::cout << "Starting download of " << configuration_.name << std::endl;
+ ret = consumer_socket_->setSocketOption(
+ ConsumerCallbacksOptions::REC_STRATEGY_CHANGE,
+ (StrategyCallback)[](
+ [[maybe_unused]] transport::interface::notification::Strategy
+ strategy){
+ // nothing to do
+ });
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
- signals_.add(SIGINT);
- signals_.async_wait(
- [this](const std::error_code &, const int &) { io_service_.stop(); });
+ ret = consumer_socket_->setSocketOption(
+ transport::interface::CURRENT_WINDOW_SIZE, configuration_.window_);
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ getOutputStream()
+ << "ERROR -- Impossible to set the size of the window."
+ << std::endl;
+ return ERROR_SETUP;
+ }
- t_download_ = t_stats_ = std::chrono::steady_clock::now();
- consumer_socket_->asyncConsume(configuration_.name);
- io_service_.run();
+ if (!configuration_.producer_certificate_.empty()) {
+ verifier = std::make_shared<transport::auth::AsymmetricVerifier>(
+ configuration_.producer_certificate_);
+ }
- consumer_socket_->stop();
+ if (!configuration_.passphrase_.empty()) {
+ verifier = std::make_shared<transport::auth::SymmetricVerifier>(
+ configuration_.passphrase_);
+ }
- return ERROR_SUCCESS;
- }
+ verifier->setVerificationFailedCallback(
+ std::bind(&HIperfClient::Impl::ConsumerContext::onAuthFailed, this,
+ std::placeholders::_1, std::placeholders::_2));
- private:
- class RTCCallback : public ConsumerSocket::ReadCallback {
- static constexpr std::size_t mtu = 1500;
+ ret = consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER,
+ verifier);
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
- public:
- RTCCallback(Impl &hiperf_client) : client_(hiperf_client) {
- client_.configuration_.receive_buffer = utils::MemBuf::create(mtu);
- }
+ // Signer for aggregatd interests
+ std::shared_ptr<transport::auth::Signer> signer =
+ std::make_shared<transport::auth::VoidSigner>();
+ if (!configuration_.aggr_interest_passphrase_.empty()) {
+ signer = std::make_shared<transport::auth::SymmetricSigner>(
+ transport::auth::CryptoSuite::HMAC_SHA256,
+ configuration_.aggr_interest_passphrase_);
+ }
+ ret = consumer_socket_->setSocketOption(GeneralTransportOptions::SIGNER,
+ signer);
+ if (ret == SOCKET_OPTION_NOT_SET) return ERROR_SETUP;
- bool isBufferMovable() noexcept override { return false; }
+ if (configuration_.aggregated_interests_) {
+ ret = consumer_socket_->setSocketOption(
+ RtcTransportOptions::AGGREGATED_INTERESTS, true);
- void getReadBuffer(uint8_t **application_buffer,
- size_t *max_length) override {
- *application_buffer =
- client_.configuration_.receive_buffer->writableData();
- *max_length = mtu;
- }
+ if (ret == SOCKET_OPTION_NOT_SET) return ERROR_SETUP;
+ }
- void readDataAvailable(std::size_t length) noexcept override {
- client_.received_bytes_ += length;
- client_.received_data_pkt_++;
+ ret = consumer_socket_->setSocketOption(
+ ConsumerCallbacksOptions::INTEREST_OUTPUT,
+ (transport::interface::ConsumerInterestCallback)std::bind(
+ &ConsumerContext::processLeavingInterest, this,
+ std::placeholders::_1, std::placeholders::_2));
- // collecting delay stats. Just for performance testing
- uint64_t *senderTimeStamp =
- (uint64_t *)client_.configuration_.receive_buffer->writableData();
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::system_clock::now().time_since_epoch())
- .count();
- double new_delay = (double)(now - *senderTimeStamp);
+ ret = consumer_socket_->setSocketOption(
+ ConsumerCallbacksOptions::READ_CALLBACK, this);
- if (*senderTimeStamp > now)
- new_delay = -1 * (double)(*senderTimeStamp - now);
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
- client_.delay_sample_++;
- client_.avg_data_delay_ =
- client_.avg_data_delay_ +
- (new_delay - client_.avg_data_delay_) / client_.delay_sample_;
+ ret = consumer_socket_->setSocketOption(
+ ConsumerCallbacksOptions::STATS_SUMMARY,
+ (transport::interface::ConsumerTimerCallback)std::bind(
+ &Impl::ConsumerContext::handleTimerExpiration, this,
+ std::placeholders::_1, std::placeholders::_2));
- if (client_.configuration_.test_mode_) {
- client_.data_delays_ += std::to_string(int(new_delay));
- client_.data_delays_ += ",";
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
}
- if (client_.configuration_.relay_) {
- client_.producer_socket_->produceDatagram(
- client_.configuration_.relay_name_.getName(),
- client_.configuration_.receive_buffer->writableData(),
- length < 1400 ? length : 1400);
- }
- if (client_.configuration_.output_stream_mode_) {
- uint8_t *start =
- (uint8_t *)client_.configuration_.receive_buffer->writableData();
- start += sizeof(uint64_t);
- std::size_t pkt_len = length - sizeof(uint64_t);
- client_.socket_.send_to(asio::buffer(start, pkt_len), client_.remote_);
+ if (consumer_socket_->setSocketOption(
+ GeneralTransportOptions::STATS_INTERVAL,
+ configuration_.report_interval_milliseconds_) ==
+ SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
}
- }
- size_t maxBufferSize() const override { return mtu; }
+ consumer_socket_->connect();
- void readError(const std::error_code ec) noexcept override {
- std::cerr << "Error while reading from RTC socket" << std::endl;
- client_.io_service_.stop();
+ return ERROR_SUCCESS;
}
- void readSuccess(std::size_t total_size) noexcept override {
- std::cout << "Data successfully read" << std::endl;
- }
+ /***************************************************************
+ * Run functions
+ ***************************************************************/
- private:
- Impl &client_;
- };
+ int run() {
+ getOutputStream() << "Starting download of " << flow_name_ << std::endl;
- class Callback : public ConsumerSocket::ReadCallback {
- public:
- Callback(Impl &hiperf_client) : client_(hiperf_client) {
- client_.configuration_.receive_buffer =
- utils::MemBuf::create(client_.configuration_.receive_buffer_size_);
+ saved_stats_.t_download_ = saved_stats_.t_stats_ =
+ utils::SteadyTime::now();
+ consumer_socket_->consume(flow_name_);
+
+ return ERROR_SUCCESS;
}
- bool isBufferMovable() noexcept override { return false; }
+ // Members initialized by the constructor
+ std::shared_ptr<utils::MemBuf> receive_buffer_;
+ asio::ip::udp::socket socket_;
+ std::size_t payload_size_max_;
+ asio::ip::udp::endpoint remote_;
+ std::uint32_t nb_iterations_;
- void getReadBuffer(uint8_t **application_buffer,
- size_t *max_length) override {
- *application_buffer =
- client_.configuration_.receive_buffer->writableData();
- *max_length = client_.configuration_.receive_buffer_size_;
- }
+ // Members initialized by in-class initializer
+ SavedStatistics saved_stats_{};
+ uint16_t header_counter_{0};
+ bool first_{true};
+ std::unique_ptr<ConsumerSocket> consumer_socket_;
+ std::unique_ptr<ProducerSocket> producer_socket_;
+ };
- void readDataAvailable(std::size_t length) noexcept override {}
+ public:
+ explicit Impl(const hiperf::ClientConfiguration &conf)
+ : config_(conf), signals_(io_service_) {}
- void readBufferAvailable(
- std::unique_ptr<utils::MemBuf> &&buffer) noexcept override {}
+ virtual ~Impl() = default;
- size_t maxBufferSize() const override {
- return client_.configuration_.receive_buffer_size_;
+ int setup() {
+ int ret = ensureFlows(config_.name_, config_.parallel_flows_);
+ if (ret != ERROR_SUCCESS) {
+ return ret;
}
- void readError(const std::error_code ec) noexcept override {
- std::cerr << "Error " << ec.message() << " while reading from socket"
- << std::endl;
- client_.io_service_.stop();
- }
+ consumer_contexts_.reserve(config_.parallel_flows_);
+ for (uint32_t i = 0; i < config_.parallel_flows_; i++) {
+ auto &ctx = consumer_contexts_.emplace_back(*this, i);
+ ret = ctx.setup();
- void readSuccess(std::size_t total_size) noexcept override {
- Time t2 = std::chrono::steady_clock::now();
- TimeDuration dt =
- std::chrono::duration_cast<TimeDuration>(t2 - client_.t_download_);
- long usec = (long)dt.count();
+ if (ret) {
+ break;
+ }
+ }
- std::cout << "Content retrieved. Size: " << total_size << " [Bytes]"
- << std::endl;
+ return ret;
+ }
- std::cerr << "Elapsed Time: " << usec / 1000000.0 << " seconds -- "
- << (total_size * 8) * 1.0 / usec * 1.0 << " [Mbps]"
- << std::endl;
+ int run() {
+ signals_.add(SIGINT);
+ signals_.async_wait(
+ [this](const std::error_code &, const int &) { io_service_.stop(); });
- client_.io_service_.stop();
+ for (auto &consumer_context : consumer_contexts_) {
+ consumer_context.run();
}
- private:
- Impl &client_;
- };
+ io_service_.run();
+
+ return ERROR_SUCCESS;
+ }
- hiperf::ClientConfiguration configuration_;
- Time t_stats_;
- Time t_download_;
- uint32_t total_duration_milliseconds_;
- uint64_t old_bytes_value_;
- uint64_t old_interest_tx_value_;
- uint64_t old_fec_interest_tx_value_;
- uint64_t old_fec_data_rx_value_;
- uint64_t old_lost_data_value_;
- uint64_t old_bytes_recovered_value_;
- uint64_t old_definitely_lost_data_value_;
- uint32_t old_retx_value_;
- uint32_t old_sent_int_value_;
- uint32_t old_received_nacks_value_;
- uint32_t old_fec_pkt_;
-
- // IMPORTANT: to be used only for performance testing, when consumer and
- // producer are synchronized. Used for rtc only at the moment
- double avg_data_delay_;
- uint32_t delay_sample_;
-
- uint32_t received_bytes_;
- uint32_t received_data_pkt_;
-
- std::string data_delays_;
+ ClientConfiguration &getConfig() { return config_; }
+ private:
asio::io_service io_service_;
+ hiperf::ClientConfiguration config_;
asio::signal_set signals_;
- RTCCallback rtc_callback_;
- Callback callback_;
- std::unique_ptr<ConsumerSocket> consumer_socket_;
- std::unique_ptr<ProducerSocket> producer_socket_;
- asio::ip::udp::socket socket_;
- asio::ip::udp::endpoint remote_;
-
- ForwarderConfiguration config_;
- uint16_t switch_threshold_; /* ms */
- bool done_;
- std::vector<ForwarderInterface::RouteInfoPtr> main_routes_;
- std::vector<ForwarderInterface::RouteInfoPtr> backup_routes_;
-#ifdef FORWARDER_INTERFACE
- ForwarderInterface forwarder_interface_;
-#endif
+ std::vector<ConsumerContext> consumer_contexts_;
};
-HIperfClient::HIperfClient(const ClientConfiguration &conf) {
- impl_ = new Impl(conf);
-}
+HIperfClient::HIperfClient(const ClientConfiguration &conf)
+ : impl_(std::make_unique<Impl>(conf)) {}
-HIperfClient::~HIperfClient() { delete impl_; }
+HIperfClient::~HIperfClient() = default;
-int HIperfClient::setup() { return impl_->setup(); }
+int HIperfClient::setup() const { return impl_->setup(); }
-void HIperfClient::run() { impl_->run(); }
+void HIperfClient::run() const { impl_->run(); }
} // namespace hiperf
diff --git a/apps/hiperf/src/client.h b/apps/hiperf/src/client.h
index f45b9af43..beecbd473 100644
--- a/apps/hiperf/src/client.h
+++ b/apps/hiperf/src/client.h
@@ -16,19 +16,21 @@
#pragma once
#include <common.h>
+#include <hicn/transport/utils/noncopyable.h>
namespace hiperf {
-class HIperfClient {
+class HIperfClient : public ::utils::NonCopyable {
public:
- HIperfClient(const ClientConfiguration &conf);
+ explicit HIperfClient(const ClientConfiguration &conf);
+
~HIperfClient();
- int setup();
- void run();
+ int setup() const;
+ void run() const;
private:
class Impl;
- Impl *impl_;
+ std::unique_ptr<Impl> impl_;
};
-} // namespace hiperf \ No newline at end of file
+} // namespace hiperf
diff --git a/apps/hiperf/src/common.h b/apps/hiperf/src/common.h
index e6ba526f9..0f96bef1f 100644
--- a/apps/hiperf/src/common.h
+++ b/apps/hiperf/src/common.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Copyright (c) 2021-2022 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -15,27 +15,28 @@
#pragma once
-#include <hicn/transport/auth/identity.h>
#include <hicn/transport/auth/signer.h>
#include <hicn/transport/config.h>
#include <hicn/transport/core/content_object.h>
#include <hicn/transport/core/interest.h>
#include <hicn/transport/interfaces/global_conf_interface.h>
-#include <hicn/transport/interfaces/p2psecure_socket_consumer.h>
-#include <hicn/transport/interfaces/p2psecure_socket_producer.h>
#include <hicn/transport/interfaces/socket_consumer.h>
#include <hicn/transport/interfaces/socket_producer.h>
#include <hicn/transport/utils/chrono_typedefs.h>
+#include <hicn/transport/utils/color.h>
#include <hicn/transport/utils/literals.h>
#ifndef _WIN32
#include <hicn/transport/utils/daemonizator.h>
#endif
+#include <hicn/apps/utils/logger.h>
+
#include <asio.hpp>
#include <cmath>
#include <fstream>
#include <iomanip>
+#include <iostream>
#include <sstream>
#include <string>
#include <unordered_set>
@@ -43,43 +44,151 @@
#ifndef ERROR_SUCCESS
#define ERROR_SUCCESS 0
#endif
-#define ERROR_SETUP -5
-#define MIN_PROBE_SEQ 0xefffffff
-
-using namespace transport::interface;
-using namespace transport::auth;
-using namespace transport::core;
-
-static inline uint64_t _ntohll(const uint64_t *input) {
- uint64_t return_val;
- uint8_t *tmp = (uint8_t *)&return_val;
-
- tmp[0] = *input >> 56;
- tmp[1] = *input >> 48;
- tmp[2] = *input >> 40;
- tmp[3] = *input >> 32;
- tmp[4] = *input >> 24;
- tmp[5] = *input >> 16;
- tmp[6] = *input >> 8;
- tmp[7] = *input >> 0;
-
- return return_val;
-}
+static constexpr int ERROR_SETUP = -5;
+static constexpr uint32_t MIN_PROBE_SEQ = 0xefffffff;
+static constexpr uint32_t RTC_HEADER_SIZE = 12;
+static constexpr uint32_t FEC_HEADER_MAX_SIZE = 36;
+static constexpr uint32_t HIPERF_MTU = 1500;
+
+namespace hiperf {
+
+using transport::core::Packet;
+using transport::core::Prefix;
+
+/**
+ * Logger
+ */
+template <typename D, typename ConfType, typename ParentType>
+class Base : public std::stringbuf, public std::ostream {
+ protected:
+ static inline const char separator[] = "| ";
+
+ Base(ParentType &parent, asio::io_service &io_service, int identifier)
+ : std::stringbuf(),
+ std::ostream(this),
+ parent_(parent),
+ configuration_(parent_.getConfig()),
+ io_service_(io_service),
+ identifier_(identifier),
+ name_id_(D::getContextType() + std::to_string(identifier_)),
+ flow_name_(configuration_.name_.makeNameWithIndex(identifier_)) {
+ std::stringstream begin;
+ std::stringstream end;
+ if (configuration_.colored_) {
+ begin << color_mod_ << bold_mod_;
+ end << end_mod_;
+ } else {
+ begin << "";
+ end << "";
+ }
+
+ begin << "|" << name_id_ << separator;
+ begin_ = begin.str();
+ end_ = end.str();
+ }
+
+ Base(Base &&other) noexcept
+ : parent_(other.parent_),
+ configuration_(other.configuration_),
+ io_service_(other.io_service_),
+ identifier_(other.identifier_),
+ name_id_(std::move(other.name_id_)),
+ flow_name_(other.flow_name_) {}
+
+ ~Base() {}
+
+ /***************************************************************
+ * std::stringbuf sync override
+ ***************************************************************/
+
+ int sync() override {
+ auto string = str();
+ asio::post(io_service_,
+ [this, string]() { LoggerInfo() << begin_ << string << end_; });
+ str("");
+
+ return 0;
+ }
+
+ std::ostream &getOutputStream() { return *this; }
-static inline uint64_t _htonll(const uint64_t *input) {
- return (_ntohll(input));
+ // Members initialized by the constructor
+ ParentType &parent_;
+ ConfType &configuration_;
+ asio::io_service &io_service_;
+ int identifier_;
+ std::string name_id_;
+ transport::core::Name flow_name_;
+ std::string begin_;
+ std::string end_;
+
+ // Members initialized by the in-class initializer
+ utils::ColorModifier color_mod_;
+ utils::ColorModifier bold_mod_{utils::ColorModifier::Code::BOLD};
+ utils::ColorModifier end_mod_{utils::ColorModifier::Code::RESET};
+};
+
+static inline int ensureFlows(const Prefix &prefix, std::size_t flows) {
+ int ret = ERROR_SUCCESS;
+
+ // Make sure the provided prefix length not allows to accomodate the
+ // provided number of flows.
+ uint16_t max_ip_addr_len_bits;
+ uint16_t log2_n_flow;
+ u64 max_n_flow;
+ if (prefix.getAddressFamily() == AF_INET) {
+ max_ip_addr_len_bits = IPV4_ADDR_LEN_BITS;
+ } else if (prefix.getAddressFamily() == AF_INET6) {
+ max_ip_addr_len_bits = IPV6_ADDR_LEN_BITS;
+ } else {
+ LoggerErr() << "Error: unknown address family.";
+ ret = ERROR_SETUP;
+ goto END;
+ }
+
+ log2_n_flow = max_ip_addr_len_bits - prefix.getPrefixLength();
+ max_n_flow = log2_n_flow < 64 ? (1 << log2_n_flow) : ~0ULL;
+
+ if (flows > max_n_flow) {
+ LoggerErr() << "Error: the provided prefix length does not allow to "
+ "accomodate the provided number of flows ("
+ << flows << " > " << max_n_flow << ").";
+ ret = ERROR_SETUP;
+ }
+
+END:
+ return ret;
}
-namespace hiperf {
+/**
+ * Class to retrieve the maximum payload size given the MTU and packet headers.
+ */
+class PayloadSize {
+ public:
+ PayloadSize(Packet::Format format, std::size_t mtu = HIPERF_MTU)
+ : mtu_(mtu), format_(format) {}
+
+ std::size_t getPayloadSizeMax(std::size_t transport_size = 0,
+ std::size_t fec_size = 0,
+ std::size_t signature_size = 0) {
+ return mtu_ - Packet::getHeaderSizeFromFormat(format_, signature_size) -
+ transport_size - fec_size;
+ }
+
+ private:
+ std::size_t mtu_;
+ Packet::Format format_;
+};
/**
* Class for handling the production rate for the RTC producer.
*/
class Rate {
public:
- Rate() : rate_kbps_(0) {}
+ Rate() {}
+ ~Rate() {}
- Rate(const std::string &rate) {
+ explicit Rate(const std::string &rate) {
std::size_t found = rate.find("kbps");
if (found != std::string::npos) {
rate_kbps_ = std::stof(rate.substr(0, found));
@@ -107,7 +216,7 @@ class Rate {
}
private:
- float rate_kbps_;
+ float rate_kbps_ = 0.0;
};
struct packet_t {
@@ -115,103 +224,71 @@ struct packet_t {
uint32_t size;
};
+struct Configuration {
+ Prefix name_{"b001::abcd/64"};
+ std::string passphrase_;
+ std::string aggr_interest_passphrase_;
+ bool rtc_{false};
+ uint16_t port_{0};
+ bool aggregated_data_{false};
+ Packet::Format packet_format_{
+ transport::interface::default_values::packet_format};
+ uint32_t parallel_flows_{1};
+ bool colored_{true};
+};
+
/**
* Container for command line configuration for hiperf client.
*/
-struct ClientConfiguration {
- ClientConfiguration()
- : name("b001::abcd", 0),
- beta(-1.f),
- drop_factor(-1.f),
- window(-1),
- producer_certificate(""),
- passphrase(""),
- receive_buffer(nullptr),
- receive_buffer_size_(128 * 1024),
- download_size(0),
- report_interval_milliseconds_(1000),
- transport_protocol_(CBR),
- rtc_(false),
- test_mode_(false),
- relay_(false),
- secure_(false),
- producer_prefix_(),
- interest_lifetime_(500),
- relay_name_("c001::abcd/64"),
- output_stream_mode_(false),
- port_(0) {}
-
- Name name;
- double beta;
- double drop_factor;
- double window;
- std::string producer_certificate;
- std::string passphrase;
- std::shared_ptr<utils::MemBuf> receive_buffer;
- std::size_t receive_buffer_size_;
- std::size_t download_size;
- std::uint32_t report_interval_milliseconds_;
- TransportProtocolAlgorithms transport_protocol_;
- bool rtc_;
- bool test_mode_;
- bool relay_;
- bool secure_;
+struct ClientConfiguration : Configuration {
+ double beta_{-1.f};
+ double drop_factor_{-1.f};
+ double window_{-1.f};
+ std::string producer_certificate_;
+ std::size_t receive_buffer_size_{128 * 1024};
+ std::uint32_t report_interval_milliseconds_{1000};
+ transport::interface::TransportProtocolAlgorithms transport_protocol_{
+ transport::interface::CBR};
+ bool test_mode_{false};
+ bool relay_{false};
Prefix producer_prefix_;
- uint32_t interest_lifetime_;
- Prefix relay_name_;
- bool output_stream_mode_;
- uint16_t port_;
+ uint32_t interest_lifetime_{500};
+ uint32_t manifest_factor_relevant_{100};
+ uint32_t manifest_factor_alert_{20};
+ Prefix relay_name_{"c001::abcd/64"};
+ bool output_stream_mode_{false};
+ uint32_t recovery_strategy_{4};
+ bool print_headers_{true};
+ std::uint32_t nb_iterations_{
+ std::numeric_limits<decltype(nb_iterations_)>::max()};
+ bool content_sharing_mode_{false};
+ bool aggregated_interests_{false};
};
/**
* Container for command line configuration for hiperf server.
*/
-struct ServerConfiguration {
- ServerConfiguration()
- : name("b001::abcd/64"),
- virtual_producer(true),
- manifest(false),
- live_production(false),
- content_lifetime(600000000_U32),
- download_size(20 * 1024 * 1024),
- hash_algorithm(CryptoHashType::SHA256),
- keystore_name(""),
- passphrase(""),
- keystore_password("cisco"),
- multiphase_produce_(false),
- rtc_(false),
- interactive_(false),
- trace_based_(false),
- trace_index_(0),
- trace_file_(nullptr),
- production_rate_(std::string("2048kbps")),
- payload_size_(1400),
- secure_(false),
- input_stream_mode_(false),
- port_(0) {}
-
- Prefix name;
- bool virtual_producer;
- bool manifest;
- bool live_production;
- std::uint32_t content_lifetime;
- std::uint32_t download_size;
- CryptoHashType hash_algorithm;
- std::string keystore_name;
- std::string passphrase;
- std::string keystore_password;
- bool multiphase_produce_;
- bool rtc_;
- bool interactive_;
- bool trace_based_;
- std::uint32_t trace_index_;
- char *trace_file_;
- Rate production_rate_;
- std::size_t payload_size_;
- bool secure_;
- bool input_stream_mode_;
- uint16_t port_;
+struct ServerConfiguration : Configuration {
+ bool virtual_producer_{true};
+ std::uint32_t manifest_max_capacity_{0};
+ bool live_production_{false};
+ std::uint32_t content_lifetime_{
+ transport::interface::default_values::content_object_expiry_time};
+ std::uint32_t download_size_{20 * 1024 * 1024};
+ transport::auth::CryptoHashType hash_algorithm_{
+ transport::auth::CryptoHashType::SHA256};
+ std::string keystore_name_;
+ std::string keystore_password_{"cisco"};
+ bool multiphase_produce_{false};
+ bool interactive_{false};
+ bool trace_based_{false};
+ std::uint32_t trace_index_{0};
+ char *trace_file_{nullptr};
+ Rate production_rate_{"2048kbps"};
+ std::size_t payload_size_{1384};
+ bool input_stream_mode_{false};
std::vector<struct packet_t> trace_;
+ std::string fec_type_;
};
} // namespace hiperf
diff --git a/apps/hiperf/src/forwarder_config.h b/apps/hiperf/src/forwarder_config.h
deleted file mode 100644
index 655ac3b66..000000000
--- a/apps/hiperf/src/forwarder_config.h
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Copyright (c) 2021 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <string>
-#include <unordered_map>
-#include <vector>
-
-namespace hiperf {
-
-struct ListenerConfig {
- std::string address;
- std::uint16_t port;
- std::string interface;
- std::string name;
-};
-
-struct ConnectorConfig {
- std::string local_address;
- std::uint16_t local_port;
- std::string remote_address;
- std::uint16_t remote_port;
- std::string interface;
- std::string name;
-};
-
-struct RouteConfig {
- std::string prefix;
- uint16_t weight;
- std::string main_connector;
- std::string backup_connector;
- std::string name;
-};
-
-class ForwarderConfiguration {
- public:
- ForwarderConfiguration() : n_threads_(1) {}
-
- bool empty() {
- return listeners_.empty() && connectors_.empty() && routes_.empty();
- }
-
- ForwarderConfiguration &setThreadNumber(std::size_t threads) {
- n_threads_ = threads;
- return *this;
- }
-
- std::size_t getThreadNumber() { return n_threads_; }
-
- template <typename... Args>
- ForwarderConfiguration &addListener(Args &&...args) {
- listeners_.emplace_back(std::forward<Args>(args)...);
- return *this;
- }
-
- template <typename... Args>
- ForwarderConfiguration &addConnector(const std::string &name,
- Args &&...args) {
- connectors_.emplace(name, std::forward<Args>(args)...);
- return *this;
- }
-
- template <typename... Args>
- ForwarderConfiguration &addRoute(Args &&...args) {
- routes_.emplace_back(std::forward<Args>(args)...);
- return *this;
- }
-
- std::vector<ListenerConfig> &getListeners() { return listeners_; }
-
- std::unordered_map<std::string, ConnectorConfig> &getConnectors() {
- return connectors_;
- }
-
- std::vector<RouteConfig> &getRoutes() { return routes_; }
-
- private:
- std::vector<ListenerConfig> listeners_;
- std::unordered_map<std::string, ConnectorConfig> connectors_;
- std::vector<RouteConfig> routes_;
- std::size_t n_threads_;
-};
-
-} \ No newline at end of file
diff --git a/apps/hiperf/src/forwarder_interface.cc b/apps/hiperf/src/forwarder_interface.cc
deleted file mode 100644
index 864208239..000000000
--- a/apps/hiperf/src/forwarder_interface.cc
+++ /dev/null
@@ -1,676 +0,0 @@
-/*
- * Copyright (c) 2021 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <arpa/inet.h>
-#include <forwarder_interface.h>
-#include <hicn/transport/utils/log.h>
-
-#include <chrono>
-#include <iostream>
-#include <thread>
-#include <unordered_set>
-
-extern "C" {
-#include <hicn/error.h>
-#include <hicn/util/ip_address.h>
-}
-
-// XXX the main listener should be retrieve in this class at initialization, aka
-// when hICN becomes avialable
-//
-// XXX the main listener port will be retrieved in the forwarder
-// interface... everything else will be delayed until we have this
-// information
-
-namespace hiperf {
-
-ForwarderInterface::ForwarderInterface(asio::io_service &io_service,
- ICallback *callback)
- : external_ioservice_(io_service),
- forwarder_interface_callback_(callback),
- work_(std::make_unique<asio::io_service::work>(internal_ioservice_)),
- sock_(nullptr),
- thread_(std::make_unique<std::thread>([this]() {
- std::cout << "Starting Forwarder Interface thread" << std::endl;
- internal_ioservice_.run();
- std::cout << "Stopping Forwarder Interface thread" << std::endl;
- })),
- // set_route_callback_(std::forward<Callback &&>(setRouteCallback)),
- check_routes_timer_(nullptr),
- pending_add_route_counter_(0),
- hicn_listen_port_(9695),
- /* We start in disabled state even when a forwarder is always available */
- state_(State::Disabled),
- timer_(io_service),
- num_reattempts(0) {
- std::cout << "Forwarder interface created... connecting to forwarder...\n";
- internal_ioservice_.post([this]() { onHicnServiceAvailable(true); });
-}
-
-ForwarderInterface::~ForwarderInterface() {
- if (thread_ && thread_->joinable()) {
- internal_ioservice_.dispatch([this]() {
- if (sock_) {
- hc_sock_free(sock_);
- sock_ = nullptr;
- }
-
- work_.reset();
- });
-
- thread_->join();
- }
-
- std::cout << "ForwarderInterface::~ForwarderInterface" << std::endl;
-}
-
-void ForwarderInterface::onHicnServiceAvailable(bool flag) {
- if (flag) {
- switch (state_) {
- case State::Disabled:
- case State::Requested:
- state_ = State::Available;
- case State::Available:
- connectToForwarder();
- /* Synchronous */
- if (state_ != State::Connected) {
- std::cout << "ConnectToForwarder failed" << std::endl;
- goto REATTEMPT;
- }
- state_ = State::Ready;
-
- std::cout << "Connected to forwarder... cancelling reconnection timer"
- << std::endl;
- timer_.cancel();
- num_reattempts = 0;
-
- // case State::Connected:
- // checkListener();
-
- // if (state_ != State::Ready) {
- // std::cout << "Listener not found" << std::endl;
- // goto REATTEMPT;
- // }
- // state_ = State::Ready;
-
- // timer_.cancel();
- // num_reattempts = 0;
-
- std::cout << "Forwarder interface is ready... communicate to controller"
- << std::endl;
-
- forwarder_interface_callback_->onHicnServiceReady();
-
- case State::Ready:
- break;
- }
- } else {
- if (sock_) {
- hc_sock_free(sock_);
- sock_ = nullptr;
- }
- state_ = State::Disabled; // XXX to be checked upon callback to prevent the
- // state from going forward (used to manage
- // concurrency)
- }
- return;
-
-REATTEMPT:
- /* Schedule reattempt */
- std::cout << "Failed to connect, scheduling reattempt" << std::endl;
- num_reattempts++;
-
- timer_.expires_from_now(
- std::chrono::milliseconds(ForwarderInterface::REATTEMPT_DELAY_MS));
- // timer_.async_wait(std::bind(&ForwarderInterface::onHicnServiceAvailable,
- // this, flag, std::placeholders::_1));
- timer_.async_wait([this, flag](std::error_code ec) {
- if (ec) return;
- onHicnServiceAvailable(flag);
- });
-}
-
-int ForwarderInterface::connectToForwarder() {
- sock_ = hc_sock_create();
- if (!sock_) {
- std::cout << "Could not create socket" << std::endl;
- goto ERR_SOCK;
- }
-
- if (hc_sock_connect(sock_) < 0) {
- std::cout << "Could not connect to forwarder" << std::endl;
- goto ERR;
- }
-
- std::cout << "Forwarder interface connected" << std::endl;
- state_ = State::Connected;
- return 0;
-
-ERR:
- hc_sock_free(sock_);
- sock_ = nullptr;
-ERR_SOCK:
- return -1;
-}
-
-int ForwarderInterface::checkListener() {
- if (!sock_) return -1;
-
- hc_data_t *data;
- if (hc_listener_list(sock_, &data) < 0) return -1;
-
- int ret = -1;
- foreach_listener(l, data) {
- std::string interface = std::string(l->interface_name);
- if (interface.compare("lo") != 0) {
- hicn_listen_port_ = l->local_port;
- state_ = State::Ready;
- ret = 0;
- std::cout << "Got listener port" << std::endl;
- break;
- }
- }
-
- hc_data_free(data);
- return ret;
-}
-
-void ForwarderInterface::close() {
- std::cout << "ForwarderInterface::close" << std::endl;
-
- state_ = State::Disabled;
- /* Cancelling eventual reattempts */
- timer_.cancel();
-
- if (sock_) {
- hc_sock_free(sock_);
- sock_ = nullptr;
- }
-
- internal_ioservice_.post([this]() { work_.reset(); });
-
- if (thread_->joinable()) {
- thread_->join();
- }
-}
-
-#if 0
-void ForwarderInterface::enableCheckRoutesTimer() {
- if (check_routes_timer_ != nullptr) return;
-
- check_routes_timer_ =
- std::make_unique<asio::steady_timer>(internal_ioservice_);
- checkRoutesLoop();
-}
-
-void ForwarderInterface::removeConnectedUserNow(ProtocolPtr protocol) {
- internalRemoveConnectedUser(protocol);
-}
-
-void ForwarderInterface::scheduleRemoveConnectedUser(ProtocolPtr protocol) {
- internal_ioservice_.post(
- [this, protocol]() { internalRemoveConnectedUser(protocol); });
-}
-#endif
-
-void ForwarderInterface::createFaceAndRoute(const RouteInfoPtr &route_info) {
- std::vector<RouteInfoPtr> routes;
- routes.push_back(std::move(route_info));
- createFaceAndRoutes(routes);
-}
-
-void ForwarderInterface::createFaceAndRoutes(
- const std::vector<RouteInfoPtr> &routes_info) {
- pending_add_route_counter_++;
- auto timer = new asio::steady_timer(internal_ioservice_);
- internal_ioservice_.post([this, routes_info, timer]() {
- internalCreateFaceAndRoutes(routes_info, ForwarderInterface::MAX_REATTEMPT,
- timer);
- });
-}
-
-void ForwarderInterface::deleteFaceAndRoute(const RouteInfoPtr &route_info) {
- std::vector<RouteInfoPtr> routes;
- routes.push_back(std::move(route_info));
- deleteFaceAndRoutes(routes);
-}
-
-void ForwarderInterface::deleteFaceAndRoutes(
- const std::vector<RouteInfoPtr> &routes_info) {
- internal_ioservice_.post([this, routes_info]() {
- for (auto &route : routes_info) {
- internalDeleteFaceAndRoute(route);
- }
- });
-}
-
-void ForwarderInterface::internalDeleteFaceAndRoute(
- const RouteInfoPtr &route_info) {
- if (!sock_) return;
-
- hc_data_t *data;
- if (hc_route_list(sock_, &data) < 0) return;
-
- std::vector<hc_route_t *> routes_to_remove;
- foreach_route(r, data) {
- char remote_addr[INET6_ADDRSTRLEN];
- int ret = ip_address_ntop(&r->remote_addr, remote_addr, r->len, r->family);
- if (ret < 0) continue;
-
- std::string route_addr(remote_addr);
- if (route_addr.compare(route_info->route_addr) == 0 &&
- r->len == route_info->route_len) {
- // route found
- routes_to_remove.push_back(r);
- }
- }
-
- if (routes_to_remove.size() == 0) {
- // nothing to do here
- hc_data_free(data);
- return;
- }
-
- std::unordered_set<uint32_t> connids_to_remove;
- for (unsigned i = 0; i < routes_to_remove.size(); i++) {
- connids_to_remove.insert(routes_to_remove[i]->face_id);
- if (hc_route_delete(sock_, routes_to_remove[i]) < 0) {
- std::cout << "Error removing route from forwarder." << std::endl;
- }
- }
-
- // remove connection
- if (hc_connection_list(sock_, &data) < 0) {
- hc_data_free(data);
- return;
- }
-
- // collects pointerst to the connections using the conn IDs
- std::vector<hc_connection_t *> conns_to_remove;
- foreach_connection(c, data) {
- if (connids_to_remove.find(c->id) != connids_to_remove.end()) {
- // conn found
- conns_to_remove.push_back(c);
- }
- }
-
- if (conns_to_remove.size() == 0) {
- // nothing else to do here
- hc_data_free(data);
- return;
- }
-
- for (unsigned i = 0; i < conns_to_remove.size(); i++) {
- if (hc_connection_delete(sock_, conns_to_remove[i]) < 0) {
- std::cout << "Error removing connection from forwarder." << std::endl;
- }
- }
-
- hc_data_free(data);
-}
-
-void ForwarderInterface::internalCreateFaceAndRoutes(
- const std::vector<RouteInfoPtr> &route_info, uint8_t max_try,
- asio::steady_timer *timer) {
- uint32_t face_id;
-
- std::vector<RouteInfoPtr> failed;
- for (auto &route : route_info) {
- int ret = tryToCreateFace(route.get(), &face_id);
- if (ret >= 0) {
- auto ret = tryToCreateRoute(route.get(), face_id);
- if (ret < 0) {
- failed.push_back(route);
- std::cerr << "Error creating route and face" << std::endl;
- continue;
- }
- }
- }
-
- if (failed.size() > 0) {
- if (max_try == 0) {
- /* All attempts failed */
- goto RESULT;
- }
- max_try--;
- timer->expires_from_now(std::chrono::milliseconds(500));
- timer->async_wait([this, failed, max_try, timer](std::error_code ec) {
- if (ec) return;
- internalCreateFaceAndRoutes(failed, max_try, timer);
- });
- return;
- }
-
-#if 0
- // route_status_[protocol] = std::move(route_info);
- for (size_t i = 0; i < route_info.size(); i++) {
- route_status_.insert(
- std::pair<ClientId, RouteInfoPtr>(protocol, std::move(route_info[i])));
- }
-#endif
-
-RESULT:
- std::cout << "Face / Route create ok, now calling back protocol" << std::endl;
- pending_add_route_counter_--;
- external_ioservice_.post([this, r = std::move(route_info)]() mutable {
- forwarder_interface_callback_->onRouteConfigured(r);
- });
- delete timer;
-}
-
-int ForwarderInterface::tryToCreateFace(RouteInfo *route_info,
- uint32_t *face_id) {
- bool found = false;
-
- // check connection with the forwarder
- if (!sock_) {
- std::cout << "[ForwarderInterface::tryToCreateFace] socket error"
- << std::endl;
- goto ERR_SOCK;
- }
-
- // get listeners list
- hc_data_t *data;
- if (hc_listener_list(sock_, &data) < 0) {
- std::cout << "[ForwarderInterface::tryToCreateFace] cannot list listeners";
- goto ERR_LIST;
- }
-
- char _local_address[128];
- foreach_listener(l, data) {
- std::cout << "Processing " << l->interface_name << std::endl;
- std::string interface = std::string(l->interface_name);
- int ret = ip_address_ntop(&l->local_addr, _local_address, 128, AF_INET);
- if (ret < 0) {
- std::cerr << "Error in ip_address_ntop" << std::endl;
- goto ERR;
- }
-
- std::string local_address = std::string(_local_address);
- uint16_t local_port = l->local_port;
-
- if (interface.compare(route_info->interface) == 0 &&
- local_address.compare(route_info->local_addr) == 0 &&
- local_port == route_info->local_port) {
- found = true;
- break;
- }
- }
-
- std::cout << route_info->remote_addr << std::endl;
-
- ip_address_t local_address, remote_address;
- ip_address_pton(route_info->local_addr.c_str(), &local_address);
- ip_address_pton(route_info->remote_addr.c_str(), &remote_address);
-
- if (!found) {
- // Create listener
- hc_listener_t listener;
- memset(&listener, 0, sizeof(hc_listener_t));
-
- std::string name = "l_" + route_info->name;
- listener.local_addr = local_address;
- listener.type = CONNECTION_TYPE_UDP;
- listener.family = AF_INET;
- listener.local_port = route_info->local_port;
- strncpy(listener.name, name.c_str(), sizeof(listener.name));
- strncpy(listener.interface_name, route_info->interface.c_str(),
- sizeof(listener.interface_name));
-
- std::cout << "------------> " << route_info->interface << std::endl;
-
- int ret = hc_listener_create(sock_, &listener);
-
- if (ret < 0) {
- std::cerr << "Error creating listener." << std::endl;
- return -1;
- } else {
- std::cout << "Listener " << listener.id << " created." << std::endl;
- }
- }
-
- // Create face
- hc_face_t face;
- memset(&face, 0, sizeof(hc_face_t));
-
- // crate face with the local interest
- face.face.type = FACE_TYPE_UDP;
- face.face.family = route_info->family;
- face.face.local_addr = local_address;
- face.face.remote_addr = remote_address;
- face.face.local_port = route_info->local_port;
- face.face.remote_port = route_info->remote_port;
-
- if (netdevice_set_name(&face.face.netdevice, route_info->interface.c_str()) <
- 0) {
- std::cout << "[ForwarderInterface::tryToCreateFaceAndRoute] "
- "netdevice_set_name "
- "("
- << face.face.netdevice.name << ", "
- << route_info->interface << ") error" << std::endl;
- goto ERR;
- }
-
- // create face
- if (hc_face_create(sock_, &face) < 0) {
- std::cout << "[ForwarderInterface::tryToCreateFace] error creating face";
- goto ERR;
- }
-
- std::cout << "Face created successfully" << std::endl;
-
- // assing face to the return value
- *face_id = face.id;
-
- hc_data_free(data);
- return 0;
-
-ERR:
- hc_data_free(data);
-ERR_LIST:
-ERR_SOCK:
- return -1;
-}
-
-int ForwarderInterface::tryToCreateRoute(RouteInfo *route_info,
- uint32_t face_id) {
- std::cout << "Trying to create route" << std::endl;
-
- // check connection with the forwarder
- if (!sock_) {
- std::cout << "[ForwarderInterface::tryToCreateRoute] socket error";
- return -1;
- }
-
- ip_address_t route_ip;
- hc_route_t route;
-
- if (ip_address_pton(route_info->route_addr.c_str(), &route_ip) < 0) {
- std::cout << "[ForwarderInterface::tryToCreateRoute] ip_address_pton error";
- return -1;
- }
-
- route.face_id = face_id;
- route.family = AF_INET6;
- route.remote_addr = route_ip;
- route.len = route_info->route_len;
- route.cost = 1;
-
- if (hc_route_create(sock_, &route) < 0) {
- std::cout << "[ForwarderInterface::tryToCreateRoute] error creating route";
- return -1;
- }
-
- std::cout << "[ForwarderInterface::tryToCreateRoute] OK" << std::endl;
- return 0;
-}
-
-#if 0 // not used
-void ForwarderInterface::checkRoutesLoop() {
- check_routes_timer_->expires_from_now(std::chrono::milliseconds(1000));
- check_routes_timer_->async_wait([this](std::error_code ec) {
- if (ec) return;
- if (pending_add_route_counter_ == 0) checkRoutes();
- });
-}
-
-void ForwarderInterface::checkRoutes() {
- std::cout << "someone called the checkRoutes function" << std::endl;
- if (!sock_) return;
-
- hc_data_t *data;
- if (hc_route_list(sock_, &data) < 0) {
- return;
- }
-
- std::unordered_set<std::string> routes_set;
- foreach_route(r, data) {
- char remote_addr[INET6_ADDRSTRLEN];
- int ret = ip_address_ntop(&r->remote_addr, remote_addr, r->len, r->family);
- if (ret < 0) continue;
- std::string route(std::string(remote_addr) + "/" + std::to_string(r->len));
- routes_set.insert(route);
- }
-
- for (auto it = route_status_.begin(); it != route_status_.end(); it++) {
- std::string route(it->second->route_addr + "/" +
- std::to_string(it->second->route_len));
- if (routes_set.find(route) == routes_set.end()) {
- // the route is missing
- createFaceAndRoute(it->second, it->first);
- break;
- }
- }
-
- hc_data_free(data);
-}
-#endif
-
-#if 0
- using ListenerRetrievedCallback =
- std::function<void(std::error_code, uint32_t)>;
-
- ListenerRetrievedCallback listener_retrieved_callback_;
-
-#ifdef __ANDROID__
- hicn_listen_port_(9695),
-#else
- hicn_listen_port_(0),
-#endif
- timer_(forward_engine_.getIoService()),
-
- void initConfigurationProtocol(void)
- {
- // We need the configuration, which is different for every protocol...
- // so we move this step down towards the protocol implementation itself.
- if (!permanent_hicn) {
- doInitConfigurationProtocol();
- } else {
- // XXX This should be moved somewhere else
- getMainListener(
- [this](std::error_code ec, uint32_t hicn_listen_port) {
- if (!ec)
- {
- hicn_listen_port_ = hicn_listen_port;
- doInitConfigurationProtocol();
- }
- });
- }
- }
-
- template <typename Callback>
- void getMainListener(Callback &&callback)
- {
- listener_retrieved_callback_ = std::forward<Callback &&>(callback);
- tryToConnectToForwarder();
- }
- private:
- void doGetMainListener(std::error_code ec)
- {
- if (!ec)
- {
- // ec == 0 --> timer expired
- int ret = forwarder_interface_.getMainListenerPort();
- if (ret <= 0)
- {
- // Since without the main listener of the forwarder the proxy cannot
- // work, we can stop the program here until we get the listener port.
- std::cout <<
- "Could not retrieve main listener port from the forwarder. "
- "Retrying.";
-
- timer_.expires_from_now(std::chrono::milliseconds(RETRY_INTERVAL));
- timer_.async_wait(std::bind(&Protocol::doGetMainListener, this,
- std::placeholders::_1));
- }
- else
- {
- timer_.cancel();
- retx_count_ = 0;
- hicn_listen_port_ = uint16_t(ret);
- listener_retrieved_callback_(
- make_error_code(configuration_error::success), hicn_listen_port_);
- }
- }
- else
- {
- std::cout << "Timer for retrieving main hicn listener canceled." << std::endl;
- }
- }
-
- void tryToConnectToForwarder()
- {
- doTryToConnectToForwarder(std::make_error_code(std::errc(0)));
- }
-
- void doTryToConnectToForwarder(std::error_code ec)
- {
- if (!ec)
- {
- // ec == 0 --> timer expired
- int ret = forwarder_interface_.connect();
- if (ret < 0)
- {
- // We were not able to connect to the local forwarder. Do not give up
- // and retry.
- std::cout << "Could not connect to local forwarder. Retrying." << std::endl;
-
- timer_.expires_from_now(std::chrono::milliseconds(RETRY_INTERVAL));
- timer_.async_wait(std::bind(&Protocol::doTryToConnectToForwarder, this,
- std::placeholders::_1));
- }
- else
- {
- timer_.cancel();
- retx_count_ = 0;
- doGetMainListener(std::make_error_code(std::errc(0)));
- }
- }
- else
- {
- std::cout << "Timer for re-trying forwarder connection canceled." << std::endl;
- }
- }
-
-
- template <typename ProtocolImplementation>
- constexpr uint32_t Protocol<ProtocolImplementation>::RETRY_INTERVAL;
-
-#endif
-
-constexpr uint32_t ForwarderInterface::REATTEMPT_DELAY_MS;
-constexpr uint32_t ForwarderInterface::MAX_REATTEMPT;
-
-} // namespace hiperf \ No newline at end of file
diff --git a/apps/hiperf/src/forwarder_interface.h b/apps/hiperf/src/forwarder_interface.h
deleted file mode 100644
index 7591ea257..000000000
--- a/apps/hiperf/src/forwarder_interface.h
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Copyright (c) 2021 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-extern "C" {
-#ifndef WITH_POLICY
-#define WITH_POLICY
-#endif
-#include <hicn/ctrl/api.h>
-#include <hicn/util/ip_address.h>
-}
-
-#ifndef ASIO_STANDALONE
-#define ASIO_STANDALONE
-#endif
-#include <asio.hpp>
-
-#include <functional>
-#include <thread>
-#include <unordered_map>
-
-namespace hiperf {
-
-class ForwarderInterface {
- static const uint32_t REATTEMPT_DELAY_MS = 500;
- static const uint32_t MAX_REATTEMPT = 10;
-
- public:
- struct RouteInfo {
- int family;
- std::string local_addr;
- uint16_t local_port;
- std::string remote_addr;
- uint16_t remote_port;
- std::string route_addr;
- uint8_t route_len;
- std::string interface;
- std::string name;
- };
-
- using RouteInfoPtr = std::shared_ptr<RouteInfo>;
-
- class ICallback {
- public:
- virtual void onHicnServiceReady() = 0;
- virtual void onRouteConfigured(std::vector<RouteInfoPtr> &route_info) = 0;
- };
-
- enum class State {
- Disabled, /* Stack is stopped */
- Requested, /* Stack is starting */
- Available, /* Forwarder is running */
- Connected, /* Control socket connected */
- Ready, /* Listener present */
- };
-
- public:
- ForwarderInterface(asio::io_service &io_service, ICallback *callback);
-
- ~ForwarderInterface();
-
- State getState();
-
- void setState(State state);
-
- void onHicnServiceAvailable(bool flag);
-
- void enableCheckRoutesTimer();
-
- void createFaceAndRoutes(const std::vector<RouteInfoPtr> &routes_info);
-
- void createFaceAndRoute(const RouteInfoPtr &route_info);
-
- void deleteFaceAndRoutes(const std::vector<RouteInfoPtr> &routes_info);
-
- void deleteFaceAndRoute(const RouteInfoPtr &route_info);
-
- void close();
-
- uint16_t getHicnListenerPort() { return hicn_listen_port_; }
-
- private:
- int connectToForwarder();
-
- int checkListener();
-
- void internalCreateFaceAndRoutes(const std::vector<RouteInfoPtr> &route_info,
- uint8_t max_try, asio::steady_timer *timer);
-
- void internalDeleteFaceAndRoute(const RouteInfoPtr &routes_info);
-
- int tryToCreateFace(RouteInfo *RouteInfo, uint32_t *face_id);
- int tryToCreateRoute(RouteInfo *RouteInfo, uint32_t face_id);
-
- void checkRoutesLoop();
-
- void checkRoutes();
-
- asio::io_service &external_ioservice_;
- asio::io_service internal_ioservice_;
- ICallback *forwarder_interface_callback_;
- std::unique_ptr<asio::io_service::work> work_;
- hc_sock_t *sock_;
- std::unique_ptr<std::thread> thread_;
- // SetRouteCallback set_route_callback_;
- // std::unordered_multimap<ProtocolPtr, RouteInfoPtr> route_status_;
- std::unique_ptr<asio::steady_timer> check_routes_timer_;
- uint32_t pending_add_route_counter_;
- uint16_t hicn_listen_port_;
-
- State state_;
-
- /* Reattempt timer */
- asio::steady_timer timer_;
- unsigned num_reattempts;
-};
-
-} // namespace hiperf
diff --git a/apps/hiperf/src/main.cc b/apps/hiperf/src/main.cc
index b2d99c4a4..25c1a288c 100644
--- a/apps/hiperf/src/main.cc
+++ b/apps/hiperf/src/main.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021-2022 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -14,133 +14,184 @@
*/
#include <client.h>
+#include <hicn/apps/utils/logger.h>
#include <server.h>
-#include <forwarder_interface.h>
namespace hiperf {
+using transport::auth::CryptoHashType;
+
+static std::unordered_map<std::string, hicn_packet_format_t> const
+ packet_format_map = {{"ipv4_tcp", HICN_PACKET_FORMAT_IPV4_TCP},
+ {"ipv6_tcp", HICN_PACKET_FORMAT_IPV6_TCP},
+ {"new", HICN_PACKET_FORMAT_NEW}};
+
+std::string str_tolower(std::string s) {
+ std::transform(s.begin(), s.end(), s.begin(),
+ [](unsigned char c) { return std::tolower(c); });
+ return s;
+}
+
void usage() {
- std::cerr << "HIPERF - A tool for performing network throughput "
- "measurements with hICN"
- << std::endl;
- std::cerr << "usage: hiperf [-S|-C] [options] [prefix|name]" << std::endl;
- std::cerr << std::endl;
- std::cerr << "SERVER OR CLIENT:" << std::endl;
+ LoggerInfo() << "HIPERF - Instrumentation tool for performing active network"
+ "measurements with hICN";
+ LoggerInfo() << "usage: hiperf [-S|-C] [options] [prefix|name]";
+ LoggerInfo();
+ LoggerInfo() << "SERVER OR CLIENT:";
#ifndef _WIN32
- std::cerr << "-D\t\t\t\t\t"
- << "Run as a daemon" << std::endl;
- std::cerr << "-R\t\t\t\t\t"
- << "Run RTC protocol (client or server)" << std::endl;
- std::cerr << "-f\t<filename>\t\t\t"
- << "Log file" << std::endl;
- std::cerr << "-z\t<io_module>\t\t\t"
- << "IO module to use. Default: hicnlight_module" << std::endl;
+ LoggerInfo() << "-D\t\t\t\t\t"
+ << "Run as a daemon";
+ LoggerInfo() << "-R\t\t\t\t\t"
+ << "Run RTC protocol (client or server)";
+ LoggerInfo() << "-f\t<filename>\t\t\t"
+ << "Log file";
+ LoggerInfo() << "-z\t<io_module>\t\t\t"
+ << "IO module to use. Default: hicnlight_module";
+ LoggerInfo() << "-F\t<conf_file>\t\t\t"
+ << "Path to optional configuration file for libtransport";
+ LoggerInfo() << "-a\t\t\t\t\t"
+ << "Enables data packet aggregation. "
+ << "Works only in RTC mode";
+ LoggerInfo() << "-X\t<param>\t\t\t\t"
+ << "Set FEC params. Options are Rely_K#_N# or RS_K#_N#";
+ LoggerInfo()
+ << "-J\t<passphrase>\t\t\t"
+ << "Set the passphrase used to sign/verify aggregated interests. "
+ "If set on the client, aggregated interests are enable automatically.";
#endif
- std::cerr << std::endl;
- std::cerr << "SERVER SPECIFIC:" << std::endl;
- std::cerr << "-A\t<content_size>\t\t\t"
- "Size of the content to publish. This "
- "is not the size of the packet (see -s for it)."
- << std::endl;
- std::cerr << "-s\t<packet_size>\t\t\tSize of the payload of each data packet."
- << std::endl;
- std::cerr << "-r\t\t\t\t\t"
- << "Produce real content of <content_size> bytes" << std::endl;
- std::cerr << "-m\t\t\t\t\t"
- << "Produce transport manifest" << std::endl;
- std::cerr << "-l\t\t\t\t\t"
- << "Start producing content upon the reception of the "
- "first interest"
- << std::endl;
- std::cerr << "-K\t<keystore_path>\t\t\t"
- << "Path of p12 file containing the "
- "crypto material used for signing packets"
- << std::endl;
- std::cerr << "-k\t<passphrase>\t\t\t"
- << "String from which a 128-bit symmetric key will be "
- "derived for signing packets"
- << std::endl;
- std::cerr << "-y\t<hash_algorithm>\t\t"
- << "Use the selected hash algorithm for "
- "calculating manifest digests"
- << std::endl;
- std::cerr << "-p\t<password>\t\t\t"
- << "Password for p12 keystore" << std::endl;
- std::cerr << "-x\t\t\t\t\t"
- << "Produce a content of <content_size>, then after downloading "
- "it produce a new content of"
- << "\n\t\t\t\t\t<content_size> without resetting "
- "the suffix to 0."
- << std::endl;
- std::cerr << "-B\t<bitrate>\t\t\t"
- << "Bitrate for RTC producer, to be used with the -R option."
- << std::endl;
+ LoggerInfo();
+ LoggerInfo() << "SERVER SPECIFIC:";
+ LoggerInfo()
+ << "-A\t<content_size>\t\t\t"
+ "Sends an application data unit in bytes that is published once "
+ "before exit";
+ LoggerInfo() << "-E\t<expiry_time>\t\t\t"
+ "Expiration time for data packets generated by the producer "
+ "socket";
+ LoggerInfo() << "-s\t<packet_size>\t\t\tData packet payload size.";
+ LoggerInfo() << "-r\t\t\t\t\t"
+ << "Produce real content of <content_size> bytes";
+ LoggerInfo()
+ << "-m\t<manifest_max_capacity>\t\t"
+ << "The maximum number of entries a manifest can contain. Set it "
+ "to 0 to disable manifests. Default is 30, max is 255.";
+ LoggerInfo() << "-l\t\t\t\t\t"
+ << "Start producing content upon the reception of the "
+ "first interest";
+ LoggerInfo() << "-K\t<keystore_path>\t\t\t"
+ << "Path of p12 file containing the "
+ "crypto material used for signing packets";
+ LoggerInfo() << "-k\t<passphrase>\t\t\t"
+ << "String from which a 128-bit symmetric key will be "
+ "derived for signing packets";
+ LoggerInfo() << "-p\t<password>\t\t\t"
+ << "Password for p12 keystore";
+ LoggerInfo() << "-y\t<hash_algorithm>\t\t"
+ << "Use the selected hash algorithm for "
+ "computing manifest digests (default: SHA256)";
+ LoggerInfo() << "-x\t\t\t\t\t"
+ << "Produces application data units of size <content_size> "
+ << "without resetting the name suffix to 0.";
+ LoggerInfo() << "-B\t<bitrate>\t\t\t"
+ << "RTC producer data bitrate, to be used with the -R option.";
#ifndef _WIN32
- std::cerr << "-I\t\t\t\t\t"
- "Interactive mode, start/stop real time content production "
- "by pressing return. To be used with the -R option"
- << std::endl;
- std::cerr
+ LoggerInfo() << "-I\t\t\t\t\t"
+ "Interactive mode, start/stop real time content production "
+ "by pressing return. To be used with the -R option";
+ LoggerInfo()
<< "-T\t<filename>\t\t\t"
"Trace based mode, hiperf takes as input a file with a trace. "
"Each line of the file indicates the timestamp and the size of "
"the packet to generate. To be used with the -R option. -B and -I "
- "will be ignored."
- << std::endl;
- std::cerr << "-E\t\t\t\t\t"
- << "Enable encrypted communication. Requires the path to a p12 "
- "file containing the "
- "crypto material used for the TLS handshake"
- << std::endl;
- std::cerr << "-G\t<port>\t\t\t"
- << "input stream from localhost at the specified port" << std::endl;
+ "will be ignored.";
+ LoggerInfo() << "-G\t<port>\t\t\t\t"
+ << "Input stream from localhost at the specified port";
#endif
- std::cerr << std::endl;
- std::cerr << "CLIENT SPECIFIC:" << std::endl;
- std::cerr << "-b\t<beta_parameter>\t\t"
- << "RAAQM beta parameter" << std::endl;
- std::cerr << "-d\t<drop_factor_parameter>\t\t"
- << "RAAQM drop factor "
- "parameter"
- << std::endl;
- std::cerr << "-L\t<interest lifetime>\t\t"
- << "Set interest lifetime." << std::endl;
- std::cerr << "-M\t<input_buffer_size>\t\t"
- << "Size of consumer input buffer. If 0, reassembly of packets "
- "will be disabled."
- << std::endl;
- std::cerr << "-W\t<window_size>\t\t\t"
- << "Use a fixed congestion window "
- "for retrieving the data."
- << std::endl;
- std::cerr << "-i\t<stats_interval>\t\t"
- << "Show the statistics every <stats_interval> milliseconds."
- << std::endl;
- std::cerr << "-c\t<certificate_path>\t\t"
- << "Path of the producer certificate to be used for verifying the "
- "origin of the packets received."
- << std::endl;
- std::cerr << "-k\t<passphrase>\t\t\t"
- << "String from which is derived the symmetric key used by the "
- "producer to sign packets and by the consumer to verify them."
- << std::endl;
- std::cerr << "-t\t\t\t\t\t"
- "Test mode, check if the client is receiving the "
- "correct data. This is an RTC specific option, to be "
- "used with the -R (default false)"
- << std::endl;
- std::cerr << "-P\t\t\t\t\t"
- << "Prefix of the producer where to do the handshake" << std::endl;
- std::cerr << "-j\t<relay_name>\t\t\t"
- << "Publish the received content under the name relay_name."
- "This is an RTC specific option, to be "
- "used with the -R (default false)"
- << std::endl;
- std::cerr << "-g\t<port>\t\t\t"
- << "output stream to localhost at the specified port" << std::endl;
+ LoggerInfo();
+ LoggerInfo() << "CLIENT SPECIFIC:";
+ LoggerInfo() << "-b\t<beta_parameter>\t\t"
+ << "RAAQM beta parameter";
+ LoggerInfo() << "-d\t<drop_factor_parameter>\t\t"
+ << "RAAQM drop factor "
+ "parameter";
+ LoggerInfo() << "-L\t<interest lifetime>\t\t"
+ << "Set interest lifetime.";
+ LoggerInfo() << "-U\t<factor>\t\t\t"
+ << "Update the relevance threshold: if an unverified packet has "
+ "been received before the last U * manifest_max_capacity_ "
+ "packets received (verified or not), it will be flushed out. "
+ "Should be > 1, default is 100.";
+ LoggerInfo()
+ << "-u\t<factor>\t\t\t"
+ << "Update the alert threshold: if the "
+ "number of unverified packet is > u * manifest_max_capacity_, "
+ "an alert is raised. Should be set such that U > u >= 1, "
+ "default is 20. If u >= U, no alert will ever be raised.";
+ LoggerInfo() << "-M\t<input_buffer_size>\t\t"
+ << "Size of consumer input buffer. If 0, reassembly of packets "
+ "will be disabled.";
+ LoggerInfo()
+ << "-N\t\t\t\t\t"
+ << "Enable aggregated interests; the number of suffixes (including "
+ "the one in the header) can be set through the env variable "
+ "`MAX_AGGREGATED_INTERESTS`.";
+ LoggerInfo() << "-W\t<window_size>\t\t\t"
+ << "Use a fixed congestion window "
+ "for retrieving the data.";
+ LoggerInfo() << "-i\t<stats_interval>\t\t"
+ << "Show the statistics every <stats_interval> milliseconds.";
+ LoggerInfo()
+ << "-c\t<certificate_path>\t\t"
+ << "Path of the producer certificate to be used for verifying the "
+ "origin of the packets received.";
+ LoggerInfo()
+ << "-k\t<passphrase>\t\t\t"
+ << "String from which is derived the symmetric key used by the "
+ "producer to sign packets and by the consumer to verify them.";
+ LoggerInfo() << "-t\t\t\t\t\t"
+ "Test mode, check if the client is receiving the "
+ "correct data. This is an RTC specific option, to be "
+ "used with the -R (default: false)";
+ LoggerInfo()
+ << "-P\t\t\t\t\t"
+ << "Number of parallel streams. For hiperf client, this is the "
+ "number of consumer to create, while for hiperf server this is "
+ "the number of producers to create.";
+ LoggerInfo() << "-j\t<relay_name>\t\t\t"
+ << "Publish received content under the name relay_name."
+ "This is an RTC specific option, to be "
+ "used with the -R (default: false)";
+ LoggerInfo() << "-g\t<port>\t\t\t\t"
+ << "Output stream to localhost at the specified port";
+ LoggerInfo()
+ << "-o\t\t\t\t\t"
+ << "Content sharing mode: if set the socket work in content sharing"
+ << "mode. It works only in RTC mode";
+ LoggerInfo() << "-e\t<strategy>\t\t\t"
+ << "Enhance the network with a reliability strategy. Options";
+ LoggerInfo() << "\t\t\t\t\t\t1: unreliable ";
+ LoggerInfo() << "\t\t\t\t\t\t2: rtx only ";
+ LoggerInfo() << "\t\t\t\t\t\t3: fec only ";
+ LoggerInfo() << "\t\t\t\t\t\t4: delay based ";
+ LoggerInfo() << "\t\t\t\t\t\t5: low rate ";
+ LoggerInfo() << "\t\t\t\t\t\t6: low rate and best path ";
+ LoggerInfo() << "\t\t\t\t\t\t7: low rate and replication";
+ LoggerInfo() << "\t\t\t\t\t\t8: low rate and best path/replication ";
+ LoggerInfo() << "\t\t\t\t\t\t9: only fec low residual losses ";
+ LoggerInfo() << "\t\t\t\t\t\t10: delay and best path ";
+ LoggerInfo() << "\t\t\t\t\t\t11: delay and replication ";
+ LoggerInfo() << "\t\t\t\t\t\t(default: 2 = rtx only) ";
+ LoggerInfo() << "-H\t\t\t\t\t"
+ << "Disable periodic print headers in stats report.";
+ LoggerInfo() << "-n\t<nb_iterations>\t\t\t"
+ << "Print the stats report <nb_iterations> times and exit.\n"
+ << "\t\t\t\t\tThis option limits the duration of the run to "
+ "<nb_iterations> * <stats_interval> milliseconds.";
+ LoggerInfo() << "-w <packet_format> Packet format (without signature, "
+ "defaults to IPV6_TCP)";
}
-int main(int argc, char *argv[]) {
+int hiperf_main(int argc, char *argv[]) {
#ifndef _WIN32
// Common
bool daemon = false;
@@ -149,11 +200,13 @@ int main(int argc, char *argv[]) {
WSAStartup(MAKEWORD(2, 2), &wsaData);
#endif
+ transport::interface::global_config::GlobalConfigInterface global_conf;
+
// -1 server, 0 undefined, 1 client
int role = 0;
int options = 0;
- char *log_file = nullptr;
+ const char *log_file = nullptr;
transport::interface::global_config::IoModuleConfiguration config;
std::string conf_file;
config.name = "hicnlight_module";
@@ -166,10 +219,11 @@ int main(int argc, char *argv[]) {
int opt;
#ifndef _WIN32
- while ((opt = getopt(
- argc, argv,
- "DSCf:b:d:W:RM:c:vA:s:rmlK:k:y:p:hi:xE:P:B:ItL:z:T:F:j:g:G:")) !=
- -1) {
+ // Please keep in alphabetical order.
+ while (
+ (opt = getopt(argc, argv,
+ "A:B:CDE:F:G:HIJ:K:L:M:NP:RST:U:W:X:ab:c:d:e:f:g:hi:j:k:lm:"
+ "n:op:qrs:tu:vw:xy:z:")) != -1) {
switch (opt) {
// Common
case 'D': {
@@ -202,11 +256,16 @@ int main(int argc, char *argv[]) {
break;
}
#else
+ // Please keep in alphabetical order.
while ((opt = getopt(argc, argv,
- "SCf:b:d:W:RM:c:vA:s:rmlK:k:y:p:hi:xB:E:P:tL:z:F:j:")) !=
- -1) {
+ "A:B:CE:F:HK:L:M:P:RSU:W:X:ab:c:d:e:f:hi:j:k:lm:n:op:rs:"
+ "tu:vwxy:z:")) != -1) {
switch (opt) {
#endif
+ case 'E': {
+ server_configuration.content_lifetime_ = std::stoul(optarg);
+ break;
+ }
case 'f': {
log_file = optarg;
break;
@@ -216,6 +275,30 @@ int main(int argc, char *argv[]) {
server_configuration.rtc_ = true;
break;
}
+ case 'a': {
+ client_configuration.aggregated_data_ = true;
+ server_configuration.aggregated_data_ = true;
+ break;
+ }
+ case 'o': {
+ client_configuration.content_sharing_mode_ = true;
+ break;
+ }
+ case 'w': {
+ std::string packet_format_s = std::string(optarg);
+ packet_format_s = str_tolower(packet_format_s);
+ auto it = packet_format_map.find(std::string(optarg));
+ if (it == packet_format_map.end())
+ throw std::runtime_error("Bad packet format");
+ client_configuration.packet_format_ = it->second;
+ server_configuration.packet_format_ = it->second;
+ break;
+ }
+ case 'k': {
+ server_configuration.passphrase_ = std::string(optarg);
+ client_configuration.passphrase_ = std::string(optarg);
+ break;
+ }
case 'z': {
config.name = optarg;
break;
@@ -234,25 +317,31 @@ int main(int argc, char *argv[]) {
role += 1;
break;
}
- case 'k': {
- server_configuration.passphrase = std::string(optarg);
- client_configuration.passphrase = std::string(optarg);
+ case 'q': {
+ client_configuration.colored_ = server_configuration.colored_ = false;
+ break;
+ }
+ case 'J': {
+ client_configuration.aggr_interest_passphrase_ = optarg;
+ server_configuration.aggr_interest_passphrase_ = optarg;
+ // Consumer signature is only used with aggregated interests,
+ // hence enabling it also forces usage of aggregated interests
+ client_configuration.aggregated_interests_ = true;
break;
}
-
// Client specifc
case 'b': {
- client_configuration.beta = std::stod(optarg);
+ client_configuration.beta_ = std::stod(optarg);
options = 1;
break;
}
case 'd': {
- client_configuration.drop_factor = std::stod(optarg);
+ client_configuration.drop_factor_ = std::stod(optarg);
options = 1;
break;
}
case 'W': {
- client_configuration.window = std::stod(optarg);
+ client_configuration.window_ = std::stod(optarg);
options = 1;
break;
}
@@ -261,13 +350,17 @@ int main(int argc, char *argv[]) {
options = 1;
break;
}
+ case 'N': {
+ client_configuration.aggregated_interests_ = true;
+ break;
+ }
case 'P': {
- client_configuration.producer_prefix_ = Prefix(optarg);
- client_configuration.secure_ = true;
+ client_configuration.parallel_flows_ =
+ server_configuration.parallel_flows_ = std::stoull(optarg);
break;
}
case 'c': {
- client_configuration.producer_certificate = std::string(optarg);
+ client_configuration.producer_certificate_ = std::string(optarg);
options = 1;
break;
}
@@ -286,15 +379,35 @@ int main(int argc, char *argv[]) {
options = 1;
break;
}
+ case 'U': {
+ client_configuration.manifest_factor_relevant_ = std::stoul(optarg);
+ options = 1;
+ break;
+ }
+ case 'u': {
+ client_configuration.manifest_factor_alert_ = std::stoul(optarg);
+ options = 1;
+ break;
+ }
case 'j': {
client_configuration.relay_ = true;
client_configuration.relay_name_ = Prefix(optarg);
options = 1;
break;
}
+ case 'H': {
+ client_configuration.print_headers_ = false;
+ options = 1;
+ break;
+ }
+ case 'n': {
+ client_configuration.nb_iterations_ = std::stoul(optarg);
+ options = 1;
+ break;
+ }
// Server specific
case 'A': {
- server_configuration.download_size = std::stoul(optarg);
+ server_configuration.download_size_ = std::stoul(optarg);
options = -1;
break;
}
@@ -304,43 +417,44 @@ int main(int argc, char *argv[]) {
break;
}
case 'r': {
- server_configuration.virtual_producer = false;
+ server_configuration.virtual_producer_ = false;
options = -1;
break;
}
case 'm': {
- server_configuration.manifest = true;
+ server_configuration.manifest_max_capacity_ = std::stoul(optarg);
options = -1;
break;
}
case 'l': {
- server_configuration.live_production = true;
+ server_configuration.live_production_ = true;
options = -1;
break;
}
case 'K': {
- server_configuration.keystore_name = std::string(optarg);
+ server_configuration.keystore_name_ = std::string(optarg);
options = -1;
break;
}
case 'y': {
+ CryptoHashType hash_algorithm = CryptoHashType::SHA256;
if (strncasecmp(optarg, "sha256", 6) == 0) {
- server_configuration.hash_algorithm = CryptoHashType::SHA256;
+ hash_algorithm = CryptoHashType::SHA256;
} else if (strncasecmp(optarg, "sha512", 6) == 0) {
- server_configuration.hash_algorithm = CryptoHashType::SHA512;
+ hash_algorithm = CryptoHashType::SHA512;
} else if (strncasecmp(optarg, "blake2b512", 10) == 0) {
- server_configuration.hash_algorithm = CryptoHashType::BLAKE2B512;
+ hash_algorithm = CryptoHashType::BLAKE2B512;
} else if (strncasecmp(optarg, "blake2s256", 10) == 0) {
- server_configuration.hash_algorithm = CryptoHashType::BLAKE2S256;
+ hash_algorithm = CryptoHashType::BLAKE2S256;
} else {
- std::cerr << "Ignored unknown hash algorithm. Using SHA 256."
- << std::endl;
+ LoggerWarn() << "Unknown hash algorithm. Using SHA 256.";
}
+ server_configuration.hash_algorithm_ = hash_algorithm;
options = -1;
break;
}
case 'p': {
- server_configuration.keystore_password = std::string(optarg);
+ server_configuration.keystore_password_ = std::string(optarg);
options = -1;
break;
}
@@ -356,12 +470,16 @@ int main(int argc, char *argv[]) {
options = -1;
break;
}
- case 'E': {
- server_configuration.keystore_name = std::string(optarg);
- server_configuration.secure_ = true;
+ case 'e': {
+ client_configuration.recovery_strategy_ = std::stoul(optarg);
+ options = 1;
+ break;
+ }
+ case 'X': {
+ server_configuration.fec_type_ = std::string(optarg);
+ options = -1;
break;
}
- case 'h':
default:
usage();
return EXIT_FAILURE;
@@ -369,34 +487,31 @@ int main(int argc, char *argv[]) {
}
if (options > 0 && role < 0) {
- std::cerr << "Client options cannot be used when using the "
- "software in server mode"
- << std::endl;
+ LoggerErr() << "Client options cannot be used when using the "
+ "software in server mode";
usage();
return EXIT_FAILURE;
} else if (options < 0 && role > 0) {
- std::cerr << "Server options cannot be used when using the "
- "software in client mode"
- << std::endl;
+ LoggerErr() << "Server options cannot be used when using the "
+ "software in client mode";
usage();
return EXIT_FAILURE;
} else if (!role) {
- std::cerr << "Please specify if running hiperf as client "
- "or server."
- << std::endl;
+ LoggerErr() << "Please specify if running hiperf as client "
+ "or server.";
usage();
return EXIT_FAILURE;
}
if (argv[optind] == 0) {
- std::cerr << "Please specify the name/prefix to use." << std::endl;
+ LoggerErr() << "Please specify the name/prefix to use.";
usage();
return EXIT_FAILURE;
} else {
if (role > 0) {
- client_configuration.name = Name(argv[optind]);
+ client_configuration.name_ = Prefix(argv[optind]);
} else {
- server_configuration.name = Prefix(argv[optind]);
+ server_configuration.name_ = Prefix(argv[optind]);
}
}
@@ -427,7 +542,7 @@ int main(int argc, char *argv[]) {
config.set();
// Parse config file
- transport::interface::global_config::parseConfigurationFile(conf_file);
+ global_conf.parseConfigurationFile(conf_file);
if (role > 0) {
HIperfClient c(client_configuration);
@@ -453,4 +568,4 @@ int main(int argc, char *argv[]) {
} // namespace hiperf
-int main(int argc, char *argv[]) { return hiperf::main(argc, argv); }
+int main(int argc, char *argv[]) { return hiperf::hiperf_main(argc, argv); }
diff --git a/apps/hiperf/src/server.cc b/apps/hiperf/src/server.cc
index 968d42e2c..3f6c335f9 100644
--- a/apps/hiperf/src/server.cc
+++ b/apps/hiperf/src/server.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Copyright (c) 2021-2022 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -17,167 +17,92 @@
namespace hiperf {
+using transport::core::ContentObject;
+using transport::core::Interest;
+using transport::core::Name;
+using transport::interface::GeneralTransportOptions;
+using transport::interface::ProducerCallbacksOptions;
+using transport::interface::ProducerInterestCallback;
+using transport::interface::ProducerSocket;
+using transport::interface::ProductionProtocolAlgorithms;
+
/**
* Hiperf server class: configure and setup an hicn producer following the
* ServerConfiguration.
*/
class HIperfServer::Impl {
- const std::size_t log2_content_object_buffer_size = 8;
-
- public:
- Impl(const hiperf::ServerConfiguration &conf)
- : configuration_(conf),
- signals_(io_service_),
- rtc_timer_(io_service_),
- unsatisfied_interests_(),
- content_objects_((std::uint16_t)(1 << log2_content_object_buffer_size)),
- content_objects_index_(0),
- mask_((std::uint16_t)(1 << log2_content_object_buffer_size) - 1),
- last_segment_(0),
-#ifndef _WIN32
- ptr_last_segment_(&last_segment_),
- input_(io_service_),
- rtc_running_(false),
-#else
- ptr_last_segment_(&last_segment_),
-#endif
- flow_name_(configuration_.name.getName()),
- socket_(io_service_),
- recv_buffer_(nullptr, 0) {
- std::string buffer(configuration_.payload_size_, 'X');
- std::cout << "Producing contents under name " << conf.name.getName()
- << std::endl;
-#ifndef _WIN32
- if (configuration_.interactive_) {
- input_.assign(::dup(STDIN_FILENO));
- }
-#endif
-
- for (int i = 0; i < (1 << log2_content_object_buffer_size); i++) {
- content_objects_[i] = ContentObject::Ptr(
- new ContentObject(conf.name.getName(), HF_INET6_TCP, 0,
- (const uint8_t *)buffer.data(), buffer.size()));
- content_objects_[i]->setLifetime(
- default_values::content_object_expiry_time);
- }
- }
-
- void virtualProcessInterest(ProducerSocket &p, const Interest &interest) {
- content_objects_[content_objects_index_ & mask_]->setName(
- interest.getName());
- producer_socket_->produce(
- *content_objects_[content_objects_index_++ & mask_]);
+ static constexpr std::size_t klog2_content_object_buffer_size() { return 8; }
+ static constexpr std::size_t kcontent_object_buffer_size() {
+ return (1 << klog2_content_object_buffer_size());
}
-
- void processInterest(ProducerSocket &p, const Interest &interest) {
- p.setSocketOption(ProducerCallbacksOptions::CACHE_MISS,
- (ProducerInterestCallback)VOID_HANDLER);
- p.setSocketOption(GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME,
- 5000000_U32);
-
- produceContent(p, interest.getName(), interest.getName().getSuffix());
- std::cout << "Received interest " << interest.getName().getSuffix()
- << std::endl;
+ static constexpr std::size_t kmask() {
+ return (kcontent_object_buffer_size() - 1);
}
- void asyncProcessInterest(ProducerSocket &p, const Interest &interest) {
- p.setSocketOption(ProducerCallbacksOptions::CACHE_MISS,
- (ProducerInterestCallback)bind(&Impl::cacheMiss, this,
- std::placeholders::_1,
- std::placeholders::_2));
- p.setSocketOption(GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME,
- 5000000_U32);
- uint32_t suffix = interest.getName().getSuffix();
-
- if (suffix == 0) {
- last_segment_ = 0;
- ptr_last_segment_ = &last_segment_;
- unsatisfied_interests_.clear();
- }
-
- // The suffix will either be the one from the received interest or the
- // smallest suffix of a previous interest not satisfed
- if (!unsatisfied_interests_.empty()) {
- auto it =
- std::lower_bound(unsatisfied_interests_.begin(),
- unsatisfied_interests_.end(), *ptr_last_segment_);
- if (it != unsatisfied_interests_.end()) {
- suffix = *it;
+ /**
+ * @brief As we can (potentially) setup many producer sockets, we need to keep
+ * a separate context for each one of them. The context contains parameters
+ * and variable that are specific to a single producer socket.
+ */
+ class ProducerContext
+ : public Base<ProducerContext, ServerConfiguration, Impl>,
+ public ProducerSocket::Callback {
+ public:
+ using ConfType = ServerConfiguration;
+ using ParentType = typename HIperfServer::Impl;
+ static inline const auto getContextType() { return "ProducerContext"; }
+
+ ProducerContext(HIperfServer::Impl &server, int producer_identifier)
+ : Base(server, server.io_service_, producer_identifier) {
+ // Allocate buffer to copy as content objects payload
+ std::string buffer(configuration_.payload_size_, 'X');
+
+ // Allocate array of content objects. They are share_ptr so that the
+ // transport will only capture a reference to them instead of performing
+ // an hard copy.
+ for (std::size_t i = 0; i < kcontent_object_buffer_size(); i++) {
+ const auto &element =
+ content_objects_.emplace_back(std::make_shared<ContentObject>(
+ configuration_.name_.makeName(), configuration_.packet_format_,
+ 0, (const uint8_t *)buffer.data(), buffer.size()));
+ element->setLifetime(
+ transport::interface::default_values::content_object_expiry_time);
}
- unsatisfied_interests_.erase(unsatisfied_interests_.begin(), it);
}
- std::cout << "Received interest " << interest.getName().getSuffix()
- << ", starting production at " << suffix << std::endl;
- std::cout << unsatisfied_interests_.size() << " interests still unsatisfied"
- << std::endl;
- produceContentAsync(p, interest.getName(), suffix);
- }
-
- void produceContent(ProducerSocket &p, const Name &content_name,
- uint32_t suffix) {
- auto b = utils::MemBuf::create(configuration_.download_size);
- std::memset(b->writableData(), '?', configuration_.download_size);
- b->append(configuration_.download_size);
- uint32_t total;
-
- utils::TimePoint t0 = utils::SteadyClock::now();
- total = p.produceStream(content_name, std::move(b),
- !configuration_.multiphase_produce_, suffix);
- utils::TimePoint t1 = utils::SteadyClock::now();
-
- std::cout
- << "Written " << total
- << " data packets in output buffer (Segmentation time: "
- << std::chrono::duration_cast<utils::Microseconds>(t1 - t0).count()
- << " us)" << std::endl;
- }
-
- void produceContentAsync(ProducerSocket &p, Name content_name,
- uint32_t suffix) {
- auto b = utils::MemBuf::create(configuration_.download_size);
- std::memset(b->writableData(), '?', configuration_.download_size);
- b->append(configuration_.download_size);
+ // To make vector happy (move or copy constructor is needed when vector
+ // resizes)
+ ProducerContext(ProducerContext &&other) noexcept
+ : Base(std::move(other)),
+ content_objects_(std::move(other.content_objects_)),
+ unsatisfied_interests_(std::move(other.unsatisfied_interests_)),
+ last_segment_(other.last_segment_),
+ producer_socket_(std::move(other.producer_socket_)),
+ content_objects_index_(other.content_objects_index_),
+ payload_size_max_(other.payload_size_max_) {}
- p.asyncProduce(content_name, std::move(b),
- !configuration_.multiphase_produce_, suffix,
- &ptr_last_segment_);
- }
+ virtual ~ProducerContext() = default;
- void cacheMiss(ProducerSocket &p, const Interest &interest) {
- unsatisfied_interests_.push_back(interest.getName().getSuffix());
- }
+ /**
+ * @brief Produce datagram
+ */
+ void produceDatagram(const uint8_t *buffer, std::size_t buffer_size) const {
+ assert(producer_socket_);
- void onContentProduced(ProducerSocket &p, const std::error_code &err,
- uint64_t bytes_written) {
- p.setSocketOption(ProducerCallbacksOptions::CACHE_MISS,
- (ProducerInterestCallback)bind(
- &Impl::asyncProcessInterest, this,
- std::placeholders::_1, std::placeholders::_2));
- }
+ auto size = std::min(buffer_size, payload_size_max_);
- std::shared_ptr<Identity> getProducerIdentity(std::string &keystore_path,
- std::string &keystore_pwd,
- CryptoHashType &hash_type) {
- if (access(keystore_path.c_str(), F_OK) != -1) {
- return std::make_shared<Identity>(keystore_path, keystore_pwd, hash_type);
+ producer_socket_->produceDatagram(flow_name_, buffer, size);
}
- return std::make_shared<Identity>(keystore_path, keystore_pwd,
- CryptoSuite::RSA_SHA256, 1024, 365,
- "producer-test");
- }
- int setup() {
- int ret;
- int production_protocol;
-
- if (configuration_.secure_) {
- auto identity = getProducerIdentity(configuration_.keystore_name,
- configuration_.keystore_password,
- configuration_.hash_algorithm);
- producer_socket_ = std::make_unique<P2PSecureProducerSocket>(
- configuration_.rtc_, identity);
- } else {
+ /**
+ * @brief Create and setup the producer socket
+ */
+ int setup() {
+ int ret;
+ int production_protocol;
+ std::shared_ptr<transport::auth::Signer> signer =
+ std::make_shared<transport::auth::VoidSigner>();
+
if (!configuration_.rtc_) {
production_protocol = ProductionProtocolAlgorithms::BYTE_STREAM;
} else {
@@ -185,210 +110,470 @@ class HIperfServer::Impl {
}
producer_socket_ = std::make_unique<ProducerSocket>(production_protocol);
- }
- if (producer_socket_->setSocketOption(
- GeneralTransportOptions::MAKE_MANIFEST, configuration_.manifest) ==
- SOCKET_OPTION_NOT_SET) {
- return ERROR_SETUP;
- }
+ if (producer_socket_->setSocketOption(
+ ProducerCallbacksOptions::PRODUCER_CALLBACK, this) ==
+ SOCKET_OPTION_NOT_SET) {
+ getOutputStream() << "Failed to set producer callback." << std::endl;
+ return ERROR_SETUP;
+ }
- if (!configuration_.passphrase.empty()) {
- std::shared_ptr<Signer> signer = std::make_shared<SymmetricSigner>(
- CryptoSuite::HMAC_SHA256, configuration_.passphrase);
- producer_socket_->setSocketOption(GeneralTransportOptions::SIGNER,
- signer);
- }
+ if (producer_socket_->setSocketOption(
+ GeneralTransportOptions::HASH_ALGORITHM,
+ configuration_.hash_algorithm_) == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+
+ if (producer_socket_->setSocketOption(
+ GeneralTransportOptions::MANIFEST_MAX_CAPACITY,
+ configuration_.manifest_max_capacity_) == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+
+ if (producer_socket_->setSocketOption(transport::interface::PACKET_FORMAT,
+ configuration_.packet_format_) ==
+ SOCKET_OPTION_NOT_SET) {
+ getOutputStream() << "ERROR -- Impossible to set the packet format."
+ << std::endl;
+ return ERROR_SETUP;
+ }
+
+ if (!configuration_.passphrase_.empty()) {
+ signer = std::make_shared<transport::auth::SymmetricSigner>(
+ transport::auth::CryptoSuite::HMAC_SHA256,
+ configuration_.passphrase_);
+ }
+
+ if (!configuration_.keystore_name_.empty()) {
+ signer = std::make_shared<transport::auth::AsymmetricSigner>(
+ configuration_.keystore_name_, configuration_.keystore_password_);
+ }
- if (!configuration_.keystore_name.empty()) {
- auto identity = getProducerIdentity(configuration_.keystore_name,
- configuration_.keystore_password,
- configuration_.hash_algorithm);
- std::shared_ptr<Signer> signer = identity->getSigner();
producer_socket_->setSocketOption(GeneralTransportOptions::SIGNER,
signer);
- }
- uint32_t rtc_header_size = 0;
- if (configuration_.rtc_) rtc_header_size = 12;
- producer_socket_->setSocketOption(
- GeneralTransportOptions::DATA_PACKET_SIZE,
- (uint32_t)(
- configuration_.payload_size_ + rtc_header_size +
- (configuration_.name.getAddressFamily() == AF_INET ? 40 : 60)));
- producer_socket_->registerPrefix(configuration_.name);
- producer_socket_->connect();
-
- if (configuration_.rtc_) {
- std::cout << "Running RTC producer: the prefix length will be ignored."
- " Use /128 by default in RTC mode"
- << std::endl;
- return ERROR_SUCCESS;
- }
+ // Compute maximum payload size
+ Packet::Format format = configuration_.packet_format_;
+ if (!configuration_.manifest_max_capacity_)
+ format = Packet::toAHFormat(format);
+ payload_size_max_ = PayloadSize(format).getPayloadSizeMax(
+ configuration_.rtc_ ? RTC_HEADER_SIZE : 0,
+ configuration_.fec_type_.empty() ? 0 : FEC_HEADER_MAX_SIZE,
+ !configuration_.manifest_max_capacity_
+ ? signer->getSignatureFieldSize()
+ : 0);
+
+ if (configuration_.payload_size_ > payload_size_max_) {
+ getOutputStream() << "WARNING: Payload has size "
+ << configuration_.payload_size_ << ", maximum is "
+ << payload_size_max_
+ << ". Payload will be truncated to fit." << std::endl;
+ }
+
+ // Verifier for aggregated interests
+ std::shared_ptr<transport::auth::Verifier> verifier =
+ std::make_shared<transport::auth::VoidVerifier>();
+ if (!configuration_.aggr_interest_passphrase_.empty()) {
+ verifier = std::make_unique<transport::auth::SymmetricVerifier>(
+ configuration_.aggr_interest_passphrase_);
+ }
+ ret = producer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER,
+ verifier);
+ if (ret == SOCKET_OPTION_NOT_SET) return ERROR_SETUP;
+
+ if (configuration_.rtc_) {
+ ret = producer_socket_->setSocketOption(
+ transport::interface::RtcTransportOptions::AGGREGATED_DATA,
+ configuration_.aggregated_data_);
+
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+
+ ret = producer_socket_->setSocketOption(
+ GeneralTransportOptions::FEC_TYPE, configuration_.fec_type_);
+
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+ }
- if (!configuration_.virtual_producer) {
if (producer_socket_->setSocketOption(
GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME,
- configuration_.content_lifetime) == SOCKET_OPTION_NOT_SET) {
+ configuration_.content_lifetime_) == SOCKET_OPTION_NOT_SET) {
return ERROR_SETUP;
}
- if (producer_socket_->setSocketOption(
- GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 200000U) ==
- SOCKET_OPTION_NOT_SET) {
- return ERROR_SETUP;
+ producer_socket_->registerPrefix(Prefix(flow_name_, 128));
+ producer_socket_->connect();
+ producer_socket_->start();
+
+ if (configuration_.rtc_) {
+ return ERROR_SUCCESS;
}
- if (!configuration_.live_production) {
- produceContent(*producer_socket_, configuration_.name.getName(), 0);
+ if (!configuration_.virtual_producer_) {
+ if (producer_socket_->setSocketOption(
+ GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 200000U) ==
+ SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+
+ if (producer_socket_->setSocketOption(
+ GeneralTransportOptions::MAX_SEGMENT_SIZE,
+ static_cast<uint32_t>(configuration_.payload_size_)) ==
+ SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+
+ if (!configuration_.live_production_) {
+ produceContent(*producer_socket_, configuration_.name_.makeName(), 0);
+ } else {
+ ret = producer_socket_->setSocketOption(
+ ProducerCallbacksOptions::CACHE_MISS,
+ (ProducerInterestCallback)bind(
+ &ProducerContext::asyncProcessInterest, this,
+ std::placeholders::_1, std::placeholders::_2));
+
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+ }
} else {
ret = producer_socket_->setSocketOption(
+ GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 0U);
+
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+
+ ret = producer_socket_->setSocketOption(
ProducerCallbacksOptions::CACHE_MISS,
- (ProducerInterestCallback)bind(&Impl::asyncProcessInterest, this,
- std::placeholders::_1,
- std::placeholders::_2));
+ (ProducerInterestCallback)bind(
+ &ProducerContext::virtualProcessInterest, this,
+ std::placeholders::_1, std::placeholders::_2));
if (ret == SOCKET_OPTION_NOT_SET) {
return ERROR_SETUP;
}
}
- } else {
- ret = producer_socket_->setSocketOption(
- GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 0U);
+ ret = producer_socket_->setSocketOption(
+ ProducerCallbacksOptions::CONTENT_PRODUCED,
+ (transport::interface::ProducerContentCallback)bind(
+ &ProducerContext::onContentProduced, this, std::placeholders::_1,
+ std::placeholders::_2, std::placeholders::_3));
if (ret == SOCKET_OPTION_NOT_SET) {
return ERROR_SETUP;
}
- ret = producer_socket_->setSocketOption(
+ return ERROR_SUCCESS;
+ }
+
+ int run() {
+ getOutputStream() << "started to serve consumers with name " << flow_name_
+ << std::endl;
+ return ERROR_SUCCESS;
+ }
+
+ void stop() {
+ getOutputStream() << "stopped to serve consumers" << std::endl;
+ producer_socket_->stop();
+ }
+
+ private:
+ /**
+ * @brief Produce an existing content object. Set the name as the
+ * interest.
+ */
+ void virtualProcessInterest(ProducerSocket &p, const Interest &interest) {
+ content_objects_[content_objects_index_ & kmask()]->setName(
+ interest.getName());
+ p.produce(*content_objects_[content_objects_index_++ & kmask()]);
+ }
+
+ /**
+ * @brief Create and produce a buffer of configuration_.download_size_
+ * length.
+ */
+ void produceContent(ProducerSocket &p, const Name &content_name,
+ uint32_t suffix) const {
+ uint32_t total;
+
+ auto b = utils::MemBuf::create(configuration_.download_size_);
+ std::memset(b->writableData(), '?', configuration_.download_size_);
+ b->append(configuration_.download_size_);
+
+ utils::SteadyTime::TimePoint t0 = utils::SteadyTime::Clock::now();
+ total = p.produceStream(content_name, std::move(b),
+ !configuration_.multiphase_produce_, suffix);
+ utils::SteadyTime::TimePoint t1 = utils::SteadyTime::Clock::now();
+
+ LoggerInfo() << "Written " << total
+ << " data packets in output buffer (Segmentation time: "
+ << utils::SteadyTime::getDurationUs(t0, t1).count() << " us)"
+ << std::endl;
+ }
+
+ /**
+ * @brief Synchronously produce content upon reception of one interest
+ */
+ void processInterest(ProducerSocket &p, const Interest &interest) const {
+ p.setSocketOption(
ProducerCallbacksOptions::CACHE_MISS,
- (ProducerInterestCallback)bind(&Impl::virtualProcessInterest, this,
- std::placeholders::_1,
- std::placeholders::_2));
+ (ProducerInterestCallback)transport::interface::VOID_HANDLER);
+ p.setSocketOption(GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME,
+ configuration_.content_lifetime_);
- if (ret == SOCKET_OPTION_NOT_SET) {
- return ERROR_SETUP;
+ produceContent(p, interest.getName(), interest.getName().getSuffix());
+ LoggerInfo() << "Received interest " << interest.getName().getSuffix()
+ << std::endl;
+ }
+
+ /**
+ * @brief Async create and produce a buffer of
+ * configuration_.download_size_ length.
+ */
+ void produceContentAsync(ProducerSocket &p, Name content_name,
+ uint32_t suffix) {
+ parent_.produce_thread_.add([this, suffix, content_name, &p]() {
+ auto b = utils::MemBuf::create(configuration_.download_size_);
+ std::memset(b->writableData(), '?', configuration_.download_size_);
+ b->append(configuration_.download_size_);
+
+ last_segment_ =
+ suffix + p.produceStream(content_name, std::move(b),
+ !configuration_.multiphase_produce_,
+ suffix);
+ });
+ }
+
+ /**
+ * @brief Asynchronously produce content upon reception of one interest
+ */
+ void asyncProcessInterest(ProducerSocket &p, const Interest &interest) {
+ p.setSocketOption(ProducerCallbacksOptions::CACHE_MISS,
+ (ProducerInterestCallback)bind(
+ &ProducerContext::cacheMiss, this,
+ std::placeholders::_1, std::placeholders::_2));
+ p.setSocketOption(GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME,
+ configuration_.content_lifetime_);
+ uint32_t suffix = interest.getName().getSuffix();
+
+ if (suffix == 0) {
+ last_segment_ = 0;
+ unsatisfied_interests_.clear();
+ }
+
+ // The suffix will either come from the received interest or will be set
+ // to the smallest suffix of a previous interest not satisfied
+ if (!unsatisfied_interests_.empty()) {
+ auto it = std::lower_bound(unsatisfied_interests_.begin(),
+ unsatisfied_interests_.end(), last_segment_);
+ if (it != unsatisfied_interests_.end()) {
+ suffix = *it;
+ }
+ unsatisfied_interests_.erase(unsatisfied_interests_.begin(), it);
}
+
+ getOutputStream() << " Received interest "
+ << interest.getName().getSuffix()
+ << ", starting production at " << suffix << end_mod_
+ << std::endl;
+ getOutputStream() << unsatisfied_interests_.size()
+ << " interests still unsatisfied" << end_mod_
+ << std::endl;
+ produceContentAsync(p, interest.getName(), suffix);
+ }
+
+ /**
+ * @brief Register cache miss events
+ */
+ void cacheMiss([[maybe_unused]] const ProducerSocket &p,
+ const Interest &interest) {
+ unsatisfied_interests_.push_back(interest.getName().getSuffix());
}
- ret = producer_socket_->setSocketOption(
- ProducerCallbacksOptions::CONTENT_PRODUCED,
- (ProducerContentCallback)bind(
- &Impl::onContentProduced, this, std::placeholders::_1,
- std::placeholders::_2, std::placeholders::_3));
+ /**
+ * @brief When content is produced, set cache miss callback so that we can
+ * register any cache miss happening after the production.
+ */
+ void onContentProduced(ProducerSocket &p,
+ [[maybe_unused]] const std::error_code &err,
+ [[maybe_unused]] uint64_t bytes_written) {
+ p.setSocketOption(ProducerCallbacksOptions::CACHE_MISS,
+ (ProducerInterestCallback)bind(
+ &ProducerContext::asyncProcessInterest, this,
+ std::placeholders::_1, std::placeholders::_2));
+ }
- return ERROR_SUCCESS;
+ /**
+ * @brief Internal producer error. When this callback is triggered
+ * something important happened. Here we stop the program.
+ */
+ void produceError(const std::error_code &err) noexcept override {
+ getOutputStream() << "Error from producer transport: " << err.message()
+ << std::endl;
+ parent_.stop();
+ }
+
+ // Members initialized in constructor
+ std::vector<ContentObject::Ptr> content_objects_;
+
+ // Members initialized by in-class initializer
+ std::vector<uint32_t> unsatisfied_interests_;
+ std::uint32_t last_segment_{0};
+ std::unique_ptr<ProducerSocket> producer_socket_{nullptr};
+ std::uint16_t content_objects_index_{0};
+ std::size_t payload_size_max_{0};
+ };
+
+ public:
+ explicit Impl(const hiperf::ServerConfiguration &conf) : config_(conf) {
+#ifndef _WIN32
+ if (config_.interactive_) {
+ input_.assign(::dup(STDIN_FILENO));
+ }
+#endif
+
+ std::memset(rtc_payload_.data(), 'X', rtc_payload_.size());
+ }
+
+ ~Impl() = default;
+
+ int setup() {
+ int ret = ensureFlows(config_.name_, config_.parallel_flows_);
+ if (ret != ERROR_SUCCESS) {
+ return ret;
+ }
+
+ producer_contexts_.reserve(config_.parallel_flows_);
+ for (uint32_t i = 0; i < config_.parallel_flows_; i++) {
+ auto &ctx = producer_contexts_.emplace_back(*this, i);
+ ret = ctx.setup();
+
+ if (ret) {
+ break;
+ }
+ }
+
+ return ret;
}
void receiveStream() {
socket_.async_receive_from(
- asio::buffer(recv_buffer_.first, recv_buffer_.second), remote_,
- [this](std::error_code ec, std::size_t length) {
+ asio::buffer(recv_buffer_.writableData(), recv_buffer_.capacity()),
+ remote_, [this](const std::error_code &ec, std::size_t length) {
if (ec) return;
- sendRTCContentFromStream(recv_buffer_.first, length);
+ sendRTCContentFromStream(recv_buffer_.writableData(), length);
receiveStream();
});
}
- void sendRTCContentFromStream(uint8_t *buff, std::size_t len) {
- auto payload =
- content_objects_[content_objects_index_++ & mask_]->getPayload();
+ void sendRTCContentFromStream(const uint8_t *buff, std::size_t len) {
// this is used to compute the data packet delay
// Used only for performance evaluation
// It requires clock synchronization between producer and consumer
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::system_clock::now().time_since_epoch())
- .count();
- uint8_t *start = (uint8_t *)payload->writableData();
+ auto now = utils::SystemTime::nowMs().count();
+
+ auto start = rtc_payload_.data();
std::memcpy(start, &now, sizeof(uint64_t));
std::memcpy(start + sizeof(uint64_t), buff, len);
- producer_socket_->produceDatagram(flow_name_, start,
- len + sizeof(uint64_t));
+
+ for (const auto &producer_context : producer_contexts_) {
+ producer_context.produceDatagram(start, len + sizeof(uint64_t));
+ }
}
- void sendRTCContentObjectCallback(std::error_code ec) {
+ void sendRTCContentObjectCallback(const std::error_code &ec) {
if (ec) return;
rtc_timer_.expires_from_now(
- configuration_.production_rate_.getMicrosecondsForPacket(
- configuration_.payload_size_));
+ config_.production_rate_.getMicrosecondsForPacket(
+ config_.payload_size_));
rtc_timer_.async_wait(std::bind(&Impl::sendRTCContentObjectCallback, this,
std::placeholders::_1));
- auto payload =
- content_objects_[content_objects_index_++ & mask_]->getPayload();
+
+ auto start = rtc_payload_.data();
// this is used to compute the data packet delay
// Used only for performance evaluation
// It requires clock synchronization between producer and consumer
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::system_clock::now().time_since_epoch())
- .count();
-
- std::memcpy(payload->writableData(), &now, sizeof(uint64_t));
+ auto now = utils::SystemTime::nowMs().count();
+ std::memcpy(start, &now, sizeof(uint64_t));
- producer_socket_->produceDatagram(
- flow_name_, payload->data(),
- payload->length() < 1400 ? payload->length() : 1400);
+ for (const auto &producer_context : producer_contexts_) {
+ producer_context.produceDatagram(start, config_.payload_size_);
+ }
}
- void sendRTCContentObjectCallbackWithTrace(std::error_code ec) {
+ void sendRTCContentObjectCallbackWithTrace(const std::error_code &ec) {
if (ec) return;
- auto payload =
- content_objects_[content_objects_index_++ & mask_]->getPayload();
-
- uint32_t packet_len =
- configuration_.trace_[configuration_.trace_index_].size;
+ std::size_t packet_len = config_.trace_[config_.trace_index_].size;
// this is used to compute the data packet delay
// used only for performance evaluation
// it requires clock synchronization between producer and consumer
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::system_clock::now().time_since_epoch())
- .count();
-
- std::memcpy(payload->writableData(), &now, sizeof(uint64_t));
+ auto now = utils::SystemTime::nowMs().count();
+ auto start = rtc_payload_.data();
+ std::memcpy(start, &now, sizeof(uint64_t));
- if (packet_len > payload->length()) packet_len = payload->length();
- if (packet_len > 1400) packet_len = 1400;
+ if (packet_len > config_.payload_size_) {
+ packet_len = config_.payload_size_;
+ }
- producer_socket_->produceDatagram(flow_name_, payload->data(), packet_len);
+ for (const auto &producer_context : producer_contexts_) {
+ producer_context.produceDatagram(start, packet_len);
+ }
- uint32_t next_index = configuration_.trace_index_ + 1;
+ uint32_t next_index = config_.trace_index_ + 1;
uint64_t schedule_next;
- if (next_index < configuration_.trace_.size()) {
- schedule_next =
- configuration_.trace_[next_index].timestamp -
- configuration_.trace_[configuration_.trace_index_].timestamp;
+ if (next_index < config_.trace_.size()) {
+ schedule_next = config_.trace_[next_index].timestamp -
+ config_.trace_[config_.trace_index_].timestamp;
} else {
// here we need to loop, schedule in a random time
schedule_next = 1000;
}
- configuration_.trace_index_ =
- (configuration_.trace_index_ + 1) % configuration_.trace_.size();
+ config_.trace_index_ = (config_.trace_index_ + 1) % config_.trace_.size();
rtc_timer_.expires_from_now(std::chrono::microseconds(schedule_next));
rtc_timer_.async_wait(
std::bind(&Impl::sendRTCContentObjectCallbackWithTrace, this,
std::placeholders::_1));
}
+ int parseTraceFile() {
+ std::ifstream trace(config_.trace_file_);
+ if (trace.fail()) {
+ return -1;
+ }
+ std::string line;
+ while (std::getline(trace, line)) {
+ std::istringstream iss(line);
+ hiperf::packet_t packet;
+ iss >> packet.timestamp >> packet.size;
+ config_.trace_.push_back(packet);
+ }
+ return 0;
+ }
+
#ifndef _WIN32
void handleInput(const std::error_code &error, std::size_t length) {
if (error) {
- producer_socket_->stop();
- io_service_.stop();
+ stop();
}
if (rtc_running_) {
- std::cout << "stop real time content production" << std::endl;
+ LoggerInfo() << "stop real time content production" << std::endl;
rtc_running_ = false;
rtc_timer_.cancel();
} else {
- std::cout << "start real time content production" << std::endl;
+ LoggerInfo() << "start real time content production" << std::endl;
rtc_running_ = true;
rtc_timer_.expires_from_now(
- configuration_.production_rate_.getMicrosecondsForPacket(
- configuration_.payload_size_));
+ config_.production_rate_.getMicrosecondsForPacket(
+ config_.payload_size_));
rtc_timer_.async_wait(std::bind(&Impl::sendRTCContentObjectCallback, this,
std::placeholders::_1));
}
@@ -401,46 +586,33 @@ class HIperfServer::Impl {
}
#endif
- int parseTraceFile() {
- std::ifstream trace(configuration_.trace_file_);
- if (trace.fail()) {
- return -1;
- }
- std::string line;
- while (std::getline(trace, line)) {
- std::istringstream iss(line);
- hiperf::packet_t packet;
- iss >> packet.timestamp >> packet.size;
- configuration_.trace_.push_back(packet);
+ void stop() {
+ for (auto &producer_context : producer_contexts_) {
+ producer_context.stop();
}
- return 0;
+
+ io_service_.stop();
}
int run() {
- std::cerr << "Starting to serve consumers" << std::endl;
-
signals_.add(SIGINT);
- signals_.async_wait([this](const std::error_code &, const int &) {
- std::cout << "STOPPING!!" << std::endl;
- producer_socket_->stop();
- io_service_.stop();
- });
+ signals_.async_wait(
+ [this](const std::error_code &, const int &) { stop(); });
- if (configuration_.rtc_) {
-#ifndef _WIN32
- if (configuration_.interactive_) {
+ if (config_.rtc_) {
+ if (config_.interactive_) {
asio::async_read_until(
input_, input_buffer_, '\n',
std::bind(&Impl::handleInput, this, std::placeholders::_1,
std::placeholders::_2));
- } else if (configuration_.trace_based_) {
- std::cout << "trace-based mode enabled" << std::endl;
- if (configuration_.trace_file_ == nullptr) {
- std::cout << "cannot find the trace file" << std::endl;
+ } else if (config_.trace_based_) {
+ LoggerInfo() << "trace-based mode enabled" << std::endl;
+ if (config_.trace_file_ == nullptr) {
+ LoggerErr() << "cannot find the trace file" << std::endl;
return ERROR_SETUP;
}
if (parseTraceFile() < 0) {
- std::cout << "cannot parse the trace file" << std::endl;
+ LoggerErr() << "cannot parse the trace file" << std::endl;
return ERROR_SETUP;
}
rtc_running_ = true;
@@ -448,31 +620,26 @@ class HIperfServer::Impl {
rtc_timer_.async_wait(
std::bind(&Impl::sendRTCContentObjectCallbackWithTrace, this,
std::placeholders::_1));
- } else if (configuration_.input_stream_mode_) {
+ } else if (config_.input_stream_mode_) {
rtc_running_ = true;
- // crate socket
+ // create socket
remote_ = asio::ip::udp::endpoint(
- asio::ip::address::from_string("127.0.0.1"), configuration_.port_);
+ asio::ip::address::from_string("127.0.0.1"), config_.port_);
socket_.open(asio::ip::udp::v4());
socket_.bind(remote_);
- recv_buffer_.first = (uint8_t *)malloc(1500);
- recv_buffer_.second = 1500;
receiveStream();
} else {
rtc_running_ = true;
rtc_timer_.expires_from_now(
- configuration_.production_rate_.getMicrosecondsForPacket(
- configuration_.payload_size_));
+ config_.production_rate_.getMicrosecondsForPacket(
+ config_.payload_size_));
rtc_timer_.async_wait(std::bind(&Impl::sendRTCContentObjectCallback,
this, std::placeholders::_1));
}
-#else
- rtc_timer_.expires_from_now(
- configuration_.production_rate_.getMicrosecondsForPacket(
- configuration_.payload_size_));
- rtc_timer_.async_wait(std::bind(&Impl::sendRTCContentObjectCallback, this,
- std::placeholders::_1));
-#endif
+ }
+
+ for (auto &producer_context : producer_contexts_) {
+ producer_context.run();
}
io_service_.run();
@@ -480,34 +647,31 @@ class HIperfServer::Impl {
return ERROR_SUCCESS;
}
+ ServerConfiguration &getConfig() { return config_; }
+
private:
- hiperf::ServerConfiguration configuration_;
+ // Variables initialized by the constructor.
+ ServerConfiguration config_;
+
+ // Variable initialized in the in-class initializer list.
asio::io_service io_service_;
- asio::signal_set signals_;
- asio::steady_timer rtc_timer_;
- std::vector<uint32_t> unsatisfied_interests_;
- std::vector<std::shared_ptr<ContentObject>> content_objects_;
- std::uint16_t content_objects_index_;
- std::uint16_t mask_;
- std::uint32_t last_segment_;
- std::uint32_t *ptr_last_segment_;
- std::unique_ptr<ProducerSocket> producer_socket_;
-#ifndef _WIN32
- asio::posix::stream_descriptor input_;
+ asio::signal_set signals_{io_service_};
+ asio::steady_timer rtc_timer_{io_service_};
+ asio::posix::stream_descriptor input_{io_service_};
+ asio::ip::udp::socket socket_{io_service_};
+ std::vector<ProducerContext> producer_contexts_;
+ ::utils::EventThread produce_thread_;
asio::streambuf input_buffer_;
- bool rtc_running_;
- Name flow_name_;
- asio::ip::udp::socket socket_;
+ bool rtc_running_{false};
asio::ip::udp::endpoint remote_;
- std::pair<uint8_t *, std::size_t> recv_buffer_;
-#endif
+ utils::MemBuf recv_buffer_{utils::MemBuf::CREATE, HIPERF_MTU};
+ std::array<uint8_t, HIPERF_MTU> rtc_payload_;
};
-HIperfServer::HIperfServer(const ServerConfiguration &conf) {
- impl_ = new Impl(conf);
-}
+HIperfServer::HIperfServer(const ServerConfiguration &conf)
+ : impl_(std::make_unique<Impl>(conf)) {}
-HIperfServer::~HIperfServer() { delete impl_; }
+HIperfServer::~HIperfServer() = default;
int HIperfServer::setup() { return impl_->setup(); }
diff --git a/apps/hiperf/src/server.h b/apps/hiperf/src/server.h
index 05407a807..d7420da48 100644
--- a/apps/hiperf/src/server.h
+++ b/apps/hiperf/src/server.h
@@ -21,14 +21,15 @@ namespace hiperf {
class HIperfServer {
public:
- HIperfServer(const ServerConfiguration &conf);
+ explicit HIperfServer(const ServerConfiguration &conf);
+
~HIperfServer();
int setup();
void run();
private:
class Impl;
- Impl *impl_;
+ std::unique_ptr<Impl> impl_;
};
} // namespace hiperf \ No newline at end of file