diff options
author | Enrico Loparco (eloparco) <eloparco@cisco.com> | 2021-06-24 09:15:41 +0200 |
---|---|---|
committer | Enrico Loparco (eloparco) <eloparco@cisco.com> | 2021-06-24 09:15:41 +0200 |
commit | 229385955109b866a23c4ac2aa03d4d11044c39d (patch) | |
tree | 0591f9c2fc4144d62330337cc2b94c63dfeded54 /apps | |
parent | 6ffbb5ed61733b8dbef39b1a9d437e899e9359d7 (diff) |
[HICN-708] Rebase with master
Signed-off-by: Enrico Loparco (eloparco) <eloparco@cisco.com>
Change-Id: I2122e1d61dd3b2e039972624ffbdbcb3c5610159
Diffstat (limited to 'apps')
20 files changed, 1723 insertions, 234 deletions
diff --git a/apps/.clang-format b/apps/.clang-format new file mode 100644 index 000000000..cd21e2017 --- /dev/null +++ b/apps/.clang-format @@ -0,0 +1,14 @@ +# Copyright (c) 2017-2021 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +BasedOnStyle: Google diff --git a/apps/CMakeLists.txt b/apps/CMakeLists.txt index f5075a7aa..37e44f9e7 100644 --- a/apps/CMakeLists.txt +++ b/apps/CMakeLists.txt @@ -29,15 +29,19 @@ set(HICN_APPS hicn-apps CACHE INTERNAL "" FORCE) if(CMAKE_SOURCE_DIR STREQUAL PROJECT_SOURCE_DIR) find_package(Libtransport REQUIRED) + find_package(hicnctrl REQUIRED) find_package(Threads REQUIRED) - include_directories(${LIBTRANSPORT_INCLUDE_DIRS}) else() - if (${CMAKE_SYSTEM_NAME} STREQUAL "Android") + if (DISABLE_SHARED_LIBRARIES) find_package(OpenSSL REQUIRED) - find_package(ZLIB REQUIRED) + if (NOT WIN32) + find_package(ZLIB REQUIRED) + endif () set(LIBTRANSPORT_LIBRARIES ${LIBTRANSPORT_STATIC}) + set(LIBHICNCTRL_LIBRARIES ${LIBHICNCTRL_STATIC}) else () set(LIBTRANSPORT_LIBRARIES ${LIBTRANSPORT_SHARED}) + set(LIBHICNCTRL_LIBRARIES ${LIBHICNCTRL_SHARED}) endif () list(APPEND DEPENDENCIES @@ -45,21 +49,19 @@ else() ) endif() -set(SUFFIX "") -if (${LIBTRANSPORT_LIBRARIES} MATCHES ".*-memif.*") - set(DEPENDENCIES ${LIBMEMIF_SHARED}) - set(SUFFIX "-memif") +# Worksroung for unresolved symbols in vpp libraries +if(${CMAKE_SYSTEM_NAME} MATCHES Linux) set(LINK_FLAGS "-Wl,-unresolved-symbols=ignore-in-shared-libs") endif() -set(HICN_APPS "${HICN_APPS}${SUFFIX}") - list(APPEND LIBRARIES ${LIBTRANSPORT_LIBRARIES} + ${LIBHICNCTRL_LIBRARIES} ${OPENSSL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} ) +set(APPS_LIBRARY_LIST "${OPENSSL_LIBRARIES};${CMAKE_THREAD_LIBS_INIT}" CACHE INTERNAL "APPS_LIBRARY_LIST") if (WIN32) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /wd4200 /wd4996") endif () @@ -68,6 +70,7 @@ include(Packaging) set(HIGET higet) set(HTTP_PROXY hicn-http-proxy) - -add_subdirectory(http-proxy) +if (NOT WIN32) + add_subdirectory(http-proxy) +endif () add_subdirectory(higet) diff --git a/apps/cmake/Modules/Packaging.cmake b/apps/cmake/Modules/Packaging.cmake index 6a6e34777..9b3fa2e72 100644 --- a/apps/cmake/Modules/Packaging.cmake +++ b/apps/cmake/Modules/Packaging.cmake @@ -22,7 +22,17 @@ set(${HICN_APPS}_DEB_DEPENDENCIES CACHE STRING "Dependencies for deb/rpm package." ) +set(${HICN_APPS}-dev_DEB_DEPENDENCIES + "lib${LIBTRANSPORT}-dev (>= stable_version)" + CACHE STRING "Dependencies for deb/rpm package." +) + set(${HICN_APPS}_RPM_DEPENDENCIES "lib${LIBTRANSPORT} >= stable_version" CACHE STRING "Dependencies for deb/rpm package." +) + +set(${HICN_APPS}-dev_RPM_DEPENDENCIES + "lib${LIBTRANSPORT}-devel >= stable_version" + CACHE STRING "Dependencies for deb/rpm package." )
\ No newline at end of file diff --git a/apps/higet/CMakeLists.txt b/apps/higet/CMakeLists.txt index 1cf14c287..b929a24e4 100644 --- a/apps/higet/CMakeLists.txt +++ b/apps/higet/CMakeLists.txt @@ -31,6 +31,10 @@ list(APPEND APPS_SRC higet.cc ) +if (WIN32) + set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} /NODEFAULTLIB:\"LIBCMT\"" ) +endif() + if (NOT DISABLE_EXECUTABLES) build_executable(${HIGET} SOURCES ${APPS_SRC} diff --git a/apps/higet/higet.cc b/apps/higet/higet.cc index 2aa42e460..9ae869731 100644 --- a/apps/higet/higet.cc +++ b/apps/higet/higet.cc @@ -14,12 +14,12 @@ */ #include <hicn/transport/http/client_connection.h> + +#include <algorithm> #include <fstream> +#include <functional> #include <map> -#include <experimental/algorithm> -#include <experimental/functional> - #define ASIO_STANDALONE #include <asio.hpp> #undef ASIO_STANDALONE @@ -114,15 +114,12 @@ class ReadBytesCallbackImplementation // read next chunk size const char *begin = (const char *)payload->data(); const char *end = (const char *)payload->tail(); - - using std::experimental::make_boyer_moore_searcher; - auto it = std::experimental::search( - begin, end, - make_boyer_moore_searcher(chunk_separator.begin(), - chunk_separator.end())); + const char *begincrlf2 = (const char *)chunk_separator.c_str(); + const char *endcrlf2 = begincrlf2 + chunk_separator.size(); + auto it = std::search(begin, end, begincrlf2, endcrlf2); if (it != end) { chunk_size_ = std::stoul(begin, 0, 16); - content_size_ += chunk_size_; + content_size_ += (long)chunk_size_; payload->trimStart(it + chunk_separator.size() - begin); std::size_t to_write; @@ -134,7 +131,7 @@ class ReadBytesCallbackImplementation } out_->write((char *)payload->data(), to_write); - byte_downloaded_ += to_write; + byte_downloaded_ += (long)to_write; payload->trimStart(to_write); if (payload->length() >= chunk_separator.size()) { @@ -144,7 +141,7 @@ class ReadBytesCallbackImplementation } } else { out_->write((char *)payload->data(), payload->length()); - byte_downloaded_ += payload->length(); + byte_downloaded_ += (long)payload->length(); } if (file_name_ != "-") { @@ -200,12 +197,18 @@ class ReadBytesCallbackImplementation void print_bar(long value, long max_value, bool last) { float progress = (float)value / max_value; +#ifdef _WIN32 + CONSOLE_SCREEN_BUFFER_INFO csbi; + GetConsoleScreenBufferInfo(GetStdHandle(STD_OUTPUT_HANDLE), &csbi); + int barWidth = csbi.srWindow.Right - csbi.srWindow.Left + 7; +#else struct winsize size; ioctl(STDOUT_FILENO, TIOCGWINSZ, &size); int barWidth = size.ws_col - 8; +#endif std::cout << "["; - int pos = barWidth * progress; + int pos = barWidth * (int)progress; for (int i = 0; i < barWidth; ++i) { if (i < pos) { std::cout << "="; @@ -252,7 +255,7 @@ long checkFileStatus(std::string file_name) { void usage(char *program_name) { std::cerr << "usage:" << std::endl; std::cerr << program_name << " [option]... [url]..." << std::endl; - std::cerr << program_name << "options:" << std::endl; + std::cerr << program_name << " options:" << std::endl; std::cerr << "-O <out_put_path> = write documents to <out_put_file>" << std::endl; @@ -303,8 +306,11 @@ int main(int argc, char **argv) { } } - name = argv[optind]; + if (!argv[optind]) { + usage(argv[0]); + } + name = argv[optind]; std::cerr << "Using name " << name << " and name first word " << conf.ipv6_first_word << std::endl; @@ -329,9 +335,14 @@ int main(int argc, char **argv) { {"Connection", "Keep-Alive"}, {"Range", range}}; } + transport::http::HTTPClientConnection connection; + if (!conf.producer_certificate.empty()) { - connection.setCertificate(conf.producer_certificate); + std::shared_ptr<transport::auth::Verifier> verifier = + std::make_shared<transport::auth::AsymmetricVerifier>( + conf.producer_certificate); + connection.setVerifier(verifier); } t1 = std::chrono::system_clock::now(); diff --git a/apps/http-proxy/CMakeLists.txt b/apps/http-proxy/CMakeLists.txt index d6681097c..8c2043c30 100644 --- a/apps/http-proxy/CMakeLists.txt +++ b/apps/http-proxy/CMakeLists.txt @@ -14,6 +14,18 @@ cmake_minimum_required(VERSION 3.5 FATAL_ERROR) set(CMAKE_CXX_STANDARD 14) +# -Wno-c99-designator issue +# +# Unsure about version for which this was introduced +# clang version 9.0.8 (no flag), 11.0.5 (ndk22, flag) +if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang") + EXECUTE_PROCESS( COMMAND ${CMAKE_CXX_COMPILER} --version OUTPUT_VARIABLE clang_full_version_string ) + string (REGEX REPLACE ".*clang version ([0-9]+\\.[0-9]+).*" "\\1" CLANG_VERSION_STRING ${clang_full_version_string}) + if (CLANG_VERSION_STRING VERSION_GREATER_EQUAL 11) + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-c99-designator") + endif() +endif() + set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/cmake/Modules/" @@ -24,39 +36,36 @@ if (NOT CMAKE_BUILD_TYPE) set(CMAKE_BUILD_TYPE "Release") endif () -include_directories( - SYSTEM - ${CMAKE_BINARY_DIR} - ${LIB${TRANSPORT_LIBRARY}_INCLUDE_DIR} -) - set(LIB_SOURCE_FILES - src/ATSConnector.cc - src/HTTP1.xMessageFastParser.cc - src/IcnReceiver.cc -) - -set(LIB_SERVER_HEADER_FILES - src/IcnReceiver.h - src/ATSConnector.h - src/HTTP1.xMessageFastParser.h + src/http_session.cc + src/http_proxy.cc + src/http_1x_message_fast_parser.cc + src/icn_receiver.cc + src/forwarder_interface.cc ) set(APP_SOURCE_FILES main.cc ) +add_subdirectory(includes/hicn/http-proxy) set(LIBHTTP_PROXY hicnhttpproxy) set(LIBHTTP_PROXY_STATIC ${LIBHTTP_PROXY}.static) +list(APPEND COMPILER_DEFINITIONS + -DWITH_POLICY +) + build_library(${LIBHTTP_PROXY} STATIC SOURCES ${LIB_SOURCE_FILES} LINK_LIBRARIES ${LIBRARIES} DEPENDS ${DEPENDENCIES} - INCLUDE_DIRS ${LIBTRANSPORT_INCLUDE_DIRS} + INSTALL_HEADERS ${LIBPROXY_TO_INSTALL_HEADER_FILES} + INCLUDE_DIRS ${LIBTRANSPORT_INCLUDE_DIRS} ${LIBHICNCTRL_INCLUDE_DIRS} ${LIBPROXY_INCLUDE_DIRS} COMPONENT ${HICN_APPS} LINK_FLAGS ${LINK_FLAGS} + DEFINITIONS ${COMPILER_DEFINITIONS} ) if (NOT DISABLE_EXECUTABLES) diff --git a/apps/http-proxy/includes/hicn/http-proxy/CMakeLists.txt b/apps/http-proxy/includes/hicn/http-proxy/CMakeLists.txt new file mode 100644 index 000000000..75cbbd64b --- /dev/null +++ b/apps/http-proxy/includes/hicn/http-proxy/CMakeLists.txt @@ -0,0 +1,39 @@ +# Copyright (c) 2017-2019 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cmake_minimum_required(VERSION 3.5 FATAL_ERROR) + +list(APPEND HEADER_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/forwarder_config.h + ${CMAKE_CURRENT_SOURCE_DIR}/forwarder_interface.h + ${CMAKE_CURRENT_SOURCE_DIR}/http_proxy.h + ${CMAKE_CURRENT_SOURCE_DIR}/http_session.h + ${CMAKE_CURRENT_SOURCE_DIR}/http_1x_message_fast_parser.h + ${CMAKE_CURRENT_SOURCE_DIR}/icn_receiver.h + ${CMAKE_CURRENT_SOURCE_DIR}/utils.h +) + +set(HEADER_FILES ${HEADER_FILES} PARENT_SCOPE) + +set(LIBPROXY_INCLUDE_DIRS + ${CMAKE_CURRENT_SOURCE_DIR}/../.. "" + CACHE INTERNAL + "" FORCE +) + +set(LIBPROXY_TO_INSTALL_HEADER_FILES + ${HEADER_FILES} "" + CACHE INTERNAL + "" FORCE +) + diff --git a/apps/http-proxy/includes/hicn/http-proxy/forwarder_config.h b/apps/http-proxy/includes/hicn/http-proxy/forwarder_config.h new file mode 100644 index 000000000..19c96a9e3 --- /dev/null +++ b/apps/http-proxy/includes/hicn/http-proxy/forwarder_config.h @@ -0,0 +1,200 @@ +/* + * Copyright (c) 2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/portability/c_portability.h> +#include <hicn/transport/utils/branch_prediction.h> +#include <hicn/transport/utils/log.h> +#include <hicn/transport/utils/string_utils.h> + +#include <asio.hpp> +#include <chrono> +#include <sstream> +#include <string> + +#include "forwarder_interface.h" + + +#define RETRY_INTERVAL 300 + +namespace transport { + +static constexpr char server_header[] = "server"; +static constexpr char prefix_header[] = "prefix"; +static constexpr char port_header[] = "port"; + +using OnForwarderConfiguredCallback = std::function<void(bool)>; + +class ForwarderConfig { + public: + using ListenerRetrievedCallback = std::function<void(std::error_code)>; + + template <typename Callback> + ForwarderConfig(asio::io_service& io_service, Callback&& callback) + : forwarder_interface_(io_service), + resolver_(io_service), + retx_count_(0), + timer_(io_service), + hicn_listen_port_(~0), + listener_retrieved_callback_(std::forward<Callback>(callback)) {} + + void close() { + timer_.cancel(); + resolver_.cancel(); + forwarder_interface_.close(); + } + + void tryToConnectToForwarder() { + doTryToConnectToForwarder(std::make_error_code(std::errc(0))); + } + + void doTryToConnectToForwarder(std::error_code ec) { + if (!ec) { + // ec == 0 --> timer expired + int ret = forwarder_interface_.connectToForwarder(); + if (ret < 0) { + // We were not able to connect to the local forwarder. Do not give up + // and retry. + TRANSPORT_LOGE("Could not connect to local forwarder. Retrying."); + + timer_.expires_from_now(std::chrono::milliseconds(RETRY_INTERVAL)); + timer_.async_wait(std::bind(&ForwarderConfig::doTryToConnectToForwarder, + this, std::placeholders::_1)); + } else { + timer_.cancel(); + retx_count_ = 0; + doGetMainListener(std::make_error_code(std::errc(0))); + } + } else { + TRANSPORT_LOGD("Timer for re-trying forwarder connection canceled."); + } + } + + void doGetMainListener(std::error_code ec) { + if (!ec) { + // ec == 0 --> timer expired + int ret = forwarder_interface_.getMainListenerPort(); + if (ret <= 0) { + // Since without the main listener of the forwarder the proxy cannot + // work, we can stop the program here until we get the listener port. + TRANSPORT_LOGE( + "Could not retrieve main listener port from the forwarder. " + "Retrying."); + + timer_.expires_from_now(std::chrono::milliseconds(RETRY_INTERVAL)); + timer_.async_wait(std::bind(&ForwarderConfig::doGetMainListener, this, + std::placeholders::_1)); + } else { + timer_.cancel(); + retx_count_ = 0; + hicn_listen_port_ = uint16_t(ret); + listener_retrieved_callback_(std::make_error_code(std::errc(0))); + } + } else { + TRANSPORT_LOGI("Timer for retrieving main hicn listener canceled."); + } + } + + template <typename Callback> + TRANSPORT_ALWAYS_INLINE bool parseHicnHeader(std::string& header, + Callback&& callback) { + std::stringstream ss(header); + route_info_t* ret = new route_info_t(); + std::string port_string; + + while (ss.good()) { + std::string substr; + getline(ss, substr, ','); + + if (TRANSPORT_EXPECT_FALSE(substr.empty())) { + continue; + } + + utils::trim(substr); + auto it = std::find_if(substr.begin(), substr.end(), + [](int ch) { return ch == '='; }); + if (it != std::end(substr)) { + auto key = std::string(substr.begin(), it); + auto value = std::string(it + 1, substr.end()); + + if (key == server_header) { + ret->remote_addr = value; + } else if (key == prefix_header) { + auto it = std::find_if(value.begin(), value.end(), + [](int ch) { return ch == '/'; }); + + if (it != std::end(value)) { + ret->route_addr = std::string(value.begin(), it); + ret->route_len = std::stoul(std::string(it + 1, value.end())); + } else { + return false; + } + } else if (key == port_header) { + ret->remote_port = std::stoul(value); + port_string = value; + } else { + // Header not recognized + return false; + } + } + } + + /* + * Resolve server address + */ + auto results = + resolver_.resolve({ret->remote_addr, port_string, + asio::ip::resolver_query_base::numeric_service}); + +#if ((ASIO_VERSION / 100 % 1000) < 12) + asio::ip::udp::resolver::iterator end; + auto& it = results; + while (it != end) { +#else + for (auto it = results.begin(); it != results.end(); it++) { +#endif + if (it->endpoint().address().is_v4()) { + // Use this v4 address to configure the forwarder. + ret->remote_addr = it->endpoint().address().to_string(); + ret->family = AF_INET; + std::string _prefix = ret->route_addr; + forwarder_interface_.createFaceAndRoute( + RouteInfoPtr(ret), [callback = std::forward<Callback>(callback), + configured_prefix = std::move(_prefix)]( + uint32_t route_id, bool result) { + callback(result, configured_prefix); + }); + + return true; + } +#if ((ASIO_VERSION / 100 % 1000) < 12) + it++; +#endif + } + + return false; + } + + private: + ForwarderInterface forwarder_interface_; + asio::ip::udp::resolver resolver_; + std::uint32_t retx_count_; + asio::steady_timer timer_; + uint16_t hicn_listen_port_; + ListenerRetrievedCallback listener_retrieved_callback_; +}; // namespace transport + +} // namespace transport
\ No newline at end of file diff --git a/apps/http-proxy/includes/hicn/http-proxy/forwarder_interface.h b/apps/http-proxy/includes/hicn/http-proxy/forwarder_interface.h new file mode 100644 index 000000000..54941a4ba --- /dev/null +++ b/apps/http-proxy/includes/hicn/http-proxy/forwarder_interface.h @@ -0,0 +1,107 @@ +/* + * Copyright (c) 2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +extern "C" { +#include <hicn/ctrl/api.h> +#include <hicn/util/ip_address.h> +} + +#ifndef ASIO_STANDALONE +#define ASIO_STANDALONE 1 +#endif +#include <asio.hpp> +#include <asio/steady_timer.hpp> +#include <functional> +#include <thread> +#include <unordered_map> + +namespace transport { + +typedef std::function<void(uint32_t, bool)> SetRouteCallback; + +struct route_info_t { + int family; + std::string remote_addr; + uint16_t remote_port; + std::string route_addr; + uint8_t route_len; +}; + +using RouteInfoPtr = std::shared_ptr<route_info_t>; + +class ForwarderInterface { + public: + ForwarderInterface(asio::io_service &io_service) + : external_ioservice_(io_service), + work_(std::make_unique<asio::io_service::work>(internal_ioservice_)), + sock_(nullptr), + thread_(std::make_unique<std::thread>( + [this]() { internal_ioservice_.run(); })), + check_routes_timer_(nullptr), + pending_add_route_counter_(0), + route_id_(0), + closed_(false) {} + + ~ForwarderInterface(); + + int connectToForwarder(); + + void removeConnectedUserNow(uint32_t route_id); + + // to be called at the server + // at the client this creates a race condition + // and the program enters in a loop + void scheduleRemoveConnectedUser(uint32_t route_id); + + template <typename Callback> + void createFaceAndRoute(RouteInfoPtr &&route_info, Callback &&callback) { + internal_ioservice_.post([this, _route_info = std::move(route_info), + _callback = std::forward<Callback>(callback)]() { + pending_add_route_counter_++; + uint8_t max_try = 5; + auto timer = new asio::steady_timer(internal_ioservice_); + internalCreateFaceAndRoute(std::move(_route_info), max_try, timer, + std::move(_callback)); + }); + } + + int32_t getMainListenerPort(); + + void close(); + + private: + void internalRemoveConnectedUser(uint32_t route_id); + + void internalCreateFaceAndRoute(RouteInfoPtr route_info, uint8_t max_try, + asio::steady_timer *timer, + SetRouteCallback callback); + + int tryToCreateFaceAndRoute(route_info_t *route_info); + + asio::io_service &external_ioservice_; + asio::io_service internal_ioservice_; + std::unique_ptr<asio::io_service::work> work_; + hc_sock_t *sock_; + std::unique_ptr<std::thread> thread_; + std::unordered_map<uint32_t, RouteInfoPtr> route_status_; + std::unique_ptr<asio::steady_timer> check_routes_timer_; + uint32_t pending_add_route_counter_; + uint32_t route_id_; + bool closed_; +}; + +} // namespace transport diff --git a/apps/http-proxy/src/HTTP1.xMessageFastParser.h b/apps/http-proxy/includes/hicn/http-proxy/http_1x_message_fast_parser.h index 79dbce19d..7c035c83b 100644 --- a/apps/http-proxy/src/HTTP1.xMessageFastParser.h +++ b/apps/http-proxy/includes/hicn/http-proxy/http_1x_message_fast_parser.h @@ -15,16 +15,43 @@ #pragma once +#include <hicn/transport/http/message.h> + #include <algorithm> #include <string> -#include <hicn/transport/http/message.h> - using transport::http::HTTPHeaders; +namespace transport { +struct Metadata; +} + class HTTPMessageFastParser { public: - static HTTPHeaders getHeaders(const uint8_t* headers, std::size_t length); + static constexpr char http_ok[] = + "HTTP/1.1 200 OK\r\n" + "Access-Control-Allow-Origin: *\r\n" + "Connection: close\r\n" + "Content-Length: 0\r\n\r\n"; + + static constexpr char http_cors[] = + "HTTP/1.1 200 OK\r\n" + "Date: %s\r\n" + "Connection: close\r\n" + "Content-Length: 0\r\n" + "Access-Control-Allow-Origin: *\r\n" + "Access-Control-Allow-Methods: GET\r\n" + "Access-Control-Allow-Headers: hicn\r\n" + "Access-Control-Max-Age: 1800\r\n\r\n"; + + static constexpr char http_failed[] = + "HTTP/1.1 500 Internal Server Error\r\n" + "Date: %s\r\n" + "Content-Length: 0\r\nConnection: " + "close\r\n\r\n"; + + static void getHeaders(const uint8_t* headers, std::size_t length, + bool request, transport::Metadata* metadata); static std::size_t hasBody(const uint8_t* headers, std::size_t length); static bool isMpdRequest(const uint8_t* headers, std::size_t length); static uint32_t parseCacheControl(const uint8_t* headers, std::size_t length); diff --git a/apps/http-proxy/includes/hicn/http-proxy/http_proxy.h b/apps/http-proxy/includes/hicn/http-proxy/http_proxy.h new file mode 100644 index 000000000..a4139a620 --- /dev/null +++ b/apps/http-proxy/includes/hicn/http-proxy/http_proxy.h @@ -0,0 +1,210 @@ +/* + * Copyright (c) 2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/interfaces/socket_consumer.h> +#include <hicn/transport/utils/event_thread.h> + +#include "forwarder_config.h" +#include "http_session.h" +#include "icn_receiver.h" + +#define ASIO_STANDALONE +#include <asio.hpp> +#include <asio/version.hpp> +#include <unordered_set> + +class TcpListener { + public: + using AcceptCallback = std::function<void(asio::ip::tcp::socket&&)>; + + TcpListener(asio::io_service& io_service, short port, AcceptCallback callback) + : acceptor_(io_service), +#if ((ASIO_VERSION / 100 % 1000) < 12) + socket_(io_service), +#endif + callback_(callback) { + acceptor_.open(asio::ip::tcp::v4()); + typedef asio::detail::socket_option::boolean<SOL_SOCKET, SO_REUSEPORT> + reuse_port; + acceptor_.set_option(reuse_port(true)); + acceptor_.bind(asio::ip::tcp::endpoint( + asio::ip::address::from_string("127.0.0.1"), port)); + acceptor_.listen(); + } + + public: + void doAccept() { +#if ((ASIO_VERSION / 100 % 1000) >= 12) + acceptor_.async_accept( + [this](std::error_code ec, asio::ip::tcp::socket socket) { +#else + acceptor_.async_accept(socket_, [this](std::error_code ec) { + auto socket = std::move(socket_); +#endif + if (!ec) { + callback_(std::move(socket)); + doAccept(); + } + }); + } + + void stop() { acceptor_.close(); } + + asio::ip::tcp::acceptor acceptor_; +#if ((ASIO_VERSION / 100 % 1000) < 12) + asio::ip::tcp::socket socket_; +#endif + AcceptCallback callback_; +}; + +namespace transport { + +class HTTPClientConnectionCallback; + +class Receiver { + public: + Receiver() : thread_() {} + virtual ~Receiver() = default; + void stopAndJoinThread() { thread_.stop(); } + virtual void stop() = 0; + + protected: + utils::EventThread thread_; +}; + +class TcpReceiver : public Receiver { + friend class HTTPClientConnectionCallback; + + public: + TcpReceiver(std::uint16_t port, const std::string& prefix, + const std::string& ipv6_first_word); + + void stop() override; + + private: + void onNewConnection(asio::ip::tcp::socket&& socket); + void onClientDisconnect(HTTPClientConnectionCallback* client); + + template <typename Callback> + void parseHicnHeader(std::string& hicn_header, Callback&& callback) { + forwarder_config_.parseHicnHeader(hicn_header, + std::forward<Callback>(callback)); + } + + TcpListener listener_; + std::string prefix_; + std::string ipv6_first_word_; + std::string prefix_hash_; + std::deque<HTTPClientConnectionCallback*> http_clients_; + std::unordered_set<HTTPClientConnectionCallback*> used_http_clients_; + ForwarderConfig forwarder_config_; + bool stopped_; +}; + +class IcnReceiver : public Receiver { + public: + template <typename... Args> + IcnReceiver(Args&&... args) + : Receiver(), + icn_consum_producer_(thread_.getIoService(), + std::forward<Args>(args)...) { + icn_consum_producer_.run(); + } + + void stop() override { + thread_.add([this]() { + /* Stop the listener */ + icn_consum_producer_.stop(); + }); + } + + private: + AsyncConsumerProducer icn_consum_producer_; +}; + +class HTTPProxy { + public: + enum Server { CREATE }; + enum Client { WRAP_BUFFER }; + + struct CommonParams { + std::string prefix; + std::string first_ipv6_word; + + virtual void printParams() { std::cout << "Parameters: " << std::endl; }; + }; + + struct ClientParams : virtual CommonParams { + short tcp_listen_port; + void printParams() override { + std::cout << "Running HTTP/TCP -> HTTP/hICN proxy." << std::endl; + CommonParams::printParams(); + std::cout << "\t" + << "HTTP listen port: " << tcp_listen_port << std::endl; + std::cout << "\t" + << "Consumer Prefix: " << prefix << std::endl; + std::cout << "\t" + << "Prefix first word: " << first_ipv6_word << std::endl; + } + }; + + struct ServerParams : virtual CommonParams { + std::string origin_address; + std::string origin_port; + std::string cache_size; + std::string mtu; + std::string content_lifetime; + bool manifest; + + void printParams() override { + std::cout << "Running HTTP/hICN -> HTTP/TCP proxy." << std::endl; + CommonParams::printParams(); + std::cout << "\t" + << "Origin address: " << origin_address << std::endl; + std::cout << "\t" + << "Origin port: " << origin_port << std::endl; + std::cout << "\t" + << "Producer cache size: " << cache_size << std::endl; + std::cout << "\t" + << "hICN MTU: " << mtu << std::endl; + std::cout << "\t" + << "Default content lifetime: " << content_lifetime + << std::endl; + std::cout << "\t" + << "Producer Prefix: " << prefix << std::endl; + std::cout << "\t" + << "Prefix first word: " << first_ipv6_word << std::endl; + std::cout << "\t" + << "Use manifest: " << manifest << std::endl; + } + }; + + HTTPProxy(ClientParams& icn_params, std::size_t n_thread = 1); + HTTPProxy(ServerParams& icn_params, std::size_t n_thread = 1); + + void run() { main_io_context_.run(); } + void stop(); + + private: + void setupSignalHandler(); + + std::vector<std::unique_ptr<Receiver>> receivers_; + asio::io_service main_io_context_; + asio::signal_set signals_; +}; + +} // namespace transport
\ No newline at end of file diff --git a/apps/http-proxy/src/ATSConnector.h b/apps/http-proxy/includes/hicn/http-proxy/http_session.h index 8d91b7b7b..f4a3dbdee 100644 --- a/apps/http-proxy/src/ATSConnector.h +++ b/apps/http-proxy/includes/hicn/http-proxy/http_session.h @@ -17,6 +17,8 @@ #include <hicn/transport/core/packet.h> +#include "http_1x_message_fast_parser.h" + #define ASIO_STANDALONE #include <asio.hpp> #include <deque> @@ -26,16 +28,36 @@ namespace transport { using asio::ip::tcp; +struct Metadata; + typedef std::function<void(const uint8_t *data, std::size_t size, bool is_last, - bool headers)> + bool headers, Metadata *metadata)> ContentReceivedCallback; -typedef std::function<void()> OnReconnect; +typedef std::function<bool(asio::ip::tcp::socket &socket)> OnConnectionClosed; typedef std::function<void()> ContentSentCallback; typedef std::deque< std::pair<std::unique_ptr<utils::MemBuf>, ContentSentCallback>> BufferQueue; -class ATSConnector { +struct Metadata { + std::string http_version; + HTTPHeaders headers; +}; + +struct RequestMetadata : Metadata { + std::string method; + std::string path; +}; + +struct ResponseMetadata : Metadata { + std::string status_code; + std::string status_string; +}; + +class HTTPClientConnectionCallback; + +class HTTPSession { + friend class HTTPClientConnectionCallback; static constexpr uint32_t buffer_size = 1024 * 512; enum class ConnectorState { @@ -45,11 +67,15 @@ class ATSConnector { }; public: - ATSConnector(asio::io_service &io_service, std::string &ip_address, - std::string &port, ContentReceivedCallback receive_callback, - OnReconnect on_reconnect_callback); + HTTPSession(asio::io_service &io_service, std::string &ip_address, + std::string &port, ContentReceivedCallback receive_callback, + OnConnectionClosed on_reconnect_callback, bool client = false); - ~ATSConnector(); + HTTPSession(asio::ip::tcp::socket socket, + ContentReceivedCallback receive_callback, + OnConnectionClosed on_reconnect_callback, bool client = true); + + ~HTTPSession(); void send(const uint8_t *buffer, std::size_t len, ContentSentCallback &&content_sent = 0); @@ -65,9 +91,6 @@ class ATSConnector { void doReadBody(std::size_t body_size, std::size_t additional_bytes); - // void handleReadChunked(std::error_code ec, std::size_t length, - // std::size_t size); - void doReadChunkedHeader(); void doWrite(); @@ -90,6 +113,7 @@ class ATSConnector { asio::streambuf input_buffer_; + bool reverse_; bool is_reconnection_; bool data_available_; @@ -100,7 +124,10 @@ class ATSConnector { bool chunked_; ContentReceivedCallback receive_callback_; - OnReconnect on_reconnect_callback_; + OnConnectionClosed on_connection_closed_callback_; + + // HTTP headers + std::unique_ptr<Metadata> header_info_; // Connector state ConnectorState state_; diff --git a/apps/http-proxy/src/IcnReceiver.h b/apps/http-proxy/includes/hicn/http-proxy/icn_receiver.h index 9d0ab5172..780037665 100644 --- a/apps/http-proxy/src/IcnReceiver.h +++ b/apps/http-proxy/includes/hicn/http-proxy/icn_receiver.h @@ -13,18 +13,20 @@ * limitations under the License. */ -#include "ATSConnector.h" - #include <hicn/transport/core/prefix.h> #include <hicn/transport/interfaces/publication_options.h> #include <hicn/transport/interfaces/socket_producer.h> #include <hicn/transport/utils/spinlock.h> +#include <asio.hpp> #include <cassert> #include <cstring> #include <queue> #include <utility> +#include <hicn/http-proxy/http_session.h> +//#include "http_session.h" + namespace transport { class AsyncConsumerProducer { @@ -33,17 +35,31 @@ class AsyncConsumerProducer { using RequestQueue = std::queue<interface::PublicationOptions>; public: - explicit AsyncConsumerProducer(const std::string& prefix, - std::string& ip_address, std::string& port, - std::string& cache_size, std::string& mtu, - std::string& first_ipv6_word, - unsigned long default_content_lifetime, bool manifest); - - void start(); + explicit AsyncConsumerProducer( + asio::io_service& io_service, const std::string& prefix, + const std::string& first_ipv6_word, const std::string& origin_address, + const std::string& origin_port, const std::string& cache_size, + const std::string& mtu, const std::string& content_lifetime, + bool manifest); + + explicit AsyncConsumerProducer( + const std::string& prefix, const std::string& first_ipv6_word, + const std::string& origin_address, const std::string& origin_port, + const std::string& cache_size, const std::string& mtu, + const std::string& content_lifetime, bool manifest) + : AsyncConsumerProducer(internal_io_service_, prefix, first_ipv6_word, + origin_address, origin_port, cache_size, mtu, + content_lifetime, manifest) { + external_io_service_ = false; + } void run(); + void stop(); + private: + void start(); + void doSend(); void doReceive(); @@ -55,7 +71,9 @@ class AsyncConsumerProducer { utils::MemBuf* payload); core::Prefix prefix_; - asio::io_service io_service_; + asio::io_service& io_service_; + asio::io_service internal_io_service_; + bool external_io_service_; interface::ProducerSocket producer_socket_; std::string ip_address_; @@ -64,11 +82,10 @@ class AsyncConsumerProducer { uint32_t mtu_; uint64_t request_counter_; - asio::signal_set signals_; // std::unordered_map<core::Name, std::shared_ptr<ATSConnector>> // connection_map_; - ATSConnector connector_; + HTTPSession connector_; unsigned long default_content_lifetime_; diff --git a/apps/http-proxy/includes/hicn/http-proxy/utils.h b/apps/http-proxy/includes/hicn/http-proxy/utils.h new file mode 100644 index 000000000..d87c796d0 --- /dev/null +++ b/apps/http-proxy/includes/hicn/http-proxy/utils.h @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/core/prefix.h> +#include <hicn/transport/utils/hash.h> + +#include <sstream> +#include <string> + +#pragma once + +TRANSPORT_ALWAYS_INLINE std::string generatePrefix( + const std::string& prefix_url, const std::string& first_ipv6_word) { + const char* str = prefix_url.c_str(); + uint16_t pos = 0; + + if (strncmp("http://", str, 7) == 0) { + pos = 7; + } else if (strncmp("https://", str, 8) == 0) { + pos = 8; + } + + str += pos; + + uint32_t locator_hash = utils::hash::fnv32_buf(str, strlen(str)); + + std::stringstream stream; + stream << first_ipv6_word << ":0"; + + for (uint16_t* word = (uint16_t*)&locator_hash; + std::size_t(word) < (std::size_t(&locator_hash) + sizeof(locator_hash)); + word++) { + stream << ":" << std::hex << *word; + } + + stream << "::"; + + return stream.str(); +}
\ No newline at end of file diff --git a/apps/http-proxy/main.cc b/apps/http-proxy/main.cc index 20655071c..d9c29ecde 100644 --- a/apps/http-proxy/main.cc +++ b/apps/http-proxy/main.cc @@ -13,53 +13,128 @@ * limitations under the License. */ -#include "src/IcnReceiver.h" +#include <hicn/http-proxy/http_proxy.h> using namespace transport; int usage(char* program) { - std::cerr << "ICN Plugin not loaded!" << std::endl; - std::cerr << "USAGE: " << program << "\n" - << "[HTTP_PREFIX] -a [SERVER_IP_ADDRESS] " - "-p [SERVER_PORT] -c [CACHE_SIZE] -m [MTU] -l [DEFAULT_LIFETIME " - "(seconds)] -P [FIRST_IPv6_WORD_HEX] -M (enable manifest)" + std::cerr << "USAGE: " << program << "[-C|-S] [options] <http_prefix>\n" + << "Server or Client: \n" + << " -P [FIRST_IPv6_WORD_HEX]\n" + << " -t [number of threads]\n" + << "Client Options: \n" + << " -L [PROXY_LISTEN_PORT]\n" + << "Server Options: \n" + << " -a [ORIGIN_IP_ADDRESS]\n" + << " -p [ORIGIN_PORT]\n" + << " -c [CACHE_SIZE]\n" + << " -m [MTU]" + << " -l [DEFAULT_CONTENT_LIFETIME] (seconds)\n" + << " -M (enable manifest)\n" + << std::endl + << "Example Server:\n" + << " " << program + << " -S -a example.com -p 80 -c 10000 -m 1300 -l 7200 -M -t 1 " + "http://httpserver\n" + << "Example Client:\n" + << " " << program << " -C -L 9091 http://httpserver\n" << std::endl; return -1; } +struct Params : HTTPProxy::ClientParams, HTTPProxy::ServerParams { + void printParams() override { + if (client) { + HTTPProxy::ClientParams::printParams(); + } else if (server) { + HTTPProxy::ServerParams::printParams(); + } else { + throw std::runtime_error( + "Proxy configured as client and server at the same time."); + } + + std::cout << "\t" + << "N Threads: " << n_thread << std::endl; + } + + HTTPProxy* instantiateProxyAsValue() { + if (client) { + HTTPProxy::ClientParams* p = dynamic_cast<HTTPProxy::ClientParams*>(this); + return new transport::HTTPProxy(*p, n_thread); + } else if (server) { + HTTPProxy::ServerParams* p = dynamic_cast<HTTPProxy::ServerParams*>(this); + return new transport::HTTPProxy(*p, n_thread); + } else { + throw std::runtime_error( + "Proxy configured as client and server at the same time."); + } + } + + bool client = false; + bool server = false; + std::uint16_t n_thread = 1; +}; + int main(int argc, char** argv) { - std::string prefix("http://hicn-http-proxy"); - std::string ip_address("127.0.0.1"); - std::string port("80"); - std::string cache_size("50000"); - std::string mtu("1500"); - std::string first_ipv6_word("b001"); - std::string default_content_lifetime("7200"); // seconds - bool manifest = false; + Params params; + + params.prefix = "http://hicn-http-proxy"; + params.origin_address = "127.0.0.1"; + params.origin_port = "80"; + params.cache_size = "50000"; + params.mtu = "1500"; + params.first_ipv6_word = "b001"; + params.content_lifetime = "7200;"; // seconds + params.manifest = false; + params.tcp_listen_port = 8080; int opt; - while ((opt = getopt(argc, argv, "a:p:c:m:P:l:M")) != -1) { + while ((opt = getopt(argc, argv, "CSa:p:c:m:P:l:ML:t:")) != -1) { switch (opt) { + case 'C': + if (params.server) { + std::cerr << "Cannot be both client and server (both -C anc -S " + "options specified.)." + << std::endl; + return usage(argv[0]); + } + params.client = true; + break; + case 'S': + if (params.client) { + std::cerr << "Cannot be both client and server (both -C anc -S " + "options specified.)." + << std::endl; + return usage(argv[0]); + } + params.server = true; + break; case 'a': - ip_address = optarg; + params.origin_address = optarg; break; case 'p': - port = optarg; + params.origin_port = optarg; break; case 'c': - cache_size = optarg; + params.cache_size = optarg; break; case 'm': - mtu = optarg; + params.mtu = optarg; break; case 'P': - first_ipv6_word = optarg; + params.first_ipv6_word = optarg; break; case 'l': - default_content_lifetime = optarg; + params.content_lifetime = optarg; + break; + case 'L': + params.tcp_listen_port = std::stoul(optarg); break; case 'M': - manifest = true; + params.manifest = true; + break; + case 't': + params.n_thread = std::stoul(optarg); break; case 'h': default: @@ -69,19 +144,15 @@ int main(int argc, char** argv) { } if (argv[optind] == 0) { - std::cerr << "Using default prefix " << prefix << std::endl; + std::cerr << "Using default prefix " << params.prefix << std::endl; } else { - prefix = argv[optind]; + params.prefix = argv[optind]; } - std::cout << "Connecting to " << ip_address << " port " << port - << " Cache size " << cache_size << " Prefix " << prefix << " MTU " - << mtu << " IPv6 first word " << first_ipv6_word << std::endl; - transport::AsyncConsumerProducer proxy( - prefix, ip_address, port, cache_size, mtu, first_ipv6_word, - std::stoul(default_content_lifetime) * 1000, manifest); - - proxy.run(); + params.printParams(); + auto proxy = params.instantiateProxyAsValue(); + proxy->run(); + delete proxy; return 0; }
\ No newline at end of file diff --git a/apps/http-proxy/src/forwarder_interface.cc b/apps/http-proxy/src/forwarder_interface.cc new file mode 100644 index 000000000..7d8235ac6 --- /dev/null +++ b/apps/http-proxy/src/forwarder_interface.cc @@ -0,0 +1,263 @@ +/* + * Copyright (c) 2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <arpa/inet.h> +#include <hicn/http-proxy/forwarder_interface.h> +#include <hicn/transport/utils/log.h> + +#include <chrono> +#include <iostream> +#include <thread> +#include <unordered_set> + +namespace transport { + +ForwarderInterface::~ForwarderInterface() {} + +int ForwarderInterface::connectToForwarder() { + sock_ = hc_sock_create(); + if (!sock_) return -1; + + if (hc_sock_connect(sock_) < 0) { + hc_sock_free(sock_); + sock_ = nullptr; + return -1; + } + + return 0; +} + +void ForwarderInterface::close() { + if (!closed_) { + internal_ioservice_.post([this]() { + work_.reset(); + if (sock_) { + hc_sock_free(sock_); + sock_ = nullptr; + } + }); + + if (thread_->joinable()) { + thread_->join(); + } + } +} + +void ForwarderInterface::removeConnectedUserNow(uint32_t route_id) { + internalRemoveConnectedUser(route_id); +} + +void ForwarderInterface::scheduleRemoveConnectedUser(uint32_t route_id) { + internal_ioservice_.post( + [this, route_id]() { internalRemoveConnectedUser(route_id); }); +} + +int32_t ForwarderInterface::getMainListenerPort() { + if (!sock_) return -1; + + hc_data_t *data; + if (hc_listener_list(sock_, &data) < 0) return -1; + + int ret = -1; + foreach_listener(l, data) { + std::string interface = std::string(l->interface_name); + if (interface.compare("lo") != 0) { + ret = l->local_port; + break; + } + } + + hc_data_free(data); + return ret; +} + +void ForwarderInterface::internalRemoveConnectedUser(uint32_t route_id) { + auto it = route_status_.find(route_id); + if (it == route_status_.end()) return; + + if (!sock_) return; + + // remove route + hc_data_t *data; + if (hc_route_list(sock_, &data) < 0) return; + + std::vector<hc_route_t *> routes_to_remove; + foreach_route(r, data) { + char remote_addr[INET6_ADDRSTRLEN]; + int ret = ip_address_ntop(&r->remote_addr, remote_addr, r->len, r->family); + if (ret < 0) continue; + + std::string route_addr(remote_addr); + if (route_addr.compare(it->second->route_addr) == 0 && + r->len == it->second->route_len) { + // route found + routes_to_remove.push_back(r); + } + } + + route_status_.erase(it); + + if (routes_to_remove.size() == 0) { + // nothing to do here + hc_data_free(data); + return; + } + + std::unordered_set<uint32_t> connids_to_remove; + for (unsigned i = 0; i < routes_to_remove.size(); i++) { + connids_to_remove.insert(routes_to_remove[i]->face_id); + if (hc_route_delete(sock_, routes_to_remove[i]) < 0) { + TRANSPORT_LOGE("Error removing route from forwarder."); + } + } + + // remove connection + if (hc_connection_list(sock_, &data) < 0) { + hc_data_free(data); + return; + } + + // collects pointerst to the connections using the conn IDs + std::vector<hc_connection_t *> conns_to_remove; + foreach_connection(c, data) { + if (connids_to_remove.find(c->id) != connids_to_remove.end()) { + // conn found + conns_to_remove.push_back(c); + } + } + + if (conns_to_remove.size() == 0) { + // nothing else to do here + hc_data_free(data); + return; + } + + for (unsigned i = 0; i < conns_to_remove.size(); i++) { + if (hc_connection_delete(sock_, conns_to_remove[i]) < 0) { + TRANSPORT_LOGE("Error removing connection from forwarder."); + } + } + + hc_data_free(data); +} + +void ForwarderInterface::internalCreateFaceAndRoute(RouteInfoPtr route_info, + uint8_t max_try, + asio::steady_timer *timer, + SetRouteCallback callback) { + int ret = tryToCreateFaceAndRoute(route_info.get()); + + if (ret < 0 && max_try > 0) { + max_try--; + timer->expires_from_now(std::chrono::milliseconds(500)); + timer->async_wait([this, _route_info = std::move(route_info), max_try, + timer, callback](std::error_code ec) { + if (ec) return; + internalCreateFaceAndRoute(std::move(_route_info), max_try, timer, + std::move(callback)); + }); + return; + } + + if (max_try == 0 && ret < 0) { + pending_add_route_counter_--; + external_ioservice_.post([callback]() { callback(false, ~0); }); + } else { + pending_add_route_counter_--; + route_status_[route_id_] = std::move(route_info); + external_ioservice_.post( + [route_id = route_id_, callback]() { callback(route_id, true); }); + route_id_++; + } + + delete timer; +} + +int ForwarderInterface::tryToCreateFaceAndRoute(route_info_t *route_info) { + if (!sock_) return -1; + + hc_data_t *data; + if (hc_listener_list(sock_, &data) < 0) { + return -1; + } + + bool found = false; + uint32_t face_id; + + foreach_listener(l, data) { + std::string interface = std::string(l->interface_name); + if (interface.compare("lo") != 0) { + found = true; + + ip_address_t remote_ip; + if (ip_address_pton(route_info->remote_addr.c_str(), &remote_ip) < 0) { + hc_data_free(data); + return -1; + } + + hc_face_t face; + memset(&face, 0, sizeof(hc_face_t)); + + face.face.type = FACE_TYPE_UDP; + face.face.family = route_info->family; + face.face.local_addr = l->local_addr; + face.face.remote_addr = remote_ip; + face.face.local_port = l->local_port; + face.face.remote_port = route_info->remote_port; + + if (netdevice_set_name(&face.face.netdevice, l->interface_name) < 0) { + hc_data_free(data); + return -1; + } + + if (hc_face_create(sock_, &face) < 0) { + hc_data_free(data); + return -1; + } + + face_id = face.id; + break; + } + } + + if (!found) { + hc_data_free(data); + return -1; + } + + ip_address_t route_ip; + hc_route_t route; + + if (ip_address_pton(route_info->route_addr.c_str(), &route_ip) < 0) { + hc_data_free(data); + return -1; + } + + route.face_id = face_id; + route.family = AF_INET6; + route.remote_addr = route_ip; + route.len = route_info->route_len; + route.cost = 1; + + if (hc_route_create(sock_, &route) < 0) { + hc_data_free(data); + return -1; + } + + hc_data_free(data); + return 0; +} + +} // namespace transport diff --git a/apps/http-proxy/src/HTTP1.xMessageFastParser.cc b/apps/http-proxy/src/http_1x_message_fast_parser.cc index 729eb3aeb..4b6b78d55 100644 --- a/apps/http-proxy/src/HTTP1.xMessageFastParser.cc +++ b/apps/http-proxy/src/http_1x_message_fast_parser.cc @@ -13,33 +13,48 @@ * limitations under the License. */ -#include "HTTP1.xMessageFastParser.h" - +#include <hicn/http-proxy/http_session.h> +#include <hicn/transport/http/request.h> #include <hicn/transport/http/response.h> #include <experimental/algorithm> #include <experimental/functional> #include <iostream> +constexpr char HTTPMessageFastParser::http_ok[]; +constexpr char HTTPMessageFastParser::http_cors[]; +constexpr char HTTPMessageFastParser::http_failed[]; + std::string HTTPMessageFastParser::numbers = "0123456789"; -std::string HTTPMessageFastParser::content_length = "Content-Length"; -std::string HTTPMessageFastParser::transfer_encoding = "Transfer-Encoding"; +std::string HTTPMessageFastParser::content_length = "content-length"; +std::string HTTPMessageFastParser::transfer_encoding = "transfer-encoding"; std::string HTTPMessageFastParser::chunked = "chunked"; -std::string HTTPMessageFastParser::cache_control = "Cache-Control"; +std::string HTTPMessageFastParser::cache_control = "cache-control"; std::string HTTPMessageFastParser::mpd = "mpd"; -std::string HTTPMessageFastParser::connection = "Connection"; +std::string HTTPMessageFastParser::connection = "connection"; std::string HTTPMessageFastParser::separator = "\r\n\r\n"; -HTTPHeaders HTTPMessageFastParser::getHeaders(const uint8_t *headers, - std::size_t length) { - HTTPHeaders ret; - std::string http_version; - std::string status_code; - std::string status_string; - - if (transport::http::HTTPResponse::parseHeaders(headers, length, ret, http_version, - status_code, status_string)) { - return ret; +void HTTPMessageFastParser::getHeaders(const uint8_t *headers, + std::size_t length, bool request, + transport::Metadata *metadata) { + if (request) { + transport::RequestMetadata *_metadata = + (transport::RequestMetadata *)(metadata); + + if (transport::http::HTTPRequest::parseHeaders( + headers, length, _metadata->headers, _metadata->http_version, + _metadata->method, _metadata->path)) { + return; + } + } else { + transport::ResponseMetadata *_metadata = + (transport::ResponseMetadata *)(metadata); + + if (transport::http::HTTPResponse::parseHeaders( + headers, length, _metadata->headers, _metadata->http_version, + _metadata->status_code, _metadata->status_string)) { + return; + } } throw std::runtime_error("Error parsing response headers."); diff --git a/apps/http-proxy/src/http_proxy.cc b/apps/http-proxy/src/http_proxy.cc new file mode 100644 index 000000000..262fcb8e1 --- /dev/null +++ b/apps/http-proxy/src/http_proxy.cc @@ -0,0 +1,376 @@ +/* + * Copyright (c) 2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/http-proxy/http_proxy.h> +#include <hicn/http-proxy/http_session.h> +#include <hicn/http-proxy/utils.h> +#include <hicn/transport/core/interest.h> +#include <hicn/transport/utils/log.h> +#include <hicn/transport/utils/string_utils.h> + +namespace transport { + +using core::Interest; +using core::Name; +using interface::ConsumerCallbacksOptions; +using interface::ConsumerInterestCallback; +using interface::ConsumerSocket; +using interface::TransportProtocolAlgorithms; + +class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback { + public: + HTTPClientConnectionCallback(TcpReceiver& tcp_receiver, + utils::EventThread& thread) + : tcp_receiver_(tcp_receiver), + thread_(thread), + prefix_hash_(tcp_receiver_.prefix_hash_), + consumer_(TransportProtocolAlgorithms::RAAQM, thread_.getIoService()), + session_(nullptr), + current_size_(0) { + consumer_.setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, this); + consumer_.setSocketOption( + ConsumerCallbacksOptions::INTEREST_OUTPUT, + (ConsumerInterestCallback)std::bind( + &HTTPClientConnectionCallback::processLeavingInterest, this, + std::placeholders::_1, std::placeholders::_2)); + consumer_.setSocketOption( + ConsumerCallbacksOptions::INTEREST_RETRANSMISSION, + (ConsumerInterestCallback)std::bind( + &HTTPClientConnectionCallback::processInterestRetx, this, + std::placeholders::_1, std::placeholders::_2)); + consumer_.connect(); + } + + void stop() { session_->close(); } + + void setHttpSession(asio::ip::tcp::socket&& socket) { + session_ = std::make_unique<HTTPSession>( + std::move(socket), + std::bind(&HTTPClientConnectionCallback::readDataFromTcp, this, + std::placeholders::_1, std::placeholders::_2, + std::placeholders::_3, std::placeholders::_4, + std::placeholders::_5), + [this](asio::ip::tcp::socket& socket) -> bool { + try { + std::string remote_address = + socket.remote_endpoint().address().to_string(); + std::uint16_t remote_port = socket.remote_endpoint().port(); + TRANSPORT_LOGD("Client %s:%d disconnected.", remote_address.c_str(), + remote_port); + } catch (std::system_error& e) { + // Do nothing + } + + consumer_.stop(); + request_buffer_queue_.clear(); + tcp_receiver_.onClientDisconnect(this); + return false; + }); + + current_size_ = 0; + } + + private: + void consumeNextRequest() { + if (request_buffer_queue_.size() == 0) { + TRANSPORT_LOGD("No additional requests to process."); + return; + } + + auto& buffer = request_buffer_queue_.front().second; + uint64_t request_hash = + utils::hash::fnv64_buf(buffer.data(), buffer.size()); + + std::stringstream name; + name << prefix_hash_.substr(0, prefix_hash_.length() - 2); + + for (uint16_t* word = (uint16_t*)&request_hash; + std::size_t(word) < + (std::size_t(&request_hash) + sizeof(request_hash)); + word++) { + name << ":" << std::hex << *word; + } + + name << "|0"; + + // Non blocking consume :) + consumer_.consume(Name(name.str())); + } + + // tcp callbacks + + void readDataFromTcp(const uint8_t* data, std::size_t size, bool is_last, + bool headers, Metadata* metadata) { + if (headers) { + // Add the request to the request queue + RequestMetadata* _metadata = reinterpret_cast<RequestMetadata*>(metadata); + tmp_buffer_ = std::make_pair(utils::MemBuf::copyBuffer(data, size), + _metadata->path); + if (TRANSPORT_EXPECT_FALSE( + _metadata->path.compare("/isHicnProxyOn") == 0 && is_last)) { + /** + * It seems this request is for us. + * Get hicn parameters. + */ + processClientRequest(_metadata); + return; + } + } else { + // Append payload chunk to last request added. Here we are assuming + // HTTP/1.1. + tmp_buffer_.first->prependChain(utils::MemBuf::copyBuffer(data, size)); + } + + current_size_ += size; + + if (is_last) { + TRANSPORT_LOGD("Request received: %s", + std::string((const char*)tmp_buffer_.first->data(), + tmp_buffer_.first->length()) + .c_str()); + if (current_size_ < 1400) { + request_buffer_queue_.emplace_back(std::move(tmp_buffer_)); + } else { + TRANSPORT_LOGE("Ignoring client request due to size (%zu) > 1400.", + current_size_); + session_->close(); + current_size_ = 0; + return; + } + + if (!consumer_.isRunning()) { + TRANSPORT_LOGD( + "Consumer stopped, triggering consume from TCP session " + "handler.."); + consumeNextRequest(); + } + + current_size_ = 0; + } + } + + // hicn callbacks + + void processLeavingInterest(interface::ConsumerSocket& c, + const core::Interest& interest) { + if (interest.getName().getSuffix() == 0 && interest.payloadSize() == 0) { + Interest& int2 = const_cast<Interest&>(interest); + int2.appendPayload(request_buffer_queue_.front().first->clone()); + } + } + + void processInterestRetx(interface::ConsumerSocket& c, + const core::Interest& interest) { + if (interest.payloadSize() == 0) { + Interest& int2 = const_cast<Interest&>(interest); + int2.appendPayload(request_buffer_queue_.front().first->clone()); + } + } + + bool isBufferMovable() noexcept { return true; } + void getReadBuffer(uint8_t** application_buffer, size_t* max_length) {} + void readDataAvailable(size_t length) noexcept {} + size_t maxBufferSize() const { return 64 * 1024; } + + void readBufferAvailable(std::unique_ptr<utils::MemBuf>&& buffer) noexcept { + // Response received. Send it back to client + auto _buffer = buffer.release(); + TRANSPORT_LOGD("From hicn: %zu bytes.", _buffer->length()); + session_->send(_buffer, []() {}); + } + + void readError(const std::error_code ec) noexcept { + TRANSPORT_LOGE("Error reading from hicn consumer socket. Closing session."); + session_->close(); + } + + void readSuccess(std::size_t total_size) noexcept { + request_buffer_queue_.pop_front(); + consumeNextRequest(); + } + + void processClientRequest(RequestMetadata* metadata) { + auto it = metadata->headers.find("hicn"); + if (it == metadata->headers.end()) { + /* + * Probably it is an OPTION message for access control. + * Let's grant it! + */ + if (metadata->method == "OPTIONS") { + session_->send( + (const uint8_t*)HTTPMessageFastParser::http_cors, + std::strlen(HTTPMessageFastParser::http_cors), [this]() { + auto& socket = session_->socket_; + TRANSPORT_LOGI( + "Sent OPTIONS to client %s:%d", + socket.remote_endpoint().address().to_string().c_str(), + socket.remote_endpoint().port()); + }); + } + } else { + tcp_receiver_.parseHicnHeader( + it->second, [this](bool result, std::string configured_prefix) { + const char* reply = nullptr; + if (result) { + reply = HTTPMessageFastParser::http_ok; + } else { + reply = HTTPMessageFastParser::http_failed; + } + + /* Route created. Send back a 200 OK to client */ + session_->send( + (const uint8_t*)reply, std::strlen(reply), [this, result]() { + auto& socket = session_->socket_; + TRANSPORT_LOGI( + "Sent %d response to client %s:%d", result, + socket.remote_endpoint().address().to_string().c_str(), + socket.remote_endpoint().port()); + }); + }); + } + } + + private: + TcpReceiver& tcp_receiver_; + utils::EventThread& thread_; + std::string& prefix_hash_; + ConsumerSocket consumer_; + std::unique_ptr<HTTPSession> session_; + std::deque<std::pair<std::unique_ptr<utils::MemBuf>, std::string>> + request_buffer_queue_; + std::pair<std::unique_ptr<utils::MemBuf>, std::string> tmp_buffer_; + std::size_t current_size_; +}; + +TcpReceiver::TcpReceiver(std::uint16_t port, const std::string& prefix, + const std::string& ipv6_first_word) + : Receiver(), + listener_(thread_.getIoService(), port, + std::bind(&TcpReceiver::onNewConnection, this, + std::placeholders::_1)), + prefix_(prefix), + ipv6_first_word_(ipv6_first_word), + prefix_hash_(generatePrefix(prefix_, ipv6_first_word_)), + forwarder_config_( + thread_.getIoService(), + [this](std::error_code ec) { + if (!ec) { + listener_.doAccept(); + for (int i = 0; i < 10; i++) { + http_clients_.emplace_back( + new HTTPClientConnectionCallback(*this, thread_)); + } + } + }), + stopped_(false) { + forwarder_config_.tryToConnectToForwarder(); +} + +void TcpReceiver::stop() { + thread_.add([this]() { + stopped_ = true; + + /* Stop the listener */ + listener_.stop(); + + /* Close connection with forwarder */ + forwarder_config_.close(); + + /* Stop the used http clients */ + for (auto& client : used_http_clients_) { + client->stop(); + } + + /* Delete unused clients */ + for (auto& client : http_clients_) { + delete client; + } + }); +} + +void TcpReceiver::onClientDisconnect(HTTPClientConnectionCallback* client) { + if (stopped_) { + delete client; + return; + } + + http_clients_.emplace_front(client); + used_http_clients_.erase(client); +} + +void TcpReceiver::onNewConnection(asio::ip::tcp::socket&& socket) { + if (http_clients_.size() == 0) { + // Create new HTTPClientConnectionCallback + TRANSPORT_LOGD("Creating new HTTPClientConnectionCallback."); + http_clients_.emplace_back( + new HTTPClientConnectionCallback(*this, thread_)); + } + + // Get new HTTPClientConnectionCallback + HTTPClientConnectionCallback* c = http_clients_.front(); + http_clients_.pop_front(); + + // Set http session + c->setHttpSession(std::move(socket)); + + // Move it to used clients + used_http_clients_.insert(c); +} + +void HTTPProxy::setupSignalHandler() { + signals_.async_wait([this](const std::error_code& ec, int signal_number) { + if (!ec) { + TRANSPORT_LOGI("Received signal %d. Stopping gracefully.", signal_number); + stop(); + } + }); +} + +void HTTPProxy::stop() { + for (auto& receiver : receivers_) { + receiver->stop(); + } + + for (auto& receiver : receivers_) { + receiver->stopAndJoinThread(); + } + + signals_.cancel(); +} + +HTTPProxy::HTTPProxy(ClientParams& params, std::size_t n_thread) + : signals_(main_io_context_, SIGINT, SIGQUIT) { + for (uint16_t i = 0; i < n_thread; i++) { + // icn_receivers_.emplace_back(std::make_unique<IcnReceiver>(icn_params)); + receivers_.emplace_back(std::make_unique<TcpReceiver>( + params.tcp_listen_port, params.prefix, params.first_ipv6_word)); + } + + setupSignalHandler(); +} + +HTTPProxy::HTTPProxy(ServerParams& params, std::size_t n_thread) + : signals_(main_io_context_, SIGINT, SIGQUIT) { + for (uint16_t i = 0; i < n_thread; i++) { + receivers_.emplace_back(std::make_unique<IcnReceiver>( + params.prefix, params.first_ipv6_word, params.origin_address, + params.origin_port, params.cache_size, params.mtu, + params.content_lifetime, params.manifest)); + } + + setupSignalHandler(); +} + +} // namespace transport diff --git a/apps/http-proxy/src/ATSConnector.cc b/apps/http-proxy/src/http_session.cc index a9b889941..6b91c12c3 100644 --- a/apps/http-proxy/src/ATSConnector.cc +++ b/apps/http-proxy/src/http_session.cc @@ -13,48 +13,95 @@ * limitations under the License. */ -#include "ATSConnector.h" -#include "HTTP1.xMessageFastParser.h" - +#include <hicn/http-proxy/http_proxy.h> #include <hicn/transport/utils/branch_prediction.h> #include <hicn/transport/utils/log.h> + #include <iostream> namespace transport { -ATSConnector::ATSConnector(asio::io_service &io_service, - std::string &ip_address, std::string &port, - ContentReceivedCallback receive_callback, - OnReconnect on_reconnect_callback) +HTTPSession::HTTPSession(asio::io_service &io_service, std::string &ip_address, + std::string &port, + ContentReceivedCallback receive_callback, + OnConnectionClosed on_connection_closed_callback, + bool client) : io_service_(io_service), socket_(io_service_), resolver_(io_service_), endpoint_iterator_(resolver_.resolve({ip_address, port})), timer_(io_service), + reverse_(client), is_reconnection_(false), data_available_(false), content_length_(0), is_last_chunk_(false), chunked_(false), receive_callback_(receive_callback), - on_reconnect_callback_(on_reconnect_callback) { + on_connection_closed_callback_(on_connection_closed_callback) { input_buffer_.prepare(buffer_size + 2048); state_ = ConnectorState::CONNECTING; + + if (reverse_) { + header_info_ = std::make_unique<RequestMetadata>(); + } else { + header_info_ = std::make_unique<ResponseMetadata>(); + } + doConnect(); } -ATSConnector::~ATSConnector() {} +HTTPSession::HTTPSession(asio::ip::tcp::socket socket, + ContentReceivedCallback receive_callback, + OnConnectionClosed on_connection_closed_callback, + bool client) + : +#if ((ASIO_VERSION / 100 % 1000) < 12) + io_service_(socket.get_io_service()), +#else + io_service_((asio::io_context &)(socket.get_executor().context())), +#endif + socket_(std::move(socket)), + resolver_(io_service_), + timer_(io_service_), + reverse_(client), + is_reconnection_(false), + data_available_(false), + content_length_(0), + is_last_chunk_(false), + chunked_(false), + receive_callback_(receive_callback), + on_connection_closed_callback_(on_connection_closed_callback) { + input_buffer_.prepare(buffer_size + 2048); + state_ = ConnectorState::CONNECTED; + asio::ip::tcp::no_delay noDelayOption(true); + socket_.set_option(noDelayOption); -void ATSConnector::send(const uint8_t *packet, std::size_t len, - ContentSentCallback &&content_sent) { - asio::async_write( - socket_, asio::buffer(packet, len), - [content_sent = std::move(content_sent)]( - std::error_code ec, std::size_t /*length*/) { content_sent(); }); + if (reverse_) { + header_info_ = std::make_unique<RequestMetadata>(); + } else { + header_info_ = std::make_unique<ResponseMetadata>(); + } + doReadHeader(); } -void ATSConnector::send(utils::MemBuf *buffer, - ContentSentCallback &&content_sent) { +HTTPSession::~HTTPSession() {} + +void HTTPSession::send(const uint8_t *packet, std::size_t len, + ContentSentCallback &&content_sent) { + io_service_.dispatch([this, packet, len, content_sent]() { + asio::async_write(socket_, asio::buffer(packet, len), + [content_sent = std::move(content_sent)]( + std::error_code ec, std::size_t /*length*/) { + if (!ec) { + content_sent(); + } + }); + }); +} + +void HTTPSession::send(utils::MemBuf *buffer, + ContentSentCallback &&content_sent) { io_service_.dispatch([this, buffer, callback = std::move(content_sent)]() { bool write_in_progress = !write_msgs_.empty(); write_msgs_.emplace_back(std::unique_ptr<utils::MemBuf>(buffer), @@ -70,24 +117,25 @@ void ATSConnector::send(utils::MemBuf *buffer, }); } -void ATSConnector::close() { +void HTTPSession::close() { if (state_ != ConnectorState::CLOSED) { state_ = ConnectorState::CLOSED; if (socket_.is_open()) { - socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); + // socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); socket_.close(); // on_disconnect_callback_(); } } } -void ATSConnector::doWrite() { +void HTTPSession::doWrite() { auto &buffer = write_msgs_.front().first; asio::async_write(socket_, asio::buffer(buffer->data(), buffer->length()), [this](std::error_code ec, std::size_t length) { if (TRANSPORT_EXPECT_FALSE(!ec)) { - TRANSPORT_LOGD("Content successfully sent!"); + TRANSPORT_LOGD("Content successfully sent! %zu", + length); write_msgs_.front().second(); write_msgs_.pop_front(); if (!write_msgs_.empty()) { @@ -99,12 +147,14 @@ void ATSConnector::doWrite() { }); } // namespace transport -void ATSConnector::handleRead(std::error_code ec, std::size_t length) { +void HTTPSession::handleRead(std::error_code ec, std::size_t length) { if (TRANSPORT_EXPECT_TRUE(!ec)) { content_length_ -= length; const uint8_t *buffer = asio::buffer_cast<const uint8_t *>(input_buffer_.data()); - receive_callback_(buffer, input_buffer_.size(), !content_length_, false); + bool is_last = chunked_ ? (is_last_chunk_ ? !content_length_ : false) + : !content_length_; + receive_callback_(buffer, input_buffer_.size(), is_last, false, nullptr); input_buffer_.consume(input_buffer_.size()); if (!content_length_) { @@ -117,7 +167,7 @@ void ATSConnector::handleRead(std::error_code ec, std::size_t length) { auto to_read = content_length_ >= buffer_size ? buffer_size : content_length_; asio::async_read(socket_, input_buffer_, asio::transfer_exactly(to_read), - std::bind(&ATSConnector::handleRead, this, + std::bind(&HTTPSession::handleRead, this, std::placeholders::_1, std::placeholders::_2)); } } else if (ec == asio::error::eof) { @@ -126,8 +176,8 @@ void ATSConnector::handleRead(std::error_code ec, std::size_t length) { } } -void ATSConnector::doReadBody(std::size_t body_size, - std::size_t additional_bytes) { +void HTTPSession::doReadBody(std::size_t body_size, + std::size_t additional_bytes) { auto bytes_to_read = body_size > additional_bytes ? (body_size - additional_bytes) : 0; @@ -140,14 +190,16 @@ void ATSConnector::doReadBody(std::size_t body_size, if (to_read > 0) { content_length_ = bytes_to_read; asio::async_read(socket_, input_buffer_, asio::transfer_exactly(to_read), - std::bind(&ATSConnector::handleRead, this, + std::bind(&HTTPSession::handleRead, this, std::placeholders::_1, std::placeholders::_2)); } else { - const uint8_t *buffer = - asio::buffer_cast<const uint8_t *>(input_buffer_.data()); - receive_callback_(buffer, body_size, chunked_ ? is_last_chunk_ : !to_read, - false); - input_buffer_.consume(body_size); + if (body_size) { + const uint8_t *buffer = + asio::buffer_cast<const uint8_t *>(input_buffer_.data()); + receive_callback_(buffer, body_size, chunked_ ? is_last_chunk_ : !to_read, + false, nullptr); + input_buffer_.consume(body_size); + } if (!chunked_ || is_last_chunk_) { doReadHeader(); @@ -157,7 +209,7 @@ void ATSConnector::doReadBody(std::size_t body_size, } } -void ATSConnector::doReadChunkedHeader() { +void HTTPSession::doReadChunkedHeader() { asio::async_read_until( socket_, input_buffer_, "\r\n", [this](std::error_code ec, std::size_t length) { @@ -176,14 +228,17 @@ void ATSConnector::doReadChunkedHeader() { }); } -void ATSConnector::doReadHeader() { +void HTTPSession::doReadHeader() { asio::async_read_until( socket_, input_buffer_, "\r\n\r\n", [this](std::error_code ec, std::size_t length) { if (TRANSPORT_EXPECT_TRUE(!ec)) { const uint8_t *buffer = asio::buffer_cast<const uint8_t *>(input_buffer_.data()); - auto headers = HTTPMessageFastParser::getHeaders(buffer, length); + HTTPMessageFastParser::getHeaders(buffer, length, reverse_, + header_info_.get()); + + auto &headers = header_info_->headers; // Try to get content length, if available auto it = headers.find(HTTPMessageFastParser::content_length); @@ -199,7 +254,8 @@ void ATSConnector::doReadHeader() { } } - receive_callback_(buffer, length, !size && !chunked_, true); + receive_callback_(buffer, length, !size && !chunked_, true, + header_info_.get()); auto additional_bytes = input_buffer_.size() - length; input_buffer_.consume(length); @@ -215,23 +271,25 @@ void ATSConnector::doReadHeader() { }); } -void ATSConnector::tryReconnection() { - TRANSPORT_LOGD("Connection lost. Trying to reconnect...\n"); - if (state_ == ConnectorState::CONNECTED) { - state_ = ConnectorState::CONNECTING; - is_reconnection_ = true; - io_service_.post([this]() { - if (socket_.is_open()) { - // socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); - socket_.close(); - } - startConnectionTimer(); - doConnect(); - }); +void HTTPSession::tryReconnection() { + if (on_connection_closed_callback_(socket_)) { + if (state_ == ConnectorState::CONNECTED) { + TRANSPORT_LOGD("Connection lost. Trying to reconnect...\n"); + state_ = ConnectorState::CONNECTING; + is_reconnection_ = true; + io_service_.post([this]() { + if (socket_.is_open()) { + // socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); + socket_.close(); + } + startConnectionTimer(); + doConnect(); + }); + } } } -void ATSConnector::doConnect() { +void HTTPSession::doConnect() { asio::async_connect(socket_, endpoint_iterator_, [this](std::error_code ec, tcp::resolver::iterator) { if (!ec) { @@ -263,17 +321,17 @@ void ATSConnector::doConnect() { }); } -bool ATSConnector::checkConnected() { +bool HTTPSession::checkConnected() { return state_ == ConnectorState::CONNECTED; } -void ATSConnector::startConnectionTimer() { +void HTTPSession::startConnectionTimer() { timer_.expires_from_now(std::chrono::seconds(10)); timer_.async_wait( - std::bind(&ATSConnector::handleDeadline, this, std::placeholders::_1)); + std::bind(&HTTPSession::handleDeadline, this, std::placeholders::_1)); } -void ATSConnector::handleDeadline(const std::error_code &ec) { +void HTTPSession::handleDeadline(const std::error_code &ec) { if (!ec) { io_service_.post([this]() { socket_.close(); diff --git a/apps/http-proxy/src/IcnReceiver.cc b/apps/http-proxy/src/icn_receiver.cc index 24b0eb5dc..23e5b5623 100644 --- a/apps/http-proxy/src/IcnReceiver.cc +++ b/apps/http-proxy/src/icn_receiver.cc @@ -13,9 +13,9 @@ * limitations under the License. */ -#include "IcnReceiver.h" -#include "HTTP1.xMessageFastParser.h" - +#include <hicn/http-proxy/http_1x_message_fast_parser.h> +#include <hicn/http-proxy/icn_receiver.h> +#include <hicn/http-proxy/utils.h> #include <hicn/transport/core/interest.h> #include <hicn/transport/http/default_values.h> #include <hicn/transport/utils/hash.h> @@ -26,56 +26,31 @@ namespace transport { -core::Prefix generatePrefix(const std::string& prefix_url, - std::string& first_ipv6_word) { - const char* str = prefix_url.c_str(); - uint16_t pos = 0; - - if (strncmp("http://", str, 7) == 0) { - pos = 7; - } else if (strncmp("https://", str, 8) == 0) { - pos = 8; - } - - str += pos; - - uint32_t locator_hash = utils::hash::fnv32_buf(str, strlen(str)); - - std::stringstream stream; - stream << first_ipv6_word << ":0"; - - for (uint16_t* word = (uint16_t*)&locator_hash; - std::size_t(word) < (std::size_t(&locator_hash) + sizeof(locator_hash)); - word++) { - stream << ":" << std::hex << *word; - } - - stream << "::0"; - - return core::Prefix(stream.str(), 64); -} - AsyncConsumerProducer::AsyncConsumerProducer( - const std::string& prefix, std::string& ip_address, std::string& port, - std::string& cache_size, std::string& mtu, std::string& first_ipv6_word, - unsigned long default_lifetime, bool manifest) - : prefix_(generatePrefix(prefix, first_ipv6_word)), + asio::io_service& io_service, const std::string& prefix, + const std::string& first_ipv6_word, const std::string& origin_address, + const std::string& origin_port, const std::string& cache_size, + const std::string& mtu, const std::string& content_lifetime, bool manifest) + : prefix_(core::Prefix(generatePrefix(prefix, first_ipv6_word), 64)), + io_service_(io_service), + external_io_service_(true), producer_socket_(), - ip_address_(ip_address), - port_(port), + ip_address_(origin_address), + port_(origin_port), cache_size_(std::stoul(cache_size)), mtu_(std::stoul(mtu)), request_counter_(0), - signals_(io_service_, SIGINT, SIGQUIT), connector_(io_service_, ip_address_, port_, std::bind(&AsyncConsumerProducer::publishContent, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4), - [this]() { + [this](asio::ip::tcp::socket& socket) -> bool { std::queue<interface::PublicationOptions> empty; std::swap(response_name_queue_, empty); + + return true; }), - default_content_lifetime_(default_lifetime) { + default_content_lifetime_(std::stoul(content_lifetime)) { int ret = producer_socket_.setSocketOption( interface::GeneralTransportOptions::OUTPUT_BUFFER_SIZE, cache_size_); @@ -98,15 +73,6 @@ AsyncConsumerProducer::AsyncConsumerProducer( } producer_socket_.registerPrefix(prefix_); - - // Let the main thread to catch SIGINT and SIGQUIT - signals_.async_wait( - [this](const std::error_code& errorCode, int signal_number) { - TRANSPORT_LOGI("Number of requests processed by plugin: %lu", - (unsigned long)request_counter_); - producer_socket_.stop(); - connector_.close(); - }); } void AsyncConsumerProducer::start() { @@ -116,7 +82,19 @@ void AsyncConsumerProducer::start() { void AsyncConsumerProducer::run() { start(); - io_service_.run(); + + if (!external_io_service_) { + io_service_.run(); + } +} + +void AsyncConsumerProducer::stop() { + io_service_.post([this]() { + TRANSPORT_LOGI("Number of requests processed by plugin: %lu", + (unsigned long)request_counter_); + producer_socket_.stop(); + connector_.close(); + }); } void AsyncConsumerProducer::doReceive() { @@ -124,11 +102,13 @@ void AsyncConsumerProducer::doReceive() { interface::ProducerCallbacksOptions::CACHE_MISS, [this](interface::ProducerSocket& producer, interface::Interest& interest) { - // core::Name n(interest.getWritableName(), true); - io_service_.post(std::bind( - &AsyncConsumerProducer::manageIncomingInterest, this, - interest.getWritableName(), interest.acquireMemBufReference(), - interest.getPayload().release())); + if (interest.payloadSize() > 0) { + // Interest may contain http request + io_service_.post(std::bind( + &AsyncConsumerProducer::manageIncomingInterest, this, + interest.getWritableName(), interest.acquireMemBufReference(), + interest.getPayload().release())); + } }); producer_socket_.connect(); @@ -141,16 +121,15 @@ void AsyncConsumerProducer::manageIncomingInterest( auto _it = chunk_number_map_.find(name); auto _end = chunk_number_map_.end(); - std::cout << "Received interest " << seg << std::endl; - if (_it != _end) { if (_it->second.second) { - // Content is in production + TRANSPORT_LOGD( + "Content is in production, interests will be satisfied shortly."); return; } if (seg >= _it->second.first) { - TRANSPORT_LOGI( + TRANSPORT_LOGD( "Ignoring interest with name %s for a content object which does not " "exist. (Request: %u, max: %u)", name.toString().c_str(), (uint32_t)seg, (uint32_t)_it->second.first); @@ -158,8 +137,6 @@ void AsyncConsumerProducer::manageIncomingInterest( } } - std::cout << "Received interest " << seg << std::endl; - bool is_mpd = HTTPMessageFastParser::isMpdRequest(payload->data(), payload->length()); @@ -212,7 +189,7 @@ void AsyncConsumerProducer::publishContent(const uint8_t* data, } it->second.first += - producer_socket_.produce(name, data, size, is_last, start_suffix); + producer_socket_.produceStream(name, data, size, is_last, start_suffix); if (is_last) { it->second.second = false; |