summaryrefslogtreecommitdiffstats
path: root/apps/hiperf/src
diff options
context:
space:
mode:
authorLuca Muscariello <lumuscar@cisco.com>2022-06-09 21:34:09 +0200
committerLuca Muscariello <muscariello@ieee.org>2022-06-30 10:47:50 +0200
commit6b94663b2455e212009a544ae23bb6a8c55407f8 (patch)
tree0af780ce5eeb1009fd24b8af8af08e8368eda3bd /apps/hiperf/src
parenta1ac96f497719b897793ac14b287cb8d840651c1 (diff)
refactor(lib, hicn-light, vpp, hiperf): HICN-723
- move infra data structure into the shared lib - new packet cache using double hashing and lookup on prefix suffix - testing updates - authenticated requests using interest manifests Co-authored-by: Mauro Sardara <msardara@cisco.com> Co-authored-by: Jordan Augé <jordan.auge+fdio@cisco.com> Co-authored-by: Michele Papalini <micpapal@cisco.com> Co-authored-by: Olivier Roques <oroques+fdio@cisco.com> Co-authored-by: Enrico Loparco <eloparco@cisco.com> Change-Id: Iaddebfe6aa5279ea8553433b0f519578f6b9ccd9 Signed-off-by: Luca Muscariello <muscariello@ieee.org>
Diffstat (limited to 'apps/hiperf/src')
-rw-r--r--apps/hiperf/src/client.cc1612
-rw-r--r--apps/hiperf/src/client.h12
-rw-r--r--apps/hiperf/src/common.h288
-rw-r--r--apps/hiperf/src/forwarder_config.h97
-rw-r--r--apps/hiperf/src/forwarder_interface.cc696
-rw-r--r--apps/hiperf/src/forwarder_interface.h143
-rw-r--r--apps/hiperf/src/main.cc159
-rw-r--r--apps/hiperf/src/server.cc816
-rw-r--r--apps/hiperf/src/server.h6
9 files changed, 1399 insertions, 2430 deletions
diff --git a/apps/hiperf/src/client.cc b/apps/hiperf/src/client.cc
index ba36cd20e..0e1f596c5 100644
--- a/apps/hiperf/src/client.cc
+++ b/apps/hiperf/src/client.cc
@@ -14,8 +14,7 @@
*/
#include <client.h>
-#include <forwarder_config.h>
-#include <forwarder_interface.h>
+#include <hicn/transport/portability/endianess.h>
#include <libconfig.h++>
@@ -31,1098 +30,831 @@ class Callback;
* Hiperf client class: configure and setup an hicn consumer following the
* ClientConfiguration.
*/
-class HIperfClient::Impl : ForwarderInterface::ICallback {
+class HIperfClient::Impl {
friend class Callback;
friend class RTCCallback;
- static const constexpr uint16_t log2_header_counter = 4;
-
- struct nack_packet_t {
- uint64_t timestamp;
- uint32_t prod_rate;
- uint32_t prod_seg;
-
- inline uint64_t getTimestamp() const { return _ntohll(&timestamp); }
- inline void setTimestamp(uint64_t time) { timestamp = _htonll(&time); }
-
- inline uint32_t getProductionRate() const { return ntohl(prod_rate); }
- inline void setProductionRate(uint32_t rate) { prod_rate = htonl(rate); }
-
- inline uint32_t getProductionSegement() const { return ntohl(prod_seg); }
- inline void setProductionSegement(uint32_t seg) { prod_seg = htonl(seg); }
- };
-
- public:
- Impl(const hiperf::ClientConfiguration &conf)
- : configuration_(conf),
- total_duration_milliseconds_(0),
- old_bytes_value_(0),
- old_interest_tx_value_(0),
- old_fec_interest_tx_value_(0),
- old_fec_data_rx_value_(0),
- old_lost_data_value_(0),
- old_bytes_recovered_value_(0),
- old_definitely_lost_data_value_(0),
- old_retx_value_(0),
- old_sent_int_value_(0),
- old_received_nacks_value_(0),
- old_fec_pkt_(0),
- avg_data_delay_(0),
- delay_sample_(0),
- received_bytes_(0),
- received_data_pkt_(0),
- auth_alerts_(0),
- data_delays_(""),
- signals_(io_service_),
- rtc_callback_(*this),
- callback_(*this),
- socket_(io_service_),
- // switch_threshold_(~0),
- fwd_connected_(false),
- use_bestpath_(false),
- rtt_threshold_(~0),
- loss_threshold_(~0),
- prefix_name_(""),
- prefix_len_(0),
- // done_(false),
- header_counter_mask_((1 << log2_header_counter) - 1),
- header_counter_(0),
- print_headers_(configuration_.print_headers_),
- first_(true),
- forwarder_interface_(io_service_) {
- setForwarderConnection(conf.forwarder_type_);
+ static inline constexpr uint16_t klog2_header_counter() { return 4; }
+ static inline constexpr uint16_t kheader_counter_mask() {
+ return (1 << klog2_header_counter()) - 1;
}
- virtual ~Impl() {}
-
- void checkReceivedRtcContent(ConsumerSocket &c,
- const ContentObject &contentObject) {}
-
- void processLeavingInterest(ConsumerSocket &c, const Interest &interest) {}
-
- void addFace(const std::string &local_address, uint16_t local_port,
- const std::string &remote_address, uint16_t remote_port,
- std::string interface);
-
- void handleTimerExpiration(ConsumerSocket &c,
- const TransportStatistics &stats) {
- const char separator = ' ';
- const int width = 18;
-
- utils::SteadyTime::TimePoint t2 = utils::SteadyTime::Clock::now();
- auto exact_duration = utils::SteadyTime::getDurationMs(t_stats_, t2);
-
- std::stringstream interval_ms;
- interval_ms << total_duration_milliseconds_ << "-"
- << total_duration_milliseconds_ + exact_duration.count();
-
- std::stringstream bytes_transferred;
- bytes_transferred << std::fixed << std::setprecision(3)
- << (stats.getBytesRecv() - old_bytes_value_) / 1000000.0
- << std::setfill(separator);
-
- std::stringstream bandwidth;
- bandwidth << ((stats.getBytesRecv() - old_bytes_value_) * 8) /
- (exact_duration.count()) / 1000.0
- << std::setfill(separator);
-
- std::stringstream window;
- window << stats.getAverageWindowSize() << std::setfill(separator);
+ class ConsumerContext
+ : public Base<ConsumerContext, ClientConfiguration, Impl>,
+ private ConsumerSocket::ReadCallback {
+ static inline const std::size_t kmtu = HIPERF_MTU;
- std::stringstream avg_rtt;
- avg_rtt << stats.getAverageRtt() << std::setfill(separator);
-
- if (configuration_.rtc_) {
- std::stringstream lost_data;
- lost_data << stats.getLostData() - old_lost_data_value_
- << std::setfill(separator);
-
- std::stringstream bytes_recovered_data;
- bytes_recovered_data << stats.getBytesRecoveredData() -
- old_bytes_recovered_value_
- << std::setfill(separator);
-
- std::stringstream definitely_lost_data;
- definitely_lost_data << stats.getDefinitelyLostData() -
- old_definitely_lost_data_value_
- << std::setfill(separator);
-
- std::stringstream data_delay;
- data_delay << std::fixed << std::setprecision(3) << avg_data_delay_
- << std::setfill(separator);
-
- std::stringstream received_data_pkt;
- received_data_pkt << received_data_pkt_ << std::setfill(separator);
-
- std::stringstream goodput;
- goodput << std::fixed << std::setprecision(3)
- << (received_bytes_ * 8.0) / (exact_duration.count()) / 1000.0
- << std::setfill(separator);
-
- std::stringstream loss_rate;
- loss_rate << std::fixed << std::setprecision(2)
- << stats.getLossRatio() * 100.0 << std::setfill(separator);
-
- std::stringstream retx_sent;
- retx_sent << stats.getRetxCount() - old_retx_value_
- << std::setfill(separator);
-
- std::stringstream interest_sent;
- interest_sent << stats.getInterestTx() - old_sent_int_value_
- << std::setfill(separator);
+ public:
+ using ConfType = ClientConfiguration;
+ using ParentType = typename HIperfClient::Impl;
+ static inline auto getContextType() -> std::string {
+ return "ConsumerContext";
+ }
+
+ ConsumerContext(Impl &client, int consumer_identifier)
+ : Base(client, client.io_service_, consumer_identifier),
+ receive_buffer_(
+ utils::MemBuf::create(client.config_.receive_buffer_size_)),
+ socket_(client.io_service_),
+ payload_size_max_(PayloadSize(client.config_.packet_format_)
+ .getPayloadSizeMax(RTC_HEADER_SIZE)),
+ nb_iterations_(client.config_.nb_iterations_) {}
+
+ ConsumerContext(ConsumerContext &&other) noexcept
+ : Base(std::move(other)),
+ receive_buffer_(std::move(other.receive_buffer_)),
+ socket_(std::move(other.socket_)),
+ payload_size_max_(other.payload_size_max_),
+ remote_(std::move(other.remote_)),
+ nb_iterations_(other.nb_iterations_),
+ saved_stats_(std::move(other.saved_stats_)),
+ header_counter_(other.header_counter_),
+ first_(other.first_),
+ consumer_socket_(std::move(other.consumer_socket_)),
+ producer_socket_(std::move(other.producer_socket_)) {}
+
+ ~ConsumerContext() override = default;
+
+ /***************************************************************
+ * ConsumerSocket::ReadCallback implementation
+ ***************************************************************/
- std::stringstream nacks;
- nacks << stats.getReceivedNacks() - old_received_nacks_value_
- << std::setfill(separator);
+ bool isBufferMovable() noexcept override { return false; }
- std::stringstream fec_pkt;
- fec_pkt << stats.getReceivedFEC() - old_fec_pkt_
- << std::setfill(separator);
+ void getReadBuffer(uint8_t **application_buffer,
+ size_t *max_length) override {
+ *application_buffer = receive_buffer_->writableData();
- std::stringstream queuing_delay;
- queuing_delay << std::fixed << std::setprecision(3)
- << stats.getQueuingDelay() << std::setfill(separator);
+ if (configuration_.rtc_) {
+ *max_length = kmtu;
+ } else {
+ *max_length = configuration_.receive_buffer_size_;
+ }
+ }
- std::stringstream residual_losses;
- double rl_perc = stats.getResidualLossRate() * 100;
- residual_losses << std::fixed << std::setprecision(2) << rl_perc
- << std::setfill(separator);
+ void readBufferAvailable(
+ std::unique_ptr<utils::MemBuf> &&buffer) noexcept override {
+ // Nothing to do here
+ auto ret = std::move(buffer);
+ }
- std::stringstream quality_score;
- quality_score << std::fixed << (int)stats.getQualityScore()
- << std::setfill(separator);
+ void readDataAvailable(std::size_t length) noexcept override {
+ if (configuration_.rtc_) {
+ saved_stats_.received_bytes_ += length;
+ saved_stats_.received_data_pkt_++;
- std::stringstream alerts;
- alerts << stats.getAlerts() << std::setfill(separator);
+ // collecting delay stats. Just for performance testing
+ auto senderTimeStamp =
+ *reinterpret_cast<uint64_t *>(receive_buffer_->writableData());
- std::stringstream auth_alerts;
- auth_alerts << auth_alerts_ << std::setfill(separator);
+ auto now = utils::SystemTime::nowMs().count();
+ auto new_delay = double(now - senderTimeStamp);
- if (fwd_connected_ && use_bestpath_ &&
- ((stats.getAverageRtt() > rtt_threshold_) ||
- ((stats.getResidualLossRate() * 100) > loss_threshold_))) {
- forwarder_interface_.setStrategy(prefix_name_, prefix_len_, "bestpath");
- }
+ if (senderTimeStamp > now)
+ new_delay = -1 * double(senderTimeStamp - now);
- if ((header_counter_ == 0 && print_headers_) || first_) {
- std::cout << std::right << std::setw(width) << "Interval[ms]";
- std::cout << std::right << std::setw(width) << "RecvData[pkt]";
- std::cout << std::right << std::setw(width) << "Bandwidth[Mbps]";
- std::cout << std::right << std::setw(width) << "Goodput[Mbps]";
- std::cout << std::right << std::setw(width) << "LossRate[%]";
- std::cout << std::right << std::setw(width) << "Retr[pkt]";
- std::cout << std::right << std::setw(width) << "InterestSent";
- std::cout << std::right << std::setw(width) << "ReceivedNacks";
- std::cout << std::right << std::setw(width) << "SyncWnd[pkt]";
- std::cout << std::right << std::setw(width) << "MinRtt[ms]";
- std::cout << std::right << std::setw(width) << "QueuingDelay[ms]";
- std::cout << std::right << std::setw(width) << "LostData[pkt]";
- std::cout << std::right << std::setw(width) << "RecoveredData";
- std::cout << std::right << std::setw(width) << "DefinitelyLost";
- std::cout << std::right << std::setw(width) << "State";
- std::cout << std::right << std::setw(width) << "DataDelay[ms]";
- std::cout << std::right << std::setw(width) << "FecPkt";
- std::cout << std::right << std::setw(width) << "Congestion";
- std::cout << std::right << std::setw(width) << "ResidualLosses";
- std::cout << std::right << std::setw(width) << "QualityScore";
- std::cout << std::right << std::setw(width) << "Alerts";
- std::cout << std::right << std::setw(width) << "AuthAlerts"
- << std::endl;
-
- first_ = false;
- }
+ saved_stats_.delay_sample_++;
+ saved_stats_.avg_data_delay_ =
+ saved_stats_.avg_data_delay_ +
+ (double(new_delay) - saved_stats_.avg_data_delay_) /
+ saved_stats_.delay_sample_;
- std::cout << std::right << std::setw(width) << interval_ms.str();
- std::cout << std::right << std::setw(width) << received_data_pkt.str();
- std::cout << std::right << std::setw(width) << bandwidth.str();
- std::cout << std::right << std::setw(width) << goodput.str();
- std::cout << std::right << std::setw(width) << loss_rate.str();
- std::cout << std::right << std::setw(width) << retx_sent.str();
- std::cout << std::right << std::setw(width) << interest_sent.str();
- std::cout << std::right << std::setw(width) << nacks.str();
- std::cout << std::right << std::setw(width) << window.str();
- std::cout << std::right << std::setw(width) << avg_rtt.str();
- std::cout << std::right << std::setw(width) << queuing_delay.str();
- std::cout << std::right << std::setw(width) << lost_data.str();
- std::cout << std::right << std::setw(width) << bytes_recovered_data.str();
- std::cout << std::right << std::setw(width) << definitely_lost_data.str();
- std::cout << std::right << std::setw(width) << stats.getCCStatus();
- std::cout << std::right << std::setw(width) << data_delay.str();
- std::cout << std::right << std::setw(width) << fec_pkt.str();
- std::cout << std::right << std::setw(width) << stats.isCongested();
- std::cout << std::right << std::setw(width) << residual_losses.str();
- std::cout << std::right << std::setw(width) << quality_score.str();
- std::cout << std::right << std::setw(width) << alerts.str();
- std::cout << std::right << std::setw(width) << auth_alerts.str();
- std::cout << std::endl;
-
- if (configuration_.test_mode_) {
- if (data_delays_.size() > 0) data_delays_.pop_back();
-
- auto now = utils::SteadyTime::nowMs();
- std::cout << std::fixed << std::setprecision(0) << now.count()
- << " DATA-DELAYS:[" << data_delays_ << "]" << std::endl;
- }
+ if (configuration_.test_mode_) {
+ saved_stats_.data_delays_ += std::to_string(int(new_delay));
+ saved_stats_.data_delays_ += ",";
+ }
- // statistics not yet available in the transport
- // std::cout << std::right << std::setw(width) << interest_fec_tx.str();
- // std::cout << std::right << std::setw(width) << bytes_fec_recv.str();
- } else {
- if ((header_counter_ == 0 && print_headers_) || first_) {
- std::cout << std::right << std::setw(width) << "Interval[ms]";
- std::cout << std::right << std::setw(width) << "Transfer[MB]";
- std::cout << std::right << std::setw(width) << "Bandwidth[Mbps]";
- std::cout << std::right << std::setw(width) << "Retr[pkt]";
- std::cout << std::right << std::setw(width) << "Cwnd[Int]";
- std::cout << std::right << std::setw(width) << "AvgRtt[ms]"
- << std::endl;
-
- first_ = false;
+ if (configuration_.relay_ && configuration_.parallel_flows_ == 1) {
+ producer_socket_->produceDatagram(
+ configuration_.relay_name_.makeName(),
+ receive_buffer_->writableData(),
+ length < payload_size_max_ ? length : payload_size_max_);
+ }
+ if (configuration_.output_stream_mode_ &&
+ configuration_.parallel_flows_ == 1) {
+ const uint8_t *start = receive_buffer_->writableData();
+ start += sizeof(uint64_t);
+ std::size_t pkt_len = length - sizeof(uint64_t);
+ socket_.send_to(asio::buffer(start, pkt_len), remote_);
+ }
}
-
- std::cout << std::right << std::setw(width) << interval_ms.str();
- std::cout << std::right << std::setw(width) << bytes_transferred.str();
- std::cout << std::right << std::setw(width) << bandwidth.str();
- std::cout << std::right << std::setw(width) << stats.getRetxCount();
- std::cout << std::right << std::setw(width) << window.str();
- std::cout << std::right << std::setw(width) << avg_rtt.str() << std::endl;
- }
-
- total_duration_milliseconds_ += (uint32_t)exact_duration.count();
- old_bytes_value_ = stats.getBytesRecv();
- old_lost_data_value_ = stats.getLostData();
- old_bytes_recovered_value_ = stats.getBytesRecoveredData();
- old_definitely_lost_data_value_ = stats.getDefinitelyLostData();
- old_fec_interest_tx_value_ = stats.getInterestFecTxCount();
- old_fec_data_rx_value_ = stats.getBytesFecRecv();
- old_retx_value_ = stats.getRetxCount();
- old_sent_int_value_ = stats.getInterestTx();
- old_received_nacks_value_ = stats.getReceivedNacks();
- old_fec_pkt_ = stats.getReceivedFEC();
- delay_sample_ = 0;
- avg_data_delay_ = 0;
- received_bytes_ = 0;
- received_data_pkt_ = 0;
- data_delays_ = "";
-
- t_stats_ = utils::SteadyTime::Clock::now();
-
- header_counter_ = (header_counter_ + 1) & header_counter_mask_;
-
- if (--configuration_.nb_iterations_ == 0) {
- // We reached the maximum nb of runs. Stop now.
- io_service_.stop();
- }
- }
-
- bool setForwarderConnection(forwarder_type_t forwarder_type) {
- using namespace libconfig;
- Config cfg;
-
- const char *conf_file = getenv("FORWARDER_CONFIG");
- if (!conf_file) return false;
-
- if ((forwarder_type != HICNLIGHT) && (forwarder_type != HICNLIGHT_NG))
- return false;
-
- try {
- cfg.readFile(conf_file);
- } catch (const FileIOException &fioex) {
- std::cerr << "I/O error while reading file." << std::endl;
- return false;
- } catch (const ParseException &pex) {
- std::cerr << "Parse error at " << pex.getFile() << ":" << pex.getLine()
- << " - " << pex.getError() << std::endl;
- return false;
}
- Setting &config = cfg.getRoot();
-
- /* conf file example
- *
- * use_bestpath = "ON | OFF"
- * rtt_threshold = 200 //ms
- * loss_threshold = 20 //%
- * name = "b001::/16"
- */
-
- if (config.exists("use_bestpath")) {
- std::string val;
- config.lookupValue("use_bestpath", val);
- if (val.compare("ON") == 0) use_bestpath_ = true;
+ size_t maxBufferSize() const override {
+ return configuration_.rtc_ ? kmtu : configuration_.receive_buffer_size_;
}
- if (config.exists("rtt_threshold")) {
- unsigned val;
- config.lookupValue("rtt_threshold", val);
- rtt_threshold_ = val;
+ void readError(const std::error_code &ec) noexcept override {
+ getOutputStream() << "Error " << ec.message()
+ << " while reading from socket" << std::endl;
+ parent_.io_service_.stop();
}
- if (config.exists("loss_threshold")) {
- unsigned val;
- config.lookupValue("loss_threshold", val);
- loss_threshold_ = val;
- }
+ void readSuccess(std::size_t total_size) noexcept override {
+ if (configuration_.rtc_) {
+ getOutputStream() << "Data successfully read" << std::endl;
+ } else {
+ auto t2 = utils::SteadyTime::now();
+ auto dt =
+ utils::SteadyTime::getDurationUs(saved_stats_.t_download_, t2);
+ auto usec = dt.count();
- if (config.exists("name")) {
- std::string route;
- config.lookupValue("name", route);
+ getOutputStream() << "Content retrieved. Size: " << total_size
+ << " [Bytes]" << std::endl;
- std::string delimiter = "/";
- size_t pos = 0;
+ getOutputStream() << "Elapsed Time: " << usec / 1000000.0
+ << " seconds -- "
+ << double(total_size * 8) * 1.0 / double(usec) * 1.0
+ << " [Mbps]" << std::endl;
- if ((pos = route.find(delimiter)) != std::string::npos) {
- prefix_name_ = route.substr(0, pos);
- route.erase(0, pos + delimiter.length());
- prefix_len_ = std::stoul(route.substr(0));
+ parent_.io_service_.stop();
}
}
- forwarder_interface_.initForwarderInterface(this, forwarder_type);
-
- return true;
- }
+ /***************************************************************
+ * End of ConsumerSocket::ReadCallback implementation
+ ***************************************************************/
- void onHicnServiceReady() override {
- std::cout << "Successfully connected to local forwarder!" << std::endl;
- fwd_connected_ = true;
- }
+ private:
+ struct SavedStatistics {
+ utils::SteadyTime::TimePoint t_stats_{};
+ utils::SteadyTime::TimePoint t_download_{};
+ uint32_t total_duration_milliseconds_{0};
+ uint64_t old_bytes_value_{0};
+ uint64_t old_interest_tx_value_{0};
+ uint64_t old_fec_interest_tx_value_{0};
+ uint64_t old_fec_data_rx_value_{0};
+ uint64_t old_lost_data_value_{0};
+ uint64_t old_bytes_recovered_value_{0};
+ uint64_t old_definitely_lost_data_value_{0};
+ uint64_t old_retx_value_{0};
+ uint64_t old_sent_int_value_{0};
+ uint64_t old_received_nacks_value_{0};
+ uint32_t old_fec_pkt_{0};
+ // IMPORTANT: to be used only for performance testing, when consumer and
+ // producer are synchronized. Used for rtc only at the moment
+ double avg_data_delay_{0};
+ uint32_t delay_sample_{0};
+ uint32_t received_bytes_{0};
+ uint32_t received_data_pkt_{0};
+ uint32_t auth_alerts_{0};
+ std::string data_delays_{""};
+ };
+
+ /***************************************************************
+ * Transport callbacks
+ ***************************************************************/
+
+ void checkReceivedRtcContent(
+ [[maybe_unused]] const ConsumerSocket &c,
+ [[maybe_unused]] const ContentObject &content_object) const {
+ // Nothing to do here
+ }
+
+ void processLeavingInterest(const ConsumerSocket & /*c*/,
+ const Interest & /*interest*/) const {
+ // Nothing to do here
+ }
+
+ transport::auth::VerificationPolicy onAuthFailed(
+ transport::auth::Suffix /*suffix*/,
+ transport::auth::VerificationPolicy /*policy*/) {
+ saved_stats_.auth_alerts_++;
+ return transport::auth::VerificationPolicy::ACCEPT;
+ }
+
+ void handleTimerExpiration([[maybe_unused]] const ConsumerSocket &c,
+ const TransportStatistics &stats) {
+ const char separator = ' ';
+ const int width = 18;
+
+ utils::SteadyTime::TimePoint t2 = utils::SteadyTime::Clock::now();
+ auto exact_duration =
+ utils::SteadyTime::getDurationMs(saved_stats_.t_stats_, t2);
+
+ std::stringstream interval_ms;
+ interval_ms << saved_stats_.total_duration_milliseconds_ << "-"
+ << saved_stats_.total_duration_milliseconds_ +
+ exact_duration.count();
+
+ std::stringstream bytes_transferred;
+ bytes_transferred << std::fixed << std::setprecision(3)
+ << double(stats.getBytesRecv() -
+ saved_stats_.old_bytes_value_) /
+ 1000000.0
+ << std::setfill(separator);
+
+ std::stringstream bandwidth;
+ bandwidth << (double(stats.getBytesRecv() -
+ saved_stats_.old_bytes_value_) *
+ 8) /
+ (exact_duration.count()) / 1000.0
+ << std::setfill(separator);
- void onRouteConfigured(
- std::vector<ForwarderInterface::RouteInfoPtr> &route_info) override {
- std::cout << "Routes successfully configured!" << std::endl;
- }
+ std::stringstream window;
+ window << stats.getAverageWindowSize() << std::setfill(separator);
-#ifdef FORWARDER_INTERFACE
- bool parseConfig(const char *conf_file) {
- using namespace libconfig;
- Config cfg;
-
- try {
- cfg.readFile(conf_file);
- } catch (const FileIOException &fioex) {
- std::cerr << "I/O error while reading file." << std::endl;
- return false;
- } catch (const ParseException &pex) {
- std::cerr << "Parse error at " << pex.getFile() << ":" << pex.getLine()
- << " - " << pex.getError() << std::endl;
- return false;
- }
+ std::stringstream avg_rtt;
+ avg_rtt << std::setprecision(3) << std::fixed << stats.getAverageRtt()
+ << std::setfill(separator);
- Setting &config = cfg.getRoot();
+ if (configuration_.rtc_) {
+ std::stringstream lost_data;
+ lost_data << stats.getLostData() - saved_stats_.old_lost_data_value_
+ << std::setfill(separator);
+
+ std::stringstream bytes_recovered_data;
+ bytes_recovered_data << stats.getBytesRecoveredData() -
+ saved_stats_.old_bytes_recovered_value_
+ << std::setfill(separator);
+
+ std::stringstream definitely_lost_data;
+ definitely_lost_data << stats.getDefinitelyLostData() -
+ saved_stats_.old_definitely_lost_data_value_
+ << std::setfill(separator);
+
+ std::stringstream data_delay;
+ data_delay << std::fixed << std::setprecision(3)
+ << saved_stats_.avg_data_delay_ << std::setfill(separator);
+
+ std::stringstream received_data_pkt;
+ received_data_pkt << saved_stats_.received_data_pkt_
+ << std::setfill(separator);
+
+ std::stringstream goodput;
+ goodput << std::fixed << std::setprecision(3)
+ << (saved_stats_.received_bytes_ * 8.0) /
+ (exact_duration.count()) / 1000.0
+ << std::setfill(separator);
- if (config.exists("switch_threshold")) {
- unsigned threshold;
- config.lookupValue("switch_threshold", threshold);
- switch_threshold_ = threshold;
- }
+ std::stringstream loss_rate;
+ loss_rate << std::fixed << std::setprecision(2)
+ << stats.getLossRatio() * 100.0 << std::setfill(separator);
- // listeners
- if (config.exists("listeners")) {
- // get path where looking for modules
- const Setting &listeners = config.lookup("listeners");
- auto count = listeners.getLength();
-
- for (int i = 0; i < count; i++) {
- const Setting &listener = listeners[i];
- ListenerConfig list;
- unsigned port;
- std::string interface;
-
- list.name = listener.getName();
- listener.lookupValue("local_address", list.address);
- listener.lookupValue("local_port", port);
- listener.lookupValue("interface", list.interface);
- list.port = (uint16_t)(port);
-
- std::cout << "Adding listener " << list.name << ", ( " << list.address
- << ":" << list.port << ")" << std::endl;
- config_.addListener(std::move(list));
- }
- }
+ std::stringstream retx_sent;
+ retx_sent << stats.getRetxCount() - saved_stats_.old_retx_value_
+ << std::setfill(separator);
- // connectors
- if (config.exists("connectors")) {
- // get path where looking for modules
- const Setting &connectors = config.lookup("connectors");
- auto count = connectors.getLength();
+ std::stringstream interest_sent;
+ interest_sent << stats.getInterestTx() -
+ saved_stats_.old_sent_int_value_
+ << std::setfill(separator);
- for (int i = 0; i < count; i++) {
- const Setting &connector = connectors[i];
- ConnectorConfig conn;
+ std::stringstream nacks;
+ nacks << stats.getReceivedNacks() -
+ saved_stats_.old_received_nacks_value_
+ << std::setfill(separator);
- conn.name = connector.getName();
- unsigned port = 0;
+ std::stringstream fec_pkt;
+ fec_pkt << stats.getReceivedFEC() - saved_stats_.old_fec_pkt_
+ << std::setfill(separator);
- if (!connector.lookupValue("local_address", conn.local_address)) {
- conn.local_address = "";
- }
+ std::stringstream queuing_delay;
+ queuing_delay << std::fixed << std::setprecision(3)
+ << stats.getQueuingDelay() << std::setfill(separator);
- if (!connector.lookupValue("local_port", port)) {
- port = 0;
- }
+ std::stringstream residual_losses;
+ double rl_perc = stats.getResidualLossRate() * 100;
+ residual_losses << std::fixed << std::setprecision(2) << rl_perc
+ << std::setfill(separator);
- conn.local_port = (uint16_t)(port);
+ std::stringstream quality_score;
+ quality_score << std::fixed << (int)stats.getQualityScore()
+ << std::setfill(separator);
- if (!connector.lookupValue("remote_address", conn.remote_address)) {
- std::cerr
- << "Error in configuration file: remote_address is a mandatory "
- "field of Connectors."
- << std::endl;
- return false;
+ std::stringstream alerts;
+ alerts << stats.getAlerts() << std::setfill(separator);
+
+ std::stringstream auth_alerts;
+ auth_alerts << saved_stats_.auth_alerts_ << std::setfill(separator);
+
+ if ((header_counter_ == 0 && configuration_.print_headers_) || first_) {
+ getOutputStream() << std::right << std::setw(width) << "Interval[ms]";
+ getOutputStream()
+ << std::right << std::setw(width) << "RecvData[pkt]";
+ getOutputStream()
+ << std::right << std::setw(width) << "Bandwidth[Mbps]";
+ getOutputStream()
+ << std::right << std::setw(width) << "Goodput[Mbps]";
+ getOutputStream() << std::right << std::setw(width) << "LossRate[%]";
+ getOutputStream() << std::right << std::setw(width) << "Retr[pkt]";
+ getOutputStream() << std::right << std::setw(width) << "InterestSent";
+ getOutputStream()
+ << std::right << std::setw(width) << "ReceivedNacks";
+ getOutputStream() << std::right << std::setw(width) << "SyncWnd[pkt]";
+ getOutputStream() << std::right << std::setw(width) << "MinRtt[ms]";
+ getOutputStream()
+ << std::right << std::setw(width) << "QueuingDelay[ms]";
+ getOutputStream()
+ << std::right << std::setw(width) << "LostData[pkt]";
+ getOutputStream()
+ << std::right << std::setw(width) << "RecoveredData";
+ getOutputStream()
+ << std::right << std::setw(width) << "DefinitelyLost";
+ getOutputStream() << std::right << std::setw(width) << "State";
+ getOutputStream()
+ << std::right << std::setw(width) << "DataDelay[ms]";
+ getOutputStream() << std::right << std::setw(width) << "FecPkt";
+ getOutputStream() << std::right << std::setw(width) << "Congestion";
+ getOutputStream()
+ << std::right << std::setw(width) << "ResidualLosses";
+ getOutputStream() << std::right << std::setw(width) << "QualityScore";
+ getOutputStream() << std::right << std::setw(width) << "Alerts";
+ getOutputStream()
+ << std::right << std::setw(width) << "AuthAlerts" << std::endl;
+
+ first_ = false;
}
- if (!connector.lookupValue("remote_port", port)) {
- std::cerr << "Error in configuration file: remote_port is a "
- "mandatory field of Connectors."
- << std::endl;
- return false;
+ getOutputStream() << std::right << std::setw(width)
+ << interval_ms.str();
+ getOutputStream() << std::right << std::setw(width)
+ << received_data_pkt.str();
+ getOutputStream() << std::right << std::setw(width) << bandwidth.str();
+ getOutputStream() << std::right << std::setw(width) << goodput.str();
+ getOutputStream() << std::right << std::setw(width) << loss_rate.str();
+ getOutputStream() << std::right << std::setw(width) << retx_sent.str();
+ getOutputStream() << std::right << std::setw(width)
+ << interest_sent.str();
+ getOutputStream() << std::right << std::setw(width) << nacks.str();
+ getOutputStream() << std::right << std::setw(width) << window.str();
+ getOutputStream() << std::right << std::setw(width) << avg_rtt.str();
+ getOutputStream() << std::right << std::setw(width)
+ << queuing_delay.str();
+ getOutputStream() << std::right << std::setw(width) << lost_data.str();
+ getOutputStream() << std::right << std::setw(width)
+ << bytes_recovered_data.str();
+ getOutputStream() << std::right << std::setw(width)
+ << definitely_lost_data.str();
+ getOutputStream() << std::right << std::setw(width)
+ << stats.getCCStatus();
+ getOutputStream() << std::right << std::setw(width) << data_delay.str();
+ getOutputStream() << std::right << std::setw(width) << fec_pkt.str();
+ getOutputStream() << std::right << std::setw(width)
+ << stats.isCongested();
+ getOutputStream() << std::right << std::setw(width)
+ << residual_losses.str();
+ getOutputStream() << std::right << std::setw(width)
+ << quality_score.str();
+ getOutputStream() << std::right << std::setw(width) << alerts.str();
+ getOutputStream() << std::right << std::setw(width) << auth_alerts.str()
+ << std::endl;
+
+ if (configuration_.test_mode_) {
+ if (saved_stats_.data_delays_.size() > 0)
+ saved_stats_.data_delays_.pop_back();
+
+ auto now = utils::SteadyTime::nowMs();
+ getOutputStream() << std::fixed << std::setprecision(0) << now.count()
+ << " DATA-DELAYS:[" << saved_stats_.data_delays_
+ << "]" << std::endl;
}
-
- if (!connector.lookupValue("interface", conn.interface)) {
- std::cerr << "Error in configuration file: interface is a "
- "mandatory field of Connectors."
- << std::endl;
- return false;
+ } else {
+ if ((header_counter_ == 0 && configuration_.print_headers_) || first_) {
+ getOutputStream() << std::right << std::setw(width) << "Interval[ms]";
+ getOutputStream() << std::right << std::setw(width) << "Transfer[MB]";
+ getOutputStream()
+ << std::right << std::setw(width) << "Bandwidth[Mbps]";
+ getOutputStream() << std::right << std::setw(width) << "Retr[pkt]";
+ getOutputStream() << std::right << std::setw(width) << "Cwnd[Int]";
+ getOutputStream()
+ << std::right << std::setw(width) << "AvgRtt[ms]" << std::endl;
+
+ first_ = false;
}
- conn.remote_port = (uint16_t)(port);
-
- std::cout << "Adding connector " << conn.name << ", ("
- << conn.local_address << ":" << conn.local_port << " "
- << conn.remote_address << ":" << conn.remote_port << ")"
- << std::endl;
- config_.addConnector(conn.name, std::move(conn));
+ getOutputStream() << std::right << std::setw(width)
+ << interval_ms.str();
+ getOutputStream() << std::right << std::setw(width)
+ << bytes_transferred.str();
+ getOutputStream() << std::right << std::setw(width) << bandwidth.str();
+ getOutputStream() << std::right << std::setw(width)
+ << stats.getRetxCount();
+ getOutputStream() << std::right << std::setw(width) << window.str();
+ getOutputStream() << std::right << std::setw(width) << avg_rtt.str()
+ << std::endl;
}
- }
- // Routes
- if (config.exists("routes")) {
- const Setting &routes = config.lookup("routes");
- auto count = routes.getLength();
-
- for (int i = 0; i < count; i++) {
- const Setting &route = routes[i];
- RouteConfig r;
- unsigned weight;
-
- r.name = route.getName();
- route.lookupValue("prefix", r.prefix);
- route.lookupValue("weight", weight);
- route.lookupValue("main_connector", r.main_connector);
- route.lookupValue("backup_connector", r.backup_connector);
- r.weight = (uint16_t)(weight);
-
- std::cout << "Adding route " << r.name << " " << r.prefix << " ("
- << r.main_connector << " " << r.backup_connector << " "
- << r.weight << ")" << std::endl;
- config_.addRoute(std::move(r));
+ saved_stats_.total_duration_milliseconds_ +=
+ (uint32_t)exact_duration.count();
+ saved_stats_.old_bytes_value_ = stats.getBytesRecv();
+ saved_stats_.old_lost_data_value_ = stats.getLostData();
+ saved_stats_.old_bytes_recovered_value_ = stats.getBytesRecoveredData();
+ saved_stats_.old_definitely_lost_data_value_ =
+ stats.getDefinitelyLostData();
+ saved_stats_.old_fec_interest_tx_value_ = stats.getInterestFecTxCount();
+ saved_stats_.old_fec_data_rx_value_ = stats.getBytesFecRecv();
+ saved_stats_.old_retx_value_ = stats.getRetxCount();
+ saved_stats_.old_sent_int_value_ = stats.getInterestTx();
+ saved_stats_.old_received_nacks_value_ = stats.getReceivedNacks();
+ saved_stats_.old_fec_pkt_ = stats.getReceivedFEC();
+ saved_stats_.delay_sample_ = 0;
+ saved_stats_.avg_data_delay_ = 0;
+ saved_stats_.received_bytes_ = 0;
+ saved_stats_.received_data_pkt_ = 0;
+ saved_stats_.data_delays_ = "";
+ saved_stats_.t_stats_ = utils::SteadyTime::Clock::now();
+
+ header_counter_ = (header_counter_ + 1) & kheader_counter_mask();
+
+ if (--nb_iterations_ == 0) {
+ // We reached the maximum nb of runs. Stop now.
+ parent_.io_service_.stop();
}
}
- std::cout << "Ok" << std::endl;
+ /***************************************************************
+ * Setup functions
+ ***************************************************************/
- return true;
- }
+ int setupRTCSocket() {
+ int ret = ERROR_SUCCESS;
- bool splitRoute(std::string route, std::string &prefix,
- uint8_t &prefix_length) {
- std::string delimiter = "/";
-
- size_t pos = 0;
- if ((pos = route.find(delimiter)) != std::string::npos) {
- prefix = route.substr(0, pos);
- route.erase(0, pos + delimiter.length());
- } else {
- return false;
- }
+ configuration_.transport_protocol_ = RTC;
- prefix_length = std::stoul(route.substr(0));
- return true;
- }
+ if (configuration_.relay_ && configuration_.parallel_flows_ == 1) {
+ int production_protocol = ProductionProtocolAlgorithms::RTC_PROD;
+ producer_socket_ =
+ std::make_unique<ProducerSocket>(production_protocol);
+ producer_socket_->registerPrefix(configuration_.relay_name_);
+ producer_socket_->connect();
+ producer_socket_->start();
+ }
- void onHicnServiceReady() override {
- std::cout << "Successfully connected to local forwarder!" << std::endl;
+ if (configuration_.output_stream_mode_ &&
+ configuration_.parallel_flows_ == 1) {
+ remote_ = asio::ip::udp::endpoint(
+ asio::ip::address::from_string("127.0.0.1"), configuration_.port_);
+ socket_.open(asio::ip::udp::v4());
+ }
- std::cout << "Setting up listeners" << std::endl;
- const char *config = getenv("FORWARDER_CONFIG");
+ consumer_socket_ =
+ std::make_unique<ConsumerSocket>(configuration_.transport_protocol_);
- if (config) {
- if (!parseConfig(config)) {
- return;
+ RtcTransportRecoveryStrategies recovery_strategy =
+ RtcTransportRecoveryStrategies::RTX_ONLY;
+ switch (configuration_.recovery_strategy_) {
+ case 1:
+ recovery_strategy = RtcTransportRecoveryStrategies::RECOVERY_OFF;
+ break;
+ case 2:
+ recovery_strategy = RtcTransportRecoveryStrategies::RTX_ONLY;
+ break;
+ case 3:
+ recovery_strategy = RtcTransportRecoveryStrategies::FEC_ONLY;
+ break;
+ case 4:
+ recovery_strategy = RtcTransportRecoveryStrategies::DELAY_BASED;
+ break;
+ case 5:
+ recovery_strategy = RtcTransportRecoveryStrategies::LOW_RATE;
+ break;
+ case 6:
+ recovery_strategy =
+ RtcTransportRecoveryStrategies::LOW_RATE_AND_BESTPATH;
+ break;
+ case 7:
+ recovery_strategy =
+ RtcTransportRecoveryStrategies::LOW_RATE_AND_REPLICATION;
+ break;
+ case 8:
+ recovery_strategy =
+ RtcTransportRecoveryStrategies::LOW_RATE_AND_ALL_FWD_STRATEGIES;
+ break;
+ case 9:
+ recovery_strategy =
+ RtcTransportRecoveryStrategies::FEC_ONLY_LOW_RES_LOSSES;
+ break;
+ case 10:
+ recovery_strategy =
+ RtcTransportRecoveryStrategies::DELAY_AND_BESTPATH;
+ break;
+ case 11:
+ recovery_strategy =
+ RtcTransportRecoveryStrategies::DELAY_AND_REPLICATION;
+ break;
+ default:
+ break;
}
- // Create faces and route using first face in the list.
- auto &routes = config_.getRoutes();
- auto &connectors = config_.getConnectors();
+ ret = consumer_socket_->setSocketOption(
+ RtcTransportOptions::RECOVERY_STRATEGY,
+ static_cast<uint32_t>(recovery_strategy));
- if (routes.size() == 0 || connectors.size() == 0) {
- std::cerr << "Nothing to configure" << std::endl;
- return;
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
}
- for (auto &route : routes) {
- auto the_connector_it = connectors.find(route.main_connector);
- if (the_connector_it == connectors.end()) {
- std::cerr << "No valid main connector found for route " << route.name
- << std::endl;
- continue;
- }
-
- auto &the_connector = the_connector_it->second;
- auto route_info = std::make_shared<ForwarderInterface::RouteInfo>();
- route_info->family = AF_INET;
- route_info->local_addr = the_connector.local_address;
- route_info->local_port = the_connector.local_port;
- route_info->remote_addr = the_connector.remote_address;
- route_info->remote_port = the_connector.remote_port;
- route_info->interface = the_connector.interface;
- route_info->name = the_connector.name;
-
- std::string prefix;
- uint8_t prefix_length;
- auto ret = splitRoute(route.prefix, prefix, prefix_length);
-
- if (!ret) {
- std::cerr << "Error parsing route" << std::endl;
- return;
- }
+ ret = consumer_socket_->setSocketOption(
+ RtcTransportOptions::AGGREGATED_DATA,
+ configuration_.aggregated_data_);
- route_info->route_addr = prefix;
- route_info->route_len = prefix_length;
-
- main_routes_.emplace_back(route_info);
-
- if (!route.backup_connector.empty()) {
- // Add also the backup route
- auto the_backup_connector_it =
- connectors.find(route.backup_connector);
- if (the_backup_connector_it == connectors.end()) {
- std::cerr << "No valid backup connector found for route "
- << route.name << std::endl;
- continue;
- }
-
- auto &the_backup_connector = the_backup_connector_it->second;
- auto backup_route_info =
- std::make_shared<ForwarderInterface::RouteInfo>();
- backup_route_info->family = AF_INET;
- backup_route_info->local_addr = the_backup_connector.local_address;
- backup_route_info->local_port = the_backup_connector.local_port;
- backup_route_info->remote_addr = the_backup_connector.remote_address;
- backup_route_info->remote_port = the_backup_connector.remote_port;
- backup_route_info->interface = the_backup_connector.interface;
- backup_route_info->name = the_backup_connector.name;
-
- std::string prefix;
- uint8_t prefix_length;
- auto ret = splitRoute(route.prefix, prefix, prefix_length);
-
- if (!ret) {
- std::cerr << "Error parsing route" << std::endl;
- return;
- }
-
- backup_route_info->route_addr = prefix;
- backup_route_info->route_len = prefix_length;
-
- backup_routes_.emplace_back(backup_route_info);
- }
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
}
- // Create main routes
- std::cout << "Creating main routes" << std::endl;
- forwarder_interface_.createFaceAndRoutes(main_routes_);
- }
- }
+ ret = consumer_socket_->setSocketOption(
+ RtcTransportOptions::CONTENT_SHARING_MODE,
+ configuration_.content_sharing_mode_);
- void onRouteConfigured(
- std::vector<ForwarderInterface::RouteInfoPtr> &route_info) override {
- std::cout << "Routes successfully configured!" << std::endl;
- }
-#endif
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
- transport::auth::VerificationPolicy onAuthFailed(
- transport::auth::Suffix suffix,
- transport::auth::VerificationPolicy policy) {
- auth_alerts_++;
- return transport::auth::VerificationPolicy::ACCEPT;
- }
+ ret = consumer_socket_->setSocketOption(
+ ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT,
+ (ConsumerContentObjectCallback)std::bind(
+ &Impl::ConsumerContext::checkReceivedRtcContent, this,
+ std::placeholders::_1, std::placeholders::_2));
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
- int setup() {
- int ret;
+ std::shared_ptr<TransportStatistics> transport_stats;
+ ret = consumer_socket_->getSocketOption(
+ OtherOptions::STATISTICS, (TransportStatistics **)&transport_stats);
+ transport_stats->setAlpha(0.0);
- if (configuration_.rtc_) {
- configuration_.transport_protocol_ = RTC;
- } else if (configuration_.window < 0) {
- configuration_.transport_protocol_ = RAAQM;
- } else {
- configuration_.transport_protocol_ = CBR;
- }
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
- if (configuration_.relay_ && configuration_.rtc_) {
- int production_protocol = ProductionProtocolAlgorithms::RTC_PROD;
- producer_socket_ = std::make_unique<ProducerSocket>(production_protocol);
- producer_socket_->registerPrefix(configuration_.relay_name_);
- producer_socket_->connect();
- producer_socket_->start();
+ return ERROR_SUCCESS;
}
- if (configuration_.output_stream_mode_ && configuration_.rtc_) {
- remote_ = asio::ip::udp::endpoint(
- asio::ip::address::from_string("127.0.0.1"), configuration_.port_);
- socket_.open(asio::ip::udp::v4());
- }
+ int setupRAAQMSocket() {
+ int ret = ERROR_SUCCESS;
+
+ configuration_.transport_protocol_ = RAAQM;
- if (configuration_.secure_) {
- consumer_socket_ = std::make_unique<P2PSecureConsumerSocket>(
- RAAQM, configuration_.transport_protocol_);
- if (configuration_.producer_prefix_.getPrefixLength() == 0) {
- std::cerr << "ERROR -- Missing producer prefix on which perform the "
- "handshake."
- << std::endl;
- } else {
- P2PSecureConsumerSocket &secure_consumer_socket =
- *(static_cast<P2PSecureConsumerSocket *>(consumer_socket_.get()));
- secure_consumer_socket.registerPrefix(configuration_.producer_prefix_);
- }
- } else {
consumer_socket_ =
std::make_unique<ConsumerSocket>(configuration_.transport_protocol_);
- }
- consumer_socket_->setSocketOption(
- GeneralTransportOptions::INTEREST_LIFETIME,
- configuration_.interest_lifetime_);
-
- consumer_socket_->setSocketOption(
- GeneralTransportOptions::UNVERIFIED_INTERVAL,
- configuration_.unverified_interval_);
-
- consumer_socket_->setSocketOption(GeneralTransportOptions::UNVERIFIED_RATIO,
- configuration_.unverified_ratio_);
-
- if (consumer_socket_->setSocketOption(
- GeneralTransportOptions::PACKET_FORMAT,
- configuration_.packet_format_) == SOCKET_OPTION_NOT_SET) {
- std::cerr << "ERROR -- Impossible to set the packet format." << std::endl;
- return ERROR_SETUP;
- }
-
-#if defined(DEBUG) && defined(__linux__)
- std::shared_ptr<transport::BasePortal> portal;
- consumer_socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal);
- signals_ =
- std::make_unique<asio::signal_set>(portal->getIoService(), SIGUSR1);
- signals_->async_wait([this](const std::error_code &, const int &) {
- std::cout << "Signal SIGUSR1!" << std::endl;
- mtrace();
- });
-
- ret = consumer_socket_->setSocketOption(
- ConsumerCallbacksOptions::FWD_STRATEGY_CHANGE,
- [this](notification::Strategy strategy) {
- std::cout << "Forwarder strategy callback" << std::endl;
- });
- if (ret == SOCKET_OPTION_NOT_SET) return ERROR_SETUP;
-
- ret = consumer_socket_->setSocketOption(
- ConsumerCallbacksOptions::REC_STRATEGY_CHANGE,
- [this](notification::Strategy strategy) {
- std::cout << "Recovery strategy callback" << std::endl;
- });
- if (ret == SOCKET_OPTION_NOT_SET) return ERROR_SETUP;
-#endif
-
- if (consumer_socket_->setSocketOption(CURRENT_WINDOW_SIZE,
- configuration_.window) ==
- SOCKET_OPTION_NOT_SET) {
- std::cerr << "ERROR -- Impossible to set the size of the window."
- << std::endl;
- return ERROR_SETUP;
- }
-
- if (configuration_.transport_protocol_ == RAAQM &&
- configuration_.beta != -1.f) {
- if (consumer_socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE,
- configuration_.beta) ==
- SOCKET_OPTION_NOT_SET) {
- return ERROR_SETUP;
+ if (configuration_.beta_ != -1.f) {
+ ret = consumer_socket_->setSocketOption(
+ RaaqmTransportOptions::BETA_VALUE, configuration_.beta_);
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
}
- }
- if (configuration_.transport_protocol_ == RAAQM &&
- configuration_.drop_factor != -1.f) {
- if (consumer_socket_->setSocketOption(RaaqmTransportOptions::DROP_FACTOR,
- configuration_.drop_factor) ==
- SOCKET_OPTION_NOT_SET) {
- return ERROR_SETUP;
+ if (configuration_.drop_factor_ != -1.f) {
+ ret = consumer_socket_->setSocketOption(
+ RaaqmTransportOptions::DROP_FACTOR, configuration_.drop_factor_);
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
}
- }
-
- std::shared_ptr<Verifier> verifier = std::make_shared<VoidVerifier>();
- if (!configuration_.producer_certificate.empty()) {
- verifier = std::make_shared<AsymmetricVerifier>(
- configuration_.producer_certificate);
+ return ERROR_SUCCESS;
}
- if (!configuration_.passphrase.empty()) {
- verifier = std::make_shared<SymmetricVerifier>(configuration_.passphrase);
- }
+ int setupCBRSocket() {
+ configuration_.transport_protocol_ = CBR;
- verifier->setVerificationFailedCallback(
- std::bind(&HIperfClient::Impl::onAuthFailed, this,
- std::placeholders::_1, std::placeholders::_2));
+ consumer_socket_ =
+ std::make_unique<ConsumerSocket>(configuration_.transport_protocol_);
- if (consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER,
- verifier) == SOCKET_OPTION_NOT_SET) {
- return ERROR_SETUP;
+ return ERROR_SUCCESS;
}
- ret = consumer_socket_->setSocketOption(
- ConsumerCallbacksOptions::INTEREST_OUTPUT,
- (ConsumerInterestCallback)std::bind(&Impl::processLeavingInterest, this,
- std::placeholders::_1,
- std::placeholders::_2));
+ public:
+ int setup() {
+ int ret;
+ std::shared_ptr<Verifier> verifier = std::make_shared<VoidVerifier>();
+
+ if (configuration_.rtc_) {
+ ret = setupRTCSocket();
+ } else if (configuration_.window_ < 0) {
+ ret = setupRAAQMSocket();
+ } else {
+ ret = setupCBRSocket();
+ }
- if (ret == SOCKET_OPTION_NOT_SET) {
- return ERROR_SETUP;
- }
+ if (ret != ERROR_SUCCESS) {
+ return ret;
+ }
- if (!configuration_.rtc_) {
ret = consumer_socket_->setSocketOption(
- ConsumerCallbacksOptions::READ_CALLBACK, &callback_);
- } else {
- ret = consumer_socket_->setSocketOption(
- ConsumerCallbacksOptions::READ_CALLBACK, &rtc_callback_);
- }
-
- if (ret == SOCKET_OPTION_NOT_SET) {
- return ERROR_SETUP;
- }
-
- if (configuration_.rtc_) {
- if (configuration_.recovery_strategy_ == 1) { // unreliable
- ret = consumer_socket_->setSocketOption(
- RtcTransportOptions::RECOVERY_STRATEGY,
- (uint32_t)RtcTransportRecoveryStrategies::RECOVERY_OFF);
- } else if (configuration_.recovery_strategy_ == 2) { // rtx only
- ret = consumer_socket_->setSocketOption(
- RtcTransportOptions::RECOVERY_STRATEGY,
- (uint32_t)RtcTransportRecoveryStrategies::RTX_ONLY);
- } else if (configuration_.recovery_strategy_ == 3) { // fec only
- ret = consumer_socket_->setSocketOption(
- RtcTransportOptions::RECOVERY_STRATEGY,
- (uint32_t)RtcTransportRecoveryStrategies::FEC_ONLY);
- } else if (configuration_.recovery_strategy_ == 4) { // delay based
- ret = consumer_socket_->setSocketOption(
- RtcTransportOptions::RECOVERY_STRATEGY,
- (uint32_t)RtcTransportRecoveryStrategies::DELAY_BASED);
- } else if (configuration_.recovery_strategy_ == 5) { // low rate flow
- ret = consumer_socket_->setSocketOption(
- RtcTransportOptions::RECOVERY_STRATEGY,
- (uint32_t)RtcTransportRecoveryStrategies::LOW_RATE);
- } else if (configuration_.recovery_strategy_ ==
- 6) { // low rate + bestpath
- ret = consumer_socket_->setSocketOption(
- RtcTransportOptions::RECOVERY_STRATEGY,
- (uint32_t)RtcTransportRecoveryStrategies::LOW_RATE_AND_BESTPATH);
- } else if (configuration_.recovery_strategy_ ==
- 7) { // low rate + replication
- ret = consumer_socket_->setSocketOption(
- RtcTransportOptions::RECOVERY_STRATEGY,
- (uint32_t)RtcTransportRecoveryStrategies::LOW_RATE_AND_REPLICATION);
- } else if (configuration_.recovery_strategy_ ==
- 8) { // low rate + bestpath or replication
- ret = consumer_socket_->setSocketOption(
- RtcTransportOptions::RECOVERY_STRATEGY,
- (uint32_t)RtcTransportRecoveryStrategies::
- LOW_RATE_AND_ALL_FWD_STRATEGIES);
- } else {
- // default
- ret = consumer_socket_->setSocketOption(
- RtcTransportOptions::RECOVERY_STRATEGY,
- (uint32_t)RtcTransportRecoveryStrategies::RTX_ONLY);
+ GeneralTransportOptions::INTEREST_LIFETIME,
+ configuration_.interest_lifetime_);
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
}
+ ret = consumer_socket_->setSocketOption(
+ GeneralTransportOptions::MANIFEST_FACTOR_RELEVANT,
+ configuration_.manifest_factor_relevant_);
if (ret == SOCKET_OPTION_NOT_SET) {
return ERROR_SETUP;
}
- }
- if (configuration_.rtc_) {
ret = consumer_socket_->setSocketOption(
- RtcTransportOptions::AGGREGATED_DATA,
- configuration_.aggregated_data_);
+ GeneralTransportOptions::MANIFEST_FACTOR_ALERT,
+ configuration_.manifest_factor_alert_);
if (ret == SOCKET_OPTION_NOT_SET) {
return ERROR_SETUP;
}
- }
- if (configuration_.rtc_) {
ret = consumer_socket_->setSocketOption(
- ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT,
- (ConsumerContentObjectCallback)std::bind(
- &Impl::checkReceivedRtcContent, this, std::placeholders::_1,
- std::placeholders::_2));
+ GeneralTransportOptions::PACKET_FORMAT,
+ configuration_.packet_format_);
if (ret == SOCKET_OPTION_NOT_SET) {
+ getOutputStream() << "ERROR -- Impossible to set the packet format."
+ << std::endl;
return ERROR_SETUP;
}
- }
-
- if (configuration_.rtc_) {
- std::shared_ptr<TransportStatistics> transport_stats;
- consumer_socket_->getSocketOption(
- OtherOptions::STATISTICS, (TransportStatistics **)&transport_stats);
- transport_stats->setAlpha(0.0);
- }
-
- ret = consumer_socket_->setSocketOption(
- ConsumerCallbacksOptions::STATS_SUMMARY,
- (ConsumerTimerCallback)std::bind(&Impl::handleTimerExpiration, this,
- std::placeholders::_1,
- std::placeholders::_2));
-
- if (ret == SOCKET_OPTION_NOT_SET) {
- return ERROR_SETUP;
- }
-
- if (consumer_socket_->setSocketOption(
- GeneralTransportOptions::STATS_INTERVAL,
- configuration_.report_interval_milliseconds_) ==
- SOCKET_OPTION_NOT_SET) {
- return ERROR_SETUP;
- }
-
- consumer_socket_->connect();
- return ERROR_SUCCESS;
- }
+ ret = consumer_socket_->setSocketOption(
+ ConsumerCallbacksOptions::FWD_STRATEGY_CHANGE,
+ (StrategyCallback)[](
+ [[maybe_unused]] notification::Strategy strategy){
+ // nothing to do
+ });
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
- int run() {
- std::cout << "Starting download of " << configuration_.name << std::endl;
+ ret = consumer_socket_->setSocketOption(
+ ConsumerCallbacksOptions::REC_STRATEGY_CHANGE,
+ (StrategyCallback)[](
+ [[maybe_unused]] notification::Strategy strategy){
+ // nothing to do
+ });
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
- signals_.add(SIGINT);
- signals_.async_wait(
- [this](const std::error_code &, const int &) { io_service_.stop(); });
+ ret = consumer_socket_->setSocketOption(CURRENT_WINDOW_SIZE,
+ configuration_.window_);
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ getOutputStream()
+ << "ERROR -- Impossible to set the size of the window."
+ << std::endl;
+ return ERROR_SETUP;
+ }
- t_download_ = t_stats_ = utils::SteadyTime::now();
- consumer_socket_->consume(configuration_.name);
+ if (!configuration_.producer_certificate_.empty()) {
+ verifier = std::make_shared<AsymmetricVerifier>(
+ configuration_.producer_certificate_);
+ }
- io_service_.run();
- consumer_socket_->stop();
+ if (!configuration_.passphrase_.empty()) {
+ verifier =
+ std::make_shared<SymmetricVerifier>(configuration_.passphrase_);
+ }
- return ERROR_SUCCESS;
- }
+ verifier->setVerificationFailedCallback(
+ std::bind(&HIperfClient::Impl::ConsumerContext::onAuthFailed, this,
+ std::placeholders::_1, std::placeholders::_2));
- private:
- class RTCCallback : public ConsumerSocket::ReadCallback {
- static constexpr std::size_t mtu = HIPERF_MTU;
+ ret = consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER,
+ verifier);
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
- public:
- RTCCallback(Impl &hiperf_client) : client_(hiperf_client) {
- client_.configuration_.receive_buffer = utils::MemBuf::create(mtu);
- Packet::Format format =
- PayloadSize::getFormatFromName(client_.configuration_.name, false);
- payload_size_max_ =
- PayloadSize(format).getPayloadSizeMax(RTC_HEADER_SIZE);
- }
+ // Signer for aggregatd interests
+ std::shared_ptr<Signer> signer = std::make_shared<VoidSigner>();
+ if (!configuration_.aggr_interest_passphrase_.empty()) {
+ signer = std::make_shared<SymmetricSigner>(
+ CryptoSuite::HMAC_SHA256, configuration_.aggr_interest_passphrase_);
+ }
+ ret = consumer_socket_->setSocketOption(GeneralTransportOptions::SIGNER,
+ signer);
+ if (ret == SOCKET_OPTION_NOT_SET) return ERROR_SETUP;
- bool isBufferMovable() noexcept override { return false; }
+ if (configuration_.aggregated_interests_) {
+ ret = consumer_socket_->setSocketOption(
+ RtcTransportOptions::AGGREGATED_INTERESTS, true);
- void getReadBuffer(uint8_t **application_buffer,
- size_t *max_length) override {
- *application_buffer =
- client_.configuration_.receive_buffer->writableData();
- *max_length = mtu;
- }
+ if (ret == SOCKET_OPTION_NOT_SET) return ERROR_SETUP;
+ }
- void readDataAvailable(std::size_t length) noexcept override {
- client_.received_bytes_ += length;
- client_.received_data_pkt_++;
+ ret = consumer_socket_->setSocketOption(
+ ConsumerCallbacksOptions::INTEREST_OUTPUT,
+ (ConsumerInterestCallback)std::bind(
+ &ConsumerContext::processLeavingInterest, this,
+ std::placeholders::_1, std::placeholders::_2));
- // collecting delay stats. Just for performance testing
- uint64_t *senderTimeStamp =
- (uint64_t *)client_.configuration_.receive_buffer->writableData();
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
- auto now = utils::SystemTime::nowMs().count();
- double new_delay = (double)(now - *senderTimeStamp);
+ ret = consumer_socket_->setSocketOption(
+ ConsumerCallbacksOptions::READ_CALLBACK, this);
- if (*senderTimeStamp > now)
- new_delay = -1 * (double)(*senderTimeStamp - now);
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
- client_.delay_sample_++;
- client_.avg_data_delay_ =
- client_.avg_data_delay_ +
- (new_delay - client_.avg_data_delay_) / client_.delay_sample_;
+ ret = consumer_socket_->setSocketOption(
+ ConsumerCallbacksOptions::STATS_SUMMARY,
+ (ConsumerTimerCallback)std::bind(
+ &Impl::ConsumerContext::handleTimerExpiration, this,
+ std::placeholders::_1, std::placeholders::_2));
- if (client_.configuration_.test_mode_) {
- client_.data_delays_ += std::to_string(int(new_delay));
- client_.data_delays_ += ",";
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
}
- if (client_.configuration_.relay_) {
- client_.producer_socket_->produceDatagram(
- client_.configuration_.relay_name_.getName(),
- client_.configuration_.receive_buffer->writableData(),
- length < payload_size_max_ ? length : payload_size_max_);
- }
- if (client_.configuration_.output_stream_mode_) {
- uint8_t *start =
- (uint8_t *)client_.configuration_.receive_buffer->writableData();
- start += sizeof(uint64_t);
- std::size_t pkt_len = length - sizeof(uint64_t);
- client_.socket_.send_to(asio::buffer(start, pkt_len), client_.remote_);
+ if (consumer_socket_->setSocketOption(
+ GeneralTransportOptions::STATS_INTERVAL,
+ configuration_.report_interval_milliseconds_) ==
+ SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
}
- }
- size_t maxBufferSize() const override { return mtu; }
+ consumer_socket_->connect();
- void readError(const std::error_code &ec) noexcept override {
- std::cerr << "Error while reading from RTC socket" << std::endl;
- client_.io_service_.stop();
+ return ERROR_SUCCESS;
}
- void readSuccess(std::size_t total_size) noexcept override {
- std::cout << "Data successfully read" << std::endl;
+ /***************************************************************
+ * Run functions
+ ***************************************************************/
+
+ int run() {
+ getOutputStream() << "Starting download of " << flow_name_ << std::endl;
+
+ saved_stats_.t_download_ = saved_stats_.t_stats_ =
+ utils::SteadyTime::now();
+ consumer_socket_->consume(flow_name_);
+
+ return ERROR_SUCCESS;
}
- private:
- Impl &client_;
+ // Members initialized by the constructor
+ std::shared_ptr<utils::MemBuf> receive_buffer_;
+ asio::ip::udp::socket socket_;
std::size_t payload_size_max_;
+ asio::ip::udp::endpoint remote_;
+ std::uint32_t nb_iterations_;
+
+ // Members initialized by in-class initializer
+ SavedStatistics saved_stats_{};
+ uint16_t header_counter_{0};
+ bool first_{true};
+ std::unique_ptr<ConsumerSocket> consumer_socket_;
+ std::unique_ptr<ProducerSocket> producer_socket_;
};
- class Callback : public ConsumerSocket::ReadCallback {
- public:
- Callback(Impl &hiperf_client) : client_(hiperf_client) {
- client_.configuration_.receive_buffer =
- utils::MemBuf::create(client_.configuration_.receive_buffer_size_);
- }
+ public:
+ explicit Impl(const hiperf::ClientConfiguration &conf)
+ : config_(conf), signals_(io_service_) {}
- bool isBufferMovable() noexcept override { return false; }
+ virtual ~Impl() = default;
- void getReadBuffer(uint8_t **application_buffer,
- size_t *max_length) override {
- *application_buffer =
- client_.configuration_.receive_buffer->writableData();
- *max_length = client_.configuration_.receive_buffer_size_;
+ int setup() {
+ int ret = ensureFlows(config_.name_, config_.parallel_flows_);
+ if (ret != ERROR_SUCCESS) {
+ return ret;
}
- void readDataAvailable(std::size_t length) noexcept override {}
-
- void readBufferAvailable(
- std::unique_ptr<utils::MemBuf> &&buffer) noexcept override {}
+ consumer_contexts_.reserve(config_.parallel_flows_);
+ for (uint32_t i = 0; i < config_.parallel_flows_; i++) {
+ auto &ctx = consumer_contexts_.emplace_back(*this, i);
+ ret = ctx.setup();
- size_t maxBufferSize() const override {
- return client_.configuration_.receive_buffer_size_;
- }
-
- void readError(const std::error_code &ec) noexcept override {
- std::cerr << "Error " << ec.message() << " while reading from socket"
- << std::endl;
- client_.io_service_.stop();
+ if (ret) {
+ break;
+ }
}
- void readSuccess(std::size_t total_size) noexcept override {
- auto t2 = utils::SteadyTime::now();
- auto dt = utils::SteadyTime::getDurationUs(client_.t_download_, t2);
- long usec = (long)dt.count();
-
- std::cout << "Content retrieved. Size: " << total_size << " [Bytes]"
- << std::endl;
+ return ret;
+ }
- std::cerr << "Elapsed Time: " << usec / 1000000.0 << " seconds -- "
- << (total_size * 8) * 1.0 / usec * 1.0 << " [Mbps]"
- << std::endl;
+ int run() {
+ signals_.add(SIGINT);
+ signals_.async_wait(
+ [this](const std::error_code &, const int &) { io_service_.stop(); });
- client_.io_service_.stop();
+ for (auto &consumer_context : consumer_contexts_) {
+ consumer_context.run();
}
- private:
- Impl &client_;
- };
+ io_service_.run();
- hiperf::ClientConfiguration configuration_;
- utils::SteadyTime::TimePoint t_stats_;
- utils::SteadyTime::TimePoint t_download_;
- uint32_t total_duration_milliseconds_;
- uint64_t old_bytes_value_;
- uint64_t old_interest_tx_value_;
- uint64_t old_fec_interest_tx_value_;
- uint64_t old_fec_data_rx_value_;
- uint64_t old_lost_data_value_;
- uint64_t old_bytes_recovered_value_;
- uint64_t old_definitely_lost_data_value_;
- uint32_t old_retx_value_;
- uint32_t old_sent_int_value_;
- uint32_t old_received_nacks_value_;
- uint32_t old_fec_pkt_;
-
- // IMPORTANT: to be used only for performance testing, when consumer and
- // producer are synchronized. Used for rtc only at the moment
- double avg_data_delay_;
- uint32_t delay_sample_;
-
- uint32_t received_bytes_;
- uint32_t received_data_pkt_;
- uint32_t auth_alerts_;
-
- std::string data_delays_;
+ return ERROR_SUCCESS;
+ }
+ ClientConfiguration &getConfig() { return config_; }
+
+ private:
asio::io_service io_service_;
+ hiperf::ClientConfiguration config_;
asio::signal_set signals_;
- RTCCallback rtc_callback_;
- Callback callback_;
- std::unique_ptr<ConsumerSocket> consumer_socket_;
- std::unique_ptr<ProducerSocket> producer_socket_;
- asio::ip::udp::socket socket_;
- asio::ip::udp::endpoint remote_;
-
- ForwarderConfiguration config_;
- // uint16_t switch_threshold_; /* ms */
- bool fwd_connected_;
- bool use_bestpath_;
- uint32_t rtt_threshold_; /* ms */
- uint32_t loss_threshold_; /* ms */
- std::string prefix_name_; // bestpath route
- uint32_t prefix_len_;
- // bool done_;
-
- std::vector<ForwarderInterface::RouteInfoPtr> main_routes_;
- std::vector<ForwarderInterface::RouteInfoPtr> backup_routes_;
- uint16_t header_counter_mask_;
- uint16_t header_counter_;
-
- bool print_headers_;
- bool first_;
-
- ForwarderInterface forwarder_interface_;
+ std::vector<ConsumerContext> consumer_contexts_;
};
-HIperfClient::HIperfClient(const ClientConfiguration &conf) {
- impl_ = new Impl(conf);
-}
-
-HIperfClient::HIperfClient(HIperfClient &&other) {
- impl_ = other.impl_;
- other.impl_ = nullptr;
-}
-
-HIperfClient &HIperfClient::operator=(HIperfClient &&other) {
- if (this != &other) {
- impl_ = other.impl_;
- other.impl_ = nullptr;
- }
-
- return *this;
-}
+HIperfClient::HIperfClient(const ClientConfiguration &conf)
+ : impl_(std::make_unique<Impl>(conf)) {}
-HIperfClient::~HIperfClient() { delete impl_; }
+HIperfClient::~HIperfClient() = default;
-int HIperfClient::setup() { return impl_->setup(); }
+int HIperfClient::setup() const { return impl_->setup(); }
-void HIperfClient::run() { impl_->run(); }
+void HIperfClient::run() const { impl_->run(); }
} // namespace hiperf
diff --git a/apps/hiperf/src/client.h b/apps/hiperf/src/client.h
index bc80c874c..c4c6bc2ae 100644
--- a/apps/hiperf/src/client.h
+++ b/apps/hiperf/src/client.h
@@ -20,19 +20,17 @@
namespace hiperf {
-class HIperfClient : ::utils::NonCopyable {
+class HIperfClient : private ::utils::NonCopyable {
public:
- HIperfClient(const ClientConfiguration &conf);
- HIperfClient(HIperfClient &&other);
- HIperfClient &operator=(HIperfClient &&other);
+ explicit HIperfClient(const ClientConfiguration &conf);
~HIperfClient();
- int setup();
- void run();
+ int setup() const;
+ void run() const;
private:
class Impl;
- Impl *impl_;
+ std::unique_ptr<Impl> impl_;
};
} // namespace hiperf \ No newline at end of file
diff --git a/apps/hiperf/src/common.h b/apps/hiperf/src/common.h
index 3a90f3732..5143afe31 100644
--- a/apps/hiperf/src/common.h
+++ b/apps/hiperf/src/common.h
@@ -15,17 +15,15 @@
#pragma once
-#include <forwarder_interface.h>
#include <hicn/transport/auth/signer.h>
#include <hicn/transport/config.h>
#include <hicn/transport/core/content_object.h>
#include <hicn/transport/core/interest.h>
#include <hicn/transport/interfaces/global_conf_interface.h>
-#include <hicn/transport/interfaces/p2psecure_socket_consumer.h>
-#include <hicn/transport/interfaces/p2psecure_socket_producer.h>
#include <hicn/transport/interfaces/socket_consumer.h>
#include <hicn/transport/interfaces/socket_producer.h>
#include <hicn/transport/utils/chrono_typedefs.h>
+#include <hicn/transport/utils/color.h>
#include <hicn/transport/utils/literals.h>
#ifndef _WIN32
@@ -36,6 +34,7 @@
#include <cmath>
#include <fstream>
#include <iomanip>
+#include <iostream>
#include <sstream>
#include <string>
#include <unordered_set>
@@ -53,27 +52,112 @@ using namespace transport::interface;
using namespace transport::auth;
using namespace transport::core;
-static inline uint64_t _ntohll(const uint64_t *input) {
- uint64_t return_val;
- uint8_t *tmp = (uint8_t *)&return_val;
+namespace hiperf {
- tmp[0] = *input >> 56;
- tmp[1] = *input >> 48;
- tmp[2] = *input >> 40;
- tmp[3] = *input >> 32;
- tmp[4] = *input >> 24;
- tmp[5] = *input >> 16;
- tmp[6] = *input >> 8;
- tmp[7] = *input >> 0;
+/**
+ * Logger
+ */
+static std::ostream &Logger() { return std::cout; }
- return return_val;
-}
+template <typename D, typename ConfType, typename ParentType>
+class Base : protected std::stringbuf, protected std::ostream {
+ protected:
+ static inline const char separator[] = "| ";
-static inline uint64_t _htonll(const uint64_t *input) {
- return (_ntohll(input));
-}
+ Base(ParentType &parent, asio::io_service &io_service, int identifier)
+ : std::stringbuf(),
+ std::ostream(this),
+ parent_(parent),
+ configuration_(parent_.getConfig()),
+ io_service_(io_service),
+ identifier_(identifier),
+ name_id_(D::getContextType() + std::to_string(identifier_)),
+ flow_name_(configuration_.name_.makeNameWithIndex(identifier_)) {
+ std::stringstream begin;
+ std::stringstream end;
+ if (configuration_.colored_) {
+ begin << color_mod_ << bold_mod_;
+ end << end_mod_;
+ } else {
+ begin << "";
+ end << "";
+ }
-namespace hiperf {
+ begin << "|" << name_id_ << separator;
+ begin_ = begin.str();
+ end_ = end.str();
+ }
+
+ Base(Base &&other)
+ : parent_(other.parent_),
+ configuration_(other.configuration_),
+ io_service_(other.io_service_),
+ identifier_(other.identifier_),
+ name_id_(std::move(other.name_id_)),
+ flow_name_(other.flow_name_) {}
+
+ /***************************************************************
+ * std::stringbuf sync override
+ ***************************************************************/
+
+ int sync() override {
+ auto string = str();
+ asio::post(io_service_,
+ [this, string]() { Logger() << begin_ << string << end_; });
+ str("");
+
+ return 0;
+ }
+
+ std::ostream &getOutputStream() { return *this; }
+
+ // Members initialized by the constructor
+ ParentType &parent_;
+ ConfType &configuration_;
+ asio::io_service &io_service_;
+ int identifier_;
+ std::string name_id_;
+ transport::core::Name flow_name_;
+ std::string begin_;
+ std::string end_;
+
+ // Members initialized by the in-class initializer
+ utils::ColorModifier color_mod_;
+ utils::ColorModifier bold_mod_{utils::ColorModifier::Code::BOLD};
+ utils::ColorModifier end_mod_{utils::ColorModifier::Code::RESET};
+};
+
+static inline int ensureFlows(const Prefix &prefix, std::size_t flows) {
+ int ret = ERROR_SUCCESS;
+
+ // Make sure the provided prefix length not allows to accomodate the
+ // provided number of flows.
+ uint16_t max_ip_addr_len_bits;
+ uint16_t log2_n_flow;
+ u64 max_n_flow;
+ if (prefix.getAddressFamily() == AF_INET) {
+ max_ip_addr_len_bits = IPV4_ADDR_LEN_BITS;
+ } else if (prefix.getAddressFamily() == AF_INET6) {
+ max_ip_addr_len_bits = IPV6_ADDR_LEN_BITS;
+ } else {
+ Logger() << "Error: unknown address family." << std::endl;
+ ret = ERROR_SETUP;
+ goto end;
+ }
+
+ log2_n_flow = max_ip_addr_len_bits - prefix.getPrefixLength();
+ max_n_flow = log2_n_flow < 64 ? (1 << log2_n_flow) : ~0ULL;
+
+ if (flows > max_n_flow) {
+ Logger() << "Error: the provided prefix length does not allow to "
+ "accomodate the provided number of flows ("
+ << flows << " > " << max_n_flow << ")." << std::endl;
+ ret = ERROR_SETUP;
+ }
+
+end:
+ return ret;
+}
/**
* Class to retrieve the maximum payload size given the MTU and packet headers.
@@ -90,8 +174,9 @@ class PayloadSize {
transport_size - fec_size;
}
- static Packet::Format getFormatFromName(Name name, bool ah = false) {
- switch (name.getAddressFamily()) {
+ static Packet::Format getFormatFromPrefix(const Prefix &prefix,
+ bool ah = false) {
+ switch (prefix.getAddressFamily()) {
case AF_INET:
return ah ? HF_INET_TCP_AH : HF_INET_TCP;
case AF_INET6:
@@ -158,124 +243,69 @@ struct packet_t {
uint32_t size;
};
+struct Configuration {
+ Prefix name_{"b001::abcd/64"};
+ std::string passphrase_;
+ std::string aggr_interest_passphrase_;
+ bool rtc_{false};
+ uint16_t port_{0};
+ bool aggregated_data_{false};
+ Packet::Format packet_format_{default_values::packet_format};
+ uint32_t parallel_flows_{1};
+ bool colored_{true};
+};
+
/**
* Container for command line configuration for hiperf client.
*/
-struct ClientConfiguration {
- ClientConfiguration()
- : name("b001::abcd", 0),
- beta(-1.f),
- drop_factor(-1.f),
- window(-1),
- producer_certificate(""),
- passphrase(""),
- receive_buffer(nullptr),
- receive_buffer_size_(128 * 1024),
- download_size(0),
- report_interval_milliseconds_(1000),
- transport_protocol_(CBR),
- rtc_(false),
- test_mode_(false),
- relay_(false),
- secure_(false),
- producer_prefix_(),
- interest_lifetime_(500),
- unverified_interval_(10000),
- unverified_ratio_(0.2),
- relay_name_("c001::abcd/64"),
- output_stream_mode_(false),
- port_(0),
- recovery_strategy_(4),
- aggregated_data_(false),
- packet_format_(default_values::packet_format),
- print_headers_(true),
- nb_iterations_(std::numeric_limits<decltype(nb_iterations_)>::max()) {}
-
- Name name;
- double beta;
- double drop_factor;
- double window;
- std::string producer_certificate;
- std::string passphrase;
- std::shared_ptr<utils::MemBuf> receive_buffer;
- std::size_t receive_buffer_size_;
- std::size_t download_size;
- std::uint32_t report_interval_milliseconds_;
- TransportProtocolAlgorithms transport_protocol_;
- bool rtc_;
- bool test_mode_;
- bool relay_;
- bool secure_;
+struct ClientConfiguration : public Configuration {
+ double beta_{-1.f};
+ double drop_factor_{-1.f};
+ double window_{-1.f};
+ std::string producer_certificate_;
+ std::string passphrase_;
+ std::size_t receive_buffer_size_{128 * 1024};
+ std::uint32_t report_interval_milliseconds_{1000};
+ TransportProtocolAlgorithms transport_protocol_{CBR};
+ bool test_mode_{false};
+ bool relay_{false};
Prefix producer_prefix_;
- uint32_t interest_lifetime_;
- uint32_t unverified_interval_;
- double unverified_ratio_;
- Prefix relay_name_;
- bool output_stream_mode_;
- uint16_t port_;
- uint32_t recovery_strategy_;
- bool aggregated_data_;
- Packet::Format packet_format_;
- bool print_headers_;
- std::uint32_t nb_iterations_;
- forwarder_type_t forwarder_type_;
+ uint32_t interest_lifetime_{500};
+ uint32_t manifest_factor_relevant_{100};
+ uint32_t manifest_factor_alert_{20};
+ Prefix relay_name_{"c001::abcd/64"};
+ bool output_stream_mode_{false};
+ uint32_t recovery_strategy_{4};
+ bool print_headers_{true};
+ std::uint32_t nb_iterations_{
+ std::numeric_limits<decltype(nb_iterations_)>::max()};
+ bool content_sharing_mode_{false};
+ bool aggregated_interests_{false};
};
/**
* Container for command line configuration for hiperf server.
*/
-struct ServerConfiguration {
- ServerConfiguration()
- : name("b001::abcd/64"),
- virtual_producer(true),
- manifest(0),
- live_production(false),
- content_lifetime(600000000_U32),
- download_size(20 * 1024 * 1024),
- hash_algorithm_(CryptoHashType::SHA256),
- keystore_name(""),
- passphrase(""),
- keystore_password("cisco"),
- multiphase_produce_(false),
- rtc_(false),
- interactive_(false),
- trace_based_(false),
- trace_index_(0),
- trace_file_(nullptr),
- production_rate_(std::string("2048kbps")),
- payload_size_(1384),
- secure_(false),
- input_stream_mode_(false),
- port_(0),
- aggregated_data_(false),
- fec_type_(""),
- packet_format_(default_values::packet_format) {}
-
- Prefix name;
- bool virtual_producer;
- std::uint32_t manifest;
- bool live_production;
- std::uint32_t content_lifetime;
- std::uint32_t download_size;
- CryptoHashType hash_algorithm_;
- std::string keystore_name;
- std::string passphrase;
- std::string keystore_password;
- bool multiphase_produce_;
- bool rtc_;
- bool interactive_;
- bool trace_based_;
- std::uint32_t trace_index_;
- char *trace_file_;
- Rate production_rate_;
- std::size_t payload_size_;
- bool secure_;
- bool input_stream_mode_;
- uint16_t port_;
+struct ServerConfiguration : public Configuration {
+ bool virtual_producer_{true};
+ std::uint32_t manifest_max_capacity_{0};
+ bool live_production_{false};
+ std::uint32_t content_lifetime_{
+ transport::interface::default_values::content_object_expiry_time};
+ std::uint32_t download_size_{20 * 1024 * 1024};
+ CryptoHashType hash_algorithm_{CryptoHashType::SHA256};
+ std::string keystore_name_;
+ std::string keystore_password_{"cisco"};
+ bool multiphase_produce_{false};
+ bool interactive_{false};
+ bool trace_based_{false};
+ std::uint32_t trace_index_{0};
+ char *trace_file_{nullptr};
+ Rate production_rate_{"2048kbps"};
+ std::size_t payload_size_{1384};
+ bool input_stream_mode_{false};
std::vector<struct packet_t> trace_;
- bool aggregated_data_;
std::string fec_type_;
- Packet::Format packet_format_;
};
} // namespace hiperf
diff --git a/apps/hiperf/src/forwarder_config.h b/apps/hiperf/src/forwarder_config.h
deleted file mode 100644
index aaac14839..000000000
--- a/apps/hiperf/src/forwarder_config.h
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Copyright (c) 2021 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <string>
-#include <unordered_map>
-#include <vector>
-
-namespace hiperf {
-
-struct ListenerConfig {
- std::string address;
- std::uint16_t port;
- std::string interface;
- std::string name;
-};
-
-struct ConnectorConfig {
- std::string local_address;
- std::uint16_t local_port;
- std::string remote_address;
- std::uint16_t remote_port;
- std::string interface;
- std::string name;
-};
-
-struct RouteConfig {
- std::string prefix;
- uint16_t weight;
- std::string main_connector;
- std::string backup_connector;
- std::string name;
-};
-
-class ForwarderConfiguration {
- public:
- ForwarderConfiguration() : n_threads_(1) {}
-
- bool empty() {
- return listeners_.empty() && connectors_.empty() && routes_.empty();
- }
-
- ForwarderConfiguration &setThreadNumber(std::size_t threads) {
- n_threads_ = threads;
- return *this;
- }
-
- std::size_t getThreadNumber() { return n_threads_; }
-
- template <typename... Args>
- ForwarderConfiguration &addListener(Args &&...args) {
- listeners_.emplace_back(std::forward<Args>(args)...);
- return *this;
- }
-
- template <typename... Args>
- ForwarderConfiguration &addConnector(const std::string &name,
- Args &&...args) {
- connectors_.emplace(name, std::forward<Args>(args)...);
- return *this;
- }
-
- template <typename... Args>
- ForwarderConfiguration &addRoute(Args &&...args) {
- routes_.emplace_back(std::forward<Args>(args)...);
- return *this;
- }
-
- std::vector<ListenerConfig> &getListeners() { return listeners_; }
-
- std::unordered_map<std::string, ConnectorConfig> &getConnectors() {
- return connectors_;
- }
-
- std::vector<RouteConfig> &getRoutes() { return routes_; }
-
- private:
- std::vector<ListenerConfig> listeners_;
- std::unordered_map<std::string, ConnectorConfig> connectors_;
- std::vector<RouteConfig> routes_;
- std::size_t n_threads_;
-};
-
-} // namespace hiperf \ No newline at end of file
diff --git a/apps/hiperf/src/forwarder_interface.cc b/apps/hiperf/src/forwarder_interface.cc
deleted file mode 100644
index e87a5953d..000000000
--- a/apps/hiperf/src/forwarder_interface.cc
+++ /dev/null
@@ -1,696 +0,0 @@
-/*
- * Copyright (c) 2021 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <arpa/inet.h>
-#include <forwarder_interface.h>
-#include <hicn/transport/utils/log.h>
-
-#include <chrono>
-#include <iostream>
-#include <thread>
-#include <unordered_set>
-
-extern "C" {
-#include <hicn/error.h>
-#include <hicn/util/ip_address.h>
-#include <hicn/util/sstrncpy.h>
-}
-
-// XXX the main listener should be retrieve in this class at initialization, aka
-// when hICN becomes avialable
-//
-// XXX the main listener port will be retrieved in the forwarder
-// interface... everything else will be delayed until we have this
-// information
-
-namespace hiperf {
-
-ForwarderInterface::ForwarderInterface(asio::io_service &io_service)
- : external_ioservice_(io_service), timer_(io_service) {}
-
-ForwarderInterface::ForwarderInterface(asio::io_service &io_service,
- ICallback *callback,
- forwarder_type_t fwd_type)
- : external_ioservice_(io_service), timer_(io_service) {
- initForwarderInterface(callback, fwd_type);
-}
-
-ForwarderInterface::~ForwarderInterface() {
- if (thread_ && thread_->joinable()) {
- internal_ioservice_.dispatch([this]() {
- if (sock_) {
- hc_sock_free(sock_);
- sock_ = nullptr;
- }
-
- work_.reset();
- });
-
- thread_->join();
- }
-}
-
-void ForwarderInterface::initForwarderInterface(ICallback *callback,
- forwarder_type_t fwd_type) {
- forwarder_interface_callback_ = callback;
- work_ = std::make_unique<asio::io_service::work>(internal_ioservice_);
- sock_ = nullptr;
- thread_ = std::make_unique<std::thread>([this]() {
- std::cout << "Starting Forwarder Interface thread" << std::endl;
- internal_ioservice_.run();
- std::cout << "Stopping Forwarder Interface thread" << std::endl;
- });
- check_routes_timer_ = nullptr;
- pending_add_route_counter_ = 0;
- hicn_listen_port_ = 9695;
- /* We start in disabled state even when a forwarder is always available */
- state_ = State::Disabled;
- fwd_type_ = fwd_type;
- num_reattempts = 0;
- std::cout << "Forwarder interface created... connecting to forwarder...\n";
- internal_ioservice_.post([this]() { onHicnServiceAvailable(true); });
-}
-
-void ForwarderInterface::onHicnServiceAvailable(bool flag) {
- if (flag) {
- switch (state_) {
- case State::Disabled:
- case State::Requested:
- state_ = State::Available;
- case State::Available:
- connectToForwarder();
- /* Synchronous */
- if (state_ != State::Connected) {
- std::cout << "ConnectToForwarder failed" << std::endl;
- goto REATTEMPT;
- }
- state_ = State::Ready;
-
- std::cout << "Connected to forwarder... cancelling reconnection timer"
- << std::endl;
-
- timer_.cancel();
- num_reattempts = 0;
-
- std::cout << "Forwarder interface is ready... communicate to controller"
- << std::endl;
-
- forwarder_interface_callback_->onHicnServiceReady();
- case State::Connected:
- case State::Ready:
- break;
- }
- } else {
- if (sock_) {
- hc_sock_free(sock_);
- sock_ = nullptr;
- }
- state_ = State::Disabled; // XXX to be checked upon callback to prevent the
- // state from going forward (used to manage
- // concurrency)
- }
- return;
-
-REATTEMPT:
- /* Schedule reattempt */
- std::cout << "Failed to connect, scheduling reattempt" << std::endl;
- num_reattempts++;
-
- timer_.expires_from_now(
- std::chrono::milliseconds(ForwarderInterface::REATTEMPT_DELAY_MS));
- // timer_.async_wait(std::bind(&ForwarderInterface::onHicnServiceAvailable,
- // this, flag, std::placeholders::_1));
- timer_.async_wait([this, flag](const std::error_code &ec) {
- if (ec) return;
- onHicnServiceAvailable(flag);
- });
-}
-
-int ForwarderInterface::connectToForwarder() {
- sock_ = hc_sock_create_forwarder(fwd_type_);
- if (!sock_) {
- std::cout << "Could not create socket" << std::endl;
- goto ERR_SOCK;
- }
-
- if (hc_sock_connect(sock_) < 0) {
- std::cout << "Could not connect to forwarder" << std::endl;
- goto ERR;
- }
-
- std::cout << "Forwarder interface connected" << std::endl;
- state_ = State::Connected;
- return 0;
-
-ERR:
- hc_sock_free(sock_);
- sock_ = nullptr;
-ERR_SOCK:
- return -1;
-}
-
-int ForwarderInterface::checkListener() {
- if (!sock_) return -1;
-
- hc_data_t *data;
- if (hc_listener_list(sock_, &data) < 0) return -1;
-
- int ret = -1;
- foreach_listener(l, data) {
- std::string interface = std::string(l->interface_name);
- if (interface.compare("lo") != 0) {
- hicn_listen_port_ = l->local_port;
- state_ = State::Ready;
- ret = 0;
- std::cout << "Got listener port" << std::endl;
- break;
- }
- }
-
- hc_data_free(data);
- return ret;
-}
-
-void ForwarderInterface::close() {
- std::cout << "ForwarderInterface::close" << std::endl;
-
- state_ = State::Disabled;
- /* Cancelling eventual reattempts */
- timer_.cancel();
-
- if (sock_) {
- hc_sock_free(sock_);
- sock_ = nullptr;
- }
-
- internal_ioservice_.post([this]() { work_.reset(); });
-
- if (thread_->joinable()) {
- thread_->join();
- }
-}
-
-#if 0
-void ForwarderInterface::enableCheckRoutesTimer() {
- if (check_routes_timer_ != nullptr) return;
-
- check_routes_timer_ =
- std::make_unique<asio::steady_timer>(internal_ioservice_);
- checkRoutesLoop();
-}
-
-void ForwarderInterface::removeConnectedUserNow(ProtocolPtr protocol) {
- internalRemoveConnectedUser(protocol);
-}
-
-void ForwarderInterface::scheduleRemoveConnectedUser(ProtocolPtr protocol) {
- internal_ioservice_.post(
- [this, protocol]() { internalRemoveConnectedUser(protocol); });
-}
-#endif
-
-void ForwarderInterface::createFaceAndRoute(const RouteInfoPtr &route_info) {
- std::vector<RouteInfoPtr> routes;
- routes.push_back(std::move(route_info));
- createFaceAndRoutes(routes);
-}
-
-void ForwarderInterface::createFaceAndRoutes(
- const std::vector<RouteInfoPtr> &routes_info) {
- pending_add_route_counter_++;
- auto timer = new asio::steady_timer(internal_ioservice_);
- internal_ioservice_.post([this, routes_info, timer]() {
- internalCreateFaceAndRoutes(routes_info, ForwarderInterface::MAX_REATTEMPT,
- timer);
- });
-}
-
-void ForwarderInterface::deleteFaceAndRoute(const RouteInfoPtr &route_info) {
- std::vector<RouteInfoPtr> routes;
- routes.push_back(std::move(route_info));
- deleteFaceAndRoutes(routes);
-}
-
-void ForwarderInterface::deleteFaceAndRoutes(
- const std::vector<RouteInfoPtr> &routes_info) {
- internal_ioservice_.post([this, routes_info]() {
- for (auto &route : routes_info) {
- internalDeleteFaceAndRoute(route);
- }
- });
-}
-
-void ForwarderInterface::setStrategy(std::string prefix, uint32_t prefix_len,
- std::string strategy) {
- if (!sock_) return;
-
- ip_address_t ip_prefix;
- if (ip_address_pton(prefix.c_str(), &ip_prefix) < 0) {
- return;
- }
-
- strategy_type_t strategy_type = strategy_type_from_str(strategy.c_str());
- if (strategy_type == STRATEGY_TYPE_UNDEFINED) return;
-
- hc_strategy_t strategy_conf;
- strategy_conf.address = ip_prefix;
- strategy_conf.len = prefix_len;
- strategy_conf.family = AF_INET6;
- strategy_conf.type = strategy_type;
-
- hc_strategy_set(sock_, &strategy_conf);
-}
-
-void ForwarderInterface::internalDeleteFaceAndRoute(
- const RouteInfoPtr &route_info) {
- if (!sock_) return;
-
- hc_data_t *data;
- if (hc_route_list(sock_, &data) < 0) return;
-
- std::vector<hc_route_t *> routes_to_remove;
- foreach_route(r, data) {
- char remote_addr[INET6_ADDRSTRLEN];
- int ret = ip_address_ntop(&r->remote_addr, remote_addr, r->len, r->family);
- if (ret < 0) continue;
-
- std::string route_addr(remote_addr);
- if (route_addr.compare(route_info->route_addr) == 0 &&
- r->len == route_info->route_len) {
- // route found
- routes_to_remove.push_back(r);
- }
- }
-
- if (routes_to_remove.size() == 0) {
- // nothing to do here
- hc_data_free(data);
- return;
- }
-
- std::unordered_set<uint32_t> connids_to_remove;
- for (unsigned i = 0; i < routes_to_remove.size(); i++) {
- connids_to_remove.insert(routes_to_remove[i]->face_id);
- if (hc_route_delete(sock_, routes_to_remove[i]) < 0) {
- std::cout << "Error removing route from forwarder." << std::endl;
- }
- }
-
- // remove connection
- if (hc_connection_list(sock_, &data) < 0) {
- hc_data_free(data);
- return;
- }
-
- // collects pointerst to the connections using the conn IDs
- std::vector<hc_connection_t *> conns_to_remove;
- foreach_connection(c, data) {
- if (connids_to_remove.find(c->id) != connids_to_remove.end()) {
- // conn found
- conns_to_remove.push_back(c);
- }
- }
-
- if (conns_to_remove.size() == 0) {
- // nothing else to do here
- hc_data_free(data);
- return;
- }
-
- for (unsigned i = 0; i < conns_to_remove.size(); i++) {
- if (hc_connection_delete(sock_, conns_to_remove[i]) < 0) {
- std::cout << "Error removing connection from forwarder." << std::endl;
- }
- }
-
- hc_data_free(data);
-}
-
-void ForwarderInterface::internalCreateFaceAndRoutes(
- const std::vector<RouteInfoPtr> &route_info, uint8_t max_try,
- asio::steady_timer *timer) {
- uint32_t face_id;
-
- std::vector<RouteInfoPtr> failed;
- for (auto &route : route_info) {
- int ret = tryToCreateFace(route.get(), &face_id);
- if (ret >= 0) {
- auto ret = tryToCreateRoute(route.get(), face_id);
- if (ret < 0) {
- failed.push_back(route);
- std::cerr << "Error creating route and face" << std::endl;
- continue;
- }
- }
- }
-
- if (failed.size() > 0) {
- if (max_try == 0) {
- /* All attempts failed */
- goto RESULT;
- }
- max_try--;
- timer->expires_from_now(std::chrono::milliseconds(500));
- timer->async_wait(
- [this, failed, max_try, timer](const std::error_code &ec) {
- if (ec) return;
- internalCreateFaceAndRoutes(failed, max_try, timer);
- });
- return;
- }
-
-#if 0
- // route_status_[protocol] = std::move(route_info);
- for (size_t i = 0; i < route_info.size(); i++) {
- route_status_.insert(
- std::pair<ClientId, RouteInfoPtr>(protocol, std::move(route_info[i])));
- }
-#endif
-
-RESULT:
- std::cout << "Face / Route create ok, now calling back protocol" << std::endl;
- pending_add_route_counter_--;
- external_ioservice_.post([this, r = std::move(route_info)]() mutable {
- forwarder_interface_callback_->onRouteConfigured(r);
- });
- delete timer;
-}
-
-int ForwarderInterface::tryToCreateFace(RouteInfo *route_info,
- uint32_t *face_id) {
- bool found = false;
-
- // check connection with the forwarder
- if (!sock_) {
- std::cout << "[ForwarderInterface::tryToCreateFace] socket error"
- << std::endl;
- goto ERR_SOCK;
- }
-
- // get listeners list
- hc_data_t *data;
- if (hc_listener_list(sock_, &data) < 0) {
- std::cout << "[ForwarderInterface::tryToCreateFace] cannot list listeners";
- goto ERR_LIST;
- }
-
- char _local_address[128];
- foreach_listener(l, data) {
- std::cout << "Processing " << l->interface_name << std::endl;
- std::string interface = std::string(l->interface_name);
- int ret = ip_address_ntop(&l->local_addr, _local_address, 128, AF_INET);
- if (ret < 0) {
- std::cerr << "Error in ip_address_ntop" << std::endl;
- goto ERR;
- }
-
- std::string local_address = std::string(_local_address);
- uint16_t local_port = l->local_port;
-
- if (interface.compare(route_info->interface) == 0 &&
- local_address.compare(route_info->local_addr) == 0 &&
- local_port == route_info->local_port) {
- found = true;
- break;
- }
- }
-
- std::cout << route_info->remote_addr << std::endl;
-
- ip_address_t local_address, remote_address;
- ip_address_pton(route_info->local_addr.c_str(), &local_address);
- ip_address_pton(route_info->remote_addr.c_str(), &remote_address);
-
- if (!found) {
- // Create listener
- hc_listener_t listener;
- memset(&listener, 0, sizeof(hc_listener_t));
-
- std::string name = "l_" + route_info->name;
- listener.local_addr = local_address;
- listener.type = FACE_TYPE_UDP;
- listener.family = AF_INET;
- listener.local_port = route_info->local_port;
- int ret = strcpy_s(listener.name, SYMBOLIC_NAME_LEN - 1, name.c_str());
- if (ret < EOK) goto ERR;
- ret = strcpy_s(listener.interface_name, INTERFACE_LEN - 1,
- route_info->interface.c_str());
- if (ret < EOK) goto ERR;
-
- std::cout << "------------> " << route_info->interface << std::endl;
-
- ret = hc_listener_create(sock_, &listener);
-
- if (ret < 0) {
- std::cerr << "Error creating listener." << std::endl;
- return -1;
- } else {
- std::cout << "Listener " << listener.id << " created." << std::endl;
- }
- }
-
- // Create face
- hc_face_t face;
- memset(&face, 0, sizeof(hc_face_t));
-
- // crate face with the local interest
- face.face.type = FACE_TYPE_UDP;
- face.face.family = route_info->family;
- face.face.local_addr = local_address;
- face.face.remote_addr = remote_address;
- face.face.local_port = route_info->local_port;
- face.face.remote_port = route_info->remote_port;
-
- if (netdevice_set_name(&face.face.netdevice, route_info->interface.c_str()) <
- 0) {
- std::cout << "[ForwarderInterface::tryToCreateFaceAndRoute] "
- "netdevice_set_name "
- "("
- << face.face.netdevice.name << ", "
- << route_info->interface << ") error" << std::endl;
- goto ERR;
- }
-
- // create face
- if (hc_face_create(sock_, &face) < 0) {
- std::cout << "[ForwarderInterface::tryToCreateFace] error creating face";
- goto ERR;
- }
-
- std::cout << "Face created successfully" << std::endl;
-
- // assing face to the return value
- *face_id = face.id;
-
- hc_data_free(data);
- return 0;
-
-ERR:
- hc_data_free(data);
-ERR_LIST:
-ERR_SOCK:
- return -1;
-}
-
-int ForwarderInterface::tryToCreateRoute(RouteInfo *route_info,
- uint32_t face_id) {
- std::cout << "Trying to create route" << std::endl;
-
- // check connection with the forwarder
- if (!sock_) {
- std::cout << "[ForwarderInterface::tryToCreateRoute] socket error";
- return -1;
- }
-
- ip_address_t route_ip;
- hc_route_t route;
-
- if (ip_address_pton(route_info->route_addr.c_str(), &route_ip) < 0) {
- std::cout << "[ForwarderInterface::tryToCreateRoute] ip_address_pton error";
- return -1;
- }
-
- route.face_id = face_id;
- route.family = AF_INET6;
- route.remote_addr = route_ip;
- route.len = route_info->route_len;
- route.cost = 1;
-
- if (hc_route_create(sock_, &route) < 0) {
- std::cout << "[ForwarderInterface::tryToCreateRoute] error creating route";
- return -1;
- }
-
- std::cout << "[ForwarderInterface::tryToCreateRoute] OK" << std::endl;
- return 0;
-}
-
-#if 0 // not used
-void ForwarderInterface::checkRoutesLoop() {
- check_routes_timer_->expires_from_now(std::chrono::milliseconds(1000));
- check_routes_timer_->async_wait([this](const std::error_code &ec) {
- if (ec) return;
- if (pending_add_route_counter_ == 0) checkRoutes();
- });
-}
-
-void ForwarderInterface::checkRoutes() {
- std::cout << "someone called the checkRoutes function" << std::endl;
- if (!sock_) return;
-
- hc_data_t *data;
- if (hc_route_list(sock_, &data) < 0) {
- return;
- }
-
- std::unordered_set<std::string> routes_set;
- foreach_route(r, data) {
- char remote_addr[INET6_ADDRSTRLEN];
- int ret = ip_address_ntop(&r->remote_addr, remote_addr, r->len, r->family);
- if (ret < 0) continue;
- std::string route(std::string(remote_addr) + "/" + std::to_string(r->len));
- routes_set.insert(route);
- }
-
- for (auto it = route_status_.begin(); it != route_status_.end(); it++) {
- std::string route(it->second->route_addr + "/" +
- std::to_string(it->second->route_len));
- if (routes_set.find(route) == routes_set.end()) {
- // the route is missing
- createFaceAndRoute(it->second, it->first);
- break;
- }
- }
-
- hc_data_free(data);
-}
-#endif
-
-#if 0
- using ListenerRetrievedCallback =
- std::function<void(std::error_code, uint32_t)>;
-
- ListenerRetrievedCallback listener_retrieved_callback_;
-
-#ifdef __ANDROID__
- hicn_listen_port_(9695),
-#else
- hicn_listen_port_(0),
-#endif
- timer_(forward_engine_.getIoService()),
-
- void initConfigurationProtocol(void)
- {
- // We need the configuration, which is different for every protocol...
- // so we move this step down towards the protocol implementation itself.
- if (!permanent_hicn) {
- doInitConfigurationProtocol();
- } else {
- // XXX This should be moved somewhere else
- getMainListener(
- [this](const std::error_code &ec, uint32_t hicn_listen_port) {
- if (!ec)
- {
- hicn_listen_port_ = hicn_listen_port;
- doInitConfigurationProtocol();
- }
- });
- }
- }
-
- template <typename Callback>
- void getMainListener(Callback &&callback)
- {
- listener_retrieved_callback_ = std::forward<Callback &&>(callback);
- tryToConnectToForwarder();
- }
- private:
- void doGetMainListener(const std::error_code &ec)
- {
- if (!ec)
- {
- // ec == 0 --> timer expired
- int ret = forwarder_interface_.getMainListenerPort();
- if (ret <= 0)
- {
- // Since without the main listener of the forwarder the proxy cannot
- // work, we can stop the program here until we get the listener port.
- std::cout <<
- "Could not retrieve main listener port from the forwarder. "
- "Retrying.";
-
- timer_.expires_from_now(std::chrono::milliseconds(RETRY_INTERVAL));
- timer_.async_wait(std::bind(&Protocol::doGetMainListener, this,
- std::placeholders::_1));
- }
- else
- {
- timer_.cancel();
- retx_count_ = 0;
- hicn_listen_port_ = uint16_t(ret);
- listener_retrieved_callback_(
- make_error_code(configuration_error::success), hicn_listen_port_);
- }
- }
- else
- {
- std::cout << "Timer for retrieving main hicn listener canceled." << std::endl;
- }
- }
-
- void tryToConnectToForwarder()
- {
- doTryToConnectToForwarder(std::make_error_code(std::errc(0)));
- }
-
- void doTryToConnectToForwarder(const std::error_code &ec)
- {
- if (!ec)
- {
- // ec == 0 --> timer expired
- int ret = forwarder_interface_.connect();
- if (ret < 0)
- {
- // We were not able to connect to the local forwarder. Do not give up
- // and retry.
- std::cout << "Could not connect to local forwarder. Retrying." << std::endl;
-
- timer_.expires_from_now(std::chrono::milliseconds(RETRY_INTERVAL));
- timer_.async_wait(std::bind(&Protocol::doTryToConnectToForwarder, this,
- std::placeholders::_1));
- }
- else
- {
- timer_.cancel();
- retx_count_ = 0;
- doGetMainListener(std::make_error_code(std::errc(0)));
- }
- }
- else
- {
- std::cout << "Timer for re-trying forwarder connection canceled." << std::endl;
- }
- }
-
-
- template <typename ProtocolImplementation>
- constexpr uint32_t Protocol<ProtocolImplementation>::RETRY_INTERVAL;
-
-#endif
-
-constexpr uint32_t ForwarderInterface::REATTEMPT_DELAY_MS;
-constexpr uint32_t ForwarderInterface::MAX_REATTEMPT;
-
-} // namespace hiperf
diff --git a/apps/hiperf/src/forwarder_interface.h b/apps/hiperf/src/forwarder_interface.h
deleted file mode 100644
index e58989295..000000000
--- a/apps/hiperf/src/forwarder_interface.h
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Copyright (c) 2021 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <hicn/transport/utils/noncopyable.h>
-
-extern "C" {
-#ifndef WITH_POLICY
-#define WITH_POLICY
-#endif
-#include <hicn/ctrl/api.h>
-#include <hicn/util/ip_address.h>
-}
-
-#ifndef ASIO_STANDALONE
-#define ASIO_STANDALONE
-#endif
-#include <asio.hpp>
-#include <functional>
-#include <thread>
-#include <unordered_map>
-
-namespace hiperf {
-
-class ForwarderInterface : ::utils::NonCopyable {
- static const uint32_t REATTEMPT_DELAY_MS = 500;
- static const uint32_t MAX_REATTEMPT = 10;
-
- public:
- struct RouteInfo {
- int family;
- std::string local_addr;
- uint16_t local_port;
- std::string remote_addr;
- uint16_t remote_port;
- std::string route_addr;
- uint8_t route_len;
- std::string interface;
- std::string name;
- };
-
- using RouteInfoPtr = std::shared_ptr<RouteInfo>;
-
- class ICallback {
- public:
- virtual void onHicnServiceReady() = 0;
- virtual void onRouteConfigured(std::vector<RouteInfoPtr> &route_info) = 0;
- };
-
- enum class State {
- Disabled, /* Stack is stopped */
- Requested, /* Stack is starting */
- Available, /* Forwarder is running */
- Connected, /* Control socket connected */
- Ready, /* Listener present */
- };
-
- public:
- explicit ForwarderInterface(asio::io_service &io_service);
- explicit ForwarderInterface(asio::io_service &io_service, ICallback *callback,
- forwarder_type_t fwd_type);
-
- ~ForwarderInterface();
-
- void initForwarderInterface(ICallback *callback, forwarder_type_t fwd_type);
-
- State getState();
-
- void setState(State state);
-
- void onHicnServiceAvailable(bool flag);
-
- void enableCheckRoutesTimer();
-
- void createFaceAndRoutes(const std::vector<RouteInfoPtr> &routes_info);
-
- void createFaceAndRoute(const RouteInfoPtr &route_info);
-
- void deleteFaceAndRoutes(const std::vector<RouteInfoPtr> &routes_info);
-
- void deleteFaceAndRoute(const RouteInfoPtr &route_info);
-
- void setStrategy(std::string prefix, uint32_t prefix_len,
- std::string strategy);
-
- void close();
-
- uint16_t getHicnListenerPort() { return hicn_listen_port_; }
-
- private:
- ForwarderInterface &operator=(const ForwarderInterface &other) = delete;
-
- int connectToForwarder();
-
- int checkListener();
-
- void internalCreateFaceAndRoutes(const std::vector<RouteInfoPtr> &route_info,
- uint8_t max_try, asio::steady_timer *timer);
-
- void internalDeleteFaceAndRoute(const RouteInfoPtr &routes_info);
-
- int tryToCreateFace(RouteInfo *RouteInfo, uint32_t *face_id);
- int tryToCreateRoute(RouteInfo *RouteInfo, uint32_t face_id);
-
- void checkRoutesLoop();
-
- void checkRoutes();
-
- asio::io_service &external_ioservice_;
- asio::io_service internal_ioservice_;
- ICallback *forwarder_interface_callback_;
- std::unique_ptr<asio::io_service::work> work_;
- hc_sock_t *sock_;
- std::unique_ptr<std::thread> thread_;
- // SetRouteCallback set_route_callback_;
- // std::unordered_multimap<ProtocolPtr, RouteInfoPtr> route_status_;
- std::unique_ptr<asio::steady_timer> check_routes_timer_;
- uint32_t pending_add_route_counter_;
- uint16_t hicn_listen_port_;
-
- State state_;
-
- forwarder_type_t fwd_type_;
-
- /* Reattempt timer */
- asio::steady_timer timer_;
- unsigned num_reattempts;
-};
-
-} // namespace hiperf
diff --git a/apps/hiperf/src/main.cc b/apps/hiperf/src/main.cc
index 9c0f0a140..85cadd677 100644
--- a/apps/hiperf/src/main.cc
+++ b/apps/hiperf/src/main.cc
@@ -14,7 +14,6 @@
*/
#include <client.h>
-#include <forwarder_interface.h>
#include <server.h>
namespace hiperf {
@@ -44,6 +43,11 @@ void usage() {
std::cerr << "-X\t<param>\t\t\t\t"
<< "Set FEC params. Options are Rely_K#_N# or RS_K#_N#"
<< std::endl;
+ std::cerr
+ << "-J\t<passphrase>\t\t\t"
+ << "Set the passphrase used to sign/verify aggregated interests. "
+ "If set on the client, aggregated interests are enable automatically."
+ << std::endl;
#endif
std::cerr << std::endl;
std::cerr << "SERVER SPECIFIC:" << std::endl;
@@ -51,11 +55,17 @@ void usage() {
"Sends an application data unit in bytes that is published once "
"before exit"
<< std::endl;
+ std::cerr << "-E\t<expiry_time>\t\t\t"
+ "Expiration time for data packets generated by the producer "
+ "socket"
+ << std::endl;
std::cerr << "-s\t<packet_size>\t\t\tData packet payload size." << std::endl;
std::cerr << "-r\t\t\t\t\t"
<< "Produce real content of <content_size> bytes" << std::endl;
- std::cerr << "-m\t<manifest_capacity>\t\t"
- << "Produce transport manifest" << std::endl;
+ std::cerr << "-m\t<manifest_max_capacity>\t\t"
+ << "The maximum number of entries a manifest can contain. Set it "
+ "to 0 to disable manifests. Default is 30, max is 255."
+ << std::endl;
std::cerr << "-l\t\t\t\t\t"
<< "Start producing content upon the reception of the "
"first interest"
@@ -92,11 +102,6 @@ void usage() {
"the packet to generate. To be used with the -R option. -B and -I "
"will be ignored."
<< std::endl;
- std::cerr << "-E\t\t\t\t\t"
- << "Enable encrypted communication. Requires the path to a p12 "
- "file containing the "
- "crypto material used for the TLS handshake"
- << std::endl;
std::cerr << "-G\t<port>\t\t\t\t"
<< "Input stream from localhost at the specified port" << std::endl;
#endif
@@ -110,12 +115,27 @@ void usage() {
<< std::endl;
std::cerr << "-L\t<interest lifetime>\t\t"
<< "Set interest lifetime." << std::endl;
- std::cerr << "-u\t<delay>\t\t\t\t"
- << "Set max lifetime of unverified packets." << std::endl;
+ std::cerr << "-U\t<factor>\t\t\t"
+ << "Update the relevance threshold: if an unverified packet has "
+ "been received before the last U * manifest_max_capacity_ "
+ "packets received (verified or not), it will be flushed out. "
+ "Should be > 1, default is 100."
+ << std::endl;
+ std::cerr << "-u\t<factor>\t\t\t"
+ << "Update the alert threshold: if the "
+ "number of unverified packet is > u * manifest_max_capacity_, "
+ "an alert is raised. Should be set such that U > u >= 1, "
+ "default is 20. If u >= U, no alert will ever be raised."
+ << std::endl;
std::cerr << "-M\t<input_buffer_size>\t\t"
<< "Size of consumer input buffer. If 0, reassembly of packets "
"will be disabled."
<< std::endl;
+ std::cerr << "-N\t\t\t\t\t"
+ << "Enable aggregated interests; the number of suffixes (including "
+ "the one in the header) can be set through the env variable "
+ "`MAX_AGGREGATED_INTERESTS`."
+ << std::endl;
std::cerr << "-W\t<window_size>\t\t\t"
<< "Use a fixed congestion window "
"for retrieving the data."
@@ -137,7 +157,10 @@ void usage() {
"used with the -R (default: false)"
<< std::endl;
std::cerr << "-P\t\t\t\t\t"
- << "Prefix of the producer where to do the handshake" << std::endl;
+ << "Number of parallel streams. For hiperf client, this is the "
+ "number of consumer to create, while for hiperf server this is "
+ "the number of producers to create."
+ << std::endl;
std::cerr << "-j\t<relay_name>\t\t\t"
<< "Publish received content under the name relay_name."
"This is an RTC specific option, to be "
@@ -145,13 +168,25 @@ void usage() {
<< std::endl;
std::cerr << "-g\t<port>\t\t\t\t"
<< "Output stream to localhost at the specified port" << std::endl;
+ std::cerr << "-o\t\t\t\t\t"
+ << "Content sharing mode: if set the socket work in content sharing"
+ << "mode. It works only in RTC mode" << std::endl;
std::cerr << "-e\t<strategy>\t\t\t"
- << "Enhance the network with a reliability strategy. Options 1:"
- << " unreliable, 2: rtx only, 3: fec only, "
- << "4: delay based, 5: low rate, 6: low rate and best path "
- << "7: low rate and replication, 8: low rate and best"
- << " path/replication"
- << "(default: 2 = rtx only) " << std::endl;
+ << "Enhance the network with a reliability strategy. Options"
+ << std::endl;
+ std::cerr << "\t\t\t\t\t\t1: unreliable " << std::endl;
+ std::cerr << "\t\t\t\t\t\t2: rtx only " << std::endl;
+ std::cerr << "\t\t\t\t\t\t3: fec only " << std::endl;
+ std::cerr << "\t\t\t\t\t\t4: delay based " << std::endl;
+ std::cerr << "\t\t\t\t\t\t5: low rate " << std::endl;
+ std::cerr << "\t\t\t\t\t\t6: low rate and best path " << std::endl;
+ std::cerr << "\t\t\t\t\t\t7: low rate and replication" << std::endl;
+ std::cerr << "\t\t\t\t\t\t8: low rate and best path/replication "
+ << std::endl;
+ std::cerr << "\t\t\t\t\t\t9: only fec low residual losses " << std::endl;
+ std::cerr << "\t\t\t\t\t\t10: delay and best path " << std::endl;
+ std::cerr << "\t\t\t\t\t\t11: delay and replication " << std::endl;
+ std::cerr << "\t\t\t\t\t\t(default: 2 = rtx only) " << std::endl;
std::cerr << "-H\t\t\t\t\t"
<< "Disable periodic print headers in stats report." << std::endl;
std::cerr << "-n\t<nb_iterations>\t\t\t"
@@ -170,6 +205,8 @@ int main(int argc, char *argv[]) {
WSAStartup(MAKEWORD(2, 2), &wsaData);
#endif
+ transport::interface::global_config::GlobalConfigInterface global_conf;
+
// -1 server, 0 undefined, 1 client
int role = 0;
int options = 0;
@@ -188,9 +225,10 @@ int main(int argc, char *argv[]) {
int opt;
#ifndef _WIN32
// Please keep in alphabetical order.
- while ((opt = getopt(argc, argv,
- "A:B:CDE:F:G:HIK:L:M:P:RST:U:W:X:ab:c:d:e:f:g:hi:j:k:lm:"
- "n:p:rs:tu:vwxy:z:")) != -1) {
+ while (
+ (opt = getopt(argc, argv,
+ "A:B:CDE:F:G:HIJ:K:L:M:NP:RST:U:W:X:ab:c:d:e:f:g:hi:j:k:lm:"
+ "n:op:qrs:tu:vwxy:z:")) != -1) {
switch (opt) {
// Common
case 'D': {
@@ -225,10 +263,14 @@ int main(int argc, char *argv[]) {
#else
// Please keep in alphabetical order.
while ((opt = getopt(argc, argv,
- "A:B:CE:F:HK:L:M:P:RSU:W:X:ab:c:d:e:f:hi:j:k:lm:n:p:rs:"
+ "A:B:CE:F:HK:L:M:P:RSU:W:X:ab:c:d:e:f:hi:j:k:lm:n:op:rs:"
"tu:vwxy:z:")) != -1) {
switch (opt) {
#endif
+ case 'E': {
+ server_configuration.content_lifetime_ = std::stoul(optarg);
+ break;
+ }
case 'f': {
log_file = optarg;
break;
@@ -243,14 +285,18 @@ int main(int argc, char *argv[]) {
server_configuration.aggregated_data_ = true;
break;
}
+ case 'o': {
+ client_configuration.content_sharing_mode_ = true;
+ break;
+ }
case 'w': {
client_configuration.packet_format_ = Packet::Format::HF_INET6_UDP;
server_configuration.packet_format_ = Packet::Format::HF_INET6_UDP;
break;
}
case 'k': {
- server_configuration.passphrase = std::string(optarg);
- client_configuration.passphrase = std::string(optarg);
+ server_configuration.passphrase_ = std::string(optarg);
+ client_configuration.passphrase_ = std::string(optarg);
break;
}
case 'z': {
@@ -271,20 +317,31 @@ int main(int argc, char *argv[]) {
role += 1;
break;
}
-
+ case 'q': {
+ client_configuration.colored_ = server_configuration.colored_ = false;
+ break;
+ }
+ case 'J': {
+ client_configuration.aggr_interest_passphrase_ = optarg;
+ server_configuration.aggr_interest_passphrase_ = optarg;
+ // Consumer signature is only used with aggregated interests,
+ // hence enabling it also forces usage of aggregated interests
+ client_configuration.aggregated_interests_ = true;
+ break;
+ }
// Client specifc
case 'b': {
- client_configuration.beta = std::stod(optarg);
+ client_configuration.beta_ = std::stod(optarg);
options = 1;
break;
}
case 'd': {
- client_configuration.drop_factor = std::stod(optarg);
+ client_configuration.drop_factor_ = std::stod(optarg);
options = 1;
break;
}
case 'W': {
- client_configuration.window = std::stod(optarg);
+ client_configuration.window_ = std::stod(optarg);
options = 1;
break;
}
@@ -293,13 +350,17 @@ int main(int argc, char *argv[]) {
options = 1;
break;
}
+ case 'N': {
+ client_configuration.aggregated_interests_ = true;
+ break;
+ }
case 'P': {
- client_configuration.producer_prefix_ = Prefix(optarg);
- client_configuration.secure_ = true;
+ client_configuration.parallel_flows_ =
+ server_configuration.parallel_flows_ = std::stoull(optarg);
break;
}
case 'c': {
- client_configuration.producer_certificate = std::string(optarg);
+ client_configuration.producer_certificate_ = std::string(optarg);
options = 1;
break;
}
@@ -318,13 +379,13 @@ int main(int argc, char *argv[]) {
options = 1;
break;
}
- case 'u': {
- client_configuration.unverified_interval_ = std::stoul(optarg);
+ case 'U': {
+ client_configuration.manifest_factor_relevant_ = std::stoul(optarg);
options = 1;
break;
}
- case 'U': {
- client_configuration.unverified_ratio_ = std::stod(optarg);
+ case 'u': {
+ client_configuration.manifest_factor_alert_ = std::stoul(optarg);
options = 1;
break;
}
@@ -346,7 +407,7 @@ int main(int argc, char *argv[]) {
}
// Server specific
case 'A': {
- server_configuration.download_size = std::stoul(optarg);
+ server_configuration.download_size_ = std::stoul(optarg);
options = -1;
break;
}
@@ -356,22 +417,22 @@ int main(int argc, char *argv[]) {
break;
}
case 'r': {
- server_configuration.virtual_producer = false;
+ server_configuration.virtual_producer_ = false;
options = -1;
break;
}
case 'm': {
- server_configuration.manifest = std::stoul(optarg);
+ server_configuration.manifest_max_capacity_ = std::stoul(optarg);
options = -1;
break;
}
case 'l': {
- server_configuration.live_production = true;
+ server_configuration.live_production_ = true;
options = -1;
break;
}
case 'K': {
- server_configuration.keystore_name = std::string(optarg);
+ server_configuration.keystore_name_ = std::string(optarg);
options = -1;
break;
}
@@ -393,7 +454,7 @@ int main(int argc, char *argv[]) {
break;
}
case 'p': {
- server_configuration.keystore_password = std::string(optarg);
+ server_configuration.keystore_password_ = std::string(optarg);
options = -1;
break;
}
@@ -409,11 +470,6 @@ int main(int argc, char *argv[]) {
options = -1;
break;
}
- case 'E': {
- server_configuration.keystore_name = std::string(optarg);
- server_configuration.secure_ = true;
- break;
- }
case 'e': {
client_configuration.recovery_strategy_ = std::stoul(optarg);
options = 1;
@@ -457,9 +513,9 @@ int main(int argc, char *argv[]) {
return EXIT_FAILURE;
} else {
if (role > 0) {
- client_configuration.name = Name(argv[optind]);
+ client_configuration.name_ = Prefix(argv[optind]);
} else {
- server_configuration.name = Prefix(argv[optind]);
+ server_configuration.name_ = Prefix(argv[optind]);
}
}
@@ -490,16 +546,9 @@ int main(int argc, char *argv[]) {
config.set();
// Parse config file
- transport::interface::global_config::parseConfigurationFile(conf_file);
+ global_conf.parseConfigurationFile(conf_file);
if (role > 0) {
- // set forwarder type
- client_configuration.forwarder_type_ = UNDEFINED;
- if (config.name.compare("hicnlightng_module") == 0)
- client_configuration.forwarder_type_ = HICNLIGHT;
- else if (config.name.compare("hicnlightng_module") == 0)
- client_configuration.forwarder_type_ = HICNLIGHT_NG;
-
HIperfClient c(client_configuration);
if (c.setup() != ERROR_SETUP) {
c.run();
diff --git a/apps/hiperf/src/server.cc b/apps/hiperf/src/server.cc
index 7101e7a4a..afaf5423b 100644
--- a/apps/hiperf/src/server.cc
+++ b/apps/hiperf/src/server.cc
@@ -21,156 +21,79 @@ namespace hiperf {
* Hiperf server class: configure and setup an hicn producer following the
* ServerConfiguration.
*/
-class HIperfServer::Impl : public ProducerSocket::Callback {
- const std::size_t log2_content_object_buffer_size = 8;
-
- public:
- Impl(const hiperf::ServerConfiguration &conf)
- : configuration_(conf),
- signals_(io_service_),
- rtc_timer_(io_service_),
- unsatisfied_interests_(),
- content_objects_((std::uint16_t)(1 << log2_content_object_buffer_size)),
- content_objects_index_(0),
- mask_((std::uint16_t)(1 << log2_content_object_buffer_size) - 1),
- last_segment_(0),
-#ifndef _WIN32
- input_(io_service_),
- rtc_running_(false),
-#endif
- flow_name_(configuration_.name.getName()),
- socket_(io_service_),
- recv_buffer_(nullptr, 0) {
- std::string buffer(configuration_.payload_size_, 'X');
- std::cout << "Producing contents under name " << conf.name.getName()
- << std::endl;
-#ifndef _WIN32
- if (configuration_.interactive_) {
- input_.assign(::dup(STDIN_FILENO));
- }
-#endif
-
- for (int i = 0; i < (1 << log2_content_object_buffer_size); i++) {
- content_objects_[i] = ContentObject::Ptr(new ContentObject(
- configuration_.name.getName(), configuration_.packet_format_, 0,
- (const uint8_t *)buffer.data(), buffer.size()));
- content_objects_[i]->setLifetime(
- default_values::content_object_expiry_time);
- }
+class HIperfServer::Impl {
+ static inline constexpr std::size_t klog2_content_object_buffer_size() {
+ return 8;
}
-
- virtual ~Impl() {}
-
- void virtualProcessInterest(ProducerSocket &p, const Interest &interest) {
- content_objects_[content_objects_index_ & mask_]->setName(
- interest.getName());
- producer_socket_->produce(
- *content_objects_[content_objects_index_++ & mask_]);
+ static inline constexpr std::size_t kcontent_object_buffer_size() {
+ return (1 << klog2_content_object_buffer_size());
}
-
- void processInterest(ProducerSocket &p, const Interest &interest) {
- p.setSocketOption(ProducerCallbacksOptions::CACHE_MISS,
- (ProducerInterestCallback)VOID_HANDLER);
- p.setSocketOption(GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME,
- 5000000_U32);
-
- produceContent(p, interest.getName(), interest.getName().getSuffix());
- std::cout << "Received interest " << interest.getName().getSuffix()
- << std::endl;
+ static inline constexpr std::size_t kmask() {
+ return (kcontent_object_buffer_size() - 1);
}
- void asyncProcessInterest(ProducerSocket &p, const Interest &interest) {
- p.setSocketOption(ProducerCallbacksOptions::CACHE_MISS,
- (ProducerInterestCallback)bind(&Impl::cacheMiss, this,
- std::placeholders::_1,
- std::placeholders::_2));
- p.setSocketOption(GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME,
- 5000000_U32);
- uint32_t suffix = interest.getName().getSuffix();
-
- if (suffix == 0) {
- last_segment_ = 0;
- unsatisfied_interests_.clear();
- }
-
- // The suffix will either come from the received interest or will be set to
- // the smallest suffix of a previous interest not satisfied
- if (!unsatisfied_interests_.empty()) {
- auto it = std::lower_bound(unsatisfied_interests_.begin(),
- unsatisfied_interests_.end(), last_segment_);
- if (it != unsatisfied_interests_.end()) {
- suffix = *it;
+ /**
+ * @brief As we can (potentially) setup many producer sockets, we need to keep
+ * a separate context for each one of them. The context contains parameters
+ * and variable that are specific to a single producer socket.
+ */
+ class ProducerContext
+ : public Base<ProducerContext, ServerConfiguration, Impl>,
+ public ProducerSocket::Callback {
+ public:
+ using ConfType = ServerConfiguration;
+ using ParentType = typename HIperfServer::Impl;
+ static inline const auto getContextType() { return "ProducerContext"; }
+
+ ProducerContext(HIperfServer::Impl &server, int producer_identifier)
+ : Base(server, server.io_service_, producer_identifier) {
+ // Allocate buffer to copy as content objects payload
+ std::string buffer(configuration_.payload_size_, 'X');
+
+ // Allocate array of content objects. They are share_ptr so that the
+ // transport will only capture a reference to them instead of performing
+ // an hard copy.
+ for (std::size_t i = 0; i < kcontent_object_buffer_size(); i++) {
+ const auto &element =
+ content_objects_.emplace_back(std::make_shared<ContentObject>(
+ configuration_.name_.makeName(), configuration_.packet_format_,
+ 0, (const uint8_t *)buffer.data(), buffer.size()));
+ element->setLifetime(default_values::content_object_expiry_time);
}
- unsatisfied_interests_.erase(unsatisfied_interests_.begin(), it);
}
- std::cout << "Received interest " << interest.getName().getSuffix()
- << ", starting production at " << suffix << std::endl;
- std::cout << unsatisfied_interests_.size() << " interests still unsatisfied"
- << std::endl;
- produceContentAsync(p, interest.getName(), suffix);
- }
+ // To make vector happy (move or copy constructor is needed when vector
+ // resizes)
+ ProducerContext(ProducerContext &&other) noexcept
+ : Base(std::move(other)),
+ content_objects_(std::move(other.content_objects_)),
+ unsatisfied_interests_(std::move(other.unsatisfied_interests_)),
+ last_segment_(other.last_segment_),
+ producer_socket_(std::move(other.producer_socket_)),
+ content_objects_index_(other.content_objects_index_),
+ payload_size_max_(other.payload_size_max_) {}
- void produceContent(ProducerSocket &p, const Name &content_name,
- uint32_t suffix) {
- auto b = utils::MemBuf::create(configuration_.download_size);
- std::memset(b->writableData(), '?', configuration_.download_size);
- b->append(configuration_.download_size);
- uint32_t total;
-
- utils::SteadyTime::TimePoint t0 = utils::SteadyTime::Clock::now();
- total = p.produceStream(content_name, std::move(b),
- !configuration_.multiphase_produce_, suffix);
- utils::SteadyTime::TimePoint t1 = utils::SteadyTime::Clock::now();
-
- std::cout << "Written " << total
- << " data packets in output buffer (Segmentation time: "
- << utils::SteadyTime::getDurationUs(t0, t1).count() << " us)"
- << std::endl;
- }
+ virtual ~ProducerContext() = default;
- void produceContentAsync(ProducerSocket &p, Name content_name,
- uint32_t suffix) {
- produce_thread_.add([this, suffix, content_name]() {
- auto b = utils::MemBuf::create(configuration_.download_size);
- std::memset(b->writableData(), '?', configuration_.download_size);
- b->append(configuration_.download_size);
-
- last_segment_ = suffix + producer_socket_->produceStream(
- content_name, std::move(b),
- !configuration_.multiphase_produce_, suffix);
- });
- }
+ /**
+ * @brief Produce datagram
+ */
+ void produceDatagram(const uint8_t *buffer, std::size_t buffer_size) const {
+ assert(producer_socket_);
- void cacheMiss(ProducerSocket &p, const Interest &interest) {
- unsatisfied_interests_.push_back(interest.getName().getSuffix());
- }
+ auto size = std::min(buffer_size, payload_size_max_);
- void onContentProduced(ProducerSocket &p, const std::error_code &err,
- uint64_t bytes_written) {
- p.setSocketOption(ProducerCallbacksOptions::CACHE_MISS,
- (ProducerInterestCallback)bind(
- &Impl::asyncProcessInterest, this,
- std::placeholders::_1, std::placeholders::_2));
- }
+ producer_socket_->produceDatagram(flow_name_, buffer, size);
+ }
- void produceError(const std::error_code &err) noexcept override {
- std::cerr << "Error from producer transport: " << err.message()
- << std::endl;
- producer_socket_->stop();
- io_service_.stop();
- }
+ /**
+ * @brief Create and setup the producer socket
+ */
+ int setup() {
+ int ret;
+ int production_protocol;
+ std::shared_ptr<Signer> signer = std::make_shared<VoidSigner>();
- int setup() {
- int ret;
- int production_protocol;
- std::shared_ptr<Signer> signer = std::make_shared<VoidSigner>();
-
- if (configuration_.secure_) {
- producer_socket_ = std::make_unique<P2PSecureProducerSocket>(
- configuration_.rtc_, configuration_.keystore_name,
- configuration_.keystore_password);
- } else {
if (!configuration_.rtc_) {
production_protocol = ProductionProtocolAlgorithms::BYTE_STREAM;
} else {
@@ -178,255 +101,466 @@ class HIperfServer::Impl : public ProducerSocket::Callback {
}
producer_socket_ = std::make_unique<ProducerSocket>(production_protocol);
- }
-
- if (producer_socket_->setSocketOption(
- ProducerCallbacksOptions::PRODUCER_CALLBACK, this) ==
- SOCKET_OPTION_NOT_SET) {
- std::cerr << "Failed to set producer callback." << std::endl;
- return ERROR_SETUP;
- }
-
- if (producer_socket_->setSocketOption(
- GeneralTransportOptions::MAKE_MANIFEST, configuration_.manifest) ==
- SOCKET_OPTION_NOT_SET) {
- return ERROR_SETUP;
- }
- if (producer_socket_->setSocketOption(
- GeneralTransportOptions::HASH_ALGORITHM,
- configuration_.hash_algorithm_) == SOCKET_OPTION_NOT_SET) {
- return ERROR_SETUP;
- }
-
- if (producer_socket_->setSocketOption(PACKET_FORMAT,
- configuration_.packet_format_) ==
- SOCKET_OPTION_NOT_SET) {
- std::cerr << "ERROR -- Impossible to set the packet format." << std::endl;
- return ERROR_SETUP;
- }
+ if (producer_socket_->setSocketOption(
+ ProducerCallbacksOptions::PRODUCER_CALLBACK, this) ==
+ SOCKET_OPTION_NOT_SET) {
+ getOutputStream() << "Failed to set producer callback." << std::endl;
+ return ERROR_SETUP;
+ }
- if (!configuration_.passphrase.empty()) {
- signer = std::make_shared<SymmetricSigner>(CryptoSuite::HMAC_SHA256,
- configuration_.passphrase);
- }
+ if (producer_socket_->setSocketOption(
+ GeneralTransportOptions::HASH_ALGORITHM,
+ configuration_.hash_algorithm_) == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
- if (!configuration_.keystore_name.empty()) {
- signer = std::make_shared<AsymmetricSigner>(
- configuration_.keystore_name, configuration_.keystore_password);
- }
+ if (producer_socket_->setSocketOption(
+ GeneralTransportOptions::MANIFEST_MAX_CAPACITY,
+ configuration_.manifest_max_capacity_) == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
- producer_socket_->setSocketOption(GeneralTransportOptions::SIGNER, signer);
+ if (producer_socket_->setSocketOption(PACKET_FORMAT,
+ configuration_.packet_format_) ==
+ SOCKET_OPTION_NOT_SET) {
+ getOutputStream() << "ERROR -- Impossible to set the packet format."
+ << std::endl;
+ return ERROR_SETUP;
+ }
- // Compute maximum payload size
- Packet::Format format = PayloadSize::getFormatFromName(
- configuration_.name.getName(), !configuration_.manifest);
- payload_size_max_ = PayloadSize(format).getPayloadSizeMax(
- configuration_.rtc_ ? RTC_HEADER_SIZE : 0,
- configuration_.fec_type_.empty() ? 0 : FEC_HEADER_MAX_SIZE,
- !configuration_.manifest ? signer->getSignatureFieldSize() : 0);
+ if (!configuration_.passphrase_.empty()) {
+ signer = std::make_shared<SymmetricSigner>(CryptoSuite::HMAC_SHA256,
+ configuration_.passphrase_);
+ }
- if (configuration_.payload_size_ > payload_size_max_) {
- std::cerr << "WARNING: Payload has size " << configuration_.payload_size_
- << ", maximum is " << payload_size_max_
- << ". Payload will be truncated to fit." << std::endl;
- }
+ if (!configuration_.keystore_name_.empty()) {
+ signer = std::make_shared<AsymmetricSigner>(
+ configuration_.keystore_name_, configuration_.keystore_password_);
+ }
- if (configuration_.rtc_) {
- ret = producer_socket_->setSocketOption(
- RtcTransportOptions::AGGREGATED_DATA,
- configuration_.aggregated_data_);
+ producer_socket_->setSocketOption(GeneralTransportOptions::SIGNER,
+ signer);
+
+ // Compute maximum payload size
+ Packet::Format format = PayloadSize::getFormatFromPrefix(
+ configuration_.name_, !configuration_.manifest_max_capacity_);
+ payload_size_max_ = PayloadSize(format).getPayloadSizeMax(
+ configuration_.rtc_ ? RTC_HEADER_SIZE : 0,
+ configuration_.fec_type_.empty() ? 0 : FEC_HEADER_MAX_SIZE,
+ !configuration_.manifest_max_capacity_
+ ? signer->getSignatureFieldSize()
+ : 0);
+
+ if (configuration_.payload_size_ > payload_size_max_) {
+ getOutputStream() << "WARNING: Payload has size "
+ << configuration_.payload_size_ << ", maximum is "
+ << payload_size_max_
+ << ". Payload will be truncated to fit." << std::endl;
+ }
- if (ret == SOCKET_OPTION_NOT_SET) {
- return ERROR_SETUP;
+ // Verifier for aggregated interests
+ std::shared_ptr<Verifier> verifier = std::make_shared<VoidVerifier>();
+ if (!configuration_.aggr_interest_passphrase_.empty()) {
+ verifier = std::make_unique<SymmetricVerifier>(
+ configuration_.aggr_interest_passphrase_);
}
- }
+ ret = producer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER,
+ verifier);
+ if (ret == SOCKET_OPTION_NOT_SET) return ERROR_SETUP;
- if (configuration_.rtc_) {
- ret = producer_socket_->setSocketOption(GeneralTransportOptions::FEC_TYPE,
- configuration_.fec_type_);
+ if (configuration_.rtc_) {
+ ret = producer_socket_->setSocketOption(
+ RtcTransportOptions::AGGREGATED_DATA,
+ configuration_.aggregated_data_);
- if (ret == SOCKET_OPTION_NOT_SET) {
- return ERROR_SETUP;
- }
- }
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
- producer_socket_->registerPrefix(configuration_.name);
- producer_socket_->connect();
- producer_socket_->start();
+ ret = producer_socket_->setSocketOption(
+ GeneralTransportOptions::FEC_TYPE, configuration_.fec_type_);
- if (configuration_.rtc_) {
- std::cout << "Running RTC producer: the prefix length will be ignored."
- " Use /128 by default in RTC mode"
- << std::endl;
- return ERROR_SUCCESS;
- }
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+ }
- if (!configuration_.virtual_producer) {
if (producer_socket_->setSocketOption(
GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME,
- configuration_.content_lifetime) == SOCKET_OPTION_NOT_SET) {
+ configuration_.content_lifetime_) == SOCKET_OPTION_NOT_SET) {
return ERROR_SETUP;
}
- if (producer_socket_->setSocketOption(
- GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 200000U) ==
- SOCKET_OPTION_NOT_SET) {
- return ERROR_SETUP;
- }
+ producer_socket_->registerPrefix(Prefix(flow_name_, 128));
+ producer_socket_->connect();
+ producer_socket_->start();
- if (producer_socket_->setSocketOption(
- GeneralTransportOptions::MAX_SEGMENT_SIZE,
- static_cast<uint32_t>(configuration_.payload_size_)) ==
- SOCKET_OPTION_NOT_SET) {
- return ERROR_SETUP;
+ if (configuration_.rtc_) {
+ return ERROR_SUCCESS;
}
- if (!configuration_.live_production) {
- produceContent(*producer_socket_, configuration_.name.getName(), 0);
+ if (!configuration_.virtual_producer_) {
+ if (producer_socket_->setSocketOption(
+ GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 200000U) ==
+ SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+
+ if (producer_socket_->setSocketOption(
+ GeneralTransportOptions::MAX_SEGMENT_SIZE,
+ static_cast<uint32_t>(configuration_.payload_size_)) ==
+ SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+
+ if (!configuration_.live_production_) {
+ produceContent(*producer_socket_, configuration_.name_.makeName(), 0);
+ } else {
+ ret = producer_socket_->setSocketOption(
+ ProducerCallbacksOptions::CACHE_MISS,
+ (ProducerInterestCallback)bind(
+ &ProducerContext::asyncProcessInterest, this,
+ std::placeholders::_1, std::placeholders::_2));
+
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+ }
} else {
ret = producer_socket_->setSocketOption(
+ GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 0U);
+
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+
+ ret = producer_socket_->setSocketOption(
ProducerCallbacksOptions::CACHE_MISS,
- (ProducerInterestCallback)bind(&Impl::asyncProcessInterest, this,
- std::placeholders::_1,
- std::placeholders::_2));
+ (ProducerInterestCallback)bind(
+ &ProducerContext::virtualProcessInterest, this,
+ std::placeholders::_1, std::placeholders::_2));
if (ret == SOCKET_OPTION_NOT_SET) {
return ERROR_SETUP;
}
}
- } else {
- ret = producer_socket_->setSocketOption(
- GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 0U);
+ ret = producer_socket_->setSocketOption(
+ ProducerCallbacksOptions::CONTENT_PRODUCED,
+ (ProducerContentCallback)bind(
+ &ProducerContext::onContentProduced, this, std::placeholders::_1,
+ std::placeholders::_2, std::placeholders::_3));
if (ret == SOCKET_OPTION_NOT_SET) {
return ERROR_SETUP;
}
- ret = producer_socket_->setSocketOption(
- ProducerCallbacksOptions::CACHE_MISS,
- (ProducerInterestCallback)bind(&Impl::virtualProcessInterest, this,
- std::placeholders::_1,
- std::placeholders::_2));
+ return ERROR_SUCCESS;
+ }
- if (ret == SOCKET_OPTION_NOT_SET) {
- return ERROR_SETUP;
+ int run() {
+ getOutputStream() << "started to serve consumers with name " << flow_name_
+ << std::endl;
+ return ERROR_SUCCESS;
+ }
+
+ void stop() {
+ getOutputStream() << "stopped to serve consumers" << std::endl;
+ producer_socket_->stop();
+ }
+
+ private:
+ /**
+ * @brief Produce an existing content object. Set the name as the
+ * interest.
+ */
+ void virtualProcessInterest(ProducerSocket &p, const Interest &interest) {
+ content_objects_[content_objects_index_ & kmask()]->setName(
+ interest.getName());
+ p.produce(*content_objects_[content_objects_index_++ & kmask()]);
+ }
+
+ /**
+ * @brief Create and produce a buffer of configuration_.download_size_
+ * length.
+ */
+ void produceContent(ProducerSocket &p, const Name &content_name,
+ uint32_t suffix) const {
+ uint32_t total;
+
+ auto b = utils::MemBuf::create(configuration_.download_size_);
+ std::memset(b->writableData(), '?', configuration_.download_size_);
+ b->append(configuration_.download_size_);
+
+ utils::SteadyTime::TimePoint t0 = utils::SteadyTime::Clock::now();
+ total = p.produceStream(content_name, std::move(b),
+ !configuration_.multiphase_produce_, suffix);
+ utils::SteadyTime::TimePoint t1 = utils::SteadyTime::Clock::now();
+
+ Logger() << "Written " << total
+ << " data packets in output buffer (Segmentation time: "
+ << utils::SteadyTime::getDurationUs(t0, t1).count() << " us)"
+ << std::endl;
+ }
+
+ /**
+ * @brief Synchronously produce content upon reception of one interest
+ */
+ void processInterest(ProducerSocket &p, const Interest &interest) const {
+ p.setSocketOption(ProducerCallbacksOptions::CACHE_MISS,
+ (ProducerInterestCallback)VOID_HANDLER);
+ p.setSocketOption(GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME,
+ configuration_.content_lifetime_);
+
+ produceContent(p, interest.getName(), interest.getName().getSuffix());
+ Logger() << "Received interest " << interest.getName().getSuffix()
+ << std::endl;
+ }
+
+ /**
+ * @brief Async create and produce a buffer of
+ * configuration_.download_size_ length.
+ */
+ void produceContentAsync(ProducerSocket &p, Name content_name,
+ uint32_t suffix) {
+ parent_.produce_thread_.add([this, suffix, content_name, &p]() {
+ auto b = utils::MemBuf::create(configuration_.download_size_);
+ std::memset(b->writableData(), '?', configuration_.download_size_);
+ b->append(configuration_.download_size_);
+
+ last_segment_ =
+ suffix + p.produceStream(content_name, std::move(b),
+ !configuration_.multiphase_produce_,
+ suffix);
+ });
+ }
+
+ /**
+ * @brief Asynchronously produce content upon reception of one interest
+ */
+ void asyncProcessInterest(ProducerSocket &p, const Interest &interest) {
+ p.setSocketOption(ProducerCallbacksOptions::CACHE_MISS,
+ (ProducerInterestCallback)bind(
+ &ProducerContext::cacheMiss, this,
+ std::placeholders::_1, std::placeholders::_2));
+ p.setSocketOption(GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME,
+ configuration_.content_lifetime_);
+ uint32_t suffix = interest.getName().getSuffix();
+
+ if (suffix == 0) {
+ last_segment_ = 0;
+ unsatisfied_interests_.clear();
+ }
+
+ // The suffix will either come from the received interest or will be set
+ // to the smallest suffix of a previous interest not satisfied
+ if (!unsatisfied_interests_.empty()) {
+ auto it = std::lower_bound(unsatisfied_interests_.begin(),
+ unsatisfied_interests_.end(), last_segment_);
+ if (it != unsatisfied_interests_.end()) {
+ suffix = *it;
+ }
+ unsatisfied_interests_.erase(unsatisfied_interests_.begin(), it);
}
+
+ getOutputStream() << " Received interest "
+ << interest.getName().getSuffix()
+ << ", starting production at " << suffix << end_mod_
+ << std::endl;
+ getOutputStream() << unsatisfied_interests_.size()
+ << " interests still unsatisfied" << end_mod_
+ << std::endl;
+ produceContentAsync(p, interest.getName(), suffix);
}
- ret = producer_socket_->setSocketOption(
- ProducerCallbacksOptions::CONTENT_PRODUCED,
- (ProducerContentCallback)bind(
- &Impl::onContentProduced, this, std::placeholders::_1,
- std::placeholders::_2, std::placeholders::_3));
+ /**
+ * @brief Register cache miss events
+ */
+ void cacheMiss([[maybe_unused]] const ProducerSocket &p,
+ const Interest &interest) {
+ unsatisfied_interests_.push_back(interest.getName().getSuffix());
+ }
- return ERROR_SUCCESS;
+ /**
+ * @brief When content is produced, set cache miss callback so that we can
+ * register any cache miss happening after the production.
+ */
+ void onContentProduced(ProducerSocket &p,
+ [[maybe_unused]] const std::error_code &err,
+ [[maybe_unused]] uint64_t bytes_written) {
+ p.setSocketOption(ProducerCallbacksOptions::CACHE_MISS,
+ (ProducerInterestCallback)bind(
+ &ProducerContext::asyncProcessInterest, this,
+ std::placeholders::_1, std::placeholders::_2));
+ }
+
+ /**
+ * @brief Internal producer error. When this callback is triggered
+ * something important happened. Here we stop the program.
+ */
+ void produceError(const std::error_code &err) noexcept override {
+ getOutputStream() << "Error from producer transport: " << err.message()
+ << std::endl;
+ parent_.stop();
+ }
+
+ // Members initialized in constructor
+ std::vector<ContentObject::Ptr> content_objects_;
+
+ // Members initialized by in-class initializer
+ std::vector<uint32_t> unsatisfied_interests_;
+ std::uint32_t last_segment_{0};
+ std::unique_ptr<ProducerSocket> producer_socket_{nullptr};
+ std::uint16_t content_objects_index_{0};
+ std::size_t payload_size_max_{0};
+ };
+
+ public:
+ explicit Impl(const hiperf::ServerConfiguration &conf) : config_(conf) {
+#ifndef _WIN32
+ if (config_.interactive_) {
+ input_.assign(::dup(STDIN_FILENO));
+ }
+#endif
+
+ std::memset(rtc_payload_.data(), 'X', rtc_payload_.size());
+ }
+
+ ~Impl() = default;
+
+ int setup() {
+ int ret = ensureFlows(config_.name_, config_.parallel_flows_);
+ if (ret != ERROR_SUCCESS) {
+ return ret;
+ }
+
+ producer_contexts_.reserve(config_.parallel_flows_);
+ for (uint32_t i = 0; i < config_.parallel_flows_; i++) {
+ auto &ctx = producer_contexts_.emplace_back(*this, i);
+ ret = ctx.setup();
+
+ if (ret) {
+ break;
+ }
+ }
+
+ return ret;
}
void receiveStream() {
socket_.async_receive_from(
- asio::buffer(recv_buffer_.first, recv_buffer_.second), remote_,
- [this](const std::error_code &ec, std::size_t length) {
+ asio::buffer(recv_buffer_.writableData(), recv_buffer_.capacity()),
+ remote_, [this](const std::error_code &ec, std::size_t length) {
if (ec) return;
- sendRTCContentFromStream(recv_buffer_.first, length);
+ sendRTCContentFromStream(recv_buffer_.writableData(), length);
receiveStream();
});
}
- void sendRTCContentFromStream(uint8_t *buff, std::size_t len) {
- auto payload =
- content_objects_[content_objects_index_++ & mask_]->getPayload();
+ void sendRTCContentFromStream(const uint8_t *buff, std::size_t len) {
// this is used to compute the data packet delay
// Used only for performance evaluation
// It requires clock synchronization between producer and consumer
- uint64_t now = utils::SystemTime::nowMs().count();
+ auto now = utils::SystemTime::nowMs().count();
- uint8_t *start = (uint8_t *)payload->writableData();
+ auto start = rtc_payload_.data();
std::memcpy(start, &now, sizeof(uint64_t));
std::memcpy(start + sizeof(uint64_t), buff, len);
- producer_socket_->produceDatagram(flow_name_, start,
- len + sizeof(uint64_t));
+
+ for (const auto &producer_context : producer_contexts_) {
+ producer_context.produceDatagram(start, len + sizeof(uint64_t));
+ }
}
void sendRTCContentObjectCallback(const std::error_code &ec) {
if (ec) return;
rtc_timer_.expires_from_now(
- configuration_.production_rate_.getMicrosecondsForPacket(
- configuration_.payload_size_));
+ config_.production_rate_.getMicrosecondsForPacket(
+ config_.payload_size_));
rtc_timer_.async_wait(std::bind(&Impl::sendRTCContentObjectCallback, this,
std::placeholders::_1));
- auto payload =
- content_objects_[content_objects_index_++ & mask_]->getPayload();
+
+ auto start = rtc_payload_.data();
// this is used to compute the data packet delay
// Used only for performance evaluation
// It requires clock synchronization between producer and consumer
- uint64_t now = utils::SystemTime::nowMs().count();
-
- std::memcpy(payload->writableData(), &now, sizeof(uint64_t));
+ auto now = utils::SystemTime::nowMs().count();
+ std::memcpy(start, &now, sizeof(uint64_t));
- producer_socket_->produceDatagram(flow_name_, payload->data(),
- payload->length() < payload_size_max_
- ? payload->length()
- : payload_size_max_);
+ for (const auto &producer_context : producer_contexts_) {
+ producer_context.produceDatagram(start, config_.payload_size_);
+ }
}
void sendRTCContentObjectCallbackWithTrace(const std::error_code &ec) {
if (ec) return;
- auto payload =
- content_objects_[content_objects_index_++ & mask_]->getPayload();
-
- uint32_t packet_len =
- configuration_.trace_[configuration_.trace_index_].size;
+ std::size_t packet_len = config_.trace_[config_.trace_index_].size;
// this is used to compute the data packet delay
// used only for performance evaluation
// it requires clock synchronization between producer and consumer
- uint64_t now = utils::SystemTime::nowMs().count();
- std::memcpy(payload->writableData(), &now, sizeof(uint64_t));
+ auto now = utils::SystemTime::nowMs().count();
+ auto start = rtc_payload_.data();
+ std::memcpy(start, &now, sizeof(uint64_t));
- if (packet_len > payload->length()) packet_len = payload->length();
- if (packet_len > payload_size_max_) packet_len = payload_size_max_;
+ if (packet_len > config_.payload_size_) {
+ packet_len = config_.payload_size_;
+ }
- producer_socket_->produceDatagram(flow_name_, payload->data(), packet_len);
+ for (const auto &producer_context : producer_contexts_) {
+ producer_context.produceDatagram(start, packet_len);
+ }
- uint32_t next_index = configuration_.trace_index_ + 1;
+ uint32_t next_index = config_.trace_index_ + 1;
uint64_t schedule_next;
- if (next_index < configuration_.trace_.size()) {
- schedule_next =
- configuration_.trace_[next_index].timestamp -
- configuration_.trace_[configuration_.trace_index_].timestamp;
+ if (next_index < config_.trace_.size()) {
+ schedule_next = config_.trace_[next_index].timestamp -
+ config_.trace_[config_.trace_index_].timestamp;
} else {
// here we need to loop, schedule in a random time
schedule_next = 1000;
}
- configuration_.trace_index_ =
- (configuration_.trace_index_ + 1) % configuration_.trace_.size();
+ config_.trace_index_ = (config_.trace_index_ + 1) % config_.trace_.size();
rtc_timer_.expires_from_now(std::chrono::microseconds(schedule_next));
rtc_timer_.async_wait(
std::bind(&Impl::sendRTCContentObjectCallbackWithTrace, this,
std::placeholders::_1));
}
+ int parseTraceFile() {
+ std::ifstream trace(config_.trace_file_);
+ if (trace.fail()) {
+ return -1;
+ }
+ std::string line;
+ while (std::getline(trace, line)) {
+ std::istringstream iss(line);
+ hiperf::packet_t packet;
+ iss >> packet.timestamp >> packet.size;
+ config_.trace_.push_back(packet);
+ }
+ return 0;
+ }
+
#ifndef _WIN32
void handleInput(const std::error_code &error, std::size_t length) {
if (error) {
- producer_socket_->stop();
- io_service_.stop();
+ stop();
}
if (rtc_running_) {
- std::cout << "stop real time content production" << std::endl;
+ Logger() << "stop real time content production" << std::endl;
rtc_running_ = false;
rtc_timer_.cancel();
} else {
- std::cout << "start real time content production" << std::endl;
+ Logger() << "start real time content production" << std::endl;
rtc_running_ = true;
rtc_timer_.expires_from_now(
- configuration_.production_rate_.getMicrosecondsForPacket(
- configuration_.payload_size_));
+ config_.production_rate_.getMicrosecondsForPacket(
+ config_.payload_size_));
rtc_timer_.async_wait(std::bind(&Impl::sendRTCContentObjectCallback, this,
std::placeholders::_1));
}
@@ -439,46 +573,33 @@ class HIperfServer::Impl : public ProducerSocket::Callback {
}
#endif
- int parseTraceFile() {
- std::ifstream trace(configuration_.trace_file_);
- if (trace.fail()) {
- return -1;
- }
- std::string line;
- while (std::getline(trace, line)) {
- std::istringstream iss(line);
- hiperf::packet_t packet;
- iss >> packet.timestamp >> packet.size;
- configuration_.trace_.push_back(packet);
+ void stop() {
+ for (auto &producer_context : producer_contexts_) {
+ producer_context.stop();
}
- return 0;
+
+ io_service_.stop();
}
int run() {
- std::cerr << "Starting to serve consumers" << std::endl;
-
signals_.add(SIGINT);
- signals_.async_wait([this](const std::error_code &, const int &) {
- std::cout << "STOPPING!!" << std::endl;
- producer_socket_->stop();
- io_service_.stop();
- });
+ signals_.async_wait(
+ [this](const std::error_code &, const int &) { stop(); });
- if (configuration_.rtc_) {
-#ifndef _WIN32
- if (configuration_.interactive_) {
+ if (config_.rtc_) {
+ if (config_.interactive_) {
asio::async_read_until(
input_, input_buffer_, '\n',
std::bind(&Impl::handleInput, this, std::placeholders::_1,
std::placeholders::_2));
- } else if (configuration_.trace_based_) {
- std::cout << "trace-based mode enabled" << std::endl;
- if (configuration_.trace_file_ == nullptr) {
- std::cout << "cannot find the trace file" << std::endl;
+ } else if (config_.trace_based_) {
+ Logger() << "trace-based mode enabled" << std::endl;
+ if (config_.trace_file_ == nullptr) {
+ Logger() << "cannot find the trace file" << std::endl;
return ERROR_SETUP;
}
if (parseTraceFile() < 0) {
- std::cout << "cannot parse the trace file" << std::endl;
+ Logger() << "cannot parse the trace file" << std::endl;
return ERROR_SETUP;
}
rtc_running_ = true;
@@ -486,31 +607,26 @@ class HIperfServer::Impl : public ProducerSocket::Callback {
rtc_timer_.async_wait(
std::bind(&Impl::sendRTCContentObjectCallbackWithTrace, this,
std::placeholders::_1));
- } else if (configuration_.input_stream_mode_) {
+ } else if (config_.input_stream_mode_) {
rtc_running_ = true;
// create socket
remote_ = asio::ip::udp::endpoint(
- asio::ip::address::from_string("127.0.0.1"), configuration_.port_);
+ asio::ip::address::from_string("127.0.0.1"), config_.port_);
socket_.open(asio::ip::udp::v4());
socket_.bind(remote_);
- recv_buffer_.first = (uint8_t *)malloc(HIPERF_MTU);
- recv_buffer_.second = HIPERF_MTU;
receiveStream();
} else {
rtc_running_ = true;
rtc_timer_.expires_from_now(
- configuration_.production_rate_.getMicrosecondsForPacket(
- configuration_.payload_size_));
+ config_.production_rate_.getMicrosecondsForPacket(
+ config_.payload_size_));
rtc_timer_.async_wait(std::bind(&Impl::sendRTCContentObjectCallback,
this, std::placeholders::_1));
}
-#else
- rtc_timer_.expires_from_now(
- configuration_.production_rate_.getMicrosecondsForPacket(
- configuration_.payload_size_));
- rtc_timer_.async_wait(std::bind(&Impl::sendRTCContentObjectCallback, this,
- std::placeholders::_1));
-#endif
+ }
+
+ for (auto &producer_context : producer_contexts_) {
+ producer_context.run();
}
io_service_.run();
@@ -518,49 +634,31 @@ class HIperfServer::Impl : public ProducerSocket::Callback {
return ERROR_SUCCESS;
}
+ ServerConfiguration &getConfig() { return config_; }
+
private:
- hiperf::ServerConfiguration configuration_;
+ // Variables initialized by the constructor.
+ ServerConfiguration config_;
+
+ // Variable initialized in the in-class initializer list.
asio::io_service io_service_;
- asio::signal_set signals_;
- asio::steady_timer rtc_timer_;
- std::vector<uint32_t> unsatisfied_interests_;
- std::vector<std::shared_ptr<ContentObject>> content_objects_;
- std::uint16_t content_objects_index_;
- std::uint16_t mask_;
- std::uint32_t last_segment_;
- std::unique_ptr<ProducerSocket> producer_socket_;
+ asio::signal_set signals_{io_service_};
+ asio::steady_timer rtc_timer_{io_service_};
+ asio::posix::stream_descriptor input_{io_service_};
+ asio::ip::udp::socket socket_{io_service_};
+ std::vector<ProducerContext> producer_contexts_;
::utils::EventThread produce_thread_;
- std::size_t payload_size_max_;
-#ifndef _WIN32
- asio::posix::stream_descriptor input_;
asio::streambuf input_buffer_;
- bool rtc_running_;
- Name flow_name_;
- asio::ip::udp::socket socket_;
+ bool rtc_running_{false};
asio::ip::udp::endpoint remote_;
- std::pair<uint8_t *, std::size_t> recv_buffer_;
-#endif
+ utils::MemBuf recv_buffer_{utils::MemBuf::CREATE, HIPERF_MTU};
+ std::array<uint8_t, HIPERF_MTU> rtc_payload_;
};
-HIperfServer::HIperfServer(const ServerConfiguration &conf) {
- impl_ = new Impl(conf);
-}
-
-HIperfServer::HIperfServer(HIperfServer &&other) {
- impl_ = other.impl_;
- other.impl_ = nullptr;
-}
-
-HIperfServer &HIperfServer::operator=(HIperfServer &&other) {
- if (this != &other) {
- impl_ = other.impl_;
- other.impl_ = nullptr;
- }
-
- return *this;
-}
+HIperfServer::HIperfServer(const ServerConfiguration &conf)
+ : impl_(std::make_unique<Impl>(conf)) {}
-HIperfServer::~HIperfServer() { delete impl_; }
+HIperfServer::~HIperfServer() = default;
int HIperfServer::setup() { return impl_->setup(); }
diff --git a/apps/hiperf/src/server.h b/apps/hiperf/src/server.h
index 73ac72123..d7420da48 100644
--- a/apps/hiperf/src/server.h
+++ b/apps/hiperf/src/server.h
@@ -21,9 +21,7 @@ namespace hiperf {
class HIperfServer {
public:
- HIperfServer(const ServerConfiguration &conf);
- HIperfServer(HIperfServer &&other);
- HIperfServer &operator=(HIperfServer &&other);
+ explicit HIperfServer(const ServerConfiguration &conf);
~HIperfServer();
int setup();
@@ -31,7 +29,7 @@ class HIperfServer {
private:
class Impl;
- Impl *impl_;
+ std::unique_ptr<Impl> impl_;
};
} // namespace hiperf \ No newline at end of file