aboutsummaryrefslogtreecommitdiffstats
path: root/apps
diff options
context:
space:
mode:
authorMauro <you@example.com>2021-06-30 07:57:22 +0000
committerMauro Sardara <msardara@cisco.com>2021-07-06 16:16:04 +0000
commit08233d44a6cfde878d7e10bca38ae935ed1c8fd5 (patch)
tree7ecc534d55bdc7e8dd15ecab084720910bcdf4d9 /apps
parent147ba39bed26887f5eba84757e2463ab8e370a9a (diff)
[HICN-713] Transport Library Major Refactoring 2
Co-authored-by: Luca Muscariello <muscariello@ieee.org> Co-authored-by: Michele Papalini <micpapal@cisco.com> Co-authored-by: Olivier Roques <oroques+fdio@cisco.com> Co-authored-by: Giulio Grassi <gigrassi@cisco.com> Signed-off-by: Mauro Sardara <msardara@cisco.com> Change-Id: I5b2c667bad66feb45abdb5effe22ed0f6c85d1c2
Diffstat (limited to 'apps')
-rw-r--r--apps/CMakeLists.txt35
-rw-r--r--apps/cmake/Modules/Packaging.cmake8
-rw-r--r--apps/higet/CMakeLists.txt10
-rw-r--r--apps/hiperf/CMakeLists.txt44
-rw-r--r--apps/hiperf/src/client.cc900
-rw-r--r--apps/hiperf/src/client.h34
-rw-r--r--apps/hiperf/src/common.h217
-rw-r--r--apps/hiperf/src/forwarder_config.h97
-rw-r--r--apps/hiperf/src/forwarder_interface.cc676
-rw-r--r--apps/hiperf/src/forwarder_interface.h131
-rw-r--r--apps/hiperf/src/main.cc456
-rw-r--r--apps/hiperf/src/server.cc516
-rw-r--r--apps/hiperf/src/server.h34
-rw-r--r--apps/http-proxy/CMakeLists.txt12
-rw-r--r--apps/http-proxy/includes/hicn/http-proxy/CMakeLists.txt2
-rw-r--r--apps/http-proxy/includes/hicn/http-proxy/forwarder_config.h16
-rw-r--r--apps/http-proxy/src/forwarder_interface.cc4
-rw-r--r--apps/http-proxy/src/http_proxy.cc66
-rw-r--r--apps/http-proxy/src/http_session.cc55
-rw-r--r--apps/http-proxy/src/icn_receiver.cc24
-rw-r--r--apps/ping/.clang-format14
-rw-r--r--apps/ping/CMakeLists.txt39
-rw-r--r--apps/ping/src/ping_client.cc441
-rw-r--r--apps/ping/src/ping_server.cc340
24 files changed, 4058 insertions, 113 deletions
diff --git a/apps/CMakeLists.txt b/apps/CMakeLists.txt
index 37e44f9e7..df1b9fc7c 100644
--- a/apps/CMakeLists.txt
+++ b/apps/CMakeLists.txt
@@ -11,7 +11,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-cmake_minimum_required(VERSION 3.5 FATAL_ERROR)
+cmake_minimum_required(VERSION 3.10 FATAL_ERROR)
set(CMAKE_CXX_STANDARD 14)
project(apps)
@@ -22,25 +22,32 @@ set(CMAKE_MODULE_PATH
"${CMAKE_CURRENT_SOURCE_DIR}/cmake/Modules"
)
+if (NOT CMAKE_BUILD_TYPE)
+ message(STATUS "${PROJECT_NAME}: No build type selected, default to Release")
+ set(CMAKE_BUILD_TYPE "Release")
+endif ()
+
include(BuildMacros)
include(WindowsMacros)
set(HICN_APPS hicn-apps CACHE INTERNAL "" FORCE)
+find_package(Threads REQUIRED)
+find_package(Libconfig++ REQUIRED)
+
if(CMAKE_SOURCE_DIR STREQUAL PROJECT_SOURCE_DIR)
find_package(Libtransport REQUIRED)
+ find_package(Libhicn REQUIRED)
find_package(hicnctrl REQUIRED)
- find_package(Threads REQUIRED)
else()
if (DISABLE_SHARED_LIBRARIES)
find_package(OpenSSL REQUIRED)
- if (NOT WIN32)
- find_package(ZLIB REQUIRED)
- endif ()
set(LIBTRANSPORT_LIBRARIES ${LIBTRANSPORT_STATIC})
+ set(LIBHICN_LIBRARIES ${LIBHICN_STATIC})
set(LIBHICNCTRL_LIBRARIES ${LIBHICNCTRL_STATIC})
else ()
set(LIBTRANSPORT_LIBRARIES ${LIBTRANSPORT_SHARED})
+ set(LIBHICN_LIBRARIES ${LIBHICN_SHARED})
set(LIBHICNCTRL_LIBRARIES ${LIBHICNCTRL_SHARED})
endif ()
@@ -49,28 +56,20 @@ else()
)
endif()
-# Worksroung for unresolved symbols in vpp libraries
-if(${CMAKE_SYSTEM_NAME} MATCHES Linux)
- set(LINK_FLAGS "-Wl,-unresolved-symbols=ignore-in-shared-libs")
-endif()
-
-list(APPEND LIBRARIES
- ${LIBTRANSPORT_LIBRARIES}
- ${LIBHICNCTRL_LIBRARIES}
- ${OPENSSL_LIBRARIES}
- ${CMAKE_THREAD_LIBS_INIT}
-)
-
-set(APPS_LIBRARY_LIST "${OPENSSL_LIBRARIES};${CMAKE_THREAD_LIBS_INIT}" CACHE INTERNAL "APPS_LIBRARY_LIST")
if (WIN32)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /wd4200 /wd4996")
endif ()
include(Packaging)
+add_subdirectory(ping)
+add_subdirectory(hiperf)
+
set(HIGET higet)
set(HTTP_PROXY hicn-http-proxy)
+
if (NOT WIN32)
add_subdirectory(http-proxy)
endif ()
+
add_subdirectory(higet)
diff --git a/apps/cmake/Modules/Packaging.cmake b/apps/cmake/Modules/Packaging.cmake
index 9b3fa2e72..981eb728e 100644
--- a/apps/cmake/Modules/Packaging.cmake
+++ b/apps/cmake/Modules/Packaging.cmake
@@ -18,21 +18,21 @@ useful for testing and debugging within a hicn network."
)
set(${HICN_APPS}_DEB_DEPENDENCIES
- "lib${LIBTRANSPORT} (>= stable_version)"
+ "lib${LIBTRANSPORT} (>= stable_version), lib${LIBHICNCTRL} (>= stable_version)"
CACHE STRING "Dependencies for deb/rpm package."
)
set(${HICN_APPS}-dev_DEB_DEPENDENCIES
- "lib${LIBTRANSPORT}-dev (>= stable_version)"
+ "${HICN_APPS} (>= stable_version), lib${LIBTRANSPORT}-dev (>= stable_version), lib${LIBHICNCTRL}-dev (>= stable_version)"
CACHE STRING "Dependencies for deb/rpm package."
)
set(${HICN_APPS}_RPM_DEPENDENCIES
- "lib${LIBTRANSPORT} >= stable_version"
+ "lib${LIBTRANSPORT} >= stable_version, lib${LIBHICNCTRL} >= stable_version"
CACHE STRING "Dependencies for deb/rpm package."
)
set(${HICN_APPS}-dev_RPM_DEPENDENCIES
- "lib${LIBTRANSPORT}-devel >= stable_version"
+ "${HICN_APPS} >= stable_version, lib${LIBTRANSPORT}-dev >= stable_version, lib${LIBHICNCTRL}-dev >= stable_version"
CACHE STRING "Dependencies for deb/rpm package."
) \ No newline at end of file
diff --git a/apps/higet/CMakeLists.txt b/apps/higet/CMakeLists.txt
index b929a24e4..8d112d3a3 100644
--- a/apps/higet/CMakeLists.txt
+++ b/apps/higet/CMakeLists.txt
@@ -11,11 +11,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-cmake_minimum_required(VERSION 3.5 FATAL_ERROR)
+cmake_minimum_required(VERSION 3.10 FATAL_ERROR)
set(CMAKE_CXX_STANDARD 14)
project(utils)
+find_package(Threads REQUIRED)
+
set(CMAKE_MODULE_PATH
${CMAKE_MODULE_PATH}
"${CMAKE_CURRENT_SOURCE_DIR}/../cmake/Modules"
@@ -38,7 +40,11 @@ endif()
if (NOT DISABLE_EXECUTABLES)
build_executable(${HIGET}
SOURCES ${APPS_SRC}
- LINK_LIBRARIES ${LIBTRANSPORT_LIBRARIES} ${WSOCK32_LIBRARY} ${WS2_32_LIBRARY}
+ LINK_LIBRARIES
+ ${LIBTRANSPORT_LIBRARIES}
+ ${CMAKE_THREAD_LIBS_INIT}
+ ${WSOCK32_LIBRARY}
+ ${WS2_32_LIBRARY}
DEPENDS ${LIBTRANSPORT_LIBRARIES}
COMPONENT ${HICN_APPS}
DEFINITIONS ${COMPILER_DEFINITIONS}
diff --git a/apps/hiperf/CMakeLists.txt b/apps/hiperf/CMakeLists.txt
new file mode 100644
index 000000000..564525e67
--- /dev/null
+++ b/apps/hiperf/CMakeLists.txt
@@ -0,0 +1,44 @@
+# Copyright (c) 2017-2019 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+if (NOT DISABLE_EXECUTABLES)
+ list(APPEND HIPERF_SRC
+ ${CMAKE_CURRENT_SOURCE_DIR}/src/client.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/src/main.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/src/server.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/src/forwarder_interface.cc
+ )
+
+ list (APPEND HIPERF_LIBRARIES
+ ${LIBTRANSPORT_LIBRARIES}
+ ${LIBHICNCTRL_LIBRARIES}
+ ${LIBHICN_LIBRARIES}
+ ${CMAKE_THREAD_LIBS_INIT}
+ ${LIBCONFIG_CPP_LIBRARIES}
+ ${WSOCK32_LIBRARY}
+ ${WS2_32_LIBRARY}
+ )
+
+ build_executable(hiperf
+ SOURCES ${HIPERF_SRC}
+ LINK_LIBRARIES ${HIPERF_LIBRARIES}
+ INCLUDE_DIRS
+ ${CMAKE_CURRENT_SOURCE_DIR}/src
+ ${LIBTRANSPORT_INCLUDE_DIRS}
+ ${LIBHICNCTRL_INCLUDE_DIRS}
+ ${LIBCONFIG_CPP_INCLUDE_DIRS}
+ DEPENDS ${DEPENDENCIES}
+ COMPONENT ${HICN_APPS}
+ LINK_FLAGS ${LINK_FLAGS}
+ )
+endif() \ No newline at end of file
diff --git a/apps/hiperf/src/client.cc b/apps/hiperf/src/client.cc
new file mode 100644
index 000000000..820ebf0ce
--- /dev/null
+++ b/apps/hiperf/src/client.cc
@@ -0,0 +1,900 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <client.h>
+#include <forwarder_config.h>
+#include <forwarder_interface.h>
+
+#include <libconfig.h++>
+
+namespace hiperf {
+
+/**
+ * Forward declaration of client Read callbacks.
+ */
+class RTCCallback;
+class Callback;
+
+/**
+ * Hiperf client class: configure and setup an hicn consumer following the
+ * ClientConfiguration.
+ */
+class HIperfClient::Impl
+#ifdef FORWARDER_INTERFACE
+ : ForwarderInterface::ICallback
+#endif
+{
+ typedef std::chrono::time_point<std::chrono::steady_clock> Time;
+ typedef std::chrono::microseconds TimeDuration;
+
+ friend class Callback;
+ friend class RTCCallback;
+
+ struct nack_packet_t {
+ uint64_t timestamp;
+ uint32_t prod_rate;
+ uint32_t prod_seg;
+
+ inline uint64_t getTimestamp() const { return _ntohll(&timestamp); }
+ inline void setTimestamp(uint64_t time) { timestamp = _htonll(&time); }
+
+ inline uint32_t getProductionRate() const { return ntohl(prod_rate); }
+ inline void setProductionRate(uint32_t rate) { prod_rate = htonl(rate); }
+
+ inline uint32_t getProductionSegement() const { return ntohl(prod_seg); }
+ inline void setProductionSegement(uint32_t seg) { prod_seg = htonl(seg); }
+ };
+
+ public:
+ Impl(const hiperf::ClientConfiguration &conf)
+ : configuration_(conf),
+ total_duration_milliseconds_(0),
+ old_bytes_value_(0),
+ old_interest_tx_value_(0),
+ old_fec_interest_tx_value_(0),
+ old_fec_data_rx_value_(0),
+ old_lost_data_value_(0),
+ old_bytes_recovered_value_(0),
+ old_definitely_lost_data_value_(0),
+ old_retx_value_(0),
+ old_sent_int_value_(0),
+ old_received_nacks_value_(0),
+ old_fec_pkt_(0),
+ avg_data_delay_(0),
+ delay_sample_(0),
+ received_bytes_(0),
+ received_data_pkt_(0),
+ data_delays_(""),
+ signals_(io_service_),
+ rtc_callback_(*this),
+ callback_(*this),
+ socket_(io_service_),
+ done_(false),
+ switch_threshold_(~0)
+#ifdef FORWARDER_INTERFACE
+ ,
+ forwarder_interface_(io_service_, this)
+#endif
+ {
+ }
+
+ ~Impl() {}
+
+ void checkReceivedRtcContent(ConsumerSocket &c,
+ const ContentObject &contentObject) {}
+
+ void processLeavingInterest(ConsumerSocket &c, const Interest &interest) {}
+
+ void addFace(const std::string &local_address, uint16_t local_port,
+ const std::string &remote_address, uint16_t remote_port,
+ std::string interface);
+
+ void handleTimerExpiration(ConsumerSocket &c,
+ const TransportStatistics &stats) {
+ const char separator = ' ';
+ const int width = 15;
+
+ utils::TimePoint t2 = utils::SteadyClock::now();
+ auto exact_duration =
+ std::chrono::duration_cast<utils::Milliseconds>(t2 - t_stats_);
+
+ std::stringstream interval;
+ interval << total_duration_milliseconds_ / 1000 << "-"
+ << total_duration_milliseconds_ / 1000 +
+ exact_duration.count() / 1000;
+
+ std::stringstream bytes_transferred;
+ bytes_transferred << std::fixed << std::setprecision(3)
+ << (stats.getBytesRecv() - old_bytes_value_) / 1000000.0
+ << std::setfill(separator) << "[MB]";
+
+ std::stringstream bandwidth;
+ bandwidth << ((stats.getBytesRecv() - old_bytes_value_) * 8) /
+ (exact_duration.count()) / 1000.0
+ << std::setfill(separator) << "[Mbps]";
+
+ std::stringstream window;
+ window << stats.getAverageWindowSize() << std::setfill(separator)
+ << "[Int]";
+
+ std::stringstream avg_rtt;
+ avg_rtt << stats.getAverageRtt() << std::setfill(separator) << "[ms]";
+
+ if (configuration_.rtc_) {
+ // we get rtc stats more often, thus we need ms in the interval
+ std::stringstream interval_ms;
+ interval_ms << total_duration_milliseconds_ << "-"
+ << total_duration_milliseconds_ + exact_duration.count();
+
+ std::stringstream lost_data;
+ lost_data << stats.getLostData() - old_lost_data_value_
+ << std::setfill(separator) << "[pkt]";
+
+ std::stringstream bytes_recovered_data;
+ bytes_recovered_data << stats.getBytesRecoveredData() -
+ old_bytes_recovered_value_
+ << std::setfill(separator) << "[pkt]";
+
+ std::stringstream definitely_lost_data;
+ definitely_lost_data << stats.getDefinitelyLostData() -
+ old_definitely_lost_data_value_
+ << std::setfill(separator) << "[pkt]";
+
+ std::stringstream data_delay;
+ data_delay << avg_data_delay_ << std::setfill(separator) << "[ms]";
+
+ std::stringstream received_data_pkt;
+ received_data_pkt << received_data_pkt_ << std::setfill(separator)
+ << "[pkt]";
+
+ std::stringstream goodput;
+ goodput << (received_bytes_ * 8.0) / (exact_duration.count()) / 1000.0
+ << std::setfill(separator) << "[Mbps]";
+
+ std::stringstream loss_rate;
+ loss_rate << std::fixed << std::setprecision(2)
+ << stats.getLossRatio() * 100.0 << std::setfill(separator)
+ << "[%]";
+
+ std::stringstream retx_sent;
+ retx_sent << stats.getRetxCount() - old_retx_value_
+ << std::setfill(separator) << "[pkt]";
+
+ std::stringstream interest_sent;
+ interest_sent << stats.getInterestTx() - old_sent_int_value_
+ << std::setfill(separator) << "[pkt]";
+
+ std::stringstream nacks;
+ nacks << stats.getReceivedNacks() - old_received_nacks_value_
+ << std::setfill(separator) << "[pkt]";
+
+ std::stringstream fec_pkt;
+ fec_pkt << stats.getReceivedFEC() - old_fec_pkt_
+ << std::setfill(separator) << "[pkt]";
+
+ std::stringstream queuing_delay;
+ queuing_delay << stats.getQueuingDelay() << std::setfill(separator)
+ << "[ms]";
+
+#ifdef FORWARDER_INTERFACE
+ if (!done_ && stats.getQueuingDelay() >= switch_threshold_ &&
+ total_duration_milliseconds_ > 1000) {
+ std::cout << "Switching due to queuing delay" << std::endl;
+ forwarder_interface_.createFaceAndRoutes(backup_routes_);
+ forwarder_interface_.deleteFaceAndRoutes(main_routes_);
+ std::swap(backup_routes_, main_routes_);
+ done_ = true;
+ }
+#endif
+
+ // statistics not yet available in the transport
+ // std::stringstream interest_fec_tx;
+ // interest_fec_tx << stats.getInterestFecTxCount() -
+ // old_fec_interest_tx_value_ << std::setfill(separator) << "[pkt]";
+ // std::stringstream bytes_fec_recv;
+ // bytes_fec_recv << stats.getBytesFecRecv() - old_fec_data_rx_value_
+ // << std::setfill(separator) << "[bytes]";
+ std::cout << std::left << std::setw(width) << "Interval";
+ std::cout << std::left << std::setw(width) << "RecvData";
+ std::cout << std::left << std::setw(width) << "Bandwidth";
+ std::cout << std::left << std::setw(width) << "Goodput";
+ std::cout << std::left << std::setw(width) << "LossRate";
+ std::cout << std::left << std::setw(width) << "Retr";
+ std::cout << std::left << std::setw(width) << "InterestSent";
+ std::cout << std::left << std::setw(width) << "ReceivedNacks";
+ std::cout << std::left << std::setw(width) << "SyncWnd";
+ std::cout << std::left << std::setw(width) << "MinRtt";
+ std::cout << std::left << std::setw(width) << "QueuingDelay";
+ std::cout << std::left << std::setw(width) << "LostData";
+ std::cout << std::left << std::setw(width) << "RecoveredData";
+ std::cout << std::left << std::setw(width) << "DefinitelyLost";
+ std::cout << std::left << std::setw(width) << "State";
+ std::cout << std::left << std::setw(width) << "DataDelay";
+ std::cout << std::left << std::setw(width) << "FecPkt" << std::endl;
+
+ std::cout << std::left << std::setw(width) << interval_ms.str();
+ std::cout << std::left << std::setw(width) << received_data_pkt.str();
+ std::cout << std::left << std::setw(width) << bandwidth.str();
+ std::cout << std::left << std::setw(width) << goodput.str();
+ std::cout << std::left << std::setw(width) << loss_rate.str();
+ std::cout << std::left << std::setw(width) << retx_sent.str();
+ std::cout << std::left << std::setw(width) << interest_sent.str();
+ std::cout << std::left << std::setw(width) << nacks.str();
+ std::cout << std::left << std::setw(width) << window.str();
+ std::cout << std::left << std::setw(width) << avg_rtt.str();
+ std::cout << std::left << std::setw(width) << queuing_delay.str();
+ std::cout << std::left << std::setw(width) << lost_data.str();
+ std::cout << std::left << std::setw(width) << bytes_recovered_data.str();
+ std::cout << std::left << std::setw(width) << definitely_lost_data.str();
+ std::cout << std::left << std::setw(width) << stats.getCCStatus();
+ std::cout << std::left << std::setw(width) << data_delay.str();
+ std::cout << std::left << std::setw(width) << fec_pkt.str();
+ std::cout << std::endl;
+
+ if (configuration_.test_mode_) {
+ if (data_delays_.size() > 0) data_delays_.pop_back();
+
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::system_clock::now().time_since_epoch())
+ .count();
+ std::cout << now << " DATA-DELAYS:[" << data_delays_ << "]"
+ << std::endl;
+ }
+
+ // statistics not yet available in the transport
+ // std::cout << std::left << std::setw(width) << interest_fec_tx.str();
+ // std::cout << std::left << std::setw(width) << bytes_fec_recv.str();
+ } else {
+ std::cout << std::left << std::setw(width) << "Interval";
+ std::cout << std::left << std::setw(width) << "Transfer";
+ std::cout << std::left << std::setw(width) << "Bandwidth";
+ std::cout << std::left << std::setw(width) << "Retr";
+ std::cout << std::left << std::setw(width) << "Cwnd";
+ std::cout << std::left << std::setw(width) << "AvgRtt" << std::endl;
+
+ std::cout << std::left << std::setw(width) << interval.str();
+ std::cout << std::left << std::setw(width) << bytes_transferred.str();
+ std::cout << std::left << std::setw(width) << bandwidth.str();
+ std::cout << std::left << std::setw(width) << stats.getRetxCount();
+ std::cout << std::left << std::setw(width) << window.str();
+ std::cout << std::left << std::setw(width) << avg_rtt.str() << std::endl;
+ std::cout << std::endl;
+ }
+ total_duration_milliseconds_ += (uint32_t)exact_duration.count();
+ old_bytes_value_ = stats.getBytesRecv();
+ old_lost_data_value_ = stats.getLostData();
+ old_bytes_recovered_value_ = stats.getBytesRecoveredData();
+ old_definitely_lost_data_value_ = stats.getDefinitelyLostData();
+ old_fec_interest_tx_value_ = stats.getInterestFecTxCount();
+ old_fec_data_rx_value_ = stats.getBytesFecRecv();
+ old_retx_value_ = stats.getRetxCount();
+ old_sent_int_value_ = stats.getInterestTx();
+ old_received_nacks_value_ = stats.getReceivedNacks();
+ old_fec_pkt_ = stats.getReceivedFEC();
+ delay_sample_ = 0;
+ avg_data_delay_ = 0;
+ received_bytes_ = 0;
+ received_data_pkt_ = 0;
+ data_delays_ = "";
+
+ t_stats_ = utils::SteadyClock::now();
+ }
+
+ bool parseConfig(const char *conf_file) {
+ using namespace libconfig;
+ Config cfg;
+
+ try {
+ cfg.readFile(conf_file);
+ } catch (const FileIOException &fioex) {
+ std::cerr << "I/O error while reading file." << std::endl;
+ return false;
+ } catch (const ParseException &pex) {
+ std::cerr << "Parse error at " << pex.getFile() << ":" << pex.getLine()
+ << " - " << pex.getError() << std::endl;
+ return false;
+ }
+
+ Setting &config = cfg.getRoot();
+
+ if (config.exists("switch_threshold")) {
+ unsigned threshold;
+ config.lookupValue("switch_threshold", threshold);
+ switch_threshold_ = threshold;
+ }
+
+ // listeners
+ if (config.exists("listeners")) {
+ // get path where looking for modules
+ const Setting &listeners = config.lookup("listeners");
+ auto count = listeners.getLength();
+
+ for (int i = 0; i < count; i++) {
+ const Setting &listener = listeners[i];
+ ListenerConfig list;
+ unsigned port;
+ std::string interface;
+
+ list.name = listener.getName();
+ listener.lookupValue("local_address", list.address);
+ listener.lookupValue("local_port", port);
+ listener.lookupValue("interface", list.interface);
+ list.port = (uint16_t)(port);
+
+ std::cout << "Adding listener " << list.name << ", ( " << list.address
+ << ":" << list.port << ")" << std::endl;
+ config_.addListener(std::move(list));
+ }
+ }
+
+ // connectors
+ if (config.exists("connectors")) {
+ // get path where looking for modules
+ const Setting &connectors = config.lookup("connectors");
+ auto count = connectors.getLength();
+
+ for (int i = 0; i < count; i++) {
+ const Setting &connector = connectors[i];
+ ConnectorConfig conn;
+
+ conn.name = connector.getName();
+ unsigned port = 0;
+
+ if (!connector.lookupValue("local_address", conn.local_address)) {
+ conn.local_address = "";
+ }
+
+ if (!connector.lookupValue("local_port", port)) {
+ port = 0;
+ }
+
+ conn.local_port = (uint16_t)(port);
+
+ if (!connector.lookupValue("remote_address", conn.remote_address)) {
+ std::cerr
+ << "Error in configuration file: remote_address is a mandatory "
+ "field of Connectors."
+ << std::endl;
+ return false;
+ }
+
+ if (!connector.lookupValue("remote_port", port)) {
+ std::cerr << "Error in configuration file: remote_port is a "
+ "mandatory field of Connectors."
+ << std::endl;
+ return false;
+ }
+
+ if (!connector.lookupValue("interface", conn.interface)) {
+ std::cerr << "Error in configuration file: interface is a "
+ "mandatory field of Connectors."
+ << std::endl;
+ return false;
+ }
+
+ conn.remote_port = (uint16_t)(port);
+
+ std::cout << "Adding connector " << conn.name << ", ("
+ << conn.local_address << ":" << conn.local_port << " "
+ << conn.remote_address << ":" << conn.remote_port << ")"
+ << std::endl;
+ config_.addConnector(conn.name, std::move(conn));
+ }
+ }
+
+ // Routes
+ if (config.exists("routes")) {
+ const Setting &routes = config.lookup("routes");
+ auto count = routes.getLength();
+
+ for (int i = 0; i < count; i++) {
+ const Setting &route = routes[i];
+ RouteConfig r;
+ unsigned weight;
+
+ r.name = route.getName();
+ route.lookupValue("prefix", r.prefix);
+ route.lookupValue("weight", weight);
+ route.lookupValue("main_connector", r.main_connector);
+ route.lookupValue("backup_connector", r.backup_connector);
+ r.weight = (uint16_t)(weight);
+
+ std::cout << "Adding route " << r.name << " " << r.prefix << " ("
+ << r.main_connector << " " << r.backup_connector << " "
+ << r.weight << ")" << std::endl;
+ config_.addRoute(std::move(r));
+ }
+ }
+
+ std::cout << "Ok" << std::endl;
+
+ return true;
+ }
+
+ bool splitRoute(std::string route, std::string &prefix,
+ uint8_t &prefix_length) {
+ std::string delimiter = "/";
+
+ size_t pos = 0;
+ if ((pos = route.find(delimiter)) != std::string::npos) {
+ prefix = route.substr(0, pos);
+ route.erase(0, pos + delimiter.length());
+ } else {
+ return false;
+ }
+
+ prefix_length = std::stoul(route.substr(0));
+ return true;
+ }
+
+#ifdef FORWARDER_INTERFACE
+ void onHicnServiceReady() override {
+ std::cout << "Successfully connected to local forwarder!" << std::endl;
+
+ std::cout << "Setting up listeners" << std::endl;
+ const char *config = getenv("FORWARDER_CONFIG");
+
+ if (config) {
+ if (!parseConfig(config)) {
+ return;
+ }
+
+ // Create faces and route using first face in the list.
+ auto &routes = config_.getRoutes();
+ auto &connectors = config_.getConnectors();
+
+ if (routes.size() == 0 || connectors.size() == 0) {
+ std::cerr << "Nothing to configure" << std::endl;
+ return;
+ }
+
+ for (auto &route : routes) {
+ auto the_connector_it = connectors.find(route.main_connector);
+ if (the_connector_it == connectors.end()) {
+ std::cerr << "No valid main connector found for route " << route.name
+ << std::endl;
+ continue;
+ }
+
+ auto &the_connector = the_connector_it->second;
+ auto route_info = std::make_shared<ForwarderInterface::RouteInfo>();
+ route_info->family = AF_INET;
+ route_info->local_addr = the_connector.local_address;
+ route_info->local_port = the_connector.local_port;
+ route_info->remote_addr = the_connector.remote_address;
+ route_info->remote_port = the_connector.remote_port;
+ route_info->interface = the_connector.interface;
+ route_info->name = the_connector.name;
+
+ std::string prefix;
+ uint8_t prefix_length;
+ auto ret = splitRoute(route.prefix, prefix, prefix_length);
+
+ if (!ret) {
+ std::cerr << "Error parsing route" << std::endl;
+ return;
+ }
+
+ route_info->route_addr = prefix;
+ route_info->route_len = prefix_length;
+
+ main_routes_.emplace_back(route_info);
+
+ if (!route.backup_connector.empty()) {
+ // Add also the backup route
+ auto the_backup_connector_it =
+ connectors.find(route.backup_connector);
+ if (the_backup_connector_it == connectors.end()) {
+ std::cerr << "No valid backup connector found for route "
+ << route.name << std::endl;
+ continue;
+ }
+
+ auto &the_backup_connector = the_backup_connector_it->second;
+ auto backup_route_info =
+ std::make_shared<ForwarderInterface::RouteInfo>();
+ backup_route_info->family = AF_INET;
+ backup_route_info->local_addr = the_backup_connector.local_address;
+ backup_route_info->local_port = the_backup_connector.local_port;
+ backup_route_info->remote_addr = the_backup_connector.remote_address;
+ backup_route_info->remote_port = the_backup_connector.remote_port;
+ backup_route_info->interface = the_backup_connector.interface;
+ backup_route_info->name = the_backup_connector.name;
+
+ std::string prefix;
+ uint8_t prefix_length;
+ auto ret = splitRoute(route.prefix, prefix, prefix_length);
+
+ if (!ret) {
+ std::cerr << "Error parsing route" << std::endl;
+ return;
+ }
+
+ backup_route_info->route_addr = prefix;
+ backup_route_info->route_len = prefix_length;
+
+ backup_routes_.emplace_back(backup_route_info);
+ }
+ }
+
+ // Create main routes
+ std::cout << "Creating main routes" << std::endl;
+ forwarder_interface_.createFaceAndRoutes(main_routes_);
+ }
+ }
+
+ void onRouteConfigured(
+ std::vector<ForwarderInterface::RouteInfoPtr> &route_info) override {
+ std::cout << "Routes successfully configured!" << std::endl;
+ }
+#endif
+
+ int setup() {
+ int ret;
+
+ if (configuration_.rtc_) {
+ configuration_.transport_protocol_ = RTC;
+ } else if (configuration_.window < 0) {
+ configuration_.transport_protocol_ = RAAQM;
+ } else {
+ configuration_.transport_protocol_ = CBR;
+ }
+
+ if (configuration_.relay_ && configuration_.rtc_) {
+ int production_protocol = ProductionProtocolAlgorithms::RTC_PROD;
+ producer_socket_ = std::make_unique<ProducerSocket>(production_protocol);
+ producer_socket_->registerPrefix(configuration_.relay_name_);
+ producer_socket_->connect();
+ }
+
+ if (configuration_.output_stream_mode_ && configuration_.rtc_) {
+ remote_ = asio::ip::udp::endpoint(
+ asio::ip::address::from_string("127.0.0.1"), configuration_.port_);
+ socket_.open(asio::ip::udp::v4());
+ }
+
+ if (configuration_.secure_) {
+ consumer_socket_ = std::make_unique<P2PSecureConsumerSocket>(
+ RAAQM, configuration_.transport_protocol_);
+ if (configuration_.producer_prefix_.getPrefixLength() == 0) {
+ std::cerr << "ERROR -- Missing producer prefix on which perform the "
+ "handshake."
+ << std::endl;
+ } else {
+ P2PSecureConsumerSocket &secure_consumer_socket =
+ *(static_cast<P2PSecureConsumerSocket *>(consumer_socket_.get()));
+ secure_consumer_socket.registerPrefix(configuration_.producer_prefix_);
+ }
+ } else {
+ consumer_socket_ =
+ std::make_unique<ConsumerSocket>(configuration_.transport_protocol_);
+ }
+
+ consumer_socket_->setSocketOption(
+ GeneralTransportOptions::INTEREST_LIFETIME,
+ configuration_.interest_lifetime_);
+
+#if defined(DEBUG) && defined(__linux__)
+ std::shared_ptr<transport::BasePortal> portal;
+ consumer_socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal);
+ signals_ =
+ std::make_unique<asio::signal_set>(portal->getIoService(), SIGUSR1);
+ signals_->async_wait([this](const std::error_code &, const int &) {
+ std::cout << "Signal SIGUSR1!" << std::endl;
+ mtrace();
+ });
+#endif
+
+ if (consumer_socket_->setSocketOption(CURRENT_WINDOW_SIZE,
+ configuration_.window) ==
+ SOCKET_OPTION_NOT_SET) {
+ std::cerr << "ERROR -- Impossible to set the size of the window."
+ << std::endl;
+ return ERROR_SETUP;
+ }
+
+ if (configuration_.transport_protocol_ == RAAQM &&
+ configuration_.beta != -1.f) {
+ if (consumer_socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE,
+ configuration_.beta) ==
+ SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+ }
+
+ if (configuration_.transport_protocol_ == RAAQM &&
+ configuration_.drop_factor != -1.f) {
+ if (consumer_socket_->setSocketOption(RaaqmTransportOptions::DROP_FACTOR,
+ configuration_.drop_factor) ==
+ SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+ }
+
+ if (!configuration_.producer_certificate.empty()) {
+ std::shared_ptr<Verifier> verifier = std::make_shared<AsymmetricVerifier>(
+ configuration_.producer_certificate);
+ if (consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER,
+ verifier) == SOCKET_OPTION_NOT_SET)
+ return ERROR_SETUP;
+ }
+
+ if (!configuration_.passphrase.empty()) {
+ std::shared_ptr<Verifier> verifier =
+ std::make_shared<SymmetricVerifier>(configuration_.passphrase);
+ if (consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER,
+ verifier) == SOCKET_OPTION_NOT_SET)
+ return ERROR_SETUP;
+ }
+
+ ret = consumer_socket_->setSocketOption(
+ ConsumerCallbacksOptions::INTEREST_OUTPUT,
+ (ConsumerInterestCallback)std::bind(&Impl::processLeavingInterest, this,
+ std::placeholders::_1,
+ std::placeholders::_2));
+
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+
+ if (!configuration_.rtc_) {
+ ret = consumer_socket_->setSocketOption(
+ ConsumerCallbacksOptions::READ_CALLBACK, &callback_);
+ } else {
+ ret = consumer_socket_->setSocketOption(
+ ConsumerCallbacksOptions::READ_CALLBACK, &rtc_callback_);
+ }
+
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+
+ if (configuration_.rtc_) {
+ ret = consumer_socket_->setSocketOption(
+ ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT,
+ (ConsumerContentObjectCallback)std::bind(
+ &Impl::checkReceivedRtcContent, this, std::placeholders::_1,
+ std::placeholders::_2));
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+ }
+
+ if (configuration_.rtc_) {
+ std::shared_ptr<TransportStatistics> transport_stats;
+ consumer_socket_->getSocketOption(
+ OtherOptions::STATISTICS, (TransportStatistics **)&transport_stats);
+ transport_stats->setAlpha(0.0);
+ }
+
+ ret = consumer_socket_->setSocketOption(
+ ConsumerCallbacksOptions::STATS_SUMMARY,
+ (ConsumerTimerCallback)std::bind(&Impl::handleTimerExpiration, this,
+ std::placeholders::_1,
+ std::placeholders::_2));
+
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+
+ if (consumer_socket_->setSocketOption(
+ GeneralTransportOptions::STATS_INTERVAL,
+ configuration_.report_interval_milliseconds_) ==
+ SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+
+ consumer_socket_->connect();
+
+ return ERROR_SUCCESS;
+ }
+
+ int run() {
+ std::cout << "Starting download of " << configuration_.name << std::endl;
+
+ signals_.add(SIGINT);
+ signals_.async_wait(
+ [this](const std::error_code &, const int &) { io_service_.stop(); });
+
+ t_download_ = t_stats_ = std::chrono::steady_clock::now();
+ consumer_socket_->asyncConsume(configuration_.name);
+ io_service_.run();
+
+ consumer_socket_->stop();
+
+ return ERROR_SUCCESS;
+ }
+
+ private:
+ class RTCCallback : public ConsumerSocket::ReadCallback {
+ static constexpr std::size_t mtu = 1500;
+
+ public:
+ RTCCallback(Impl &hiperf_client) : client_(hiperf_client) {
+ client_.configuration_.receive_buffer = utils::MemBuf::create(mtu);
+ }
+
+ bool isBufferMovable() noexcept override { return false; }
+
+ void getReadBuffer(uint8_t **application_buffer,
+ size_t *max_length) override {
+ *application_buffer =
+ client_.configuration_.receive_buffer->writableData();
+ *max_length = mtu;
+ }
+
+ void readDataAvailable(std::size_t length) noexcept override {
+ client_.received_bytes_ += length;
+ client_.received_data_pkt_++;
+
+ // collecting delay stats. Just for performance testing
+ uint64_t *senderTimeStamp =
+ (uint64_t *)client_.configuration_.receive_buffer->writableData();
+
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::system_clock::now().time_since_epoch())
+ .count();
+ double new_delay = (double)(now - *senderTimeStamp);
+
+ if (*senderTimeStamp > now)
+ new_delay = -1 * (double)(*senderTimeStamp - now);
+
+ client_.delay_sample_++;
+ client_.avg_data_delay_ =
+ client_.avg_data_delay_ +
+ (new_delay - client_.avg_data_delay_) / client_.delay_sample_;
+
+ if (client_.configuration_.test_mode_) {
+ client_.data_delays_ += std::to_string(int(new_delay));
+ client_.data_delays_ += ",";
+ }
+
+ if (client_.configuration_.relay_) {
+ client_.producer_socket_->produceDatagram(
+ client_.configuration_.relay_name_.getName(),
+ client_.configuration_.receive_buffer->writableData(),
+ length < 1400 ? length : 1400);
+ }
+ if (client_.configuration_.output_stream_mode_) {
+ uint8_t *start =
+ (uint8_t *)client_.configuration_.receive_buffer->writableData();
+ start += sizeof(uint64_t);
+ std::size_t pkt_len = length - sizeof(uint64_t);
+ client_.socket_.send_to(asio::buffer(start, pkt_len), client_.remote_);
+ }
+ }
+
+ size_t maxBufferSize() const override { return mtu; }
+
+ void readError(const std::error_code ec) noexcept override {
+ std::cerr << "Error while reading from RTC socket" << std::endl;
+ client_.io_service_.stop();
+ }
+
+ void readSuccess(std::size_t total_size) noexcept override {
+ std::cout << "Data successfully read" << std::endl;
+ }
+
+ private:
+ Impl &client_;
+ };
+
+ class Callback : public ConsumerSocket::ReadCallback {
+ public:
+ Callback(Impl &hiperf_client) : client_(hiperf_client) {
+ client_.configuration_.receive_buffer =
+ utils::MemBuf::create(client_.configuration_.receive_buffer_size_);
+ }
+
+ bool isBufferMovable() noexcept override { return false; }
+
+ void getReadBuffer(uint8_t **application_buffer,
+ size_t *max_length) override {
+ *application_buffer =
+ client_.configuration_.receive_buffer->writableData();
+ *max_length = client_.configuration_.receive_buffer_size_;
+ }
+
+ void readDataAvailable(std::size_t length) noexcept override {}
+
+ void readBufferAvailable(
+ std::unique_ptr<utils::MemBuf> &&buffer) noexcept override {}
+
+ size_t maxBufferSize() const override {
+ return client_.configuration_.receive_buffer_size_;
+ }
+
+ void readError(const std::error_code ec) noexcept override {
+ std::cerr << "Error " << ec.message() << " while reading from socket"
+ << std::endl;
+ client_.io_service_.stop();
+ }
+
+ void readSuccess(std::size_t total_size) noexcept override {
+ Time t2 = std::chrono::steady_clock::now();
+ TimeDuration dt =
+ std::chrono::duration_cast<TimeDuration>(t2 - client_.t_download_);
+ long usec = (long)dt.count();
+
+ std::cout << "Content retrieved. Size: " << total_size << " [Bytes]"
+ << std::endl;
+
+ std::cerr << "Elapsed Time: " << usec / 1000000.0 << " seconds -- "
+ << (total_size * 8) * 1.0 / usec * 1.0 << " [Mbps]"
+ << std::endl;
+
+ client_.io_service_.stop();
+ }
+
+ private:
+ Impl &client_;
+ };
+
+ hiperf::ClientConfiguration configuration_;
+ Time t_stats_;
+ Time t_download_;
+ uint32_t total_duration_milliseconds_;
+ uint64_t old_bytes_value_;
+ uint64_t old_interest_tx_value_;
+ uint64_t old_fec_interest_tx_value_;
+ uint64_t old_fec_data_rx_value_;
+ uint64_t old_lost_data_value_;
+ uint64_t old_bytes_recovered_value_;
+ uint64_t old_definitely_lost_data_value_;
+ uint32_t old_retx_value_;
+ uint32_t old_sent_int_value_;
+ uint32_t old_received_nacks_value_;
+ uint32_t old_fec_pkt_;
+
+ // IMPORTANT: to be used only for performance testing, when consumer and
+ // producer are synchronized. Used for rtc only at the moment
+ double avg_data_delay_;
+ uint32_t delay_sample_;
+
+ uint32_t received_bytes_;
+ uint32_t received_data_pkt_;
+
+ std::string data_delays_;
+
+ asio::io_service io_service_;
+ asio::signal_set signals_;
+ RTCCallback rtc_callback_;
+ Callback callback_;
+ std::unique_ptr<ConsumerSocket> consumer_socket_;
+ std::unique_ptr<ProducerSocket> producer_socket_;
+ asio::ip::udp::socket socket_;
+ asio::ip::udp::endpoint remote_;
+
+ ForwarderConfiguration config_;
+ uint16_t switch_threshold_; /* ms */
+ bool done_;
+ std::vector<ForwarderInterface::RouteInfoPtr> main_routes_;
+ std::vector<ForwarderInterface::RouteInfoPtr> backup_routes_;
+#ifdef FORWARDER_INTERFACE
+ ForwarderInterface forwarder_interface_;
+#endif
+};
+
+HIperfClient::HIperfClient(const ClientConfiguration &conf) {
+ impl_ = new Impl(conf);
+}
+
+HIperfClient::~HIperfClient() { delete impl_; }
+
+int HIperfClient::setup() { return impl_->setup(); }
+
+void HIperfClient::run() { impl_->run(); }
+
+} // namespace hiperf
diff --git a/apps/hiperf/src/client.h b/apps/hiperf/src/client.h
new file mode 100644
index 000000000..f45b9af43
--- /dev/null
+++ b/apps/hiperf/src/client.h
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <common.h>
+
+namespace hiperf {
+
+class HIperfClient {
+ public:
+ HIperfClient(const ClientConfiguration &conf);
+ ~HIperfClient();
+ int setup();
+ void run();
+
+ private:
+ class Impl;
+ Impl *impl_;
+};
+
+} // namespace hiperf \ No newline at end of file
diff --git a/apps/hiperf/src/common.h b/apps/hiperf/src/common.h
new file mode 100644
index 000000000..e6ba526f9
--- /dev/null
+++ b/apps/hiperf/src/common.h
@@ -0,0 +1,217 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/auth/identity.h>
+#include <hicn/transport/auth/signer.h>
+#include <hicn/transport/config.h>
+#include <hicn/transport/core/content_object.h>
+#include <hicn/transport/core/interest.h>
+#include <hicn/transport/interfaces/global_conf_interface.h>
+#include <hicn/transport/interfaces/p2psecure_socket_consumer.h>
+#include <hicn/transport/interfaces/p2psecure_socket_producer.h>
+#include <hicn/transport/interfaces/socket_consumer.h>
+#include <hicn/transport/interfaces/socket_producer.h>
+#include <hicn/transport/utils/chrono_typedefs.h>
+#include <hicn/transport/utils/literals.h>
+
+#ifndef _WIN32
+#include <hicn/transport/utils/daemonizator.h>
+#endif
+
+#include <asio.hpp>
+#include <cmath>
+#include <fstream>
+#include <iomanip>
+#include <sstream>
+#include <string>
+#include <unordered_set>
+
+#ifndef ERROR_SUCCESS
+#define ERROR_SUCCESS 0
+#endif
+#define ERROR_SETUP -5
+#define MIN_PROBE_SEQ 0xefffffff
+
+using namespace transport::interface;
+using namespace transport::auth;
+using namespace transport::core;
+
+static inline uint64_t _ntohll(const uint64_t *input) {
+ uint64_t return_val;
+ uint8_t *tmp = (uint8_t *)&return_val;
+
+ tmp[0] = *input >> 56;
+ tmp[1] = *input >> 48;
+ tmp[2] = *input >> 40;
+ tmp[3] = *input >> 32;
+ tmp[4] = *input >> 24;
+ tmp[5] = *input >> 16;
+ tmp[6] = *input >> 8;
+ tmp[7] = *input >> 0;
+
+ return return_val;
+}
+
+static inline uint64_t _htonll(const uint64_t *input) {
+ return (_ntohll(input));
+}
+
+namespace hiperf {
+
+/**
+ * Class for handling the production rate for the RTC producer.
+ */
+class Rate {
+ public:
+ Rate() : rate_kbps_(0) {}
+
+ Rate(const std::string &rate) {
+ std::size_t found = rate.find("kbps");
+ if (found != std::string::npos) {
+ rate_kbps_ = std::stof(rate.substr(0, found));
+ } else {
+ throw std::runtime_error("Format " + rate + " not correct");
+ }
+ }
+
+ Rate(const Rate &other) : rate_kbps_(other.rate_kbps_) {}
+
+ Rate &operator=(const std::string &rate) {
+ std::size_t found = rate.find("kbps");
+ if (found != std::string::npos) {
+ rate_kbps_ = std::stof(rate.substr(0, found));
+ } else {
+ throw std::runtime_error("Format " + rate + " not correct");
+ }
+
+ return *this;
+ }
+
+ std::chrono::microseconds getMicrosecondsForPacket(std::size_t packet_size) {
+ return std::chrono::microseconds(
+ (uint32_t)std::round(packet_size * 1000.0 * 8.0 / (double)rate_kbps_));
+ }
+
+ private:
+ float rate_kbps_;
+};
+
+struct packet_t {
+ uint64_t timestamp;
+ uint32_t size;
+};
+
+/**
+ * Container for command line configuration for hiperf client.
+ */
+struct ClientConfiguration {
+ ClientConfiguration()
+ : name("b001::abcd", 0),
+ beta(-1.f),
+ drop_factor(-1.f),
+ window(-1),
+ producer_certificate(""),
+ passphrase(""),
+ receive_buffer(nullptr),
+ receive_buffer_size_(128 * 1024),
+ download_size(0),
+ report_interval_milliseconds_(1000),
+ transport_protocol_(CBR),
+ rtc_(false),
+ test_mode_(false),
+ relay_(false),
+ secure_(false),
+ producer_prefix_(),
+ interest_lifetime_(500),
+ relay_name_("c001::abcd/64"),
+ output_stream_mode_(false),
+ port_(0) {}
+
+ Name name;
+ double beta;
+ double drop_factor;
+ double window;
+ std::string producer_certificate;
+ std::string passphrase;
+ std::shared_ptr<utils::MemBuf> receive_buffer;
+ std::size_t receive_buffer_size_;
+ std::size_t download_size;
+ std::uint32_t report_interval_milliseconds_;
+ TransportProtocolAlgorithms transport_protocol_;
+ bool rtc_;
+ bool test_mode_;
+ bool relay_;
+ bool secure_;
+ Prefix producer_prefix_;
+ uint32_t interest_lifetime_;
+ Prefix relay_name_;
+ bool output_stream_mode_;
+ uint16_t port_;
+};
+
+/**
+ * Container for command line configuration for hiperf server.
+ */
+struct ServerConfiguration {
+ ServerConfiguration()
+ : name("b001::abcd/64"),
+ virtual_producer(true),
+ manifest(false),
+ live_production(false),
+ content_lifetime(600000000_U32),
+ download_size(20 * 1024 * 1024),
+ hash_algorithm(CryptoHashType::SHA256),
+ keystore_name(""),
+ passphrase(""),
+ keystore_password("cisco"),
+ multiphase_produce_(false),
+ rtc_(false),
+ interactive_(false),
+ trace_based_(false),
+ trace_index_(0),
+ trace_file_(nullptr),
+ production_rate_(std::string("2048kbps")),
+ payload_size_(1400),
+ secure_(false),
+ input_stream_mode_(false),
+ port_(0) {}
+
+ Prefix name;
+ bool virtual_producer;
+ bool manifest;
+ bool live_production;
+ std::uint32_t content_lifetime;
+ std::uint32_t download_size;
+ CryptoHashType hash_algorithm;
+ std::string keystore_name;
+ std::string passphrase;
+ std::string keystore_password;
+ bool multiphase_produce_;
+ bool rtc_;
+ bool interactive_;
+ bool trace_based_;
+ std::uint32_t trace_index_;
+ char *trace_file_;
+ Rate production_rate_;
+ std::size_t payload_size_;
+ bool secure_;
+ bool input_stream_mode_;
+ uint16_t port_;
+ std::vector<struct packet_t> trace_;
+};
+
+} // namespace hiperf
diff --git a/apps/hiperf/src/forwarder_config.h b/apps/hiperf/src/forwarder_config.h
new file mode 100644
index 000000000..655ac3b66
--- /dev/null
+++ b/apps/hiperf/src/forwarder_config.h
@@ -0,0 +1,97 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+namespace hiperf {
+
+struct ListenerConfig {
+ std::string address;
+ std::uint16_t port;
+ std::string interface;
+ std::string name;
+};
+
+struct ConnectorConfig {
+ std::string local_address;
+ std::uint16_t local_port;
+ std::string remote_address;
+ std::uint16_t remote_port;
+ std::string interface;
+ std::string name;
+};
+
+struct RouteConfig {
+ std::string prefix;
+ uint16_t weight;
+ std::string main_connector;
+ std::string backup_connector;
+ std::string name;
+};
+
+class ForwarderConfiguration {
+ public:
+ ForwarderConfiguration() : n_threads_(1) {}
+
+ bool empty() {
+ return listeners_.empty() && connectors_.empty() && routes_.empty();
+ }
+
+ ForwarderConfiguration &setThreadNumber(std::size_t threads) {
+ n_threads_ = threads;
+ return *this;
+ }
+
+ std::size_t getThreadNumber() { return n_threads_; }
+
+ template <typename... Args>
+ ForwarderConfiguration &addListener(Args &&...args) {
+ listeners_.emplace_back(std::forward<Args>(args)...);
+ return *this;
+ }
+
+ template <typename... Args>
+ ForwarderConfiguration &addConnector(const std::string &name,
+ Args &&...args) {
+ connectors_.emplace(name, std::forward<Args>(args)...);
+ return *this;
+ }
+
+ template <typename... Args>
+ ForwarderConfiguration &addRoute(Args &&...args) {
+ routes_.emplace_back(std::forward<Args>(args)...);
+ return *this;
+ }
+
+ std::vector<ListenerConfig> &getListeners() { return listeners_; }
+
+ std::unordered_map<std::string, ConnectorConfig> &getConnectors() {
+ return connectors_;
+ }
+
+ std::vector<RouteConfig> &getRoutes() { return routes_; }
+
+ private:
+ std::vector<ListenerConfig> listeners_;
+ std::unordered_map<std::string, ConnectorConfig> connectors_;
+ std::vector<RouteConfig> routes_;
+ std::size_t n_threads_;
+};
+
+} \ No newline at end of file
diff --git a/apps/hiperf/src/forwarder_interface.cc b/apps/hiperf/src/forwarder_interface.cc
new file mode 100644
index 000000000..864208239
--- /dev/null
+++ b/apps/hiperf/src/forwarder_interface.cc
@@ -0,0 +1,676 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <arpa/inet.h>
+#include <forwarder_interface.h>
+#include <hicn/transport/utils/log.h>
+
+#include <chrono>
+#include <iostream>
+#include <thread>
+#include <unordered_set>
+
+extern "C" {
+#include <hicn/error.h>
+#include <hicn/util/ip_address.h>
+}
+
+// XXX the main listener should be retrieve in this class at initialization, aka
+// when hICN becomes avialable
+//
+// XXX the main listener port will be retrieved in the forwarder
+// interface... everything else will be delayed until we have this
+// information
+
+namespace hiperf {
+
+ForwarderInterface::ForwarderInterface(asio::io_service &io_service,
+ ICallback *callback)
+ : external_ioservice_(io_service),
+ forwarder_interface_callback_(callback),
+ work_(std::make_unique<asio::io_service::work>(internal_ioservice_)),
+ sock_(nullptr),
+ thread_(std::make_unique<std::thread>([this]() {
+ std::cout << "Starting Forwarder Interface thread" << std::endl;
+ internal_ioservice_.run();
+ std::cout << "Stopping Forwarder Interface thread" << std::endl;
+ })),
+ // set_route_callback_(std::forward<Callback &&>(setRouteCallback)),
+ check_routes_timer_(nullptr),
+ pending_add_route_counter_(0),
+ hicn_listen_port_(9695),
+ /* We start in disabled state even when a forwarder is always available */
+ state_(State::Disabled),
+ timer_(io_service),
+ num_reattempts(0) {
+ std::cout << "Forwarder interface created... connecting to forwarder...\n";
+ internal_ioservice_.post([this]() { onHicnServiceAvailable(true); });
+}
+
+ForwarderInterface::~ForwarderInterface() {
+ if (thread_ && thread_->joinable()) {
+ internal_ioservice_.dispatch([this]() {
+ if (sock_) {
+ hc_sock_free(sock_);
+ sock_ = nullptr;
+ }
+
+ work_.reset();
+ });
+
+ thread_->join();
+ }
+
+ std::cout << "ForwarderInterface::~ForwarderInterface" << std::endl;
+}
+
+void ForwarderInterface::onHicnServiceAvailable(bool flag) {
+ if (flag) {
+ switch (state_) {
+ case State::Disabled:
+ case State::Requested:
+ state_ = State::Available;
+ case State::Available:
+ connectToForwarder();
+ /* Synchronous */
+ if (state_ != State::Connected) {
+ std::cout << "ConnectToForwarder failed" << std::endl;
+ goto REATTEMPT;
+ }
+ state_ = State::Ready;
+
+ std::cout << "Connected to forwarder... cancelling reconnection timer"
+ << std::endl;
+ timer_.cancel();
+ num_reattempts = 0;
+
+ // case State::Connected:
+ // checkListener();
+
+ // if (state_ != State::Ready) {
+ // std::cout << "Listener not found" << std::endl;
+ // goto REATTEMPT;
+ // }
+ // state_ = State::Ready;
+
+ // timer_.cancel();
+ // num_reattempts = 0;
+
+ std::cout << "Forwarder interface is ready... communicate to controller"
+ << std::endl;
+
+ forwarder_interface_callback_->onHicnServiceReady();
+
+ case State::Ready:
+ break;
+ }
+ } else {
+ if (sock_) {
+ hc_sock_free(sock_);
+ sock_ = nullptr;
+ }
+ state_ = State::Disabled; // XXX to be checked upon callback to prevent the
+ // state from going forward (used to manage
+ // concurrency)
+ }
+ return;
+
+REATTEMPT:
+ /* Schedule reattempt */
+ std::cout << "Failed to connect, scheduling reattempt" << std::endl;
+ num_reattempts++;
+
+ timer_.expires_from_now(
+ std::chrono::milliseconds(ForwarderInterface::REATTEMPT_DELAY_MS));
+ // timer_.async_wait(std::bind(&ForwarderInterface::onHicnServiceAvailable,
+ // this, flag, std::placeholders::_1));
+ timer_.async_wait([this, flag](std::error_code ec) {
+ if (ec) return;
+ onHicnServiceAvailable(flag);
+ });
+}
+
+int ForwarderInterface::connectToForwarder() {
+ sock_ = hc_sock_create();
+ if (!sock_) {
+ std::cout << "Could not create socket" << std::endl;
+ goto ERR_SOCK;
+ }
+
+ if (hc_sock_connect(sock_) < 0) {
+ std::cout << "Could not connect to forwarder" << std::endl;
+ goto ERR;
+ }
+
+ std::cout << "Forwarder interface connected" << std::endl;
+ state_ = State::Connected;
+ return 0;
+
+ERR:
+ hc_sock_free(sock_);
+ sock_ = nullptr;
+ERR_SOCK:
+ return -1;
+}
+
+int ForwarderInterface::checkListener() {
+ if (!sock_) return -1;
+
+ hc_data_t *data;
+ if (hc_listener_list(sock_, &data) < 0) return -1;
+
+ int ret = -1;
+ foreach_listener(l, data) {
+ std::string interface = std::string(l->interface_name);
+ if (interface.compare("lo") != 0) {
+ hicn_listen_port_ = l->local_port;
+ state_ = State::Ready;
+ ret = 0;
+ std::cout << "Got listener port" << std::endl;
+ break;
+ }
+ }
+
+ hc_data_free(data);
+ return ret;
+}
+
+void ForwarderInterface::close() {
+ std::cout << "ForwarderInterface::close" << std::endl;
+
+ state_ = State::Disabled;
+ /* Cancelling eventual reattempts */
+ timer_.cancel();
+
+ if (sock_) {
+ hc_sock_free(sock_);
+ sock_ = nullptr;
+ }
+
+ internal_ioservice_.post([this]() { work_.reset(); });
+
+ if (thread_->joinable()) {
+ thread_->join();
+ }
+}
+
+#if 0
+void ForwarderInterface::enableCheckRoutesTimer() {
+ if (check_routes_timer_ != nullptr) return;
+
+ check_routes_timer_ =
+ std::make_unique<asio::steady_timer>(internal_ioservice_);
+ checkRoutesLoop();
+}
+
+void ForwarderInterface::removeConnectedUserNow(ProtocolPtr protocol) {
+ internalRemoveConnectedUser(protocol);
+}
+
+void ForwarderInterface::scheduleRemoveConnectedUser(ProtocolPtr protocol) {
+ internal_ioservice_.post(
+ [this, protocol]() { internalRemoveConnectedUser(protocol); });
+}
+#endif
+
+void ForwarderInterface::createFaceAndRoute(const RouteInfoPtr &route_info) {
+ std::vector<RouteInfoPtr> routes;
+ routes.push_back(std::move(route_info));
+ createFaceAndRoutes(routes);
+}
+
+void ForwarderInterface::createFaceAndRoutes(
+ const std::vector<RouteInfoPtr> &routes_info) {
+ pending_add_route_counter_++;
+ auto timer = new asio::steady_timer(internal_ioservice_);
+ internal_ioservice_.post([this, routes_info, timer]() {
+ internalCreateFaceAndRoutes(routes_info, ForwarderInterface::MAX_REATTEMPT,
+ timer);
+ });
+}
+
+void ForwarderInterface::deleteFaceAndRoute(const RouteInfoPtr &route_info) {
+ std::vector<RouteInfoPtr> routes;
+ routes.push_back(std::move(route_info));
+ deleteFaceAndRoutes(routes);
+}
+
+void ForwarderInterface::deleteFaceAndRoutes(
+ const std::vector<RouteInfoPtr> &routes_info) {
+ internal_ioservice_.post([this, routes_info]() {
+ for (auto &route : routes_info) {
+ internalDeleteFaceAndRoute(route);
+ }
+ });
+}
+
+void ForwarderInterface::internalDeleteFaceAndRoute(
+ const RouteInfoPtr &route_info) {
+ if (!sock_) return;
+
+ hc_data_t *data;
+ if (hc_route_list(sock_, &data) < 0) return;
+
+ std::vector<hc_route_t *> routes_to_remove;
+ foreach_route(r, data) {
+ char remote_addr[INET6_ADDRSTRLEN];
+ int ret = ip_address_ntop(&r->remote_addr, remote_addr, r->len, r->family);
+ if (ret < 0) continue;
+
+ std::string route_addr(remote_addr);
+ if (route_addr.compare(route_info->route_addr) == 0 &&
+ r->len == route_info->route_len) {
+ // route found
+ routes_to_remove.push_back(r);
+ }
+ }
+
+ if (routes_to_remove.size() == 0) {
+ // nothing to do here
+ hc_data_free(data);
+ return;
+ }
+
+ std::unordered_set<uint32_t> connids_to_remove;
+ for (unsigned i = 0; i < routes_to_remove.size(); i++) {
+ connids_to_remove.insert(routes_to_remove[i]->face_id);
+ if (hc_route_delete(sock_, routes_to_remove[i]) < 0) {
+ std::cout << "Error removing route from forwarder." << std::endl;
+ }
+ }
+
+ // remove connection
+ if (hc_connection_list(sock_, &data) < 0) {
+ hc_data_free(data);
+ return;
+ }
+
+ // collects pointerst to the connections using the conn IDs
+ std::vector<hc_connection_t *> conns_to_remove;
+ foreach_connection(c, data) {
+ if (connids_to_remove.find(c->id) != connids_to_remove.end()) {
+ // conn found
+ conns_to_remove.push_back(c);
+ }
+ }
+
+ if (conns_to_remove.size() == 0) {
+ // nothing else to do here
+ hc_data_free(data);
+ return;
+ }
+
+ for (unsigned i = 0; i < conns_to_remove.size(); i++) {
+ if (hc_connection_delete(sock_, conns_to_remove[i]) < 0) {
+ std::cout << "Error removing connection from forwarder." << std::endl;
+ }
+ }
+
+ hc_data_free(data);
+}
+
+void ForwarderInterface::internalCreateFaceAndRoutes(
+ const std::vector<RouteInfoPtr> &route_info, uint8_t max_try,
+ asio::steady_timer *timer) {
+ uint32_t face_id;
+
+ std::vector<RouteInfoPtr> failed;
+ for (auto &route : route_info) {
+ int ret = tryToCreateFace(route.get(), &face_id);
+ if (ret >= 0) {
+ auto ret = tryToCreateRoute(route.get(), face_id);
+ if (ret < 0) {
+ failed.push_back(route);
+ std::cerr << "Error creating route and face" << std::endl;
+ continue;
+ }
+ }
+ }
+
+ if (failed.size() > 0) {
+ if (max_try == 0) {
+ /* All attempts failed */
+ goto RESULT;
+ }
+ max_try--;
+ timer->expires_from_now(std::chrono::milliseconds(500));
+ timer->async_wait([this, failed, max_try, timer](std::error_code ec) {
+ if (ec) return;
+ internalCreateFaceAndRoutes(failed, max_try, timer);
+ });
+ return;
+ }
+
+#if 0
+ // route_status_[protocol] = std::move(route_info);
+ for (size_t i = 0; i < route_info.size(); i++) {
+ route_status_.insert(
+ std::pair<ClientId, RouteInfoPtr>(protocol, std::move(route_info[i])));
+ }
+#endif
+
+RESULT:
+ std::cout << "Face / Route create ok, now calling back protocol" << std::endl;
+ pending_add_route_counter_--;
+ external_ioservice_.post([this, r = std::move(route_info)]() mutable {
+ forwarder_interface_callback_->onRouteConfigured(r);
+ });
+ delete timer;
+}
+
+int ForwarderInterface::tryToCreateFace(RouteInfo *route_info,
+ uint32_t *face_id) {
+ bool found = false;
+
+ // check connection with the forwarder
+ if (!sock_) {
+ std::cout << "[ForwarderInterface::tryToCreateFace] socket error"
+ << std::endl;
+ goto ERR_SOCK;
+ }
+
+ // get listeners list
+ hc_data_t *data;
+ if (hc_listener_list(sock_, &data) < 0) {
+ std::cout << "[ForwarderInterface::tryToCreateFace] cannot list listeners";
+ goto ERR_LIST;
+ }
+
+ char _local_address[128];
+ foreach_listener(l, data) {
+ std::cout << "Processing " << l->interface_name << std::endl;
+ std::string interface = std::string(l->interface_name);
+ int ret = ip_address_ntop(&l->local_addr, _local_address, 128, AF_INET);
+ if (ret < 0) {
+ std::cerr << "Error in ip_address_ntop" << std::endl;
+ goto ERR;
+ }
+
+ std::string local_address = std::string(_local_address);
+ uint16_t local_port = l->local_port;
+
+ if (interface.compare(route_info->interface) == 0 &&
+ local_address.compare(route_info->local_addr) == 0 &&
+ local_port == route_info->local_port) {
+ found = true;
+ break;
+ }
+ }
+
+ std::cout << route_info->remote_addr << std::endl;
+
+ ip_address_t local_address, remote_address;
+ ip_address_pton(route_info->local_addr.c_str(), &local_address);
+ ip_address_pton(route_info->remote_addr.c_str(), &remote_address);
+
+ if (!found) {
+ // Create listener
+ hc_listener_t listener;
+ memset(&listener, 0, sizeof(hc_listener_t));
+
+ std::string name = "l_" + route_info->name;
+ listener.local_addr = local_address;
+ listener.type = CONNECTION_TYPE_UDP;
+ listener.family = AF_INET;
+ listener.local_port = route_info->local_port;
+ strncpy(listener.name, name.c_str(), sizeof(listener.name));
+ strncpy(listener.interface_name, route_info->interface.c_str(),
+ sizeof(listener.interface_name));
+
+ std::cout << "------------> " << route_info->interface << std::endl;
+
+ int ret = hc_listener_create(sock_, &listener);
+
+ if (ret < 0) {
+ std::cerr << "Error creating listener." << std::endl;
+ return -1;
+ } else {
+ std::cout << "Listener " << listener.id << " created." << std::endl;
+ }
+ }
+
+ // Create face
+ hc_face_t face;
+ memset(&face, 0, sizeof(hc_face_t));
+
+ // crate face with the local interest
+ face.face.type = FACE_TYPE_UDP;
+ face.face.family = route_info->family;
+ face.face.local_addr = local_address;
+ face.face.remote_addr = remote_address;
+ face.face.local_port = route_info->local_port;
+ face.face.remote_port = route_info->remote_port;
+
+ if (netdevice_set_name(&face.face.netdevice, route_info->interface.c_str()) <
+ 0) {
+ std::cout << "[ForwarderInterface::tryToCreateFaceAndRoute] "
+ "netdevice_set_name "
+ "("
+ << face.face.netdevice.name << ", "
+ << route_info->interface << ") error" << std::endl;
+ goto ERR;
+ }
+
+ // create face
+ if (hc_face_create(sock_, &face) < 0) {
+ std::cout << "[ForwarderInterface::tryToCreateFace] error creating face";
+ goto ERR;
+ }
+
+ std::cout << "Face created successfully" << std::endl;
+
+ // assing face to the return value
+ *face_id = face.id;
+
+ hc_data_free(data);
+ return 0;
+
+ERR:
+ hc_data_free(data);
+ERR_LIST:
+ERR_SOCK:
+ return -1;
+}
+
+int ForwarderInterface::tryToCreateRoute(RouteInfo *route_info,
+ uint32_t face_id) {
+ std::cout << "Trying to create route" << std::endl;
+
+ // check connection with the forwarder
+ if (!sock_) {
+ std::cout << "[ForwarderInterface::tryToCreateRoute] socket error";
+ return -1;
+ }
+
+ ip_address_t route_ip;
+ hc_route_t route;
+
+ if (ip_address_pton(route_info->route_addr.c_str(), &route_ip) < 0) {
+ std::cout << "[ForwarderInterface::tryToCreateRoute] ip_address_pton error";
+ return -1;
+ }
+
+ route.face_id = face_id;
+ route.family = AF_INET6;
+ route.remote_addr = route_ip;
+ route.len = route_info->route_len;
+ route.cost = 1;
+
+ if (hc_route_create(sock_, &route) < 0) {
+ std::cout << "[ForwarderInterface::tryToCreateRoute] error creating route";
+ return -1;
+ }
+
+ std::cout << "[ForwarderInterface::tryToCreateRoute] OK" << std::endl;
+ return 0;
+}
+
+#if 0 // not used
+void ForwarderInterface::checkRoutesLoop() {
+ check_routes_timer_->expires_from_now(std::chrono::milliseconds(1000));
+ check_routes_timer_->async_wait([this](std::error_code ec) {
+ if (ec) return;
+ if (pending_add_route_counter_ == 0) checkRoutes();
+ });
+}
+
+void ForwarderInterface::checkRoutes() {
+ std::cout << "someone called the checkRoutes function" << std::endl;
+ if (!sock_) return;
+
+ hc_data_t *data;
+ if (hc_route_list(sock_, &data) < 0) {
+ return;
+ }
+
+ std::unordered_set<std::string> routes_set;
+ foreach_route(r, data) {
+ char remote_addr[INET6_ADDRSTRLEN];
+ int ret = ip_address_ntop(&r->remote_addr, remote_addr, r->len, r->family);
+ if (ret < 0) continue;
+ std::string route(std::string(remote_addr) + "/" + std::to_string(r->len));
+ routes_set.insert(route);
+ }
+
+ for (auto it = route_status_.begin(); it != route_status_.end(); it++) {
+ std::string route(it->second->route_addr + "/" +
+ std::to_string(it->second->route_len));
+ if (routes_set.find(route) == routes_set.end()) {
+ // the route is missing
+ createFaceAndRoute(it->second, it->first);
+ break;
+ }
+ }
+
+ hc_data_free(data);
+}
+#endif
+
+#if 0
+ using ListenerRetrievedCallback =
+ std::function<void(std::error_code, uint32_t)>;
+
+ ListenerRetrievedCallback listener_retrieved_callback_;
+
+#ifdef __ANDROID__
+ hicn_listen_port_(9695),
+#else
+ hicn_listen_port_(0),
+#endif
+ timer_(forward_engine_.getIoService()),
+
+ void initConfigurationProtocol(void)
+ {
+ // We need the configuration, which is different for every protocol...
+ // so we move this step down towards the protocol implementation itself.
+ if (!permanent_hicn) {
+ doInitConfigurationProtocol();
+ } else {
+ // XXX This should be moved somewhere else
+ getMainListener(
+ [this](std::error_code ec, uint32_t hicn_listen_port) {
+ if (!ec)
+ {
+ hicn_listen_port_ = hicn_listen_port;
+ doInitConfigurationProtocol();
+ }
+ });
+ }
+ }
+
+ template <typename Callback>
+ void getMainListener(Callback &&callback)
+ {
+ listener_retrieved_callback_ = std::forward<Callback &&>(callback);
+ tryToConnectToForwarder();
+ }
+ private:
+ void doGetMainListener(std::error_code ec)
+ {
+ if (!ec)
+ {
+ // ec == 0 --> timer expired
+ int ret = forwarder_interface_.getMainListenerPort();
+ if (ret <= 0)
+ {
+ // Since without the main listener of the forwarder the proxy cannot
+ // work, we can stop the program here until we get the listener port.
+ std::cout <<
+ "Could not retrieve main listener port from the forwarder. "
+ "Retrying.";
+
+ timer_.expires_from_now(std::chrono::milliseconds(RETRY_INTERVAL));
+ timer_.async_wait(std::bind(&Protocol::doGetMainListener, this,
+ std::placeholders::_1));
+ }
+ else
+ {
+ timer_.cancel();
+ retx_count_ = 0;
+ hicn_listen_port_ = uint16_t(ret);
+ listener_retrieved_callback_(
+ make_error_code(configuration_error::success), hicn_listen_port_);
+ }
+ }
+ else
+ {
+ std::cout << "Timer for retrieving main hicn listener canceled." << std::endl;
+ }
+ }
+
+ void tryToConnectToForwarder()
+ {
+ doTryToConnectToForwarder(std::make_error_code(std::errc(0)));
+ }
+
+ void doTryToConnectToForwarder(std::error_code ec)
+ {
+ if (!ec)
+ {
+ // ec == 0 --> timer expired
+ int ret = forwarder_interface_.connect();
+ if (ret < 0)
+ {
+ // We were not able to connect to the local forwarder. Do not give up
+ // and retry.
+ std::cout << "Could not connect to local forwarder. Retrying." << std::endl;
+
+ timer_.expires_from_now(std::chrono::milliseconds(RETRY_INTERVAL));
+ timer_.async_wait(std::bind(&Protocol::doTryToConnectToForwarder, this,
+ std::placeholders::_1));
+ }
+ else
+ {
+ timer_.cancel();
+ retx_count_ = 0;
+ doGetMainListener(std::make_error_code(std::errc(0)));
+ }
+ }
+ else
+ {
+ std::cout << "Timer for re-trying forwarder connection canceled." << std::endl;
+ }
+ }
+
+
+ template <typename ProtocolImplementation>
+ constexpr uint32_t Protocol<ProtocolImplementation>::RETRY_INTERVAL;
+
+#endif
+
+constexpr uint32_t ForwarderInterface::REATTEMPT_DELAY_MS;
+constexpr uint32_t ForwarderInterface::MAX_REATTEMPT;
+
+} // namespace hiperf \ No newline at end of file
diff --git a/apps/hiperf/src/forwarder_interface.h b/apps/hiperf/src/forwarder_interface.h
new file mode 100644
index 000000000..7591ea257
--- /dev/null
+++ b/apps/hiperf/src/forwarder_interface.h
@@ -0,0 +1,131 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+extern "C" {
+#ifndef WITH_POLICY
+#define WITH_POLICY
+#endif
+#include <hicn/ctrl/api.h>
+#include <hicn/util/ip_address.h>
+}
+
+#ifndef ASIO_STANDALONE
+#define ASIO_STANDALONE
+#endif
+#include <asio.hpp>
+
+#include <functional>
+#include <thread>
+#include <unordered_map>
+
+namespace hiperf {
+
+class ForwarderInterface {
+ static const uint32_t REATTEMPT_DELAY_MS = 500;
+ static const uint32_t MAX_REATTEMPT = 10;
+
+ public:
+ struct RouteInfo {
+ int family;
+ std::string local_addr;
+ uint16_t local_port;
+ std::string remote_addr;
+ uint16_t remote_port;
+ std::string route_addr;
+ uint8_t route_len;
+ std::string interface;
+ std::string name;
+ };
+
+ using RouteInfoPtr = std::shared_ptr<RouteInfo>;
+
+ class ICallback {
+ public:
+ virtual void onHicnServiceReady() = 0;
+ virtual void onRouteConfigured(std::vector<RouteInfoPtr> &route_info) = 0;
+ };
+
+ enum class State {
+ Disabled, /* Stack is stopped */
+ Requested, /* Stack is starting */
+ Available, /* Forwarder is running */
+ Connected, /* Control socket connected */
+ Ready, /* Listener present */
+ };
+
+ public:
+ ForwarderInterface(asio::io_service &io_service, ICallback *callback);
+
+ ~ForwarderInterface();
+
+ State getState();
+
+ void setState(State state);
+
+ void onHicnServiceAvailable(bool flag);
+
+ void enableCheckRoutesTimer();
+
+ void createFaceAndRoutes(const std::vector<RouteInfoPtr> &routes_info);
+
+ void createFaceAndRoute(const RouteInfoPtr &route_info);
+
+ void deleteFaceAndRoutes(const std::vector<RouteInfoPtr> &routes_info);
+
+ void deleteFaceAndRoute(const RouteInfoPtr &route_info);
+
+ void close();
+
+ uint16_t getHicnListenerPort() { return hicn_listen_port_; }
+
+ private:
+ int connectToForwarder();
+
+ int checkListener();
+
+ void internalCreateFaceAndRoutes(const std::vector<RouteInfoPtr> &route_info,
+ uint8_t max_try, asio::steady_timer *timer);
+
+ void internalDeleteFaceAndRoute(const RouteInfoPtr &routes_info);
+
+ int tryToCreateFace(RouteInfo *RouteInfo, uint32_t *face_id);
+ int tryToCreateRoute(RouteInfo *RouteInfo, uint32_t face_id);
+
+ void checkRoutesLoop();
+
+ void checkRoutes();
+
+ asio::io_service &external_ioservice_;
+ asio::io_service internal_ioservice_;
+ ICallback *forwarder_interface_callback_;
+ std::unique_ptr<asio::io_service::work> work_;
+ hc_sock_t *sock_;
+ std::unique_ptr<std::thread> thread_;
+ // SetRouteCallback set_route_callback_;
+ // std::unordered_multimap<ProtocolPtr, RouteInfoPtr> route_status_;
+ std::unique_ptr<asio::steady_timer> check_routes_timer_;
+ uint32_t pending_add_route_counter_;
+ uint16_t hicn_listen_port_;
+
+ State state_;
+
+ /* Reattempt timer */
+ asio::steady_timer timer_;
+ unsigned num_reattempts;
+};
+
+} // namespace hiperf
diff --git a/apps/hiperf/src/main.cc b/apps/hiperf/src/main.cc
new file mode 100644
index 000000000..b2d99c4a4
--- /dev/null
+++ b/apps/hiperf/src/main.cc
@@ -0,0 +1,456 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <client.h>
+#include <server.h>
+#include <forwarder_interface.h>
+
+namespace hiperf {
+
+void usage() {
+ std::cerr << "HIPERF - A tool for performing network throughput "
+ "measurements with hICN"
+ << std::endl;
+ std::cerr << "usage: hiperf [-S|-C] [options] [prefix|name]" << std::endl;
+ std::cerr << std::endl;
+ std::cerr << "SERVER OR CLIENT:" << std::endl;
+#ifndef _WIN32
+ std::cerr << "-D\t\t\t\t\t"
+ << "Run as a daemon" << std::endl;
+ std::cerr << "-R\t\t\t\t\t"
+ << "Run RTC protocol (client or server)" << std::endl;
+ std::cerr << "-f\t<filename>\t\t\t"
+ << "Log file" << std::endl;
+ std::cerr << "-z\t<io_module>\t\t\t"
+ << "IO module to use. Default: hicnlight_module" << std::endl;
+#endif
+ std::cerr << std::endl;
+ std::cerr << "SERVER SPECIFIC:" << std::endl;
+ std::cerr << "-A\t<content_size>\t\t\t"
+ "Size of the content to publish. This "
+ "is not the size of the packet (see -s for it)."
+ << std::endl;
+ std::cerr << "-s\t<packet_size>\t\t\tSize of the payload of each data packet."
+ << std::endl;
+ std::cerr << "-r\t\t\t\t\t"
+ << "Produce real content of <content_size> bytes" << std::endl;
+ std::cerr << "-m\t\t\t\t\t"
+ << "Produce transport manifest" << std::endl;
+ std::cerr << "-l\t\t\t\t\t"
+ << "Start producing content upon the reception of the "
+ "first interest"
+ << std::endl;
+ std::cerr << "-K\t<keystore_path>\t\t\t"
+ << "Path of p12 file containing the "
+ "crypto material used for signing packets"
+ << std::endl;
+ std::cerr << "-k\t<passphrase>\t\t\t"
+ << "String from which a 128-bit symmetric key will be "
+ "derived for signing packets"
+ << std::endl;
+ std::cerr << "-y\t<hash_algorithm>\t\t"
+ << "Use the selected hash algorithm for "
+ "calculating manifest digests"
+ << std::endl;
+ std::cerr << "-p\t<password>\t\t\t"
+ << "Password for p12 keystore" << std::endl;
+ std::cerr << "-x\t\t\t\t\t"
+ << "Produce a content of <content_size>, then after downloading "
+ "it produce a new content of"
+ << "\n\t\t\t\t\t<content_size> without resetting "
+ "the suffix to 0."
+ << std::endl;
+ std::cerr << "-B\t<bitrate>\t\t\t"
+ << "Bitrate for RTC producer, to be used with the -R option."
+ << std::endl;
+#ifndef _WIN32
+ std::cerr << "-I\t\t\t\t\t"
+ "Interactive mode, start/stop real time content production "
+ "by pressing return. To be used with the -R option"
+ << std::endl;
+ std::cerr
+ << "-T\t<filename>\t\t\t"
+ "Trace based mode, hiperf takes as input a file with a trace. "
+ "Each line of the file indicates the timestamp and the size of "
+ "the packet to generate. To be used with the -R option. -B and -I "
+ "will be ignored."
+ << std::endl;
+ std::cerr << "-E\t\t\t\t\t"
+ << "Enable encrypted communication. Requires the path to a p12 "
+ "file containing the "
+ "crypto material used for the TLS handshake"
+ << std::endl;
+ std::cerr << "-G\t<port>\t\t\t"
+ << "input stream from localhost at the specified port" << std::endl;
+#endif
+ std::cerr << std::endl;
+ std::cerr << "CLIENT SPECIFIC:" << std::endl;
+ std::cerr << "-b\t<beta_parameter>\t\t"
+ << "RAAQM beta parameter" << std::endl;
+ std::cerr << "-d\t<drop_factor_parameter>\t\t"
+ << "RAAQM drop factor "
+ "parameter"
+ << std::endl;
+ std::cerr << "-L\t<interest lifetime>\t\t"
+ << "Set interest lifetime." << std::endl;
+ std::cerr << "-M\t<input_buffer_size>\t\t"
+ << "Size of consumer input buffer. If 0, reassembly of packets "
+ "will be disabled."
+ << std::endl;
+ std::cerr << "-W\t<window_size>\t\t\t"
+ << "Use a fixed congestion window "
+ "for retrieving the data."
+ << std::endl;
+ std::cerr << "-i\t<stats_interval>\t\t"
+ << "Show the statistics every <stats_interval> milliseconds."
+ << std::endl;
+ std::cerr << "-c\t<certificate_path>\t\t"
+ << "Path of the producer certificate to be used for verifying the "
+ "origin of the packets received."
+ << std::endl;
+ std::cerr << "-k\t<passphrase>\t\t\t"
+ << "String from which is derived the symmetric key used by the "
+ "producer to sign packets and by the consumer to verify them."
+ << std::endl;
+ std::cerr << "-t\t\t\t\t\t"
+ "Test mode, check if the client is receiving the "
+ "correct data. This is an RTC specific option, to be "
+ "used with the -R (default false)"
+ << std::endl;
+ std::cerr << "-P\t\t\t\t\t"
+ << "Prefix of the producer where to do the handshake" << std::endl;
+ std::cerr << "-j\t<relay_name>\t\t\t"
+ << "Publish the received content under the name relay_name."
+ "This is an RTC specific option, to be "
+ "used with the -R (default false)"
+ << std::endl;
+ std::cerr << "-g\t<port>\t\t\t"
+ << "output stream to localhost at the specified port" << std::endl;
+}
+
+int main(int argc, char *argv[]) {
+#ifndef _WIN32
+ // Common
+ bool daemon = false;
+#else
+ WSADATA wsaData = {0};
+ WSAStartup(MAKEWORD(2, 2), &wsaData);
+#endif
+
+ // -1 server, 0 undefined, 1 client
+ int role = 0;
+ int options = 0;
+
+ char *log_file = nullptr;
+ transport::interface::global_config::IoModuleConfiguration config;
+ std::string conf_file;
+ config.name = "hicnlight_module";
+
+ // Consumer
+ ClientConfiguration client_configuration;
+
+ // Producer
+ ServerConfiguration server_configuration;
+
+ int opt;
+#ifndef _WIN32
+ while ((opt = getopt(
+ argc, argv,
+ "DSCf:b:d:W:RM:c:vA:s:rmlK:k:y:p:hi:xE:P:B:ItL:z:T:F:j:g:G:")) !=
+ -1) {
+ switch (opt) {
+ // Common
+ case 'D': {
+ daemon = true;
+ break;
+ }
+ case 'I': {
+ server_configuration.interactive_ = true;
+ server_configuration.trace_based_ = false;
+ server_configuration.input_stream_mode_ = false;
+ break;
+ }
+ case 'T': {
+ server_configuration.interactive_ = false;
+ server_configuration.trace_based_ = true;
+ server_configuration.input_stream_mode_ = false;
+ server_configuration.trace_file_ = optarg;
+ break;
+ }
+ case 'G': {
+ server_configuration.interactive_ = false;
+ server_configuration.trace_based_ = false;
+ server_configuration.input_stream_mode_ = true;
+ server_configuration.port_ = std::stoul(optarg);
+ break;
+ }
+ case 'g': {
+ client_configuration.output_stream_mode_ = true;
+ client_configuration.port_ = std::stoul(optarg);
+ break;
+ }
+#else
+ while ((opt = getopt(argc, argv,
+ "SCf:b:d:W:RM:c:vA:s:rmlK:k:y:p:hi:xB:E:P:tL:z:F:j:")) !=
+ -1) {
+ switch (opt) {
+#endif
+ case 'f': {
+ log_file = optarg;
+ break;
+ }
+ case 'R': {
+ client_configuration.rtc_ = true;
+ server_configuration.rtc_ = true;
+ break;
+ }
+ case 'z': {
+ config.name = optarg;
+ break;
+ }
+ case 'F': {
+ conf_file = optarg;
+ break;
+ }
+
+ // Server or Client
+ case 'S': {
+ role -= 1;
+ break;
+ }
+ case 'C': {
+ role += 1;
+ break;
+ }
+ case 'k': {
+ server_configuration.passphrase = std::string(optarg);
+ client_configuration.passphrase = std::string(optarg);
+ break;
+ }
+
+ // Client specifc
+ case 'b': {
+ client_configuration.beta = std::stod(optarg);
+ options = 1;
+ break;
+ }
+ case 'd': {
+ client_configuration.drop_factor = std::stod(optarg);
+ options = 1;
+ break;
+ }
+ case 'W': {
+ client_configuration.window = std::stod(optarg);
+ options = 1;
+ break;
+ }
+ case 'M': {
+ client_configuration.receive_buffer_size_ = std::stoull(optarg);
+ options = 1;
+ break;
+ }
+ case 'P': {
+ client_configuration.producer_prefix_ = Prefix(optarg);
+ client_configuration.secure_ = true;
+ break;
+ }
+ case 'c': {
+ client_configuration.producer_certificate = std::string(optarg);
+ options = 1;
+ break;
+ }
+ case 'i': {
+ client_configuration.report_interval_milliseconds_ = std::stoul(optarg);
+ options = 1;
+ break;
+ }
+ case 't': {
+ client_configuration.test_mode_ = true;
+ options = 1;
+ break;
+ }
+ case 'L': {
+ client_configuration.interest_lifetime_ = std::stoul(optarg);
+ options = 1;
+ break;
+ }
+ case 'j': {
+ client_configuration.relay_ = true;
+ client_configuration.relay_name_ = Prefix(optarg);
+ options = 1;
+ break;
+ }
+ // Server specific
+ case 'A': {
+ server_configuration.download_size = std::stoul(optarg);
+ options = -1;
+ break;
+ }
+ case 's': {
+ server_configuration.payload_size_ = std::stoul(optarg);
+ options = -1;
+ break;
+ }
+ case 'r': {
+ server_configuration.virtual_producer = false;
+ options = -1;
+ break;
+ }
+ case 'm': {
+ server_configuration.manifest = true;
+ options = -1;
+ break;
+ }
+ case 'l': {
+ server_configuration.live_production = true;
+ options = -1;
+ break;
+ }
+ case 'K': {
+ server_configuration.keystore_name = std::string(optarg);
+ options = -1;
+ break;
+ }
+ case 'y': {
+ if (strncasecmp(optarg, "sha256", 6) == 0) {
+ server_configuration.hash_algorithm = CryptoHashType::SHA256;
+ } else if (strncasecmp(optarg, "sha512", 6) == 0) {
+ server_configuration.hash_algorithm = CryptoHashType::SHA512;
+ } else if (strncasecmp(optarg, "blake2b512", 10) == 0) {
+ server_configuration.hash_algorithm = CryptoHashType::BLAKE2B512;
+ } else if (strncasecmp(optarg, "blake2s256", 10) == 0) {
+ server_configuration.hash_algorithm = CryptoHashType::BLAKE2S256;
+ } else {
+ std::cerr << "Ignored unknown hash algorithm. Using SHA 256."
+ << std::endl;
+ }
+ options = -1;
+ break;
+ }
+ case 'p': {
+ server_configuration.keystore_password = std::string(optarg);
+ options = -1;
+ break;
+ }
+ case 'x': {
+ server_configuration.multiphase_produce_ = true;
+ options = -1;
+ break;
+ }
+ case 'B': {
+ auto str = std::string(optarg);
+ std::transform(str.begin(), str.end(), str.begin(), ::tolower);
+ server_configuration.production_rate_ = str;
+ options = -1;
+ break;
+ }
+ case 'E': {
+ server_configuration.keystore_name = std::string(optarg);
+ server_configuration.secure_ = true;
+ break;
+ }
+ case 'h':
+ default:
+ usage();
+ return EXIT_FAILURE;
+ }
+ }
+
+ if (options > 0 && role < 0) {
+ std::cerr << "Client options cannot be used when using the "
+ "software in server mode"
+ << std::endl;
+ usage();
+ return EXIT_FAILURE;
+ } else if (options < 0 && role > 0) {
+ std::cerr << "Server options cannot be used when using the "
+ "software in client mode"
+ << std::endl;
+ usage();
+ return EXIT_FAILURE;
+ } else if (!role) {
+ std::cerr << "Please specify if running hiperf as client "
+ "or server."
+ << std::endl;
+ usage();
+ return EXIT_FAILURE;
+ }
+
+ if (argv[optind] == 0) {
+ std::cerr << "Please specify the name/prefix to use." << std::endl;
+ usage();
+ return EXIT_FAILURE;
+ } else {
+ if (role > 0) {
+ client_configuration.name = Name(argv[optind]);
+ } else {
+ server_configuration.name = Prefix(argv[optind]);
+ }
+ }
+
+ if (log_file) {
+#ifndef _WIN32
+ int fd = open(log_file, O_WRONLY | O_APPEND | O_CREAT, S_IWUSR | S_IRUSR);
+ dup2(fd, STDOUT_FILENO);
+ dup2(STDOUT_FILENO, STDERR_FILENO);
+ close(fd);
+#else
+ int fd =
+ _open(log_file, _O_WRONLY | _O_APPEND | _O_CREAT, _S_IWRITE | _S_IREAD);
+ _dup2(fd, _fileno(stdout));
+ _dup2(_fileno(stdout), _fileno(stderr));
+ _close(fd);
+#endif
+ }
+
+#ifndef _WIN32
+ if (daemon) {
+ utils::Daemonizator::daemonize(false);
+ }
+#endif
+
+ /**
+ * IO module configuration
+ */
+ config.set();
+
+ // Parse config file
+ transport::interface::global_config::parseConfigurationFile(conf_file);
+
+ if (role > 0) {
+ HIperfClient c(client_configuration);
+ if (c.setup() != ERROR_SETUP) {
+ c.run();
+ }
+ } else if (role < 0) {
+ HIperfServer s(server_configuration);
+ if (s.setup() != ERROR_SETUP) {
+ s.run();
+ }
+ } else {
+ usage();
+ return EXIT_FAILURE;
+ }
+
+#ifdef _WIN32
+ WSACleanup();
+#endif
+
+ return 0;
+}
+
+} // namespace hiperf
+
+int main(int argc, char *argv[]) { return hiperf::main(argc, argv); }
diff --git a/apps/hiperf/src/server.cc b/apps/hiperf/src/server.cc
new file mode 100644
index 000000000..968d42e2c
--- /dev/null
+++ b/apps/hiperf/src/server.cc
@@ -0,0 +1,516 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <server.h>
+
+namespace hiperf {
+
+/**
+ * Hiperf server class: configure and setup an hicn producer following the
+ * ServerConfiguration.
+ */
+class HIperfServer::Impl {
+ const std::size_t log2_content_object_buffer_size = 8;
+
+ public:
+ Impl(const hiperf::ServerConfiguration &conf)
+ : configuration_(conf),
+ signals_(io_service_),
+ rtc_timer_(io_service_),
+ unsatisfied_interests_(),
+ content_objects_((std::uint16_t)(1 << log2_content_object_buffer_size)),
+ content_objects_index_(0),
+ mask_((std::uint16_t)(1 << log2_content_object_buffer_size) - 1),
+ last_segment_(0),
+#ifndef _WIN32
+ ptr_last_segment_(&last_segment_),
+ input_(io_service_),
+ rtc_running_(false),
+#else
+ ptr_last_segment_(&last_segment_),
+#endif
+ flow_name_(configuration_.name.getName()),
+ socket_(io_service_),
+ recv_buffer_(nullptr, 0) {
+ std::string buffer(configuration_.payload_size_, 'X');
+ std::cout << "Producing contents under name " << conf.name.getName()
+ << std::endl;
+#ifndef _WIN32
+ if (configuration_.interactive_) {
+ input_.assign(::dup(STDIN_FILENO));
+ }
+#endif
+
+ for (int i = 0; i < (1 << log2_content_object_buffer_size); i++) {
+ content_objects_[i] = ContentObject::Ptr(
+ new ContentObject(conf.name.getName(), HF_INET6_TCP, 0,
+ (const uint8_t *)buffer.data(), buffer.size()));
+ content_objects_[i]->setLifetime(
+ default_values::content_object_expiry_time);
+ }
+ }
+
+ void virtualProcessInterest(ProducerSocket &p, const Interest &interest) {
+ content_objects_[content_objects_index_ & mask_]->setName(
+ interest.getName());
+ producer_socket_->produce(
+ *content_objects_[content_objects_index_++ & mask_]);
+ }
+
+ void processInterest(ProducerSocket &p, const Interest &interest) {
+ p.setSocketOption(ProducerCallbacksOptions::CACHE_MISS,
+ (ProducerInterestCallback)VOID_HANDLER);
+ p.setSocketOption(GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME,
+ 5000000_U32);
+
+ produceContent(p, interest.getName(), interest.getName().getSuffix());
+ std::cout << "Received interest " << interest.getName().getSuffix()
+ << std::endl;
+ }
+
+ void asyncProcessInterest(ProducerSocket &p, const Interest &interest) {
+ p.setSocketOption(ProducerCallbacksOptions::CACHE_MISS,
+ (ProducerInterestCallback)bind(&Impl::cacheMiss, this,
+ std::placeholders::_1,
+ std::placeholders::_2));
+ p.setSocketOption(GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME,
+ 5000000_U32);
+ uint32_t suffix = interest.getName().getSuffix();
+
+ if (suffix == 0) {
+ last_segment_ = 0;
+ ptr_last_segment_ = &last_segment_;
+ unsatisfied_interests_.clear();
+ }
+
+ // The suffix will either be the one from the received interest or the
+ // smallest suffix of a previous interest not satisfed
+ if (!unsatisfied_interests_.empty()) {
+ auto it =
+ std::lower_bound(unsatisfied_interests_.begin(),
+ unsatisfied_interests_.end(), *ptr_last_segment_);
+ if (it != unsatisfied_interests_.end()) {
+ suffix = *it;
+ }
+ unsatisfied_interests_.erase(unsatisfied_interests_.begin(), it);
+ }
+
+ std::cout << "Received interest " << interest.getName().getSuffix()
+ << ", starting production at " << suffix << std::endl;
+ std::cout << unsatisfied_interests_.size() << " interests still unsatisfied"
+ << std::endl;
+ produceContentAsync(p, interest.getName(), suffix);
+ }
+
+ void produceContent(ProducerSocket &p, const Name &content_name,
+ uint32_t suffix) {
+ auto b = utils::MemBuf::create(configuration_.download_size);
+ std::memset(b->writableData(), '?', configuration_.download_size);
+ b->append(configuration_.download_size);
+ uint32_t total;
+
+ utils::TimePoint t0 = utils::SteadyClock::now();
+ total = p.produceStream(content_name, std::move(b),
+ !configuration_.multiphase_produce_, suffix);
+ utils::TimePoint t1 = utils::SteadyClock::now();
+
+ std::cout
+ << "Written " << total
+ << " data packets in output buffer (Segmentation time: "
+ << std::chrono::duration_cast<utils::Microseconds>(t1 - t0).count()
+ << " us)" << std::endl;
+ }
+
+ void produceContentAsync(ProducerSocket &p, Name content_name,
+ uint32_t suffix) {
+ auto b = utils::MemBuf::create(configuration_.download_size);
+ std::memset(b->writableData(), '?', configuration_.download_size);
+ b->append(configuration_.download_size);
+
+ p.asyncProduce(content_name, std::move(b),
+ !configuration_.multiphase_produce_, suffix,
+ &ptr_last_segment_);
+ }
+
+ void cacheMiss(ProducerSocket &p, const Interest &interest) {
+ unsatisfied_interests_.push_back(interest.getName().getSuffix());
+ }
+
+ void onContentProduced(ProducerSocket &p, const std::error_code &err,
+ uint64_t bytes_written) {
+ p.setSocketOption(ProducerCallbacksOptions::CACHE_MISS,
+ (ProducerInterestCallback)bind(
+ &Impl::asyncProcessInterest, this,
+ std::placeholders::_1, std::placeholders::_2));
+ }
+
+ std::shared_ptr<Identity> getProducerIdentity(std::string &keystore_path,
+ std::string &keystore_pwd,
+ CryptoHashType &hash_type) {
+ if (access(keystore_path.c_str(), F_OK) != -1) {
+ return std::make_shared<Identity>(keystore_path, keystore_pwd, hash_type);
+ }
+ return std::make_shared<Identity>(keystore_path, keystore_pwd,
+ CryptoSuite::RSA_SHA256, 1024, 365,
+ "producer-test");
+ }
+
+ int setup() {
+ int ret;
+ int production_protocol;
+
+ if (configuration_.secure_) {
+ auto identity = getProducerIdentity(configuration_.keystore_name,
+ configuration_.keystore_password,
+ configuration_.hash_algorithm);
+ producer_socket_ = std::make_unique<P2PSecureProducerSocket>(
+ configuration_.rtc_, identity);
+ } else {
+ if (!configuration_.rtc_) {
+ production_protocol = ProductionProtocolAlgorithms::BYTE_STREAM;
+ } else {
+ production_protocol = ProductionProtocolAlgorithms::RTC_PROD;
+ }
+
+ producer_socket_ = std::make_unique<ProducerSocket>(production_protocol);
+ }
+
+ if (producer_socket_->setSocketOption(
+ GeneralTransportOptions::MAKE_MANIFEST, configuration_.manifest) ==
+ SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+
+ if (!configuration_.passphrase.empty()) {
+ std::shared_ptr<Signer> signer = std::make_shared<SymmetricSigner>(
+ CryptoSuite::HMAC_SHA256, configuration_.passphrase);
+ producer_socket_->setSocketOption(GeneralTransportOptions::SIGNER,
+ signer);
+ }
+
+ if (!configuration_.keystore_name.empty()) {
+ auto identity = getProducerIdentity(configuration_.keystore_name,
+ configuration_.keystore_password,
+ configuration_.hash_algorithm);
+ std::shared_ptr<Signer> signer = identity->getSigner();
+ producer_socket_->setSocketOption(GeneralTransportOptions::SIGNER,
+ signer);
+ }
+
+ uint32_t rtc_header_size = 0;
+ if (configuration_.rtc_) rtc_header_size = 12;
+ producer_socket_->setSocketOption(
+ GeneralTransportOptions::DATA_PACKET_SIZE,
+ (uint32_t)(
+ configuration_.payload_size_ + rtc_header_size +
+ (configuration_.name.getAddressFamily() == AF_INET ? 40 : 60)));
+ producer_socket_->registerPrefix(configuration_.name);
+ producer_socket_->connect();
+
+ if (configuration_.rtc_) {
+ std::cout << "Running RTC producer: the prefix length will be ignored."
+ " Use /128 by default in RTC mode"
+ << std::endl;
+ return ERROR_SUCCESS;
+ }
+
+ if (!configuration_.virtual_producer) {
+ if (producer_socket_->setSocketOption(
+ GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME,
+ configuration_.content_lifetime) == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+
+ if (producer_socket_->setSocketOption(
+ GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 200000U) ==
+ SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+
+ if (!configuration_.live_production) {
+ produceContent(*producer_socket_, configuration_.name.getName(), 0);
+ } else {
+ ret = producer_socket_->setSocketOption(
+ ProducerCallbacksOptions::CACHE_MISS,
+ (ProducerInterestCallback)bind(&Impl::asyncProcessInterest, this,
+ std::placeholders::_1,
+ std::placeholders::_2));
+
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+ }
+ } else {
+ ret = producer_socket_->setSocketOption(
+ GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 0U);
+
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+
+ ret = producer_socket_->setSocketOption(
+ ProducerCallbacksOptions::CACHE_MISS,
+ (ProducerInterestCallback)bind(&Impl::virtualProcessInterest, this,
+ std::placeholders::_1,
+ std::placeholders::_2));
+
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+ }
+
+ ret = producer_socket_->setSocketOption(
+ ProducerCallbacksOptions::CONTENT_PRODUCED,
+ (ProducerContentCallback)bind(
+ &Impl::onContentProduced, this, std::placeholders::_1,
+ std::placeholders::_2, std::placeholders::_3));
+
+ return ERROR_SUCCESS;
+ }
+
+ void receiveStream() {
+ socket_.async_receive_from(
+ asio::buffer(recv_buffer_.first, recv_buffer_.second), remote_,
+ [this](std::error_code ec, std::size_t length) {
+ if (ec) return;
+ sendRTCContentFromStream(recv_buffer_.first, length);
+ receiveStream();
+ });
+ }
+
+ void sendRTCContentFromStream(uint8_t *buff, std::size_t len) {
+ auto payload =
+ content_objects_[content_objects_index_++ & mask_]->getPayload();
+ // this is used to compute the data packet delay
+ // Used only for performance evaluation
+ // It requires clock synchronization between producer and consumer
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::system_clock::now().time_since_epoch())
+ .count();
+ uint8_t *start = (uint8_t *)payload->writableData();
+ std::memcpy(start, &now, sizeof(uint64_t));
+ std::memcpy(start + sizeof(uint64_t), buff, len);
+ producer_socket_->produceDatagram(flow_name_, start,
+ len + sizeof(uint64_t));
+ }
+
+ void sendRTCContentObjectCallback(std::error_code ec) {
+ if (ec) return;
+ rtc_timer_.expires_from_now(
+ configuration_.production_rate_.getMicrosecondsForPacket(
+ configuration_.payload_size_));
+ rtc_timer_.async_wait(std::bind(&Impl::sendRTCContentObjectCallback, this,
+ std::placeholders::_1));
+ auto payload =
+ content_objects_[content_objects_index_++ & mask_]->getPayload();
+
+ // this is used to compute the data packet delay
+ // Used only for performance evaluation
+ // It requires clock synchronization between producer and consumer
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::system_clock::now().time_since_epoch())
+ .count();
+
+ std::memcpy(payload->writableData(), &now, sizeof(uint64_t));
+
+ producer_socket_->produceDatagram(
+ flow_name_, payload->data(),
+ payload->length() < 1400 ? payload->length() : 1400);
+ }
+
+ void sendRTCContentObjectCallbackWithTrace(std::error_code ec) {
+ if (ec) return;
+
+ auto payload =
+ content_objects_[content_objects_index_++ & mask_]->getPayload();
+
+ uint32_t packet_len =
+ configuration_.trace_[configuration_.trace_index_].size;
+
+ // this is used to compute the data packet delay
+ // used only for performance evaluation
+ // it requires clock synchronization between producer and consumer
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::system_clock::now().time_since_epoch())
+ .count();
+
+ std::memcpy(payload->writableData(), &now, sizeof(uint64_t));
+
+ if (packet_len > payload->length()) packet_len = payload->length();
+ if (packet_len > 1400) packet_len = 1400;
+
+ producer_socket_->produceDatagram(flow_name_, payload->data(), packet_len);
+
+ uint32_t next_index = configuration_.trace_index_ + 1;
+ uint64_t schedule_next;
+ if (next_index < configuration_.trace_.size()) {
+ schedule_next =
+ configuration_.trace_[next_index].timestamp -
+ configuration_.trace_[configuration_.trace_index_].timestamp;
+ } else {
+ // here we need to loop, schedule in a random time
+ schedule_next = 1000;
+ }
+
+ configuration_.trace_index_ =
+ (configuration_.trace_index_ + 1) % configuration_.trace_.size();
+ rtc_timer_.expires_from_now(std::chrono::microseconds(schedule_next));
+ rtc_timer_.async_wait(
+ std::bind(&Impl::sendRTCContentObjectCallbackWithTrace, this,
+ std::placeholders::_1));
+ }
+
+#ifndef _WIN32
+ void handleInput(const std::error_code &error, std::size_t length) {
+ if (error) {
+ producer_socket_->stop();
+ io_service_.stop();
+ }
+
+ if (rtc_running_) {
+ std::cout << "stop real time content production" << std::endl;
+ rtc_running_ = false;
+ rtc_timer_.cancel();
+ } else {
+ std::cout << "start real time content production" << std::endl;
+ rtc_running_ = true;
+ rtc_timer_.expires_from_now(
+ configuration_.production_rate_.getMicrosecondsForPacket(
+ configuration_.payload_size_));
+ rtc_timer_.async_wait(std::bind(&Impl::sendRTCContentObjectCallback, this,
+ std::placeholders::_1));
+ }
+
+ input_buffer_.consume(length); // Remove newline from input.
+ asio::async_read_until(
+ input_, input_buffer_, '\n',
+ std::bind(&Impl::handleInput, this, std::placeholders::_1,
+ std::placeholders::_2));
+ }
+#endif
+
+ int parseTraceFile() {
+ std::ifstream trace(configuration_.trace_file_);
+ if (trace.fail()) {
+ return -1;
+ }
+ std::string line;
+ while (std::getline(trace, line)) {
+ std::istringstream iss(line);
+ hiperf::packet_t packet;
+ iss >> packet.timestamp >> packet.size;
+ configuration_.trace_.push_back(packet);
+ }
+ return 0;
+ }
+
+ int run() {
+ std::cerr << "Starting to serve consumers" << std::endl;
+
+ signals_.add(SIGINT);
+ signals_.async_wait([this](const std::error_code &, const int &) {
+ std::cout << "STOPPING!!" << std::endl;
+ producer_socket_->stop();
+ io_service_.stop();
+ });
+
+ if (configuration_.rtc_) {
+#ifndef _WIN32
+ if (configuration_.interactive_) {
+ asio::async_read_until(
+ input_, input_buffer_, '\n',
+ std::bind(&Impl::handleInput, this, std::placeholders::_1,
+ std::placeholders::_2));
+ } else if (configuration_.trace_based_) {
+ std::cout << "trace-based mode enabled" << std::endl;
+ if (configuration_.trace_file_ == nullptr) {
+ std::cout << "cannot find the trace file" << std::endl;
+ return ERROR_SETUP;
+ }
+ if (parseTraceFile() < 0) {
+ std::cout << "cannot parse the trace file" << std::endl;
+ return ERROR_SETUP;
+ }
+ rtc_running_ = true;
+ rtc_timer_.expires_from_now(std::chrono::milliseconds(1));
+ rtc_timer_.async_wait(
+ std::bind(&Impl::sendRTCContentObjectCallbackWithTrace, this,
+ std::placeholders::_1));
+ } else if (configuration_.input_stream_mode_) {
+ rtc_running_ = true;
+ // crate socket
+ remote_ = asio::ip::udp::endpoint(
+ asio::ip::address::from_string("127.0.0.1"), configuration_.port_);
+ socket_.open(asio::ip::udp::v4());
+ socket_.bind(remote_);
+ recv_buffer_.first = (uint8_t *)malloc(1500);
+ recv_buffer_.second = 1500;
+ receiveStream();
+ } else {
+ rtc_running_ = true;
+ rtc_timer_.expires_from_now(
+ configuration_.production_rate_.getMicrosecondsForPacket(
+ configuration_.payload_size_));
+ rtc_timer_.async_wait(std::bind(&Impl::sendRTCContentObjectCallback,
+ this, std::placeholders::_1));
+ }
+#else
+ rtc_timer_.expires_from_now(
+ configuration_.production_rate_.getMicrosecondsForPacket(
+ configuration_.payload_size_));
+ rtc_timer_.async_wait(std::bind(&Impl::sendRTCContentObjectCallback, this,
+ std::placeholders::_1));
+#endif
+ }
+
+ io_service_.run();
+
+ return ERROR_SUCCESS;
+ }
+
+ private:
+ hiperf::ServerConfiguration configuration_;
+ asio::io_service io_service_;
+ asio::signal_set signals_;
+ asio::steady_timer rtc_timer_;
+ std::vector<uint32_t> unsatisfied_interests_;
+ std::vector<std::shared_ptr<ContentObject>> content_objects_;
+ std::uint16_t content_objects_index_;
+ std::uint16_t mask_;
+ std::uint32_t last_segment_;
+ std::uint32_t *ptr_last_segment_;
+ std::unique_ptr<ProducerSocket> producer_socket_;
+#ifndef _WIN32
+ asio::posix::stream_descriptor input_;
+ asio::streambuf input_buffer_;
+ bool rtc_running_;
+ Name flow_name_;
+ asio::ip::udp::socket socket_;
+ asio::ip::udp::endpoint remote_;
+ std::pair<uint8_t *, std::size_t> recv_buffer_;
+#endif
+};
+
+HIperfServer::HIperfServer(const ServerConfiguration &conf) {
+ impl_ = new Impl(conf);
+}
+
+HIperfServer::~HIperfServer() { delete impl_; }
+
+int HIperfServer::setup() { return impl_->setup(); }
+
+void HIperfServer::run() { impl_->run(); }
+
+} // namespace hiperf
diff --git a/apps/hiperf/src/server.h b/apps/hiperf/src/server.h
new file mode 100644
index 000000000..05407a807
--- /dev/null
+++ b/apps/hiperf/src/server.h
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <common.h>
+
+namespace hiperf {
+
+class HIperfServer {
+ public:
+ HIperfServer(const ServerConfiguration &conf);
+ ~HIperfServer();
+ int setup();
+ void run();
+
+ private:
+ class Impl;
+ Impl *impl_;
+};
+
+} // namespace hiperf \ No newline at end of file
diff --git a/apps/http-proxy/CMakeLists.txt b/apps/http-proxy/CMakeLists.txt
index 8c2043c30..66b9c1bab 100644
--- a/apps/http-proxy/CMakeLists.txt
+++ b/apps/http-proxy/CMakeLists.txt
@@ -11,7 +11,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-cmake_minimum_required(VERSION 3.5 FATAL_ERROR)
+cmake_minimum_required(VERSION 3.10 FATAL_ERROR)
set(CMAKE_CXX_STANDARD 14)
# -Wno-c99-designator issue
@@ -56,10 +56,18 @@ list(APPEND COMPILER_DEFINITIONS
-DWITH_POLICY
)
+list(APPEND HTTP_PROXY_LIBRARIES
+ ${LIBTRANSPORT_LIBRARIES}
+ ${LIBHICNCTRL_LIBRARIES}
+ ${LIBHICN_LIBRARIES}
+ ${OPENSSL_LIBRARIES}
+ ${CMAKE_THREAD_LIBS_INIT}
+)
+
build_library(${LIBHTTP_PROXY}
STATIC
SOURCES ${LIB_SOURCE_FILES}
- LINK_LIBRARIES ${LIBRARIES}
+ LINK_LIBRARIES ${HTTP_PROXY_LIBRARIES}
DEPENDS ${DEPENDENCIES}
INSTALL_HEADERS ${LIBPROXY_TO_INSTALL_HEADER_FILES}
INCLUDE_DIRS ${LIBTRANSPORT_INCLUDE_DIRS} ${LIBHICNCTRL_INCLUDE_DIRS} ${LIBPROXY_INCLUDE_DIRS}
diff --git a/apps/http-proxy/includes/hicn/http-proxy/CMakeLists.txt b/apps/http-proxy/includes/hicn/http-proxy/CMakeLists.txt
index 75cbbd64b..5cc80a168 100644
--- a/apps/http-proxy/includes/hicn/http-proxy/CMakeLists.txt
+++ b/apps/http-proxy/includes/hicn/http-proxy/CMakeLists.txt
@@ -11,8 +11,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-cmake_minimum_required(VERSION 3.5 FATAL_ERROR)
-
list(APPEND HEADER_FILES
${CMAKE_CURRENT_SOURCE_DIR}/forwarder_config.h
${CMAKE_CURRENT_SOURCE_DIR}/forwarder_interface.h
diff --git a/apps/http-proxy/includes/hicn/http-proxy/forwarder_config.h b/apps/http-proxy/includes/hicn/http-proxy/forwarder_config.h
index 19c96a9e3..e02b9d9a7 100644
--- a/apps/http-proxy/includes/hicn/http-proxy/forwarder_config.h
+++ b/apps/http-proxy/includes/hicn/http-proxy/forwarder_config.h
@@ -27,7 +27,6 @@
#include "forwarder_interface.h"
-
#define RETRY_INTERVAL 300
namespace transport {
@@ -68,7 +67,8 @@ class ForwarderConfig {
if (ret < 0) {
// We were not able to connect to the local forwarder. Do not give up
// and retry.
- TRANSPORT_LOGE("Could not connect to local forwarder. Retrying.");
+ TRANSPORT_LOG_ERROR
+ << "Could not connect to local forwarder. Retrying.";
timer_.expires_from_now(std::chrono::milliseconds(RETRY_INTERVAL));
timer_.async_wait(std::bind(&ForwarderConfig::doTryToConnectToForwarder,
@@ -79,7 +79,8 @@ class ForwarderConfig {
doGetMainListener(std::make_error_code(std::errc(0)));
}
} else {
- TRANSPORT_LOGD("Timer for re-trying forwarder connection canceled.");
+ TRANSPORT_LOG_ERROR
+ << "Timer for re-trying forwarder connection canceled.";
}
}
@@ -90,9 +91,9 @@ class ForwarderConfig {
if (ret <= 0) {
// Since without the main listener of the forwarder the proxy cannot
// work, we can stop the program here until we get the listener port.
- TRANSPORT_LOGE(
- "Could not retrieve main listener port from the forwarder. "
- "Retrying.");
+ TRANSPORT_LOG_ERROR
+ << "Could not retrieve main listener port from the forwarder. "
+ "Retrying.";
timer_.expires_from_now(std::chrono::milliseconds(RETRY_INTERVAL));
timer_.async_wait(std::bind(&ForwarderConfig::doGetMainListener, this,
@@ -104,7 +105,8 @@ class ForwarderConfig {
listener_retrieved_callback_(std::make_error_code(std::errc(0)));
}
} else {
- TRANSPORT_LOGI("Timer for retrieving main hicn listener canceled.");
+ TRANSPORT_LOG_ERROR
+ << "Timer for retrieving main hicn listener canceled.";
}
}
diff --git a/apps/http-proxy/src/forwarder_interface.cc b/apps/http-proxy/src/forwarder_interface.cc
index 7d8235ac6..c2448de9a 100644
--- a/apps/http-proxy/src/forwarder_interface.cc
+++ b/apps/http-proxy/src/forwarder_interface.cc
@@ -119,7 +119,7 @@ void ForwarderInterface::internalRemoveConnectedUser(uint32_t route_id) {
for (unsigned i = 0; i < routes_to_remove.size(); i++) {
connids_to_remove.insert(routes_to_remove[i]->face_id);
if (hc_route_delete(sock_, routes_to_remove[i]) < 0) {
- TRANSPORT_LOGE("Error removing route from forwarder.");
+ TRANSPORT_LOG_ERROR << "Error removing route from forwarder.";
}
}
@@ -146,7 +146,7 @@ void ForwarderInterface::internalRemoveConnectedUser(uint32_t route_id) {
for (unsigned i = 0; i < conns_to_remove.size(); i++) {
if (hc_connection_delete(sock_, conns_to_remove[i]) < 0) {
- TRANSPORT_LOGE("Error removing connection from forwarder.");
+ TRANSPORT_LOG_ERROR << "Error removing connection from forwarder.";
}
}
diff --git a/apps/http-proxy/src/http_proxy.cc b/apps/http-proxy/src/http_proxy.cc
index 262fcb8e1..2040f7cfa 100644
--- a/apps/http-proxy/src/http_proxy.cc
+++ b/apps/http-proxy/src/http_proxy.cc
@@ -67,8 +67,8 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback {
std::string remote_address =
socket.remote_endpoint().address().to_string();
std::uint16_t remote_port = socket.remote_endpoint().port();
- TRANSPORT_LOGD("Client %s:%d disconnected.", remote_address.c_str(),
- remote_port);
+ TRANSPORT_LOG_INFO << "Client " << remote_address << ":"
+ << remote_port << "disconnected.";
} catch (std::system_error& e) {
// Do nothing
}
@@ -85,7 +85,7 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback {
private:
void consumeNextRequest() {
if (request_buffer_queue_.size() == 0) {
- TRANSPORT_LOGD("No additional requests to process.");
+ // No additional requests to process
return;
}
@@ -136,24 +136,24 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback {
current_size_ += size;
if (is_last) {
- TRANSPORT_LOGD("Request received: %s",
- std::string((const char*)tmp_buffer_.first->data(),
- tmp_buffer_.first->length())
- .c_str());
+ // TRANSPORT_LOGD("Request received: %s",
+ // std::string((const char*)tmp_buffer_.first->data(),
+ // tmp_buffer_.first->length())
+ // .c_str());
if (current_size_ < 1400) {
request_buffer_queue_.emplace_back(std::move(tmp_buffer_));
} else {
- TRANSPORT_LOGE("Ignoring client request due to size (%zu) > 1400.",
- current_size_);
+ TRANSPORT_LOG_ERROR << "Ignoring client request due to size ("
+ << current_size_ << ") > 1400.";
session_->close();
current_size_ = 0;
return;
}
if (!consumer_.isRunning()) {
- TRANSPORT_LOGD(
- "Consumer stopped, triggering consume from TCP session "
- "handler..");
+ TRANSPORT_LOG_INFO
+ << "Consumer stopped, triggering consume from TCP session "
+ "handler..";
consumeNextRequest();
}
@@ -187,12 +187,13 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback {
void readBufferAvailable(std::unique_ptr<utils::MemBuf>&& buffer) noexcept {
// Response received. Send it back to client
auto _buffer = buffer.release();
- TRANSPORT_LOGD("From hicn: %zu bytes.", _buffer->length());
+ // TRANSPORT_LOGD("From hicn: %zu bytes.", _buffer->length());
session_->send(_buffer, []() {});
}
void readError(const std::error_code ec) noexcept {
- TRANSPORT_LOGE("Error reading from hicn consumer socket. Closing session.");
+ TRANSPORT_LOG_ERROR
+ << "Error reading from hicn consumer socket. Closing session.";
session_->close();
}
@@ -209,15 +210,14 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback {
* Let's grant it!
*/
if (metadata->method == "OPTIONS") {
- session_->send(
- (const uint8_t*)HTTPMessageFastParser::http_cors,
- std::strlen(HTTPMessageFastParser::http_cors), [this]() {
- auto& socket = session_->socket_;
- TRANSPORT_LOGI(
- "Sent OPTIONS to client %s:%d",
- socket.remote_endpoint().address().to_string().c_str(),
- socket.remote_endpoint().port());
- });
+ session_->send((const uint8_t*)HTTPMessageFastParser::http_cors,
+ std::strlen(HTTPMessageFastParser::http_cors), [this]() {
+ auto& socket = session_->socket_;
+ TRANSPORT_LOG_INFO
+ << "Sent OPTIONS to client "
+ << socket.remote_endpoint().address() << ":"
+ << socket.remote_endpoint().port();
+ });
}
} else {
tcp_receiver_.parseHicnHeader(
@@ -230,14 +230,14 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback {
}
/* Route created. Send back a 200 OK to client */
- session_->send(
- (const uint8_t*)reply, std::strlen(reply), [this, result]() {
- auto& socket = session_->socket_;
- TRANSPORT_LOGI(
- "Sent %d response to client %s:%d", result,
- socket.remote_endpoint().address().to_string().c_str(),
- socket.remote_endpoint().port());
- });
+ session_->send((const uint8_t*)reply, std::strlen(reply),
+ [this, result]() {
+ auto& socket = session_->socket_;
+ TRANSPORT_LOG_INFO
+ << "Sent " << result << " response to client "
+ << socket.remote_endpoint().address() << ":"
+ << socket.remote_endpoint().port();
+ });
});
}
}
@@ -313,7 +313,6 @@ void TcpReceiver::onClientDisconnect(HTTPClientConnectionCallback* client) {
void TcpReceiver::onNewConnection(asio::ip::tcp::socket&& socket) {
if (http_clients_.size() == 0) {
// Create new HTTPClientConnectionCallback
- TRANSPORT_LOGD("Creating new HTTPClientConnectionCallback.");
http_clients_.emplace_back(
new HTTPClientConnectionCallback(*this, thread_));
}
@@ -332,7 +331,8 @@ void TcpReceiver::onNewConnection(asio::ip::tcp::socket&& socket) {
void HTTPProxy::setupSignalHandler() {
signals_.async_wait([this](const std::error_code& ec, int signal_number) {
if (!ec) {
- TRANSPORT_LOGI("Received signal %d. Stopping gracefully.", signal_number);
+ TRANSPORT_LOG_INFO << "Received signal " << signal_number
+ << ". Stopping gracefully.";
stop();
}
});
diff --git a/apps/http-proxy/src/http_session.cc b/apps/http-proxy/src/http_session.cc
index 6b91c12c3..84c814cbd 100644
--- a/apps/http-proxy/src/http_session.cc
+++ b/apps/http-proxy/src/http_session.cc
@@ -111,7 +111,6 @@ void HTTPSession::send(utils::MemBuf *buffer,
doWrite();
}
} else {
- TRANSPORT_LOGD("Tell the handle connect it has data to write");
data_available_ = true;
}
});
@@ -134,15 +133,11 @@ void HTTPSession::doWrite() {
asio::async_write(socket_, asio::buffer(buffer->data(), buffer->length()),
[this](std::error_code ec, std::size_t length) {
if (TRANSPORT_EXPECT_FALSE(!ec)) {
- TRANSPORT_LOGD("Content successfully sent! %zu",
- length);
write_msgs_.front().second();
write_msgs_.pop_front();
if (!write_msgs_.empty()) {
doWrite();
}
- } else {
- TRANSPORT_LOGD("Content NOT sent!");
}
});
} // namespace transport
@@ -274,7 +269,7 @@ void HTTPSession::doReadHeader() {
void HTTPSession::tryReconnection() {
if (on_connection_closed_callback_(socket_)) {
if (state_ == ConnectorState::CONNECTED) {
- TRANSPORT_LOGD("Connection lost. Trying to reconnect...\n");
+ TRANSPORT_LOG_ERROR << "Connection lost. Trying to reconnect...";
state_ = ConnectorState::CONNECTING;
is_reconnection_ = true;
io_service_.post([this]() {
@@ -290,35 +285,35 @@ void HTTPSession::tryReconnection() {
}
void HTTPSession::doConnect() {
- asio::async_connect(socket_, endpoint_iterator_,
- [this](std::error_code ec, tcp::resolver::iterator) {
- if (!ec) {
- timer_.cancel();
- state_ = ConnectorState::CONNECTED;
+ asio::async_connect(
+ socket_, endpoint_iterator_,
+ [this](std::error_code ec, tcp::resolver::iterator) {
+ if (!ec) {
+ timer_.cancel();
+ state_ = ConnectorState::CONNECTED;
- asio::ip::tcp::no_delay noDelayOption(true);
- socket_.set_option(noDelayOption);
+ asio::ip::tcp::no_delay noDelayOption(true);
+ socket_.set_option(noDelayOption);
- // on_reconnect_callback_();
+ // on_reconnect_callback_();
- doReadHeader();
+ doReadHeader();
- if (data_available_ && !write_msgs_.empty()) {
- data_available_ = false;
- doWrite();
- }
+ if (data_available_ && !write_msgs_.empty()) {
+ data_available_ = false;
+ doWrite();
+ }
- if (is_reconnection_) {
- is_reconnection_ = false;
- TRANSPORT_LOGD("Connection recovered!");
- }
+ if (is_reconnection_) {
+ is_reconnection_ = false;
+ TRANSPORT_LOG_INFO << "Connection recovered!";
+ }
- } else {
- TRANSPORT_LOGE("Impossible to reconnect: %s",
- ec.message().c_str());
- close();
- }
- });
+ } else {
+ TRANSPORT_LOG_ERROR << "Impossible to reconnect: " << ec.message();
+ close();
+ }
+ });
}
bool HTTPSession::checkConnected() {
@@ -335,7 +330,7 @@ void HTTPSession::handleDeadline(const std::error_code &ec) {
if (!ec) {
io_service_.post([this]() {
socket_.close();
- TRANSPORT_LOGE("Error connecting. Is the server running?\n");
+ TRANSPORT_LOG_ERROR << "Error connecting. Is the server running?";
io_service_.stop();
});
}
diff --git a/apps/http-proxy/src/icn_receiver.cc b/apps/http-proxy/src/icn_receiver.cc
index 23e5b5623..ea8ac7191 100644
--- a/apps/http-proxy/src/icn_receiver.cc
+++ b/apps/http-proxy/src/icn_receiver.cc
@@ -55,28 +55,28 @@ AsyncConsumerProducer::AsyncConsumerProducer(
interface::GeneralTransportOptions::OUTPUT_BUFFER_SIZE, cache_size_);
if (ret != SOCKET_OPTION_SET) {
- TRANSPORT_LOGD("Warning: output buffer size has not been set.");
+ TRANSPORT_LOG_WARNING << "Warning: output buffer size has not been set.";
}
ret = producer_socket_.setSocketOption(
interface::GeneralTransportOptions::MAKE_MANIFEST, manifest);
if (ret != SOCKET_OPTION_SET) {
- TRANSPORT_LOGD("Warning: impossible to enable signatures.");
+ TRANSPORT_LOG_WARNING << "Warning: impossible to enable signatures.";
}
ret = producer_socket_.setSocketOption(
interface::GeneralTransportOptions::DATA_PACKET_SIZE, mtu_);
if (ret != SOCKET_OPTION_SET) {
- TRANSPORT_LOGD("Warning: mtu has not been set.");
+ TRANSPORT_LOG_WARNING << "Warning: mtu has not been set.";
}
producer_socket_.registerPrefix(prefix_);
}
void AsyncConsumerProducer::start() {
- TRANSPORT_LOGD("Starting listening");
+ TRANSPORT_LOG_INFO << "Starting listening";
doReceive();
}
@@ -90,8 +90,8 @@ void AsyncConsumerProducer::run() {
void AsyncConsumerProducer::stop() {
io_service_.post([this]() {
- TRANSPORT_LOGI("Number of requests processed by plugin: %lu",
- (unsigned long)request_counter_);
+ TRANSPORT_LOG_INFO << "Number of requests processed by plugin: "
+ << request_counter_;
producer_socket_.stop();
connector_.close();
});
@@ -123,16 +123,14 @@ void AsyncConsumerProducer::manageIncomingInterest(
if (_it != _end) {
if (_it->second.second) {
- TRANSPORT_LOGD(
- "Content is in production, interests will be satisfied shortly.");
return;
}
if (seg >= _it->second.first) {
- TRANSPORT_LOGD(
- "Ignoring interest with name %s for a content object which does not "
- "exist. (Request: %u, max: %u)",
- name.toString().c_str(), (uint32_t)seg, (uint32_t)_it->second.first);
+ // TRANSPORT_LOGD(
+ // "Ignoring interest with name %s for a content object which does not "
+ // "exist. (Request: %u, max: %u)",
+ // name.toString().c_str(), (uint32_t)seg, (uint32_t)_it->second.first);
return;
}
}
@@ -170,7 +168,7 @@ void AsyncConsumerProducer::publishContent(const uint8_t* data,
options.getLifetime());
if (TRANSPORT_EXPECT_FALSE(ret != SOCKET_OPTION_SET)) {
- TRANSPORT_LOGD("Warning: content object lifetime has not been set.");
+ TRANSPORT_LOG_WARNING << "Warning: content object lifetime has not been set.";
}
const interface::Name& name = options.getName();
diff --git a/apps/ping/.clang-format b/apps/ping/.clang-format
new file mode 100644
index 000000000..cd21e2017
--- /dev/null
+++ b/apps/ping/.clang-format
@@ -0,0 +1,14 @@
+# Copyright (c) 2017-2021 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+BasedOnStyle: Google
diff --git a/apps/ping/CMakeLists.txt b/apps/ping/CMakeLists.txt
new file mode 100644
index 000000000..42f7f98c1
--- /dev/null
+++ b/apps/ping/CMakeLists.txt
@@ -0,0 +1,39 @@
+# Copyright (c) 2017-2019 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+if (NOT DISABLE_EXECUTABLES)
+ list (APPEND PING_LIBRARIES
+ ${LIBTRANSPORT_LIBRARIES}
+ ${CMAKE_THREAD_LIBS_INIT}
+ ${WSOCK32_LIBRARY}
+ ${WS2_32_LIBRARY}
+ )
+
+ build_executable(hicn-ping-server
+ SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/src/ping_server.cc
+ LINK_LIBRARIES ${PING_LIBRARIES}
+ INCLUDE_DIRS ${LIBTRANSPORT_INCLUDE_DIRS}
+ DEPENDS ${DEPENDENCIES}
+ COMPONENT ${HICN_APPS}
+ LINK_FLAGS ${LINK_FLAGS}
+ )
+
+ build_executable(hicn-ping-client
+ SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/src/ping_client.cc
+ LINK_LIBRARIES ${PING_LIBRARIES}
+ INCLUDE_DIRS ${LIBTRANSPORT_INCLUDE_DIRS}
+ DEPENDS ${DEPENDENCIES}
+ COMPONENT ${HICN_APPS}
+ LINK_FLAGS ${LINK_FLAGS}
+ )
+endif () \ No newline at end of file
diff --git a/apps/ping/src/ping_client.cc b/apps/ping/src/ping_client.cc
new file mode 100644
index 000000000..24e0bf3ed
--- /dev/null
+++ b/apps/ping/src/ping_client.cc
@@ -0,0 +1,441 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/auth/verifier.h>
+#include <hicn/transport/core/global_object_pool.h>
+#include <hicn/transport/core/interest.h>
+#include <hicn/transport/interfaces/portal.h>
+
+#include <asio/signal_set.hpp>
+#include <asio/steady_timer.hpp>
+#include <chrono>
+#include <map>
+
+#define SYN_STATE 1
+#define ACK_STATE 2
+
+namespace transport {
+
+namespace core {
+
+namespace ping {
+
+typedef std::map<uint64_t, uint64_t> SendTimeMap;
+typedef auth::AsymmetricVerifier Verifier;
+
+class Configuration {
+ public:
+ uint64_t interestLifetime_;
+ uint64_t pingInterval_;
+ uint64_t maxPing_;
+ uint64_t first_suffix_;
+ std::string name_;
+ std::string certificate_;
+ uint16_t srcPort_;
+ uint16_t dstPort_;
+ bool verbose_;
+ bool dump_;
+ bool jump_;
+ bool open_;
+ bool always_syn_;
+ bool always_ack_;
+ bool quiet_;
+ uint32_t jump_freq_;
+ uint32_t jump_size_;
+ uint8_t ttl_;
+
+ Configuration() {
+ interestLifetime_ = 500; // ms
+ pingInterval_ = 1000000; // us
+ maxPing_ = 10; // number of interests
+ first_suffix_ = 0;
+ name_ = "b001::1"; // string
+ srcPort_ = 9695;
+ dstPort_ = 8080;
+ verbose_ = false;
+ dump_ = false;
+ jump_ = false;
+ open_ = false;
+ always_syn_ = false;
+ always_ack_ = false;
+ quiet_ = false;
+ jump_freq_ = 0;
+ jump_size_ = 0;
+ ttl_ = 64;
+ }
+};
+
+class Client : interface::Portal::ConsumerCallback {
+ public:
+ Client(Configuration *c)
+ : portal_(), signals_(portal_.getIoService(), SIGINT) {
+ // Let the main thread to catch SIGINT
+ portal_.connect();
+ portal_.setConsumerCallback(this);
+
+ signals_.async_wait(std::bind(&Client::afterSignal, this));
+
+ timer_.reset(new asio::steady_timer(portal_.getIoService()));
+ config_ = c;
+ sequence_number_ = config_->first_suffix_;
+ last_jump_ = 0;
+ processed_ = 0;
+ state_ = SYN_STATE;
+ sent_ = 0;
+ received_ = 0;
+ timedout_ = 0;
+ if (!c->certificate_.empty()) {
+ verifier_.useCertificate(c->certificate_);
+ }
+ }
+
+ virtual ~Client() {}
+
+ void ping() {
+ std::cout << "start ping" << std::endl;
+ doPing();
+ portal_.runEventsLoop();
+ }
+
+ void onContentObject(Interest &interest, ContentObject &object) override {
+ uint64_t rtt = 0;
+
+ if (!config_->certificate_.empty()) {
+ auto t0 = std::chrono::steady_clock::now();
+ if (verifier_.verifyPacket(&object)) {
+ auto t1 = std::chrono::steady_clock::now();
+ auto dt =
+ std::chrono::duration_cast<std::chrono::microseconds>(t1 - t0);
+ std::cout << "Verification time: " << dt.count() << std::endl;
+ std::cout << "<<< Signature Ok." << std::endl;
+ } else {
+ std::cout << "<<< Signature verification failed!" << std::endl;
+ }
+ }
+
+ auto it = send_timestamps_.find(interest.getName().getSuffix());
+ if (it != send_timestamps_.end()) {
+ rtt = std::chrono::duration_cast<std::chrono::microseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count() -
+ it->second;
+ send_timestamps_.erase(it);
+ }
+
+ if (config_->verbose_) {
+ std::cout << "<<< recevied object. " << std::endl;
+ std::cout << "<<< interest name: " << interest.getName()
+ << " src port: " << interest.getSrcPort()
+ << " dst port: " << interest.getDstPort()
+ << " flags: " << interest.printFlags() << std::endl;
+ std::cout << "<<< object name: " << object.getName()
+ << " src port: " << object.getSrcPort()
+ << " dst port: " << object.getDstPort()
+ << " flags: " << object.printFlags() << " path label "
+ << object.getPathLabel() << " ("
+ << (object.getPathLabel() >> 24) << ")"
+ << " TTL: " << (int)object.getTTL() << std::endl;
+ } else if (!config_->quiet_) {
+ std::cout << "<<< received object. " << std::endl;
+ std::cout << "<<< round trip: " << rtt << " [us]" << std::endl;
+ std::cout << "<<< interest name: " << interest.getName() << std::endl;
+ std::cout << "<<< object name: " << object.getName() << std::endl;
+ std::cout << "<<< content object size: "
+ << object.payloadSize() + object.headerSize() << " [bytes]"
+ << std::endl;
+ }
+
+ if (config_->dump_) {
+ std::cout << "----- interest dump -----" << std::endl;
+ interest.dump();
+ std::cout << "-------------------------" << std::endl;
+ std::cout << "----- object dump -------" << std::endl;
+ object.dump();
+ std::cout << "-------------------------" << std::endl;
+ }
+
+ if (!config_->quiet_) std::cout << std::endl;
+
+ if (!config_->always_syn_) {
+ if (object.testSyn() && object.testAck() && state_ == SYN_STATE) {
+ state_ = ACK_STATE;
+ }
+ }
+
+ received_++;
+ processed_++;
+ if (processed_ >= config_->maxPing_) {
+ afterSignal();
+ }
+ }
+
+ void onTimeout(Interest::Ptr &interest, const Name &name) override {
+ if (config_->verbose_) {
+ std::cout << "### timeout for " << name
+ << " src port: " << interest->getSrcPort()
+ << " dst port: " << interest->getDstPort()
+ << " flags: " << interest->printFlags() << std::endl;
+ } else if (!config_->quiet_) {
+ std::cout << "### timeout for " << name << std::endl;
+ }
+
+ if (config_->dump_) {
+ std::cout << "----- interest dump -----" << std::endl;
+ interest->dump();
+ std::cout << "-------------------------" << std::endl;
+ }
+
+ if (!config_->quiet_) std::cout << std::endl;
+
+ timedout_++;
+ processed_++;
+ if (processed_ >= config_->maxPing_) {
+ afterSignal();
+ }
+ }
+
+ void onError(std::error_code ec) override {}
+
+ void doPing() {
+ const Name interest_name(config_->name_, (uint32_t)sequence_number_);
+ hicn_format_t format;
+ if (interest_name.getAddressFamily() == AF_INET) {
+ format = HF_INET_TCP;
+ } else {
+ format = HF_INET6_TCP;
+ }
+
+ auto interest = std::make_shared<Interest>(interest_name, format);
+
+ interest->setLifetime(uint32_t(config_->interestLifetime_));
+ interest->resetFlags();
+
+ if (config_->open_ || config_->always_syn_) {
+ if (state_ == SYN_STATE) {
+ interest->setSyn();
+ } else if (state_ == ACK_STATE) {
+ interest->setAck();
+ }
+ } else if (config_->always_ack_) {
+ interest->setAck();
+ }
+
+ interest->setSrcPort(config_->srcPort_);
+ interest->setDstPort(config_->dstPort_);
+ interest->setTTL(config_->ttl_);
+
+ if (config_->verbose_) {
+ std::cout << ">>> send interest " << interest->getName()
+ << " src port: " << interest->getSrcPort()
+ << " dst port: " << interest->getDstPort()
+ << " flags: " << interest->printFlags()
+ << " TTL: " << (int)interest->getTTL() << std::endl;
+ } else if (!config_->quiet_) {
+ std::cout << ">>> send interest " << interest->getName() << std::endl;
+ }
+
+ if (config_->dump_) {
+ std::cout << "----- interest dump -----" << std::endl;
+ interest->dump();
+ std::cout << "-------------------------" << std::endl;
+ }
+
+ if (!config_->quiet_) std::cout << std::endl;
+
+ send_timestamps_[sequence_number_] =
+ std::chrono::duration_cast<std::chrono::microseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+
+ portal_.sendInterest(std::move(interest));
+
+ sequence_number_++;
+ sent_++;
+
+ if (sent_ < config_->maxPing_) {
+ this->timer_->expires_from_now(
+ std::chrono::microseconds(config_->pingInterval_));
+ this->timer_->async_wait([this](const std::error_code e) { doPing(); });
+ }
+ }
+
+ void afterSignal() {
+ std::cout << "Stop ping" << std::endl;
+ std::cout << "Sent: " << sent_ << " Received: " << received_
+ << " Timeouts: " << timedout_ << std::endl;
+ portal_.stopEventsLoop();
+ }
+
+ void reset() {
+ timer_.reset(new asio::steady_timer(portal_.getIoService()));
+ sequence_number_ = config_->first_suffix_;
+ last_jump_ = 0;
+ processed_ = 0;
+ state_ = SYN_STATE;
+ sent_ = 0;
+ received_ = 0;
+ timedout_ = 0;
+ }
+
+ private:
+ SendTimeMap send_timestamps_;
+ interface::Portal portal_;
+ asio::signal_set signals_;
+ uint64_t sequence_number_;
+ uint64_t last_jump_;
+ uint64_t processed_;
+ uint32_t state_;
+ uint32_t sent_;
+ uint32_t received_;
+ uint32_t timedout_;
+ std::unique_ptr<asio::steady_timer> timer_;
+ Configuration *config_;
+ Verifier verifier_;
+};
+
+void help() {
+ std::cout << "usage: hicn-consumer-ping [options]" << std::endl;
+ std::cout << "PING options" << std::endl;
+ std::cout
+ << "-i <val> ping interval in microseconds (default 1000000ms)"
+ << std::endl;
+ std::cout << "-m <val> maximum number of pings to send (default 10)"
+ << std::endl;
+ std::cout << "-s <val> sorce port (default 9695)" << std::endl;
+ std::cout << "-d <val> destination port (default 8080)" << std::endl;
+ std::cout << "-t <val> set packet ttl (default 64)" << std::endl;
+ std::cout << "-O open tcp connection (three way handshake) "
+ "(default false)"
+ << std::endl;
+ std::cout << "-S send always syn messages (default false)"
+ << std::endl;
+ std::cout << "-A send always ack messages (default false)"
+ << std::endl;
+ std::cout << "HICN options" << std::endl;
+ std::cout << "-n <val> hicn name (default b001::1)" << std::endl;
+ std::cout
+ << "-l <val> interest lifetime in milliseconds (default 500ms)"
+ << std::endl;
+ std::cout << "OUTPUT options" << std::endl;
+ std::cout << "-V verbose, prints statistics about the "
+ "messagges sent and received (default false)"
+ << std::endl;
+ std::cout << "-D dump, dumps sent and received packets "
+ "(default false)"
+ << std::endl;
+ std::cout << "-q quiet, not prints (default false)"
+ << std::endl;
+ std::cout << "-H prints this message" << std::endl;
+}
+
+int main(int argc, char *argv[]) {
+#ifdef _WIN32
+ WSADATA wsaData = {0};
+ WSAStartup(MAKEWORD(2, 2), &wsaData);
+#endif
+
+ Configuration *c = new Configuration();
+ int opt;
+ std::string producer_certificate = "";
+
+ while ((opt = getopt(argc, argv, "j::t:i:m:s:d:n:l:f:c:SAOqVDH")) != -1) {
+ switch (opt) {
+ case 't':
+ c->ttl_ = (uint8_t)std::stoi(optarg);
+ break;
+ case 'i':
+ c->pingInterval_ = std::stoi(optarg);
+ break;
+ case 'm':
+ c->maxPing_ = std::stoi(optarg);
+ break;
+ case 'f':
+ c->first_suffix_ = std::stoul(optarg);
+ break;
+ case 's':
+ c->srcPort_ = std::stoi(optarg);
+ break;
+ case 'd':
+ c->dstPort_ = std::stoi(optarg);
+ break;
+ case 'n':
+ c->name_ = optarg;
+ break;
+ case 'l':
+ c->interestLifetime_ = std::stoi(optarg);
+ break;
+ case 'V':
+ c->verbose_ = true;
+ ;
+ break;
+ case 'D':
+ c->dump_ = true;
+ break;
+ case 'O':
+ c->always_syn_ = false;
+ c->always_ack_ = false;
+ c->open_ = true;
+ break;
+ case 'S':
+ c->always_syn_ = true;
+ c->always_ack_ = false;
+ c->open_ = false;
+ break;
+ case 'A':
+ c->always_syn_ = false;
+ c->always_ack_ = true;
+ c->open_ = false;
+ break;
+ case 'q':
+ c->quiet_ = true;
+ c->verbose_ = false;
+ c->dump_ = false;
+ break;
+ case 'c':
+ c->certificate_ = std::string(optarg);
+ break;
+ case 'H':
+ default:
+ help();
+ exit(EXIT_FAILURE);
+ }
+ }
+
+ auto ping = std::make_unique<Client>(c);
+
+ auto t0 = std::chrono::steady_clock::now();
+ ping->ping();
+ auto t1 = std::chrono::steady_clock::now();
+
+ std::cout
+ << "Elapsed time: "
+ << std::chrono::duration_cast<std::chrono::microseconds>(t1 - t0).count()
+ << std::endl;
+
+#ifdef _WIN32
+ WSACleanup();
+#endif
+ return 0;
+}
+
+} // namespace ping
+
+} // namespace core
+
+} // namespace transport
+
+int main(int argc, char *argv[]) {
+ return transport::core::ping::main(argc, argv);
+}
diff --git a/apps/ping/src/ping_server.cc b/apps/ping/src/ping_server.cc
new file mode 100644
index 000000000..baf9c6698
--- /dev/null
+++ b/apps/ping/src/ping_server.cc
@@ -0,0 +1,340 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/interfaces/socket_producer.h>
+#ifndef _WIN32
+#include <hicn/transport/utils/daemonizator.h>
+#include <unistd.h>
+#else
+#include <openssl/applink.c>
+#endif
+
+#include <hicn/transport/auth/identity.h>
+#include <hicn/transport/auth/signer.h>
+#include <hicn/transport/core/content_object.h>
+#include <hicn/transport/core/interest.h>
+#include <hicn/transport/utils/string_tokenizer.h>
+
+#include <asio.hpp>
+
+namespace transport {
+
+namespace interface {
+
+using HashAlgorithm = core::HashAlgorithm;
+using CryptoSuite = auth::CryptoSuite;
+
+auth::Identity setProducerIdentity(std::string keystore_name,
+ std::string keystore_password,
+ auth::CryptoHashType hash_algorithm) {
+ if (access(keystore_name.c_str(), F_OK) != -1) {
+ return auth::Identity(keystore_name, keystore_password, hash_algorithm);
+ } else {
+ return auth::Identity(keystore_name, keystore_password,
+ CryptoSuite::RSA_SHA256, 1024, 365, "producer-test");
+ }
+}
+
+class CallbackContainer {
+ const std::size_t log2_content_object_buffer_size = 12;
+
+ public:
+ CallbackContainer(const Name &prefix, uint32_t object_size, bool verbose,
+ bool dump, bool quite, bool flags, bool reset, uint8_t ttl,
+ auth::Identity *identity, bool sign, uint32_t lifetime)
+ : buffer_(object_size, 'X'),
+ content_objects_((std::uint32_t)(1 << log2_content_object_buffer_size)),
+ mask_((std::uint16_t)(1 << log2_content_object_buffer_size) - 1),
+ content_objects_index_(0),
+ verbose_(verbose),
+ dump_(dump),
+ quite_(quite),
+ flags_(flags),
+ reset_(reset),
+ ttl_(ttl),
+ identity_(identity),
+ sign_(sign) {
+ core::Packet::Format format;
+
+ if (prefix.getAddressFamily() == AF_INET) {
+ format = core::Packet::Format::HF_INET_TCP;
+ if (sign_) {
+ format = core::Packet::Format::HF_INET_TCP_AH;
+ }
+ } else {
+ format = core::Packet::Format::HF_INET6_TCP;
+ if (sign_) {
+ format = core::Packet::Format::HF_INET6_TCP_AH;
+ }
+ }
+
+ for (int i = 0; i < (1 << log2_content_object_buffer_size); i++) {
+ content_objects_[i] = std::make_shared<ContentObject>(
+ prefix, format, 0, (const uint8_t *)buffer_.data(), buffer_.size());
+ content_objects_[i]->setLifetime(lifetime);
+ }
+ }
+
+ void processInterest(ProducerSocket &p, const Interest &interest,
+ uint32_t lifetime) {
+ if (verbose_) {
+ std::cout << "<<< received interest " << interest.getName()
+ << " src port: " << interest.getSrcPort()
+ << " dst port: " << interest.getDstPort()
+ << " flags: " << interest.printFlags()
+ << "TTL: " << (int)interest.getTTL() << std::endl;
+ } else if (!quite_) {
+ std::cout << "<<< received interest " << interest.getName() << std::endl;
+ }
+
+ if (dump_) {
+ std::cout << "----- interest dump -----" << std::endl;
+ interest.dump();
+ std::cout << "-------------------------" << std::endl;
+ }
+
+ if (interest.testRst()) {
+ std::cout << "!!!got a reset, I don't reply" << std::endl;
+ } else {
+ auto &content_object = content_objects_[content_objects_index_++ & mask_];
+
+ content_object->setName(interest.getName());
+ content_object->setLifetime(lifetime);
+ content_object->setLocator(interest.getLocator());
+ content_object->setSrcPort(interest.getDstPort());
+ content_object->setDstPort(interest.getSrcPort());
+ content_object->setTTL(ttl_);
+
+ if (!sign_) {
+ content_object->resetFlags();
+ }
+
+ if (flags_) {
+ if (interest.testSyn()) {
+ content_object->setSyn();
+ content_object->setAck();
+ } else if (interest.testAck()) {
+ content_object->setAck();
+ } // here I may need to handle the FIN flag;
+ } else if (reset_) {
+ content_object->setRst();
+ }
+
+ if (verbose_) {
+ std::cout << ">>> send object " << content_object->getName()
+ << " src port: " << content_object->getSrcPort()
+ << " dst port: " << content_object->getDstPort()
+ << " flags: " << content_object->printFlags()
+ << " TTL: " << (int)content_object->getTTL() << std::endl;
+ } else if (!quite_) {
+ std::cout << ">>> send object " << content_object->getName()
+ << std::endl;
+ }
+
+ if (dump_) {
+ std::cout << "----- object dump -----" << std::endl;
+ content_object->dump();
+ std::cout << "-----------------------" << std::endl;
+ }
+
+ if (!quite_) std::cout << std::endl;
+
+ if (sign_) {
+ identity_->getSigner()->signPacket(content_object.get());
+ }
+
+ p.produce(*content_object);
+ }
+ }
+
+ private:
+ std::string buffer_;
+ std::vector<std::shared_ptr<ContentObject>> content_objects_;
+ std::uint16_t mask_;
+ std::uint16_t content_objects_index_;
+ bool verbose_;
+ bool dump_;
+ bool quite_;
+ bool flags_;
+ bool reset_;
+ uint8_t ttl_;
+ auth::Identity *identity_;
+ bool sign_;
+};
+
+void help() {
+ std::cout << "usage: hicn-preoducer-ping [options]" << std::endl;
+ std::cout << "PING options" << std::endl;
+ std::cout << "-s <val> object content size (default 1350B)" << std::endl;
+ std::cout << "-n <val> hicn name (default b001::/64)" << std::endl;
+ std::cout << "-f set tcp flags according to the flag received "
+ "(default false)"
+ << std::endl;
+ std::cout << "-l data lifetime" << std::endl;
+ std::cout << "-r always reply with a reset flag (default false)"
+ << std::endl;
+ std::cout << "-t set ttl (default 64)" << std::endl;
+ std::cout << "OUTPUT options" << std::endl;
+ std::cout << "-V verbose, prints statistics about the messagges sent "
+ "and received (default false)"
+ << std::endl;
+ std::cout << "-D dump, dumps sent and received packets (default false)"
+ << std::endl;
+ std::cout << "-q quite, not prints (default false)" << std::endl;
+#ifndef _WIN32
+ std::cout << "-d daemon mode" << std::endl;
+#endif
+ std::cout << "-H prints this message" << std::endl;
+}
+
+int main(int argc, char **argv) {
+#ifdef _WIN32
+ WSADATA wsaData = {0};
+ WSAStartup(MAKEWORD(2, 2), &wsaData);
+#else
+ bool daemon = false;
+#endif
+ std::string name_prefix = "b001::0/64";
+ std::string delimiter = "/";
+ bool verbose = false;
+ bool dump = false;
+ bool quite = false;
+ bool flags = false;
+ bool reset = false;
+ uint32_t object_size = 1250;
+ uint8_t ttl = 64;
+ std::string keystore_path = "./rsa_crypto_material.p12";
+ std::string keystore_password = "cisco";
+ bool sign = false;
+ uint32_t data_lifetime = default_values::content_object_expiry_time;
+
+ int opt;
+#ifndef _WIN32
+ while ((opt = getopt(argc, argv, "s:n:t:l:qfrVDdHk:p:")) != -1) {
+#else
+ while ((opt = getopt(argc, argv, "s:n:t:l:qfrVDHk:p:")) != -1) {
+#endif
+ switch (opt) {
+ case 's':
+ object_size = std::stoi(optarg);
+ break;
+ case 'n':
+ name_prefix = optarg;
+ break;
+ case 't':
+ ttl = (uint8_t)std::stoi(optarg);
+ break;
+ case 'l':
+ data_lifetime = std::stoi(optarg);
+ break;
+ case 'V':
+ verbose = true;
+ break;
+ case 'D':
+ dump = true;
+ break;
+ case 'q':
+ verbose = false;
+ dump = false;
+ quite = true;
+ break;
+#ifndef _WIN32
+ case 'd':
+ daemon = true;
+ break;
+#endif
+ case 'f':
+ flags = true;
+ break;
+ case 'r':
+ reset = true;
+ break;
+ case 'k':
+ keystore_path = optarg;
+ sign = true;
+ break;
+ case 'p':
+ keystore_password = optarg;
+ break;
+ case 'H':
+ default:
+ help();
+ exit(EXIT_FAILURE);
+ }
+ }
+
+#ifndef _WIN32
+ if (daemon) {
+ utils::Daemonizator::daemonize();
+ }
+#endif
+
+ core::Prefix producer_namespace(name_prefix);
+
+ utils::StringTokenizer tokenizer(name_prefix, delimiter);
+ std::string ip_address = tokenizer.nextToken();
+ Name n(ip_address);
+
+ if (object_size > 1350) object_size = 1350;
+
+ CallbackContainer *stubs;
+ auth::Identity identity = setProducerIdentity(
+ keystore_path, keystore_password, auth::CryptoHashType::SHA256);
+
+ if (sign) {
+ stubs = new CallbackContainer(n, object_size, verbose, dump, quite, flags,
+ reset, ttl, &identity, sign, data_lifetime);
+ } else {
+ auth::Identity *identity = nullptr;
+ stubs = new CallbackContainer(n, object_size, verbose, dump, quite, flags,
+ reset, ttl, identity, sign, data_lifetime);
+ }
+
+ ProducerSocket p;
+ p.registerPrefix(producer_namespace);
+
+ p.setSocketOption(GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 0U);
+ p.setSocketOption(
+ ProducerCallbacksOptions::CACHE_MISS,
+ (ProducerInterestCallback)bind(&CallbackContainer::processInterest, stubs,
+ std::placeholders::_1,
+ std::placeholders::_2, data_lifetime));
+
+ p.connect();
+
+ asio::io_service io_service;
+ asio::signal_set signal_set(io_service, SIGINT);
+ signal_set.async_wait(
+ [&p, &io_service](const std::error_code &, const int &) {
+ std::cout << "STOPPING!!" << std::endl;
+ p.stop();
+ io_service.stop();
+ });
+
+ io_service.run();
+
+#ifdef _WIN32
+ WSACleanup();
+#endif
+ return 0;
+}
+
+} // namespace interface
+
+} // end namespace transport
+
+int main(int argc, char **argv) {
+ return transport::interface::main(argc, argv);
+}