summaryrefslogtreecommitdiffstats
path: root/apps/hiperf
diff options
context:
space:
mode:
Diffstat (limited to 'apps/hiperf')
-rw-r--r--apps/hiperf/CMakeLists.txt44
-rw-r--r--apps/hiperf/src/client.cc900
-rw-r--r--apps/hiperf/src/client.h34
-rw-r--r--apps/hiperf/src/common.h217
-rw-r--r--apps/hiperf/src/forwarder_config.h97
-rw-r--r--apps/hiperf/src/forwarder_interface.cc676
-rw-r--r--apps/hiperf/src/forwarder_interface.h131
-rw-r--r--apps/hiperf/src/main.cc456
-rw-r--r--apps/hiperf/src/server.cc516
-rw-r--r--apps/hiperf/src/server.h34
10 files changed, 3105 insertions, 0 deletions
diff --git a/apps/hiperf/CMakeLists.txt b/apps/hiperf/CMakeLists.txt
new file mode 100644
index 000000000..564525e67
--- /dev/null
+++ b/apps/hiperf/CMakeLists.txt
@@ -0,0 +1,44 @@
+# Copyright (c) 2017-2019 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+if (NOT DISABLE_EXECUTABLES)
+ list(APPEND HIPERF_SRC
+ ${CMAKE_CURRENT_SOURCE_DIR}/src/client.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/src/main.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/src/server.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/src/forwarder_interface.cc
+ )
+
+ list (APPEND HIPERF_LIBRARIES
+ ${LIBTRANSPORT_LIBRARIES}
+ ${LIBHICNCTRL_LIBRARIES}
+ ${LIBHICN_LIBRARIES}
+ ${CMAKE_THREAD_LIBS_INIT}
+ ${LIBCONFIG_CPP_LIBRARIES}
+ ${WSOCK32_LIBRARY}
+ ${WS2_32_LIBRARY}
+ )
+
+ build_executable(hiperf
+ SOURCES ${HIPERF_SRC}
+ LINK_LIBRARIES ${HIPERF_LIBRARIES}
+ INCLUDE_DIRS
+ ${CMAKE_CURRENT_SOURCE_DIR}/src
+ ${LIBTRANSPORT_INCLUDE_DIRS}
+ ${LIBHICNCTRL_INCLUDE_DIRS}
+ ${LIBCONFIG_CPP_INCLUDE_DIRS}
+ DEPENDS ${DEPENDENCIES}
+ COMPONENT ${HICN_APPS}
+ LINK_FLAGS ${LINK_FLAGS}
+ )
+endif() \ No newline at end of file
diff --git a/apps/hiperf/src/client.cc b/apps/hiperf/src/client.cc
new file mode 100644
index 000000000..820ebf0ce
--- /dev/null
+++ b/apps/hiperf/src/client.cc
@@ -0,0 +1,900 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <client.h>
+#include <forwarder_config.h>
+#include <forwarder_interface.h>
+
+#include <libconfig.h++>
+
+namespace hiperf {
+
+/**
+ * Forward declaration of client Read callbacks.
+ */
+class RTCCallback;
+class Callback;
+
+/**
+ * Hiperf client class: configure and setup an hicn consumer following the
+ * ClientConfiguration.
+ */
+class HIperfClient::Impl
+#ifdef FORWARDER_INTERFACE
+ : ForwarderInterface::ICallback
+#endif
+{
+ typedef std::chrono::time_point<std::chrono::steady_clock> Time;
+ typedef std::chrono::microseconds TimeDuration;
+
+ friend class Callback;
+ friend class RTCCallback;
+
+ struct nack_packet_t {
+ uint64_t timestamp;
+ uint32_t prod_rate;
+ uint32_t prod_seg;
+
+ inline uint64_t getTimestamp() const { return _ntohll(&timestamp); }
+ inline void setTimestamp(uint64_t time) { timestamp = _htonll(&time); }
+
+ inline uint32_t getProductionRate() const { return ntohl(prod_rate); }
+ inline void setProductionRate(uint32_t rate) { prod_rate = htonl(rate); }
+
+ inline uint32_t getProductionSegement() const { return ntohl(prod_seg); }
+ inline void setProductionSegement(uint32_t seg) { prod_seg = htonl(seg); }
+ };
+
+ public:
+ Impl(const hiperf::ClientConfiguration &conf)
+ : configuration_(conf),
+ total_duration_milliseconds_(0),
+ old_bytes_value_(0),
+ old_interest_tx_value_(0),
+ old_fec_interest_tx_value_(0),
+ old_fec_data_rx_value_(0),
+ old_lost_data_value_(0),
+ old_bytes_recovered_value_(0),
+ old_definitely_lost_data_value_(0),
+ old_retx_value_(0),
+ old_sent_int_value_(0),
+ old_received_nacks_value_(0),
+ old_fec_pkt_(0),
+ avg_data_delay_(0),
+ delay_sample_(0),
+ received_bytes_(0),
+ received_data_pkt_(0),
+ data_delays_(""),
+ signals_(io_service_),
+ rtc_callback_(*this),
+ callback_(*this),
+ socket_(io_service_),
+ done_(false),
+ switch_threshold_(~0)
+#ifdef FORWARDER_INTERFACE
+ ,
+ forwarder_interface_(io_service_, this)
+#endif
+ {
+ }
+
+ ~Impl() {}
+
+ void checkReceivedRtcContent(ConsumerSocket &c,
+ const ContentObject &contentObject) {}
+
+ void processLeavingInterest(ConsumerSocket &c, const Interest &interest) {}
+
+ void addFace(const std::string &local_address, uint16_t local_port,
+ const std::string &remote_address, uint16_t remote_port,
+ std::string interface);
+
+ void handleTimerExpiration(ConsumerSocket &c,
+ const TransportStatistics &stats) {
+ const char separator = ' ';
+ const int width = 15;
+
+ utils::TimePoint t2 = utils::SteadyClock::now();
+ auto exact_duration =
+ std::chrono::duration_cast<utils::Milliseconds>(t2 - t_stats_);
+
+ std::stringstream interval;
+ interval << total_duration_milliseconds_ / 1000 << "-"
+ << total_duration_milliseconds_ / 1000 +
+ exact_duration.count() / 1000;
+
+ std::stringstream bytes_transferred;
+ bytes_transferred << std::fixed << std::setprecision(3)
+ << (stats.getBytesRecv() - old_bytes_value_) / 1000000.0
+ << std::setfill(separator) << "[MB]";
+
+ std::stringstream bandwidth;
+ bandwidth << ((stats.getBytesRecv() - old_bytes_value_) * 8) /
+ (exact_duration.count()) / 1000.0
+ << std::setfill(separator) << "[Mbps]";
+
+ std::stringstream window;
+ window << stats.getAverageWindowSize() << std::setfill(separator)
+ << "[Int]";
+
+ std::stringstream avg_rtt;
+ avg_rtt << stats.getAverageRtt() << std::setfill(separator) << "[ms]";
+
+ if (configuration_.rtc_) {
+ // we get rtc stats more often, thus we need ms in the interval
+ std::stringstream interval_ms;
+ interval_ms << total_duration_milliseconds_ << "-"
+ << total_duration_milliseconds_ + exact_duration.count();
+
+ std::stringstream lost_data;
+ lost_data << stats.getLostData() - old_lost_data_value_
+ << std::setfill(separator) << "[pkt]";
+
+ std::stringstream bytes_recovered_data;
+ bytes_recovered_data << stats.getBytesRecoveredData() -
+ old_bytes_recovered_value_
+ << std::setfill(separator) << "[pkt]";
+
+ std::stringstream definitely_lost_data;
+ definitely_lost_data << stats.getDefinitelyLostData() -
+ old_definitely_lost_data_value_
+ << std::setfill(separator) << "[pkt]";
+
+ std::stringstream data_delay;
+ data_delay << avg_data_delay_ << std::setfill(separator) << "[ms]";
+
+ std::stringstream received_data_pkt;
+ received_data_pkt << received_data_pkt_ << std::setfill(separator)
+ << "[pkt]";
+
+ std::stringstream goodput;
+ goodput << (received_bytes_ * 8.0) / (exact_duration.count()) / 1000.0
+ << std::setfill(separator) << "[Mbps]";
+
+ std::stringstream loss_rate;
+ loss_rate << std::fixed << std::setprecision(2)
+ << stats.getLossRatio() * 100.0 << std::setfill(separator)
+ << "[%]";
+
+ std::stringstream retx_sent;
+ retx_sent << stats.getRetxCount() - old_retx_value_
+ << std::setfill(separator) << "[pkt]";
+
+ std::stringstream interest_sent;
+ interest_sent << stats.getInterestTx() - old_sent_int_value_
+ << std::setfill(separator) << "[pkt]";
+
+ std::stringstream nacks;
+ nacks << stats.getReceivedNacks() - old_received_nacks_value_
+ << std::setfill(separator) << "[pkt]";
+
+ std::stringstream fec_pkt;
+ fec_pkt << stats.getReceivedFEC() - old_fec_pkt_
+ << std::setfill(separator) << "[pkt]";
+
+ std::stringstream queuing_delay;
+ queuing_delay << stats.getQueuingDelay() << std::setfill(separator)
+ << "[ms]";
+
+#ifdef FORWARDER_INTERFACE
+ if (!done_ && stats.getQueuingDelay() >= switch_threshold_ &&
+ total_duration_milliseconds_ > 1000) {
+ std::cout << "Switching due to queuing delay" << std::endl;
+ forwarder_interface_.createFaceAndRoutes(backup_routes_);
+ forwarder_interface_.deleteFaceAndRoutes(main_routes_);
+ std::swap(backup_routes_, main_routes_);
+ done_ = true;
+ }
+#endif
+
+ // statistics not yet available in the transport
+ // std::stringstream interest_fec_tx;
+ // interest_fec_tx << stats.getInterestFecTxCount() -
+ // old_fec_interest_tx_value_ << std::setfill(separator) << "[pkt]";
+ // std::stringstream bytes_fec_recv;
+ // bytes_fec_recv << stats.getBytesFecRecv() - old_fec_data_rx_value_
+ // << std::setfill(separator) << "[bytes]";
+ std::cout << std::left << std::setw(width) << "Interval";
+ std::cout << std::left << std::setw(width) << "RecvData";
+ std::cout << std::left << std::setw(width) << "Bandwidth";
+ std::cout << std::left << std::setw(width) << "Goodput";
+ std::cout << std::left << std::setw(width) << "LossRate";
+ std::cout << std::left << std::setw(width) << "Retr";
+ std::cout << std::left << std::setw(width) << "InterestSent";
+ std::cout << std::left << std::setw(width) << "ReceivedNacks";
+ std::cout << std::left << std::setw(width) << "SyncWnd";
+ std::cout << std::left << std::setw(width) << "MinRtt";
+ std::cout << std::left << std::setw(width) << "QueuingDelay";
+ std::cout << std::left << std::setw(width) << "LostData";
+ std::cout << std::left << std::setw(width) << "RecoveredData";
+ std::cout << std::left << std::setw(width) << "DefinitelyLost";
+ std::cout << std::left << std::setw(width) << "State";
+ std::cout << std::left << std::setw(width) << "DataDelay";
+ std::cout << std::left << std::setw(width) << "FecPkt" << std::endl;
+
+ std::cout << std::left << std::setw(width) << interval_ms.str();
+ std::cout << std::left << std::setw(width) << received_data_pkt.str();
+ std::cout << std::left << std::setw(width) << bandwidth.str();
+ std::cout << std::left << std::setw(width) << goodput.str();
+ std::cout << std::left << std::setw(width) << loss_rate.str();
+ std::cout << std::left << std::setw(width) << retx_sent.str();
+ std::cout << std::left << std::setw(width) << interest_sent.str();
+ std::cout << std::left << std::setw(width) << nacks.str();
+ std::cout << std::left << std::setw(width) << window.str();
+ std::cout << std::left << std::setw(width) << avg_rtt.str();
+ std::cout << std::left << std::setw(width) << queuing_delay.str();
+ std::cout << std::left << std::setw(width) << lost_data.str();
+ std::cout << std::left << std::setw(width) << bytes_recovered_data.str();
+ std::cout << std::left << std::setw(width) << definitely_lost_data.str();
+ std::cout << std::left << std::setw(width) << stats.getCCStatus();
+ std::cout << std::left << std::setw(width) << data_delay.str();
+ std::cout << std::left << std::setw(width) << fec_pkt.str();
+ std::cout << std::endl;
+
+ if (configuration_.test_mode_) {
+ if (data_delays_.size() > 0) data_delays_.pop_back();
+
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::system_clock::now().time_since_epoch())
+ .count();
+ std::cout << now << " DATA-DELAYS:[" << data_delays_ << "]"
+ << std::endl;
+ }
+
+ // statistics not yet available in the transport
+ // std::cout << std::left << std::setw(width) << interest_fec_tx.str();
+ // std::cout << std::left << std::setw(width) << bytes_fec_recv.str();
+ } else {
+ std::cout << std::left << std::setw(width) << "Interval";
+ std::cout << std::left << std::setw(width) << "Transfer";
+ std::cout << std::left << std::setw(width) << "Bandwidth";
+ std::cout << std::left << std::setw(width) << "Retr";
+ std::cout << std::left << std::setw(width) << "Cwnd";
+ std::cout << std::left << std::setw(width) << "AvgRtt" << std::endl;
+
+ std::cout << std::left << std::setw(width) << interval.str();
+ std::cout << std::left << std::setw(width) << bytes_transferred.str();
+ std::cout << std::left << std::setw(width) << bandwidth.str();
+ std::cout << std::left << std::setw(width) << stats.getRetxCount();
+ std::cout << std::left << std::setw(width) << window.str();
+ std::cout << std::left << std::setw(width) << avg_rtt.str() << std::endl;
+ std::cout << std::endl;
+ }
+ total_duration_milliseconds_ += (uint32_t)exact_duration.count();
+ old_bytes_value_ = stats.getBytesRecv();
+ old_lost_data_value_ = stats.getLostData();
+ old_bytes_recovered_value_ = stats.getBytesRecoveredData();
+ old_definitely_lost_data_value_ = stats.getDefinitelyLostData();
+ old_fec_interest_tx_value_ = stats.getInterestFecTxCount();
+ old_fec_data_rx_value_ = stats.getBytesFecRecv();
+ old_retx_value_ = stats.getRetxCount();
+ old_sent_int_value_ = stats.getInterestTx();
+ old_received_nacks_value_ = stats.getReceivedNacks();
+ old_fec_pkt_ = stats.getReceivedFEC();
+ delay_sample_ = 0;
+ avg_data_delay_ = 0;
+ received_bytes_ = 0;
+ received_data_pkt_ = 0;
+ data_delays_ = "";
+
+ t_stats_ = utils::SteadyClock::now();
+ }
+
+ bool parseConfig(const char *conf_file) {
+ using namespace libconfig;
+ Config cfg;
+
+ try {
+ cfg.readFile(conf_file);
+ } catch (const FileIOException &fioex) {
+ std::cerr << "I/O error while reading file." << std::endl;
+ return false;
+ } catch (const ParseException &pex) {
+ std::cerr << "Parse error at " << pex.getFile() << ":" << pex.getLine()
+ << " - " << pex.getError() << std::endl;
+ return false;
+ }
+
+ Setting &config = cfg.getRoot();
+
+ if (config.exists("switch_threshold")) {
+ unsigned threshold;
+ config.lookupValue("switch_threshold", threshold);
+ switch_threshold_ = threshold;
+ }
+
+ // listeners
+ if (config.exists("listeners")) {
+ // get path where looking for modules
+ const Setting &listeners = config.lookup("listeners");
+ auto count = listeners.getLength();
+
+ for (int i = 0; i < count; i++) {
+ const Setting &listener = listeners[i];
+ ListenerConfig list;
+ unsigned port;
+ std::string interface;
+
+ list.name = listener.getName();
+ listener.lookupValue("local_address", list.address);
+ listener.lookupValue("local_port", port);
+ listener.lookupValue("interface", list.interface);
+ list.port = (uint16_t)(port);
+
+ std::cout << "Adding listener " << list.name << ", ( " << list.address
+ << ":" << list.port << ")" << std::endl;
+ config_.addListener(std::move(list));
+ }
+ }
+
+ // connectors
+ if (config.exists("connectors")) {
+ // get path where looking for modules
+ const Setting &connectors = config.lookup("connectors");
+ auto count = connectors.getLength();
+
+ for (int i = 0; i < count; i++) {
+ const Setting &connector = connectors[i];
+ ConnectorConfig conn;
+
+ conn.name = connector.getName();
+ unsigned port = 0;
+
+ if (!connector.lookupValue("local_address", conn.local_address)) {
+ conn.local_address = "";
+ }
+
+ if (!connector.lookupValue("local_port", port)) {
+ port = 0;
+ }
+
+ conn.local_port = (uint16_t)(port);
+
+ if (!connector.lookupValue("remote_address", conn.remote_address)) {
+ std::cerr
+ << "Error in configuration file: remote_address is a mandatory "
+ "field of Connectors."
+ << std::endl;
+ return false;
+ }
+
+ if (!connector.lookupValue("remote_port", port)) {
+ std::cerr << "Error in configuration file: remote_port is a "
+ "mandatory field of Connectors."
+ << std::endl;
+ return false;
+ }
+
+ if (!connector.lookupValue("interface", conn.interface)) {
+ std::cerr << "Error in configuration file: interface is a "
+ "mandatory field of Connectors."
+ << std::endl;
+ return false;
+ }
+
+ conn.remote_port = (uint16_t)(port);
+
+ std::cout << "Adding connector " << conn.name << ", ("
+ << conn.local_address << ":" << conn.local_port << " "
+ << conn.remote_address << ":" << conn.remote_port << ")"
+ << std::endl;
+ config_.addConnector(conn.name, std::move(conn));
+ }
+ }
+
+ // Routes
+ if (config.exists("routes")) {
+ const Setting &routes = config.lookup("routes");
+ auto count = routes.getLength();
+
+ for (int i = 0; i < count; i++) {
+ const Setting &route = routes[i];
+ RouteConfig r;
+ unsigned weight;
+
+ r.name = route.getName();
+ route.lookupValue("prefix", r.prefix);
+ route.lookupValue("weight", weight);
+ route.lookupValue("main_connector", r.main_connector);
+ route.lookupValue("backup_connector", r.backup_connector);
+ r.weight = (uint16_t)(weight);
+
+ std::cout << "Adding route " << r.name << " " << r.prefix << " ("
+ << r.main_connector << " " << r.backup_connector << " "
+ << r.weight << ")" << std::endl;
+ config_.addRoute(std::move(r));
+ }
+ }
+
+ std::cout << "Ok" << std::endl;
+
+ return true;
+ }
+
+ bool splitRoute(std::string route, std::string &prefix,
+ uint8_t &prefix_length) {
+ std::string delimiter = "/";
+
+ size_t pos = 0;
+ if ((pos = route.find(delimiter)) != std::string::npos) {
+ prefix = route.substr(0, pos);
+ route.erase(0, pos + delimiter.length());
+ } else {
+ return false;
+ }
+
+ prefix_length = std::stoul(route.substr(0));
+ return true;
+ }
+
+#ifdef FORWARDER_INTERFACE
+ void onHicnServiceReady() override {
+ std::cout << "Successfully connected to local forwarder!" << std::endl;
+
+ std::cout << "Setting up listeners" << std::endl;
+ const char *config = getenv("FORWARDER_CONFIG");
+
+ if (config) {
+ if (!parseConfig(config)) {
+ return;
+ }
+
+ // Create faces and route using first face in the list.
+ auto &routes = config_.getRoutes();
+ auto &connectors = config_.getConnectors();
+
+ if (routes.size() == 0 || connectors.size() == 0) {
+ std::cerr << "Nothing to configure" << std::endl;
+ return;
+ }
+
+ for (auto &route : routes) {
+ auto the_connector_it = connectors.find(route.main_connector);
+ if (the_connector_it == connectors.end()) {
+ std::cerr << "No valid main connector found for route " << route.name
+ << std::endl;
+ continue;
+ }
+
+ auto &the_connector = the_connector_it->second;
+ auto route_info = std::make_shared<ForwarderInterface::RouteInfo>();
+ route_info->family = AF_INET;
+ route_info->local_addr = the_connector.local_address;
+ route_info->local_port = the_connector.local_port;
+ route_info->remote_addr = the_connector.remote_address;
+ route_info->remote_port = the_connector.remote_port;
+ route_info->interface = the_connector.interface;
+ route_info->name = the_connector.name;
+
+ std::string prefix;
+ uint8_t prefix_length;
+ auto ret = splitRoute(route.prefix, prefix, prefix_length);
+
+ if (!ret) {
+ std::cerr << "Error parsing route" << std::endl;
+ return;
+ }
+
+ route_info->route_addr = prefix;
+ route_info->route_len = prefix_length;
+
+ main_routes_.emplace_back(route_info);
+
+ if (!route.backup_connector.empty()) {
+ // Add also the backup route
+ auto the_backup_connector_it =
+ connectors.find(route.backup_connector);
+ if (the_backup_connector_it == connectors.end()) {
+ std::cerr << "No valid backup connector found for route "
+ << route.name << std::endl;
+ continue;
+ }
+
+ auto &the_backup_connector = the_backup_connector_it->second;
+ auto backup_route_info =
+ std::make_shared<ForwarderInterface::RouteInfo>();
+ backup_route_info->family = AF_INET;
+ backup_route_info->local_addr = the_backup_connector.local_address;
+ backup_route_info->local_port = the_backup_connector.local_port;
+ backup_route_info->remote_addr = the_backup_connector.remote_address;
+ backup_route_info->remote_port = the_backup_connector.remote_port;
+ backup_route_info->interface = the_backup_connector.interface;
+ backup_route_info->name = the_backup_connector.name;
+
+ std::string prefix;
+ uint8_t prefix_length;
+ auto ret = splitRoute(route.prefix, prefix, prefix_length);
+
+ if (!ret) {
+ std::cerr << "Error parsing route" << std::endl;
+ return;
+ }
+
+ backup_route_info->route_addr = prefix;
+ backup_route_info->route_len = prefix_length;
+
+ backup_routes_.emplace_back(backup_route_info);
+ }
+ }
+
+ // Create main routes
+ std::cout << "Creating main routes" << std::endl;
+ forwarder_interface_.createFaceAndRoutes(main_routes_);
+ }
+ }
+
+ void onRouteConfigured(
+ std::vector<ForwarderInterface::RouteInfoPtr> &route_info) override {
+ std::cout << "Routes successfully configured!" << std::endl;
+ }
+#endif
+
+ int setup() {
+ int ret;
+
+ if (configuration_.rtc_) {
+ configuration_.transport_protocol_ = RTC;
+ } else if (configuration_.window < 0) {
+ configuration_.transport_protocol_ = RAAQM;
+ } else {
+ configuration_.transport_protocol_ = CBR;
+ }
+
+ if (configuration_.relay_ && configuration_.rtc_) {
+ int production_protocol = ProductionProtocolAlgorithms::RTC_PROD;
+ producer_socket_ = std::make_unique<ProducerSocket>(production_protocol);
+ producer_socket_->registerPrefix(configuration_.relay_name_);
+ producer_socket_->connect();
+ }
+
+ if (configuration_.output_stream_mode_ && configuration_.rtc_) {
+ remote_ = asio::ip::udp::endpoint(
+ asio::ip::address::from_string("127.0.0.1"), configuration_.port_);
+ socket_.open(asio::ip::udp::v4());
+ }
+
+ if (configuration_.secure_) {
+ consumer_socket_ = std::make_unique<P2PSecureConsumerSocket>(
+ RAAQM, configuration_.transport_protocol_);
+ if (configuration_.producer_prefix_.getPrefixLength() == 0) {
+ std::cerr << "ERROR -- Missing producer prefix on which perform the "
+ "handshake."
+ << std::endl;
+ } else {
+ P2PSecureConsumerSocket &secure_consumer_socket =
+ *(static_cast<P2PSecureConsumerSocket *>(consumer_socket_.get()));
+ secure_consumer_socket.registerPrefix(configuration_.producer_prefix_);
+ }
+ } else {
+ consumer_socket_ =
+ std::make_unique<ConsumerSocket>(configuration_.transport_protocol_);
+ }
+
+ consumer_socket_->setSocketOption(
+ GeneralTransportOptions::INTEREST_LIFETIME,
+ configuration_.interest_lifetime_);
+
+#if defined(DEBUG) && defined(__linux__)
+ std::shared_ptr<transport::BasePortal> portal;
+ consumer_socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal);
+ signals_ =
+ std::make_unique<asio::signal_set>(portal->getIoService(), SIGUSR1);
+ signals_->async_wait([this](const std::error_code &, const int &) {
+ std::cout << "Signal SIGUSR1!" << std::endl;
+ mtrace();
+ });
+#endif
+
+ if (consumer_socket_->setSocketOption(CURRENT_WINDOW_SIZE,
+ configuration_.window) ==
+ SOCKET_OPTION_NOT_SET) {
+ std::cerr << "ERROR -- Impossible to set the size of the window."
+ << std::endl;
+ return ERROR_SETUP;
+ }
+
+ if (configuration_.transport_protocol_ == RAAQM &&
+ configuration_.beta != -1.f) {
+ if (consumer_socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE,
+ configuration_.beta) ==
+ SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+ }
+
+ if (configuration_.transport_protocol_ == RAAQM &&
+ configuration_.drop_factor != -1.f) {
+ if (consumer_socket_->setSocketOption(RaaqmTransportOptions::DROP_FACTOR,
+ configuration_.drop_factor) ==
+ SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+ }
+
+ if (!configuration_.producer_certificate.empty()) {
+ std::shared_ptr<Verifier> verifier = std::make_shared<AsymmetricVerifier>(
+ configuration_.producer_certificate);
+ if (consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER,
+ verifier) == SOCKET_OPTION_NOT_SET)
+ return ERROR_SETUP;
+ }
+
+ if (!configuration_.passphrase.empty()) {
+ std::shared_ptr<Verifier> verifier =
+ std::make_shared<SymmetricVerifier>(configuration_.passphrase);
+ if (consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER,
+ verifier) == SOCKET_OPTION_NOT_SET)
+ return ERROR_SETUP;
+ }
+
+ ret = consumer_socket_->setSocketOption(
+ ConsumerCallbacksOptions::INTEREST_OUTPUT,
+ (ConsumerInterestCallback)std::bind(&Impl::processLeavingInterest, this,
+ std::placeholders::_1,
+ std::placeholders::_2));
+
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+
+ if (!configuration_.rtc_) {
+ ret = consumer_socket_->setSocketOption(
+ ConsumerCallbacksOptions::READ_CALLBACK, &callback_);
+ } else {
+ ret = consumer_socket_->setSocketOption(
+ ConsumerCallbacksOptions::READ_CALLBACK, &rtc_callback_);
+ }
+
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+
+ if (configuration_.rtc_) {
+ ret = consumer_socket_->setSocketOption(
+ ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT,
+ (ConsumerContentObjectCallback)std::bind(
+ &Impl::checkReceivedRtcContent, this, std::placeholders::_1,
+ std::placeholders::_2));
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+ }
+
+ if (configuration_.rtc_) {
+ std::shared_ptr<TransportStatistics> transport_stats;
+ consumer_socket_->getSocketOption(
+ OtherOptions::STATISTICS, (TransportStatistics **)&transport_stats);
+ transport_stats->setAlpha(0.0);
+ }
+
+ ret = consumer_socket_->setSocketOption(
+ ConsumerCallbacksOptions::STATS_SUMMARY,
+ (ConsumerTimerCallback)std::bind(&Impl::handleTimerExpiration, this,
+ std::placeholders::_1,
+ std::placeholders::_2));
+
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+
+ if (consumer_socket_->setSocketOption(
+ GeneralTransportOptions::STATS_INTERVAL,
+ configuration_.report_interval_milliseconds_) ==
+ SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+
+ consumer_socket_->connect();
+
+ return ERROR_SUCCESS;
+ }
+
+ int run() {
+ std::cout << "Starting download of " << configuration_.name << std::endl;
+
+ signals_.add(SIGINT);
+ signals_.async_wait(
+ [this](const std::error_code &, const int &) { io_service_.stop(); });
+
+ t_download_ = t_stats_ = std::chrono::steady_clock::now();
+ consumer_socket_->asyncConsume(configuration_.name);
+ io_service_.run();
+
+ consumer_socket_->stop();
+
+ return ERROR_SUCCESS;
+ }
+
+ private:
+ class RTCCallback : public ConsumerSocket::ReadCallback {
+ static constexpr std::size_t mtu = 1500;
+
+ public:
+ RTCCallback(Impl &hiperf_client) : client_(hiperf_client) {
+ client_.configuration_.receive_buffer = utils::MemBuf::create(mtu);
+ }
+
+ bool isBufferMovable() noexcept override { return false; }
+
+ void getReadBuffer(uint8_t **application_buffer,
+ size_t *max_length) override {
+ *application_buffer =
+ client_.configuration_.receive_buffer->writableData();
+ *max_length = mtu;
+ }
+
+ void readDataAvailable(std::size_t length) noexcept override {
+ client_.received_bytes_ += length;
+ client_.received_data_pkt_++;
+
+ // collecting delay stats. Just for performance testing
+ uint64_t *senderTimeStamp =
+ (uint64_t *)client_.configuration_.receive_buffer->writableData();
+
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::system_clock::now().time_since_epoch())
+ .count();
+ double new_delay = (double)(now - *senderTimeStamp);
+
+ if (*senderTimeStamp > now)
+ new_delay = -1 * (double)(*senderTimeStamp - now);
+
+ client_.delay_sample_++;
+ client_.avg_data_delay_ =
+ client_.avg_data_delay_ +
+ (new_delay - client_.avg_data_delay_) / client_.delay_sample_;
+
+ if (client_.configuration_.test_mode_) {
+ client_.data_delays_ += std::to_string(int(new_delay));
+ client_.data_delays_ += ",";
+ }
+
+ if (client_.configuration_.relay_) {
+ client_.producer_socket_->produceDatagram(
+ client_.configuration_.relay_name_.getName(),
+ client_.configuration_.receive_buffer->writableData(),
+ length < 1400 ? length : 1400);
+ }
+ if (client_.configuration_.output_stream_mode_) {
+ uint8_t *start =
+ (uint8_t *)client_.configuration_.receive_buffer->writableData();
+ start += sizeof(uint64_t);
+ std::size_t pkt_len = length - sizeof(uint64_t);
+ client_.socket_.send_to(asio::buffer(start, pkt_len), client_.remote_);
+ }
+ }
+
+ size_t maxBufferSize() const override { return mtu; }
+
+ void readError(const std::error_code ec) noexcept override {
+ std::cerr << "Error while reading from RTC socket" << std::endl;
+ client_.io_service_.stop();
+ }
+
+ void readSuccess(std::size_t total_size) noexcept override {
+ std::cout << "Data successfully read" << std::endl;
+ }
+
+ private:
+ Impl &client_;
+ };
+
+ class Callback : public ConsumerSocket::ReadCallback {
+ public:
+ Callback(Impl &hiperf_client) : client_(hiperf_client) {
+ client_.configuration_.receive_buffer =
+ utils::MemBuf::create(client_.configuration_.receive_buffer_size_);
+ }
+
+ bool isBufferMovable() noexcept override { return false; }
+
+ void getReadBuffer(uint8_t **application_buffer,
+ size_t *max_length) override {
+ *application_buffer =
+ client_.configuration_.receive_buffer->writableData();
+ *max_length = client_.configuration_.receive_buffer_size_;
+ }
+
+ void readDataAvailable(std::size_t length) noexcept override {}
+
+ void readBufferAvailable(
+ std::unique_ptr<utils::MemBuf> &&buffer) noexcept override {}
+
+ size_t maxBufferSize() const override {
+ return client_.configuration_.receive_buffer_size_;
+ }
+
+ void readError(const std::error_code ec) noexcept override {
+ std::cerr << "Error " << ec.message() << " while reading from socket"
+ << std::endl;
+ client_.io_service_.stop();
+ }
+
+ void readSuccess(std::size_t total_size) noexcept override {
+ Time t2 = std::chrono::steady_clock::now();
+ TimeDuration dt =
+ std::chrono::duration_cast<TimeDuration>(t2 - client_.t_download_);
+ long usec = (long)dt.count();
+
+ std::cout << "Content retrieved. Size: " << total_size << " [Bytes]"
+ << std::endl;
+
+ std::cerr << "Elapsed Time: " << usec / 1000000.0 << " seconds -- "
+ << (total_size * 8) * 1.0 / usec * 1.0 << " [Mbps]"
+ << std::endl;
+
+ client_.io_service_.stop();
+ }
+
+ private:
+ Impl &client_;
+ };
+
+ hiperf::ClientConfiguration configuration_;
+ Time t_stats_;
+ Time t_download_;
+ uint32_t total_duration_milliseconds_;
+ uint64_t old_bytes_value_;
+ uint64_t old_interest_tx_value_;
+ uint64_t old_fec_interest_tx_value_;
+ uint64_t old_fec_data_rx_value_;
+ uint64_t old_lost_data_value_;
+ uint64_t old_bytes_recovered_value_;
+ uint64_t old_definitely_lost_data_value_;
+ uint32_t old_retx_value_;
+ uint32_t old_sent_int_value_;
+ uint32_t old_received_nacks_value_;
+ uint32_t old_fec_pkt_;
+
+ // IMPORTANT: to be used only for performance testing, when consumer and
+ // producer are synchronized. Used for rtc only at the moment
+ double avg_data_delay_;
+ uint32_t delay_sample_;
+
+ uint32_t received_bytes_;
+ uint32_t received_data_pkt_;
+
+ std::string data_delays_;
+
+ asio::io_service io_service_;
+ asio::signal_set signals_;
+ RTCCallback rtc_callback_;
+ Callback callback_;
+ std::unique_ptr<ConsumerSocket> consumer_socket_;
+ std::unique_ptr<ProducerSocket> producer_socket_;
+ asio::ip::udp::socket socket_;
+ asio::ip::udp::endpoint remote_;
+
+ ForwarderConfiguration config_;
+ uint16_t switch_threshold_; /* ms */
+ bool done_;
+ std::vector<ForwarderInterface::RouteInfoPtr> main_routes_;
+ std::vector<ForwarderInterface::RouteInfoPtr> backup_routes_;
+#ifdef FORWARDER_INTERFACE
+ ForwarderInterface forwarder_interface_;
+#endif
+};
+
+HIperfClient::HIperfClient(const ClientConfiguration &conf) {
+ impl_ = new Impl(conf);
+}
+
+HIperfClient::~HIperfClient() { delete impl_; }
+
+int HIperfClient::setup() { return impl_->setup(); }
+
+void HIperfClient::run() { impl_->run(); }
+
+} // namespace hiperf
diff --git a/apps/hiperf/src/client.h b/apps/hiperf/src/client.h
new file mode 100644
index 000000000..f45b9af43
--- /dev/null
+++ b/apps/hiperf/src/client.h
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <common.h>
+
+namespace hiperf {
+
+class HIperfClient {
+ public:
+ HIperfClient(const ClientConfiguration &conf);
+ ~HIperfClient();
+ int setup();
+ void run();
+
+ private:
+ class Impl;
+ Impl *impl_;
+};
+
+} // namespace hiperf \ No newline at end of file
diff --git a/apps/hiperf/src/common.h b/apps/hiperf/src/common.h
new file mode 100644
index 000000000..e6ba526f9
--- /dev/null
+++ b/apps/hiperf/src/common.h
@@ -0,0 +1,217 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/auth/identity.h>
+#include <hicn/transport/auth/signer.h>
+#include <hicn/transport/config.h>
+#include <hicn/transport/core/content_object.h>
+#include <hicn/transport/core/interest.h>
+#include <hicn/transport/interfaces/global_conf_interface.h>
+#include <hicn/transport/interfaces/p2psecure_socket_consumer.h>
+#include <hicn/transport/interfaces/p2psecure_socket_producer.h>
+#include <hicn/transport/interfaces/socket_consumer.h>
+#include <hicn/transport/interfaces/socket_producer.h>
+#include <hicn/transport/utils/chrono_typedefs.h>
+#include <hicn/transport/utils/literals.h>
+
+#ifndef _WIN32
+#include <hicn/transport/utils/daemonizator.h>
+#endif
+
+#include <asio.hpp>
+#include <cmath>
+#include <fstream>
+#include <iomanip>
+#include <sstream>
+#include <string>
+#include <unordered_set>
+
+#ifndef ERROR_SUCCESS
+#define ERROR_SUCCESS 0
+#endif
+#define ERROR_SETUP -5
+#define MIN_PROBE_SEQ 0xefffffff
+
+using namespace transport::interface;
+using namespace transport::auth;
+using namespace transport::core;
+
+static inline uint64_t _ntohll(const uint64_t *input) {
+ uint64_t return_val;
+ uint8_t *tmp = (uint8_t *)&return_val;
+
+ tmp[0] = *input >> 56;
+ tmp[1] = *input >> 48;
+ tmp[2] = *input >> 40;
+ tmp[3] = *input >> 32;
+ tmp[4] = *input >> 24;
+ tmp[5] = *input >> 16;
+ tmp[6] = *input >> 8;
+ tmp[7] = *input >> 0;
+
+ return return_val;
+}
+
+static inline uint64_t _htonll(const uint64_t *input) {
+ return (_ntohll(input));
+}
+
+namespace hiperf {
+
+/**
+ * Class for handling the production rate for the RTC producer.
+ */
+class Rate {
+ public:
+ Rate() : rate_kbps_(0) {}
+
+ Rate(const std::string &rate) {
+ std::size_t found = rate.find("kbps");
+ if (found != std::string::npos) {
+ rate_kbps_ = std::stof(rate.substr(0, found));
+ } else {
+ throw std::runtime_error("Format " + rate + " not correct");
+ }
+ }
+
+ Rate(const Rate &other) : rate_kbps_(other.rate_kbps_) {}
+
+ Rate &operator=(const std::string &rate) {
+ std::size_t found = rate.find("kbps");
+ if (found != std::string::npos) {
+ rate_kbps_ = std::stof(rate.substr(0, found));
+ } else {
+ throw std::runtime_error("Format " + rate + " not correct");
+ }
+
+ return *this;
+ }
+
+ std::chrono::microseconds getMicrosecondsForPacket(std::size_t packet_size) {
+ return std::chrono::microseconds(
+ (uint32_t)std::round(packet_size * 1000.0 * 8.0 / (double)rate_kbps_));
+ }
+
+ private:
+ float rate_kbps_;
+};
+
+struct packet_t {
+ uint64_t timestamp;
+ uint32_t size;
+};
+
+/**
+ * Container for command line configuration for hiperf client.
+ */
+struct ClientConfiguration {
+ ClientConfiguration()
+ : name("b001::abcd", 0),
+ beta(-1.f),
+ drop_factor(-1.f),
+ window(-1),
+ producer_certificate(""),
+ passphrase(""),
+ receive_buffer(nullptr),
+ receive_buffer_size_(128 * 1024),
+ download_size(0),
+ report_interval_milliseconds_(1000),
+ transport_protocol_(CBR),
+ rtc_(false),
+ test_mode_(false),
+ relay_(false),
+ secure_(false),
+ producer_prefix_(),
+ interest_lifetime_(500),
+ relay_name_("c001::abcd/64"),
+ output_stream_mode_(false),
+ port_(0) {}
+
+ Name name;
+ double beta;
+ double drop_factor;
+ double window;
+ std::string producer_certificate;
+ std::string passphrase;
+ std::shared_ptr<utils::MemBuf> receive_buffer;
+ std::size_t receive_buffer_size_;
+ std::size_t download_size;
+ std::uint32_t report_interval_milliseconds_;
+ TransportProtocolAlgorithms transport_protocol_;
+ bool rtc_;
+ bool test_mode_;
+ bool relay_;
+ bool secure_;
+ Prefix producer_prefix_;
+ uint32_t interest_lifetime_;
+ Prefix relay_name_;
+ bool output_stream_mode_;
+ uint16_t port_;
+};
+
+/**
+ * Container for command line configuration for hiperf server.
+ */
+struct ServerConfiguration {
+ ServerConfiguration()
+ : name("b001::abcd/64"),
+ virtual_producer(true),
+ manifest(false),
+ live_production(false),
+ content_lifetime(600000000_U32),
+ download_size(20 * 1024 * 1024),
+ hash_algorithm(CryptoHashType::SHA256),
+ keystore_name(""),
+ passphrase(""),
+ keystore_password("cisco"),
+ multiphase_produce_(false),
+ rtc_(false),
+ interactive_(false),
+ trace_based_(false),
+ trace_index_(0),
+ trace_file_(nullptr),
+ production_rate_(std::string("2048kbps")),
+ payload_size_(1400),
+ secure_(false),
+ input_stream_mode_(false),
+ port_(0) {}
+
+ Prefix name;
+ bool virtual_producer;
+ bool manifest;
+ bool live_production;
+ std::uint32_t content_lifetime;
+ std::uint32_t download_size;
+ CryptoHashType hash_algorithm;
+ std::string keystore_name;
+ std::string passphrase;
+ std::string keystore_password;
+ bool multiphase_produce_;
+ bool rtc_;
+ bool interactive_;
+ bool trace_based_;
+ std::uint32_t trace_index_;
+ char *trace_file_;
+ Rate production_rate_;
+ std::size_t payload_size_;
+ bool secure_;
+ bool input_stream_mode_;
+ uint16_t port_;
+ std::vector<struct packet_t> trace_;
+};
+
+} // namespace hiperf
diff --git a/apps/hiperf/src/forwarder_config.h b/apps/hiperf/src/forwarder_config.h
new file mode 100644
index 000000000..655ac3b66
--- /dev/null
+++ b/apps/hiperf/src/forwarder_config.h
@@ -0,0 +1,97 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+namespace hiperf {
+
+struct ListenerConfig {
+ std::string address;
+ std::uint16_t port;
+ std::string interface;
+ std::string name;
+};
+
+struct ConnectorConfig {
+ std::string local_address;
+ std::uint16_t local_port;
+ std::string remote_address;
+ std::uint16_t remote_port;
+ std::string interface;
+ std::string name;
+};
+
+struct RouteConfig {
+ std::string prefix;
+ uint16_t weight;
+ std::string main_connector;
+ std::string backup_connector;
+ std::string name;
+};
+
+class ForwarderConfiguration {
+ public:
+ ForwarderConfiguration() : n_threads_(1) {}
+
+ bool empty() {
+ return listeners_.empty() && connectors_.empty() && routes_.empty();
+ }
+
+ ForwarderConfiguration &setThreadNumber(std::size_t threads) {
+ n_threads_ = threads;
+ return *this;
+ }
+
+ std::size_t getThreadNumber() { return n_threads_; }
+
+ template <typename... Args>
+ ForwarderConfiguration &addListener(Args &&...args) {
+ listeners_.emplace_back(std::forward<Args>(args)...);
+ return *this;
+ }
+
+ template <typename... Args>
+ ForwarderConfiguration &addConnector(const std::string &name,
+ Args &&...args) {
+ connectors_.emplace(name, std::forward<Args>(args)...);
+ return *this;
+ }
+
+ template <typename... Args>
+ ForwarderConfiguration &addRoute(Args &&...args) {
+ routes_.emplace_back(std::forward<Args>(args)...);
+ return *this;
+ }
+
+ std::vector<ListenerConfig> &getListeners() { return listeners_; }
+
+ std::unordered_map<std::string, ConnectorConfig> &getConnectors() {
+ return connectors_;
+ }
+
+ std::vector<RouteConfig> &getRoutes() { return routes_; }
+
+ private:
+ std::vector<ListenerConfig> listeners_;
+ std::unordered_map<std::string, ConnectorConfig> connectors_;
+ std::vector<RouteConfig> routes_;
+ std::size_t n_threads_;
+};
+
+} \ No newline at end of file
diff --git a/apps/hiperf/src/forwarder_interface.cc b/apps/hiperf/src/forwarder_interface.cc
new file mode 100644
index 000000000..864208239
--- /dev/null
+++ b/apps/hiperf/src/forwarder_interface.cc
@@ -0,0 +1,676 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <arpa/inet.h>
+#include <forwarder_interface.h>
+#include <hicn/transport/utils/log.h>
+
+#include <chrono>
+#include <iostream>
+#include <thread>
+#include <unordered_set>
+
+extern "C" {
+#include <hicn/error.h>
+#include <hicn/util/ip_address.h>
+}
+
+// XXX the main listener should be retrieve in this class at initialization, aka
+// when hICN becomes avialable
+//
+// XXX the main listener port will be retrieved in the forwarder
+// interface... everything else will be delayed until we have this
+// information
+
+namespace hiperf {
+
+ForwarderInterface::ForwarderInterface(asio::io_service &io_service,
+ ICallback *callback)
+ : external_ioservice_(io_service),
+ forwarder_interface_callback_(callback),
+ work_(std::make_unique<asio::io_service::work>(internal_ioservice_)),
+ sock_(nullptr),
+ thread_(std::make_unique<std::thread>([this]() {
+ std::cout << "Starting Forwarder Interface thread" << std::endl;
+ internal_ioservice_.run();
+ std::cout << "Stopping Forwarder Interface thread" << std::endl;
+ })),
+ // set_route_callback_(std::forward<Callback &&>(setRouteCallback)),
+ check_routes_timer_(nullptr),
+ pending_add_route_counter_(0),
+ hicn_listen_port_(9695),
+ /* We start in disabled state even when a forwarder is always available */
+ state_(State::Disabled),
+ timer_(io_service),
+ num_reattempts(0) {
+ std::cout << "Forwarder interface created... connecting to forwarder...\n";
+ internal_ioservice_.post([this]() { onHicnServiceAvailable(true); });
+}
+
+ForwarderInterface::~ForwarderInterface() {
+ if (thread_ && thread_->joinable()) {
+ internal_ioservice_.dispatch([this]() {
+ if (sock_) {
+ hc_sock_free(sock_);
+ sock_ = nullptr;
+ }
+
+ work_.reset();
+ });
+
+ thread_->join();
+ }
+
+ std::cout << "ForwarderInterface::~ForwarderInterface" << std::endl;
+}
+
+void ForwarderInterface::onHicnServiceAvailable(bool flag) {
+ if (flag) {
+ switch (state_) {
+ case State::Disabled:
+ case State::Requested:
+ state_ = State::Available;
+ case State::Available:
+ connectToForwarder();
+ /* Synchronous */
+ if (state_ != State::Connected) {
+ std::cout << "ConnectToForwarder failed" << std::endl;
+ goto REATTEMPT;
+ }
+ state_ = State::Ready;
+
+ std::cout << "Connected to forwarder... cancelling reconnection timer"
+ << std::endl;
+ timer_.cancel();
+ num_reattempts = 0;
+
+ // case State::Connected:
+ // checkListener();
+
+ // if (state_ != State::Ready) {
+ // std::cout << "Listener not found" << std::endl;
+ // goto REATTEMPT;
+ // }
+ // state_ = State::Ready;
+
+ // timer_.cancel();
+ // num_reattempts = 0;
+
+ std::cout << "Forwarder interface is ready... communicate to controller"
+ << std::endl;
+
+ forwarder_interface_callback_->onHicnServiceReady();
+
+ case State::Ready:
+ break;
+ }
+ } else {
+ if (sock_) {
+ hc_sock_free(sock_);
+ sock_ = nullptr;
+ }
+ state_ = State::Disabled; // XXX to be checked upon callback to prevent the
+ // state from going forward (used to manage
+ // concurrency)
+ }
+ return;
+
+REATTEMPT:
+ /* Schedule reattempt */
+ std::cout << "Failed to connect, scheduling reattempt" << std::endl;
+ num_reattempts++;
+
+ timer_.expires_from_now(
+ std::chrono::milliseconds(ForwarderInterface::REATTEMPT_DELAY_MS));
+ // timer_.async_wait(std::bind(&ForwarderInterface::onHicnServiceAvailable,
+ // this, flag, std::placeholders::_1));
+ timer_.async_wait([this, flag](std::error_code ec) {
+ if (ec) return;
+ onHicnServiceAvailable(flag);
+ });
+}
+
+int ForwarderInterface::connectToForwarder() {
+ sock_ = hc_sock_create();
+ if (!sock_) {
+ std::cout << "Could not create socket" << std::endl;
+ goto ERR_SOCK;
+ }
+
+ if (hc_sock_connect(sock_) < 0) {
+ std::cout << "Could not connect to forwarder" << std::endl;
+ goto ERR;
+ }
+
+ std::cout << "Forwarder interface connected" << std::endl;
+ state_ = State::Connected;
+ return 0;
+
+ERR:
+ hc_sock_free(sock_);
+ sock_ = nullptr;
+ERR_SOCK:
+ return -1;
+}
+
+int ForwarderInterface::checkListener() {
+ if (!sock_) return -1;
+
+ hc_data_t *data;
+ if (hc_listener_list(sock_, &data) < 0) return -1;
+
+ int ret = -1;
+ foreach_listener(l, data) {
+ std::string interface = std::string(l->interface_name);
+ if (interface.compare("lo") != 0) {
+ hicn_listen_port_ = l->local_port;
+ state_ = State::Ready;
+ ret = 0;
+ std::cout << "Got listener port" << std::endl;
+ break;
+ }
+ }
+
+ hc_data_free(data);
+ return ret;
+}
+
+void ForwarderInterface::close() {
+ std::cout << "ForwarderInterface::close" << std::endl;
+
+ state_ = State::Disabled;
+ /* Cancelling eventual reattempts */
+ timer_.cancel();
+
+ if (sock_) {
+ hc_sock_free(sock_);
+ sock_ = nullptr;
+ }
+
+ internal_ioservice_.post([this]() { work_.reset(); });
+
+ if (thread_->joinable()) {
+ thread_->join();
+ }
+}
+
+#if 0
+void ForwarderInterface::enableCheckRoutesTimer() {
+ if (check_routes_timer_ != nullptr) return;
+
+ check_routes_timer_ =
+ std::make_unique<asio::steady_timer>(internal_ioservice_);
+ checkRoutesLoop();
+}
+
+void ForwarderInterface::removeConnectedUserNow(ProtocolPtr protocol) {
+ internalRemoveConnectedUser(protocol);
+}
+
+void ForwarderInterface::scheduleRemoveConnectedUser(ProtocolPtr protocol) {
+ internal_ioservice_.post(
+ [this, protocol]() { internalRemoveConnectedUser(protocol); });
+}
+#endif
+
+void ForwarderInterface::createFaceAndRoute(const RouteInfoPtr &route_info) {
+ std::vector<RouteInfoPtr> routes;
+ routes.push_back(std::move(route_info));
+ createFaceAndRoutes(routes);
+}
+
+void ForwarderInterface::createFaceAndRoutes(
+ const std::vector<RouteInfoPtr> &routes_info) {
+ pending_add_route_counter_++;
+ auto timer = new asio::steady_timer(internal_ioservice_);
+ internal_ioservice_.post([this, routes_info, timer]() {
+ internalCreateFaceAndRoutes(routes_info, ForwarderInterface::MAX_REATTEMPT,
+ timer);
+ });
+}
+
+void ForwarderInterface::deleteFaceAndRoute(const RouteInfoPtr &route_info) {
+ std::vector<RouteInfoPtr> routes;
+ routes.push_back(std::move(route_info));
+ deleteFaceAndRoutes(routes);
+}
+
+void ForwarderInterface::deleteFaceAndRoutes(
+ const std::vector<RouteInfoPtr> &routes_info) {
+ internal_ioservice_.post([this, routes_info]() {
+ for (auto &route : routes_info) {
+ internalDeleteFaceAndRoute(route);
+ }
+ });
+}
+
+void ForwarderInterface::internalDeleteFaceAndRoute(
+ const RouteInfoPtr &route_info) {
+ if (!sock_) return;
+
+ hc_data_t *data;
+ if (hc_route_list(sock_, &data) < 0) return;
+
+ std::vector<hc_route_t *> routes_to_remove;
+ foreach_route(r, data) {
+ char remote_addr[INET6_ADDRSTRLEN];
+ int ret = ip_address_ntop(&r->remote_addr, remote_addr, r->len, r->family);
+ if (ret < 0) continue;
+
+ std::string route_addr(remote_addr);
+ if (route_addr.compare(route_info->route_addr) == 0 &&
+ r->len == route_info->route_len) {
+ // route found
+ routes_to_remove.push_back(r);
+ }
+ }
+
+ if (routes_to_remove.size() == 0) {
+ // nothing to do here
+ hc_data_free(data);
+ return;
+ }
+
+ std::unordered_set<uint32_t> connids_to_remove;
+ for (unsigned i = 0; i < routes_to_remove.size(); i++) {
+ connids_to_remove.insert(routes_to_remove[i]->face_id);
+ if (hc_route_delete(sock_, routes_to_remove[i]) < 0) {
+ std::cout << "Error removing route from forwarder." << std::endl;
+ }
+ }
+
+ // remove connection
+ if (hc_connection_list(sock_, &data) < 0) {
+ hc_data_free(data);
+ return;
+ }
+
+ // collects pointerst to the connections using the conn IDs
+ std::vector<hc_connection_t *> conns_to_remove;
+ foreach_connection(c, data) {
+ if (connids_to_remove.find(c->id) != connids_to_remove.end()) {
+ // conn found
+ conns_to_remove.push_back(c);
+ }
+ }
+
+ if (conns_to_remove.size() == 0) {
+ // nothing else to do here
+ hc_data_free(data);
+ return;
+ }
+
+ for (unsigned i = 0; i < conns_to_remove.size(); i++) {
+ if (hc_connection_delete(sock_, conns_to_remove[i]) < 0) {
+ std::cout << "Error removing connection from forwarder." << std::endl;
+ }
+ }
+
+ hc_data_free(data);
+}
+
+void ForwarderInterface::internalCreateFaceAndRoutes(
+ const std::vector<RouteInfoPtr> &route_info, uint8_t max_try,
+ asio::steady_timer *timer) {
+ uint32_t face_id;
+
+ std::vector<RouteInfoPtr> failed;
+ for (auto &route : route_info) {
+ int ret = tryToCreateFace(route.get(), &face_id);
+ if (ret >= 0) {
+ auto ret = tryToCreateRoute(route.get(), face_id);
+ if (ret < 0) {
+ failed.push_back(route);
+ std::cerr << "Error creating route and face" << std::endl;
+ continue;
+ }
+ }
+ }
+
+ if (failed.size() > 0) {
+ if (max_try == 0) {
+ /* All attempts failed */
+ goto RESULT;
+ }
+ max_try--;
+ timer->expires_from_now(std::chrono::milliseconds(500));
+ timer->async_wait([this, failed, max_try, timer](std::error_code ec) {
+ if (ec) return;
+ internalCreateFaceAndRoutes(failed, max_try, timer);
+ });
+ return;
+ }
+
+#if 0
+ // route_status_[protocol] = std::move(route_info);
+ for (size_t i = 0; i < route_info.size(); i++) {
+ route_status_.insert(
+ std::pair<ClientId, RouteInfoPtr>(protocol, std::move(route_info[i])));
+ }
+#endif
+
+RESULT:
+ std::cout << "Face / Route create ok, now calling back protocol" << std::endl;
+ pending_add_route_counter_--;
+ external_ioservice_.post([this, r = std::move(route_info)]() mutable {
+ forwarder_interface_callback_->onRouteConfigured(r);
+ });
+ delete timer;
+}
+
+int ForwarderInterface::tryToCreateFace(RouteInfo *route_info,
+ uint32_t *face_id) {
+ bool found = false;
+
+ // check connection with the forwarder
+ if (!sock_) {
+ std::cout << "[ForwarderInterface::tryToCreateFace] socket error"
+ << std::endl;
+ goto ERR_SOCK;
+ }
+
+ // get listeners list
+ hc_data_t *data;
+ if (hc_listener_list(sock_, &data) < 0) {
+ std::cout << "[ForwarderInterface::tryToCreateFace] cannot list listeners";
+ goto ERR_LIST;
+ }
+
+ char _local_address[128];
+ foreach_listener(l, data) {
+ std::cout << "Processing " << l->interface_name << std::endl;
+ std::string interface = std::string(l->interface_name);
+ int ret = ip_address_ntop(&l->local_addr, _local_address, 128, AF_INET);
+ if (ret < 0) {
+ std::cerr << "Error in ip_address_ntop" << std::endl;
+ goto ERR;
+ }
+
+ std::string local_address = std::string(_local_address);
+ uint16_t local_port = l->local_port;
+
+ if (interface.compare(route_info->interface) == 0 &&
+ local_address.compare(route_info->local_addr) == 0 &&
+ local_port == route_info->local_port) {
+ found = true;
+ break;
+ }
+ }
+
+ std::cout << route_info->remote_addr << std::endl;
+
+ ip_address_t local_address, remote_address;
+ ip_address_pton(route_info->local_addr.c_str(), &local_address);
+ ip_address_pton(route_info->remote_addr.c_str(), &remote_address);
+
+ if (!found) {
+ // Create listener
+ hc_listener_t listener;
+ memset(&listener, 0, sizeof(hc_listener_t));
+
+ std::string name = "l_" + route_info->name;
+ listener.local_addr = local_address;
+ listener.type = CONNECTION_TYPE_UDP;
+ listener.family = AF_INET;
+ listener.local_port = route_info->local_port;
+ strncpy(listener.name, name.c_str(), sizeof(listener.name));
+ strncpy(listener.interface_name, route_info->interface.c_str(),
+ sizeof(listener.interface_name));
+
+ std::cout << "------------> " << route_info->interface << std::endl;
+
+ int ret = hc_listener_create(sock_, &listener);
+
+ if (ret < 0) {
+ std::cerr << "Error creating listener." << std::endl;
+ return -1;
+ } else {
+ std::cout << "Listener " << listener.id << " created." << std::endl;
+ }
+ }
+
+ // Create face
+ hc_face_t face;
+ memset(&face, 0, sizeof(hc_face_t));
+
+ // crate face with the local interest
+ face.face.type = FACE_TYPE_UDP;
+ face.face.family = route_info->family;
+ face.face.local_addr = local_address;
+ face.face.remote_addr = remote_address;
+ face.face.local_port = route_info->local_port;
+ face.face.remote_port = route_info->remote_port;
+
+ if (netdevice_set_name(&face.face.netdevice, route_info->interface.c_str()) <
+ 0) {
+ std::cout << "[ForwarderInterface::tryToCreateFaceAndRoute] "
+ "netdevice_set_name "
+ "("
+ << face.face.netdevice.name << ", "
+ << route_info->interface << ") error" << std::endl;
+ goto ERR;
+ }
+
+ // create face
+ if (hc_face_create(sock_, &face) < 0) {
+ std::cout << "[ForwarderInterface::tryToCreateFace] error creating face";
+ goto ERR;
+ }
+
+ std::cout << "Face created successfully" << std::endl;
+
+ // assing face to the return value
+ *face_id = face.id;
+
+ hc_data_free(data);
+ return 0;
+
+ERR:
+ hc_data_free(data);
+ERR_LIST:
+ERR_SOCK:
+ return -1;
+}
+
+int ForwarderInterface::tryToCreateRoute(RouteInfo *route_info,
+ uint32_t face_id) {
+ std::cout << "Trying to create route" << std::endl;
+
+ // check connection with the forwarder
+ if (!sock_) {
+ std::cout << "[ForwarderInterface::tryToCreateRoute] socket error";
+ return -1;
+ }
+
+ ip_address_t route_ip;
+ hc_route_t route;
+
+ if (ip_address_pton(route_info->route_addr.c_str(), &route_ip) < 0) {
+ std::cout << "[ForwarderInterface::tryToCreateRoute] ip_address_pton error";
+ return -1;
+ }
+
+ route.face_id = face_id;
+ route.family = AF_INET6;
+ route.remote_addr = route_ip;
+ route.len = route_info->route_len;
+ route.cost = 1;
+
+ if (hc_route_create(sock_, &route) < 0) {
+ std::cout << "[ForwarderInterface::tryToCreateRoute] error creating route";
+ return -1;
+ }
+
+ std::cout << "[ForwarderInterface::tryToCreateRoute] OK" << std::endl;
+ return 0;
+}
+
+#if 0 // not used
+void ForwarderInterface::checkRoutesLoop() {
+ check_routes_timer_->expires_from_now(std::chrono::milliseconds(1000));
+ check_routes_timer_->async_wait([this](std::error_code ec) {
+ if (ec) return;
+ if (pending_add_route_counter_ == 0) checkRoutes();
+ });
+}
+
+void ForwarderInterface::checkRoutes() {
+ std::cout << "someone called the checkRoutes function" << std::endl;
+ if (!sock_) return;
+
+ hc_data_t *data;
+ if (hc_route_list(sock_, &data) < 0) {
+ return;
+ }
+
+ std::unordered_set<std::string> routes_set;
+ foreach_route(r, data) {
+ char remote_addr[INET6_ADDRSTRLEN];
+ int ret = ip_address_ntop(&r->remote_addr, remote_addr, r->len, r->family);
+ if (ret < 0) continue;
+ std::string route(std::string(remote_addr) + "/" + std::to_string(r->len));
+ routes_set.insert(route);
+ }
+
+ for (auto it = route_status_.begin(); it != route_status_.end(); it++) {
+ std::string route(it->second->route_addr + "/" +
+ std::to_string(it->second->route_len));
+ if (routes_set.find(route) == routes_set.end()) {
+ // the route is missing
+ createFaceAndRoute(it->second, it->first);
+ break;
+ }
+ }
+
+ hc_data_free(data);
+}
+#endif
+
+#if 0
+ using ListenerRetrievedCallback =
+ std::function<void(std::error_code, uint32_t)>;
+
+ ListenerRetrievedCallback listener_retrieved_callback_;
+
+#ifdef __ANDROID__
+ hicn_listen_port_(9695),
+#else
+ hicn_listen_port_(0),
+#endif
+ timer_(forward_engine_.getIoService()),
+
+ void initConfigurationProtocol(void)
+ {
+ // We need the configuration, which is different for every protocol...
+ // so we move this step down towards the protocol implementation itself.
+ if (!permanent_hicn) {
+ doInitConfigurationProtocol();
+ } else {
+ // XXX This should be moved somewhere else
+ getMainListener(
+ [this](std::error_code ec, uint32_t hicn_listen_port) {
+ if (!ec)
+ {
+ hicn_listen_port_ = hicn_listen_port;
+ doInitConfigurationProtocol();
+ }
+ });
+ }
+ }
+
+ template <typename Callback>
+ void getMainListener(Callback &&callback)
+ {
+ listener_retrieved_callback_ = std::forward<Callback &&>(callback);
+ tryToConnectToForwarder();
+ }
+ private:
+ void doGetMainListener(std::error_code ec)
+ {
+ if (!ec)
+ {
+ // ec == 0 --> timer expired
+ int ret = forwarder_interface_.getMainListenerPort();
+ if (ret <= 0)
+ {
+ // Since without the main listener of the forwarder the proxy cannot
+ // work, we can stop the program here until we get the listener port.
+ std::cout <<
+ "Could not retrieve main listener port from the forwarder. "
+ "Retrying.";
+
+ timer_.expires_from_now(std::chrono::milliseconds(RETRY_INTERVAL));
+ timer_.async_wait(std::bind(&Protocol::doGetMainListener, this,
+ std::placeholders::_1));
+ }
+ else
+ {
+ timer_.cancel();
+ retx_count_ = 0;
+ hicn_listen_port_ = uint16_t(ret);
+ listener_retrieved_callback_(
+ make_error_code(configuration_error::success), hicn_listen_port_);
+ }
+ }
+ else
+ {
+ std::cout << "Timer for retrieving main hicn listener canceled." << std::endl;
+ }
+ }
+
+ void tryToConnectToForwarder()
+ {
+ doTryToConnectToForwarder(std::make_error_code(std::errc(0)));
+ }
+
+ void doTryToConnectToForwarder(std::error_code ec)
+ {
+ if (!ec)
+ {
+ // ec == 0 --> timer expired
+ int ret = forwarder_interface_.connect();
+ if (ret < 0)
+ {
+ // We were not able to connect to the local forwarder. Do not give up
+ // and retry.
+ std::cout << "Could not connect to local forwarder. Retrying." << std::endl;
+
+ timer_.expires_from_now(std::chrono::milliseconds(RETRY_INTERVAL));
+ timer_.async_wait(std::bind(&Protocol::doTryToConnectToForwarder, this,
+ std::placeholders::_1));
+ }
+ else
+ {
+ timer_.cancel();
+ retx_count_ = 0;
+ doGetMainListener(std::make_error_code(std::errc(0)));
+ }
+ }
+ else
+ {
+ std::cout << "Timer for re-trying forwarder connection canceled." << std::endl;
+ }
+ }
+
+
+ template <typename ProtocolImplementation>
+ constexpr uint32_t Protocol<ProtocolImplementation>::RETRY_INTERVAL;
+
+#endif
+
+constexpr uint32_t ForwarderInterface::REATTEMPT_DELAY_MS;
+constexpr uint32_t ForwarderInterface::MAX_REATTEMPT;
+
+} // namespace hiperf \ No newline at end of file
diff --git a/apps/hiperf/src/forwarder_interface.h b/apps/hiperf/src/forwarder_interface.h
new file mode 100644
index 000000000..7591ea257
--- /dev/null
+++ b/apps/hiperf/src/forwarder_interface.h
@@ -0,0 +1,131 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+extern "C" {
+#ifndef WITH_POLICY
+#define WITH_POLICY
+#endif
+#include <hicn/ctrl/api.h>
+#include <hicn/util/ip_address.h>
+}
+
+#ifndef ASIO_STANDALONE
+#define ASIO_STANDALONE
+#endif
+#include <asio.hpp>
+
+#include <functional>
+#include <thread>
+#include <unordered_map>
+
+namespace hiperf {
+
+class ForwarderInterface {
+ static const uint32_t REATTEMPT_DELAY_MS = 500;
+ static const uint32_t MAX_REATTEMPT = 10;
+
+ public:
+ struct RouteInfo {
+ int family;
+ std::string local_addr;
+ uint16_t local_port;
+ std::string remote_addr;
+ uint16_t remote_port;
+ std::string route_addr;
+ uint8_t route_len;
+ std::string interface;
+ std::string name;
+ };
+
+ using RouteInfoPtr = std::shared_ptr<RouteInfo>;
+
+ class ICallback {
+ public:
+ virtual void onHicnServiceReady() = 0;
+ virtual void onRouteConfigured(std::vector<RouteInfoPtr> &route_info) = 0;
+ };
+
+ enum class State {
+ Disabled, /* Stack is stopped */
+ Requested, /* Stack is starting */
+ Available, /* Forwarder is running */
+ Connected, /* Control socket connected */
+ Ready, /* Listener present */
+ };
+
+ public:
+ ForwarderInterface(asio::io_service &io_service, ICallback *callback);
+
+ ~ForwarderInterface();
+
+ State getState();
+
+ void setState(State state);
+
+ void onHicnServiceAvailable(bool flag);
+
+ void enableCheckRoutesTimer();
+
+ void createFaceAndRoutes(const std::vector<RouteInfoPtr> &routes_info);
+
+ void createFaceAndRoute(const RouteInfoPtr &route_info);
+
+ void deleteFaceAndRoutes(const std::vector<RouteInfoPtr> &routes_info);
+
+ void deleteFaceAndRoute(const RouteInfoPtr &route_info);
+
+ void close();
+
+ uint16_t getHicnListenerPort() { return hicn_listen_port_; }
+
+ private:
+ int connectToForwarder();
+
+ int checkListener();
+
+ void internalCreateFaceAndRoutes(const std::vector<RouteInfoPtr> &route_info,
+ uint8_t max_try, asio::steady_timer *timer);
+
+ void internalDeleteFaceAndRoute(const RouteInfoPtr &routes_info);
+
+ int tryToCreateFace(RouteInfo *RouteInfo, uint32_t *face_id);
+ int tryToCreateRoute(RouteInfo *RouteInfo, uint32_t face_id);
+
+ void checkRoutesLoop();
+
+ void checkRoutes();
+
+ asio::io_service &external_ioservice_;
+ asio::io_service internal_ioservice_;
+ ICallback *forwarder_interface_callback_;
+ std::unique_ptr<asio::io_service::work> work_;
+ hc_sock_t *sock_;
+ std::unique_ptr<std::thread> thread_;
+ // SetRouteCallback set_route_callback_;
+ // std::unordered_multimap<ProtocolPtr, RouteInfoPtr> route_status_;
+ std::unique_ptr<asio::steady_timer> check_routes_timer_;
+ uint32_t pending_add_route_counter_;
+ uint16_t hicn_listen_port_;
+
+ State state_;
+
+ /* Reattempt timer */
+ asio::steady_timer timer_;
+ unsigned num_reattempts;
+};
+
+} // namespace hiperf
diff --git a/apps/hiperf/src/main.cc b/apps/hiperf/src/main.cc
new file mode 100644
index 000000000..b2d99c4a4
--- /dev/null
+++ b/apps/hiperf/src/main.cc
@@ -0,0 +1,456 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <client.h>
+#include <server.h>
+#include <forwarder_interface.h>
+
+namespace hiperf {
+
+void usage() {
+ std::cerr << "HIPERF - A tool for performing network throughput "
+ "measurements with hICN"
+ << std::endl;
+ std::cerr << "usage: hiperf [-S|-C] [options] [prefix|name]" << std::endl;
+ std::cerr << std::endl;
+ std::cerr << "SERVER OR CLIENT:" << std::endl;
+#ifndef _WIN32
+ std::cerr << "-D\t\t\t\t\t"
+ << "Run as a daemon" << std::endl;
+ std::cerr << "-R\t\t\t\t\t"
+ << "Run RTC protocol (client or server)" << std::endl;
+ std::cerr << "-f\t<filename>\t\t\t"
+ << "Log file" << std::endl;
+ std::cerr << "-z\t<io_module>\t\t\t"
+ << "IO module to use. Default: hicnlight_module" << std::endl;
+#endif
+ std::cerr << std::endl;
+ std::cerr << "SERVER SPECIFIC:" << std::endl;
+ std::cerr << "-A\t<content_size>\t\t\t"
+ "Size of the content to publish. This "
+ "is not the size of the packet (see -s for it)."
+ << std::endl;
+ std::cerr << "-s\t<packet_size>\t\t\tSize of the payload of each data packet."
+ << std::endl;
+ std::cerr << "-r\t\t\t\t\t"
+ << "Produce real content of <content_size> bytes" << std::endl;
+ std::cerr << "-m\t\t\t\t\t"
+ << "Produce transport manifest" << std::endl;
+ std::cerr << "-l\t\t\t\t\t"
+ << "Start producing content upon the reception of the "
+ "first interest"
+ << std::endl;
+ std::cerr << "-K\t<keystore_path>\t\t\t"
+ << "Path of p12 file containing the "
+ "crypto material used for signing packets"
+ << std::endl;
+ std::cerr << "-k\t<passphrase>\t\t\t"
+ << "String from which a 128-bit symmetric key will be "
+ "derived for signing packets"
+ << std::endl;
+ std::cerr << "-y\t<hash_algorithm>\t\t"
+ << "Use the selected hash algorithm for "
+ "calculating manifest digests"
+ << std::endl;
+ std::cerr << "-p\t<password>\t\t\t"
+ << "Password for p12 keystore" << std::endl;
+ std::cerr << "-x\t\t\t\t\t"
+ << "Produce a content of <content_size>, then after downloading "
+ "it produce a new content of"
+ << "\n\t\t\t\t\t<content_size> without resetting "
+ "the suffix to 0."
+ << std::endl;
+ std::cerr << "-B\t<bitrate>\t\t\t"
+ << "Bitrate for RTC producer, to be used with the -R option."
+ << std::endl;
+#ifndef _WIN32
+ std::cerr << "-I\t\t\t\t\t"
+ "Interactive mode, start/stop real time content production "
+ "by pressing return. To be used with the -R option"
+ << std::endl;
+ std::cerr
+ << "-T\t<filename>\t\t\t"
+ "Trace based mode, hiperf takes as input a file with a trace. "
+ "Each line of the file indicates the timestamp and the size of "
+ "the packet to generate. To be used with the -R option. -B and -I "
+ "will be ignored."
+ << std::endl;
+ std::cerr << "-E\t\t\t\t\t"
+ << "Enable encrypted communication. Requires the path to a p12 "
+ "file containing the "
+ "crypto material used for the TLS handshake"
+ << std::endl;
+ std::cerr << "-G\t<port>\t\t\t"
+ << "input stream from localhost at the specified port" << std::endl;
+#endif
+ std::cerr << std::endl;
+ std::cerr << "CLIENT SPECIFIC:" << std::endl;
+ std::cerr << "-b\t<beta_parameter>\t\t"
+ << "RAAQM beta parameter" << std::endl;
+ std::cerr << "-d\t<drop_factor_parameter>\t\t"
+ << "RAAQM drop factor "
+ "parameter"
+ << std::endl;
+ std::cerr << "-L\t<interest lifetime>\t\t"
+ << "Set interest lifetime." << std::endl;
+ std::cerr << "-M\t<input_buffer_size>\t\t"
+ << "Size of consumer input buffer. If 0, reassembly of packets "
+ "will be disabled."
+ << std::endl;
+ std::cerr << "-W\t<window_size>\t\t\t"
+ << "Use a fixed congestion window "
+ "for retrieving the data."
+ << std::endl;
+ std::cerr << "-i\t<stats_interval>\t\t"
+ << "Show the statistics every <stats_interval> milliseconds."
+ << std::endl;
+ std::cerr << "-c\t<certificate_path>\t\t"
+ << "Path of the producer certificate to be used for verifying the "
+ "origin of the packets received."
+ << std::endl;
+ std::cerr << "-k\t<passphrase>\t\t\t"
+ << "String from which is derived the symmetric key used by the "
+ "producer to sign packets and by the consumer to verify them."
+ << std::endl;
+ std::cerr << "-t\t\t\t\t\t"
+ "Test mode, check if the client is receiving the "
+ "correct data. This is an RTC specific option, to be "
+ "used with the -R (default false)"
+ << std::endl;
+ std::cerr << "-P\t\t\t\t\t"
+ << "Prefix of the producer where to do the handshake" << std::endl;
+ std::cerr << "-j\t<relay_name>\t\t\t"
+ << "Publish the received content under the name relay_name."
+ "This is an RTC specific option, to be "
+ "used with the -R (default false)"
+ << std::endl;
+ std::cerr << "-g\t<port>\t\t\t"
+ << "output stream to localhost at the specified port" << std::endl;
+}
+
+int main(int argc, char *argv[]) {
+#ifndef _WIN32
+ // Common
+ bool daemon = false;
+#else
+ WSADATA wsaData = {0};
+ WSAStartup(MAKEWORD(2, 2), &wsaData);
+#endif
+
+ // -1 server, 0 undefined, 1 client
+ int role = 0;
+ int options = 0;
+
+ char *log_file = nullptr;
+ transport::interface::global_config::IoModuleConfiguration config;
+ std::string conf_file;
+ config.name = "hicnlight_module";
+
+ // Consumer
+ ClientConfiguration client_configuration;
+
+ // Producer
+ ServerConfiguration server_configuration;
+
+ int opt;
+#ifndef _WIN32
+ while ((opt = getopt(
+ argc, argv,
+ "DSCf:b:d:W:RM:c:vA:s:rmlK:k:y:p:hi:xE:P:B:ItL:z:T:F:j:g:G:")) !=
+ -1) {
+ switch (opt) {
+ // Common
+ case 'D': {
+ daemon = true;
+ break;
+ }
+ case 'I': {
+ server_configuration.interactive_ = true;
+ server_configuration.trace_based_ = false;
+ server_configuration.input_stream_mode_ = false;
+ break;
+ }
+ case 'T': {
+ server_configuration.interactive_ = false;
+ server_configuration.trace_based_ = true;
+ server_configuration.input_stream_mode_ = false;
+ server_configuration.trace_file_ = optarg;
+ break;
+ }
+ case 'G': {
+ server_configuration.interactive_ = false;
+ server_configuration.trace_based_ = false;
+ server_configuration.input_stream_mode_ = true;
+ server_configuration.port_ = std::stoul(optarg);
+ break;
+ }
+ case 'g': {
+ client_configuration.output_stream_mode_ = true;
+ client_configuration.port_ = std::stoul(optarg);
+ break;
+ }
+#else
+ while ((opt = getopt(argc, argv,
+ "SCf:b:d:W:RM:c:vA:s:rmlK:k:y:p:hi:xB:E:P:tL:z:F:j:")) !=
+ -1) {
+ switch (opt) {
+#endif
+ case 'f': {
+ log_file = optarg;
+ break;
+ }
+ case 'R': {
+ client_configuration.rtc_ = true;
+ server_configuration.rtc_ = true;
+ break;
+ }
+ case 'z': {
+ config.name = optarg;
+ break;
+ }
+ case 'F': {
+ conf_file = optarg;
+ break;
+ }
+
+ // Server or Client
+ case 'S': {
+ role -= 1;
+ break;
+ }
+ case 'C': {
+ role += 1;
+ break;
+ }
+ case 'k': {
+ server_configuration.passphrase = std::string(optarg);
+ client_configuration.passphrase = std::string(optarg);
+ break;
+ }
+
+ // Client specifc
+ case 'b': {
+ client_configuration.beta = std::stod(optarg);
+ options = 1;
+ break;
+ }
+ case 'd': {
+ client_configuration.drop_factor = std::stod(optarg);
+ options = 1;
+ break;
+ }
+ case 'W': {
+ client_configuration.window = std::stod(optarg);
+ options = 1;
+ break;
+ }
+ case 'M': {
+ client_configuration.receive_buffer_size_ = std::stoull(optarg);
+ options = 1;
+ break;
+ }
+ case 'P': {
+ client_configuration.producer_prefix_ = Prefix(optarg);
+ client_configuration.secure_ = true;
+ break;
+ }
+ case 'c': {
+ client_configuration.producer_certificate = std::string(optarg);
+ options = 1;
+ break;
+ }
+ case 'i': {
+ client_configuration.report_interval_milliseconds_ = std::stoul(optarg);
+ options = 1;
+ break;
+ }
+ case 't': {
+ client_configuration.test_mode_ = true;
+ options = 1;
+ break;
+ }
+ case 'L': {
+ client_configuration.interest_lifetime_ = std::stoul(optarg);
+ options = 1;
+ break;
+ }
+ case 'j': {
+ client_configuration.relay_ = true;
+ client_configuration.relay_name_ = Prefix(optarg);
+ options = 1;
+ break;
+ }
+ // Server specific
+ case 'A': {
+ server_configuration.download_size = std::stoul(optarg);
+ options = -1;
+ break;
+ }
+ case 's': {
+ server_configuration.payload_size_ = std::stoul(optarg);
+ options = -1;
+ break;
+ }
+ case 'r': {
+ server_configuration.virtual_producer = false;
+ options = -1;
+ break;
+ }
+ case 'm': {
+ server_configuration.manifest = true;
+ options = -1;
+ break;
+ }
+ case 'l': {
+ server_configuration.live_production = true;
+ options = -1;
+ break;
+ }
+ case 'K': {
+ server_configuration.keystore_name = std::string(optarg);
+ options = -1;
+ break;
+ }
+ case 'y': {
+ if (strncasecmp(optarg, "sha256", 6) == 0) {
+ server_configuration.hash_algorithm = CryptoHashType::SHA256;
+ } else if (strncasecmp(optarg, "sha512", 6) == 0) {
+ server_configuration.hash_algorithm = CryptoHashType::SHA512;
+ } else if (strncasecmp(optarg, "blake2b512", 10) == 0) {
+ server_configuration.hash_algorithm = CryptoHashType::BLAKE2B512;
+ } else if (strncasecmp(optarg, "blake2s256", 10) == 0) {
+ server_configuration.hash_algorithm = CryptoHashType::BLAKE2S256;
+ } else {
+ std::cerr << "Ignored unknown hash algorithm. Using SHA 256."
+ << std::endl;
+ }
+ options = -1;
+ break;
+ }
+ case 'p': {
+ server_configuration.keystore_password = std::string(optarg);
+ options = -1;
+ break;
+ }
+ case 'x': {
+ server_configuration.multiphase_produce_ = true;
+ options = -1;
+ break;
+ }
+ case 'B': {
+ auto str = std::string(optarg);
+ std::transform(str.begin(), str.end(), str.begin(), ::tolower);
+ server_configuration.production_rate_ = str;
+ options = -1;
+ break;
+ }
+ case 'E': {
+ server_configuration.keystore_name = std::string(optarg);
+ server_configuration.secure_ = true;
+ break;
+ }
+ case 'h':
+ default:
+ usage();
+ return EXIT_FAILURE;
+ }
+ }
+
+ if (options > 0 && role < 0) {
+ std::cerr << "Client options cannot be used when using the "
+ "software in server mode"
+ << std::endl;
+ usage();
+ return EXIT_FAILURE;
+ } else if (options < 0 && role > 0) {
+ std::cerr << "Server options cannot be used when using the "
+ "software in client mode"
+ << std::endl;
+ usage();
+ return EXIT_FAILURE;
+ } else if (!role) {
+ std::cerr << "Please specify if running hiperf as client "
+ "or server."
+ << std::endl;
+ usage();
+ return EXIT_FAILURE;
+ }
+
+ if (argv[optind] == 0) {
+ std::cerr << "Please specify the name/prefix to use." << std::endl;
+ usage();
+ return EXIT_FAILURE;
+ } else {
+ if (role > 0) {
+ client_configuration.name = Name(argv[optind]);
+ } else {
+ server_configuration.name = Prefix(argv[optind]);
+ }
+ }
+
+ if (log_file) {
+#ifndef _WIN32
+ int fd = open(log_file, O_WRONLY | O_APPEND | O_CREAT, S_IWUSR | S_IRUSR);
+ dup2(fd, STDOUT_FILENO);
+ dup2(STDOUT_FILENO, STDERR_FILENO);
+ close(fd);
+#else
+ int fd =
+ _open(log_file, _O_WRONLY | _O_APPEND | _O_CREAT, _S_IWRITE | _S_IREAD);
+ _dup2(fd, _fileno(stdout));
+ _dup2(_fileno(stdout), _fileno(stderr));
+ _close(fd);
+#endif
+ }
+
+#ifndef _WIN32
+ if (daemon) {
+ utils::Daemonizator::daemonize(false);
+ }
+#endif
+
+ /**
+ * IO module configuration
+ */
+ config.set();
+
+ // Parse config file
+ transport::interface::global_config::parseConfigurationFile(conf_file);
+
+ if (role > 0) {
+ HIperfClient c(client_configuration);
+ if (c.setup() != ERROR_SETUP) {
+ c.run();
+ }
+ } else if (role < 0) {
+ HIperfServer s(server_configuration);
+ if (s.setup() != ERROR_SETUP) {
+ s.run();
+ }
+ } else {
+ usage();
+ return EXIT_FAILURE;
+ }
+
+#ifdef _WIN32
+ WSACleanup();
+#endif
+
+ return 0;
+}
+
+} // namespace hiperf
+
+int main(int argc, char *argv[]) { return hiperf::main(argc, argv); }
diff --git a/apps/hiperf/src/server.cc b/apps/hiperf/src/server.cc
new file mode 100644
index 000000000..968d42e2c
--- /dev/null
+++ b/apps/hiperf/src/server.cc
@@ -0,0 +1,516 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <server.h>
+
+namespace hiperf {
+
+/**
+ * Hiperf server class: configure and setup an hicn producer following the
+ * ServerConfiguration.
+ */
+class HIperfServer::Impl {
+ const std::size_t log2_content_object_buffer_size = 8;
+
+ public:
+ Impl(const hiperf::ServerConfiguration &conf)
+ : configuration_(conf),
+ signals_(io_service_),
+ rtc_timer_(io_service_),
+ unsatisfied_interests_(),
+ content_objects_((std::uint16_t)(1 << log2_content_object_buffer_size)),
+ content_objects_index_(0),
+ mask_((std::uint16_t)(1 << log2_content_object_buffer_size) - 1),
+ last_segment_(0),
+#ifndef _WIN32
+ ptr_last_segment_(&last_segment_),
+ input_(io_service_),
+ rtc_running_(false),
+#else
+ ptr_last_segment_(&last_segment_),
+#endif
+ flow_name_(configuration_.name.getName()),
+ socket_(io_service_),
+ recv_buffer_(nullptr, 0) {
+ std::string buffer(configuration_.payload_size_, 'X');
+ std::cout << "Producing contents under name " << conf.name.getName()
+ << std::endl;
+#ifndef _WIN32
+ if (configuration_.interactive_) {
+ input_.assign(::dup(STDIN_FILENO));
+ }
+#endif
+
+ for (int i = 0; i < (1 << log2_content_object_buffer_size); i++) {
+ content_objects_[i] = ContentObject::Ptr(
+ new ContentObject(conf.name.getName(), HF_INET6_TCP, 0,
+ (const uint8_t *)buffer.data(), buffer.size()));
+ content_objects_[i]->setLifetime(
+ default_values::content_object_expiry_time);
+ }
+ }
+
+ void virtualProcessInterest(ProducerSocket &p, const Interest &interest) {
+ content_objects_[content_objects_index_ & mask_]->setName(
+ interest.getName());
+ producer_socket_->produce(
+ *content_objects_[content_objects_index_++ & mask_]);
+ }
+
+ void processInterest(ProducerSocket &p, const Interest &interest) {
+ p.setSocketOption(ProducerCallbacksOptions::CACHE_MISS,
+ (ProducerInterestCallback)VOID_HANDLER);
+ p.setSocketOption(GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME,
+ 5000000_U32);
+
+ produceContent(p, interest.getName(), interest.getName().getSuffix());
+ std::cout << "Received interest " << interest.getName().getSuffix()
+ << std::endl;
+ }
+
+ void asyncProcessInterest(ProducerSocket &p, const Interest &interest) {
+ p.setSocketOption(ProducerCallbacksOptions::CACHE_MISS,
+ (ProducerInterestCallback)bind(&Impl::cacheMiss, this,
+ std::placeholders::_1,
+ std::placeholders::_2));
+ p.setSocketOption(GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME,
+ 5000000_U32);
+ uint32_t suffix = interest.getName().getSuffix();
+
+ if (suffix == 0) {
+ last_segment_ = 0;
+ ptr_last_segment_ = &last_segment_;
+ unsatisfied_interests_.clear();
+ }
+
+ // The suffix will either be the one from the received interest or the
+ // smallest suffix of a previous interest not satisfed
+ if (!unsatisfied_interests_.empty()) {
+ auto it =
+ std::lower_bound(unsatisfied_interests_.begin(),
+ unsatisfied_interests_.end(), *ptr_last_segment_);
+ if (it != unsatisfied_interests_.end()) {
+ suffix = *it;
+ }
+ unsatisfied_interests_.erase(unsatisfied_interests_.begin(), it);
+ }
+
+ std::cout << "Received interest " << interest.getName().getSuffix()
+ << ", starting production at " << suffix << std::endl;
+ std::cout << unsatisfied_interests_.size() << " interests still unsatisfied"
+ << std::endl;
+ produceContentAsync(p, interest.getName(), suffix);
+ }
+
+ void produceContent(ProducerSocket &p, const Name &content_name,
+ uint32_t suffix) {
+ auto b = utils::MemBuf::create(configuration_.download_size);
+ std::memset(b->writableData(), '?', configuration_.download_size);
+ b->append(configuration_.download_size);
+ uint32_t total;
+
+ utils::TimePoint t0 = utils::SteadyClock::now();
+ total = p.produceStream(content_name, std::move(b),
+ !configuration_.multiphase_produce_, suffix);
+ utils::TimePoint t1 = utils::SteadyClock::now();
+
+ std::cout
+ << "Written " << total
+ << " data packets in output buffer (Segmentation time: "
+ << std::chrono::duration_cast<utils::Microseconds>(t1 - t0).count()
+ << " us)" << std::endl;
+ }
+
+ void produceContentAsync(ProducerSocket &p, Name content_name,
+ uint32_t suffix) {
+ auto b = utils::MemBuf::create(configuration_.download_size);
+ std::memset(b->writableData(), '?', configuration_.download_size);
+ b->append(configuration_.download_size);
+
+ p.asyncProduce(content_name, std::move(b),
+ !configuration_.multiphase_produce_, suffix,
+ &ptr_last_segment_);
+ }
+
+ void cacheMiss(ProducerSocket &p, const Interest &interest) {
+ unsatisfied_interests_.push_back(interest.getName().getSuffix());
+ }
+
+ void onContentProduced(ProducerSocket &p, const std::error_code &err,
+ uint64_t bytes_written) {
+ p.setSocketOption(ProducerCallbacksOptions::CACHE_MISS,
+ (ProducerInterestCallback)bind(
+ &Impl::asyncProcessInterest, this,
+ std::placeholders::_1, std::placeholders::_2));
+ }
+
+ std::shared_ptr<Identity> getProducerIdentity(std::string &keystore_path,
+ std::string &keystore_pwd,
+ CryptoHashType &hash_type) {
+ if (access(keystore_path.c_str(), F_OK) != -1) {
+ return std::make_shared<Identity>(keystore_path, keystore_pwd, hash_type);
+ }
+ return std::make_shared<Identity>(keystore_path, keystore_pwd,
+ CryptoSuite::RSA_SHA256, 1024, 365,
+ "producer-test");
+ }
+
+ int setup() {
+ int ret;
+ int production_protocol;
+
+ if (configuration_.secure_) {
+ auto identity = getProducerIdentity(configuration_.keystore_name,
+ configuration_.keystore_password,
+ configuration_.hash_algorithm);
+ producer_socket_ = std::make_unique<P2PSecureProducerSocket>(
+ configuration_.rtc_, identity);
+ } else {
+ if (!configuration_.rtc_) {
+ production_protocol = ProductionProtocolAlgorithms::BYTE_STREAM;
+ } else {
+ production_protocol = ProductionProtocolAlgorithms::RTC_PROD;
+ }
+
+ producer_socket_ = std::make_unique<ProducerSocket>(production_protocol);
+ }
+
+ if (producer_socket_->setSocketOption(
+ GeneralTransportOptions::MAKE_MANIFEST, configuration_.manifest) ==
+ SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+
+ if (!configuration_.passphrase.empty()) {
+ std::shared_ptr<Signer> signer = std::make_shared<SymmetricSigner>(
+ CryptoSuite::HMAC_SHA256, configuration_.passphrase);
+ producer_socket_->setSocketOption(GeneralTransportOptions::SIGNER,
+ signer);
+ }
+
+ if (!configuration_.keystore_name.empty()) {
+ auto identity = getProducerIdentity(configuration_.keystore_name,
+ configuration_.keystore_password,
+ configuration_.hash_algorithm);
+ std::shared_ptr<Signer> signer = identity->getSigner();
+ producer_socket_->setSocketOption(GeneralTransportOptions::SIGNER,
+ signer);
+ }
+
+ uint32_t rtc_header_size = 0;
+ if (configuration_.rtc_) rtc_header_size = 12;
+ producer_socket_->setSocketOption(
+ GeneralTransportOptions::DATA_PACKET_SIZE,
+ (uint32_t)(
+ configuration_.payload_size_ + rtc_header_size +
+ (configuration_.name.getAddressFamily() == AF_INET ? 40 : 60)));
+ producer_socket_->registerPrefix(configuration_.name);
+ producer_socket_->connect();
+
+ if (configuration_.rtc_) {
+ std::cout << "Running RTC producer: the prefix length will be ignored."
+ " Use /128 by default in RTC mode"
+ << std::endl;
+ return ERROR_SUCCESS;
+ }
+
+ if (!configuration_.virtual_producer) {
+ if (producer_socket_->setSocketOption(
+ GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME,
+ configuration_.content_lifetime) == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+
+ if (producer_socket_->setSocketOption(
+ GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 200000U) ==
+ SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+
+ if (!configuration_.live_production) {
+ produceContent(*producer_socket_, configuration_.name.getName(), 0);
+ } else {
+ ret = producer_socket_->setSocketOption(
+ ProducerCallbacksOptions::CACHE_MISS,
+ (ProducerInterestCallback)bind(&Impl::asyncProcessInterest, this,
+ std::placeholders::_1,
+ std::placeholders::_2));
+
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+ }
+ } else {
+ ret = producer_socket_->setSocketOption(
+ GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 0U);
+
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+
+ ret = producer_socket_->setSocketOption(
+ ProducerCallbacksOptions::CACHE_MISS,
+ (ProducerInterestCallback)bind(&Impl::virtualProcessInterest, this,
+ std::placeholders::_1,
+ std::placeholders::_2));
+
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+ }
+
+ ret = producer_socket_->setSocketOption(
+ ProducerCallbacksOptions::CONTENT_PRODUCED,
+ (ProducerContentCallback)bind(
+ &Impl::onContentProduced, this, std::placeholders::_1,
+ std::placeholders::_2, std::placeholders::_3));
+
+ return ERROR_SUCCESS;
+ }
+
+ void receiveStream() {
+ socket_.async_receive_from(
+ asio::buffer(recv_buffer_.first, recv_buffer_.second), remote_,
+ [this](std::error_code ec, std::size_t length) {
+ if (ec) return;
+ sendRTCContentFromStream(recv_buffer_.first, length);
+ receiveStream();
+ });
+ }
+
+ void sendRTCContentFromStream(uint8_t *buff, std::size_t len) {
+ auto payload =
+ content_objects_[content_objects_index_++ & mask_]->getPayload();
+ // this is used to compute the data packet delay
+ // Used only for performance evaluation
+ // It requires clock synchronization between producer and consumer
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::system_clock::now().time_since_epoch())
+ .count();
+ uint8_t *start = (uint8_t *)payload->writableData();
+ std::memcpy(start, &now, sizeof(uint64_t));
+ std::memcpy(start + sizeof(uint64_t), buff, len);
+ producer_socket_->produceDatagram(flow_name_, start,
+ len + sizeof(uint64_t));
+ }
+
+ void sendRTCContentObjectCallback(std::error_code ec) {
+ if (ec) return;
+ rtc_timer_.expires_from_now(
+ configuration_.production_rate_.getMicrosecondsForPacket(
+ configuration_.payload_size_));
+ rtc_timer_.async_wait(std::bind(&Impl::sendRTCContentObjectCallback, this,
+ std::placeholders::_1));
+ auto payload =
+ content_objects_[content_objects_index_++ & mask_]->getPayload();
+
+ // this is used to compute the data packet delay
+ // Used only for performance evaluation
+ // It requires clock synchronization between producer and consumer
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::system_clock::now().time_since_epoch())
+ .count();
+
+ std::memcpy(payload->writableData(), &now, sizeof(uint64_t));
+
+ producer_socket_->produceDatagram(
+ flow_name_, payload->data(),
+ payload->length() < 1400 ? payload->length() : 1400);
+ }
+
+ void sendRTCContentObjectCallbackWithTrace(std::error_code ec) {
+ if (ec) return;
+
+ auto payload =
+ content_objects_[content_objects_index_++ & mask_]->getPayload();
+
+ uint32_t packet_len =
+ configuration_.trace_[configuration_.trace_index_].size;
+
+ // this is used to compute the data packet delay
+ // used only for performance evaluation
+ // it requires clock synchronization between producer and consumer
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::system_clock::now().time_since_epoch())
+ .count();
+
+ std::memcpy(payload->writableData(), &now, sizeof(uint64_t));
+
+ if (packet_len > payload->length()) packet_len = payload->length();
+ if (packet_len > 1400) packet_len = 1400;
+
+ producer_socket_->produceDatagram(flow_name_, payload->data(), packet_len);
+
+ uint32_t next_index = configuration_.trace_index_ + 1;
+ uint64_t schedule_next;
+ if (next_index < configuration_.trace_.size()) {
+ schedule_next =
+ configuration_.trace_[next_index].timestamp -
+ configuration_.trace_[configuration_.trace_index_].timestamp;
+ } else {
+ // here we need to loop, schedule in a random time
+ schedule_next = 1000;
+ }
+
+ configuration_.trace_index_ =
+ (configuration_.trace_index_ + 1) % configuration_.trace_.size();
+ rtc_timer_.expires_from_now(std::chrono::microseconds(schedule_next));
+ rtc_timer_.async_wait(
+ std::bind(&Impl::sendRTCContentObjectCallbackWithTrace, this,
+ std::placeholders::_1));
+ }
+
+#ifndef _WIN32
+ void handleInput(const std::error_code &error, std::size_t length) {
+ if (error) {
+ producer_socket_->stop();
+ io_service_.stop();
+ }
+
+ if (rtc_running_) {
+ std::cout << "stop real time content production" << std::endl;
+ rtc_running_ = false;
+ rtc_timer_.cancel();
+ } else {
+ std::cout << "start real time content production" << std::endl;
+ rtc_running_ = true;
+ rtc_timer_.expires_from_now(
+ configuration_.production_rate_.getMicrosecondsForPacket(
+ configuration_.payload_size_));
+ rtc_timer_.async_wait(std::bind(&Impl::sendRTCContentObjectCallback, this,
+ std::placeholders::_1));
+ }
+
+ input_buffer_.consume(length); // Remove newline from input.
+ asio::async_read_until(
+ input_, input_buffer_, '\n',
+ std::bind(&Impl::handleInput, this, std::placeholders::_1,
+ std::placeholders::_2));
+ }
+#endif
+
+ int parseTraceFile() {
+ std::ifstream trace(configuration_.trace_file_);
+ if (trace.fail()) {
+ return -1;
+ }
+ std::string line;
+ while (std::getline(trace, line)) {
+ std::istringstream iss(line);
+ hiperf::packet_t packet;
+ iss >> packet.timestamp >> packet.size;
+ configuration_.trace_.push_back(packet);
+ }
+ return 0;
+ }
+
+ int run() {
+ std::cerr << "Starting to serve consumers" << std::endl;
+
+ signals_.add(SIGINT);
+ signals_.async_wait([this](const std::error_code &, const int &) {
+ std::cout << "STOPPING!!" << std::endl;
+ producer_socket_->stop();
+ io_service_.stop();
+ });
+
+ if (configuration_.rtc_) {
+#ifndef _WIN32
+ if (configuration_.interactive_) {
+ asio::async_read_until(
+ input_, input_buffer_, '\n',
+ std::bind(&Impl::handleInput, this, std::placeholders::_1,
+ std::placeholders::_2));
+ } else if (configuration_.trace_based_) {
+ std::cout << "trace-based mode enabled" << std::endl;
+ if (configuration_.trace_file_ == nullptr) {
+ std::cout << "cannot find the trace file" << std::endl;
+ return ERROR_SETUP;
+ }
+ if (parseTraceFile() < 0) {
+ std::cout << "cannot parse the trace file" << std::endl;
+ return ERROR_SETUP;
+ }
+ rtc_running_ = true;
+ rtc_timer_.expires_from_now(std::chrono::milliseconds(1));
+ rtc_timer_.async_wait(
+ std::bind(&Impl::sendRTCContentObjectCallbackWithTrace, this,
+ std::placeholders::_1));
+ } else if (configuration_.input_stream_mode_) {
+ rtc_running_ = true;
+ // crate socket
+ remote_ = asio::ip::udp::endpoint(
+ asio::ip::address::from_string("127.0.0.1"), configuration_.port_);
+ socket_.open(asio::ip::udp::v4());
+ socket_.bind(remote_);
+ recv_buffer_.first = (uint8_t *)malloc(1500);
+ recv_buffer_.second = 1500;
+ receiveStream();
+ } else {
+ rtc_running_ = true;
+ rtc_timer_.expires_from_now(
+ configuration_.production_rate_.getMicrosecondsForPacket(
+ configuration_.payload_size_));
+ rtc_timer_.async_wait(std::bind(&Impl::sendRTCContentObjectCallback,
+ this, std::placeholders::_1));
+ }
+#else
+ rtc_timer_.expires_from_now(
+ configuration_.production_rate_.getMicrosecondsForPacket(
+ configuration_.payload_size_));
+ rtc_timer_.async_wait(std::bind(&Impl::sendRTCContentObjectCallback, this,
+ std::placeholders::_1));
+#endif
+ }
+
+ io_service_.run();
+
+ return ERROR_SUCCESS;
+ }
+
+ private:
+ hiperf::ServerConfiguration configuration_;
+ asio::io_service io_service_;
+ asio::signal_set signals_;
+ asio::steady_timer rtc_timer_;
+ std::vector<uint32_t> unsatisfied_interests_;
+ std::vector<std::shared_ptr<ContentObject>> content_objects_;
+ std::uint16_t content_objects_index_;
+ std::uint16_t mask_;
+ std::uint32_t last_segment_;
+ std::uint32_t *ptr_last_segment_;
+ std::unique_ptr<ProducerSocket> producer_socket_;
+#ifndef _WIN32
+ asio::posix::stream_descriptor input_;
+ asio::streambuf input_buffer_;
+ bool rtc_running_;
+ Name flow_name_;
+ asio::ip::udp::socket socket_;
+ asio::ip::udp::endpoint remote_;
+ std::pair<uint8_t *, std::size_t> recv_buffer_;
+#endif
+};
+
+HIperfServer::HIperfServer(const ServerConfiguration &conf) {
+ impl_ = new Impl(conf);
+}
+
+HIperfServer::~HIperfServer() { delete impl_; }
+
+int HIperfServer::setup() { return impl_->setup(); }
+
+void HIperfServer::run() { impl_->run(); }
+
+} // namespace hiperf
diff --git a/apps/hiperf/src/server.h b/apps/hiperf/src/server.h
new file mode 100644
index 000000000..05407a807
--- /dev/null
+++ b/apps/hiperf/src/server.h
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <common.h>
+
+namespace hiperf {
+
+class HIperfServer {
+ public:
+ HIperfServer(const ServerConfiguration &conf);
+ ~HIperfServer();
+ int setup();
+ void run();
+
+ private:
+ class Impl;
+ Impl *impl_;
+};
+
+} // namespace hiperf \ No newline at end of file