diff options
Diffstat (limited to 'apps')
24 files changed, 4058 insertions, 113 deletions
diff --git a/apps/CMakeLists.txt b/apps/CMakeLists.txt index 37e44f9e7..df1b9fc7c 100644 --- a/apps/CMakeLists.txt +++ b/apps/CMakeLists.txt @@ -11,7 +11,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -cmake_minimum_required(VERSION 3.5 FATAL_ERROR) +cmake_minimum_required(VERSION 3.10 FATAL_ERROR) set(CMAKE_CXX_STANDARD 14) project(apps) @@ -22,25 +22,32 @@ set(CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake/Modules" ) +if (NOT CMAKE_BUILD_TYPE) + message(STATUS "${PROJECT_NAME}: No build type selected, default to Release") + set(CMAKE_BUILD_TYPE "Release") +endif () + include(BuildMacros) include(WindowsMacros) set(HICN_APPS hicn-apps CACHE INTERNAL "" FORCE) +find_package(Threads REQUIRED) +find_package(Libconfig++ REQUIRED) + if(CMAKE_SOURCE_DIR STREQUAL PROJECT_SOURCE_DIR) find_package(Libtransport REQUIRED) + find_package(Libhicn REQUIRED) find_package(hicnctrl REQUIRED) - find_package(Threads REQUIRED) else() if (DISABLE_SHARED_LIBRARIES) find_package(OpenSSL REQUIRED) - if (NOT WIN32) - find_package(ZLIB REQUIRED) - endif () set(LIBTRANSPORT_LIBRARIES ${LIBTRANSPORT_STATIC}) + set(LIBHICN_LIBRARIES ${LIBHICN_STATIC}) set(LIBHICNCTRL_LIBRARIES ${LIBHICNCTRL_STATIC}) else () set(LIBTRANSPORT_LIBRARIES ${LIBTRANSPORT_SHARED}) + set(LIBHICN_LIBRARIES ${LIBHICN_SHARED}) set(LIBHICNCTRL_LIBRARIES ${LIBHICNCTRL_SHARED}) endif () @@ -49,28 +56,20 @@ else() ) endif() -# Worksroung for unresolved symbols in vpp libraries -if(${CMAKE_SYSTEM_NAME} MATCHES Linux) - set(LINK_FLAGS "-Wl,-unresolved-symbols=ignore-in-shared-libs") -endif() - -list(APPEND LIBRARIES - ${LIBTRANSPORT_LIBRARIES} - ${LIBHICNCTRL_LIBRARIES} - ${OPENSSL_LIBRARIES} - ${CMAKE_THREAD_LIBS_INIT} -) - -set(APPS_LIBRARY_LIST "${OPENSSL_LIBRARIES};${CMAKE_THREAD_LIBS_INIT}" CACHE INTERNAL "APPS_LIBRARY_LIST") if (WIN32) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /wd4200 /wd4996") endif () include(Packaging) +add_subdirectory(ping) +add_subdirectory(hiperf) + set(HIGET higet) set(HTTP_PROXY hicn-http-proxy) + if (NOT WIN32) add_subdirectory(http-proxy) endif () + add_subdirectory(higet) diff --git a/apps/cmake/Modules/Packaging.cmake b/apps/cmake/Modules/Packaging.cmake index 9b3fa2e72..981eb728e 100644 --- a/apps/cmake/Modules/Packaging.cmake +++ b/apps/cmake/Modules/Packaging.cmake @@ -18,21 +18,21 @@ useful for testing and debugging within a hicn network." ) set(${HICN_APPS}_DEB_DEPENDENCIES - "lib${LIBTRANSPORT} (>= stable_version)" + "lib${LIBTRANSPORT} (>= stable_version), lib${LIBHICNCTRL} (>= stable_version)" CACHE STRING "Dependencies for deb/rpm package." ) set(${HICN_APPS}-dev_DEB_DEPENDENCIES - "lib${LIBTRANSPORT}-dev (>= stable_version)" + "${HICN_APPS} (>= stable_version), lib${LIBTRANSPORT}-dev (>= stable_version), lib${LIBHICNCTRL}-dev (>= stable_version)" CACHE STRING "Dependencies for deb/rpm package." ) set(${HICN_APPS}_RPM_DEPENDENCIES - "lib${LIBTRANSPORT} >= stable_version" + "lib${LIBTRANSPORT} >= stable_version, lib${LIBHICNCTRL} >= stable_version" CACHE STRING "Dependencies for deb/rpm package." ) set(${HICN_APPS}-dev_RPM_DEPENDENCIES - "lib${LIBTRANSPORT}-devel >= stable_version" + "${HICN_APPS} >= stable_version, lib${LIBTRANSPORT}-dev >= stable_version, lib${LIBHICNCTRL}-dev >= stable_version" CACHE STRING "Dependencies for deb/rpm package." )
\ No newline at end of file diff --git a/apps/higet/CMakeLists.txt b/apps/higet/CMakeLists.txt index b929a24e4..8d112d3a3 100644 --- a/apps/higet/CMakeLists.txt +++ b/apps/higet/CMakeLists.txt @@ -11,11 +11,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -cmake_minimum_required(VERSION 3.5 FATAL_ERROR) +cmake_minimum_required(VERSION 3.10 FATAL_ERROR) set(CMAKE_CXX_STANDARD 14) project(utils) +find_package(Threads REQUIRED) + set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_CURRENT_SOURCE_DIR}/../cmake/Modules" @@ -38,7 +40,11 @@ endif() if (NOT DISABLE_EXECUTABLES) build_executable(${HIGET} SOURCES ${APPS_SRC} - LINK_LIBRARIES ${LIBTRANSPORT_LIBRARIES} ${WSOCK32_LIBRARY} ${WS2_32_LIBRARY} + LINK_LIBRARIES + ${LIBTRANSPORT_LIBRARIES} + ${CMAKE_THREAD_LIBS_INIT} + ${WSOCK32_LIBRARY} + ${WS2_32_LIBRARY} DEPENDS ${LIBTRANSPORT_LIBRARIES} COMPONENT ${HICN_APPS} DEFINITIONS ${COMPILER_DEFINITIONS} diff --git a/apps/hiperf/CMakeLists.txt b/apps/hiperf/CMakeLists.txt new file mode 100644 index 000000000..564525e67 --- /dev/null +++ b/apps/hiperf/CMakeLists.txt @@ -0,0 +1,44 @@ +# Copyright (c) 2017-2019 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +if (NOT DISABLE_EXECUTABLES) + list(APPEND HIPERF_SRC + ${CMAKE_CURRENT_SOURCE_DIR}/src/client.cc + ${CMAKE_CURRENT_SOURCE_DIR}/src/main.cc + ${CMAKE_CURRENT_SOURCE_DIR}/src/server.cc + ${CMAKE_CURRENT_SOURCE_DIR}/src/forwarder_interface.cc + ) + + list (APPEND HIPERF_LIBRARIES + ${LIBTRANSPORT_LIBRARIES} + ${LIBHICNCTRL_LIBRARIES} + ${LIBHICN_LIBRARIES} + ${CMAKE_THREAD_LIBS_INIT} + ${LIBCONFIG_CPP_LIBRARIES} + ${WSOCK32_LIBRARY} + ${WS2_32_LIBRARY} + ) + + build_executable(hiperf + SOURCES ${HIPERF_SRC} + LINK_LIBRARIES ${HIPERF_LIBRARIES} + INCLUDE_DIRS + ${CMAKE_CURRENT_SOURCE_DIR}/src + ${LIBTRANSPORT_INCLUDE_DIRS} + ${LIBHICNCTRL_INCLUDE_DIRS} + ${LIBCONFIG_CPP_INCLUDE_DIRS} + DEPENDS ${DEPENDENCIES} + COMPONENT ${HICN_APPS} + LINK_FLAGS ${LINK_FLAGS} + ) +endif()
\ No newline at end of file diff --git a/apps/hiperf/src/client.cc b/apps/hiperf/src/client.cc new file mode 100644 index 000000000..820ebf0ce --- /dev/null +++ b/apps/hiperf/src/client.cc @@ -0,0 +1,900 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <client.h> +#include <forwarder_config.h> +#include <forwarder_interface.h> + +#include <libconfig.h++> + +namespace hiperf { + +/** + * Forward declaration of client Read callbacks. + */ +class RTCCallback; +class Callback; + +/** + * Hiperf client class: configure and setup an hicn consumer following the + * ClientConfiguration. + */ +class HIperfClient::Impl +#ifdef FORWARDER_INTERFACE + : ForwarderInterface::ICallback +#endif +{ + typedef std::chrono::time_point<std::chrono::steady_clock> Time; + typedef std::chrono::microseconds TimeDuration; + + friend class Callback; + friend class RTCCallback; + + struct nack_packet_t { + uint64_t timestamp; + uint32_t prod_rate; + uint32_t prod_seg; + + inline uint64_t getTimestamp() const { return _ntohll(×tamp); } + inline void setTimestamp(uint64_t time) { timestamp = _htonll(&time); } + + inline uint32_t getProductionRate() const { return ntohl(prod_rate); } + inline void setProductionRate(uint32_t rate) { prod_rate = htonl(rate); } + + inline uint32_t getProductionSegement() const { return ntohl(prod_seg); } + inline void setProductionSegement(uint32_t seg) { prod_seg = htonl(seg); } + }; + + public: + Impl(const hiperf::ClientConfiguration &conf) + : configuration_(conf), + total_duration_milliseconds_(0), + old_bytes_value_(0), + old_interest_tx_value_(0), + old_fec_interest_tx_value_(0), + old_fec_data_rx_value_(0), + old_lost_data_value_(0), + old_bytes_recovered_value_(0), + old_definitely_lost_data_value_(0), + old_retx_value_(0), + old_sent_int_value_(0), + old_received_nacks_value_(0), + old_fec_pkt_(0), + avg_data_delay_(0), + delay_sample_(0), + received_bytes_(0), + received_data_pkt_(0), + data_delays_(""), + signals_(io_service_), + rtc_callback_(*this), + callback_(*this), + socket_(io_service_), + done_(false), + switch_threshold_(~0) +#ifdef FORWARDER_INTERFACE + , + forwarder_interface_(io_service_, this) +#endif + { + } + + ~Impl() {} + + void checkReceivedRtcContent(ConsumerSocket &c, + const ContentObject &contentObject) {} + + void processLeavingInterest(ConsumerSocket &c, const Interest &interest) {} + + void addFace(const std::string &local_address, uint16_t local_port, + const std::string &remote_address, uint16_t remote_port, + std::string interface); + + void handleTimerExpiration(ConsumerSocket &c, + const TransportStatistics &stats) { + const char separator = ' '; + const int width = 15; + + utils::TimePoint t2 = utils::SteadyClock::now(); + auto exact_duration = + std::chrono::duration_cast<utils::Milliseconds>(t2 - t_stats_); + + std::stringstream interval; + interval << total_duration_milliseconds_ / 1000 << "-" + << total_duration_milliseconds_ / 1000 + + exact_duration.count() / 1000; + + std::stringstream bytes_transferred; + bytes_transferred << std::fixed << std::setprecision(3) + << (stats.getBytesRecv() - old_bytes_value_) / 1000000.0 + << std::setfill(separator) << "[MB]"; + + std::stringstream bandwidth; + bandwidth << ((stats.getBytesRecv() - old_bytes_value_) * 8) / + (exact_duration.count()) / 1000.0 + << std::setfill(separator) << "[Mbps]"; + + std::stringstream window; + window << stats.getAverageWindowSize() << std::setfill(separator) + << "[Int]"; + + std::stringstream avg_rtt; + avg_rtt << stats.getAverageRtt() << std::setfill(separator) << "[ms]"; + + if (configuration_.rtc_) { + // we get rtc stats more often, thus we need ms in the interval + std::stringstream interval_ms; + interval_ms << total_duration_milliseconds_ << "-" + << total_duration_milliseconds_ + exact_duration.count(); + + std::stringstream lost_data; + lost_data << stats.getLostData() - old_lost_data_value_ + << std::setfill(separator) << "[pkt]"; + + std::stringstream bytes_recovered_data; + bytes_recovered_data << stats.getBytesRecoveredData() - + old_bytes_recovered_value_ + << std::setfill(separator) << "[pkt]"; + + std::stringstream definitely_lost_data; + definitely_lost_data << stats.getDefinitelyLostData() - + old_definitely_lost_data_value_ + << std::setfill(separator) << "[pkt]"; + + std::stringstream data_delay; + data_delay << avg_data_delay_ << std::setfill(separator) << "[ms]"; + + std::stringstream received_data_pkt; + received_data_pkt << received_data_pkt_ << std::setfill(separator) + << "[pkt]"; + + std::stringstream goodput; + goodput << (received_bytes_ * 8.0) / (exact_duration.count()) / 1000.0 + << std::setfill(separator) << "[Mbps]"; + + std::stringstream loss_rate; + loss_rate << std::fixed << std::setprecision(2) + << stats.getLossRatio() * 100.0 << std::setfill(separator) + << "[%]"; + + std::stringstream retx_sent; + retx_sent << stats.getRetxCount() - old_retx_value_ + << std::setfill(separator) << "[pkt]"; + + std::stringstream interest_sent; + interest_sent << stats.getInterestTx() - old_sent_int_value_ + << std::setfill(separator) << "[pkt]"; + + std::stringstream nacks; + nacks << stats.getReceivedNacks() - old_received_nacks_value_ + << std::setfill(separator) << "[pkt]"; + + std::stringstream fec_pkt; + fec_pkt << stats.getReceivedFEC() - old_fec_pkt_ + << std::setfill(separator) << "[pkt]"; + + std::stringstream queuing_delay; + queuing_delay << stats.getQueuingDelay() << std::setfill(separator) + << "[ms]"; + +#ifdef FORWARDER_INTERFACE + if (!done_ && stats.getQueuingDelay() >= switch_threshold_ && + total_duration_milliseconds_ > 1000) { + std::cout << "Switching due to queuing delay" << std::endl; + forwarder_interface_.createFaceAndRoutes(backup_routes_); + forwarder_interface_.deleteFaceAndRoutes(main_routes_); + std::swap(backup_routes_, main_routes_); + done_ = true; + } +#endif + + // statistics not yet available in the transport + // std::stringstream interest_fec_tx; + // interest_fec_tx << stats.getInterestFecTxCount() - + // old_fec_interest_tx_value_ << std::setfill(separator) << "[pkt]"; + // std::stringstream bytes_fec_recv; + // bytes_fec_recv << stats.getBytesFecRecv() - old_fec_data_rx_value_ + // << std::setfill(separator) << "[bytes]"; + std::cout << std::left << std::setw(width) << "Interval"; + std::cout << std::left << std::setw(width) << "RecvData"; + std::cout << std::left << std::setw(width) << "Bandwidth"; + std::cout << std::left << std::setw(width) << "Goodput"; + std::cout << std::left << std::setw(width) << "LossRate"; + std::cout << std::left << std::setw(width) << "Retr"; + std::cout << std::left << std::setw(width) << "InterestSent"; + std::cout << std::left << std::setw(width) << "ReceivedNacks"; + std::cout << std::left << std::setw(width) << "SyncWnd"; + std::cout << std::left << std::setw(width) << "MinRtt"; + std::cout << std::left << std::setw(width) << "QueuingDelay"; + std::cout << std::left << std::setw(width) << "LostData"; + std::cout << std::left << std::setw(width) << "RecoveredData"; + std::cout << std::left << std::setw(width) << "DefinitelyLost"; + std::cout << std::left << std::setw(width) << "State"; + std::cout << std::left << std::setw(width) << "DataDelay"; + std::cout << std::left << std::setw(width) << "FecPkt" << std::endl; + + std::cout << std::left << std::setw(width) << interval_ms.str(); + std::cout << std::left << std::setw(width) << received_data_pkt.str(); + std::cout << std::left << std::setw(width) << bandwidth.str(); + std::cout << std::left << std::setw(width) << goodput.str(); + std::cout << std::left << std::setw(width) << loss_rate.str(); + std::cout << std::left << std::setw(width) << retx_sent.str(); + std::cout << std::left << std::setw(width) << interest_sent.str(); + std::cout << std::left << std::setw(width) << nacks.str(); + std::cout << std::left << std::setw(width) << window.str(); + std::cout << std::left << std::setw(width) << avg_rtt.str(); + std::cout << std::left << std::setw(width) << queuing_delay.str(); + std::cout << std::left << std::setw(width) << lost_data.str(); + std::cout << std::left << std::setw(width) << bytes_recovered_data.str(); + std::cout << std::left << std::setw(width) << definitely_lost_data.str(); + std::cout << std::left << std::setw(width) << stats.getCCStatus(); + std::cout << std::left << std::setw(width) << data_delay.str(); + std::cout << std::left << std::setw(width) << fec_pkt.str(); + std::cout << std::endl; + + if (configuration_.test_mode_) { + if (data_delays_.size() > 0) data_delays_.pop_back(); + + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + std::cout << now << " DATA-DELAYS:[" << data_delays_ << "]" + << std::endl; + } + + // statistics not yet available in the transport + // std::cout << std::left << std::setw(width) << interest_fec_tx.str(); + // std::cout << std::left << std::setw(width) << bytes_fec_recv.str(); + } else { + std::cout << std::left << std::setw(width) << "Interval"; + std::cout << std::left << std::setw(width) << "Transfer"; + std::cout << std::left << std::setw(width) << "Bandwidth"; + std::cout << std::left << std::setw(width) << "Retr"; + std::cout << std::left << std::setw(width) << "Cwnd"; + std::cout << std::left << std::setw(width) << "AvgRtt" << std::endl; + + std::cout << std::left << std::setw(width) << interval.str(); + std::cout << std::left << std::setw(width) << bytes_transferred.str(); + std::cout << std::left << std::setw(width) << bandwidth.str(); + std::cout << std::left << std::setw(width) << stats.getRetxCount(); + std::cout << std::left << std::setw(width) << window.str(); + std::cout << std::left << std::setw(width) << avg_rtt.str() << std::endl; + std::cout << std::endl; + } + total_duration_milliseconds_ += (uint32_t)exact_duration.count(); + old_bytes_value_ = stats.getBytesRecv(); + old_lost_data_value_ = stats.getLostData(); + old_bytes_recovered_value_ = stats.getBytesRecoveredData(); + old_definitely_lost_data_value_ = stats.getDefinitelyLostData(); + old_fec_interest_tx_value_ = stats.getInterestFecTxCount(); + old_fec_data_rx_value_ = stats.getBytesFecRecv(); + old_retx_value_ = stats.getRetxCount(); + old_sent_int_value_ = stats.getInterestTx(); + old_received_nacks_value_ = stats.getReceivedNacks(); + old_fec_pkt_ = stats.getReceivedFEC(); + delay_sample_ = 0; + avg_data_delay_ = 0; + received_bytes_ = 0; + received_data_pkt_ = 0; + data_delays_ = ""; + + t_stats_ = utils::SteadyClock::now(); + } + + bool parseConfig(const char *conf_file) { + using namespace libconfig; + Config cfg; + + try { + cfg.readFile(conf_file); + } catch (const FileIOException &fioex) { + std::cerr << "I/O error while reading file." << std::endl; + return false; + } catch (const ParseException &pex) { + std::cerr << "Parse error at " << pex.getFile() << ":" << pex.getLine() + << " - " << pex.getError() << std::endl; + return false; + } + + Setting &config = cfg.getRoot(); + + if (config.exists("switch_threshold")) { + unsigned threshold; + config.lookupValue("switch_threshold", threshold); + switch_threshold_ = threshold; + } + + // listeners + if (config.exists("listeners")) { + // get path where looking for modules + const Setting &listeners = config.lookup("listeners"); + auto count = listeners.getLength(); + + for (int i = 0; i < count; i++) { + const Setting &listener = listeners[i]; + ListenerConfig list; + unsigned port; + std::string interface; + + list.name = listener.getName(); + listener.lookupValue("local_address", list.address); + listener.lookupValue("local_port", port); + listener.lookupValue("interface", list.interface); + list.port = (uint16_t)(port); + + std::cout << "Adding listener " << list.name << ", ( " << list.address + << ":" << list.port << ")" << std::endl; + config_.addListener(std::move(list)); + } + } + + // connectors + if (config.exists("connectors")) { + // get path where looking for modules + const Setting &connectors = config.lookup("connectors"); + auto count = connectors.getLength(); + + for (int i = 0; i < count; i++) { + const Setting &connector = connectors[i]; + ConnectorConfig conn; + + conn.name = connector.getName(); + unsigned port = 0; + + if (!connector.lookupValue("local_address", conn.local_address)) { + conn.local_address = ""; + } + + if (!connector.lookupValue("local_port", port)) { + port = 0; + } + + conn.local_port = (uint16_t)(port); + + if (!connector.lookupValue("remote_address", conn.remote_address)) { + std::cerr + << "Error in configuration file: remote_address is a mandatory " + "field of Connectors." + << std::endl; + return false; + } + + if (!connector.lookupValue("remote_port", port)) { + std::cerr << "Error in configuration file: remote_port is a " + "mandatory field of Connectors." + << std::endl; + return false; + } + + if (!connector.lookupValue("interface", conn.interface)) { + std::cerr << "Error in configuration file: interface is a " + "mandatory field of Connectors." + << std::endl; + return false; + } + + conn.remote_port = (uint16_t)(port); + + std::cout << "Adding connector " << conn.name << ", (" + << conn.local_address << ":" << conn.local_port << " " + << conn.remote_address << ":" << conn.remote_port << ")" + << std::endl; + config_.addConnector(conn.name, std::move(conn)); + } + } + + // Routes + if (config.exists("routes")) { + const Setting &routes = config.lookup("routes"); + auto count = routes.getLength(); + + for (int i = 0; i < count; i++) { + const Setting &route = routes[i]; + RouteConfig r; + unsigned weight; + + r.name = route.getName(); + route.lookupValue("prefix", r.prefix); + route.lookupValue("weight", weight); + route.lookupValue("main_connector", r.main_connector); + route.lookupValue("backup_connector", r.backup_connector); + r.weight = (uint16_t)(weight); + + std::cout << "Adding route " << r.name << " " << r.prefix << " (" + << r.main_connector << " " << r.backup_connector << " " + << r.weight << ")" << std::endl; + config_.addRoute(std::move(r)); + } + } + + std::cout << "Ok" << std::endl; + + return true; + } + + bool splitRoute(std::string route, std::string &prefix, + uint8_t &prefix_length) { + std::string delimiter = "/"; + + size_t pos = 0; + if ((pos = route.find(delimiter)) != std::string::npos) { + prefix = route.substr(0, pos); + route.erase(0, pos + delimiter.length()); + } else { + return false; + } + + prefix_length = std::stoul(route.substr(0)); + return true; + } + +#ifdef FORWARDER_INTERFACE + void onHicnServiceReady() override { + std::cout << "Successfully connected to local forwarder!" << std::endl; + + std::cout << "Setting up listeners" << std::endl; + const char *config = getenv("FORWARDER_CONFIG"); + + if (config) { + if (!parseConfig(config)) { + return; + } + + // Create faces and route using first face in the list. + auto &routes = config_.getRoutes(); + auto &connectors = config_.getConnectors(); + + if (routes.size() == 0 || connectors.size() == 0) { + std::cerr << "Nothing to configure" << std::endl; + return; + } + + for (auto &route : routes) { + auto the_connector_it = connectors.find(route.main_connector); + if (the_connector_it == connectors.end()) { + std::cerr << "No valid main connector found for route " << route.name + << std::endl; + continue; + } + + auto &the_connector = the_connector_it->second; + auto route_info = std::make_shared<ForwarderInterface::RouteInfo>(); + route_info->family = AF_INET; + route_info->local_addr = the_connector.local_address; + route_info->local_port = the_connector.local_port; + route_info->remote_addr = the_connector.remote_address; + route_info->remote_port = the_connector.remote_port; + route_info->interface = the_connector.interface; + route_info->name = the_connector.name; + + std::string prefix; + uint8_t prefix_length; + auto ret = splitRoute(route.prefix, prefix, prefix_length); + + if (!ret) { + std::cerr << "Error parsing route" << std::endl; + return; + } + + route_info->route_addr = prefix; + route_info->route_len = prefix_length; + + main_routes_.emplace_back(route_info); + + if (!route.backup_connector.empty()) { + // Add also the backup route + auto the_backup_connector_it = + connectors.find(route.backup_connector); + if (the_backup_connector_it == connectors.end()) { + std::cerr << "No valid backup connector found for route " + << route.name << std::endl; + continue; + } + + auto &the_backup_connector = the_backup_connector_it->second; + auto backup_route_info = + std::make_shared<ForwarderInterface::RouteInfo>(); + backup_route_info->family = AF_INET; + backup_route_info->local_addr = the_backup_connector.local_address; + backup_route_info->local_port = the_backup_connector.local_port; + backup_route_info->remote_addr = the_backup_connector.remote_address; + backup_route_info->remote_port = the_backup_connector.remote_port; + backup_route_info->interface = the_backup_connector.interface; + backup_route_info->name = the_backup_connector.name; + + std::string prefix; + uint8_t prefix_length; + auto ret = splitRoute(route.prefix, prefix, prefix_length); + + if (!ret) { + std::cerr << "Error parsing route" << std::endl; + return; + } + + backup_route_info->route_addr = prefix; + backup_route_info->route_len = prefix_length; + + backup_routes_.emplace_back(backup_route_info); + } + } + + // Create main routes + std::cout << "Creating main routes" << std::endl; + forwarder_interface_.createFaceAndRoutes(main_routes_); + } + } + + void onRouteConfigured( + std::vector<ForwarderInterface::RouteInfoPtr> &route_info) override { + std::cout << "Routes successfully configured!" << std::endl; + } +#endif + + int setup() { + int ret; + + if (configuration_.rtc_) { + configuration_.transport_protocol_ = RTC; + } else if (configuration_.window < 0) { + configuration_.transport_protocol_ = RAAQM; + } else { + configuration_.transport_protocol_ = CBR; + } + + if (configuration_.relay_ && configuration_.rtc_) { + int production_protocol = ProductionProtocolAlgorithms::RTC_PROD; + producer_socket_ = std::make_unique<ProducerSocket>(production_protocol); + producer_socket_->registerPrefix(configuration_.relay_name_); + producer_socket_->connect(); + } + + if (configuration_.output_stream_mode_ && configuration_.rtc_) { + remote_ = asio::ip::udp::endpoint( + asio::ip::address::from_string("127.0.0.1"), configuration_.port_); + socket_.open(asio::ip::udp::v4()); + } + + if (configuration_.secure_) { + consumer_socket_ = std::make_unique<P2PSecureConsumerSocket>( + RAAQM, configuration_.transport_protocol_); + if (configuration_.producer_prefix_.getPrefixLength() == 0) { + std::cerr << "ERROR -- Missing producer prefix on which perform the " + "handshake." + << std::endl; + } else { + P2PSecureConsumerSocket &secure_consumer_socket = + *(static_cast<P2PSecureConsumerSocket *>(consumer_socket_.get())); + secure_consumer_socket.registerPrefix(configuration_.producer_prefix_); + } + } else { + consumer_socket_ = + std::make_unique<ConsumerSocket>(configuration_.transport_protocol_); + } + + consumer_socket_->setSocketOption( + GeneralTransportOptions::INTEREST_LIFETIME, + configuration_.interest_lifetime_); + +#if defined(DEBUG) && defined(__linux__) + std::shared_ptr<transport::BasePortal> portal; + consumer_socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal); + signals_ = + std::make_unique<asio::signal_set>(portal->getIoService(), SIGUSR1); + signals_->async_wait([this](const std::error_code &, const int &) { + std::cout << "Signal SIGUSR1!" << std::endl; + mtrace(); + }); +#endif + + if (consumer_socket_->setSocketOption(CURRENT_WINDOW_SIZE, + configuration_.window) == + SOCKET_OPTION_NOT_SET) { + std::cerr << "ERROR -- Impossible to set the size of the window." + << std::endl; + return ERROR_SETUP; + } + + if (configuration_.transport_protocol_ == RAAQM && + configuration_.beta != -1.f) { + if (consumer_socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE, + configuration_.beta) == + SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + } + + if (configuration_.transport_protocol_ == RAAQM && + configuration_.drop_factor != -1.f) { + if (consumer_socket_->setSocketOption(RaaqmTransportOptions::DROP_FACTOR, + configuration_.drop_factor) == + SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + } + + if (!configuration_.producer_certificate.empty()) { + std::shared_ptr<Verifier> verifier = std::make_shared<AsymmetricVerifier>( + configuration_.producer_certificate); + if (consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER, + verifier) == SOCKET_OPTION_NOT_SET) + return ERROR_SETUP; + } + + if (!configuration_.passphrase.empty()) { + std::shared_ptr<Verifier> verifier = + std::make_shared<SymmetricVerifier>(configuration_.passphrase); + if (consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER, + verifier) == SOCKET_OPTION_NOT_SET) + return ERROR_SETUP; + } + + ret = consumer_socket_->setSocketOption( + ConsumerCallbacksOptions::INTEREST_OUTPUT, + (ConsumerInterestCallback)std::bind(&Impl::processLeavingInterest, this, + std::placeholders::_1, + std::placeholders::_2)); + + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + if (!configuration_.rtc_) { + ret = consumer_socket_->setSocketOption( + ConsumerCallbacksOptions::READ_CALLBACK, &callback_); + } else { + ret = consumer_socket_->setSocketOption( + ConsumerCallbacksOptions::READ_CALLBACK, &rtc_callback_); + } + + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + if (configuration_.rtc_) { + ret = consumer_socket_->setSocketOption( + ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT, + (ConsumerContentObjectCallback)std::bind( + &Impl::checkReceivedRtcContent, this, std::placeholders::_1, + std::placeholders::_2)); + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + } + + if (configuration_.rtc_) { + std::shared_ptr<TransportStatistics> transport_stats; + consumer_socket_->getSocketOption( + OtherOptions::STATISTICS, (TransportStatistics **)&transport_stats); + transport_stats->setAlpha(0.0); + } + + ret = consumer_socket_->setSocketOption( + ConsumerCallbacksOptions::STATS_SUMMARY, + (ConsumerTimerCallback)std::bind(&Impl::handleTimerExpiration, this, + std::placeholders::_1, + std::placeholders::_2)); + + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + if (consumer_socket_->setSocketOption( + GeneralTransportOptions::STATS_INTERVAL, + configuration_.report_interval_milliseconds_) == + SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + consumer_socket_->connect(); + + return ERROR_SUCCESS; + } + + int run() { + std::cout << "Starting download of " << configuration_.name << std::endl; + + signals_.add(SIGINT); + signals_.async_wait( + [this](const std::error_code &, const int &) { io_service_.stop(); }); + + t_download_ = t_stats_ = std::chrono::steady_clock::now(); + consumer_socket_->asyncConsume(configuration_.name); + io_service_.run(); + + consumer_socket_->stop(); + + return ERROR_SUCCESS; + } + + private: + class RTCCallback : public ConsumerSocket::ReadCallback { + static constexpr std::size_t mtu = 1500; + + public: + RTCCallback(Impl &hiperf_client) : client_(hiperf_client) { + client_.configuration_.receive_buffer = utils::MemBuf::create(mtu); + } + + bool isBufferMovable() noexcept override { return false; } + + void getReadBuffer(uint8_t **application_buffer, + size_t *max_length) override { + *application_buffer = + client_.configuration_.receive_buffer->writableData(); + *max_length = mtu; + } + + void readDataAvailable(std::size_t length) noexcept override { + client_.received_bytes_ += length; + client_.received_data_pkt_++; + + // collecting delay stats. Just for performance testing + uint64_t *senderTimeStamp = + (uint64_t *)client_.configuration_.receive_buffer->writableData(); + + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + double new_delay = (double)(now - *senderTimeStamp); + + if (*senderTimeStamp > now) + new_delay = -1 * (double)(*senderTimeStamp - now); + + client_.delay_sample_++; + client_.avg_data_delay_ = + client_.avg_data_delay_ + + (new_delay - client_.avg_data_delay_) / client_.delay_sample_; + + if (client_.configuration_.test_mode_) { + client_.data_delays_ += std::to_string(int(new_delay)); + client_.data_delays_ += ","; + } + + if (client_.configuration_.relay_) { + client_.producer_socket_->produceDatagram( + client_.configuration_.relay_name_.getName(), + client_.configuration_.receive_buffer->writableData(), + length < 1400 ? length : 1400); + } + if (client_.configuration_.output_stream_mode_) { + uint8_t *start = + (uint8_t *)client_.configuration_.receive_buffer->writableData(); + start += sizeof(uint64_t); + std::size_t pkt_len = length - sizeof(uint64_t); + client_.socket_.send_to(asio::buffer(start, pkt_len), client_.remote_); + } + } + + size_t maxBufferSize() const override { return mtu; } + + void readError(const std::error_code ec) noexcept override { + std::cerr << "Error while reading from RTC socket" << std::endl; + client_.io_service_.stop(); + } + + void readSuccess(std::size_t total_size) noexcept override { + std::cout << "Data successfully read" << std::endl; + } + + private: + Impl &client_; + }; + + class Callback : public ConsumerSocket::ReadCallback { + public: + Callback(Impl &hiperf_client) : client_(hiperf_client) { + client_.configuration_.receive_buffer = + utils::MemBuf::create(client_.configuration_.receive_buffer_size_); + } + + bool isBufferMovable() noexcept override { return false; } + + void getReadBuffer(uint8_t **application_buffer, + size_t *max_length) override { + *application_buffer = + client_.configuration_.receive_buffer->writableData(); + *max_length = client_.configuration_.receive_buffer_size_; + } + + void readDataAvailable(std::size_t length) noexcept override {} + + void readBufferAvailable( + std::unique_ptr<utils::MemBuf> &&buffer) noexcept override {} + + size_t maxBufferSize() const override { + return client_.configuration_.receive_buffer_size_; + } + + void readError(const std::error_code ec) noexcept override { + std::cerr << "Error " << ec.message() << " while reading from socket" + << std::endl; + client_.io_service_.stop(); + } + + void readSuccess(std::size_t total_size) noexcept override { + Time t2 = std::chrono::steady_clock::now(); + TimeDuration dt = + std::chrono::duration_cast<TimeDuration>(t2 - client_.t_download_); + long usec = (long)dt.count(); + + std::cout << "Content retrieved. Size: " << total_size << " [Bytes]" + << std::endl; + + std::cerr << "Elapsed Time: " << usec / 1000000.0 << " seconds -- " + << (total_size * 8) * 1.0 / usec * 1.0 << " [Mbps]" + << std::endl; + + client_.io_service_.stop(); + } + + private: + Impl &client_; + }; + + hiperf::ClientConfiguration configuration_; + Time t_stats_; + Time t_download_; + uint32_t total_duration_milliseconds_; + uint64_t old_bytes_value_; + uint64_t old_interest_tx_value_; + uint64_t old_fec_interest_tx_value_; + uint64_t old_fec_data_rx_value_; + uint64_t old_lost_data_value_; + uint64_t old_bytes_recovered_value_; + uint64_t old_definitely_lost_data_value_; + uint32_t old_retx_value_; + uint32_t old_sent_int_value_; + uint32_t old_received_nacks_value_; + uint32_t old_fec_pkt_; + + // IMPORTANT: to be used only for performance testing, when consumer and + // producer are synchronized. Used for rtc only at the moment + double avg_data_delay_; + uint32_t delay_sample_; + + uint32_t received_bytes_; + uint32_t received_data_pkt_; + + std::string data_delays_; + + asio::io_service io_service_; + asio::signal_set signals_; + RTCCallback rtc_callback_; + Callback callback_; + std::unique_ptr<ConsumerSocket> consumer_socket_; + std::unique_ptr<ProducerSocket> producer_socket_; + asio::ip::udp::socket socket_; + asio::ip::udp::endpoint remote_; + + ForwarderConfiguration config_; + uint16_t switch_threshold_; /* ms */ + bool done_; + std::vector<ForwarderInterface::RouteInfoPtr> main_routes_; + std::vector<ForwarderInterface::RouteInfoPtr> backup_routes_; +#ifdef FORWARDER_INTERFACE + ForwarderInterface forwarder_interface_; +#endif +}; + +HIperfClient::HIperfClient(const ClientConfiguration &conf) { + impl_ = new Impl(conf); +} + +HIperfClient::~HIperfClient() { delete impl_; } + +int HIperfClient::setup() { return impl_->setup(); } + +void HIperfClient::run() { impl_->run(); } + +} // namespace hiperf diff --git a/apps/hiperf/src/client.h b/apps/hiperf/src/client.h new file mode 100644 index 000000000..f45b9af43 --- /dev/null +++ b/apps/hiperf/src/client.h @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <common.h> + +namespace hiperf { + +class HIperfClient { + public: + HIperfClient(const ClientConfiguration &conf); + ~HIperfClient(); + int setup(); + void run(); + + private: + class Impl; + Impl *impl_; +}; + +} // namespace hiperf
\ No newline at end of file diff --git a/apps/hiperf/src/common.h b/apps/hiperf/src/common.h new file mode 100644 index 000000000..e6ba526f9 --- /dev/null +++ b/apps/hiperf/src/common.h @@ -0,0 +1,217 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/auth/identity.h> +#include <hicn/transport/auth/signer.h> +#include <hicn/transport/config.h> +#include <hicn/transport/core/content_object.h> +#include <hicn/transport/core/interest.h> +#include <hicn/transport/interfaces/global_conf_interface.h> +#include <hicn/transport/interfaces/p2psecure_socket_consumer.h> +#include <hicn/transport/interfaces/p2psecure_socket_producer.h> +#include <hicn/transport/interfaces/socket_consumer.h> +#include <hicn/transport/interfaces/socket_producer.h> +#include <hicn/transport/utils/chrono_typedefs.h> +#include <hicn/transport/utils/literals.h> + +#ifndef _WIN32 +#include <hicn/transport/utils/daemonizator.h> +#endif + +#include <asio.hpp> +#include <cmath> +#include <fstream> +#include <iomanip> +#include <sstream> +#include <string> +#include <unordered_set> + +#ifndef ERROR_SUCCESS +#define ERROR_SUCCESS 0 +#endif +#define ERROR_SETUP -5 +#define MIN_PROBE_SEQ 0xefffffff + +using namespace transport::interface; +using namespace transport::auth; +using namespace transport::core; + +static inline uint64_t _ntohll(const uint64_t *input) { + uint64_t return_val; + uint8_t *tmp = (uint8_t *)&return_val; + + tmp[0] = *input >> 56; + tmp[1] = *input >> 48; + tmp[2] = *input >> 40; + tmp[3] = *input >> 32; + tmp[4] = *input >> 24; + tmp[5] = *input >> 16; + tmp[6] = *input >> 8; + tmp[7] = *input >> 0; + + return return_val; +} + +static inline uint64_t _htonll(const uint64_t *input) { + return (_ntohll(input)); +} + +namespace hiperf { + +/** + * Class for handling the production rate for the RTC producer. + */ +class Rate { + public: + Rate() : rate_kbps_(0) {} + + Rate(const std::string &rate) { + std::size_t found = rate.find("kbps"); + if (found != std::string::npos) { + rate_kbps_ = std::stof(rate.substr(0, found)); + } else { + throw std::runtime_error("Format " + rate + " not correct"); + } + } + + Rate(const Rate &other) : rate_kbps_(other.rate_kbps_) {} + + Rate &operator=(const std::string &rate) { + std::size_t found = rate.find("kbps"); + if (found != std::string::npos) { + rate_kbps_ = std::stof(rate.substr(0, found)); + } else { + throw std::runtime_error("Format " + rate + " not correct"); + } + + return *this; + } + + std::chrono::microseconds getMicrosecondsForPacket(std::size_t packet_size) { + return std::chrono::microseconds( + (uint32_t)std::round(packet_size * 1000.0 * 8.0 / (double)rate_kbps_)); + } + + private: + float rate_kbps_; +}; + +struct packet_t { + uint64_t timestamp; + uint32_t size; +}; + +/** + * Container for command line configuration for hiperf client. + */ +struct ClientConfiguration { + ClientConfiguration() + : name("b001::abcd", 0), + beta(-1.f), + drop_factor(-1.f), + window(-1), + producer_certificate(""), + passphrase(""), + receive_buffer(nullptr), + receive_buffer_size_(128 * 1024), + download_size(0), + report_interval_milliseconds_(1000), + transport_protocol_(CBR), + rtc_(false), + test_mode_(false), + relay_(false), + secure_(false), + producer_prefix_(), + interest_lifetime_(500), + relay_name_("c001::abcd/64"), + output_stream_mode_(false), + port_(0) {} + + Name name; + double beta; + double drop_factor; + double window; + std::string producer_certificate; + std::string passphrase; + std::shared_ptr<utils::MemBuf> receive_buffer; + std::size_t receive_buffer_size_; + std::size_t download_size; + std::uint32_t report_interval_milliseconds_; + TransportProtocolAlgorithms transport_protocol_; + bool rtc_; + bool test_mode_; + bool relay_; + bool secure_; + Prefix producer_prefix_; + uint32_t interest_lifetime_; + Prefix relay_name_; + bool output_stream_mode_; + uint16_t port_; +}; + +/** + * Container for command line configuration for hiperf server. + */ +struct ServerConfiguration { + ServerConfiguration() + : name("b001::abcd/64"), + virtual_producer(true), + manifest(false), + live_production(false), + content_lifetime(600000000_U32), + download_size(20 * 1024 * 1024), + hash_algorithm(CryptoHashType::SHA256), + keystore_name(""), + passphrase(""), + keystore_password("cisco"), + multiphase_produce_(false), + rtc_(false), + interactive_(false), + trace_based_(false), + trace_index_(0), + trace_file_(nullptr), + production_rate_(std::string("2048kbps")), + payload_size_(1400), + secure_(false), + input_stream_mode_(false), + port_(0) {} + + Prefix name; + bool virtual_producer; + bool manifest; + bool live_production; + std::uint32_t content_lifetime; + std::uint32_t download_size; + CryptoHashType hash_algorithm; + std::string keystore_name; + std::string passphrase; + std::string keystore_password; + bool multiphase_produce_; + bool rtc_; + bool interactive_; + bool trace_based_; + std::uint32_t trace_index_; + char *trace_file_; + Rate production_rate_; + std::size_t payload_size_; + bool secure_; + bool input_stream_mode_; + uint16_t port_; + std::vector<struct packet_t> trace_; +}; + +} // namespace hiperf diff --git a/apps/hiperf/src/forwarder_config.h b/apps/hiperf/src/forwarder_config.h new file mode 100644 index 000000000..655ac3b66 --- /dev/null +++ b/apps/hiperf/src/forwarder_config.h @@ -0,0 +1,97 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <string> +#include <unordered_map> +#include <vector> + +namespace hiperf { + +struct ListenerConfig { + std::string address; + std::uint16_t port; + std::string interface; + std::string name; +}; + +struct ConnectorConfig { + std::string local_address; + std::uint16_t local_port; + std::string remote_address; + std::uint16_t remote_port; + std::string interface; + std::string name; +}; + +struct RouteConfig { + std::string prefix; + uint16_t weight; + std::string main_connector; + std::string backup_connector; + std::string name; +}; + +class ForwarderConfiguration { + public: + ForwarderConfiguration() : n_threads_(1) {} + + bool empty() { + return listeners_.empty() && connectors_.empty() && routes_.empty(); + } + + ForwarderConfiguration &setThreadNumber(std::size_t threads) { + n_threads_ = threads; + return *this; + } + + std::size_t getThreadNumber() { return n_threads_; } + + template <typename... Args> + ForwarderConfiguration &addListener(Args &&...args) { + listeners_.emplace_back(std::forward<Args>(args)...); + return *this; + } + + template <typename... Args> + ForwarderConfiguration &addConnector(const std::string &name, + Args &&...args) { + connectors_.emplace(name, std::forward<Args>(args)...); + return *this; + } + + template <typename... Args> + ForwarderConfiguration &addRoute(Args &&...args) { + routes_.emplace_back(std::forward<Args>(args)...); + return *this; + } + + std::vector<ListenerConfig> &getListeners() { return listeners_; } + + std::unordered_map<std::string, ConnectorConfig> &getConnectors() { + return connectors_; + } + + std::vector<RouteConfig> &getRoutes() { return routes_; } + + private: + std::vector<ListenerConfig> listeners_; + std::unordered_map<std::string, ConnectorConfig> connectors_; + std::vector<RouteConfig> routes_; + std::size_t n_threads_; +}; + +}
\ No newline at end of file diff --git a/apps/hiperf/src/forwarder_interface.cc b/apps/hiperf/src/forwarder_interface.cc new file mode 100644 index 000000000..864208239 --- /dev/null +++ b/apps/hiperf/src/forwarder_interface.cc @@ -0,0 +1,676 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <arpa/inet.h> +#include <forwarder_interface.h> +#include <hicn/transport/utils/log.h> + +#include <chrono> +#include <iostream> +#include <thread> +#include <unordered_set> + +extern "C" { +#include <hicn/error.h> +#include <hicn/util/ip_address.h> +} + +// XXX the main listener should be retrieve in this class at initialization, aka +// when hICN becomes avialable +// +// XXX the main listener port will be retrieved in the forwarder +// interface... everything else will be delayed until we have this +// information + +namespace hiperf { + +ForwarderInterface::ForwarderInterface(asio::io_service &io_service, + ICallback *callback) + : external_ioservice_(io_service), + forwarder_interface_callback_(callback), + work_(std::make_unique<asio::io_service::work>(internal_ioservice_)), + sock_(nullptr), + thread_(std::make_unique<std::thread>([this]() { + std::cout << "Starting Forwarder Interface thread" << std::endl; + internal_ioservice_.run(); + std::cout << "Stopping Forwarder Interface thread" << std::endl; + })), + // set_route_callback_(std::forward<Callback &&>(setRouteCallback)), + check_routes_timer_(nullptr), + pending_add_route_counter_(0), + hicn_listen_port_(9695), + /* We start in disabled state even when a forwarder is always available */ + state_(State::Disabled), + timer_(io_service), + num_reattempts(0) { + std::cout << "Forwarder interface created... connecting to forwarder...\n"; + internal_ioservice_.post([this]() { onHicnServiceAvailable(true); }); +} + +ForwarderInterface::~ForwarderInterface() { + if (thread_ && thread_->joinable()) { + internal_ioservice_.dispatch([this]() { + if (sock_) { + hc_sock_free(sock_); + sock_ = nullptr; + } + + work_.reset(); + }); + + thread_->join(); + } + + std::cout << "ForwarderInterface::~ForwarderInterface" << std::endl; +} + +void ForwarderInterface::onHicnServiceAvailable(bool flag) { + if (flag) { + switch (state_) { + case State::Disabled: + case State::Requested: + state_ = State::Available; + case State::Available: + connectToForwarder(); + /* Synchronous */ + if (state_ != State::Connected) { + std::cout << "ConnectToForwarder failed" << std::endl; + goto REATTEMPT; + } + state_ = State::Ready; + + std::cout << "Connected to forwarder... cancelling reconnection timer" + << std::endl; + timer_.cancel(); + num_reattempts = 0; + + // case State::Connected: + // checkListener(); + + // if (state_ != State::Ready) { + // std::cout << "Listener not found" << std::endl; + // goto REATTEMPT; + // } + // state_ = State::Ready; + + // timer_.cancel(); + // num_reattempts = 0; + + std::cout << "Forwarder interface is ready... communicate to controller" + << std::endl; + + forwarder_interface_callback_->onHicnServiceReady(); + + case State::Ready: + break; + } + } else { + if (sock_) { + hc_sock_free(sock_); + sock_ = nullptr; + } + state_ = State::Disabled; // XXX to be checked upon callback to prevent the + // state from going forward (used to manage + // concurrency) + } + return; + +REATTEMPT: + /* Schedule reattempt */ + std::cout << "Failed to connect, scheduling reattempt" << std::endl; + num_reattempts++; + + timer_.expires_from_now( + std::chrono::milliseconds(ForwarderInterface::REATTEMPT_DELAY_MS)); + // timer_.async_wait(std::bind(&ForwarderInterface::onHicnServiceAvailable, + // this, flag, std::placeholders::_1)); + timer_.async_wait([this, flag](std::error_code ec) { + if (ec) return; + onHicnServiceAvailable(flag); + }); +} + +int ForwarderInterface::connectToForwarder() { + sock_ = hc_sock_create(); + if (!sock_) { + std::cout << "Could not create socket" << std::endl; + goto ERR_SOCK; + } + + if (hc_sock_connect(sock_) < 0) { + std::cout << "Could not connect to forwarder" << std::endl; + goto ERR; + } + + std::cout << "Forwarder interface connected" << std::endl; + state_ = State::Connected; + return 0; + +ERR: + hc_sock_free(sock_); + sock_ = nullptr; +ERR_SOCK: + return -1; +} + +int ForwarderInterface::checkListener() { + if (!sock_) return -1; + + hc_data_t *data; + if (hc_listener_list(sock_, &data) < 0) return -1; + + int ret = -1; + foreach_listener(l, data) { + std::string interface = std::string(l->interface_name); + if (interface.compare("lo") != 0) { + hicn_listen_port_ = l->local_port; + state_ = State::Ready; + ret = 0; + std::cout << "Got listener port" << std::endl; + break; + } + } + + hc_data_free(data); + return ret; +} + +void ForwarderInterface::close() { + std::cout << "ForwarderInterface::close" << std::endl; + + state_ = State::Disabled; + /* Cancelling eventual reattempts */ + timer_.cancel(); + + if (sock_) { + hc_sock_free(sock_); + sock_ = nullptr; + } + + internal_ioservice_.post([this]() { work_.reset(); }); + + if (thread_->joinable()) { + thread_->join(); + } +} + +#if 0 +void ForwarderInterface::enableCheckRoutesTimer() { + if (check_routes_timer_ != nullptr) return; + + check_routes_timer_ = + std::make_unique<asio::steady_timer>(internal_ioservice_); + checkRoutesLoop(); +} + +void ForwarderInterface::removeConnectedUserNow(ProtocolPtr protocol) { + internalRemoveConnectedUser(protocol); +} + +void ForwarderInterface::scheduleRemoveConnectedUser(ProtocolPtr protocol) { + internal_ioservice_.post( + [this, protocol]() { internalRemoveConnectedUser(protocol); }); +} +#endif + +void ForwarderInterface::createFaceAndRoute(const RouteInfoPtr &route_info) { + std::vector<RouteInfoPtr> routes; + routes.push_back(std::move(route_info)); + createFaceAndRoutes(routes); +} + +void ForwarderInterface::createFaceAndRoutes( + const std::vector<RouteInfoPtr> &routes_info) { + pending_add_route_counter_++; + auto timer = new asio::steady_timer(internal_ioservice_); + internal_ioservice_.post([this, routes_info, timer]() { + internalCreateFaceAndRoutes(routes_info, ForwarderInterface::MAX_REATTEMPT, + timer); + }); +} + +void ForwarderInterface::deleteFaceAndRoute(const RouteInfoPtr &route_info) { + std::vector<RouteInfoPtr> routes; + routes.push_back(std::move(route_info)); + deleteFaceAndRoutes(routes); +} + +void ForwarderInterface::deleteFaceAndRoutes( + const std::vector<RouteInfoPtr> &routes_info) { + internal_ioservice_.post([this, routes_info]() { + for (auto &route : routes_info) { + internalDeleteFaceAndRoute(route); + } + }); +} + +void ForwarderInterface::internalDeleteFaceAndRoute( + const RouteInfoPtr &route_info) { + if (!sock_) return; + + hc_data_t *data; + if (hc_route_list(sock_, &data) < 0) return; + + std::vector<hc_route_t *> routes_to_remove; + foreach_route(r, data) { + char remote_addr[INET6_ADDRSTRLEN]; + int ret = ip_address_ntop(&r->remote_addr, remote_addr, r->len, r->family); + if (ret < 0) continue; + + std::string route_addr(remote_addr); + if (route_addr.compare(route_info->route_addr) == 0 && + r->len == route_info->route_len) { + // route found + routes_to_remove.push_back(r); + } + } + + if (routes_to_remove.size() == 0) { + // nothing to do here + hc_data_free(data); + return; + } + + std::unordered_set<uint32_t> connids_to_remove; + for (unsigned i = 0; i < routes_to_remove.size(); i++) { + connids_to_remove.insert(routes_to_remove[i]->face_id); + if (hc_route_delete(sock_, routes_to_remove[i]) < 0) { + std::cout << "Error removing route from forwarder." << std::endl; + } + } + + // remove connection + if (hc_connection_list(sock_, &data) < 0) { + hc_data_free(data); + return; + } + + // collects pointerst to the connections using the conn IDs + std::vector<hc_connection_t *> conns_to_remove; + foreach_connection(c, data) { + if (connids_to_remove.find(c->id) != connids_to_remove.end()) { + // conn found + conns_to_remove.push_back(c); + } + } + + if (conns_to_remove.size() == 0) { + // nothing else to do here + hc_data_free(data); + return; + } + + for (unsigned i = 0; i < conns_to_remove.size(); i++) { + if (hc_connection_delete(sock_, conns_to_remove[i]) < 0) { + std::cout << "Error removing connection from forwarder." << std::endl; + } + } + + hc_data_free(data); +} + +void ForwarderInterface::internalCreateFaceAndRoutes( + const std::vector<RouteInfoPtr> &route_info, uint8_t max_try, + asio::steady_timer *timer) { + uint32_t face_id; + + std::vector<RouteInfoPtr> failed; + for (auto &route : route_info) { + int ret = tryToCreateFace(route.get(), &face_id); + if (ret >= 0) { + auto ret = tryToCreateRoute(route.get(), face_id); + if (ret < 0) { + failed.push_back(route); + std::cerr << "Error creating route and face" << std::endl; + continue; + } + } + } + + if (failed.size() > 0) { + if (max_try == 0) { + /* All attempts failed */ + goto RESULT; + } + max_try--; + timer->expires_from_now(std::chrono::milliseconds(500)); + timer->async_wait([this, failed, max_try, timer](std::error_code ec) { + if (ec) return; + internalCreateFaceAndRoutes(failed, max_try, timer); + }); + return; + } + +#if 0 + // route_status_[protocol] = std::move(route_info); + for (size_t i = 0; i < route_info.size(); i++) { + route_status_.insert( + std::pair<ClientId, RouteInfoPtr>(protocol, std::move(route_info[i]))); + } +#endif + +RESULT: + std::cout << "Face / Route create ok, now calling back protocol" << std::endl; + pending_add_route_counter_--; + external_ioservice_.post([this, r = std::move(route_info)]() mutable { + forwarder_interface_callback_->onRouteConfigured(r); + }); + delete timer; +} + +int ForwarderInterface::tryToCreateFace(RouteInfo *route_info, + uint32_t *face_id) { + bool found = false; + + // check connection with the forwarder + if (!sock_) { + std::cout << "[ForwarderInterface::tryToCreateFace] socket error" + << std::endl; + goto ERR_SOCK; + } + + // get listeners list + hc_data_t *data; + if (hc_listener_list(sock_, &data) < 0) { + std::cout << "[ForwarderInterface::tryToCreateFace] cannot list listeners"; + goto ERR_LIST; + } + + char _local_address[128]; + foreach_listener(l, data) { + std::cout << "Processing " << l->interface_name << std::endl; + std::string interface = std::string(l->interface_name); + int ret = ip_address_ntop(&l->local_addr, _local_address, 128, AF_INET); + if (ret < 0) { + std::cerr << "Error in ip_address_ntop" << std::endl; + goto ERR; + } + + std::string local_address = std::string(_local_address); + uint16_t local_port = l->local_port; + + if (interface.compare(route_info->interface) == 0 && + local_address.compare(route_info->local_addr) == 0 && + local_port == route_info->local_port) { + found = true; + break; + } + } + + std::cout << route_info->remote_addr << std::endl; + + ip_address_t local_address, remote_address; + ip_address_pton(route_info->local_addr.c_str(), &local_address); + ip_address_pton(route_info->remote_addr.c_str(), &remote_address); + + if (!found) { + // Create listener + hc_listener_t listener; + memset(&listener, 0, sizeof(hc_listener_t)); + + std::string name = "l_" + route_info->name; + listener.local_addr = local_address; + listener.type = CONNECTION_TYPE_UDP; + listener.family = AF_INET; + listener.local_port = route_info->local_port; + strncpy(listener.name, name.c_str(), sizeof(listener.name)); + strncpy(listener.interface_name, route_info->interface.c_str(), + sizeof(listener.interface_name)); + + std::cout << "------------> " << route_info->interface << std::endl; + + int ret = hc_listener_create(sock_, &listener); + + if (ret < 0) { + std::cerr << "Error creating listener." << std::endl; + return -1; + } else { + std::cout << "Listener " << listener.id << " created." << std::endl; + } + } + + // Create face + hc_face_t face; + memset(&face, 0, sizeof(hc_face_t)); + + // crate face with the local interest + face.face.type = FACE_TYPE_UDP; + face.face.family = route_info->family; + face.face.local_addr = local_address; + face.face.remote_addr = remote_address; + face.face.local_port = route_info->local_port; + face.face.remote_port = route_info->remote_port; + + if (netdevice_set_name(&face.face.netdevice, route_info->interface.c_str()) < + 0) { + std::cout << "[ForwarderInterface::tryToCreateFaceAndRoute] " + "netdevice_set_name " + "(" + << face.face.netdevice.name << ", " + << route_info->interface << ") error" << std::endl; + goto ERR; + } + + // create face + if (hc_face_create(sock_, &face) < 0) { + std::cout << "[ForwarderInterface::tryToCreateFace] error creating face"; + goto ERR; + } + + std::cout << "Face created successfully" << std::endl; + + // assing face to the return value + *face_id = face.id; + + hc_data_free(data); + return 0; + +ERR: + hc_data_free(data); +ERR_LIST: +ERR_SOCK: + return -1; +} + +int ForwarderInterface::tryToCreateRoute(RouteInfo *route_info, + uint32_t face_id) { + std::cout << "Trying to create route" << std::endl; + + // check connection with the forwarder + if (!sock_) { + std::cout << "[ForwarderInterface::tryToCreateRoute] socket error"; + return -1; + } + + ip_address_t route_ip; + hc_route_t route; + + if (ip_address_pton(route_info->route_addr.c_str(), &route_ip) < 0) { + std::cout << "[ForwarderInterface::tryToCreateRoute] ip_address_pton error"; + return -1; + } + + route.face_id = face_id; + route.family = AF_INET6; + route.remote_addr = route_ip; + route.len = route_info->route_len; + route.cost = 1; + + if (hc_route_create(sock_, &route) < 0) { + std::cout << "[ForwarderInterface::tryToCreateRoute] error creating route"; + return -1; + } + + std::cout << "[ForwarderInterface::tryToCreateRoute] OK" << std::endl; + return 0; +} + +#if 0 // not used +void ForwarderInterface::checkRoutesLoop() { + check_routes_timer_->expires_from_now(std::chrono::milliseconds(1000)); + check_routes_timer_->async_wait([this](std::error_code ec) { + if (ec) return; + if (pending_add_route_counter_ == 0) checkRoutes(); + }); +} + +void ForwarderInterface::checkRoutes() { + std::cout << "someone called the checkRoutes function" << std::endl; + if (!sock_) return; + + hc_data_t *data; + if (hc_route_list(sock_, &data) < 0) { + return; + } + + std::unordered_set<std::string> routes_set; + foreach_route(r, data) { + char remote_addr[INET6_ADDRSTRLEN]; + int ret = ip_address_ntop(&r->remote_addr, remote_addr, r->len, r->family); + if (ret < 0) continue; + std::string route(std::string(remote_addr) + "/" + std::to_string(r->len)); + routes_set.insert(route); + } + + for (auto it = route_status_.begin(); it != route_status_.end(); it++) { + std::string route(it->second->route_addr + "/" + + std::to_string(it->second->route_len)); + if (routes_set.find(route) == routes_set.end()) { + // the route is missing + createFaceAndRoute(it->second, it->first); + break; + } + } + + hc_data_free(data); +} +#endif + +#if 0 + using ListenerRetrievedCallback = + std::function<void(std::error_code, uint32_t)>; + + ListenerRetrievedCallback listener_retrieved_callback_; + +#ifdef __ANDROID__ + hicn_listen_port_(9695), +#else + hicn_listen_port_(0), +#endif + timer_(forward_engine_.getIoService()), + + void initConfigurationProtocol(void) + { + // We need the configuration, which is different for every protocol... + // so we move this step down towards the protocol implementation itself. + if (!permanent_hicn) { + doInitConfigurationProtocol(); + } else { + // XXX This should be moved somewhere else + getMainListener( + [this](std::error_code ec, uint32_t hicn_listen_port) { + if (!ec) + { + hicn_listen_port_ = hicn_listen_port; + doInitConfigurationProtocol(); + } + }); + } + } + + template <typename Callback> + void getMainListener(Callback &&callback) + { + listener_retrieved_callback_ = std::forward<Callback &&>(callback); + tryToConnectToForwarder(); + } + private: + void doGetMainListener(std::error_code ec) + { + if (!ec) + { + // ec == 0 --> timer expired + int ret = forwarder_interface_.getMainListenerPort(); + if (ret <= 0) + { + // Since without the main listener of the forwarder the proxy cannot + // work, we can stop the program here until we get the listener port. + std::cout << + "Could not retrieve main listener port from the forwarder. " + "Retrying."; + + timer_.expires_from_now(std::chrono::milliseconds(RETRY_INTERVAL)); + timer_.async_wait(std::bind(&Protocol::doGetMainListener, this, + std::placeholders::_1)); + } + else + { + timer_.cancel(); + retx_count_ = 0; + hicn_listen_port_ = uint16_t(ret); + listener_retrieved_callback_( + make_error_code(configuration_error::success), hicn_listen_port_); + } + } + else + { + std::cout << "Timer for retrieving main hicn listener canceled." << std::endl; + } + } + + void tryToConnectToForwarder() + { + doTryToConnectToForwarder(std::make_error_code(std::errc(0))); + } + + void doTryToConnectToForwarder(std::error_code ec) + { + if (!ec) + { + // ec == 0 --> timer expired + int ret = forwarder_interface_.connect(); + if (ret < 0) + { + // We were not able to connect to the local forwarder. Do not give up + // and retry. + std::cout << "Could not connect to local forwarder. Retrying." << std::endl; + + timer_.expires_from_now(std::chrono::milliseconds(RETRY_INTERVAL)); + timer_.async_wait(std::bind(&Protocol::doTryToConnectToForwarder, this, + std::placeholders::_1)); + } + else + { + timer_.cancel(); + retx_count_ = 0; + doGetMainListener(std::make_error_code(std::errc(0))); + } + } + else + { + std::cout << "Timer for re-trying forwarder connection canceled." << std::endl; + } + } + + + template <typename ProtocolImplementation> + constexpr uint32_t Protocol<ProtocolImplementation>::RETRY_INTERVAL; + +#endif + +constexpr uint32_t ForwarderInterface::REATTEMPT_DELAY_MS; +constexpr uint32_t ForwarderInterface::MAX_REATTEMPT; + +} // namespace hiperf
\ No newline at end of file diff --git a/apps/hiperf/src/forwarder_interface.h b/apps/hiperf/src/forwarder_interface.h new file mode 100644 index 000000000..7591ea257 --- /dev/null +++ b/apps/hiperf/src/forwarder_interface.h @@ -0,0 +1,131 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +extern "C" { +#ifndef WITH_POLICY +#define WITH_POLICY +#endif +#include <hicn/ctrl/api.h> +#include <hicn/util/ip_address.h> +} + +#ifndef ASIO_STANDALONE +#define ASIO_STANDALONE +#endif +#include <asio.hpp> + +#include <functional> +#include <thread> +#include <unordered_map> + +namespace hiperf { + +class ForwarderInterface { + static const uint32_t REATTEMPT_DELAY_MS = 500; + static const uint32_t MAX_REATTEMPT = 10; + + public: + struct RouteInfo { + int family; + std::string local_addr; + uint16_t local_port; + std::string remote_addr; + uint16_t remote_port; + std::string route_addr; + uint8_t route_len; + std::string interface; + std::string name; + }; + + using RouteInfoPtr = std::shared_ptr<RouteInfo>; + + class ICallback { + public: + virtual void onHicnServiceReady() = 0; + virtual void onRouteConfigured(std::vector<RouteInfoPtr> &route_info) = 0; + }; + + enum class State { + Disabled, /* Stack is stopped */ + Requested, /* Stack is starting */ + Available, /* Forwarder is running */ + Connected, /* Control socket connected */ + Ready, /* Listener present */ + }; + + public: + ForwarderInterface(asio::io_service &io_service, ICallback *callback); + + ~ForwarderInterface(); + + State getState(); + + void setState(State state); + + void onHicnServiceAvailable(bool flag); + + void enableCheckRoutesTimer(); + + void createFaceAndRoutes(const std::vector<RouteInfoPtr> &routes_info); + + void createFaceAndRoute(const RouteInfoPtr &route_info); + + void deleteFaceAndRoutes(const std::vector<RouteInfoPtr> &routes_info); + + void deleteFaceAndRoute(const RouteInfoPtr &route_info); + + void close(); + + uint16_t getHicnListenerPort() { return hicn_listen_port_; } + + private: + int connectToForwarder(); + + int checkListener(); + + void internalCreateFaceAndRoutes(const std::vector<RouteInfoPtr> &route_info, + uint8_t max_try, asio::steady_timer *timer); + + void internalDeleteFaceAndRoute(const RouteInfoPtr &routes_info); + + int tryToCreateFace(RouteInfo *RouteInfo, uint32_t *face_id); + int tryToCreateRoute(RouteInfo *RouteInfo, uint32_t face_id); + + void checkRoutesLoop(); + + void checkRoutes(); + + asio::io_service &external_ioservice_; + asio::io_service internal_ioservice_; + ICallback *forwarder_interface_callback_; + std::unique_ptr<asio::io_service::work> work_; + hc_sock_t *sock_; + std::unique_ptr<std::thread> thread_; + // SetRouteCallback set_route_callback_; + // std::unordered_multimap<ProtocolPtr, RouteInfoPtr> route_status_; + std::unique_ptr<asio::steady_timer> check_routes_timer_; + uint32_t pending_add_route_counter_; + uint16_t hicn_listen_port_; + + State state_; + + /* Reattempt timer */ + asio::steady_timer timer_; + unsigned num_reattempts; +}; + +} // namespace hiperf diff --git a/apps/hiperf/src/main.cc b/apps/hiperf/src/main.cc new file mode 100644 index 000000000..b2d99c4a4 --- /dev/null +++ b/apps/hiperf/src/main.cc @@ -0,0 +1,456 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <client.h> +#include <server.h> +#include <forwarder_interface.h> + +namespace hiperf { + +void usage() { + std::cerr << "HIPERF - A tool for performing network throughput " + "measurements with hICN" + << std::endl; + std::cerr << "usage: hiperf [-S|-C] [options] [prefix|name]" << std::endl; + std::cerr << std::endl; + std::cerr << "SERVER OR CLIENT:" << std::endl; +#ifndef _WIN32 + std::cerr << "-D\t\t\t\t\t" + << "Run as a daemon" << std::endl; + std::cerr << "-R\t\t\t\t\t" + << "Run RTC protocol (client or server)" << std::endl; + std::cerr << "-f\t<filename>\t\t\t" + << "Log file" << std::endl; + std::cerr << "-z\t<io_module>\t\t\t" + << "IO module to use. Default: hicnlight_module" << std::endl; +#endif + std::cerr << std::endl; + std::cerr << "SERVER SPECIFIC:" << std::endl; + std::cerr << "-A\t<content_size>\t\t\t" + "Size of the content to publish. This " + "is not the size of the packet (see -s for it)." + << std::endl; + std::cerr << "-s\t<packet_size>\t\t\tSize of the payload of each data packet." + << std::endl; + std::cerr << "-r\t\t\t\t\t" + << "Produce real content of <content_size> bytes" << std::endl; + std::cerr << "-m\t\t\t\t\t" + << "Produce transport manifest" << std::endl; + std::cerr << "-l\t\t\t\t\t" + << "Start producing content upon the reception of the " + "first interest" + << std::endl; + std::cerr << "-K\t<keystore_path>\t\t\t" + << "Path of p12 file containing the " + "crypto material used for signing packets" + << std::endl; + std::cerr << "-k\t<passphrase>\t\t\t" + << "String from which a 128-bit symmetric key will be " + "derived for signing packets" + << std::endl; + std::cerr << "-y\t<hash_algorithm>\t\t" + << "Use the selected hash algorithm for " + "calculating manifest digests" + << std::endl; + std::cerr << "-p\t<password>\t\t\t" + << "Password for p12 keystore" << std::endl; + std::cerr << "-x\t\t\t\t\t" + << "Produce a content of <content_size>, then after downloading " + "it produce a new content of" + << "\n\t\t\t\t\t<content_size> without resetting " + "the suffix to 0." + << std::endl; + std::cerr << "-B\t<bitrate>\t\t\t" + << "Bitrate for RTC producer, to be used with the -R option." + << std::endl; +#ifndef _WIN32 + std::cerr << "-I\t\t\t\t\t" + "Interactive mode, start/stop real time content production " + "by pressing return. To be used with the -R option" + << std::endl; + std::cerr + << "-T\t<filename>\t\t\t" + "Trace based mode, hiperf takes as input a file with a trace. " + "Each line of the file indicates the timestamp and the size of " + "the packet to generate. To be used with the -R option. -B and -I " + "will be ignored." + << std::endl; + std::cerr << "-E\t\t\t\t\t" + << "Enable encrypted communication. Requires the path to a p12 " + "file containing the " + "crypto material used for the TLS handshake" + << std::endl; + std::cerr << "-G\t<port>\t\t\t" + << "input stream from localhost at the specified port" << std::endl; +#endif + std::cerr << std::endl; + std::cerr << "CLIENT SPECIFIC:" << std::endl; + std::cerr << "-b\t<beta_parameter>\t\t" + << "RAAQM beta parameter" << std::endl; + std::cerr << "-d\t<drop_factor_parameter>\t\t" + << "RAAQM drop factor " + "parameter" + << std::endl; + std::cerr << "-L\t<interest lifetime>\t\t" + << "Set interest lifetime." << std::endl; + std::cerr << "-M\t<input_buffer_size>\t\t" + << "Size of consumer input buffer. If 0, reassembly of packets " + "will be disabled." + << std::endl; + std::cerr << "-W\t<window_size>\t\t\t" + << "Use a fixed congestion window " + "for retrieving the data." + << std::endl; + std::cerr << "-i\t<stats_interval>\t\t" + << "Show the statistics every <stats_interval> milliseconds." + << std::endl; + std::cerr << "-c\t<certificate_path>\t\t" + << "Path of the producer certificate to be used for verifying the " + "origin of the packets received." + << std::endl; + std::cerr << "-k\t<passphrase>\t\t\t" + << "String from which is derived the symmetric key used by the " + "producer to sign packets and by the consumer to verify them." + << std::endl; + std::cerr << "-t\t\t\t\t\t" + "Test mode, check if the client is receiving the " + "correct data. This is an RTC specific option, to be " + "used with the -R (default false)" + << std::endl; + std::cerr << "-P\t\t\t\t\t" + << "Prefix of the producer where to do the handshake" << std::endl; + std::cerr << "-j\t<relay_name>\t\t\t" + << "Publish the received content under the name relay_name." + "This is an RTC specific option, to be " + "used with the -R (default false)" + << std::endl; + std::cerr << "-g\t<port>\t\t\t" + << "output stream to localhost at the specified port" << std::endl; +} + +int main(int argc, char *argv[]) { +#ifndef _WIN32 + // Common + bool daemon = false; +#else + WSADATA wsaData = {0}; + WSAStartup(MAKEWORD(2, 2), &wsaData); +#endif + + // -1 server, 0 undefined, 1 client + int role = 0; + int options = 0; + + char *log_file = nullptr; + transport::interface::global_config::IoModuleConfiguration config; + std::string conf_file; + config.name = "hicnlight_module"; + + // Consumer + ClientConfiguration client_configuration; + + // Producer + ServerConfiguration server_configuration; + + int opt; +#ifndef _WIN32 + while ((opt = getopt( + argc, argv, + "DSCf:b:d:W:RM:c:vA:s:rmlK:k:y:p:hi:xE:P:B:ItL:z:T:F:j:g:G:")) != + -1) { + switch (opt) { + // Common + case 'D': { + daemon = true; + break; + } + case 'I': { + server_configuration.interactive_ = true; + server_configuration.trace_based_ = false; + server_configuration.input_stream_mode_ = false; + break; + } + case 'T': { + server_configuration.interactive_ = false; + server_configuration.trace_based_ = true; + server_configuration.input_stream_mode_ = false; + server_configuration.trace_file_ = optarg; + break; + } + case 'G': { + server_configuration.interactive_ = false; + server_configuration.trace_based_ = false; + server_configuration.input_stream_mode_ = true; + server_configuration.port_ = std::stoul(optarg); + break; + } + case 'g': { + client_configuration.output_stream_mode_ = true; + client_configuration.port_ = std::stoul(optarg); + break; + } +#else + while ((opt = getopt(argc, argv, + "SCf:b:d:W:RM:c:vA:s:rmlK:k:y:p:hi:xB:E:P:tL:z:F:j:")) != + -1) { + switch (opt) { +#endif + case 'f': { + log_file = optarg; + break; + } + case 'R': { + client_configuration.rtc_ = true; + server_configuration.rtc_ = true; + break; + } + case 'z': { + config.name = optarg; + break; + } + case 'F': { + conf_file = optarg; + break; + } + + // Server or Client + case 'S': { + role -= 1; + break; + } + case 'C': { + role += 1; + break; + } + case 'k': { + server_configuration.passphrase = std::string(optarg); + client_configuration.passphrase = std::string(optarg); + break; + } + + // Client specifc + case 'b': { + client_configuration.beta = std::stod(optarg); + options = 1; + break; + } + case 'd': { + client_configuration.drop_factor = std::stod(optarg); + options = 1; + break; + } + case 'W': { + client_configuration.window = std::stod(optarg); + options = 1; + break; + } + case 'M': { + client_configuration.receive_buffer_size_ = std::stoull(optarg); + options = 1; + break; + } + case 'P': { + client_configuration.producer_prefix_ = Prefix(optarg); + client_configuration.secure_ = true; + break; + } + case 'c': { + client_configuration.producer_certificate = std::string(optarg); + options = 1; + break; + } + case 'i': { + client_configuration.report_interval_milliseconds_ = std::stoul(optarg); + options = 1; + break; + } + case 't': { + client_configuration.test_mode_ = true; + options = 1; + break; + } + case 'L': { + client_configuration.interest_lifetime_ = std::stoul(optarg); + options = 1; + break; + } + case 'j': { + client_configuration.relay_ = true; + client_configuration.relay_name_ = Prefix(optarg); + options = 1; + break; + } + // Server specific + case 'A': { + server_configuration.download_size = std::stoul(optarg); + options = -1; + break; + } + case 's': { + server_configuration.payload_size_ = std::stoul(optarg); + options = -1; + break; + } + case 'r': { + server_configuration.virtual_producer = false; + options = -1; + break; + } + case 'm': { + server_configuration.manifest = true; + options = -1; + break; + } + case 'l': { + server_configuration.live_production = true; + options = -1; + break; + } + case 'K': { + server_configuration.keystore_name = std::string(optarg); + options = -1; + break; + } + case 'y': { + if (strncasecmp(optarg, "sha256", 6) == 0) { + server_configuration.hash_algorithm = CryptoHashType::SHA256; + } else if (strncasecmp(optarg, "sha512", 6) == 0) { + server_configuration.hash_algorithm = CryptoHashType::SHA512; + } else if (strncasecmp(optarg, "blake2b512", 10) == 0) { + server_configuration.hash_algorithm = CryptoHashType::BLAKE2B512; + } else if (strncasecmp(optarg, "blake2s256", 10) == 0) { + server_configuration.hash_algorithm = CryptoHashType::BLAKE2S256; + } else { + std::cerr << "Ignored unknown hash algorithm. Using SHA 256." + << std::endl; + } + options = -1; + break; + } + case 'p': { + server_configuration.keystore_password = std::string(optarg); + options = -1; + break; + } + case 'x': { + server_configuration.multiphase_produce_ = true; + options = -1; + break; + } + case 'B': { + auto str = std::string(optarg); + std::transform(str.begin(), str.end(), str.begin(), ::tolower); + server_configuration.production_rate_ = str; + options = -1; + break; + } + case 'E': { + server_configuration.keystore_name = std::string(optarg); + server_configuration.secure_ = true; + break; + } + case 'h': + default: + usage(); + return EXIT_FAILURE; + } + } + + if (options > 0 && role < 0) { + std::cerr << "Client options cannot be used when using the " + "software in server mode" + << std::endl; + usage(); + return EXIT_FAILURE; + } else if (options < 0 && role > 0) { + std::cerr << "Server options cannot be used when using the " + "software in client mode" + << std::endl; + usage(); + return EXIT_FAILURE; + } else if (!role) { + std::cerr << "Please specify if running hiperf as client " + "or server." + << std::endl; + usage(); + return EXIT_FAILURE; + } + + if (argv[optind] == 0) { + std::cerr << "Please specify the name/prefix to use." << std::endl; + usage(); + return EXIT_FAILURE; + } else { + if (role > 0) { + client_configuration.name = Name(argv[optind]); + } else { + server_configuration.name = Prefix(argv[optind]); + } + } + + if (log_file) { +#ifndef _WIN32 + int fd = open(log_file, O_WRONLY | O_APPEND | O_CREAT, S_IWUSR | S_IRUSR); + dup2(fd, STDOUT_FILENO); + dup2(STDOUT_FILENO, STDERR_FILENO); + close(fd); +#else + int fd = + _open(log_file, _O_WRONLY | _O_APPEND | _O_CREAT, _S_IWRITE | _S_IREAD); + _dup2(fd, _fileno(stdout)); + _dup2(_fileno(stdout), _fileno(stderr)); + _close(fd); +#endif + } + +#ifndef _WIN32 + if (daemon) { + utils::Daemonizator::daemonize(false); + } +#endif + + /** + * IO module configuration + */ + config.set(); + + // Parse config file + transport::interface::global_config::parseConfigurationFile(conf_file); + + if (role > 0) { + HIperfClient c(client_configuration); + if (c.setup() != ERROR_SETUP) { + c.run(); + } + } else if (role < 0) { + HIperfServer s(server_configuration); + if (s.setup() != ERROR_SETUP) { + s.run(); + } + } else { + usage(); + return EXIT_FAILURE; + } + +#ifdef _WIN32 + WSACleanup(); +#endif + + return 0; +} + +} // namespace hiperf + +int main(int argc, char *argv[]) { return hiperf::main(argc, argv); } diff --git a/apps/hiperf/src/server.cc b/apps/hiperf/src/server.cc new file mode 100644 index 000000000..968d42e2c --- /dev/null +++ b/apps/hiperf/src/server.cc @@ -0,0 +1,516 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <server.h> + +namespace hiperf { + +/** + * Hiperf server class: configure and setup an hicn producer following the + * ServerConfiguration. + */ +class HIperfServer::Impl { + const std::size_t log2_content_object_buffer_size = 8; + + public: + Impl(const hiperf::ServerConfiguration &conf) + : configuration_(conf), + signals_(io_service_), + rtc_timer_(io_service_), + unsatisfied_interests_(), + content_objects_((std::uint16_t)(1 << log2_content_object_buffer_size)), + content_objects_index_(0), + mask_((std::uint16_t)(1 << log2_content_object_buffer_size) - 1), + last_segment_(0), +#ifndef _WIN32 + ptr_last_segment_(&last_segment_), + input_(io_service_), + rtc_running_(false), +#else + ptr_last_segment_(&last_segment_), +#endif + flow_name_(configuration_.name.getName()), + socket_(io_service_), + recv_buffer_(nullptr, 0) { + std::string buffer(configuration_.payload_size_, 'X'); + std::cout << "Producing contents under name " << conf.name.getName() + << std::endl; +#ifndef _WIN32 + if (configuration_.interactive_) { + input_.assign(::dup(STDIN_FILENO)); + } +#endif + + for (int i = 0; i < (1 << log2_content_object_buffer_size); i++) { + content_objects_[i] = ContentObject::Ptr( + new ContentObject(conf.name.getName(), HF_INET6_TCP, 0, + (const uint8_t *)buffer.data(), buffer.size())); + content_objects_[i]->setLifetime( + default_values::content_object_expiry_time); + } + } + + void virtualProcessInterest(ProducerSocket &p, const Interest &interest) { + content_objects_[content_objects_index_ & mask_]->setName( + interest.getName()); + producer_socket_->produce( + *content_objects_[content_objects_index_++ & mask_]); + } + + void processInterest(ProducerSocket &p, const Interest &interest) { + p.setSocketOption(ProducerCallbacksOptions::CACHE_MISS, + (ProducerInterestCallback)VOID_HANDLER); + p.setSocketOption(GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME, + 5000000_U32); + + produceContent(p, interest.getName(), interest.getName().getSuffix()); + std::cout << "Received interest " << interest.getName().getSuffix() + << std::endl; + } + + void asyncProcessInterest(ProducerSocket &p, const Interest &interest) { + p.setSocketOption(ProducerCallbacksOptions::CACHE_MISS, + (ProducerInterestCallback)bind(&Impl::cacheMiss, this, + std::placeholders::_1, + std::placeholders::_2)); + p.setSocketOption(GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME, + 5000000_U32); + uint32_t suffix = interest.getName().getSuffix(); + + if (suffix == 0) { + last_segment_ = 0; + ptr_last_segment_ = &last_segment_; + unsatisfied_interests_.clear(); + } + + // The suffix will either be the one from the received interest or the + // smallest suffix of a previous interest not satisfed + if (!unsatisfied_interests_.empty()) { + auto it = + std::lower_bound(unsatisfied_interests_.begin(), + unsatisfied_interests_.end(), *ptr_last_segment_); + if (it != unsatisfied_interests_.end()) { + suffix = *it; + } + unsatisfied_interests_.erase(unsatisfied_interests_.begin(), it); + } + + std::cout << "Received interest " << interest.getName().getSuffix() + << ", starting production at " << suffix << std::endl; + std::cout << unsatisfied_interests_.size() << " interests still unsatisfied" + << std::endl; + produceContentAsync(p, interest.getName(), suffix); + } + + void produceContent(ProducerSocket &p, const Name &content_name, + uint32_t suffix) { + auto b = utils::MemBuf::create(configuration_.download_size); + std::memset(b->writableData(), '?', configuration_.download_size); + b->append(configuration_.download_size); + uint32_t total; + + utils::TimePoint t0 = utils::SteadyClock::now(); + total = p.produceStream(content_name, std::move(b), + !configuration_.multiphase_produce_, suffix); + utils::TimePoint t1 = utils::SteadyClock::now(); + + std::cout + << "Written " << total + << " data packets in output buffer (Segmentation time: " + << std::chrono::duration_cast<utils::Microseconds>(t1 - t0).count() + << " us)" << std::endl; + } + + void produceContentAsync(ProducerSocket &p, Name content_name, + uint32_t suffix) { + auto b = utils::MemBuf::create(configuration_.download_size); + std::memset(b->writableData(), '?', configuration_.download_size); + b->append(configuration_.download_size); + + p.asyncProduce(content_name, std::move(b), + !configuration_.multiphase_produce_, suffix, + &ptr_last_segment_); + } + + void cacheMiss(ProducerSocket &p, const Interest &interest) { + unsatisfied_interests_.push_back(interest.getName().getSuffix()); + } + + void onContentProduced(ProducerSocket &p, const std::error_code &err, + uint64_t bytes_written) { + p.setSocketOption(ProducerCallbacksOptions::CACHE_MISS, + (ProducerInterestCallback)bind( + &Impl::asyncProcessInterest, this, + std::placeholders::_1, std::placeholders::_2)); + } + + std::shared_ptr<Identity> getProducerIdentity(std::string &keystore_path, + std::string &keystore_pwd, + CryptoHashType &hash_type) { + if (access(keystore_path.c_str(), F_OK) != -1) { + return std::make_shared<Identity>(keystore_path, keystore_pwd, hash_type); + } + return std::make_shared<Identity>(keystore_path, keystore_pwd, + CryptoSuite::RSA_SHA256, 1024, 365, + "producer-test"); + } + + int setup() { + int ret; + int production_protocol; + + if (configuration_.secure_) { + auto identity = getProducerIdentity(configuration_.keystore_name, + configuration_.keystore_password, + configuration_.hash_algorithm); + producer_socket_ = std::make_unique<P2PSecureProducerSocket>( + configuration_.rtc_, identity); + } else { + if (!configuration_.rtc_) { + production_protocol = ProductionProtocolAlgorithms::BYTE_STREAM; + } else { + production_protocol = ProductionProtocolAlgorithms::RTC_PROD; + } + + producer_socket_ = std::make_unique<ProducerSocket>(production_protocol); + } + + if (producer_socket_->setSocketOption( + GeneralTransportOptions::MAKE_MANIFEST, configuration_.manifest) == + SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + if (!configuration_.passphrase.empty()) { + std::shared_ptr<Signer> signer = std::make_shared<SymmetricSigner>( + CryptoSuite::HMAC_SHA256, configuration_.passphrase); + producer_socket_->setSocketOption(GeneralTransportOptions::SIGNER, + signer); + } + + if (!configuration_.keystore_name.empty()) { + auto identity = getProducerIdentity(configuration_.keystore_name, + configuration_.keystore_password, + configuration_.hash_algorithm); + std::shared_ptr<Signer> signer = identity->getSigner(); + producer_socket_->setSocketOption(GeneralTransportOptions::SIGNER, + signer); + } + + uint32_t rtc_header_size = 0; + if (configuration_.rtc_) rtc_header_size = 12; + producer_socket_->setSocketOption( + GeneralTransportOptions::DATA_PACKET_SIZE, + (uint32_t)( + configuration_.payload_size_ + rtc_header_size + + (configuration_.name.getAddressFamily() == AF_INET ? 40 : 60))); + producer_socket_->registerPrefix(configuration_.name); + producer_socket_->connect(); + + if (configuration_.rtc_) { + std::cout << "Running RTC producer: the prefix length will be ignored." + " Use /128 by default in RTC mode" + << std::endl; + return ERROR_SUCCESS; + } + + if (!configuration_.virtual_producer) { + if (producer_socket_->setSocketOption( + GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME, + configuration_.content_lifetime) == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + if (producer_socket_->setSocketOption( + GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 200000U) == + SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + if (!configuration_.live_production) { + produceContent(*producer_socket_, configuration_.name.getName(), 0); + } else { + ret = producer_socket_->setSocketOption( + ProducerCallbacksOptions::CACHE_MISS, + (ProducerInterestCallback)bind(&Impl::asyncProcessInterest, this, + std::placeholders::_1, + std::placeholders::_2)); + + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + } + } else { + ret = producer_socket_->setSocketOption( + GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 0U); + + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + ret = producer_socket_->setSocketOption( + ProducerCallbacksOptions::CACHE_MISS, + (ProducerInterestCallback)bind(&Impl::virtualProcessInterest, this, + std::placeholders::_1, + std::placeholders::_2)); + + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + } + + ret = producer_socket_->setSocketOption( + ProducerCallbacksOptions::CONTENT_PRODUCED, + (ProducerContentCallback)bind( + &Impl::onContentProduced, this, std::placeholders::_1, + std::placeholders::_2, std::placeholders::_3)); + + return ERROR_SUCCESS; + } + + void receiveStream() { + socket_.async_receive_from( + asio::buffer(recv_buffer_.first, recv_buffer_.second), remote_, + [this](std::error_code ec, std::size_t length) { + if (ec) return; + sendRTCContentFromStream(recv_buffer_.first, length); + receiveStream(); + }); + } + + void sendRTCContentFromStream(uint8_t *buff, std::size_t len) { + auto payload = + content_objects_[content_objects_index_++ & mask_]->getPayload(); + // this is used to compute the data packet delay + // Used only for performance evaluation + // It requires clock synchronization between producer and consumer + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + uint8_t *start = (uint8_t *)payload->writableData(); + std::memcpy(start, &now, sizeof(uint64_t)); + std::memcpy(start + sizeof(uint64_t), buff, len); + producer_socket_->produceDatagram(flow_name_, start, + len + sizeof(uint64_t)); + } + + void sendRTCContentObjectCallback(std::error_code ec) { + if (ec) return; + rtc_timer_.expires_from_now( + configuration_.production_rate_.getMicrosecondsForPacket( + configuration_.payload_size_)); + rtc_timer_.async_wait(std::bind(&Impl::sendRTCContentObjectCallback, this, + std::placeholders::_1)); + auto payload = + content_objects_[content_objects_index_++ & mask_]->getPayload(); + + // this is used to compute the data packet delay + // Used only for performance evaluation + // It requires clock synchronization between producer and consumer + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + + std::memcpy(payload->writableData(), &now, sizeof(uint64_t)); + + producer_socket_->produceDatagram( + flow_name_, payload->data(), + payload->length() < 1400 ? payload->length() : 1400); + } + + void sendRTCContentObjectCallbackWithTrace(std::error_code ec) { + if (ec) return; + + auto payload = + content_objects_[content_objects_index_++ & mask_]->getPayload(); + + uint32_t packet_len = + configuration_.trace_[configuration_.trace_index_].size; + + // this is used to compute the data packet delay + // used only for performance evaluation + // it requires clock synchronization between producer and consumer + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + + std::memcpy(payload->writableData(), &now, sizeof(uint64_t)); + + if (packet_len > payload->length()) packet_len = payload->length(); + if (packet_len > 1400) packet_len = 1400; + + producer_socket_->produceDatagram(flow_name_, payload->data(), packet_len); + + uint32_t next_index = configuration_.trace_index_ + 1; + uint64_t schedule_next; + if (next_index < configuration_.trace_.size()) { + schedule_next = + configuration_.trace_[next_index].timestamp - + configuration_.trace_[configuration_.trace_index_].timestamp; + } else { + // here we need to loop, schedule in a random time + schedule_next = 1000; + } + + configuration_.trace_index_ = + (configuration_.trace_index_ + 1) % configuration_.trace_.size(); + rtc_timer_.expires_from_now(std::chrono::microseconds(schedule_next)); + rtc_timer_.async_wait( + std::bind(&Impl::sendRTCContentObjectCallbackWithTrace, this, + std::placeholders::_1)); + } + +#ifndef _WIN32 + void handleInput(const std::error_code &error, std::size_t length) { + if (error) { + producer_socket_->stop(); + io_service_.stop(); + } + + if (rtc_running_) { + std::cout << "stop real time content production" << std::endl; + rtc_running_ = false; + rtc_timer_.cancel(); + } else { + std::cout << "start real time content production" << std::endl; + rtc_running_ = true; + rtc_timer_.expires_from_now( + configuration_.production_rate_.getMicrosecondsForPacket( + configuration_.payload_size_)); + rtc_timer_.async_wait(std::bind(&Impl::sendRTCContentObjectCallback, this, + std::placeholders::_1)); + } + + input_buffer_.consume(length); // Remove newline from input. + asio::async_read_until( + input_, input_buffer_, '\n', + std::bind(&Impl::handleInput, this, std::placeholders::_1, + std::placeholders::_2)); + } +#endif + + int parseTraceFile() { + std::ifstream trace(configuration_.trace_file_); + if (trace.fail()) { + return -1; + } + std::string line; + while (std::getline(trace, line)) { + std::istringstream iss(line); + hiperf::packet_t packet; + iss >> packet.timestamp >> packet.size; + configuration_.trace_.push_back(packet); + } + return 0; + } + + int run() { + std::cerr << "Starting to serve consumers" << std::endl; + + signals_.add(SIGINT); + signals_.async_wait([this](const std::error_code &, const int &) { + std::cout << "STOPPING!!" << std::endl; + producer_socket_->stop(); + io_service_.stop(); + }); + + if (configuration_.rtc_) { +#ifndef _WIN32 + if (configuration_.interactive_) { + asio::async_read_until( + input_, input_buffer_, '\n', + std::bind(&Impl::handleInput, this, std::placeholders::_1, + std::placeholders::_2)); + } else if (configuration_.trace_based_) { + std::cout << "trace-based mode enabled" << std::endl; + if (configuration_.trace_file_ == nullptr) { + std::cout << "cannot find the trace file" << std::endl; + return ERROR_SETUP; + } + if (parseTraceFile() < 0) { + std::cout << "cannot parse the trace file" << std::endl; + return ERROR_SETUP; + } + rtc_running_ = true; + rtc_timer_.expires_from_now(std::chrono::milliseconds(1)); + rtc_timer_.async_wait( + std::bind(&Impl::sendRTCContentObjectCallbackWithTrace, this, + std::placeholders::_1)); + } else if (configuration_.input_stream_mode_) { + rtc_running_ = true; + // crate socket + remote_ = asio::ip::udp::endpoint( + asio::ip::address::from_string("127.0.0.1"), configuration_.port_); + socket_.open(asio::ip::udp::v4()); + socket_.bind(remote_); + recv_buffer_.first = (uint8_t *)malloc(1500); + recv_buffer_.second = 1500; + receiveStream(); + } else { + rtc_running_ = true; + rtc_timer_.expires_from_now( + configuration_.production_rate_.getMicrosecondsForPacket( + configuration_.payload_size_)); + rtc_timer_.async_wait(std::bind(&Impl::sendRTCContentObjectCallback, + this, std::placeholders::_1)); + } +#else + rtc_timer_.expires_from_now( + configuration_.production_rate_.getMicrosecondsForPacket( + configuration_.payload_size_)); + rtc_timer_.async_wait(std::bind(&Impl::sendRTCContentObjectCallback, this, + std::placeholders::_1)); +#endif + } + + io_service_.run(); + + return ERROR_SUCCESS; + } + + private: + hiperf::ServerConfiguration configuration_; + asio::io_service io_service_; + asio::signal_set signals_; + asio::steady_timer rtc_timer_; + std::vector<uint32_t> unsatisfied_interests_; + std::vector<std::shared_ptr<ContentObject>> content_objects_; + std::uint16_t content_objects_index_; + std::uint16_t mask_; + std::uint32_t last_segment_; + std::uint32_t *ptr_last_segment_; + std::unique_ptr<ProducerSocket> producer_socket_; +#ifndef _WIN32 + asio::posix::stream_descriptor input_; + asio::streambuf input_buffer_; + bool rtc_running_; + Name flow_name_; + asio::ip::udp::socket socket_; + asio::ip::udp::endpoint remote_; + std::pair<uint8_t *, std::size_t> recv_buffer_; +#endif +}; + +HIperfServer::HIperfServer(const ServerConfiguration &conf) { + impl_ = new Impl(conf); +} + +HIperfServer::~HIperfServer() { delete impl_; } + +int HIperfServer::setup() { return impl_->setup(); } + +void HIperfServer::run() { impl_->run(); } + +} // namespace hiperf diff --git a/apps/hiperf/src/server.h b/apps/hiperf/src/server.h new file mode 100644 index 000000000..05407a807 --- /dev/null +++ b/apps/hiperf/src/server.h @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <common.h> + +namespace hiperf { + +class HIperfServer { + public: + HIperfServer(const ServerConfiguration &conf); + ~HIperfServer(); + int setup(); + void run(); + + private: + class Impl; + Impl *impl_; +}; + +} // namespace hiperf
\ No newline at end of file diff --git a/apps/http-proxy/CMakeLists.txt b/apps/http-proxy/CMakeLists.txt index 8c2043c30..66b9c1bab 100644 --- a/apps/http-proxy/CMakeLists.txt +++ b/apps/http-proxy/CMakeLists.txt @@ -11,7 +11,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -cmake_minimum_required(VERSION 3.5 FATAL_ERROR) +cmake_minimum_required(VERSION 3.10 FATAL_ERROR) set(CMAKE_CXX_STANDARD 14) # -Wno-c99-designator issue @@ -56,10 +56,18 @@ list(APPEND COMPILER_DEFINITIONS -DWITH_POLICY ) +list(APPEND HTTP_PROXY_LIBRARIES + ${LIBTRANSPORT_LIBRARIES} + ${LIBHICNCTRL_LIBRARIES} + ${LIBHICN_LIBRARIES} + ${OPENSSL_LIBRARIES} + ${CMAKE_THREAD_LIBS_INIT} +) + build_library(${LIBHTTP_PROXY} STATIC SOURCES ${LIB_SOURCE_FILES} - LINK_LIBRARIES ${LIBRARIES} + LINK_LIBRARIES ${HTTP_PROXY_LIBRARIES} DEPENDS ${DEPENDENCIES} INSTALL_HEADERS ${LIBPROXY_TO_INSTALL_HEADER_FILES} INCLUDE_DIRS ${LIBTRANSPORT_INCLUDE_DIRS} ${LIBHICNCTRL_INCLUDE_DIRS} ${LIBPROXY_INCLUDE_DIRS} diff --git a/apps/http-proxy/includes/hicn/http-proxy/CMakeLists.txt b/apps/http-proxy/includes/hicn/http-proxy/CMakeLists.txt index 75cbbd64b..5cc80a168 100644 --- a/apps/http-proxy/includes/hicn/http-proxy/CMakeLists.txt +++ b/apps/http-proxy/includes/hicn/http-proxy/CMakeLists.txt @@ -11,8 +11,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -cmake_minimum_required(VERSION 3.5 FATAL_ERROR) - list(APPEND HEADER_FILES ${CMAKE_CURRENT_SOURCE_DIR}/forwarder_config.h ${CMAKE_CURRENT_SOURCE_DIR}/forwarder_interface.h diff --git a/apps/http-proxy/includes/hicn/http-proxy/forwarder_config.h b/apps/http-proxy/includes/hicn/http-proxy/forwarder_config.h index 19c96a9e3..e02b9d9a7 100644 --- a/apps/http-proxy/includes/hicn/http-proxy/forwarder_config.h +++ b/apps/http-proxy/includes/hicn/http-proxy/forwarder_config.h @@ -27,7 +27,6 @@ #include "forwarder_interface.h" - #define RETRY_INTERVAL 300 namespace transport { @@ -68,7 +67,8 @@ class ForwarderConfig { if (ret < 0) { // We were not able to connect to the local forwarder. Do not give up // and retry. - TRANSPORT_LOGE("Could not connect to local forwarder. Retrying."); + TRANSPORT_LOG_ERROR + << "Could not connect to local forwarder. Retrying."; timer_.expires_from_now(std::chrono::milliseconds(RETRY_INTERVAL)); timer_.async_wait(std::bind(&ForwarderConfig::doTryToConnectToForwarder, @@ -79,7 +79,8 @@ class ForwarderConfig { doGetMainListener(std::make_error_code(std::errc(0))); } } else { - TRANSPORT_LOGD("Timer for re-trying forwarder connection canceled."); + TRANSPORT_LOG_ERROR + << "Timer for re-trying forwarder connection canceled."; } } @@ -90,9 +91,9 @@ class ForwarderConfig { if (ret <= 0) { // Since without the main listener of the forwarder the proxy cannot // work, we can stop the program here until we get the listener port. - TRANSPORT_LOGE( - "Could not retrieve main listener port from the forwarder. " - "Retrying."); + TRANSPORT_LOG_ERROR + << "Could not retrieve main listener port from the forwarder. " + "Retrying."; timer_.expires_from_now(std::chrono::milliseconds(RETRY_INTERVAL)); timer_.async_wait(std::bind(&ForwarderConfig::doGetMainListener, this, @@ -104,7 +105,8 @@ class ForwarderConfig { listener_retrieved_callback_(std::make_error_code(std::errc(0))); } } else { - TRANSPORT_LOGI("Timer for retrieving main hicn listener canceled."); + TRANSPORT_LOG_ERROR + << "Timer for retrieving main hicn listener canceled."; } } diff --git a/apps/http-proxy/src/forwarder_interface.cc b/apps/http-proxy/src/forwarder_interface.cc index 7d8235ac6..c2448de9a 100644 --- a/apps/http-proxy/src/forwarder_interface.cc +++ b/apps/http-proxy/src/forwarder_interface.cc @@ -119,7 +119,7 @@ void ForwarderInterface::internalRemoveConnectedUser(uint32_t route_id) { for (unsigned i = 0; i < routes_to_remove.size(); i++) { connids_to_remove.insert(routes_to_remove[i]->face_id); if (hc_route_delete(sock_, routes_to_remove[i]) < 0) { - TRANSPORT_LOGE("Error removing route from forwarder."); + TRANSPORT_LOG_ERROR << "Error removing route from forwarder."; } } @@ -146,7 +146,7 @@ void ForwarderInterface::internalRemoveConnectedUser(uint32_t route_id) { for (unsigned i = 0; i < conns_to_remove.size(); i++) { if (hc_connection_delete(sock_, conns_to_remove[i]) < 0) { - TRANSPORT_LOGE("Error removing connection from forwarder."); + TRANSPORT_LOG_ERROR << "Error removing connection from forwarder."; } } diff --git a/apps/http-proxy/src/http_proxy.cc b/apps/http-proxy/src/http_proxy.cc index 262fcb8e1..2040f7cfa 100644 --- a/apps/http-proxy/src/http_proxy.cc +++ b/apps/http-proxy/src/http_proxy.cc @@ -67,8 +67,8 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback { std::string remote_address = socket.remote_endpoint().address().to_string(); std::uint16_t remote_port = socket.remote_endpoint().port(); - TRANSPORT_LOGD("Client %s:%d disconnected.", remote_address.c_str(), - remote_port); + TRANSPORT_LOG_INFO << "Client " << remote_address << ":" + << remote_port << "disconnected."; } catch (std::system_error& e) { // Do nothing } @@ -85,7 +85,7 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback { private: void consumeNextRequest() { if (request_buffer_queue_.size() == 0) { - TRANSPORT_LOGD("No additional requests to process."); + // No additional requests to process return; } @@ -136,24 +136,24 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback { current_size_ += size; if (is_last) { - TRANSPORT_LOGD("Request received: %s", - std::string((const char*)tmp_buffer_.first->data(), - tmp_buffer_.first->length()) - .c_str()); + // TRANSPORT_LOGD("Request received: %s", + // std::string((const char*)tmp_buffer_.first->data(), + // tmp_buffer_.first->length()) + // .c_str()); if (current_size_ < 1400) { request_buffer_queue_.emplace_back(std::move(tmp_buffer_)); } else { - TRANSPORT_LOGE("Ignoring client request due to size (%zu) > 1400.", - current_size_); + TRANSPORT_LOG_ERROR << "Ignoring client request due to size (" + << current_size_ << ") > 1400."; session_->close(); current_size_ = 0; return; } if (!consumer_.isRunning()) { - TRANSPORT_LOGD( - "Consumer stopped, triggering consume from TCP session " - "handler.."); + TRANSPORT_LOG_INFO + << "Consumer stopped, triggering consume from TCP session " + "handler.."; consumeNextRequest(); } @@ -187,12 +187,13 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback { void readBufferAvailable(std::unique_ptr<utils::MemBuf>&& buffer) noexcept { // Response received. Send it back to client auto _buffer = buffer.release(); - TRANSPORT_LOGD("From hicn: %zu bytes.", _buffer->length()); + // TRANSPORT_LOGD("From hicn: %zu bytes.", _buffer->length()); session_->send(_buffer, []() {}); } void readError(const std::error_code ec) noexcept { - TRANSPORT_LOGE("Error reading from hicn consumer socket. Closing session."); + TRANSPORT_LOG_ERROR + << "Error reading from hicn consumer socket. Closing session."; session_->close(); } @@ -209,15 +210,14 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback { * Let's grant it! */ if (metadata->method == "OPTIONS") { - session_->send( - (const uint8_t*)HTTPMessageFastParser::http_cors, - std::strlen(HTTPMessageFastParser::http_cors), [this]() { - auto& socket = session_->socket_; - TRANSPORT_LOGI( - "Sent OPTIONS to client %s:%d", - socket.remote_endpoint().address().to_string().c_str(), - socket.remote_endpoint().port()); - }); + session_->send((const uint8_t*)HTTPMessageFastParser::http_cors, + std::strlen(HTTPMessageFastParser::http_cors), [this]() { + auto& socket = session_->socket_; + TRANSPORT_LOG_INFO + << "Sent OPTIONS to client " + << socket.remote_endpoint().address() << ":" + << socket.remote_endpoint().port(); + }); } } else { tcp_receiver_.parseHicnHeader( @@ -230,14 +230,14 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback { } /* Route created. Send back a 200 OK to client */ - session_->send( - (const uint8_t*)reply, std::strlen(reply), [this, result]() { - auto& socket = session_->socket_; - TRANSPORT_LOGI( - "Sent %d response to client %s:%d", result, - socket.remote_endpoint().address().to_string().c_str(), - socket.remote_endpoint().port()); - }); + session_->send((const uint8_t*)reply, std::strlen(reply), + [this, result]() { + auto& socket = session_->socket_; + TRANSPORT_LOG_INFO + << "Sent " << result << " response to client " + << socket.remote_endpoint().address() << ":" + << socket.remote_endpoint().port(); + }); }); } } @@ -313,7 +313,6 @@ void TcpReceiver::onClientDisconnect(HTTPClientConnectionCallback* client) { void TcpReceiver::onNewConnection(asio::ip::tcp::socket&& socket) { if (http_clients_.size() == 0) { // Create new HTTPClientConnectionCallback - TRANSPORT_LOGD("Creating new HTTPClientConnectionCallback."); http_clients_.emplace_back( new HTTPClientConnectionCallback(*this, thread_)); } @@ -332,7 +331,8 @@ void TcpReceiver::onNewConnection(asio::ip::tcp::socket&& socket) { void HTTPProxy::setupSignalHandler() { signals_.async_wait([this](const std::error_code& ec, int signal_number) { if (!ec) { - TRANSPORT_LOGI("Received signal %d. Stopping gracefully.", signal_number); + TRANSPORT_LOG_INFO << "Received signal " << signal_number + << ". Stopping gracefully."; stop(); } }); diff --git a/apps/http-proxy/src/http_session.cc b/apps/http-proxy/src/http_session.cc index 6b91c12c3..84c814cbd 100644 --- a/apps/http-proxy/src/http_session.cc +++ b/apps/http-proxy/src/http_session.cc @@ -111,7 +111,6 @@ void HTTPSession::send(utils::MemBuf *buffer, doWrite(); } } else { - TRANSPORT_LOGD("Tell the handle connect it has data to write"); data_available_ = true; } }); @@ -134,15 +133,11 @@ void HTTPSession::doWrite() { asio::async_write(socket_, asio::buffer(buffer->data(), buffer->length()), [this](std::error_code ec, std::size_t length) { if (TRANSPORT_EXPECT_FALSE(!ec)) { - TRANSPORT_LOGD("Content successfully sent! %zu", - length); write_msgs_.front().second(); write_msgs_.pop_front(); if (!write_msgs_.empty()) { doWrite(); } - } else { - TRANSPORT_LOGD("Content NOT sent!"); } }); } // namespace transport @@ -274,7 +269,7 @@ void HTTPSession::doReadHeader() { void HTTPSession::tryReconnection() { if (on_connection_closed_callback_(socket_)) { if (state_ == ConnectorState::CONNECTED) { - TRANSPORT_LOGD("Connection lost. Trying to reconnect...\n"); + TRANSPORT_LOG_ERROR << "Connection lost. Trying to reconnect..."; state_ = ConnectorState::CONNECTING; is_reconnection_ = true; io_service_.post([this]() { @@ -290,35 +285,35 @@ void HTTPSession::tryReconnection() { } void HTTPSession::doConnect() { - asio::async_connect(socket_, endpoint_iterator_, - [this](std::error_code ec, tcp::resolver::iterator) { - if (!ec) { - timer_.cancel(); - state_ = ConnectorState::CONNECTED; + asio::async_connect( + socket_, endpoint_iterator_, + [this](std::error_code ec, tcp::resolver::iterator) { + if (!ec) { + timer_.cancel(); + state_ = ConnectorState::CONNECTED; - asio::ip::tcp::no_delay noDelayOption(true); - socket_.set_option(noDelayOption); + asio::ip::tcp::no_delay noDelayOption(true); + socket_.set_option(noDelayOption); - // on_reconnect_callback_(); + // on_reconnect_callback_(); - doReadHeader(); + doReadHeader(); - if (data_available_ && !write_msgs_.empty()) { - data_available_ = false; - doWrite(); - } + if (data_available_ && !write_msgs_.empty()) { + data_available_ = false; + doWrite(); + } - if (is_reconnection_) { - is_reconnection_ = false; - TRANSPORT_LOGD("Connection recovered!"); - } + if (is_reconnection_) { + is_reconnection_ = false; + TRANSPORT_LOG_INFO << "Connection recovered!"; + } - } else { - TRANSPORT_LOGE("Impossible to reconnect: %s", - ec.message().c_str()); - close(); - } - }); + } else { + TRANSPORT_LOG_ERROR << "Impossible to reconnect: " << ec.message(); + close(); + } + }); } bool HTTPSession::checkConnected() { @@ -335,7 +330,7 @@ void HTTPSession::handleDeadline(const std::error_code &ec) { if (!ec) { io_service_.post([this]() { socket_.close(); - TRANSPORT_LOGE("Error connecting. Is the server running?\n"); + TRANSPORT_LOG_ERROR << "Error connecting. Is the server running?"; io_service_.stop(); }); } diff --git a/apps/http-proxy/src/icn_receiver.cc b/apps/http-proxy/src/icn_receiver.cc index 23e5b5623..ea8ac7191 100644 --- a/apps/http-proxy/src/icn_receiver.cc +++ b/apps/http-proxy/src/icn_receiver.cc @@ -55,28 +55,28 @@ AsyncConsumerProducer::AsyncConsumerProducer( interface::GeneralTransportOptions::OUTPUT_BUFFER_SIZE, cache_size_); if (ret != SOCKET_OPTION_SET) { - TRANSPORT_LOGD("Warning: output buffer size has not been set."); + TRANSPORT_LOG_WARNING << "Warning: output buffer size has not been set."; } ret = producer_socket_.setSocketOption( interface::GeneralTransportOptions::MAKE_MANIFEST, manifest); if (ret != SOCKET_OPTION_SET) { - TRANSPORT_LOGD("Warning: impossible to enable signatures."); + TRANSPORT_LOG_WARNING << "Warning: impossible to enable signatures."; } ret = producer_socket_.setSocketOption( interface::GeneralTransportOptions::DATA_PACKET_SIZE, mtu_); if (ret != SOCKET_OPTION_SET) { - TRANSPORT_LOGD("Warning: mtu has not been set."); + TRANSPORT_LOG_WARNING << "Warning: mtu has not been set."; } producer_socket_.registerPrefix(prefix_); } void AsyncConsumerProducer::start() { - TRANSPORT_LOGD("Starting listening"); + TRANSPORT_LOG_INFO << "Starting listening"; doReceive(); } @@ -90,8 +90,8 @@ void AsyncConsumerProducer::run() { void AsyncConsumerProducer::stop() { io_service_.post([this]() { - TRANSPORT_LOGI("Number of requests processed by plugin: %lu", - (unsigned long)request_counter_); + TRANSPORT_LOG_INFO << "Number of requests processed by plugin: " + << request_counter_; producer_socket_.stop(); connector_.close(); }); @@ -123,16 +123,14 @@ void AsyncConsumerProducer::manageIncomingInterest( if (_it != _end) { if (_it->second.second) { - TRANSPORT_LOGD( - "Content is in production, interests will be satisfied shortly."); return; } if (seg >= _it->second.first) { - TRANSPORT_LOGD( - "Ignoring interest with name %s for a content object which does not " - "exist. (Request: %u, max: %u)", - name.toString().c_str(), (uint32_t)seg, (uint32_t)_it->second.first); + // TRANSPORT_LOGD( + // "Ignoring interest with name %s for a content object which does not " + // "exist. (Request: %u, max: %u)", + // name.toString().c_str(), (uint32_t)seg, (uint32_t)_it->second.first); return; } } @@ -170,7 +168,7 @@ void AsyncConsumerProducer::publishContent(const uint8_t* data, options.getLifetime()); if (TRANSPORT_EXPECT_FALSE(ret != SOCKET_OPTION_SET)) { - TRANSPORT_LOGD("Warning: content object lifetime has not been set."); + TRANSPORT_LOG_WARNING << "Warning: content object lifetime has not been set."; } const interface::Name& name = options.getName(); diff --git a/apps/ping/.clang-format b/apps/ping/.clang-format new file mode 100644 index 000000000..cd21e2017 --- /dev/null +++ b/apps/ping/.clang-format @@ -0,0 +1,14 @@ +# Copyright (c) 2017-2021 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +BasedOnStyle: Google diff --git a/apps/ping/CMakeLists.txt b/apps/ping/CMakeLists.txt new file mode 100644 index 000000000..42f7f98c1 --- /dev/null +++ b/apps/ping/CMakeLists.txt @@ -0,0 +1,39 @@ +# Copyright (c) 2017-2019 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +if (NOT DISABLE_EXECUTABLES) + list (APPEND PING_LIBRARIES + ${LIBTRANSPORT_LIBRARIES} + ${CMAKE_THREAD_LIBS_INIT} + ${WSOCK32_LIBRARY} + ${WS2_32_LIBRARY} + ) + + build_executable(hicn-ping-server + SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/src/ping_server.cc + LINK_LIBRARIES ${PING_LIBRARIES} + INCLUDE_DIRS ${LIBTRANSPORT_INCLUDE_DIRS} + DEPENDS ${DEPENDENCIES} + COMPONENT ${HICN_APPS} + LINK_FLAGS ${LINK_FLAGS} + ) + + build_executable(hicn-ping-client + SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/src/ping_client.cc + LINK_LIBRARIES ${PING_LIBRARIES} + INCLUDE_DIRS ${LIBTRANSPORT_INCLUDE_DIRS} + DEPENDS ${DEPENDENCIES} + COMPONENT ${HICN_APPS} + LINK_FLAGS ${LINK_FLAGS} + ) +endif ()
\ No newline at end of file diff --git a/apps/ping/src/ping_client.cc b/apps/ping/src/ping_client.cc new file mode 100644 index 000000000..24e0bf3ed --- /dev/null +++ b/apps/ping/src/ping_client.cc @@ -0,0 +1,441 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/auth/verifier.h> +#include <hicn/transport/core/global_object_pool.h> +#include <hicn/transport/core/interest.h> +#include <hicn/transport/interfaces/portal.h> + +#include <asio/signal_set.hpp> +#include <asio/steady_timer.hpp> +#include <chrono> +#include <map> + +#define SYN_STATE 1 +#define ACK_STATE 2 + +namespace transport { + +namespace core { + +namespace ping { + +typedef std::map<uint64_t, uint64_t> SendTimeMap; +typedef auth::AsymmetricVerifier Verifier; + +class Configuration { + public: + uint64_t interestLifetime_; + uint64_t pingInterval_; + uint64_t maxPing_; + uint64_t first_suffix_; + std::string name_; + std::string certificate_; + uint16_t srcPort_; + uint16_t dstPort_; + bool verbose_; + bool dump_; + bool jump_; + bool open_; + bool always_syn_; + bool always_ack_; + bool quiet_; + uint32_t jump_freq_; + uint32_t jump_size_; + uint8_t ttl_; + + Configuration() { + interestLifetime_ = 500; // ms + pingInterval_ = 1000000; // us + maxPing_ = 10; // number of interests + first_suffix_ = 0; + name_ = "b001::1"; // string + srcPort_ = 9695; + dstPort_ = 8080; + verbose_ = false; + dump_ = false; + jump_ = false; + open_ = false; + always_syn_ = false; + always_ack_ = false; + quiet_ = false; + jump_freq_ = 0; + jump_size_ = 0; + ttl_ = 64; + } +}; + +class Client : interface::Portal::ConsumerCallback { + public: + Client(Configuration *c) + : portal_(), signals_(portal_.getIoService(), SIGINT) { + // Let the main thread to catch SIGINT + portal_.connect(); + portal_.setConsumerCallback(this); + + signals_.async_wait(std::bind(&Client::afterSignal, this)); + + timer_.reset(new asio::steady_timer(portal_.getIoService())); + config_ = c; + sequence_number_ = config_->first_suffix_; + last_jump_ = 0; + processed_ = 0; + state_ = SYN_STATE; + sent_ = 0; + received_ = 0; + timedout_ = 0; + if (!c->certificate_.empty()) { + verifier_.useCertificate(c->certificate_); + } + } + + virtual ~Client() {} + + void ping() { + std::cout << "start ping" << std::endl; + doPing(); + portal_.runEventsLoop(); + } + + void onContentObject(Interest &interest, ContentObject &object) override { + uint64_t rtt = 0; + + if (!config_->certificate_.empty()) { + auto t0 = std::chrono::steady_clock::now(); + if (verifier_.verifyPacket(&object)) { + auto t1 = std::chrono::steady_clock::now(); + auto dt = + std::chrono::duration_cast<std::chrono::microseconds>(t1 - t0); + std::cout << "Verification time: " << dt.count() << std::endl; + std::cout << "<<< Signature Ok." << std::endl; + } else { + std::cout << "<<< Signature verification failed!" << std::endl; + } + } + + auto it = send_timestamps_.find(interest.getName().getSuffix()); + if (it != send_timestamps_.end()) { + rtt = std::chrono::duration_cast<std::chrono::microseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count() - + it->second; + send_timestamps_.erase(it); + } + + if (config_->verbose_) { + std::cout << "<<< recevied object. " << std::endl; + std::cout << "<<< interest name: " << interest.getName() + << " src port: " << interest.getSrcPort() + << " dst port: " << interest.getDstPort() + << " flags: " << interest.printFlags() << std::endl; + std::cout << "<<< object name: " << object.getName() + << " src port: " << object.getSrcPort() + << " dst port: " << object.getDstPort() + << " flags: " << object.printFlags() << " path label " + << object.getPathLabel() << " (" + << (object.getPathLabel() >> 24) << ")" + << " TTL: " << (int)object.getTTL() << std::endl; + } else if (!config_->quiet_) { + std::cout << "<<< received object. " << std::endl; + std::cout << "<<< round trip: " << rtt << " [us]" << std::endl; + std::cout << "<<< interest name: " << interest.getName() << std::endl; + std::cout << "<<< object name: " << object.getName() << std::endl; + std::cout << "<<< content object size: " + << object.payloadSize() + object.headerSize() << " [bytes]" + << std::endl; + } + + if (config_->dump_) { + std::cout << "----- interest dump -----" << std::endl; + interest.dump(); + std::cout << "-------------------------" << std::endl; + std::cout << "----- object dump -------" << std::endl; + object.dump(); + std::cout << "-------------------------" << std::endl; + } + + if (!config_->quiet_) std::cout << std::endl; + + if (!config_->always_syn_) { + if (object.testSyn() && object.testAck() && state_ == SYN_STATE) { + state_ = ACK_STATE; + } + } + + received_++; + processed_++; + if (processed_ >= config_->maxPing_) { + afterSignal(); + } + } + + void onTimeout(Interest::Ptr &interest, const Name &name) override { + if (config_->verbose_) { + std::cout << "### timeout for " << name + << " src port: " << interest->getSrcPort() + << " dst port: " << interest->getDstPort() + << " flags: " << interest->printFlags() << std::endl; + } else if (!config_->quiet_) { + std::cout << "### timeout for " << name << std::endl; + } + + if (config_->dump_) { + std::cout << "----- interest dump -----" << std::endl; + interest->dump(); + std::cout << "-------------------------" << std::endl; + } + + if (!config_->quiet_) std::cout << std::endl; + + timedout_++; + processed_++; + if (processed_ >= config_->maxPing_) { + afterSignal(); + } + } + + void onError(std::error_code ec) override {} + + void doPing() { + const Name interest_name(config_->name_, (uint32_t)sequence_number_); + hicn_format_t format; + if (interest_name.getAddressFamily() == AF_INET) { + format = HF_INET_TCP; + } else { + format = HF_INET6_TCP; + } + + auto interest = std::make_shared<Interest>(interest_name, format); + + interest->setLifetime(uint32_t(config_->interestLifetime_)); + interest->resetFlags(); + + if (config_->open_ || config_->always_syn_) { + if (state_ == SYN_STATE) { + interest->setSyn(); + } else if (state_ == ACK_STATE) { + interest->setAck(); + } + } else if (config_->always_ack_) { + interest->setAck(); + } + + interest->setSrcPort(config_->srcPort_); + interest->setDstPort(config_->dstPort_); + interest->setTTL(config_->ttl_); + + if (config_->verbose_) { + std::cout << ">>> send interest " << interest->getName() + << " src port: " << interest->getSrcPort() + << " dst port: " << interest->getDstPort() + << " flags: " << interest->printFlags() + << " TTL: " << (int)interest->getTTL() << std::endl; + } else if (!config_->quiet_) { + std::cout << ">>> send interest " << interest->getName() << std::endl; + } + + if (config_->dump_) { + std::cout << "----- interest dump -----" << std::endl; + interest->dump(); + std::cout << "-------------------------" << std::endl; + } + + if (!config_->quiet_) std::cout << std::endl; + + send_timestamps_[sequence_number_] = + std::chrono::duration_cast<std::chrono::microseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + + portal_.sendInterest(std::move(interest)); + + sequence_number_++; + sent_++; + + if (sent_ < config_->maxPing_) { + this->timer_->expires_from_now( + std::chrono::microseconds(config_->pingInterval_)); + this->timer_->async_wait([this](const std::error_code e) { doPing(); }); + } + } + + void afterSignal() { + std::cout << "Stop ping" << std::endl; + std::cout << "Sent: " << sent_ << " Received: " << received_ + << " Timeouts: " << timedout_ << std::endl; + portal_.stopEventsLoop(); + } + + void reset() { + timer_.reset(new asio::steady_timer(portal_.getIoService())); + sequence_number_ = config_->first_suffix_; + last_jump_ = 0; + processed_ = 0; + state_ = SYN_STATE; + sent_ = 0; + received_ = 0; + timedout_ = 0; + } + + private: + SendTimeMap send_timestamps_; + interface::Portal portal_; + asio::signal_set signals_; + uint64_t sequence_number_; + uint64_t last_jump_; + uint64_t processed_; + uint32_t state_; + uint32_t sent_; + uint32_t received_; + uint32_t timedout_; + std::unique_ptr<asio::steady_timer> timer_; + Configuration *config_; + Verifier verifier_; +}; + +void help() { + std::cout << "usage: hicn-consumer-ping [options]" << std::endl; + std::cout << "PING options" << std::endl; + std::cout + << "-i <val> ping interval in microseconds (default 1000000ms)" + << std::endl; + std::cout << "-m <val> maximum number of pings to send (default 10)" + << std::endl; + std::cout << "-s <val> sorce port (default 9695)" << std::endl; + std::cout << "-d <val> destination port (default 8080)" << std::endl; + std::cout << "-t <val> set packet ttl (default 64)" << std::endl; + std::cout << "-O open tcp connection (three way handshake) " + "(default false)" + << std::endl; + std::cout << "-S send always syn messages (default false)" + << std::endl; + std::cout << "-A send always ack messages (default false)" + << std::endl; + std::cout << "HICN options" << std::endl; + std::cout << "-n <val> hicn name (default b001::1)" << std::endl; + std::cout + << "-l <val> interest lifetime in milliseconds (default 500ms)" + << std::endl; + std::cout << "OUTPUT options" << std::endl; + std::cout << "-V verbose, prints statistics about the " + "messagges sent and received (default false)" + << std::endl; + std::cout << "-D dump, dumps sent and received packets " + "(default false)" + << std::endl; + std::cout << "-q quiet, not prints (default false)" + << std::endl; + std::cout << "-H prints this message" << std::endl; +} + +int main(int argc, char *argv[]) { +#ifdef _WIN32 + WSADATA wsaData = {0}; + WSAStartup(MAKEWORD(2, 2), &wsaData); +#endif + + Configuration *c = new Configuration(); + int opt; + std::string producer_certificate = ""; + + while ((opt = getopt(argc, argv, "j::t:i:m:s:d:n:l:f:c:SAOqVDH")) != -1) { + switch (opt) { + case 't': + c->ttl_ = (uint8_t)std::stoi(optarg); + break; + case 'i': + c->pingInterval_ = std::stoi(optarg); + break; + case 'm': + c->maxPing_ = std::stoi(optarg); + break; + case 'f': + c->first_suffix_ = std::stoul(optarg); + break; + case 's': + c->srcPort_ = std::stoi(optarg); + break; + case 'd': + c->dstPort_ = std::stoi(optarg); + break; + case 'n': + c->name_ = optarg; + break; + case 'l': + c->interestLifetime_ = std::stoi(optarg); + break; + case 'V': + c->verbose_ = true; + ; + break; + case 'D': + c->dump_ = true; + break; + case 'O': + c->always_syn_ = false; + c->always_ack_ = false; + c->open_ = true; + break; + case 'S': + c->always_syn_ = true; + c->always_ack_ = false; + c->open_ = false; + break; + case 'A': + c->always_syn_ = false; + c->always_ack_ = true; + c->open_ = false; + break; + case 'q': + c->quiet_ = true; + c->verbose_ = false; + c->dump_ = false; + break; + case 'c': + c->certificate_ = std::string(optarg); + break; + case 'H': + default: + help(); + exit(EXIT_FAILURE); + } + } + + auto ping = std::make_unique<Client>(c); + + auto t0 = std::chrono::steady_clock::now(); + ping->ping(); + auto t1 = std::chrono::steady_clock::now(); + + std::cout + << "Elapsed time: " + << std::chrono::duration_cast<std::chrono::microseconds>(t1 - t0).count() + << std::endl; + +#ifdef _WIN32 + WSACleanup(); +#endif + return 0; +} + +} // namespace ping + +} // namespace core + +} // namespace transport + +int main(int argc, char *argv[]) { + return transport::core::ping::main(argc, argv); +} diff --git a/apps/ping/src/ping_server.cc b/apps/ping/src/ping_server.cc new file mode 100644 index 000000000..baf9c6698 --- /dev/null +++ b/apps/ping/src/ping_server.cc @@ -0,0 +1,340 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/interfaces/socket_producer.h> +#ifndef _WIN32 +#include <hicn/transport/utils/daemonizator.h> +#include <unistd.h> +#else +#include <openssl/applink.c> +#endif + +#include <hicn/transport/auth/identity.h> +#include <hicn/transport/auth/signer.h> +#include <hicn/transport/core/content_object.h> +#include <hicn/transport/core/interest.h> +#include <hicn/transport/utils/string_tokenizer.h> + +#include <asio.hpp> + +namespace transport { + +namespace interface { + +using HashAlgorithm = core::HashAlgorithm; +using CryptoSuite = auth::CryptoSuite; + +auth::Identity setProducerIdentity(std::string keystore_name, + std::string keystore_password, + auth::CryptoHashType hash_algorithm) { + if (access(keystore_name.c_str(), F_OK) != -1) { + return auth::Identity(keystore_name, keystore_password, hash_algorithm); + } else { + return auth::Identity(keystore_name, keystore_password, + CryptoSuite::RSA_SHA256, 1024, 365, "producer-test"); + } +} + +class CallbackContainer { + const std::size_t log2_content_object_buffer_size = 12; + + public: + CallbackContainer(const Name &prefix, uint32_t object_size, bool verbose, + bool dump, bool quite, bool flags, bool reset, uint8_t ttl, + auth::Identity *identity, bool sign, uint32_t lifetime) + : buffer_(object_size, 'X'), + content_objects_((std::uint32_t)(1 << log2_content_object_buffer_size)), + mask_((std::uint16_t)(1 << log2_content_object_buffer_size) - 1), + content_objects_index_(0), + verbose_(verbose), + dump_(dump), + quite_(quite), + flags_(flags), + reset_(reset), + ttl_(ttl), + identity_(identity), + sign_(sign) { + core::Packet::Format format; + + if (prefix.getAddressFamily() == AF_INET) { + format = core::Packet::Format::HF_INET_TCP; + if (sign_) { + format = core::Packet::Format::HF_INET_TCP_AH; + } + } else { + format = core::Packet::Format::HF_INET6_TCP; + if (sign_) { + format = core::Packet::Format::HF_INET6_TCP_AH; + } + } + + for (int i = 0; i < (1 << log2_content_object_buffer_size); i++) { + content_objects_[i] = std::make_shared<ContentObject>( + prefix, format, 0, (const uint8_t *)buffer_.data(), buffer_.size()); + content_objects_[i]->setLifetime(lifetime); + } + } + + void processInterest(ProducerSocket &p, const Interest &interest, + uint32_t lifetime) { + if (verbose_) { + std::cout << "<<< received interest " << interest.getName() + << " src port: " << interest.getSrcPort() + << " dst port: " << interest.getDstPort() + << " flags: " << interest.printFlags() + << "TTL: " << (int)interest.getTTL() << std::endl; + } else if (!quite_) { + std::cout << "<<< received interest " << interest.getName() << std::endl; + } + + if (dump_) { + std::cout << "----- interest dump -----" << std::endl; + interest.dump(); + std::cout << "-------------------------" << std::endl; + } + + if (interest.testRst()) { + std::cout << "!!!got a reset, I don't reply" << std::endl; + } else { + auto &content_object = content_objects_[content_objects_index_++ & mask_]; + + content_object->setName(interest.getName()); + content_object->setLifetime(lifetime); + content_object->setLocator(interest.getLocator()); + content_object->setSrcPort(interest.getDstPort()); + content_object->setDstPort(interest.getSrcPort()); + content_object->setTTL(ttl_); + + if (!sign_) { + content_object->resetFlags(); + } + + if (flags_) { + if (interest.testSyn()) { + content_object->setSyn(); + content_object->setAck(); + } else if (interest.testAck()) { + content_object->setAck(); + } // here I may need to handle the FIN flag; + } else if (reset_) { + content_object->setRst(); + } + + if (verbose_) { + std::cout << ">>> send object " << content_object->getName() + << " src port: " << content_object->getSrcPort() + << " dst port: " << content_object->getDstPort() + << " flags: " << content_object->printFlags() + << " TTL: " << (int)content_object->getTTL() << std::endl; + } else if (!quite_) { + std::cout << ">>> send object " << content_object->getName() + << std::endl; + } + + if (dump_) { + std::cout << "----- object dump -----" << std::endl; + content_object->dump(); + std::cout << "-----------------------" << std::endl; + } + + if (!quite_) std::cout << std::endl; + + if (sign_) { + identity_->getSigner()->signPacket(content_object.get()); + } + + p.produce(*content_object); + } + } + + private: + std::string buffer_; + std::vector<std::shared_ptr<ContentObject>> content_objects_; + std::uint16_t mask_; + std::uint16_t content_objects_index_; + bool verbose_; + bool dump_; + bool quite_; + bool flags_; + bool reset_; + uint8_t ttl_; + auth::Identity *identity_; + bool sign_; +}; + +void help() { + std::cout << "usage: hicn-preoducer-ping [options]" << std::endl; + std::cout << "PING options" << std::endl; + std::cout << "-s <val> object content size (default 1350B)" << std::endl; + std::cout << "-n <val> hicn name (default b001::/64)" << std::endl; + std::cout << "-f set tcp flags according to the flag received " + "(default false)" + << std::endl; + std::cout << "-l data lifetime" << std::endl; + std::cout << "-r always reply with a reset flag (default false)" + << std::endl; + std::cout << "-t set ttl (default 64)" << std::endl; + std::cout << "OUTPUT options" << std::endl; + std::cout << "-V verbose, prints statistics about the messagges sent " + "and received (default false)" + << std::endl; + std::cout << "-D dump, dumps sent and received packets (default false)" + << std::endl; + std::cout << "-q quite, not prints (default false)" << std::endl; +#ifndef _WIN32 + std::cout << "-d daemon mode" << std::endl; +#endif + std::cout << "-H prints this message" << std::endl; +} + +int main(int argc, char **argv) { +#ifdef _WIN32 + WSADATA wsaData = {0}; + WSAStartup(MAKEWORD(2, 2), &wsaData); +#else + bool daemon = false; +#endif + std::string name_prefix = "b001::0/64"; + std::string delimiter = "/"; + bool verbose = false; + bool dump = false; + bool quite = false; + bool flags = false; + bool reset = false; + uint32_t object_size = 1250; + uint8_t ttl = 64; + std::string keystore_path = "./rsa_crypto_material.p12"; + std::string keystore_password = "cisco"; + bool sign = false; + uint32_t data_lifetime = default_values::content_object_expiry_time; + + int opt; +#ifndef _WIN32 + while ((opt = getopt(argc, argv, "s:n:t:l:qfrVDdHk:p:")) != -1) { +#else + while ((opt = getopt(argc, argv, "s:n:t:l:qfrVDHk:p:")) != -1) { +#endif + switch (opt) { + case 's': + object_size = std::stoi(optarg); + break; + case 'n': + name_prefix = optarg; + break; + case 't': + ttl = (uint8_t)std::stoi(optarg); + break; + case 'l': + data_lifetime = std::stoi(optarg); + break; + case 'V': + verbose = true; + break; + case 'D': + dump = true; + break; + case 'q': + verbose = false; + dump = false; + quite = true; + break; +#ifndef _WIN32 + case 'd': + daemon = true; + break; +#endif + case 'f': + flags = true; + break; + case 'r': + reset = true; + break; + case 'k': + keystore_path = optarg; + sign = true; + break; + case 'p': + keystore_password = optarg; + break; + case 'H': + default: + help(); + exit(EXIT_FAILURE); + } + } + +#ifndef _WIN32 + if (daemon) { + utils::Daemonizator::daemonize(); + } +#endif + + core::Prefix producer_namespace(name_prefix); + + utils::StringTokenizer tokenizer(name_prefix, delimiter); + std::string ip_address = tokenizer.nextToken(); + Name n(ip_address); + + if (object_size > 1350) object_size = 1350; + + CallbackContainer *stubs; + auth::Identity identity = setProducerIdentity( + keystore_path, keystore_password, auth::CryptoHashType::SHA256); + + if (sign) { + stubs = new CallbackContainer(n, object_size, verbose, dump, quite, flags, + reset, ttl, &identity, sign, data_lifetime); + } else { + auth::Identity *identity = nullptr; + stubs = new CallbackContainer(n, object_size, verbose, dump, quite, flags, + reset, ttl, identity, sign, data_lifetime); + } + + ProducerSocket p; + p.registerPrefix(producer_namespace); + + p.setSocketOption(GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 0U); + p.setSocketOption( + ProducerCallbacksOptions::CACHE_MISS, + (ProducerInterestCallback)bind(&CallbackContainer::processInterest, stubs, + std::placeholders::_1, + std::placeholders::_2, data_lifetime)); + + p.connect(); + + asio::io_service io_service; + asio::signal_set signal_set(io_service, SIGINT); + signal_set.async_wait( + [&p, &io_service](const std::error_code &, const int &) { + std::cout << "STOPPING!!" << std::endl; + p.stop(); + io_service.stop(); + }); + + io_service.run(); + +#ifdef _WIN32 + WSACleanup(); +#endif + return 0; +} + +} // namespace interface + +} // end namespace transport + +int main(int argc, char **argv) { + return transport::interface::main(argc, argv); +} |