From 5d8156ea4c34f9a3cb986da16a71faebfb2add6b Mon Sep 17 00:00:00 2001 From: Mauro Sardara Date: Tue, 2 Jun 2020 18:52:39 +0200 Subject: [HICN-622] Add stop() functionality to http proxy. Signed-off-by: Mauro Sardara Change-Id: I9091cd8ef0f9da869b886541a0116adf3f30e6b9 Signed-off-by: Angelo Mantellini --- apps/http-proxy/CMakeLists.txt | 13 +- .../includes/hicn/http-proxy/CMakeLists.txt | 39 ++++ .../hicn/http-proxy/HTTP1.xMessageFastParser.h | 67 +++++++ .../includes/hicn/http-proxy/forwarder_config.h | 200 ++++++++++++++++++++ .../includes/hicn/http-proxy/forwarder_interface.h | 107 +++++++++++ .../includes/hicn/http-proxy/http_proxy.h | 204 +++++++++++++++++++++ .../includes/hicn/http-proxy/http_session.h | 136 ++++++++++++++ .../includes/hicn/http-proxy/icn_receiver.h | 98 ++++++++++ apps/http-proxy/includes/hicn/http-proxy/utils.h | 51 ++++++ apps/http-proxy/main.cc | 13 +- apps/http-proxy/src/HTTP1.xMessageFastParser.cc | 4 +- apps/http-proxy/src/HTTP1.xMessageFastParser.h | 67 ------- apps/http-proxy/src/forwarder_config.h | 193 ------------------- apps/http-proxy/src/forwarder_interface.cc | 22 ++- apps/http-proxy/src/forwarder_interface.h | 105 ----------- apps/http-proxy/src/http_proxy.cc | 66 ++++++- apps/http-proxy/src/http_proxy.h | 188 ------------------- apps/http-proxy/src/http_session.cc | 3 +- apps/http-proxy/src/http_session.h | 136 -------------- apps/http-proxy/src/icn_receiver.cc | 25 ++- apps/http-proxy/src/icn_receiver.h | 96 ---------- apps/http-proxy/src/utils.h | 51 ------ libtransport/CMakeLists.txt | 3 - .../includes/hicn/transport/utils/event_thread.h | 8 +- 24 files changed, 1004 insertions(+), 891 deletions(-) create mode 100644 apps/http-proxy/includes/hicn/http-proxy/CMakeLists.txt create mode 100644 apps/http-proxy/includes/hicn/http-proxy/HTTP1.xMessageFastParser.h create mode 100644 apps/http-proxy/includes/hicn/http-proxy/forwarder_config.h create mode 100644 apps/http-proxy/includes/hicn/http-proxy/forwarder_interface.h create mode 100644 apps/http-proxy/includes/hicn/http-proxy/http_proxy.h create mode 100644 apps/http-proxy/includes/hicn/http-proxy/http_session.h create mode 100644 apps/http-proxy/includes/hicn/http-proxy/icn_receiver.h create mode 100644 apps/http-proxy/includes/hicn/http-proxy/utils.h delete mode 100644 apps/http-proxy/src/HTTP1.xMessageFastParser.h delete mode 100644 apps/http-proxy/src/forwarder_config.h delete mode 100644 apps/http-proxy/src/forwarder_interface.h delete mode 100644 apps/http-proxy/src/http_proxy.h delete mode 100644 apps/http-proxy/src/http_session.h delete mode 100644 apps/http-proxy/src/icn_receiver.h delete mode 100644 apps/http-proxy/src/utils.h diff --git a/apps/http-proxy/CMakeLists.txt b/apps/http-proxy/CMakeLists.txt index 44f6deea6..d3155a0d4 100644 --- a/apps/http-proxy/CMakeLists.txt +++ b/apps/http-proxy/CMakeLists.txt @@ -32,19 +32,11 @@ set(LIB_SOURCE_FILES src/forwarder_interface.cc ) -set(LIB_SERVER_HEADER_FILES - src/icn_receiver.h - src/http_session.h - src/http_proxy.h - src/HTTP1.xMessageFastParser.h - src/forwarder_interface.h - src/forwarder_config.h -) - set(APP_SOURCE_FILES main.cc ) +add_subdirectory(includes/hicn/http-proxy) set(LIBHTTP_PROXY hicnhttpproxy) set(LIBHTTP_PROXY_STATIC ${LIBHTTP_PROXY}.static) @@ -57,7 +49,8 @@ build_library(${LIBHTTP_PROXY} SOURCES ${LIB_SOURCE_FILES} LINK_LIBRARIES ${LIBRARIES} DEPENDS ${DEPENDENCIES} - INCLUDE_DIRS ${LIBTRANSPORT_INCLUDE_DIRS} ${LIBHICNCTRL_INCLUDE_DIRS} + INSTALL_HEADERS ${LIBPROXY_TO_INSTALL_HEADER_FILES} + INCLUDE_DIRS ${LIBTRANSPORT_INCLUDE_DIRS} ${LIBHICNCTRL_INCLUDE_DIRS} ${LIBPROXY_INCLUDE_DIRS} COMPONENT ${HICN_APPS} LINK_FLAGS ${LINK_FLAGS} DEFINITIONS ${COMPILER_DEFINITIONS} diff --git a/apps/http-proxy/includes/hicn/http-proxy/CMakeLists.txt b/apps/http-proxy/includes/hicn/http-proxy/CMakeLists.txt new file mode 100644 index 000000000..5bc7d45fc --- /dev/null +++ b/apps/http-proxy/includes/hicn/http-proxy/CMakeLists.txt @@ -0,0 +1,39 @@ +# Copyright (c) 2017-2019 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cmake_minimum_required(VERSION 3.5 FATAL_ERROR) + +list(APPEND HEADER_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/forwarder_config.h + ${CMAKE_CURRENT_SOURCE_DIR}/forwarder_interface.h + ${CMAKE_CURRENT_SOURCE_DIR}/http_proxy.h + ${CMAKE_CURRENT_SOURCE_DIR}/http_session.h + ${CMAKE_CURRENT_SOURCE_DIR}/HTTP1.xMessageFastParser.h + ${CMAKE_CURRENT_SOURCE_DIR}/icn_receiver.h + ${CMAKE_CURRENT_SOURCE_DIR}/utils.h +) + +set(HEADER_FILES ${HEADER_FILES} PARENT_SCOPE) + +set(LIBPROXY_INCLUDE_DIRS + ${CMAKE_CURRENT_SOURCE_DIR}/../.. "" + CACHE INTERNAL + "" FORCE +) + +set(LIBPROXY_TO_INSTALL_HEADER_FILES + ${HEADER_FILES} "" + CACHE INTERNAL + "" FORCE +) + diff --git a/apps/http-proxy/includes/hicn/http-proxy/HTTP1.xMessageFastParser.h b/apps/http-proxy/includes/hicn/http-proxy/HTTP1.xMessageFastParser.h new file mode 100644 index 000000000..7c035c83b --- /dev/null +++ b/apps/http-proxy/includes/hicn/http-proxy/HTTP1.xMessageFastParser.h @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include +#include + +using transport::http::HTTPHeaders; + +namespace transport { +struct Metadata; +} + +class HTTPMessageFastParser { + public: + static constexpr char http_ok[] = + "HTTP/1.1 200 OK\r\n" + "Access-Control-Allow-Origin: *\r\n" + "Connection: close\r\n" + "Content-Length: 0\r\n\r\n"; + + static constexpr char http_cors[] = + "HTTP/1.1 200 OK\r\n" + "Date: %s\r\n" + "Connection: close\r\n" + "Content-Length: 0\r\n" + "Access-Control-Allow-Origin: *\r\n" + "Access-Control-Allow-Methods: GET\r\n" + "Access-Control-Allow-Headers: hicn\r\n" + "Access-Control-Max-Age: 1800\r\n\r\n"; + + static constexpr char http_failed[] = + "HTTP/1.1 500 Internal Server Error\r\n" + "Date: %s\r\n" + "Content-Length: 0\r\nConnection: " + "close\r\n\r\n"; + + static void getHeaders(const uint8_t* headers, std::size_t length, + bool request, transport::Metadata* metadata); + static std::size_t hasBody(const uint8_t* headers, std::size_t length); + static bool isMpdRequest(const uint8_t* headers, std::size_t length); + static uint32_t parseCacheControl(const uint8_t* headers, std::size_t length); + + static std::string numbers; + static std::string content_length; + static std::string transfer_encoding; + static std::string chunked; + static std::string cache_control; + static std::string connection; + static std::string mpd; + static std::string separator; +}; diff --git a/apps/http-proxy/includes/hicn/http-proxy/forwarder_config.h b/apps/http-proxy/includes/hicn/http-proxy/forwarder_config.h new file mode 100644 index 000000000..19c96a9e3 --- /dev/null +++ b/apps/http-proxy/includes/hicn/http-proxy/forwarder_config.h @@ -0,0 +1,200 @@ +/* + * Copyright (c) 2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include +#include +#include +#include + +#include "forwarder_interface.h" + + +#define RETRY_INTERVAL 300 + +namespace transport { + +static constexpr char server_header[] = "server"; +static constexpr char prefix_header[] = "prefix"; +static constexpr char port_header[] = "port"; + +using OnForwarderConfiguredCallback = std::function; + +class ForwarderConfig { + public: + using ListenerRetrievedCallback = std::function; + + template + ForwarderConfig(asio::io_service& io_service, Callback&& callback) + : forwarder_interface_(io_service), + resolver_(io_service), + retx_count_(0), + timer_(io_service), + hicn_listen_port_(~0), + listener_retrieved_callback_(std::forward(callback)) {} + + void close() { + timer_.cancel(); + resolver_.cancel(); + forwarder_interface_.close(); + } + + void tryToConnectToForwarder() { + doTryToConnectToForwarder(std::make_error_code(std::errc(0))); + } + + void doTryToConnectToForwarder(std::error_code ec) { + if (!ec) { + // ec == 0 --> timer expired + int ret = forwarder_interface_.connectToForwarder(); + if (ret < 0) { + // We were not able to connect to the local forwarder. Do not give up + // and retry. + TRANSPORT_LOGE("Could not connect to local forwarder. Retrying."); + + timer_.expires_from_now(std::chrono::milliseconds(RETRY_INTERVAL)); + timer_.async_wait(std::bind(&ForwarderConfig::doTryToConnectToForwarder, + this, std::placeholders::_1)); + } else { + timer_.cancel(); + retx_count_ = 0; + doGetMainListener(std::make_error_code(std::errc(0))); + } + } else { + TRANSPORT_LOGD("Timer for re-trying forwarder connection canceled."); + } + } + + void doGetMainListener(std::error_code ec) { + if (!ec) { + // ec == 0 --> timer expired + int ret = forwarder_interface_.getMainListenerPort(); + if (ret <= 0) { + // Since without the main listener of the forwarder the proxy cannot + // work, we can stop the program here until we get the listener port. + TRANSPORT_LOGE( + "Could not retrieve main listener port from the forwarder. " + "Retrying."); + + timer_.expires_from_now(std::chrono::milliseconds(RETRY_INTERVAL)); + timer_.async_wait(std::bind(&ForwarderConfig::doGetMainListener, this, + std::placeholders::_1)); + } else { + timer_.cancel(); + retx_count_ = 0; + hicn_listen_port_ = uint16_t(ret); + listener_retrieved_callback_(std::make_error_code(std::errc(0))); + } + } else { + TRANSPORT_LOGI("Timer for retrieving main hicn listener canceled."); + } + } + + template + TRANSPORT_ALWAYS_INLINE bool parseHicnHeader(std::string& header, + Callback&& callback) { + std::stringstream ss(header); + route_info_t* ret = new route_info_t(); + std::string port_string; + + while (ss.good()) { + std::string substr; + getline(ss, substr, ','); + + if (TRANSPORT_EXPECT_FALSE(substr.empty())) { + continue; + } + + utils::trim(substr); + auto it = std::find_if(substr.begin(), substr.end(), + [](int ch) { return ch == '='; }); + if (it != std::end(substr)) { + auto key = std::string(substr.begin(), it); + auto value = std::string(it + 1, substr.end()); + + if (key == server_header) { + ret->remote_addr = value; + } else if (key == prefix_header) { + auto it = std::find_if(value.begin(), value.end(), + [](int ch) { return ch == '/'; }); + + if (it != std::end(value)) { + ret->route_addr = std::string(value.begin(), it); + ret->route_len = std::stoul(std::string(it + 1, value.end())); + } else { + return false; + } + } else if (key == port_header) { + ret->remote_port = std::stoul(value); + port_string = value; + } else { + // Header not recognized + return false; + } + } + } + + /* + * Resolve server address + */ + auto results = + resolver_.resolve({ret->remote_addr, port_string, + asio::ip::resolver_query_base::numeric_service}); + +#if ((ASIO_VERSION / 100 % 1000) < 12) + asio::ip::udp::resolver::iterator end; + auto& it = results; + while (it != end) { +#else + for (auto it = results.begin(); it != results.end(); it++) { +#endif + if (it->endpoint().address().is_v4()) { + // Use this v4 address to configure the forwarder. + ret->remote_addr = it->endpoint().address().to_string(); + ret->family = AF_INET; + std::string _prefix = ret->route_addr; + forwarder_interface_.createFaceAndRoute( + RouteInfoPtr(ret), [callback = std::forward(callback), + configured_prefix = std::move(_prefix)]( + uint32_t route_id, bool result) { + callback(result, configured_prefix); + }); + + return true; + } +#if ((ASIO_VERSION / 100 % 1000) < 12) + it++; +#endif + } + + return false; + } + + private: + ForwarderInterface forwarder_interface_; + asio::ip::udp::resolver resolver_; + std::uint32_t retx_count_; + asio::steady_timer timer_; + uint16_t hicn_listen_port_; + ListenerRetrievedCallback listener_retrieved_callback_; +}; // namespace transport + +} // namespace transport \ No newline at end of file diff --git a/apps/http-proxy/includes/hicn/http-proxy/forwarder_interface.h b/apps/http-proxy/includes/hicn/http-proxy/forwarder_interface.h new file mode 100644 index 000000000..54941a4ba --- /dev/null +++ b/apps/http-proxy/includes/hicn/http-proxy/forwarder_interface.h @@ -0,0 +1,107 @@ +/* + * Copyright (c) 2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +extern "C" { +#include +#include +} + +#ifndef ASIO_STANDALONE +#define ASIO_STANDALONE 1 +#endif +#include +#include +#include +#include +#include + +namespace transport { + +typedef std::function SetRouteCallback; + +struct route_info_t { + int family; + std::string remote_addr; + uint16_t remote_port; + std::string route_addr; + uint8_t route_len; +}; + +using RouteInfoPtr = std::shared_ptr; + +class ForwarderInterface { + public: + ForwarderInterface(asio::io_service &io_service) + : external_ioservice_(io_service), + work_(std::make_unique(internal_ioservice_)), + sock_(nullptr), + thread_(std::make_unique( + [this]() { internal_ioservice_.run(); })), + check_routes_timer_(nullptr), + pending_add_route_counter_(0), + route_id_(0), + closed_(false) {} + + ~ForwarderInterface(); + + int connectToForwarder(); + + void removeConnectedUserNow(uint32_t route_id); + + // to be called at the server + // at the client this creates a race condition + // and the program enters in a loop + void scheduleRemoveConnectedUser(uint32_t route_id); + + template + void createFaceAndRoute(RouteInfoPtr &&route_info, Callback &&callback) { + internal_ioservice_.post([this, _route_info = std::move(route_info), + _callback = std::forward(callback)]() { + pending_add_route_counter_++; + uint8_t max_try = 5; + auto timer = new asio::steady_timer(internal_ioservice_); + internalCreateFaceAndRoute(std::move(_route_info), max_try, timer, + std::move(_callback)); + }); + } + + int32_t getMainListenerPort(); + + void close(); + + private: + void internalRemoveConnectedUser(uint32_t route_id); + + void internalCreateFaceAndRoute(RouteInfoPtr route_info, uint8_t max_try, + asio::steady_timer *timer, + SetRouteCallback callback); + + int tryToCreateFaceAndRoute(route_info_t *route_info); + + asio::io_service &external_ioservice_; + asio::io_service internal_ioservice_; + std::unique_ptr work_; + hc_sock_t *sock_; + std::unique_ptr thread_; + std::unordered_map route_status_; + std::unique_ptr check_routes_timer_; + uint32_t pending_add_route_counter_; + uint32_t route_id_; + bool closed_; +}; + +} // namespace transport diff --git a/apps/http-proxy/includes/hicn/http-proxy/http_proxy.h b/apps/http-proxy/includes/hicn/http-proxy/http_proxy.h new file mode 100644 index 000000000..6b5b1c3e3 --- /dev/null +++ b/apps/http-proxy/includes/hicn/http-proxy/http_proxy.h @@ -0,0 +1,204 @@ +/* + * Copyright (c) 2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "forwarder_config.h" +#include "http_session.h" +#include "icn_receiver.h" + +#define ASIO_STANDALONE +#include +#include +#include + +class TcpListener { + public: + using AcceptCallback = std::function; + + TcpListener(asio::io_service& io_service, short port, AcceptCallback callback) + : acceptor_(io_service, + asio::ip::tcp::endpoint(asio::ip::tcp::v4(), port)), +#if ((ASIO_VERSION / 100 % 1000) < 12) + socket_(io_service), +#endif + callback_(callback) { + } + + public: + void doAccept() { +#if ((ASIO_VERSION / 100 % 1000) >= 12) + acceptor_.async_accept( + [this](std::error_code ec, asio::ip::tcp::socket socket) { +#else + acceptor_.async_accept(socket_, [this](std::error_code ec) { + auto socket = std::move(socket_); +#endif + if (!ec) { + callback_(std::move(socket)); + doAccept(); + } + }); + } + + void stop() { acceptor_.close(); } + + asio::ip::tcp::acceptor acceptor_; +#if ((ASIO_VERSION / 100 % 1000) < 12) + asio::ip::tcp::socket socket_; +#endif + AcceptCallback callback_; +}; + +namespace transport { + +class HTTPClientConnectionCallback; + +class Receiver { + public: + Receiver() : thread_() {} + virtual ~Receiver() = default; + void stopAndJoinThread() { thread_.stop(); } + virtual void stop() = 0; + + protected: + utils::EventThread thread_; +}; + +class TcpReceiver : public Receiver { + friend class HTTPClientConnectionCallback; + + public: + TcpReceiver(std::uint16_t port, const std::string& prefix, + const std::string& ipv6_first_word); + + void stop() override; + + private: + void onNewConnection(asio::ip::tcp::socket&& socket); + void onClientDisconnect(HTTPClientConnectionCallback* client); + + template + void parseHicnHeader(std::string& hicn_header, Callback&& callback) { + forwarder_config_.parseHicnHeader(hicn_header, + std::forward(callback)); + } + + TcpListener listener_; + std::string prefix_; + std::string ipv6_first_word_; + std::string prefix_hash_; + std::deque http_clients_; + std::unordered_set used_http_clients_; + ForwarderConfig forwarder_config_; + bool stopped_; +}; + +class IcnReceiver : public Receiver { + public: + template + IcnReceiver(Args&&... args) + : Receiver(), + icn_consum_producer_(thread_.getIoService(), + std::forward(args)...) { + icn_consum_producer_.run(); + } + + void stop() override { + thread_.add([this]() { + /* Stop the listener */ + icn_consum_producer_.stop(); + }); + } + + private: + AsyncConsumerProducer icn_consum_producer_; +}; + +class HTTPProxy { + public: + enum Server { CREATE }; + enum Client { WRAP_BUFFER }; + + struct CommonParams { + std::string prefix; + std::string first_ipv6_word; + + virtual void printParams() { std::cout << "Parameters: " << std::endl; }; + }; + + struct ClientParams : virtual CommonParams { + short tcp_listen_port; + void printParams() override { + std::cout << "Running HTTP/TCP -> HTTP/hICN proxy." << std::endl; + CommonParams::printParams(); + std::cout << "\t" + << "HTTP listen port: " << tcp_listen_port << std::endl; + std::cout << "\t" + << "Consumer Prefix: " << prefix << std::endl; + std::cout << "\t" + << "Prefix first word: " << first_ipv6_word << std::endl; + } + }; + + struct ServerParams : virtual CommonParams { + std::string origin_address; + std::string origin_port; + std::string cache_size; + std::string mtu; + std::string content_lifetime; + bool manifest; + + void printParams() override { + std::cout << "Running HTTP/hICN -> HTTP/TCP proxy." << std::endl; + CommonParams::printParams(); + std::cout << "\t" + << "Origin address: " << origin_address << std::endl; + std::cout << "\t" + << "Origin port: " << origin_port << std::endl; + std::cout << "\t" + << "Producer cache size: " << cache_size << std::endl; + std::cout << "\t" + << "hICN MTU: " << mtu << std::endl; + std::cout << "\t" + << "Default content lifetime: " << content_lifetime + << std::endl; + std::cout << "\t" + << "Producer Prefix: " << prefix << std::endl; + std::cout << "\t" + << "Prefix first word: " << first_ipv6_word << std::endl; + std::cout << "\t" + << "Use manifest: " << manifest << std::endl; + } + }; + + HTTPProxy(ClientParams& icn_params, std::size_t n_thread = 1); + HTTPProxy(ServerParams& icn_params, std::size_t n_thread = 1); + + void run() { main_io_context_.run(); } + void stop(); + + private: + void setupSignalHandler(); + + std::vector> receivers_; + asio::io_service main_io_context_; + asio::signal_set signals_; +}; + +} // namespace transport \ No newline at end of file diff --git a/apps/http-proxy/includes/hicn/http-proxy/http_session.h b/apps/http-proxy/includes/hicn/http-proxy/http_session.h new file mode 100644 index 000000000..05fdf62fa --- /dev/null +++ b/apps/http-proxy/includes/hicn/http-proxy/http_session.h @@ -0,0 +1,136 @@ +/* + * Copyright (c) 2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "HTTP1.xMessageFastParser.h" + +#define ASIO_STANDALONE +#include +#include +#include + +namespace transport { + +using asio::ip::tcp; + +struct Metadata; + +typedef std::function + ContentReceivedCallback; +typedef std::function OnConnectionClosed; +typedef std::function ContentSentCallback; +typedef std::deque< + std::pair, ContentSentCallback>> + BufferQueue; + +struct Metadata { + std::string http_version; + HTTPHeaders headers; +}; + +struct RequestMetadata : Metadata { + std::string method; + std::string path; +}; + +struct ResponseMetadata : Metadata { + std::string status_code; + std::string status_string; +}; + +class HTTPClientConnectionCallback; + +class HTTPSession { + friend class HTTPClientConnectionCallback; + static constexpr uint32_t buffer_size = 1024 * 512; + + enum class ConnectorState { + CLOSED, + CONNECTING, + CONNECTED, + }; + + public: + HTTPSession(asio::io_service &io_service, std::string &ip_address, + std::string &port, ContentReceivedCallback receive_callback, + OnConnectionClosed on_reconnect_callback, bool client = false); + + HTTPSession(asio::ip::tcp::socket socket, + ContentReceivedCallback receive_callback, + OnConnectionClosed on_reconnect_callback, bool client = true); + + ~HTTPSession(); + + void send(const uint8_t *buffer, std::size_t len, + ContentSentCallback &&content_sent = 0); + + void send(utils::MemBuf *buffer, ContentSentCallback &&content_sent); + + void close(); + + private: + void doConnect(); + + void doReadHeader(); + + void doReadBody(std::size_t body_size, std::size_t additional_bytes); + + void doReadChunkedHeader(); + + void doWrite(); + + bool checkConnected(); + + private: + void handleRead(std::error_code ec, std::size_t length); + void tryReconnection(); + void startConnectionTimer(); + void handleDeadline(const std::error_code &ec); + + asio::io_service &io_service_; + asio::ip::tcp::socket socket_; + asio::ip::tcp::resolver resolver_; + asio::ip::tcp::resolver::iterator endpoint_iterator_; + asio::steady_timer timer_; + + BufferQueue write_msgs_; + + asio::streambuf input_buffer_; + + bool reverse_; + bool is_reconnection_; + bool data_available_; + + std::size_t content_length_; + + // Chunked encoding + bool is_last_chunk_; + bool chunked_; + + ContentReceivedCallback receive_callback_; + OnConnectionClosed on_connection_closed_callback_; + + // HTTP headers + std::unique_ptr header_info_; + + // Connector state + ConnectorState state_; +}; + +} // namespace transport diff --git a/apps/http-proxy/includes/hicn/http-proxy/icn_receiver.h b/apps/http-proxy/includes/hicn/http-proxy/icn_receiver.h new file mode 100644 index 000000000..780037665 --- /dev/null +++ b/apps/http-proxy/includes/hicn/http-proxy/icn_receiver.h @@ -0,0 +1,98 @@ +/* + * Copyright (c) 2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +//#include "http_session.h" + +namespace transport { + +class AsyncConsumerProducer { + using SegmentProductionPair = std::pair; + using ResponseInfoMap = std::unordered_map; + using RequestQueue = std::queue; + + public: + explicit AsyncConsumerProducer( + asio::io_service& io_service, const std::string& prefix, + const std::string& first_ipv6_word, const std::string& origin_address, + const std::string& origin_port, const std::string& cache_size, + const std::string& mtu, const std::string& content_lifetime, + bool manifest); + + explicit AsyncConsumerProducer( + const std::string& prefix, const std::string& first_ipv6_word, + const std::string& origin_address, const std::string& origin_port, + const std::string& cache_size, const std::string& mtu, + const std::string& content_lifetime, bool manifest) + : AsyncConsumerProducer(internal_io_service_, prefix, first_ipv6_word, + origin_address, origin_port, cache_size, mtu, + content_lifetime, manifest) { + external_io_service_ = false; + } + + void run(); + + void stop(); + + private: + void start(); + + void doSend(); + + void doReceive(); + + void publishContent(const uint8_t* data, std::size_t size, + bool is_last = true, bool headers = false); + + void manageIncomingInterest(core::Name& name, core::Packet::MemBufPtr& packet, + utils::MemBuf* payload); + + core::Prefix prefix_; + asio::io_service& io_service_; + asio::io_service internal_io_service_; + bool external_io_service_; + interface::ProducerSocket producer_socket_; + + std::string ip_address_; + std::string port_; + uint32_t cache_size_; + uint32_t mtu_; + + uint64_t request_counter_; + + // std::unordered_map> + // connection_map_; + HTTPSession connector_; + + unsigned long default_content_lifetime_; + + // ResponseInfoMap --> max_seq_number + bool indicating whether request is in + // production + ResponseInfoMap chunk_number_map_; + RequestQueue response_name_queue_; +}; + +} // namespace transport diff --git a/apps/http-proxy/includes/hicn/http-proxy/utils.h b/apps/http-proxy/includes/hicn/http-proxy/utils.h new file mode 100644 index 000000000..d87c796d0 --- /dev/null +++ b/apps/http-proxy/includes/hicn/http-proxy/utils.h @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include +#include + +#pragma once + +TRANSPORT_ALWAYS_INLINE std::string generatePrefix( + const std::string& prefix_url, const std::string& first_ipv6_word) { + const char* str = prefix_url.c_str(); + uint16_t pos = 0; + + if (strncmp("http://", str, 7) == 0) { + pos = 7; + } else if (strncmp("https://", str, 8) == 0) { + pos = 8; + } + + str += pos; + + uint32_t locator_hash = utils::hash::fnv32_buf(str, strlen(str)); + + std::stringstream stream; + stream << first_ipv6_word << ":0"; + + for (uint16_t* word = (uint16_t*)&locator_hash; + std::size_t(word) < (std::size_t(&locator_hash) + sizeof(locator_hash)); + word++) { + stream << ":" << std::hex << *word; + } + + stream << "::"; + + return stream.str(); +} \ No newline at end of file diff --git a/apps/http-proxy/main.cc b/apps/http-proxy/main.cc index 8d407ba4c..d5052ef11 100644 --- a/apps/http-proxy/main.cc +++ b/apps/http-proxy/main.cc @@ -13,7 +13,7 @@ * limitations under the License. */ -#include "src/http_proxy.h" +#include using namespace transport; @@ -41,13 +41,13 @@ struct Params : HTTPProxy::ClientParams, HTTPProxy::ServerParams { << "N Threads: " << n_thread << std::endl; } - HTTPProxy instantiateProxyAsValue() { + HTTPProxy* instantiateProxyAsValue() { if (client) { HTTPProxy::ClientParams* p = dynamic_cast(this); - return transport::HTTPProxy(*p, n_thread); + return new transport::HTTPProxy(*p, n_thread); } else if (server) { HTTPProxy::ServerParams* p = dynamic_cast(this); - return transport::HTTPProxy(*p, n_thread); + return new transport::HTTPProxy(*p, n_thread); } else { throw std::runtime_error( "Proxy configured as client and server at the same time."); @@ -134,8 +134,9 @@ int main(int argc, char** argv) { } params.printParams(); - transport::HTTPProxy proxy = params.instantiateProxyAsValue(); - proxy.run(); + auto proxy = params.instantiateProxyAsValue(); + proxy->run(); + delete proxy; return 0; } \ No newline at end of file diff --git a/apps/http-proxy/src/HTTP1.xMessageFastParser.cc b/apps/http-proxy/src/HTTP1.xMessageFastParser.cc index ea942a463..c22abdc90 100644 --- a/apps/http-proxy/src/HTTP1.xMessageFastParser.cc +++ b/apps/http-proxy/src/HTTP1.xMessageFastParser.cc @@ -13,7 +13,7 @@ * limitations under the License. */ -#include "HTTP1.xMessageFastParser.h" +#include #include #include @@ -22,7 +22,7 @@ #include #include -#include "http_session.h" +#include constexpr char HTTPMessageFastParser::http_ok[]; constexpr char HTTPMessageFastParser::http_cors[]; diff --git a/apps/http-proxy/src/HTTP1.xMessageFastParser.h b/apps/http-proxy/src/HTTP1.xMessageFastParser.h deleted file mode 100644 index 7c035c83b..000000000 --- a/apps/http-proxy/src/HTTP1.xMessageFastParser.h +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright (c) 2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include - -#include -#include - -using transport::http::HTTPHeaders; - -namespace transport { -struct Metadata; -} - -class HTTPMessageFastParser { - public: - static constexpr char http_ok[] = - "HTTP/1.1 200 OK\r\n" - "Access-Control-Allow-Origin: *\r\n" - "Connection: close\r\n" - "Content-Length: 0\r\n\r\n"; - - static constexpr char http_cors[] = - "HTTP/1.1 200 OK\r\n" - "Date: %s\r\n" - "Connection: close\r\n" - "Content-Length: 0\r\n" - "Access-Control-Allow-Origin: *\r\n" - "Access-Control-Allow-Methods: GET\r\n" - "Access-Control-Allow-Headers: hicn\r\n" - "Access-Control-Max-Age: 1800\r\n\r\n"; - - static constexpr char http_failed[] = - "HTTP/1.1 500 Internal Server Error\r\n" - "Date: %s\r\n" - "Content-Length: 0\r\nConnection: " - "close\r\n\r\n"; - - static void getHeaders(const uint8_t* headers, std::size_t length, - bool request, transport::Metadata* metadata); - static std::size_t hasBody(const uint8_t* headers, std::size_t length); - static bool isMpdRequest(const uint8_t* headers, std::size_t length); - static uint32_t parseCacheControl(const uint8_t* headers, std::size_t length); - - static std::string numbers; - static std::string content_length; - static std::string transfer_encoding; - static std::string chunked; - static std::string cache_control; - static std::string connection; - static std::string mpd; - static std::string separator; -}; diff --git a/apps/http-proxy/src/forwarder_config.h b/apps/http-proxy/src/forwarder_config.h deleted file mode 100644 index 3d69c998e..000000000 --- a/apps/http-proxy/src/forwarder_config.h +++ /dev/null @@ -1,193 +0,0 @@ -/* - * Copyright (c) 2020 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include -#include -#include - -#include -#include -#include -#include - -#include "forwarder_interface.h" - -#define RETRY_INTERVAL 300 - -namespace transport { - -static constexpr char server_header[] = "server"; -static constexpr char prefix_header[] = "prefix"; -static constexpr char port_header[] = "port"; - -using OnForwarderConfiguredCallback = std::function; - -class ForwarderConfig { - public: - using ListenerRetrievedCallback = std::function; - - template - ForwarderConfig(asio::io_service& io_service, Callback&& callback) - : forwarder_interface_(io_service), - resolver_(io_service), - retx_count_(0), - timer_(io_service), - hicn_listen_port_(~0), - listener_retrieved_callback_(std::forward(callback)) {} - - void tryToConnectToForwarder() { - doTryToConnectToForwarder(std::make_error_code(std::errc(0))); - } - - void doTryToConnectToForwarder(std::error_code ec) { - if (!ec) { - // ec == 0 --> timer expired - int ret = forwarder_interface_.connectToForwarder(); - if (ret < 0) { - // We were not able to connect to the local forwarder. Do not give up - // and retry. - TRANSPORT_LOGE("Could not connect to local forwarder. Retrying."); - - timer_.expires_from_now(std::chrono::milliseconds(RETRY_INTERVAL)); - timer_.async_wait(std::bind(&ForwarderConfig::doTryToConnectToForwarder, - this, std::placeholders::_1)); - } else { - timer_.cancel(); - retx_count_ = 0; - doGetMainListener(std::make_error_code(std::errc(0))); - } - } else { - TRANSPORT_LOGI("Timer for re-trying forwarder connection canceled."); - } - } - - void doGetMainListener(std::error_code ec) { - if (!ec) { - // ec == 0 --> timer expired - int ret = forwarder_interface_.getMainListenerPort(); - if (ret <= 0) { - // Since without the main listener of the forwarder the proxy cannot - // work, we can stop the program here until we get the listener port. - TRANSPORT_LOGE( - "Could not retrieve main listener port from the forwarder. " - "Retrying."); - - timer_.expires_from_now(std::chrono::milliseconds(RETRY_INTERVAL)); - timer_.async_wait(std::bind(&ForwarderConfig::doGetMainListener, this, - std::placeholders::_1)); - } else { - timer_.cancel(); - retx_count_ = 0; - hicn_listen_port_ = uint16_t(ret); - listener_retrieved_callback_(std::make_error_code(std::errc(0))); - } - } else { - TRANSPORT_LOGI("Timer for retrieving main hicn listener canceled."); - } - } - - template - TRANSPORT_ALWAYS_INLINE bool parseHicnHeader(std::string& header, - Callback&& callback) { - std::stringstream ss(header); - route_info_t* ret = new route_info_t(); - std::string port_string; - - while (ss.good()) { - std::string substr; - getline(ss, substr, ','); - - if (TRANSPORT_EXPECT_FALSE(substr.empty())) { - continue; - } - - utils::trim(substr); - auto it = std::find_if(substr.begin(), substr.end(), - [](int ch) { return ch == '='; }); - if (it != std::end(substr)) { - auto key = std::string(substr.begin(), it); - auto value = std::string(it + 1, substr.end()); - - if (key == server_header) { - ret->remote_addr = value; - } else if (key == prefix_header) { - auto it = std::find_if(value.begin(), value.end(), - [](int ch) { return ch == '/'; }); - - if (it != std::end(value)) { - ret->route_addr = std::string(value.begin(), it); - ret->route_len = std::stoul(std::string(it + 1, value.end())); - } else { - return false; - } - } else if (key == port_header) { - ret->remote_port = std::stoul(value); - port_string = value; - } else { - // Header not recognized - return false; - } - } - } - - /* - * Resolve server address - */ - auto results = - resolver_.resolve({ret->remote_addr, port_string, - asio::ip::resolver_query_base::numeric_service}); - -#if ((ASIO_VERSION / 100 % 1000) < 12) - asio::ip::udp::resolver::iterator end; - auto& it = results; - while (it != end) { -#else - for (auto it = results.begin(); it != results.end(); it++) { -#endif - if (it->endpoint().address().is_v4()) { - // Use this v4 address to configure the forwarder. - ret->remote_addr = it->endpoint().address().to_string(); - ret->family = AF_INET; - std::string _prefix = ret->route_addr; - forwarder_interface_.createFaceAndRoute( - RouteInfoPtr(ret), [callback = std::forward(callback), - configured_prefix = std::move(_prefix)]( - uint32_t route_id, bool result) { - callback(result, configured_prefix); - }); - - return true; - } -#if ((ASIO_VERSION / 100 % 1000) < 12) - it++; -#endif - } - - return false; - } - - private: - ForwarderInterface forwarder_interface_; - asio::ip::udp::resolver resolver_; - std::uint32_t retx_count_; - asio::steady_timer timer_; - uint16_t hicn_listen_port_; - ListenerRetrievedCallback listener_retrieved_callback_; -}; // namespace transport - -} // namespace transport \ No newline at end of file diff --git a/apps/http-proxy/src/forwarder_interface.cc b/apps/http-proxy/src/forwarder_interface.cc index 105d5a8e9..d80939b8b 100644 --- a/apps/http-proxy/src/forwarder_interface.cc +++ b/apps/http-proxy/src/forwarder_interface.cc @@ -13,7 +13,7 @@ * limitations under the License. */ -#include "forwarder_interface.h" +#include #include #include @@ -41,16 +41,18 @@ int ForwarderInterface::connectToForwarder() { } void ForwarderInterface::close() { - internal_ioservice_.post([this]() { - work_.reset(); - if (sock_) { - hc_sock_free(sock_); - sock_ = nullptr; - } - }); + if (!closed_) { + internal_ioservice_.post([this]() { + work_.reset(); + if (sock_) { + hc_sock_free(sock_); + sock_ = nullptr; + } + }); - if (thread_->joinable()) { - thread_->join(); + if (thread_->joinable()) { + thread_->join(); + } } } diff --git a/apps/http-proxy/src/forwarder_interface.h b/apps/http-proxy/src/forwarder_interface.h deleted file mode 100644 index 116b09a07..000000000 --- a/apps/http-proxy/src/forwarder_interface.h +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Copyright (c) 2020 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -extern "C" { -#include -#include -} - -#ifndef ASIO_STANDALONE -#define ASIO_STANDALONE 1 -#endif -#include -#include -#include -#include -#include - -namespace transport { - -typedef std::function SetRouteCallback; - -struct route_info_t { - int family; - std::string remote_addr; - uint16_t remote_port; - std::string route_addr; - uint8_t route_len; -}; - -using RouteInfoPtr = std::shared_ptr; - -class ForwarderInterface { - public: - ForwarderInterface(asio::io_service &io_service) - : external_ioservice_(io_service), - work_(std::make_unique(internal_ioservice_)), - sock_(nullptr), - thread_(std::make_unique( - [this]() { internal_ioservice_.run(); })), - check_routes_timer_(nullptr), - pending_add_route_counter_(0), - route_id_(0) {} - - ~ForwarderInterface(); - - int connectToForwarder(); - - void removeConnectedUserNow(uint32_t route_id); - - // to be called at the server - // at the client this creates a race condition - // and the program enters in a loop - void scheduleRemoveConnectedUser(uint32_t route_id); - - template - void createFaceAndRoute(RouteInfoPtr &&route_info, Callback &&callback) { - internal_ioservice_.post([this, _route_info = std::move(route_info), - _callback = std::forward(callback)]() { - pending_add_route_counter_++; - uint8_t max_try = 5; - auto timer = new asio::steady_timer(internal_ioservice_); - internalCreateFaceAndRoute(std::move(_route_info), max_try, timer, - std::move(_callback)); - }); - } - - int32_t getMainListenerPort(); - - void close(); - - private: - void internalRemoveConnectedUser(uint32_t route_id); - - void internalCreateFaceAndRoute(RouteInfoPtr route_info, uint8_t max_try, - asio::steady_timer *timer, - SetRouteCallback callback); - - int tryToCreateFaceAndRoute(route_info_t *route_info); - - asio::io_service &external_ioservice_; - asio::io_service internal_ioservice_; - std::unique_ptr work_; - hc_sock_t *sock_; - std::unique_ptr thread_; - std::unordered_map route_status_; - std::unique_ptr check_routes_timer_; - uint32_t pending_add_route_counter_; - uint32_t route_id_; -}; - -} // namespace transport diff --git a/apps/http-proxy/src/http_proxy.cc b/apps/http-proxy/src/http_proxy.cc index 1e6dcd88f..4afa5b832 100644 --- a/apps/http-proxy/src/http_proxy.cc +++ b/apps/http-proxy/src/http_proxy.cc @@ -13,13 +13,14 @@ * limitations under the License. */ -#include "http_proxy.h" +#include +#include #include #include #include -#include "utils.h" +#include namespace transport { @@ -54,6 +55,8 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback { consumer_.connect(); } + void stop() { session_->close(); } + void setHttpSession(asio::ip::tcp::socket&& socket) { session_ = std::make_unique( std::move(socket), @@ -271,11 +274,39 @@ TcpReceiver::TcpReceiver(std::uint16_t port, const std::string& prefix, new HTTPClientConnectionCallback(*this, thread_)); } } - }) { + }), + stopped_(false) { forwarder_config_.tryToConnectToForwarder(); } +void TcpReceiver::stop() { + thread_.add([this](){ + stopped_ = true; + + /* Stop the listener */ + listener_.stop(); + + /* Close connection with forwarder */ + forwarder_config_.close(); + + /* Stop the used http clients */ + for (auto& client : used_http_clients_) { + client->stop(); + } + + /* Delete unused clients */ + for (auto& client : http_clients_) { + delete client; + } + }); +} + void TcpReceiver::onClientDisconnect(HTTPClientConnectionCallback* client) { + if (stopped_) { + delete client; + return; + } + http_clients_.emplace_front(client); used_http_clients_.erase(client); } @@ -299,21 +330,46 @@ void TcpReceiver::onNewConnection(asio::ip::tcp::socket&& socket) { used_http_clients_.insert(c); } -HTTPProxy::HTTPProxy(ClientParams& params, std::size_t n_thread) { +void HTTPProxy::setupSignalHandler() { + signals_.async_wait([this](const std::error_code& ec, int signal_number) { + if (!ec) { + TRANSPORT_LOGI("Received signal %d. Stopping gracefully.", signal_number); + stop(); + } + }); +} + +void HTTPProxy::stop() { + for (auto& receiver : receivers_) { + receiver->stop(); + } + + for (auto& receiver : receivers_) { + receiver->stopAndJoinThread(); + } +} + +HTTPProxy::HTTPProxy(ClientParams& params, std::size_t n_thread) + : signals_(main_io_context_, SIGINT, SIGQUIT) { for (uint16_t i = 0; i < n_thread; i++) { // icn_receivers_.emplace_back(std::make_unique(icn_params)); receivers_.emplace_back(std::make_unique( params.tcp_listen_port, params.prefix, params.first_ipv6_word)); } + + setupSignalHandler(); } -HTTPProxy::HTTPProxy(ServerParams& params, std::size_t n_thread) { +HTTPProxy::HTTPProxy(ServerParams& params, std::size_t n_thread) + : signals_(main_io_context_, SIGINT, SIGQUIT) { for (uint16_t i = 0; i < n_thread; i++) { receivers_.emplace_back(std::make_unique( params.prefix, params.first_ipv6_word, params.origin_address, params.origin_port, params.cache_size, params.mtu, params.content_lifetime, params.manifest)); } + + setupSignalHandler(); } } // namespace transport diff --git a/apps/http-proxy/src/http_proxy.h b/apps/http-proxy/src/http_proxy.h deleted file mode 100644 index c3f183af2..000000000 --- a/apps/http-proxy/src/http_proxy.h +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Copyright (c) 2020 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include - -#include "forwarder_config.h" -#include "http_session.h" -#include "icn_receiver.h" - -#define ASIO_STANDALONE -#include -#include -#include - -class TcpListener { - public: - using AcceptCallback = std::function; - - TcpListener(asio::io_service& io_service, short port, AcceptCallback callback) - : acceptor_(io_service, - asio::ip::tcp::endpoint(asio::ip::tcp::v4(), port)), -#if ((ASIO_VERSION / 100 % 1000) < 12) - socket_(io_service), -#endif - callback_(callback) { - } - - public: - void doAccept() { -#if ((ASIO_VERSION / 100 % 1000) >= 12) - acceptor_.async_accept( - [this](std::error_code ec, asio::ip::tcp::socket socket) { -#else - acceptor_.async_accept(socket_, [this](std::error_code ec) { - auto socket = std::move(socket_); -#endif - if (!ec) { - callback_(std::move(socket)); - } - - doAccept(); - }); - } - - asio::ip::tcp::acceptor acceptor_; -#if ((ASIO_VERSION / 100 % 1000) < 12) - asio::ip::tcp::socket socket_; -#endif - AcceptCallback callback_; -}; - -namespace transport { - -class HTTPClientConnectionCallback; - -class Receiver { - public: - Receiver() : thread_() {} - - protected: - utils::EventThread thread_; -}; - -class TcpReceiver : public Receiver { - friend class HTTPClientConnectionCallback; - - public: - TcpReceiver(std::uint16_t port, const std::string& prefix, - const std::string& ipv6_first_word); - - private: - void onNewConnection(asio::ip::tcp::socket&& socket); - void onClientDisconnect(HTTPClientConnectionCallback* client); - - template - void parseHicnHeader(std::string& hicn_header, Callback&& callback) { - forwarder_config_.parseHicnHeader(hicn_header, - std::forward(callback)); - } - - TcpListener listener_; - std::string prefix_; - std::string ipv6_first_word_; - std::string prefix_hash_; - std::deque http_clients_; - std::unordered_set used_http_clients_; - ForwarderConfig forwarder_config_; -}; - -class IcnReceiver : public Receiver { - public: - template - IcnReceiver(Args&&... args) - : Receiver(), - icn_consum_producer_(thread_.getIoService(), - std::forward(args)...) { - icn_consum_producer_.run(); - } - - private: - AsyncConsumerProducer icn_consum_producer_; -}; - -class HTTPProxy { - public: - enum Server { CREATE }; - enum Client { WRAP_BUFFER }; - - struct CommonParams { - std::string prefix; - std::string first_ipv6_word; - - virtual void printParams() { std::cout << "Parameters: " << std::endl; }; - }; - - struct ClientParams : virtual CommonParams { - short tcp_listen_port; - void printParams() override { - std::cout << "Running HTTP/TCP -> HTTP/hICN proxy." << std::endl; - CommonParams::printParams(); - std::cout << "\t" - << "HTTP listen port: " << tcp_listen_port << std::endl; - std::cout << "\t" - << "Consumer Prefix: " << prefix << std::endl; - std::cout << "\t" - << "Prefix first word: " << first_ipv6_word << std::endl; - } - }; - - struct ServerParams : virtual CommonParams { - std::string origin_address; - std::string origin_port; - std::string cache_size; - std::string mtu; - std::string content_lifetime; - bool manifest; - - void printParams() override { - std::cout << "Running HTTP/hICN -> HTTP/TCP proxy." << std::endl; - CommonParams::printParams(); - std::cout << "\t" - << "Origin address: " << origin_address << std::endl; - std::cout << "\t" - << "Origin port: " << origin_port << std::endl; - std::cout << "\t" - << "Producer cache size: " << cache_size << std::endl; - std::cout << "\t" - << "hICN MTU: " << mtu << std::endl; - std::cout << "\t" - << "Default content lifetime: " << content_lifetime - << std::endl; - std::cout << "\t" - << "Producer Prefix: " << prefix << std::endl; - std::cout << "\t" - << "Prefix first word: " << first_ipv6_word << std::endl; - std::cout << "\t" - << "Use manifest: " << manifest << std::endl; - } - }; - - HTTPProxy(ClientParams& icn_params, std::size_t n_thread = 1); - HTTPProxy(ServerParams& icn_params, std::size_t n_thread = 1); - - void run() { sleep(1000000); } - - private: - void acceptTCPClient(asio::ip::tcp::socket&& socket); - - private: - std::vector> receivers_; -}; - -} // namespace transport \ No newline at end of file diff --git a/apps/http-proxy/src/http_session.cc b/apps/http-proxy/src/http_session.cc index 760539fe0..6b91c12c3 100644 --- a/apps/http-proxy/src/http_session.cc +++ b/apps/http-proxy/src/http_session.cc @@ -13,8 +13,7 @@ * limitations under the License. */ -#include "http_session.h" - +#include #include #include diff --git a/apps/http-proxy/src/http_session.h b/apps/http-proxy/src/http_session.h deleted file mode 100644 index 05fdf62fa..000000000 --- a/apps/http-proxy/src/http_session.h +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Copyright (c) 2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include - -#include "HTTP1.xMessageFastParser.h" - -#define ASIO_STANDALONE -#include -#include -#include - -namespace transport { - -using asio::ip::tcp; - -struct Metadata; - -typedef std::function - ContentReceivedCallback; -typedef std::function OnConnectionClosed; -typedef std::function ContentSentCallback; -typedef std::deque< - std::pair, ContentSentCallback>> - BufferQueue; - -struct Metadata { - std::string http_version; - HTTPHeaders headers; -}; - -struct RequestMetadata : Metadata { - std::string method; - std::string path; -}; - -struct ResponseMetadata : Metadata { - std::string status_code; - std::string status_string; -}; - -class HTTPClientConnectionCallback; - -class HTTPSession { - friend class HTTPClientConnectionCallback; - static constexpr uint32_t buffer_size = 1024 * 512; - - enum class ConnectorState { - CLOSED, - CONNECTING, - CONNECTED, - }; - - public: - HTTPSession(asio::io_service &io_service, std::string &ip_address, - std::string &port, ContentReceivedCallback receive_callback, - OnConnectionClosed on_reconnect_callback, bool client = false); - - HTTPSession(asio::ip::tcp::socket socket, - ContentReceivedCallback receive_callback, - OnConnectionClosed on_reconnect_callback, bool client = true); - - ~HTTPSession(); - - void send(const uint8_t *buffer, std::size_t len, - ContentSentCallback &&content_sent = 0); - - void send(utils::MemBuf *buffer, ContentSentCallback &&content_sent); - - void close(); - - private: - void doConnect(); - - void doReadHeader(); - - void doReadBody(std::size_t body_size, std::size_t additional_bytes); - - void doReadChunkedHeader(); - - void doWrite(); - - bool checkConnected(); - - private: - void handleRead(std::error_code ec, std::size_t length); - void tryReconnection(); - void startConnectionTimer(); - void handleDeadline(const std::error_code &ec); - - asio::io_service &io_service_; - asio::ip::tcp::socket socket_; - asio::ip::tcp::resolver resolver_; - asio::ip::tcp::resolver::iterator endpoint_iterator_; - asio::steady_timer timer_; - - BufferQueue write_msgs_; - - asio::streambuf input_buffer_; - - bool reverse_; - bool is_reconnection_; - bool data_available_; - - std::size_t content_length_; - - // Chunked encoding - bool is_last_chunk_; - bool chunked_; - - ContentReceivedCallback receive_callback_; - OnConnectionClosed on_connection_closed_callback_; - - // HTTP headers - std::unique_ptr header_info_; - - // Connector state - ConnectorState state_; -}; - -} // namespace transport diff --git a/apps/http-proxy/src/icn_receiver.cc b/apps/http-proxy/src/icn_receiver.cc index 6ccd2dc31..34b85f9c9 100644 --- a/apps/http-proxy/src/icn_receiver.cc +++ b/apps/http-proxy/src/icn_receiver.cc @@ -13,7 +13,7 @@ * limitations under the License. */ -#include "icn_receiver.h" +#include #include #include @@ -23,8 +23,8 @@ #include #include -#include "HTTP1.xMessageFastParser.h" -#include "utils.h" +#include +#include namespace transport { @@ -42,7 +42,6 @@ AsyncConsumerProducer::AsyncConsumerProducer( cache_size_(std::stoul(cache_size)), mtu_(std::stoul(mtu)), request_counter_(0), - signals_(io_service_, SIGINT, SIGQUIT), connector_(io_service_, ip_address_, port_, std::bind(&AsyncConsumerProducer::publishContent, this, std::placeholders::_1, std::placeholders::_2, @@ -76,15 +75,6 @@ AsyncConsumerProducer::AsyncConsumerProducer( } producer_socket_.registerPrefix(prefix_); - - // Let the main thread to catch SIGINT and SIGQUIT - signals_.async_wait( - [this](const std::error_code& errorCode, int signal_number) { - TRANSPORT_LOGI("Number of requests processed by plugin: %lu", - (unsigned long)request_counter_); - producer_socket_.stop(); - connector_.close(); - }); } void AsyncConsumerProducer::start() { @@ -100,6 +90,15 @@ void AsyncConsumerProducer::run() { } } +void AsyncConsumerProducer::stop() { + io_service_.post([this]() { + TRANSPORT_LOGI("Number of requests processed by plugin: %lu", + (unsigned long)request_counter_); + producer_socket_.stop(); + connector_.close(); + }); +} + void AsyncConsumerProducer::doReceive() { producer_socket_.setSocketOption( interface::ProducerCallbacksOptions::CACHE_MISS, diff --git a/apps/http-proxy/src/icn_receiver.h b/apps/http-proxy/src/icn_receiver.h deleted file mode 100644 index 31e9ae932..000000000 --- a/apps/http-proxy/src/icn_receiver.h +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Copyright (c) 2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include -#include -#include - -#include -#include -#include -#include -#include - -#include "http_session.h" - -namespace transport { - -class AsyncConsumerProducer { - using SegmentProductionPair = std::pair; - using ResponseInfoMap = std::unordered_map; - using RequestQueue = std::queue; - - public: - explicit AsyncConsumerProducer( - asio::io_service& io_service, const std::string& prefix, - const std::string& first_ipv6_word, const std::string& origin_address, - const std::string& origin_port, const std::string& cache_size, - const std::string& mtu, const std::string& content_lifetime, - bool manifest); - - explicit AsyncConsumerProducer( - const std::string& prefix, const std::string& first_ipv6_word, - const std::string& origin_address, const std::string& origin_port, - const std::string& cache_size, const std::string& mtu, - const std::string& content_lifetime, bool manifest) - : AsyncConsumerProducer(internal_io_service_, prefix, first_ipv6_word, - origin_address, origin_port, cache_size, mtu, - content_lifetime, manifest) { - external_io_service_ = false; - } - - void run(); - - private: - void start(); - - void doSend(); - - void doReceive(); - - void publishContent(const uint8_t* data, std::size_t size, - bool is_last = true, bool headers = false); - - void manageIncomingInterest(core::Name& name, core::Packet::MemBufPtr& packet, - utils::MemBuf* payload); - - core::Prefix prefix_; - asio::io_service& io_service_; - asio::io_service internal_io_service_; - bool external_io_service_; - interface::ProducerSocket producer_socket_; - - std::string ip_address_; - std::string port_; - uint32_t cache_size_; - uint32_t mtu_; - - uint64_t request_counter_; - asio::signal_set signals_; - - // std::unordered_map> - // connection_map_; - HTTPSession connector_; - - unsigned long default_content_lifetime_; - - // ResponseInfoMap --> max_seq_number + bool indicating whether request is in - // production - ResponseInfoMap chunk_number_map_; - RequestQueue response_name_queue_; -}; - -} // namespace transport diff --git a/apps/http-proxy/src/utils.h b/apps/http-proxy/src/utils.h deleted file mode 100644 index d87c796d0..000000000 --- a/apps/http-proxy/src/utils.h +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright (c) 2020 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include - -#include -#include - -#pragma once - -TRANSPORT_ALWAYS_INLINE std::string generatePrefix( - const std::string& prefix_url, const std::string& first_ipv6_word) { - const char* str = prefix_url.c_str(); - uint16_t pos = 0; - - if (strncmp("http://", str, 7) == 0) { - pos = 7; - } else if (strncmp("https://", str, 8) == 0) { - pos = 8; - } - - str += pos; - - uint32_t locator_hash = utils::hash::fnv32_buf(str, strlen(str)); - - std::stringstream stream; - stream << first_ipv6_word << ":0"; - - for (uint16_t* word = (uint16_t*)&locator_hash; - std::size_t(word) < (std::size_t(&locator_hash) + sizeof(locator_hash)); - word++) { - stream << ":" << std::hex << *word; - } - - stream << "::"; - - return stream.str(); -} \ No newline at end of file diff --git a/libtransport/CMakeLists.txt b/libtransport/CMakeLists.txt index c431ace04..67492cb11 100644 --- a/libtransport/CMakeLists.txt +++ b/libtransport/CMakeLists.txt @@ -52,9 +52,6 @@ set(TRANSPORT_HTTP ${TRANSPORT_ROOT_PATH}/http) set(TRANSPORT_PORTABILITY ${TRANSPORT_ROOT_PATH}/portability) set(TRANSPORT_INTERFACES ${TRANSPORT_ROOT_PATH}/interfaces) -# Install includes -set(INSTALL_INCLUDE_DIR include/hicn/transport) - set(LIBTRANSPORT hicntransport) if ((BUILD_HICNPLUGIN OR BUILD_MEMIF_CONNECTOR) AND "${CMAKE_SYSTEM_NAME}" STREQUAL "Linux") set(__vpp__ 1) diff --git a/libtransport/includes/hicn/transport/utils/event_thread.h b/libtransport/includes/hicn/transport/utils/event_thread.h index db1194821..702c98f8d 100644 --- a/libtransport/includes/hicn/transport/utils/event_thread.h +++ b/libtransport/includes/hicn/transport/utils/event_thread.h @@ -34,7 +34,7 @@ class EventThread { explicit EventThread(asio::io_service& io_service) : internal_io_service_(nullptr), io_service_(io_service), - work_(io_service_), + work_(std::make_unique(io_service_)), thread_(nullptr) { run(); } @@ -42,7 +42,7 @@ class EventThread { explicit EventThread() : internal_io_service_(std::make_unique()), io_service_(*internal_io_service_), - work_(io_service_), + work_(std::make_unique(io_service_)), thread_(nullptr) { run(); } @@ -78,7 +78,7 @@ class EventThread { } void stop() { - io_service_.stop(); + work_.reset(); if (thread_ && thread_->joinable()) { thread_->join(); @@ -94,7 +94,7 @@ class EventThread { private: std::unique_ptr internal_io_service_; asio::io_service& io_service_; - asio::io_service::work work_; + std::unique_ptr work_; std::unique_ptr thread_; }; -- cgit 1.2.3-korg