diff options
Diffstat (limited to 'apps')
32 files changed, 3978 insertions, 470 deletions
diff --git a/apps/.clang-format b/apps/.clang-format new file mode 100644 index 000000000..adc73c6fd --- /dev/null +++ b/apps/.clang-format @@ -0,0 +1,14 @@ +# Copyright (c) 2021 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +BasedOnStyle: Google diff --git a/apps/CMakeLists.txt b/apps/CMakeLists.txt index 5737a1d09..b58e18f03 100644 --- a/apps/CMakeLists.txt +++ b/apps/CMakeLists.txt @@ -1,4 +1,4 @@ -# Copyright (c) 2017-2019 Cisco and/or its affiliates. +# Copyright (c) 2021-2022 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: @@ -11,69 +11,106 @@ # See the License for the specific language governing permissions and # limitations under the License. -cmake_minimum_required(VERSION 3.5 FATAL_ERROR) -set(CMAKE_CXX_STANDARD 14) - +############################################################## +# Project and cmake version +############################################################## +cmake_minimum_required(VERSION 3.10 FATAL_ERROR) project(apps) + +############################################################## +# C Standard +############################################################## +set(CMAKE_CXX_STANDARD 17) + + +############################################################## +# Cmake modules +############################################################## +include("${CMAKE_CURRENT_SOURCE_DIR}/../versions.cmake") set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} - "${CMAKE_CURRENT_SOURCE_DIR}/../cmake/Modules" - "${CMAKE_CURRENT_SOURCE_DIR}/cmake/Modules" + ${CMAKE_CURRENT_SOURCE_DIR}/../cmake/Modules ) -include(BuildMacros) -include(WindowsMacros) +############################################################## +# Libs and Bins names +############################################################## set(HICN_APPS hicn-apps CACHE INTERNAL "" FORCE) +set(HIGET higet) +set(HTTP_PROXY hicn-http-proxy) +set(LIBHTTP_PROXY hicnhttpproxy) +set(LIBHTTP_PROXY_STATIC ${LIBHTTP_PROXY}.static) + +############################################################## +# Dependencies and third party libs +############################################################## +find_package(Threads REQUIRED) +find_package(Libconfig++ ${LIBCONFIG_DEFAULT_VERSION} REQUIRED) + +############################################################## +# Check if building as subproject or as root project +############################################################## if(CMAKE_SOURCE_DIR STREQUAL PROJECT_SOURCE_DIR) - find_package(Libtransport REQUIRED) - find_package(hicnctrl REQUIRED) - find_package(Threads REQUIRED) + include(CommonSetup) + + find_package(Libhicn ${CURRENT_VERSION} REQUIRED NO_MODULE) + find_package(Libhicnctrl ${CURRENT_VERSION} REQUIRED NO_MODULE) + find_package(Libhicntransport ${CURRENT_VERSION} REQUIRED NO_MODULE) + + if (DISABLE_SHARED_LIBRARIES) + set(LIBTYPE static) + else() + set(LIBTYPE shared) + endif() + + list(APPEND LIBHICN_LIBRARIES hicn::hicn.${LIBTYPE}) + list(APPEND LIBTRANSPORT_LIBRARIES hicn::hicntransport.${LIBTYPE}) + list(APPEND LIBHICNCTRL_LIBRARIES hicn::hicnctrl.${LIBTYPE}) else() if (DISABLE_SHARED_LIBRARIES) - find_package(OpenSSL REQUIRED) - if (NOT WIN32) - find_package(ZLIB REQUIRED) - endif () + find_package(OpenSSL ${OPENSSL_DEFAULT_VERSION} REQUIRED) set(LIBTRANSPORT_LIBRARIES ${LIBTRANSPORT_STATIC}) + set(LIBHICN_LIBRARIES ${LIBHICN_STATIC}) set(LIBHICNCTRL_LIBRARIES ${LIBHICNCTRL_STATIC}) else () set(LIBTRANSPORT_LIBRARIES ${LIBTRANSPORT_SHARED}) + set(LIBHICN_LIBRARIES ${LIBHICN_SHARED}) set(LIBHICNCTRL_LIBRARIES ${LIBHICNCTRL_SHARED}) endif () list(APPEND DEPENDENCIES ${LIBTRANSPORT_LIBRARIES} ) -endif() -set(SUFFIX "") -if (${LIBTRANSPORT_LIBRARIES} MATCHES ".*-memif.*") - set(DEPENDENCIES ${LIBMEMIF_SHARED}) - set(SUFFIX "-memif") - set(LINK_FLAGS "-Wl,-unresolved-symbols=ignore-in-shared-libs") -endif() + # glog + list(APPEND THIRD_PARTY_INCLUDE_DIRS + ${glog_BINARY_DIR} + ${glog_SOURCE_DIR}/src + ) + list(APPEND THIRD_PARTY_DEPENDENCIES + glog + ) -set(HICN_APPS "${HICN_APPS}${SUFFIX}") + set(COMMON_INCLUDE_DIRS + ${CMAKE_CURRENT_SOURCE_DIR}/common-includes + ) +endif() -list(APPEND LIBRARIES - ${LIBTRANSPORT_LIBRARIES} - ${LIBHICNCTRL_LIBRARIES} - ${OPENSSL_LIBRARIES} - ${CMAKE_THREAD_LIBS_INIT} -) -if (WIN32) - set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /wd4200 /wd4996") -endif () +############################################################## +# Packaging and versioning +############################################################## +include(${CMAKE_CURRENT_SOURCE_DIR}/../versions.cmake) +include(${CMAKE_CURRENT_SOURCE_DIR}/cmake/packaging.cmake) -include(Packaging) -set(HIGET higet) -set(HTTP_PROXY hicn-http-proxy) -if (NOT WIN32) - add_subdirectory(http-proxy) -endif () +############################################################## +# Subdirectories +############################################################## +add_subdirectory(ping) +add_subdirectory(hiperf) +add_subdirectory(http-proxy) add_subdirectory(higet) diff --git a/apps/cmake/Modules/Packaging.cmake b/apps/cmake/packaging.cmake index 9b3fa2e72..9142598ad 100644 --- a/apps/cmake/Modules/Packaging.cmake +++ b/apps/cmake/packaging.cmake @@ -1,4 +1,4 @@ -# Copyright (c) 2017-2019 Cisco and/or its affiliates. +# Copyright (c) 2021-2022 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: @@ -18,21 +18,34 @@ useful for testing and debugging within a hicn network." ) set(${HICN_APPS}_DEB_DEPENDENCIES - "lib${LIBTRANSPORT} (>= stable_version)" + "lib${LIBTRANSPORT} (= stable_version), lib${LIBHICNCTRL} (= stable_version)" CACHE STRING "Dependencies for deb/rpm package." ) set(${HICN_APPS}-dev_DEB_DEPENDENCIES - "lib${LIBTRANSPORT}-dev (>= stable_version)" + "${HICN_APPS} (= stable_version), lib${LIBTRANSPORT}-dev (= stable_version), lib${LIBHICNCTRL}-dev (= stable_version)" CACHE STRING "Dependencies for deb/rpm package." ) set(${HICN_APPS}_RPM_DEPENDENCIES - "lib${LIBTRANSPORT} >= stable_version" + "lib${LIBTRANSPORT} = stable_version, lib${LIBHICNCTRL} = stable_version" CACHE STRING "Dependencies for deb/rpm package." ) set(${HICN_APPS}-dev_RPM_DEPENDENCIES - "lib${LIBTRANSPORT}-devel >= stable_version" + "${HICN_APPS} = stable_version, lib${LIBTRANSPORT}-dev = stable_version, lib${LIBHICNCTRL}-dev = stable_version" CACHE STRING "Dependencies for deb/rpm package." -)
\ No newline at end of file +) + +if (INTERNAL_ENVIRONMENT) + include(CheckSsl) + CheckSsl() + set(${HICN_APPS}_DEB_DEPENDENCIES + "${${HICN_APPS}_DEB_DEPENDENCIES}, ${OPENSSL_DEPENDENCY}" + CACHE STRING "Dependencies for deb/rpm package." + ) + set(${HICN_APPS}-dev_DEB_DEPENDENCIES + "${${HICN_APPS}-dev_DEB_DEPENDENCIES}, ${OPENSSL_DEPENDENCY_DEV}" + CACHE STRING "Dependencies for deb/rpm package." + ) +endif ()
\ No newline at end of file diff --git a/apps/common-includes/hicn/apps/utils/logger.h b/apps/common-includes/hicn/apps/utils/logger.h new file mode 100644 index 000000000..d2af988d9 --- /dev/null +++ b/apps/common-includes/hicn/apps/utils/logger.h @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2022 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <glog/logging.h> + +#include <iostream> + +#define LoggerInfo() LOG(INFO) +#define LoggerWarn() LOG(WARNING) +#define LoggerErr() LOG(ERROR) +#define LoggerFatal() LOG(FATAL) +#define LoggerVerbose(level) VLOG((level)) +#define LoggerIsOn(level) VLOG_IS_ON((level)) + +struct HicnLogger { + HicnLogger() { + // Set log level + const char *log_level = std::getenv("LOG_LEVEL"); + if (log_level != nullptr) FLAGS_v = std::stol(std::string(log_level)); + + // Enable/disable prefix + const char *enable_log_prefix = std::getenv("ENABLE_LOG_PREFIX"); + if (enable_log_prefix != nullptr && + std::string(enable_log_prefix) == "OFF") { + FLAGS_log_prefix = false; + } + + FLAGS_colorlogtostderr = true; + } +}; + +static HicnLogger logger;
\ No newline at end of file diff --git a/apps/higet/CMakeLists.txt b/apps/higet/CMakeLists.txt index 1cf14c287..791191872 100644 --- a/apps/higet/CMakeLists.txt +++ b/apps/higet/CMakeLists.txt @@ -1,4 +1,4 @@ -# Copyright (c) 2017-2019 Cisco and/or its affiliates. +# Copyright (c) 2021-2022 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: @@ -11,33 +11,50 @@ # See the License for the specific language governing permissions and # limitations under the License. -cmake_minimum_required(VERSION 3.5 FATAL_ERROR) -set(CMAKE_CXX_STANDARD 14) +############################################################## +# Source files +############################################################## +list(APPEND APPS_SRC + higet.cc +) -project(utils) -set(CMAKE_MODULE_PATH - ${CMAKE_MODULE_PATH} - "${CMAKE_CURRENT_SOURCE_DIR}/../cmake/Modules" - "${CMAKE_CURRENT_SOURCE_DIR}/cmake/Modules" -) +############################################################## +# Linker flags +############################################################## +if (WIN32) + set(CMAKE_EXE_LINKER_FLAGS + "${CMAKE_EXE_LINKER_FLAGS} /NODEFAULTLIB:\"LIBCMT\"" + ) +endif() -if (NOT CMAKE_BUILD_TYPE) - message(STATUS "${PROJECT_NAME}: No build type selected, default to Release") - set(CMAKE_BUILD_TYPE "Release") -endif () -list(APPEND APPS_SRC - higet.cc +############################################################## +# Compiler options +############################################################## +set(COMPILER_OPTIONS + ${DEFAULT_COMPILER_OPTIONS} ) + +############################################################## +# Build higet +############################################################## if (NOT DISABLE_EXECUTABLES) build_executable(${HIGET} SOURCES ${APPS_SRC} - LINK_LIBRARIES ${LIBTRANSPORT_LIBRARIES} ${WSOCK32_LIBRARY} ${WS2_32_LIBRARY} - DEPENDS ${LIBTRANSPORT_LIBRARIES} + LINK_LIBRARIES + ${LIBHICN_LIBRARIES} + ${LIBTRANSPORT_LIBRARIES} + ${CMAKE_THREAD_LIBS_INIT} + ${WSOCK32_LIBRARY} + ${WS2_32_LIBRARY} + INCLUDE_DIRS + PRIVATE ${THIRD_PARTY_INCLUDE_DIRS} ${COMMON_INCLUDE_DIRS} + DEPENDS ${LIBTRANSPORT_LIBRARIES} ${THIRD_PARTY_DEPENDENCIES} COMPONENT ${HICN_APPS} DEFINITIONS ${COMPILER_DEFINITIONS} LINK_FLAGS ${LINK_FLAGS} + COMPILE_OPTIONS ${COMPILER_OPTIONS} ) endif () diff --git a/apps/higet/higet.cc b/apps/higet/higet.cc index 7584148a9..77cbb52d2 100644 --- a/apps/higet/higet.cc +++ b/apps/higet/higet.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 Cisco and/or its affiliates. + * Copyright (c) 2021-2022 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -13,25 +13,20 @@ * limitations under the License. */ +#include <hicn/apps/utils/logger.h> #include <hicn/transport/http/client_connection.h> -#include <fstream> -#include <map> +#include <hicn/transport/utils/chrono_typedefs.h> + #include <algorithm> +#include <asio.hpp> +#include <fstream> #include <functional> +#include <map> +#ifndef ASIO_STANDALONE #define ASIO_STANDALONE +#endif #include <asio.hpp> -#undef ASIO_STANDALONE - -#include <thread> - -typedef std::chrono::time_point<std::chrono::system_clock> Time; -typedef std::chrono::milliseconds TimeDuration; - -Time t1; - -#define DEFAULT_BETA 0.99 -#define DEFAULT_GAMMA 0.07 namespace http { @@ -47,16 +42,13 @@ class ReadBytesCallbackImplementation static std::string chunk_separator; public: - ReadBytesCallbackImplementation(std::string file_name, long yet_downloaded) + ReadBytesCallbackImplementation(const std::string &file_name, + long yet_downloaded) : file_name_(file_name), temp_file_name_(file_name_ + ".temp"), yet_downloaded_(yet_downloaded), byte_downloaded_(yet_downloaded), - chunked_(false), - chunk_size_(0), - work_(std::make_unique<asio::io_service::work>(io_service_)), - thread_( - std::make_unique<std::thread>([this]() { io_service_.run(); })) { + work_(std::make_unique<asio::io_service::work>(io_service_)) { std::streambuf *buf; if (file_name_ != "-") { of_.open(temp_file_name_, std::ofstream::binary | std::ofstream::app); @@ -68,12 +60,6 @@ class ReadBytesCallbackImplementation out_ = new std::ostream(buf); } - ~ReadBytesCallbackImplementation() { - if (thread_->joinable()) { - thread_->join(); - } - } - void onBytesReceived(std::unique_ptr<utils::MemBuf> &&buffer) { auto buffer_ptr = buffer.release(); io_service_.post([this, buffer_ptr]() { @@ -101,7 +87,7 @@ class ReadBytesCallbackImplementation if (chunked_) { if (chunk_size_ > 0) { - out_->write((char *)payload->data(), chunk_size_); + out_->write((char *)payload->writableData(), chunk_size_); payload->trimStart(chunk_size_); if (payload->length() >= chunk_separator.size()) { @@ -118,7 +104,7 @@ class ReadBytesCallbackImplementation auto it = std::search(begin, end, begincrlf2, endcrlf2); if (it != end) { chunk_size_ = std::stoul(begin, 0, 16); - content_size_ += chunk_size_; + content_size_ += (long)chunk_size_; payload->trimStart(it + chunk_separator.size() - begin); std::size_t to_write; @@ -129,8 +115,8 @@ class ReadBytesCallbackImplementation chunk_size_ -= payload->length(); } - out_->write((char *)payload->data(), to_write); - byte_downloaded_ += to_write; + out_->write((char *)payload->writableData(), to_write); + byte_downloaded_ += (long)to_write; payload->trimStart(to_write); if (payload->length() >= chunk_separator.size()) { @@ -139,8 +125,8 @@ class ReadBytesCallbackImplementation } } } else { - out_->write((char *)payload->data(), payload->length()); - byte_downloaded_ += payload->length(); + out_->write((char *)payload->writableData(), payload->length()); + byte_downloaded_ += (long)payload->length(); } if (file_name_ != "-") { @@ -174,13 +160,13 @@ class ReadBytesCallbackImplementation } print_bar(100, 100, true); - std::cout << "\nDownloaded " << bytes << " bytes" << std::endl; + LoggerInfo() << "\nDownloaded " << bytes << " bytes"; } work_.reset(); }); } - void onError(const std::error_code ec) { + void onError(const std::error_code &ec) { io_service_.post([this]() { of_.close(); delete out_; @@ -207,7 +193,7 @@ class ReadBytesCallbackImplementation #endif std::cout << "["; - int pos = barWidth * progress; + int pos = barWidth * (int)progress; for (int i = 0; i < barWidth; ++i) { if (i < pos) { std::cout << "="; @@ -218,15 +204,13 @@ class ReadBytesCallbackImplementation } } if (last) { - std::cout << "] " << int(progress * 100.0) << " %" << std::endl - << std::endl; + std::cout << "] " << int(progress * 100.0) << " %"; } else { std::cout << "] " << int(progress * 100.0) << " %\r"; std::cout.flush(); } } - private: std::string file_name_; std::string temp_file_name_; std::ostream *out_; @@ -235,41 +219,36 @@ class ReadBytesCallbackImplementation long content_size_; bool first_chunk_read_ = false; long byte_downloaded_ = 0; - bool chunked_; - std::size_t chunk_size_; + bool chunked_ = false; + std::size_t chunk_size_ = 0; asio::io_service io_service_; std::unique_ptr<asio::io_service::work> work_; - std::unique_ptr<std::thread> thread_; }; std::string ReadBytesCallbackImplementation::chunk_separator = "\r\n"; -long checkFileStatus(std::string file_name) { +long checkFileStatus(const std::string &file_name) { struct stat stat_buf; std::string temp_file_name_ = file_name + ".temp"; int rc = stat(temp_file_name_.c_str(), &stat_buf); return rc == 0 ? stat_buf.st_size : -1; } -void usage(char *program_name) { - std::cerr << "usage:" << std::endl; - std::cerr << program_name << " [option]... [url]..." << std::endl; - std::cerr << program_name << " options:" << std::endl; - std::cerr - << "-O <out_put_path> = write documents to <out_put_file>" - << std::endl; - std::cerr << "-S = print server response" - << std::endl; - std::cerr << "-P = first word of the ipv6 name of " - "the response" - << std::endl; - std::cerr << "example:" << std::endl; - std::cerr << "\t" << program_name << " -O - http://origin/index.html" - << std::endl; - exit(EXIT_FAILURE); +void usage(const char *program_name) { + LoggerInfo() << "usage:"; + LoggerInfo() << program_name << " [option]... [url]..."; + LoggerInfo() << program_name << " options:"; + LoggerInfo() + << "-O <out_put_path> = write documents to <out_put_file>"; + LoggerInfo() << "-S = print server response"; + LoggerInfo() + << "-P = first word of the ipv6 name of " + "the response"; + LoggerInfo() << "example:"; + LoggerInfo() << "\t" << program_name << " -O - http://origin/index.html"; } -int main(int argc, char **argv) { +int http_main(int argc, char **argv) { #ifdef _WIN32 WSADATA wsaData = {0}; WSAStartup(MAKEWORD(2, 2), &wsaData); @@ -298,20 +277,20 @@ int main(int argc, char **argv) { case 'P': conf.ipv6_first_word = optarg; break; - case 'h': default: usage(argv[0]); - break; + exit(EXIT_FAILURE); } } if (!argv[optind]) { usage(argv[0]); + exit(EXIT_FAILURE); } name = argv[optind]; - std::cerr << "Using name " << name << " and name first word " - << conf.ipv6_first_word << std::endl; + LoggerInfo() << "Using name " << name << " and name first word " + << conf.ipv6_first_word; if (conf.file_name.empty()) { conf.file_name = name.substr(1 + name.find_last_of("/")); @@ -334,13 +313,16 @@ int main(int argc, char **argv) { {"Connection", "Keep-Alive"}, {"Range", range}}; } + transport::http::HTTPClientConnection connection; + if (!conf.producer_certificate.empty()) { - connection.setCertificate(conf.producer_certificate); + std::shared_ptr<transport::auth::Verifier> verifier = + std::make_shared<transport::auth::AsymmetricVerifier>( + conf.producer_certificate); + connection.setVerifier(verifier); } - t1 = std::chrono::system_clock::now(); - http::ReadBytesCallbackImplementation readBytesCallback(conf.file_name, yetDownloaded); @@ -356,4 +338,4 @@ int main(int argc, char **argv) { } // end namespace http -int main(int argc, char **argv) { return http::main(argc, argv); } +int main(int argc, char **argv) { return http::http_main(argc, argv); } diff --git a/apps/hiperf/CMakeLists.txt b/apps/hiperf/CMakeLists.txt new file mode 100644 index 000000000..5a0dc3c06 --- /dev/null +++ b/apps/hiperf/CMakeLists.txt @@ -0,0 +1,61 @@ +# Copyright (c) 2021-2022 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +if (NOT DISABLE_EXECUTABLES) +############################################################## +# Source files +############################################################## + list(APPEND HIPERF_SRC + ${CMAKE_CURRENT_SOURCE_DIR}/src/client.cc + ${CMAKE_CURRENT_SOURCE_DIR}/src/main.cc + ${CMAKE_CURRENT_SOURCE_DIR}/src/server.cc + ) + + +############################################################## +# Libraries +############################################################## + list (APPEND HIPERF_LIBRARIES + PRIVATE ${LIBTRANSPORT_LIBRARIES} + PRIVATE ${LIBHICNCTRL_LIBRARIES} + PRIVATE ${LIBHICN_LIBRARIES} + PRIVATE ${CMAKE_THREAD_LIBS_INIT} + PRIVATE ${LIBCONFIG_CPP_LIBRARIES} + PRIVATE ${WSOCK32_LIBRARY} + PRIVATE ${WS2_32_LIBRARY} + ) + +############################################################## +# Compiler options +############################################################## + set(COMPILER_OPTIONS + ${DEFAULT_COMPILER_OPTIONS} + ) + + +############################################################## +# Build hiperf +############################################################## + build_executable(hiperf + SOURCES ${HIPERF_SRC} + LINK_LIBRARIES ${HIPERF_LIBRARIES} + INCLUDE_DIRS + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/src + PRIVATE ${LIBCONFIG_CPP_INCLUDE_DIRS} + PRIVATE ${THIRD_PARTY_INCLUDE_DIRS} ${COMMON_INCLUDE_DIRS} + DEPENDS ${DEPENDENCIES} ${THIRD_PARTY_DEPENDENCIES} + COMPONENT ${HICN_APPS} + LINK_FLAGS ${LINK_FLAGS} + COMPILE_OPTIONS ${COMPILER_OPTIONS} + ) +endif()
\ No newline at end of file diff --git a/apps/hiperf/src/client.cc b/apps/hiperf/src/client.cc new file mode 100644 index 000000000..1ce5b4c55 --- /dev/null +++ b/apps/hiperf/src/client.cc @@ -0,0 +1,882 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <client.h> +#include <hicn/transport/portability/endianess.h> + +#include <libconfig.h++> + +namespace hiperf { + +/** + * Forward declaration of client Read callbacks. + */ +class RTCCallback; +class Callback; + +using transport::auth::CryptoHashType; +using transport::core::Packet; +using transport::core::Prefix; +using transport::interface::ConsumerCallbacksOptions; +using transport::interface::ConsumerSocket; +using transport::interface::GeneralTransportOptions; +using transport::interface::ProducerSocket; +using transport::interface::ProductionProtocolAlgorithms; +using transport::interface::RaaqmTransportOptions; +using transport::interface::RtcTransportOptions; +using transport::interface::RtcTransportRecoveryStrategies; +using transport::interface::StrategyCallback; +using transport::interface::TransportStatistics; + +/** + * Hiperf client class: configure and setup an hicn consumer following the + * ClientConfiguration. + */ +class HIperfClient::Impl { + friend class Callback; + friend class RTCCallback; + + static inline constexpr uint16_t klog2_header_counter() { return 4; } + static inline constexpr uint16_t kheader_counter_mask() { + return (1 << klog2_header_counter()) - 1; + } + + class ConsumerContext + : public Base<ConsumerContext, ClientConfiguration, Impl>, + private ConsumerSocket::ReadCallback { + static inline const std::size_t kmtu = HIPERF_MTU; + + public: + using ConfType = ClientConfiguration; + using ParentType = typename HIperfClient::Impl; + static inline auto getContextType() -> std::string { + return "ConsumerContext"; + } + + ConsumerContext(Impl &client, int consumer_identifier) + : Base(client, client.io_service_, consumer_identifier), + receive_buffer_( + utils::MemBuf::create(client.config_.receive_buffer_size_)), + socket_(client.io_service_), + payload_size_max_(PayloadSize(client.config_.packet_format_) + .getPayloadSizeMax(RTC_HEADER_SIZE)), + nb_iterations_(client.config_.nb_iterations_) {} + + ConsumerContext(ConsumerContext &&other) noexcept + : Base(std::move(other)), + receive_buffer_(std::move(other.receive_buffer_)), + socket_(std::move(other.socket_)), + payload_size_max_(other.payload_size_max_), + remote_(std::move(other.remote_)), + nb_iterations_(other.nb_iterations_), + saved_stats_(std::move(other.saved_stats_)), + header_counter_(other.header_counter_), + first_(other.first_), + consumer_socket_(std::move(other.consumer_socket_)), + producer_socket_(std::move(other.producer_socket_)) {} + + ~ConsumerContext() override = default; + + /*************************************************************** + * ConsumerSocket::ReadCallback implementation + ***************************************************************/ + + bool isBufferMovable() noexcept override { return false; } + + void getReadBuffer(uint8_t **application_buffer, + size_t *max_length) override { + *application_buffer = receive_buffer_->writableData(); + + if (configuration_.rtc_) { + *max_length = kmtu; + } else { + *max_length = configuration_.receive_buffer_size_; + } + } + + void readBufferAvailable( + std::unique_ptr<utils::MemBuf> &&buffer) noexcept override { + // Nothing to do here + auto ret = std::move(buffer); + } + + void readDataAvailable(std::size_t length) noexcept override { + if (configuration_.rtc_) { + saved_stats_.received_bytes_ += length; + saved_stats_.received_data_pkt_++; + + // collecting delay stats. Just for performance testing + auto senderTimeStamp = + *reinterpret_cast<uint64_t *>(receive_buffer_->writableData()); + + auto now = utils::SystemTime::nowMs().count(); + auto new_delay = double(now - senderTimeStamp); + + if (senderTimeStamp > now) + new_delay = -1 * double(senderTimeStamp - now); + + saved_stats_.delay_sample_++; + saved_stats_.avg_data_delay_ = + saved_stats_.avg_data_delay_ + + (double(new_delay) - saved_stats_.avg_data_delay_) / + saved_stats_.delay_sample_; + + if (configuration_.test_mode_) { + saved_stats_.data_delays_ += std::to_string(int(new_delay)); + saved_stats_.data_delays_ += ","; + } + + if (configuration_.relay_ && configuration_.parallel_flows_ == 1) { + producer_socket_->produceDatagram( + configuration_.relay_name_.makeName(), + receive_buffer_->writableData(), + length < payload_size_max_ ? length : payload_size_max_); + } + if (configuration_.output_stream_mode_ && + configuration_.parallel_flows_ == 1) { + const uint8_t *start = receive_buffer_->writableData(); + start += sizeof(uint64_t); + std::size_t pkt_len = length - sizeof(uint64_t); + socket_.send_to(asio::buffer(start, pkt_len), remote_); + } + } + } + + size_t maxBufferSize() const override { + return configuration_.rtc_ ? kmtu : configuration_.receive_buffer_size_; + } + + void readError(const std::error_code &ec) noexcept override { + getOutputStream() << "Error " << ec.message() + << " while reading from socket" << std::endl; + parent_.io_service_.stop(); + } + + void readSuccess(std::size_t total_size) noexcept override { + if (configuration_.rtc_) { + getOutputStream() << "Data successfully read" << std::endl; + } else { + auto t2 = utils::SteadyTime::now(); + auto dt = + utils::SteadyTime::getDurationUs(saved_stats_.t_download_, t2); + auto usec = dt.count(); + + getOutputStream() << "Content retrieved. Size: " << total_size + << " [Bytes]" << std::endl; + + getOutputStream() << "Elapsed Time: " << usec / 1000000.0 + << " seconds -- " + << double(total_size * 8) * 1.0 / double(usec) * 1.0 + << " [Mbps]" << std::endl; + + parent_.io_service_.stop(); + } + } + + /*************************************************************** + * End of ConsumerSocket::ReadCallback implementation + ***************************************************************/ + + private: + struct SavedStatistics { + utils::SteadyTime::TimePoint t_stats_{}; + utils::SteadyTime::TimePoint t_download_{}; + uint32_t total_duration_milliseconds_{0}; + uint64_t old_bytes_value_{0}; + uint64_t old_interest_tx_value_{0}; + uint64_t old_fec_interest_tx_value_{0}; + uint64_t old_fec_data_rx_value_{0}; + uint64_t old_lost_data_value_{0}; + uint64_t old_bytes_recovered_value_{0}; + uint64_t old_definitely_lost_data_value_{0}; + uint64_t old_retx_value_{0}; + uint64_t old_sent_int_value_{0}; + uint64_t old_received_nacks_value_{0}; + uint32_t old_fec_pkt_{0}; + // IMPORTANT: to be used only for performance testing, when consumer and + // producer are synchronized. Used for rtc only at the moment + double avg_data_delay_{0}; + uint32_t delay_sample_{0}; + uint32_t received_bytes_{0}; + uint32_t received_data_pkt_{0}; + uint32_t auth_alerts_{0}; + std::string data_delays_{""}; + }; + + /*************************************************************** + * Transport callbacks + ***************************************************************/ + + void checkReceivedRtcContent( + [[maybe_unused]] const ConsumerSocket &c, + [[maybe_unused]] const transport::core::ContentObject &content_object) + const { + // Nothing to do here + } + + void processLeavingInterest( + const ConsumerSocket & /*c*/, + const transport::core::Interest & /*interest*/) const { + // Nothing to do here + } + + transport::auth::VerificationPolicy onAuthFailed( + transport::auth::Suffix /*suffix*/, + transport::auth::VerificationPolicy /*policy*/) { + saved_stats_.auth_alerts_++; + return transport::auth::VerificationPolicy::ACCEPT; + } + + void handleTimerExpiration([[maybe_unused]] const ConsumerSocket &c, + const TransportStatistics &stats) { + const char separator = ' '; + const int width = 18; + + utils::SteadyTime::TimePoint t2 = utils::SteadyTime::Clock::now(); + auto exact_duration = + utils::SteadyTime::getDurationMs(saved_stats_.t_stats_, t2); + + std::stringstream interval_ms; + interval_ms << saved_stats_.total_duration_milliseconds_ << "-" + << saved_stats_.total_duration_milliseconds_ + + exact_duration.count(); + + std::stringstream bytes_transferred; + bytes_transferred << std::fixed << std::setprecision(3) + << double(stats.getBytesRecv() - + saved_stats_.old_bytes_value_) / + 1000000.0 + << std::setfill(separator); + + std::stringstream bandwidth; + bandwidth << (double(stats.getBytesRecv() - + saved_stats_.old_bytes_value_) * + 8) / + (exact_duration.count()) / 1000.0 + << std::setfill(separator); + + std::stringstream window; + window << stats.getAverageWindowSize() << std::setfill(separator); + + std::stringstream avg_rtt; + avg_rtt << std::setprecision(3) << std::fixed << stats.getAverageRtt() + << std::setfill(separator); + + if (configuration_.rtc_) { + std::stringstream lost_data; + lost_data << stats.getLostData() - saved_stats_.old_lost_data_value_ + << std::setfill(separator); + + std::stringstream bytes_recovered_data; + bytes_recovered_data << stats.getBytesRecoveredData() - + saved_stats_.old_bytes_recovered_value_ + << std::setfill(separator); + + std::stringstream definitely_lost_data; + definitely_lost_data << stats.getDefinitelyLostData() - + saved_stats_.old_definitely_lost_data_value_ + << std::setfill(separator); + + std::stringstream data_delay; + data_delay << std::fixed << std::setprecision(3) + << saved_stats_.avg_data_delay_ << std::setfill(separator); + + std::stringstream received_data_pkt; + received_data_pkt << saved_stats_.received_data_pkt_ + << std::setfill(separator); + + std::stringstream goodput; + goodput << std::fixed << std::setprecision(3) + << (saved_stats_.received_bytes_ * 8.0) / + (exact_duration.count()) / 1000.0 + << std::setfill(separator); + + std::stringstream loss_rate; + loss_rate << std::fixed << std::setprecision(2) + << stats.getLossRatio() * 100.0 << std::setfill(separator); + + std::stringstream retx_sent; + retx_sent << stats.getRetxCount() - saved_stats_.old_retx_value_ + << std::setfill(separator); + + std::stringstream interest_sent; + interest_sent << stats.getInterestTx() - + saved_stats_.old_sent_int_value_ + << std::setfill(separator); + + std::stringstream nacks; + nacks << stats.getReceivedNacks() - + saved_stats_.old_received_nacks_value_ + << std::setfill(separator); + + std::stringstream fec_pkt; + fec_pkt << stats.getReceivedFEC() - saved_stats_.old_fec_pkt_ + << std::setfill(separator); + + std::stringstream queuing_delay; + queuing_delay << std::fixed << std::setprecision(3) + << stats.getQueuingDelay() << std::setfill(separator); + + std::stringstream residual_losses; + double rl_perc = stats.getResidualLossRate() * 100; + residual_losses << std::fixed << std::setprecision(2) << rl_perc + << std::setfill(separator); + + std::stringstream quality_score; + quality_score << std::fixed << (int)stats.getQualityScore() + << std::setfill(separator); + + std::stringstream alerts; + alerts << stats.getAlerts() << std::setfill(separator); + + std::stringstream auth_alerts; + auth_alerts << saved_stats_.auth_alerts_ << std::setfill(separator); + + if ((header_counter_ == 0 && configuration_.print_headers_) || first_) { + getOutputStream() << std::right << std::setw(width) << "Interval[ms]"; + getOutputStream() + << std::right << std::setw(width) << "RecvData[pkt]"; + getOutputStream() + << std::right << std::setw(width) << "Bandwidth[Mbps]"; + getOutputStream() + << std::right << std::setw(width) << "Goodput[Mbps]"; + getOutputStream() << std::right << std::setw(width) << "LossRate[%]"; + getOutputStream() << std::right << std::setw(width) << "Retr[pkt]"; + getOutputStream() << std::right << std::setw(width) << "InterestSent"; + getOutputStream() + << std::right << std::setw(width) << "ReceivedNacks"; + getOutputStream() << std::right << std::setw(width) << "SyncWnd[pkt]"; + getOutputStream() << std::right << std::setw(width) << "MinRtt[ms]"; + getOutputStream() + << std::right << std::setw(width) << "QueuingDelay[ms]"; + getOutputStream() + << std::right << std::setw(width) << "LostData[pkt]"; + getOutputStream() + << std::right << std::setw(width) << "RecoveredData"; + getOutputStream() + << std::right << std::setw(width) << "DefinitelyLost"; + getOutputStream() << std::right << std::setw(width) << "State"; + getOutputStream() + << std::right << std::setw(width) << "DataDelay[ms]"; + getOutputStream() << std::right << std::setw(width) << "FecPkt"; + getOutputStream() << std::right << std::setw(width) << "Congestion"; + getOutputStream() + << std::right << std::setw(width) << "ResidualLosses"; + getOutputStream() << std::right << std::setw(width) << "QualityScore"; + getOutputStream() << std::right << std::setw(width) << "Alerts"; + getOutputStream() + << std::right << std::setw(width) << "AuthAlerts" << std::endl; + + first_ = false; + } + + getOutputStream() << std::right << std::setw(width) + << interval_ms.str(); + getOutputStream() << std::right << std::setw(width) + << received_data_pkt.str(); + getOutputStream() << std::right << std::setw(width) << bandwidth.str(); + getOutputStream() << std::right << std::setw(width) << goodput.str(); + getOutputStream() << std::right << std::setw(width) << loss_rate.str(); + getOutputStream() << std::right << std::setw(width) << retx_sent.str(); + getOutputStream() << std::right << std::setw(width) + << interest_sent.str(); + getOutputStream() << std::right << std::setw(width) << nacks.str(); + getOutputStream() << std::right << std::setw(width) << window.str(); + getOutputStream() << std::right << std::setw(width) << avg_rtt.str(); + getOutputStream() << std::right << std::setw(width) + << queuing_delay.str(); + getOutputStream() << std::right << std::setw(width) << lost_data.str(); + getOutputStream() << std::right << std::setw(width) + << bytes_recovered_data.str(); + getOutputStream() << std::right << std::setw(width) + << definitely_lost_data.str(); + getOutputStream() << std::right << std::setw(width) + << stats.getCCStatus(); + getOutputStream() << std::right << std::setw(width) << data_delay.str(); + getOutputStream() << std::right << std::setw(width) << fec_pkt.str(); + getOutputStream() << std::right << std::setw(width) + << stats.isCongested(); + getOutputStream() << std::right << std::setw(width) + << residual_losses.str(); + getOutputStream() << std::right << std::setw(width) + << quality_score.str(); + getOutputStream() << std::right << std::setw(width) << alerts.str(); + getOutputStream() << std::right << std::setw(width) << auth_alerts.str() + << std::endl; + + if (configuration_.test_mode_) { + if (saved_stats_.data_delays_.size() > 0) + saved_stats_.data_delays_.pop_back(); + + auto now = utils::SteadyTime::nowMs(); + getOutputStream() << std::fixed << std::setprecision(0) << now.count() + << " DATA-DELAYS:[" << saved_stats_.data_delays_ + << "]" << std::endl; + } + } else { + if ((header_counter_ == 0 && configuration_.print_headers_) || first_) { + getOutputStream() << std::right << std::setw(width) << "Interval[ms]"; + getOutputStream() << std::right << std::setw(width) << "Transfer[MB]"; + getOutputStream() + << std::right << std::setw(width) << "Bandwidth[Mbps]"; + getOutputStream() << std::right << std::setw(width) << "Retr[pkt]"; + getOutputStream() << std::right << std::setw(width) << "Cwnd[Int]"; + getOutputStream() + << std::right << std::setw(width) << "AvgRtt[ms]" << std::endl; + + first_ = false; + } + + getOutputStream() << std::right << std::setw(width) + << interval_ms.str(); + getOutputStream() << std::right << std::setw(width) + << bytes_transferred.str(); + getOutputStream() << std::right << std::setw(width) << bandwidth.str(); + getOutputStream() << std::right << std::setw(width) + << stats.getRetxCount(); + getOutputStream() << std::right << std::setw(width) << window.str(); + getOutputStream() << std::right << std::setw(width) << avg_rtt.str() + << std::endl; + } + + saved_stats_.total_duration_milliseconds_ += + (uint32_t)exact_duration.count(); + saved_stats_.old_bytes_value_ = stats.getBytesRecv(); + saved_stats_.old_lost_data_value_ = stats.getLostData(); + saved_stats_.old_bytes_recovered_value_ = stats.getBytesRecoveredData(); + saved_stats_.old_definitely_lost_data_value_ = + stats.getDefinitelyLostData(); + saved_stats_.old_fec_interest_tx_value_ = stats.getInterestFecTxCount(); + saved_stats_.old_fec_data_rx_value_ = stats.getBytesFecRecv(); + saved_stats_.old_retx_value_ = stats.getRetxCount(); + saved_stats_.old_sent_int_value_ = stats.getInterestTx(); + saved_stats_.old_received_nacks_value_ = stats.getReceivedNacks(); + saved_stats_.old_fec_pkt_ = stats.getReceivedFEC(); + saved_stats_.delay_sample_ = 0; + saved_stats_.avg_data_delay_ = 0; + saved_stats_.received_bytes_ = 0; + saved_stats_.received_data_pkt_ = 0; + saved_stats_.data_delays_ = ""; + saved_stats_.t_stats_ = utils::SteadyTime::Clock::now(); + + header_counter_ = (header_counter_ + 1) & kheader_counter_mask(); + + if (--nb_iterations_ == 0) { + // We reached the maximum nb of runs. Stop now. + parent_.io_service_.stop(); + } + } + + /*************************************************************** + * Setup functions + ***************************************************************/ + + int setupRTCSocket() { + int ret = ERROR_SUCCESS; + + configuration_.transport_protocol_ = transport::interface::RTC; + + if (configuration_.relay_ && configuration_.parallel_flows_ == 1) { + int production_protocol = ProductionProtocolAlgorithms::RTC_PROD; + producer_socket_ = + std::make_unique<ProducerSocket>(production_protocol); + producer_socket_->registerPrefix(configuration_.relay_name_); + producer_socket_->connect(); + producer_socket_->start(); + } + + if (configuration_.output_stream_mode_ && + configuration_.parallel_flows_ == 1) { + remote_ = asio::ip::udp::endpoint( + asio::ip::address::from_string("127.0.0.1"), configuration_.port_); + socket_.open(asio::ip::udp::v4()); + } + + consumer_socket_ = + std::make_unique<ConsumerSocket>(configuration_.transport_protocol_); + + RtcTransportRecoveryStrategies recovery_strategy = + RtcTransportRecoveryStrategies::RTX_ONLY; + switch (configuration_.recovery_strategy_) { + case 1: + recovery_strategy = RtcTransportRecoveryStrategies::RECOVERY_OFF; + break; + case 2: + recovery_strategy = RtcTransportRecoveryStrategies::RTX_ONLY; + break; + case 3: + recovery_strategy = RtcTransportRecoveryStrategies::FEC_ONLY; + break; + case 4: + recovery_strategy = RtcTransportRecoveryStrategies::DELAY_BASED; + break; + case 5: + recovery_strategy = RtcTransportRecoveryStrategies::LOW_RATE; + break; + case 6: + recovery_strategy = + RtcTransportRecoveryStrategies::LOW_RATE_AND_BESTPATH; + break; + case 7: + recovery_strategy = + RtcTransportRecoveryStrategies::LOW_RATE_AND_REPLICATION; + break; + case 8: + recovery_strategy = + RtcTransportRecoveryStrategies::LOW_RATE_AND_ALL_FWD_STRATEGIES; + break; + case 9: + recovery_strategy = + RtcTransportRecoveryStrategies::FEC_ONLY_LOW_RES_LOSSES; + break; + case 10: + recovery_strategy = + RtcTransportRecoveryStrategies::DELAY_AND_BESTPATH; + break; + case 11: + recovery_strategy = + RtcTransportRecoveryStrategies::DELAY_AND_REPLICATION; + break; + default: + break; + } + + ret = consumer_socket_->setSocketOption( + RtcTransportOptions::RECOVERY_STRATEGY, + static_cast<uint32_t>(recovery_strategy)); + + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + ret = consumer_socket_->setSocketOption( + RtcTransportOptions::AGGREGATED_DATA, + configuration_.aggregated_data_); + + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + ret = consumer_socket_->setSocketOption( + RtcTransportOptions::CONTENT_SHARING_MODE, + configuration_.content_sharing_mode_); + + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + ret = consumer_socket_->setSocketOption( + ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT, + (transport::interface::ConsumerContentObjectCallback)std::bind( + &Impl::ConsumerContext::checkReceivedRtcContent, this, + std::placeholders::_1, std::placeholders::_2)); + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + std::shared_ptr<TransportStatistics> transport_stats; + ret = consumer_socket_->getSocketOption( + transport::interface::OtherOptions::STATISTICS, + (TransportStatistics **)&transport_stats); + transport_stats->setAlpha(0.0); + + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + return ERROR_SUCCESS; + } + + int setupRAAQMSocket() { + int ret = ERROR_SUCCESS; + + configuration_.transport_protocol_ = transport::interface::RAAQM; + + consumer_socket_ = + std::make_unique<ConsumerSocket>(configuration_.transport_protocol_); + + if (configuration_.beta_ != -1.f) { + ret = consumer_socket_->setSocketOption( + RaaqmTransportOptions::BETA_VALUE, configuration_.beta_); + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + } + + if (configuration_.drop_factor_ != -1.f) { + ret = consumer_socket_->setSocketOption( + RaaqmTransportOptions::DROP_FACTOR, configuration_.drop_factor_); + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + } + + return ERROR_SUCCESS; + } + + int setupCBRSocket() { + configuration_.transport_protocol_ = transport::interface::CBR; + + consumer_socket_ = + std::make_unique<ConsumerSocket>(configuration_.transport_protocol_); + + return ERROR_SUCCESS; + } + + public: + int setup() { + int ret; + std::shared_ptr<transport::auth::Verifier> verifier = + std::make_shared<transport::auth::VoidVerifier>(); + + if (configuration_.rtc_) { + ret = setupRTCSocket(); + } else if (configuration_.window_ < 0) { + ret = setupRAAQMSocket(); + } else { + ret = setupCBRSocket(); + } + + if (ret != ERROR_SUCCESS) { + return ret; + } + + ret = consumer_socket_->setSocketOption( + GeneralTransportOptions::INTEREST_LIFETIME, + configuration_.interest_lifetime_); + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + ret = consumer_socket_->setSocketOption( + GeneralTransportOptions::MANIFEST_FACTOR_RELEVANT, + configuration_.manifest_factor_relevant_); + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + ret = consumer_socket_->setSocketOption( + GeneralTransportOptions::MANIFEST_FACTOR_ALERT, + configuration_.manifest_factor_alert_); + + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + ret = consumer_socket_->setSocketOption( + GeneralTransportOptions::PACKET_FORMAT, + configuration_.packet_format_); + if (ret == SOCKET_OPTION_NOT_SET) { + getOutputStream() << "ERROR -- Impossible to set the packet format." + << std::endl; + return ERROR_SETUP; + } + + ret = consumer_socket_->setSocketOption( + ConsumerCallbacksOptions::FWD_STRATEGY_CHANGE, + (StrategyCallback)[]( + [[maybe_unused]] transport::interface::notification::Strategy + strategy){ + // nothing to do + }); + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + ret = consumer_socket_->setSocketOption( + ConsumerCallbacksOptions::REC_STRATEGY_CHANGE, + (StrategyCallback)[]( + [[maybe_unused]] transport::interface::notification::Strategy + strategy){ + // nothing to do + }); + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + ret = consumer_socket_->setSocketOption( + transport::interface::CURRENT_WINDOW_SIZE, configuration_.window_); + if (ret == SOCKET_OPTION_NOT_SET) { + getOutputStream() + << "ERROR -- Impossible to set the size of the window." + << std::endl; + return ERROR_SETUP; + } + + if (!configuration_.producer_certificate_.empty()) { + verifier = std::make_shared<transport::auth::AsymmetricVerifier>( + configuration_.producer_certificate_); + } + + if (!configuration_.passphrase_.empty()) { + verifier = std::make_shared<transport::auth::SymmetricVerifier>( + configuration_.passphrase_); + } + + verifier->setVerificationFailedCallback( + std::bind(&HIperfClient::Impl::ConsumerContext::onAuthFailed, this, + std::placeholders::_1, std::placeholders::_2)); + + ret = consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER, + verifier); + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + // Signer for aggregatd interests + std::shared_ptr<transport::auth::Signer> signer = + std::make_shared<transport::auth::VoidSigner>(); + if (!configuration_.aggr_interest_passphrase_.empty()) { + signer = std::make_shared<transport::auth::SymmetricSigner>( + transport::auth::CryptoSuite::HMAC_SHA256, + configuration_.aggr_interest_passphrase_); + } + ret = consumer_socket_->setSocketOption(GeneralTransportOptions::SIGNER, + signer); + if (ret == SOCKET_OPTION_NOT_SET) return ERROR_SETUP; + + if (configuration_.aggregated_interests_) { + ret = consumer_socket_->setSocketOption( + RtcTransportOptions::AGGREGATED_INTERESTS, true); + + if (ret == SOCKET_OPTION_NOT_SET) return ERROR_SETUP; + } + + ret = consumer_socket_->setSocketOption( + ConsumerCallbacksOptions::INTEREST_OUTPUT, + (transport::interface::ConsumerInterestCallback)std::bind( + &ConsumerContext::processLeavingInterest, this, + std::placeholders::_1, std::placeholders::_2)); + + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + ret = consumer_socket_->setSocketOption( + ConsumerCallbacksOptions::READ_CALLBACK, this); + + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + ret = consumer_socket_->setSocketOption( + ConsumerCallbacksOptions::STATS_SUMMARY, + (transport::interface::ConsumerTimerCallback)std::bind( + &Impl::ConsumerContext::handleTimerExpiration, this, + std::placeholders::_1, std::placeholders::_2)); + + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + if (consumer_socket_->setSocketOption( + GeneralTransportOptions::STATS_INTERVAL, + configuration_.report_interval_milliseconds_) == + SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + consumer_socket_->connect(); + + return ERROR_SUCCESS; + } + + /*************************************************************** + * Run functions + ***************************************************************/ + + int run() { + getOutputStream() << "Starting download of " << flow_name_ << std::endl; + + saved_stats_.t_download_ = saved_stats_.t_stats_ = + utils::SteadyTime::now(); + consumer_socket_->consume(flow_name_); + + return ERROR_SUCCESS; + } + + // Members initialized by the constructor + std::shared_ptr<utils::MemBuf> receive_buffer_; + asio::ip::udp::socket socket_; + std::size_t payload_size_max_; + asio::ip::udp::endpoint remote_; + std::uint32_t nb_iterations_; + + // Members initialized by in-class initializer + SavedStatistics saved_stats_{}; + uint16_t header_counter_{0}; + bool first_{true}; + std::unique_ptr<ConsumerSocket> consumer_socket_; + std::unique_ptr<ProducerSocket> producer_socket_; + }; + + public: + explicit Impl(const hiperf::ClientConfiguration &conf) + : config_(conf), signals_(io_service_) {} + + virtual ~Impl() = default; + + int setup() { + int ret = ensureFlows(config_.name_, config_.parallel_flows_); + if (ret != ERROR_SUCCESS) { + return ret; + } + + consumer_contexts_.reserve(config_.parallel_flows_); + for (uint32_t i = 0; i < config_.parallel_flows_; i++) { + auto &ctx = consumer_contexts_.emplace_back(*this, i); + ret = ctx.setup(); + + if (ret) { + break; + } + } + + return ret; + } + + int run() { + signals_.add(SIGINT); + signals_.async_wait( + [this](const std::error_code &, const int &) { io_service_.stop(); }); + + for (auto &consumer_context : consumer_contexts_) { + consumer_context.run(); + } + + io_service_.run(); + + return ERROR_SUCCESS; + } + + ClientConfiguration &getConfig() { return config_; } + + private: + asio::io_service io_service_; + hiperf::ClientConfiguration config_; + asio::signal_set signals_; + std::vector<ConsumerContext> consumer_contexts_; +}; + +HIperfClient::HIperfClient(const ClientConfiguration &conf) + : impl_(std::make_unique<Impl>(conf)) {} + +HIperfClient::~HIperfClient() = default; + +int HIperfClient::setup() const { return impl_->setup(); } + +void HIperfClient::run() const { impl_->run(); } + +} // namespace hiperf diff --git a/apps/hiperf/src/client.h b/apps/hiperf/src/client.h new file mode 100644 index 000000000..beecbd473 --- /dev/null +++ b/apps/hiperf/src/client.h @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <common.h> +#include <hicn/transport/utils/noncopyable.h> + +namespace hiperf { + +class HIperfClient : public ::utils::NonCopyable { + public: + explicit HIperfClient(const ClientConfiguration &conf); + + ~HIperfClient(); + int setup() const; + void run() const; + + private: + class Impl; + std::unique_ptr<Impl> impl_; +}; + +} // namespace hiperf diff --git a/apps/hiperf/src/common.h b/apps/hiperf/src/common.h new file mode 100644 index 000000000..0f96bef1f --- /dev/null +++ b/apps/hiperf/src/common.h @@ -0,0 +1,294 @@ +/* + * Copyright (c) 2021-2022 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/auth/signer.h> +#include <hicn/transport/config.h> +#include <hicn/transport/core/content_object.h> +#include <hicn/transport/core/interest.h> +#include <hicn/transport/interfaces/global_conf_interface.h> +#include <hicn/transport/interfaces/socket_consumer.h> +#include <hicn/transport/interfaces/socket_producer.h> +#include <hicn/transport/utils/chrono_typedefs.h> +#include <hicn/transport/utils/color.h> +#include <hicn/transport/utils/literals.h> + +#ifndef _WIN32 +#include <hicn/transport/utils/daemonizator.h> +#endif + +#include <hicn/apps/utils/logger.h> + +#include <asio.hpp> +#include <cmath> +#include <fstream> +#include <iomanip> +#include <iostream> +#include <sstream> +#include <string> +#include <unordered_set> + +#ifndef ERROR_SUCCESS +#define ERROR_SUCCESS 0 +#endif +static constexpr int ERROR_SETUP = -5; +static constexpr uint32_t MIN_PROBE_SEQ = 0xefffffff; +static constexpr uint32_t RTC_HEADER_SIZE = 12; +static constexpr uint32_t FEC_HEADER_MAX_SIZE = 36; +static constexpr uint32_t HIPERF_MTU = 1500; + +namespace hiperf { + +using transport::core::Packet; +using transport::core::Prefix; + +/** + * Logger + */ +template <typename D, typename ConfType, typename ParentType> +class Base : public std::stringbuf, public std::ostream { + protected: + static inline const char separator[] = "| "; + + Base(ParentType &parent, asio::io_service &io_service, int identifier) + : std::stringbuf(), + std::ostream(this), + parent_(parent), + configuration_(parent_.getConfig()), + io_service_(io_service), + identifier_(identifier), + name_id_(D::getContextType() + std::to_string(identifier_)), + flow_name_(configuration_.name_.makeNameWithIndex(identifier_)) { + std::stringstream begin; + std::stringstream end; + if (configuration_.colored_) { + begin << color_mod_ << bold_mod_; + end << end_mod_; + } else { + begin << ""; + end << ""; + } + + begin << "|" << name_id_ << separator; + begin_ = begin.str(); + end_ = end.str(); + } + + Base(Base &&other) noexcept + : parent_(other.parent_), + configuration_(other.configuration_), + io_service_(other.io_service_), + identifier_(other.identifier_), + name_id_(std::move(other.name_id_)), + flow_name_(other.flow_name_) {} + + ~Base() {} + + /*************************************************************** + * std::stringbuf sync override + ***************************************************************/ + + int sync() override { + auto string = str(); + asio::post(io_service_, + [this, string]() { LoggerInfo() << begin_ << string << end_; }); + str(""); + + return 0; + } + + std::ostream &getOutputStream() { return *this; } + + // Members initialized by the constructor + ParentType &parent_; + ConfType &configuration_; + asio::io_service &io_service_; + int identifier_; + std::string name_id_; + transport::core::Name flow_name_; + std::string begin_; + std::string end_; + + // Members initialized by the in-class initializer + utils::ColorModifier color_mod_; + utils::ColorModifier bold_mod_{utils::ColorModifier::Code::BOLD}; + utils::ColorModifier end_mod_{utils::ColorModifier::Code::RESET}; +}; + +static inline int ensureFlows(const Prefix &prefix, std::size_t flows) { + int ret = ERROR_SUCCESS; + + // Make sure the provided prefix length not allows to accomodate the + // provided number of flows. + uint16_t max_ip_addr_len_bits; + uint16_t log2_n_flow; + u64 max_n_flow; + if (prefix.getAddressFamily() == AF_INET) { + max_ip_addr_len_bits = IPV4_ADDR_LEN_BITS; + } else if (prefix.getAddressFamily() == AF_INET6) { + max_ip_addr_len_bits = IPV6_ADDR_LEN_BITS; + } else { + LoggerErr() << "Error: unknown address family."; + ret = ERROR_SETUP; + goto END; + } + + log2_n_flow = max_ip_addr_len_bits - prefix.getPrefixLength(); + max_n_flow = log2_n_flow < 64 ? (1 << log2_n_flow) : ~0ULL; + + if (flows > max_n_flow) { + LoggerErr() << "Error: the provided prefix length does not allow to " + "accomodate the provided number of flows (" + << flows << " > " << max_n_flow << ")."; + ret = ERROR_SETUP; + } + +END: + return ret; +} + +/** + * Class to retrieve the maximum payload size given the MTU and packet headers. + */ +class PayloadSize { + public: + PayloadSize(Packet::Format format, std::size_t mtu = HIPERF_MTU) + : mtu_(mtu), format_(format) {} + + std::size_t getPayloadSizeMax(std::size_t transport_size = 0, + std::size_t fec_size = 0, + std::size_t signature_size = 0) { + return mtu_ - Packet::getHeaderSizeFromFormat(format_, signature_size) - + transport_size - fec_size; + } + + private: + std::size_t mtu_; + Packet::Format format_; +}; + +/** + * Class for handling the production rate for the RTC producer. + */ +class Rate { + public: + Rate() {} + ~Rate() {} + + explicit Rate(const std::string &rate) { + std::size_t found = rate.find("kbps"); + if (found != std::string::npos) { + rate_kbps_ = std::stof(rate.substr(0, found)); + } else { + throw std::runtime_error("Format " + rate + " not correct"); + } + } + + Rate(const Rate &other) : rate_kbps_(other.rate_kbps_) {} + + Rate &operator=(const std::string &rate) { + std::size_t found = rate.find("kbps"); + if (found != std::string::npos) { + rate_kbps_ = std::stof(rate.substr(0, found)); + } else { + throw std::runtime_error("Format " + rate + " not correct"); + } + + return *this; + } + + std::chrono::microseconds getMicrosecondsForPacket(std::size_t packet_size) { + return std::chrono::microseconds( + (uint32_t)std::round(packet_size * 1000.0 * 8.0 / (double)rate_kbps_)); + } + + private: + float rate_kbps_ = 0.0; +}; + +struct packet_t { + uint64_t timestamp; + uint32_t size; +}; + +struct Configuration { + Prefix name_{"b001::abcd/64"}; + std::string passphrase_; + std::string aggr_interest_passphrase_; + bool rtc_{false}; + uint16_t port_{0}; + bool aggregated_data_{false}; + Packet::Format packet_format_{ + transport::interface::default_values::packet_format}; + uint32_t parallel_flows_{1}; + bool colored_{true}; +}; + +/** + * Container for command line configuration for hiperf client. + */ +struct ClientConfiguration : Configuration { + double beta_{-1.f}; + double drop_factor_{-1.f}; + double window_{-1.f}; + std::string producer_certificate_; + std::size_t receive_buffer_size_{128 * 1024}; + std::uint32_t report_interval_milliseconds_{1000}; + transport::interface::TransportProtocolAlgorithms transport_protocol_{ + transport::interface::CBR}; + bool test_mode_{false}; + bool relay_{false}; + Prefix producer_prefix_; + uint32_t interest_lifetime_{500}; + uint32_t manifest_factor_relevant_{100}; + uint32_t manifest_factor_alert_{20}; + Prefix relay_name_{"c001::abcd/64"}; + bool output_stream_mode_{false}; + uint32_t recovery_strategy_{4}; + bool print_headers_{true}; + std::uint32_t nb_iterations_{ + std::numeric_limits<decltype(nb_iterations_)>::max()}; + bool content_sharing_mode_{false}; + bool aggregated_interests_{false}; +}; + +/** + * Container for command line configuration for hiperf server. + */ +struct ServerConfiguration : Configuration { + bool virtual_producer_{true}; + std::uint32_t manifest_max_capacity_{0}; + bool live_production_{false}; + std::uint32_t content_lifetime_{ + transport::interface::default_values::content_object_expiry_time}; + std::uint32_t download_size_{20 * 1024 * 1024}; + transport::auth::CryptoHashType hash_algorithm_{ + transport::auth::CryptoHashType::SHA256}; + std::string keystore_name_; + std::string keystore_password_{"cisco"}; + bool multiphase_produce_{false}; + bool interactive_{false}; + bool trace_based_{false}; + std::uint32_t trace_index_{0}; + char *trace_file_{nullptr}; + Rate production_rate_{"2048kbps"}; + std::size_t payload_size_{1384}; + bool input_stream_mode_{false}; + std::vector<struct packet_t> trace_; + std::string fec_type_; +}; + +} // namespace hiperf diff --git a/apps/hiperf/src/main.cc b/apps/hiperf/src/main.cc new file mode 100644 index 000000000..25c1a288c --- /dev/null +++ b/apps/hiperf/src/main.cc @@ -0,0 +1,571 @@ +/* + * Copyright (c) 2021-2022 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <client.h> +#include <hicn/apps/utils/logger.h> +#include <server.h> + +namespace hiperf { + +using transport::auth::CryptoHashType; + +static std::unordered_map<std::string, hicn_packet_format_t> const + packet_format_map = {{"ipv4_tcp", HICN_PACKET_FORMAT_IPV4_TCP}, + {"ipv6_tcp", HICN_PACKET_FORMAT_IPV6_TCP}, + {"new", HICN_PACKET_FORMAT_NEW}}; + +std::string str_tolower(std::string s) { + std::transform(s.begin(), s.end(), s.begin(), + [](unsigned char c) { return std::tolower(c); }); + return s; +} + +void usage() { + LoggerInfo() << "HIPERF - Instrumentation tool for performing active network" + "measurements with hICN"; + LoggerInfo() << "usage: hiperf [-S|-C] [options] [prefix|name]"; + LoggerInfo(); + LoggerInfo() << "SERVER OR CLIENT:"; +#ifndef _WIN32 + LoggerInfo() << "-D\t\t\t\t\t" + << "Run as a daemon"; + LoggerInfo() << "-R\t\t\t\t\t" + << "Run RTC protocol (client or server)"; + LoggerInfo() << "-f\t<filename>\t\t\t" + << "Log file"; + LoggerInfo() << "-z\t<io_module>\t\t\t" + << "IO module to use. Default: hicnlight_module"; + LoggerInfo() << "-F\t<conf_file>\t\t\t" + << "Path to optional configuration file for libtransport"; + LoggerInfo() << "-a\t\t\t\t\t" + << "Enables data packet aggregation. " + << "Works only in RTC mode"; + LoggerInfo() << "-X\t<param>\t\t\t\t" + << "Set FEC params. Options are Rely_K#_N# or RS_K#_N#"; + LoggerInfo() + << "-J\t<passphrase>\t\t\t" + << "Set the passphrase used to sign/verify aggregated interests. " + "If set on the client, aggregated interests are enable automatically."; +#endif + LoggerInfo(); + LoggerInfo() << "SERVER SPECIFIC:"; + LoggerInfo() + << "-A\t<content_size>\t\t\t" + "Sends an application data unit in bytes that is published once " + "before exit"; + LoggerInfo() << "-E\t<expiry_time>\t\t\t" + "Expiration time for data packets generated by the producer " + "socket"; + LoggerInfo() << "-s\t<packet_size>\t\t\tData packet payload size."; + LoggerInfo() << "-r\t\t\t\t\t" + << "Produce real content of <content_size> bytes"; + LoggerInfo() + << "-m\t<manifest_max_capacity>\t\t" + << "The maximum number of entries a manifest can contain. Set it " + "to 0 to disable manifests. Default is 30, max is 255."; + LoggerInfo() << "-l\t\t\t\t\t" + << "Start producing content upon the reception of the " + "first interest"; + LoggerInfo() << "-K\t<keystore_path>\t\t\t" + << "Path of p12 file containing the " + "crypto material used for signing packets"; + LoggerInfo() << "-k\t<passphrase>\t\t\t" + << "String from which a 128-bit symmetric key will be " + "derived for signing packets"; + LoggerInfo() << "-p\t<password>\t\t\t" + << "Password for p12 keystore"; + LoggerInfo() << "-y\t<hash_algorithm>\t\t" + << "Use the selected hash algorithm for " + "computing manifest digests (default: SHA256)"; + LoggerInfo() << "-x\t\t\t\t\t" + << "Produces application data units of size <content_size> " + << "without resetting the name suffix to 0."; + LoggerInfo() << "-B\t<bitrate>\t\t\t" + << "RTC producer data bitrate, to be used with the -R option."; +#ifndef _WIN32 + LoggerInfo() << "-I\t\t\t\t\t" + "Interactive mode, start/stop real time content production " + "by pressing return. To be used with the -R option"; + LoggerInfo() + << "-T\t<filename>\t\t\t" + "Trace based mode, hiperf takes as input a file with a trace. " + "Each line of the file indicates the timestamp and the size of " + "the packet to generate. To be used with the -R option. -B and -I " + "will be ignored."; + LoggerInfo() << "-G\t<port>\t\t\t\t" + << "Input stream from localhost at the specified port"; +#endif + LoggerInfo(); + LoggerInfo() << "CLIENT SPECIFIC:"; + LoggerInfo() << "-b\t<beta_parameter>\t\t" + << "RAAQM beta parameter"; + LoggerInfo() << "-d\t<drop_factor_parameter>\t\t" + << "RAAQM drop factor " + "parameter"; + LoggerInfo() << "-L\t<interest lifetime>\t\t" + << "Set interest lifetime."; + LoggerInfo() << "-U\t<factor>\t\t\t" + << "Update the relevance threshold: if an unverified packet has " + "been received before the last U * manifest_max_capacity_ " + "packets received (verified or not), it will be flushed out. " + "Should be > 1, default is 100."; + LoggerInfo() + << "-u\t<factor>\t\t\t" + << "Update the alert threshold: if the " + "number of unverified packet is > u * manifest_max_capacity_, " + "an alert is raised. Should be set such that U > u >= 1, " + "default is 20. If u >= U, no alert will ever be raised."; + LoggerInfo() << "-M\t<input_buffer_size>\t\t" + << "Size of consumer input buffer. If 0, reassembly of packets " + "will be disabled."; + LoggerInfo() + << "-N\t\t\t\t\t" + << "Enable aggregated interests; the number of suffixes (including " + "the one in the header) can be set through the env variable " + "`MAX_AGGREGATED_INTERESTS`."; + LoggerInfo() << "-W\t<window_size>\t\t\t" + << "Use a fixed congestion window " + "for retrieving the data."; + LoggerInfo() << "-i\t<stats_interval>\t\t" + << "Show the statistics every <stats_interval> milliseconds."; + LoggerInfo() + << "-c\t<certificate_path>\t\t" + << "Path of the producer certificate to be used for verifying the " + "origin of the packets received."; + LoggerInfo() + << "-k\t<passphrase>\t\t\t" + << "String from which is derived the symmetric key used by the " + "producer to sign packets and by the consumer to verify them."; + LoggerInfo() << "-t\t\t\t\t\t" + "Test mode, check if the client is receiving the " + "correct data. This is an RTC specific option, to be " + "used with the -R (default: false)"; + LoggerInfo() + << "-P\t\t\t\t\t" + << "Number of parallel streams. For hiperf client, this is the " + "number of consumer to create, while for hiperf server this is " + "the number of producers to create."; + LoggerInfo() << "-j\t<relay_name>\t\t\t" + << "Publish received content under the name relay_name." + "This is an RTC specific option, to be " + "used with the -R (default: false)"; + LoggerInfo() << "-g\t<port>\t\t\t\t" + << "Output stream to localhost at the specified port"; + LoggerInfo() + << "-o\t\t\t\t\t" + << "Content sharing mode: if set the socket work in content sharing" + << "mode. It works only in RTC mode"; + LoggerInfo() << "-e\t<strategy>\t\t\t" + << "Enhance the network with a reliability strategy. Options"; + LoggerInfo() << "\t\t\t\t\t\t1: unreliable "; + LoggerInfo() << "\t\t\t\t\t\t2: rtx only "; + LoggerInfo() << "\t\t\t\t\t\t3: fec only "; + LoggerInfo() << "\t\t\t\t\t\t4: delay based "; + LoggerInfo() << "\t\t\t\t\t\t5: low rate "; + LoggerInfo() << "\t\t\t\t\t\t6: low rate and best path "; + LoggerInfo() << "\t\t\t\t\t\t7: low rate and replication"; + LoggerInfo() << "\t\t\t\t\t\t8: low rate and best path/replication "; + LoggerInfo() << "\t\t\t\t\t\t9: only fec low residual losses "; + LoggerInfo() << "\t\t\t\t\t\t10: delay and best path "; + LoggerInfo() << "\t\t\t\t\t\t11: delay and replication "; + LoggerInfo() << "\t\t\t\t\t\t(default: 2 = rtx only) "; + LoggerInfo() << "-H\t\t\t\t\t" + << "Disable periodic print headers in stats report."; + LoggerInfo() << "-n\t<nb_iterations>\t\t\t" + << "Print the stats report <nb_iterations> times and exit.\n" + << "\t\t\t\t\tThis option limits the duration of the run to " + "<nb_iterations> * <stats_interval> milliseconds."; + LoggerInfo() << "-w <packet_format> Packet format (without signature, " + "defaults to IPV6_TCP)"; +} + +int hiperf_main(int argc, char *argv[]) { +#ifndef _WIN32 + // Common + bool daemon = false; +#else + WSADATA wsaData = {0}; + WSAStartup(MAKEWORD(2, 2), &wsaData); +#endif + + transport::interface::global_config::GlobalConfigInterface global_conf; + + // -1 server, 0 undefined, 1 client + int role = 0; + int options = 0; + + const char *log_file = nullptr; + transport::interface::global_config::IoModuleConfiguration config; + std::string conf_file; + config.name = "hicnlight_module"; + + // Consumer + ClientConfiguration client_configuration; + + // Producer + ServerConfiguration server_configuration; + + int opt; +#ifndef _WIN32 + // Please keep in alphabetical order. + while ( + (opt = getopt(argc, argv, + "A:B:CDE:F:G:HIJ:K:L:M:NP:RST:U:W:X:ab:c:d:e:f:g:hi:j:k:lm:" + "n:op:qrs:tu:vw:xy:z:")) != -1) { + switch (opt) { + // Common + case 'D': { + daemon = true; + break; + } + case 'I': { + server_configuration.interactive_ = true; + server_configuration.trace_based_ = false; + server_configuration.input_stream_mode_ = false; + break; + } + case 'T': { + server_configuration.interactive_ = false; + server_configuration.trace_based_ = true; + server_configuration.input_stream_mode_ = false; + server_configuration.trace_file_ = optarg; + break; + } + case 'G': { + server_configuration.interactive_ = false; + server_configuration.trace_based_ = false; + server_configuration.input_stream_mode_ = true; + server_configuration.port_ = std::stoul(optarg); + break; + } + case 'g': { + client_configuration.output_stream_mode_ = true; + client_configuration.port_ = std::stoul(optarg); + break; + } +#else + // Please keep in alphabetical order. + while ((opt = getopt(argc, argv, + "A:B:CE:F:HK:L:M:P:RSU:W:X:ab:c:d:e:f:hi:j:k:lm:n:op:rs:" + "tu:vwxy:z:")) != -1) { + switch (opt) { +#endif + case 'E': { + server_configuration.content_lifetime_ = std::stoul(optarg); + break; + } + case 'f': { + log_file = optarg; + break; + } + case 'R': { + client_configuration.rtc_ = true; + server_configuration.rtc_ = true; + break; + } + case 'a': { + client_configuration.aggregated_data_ = true; + server_configuration.aggregated_data_ = true; + break; + } + case 'o': { + client_configuration.content_sharing_mode_ = true; + break; + } + case 'w': { + std::string packet_format_s = std::string(optarg); + packet_format_s = str_tolower(packet_format_s); + auto it = packet_format_map.find(std::string(optarg)); + if (it == packet_format_map.end()) + throw std::runtime_error("Bad packet format"); + client_configuration.packet_format_ = it->second; + server_configuration.packet_format_ = it->second; + break; + } + case 'k': { + server_configuration.passphrase_ = std::string(optarg); + client_configuration.passphrase_ = std::string(optarg); + break; + } + case 'z': { + config.name = optarg; + break; + } + case 'F': { + conf_file = optarg; + break; + } + + // Server or Client + case 'S': { + role -= 1; + break; + } + case 'C': { + role += 1; + break; + } + case 'q': { + client_configuration.colored_ = server_configuration.colored_ = false; + break; + } + case 'J': { + client_configuration.aggr_interest_passphrase_ = optarg; + server_configuration.aggr_interest_passphrase_ = optarg; + // Consumer signature is only used with aggregated interests, + // hence enabling it also forces usage of aggregated interests + client_configuration.aggregated_interests_ = true; + break; + } + // Client specifc + case 'b': { + client_configuration.beta_ = std::stod(optarg); + options = 1; + break; + } + case 'd': { + client_configuration.drop_factor_ = std::stod(optarg); + options = 1; + break; + } + case 'W': { + client_configuration.window_ = std::stod(optarg); + options = 1; + break; + } + case 'M': { + client_configuration.receive_buffer_size_ = std::stoull(optarg); + options = 1; + break; + } + case 'N': { + client_configuration.aggregated_interests_ = true; + break; + } + case 'P': { + client_configuration.parallel_flows_ = + server_configuration.parallel_flows_ = std::stoull(optarg); + break; + } + case 'c': { + client_configuration.producer_certificate_ = std::string(optarg); + options = 1; + break; + } + case 'i': { + client_configuration.report_interval_milliseconds_ = std::stoul(optarg); + options = 1; + break; + } + case 't': { + client_configuration.test_mode_ = true; + options = 1; + break; + } + case 'L': { + client_configuration.interest_lifetime_ = std::stoul(optarg); + options = 1; + break; + } + case 'U': { + client_configuration.manifest_factor_relevant_ = std::stoul(optarg); + options = 1; + break; + } + case 'u': { + client_configuration.manifest_factor_alert_ = std::stoul(optarg); + options = 1; + break; + } + case 'j': { + client_configuration.relay_ = true; + client_configuration.relay_name_ = Prefix(optarg); + options = 1; + break; + } + case 'H': { + client_configuration.print_headers_ = false; + options = 1; + break; + } + case 'n': { + client_configuration.nb_iterations_ = std::stoul(optarg); + options = 1; + break; + } + // Server specific + case 'A': { + server_configuration.download_size_ = std::stoul(optarg); + options = -1; + break; + } + case 's': { + server_configuration.payload_size_ = std::stoul(optarg); + options = -1; + break; + } + case 'r': { + server_configuration.virtual_producer_ = false; + options = -1; + break; + } + case 'm': { + server_configuration.manifest_max_capacity_ = std::stoul(optarg); + options = -1; + break; + } + case 'l': { + server_configuration.live_production_ = true; + options = -1; + break; + } + case 'K': { + server_configuration.keystore_name_ = std::string(optarg); + options = -1; + break; + } + case 'y': { + CryptoHashType hash_algorithm = CryptoHashType::SHA256; + if (strncasecmp(optarg, "sha256", 6) == 0) { + hash_algorithm = CryptoHashType::SHA256; + } else if (strncasecmp(optarg, "sha512", 6) == 0) { + hash_algorithm = CryptoHashType::SHA512; + } else if (strncasecmp(optarg, "blake2b512", 10) == 0) { + hash_algorithm = CryptoHashType::BLAKE2B512; + } else if (strncasecmp(optarg, "blake2s256", 10) == 0) { + hash_algorithm = CryptoHashType::BLAKE2S256; + } else { + LoggerWarn() << "Unknown hash algorithm. Using SHA 256."; + } + server_configuration.hash_algorithm_ = hash_algorithm; + options = -1; + break; + } + case 'p': { + server_configuration.keystore_password_ = std::string(optarg); + options = -1; + break; + } + case 'x': { + server_configuration.multiphase_produce_ = true; + options = -1; + break; + } + case 'B': { + auto str = std::string(optarg); + std::transform(str.begin(), str.end(), str.begin(), ::tolower); + server_configuration.production_rate_ = str; + options = -1; + break; + } + case 'e': { + client_configuration.recovery_strategy_ = std::stoul(optarg); + options = 1; + break; + } + case 'X': { + server_configuration.fec_type_ = std::string(optarg); + options = -1; + break; + } + default: + usage(); + return EXIT_FAILURE; + } + } + + if (options > 0 && role < 0) { + LoggerErr() << "Client options cannot be used when using the " + "software in server mode"; + usage(); + return EXIT_FAILURE; + } else if (options < 0 && role > 0) { + LoggerErr() << "Server options cannot be used when using the " + "software in client mode"; + usage(); + return EXIT_FAILURE; + } else if (!role) { + LoggerErr() << "Please specify if running hiperf as client " + "or server."; + usage(); + return EXIT_FAILURE; + } + + if (argv[optind] == 0) { + LoggerErr() << "Please specify the name/prefix to use."; + usage(); + return EXIT_FAILURE; + } else { + if (role > 0) { + client_configuration.name_ = Prefix(argv[optind]); + } else { + server_configuration.name_ = Prefix(argv[optind]); + } + } + + if (log_file) { +#ifndef _WIN32 + int fd = open(log_file, O_WRONLY | O_APPEND | O_CREAT, S_IWUSR | S_IRUSR); + dup2(fd, STDOUT_FILENO); + dup2(STDOUT_FILENO, STDERR_FILENO); + close(fd); +#else + int fd = + _open(log_file, _O_WRONLY | _O_APPEND | _O_CREAT, _S_IWRITE | _S_IREAD); + _dup2(fd, _fileno(stdout)); + _dup2(_fileno(stdout), _fileno(stderr)); + _close(fd); +#endif + } + +#ifndef _WIN32 + if (daemon) { + utils::Daemonizator::daemonize(false); + } +#endif + + /** + * IO module configuration + */ + config.set(); + + // Parse config file + global_conf.parseConfigurationFile(conf_file); + + if (role > 0) { + HIperfClient c(client_configuration); + if (c.setup() != ERROR_SETUP) { + c.run(); + } + } else if (role < 0) { + HIperfServer s(server_configuration); + if (s.setup() != ERROR_SETUP) { + s.run(); + } + } else { + usage(); + return EXIT_FAILURE; + } + +#ifdef _WIN32 + WSACleanup(); +#endif + + return 0; +} + +} // namespace hiperf + +int main(int argc, char *argv[]) { return hiperf::hiperf_main(argc, argv); } diff --git a/apps/hiperf/src/server.cc b/apps/hiperf/src/server.cc new file mode 100644 index 000000000..3f6c335f9 --- /dev/null +++ b/apps/hiperf/src/server.cc @@ -0,0 +1,680 @@ +/* + * Copyright (c) 2021-2022 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <server.h> + +namespace hiperf { + +using transport::core::ContentObject; +using transport::core::Interest; +using transport::core::Name; +using transport::interface::GeneralTransportOptions; +using transport::interface::ProducerCallbacksOptions; +using transport::interface::ProducerInterestCallback; +using transport::interface::ProducerSocket; +using transport::interface::ProductionProtocolAlgorithms; + +/** + * Hiperf server class: configure and setup an hicn producer following the + * ServerConfiguration. + */ +class HIperfServer::Impl { + static constexpr std::size_t klog2_content_object_buffer_size() { return 8; } + static constexpr std::size_t kcontent_object_buffer_size() { + return (1 << klog2_content_object_buffer_size()); + } + static constexpr std::size_t kmask() { + return (kcontent_object_buffer_size() - 1); + } + + /** + * @brief As we can (potentially) setup many producer sockets, we need to keep + * a separate context for each one of them. The context contains parameters + * and variable that are specific to a single producer socket. + */ + class ProducerContext + : public Base<ProducerContext, ServerConfiguration, Impl>, + public ProducerSocket::Callback { + public: + using ConfType = ServerConfiguration; + using ParentType = typename HIperfServer::Impl; + static inline const auto getContextType() { return "ProducerContext"; } + + ProducerContext(HIperfServer::Impl &server, int producer_identifier) + : Base(server, server.io_service_, producer_identifier) { + // Allocate buffer to copy as content objects payload + std::string buffer(configuration_.payload_size_, 'X'); + + // Allocate array of content objects. They are share_ptr so that the + // transport will only capture a reference to them instead of performing + // an hard copy. + for (std::size_t i = 0; i < kcontent_object_buffer_size(); i++) { + const auto &element = + content_objects_.emplace_back(std::make_shared<ContentObject>( + configuration_.name_.makeName(), configuration_.packet_format_, + 0, (const uint8_t *)buffer.data(), buffer.size())); + element->setLifetime( + transport::interface::default_values::content_object_expiry_time); + } + } + + // To make vector happy (move or copy constructor is needed when vector + // resizes) + ProducerContext(ProducerContext &&other) noexcept + : Base(std::move(other)), + content_objects_(std::move(other.content_objects_)), + unsatisfied_interests_(std::move(other.unsatisfied_interests_)), + last_segment_(other.last_segment_), + producer_socket_(std::move(other.producer_socket_)), + content_objects_index_(other.content_objects_index_), + payload_size_max_(other.payload_size_max_) {} + + virtual ~ProducerContext() = default; + + /** + * @brief Produce datagram + */ + void produceDatagram(const uint8_t *buffer, std::size_t buffer_size) const { + assert(producer_socket_); + + auto size = std::min(buffer_size, payload_size_max_); + + producer_socket_->produceDatagram(flow_name_, buffer, size); + } + + /** + * @brief Create and setup the producer socket + */ + int setup() { + int ret; + int production_protocol; + std::shared_ptr<transport::auth::Signer> signer = + std::make_shared<transport::auth::VoidSigner>(); + + if (!configuration_.rtc_) { + production_protocol = ProductionProtocolAlgorithms::BYTE_STREAM; + } else { + production_protocol = ProductionProtocolAlgorithms::RTC_PROD; + } + + producer_socket_ = std::make_unique<ProducerSocket>(production_protocol); + + if (producer_socket_->setSocketOption( + ProducerCallbacksOptions::PRODUCER_CALLBACK, this) == + SOCKET_OPTION_NOT_SET) { + getOutputStream() << "Failed to set producer callback." << std::endl; + return ERROR_SETUP; + } + + if (producer_socket_->setSocketOption( + GeneralTransportOptions::HASH_ALGORITHM, + configuration_.hash_algorithm_) == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + if (producer_socket_->setSocketOption( + GeneralTransportOptions::MANIFEST_MAX_CAPACITY, + configuration_.manifest_max_capacity_) == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + if (producer_socket_->setSocketOption(transport::interface::PACKET_FORMAT, + configuration_.packet_format_) == + SOCKET_OPTION_NOT_SET) { + getOutputStream() << "ERROR -- Impossible to set the packet format." + << std::endl; + return ERROR_SETUP; + } + + if (!configuration_.passphrase_.empty()) { + signer = std::make_shared<transport::auth::SymmetricSigner>( + transport::auth::CryptoSuite::HMAC_SHA256, + configuration_.passphrase_); + } + + if (!configuration_.keystore_name_.empty()) { + signer = std::make_shared<transport::auth::AsymmetricSigner>( + configuration_.keystore_name_, configuration_.keystore_password_); + } + + producer_socket_->setSocketOption(GeneralTransportOptions::SIGNER, + signer); + + // Compute maximum payload size + Packet::Format format = configuration_.packet_format_; + if (!configuration_.manifest_max_capacity_) + format = Packet::toAHFormat(format); + payload_size_max_ = PayloadSize(format).getPayloadSizeMax( + configuration_.rtc_ ? RTC_HEADER_SIZE : 0, + configuration_.fec_type_.empty() ? 0 : FEC_HEADER_MAX_SIZE, + !configuration_.manifest_max_capacity_ + ? signer->getSignatureFieldSize() + : 0); + + if (configuration_.payload_size_ > payload_size_max_) { + getOutputStream() << "WARNING: Payload has size " + << configuration_.payload_size_ << ", maximum is " + << payload_size_max_ + << ". Payload will be truncated to fit." << std::endl; + } + + // Verifier for aggregated interests + std::shared_ptr<transport::auth::Verifier> verifier = + std::make_shared<transport::auth::VoidVerifier>(); + if (!configuration_.aggr_interest_passphrase_.empty()) { + verifier = std::make_unique<transport::auth::SymmetricVerifier>( + configuration_.aggr_interest_passphrase_); + } + ret = producer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER, + verifier); + if (ret == SOCKET_OPTION_NOT_SET) return ERROR_SETUP; + + if (configuration_.rtc_) { + ret = producer_socket_->setSocketOption( + transport::interface::RtcTransportOptions::AGGREGATED_DATA, + configuration_.aggregated_data_); + + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + ret = producer_socket_->setSocketOption( + GeneralTransportOptions::FEC_TYPE, configuration_.fec_type_); + + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + } + + if (producer_socket_->setSocketOption( + GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME, + configuration_.content_lifetime_) == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + producer_socket_->registerPrefix(Prefix(flow_name_, 128)); + producer_socket_->connect(); + producer_socket_->start(); + + if (configuration_.rtc_) { + return ERROR_SUCCESS; + } + + if (!configuration_.virtual_producer_) { + if (producer_socket_->setSocketOption( + GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 200000U) == + SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + if (producer_socket_->setSocketOption( + GeneralTransportOptions::MAX_SEGMENT_SIZE, + static_cast<uint32_t>(configuration_.payload_size_)) == + SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + if (!configuration_.live_production_) { + produceContent(*producer_socket_, configuration_.name_.makeName(), 0); + } else { + ret = producer_socket_->setSocketOption( + ProducerCallbacksOptions::CACHE_MISS, + (ProducerInterestCallback)bind( + &ProducerContext::asyncProcessInterest, this, + std::placeholders::_1, std::placeholders::_2)); + + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + } + } else { + ret = producer_socket_->setSocketOption( + GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 0U); + + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + ret = producer_socket_->setSocketOption( + ProducerCallbacksOptions::CACHE_MISS, + (ProducerInterestCallback)bind( + &ProducerContext::virtualProcessInterest, this, + std::placeholders::_1, std::placeholders::_2)); + + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + } + + ret = producer_socket_->setSocketOption( + ProducerCallbacksOptions::CONTENT_PRODUCED, + (transport::interface::ProducerContentCallback)bind( + &ProducerContext::onContentProduced, this, std::placeholders::_1, + std::placeholders::_2, std::placeholders::_3)); + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + return ERROR_SUCCESS; + } + + int run() { + getOutputStream() << "started to serve consumers with name " << flow_name_ + << std::endl; + return ERROR_SUCCESS; + } + + void stop() { + getOutputStream() << "stopped to serve consumers" << std::endl; + producer_socket_->stop(); + } + + private: + /** + * @brief Produce an existing content object. Set the name as the + * interest. + */ + void virtualProcessInterest(ProducerSocket &p, const Interest &interest) { + content_objects_[content_objects_index_ & kmask()]->setName( + interest.getName()); + p.produce(*content_objects_[content_objects_index_++ & kmask()]); + } + + /** + * @brief Create and produce a buffer of configuration_.download_size_ + * length. + */ + void produceContent(ProducerSocket &p, const Name &content_name, + uint32_t suffix) const { + uint32_t total; + + auto b = utils::MemBuf::create(configuration_.download_size_); + std::memset(b->writableData(), '?', configuration_.download_size_); + b->append(configuration_.download_size_); + + utils::SteadyTime::TimePoint t0 = utils::SteadyTime::Clock::now(); + total = p.produceStream(content_name, std::move(b), + !configuration_.multiphase_produce_, suffix); + utils::SteadyTime::TimePoint t1 = utils::SteadyTime::Clock::now(); + + LoggerInfo() << "Written " << total + << " data packets in output buffer (Segmentation time: " + << utils::SteadyTime::getDurationUs(t0, t1).count() << " us)" + << std::endl; + } + + /** + * @brief Synchronously produce content upon reception of one interest + */ + void processInterest(ProducerSocket &p, const Interest &interest) const { + p.setSocketOption( + ProducerCallbacksOptions::CACHE_MISS, + (ProducerInterestCallback)transport::interface::VOID_HANDLER); + p.setSocketOption(GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME, + configuration_.content_lifetime_); + + produceContent(p, interest.getName(), interest.getName().getSuffix()); + LoggerInfo() << "Received interest " << interest.getName().getSuffix() + << std::endl; + } + + /** + * @brief Async create and produce a buffer of + * configuration_.download_size_ length. + */ + void produceContentAsync(ProducerSocket &p, Name content_name, + uint32_t suffix) { + parent_.produce_thread_.add([this, suffix, content_name, &p]() { + auto b = utils::MemBuf::create(configuration_.download_size_); + std::memset(b->writableData(), '?', configuration_.download_size_); + b->append(configuration_.download_size_); + + last_segment_ = + suffix + p.produceStream(content_name, std::move(b), + !configuration_.multiphase_produce_, + suffix); + }); + } + + /** + * @brief Asynchronously produce content upon reception of one interest + */ + void asyncProcessInterest(ProducerSocket &p, const Interest &interest) { + p.setSocketOption(ProducerCallbacksOptions::CACHE_MISS, + (ProducerInterestCallback)bind( + &ProducerContext::cacheMiss, this, + std::placeholders::_1, std::placeholders::_2)); + p.setSocketOption(GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME, + configuration_.content_lifetime_); + uint32_t suffix = interest.getName().getSuffix(); + + if (suffix == 0) { + last_segment_ = 0; + unsatisfied_interests_.clear(); + } + + // The suffix will either come from the received interest or will be set + // to the smallest suffix of a previous interest not satisfied + if (!unsatisfied_interests_.empty()) { + auto it = std::lower_bound(unsatisfied_interests_.begin(), + unsatisfied_interests_.end(), last_segment_); + if (it != unsatisfied_interests_.end()) { + suffix = *it; + } + unsatisfied_interests_.erase(unsatisfied_interests_.begin(), it); + } + + getOutputStream() << " Received interest " + << interest.getName().getSuffix() + << ", starting production at " << suffix << end_mod_ + << std::endl; + getOutputStream() << unsatisfied_interests_.size() + << " interests still unsatisfied" << end_mod_ + << std::endl; + produceContentAsync(p, interest.getName(), suffix); + } + + /** + * @brief Register cache miss events + */ + void cacheMiss([[maybe_unused]] const ProducerSocket &p, + const Interest &interest) { + unsatisfied_interests_.push_back(interest.getName().getSuffix()); + } + + /** + * @brief When content is produced, set cache miss callback so that we can + * register any cache miss happening after the production. + */ + void onContentProduced(ProducerSocket &p, + [[maybe_unused]] const std::error_code &err, + [[maybe_unused]] uint64_t bytes_written) { + p.setSocketOption(ProducerCallbacksOptions::CACHE_MISS, + (ProducerInterestCallback)bind( + &ProducerContext::asyncProcessInterest, this, + std::placeholders::_1, std::placeholders::_2)); + } + + /** + * @brief Internal producer error. When this callback is triggered + * something important happened. Here we stop the program. + */ + void produceError(const std::error_code &err) noexcept override { + getOutputStream() << "Error from producer transport: " << err.message() + << std::endl; + parent_.stop(); + } + + // Members initialized in constructor + std::vector<ContentObject::Ptr> content_objects_; + + // Members initialized by in-class initializer + std::vector<uint32_t> unsatisfied_interests_; + std::uint32_t last_segment_{0}; + std::unique_ptr<ProducerSocket> producer_socket_{nullptr}; + std::uint16_t content_objects_index_{0}; + std::size_t payload_size_max_{0}; + }; + + public: + explicit Impl(const hiperf::ServerConfiguration &conf) : config_(conf) { +#ifndef _WIN32 + if (config_.interactive_) { + input_.assign(::dup(STDIN_FILENO)); + } +#endif + + std::memset(rtc_payload_.data(), 'X', rtc_payload_.size()); + } + + ~Impl() = default; + + int setup() { + int ret = ensureFlows(config_.name_, config_.parallel_flows_); + if (ret != ERROR_SUCCESS) { + return ret; + } + + producer_contexts_.reserve(config_.parallel_flows_); + for (uint32_t i = 0; i < config_.parallel_flows_; i++) { + auto &ctx = producer_contexts_.emplace_back(*this, i); + ret = ctx.setup(); + + if (ret) { + break; + } + } + + return ret; + } + + void receiveStream() { + socket_.async_receive_from( + asio::buffer(recv_buffer_.writableData(), recv_buffer_.capacity()), + remote_, [this](const std::error_code &ec, std::size_t length) { + if (ec) return; + sendRTCContentFromStream(recv_buffer_.writableData(), length); + receiveStream(); + }); + } + + void sendRTCContentFromStream(const uint8_t *buff, std::size_t len) { + // this is used to compute the data packet delay + // Used only for performance evaluation + // It requires clock synchronization between producer and consumer + auto now = utils::SystemTime::nowMs().count(); + + auto start = rtc_payload_.data(); + std::memcpy(start, &now, sizeof(uint64_t)); + std::memcpy(start + sizeof(uint64_t), buff, len); + + for (const auto &producer_context : producer_contexts_) { + producer_context.produceDatagram(start, len + sizeof(uint64_t)); + } + } + + void sendRTCContentObjectCallback(const std::error_code &ec) { + if (ec) return; + rtc_timer_.expires_from_now( + config_.production_rate_.getMicrosecondsForPacket( + config_.payload_size_)); + rtc_timer_.async_wait(std::bind(&Impl::sendRTCContentObjectCallback, this, + std::placeholders::_1)); + + auto start = rtc_payload_.data(); + + // this is used to compute the data packet delay + // Used only for performance evaluation + // It requires clock synchronization between producer and consumer + auto now = utils::SystemTime::nowMs().count(); + std::memcpy(start, &now, sizeof(uint64_t)); + + for (const auto &producer_context : producer_contexts_) { + producer_context.produceDatagram(start, config_.payload_size_); + } + } + + void sendRTCContentObjectCallbackWithTrace(const std::error_code &ec) { + if (ec) return; + + std::size_t packet_len = config_.trace_[config_.trace_index_].size; + + // this is used to compute the data packet delay + // used only for performance evaluation + // it requires clock synchronization between producer and consumer + auto now = utils::SystemTime::nowMs().count(); + auto start = rtc_payload_.data(); + std::memcpy(start, &now, sizeof(uint64_t)); + + if (packet_len > config_.payload_size_) { + packet_len = config_.payload_size_; + } + + for (const auto &producer_context : producer_contexts_) { + producer_context.produceDatagram(start, packet_len); + } + + uint32_t next_index = config_.trace_index_ + 1; + uint64_t schedule_next; + if (next_index < config_.trace_.size()) { + schedule_next = config_.trace_[next_index].timestamp - + config_.trace_[config_.trace_index_].timestamp; + } else { + // here we need to loop, schedule in a random time + schedule_next = 1000; + } + + config_.trace_index_ = (config_.trace_index_ + 1) % config_.trace_.size(); + rtc_timer_.expires_from_now(std::chrono::microseconds(schedule_next)); + rtc_timer_.async_wait( + std::bind(&Impl::sendRTCContentObjectCallbackWithTrace, this, + std::placeholders::_1)); + } + + int parseTraceFile() { + std::ifstream trace(config_.trace_file_); + if (trace.fail()) { + return -1; + } + std::string line; + while (std::getline(trace, line)) { + std::istringstream iss(line); + hiperf::packet_t packet; + iss >> packet.timestamp >> packet.size; + config_.trace_.push_back(packet); + } + return 0; + } + +#ifndef _WIN32 + void handleInput(const std::error_code &error, std::size_t length) { + if (error) { + stop(); + } + + if (rtc_running_) { + LoggerInfo() << "stop real time content production" << std::endl; + rtc_running_ = false; + rtc_timer_.cancel(); + } else { + LoggerInfo() << "start real time content production" << std::endl; + rtc_running_ = true; + rtc_timer_.expires_from_now( + config_.production_rate_.getMicrosecondsForPacket( + config_.payload_size_)); + rtc_timer_.async_wait(std::bind(&Impl::sendRTCContentObjectCallback, this, + std::placeholders::_1)); + } + + input_buffer_.consume(length); // Remove newline from input. + asio::async_read_until( + input_, input_buffer_, '\n', + std::bind(&Impl::handleInput, this, std::placeholders::_1, + std::placeholders::_2)); + } +#endif + + void stop() { + for (auto &producer_context : producer_contexts_) { + producer_context.stop(); + } + + io_service_.stop(); + } + + int run() { + signals_.add(SIGINT); + signals_.async_wait( + [this](const std::error_code &, const int &) { stop(); }); + + if (config_.rtc_) { + if (config_.interactive_) { + asio::async_read_until( + input_, input_buffer_, '\n', + std::bind(&Impl::handleInput, this, std::placeholders::_1, + std::placeholders::_2)); + } else if (config_.trace_based_) { + LoggerInfo() << "trace-based mode enabled" << std::endl; + if (config_.trace_file_ == nullptr) { + LoggerErr() << "cannot find the trace file" << std::endl; + return ERROR_SETUP; + } + if (parseTraceFile() < 0) { + LoggerErr() << "cannot parse the trace file" << std::endl; + return ERROR_SETUP; + } + rtc_running_ = true; + rtc_timer_.expires_from_now(std::chrono::milliseconds(1)); + rtc_timer_.async_wait( + std::bind(&Impl::sendRTCContentObjectCallbackWithTrace, this, + std::placeholders::_1)); + } else if (config_.input_stream_mode_) { + rtc_running_ = true; + // create socket + remote_ = asio::ip::udp::endpoint( + asio::ip::address::from_string("127.0.0.1"), config_.port_); + socket_.open(asio::ip::udp::v4()); + socket_.bind(remote_); + receiveStream(); + } else { + rtc_running_ = true; + rtc_timer_.expires_from_now( + config_.production_rate_.getMicrosecondsForPacket( + config_.payload_size_)); + rtc_timer_.async_wait(std::bind(&Impl::sendRTCContentObjectCallback, + this, std::placeholders::_1)); + } + } + + for (auto &producer_context : producer_contexts_) { + producer_context.run(); + } + + io_service_.run(); + + return ERROR_SUCCESS; + } + + ServerConfiguration &getConfig() { return config_; } + + private: + // Variables initialized by the constructor. + ServerConfiguration config_; + + // Variable initialized in the in-class initializer list. + asio::io_service io_service_; + asio::signal_set signals_{io_service_}; + asio::steady_timer rtc_timer_{io_service_}; + asio::posix::stream_descriptor input_{io_service_}; + asio::ip::udp::socket socket_{io_service_}; + std::vector<ProducerContext> producer_contexts_; + ::utils::EventThread produce_thread_; + asio::streambuf input_buffer_; + bool rtc_running_{false}; + asio::ip::udp::endpoint remote_; + utils::MemBuf recv_buffer_{utils::MemBuf::CREATE, HIPERF_MTU}; + std::array<uint8_t, HIPERF_MTU> rtc_payload_; +}; + +HIperfServer::HIperfServer(const ServerConfiguration &conf) + : impl_(std::make_unique<Impl>(conf)) {} + +HIperfServer::~HIperfServer() = default; + +int HIperfServer::setup() { return impl_->setup(); } + +void HIperfServer::run() { impl_->run(); } + +} // namespace hiperf diff --git a/apps/hiperf/src/server.h b/apps/hiperf/src/server.h new file mode 100644 index 000000000..d7420da48 --- /dev/null +++ b/apps/hiperf/src/server.h @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <common.h> + +namespace hiperf { + +class HIperfServer { + public: + explicit HIperfServer(const ServerConfiguration &conf); + + ~HIperfServer(); + int setup(); + void run(); + + private: + class Impl; + std::unique_ptr<Impl> impl_; +}; + +} // namespace hiperf
\ No newline at end of file diff --git a/apps/http-proxy/CMakeLists.txt b/apps/http-proxy/CMakeLists.txt index d2d02d0dc..80f671567 100644 --- a/apps/http-proxy/CMakeLists.txt +++ b/apps/http-proxy/CMakeLists.txt @@ -1,4 +1,4 @@ -# Copyright (c) 2019 Cisco and/or its affiliates. +# Copyright (c) 2021-2022 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: @@ -11,19 +11,37 @@ # See the License for the specific language governing permissions and # limitations under the License. -cmake_minimum_required(VERSION 3.5 FATAL_ERROR) -set(CMAKE_CXX_STANDARD 14) - -set(CMAKE_MODULE_PATH - ${CMAKE_MODULE_PATH} - "${CMAKE_SOURCE_DIR}/cmake/Modules/" +############################################################## +# Compiler options +############################################################## +set(COMPILER_OPTIONS + PRIVATE ${DEFAULT_COMPILER_OPTIONS} ) -if (NOT CMAKE_BUILD_TYPE) - message(STATUS "${PROJECT_NAME}: No build type selected, default to Release") - set(CMAKE_BUILD_TYPE "Release") -endif () +# -Wno-c99-designator issue +# +# Unsure about version for which this was introduced +# clang version 9.0.8 (no flag), 11.0.5 (ndk22, flag) +if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang") + EXECUTE_PROCESS( COMMAND ${CMAKE_CXX_COMPILER} --version OUTPUT_VARIABLE clang_full_version_string ) + string (REGEX REPLACE ".*clang version ([0-9]+\\.[0-9]+).*" "\\1" CLANG_VERSION_STRING ${clang_full_version_string}) + if (CLANG_VERSION_STRING VERSION_GREATER_EQUAL 11) + list(APPEND COMPILER_OPTIONS + "-Wno-c99-designator" + ) + endif() +endif() + + +############################################################## +# Includes subdirectory +############################################################## +add_subdirectory(includes/hicn/http-proxy) + +############################################################## +# Source files +############################################################## set(LIB_SOURCE_FILES src/http_session.cc src/http_proxy.cc @@ -36,33 +54,50 @@ set(APP_SOURCE_FILES main.cc ) -add_subdirectory(includes/hicn/http-proxy) -set(LIBHTTP_PROXY hicnhttpproxy) -set(LIBHTTP_PROXY_STATIC ${LIBHTTP_PROXY}.static) - -list(APPEND COMPILER_DEFINITIONS - -DWITH_POLICY +############################################################## +# Libraries to link +############################################################## +list(APPEND HTTP_PROXY_LIBRARIES + PUBLIC ${LIBTRANSPORT_LIBRARIES} + PUBLIC ${LIBHICNCTRL_LIBRARIES} + PUBLIC ${LIBHICN_LIBRARIES} + PRIVATE ${CMAKE_THREAD_LIBS_INIT} ) + +############################################################## +# Build http proxy library +############################################################## build_library(${LIBHTTP_PROXY} STATIC SOURCES ${LIB_SOURCE_FILES} - LINK_LIBRARIES ${LIBRARIES} + LINK_LIBRARIES ${HTTP_PROXY_LIBRARIES} + INCLUDE_DIRS + PRIVATE ${THIRD_PARTY_INCLUDE_DIRS} ${COMMON_INCLUDE_DIRS} DEPENDS ${DEPENDENCIES} INSTALL_HEADERS ${LIBPROXY_TO_INSTALL_HEADER_FILES} - INCLUDE_DIRS ${LIBTRANSPORT_INCLUDE_DIRS} ${LIBHICNCTRL_INCLUDE_DIRS} ${LIBPROXY_INCLUDE_DIRS} + INCLUDE_DIRS + PUBLIC + $<BUILD_INTERFACE:${LIBPROXY_INCLUDE_DIRS}> + $<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}> COMPONENT ${HICN_APPS} LINK_FLAGS ${LINK_FLAGS} - DEFINITIONS ${COMPILER_DEFINITIONS} + COMPILE_OPTIONS ${COMPILER_OPTIONS} ) + +############################################################## +# Build http proxy executable +############################################################## if (NOT DISABLE_EXECUTABLES) build_executable(${HTTP_PROXY} SOURCES ${APP_SOURCE_FILES} LINK_LIBRARIES ${LIBHTTP_PROXY_STATIC} - DEPENDS ${LIBHTTP_PROXY_STATIC} + INCLUDE_DIRS + PRIVATE ${THIRD_PARTY_INCLUDE_DIRS} ${COMMON_INCLUDE_DIRS} + DEPENDS ${LIBHTTP_PROXY_STATIC} ${THIRD_PARTY_DEPENDENCIES} COMPONENT ${HICN_APPS} - DEFINITIONS ${COMPILER_DEFINITIONS} LINK_FLAGS ${LINK_FLAGS} + COMPILE_OPTIONS ${COMPILER_OPTIONS} ) endif () diff --git a/apps/http-proxy/includes/hicn/http-proxy/CMakeLists.txt b/apps/http-proxy/includes/hicn/http-proxy/CMakeLists.txt index 75cbbd64b..18f5704d1 100644 --- a/apps/http-proxy/includes/hicn/http-proxy/CMakeLists.txt +++ b/apps/http-proxy/includes/hicn/http-proxy/CMakeLists.txt @@ -1,4 +1,4 @@ -# Copyright (c) 2017-2019 Cisco and/or its affiliates. +# Copyright (c) 2021 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: @@ -11,8 +11,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -cmake_minimum_required(VERSION 3.5 FATAL_ERROR) - list(APPEND HEADER_FILES ${CMAKE_CURRENT_SOURCE_DIR}/forwarder_config.h ${CMAKE_CURRENT_SOURCE_DIR}/forwarder_interface.h @@ -26,7 +24,7 @@ list(APPEND HEADER_FILES set(HEADER_FILES ${HEADER_FILES} PARENT_SCOPE) set(LIBPROXY_INCLUDE_DIRS - ${CMAKE_CURRENT_SOURCE_DIR}/../.. "" + ${CMAKE_CURRENT_SOURCE_DIR}/../.. CACHE INTERNAL "" FORCE ) @@ -36,4 +34,3 @@ set(LIBPROXY_TO_INSTALL_HEADER_FILES CACHE INTERNAL "" FORCE ) - diff --git a/apps/http-proxy/includes/hicn/http-proxy/forwarder_config.h b/apps/http-proxy/includes/hicn/http-proxy/forwarder_config.h index 19c96a9e3..14e0068ed 100644 --- a/apps/http-proxy/includes/hicn/http-proxy/forwarder_config.h +++ b/apps/http-proxy/includes/hicn/http-proxy/forwarder_config.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 Cisco and/or its affiliates. + * Copyright (c) 2021-2022 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -15,9 +15,9 @@ #pragma once +#include <hicn/apps/utils/logger.h> #include <hicn/transport/portability/c_portability.h> #include <hicn/transport/utils/branch_prediction.h> -#include <hicn/transport/utils/log.h> #include <hicn/transport/utils/string_utils.h> #include <asio.hpp> @@ -27,7 +27,6 @@ #include "forwarder_interface.h" - #define RETRY_INTERVAL 300 namespace transport { @@ -61,14 +60,14 @@ class ForwarderConfig { doTryToConnectToForwarder(std::make_error_code(std::errc(0))); } - void doTryToConnectToForwarder(std::error_code ec) { + void doTryToConnectToForwarder(const std::error_code& ec) { if (!ec) { // ec == 0 --> timer expired int ret = forwarder_interface_.connectToForwarder(); if (ret < 0) { // We were not able to connect to the local forwarder. Do not give up // and retry. - TRANSPORT_LOGE("Could not connect to local forwarder. Retrying."); + LoggerErr() << "Could not connect to local forwarder. Retrying."; timer_.expires_from_now(std::chrono::milliseconds(RETRY_INTERVAL)); timer_.async_wait(std::bind(&ForwarderConfig::doTryToConnectToForwarder, @@ -79,20 +78,20 @@ class ForwarderConfig { doGetMainListener(std::make_error_code(std::errc(0))); } } else { - TRANSPORT_LOGD("Timer for re-trying forwarder connection canceled."); + LoggerErr() << "Timer for re-trying forwarder connection canceled."; } } - void doGetMainListener(std::error_code ec) { + void doGetMainListener(const std::error_code& ec) { if (!ec) { // ec == 0 --> timer expired int ret = forwarder_interface_.getMainListenerPort(); if (ret <= 0) { // Since without the main listener of the forwarder the proxy cannot // work, we can stop the program here until we get the listener port. - TRANSPORT_LOGE( - "Could not retrieve main listener port from the forwarder. " - "Retrying."); + LoggerErr() + << "Could not retrieve main listener port from the forwarder. " + "Retrying."; timer_.expires_from_now(std::chrono::milliseconds(RETRY_INTERVAL)); timer_.async_wait(std::bind(&ForwarderConfig::doGetMainListener, this, @@ -104,7 +103,7 @@ class ForwarderConfig { listener_retrieved_callback_(std::make_error_code(std::errc(0))); } } else { - TRANSPORT_LOGI("Timer for retrieving main hicn listener canceled."); + LoggerErr() << "Timer for retrieving main hicn listener canceled."; } } @@ -112,7 +111,7 @@ class ForwarderConfig { TRANSPORT_ALWAYS_INLINE bool parseHicnHeader(std::string& header, Callback&& callback) { std::stringstream ss(header); - route_info_t* ret = new route_info_t(); + RouteInfoPtr ret = std::make_shared<route_info_t>(); std::string port_string; while (ss.good()) { @@ -172,9 +171,9 @@ class ForwarderConfig { ret->family = AF_INET; std::string _prefix = ret->route_addr; forwarder_interface_.createFaceAndRoute( - RouteInfoPtr(ret), [callback = std::forward<Callback>(callback), - configured_prefix = std::move(_prefix)]( - uint32_t route_id, bool result) { + std::move(ret), [callback = std::forward<Callback>(callback), + configured_prefix = std::move(_prefix)]( + uint32_t route_id, bool result) { callback(result, configured_prefix); }); diff --git a/apps/http-proxy/includes/hicn/http-proxy/forwarder_interface.h b/apps/http-proxy/includes/hicn/http-proxy/forwarder_interface.h index 54941a4ba..ae9562a12 100644 --- a/apps/http-proxy/includes/hicn/http-proxy/forwarder_interface.h +++ b/apps/http-proxy/includes/hicn/http-proxy/forwarder_interface.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -23,8 +23,15 @@ extern "C" { #ifndef ASIO_STANDALONE #define ASIO_STANDALONE 1 #endif -#include <asio.hpp> -#include <asio/steady_timer.hpp> + +#ifdef __APPLE__ +#pragma clang diagnostic push +#pragma clang diagnostic ignored "-Wshorten-64-to-32" +#endif +#include <hicn/transport/core/asio_wrapper.h> +#ifdef __APPLE__ +#pragma clang diagnostic pop +#endif #include <functional> #include <thread> #include <unordered_map> @@ -45,16 +52,11 @@ using RouteInfoPtr = std::shared_ptr<route_info_t>; class ForwarderInterface { public: - ForwarderInterface(asio::io_service &io_service) + explicit ForwarderInterface(asio::io_service &io_service) : external_ioservice_(io_service), work_(std::make_unique<asio::io_service::work>(internal_ioservice_)), - sock_(nullptr), thread_(std::make_unique<std::thread>( - [this]() { internal_ioservice_.run(); })), - check_routes_timer_(nullptr), - pending_add_route_counter_(0), - route_id_(0), - closed_(false) {} + [this]() { internal_ioservice_.run(); })) {} ~ForwarderInterface(); @@ -88,20 +90,20 @@ class ForwarderInterface { void internalCreateFaceAndRoute(RouteInfoPtr route_info, uint8_t max_try, asio::steady_timer *timer, - SetRouteCallback callback); + const SetRouteCallback &callback); - int tryToCreateFaceAndRoute(route_info_t *route_info); + int tryToCreateFaceAndRoute(const route_info_t *route_info); asio::io_service &external_ioservice_; asio::io_service internal_ioservice_; std::unique_ptr<asio::io_service::work> work_; - hc_sock_t *sock_; + hc_sock_t *sock_ = nullptr; std::unique_ptr<std::thread> thread_; std::unordered_map<uint32_t, RouteInfoPtr> route_status_; - std::unique_ptr<asio::steady_timer> check_routes_timer_; - uint32_t pending_add_route_counter_; - uint32_t route_id_; - bool closed_; + std::unique_ptr<asio::steady_timer> check_routes_timer_ = nullptr; + uint32_t pending_add_route_counter_ = 0; + uint32_t route_id_ = 0; + bool closed_ = false; }; } // namespace transport diff --git a/apps/http-proxy/includes/hicn/http-proxy/http_1x_message_fast_parser.h b/apps/http-proxy/includes/hicn/http-proxy/http_1x_message_fast_parser.h index 7c035c83b..87a5c7d4e 100644 --- a/apps/http-proxy/includes/hicn/http-proxy/http_1x_message_fast_parser.h +++ b/apps/http-proxy/includes/hicn/http-proxy/http_1x_message_fast_parser.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: diff --git a/apps/http-proxy/includes/hicn/http-proxy/http_proxy.h b/apps/http-proxy/includes/hicn/http-proxy/http_proxy.h index a4139a620..567ab4540 100644 --- a/apps/http-proxy/includes/hicn/http-proxy/http_proxy.h +++ b/apps/http-proxy/includes/hicn/http-proxy/http_proxy.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 Cisco and/or its affiliates. + * Copyright (c) 2021-2022 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -15,6 +15,7 @@ #pragma once +#include <hicn/apps/utils/logger.h> #include <hicn/transport/interfaces/socket_consumer.h> #include <hicn/transport/utils/event_thread.h> @@ -22,7 +23,9 @@ #include "http_session.h" #include "icn_receiver.h" +#ifndef ASIO_STANDALONE #define ASIO_STANDALONE +#endif #include <asio.hpp> #include <asio/version.hpp> #include <unordered_set> @@ -31,7 +34,8 @@ class TcpListener { public: using AcceptCallback = std::function<void(asio::ip::tcp::socket&&)>; - TcpListener(asio::io_service& io_service, short port, AcceptCallback callback) + TcpListener(asio::io_service& io_service, short port, + const AcceptCallback& callback) : acceptor_(io_service), #if ((ASIO_VERSION / 100 % 1000) < 12) socket_(io_service), @@ -46,13 +50,12 @@ class TcpListener { acceptor_.listen(); } - public: void doAccept() { #if ((ASIO_VERSION / 100 % 1000) >= 12) acceptor_.async_accept( - [this](std::error_code ec, asio::ip::tcp::socket socket) { + [this](const std::error_code& ec, asio::ip::tcp::socket socket) { #else - acceptor_.async_accept(socket_, [this](std::error_code ec) { + acceptor_.async_accept(socket_, [this](const std::error_code& ec) { auto socket = std::move(socket_); #endif if (!ec) { @@ -77,7 +80,7 @@ class HTTPClientConnectionCallback; class Receiver { public: - Receiver() : thread_() {} + Receiver() {} virtual ~Receiver() = default; void stopAndJoinThread() { thread_.stop(); } virtual void stop() = 0; @@ -112,13 +115,13 @@ class TcpReceiver : public Receiver { std::deque<HTTPClientConnectionCallback*> http_clients_; std::unordered_set<HTTPClientConnectionCallback*> used_http_clients_; ForwarderConfig forwarder_config_; - bool stopped_; + bool stopped_ = false; }; class IcnReceiver : public Receiver { public: template <typename... Args> - IcnReceiver(Args&&... args) + explicit IcnReceiver(Args&&... args) : Receiver(), icn_consum_producer_(thread_.getIoService(), std::forward<Args>(args)...) { @@ -145,20 +148,18 @@ class HTTPProxy { std::string prefix; std::string first_ipv6_word; - virtual void printParams() { std::cout << "Parameters: " << std::endl; }; + virtual ~CommonParams() {} + virtual void printParams() { LoggerInfo() << "Parameters: "; }; }; struct ClientParams : virtual CommonParams { short tcp_listen_port; void printParams() override { - std::cout << "Running HTTP/TCP -> HTTP/hICN proxy." << std::endl; + LoggerInfo() << "Running HTTP/TCP -> HTTP/hICN proxy."; CommonParams::printParams(); - std::cout << "\t" - << "HTTP listen port: " << tcp_listen_port << std::endl; - std::cout << "\t" - << "Consumer Prefix: " << prefix << std::endl; - std::cout << "\t" - << "Prefix first word: " << first_ipv6_word << std::endl; + LoggerInfo() << "\tHTTP listen port: " << tcp_listen_port; + LoggerInfo() << "\tConsumer Prefix: " << prefix; + LoggerInfo() << "\tPrefix first word: " << first_ipv6_word; } }; @@ -171,25 +172,24 @@ class HTTPProxy { bool manifest; void printParams() override { - std::cout << "Running HTTP/hICN -> HTTP/TCP proxy." << std::endl; + LoggerInfo() << "Running HTTP/hICN -> HTTP/TCP proxy."; CommonParams::printParams(); - std::cout << "\t" - << "Origin address: " << origin_address << std::endl; - std::cout << "\t" - << "Origin port: " << origin_port << std::endl; - std::cout << "\t" - << "Producer cache size: " << cache_size << std::endl; - std::cout << "\t" - << "hICN MTU: " << mtu << std::endl; - std::cout << "\t" - << "Default content lifetime: " << content_lifetime - << std::endl; - std::cout << "\t" - << "Producer Prefix: " << prefix << std::endl; - std::cout << "\t" - << "Prefix first word: " << first_ipv6_word << std::endl; - std::cout << "\t" - << "Use manifest: " << manifest << std::endl; + LoggerInfo() << "\t" + << "Origin address: " << origin_address; + LoggerInfo() << "\t" + << "Origin port: " << origin_port; + LoggerInfo() << "\t" + << "Producer cache size: " << cache_size; + LoggerInfo() << "\t" + << "hICN MTU: " << mtu; + LoggerInfo() << "\t" + << "Default content lifetime: " << content_lifetime; + LoggerInfo() << "\t" + << "Producer Prefix: " << prefix; + LoggerInfo() << "\t" + << "Prefix first word: " << first_ipv6_word; + LoggerInfo() << "\t" + << "Use manifest: " << manifest; } }; @@ -207,4 +207,4 @@ class HTTPProxy { asio::signal_set signals_; }; -} // namespace transport
\ No newline at end of file +} // namespace transport diff --git a/apps/http-proxy/includes/hicn/http-proxy/http_session.h b/apps/http-proxy/includes/hicn/http-proxy/http_session.h index f4a3dbdee..1b7df96a6 100644 --- a/apps/http-proxy/includes/hicn/http-proxy/http_session.h +++ b/apps/http-proxy/includes/hicn/http-proxy/http_session.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -17,13 +17,19 @@ #include <hicn/transport/core/packet.h> -#include "http_1x_message_fast_parser.h" - -#define ASIO_STANDALONE -#include <asio.hpp> +#ifdef __APPLE__ +#pragma clang diagnostic push +#pragma clang diagnostic ignored "-Wshorten-64-to-32" +#endif +#include <hicn/transport/core/asio_wrapper.h> +#ifdef __APPLE__ +#pragma clang diagnostic pop +#endif #include <deque> #include <functional> +#include "http_1x_message_fast_parser.h" + namespace transport { using asio::ip::tcp; @@ -67,13 +73,16 @@ class HTTPSession { }; public: - HTTPSession(asio::io_service &io_service, std::string &ip_address, - std::string &port, ContentReceivedCallback receive_callback, - OnConnectionClosed on_reconnect_callback, bool client = false); + HTTPSession(asio::io_service &io_service, const std::string &ip_address, + const std::string &port, + const ContentReceivedCallback &receive_callback, + const OnConnectionClosed &on_reconnect_callback, + bool client = false); HTTPSession(asio::ip::tcp::socket socket, - ContentReceivedCallback receive_callback, - OnConnectionClosed on_reconnect_callback, bool client = true); + const ContentReceivedCallback &receive_callback, + const OnConnectionClosed &on_reconnect_callback, + bool client = true); ~HTTPSession(); @@ -97,8 +106,7 @@ class HTTPSession { bool checkConnected(); - private: - void handleRead(std::error_code ec, std::size_t length); + void handleRead(const std::error_code &ec, std::size_t length); void tryReconnection(); void startConnectionTimer(); void handleDeadline(const std::error_code &ec); @@ -114,14 +122,14 @@ class HTTPSession { asio::streambuf input_buffer_; bool reverse_; - bool is_reconnection_; - bool data_available_; + bool is_reconnection_ = false; + bool data_available_ = false; - std::size_t content_length_; + std::size_t content_length_ = 0; // Chunked encoding - bool is_last_chunk_; - bool chunked_; + bool is_last_chunk_ = false; + bool chunked_ = false; ContentReceivedCallback receive_callback_; OnConnectionClosed on_connection_closed_callback_; diff --git a/apps/http-proxy/includes/hicn/http-proxy/icn_receiver.h b/apps/http-proxy/includes/hicn/http-proxy/icn_receiver.h index 780037665..e3ab97fdd 100644 --- a/apps/http-proxy/includes/hicn/http-proxy/icn_receiver.h +++ b/apps/http-proxy/includes/hicn/http-proxy/icn_receiver.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -13,20 +13,18 @@ * limitations under the License. */ +#include <hicn/http-proxy/http_session.h> +#include <hicn/transport/core/asio_wrapper.h> #include <hicn/transport/core/prefix.h> #include <hicn/transport/interfaces/publication_options.h> #include <hicn/transport/interfaces/socket_producer.h> #include <hicn/transport/utils/spinlock.h> -#include <asio.hpp> #include <cassert> #include <cstring> #include <queue> #include <utility> -#include <hicn/http-proxy/http_session.h> -//#include "http_session.h" - namespace transport { class AsyncConsumerProducer { @@ -49,9 +47,7 @@ class AsyncConsumerProducer { const std::string& content_lifetime, bool manifest) : AsyncConsumerProducer(internal_io_service_, prefix, first_ipv6_word, origin_address, origin_port, cache_size, mtu, - content_lifetime, manifest) { - external_io_service_ = false; - } + content_lifetime, manifest) {} void run(); @@ -73,7 +69,7 @@ class AsyncConsumerProducer { core::Prefix prefix_; asio::io_service& io_service_; asio::io_service internal_io_service_; - bool external_io_service_; + bool external_io_service_ = false; interface::ProducerSocket producer_socket_; std::string ip_address_; @@ -81,9 +77,8 @@ class AsyncConsumerProducer { uint32_t cache_size_; uint32_t mtu_; - uint64_t request_counter_; + uint64_t request_counter_ = 0; - // std::unordered_map<core::Name, std::shared_ptr<ATSConnector>> // connection_map_; HTTPSession connector_; diff --git a/apps/http-proxy/includes/hicn/http-proxy/utils.h b/apps/http-proxy/includes/hicn/http-proxy/utils.h index d87c796d0..9865d8e4c 100644 --- a/apps/http-proxy/includes/hicn/http-proxy/utils.h +++ b/apps/http-proxy/includes/hicn/http-proxy/utils.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -35,17 +35,16 @@ TRANSPORT_ALWAYS_INLINE std::string generatePrefix( str += pos; uint32_t locator_hash = utils::hash::fnv32_buf(str, strlen(str)); + const uint16_t* word = (const uint16_t*)&locator_hash; std::stringstream stream; stream << first_ipv6_word << ":0"; - for (uint16_t* word = (uint16_t*)&locator_hash; - std::size_t(word) < (std::size_t(&locator_hash) + sizeof(locator_hash)); - word++) { - stream << ":" << std::hex << *word; + for (std::size_t i = 0; i < sizeof(locator_hash) / 2; i++) { + stream << ":" << std::hex << word[i]; } stream << "::"; return stream.str(); -}
\ No newline at end of file +} diff --git a/apps/http-proxy/main.cc b/apps/http-proxy/main.cc index d9c29ecde..832ec23e6 100644 --- a/apps/http-proxy/main.cc +++ b/apps/http-proxy/main.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019 Cisco and/or its affiliates. + * Copyright (c) 2021-2022 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -13,32 +13,31 @@ * limitations under the License. */ +#include <hicn/apps/utils/logger.h> #include <hicn/http-proxy/http_proxy.h> using namespace transport; -int usage(char* program) { - std::cerr << "USAGE: " << program << "[-C|-S] [options] <http_prefix>\n" - << "Server or Client: \n" - << " -P [FIRST_IPv6_WORD_HEX]\n" - << " -t [number of threads]\n" - << "Client Options: \n" - << " -L [PROXY_LISTEN_PORT]\n" - << "Server Options: \n" - << " -a [ORIGIN_IP_ADDRESS]\n" - << " -p [ORIGIN_PORT]\n" - << " -c [CACHE_SIZE]\n" - << " -m [MTU]" - << " -l [DEFAULT_CONTENT_LIFETIME] (seconds)\n" - << " -M (enable manifest)\n" - << std::endl - << "Example Server:\n" - << " " << program - << " -S -a example.com -p 80 -c 10000 -m 1300 -l 7200 -M -t 1 " - "http://httpserver\n" - << "Example Client:\n" - << " " << program << " -C -L 9091 http://httpserver\n" - << std::endl; +int usage(const char* program) { + LoggerInfo() << "USAGE: " << program << "[-C|-S] [options] <http_prefix>\n" + << "Server or Client: \n" + << " -P [FIRST_IPv6_WORD_HEX]\n" + << " -t [number of threads]\n" + << "Client Options: \n" + << " -L [PROXY_LISTEN_PORT]\n" + << "Server Options: \n" + << " -a [ORIGIN_IP_ADDRESS]\n" + << " -p [ORIGIN_PORT]\n" + << " -c [CACHE_SIZE]\n" + << " -m [MTU]" + << " -l [DEFAULT_CONTENT_LIFETIME] (seconds)\n" + << " -M (enable manifest)\n"; + LoggerInfo() << "Example Server:\n" + << " " << program + << " -S -a example.com -p 80 -c 10000 -m 1300 -l 7200 -M -t 1 " + "http://httpserver\n" + << "Example Client:\n" + << " " << program << " -C -L 9091 http://httpserver\n"; return -1; } @@ -53,8 +52,7 @@ struct Params : HTTPProxy::ClientParams, HTTPProxy::ServerParams { "Proxy configured as client and server at the same time."); } - std::cout << "\t" - << "N Threads: " << n_thread << std::endl; + LoggerInfo() << "\tN Threads: " << n_thread; } HTTPProxy* instantiateProxyAsValue() { @@ -93,18 +91,16 @@ int main(int argc, char** argv) { switch (opt) { case 'C': if (params.server) { - std::cerr << "Cannot be both client and server (both -C anc -S " - "options specified.)." - << std::endl; + LoggerErr() << "Cannot be both client and server (both -C anc -S " + "options specified.)."; return usage(argv[0]); } params.client = true; break; case 'S': if (params.client) { - std::cerr << "Cannot be both client and server (both -C anc -S " - "options specified.)." - << std::endl; + LoggerErr() << "Cannot be both client and server (both -C anc -S " + "options specified.)."; return usage(argv[0]); } params.server = true; @@ -136,15 +132,13 @@ int main(int argc, char** argv) { case 't': params.n_thread = std::stoul(optarg); break; - case 'h': default: return usage(argv[0]); - break; } } if (argv[optind] == 0) { - std::cerr << "Using default prefix " << params.prefix << std::endl; + LoggerInfo() << "Using default prefix " << params.prefix; } else { params.prefix = argv[optind]; } @@ -155,4 +149,4 @@ int main(int argc, char** argv) { delete proxy; return 0; -}
\ No newline at end of file +} diff --git a/apps/http-proxy/src/forwarder_interface.cc b/apps/http-proxy/src/forwarder_interface.cc index d80939b8b..717679e09 100644 --- a/apps/http-proxy/src/forwarder_interface.cc +++ b/apps/http-proxy/src/forwarder_interface.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 Cisco and/or its affiliates. + * Copyright (c) 2021-2022 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -13,10 +13,9 @@ * limitations under the License. */ -#include <hicn/http-proxy/forwarder_interface.h> - #include <arpa/inet.h> -#include <hicn/transport/utils/log.h> +#include <hicn/apps/utils/logger.h> +#include <hicn/http-proxy/forwarder_interface.h> #include <chrono> #include <iostream> @@ -28,7 +27,7 @@ namespace transport { ForwarderInterface::~ForwarderInterface() {} int ForwarderInterface::connectToForwarder() { - sock_ = hc_sock_create(); + sock_ = hc_sock_create(FORWARDER_TYPE_HICNLIGHT, NULL); if (!sock_) return -1; if (hc_sock_connect(sock_) < 0) { @@ -97,7 +96,8 @@ void ForwarderInterface::internalRemoveConnectedUser(uint32_t route_id) { std::vector<hc_route_t *> routes_to_remove; foreach_route(r, data) { char remote_addr[INET6_ADDRSTRLEN]; - int ret = ip_address_ntop(&r->remote_addr, remote_addr, r->len, r->family); + int ret = + hicn_ip_address_ntop(&r->remote_addr, remote_addr, r->len, r->family); if (ret < 0) continue; std::string route_addr(remote_addr); @@ -120,7 +120,7 @@ void ForwarderInterface::internalRemoveConnectedUser(uint32_t route_id) { for (unsigned i = 0; i < routes_to_remove.size(); i++) { connids_to_remove.insert(routes_to_remove[i]->face_id); if (hc_route_delete(sock_, routes_to_remove[i]) < 0) { - TRANSPORT_LOGE("Error removing route from forwarder."); + LoggerErr() << "Error removing route from forwarder."; } } @@ -147,24 +147,23 @@ void ForwarderInterface::internalRemoveConnectedUser(uint32_t route_id) { for (unsigned i = 0; i < conns_to_remove.size(); i++) { if (hc_connection_delete(sock_, conns_to_remove[i]) < 0) { - TRANSPORT_LOGE("Error removing connection from forwarder."); + LoggerErr() << "Error removing connection from forwarder."; } } hc_data_free(data); } -void ForwarderInterface::internalCreateFaceAndRoute(RouteInfoPtr route_info, - uint8_t max_try, - asio::steady_timer *timer, - SetRouteCallback callback) { +void ForwarderInterface::internalCreateFaceAndRoute( + RouteInfoPtr route_info, uint8_t max_try, asio::steady_timer *timer, + const SetRouteCallback &callback) { int ret = tryToCreateFaceAndRoute(route_info.get()); if (ret < 0 && max_try > 0) { max_try--; timer->expires_from_now(std::chrono::milliseconds(500)); timer->async_wait([this, _route_info = std::move(route_info), max_try, - timer, callback](std::error_code ec) { + timer, callback](const std::error_code &ec) { if (ec) return; internalCreateFaceAndRoute(std::move(_route_info), max_try, timer, std::move(callback)); @@ -186,7 +185,8 @@ void ForwarderInterface::internalCreateFaceAndRoute(RouteInfoPtr route_info, delete timer; } -int ForwarderInterface::tryToCreateFaceAndRoute(route_info_t *route_info) { +int ForwarderInterface::tryToCreateFaceAndRoute( + const route_info_t *route_info) { if (!sock_) return -1; hc_data_t *data; @@ -202,8 +202,9 @@ int ForwarderInterface::tryToCreateFaceAndRoute(route_info_t *route_info) { if (interface.compare("lo") != 0) { found = true; - ip_address_t remote_ip; - if (ip_address_pton(route_info->remote_addr.c_str(), &remote_ip) < 0) { + hicn_ip_address_t remote_ip; + if (hicn_ip_address_pton(route_info->remote_addr.c_str(), &remote_ip) < + 0) { hc_data_free(data); return -1; } @@ -211,14 +212,14 @@ int ForwarderInterface::tryToCreateFaceAndRoute(route_info_t *route_info) { hc_face_t face; memset(&face, 0, sizeof(hc_face_t)); - face.face.type = FACE_TYPE_UDP; - face.face.family = route_info->family; - face.face.local_addr = l->local_addr; - face.face.remote_addr = remote_ip; - face.face.local_port = l->local_port; - face.face.remote_port = route_info->remote_port; + face.type = FACE_TYPE_UDP; + face.family = route_info->family; + face.local_addr = l->local_addr; + face.remote_addr = remote_ip; + face.local_port = l->local_port; + face.remote_port = route_info->remote_port; - if (netdevice_set_name(&face.face.netdevice, l->interface_name) < 0) { + if (netdevice_set_name(&face.netdevice, l->interface_name) < 0) { hc_data_free(data); return -1; } @@ -238,10 +239,10 @@ int ForwarderInterface::tryToCreateFaceAndRoute(route_info_t *route_info) { return -1; } - ip_address_t route_ip; + hicn_ip_address_t route_ip; hc_route_t route; - if (ip_address_pton(route_info->route_addr.c_str(), &route_ip) < 0) { + if (hicn_ip_address_pton(route_info->route_addr.c_str(), &route_ip) < 0) { hc_data_free(data); return -1; } diff --git a/apps/http-proxy/src/http_1x_message_fast_parser.cc b/apps/http-proxy/src/http_1x_message_fast_parser.cc index 4b6b78d55..ffae2368b 100644 --- a/apps/http-proxy/src/http_1x_message_fast_parser.cc +++ b/apps/http-proxy/src/http_1x_message_fast_parser.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -99,7 +99,8 @@ bool HTTPMessageFastParser::isMpdRequest(const uint8_t *headers, return false; } -uint32_t HTTPMessageFastParser::parseCacheControl(const uint8_t *headers, - std::size_t length) { +uint32_t HTTPMessageFastParser::parseCacheControl( + [[maybe_unused]] const uint8_t *headers, + [[maybe_unused]] std::size_t length) { return 0; } diff --git a/apps/http-proxy/src/http_proxy.cc b/apps/http-proxy/src/http_proxy.cc index c252afe88..7419c7f7f 100644 --- a/apps/http-proxy/src/http_proxy.cc +++ b/apps/http-proxy/src/http_proxy.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 Cisco and/or its affiliates. + * Copyright (c) 2021-2022 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -13,11 +13,11 @@ * limitations under the License. */ +#include <hicn/apps/utils/logger.h> #include <hicn/http-proxy/http_proxy.h> #include <hicn/http-proxy/http_session.h> #include <hicn/http-proxy/utils.h> #include <hicn/transport/core/interest.h> -#include <hicn/transport/utils/log.h> #include <hicn/transport/utils/string_utils.h> namespace transport { @@ -29,16 +29,15 @@ using interface::ConsumerInterestCallback; using interface::ConsumerSocket; using interface::TransportProtocolAlgorithms; -class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback { +class HTTPClientConnectionCallback + : public interface::ConsumerSocket::ReadCallback { public: HTTPClientConnectionCallback(TcpReceiver& tcp_receiver, utils::EventThread& thread) : tcp_receiver_(tcp_receiver), thread_(thread), prefix_hash_(tcp_receiver_.prefix_hash_), - consumer_(TransportProtocolAlgorithms::RAAQM, thread_.getIoService()), - session_(nullptr), - current_size_(0) { + consumer_(TransportProtocolAlgorithms::RAAQM, thread_) { consumer_.setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, this); consumer_.setSocketOption( ConsumerCallbacksOptions::INTEREST_OUTPUT, @@ -62,15 +61,15 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback { std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5), - [this](asio::ip::tcp::socket& socket) -> bool { + [this](const asio::ip::tcp::socket& socket) { try { std::string remote_address = socket.remote_endpoint().address().to_string(); std::uint16_t remote_port = socket.remote_endpoint().port(); - TRANSPORT_LOGD("Client %s:%d disconnected.", remote_address.c_str(), - remote_port); - } catch (std::system_error& e) { - // Do nothing + LoggerInfo() << "Client " << remote_address << ":" << remote_port + << "disconnected."; + } catch (asio::system_error& e) { + LoggerInfo() << "Client disconnected."; } consumer_.stop(); @@ -85,7 +84,7 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback { private: void consumeNextRequest() { if (request_buffer_queue_.size() == 0) { - TRANSPORT_LOGD("No additional requests to process."); + // No additional requests to process return; } @@ -136,24 +135,19 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback { current_size_ += size; if (is_last) { - TRANSPORT_LOGD("Request received: %s", - std::string((const char*)tmp_buffer_.first->data(), - tmp_buffer_.first->length()) - .c_str()); if (current_size_ < 1400) { request_buffer_queue_.emplace_back(std::move(tmp_buffer_)); } else { - TRANSPORT_LOGE("Ignoring client request due to size (%zu) > 1400.", - current_size_); + LoggerErr() << "Ignoring client request due to size (" << current_size_ + << ") > 1400."; session_->close(); current_size_ = 0; return; } if (!consumer_.isRunning()) { - TRANSPORT_LOGD( - "Consumer stopped, triggering consume from TCP session " - "handler.."); + LoggerInfo() << "Consumer stopped, triggering consume from TCP session " + "handler.."; consumeNextRequest(); } @@ -163,15 +157,16 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback { // hicn callbacks - void processLeavingInterest(interface::ConsumerSocket& c, - const core::Interest& interest) { + void processLeavingInterest( + [[maybe_unused]] const interface::ConsumerSocket& c, + const core::Interest& interest) { if (interest.getName().getSuffix() == 0 && interest.payloadSize() == 0) { Interest& int2 = const_cast<Interest&>(interest); int2.appendPayload(request_buffer_queue_.front().first->clone()); } } - void processInterestRetx(interface::ConsumerSocket& c, + void processInterestRetx([[maybe_unused]] const interface::ConsumerSocket& c, const core::Interest& interest) { if (interest.payloadSize() == 0) { Interest& int2 = const_cast<Interest&>(interest); @@ -180,19 +175,22 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback { } bool isBufferMovable() noexcept { return true; } - void getReadBuffer(uint8_t** application_buffer, size_t* max_length) {} - void readDataAvailable(size_t length) noexcept {} + void getReadBuffer(uint8_t** application_buffer, + size_t* max_length) { /*nothing to do*/ + } + void readDataAvailable(size_t length) noexcept { /*nothing to do*/ + } size_t maxBufferSize() const { return 64 * 1024; } void readBufferAvailable(std::unique_ptr<utils::MemBuf>&& buffer) noexcept { // Response received. Send it back to client auto _buffer = buffer.release(); - TRANSPORT_LOGD("From hicn: %zu bytes.", _buffer->length()); - session_->send(_buffer, []() {}); + // TRANSPORT_LOGD("From hicn: %zu bytes.", _buffer->length()); + session_->send(_buffer, []() { /*nothing to do*/ }); } - void readError(const std::error_code ec) noexcept { - TRANSPORT_LOGE("Error reading from hicn consumer socket. Closing session."); + void readError(const std::error_code& ec) noexcept { + LoggerErr() << "Error reading from hicn consumer socket. Closing session."; session_->close(); } @@ -209,23 +207,22 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback { * Let's grant it! */ if (metadata->method == "OPTIONS") { - session_->send( - (const uint8_t*)HTTPMessageFastParser::http_cors, - std::strlen(HTTPMessageFastParser::http_cors), [this]() { - auto& socket = session_->socket_; - TRANSPORT_LOGI( - "Sent OPTIONS to client %s:%d", - socket.remote_endpoint().address().to_string().c_str(), - socket.remote_endpoint().port()); - }); + session_->send((const uint8_t*)HTTPMessageFastParser::http_cors, + std::strlen(HTTPMessageFastParser::http_cors), [this]() { + auto& socket = session_->socket_; + LoggerInfo() << "Sent OPTIONS to client " + << socket.remote_endpoint().address() + << ":" << socket.remote_endpoint().port(); + }); } } else { tcp_receiver_.parseHicnHeader( - it->second, [this](bool result, std::string configured_prefix) { + it->second, + [this](bool result, + [[maybe_unused]] const std::string& configured_prefix) { const char* reply = nullptr; if (result) { reply = HTTPMessageFastParser::http_ok; - prefix_hash_ = configured_prefix; } else { reply = HTTPMessageFastParser::http_failed; } @@ -234,25 +231,23 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback { session_->send( (const uint8_t*)reply, std::strlen(reply), [this, result]() { auto& socket = session_->socket_; - TRANSPORT_LOGI( - "Sent %d response to client %s:%d", result, - socket.remote_endpoint().address().to_string().c_str(), - socket.remote_endpoint().port()); + LoggerInfo() << "Sent " << result << " response to client " + << socket.remote_endpoint().address() << ":" + << socket.remote_endpoint().port(); }); }); } } - private: TcpReceiver& tcp_receiver_; utils::EventThread& thread_; std::string& prefix_hash_; ConsumerSocket consumer_; - std::unique_ptr<HTTPSession> session_; + std::unique_ptr<HTTPSession> session_ = nullptr; std::deque<std::pair<std::unique_ptr<utils::MemBuf>, std::string>> request_buffer_queue_; std::pair<std::unique_ptr<utils::MemBuf>, std::string> tmp_buffer_; - std::size_t current_size_; + std::size_t current_size_ = 0; }; TcpReceiver::TcpReceiver(std::uint16_t port, const std::string& prefix, @@ -265,8 +260,7 @@ TcpReceiver::TcpReceiver(std::uint16_t port, const std::string& prefix, ipv6_first_word_(ipv6_first_word), prefix_hash_(generatePrefix(prefix_, ipv6_first_word_)), forwarder_config_( - thread_.getIoService(), - [this](std::error_code ec) { + thread_.getIoService(), [this](const std::error_code& ec) { if (!ec) { listener_.doAccept(); for (int i = 0; i < 10; i++) { @@ -274,8 +268,7 @@ TcpReceiver::TcpReceiver(std::uint16_t port, const std::string& prefix, new HTTPClientConnectionCallback(*this, thread_)); } } - }), - stopped_(false) { + }) { forwarder_config_.tryToConnectToForwarder(); } @@ -314,7 +307,6 @@ void TcpReceiver::onClientDisconnect(HTTPClientConnectionCallback* client) { void TcpReceiver::onNewConnection(asio::ip::tcp::socket&& socket) { if (http_clients_.size() == 0) { // Create new HTTPClientConnectionCallback - TRANSPORT_LOGD("Creating new HTTPClientConnectionCallback."); http_clients_.emplace_back( new HTTPClientConnectionCallback(*this, thread_)); } @@ -333,7 +325,8 @@ void TcpReceiver::onNewConnection(asio::ip::tcp::socket&& socket) { void HTTPProxy::setupSignalHandler() { signals_.async_wait([this](const std::error_code& ec, int signal_number) { if (!ec) { - TRANSPORT_LOGI("Received signal %d. Stopping gracefully.", signal_number); + LoggerInfo() << "Received signal " << signal_number + << ". Stopping gracefully."; stop(); } }); @@ -354,7 +347,6 @@ void HTTPProxy::stop() { HTTPProxy::HTTPProxy(ClientParams& params, std::size_t n_thread) : signals_(main_io_context_, SIGINT, SIGQUIT) { for (uint16_t i = 0; i < n_thread; i++) { - // icn_receivers_.emplace_back(std::make_unique<IcnReceiver>(icn_params)); receivers_.emplace_back(std::make_unique<TcpReceiver>( params.tcp_listen_port, params.prefix, params.first_ipv6_word)); } diff --git a/apps/http-proxy/src/http_session.cc b/apps/http-proxy/src/http_session.cc index 6b91c12c3..06a81dc27 100644 --- a/apps/http-proxy/src/http_session.cc +++ b/apps/http-proxy/src/http_session.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019 Cisco and/or its affiliates. + * Copyright (c) 2021-2022 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -13,30 +13,24 @@ * limitations under the License. */ +#include <hicn/apps/utils/logger.h> #include <hicn/http-proxy/http_proxy.h> #include <hicn/transport/utils/branch_prediction.h> -#include <hicn/transport/utils/log.h> #include <iostream> namespace transport { -HTTPSession::HTTPSession(asio::io_service &io_service, std::string &ip_address, - std::string &port, - ContentReceivedCallback receive_callback, - OnConnectionClosed on_connection_closed_callback, - bool client) +HTTPSession::HTTPSession( + asio::io_service &io_service, const std::string &ip_address, + const std::string &port, const ContentReceivedCallback &receive_callback, + const OnConnectionClosed &on_connection_closed_callback, bool client) : io_service_(io_service), socket_(io_service_), resolver_(io_service_), endpoint_iterator_(resolver_.resolve({ip_address, port})), timer_(io_service), reverse_(client), - is_reconnection_(false), - data_available_(false), - content_length_(0), - is_last_chunk_(false), - chunked_(false), receive_callback_(receive_callback), on_connection_closed_callback_(on_connection_closed_callback) { input_buffer_.prepare(buffer_size + 2048); @@ -51,10 +45,10 @@ HTTPSession::HTTPSession(asio::io_service &io_service, std::string &ip_address, doConnect(); } -HTTPSession::HTTPSession(asio::ip::tcp::socket socket, - ContentReceivedCallback receive_callback, - OnConnectionClosed on_connection_closed_callback, - bool client) +HTTPSession::HTTPSession( + asio::ip::tcp::socket socket, + const ContentReceivedCallback &receive_callback, + const OnConnectionClosed &on_connection_closed_callback, bool client) : #if ((ASIO_VERSION / 100 % 1000) < 12) io_service_(socket.get_io_service()), @@ -92,7 +86,7 @@ void HTTPSession::send(const uint8_t *packet, std::size_t len, io_service_.dispatch([this, packet, len, content_sent]() { asio::async_write(socket_, asio::buffer(packet, len), [content_sent = std::move(content_sent)]( - std::error_code ec, std::size_t /*length*/) { + const std::error_code &ec, std::size_t /*length*/) { if (!ec) { content_sent(); } @@ -111,7 +105,6 @@ void HTTPSession::send(utils::MemBuf *buffer, doWrite(); } } else { - TRANSPORT_LOGD("Tell the handle connect it has data to write"); data_available_ = true; } }); @@ -121,9 +114,7 @@ void HTTPSession::close() { if (state_ != ConnectorState::CLOSED) { state_ = ConnectorState::CLOSED; if (socket_.is_open()) { - // socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); socket_.close(); - // on_disconnect_callback_(); } } } @@ -131,29 +122,26 @@ void HTTPSession::close() { void HTTPSession::doWrite() { auto &buffer = write_msgs_.front().first; - asio::async_write(socket_, asio::buffer(buffer->data(), buffer->length()), - [this](std::error_code ec, std::size_t length) { - if (TRANSPORT_EXPECT_FALSE(!ec)) { - TRANSPORT_LOGD("Content successfully sent! %zu", - length); - write_msgs_.front().second(); - write_msgs_.pop_front(); - if (!write_msgs_.empty()) { - doWrite(); - } - } else { - TRANSPORT_LOGD("Content NOT sent!"); - } - }); + asio::async_write( + socket_, asio::buffer(buffer->data(), buffer->length()), + [this](const std::error_code &ec, [[maybe_unused]] std::size_t length) { + if (TRANSPORT_EXPECT_FALSE(!ec)) { + write_msgs_.front().second(); + write_msgs_.pop_front(); + if (!write_msgs_.empty()) { + doWrite(); + } + } + }); } // namespace transport -void HTTPSession::handleRead(std::error_code ec, std::size_t length) { +void HTTPSession::handleRead(const std::error_code &ec, std::size_t length) { if (TRANSPORT_EXPECT_TRUE(!ec)) { content_length_ -= length; const uint8_t *buffer = asio::buffer_cast<const uint8_t *>(input_buffer_.data()); - bool is_last = chunked_ ? (is_last_chunk_ ? !content_length_ : false) - : !content_length_; + bool check = is_last_chunk_ ? !content_length_ : false; + bool is_last = chunked_ ? check : !content_length_; receive_callback_(buffer, input_buffer_.size(), is_last, false, nullptr); input_buffer_.consume(input_buffer_.size()); @@ -212,7 +200,7 @@ void HTTPSession::doReadBody(std::size_t body_size, void HTTPSession::doReadChunkedHeader() { asio::async_read_until( socket_, input_buffer_, "\r\n", - [this](std::error_code ec, std::size_t length) { + [this](const std::error_code &ec, std::size_t length) { if (TRANSPORT_EXPECT_TRUE(!ec)) { const uint8_t *buffer = asio::buffer_cast<const uint8_t *>(input_buffer_.data()); @@ -231,7 +219,7 @@ void HTTPSession::doReadChunkedHeader() { void HTTPSession::doReadHeader() { asio::async_read_until( socket_, input_buffer_, "\r\n\r\n", - [this](std::error_code ec, std::size_t length) { + [this](const std::error_code &ec, std::size_t length) { if (TRANSPORT_EXPECT_TRUE(!ec)) { const uint8_t *buffer = asio::buffer_cast<const uint8_t *>(input_buffer_.data()); @@ -274,12 +262,11 @@ void HTTPSession::doReadHeader() { void HTTPSession::tryReconnection() { if (on_connection_closed_callback_(socket_)) { if (state_ == ConnectorState::CONNECTED) { - TRANSPORT_LOGD("Connection lost. Trying to reconnect...\n"); + LoggerErr() << "Connection lost. Trying to reconnect..."; state_ = ConnectorState::CONNECTING; is_reconnection_ = true; io_service_.post([this]() { if (socket_.is_open()) { - // socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); socket_.close(); } startConnectionTimer(); @@ -290,35 +277,33 @@ void HTTPSession::tryReconnection() { } void HTTPSession::doConnect() { - asio::async_connect(socket_, endpoint_iterator_, - [this](std::error_code ec, tcp::resolver::iterator) { - if (!ec) { - timer_.cancel(); - state_ = ConnectorState::CONNECTED; - - asio::ip::tcp::no_delay noDelayOption(true); - socket_.set_option(noDelayOption); + asio::async_connect( + socket_, endpoint_iterator_, + [this](const std::error_code &ec, tcp::resolver::iterator) { + if (!ec) { + timer_.cancel(); + state_ = ConnectorState::CONNECTED; - // on_reconnect_callback_(); + asio::ip::tcp::no_delay noDelayOption(true); + socket_.set_option(noDelayOption); - doReadHeader(); + doReadHeader(); - if (data_available_ && !write_msgs_.empty()) { - data_available_ = false; - doWrite(); - } + if (data_available_ && !write_msgs_.empty()) { + data_available_ = false; + doWrite(); + } - if (is_reconnection_) { - is_reconnection_ = false; - TRANSPORT_LOGD("Connection recovered!"); - } + if (is_reconnection_) { + is_reconnection_ = false; + LoggerInfo() << "Connection recovered!"; + } - } else { - TRANSPORT_LOGE("Impossible to reconnect: %s", - ec.message().c_str()); - close(); - } - }); + } else { + LoggerErr() << "Impossible to reconnect: " << ec.message(); + close(); + } + }); } bool HTTPSession::checkConnected() { @@ -335,7 +320,7 @@ void HTTPSession::handleDeadline(const std::error_code &ec) { if (!ec) { io_service_.post([this]() { socket_.close(); - TRANSPORT_LOGE("Error connecting. Is the server running?\n"); + LoggerErr() << "Error connecting. Is the server running?"; io_service_.stop(); }); } diff --git a/apps/http-proxy/src/icn_receiver.cc b/apps/http-proxy/src/icn_receiver.cc index 8823907dc..c8904aa95 100644 --- a/apps/http-proxy/src/icn_receiver.cc +++ b/apps/http-proxy/src/icn_receiver.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019 Cisco and/or its affiliates. + * Copyright (c) 2021-2022 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -13,13 +13,13 @@ * limitations under the License. */ +#include <hicn/apps/utils/logger.h> #include <hicn/http-proxy/http_1x_message_fast_parser.h> #include <hicn/http-proxy/icn_receiver.h> #include <hicn/http-proxy/utils.h> #include <hicn/transport/core/interest.h> #include <hicn/transport/http/default_values.h> #include <hicn/transport/utils/hash.h> -#include <hicn/transport/utils/log.h> #include <functional> #include <memory> @@ -33,18 +33,15 @@ AsyncConsumerProducer::AsyncConsumerProducer( const std::string& mtu, const std::string& content_lifetime, bool manifest) : prefix_(core::Prefix(generatePrefix(prefix, first_ipv6_word), 64)), io_service_(io_service), - external_io_service_(true), - producer_socket_(), ip_address_(origin_address), port_(origin_port), - cache_size_(std::stoul(cache_size)), - mtu_(std::stoul(mtu)), - request_counter_(0), + cache_size_((uint32_t)std::stoul(cache_size)), + mtu_((uint32_t)std::stoul(mtu)), connector_(io_service_, ip_address_, port_, std::bind(&AsyncConsumerProducer::publishContent, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4), - [this](asio::ip::tcp::socket& socket) -> bool { + [this]([[maybe_unused]] const asio::ip::tcp::socket& socket) { std::queue<interface::PublicationOptions> empty; std::swap(response_name_queue_, empty); @@ -55,28 +52,28 @@ AsyncConsumerProducer::AsyncConsumerProducer( interface::GeneralTransportOptions::OUTPUT_BUFFER_SIZE, cache_size_); if (ret != SOCKET_OPTION_SET) { - TRANSPORT_LOGD("Warning: output buffer size has not been set."); + LoggerWarn() << "Warning: output buffer size has not been set."; } ret = producer_socket_.setSocketOption( - interface::GeneralTransportOptions::MAKE_MANIFEST, manifest); + interface::GeneralTransportOptions::MANIFEST_MAX_CAPACITY, manifest); if (ret != SOCKET_OPTION_SET) { - TRANSPORT_LOGD("Warning: impossible to enable signatures."); + LoggerWarn() << "Warning: impossible to enable signatures."; } ret = producer_socket_.setSocketOption( interface::GeneralTransportOptions::DATA_PACKET_SIZE, mtu_); if (ret != SOCKET_OPTION_SET) { - TRANSPORT_LOGD("Warning: mtu has not been set."); + LoggerWarn() << "Warning: mtu has not been set."; } producer_socket_.registerPrefix(prefix_); } void AsyncConsumerProducer::start() { - TRANSPORT_LOGD("Starting listening"); + LoggerInfo() << "Starting listening"; doReceive(); } @@ -90,8 +87,8 @@ void AsyncConsumerProducer::run() { void AsyncConsumerProducer::stop() { io_service_.post([this]() { - TRANSPORT_LOGI("Number of requests processed by plugin: %lu", - (unsigned long)request_counter_); + LoggerInfo() << "Number of requests processed by plugin: " + << request_counter_; producer_socket_.stop(); connector_.close(); }); @@ -100,7 +97,7 @@ void AsyncConsumerProducer::stop() { void AsyncConsumerProducer::doReceive() { producer_socket_.setSocketOption( interface::ProducerCallbacksOptions::CACHE_MISS, - [this](interface::ProducerSocket& producer, + [this]([[maybe_unused]] const interface::ProducerSocket& producer, interface::Interest& interest) { if (interest.payloadSize() > 0) { // Interest may contain http request @@ -112,6 +109,7 @@ void AsyncConsumerProducer::doReceive() { }); producer_socket_.connect(); + producer_socket_.start(); } void AsyncConsumerProducer::manageIncomingInterest( @@ -123,16 +121,14 @@ void AsyncConsumerProducer::manageIncomingInterest( if (_it != _end) { if (_it->second.second) { - TRANSPORT_LOGD( - "Content is in production, interests will be satisfied shortly."); return; } if (seg >= _it->second.first) { - TRANSPORT_LOGD( - "Ignoring interest with name %s for a content object which does not " - "exist. (Request: %u, max: %u)", - name.toString().c_str(), (uint32_t)seg, (uint32_t)_it->second.first); + // TRANSPORT_LOGD( + // "Ignoring interest with name %s for a content object which does not + // " "exist. (Request: %u, max: %u)", name.toString().c_str(), + // (uint32_t)seg, (uint32_t)_it->second.first); return; } } @@ -150,7 +146,8 @@ void AsyncConsumerProducer::manageIncomingInterest( response_name_queue_.emplace(std::move(name), is_mpd ? 1000 : default_content_lifetime_); - connector_.send(payload, [packet = std::move(packet)]() {}); + connector_.send(payload, + [packet = std::move(packet)]() { /*nothing to do*/ }); } void AsyncConsumerProducer::publishContent(const uint8_t* data, @@ -159,26 +156,25 @@ void AsyncConsumerProducer::publishContent(const uint8_t* data, uint32_t start_suffix = 0; if (response_name_queue_.empty()) { - std::cerr << "Aborting due tue empty request queue" << std::endl; + LoggerErr() << "Aborting due tue empty request queue"; abort(); } - interface::PublicationOptions& options = response_name_queue_.front(); + const interface::PublicationOptions& options = response_name_queue_.front(); int ret = producer_socket_.setSocketOption( interface::GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME, options.getLifetime()); if (TRANSPORT_EXPECT_FALSE(ret != SOCKET_OPTION_SET)) { - TRANSPORT_LOGD("Warning: content object lifetime has not been set."); + LoggerWarn() << "Warning: content object lifetime has not been set."; } const interface::Name& name = options.getName(); auto it = chunk_number_map_.find(name); if (it == chunk_number_map_.end()) { - std::cerr << "Aborting due to response not found in ResposeInfo map." - << std::endl; + LoggerErr() << "Aborting due to response not found in ResposeInfo map."; abort(); } @@ -189,7 +185,7 @@ void AsyncConsumerProducer::publishContent(const uint8_t* data, } it->second.first += - producer_socket_.produce(name, data, size, is_last, start_suffix); + producer_socket_.produceStream(name, data, size, is_last, start_suffix); if (is_last) { it->second.second = false; diff --git a/apps/ping/.clang-format b/apps/ping/.clang-format new file mode 100644 index 000000000..adc73c6fd --- /dev/null +++ b/apps/ping/.clang-format @@ -0,0 +1,14 @@ +# Copyright (c) 2021 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +BasedOnStyle: Google diff --git a/apps/ping/CMakeLists.txt b/apps/ping/CMakeLists.txt new file mode 100644 index 000000000..ab3fdf56d --- /dev/null +++ b/apps/ping/CMakeLists.txt @@ -0,0 +1,60 @@ +# Copyright (c) 2021-2022 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +if (NOT DISABLE_EXECUTABLES) +############################################################## +# Compiler options +############################################################## +set(COMPILER_OPTIONS + ${DEFAULT_COMPILER_OPTIONS} +) + +############################################################## +# Libraries to link +############################################################## + list (APPEND PING_LIBRARIES + PRIVATE ${LIBHICN_LIBRARIES} + PRIVATE ${LIBTRANSPORT_LIBRARIES} + PRIVATE ${CMAKE_THREAD_LIBS_INIT} + PRIVATE ${WSOCK32_LIBRARY} + PRIVATE ${WS2_32_LIBRARY} + ) + +############################################################## +# Build ping server +############################################################## + build_executable(hicn-ping-server + SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/src/ping_server.cc + LINK_LIBRARIES ${PING_LIBRARIES} + INCLUDE_DIRS + PRIVATE ${THIRD_PARTY_INCLUDE_DIRS} ${COMMON_INCLUDE_DIRS} + DEPENDS ${DEPENDENCIES} ${THIRD_PARTY_DEPENDENCIES} + COMPONENT ${HICN_APPS} + LINK_FLAGS ${LINK_FLAGS} + COMPILE_OPTIONS ${COMPILER_OPTIONS} + ) + +############################################################## +# Build ping client +############################################################## + build_executable(hicn-ping-client + SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/src/ping_client.cc + LINK_LIBRARIES ${PING_LIBRARIES} + INCLUDE_DIRS + PRIVATE ${THIRD_PARTY_INCLUDE_DIRS} ${COMMON_INCLUDE_DIRS} + DEPENDS ${DEPENDENCIES} ${THIRD_PARTY_DEPENDENCIES} + COMPONENT ${HICN_APPS} + LINK_FLAGS ${LINK_FLAGS} + COMPILE_OPTIONS ${COMPILER_OPTIONS} + ) +endif ()
\ No newline at end of file diff --git a/apps/ping/src/ping_client.cc b/apps/ping/src/ping_client.cc new file mode 100644 index 000000000..08938734b --- /dev/null +++ b/apps/ping/src/ping_client.cc @@ -0,0 +1,461 @@ +/* + * Copyright (c) 2021-2022 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/apps/utils/logger.h> +#include <hicn/transport/auth/signer.h> +#include <hicn/transport/auth/verifier.h> +#include <hicn/transport/core/global_object_pool.h> +#include <hicn/transport/core/interest.h> +#include <hicn/transport/interfaces/global_conf_interface.h> +#include <hicn/transport/interfaces/portal.h> +#include <hicn/transport/utils/chrono_typedefs.h> +#include <hicn/transport/utils/traffic_generator.h> + +#include <asio/signal_set.hpp> +#include <asio/steady_timer.hpp> +#include <chrono> +#include <map> + +static constexpr uint32_t SYN_STATE = 1; + +namespace transport { + +namespace core { + +namespace ping { + +using SendTimeMap = std::map<uint64_t, utils::SteadyTime::TimePoint>; +using Verifier = auth::AsymmetricVerifier; + +class Configuration { + public: + static constexpr char TRAFFIC_GENERATOR_RAND[] = "RANDOM"; + + uint32_t num_int_manifest_suffixes_ = + 0; // Number of suffixes in interest manifest (suffix in the header + // is not included in the count) + uint64_t interestLifetime_ = 500; // ms + uint64_t pingInterval_ = 1000000; // us + uint32_t maxPing_ = 10; // number of interests + uint32_t first_suffix_ = 0; + std::string name_ = "b001::1"; + std::string certificate_; + std::string passphrase_; + std::string traffic_generator_type_; + bool jump_ = false; + uint32_t jump_freq_ = 0; + uint32_t jump_size_ = 0; + hicn_packet_format_t packet_format_ = HICN_PACKET_FORMAT_DEFAULT; + + Configuration() = default; +}; + +class Client : public interface::Portal::TransportCallback { + public: + explicit Client(Configuration *c) + : signals_(io_service_, SIGINT), + config_(c), + timer_(std::make_unique<asio::steady_timer>( + portal_.getThread().getIoService())) { + // Let the main thread to catch SIGINT + signals_.async_wait(std::bind(&Client::afterSignal, this)); + + if (!c->certificate_.empty()) { + verifier_.useCertificate(c->certificate_); + } + + // If interst manifest, sign it + if (c->num_int_manifest_suffixes_ != 0) { + assert(!c->passphrase_.empty()); + signer_ = std::make_unique<auth::SymmetricSigner>( + auth::CryptoSuite::HMAC_SHA256, c->passphrase_); + } + + if (c->traffic_generator_type_ == + std::string(Configuration::TRAFFIC_GENERATOR_RAND)) { + traffic_generator_ = + std::make_unique<RandomTrafficGenerator>(config_->maxPing_); + } else { + traffic_generator_ = std::make_unique<IncrSuffixTrafficGenerator>( + config_->name_, config_->first_suffix_, config_->maxPing_); + } + } + + virtual ~Client() = default; + + void ping() { + LoggerInfo() << "Starting ping..."; + + portal_.getThread().add([this]() { + portal_.connect(); + portal_.registerTransportCallback(this); + doPing(); + }); + + io_service_.run(); + } + + void onInterest(Interest &interest) override { + LoggerInfo() << "Unexpected interest received."; + } + + void onContentObject(Interest &interest, ContentObject &object) override { + uint64_t rtt = 0; + + if (!config_->certificate_.empty()) { + auto t0 = utils::SteadyTime::now(); + if (verifier_.verifyPacket(&object)) { + auto t1 = utils::SteadyTime::now(); + auto dt = utils::SteadyTime::getDurationUs(t0, t1); + LoggerInfo() << "Verification time: " << dt.count(); + LoggerInfo() << "<<< Signature Ok."; + } else { + LoggerErr() << "<<< Signature verification failed!"; + } + } + + if (auto it = send_timestamps_.find(interest.getName().getSuffix()); + it != send_timestamps_.end()) { + rtt = + utils::SteadyTime::getDurationUs(it->second, utils::SteadyTime::now()) + .count(); + send_timestamps_.erase(it); + } + + if (LoggerIsOn(2)) { + LoggerInfo() << "<<< recevied object. "; + LoggerInfo() << "<<< interest name: " << interest.getName().getPrefix() + << " (n_suffixes=" << config_->num_int_manifest_suffixes_ + << ")"; + LoggerInfo() << "<<< object name: " << object.getName() << " path label " + << object.getPathLabel() << " (" + << (object.getPathLabel() >> 24) << ")"; + } else if (LoggerIsOn(1)) { + LoggerInfo() << "<<< received object. "; + LoggerInfo() << "<<< round trip: " << rtt << " [us]"; + LoggerInfo() << "<<< interest name: " << interest.getName().getPrefix(); + + LoggerInfo() << "<<< object name: " << object.getName(); + LoggerInfo() << "<<< content object size: " + << object.payloadSize() + object.headerSize() << " [bytes]"; + } + + if (LoggerIsOn(3)) { + LoggerInfo() << "----- interest dump -----"; + interest.dump(); + LoggerInfo() << "-------------------------"; + LoggerInfo() << "----- object dump -------"; + object.dump(); + LoggerInfo() << "-------------------------"; + } + LoggerVerbose(1) << "\n"; + + received_++; + processed_++; + if (processed_ >= config_->maxPing_) { + afterSignal(); + } + } + + void onTimeout(Interest::Ptr &interest, const Name &name) override { + if (LoggerIsOn(2)) { + LoggerInfo() << "### timeout for " << name; + } else if (LoggerIsOn(1)) { + LoggerInfo() << "### timeout for " << name; + } + + if (LoggerIsOn(3)) { + LoggerInfo() << "----- interest dump -----"; + interest->dump(); + LoggerInfo() << "-------------------------"; + } + LoggerVerbose(1) << "\n"; + + timedout_++; + processed_++; + if (processed_ >= config_->maxPing_) afterSignal(); + } + + void onError(const std::error_code &ec) override { + LoggerErr() << "Aborting ping due to internal error: " << ec.message(); + afterSignal(); + } + + void checkFamily(hicn_packet_format_t format, int family) { + switch (HICN_PACKET_FORMAT_GET(format, 0)) { + case IPPROTO_IP: + if (family != AF_INET) throw std::runtime_error("Bad packet format"); + break; + case IPPROTO_IPV6: + if (family != AF_INET6) throw std::runtime_error("Bad packet format"); + break; + default: + throw std::runtime_error("Bad packet format"); + } + } + + void doPing() { + std::string name = traffic_generator_->getPrefix(); + uint32_t sequence_number = traffic_generator_->getSuffix(); + const Name interest_name(name, sequence_number); + + hicn_packet_format_t format = config_->packet_format_; + + switch (format) { + case HICN_PACKET_FORMAT_NEW: + /* Nothing to do */ + break; + case HICN_PACKET_FORMAT_IPV4_TCP: + case HICN_PACKET_FORMAT_IPV6_TCP: + checkFamily(format, interest_name.getAddressFamily()); + break; + default: + throw std::runtime_error("Bad packet format"); + } + + /* + * Eventually add the AH header if a signer is defined. Raise an error + * if format include the AH header but no signer is defined. + */ + if (HICN_PACKET_FORMAT_IS_AH(format)) { + if (!signer_) throw std::runtime_error("Bad packet format"); + } else { + if (signer_) format = Packet::toAHFormat(format); + } + + auto interest = core::PacketManager<>::getInstance().getPacket<Interest>( + format, signer_ ? signer_->getSignatureFieldSize() : 0); + interest->setName(interest_name); + + interest->setLifetime(uint32_t(config_->interestLifetime_)); + + if (LoggerIsOn(2)) { + LoggerInfo() << ">>> send interest " << interest->getName() + << " suffixes in manifest: " + << config_->num_int_manifest_suffixes_; + } else if (LoggerIsOn(1)) { + LoggerInfo() << ">>> send interest " << interest->getName(); + } + LoggerVerbose(1) << "\n"; + + send_timestamps_[sequence_number] = utils::SteadyTime::now(); + for (uint32_t i = 0; i < config_->num_int_manifest_suffixes_ && + !traffic_generator_->hasFinished(); + i++) { + uint32_t sequence_number = traffic_generator_->getSuffix(); + + interest->appendSuffix(sequence_number); + send_timestamps_[sequence_number] = utils::SteadyTime::now(); + } + + if (LoggerIsOn(3)) { + LoggerInfo() << "----- interest dump -----"; + interest->dump(); + LoggerInfo() << "-------------------------"; + } + + interest->encodeSuffixes(); + if (signer_) signer_->signPacket(interest.get()); + portal_.sendInterest(interest, interest->getLifetime()); + + if (!traffic_generator_->hasFinished()) { + this->timer_->expires_from_now( + std::chrono::microseconds(config_->pingInterval_)); + this->timer_->async_wait([this](const std::error_code e) { + if (!e) { + doPing(); + } + }); + } + } + + void afterSignal() { + LoggerInfo() << "Stopping ping..."; + LoggerInfo() << "Sent: " << traffic_generator_->getSentCount() + << " Received: " << received_ << " Timeouts: " << timedout_; + io_service_.stop(); + } + + void reset() { + timer_.reset(new asio::steady_timer(portal_.getThread().getIoService())); + traffic_generator_->reset(); + last_jump_ = 0; + processed_ = 0; + state_ = SYN_STATE; + received_ = 0; + timedout_ = 0; + } + + private: + SendTimeMap send_timestamps_; + asio::io_service io_service_; + interface::Portal portal_; + asio::signal_set signals_; + Configuration *config_; + std::unique_ptr<asio::steady_timer> timer_; + uint64_t last_jump_ = 0; + uint64_t processed_ = 0; + uint32_t state_ = SYN_STATE; + uint32_t received_ = 0; + uint32_t timedout_ = 0; + Verifier verifier_; + std::unique_ptr<auth::Signer> signer_; + std::unique_ptr<TrafficGenerator> traffic_generator_; +}; + +static std::unordered_map<std::string, hicn_packet_format_t> const + packet_format_map = {{"ipv4_tcp", HICN_PACKET_FORMAT_IPV4_TCP}, + {"ipv6_tcp", HICN_PACKET_FORMAT_IPV6_TCP}, + {"new", HICN_PACKET_FORMAT_NEW}}; + +std::string str_tolower(std::string s) { + std::transform(s.begin(), s.end(), s.begin(), + [](unsigned char c) { return std::tolower(c); }); + return s; +} + +void help() { + LoggerInfo() << "usage: hicn-consumer-ping [options]"; + LoggerInfo() << "PING options"; + LoggerInfo() << "-i <val> ping interval in microseconds (default " + "1000000ms)"; + LoggerInfo() + << "-m <val> maximum number of pings to send (default 10)"; + LoggerInfo() << "-a <val> <pass> set the passphrase and the number of " + "suffixes in interest manifest (default 0);"; + LoggerInfo() + << " e.g. '-m 6 -a -2' sends two interest (0 and " + "3) with 2 suffixes each (1,2 and 4,5 respectively)"; + LoggerInfo() << "HICN options"; + LoggerInfo() << "-n <val> hicn name (default b001::1)"; + LoggerInfo() + << "-l <val> interest lifetime in milliseconds (default " + "500ms)"; + LoggerInfo() << "OUTPUT options"; + LoggerInfo() << "-V verbose, prints statistics about the " + "messagges sent and received (default false)"; + LoggerInfo() << "-D dump, dumps sent and received packets " + "(default false)"; + LoggerInfo() << "-q quiet, not prints (default false)"; + LoggerInfo() + << "-z <io_module> IO module to use. Default: hicnlight_module"; + LoggerInfo() << "-F <conf_file> Path to optional configuration file for " + "libtransport"; + LoggerInfo() << "-b <type> Traffic generator type. Use 'RANDOM' for " + "random prefixes and suffixes. Default: sequential suffixes."; + LoggerInfo() + << "-w <packet_format> Packet format (without signature, defaults " + "to IPV6_TCP)"; + LoggerInfo() << "-H prints this message"; +} + +int start(int argc, char *argv[]) { +#ifdef _WIN32 + WSADATA wsaData = {0}; + WSAStartup(MAKEWORD(2, 2), &wsaData); +#endif + + transport::interface::global_config::GlobalConfigInterface global_conf; + + auto c = std::make_unique<Configuration>(); + int opt; + std::string producer_certificate = ""; + + std::string conf_file; + transport::interface::global_config::IoModuleConfiguration io_config; + io_config.name = "hicnlight_module"; + + while ((opt = getopt(argc, argv, "a:b:i:m:f:n:l:c:z:F:w:H")) != -1) { + switch (opt) { + case 'a': + c->num_int_manifest_suffixes_ = std::stoi(optarg); + c->passphrase_ = argv[optind]; + break; + case 'b': + c->traffic_generator_type_ = optarg; + break; + case 'i': + c->pingInterval_ = std::stoi(optarg); + break; + case 'm': + c->maxPing_ = std::stoi(optarg); + break; + case 'f': + c->first_suffix_ = uint32_t(std::stoul(optarg)); + break; + case 'n': + c->name_ = optarg; + break; + case 'l': + c->interestLifetime_ = std::stoi(optarg); + break; + case 'c': + c->certificate_ = std::string(optarg); + break; + case 'z': + io_config.name = optarg; + break; + case 'F': + conf_file = optarg; + break; + case 'w': { + std::string packet_format_s = std::string(optarg); + packet_format_s = str_tolower(packet_format_s); + auto it = packet_format_map.find(std::string(optarg)); + if (it == packet_format_map.end()) + throw std::runtime_error("Bad packet format"); + c->packet_format_ = it->second; + break; + } + default: + help(); + exit(EXIT_FAILURE); + } + } + + /** + * IO module configuration + */ + io_config.set(); + + /** + * Parse config file + */ + global_conf.parseConfigurationFile(conf_file); + + auto ping = std::make_unique<Client>(c.get()); + + auto t0 = std::chrono::steady_clock::now(); + ping->ping(); + auto t1 = std::chrono::steady_clock::now(); + + LoggerInfo() << "Elapsed time: " + << utils::SteadyTime::getDurationMs(t0, t1).count() << "ms"; + +#ifdef _WIN32 + WSACleanup(); +#endif + return 0; +} + +} // namespace ping + +} // namespace core + +} // namespace transport + +int main(int argc, char *argv[]) { + return transport::core::ping::start(argc, argv); +} diff --git a/apps/ping/src/ping_server.cc b/apps/ping/src/ping_server.cc new file mode 100644 index 000000000..900da18ca --- /dev/null +++ b/apps/ping/src/ping_server.cc @@ -0,0 +1,301 @@ +/* + * Copyright (c) 2021-2022 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/interfaces/socket_producer.h> +#ifndef _WIN32 +#include <hicn/transport/utils/daemonizator.h> +#include <unistd.h> +#else +#include <openssl/applink.c> +#endif + +#include <hicn/apps/utils/logger.h> +#include <hicn/transport/auth/signer.h> +#include <hicn/transport/auth/verifier.h> +#include <hicn/transport/core/content_object.h> +#include <hicn/transport/core/global_object_pool.h> +#include <hicn/transport/core/interest.h> +#include <hicn/transport/interfaces/global_conf_interface.h> +#include <hicn/transport/utils/string_tokenizer.h> + +#include <asio.hpp> + +namespace transport { + +namespace interface { + +using HashAlgorithm = core::HashAlgorithm; +using CryptoSuite = auth::CryptoSuite; + +class CallbackContainer { + private: + std::shared_ptr<ContentObject> createContentObject(const Name &name, + uint32_t lifetime, + const Interest &interest) { + auto content_object = + core::PacketManager<>::getInstance().getPacket<ContentObject>( + interest.getFormat(), + (sign_ && signer_) ? signer_->getSignatureFieldSize() : 0); + + content_object->setName(name); + content_object->setLifetime(lifetime); + content_object->setLocator(interest.getLocator()); + + if (LoggerIsOn(2)) { + LoggerInfo() << ">>> send object " << content_object->getName(); + } else if (LoggerIsOn(1)) { + LoggerInfo() << ">>> send object " << content_object->getName(); + } + + if (LoggerIsOn(3)) { + LoggerInfo() << "----- object dump -----"; + content_object->dump(); + LoggerInfo() << "-----------------------"; + } + + if (sign_ && signer_) signer_->signPacket(content_object.get()); + return content_object; + } + + public: + CallbackContainer([[maybe_unused]] const Name &prefix, uint32_t object_size, + auth::Signer *signer, bool sign, std::string passphrase, + [[maybe_unused]] uint32_t lifetime) + : buffer_(object_size, 'X'), signer_(signer), sign_(sign) { + // Verifier for interest manifests + if (!passphrase.empty()) + verifier_ = std::make_unique<auth::SymmetricVerifier>(passphrase); + } + + void processInterest(ProducerSocket &p, Interest &interest, + uint32_t lifetime) { + if (verifier_ && interest.hasManifest()) { + auto t0 = utils::SteadyTime::now(); + if (verifier_->verifyPacket(&interest)) { + auto t1 = utils::SteadyTime::now(); + auto dt = utils::SteadyTime::getDurationUs(t0, t1); + LoggerInfo() << "Verification time: " << dt.count(); + LoggerInfo() << "<<< Signature Ok."; + } else { + LoggerErr() << "<<< Signature verification failed!"; + } + } + + if (LoggerIsOn(2)) { + LoggerInfo() << "<<< received interest " << interest.getName() + << " suffixes in manifest: " << interest.numberOfSuffixes(); + } else if (LoggerIsOn(1)) { + LoggerInfo() << "<<< received interest " << interest.getName(); + } + + if (LoggerIsOn(3)) { + LoggerInfo() << "----- interest dump -----"; + interest.dump(); + LoggerInfo() << "-------------------------"; + } + + if (!interest.isValid()) throw std::runtime_error("Bad interest format"); + Name name = interest.getName(); + + if (!interest.hasManifest()) { // Single interest + auto content_object = createContentObject(name, lifetime, interest); + p.produce(*content_object); + } else { // Interest manifest + uint32_t _; + const uint32_t *suffix = NULL; + UNUSED(_); + + interest_manifest_foreach_suffix(interest.getIntManifestHeader(), suffix, + _) { + name.setSuffix(*suffix); + + auto content_object = createContentObject(name, lifetime, interest); + p.produce(*content_object); + } + } + + LoggerVerbose(1) << "\n"; + } + + private: + std::string buffer_; + auth::Signer *signer_; + bool sign_; + std::unique_ptr<auth::Verifier> verifier_; +}; + +void help() { + LoggerInfo() << "usage: hicn-preoducer-ping [options]"; + LoggerInfo() << "PING options"; + LoggerInfo() << "-s <val> object content size (default 1350B)"; + LoggerInfo() << "-n <val> hicn name (default b001::/64)"; + LoggerInfo() << "-l data lifetime"; + LoggerInfo() << "OUTPUT options"; + LoggerInfo() << "-V verbose, prints statistics about the " + "messagges sent " + " and received (default false)"; + LoggerInfo() << "-D dump, dumps sent and received packets " + "(default false)"; + LoggerInfo() << "-q quiet, not prints (default false)"; + LoggerInfo() + << "-z <io_module> IO module to use. Default: hicnlight_module"; + LoggerInfo() << "-F <conf_file> Path to optional configuration file for " + "libtransport"; +#ifndef _WIN32 + LoggerInfo() << "-d daemon mode"; +#endif + LoggerInfo() << "-H prints this message"; +} + +int ping_main(int argc, char **argv) { + transport::interface::global_config::GlobalConfigInterface global_conf; +#ifdef _WIN32 + WSADATA wsaData = {0}; + WSAStartup(MAKEWORD(2, 2), &wsaData); +#else + bool daemon = false; +#endif + std::string name_prefix = "b001::0/64"; + std::string delimiter = "/"; + uint32_t object_size = 1250; + std::string keystore_path = "./rsa_crypto_material.p12"; + std::string keystore_password = "cisco"; + std::string passphrase = ""; + bool sign = false; + uint32_t data_lifetime = default_values::content_object_expiry_time; + + std::string conf_file; + transport::interface::global_config::IoModuleConfiguration io_config; + io_config.name = "hicnlight_module"; + + int opt; +#ifndef _WIN32 + while ((opt = getopt(argc, argv, "a:s:n:t:l:frdHk:p:z:F:")) != -1) { +#else + while ((opt = getopt(argc, argv, "s:n:t:l:frHk:p:z:F:")) != -1) { +#endif + switch (opt) { + case 'a': + passphrase = optarg; + break; + case 's': + object_size = std::stoi(optarg); + break; + case 'n': + name_prefix = optarg; + break; + case 'l': + data_lifetime = std::stoi(optarg); + break; +#ifndef _WIN32 + case 'd': + daemon = true; + break; +#endif + case 'k': + keystore_path = optarg; + sign = true; + break; + case 'p': + keystore_password = optarg; + break; + case 'z': + io_config.name = optarg; + break; + case 'F': + conf_file = optarg; + break; + default: + help(); + exit(EXIT_FAILURE); + } + } + +#ifndef _WIN32 + if (daemon) { + utils::Daemonizator::daemonize(); + } +#endif + + /** + * IO module configuration + */ + io_config.set(); + + /** + * Parse config file + */ + global_conf.parseConfigurationFile(conf_file); + + core::Prefix producer_namespace(name_prefix); + + utils::StringTokenizer tokenizer(name_prefix, delimiter); + std::string ip_address = tokenizer.nextToken(); + Name n(ip_address); + + if (object_size > 1350) object_size = 1350; + + CallbackContainer *stubs; + std::unique_ptr<auth::Signer> signer; + + if (sign) { + signer = std::make_unique<auth::AsymmetricSigner>(keystore_path, + keystore_password); + stubs = new CallbackContainer(n, object_size, signer.get(), sign, + passphrase, data_lifetime); + } else { + auth::Signer *signer = nullptr; + stubs = new CallbackContainer(n, object_size, signer, sign, passphrase, + data_lifetime); + } + + ProducerSocket p; + p.registerPrefix(producer_namespace); + + p.setSocketOption(GeneralTransportOptions::MANIFEST_MAX_CAPACITY, 0U); + p.setSocketOption(GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 0U); + p.setSocketOption( + ProducerCallbacksOptions::CACHE_MISS, + (ProducerInterestCallback)bind(&CallbackContainer::processInterest, stubs, + std::placeholders::_1, + std::placeholders::_2, data_lifetime)); + + p.connect(); + p.start(); + + asio::io_service io_service; + asio::signal_set signal_set(io_service, SIGINT); + signal_set.async_wait( + [&p, &io_service](const std::error_code &, const int &) { + LoggerInfo() << "STOPPING!!"; + p.stop(); + io_service.stop(); + }); + + io_service.run(); + +#ifdef _WIN32 + WSACleanup(); +#endif + return 0; +} + +} // namespace interface + +} // end namespace transport + +int main(int argc, char **argv) { + return transport::interface::ping_main(argc, argv); +} |