aboutsummaryrefslogtreecommitdiffstats
path: root/apps
diff options
context:
space:
mode:
Diffstat (limited to 'apps')
-rw-r--r--apps/.clang-format2
-rw-r--r--apps/CMakeLists.txt99
-rw-r--r--apps/cmake/packaging.cmake (renamed from apps/cmake/Modules/Packaging.cmake)25
-rw-r--r--apps/common-includes/hicn/apps/utils/logger.h46
-rw-r--r--apps/higet/CMakeLists.txt49
-rw-r--r--apps/higet/higet.cc96
-rw-r--r--apps/hiperf/CMakeLists.txt45
-rw-r--r--apps/hiperf/src/client.cc1448
-rw-r--r--apps/hiperf/src/client.h14
-rw-r--r--apps/hiperf/src/common.h315
-rw-r--r--apps/hiperf/src/forwarder_config.h97
-rw-r--r--apps/hiperf/src/forwarder_interface.cc676
-rw-r--r--apps/hiperf/src/forwarder_interface.h131
-rw-r--r--apps/hiperf/src/main.cc435
-rw-r--r--apps/hiperf/src/server.cc802
-rw-r--r--apps/hiperf/src/server.h5
-rw-r--r--apps/http-proxy/CMakeLists.txt73
-rw-r--r--apps/http-proxy/includes/hicn/http-proxy/CMakeLists.txt5
-rw-r--r--apps/http-proxy/includes/hicn/http-proxy/forwarder_config.h27
-rw-r--r--apps/http-proxy/includes/hicn/http-proxy/forwarder_interface.h36
-rw-r--r--apps/http-proxy/includes/hicn/http-proxy/http_1x_message_fast_parser.h2
-rw-r--r--apps/http-proxy/includes/hicn/http-proxy/http_proxy.h70
-rw-r--r--apps/http-proxy/includes/hicn/http-proxy/http_session.h42
-rw-r--r--apps/http-proxy/includes/hicn/http-proxy/icn_receiver.h17
-rw-r--r--apps/http-proxy/includes/hicn/http-proxy/utils.h11
-rw-r--r--apps/http-proxy/main.cc64
-rw-r--r--apps/http-proxy/src/forwarder_interface.cc48
-rw-r--r--apps/http-proxy/src/http_1x_message_fast_parser.cc7
-rw-r--r--apps/http-proxy/src/http_proxy.cc97
-rw-r--r--apps/http-proxy/src/http_session.cc74
-rw-r--r--apps/http-proxy/src/icn_receiver.cc48
-rw-r--r--apps/ping/.clang-format2
-rw-r--r--apps/ping/CMakeLists.txt39
-rw-r--r--apps/ping/src/ping_client.cc538
-rw-r--r--apps/ping/src/ping_server.cc311
35 files changed, 2664 insertions, 3132 deletions
diff --git a/apps/.clang-format b/apps/.clang-format
index cd21e2017..adc73c6fd 100644
--- a/apps/.clang-format
+++ b/apps/.clang-format
@@ -1,4 +1,4 @@
-# Copyright (c) 2017-2021 Cisco and/or its affiliates.
+# Copyright (c) 2021 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
diff --git a/apps/CMakeLists.txt b/apps/CMakeLists.txt
index df1b9fc7c..b58e18f03 100644
--- a/apps/CMakeLists.txt
+++ b/apps/CMakeLists.txt
@@ -1,4 +1,4 @@
-# Copyright (c) 2017-2019 Cisco and/or its affiliates.
+# Copyright (c) 2021-2022 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
@@ -11,37 +11,67 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+##############################################################
+# Project and cmake version
+##############################################################
cmake_minimum_required(VERSION 3.10 FATAL_ERROR)
-set(CMAKE_CXX_STANDARD 14)
-
project(apps)
+
+##############################################################
+# C Standard
+##############################################################
+set(CMAKE_CXX_STANDARD 17)
+
+
+##############################################################
+# Cmake modules
+##############################################################
+include("${CMAKE_CURRENT_SOURCE_DIR}/../versions.cmake")
set(CMAKE_MODULE_PATH
${CMAKE_MODULE_PATH}
- "${CMAKE_CURRENT_SOURCE_DIR}/../cmake/Modules"
- "${CMAKE_CURRENT_SOURCE_DIR}/cmake/Modules"
+ ${CMAKE_CURRENT_SOURCE_DIR}/../cmake/Modules
)
-if (NOT CMAKE_BUILD_TYPE)
- message(STATUS "${PROJECT_NAME}: No build type selected, default to Release")
- set(CMAKE_BUILD_TYPE "Release")
-endif ()
-
-include(BuildMacros)
-include(WindowsMacros)
+##############################################################
+# Libs and Bins names
+##############################################################
set(HICN_APPS hicn-apps CACHE INTERNAL "" FORCE)
+set(HIGET higet)
+set(HTTP_PROXY hicn-http-proxy)
+set(LIBHTTP_PROXY hicnhttpproxy)
+set(LIBHTTP_PROXY_STATIC ${LIBHTTP_PROXY}.static)
+
+##############################################################
+# Dependencies and third party libs
+##############################################################
find_package(Threads REQUIRED)
-find_package(Libconfig++ REQUIRED)
+find_package(Libconfig++ ${LIBCONFIG_DEFAULT_VERSION} REQUIRED)
+##############################################################
+# Check if building as subproject or as root project
+##############################################################
if(CMAKE_SOURCE_DIR STREQUAL PROJECT_SOURCE_DIR)
- find_package(Libtransport REQUIRED)
- find_package(Libhicn REQUIRED)
- find_package(hicnctrl REQUIRED)
+ include(CommonSetup)
+
+ find_package(Libhicn ${CURRENT_VERSION} REQUIRED NO_MODULE)
+ find_package(Libhicnctrl ${CURRENT_VERSION} REQUIRED NO_MODULE)
+ find_package(Libhicntransport ${CURRENT_VERSION} REQUIRED NO_MODULE)
+
+ if (DISABLE_SHARED_LIBRARIES)
+ set(LIBTYPE static)
+ else()
+ set(LIBTYPE shared)
+ endif()
+
+ list(APPEND LIBHICN_LIBRARIES hicn::hicn.${LIBTYPE})
+ list(APPEND LIBTRANSPORT_LIBRARIES hicn::hicntransport.${LIBTYPE})
+ list(APPEND LIBHICNCTRL_LIBRARIES hicn::hicnctrl.${LIBTYPE})
else()
if (DISABLE_SHARED_LIBRARIES)
- find_package(OpenSSL REQUIRED)
+ find_package(OpenSSL ${OPENSSL_DEFAULT_VERSION} REQUIRED)
set(LIBTRANSPORT_LIBRARIES ${LIBTRANSPORT_STATIC})
set(LIBHICN_LIBRARIES ${LIBHICN_STATIC})
set(LIBHICNCTRL_LIBRARIES ${LIBHICNCTRL_STATIC})
@@ -54,22 +84,33 @@ else()
list(APPEND DEPENDENCIES
${LIBTRANSPORT_LIBRARIES}
)
-endif()
-if (WIN32)
- set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /wd4200 /wd4996")
-endif ()
+ # glog
+ list(APPEND THIRD_PARTY_INCLUDE_DIRS
+ ${glog_BINARY_DIR}
+ ${glog_SOURCE_DIR}/src
+ )
+ list(APPEND THIRD_PARTY_DEPENDENCIES
+ glog
+ )
-include(Packaging)
+ set(COMMON_INCLUDE_DIRS
+ ${CMAKE_CURRENT_SOURCE_DIR}/common-includes
+ )
+endif()
-add_subdirectory(ping)
-add_subdirectory(hiperf)
-set(HIGET higet)
-set(HTTP_PROXY hicn-http-proxy)
+##############################################################
+# Packaging and versioning
+##############################################################
+include(${CMAKE_CURRENT_SOURCE_DIR}/../versions.cmake)
+include(${CMAKE_CURRENT_SOURCE_DIR}/cmake/packaging.cmake)
-if (NOT WIN32)
- add_subdirectory(http-proxy)
-endif ()
+##############################################################
+# Subdirectories
+##############################################################
+add_subdirectory(ping)
+add_subdirectory(hiperf)
+add_subdirectory(http-proxy)
add_subdirectory(higet)
diff --git a/apps/cmake/Modules/Packaging.cmake b/apps/cmake/packaging.cmake
index 981eb728e..9142598ad 100644
--- a/apps/cmake/Modules/Packaging.cmake
+++ b/apps/cmake/packaging.cmake
@@ -1,4 +1,4 @@
-# Copyright (c) 2017-2019 Cisco and/or its affiliates.
+# Copyright (c) 2021-2022 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
@@ -18,21 +18,34 @@ useful for testing and debugging within a hicn network."
)
set(${HICN_APPS}_DEB_DEPENDENCIES
- "lib${LIBTRANSPORT} (>= stable_version), lib${LIBHICNCTRL} (>= stable_version)"
+ "lib${LIBTRANSPORT} (= stable_version), lib${LIBHICNCTRL} (= stable_version)"
CACHE STRING "Dependencies for deb/rpm package."
)
set(${HICN_APPS}-dev_DEB_DEPENDENCIES
- "${HICN_APPS} (>= stable_version), lib${LIBTRANSPORT}-dev (>= stable_version), lib${LIBHICNCTRL}-dev (>= stable_version)"
+ "${HICN_APPS} (= stable_version), lib${LIBTRANSPORT}-dev (= stable_version), lib${LIBHICNCTRL}-dev (= stable_version)"
CACHE STRING "Dependencies for deb/rpm package."
)
set(${HICN_APPS}_RPM_DEPENDENCIES
- "lib${LIBTRANSPORT} >= stable_version, lib${LIBHICNCTRL} >= stable_version"
+ "lib${LIBTRANSPORT} = stable_version, lib${LIBHICNCTRL} = stable_version"
CACHE STRING "Dependencies for deb/rpm package."
)
set(${HICN_APPS}-dev_RPM_DEPENDENCIES
- "${HICN_APPS} >= stable_version, lib${LIBTRANSPORT}-dev >= stable_version, lib${LIBHICNCTRL}-dev >= stable_version"
+ "${HICN_APPS} = stable_version, lib${LIBTRANSPORT}-dev = stable_version, lib${LIBHICNCTRL}-dev = stable_version"
CACHE STRING "Dependencies for deb/rpm package."
-) \ No newline at end of file
+)
+
+if (INTERNAL_ENVIRONMENT)
+ include(CheckSsl)
+ CheckSsl()
+ set(${HICN_APPS}_DEB_DEPENDENCIES
+ "${${HICN_APPS}_DEB_DEPENDENCIES}, ${OPENSSL_DEPENDENCY}"
+ CACHE STRING "Dependencies for deb/rpm package."
+ )
+ set(${HICN_APPS}-dev_DEB_DEPENDENCIES
+ "${${HICN_APPS}-dev_DEB_DEPENDENCIES}, ${OPENSSL_DEPENDENCY_DEV}"
+ CACHE STRING "Dependencies for deb/rpm package."
+ )
+endif () \ No newline at end of file
diff --git a/apps/common-includes/hicn/apps/utils/logger.h b/apps/common-includes/hicn/apps/utils/logger.h
new file mode 100644
index 000000000..d2af988d9
--- /dev/null
+++ b/apps/common-includes/hicn/apps/utils/logger.h
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2022 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <glog/logging.h>
+
+#include <iostream>
+
+#define LoggerInfo() LOG(INFO)
+#define LoggerWarn() LOG(WARNING)
+#define LoggerErr() LOG(ERROR)
+#define LoggerFatal() LOG(FATAL)
+#define LoggerVerbose(level) VLOG((level))
+#define LoggerIsOn(level) VLOG_IS_ON((level))
+
+struct HicnLogger {
+ HicnLogger() {
+ // Set log level
+ const char *log_level = std::getenv("LOG_LEVEL");
+ if (log_level != nullptr) FLAGS_v = std::stol(std::string(log_level));
+
+ // Enable/disable prefix
+ const char *enable_log_prefix = std::getenv("ENABLE_LOG_PREFIX");
+ if (enable_log_prefix != nullptr &&
+ std::string(enable_log_prefix) == "OFF") {
+ FLAGS_log_prefix = false;
+ }
+
+ FLAGS_colorlogtostderr = true;
+ }
+};
+
+static HicnLogger logger; \ No newline at end of file
diff --git a/apps/higet/CMakeLists.txt b/apps/higet/CMakeLists.txt
index 8d112d3a3..791191872 100644
--- a/apps/higet/CMakeLists.txt
+++ b/apps/higet/CMakeLists.txt
@@ -1,4 +1,4 @@
-# Copyright (c) 2017-2019 Cisco and/or its affiliates.
+# Copyright (c) 2021-2022 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
@@ -11,43 +11,50 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-cmake_minimum_required(VERSION 3.10 FATAL_ERROR)
-set(CMAKE_CXX_STANDARD 14)
-
-project(utils)
-
-find_package(Threads REQUIRED)
-
-set(CMAKE_MODULE_PATH
- ${CMAKE_MODULE_PATH}
- "${CMAKE_CURRENT_SOURCE_DIR}/../cmake/Modules"
- "${CMAKE_CURRENT_SOURCE_DIR}/cmake/Modules"
-)
-
-if (NOT CMAKE_BUILD_TYPE)
- message(STATUS "${PROJECT_NAME}: No build type selected, default to Release")
- set(CMAKE_BUILD_TYPE "Release")
-endif ()
-
+##############################################################
+# Source files
+##############################################################
list(APPEND APPS_SRC
higet.cc
)
+
+##############################################################
+# Linker flags
+##############################################################
if (WIN32)
- set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} /NODEFAULTLIB:\"LIBCMT\"" )
+ set(CMAKE_EXE_LINKER_FLAGS
+ "${CMAKE_EXE_LINKER_FLAGS} /NODEFAULTLIB:\"LIBCMT\""
+ )
endif()
+
+##############################################################
+# Compiler options
+##############################################################
+set(COMPILER_OPTIONS
+ ${DEFAULT_COMPILER_OPTIONS}
+)
+
+
+##############################################################
+# Build higet
+##############################################################
if (NOT DISABLE_EXECUTABLES)
build_executable(${HIGET}
SOURCES ${APPS_SRC}
LINK_LIBRARIES
+ ${LIBHICN_LIBRARIES}
${LIBTRANSPORT_LIBRARIES}
${CMAKE_THREAD_LIBS_INIT}
${WSOCK32_LIBRARY}
${WS2_32_LIBRARY}
- DEPENDS ${LIBTRANSPORT_LIBRARIES}
+ INCLUDE_DIRS
+ PRIVATE ${THIRD_PARTY_INCLUDE_DIRS} ${COMMON_INCLUDE_DIRS}
+ DEPENDS ${LIBTRANSPORT_LIBRARIES} ${THIRD_PARTY_DEPENDENCIES}
COMPONENT ${HICN_APPS}
DEFINITIONS ${COMPILER_DEFINITIONS}
LINK_FLAGS ${LINK_FLAGS}
+ COMPILE_OPTIONS ${COMPILER_OPTIONS}
)
endif ()
diff --git a/apps/higet/higet.cc b/apps/higet/higet.cc
index 9ae869731..77cbb52d2 100644
--- a/apps/higet/higet.cc
+++ b/apps/higet/higet.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Copyright (c) 2021-2022 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -13,26 +13,20 @@
* limitations under the License.
*/
+#include <hicn/apps/utils/logger.h>
#include <hicn/transport/http/client_connection.h>
+#include <hicn/transport/utils/chrono_typedefs.h>
#include <algorithm>
+#include <asio.hpp>
#include <fstream>
#include <functional>
#include <map>
+#ifndef ASIO_STANDALONE
#define ASIO_STANDALONE
+#endif
#include <asio.hpp>
-#undef ASIO_STANDALONE
-
-#include <thread>
-
-typedef std::chrono::time_point<std::chrono::system_clock> Time;
-typedef std::chrono::milliseconds TimeDuration;
-
-Time t1;
-
-#define DEFAULT_BETA 0.99
-#define DEFAULT_GAMMA 0.07
namespace http {
@@ -48,16 +42,13 @@ class ReadBytesCallbackImplementation
static std::string chunk_separator;
public:
- ReadBytesCallbackImplementation(std::string file_name, long yet_downloaded)
+ ReadBytesCallbackImplementation(const std::string &file_name,
+ long yet_downloaded)
: file_name_(file_name),
temp_file_name_(file_name_ + ".temp"),
yet_downloaded_(yet_downloaded),
byte_downloaded_(yet_downloaded),
- chunked_(false),
- chunk_size_(0),
- work_(std::make_unique<asio::io_service::work>(io_service_)),
- thread_(
- std::make_unique<std::thread>([this]() { io_service_.run(); })) {
+ work_(std::make_unique<asio::io_service::work>(io_service_)) {
std::streambuf *buf;
if (file_name_ != "-") {
of_.open(temp_file_name_, std::ofstream::binary | std::ofstream::app);
@@ -69,12 +60,6 @@ class ReadBytesCallbackImplementation
out_ = new std::ostream(buf);
}
- ~ReadBytesCallbackImplementation() {
- if (thread_->joinable()) {
- thread_->join();
- }
- }
-
void onBytesReceived(std::unique_ptr<utils::MemBuf> &&buffer) {
auto buffer_ptr = buffer.release();
io_service_.post([this, buffer_ptr]() {
@@ -102,7 +87,7 @@ class ReadBytesCallbackImplementation
if (chunked_) {
if (chunk_size_ > 0) {
- out_->write((char *)payload->data(), chunk_size_);
+ out_->write((char *)payload->writableData(), chunk_size_);
payload->trimStart(chunk_size_);
if (payload->length() >= chunk_separator.size()) {
@@ -130,7 +115,7 @@ class ReadBytesCallbackImplementation
chunk_size_ -= payload->length();
}
- out_->write((char *)payload->data(), to_write);
+ out_->write((char *)payload->writableData(), to_write);
byte_downloaded_ += (long)to_write;
payload->trimStart(to_write);
@@ -140,7 +125,7 @@ class ReadBytesCallbackImplementation
}
}
} else {
- out_->write((char *)payload->data(), payload->length());
+ out_->write((char *)payload->writableData(), payload->length());
byte_downloaded_ += (long)payload->length();
}
@@ -175,13 +160,13 @@ class ReadBytesCallbackImplementation
}
print_bar(100, 100, true);
- std::cout << "\nDownloaded " << bytes << " bytes" << std::endl;
+ LoggerInfo() << "\nDownloaded " << bytes << " bytes";
}
work_.reset();
});
}
- void onError(const std::error_code ec) {
+ void onError(const std::error_code &ec) {
io_service_.post([this]() {
of_.close();
delete out_;
@@ -219,15 +204,13 @@ class ReadBytesCallbackImplementation
}
}
if (last) {
- std::cout << "] " << int(progress * 100.0) << " %" << std::endl
- << std::endl;
+ std::cout << "] " << int(progress * 100.0) << " %";
} else {
std::cout << "] " << int(progress * 100.0) << " %\r";
std::cout.flush();
}
}
- private:
std::string file_name_;
std::string temp_file_name_;
std::ostream *out_;
@@ -236,41 +219,36 @@ class ReadBytesCallbackImplementation
long content_size_;
bool first_chunk_read_ = false;
long byte_downloaded_ = 0;
- bool chunked_;
- std::size_t chunk_size_;
+ bool chunked_ = false;
+ std::size_t chunk_size_ = 0;
asio::io_service io_service_;
std::unique_ptr<asio::io_service::work> work_;
- std::unique_ptr<std::thread> thread_;
};
std::string ReadBytesCallbackImplementation::chunk_separator = "\r\n";
-long checkFileStatus(std::string file_name) {
+long checkFileStatus(const std::string &file_name) {
struct stat stat_buf;
std::string temp_file_name_ = file_name + ".temp";
int rc = stat(temp_file_name_.c_str(), &stat_buf);
return rc == 0 ? stat_buf.st_size : -1;
}
-void usage(char *program_name) {
- std::cerr << "usage:" << std::endl;
- std::cerr << program_name << " [option]... [url]..." << std::endl;
- std::cerr << program_name << " options:" << std::endl;
- std::cerr
- << "-O <out_put_path> = write documents to <out_put_file>"
- << std::endl;
- std::cerr << "-S = print server response"
- << std::endl;
- std::cerr << "-P = first word of the ipv6 name of "
- "the response"
- << std::endl;
- std::cerr << "example:" << std::endl;
- std::cerr << "\t" << program_name << " -O - http://origin/index.html"
- << std::endl;
- exit(EXIT_FAILURE);
+void usage(const char *program_name) {
+ LoggerInfo() << "usage:";
+ LoggerInfo() << program_name << " [option]... [url]...";
+ LoggerInfo() << program_name << " options:";
+ LoggerInfo()
+ << "-O <out_put_path> = write documents to <out_put_file>";
+ LoggerInfo() << "-S = print server response";
+ LoggerInfo()
+ << "-P = first word of the ipv6 name of "
+ "the response";
+ LoggerInfo() << "example:";
+ LoggerInfo() << "\t" << program_name << " -O - http://origin/index.html";
}
-int main(int argc, char **argv) {
+int http_main(int argc, char **argv) {
#ifdef _WIN32
WSADATA wsaData = {0};
WSAStartup(MAKEWORD(2, 2), &wsaData);
@@ -299,20 +277,20 @@ int main(int argc, char **argv) {
case 'P':
conf.ipv6_first_word = optarg;
break;
- case 'h':
default:
usage(argv[0]);
- break;
+ exit(EXIT_FAILURE);
}
}
if (!argv[optind]) {
usage(argv[0]);
+ exit(EXIT_FAILURE);
}
name = argv[optind];
- std::cerr << "Using name " << name << " and name first word "
- << conf.ipv6_first_word << std::endl;
+ LoggerInfo() << "Using name " << name << " and name first word "
+ << conf.ipv6_first_word;
if (conf.file_name.empty()) {
conf.file_name = name.substr(1 + name.find_last_of("/"));
@@ -345,8 +323,6 @@ int main(int argc, char **argv) {
connection.setVerifier(verifier);
}
- t1 = std::chrono::system_clock::now();
-
http::ReadBytesCallbackImplementation readBytesCallback(conf.file_name,
yetDownloaded);
@@ -362,4 +338,4 @@ int main(int argc, char **argv) {
} // end namespace http
-int main(int argc, char **argv) { return http::main(argc, argv); }
+int main(int argc, char **argv) { return http::http_main(argc, argv); }
diff --git a/apps/hiperf/CMakeLists.txt b/apps/hiperf/CMakeLists.txt
index 564525e67..5a0dc3c06 100644
--- a/apps/hiperf/CMakeLists.txt
+++ b/apps/hiperf/CMakeLists.txt
@@ -1,4 +1,4 @@
-# Copyright (c) 2017-2019 Cisco and/or its affiliates.
+# Copyright (c) 2021-2022 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
@@ -12,33 +12,50 @@
# limitations under the License.
if (NOT DISABLE_EXECUTABLES)
+##############################################################
+# Source files
+##############################################################
list(APPEND HIPERF_SRC
${CMAKE_CURRENT_SOURCE_DIR}/src/client.cc
${CMAKE_CURRENT_SOURCE_DIR}/src/main.cc
${CMAKE_CURRENT_SOURCE_DIR}/src/server.cc
- ${CMAKE_CURRENT_SOURCE_DIR}/src/forwarder_interface.cc
)
+
+##############################################################
+# Libraries
+##############################################################
list (APPEND HIPERF_LIBRARIES
- ${LIBTRANSPORT_LIBRARIES}
- ${LIBHICNCTRL_LIBRARIES}
- ${LIBHICN_LIBRARIES}
- ${CMAKE_THREAD_LIBS_INIT}
- ${LIBCONFIG_CPP_LIBRARIES}
- ${WSOCK32_LIBRARY}
- ${WS2_32_LIBRARY}
+ PRIVATE ${LIBTRANSPORT_LIBRARIES}
+ PRIVATE ${LIBHICNCTRL_LIBRARIES}
+ PRIVATE ${LIBHICN_LIBRARIES}
+ PRIVATE ${CMAKE_THREAD_LIBS_INIT}
+ PRIVATE ${LIBCONFIG_CPP_LIBRARIES}
+ PRIVATE ${WSOCK32_LIBRARY}
+ PRIVATE ${WS2_32_LIBRARY}
+ )
+
+##############################################################
+# Compiler options
+##############################################################
+ set(COMPILER_OPTIONS
+ ${DEFAULT_COMPILER_OPTIONS}
)
+
+##############################################################
+# Build hiperf
+##############################################################
build_executable(hiperf
SOURCES ${HIPERF_SRC}
LINK_LIBRARIES ${HIPERF_LIBRARIES}
INCLUDE_DIRS
- ${CMAKE_CURRENT_SOURCE_DIR}/src
- ${LIBTRANSPORT_INCLUDE_DIRS}
- ${LIBHICNCTRL_INCLUDE_DIRS}
- ${LIBCONFIG_CPP_INCLUDE_DIRS}
- DEPENDS ${DEPENDENCIES}
+ PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/src
+ PRIVATE ${LIBCONFIG_CPP_INCLUDE_DIRS}
+ PRIVATE ${THIRD_PARTY_INCLUDE_DIRS} ${COMMON_INCLUDE_DIRS}
+ DEPENDS ${DEPENDENCIES} ${THIRD_PARTY_DEPENDENCIES}
COMPONENT ${HICN_APPS}
LINK_FLAGS ${LINK_FLAGS}
+ COMPILE_OPTIONS ${COMPILER_OPTIONS}
)
endif() \ No newline at end of file
diff --git a/apps/hiperf/src/client.cc b/apps/hiperf/src/client.cc
index 820ebf0ce..1ce5b4c55 100644
--- a/apps/hiperf/src/client.cc
+++ b/apps/hiperf/src/client.cc
@@ -14,8 +14,7 @@
*/
#include <client.h>
-#include <forwarder_config.h>
-#include <forwarder_interface.h>
+#include <hicn/transport/portability/endianess.h>
#include <libconfig.h++>
@@ -27,874 +26,857 @@ namespace hiperf {
class RTCCallback;
class Callback;
+using transport::auth::CryptoHashType;
+using transport::core::Packet;
+using transport::core::Prefix;
+using transport::interface::ConsumerCallbacksOptions;
+using transport::interface::ConsumerSocket;
+using transport::interface::GeneralTransportOptions;
+using transport::interface::ProducerSocket;
+using transport::interface::ProductionProtocolAlgorithms;
+using transport::interface::RaaqmTransportOptions;
+using transport::interface::RtcTransportOptions;
+using transport::interface::RtcTransportRecoveryStrategies;
+using transport::interface::StrategyCallback;
+using transport::interface::TransportStatistics;
+
/**
* Hiperf client class: configure and setup an hicn consumer following the
* ClientConfiguration.
*/
-class HIperfClient::Impl
-#ifdef FORWARDER_INTERFACE
- : ForwarderInterface::ICallback
-#endif
-{
- typedef std::chrono::time_point<std::chrono::steady_clock> Time;
- typedef std::chrono::microseconds TimeDuration;
-
+class HIperfClient::Impl {
friend class Callback;
friend class RTCCallback;
- struct nack_packet_t {
- uint64_t timestamp;
- uint32_t prod_rate;
- uint32_t prod_seg;
-
- inline uint64_t getTimestamp() const { return _ntohll(&timestamp); }
- inline void setTimestamp(uint64_t time) { timestamp = _htonll(&time); }
-
- inline uint32_t getProductionRate() const { return ntohl(prod_rate); }
- inline void setProductionRate(uint32_t rate) { prod_rate = htonl(rate); }
-
- inline uint32_t getProductionSegement() const { return ntohl(prod_seg); }
- inline void setProductionSegement(uint32_t seg) { prod_seg = htonl(seg); }
- };
-
- public:
- Impl(const hiperf::ClientConfiguration &conf)
- : configuration_(conf),
- total_duration_milliseconds_(0),
- old_bytes_value_(0),
- old_interest_tx_value_(0),
- old_fec_interest_tx_value_(0),
- old_fec_data_rx_value_(0),
- old_lost_data_value_(0),
- old_bytes_recovered_value_(0),
- old_definitely_lost_data_value_(0),
- old_retx_value_(0),
- old_sent_int_value_(0),
- old_received_nacks_value_(0),
- old_fec_pkt_(0),
- avg_data_delay_(0),
- delay_sample_(0),
- received_bytes_(0),
- received_data_pkt_(0),
- data_delays_(""),
- signals_(io_service_),
- rtc_callback_(*this),
- callback_(*this),
- socket_(io_service_),
- done_(false),
- switch_threshold_(~0)
-#ifdef FORWARDER_INTERFACE
- ,
- forwarder_interface_(io_service_, this)
-#endif
- {
+ static inline constexpr uint16_t klog2_header_counter() { return 4; }
+ static inline constexpr uint16_t kheader_counter_mask() {
+ return (1 << klog2_header_counter()) - 1;
}
- ~Impl() {}
+ class ConsumerContext
+ : public Base<ConsumerContext, ClientConfiguration, Impl>,
+ private ConsumerSocket::ReadCallback {
+ static inline const std::size_t kmtu = HIPERF_MTU;
- void checkReceivedRtcContent(ConsumerSocket &c,
- const ContentObject &contentObject) {}
+ public:
+ using ConfType = ClientConfiguration;
+ using ParentType = typename HIperfClient::Impl;
+ static inline auto getContextType() -> std::string {
+ return "ConsumerContext";
+ }
+
+ ConsumerContext(Impl &client, int consumer_identifier)
+ : Base(client, client.io_service_, consumer_identifier),
+ receive_buffer_(
+ utils::MemBuf::create(client.config_.receive_buffer_size_)),
+ socket_(client.io_service_),
+ payload_size_max_(PayloadSize(client.config_.packet_format_)
+ .getPayloadSizeMax(RTC_HEADER_SIZE)),
+ nb_iterations_(client.config_.nb_iterations_) {}
+
+ ConsumerContext(ConsumerContext &&other) noexcept
+ : Base(std::move(other)),
+ receive_buffer_(std::move(other.receive_buffer_)),
+ socket_(std::move(other.socket_)),
+ payload_size_max_(other.payload_size_max_),
+ remote_(std::move(other.remote_)),
+ nb_iterations_(other.nb_iterations_),
+ saved_stats_(std::move(other.saved_stats_)),
+ header_counter_(other.header_counter_),
+ first_(other.first_),
+ consumer_socket_(std::move(other.consumer_socket_)),
+ producer_socket_(std::move(other.producer_socket_)) {}
+
+ ~ConsumerContext() override = default;
+
+ /***************************************************************
+ * ConsumerSocket::ReadCallback implementation
+ ***************************************************************/
- void processLeavingInterest(ConsumerSocket &c, const Interest &interest) {}
+ bool isBufferMovable() noexcept override { return false; }
- void addFace(const std::string &local_address, uint16_t local_port,
- const std::string &remote_address, uint16_t remote_port,
- std::string interface);
+ void getReadBuffer(uint8_t **application_buffer,
+ size_t *max_length) override {
+ *application_buffer = receive_buffer_->writableData();
- void handleTimerExpiration(ConsumerSocket &c,
- const TransportStatistics &stats) {
- const char separator = ' ';
- const int width = 15;
+ if (configuration_.rtc_) {
+ *max_length = kmtu;
+ } else {
+ *max_length = configuration_.receive_buffer_size_;
+ }
+ }
- utils::TimePoint t2 = utils::SteadyClock::now();
- auto exact_duration =
- std::chrono::duration_cast<utils::Milliseconds>(t2 - t_stats_);
+ void readBufferAvailable(
+ std::unique_ptr<utils::MemBuf> &&buffer) noexcept override {
+ // Nothing to do here
+ auto ret = std::move(buffer);
+ }
- std::stringstream interval;
- interval << total_duration_milliseconds_ / 1000 << "-"
- << total_duration_milliseconds_ / 1000 +
- exact_duration.count() / 1000;
+ void readDataAvailable(std::size_t length) noexcept override {
+ if (configuration_.rtc_) {
+ saved_stats_.received_bytes_ += length;
+ saved_stats_.received_data_pkt_++;
- std::stringstream bytes_transferred;
- bytes_transferred << std::fixed << std::setprecision(3)
- << (stats.getBytesRecv() - old_bytes_value_) / 1000000.0
- << std::setfill(separator) << "[MB]";
+ // collecting delay stats. Just for performance testing
+ auto senderTimeStamp =
+ *reinterpret_cast<uint64_t *>(receive_buffer_->writableData());
- std::stringstream bandwidth;
- bandwidth << ((stats.getBytesRecv() - old_bytes_value_) * 8) /
- (exact_duration.count()) / 1000.0
- << std::setfill(separator) << "[Mbps]";
+ auto now = utils::SystemTime::nowMs().count();
+ auto new_delay = double(now - senderTimeStamp);
- std::stringstream window;
- window << stats.getAverageWindowSize() << std::setfill(separator)
- << "[Int]";
+ if (senderTimeStamp > now)
+ new_delay = -1 * double(senderTimeStamp - now);
- std::stringstream avg_rtt;
- avg_rtt << stats.getAverageRtt() << std::setfill(separator) << "[ms]";
+ saved_stats_.delay_sample_++;
+ saved_stats_.avg_data_delay_ =
+ saved_stats_.avg_data_delay_ +
+ (double(new_delay) - saved_stats_.avg_data_delay_) /
+ saved_stats_.delay_sample_;
- if (configuration_.rtc_) {
- // we get rtc stats more often, thus we need ms in the interval
- std::stringstream interval_ms;
- interval_ms << total_duration_milliseconds_ << "-"
- << total_duration_milliseconds_ + exact_duration.count();
-
- std::stringstream lost_data;
- lost_data << stats.getLostData() - old_lost_data_value_
- << std::setfill(separator) << "[pkt]";
-
- std::stringstream bytes_recovered_data;
- bytes_recovered_data << stats.getBytesRecoveredData() -
- old_bytes_recovered_value_
- << std::setfill(separator) << "[pkt]";
-
- std::stringstream definitely_lost_data;
- definitely_lost_data << stats.getDefinitelyLostData() -
- old_definitely_lost_data_value_
- << std::setfill(separator) << "[pkt]";
-
- std::stringstream data_delay;
- data_delay << avg_data_delay_ << std::setfill(separator) << "[ms]";
-
- std::stringstream received_data_pkt;
- received_data_pkt << received_data_pkt_ << std::setfill(separator)
- << "[pkt]";
-
- std::stringstream goodput;
- goodput << (received_bytes_ * 8.0) / (exact_duration.count()) / 1000.0
- << std::setfill(separator) << "[Mbps]";
-
- std::stringstream loss_rate;
- loss_rate << std::fixed << std::setprecision(2)
- << stats.getLossRatio() * 100.0 << std::setfill(separator)
- << "[%]";
-
- std::stringstream retx_sent;
- retx_sent << stats.getRetxCount() - old_retx_value_
- << std::setfill(separator) << "[pkt]";
-
- std::stringstream interest_sent;
- interest_sent << stats.getInterestTx() - old_sent_int_value_
- << std::setfill(separator) << "[pkt]";
-
- std::stringstream nacks;
- nacks << stats.getReceivedNacks() - old_received_nacks_value_
- << std::setfill(separator) << "[pkt]";
-
- std::stringstream fec_pkt;
- fec_pkt << stats.getReceivedFEC() - old_fec_pkt_
- << std::setfill(separator) << "[pkt]";
-
- std::stringstream queuing_delay;
- queuing_delay << stats.getQueuingDelay() << std::setfill(separator)
- << "[ms]";
-
-#ifdef FORWARDER_INTERFACE
- if (!done_ && stats.getQueuingDelay() >= switch_threshold_ &&
- total_duration_milliseconds_ > 1000) {
- std::cout << "Switching due to queuing delay" << std::endl;
- forwarder_interface_.createFaceAndRoutes(backup_routes_);
- forwarder_interface_.deleteFaceAndRoutes(main_routes_);
- std::swap(backup_routes_, main_routes_);
- done_ = true;
- }
-#endif
-
- // statistics not yet available in the transport
- // std::stringstream interest_fec_tx;
- // interest_fec_tx << stats.getInterestFecTxCount() -
- // old_fec_interest_tx_value_ << std::setfill(separator) << "[pkt]";
- // std::stringstream bytes_fec_recv;
- // bytes_fec_recv << stats.getBytesFecRecv() - old_fec_data_rx_value_
- // << std::setfill(separator) << "[bytes]";
- std::cout << std::left << std::setw(width) << "Interval";
- std::cout << std::left << std::setw(width) << "RecvData";
- std::cout << std::left << std::setw(width) << "Bandwidth";
- std::cout << std::left << std::setw(width) << "Goodput";
- std::cout << std::left << std::setw(width) << "LossRate";
- std::cout << std::left << std::setw(width) << "Retr";
- std::cout << std::left << std::setw(width) << "InterestSent";
- std::cout << std::left << std::setw(width) << "ReceivedNacks";
- std::cout << std::left << std::setw(width) << "SyncWnd";
- std::cout << std::left << std::setw(width) << "MinRtt";
- std::cout << std::left << std::setw(width) << "QueuingDelay";
- std::cout << std::left << std::setw(width) << "LostData";
- std::cout << std::left << std::setw(width) << "RecoveredData";
- std::cout << std::left << std::setw(width) << "DefinitelyLost";
- std::cout << std::left << std::setw(width) << "State";
- std::cout << std::left << std::setw(width) << "DataDelay";
- std::cout << std::left << std::setw(width) << "FecPkt" << std::endl;
-
- std::cout << std::left << std::setw(width) << interval_ms.str();
- std::cout << std::left << std::setw(width) << received_data_pkt.str();
- std::cout << std::left << std::setw(width) << bandwidth.str();
- std::cout << std::left << std::setw(width) << goodput.str();
- std::cout << std::left << std::setw(width) << loss_rate.str();
- std::cout << std::left << std::setw(width) << retx_sent.str();
- std::cout << std::left << std::setw(width) << interest_sent.str();
- std::cout << std::left << std::setw(width) << nacks.str();
- std::cout << std::left << std::setw(width) << window.str();
- std::cout << std::left << std::setw(width) << avg_rtt.str();
- std::cout << std::left << std::setw(width) << queuing_delay.str();
- std::cout << std::left << std::setw(width) << lost_data.str();
- std::cout << std::left << std::setw(width) << bytes_recovered_data.str();
- std::cout << std::left << std::setw(width) << definitely_lost_data.str();
- std::cout << std::left << std::setw(width) << stats.getCCStatus();
- std::cout << std::left << std::setw(width) << data_delay.str();
- std::cout << std::left << std::setw(width) << fec_pkt.str();
- std::cout << std::endl;
-
- if (configuration_.test_mode_) {
- if (data_delays_.size() > 0) data_delays_.pop_back();
-
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::system_clock::now().time_since_epoch())
- .count();
- std::cout << now << " DATA-DELAYS:[" << data_delays_ << "]"
- << std::endl;
- }
-
- // statistics not yet available in the transport
- // std::cout << std::left << std::setw(width) << interest_fec_tx.str();
- // std::cout << std::left << std::setw(width) << bytes_fec_recv.str();
- } else {
- std::cout << std::left << std::setw(width) << "Interval";
- std::cout << std::left << std::setw(width) << "Transfer";
- std::cout << std::left << std::setw(width) << "Bandwidth";
- std::cout << std::left << std::setw(width) << "Retr";
- std::cout << std::left << std::setw(width) << "Cwnd";
- std::cout << std::left << std::setw(width) << "AvgRtt" << std::endl;
-
- std::cout << std::left << std::setw(width) << interval.str();
- std::cout << std::left << std::setw(width) << bytes_transferred.str();
- std::cout << std::left << std::setw(width) << bandwidth.str();
- std::cout << std::left << std::setw(width) << stats.getRetxCount();
- std::cout << std::left << std::setw(width) << window.str();
- std::cout << std::left << std::setw(width) << avg_rtt.str() << std::endl;
- std::cout << std::endl;
- }
- total_duration_milliseconds_ += (uint32_t)exact_duration.count();
- old_bytes_value_ = stats.getBytesRecv();
- old_lost_data_value_ = stats.getLostData();
- old_bytes_recovered_value_ = stats.getBytesRecoveredData();
- old_definitely_lost_data_value_ = stats.getDefinitelyLostData();
- old_fec_interest_tx_value_ = stats.getInterestFecTxCount();
- old_fec_data_rx_value_ = stats.getBytesFecRecv();
- old_retx_value_ = stats.getRetxCount();
- old_sent_int_value_ = stats.getInterestTx();
- old_received_nacks_value_ = stats.getReceivedNacks();
- old_fec_pkt_ = stats.getReceivedFEC();
- delay_sample_ = 0;
- avg_data_delay_ = 0;
- received_bytes_ = 0;
- received_data_pkt_ = 0;
- data_delays_ = "";
-
- t_stats_ = utils::SteadyClock::now();
- }
+ if (configuration_.test_mode_) {
+ saved_stats_.data_delays_ += std::to_string(int(new_delay));
+ saved_stats_.data_delays_ += ",";
+ }
- bool parseConfig(const char *conf_file) {
- using namespace libconfig;
- Config cfg;
-
- try {
- cfg.readFile(conf_file);
- } catch (const FileIOException &fioex) {
- std::cerr << "I/O error while reading file." << std::endl;
- return false;
- } catch (const ParseException &pex) {
- std::cerr << "Parse error at " << pex.getFile() << ":" << pex.getLine()
- << " - " << pex.getError() << std::endl;
- return false;
+ if (configuration_.relay_ && configuration_.parallel_flows_ == 1) {
+ producer_socket_->produceDatagram(
+ configuration_.relay_name_.makeName(),
+ receive_buffer_->writableData(),
+ length < payload_size_max_ ? length : payload_size_max_);
+ }
+ if (configuration_.output_stream_mode_ &&
+ configuration_.parallel_flows_ == 1) {
+ const uint8_t *start = receive_buffer_->writableData();
+ start += sizeof(uint64_t);
+ std::size_t pkt_len = length - sizeof(uint64_t);
+ socket_.send_to(asio::buffer(start, pkt_len), remote_);
+ }
+ }
}
- Setting &config = cfg.getRoot();
+ size_t maxBufferSize() const override {
+ return configuration_.rtc_ ? kmtu : configuration_.receive_buffer_size_;
+ }
- if (config.exists("switch_threshold")) {
- unsigned threshold;
- config.lookupValue("switch_threshold", threshold);
- switch_threshold_ = threshold;
+ void readError(const std::error_code &ec) noexcept override {
+ getOutputStream() << "Error " << ec.message()
+ << " while reading from socket" << std::endl;
+ parent_.io_service_.stop();
}
- // listeners
- if (config.exists("listeners")) {
- // get path where looking for modules
- const Setting &listeners = config.lookup("listeners");
- auto count = listeners.getLength();
+ void readSuccess(std::size_t total_size) noexcept override {
+ if (configuration_.rtc_) {
+ getOutputStream() << "Data successfully read" << std::endl;
+ } else {
+ auto t2 = utils::SteadyTime::now();
+ auto dt =
+ utils::SteadyTime::getDurationUs(saved_stats_.t_download_, t2);
+ auto usec = dt.count();
- for (int i = 0; i < count; i++) {
- const Setting &listener = listeners[i];
- ListenerConfig list;
- unsigned port;
- std::string interface;
+ getOutputStream() << "Content retrieved. Size: " << total_size
+ << " [Bytes]" << std::endl;
- list.name = listener.getName();
- listener.lookupValue("local_address", list.address);
- listener.lookupValue("local_port", port);
- listener.lookupValue("interface", list.interface);
- list.port = (uint16_t)(port);
+ getOutputStream() << "Elapsed Time: " << usec / 1000000.0
+ << " seconds -- "
+ << double(total_size * 8) * 1.0 / double(usec) * 1.0
+ << " [Mbps]" << std::endl;
- std::cout << "Adding listener " << list.name << ", ( " << list.address
- << ":" << list.port << ")" << std::endl;
- config_.addListener(std::move(list));
+ parent_.io_service_.stop();
}
}
- // connectors
- if (config.exists("connectors")) {
- // get path where looking for modules
- const Setting &connectors = config.lookup("connectors");
- auto count = connectors.getLength();
-
- for (int i = 0; i < count; i++) {
- const Setting &connector = connectors[i];
- ConnectorConfig conn;
-
- conn.name = connector.getName();
- unsigned port = 0;
-
- if (!connector.lookupValue("local_address", conn.local_address)) {
- conn.local_address = "";
- }
+ /***************************************************************
+ * End of ConsumerSocket::ReadCallback implementation
+ ***************************************************************/
- if (!connector.lookupValue("local_port", port)) {
- port = 0;
- }
-
- conn.local_port = (uint16_t)(port);
+ private:
+ struct SavedStatistics {
+ utils::SteadyTime::TimePoint t_stats_{};
+ utils::SteadyTime::TimePoint t_download_{};
+ uint32_t total_duration_milliseconds_{0};
+ uint64_t old_bytes_value_{0};
+ uint64_t old_interest_tx_value_{0};
+ uint64_t old_fec_interest_tx_value_{0};
+ uint64_t old_fec_data_rx_value_{0};
+ uint64_t old_lost_data_value_{0};
+ uint64_t old_bytes_recovered_value_{0};
+ uint64_t old_definitely_lost_data_value_{0};
+ uint64_t old_retx_value_{0};
+ uint64_t old_sent_int_value_{0};
+ uint64_t old_received_nacks_value_{0};
+ uint32_t old_fec_pkt_{0};
+ // IMPORTANT: to be used only for performance testing, when consumer and
+ // producer are synchronized. Used for rtc only at the moment
+ double avg_data_delay_{0};
+ uint32_t delay_sample_{0};
+ uint32_t received_bytes_{0};
+ uint32_t received_data_pkt_{0};
+ uint32_t auth_alerts_{0};
+ std::string data_delays_{""};
+ };
+
+ /***************************************************************
+ * Transport callbacks
+ ***************************************************************/
+
+ void checkReceivedRtcContent(
+ [[maybe_unused]] const ConsumerSocket &c,
+ [[maybe_unused]] const transport::core::ContentObject &content_object)
+ const {
+ // Nothing to do here
+ }
+
+ void processLeavingInterest(
+ const ConsumerSocket & /*c*/,
+ const transport::core::Interest & /*interest*/) const {
+ // Nothing to do here
+ }
+
+ transport::auth::VerificationPolicy onAuthFailed(
+ transport::auth::Suffix /*suffix*/,
+ transport::auth::VerificationPolicy /*policy*/) {
+ saved_stats_.auth_alerts_++;
+ return transport::auth::VerificationPolicy::ACCEPT;
+ }
+
+ void handleTimerExpiration([[maybe_unused]] const ConsumerSocket &c,
+ const TransportStatistics &stats) {
+ const char separator = ' ';
+ const int width = 18;
+
+ utils::SteadyTime::TimePoint t2 = utils::SteadyTime::Clock::now();
+ auto exact_duration =
+ utils::SteadyTime::getDurationMs(saved_stats_.t_stats_, t2);
- if (!connector.lookupValue("remote_address", conn.remote_address)) {
- std::cerr
- << "Error in configuration file: remote_address is a mandatory "
- "field of Connectors."
- << std::endl;
- return false;
+ std::stringstream interval_ms;
+ interval_ms << saved_stats_.total_duration_milliseconds_ << "-"
+ << saved_stats_.total_duration_milliseconds_ +
+ exact_duration.count();
+
+ std::stringstream bytes_transferred;
+ bytes_transferred << std::fixed << std::setprecision(3)
+ << double(stats.getBytesRecv() -
+ saved_stats_.old_bytes_value_) /
+ 1000000.0
+ << std::setfill(separator);
+
+ std::stringstream bandwidth;
+ bandwidth << (double(stats.getBytesRecv() -
+ saved_stats_.old_bytes_value_) *
+ 8) /
+ (exact_duration.count()) / 1000.0
+ << std::setfill(separator);
+
+ std::stringstream window;
+ window << stats.getAverageWindowSize() << std::setfill(separator);
+
+ std::stringstream avg_rtt;
+ avg_rtt << std::setprecision(3) << std::fixed << stats.getAverageRtt()
+ << std::setfill(separator);
+
+ if (configuration_.rtc_) {
+ std::stringstream lost_data;
+ lost_data << stats.getLostData() - saved_stats_.old_lost_data_value_
+ << std::setfill(separator);
+
+ std::stringstream bytes_recovered_data;
+ bytes_recovered_data << stats.getBytesRecoveredData() -
+ saved_stats_.old_bytes_recovered_value_
+ << std::setfill(separator);
+
+ std::stringstream definitely_lost_data;
+ definitely_lost_data << stats.getDefinitelyLostData() -
+ saved_stats_.old_definitely_lost_data_value_
+ << std::setfill(separator);
+
+ std::stringstream data_delay;
+ data_delay << std::fixed << std::setprecision(3)
+ << saved_stats_.avg_data_delay_ << std::setfill(separator);
+
+ std::stringstream received_data_pkt;
+ received_data_pkt << saved_stats_.received_data_pkt_
+ << std::setfill(separator);
+
+ std::stringstream goodput;
+ goodput << std::fixed << std::setprecision(3)
+ << (saved_stats_.received_bytes_ * 8.0) /
+ (exact_duration.count()) / 1000.0
+ << std::setfill(separator);
+
+ std::stringstream loss_rate;
+ loss_rate << std::fixed << std::setprecision(2)
+ << stats.getLossRatio() * 100.0 << std::setfill(separator);
+
+ std::stringstream retx_sent;
+ retx_sent << stats.getRetxCount() - saved_stats_.old_retx_value_
+ << std::setfill(separator);
+
+ std::stringstream interest_sent;
+ interest_sent << stats.getInterestTx() -
+ saved_stats_.old_sent_int_value_
+ << std::setfill(separator);
+
+ std::stringstream nacks;
+ nacks << stats.getReceivedNacks() -
+ saved_stats_.old_received_nacks_value_
+ << std::setfill(separator);
+
+ std::stringstream fec_pkt;
+ fec_pkt << stats.getReceivedFEC() - saved_stats_.old_fec_pkt_
+ << std::setfill(separator);
+
+ std::stringstream queuing_delay;
+ queuing_delay << std::fixed << std::setprecision(3)
+ << stats.getQueuingDelay() << std::setfill(separator);
+
+ std::stringstream residual_losses;
+ double rl_perc = stats.getResidualLossRate() * 100;
+ residual_losses << std::fixed << std::setprecision(2) << rl_perc
+ << std::setfill(separator);
+
+ std::stringstream quality_score;
+ quality_score << std::fixed << (int)stats.getQualityScore()
+ << std::setfill(separator);
+
+ std::stringstream alerts;
+ alerts << stats.getAlerts() << std::setfill(separator);
+
+ std::stringstream auth_alerts;
+ auth_alerts << saved_stats_.auth_alerts_ << std::setfill(separator);
+
+ if ((header_counter_ == 0 && configuration_.print_headers_) || first_) {
+ getOutputStream() << std::right << std::setw(width) << "Interval[ms]";
+ getOutputStream()
+ << std::right << std::setw(width) << "RecvData[pkt]";
+ getOutputStream()
+ << std::right << std::setw(width) << "Bandwidth[Mbps]";
+ getOutputStream()
+ << std::right << std::setw(width) << "Goodput[Mbps]";
+ getOutputStream() << std::right << std::setw(width) << "LossRate[%]";
+ getOutputStream() << std::right << std::setw(width) << "Retr[pkt]";
+ getOutputStream() << std::right << std::setw(width) << "InterestSent";
+ getOutputStream()
+ << std::right << std::setw(width) << "ReceivedNacks";
+ getOutputStream() << std::right << std::setw(width) << "SyncWnd[pkt]";
+ getOutputStream() << std::right << std::setw(width) << "MinRtt[ms]";
+ getOutputStream()
+ << std::right << std::setw(width) << "QueuingDelay[ms]";
+ getOutputStream()
+ << std::right << std::setw(width) << "LostData[pkt]";
+ getOutputStream()
+ << std::right << std::setw(width) << "RecoveredData";
+ getOutputStream()
+ << std::right << std::setw(width) << "DefinitelyLost";
+ getOutputStream() << std::right << std::setw(width) << "State";
+ getOutputStream()
+ << std::right << std::setw(width) << "DataDelay[ms]";
+ getOutputStream() << std::right << std::setw(width) << "FecPkt";
+ getOutputStream() << std::right << std::setw(width) << "Congestion";
+ getOutputStream()
+ << std::right << std::setw(width) << "ResidualLosses";
+ getOutputStream() << std::right << std::setw(width) << "QualityScore";
+ getOutputStream() << std::right << std::setw(width) << "Alerts";
+ getOutputStream()
+ << std::right << std::setw(width) << "AuthAlerts" << std::endl;
+
+ first_ = false;
}
- if (!connector.lookupValue("remote_port", port)) {
- std::cerr << "Error in configuration file: remote_port is a "
- "mandatory field of Connectors."
- << std::endl;
- return false;
+ getOutputStream() << std::right << std::setw(width)
+ << interval_ms.str();
+ getOutputStream() << std::right << std::setw(width)
+ << received_data_pkt.str();
+ getOutputStream() << std::right << std::setw(width) << bandwidth.str();
+ getOutputStream() << std::right << std::setw(width) << goodput.str();
+ getOutputStream() << std::right << std::setw(width) << loss_rate.str();
+ getOutputStream() << std::right << std::setw(width) << retx_sent.str();
+ getOutputStream() << std::right << std::setw(width)
+ << interest_sent.str();
+ getOutputStream() << std::right << std::setw(width) << nacks.str();
+ getOutputStream() << std::right << std::setw(width) << window.str();
+ getOutputStream() << std::right << std::setw(width) << avg_rtt.str();
+ getOutputStream() << std::right << std::setw(width)
+ << queuing_delay.str();
+ getOutputStream() << std::right << std::setw(width) << lost_data.str();
+ getOutputStream() << std::right << std::setw(width)
+ << bytes_recovered_data.str();
+ getOutputStream() << std::right << std::setw(width)
+ << definitely_lost_data.str();
+ getOutputStream() << std::right << std::setw(width)
+ << stats.getCCStatus();
+ getOutputStream() << std::right << std::setw(width) << data_delay.str();
+ getOutputStream() << std::right << std::setw(width) << fec_pkt.str();
+ getOutputStream() << std::right << std::setw(width)
+ << stats.isCongested();
+ getOutputStream() << std::right << std::setw(width)
+ << residual_losses.str();
+ getOutputStream() << std::right << std::setw(width)
+ << quality_score.str();
+ getOutputStream() << std::right << std::setw(width) << alerts.str();
+ getOutputStream() << std::right << std::setw(width) << auth_alerts.str()
+ << std::endl;
+
+ if (configuration_.test_mode_) {
+ if (saved_stats_.data_delays_.size() > 0)
+ saved_stats_.data_delays_.pop_back();
+
+ auto now = utils::SteadyTime::nowMs();
+ getOutputStream() << std::fixed << std::setprecision(0) << now.count()
+ << " DATA-DELAYS:[" << saved_stats_.data_delays_
+ << "]" << std::endl;
}
-
- if (!connector.lookupValue("interface", conn.interface)) {
- std::cerr << "Error in configuration file: interface is a "
- "mandatory field of Connectors."
- << std::endl;
- return false;
+ } else {
+ if ((header_counter_ == 0 && configuration_.print_headers_) || first_) {
+ getOutputStream() << std::right << std::setw(width) << "Interval[ms]";
+ getOutputStream() << std::right << std::setw(width) << "Transfer[MB]";
+ getOutputStream()
+ << std::right << std::setw(width) << "Bandwidth[Mbps]";
+ getOutputStream() << std::right << std::setw(width) << "Retr[pkt]";
+ getOutputStream() << std::right << std::setw(width) << "Cwnd[Int]";
+ getOutputStream()
+ << std::right << std::setw(width) << "AvgRtt[ms]" << std::endl;
+
+ first_ = false;
}
- conn.remote_port = (uint16_t)(port);
+ getOutputStream() << std::right << std::setw(width)
+ << interval_ms.str();
+ getOutputStream() << std::right << std::setw(width)
+ << bytes_transferred.str();
+ getOutputStream() << std::right << std::setw(width) << bandwidth.str();
+ getOutputStream() << std::right << std::setw(width)
+ << stats.getRetxCount();
+ getOutputStream() << std::right << std::setw(width) << window.str();
+ getOutputStream() << std::right << std::setw(width) << avg_rtt.str()
+ << std::endl;
+ }
- std::cout << "Adding connector " << conn.name << ", ("
- << conn.local_address << ":" << conn.local_port << " "
- << conn.remote_address << ":" << conn.remote_port << ")"
- << std::endl;
- config_.addConnector(conn.name, std::move(conn));
+ saved_stats_.total_duration_milliseconds_ +=
+ (uint32_t)exact_duration.count();
+ saved_stats_.old_bytes_value_ = stats.getBytesRecv();
+ saved_stats_.old_lost_data_value_ = stats.getLostData();
+ saved_stats_.old_bytes_recovered_value_ = stats.getBytesRecoveredData();
+ saved_stats_.old_definitely_lost_data_value_ =
+ stats.getDefinitelyLostData();
+ saved_stats_.old_fec_interest_tx_value_ = stats.getInterestFecTxCount();
+ saved_stats_.old_fec_data_rx_value_ = stats.getBytesFecRecv();
+ saved_stats_.old_retx_value_ = stats.getRetxCount();
+ saved_stats_.old_sent_int_value_ = stats.getInterestTx();
+ saved_stats_.old_received_nacks_value_ = stats.getReceivedNacks();
+ saved_stats_.old_fec_pkt_ = stats.getReceivedFEC();
+ saved_stats_.delay_sample_ = 0;
+ saved_stats_.avg_data_delay_ = 0;
+ saved_stats_.received_bytes_ = 0;
+ saved_stats_.received_data_pkt_ = 0;
+ saved_stats_.data_delays_ = "";
+ saved_stats_.t_stats_ = utils::SteadyTime::Clock::now();
+
+ header_counter_ = (header_counter_ + 1) & kheader_counter_mask();
+
+ if (--nb_iterations_ == 0) {
+ // We reached the maximum nb of runs. Stop now.
+ parent_.io_service_.stop();
}
}
- // Routes
- if (config.exists("routes")) {
- const Setting &routes = config.lookup("routes");
- auto count = routes.getLength();
+ /***************************************************************
+ * Setup functions
+ ***************************************************************/
- for (int i = 0; i < count; i++) {
- const Setting &route = routes[i];
- RouteConfig r;
- unsigned weight;
+ int setupRTCSocket() {
+ int ret = ERROR_SUCCESS;
- r.name = route.getName();
- route.lookupValue("prefix", r.prefix);
- route.lookupValue("weight", weight);
- route.lookupValue("main_connector", r.main_connector);
- route.lookupValue("backup_connector", r.backup_connector);
- r.weight = (uint16_t)(weight);
+ configuration_.transport_protocol_ = transport::interface::RTC;
- std::cout << "Adding route " << r.name << " " << r.prefix << " ("
- << r.main_connector << " " << r.backup_connector << " "
- << r.weight << ")" << std::endl;
- config_.addRoute(std::move(r));
+ if (configuration_.relay_ && configuration_.parallel_flows_ == 1) {
+ int production_protocol = ProductionProtocolAlgorithms::RTC_PROD;
+ producer_socket_ =
+ std::make_unique<ProducerSocket>(production_protocol);
+ producer_socket_->registerPrefix(configuration_.relay_name_);
+ producer_socket_->connect();
+ producer_socket_->start();
}
- }
- std::cout << "Ok" << std::endl;
-
- return true;
- }
-
- bool splitRoute(std::string route, std::string &prefix,
- uint8_t &prefix_length) {
- std::string delimiter = "/";
-
- size_t pos = 0;
- if ((pos = route.find(delimiter)) != std::string::npos) {
- prefix = route.substr(0, pos);
- route.erase(0, pos + delimiter.length());
- } else {
- return false;
- }
+ if (configuration_.output_stream_mode_ &&
+ configuration_.parallel_flows_ == 1) {
+ remote_ = asio::ip::udp::endpoint(
+ asio::ip::address::from_string("127.0.0.1"), configuration_.port_);
+ socket_.open(asio::ip::udp::v4());
+ }
- prefix_length = std::stoul(route.substr(0));
- return true;
- }
+ consumer_socket_ =
+ std::make_unique<ConsumerSocket>(configuration_.transport_protocol_);
-#ifdef FORWARDER_INTERFACE
- void onHicnServiceReady() override {
- std::cout << "Successfully connected to local forwarder!" << std::endl;
+ RtcTransportRecoveryStrategies recovery_strategy =
+ RtcTransportRecoveryStrategies::RTX_ONLY;
+ switch (configuration_.recovery_strategy_) {
+ case 1:
+ recovery_strategy = RtcTransportRecoveryStrategies::RECOVERY_OFF;
+ break;
+ case 2:
+ recovery_strategy = RtcTransportRecoveryStrategies::RTX_ONLY;
+ break;
+ case 3:
+ recovery_strategy = RtcTransportRecoveryStrategies::FEC_ONLY;
+ break;
+ case 4:
+ recovery_strategy = RtcTransportRecoveryStrategies::DELAY_BASED;
+ break;
+ case 5:
+ recovery_strategy = RtcTransportRecoveryStrategies::LOW_RATE;
+ break;
+ case 6:
+ recovery_strategy =
+ RtcTransportRecoveryStrategies::LOW_RATE_AND_BESTPATH;
+ break;
+ case 7:
+ recovery_strategy =
+ RtcTransportRecoveryStrategies::LOW_RATE_AND_REPLICATION;
+ break;
+ case 8:
+ recovery_strategy =
+ RtcTransportRecoveryStrategies::LOW_RATE_AND_ALL_FWD_STRATEGIES;
+ break;
+ case 9:
+ recovery_strategy =
+ RtcTransportRecoveryStrategies::FEC_ONLY_LOW_RES_LOSSES;
+ break;
+ case 10:
+ recovery_strategy =
+ RtcTransportRecoveryStrategies::DELAY_AND_BESTPATH;
+ break;
+ case 11:
+ recovery_strategy =
+ RtcTransportRecoveryStrategies::DELAY_AND_REPLICATION;
+ break;
+ default:
+ break;
+ }
- std::cout << "Setting up listeners" << std::endl;
- const char *config = getenv("FORWARDER_CONFIG");
+ ret = consumer_socket_->setSocketOption(
+ RtcTransportOptions::RECOVERY_STRATEGY,
+ static_cast<uint32_t>(recovery_strategy));
- if (config) {
- if (!parseConfig(config)) {
- return;
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
}
- // Create faces and route using first face in the list.
- auto &routes = config_.getRoutes();
- auto &connectors = config_.getConnectors();
+ ret = consumer_socket_->setSocketOption(
+ RtcTransportOptions::AGGREGATED_DATA,
+ configuration_.aggregated_data_);
- if (routes.size() == 0 || connectors.size() == 0) {
- std::cerr << "Nothing to configure" << std::endl;
- return;
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
}
- for (auto &route : routes) {
- auto the_connector_it = connectors.find(route.main_connector);
- if (the_connector_it == connectors.end()) {
- std::cerr << "No valid main connector found for route " << route.name
- << std::endl;
- continue;
- }
+ ret = consumer_socket_->setSocketOption(
+ RtcTransportOptions::CONTENT_SHARING_MODE,
+ configuration_.content_sharing_mode_);
- auto &the_connector = the_connector_it->second;
- auto route_info = std::make_shared<ForwarderInterface::RouteInfo>();
- route_info->family = AF_INET;
- route_info->local_addr = the_connector.local_address;
- route_info->local_port = the_connector.local_port;
- route_info->remote_addr = the_connector.remote_address;
- route_info->remote_port = the_connector.remote_port;
- route_info->interface = the_connector.interface;
- route_info->name = the_connector.name;
-
- std::string prefix;
- uint8_t prefix_length;
- auto ret = splitRoute(route.prefix, prefix, prefix_length);
-
- if (!ret) {
- std::cerr << "Error parsing route" << std::endl;
- return;
- }
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
- route_info->route_addr = prefix;
- route_info->route_len = prefix_length;
-
- main_routes_.emplace_back(route_info);
-
- if (!route.backup_connector.empty()) {
- // Add also the backup route
- auto the_backup_connector_it =
- connectors.find(route.backup_connector);
- if (the_backup_connector_it == connectors.end()) {
- std::cerr << "No valid backup connector found for route "
- << route.name << std::endl;
- continue;
- }
-
- auto &the_backup_connector = the_backup_connector_it->second;
- auto backup_route_info =
- std::make_shared<ForwarderInterface::RouteInfo>();
- backup_route_info->family = AF_INET;
- backup_route_info->local_addr = the_backup_connector.local_address;
- backup_route_info->local_port = the_backup_connector.local_port;
- backup_route_info->remote_addr = the_backup_connector.remote_address;
- backup_route_info->remote_port = the_backup_connector.remote_port;
- backup_route_info->interface = the_backup_connector.interface;
- backup_route_info->name = the_backup_connector.name;
-
- std::string prefix;
- uint8_t prefix_length;
- auto ret = splitRoute(route.prefix, prefix, prefix_length);
-
- if (!ret) {
- std::cerr << "Error parsing route" << std::endl;
- return;
- }
-
- backup_route_info->route_addr = prefix;
- backup_route_info->route_len = prefix_length;
-
- backup_routes_.emplace_back(backup_route_info);
- }
+ ret = consumer_socket_->setSocketOption(
+ ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT,
+ (transport::interface::ConsumerContentObjectCallback)std::bind(
+ &Impl::ConsumerContext::checkReceivedRtcContent, this,
+ std::placeholders::_1, std::placeholders::_2));
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
}
- // Create main routes
- std::cout << "Creating main routes" << std::endl;
- forwarder_interface_.createFaceAndRoutes(main_routes_);
- }
- }
+ std::shared_ptr<TransportStatistics> transport_stats;
+ ret = consumer_socket_->getSocketOption(
+ transport::interface::OtherOptions::STATISTICS,
+ (TransportStatistics **)&transport_stats);
+ transport_stats->setAlpha(0.0);
- void onRouteConfigured(
- std::vector<ForwarderInterface::RouteInfoPtr> &route_info) override {
- std::cout << "Routes successfully configured!" << std::endl;
- }
-#endif
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
- int setup() {
- int ret;
-
- if (configuration_.rtc_) {
- configuration_.transport_protocol_ = RTC;
- } else if (configuration_.window < 0) {
- configuration_.transport_protocol_ = RAAQM;
- } else {
- configuration_.transport_protocol_ = CBR;
+ return ERROR_SUCCESS;
}
- if (configuration_.relay_ && configuration_.rtc_) {
- int production_protocol = ProductionProtocolAlgorithms::RTC_PROD;
- producer_socket_ = std::make_unique<ProducerSocket>(production_protocol);
- producer_socket_->registerPrefix(configuration_.relay_name_);
- producer_socket_->connect();
- }
+ int setupRAAQMSocket() {
+ int ret = ERROR_SUCCESS;
- if (configuration_.output_stream_mode_ && configuration_.rtc_) {
- remote_ = asio::ip::udp::endpoint(
- asio::ip::address::from_string("127.0.0.1"), configuration_.port_);
- socket_.open(asio::ip::udp::v4());
- }
+ configuration_.transport_protocol_ = transport::interface::RAAQM;
- if (configuration_.secure_) {
- consumer_socket_ = std::make_unique<P2PSecureConsumerSocket>(
- RAAQM, configuration_.transport_protocol_);
- if (configuration_.producer_prefix_.getPrefixLength() == 0) {
- std::cerr << "ERROR -- Missing producer prefix on which perform the "
- "handshake."
- << std::endl;
- } else {
- P2PSecureConsumerSocket &secure_consumer_socket =
- *(static_cast<P2PSecureConsumerSocket *>(consumer_socket_.get()));
- secure_consumer_socket.registerPrefix(configuration_.producer_prefix_);
- }
- } else {
consumer_socket_ =
std::make_unique<ConsumerSocket>(configuration_.transport_protocol_);
- }
- consumer_socket_->setSocketOption(
- GeneralTransportOptions::INTEREST_LIFETIME,
- configuration_.interest_lifetime_);
-
-#if defined(DEBUG) && defined(__linux__)
- std::shared_ptr<transport::BasePortal> portal;
- consumer_socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal);
- signals_ =
- std::make_unique<asio::signal_set>(portal->getIoService(), SIGUSR1);
- signals_->async_wait([this](const std::error_code &, const int &) {
- std::cout << "Signal SIGUSR1!" << std::endl;
- mtrace();
- });
-#endif
-
- if (consumer_socket_->setSocketOption(CURRENT_WINDOW_SIZE,
- configuration_.window) ==
- SOCKET_OPTION_NOT_SET) {
- std::cerr << "ERROR -- Impossible to set the size of the window."
- << std::endl;
- return ERROR_SETUP;
- }
-
- if (configuration_.transport_protocol_ == RAAQM &&
- configuration_.beta != -1.f) {
- if (consumer_socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE,
- configuration_.beta) ==
- SOCKET_OPTION_NOT_SET) {
- return ERROR_SETUP;
+ if (configuration_.beta_ != -1.f) {
+ ret = consumer_socket_->setSocketOption(
+ RaaqmTransportOptions::BETA_VALUE, configuration_.beta_);
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
}
- }
- if (configuration_.transport_protocol_ == RAAQM &&
- configuration_.drop_factor != -1.f) {
- if (consumer_socket_->setSocketOption(RaaqmTransportOptions::DROP_FACTOR,
- configuration_.drop_factor) ==
- SOCKET_OPTION_NOT_SET) {
- return ERROR_SETUP;
+ if (configuration_.drop_factor_ != -1.f) {
+ ret = consumer_socket_->setSocketOption(
+ RaaqmTransportOptions::DROP_FACTOR, configuration_.drop_factor_);
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
}
- }
- if (!configuration_.producer_certificate.empty()) {
- std::shared_ptr<Verifier> verifier = std::make_shared<AsymmetricVerifier>(
- configuration_.producer_certificate);
- if (consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER,
- verifier) == SOCKET_OPTION_NOT_SET)
- return ERROR_SETUP;
+ return ERROR_SUCCESS;
}
- if (!configuration_.passphrase.empty()) {
- std::shared_ptr<Verifier> verifier =
- std::make_shared<SymmetricVerifier>(configuration_.passphrase);
- if (consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER,
- verifier) == SOCKET_OPTION_NOT_SET)
- return ERROR_SETUP;
- }
+ int setupCBRSocket() {
+ configuration_.transport_protocol_ = transport::interface::CBR;
- ret = consumer_socket_->setSocketOption(
- ConsumerCallbacksOptions::INTEREST_OUTPUT,
- (ConsumerInterestCallback)std::bind(&Impl::processLeavingInterest, this,
- std::placeholders::_1,
- std::placeholders::_2));
+ consumer_socket_ =
+ std::make_unique<ConsumerSocket>(configuration_.transport_protocol_);
- if (ret == SOCKET_OPTION_NOT_SET) {
- return ERROR_SETUP;
+ return ERROR_SUCCESS;
}
- if (!configuration_.rtc_) {
- ret = consumer_socket_->setSocketOption(
- ConsumerCallbacksOptions::READ_CALLBACK, &callback_);
- } else {
- ret = consumer_socket_->setSocketOption(
- ConsumerCallbacksOptions::READ_CALLBACK, &rtc_callback_);
- }
+ public:
+ int setup() {
+ int ret;
+ std::shared_ptr<transport::auth::Verifier> verifier =
+ std::make_shared<transport::auth::VoidVerifier>();
+
+ if (configuration_.rtc_) {
+ ret = setupRTCSocket();
+ } else if (configuration_.window_ < 0) {
+ ret = setupRAAQMSocket();
+ } else {
+ ret = setupCBRSocket();
+ }
- if (ret == SOCKET_OPTION_NOT_SET) {
- return ERROR_SETUP;
- }
+ if (ret != ERROR_SUCCESS) {
+ return ret;
+ }
- if (configuration_.rtc_) {
ret = consumer_socket_->setSocketOption(
- ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT,
- (ConsumerContentObjectCallback)std::bind(
- &Impl::checkReceivedRtcContent, this, std::placeholders::_1,
- std::placeholders::_2));
+ GeneralTransportOptions::INTEREST_LIFETIME,
+ configuration_.interest_lifetime_);
if (ret == SOCKET_OPTION_NOT_SET) {
return ERROR_SETUP;
}
- }
-
- if (configuration_.rtc_) {
- std::shared_ptr<TransportStatistics> transport_stats;
- consumer_socket_->getSocketOption(
- OtherOptions::STATISTICS, (TransportStatistics **)&transport_stats);
- transport_stats->setAlpha(0.0);
- }
- ret = consumer_socket_->setSocketOption(
- ConsumerCallbacksOptions::STATS_SUMMARY,
- (ConsumerTimerCallback)std::bind(&Impl::handleTimerExpiration, this,
- std::placeholders::_1,
- std::placeholders::_2));
+ ret = consumer_socket_->setSocketOption(
+ GeneralTransportOptions::MANIFEST_FACTOR_RELEVANT,
+ configuration_.manifest_factor_relevant_);
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
- if (ret == SOCKET_OPTION_NOT_SET) {
- return ERROR_SETUP;
- }
+ ret = consumer_socket_->setSocketOption(
+ GeneralTransportOptions::MANIFEST_FACTOR_ALERT,
+ configuration_.manifest_factor_alert_);
- if (consumer_socket_->setSocketOption(
- GeneralTransportOptions::STATS_INTERVAL,
- configuration_.report_interval_milliseconds_) ==
- SOCKET_OPTION_NOT_SET) {
- return ERROR_SETUP;
- }
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
- consumer_socket_->connect();
+ ret = consumer_socket_->setSocketOption(
+ GeneralTransportOptions::PACKET_FORMAT,
+ configuration_.packet_format_);
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ getOutputStream() << "ERROR -- Impossible to set the packet format."
+ << std::endl;
+ return ERROR_SETUP;
+ }
- return ERROR_SUCCESS;
- }
+ ret = consumer_socket_->setSocketOption(
+ ConsumerCallbacksOptions::FWD_STRATEGY_CHANGE,
+ (StrategyCallback)[](
+ [[maybe_unused]] transport::interface::notification::Strategy
+ strategy){
+ // nothing to do
+ });
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
- int run() {
- std::cout << "Starting download of " << configuration_.name << std::endl;
+ ret = consumer_socket_->setSocketOption(
+ ConsumerCallbacksOptions::REC_STRATEGY_CHANGE,
+ (StrategyCallback)[](
+ [[maybe_unused]] transport::interface::notification::Strategy
+ strategy){
+ // nothing to do
+ });
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
- signals_.add(SIGINT);
- signals_.async_wait(
- [this](const std::error_code &, const int &) { io_service_.stop(); });
+ ret = consumer_socket_->setSocketOption(
+ transport::interface::CURRENT_WINDOW_SIZE, configuration_.window_);
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ getOutputStream()
+ << "ERROR -- Impossible to set the size of the window."
+ << std::endl;
+ return ERROR_SETUP;
+ }
- t_download_ = t_stats_ = std::chrono::steady_clock::now();
- consumer_socket_->asyncConsume(configuration_.name);
- io_service_.run();
+ if (!configuration_.producer_certificate_.empty()) {
+ verifier = std::make_shared<transport::auth::AsymmetricVerifier>(
+ configuration_.producer_certificate_);
+ }
- consumer_socket_->stop();
+ if (!configuration_.passphrase_.empty()) {
+ verifier = std::make_shared<transport::auth::SymmetricVerifier>(
+ configuration_.passphrase_);
+ }
- return ERROR_SUCCESS;
- }
+ verifier->setVerificationFailedCallback(
+ std::bind(&HIperfClient::Impl::ConsumerContext::onAuthFailed, this,
+ std::placeholders::_1, std::placeholders::_2));
- private:
- class RTCCallback : public ConsumerSocket::ReadCallback {
- static constexpr std::size_t mtu = 1500;
+ ret = consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER,
+ verifier);
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
- public:
- RTCCallback(Impl &hiperf_client) : client_(hiperf_client) {
- client_.configuration_.receive_buffer = utils::MemBuf::create(mtu);
- }
+ // Signer for aggregatd interests
+ std::shared_ptr<transport::auth::Signer> signer =
+ std::make_shared<transport::auth::VoidSigner>();
+ if (!configuration_.aggr_interest_passphrase_.empty()) {
+ signer = std::make_shared<transport::auth::SymmetricSigner>(
+ transport::auth::CryptoSuite::HMAC_SHA256,
+ configuration_.aggr_interest_passphrase_);
+ }
+ ret = consumer_socket_->setSocketOption(GeneralTransportOptions::SIGNER,
+ signer);
+ if (ret == SOCKET_OPTION_NOT_SET) return ERROR_SETUP;
- bool isBufferMovable() noexcept override { return false; }
+ if (configuration_.aggregated_interests_) {
+ ret = consumer_socket_->setSocketOption(
+ RtcTransportOptions::AGGREGATED_INTERESTS, true);
- void getReadBuffer(uint8_t **application_buffer,
- size_t *max_length) override {
- *application_buffer =
- client_.configuration_.receive_buffer->writableData();
- *max_length = mtu;
- }
+ if (ret == SOCKET_OPTION_NOT_SET) return ERROR_SETUP;
+ }
- void readDataAvailable(std::size_t length) noexcept override {
- client_.received_bytes_ += length;
- client_.received_data_pkt_++;
+ ret = consumer_socket_->setSocketOption(
+ ConsumerCallbacksOptions::INTEREST_OUTPUT,
+ (transport::interface::ConsumerInterestCallback)std::bind(
+ &ConsumerContext::processLeavingInterest, this,
+ std::placeholders::_1, std::placeholders::_2));
- // collecting delay stats. Just for performance testing
- uint64_t *senderTimeStamp =
- (uint64_t *)client_.configuration_.receive_buffer->writableData();
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::system_clock::now().time_since_epoch())
- .count();
- double new_delay = (double)(now - *senderTimeStamp);
+ ret = consumer_socket_->setSocketOption(
+ ConsumerCallbacksOptions::READ_CALLBACK, this);
- if (*senderTimeStamp > now)
- new_delay = -1 * (double)(*senderTimeStamp - now);
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
- client_.delay_sample_++;
- client_.avg_data_delay_ =
- client_.avg_data_delay_ +
- (new_delay - client_.avg_data_delay_) / client_.delay_sample_;
+ ret = consumer_socket_->setSocketOption(
+ ConsumerCallbacksOptions::STATS_SUMMARY,
+ (transport::interface::ConsumerTimerCallback)std::bind(
+ &Impl::ConsumerContext::handleTimerExpiration, this,
+ std::placeholders::_1, std::placeholders::_2));
- if (client_.configuration_.test_mode_) {
- client_.data_delays_ += std::to_string(int(new_delay));
- client_.data_delays_ += ",";
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
}
- if (client_.configuration_.relay_) {
- client_.producer_socket_->produceDatagram(
- client_.configuration_.relay_name_.getName(),
- client_.configuration_.receive_buffer->writableData(),
- length < 1400 ? length : 1400);
- }
- if (client_.configuration_.output_stream_mode_) {
- uint8_t *start =
- (uint8_t *)client_.configuration_.receive_buffer->writableData();
- start += sizeof(uint64_t);
- std::size_t pkt_len = length - sizeof(uint64_t);
- client_.socket_.send_to(asio::buffer(start, pkt_len), client_.remote_);
+ if (consumer_socket_->setSocketOption(
+ GeneralTransportOptions::STATS_INTERVAL,
+ configuration_.report_interval_milliseconds_) ==
+ SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
}
- }
- size_t maxBufferSize() const override { return mtu; }
+ consumer_socket_->connect();
- void readError(const std::error_code ec) noexcept override {
- std::cerr << "Error while reading from RTC socket" << std::endl;
- client_.io_service_.stop();
+ return ERROR_SUCCESS;
}
- void readSuccess(std::size_t total_size) noexcept override {
- std::cout << "Data successfully read" << std::endl;
- }
+ /***************************************************************
+ * Run functions
+ ***************************************************************/
- private:
- Impl &client_;
- };
+ int run() {
+ getOutputStream() << "Starting download of " << flow_name_ << std::endl;
- class Callback : public ConsumerSocket::ReadCallback {
- public:
- Callback(Impl &hiperf_client) : client_(hiperf_client) {
- client_.configuration_.receive_buffer =
- utils::MemBuf::create(client_.configuration_.receive_buffer_size_);
+ saved_stats_.t_download_ = saved_stats_.t_stats_ =
+ utils::SteadyTime::now();
+ consumer_socket_->consume(flow_name_);
+
+ return ERROR_SUCCESS;
}
- bool isBufferMovable() noexcept override { return false; }
+ // Members initialized by the constructor
+ std::shared_ptr<utils::MemBuf> receive_buffer_;
+ asio::ip::udp::socket socket_;
+ std::size_t payload_size_max_;
+ asio::ip::udp::endpoint remote_;
+ std::uint32_t nb_iterations_;
- void getReadBuffer(uint8_t **application_buffer,
- size_t *max_length) override {
- *application_buffer =
- client_.configuration_.receive_buffer->writableData();
- *max_length = client_.configuration_.receive_buffer_size_;
- }
+ // Members initialized by in-class initializer
+ SavedStatistics saved_stats_{};
+ uint16_t header_counter_{0};
+ bool first_{true};
+ std::unique_ptr<ConsumerSocket> consumer_socket_;
+ std::unique_ptr<ProducerSocket> producer_socket_;
+ };
- void readDataAvailable(std::size_t length) noexcept override {}
+ public:
+ explicit Impl(const hiperf::ClientConfiguration &conf)
+ : config_(conf), signals_(io_service_) {}
- void readBufferAvailable(
- std::unique_ptr<utils::MemBuf> &&buffer) noexcept override {}
+ virtual ~Impl() = default;
- size_t maxBufferSize() const override {
- return client_.configuration_.receive_buffer_size_;
+ int setup() {
+ int ret = ensureFlows(config_.name_, config_.parallel_flows_);
+ if (ret != ERROR_SUCCESS) {
+ return ret;
}
- void readError(const std::error_code ec) noexcept override {
- std::cerr << "Error " << ec.message() << " while reading from socket"
- << std::endl;
- client_.io_service_.stop();
- }
+ consumer_contexts_.reserve(config_.parallel_flows_);
+ for (uint32_t i = 0; i < config_.parallel_flows_; i++) {
+ auto &ctx = consumer_contexts_.emplace_back(*this, i);
+ ret = ctx.setup();
- void readSuccess(std::size_t total_size) noexcept override {
- Time t2 = std::chrono::steady_clock::now();
- TimeDuration dt =
- std::chrono::duration_cast<TimeDuration>(t2 - client_.t_download_);
- long usec = (long)dt.count();
+ if (ret) {
+ break;
+ }
+ }
- std::cout << "Content retrieved. Size: " << total_size << " [Bytes]"
- << std::endl;
+ return ret;
+ }
- std::cerr << "Elapsed Time: " << usec / 1000000.0 << " seconds -- "
- << (total_size * 8) * 1.0 / usec * 1.0 << " [Mbps]"
- << std::endl;
+ int run() {
+ signals_.add(SIGINT);
+ signals_.async_wait(
+ [this](const std::error_code &, const int &) { io_service_.stop(); });
- client_.io_service_.stop();
+ for (auto &consumer_context : consumer_contexts_) {
+ consumer_context.run();
}
- private:
- Impl &client_;
- };
+ io_service_.run();
+
+ return ERROR_SUCCESS;
+ }
- hiperf::ClientConfiguration configuration_;
- Time t_stats_;
- Time t_download_;
- uint32_t total_duration_milliseconds_;
- uint64_t old_bytes_value_;
- uint64_t old_interest_tx_value_;
- uint64_t old_fec_interest_tx_value_;
- uint64_t old_fec_data_rx_value_;
- uint64_t old_lost_data_value_;
- uint64_t old_bytes_recovered_value_;
- uint64_t old_definitely_lost_data_value_;
- uint32_t old_retx_value_;
- uint32_t old_sent_int_value_;
- uint32_t old_received_nacks_value_;
- uint32_t old_fec_pkt_;
-
- // IMPORTANT: to be used only for performance testing, when consumer and
- // producer are synchronized. Used for rtc only at the moment
- double avg_data_delay_;
- uint32_t delay_sample_;
-
- uint32_t received_bytes_;
- uint32_t received_data_pkt_;
-
- std::string data_delays_;
+ ClientConfiguration &getConfig() { return config_; }
+ private:
asio::io_service io_service_;
+ hiperf::ClientConfiguration config_;
asio::signal_set signals_;
- RTCCallback rtc_callback_;
- Callback callback_;
- std::unique_ptr<ConsumerSocket> consumer_socket_;
- std::unique_ptr<ProducerSocket> producer_socket_;
- asio::ip::udp::socket socket_;
- asio::ip::udp::endpoint remote_;
-
- ForwarderConfiguration config_;
- uint16_t switch_threshold_; /* ms */
- bool done_;
- std::vector<ForwarderInterface::RouteInfoPtr> main_routes_;
- std::vector<ForwarderInterface::RouteInfoPtr> backup_routes_;
-#ifdef FORWARDER_INTERFACE
- ForwarderInterface forwarder_interface_;
-#endif
+ std::vector<ConsumerContext> consumer_contexts_;
};
-HIperfClient::HIperfClient(const ClientConfiguration &conf) {
- impl_ = new Impl(conf);
-}
+HIperfClient::HIperfClient(const ClientConfiguration &conf)
+ : impl_(std::make_unique<Impl>(conf)) {}
-HIperfClient::~HIperfClient() { delete impl_; }
+HIperfClient::~HIperfClient() = default;
-int HIperfClient::setup() { return impl_->setup(); }
+int HIperfClient::setup() const { return impl_->setup(); }
-void HIperfClient::run() { impl_->run(); }
+void HIperfClient::run() const { impl_->run(); }
} // namespace hiperf
diff --git a/apps/hiperf/src/client.h b/apps/hiperf/src/client.h
index f45b9af43..beecbd473 100644
--- a/apps/hiperf/src/client.h
+++ b/apps/hiperf/src/client.h
@@ -16,19 +16,21 @@
#pragma once
#include <common.h>
+#include <hicn/transport/utils/noncopyable.h>
namespace hiperf {
-class HIperfClient {
+class HIperfClient : public ::utils::NonCopyable {
public:
- HIperfClient(const ClientConfiguration &conf);
+ explicit HIperfClient(const ClientConfiguration &conf);
+
~HIperfClient();
- int setup();
- void run();
+ int setup() const;
+ void run() const;
private:
class Impl;
- Impl *impl_;
+ std::unique_ptr<Impl> impl_;
};
-} // namespace hiperf \ No newline at end of file
+} // namespace hiperf
diff --git a/apps/hiperf/src/common.h b/apps/hiperf/src/common.h
index e6ba526f9..0f96bef1f 100644
--- a/apps/hiperf/src/common.h
+++ b/apps/hiperf/src/common.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Copyright (c) 2021-2022 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -15,27 +15,28 @@
#pragma once
-#include <hicn/transport/auth/identity.h>
#include <hicn/transport/auth/signer.h>
#include <hicn/transport/config.h>
#include <hicn/transport/core/content_object.h>
#include <hicn/transport/core/interest.h>
#include <hicn/transport/interfaces/global_conf_interface.h>
-#include <hicn/transport/interfaces/p2psecure_socket_consumer.h>
-#include <hicn/transport/interfaces/p2psecure_socket_producer.h>
#include <hicn/transport/interfaces/socket_consumer.h>
#include <hicn/transport/interfaces/socket_producer.h>
#include <hicn/transport/utils/chrono_typedefs.h>
+#include <hicn/transport/utils/color.h>
#include <hicn/transport/utils/literals.h>
#ifndef _WIN32
#include <hicn/transport/utils/daemonizator.h>
#endif
+#include <hicn/apps/utils/logger.h>
+
#include <asio.hpp>
#include <cmath>
#include <fstream>
#include <iomanip>
+#include <iostream>
#include <sstream>
#include <string>
#include <unordered_set>
@@ -43,43 +44,151 @@
#ifndef ERROR_SUCCESS
#define ERROR_SUCCESS 0
#endif
-#define ERROR_SETUP -5
-#define MIN_PROBE_SEQ 0xefffffff
-
-using namespace transport::interface;
-using namespace transport::auth;
-using namespace transport::core;
-
-static inline uint64_t _ntohll(const uint64_t *input) {
- uint64_t return_val;
- uint8_t *tmp = (uint8_t *)&return_val;
-
- tmp[0] = *input >> 56;
- tmp[1] = *input >> 48;
- tmp[2] = *input >> 40;
- tmp[3] = *input >> 32;
- tmp[4] = *input >> 24;
- tmp[5] = *input >> 16;
- tmp[6] = *input >> 8;
- tmp[7] = *input >> 0;
-
- return return_val;
-}
+static constexpr int ERROR_SETUP = -5;
+static constexpr uint32_t MIN_PROBE_SEQ = 0xefffffff;
+static constexpr uint32_t RTC_HEADER_SIZE = 12;
+static constexpr uint32_t FEC_HEADER_MAX_SIZE = 36;
+static constexpr uint32_t HIPERF_MTU = 1500;
+
+namespace hiperf {
+
+using transport::core::Packet;
+using transport::core::Prefix;
+
+/**
+ * Logger
+ */
+template <typename D, typename ConfType, typename ParentType>
+class Base : public std::stringbuf, public std::ostream {
+ protected:
+ static inline const char separator[] = "| ";
+
+ Base(ParentType &parent, asio::io_service &io_service, int identifier)
+ : std::stringbuf(),
+ std::ostream(this),
+ parent_(parent),
+ configuration_(parent_.getConfig()),
+ io_service_(io_service),
+ identifier_(identifier),
+ name_id_(D::getContextType() + std::to_string(identifier_)),
+ flow_name_(configuration_.name_.makeNameWithIndex(identifier_)) {
+ std::stringstream begin;
+ std::stringstream end;
+ if (configuration_.colored_) {
+ begin << color_mod_ << bold_mod_;
+ end << end_mod_;
+ } else {
+ begin << "";
+ end << "";
+ }
+
+ begin << "|" << name_id_ << separator;
+ begin_ = begin.str();
+ end_ = end.str();
+ }
+
+ Base(Base &&other) noexcept
+ : parent_(other.parent_),
+ configuration_(other.configuration_),
+ io_service_(other.io_service_),
+ identifier_(other.identifier_),
+ name_id_(std::move(other.name_id_)),
+ flow_name_(other.flow_name_) {}
+
+ ~Base() {}
+
+ /***************************************************************
+ * std::stringbuf sync override
+ ***************************************************************/
+
+ int sync() override {
+ auto string = str();
+ asio::post(io_service_,
+ [this, string]() { LoggerInfo() << begin_ << string << end_; });
+ str("");
+
+ return 0;
+ }
+
+ std::ostream &getOutputStream() { return *this; }
-static inline uint64_t _htonll(const uint64_t *input) {
- return (_ntohll(input));
+ // Members initialized by the constructor
+ ParentType &parent_;
+ ConfType &configuration_;
+ asio::io_service &io_service_;
+ int identifier_;
+ std::string name_id_;
+ transport::core::Name flow_name_;
+ std::string begin_;
+ std::string end_;
+
+ // Members initialized by the in-class initializer
+ utils::ColorModifier color_mod_;
+ utils::ColorModifier bold_mod_{utils::ColorModifier::Code::BOLD};
+ utils::ColorModifier end_mod_{utils::ColorModifier::Code::RESET};
+};
+
+static inline int ensureFlows(const Prefix &prefix, std::size_t flows) {
+ int ret = ERROR_SUCCESS;
+
+ // Make sure the provided prefix length not allows to accomodate the
+ // provided number of flows.
+ uint16_t max_ip_addr_len_bits;
+ uint16_t log2_n_flow;
+ u64 max_n_flow;
+ if (prefix.getAddressFamily() == AF_INET) {
+ max_ip_addr_len_bits = IPV4_ADDR_LEN_BITS;
+ } else if (prefix.getAddressFamily() == AF_INET6) {
+ max_ip_addr_len_bits = IPV6_ADDR_LEN_BITS;
+ } else {
+ LoggerErr() << "Error: unknown address family.";
+ ret = ERROR_SETUP;
+ goto END;
+ }
+
+ log2_n_flow = max_ip_addr_len_bits - prefix.getPrefixLength();
+ max_n_flow = log2_n_flow < 64 ? (1 << log2_n_flow) : ~0ULL;
+
+ if (flows > max_n_flow) {
+ LoggerErr() << "Error: the provided prefix length does not allow to "
+ "accomodate the provided number of flows ("
+ << flows << " > " << max_n_flow << ").";
+ ret = ERROR_SETUP;
+ }
+
+END:
+ return ret;
}
-namespace hiperf {
+/**
+ * Class to retrieve the maximum payload size given the MTU and packet headers.
+ */
+class PayloadSize {
+ public:
+ PayloadSize(Packet::Format format, std::size_t mtu = HIPERF_MTU)
+ : mtu_(mtu), format_(format) {}
+
+ std::size_t getPayloadSizeMax(std::size_t transport_size = 0,
+ std::size_t fec_size = 0,
+ std::size_t signature_size = 0) {
+ return mtu_ - Packet::getHeaderSizeFromFormat(format_, signature_size) -
+ transport_size - fec_size;
+ }
+
+ private:
+ std::size_t mtu_;
+ Packet::Format format_;
+};
/**
* Class for handling the production rate for the RTC producer.
*/
class Rate {
public:
- Rate() : rate_kbps_(0) {}
+ Rate() {}
+ ~Rate() {}
- Rate(const std::string &rate) {
+ explicit Rate(const std::string &rate) {
std::size_t found = rate.find("kbps");
if (found != std::string::npos) {
rate_kbps_ = std::stof(rate.substr(0, found));
@@ -107,7 +216,7 @@ class Rate {
}
private:
- float rate_kbps_;
+ float rate_kbps_ = 0.0;
};
struct packet_t {
@@ -115,103 +224,71 @@ struct packet_t {
uint32_t size;
};
+struct Configuration {
+ Prefix name_{"b001::abcd/64"};
+ std::string passphrase_;
+ std::string aggr_interest_passphrase_;
+ bool rtc_{false};
+ uint16_t port_{0};
+ bool aggregated_data_{false};
+ Packet::Format packet_format_{
+ transport::interface::default_values::packet_format};
+ uint32_t parallel_flows_{1};
+ bool colored_{true};
+};
+
/**
* Container for command line configuration for hiperf client.
*/
-struct ClientConfiguration {
- ClientConfiguration()
- : name("b001::abcd", 0),
- beta(-1.f),
- drop_factor(-1.f),
- window(-1),
- producer_certificate(""),
- passphrase(""),
- receive_buffer(nullptr),
- receive_buffer_size_(128 * 1024),
- download_size(0),
- report_interval_milliseconds_(1000),
- transport_protocol_(CBR),
- rtc_(false),
- test_mode_(false),
- relay_(false),
- secure_(false),
- producer_prefix_(),
- interest_lifetime_(500),
- relay_name_("c001::abcd/64"),
- output_stream_mode_(false),
- port_(0) {}
-
- Name name;
- double beta;
- double drop_factor;
- double window;
- std::string producer_certificate;
- std::string passphrase;
- std::shared_ptr<utils::MemBuf> receive_buffer;
- std::size_t receive_buffer_size_;
- std::size_t download_size;
- std::uint32_t report_interval_milliseconds_;
- TransportProtocolAlgorithms transport_protocol_;
- bool rtc_;
- bool test_mode_;
- bool relay_;
- bool secure_;
+struct ClientConfiguration : Configuration {
+ double beta_{-1.f};
+ double drop_factor_{-1.f};
+ double window_{-1.f};
+ std::string producer_certificate_;
+ std::size_t receive_buffer_size_{128 * 1024};
+ std::uint32_t report_interval_milliseconds_{1000};
+ transport::interface::TransportProtocolAlgorithms transport_protocol_{
+ transport::interface::CBR};
+ bool test_mode_{false};
+ bool relay_{false};
Prefix producer_prefix_;
- uint32_t interest_lifetime_;
- Prefix relay_name_;
- bool output_stream_mode_;
- uint16_t port_;
+ uint32_t interest_lifetime_{500};
+ uint32_t manifest_factor_relevant_{100};
+ uint32_t manifest_factor_alert_{20};
+ Prefix relay_name_{"c001::abcd/64"};
+ bool output_stream_mode_{false};
+ uint32_t recovery_strategy_{4};
+ bool print_headers_{true};
+ std::uint32_t nb_iterations_{
+ std::numeric_limits<decltype(nb_iterations_)>::max()};
+ bool content_sharing_mode_{false};
+ bool aggregated_interests_{false};
};
/**
* Container for command line configuration for hiperf server.
*/
-struct ServerConfiguration {
- ServerConfiguration()
- : name("b001::abcd/64"),
- virtual_producer(true),
- manifest(false),
- live_production(false),
- content_lifetime(600000000_U32),
- download_size(20 * 1024 * 1024),
- hash_algorithm(CryptoHashType::SHA256),
- keystore_name(""),
- passphrase(""),
- keystore_password("cisco"),
- multiphase_produce_(false),
- rtc_(false),
- interactive_(false),
- trace_based_(false),
- trace_index_(0),
- trace_file_(nullptr),
- production_rate_(std::string("2048kbps")),
- payload_size_(1400),
- secure_(false),
- input_stream_mode_(false),
- port_(0) {}
-
- Prefix name;
- bool virtual_producer;
- bool manifest;
- bool live_production;
- std::uint32_t content_lifetime;
- std::uint32_t download_size;
- CryptoHashType hash_algorithm;
- std::string keystore_name;
- std::string passphrase;
- std::string keystore_password;
- bool multiphase_produce_;
- bool rtc_;
- bool interactive_;
- bool trace_based_;
- std::uint32_t trace_index_;
- char *trace_file_;
- Rate production_rate_;
- std::size_t payload_size_;
- bool secure_;
- bool input_stream_mode_;
- uint16_t port_;
+struct ServerConfiguration : Configuration {
+ bool virtual_producer_{true};
+ std::uint32_t manifest_max_capacity_{0};
+ bool live_production_{false};
+ std::uint32_t content_lifetime_{
+ transport::interface::default_values::content_object_expiry_time};
+ std::uint32_t download_size_{20 * 1024 * 1024};
+ transport::auth::CryptoHashType hash_algorithm_{
+ transport::auth::CryptoHashType::SHA256};
+ std::string keystore_name_;
+ std::string keystore_password_{"cisco"};
+ bool multiphase_produce_{false};
+ bool interactive_{false};
+ bool trace_based_{false};
+ std::uint32_t trace_index_{0};
+ char *trace_file_{nullptr};
+ Rate production_rate_{"2048kbps"};
+ std::size_t payload_size_{1384};
+ bool input_stream_mode_{false};
std::vector<struct packet_t> trace_;
+ std::string fec_type_;
};
} // namespace hiperf
diff --git a/apps/hiperf/src/forwarder_config.h b/apps/hiperf/src/forwarder_config.h
deleted file mode 100644
index 655ac3b66..000000000
--- a/apps/hiperf/src/forwarder_config.h
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Copyright (c) 2021 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <string>
-#include <unordered_map>
-#include <vector>
-
-namespace hiperf {
-
-struct ListenerConfig {
- std::string address;
- std::uint16_t port;
- std::string interface;
- std::string name;
-};
-
-struct ConnectorConfig {
- std::string local_address;
- std::uint16_t local_port;
- std::string remote_address;
- std::uint16_t remote_port;
- std::string interface;
- std::string name;
-};
-
-struct RouteConfig {
- std::string prefix;
- uint16_t weight;
- std::string main_connector;
- std::string backup_connector;
- std::string name;
-};
-
-class ForwarderConfiguration {
- public:
- ForwarderConfiguration() : n_threads_(1) {}
-
- bool empty() {
- return listeners_.empty() && connectors_.empty() && routes_.empty();
- }
-
- ForwarderConfiguration &setThreadNumber(std::size_t threads) {
- n_threads_ = threads;
- return *this;
- }
-
- std::size_t getThreadNumber() { return n_threads_; }
-
- template <typename... Args>
- ForwarderConfiguration &addListener(Args &&...args) {
- listeners_.emplace_back(std::forward<Args>(args)...);
- return *this;
- }
-
- template <typename... Args>
- ForwarderConfiguration &addConnector(const std::string &name,
- Args &&...args) {
- connectors_.emplace(name, std::forward<Args>(args)...);
- return *this;
- }
-
- template <typename... Args>
- ForwarderConfiguration &addRoute(Args &&...args) {
- routes_.emplace_back(std::forward<Args>(args)...);
- return *this;
- }
-
- std::vector<ListenerConfig> &getListeners() { return listeners_; }
-
- std::unordered_map<std::string, ConnectorConfig> &getConnectors() {
- return connectors_;
- }
-
- std::vector<RouteConfig> &getRoutes() { return routes_; }
-
- private:
- std::vector<ListenerConfig> listeners_;
- std::unordered_map<std::string, ConnectorConfig> connectors_;
- std::vector<RouteConfig> routes_;
- std::size_t n_threads_;
-};
-
-} \ No newline at end of file
diff --git a/apps/hiperf/src/forwarder_interface.cc b/apps/hiperf/src/forwarder_interface.cc
deleted file mode 100644
index 864208239..000000000
--- a/apps/hiperf/src/forwarder_interface.cc
+++ /dev/null
@@ -1,676 +0,0 @@
-/*
- * Copyright (c) 2021 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <arpa/inet.h>
-#include <forwarder_interface.h>
-#include <hicn/transport/utils/log.h>
-
-#include <chrono>
-#include <iostream>
-#include <thread>
-#include <unordered_set>
-
-extern "C" {
-#include <hicn/error.h>
-#include <hicn/util/ip_address.h>
-}
-
-// XXX the main listener should be retrieve in this class at initialization, aka
-// when hICN becomes avialable
-//
-// XXX the main listener port will be retrieved in the forwarder
-// interface... everything else will be delayed until we have this
-// information
-
-namespace hiperf {
-
-ForwarderInterface::ForwarderInterface(asio::io_service &io_service,
- ICallback *callback)
- : external_ioservice_(io_service),
- forwarder_interface_callback_(callback),
- work_(std::make_unique<asio::io_service::work>(internal_ioservice_)),
- sock_(nullptr),
- thread_(std::make_unique<std::thread>([this]() {
- std::cout << "Starting Forwarder Interface thread" << std::endl;
- internal_ioservice_.run();
- std::cout << "Stopping Forwarder Interface thread" << std::endl;
- })),
- // set_route_callback_(std::forward<Callback &&>(setRouteCallback)),
- check_routes_timer_(nullptr),
- pending_add_route_counter_(0),
- hicn_listen_port_(9695),
- /* We start in disabled state even when a forwarder is always available */
- state_(State::Disabled),
- timer_(io_service),
- num_reattempts(0) {
- std::cout << "Forwarder interface created... connecting to forwarder...\n";
- internal_ioservice_.post([this]() { onHicnServiceAvailable(true); });
-}
-
-ForwarderInterface::~ForwarderInterface() {
- if (thread_ && thread_->joinable()) {
- internal_ioservice_.dispatch([this]() {
- if (sock_) {
- hc_sock_free(sock_);
- sock_ = nullptr;
- }
-
- work_.reset();
- });
-
- thread_->join();
- }
-
- std::cout << "ForwarderInterface::~ForwarderInterface" << std::endl;
-}
-
-void ForwarderInterface::onHicnServiceAvailable(bool flag) {
- if (flag) {
- switch (state_) {
- case State::Disabled:
- case State::Requested:
- state_ = State::Available;
- case State::Available:
- connectToForwarder();
- /* Synchronous */
- if (state_ != State::Connected) {
- std::cout << "ConnectToForwarder failed" << std::endl;
- goto REATTEMPT;
- }
- state_ = State::Ready;
-
- std::cout << "Connected to forwarder... cancelling reconnection timer"
- << std::endl;
- timer_.cancel();
- num_reattempts = 0;
-
- // case State::Connected:
- // checkListener();
-
- // if (state_ != State::Ready) {
- // std::cout << "Listener not found" << std::endl;
- // goto REATTEMPT;
- // }
- // state_ = State::Ready;
-
- // timer_.cancel();
- // num_reattempts = 0;
-
- std::cout << "Forwarder interface is ready... communicate to controller"
- << std::endl;
-
- forwarder_interface_callback_->onHicnServiceReady();
-
- case State::Ready:
- break;
- }
- } else {
- if (sock_) {
- hc_sock_free(sock_);
- sock_ = nullptr;
- }
- state_ = State::Disabled; // XXX to be checked upon callback to prevent the
- // state from going forward (used to manage
- // concurrency)
- }
- return;
-
-REATTEMPT:
- /* Schedule reattempt */
- std::cout << "Failed to connect, scheduling reattempt" << std::endl;
- num_reattempts++;
-
- timer_.expires_from_now(
- std::chrono::milliseconds(ForwarderInterface::REATTEMPT_DELAY_MS));
- // timer_.async_wait(std::bind(&ForwarderInterface::onHicnServiceAvailable,
- // this, flag, std::placeholders::_1));
- timer_.async_wait([this, flag](std::error_code ec) {
- if (ec) return;
- onHicnServiceAvailable(flag);
- });
-}
-
-int ForwarderInterface::connectToForwarder() {
- sock_ = hc_sock_create();
- if (!sock_) {
- std::cout << "Could not create socket" << std::endl;
- goto ERR_SOCK;
- }
-
- if (hc_sock_connect(sock_) < 0) {
- std::cout << "Could not connect to forwarder" << std::endl;
- goto ERR;
- }
-
- std::cout << "Forwarder interface connected" << std::endl;
- state_ = State::Connected;
- return 0;
-
-ERR:
- hc_sock_free(sock_);
- sock_ = nullptr;
-ERR_SOCK:
- return -1;
-}
-
-int ForwarderInterface::checkListener() {
- if (!sock_) return -1;
-
- hc_data_t *data;
- if (hc_listener_list(sock_, &data) < 0) return -1;
-
- int ret = -1;
- foreach_listener(l, data) {
- std::string interface = std::string(l->interface_name);
- if (interface.compare("lo") != 0) {
- hicn_listen_port_ = l->local_port;
- state_ = State::Ready;
- ret = 0;
- std::cout << "Got listener port" << std::endl;
- break;
- }
- }
-
- hc_data_free(data);
- return ret;
-}
-
-void ForwarderInterface::close() {
- std::cout << "ForwarderInterface::close" << std::endl;
-
- state_ = State::Disabled;
- /* Cancelling eventual reattempts */
- timer_.cancel();
-
- if (sock_) {
- hc_sock_free(sock_);
- sock_ = nullptr;
- }
-
- internal_ioservice_.post([this]() { work_.reset(); });
-
- if (thread_->joinable()) {
- thread_->join();
- }
-}
-
-#if 0
-void ForwarderInterface::enableCheckRoutesTimer() {
- if (check_routes_timer_ != nullptr) return;
-
- check_routes_timer_ =
- std::make_unique<asio::steady_timer>(internal_ioservice_);
- checkRoutesLoop();
-}
-
-void ForwarderInterface::removeConnectedUserNow(ProtocolPtr protocol) {
- internalRemoveConnectedUser(protocol);
-}
-
-void ForwarderInterface::scheduleRemoveConnectedUser(ProtocolPtr protocol) {
- internal_ioservice_.post(
- [this, protocol]() { internalRemoveConnectedUser(protocol); });
-}
-#endif
-
-void ForwarderInterface::createFaceAndRoute(const RouteInfoPtr &route_info) {
- std::vector<RouteInfoPtr> routes;
- routes.push_back(std::move(route_info));
- createFaceAndRoutes(routes);
-}
-
-void ForwarderInterface::createFaceAndRoutes(
- const std::vector<RouteInfoPtr> &routes_info) {
- pending_add_route_counter_++;
- auto timer = new asio::steady_timer(internal_ioservice_);
- internal_ioservice_.post([this, routes_info, timer]() {
- internalCreateFaceAndRoutes(routes_info, ForwarderInterface::MAX_REATTEMPT,
- timer);
- });
-}
-
-void ForwarderInterface::deleteFaceAndRoute(const RouteInfoPtr &route_info) {
- std::vector<RouteInfoPtr> routes;
- routes.push_back(std::move(route_info));
- deleteFaceAndRoutes(routes);
-}
-
-void ForwarderInterface::deleteFaceAndRoutes(
- const std::vector<RouteInfoPtr> &routes_info) {
- internal_ioservice_.post([this, routes_info]() {
- for (auto &route : routes_info) {
- internalDeleteFaceAndRoute(route);
- }
- });
-}
-
-void ForwarderInterface::internalDeleteFaceAndRoute(
- const RouteInfoPtr &route_info) {
- if (!sock_) return;
-
- hc_data_t *data;
- if (hc_route_list(sock_, &data) < 0) return;
-
- std::vector<hc_route_t *> routes_to_remove;
- foreach_route(r, data) {
- char remote_addr[INET6_ADDRSTRLEN];
- int ret = ip_address_ntop(&r->remote_addr, remote_addr, r->len, r->family);
- if (ret < 0) continue;
-
- std::string route_addr(remote_addr);
- if (route_addr.compare(route_info->route_addr) == 0 &&
- r->len == route_info->route_len) {
- // route found
- routes_to_remove.push_back(r);
- }
- }
-
- if (routes_to_remove.size() == 0) {
- // nothing to do here
- hc_data_free(data);
- return;
- }
-
- std::unordered_set<uint32_t> connids_to_remove;
- for (unsigned i = 0; i < routes_to_remove.size(); i++) {
- connids_to_remove.insert(routes_to_remove[i]->face_id);
- if (hc_route_delete(sock_, routes_to_remove[i]) < 0) {
- std::cout << "Error removing route from forwarder." << std::endl;
- }
- }
-
- // remove connection
- if (hc_connection_list(sock_, &data) < 0) {
- hc_data_free(data);
- return;
- }
-
- // collects pointerst to the connections using the conn IDs
- std::vector<hc_connection_t *> conns_to_remove;
- foreach_connection(c, data) {
- if (connids_to_remove.find(c->id) != connids_to_remove.end()) {
- // conn found
- conns_to_remove.push_back(c);
- }
- }
-
- if (conns_to_remove.size() == 0) {
- // nothing else to do here
- hc_data_free(data);
- return;
- }
-
- for (unsigned i = 0; i < conns_to_remove.size(); i++) {
- if (hc_connection_delete(sock_, conns_to_remove[i]) < 0) {
- std::cout << "Error removing connection from forwarder." << std::endl;
- }
- }
-
- hc_data_free(data);
-}
-
-void ForwarderInterface::internalCreateFaceAndRoutes(
- const std::vector<RouteInfoPtr> &route_info, uint8_t max_try,
- asio::steady_timer *timer) {
- uint32_t face_id;
-
- std::vector<RouteInfoPtr> failed;
- for (auto &route : route_info) {
- int ret = tryToCreateFace(route.get(), &face_id);
- if (ret >= 0) {
- auto ret = tryToCreateRoute(route.get(), face_id);
- if (ret < 0) {
- failed.push_back(route);
- std::cerr << "Error creating route and face" << std::endl;
- continue;
- }
- }
- }
-
- if (failed.size() > 0) {
- if (max_try == 0) {
- /* All attempts failed */
- goto RESULT;
- }
- max_try--;
- timer->expires_from_now(std::chrono::milliseconds(500));
- timer->async_wait([this, failed, max_try, timer](std::error_code ec) {
- if (ec) return;
- internalCreateFaceAndRoutes(failed, max_try, timer);
- });
- return;
- }
-
-#if 0
- // route_status_[protocol] = std::move(route_info);
- for (size_t i = 0; i < route_info.size(); i++) {
- route_status_.insert(
- std::pair<ClientId, RouteInfoPtr>(protocol, std::move(route_info[i])));
- }
-#endif
-
-RESULT:
- std::cout << "Face / Route create ok, now calling back protocol" << std::endl;
- pending_add_route_counter_--;
- external_ioservice_.post([this, r = std::move(route_info)]() mutable {
- forwarder_interface_callback_->onRouteConfigured(r);
- });
- delete timer;
-}
-
-int ForwarderInterface::tryToCreateFace(RouteInfo *route_info,
- uint32_t *face_id) {
- bool found = false;
-
- // check connection with the forwarder
- if (!sock_) {
- std::cout << "[ForwarderInterface::tryToCreateFace] socket error"
- << std::endl;
- goto ERR_SOCK;
- }
-
- // get listeners list
- hc_data_t *data;
- if (hc_listener_list(sock_, &data) < 0) {
- std::cout << "[ForwarderInterface::tryToCreateFace] cannot list listeners";
- goto ERR_LIST;
- }
-
- char _local_address[128];
- foreach_listener(l, data) {
- std::cout << "Processing " << l->interface_name << std::endl;
- std::string interface = std::string(l->interface_name);
- int ret = ip_address_ntop(&l->local_addr, _local_address, 128, AF_INET);
- if (ret < 0) {
- std::cerr << "Error in ip_address_ntop" << std::endl;
- goto ERR;
- }
-
- std::string local_address = std::string(_local_address);
- uint16_t local_port = l->local_port;
-
- if (interface.compare(route_info->interface) == 0 &&
- local_address.compare(route_info->local_addr) == 0 &&
- local_port == route_info->local_port) {
- found = true;
- break;
- }
- }
-
- std::cout << route_info->remote_addr << std::endl;
-
- ip_address_t local_address, remote_address;
- ip_address_pton(route_info->local_addr.c_str(), &local_address);
- ip_address_pton(route_info->remote_addr.c_str(), &remote_address);
-
- if (!found) {
- // Create listener
- hc_listener_t listener;
- memset(&listener, 0, sizeof(hc_listener_t));
-
- std::string name = "l_" + route_info->name;
- listener.local_addr = local_address;
- listener.type = CONNECTION_TYPE_UDP;
- listener.family = AF_INET;
- listener.local_port = route_info->local_port;
- strncpy(listener.name, name.c_str(), sizeof(listener.name));
- strncpy(listener.interface_name, route_info->interface.c_str(),
- sizeof(listener.interface_name));
-
- std::cout << "------------> " << route_info->interface << std::endl;
-
- int ret = hc_listener_create(sock_, &listener);
-
- if (ret < 0) {
- std::cerr << "Error creating listener." << std::endl;
- return -1;
- } else {
- std::cout << "Listener " << listener.id << " created." << std::endl;
- }
- }
-
- // Create face
- hc_face_t face;
- memset(&face, 0, sizeof(hc_face_t));
-
- // crate face with the local interest
- face.face.type = FACE_TYPE_UDP;
- face.face.family = route_info->family;
- face.face.local_addr = local_address;
- face.face.remote_addr = remote_address;
- face.face.local_port = route_info->local_port;
- face.face.remote_port = route_info->remote_port;
-
- if (netdevice_set_name(&face.face.netdevice, route_info->interface.c_str()) <
- 0) {
- std::cout << "[ForwarderInterface::tryToCreateFaceAndRoute] "
- "netdevice_set_name "
- "("
- << face.face.netdevice.name << ", "
- << route_info->interface << ") error" << std::endl;
- goto ERR;
- }
-
- // create face
- if (hc_face_create(sock_, &face) < 0) {
- std::cout << "[ForwarderInterface::tryToCreateFace] error creating face";
- goto ERR;
- }
-
- std::cout << "Face created successfully" << std::endl;
-
- // assing face to the return value
- *face_id = face.id;
-
- hc_data_free(data);
- return 0;
-
-ERR:
- hc_data_free(data);
-ERR_LIST:
-ERR_SOCK:
- return -1;
-}
-
-int ForwarderInterface::tryToCreateRoute(RouteInfo *route_info,
- uint32_t face_id) {
- std::cout << "Trying to create route" << std::endl;
-
- // check connection with the forwarder
- if (!sock_) {
- std::cout << "[ForwarderInterface::tryToCreateRoute] socket error";
- return -1;
- }
-
- ip_address_t route_ip;
- hc_route_t route;
-
- if (ip_address_pton(route_info->route_addr.c_str(), &route_ip) < 0) {
- std::cout << "[ForwarderInterface::tryToCreateRoute] ip_address_pton error";
- return -1;
- }
-
- route.face_id = face_id;
- route.family = AF_INET6;
- route.remote_addr = route_ip;
- route.len = route_info->route_len;
- route.cost = 1;
-
- if (hc_route_create(sock_, &route) < 0) {
- std::cout << "[ForwarderInterface::tryToCreateRoute] error creating route";
- return -1;
- }
-
- std::cout << "[ForwarderInterface::tryToCreateRoute] OK" << std::endl;
- return 0;
-}
-
-#if 0 // not used
-void ForwarderInterface::checkRoutesLoop() {
- check_routes_timer_->expires_from_now(std::chrono::milliseconds(1000));
- check_routes_timer_->async_wait([this](std::error_code ec) {
- if (ec) return;
- if (pending_add_route_counter_ == 0) checkRoutes();
- });
-}
-
-void ForwarderInterface::checkRoutes() {
- std::cout << "someone called the checkRoutes function" << std::endl;
- if (!sock_) return;
-
- hc_data_t *data;
- if (hc_route_list(sock_, &data) < 0) {
- return;
- }
-
- std::unordered_set<std::string> routes_set;
- foreach_route(r, data) {
- char remote_addr[INET6_ADDRSTRLEN];
- int ret = ip_address_ntop(&r->remote_addr, remote_addr, r->len, r->family);
- if (ret < 0) continue;
- std::string route(std::string(remote_addr) + "/" + std::to_string(r->len));
- routes_set.insert(route);
- }
-
- for (auto it = route_status_.begin(); it != route_status_.end(); it++) {
- std::string route(it->second->route_addr + "/" +
- std::to_string(it->second->route_len));
- if (routes_set.find(route) == routes_set.end()) {
- // the route is missing
- createFaceAndRoute(it->second, it->first);
- break;
- }
- }
-
- hc_data_free(data);
-}
-#endif
-
-#if 0
- using ListenerRetrievedCallback =
- std::function<void(std::error_code, uint32_t)>;
-
- ListenerRetrievedCallback listener_retrieved_callback_;
-
-#ifdef __ANDROID__
- hicn_listen_port_(9695),
-#else
- hicn_listen_port_(0),
-#endif
- timer_(forward_engine_.getIoService()),
-
- void initConfigurationProtocol(void)
- {
- // We need the configuration, which is different for every protocol...
- // so we move this step down towards the protocol implementation itself.
- if (!permanent_hicn) {
- doInitConfigurationProtocol();
- } else {
- // XXX This should be moved somewhere else
- getMainListener(
- [this](std::error_code ec, uint32_t hicn_listen_port) {
- if (!ec)
- {
- hicn_listen_port_ = hicn_listen_port;
- doInitConfigurationProtocol();
- }
- });
- }
- }
-
- template <typename Callback>
- void getMainListener(Callback &&callback)
- {
- listener_retrieved_callback_ = std::forward<Callback &&>(callback);
- tryToConnectToForwarder();
- }
- private:
- void doGetMainListener(std::error_code ec)
- {
- if (!ec)
- {
- // ec == 0 --> timer expired
- int ret = forwarder_interface_.getMainListenerPort();
- if (ret <= 0)
- {
- // Since without the main listener of the forwarder the proxy cannot
- // work, we can stop the program here until we get the listener port.
- std::cout <<
- "Could not retrieve main listener port from the forwarder. "
- "Retrying.";
-
- timer_.expires_from_now(std::chrono::milliseconds(RETRY_INTERVAL));
- timer_.async_wait(std::bind(&Protocol::doGetMainListener, this,
- std::placeholders::_1));
- }
- else
- {
- timer_.cancel();
- retx_count_ = 0;
- hicn_listen_port_ = uint16_t(ret);
- listener_retrieved_callback_(
- make_error_code(configuration_error::success), hicn_listen_port_);
- }
- }
- else
- {
- std::cout << "Timer for retrieving main hicn listener canceled." << std::endl;
- }
- }
-
- void tryToConnectToForwarder()
- {
- doTryToConnectToForwarder(std::make_error_code(std::errc(0)));
- }
-
- void doTryToConnectToForwarder(std::error_code ec)
- {
- if (!ec)
- {
- // ec == 0 --> timer expired
- int ret = forwarder_interface_.connect();
- if (ret < 0)
- {
- // We were not able to connect to the local forwarder. Do not give up
- // and retry.
- std::cout << "Could not connect to local forwarder. Retrying." << std::endl;
-
- timer_.expires_from_now(std::chrono::milliseconds(RETRY_INTERVAL));
- timer_.async_wait(std::bind(&Protocol::doTryToConnectToForwarder, this,
- std::placeholders::_1));
- }
- else
- {
- timer_.cancel();
- retx_count_ = 0;
- doGetMainListener(std::make_error_code(std::errc(0)));
- }
- }
- else
- {
- std::cout << "Timer for re-trying forwarder connection canceled." << std::endl;
- }
- }
-
-
- template <typename ProtocolImplementation>
- constexpr uint32_t Protocol<ProtocolImplementation>::RETRY_INTERVAL;
-
-#endif
-
-constexpr uint32_t ForwarderInterface::REATTEMPT_DELAY_MS;
-constexpr uint32_t ForwarderInterface::MAX_REATTEMPT;
-
-} // namespace hiperf \ No newline at end of file
diff --git a/apps/hiperf/src/forwarder_interface.h b/apps/hiperf/src/forwarder_interface.h
deleted file mode 100644
index 7591ea257..000000000
--- a/apps/hiperf/src/forwarder_interface.h
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Copyright (c) 2021 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-extern "C" {
-#ifndef WITH_POLICY
-#define WITH_POLICY
-#endif
-#include <hicn/ctrl/api.h>
-#include <hicn/util/ip_address.h>
-}
-
-#ifndef ASIO_STANDALONE
-#define ASIO_STANDALONE
-#endif
-#include <asio.hpp>
-
-#include <functional>
-#include <thread>
-#include <unordered_map>
-
-namespace hiperf {
-
-class ForwarderInterface {
- static const uint32_t REATTEMPT_DELAY_MS = 500;
- static const uint32_t MAX_REATTEMPT = 10;
-
- public:
- struct RouteInfo {
- int family;
- std::string local_addr;
- uint16_t local_port;
- std::string remote_addr;
- uint16_t remote_port;
- std::string route_addr;
- uint8_t route_len;
- std::string interface;
- std::string name;
- };
-
- using RouteInfoPtr = std::shared_ptr<RouteInfo>;
-
- class ICallback {
- public:
- virtual void onHicnServiceReady() = 0;
- virtual void onRouteConfigured(std::vector<RouteInfoPtr> &route_info) = 0;
- };
-
- enum class State {
- Disabled, /* Stack is stopped */
- Requested, /* Stack is starting */
- Available, /* Forwarder is running */
- Connected, /* Control socket connected */
- Ready, /* Listener present */
- };
-
- public:
- ForwarderInterface(asio::io_service &io_service, ICallback *callback);
-
- ~ForwarderInterface();
-
- State getState();
-
- void setState(State state);
-
- void onHicnServiceAvailable(bool flag);
-
- void enableCheckRoutesTimer();
-
- void createFaceAndRoutes(const std::vector<RouteInfoPtr> &routes_info);
-
- void createFaceAndRoute(const RouteInfoPtr &route_info);
-
- void deleteFaceAndRoutes(const std::vector<RouteInfoPtr> &routes_info);
-
- void deleteFaceAndRoute(const RouteInfoPtr &route_info);
-
- void close();
-
- uint16_t getHicnListenerPort() { return hicn_listen_port_; }
-
- private:
- int connectToForwarder();
-
- int checkListener();
-
- void internalCreateFaceAndRoutes(const std::vector<RouteInfoPtr> &route_info,
- uint8_t max_try, asio::steady_timer *timer);
-
- void internalDeleteFaceAndRoute(const RouteInfoPtr &routes_info);
-
- int tryToCreateFace(RouteInfo *RouteInfo, uint32_t *face_id);
- int tryToCreateRoute(RouteInfo *RouteInfo, uint32_t face_id);
-
- void checkRoutesLoop();
-
- void checkRoutes();
-
- asio::io_service &external_ioservice_;
- asio::io_service internal_ioservice_;
- ICallback *forwarder_interface_callback_;
- std::unique_ptr<asio::io_service::work> work_;
- hc_sock_t *sock_;
- std::unique_ptr<std::thread> thread_;
- // SetRouteCallback set_route_callback_;
- // std::unordered_multimap<ProtocolPtr, RouteInfoPtr> route_status_;
- std::unique_ptr<asio::steady_timer> check_routes_timer_;
- uint32_t pending_add_route_counter_;
- uint16_t hicn_listen_port_;
-
- State state_;
-
- /* Reattempt timer */
- asio::steady_timer timer_;
- unsigned num_reattempts;
-};
-
-} // namespace hiperf
diff --git a/apps/hiperf/src/main.cc b/apps/hiperf/src/main.cc
index b2d99c4a4..25c1a288c 100644
--- a/apps/hiperf/src/main.cc
+++ b/apps/hiperf/src/main.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021-2022 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -14,133 +14,184 @@
*/
#include <client.h>
+#include <hicn/apps/utils/logger.h>
#include <server.h>
-#include <forwarder_interface.h>
namespace hiperf {
+using transport::auth::CryptoHashType;
+
+static std::unordered_map<std::string, hicn_packet_format_t> const
+ packet_format_map = {{"ipv4_tcp", HICN_PACKET_FORMAT_IPV4_TCP},
+ {"ipv6_tcp", HICN_PACKET_FORMAT_IPV6_TCP},
+ {"new", HICN_PACKET_FORMAT_NEW}};
+
+std::string str_tolower(std::string s) {
+ std::transform(s.begin(), s.end(), s.begin(),
+ [](unsigned char c) { return std::tolower(c); });
+ return s;
+}
+
void usage() {
- std::cerr << "HIPERF - A tool for performing network throughput "
- "measurements with hICN"
- << std::endl;
- std::cerr << "usage: hiperf [-S|-C] [options] [prefix|name]" << std::endl;
- std::cerr << std::endl;
- std::cerr << "SERVER OR CLIENT:" << std::endl;
+ LoggerInfo() << "HIPERF - Instrumentation tool for performing active network"
+ "measurements with hICN";
+ LoggerInfo() << "usage: hiperf [-S|-C] [options] [prefix|name]";
+ LoggerInfo();
+ LoggerInfo() << "SERVER OR CLIENT:";
#ifndef _WIN32
- std::cerr << "-D\t\t\t\t\t"
- << "Run as a daemon" << std::endl;
- std::cerr << "-R\t\t\t\t\t"
- << "Run RTC protocol (client or server)" << std::endl;
- std::cerr << "-f\t<filename>\t\t\t"
- << "Log file" << std::endl;
- std::cerr << "-z\t<io_module>\t\t\t"
- << "IO module to use. Default: hicnlight_module" << std::endl;
+ LoggerInfo() << "-D\t\t\t\t\t"
+ << "Run as a daemon";
+ LoggerInfo() << "-R\t\t\t\t\t"
+ << "Run RTC protocol (client or server)";
+ LoggerInfo() << "-f\t<filename>\t\t\t"
+ << "Log file";
+ LoggerInfo() << "-z\t<io_module>\t\t\t"
+ << "IO module to use. Default: hicnlight_module";
+ LoggerInfo() << "-F\t<conf_file>\t\t\t"
+ << "Path to optional configuration file for libtransport";
+ LoggerInfo() << "-a\t\t\t\t\t"
+ << "Enables data packet aggregation. "
+ << "Works only in RTC mode";
+ LoggerInfo() << "-X\t<param>\t\t\t\t"
+ << "Set FEC params. Options are Rely_K#_N# or RS_K#_N#";
+ LoggerInfo()
+ << "-J\t<passphrase>\t\t\t"
+ << "Set the passphrase used to sign/verify aggregated interests. "
+ "If set on the client, aggregated interests are enable automatically.";
#endif
- std::cerr << std::endl;
- std::cerr << "SERVER SPECIFIC:" << std::endl;
- std::cerr << "-A\t<content_size>\t\t\t"
- "Size of the content to publish. This "
- "is not the size of the packet (see -s for it)."
- << std::endl;
- std::cerr << "-s\t<packet_size>\t\t\tSize of the payload of each data packet."
- << std::endl;
- std::cerr << "-r\t\t\t\t\t"
- << "Produce real content of <content_size> bytes" << std::endl;
- std::cerr << "-m\t\t\t\t\t"
- << "Produce transport manifest" << std::endl;
- std::cerr << "-l\t\t\t\t\t"
- << "Start producing content upon the reception of the "
- "first interest"
- << std::endl;
- std::cerr << "-K\t<keystore_path>\t\t\t"
- << "Path of p12 file containing the "
- "crypto material used for signing packets"
- << std::endl;
- std::cerr << "-k\t<passphrase>\t\t\t"
- << "String from which a 128-bit symmetric key will be "
- "derived for signing packets"
- << std::endl;
- std::cerr << "-y\t<hash_algorithm>\t\t"
- << "Use the selected hash algorithm for "
- "calculating manifest digests"
- << std::endl;
- std::cerr << "-p\t<password>\t\t\t"
- << "Password for p12 keystore" << std::endl;
- std::cerr << "-x\t\t\t\t\t"
- << "Produce a content of <content_size>, then after downloading "
- "it produce a new content of"
- << "\n\t\t\t\t\t<content_size> without resetting "
- "the suffix to 0."
- << std::endl;
- std::cerr << "-B\t<bitrate>\t\t\t"
- << "Bitrate for RTC producer, to be used with the -R option."
- << std::endl;
+ LoggerInfo();
+ LoggerInfo() << "SERVER SPECIFIC:";
+ LoggerInfo()
+ << "-A\t<content_size>\t\t\t"
+ "Sends an application data unit in bytes that is published once "
+ "before exit";
+ LoggerInfo() << "-E\t<expiry_time>\t\t\t"
+ "Expiration time for data packets generated by the producer "
+ "socket";
+ LoggerInfo() << "-s\t<packet_size>\t\t\tData packet payload size.";
+ LoggerInfo() << "-r\t\t\t\t\t"
+ << "Produce real content of <content_size> bytes";
+ LoggerInfo()
+ << "-m\t<manifest_max_capacity>\t\t"
+ << "The maximum number of entries a manifest can contain. Set it "
+ "to 0 to disable manifests. Default is 30, max is 255.";
+ LoggerInfo() << "-l\t\t\t\t\t"
+ << "Start producing content upon the reception of the "
+ "first interest";
+ LoggerInfo() << "-K\t<keystore_path>\t\t\t"
+ << "Path of p12 file containing the "
+ "crypto material used for signing packets";
+ LoggerInfo() << "-k\t<passphrase>\t\t\t"
+ << "String from which a 128-bit symmetric key will be "
+ "derived for signing packets";
+ LoggerInfo() << "-p\t<password>\t\t\t"
+ << "Password for p12 keystore";
+ LoggerInfo() << "-y\t<hash_algorithm>\t\t"
+ << "Use the selected hash algorithm for "
+ "computing manifest digests (default: SHA256)";
+ LoggerInfo() << "-x\t\t\t\t\t"
+ << "Produces application data units of size <content_size> "
+ << "without resetting the name suffix to 0.";
+ LoggerInfo() << "-B\t<bitrate>\t\t\t"
+ << "RTC producer data bitrate, to be used with the -R option.";
#ifndef _WIN32
- std::cerr << "-I\t\t\t\t\t"
- "Interactive mode, start/stop real time content production "
- "by pressing return. To be used with the -R option"
- << std::endl;
- std::cerr
+ LoggerInfo() << "-I\t\t\t\t\t"
+ "Interactive mode, start/stop real time content production "
+ "by pressing return. To be used with the -R option";
+ LoggerInfo()
<< "-T\t<filename>\t\t\t"
"Trace based mode, hiperf takes as input a file with a trace. "
"Each line of the file indicates the timestamp and the size of "
"the packet to generate. To be used with the -R option. -B and -I "
- "will be ignored."
- << std::endl;
- std::cerr << "-E\t\t\t\t\t"
- << "Enable encrypted communication. Requires the path to a p12 "
- "file containing the "
- "crypto material used for the TLS handshake"
- << std::endl;
- std::cerr << "-G\t<port>\t\t\t"
- << "input stream from localhost at the specified port" << std::endl;
+ "will be ignored.";
+ LoggerInfo() << "-G\t<port>\t\t\t\t"
+ << "Input stream from localhost at the specified port";
#endif
- std::cerr << std::endl;
- std::cerr << "CLIENT SPECIFIC:" << std::endl;
- std::cerr << "-b\t<beta_parameter>\t\t"
- << "RAAQM beta parameter" << std::endl;
- std::cerr << "-d\t<drop_factor_parameter>\t\t"
- << "RAAQM drop factor "
- "parameter"
- << std::endl;
- std::cerr << "-L\t<interest lifetime>\t\t"
- << "Set interest lifetime." << std::endl;
- std::cerr << "-M\t<input_buffer_size>\t\t"
- << "Size of consumer input buffer. If 0, reassembly of packets "
- "will be disabled."
- << std::endl;
- std::cerr << "-W\t<window_size>\t\t\t"
- << "Use a fixed congestion window "
- "for retrieving the data."
- << std::endl;
- std::cerr << "-i\t<stats_interval>\t\t"
- << "Show the statistics every <stats_interval> milliseconds."
- << std::endl;
- std::cerr << "-c\t<certificate_path>\t\t"
- << "Path of the producer certificate to be used for verifying the "
- "origin of the packets received."
- << std::endl;
- std::cerr << "-k\t<passphrase>\t\t\t"
- << "String from which is derived the symmetric key used by the "
- "producer to sign packets and by the consumer to verify them."
- << std::endl;
- std::cerr << "-t\t\t\t\t\t"
- "Test mode, check if the client is receiving the "
- "correct data. This is an RTC specific option, to be "
- "used with the -R (default false)"
- << std::endl;
- std::cerr << "-P\t\t\t\t\t"
- << "Prefix of the producer where to do the handshake" << std::endl;
- std::cerr << "-j\t<relay_name>\t\t\t"
- << "Publish the received content under the name relay_name."
- "This is an RTC specific option, to be "
- "used with the -R (default false)"
- << std::endl;
- std::cerr << "-g\t<port>\t\t\t"
- << "output stream to localhost at the specified port" << std::endl;
+ LoggerInfo();
+ LoggerInfo() << "CLIENT SPECIFIC:";
+ LoggerInfo() << "-b\t<beta_parameter>\t\t"
+ << "RAAQM beta parameter";
+ LoggerInfo() << "-d\t<drop_factor_parameter>\t\t"
+ << "RAAQM drop factor "
+ "parameter";
+ LoggerInfo() << "-L\t<interest lifetime>\t\t"
+ << "Set interest lifetime.";
+ LoggerInfo() << "-U\t<factor>\t\t\t"
+ << "Update the relevance threshold: if an unverified packet has "
+ "been received before the last U * manifest_max_capacity_ "
+ "packets received (verified or not), it will be flushed out. "
+ "Should be > 1, default is 100.";
+ LoggerInfo()
+ << "-u\t<factor>\t\t\t"
+ << "Update the alert threshold: if the "
+ "number of unverified packet is > u * manifest_max_capacity_, "
+ "an alert is raised. Should be set such that U > u >= 1, "
+ "default is 20. If u >= U, no alert will ever be raised.";
+ LoggerInfo() << "-M\t<input_buffer_size>\t\t"
+ << "Size of consumer input buffer. If 0, reassembly of packets "
+ "will be disabled.";
+ LoggerInfo()
+ << "-N\t\t\t\t\t"
+ << "Enable aggregated interests; the number of suffixes (including "
+ "the one in the header) can be set through the env variable "
+ "`MAX_AGGREGATED_INTERESTS`.";
+ LoggerInfo() << "-W\t<window_size>\t\t\t"
+ << "Use a fixed congestion window "
+ "for retrieving the data.";
+ LoggerInfo() << "-i\t<stats_interval>\t\t"
+ << "Show the statistics every <stats_interval> milliseconds.";
+ LoggerInfo()
+ << "-c\t<certificate_path>\t\t"
+ << "Path of the producer certificate to be used for verifying the "
+ "origin of the packets received.";
+ LoggerInfo()
+ << "-k\t<passphrase>\t\t\t"
+ << "String from which is derived the symmetric key used by the "
+ "producer to sign packets and by the consumer to verify them.";
+ LoggerInfo() << "-t\t\t\t\t\t"
+ "Test mode, check if the client is receiving the "
+ "correct data. This is an RTC specific option, to be "
+ "used with the -R (default: false)";
+ LoggerInfo()
+ << "-P\t\t\t\t\t"
+ << "Number of parallel streams. For hiperf client, this is the "
+ "number of consumer to create, while for hiperf server this is "
+ "the number of producers to create.";
+ LoggerInfo() << "-j\t<relay_name>\t\t\t"
+ << "Publish received content under the name relay_name."
+ "This is an RTC specific option, to be "
+ "used with the -R (default: false)";
+ LoggerInfo() << "-g\t<port>\t\t\t\t"
+ << "Output stream to localhost at the specified port";
+ LoggerInfo()
+ << "-o\t\t\t\t\t"
+ << "Content sharing mode: if set the socket work in content sharing"
+ << "mode. It works only in RTC mode";
+ LoggerInfo() << "-e\t<strategy>\t\t\t"
+ << "Enhance the network with a reliability strategy. Options";
+ LoggerInfo() << "\t\t\t\t\t\t1: unreliable ";
+ LoggerInfo() << "\t\t\t\t\t\t2: rtx only ";
+ LoggerInfo() << "\t\t\t\t\t\t3: fec only ";
+ LoggerInfo() << "\t\t\t\t\t\t4: delay based ";
+ LoggerInfo() << "\t\t\t\t\t\t5: low rate ";
+ LoggerInfo() << "\t\t\t\t\t\t6: low rate and best path ";
+ LoggerInfo() << "\t\t\t\t\t\t7: low rate and replication";
+ LoggerInfo() << "\t\t\t\t\t\t8: low rate and best path/replication ";
+ LoggerInfo() << "\t\t\t\t\t\t9: only fec low residual losses ";
+ LoggerInfo() << "\t\t\t\t\t\t10: delay and best path ";
+ LoggerInfo() << "\t\t\t\t\t\t11: delay and replication ";
+ LoggerInfo() << "\t\t\t\t\t\t(default: 2 = rtx only) ";
+ LoggerInfo() << "-H\t\t\t\t\t"
+ << "Disable periodic print headers in stats report.";
+ LoggerInfo() << "-n\t<nb_iterations>\t\t\t"
+ << "Print the stats report <nb_iterations> times and exit.\n"
+ << "\t\t\t\t\tThis option limits the duration of the run to "
+ "<nb_iterations> * <stats_interval> milliseconds.";
+ LoggerInfo() << "-w <packet_format> Packet format (without signature, "
+ "defaults to IPV6_TCP)";
}
-int main(int argc, char *argv[]) {
+int hiperf_main(int argc, char *argv[]) {
#ifndef _WIN32
// Common
bool daemon = false;
@@ -149,11 +200,13 @@ int main(int argc, char *argv[]) {
WSAStartup(MAKEWORD(2, 2), &wsaData);
#endif
+ transport::interface::global_config::GlobalConfigInterface global_conf;
+
// -1 server, 0 undefined, 1 client
int role = 0;
int options = 0;
- char *log_file = nullptr;
+ const char *log_file = nullptr;
transport::interface::global_config::IoModuleConfiguration config;
std::string conf_file;
config.name = "hicnlight_module";
@@ -166,10 +219,11 @@ int main(int argc, char *argv[]) {
int opt;
#ifndef _WIN32
- while ((opt = getopt(
- argc, argv,
- "DSCf:b:d:W:RM:c:vA:s:rmlK:k:y:p:hi:xE:P:B:ItL:z:T:F:j:g:G:")) !=
- -1) {
+ // Please keep in alphabetical order.
+ while (
+ (opt = getopt(argc, argv,
+ "A:B:CDE:F:G:HIJ:K:L:M:NP:RST:U:W:X:ab:c:d:e:f:g:hi:j:k:lm:"
+ "n:op:qrs:tu:vw:xy:z:")) != -1) {
switch (opt) {
// Common
case 'D': {
@@ -202,11 +256,16 @@ int main(int argc, char *argv[]) {
break;
}
#else
+ // Please keep in alphabetical order.
while ((opt = getopt(argc, argv,
- "SCf:b:d:W:RM:c:vA:s:rmlK:k:y:p:hi:xB:E:P:tL:z:F:j:")) !=
- -1) {
+ "A:B:CE:F:HK:L:M:P:RSU:W:X:ab:c:d:e:f:hi:j:k:lm:n:op:rs:"
+ "tu:vwxy:z:")) != -1) {
switch (opt) {
#endif
+ case 'E': {
+ server_configuration.content_lifetime_ = std::stoul(optarg);
+ break;
+ }
case 'f': {
log_file = optarg;
break;
@@ -216,6 +275,30 @@ int main(int argc, char *argv[]) {
server_configuration.rtc_ = true;
break;
}
+ case 'a': {
+ client_configuration.aggregated_data_ = true;
+ server_configuration.aggregated_data_ = true;
+ break;
+ }
+ case 'o': {
+ client_configuration.content_sharing_mode_ = true;
+ break;
+ }
+ case 'w': {
+ std::string packet_format_s = std::string(optarg);
+ packet_format_s = str_tolower(packet_format_s);
+ auto it = packet_format_map.find(std::string(optarg));
+ if (it == packet_format_map.end())
+ throw std::runtime_error("Bad packet format");
+ client_configuration.packet_format_ = it->second;
+ server_configuration.packet_format_ = it->second;
+ break;
+ }
+ case 'k': {
+ server_configuration.passphrase_ = std::string(optarg);
+ client_configuration.passphrase_ = std::string(optarg);
+ break;
+ }
case 'z': {
config.name = optarg;
break;
@@ -234,25 +317,31 @@ int main(int argc, char *argv[]) {
role += 1;
break;
}
- case 'k': {
- server_configuration.passphrase = std::string(optarg);
- client_configuration.passphrase = std::string(optarg);
+ case 'q': {
+ client_configuration.colored_ = server_configuration.colored_ = false;
+ break;
+ }
+ case 'J': {
+ client_configuration.aggr_interest_passphrase_ = optarg;
+ server_configuration.aggr_interest_passphrase_ = optarg;
+ // Consumer signature is only used with aggregated interests,
+ // hence enabling it also forces usage of aggregated interests
+ client_configuration.aggregated_interests_ = true;
break;
}
-
// Client specifc
case 'b': {
- client_configuration.beta = std::stod(optarg);
+ client_configuration.beta_ = std::stod(optarg);
options = 1;
break;
}
case 'd': {
- client_configuration.drop_factor = std::stod(optarg);
+ client_configuration.drop_factor_ = std::stod(optarg);
options = 1;
break;
}
case 'W': {
- client_configuration.window = std::stod(optarg);
+ client_configuration.window_ = std::stod(optarg);
options = 1;
break;
}
@@ -261,13 +350,17 @@ int main(int argc, char *argv[]) {
options = 1;
break;
}
+ case 'N': {
+ client_configuration.aggregated_interests_ = true;
+ break;
+ }
case 'P': {
- client_configuration.producer_prefix_ = Prefix(optarg);
- client_configuration.secure_ = true;
+ client_configuration.parallel_flows_ =
+ server_configuration.parallel_flows_ = std::stoull(optarg);
break;
}
case 'c': {
- client_configuration.producer_certificate = std::string(optarg);
+ client_configuration.producer_certificate_ = std::string(optarg);
options = 1;
break;
}
@@ -286,15 +379,35 @@ int main(int argc, char *argv[]) {
options = 1;
break;
}
+ case 'U': {
+ client_configuration.manifest_factor_relevant_ = std::stoul(optarg);
+ options = 1;
+ break;
+ }
+ case 'u': {
+ client_configuration.manifest_factor_alert_ = std::stoul(optarg);
+ options = 1;
+ break;
+ }
case 'j': {
client_configuration.relay_ = true;
client_configuration.relay_name_ = Prefix(optarg);
options = 1;
break;
}
+ case 'H': {
+ client_configuration.print_headers_ = false;
+ options = 1;
+ break;
+ }
+ case 'n': {
+ client_configuration.nb_iterations_ = std::stoul(optarg);
+ options = 1;
+ break;
+ }
// Server specific
case 'A': {
- server_configuration.download_size = std::stoul(optarg);
+ server_configuration.download_size_ = std::stoul(optarg);
options = -1;
break;
}
@@ -304,43 +417,44 @@ int main(int argc, char *argv[]) {
break;
}
case 'r': {
- server_configuration.virtual_producer = false;
+ server_configuration.virtual_producer_ = false;
options = -1;
break;
}
case 'm': {
- server_configuration.manifest = true;
+ server_configuration.manifest_max_capacity_ = std::stoul(optarg);
options = -1;
break;
}
case 'l': {
- server_configuration.live_production = true;
+ server_configuration.live_production_ = true;
options = -1;
break;
}
case 'K': {
- server_configuration.keystore_name = std::string(optarg);
+ server_configuration.keystore_name_ = std::string(optarg);
options = -1;
break;
}
case 'y': {
+ CryptoHashType hash_algorithm = CryptoHashType::SHA256;
if (strncasecmp(optarg, "sha256", 6) == 0) {
- server_configuration.hash_algorithm = CryptoHashType::SHA256;
+ hash_algorithm = CryptoHashType::SHA256;
} else if (strncasecmp(optarg, "sha512", 6) == 0) {
- server_configuration.hash_algorithm = CryptoHashType::SHA512;
+ hash_algorithm = CryptoHashType::SHA512;
} else if (strncasecmp(optarg, "blake2b512", 10) == 0) {
- server_configuration.hash_algorithm = CryptoHashType::BLAKE2B512;
+ hash_algorithm = CryptoHashType::BLAKE2B512;
} else if (strncasecmp(optarg, "blake2s256", 10) == 0) {
- server_configuration.hash_algorithm = CryptoHashType::BLAKE2S256;
+ hash_algorithm = CryptoHashType::BLAKE2S256;
} else {
- std::cerr << "Ignored unknown hash algorithm. Using SHA 256."
- << std::endl;
+ LoggerWarn() << "Unknown hash algorithm. Using SHA 256.";
}
+ server_configuration.hash_algorithm_ = hash_algorithm;
options = -1;
break;
}
case 'p': {
- server_configuration.keystore_password = std::string(optarg);
+ server_configuration.keystore_password_ = std::string(optarg);
options = -1;
break;
}
@@ -356,12 +470,16 @@ int main(int argc, char *argv[]) {
options = -1;
break;
}
- case 'E': {
- server_configuration.keystore_name = std::string(optarg);
- server_configuration.secure_ = true;
+ case 'e': {
+ client_configuration.recovery_strategy_ = std::stoul(optarg);
+ options = 1;
+ break;
+ }
+ case 'X': {
+ server_configuration.fec_type_ = std::string(optarg);
+ options = -1;
break;
}
- case 'h':
default:
usage();
return EXIT_FAILURE;
@@ -369,34 +487,31 @@ int main(int argc, char *argv[]) {
}
if (options > 0 && role < 0) {
- std::cerr << "Client options cannot be used when using the "
- "software in server mode"
- << std::endl;
+ LoggerErr() << "Client options cannot be used when using the "
+ "software in server mode";
usage();
return EXIT_FAILURE;
} else if (options < 0 && role > 0) {
- std::cerr << "Server options cannot be used when using the "
- "software in client mode"
- << std::endl;
+ LoggerErr() << "Server options cannot be used when using the "
+ "software in client mode";
usage();
return EXIT_FAILURE;
} else if (!role) {
- std::cerr << "Please specify if running hiperf as client "
- "or server."
- << std::endl;
+ LoggerErr() << "Please specify if running hiperf as client "
+ "or server.";
usage();
return EXIT_FAILURE;
}
if (argv[optind] == 0) {
- std::cerr << "Please specify the name/prefix to use." << std::endl;
+ LoggerErr() << "Please specify the name/prefix to use.";
usage();
return EXIT_FAILURE;
} else {
if (role > 0) {
- client_configuration.name = Name(argv[optind]);
+ client_configuration.name_ = Prefix(argv[optind]);
} else {
- server_configuration.name = Prefix(argv[optind]);
+ server_configuration.name_ = Prefix(argv[optind]);
}
}
@@ -427,7 +542,7 @@ int main(int argc, char *argv[]) {
config.set();
// Parse config file
- transport::interface::global_config::parseConfigurationFile(conf_file);
+ global_conf.parseConfigurationFile(conf_file);
if (role > 0) {
HIperfClient c(client_configuration);
@@ -453,4 +568,4 @@ int main(int argc, char *argv[]) {
} // namespace hiperf
-int main(int argc, char *argv[]) { return hiperf::main(argc, argv); }
+int main(int argc, char *argv[]) { return hiperf::hiperf_main(argc, argv); }
diff --git a/apps/hiperf/src/server.cc b/apps/hiperf/src/server.cc
index 968d42e2c..3f6c335f9 100644
--- a/apps/hiperf/src/server.cc
+++ b/apps/hiperf/src/server.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Copyright (c) 2021-2022 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -17,167 +17,92 @@
namespace hiperf {
+using transport::core::ContentObject;
+using transport::core::Interest;
+using transport::core::Name;
+using transport::interface::GeneralTransportOptions;
+using transport::interface::ProducerCallbacksOptions;
+using transport::interface::ProducerInterestCallback;
+using transport::interface::ProducerSocket;
+using transport::interface::ProductionProtocolAlgorithms;
+
/**
* Hiperf server class: configure and setup an hicn producer following the
* ServerConfiguration.
*/
class HIperfServer::Impl {
- const std::size_t log2_content_object_buffer_size = 8;
-
- public:
- Impl(const hiperf::ServerConfiguration &conf)
- : configuration_(conf),
- signals_(io_service_),
- rtc_timer_(io_service_),
- unsatisfied_interests_(),
- content_objects_((std::uint16_t)(1 << log2_content_object_buffer_size)),
- content_objects_index_(0),
- mask_((std::uint16_t)(1 << log2_content_object_buffer_size) - 1),
- last_segment_(0),
-#ifndef _WIN32
- ptr_last_segment_(&last_segment_),
- input_(io_service_),
- rtc_running_(false),
-#else
- ptr_last_segment_(&last_segment_),
-#endif
- flow_name_(configuration_.name.getName()),
- socket_(io_service_),
- recv_buffer_(nullptr, 0) {
- std::string buffer(configuration_.payload_size_, 'X');
- std::cout << "Producing contents under name " << conf.name.getName()
- << std::endl;
-#ifndef _WIN32
- if (configuration_.interactive_) {
- input_.assign(::dup(STDIN_FILENO));
- }
-#endif
-
- for (int i = 0; i < (1 << log2_content_object_buffer_size); i++) {
- content_objects_[i] = ContentObject::Ptr(
- new ContentObject(conf.name.getName(), HF_INET6_TCP, 0,
- (const uint8_t *)buffer.data(), buffer.size()));
- content_objects_[i]->setLifetime(
- default_values::content_object_expiry_time);
- }
- }
-
- void virtualProcessInterest(ProducerSocket &p, const Interest &interest) {
- content_objects_[content_objects_index_ & mask_]->setName(
- interest.getName());
- producer_socket_->produce(
- *content_objects_[content_objects_index_++ & mask_]);
+ static constexpr std::size_t klog2_content_object_buffer_size() { return 8; }
+ static constexpr std::size_t kcontent_object_buffer_size() {
+ return (1 << klog2_content_object_buffer_size());
}
-
- void processInterest(ProducerSocket &p, const Interest &interest) {
- p.setSocketOption(ProducerCallbacksOptions::CACHE_MISS,
- (ProducerInterestCallback)VOID_HANDLER);
- p.setSocketOption(GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME,
- 5000000_U32);
-
- produceContent(p, interest.getName(), interest.getName().getSuffix());
- std::cout << "Received interest " << interest.getName().getSuffix()
- << std::endl;
+ static constexpr std::size_t kmask() {
+ return (kcontent_object_buffer_size() - 1);
}
- void asyncProcessInterest(ProducerSocket &p, const Interest &interest) {
- p.setSocketOption(ProducerCallbacksOptions::CACHE_MISS,
- (ProducerInterestCallback)bind(&Impl::cacheMiss, this,
- std::placeholders::_1,
- std::placeholders::_2));
- p.setSocketOption(GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME,
- 5000000_U32);
- uint32_t suffix = interest.getName().getSuffix();
-
- if (suffix == 0) {
- last_segment_ = 0;
- ptr_last_segment_ = &last_segment_;
- unsatisfied_interests_.clear();
- }
-
- // The suffix will either be the one from the received interest or the
- // smallest suffix of a previous interest not satisfed
- if (!unsatisfied_interests_.empty()) {
- auto it =
- std::lower_bound(unsatisfied_interests_.begin(),
- unsatisfied_interests_.end(), *ptr_last_segment_);
- if (it != unsatisfied_interests_.end()) {
- suffix = *it;
+ /**
+ * @brief As we can (potentially) setup many producer sockets, we need to keep
+ * a separate context for each one of them. The context contains parameters
+ * and variable that are specific to a single producer socket.
+ */
+ class ProducerContext
+ : public Base<ProducerContext, ServerConfiguration, Impl>,
+ public ProducerSocket::Callback {
+ public:
+ using ConfType = ServerConfiguration;
+ using ParentType = typename HIperfServer::Impl;
+ static inline const auto getContextType() { return "ProducerContext"; }
+
+ ProducerContext(HIperfServer::Impl &server, int producer_identifier)
+ : Base(server, server.io_service_, producer_identifier) {
+ // Allocate buffer to copy as content objects payload
+ std::string buffer(configuration_.payload_size_, 'X');
+
+ // Allocate array of content objects. They are share_ptr so that the
+ // transport will only capture a reference to them instead of performing
+ // an hard copy.
+ for (std::size_t i = 0; i < kcontent_object_buffer_size(); i++) {
+ const auto &element =
+ content_objects_.emplace_back(std::make_shared<ContentObject>(
+ configuration_.name_.makeName(), configuration_.packet_format_,
+ 0, (const uint8_t *)buffer.data(), buffer.size()));
+ element->setLifetime(
+ transport::interface::default_values::content_object_expiry_time);
}
- unsatisfied_interests_.erase(unsatisfied_interests_.begin(), it);
}
- std::cout << "Received interest " << interest.getName().getSuffix()
- << ", starting production at " << suffix << std::endl;
- std::cout << unsatisfied_interests_.size() << " interests still unsatisfied"
- << std::endl;
- produceContentAsync(p, interest.getName(), suffix);
- }
-
- void produceContent(ProducerSocket &p, const Name &content_name,
- uint32_t suffix) {
- auto b = utils::MemBuf::create(configuration_.download_size);
- std::memset(b->writableData(), '?', configuration_.download_size);
- b->append(configuration_.download_size);
- uint32_t total;
-
- utils::TimePoint t0 = utils::SteadyClock::now();
- total = p.produceStream(content_name, std::move(b),
- !configuration_.multiphase_produce_, suffix);
- utils::TimePoint t1 = utils::SteadyClock::now();
-
- std::cout
- << "Written " << total
- << " data packets in output buffer (Segmentation time: "
- << std::chrono::duration_cast<utils::Microseconds>(t1 - t0).count()
- << " us)" << std::endl;
- }
-
- void produceContentAsync(ProducerSocket &p, Name content_name,
- uint32_t suffix) {
- auto b = utils::MemBuf::create(configuration_.download_size);
- std::memset(b->writableData(), '?', configuration_.download_size);
- b->append(configuration_.download_size);
+ // To make vector happy (move or copy constructor is needed when vector
+ // resizes)
+ ProducerContext(ProducerContext &&other) noexcept
+ : Base(std::move(other)),
+ content_objects_(std::move(other.content_objects_)),
+ unsatisfied_interests_(std::move(other.unsatisfied_interests_)),
+ last_segment_(other.last_segment_),
+ producer_socket_(std::move(other.producer_socket_)),
+ content_objects_index_(other.content_objects_index_),
+ payload_size_max_(other.payload_size_max_) {}
- p.asyncProduce(content_name, std::move(b),
- !configuration_.multiphase_produce_, suffix,
- &ptr_last_segment_);
- }
+ virtual ~ProducerContext() = default;
- void cacheMiss(ProducerSocket &p, const Interest &interest) {
- unsatisfied_interests_.push_back(interest.getName().getSuffix());
- }
+ /**
+ * @brief Produce datagram
+ */
+ void produceDatagram(const uint8_t *buffer, std::size_t buffer_size) const {
+ assert(producer_socket_);
- void onContentProduced(ProducerSocket &p, const std::error_code &err,
- uint64_t bytes_written) {
- p.setSocketOption(ProducerCallbacksOptions::CACHE_MISS,
- (ProducerInterestCallback)bind(
- &Impl::asyncProcessInterest, this,
- std::placeholders::_1, std::placeholders::_2));
- }
+ auto size = std::min(buffer_size, payload_size_max_);
- std::shared_ptr<Identity> getProducerIdentity(std::string &keystore_path,
- std::string &keystore_pwd,
- CryptoHashType &hash_type) {
- if (access(keystore_path.c_str(), F_OK) != -1) {
- return std::make_shared<Identity>(keystore_path, keystore_pwd, hash_type);
+ producer_socket_->produceDatagram(flow_name_, buffer, size);
}
- return std::make_shared<Identity>(keystore_path, keystore_pwd,
- CryptoSuite::RSA_SHA256, 1024, 365,
- "producer-test");
- }
- int setup() {
- int ret;
- int production_protocol;
-
- if (configuration_.secure_) {
- auto identity = getProducerIdentity(configuration_.keystore_name,
- configuration_.keystore_password,
- configuration_.hash_algorithm);
- producer_socket_ = std::make_unique<P2PSecureProducerSocket>(
- configuration_.rtc_, identity);
- } else {
+ /**
+ * @brief Create and setup the producer socket
+ */
+ int setup() {
+ int ret;
+ int production_protocol;
+ std::shared_ptr<transport::auth::Signer> signer =
+ std::make_shared<transport::auth::VoidSigner>();
+
if (!configuration_.rtc_) {
production_protocol = ProductionProtocolAlgorithms::BYTE_STREAM;
} else {
@@ -185,210 +110,470 @@ class HIperfServer::Impl {
}
producer_socket_ = std::make_unique<ProducerSocket>(production_protocol);
- }
- if (producer_socket_->setSocketOption(
- GeneralTransportOptions::MAKE_MANIFEST, configuration_.manifest) ==
- SOCKET_OPTION_NOT_SET) {
- return ERROR_SETUP;
- }
+ if (producer_socket_->setSocketOption(
+ ProducerCallbacksOptions::PRODUCER_CALLBACK, this) ==
+ SOCKET_OPTION_NOT_SET) {
+ getOutputStream() << "Failed to set producer callback." << std::endl;
+ return ERROR_SETUP;
+ }
- if (!configuration_.passphrase.empty()) {
- std::shared_ptr<Signer> signer = std::make_shared<SymmetricSigner>(
- CryptoSuite::HMAC_SHA256, configuration_.passphrase);
- producer_socket_->setSocketOption(GeneralTransportOptions::SIGNER,
- signer);
- }
+ if (producer_socket_->setSocketOption(
+ GeneralTransportOptions::HASH_ALGORITHM,
+ configuration_.hash_algorithm_) == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+
+ if (producer_socket_->setSocketOption(
+ GeneralTransportOptions::MANIFEST_MAX_CAPACITY,
+ configuration_.manifest_max_capacity_) == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+
+ if (producer_socket_->setSocketOption(transport::interface::PACKET_FORMAT,
+ configuration_.packet_format_) ==
+ SOCKET_OPTION_NOT_SET) {
+ getOutputStream() << "ERROR -- Impossible to set the packet format."
+ << std::endl;
+ return ERROR_SETUP;
+ }
+
+ if (!configuration_.passphrase_.empty()) {
+ signer = std::make_shared<transport::auth::SymmetricSigner>(
+ transport::auth::CryptoSuite::HMAC_SHA256,
+ configuration_.passphrase_);
+ }
+
+ if (!configuration_.keystore_name_.empty()) {
+ signer = std::make_shared<transport::auth::AsymmetricSigner>(
+ configuration_.keystore_name_, configuration_.keystore_password_);
+ }
- if (!configuration_.keystore_name.empty()) {
- auto identity = getProducerIdentity(configuration_.keystore_name,
- configuration_.keystore_password,
- configuration_.hash_algorithm);
- std::shared_ptr<Signer> signer = identity->getSigner();
producer_socket_->setSocketOption(GeneralTransportOptions::SIGNER,
signer);
- }
- uint32_t rtc_header_size = 0;
- if (configuration_.rtc_) rtc_header_size = 12;
- producer_socket_->setSocketOption(
- GeneralTransportOptions::DATA_PACKET_SIZE,
- (uint32_t)(
- configuration_.payload_size_ + rtc_header_size +
- (configuration_.name.getAddressFamily() == AF_INET ? 40 : 60)));
- producer_socket_->registerPrefix(configuration_.name);
- producer_socket_->connect();
-
- if (configuration_.rtc_) {
- std::cout << "Running RTC producer: the prefix length will be ignored."
- " Use /128 by default in RTC mode"
- << std::endl;
- return ERROR_SUCCESS;
- }
+ // Compute maximum payload size
+ Packet::Format format = configuration_.packet_format_;
+ if (!configuration_.manifest_max_capacity_)
+ format = Packet::toAHFormat(format);
+ payload_size_max_ = PayloadSize(format).getPayloadSizeMax(
+ configuration_.rtc_ ? RTC_HEADER_SIZE : 0,
+ configuration_.fec_type_.empty() ? 0 : FEC_HEADER_MAX_SIZE,
+ !configuration_.manifest_max_capacity_
+ ? signer->getSignatureFieldSize()
+ : 0);
+
+ if (configuration_.payload_size_ > payload_size_max_) {
+ getOutputStream() << "WARNING: Payload has size "
+ << configuration_.payload_size_ << ", maximum is "
+ << payload_size_max_
+ << ". Payload will be truncated to fit." << std::endl;
+ }
+
+ // Verifier for aggregated interests
+ std::shared_ptr<transport::auth::Verifier> verifier =
+ std::make_shared<transport::auth::VoidVerifier>();
+ if (!configuration_.aggr_interest_passphrase_.empty()) {
+ verifier = std::make_unique<transport::auth::SymmetricVerifier>(
+ configuration_.aggr_interest_passphrase_);
+ }
+ ret = producer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER,
+ verifier);
+ if (ret == SOCKET_OPTION_NOT_SET) return ERROR_SETUP;
+
+ if (configuration_.rtc_) {
+ ret = producer_socket_->setSocketOption(
+ transport::interface::RtcTransportOptions::AGGREGATED_DATA,
+ configuration_.aggregated_data_);
+
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+
+ ret = producer_socket_->setSocketOption(
+ GeneralTransportOptions::FEC_TYPE, configuration_.fec_type_);
+
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+ }
- if (!configuration_.virtual_producer) {
if (producer_socket_->setSocketOption(
GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME,
- configuration_.content_lifetime) == SOCKET_OPTION_NOT_SET) {
+ configuration_.content_lifetime_) == SOCKET_OPTION_NOT_SET) {
return ERROR_SETUP;
}
- if (producer_socket_->setSocketOption(
- GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 200000U) ==
- SOCKET_OPTION_NOT_SET) {
- return ERROR_SETUP;
+ producer_socket_->registerPrefix(Prefix(flow_name_, 128));
+ producer_socket_->connect();
+ producer_socket_->start();
+
+ if (configuration_.rtc_) {
+ return ERROR_SUCCESS;
}
- if (!configuration_.live_production) {
- produceContent(*producer_socket_, configuration_.name.getName(), 0);
+ if (!configuration_.virtual_producer_) {
+ if (producer_socket_->setSocketOption(
+ GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 200000U) ==
+ SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+
+ if (producer_socket_->setSocketOption(
+ GeneralTransportOptions::MAX_SEGMENT_SIZE,
+ static_cast<uint32_t>(configuration_.payload_size_)) ==
+ SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+
+ if (!configuration_.live_production_) {
+ produceContent(*producer_socket_, configuration_.name_.makeName(), 0);
+ } else {
+ ret = producer_socket_->setSocketOption(
+ ProducerCallbacksOptions::CACHE_MISS,
+ (ProducerInterestCallback)bind(
+ &ProducerContext::asyncProcessInterest, this,
+ std::placeholders::_1, std::placeholders::_2));
+
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+ }
} else {
ret = producer_socket_->setSocketOption(
+ GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 0U);
+
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+
+ ret = producer_socket_->setSocketOption(
ProducerCallbacksOptions::CACHE_MISS,
- (ProducerInterestCallback)bind(&Impl::asyncProcessInterest, this,
- std::placeholders::_1,
- std::placeholders::_2));
+ (ProducerInterestCallback)bind(
+ &ProducerContext::virtualProcessInterest, this,
+ std::placeholders::_1, std::placeholders::_2));
if (ret == SOCKET_OPTION_NOT_SET) {
return ERROR_SETUP;
}
}
- } else {
- ret = producer_socket_->setSocketOption(
- GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 0U);
+ ret = producer_socket_->setSocketOption(
+ ProducerCallbacksOptions::CONTENT_PRODUCED,
+ (transport::interface::ProducerContentCallback)bind(
+ &ProducerContext::onContentProduced, this, std::placeholders::_1,
+ std::placeholders::_2, std::placeholders::_3));
if (ret == SOCKET_OPTION_NOT_SET) {
return ERROR_SETUP;
}
- ret = producer_socket_->setSocketOption(
+ return ERROR_SUCCESS;
+ }
+
+ int run() {
+ getOutputStream() << "started to serve consumers with name " << flow_name_
+ << std::endl;
+ return ERROR_SUCCESS;
+ }
+
+ void stop() {
+ getOutputStream() << "stopped to serve consumers" << std::endl;
+ producer_socket_->stop();
+ }
+
+ private:
+ /**
+ * @brief Produce an existing content object. Set the name as the
+ * interest.
+ */
+ void virtualProcessInterest(ProducerSocket &p, const Interest &interest) {
+ content_objects_[content_objects_index_ & kmask()]->setName(
+ interest.getName());
+ p.produce(*content_objects_[content_objects_index_++ & kmask()]);
+ }
+
+ /**
+ * @brief Create and produce a buffer of configuration_.download_size_
+ * length.
+ */
+ void produceContent(ProducerSocket &p, const Name &content_name,
+ uint32_t suffix) const {
+ uint32_t total;
+
+ auto b = utils::MemBuf::create(configuration_.download_size_);
+ std::memset(b->writableData(), '?', configuration_.download_size_);
+ b->append(configuration_.download_size_);
+
+ utils::SteadyTime::TimePoint t0 = utils::SteadyTime::Clock::now();
+ total = p.produceStream(content_name, std::move(b),
+ !configuration_.multiphase_produce_, suffix);
+ utils::SteadyTime::TimePoint t1 = utils::SteadyTime::Clock::now();
+
+ LoggerInfo() << "Written " << total
+ << " data packets in output buffer (Segmentation time: "
+ << utils::SteadyTime::getDurationUs(t0, t1).count() << " us)"
+ << std::endl;
+ }
+
+ /**
+ * @brief Synchronously produce content upon reception of one interest
+ */
+ void processInterest(ProducerSocket &p, const Interest &interest) const {
+ p.setSocketOption(
ProducerCallbacksOptions::CACHE_MISS,
- (ProducerInterestCallback)bind(&Impl::virtualProcessInterest, this,
- std::placeholders::_1,
- std::placeholders::_2));
+ (ProducerInterestCallback)transport::interface::VOID_HANDLER);
+ p.setSocketOption(GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME,
+ configuration_.content_lifetime_);
- if (ret == SOCKET_OPTION_NOT_SET) {
- return ERROR_SETUP;
+ produceContent(p, interest.getName(), interest.getName().getSuffix());
+ LoggerInfo() << "Received interest " << interest.getName().getSuffix()
+ << std::endl;
+ }
+
+ /**
+ * @brief Async create and produce a buffer of
+ * configuration_.download_size_ length.
+ */
+ void produceContentAsync(ProducerSocket &p, Name content_name,
+ uint32_t suffix) {
+ parent_.produce_thread_.add([this, suffix, content_name, &p]() {
+ auto b = utils::MemBuf::create(configuration_.download_size_);
+ std::memset(b->writableData(), '?', configuration_.download_size_);
+ b->append(configuration_.download_size_);
+
+ last_segment_ =
+ suffix + p.produceStream(content_name, std::move(b),
+ !configuration_.multiphase_produce_,
+ suffix);
+ });
+ }
+
+ /**
+ * @brief Asynchronously produce content upon reception of one interest
+ */
+ void asyncProcessInterest(ProducerSocket &p, const Interest &interest) {
+ p.setSocketOption(ProducerCallbacksOptions::CACHE_MISS,
+ (ProducerInterestCallback)bind(
+ &ProducerContext::cacheMiss, this,
+ std::placeholders::_1, std::placeholders::_2));
+ p.setSocketOption(GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME,
+ configuration_.content_lifetime_);
+ uint32_t suffix = interest.getName().getSuffix();
+
+ if (suffix == 0) {
+ last_segment_ = 0;
+ unsatisfied_interests_.clear();
+ }
+
+ // The suffix will either come from the received interest or will be set
+ // to the smallest suffix of a previous interest not satisfied
+ if (!unsatisfied_interests_.empty()) {
+ auto it = std::lower_bound(unsatisfied_interests_.begin(),
+ unsatisfied_interests_.end(), last_segment_);
+ if (it != unsatisfied_interests_.end()) {
+ suffix = *it;
+ }
+ unsatisfied_interests_.erase(unsatisfied_interests_.begin(), it);
}
+
+ getOutputStream() << " Received interest "
+ << interest.getName().getSuffix()
+ << ", starting production at " << suffix << end_mod_
+ << std::endl;
+ getOutputStream() << unsatisfied_interests_.size()
+ << " interests still unsatisfied" << end_mod_
+ << std::endl;
+ produceContentAsync(p, interest.getName(), suffix);
+ }
+
+ /**
+ * @brief Register cache miss events
+ */
+ void cacheMiss([[maybe_unused]] const ProducerSocket &p,
+ const Interest &interest) {
+ unsatisfied_interests_.push_back(interest.getName().getSuffix());
}
- ret = producer_socket_->setSocketOption(
- ProducerCallbacksOptions::CONTENT_PRODUCED,
- (ProducerContentCallback)bind(
- &Impl::onContentProduced, this, std::placeholders::_1,
- std::placeholders::_2, std::placeholders::_3));
+ /**
+ * @brief When content is produced, set cache miss callback so that we can
+ * register any cache miss happening after the production.
+ */
+ void onContentProduced(ProducerSocket &p,
+ [[maybe_unused]] const std::error_code &err,
+ [[maybe_unused]] uint64_t bytes_written) {
+ p.setSocketOption(ProducerCallbacksOptions::CACHE_MISS,
+ (ProducerInterestCallback)bind(
+ &ProducerContext::asyncProcessInterest, this,
+ std::placeholders::_1, std::placeholders::_2));
+ }
- return ERROR_SUCCESS;
+ /**
+ * @brief Internal producer error. When this callback is triggered
+ * something important happened. Here we stop the program.
+ */
+ void produceError(const std::error_code &err) noexcept override {
+ getOutputStream() << "Error from producer transport: " << err.message()
+ << std::endl;
+ parent_.stop();
+ }
+
+ // Members initialized in constructor
+ std::vector<ContentObject::Ptr> content_objects_;
+
+ // Members initialized by in-class initializer
+ std::vector<uint32_t> unsatisfied_interests_;
+ std::uint32_t last_segment_{0};
+ std::unique_ptr<ProducerSocket> producer_socket_{nullptr};
+ std::uint16_t content_objects_index_{0};
+ std::size_t payload_size_max_{0};
+ };
+
+ public:
+ explicit Impl(const hiperf::ServerConfiguration &conf) : config_(conf) {
+#ifndef _WIN32
+ if (config_.interactive_) {
+ input_.assign(::dup(STDIN_FILENO));
+ }
+#endif
+
+ std::memset(rtc_payload_.data(), 'X', rtc_payload_.size());
+ }
+
+ ~Impl() = default;
+
+ int setup() {
+ int ret = ensureFlows(config_.name_, config_.parallel_flows_);
+ if (ret != ERROR_SUCCESS) {
+ return ret;
+ }
+
+ producer_contexts_.reserve(config_.parallel_flows_);
+ for (uint32_t i = 0; i < config_.parallel_flows_; i++) {
+ auto &ctx = producer_contexts_.emplace_back(*this, i);
+ ret = ctx.setup();
+
+ if (ret) {
+ break;
+ }
+ }
+
+ return ret;
}
void receiveStream() {
socket_.async_receive_from(
- asio::buffer(recv_buffer_.first, recv_buffer_.second), remote_,
- [this](std::error_code ec, std::size_t length) {
+ asio::buffer(recv_buffer_.writableData(), recv_buffer_.capacity()),
+ remote_, [this](const std::error_code &ec, std::size_t length) {
if (ec) return;
- sendRTCContentFromStream(recv_buffer_.first, length);
+ sendRTCContentFromStream(recv_buffer_.writableData(), length);
receiveStream();
});
}
- void sendRTCContentFromStream(uint8_t *buff, std::size_t len) {
- auto payload =
- content_objects_[content_objects_index_++ & mask_]->getPayload();
+ void sendRTCContentFromStream(const uint8_t *buff, std::size_t len) {
// this is used to compute the data packet delay
// Used only for performance evaluation
// It requires clock synchronization between producer and consumer
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::system_clock::now().time_since_epoch())
- .count();
- uint8_t *start = (uint8_t *)payload->writableData();
+ auto now = utils::SystemTime::nowMs().count();
+
+ auto start = rtc_payload_.data();
std::memcpy(start, &now, sizeof(uint64_t));
std::memcpy(start + sizeof(uint64_t), buff, len);
- producer_socket_->produceDatagram(flow_name_, start,
- len + sizeof(uint64_t));
+
+ for (const auto &producer_context : producer_contexts_) {
+ producer_context.produceDatagram(start, len + sizeof(uint64_t));
+ }
}
- void sendRTCContentObjectCallback(std::error_code ec) {
+ void sendRTCContentObjectCallback(const std::error_code &ec) {
if (ec) return;
rtc_timer_.expires_from_now(
- configuration_.production_rate_.getMicrosecondsForPacket(
- configuration_.payload_size_));
+ config_.production_rate_.getMicrosecondsForPacket(
+ config_.payload_size_));
rtc_timer_.async_wait(std::bind(&Impl::sendRTCContentObjectCallback, this,
std::placeholders::_1));
- auto payload =
- content_objects_[content_objects_index_++ & mask_]->getPayload();
+
+ auto start = rtc_payload_.data();
// this is used to compute the data packet delay
// Used only for performance evaluation
// It requires clock synchronization between producer and consumer
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::system_clock::now().time_since_epoch())
- .count();
-
- std::memcpy(payload->writableData(), &now, sizeof(uint64_t));
+ auto now = utils::SystemTime::nowMs().count();
+ std::memcpy(start, &now, sizeof(uint64_t));
- producer_socket_->produceDatagram(
- flow_name_, payload->data(),
- payload->length() < 1400 ? payload->length() : 1400);
+ for (const auto &producer_context : producer_contexts_) {
+ producer_context.produceDatagram(start, config_.payload_size_);
+ }
}
- void sendRTCContentObjectCallbackWithTrace(std::error_code ec) {
+ void sendRTCContentObjectCallbackWithTrace(const std::error_code &ec) {
if (ec) return;
- auto payload =
- content_objects_[content_objects_index_++ & mask_]->getPayload();
-
- uint32_t packet_len =
- configuration_.trace_[configuration_.trace_index_].size;
+ std::size_t packet_len = config_.trace_[config_.trace_index_].size;
// this is used to compute the data packet delay
// used only for performance evaluation
// it requires clock synchronization between producer and consumer
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::system_clock::now().time_since_epoch())
- .count();
-
- std::memcpy(payload->writableData(), &now, sizeof(uint64_t));
+ auto now = utils::SystemTime::nowMs().count();
+ auto start = rtc_payload_.data();
+ std::memcpy(start, &now, sizeof(uint64_t));
- if (packet_len > payload->length()) packet_len = payload->length();
- if (packet_len > 1400) packet_len = 1400;
+ if (packet_len > config_.payload_size_) {
+ packet_len = config_.payload_size_;
+ }
- producer_socket_->produceDatagram(flow_name_, payload->data(), packet_len);
+ for (const auto &producer_context : producer_contexts_) {
+ producer_context.produceDatagram(start, packet_len);
+ }
- uint32_t next_index = configuration_.trace_index_ + 1;
+ uint32_t next_index = config_.trace_index_ + 1;
uint64_t schedule_next;
- if (next_index < configuration_.trace_.size()) {
- schedule_next =
- configuration_.trace_[next_index].timestamp -
- configuration_.trace_[configuration_.trace_index_].timestamp;
+ if (next_index < config_.trace_.size()) {
+ schedule_next = config_.trace_[next_index].timestamp -
+ config_.trace_[config_.trace_index_].timestamp;
} else {
// here we need to loop, schedule in a random time
schedule_next = 1000;
}
- configuration_.trace_index_ =
- (configuration_.trace_index_ + 1) % configuration_.trace_.size();
+ config_.trace_index_ = (config_.trace_index_ + 1) % config_.trace_.size();
rtc_timer_.expires_from_now(std::chrono::microseconds(schedule_next));
rtc_timer_.async_wait(
std::bind(&Impl::sendRTCContentObjectCallbackWithTrace, this,
std::placeholders::_1));
}
+ int parseTraceFile() {
+ std::ifstream trace(config_.trace_file_);
+ if (trace.fail()) {
+ return -1;
+ }
+ std::string line;
+ while (std::getline(trace, line)) {
+ std::istringstream iss(line);
+ hiperf::packet_t packet;
+ iss >> packet.timestamp >> packet.size;
+ config_.trace_.push_back(packet);
+ }
+ return 0;
+ }
+
#ifndef _WIN32
void handleInput(const std::error_code &error, std::size_t length) {
if (error) {
- producer_socket_->stop();
- io_service_.stop();
+ stop();
}
if (rtc_running_) {
- std::cout << "stop real time content production" << std::endl;
+ LoggerInfo() << "stop real time content production" << std::endl;
rtc_running_ = false;
rtc_timer_.cancel();
} else {
- std::cout << "start real time content production" << std::endl;
+ LoggerInfo() << "start real time content production" << std::endl;
rtc_running_ = true;
rtc_timer_.expires_from_now(
- configuration_.production_rate_.getMicrosecondsForPacket(
- configuration_.payload_size_));
+ config_.production_rate_.getMicrosecondsForPacket(
+ config_.payload_size_));
rtc_timer_.async_wait(std::bind(&Impl::sendRTCContentObjectCallback, this,
std::placeholders::_1));
}
@@ -401,46 +586,33 @@ class HIperfServer::Impl {
}
#endif
- int parseTraceFile() {
- std::ifstream trace(configuration_.trace_file_);
- if (trace.fail()) {
- return -1;
- }
- std::string line;
- while (std::getline(trace, line)) {
- std::istringstream iss(line);
- hiperf::packet_t packet;
- iss >> packet.timestamp >> packet.size;
- configuration_.trace_.push_back(packet);
+ void stop() {
+ for (auto &producer_context : producer_contexts_) {
+ producer_context.stop();
}
- return 0;
+
+ io_service_.stop();
}
int run() {
- std::cerr << "Starting to serve consumers" << std::endl;
-
signals_.add(SIGINT);
- signals_.async_wait([this](const std::error_code &, const int &) {
- std::cout << "STOPPING!!" << std::endl;
- producer_socket_->stop();
- io_service_.stop();
- });
+ signals_.async_wait(
+ [this](const std::error_code &, const int &) { stop(); });
- if (configuration_.rtc_) {
-#ifndef _WIN32
- if (configuration_.interactive_) {
+ if (config_.rtc_) {
+ if (config_.interactive_) {
asio::async_read_until(
input_, input_buffer_, '\n',
std::bind(&Impl::handleInput, this, std::placeholders::_1,
std::placeholders::_2));
- } else if (configuration_.trace_based_) {
- std::cout << "trace-based mode enabled" << std::endl;
- if (configuration_.trace_file_ == nullptr) {
- std::cout << "cannot find the trace file" << std::endl;
+ } else if (config_.trace_based_) {
+ LoggerInfo() << "trace-based mode enabled" << std::endl;
+ if (config_.trace_file_ == nullptr) {
+ LoggerErr() << "cannot find the trace file" << std::endl;
return ERROR_SETUP;
}
if (parseTraceFile() < 0) {
- std::cout << "cannot parse the trace file" << std::endl;
+ LoggerErr() << "cannot parse the trace file" << std::endl;
return ERROR_SETUP;
}
rtc_running_ = true;
@@ -448,31 +620,26 @@ class HIperfServer::Impl {
rtc_timer_.async_wait(
std::bind(&Impl::sendRTCContentObjectCallbackWithTrace, this,
std::placeholders::_1));
- } else if (configuration_.input_stream_mode_) {
+ } else if (config_.input_stream_mode_) {
rtc_running_ = true;
- // crate socket
+ // create socket
remote_ = asio::ip::udp::endpoint(
- asio::ip::address::from_string("127.0.0.1"), configuration_.port_);
+ asio::ip::address::from_string("127.0.0.1"), config_.port_);
socket_.open(asio::ip::udp::v4());
socket_.bind(remote_);
- recv_buffer_.first = (uint8_t *)malloc(1500);
- recv_buffer_.second = 1500;
receiveStream();
} else {
rtc_running_ = true;
rtc_timer_.expires_from_now(
- configuration_.production_rate_.getMicrosecondsForPacket(
- configuration_.payload_size_));
+ config_.production_rate_.getMicrosecondsForPacket(
+ config_.payload_size_));
rtc_timer_.async_wait(std::bind(&Impl::sendRTCContentObjectCallback,
this, std::placeholders::_1));
}
-#else
- rtc_timer_.expires_from_now(
- configuration_.production_rate_.getMicrosecondsForPacket(
- configuration_.payload_size_));
- rtc_timer_.async_wait(std::bind(&Impl::sendRTCContentObjectCallback, this,
- std::placeholders::_1));
-#endif
+ }
+
+ for (auto &producer_context : producer_contexts_) {
+ producer_context.run();
}
io_service_.run();
@@ -480,34 +647,31 @@ class HIperfServer::Impl {
return ERROR_SUCCESS;
}
+ ServerConfiguration &getConfig() { return config_; }
+
private:
- hiperf::ServerConfiguration configuration_;
+ // Variables initialized by the constructor.
+ ServerConfiguration config_;
+
+ // Variable initialized in the in-class initializer list.
asio::io_service io_service_;
- asio::signal_set signals_;
- asio::steady_timer rtc_timer_;
- std::vector<uint32_t> unsatisfied_interests_;
- std::vector<std::shared_ptr<ContentObject>> content_objects_;
- std::uint16_t content_objects_index_;
- std::uint16_t mask_;
- std::uint32_t last_segment_;
- std::uint32_t *ptr_last_segment_;
- std::unique_ptr<ProducerSocket> producer_socket_;
-#ifndef _WIN32
- asio::posix::stream_descriptor input_;
+ asio::signal_set signals_{io_service_};
+ asio::steady_timer rtc_timer_{io_service_};
+ asio::posix::stream_descriptor input_{io_service_};
+ asio::ip::udp::socket socket_{io_service_};
+ std::vector<ProducerContext> producer_contexts_;
+ ::utils::EventThread produce_thread_;
asio::streambuf input_buffer_;
- bool rtc_running_;
- Name flow_name_;
- asio::ip::udp::socket socket_;
+ bool rtc_running_{false};
asio::ip::udp::endpoint remote_;
- std::pair<uint8_t *, std::size_t> recv_buffer_;
-#endif
+ utils::MemBuf recv_buffer_{utils::MemBuf::CREATE, HIPERF_MTU};
+ std::array<uint8_t, HIPERF_MTU> rtc_payload_;
};
-HIperfServer::HIperfServer(const ServerConfiguration &conf) {
- impl_ = new Impl(conf);
-}
+HIperfServer::HIperfServer(const ServerConfiguration &conf)
+ : impl_(std::make_unique<Impl>(conf)) {}
-HIperfServer::~HIperfServer() { delete impl_; }
+HIperfServer::~HIperfServer() = default;
int HIperfServer::setup() { return impl_->setup(); }
diff --git a/apps/hiperf/src/server.h b/apps/hiperf/src/server.h
index 05407a807..d7420da48 100644
--- a/apps/hiperf/src/server.h
+++ b/apps/hiperf/src/server.h
@@ -21,14 +21,15 @@ namespace hiperf {
class HIperfServer {
public:
- HIperfServer(const ServerConfiguration &conf);
+ explicit HIperfServer(const ServerConfiguration &conf);
+
~HIperfServer();
int setup();
void run();
private:
class Impl;
- Impl *impl_;
+ std::unique_ptr<Impl> impl_;
};
} // namespace hiperf \ No newline at end of file
diff --git a/apps/http-proxy/CMakeLists.txt b/apps/http-proxy/CMakeLists.txt
index 66b9c1bab..80f671567 100644
--- a/apps/http-proxy/CMakeLists.txt
+++ b/apps/http-proxy/CMakeLists.txt
@@ -1,4 +1,4 @@
-# Copyright (c) 2019 Cisco and/or its affiliates.
+# Copyright (c) 2021-2022 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
@@ -11,8 +11,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-cmake_minimum_required(VERSION 3.10 FATAL_ERROR)
-set(CMAKE_CXX_STANDARD 14)
+##############################################################
+# Compiler options
+##############################################################
+set(COMPILER_OPTIONS
+ PRIVATE ${DEFAULT_COMPILER_OPTIONS}
+)
# -Wno-c99-designator issue
#
@@ -22,20 +26,22 @@ if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
EXECUTE_PROCESS( COMMAND ${CMAKE_CXX_COMPILER} --version OUTPUT_VARIABLE clang_full_version_string )
string (REGEX REPLACE ".*clang version ([0-9]+\\.[0-9]+).*" "\\1" CLANG_VERSION_STRING ${clang_full_version_string})
if (CLANG_VERSION_STRING VERSION_GREATER_EQUAL 11)
- set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-c99-designator")
+ list(APPEND COMPILER_OPTIONS
+ "-Wno-c99-designator"
+ )
endif()
endif()
-set(CMAKE_MODULE_PATH
- ${CMAKE_MODULE_PATH}
- "${CMAKE_SOURCE_DIR}/cmake/Modules/"
-)
-if (NOT CMAKE_BUILD_TYPE)
- message(STATUS "${PROJECT_NAME}: No build type selected, default to Release")
- set(CMAKE_BUILD_TYPE "Release")
-endif ()
+##############################################################
+# Includes subdirectory
+##############################################################
+add_subdirectory(includes/hicn/http-proxy)
+
+##############################################################
+# Source files
+##############################################################
set(LIB_SOURCE_FILES
src/http_session.cc
src/http_proxy.cc
@@ -48,41 +54,50 @@ set(APP_SOURCE_FILES
main.cc
)
-add_subdirectory(includes/hicn/http-proxy)
-set(LIBHTTP_PROXY hicnhttpproxy)
-set(LIBHTTP_PROXY_STATIC ${LIBHTTP_PROXY}.static)
-
-list(APPEND COMPILER_DEFINITIONS
- -DWITH_POLICY
-)
-
+##############################################################
+# Libraries to link
+##############################################################
list(APPEND HTTP_PROXY_LIBRARIES
- ${LIBTRANSPORT_LIBRARIES}
- ${LIBHICNCTRL_LIBRARIES}
- ${LIBHICN_LIBRARIES}
- ${OPENSSL_LIBRARIES}
- ${CMAKE_THREAD_LIBS_INIT}
+ PUBLIC ${LIBTRANSPORT_LIBRARIES}
+ PUBLIC ${LIBHICNCTRL_LIBRARIES}
+ PUBLIC ${LIBHICN_LIBRARIES}
+ PRIVATE ${CMAKE_THREAD_LIBS_INIT}
)
+
+##############################################################
+# Build http proxy library
+##############################################################
build_library(${LIBHTTP_PROXY}
STATIC
SOURCES ${LIB_SOURCE_FILES}
LINK_LIBRARIES ${HTTP_PROXY_LIBRARIES}
+ INCLUDE_DIRS
+ PRIVATE ${THIRD_PARTY_INCLUDE_DIRS} ${COMMON_INCLUDE_DIRS}
DEPENDS ${DEPENDENCIES}
INSTALL_HEADERS ${LIBPROXY_TO_INSTALL_HEADER_FILES}
- INCLUDE_DIRS ${LIBTRANSPORT_INCLUDE_DIRS} ${LIBHICNCTRL_INCLUDE_DIRS} ${LIBPROXY_INCLUDE_DIRS}
+ INCLUDE_DIRS
+ PUBLIC
+ $<BUILD_INTERFACE:${LIBPROXY_INCLUDE_DIRS}>
+ $<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}>
COMPONENT ${HICN_APPS}
LINK_FLAGS ${LINK_FLAGS}
- DEFINITIONS ${COMPILER_DEFINITIONS}
+ COMPILE_OPTIONS ${COMPILER_OPTIONS}
)
+
+##############################################################
+# Build http proxy executable
+##############################################################
if (NOT DISABLE_EXECUTABLES)
build_executable(${HTTP_PROXY}
SOURCES ${APP_SOURCE_FILES}
LINK_LIBRARIES ${LIBHTTP_PROXY_STATIC}
- DEPENDS ${LIBHTTP_PROXY_STATIC}
+ INCLUDE_DIRS
+ PRIVATE ${THIRD_PARTY_INCLUDE_DIRS} ${COMMON_INCLUDE_DIRS}
+ DEPENDS ${LIBHTTP_PROXY_STATIC} ${THIRD_PARTY_DEPENDENCIES}
COMPONENT ${HICN_APPS}
- DEFINITIONS ${COMPILER_DEFINITIONS}
LINK_FLAGS ${LINK_FLAGS}
+ COMPILE_OPTIONS ${COMPILER_OPTIONS}
)
endif ()
diff --git a/apps/http-proxy/includes/hicn/http-proxy/CMakeLists.txt b/apps/http-proxy/includes/hicn/http-proxy/CMakeLists.txt
index 5cc80a168..18f5704d1 100644
--- a/apps/http-proxy/includes/hicn/http-proxy/CMakeLists.txt
+++ b/apps/http-proxy/includes/hicn/http-proxy/CMakeLists.txt
@@ -1,4 +1,4 @@
-# Copyright (c) 2017-2019 Cisco and/or its affiliates.
+# Copyright (c) 2021 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
@@ -24,7 +24,7 @@ list(APPEND HEADER_FILES
set(HEADER_FILES ${HEADER_FILES} PARENT_SCOPE)
set(LIBPROXY_INCLUDE_DIRS
- ${CMAKE_CURRENT_SOURCE_DIR}/../.. ""
+ ${CMAKE_CURRENT_SOURCE_DIR}/../..
CACHE INTERNAL
"" FORCE
)
@@ -34,4 +34,3 @@ set(LIBPROXY_TO_INSTALL_HEADER_FILES
CACHE INTERNAL
"" FORCE
)
-
diff --git a/apps/http-proxy/includes/hicn/http-proxy/forwarder_config.h b/apps/http-proxy/includes/hicn/http-proxy/forwarder_config.h
index e02b9d9a7..14e0068ed 100644
--- a/apps/http-proxy/includes/hicn/http-proxy/forwarder_config.h
+++ b/apps/http-proxy/includes/hicn/http-proxy/forwarder_config.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Copyright (c) 2021-2022 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -15,9 +15,9 @@
#pragma once
+#include <hicn/apps/utils/logger.h>
#include <hicn/transport/portability/c_portability.h>
#include <hicn/transport/utils/branch_prediction.h>
-#include <hicn/transport/utils/log.h>
#include <hicn/transport/utils/string_utils.h>
#include <asio.hpp>
@@ -60,15 +60,14 @@ class ForwarderConfig {
doTryToConnectToForwarder(std::make_error_code(std::errc(0)));
}
- void doTryToConnectToForwarder(std::error_code ec) {
+ void doTryToConnectToForwarder(const std::error_code& ec) {
if (!ec) {
// ec == 0 --> timer expired
int ret = forwarder_interface_.connectToForwarder();
if (ret < 0) {
// We were not able to connect to the local forwarder. Do not give up
// and retry.
- TRANSPORT_LOG_ERROR
- << "Could not connect to local forwarder. Retrying.";
+ LoggerErr() << "Could not connect to local forwarder. Retrying.";
timer_.expires_from_now(std::chrono::milliseconds(RETRY_INTERVAL));
timer_.async_wait(std::bind(&ForwarderConfig::doTryToConnectToForwarder,
@@ -79,19 +78,18 @@ class ForwarderConfig {
doGetMainListener(std::make_error_code(std::errc(0)));
}
} else {
- TRANSPORT_LOG_ERROR
- << "Timer for re-trying forwarder connection canceled.";
+ LoggerErr() << "Timer for re-trying forwarder connection canceled.";
}
}
- void doGetMainListener(std::error_code ec) {
+ void doGetMainListener(const std::error_code& ec) {
if (!ec) {
// ec == 0 --> timer expired
int ret = forwarder_interface_.getMainListenerPort();
if (ret <= 0) {
// Since without the main listener of the forwarder the proxy cannot
// work, we can stop the program here until we get the listener port.
- TRANSPORT_LOG_ERROR
+ LoggerErr()
<< "Could not retrieve main listener port from the forwarder. "
"Retrying.";
@@ -105,8 +103,7 @@ class ForwarderConfig {
listener_retrieved_callback_(std::make_error_code(std::errc(0)));
}
} else {
- TRANSPORT_LOG_ERROR
- << "Timer for retrieving main hicn listener canceled.";
+ LoggerErr() << "Timer for retrieving main hicn listener canceled.";
}
}
@@ -114,7 +111,7 @@ class ForwarderConfig {
TRANSPORT_ALWAYS_INLINE bool parseHicnHeader(std::string& header,
Callback&& callback) {
std::stringstream ss(header);
- route_info_t* ret = new route_info_t();
+ RouteInfoPtr ret = std::make_shared<route_info_t>();
std::string port_string;
while (ss.good()) {
@@ -174,9 +171,9 @@ class ForwarderConfig {
ret->family = AF_INET;
std::string _prefix = ret->route_addr;
forwarder_interface_.createFaceAndRoute(
- RouteInfoPtr(ret), [callback = std::forward<Callback>(callback),
- configured_prefix = std::move(_prefix)](
- uint32_t route_id, bool result) {
+ std::move(ret), [callback = std::forward<Callback>(callback),
+ configured_prefix = std::move(_prefix)](
+ uint32_t route_id, bool result) {
callback(result, configured_prefix);
});
diff --git a/apps/http-proxy/includes/hicn/http-proxy/forwarder_interface.h b/apps/http-proxy/includes/hicn/http-proxy/forwarder_interface.h
index 54941a4ba..ae9562a12 100644
--- a/apps/http-proxy/includes/hicn/http-proxy/forwarder_interface.h
+++ b/apps/http-proxy/includes/hicn/http-proxy/forwarder_interface.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -23,8 +23,15 @@ extern "C" {
#ifndef ASIO_STANDALONE
#define ASIO_STANDALONE 1
#endif
-#include <asio.hpp>
-#include <asio/steady_timer.hpp>
+
+#ifdef __APPLE__
+#pragma clang diagnostic push
+#pragma clang diagnostic ignored "-Wshorten-64-to-32"
+#endif
+#include <hicn/transport/core/asio_wrapper.h>
+#ifdef __APPLE__
+#pragma clang diagnostic pop
+#endif
#include <functional>
#include <thread>
#include <unordered_map>
@@ -45,16 +52,11 @@ using RouteInfoPtr = std::shared_ptr<route_info_t>;
class ForwarderInterface {
public:
- ForwarderInterface(asio::io_service &io_service)
+ explicit ForwarderInterface(asio::io_service &io_service)
: external_ioservice_(io_service),
work_(std::make_unique<asio::io_service::work>(internal_ioservice_)),
- sock_(nullptr),
thread_(std::make_unique<std::thread>(
- [this]() { internal_ioservice_.run(); })),
- check_routes_timer_(nullptr),
- pending_add_route_counter_(0),
- route_id_(0),
- closed_(false) {}
+ [this]() { internal_ioservice_.run(); })) {}
~ForwarderInterface();
@@ -88,20 +90,20 @@ class ForwarderInterface {
void internalCreateFaceAndRoute(RouteInfoPtr route_info, uint8_t max_try,
asio::steady_timer *timer,
- SetRouteCallback callback);
+ const SetRouteCallback &callback);
- int tryToCreateFaceAndRoute(route_info_t *route_info);
+ int tryToCreateFaceAndRoute(const route_info_t *route_info);
asio::io_service &external_ioservice_;
asio::io_service internal_ioservice_;
std::unique_ptr<asio::io_service::work> work_;
- hc_sock_t *sock_;
+ hc_sock_t *sock_ = nullptr;
std::unique_ptr<std::thread> thread_;
std::unordered_map<uint32_t, RouteInfoPtr> route_status_;
- std::unique_ptr<asio::steady_timer> check_routes_timer_;
- uint32_t pending_add_route_counter_;
- uint32_t route_id_;
- bool closed_;
+ std::unique_ptr<asio::steady_timer> check_routes_timer_ = nullptr;
+ uint32_t pending_add_route_counter_ = 0;
+ uint32_t route_id_ = 0;
+ bool closed_ = false;
};
} // namespace transport
diff --git a/apps/http-proxy/includes/hicn/http-proxy/http_1x_message_fast_parser.h b/apps/http-proxy/includes/hicn/http-proxy/http_1x_message_fast_parser.h
index 7c035c83b..87a5c7d4e 100644
--- a/apps/http-proxy/includes/hicn/http-proxy/http_1x_message_fast_parser.h
+++ b/apps/http-proxy/includes/hicn/http-proxy/http_1x_message_fast_parser.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
diff --git a/apps/http-proxy/includes/hicn/http-proxy/http_proxy.h b/apps/http-proxy/includes/hicn/http-proxy/http_proxy.h
index a4139a620..567ab4540 100644
--- a/apps/http-proxy/includes/hicn/http-proxy/http_proxy.h
+++ b/apps/http-proxy/includes/hicn/http-proxy/http_proxy.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Copyright (c) 2021-2022 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -15,6 +15,7 @@
#pragma once
+#include <hicn/apps/utils/logger.h>
#include <hicn/transport/interfaces/socket_consumer.h>
#include <hicn/transport/utils/event_thread.h>
@@ -22,7 +23,9 @@
#include "http_session.h"
#include "icn_receiver.h"
+#ifndef ASIO_STANDALONE
#define ASIO_STANDALONE
+#endif
#include <asio.hpp>
#include <asio/version.hpp>
#include <unordered_set>
@@ -31,7 +34,8 @@ class TcpListener {
public:
using AcceptCallback = std::function<void(asio::ip::tcp::socket&&)>;
- TcpListener(asio::io_service& io_service, short port, AcceptCallback callback)
+ TcpListener(asio::io_service& io_service, short port,
+ const AcceptCallback& callback)
: acceptor_(io_service),
#if ((ASIO_VERSION / 100 % 1000) < 12)
socket_(io_service),
@@ -46,13 +50,12 @@ class TcpListener {
acceptor_.listen();
}
- public:
void doAccept() {
#if ((ASIO_VERSION / 100 % 1000) >= 12)
acceptor_.async_accept(
- [this](std::error_code ec, asio::ip::tcp::socket socket) {
+ [this](const std::error_code& ec, asio::ip::tcp::socket socket) {
#else
- acceptor_.async_accept(socket_, [this](std::error_code ec) {
+ acceptor_.async_accept(socket_, [this](const std::error_code& ec) {
auto socket = std::move(socket_);
#endif
if (!ec) {
@@ -77,7 +80,7 @@ class HTTPClientConnectionCallback;
class Receiver {
public:
- Receiver() : thread_() {}
+ Receiver() {}
virtual ~Receiver() = default;
void stopAndJoinThread() { thread_.stop(); }
virtual void stop() = 0;
@@ -112,13 +115,13 @@ class TcpReceiver : public Receiver {
std::deque<HTTPClientConnectionCallback*> http_clients_;
std::unordered_set<HTTPClientConnectionCallback*> used_http_clients_;
ForwarderConfig forwarder_config_;
- bool stopped_;
+ bool stopped_ = false;
};
class IcnReceiver : public Receiver {
public:
template <typename... Args>
- IcnReceiver(Args&&... args)
+ explicit IcnReceiver(Args&&... args)
: Receiver(),
icn_consum_producer_(thread_.getIoService(),
std::forward<Args>(args)...) {
@@ -145,20 +148,18 @@ class HTTPProxy {
std::string prefix;
std::string first_ipv6_word;
- virtual void printParams() { std::cout << "Parameters: " << std::endl; };
+ virtual ~CommonParams() {}
+ virtual void printParams() { LoggerInfo() << "Parameters: "; };
};
struct ClientParams : virtual CommonParams {
short tcp_listen_port;
void printParams() override {
- std::cout << "Running HTTP/TCP -> HTTP/hICN proxy." << std::endl;
+ LoggerInfo() << "Running HTTP/TCP -> HTTP/hICN proxy.";
CommonParams::printParams();
- std::cout << "\t"
- << "HTTP listen port: " << tcp_listen_port << std::endl;
- std::cout << "\t"
- << "Consumer Prefix: " << prefix << std::endl;
- std::cout << "\t"
- << "Prefix first word: " << first_ipv6_word << std::endl;
+ LoggerInfo() << "\tHTTP listen port: " << tcp_listen_port;
+ LoggerInfo() << "\tConsumer Prefix: " << prefix;
+ LoggerInfo() << "\tPrefix first word: " << first_ipv6_word;
}
};
@@ -171,25 +172,24 @@ class HTTPProxy {
bool manifest;
void printParams() override {
- std::cout << "Running HTTP/hICN -> HTTP/TCP proxy." << std::endl;
+ LoggerInfo() << "Running HTTP/hICN -> HTTP/TCP proxy.";
CommonParams::printParams();
- std::cout << "\t"
- << "Origin address: " << origin_address << std::endl;
- std::cout << "\t"
- << "Origin port: " << origin_port << std::endl;
- std::cout << "\t"
- << "Producer cache size: " << cache_size << std::endl;
- std::cout << "\t"
- << "hICN MTU: " << mtu << std::endl;
- std::cout << "\t"
- << "Default content lifetime: " << content_lifetime
- << std::endl;
- std::cout << "\t"
- << "Producer Prefix: " << prefix << std::endl;
- std::cout << "\t"
- << "Prefix first word: " << first_ipv6_word << std::endl;
- std::cout << "\t"
- << "Use manifest: " << manifest << std::endl;
+ LoggerInfo() << "\t"
+ << "Origin address: " << origin_address;
+ LoggerInfo() << "\t"
+ << "Origin port: " << origin_port;
+ LoggerInfo() << "\t"
+ << "Producer cache size: " << cache_size;
+ LoggerInfo() << "\t"
+ << "hICN MTU: " << mtu;
+ LoggerInfo() << "\t"
+ << "Default content lifetime: " << content_lifetime;
+ LoggerInfo() << "\t"
+ << "Producer Prefix: " << prefix;
+ LoggerInfo() << "\t"
+ << "Prefix first word: " << first_ipv6_word;
+ LoggerInfo() << "\t"
+ << "Use manifest: " << manifest;
}
};
@@ -207,4 +207,4 @@ class HTTPProxy {
asio::signal_set signals_;
};
-} // namespace transport \ No newline at end of file
+} // namespace transport
diff --git a/apps/http-proxy/includes/hicn/http-proxy/http_session.h b/apps/http-proxy/includes/hicn/http-proxy/http_session.h
index f4a3dbdee..1b7df96a6 100644
--- a/apps/http-proxy/includes/hicn/http-proxy/http_session.h
+++ b/apps/http-proxy/includes/hicn/http-proxy/http_session.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -17,13 +17,19 @@
#include <hicn/transport/core/packet.h>
-#include "http_1x_message_fast_parser.h"
-
-#define ASIO_STANDALONE
-#include <asio.hpp>
+#ifdef __APPLE__
+#pragma clang diagnostic push
+#pragma clang diagnostic ignored "-Wshorten-64-to-32"
+#endif
+#include <hicn/transport/core/asio_wrapper.h>
+#ifdef __APPLE__
+#pragma clang diagnostic pop
+#endif
#include <deque>
#include <functional>
+#include "http_1x_message_fast_parser.h"
+
namespace transport {
using asio::ip::tcp;
@@ -67,13 +73,16 @@ class HTTPSession {
};
public:
- HTTPSession(asio::io_service &io_service, std::string &ip_address,
- std::string &port, ContentReceivedCallback receive_callback,
- OnConnectionClosed on_reconnect_callback, bool client = false);
+ HTTPSession(asio::io_service &io_service, const std::string &ip_address,
+ const std::string &port,
+ const ContentReceivedCallback &receive_callback,
+ const OnConnectionClosed &on_reconnect_callback,
+ bool client = false);
HTTPSession(asio::ip::tcp::socket socket,
- ContentReceivedCallback receive_callback,
- OnConnectionClosed on_reconnect_callback, bool client = true);
+ const ContentReceivedCallback &receive_callback,
+ const OnConnectionClosed &on_reconnect_callback,
+ bool client = true);
~HTTPSession();
@@ -97,8 +106,7 @@ class HTTPSession {
bool checkConnected();
- private:
- void handleRead(std::error_code ec, std::size_t length);
+ void handleRead(const std::error_code &ec, std::size_t length);
void tryReconnection();
void startConnectionTimer();
void handleDeadline(const std::error_code &ec);
@@ -114,14 +122,14 @@ class HTTPSession {
asio::streambuf input_buffer_;
bool reverse_;
- bool is_reconnection_;
- bool data_available_;
+ bool is_reconnection_ = false;
+ bool data_available_ = false;
- std::size_t content_length_;
+ std::size_t content_length_ = 0;
// Chunked encoding
- bool is_last_chunk_;
- bool chunked_;
+ bool is_last_chunk_ = false;
+ bool chunked_ = false;
ContentReceivedCallback receive_callback_;
OnConnectionClosed on_connection_closed_callback_;
diff --git a/apps/http-proxy/includes/hicn/http-proxy/icn_receiver.h b/apps/http-proxy/includes/hicn/http-proxy/icn_receiver.h
index 780037665..e3ab97fdd 100644
--- a/apps/http-proxy/includes/hicn/http-proxy/icn_receiver.h
+++ b/apps/http-proxy/includes/hicn/http-proxy/icn_receiver.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -13,20 +13,18 @@
* limitations under the License.
*/
+#include <hicn/http-proxy/http_session.h>
+#include <hicn/transport/core/asio_wrapper.h>
#include <hicn/transport/core/prefix.h>
#include <hicn/transport/interfaces/publication_options.h>
#include <hicn/transport/interfaces/socket_producer.h>
#include <hicn/transport/utils/spinlock.h>
-#include <asio.hpp>
#include <cassert>
#include <cstring>
#include <queue>
#include <utility>
-#include <hicn/http-proxy/http_session.h>
-//#include "http_session.h"
-
namespace transport {
class AsyncConsumerProducer {
@@ -49,9 +47,7 @@ class AsyncConsumerProducer {
const std::string& content_lifetime, bool manifest)
: AsyncConsumerProducer(internal_io_service_, prefix, first_ipv6_word,
origin_address, origin_port, cache_size, mtu,
- content_lifetime, manifest) {
- external_io_service_ = false;
- }
+ content_lifetime, manifest) {}
void run();
@@ -73,7 +69,7 @@ class AsyncConsumerProducer {
core::Prefix prefix_;
asio::io_service& io_service_;
asio::io_service internal_io_service_;
- bool external_io_service_;
+ bool external_io_service_ = false;
interface::ProducerSocket producer_socket_;
std::string ip_address_;
@@ -81,9 +77,8 @@ class AsyncConsumerProducer {
uint32_t cache_size_;
uint32_t mtu_;
- uint64_t request_counter_;
+ uint64_t request_counter_ = 0;
- // std::unordered_map<core::Name, std::shared_ptr<ATSConnector>>
// connection_map_;
HTTPSession connector_;
diff --git a/apps/http-proxy/includes/hicn/http-proxy/utils.h b/apps/http-proxy/includes/hicn/http-proxy/utils.h
index d87c796d0..9865d8e4c 100644
--- a/apps/http-proxy/includes/hicn/http-proxy/utils.h
+++ b/apps/http-proxy/includes/hicn/http-proxy/utils.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -35,17 +35,16 @@ TRANSPORT_ALWAYS_INLINE std::string generatePrefix(
str += pos;
uint32_t locator_hash = utils::hash::fnv32_buf(str, strlen(str));
+ const uint16_t* word = (const uint16_t*)&locator_hash;
std::stringstream stream;
stream << first_ipv6_word << ":0";
- for (uint16_t* word = (uint16_t*)&locator_hash;
- std::size_t(word) < (std::size_t(&locator_hash) + sizeof(locator_hash));
- word++) {
- stream << ":" << std::hex << *word;
+ for (std::size_t i = 0; i < sizeof(locator_hash) / 2; i++) {
+ stream << ":" << std::hex << word[i];
}
stream << "::";
return stream.str();
-} \ No newline at end of file
+}
diff --git a/apps/http-proxy/main.cc b/apps/http-proxy/main.cc
index d9c29ecde..832ec23e6 100644
--- a/apps/http-proxy/main.cc
+++ b/apps/http-proxy/main.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021-2022 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -13,32 +13,31 @@
* limitations under the License.
*/
+#include <hicn/apps/utils/logger.h>
#include <hicn/http-proxy/http_proxy.h>
using namespace transport;
-int usage(char* program) {
- std::cerr << "USAGE: " << program << "[-C|-S] [options] <http_prefix>\n"
- << "Server or Client: \n"
- << " -P [FIRST_IPv6_WORD_HEX]\n"
- << " -t [number of threads]\n"
- << "Client Options: \n"
- << " -L [PROXY_LISTEN_PORT]\n"
- << "Server Options: \n"
- << " -a [ORIGIN_IP_ADDRESS]\n"
- << " -p [ORIGIN_PORT]\n"
- << " -c [CACHE_SIZE]\n"
- << " -m [MTU]"
- << " -l [DEFAULT_CONTENT_LIFETIME] (seconds)\n"
- << " -M (enable manifest)\n"
- << std::endl
- << "Example Server:\n"
- << " " << program
- << " -S -a example.com -p 80 -c 10000 -m 1300 -l 7200 -M -t 1 "
- "http://httpserver\n"
- << "Example Client:\n"
- << " " << program << " -C -L 9091 http://httpserver\n"
- << std::endl;
+int usage(const char* program) {
+ LoggerInfo() << "USAGE: " << program << "[-C|-S] [options] <http_prefix>\n"
+ << "Server or Client: \n"
+ << " -P [FIRST_IPv6_WORD_HEX]\n"
+ << " -t [number of threads]\n"
+ << "Client Options: \n"
+ << " -L [PROXY_LISTEN_PORT]\n"
+ << "Server Options: \n"
+ << " -a [ORIGIN_IP_ADDRESS]\n"
+ << " -p [ORIGIN_PORT]\n"
+ << " -c [CACHE_SIZE]\n"
+ << " -m [MTU]"
+ << " -l [DEFAULT_CONTENT_LIFETIME] (seconds)\n"
+ << " -M (enable manifest)\n";
+ LoggerInfo() << "Example Server:\n"
+ << " " << program
+ << " -S -a example.com -p 80 -c 10000 -m 1300 -l 7200 -M -t 1 "
+ "http://httpserver\n"
+ << "Example Client:\n"
+ << " " << program << " -C -L 9091 http://httpserver\n";
return -1;
}
@@ -53,8 +52,7 @@ struct Params : HTTPProxy::ClientParams, HTTPProxy::ServerParams {
"Proxy configured as client and server at the same time.");
}
- std::cout << "\t"
- << "N Threads: " << n_thread << std::endl;
+ LoggerInfo() << "\tN Threads: " << n_thread;
}
HTTPProxy* instantiateProxyAsValue() {
@@ -93,18 +91,16 @@ int main(int argc, char** argv) {
switch (opt) {
case 'C':
if (params.server) {
- std::cerr << "Cannot be both client and server (both -C anc -S "
- "options specified.)."
- << std::endl;
+ LoggerErr() << "Cannot be both client and server (both -C anc -S "
+ "options specified.).";
return usage(argv[0]);
}
params.client = true;
break;
case 'S':
if (params.client) {
- std::cerr << "Cannot be both client and server (both -C anc -S "
- "options specified.)."
- << std::endl;
+ LoggerErr() << "Cannot be both client and server (both -C anc -S "
+ "options specified.).";
return usage(argv[0]);
}
params.server = true;
@@ -136,15 +132,13 @@ int main(int argc, char** argv) {
case 't':
params.n_thread = std::stoul(optarg);
break;
- case 'h':
default:
return usage(argv[0]);
- break;
}
}
if (argv[optind] == 0) {
- std::cerr << "Using default prefix " << params.prefix << std::endl;
+ LoggerInfo() << "Using default prefix " << params.prefix;
} else {
params.prefix = argv[optind];
}
@@ -155,4 +149,4 @@ int main(int argc, char** argv) {
delete proxy;
return 0;
-} \ No newline at end of file
+}
diff --git a/apps/http-proxy/src/forwarder_interface.cc b/apps/http-proxy/src/forwarder_interface.cc
index c2448de9a..717679e09 100644
--- a/apps/http-proxy/src/forwarder_interface.cc
+++ b/apps/http-proxy/src/forwarder_interface.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Copyright (c) 2021-2022 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -14,8 +14,8 @@
*/
#include <arpa/inet.h>
+#include <hicn/apps/utils/logger.h>
#include <hicn/http-proxy/forwarder_interface.h>
-#include <hicn/transport/utils/log.h>
#include <chrono>
#include <iostream>
@@ -27,7 +27,7 @@ namespace transport {
ForwarderInterface::~ForwarderInterface() {}
int ForwarderInterface::connectToForwarder() {
- sock_ = hc_sock_create();
+ sock_ = hc_sock_create(FORWARDER_TYPE_HICNLIGHT, NULL);
if (!sock_) return -1;
if (hc_sock_connect(sock_) < 0) {
@@ -96,7 +96,8 @@ void ForwarderInterface::internalRemoveConnectedUser(uint32_t route_id) {
std::vector<hc_route_t *> routes_to_remove;
foreach_route(r, data) {
char remote_addr[INET6_ADDRSTRLEN];
- int ret = ip_address_ntop(&r->remote_addr, remote_addr, r->len, r->family);
+ int ret =
+ hicn_ip_address_ntop(&r->remote_addr, remote_addr, r->len, r->family);
if (ret < 0) continue;
std::string route_addr(remote_addr);
@@ -119,7 +120,7 @@ void ForwarderInterface::internalRemoveConnectedUser(uint32_t route_id) {
for (unsigned i = 0; i < routes_to_remove.size(); i++) {
connids_to_remove.insert(routes_to_remove[i]->face_id);
if (hc_route_delete(sock_, routes_to_remove[i]) < 0) {
- TRANSPORT_LOG_ERROR << "Error removing route from forwarder.";
+ LoggerErr() << "Error removing route from forwarder.";
}
}
@@ -146,24 +147,23 @@ void ForwarderInterface::internalRemoveConnectedUser(uint32_t route_id) {
for (unsigned i = 0; i < conns_to_remove.size(); i++) {
if (hc_connection_delete(sock_, conns_to_remove[i]) < 0) {
- TRANSPORT_LOG_ERROR << "Error removing connection from forwarder.";
+ LoggerErr() << "Error removing connection from forwarder.";
}
}
hc_data_free(data);
}
-void ForwarderInterface::internalCreateFaceAndRoute(RouteInfoPtr route_info,
- uint8_t max_try,
- asio::steady_timer *timer,
- SetRouteCallback callback) {
+void ForwarderInterface::internalCreateFaceAndRoute(
+ RouteInfoPtr route_info, uint8_t max_try, asio::steady_timer *timer,
+ const SetRouteCallback &callback) {
int ret = tryToCreateFaceAndRoute(route_info.get());
if (ret < 0 && max_try > 0) {
max_try--;
timer->expires_from_now(std::chrono::milliseconds(500));
timer->async_wait([this, _route_info = std::move(route_info), max_try,
- timer, callback](std::error_code ec) {
+ timer, callback](const std::error_code &ec) {
if (ec) return;
internalCreateFaceAndRoute(std::move(_route_info), max_try, timer,
std::move(callback));
@@ -185,7 +185,8 @@ void ForwarderInterface::internalCreateFaceAndRoute(RouteInfoPtr route_info,
delete timer;
}
-int ForwarderInterface::tryToCreateFaceAndRoute(route_info_t *route_info) {
+int ForwarderInterface::tryToCreateFaceAndRoute(
+ const route_info_t *route_info) {
if (!sock_) return -1;
hc_data_t *data;
@@ -201,8 +202,9 @@ int ForwarderInterface::tryToCreateFaceAndRoute(route_info_t *route_info) {
if (interface.compare("lo") != 0) {
found = true;
- ip_address_t remote_ip;
- if (ip_address_pton(route_info->remote_addr.c_str(), &remote_ip) < 0) {
+ hicn_ip_address_t remote_ip;
+ if (hicn_ip_address_pton(route_info->remote_addr.c_str(), &remote_ip) <
+ 0) {
hc_data_free(data);
return -1;
}
@@ -210,14 +212,14 @@ int ForwarderInterface::tryToCreateFaceAndRoute(route_info_t *route_info) {
hc_face_t face;
memset(&face, 0, sizeof(hc_face_t));
- face.face.type = FACE_TYPE_UDP;
- face.face.family = route_info->family;
- face.face.local_addr = l->local_addr;
- face.face.remote_addr = remote_ip;
- face.face.local_port = l->local_port;
- face.face.remote_port = route_info->remote_port;
+ face.type = FACE_TYPE_UDP;
+ face.family = route_info->family;
+ face.local_addr = l->local_addr;
+ face.remote_addr = remote_ip;
+ face.local_port = l->local_port;
+ face.remote_port = route_info->remote_port;
- if (netdevice_set_name(&face.face.netdevice, l->interface_name) < 0) {
+ if (netdevice_set_name(&face.netdevice, l->interface_name) < 0) {
hc_data_free(data);
return -1;
}
@@ -237,10 +239,10 @@ int ForwarderInterface::tryToCreateFaceAndRoute(route_info_t *route_info) {
return -1;
}
- ip_address_t route_ip;
+ hicn_ip_address_t route_ip;
hc_route_t route;
- if (ip_address_pton(route_info->route_addr.c_str(), &route_ip) < 0) {
+ if (hicn_ip_address_pton(route_info->route_addr.c_str(), &route_ip) < 0) {
hc_data_free(data);
return -1;
}
diff --git a/apps/http-proxy/src/http_1x_message_fast_parser.cc b/apps/http-proxy/src/http_1x_message_fast_parser.cc
index 4b6b78d55..ffae2368b 100644
--- a/apps/http-proxy/src/http_1x_message_fast_parser.cc
+++ b/apps/http-proxy/src/http_1x_message_fast_parser.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -99,7 +99,8 @@ bool HTTPMessageFastParser::isMpdRequest(const uint8_t *headers,
return false;
}
-uint32_t HTTPMessageFastParser::parseCacheControl(const uint8_t *headers,
- std::size_t length) {
+uint32_t HTTPMessageFastParser::parseCacheControl(
+ [[maybe_unused]] const uint8_t *headers,
+ [[maybe_unused]] std::size_t length) {
return 0;
}
diff --git a/apps/http-proxy/src/http_proxy.cc b/apps/http-proxy/src/http_proxy.cc
index 2040f7cfa..7419c7f7f 100644
--- a/apps/http-proxy/src/http_proxy.cc
+++ b/apps/http-proxy/src/http_proxy.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Copyright (c) 2021-2022 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -13,11 +13,11 @@
* limitations under the License.
*/
+#include <hicn/apps/utils/logger.h>
#include <hicn/http-proxy/http_proxy.h>
#include <hicn/http-proxy/http_session.h>
#include <hicn/http-proxy/utils.h>
#include <hicn/transport/core/interest.h>
-#include <hicn/transport/utils/log.h>
#include <hicn/transport/utils/string_utils.h>
namespace transport {
@@ -29,16 +29,15 @@ using interface::ConsumerInterestCallback;
using interface::ConsumerSocket;
using interface::TransportProtocolAlgorithms;
-class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback {
+class HTTPClientConnectionCallback
+ : public interface::ConsumerSocket::ReadCallback {
public:
HTTPClientConnectionCallback(TcpReceiver& tcp_receiver,
utils::EventThread& thread)
: tcp_receiver_(tcp_receiver),
thread_(thread),
prefix_hash_(tcp_receiver_.prefix_hash_),
- consumer_(TransportProtocolAlgorithms::RAAQM, thread_.getIoService()),
- session_(nullptr),
- current_size_(0) {
+ consumer_(TransportProtocolAlgorithms::RAAQM, thread_) {
consumer_.setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, this);
consumer_.setSocketOption(
ConsumerCallbacksOptions::INTEREST_OUTPUT,
@@ -62,15 +61,15 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback {
std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3, std::placeholders::_4,
std::placeholders::_5),
- [this](asio::ip::tcp::socket& socket) -> bool {
+ [this](const asio::ip::tcp::socket& socket) {
try {
std::string remote_address =
socket.remote_endpoint().address().to_string();
std::uint16_t remote_port = socket.remote_endpoint().port();
- TRANSPORT_LOG_INFO << "Client " << remote_address << ":"
- << remote_port << "disconnected.";
- } catch (std::system_error& e) {
- // Do nothing
+ LoggerInfo() << "Client " << remote_address << ":" << remote_port
+ << "disconnected.";
+ } catch (asio::system_error& e) {
+ LoggerInfo() << "Client disconnected.";
}
consumer_.stop();
@@ -136,24 +135,19 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback {
current_size_ += size;
if (is_last) {
- // TRANSPORT_LOGD("Request received: %s",
- // std::string((const char*)tmp_buffer_.first->data(),
- // tmp_buffer_.first->length())
- // .c_str());
if (current_size_ < 1400) {
request_buffer_queue_.emplace_back(std::move(tmp_buffer_));
} else {
- TRANSPORT_LOG_ERROR << "Ignoring client request due to size ("
- << current_size_ << ") > 1400.";
+ LoggerErr() << "Ignoring client request due to size (" << current_size_
+ << ") > 1400.";
session_->close();
current_size_ = 0;
return;
}
if (!consumer_.isRunning()) {
- TRANSPORT_LOG_INFO
- << "Consumer stopped, triggering consume from TCP session "
- "handler..";
+ LoggerInfo() << "Consumer stopped, triggering consume from TCP session "
+ "handler..";
consumeNextRequest();
}
@@ -163,15 +157,16 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback {
// hicn callbacks
- void processLeavingInterest(interface::ConsumerSocket& c,
- const core::Interest& interest) {
+ void processLeavingInterest(
+ [[maybe_unused]] const interface::ConsumerSocket& c,
+ const core::Interest& interest) {
if (interest.getName().getSuffix() == 0 && interest.payloadSize() == 0) {
Interest& int2 = const_cast<Interest&>(interest);
int2.appendPayload(request_buffer_queue_.front().first->clone());
}
}
- void processInterestRetx(interface::ConsumerSocket& c,
+ void processInterestRetx([[maybe_unused]] const interface::ConsumerSocket& c,
const core::Interest& interest) {
if (interest.payloadSize() == 0) {
Interest& int2 = const_cast<Interest&>(interest);
@@ -180,20 +175,22 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback {
}
bool isBufferMovable() noexcept { return true; }
- void getReadBuffer(uint8_t** application_buffer, size_t* max_length) {}
- void readDataAvailable(size_t length) noexcept {}
+ void getReadBuffer(uint8_t** application_buffer,
+ size_t* max_length) { /*nothing to do*/
+ }
+ void readDataAvailable(size_t length) noexcept { /*nothing to do*/
+ }
size_t maxBufferSize() const { return 64 * 1024; }
void readBufferAvailable(std::unique_ptr<utils::MemBuf>&& buffer) noexcept {
// Response received. Send it back to client
auto _buffer = buffer.release();
// TRANSPORT_LOGD("From hicn: %zu bytes.", _buffer->length());
- session_->send(_buffer, []() {});
+ session_->send(_buffer, []() { /*nothing to do*/ });
}
- void readError(const std::error_code ec) noexcept {
- TRANSPORT_LOG_ERROR
- << "Error reading from hicn consumer socket. Closing session.";
+ void readError(const std::error_code& ec) noexcept {
+ LoggerErr() << "Error reading from hicn consumer socket. Closing session.";
session_->close();
}
@@ -213,15 +210,16 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback {
session_->send((const uint8_t*)HTTPMessageFastParser::http_cors,
std::strlen(HTTPMessageFastParser::http_cors), [this]() {
auto& socket = session_->socket_;
- TRANSPORT_LOG_INFO
- << "Sent OPTIONS to client "
- << socket.remote_endpoint().address() << ":"
- << socket.remote_endpoint().port();
+ LoggerInfo() << "Sent OPTIONS to client "
+ << socket.remote_endpoint().address()
+ << ":" << socket.remote_endpoint().port();
});
}
} else {
tcp_receiver_.parseHicnHeader(
- it->second, [this](bool result, std::string configured_prefix) {
+ it->second,
+ [this](bool result,
+ [[maybe_unused]] const std::string& configured_prefix) {
const char* reply = nullptr;
if (result) {
reply = HTTPMessageFastParser::http_ok;
@@ -230,28 +228,26 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback {
}
/* Route created. Send back a 200 OK to client */
- session_->send((const uint8_t*)reply, std::strlen(reply),
- [this, result]() {
- auto& socket = session_->socket_;
- TRANSPORT_LOG_INFO
- << "Sent " << result << " response to client "
- << socket.remote_endpoint().address() << ":"
- << socket.remote_endpoint().port();
- });
+ session_->send(
+ (const uint8_t*)reply, std::strlen(reply), [this, result]() {
+ auto& socket = session_->socket_;
+ LoggerInfo() << "Sent " << result << " response to client "
+ << socket.remote_endpoint().address() << ":"
+ << socket.remote_endpoint().port();
+ });
});
}
}
- private:
TcpReceiver& tcp_receiver_;
utils::EventThread& thread_;
std::string& prefix_hash_;
ConsumerSocket consumer_;
- std::unique_ptr<HTTPSession> session_;
+ std::unique_ptr<HTTPSession> session_ = nullptr;
std::deque<std::pair<std::unique_ptr<utils::MemBuf>, std::string>>
request_buffer_queue_;
std::pair<std::unique_ptr<utils::MemBuf>, std::string> tmp_buffer_;
- std::size_t current_size_;
+ std::size_t current_size_ = 0;
};
TcpReceiver::TcpReceiver(std::uint16_t port, const std::string& prefix,
@@ -264,8 +260,7 @@ TcpReceiver::TcpReceiver(std::uint16_t port, const std::string& prefix,
ipv6_first_word_(ipv6_first_word),
prefix_hash_(generatePrefix(prefix_, ipv6_first_word_)),
forwarder_config_(
- thread_.getIoService(),
- [this](std::error_code ec) {
+ thread_.getIoService(), [this](const std::error_code& ec) {
if (!ec) {
listener_.doAccept();
for (int i = 0; i < 10; i++) {
@@ -273,8 +268,7 @@ TcpReceiver::TcpReceiver(std::uint16_t port, const std::string& prefix,
new HTTPClientConnectionCallback(*this, thread_));
}
}
- }),
- stopped_(false) {
+ }) {
forwarder_config_.tryToConnectToForwarder();
}
@@ -331,8 +325,8 @@ void TcpReceiver::onNewConnection(asio::ip::tcp::socket&& socket) {
void HTTPProxy::setupSignalHandler() {
signals_.async_wait([this](const std::error_code& ec, int signal_number) {
if (!ec) {
- TRANSPORT_LOG_INFO << "Received signal " << signal_number
- << ". Stopping gracefully.";
+ LoggerInfo() << "Received signal " << signal_number
+ << ". Stopping gracefully.";
stop();
}
});
@@ -353,7 +347,6 @@ void HTTPProxy::stop() {
HTTPProxy::HTTPProxy(ClientParams& params, std::size_t n_thread)
: signals_(main_io_context_, SIGINT, SIGQUIT) {
for (uint16_t i = 0; i < n_thread; i++) {
- // icn_receivers_.emplace_back(std::make_unique<IcnReceiver>(icn_params));
receivers_.emplace_back(std::make_unique<TcpReceiver>(
params.tcp_listen_port, params.prefix, params.first_ipv6_word));
}
diff --git a/apps/http-proxy/src/http_session.cc b/apps/http-proxy/src/http_session.cc
index 84c814cbd..06a81dc27 100644
--- a/apps/http-proxy/src/http_session.cc
+++ b/apps/http-proxy/src/http_session.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021-2022 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -13,30 +13,24 @@
* limitations under the License.
*/
+#include <hicn/apps/utils/logger.h>
#include <hicn/http-proxy/http_proxy.h>
#include <hicn/transport/utils/branch_prediction.h>
-#include <hicn/transport/utils/log.h>
#include <iostream>
namespace transport {
-HTTPSession::HTTPSession(asio::io_service &io_service, std::string &ip_address,
- std::string &port,
- ContentReceivedCallback receive_callback,
- OnConnectionClosed on_connection_closed_callback,
- bool client)
+HTTPSession::HTTPSession(
+ asio::io_service &io_service, const std::string &ip_address,
+ const std::string &port, const ContentReceivedCallback &receive_callback,
+ const OnConnectionClosed &on_connection_closed_callback, bool client)
: io_service_(io_service),
socket_(io_service_),
resolver_(io_service_),
endpoint_iterator_(resolver_.resolve({ip_address, port})),
timer_(io_service),
reverse_(client),
- is_reconnection_(false),
- data_available_(false),
- content_length_(0),
- is_last_chunk_(false),
- chunked_(false),
receive_callback_(receive_callback),
on_connection_closed_callback_(on_connection_closed_callback) {
input_buffer_.prepare(buffer_size + 2048);
@@ -51,10 +45,10 @@ HTTPSession::HTTPSession(asio::io_service &io_service, std::string &ip_address,
doConnect();
}
-HTTPSession::HTTPSession(asio::ip::tcp::socket socket,
- ContentReceivedCallback receive_callback,
- OnConnectionClosed on_connection_closed_callback,
- bool client)
+HTTPSession::HTTPSession(
+ asio::ip::tcp::socket socket,
+ const ContentReceivedCallback &receive_callback,
+ const OnConnectionClosed &on_connection_closed_callback, bool client)
:
#if ((ASIO_VERSION / 100 % 1000) < 12)
io_service_(socket.get_io_service()),
@@ -92,7 +86,7 @@ void HTTPSession::send(const uint8_t *packet, std::size_t len,
io_service_.dispatch([this, packet, len, content_sent]() {
asio::async_write(socket_, asio::buffer(packet, len),
[content_sent = std::move(content_sent)](
- std::error_code ec, std::size_t /*length*/) {
+ const std::error_code &ec, std::size_t /*length*/) {
if (!ec) {
content_sent();
}
@@ -120,9 +114,7 @@ void HTTPSession::close() {
if (state_ != ConnectorState::CLOSED) {
state_ = ConnectorState::CLOSED;
if (socket_.is_open()) {
- // socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
socket_.close();
- // on_disconnect_callback_();
}
}
}
@@ -130,25 +122,26 @@ void HTTPSession::close() {
void HTTPSession::doWrite() {
auto &buffer = write_msgs_.front().first;
- asio::async_write(socket_, asio::buffer(buffer->data(), buffer->length()),
- [this](std::error_code ec, std::size_t length) {
- if (TRANSPORT_EXPECT_FALSE(!ec)) {
- write_msgs_.front().second();
- write_msgs_.pop_front();
- if (!write_msgs_.empty()) {
- doWrite();
- }
- }
- });
+ asio::async_write(
+ socket_, asio::buffer(buffer->data(), buffer->length()),
+ [this](const std::error_code &ec, [[maybe_unused]] std::size_t length) {
+ if (TRANSPORT_EXPECT_FALSE(!ec)) {
+ write_msgs_.front().second();
+ write_msgs_.pop_front();
+ if (!write_msgs_.empty()) {
+ doWrite();
+ }
+ }
+ });
} // namespace transport
-void HTTPSession::handleRead(std::error_code ec, std::size_t length) {
+void HTTPSession::handleRead(const std::error_code &ec, std::size_t length) {
if (TRANSPORT_EXPECT_TRUE(!ec)) {
content_length_ -= length;
const uint8_t *buffer =
asio::buffer_cast<const uint8_t *>(input_buffer_.data());
- bool is_last = chunked_ ? (is_last_chunk_ ? !content_length_ : false)
- : !content_length_;
+ bool check = is_last_chunk_ ? !content_length_ : false;
+ bool is_last = chunked_ ? check : !content_length_;
receive_callback_(buffer, input_buffer_.size(), is_last, false, nullptr);
input_buffer_.consume(input_buffer_.size());
@@ -207,7 +200,7 @@ void HTTPSession::doReadBody(std::size_t body_size,
void HTTPSession::doReadChunkedHeader() {
asio::async_read_until(
socket_, input_buffer_, "\r\n",
- [this](std::error_code ec, std::size_t length) {
+ [this](const std::error_code &ec, std::size_t length) {
if (TRANSPORT_EXPECT_TRUE(!ec)) {
const uint8_t *buffer =
asio::buffer_cast<const uint8_t *>(input_buffer_.data());
@@ -226,7 +219,7 @@ void HTTPSession::doReadChunkedHeader() {
void HTTPSession::doReadHeader() {
asio::async_read_until(
socket_, input_buffer_, "\r\n\r\n",
- [this](std::error_code ec, std::size_t length) {
+ [this](const std::error_code &ec, std::size_t length) {
if (TRANSPORT_EXPECT_TRUE(!ec)) {
const uint8_t *buffer =
asio::buffer_cast<const uint8_t *>(input_buffer_.data());
@@ -269,12 +262,11 @@ void HTTPSession::doReadHeader() {
void HTTPSession::tryReconnection() {
if (on_connection_closed_callback_(socket_)) {
if (state_ == ConnectorState::CONNECTED) {
- TRANSPORT_LOG_ERROR << "Connection lost. Trying to reconnect...";
+ LoggerErr() << "Connection lost. Trying to reconnect...";
state_ = ConnectorState::CONNECTING;
is_reconnection_ = true;
io_service_.post([this]() {
if (socket_.is_open()) {
- // socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
socket_.close();
}
startConnectionTimer();
@@ -287,7 +279,7 @@ void HTTPSession::tryReconnection() {
void HTTPSession::doConnect() {
asio::async_connect(
socket_, endpoint_iterator_,
- [this](std::error_code ec, tcp::resolver::iterator) {
+ [this](const std::error_code &ec, tcp::resolver::iterator) {
if (!ec) {
timer_.cancel();
state_ = ConnectorState::CONNECTED;
@@ -295,8 +287,6 @@ void HTTPSession::doConnect() {
asio::ip::tcp::no_delay noDelayOption(true);
socket_.set_option(noDelayOption);
- // on_reconnect_callback_();
-
doReadHeader();
if (data_available_ && !write_msgs_.empty()) {
@@ -306,11 +296,11 @@ void HTTPSession::doConnect() {
if (is_reconnection_) {
is_reconnection_ = false;
- TRANSPORT_LOG_INFO << "Connection recovered!";
+ LoggerInfo() << "Connection recovered!";
}
} else {
- TRANSPORT_LOG_ERROR << "Impossible to reconnect: " << ec.message();
+ LoggerErr() << "Impossible to reconnect: " << ec.message();
close();
}
});
@@ -330,7 +320,7 @@ void HTTPSession::handleDeadline(const std::error_code &ec) {
if (!ec) {
io_service_.post([this]() {
socket_.close();
- TRANSPORT_LOG_ERROR << "Error connecting. Is the server running?";
+ LoggerErr() << "Error connecting. Is the server running?";
io_service_.stop();
});
}
diff --git a/apps/http-proxy/src/icn_receiver.cc b/apps/http-proxy/src/icn_receiver.cc
index ea8ac7191..c8904aa95 100644
--- a/apps/http-proxy/src/icn_receiver.cc
+++ b/apps/http-proxy/src/icn_receiver.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021-2022 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -13,13 +13,13 @@
* limitations under the License.
*/
+#include <hicn/apps/utils/logger.h>
#include <hicn/http-proxy/http_1x_message_fast_parser.h>
#include <hicn/http-proxy/icn_receiver.h>
#include <hicn/http-proxy/utils.h>
#include <hicn/transport/core/interest.h>
#include <hicn/transport/http/default_values.h>
#include <hicn/transport/utils/hash.h>
-#include <hicn/transport/utils/log.h>
#include <functional>
#include <memory>
@@ -33,18 +33,15 @@ AsyncConsumerProducer::AsyncConsumerProducer(
const std::string& mtu, const std::string& content_lifetime, bool manifest)
: prefix_(core::Prefix(generatePrefix(prefix, first_ipv6_word), 64)),
io_service_(io_service),
- external_io_service_(true),
- producer_socket_(),
ip_address_(origin_address),
port_(origin_port),
- cache_size_(std::stoul(cache_size)),
- mtu_(std::stoul(mtu)),
- request_counter_(0),
+ cache_size_((uint32_t)std::stoul(cache_size)),
+ mtu_((uint32_t)std::stoul(mtu)),
connector_(io_service_, ip_address_, port_,
std::bind(&AsyncConsumerProducer::publishContent, this,
std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3, std::placeholders::_4),
- [this](asio::ip::tcp::socket& socket) -> bool {
+ [this]([[maybe_unused]] const asio::ip::tcp::socket& socket) {
std::queue<interface::PublicationOptions> empty;
std::swap(response_name_queue_, empty);
@@ -55,28 +52,28 @@ AsyncConsumerProducer::AsyncConsumerProducer(
interface::GeneralTransportOptions::OUTPUT_BUFFER_SIZE, cache_size_);
if (ret != SOCKET_OPTION_SET) {
- TRANSPORT_LOG_WARNING << "Warning: output buffer size has not been set.";
+ LoggerWarn() << "Warning: output buffer size has not been set.";
}
ret = producer_socket_.setSocketOption(
- interface::GeneralTransportOptions::MAKE_MANIFEST, manifest);
+ interface::GeneralTransportOptions::MANIFEST_MAX_CAPACITY, manifest);
if (ret != SOCKET_OPTION_SET) {
- TRANSPORT_LOG_WARNING << "Warning: impossible to enable signatures.";
+ LoggerWarn() << "Warning: impossible to enable signatures.";
}
ret = producer_socket_.setSocketOption(
interface::GeneralTransportOptions::DATA_PACKET_SIZE, mtu_);
if (ret != SOCKET_OPTION_SET) {
- TRANSPORT_LOG_WARNING << "Warning: mtu has not been set.";
+ LoggerWarn() << "Warning: mtu has not been set.";
}
producer_socket_.registerPrefix(prefix_);
}
void AsyncConsumerProducer::start() {
- TRANSPORT_LOG_INFO << "Starting listening";
+ LoggerInfo() << "Starting listening";
doReceive();
}
@@ -90,8 +87,8 @@ void AsyncConsumerProducer::run() {
void AsyncConsumerProducer::stop() {
io_service_.post([this]() {
- TRANSPORT_LOG_INFO << "Number of requests processed by plugin: "
- << request_counter_;
+ LoggerInfo() << "Number of requests processed by plugin: "
+ << request_counter_;
producer_socket_.stop();
connector_.close();
});
@@ -100,7 +97,7 @@ void AsyncConsumerProducer::stop() {
void AsyncConsumerProducer::doReceive() {
producer_socket_.setSocketOption(
interface::ProducerCallbacksOptions::CACHE_MISS,
- [this](interface::ProducerSocket& producer,
+ [this]([[maybe_unused]] const interface::ProducerSocket& producer,
interface::Interest& interest) {
if (interest.payloadSize() > 0) {
// Interest may contain http request
@@ -112,6 +109,7 @@ void AsyncConsumerProducer::doReceive() {
});
producer_socket_.connect();
+ producer_socket_.start();
}
void AsyncConsumerProducer::manageIncomingInterest(
@@ -128,9 +126,9 @@ void AsyncConsumerProducer::manageIncomingInterest(
if (seg >= _it->second.first) {
// TRANSPORT_LOGD(
- // "Ignoring interest with name %s for a content object which does not "
- // "exist. (Request: %u, max: %u)",
- // name.toString().c_str(), (uint32_t)seg, (uint32_t)_it->second.first);
+ // "Ignoring interest with name %s for a content object which does not
+ // " "exist. (Request: %u, max: %u)", name.toString().c_str(),
+ // (uint32_t)seg, (uint32_t)_it->second.first);
return;
}
}
@@ -148,7 +146,8 @@ void AsyncConsumerProducer::manageIncomingInterest(
response_name_queue_.emplace(std::move(name),
is_mpd ? 1000 : default_content_lifetime_);
- connector_.send(payload, [packet = std::move(packet)]() {});
+ connector_.send(payload,
+ [packet = std::move(packet)]() { /*nothing to do*/ });
}
void AsyncConsumerProducer::publishContent(const uint8_t* data,
@@ -157,26 +156,25 @@ void AsyncConsumerProducer::publishContent(const uint8_t* data,
uint32_t start_suffix = 0;
if (response_name_queue_.empty()) {
- std::cerr << "Aborting due tue empty request queue" << std::endl;
+ LoggerErr() << "Aborting due tue empty request queue";
abort();
}
- interface::PublicationOptions& options = response_name_queue_.front();
+ const interface::PublicationOptions& options = response_name_queue_.front();
int ret = producer_socket_.setSocketOption(
interface::GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME,
options.getLifetime());
if (TRANSPORT_EXPECT_FALSE(ret != SOCKET_OPTION_SET)) {
- TRANSPORT_LOG_WARNING << "Warning: content object lifetime has not been set.";
+ LoggerWarn() << "Warning: content object lifetime has not been set.";
}
const interface::Name& name = options.getName();
auto it = chunk_number_map_.find(name);
if (it == chunk_number_map_.end()) {
- std::cerr << "Aborting due to response not found in ResposeInfo map."
- << std::endl;
+ LoggerErr() << "Aborting due to response not found in ResposeInfo map.";
abort();
}
diff --git a/apps/ping/.clang-format b/apps/ping/.clang-format
index cd21e2017..adc73c6fd 100644
--- a/apps/ping/.clang-format
+++ b/apps/ping/.clang-format
@@ -1,4 +1,4 @@
-# Copyright (c) 2017-2021 Cisco and/or its affiliates.
+# Copyright (c) 2021 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
diff --git a/apps/ping/CMakeLists.txt b/apps/ping/CMakeLists.txt
index 42f7f98c1..ab3fdf56d 100644
--- a/apps/ping/CMakeLists.txt
+++ b/apps/ping/CMakeLists.txt
@@ -1,4 +1,4 @@
-# Copyright (c) 2017-2019 Cisco and/or its affiliates.
+# Copyright (c) 2021-2022 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
@@ -12,28 +12,49 @@
# limitations under the License.
if (NOT DISABLE_EXECUTABLES)
+##############################################################
+# Compiler options
+##############################################################
+set(COMPILER_OPTIONS
+ ${DEFAULT_COMPILER_OPTIONS}
+)
+
+##############################################################
+# Libraries to link
+##############################################################
list (APPEND PING_LIBRARIES
- ${LIBTRANSPORT_LIBRARIES}
- ${CMAKE_THREAD_LIBS_INIT}
- ${WSOCK32_LIBRARY}
- ${WS2_32_LIBRARY}
+ PRIVATE ${LIBHICN_LIBRARIES}
+ PRIVATE ${LIBTRANSPORT_LIBRARIES}
+ PRIVATE ${CMAKE_THREAD_LIBS_INIT}
+ PRIVATE ${WSOCK32_LIBRARY}
+ PRIVATE ${WS2_32_LIBRARY}
)
+##############################################################
+# Build ping server
+##############################################################
build_executable(hicn-ping-server
SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/src/ping_server.cc
LINK_LIBRARIES ${PING_LIBRARIES}
- INCLUDE_DIRS ${LIBTRANSPORT_INCLUDE_DIRS}
- DEPENDS ${DEPENDENCIES}
+ INCLUDE_DIRS
+ PRIVATE ${THIRD_PARTY_INCLUDE_DIRS} ${COMMON_INCLUDE_DIRS}
+ DEPENDS ${DEPENDENCIES} ${THIRD_PARTY_DEPENDENCIES}
COMPONENT ${HICN_APPS}
LINK_FLAGS ${LINK_FLAGS}
+ COMPILE_OPTIONS ${COMPILER_OPTIONS}
)
+##############################################################
+# Build ping client
+##############################################################
build_executable(hicn-ping-client
SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/src/ping_client.cc
LINK_LIBRARIES ${PING_LIBRARIES}
- INCLUDE_DIRS ${LIBTRANSPORT_INCLUDE_DIRS}
- DEPENDS ${DEPENDENCIES}
+ INCLUDE_DIRS
+ PRIVATE ${THIRD_PARTY_INCLUDE_DIRS} ${COMMON_INCLUDE_DIRS}
+ DEPENDS ${DEPENDENCIES} ${THIRD_PARTY_DEPENDENCIES}
COMPONENT ${HICN_APPS}
LINK_FLAGS ${LINK_FLAGS}
+ COMPILE_OPTIONS ${COMPILER_OPTIONS}
)
endif () \ No newline at end of file
diff --git a/apps/ping/src/ping_client.cc b/apps/ping/src/ping_client.cc
index 24e0bf3ed..08938734b 100644
--- a/apps/ping/src/ping_client.cc
+++ b/apps/ping/src/ping_client.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021-2022 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -13,18 +13,22 @@
* limitations under the License.
*/
+#include <hicn/apps/utils/logger.h>
+#include <hicn/transport/auth/signer.h>
#include <hicn/transport/auth/verifier.h>
#include <hicn/transport/core/global_object_pool.h>
#include <hicn/transport/core/interest.h>
+#include <hicn/transport/interfaces/global_conf_interface.h>
#include <hicn/transport/interfaces/portal.h>
+#include <hicn/transport/utils/chrono_typedefs.h>
+#include <hicn/transport/utils/traffic_generator.h>
#include <asio/signal_set.hpp>
#include <asio/steady_timer.hpp>
#include <chrono>
#include <map>
-#define SYN_STATE 1
-#define ACK_STATE 2
+static constexpr uint32_t SYN_STATE = 1;
namespace transport {
@@ -32,147 +36,131 @@ namespace core {
namespace ping {
-typedef std::map<uint64_t, uint64_t> SendTimeMap;
-typedef auth::AsymmetricVerifier Verifier;
+using SendTimeMap = std::map<uint64_t, utils::SteadyTime::TimePoint>;
+using Verifier = auth::AsymmetricVerifier;
class Configuration {
public:
- uint64_t interestLifetime_;
- uint64_t pingInterval_;
- uint64_t maxPing_;
- uint64_t first_suffix_;
- std::string name_;
+ static constexpr char TRAFFIC_GENERATOR_RAND[] = "RANDOM";
+
+ uint32_t num_int_manifest_suffixes_ =
+ 0; // Number of suffixes in interest manifest (suffix in the header
+ // is not included in the count)
+ uint64_t interestLifetime_ = 500; // ms
+ uint64_t pingInterval_ = 1000000; // us
+ uint32_t maxPing_ = 10; // number of interests
+ uint32_t first_suffix_ = 0;
+ std::string name_ = "b001::1";
std::string certificate_;
- uint16_t srcPort_;
- uint16_t dstPort_;
- bool verbose_;
- bool dump_;
- bool jump_;
- bool open_;
- bool always_syn_;
- bool always_ack_;
- bool quiet_;
- uint32_t jump_freq_;
- uint32_t jump_size_;
- uint8_t ttl_;
-
- Configuration() {
- interestLifetime_ = 500; // ms
- pingInterval_ = 1000000; // us
- maxPing_ = 10; // number of interests
- first_suffix_ = 0;
- name_ = "b001::1"; // string
- srcPort_ = 9695;
- dstPort_ = 8080;
- verbose_ = false;
- dump_ = false;
- jump_ = false;
- open_ = false;
- always_syn_ = false;
- always_ack_ = false;
- quiet_ = false;
- jump_freq_ = 0;
- jump_size_ = 0;
- ttl_ = 64;
- }
+ std::string passphrase_;
+ std::string traffic_generator_type_;
+ bool jump_ = false;
+ uint32_t jump_freq_ = 0;
+ uint32_t jump_size_ = 0;
+ hicn_packet_format_t packet_format_ = HICN_PACKET_FORMAT_DEFAULT;
+
+ Configuration() = default;
};
-class Client : interface::Portal::ConsumerCallback {
+class Client : public interface::Portal::TransportCallback {
public:
- Client(Configuration *c)
- : portal_(), signals_(portal_.getIoService(), SIGINT) {
+ explicit Client(Configuration *c)
+ : signals_(io_service_, SIGINT),
+ config_(c),
+ timer_(std::make_unique<asio::steady_timer>(
+ portal_.getThread().getIoService())) {
// Let the main thread to catch SIGINT
- portal_.connect();
- portal_.setConsumerCallback(this);
-
signals_.async_wait(std::bind(&Client::afterSignal, this));
- timer_.reset(new asio::steady_timer(portal_.getIoService()));
- config_ = c;
- sequence_number_ = config_->first_suffix_;
- last_jump_ = 0;
- processed_ = 0;
- state_ = SYN_STATE;
- sent_ = 0;
- received_ = 0;
- timedout_ = 0;
if (!c->certificate_.empty()) {
verifier_.useCertificate(c->certificate_);
}
+
+ // If interst manifest, sign it
+ if (c->num_int_manifest_suffixes_ != 0) {
+ assert(!c->passphrase_.empty());
+ signer_ = std::make_unique<auth::SymmetricSigner>(
+ auth::CryptoSuite::HMAC_SHA256, c->passphrase_);
+ }
+
+ if (c->traffic_generator_type_ ==
+ std::string(Configuration::TRAFFIC_GENERATOR_RAND)) {
+ traffic_generator_ =
+ std::make_unique<RandomTrafficGenerator>(config_->maxPing_);
+ } else {
+ traffic_generator_ = std::make_unique<IncrSuffixTrafficGenerator>(
+ config_->name_, config_->first_suffix_, config_->maxPing_);
+ }
}
- virtual ~Client() {}
+ virtual ~Client() = default;
void ping() {
- std::cout << "start ping" << std::endl;
- doPing();
- portal_.runEventsLoop();
+ LoggerInfo() << "Starting ping...";
+
+ portal_.getThread().add([this]() {
+ portal_.connect();
+ portal_.registerTransportCallback(this);
+ doPing();
+ });
+
+ io_service_.run();
+ }
+
+ void onInterest(Interest &interest) override {
+ LoggerInfo() << "Unexpected interest received.";
}
void onContentObject(Interest &interest, ContentObject &object) override {
uint64_t rtt = 0;
if (!config_->certificate_.empty()) {
- auto t0 = std::chrono::steady_clock::now();
+ auto t0 = utils::SteadyTime::now();
if (verifier_.verifyPacket(&object)) {
- auto t1 = std::chrono::steady_clock::now();
- auto dt =
- std::chrono::duration_cast<std::chrono::microseconds>(t1 - t0);
- std::cout << "Verification time: " << dt.count() << std::endl;
- std::cout << "<<< Signature Ok." << std::endl;
+ auto t1 = utils::SteadyTime::now();
+ auto dt = utils::SteadyTime::getDurationUs(t0, t1);
+ LoggerInfo() << "Verification time: " << dt.count();
+ LoggerInfo() << "<<< Signature Ok.";
} else {
- std::cout << "<<< Signature verification failed!" << std::endl;
+ LoggerErr() << "<<< Signature verification failed!";
}
}
- auto it = send_timestamps_.find(interest.getName().getSuffix());
- if (it != send_timestamps_.end()) {
- rtt = std::chrono::duration_cast<std::chrono::microseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count() -
- it->second;
+ if (auto it = send_timestamps_.find(interest.getName().getSuffix());
+ it != send_timestamps_.end()) {
+ rtt =
+ utils::SteadyTime::getDurationUs(it->second, utils::SteadyTime::now())
+ .count();
send_timestamps_.erase(it);
}
- if (config_->verbose_) {
- std::cout << "<<< recevied object. " << std::endl;
- std::cout << "<<< interest name: " << interest.getName()
- << " src port: " << interest.getSrcPort()
- << " dst port: " << interest.getDstPort()
- << " flags: " << interest.printFlags() << std::endl;
- std::cout << "<<< object name: " << object.getName()
- << " src port: " << object.getSrcPort()
- << " dst port: " << object.getDstPort()
- << " flags: " << object.printFlags() << " path label "
- << object.getPathLabel() << " ("
- << (object.getPathLabel() >> 24) << ")"
- << " TTL: " << (int)object.getTTL() << std::endl;
- } else if (!config_->quiet_) {
- std::cout << "<<< received object. " << std::endl;
- std::cout << "<<< round trip: " << rtt << " [us]" << std::endl;
- std::cout << "<<< interest name: " << interest.getName() << std::endl;
- std::cout << "<<< object name: " << object.getName() << std::endl;
- std::cout << "<<< content object size: "
- << object.payloadSize() + object.headerSize() << " [bytes]"
- << std::endl;
+ if (LoggerIsOn(2)) {
+ LoggerInfo() << "<<< recevied object. ";
+ LoggerInfo() << "<<< interest name: " << interest.getName().getPrefix()
+ << " (n_suffixes=" << config_->num_int_manifest_suffixes_
+ << ")";
+ LoggerInfo() << "<<< object name: " << object.getName() << " path label "
+ << object.getPathLabel() << " ("
+ << (object.getPathLabel() >> 24) << ")";
+ } else if (LoggerIsOn(1)) {
+ LoggerInfo() << "<<< received object. ";
+ LoggerInfo() << "<<< round trip: " << rtt << " [us]";
+ LoggerInfo() << "<<< interest name: " << interest.getName().getPrefix();
+
+ LoggerInfo() << "<<< object name: " << object.getName();
+ LoggerInfo() << "<<< content object size: "
+ << object.payloadSize() + object.headerSize() << " [bytes]";
}
- if (config_->dump_) {
- std::cout << "----- interest dump -----" << std::endl;
+ if (LoggerIsOn(3)) {
+ LoggerInfo() << "----- interest dump -----";
interest.dump();
- std::cout << "-------------------------" << std::endl;
- std::cout << "----- object dump -------" << std::endl;
+ LoggerInfo() << "-------------------------";
+ LoggerInfo() << "----- object dump -------";
object.dump();
- std::cout << "-------------------------" << std::endl;
- }
-
- if (!config_->quiet_) std::cout << std::endl;
-
- if (!config_->always_syn_) {
- if (object.testSyn() && object.testAck() && state_ == SYN_STATE) {
- state_ = ACK_STATE;
- }
+ LoggerInfo() << "-------------------------";
}
+ LoggerVerbose(1) << "\n";
received_++;
processed_++;
@@ -182,178 +170,221 @@ class Client : interface::Portal::ConsumerCallback {
}
void onTimeout(Interest::Ptr &interest, const Name &name) override {
- if (config_->verbose_) {
- std::cout << "### timeout for " << name
- << " src port: " << interest->getSrcPort()
- << " dst port: " << interest->getDstPort()
- << " flags: " << interest->printFlags() << std::endl;
- } else if (!config_->quiet_) {
- std::cout << "### timeout for " << name << std::endl;
+ if (LoggerIsOn(2)) {
+ LoggerInfo() << "### timeout for " << name;
+ } else if (LoggerIsOn(1)) {
+ LoggerInfo() << "### timeout for " << name;
}
- if (config_->dump_) {
- std::cout << "----- interest dump -----" << std::endl;
+ if (LoggerIsOn(3)) {
+ LoggerInfo() << "----- interest dump -----";
interest->dump();
- std::cout << "-------------------------" << std::endl;
+ LoggerInfo() << "-------------------------";
}
-
- if (!config_->quiet_) std::cout << std::endl;
+ LoggerVerbose(1) << "\n";
timedout_++;
processed_++;
- if (processed_ >= config_->maxPing_) {
- afterSignal();
- }
+ if (processed_ >= config_->maxPing_) afterSignal();
}
- void onError(std::error_code ec) override {}
+ void onError(const std::error_code &ec) override {
+ LoggerErr() << "Aborting ping due to internal error: " << ec.message();
+ afterSignal();
+ }
+
+ void checkFamily(hicn_packet_format_t format, int family) {
+ switch (HICN_PACKET_FORMAT_GET(format, 0)) {
+ case IPPROTO_IP:
+ if (family != AF_INET) throw std::runtime_error("Bad packet format");
+ break;
+ case IPPROTO_IPV6:
+ if (family != AF_INET6) throw std::runtime_error("Bad packet format");
+ break;
+ default:
+ throw std::runtime_error("Bad packet format");
+ }
+ }
void doPing() {
- const Name interest_name(config_->name_, (uint32_t)sequence_number_);
- hicn_format_t format;
- if (interest_name.getAddressFamily() == AF_INET) {
- format = HF_INET_TCP;
+ std::string name = traffic_generator_->getPrefix();
+ uint32_t sequence_number = traffic_generator_->getSuffix();
+ const Name interest_name(name, sequence_number);
+
+ hicn_packet_format_t format = config_->packet_format_;
+
+ switch (format) {
+ case HICN_PACKET_FORMAT_NEW:
+ /* Nothing to do */
+ break;
+ case HICN_PACKET_FORMAT_IPV4_TCP:
+ case HICN_PACKET_FORMAT_IPV6_TCP:
+ checkFamily(format, interest_name.getAddressFamily());
+ break;
+ default:
+ throw std::runtime_error("Bad packet format");
+ }
+
+ /*
+ * Eventually add the AH header if a signer is defined. Raise an error
+ * if format include the AH header but no signer is defined.
+ */
+ if (HICN_PACKET_FORMAT_IS_AH(format)) {
+ if (!signer_) throw std::runtime_error("Bad packet format");
} else {
- format = HF_INET6_TCP;
+ if (signer_) format = Packet::toAHFormat(format);
}
- auto interest = std::make_shared<Interest>(interest_name, format);
+ auto interest = core::PacketManager<>::getInstance().getPacket<Interest>(
+ format, signer_ ? signer_->getSignatureFieldSize() : 0);
+ interest->setName(interest_name);
interest->setLifetime(uint32_t(config_->interestLifetime_));
- interest->resetFlags();
- if (config_->open_ || config_->always_syn_) {
- if (state_ == SYN_STATE) {
- interest->setSyn();
- } else if (state_ == ACK_STATE) {
- interest->setAck();
- }
- } else if (config_->always_ack_) {
- interest->setAck();
+ if (LoggerIsOn(2)) {
+ LoggerInfo() << ">>> send interest " << interest->getName()
+ << " suffixes in manifest: "
+ << config_->num_int_manifest_suffixes_;
+ } else if (LoggerIsOn(1)) {
+ LoggerInfo() << ">>> send interest " << interest->getName();
}
+ LoggerVerbose(1) << "\n";
+
+ send_timestamps_[sequence_number] = utils::SteadyTime::now();
+ for (uint32_t i = 0; i < config_->num_int_manifest_suffixes_ &&
+ !traffic_generator_->hasFinished();
+ i++) {
+ uint32_t sequence_number = traffic_generator_->getSuffix();
- interest->setSrcPort(config_->srcPort_);
- interest->setDstPort(config_->dstPort_);
- interest->setTTL(config_->ttl_);
-
- if (config_->verbose_) {
- std::cout << ">>> send interest " << interest->getName()
- << " src port: " << interest->getSrcPort()
- << " dst port: " << interest->getDstPort()
- << " flags: " << interest->printFlags()
- << " TTL: " << (int)interest->getTTL() << std::endl;
- } else if (!config_->quiet_) {
- std::cout << ">>> send interest " << interest->getName() << std::endl;
+ interest->appendSuffix(sequence_number);
+ send_timestamps_[sequence_number] = utils::SteadyTime::now();
}
- if (config_->dump_) {
- std::cout << "----- interest dump -----" << std::endl;
+ if (LoggerIsOn(3)) {
+ LoggerInfo() << "----- interest dump -----";
interest->dump();
- std::cout << "-------------------------" << std::endl;
+ LoggerInfo() << "-------------------------";
}
- if (!config_->quiet_) std::cout << std::endl;
+ interest->encodeSuffixes();
+ if (signer_) signer_->signPacket(interest.get());
+ portal_.sendInterest(interest, interest->getLifetime());
- send_timestamps_[sequence_number_] =
- std::chrono::duration_cast<std::chrono::microseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
-
- portal_.sendInterest(std::move(interest));
-
- sequence_number_++;
- sent_++;
-
- if (sent_ < config_->maxPing_) {
+ if (!traffic_generator_->hasFinished()) {
this->timer_->expires_from_now(
std::chrono::microseconds(config_->pingInterval_));
- this->timer_->async_wait([this](const std::error_code e) { doPing(); });
+ this->timer_->async_wait([this](const std::error_code e) {
+ if (!e) {
+ doPing();
+ }
+ });
}
}
void afterSignal() {
- std::cout << "Stop ping" << std::endl;
- std::cout << "Sent: " << sent_ << " Received: " << received_
- << " Timeouts: " << timedout_ << std::endl;
- portal_.stopEventsLoop();
+ LoggerInfo() << "Stopping ping...";
+ LoggerInfo() << "Sent: " << traffic_generator_->getSentCount()
+ << " Received: " << received_ << " Timeouts: " << timedout_;
+ io_service_.stop();
}
void reset() {
- timer_.reset(new asio::steady_timer(portal_.getIoService()));
- sequence_number_ = config_->first_suffix_;
+ timer_.reset(new asio::steady_timer(portal_.getThread().getIoService()));
+ traffic_generator_->reset();
last_jump_ = 0;
processed_ = 0;
state_ = SYN_STATE;
- sent_ = 0;
received_ = 0;
timedout_ = 0;
}
private:
SendTimeMap send_timestamps_;
+ asio::io_service io_service_;
interface::Portal portal_;
asio::signal_set signals_;
- uint64_t sequence_number_;
- uint64_t last_jump_;
- uint64_t processed_;
- uint32_t state_;
- uint32_t sent_;
- uint32_t received_;
- uint32_t timedout_;
- std::unique_ptr<asio::steady_timer> timer_;
Configuration *config_;
+ std::unique_ptr<asio::steady_timer> timer_;
+ uint64_t last_jump_ = 0;
+ uint64_t processed_ = 0;
+ uint32_t state_ = SYN_STATE;
+ uint32_t received_ = 0;
+ uint32_t timedout_ = 0;
Verifier verifier_;
+ std::unique_ptr<auth::Signer> signer_;
+ std::unique_ptr<TrafficGenerator> traffic_generator_;
};
+static std::unordered_map<std::string, hicn_packet_format_t> const
+ packet_format_map = {{"ipv4_tcp", HICN_PACKET_FORMAT_IPV4_TCP},
+ {"ipv6_tcp", HICN_PACKET_FORMAT_IPV6_TCP},
+ {"new", HICN_PACKET_FORMAT_NEW}};
+
+std::string str_tolower(std::string s) {
+ std::transform(s.begin(), s.end(), s.begin(),
+ [](unsigned char c) { return std::tolower(c); });
+ return s;
+}
+
void help() {
- std::cout << "usage: hicn-consumer-ping [options]" << std::endl;
- std::cout << "PING options" << std::endl;
- std::cout
- << "-i <val> ping interval in microseconds (default 1000000ms)"
- << std::endl;
- std::cout << "-m <val> maximum number of pings to send (default 10)"
- << std::endl;
- std::cout << "-s <val> sorce port (default 9695)" << std::endl;
- std::cout << "-d <val> destination port (default 8080)" << std::endl;
- std::cout << "-t <val> set packet ttl (default 64)" << std::endl;
- std::cout << "-O open tcp connection (three way handshake) "
- "(default false)"
- << std::endl;
- std::cout << "-S send always syn messages (default false)"
- << std::endl;
- std::cout << "-A send always ack messages (default false)"
- << std::endl;
- std::cout << "HICN options" << std::endl;
- std::cout << "-n <val> hicn name (default b001::1)" << std::endl;
- std::cout
- << "-l <val> interest lifetime in milliseconds (default 500ms)"
- << std::endl;
- std::cout << "OUTPUT options" << std::endl;
- std::cout << "-V verbose, prints statistics about the "
- "messagges sent and received (default false)"
- << std::endl;
- std::cout << "-D dump, dumps sent and received packets "
- "(default false)"
- << std::endl;
- std::cout << "-q quiet, not prints (default false)"
- << std::endl;
- std::cout << "-H prints this message" << std::endl;
+ LoggerInfo() << "usage: hicn-consumer-ping [options]";
+ LoggerInfo() << "PING options";
+ LoggerInfo() << "-i <val> ping interval in microseconds (default "
+ "1000000ms)";
+ LoggerInfo()
+ << "-m <val> maximum number of pings to send (default 10)";
+ LoggerInfo() << "-a <val> <pass> set the passphrase and the number of "
+ "suffixes in interest manifest (default 0);";
+ LoggerInfo()
+ << " e.g. '-m 6 -a -2' sends two interest (0 and "
+ "3) with 2 suffixes each (1,2 and 4,5 respectively)";
+ LoggerInfo() << "HICN options";
+ LoggerInfo() << "-n <val> hicn name (default b001::1)";
+ LoggerInfo()
+ << "-l <val> interest lifetime in milliseconds (default "
+ "500ms)";
+ LoggerInfo() << "OUTPUT options";
+ LoggerInfo() << "-V verbose, prints statistics about the "
+ "messagges sent and received (default false)";
+ LoggerInfo() << "-D dump, dumps sent and received packets "
+ "(default false)";
+ LoggerInfo() << "-q quiet, not prints (default false)";
+ LoggerInfo()
+ << "-z <io_module> IO module to use. Default: hicnlight_module";
+ LoggerInfo() << "-F <conf_file> Path to optional configuration file for "
+ "libtransport";
+ LoggerInfo() << "-b <type> Traffic generator type. Use 'RANDOM' for "
+ "random prefixes and suffixes. Default: sequential suffixes.";
+ LoggerInfo()
+ << "-w <packet_format> Packet format (without signature, defaults "
+ "to IPV6_TCP)";
+ LoggerInfo() << "-H prints this message";
}
-int main(int argc, char *argv[]) {
+int start(int argc, char *argv[]) {
#ifdef _WIN32
WSADATA wsaData = {0};
WSAStartup(MAKEWORD(2, 2), &wsaData);
#endif
- Configuration *c = new Configuration();
+ transport::interface::global_config::GlobalConfigInterface global_conf;
+
+ auto c = std::make_unique<Configuration>();
int opt;
std::string producer_certificate = "";
- while ((opt = getopt(argc, argv, "j::t:i:m:s:d:n:l:f:c:SAOqVDH")) != -1) {
+ std::string conf_file;
+ transport::interface::global_config::IoModuleConfiguration io_config;
+ io_config.name = "hicnlight_module";
+
+ while ((opt = getopt(argc, argv, "a:b:i:m:f:n:l:c:z:F:w:H")) != -1) {
switch (opt) {
- case 't':
- c->ttl_ = (uint8_t)std::stoi(optarg);
+ case 'a':
+ c->num_int_manifest_suffixes_ = std::stoi(optarg);
+ c->passphrase_ = argv[optind];
+ break;
+ case 'b':
+ c->traffic_generator_type_ = optarg;
break;
case 'i':
c->pingInterval_ = std::stoi(optarg);
@@ -362,13 +393,7 @@ int main(int argc, char *argv[]) {
c->maxPing_ = std::stoi(optarg);
break;
case 'f':
- c->first_suffix_ = std::stoul(optarg);
- break;
- case 's':
- c->srcPort_ = std::stoi(optarg);
- break;
- case 'd':
- c->dstPort_ = std::stoi(optarg);
+ c->first_suffix_ = uint32_t(std::stoul(optarg));
break;
case 'n':
c->name_ = optarg;
@@ -376,53 +401,48 @@ int main(int argc, char *argv[]) {
case 'l':
c->interestLifetime_ = std::stoi(optarg);
break;
- case 'V':
- c->verbose_ = true;
- ;
- break;
- case 'D':
- c->dump_ = true;
- break;
- case 'O':
- c->always_syn_ = false;
- c->always_ack_ = false;
- c->open_ = true;
- break;
- case 'S':
- c->always_syn_ = true;
- c->always_ack_ = false;
- c->open_ = false;
+ case 'c':
+ c->certificate_ = std::string(optarg);
break;
- case 'A':
- c->always_syn_ = false;
- c->always_ack_ = true;
- c->open_ = false;
+ case 'z':
+ io_config.name = optarg;
break;
- case 'q':
- c->quiet_ = true;
- c->verbose_ = false;
- c->dump_ = false;
+ case 'F':
+ conf_file = optarg;
break;
- case 'c':
- c->certificate_ = std::string(optarg);
+ case 'w': {
+ std::string packet_format_s = std::string(optarg);
+ packet_format_s = str_tolower(packet_format_s);
+ auto it = packet_format_map.find(std::string(optarg));
+ if (it == packet_format_map.end())
+ throw std::runtime_error("Bad packet format");
+ c->packet_format_ = it->second;
break;
- case 'H':
+ }
default:
help();
exit(EXIT_FAILURE);
}
}
- auto ping = std::make_unique<Client>(c);
+ /**
+ * IO module configuration
+ */
+ io_config.set();
+
+ /**
+ * Parse config file
+ */
+ global_conf.parseConfigurationFile(conf_file);
+
+ auto ping = std::make_unique<Client>(c.get());
auto t0 = std::chrono::steady_clock::now();
ping->ping();
auto t1 = std::chrono::steady_clock::now();
- std::cout
- << "Elapsed time: "
- << std::chrono::duration_cast<std::chrono::microseconds>(t1 - t0).count()
- << std::endl;
+ LoggerInfo() << "Elapsed time: "
+ << utils::SteadyTime::getDurationMs(t0, t1).count() << "ms";
#ifdef _WIN32
WSACleanup();
@@ -437,5 +457,5 @@ int main(int argc, char *argv[]) {
} // namespace transport
int main(int argc, char *argv[]) {
- return transport::core::ping::main(argc, argv);
+ return transport::core::ping::start(argc, argv);
}
diff --git a/apps/ping/src/ping_server.cc b/apps/ping/src/ping_server.cc
index baf9c6698..900da18ca 100644
--- a/apps/ping/src/ping_server.cc
+++ b/apps/ping/src/ping_server.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021-2022 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -21,10 +21,13 @@
#include <openssl/applink.c>
#endif
-#include <hicn/transport/auth/identity.h>
+#include <hicn/apps/utils/logger.h>
#include <hicn/transport/auth/signer.h>
+#include <hicn/transport/auth/verifier.h>
#include <hicn/transport/core/content_object.h>
+#include <hicn/transport/core/global_object_pool.h>
#include <hicn/transport/core/interest.h>
+#include <hicn/transport/interfaces/global_conf_interface.h>
#include <hicn/transport/utils/string_tokenizer.h>
#include <asio.hpp>
@@ -36,170 +39,128 @@ namespace interface {
using HashAlgorithm = core::HashAlgorithm;
using CryptoSuite = auth::CryptoSuite;
-auth::Identity setProducerIdentity(std::string keystore_name,
- std::string keystore_password,
- auth::CryptoHashType hash_algorithm) {
- if (access(keystore_name.c_str(), F_OK) != -1) {
- return auth::Identity(keystore_name, keystore_password, hash_algorithm);
- } else {
- return auth::Identity(keystore_name, keystore_password,
- CryptoSuite::RSA_SHA256, 1024, 365, "producer-test");
- }
-}
-
class CallbackContainer {
- const std::size_t log2_content_object_buffer_size = 12;
-
- public:
- CallbackContainer(const Name &prefix, uint32_t object_size, bool verbose,
- bool dump, bool quite, bool flags, bool reset, uint8_t ttl,
- auth::Identity *identity, bool sign, uint32_t lifetime)
- : buffer_(object_size, 'X'),
- content_objects_((std::uint32_t)(1 << log2_content_object_buffer_size)),
- mask_((std::uint16_t)(1 << log2_content_object_buffer_size) - 1),
- content_objects_index_(0),
- verbose_(verbose),
- dump_(dump),
- quite_(quite),
- flags_(flags),
- reset_(reset),
- ttl_(ttl),
- identity_(identity),
- sign_(sign) {
- core::Packet::Format format;
-
- if (prefix.getAddressFamily() == AF_INET) {
- format = core::Packet::Format::HF_INET_TCP;
- if (sign_) {
- format = core::Packet::Format::HF_INET_TCP_AH;
- }
- } else {
- format = core::Packet::Format::HF_INET6_TCP;
- if (sign_) {
- format = core::Packet::Format::HF_INET6_TCP_AH;
- }
+ private:
+ std::shared_ptr<ContentObject> createContentObject(const Name &name,
+ uint32_t lifetime,
+ const Interest &interest) {
+ auto content_object =
+ core::PacketManager<>::getInstance().getPacket<ContentObject>(
+ interest.getFormat(),
+ (sign_ && signer_) ? signer_->getSignatureFieldSize() : 0);
+
+ content_object->setName(name);
+ content_object->setLifetime(lifetime);
+ content_object->setLocator(interest.getLocator());
+
+ if (LoggerIsOn(2)) {
+ LoggerInfo() << ">>> send object " << content_object->getName();
+ } else if (LoggerIsOn(1)) {
+ LoggerInfo() << ">>> send object " << content_object->getName();
}
- for (int i = 0; i < (1 << log2_content_object_buffer_size); i++) {
- content_objects_[i] = std::make_shared<ContentObject>(
- prefix, format, 0, (const uint8_t *)buffer_.data(), buffer_.size());
- content_objects_[i]->setLifetime(lifetime);
+ if (LoggerIsOn(3)) {
+ LoggerInfo() << "----- object dump -----";
+ content_object->dump();
+ LoggerInfo() << "-----------------------";
}
+
+ if (sign_ && signer_) signer_->signPacket(content_object.get());
+ return content_object;
}
- void processInterest(ProducerSocket &p, const Interest &interest,
+ public:
+ CallbackContainer([[maybe_unused]] const Name &prefix, uint32_t object_size,
+ auth::Signer *signer, bool sign, std::string passphrase,
+ [[maybe_unused]] uint32_t lifetime)
+ : buffer_(object_size, 'X'), signer_(signer), sign_(sign) {
+ // Verifier for interest manifests
+ if (!passphrase.empty())
+ verifier_ = std::make_unique<auth::SymmetricVerifier>(passphrase);
+ }
+
+ void processInterest(ProducerSocket &p, Interest &interest,
uint32_t lifetime) {
- if (verbose_) {
- std::cout << "<<< received interest " << interest.getName()
- << " src port: " << interest.getSrcPort()
- << " dst port: " << interest.getDstPort()
- << " flags: " << interest.printFlags()
- << "TTL: " << (int)interest.getTTL() << std::endl;
- } else if (!quite_) {
- std::cout << "<<< received interest " << interest.getName() << std::endl;
+ if (verifier_ && interest.hasManifest()) {
+ auto t0 = utils::SteadyTime::now();
+ if (verifier_->verifyPacket(&interest)) {
+ auto t1 = utils::SteadyTime::now();
+ auto dt = utils::SteadyTime::getDurationUs(t0, t1);
+ LoggerInfo() << "Verification time: " << dt.count();
+ LoggerInfo() << "<<< Signature Ok.";
+ } else {
+ LoggerErr() << "<<< Signature verification failed!";
+ }
}
- if (dump_) {
- std::cout << "----- interest dump -----" << std::endl;
- interest.dump();
- std::cout << "-------------------------" << std::endl;
+ if (LoggerIsOn(2)) {
+ LoggerInfo() << "<<< received interest " << interest.getName()
+ << " suffixes in manifest: " << interest.numberOfSuffixes();
+ } else if (LoggerIsOn(1)) {
+ LoggerInfo() << "<<< received interest " << interest.getName();
}
- if (interest.testRst()) {
- std::cout << "!!!got a reset, I don't reply" << std::endl;
- } else {
- auto &content_object = content_objects_[content_objects_index_++ & mask_];
-
- content_object->setName(interest.getName());
- content_object->setLifetime(lifetime);
- content_object->setLocator(interest.getLocator());
- content_object->setSrcPort(interest.getDstPort());
- content_object->setDstPort(interest.getSrcPort());
- content_object->setTTL(ttl_);
-
- if (!sign_) {
- content_object->resetFlags();
- }
-
- if (flags_) {
- if (interest.testSyn()) {
- content_object->setSyn();
- content_object->setAck();
- } else if (interest.testAck()) {
- content_object->setAck();
- } // here I may need to handle the FIN flag;
- } else if (reset_) {
- content_object->setRst();
- }
+ if (LoggerIsOn(3)) {
+ LoggerInfo() << "----- interest dump -----";
+ interest.dump();
+ LoggerInfo() << "-------------------------";
+ }
- if (verbose_) {
- std::cout << ">>> send object " << content_object->getName()
- << " src port: " << content_object->getSrcPort()
- << " dst port: " << content_object->getDstPort()
- << " flags: " << content_object->printFlags()
- << " TTL: " << (int)content_object->getTTL() << std::endl;
- } else if (!quite_) {
- std::cout << ">>> send object " << content_object->getName()
- << std::endl;
- }
+ if (!interest.isValid()) throw std::runtime_error("Bad interest format");
+ Name name = interest.getName();
- if (dump_) {
- std::cout << "----- object dump -----" << std::endl;
- content_object->dump();
- std::cout << "-----------------------" << std::endl;
- }
+ if (!interest.hasManifest()) { // Single interest
+ auto content_object = createContentObject(name, lifetime, interest);
+ p.produce(*content_object);
+ } else { // Interest manifest
+ uint32_t _;
+ const uint32_t *suffix = NULL;
+ UNUSED(_);
- if (!quite_) std::cout << std::endl;
+ interest_manifest_foreach_suffix(interest.getIntManifestHeader(), suffix,
+ _) {
+ name.setSuffix(*suffix);
- if (sign_) {
- identity_->getSigner()->signPacket(content_object.get());
+ auto content_object = createContentObject(name, lifetime, interest);
+ p.produce(*content_object);
}
-
- p.produce(*content_object);
}
+
+ LoggerVerbose(1) << "\n";
}
private:
std::string buffer_;
- std::vector<std::shared_ptr<ContentObject>> content_objects_;
- std::uint16_t mask_;
- std::uint16_t content_objects_index_;
- bool verbose_;
- bool dump_;
- bool quite_;
- bool flags_;
- bool reset_;
- uint8_t ttl_;
- auth::Identity *identity_;
+ auth::Signer *signer_;
bool sign_;
+ std::unique_ptr<auth::Verifier> verifier_;
};
void help() {
- std::cout << "usage: hicn-preoducer-ping [options]" << std::endl;
- std::cout << "PING options" << std::endl;
- std::cout << "-s <val> object content size (default 1350B)" << std::endl;
- std::cout << "-n <val> hicn name (default b001::/64)" << std::endl;
- std::cout << "-f set tcp flags according to the flag received "
- "(default false)"
- << std::endl;
- std::cout << "-l data lifetime" << std::endl;
- std::cout << "-r always reply with a reset flag (default false)"
- << std::endl;
- std::cout << "-t set ttl (default 64)" << std::endl;
- std::cout << "OUTPUT options" << std::endl;
- std::cout << "-V verbose, prints statistics about the messagges sent "
- "and received (default false)"
- << std::endl;
- std::cout << "-D dump, dumps sent and received packets (default false)"
- << std::endl;
- std::cout << "-q quite, not prints (default false)" << std::endl;
+ LoggerInfo() << "usage: hicn-preoducer-ping [options]";
+ LoggerInfo() << "PING options";
+ LoggerInfo() << "-s <val> object content size (default 1350B)";
+ LoggerInfo() << "-n <val> hicn name (default b001::/64)";
+ LoggerInfo() << "-l data lifetime";
+ LoggerInfo() << "OUTPUT options";
+ LoggerInfo() << "-V verbose, prints statistics about the "
+ "messagges sent "
+ " and received (default false)";
+ LoggerInfo() << "-D dump, dumps sent and received packets "
+ "(default false)";
+ LoggerInfo() << "-q quiet, not prints (default false)";
+ LoggerInfo()
+ << "-z <io_module> IO module to use. Default: hicnlight_module";
+ LoggerInfo() << "-F <conf_file> Path to optional configuration file for "
+ "libtransport";
#ifndef _WIN32
- std::cout << "-d daemon mode" << std::endl;
+ LoggerInfo() << "-d daemon mode";
#endif
- std::cout << "-H prints this message" << std::endl;
+ LoggerInfo() << "-H prints this message";
}
-int main(int argc, char **argv) {
+int ping_main(int argc, char **argv) {
+ transport::interface::global_config::GlobalConfigInterface global_conf;
#ifdef _WIN32
WSADATA wsaData = {0};
WSAStartup(MAKEWORD(2, 2), &wsaData);
@@ -208,59 +169,41 @@ int main(int argc, char **argv) {
#endif
std::string name_prefix = "b001::0/64";
std::string delimiter = "/";
- bool verbose = false;
- bool dump = false;
- bool quite = false;
- bool flags = false;
- bool reset = false;
uint32_t object_size = 1250;
- uint8_t ttl = 64;
std::string keystore_path = "./rsa_crypto_material.p12";
std::string keystore_password = "cisco";
+ std::string passphrase = "";
bool sign = false;
uint32_t data_lifetime = default_values::content_object_expiry_time;
+ std::string conf_file;
+ transport::interface::global_config::IoModuleConfiguration io_config;
+ io_config.name = "hicnlight_module";
+
int opt;
#ifndef _WIN32
- while ((opt = getopt(argc, argv, "s:n:t:l:qfrVDdHk:p:")) != -1) {
+ while ((opt = getopt(argc, argv, "a:s:n:t:l:frdHk:p:z:F:")) != -1) {
#else
- while ((opt = getopt(argc, argv, "s:n:t:l:qfrVDHk:p:")) != -1) {
+ while ((opt = getopt(argc, argv, "s:n:t:l:frHk:p:z:F:")) != -1) {
#endif
switch (opt) {
+ case 'a':
+ passphrase = optarg;
+ break;
case 's':
object_size = std::stoi(optarg);
break;
case 'n':
name_prefix = optarg;
break;
- case 't':
- ttl = (uint8_t)std::stoi(optarg);
- break;
case 'l':
data_lifetime = std::stoi(optarg);
break;
- case 'V':
- verbose = true;
- break;
- case 'D':
- dump = true;
- break;
- case 'q':
- verbose = false;
- dump = false;
- quite = true;
- break;
#ifndef _WIN32
case 'd':
daemon = true;
break;
#endif
- case 'f':
- flags = true;
- break;
- case 'r':
- reset = true;
- break;
case 'k':
keystore_path = optarg;
sign = true;
@@ -268,7 +211,12 @@ int main(int argc, char **argv) {
case 'p':
keystore_password = optarg;
break;
- case 'H':
+ case 'z':
+ io_config.name = optarg;
+ break;
+ case 'F':
+ conf_file = optarg;
+ break;
default:
help();
exit(EXIT_FAILURE);
@@ -281,6 +229,16 @@ int main(int argc, char **argv) {
}
#endif
+ /**
+ * IO module configuration
+ */
+ io_config.set();
+
+ /**
+ * Parse config file
+ */
+ global_conf.parseConfigurationFile(conf_file);
+
core::Prefix producer_namespace(name_prefix);
utils::StringTokenizer tokenizer(name_prefix, delimiter);
@@ -290,21 +248,23 @@ int main(int argc, char **argv) {
if (object_size > 1350) object_size = 1350;
CallbackContainer *stubs;
- auth::Identity identity = setProducerIdentity(
- keystore_path, keystore_password, auth::CryptoHashType::SHA256);
+ std::unique_ptr<auth::Signer> signer;
if (sign) {
- stubs = new CallbackContainer(n, object_size, verbose, dump, quite, flags,
- reset, ttl, &identity, sign, data_lifetime);
+ signer = std::make_unique<auth::AsymmetricSigner>(keystore_path,
+ keystore_password);
+ stubs = new CallbackContainer(n, object_size, signer.get(), sign,
+ passphrase, data_lifetime);
} else {
- auth::Identity *identity = nullptr;
- stubs = new CallbackContainer(n, object_size, verbose, dump, quite, flags,
- reset, ttl, identity, sign, data_lifetime);
+ auth::Signer *signer = nullptr;
+ stubs = new CallbackContainer(n, object_size, signer, sign, passphrase,
+ data_lifetime);
}
ProducerSocket p;
p.registerPrefix(producer_namespace);
+ p.setSocketOption(GeneralTransportOptions::MANIFEST_MAX_CAPACITY, 0U);
p.setSocketOption(GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 0U);
p.setSocketOption(
ProducerCallbacksOptions::CACHE_MISS,
@@ -313,12 +273,13 @@ int main(int argc, char **argv) {
std::placeholders::_2, data_lifetime));
p.connect();
+ p.start();
asio::io_service io_service;
asio::signal_set signal_set(io_service, SIGINT);
signal_set.async_wait(
[&p, &io_service](const std::error_code &, const int &) {
- std::cout << "STOPPING!!" << std::endl;
+ LoggerInfo() << "STOPPING!!";
p.stop();
io_service.stop();
});
@@ -336,5 +297,5 @@ int main(int argc, char **argv) {
} // end namespace transport
int main(int argc, char **argv) {
- return transport::interface::main(argc, argv);
+ return transport::interface::ping_main(argc, argv);
}