diff options
author | Mauro Sardara <msardara@cisco.com> | 2020-06-02 18:52:39 +0200 |
---|---|---|
committer | Angelo Mantellini <angelo.mantellini@cisco.com> | 2020-06-03 16:21:49 +0200 |
commit | 5d8156ea4c34f9a3cb986da16a71faebfb2add6b (patch) | |
tree | 5895f7546c91eab1c6cad917f3a41594a543ca64 /apps | |
parent | 15458966a342caa0912b7806a755d0d8277ca00f (diff) |
[HICN-622] Add stop() functionality to http proxy.
Signed-off-by: Mauro Sardara <msardara@cisco.com>
Change-Id: I9091cd8ef0f9da869b886541a0116adf3f30e6b9
Signed-off-by: Angelo Mantellini <angelo.mantellini@cisco.com>
Diffstat (limited to 'apps')
15 files changed, 173 insertions, 57 deletions
diff --git a/apps/http-proxy/CMakeLists.txt b/apps/http-proxy/CMakeLists.txt index 44f6deea6..d3155a0d4 100644 --- a/apps/http-proxy/CMakeLists.txt +++ b/apps/http-proxy/CMakeLists.txt @@ -32,19 +32,11 @@ set(LIB_SOURCE_FILES src/forwarder_interface.cc ) -set(LIB_SERVER_HEADER_FILES - src/icn_receiver.h - src/http_session.h - src/http_proxy.h - src/HTTP1.xMessageFastParser.h - src/forwarder_interface.h - src/forwarder_config.h -) - set(APP_SOURCE_FILES main.cc ) +add_subdirectory(includes/hicn/http-proxy) set(LIBHTTP_PROXY hicnhttpproxy) set(LIBHTTP_PROXY_STATIC ${LIBHTTP_PROXY}.static) @@ -57,7 +49,8 @@ build_library(${LIBHTTP_PROXY} SOURCES ${LIB_SOURCE_FILES} LINK_LIBRARIES ${LIBRARIES} DEPENDS ${DEPENDENCIES} - INCLUDE_DIRS ${LIBTRANSPORT_INCLUDE_DIRS} ${LIBHICNCTRL_INCLUDE_DIRS} + INSTALL_HEADERS ${LIBPROXY_TO_INSTALL_HEADER_FILES} + INCLUDE_DIRS ${LIBTRANSPORT_INCLUDE_DIRS} ${LIBHICNCTRL_INCLUDE_DIRS} ${LIBPROXY_INCLUDE_DIRS} COMPONENT ${HICN_APPS} LINK_FLAGS ${LINK_FLAGS} DEFINITIONS ${COMPILER_DEFINITIONS} diff --git a/apps/http-proxy/includes/hicn/http-proxy/CMakeLists.txt b/apps/http-proxy/includes/hicn/http-proxy/CMakeLists.txt new file mode 100644 index 000000000..5bc7d45fc --- /dev/null +++ b/apps/http-proxy/includes/hicn/http-proxy/CMakeLists.txt @@ -0,0 +1,39 @@ +# Copyright (c) 2017-2019 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cmake_minimum_required(VERSION 3.5 FATAL_ERROR) + +list(APPEND HEADER_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/forwarder_config.h + ${CMAKE_CURRENT_SOURCE_DIR}/forwarder_interface.h + ${CMAKE_CURRENT_SOURCE_DIR}/http_proxy.h + ${CMAKE_CURRENT_SOURCE_DIR}/http_session.h + ${CMAKE_CURRENT_SOURCE_DIR}/HTTP1.xMessageFastParser.h + ${CMAKE_CURRENT_SOURCE_DIR}/icn_receiver.h + ${CMAKE_CURRENT_SOURCE_DIR}/utils.h +) + +set(HEADER_FILES ${HEADER_FILES} PARENT_SCOPE) + +set(LIBPROXY_INCLUDE_DIRS + ${CMAKE_CURRENT_SOURCE_DIR}/../.. "" + CACHE INTERNAL + "" FORCE +) + +set(LIBPROXY_TO_INSTALL_HEADER_FILES + ${HEADER_FILES} "" + CACHE INTERNAL + "" FORCE +) + diff --git a/apps/http-proxy/src/HTTP1.xMessageFastParser.h b/apps/http-proxy/includes/hicn/http-proxy/HTTP1.xMessageFastParser.h index 7c035c83b..7c035c83b 100644 --- a/apps/http-proxy/src/HTTP1.xMessageFastParser.h +++ b/apps/http-proxy/includes/hicn/http-proxy/HTTP1.xMessageFastParser.h diff --git a/apps/http-proxy/src/forwarder_config.h b/apps/http-proxy/includes/hicn/http-proxy/forwarder_config.h index 3d69c998e..19c96a9e3 100644 --- a/apps/http-proxy/src/forwarder_config.h +++ b/apps/http-proxy/includes/hicn/http-proxy/forwarder_config.h @@ -27,6 +27,7 @@ #include "forwarder_interface.h" + #define RETRY_INTERVAL 300 namespace transport { @@ -50,6 +51,12 @@ class ForwarderConfig { hicn_listen_port_(~0), listener_retrieved_callback_(std::forward<Callback>(callback)) {} + void close() { + timer_.cancel(); + resolver_.cancel(); + forwarder_interface_.close(); + } + void tryToConnectToForwarder() { doTryToConnectToForwarder(std::make_error_code(std::errc(0))); } @@ -72,7 +79,7 @@ class ForwarderConfig { doGetMainListener(std::make_error_code(std::errc(0))); } } else { - TRANSPORT_LOGI("Timer for re-trying forwarder connection canceled."); + TRANSPORT_LOGD("Timer for re-trying forwarder connection canceled."); } } diff --git a/apps/http-proxy/src/forwarder_interface.h b/apps/http-proxy/includes/hicn/http-proxy/forwarder_interface.h index 116b09a07..54941a4ba 100644 --- a/apps/http-proxy/src/forwarder_interface.h +++ b/apps/http-proxy/includes/hicn/http-proxy/forwarder_interface.h @@ -53,7 +53,8 @@ class ForwarderInterface { [this]() { internal_ioservice_.run(); })), check_routes_timer_(nullptr), pending_add_route_counter_(0), - route_id_(0) {} + route_id_(0), + closed_(false) {} ~ForwarderInterface(); @@ -100,6 +101,7 @@ class ForwarderInterface { std::unique_ptr<asio::steady_timer> check_routes_timer_; uint32_t pending_add_route_counter_; uint32_t route_id_; + bool closed_; }; } // namespace transport diff --git a/apps/http-proxy/src/http_proxy.h b/apps/http-proxy/includes/hicn/http-proxy/http_proxy.h index c3f183af2..6b5b1c3e3 100644 --- a/apps/http-proxy/src/http_proxy.h +++ b/apps/http-proxy/includes/hicn/http-proxy/http_proxy.h @@ -51,12 +51,13 @@ class TcpListener { #endif if (!ec) { callback_(std::move(socket)); + doAccept(); } - - doAccept(); }); } + void stop() { acceptor_.close(); } + asio::ip::tcp::acceptor acceptor_; #if ((ASIO_VERSION / 100 % 1000) < 12) asio::ip::tcp::socket socket_; @@ -71,6 +72,9 @@ class HTTPClientConnectionCallback; class Receiver { public: Receiver() : thread_() {} + virtual ~Receiver() = default; + void stopAndJoinThread() { thread_.stop(); } + virtual void stop() = 0; protected: utils::EventThread thread_; @@ -83,6 +87,8 @@ class TcpReceiver : public Receiver { TcpReceiver(std::uint16_t port, const std::string& prefix, const std::string& ipv6_first_word); + void stop() override; + private: void onNewConnection(asio::ip::tcp::socket&& socket); void onClientDisconnect(HTTPClientConnectionCallback* client); @@ -100,6 +106,7 @@ class TcpReceiver : public Receiver { std::deque<HTTPClientConnectionCallback*> http_clients_; std::unordered_set<HTTPClientConnectionCallback*> used_http_clients_; ForwarderConfig forwarder_config_; + bool stopped_; }; class IcnReceiver : public Receiver { @@ -112,6 +119,13 @@ class IcnReceiver : public Receiver { icn_consum_producer_.run(); } + void stop() override { + thread_.add([this]() { + /* Stop the listener */ + icn_consum_producer_.stop(); + }); + } + private: AsyncConsumerProducer icn_consum_producer_; }; @@ -176,13 +190,15 @@ class HTTPProxy { HTTPProxy(ClientParams& icn_params, std::size_t n_thread = 1); HTTPProxy(ServerParams& icn_params, std::size_t n_thread = 1); - void run() { sleep(1000000); } + void run() { main_io_context_.run(); } + void stop(); private: - void acceptTCPClient(asio::ip::tcp::socket&& socket); + void setupSignalHandler(); - private: std::vector<std::unique_ptr<Receiver>> receivers_; + asio::io_service main_io_context_; + asio::signal_set signals_; }; } // namespace transport
\ No newline at end of file diff --git a/apps/http-proxy/src/http_session.h b/apps/http-proxy/includes/hicn/http-proxy/http_session.h index 05fdf62fa..05fdf62fa 100644 --- a/apps/http-proxy/src/http_session.h +++ b/apps/http-proxy/includes/hicn/http-proxy/http_session.h diff --git a/apps/http-proxy/src/icn_receiver.h b/apps/http-proxy/includes/hicn/http-proxy/icn_receiver.h index 31e9ae932..780037665 100644 --- a/apps/http-proxy/src/icn_receiver.h +++ b/apps/http-proxy/includes/hicn/http-proxy/icn_receiver.h @@ -24,7 +24,8 @@ #include <queue> #include <utility> -#include "http_session.h" +#include <hicn/http-proxy/http_session.h> +//#include "http_session.h" namespace transport { @@ -54,6 +55,8 @@ class AsyncConsumerProducer { void run(); + void stop(); + private: void start(); @@ -79,7 +82,6 @@ class AsyncConsumerProducer { uint32_t mtu_; uint64_t request_counter_; - asio::signal_set signals_; // std::unordered_map<core::Name, std::shared_ptr<ATSConnector>> // connection_map_; diff --git a/apps/http-proxy/src/utils.h b/apps/http-proxy/includes/hicn/http-proxy/utils.h index d87c796d0..d87c796d0 100644 --- a/apps/http-proxy/src/utils.h +++ b/apps/http-proxy/includes/hicn/http-proxy/utils.h diff --git a/apps/http-proxy/main.cc b/apps/http-proxy/main.cc index 8d407ba4c..d5052ef11 100644 --- a/apps/http-proxy/main.cc +++ b/apps/http-proxy/main.cc @@ -13,7 +13,7 @@ * limitations under the License. */ -#include "src/http_proxy.h" +#include <hicn/http-proxy/http_proxy.h> using namespace transport; @@ -41,13 +41,13 @@ struct Params : HTTPProxy::ClientParams, HTTPProxy::ServerParams { << "N Threads: " << n_thread << std::endl; } - HTTPProxy instantiateProxyAsValue() { + HTTPProxy* instantiateProxyAsValue() { if (client) { HTTPProxy::ClientParams* p = dynamic_cast<HTTPProxy::ClientParams*>(this); - return transport::HTTPProxy(*p, n_thread); + return new transport::HTTPProxy(*p, n_thread); } else if (server) { HTTPProxy::ServerParams* p = dynamic_cast<HTTPProxy::ServerParams*>(this); - return transport::HTTPProxy(*p, n_thread); + return new transport::HTTPProxy(*p, n_thread); } else { throw std::runtime_error( "Proxy configured as client and server at the same time."); @@ -134,8 +134,9 @@ int main(int argc, char** argv) { } params.printParams(); - transport::HTTPProxy proxy = params.instantiateProxyAsValue(); - proxy.run(); + auto proxy = params.instantiateProxyAsValue(); + proxy->run(); + delete proxy; return 0; }
\ No newline at end of file diff --git a/apps/http-proxy/src/HTTP1.xMessageFastParser.cc b/apps/http-proxy/src/HTTP1.xMessageFastParser.cc index ea942a463..c22abdc90 100644 --- a/apps/http-proxy/src/HTTP1.xMessageFastParser.cc +++ b/apps/http-proxy/src/HTTP1.xMessageFastParser.cc @@ -13,7 +13,7 @@ * limitations under the License. */ -#include "HTTP1.xMessageFastParser.h" +#include <hicn/http-proxy/HTTP1.xMessageFastParser.h> #include <hicn/transport/http/request.h> #include <hicn/transport/http/response.h> @@ -22,7 +22,7 @@ #include <experimental/functional> #include <iostream> -#include "http_session.h" +#include <hicn/http-proxy/http_session.h> constexpr char HTTPMessageFastParser::http_ok[]; constexpr char HTTPMessageFastParser::http_cors[]; diff --git a/apps/http-proxy/src/forwarder_interface.cc b/apps/http-proxy/src/forwarder_interface.cc index 105d5a8e9..d80939b8b 100644 --- a/apps/http-proxy/src/forwarder_interface.cc +++ b/apps/http-proxy/src/forwarder_interface.cc @@ -13,7 +13,7 @@ * limitations under the License. */ -#include "forwarder_interface.h" +#include <hicn/http-proxy/forwarder_interface.h> #include <arpa/inet.h> #include <hicn/transport/utils/log.h> @@ -41,16 +41,18 @@ int ForwarderInterface::connectToForwarder() { } void ForwarderInterface::close() { - internal_ioservice_.post([this]() { - work_.reset(); - if (sock_) { - hc_sock_free(sock_); - sock_ = nullptr; - } - }); + if (!closed_) { + internal_ioservice_.post([this]() { + work_.reset(); + if (sock_) { + hc_sock_free(sock_); + sock_ = nullptr; + } + }); - if (thread_->joinable()) { - thread_->join(); + if (thread_->joinable()) { + thread_->join(); + } } } diff --git a/apps/http-proxy/src/http_proxy.cc b/apps/http-proxy/src/http_proxy.cc index 1e6dcd88f..4afa5b832 100644 --- a/apps/http-proxy/src/http_proxy.cc +++ b/apps/http-proxy/src/http_proxy.cc @@ -13,13 +13,14 @@ * limitations under the License. */ -#include "http_proxy.h" +#include <hicn/http-proxy/http_proxy.h> +#include <hicn/http-proxy/http_session.h> #include <hicn/transport/core/interest.h> #include <hicn/transport/utils/log.h> #include <hicn/transport/utils/string_utils.h> -#include "utils.h" +#include <hicn/http-proxy/utils.h> namespace transport { @@ -54,6 +55,8 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback { consumer_.connect(); } + void stop() { session_->close(); } + void setHttpSession(asio::ip::tcp::socket&& socket) { session_ = std::make_unique<HTTPSession>( std::move(socket), @@ -271,11 +274,39 @@ TcpReceiver::TcpReceiver(std::uint16_t port, const std::string& prefix, new HTTPClientConnectionCallback(*this, thread_)); } } - }) { + }), + stopped_(false) { forwarder_config_.tryToConnectToForwarder(); } +void TcpReceiver::stop() { + thread_.add([this](){ + stopped_ = true; + + /* Stop the listener */ + listener_.stop(); + + /* Close connection with forwarder */ + forwarder_config_.close(); + + /* Stop the used http clients */ + for (auto& client : used_http_clients_) { + client->stop(); + } + + /* Delete unused clients */ + for (auto& client : http_clients_) { + delete client; + } + }); +} + void TcpReceiver::onClientDisconnect(HTTPClientConnectionCallback* client) { + if (stopped_) { + delete client; + return; + } + http_clients_.emplace_front(client); used_http_clients_.erase(client); } @@ -299,21 +330,46 @@ void TcpReceiver::onNewConnection(asio::ip::tcp::socket&& socket) { used_http_clients_.insert(c); } -HTTPProxy::HTTPProxy(ClientParams& params, std::size_t n_thread) { +void HTTPProxy::setupSignalHandler() { + signals_.async_wait([this](const std::error_code& ec, int signal_number) { + if (!ec) { + TRANSPORT_LOGI("Received signal %d. Stopping gracefully.", signal_number); + stop(); + } + }); +} + +void HTTPProxy::stop() { + for (auto& receiver : receivers_) { + receiver->stop(); + } + + for (auto& receiver : receivers_) { + receiver->stopAndJoinThread(); + } +} + +HTTPProxy::HTTPProxy(ClientParams& params, std::size_t n_thread) + : signals_(main_io_context_, SIGINT, SIGQUIT) { for (uint16_t i = 0; i < n_thread; i++) { // icn_receivers_.emplace_back(std::make_unique<IcnReceiver>(icn_params)); receivers_.emplace_back(std::make_unique<TcpReceiver>( params.tcp_listen_port, params.prefix, params.first_ipv6_word)); } + + setupSignalHandler(); } -HTTPProxy::HTTPProxy(ServerParams& params, std::size_t n_thread) { +HTTPProxy::HTTPProxy(ServerParams& params, std::size_t n_thread) + : signals_(main_io_context_, SIGINT, SIGQUIT) { for (uint16_t i = 0; i < n_thread; i++) { receivers_.emplace_back(std::make_unique<IcnReceiver>( params.prefix, params.first_ipv6_word, params.origin_address, params.origin_port, params.cache_size, params.mtu, params.content_lifetime, params.manifest)); } + + setupSignalHandler(); } } // namespace transport diff --git a/apps/http-proxy/src/http_session.cc b/apps/http-proxy/src/http_session.cc index 760539fe0..6b91c12c3 100644 --- a/apps/http-proxy/src/http_session.cc +++ b/apps/http-proxy/src/http_session.cc @@ -13,8 +13,7 @@ * limitations under the License. */ -#include "http_session.h" - +#include <hicn/http-proxy/http_proxy.h> #include <hicn/transport/utils/branch_prediction.h> #include <hicn/transport/utils/log.h> diff --git a/apps/http-proxy/src/icn_receiver.cc b/apps/http-proxy/src/icn_receiver.cc index 6ccd2dc31..34b85f9c9 100644 --- a/apps/http-proxy/src/icn_receiver.cc +++ b/apps/http-proxy/src/icn_receiver.cc @@ -13,7 +13,7 @@ * limitations under the License. */ -#include "icn_receiver.h" +#include <hicn/http-proxy/icn_receiver.h> #include <hicn/transport/core/interest.h> #include <hicn/transport/http/default_values.h> @@ -23,8 +23,8 @@ #include <functional> #include <memory> -#include "HTTP1.xMessageFastParser.h" -#include "utils.h" +#include <hicn/http-proxy/HTTP1.xMessageFastParser.h> +#include <hicn/http-proxy/utils.h> namespace transport { @@ -42,7 +42,6 @@ AsyncConsumerProducer::AsyncConsumerProducer( cache_size_(std::stoul(cache_size)), mtu_(std::stoul(mtu)), request_counter_(0), - signals_(io_service_, SIGINT, SIGQUIT), connector_(io_service_, ip_address_, port_, std::bind(&AsyncConsumerProducer::publishContent, this, std::placeholders::_1, std::placeholders::_2, @@ -76,15 +75,6 @@ AsyncConsumerProducer::AsyncConsumerProducer( } producer_socket_.registerPrefix(prefix_); - - // Let the main thread to catch SIGINT and SIGQUIT - signals_.async_wait( - [this](const std::error_code& errorCode, int signal_number) { - TRANSPORT_LOGI("Number of requests processed by plugin: %lu", - (unsigned long)request_counter_); - producer_socket_.stop(); - connector_.close(); - }); } void AsyncConsumerProducer::start() { @@ -100,6 +90,15 @@ void AsyncConsumerProducer::run() { } } +void AsyncConsumerProducer::stop() { + io_service_.post([this]() { + TRANSPORT_LOGI("Number of requests processed by plugin: %lu", + (unsigned long)request_counter_); + producer_socket_.stop(); + connector_.close(); + }); +} + void AsyncConsumerProducer::doReceive() { producer_socket_.setSocketOption( interface::ProducerCallbacksOptions::CACHE_MISS, |