diff options
author | Mauro Sardara <msardara@cisco.com> | 2020-06-02 18:52:39 +0200 |
---|---|---|
committer | Angelo Mantellini <angelo.mantellini@cisco.com> | 2020-06-03 16:21:49 +0200 |
commit | 5d8156ea4c34f9a3cb986da16a71faebfb2add6b (patch) | |
tree | 5895f7546c91eab1c6cad917f3a41594a543ca64 /apps/http-proxy/includes | |
parent | 15458966a342caa0912b7806a755d0d8277ca00f (diff) |
[HICN-622] Add stop() functionality to http proxy.
Signed-off-by: Mauro Sardara <msardara@cisco.com>
Change-Id: I9091cd8ef0f9da869b886541a0116adf3f30e6b9
Signed-off-by: Angelo Mantellini <angelo.mantellini@cisco.com>
Diffstat (limited to 'apps/http-proxy/includes')
8 files changed, 902 insertions, 0 deletions
diff --git a/apps/http-proxy/includes/hicn/http-proxy/CMakeLists.txt b/apps/http-proxy/includes/hicn/http-proxy/CMakeLists.txt new file mode 100644 index 000000000..5bc7d45fc --- /dev/null +++ b/apps/http-proxy/includes/hicn/http-proxy/CMakeLists.txt @@ -0,0 +1,39 @@ +# Copyright (c) 2017-2019 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cmake_minimum_required(VERSION 3.5 FATAL_ERROR) + +list(APPEND HEADER_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/forwarder_config.h + ${CMAKE_CURRENT_SOURCE_DIR}/forwarder_interface.h + ${CMAKE_CURRENT_SOURCE_DIR}/http_proxy.h + ${CMAKE_CURRENT_SOURCE_DIR}/http_session.h + ${CMAKE_CURRENT_SOURCE_DIR}/HTTP1.xMessageFastParser.h + ${CMAKE_CURRENT_SOURCE_DIR}/icn_receiver.h + ${CMAKE_CURRENT_SOURCE_DIR}/utils.h +) + +set(HEADER_FILES ${HEADER_FILES} PARENT_SCOPE) + +set(LIBPROXY_INCLUDE_DIRS + ${CMAKE_CURRENT_SOURCE_DIR}/../.. "" + CACHE INTERNAL + "" FORCE +) + +set(LIBPROXY_TO_INSTALL_HEADER_FILES + ${HEADER_FILES} "" + CACHE INTERNAL + "" FORCE +) + diff --git a/apps/http-proxy/includes/hicn/http-proxy/HTTP1.xMessageFastParser.h b/apps/http-proxy/includes/hicn/http-proxy/HTTP1.xMessageFastParser.h new file mode 100644 index 000000000..7c035c83b --- /dev/null +++ b/apps/http-proxy/includes/hicn/http-proxy/HTTP1.xMessageFastParser.h @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/http/message.h> + +#include <algorithm> +#include <string> + +using transport::http::HTTPHeaders; + +namespace transport { +struct Metadata; +} + +class HTTPMessageFastParser { + public: + static constexpr char http_ok[] = + "HTTP/1.1 200 OK\r\n" + "Access-Control-Allow-Origin: *\r\n" + "Connection: close\r\n" + "Content-Length: 0\r\n\r\n"; + + static constexpr char http_cors[] = + "HTTP/1.1 200 OK\r\n" + "Date: %s\r\n" + "Connection: close\r\n" + "Content-Length: 0\r\n" + "Access-Control-Allow-Origin: *\r\n" + "Access-Control-Allow-Methods: GET\r\n" + "Access-Control-Allow-Headers: hicn\r\n" + "Access-Control-Max-Age: 1800\r\n\r\n"; + + static constexpr char http_failed[] = + "HTTP/1.1 500 Internal Server Error\r\n" + "Date: %s\r\n" + "Content-Length: 0\r\nConnection: " + "close\r\n\r\n"; + + static void getHeaders(const uint8_t* headers, std::size_t length, + bool request, transport::Metadata* metadata); + static std::size_t hasBody(const uint8_t* headers, std::size_t length); + static bool isMpdRequest(const uint8_t* headers, std::size_t length); + static uint32_t parseCacheControl(const uint8_t* headers, std::size_t length); + + static std::string numbers; + static std::string content_length; + static std::string transfer_encoding; + static std::string chunked; + static std::string cache_control; + static std::string connection; + static std::string mpd; + static std::string separator; +}; diff --git a/apps/http-proxy/includes/hicn/http-proxy/forwarder_config.h b/apps/http-proxy/includes/hicn/http-proxy/forwarder_config.h new file mode 100644 index 000000000..19c96a9e3 --- /dev/null +++ b/apps/http-proxy/includes/hicn/http-proxy/forwarder_config.h @@ -0,0 +1,200 @@ +/* + * Copyright (c) 2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/portability/c_portability.h> +#include <hicn/transport/utils/branch_prediction.h> +#include <hicn/transport/utils/log.h> +#include <hicn/transport/utils/string_utils.h> + +#include <asio.hpp> +#include <chrono> +#include <sstream> +#include <string> + +#include "forwarder_interface.h" + + +#define RETRY_INTERVAL 300 + +namespace transport { + +static constexpr char server_header[] = "server"; +static constexpr char prefix_header[] = "prefix"; +static constexpr char port_header[] = "port"; + +using OnForwarderConfiguredCallback = std::function<void(bool)>; + +class ForwarderConfig { + public: + using ListenerRetrievedCallback = std::function<void(std::error_code)>; + + template <typename Callback> + ForwarderConfig(asio::io_service& io_service, Callback&& callback) + : forwarder_interface_(io_service), + resolver_(io_service), + retx_count_(0), + timer_(io_service), + hicn_listen_port_(~0), + listener_retrieved_callback_(std::forward<Callback>(callback)) {} + + void close() { + timer_.cancel(); + resolver_.cancel(); + forwarder_interface_.close(); + } + + void tryToConnectToForwarder() { + doTryToConnectToForwarder(std::make_error_code(std::errc(0))); + } + + void doTryToConnectToForwarder(std::error_code ec) { + if (!ec) { + // ec == 0 --> timer expired + int ret = forwarder_interface_.connectToForwarder(); + if (ret < 0) { + // We were not able to connect to the local forwarder. Do not give up + // and retry. + TRANSPORT_LOGE("Could not connect to local forwarder. Retrying."); + + timer_.expires_from_now(std::chrono::milliseconds(RETRY_INTERVAL)); + timer_.async_wait(std::bind(&ForwarderConfig::doTryToConnectToForwarder, + this, std::placeholders::_1)); + } else { + timer_.cancel(); + retx_count_ = 0; + doGetMainListener(std::make_error_code(std::errc(0))); + } + } else { + TRANSPORT_LOGD("Timer for re-trying forwarder connection canceled."); + } + } + + void doGetMainListener(std::error_code ec) { + if (!ec) { + // ec == 0 --> timer expired + int ret = forwarder_interface_.getMainListenerPort(); + if (ret <= 0) { + // Since without the main listener of the forwarder the proxy cannot + // work, we can stop the program here until we get the listener port. + TRANSPORT_LOGE( + "Could not retrieve main listener port from the forwarder. " + "Retrying."); + + timer_.expires_from_now(std::chrono::milliseconds(RETRY_INTERVAL)); + timer_.async_wait(std::bind(&ForwarderConfig::doGetMainListener, this, + std::placeholders::_1)); + } else { + timer_.cancel(); + retx_count_ = 0; + hicn_listen_port_ = uint16_t(ret); + listener_retrieved_callback_(std::make_error_code(std::errc(0))); + } + } else { + TRANSPORT_LOGI("Timer for retrieving main hicn listener canceled."); + } + } + + template <typename Callback> + TRANSPORT_ALWAYS_INLINE bool parseHicnHeader(std::string& header, + Callback&& callback) { + std::stringstream ss(header); + route_info_t* ret = new route_info_t(); + std::string port_string; + + while (ss.good()) { + std::string substr; + getline(ss, substr, ','); + + if (TRANSPORT_EXPECT_FALSE(substr.empty())) { + continue; + } + + utils::trim(substr); + auto it = std::find_if(substr.begin(), substr.end(), + [](int ch) { return ch == '='; }); + if (it != std::end(substr)) { + auto key = std::string(substr.begin(), it); + auto value = std::string(it + 1, substr.end()); + + if (key == server_header) { + ret->remote_addr = value; + } else if (key == prefix_header) { + auto it = std::find_if(value.begin(), value.end(), + [](int ch) { return ch == '/'; }); + + if (it != std::end(value)) { + ret->route_addr = std::string(value.begin(), it); + ret->route_len = std::stoul(std::string(it + 1, value.end())); + } else { + return false; + } + } else if (key == port_header) { + ret->remote_port = std::stoul(value); + port_string = value; + } else { + // Header not recognized + return false; + } + } + } + + /* + * Resolve server address + */ + auto results = + resolver_.resolve({ret->remote_addr, port_string, + asio::ip::resolver_query_base::numeric_service}); + +#if ((ASIO_VERSION / 100 % 1000) < 12) + asio::ip::udp::resolver::iterator end; + auto& it = results; + while (it != end) { +#else + for (auto it = results.begin(); it != results.end(); it++) { +#endif + if (it->endpoint().address().is_v4()) { + // Use this v4 address to configure the forwarder. + ret->remote_addr = it->endpoint().address().to_string(); + ret->family = AF_INET; + std::string _prefix = ret->route_addr; + forwarder_interface_.createFaceAndRoute( + RouteInfoPtr(ret), [callback = std::forward<Callback>(callback), + configured_prefix = std::move(_prefix)]( + uint32_t route_id, bool result) { + callback(result, configured_prefix); + }); + + return true; + } +#if ((ASIO_VERSION / 100 % 1000) < 12) + it++; +#endif + } + + return false; + } + + private: + ForwarderInterface forwarder_interface_; + asio::ip::udp::resolver resolver_; + std::uint32_t retx_count_; + asio::steady_timer timer_; + uint16_t hicn_listen_port_; + ListenerRetrievedCallback listener_retrieved_callback_; +}; // namespace transport + +} // namespace transport
\ No newline at end of file diff --git a/apps/http-proxy/includes/hicn/http-proxy/forwarder_interface.h b/apps/http-proxy/includes/hicn/http-proxy/forwarder_interface.h new file mode 100644 index 000000000..54941a4ba --- /dev/null +++ b/apps/http-proxy/includes/hicn/http-proxy/forwarder_interface.h @@ -0,0 +1,107 @@ +/* + * Copyright (c) 2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +extern "C" { +#include <hicn/ctrl/api.h> +#include <hicn/util/ip_address.h> +} + +#ifndef ASIO_STANDALONE +#define ASIO_STANDALONE 1 +#endif +#include <asio.hpp> +#include <asio/steady_timer.hpp> +#include <functional> +#include <thread> +#include <unordered_map> + +namespace transport { + +typedef std::function<void(uint32_t, bool)> SetRouteCallback; + +struct route_info_t { + int family; + std::string remote_addr; + uint16_t remote_port; + std::string route_addr; + uint8_t route_len; +}; + +using RouteInfoPtr = std::shared_ptr<route_info_t>; + +class ForwarderInterface { + public: + ForwarderInterface(asio::io_service &io_service) + : external_ioservice_(io_service), + work_(std::make_unique<asio::io_service::work>(internal_ioservice_)), + sock_(nullptr), + thread_(std::make_unique<std::thread>( + [this]() { internal_ioservice_.run(); })), + check_routes_timer_(nullptr), + pending_add_route_counter_(0), + route_id_(0), + closed_(false) {} + + ~ForwarderInterface(); + + int connectToForwarder(); + + void removeConnectedUserNow(uint32_t route_id); + + // to be called at the server + // at the client this creates a race condition + // and the program enters in a loop + void scheduleRemoveConnectedUser(uint32_t route_id); + + template <typename Callback> + void createFaceAndRoute(RouteInfoPtr &&route_info, Callback &&callback) { + internal_ioservice_.post([this, _route_info = std::move(route_info), + _callback = std::forward<Callback>(callback)]() { + pending_add_route_counter_++; + uint8_t max_try = 5; + auto timer = new asio::steady_timer(internal_ioservice_); + internalCreateFaceAndRoute(std::move(_route_info), max_try, timer, + std::move(_callback)); + }); + } + + int32_t getMainListenerPort(); + + void close(); + + private: + void internalRemoveConnectedUser(uint32_t route_id); + + void internalCreateFaceAndRoute(RouteInfoPtr route_info, uint8_t max_try, + asio::steady_timer *timer, + SetRouteCallback callback); + + int tryToCreateFaceAndRoute(route_info_t *route_info); + + asio::io_service &external_ioservice_; + asio::io_service internal_ioservice_; + std::unique_ptr<asio::io_service::work> work_; + hc_sock_t *sock_; + std::unique_ptr<std::thread> thread_; + std::unordered_map<uint32_t, RouteInfoPtr> route_status_; + std::unique_ptr<asio::steady_timer> check_routes_timer_; + uint32_t pending_add_route_counter_; + uint32_t route_id_; + bool closed_; +}; + +} // namespace transport diff --git a/apps/http-proxy/includes/hicn/http-proxy/http_proxy.h b/apps/http-proxy/includes/hicn/http-proxy/http_proxy.h new file mode 100644 index 000000000..6b5b1c3e3 --- /dev/null +++ b/apps/http-proxy/includes/hicn/http-proxy/http_proxy.h @@ -0,0 +1,204 @@ +/* + * Copyright (c) 2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/interfaces/socket_consumer.h> +#include <hicn/transport/utils/event_thread.h> + +#include "forwarder_config.h" +#include "http_session.h" +#include "icn_receiver.h" + +#define ASIO_STANDALONE +#include <asio.hpp> +#include <asio/version.hpp> +#include <unordered_set> + +class TcpListener { + public: + using AcceptCallback = std::function<void(asio::ip::tcp::socket&&)>; + + TcpListener(asio::io_service& io_service, short port, AcceptCallback callback) + : acceptor_(io_service, + asio::ip::tcp::endpoint(asio::ip::tcp::v4(), port)), +#if ((ASIO_VERSION / 100 % 1000) < 12) + socket_(io_service), +#endif + callback_(callback) { + } + + public: + void doAccept() { +#if ((ASIO_VERSION / 100 % 1000) >= 12) + acceptor_.async_accept( + [this](std::error_code ec, asio::ip::tcp::socket socket) { +#else + acceptor_.async_accept(socket_, [this](std::error_code ec) { + auto socket = std::move(socket_); +#endif + if (!ec) { + callback_(std::move(socket)); + doAccept(); + } + }); + } + + void stop() { acceptor_.close(); } + + asio::ip::tcp::acceptor acceptor_; +#if ((ASIO_VERSION / 100 % 1000) < 12) + asio::ip::tcp::socket socket_; +#endif + AcceptCallback callback_; +}; + +namespace transport { + +class HTTPClientConnectionCallback; + +class Receiver { + public: + Receiver() : thread_() {} + virtual ~Receiver() = default; + void stopAndJoinThread() { thread_.stop(); } + virtual void stop() = 0; + + protected: + utils::EventThread thread_; +}; + +class TcpReceiver : public Receiver { + friend class HTTPClientConnectionCallback; + + public: + TcpReceiver(std::uint16_t port, const std::string& prefix, + const std::string& ipv6_first_word); + + void stop() override; + + private: + void onNewConnection(asio::ip::tcp::socket&& socket); + void onClientDisconnect(HTTPClientConnectionCallback* client); + + template <typename Callback> + void parseHicnHeader(std::string& hicn_header, Callback&& callback) { + forwarder_config_.parseHicnHeader(hicn_header, + std::forward<Callback>(callback)); + } + + TcpListener listener_; + std::string prefix_; + std::string ipv6_first_word_; + std::string prefix_hash_; + std::deque<HTTPClientConnectionCallback*> http_clients_; + std::unordered_set<HTTPClientConnectionCallback*> used_http_clients_; + ForwarderConfig forwarder_config_; + bool stopped_; +}; + +class IcnReceiver : public Receiver { + public: + template <typename... Args> + IcnReceiver(Args&&... args) + : Receiver(), + icn_consum_producer_(thread_.getIoService(), + std::forward<Args>(args)...) { + icn_consum_producer_.run(); + } + + void stop() override { + thread_.add([this]() { + /* Stop the listener */ + icn_consum_producer_.stop(); + }); + } + + private: + AsyncConsumerProducer icn_consum_producer_; +}; + +class HTTPProxy { + public: + enum Server { CREATE }; + enum Client { WRAP_BUFFER }; + + struct CommonParams { + std::string prefix; + std::string first_ipv6_word; + + virtual void printParams() { std::cout << "Parameters: " << std::endl; }; + }; + + struct ClientParams : virtual CommonParams { + short tcp_listen_port; + void printParams() override { + std::cout << "Running HTTP/TCP -> HTTP/hICN proxy." << std::endl; + CommonParams::printParams(); + std::cout << "\t" + << "HTTP listen port: " << tcp_listen_port << std::endl; + std::cout << "\t" + << "Consumer Prefix: " << prefix << std::endl; + std::cout << "\t" + << "Prefix first word: " << first_ipv6_word << std::endl; + } + }; + + struct ServerParams : virtual CommonParams { + std::string origin_address; + std::string origin_port; + std::string cache_size; + std::string mtu; + std::string content_lifetime; + bool manifest; + + void printParams() override { + std::cout << "Running HTTP/hICN -> HTTP/TCP proxy." << std::endl; + CommonParams::printParams(); + std::cout << "\t" + << "Origin address: " << origin_address << std::endl; + std::cout << "\t" + << "Origin port: " << origin_port << std::endl; + std::cout << "\t" + << "Producer cache size: " << cache_size << std::endl; + std::cout << "\t" + << "hICN MTU: " << mtu << std::endl; + std::cout << "\t" + << "Default content lifetime: " << content_lifetime + << std::endl; + std::cout << "\t" + << "Producer Prefix: " << prefix << std::endl; + std::cout << "\t" + << "Prefix first word: " << first_ipv6_word << std::endl; + std::cout << "\t" + << "Use manifest: " << manifest << std::endl; + } + }; + + HTTPProxy(ClientParams& icn_params, std::size_t n_thread = 1); + HTTPProxy(ServerParams& icn_params, std::size_t n_thread = 1); + + void run() { main_io_context_.run(); } + void stop(); + + private: + void setupSignalHandler(); + + std::vector<std::unique_ptr<Receiver>> receivers_; + asio::io_service main_io_context_; + asio::signal_set signals_; +}; + +} // namespace transport
\ No newline at end of file diff --git a/apps/http-proxy/includes/hicn/http-proxy/http_session.h b/apps/http-proxy/includes/hicn/http-proxy/http_session.h new file mode 100644 index 000000000..05fdf62fa --- /dev/null +++ b/apps/http-proxy/includes/hicn/http-proxy/http_session.h @@ -0,0 +1,136 @@ +/* + * Copyright (c) 2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/core/packet.h> + +#include "HTTP1.xMessageFastParser.h" + +#define ASIO_STANDALONE +#include <asio.hpp> +#include <deque> +#include <functional> + +namespace transport { + +using asio::ip::tcp; + +struct Metadata; + +typedef std::function<void(const uint8_t *data, std::size_t size, bool is_last, + bool headers, Metadata *metadata)> + ContentReceivedCallback; +typedef std::function<bool(asio::ip::tcp::socket &socket)> OnConnectionClosed; +typedef std::function<void()> ContentSentCallback; +typedef std::deque< + std::pair<std::unique_ptr<utils::MemBuf>, ContentSentCallback>> + BufferQueue; + +struct Metadata { + std::string http_version; + HTTPHeaders headers; +}; + +struct RequestMetadata : Metadata { + std::string method; + std::string path; +}; + +struct ResponseMetadata : Metadata { + std::string status_code; + std::string status_string; +}; + +class HTTPClientConnectionCallback; + +class HTTPSession { + friend class HTTPClientConnectionCallback; + static constexpr uint32_t buffer_size = 1024 * 512; + + enum class ConnectorState { + CLOSED, + CONNECTING, + CONNECTED, + }; + + public: + HTTPSession(asio::io_service &io_service, std::string &ip_address, + std::string &port, ContentReceivedCallback receive_callback, + OnConnectionClosed on_reconnect_callback, bool client = false); + + HTTPSession(asio::ip::tcp::socket socket, + ContentReceivedCallback receive_callback, + OnConnectionClosed on_reconnect_callback, bool client = true); + + ~HTTPSession(); + + void send(const uint8_t *buffer, std::size_t len, + ContentSentCallback &&content_sent = 0); + + void send(utils::MemBuf *buffer, ContentSentCallback &&content_sent); + + void close(); + + private: + void doConnect(); + + void doReadHeader(); + + void doReadBody(std::size_t body_size, std::size_t additional_bytes); + + void doReadChunkedHeader(); + + void doWrite(); + + bool checkConnected(); + + private: + void handleRead(std::error_code ec, std::size_t length); + void tryReconnection(); + void startConnectionTimer(); + void handleDeadline(const std::error_code &ec); + + asio::io_service &io_service_; + asio::ip::tcp::socket socket_; + asio::ip::tcp::resolver resolver_; + asio::ip::tcp::resolver::iterator endpoint_iterator_; + asio::steady_timer timer_; + + BufferQueue write_msgs_; + + asio::streambuf input_buffer_; + + bool reverse_; + bool is_reconnection_; + bool data_available_; + + std::size_t content_length_; + + // Chunked encoding + bool is_last_chunk_; + bool chunked_; + + ContentReceivedCallback receive_callback_; + OnConnectionClosed on_connection_closed_callback_; + + // HTTP headers + std::unique_ptr<Metadata> header_info_; + + // Connector state + ConnectorState state_; +}; + +} // namespace transport diff --git a/apps/http-proxy/includes/hicn/http-proxy/icn_receiver.h b/apps/http-proxy/includes/hicn/http-proxy/icn_receiver.h new file mode 100644 index 000000000..780037665 --- /dev/null +++ b/apps/http-proxy/includes/hicn/http-proxy/icn_receiver.h @@ -0,0 +1,98 @@ +/* + * Copyright (c) 2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/core/prefix.h> +#include <hicn/transport/interfaces/publication_options.h> +#include <hicn/transport/interfaces/socket_producer.h> +#include <hicn/transport/utils/spinlock.h> + +#include <asio.hpp> +#include <cassert> +#include <cstring> +#include <queue> +#include <utility> + +#include <hicn/http-proxy/http_session.h> +//#include "http_session.h" + +namespace transport { + +class AsyncConsumerProducer { + using SegmentProductionPair = std::pair<uint32_t, bool>; + using ResponseInfoMap = std::unordered_map<core::Name, SegmentProductionPair>; + using RequestQueue = std::queue<interface::PublicationOptions>; + + public: + explicit AsyncConsumerProducer( + asio::io_service& io_service, const std::string& prefix, + const std::string& first_ipv6_word, const std::string& origin_address, + const std::string& origin_port, const std::string& cache_size, + const std::string& mtu, const std::string& content_lifetime, + bool manifest); + + explicit AsyncConsumerProducer( + const std::string& prefix, const std::string& first_ipv6_word, + const std::string& origin_address, const std::string& origin_port, + const std::string& cache_size, const std::string& mtu, + const std::string& content_lifetime, bool manifest) + : AsyncConsumerProducer(internal_io_service_, prefix, first_ipv6_word, + origin_address, origin_port, cache_size, mtu, + content_lifetime, manifest) { + external_io_service_ = false; + } + + void run(); + + void stop(); + + private: + void start(); + + void doSend(); + + void doReceive(); + + void publishContent(const uint8_t* data, std::size_t size, + bool is_last = true, bool headers = false); + + void manageIncomingInterest(core::Name& name, core::Packet::MemBufPtr& packet, + utils::MemBuf* payload); + + core::Prefix prefix_; + asio::io_service& io_service_; + asio::io_service internal_io_service_; + bool external_io_service_; + interface::ProducerSocket producer_socket_; + + std::string ip_address_; + std::string port_; + uint32_t cache_size_; + uint32_t mtu_; + + uint64_t request_counter_; + + // std::unordered_map<core::Name, std::shared_ptr<ATSConnector>> + // connection_map_; + HTTPSession connector_; + + unsigned long default_content_lifetime_; + + // ResponseInfoMap --> max_seq_number + bool indicating whether request is in + // production + ResponseInfoMap chunk_number_map_; + RequestQueue response_name_queue_; +}; + +} // namespace transport diff --git a/apps/http-proxy/includes/hicn/http-proxy/utils.h b/apps/http-proxy/includes/hicn/http-proxy/utils.h new file mode 100644 index 000000000..d87c796d0 --- /dev/null +++ b/apps/http-proxy/includes/hicn/http-proxy/utils.h @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/core/prefix.h> +#include <hicn/transport/utils/hash.h> + +#include <sstream> +#include <string> + +#pragma once + +TRANSPORT_ALWAYS_INLINE std::string generatePrefix( + const std::string& prefix_url, const std::string& first_ipv6_word) { + const char* str = prefix_url.c_str(); + uint16_t pos = 0; + + if (strncmp("http://", str, 7) == 0) { + pos = 7; + } else if (strncmp("https://", str, 8) == 0) { + pos = 8; + } + + str += pos; + + uint32_t locator_hash = utils::hash::fnv32_buf(str, strlen(str)); + + std::stringstream stream; + stream << first_ipv6_word << ":0"; + + for (uint16_t* word = (uint16_t*)&locator_hash; + std::size_t(word) < (std::size_t(&locator_hash) + sizeof(locator_hash)); + word++) { + stream << ":" << std::hex << *word; + } + + stream << "::"; + + return stream.str(); +}
\ No newline at end of file |