aboutsummaryrefslogtreecommitdiffstats
path: root/apps
diff options
context:
space:
mode:
authorEnrico Loparco (eloparco) <eloparco@cisco.com>2021-06-24 09:15:41 +0200
committerEnrico Loparco (eloparco) <eloparco@cisco.com>2021-06-24 09:15:41 +0200
commit229385955109b866a23c4ac2aa03d4d11044c39d (patch)
tree0591f9c2fc4144d62330337cc2b94c63dfeded54 /apps
parent6ffbb5ed61733b8dbef39b1a9d437e899e9359d7 (diff)
[HICN-708] Rebase with master
Signed-off-by: Enrico Loparco (eloparco) <eloparco@cisco.com> Change-Id: I2122e1d61dd3b2e039972624ffbdbcb3c5610159
Diffstat (limited to 'apps')
-rw-r--r--apps/.clang-format14
-rw-r--r--apps/CMakeLists.txt25
-rw-r--r--apps/cmake/Modules/Packaging.cmake10
-rw-r--r--apps/higet/CMakeLists.txt4
-rw-r--r--apps/higet/higet.cc43
-rw-r--r--apps/http-proxy/CMakeLists.txt41
-rw-r--r--apps/http-proxy/includes/hicn/http-proxy/CMakeLists.txt39
-rw-r--r--apps/http-proxy/includes/hicn/http-proxy/forwarder_config.h200
-rw-r--r--apps/http-proxy/includes/hicn/http-proxy/forwarder_interface.h107
-rw-r--r--apps/http-proxy/includes/hicn/http-proxy/http_1x_message_fast_parser.h (renamed from apps/http-proxy/src/HTTP1.xMessageFastParser.h)33
-rw-r--r--apps/http-proxy/includes/hicn/http-proxy/http_proxy.h210
-rw-r--r--apps/http-proxy/includes/hicn/http-proxy/http_session.h (renamed from apps/http-proxy/src/ATSConnector.h)49
-rw-r--r--apps/http-proxy/includes/hicn/http-proxy/icn_receiver.h (renamed from apps/http-proxy/src/IcnReceiver.h)41
-rw-r--r--apps/http-proxy/includes/hicn/http-proxy/utils.h51
-rw-r--r--apps/http-proxy/main.cc135
-rw-r--r--apps/http-proxy/src/forwarder_interface.cc263
-rw-r--r--apps/http-proxy/src/http_1x_message_fast_parser.cc (renamed from apps/http-proxy/src/HTTP1.xMessageFastParser.cc)47
-rw-r--r--apps/http-proxy/src/http_proxy.cc376
-rw-r--r--apps/http-proxy/src/http_session.cc (renamed from apps/http-proxy/src/ATSConnector.cc)166
-rw-r--r--apps/http-proxy/src/icn_receiver.cc (renamed from apps/http-proxy/src/IcnReceiver.cc)103
20 files changed, 1723 insertions, 234 deletions
diff --git a/apps/.clang-format b/apps/.clang-format
new file mode 100644
index 000000000..cd21e2017
--- /dev/null
+++ b/apps/.clang-format
@@ -0,0 +1,14 @@
+# Copyright (c) 2017-2021 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+BasedOnStyle: Google
diff --git a/apps/CMakeLists.txt b/apps/CMakeLists.txt
index f5075a7aa..37e44f9e7 100644
--- a/apps/CMakeLists.txt
+++ b/apps/CMakeLists.txt
@@ -29,15 +29,19 @@ set(HICN_APPS hicn-apps CACHE INTERNAL "" FORCE)
if(CMAKE_SOURCE_DIR STREQUAL PROJECT_SOURCE_DIR)
find_package(Libtransport REQUIRED)
+ find_package(hicnctrl REQUIRED)
find_package(Threads REQUIRED)
- include_directories(${LIBTRANSPORT_INCLUDE_DIRS})
else()
- if (${CMAKE_SYSTEM_NAME} STREQUAL "Android")
+ if (DISABLE_SHARED_LIBRARIES)
find_package(OpenSSL REQUIRED)
- find_package(ZLIB REQUIRED)
+ if (NOT WIN32)
+ find_package(ZLIB REQUIRED)
+ endif ()
set(LIBTRANSPORT_LIBRARIES ${LIBTRANSPORT_STATIC})
+ set(LIBHICNCTRL_LIBRARIES ${LIBHICNCTRL_STATIC})
else ()
set(LIBTRANSPORT_LIBRARIES ${LIBTRANSPORT_SHARED})
+ set(LIBHICNCTRL_LIBRARIES ${LIBHICNCTRL_SHARED})
endif ()
list(APPEND DEPENDENCIES
@@ -45,21 +49,19 @@ else()
)
endif()
-set(SUFFIX "")
-if (${LIBTRANSPORT_LIBRARIES} MATCHES ".*-memif.*")
- set(DEPENDENCIES ${LIBMEMIF_SHARED})
- set(SUFFIX "-memif")
+# Worksroung for unresolved symbols in vpp libraries
+if(${CMAKE_SYSTEM_NAME} MATCHES Linux)
set(LINK_FLAGS "-Wl,-unresolved-symbols=ignore-in-shared-libs")
endif()
-set(HICN_APPS "${HICN_APPS}${SUFFIX}")
-
list(APPEND LIBRARIES
${LIBTRANSPORT_LIBRARIES}
+ ${LIBHICNCTRL_LIBRARIES}
${OPENSSL_LIBRARIES}
${CMAKE_THREAD_LIBS_INIT}
)
+set(APPS_LIBRARY_LIST "${OPENSSL_LIBRARIES};${CMAKE_THREAD_LIBS_INIT}" CACHE INTERNAL "APPS_LIBRARY_LIST")
if (WIN32)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /wd4200 /wd4996")
endif ()
@@ -68,6 +70,7 @@ include(Packaging)
set(HIGET higet)
set(HTTP_PROXY hicn-http-proxy)
-
-add_subdirectory(http-proxy)
+if (NOT WIN32)
+ add_subdirectory(http-proxy)
+endif ()
add_subdirectory(higet)
diff --git a/apps/cmake/Modules/Packaging.cmake b/apps/cmake/Modules/Packaging.cmake
index 6a6e34777..9b3fa2e72 100644
--- a/apps/cmake/Modules/Packaging.cmake
+++ b/apps/cmake/Modules/Packaging.cmake
@@ -22,7 +22,17 @@ set(${HICN_APPS}_DEB_DEPENDENCIES
CACHE STRING "Dependencies for deb/rpm package."
)
+set(${HICN_APPS}-dev_DEB_DEPENDENCIES
+ "lib${LIBTRANSPORT}-dev (>= stable_version)"
+ CACHE STRING "Dependencies for deb/rpm package."
+)
+
set(${HICN_APPS}_RPM_DEPENDENCIES
"lib${LIBTRANSPORT} >= stable_version"
CACHE STRING "Dependencies for deb/rpm package."
+)
+
+set(${HICN_APPS}-dev_RPM_DEPENDENCIES
+ "lib${LIBTRANSPORT}-devel >= stable_version"
+ CACHE STRING "Dependencies for deb/rpm package."
) \ No newline at end of file
diff --git a/apps/higet/CMakeLists.txt b/apps/higet/CMakeLists.txt
index 1cf14c287..b929a24e4 100644
--- a/apps/higet/CMakeLists.txt
+++ b/apps/higet/CMakeLists.txt
@@ -31,6 +31,10 @@ list(APPEND APPS_SRC
higet.cc
)
+if (WIN32)
+ set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} /NODEFAULTLIB:\"LIBCMT\"" )
+endif()
+
if (NOT DISABLE_EXECUTABLES)
build_executable(${HIGET}
SOURCES ${APPS_SRC}
diff --git a/apps/higet/higet.cc b/apps/higet/higet.cc
index 2aa42e460..9ae869731 100644
--- a/apps/higet/higet.cc
+++ b/apps/higet/higet.cc
@@ -14,12 +14,12 @@
*/
#include <hicn/transport/http/client_connection.h>
+
+#include <algorithm>
#include <fstream>
+#include <functional>
#include <map>
-#include <experimental/algorithm>
-#include <experimental/functional>
-
#define ASIO_STANDALONE
#include <asio.hpp>
#undef ASIO_STANDALONE
@@ -114,15 +114,12 @@ class ReadBytesCallbackImplementation
// read next chunk size
const char *begin = (const char *)payload->data();
const char *end = (const char *)payload->tail();
-
- using std::experimental::make_boyer_moore_searcher;
- auto it = std::experimental::search(
- begin, end,
- make_boyer_moore_searcher(chunk_separator.begin(),
- chunk_separator.end()));
+ const char *begincrlf2 = (const char *)chunk_separator.c_str();
+ const char *endcrlf2 = begincrlf2 + chunk_separator.size();
+ auto it = std::search(begin, end, begincrlf2, endcrlf2);
if (it != end) {
chunk_size_ = std::stoul(begin, 0, 16);
- content_size_ += chunk_size_;
+ content_size_ += (long)chunk_size_;
payload->trimStart(it + chunk_separator.size() - begin);
std::size_t to_write;
@@ -134,7 +131,7 @@ class ReadBytesCallbackImplementation
}
out_->write((char *)payload->data(), to_write);
- byte_downloaded_ += to_write;
+ byte_downloaded_ += (long)to_write;
payload->trimStart(to_write);
if (payload->length() >= chunk_separator.size()) {
@@ -144,7 +141,7 @@ class ReadBytesCallbackImplementation
}
} else {
out_->write((char *)payload->data(), payload->length());
- byte_downloaded_ += payload->length();
+ byte_downloaded_ += (long)payload->length();
}
if (file_name_ != "-") {
@@ -200,12 +197,18 @@ class ReadBytesCallbackImplementation
void print_bar(long value, long max_value, bool last) {
float progress = (float)value / max_value;
+#ifdef _WIN32
+ CONSOLE_SCREEN_BUFFER_INFO csbi;
+ GetConsoleScreenBufferInfo(GetStdHandle(STD_OUTPUT_HANDLE), &csbi);
+ int barWidth = csbi.srWindow.Right - csbi.srWindow.Left + 7;
+#else
struct winsize size;
ioctl(STDOUT_FILENO, TIOCGWINSZ, &size);
int barWidth = size.ws_col - 8;
+#endif
std::cout << "[";
- int pos = barWidth * progress;
+ int pos = barWidth * (int)progress;
for (int i = 0; i < barWidth; ++i) {
if (i < pos) {
std::cout << "=";
@@ -252,7 +255,7 @@ long checkFileStatus(std::string file_name) {
void usage(char *program_name) {
std::cerr << "usage:" << std::endl;
std::cerr << program_name << " [option]... [url]..." << std::endl;
- std::cerr << program_name << "options:" << std::endl;
+ std::cerr << program_name << " options:" << std::endl;
std::cerr
<< "-O <out_put_path> = write documents to <out_put_file>"
<< std::endl;
@@ -303,8 +306,11 @@ int main(int argc, char **argv) {
}
}
- name = argv[optind];
+ if (!argv[optind]) {
+ usage(argv[0]);
+ }
+ name = argv[optind];
std::cerr << "Using name " << name << " and name first word "
<< conf.ipv6_first_word << std::endl;
@@ -329,9 +335,14 @@ int main(int argc, char **argv) {
{"Connection", "Keep-Alive"},
{"Range", range}};
}
+
transport::http::HTTPClientConnection connection;
+
if (!conf.producer_certificate.empty()) {
- connection.setCertificate(conf.producer_certificate);
+ std::shared_ptr<transport::auth::Verifier> verifier =
+ std::make_shared<transport::auth::AsymmetricVerifier>(
+ conf.producer_certificate);
+ connection.setVerifier(verifier);
}
t1 = std::chrono::system_clock::now();
diff --git a/apps/http-proxy/CMakeLists.txt b/apps/http-proxy/CMakeLists.txt
index d6681097c..8c2043c30 100644
--- a/apps/http-proxy/CMakeLists.txt
+++ b/apps/http-proxy/CMakeLists.txt
@@ -14,6 +14,18 @@
cmake_minimum_required(VERSION 3.5 FATAL_ERROR)
set(CMAKE_CXX_STANDARD 14)
+# -Wno-c99-designator issue
+#
+# Unsure about version for which this was introduced
+# clang version 9.0.8 (no flag), 11.0.5 (ndk22, flag)
+if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
+ EXECUTE_PROCESS( COMMAND ${CMAKE_CXX_COMPILER} --version OUTPUT_VARIABLE clang_full_version_string )
+ string (REGEX REPLACE ".*clang version ([0-9]+\\.[0-9]+).*" "\\1" CLANG_VERSION_STRING ${clang_full_version_string})
+ if (CLANG_VERSION_STRING VERSION_GREATER_EQUAL 11)
+ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-c99-designator")
+ endif()
+endif()
+
set(CMAKE_MODULE_PATH
${CMAKE_MODULE_PATH}
"${CMAKE_SOURCE_DIR}/cmake/Modules/"
@@ -24,39 +36,36 @@ if (NOT CMAKE_BUILD_TYPE)
set(CMAKE_BUILD_TYPE "Release")
endif ()
-include_directories(
- SYSTEM
- ${CMAKE_BINARY_DIR}
- ${LIB${TRANSPORT_LIBRARY}_INCLUDE_DIR}
-)
-
set(LIB_SOURCE_FILES
- src/ATSConnector.cc
- src/HTTP1.xMessageFastParser.cc
- src/IcnReceiver.cc
-)
-
-set(LIB_SERVER_HEADER_FILES
- src/IcnReceiver.h
- src/ATSConnector.h
- src/HTTP1.xMessageFastParser.h
+ src/http_session.cc
+ src/http_proxy.cc
+ src/http_1x_message_fast_parser.cc
+ src/icn_receiver.cc
+ src/forwarder_interface.cc
)
set(APP_SOURCE_FILES
main.cc
)
+add_subdirectory(includes/hicn/http-proxy)
set(LIBHTTP_PROXY hicnhttpproxy)
set(LIBHTTP_PROXY_STATIC ${LIBHTTP_PROXY}.static)
+list(APPEND COMPILER_DEFINITIONS
+ -DWITH_POLICY
+)
+
build_library(${LIBHTTP_PROXY}
STATIC
SOURCES ${LIB_SOURCE_FILES}
LINK_LIBRARIES ${LIBRARIES}
DEPENDS ${DEPENDENCIES}
- INCLUDE_DIRS ${LIBTRANSPORT_INCLUDE_DIRS}
+ INSTALL_HEADERS ${LIBPROXY_TO_INSTALL_HEADER_FILES}
+ INCLUDE_DIRS ${LIBTRANSPORT_INCLUDE_DIRS} ${LIBHICNCTRL_INCLUDE_DIRS} ${LIBPROXY_INCLUDE_DIRS}
COMPONENT ${HICN_APPS}
LINK_FLAGS ${LINK_FLAGS}
+ DEFINITIONS ${COMPILER_DEFINITIONS}
)
if (NOT DISABLE_EXECUTABLES)
diff --git a/apps/http-proxy/includes/hicn/http-proxy/CMakeLists.txt b/apps/http-proxy/includes/hicn/http-proxy/CMakeLists.txt
new file mode 100644
index 000000000..75cbbd64b
--- /dev/null
+++ b/apps/http-proxy/includes/hicn/http-proxy/CMakeLists.txt
@@ -0,0 +1,39 @@
+# Copyright (c) 2017-2019 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cmake_minimum_required(VERSION 3.5 FATAL_ERROR)
+
+list(APPEND HEADER_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/forwarder_config.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/forwarder_interface.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/http_proxy.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/http_session.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/http_1x_message_fast_parser.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/icn_receiver.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/utils.h
+)
+
+set(HEADER_FILES ${HEADER_FILES} PARENT_SCOPE)
+
+set(LIBPROXY_INCLUDE_DIRS
+ ${CMAKE_CURRENT_SOURCE_DIR}/../.. ""
+ CACHE INTERNAL
+ "" FORCE
+)
+
+set(LIBPROXY_TO_INSTALL_HEADER_FILES
+ ${HEADER_FILES} ""
+ CACHE INTERNAL
+ "" FORCE
+)
+
diff --git a/apps/http-proxy/includes/hicn/http-proxy/forwarder_config.h b/apps/http-proxy/includes/hicn/http-proxy/forwarder_config.h
new file mode 100644
index 000000000..19c96a9e3
--- /dev/null
+++ b/apps/http-proxy/includes/hicn/http-proxy/forwarder_config.h
@@ -0,0 +1,200 @@
+/*
+ * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/portability/c_portability.h>
+#include <hicn/transport/utils/branch_prediction.h>
+#include <hicn/transport/utils/log.h>
+#include <hicn/transport/utils/string_utils.h>
+
+#include <asio.hpp>
+#include <chrono>
+#include <sstream>
+#include <string>
+
+#include "forwarder_interface.h"
+
+
+#define RETRY_INTERVAL 300
+
+namespace transport {
+
+static constexpr char server_header[] = "server";
+static constexpr char prefix_header[] = "prefix";
+static constexpr char port_header[] = "port";
+
+using OnForwarderConfiguredCallback = std::function<void(bool)>;
+
+class ForwarderConfig {
+ public:
+ using ListenerRetrievedCallback = std::function<void(std::error_code)>;
+
+ template <typename Callback>
+ ForwarderConfig(asio::io_service& io_service, Callback&& callback)
+ : forwarder_interface_(io_service),
+ resolver_(io_service),
+ retx_count_(0),
+ timer_(io_service),
+ hicn_listen_port_(~0),
+ listener_retrieved_callback_(std::forward<Callback>(callback)) {}
+
+ void close() {
+ timer_.cancel();
+ resolver_.cancel();
+ forwarder_interface_.close();
+ }
+
+ void tryToConnectToForwarder() {
+ doTryToConnectToForwarder(std::make_error_code(std::errc(0)));
+ }
+
+ void doTryToConnectToForwarder(std::error_code ec) {
+ if (!ec) {
+ // ec == 0 --> timer expired
+ int ret = forwarder_interface_.connectToForwarder();
+ if (ret < 0) {
+ // We were not able to connect to the local forwarder. Do not give up
+ // and retry.
+ TRANSPORT_LOGE("Could not connect to local forwarder. Retrying.");
+
+ timer_.expires_from_now(std::chrono::milliseconds(RETRY_INTERVAL));
+ timer_.async_wait(std::bind(&ForwarderConfig::doTryToConnectToForwarder,
+ this, std::placeholders::_1));
+ } else {
+ timer_.cancel();
+ retx_count_ = 0;
+ doGetMainListener(std::make_error_code(std::errc(0)));
+ }
+ } else {
+ TRANSPORT_LOGD("Timer for re-trying forwarder connection canceled.");
+ }
+ }
+
+ void doGetMainListener(std::error_code ec) {
+ if (!ec) {
+ // ec == 0 --> timer expired
+ int ret = forwarder_interface_.getMainListenerPort();
+ if (ret <= 0) {
+ // Since without the main listener of the forwarder the proxy cannot
+ // work, we can stop the program here until we get the listener port.
+ TRANSPORT_LOGE(
+ "Could not retrieve main listener port from the forwarder. "
+ "Retrying.");
+
+ timer_.expires_from_now(std::chrono::milliseconds(RETRY_INTERVAL));
+ timer_.async_wait(std::bind(&ForwarderConfig::doGetMainListener, this,
+ std::placeholders::_1));
+ } else {
+ timer_.cancel();
+ retx_count_ = 0;
+ hicn_listen_port_ = uint16_t(ret);
+ listener_retrieved_callback_(std::make_error_code(std::errc(0)));
+ }
+ } else {
+ TRANSPORT_LOGI("Timer for retrieving main hicn listener canceled.");
+ }
+ }
+
+ template <typename Callback>
+ TRANSPORT_ALWAYS_INLINE bool parseHicnHeader(std::string& header,
+ Callback&& callback) {
+ std::stringstream ss(header);
+ route_info_t* ret = new route_info_t();
+ std::string port_string;
+
+ while (ss.good()) {
+ std::string substr;
+ getline(ss, substr, ',');
+
+ if (TRANSPORT_EXPECT_FALSE(substr.empty())) {
+ continue;
+ }
+
+ utils::trim(substr);
+ auto it = std::find_if(substr.begin(), substr.end(),
+ [](int ch) { return ch == '='; });
+ if (it != std::end(substr)) {
+ auto key = std::string(substr.begin(), it);
+ auto value = std::string(it + 1, substr.end());
+
+ if (key == server_header) {
+ ret->remote_addr = value;
+ } else if (key == prefix_header) {
+ auto it = std::find_if(value.begin(), value.end(),
+ [](int ch) { return ch == '/'; });
+
+ if (it != std::end(value)) {
+ ret->route_addr = std::string(value.begin(), it);
+ ret->route_len = std::stoul(std::string(it + 1, value.end()));
+ } else {
+ return false;
+ }
+ } else if (key == port_header) {
+ ret->remote_port = std::stoul(value);
+ port_string = value;
+ } else {
+ // Header not recognized
+ return false;
+ }
+ }
+ }
+
+ /*
+ * Resolve server address
+ */
+ auto results =
+ resolver_.resolve({ret->remote_addr, port_string,
+ asio::ip::resolver_query_base::numeric_service});
+
+#if ((ASIO_VERSION / 100 % 1000) < 12)
+ asio::ip::udp::resolver::iterator end;
+ auto& it = results;
+ while (it != end) {
+#else
+ for (auto it = results.begin(); it != results.end(); it++) {
+#endif
+ if (it->endpoint().address().is_v4()) {
+ // Use this v4 address to configure the forwarder.
+ ret->remote_addr = it->endpoint().address().to_string();
+ ret->family = AF_INET;
+ std::string _prefix = ret->route_addr;
+ forwarder_interface_.createFaceAndRoute(
+ RouteInfoPtr(ret), [callback = std::forward<Callback>(callback),
+ configured_prefix = std::move(_prefix)](
+ uint32_t route_id, bool result) {
+ callback(result, configured_prefix);
+ });
+
+ return true;
+ }
+#if ((ASIO_VERSION / 100 % 1000) < 12)
+ it++;
+#endif
+ }
+
+ return false;
+ }
+
+ private:
+ ForwarderInterface forwarder_interface_;
+ asio::ip::udp::resolver resolver_;
+ std::uint32_t retx_count_;
+ asio::steady_timer timer_;
+ uint16_t hicn_listen_port_;
+ ListenerRetrievedCallback listener_retrieved_callback_;
+}; // namespace transport
+
+} // namespace transport \ No newline at end of file
diff --git a/apps/http-proxy/includes/hicn/http-proxy/forwarder_interface.h b/apps/http-proxy/includes/hicn/http-proxy/forwarder_interface.h
new file mode 100644
index 000000000..54941a4ba
--- /dev/null
+++ b/apps/http-proxy/includes/hicn/http-proxy/forwarder_interface.h
@@ -0,0 +1,107 @@
+/*
+ * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+extern "C" {
+#include <hicn/ctrl/api.h>
+#include <hicn/util/ip_address.h>
+}
+
+#ifndef ASIO_STANDALONE
+#define ASIO_STANDALONE 1
+#endif
+#include <asio.hpp>
+#include <asio/steady_timer.hpp>
+#include <functional>
+#include <thread>
+#include <unordered_map>
+
+namespace transport {
+
+typedef std::function<void(uint32_t, bool)> SetRouteCallback;
+
+struct route_info_t {
+ int family;
+ std::string remote_addr;
+ uint16_t remote_port;
+ std::string route_addr;
+ uint8_t route_len;
+};
+
+using RouteInfoPtr = std::shared_ptr<route_info_t>;
+
+class ForwarderInterface {
+ public:
+ ForwarderInterface(asio::io_service &io_service)
+ : external_ioservice_(io_service),
+ work_(std::make_unique<asio::io_service::work>(internal_ioservice_)),
+ sock_(nullptr),
+ thread_(std::make_unique<std::thread>(
+ [this]() { internal_ioservice_.run(); })),
+ check_routes_timer_(nullptr),
+ pending_add_route_counter_(0),
+ route_id_(0),
+ closed_(false) {}
+
+ ~ForwarderInterface();
+
+ int connectToForwarder();
+
+ void removeConnectedUserNow(uint32_t route_id);
+
+ // to be called at the server
+ // at the client this creates a race condition
+ // and the program enters in a loop
+ void scheduleRemoveConnectedUser(uint32_t route_id);
+
+ template <typename Callback>
+ void createFaceAndRoute(RouteInfoPtr &&route_info, Callback &&callback) {
+ internal_ioservice_.post([this, _route_info = std::move(route_info),
+ _callback = std::forward<Callback>(callback)]() {
+ pending_add_route_counter_++;
+ uint8_t max_try = 5;
+ auto timer = new asio::steady_timer(internal_ioservice_);
+ internalCreateFaceAndRoute(std::move(_route_info), max_try, timer,
+ std::move(_callback));
+ });
+ }
+
+ int32_t getMainListenerPort();
+
+ void close();
+
+ private:
+ void internalRemoveConnectedUser(uint32_t route_id);
+
+ void internalCreateFaceAndRoute(RouteInfoPtr route_info, uint8_t max_try,
+ asio::steady_timer *timer,
+ SetRouteCallback callback);
+
+ int tryToCreateFaceAndRoute(route_info_t *route_info);
+
+ asio::io_service &external_ioservice_;
+ asio::io_service internal_ioservice_;
+ std::unique_ptr<asio::io_service::work> work_;
+ hc_sock_t *sock_;
+ std::unique_ptr<std::thread> thread_;
+ std::unordered_map<uint32_t, RouteInfoPtr> route_status_;
+ std::unique_ptr<asio::steady_timer> check_routes_timer_;
+ uint32_t pending_add_route_counter_;
+ uint32_t route_id_;
+ bool closed_;
+};
+
+} // namespace transport
diff --git a/apps/http-proxy/src/HTTP1.xMessageFastParser.h b/apps/http-proxy/includes/hicn/http-proxy/http_1x_message_fast_parser.h
index 79dbce19d..7c035c83b 100644
--- a/apps/http-proxy/src/HTTP1.xMessageFastParser.h
+++ b/apps/http-proxy/includes/hicn/http-proxy/http_1x_message_fast_parser.h
@@ -15,16 +15,43 @@
#pragma once
+#include <hicn/transport/http/message.h>
+
#include <algorithm>
#include <string>
-#include <hicn/transport/http/message.h>
-
using transport::http::HTTPHeaders;
+namespace transport {
+struct Metadata;
+}
+
class HTTPMessageFastParser {
public:
- static HTTPHeaders getHeaders(const uint8_t* headers, std::size_t length);
+ static constexpr char http_ok[] =
+ "HTTP/1.1 200 OK\r\n"
+ "Access-Control-Allow-Origin: *\r\n"
+ "Connection: close\r\n"
+ "Content-Length: 0\r\n\r\n";
+
+ static constexpr char http_cors[] =
+ "HTTP/1.1 200 OK\r\n"
+ "Date: %s\r\n"
+ "Connection: close\r\n"
+ "Content-Length: 0\r\n"
+ "Access-Control-Allow-Origin: *\r\n"
+ "Access-Control-Allow-Methods: GET\r\n"
+ "Access-Control-Allow-Headers: hicn\r\n"
+ "Access-Control-Max-Age: 1800\r\n\r\n";
+
+ static constexpr char http_failed[] =
+ "HTTP/1.1 500 Internal Server Error\r\n"
+ "Date: %s\r\n"
+ "Content-Length: 0\r\nConnection: "
+ "close\r\n\r\n";
+
+ static void getHeaders(const uint8_t* headers, std::size_t length,
+ bool request, transport::Metadata* metadata);
static std::size_t hasBody(const uint8_t* headers, std::size_t length);
static bool isMpdRequest(const uint8_t* headers, std::size_t length);
static uint32_t parseCacheControl(const uint8_t* headers, std::size_t length);
diff --git a/apps/http-proxy/includes/hicn/http-proxy/http_proxy.h b/apps/http-proxy/includes/hicn/http-proxy/http_proxy.h
new file mode 100644
index 000000000..a4139a620
--- /dev/null
+++ b/apps/http-proxy/includes/hicn/http-proxy/http_proxy.h
@@ -0,0 +1,210 @@
+/*
+ * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/interfaces/socket_consumer.h>
+#include <hicn/transport/utils/event_thread.h>
+
+#include "forwarder_config.h"
+#include "http_session.h"
+#include "icn_receiver.h"
+
+#define ASIO_STANDALONE
+#include <asio.hpp>
+#include <asio/version.hpp>
+#include <unordered_set>
+
+class TcpListener {
+ public:
+ using AcceptCallback = std::function<void(asio::ip::tcp::socket&&)>;
+
+ TcpListener(asio::io_service& io_service, short port, AcceptCallback callback)
+ : acceptor_(io_service),
+#if ((ASIO_VERSION / 100 % 1000) < 12)
+ socket_(io_service),
+#endif
+ callback_(callback) {
+ acceptor_.open(asio::ip::tcp::v4());
+ typedef asio::detail::socket_option::boolean<SOL_SOCKET, SO_REUSEPORT>
+ reuse_port;
+ acceptor_.set_option(reuse_port(true));
+ acceptor_.bind(asio::ip::tcp::endpoint(
+ asio::ip::address::from_string("127.0.0.1"), port));
+ acceptor_.listen();
+ }
+
+ public:
+ void doAccept() {
+#if ((ASIO_VERSION / 100 % 1000) >= 12)
+ acceptor_.async_accept(
+ [this](std::error_code ec, asio::ip::tcp::socket socket) {
+#else
+ acceptor_.async_accept(socket_, [this](std::error_code ec) {
+ auto socket = std::move(socket_);
+#endif
+ if (!ec) {
+ callback_(std::move(socket));
+ doAccept();
+ }
+ });
+ }
+
+ void stop() { acceptor_.close(); }
+
+ asio::ip::tcp::acceptor acceptor_;
+#if ((ASIO_VERSION / 100 % 1000) < 12)
+ asio::ip::tcp::socket socket_;
+#endif
+ AcceptCallback callback_;
+};
+
+namespace transport {
+
+class HTTPClientConnectionCallback;
+
+class Receiver {
+ public:
+ Receiver() : thread_() {}
+ virtual ~Receiver() = default;
+ void stopAndJoinThread() { thread_.stop(); }
+ virtual void stop() = 0;
+
+ protected:
+ utils::EventThread thread_;
+};
+
+class TcpReceiver : public Receiver {
+ friend class HTTPClientConnectionCallback;
+
+ public:
+ TcpReceiver(std::uint16_t port, const std::string& prefix,
+ const std::string& ipv6_first_word);
+
+ void stop() override;
+
+ private:
+ void onNewConnection(asio::ip::tcp::socket&& socket);
+ void onClientDisconnect(HTTPClientConnectionCallback* client);
+
+ template <typename Callback>
+ void parseHicnHeader(std::string& hicn_header, Callback&& callback) {
+ forwarder_config_.parseHicnHeader(hicn_header,
+ std::forward<Callback>(callback));
+ }
+
+ TcpListener listener_;
+ std::string prefix_;
+ std::string ipv6_first_word_;
+ std::string prefix_hash_;
+ std::deque<HTTPClientConnectionCallback*> http_clients_;
+ std::unordered_set<HTTPClientConnectionCallback*> used_http_clients_;
+ ForwarderConfig forwarder_config_;
+ bool stopped_;
+};
+
+class IcnReceiver : public Receiver {
+ public:
+ template <typename... Args>
+ IcnReceiver(Args&&... args)
+ : Receiver(),
+ icn_consum_producer_(thread_.getIoService(),
+ std::forward<Args>(args)...) {
+ icn_consum_producer_.run();
+ }
+
+ void stop() override {
+ thread_.add([this]() {
+ /* Stop the listener */
+ icn_consum_producer_.stop();
+ });
+ }
+
+ private:
+ AsyncConsumerProducer icn_consum_producer_;
+};
+
+class HTTPProxy {
+ public:
+ enum Server { CREATE };
+ enum Client { WRAP_BUFFER };
+
+ struct CommonParams {
+ std::string prefix;
+ std::string first_ipv6_word;
+
+ virtual void printParams() { std::cout << "Parameters: " << std::endl; };
+ };
+
+ struct ClientParams : virtual CommonParams {
+ short tcp_listen_port;
+ void printParams() override {
+ std::cout << "Running HTTP/TCP -> HTTP/hICN proxy." << std::endl;
+ CommonParams::printParams();
+ std::cout << "\t"
+ << "HTTP listen port: " << tcp_listen_port << std::endl;
+ std::cout << "\t"
+ << "Consumer Prefix: " << prefix << std::endl;
+ std::cout << "\t"
+ << "Prefix first word: " << first_ipv6_word << std::endl;
+ }
+ };
+
+ struct ServerParams : virtual CommonParams {
+ std::string origin_address;
+ std::string origin_port;
+ std::string cache_size;
+ std::string mtu;
+ std::string content_lifetime;
+ bool manifest;
+
+ void printParams() override {
+ std::cout << "Running HTTP/hICN -> HTTP/TCP proxy." << std::endl;
+ CommonParams::printParams();
+ std::cout << "\t"
+ << "Origin address: " << origin_address << std::endl;
+ std::cout << "\t"
+ << "Origin port: " << origin_port << std::endl;
+ std::cout << "\t"
+ << "Producer cache size: " << cache_size << std::endl;
+ std::cout << "\t"
+ << "hICN MTU: " << mtu << std::endl;
+ std::cout << "\t"
+ << "Default content lifetime: " << content_lifetime
+ << std::endl;
+ std::cout << "\t"
+ << "Producer Prefix: " << prefix << std::endl;
+ std::cout << "\t"
+ << "Prefix first word: " << first_ipv6_word << std::endl;
+ std::cout << "\t"
+ << "Use manifest: " << manifest << std::endl;
+ }
+ };
+
+ HTTPProxy(ClientParams& icn_params, std::size_t n_thread = 1);
+ HTTPProxy(ServerParams& icn_params, std::size_t n_thread = 1);
+
+ void run() { main_io_context_.run(); }
+ void stop();
+
+ private:
+ void setupSignalHandler();
+
+ std::vector<std::unique_ptr<Receiver>> receivers_;
+ asio::io_service main_io_context_;
+ asio::signal_set signals_;
+};
+
+} // namespace transport \ No newline at end of file
diff --git a/apps/http-proxy/src/ATSConnector.h b/apps/http-proxy/includes/hicn/http-proxy/http_session.h
index 8d91b7b7b..f4a3dbdee 100644
--- a/apps/http-proxy/src/ATSConnector.h
+++ b/apps/http-proxy/includes/hicn/http-proxy/http_session.h
@@ -17,6 +17,8 @@
#include <hicn/transport/core/packet.h>
+#include "http_1x_message_fast_parser.h"
+
#define ASIO_STANDALONE
#include <asio.hpp>
#include <deque>
@@ -26,16 +28,36 @@ namespace transport {
using asio::ip::tcp;
+struct Metadata;
+
typedef std::function<void(const uint8_t *data, std::size_t size, bool is_last,
- bool headers)>
+ bool headers, Metadata *metadata)>
ContentReceivedCallback;
-typedef std::function<void()> OnReconnect;
+typedef std::function<bool(asio::ip::tcp::socket &socket)> OnConnectionClosed;
typedef std::function<void()> ContentSentCallback;
typedef std::deque<
std::pair<std::unique_ptr<utils::MemBuf>, ContentSentCallback>>
BufferQueue;
-class ATSConnector {
+struct Metadata {
+ std::string http_version;
+ HTTPHeaders headers;
+};
+
+struct RequestMetadata : Metadata {
+ std::string method;
+ std::string path;
+};
+
+struct ResponseMetadata : Metadata {
+ std::string status_code;
+ std::string status_string;
+};
+
+class HTTPClientConnectionCallback;
+
+class HTTPSession {
+ friend class HTTPClientConnectionCallback;
static constexpr uint32_t buffer_size = 1024 * 512;
enum class ConnectorState {
@@ -45,11 +67,15 @@ class ATSConnector {
};
public:
- ATSConnector(asio::io_service &io_service, std::string &ip_address,
- std::string &port, ContentReceivedCallback receive_callback,
- OnReconnect on_reconnect_callback);
+ HTTPSession(asio::io_service &io_service, std::string &ip_address,
+ std::string &port, ContentReceivedCallback receive_callback,
+ OnConnectionClosed on_reconnect_callback, bool client = false);
- ~ATSConnector();
+ HTTPSession(asio::ip::tcp::socket socket,
+ ContentReceivedCallback receive_callback,
+ OnConnectionClosed on_reconnect_callback, bool client = true);
+
+ ~HTTPSession();
void send(const uint8_t *buffer, std::size_t len,
ContentSentCallback &&content_sent = 0);
@@ -65,9 +91,6 @@ class ATSConnector {
void doReadBody(std::size_t body_size, std::size_t additional_bytes);
- // void handleReadChunked(std::error_code ec, std::size_t length,
- // std::size_t size);
-
void doReadChunkedHeader();
void doWrite();
@@ -90,6 +113,7 @@ class ATSConnector {
asio::streambuf input_buffer_;
+ bool reverse_;
bool is_reconnection_;
bool data_available_;
@@ -100,7 +124,10 @@ class ATSConnector {
bool chunked_;
ContentReceivedCallback receive_callback_;
- OnReconnect on_reconnect_callback_;
+ OnConnectionClosed on_connection_closed_callback_;
+
+ // HTTP headers
+ std::unique_ptr<Metadata> header_info_;
// Connector state
ConnectorState state_;
diff --git a/apps/http-proxy/src/IcnReceiver.h b/apps/http-proxy/includes/hicn/http-proxy/icn_receiver.h
index 9d0ab5172..780037665 100644
--- a/apps/http-proxy/src/IcnReceiver.h
+++ b/apps/http-proxy/includes/hicn/http-proxy/icn_receiver.h
@@ -13,18 +13,20 @@
* limitations under the License.
*/
-#include "ATSConnector.h"
-
#include <hicn/transport/core/prefix.h>
#include <hicn/transport/interfaces/publication_options.h>
#include <hicn/transport/interfaces/socket_producer.h>
#include <hicn/transport/utils/spinlock.h>
+#include <asio.hpp>
#include <cassert>
#include <cstring>
#include <queue>
#include <utility>
+#include <hicn/http-proxy/http_session.h>
+//#include "http_session.h"
+
namespace transport {
class AsyncConsumerProducer {
@@ -33,17 +35,31 @@ class AsyncConsumerProducer {
using RequestQueue = std::queue<interface::PublicationOptions>;
public:
- explicit AsyncConsumerProducer(const std::string& prefix,
- std::string& ip_address, std::string& port,
- std::string& cache_size, std::string& mtu,
- std::string& first_ipv6_word,
- unsigned long default_content_lifetime, bool manifest);
-
- void start();
+ explicit AsyncConsumerProducer(
+ asio::io_service& io_service, const std::string& prefix,
+ const std::string& first_ipv6_word, const std::string& origin_address,
+ const std::string& origin_port, const std::string& cache_size,
+ const std::string& mtu, const std::string& content_lifetime,
+ bool manifest);
+
+ explicit AsyncConsumerProducer(
+ const std::string& prefix, const std::string& first_ipv6_word,
+ const std::string& origin_address, const std::string& origin_port,
+ const std::string& cache_size, const std::string& mtu,
+ const std::string& content_lifetime, bool manifest)
+ : AsyncConsumerProducer(internal_io_service_, prefix, first_ipv6_word,
+ origin_address, origin_port, cache_size, mtu,
+ content_lifetime, manifest) {
+ external_io_service_ = false;
+ }
void run();
+ void stop();
+
private:
+ void start();
+
void doSend();
void doReceive();
@@ -55,7 +71,9 @@ class AsyncConsumerProducer {
utils::MemBuf* payload);
core::Prefix prefix_;
- asio::io_service io_service_;
+ asio::io_service& io_service_;
+ asio::io_service internal_io_service_;
+ bool external_io_service_;
interface::ProducerSocket producer_socket_;
std::string ip_address_;
@@ -64,11 +82,10 @@ class AsyncConsumerProducer {
uint32_t mtu_;
uint64_t request_counter_;
- asio::signal_set signals_;
// std::unordered_map<core::Name, std::shared_ptr<ATSConnector>>
// connection_map_;
- ATSConnector connector_;
+ HTTPSession connector_;
unsigned long default_content_lifetime_;
diff --git a/apps/http-proxy/includes/hicn/http-proxy/utils.h b/apps/http-proxy/includes/hicn/http-proxy/utils.h
new file mode 100644
index 000000000..d87c796d0
--- /dev/null
+++ b/apps/http-proxy/includes/hicn/http-proxy/utils.h
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/core/prefix.h>
+#include <hicn/transport/utils/hash.h>
+
+#include <sstream>
+#include <string>
+
+#pragma once
+
+TRANSPORT_ALWAYS_INLINE std::string generatePrefix(
+ const std::string& prefix_url, const std::string& first_ipv6_word) {
+ const char* str = prefix_url.c_str();
+ uint16_t pos = 0;
+
+ if (strncmp("http://", str, 7) == 0) {
+ pos = 7;
+ } else if (strncmp("https://", str, 8) == 0) {
+ pos = 8;
+ }
+
+ str += pos;
+
+ uint32_t locator_hash = utils::hash::fnv32_buf(str, strlen(str));
+
+ std::stringstream stream;
+ stream << first_ipv6_word << ":0";
+
+ for (uint16_t* word = (uint16_t*)&locator_hash;
+ std::size_t(word) < (std::size_t(&locator_hash) + sizeof(locator_hash));
+ word++) {
+ stream << ":" << std::hex << *word;
+ }
+
+ stream << "::";
+
+ return stream.str();
+} \ No newline at end of file
diff --git a/apps/http-proxy/main.cc b/apps/http-proxy/main.cc
index 20655071c..d9c29ecde 100644
--- a/apps/http-proxy/main.cc
+++ b/apps/http-proxy/main.cc
@@ -13,53 +13,128 @@
* limitations under the License.
*/
-#include "src/IcnReceiver.h"
+#include <hicn/http-proxy/http_proxy.h>
using namespace transport;
int usage(char* program) {
- std::cerr << "ICN Plugin not loaded!" << std::endl;
- std::cerr << "USAGE: " << program << "\n"
- << "[HTTP_PREFIX] -a [SERVER_IP_ADDRESS] "
- "-p [SERVER_PORT] -c [CACHE_SIZE] -m [MTU] -l [DEFAULT_LIFETIME "
- "(seconds)] -P [FIRST_IPv6_WORD_HEX] -M (enable manifest)"
+ std::cerr << "USAGE: " << program << "[-C|-S] [options] <http_prefix>\n"
+ << "Server or Client: \n"
+ << " -P [FIRST_IPv6_WORD_HEX]\n"
+ << " -t [number of threads]\n"
+ << "Client Options: \n"
+ << " -L [PROXY_LISTEN_PORT]\n"
+ << "Server Options: \n"
+ << " -a [ORIGIN_IP_ADDRESS]\n"
+ << " -p [ORIGIN_PORT]\n"
+ << " -c [CACHE_SIZE]\n"
+ << " -m [MTU]"
+ << " -l [DEFAULT_CONTENT_LIFETIME] (seconds)\n"
+ << " -M (enable manifest)\n"
+ << std::endl
+ << "Example Server:\n"
+ << " " << program
+ << " -S -a example.com -p 80 -c 10000 -m 1300 -l 7200 -M -t 1 "
+ "http://httpserver\n"
+ << "Example Client:\n"
+ << " " << program << " -C -L 9091 http://httpserver\n"
<< std::endl;
return -1;
}
+struct Params : HTTPProxy::ClientParams, HTTPProxy::ServerParams {
+ void printParams() override {
+ if (client) {
+ HTTPProxy::ClientParams::printParams();
+ } else if (server) {
+ HTTPProxy::ServerParams::printParams();
+ } else {
+ throw std::runtime_error(
+ "Proxy configured as client and server at the same time.");
+ }
+
+ std::cout << "\t"
+ << "N Threads: " << n_thread << std::endl;
+ }
+
+ HTTPProxy* instantiateProxyAsValue() {
+ if (client) {
+ HTTPProxy::ClientParams* p = dynamic_cast<HTTPProxy::ClientParams*>(this);
+ return new transport::HTTPProxy(*p, n_thread);
+ } else if (server) {
+ HTTPProxy::ServerParams* p = dynamic_cast<HTTPProxy::ServerParams*>(this);
+ return new transport::HTTPProxy(*p, n_thread);
+ } else {
+ throw std::runtime_error(
+ "Proxy configured as client and server at the same time.");
+ }
+ }
+
+ bool client = false;
+ bool server = false;
+ std::uint16_t n_thread = 1;
+};
+
int main(int argc, char** argv) {
- std::string prefix("http://hicn-http-proxy");
- std::string ip_address("127.0.0.1");
- std::string port("80");
- std::string cache_size("50000");
- std::string mtu("1500");
- std::string first_ipv6_word("b001");
- std::string default_content_lifetime("7200"); // seconds
- bool manifest = false;
+ Params params;
+
+ params.prefix = "http://hicn-http-proxy";
+ params.origin_address = "127.0.0.1";
+ params.origin_port = "80";
+ params.cache_size = "50000";
+ params.mtu = "1500";
+ params.first_ipv6_word = "b001";
+ params.content_lifetime = "7200;"; // seconds
+ params.manifest = false;
+ params.tcp_listen_port = 8080;
int opt;
- while ((opt = getopt(argc, argv, "a:p:c:m:P:l:M")) != -1) {
+ while ((opt = getopt(argc, argv, "CSa:p:c:m:P:l:ML:t:")) != -1) {
switch (opt) {
+ case 'C':
+ if (params.server) {
+ std::cerr << "Cannot be both client and server (both -C anc -S "
+ "options specified.)."
+ << std::endl;
+ return usage(argv[0]);
+ }
+ params.client = true;
+ break;
+ case 'S':
+ if (params.client) {
+ std::cerr << "Cannot be both client and server (both -C anc -S "
+ "options specified.)."
+ << std::endl;
+ return usage(argv[0]);
+ }
+ params.server = true;
+ break;
case 'a':
- ip_address = optarg;
+ params.origin_address = optarg;
break;
case 'p':
- port = optarg;
+ params.origin_port = optarg;
break;
case 'c':
- cache_size = optarg;
+ params.cache_size = optarg;
break;
case 'm':
- mtu = optarg;
+ params.mtu = optarg;
break;
case 'P':
- first_ipv6_word = optarg;
+ params.first_ipv6_word = optarg;
break;
case 'l':
- default_content_lifetime = optarg;
+ params.content_lifetime = optarg;
+ break;
+ case 'L':
+ params.tcp_listen_port = std::stoul(optarg);
break;
case 'M':
- manifest = true;
+ params.manifest = true;
+ break;
+ case 't':
+ params.n_thread = std::stoul(optarg);
break;
case 'h':
default:
@@ -69,19 +144,15 @@ int main(int argc, char** argv) {
}
if (argv[optind] == 0) {
- std::cerr << "Using default prefix " << prefix << std::endl;
+ std::cerr << "Using default prefix " << params.prefix << std::endl;
} else {
- prefix = argv[optind];
+ params.prefix = argv[optind];
}
- std::cout << "Connecting to " << ip_address << " port " << port
- << " Cache size " << cache_size << " Prefix " << prefix << " MTU "
- << mtu << " IPv6 first word " << first_ipv6_word << std::endl;
- transport::AsyncConsumerProducer proxy(
- prefix, ip_address, port, cache_size, mtu, first_ipv6_word,
- std::stoul(default_content_lifetime) * 1000, manifest);
-
- proxy.run();
+ params.printParams();
+ auto proxy = params.instantiateProxyAsValue();
+ proxy->run();
+ delete proxy;
return 0;
} \ No newline at end of file
diff --git a/apps/http-proxy/src/forwarder_interface.cc b/apps/http-proxy/src/forwarder_interface.cc
new file mode 100644
index 000000000..7d8235ac6
--- /dev/null
+++ b/apps/http-proxy/src/forwarder_interface.cc
@@ -0,0 +1,263 @@
+/*
+ * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <arpa/inet.h>
+#include <hicn/http-proxy/forwarder_interface.h>
+#include <hicn/transport/utils/log.h>
+
+#include <chrono>
+#include <iostream>
+#include <thread>
+#include <unordered_set>
+
+namespace transport {
+
+ForwarderInterface::~ForwarderInterface() {}
+
+int ForwarderInterface::connectToForwarder() {
+ sock_ = hc_sock_create();
+ if (!sock_) return -1;
+
+ if (hc_sock_connect(sock_) < 0) {
+ hc_sock_free(sock_);
+ sock_ = nullptr;
+ return -1;
+ }
+
+ return 0;
+}
+
+void ForwarderInterface::close() {
+ if (!closed_) {
+ internal_ioservice_.post([this]() {
+ work_.reset();
+ if (sock_) {
+ hc_sock_free(sock_);
+ sock_ = nullptr;
+ }
+ });
+
+ if (thread_->joinable()) {
+ thread_->join();
+ }
+ }
+}
+
+void ForwarderInterface::removeConnectedUserNow(uint32_t route_id) {
+ internalRemoveConnectedUser(route_id);
+}
+
+void ForwarderInterface::scheduleRemoveConnectedUser(uint32_t route_id) {
+ internal_ioservice_.post(
+ [this, route_id]() { internalRemoveConnectedUser(route_id); });
+}
+
+int32_t ForwarderInterface::getMainListenerPort() {
+ if (!sock_) return -1;
+
+ hc_data_t *data;
+ if (hc_listener_list(sock_, &data) < 0) return -1;
+
+ int ret = -1;
+ foreach_listener(l, data) {
+ std::string interface = std::string(l->interface_name);
+ if (interface.compare("lo") != 0) {
+ ret = l->local_port;
+ break;
+ }
+ }
+
+ hc_data_free(data);
+ return ret;
+}
+
+void ForwarderInterface::internalRemoveConnectedUser(uint32_t route_id) {
+ auto it = route_status_.find(route_id);
+ if (it == route_status_.end()) return;
+
+ if (!sock_) return;
+
+ // remove route
+ hc_data_t *data;
+ if (hc_route_list(sock_, &data) < 0) return;
+
+ std::vector<hc_route_t *> routes_to_remove;
+ foreach_route(r, data) {
+ char remote_addr[INET6_ADDRSTRLEN];
+ int ret = ip_address_ntop(&r->remote_addr, remote_addr, r->len, r->family);
+ if (ret < 0) continue;
+
+ std::string route_addr(remote_addr);
+ if (route_addr.compare(it->second->route_addr) == 0 &&
+ r->len == it->second->route_len) {
+ // route found
+ routes_to_remove.push_back(r);
+ }
+ }
+
+ route_status_.erase(it);
+
+ if (routes_to_remove.size() == 0) {
+ // nothing to do here
+ hc_data_free(data);
+ return;
+ }
+
+ std::unordered_set<uint32_t> connids_to_remove;
+ for (unsigned i = 0; i < routes_to_remove.size(); i++) {
+ connids_to_remove.insert(routes_to_remove[i]->face_id);
+ if (hc_route_delete(sock_, routes_to_remove[i]) < 0) {
+ TRANSPORT_LOGE("Error removing route from forwarder.");
+ }
+ }
+
+ // remove connection
+ if (hc_connection_list(sock_, &data) < 0) {
+ hc_data_free(data);
+ return;
+ }
+
+ // collects pointerst to the connections using the conn IDs
+ std::vector<hc_connection_t *> conns_to_remove;
+ foreach_connection(c, data) {
+ if (connids_to_remove.find(c->id) != connids_to_remove.end()) {
+ // conn found
+ conns_to_remove.push_back(c);
+ }
+ }
+
+ if (conns_to_remove.size() == 0) {
+ // nothing else to do here
+ hc_data_free(data);
+ return;
+ }
+
+ for (unsigned i = 0; i < conns_to_remove.size(); i++) {
+ if (hc_connection_delete(sock_, conns_to_remove[i]) < 0) {
+ TRANSPORT_LOGE("Error removing connection from forwarder.");
+ }
+ }
+
+ hc_data_free(data);
+}
+
+void ForwarderInterface::internalCreateFaceAndRoute(RouteInfoPtr route_info,
+ uint8_t max_try,
+ asio::steady_timer *timer,
+ SetRouteCallback callback) {
+ int ret = tryToCreateFaceAndRoute(route_info.get());
+
+ if (ret < 0 && max_try > 0) {
+ max_try--;
+ timer->expires_from_now(std::chrono::milliseconds(500));
+ timer->async_wait([this, _route_info = std::move(route_info), max_try,
+ timer, callback](std::error_code ec) {
+ if (ec) return;
+ internalCreateFaceAndRoute(std::move(_route_info), max_try, timer,
+ std::move(callback));
+ });
+ return;
+ }
+
+ if (max_try == 0 && ret < 0) {
+ pending_add_route_counter_--;
+ external_ioservice_.post([callback]() { callback(false, ~0); });
+ } else {
+ pending_add_route_counter_--;
+ route_status_[route_id_] = std::move(route_info);
+ external_ioservice_.post(
+ [route_id = route_id_, callback]() { callback(route_id, true); });
+ route_id_++;
+ }
+
+ delete timer;
+}
+
+int ForwarderInterface::tryToCreateFaceAndRoute(route_info_t *route_info) {
+ if (!sock_) return -1;
+
+ hc_data_t *data;
+ if (hc_listener_list(sock_, &data) < 0) {
+ return -1;
+ }
+
+ bool found = false;
+ uint32_t face_id;
+
+ foreach_listener(l, data) {
+ std::string interface = std::string(l->interface_name);
+ if (interface.compare("lo") != 0) {
+ found = true;
+
+ ip_address_t remote_ip;
+ if (ip_address_pton(route_info->remote_addr.c_str(), &remote_ip) < 0) {
+ hc_data_free(data);
+ return -1;
+ }
+
+ hc_face_t face;
+ memset(&face, 0, sizeof(hc_face_t));
+
+ face.face.type = FACE_TYPE_UDP;
+ face.face.family = route_info->family;
+ face.face.local_addr = l->local_addr;
+ face.face.remote_addr = remote_ip;
+ face.face.local_port = l->local_port;
+ face.face.remote_port = route_info->remote_port;
+
+ if (netdevice_set_name(&face.face.netdevice, l->interface_name) < 0) {
+ hc_data_free(data);
+ return -1;
+ }
+
+ if (hc_face_create(sock_, &face) < 0) {
+ hc_data_free(data);
+ return -1;
+ }
+
+ face_id = face.id;
+ break;
+ }
+ }
+
+ if (!found) {
+ hc_data_free(data);
+ return -1;
+ }
+
+ ip_address_t route_ip;
+ hc_route_t route;
+
+ if (ip_address_pton(route_info->route_addr.c_str(), &route_ip) < 0) {
+ hc_data_free(data);
+ return -1;
+ }
+
+ route.face_id = face_id;
+ route.family = AF_INET6;
+ route.remote_addr = route_ip;
+ route.len = route_info->route_len;
+ route.cost = 1;
+
+ if (hc_route_create(sock_, &route) < 0) {
+ hc_data_free(data);
+ return -1;
+ }
+
+ hc_data_free(data);
+ return 0;
+}
+
+} // namespace transport
diff --git a/apps/http-proxy/src/HTTP1.xMessageFastParser.cc b/apps/http-proxy/src/http_1x_message_fast_parser.cc
index 729eb3aeb..4b6b78d55 100644
--- a/apps/http-proxy/src/HTTP1.xMessageFastParser.cc
+++ b/apps/http-proxy/src/http_1x_message_fast_parser.cc
@@ -13,33 +13,48 @@
* limitations under the License.
*/
-#include "HTTP1.xMessageFastParser.h"
-
+#include <hicn/http-proxy/http_session.h>
+#include <hicn/transport/http/request.h>
#include <hicn/transport/http/response.h>
#include <experimental/algorithm>
#include <experimental/functional>
#include <iostream>
+constexpr char HTTPMessageFastParser::http_ok[];
+constexpr char HTTPMessageFastParser::http_cors[];
+constexpr char HTTPMessageFastParser::http_failed[];
+
std::string HTTPMessageFastParser::numbers = "0123456789";
-std::string HTTPMessageFastParser::content_length = "Content-Length";
-std::string HTTPMessageFastParser::transfer_encoding = "Transfer-Encoding";
+std::string HTTPMessageFastParser::content_length = "content-length";
+std::string HTTPMessageFastParser::transfer_encoding = "transfer-encoding";
std::string HTTPMessageFastParser::chunked = "chunked";
-std::string HTTPMessageFastParser::cache_control = "Cache-Control";
+std::string HTTPMessageFastParser::cache_control = "cache-control";
std::string HTTPMessageFastParser::mpd = "mpd";
-std::string HTTPMessageFastParser::connection = "Connection";
+std::string HTTPMessageFastParser::connection = "connection";
std::string HTTPMessageFastParser::separator = "\r\n\r\n";
-HTTPHeaders HTTPMessageFastParser::getHeaders(const uint8_t *headers,
- std::size_t length) {
- HTTPHeaders ret;
- std::string http_version;
- std::string status_code;
- std::string status_string;
-
- if (transport::http::HTTPResponse::parseHeaders(headers, length, ret, http_version,
- status_code, status_string)) {
- return ret;
+void HTTPMessageFastParser::getHeaders(const uint8_t *headers,
+ std::size_t length, bool request,
+ transport::Metadata *metadata) {
+ if (request) {
+ transport::RequestMetadata *_metadata =
+ (transport::RequestMetadata *)(metadata);
+
+ if (transport::http::HTTPRequest::parseHeaders(
+ headers, length, _metadata->headers, _metadata->http_version,
+ _metadata->method, _metadata->path)) {
+ return;
+ }
+ } else {
+ transport::ResponseMetadata *_metadata =
+ (transport::ResponseMetadata *)(metadata);
+
+ if (transport::http::HTTPResponse::parseHeaders(
+ headers, length, _metadata->headers, _metadata->http_version,
+ _metadata->status_code, _metadata->status_string)) {
+ return;
+ }
}
throw std::runtime_error("Error parsing response headers.");
diff --git a/apps/http-proxy/src/http_proxy.cc b/apps/http-proxy/src/http_proxy.cc
new file mode 100644
index 000000000..262fcb8e1
--- /dev/null
+++ b/apps/http-proxy/src/http_proxy.cc
@@ -0,0 +1,376 @@
+/*
+ * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/http-proxy/http_proxy.h>
+#include <hicn/http-proxy/http_session.h>
+#include <hicn/http-proxy/utils.h>
+#include <hicn/transport/core/interest.h>
+#include <hicn/transport/utils/log.h>
+#include <hicn/transport/utils/string_utils.h>
+
+namespace transport {
+
+using core::Interest;
+using core::Name;
+using interface::ConsumerCallbacksOptions;
+using interface::ConsumerInterestCallback;
+using interface::ConsumerSocket;
+using interface::TransportProtocolAlgorithms;
+
+class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback {
+ public:
+ HTTPClientConnectionCallback(TcpReceiver& tcp_receiver,
+ utils::EventThread& thread)
+ : tcp_receiver_(tcp_receiver),
+ thread_(thread),
+ prefix_hash_(tcp_receiver_.prefix_hash_),
+ consumer_(TransportProtocolAlgorithms::RAAQM, thread_.getIoService()),
+ session_(nullptr),
+ current_size_(0) {
+ consumer_.setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, this);
+ consumer_.setSocketOption(
+ ConsumerCallbacksOptions::INTEREST_OUTPUT,
+ (ConsumerInterestCallback)std::bind(
+ &HTTPClientConnectionCallback::processLeavingInterest, this,
+ std::placeholders::_1, std::placeholders::_2));
+ consumer_.setSocketOption(
+ ConsumerCallbacksOptions::INTEREST_RETRANSMISSION,
+ (ConsumerInterestCallback)std::bind(
+ &HTTPClientConnectionCallback::processInterestRetx, this,
+ std::placeholders::_1, std::placeholders::_2));
+ consumer_.connect();
+ }
+
+ void stop() { session_->close(); }
+
+ void setHttpSession(asio::ip::tcp::socket&& socket) {
+ session_ = std::make_unique<HTTPSession>(
+ std::move(socket),
+ std::bind(&HTTPClientConnectionCallback::readDataFromTcp, this,
+ std::placeholders::_1, std::placeholders::_2,
+ std::placeholders::_3, std::placeholders::_4,
+ std::placeholders::_5),
+ [this](asio::ip::tcp::socket& socket) -> bool {
+ try {
+ std::string remote_address =
+ socket.remote_endpoint().address().to_string();
+ std::uint16_t remote_port = socket.remote_endpoint().port();
+ TRANSPORT_LOGD("Client %s:%d disconnected.", remote_address.c_str(),
+ remote_port);
+ } catch (std::system_error& e) {
+ // Do nothing
+ }
+
+ consumer_.stop();
+ request_buffer_queue_.clear();
+ tcp_receiver_.onClientDisconnect(this);
+ return false;
+ });
+
+ current_size_ = 0;
+ }
+
+ private:
+ void consumeNextRequest() {
+ if (request_buffer_queue_.size() == 0) {
+ TRANSPORT_LOGD("No additional requests to process.");
+ return;
+ }
+
+ auto& buffer = request_buffer_queue_.front().second;
+ uint64_t request_hash =
+ utils::hash::fnv64_buf(buffer.data(), buffer.size());
+
+ std::stringstream name;
+ name << prefix_hash_.substr(0, prefix_hash_.length() - 2);
+
+ for (uint16_t* word = (uint16_t*)&request_hash;
+ std::size_t(word) <
+ (std::size_t(&request_hash) + sizeof(request_hash));
+ word++) {
+ name << ":" << std::hex << *word;
+ }
+
+ name << "|0";
+
+ // Non blocking consume :)
+ consumer_.consume(Name(name.str()));
+ }
+
+ // tcp callbacks
+
+ void readDataFromTcp(const uint8_t* data, std::size_t size, bool is_last,
+ bool headers, Metadata* metadata) {
+ if (headers) {
+ // Add the request to the request queue
+ RequestMetadata* _metadata = reinterpret_cast<RequestMetadata*>(metadata);
+ tmp_buffer_ = std::make_pair(utils::MemBuf::copyBuffer(data, size),
+ _metadata->path);
+ if (TRANSPORT_EXPECT_FALSE(
+ _metadata->path.compare("/isHicnProxyOn") == 0 && is_last)) {
+ /**
+ * It seems this request is for us.
+ * Get hicn parameters.
+ */
+ processClientRequest(_metadata);
+ return;
+ }
+ } else {
+ // Append payload chunk to last request added. Here we are assuming
+ // HTTP/1.1.
+ tmp_buffer_.first->prependChain(utils::MemBuf::copyBuffer(data, size));
+ }
+
+ current_size_ += size;
+
+ if (is_last) {
+ TRANSPORT_LOGD("Request received: %s",
+ std::string((const char*)tmp_buffer_.first->data(),
+ tmp_buffer_.first->length())
+ .c_str());
+ if (current_size_ < 1400) {
+ request_buffer_queue_.emplace_back(std::move(tmp_buffer_));
+ } else {
+ TRANSPORT_LOGE("Ignoring client request due to size (%zu) > 1400.",
+ current_size_);
+ session_->close();
+ current_size_ = 0;
+ return;
+ }
+
+ if (!consumer_.isRunning()) {
+ TRANSPORT_LOGD(
+ "Consumer stopped, triggering consume from TCP session "
+ "handler..");
+ consumeNextRequest();
+ }
+
+ current_size_ = 0;
+ }
+ }
+
+ // hicn callbacks
+
+ void processLeavingInterest(interface::ConsumerSocket& c,
+ const core::Interest& interest) {
+ if (interest.getName().getSuffix() == 0 && interest.payloadSize() == 0) {
+ Interest& int2 = const_cast<Interest&>(interest);
+ int2.appendPayload(request_buffer_queue_.front().first->clone());
+ }
+ }
+
+ void processInterestRetx(interface::ConsumerSocket& c,
+ const core::Interest& interest) {
+ if (interest.payloadSize() == 0) {
+ Interest& int2 = const_cast<Interest&>(interest);
+ int2.appendPayload(request_buffer_queue_.front().first->clone());
+ }
+ }
+
+ bool isBufferMovable() noexcept { return true; }
+ void getReadBuffer(uint8_t** application_buffer, size_t* max_length) {}
+ void readDataAvailable(size_t length) noexcept {}
+ size_t maxBufferSize() const { return 64 * 1024; }
+
+ void readBufferAvailable(std::unique_ptr<utils::MemBuf>&& buffer) noexcept {
+ // Response received. Send it back to client
+ auto _buffer = buffer.release();
+ TRANSPORT_LOGD("From hicn: %zu bytes.", _buffer->length());
+ session_->send(_buffer, []() {});
+ }
+
+ void readError(const std::error_code ec) noexcept {
+ TRANSPORT_LOGE("Error reading from hicn consumer socket. Closing session.");
+ session_->close();
+ }
+
+ void readSuccess(std::size_t total_size) noexcept {
+ request_buffer_queue_.pop_front();
+ consumeNextRequest();
+ }
+
+ void processClientRequest(RequestMetadata* metadata) {
+ auto it = metadata->headers.find("hicn");
+ if (it == metadata->headers.end()) {
+ /*
+ * Probably it is an OPTION message for access control.
+ * Let's grant it!
+ */
+ if (metadata->method == "OPTIONS") {
+ session_->send(
+ (const uint8_t*)HTTPMessageFastParser::http_cors,
+ std::strlen(HTTPMessageFastParser::http_cors), [this]() {
+ auto& socket = session_->socket_;
+ TRANSPORT_LOGI(
+ "Sent OPTIONS to client %s:%d",
+ socket.remote_endpoint().address().to_string().c_str(),
+ socket.remote_endpoint().port());
+ });
+ }
+ } else {
+ tcp_receiver_.parseHicnHeader(
+ it->second, [this](bool result, std::string configured_prefix) {
+ const char* reply = nullptr;
+ if (result) {
+ reply = HTTPMessageFastParser::http_ok;
+ } else {
+ reply = HTTPMessageFastParser::http_failed;
+ }
+
+ /* Route created. Send back a 200 OK to client */
+ session_->send(
+ (const uint8_t*)reply, std::strlen(reply), [this, result]() {
+ auto& socket = session_->socket_;
+ TRANSPORT_LOGI(
+ "Sent %d response to client %s:%d", result,
+ socket.remote_endpoint().address().to_string().c_str(),
+ socket.remote_endpoint().port());
+ });
+ });
+ }
+ }
+
+ private:
+ TcpReceiver& tcp_receiver_;
+ utils::EventThread& thread_;
+ std::string& prefix_hash_;
+ ConsumerSocket consumer_;
+ std::unique_ptr<HTTPSession> session_;
+ std::deque<std::pair<std::unique_ptr<utils::MemBuf>, std::string>>
+ request_buffer_queue_;
+ std::pair<std::unique_ptr<utils::MemBuf>, std::string> tmp_buffer_;
+ std::size_t current_size_;
+};
+
+TcpReceiver::TcpReceiver(std::uint16_t port, const std::string& prefix,
+ const std::string& ipv6_first_word)
+ : Receiver(),
+ listener_(thread_.getIoService(), port,
+ std::bind(&TcpReceiver::onNewConnection, this,
+ std::placeholders::_1)),
+ prefix_(prefix),
+ ipv6_first_word_(ipv6_first_word),
+ prefix_hash_(generatePrefix(prefix_, ipv6_first_word_)),
+ forwarder_config_(
+ thread_.getIoService(),
+ [this](std::error_code ec) {
+ if (!ec) {
+ listener_.doAccept();
+ for (int i = 0; i < 10; i++) {
+ http_clients_.emplace_back(
+ new HTTPClientConnectionCallback(*this, thread_));
+ }
+ }
+ }),
+ stopped_(false) {
+ forwarder_config_.tryToConnectToForwarder();
+}
+
+void TcpReceiver::stop() {
+ thread_.add([this]() {
+ stopped_ = true;
+
+ /* Stop the listener */
+ listener_.stop();
+
+ /* Close connection with forwarder */
+ forwarder_config_.close();
+
+ /* Stop the used http clients */
+ for (auto& client : used_http_clients_) {
+ client->stop();
+ }
+
+ /* Delete unused clients */
+ for (auto& client : http_clients_) {
+ delete client;
+ }
+ });
+}
+
+void TcpReceiver::onClientDisconnect(HTTPClientConnectionCallback* client) {
+ if (stopped_) {
+ delete client;
+ return;
+ }
+
+ http_clients_.emplace_front(client);
+ used_http_clients_.erase(client);
+}
+
+void TcpReceiver::onNewConnection(asio::ip::tcp::socket&& socket) {
+ if (http_clients_.size() == 0) {
+ // Create new HTTPClientConnectionCallback
+ TRANSPORT_LOGD("Creating new HTTPClientConnectionCallback.");
+ http_clients_.emplace_back(
+ new HTTPClientConnectionCallback(*this, thread_));
+ }
+
+ // Get new HTTPClientConnectionCallback
+ HTTPClientConnectionCallback* c = http_clients_.front();
+ http_clients_.pop_front();
+
+ // Set http session
+ c->setHttpSession(std::move(socket));
+
+ // Move it to used clients
+ used_http_clients_.insert(c);
+}
+
+void HTTPProxy::setupSignalHandler() {
+ signals_.async_wait([this](const std::error_code& ec, int signal_number) {
+ if (!ec) {
+ TRANSPORT_LOGI("Received signal %d. Stopping gracefully.", signal_number);
+ stop();
+ }
+ });
+}
+
+void HTTPProxy::stop() {
+ for (auto& receiver : receivers_) {
+ receiver->stop();
+ }
+
+ for (auto& receiver : receivers_) {
+ receiver->stopAndJoinThread();
+ }
+
+ signals_.cancel();
+}
+
+HTTPProxy::HTTPProxy(ClientParams& params, std::size_t n_thread)
+ : signals_(main_io_context_, SIGINT, SIGQUIT) {
+ for (uint16_t i = 0; i < n_thread; i++) {
+ // icn_receivers_.emplace_back(std::make_unique<IcnReceiver>(icn_params));
+ receivers_.emplace_back(std::make_unique<TcpReceiver>(
+ params.tcp_listen_port, params.prefix, params.first_ipv6_word));
+ }
+
+ setupSignalHandler();
+}
+
+HTTPProxy::HTTPProxy(ServerParams& params, std::size_t n_thread)
+ : signals_(main_io_context_, SIGINT, SIGQUIT) {
+ for (uint16_t i = 0; i < n_thread; i++) {
+ receivers_.emplace_back(std::make_unique<IcnReceiver>(
+ params.prefix, params.first_ipv6_word, params.origin_address,
+ params.origin_port, params.cache_size, params.mtu,
+ params.content_lifetime, params.manifest));
+ }
+
+ setupSignalHandler();
+}
+
+} // namespace transport
diff --git a/apps/http-proxy/src/ATSConnector.cc b/apps/http-proxy/src/http_session.cc
index a9b889941..6b91c12c3 100644
--- a/apps/http-proxy/src/ATSConnector.cc
+++ b/apps/http-proxy/src/http_session.cc
@@ -13,48 +13,95 @@
* limitations under the License.
*/
-#include "ATSConnector.h"
-#include "HTTP1.xMessageFastParser.h"
-
+#include <hicn/http-proxy/http_proxy.h>
#include <hicn/transport/utils/branch_prediction.h>
#include <hicn/transport/utils/log.h>
+
#include <iostream>
namespace transport {
-ATSConnector::ATSConnector(asio::io_service &io_service,
- std::string &ip_address, std::string &port,
- ContentReceivedCallback receive_callback,
- OnReconnect on_reconnect_callback)
+HTTPSession::HTTPSession(asio::io_service &io_service, std::string &ip_address,
+ std::string &port,
+ ContentReceivedCallback receive_callback,
+ OnConnectionClosed on_connection_closed_callback,
+ bool client)
: io_service_(io_service),
socket_(io_service_),
resolver_(io_service_),
endpoint_iterator_(resolver_.resolve({ip_address, port})),
timer_(io_service),
+ reverse_(client),
is_reconnection_(false),
data_available_(false),
content_length_(0),
is_last_chunk_(false),
chunked_(false),
receive_callback_(receive_callback),
- on_reconnect_callback_(on_reconnect_callback) {
+ on_connection_closed_callback_(on_connection_closed_callback) {
input_buffer_.prepare(buffer_size + 2048);
state_ = ConnectorState::CONNECTING;
+
+ if (reverse_) {
+ header_info_ = std::make_unique<RequestMetadata>();
+ } else {
+ header_info_ = std::make_unique<ResponseMetadata>();
+ }
+
doConnect();
}
-ATSConnector::~ATSConnector() {}
+HTTPSession::HTTPSession(asio::ip::tcp::socket socket,
+ ContentReceivedCallback receive_callback,
+ OnConnectionClosed on_connection_closed_callback,
+ bool client)
+ :
+#if ((ASIO_VERSION / 100 % 1000) < 12)
+ io_service_(socket.get_io_service()),
+#else
+ io_service_((asio::io_context &)(socket.get_executor().context())),
+#endif
+ socket_(std::move(socket)),
+ resolver_(io_service_),
+ timer_(io_service_),
+ reverse_(client),
+ is_reconnection_(false),
+ data_available_(false),
+ content_length_(0),
+ is_last_chunk_(false),
+ chunked_(false),
+ receive_callback_(receive_callback),
+ on_connection_closed_callback_(on_connection_closed_callback) {
+ input_buffer_.prepare(buffer_size + 2048);
+ state_ = ConnectorState::CONNECTED;
+ asio::ip::tcp::no_delay noDelayOption(true);
+ socket_.set_option(noDelayOption);
-void ATSConnector::send(const uint8_t *packet, std::size_t len,
- ContentSentCallback &&content_sent) {
- asio::async_write(
- socket_, asio::buffer(packet, len),
- [content_sent = std::move(content_sent)](
- std::error_code ec, std::size_t /*length*/) { content_sent(); });
+ if (reverse_) {
+ header_info_ = std::make_unique<RequestMetadata>();
+ } else {
+ header_info_ = std::make_unique<ResponseMetadata>();
+ }
+ doReadHeader();
}
-void ATSConnector::send(utils::MemBuf *buffer,
- ContentSentCallback &&content_sent) {
+HTTPSession::~HTTPSession() {}
+
+void HTTPSession::send(const uint8_t *packet, std::size_t len,
+ ContentSentCallback &&content_sent) {
+ io_service_.dispatch([this, packet, len, content_sent]() {
+ asio::async_write(socket_, asio::buffer(packet, len),
+ [content_sent = std::move(content_sent)](
+ std::error_code ec, std::size_t /*length*/) {
+ if (!ec) {
+ content_sent();
+ }
+ });
+ });
+}
+
+void HTTPSession::send(utils::MemBuf *buffer,
+ ContentSentCallback &&content_sent) {
io_service_.dispatch([this, buffer, callback = std::move(content_sent)]() {
bool write_in_progress = !write_msgs_.empty();
write_msgs_.emplace_back(std::unique_ptr<utils::MemBuf>(buffer),
@@ -70,24 +117,25 @@ void ATSConnector::send(utils::MemBuf *buffer,
});
}
-void ATSConnector::close() {
+void HTTPSession::close() {
if (state_ != ConnectorState::CLOSED) {
state_ = ConnectorState::CLOSED;
if (socket_.is_open()) {
- socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
+ // socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
socket_.close();
// on_disconnect_callback_();
}
}
}
-void ATSConnector::doWrite() {
+void HTTPSession::doWrite() {
auto &buffer = write_msgs_.front().first;
asio::async_write(socket_, asio::buffer(buffer->data(), buffer->length()),
[this](std::error_code ec, std::size_t length) {
if (TRANSPORT_EXPECT_FALSE(!ec)) {
- TRANSPORT_LOGD("Content successfully sent!");
+ TRANSPORT_LOGD("Content successfully sent! %zu",
+ length);
write_msgs_.front().second();
write_msgs_.pop_front();
if (!write_msgs_.empty()) {
@@ -99,12 +147,14 @@ void ATSConnector::doWrite() {
});
} // namespace transport
-void ATSConnector::handleRead(std::error_code ec, std::size_t length) {
+void HTTPSession::handleRead(std::error_code ec, std::size_t length) {
if (TRANSPORT_EXPECT_TRUE(!ec)) {
content_length_ -= length;
const uint8_t *buffer =
asio::buffer_cast<const uint8_t *>(input_buffer_.data());
- receive_callback_(buffer, input_buffer_.size(), !content_length_, false);
+ bool is_last = chunked_ ? (is_last_chunk_ ? !content_length_ : false)
+ : !content_length_;
+ receive_callback_(buffer, input_buffer_.size(), is_last, false, nullptr);
input_buffer_.consume(input_buffer_.size());
if (!content_length_) {
@@ -117,7 +167,7 @@ void ATSConnector::handleRead(std::error_code ec, std::size_t length) {
auto to_read =
content_length_ >= buffer_size ? buffer_size : content_length_;
asio::async_read(socket_, input_buffer_, asio::transfer_exactly(to_read),
- std::bind(&ATSConnector::handleRead, this,
+ std::bind(&HTTPSession::handleRead, this,
std::placeholders::_1, std::placeholders::_2));
}
} else if (ec == asio::error::eof) {
@@ -126,8 +176,8 @@ void ATSConnector::handleRead(std::error_code ec, std::size_t length) {
}
}
-void ATSConnector::doReadBody(std::size_t body_size,
- std::size_t additional_bytes) {
+void HTTPSession::doReadBody(std::size_t body_size,
+ std::size_t additional_bytes) {
auto bytes_to_read =
body_size > additional_bytes ? (body_size - additional_bytes) : 0;
@@ -140,14 +190,16 @@ void ATSConnector::doReadBody(std::size_t body_size,
if (to_read > 0) {
content_length_ = bytes_to_read;
asio::async_read(socket_, input_buffer_, asio::transfer_exactly(to_read),
- std::bind(&ATSConnector::handleRead, this,
+ std::bind(&HTTPSession::handleRead, this,
std::placeholders::_1, std::placeholders::_2));
} else {
- const uint8_t *buffer =
- asio::buffer_cast<const uint8_t *>(input_buffer_.data());
- receive_callback_(buffer, body_size, chunked_ ? is_last_chunk_ : !to_read,
- false);
- input_buffer_.consume(body_size);
+ if (body_size) {
+ const uint8_t *buffer =
+ asio::buffer_cast<const uint8_t *>(input_buffer_.data());
+ receive_callback_(buffer, body_size, chunked_ ? is_last_chunk_ : !to_read,
+ false, nullptr);
+ input_buffer_.consume(body_size);
+ }
if (!chunked_ || is_last_chunk_) {
doReadHeader();
@@ -157,7 +209,7 @@ void ATSConnector::doReadBody(std::size_t body_size,
}
}
-void ATSConnector::doReadChunkedHeader() {
+void HTTPSession::doReadChunkedHeader() {
asio::async_read_until(
socket_, input_buffer_, "\r\n",
[this](std::error_code ec, std::size_t length) {
@@ -176,14 +228,17 @@ void ATSConnector::doReadChunkedHeader() {
});
}
-void ATSConnector::doReadHeader() {
+void HTTPSession::doReadHeader() {
asio::async_read_until(
socket_, input_buffer_, "\r\n\r\n",
[this](std::error_code ec, std::size_t length) {
if (TRANSPORT_EXPECT_TRUE(!ec)) {
const uint8_t *buffer =
asio::buffer_cast<const uint8_t *>(input_buffer_.data());
- auto headers = HTTPMessageFastParser::getHeaders(buffer, length);
+ HTTPMessageFastParser::getHeaders(buffer, length, reverse_,
+ header_info_.get());
+
+ auto &headers = header_info_->headers;
// Try to get content length, if available
auto it = headers.find(HTTPMessageFastParser::content_length);
@@ -199,7 +254,8 @@ void ATSConnector::doReadHeader() {
}
}
- receive_callback_(buffer, length, !size && !chunked_, true);
+ receive_callback_(buffer, length, !size && !chunked_, true,
+ header_info_.get());
auto additional_bytes = input_buffer_.size() - length;
input_buffer_.consume(length);
@@ -215,23 +271,25 @@ void ATSConnector::doReadHeader() {
});
}
-void ATSConnector::tryReconnection() {
- TRANSPORT_LOGD("Connection lost. Trying to reconnect...\n");
- if (state_ == ConnectorState::CONNECTED) {
- state_ = ConnectorState::CONNECTING;
- is_reconnection_ = true;
- io_service_.post([this]() {
- if (socket_.is_open()) {
- // socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
- socket_.close();
- }
- startConnectionTimer();
- doConnect();
- });
+void HTTPSession::tryReconnection() {
+ if (on_connection_closed_callback_(socket_)) {
+ if (state_ == ConnectorState::CONNECTED) {
+ TRANSPORT_LOGD("Connection lost. Trying to reconnect...\n");
+ state_ = ConnectorState::CONNECTING;
+ is_reconnection_ = true;
+ io_service_.post([this]() {
+ if (socket_.is_open()) {
+ // socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
+ socket_.close();
+ }
+ startConnectionTimer();
+ doConnect();
+ });
+ }
}
}
-void ATSConnector::doConnect() {
+void HTTPSession::doConnect() {
asio::async_connect(socket_, endpoint_iterator_,
[this](std::error_code ec, tcp::resolver::iterator) {
if (!ec) {
@@ -263,17 +321,17 @@ void ATSConnector::doConnect() {
});
}
-bool ATSConnector::checkConnected() {
+bool HTTPSession::checkConnected() {
return state_ == ConnectorState::CONNECTED;
}
-void ATSConnector::startConnectionTimer() {
+void HTTPSession::startConnectionTimer() {
timer_.expires_from_now(std::chrono::seconds(10));
timer_.async_wait(
- std::bind(&ATSConnector::handleDeadline, this, std::placeholders::_1));
+ std::bind(&HTTPSession::handleDeadline, this, std::placeholders::_1));
}
-void ATSConnector::handleDeadline(const std::error_code &ec) {
+void HTTPSession::handleDeadline(const std::error_code &ec) {
if (!ec) {
io_service_.post([this]() {
socket_.close();
diff --git a/apps/http-proxy/src/IcnReceiver.cc b/apps/http-proxy/src/icn_receiver.cc
index 24b0eb5dc..23e5b5623 100644
--- a/apps/http-proxy/src/IcnReceiver.cc
+++ b/apps/http-proxy/src/icn_receiver.cc
@@ -13,9 +13,9 @@
* limitations under the License.
*/
-#include "IcnReceiver.h"
-#include "HTTP1.xMessageFastParser.h"
-
+#include <hicn/http-proxy/http_1x_message_fast_parser.h>
+#include <hicn/http-proxy/icn_receiver.h>
+#include <hicn/http-proxy/utils.h>
#include <hicn/transport/core/interest.h>
#include <hicn/transport/http/default_values.h>
#include <hicn/transport/utils/hash.h>
@@ -26,56 +26,31 @@
namespace transport {
-core::Prefix generatePrefix(const std::string& prefix_url,
- std::string& first_ipv6_word) {
- const char* str = prefix_url.c_str();
- uint16_t pos = 0;
-
- if (strncmp("http://", str, 7) == 0) {
- pos = 7;
- } else if (strncmp("https://", str, 8) == 0) {
- pos = 8;
- }
-
- str += pos;
-
- uint32_t locator_hash = utils::hash::fnv32_buf(str, strlen(str));
-
- std::stringstream stream;
- stream << first_ipv6_word << ":0";
-
- for (uint16_t* word = (uint16_t*)&locator_hash;
- std::size_t(word) < (std::size_t(&locator_hash) + sizeof(locator_hash));
- word++) {
- stream << ":" << std::hex << *word;
- }
-
- stream << "::0";
-
- return core::Prefix(stream.str(), 64);
-}
-
AsyncConsumerProducer::AsyncConsumerProducer(
- const std::string& prefix, std::string& ip_address, std::string& port,
- std::string& cache_size, std::string& mtu, std::string& first_ipv6_word,
- unsigned long default_lifetime, bool manifest)
- : prefix_(generatePrefix(prefix, first_ipv6_word)),
+ asio::io_service& io_service, const std::string& prefix,
+ const std::string& first_ipv6_word, const std::string& origin_address,
+ const std::string& origin_port, const std::string& cache_size,
+ const std::string& mtu, const std::string& content_lifetime, bool manifest)
+ : prefix_(core::Prefix(generatePrefix(prefix, first_ipv6_word), 64)),
+ io_service_(io_service),
+ external_io_service_(true),
producer_socket_(),
- ip_address_(ip_address),
- port_(port),
+ ip_address_(origin_address),
+ port_(origin_port),
cache_size_(std::stoul(cache_size)),
mtu_(std::stoul(mtu)),
request_counter_(0),
- signals_(io_service_, SIGINT, SIGQUIT),
connector_(io_service_, ip_address_, port_,
std::bind(&AsyncConsumerProducer::publishContent, this,
std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3, std::placeholders::_4),
- [this]() {
+ [this](asio::ip::tcp::socket& socket) -> bool {
std::queue<interface::PublicationOptions> empty;
std::swap(response_name_queue_, empty);
+
+ return true;
}),
- default_content_lifetime_(default_lifetime) {
+ default_content_lifetime_(std::stoul(content_lifetime)) {
int ret = producer_socket_.setSocketOption(
interface::GeneralTransportOptions::OUTPUT_BUFFER_SIZE, cache_size_);
@@ -98,15 +73,6 @@ AsyncConsumerProducer::AsyncConsumerProducer(
}
producer_socket_.registerPrefix(prefix_);
-
- // Let the main thread to catch SIGINT and SIGQUIT
- signals_.async_wait(
- [this](const std::error_code& errorCode, int signal_number) {
- TRANSPORT_LOGI("Number of requests processed by plugin: %lu",
- (unsigned long)request_counter_);
- producer_socket_.stop();
- connector_.close();
- });
}
void AsyncConsumerProducer::start() {
@@ -116,7 +82,19 @@ void AsyncConsumerProducer::start() {
void AsyncConsumerProducer::run() {
start();
- io_service_.run();
+
+ if (!external_io_service_) {
+ io_service_.run();
+ }
+}
+
+void AsyncConsumerProducer::stop() {
+ io_service_.post([this]() {
+ TRANSPORT_LOGI("Number of requests processed by plugin: %lu",
+ (unsigned long)request_counter_);
+ producer_socket_.stop();
+ connector_.close();
+ });
}
void AsyncConsumerProducer::doReceive() {
@@ -124,11 +102,13 @@ void AsyncConsumerProducer::doReceive() {
interface::ProducerCallbacksOptions::CACHE_MISS,
[this](interface::ProducerSocket& producer,
interface::Interest& interest) {
- // core::Name n(interest.getWritableName(), true);
- io_service_.post(std::bind(
- &AsyncConsumerProducer::manageIncomingInterest, this,
- interest.getWritableName(), interest.acquireMemBufReference(),
- interest.getPayload().release()));
+ if (interest.payloadSize() > 0) {
+ // Interest may contain http request
+ io_service_.post(std::bind(
+ &AsyncConsumerProducer::manageIncomingInterest, this,
+ interest.getWritableName(), interest.acquireMemBufReference(),
+ interest.getPayload().release()));
+ }
});
producer_socket_.connect();
@@ -141,16 +121,15 @@ void AsyncConsumerProducer::manageIncomingInterest(
auto _it = chunk_number_map_.find(name);
auto _end = chunk_number_map_.end();
- std::cout << "Received interest " << seg << std::endl;
-
if (_it != _end) {
if (_it->second.second) {
- // Content is in production
+ TRANSPORT_LOGD(
+ "Content is in production, interests will be satisfied shortly.");
return;
}
if (seg >= _it->second.first) {
- TRANSPORT_LOGI(
+ TRANSPORT_LOGD(
"Ignoring interest with name %s for a content object which does not "
"exist. (Request: %u, max: %u)",
name.toString().c_str(), (uint32_t)seg, (uint32_t)_it->second.first);
@@ -158,8 +137,6 @@ void AsyncConsumerProducer::manageIncomingInterest(
}
}
- std::cout << "Received interest " << seg << std::endl;
-
bool is_mpd =
HTTPMessageFastParser::isMpdRequest(payload->data(), payload->length());
@@ -212,7 +189,7 @@ void AsyncConsumerProducer::publishContent(const uint8_t* data,
}
it->second.first +=
- producer_socket_.produce(name, data, size, is_last, start_suffix);
+ producer_socket_.produceStream(name, data, size, is_last, start_suffix);
if (is_last) {
it->second.second = false;