diff options
Diffstat (limited to 'apps/hiperf')
-rw-r--r-- | apps/hiperf/CMakeLists.txt | 45 | ||||
-rw-r--r-- | apps/hiperf/src/client.cc | 1448 | ||||
-rw-r--r-- | apps/hiperf/src/client.h | 14 | ||||
-rw-r--r-- | apps/hiperf/src/common.h | 315 | ||||
-rw-r--r-- | apps/hiperf/src/forwarder_config.h | 97 | ||||
-rw-r--r-- | apps/hiperf/src/forwarder_interface.cc | 676 | ||||
-rw-r--r-- | apps/hiperf/src/forwarder_interface.h | 131 | ||||
-rw-r--r-- | apps/hiperf/src/main.cc | 435 | ||||
-rw-r--r-- | apps/hiperf/src/server.cc | 802 | ||||
-rw-r--r-- | apps/hiperf/src/server.h | 5 |
10 files changed, 1711 insertions, 2257 deletions
diff --git a/apps/hiperf/CMakeLists.txt b/apps/hiperf/CMakeLists.txt index 564525e67..5a0dc3c06 100644 --- a/apps/hiperf/CMakeLists.txt +++ b/apps/hiperf/CMakeLists.txt @@ -1,4 +1,4 @@ -# Copyright (c) 2017-2019 Cisco and/or its affiliates. +# Copyright (c) 2021-2022 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: @@ -12,33 +12,50 @@ # limitations under the License. if (NOT DISABLE_EXECUTABLES) +############################################################## +# Source files +############################################################## list(APPEND HIPERF_SRC ${CMAKE_CURRENT_SOURCE_DIR}/src/client.cc ${CMAKE_CURRENT_SOURCE_DIR}/src/main.cc ${CMAKE_CURRENT_SOURCE_DIR}/src/server.cc - ${CMAKE_CURRENT_SOURCE_DIR}/src/forwarder_interface.cc ) + +############################################################## +# Libraries +############################################################## list (APPEND HIPERF_LIBRARIES - ${LIBTRANSPORT_LIBRARIES} - ${LIBHICNCTRL_LIBRARIES} - ${LIBHICN_LIBRARIES} - ${CMAKE_THREAD_LIBS_INIT} - ${LIBCONFIG_CPP_LIBRARIES} - ${WSOCK32_LIBRARY} - ${WS2_32_LIBRARY} + PRIVATE ${LIBTRANSPORT_LIBRARIES} + PRIVATE ${LIBHICNCTRL_LIBRARIES} + PRIVATE ${LIBHICN_LIBRARIES} + PRIVATE ${CMAKE_THREAD_LIBS_INIT} + PRIVATE ${LIBCONFIG_CPP_LIBRARIES} + PRIVATE ${WSOCK32_LIBRARY} + PRIVATE ${WS2_32_LIBRARY} + ) + +############################################################## +# Compiler options +############################################################## + set(COMPILER_OPTIONS + ${DEFAULT_COMPILER_OPTIONS} ) + +############################################################## +# Build hiperf +############################################################## build_executable(hiperf SOURCES ${HIPERF_SRC} LINK_LIBRARIES ${HIPERF_LIBRARIES} INCLUDE_DIRS - ${CMAKE_CURRENT_SOURCE_DIR}/src - ${LIBTRANSPORT_INCLUDE_DIRS} - ${LIBHICNCTRL_INCLUDE_DIRS} - ${LIBCONFIG_CPP_INCLUDE_DIRS} - DEPENDS ${DEPENDENCIES} + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/src + PRIVATE ${LIBCONFIG_CPP_INCLUDE_DIRS} + PRIVATE ${THIRD_PARTY_INCLUDE_DIRS} ${COMMON_INCLUDE_DIRS} + DEPENDS ${DEPENDENCIES} ${THIRD_PARTY_DEPENDENCIES} COMPONENT ${HICN_APPS} LINK_FLAGS ${LINK_FLAGS} + COMPILE_OPTIONS ${COMPILER_OPTIONS} ) endif()
\ No newline at end of file diff --git a/apps/hiperf/src/client.cc b/apps/hiperf/src/client.cc index 820ebf0ce..1ce5b4c55 100644 --- a/apps/hiperf/src/client.cc +++ b/apps/hiperf/src/client.cc @@ -14,8 +14,7 @@ */ #include <client.h> -#include <forwarder_config.h> -#include <forwarder_interface.h> +#include <hicn/transport/portability/endianess.h> #include <libconfig.h++> @@ -27,874 +26,857 @@ namespace hiperf { class RTCCallback; class Callback; +using transport::auth::CryptoHashType; +using transport::core::Packet; +using transport::core::Prefix; +using transport::interface::ConsumerCallbacksOptions; +using transport::interface::ConsumerSocket; +using transport::interface::GeneralTransportOptions; +using transport::interface::ProducerSocket; +using transport::interface::ProductionProtocolAlgorithms; +using transport::interface::RaaqmTransportOptions; +using transport::interface::RtcTransportOptions; +using transport::interface::RtcTransportRecoveryStrategies; +using transport::interface::StrategyCallback; +using transport::interface::TransportStatistics; + /** * Hiperf client class: configure and setup an hicn consumer following the * ClientConfiguration. */ -class HIperfClient::Impl -#ifdef FORWARDER_INTERFACE - : ForwarderInterface::ICallback -#endif -{ - typedef std::chrono::time_point<std::chrono::steady_clock> Time; - typedef std::chrono::microseconds TimeDuration; - +class HIperfClient::Impl { friend class Callback; friend class RTCCallback; - struct nack_packet_t { - uint64_t timestamp; - uint32_t prod_rate; - uint32_t prod_seg; - - inline uint64_t getTimestamp() const { return _ntohll(×tamp); } - inline void setTimestamp(uint64_t time) { timestamp = _htonll(&time); } - - inline uint32_t getProductionRate() const { return ntohl(prod_rate); } - inline void setProductionRate(uint32_t rate) { prod_rate = htonl(rate); } - - inline uint32_t getProductionSegement() const { return ntohl(prod_seg); } - inline void setProductionSegement(uint32_t seg) { prod_seg = htonl(seg); } - }; - - public: - Impl(const hiperf::ClientConfiguration &conf) - : configuration_(conf), - total_duration_milliseconds_(0), - old_bytes_value_(0), - old_interest_tx_value_(0), - old_fec_interest_tx_value_(0), - old_fec_data_rx_value_(0), - old_lost_data_value_(0), - old_bytes_recovered_value_(0), - old_definitely_lost_data_value_(0), - old_retx_value_(0), - old_sent_int_value_(0), - old_received_nacks_value_(0), - old_fec_pkt_(0), - avg_data_delay_(0), - delay_sample_(0), - received_bytes_(0), - received_data_pkt_(0), - data_delays_(""), - signals_(io_service_), - rtc_callback_(*this), - callback_(*this), - socket_(io_service_), - done_(false), - switch_threshold_(~0) -#ifdef FORWARDER_INTERFACE - , - forwarder_interface_(io_service_, this) -#endif - { + static inline constexpr uint16_t klog2_header_counter() { return 4; } + static inline constexpr uint16_t kheader_counter_mask() { + return (1 << klog2_header_counter()) - 1; } - ~Impl() {} + class ConsumerContext + : public Base<ConsumerContext, ClientConfiguration, Impl>, + private ConsumerSocket::ReadCallback { + static inline const std::size_t kmtu = HIPERF_MTU; - void checkReceivedRtcContent(ConsumerSocket &c, - const ContentObject &contentObject) {} + public: + using ConfType = ClientConfiguration; + using ParentType = typename HIperfClient::Impl; + static inline auto getContextType() -> std::string { + return "ConsumerContext"; + } + + ConsumerContext(Impl &client, int consumer_identifier) + : Base(client, client.io_service_, consumer_identifier), + receive_buffer_( + utils::MemBuf::create(client.config_.receive_buffer_size_)), + socket_(client.io_service_), + payload_size_max_(PayloadSize(client.config_.packet_format_) + .getPayloadSizeMax(RTC_HEADER_SIZE)), + nb_iterations_(client.config_.nb_iterations_) {} + + ConsumerContext(ConsumerContext &&other) noexcept + : Base(std::move(other)), + receive_buffer_(std::move(other.receive_buffer_)), + socket_(std::move(other.socket_)), + payload_size_max_(other.payload_size_max_), + remote_(std::move(other.remote_)), + nb_iterations_(other.nb_iterations_), + saved_stats_(std::move(other.saved_stats_)), + header_counter_(other.header_counter_), + first_(other.first_), + consumer_socket_(std::move(other.consumer_socket_)), + producer_socket_(std::move(other.producer_socket_)) {} + + ~ConsumerContext() override = default; + + /*************************************************************** + * ConsumerSocket::ReadCallback implementation + ***************************************************************/ - void processLeavingInterest(ConsumerSocket &c, const Interest &interest) {} + bool isBufferMovable() noexcept override { return false; } - void addFace(const std::string &local_address, uint16_t local_port, - const std::string &remote_address, uint16_t remote_port, - std::string interface); + void getReadBuffer(uint8_t **application_buffer, + size_t *max_length) override { + *application_buffer = receive_buffer_->writableData(); - void handleTimerExpiration(ConsumerSocket &c, - const TransportStatistics &stats) { - const char separator = ' '; - const int width = 15; + if (configuration_.rtc_) { + *max_length = kmtu; + } else { + *max_length = configuration_.receive_buffer_size_; + } + } - utils::TimePoint t2 = utils::SteadyClock::now(); - auto exact_duration = - std::chrono::duration_cast<utils::Milliseconds>(t2 - t_stats_); + void readBufferAvailable( + std::unique_ptr<utils::MemBuf> &&buffer) noexcept override { + // Nothing to do here + auto ret = std::move(buffer); + } - std::stringstream interval; - interval << total_duration_milliseconds_ / 1000 << "-" - << total_duration_milliseconds_ / 1000 + - exact_duration.count() / 1000; + void readDataAvailable(std::size_t length) noexcept override { + if (configuration_.rtc_) { + saved_stats_.received_bytes_ += length; + saved_stats_.received_data_pkt_++; - std::stringstream bytes_transferred; - bytes_transferred << std::fixed << std::setprecision(3) - << (stats.getBytesRecv() - old_bytes_value_) / 1000000.0 - << std::setfill(separator) << "[MB]"; + // collecting delay stats. Just for performance testing + auto senderTimeStamp = + *reinterpret_cast<uint64_t *>(receive_buffer_->writableData()); - std::stringstream bandwidth; - bandwidth << ((stats.getBytesRecv() - old_bytes_value_) * 8) / - (exact_duration.count()) / 1000.0 - << std::setfill(separator) << "[Mbps]"; + auto now = utils::SystemTime::nowMs().count(); + auto new_delay = double(now - senderTimeStamp); - std::stringstream window; - window << stats.getAverageWindowSize() << std::setfill(separator) - << "[Int]"; + if (senderTimeStamp > now) + new_delay = -1 * double(senderTimeStamp - now); - std::stringstream avg_rtt; - avg_rtt << stats.getAverageRtt() << std::setfill(separator) << "[ms]"; + saved_stats_.delay_sample_++; + saved_stats_.avg_data_delay_ = + saved_stats_.avg_data_delay_ + + (double(new_delay) - saved_stats_.avg_data_delay_) / + saved_stats_.delay_sample_; - if (configuration_.rtc_) { - // we get rtc stats more often, thus we need ms in the interval - std::stringstream interval_ms; - interval_ms << total_duration_milliseconds_ << "-" - << total_duration_milliseconds_ + exact_duration.count(); - - std::stringstream lost_data; - lost_data << stats.getLostData() - old_lost_data_value_ - << std::setfill(separator) << "[pkt]"; - - std::stringstream bytes_recovered_data; - bytes_recovered_data << stats.getBytesRecoveredData() - - old_bytes_recovered_value_ - << std::setfill(separator) << "[pkt]"; - - std::stringstream definitely_lost_data; - definitely_lost_data << stats.getDefinitelyLostData() - - old_definitely_lost_data_value_ - << std::setfill(separator) << "[pkt]"; - - std::stringstream data_delay; - data_delay << avg_data_delay_ << std::setfill(separator) << "[ms]"; - - std::stringstream received_data_pkt; - received_data_pkt << received_data_pkt_ << std::setfill(separator) - << "[pkt]"; - - std::stringstream goodput; - goodput << (received_bytes_ * 8.0) / (exact_duration.count()) / 1000.0 - << std::setfill(separator) << "[Mbps]"; - - std::stringstream loss_rate; - loss_rate << std::fixed << std::setprecision(2) - << stats.getLossRatio() * 100.0 << std::setfill(separator) - << "[%]"; - - std::stringstream retx_sent; - retx_sent << stats.getRetxCount() - old_retx_value_ - << std::setfill(separator) << "[pkt]"; - - std::stringstream interest_sent; - interest_sent << stats.getInterestTx() - old_sent_int_value_ - << std::setfill(separator) << "[pkt]"; - - std::stringstream nacks; - nacks << stats.getReceivedNacks() - old_received_nacks_value_ - << std::setfill(separator) << "[pkt]"; - - std::stringstream fec_pkt; - fec_pkt << stats.getReceivedFEC() - old_fec_pkt_ - << std::setfill(separator) << "[pkt]"; - - std::stringstream queuing_delay; - queuing_delay << stats.getQueuingDelay() << std::setfill(separator) - << "[ms]"; - -#ifdef FORWARDER_INTERFACE - if (!done_ && stats.getQueuingDelay() >= switch_threshold_ && - total_duration_milliseconds_ > 1000) { - std::cout << "Switching due to queuing delay" << std::endl; - forwarder_interface_.createFaceAndRoutes(backup_routes_); - forwarder_interface_.deleteFaceAndRoutes(main_routes_); - std::swap(backup_routes_, main_routes_); - done_ = true; - } -#endif - - // statistics not yet available in the transport - // std::stringstream interest_fec_tx; - // interest_fec_tx << stats.getInterestFecTxCount() - - // old_fec_interest_tx_value_ << std::setfill(separator) << "[pkt]"; - // std::stringstream bytes_fec_recv; - // bytes_fec_recv << stats.getBytesFecRecv() - old_fec_data_rx_value_ - // << std::setfill(separator) << "[bytes]"; - std::cout << std::left << std::setw(width) << "Interval"; - std::cout << std::left << std::setw(width) << "RecvData"; - std::cout << std::left << std::setw(width) << "Bandwidth"; - std::cout << std::left << std::setw(width) << "Goodput"; - std::cout << std::left << std::setw(width) << "LossRate"; - std::cout << std::left << std::setw(width) << "Retr"; - std::cout << std::left << std::setw(width) << "InterestSent"; - std::cout << std::left << std::setw(width) << "ReceivedNacks"; - std::cout << std::left << std::setw(width) << "SyncWnd"; - std::cout << std::left << std::setw(width) << "MinRtt"; - std::cout << std::left << std::setw(width) << "QueuingDelay"; - std::cout << std::left << std::setw(width) << "LostData"; - std::cout << std::left << std::setw(width) << "RecoveredData"; - std::cout << std::left << std::setw(width) << "DefinitelyLost"; - std::cout << std::left << std::setw(width) << "State"; - std::cout << std::left << std::setw(width) << "DataDelay"; - std::cout << std::left << std::setw(width) << "FecPkt" << std::endl; - - std::cout << std::left << std::setw(width) << interval_ms.str(); - std::cout << std::left << std::setw(width) << received_data_pkt.str(); - std::cout << std::left << std::setw(width) << bandwidth.str(); - std::cout << std::left << std::setw(width) << goodput.str(); - std::cout << std::left << std::setw(width) << loss_rate.str(); - std::cout << std::left << std::setw(width) << retx_sent.str(); - std::cout << std::left << std::setw(width) << interest_sent.str(); - std::cout << std::left << std::setw(width) << nacks.str(); - std::cout << std::left << std::setw(width) << window.str(); - std::cout << std::left << std::setw(width) << avg_rtt.str(); - std::cout << std::left << std::setw(width) << queuing_delay.str(); - std::cout << std::left << std::setw(width) << lost_data.str(); - std::cout << std::left << std::setw(width) << bytes_recovered_data.str(); - std::cout << std::left << std::setw(width) << definitely_lost_data.str(); - std::cout << std::left << std::setw(width) << stats.getCCStatus(); - std::cout << std::left << std::setw(width) << data_delay.str(); - std::cout << std::left << std::setw(width) << fec_pkt.str(); - std::cout << std::endl; - - if (configuration_.test_mode_) { - if (data_delays_.size() > 0) data_delays_.pop_back(); - - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::system_clock::now().time_since_epoch()) - .count(); - std::cout << now << " DATA-DELAYS:[" << data_delays_ << "]" - << std::endl; - } - - // statistics not yet available in the transport - // std::cout << std::left << std::setw(width) << interest_fec_tx.str(); - // std::cout << std::left << std::setw(width) << bytes_fec_recv.str(); - } else { - std::cout << std::left << std::setw(width) << "Interval"; - std::cout << std::left << std::setw(width) << "Transfer"; - std::cout << std::left << std::setw(width) << "Bandwidth"; - std::cout << std::left << std::setw(width) << "Retr"; - std::cout << std::left << std::setw(width) << "Cwnd"; - std::cout << std::left << std::setw(width) << "AvgRtt" << std::endl; - - std::cout << std::left << std::setw(width) << interval.str(); - std::cout << std::left << std::setw(width) << bytes_transferred.str(); - std::cout << std::left << std::setw(width) << bandwidth.str(); - std::cout << std::left << std::setw(width) << stats.getRetxCount(); - std::cout << std::left << std::setw(width) << window.str(); - std::cout << std::left << std::setw(width) << avg_rtt.str() << std::endl; - std::cout << std::endl; - } - total_duration_milliseconds_ += (uint32_t)exact_duration.count(); - old_bytes_value_ = stats.getBytesRecv(); - old_lost_data_value_ = stats.getLostData(); - old_bytes_recovered_value_ = stats.getBytesRecoveredData(); - old_definitely_lost_data_value_ = stats.getDefinitelyLostData(); - old_fec_interest_tx_value_ = stats.getInterestFecTxCount(); - old_fec_data_rx_value_ = stats.getBytesFecRecv(); - old_retx_value_ = stats.getRetxCount(); - old_sent_int_value_ = stats.getInterestTx(); - old_received_nacks_value_ = stats.getReceivedNacks(); - old_fec_pkt_ = stats.getReceivedFEC(); - delay_sample_ = 0; - avg_data_delay_ = 0; - received_bytes_ = 0; - received_data_pkt_ = 0; - data_delays_ = ""; - - t_stats_ = utils::SteadyClock::now(); - } + if (configuration_.test_mode_) { + saved_stats_.data_delays_ += std::to_string(int(new_delay)); + saved_stats_.data_delays_ += ","; + } - bool parseConfig(const char *conf_file) { - using namespace libconfig; - Config cfg; - - try { - cfg.readFile(conf_file); - } catch (const FileIOException &fioex) { - std::cerr << "I/O error while reading file." << std::endl; - return false; - } catch (const ParseException &pex) { - std::cerr << "Parse error at " << pex.getFile() << ":" << pex.getLine() - << " - " << pex.getError() << std::endl; - return false; + if (configuration_.relay_ && configuration_.parallel_flows_ == 1) { + producer_socket_->produceDatagram( + configuration_.relay_name_.makeName(), + receive_buffer_->writableData(), + length < payload_size_max_ ? length : payload_size_max_); + } + if (configuration_.output_stream_mode_ && + configuration_.parallel_flows_ == 1) { + const uint8_t *start = receive_buffer_->writableData(); + start += sizeof(uint64_t); + std::size_t pkt_len = length - sizeof(uint64_t); + socket_.send_to(asio::buffer(start, pkt_len), remote_); + } + } } - Setting &config = cfg.getRoot(); + size_t maxBufferSize() const override { + return configuration_.rtc_ ? kmtu : configuration_.receive_buffer_size_; + } - if (config.exists("switch_threshold")) { - unsigned threshold; - config.lookupValue("switch_threshold", threshold); - switch_threshold_ = threshold; + void readError(const std::error_code &ec) noexcept override { + getOutputStream() << "Error " << ec.message() + << " while reading from socket" << std::endl; + parent_.io_service_.stop(); } - // listeners - if (config.exists("listeners")) { - // get path where looking for modules - const Setting &listeners = config.lookup("listeners"); - auto count = listeners.getLength(); + void readSuccess(std::size_t total_size) noexcept override { + if (configuration_.rtc_) { + getOutputStream() << "Data successfully read" << std::endl; + } else { + auto t2 = utils::SteadyTime::now(); + auto dt = + utils::SteadyTime::getDurationUs(saved_stats_.t_download_, t2); + auto usec = dt.count(); - for (int i = 0; i < count; i++) { - const Setting &listener = listeners[i]; - ListenerConfig list; - unsigned port; - std::string interface; + getOutputStream() << "Content retrieved. Size: " << total_size + << " [Bytes]" << std::endl; - list.name = listener.getName(); - listener.lookupValue("local_address", list.address); - listener.lookupValue("local_port", port); - listener.lookupValue("interface", list.interface); - list.port = (uint16_t)(port); + getOutputStream() << "Elapsed Time: " << usec / 1000000.0 + << " seconds -- " + << double(total_size * 8) * 1.0 / double(usec) * 1.0 + << " [Mbps]" << std::endl; - std::cout << "Adding listener " << list.name << ", ( " << list.address - << ":" << list.port << ")" << std::endl; - config_.addListener(std::move(list)); + parent_.io_service_.stop(); } } - // connectors - if (config.exists("connectors")) { - // get path where looking for modules - const Setting &connectors = config.lookup("connectors"); - auto count = connectors.getLength(); - - for (int i = 0; i < count; i++) { - const Setting &connector = connectors[i]; - ConnectorConfig conn; - - conn.name = connector.getName(); - unsigned port = 0; - - if (!connector.lookupValue("local_address", conn.local_address)) { - conn.local_address = ""; - } + /*************************************************************** + * End of ConsumerSocket::ReadCallback implementation + ***************************************************************/ - if (!connector.lookupValue("local_port", port)) { - port = 0; - } - - conn.local_port = (uint16_t)(port); + private: + struct SavedStatistics { + utils::SteadyTime::TimePoint t_stats_{}; + utils::SteadyTime::TimePoint t_download_{}; + uint32_t total_duration_milliseconds_{0}; + uint64_t old_bytes_value_{0}; + uint64_t old_interest_tx_value_{0}; + uint64_t old_fec_interest_tx_value_{0}; + uint64_t old_fec_data_rx_value_{0}; + uint64_t old_lost_data_value_{0}; + uint64_t old_bytes_recovered_value_{0}; + uint64_t old_definitely_lost_data_value_{0}; + uint64_t old_retx_value_{0}; + uint64_t old_sent_int_value_{0}; + uint64_t old_received_nacks_value_{0}; + uint32_t old_fec_pkt_{0}; + // IMPORTANT: to be used only for performance testing, when consumer and + // producer are synchronized. Used for rtc only at the moment + double avg_data_delay_{0}; + uint32_t delay_sample_{0}; + uint32_t received_bytes_{0}; + uint32_t received_data_pkt_{0}; + uint32_t auth_alerts_{0}; + std::string data_delays_{""}; + }; + + /*************************************************************** + * Transport callbacks + ***************************************************************/ + + void checkReceivedRtcContent( + [[maybe_unused]] const ConsumerSocket &c, + [[maybe_unused]] const transport::core::ContentObject &content_object) + const { + // Nothing to do here + } + + void processLeavingInterest( + const ConsumerSocket & /*c*/, + const transport::core::Interest & /*interest*/) const { + // Nothing to do here + } + + transport::auth::VerificationPolicy onAuthFailed( + transport::auth::Suffix /*suffix*/, + transport::auth::VerificationPolicy /*policy*/) { + saved_stats_.auth_alerts_++; + return transport::auth::VerificationPolicy::ACCEPT; + } + + void handleTimerExpiration([[maybe_unused]] const ConsumerSocket &c, + const TransportStatistics &stats) { + const char separator = ' '; + const int width = 18; + + utils::SteadyTime::TimePoint t2 = utils::SteadyTime::Clock::now(); + auto exact_duration = + utils::SteadyTime::getDurationMs(saved_stats_.t_stats_, t2); - if (!connector.lookupValue("remote_address", conn.remote_address)) { - std::cerr - << "Error in configuration file: remote_address is a mandatory " - "field of Connectors." - << std::endl; - return false; + std::stringstream interval_ms; + interval_ms << saved_stats_.total_duration_milliseconds_ << "-" + << saved_stats_.total_duration_milliseconds_ + + exact_duration.count(); + + std::stringstream bytes_transferred; + bytes_transferred << std::fixed << std::setprecision(3) + << double(stats.getBytesRecv() - + saved_stats_.old_bytes_value_) / + 1000000.0 + << std::setfill(separator); + + std::stringstream bandwidth; + bandwidth << (double(stats.getBytesRecv() - + saved_stats_.old_bytes_value_) * + 8) / + (exact_duration.count()) / 1000.0 + << std::setfill(separator); + + std::stringstream window; + window << stats.getAverageWindowSize() << std::setfill(separator); + + std::stringstream avg_rtt; + avg_rtt << std::setprecision(3) << std::fixed << stats.getAverageRtt() + << std::setfill(separator); + + if (configuration_.rtc_) { + std::stringstream lost_data; + lost_data << stats.getLostData() - saved_stats_.old_lost_data_value_ + << std::setfill(separator); + + std::stringstream bytes_recovered_data; + bytes_recovered_data << stats.getBytesRecoveredData() - + saved_stats_.old_bytes_recovered_value_ + << std::setfill(separator); + + std::stringstream definitely_lost_data; + definitely_lost_data << stats.getDefinitelyLostData() - + saved_stats_.old_definitely_lost_data_value_ + << std::setfill(separator); + + std::stringstream data_delay; + data_delay << std::fixed << std::setprecision(3) + << saved_stats_.avg_data_delay_ << std::setfill(separator); + + std::stringstream received_data_pkt; + received_data_pkt << saved_stats_.received_data_pkt_ + << std::setfill(separator); + + std::stringstream goodput; + goodput << std::fixed << std::setprecision(3) + << (saved_stats_.received_bytes_ * 8.0) / + (exact_duration.count()) / 1000.0 + << std::setfill(separator); + + std::stringstream loss_rate; + loss_rate << std::fixed << std::setprecision(2) + << stats.getLossRatio() * 100.0 << std::setfill(separator); + + std::stringstream retx_sent; + retx_sent << stats.getRetxCount() - saved_stats_.old_retx_value_ + << std::setfill(separator); + + std::stringstream interest_sent; + interest_sent << stats.getInterestTx() - + saved_stats_.old_sent_int_value_ + << std::setfill(separator); + + std::stringstream nacks; + nacks << stats.getReceivedNacks() - + saved_stats_.old_received_nacks_value_ + << std::setfill(separator); + + std::stringstream fec_pkt; + fec_pkt << stats.getReceivedFEC() - saved_stats_.old_fec_pkt_ + << std::setfill(separator); + + std::stringstream queuing_delay; + queuing_delay << std::fixed << std::setprecision(3) + << stats.getQueuingDelay() << std::setfill(separator); + + std::stringstream residual_losses; + double rl_perc = stats.getResidualLossRate() * 100; + residual_losses << std::fixed << std::setprecision(2) << rl_perc + << std::setfill(separator); + + std::stringstream quality_score; + quality_score << std::fixed << (int)stats.getQualityScore() + << std::setfill(separator); + + std::stringstream alerts; + alerts << stats.getAlerts() << std::setfill(separator); + + std::stringstream auth_alerts; + auth_alerts << saved_stats_.auth_alerts_ << std::setfill(separator); + + if ((header_counter_ == 0 && configuration_.print_headers_) || first_) { + getOutputStream() << std::right << std::setw(width) << "Interval[ms]"; + getOutputStream() + << std::right << std::setw(width) << "RecvData[pkt]"; + getOutputStream() + << std::right << std::setw(width) << "Bandwidth[Mbps]"; + getOutputStream() + << std::right << std::setw(width) << "Goodput[Mbps]"; + getOutputStream() << std::right << std::setw(width) << "LossRate[%]"; + getOutputStream() << std::right << std::setw(width) << "Retr[pkt]"; + getOutputStream() << std::right << std::setw(width) << "InterestSent"; + getOutputStream() + << std::right << std::setw(width) << "ReceivedNacks"; + getOutputStream() << std::right << std::setw(width) << "SyncWnd[pkt]"; + getOutputStream() << std::right << std::setw(width) << "MinRtt[ms]"; + getOutputStream() + << std::right << std::setw(width) << "QueuingDelay[ms]"; + getOutputStream() + << std::right << std::setw(width) << "LostData[pkt]"; + getOutputStream() + << std::right << std::setw(width) << "RecoveredData"; + getOutputStream() + << std::right << std::setw(width) << "DefinitelyLost"; + getOutputStream() << std::right << std::setw(width) << "State"; + getOutputStream() + << std::right << std::setw(width) << "DataDelay[ms]"; + getOutputStream() << std::right << std::setw(width) << "FecPkt"; + getOutputStream() << std::right << std::setw(width) << "Congestion"; + getOutputStream() + << std::right << std::setw(width) << "ResidualLosses"; + getOutputStream() << std::right << std::setw(width) << "QualityScore"; + getOutputStream() << std::right << std::setw(width) << "Alerts"; + getOutputStream() + << std::right << std::setw(width) << "AuthAlerts" << std::endl; + + first_ = false; } - if (!connector.lookupValue("remote_port", port)) { - std::cerr << "Error in configuration file: remote_port is a " - "mandatory field of Connectors." - << std::endl; - return false; + getOutputStream() << std::right << std::setw(width) + << interval_ms.str(); + getOutputStream() << std::right << std::setw(width) + << received_data_pkt.str(); + getOutputStream() << std::right << std::setw(width) << bandwidth.str(); + getOutputStream() << std::right << std::setw(width) << goodput.str(); + getOutputStream() << std::right << std::setw(width) << loss_rate.str(); + getOutputStream() << std::right << std::setw(width) << retx_sent.str(); + getOutputStream() << std::right << std::setw(width) + << interest_sent.str(); + getOutputStream() << std::right << std::setw(width) << nacks.str(); + getOutputStream() << std::right << std::setw(width) << window.str(); + getOutputStream() << std::right << std::setw(width) << avg_rtt.str(); + getOutputStream() << std::right << std::setw(width) + << queuing_delay.str(); + getOutputStream() << std::right << std::setw(width) << lost_data.str(); + getOutputStream() << std::right << std::setw(width) + << bytes_recovered_data.str(); + getOutputStream() << std::right << std::setw(width) + << definitely_lost_data.str(); + getOutputStream() << std::right << std::setw(width) + << stats.getCCStatus(); + getOutputStream() << std::right << std::setw(width) << data_delay.str(); + getOutputStream() << std::right << std::setw(width) << fec_pkt.str(); + getOutputStream() << std::right << std::setw(width) + << stats.isCongested(); + getOutputStream() << std::right << std::setw(width) + << residual_losses.str(); + getOutputStream() << std::right << std::setw(width) + << quality_score.str(); + getOutputStream() << std::right << std::setw(width) << alerts.str(); + getOutputStream() << std::right << std::setw(width) << auth_alerts.str() + << std::endl; + + if (configuration_.test_mode_) { + if (saved_stats_.data_delays_.size() > 0) + saved_stats_.data_delays_.pop_back(); + + auto now = utils::SteadyTime::nowMs(); + getOutputStream() << std::fixed << std::setprecision(0) << now.count() + << " DATA-DELAYS:[" << saved_stats_.data_delays_ + << "]" << std::endl; } - - if (!connector.lookupValue("interface", conn.interface)) { - std::cerr << "Error in configuration file: interface is a " - "mandatory field of Connectors." - << std::endl; - return false; + } else { + if ((header_counter_ == 0 && configuration_.print_headers_) || first_) { + getOutputStream() << std::right << std::setw(width) << "Interval[ms]"; + getOutputStream() << std::right << std::setw(width) << "Transfer[MB]"; + getOutputStream() + << std::right << std::setw(width) << "Bandwidth[Mbps]"; + getOutputStream() << std::right << std::setw(width) << "Retr[pkt]"; + getOutputStream() << std::right << std::setw(width) << "Cwnd[Int]"; + getOutputStream() + << std::right << std::setw(width) << "AvgRtt[ms]" << std::endl; + + first_ = false; } - conn.remote_port = (uint16_t)(port); + getOutputStream() << std::right << std::setw(width) + << interval_ms.str(); + getOutputStream() << std::right << std::setw(width) + << bytes_transferred.str(); + getOutputStream() << std::right << std::setw(width) << bandwidth.str(); + getOutputStream() << std::right << std::setw(width) + << stats.getRetxCount(); + getOutputStream() << std::right << std::setw(width) << window.str(); + getOutputStream() << std::right << std::setw(width) << avg_rtt.str() + << std::endl; + } - std::cout << "Adding connector " << conn.name << ", (" - << conn.local_address << ":" << conn.local_port << " " - << conn.remote_address << ":" << conn.remote_port << ")" - << std::endl; - config_.addConnector(conn.name, std::move(conn)); + saved_stats_.total_duration_milliseconds_ += + (uint32_t)exact_duration.count(); + saved_stats_.old_bytes_value_ = stats.getBytesRecv(); + saved_stats_.old_lost_data_value_ = stats.getLostData(); + saved_stats_.old_bytes_recovered_value_ = stats.getBytesRecoveredData(); + saved_stats_.old_definitely_lost_data_value_ = + stats.getDefinitelyLostData(); + saved_stats_.old_fec_interest_tx_value_ = stats.getInterestFecTxCount(); + saved_stats_.old_fec_data_rx_value_ = stats.getBytesFecRecv(); + saved_stats_.old_retx_value_ = stats.getRetxCount(); + saved_stats_.old_sent_int_value_ = stats.getInterestTx(); + saved_stats_.old_received_nacks_value_ = stats.getReceivedNacks(); + saved_stats_.old_fec_pkt_ = stats.getReceivedFEC(); + saved_stats_.delay_sample_ = 0; + saved_stats_.avg_data_delay_ = 0; + saved_stats_.received_bytes_ = 0; + saved_stats_.received_data_pkt_ = 0; + saved_stats_.data_delays_ = ""; + saved_stats_.t_stats_ = utils::SteadyTime::Clock::now(); + + header_counter_ = (header_counter_ + 1) & kheader_counter_mask(); + + if (--nb_iterations_ == 0) { + // We reached the maximum nb of runs. Stop now. + parent_.io_service_.stop(); } } - // Routes - if (config.exists("routes")) { - const Setting &routes = config.lookup("routes"); - auto count = routes.getLength(); + /*************************************************************** + * Setup functions + ***************************************************************/ - for (int i = 0; i < count; i++) { - const Setting &route = routes[i]; - RouteConfig r; - unsigned weight; + int setupRTCSocket() { + int ret = ERROR_SUCCESS; - r.name = route.getName(); - route.lookupValue("prefix", r.prefix); - route.lookupValue("weight", weight); - route.lookupValue("main_connector", r.main_connector); - route.lookupValue("backup_connector", r.backup_connector); - r.weight = (uint16_t)(weight); + configuration_.transport_protocol_ = transport::interface::RTC; - std::cout << "Adding route " << r.name << " " << r.prefix << " (" - << r.main_connector << " " << r.backup_connector << " " - << r.weight << ")" << std::endl; - config_.addRoute(std::move(r)); + if (configuration_.relay_ && configuration_.parallel_flows_ == 1) { + int production_protocol = ProductionProtocolAlgorithms::RTC_PROD; + producer_socket_ = + std::make_unique<ProducerSocket>(production_protocol); + producer_socket_->registerPrefix(configuration_.relay_name_); + producer_socket_->connect(); + producer_socket_->start(); } - } - std::cout << "Ok" << std::endl; - - return true; - } - - bool splitRoute(std::string route, std::string &prefix, - uint8_t &prefix_length) { - std::string delimiter = "/"; - - size_t pos = 0; - if ((pos = route.find(delimiter)) != std::string::npos) { - prefix = route.substr(0, pos); - route.erase(0, pos + delimiter.length()); - } else { - return false; - } + if (configuration_.output_stream_mode_ && + configuration_.parallel_flows_ == 1) { + remote_ = asio::ip::udp::endpoint( + asio::ip::address::from_string("127.0.0.1"), configuration_.port_); + socket_.open(asio::ip::udp::v4()); + } - prefix_length = std::stoul(route.substr(0)); - return true; - } + consumer_socket_ = + std::make_unique<ConsumerSocket>(configuration_.transport_protocol_); -#ifdef FORWARDER_INTERFACE - void onHicnServiceReady() override { - std::cout << "Successfully connected to local forwarder!" << std::endl; + RtcTransportRecoveryStrategies recovery_strategy = + RtcTransportRecoveryStrategies::RTX_ONLY; + switch (configuration_.recovery_strategy_) { + case 1: + recovery_strategy = RtcTransportRecoveryStrategies::RECOVERY_OFF; + break; + case 2: + recovery_strategy = RtcTransportRecoveryStrategies::RTX_ONLY; + break; + case 3: + recovery_strategy = RtcTransportRecoveryStrategies::FEC_ONLY; + break; + case 4: + recovery_strategy = RtcTransportRecoveryStrategies::DELAY_BASED; + break; + case 5: + recovery_strategy = RtcTransportRecoveryStrategies::LOW_RATE; + break; + case 6: + recovery_strategy = + RtcTransportRecoveryStrategies::LOW_RATE_AND_BESTPATH; + break; + case 7: + recovery_strategy = + RtcTransportRecoveryStrategies::LOW_RATE_AND_REPLICATION; + break; + case 8: + recovery_strategy = + RtcTransportRecoveryStrategies::LOW_RATE_AND_ALL_FWD_STRATEGIES; + break; + case 9: + recovery_strategy = + RtcTransportRecoveryStrategies::FEC_ONLY_LOW_RES_LOSSES; + break; + case 10: + recovery_strategy = + RtcTransportRecoveryStrategies::DELAY_AND_BESTPATH; + break; + case 11: + recovery_strategy = + RtcTransportRecoveryStrategies::DELAY_AND_REPLICATION; + break; + default: + break; + } - std::cout << "Setting up listeners" << std::endl; - const char *config = getenv("FORWARDER_CONFIG"); + ret = consumer_socket_->setSocketOption( + RtcTransportOptions::RECOVERY_STRATEGY, + static_cast<uint32_t>(recovery_strategy)); - if (config) { - if (!parseConfig(config)) { - return; + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; } - // Create faces and route using first face in the list. - auto &routes = config_.getRoutes(); - auto &connectors = config_.getConnectors(); + ret = consumer_socket_->setSocketOption( + RtcTransportOptions::AGGREGATED_DATA, + configuration_.aggregated_data_); - if (routes.size() == 0 || connectors.size() == 0) { - std::cerr << "Nothing to configure" << std::endl; - return; + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; } - for (auto &route : routes) { - auto the_connector_it = connectors.find(route.main_connector); - if (the_connector_it == connectors.end()) { - std::cerr << "No valid main connector found for route " << route.name - << std::endl; - continue; - } + ret = consumer_socket_->setSocketOption( + RtcTransportOptions::CONTENT_SHARING_MODE, + configuration_.content_sharing_mode_); - auto &the_connector = the_connector_it->second; - auto route_info = std::make_shared<ForwarderInterface::RouteInfo>(); - route_info->family = AF_INET; - route_info->local_addr = the_connector.local_address; - route_info->local_port = the_connector.local_port; - route_info->remote_addr = the_connector.remote_address; - route_info->remote_port = the_connector.remote_port; - route_info->interface = the_connector.interface; - route_info->name = the_connector.name; - - std::string prefix; - uint8_t prefix_length; - auto ret = splitRoute(route.prefix, prefix, prefix_length); - - if (!ret) { - std::cerr << "Error parsing route" << std::endl; - return; - } + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } - route_info->route_addr = prefix; - route_info->route_len = prefix_length; - - main_routes_.emplace_back(route_info); - - if (!route.backup_connector.empty()) { - // Add also the backup route - auto the_backup_connector_it = - connectors.find(route.backup_connector); - if (the_backup_connector_it == connectors.end()) { - std::cerr << "No valid backup connector found for route " - << route.name << std::endl; - continue; - } - - auto &the_backup_connector = the_backup_connector_it->second; - auto backup_route_info = - std::make_shared<ForwarderInterface::RouteInfo>(); - backup_route_info->family = AF_INET; - backup_route_info->local_addr = the_backup_connector.local_address; - backup_route_info->local_port = the_backup_connector.local_port; - backup_route_info->remote_addr = the_backup_connector.remote_address; - backup_route_info->remote_port = the_backup_connector.remote_port; - backup_route_info->interface = the_backup_connector.interface; - backup_route_info->name = the_backup_connector.name; - - std::string prefix; - uint8_t prefix_length; - auto ret = splitRoute(route.prefix, prefix, prefix_length); - - if (!ret) { - std::cerr << "Error parsing route" << std::endl; - return; - } - - backup_route_info->route_addr = prefix; - backup_route_info->route_len = prefix_length; - - backup_routes_.emplace_back(backup_route_info); - } + ret = consumer_socket_->setSocketOption( + ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT, + (transport::interface::ConsumerContentObjectCallback)std::bind( + &Impl::ConsumerContext::checkReceivedRtcContent, this, + std::placeholders::_1, std::placeholders::_2)); + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; } - // Create main routes - std::cout << "Creating main routes" << std::endl; - forwarder_interface_.createFaceAndRoutes(main_routes_); - } - } + std::shared_ptr<TransportStatistics> transport_stats; + ret = consumer_socket_->getSocketOption( + transport::interface::OtherOptions::STATISTICS, + (TransportStatistics **)&transport_stats); + transport_stats->setAlpha(0.0); - void onRouteConfigured( - std::vector<ForwarderInterface::RouteInfoPtr> &route_info) override { - std::cout << "Routes successfully configured!" << std::endl; - } -#endif + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } - int setup() { - int ret; - - if (configuration_.rtc_) { - configuration_.transport_protocol_ = RTC; - } else if (configuration_.window < 0) { - configuration_.transport_protocol_ = RAAQM; - } else { - configuration_.transport_protocol_ = CBR; + return ERROR_SUCCESS; } - if (configuration_.relay_ && configuration_.rtc_) { - int production_protocol = ProductionProtocolAlgorithms::RTC_PROD; - producer_socket_ = std::make_unique<ProducerSocket>(production_protocol); - producer_socket_->registerPrefix(configuration_.relay_name_); - producer_socket_->connect(); - } + int setupRAAQMSocket() { + int ret = ERROR_SUCCESS; - if (configuration_.output_stream_mode_ && configuration_.rtc_) { - remote_ = asio::ip::udp::endpoint( - asio::ip::address::from_string("127.0.0.1"), configuration_.port_); - socket_.open(asio::ip::udp::v4()); - } + configuration_.transport_protocol_ = transport::interface::RAAQM; - if (configuration_.secure_) { - consumer_socket_ = std::make_unique<P2PSecureConsumerSocket>( - RAAQM, configuration_.transport_protocol_); - if (configuration_.producer_prefix_.getPrefixLength() == 0) { - std::cerr << "ERROR -- Missing producer prefix on which perform the " - "handshake." - << std::endl; - } else { - P2PSecureConsumerSocket &secure_consumer_socket = - *(static_cast<P2PSecureConsumerSocket *>(consumer_socket_.get())); - secure_consumer_socket.registerPrefix(configuration_.producer_prefix_); - } - } else { consumer_socket_ = std::make_unique<ConsumerSocket>(configuration_.transport_protocol_); - } - consumer_socket_->setSocketOption( - GeneralTransportOptions::INTEREST_LIFETIME, - configuration_.interest_lifetime_); - -#if defined(DEBUG) && defined(__linux__) - std::shared_ptr<transport::BasePortal> portal; - consumer_socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal); - signals_ = - std::make_unique<asio::signal_set>(portal->getIoService(), SIGUSR1); - signals_->async_wait([this](const std::error_code &, const int &) { - std::cout << "Signal SIGUSR1!" << std::endl; - mtrace(); - }); -#endif - - if (consumer_socket_->setSocketOption(CURRENT_WINDOW_SIZE, - configuration_.window) == - SOCKET_OPTION_NOT_SET) { - std::cerr << "ERROR -- Impossible to set the size of the window." - << std::endl; - return ERROR_SETUP; - } - - if (configuration_.transport_protocol_ == RAAQM && - configuration_.beta != -1.f) { - if (consumer_socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE, - configuration_.beta) == - SOCKET_OPTION_NOT_SET) { - return ERROR_SETUP; + if (configuration_.beta_ != -1.f) { + ret = consumer_socket_->setSocketOption( + RaaqmTransportOptions::BETA_VALUE, configuration_.beta_); + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } } - } - if (configuration_.transport_protocol_ == RAAQM && - configuration_.drop_factor != -1.f) { - if (consumer_socket_->setSocketOption(RaaqmTransportOptions::DROP_FACTOR, - configuration_.drop_factor) == - SOCKET_OPTION_NOT_SET) { - return ERROR_SETUP; + if (configuration_.drop_factor_ != -1.f) { + ret = consumer_socket_->setSocketOption( + RaaqmTransportOptions::DROP_FACTOR, configuration_.drop_factor_); + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } } - } - if (!configuration_.producer_certificate.empty()) { - std::shared_ptr<Verifier> verifier = std::make_shared<AsymmetricVerifier>( - configuration_.producer_certificate); - if (consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER, - verifier) == SOCKET_OPTION_NOT_SET) - return ERROR_SETUP; + return ERROR_SUCCESS; } - if (!configuration_.passphrase.empty()) { - std::shared_ptr<Verifier> verifier = - std::make_shared<SymmetricVerifier>(configuration_.passphrase); - if (consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER, - verifier) == SOCKET_OPTION_NOT_SET) - return ERROR_SETUP; - } + int setupCBRSocket() { + configuration_.transport_protocol_ = transport::interface::CBR; - ret = consumer_socket_->setSocketOption( - ConsumerCallbacksOptions::INTEREST_OUTPUT, - (ConsumerInterestCallback)std::bind(&Impl::processLeavingInterest, this, - std::placeholders::_1, - std::placeholders::_2)); + consumer_socket_ = + std::make_unique<ConsumerSocket>(configuration_.transport_protocol_); - if (ret == SOCKET_OPTION_NOT_SET) { - return ERROR_SETUP; + return ERROR_SUCCESS; } - if (!configuration_.rtc_) { - ret = consumer_socket_->setSocketOption( - ConsumerCallbacksOptions::READ_CALLBACK, &callback_); - } else { - ret = consumer_socket_->setSocketOption( - ConsumerCallbacksOptions::READ_CALLBACK, &rtc_callback_); - } + public: + int setup() { + int ret; + std::shared_ptr<transport::auth::Verifier> verifier = + std::make_shared<transport::auth::VoidVerifier>(); + + if (configuration_.rtc_) { + ret = setupRTCSocket(); + } else if (configuration_.window_ < 0) { + ret = setupRAAQMSocket(); + } else { + ret = setupCBRSocket(); + } - if (ret == SOCKET_OPTION_NOT_SET) { - return ERROR_SETUP; - } + if (ret != ERROR_SUCCESS) { + return ret; + } - if (configuration_.rtc_) { ret = consumer_socket_->setSocketOption( - ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT, - (ConsumerContentObjectCallback)std::bind( - &Impl::checkReceivedRtcContent, this, std::placeholders::_1, - std::placeholders::_2)); + GeneralTransportOptions::INTEREST_LIFETIME, + configuration_.interest_lifetime_); if (ret == SOCKET_OPTION_NOT_SET) { return ERROR_SETUP; } - } - - if (configuration_.rtc_) { - std::shared_ptr<TransportStatistics> transport_stats; - consumer_socket_->getSocketOption( - OtherOptions::STATISTICS, (TransportStatistics **)&transport_stats); - transport_stats->setAlpha(0.0); - } - ret = consumer_socket_->setSocketOption( - ConsumerCallbacksOptions::STATS_SUMMARY, - (ConsumerTimerCallback)std::bind(&Impl::handleTimerExpiration, this, - std::placeholders::_1, - std::placeholders::_2)); + ret = consumer_socket_->setSocketOption( + GeneralTransportOptions::MANIFEST_FACTOR_RELEVANT, + configuration_.manifest_factor_relevant_); + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } - if (ret == SOCKET_OPTION_NOT_SET) { - return ERROR_SETUP; - } + ret = consumer_socket_->setSocketOption( + GeneralTransportOptions::MANIFEST_FACTOR_ALERT, + configuration_.manifest_factor_alert_); - if (consumer_socket_->setSocketOption( - GeneralTransportOptions::STATS_INTERVAL, - configuration_.report_interval_milliseconds_) == - SOCKET_OPTION_NOT_SET) { - return ERROR_SETUP; - } + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } - consumer_socket_->connect(); + ret = consumer_socket_->setSocketOption( + GeneralTransportOptions::PACKET_FORMAT, + configuration_.packet_format_); + if (ret == SOCKET_OPTION_NOT_SET) { + getOutputStream() << "ERROR -- Impossible to set the packet format." + << std::endl; + return ERROR_SETUP; + } - return ERROR_SUCCESS; - } + ret = consumer_socket_->setSocketOption( + ConsumerCallbacksOptions::FWD_STRATEGY_CHANGE, + (StrategyCallback)[]( + [[maybe_unused]] transport::interface::notification::Strategy + strategy){ + // nothing to do + }); + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } - int run() { - std::cout << "Starting download of " << configuration_.name << std::endl; + ret = consumer_socket_->setSocketOption( + ConsumerCallbacksOptions::REC_STRATEGY_CHANGE, + (StrategyCallback)[]( + [[maybe_unused]] transport::interface::notification::Strategy + strategy){ + // nothing to do + }); + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } - signals_.add(SIGINT); - signals_.async_wait( - [this](const std::error_code &, const int &) { io_service_.stop(); }); + ret = consumer_socket_->setSocketOption( + transport::interface::CURRENT_WINDOW_SIZE, configuration_.window_); + if (ret == SOCKET_OPTION_NOT_SET) { + getOutputStream() + << "ERROR -- Impossible to set the size of the window." + << std::endl; + return ERROR_SETUP; + } - t_download_ = t_stats_ = std::chrono::steady_clock::now(); - consumer_socket_->asyncConsume(configuration_.name); - io_service_.run(); + if (!configuration_.producer_certificate_.empty()) { + verifier = std::make_shared<transport::auth::AsymmetricVerifier>( + configuration_.producer_certificate_); + } - consumer_socket_->stop(); + if (!configuration_.passphrase_.empty()) { + verifier = std::make_shared<transport::auth::SymmetricVerifier>( + configuration_.passphrase_); + } - return ERROR_SUCCESS; - } + verifier->setVerificationFailedCallback( + std::bind(&HIperfClient::Impl::ConsumerContext::onAuthFailed, this, + std::placeholders::_1, std::placeholders::_2)); - private: - class RTCCallback : public ConsumerSocket::ReadCallback { - static constexpr std::size_t mtu = 1500; + ret = consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER, + verifier); + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } - public: - RTCCallback(Impl &hiperf_client) : client_(hiperf_client) { - client_.configuration_.receive_buffer = utils::MemBuf::create(mtu); - } + // Signer for aggregatd interests + std::shared_ptr<transport::auth::Signer> signer = + std::make_shared<transport::auth::VoidSigner>(); + if (!configuration_.aggr_interest_passphrase_.empty()) { + signer = std::make_shared<transport::auth::SymmetricSigner>( + transport::auth::CryptoSuite::HMAC_SHA256, + configuration_.aggr_interest_passphrase_); + } + ret = consumer_socket_->setSocketOption(GeneralTransportOptions::SIGNER, + signer); + if (ret == SOCKET_OPTION_NOT_SET) return ERROR_SETUP; - bool isBufferMovable() noexcept override { return false; } + if (configuration_.aggregated_interests_) { + ret = consumer_socket_->setSocketOption( + RtcTransportOptions::AGGREGATED_INTERESTS, true); - void getReadBuffer(uint8_t **application_buffer, - size_t *max_length) override { - *application_buffer = - client_.configuration_.receive_buffer->writableData(); - *max_length = mtu; - } + if (ret == SOCKET_OPTION_NOT_SET) return ERROR_SETUP; + } - void readDataAvailable(std::size_t length) noexcept override { - client_.received_bytes_ += length; - client_.received_data_pkt_++; + ret = consumer_socket_->setSocketOption( + ConsumerCallbacksOptions::INTEREST_OUTPUT, + (transport::interface::ConsumerInterestCallback)std::bind( + &ConsumerContext::processLeavingInterest, this, + std::placeholders::_1, std::placeholders::_2)); - // collecting delay stats. Just for performance testing - uint64_t *senderTimeStamp = - (uint64_t *)client_.configuration_.receive_buffer->writableData(); + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::system_clock::now().time_since_epoch()) - .count(); - double new_delay = (double)(now - *senderTimeStamp); + ret = consumer_socket_->setSocketOption( + ConsumerCallbacksOptions::READ_CALLBACK, this); - if (*senderTimeStamp > now) - new_delay = -1 * (double)(*senderTimeStamp - now); + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } - client_.delay_sample_++; - client_.avg_data_delay_ = - client_.avg_data_delay_ + - (new_delay - client_.avg_data_delay_) / client_.delay_sample_; + ret = consumer_socket_->setSocketOption( + ConsumerCallbacksOptions::STATS_SUMMARY, + (transport::interface::ConsumerTimerCallback)std::bind( + &Impl::ConsumerContext::handleTimerExpiration, this, + std::placeholders::_1, std::placeholders::_2)); - if (client_.configuration_.test_mode_) { - client_.data_delays_ += std::to_string(int(new_delay)); - client_.data_delays_ += ","; + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; } - if (client_.configuration_.relay_) { - client_.producer_socket_->produceDatagram( - client_.configuration_.relay_name_.getName(), - client_.configuration_.receive_buffer->writableData(), - length < 1400 ? length : 1400); - } - if (client_.configuration_.output_stream_mode_) { - uint8_t *start = - (uint8_t *)client_.configuration_.receive_buffer->writableData(); - start += sizeof(uint64_t); - std::size_t pkt_len = length - sizeof(uint64_t); - client_.socket_.send_to(asio::buffer(start, pkt_len), client_.remote_); + if (consumer_socket_->setSocketOption( + GeneralTransportOptions::STATS_INTERVAL, + configuration_.report_interval_milliseconds_) == + SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; } - } - size_t maxBufferSize() const override { return mtu; } + consumer_socket_->connect(); - void readError(const std::error_code ec) noexcept override { - std::cerr << "Error while reading from RTC socket" << std::endl; - client_.io_service_.stop(); + return ERROR_SUCCESS; } - void readSuccess(std::size_t total_size) noexcept override { - std::cout << "Data successfully read" << std::endl; - } + /*************************************************************** + * Run functions + ***************************************************************/ - private: - Impl &client_; - }; + int run() { + getOutputStream() << "Starting download of " << flow_name_ << std::endl; - class Callback : public ConsumerSocket::ReadCallback { - public: - Callback(Impl &hiperf_client) : client_(hiperf_client) { - client_.configuration_.receive_buffer = - utils::MemBuf::create(client_.configuration_.receive_buffer_size_); + saved_stats_.t_download_ = saved_stats_.t_stats_ = + utils::SteadyTime::now(); + consumer_socket_->consume(flow_name_); + + return ERROR_SUCCESS; } - bool isBufferMovable() noexcept override { return false; } + // Members initialized by the constructor + std::shared_ptr<utils::MemBuf> receive_buffer_; + asio::ip::udp::socket socket_; + std::size_t payload_size_max_; + asio::ip::udp::endpoint remote_; + std::uint32_t nb_iterations_; - void getReadBuffer(uint8_t **application_buffer, - size_t *max_length) override { - *application_buffer = - client_.configuration_.receive_buffer->writableData(); - *max_length = client_.configuration_.receive_buffer_size_; - } + // Members initialized by in-class initializer + SavedStatistics saved_stats_{}; + uint16_t header_counter_{0}; + bool first_{true}; + std::unique_ptr<ConsumerSocket> consumer_socket_; + std::unique_ptr<ProducerSocket> producer_socket_; + }; - void readDataAvailable(std::size_t length) noexcept override {} + public: + explicit Impl(const hiperf::ClientConfiguration &conf) + : config_(conf), signals_(io_service_) {} - void readBufferAvailable( - std::unique_ptr<utils::MemBuf> &&buffer) noexcept override {} + virtual ~Impl() = default; - size_t maxBufferSize() const override { - return client_.configuration_.receive_buffer_size_; + int setup() { + int ret = ensureFlows(config_.name_, config_.parallel_flows_); + if (ret != ERROR_SUCCESS) { + return ret; } - void readError(const std::error_code ec) noexcept override { - std::cerr << "Error " << ec.message() << " while reading from socket" - << std::endl; - client_.io_service_.stop(); - } + consumer_contexts_.reserve(config_.parallel_flows_); + for (uint32_t i = 0; i < config_.parallel_flows_; i++) { + auto &ctx = consumer_contexts_.emplace_back(*this, i); + ret = ctx.setup(); - void readSuccess(std::size_t total_size) noexcept override { - Time t2 = std::chrono::steady_clock::now(); - TimeDuration dt = - std::chrono::duration_cast<TimeDuration>(t2 - client_.t_download_); - long usec = (long)dt.count(); + if (ret) { + break; + } + } - std::cout << "Content retrieved. Size: " << total_size << " [Bytes]" - << std::endl; + return ret; + } - std::cerr << "Elapsed Time: " << usec / 1000000.0 << " seconds -- " - << (total_size * 8) * 1.0 / usec * 1.0 << " [Mbps]" - << std::endl; + int run() { + signals_.add(SIGINT); + signals_.async_wait( + [this](const std::error_code &, const int &) { io_service_.stop(); }); - client_.io_service_.stop(); + for (auto &consumer_context : consumer_contexts_) { + consumer_context.run(); } - private: - Impl &client_; - }; + io_service_.run(); + + return ERROR_SUCCESS; + } - hiperf::ClientConfiguration configuration_; - Time t_stats_; - Time t_download_; - uint32_t total_duration_milliseconds_; - uint64_t old_bytes_value_; - uint64_t old_interest_tx_value_; - uint64_t old_fec_interest_tx_value_; - uint64_t old_fec_data_rx_value_; - uint64_t old_lost_data_value_; - uint64_t old_bytes_recovered_value_; - uint64_t old_definitely_lost_data_value_; - uint32_t old_retx_value_; - uint32_t old_sent_int_value_; - uint32_t old_received_nacks_value_; - uint32_t old_fec_pkt_; - - // IMPORTANT: to be used only for performance testing, when consumer and - // producer are synchronized. Used for rtc only at the moment - double avg_data_delay_; - uint32_t delay_sample_; - - uint32_t received_bytes_; - uint32_t received_data_pkt_; - - std::string data_delays_; + ClientConfiguration &getConfig() { return config_; } + private: asio::io_service io_service_; + hiperf::ClientConfiguration config_; asio::signal_set signals_; - RTCCallback rtc_callback_; - Callback callback_; - std::unique_ptr<ConsumerSocket> consumer_socket_; - std::unique_ptr<ProducerSocket> producer_socket_; - asio::ip::udp::socket socket_; - asio::ip::udp::endpoint remote_; - - ForwarderConfiguration config_; - uint16_t switch_threshold_; /* ms */ - bool done_; - std::vector<ForwarderInterface::RouteInfoPtr> main_routes_; - std::vector<ForwarderInterface::RouteInfoPtr> backup_routes_; -#ifdef FORWARDER_INTERFACE - ForwarderInterface forwarder_interface_; -#endif + std::vector<ConsumerContext> consumer_contexts_; }; -HIperfClient::HIperfClient(const ClientConfiguration &conf) { - impl_ = new Impl(conf); -} +HIperfClient::HIperfClient(const ClientConfiguration &conf) + : impl_(std::make_unique<Impl>(conf)) {} -HIperfClient::~HIperfClient() { delete impl_; } +HIperfClient::~HIperfClient() = default; -int HIperfClient::setup() { return impl_->setup(); } +int HIperfClient::setup() const { return impl_->setup(); } -void HIperfClient::run() { impl_->run(); } +void HIperfClient::run() const { impl_->run(); } } // namespace hiperf diff --git a/apps/hiperf/src/client.h b/apps/hiperf/src/client.h index f45b9af43..beecbd473 100644 --- a/apps/hiperf/src/client.h +++ b/apps/hiperf/src/client.h @@ -16,19 +16,21 @@ #pragma once #include <common.h> +#include <hicn/transport/utils/noncopyable.h> namespace hiperf { -class HIperfClient { +class HIperfClient : public ::utils::NonCopyable { public: - HIperfClient(const ClientConfiguration &conf); + explicit HIperfClient(const ClientConfiguration &conf); + ~HIperfClient(); - int setup(); - void run(); + int setup() const; + void run() const; private: class Impl; - Impl *impl_; + std::unique_ptr<Impl> impl_; }; -} // namespace hiperf
\ No newline at end of file +} // namespace hiperf diff --git a/apps/hiperf/src/common.h b/apps/hiperf/src/common.h index e6ba526f9..0f96bef1f 100644 --- a/apps/hiperf/src/common.h +++ b/apps/hiperf/src/common.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 Cisco and/or its affiliates. + * Copyright (c) 2021-2022 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -15,27 +15,28 @@ #pragma once -#include <hicn/transport/auth/identity.h> #include <hicn/transport/auth/signer.h> #include <hicn/transport/config.h> #include <hicn/transport/core/content_object.h> #include <hicn/transport/core/interest.h> #include <hicn/transport/interfaces/global_conf_interface.h> -#include <hicn/transport/interfaces/p2psecure_socket_consumer.h> -#include <hicn/transport/interfaces/p2psecure_socket_producer.h> #include <hicn/transport/interfaces/socket_consumer.h> #include <hicn/transport/interfaces/socket_producer.h> #include <hicn/transport/utils/chrono_typedefs.h> +#include <hicn/transport/utils/color.h> #include <hicn/transport/utils/literals.h> #ifndef _WIN32 #include <hicn/transport/utils/daemonizator.h> #endif +#include <hicn/apps/utils/logger.h> + #include <asio.hpp> #include <cmath> #include <fstream> #include <iomanip> +#include <iostream> #include <sstream> #include <string> #include <unordered_set> @@ -43,43 +44,151 @@ #ifndef ERROR_SUCCESS #define ERROR_SUCCESS 0 #endif -#define ERROR_SETUP -5 -#define MIN_PROBE_SEQ 0xefffffff - -using namespace transport::interface; -using namespace transport::auth; -using namespace transport::core; - -static inline uint64_t _ntohll(const uint64_t *input) { - uint64_t return_val; - uint8_t *tmp = (uint8_t *)&return_val; - - tmp[0] = *input >> 56; - tmp[1] = *input >> 48; - tmp[2] = *input >> 40; - tmp[3] = *input >> 32; - tmp[4] = *input >> 24; - tmp[5] = *input >> 16; - tmp[6] = *input >> 8; - tmp[7] = *input >> 0; - - return return_val; -} +static constexpr int ERROR_SETUP = -5; +static constexpr uint32_t MIN_PROBE_SEQ = 0xefffffff; +static constexpr uint32_t RTC_HEADER_SIZE = 12; +static constexpr uint32_t FEC_HEADER_MAX_SIZE = 36; +static constexpr uint32_t HIPERF_MTU = 1500; + +namespace hiperf { + +using transport::core::Packet; +using transport::core::Prefix; + +/** + * Logger + */ +template <typename D, typename ConfType, typename ParentType> +class Base : public std::stringbuf, public std::ostream { + protected: + static inline const char separator[] = "| "; + + Base(ParentType &parent, asio::io_service &io_service, int identifier) + : std::stringbuf(), + std::ostream(this), + parent_(parent), + configuration_(parent_.getConfig()), + io_service_(io_service), + identifier_(identifier), + name_id_(D::getContextType() + std::to_string(identifier_)), + flow_name_(configuration_.name_.makeNameWithIndex(identifier_)) { + std::stringstream begin; + std::stringstream end; + if (configuration_.colored_) { + begin << color_mod_ << bold_mod_; + end << end_mod_; + } else { + begin << ""; + end << ""; + } + + begin << "|" << name_id_ << separator; + begin_ = begin.str(); + end_ = end.str(); + } + + Base(Base &&other) noexcept + : parent_(other.parent_), + configuration_(other.configuration_), + io_service_(other.io_service_), + identifier_(other.identifier_), + name_id_(std::move(other.name_id_)), + flow_name_(other.flow_name_) {} + + ~Base() {} + + /*************************************************************** + * std::stringbuf sync override + ***************************************************************/ + + int sync() override { + auto string = str(); + asio::post(io_service_, + [this, string]() { LoggerInfo() << begin_ << string << end_; }); + str(""); + + return 0; + } + + std::ostream &getOutputStream() { return *this; } -static inline uint64_t _htonll(const uint64_t *input) { - return (_ntohll(input)); + // Members initialized by the constructor + ParentType &parent_; + ConfType &configuration_; + asio::io_service &io_service_; + int identifier_; + std::string name_id_; + transport::core::Name flow_name_; + std::string begin_; + std::string end_; + + // Members initialized by the in-class initializer + utils::ColorModifier color_mod_; + utils::ColorModifier bold_mod_{utils::ColorModifier::Code::BOLD}; + utils::ColorModifier end_mod_{utils::ColorModifier::Code::RESET}; +}; + +static inline int ensureFlows(const Prefix &prefix, std::size_t flows) { + int ret = ERROR_SUCCESS; + + // Make sure the provided prefix length not allows to accomodate the + // provided number of flows. + uint16_t max_ip_addr_len_bits; + uint16_t log2_n_flow; + u64 max_n_flow; + if (prefix.getAddressFamily() == AF_INET) { + max_ip_addr_len_bits = IPV4_ADDR_LEN_BITS; + } else if (prefix.getAddressFamily() == AF_INET6) { + max_ip_addr_len_bits = IPV6_ADDR_LEN_BITS; + } else { + LoggerErr() << "Error: unknown address family."; + ret = ERROR_SETUP; + goto END; + } + + log2_n_flow = max_ip_addr_len_bits - prefix.getPrefixLength(); + max_n_flow = log2_n_flow < 64 ? (1 << log2_n_flow) : ~0ULL; + + if (flows > max_n_flow) { + LoggerErr() << "Error: the provided prefix length does not allow to " + "accomodate the provided number of flows (" + << flows << " > " << max_n_flow << ")."; + ret = ERROR_SETUP; + } + +END: + return ret; } -namespace hiperf { +/** + * Class to retrieve the maximum payload size given the MTU and packet headers. + */ +class PayloadSize { + public: + PayloadSize(Packet::Format format, std::size_t mtu = HIPERF_MTU) + : mtu_(mtu), format_(format) {} + + std::size_t getPayloadSizeMax(std::size_t transport_size = 0, + std::size_t fec_size = 0, + std::size_t signature_size = 0) { + return mtu_ - Packet::getHeaderSizeFromFormat(format_, signature_size) - + transport_size - fec_size; + } + + private: + std::size_t mtu_; + Packet::Format format_; +}; /** * Class for handling the production rate for the RTC producer. */ class Rate { public: - Rate() : rate_kbps_(0) {} + Rate() {} + ~Rate() {} - Rate(const std::string &rate) { + explicit Rate(const std::string &rate) { std::size_t found = rate.find("kbps"); if (found != std::string::npos) { rate_kbps_ = std::stof(rate.substr(0, found)); @@ -107,7 +216,7 @@ class Rate { } private: - float rate_kbps_; + float rate_kbps_ = 0.0; }; struct packet_t { @@ -115,103 +224,71 @@ struct packet_t { uint32_t size; }; +struct Configuration { + Prefix name_{"b001::abcd/64"}; + std::string passphrase_; + std::string aggr_interest_passphrase_; + bool rtc_{false}; + uint16_t port_{0}; + bool aggregated_data_{false}; + Packet::Format packet_format_{ + transport::interface::default_values::packet_format}; + uint32_t parallel_flows_{1}; + bool colored_{true}; +}; + /** * Container for command line configuration for hiperf client. */ -struct ClientConfiguration { - ClientConfiguration() - : name("b001::abcd", 0), - beta(-1.f), - drop_factor(-1.f), - window(-1), - producer_certificate(""), - passphrase(""), - receive_buffer(nullptr), - receive_buffer_size_(128 * 1024), - download_size(0), - report_interval_milliseconds_(1000), - transport_protocol_(CBR), - rtc_(false), - test_mode_(false), - relay_(false), - secure_(false), - producer_prefix_(), - interest_lifetime_(500), - relay_name_("c001::abcd/64"), - output_stream_mode_(false), - port_(0) {} - - Name name; - double beta; - double drop_factor; - double window; - std::string producer_certificate; - std::string passphrase; - std::shared_ptr<utils::MemBuf> receive_buffer; - std::size_t receive_buffer_size_; - std::size_t download_size; - std::uint32_t report_interval_milliseconds_; - TransportProtocolAlgorithms transport_protocol_; - bool rtc_; - bool test_mode_; - bool relay_; - bool secure_; +struct ClientConfiguration : Configuration { + double beta_{-1.f}; + double drop_factor_{-1.f}; + double window_{-1.f}; + std::string producer_certificate_; + std::size_t receive_buffer_size_{128 * 1024}; + std::uint32_t report_interval_milliseconds_{1000}; + transport::interface::TransportProtocolAlgorithms transport_protocol_{ + transport::interface::CBR}; + bool test_mode_{false}; + bool relay_{false}; Prefix producer_prefix_; - uint32_t interest_lifetime_; - Prefix relay_name_; - bool output_stream_mode_; - uint16_t port_; + uint32_t interest_lifetime_{500}; + uint32_t manifest_factor_relevant_{100}; + uint32_t manifest_factor_alert_{20}; + Prefix relay_name_{"c001::abcd/64"}; + bool output_stream_mode_{false}; + uint32_t recovery_strategy_{4}; + bool print_headers_{true}; + std::uint32_t nb_iterations_{ + std::numeric_limits<decltype(nb_iterations_)>::max()}; + bool content_sharing_mode_{false}; + bool aggregated_interests_{false}; }; /** * Container for command line configuration for hiperf server. */ -struct ServerConfiguration { - ServerConfiguration() - : name("b001::abcd/64"), - virtual_producer(true), - manifest(false), - live_production(false), - content_lifetime(600000000_U32), - download_size(20 * 1024 * 1024), - hash_algorithm(CryptoHashType::SHA256), - keystore_name(""), - passphrase(""), - keystore_password("cisco"), - multiphase_produce_(false), - rtc_(false), - interactive_(false), - trace_based_(false), - trace_index_(0), - trace_file_(nullptr), - production_rate_(std::string("2048kbps")), - payload_size_(1400), - secure_(false), - input_stream_mode_(false), - port_(0) {} - - Prefix name; - bool virtual_producer; - bool manifest; - bool live_production; - std::uint32_t content_lifetime; - std::uint32_t download_size; - CryptoHashType hash_algorithm; - std::string keystore_name; - std::string passphrase; - std::string keystore_password; - bool multiphase_produce_; - bool rtc_; - bool interactive_; - bool trace_based_; - std::uint32_t trace_index_; - char *trace_file_; - Rate production_rate_; - std::size_t payload_size_; - bool secure_; - bool input_stream_mode_; - uint16_t port_; +struct ServerConfiguration : Configuration { + bool virtual_producer_{true}; + std::uint32_t manifest_max_capacity_{0}; + bool live_production_{false}; + std::uint32_t content_lifetime_{ + transport::interface::default_values::content_object_expiry_time}; + std::uint32_t download_size_{20 * 1024 * 1024}; + transport::auth::CryptoHashType hash_algorithm_{ + transport::auth::CryptoHashType::SHA256}; + std::string keystore_name_; + std::string keystore_password_{"cisco"}; + bool multiphase_produce_{false}; + bool interactive_{false}; + bool trace_based_{false}; + std::uint32_t trace_index_{0}; + char *trace_file_{nullptr}; + Rate production_rate_{"2048kbps"}; + std::size_t payload_size_{1384}; + bool input_stream_mode_{false}; std::vector<struct packet_t> trace_; + std::string fec_type_; }; } // namespace hiperf diff --git a/apps/hiperf/src/forwarder_config.h b/apps/hiperf/src/forwarder_config.h deleted file mode 100644 index 655ac3b66..000000000 --- a/apps/hiperf/src/forwarder_config.h +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Copyright (c) 2021 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include <string> -#include <unordered_map> -#include <vector> - -namespace hiperf { - -struct ListenerConfig { - std::string address; - std::uint16_t port; - std::string interface; - std::string name; -}; - -struct ConnectorConfig { - std::string local_address; - std::uint16_t local_port; - std::string remote_address; - std::uint16_t remote_port; - std::string interface; - std::string name; -}; - -struct RouteConfig { - std::string prefix; - uint16_t weight; - std::string main_connector; - std::string backup_connector; - std::string name; -}; - -class ForwarderConfiguration { - public: - ForwarderConfiguration() : n_threads_(1) {} - - bool empty() { - return listeners_.empty() && connectors_.empty() && routes_.empty(); - } - - ForwarderConfiguration &setThreadNumber(std::size_t threads) { - n_threads_ = threads; - return *this; - } - - std::size_t getThreadNumber() { return n_threads_; } - - template <typename... Args> - ForwarderConfiguration &addListener(Args &&...args) { - listeners_.emplace_back(std::forward<Args>(args)...); - return *this; - } - - template <typename... Args> - ForwarderConfiguration &addConnector(const std::string &name, - Args &&...args) { - connectors_.emplace(name, std::forward<Args>(args)...); - return *this; - } - - template <typename... Args> - ForwarderConfiguration &addRoute(Args &&...args) { - routes_.emplace_back(std::forward<Args>(args)...); - return *this; - } - - std::vector<ListenerConfig> &getListeners() { return listeners_; } - - std::unordered_map<std::string, ConnectorConfig> &getConnectors() { - return connectors_; - } - - std::vector<RouteConfig> &getRoutes() { return routes_; } - - private: - std::vector<ListenerConfig> listeners_; - std::unordered_map<std::string, ConnectorConfig> connectors_; - std::vector<RouteConfig> routes_; - std::size_t n_threads_; -}; - -}
\ No newline at end of file diff --git a/apps/hiperf/src/forwarder_interface.cc b/apps/hiperf/src/forwarder_interface.cc deleted file mode 100644 index 864208239..000000000 --- a/apps/hiperf/src/forwarder_interface.cc +++ /dev/null @@ -1,676 +0,0 @@ -/* - * Copyright (c) 2021 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <arpa/inet.h> -#include <forwarder_interface.h> -#include <hicn/transport/utils/log.h> - -#include <chrono> -#include <iostream> -#include <thread> -#include <unordered_set> - -extern "C" { -#include <hicn/error.h> -#include <hicn/util/ip_address.h> -} - -// XXX the main listener should be retrieve in this class at initialization, aka -// when hICN becomes avialable -// -// XXX the main listener port will be retrieved in the forwarder -// interface... everything else will be delayed until we have this -// information - -namespace hiperf { - -ForwarderInterface::ForwarderInterface(asio::io_service &io_service, - ICallback *callback) - : external_ioservice_(io_service), - forwarder_interface_callback_(callback), - work_(std::make_unique<asio::io_service::work>(internal_ioservice_)), - sock_(nullptr), - thread_(std::make_unique<std::thread>([this]() { - std::cout << "Starting Forwarder Interface thread" << std::endl; - internal_ioservice_.run(); - std::cout << "Stopping Forwarder Interface thread" << std::endl; - })), - // set_route_callback_(std::forward<Callback &&>(setRouteCallback)), - check_routes_timer_(nullptr), - pending_add_route_counter_(0), - hicn_listen_port_(9695), - /* We start in disabled state even when a forwarder is always available */ - state_(State::Disabled), - timer_(io_service), - num_reattempts(0) { - std::cout << "Forwarder interface created... connecting to forwarder...\n"; - internal_ioservice_.post([this]() { onHicnServiceAvailable(true); }); -} - -ForwarderInterface::~ForwarderInterface() { - if (thread_ && thread_->joinable()) { - internal_ioservice_.dispatch([this]() { - if (sock_) { - hc_sock_free(sock_); - sock_ = nullptr; - } - - work_.reset(); - }); - - thread_->join(); - } - - std::cout << "ForwarderInterface::~ForwarderInterface" << std::endl; -} - -void ForwarderInterface::onHicnServiceAvailable(bool flag) { - if (flag) { - switch (state_) { - case State::Disabled: - case State::Requested: - state_ = State::Available; - case State::Available: - connectToForwarder(); - /* Synchronous */ - if (state_ != State::Connected) { - std::cout << "ConnectToForwarder failed" << std::endl; - goto REATTEMPT; - } - state_ = State::Ready; - - std::cout << "Connected to forwarder... cancelling reconnection timer" - << std::endl; - timer_.cancel(); - num_reattempts = 0; - - // case State::Connected: - // checkListener(); - - // if (state_ != State::Ready) { - // std::cout << "Listener not found" << std::endl; - // goto REATTEMPT; - // } - // state_ = State::Ready; - - // timer_.cancel(); - // num_reattempts = 0; - - std::cout << "Forwarder interface is ready... communicate to controller" - << std::endl; - - forwarder_interface_callback_->onHicnServiceReady(); - - case State::Ready: - break; - } - } else { - if (sock_) { - hc_sock_free(sock_); - sock_ = nullptr; - } - state_ = State::Disabled; // XXX to be checked upon callback to prevent the - // state from going forward (used to manage - // concurrency) - } - return; - -REATTEMPT: - /* Schedule reattempt */ - std::cout << "Failed to connect, scheduling reattempt" << std::endl; - num_reattempts++; - - timer_.expires_from_now( - std::chrono::milliseconds(ForwarderInterface::REATTEMPT_DELAY_MS)); - // timer_.async_wait(std::bind(&ForwarderInterface::onHicnServiceAvailable, - // this, flag, std::placeholders::_1)); - timer_.async_wait([this, flag](std::error_code ec) { - if (ec) return; - onHicnServiceAvailable(flag); - }); -} - -int ForwarderInterface::connectToForwarder() { - sock_ = hc_sock_create(); - if (!sock_) { - std::cout << "Could not create socket" << std::endl; - goto ERR_SOCK; - } - - if (hc_sock_connect(sock_) < 0) { - std::cout << "Could not connect to forwarder" << std::endl; - goto ERR; - } - - std::cout << "Forwarder interface connected" << std::endl; - state_ = State::Connected; - return 0; - -ERR: - hc_sock_free(sock_); - sock_ = nullptr; -ERR_SOCK: - return -1; -} - -int ForwarderInterface::checkListener() { - if (!sock_) return -1; - - hc_data_t *data; - if (hc_listener_list(sock_, &data) < 0) return -1; - - int ret = -1; - foreach_listener(l, data) { - std::string interface = std::string(l->interface_name); - if (interface.compare("lo") != 0) { - hicn_listen_port_ = l->local_port; - state_ = State::Ready; - ret = 0; - std::cout << "Got listener port" << std::endl; - break; - } - } - - hc_data_free(data); - return ret; -} - -void ForwarderInterface::close() { - std::cout << "ForwarderInterface::close" << std::endl; - - state_ = State::Disabled; - /* Cancelling eventual reattempts */ - timer_.cancel(); - - if (sock_) { - hc_sock_free(sock_); - sock_ = nullptr; - } - - internal_ioservice_.post([this]() { work_.reset(); }); - - if (thread_->joinable()) { - thread_->join(); - } -} - -#if 0 -void ForwarderInterface::enableCheckRoutesTimer() { - if (check_routes_timer_ != nullptr) return; - - check_routes_timer_ = - std::make_unique<asio::steady_timer>(internal_ioservice_); - checkRoutesLoop(); -} - -void ForwarderInterface::removeConnectedUserNow(ProtocolPtr protocol) { - internalRemoveConnectedUser(protocol); -} - -void ForwarderInterface::scheduleRemoveConnectedUser(ProtocolPtr protocol) { - internal_ioservice_.post( - [this, protocol]() { internalRemoveConnectedUser(protocol); }); -} -#endif - -void ForwarderInterface::createFaceAndRoute(const RouteInfoPtr &route_info) { - std::vector<RouteInfoPtr> routes; - routes.push_back(std::move(route_info)); - createFaceAndRoutes(routes); -} - -void ForwarderInterface::createFaceAndRoutes( - const std::vector<RouteInfoPtr> &routes_info) { - pending_add_route_counter_++; - auto timer = new asio::steady_timer(internal_ioservice_); - internal_ioservice_.post([this, routes_info, timer]() { - internalCreateFaceAndRoutes(routes_info, ForwarderInterface::MAX_REATTEMPT, - timer); - }); -} - -void ForwarderInterface::deleteFaceAndRoute(const RouteInfoPtr &route_info) { - std::vector<RouteInfoPtr> routes; - routes.push_back(std::move(route_info)); - deleteFaceAndRoutes(routes); -} - -void ForwarderInterface::deleteFaceAndRoutes( - const std::vector<RouteInfoPtr> &routes_info) { - internal_ioservice_.post([this, routes_info]() { - for (auto &route : routes_info) { - internalDeleteFaceAndRoute(route); - } - }); -} - -void ForwarderInterface::internalDeleteFaceAndRoute( - const RouteInfoPtr &route_info) { - if (!sock_) return; - - hc_data_t *data; - if (hc_route_list(sock_, &data) < 0) return; - - std::vector<hc_route_t *> routes_to_remove; - foreach_route(r, data) { - char remote_addr[INET6_ADDRSTRLEN]; - int ret = ip_address_ntop(&r->remote_addr, remote_addr, r->len, r->family); - if (ret < 0) continue; - - std::string route_addr(remote_addr); - if (route_addr.compare(route_info->route_addr) == 0 && - r->len == route_info->route_len) { - // route found - routes_to_remove.push_back(r); - } - } - - if (routes_to_remove.size() == 0) { - // nothing to do here - hc_data_free(data); - return; - } - - std::unordered_set<uint32_t> connids_to_remove; - for (unsigned i = 0; i < routes_to_remove.size(); i++) { - connids_to_remove.insert(routes_to_remove[i]->face_id); - if (hc_route_delete(sock_, routes_to_remove[i]) < 0) { - std::cout << "Error removing route from forwarder." << std::endl; - } - } - - // remove connection - if (hc_connection_list(sock_, &data) < 0) { - hc_data_free(data); - return; - } - - // collects pointerst to the connections using the conn IDs - std::vector<hc_connection_t *> conns_to_remove; - foreach_connection(c, data) { - if (connids_to_remove.find(c->id) != connids_to_remove.end()) { - // conn found - conns_to_remove.push_back(c); - } - } - - if (conns_to_remove.size() == 0) { - // nothing else to do here - hc_data_free(data); - return; - } - - for (unsigned i = 0; i < conns_to_remove.size(); i++) { - if (hc_connection_delete(sock_, conns_to_remove[i]) < 0) { - std::cout << "Error removing connection from forwarder." << std::endl; - } - } - - hc_data_free(data); -} - -void ForwarderInterface::internalCreateFaceAndRoutes( - const std::vector<RouteInfoPtr> &route_info, uint8_t max_try, - asio::steady_timer *timer) { - uint32_t face_id; - - std::vector<RouteInfoPtr> failed; - for (auto &route : route_info) { - int ret = tryToCreateFace(route.get(), &face_id); - if (ret >= 0) { - auto ret = tryToCreateRoute(route.get(), face_id); - if (ret < 0) { - failed.push_back(route); - std::cerr << "Error creating route and face" << std::endl; - continue; - } - } - } - - if (failed.size() > 0) { - if (max_try == 0) { - /* All attempts failed */ - goto RESULT; - } - max_try--; - timer->expires_from_now(std::chrono::milliseconds(500)); - timer->async_wait([this, failed, max_try, timer](std::error_code ec) { - if (ec) return; - internalCreateFaceAndRoutes(failed, max_try, timer); - }); - return; - } - -#if 0 - // route_status_[protocol] = std::move(route_info); - for (size_t i = 0; i < route_info.size(); i++) { - route_status_.insert( - std::pair<ClientId, RouteInfoPtr>(protocol, std::move(route_info[i]))); - } -#endif - -RESULT: - std::cout << "Face / Route create ok, now calling back protocol" << std::endl; - pending_add_route_counter_--; - external_ioservice_.post([this, r = std::move(route_info)]() mutable { - forwarder_interface_callback_->onRouteConfigured(r); - }); - delete timer; -} - -int ForwarderInterface::tryToCreateFace(RouteInfo *route_info, - uint32_t *face_id) { - bool found = false; - - // check connection with the forwarder - if (!sock_) { - std::cout << "[ForwarderInterface::tryToCreateFace] socket error" - << std::endl; - goto ERR_SOCK; - } - - // get listeners list - hc_data_t *data; - if (hc_listener_list(sock_, &data) < 0) { - std::cout << "[ForwarderInterface::tryToCreateFace] cannot list listeners"; - goto ERR_LIST; - } - - char _local_address[128]; - foreach_listener(l, data) { - std::cout << "Processing " << l->interface_name << std::endl; - std::string interface = std::string(l->interface_name); - int ret = ip_address_ntop(&l->local_addr, _local_address, 128, AF_INET); - if (ret < 0) { - std::cerr << "Error in ip_address_ntop" << std::endl; - goto ERR; - } - - std::string local_address = std::string(_local_address); - uint16_t local_port = l->local_port; - - if (interface.compare(route_info->interface) == 0 && - local_address.compare(route_info->local_addr) == 0 && - local_port == route_info->local_port) { - found = true; - break; - } - } - - std::cout << route_info->remote_addr << std::endl; - - ip_address_t local_address, remote_address; - ip_address_pton(route_info->local_addr.c_str(), &local_address); - ip_address_pton(route_info->remote_addr.c_str(), &remote_address); - - if (!found) { - // Create listener - hc_listener_t listener; - memset(&listener, 0, sizeof(hc_listener_t)); - - std::string name = "l_" + route_info->name; - listener.local_addr = local_address; - listener.type = CONNECTION_TYPE_UDP; - listener.family = AF_INET; - listener.local_port = route_info->local_port; - strncpy(listener.name, name.c_str(), sizeof(listener.name)); - strncpy(listener.interface_name, route_info->interface.c_str(), - sizeof(listener.interface_name)); - - std::cout << "------------> " << route_info->interface << std::endl; - - int ret = hc_listener_create(sock_, &listener); - - if (ret < 0) { - std::cerr << "Error creating listener." << std::endl; - return -1; - } else { - std::cout << "Listener " << listener.id << " created." << std::endl; - } - } - - // Create face - hc_face_t face; - memset(&face, 0, sizeof(hc_face_t)); - - // crate face with the local interest - face.face.type = FACE_TYPE_UDP; - face.face.family = route_info->family; - face.face.local_addr = local_address; - face.face.remote_addr = remote_address; - face.face.local_port = route_info->local_port; - face.face.remote_port = route_info->remote_port; - - if (netdevice_set_name(&face.face.netdevice, route_info->interface.c_str()) < - 0) { - std::cout << "[ForwarderInterface::tryToCreateFaceAndRoute] " - "netdevice_set_name " - "(" - << face.face.netdevice.name << ", " - << route_info->interface << ") error" << std::endl; - goto ERR; - } - - // create face - if (hc_face_create(sock_, &face) < 0) { - std::cout << "[ForwarderInterface::tryToCreateFace] error creating face"; - goto ERR; - } - - std::cout << "Face created successfully" << std::endl; - - // assing face to the return value - *face_id = face.id; - - hc_data_free(data); - return 0; - -ERR: - hc_data_free(data); -ERR_LIST: -ERR_SOCK: - return -1; -} - -int ForwarderInterface::tryToCreateRoute(RouteInfo *route_info, - uint32_t face_id) { - std::cout << "Trying to create route" << std::endl; - - // check connection with the forwarder - if (!sock_) { - std::cout << "[ForwarderInterface::tryToCreateRoute] socket error"; - return -1; - } - - ip_address_t route_ip; - hc_route_t route; - - if (ip_address_pton(route_info->route_addr.c_str(), &route_ip) < 0) { - std::cout << "[ForwarderInterface::tryToCreateRoute] ip_address_pton error"; - return -1; - } - - route.face_id = face_id; - route.family = AF_INET6; - route.remote_addr = route_ip; - route.len = route_info->route_len; - route.cost = 1; - - if (hc_route_create(sock_, &route) < 0) { - std::cout << "[ForwarderInterface::tryToCreateRoute] error creating route"; - return -1; - } - - std::cout << "[ForwarderInterface::tryToCreateRoute] OK" << std::endl; - return 0; -} - -#if 0 // not used -void ForwarderInterface::checkRoutesLoop() { - check_routes_timer_->expires_from_now(std::chrono::milliseconds(1000)); - check_routes_timer_->async_wait([this](std::error_code ec) { - if (ec) return; - if (pending_add_route_counter_ == 0) checkRoutes(); - }); -} - -void ForwarderInterface::checkRoutes() { - std::cout << "someone called the checkRoutes function" << std::endl; - if (!sock_) return; - - hc_data_t *data; - if (hc_route_list(sock_, &data) < 0) { - return; - } - - std::unordered_set<std::string> routes_set; - foreach_route(r, data) { - char remote_addr[INET6_ADDRSTRLEN]; - int ret = ip_address_ntop(&r->remote_addr, remote_addr, r->len, r->family); - if (ret < 0) continue; - std::string route(std::string(remote_addr) + "/" + std::to_string(r->len)); - routes_set.insert(route); - } - - for (auto it = route_status_.begin(); it != route_status_.end(); it++) { - std::string route(it->second->route_addr + "/" + - std::to_string(it->second->route_len)); - if (routes_set.find(route) == routes_set.end()) { - // the route is missing - createFaceAndRoute(it->second, it->first); - break; - } - } - - hc_data_free(data); -} -#endif - -#if 0 - using ListenerRetrievedCallback = - std::function<void(std::error_code, uint32_t)>; - - ListenerRetrievedCallback listener_retrieved_callback_; - -#ifdef __ANDROID__ - hicn_listen_port_(9695), -#else - hicn_listen_port_(0), -#endif - timer_(forward_engine_.getIoService()), - - void initConfigurationProtocol(void) - { - // We need the configuration, which is different for every protocol... - // so we move this step down towards the protocol implementation itself. - if (!permanent_hicn) { - doInitConfigurationProtocol(); - } else { - // XXX This should be moved somewhere else - getMainListener( - [this](std::error_code ec, uint32_t hicn_listen_port) { - if (!ec) - { - hicn_listen_port_ = hicn_listen_port; - doInitConfigurationProtocol(); - } - }); - } - } - - template <typename Callback> - void getMainListener(Callback &&callback) - { - listener_retrieved_callback_ = std::forward<Callback &&>(callback); - tryToConnectToForwarder(); - } - private: - void doGetMainListener(std::error_code ec) - { - if (!ec) - { - // ec == 0 --> timer expired - int ret = forwarder_interface_.getMainListenerPort(); - if (ret <= 0) - { - // Since without the main listener of the forwarder the proxy cannot - // work, we can stop the program here until we get the listener port. - std::cout << - "Could not retrieve main listener port from the forwarder. " - "Retrying."; - - timer_.expires_from_now(std::chrono::milliseconds(RETRY_INTERVAL)); - timer_.async_wait(std::bind(&Protocol::doGetMainListener, this, - std::placeholders::_1)); - } - else - { - timer_.cancel(); - retx_count_ = 0; - hicn_listen_port_ = uint16_t(ret); - listener_retrieved_callback_( - make_error_code(configuration_error::success), hicn_listen_port_); - } - } - else - { - std::cout << "Timer for retrieving main hicn listener canceled." << std::endl; - } - } - - void tryToConnectToForwarder() - { - doTryToConnectToForwarder(std::make_error_code(std::errc(0))); - } - - void doTryToConnectToForwarder(std::error_code ec) - { - if (!ec) - { - // ec == 0 --> timer expired - int ret = forwarder_interface_.connect(); - if (ret < 0) - { - // We were not able to connect to the local forwarder. Do not give up - // and retry. - std::cout << "Could not connect to local forwarder. Retrying." << std::endl; - - timer_.expires_from_now(std::chrono::milliseconds(RETRY_INTERVAL)); - timer_.async_wait(std::bind(&Protocol::doTryToConnectToForwarder, this, - std::placeholders::_1)); - } - else - { - timer_.cancel(); - retx_count_ = 0; - doGetMainListener(std::make_error_code(std::errc(0))); - } - } - else - { - std::cout << "Timer for re-trying forwarder connection canceled." << std::endl; - } - } - - - template <typename ProtocolImplementation> - constexpr uint32_t Protocol<ProtocolImplementation>::RETRY_INTERVAL; - -#endif - -constexpr uint32_t ForwarderInterface::REATTEMPT_DELAY_MS; -constexpr uint32_t ForwarderInterface::MAX_REATTEMPT; - -} // namespace hiperf
\ No newline at end of file diff --git a/apps/hiperf/src/forwarder_interface.h b/apps/hiperf/src/forwarder_interface.h deleted file mode 100644 index 7591ea257..000000000 --- a/apps/hiperf/src/forwarder_interface.h +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Copyright (c) 2021 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -extern "C" { -#ifndef WITH_POLICY -#define WITH_POLICY -#endif -#include <hicn/ctrl/api.h> -#include <hicn/util/ip_address.h> -} - -#ifndef ASIO_STANDALONE -#define ASIO_STANDALONE -#endif -#include <asio.hpp> - -#include <functional> -#include <thread> -#include <unordered_map> - -namespace hiperf { - -class ForwarderInterface { - static const uint32_t REATTEMPT_DELAY_MS = 500; - static const uint32_t MAX_REATTEMPT = 10; - - public: - struct RouteInfo { - int family; - std::string local_addr; - uint16_t local_port; - std::string remote_addr; - uint16_t remote_port; - std::string route_addr; - uint8_t route_len; - std::string interface; - std::string name; - }; - - using RouteInfoPtr = std::shared_ptr<RouteInfo>; - - class ICallback { - public: - virtual void onHicnServiceReady() = 0; - virtual void onRouteConfigured(std::vector<RouteInfoPtr> &route_info) = 0; - }; - - enum class State { - Disabled, /* Stack is stopped */ - Requested, /* Stack is starting */ - Available, /* Forwarder is running */ - Connected, /* Control socket connected */ - Ready, /* Listener present */ - }; - - public: - ForwarderInterface(asio::io_service &io_service, ICallback *callback); - - ~ForwarderInterface(); - - State getState(); - - void setState(State state); - - void onHicnServiceAvailable(bool flag); - - void enableCheckRoutesTimer(); - - void createFaceAndRoutes(const std::vector<RouteInfoPtr> &routes_info); - - void createFaceAndRoute(const RouteInfoPtr &route_info); - - void deleteFaceAndRoutes(const std::vector<RouteInfoPtr> &routes_info); - - void deleteFaceAndRoute(const RouteInfoPtr &route_info); - - void close(); - - uint16_t getHicnListenerPort() { return hicn_listen_port_; } - - private: - int connectToForwarder(); - - int checkListener(); - - void internalCreateFaceAndRoutes(const std::vector<RouteInfoPtr> &route_info, - uint8_t max_try, asio::steady_timer *timer); - - void internalDeleteFaceAndRoute(const RouteInfoPtr &routes_info); - - int tryToCreateFace(RouteInfo *RouteInfo, uint32_t *face_id); - int tryToCreateRoute(RouteInfo *RouteInfo, uint32_t face_id); - - void checkRoutesLoop(); - - void checkRoutes(); - - asio::io_service &external_ioservice_; - asio::io_service internal_ioservice_; - ICallback *forwarder_interface_callback_; - std::unique_ptr<asio::io_service::work> work_; - hc_sock_t *sock_; - std::unique_ptr<std::thread> thread_; - // SetRouteCallback set_route_callback_; - // std::unordered_multimap<ProtocolPtr, RouteInfoPtr> route_status_; - std::unique_ptr<asio::steady_timer> check_routes_timer_; - uint32_t pending_add_route_counter_; - uint16_t hicn_listen_port_; - - State state_; - - /* Reattempt timer */ - asio::steady_timer timer_; - unsigned num_reattempts; -}; - -} // namespace hiperf diff --git a/apps/hiperf/src/main.cc b/apps/hiperf/src/main.cc index b2d99c4a4..25c1a288c 100644 --- a/apps/hiperf/src/main.cc +++ b/apps/hiperf/src/main.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021-2022 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -14,133 +14,184 @@ */ #include <client.h> +#include <hicn/apps/utils/logger.h> #include <server.h> -#include <forwarder_interface.h> namespace hiperf { +using transport::auth::CryptoHashType; + +static std::unordered_map<std::string, hicn_packet_format_t> const + packet_format_map = {{"ipv4_tcp", HICN_PACKET_FORMAT_IPV4_TCP}, + {"ipv6_tcp", HICN_PACKET_FORMAT_IPV6_TCP}, + {"new", HICN_PACKET_FORMAT_NEW}}; + +std::string str_tolower(std::string s) { + std::transform(s.begin(), s.end(), s.begin(), + [](unsigned char c) { return std::tolower(c); }); + return s; +} + void usage() { - std::cerr << "HIPERF - A tool for performing network throughput " - "measurements with hICN" - << std::endl; - std::cerr << "usage: hiperf [-S|-C] [options] [prefix|name]" << std::endl; - std::cerr << std::endl; - std::cerr << "SERVER OR CLIENT:" << std::endl; + LoggerInfo() << "HIPERF - Instrumentation tool for performing active network" + "measurements with hICN"; + LoggerInfo() << "usage: hiperf [-S|-C] [options] [prefix|name]"; + LoggerInfo(); + LoggerInfo() << "SERVER OR CLIENT:"; #ifndef _WIN32 - std::cerr << "-D\t\t\t\t\t" - << "Run as a daemon" << std::endl; - std::cerr << "-R\t\t\t\t\t" - << "Run RTC protocol (client or server)" << std::endl; - std::cerr << "-f\t<filename>\t\t\t" - << "Log file" << std::endl; - std::cerr << "-z\t<io_module>\t\t\t" - << "IO module to use. Default: hicnlight_module" << std::endl; + LoggerInfo() << "-D\t\t\t\t\t" + << "Run as a daemon"; + LoggerInfo() << "-R\t\t\t\t\t" + << "Run RTC protocol (client or server)"; + LoggerInfo() << "-f\t<filename>\t\t\t" + << "Log file"; + LoggerInfo() << "-z\t<io_module>\t\t\t" + << "IO module to use. Default: hicnlight_module"; + LoggerInfo() << "-F\t<conf_file>\t\t\t" + << "Path to optional configuration file for libtransport"; + LoggerInfo() << "-a\t\t\t\t\t" + << "Enables data packet aggregation. " + << "Works only in RTC mode"; + LoggerInfo() << "-X\t<param>\t\t\t\t" + << "Set FEC params. Options are Rely_K#_N# or RS_K#_N#"; + LoggerInfo() + << "-J\t<passphrase>\t\t\t" + << "Set the passphrase used to sign/verify aggregated interests. " + "If set on the client, aggregated interests are enable automatically."; #endif - std::cerr << std::endl; - std::cerr << "SERVER SPECIFIC:" << std::endl; - std::cerr << "-A\t<content_size>\t\t\t" - "Size of the content to publish. This " - "is not the size of the packet (see -s for it)." - << std::endl; - std::cerr << "-s\t<packet_size>\t\t\tSize of the payload of each data packet." - << std::endl; - std::cerr << "-r\t\t\t\t\t" - << "Produce real content of <content_size> bytes" << std::endl; - std::cerr << "-m\t\t\t\t\t" - << "Produce transport manifest" << std::endl; - std::cerr << "-l\t\t\t\t\t" - << "Start producing content upon the reception of the " - "first interest" - << std::endl; - std::cerr << "-K\t<keystore_path>\t\t\t" - << "Path of p12 file containing the " - "crypto material used for signing packets" - << std::endl; - std::cerr << "-k\t<passphrase>\t\t\t" - << "String from which a 128-bit symmetric key will be " - "derived for signing packets" - << std::endl; - std::cerr << "-y\t<hash_algorithm>\t\t" - << "Use the selected hash algorithm for " - "calculating manifest digests" - << std::endl; - std::cerr << "-p\t<password>\t\t\t" - << "Password for p12 keystore" << std::endl; - std::cerr << "-x\t\t\t\t\t" - << "Produce a content of <content_size>, then after downloading " - "it produce a new content of" - << "\n\t\t\t\t\t<content_size> without resetting " - "the suffix to 0." - << std::endl; - std::cerr << "-B\t<bitrate>\t\t\t" - << "Bitrate for RTC producer, to be used with the -R option." - << std::endl; + LoggerInfo(); + LoggerInfo() << "SERVER SPECIFIC:"; + LoggerInfo() + << "-A\t<content_size>\t\t\t" + "Sends an application data unit in bytes that is published once " + "before exit"; + LoggerInfo() << "-E\t<expiry_time>\t\t\t" + "Expiration time for data packets generated by the producer " + "socket"; + LoggerInfo() << "-s\t<packet_size>\t\t\tData packet payload size."; + LoggerInfo() << "-r\t\t\t\t\t" + << "Produce real content of <content_size> bytes"; + LoggerInfo() + << "-m\t<manifest_max_capacity>\t\t" + << "The maximum number of entries a manifest can contain. Set it " + "to 0 to disable manifests. Default is 30, max is 255."; + LoggerInfo() << "-l\t\t\t\t\t" + << "Start producing content upon the reception of the " + "first interest"; + LoggerInfo() << "-K\t<keystore_path>\t\t\t" + << "Path of p12 file containing the " + "crypto material used for signing packets"; + LoggerInfo() << "-k\t<passphrase>\t\t\t" + << "String from which a 128-bit symmetric key will be " + "derived for signing packets"; + LoggerInfo() << "-p\t<password>\t\t\t" + << "Password for p12 keystore"; + LoggerInfo() << "-y\t<hash_algorithm>\t\t" + << "Use the selected hash algorithm for " + "computing manifest digests (default: SHA256)"; + LoggerInfo() << "-x\t\t\t\t\t" + << "Produces application data units of size <content_size> " + << "without resetting the name suffix to 0."; + LoggerInfo() << "-B\t<bitrate>\t\t\t" + << "RTC producer data bitrate, to be used with the -R option."; #ifndef _WIN32 - std::cerr << "-I\t\t\t\t\t" - "Interactive mode, start/stop real time content production " - "by pressing return. To be used with the -R option" - << std::endl; - std::cerr + LoggerInfo() << "-I\t\t\t\t\t" + "Interactive mode, start/stop real time content production " + "by pressing return. To be used with the -R option"; + LoggerInfo() << "-T\t<filename>\t\t\t" "Trace based mode, hiperf takes as input a file with a trace. " "Each line of the file indicates the timestamp and the size of " "the packet to generate. To be used with the -R option. -B and -I " - "will be ignored." - << std::endl; - std::cerr << "-E\t\t\t\t\t" - << "Enable encrypted communication. Requires the path to a p12 " - "file containing the " - "crypto material used for the TLS handshake" - << std::endl; - std::cerr << "-G\t<port>\t\t\t" - << "input stream from localhost at the specified port" << std::endl; + "will be ignored."; + LoggerInfo() << "-G\t<port>\t\t\t\t" + << "Input stream from localhost at the specified port"; #endif - std::cerr << std::endl; - std::cerr << "CLIENT SPECIFIC:" << std::endl; - std::cerr << "-b\t<beta_parameter>\t\t" - << "RAAQM beta parameter" << std::endl; - std::cerr << "-d\t<drop_factor_parameter>\t\t" - << "RAAQM drop factor " - "parameter" - << std::endl; - std::cerr << "-L\t<interest lifetime>\t\t" - << "Set interest lifetime." << std::endl; - std::cerr << "-M\t<input_buffer_size>\t\t" - << "Size of consumer input buffer. If 0, reassembly of packets " - "will be disabled." - << std::endl; - std::cerr << "-W\t<window_size>\t\t\t" - << "Use a fixed congestion window " - "for retrieving the data." - << std::endl; - std::cerr << "-i\t<stats_interval>\t\t" - << "Show the statistics every <stats_interval> milliseconds." - << std::endl; - std::cerr << "-c\t<certificate_path>\t\t" - << "Path of the producer certificate to be used for verifying the " - "origin of the packets received." - << std::endl; - std::cerr << "-k\t<passphrase>\t\t\t" - << "String from which is derived the symmetric key used by the " - "producer to sign packets and by the consumer to verify them." - << std::endl; - std::cerr << "-t\t\t\t\t\t" - "Test mode, check if the client is receiving the " - "correct data. This is an RTC specific option, to be " - "used with the -R (default false)" - << std::endl; - std::cerr << "-P\t\t\t\t\t" - << "Prefix of the producer where to do the handshake" << std::endl; - std::cerr << "-j\t<relay_name>\t\t\t" - << "Publish the received content under the name relay_name." - "This is an RTC specific option, to be " - "used with the -R (default false)" - << std::endl; - std::cerr << "-g\t<port>\t\t\t" - << "output stream to localhost at the specified port" << std::endl; + LoggerInfo(); + LoggerInfo() << "CLIENT SPECIFIC:"; + LoggerInfo() << "-b\t<beta_parameter>\t\t" + << "RAAQM beta parameter"; + LoggerInfo() << "-d\t<drop_factor_parameter>\t\t" + << "RAAQM drop factor " + "parameter"; + LoggerInfo() << "-L\t<interest lifetime>\t\t" + << "Set interest lifetime."; + LoggerInfo() << "-U\t<factor>\t\t\t" + << "Update the relevance threshold: if an unverified packet has " + "been received before the last U * manifest_max_capacity_ " + "packets received (verified or not), it will be flushed out. " + "Should be > 1, default is 100."; + LoggerInfo() + << "-u\t<factor>\t\t\t" + << "Update the alert threshold: if the " + "number of unverified packet is > u * manifest_max_capacity_, " + "an alert is raised. Should be set such that U > u >= 1, " + "default is 20. If u >= U, no alert will ever be raised."; + LoggerInfo() << "-M\t<input_buffer_size>\t\t" + << "Size of consumer input buffer. If 0, reassembly of packets " + "will be disabled."; + LoggerInfo() + << "-N\t\t\t\t\t" + << "Enable aggregated interests; the number of suffixes (including " + "the one in the header) can be set through the env variable " + "`MAX_AGGREGATED_INTERESTS`."; + LoggerInfo() << "-W\t<window_size>\t\t\t" + << "Use a fixed congestion window " + "for retrieving the data."; + LoggerInfo() << "-i\t<stats_interval>\t\t" + << "Show the statistics every <stats_interval> milliseconds."; + LoggerInfo() + << "-c\t<certificate_path>\t\t" + << "Path of the producer certificate to be used for verifying the " + "origin of the packets received."; + LoggerInfo() + << "-k\t<passphrase>\t\t\t" + << "String from which is derived the symmetric key used by the " + "producer to sign packets and by the consumer to verify them."; + LoggerInfo() << "-t\t\t\t\t\t" + "Test mode, check if the client is receiving the " + "correct data. This is an RTC specific option, to be " + "used with the -R (default: false)"; + LoggerInfo() + << "-P\t\t\t\t\t" + << "Number of parallel streams. For hiperf client, this is the " + "number of consumer to create, while for hiperf server this is " + "the number of producers to create."; + LoggerInfo() << "-j\t<relay_name>\t\t\t" + << "Publish received content under the name relay_name." + "This is an RTC specific option, to be " + "used with the -R (default: false)"; + LoggerInfo() << "-g\t<port>\t\t\t\t" + << "Output stream to localhost at the specified port"; + LoggerInfo() + << "-o\t\t\t\t\t" + << "Content sharing mode: if set the socket work in content sharing" + << "mode. It works only in RTC mode"; + LoggerInfo() << "-e\t<strategy>\t\t\t" + << "Enhance the network with a reliability strategy. Options"; + LoggerInfo() << "\t\t\t\t\t\t1: unreliable "; + LoggerInfo() << "\t\t\t\t\t\t2: rtx only "; + LoggerInfo() << "\t\t\t\t\t\t3: fec only "; + LoggerInfo() << "\t\t\t\t\t\t4: delay based "; + LoggerInfo() << "\t\t\t\t\t\t5: low rate "; + LoggerInfo() << "\t\t\t\t\t\t6: low rate and best path "; + LoggerInfo() << "\t\t\t\t\t\t7: low rate and replication"; + LoggerInfo() << "\t\t\t\t\t\t8: low rate and best path/replication "; + LoggerInfo() << "\t\t\t\t\t\t9: only fec low residual losses "; + LoggerInfo() << "\t\t\t\t\t\t10: delay and best path "; + LoggerInfo() << "\t\t\t\t\t\t11: delay and replication "; + LoggerInfo() << "\t\t\t\t\t\t(default: 2 = rtx only) "; + LoggerInfo() << "-H\t\t\t\t\t" + << "Disable periodic print headers in stats report."; + LoggerInfo() << "-n\t<nb_iterations>\t\t\t" + << "Print the stats report <nb_iterations> times and exit.\n" + << "\t\t\t\t\tThis option limits the duration of the run to " + "<nb_iterations> * <stats_interval> milliseconds."; + LoggerInfo() << "-w <packet_format> Packet format (without signature, " + "defaults to IPV6_TCP)"; } -int main(int argc, char *argv[]) { +int hiperf_main(int argc, char *argv[]) { #ifndef _WIN32 // Common bool daemon = false; @@ -149,11 +200,13 @@ int main(int argc, char *argv[]) { WSAStartup(MAKEWORD(2, 2), &wsaData); #endif + transport::interface::global_config::GlobalConfigInterface global_conf; + // -1 server, 0 undefined, 1 client int role = 0; int options = 0; - char *log_file = nullptr; + const char *log_file = nullptr; transport::interface::global_config::IoModuleConfiguration config; std::string conf_file; config.name = "hicnlight_module"; @@ -166,10 +219,11 @@ int main(int argc, char *argv[]) { int opt; #ifndef _WIN32 - while ((opt = getopt( - argc, argv, - "DSCf:b:d:W:RM:c:vA:s:rmlK:k:y:p:hi:xE:P:B:ItL:z:T:F:j:g:G:")) != - -1) { + // Please keep in alphabetical order. + while ( + (opt = getopt(argc, argv, + "A:B:CDE:F:G:HIJ:K:L:M:NP:RST:U:W:X:ab:c:d:e:f:g:hi:j:k:lm:" + "n:op:qrs:tu:vw:xy:z:")) != -1) { switch (opt) { // Common case 'D': { @@ -202,11 +256,16 @@ int main(int argc, char *argv[]) { break; } #else + // Please keep in alphabetical order. while ((opt = getopt(argc, argv, - "SCf:b:d:W:RM:c:vA:s:rmlK:k:y:p:hi:xB:E:P:tL:z:F:j:")) != - -1) { + "A:B:CE:F:HK:L:M:P:RSU:W:X:ab:c:d:e:f:hi:j:k:lm:n:op:rs:" + "tu:vwxy:z:")) != -1) { switch (opt) { #endif + case 'E': { + server_configuration.content_lifetime_ = std::stoul(optarg); + break; + } case 'f': { log_file = optarg; break; @@ -216,6 +275,30 @@ int main(int argc, char *argv[]) { server_configuration.rtc_ = true; break; } + case 'a': { + client_configuration.aggregated_data_ = true; + server_configuration.aggregated_data_ = true; + break; + } + case 'o': { + client_configuration.content_sharing_mode_ = true; + break; + } + case 'w': { + std::string packet_format_s = std::string(optarg); + packet_format_s = str_tolower(packet_format_s); + auto it = packet_format_map.find(std::string(optarg)); + if (it == packet_format_map.end()) + throw std::runtime_error("Bad packet format"); + client_configuration.packet_format_ = it->second; + server_configuration.packet_format_ = it->second; + break; + } + case 'k': { + server_configuration.passphrase_ = std::string(optarg); + client_configuration.passphrase_ = std::string(optarg); + break; + } case 'z': { config.name = optarg; break; @@ -234,25 +317,31 @@ int main(int argc, char *argv[]) { role += 1; break; } - case 'k': { - server_configuration.passphrase = std::string(optarg); - client_configuration.passphrase = std::string(optarg); + case 'q': { + client_configuration.colored_ = server_configuration.colored_ = false; + break; + } + case 'J': { + client_configuration.aggr_interest_passphrase_ = optarg; + server_configuration.aggr_interest_passphrase_ = optarg; + // Consumer signature is only used with aggregated interests, + // hence enabling it also forces usage of aggregated interests + client_configuration.aggregated_interests_ = true; break; } - // Client specifc case 'b': { - client_configuration.beta = std::stod(optarg); + client_configuration.beta_ = std::stod(optarg); options = 1; break; } case 'd': { - client_configuration.drop_factor = std::stod(optarg); + client_configuration.drop_factor_ = std::stod(optarg); options = 1; break; } case 'W': { - client_configuration.window = std::stod(optarg); + client_configuration.window_ = std::stod(optarg); options = 1; break; } @@ -261,13 +350,17 @@ int main(int argc, char *argv[]) { options = 1; break; } + case 'N': { + client_configuration.aggregated_interests_ = true; + break; + } case 'P': { - client_configuration.producer_prefix_ = Prefix(optarg); - client_configuration.secure_ = true; + client_configuration.parallel_flows_ = + server_configuration.parallel_flows_ = std::stoull(optarg); break; } case 'c': { - client_configuration.producer_certificate = std::string(optarg); + client_configuration.producer_certificate_ = std::string(optarg); options = 1; break; } @@ -286,15 +379,35 @@ int main(int argc, char *argv[]) { options = 1; break; } + case 'U': { + client_configuration.manifest_factor_relevant_ = std::stoul(optarg); + options = 1; + break; + } + case 'u': { + client_configuration.manifest_factor_alert_ = std::stoul(optarg); + options = 1; + break; + } case 'j': { client_configuration.relay_ = true; client_configuration.relay_name_ = Prefix(optarg); options = 1; break; } + case 'H': { + client_configuration.print_headers_ = false; + options = 1; + break; + } + case 'n': { + client_configuration.nb_iterations_ = std::stoul(optarg); + options = 1; + break; + } // Server specific case 'A': { - server_configuration.download_size = std::stoul(optarg); + server_configuration.download_size_ = std::stoul(optarg); options = -1; break; } @@ -304,43 +417,44 @@ int main(int argc, char *argv[]) { break; } case 'r': { - server_configuration.virtual_producer = false; + server_configuration.virtual_producer_ = false; options = -1; break; } case 'm': { - server_configuration.manifest = true; + server_configuration.manifest_max_capacity_ = std::stoul(optarg); options = -1; break; } case 'l': { - server_configuration.live_production = true; + server_configuration.live_production_ = true; options = -1; break; } case 'K': { - server_configuration.keystore_name = std::string(optarg); + server_configuration.keystore_name_ = std::string(optarg); options = -1; break; } case 'y': { + CryptoHashType hash_algorithm = CryptoHashType::SHA256; if (strncasecmp(optarg, "sha256", 6) == 0) { - server_configuration.hash_algorithm = CryptoHashType::SHA256; + hash_algorithm = CryptoHashType::SHA256; } else if (strncasecmp(optarg, "sha512", 6) == 0) { - server_configuration.hash_algorithm = CryptoHashType::SHA512; + hash_algorithm = CryptoHashType::SHA512; } else if (strncasecmp(optarg, "blake2b512", 10) == 0) { - server_configuration.hash_algorithm = CryptoHashType::BLAKE2B512; + hash_algorithm = CryptoHashType::BLAKE2B512; } else if (strncasecmp(optarg, "blake2s256", 10) == 0) { - server_configuration.hash_algorithm = CryptoHashType::BLAKE2S256; + hash_algorithm = CryptoHashType::BLAKE2S256; } else { - std::cerr << "Ignored unknown hash algorithm. Using SHA 256." - << std::endl; + LoggerWarn() << "Unknown hash algorithm. Using SHA 256."; } + server_configuration.hash_algorithm_ = hash_algorithm; options = -1; break; } case 'p': { - server_configuration.keystore_password = std::string(optarg); + server_configuration.keystore_password_ = std::string(optarg); options = -1; break; } @@ -356,12 +470,16 @@ int main(int argc, char *argv[]) { options = -1; break; } - case 'E': { - server_configuration.keystore_name = std::string(optarg); - server_configuration.secure_ = true; + case 'e': { + client_configuration.recovery_strategy_ = std::stoul(optarg); + options = 1; + break; + } + case 'X': { + server_configuration.fec_type_ = std::string(optarg); + options = -1; break; } - case 'h': default: usage(); return EXIT_FAILURE; @@ -369,34 +487,31 @@ int main(int argc, char *argv[]) { } if (options > 0 && role < 0) { - std::cerr << "Client options cannot be used when using the " - "software in server mode" - << std::endl; + LoggerErr() << "Client options cannot be used when using the " + "software in server mode"; usage(); return EXIT_FAILURE; } else if (options < 0 && role > 0) { - std::cerr << "Server options cannot be used when using the " - "software in client mode" - << std::endl; + LoggerErr() << "Server options cannot be used when using the " + "software in client mode"; usage(); return EXIT_FAILURE; } else if (!role) { - std::cerr << "Please specify if running hiperf as client " - "or server." - << std::endl; + LoggerErr() << "Please specify if running hiperf as client " + "or server."; usage(); return EXIT_FAILURE; } if (argv[optind] == 0) { - std::cerr << "Please specify the name/prefix to use." << std::endl; + LoggerErr() << "Please specify the name/prefix to use."; usage(); return EXIT_FAILURE; } else { if (role > 0) { - client_configuration.name = Name(argv[optind]); + client_configuration.name_ = Prefix(argv[optind]); } else { - server_configuration.name = Prefix(argv[optind]); + server_configuration.name_ = Prefix(argv[optind]); } } @@ -427,7 +542,7 @@ int main(int argc, char *argv[]) { config.set(); // Parse config file - transport::interface::global_config::parseConfigurationFile(conf_file); + global_conf.parseConfigurationFile(conf_file); if (role > 0) { HIperfClient c(client_configuration); @@ -453,4 +568,4 @@ int main(int argc, char *argv[]) { } // namespace hiperf -int main(int argc, char *argv[]) { return hiperf::main(argc, argv); } +int main(int argc, char *argv[]) { return hiperf::hiperf_main(argc, argv); } diff --git a/apps/hiperf/src/server.cc b/apps/hiperf/src/server.cc index 968d42e2c..3f6c335f9 100644 --- a/apps/hiperf/src/server.cc +++ b/apps/hiperf/src/server.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 Cisco and/or its affiliates. + * Copyright (c) 2021-2022 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -17,167 +17,92 @@ namespace hiperf { +using transport::core::ContentObject; +using transport::core::Interest; +using transport::core::Name; +using transport::interface::GeneralTransportOptions; +using transport::interface::ProducerCallbacksOptions; +using transport::interface::ProducerInterestCallback; +using transport::interface::ProducerSocket; +using transport::interface::ProductionProtocolAlgorithms; + /** * Hiperf server class: configure and setup an hicn producer following the * ServerConfiguration. */ class HIperfServer::Impl { - const std::size_t log2_content_object_buffer_size = 8; - - public: - Impl(const hiperf::ServerConfiguration &conf) - : configuration_(conf), - signals_(io_service_), - rtc_timer_(io_service_), - unsatisfied_interests_(), - content_objects_((std::uint16_t)(1 << log2_content_object_buffer_size)), - content_objects_index_(0), - mask_((std::uint16_t)(1 << log2_content_object_buffer_size) - 1), - last_segment_(0), -#ifndef _WIN32 - ptr_last_segment_(&last_segment_), - input_(io_service_), - rtc_running_(false), -#else - ptr_last_segment_(&last_segment_), -#endif - flow_name_(configuration_.name.getName()), - socket_(io_service_), - recv_buffer_(nullptr, 0) { - std::string buffer(configuration_.payload_size_, 'X'); - std::cout << "Producing contents under name " << conf.name.getName() - << std::endl; -#ifndef _WIN32 - if (configuration_.interactive_) { - input_.assign(::dup(STDIN_FILENO)); - } -#endif - - for (int i = 0; i < (1 << log2_content_object_buffer_size); i++) { - content_objects_[i] = ContentObject::Ptr( - new ContentObject(conf.name.getName(), HF_INET6_TCP, 0, - (const uint8_t *)buffer.data(), buffer.size())); - content_objects_[i]->setLifetime( - default_values::content_object_expiry_time); - } - } - - void virtualProcessInterest(ProducerSocket &p, const Interest &interest) { - content_objects_[content_objects_index_ & mask_]->setName( - interest.getName()); - producer_socket_->produce( - *content_objects_[content_objects_index_++ & mask_]); + static constexpr std::size_t klog2_content_object_buffer_size() { return 8; } + static constexpr std::size_t kcontent_object_buffer_size() { + return (1 << klog2_content_object_buffer_size()); } - - void processInterest(ProducerSocket &p, const Interest &interest) { - p.setSocketOption(ProducerCallbacksOptions::CACHE_MISS, - (ProducerInterestCallback)VOID_HANDLER); - p.setSocketOption(GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME, - 5000000_U32); - - produceContent(p, interest.getName(), interest.getName().getSuffix()); - std::cout << "Received interest " << interest.getName().getSuffix() - << std::endl; + static constexpr std::size_t kmask() { + return (kcontent_object_buffer_size() - 1); } - void asyncProcessInterest(ProducerSocket &p, const Interest &interest) { - p.setSocketOption(ProducerCallbacksOptions::CACHE_MISS, - (ProducerInterestCallback)bind(&Impl::cacheMiss, this, - std::placeholders::_1, - std::placeholders::_2)); - p.setSocketOption(GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME, - 5000000_U32); - uint32_t suffix = interest.getName().getSuffix(); - - if (suffix == 0) { - last_segment_ = 0; - ptr_last_segment_ = &last_segment_; - unsatisfied_interests_.clear(); - } - - // The suffix will either be the one from the received interest or the - // smallest suffix of a previous interest not satisfed - if (!unsatisfied_interests_.empty()) { - auto it = - std::lower_bound(unsatisfied_interests_.begin(), - unsatisfied_interests_.end(), *ptr_last_segment_); - if (it != unsatisfied_interests_.end()) { - suffix = *it; + /** + * @brief As we can (potentially) setup many producer sockets, we need to keep + * a separate context for each one of them. The context contains parameters + * and variable that are specific to a single producer socket. + */ + class ProducerContext + : public Base<ProducerContext, ServerConfiguration, Impl>, + public ProducerSocket::Callback { + public: + using ConfType = ServerConfiguration; + using ParentType = typename HIperfServer::Impl; + static inline const auto getContextType() { return "ProducerContext"; } + + ProducerContext(HIperfServer::Impl &server, int producer_identifier) + : Base(server, server.io_service_, producer_identifier) { + // Allocate buffer to copy as content objects payload + std::string buffer(configuration_.payload_size_, 'X'); + + // Allocate array of content objects. They are share_ptr so that the + // transport will only capture a reference to them instead of performing + // an hard copy. + for (std::size_t i = 0; i < kcontent_object_buffer_size(); i++) { + const auto &element = + content_objects_.emplace_back(std::make_shared<ContentObject>( + configuration_.name_.makeName(), configuration_.packet_format_, + 0, (const uint8_t *)buffer.data(), buffer.size())); + element->setLifetime( + transport::interface::default_values::content_object_expiry_time); } - unsatisfied_interests_.erase(unsatisfied_interests_.begin(), it); } - std::cout << "Received interest " << interest.getName().getSuffix() - << ", starting production at " << suffix << std::endl; - std::cout << unsatisfied_interests_.size() << " interests still unsatisfied" - << std::endl; - produceContentAsync(p, interest.getName(), suffix); - } - - void produceContent(ProducerSocket &p, const Name &content_name, - uint32_t suffix) { - auto b = utils::MemBuf::create(configuration_.download_size); - std::memset(b->writableData(), '?', configuration_.download_size); - b->append(configuration_.download_size); - uint32_t total; - - utils::TimePoint t0 = utils::SteadyClock::now(); - total = p.produceStream(content_name, std::move(b), - !configuration_.multiphase_produce_, suffix); - utils::TimePoint t1 = utils::SteadyClock::now(); - - std::cout - << "Written " << total - << " data packets in output buffer (Segmentation time: " - << std::chrono::duration_cast<utils::Microseconds>(t1 - t0).count() - << " us)" << std::endl; - } - - void produceContentAsync(ProducerSocket &p, Name content_name, - uint32_t suffix) { - auto b = utils::MemBuf::create(configuration_.download_size); - std::memset(b->writableData(), '?', configuration_.download_size); - b->append(configuration_.download_size); + // To make vector happy (move or copy constructor is needed when vector + // resizes) + ProducerContext(ProducerContext &&other) noexcept + : Base(std::move(other)), + content_objects_(std::move(other.content_objects_)), + unsatisfied_interests_(std::move(other.unsatisfied_interests_)), + last_segment_(other.last_segment_), + producer_socket_(std::move(other.producer_socket_)), + content_objects_index_(other.content_objects_index_), + payload_size_max_(other.payload_size_max_) {} - p.asyncProduce(content_name, std::move(b), - !configuration_.multiphase_produce_, suffix, - &ptr_last_segment_); - } + virtual ~ProducerContext() = default; - void cacheMiss(ProducerSocket &p, const Interest &interest) { - unsatisfied_interests_.push_back(interest.getName().getSuffix()); - } + /** + * @brief Produce datagram + */ + void produceDatagram(const uint8_t *buffer, std::size_t buffer_size) const { + assert(producer_socket_); - void onContentProduced(ProducerSocket &p, const std::error_code &err, - uint64_t bytes_written) { - p.setSocketOption(ProducerCallbacksOptions::CACHE_MISS, - (ProducerInterestCallback)bind( - &Impl::asyncProcessInterest, this, - std::placeholders::_1, std::placeholders::_2)); - } + auto size = std::min(buffer_size, payload_size_max_); - std::shared_ptr<Identity> getProducerIdentity(std::string &keystore_path, - std::string &keystore_pwd, - CryptoHashType &hash_type) { - if (access(keystore_path.c_str(), F_OK) != -1) { - return std::make_shared<Identity>(keystore_path, keystore_pwd, hash_type); + producer_socket_->produceDatagram(flow_name_, buffer, size); } - return std::make_shared<Identity>(keystore_path, keystore_pwd, - CryptoSuite::RSA_SHA256, 1024, 365, - "producer-test"); - } - int setup() { - int ret; - int production_protocol; - - if (configuration_.secure_) { - auto identity = getProducerIdentity(configuration_.keystore_name, - configuration_.keystore_password, - configuration_.hash_algorithm); - producer_socket_ = std::make_unique<P2PSecureProducerSocket>( - configuration_.rtc_, identity); - } else { + /** + * @brief Create and setup the producer socket + */ + int setup() { + int ret; + int production_protocol; + std::shared_ptr<transport::auth::Signer> signer = + std::make_shared<transport::auth::VoidSigner>(); + if (!configuration_.rtc_) { production_protocol = ProductionProtocolAlgorithms::BYTE_STREAM; } else { @@ -185,210 +110,470 @@ class HIperfServer::Impl { } producer_socket_ = std::make_unique<ProducerSocket>(production_protocol); - } - if (producer_socket_->setSocketOption( - GeneralTransportOptions::MAKE_MANIFEST, configuration_.manifest) == - SOCKET_OPTION_NOT_SET) { - return ERROR_SETUP; - } + if (producer_socket_->setSocketOption( + ProducerCallbacksOptions::PRODUCER_CALLBACK, this) == + SOCKET_OPTION_NOT_SET) { + getOutputStream() << "Failed to set producer callback." << std::endl; + return ERROR_SETUP; + } - if (!configuration_.passphrase.empty()) { - std::shared_ptr<Signer> signer = std::make_shared<SymmetricSigner>( - CryptoSuite::HMAC_SHA256, configuration_.passphrase); - producer_socket_->setSocketOption(GeneralTransportOptions::SIGNER, - signer); - } + if (producer_socket_->setSocketOption( + GeneralTransportOptions::HASH_ALGORITHM, + configuration_.hash_algorithm_) == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + if (producer_socket_->setSocketOption( + GeneralTransportOptions::MANIFEST_MAX_CAPACITY, + configuration_.manifest_max_capacity_) == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + if (producer_socket_->setSocketOption(transport::interface::PACKET_FORMAT, + configuration_.packet_format_) == + SOCKET_OPTION_NOT_SET) { + getOutputStream() << "ERROR -- Impossible to set the packet format." + << std::endl; + return ERROR_SETUP; + } + + if (!configuration_.passphrase_.empty()) { + signer = std::make_shared<transport::auth::SymmetricSigner>( + transport::auth::CryptoSuite::HMAC_SHA256, + configuration_.passphrase_); + } + + if (!configuration_.keystore_name_.empty()) { + signer = std::make_shared<transport::auth::AsymmetricSigner>( + configuration_.keystore_name_, configuration_.keystore_password_); + } - if (!configuration_.keystore_name.empty()) { - auto identity = getProducerIdentity(configuration_.keystore_name, - configuration_.keystore_password, - configuration_.hash_algorithm); - std::shared_ptr<Signer> signer = identity->getSigner(); producer_socket_->setSocketOption(GeneralTransportOptions::SIGNER, signer); - } - uint32_t rtc_header_size = 0; - if (configuration_.rtc_) rtc_header_size = 12; - producer_socket_->setSocketOption( - GeneralTransportOptions::DATA_PACKET_SIZE, - (uint32_t)( - configuration_.payload_size_ + rtc_header_size + - (configuration_.name.getAddressFamily() == AF_INET ? 40 : 60))); - producer_socket_->registerPrefix(configuration_.name); - producer_socket_->connect(); - - if (configuration_.rtc_) { - std::cout << "Running RTC producer: the prefix length will be ignored." - " Use /128 by default in RTC mode" - << std::endl; - return ERROR_SUCCESS; - } + // Compute maximum payload size + Packet::Format format = configuration_.packet_format_; + if (!configuration_.manifest_max_capacity_) + format = Packet::toAHFormat(format); + payload_size_max_ = PayloadSize(format).getPayloadSizeMax( + configuration_.rtc_ ? RTC_HEADER_SIZE : 0, + configuration_.fec_type_.empty() ? 0 : FEC_HEADER_MAX_SIZE, + !configuration_.manifest_max_capacity_ + ? signer->getSignatureFieldSize() + : 0); + + if (configuration_.payload_size_ > payload_size_max_) { + getOutputStream() << "WARNING: Payload has size " + << configuration_.payload_size_ << ", maximum is " + << payload_size_max_ + << ". Payload will be truncated to fit." << std::endl; + } + + // Verifier for aggregated interests + std::shared_ptr<transport::auth::Verifier> verifier = + std::make_shared<transport::auth::VoidVerifier>(); + if (!configuration_.aggr_interest_passphrase_.empty()) { + verifier = std::make_unique<transport::auth::SymmetricVerifier>( + configuration_.aggr_interest_passphrase_); + } + ret = producer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER, + verifier); + if (ret == SOCKET_OPTION_NOT_SET) return ERROR_SETUP; + + if (configuration_.rtc_) { + ret = producer_socket_->setSocketOption( + transport::interface::RtcTransportOptions::AGGREGATED_DATA, + configuration_.aggregated_data_); + + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + ret = producer_socket_->setSocketOption( + GeneralTransportOptions::FEC_TYPE, configuration_.fec_type_); + + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + } - if (!configuration_.virtual_producer) { if (producer_socket_->setSocketOption( GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME, - configuration_.content_lifetime) == SOCKET_OPTION_NOT_SET) { + configuration_.content_lifetime_) == SOCKET_OPTION_NOT_SET) { return ERROR_SETUP; } - if (producer_socket_->setSocketOption( - GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 200000U) == - SOCKET_OPTION_NOT_SET) { - return ERROR_SETUP; + producer_socket_->registerPrefix(Prefix(flow_name_, 128)); + producer_socket_->connect(); + producer_socket_->start(); + + if (configuration_.rtc_) { + return ERROR_SUCCESS; } - if (!configuration_.live_production) { - produceContent(*producer_socket_, configuration_.name.getName(), 0); + if (!configuration_.virtual_producer_) { + if (producer_socket_->setSocketOption( + GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 200000U) == + SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + if (producer_socket_->setSocketOption( + GeneralTransportOptions::MAX_SEGMENT_SIZE, + static_cast<uint32_t>(configuration_.payload_size_)) == + SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + if (!configuration_.live_production_) { + produceContent(*producer_socket_, configuration_.name_.makeName(), 0); + } else { + ret = producer_socket_->setSocketOption( + ProducerCallbacksOptions::CACHE_MISS, + (ProducerInterestCallback)bind( + &ProducerContext::asyncProcessInterest, this, + std::placeholders::_1, std::placeholders::_2)); + + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + } } else { ret = producer_socket_->setSocketOption( + GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 0U); + + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + ret = producer_socket_->setSocketOption( ProducerCallbacksOptions::CACHE_MISS, - (ProducerInterestCallback)bind(&Impl::asyncProcessInterest, this, - std::placeholders::_1, - std::placeholders::_2)); + (ProducerInterestCallback)bind( + &ProducerContext::virtualProcessInterest, this, + std::placeholders::_1, std::placeholders::_2)); if (ret == SOCKET_OPTION_NOT_SET) { return ERROR_SETUP; } } - } else { - ret = producer_socket_->setSocketOption( - GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 0U); + ret = producer_socket_->setSocketOption( + ProducerCallbacksOptions::CONTENT_PRODUCED, + (transport::interface::ProducerContentCallback)bind( + &ProducerContext::onContentProduced, this, std::placeholders::_1, + std::placeholders::_2, std::placeholders::_3)); if (ret == SOCKET_OPTION_NOT_SET) { return ERROR_SETUP; } - ret = producer_socket_->setSocketOption( + return ERROR_SUCCESS; + } + + int run() { + getOutputStream() << "started to serve consumers with name " << flow_name_ + << std::endl; + return ERROR_SUCCESS; + } + + void stop() { + getOutputStream() << "stopped to serve consumers" << std::endl; + producer_socket_->stop(); + } + + private: + /** + * @brief Produce an existing content object. Set the name as the + * interest. + */ + void virtualProcessInterest(ProducerSocket &p, const Interest &interest) { + content_objects_[content_objects_index_ & kmask()]->setName( + interest.getName()); + p.produce(*content_objects_[content_objects_index_++ & kmask()]); + } + + /** + * @brief Create and produce a buffer of configuration_.download_size_ + * length. + */ + void produceContent(ProducerSocket &p, const Name &content_name, + uint32_t suffix) const { + uint32_t total; + + auto b = utils::MemBuf::create(configuration_.download_size_); + std::memset(b->writableData(), '?', configuration_.download_size_); + b->append(configuration_.download_size_); + + utils::SteadyTime::TimePoint t0 = utils::SteadyTime::Clock::now(); + total = p.produceStream(content_name, std::move(b), + !configuration_.multiphase_produce_, suffix); + utils::SteadyTime::TimePoint t1 = utils::SteadyTime::Clock::now(); + + LoggerInfo() << "Written " << total + << " data packets in output buffer (Segmentation time: " + << utils::SteadyTime::getDurationUs(t0, t1).count() << " us)" + << std::endl; + } + + /** + * @brief Synchronously produce content upon reception of one interest + */ + void processInterest(ProducerSocket &p, const Interest &interest) const { + p.setSocketOption( ProducerCallbacksOptions::CACHE_MISS, - (ProducerInterestCallback)bind(&Impl::virtualProcessInterest, this, - std::placeholders::_1, - std::placeholders::_2)); + (ProducerInterestCallback)transport::interface::VOID_HANDLER); + p.setSocketOption(GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME, + configuration_.content_lifetime_); - if (ret == SOCKET_OPTION_NOT_SET) { - return ERROR_SETUP; + produceContent(p, interest.getName(), interest.getName().getSuffix()); + LoggerInfo() << "Received interest " << interest.getName().getSuffix() + << std::endl; + } + + /** + * @brief Async create and produce a buffer of + * configuration_.download_size_ length. + */ + void produceContentAsync(ProducerSocket &p, Name content_name, + uint32_t suffix) { + parent_.produce_thread_.add([this, suffix, content_name, &p]() { + auto b = utils::MemBuf::create(configuration_.download_size_); + std::memset(b->writableData(), '?', configuration_.download_size_); + b->append(configuration_.download_size_); + + last_segment_ = + suffix + p.produceStream(content_name, std::move(b), + !configuration_.multiphase_produce_, + suffix); + }); + } + + /** + * @brief Asynchronously produce content upon reception of one interest + */ + void asyncProcessInterest(ProducerSocket &p, const Interest &interest) { + p.setSocketOption(ProducerCallbacksOptions::CACHE_MISS, + (ProducerInterestCallback)bind( + &ProducerContext::cacheMiss, this, + std::placeholders::_1, std::placeholders::_2)); + p.setSocketOption(GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME, + configuration_.content_lifetime_); + uint32_t suffix = interest.getName().getSuffix(); + + if (suffix == 0) { + last_segment_ = 0; + unsatisfied_interests_.clear(); + } + + // The suffix will either come from the received interest or will be set + // to the smallest suffix of a previous interest not satisfied + if (!unsatisfied_interests_.empty()) { + auto it = std::lower_bound(unsatisfied_interests_.begin(), + unsatisfied_interests_.end(), last_segment_); + if (it != unsatisfied_interests_.end()) { + suffix = *it; + } + unsatisfied_interests_.erase(unsatisfied_interests_.begin(), it); } + + getOutputStream() << " Received interest " + << interest.getName().getSuffix() + << ", starting production at " << suffix << end_mod_ + << std::endl; + getOutputStream() << unsatisfied_interests_.size() + << " interests still unsatisfied" << end_mod_ + << std::endl; + produceContentAsync(p, interest.getName(), suffix); + } + + /** + * @brief Register cache miss events + */ + void cacheMiss([[maybe_unused]] const ProducerSocket &p, + const Interest &interest) { + unsatisfied_interests_.push_back(interest.getName().getSuffix()); } - ret = producer_socket_->setSocketOption( - ProducerCallbacksOptions::CONTENT_PRODUCED, - (ProducerContentCallback)bind( - &Impl::onContentProduced, this, std::placeholders::_1, - std::placeholders::_2, std::placeholders::_3)); + /** + * @brief When content is produced, set cache miss callback so that we can + * register any cache miss happening after the production. + */ + void onContentProduced(ProducerSocket &p, + [[maybe_unused]] const std::error_code &err, + [[maybe_unused]] uint64_t bytes_written) { + p.setSocketOption(ProducerCallbacksOptions::CACHE_MISS, + (ProducerInterestCallback)bind( + &ProducerContext::asyncProcessInterest, this, + std::placeholders::_1, std::placeholders::_2)); + } - return ERROR_SUCCESS; + /** + * @brief Internal producer error. When this callback is triggered + * something important happened. Here we stop the program. + */ + void produceError(const std::error_code &err) noexcept override { + getOutputStream() << "Error from producer transport: " << err.message() + << std::endl; + parent_.stop(); + } + + // Members initialized in constructor + std::vector<ContentObject::Ptr> content_objects_; + + // Members initialized by in-class initializer + std::vector<uint32_t> unsatisfied_interests_; + std::uint32_t last_segment_{0}; + std::unique_ptr<ProducerSocket> producer_socket_{nullptr}; + std::uint16_t content_objects_index_{0}; + std::size_t payload_size_max_{0}; + }; + + public: + explicit Impl(const hiperf::ServerConfiguration &conf) : config_(conf) { +#ifndef _WIN32 + if (config_.interactive_) { + input_.assign(::dup(STDIN_FILENO)); + } +#endif + + std::memset(rtc_payload_.data(), 'X', rtc_payload_.size()); + } + + ~Impl() = default; + + int setup() { + int ret = ensureFlows(config_.name_, config_.parallel_flows_); + if (ret != ERROR_SUCCESS) { + return ret; + } + + producer_contexts_.reserve(config_.parallel_flows_); + for (uint32_t i = 0; i < config_.parallel_flows_; i++) { + auto &ctx = producer_contexts_.emplace_back(*this, i); + ret = ctx.setup(); + + if (ret) { + break; + } + } + + return ret; } void receiveStream() { socket_.async_receive_from( - asio::buffer(recv_buffer_.first, recv_buffer_.second), remote_, - [this](std::error_code ec, std::size_t length) { + asio::buffer(recv_buffer_.writableData(), recv_buffer_.capacity()), + remote_, [this](const std::error_code &ec, std::size_t length) { if (ec) return; - sendRTCContentFromStream(recv_buffer_.first, length); + sendRTCContentFromStream(recv_buffer_.writableData(), length); receiveStream(); }); } - void sendRTCContentFromStream(uint8_t *buff, std::size_t len) { - auto payload = - content_objects_[content_objects_index_++ & mask_]->getPayload(); + void sendRTCContentFromStream(const uint8_t *buff, std::size_t len) { // this is used to compute the data packet delay // Used only for performance evaluation // It requires clock synchronization between producer and consumer - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::system_clock::now().time_since_epoch()) - .count(); - uint8_t *start = (uint8_t *)payload->writableData(); + auto now = utils::SystemTime::nowMs().count(); + + auto start = rtc_payload_.data(); std::memcpy(start, &now, sizeof(uint64_t)); std::memcpy(start + sizeof(uint64_t), buff, len); - producer_socket_->produceDatagram(flow_name_, start, - len + sizeof(uint64_t)); + + for (const auto &producer_context : producer_contexts_) { + producer_context.produceDatagram(start, len + sizeof(uint64_t)); + } } - void sendRTCContentObjectCallback(std::error_code ec) { + void sendRTCContentObjectCallback(const std::error_code &ec) { if (ec) return; rtc_timer_.expires_from_now( - configuration_.production_rate_.getMicrosecondsForPacket( - configuration_.payload_size_)); + config_.production_rate_.getMicrosecondsForPacket( + config_.payload_size_)); rtc_timer_.async_wait(std::bind(&Impl::sendRTCContentObjectCallback, this, std::placeholders::_1)); - auto payload = - content_objects_[content_objects_index_++ & mask_]->getPayload(); + + auto start = rtc_payload_.data(); // this is used to compute the data packet delay // Used only for performance evaluation // It requires clock synchronization between producer and consumer - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::system_clock::now().time_since_epoch()) - .count(); - - std::memcpy(payload->writableData(), &now, sizeof(uint64_t)); + auto now = utils::SystemTime::nowMs().count(); + std::memcpy(start, &now, sizeof(uint64_t)); - producer_socket_->produceDatagram( - flow_name_, payload->data(), - payload->length() < 1400 ? payload->length() : 1400); + for (const auto &producer_context : producer_contexts_) { + producer_context.produceDatagram(start, config_.payload_size_); + } } - void sendRTCContentObjectCallbackWithTrace(std::error_code ec) { + void sendRTCContentObjectCallbackWithTrace(const std::error_code &ec) { if (ec) return; - auto payload = - content_objects_[content_objects_index_++ & mask_]->getPayload(); - - uint32_t packet_len = - configuration_.trace_[configuration_.trace_index_].size; + std::size_t packet_len = config_.trace_[config_.trace_index_].size; // this is used to compute the data packet delay // used only for performance evaluation // it requires clock synchronization between producer and consumer - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::system_clock::now().time_since_epoch()) - .count(); - - std::memcpy(payload->writableData(), &now, sizeof(uint64_t)); + auto now = utils::SystemTime::nowMs().count(); + auto start = rtc_payload_.data(); + std::memcpy(start, &now, sizeof(uint64_t)); - if (packet_len > payload->length()) packet_len = payload->length(); - if (packet_len > 1400) packet_len = 1400; + if (packet_len > config_.payload_size_) { + packet_len = config_.payload_size_; + } - producer_socket_->produceDatagram(flow_name_, payload->data(), packet_len); + for (const auto &producer_context : producer_contexts_) { + producer_context.produceDatagram(start, packet_len); + } - uint32_t next_index = configuration_.trace_index_ + 1; + uint32_t next_index = config_.trace_index_ + 1; uint64_t schedule_next; - if (next_index < configuration_.trace_.size()) { - schedule_next = - configuration_.trace_[next_index].timestamp - - configuration_.trace_[configuration_.trace_index_].timestamp; + if (next_index < config_.trace_.size()) { + schedule_next = config_.trace_[next_index].timestamp - + config_.trace_[config_.trace_index_].timestamp; } else { // here we need to loop, schedule in a random time schedule_next = 1000; } - configuration_.trace_index_ = - (configuration_.trace_index_ + 1) % configuration_.trace_.size(); + config_.trace_index_ = (config_.trace_index_ + 1) % config_.trace_.size(); rtc_timer_.expires_from_now(std::chrono::microseconds(schedule_next)); rtc_timer_.async_wait( std::bind(&Impl::sendRTCContentObjectCallbackWithTrace, this, std::placeholders::_1)); } + int parseTraceFile() { + std::ifstream trace(config_.trace_file_); + if (trace.fail()) { + return -1; + } + std::string line; + while (std::getline(trace, line)) { + std::istringstream iss(line); + hiperf::packet_t packet; + iss >> packet.timestamp >> packet.size; + config_.trace_.push_back(packet); + } + return 0; + } + #ifndef _WIN32 void handleInput(const std::error_code &error, std::size_t length) { if (error) { - producer_socket_->stop(); - io_service_.stop(); + stop(); } if (rtc_running_) { - std::cout << "stop real time content production" << std::endl; + LoggerInfo() << "stop real time content production" << std::endl; rtc_running_ = false; rtc_timer_.cancel(); } else { - std::cout << "start real time content production" << std::endl; + LoggerInfo() << "start real time content production" << std::endl; rtc_running_ = true; rtc_timer_.expires_from_now( - configuration_.production_rate_.getMicrosecondsForPacket( - configuration_.payload_size_)); + config_.production_rate_.getMicrosecondsForPacket( + config_.payload_size_)); rtc_timer_.async_wait(std::bind(&Impl::sendRTCContentObjectCallback, this, std::placeholders::_1)); } @@ -401,46 +586,33 @@ class HIperfServer::Impl { } #endif - int parseTraceFile() { - std::ifstream trace(configuration_.trace_file_); - if (trace.fail()) { - return -1; - } - std::string line; - while (std::getline(trace, line)) { - std::istringstream iss(line); - hiperf::packet_t packet; - iss >> packet.timestamp >> packet.size; - configuration_.trace_.push_back(packet); + void stop() { + for (auto &producer_context : producer_contexts_) { + producer_context.stop(); } - return 0; + + io_service_.stop(); } int run() { - std::cerr << "Starting to serve consumers" << std::endl; - signals_.add(SIGINT); - signals_.async_wait([this](const std::error_code &, const int &) { - std::cout << "STOPPING!!" << std::endl; - producer_socket_->stop(); - io_service_.stop(); - }); + signals_.async_wait( + [this](const std::error_code &, const int &) { stop(); }); - if (configuration_.rtc_) { -#ifndef _WIN32 - if (configuration_.interactive_) { + if (config_.rtc_) { + if (config_.interactive_) { asio::async_read_until( input_, input_buffer_, '\n', std::bind(&Impl::handleInput, this, std::placeholders::_1, std::placeholders::_2)); - } else if (configuration_.trace_based_) { - std::cout << "trace-based mode enabled" << std::endl; - if (configuration_.trace_file_ == nullptr) { - std::cout << "cannot find the trace file" << std::endl; + } else if (config_.trace_based_) { + LoggerInfo() << "trace-based mode enabled" << std::endl; + if (config_.trace_file_ == nullptr) { + LoggerErr() << "cannot find the trace file" << std::endl; return ERROR_SETUP; } if (parseTraceFile() < 0) { - std::cout << "cannot parse the trace file" << std::endl; + LoggerErr() << "cannot parse the trace file" << std::endl; return ERROR_SETUP; } rtc_running_ = true; @@ -448,31 +620,26 @@ class HIperfServer::Impl { rtc_timer_.async_wait( std::bind(&Impl::sendRTCContentObjectCallbackWithTrace, this, std::placeholders::_1)); - } else if (configuration_.input_stream_mode_) { + } else if (config_.input_stream_mode_) { rtc_running_ = true; - // crate socket + // create socket remote_ = asio::ip::udp::endpoint( - asio::ip::address::from_string("127.0.0.1"), configuration_.port_); + asio::ip::address::from_string("127.0.0.1"), config_.port_); socket_.open(asio::ip::udp::v4()); socket_.bind(remote_); - recv_buffer_.first = (uint8_t *)malloc(1500); - recv_buffer_.second = 1500; receiveStream(); } else { rtc_running_ = true; rtc_timer_.expires_from_now( - configuration_.production_rate_.getMicrosecondsForPacket( - configuration_.payload_size_)); + config_.production_rate_.getMicrosecondsForPacket( + config_.payload_size_)); rtc_timer_.async_wait(std::bind(&Impl::sendRTCContentObjectCallback, this, std::placeholders::_1)); } -#else - rtc_timer_.expires_from_now( - configuration_.production_rate_.getMicrosecondsForPacket( - configuration_.payload_size_)); - rtc_timer_.async_wait(std::bind(&Impl::sendRTCContentObjectCallback, this, - std::placeholders::_1)); -#endif + } + + for (auto &producer_context : producer_contexts_) { + producer_context.run(); } io_service_.run(); @@ -480,34 +647,31 @@ class HIperfServer::Impl { return ERROR_SUCCESS; } + ServerConfiguration &getConfig() { return config_; } + private: - hiperf::ServerConfiguration configuration_; + // Variables initialized by the constructor. + ServerConfiguration config_; + + // Variable initialized in the in-class initializer list. asio::io_service io_service_; - asio::signal_set signals_; - asio::steady_timer rtc_timer_; - std::vector<uint32_t> unsatisfied_interests_; - std::vector<std::shared_ptr<ContentObject>> content_objects_; - std::uint16_t content_objects_index_; - std::uint16_t mask_; - std::uint32_t last_segment_; - std::uint32_t *ptr_last_segment_; - std::unique_ptr<ProducerSocket> producer_socket_; -#ifndef _WIN32 - asio::posix::stream_descriptor input_; + asio::signal_set signals_{io_service_}; + asio::steady_timer rtc_timer_{io_service_}; + asio::posix::stream_descriptor input_{io_service_}; + asio::ip::udp::socket socket_{io_service_}; + std::vector<ProducerContext> producer_contexts_; + ::utils::EventThread produce_thread_; asio::streambuf input_buffer_; - bool rtc_running_; - Name flow_name_; - asio::ip::udp::socket socket_; + bool rtc_running_{false}; asio::ip::udp::endpoint remote_; - std::pair<uint8_t *, std::size_t> recv_buffer_; -#endif + utils::MemBuf recv_buffer_{utils::MemBuf::CREATE, HIPERF_MTU}; + std::array<uint8_t, HIPERF_MTU> rtc_payload_; }; -HIperfServer::HIperfServer(const ServerConfiguration &conf) { - impl_ = new Impl(conf); -} +HIperfServer::HIperfServer(const ServerConfiguration &conf) + : impl_(std::make_unique<Impl>(conf)) {} -HIperfServer::~HIperfServer() { delete impl_; } +HIperfServer::~HIperfServer() = default; int HIperfServer::setup() { return impl_->setup(); } diff --git a/apps/hiperf/src/server.h b/apps/hiperf/src/server.h index 05407a807..d7420da48 100644 --- a/apps/hiperf/src/server.h +++ b/apps/hiperf/src/server.h @@ -21,14 +21,15 @@ namespace hiperf { class HIperfServer { public: - HIperfServer(const ServerConfiguration &conf); + explicit HIperfServer(const ServerConfiguration &conf); + ~HIperfServer(); int setup(); void run(); private: class Impl; - Impl *impl_; + std::unique_ptr<Impl> impl_; }; } // namespace hiperf
\ No newline at end of file |