diff options
author | Mauro Sardara <msardara@cisco.com> | 2020-06-02 18:52:39 +0200 |
---|---|---|
committer | Angelo Mantellini <angelo.mantellini@cisco.com> | 2020-06-03 16:21:49 +0200 |
commit | 5d8156ea4c34f9a3cb986da16a71faebfb2add6b (patch) | |
tree | 5895f7546c91eab1c6cad917f3a41594a543ca64 /apps/http-proxy/src | |
parent | 15458966a342caa0912b7806a755d0d8277ca00f (diff) |
[HICN-622] Add stop() functionality to http proxy.
Signed-off-by: Mauro Sardara <msardara@cisco.com>
Change-Id: I9091cd8ef0f9da869b886541a0116adf3f30e6b9
Signed-off-by: Angelo Mantellini <angelo.mantellini@cisco.com>
Diffstat (limited to 'apps/http-proxy/src')
-rw-r--r-- | apps/http-proxy/src/HTTP1.xMessageFastParser.cc | 4 | ||||
-rw-r--r-- | apps/http-proxy/src/HTTP1.xMessageFastParser.h | 67 | ||||
-rw-r--r-- | apps/http-proxy/src/forwarder_config.h | 193 | ||||
-rw-r--r-- | apps/http-proxy/src/forwarder_interface.cc | 22 | ||||
-rw-r--r-- | apps/http-proxy/src/forwarder_interface.h | 105 | ||||
-rw-r--r-- | apps/http-proxy/src/http_proxy.cc | 66 | ||||
-rw-r--r-- | apps/http-proxy/src/http_proxy.h | 188 | ||||
-rw-r--r-- | apps/http-proxy/src/http_session.cc | 3 | ||||
-rw-r--r-- | apps/http-proxy/src/http_session.h | 136 | ||||
-rw-r--r-- | apps/http-proxy/src/icn_receiver.cc | 25 | ||||
-rw-r--r-- | apps/http-proxy/src/icn_receiver.h | 96 | ||||
-rw-r--r-- | apps/http-proxy/src/utils.h | 51 |
12 files changed, 88 insertions, 868 deletions
diff --git a/apps/http-proxy/src/HTTP1.xMessageFastParser.cc b/apps/http-proxy/src/HTTP1.xMessageFastParser.cc index ea942a463..c22abdc90 100644 --- a/apps/http-proxy/src/HTTP1.xMessageFastParser.cc +++ b/apps/http-proxy/src/HTTP1.xMessageFastParser.cc @@ -13,7 +13,7 @@ * limitations under the License. */ -#include "HTTP1.xMessageFastParser.h" +#include <hicn/http-proxy/HTTP1.xMessageFastParser.h> #include <hicn/transport/http/request.h> #include <hicn/transport/http/response.h> @@ -22,7 +22,7 @@ #include <experimental/functional> #include <iostream> -#include "http_session.h" +#include <hicn/http-proxy/http_session.h> constexpr char HTTPMessageFastParser::http_ok[]; constexpr char HTTPMessageFastParser::http_cors[]; diff --git a/apps/http-proxy/src/HTTP1.xMessageFastParser.h b/apps/http-proxy/src/HTTP1.xMessageFastParser.h deleted file mode 100644 index 7c035c83b..000000000 --- a/apps/http-proxy/src/HTTP1.xMessageFastParser.h +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright (c) 2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include <hicn/transport/http/message.h> - -#include <algorithm> -#include <string> - -using transport::http::HTTPHeaders; - -namespace transport { -struct Metadata; -} - -class HTTPMessageFastParser { - public: - static constexpr char http_ok[] = - "HTTP/1.1 200 OK\r\n" - "Access-Control-Allow-Origin: *\r\n" - "Connection: close\r\n" - "Content-Length: 0\r\n\r\n"; - - static constexpr char http_cors[] = - "HTTP/1.1 200 OK\r\n" - "Date: %s\r\n" - "Connection: close\r\n" - "Content-Length: 0\r\n" - "Access-Control-Allow-Origin: *\r\n" - "Access-Control-Allow-Methods: GET\r\n" - "Access-Control-Allow-Headers: hicn\r\n" - "Access-Control-Max-Age: 1800\r\n\r\n"; - - static constexpr char http_failed[] = - "HTTP/1.1 500 Internal Server Error\r\n" - "Date: %s\r\n" - "Content-Length: 0\r\nConnection: " - "close\r\n\r\n"; - - static void getHeaders(const uint8_t* headers, std::size_t length, - bool request, transport::Metadata* metadata); - static std::size_t hasBody(const uint8_t* headers, std::size_t length); - static bool isMpdRequest(const uint8_t* headers, std::size_t length); - static uint32_t parseCacheControl(const uint8_t* headers, std::size_t length); - - static std::string numbers; - static std::string content_length; - static std::string transfer_encoding; - static std::string chunked; - static std::string cache_control; - static std::string connection; - static std::string mpd; - static std::string separator; -}; diff --git a/apps/http-proxy/src/forwarder_config.h b/apps/http-proxy/src/forwarder_config.h deleted file mode 100644 index 3d69c998e..000000000 --- a/apps/http-proxy/src/forwarder_config.h +++ /dev/null @@ -1,193 +0,0 @@ -/* - * Copyright (c) 2020 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include <hicn/transport/portability/c_portability.h> -#include <hicn/transport/utils/branch_prediction.h> -#include <hicn/transport/utils/log.h> -#include <hicn/transport/utils/string_utils.h> - -#include <asio.hpp> -#include <chrono> -#include <sstream> -#include <string> - -#include "forwarder_interface.h" - -#define RETRY_INTERVAL 300 - -namespace transport { - -static constexpr char server_header[] = "server"; -static constexpr char prefix_header[] = "prefix"; -static constexpr char port_header[] = "port"; - -using OnForwarderConfiguredCallback = std::function<void(bool)>; - -class ForwarderConfig { - public: - using ListenerRetrievedCallback = std::function<void(std::error_code)>; - - template <typename Callback> - ForwarderConfig(asio::io_service& io_service, Callback&& callback) - : forwarder_interface_(io_service), - resolver_(io_service), - retx_count_(0), - timer_(io_service), - hicn_listen_port_(~0), - listener_retrieved_callback_(std::forward<Callback>(callback)) {} - - void tryToConnectToForwarder() { - doTryToConnectToForwarder(std::make_error_code(std::errc(0))); - } - - void doTryToConnectToForwarder(std::error_code ec) { - if (!ec) { - // ec == 0 --> timer expired - int ret = forwarder_interface_.connectToForwarder(); - if (ret < 0) { - // We were not able to connect to the local forwarder. Do not give up - // and retry. - TRANSPORT_LOGE("Could not connect to local forwarder. Retrying."); - - timer_.expires_from_now(std::chrono::milliseconds(RETRY_INTERVAL)); - timer_.async_wait(std::bind(&ForwarderConfig::doTryToConnectToForwarder, - this, std::placeholders::_1)); - } else { - timer_.cancel(); - retx_count_ = 0; - doGetMainListener(std::make_error_code(std::errc(0))); - } - } else { - TRANSPORT_LOGI("Timer for re-trying forwarder connection canceled."); - } - } - - void doGetMainListener(std::error_code ec) { - if (!ec) { - // ec == 0 --> timer expired - int ret = forwarder_interface_.getMainListenerPort(); - if (ret <= 0) { - // Since without the main listener of the forwarder the proxy cannot - // work, we can stop the program here until we get the listener port. - TRANSPORT_LOGE( - "Could not retrieve main listener port from the forwarder. " - "Retrying."); - - timer_.expires_from_now(std::chrono::milliseconds(RETRY_INTERVAL)); - timer_.async_wait(std::bind(&ForwarderConfig::doGetMainListener, this, - std::placeholders::_1)); - } else { - timer_.cancel(); - retx_count_ = 0; - hicn_listen_port_ = uint16_t(ret); - listener_retrieved_callback_(std::make_error_code(std::errc(0))); - } - } else { - TRANSPORT_LOGI("Timer for retrieving main hicn listener canceled."); - } - } - - template <typename Callback> - TRANSPORT_ALWAYS_INLINE bool parseHicnHeader(std::string& header, - Callback&& callback) { - std::stringstream ss(header); - route_info_t* ret = new route_info_t(); - std::string port_string; - - while (ss.good()) { - std::string substr; - getline(ss, substr, ','); - - if (TRANSPORT_EXPECT_FALSE(substr.empty())) { - continue; - } - - utils::trim(substr); - auto it = std::find_if(substr.begin(), substr.end(), - [](int ch) { return ch == '='; }); - if (it != std::end(substr)) { - auto key = std::string(substr.begin(), it); - auto value = std::string(it + 1, substr.end()); - - if (key == server_header) { - ret->remote_addr = value; - } else if (key == prefix_header) { - auto it = std::find_if(value.begin(), value.end(), - [](int ch) { return ch == '/'; }); - - if (it != std::end(value)) { - ret->route_addr = std::string(value.begin(), it); - ret->route_len = std::stoul(std::string(it + 1, value.end())); - } else { - return false; - } - } else if (key == port_header) { - ret->remote_port = std::stoul(value); - port_string = value; - } else { - // Header not recognized - return false; - } - } - } - - /* - * Resolve server address - */ - auto results = - resolver_.resolve({ret->remote_addr, port_string, - asio::ip::resolver_query_base::numeric_service}); - -#if ((ASIO_VERSION / 100 % 1000) < 12) - asio::ip::udp::resolver::iterator end; - auto& it = results; - while (it != end) { -#else - for (auto it = results.begin(); it != results.end(); it++) { -#endif - if (it->endpoint().address().is_v4()) { - // Use this v4 address to configure the forwarder. - ret->remote_addr = it->endpoint().address().to_string(); - ret->family = AF_INET; - std::string _prefix = ret->route_addr; - forwarder_interface_.createFaceAndRoute( - RouteInfoPtr(ret), [callback = std::forward<Callback>(callback), - configured_prefix = std::move(_prefix)]( - uint32_t route_id, bool result) { - callback(result, configured_prefix); - }); - - return true; - } -#if ((ASIO_VERSION / 100 % 1000) < 12) - it++; -#endif - } - - return false; - } - - private: - ForwarderInterface forwarder_interface_; - asio::ip::udp::resolver resolver_; - std::uint32_t retx_count_; - asio::steady_timer timer_; - uint16_t hicn_listen_port_; - ListenerRetrievedCallback listener_retrieved_callback_; -}; // namespace transport - -} // namespace transport
\ No newline at end of file diff --git a/apps/http-proxy/src/forwarder_interface.cc b/apps/http-proxy/src/forwarder_interface.cc index 105d5a8e9..d80939b8b 100644 --- a/apps/http-proxy/src/forwarder_interface.cc +++ b/apps/http-proxy/src/forwarder_interface.cc @@ -13,7 +13,7 @@ * limitations under the License. */ -#include "forwarder_interface.h" +#include <hicn/http-proxy/forwarder_interface.h> #include <arpa/inet.h> #include <hicn/transport/utils/log.h> @@ -41,16 +41,18 @@ int ForwarderInterface::connectToForwarder() { } void ForwarderInterface::close() { - internal_ioservice_.post([this]() { - work_.reset(); - if (sock_) { - hc_sock_free(sock_); - sock_ = nullptr; - } - }); + if (!closed_) { + internal_ioservice_.post([this]() { + work_.reset(); + if (sock_) { + hc_sock_free(sock_); + sock_ = nullptr; + } + }); - if (thread_->joinable()) { - thread_->join(); + if (thread_->joinable()) { + thread_->join(); + } } } diff --git a/apps/http-proxy/src/forwarder_interface.h b/apps/http-proxy/src/forwarder_interface.h deleted file mode 100644 index 116b09a07..000000000 --- a/apps/http-proxy/src/forwarder_interface.h +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Copyright (c) 2020 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -extern "C" { -#include <hicn/ctrl/api.h> -#include <hicn/util/ip_address.h> -} - -#ifndef ASIO_STANDALONE -#define ASIO_STANDALONE 1 -#endif -#include <asio.hpp> -#include <asio/steady_timer.hpp> -#include <functional> -#include <thread> -#include <unordered_map> - -namespace transport { - -typedef std::function<void(uint32_t, bool)> SetRouteCallback; - -struct route_info_t { - int family; - std::string remote_addr; - uint16_t remote_port; - std::string route_addr; - uint8_t route_len; -}; - -using RouteInfoPtr = std::shared_ptr<route_info_t>; - -class ForwarderInterface { - public: - ForwarderInterface(asio::io_service &io_service) - : external_ioservice_(io_service), - work_(std::make_unique<asio::io_service::work>(internal_ioservice_)), - sock_(nullptr), - thread_(std::make_unique<std::thread>( - [this]() { internal_ioservice_.run(); })), - check_routes_timer_(nullptr), - pending_add_route_counter_(0), - route_id_(0) {} - - ~ForwarderInterface(); - - int connectToForwarder(); - - void removeConnectedUserNow(uint32_t route_id); - - // to be called at the server - // at the client this creates a race condition - // and the program enters in a loop - void scheduleRemoveConnectedUser(uint32_t route_id); - - template <typename Callback> - void createFaceAndRoute(RouteInfoPtr &&route_info, Callback &&callback) { - internal_ioservice_.post([this, _route_info = std::move(route_info), - _callback = std::forward<Callback>(callback)]() { - pending_add_route_counter_++; - uint8_t max_try = 5; - auto timer = new asio::steady_timer(internal_ioservice_); - internalCreateFaceAndRoute(std::move(_route_info), max_try, timer, - std::move(_callback)); - }); - } - - int32_t getMainListenerPort(); - - void close(); - - private: - void internalRemoveConnectedUser(uint32_t route_id); - - void internalCreateFaceAndRoute(RouteInfoPtr route_info, uint8_t max_try, - asio::steady_timer *timer, - SetRouteCallback callback); - - int tryToCreateFaceAndRoute(route_info_t *route_info); - - asio::io_service &external_ioservice_; - asio::io_service internal_ioservice_; - std::unique_ptr<asio::io_service::work> work_; - hc_sock_t *sock_; - std::unique_ptr<std::thread> thread_; - std::unordered_map<uint32_t, RouteInfoPtr> route_status_; - std::unique_ptr<asio::steady_timer> check_routes_timer_; - uint32_t pending_add_route_counter_; - uint32_t route_id_; -}; - -} // namespace transport diff --git a/apps/http-proxy/src/http_proxy.cc b/apps/http-proxy/src/http_proxy.cc index 1e6dcd88f..4afa5b832 100644 --- a/apps/http-proxy/src/http_proxy.cc +++ b/apps/http-proxy/src/http_proxy.cc @@ -13,13 +13,14 @@ * limitations under the License. */ -#include "http_proxy.h" +#include <hicn/http-proxy/http_proxy.h> +#include <hicn/http-proxy/http_session.h> #include <hicn/transport/core/interest.h> #include <hicn/transport/utils/log.h> #include <hicn/transport/utils/string_utils.h> -#include "utils.h" +#include <hicn/http-proxy/utils.h> namespace transport { @@ -54,6 +55,8 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback { consumer_.connect(); } + void stop() { session_->close(); } + void setHttpSession(asio::ip::tcp::socket&& socket) { session_ = std::make_unique<HTTPSession>( std::move(socket), @@ -271,11 +274,39 @@ TcpReceiver::TcpReceiver(std::uint16_t port, const std::string& prefix, new HTTPClientConnectionCallback(*this, thread_)); } } - }) { + }), + stopped_(false) { forwarder_config_.tryToConnectToForwarder(); } +void TcpReceiver::stop() { + thread_.add([this](){ + stopped_ = true; + + /* Stop the listener */ + listener_.stop(); + + /* Close connection with forwarder */ + forwarder_config_.close(); + + /* Stop the used http clients */ + for (auto& client : used_http_clients_) { + client->stop(); + } + + /* Delete unused clients */ + for (auto& client : http_clients_) { + delete client; + } + }); +} + void TcpReceiver::onClientDisconnect(HTTPClientConnectionCallback* client) { + if (stopped_) { + delete client; + return; + } + http_clients_.emplace_front(client); used_http_clients_.erase(client); } @@ -299,21 +330,46 @@ void TcpReceiver::onNewConnection(asio::ip::tcp::socket&& socket) { used_http_clients_.insert(c); } -HTTPProxy::HTTPProxy(ClientParams& params, std::size_t n_thread) { +void HTTPProxy::setupSignalHandler() { + signals_.async_wait([this](const std::error_code& ec, int signal_number) { + if (!ec) { + TRANSPORT_LOGI("Received signal %d. Stopping gracefully.", signal_number); + stop(); + } + }); +} + +void HTTPProxy::stop() { + for (auto& receiver : receivers_) { + receiver->stop(); + } + + for (auto& receiver : receivers_) { + receiver->stopAndJoinThread(); + } +} + +HTTPProxy::HTTPProxy(ClientParams& params, std::size_t n_thread) + : signals_(main_io_context_, SIGINT, SIGQUIT) { for (uint16_t i = 0; i < n_thread; i++) { // icn_receivers_.emplace_back(std::make_unique<IcnReceiver>(icn_params)); receivers_.emplace_back(std::make_unique<TcpReceiver>( params.tcp_listen_port, params.prefix, params.first_ipv6_word)); } + + setupSignalHandler(); } -HTTPProxy::HTTPProxy(ServerParams& params, std::size_t n_thread) { +HTTPProxy::HTTPProxy(ServerParams& params, std::size_t n_thread) + : signals_(main_io_context_, SIGINT, SIGQUIT) { for (uint16_t i = 0; i < n_thread; i++) { receivers_.emplace_back(std::make_unique<IcnReceiver>( params.prefix, params.first_ipv6_word, params.origin_address, params.origin_port, params.cache_size, params.mtu, params.content_lifetime, params.manifest)); } + + setupSignalHandler(); } } // namespace transport diff --git a/apps/http-proxy/src/http_proxy.h b/apps/http-proxy/src/http_proxy.h deleted file mode 100644 index c3f183af2..000000000 --- a/apps/http-proxy/src/http_proxy.h +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Copyright (c) 2020 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include <hicn/transport/interfaces/socket_consumer.h> -#include <hicn/transport/utils/event_thread.h> - -#include "forwarder_config.h" -#include "http_session.h" -#include "icn_receiver.h" - -#define ASIO_STANDALONE -#include <asio.hpp> -#include <asio/version.hpp> -#include <unordered_set> - -class TcpListener { - public: - using AcceptCallback = std::function<void(asio::ip::tcp::socket&&)>; - - TcpListener(asio::io_service& io_service, short port, AcceptCallback callback) - : acceptor_(io_service, - asio::ip::tcp::endpoint(asio::ip::tcp::v4(), port)), -#if ((ASIO_VERSION / 100 % 1000) < 12) - socket_(io_service), -#endif - callback_(callback) { - } - - public: - void doAccept() { -#if ((ASIO_VERSION / 100 % 1000) >= 12) - acceptor_.async_accept( - [this](std::error_code ec, asio::ip::tcp::socket socket) { -#else - acceptor_.async_accept(socket_, [this](std::error_code ec) { - auto socket = std::move(socket_); -#endif - if (!ec) { - callback_(std::move(socket)); - } - - doAccept(); - }); - } - - asio::ip::tcp::acceptor acceptor_; -#if ((ASIO_VERSION / 100 % 1000) < 12) - asio::ip::tcp::socket socket_; -#endif - AcceptCallback callback_; -}; - -namespace transport { - -class HTTPClientConnectionCallback; - -class Receiver { - public: - Receiver() : thread_() {} - - protected: - utils::EventThread thread_; -}; - -class TcpReceiver : public Receiver { - friend class HTTPClientConnectionCallback; - - public: - TcpReceiver(std::uint16_t port, const std::string& prefix, - const std::string& ipv6_first_word); - - private: - void onNewConnection(asio::ip::tcp::socket&& socket); - void onClientDisconnect(HTTPClientConnectionCallback* client); - - template <typename Callback> - void parseHicnHeader(std::string& hicn_header, Callback&& callback) { - forwarder_config_.parseHicnHeader(hicn_header, - std::forward<Callback>(callback)); - } - - TcpListener listener_; - std::string prefix_; - std::string ipv6_first_word_; - std::string prefix_hash_; - std::deque<HTTPClientConnectionCallback*> http_clients_; - std::unordered_set<HTTPClientConnectionCallback*> used_http_clients_; - ForwarderConfig forwarder_config_; -}; - -class IcnReceiver : public Receiver { - public: - template <typename... Args> - IcnReceiver(Args&&... args) - : Receiver(), - icn_consum_producer_(thread_.getIoService(), - std::forward<Args>(args)...) { - icn_consum_producer_.run(); - } - - private: - AsyncConsumerProducer icn_consum_producer_; -}; - -class HTTPProxy { - public: - enum Server { CREATE }; - enum Client { WRAP_BUFFER }; - - struct CommonParams { - std::string prefix; - std::string first_ipv6_word; - - virtual void printParams() { std::cout << "Parameters: " << std::endl; }; - }; - - struct ClientParams : virtual CommonParams { - short tcp_listen_port; - void printParams() override { - std::cout << "Running HTTP/TCP -> HTTP/hICN proxy." << std::endl; - CommonParams::printParams(); - std::cout << "\t" - << "HTTP listen port: " << tcp_listen_port << std::endl; - std::cout << "\t" - << "Consumer Prefix: " << prefix << std::endl; - std::cout << "\t" - << "Prefix first word: " << first_ipv6_word << std::endl; - } - }; - - struct ServerParams : virtual CommonParams { - std::string origin_address; - std::string origin_port; - std::string cache_size; - std::string mtu; - std::string content_lifetime; - bool manifest; - - void printParams() override { - std::cout << "Running HTTP/hICN -> HTTP/TCP proxy." << std::endl; - CommonParams::printParams(); - std::cout << "\t" - << "Origin address: " << origin_address << std::endl; - std::cout << "\t" - << "Origin port: " << origin_port << std::endl; - std::cout << "\t" - << "Producer cache size: " << cache_size << std::endl; - std::cout << "\t" - << "hICN MTU: " << mtu << std::endl; - std::cout << "\t" - << "Default content lifetime: " << content_lifetime - << std::endl; - std::cout << "\t" - << "Producer Prefix: " << prefix << std::endl; - std::cout << "\t" - << "Prefix first word: " << first_ipv6_word << std::endl; - std::cout << "\t" - << "Use manifest: " << manifest << std::endl; - } - }; - - HTTPProxy(ClientParams& icn_params, std::size_t n_thread = 1); - HTTPProxy(ServerParams& icn_params, std::size_t n_thread = 1); - - void run() { sleep(1000000); } - - private: - void acceptTCPClient(asio::ip::tcp::socket&& socket); - - private: - std::vector<std::unique_ptr<Receiver>> receivers_; -}; - -} // namespace transport
\ No newline at end of file diff --git a/apps/http-proxy/src/http_session.cc b/apps/http-proxy/src/http_session.cc index 760539fe0..6b91c12c3 100644 --- a/apps/http-proxy/src/http_session.cc +++ b/apps/http-proxy/src/http_session.cc @@ -13,8 +13,7 @@ * limitations under the License. */ -#include "http_session.h" - +#include <hicn/http-proxy/http_proxy.h> #include <hicn/transport/utils/branch_prediction.h> #include <hicn/transport/utils/log.h> diff --git a/apps/http-proxy/src/http_session.h b/apps/http-proxy/src/http_session.h deleted file mode 100644 index 05fdf62fa..000000000 --- a/apps/http-proxy/src/http_session.h +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Copyright (c) 2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include <hicn/transport/core/packet.h> - -#include "HTTP1.xMessageFastParser.h" - -#define ASIO_STANDALONE -#include <asio.hpp> -#include <deque> -#include <functional> - -namespace transport { - -using asio::ip::tcp; - -struct Metadata; - -typedef std::function<void(const uint8_t *data, std::size_t size, bool is_last, - bool headers, Metadata *metadata)> - ContentReceivedCallback; -typedef std::function<bool(asio::ip::tcp::socket &socket)> OnConnectionClosed; -typedef std::function<void()> ContentSentCallback; -typedef std::deque< - std::pair<std::unique_ptr<utils::MemBuf>, ContentSentCallback>> - BufferQueue; - -struct Metadata { - std::string http_version; - HTTPHeaders headers; -}; - -struct RequestMetadata : Metadata { - std::string method; - std::string path; -}; - -struct ResponseMetadata : Metadata { - std::string status_code; - std::string status_string; -}; - -class HTTPClientConnectionCallback; - -class HTTPSession { - friend class HTTPClientConnectionCallback; - static constexpr uint32_t buffer_size = 1024 * 512; - - enum class ConnectorState { - CLOSED, - CONNECTING, - CONNECTED, - }; - - public: - HTTPSession(asio::io_service &io_service, std::string &ip_address, - std::string &port, ContentReceivedCallback receive_callback, - OnConnectionClosed on_reconnect_callback, bool client = false); - - HTTPSession(asio::ip::tcp::socket socket, - ContentReceivedCallback receive_callback, - OnConnectionClosed on_reconnect_callback, bool client = true); - - ~HTTPSession(); - - void send(const uint8_t *buffer, std::size_t len, - ContentSentCallback &&content_sent = 0); - - void send(utils::MemBuf *buffer, ContentSentCallback &&content_sent); - - void close(); - - private: - void doConnect(); - - void doReadHeader(); - - void doReadBody(std::size_t body_size, std::size_t additional_bytes); - - void doReadChunkedHeader(); - - void doWrite(); - - bool checkConnected(); - - private: - void handleRead(std::error_code ec, std::size_t length); - void tryReconnection(); - void startConnectionTimer(); - void handleDeadline(const std::error_code &ec); - - asio::io_service &io_service_; - asio::ip::tcp::socket socket_; - asio::ip::tcp::resolver resolver_; - asio::ip::tcp::resolver::iterator endpoint_iterator_; - asio::steady_timer timer_; - - BufferQueue write_msgs_; - - asio::streambuf input_buffer_; - - bool reverse_; - bool is_reconnection_; - bool data_available_; - - std::size_t content_length_; - - // Chunked encoding - bool is_last_chunk_; - bool chunked_; - - ContentReceivedCallback receive_callback_; - OnConnectionClosed on_connection_closed_callback_; - - // HTTP headers - std::unique_ptr<Metadata> header_info_; - - // Connector state - ConnectorState state_; -}; - -} // namespace transport diff --git a/apps/http-proxy/src/icn_receiver.cc b/apps/http-proxy/src/icn_receiver.cc index 6ccd2dc31..34b85f9c9 100644 --- a/apps/http-proxy/src/icn_receiver.cc +++ b/apps/http-proxy/src/icn_receiver.cc @@ -13,7 +13,7 @@ * limitations under the License. */ -#include "icn_receiver.h" +#include <hicn/http-proxy/icn_receiver.h> #include <hicn/transport/core/interest.h> #include <hicn/transport/http/default_values.h> @@ -23,8 +23,8 @@ #include <functional> #include <memory> -#include "HTTP1.xMessageFastParser.h" -#include "utils.h" +#include <hicn/http-proxy/HTTP1.xMessageFastParser.h> +#include <hicn/http-proxy/utils.h> namespace transport { @@ -42,7 +42,6 @@ AsyncConsumerProducer::AsyncConsumerProducer( cache_size_(std::stoul(cache_size)), mtu_(std::stoul(mtu)), request_counter_(0), - signals_(io_service_, SIGINT, SIGQUIT), connector_(io_service_, ip_address_, port_, std::bind(&AsyncConsumerProducer::publishContent, this, std::placeholders::_1, std::placeholders::_2, @@ -76,15 +75,6 @@ AsyncConsumerProducer::AsyncConsumerProducer( } producer_socket_.registerPrefix(prefix_); - - // Let the main thread to catch SIGINT and SIGQUIT - signals_.async_wait( - [this](const std::error_code& errorCode, int signal_number) { - TRANSPORT_LOGI("Number of requests processed by plugin: %lu", - (unsigned long)request_counter_); - producer_socket_.stop(); - connector_.close(); - }); } void AsyncConsumerProducer::start() { @@ -100,6 +90,15 @@ void AsyncConsumerProducer::run() { } } +void AsyncConsumerProducer::stop() { + io_service_.post([this]() { + TRANSPORT_LOGI("Number of requests processed by plugin: %lu", + (unsigned long)request_counter_); + producer_socket_.stop(); + connector_.close(); + }); +} + void AsyncConsumerProducer::doReceive() { producer_socket_.setSocketOption( interface::ProducerCallbacksOptions::CACHE_MISS, diff --git a/apps/http-proxy/src/icn_receiver.h b/apps/http-proxy/src/icn_receiver.h deleted file mode 100644 index 31e9ae932..000000000 --- a/apps/http-proxy/src/icn_receiver.h +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Copyright (c) 2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <hicn/transport/core/prefix.h> -#include <hicn/transport/interfaces/publication_options.h> -#include <hicn/transport/interfaces/socket_producer.h> -#include <hicn/transport/utils/spinlock.h> - -#include <asio.hpp> -#include <cassert> -#include <cstring> -#include <queue> -#include <utility> - -#include "http_session.h" - -namespace transport { - -class AsyncConsumerProducer { - using SegmentProductionPair = std::pair<uint32_t, bool>; - using ResponseInfoMap = std::unordered_map<core::Name, SegmentProductionPair>; - using RequestQueue = std::queue<interface::PublicationOptions>; - - public: - explicit AsyncConsumerProducer( - asio::io_service& io_service, const std::string& prefix, - const std::string& first_ipv6_word, const std::string& origin_address, - const std::string& origin_port, const std::string& cache_size, - const std::string& mtu, const std::string& content_lifetime, - bool manifest); - - explicit AsyncConsumerProducer( - const std::string& prefix, const std::string& first_ipv6_word, - const std::string& origin_address, const std::string& origin_port, - const std::string& cache_size, const std::string& mtu, - const std::string& content_lifetime, bool manifest) - : AsyncConsumerProducer(internal_io_service_, prefix, first_ipv6_word, - origin_address, origin_port, cache_size, mtu, - content_lifetime, manifest) { - external_io_service_ = false; - } - - void run(); - - private: - void start(); - - void doSend(); - - void doReceive(); - - void publishContent(const uint8_t* data, std::size_t size, - bool is_last = true, bool headers = false); - - void manageIncomingInterest(core::Name& name, core::Packet::MemBufPtr& packet, - utils::MemBuf* payload); - - core::Prefix prefix_; - asio::io_service& io_service_; - asio::io_service internal_io_service_; - bool external_io_service_; - interface::ProducerSocket producer_socket_; - - std::string ip_address_; - std::string port_; - uint32_t cache_size_; - uint32_t mtu_; - - uint64_t request_counter_; - asio::signal_set signals_; - - // std::unordered_map<core::Name, std::shared_ptr<ATSConnector>> - // connection_map_; - HTTPSession connector_; - - unsigned long default_content_lifetime_; - - // ResponseInfoMap --> max_seq_number + bool indicating whether request is in - // production - ResponseInfoMap chunk_number_map_; - RequestQueue response_name_queue_; -}; - -} // namespace transport diff --git a/apps/http-proxy/src/utils.h b/apps/http-proxy/src/utils.h deleted file mode 100644 index d87c796d0..000000000 --- a/apps/http-proxy/src/utils.h +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright (c) 2020 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <hicn/transport/core/prefix.h> -#include <hicn/transport/utils/hash.h> - -#include <sstream> -#include <string> - -#pragma once - -TRANSPORT_ALWAYS_INLINE std::string generatePrefix( - const std::string& prefix_url, const std::string& first_ipv6_word) { - const char* str = prefix_url.c_str(); - uint16_t pos = 0; - - if (strncmp("http://", str, 7) == 0) { - pos = 7; - } else if (strncmp("https://", str, 8) == 0) { - pos = 8; - } - - str += pos; - - uint32_t locator_hash = utils::hash::fnv32_buf(str, strlen(str)); - - std::stringstream stream; - stream << first_ipv6_word << ":0"; - - for (uint16_t* word = (uint16_t*)&locator_hash; - std::size_t(word) < (std::size_t(&locator_hash) + sizeof(locator_hash)); - word++) { - stream << ":" << std::hex << *word; - } - - stream << "::"; - - return stream.str(); -}
\ No newline at end of file |