From a27536f3d1ce6c2f46aef61a29dd1f516644e663 Mon Sep 17 00:00:00 2001 From: Mauro Sardara Date: Thu, 14 May 2020 20:27:11 +0200 Subject: [HICN-614] Add client HTTP proxy (TCP->hICN) Change-Id: Ieaa875edff98404676083bab91233b98ce50c8d0 Signed-off-by: Mauro Sardara --- apps/http-proxy/CMakeLists.txt | 10 +- apps/http-proxy/main.cc | 108 ++++++-- apps/http-proxy/src/ATSConnector.cc | 286 --------------------- apps/http-proxy/src/ATSConnector.h | 109 -------- apps/http-proxy/src/HTTP1.xMessageFastParser.cc | 25 +- apps/http-proxy/src/HTTP1.xMessageFastParser.h | 7 +- apps/http-proxy/src/IcnReceiver.cc | 223 ---------------- apps/http-proxy/src/IcnReceiver.h | 81 ------ apps/http-proxy/src/http_proxy.cc | 243 ++++++++++++++++++ apps/http-proxy/src/http_proxy.h | 180 +++++++++++++ apps/http-proxy/src/http_session.cc | 321 ++++++++++++++++++++++++ apps/http-proxy/src/http_session.h | 111 ++++++++ apps/http-proxy/src/icn_receiver.cc | 201 +++++++++++++++ apps/http-proxy/src/icn_receiver.h | 96 +++++++ apps/http-proxy/src/utils.h | 51 ++++ 15 files changed, 1313 insertions(+), 739 deletions(-) delete mode 100644 apps/http-proxy/src/ATSConnector.cc delete mode 100644 apps/http-proxy/src/ATSConnector.h delete mode 100644 apps/http-proxy/src/IcnReceiver.cc delete mode 100644 apps/http-proxy/src/IcnReceiver.h create mode 100644 apps/http-proxy/src/http_proxy.cc create mode 100644 apps/http-proxy/src/http_proxy.h create mode 100644 apps/http-proxy/src/http_session.cc create mode 100644 apps/http-proxy/src/http_session.h create mode 100644 apps/http-proxy/src/icn_receiver.cc create mode 100644 apps/http-proxy/src/icn_receiver.h create mode 100644 apps/http-proxy/src/utils.h (limited to 'apps/http-proxy') diff --git a/apps/http-proxy/CMakeLists.txt b/apps/http-proxy/CMakeLists.txt index d6681097c..13839dbf7 100644 --- a/apps/http-proxy/CMakeLists.txt +++ b/apps/http-proxy/CMakeLists.txt @@ -31,14 +31,16 @@ include_directories( ) set(LIB_SOURCE_FILES - src/ATSConnector.cc + src/http_session.cc + src/http_proxy.cc src/HTTP1.xMessageFastParser.cc - src/IcnReceiver.cc + src/icn_receiver.cc ) set(LIB_SERVER_HEADER_FILES - src/IcnReceiver.h - src/ATSConnector.h + src/icn_receiver.h + src/http_session.h + src/http_proxy.h src/HTTP1.xMessageFastParser.h ) diff --git a/apps/http-proxy/main.cc b/apps/http-proxy/main.cc index 20655071c..8d407ba4c 100644 --- a/apps/http-proxy/main.cc +++ b/apps/http-proxy/main.cc @@ -13,12 +13,11 @@ * limitations under the License. */ -#include "src/IcnReceiver.h" +#include "src/http_proxy.h" using namespace transport; int usage(char* program) { - std::cerr << "ICN Plugin not loaded!" << std::endl; std::cerr << "USAGE: " << program << "\n" << "[HTTP_PREFIX] -a [SERVER_IP_ADDRESS] " "-p [SERVER_PORT] -c [CACHE_SIZE] -m [MTU] -l [DEFAULT_LIFETIME " @@ -27,39 +26,99 @@ int usage(char* program) { return -1; } +struct Params : HTTPProxy::ClientParams, HTTPProxy::ServerParams { + void printParams() override { + if (client) { + HTTPProxy::ClientParams::printParams(); + } else if (server) { + HTTPProxy::ServerParams::printParams(); + } else { + throw std::runtime_error( + "Proxy configured as client and server at the same time."); + } + + std::cout << "\t" + << "N Threads: " << n_thread << std::endl; + } + + HTTPProxy instantiateProxyAsValue() { + if (client) { + HTTPProxy::ClientParams* p = dynamic_cast(this); + return transport::HTTPProxy(*p, n_thread); + } else if (server) { + HTTPProxy::ServerParams* p = dynamic_cast(this); + return transport::HTTPProxy(*p, n_thread); + } else { + throw std::runtime_error( + "Proxy configured as client and server at the same time."); + } + } + + bool client = false; + bool server = false; + std::uint16_t n_thread = 1; +}; + int main(int argc, char** argv) { - std::string prefix("http://hicn-http-proxy"); - std::string ip_address("127.0.0.1"); - std::string port("80"); - std::string cache_size("50000"); - std::string mtu("1500"); - std::string first_ipv6_word("b001"); - std::string default_content_lifetime("7200"); // seconds - bool manifest = false; + Params params; + + params.prefix = "http://hicn-http-proxy"; + params.origin_address = "127.0.0.1"; + params.origin_port = "80"; + params.cache_size = "50000"; + params.mtu = "1500"; + params.first_ipv6_word = "b001"; + params.content_lifetime = "7200;"; // seconds + params.manifest = false; + params.tcp_listen_port = 8080; int opt; - while ((opt = getopt(argc, argv, "a:p:c:m:P:l:M")) != -1) { + while ((opt = getopt(argc, argv, "CSa:p:c:m:P:l:ML:t:")) != -1) { switch (opt) { + case 'C': + if (params.server) { + std::cerr << "Cannot be both client and server (both -C anc -S " + "options specified.)." + << std::endl; + return usage(argv[0]); + } + params.client = true; + break; + case 'S': + if (params.client) { + std::cerr << "Cannot be both client and server (both -C anc -S " + "options specified.)." + << std::endl; + return usage(argv[0]); + } + params.server = true; + break; case 'a': - ip_address = optarg; + params.origin_address = optarg; break; case 'p': - port = optarg; + params.origin_port = optarg; break; case 'c': - cache_size = optarg; + params.cache_size = optarg; break; case 'm': - mtu = optarg; + params.mtu = optarg; break; case 'P': - first_ipv6_word = optarg; + params.first_ipv6_word = optarg; break; case 'l': - default_content_lifetime = optarg; + params.content_lifetime = optarg; + break; + case 'L': + params.tcp_listen_port = std::stoul(optarg); break; case 'M': - manifest = true; + params.manifest = true; + break; + case 't': + params.n_thread = std::stoul(optarg); break; case 'h': default: @@ -69,18 +128,13 @@ int main(int argc, char** argv) { } if (argv[optind] == 0) { - std::cerr << "Using default prefix " << prefix << std::endl; + std::cerr << "Using default prefix " << params.prefix << std::endl; } else { - prefix = argv[optind]; + params.prefix = argv[optind]; } - std::cout << "Connecting to " << ip_address << " port " << port - << " Cache size " << cache_size << " Prefix " << prefix << " MTU " - << mtu << " IPv6 first word " << first_ipv6_word << std::endl; - transport::AsyncConsumerProducer proxy( - prefix, ip_address, port, cache_size, mtu, first_ipv6_word, - std::stoul(default_content_lifetime) * 1000, manifest); - + params.printParams(); + transport::HTTPProxy proxy = params.instantiateProxyAsValue(); proxy.run(); return 0; diff --git a/apps/http-proxy/src/ATSConnector.cc b/apps/http-proxy/src/ATSConnector.cc deleted file mode 100644 index a9b889941..000000000 --- a/apps/http-proxy/src/ATSConnector.cc +++ /dev/null @@ -1,286 +0,0 @@ -/* - * Copyright (c) 2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "ATSConnector.h" -#include "HTTP1.xMessageFastParser.h" - -#include -#include -#include - -namespace transport { - -ATSConnector::ATSConnector(asio::io_service &io_service, - std::string &ip_address, std::string &port, - ContentReceivedCallback receive_callback, - OnReconnect on_reconnect_callback) - : io_service_(io_service), - socket_(io_service_), - resolver_(io_service_), - endpoint_iterator_(resolver_.resolve({ip_address, port})), - timer_(io_service), - is_reconnection_(false), - data_available_(false), - content_length_(0), - is_last_chunk_(false), - chunked_(false), - receive_callback_(receive_callback), - on_reconnect_callback_(on_reconnect_callback) { - input_buffer_.prepare(buffer_size + 2048); - state_ = ConnectorState::CONNECTING; - doConnect(); -} - -ATSConnector::~ATSConnector() {} - -void ATSConnector::send(const uint8_t *packet, std::size_t len, - ContentSentCallback &&content_sent) { - asio::async_write( - socket_, asio::buffer(packet, len), - [content_sent = std::move(content_sent)]( - std::error_code ec, std::size_t /*length*/) { content_sent(); }); -} - -void ATSConnector::send(utils::MemBuf *buffer, - ContentSentCallback &&content_sent) { - io_service_.dispatch([this, buffer, callback = std::move(content_sent)]() { - bool write_in_progress = !write_msgs_.empty(); - write_msgs_.emplace_back(std::unique_ptr(buffer), - std::move(callback)); - if (TRANSPORT_EXPECT_TRUE(state_ == ConnectorState::CONNECTED)) { - if (!write_in_progress) { - doWrite(); - } - } else { - TRANSPORT_LOGD("Tell the handle connect it has data to write"); - data_available_ = true; - } - }); -} - -void ATSConnector::close() { - if (state_ != ConnectorState::CLOSED) { - state_ = ConnectorState::CLOSED; - if (socket_.is_open()) { - socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); - socket_.close(); - // on_disconnect_callback_(); - } - } -} - -void ATSConnector::doWrite() { - auto &buffer = write_msgs_.front().first; - - asio::async_write(socket_, asio::buffer(buffer->data(), buffer->length()), - [this](std::error_code ec, std::size_t length) { - if (TRANSPORT_EXPECT_FALSE(!ec)) { - TRANSPORT_LOGD("Content successfully sent!"); - write_msgs_.front().second(); - write_msgs_.pop_front(); - if (!write_msgs_.empty()) { - doWrite(); - } - } else { - TRANSPORT_LOGD("Content NOT sent!"); - } - }); -} // namespace transport - -void ATSConnector::handleRead(std::error_code ec, std::size_t length) { - if (TRANSPORT_EXPECT_TRUE(!ec)) { - content_length_ -= length; - const uint8_t *buffer = - asio::buffer_cast(input_buffer_.data()); - receive_callback_(buffer, input_buffer_.size(), !content_length_, false); - input_buffer_.consume(input_buffer_.size()); - - if (!content_length_) { - if (!chunked_ || is_last_chunk_) { - doReadHeader(); - } else { - doReadChunkedHeader(); - } - } else { - auto to_read = - content_length_ >= buffer_size ? buffer_size : content_length_; - asio::async_read(socket_, input_buffer_, asio::transfer_exactly(to_read), - std::bind(&ATSConnector::handleRead, this, - std::placeholders::_1, std::placeholders::_2)); - } - } else if (ec == asio::error::eof) { - input_buffer_.consume(input_buffer_.size()); - tryReconnection(); - } -} - -void ATSConnector::doReadBody(std::size_t body_size, - std::size_t additional_bytes) { - auto bytes_to_read = - body_size > additional_bytes ? (body_size - additional_bytes) : 0; - - auto to_read = bytes_to_read >= buffer_size - ? (buffer_size - input_buffer_.size()) - : bytes_to_read; - - is_last_chunk_ = chunked_ && body_size == 5; - - if (to_read > 0) { - content_length_ = bytes_to_read; - asio::async_read(socket_, input_buffer_, asio::transfer_exactly(to_read), - std::bind(&ATSConnector::handleRead, this, - std::placeholders::_1, std::placeholders::_2)); - } else { - const uint8_t *buffer = - asio::buffer_cast(input_buffer_.data()); - receive_callback_(buffer, body_size, chunked_ ? is_last_chunk_ : !to_read, - false); - input_buffer_.consume(body_size); - - if (!chunked_ || is_last_chunk_) { - doReadHeader(); - } else { - doReadChunkedHeader(); - } - } -} - -void ATSConnector::doReadChunkedHeader() { - asio::async_read_until( - socket_, input_buffer_, "\r\n", - [this](std::error_code ec, std::size_t length) { - if (TRANSPORT_EXPECT_TRUE(!ec)) { - const uint8_t *buffer = - asio::buffer_cast(input_buffer_.data()); - std::size_t chunk_size = - std::stoul(reinterpret_cast(buffer), 0, 16) + 2 + - length; - auto additional_bytes = input_buffer_.size(); - doReadBody(chunk_size, additional_bytes); - } else { - input_buffer_.consume(input_buffer_.size()); - tryReconnection(); - } - }); -} - -void ATSConnector::doReadHeader() { - asio::async_read_until( - socket_, input_buffer_, "\r\n\r\n", - [this](std::error_code ec, std::size_t length) { - if (TRANSPORT_EXPECT_TRUE(!ec)) { - const uint8_t *buffer = - asio::buffer_cast(input_buffer_.data()); - auto headers = HTTPMessageFastParser::getHeaders(buffer, length); - - // Try to get content length, if available - auto it = headers.find(HTTPMessageFastParser::content_length); - std::size_t size = 0; - if (it != headers.end()) { - size = std::stoull(it->second); - chunked_ = false; - } else { - it = headers.find(HTTPMessageFastParser::transfer_encoding); - if (it != headers.end() && - it->second.compare(HTTPMessageFastParser::chunked) == 0) { - chunked_ = true; - } - } - - receive_callback_(buffer, length, !size && !chunked_, true); - auto additional_bytes = input_buffer_.size() - length; - input_buffer_.consume(length); - - if (!chunked_) { - doReadBody(size, additional_bytes); - } else { - doReadChunkedHeader(); - } - } else { - input_buffer_.consume(input_buffer_.size()); - tryReconnection(); - } - }); -} - -void ATSConnector::tryReconnection() { - TRANSPORT_LOGD("Connection lost. Trying to reconnect...\n"); - if (state_ == ConnectorState::CONNECTED) { - state_ = ConnectorState::CONNECTING; - is_reconnection_ = true; - io_service_.post([this]() { - if (socket_.is_open()) { - // socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); - socket_.close(); - } - startConnectionTimer(); - doConnect(); - }); - } -} - -void ATSConnector::doConnect() { - asio::async_connect(socket_, endpoint_iterator_, - [this](std::error_code ec, tcp::resolver::iterator) { - if (!ec) { - timer_.cancel(); - state_ = ConnectorState::CONNECTED; - - asio::ip::tcp::no_delay noDelayOption(true); - socket_.set_option(noDelayOption); - - // on_reconnect_callback_(); - - doReadHeader(); - - if (data_available_ && !write_msgs_.empty()) { - data_available_ = false; - doWrite(); - } - - if (is_reconnection_) { - is_reconnection_ = false; - TRANSPORT_LOGD("Connection recovered!"); - } - - } else { - TRANSPORT_LOGE("Impossible to reconnect: %s", - ec.message().c_str()); - close(); - } - }); -} - -bool ATSConnector::checkConnected() { - return state_ == ConnectorState::CONNECTED; -} - -void ATSConnector::startConnectionTimer() { - timer_.expires_from_now(std::chrono::seconds(10)); - timer_.async_wait( - std::bind(&ATSConnector::handleDeadline, this, std::placeholders::_1)); -} - -void ATSConnector::handleDeadline(const std::error_code &ec) { - if (!ec) { - io_service_.post([this]() { - socket_.close(); - TRANSPORT_LOGE("Error connecting. Is the server running?\n"); - io_service_.stop(); - }); - } -} - -} // namespace transport diff --git a/apps/http-proxy/src/ATSConnector.h b/apps/http-proxy/src/ATSConnector.h deleted file mode 100644 index 8d91b7b7b..000000000 --- a/apps/http-proxy/src/ATSConnector.h +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Copyright (c) 2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include - -#define ASIO_STANDALONE -#include -#include -#include - -namespace transport { - -using asio::ip::tcp; - -typedef std::function - ContentReceivedCallback; -typedef std::function OnReconnect; -typedef std::function ContentSentCallback; -typedef std::deque< - std::pair, ContentSentCallback>> - BufferQueue; - -class ATSConnector { - static constexpr uint32_t buffer_size = 1024 * 512; - - enum class ConnectorState { - CLOSED, - CONNECTING, - CONNECTED, - }; - - public: - ATSConnector(asio::io_service &io_service, std::string &ip_address, - std::string &port, ContentReceivedCallback receive_callback, - OnReconnect on_reconnect_callback); - - ~ATSConnector(); - - void send(const uint8_t *buffer, std::size_t len, - ContentSentCallback &&content_sent = 0); - - void send(utils::MemBuf *buffer, ContentSentCallback &&content_sent); - - void close(); - - private: - void doConnect(); - - void doReadHeader(); - - void doReadBody(std::size_t body_size, std::size_t additional_bytes); - - // void handleReadChunked(std::error_code ec, std::size_t length, - // std::size_t size); - - void doReadChunkedHeader(); - - void doWrite(); - - bool checkConnected(); - - private: - void handleRead(std::error_code ec, std::size_t length); - void tryReconnection(); - void startConnectionTimer(); - void handleDeadline(const std::error_code &ec); - - asio::io_service &io_service_; - asio::ip::tcp::socket socket_; - asio::ip::tcp::resolver resolver_; - asio::ip::tcp::resolver::iterator endpoint_iterator_; - asio::steady_timer timer_; - - BufferQueue write_msgs_; - - asio::streambuf input_buffer_; - - bool is_reconnection_; - bool data_available_; - - std::size_t content_length_; - - // Chunked encoding - bool is_last_chunk_; - bool chunked_; - - ContentReceivedCallback receive_callback_; - OnReconnect on_reconnect_callback_; - - // Connector state - ConnectorState state_; -}; - -} // namespace transport diff --git a/apps/http-proxy/src/HTTP1.xMessageFastParser.cc b/apps/http-proxy/src/HTTP1.xMessageFastParser.cc index 729eb3aeb..d1271ebdf 100644 --- a/apps/http-proxy/src/HTTP1.xMessageFastParser.cc +++ b/apps/http-proxy/src/HTTP1.xMessageFastParser.cc @@ -15,6 +15,7 @@ #include "HTTP1.xMessageFastParser.h" +#include #include #include @@ -31,15 +32,27 @@ std::string HTTPMessageFastParser::connection = "Connection"; std::string HTTPMessageFastParser::separator = "\r\n\r\n"; HTTPHeaders HTTPMessageFastParser::getHeaders(const uint8_t *headers, - std::size_t length) { + std::size_t length, + bool request) { HTTPHeaders ret; std::string http_version; - std::string status_code; - std::string status_string; - if (transport::http::HTTPResponse::parseHeaders(headers, length, ret, http_version, - status_code, status_string)) { - return ret; + if (request) { + std::string method; + std::string url; + + if (transport::http::HTTPRequest::parseHeaders(headers, length, ret, + http_version, method, url)) { + return ret; + } + } else { + std::string status_code; + std::string status_string; + + if (transport::http::HTTPResponse::parseHeaders( + headers, length, ret, http_version, status_code, status_string)) { + return ret; + } } throw std::runtime_error("Error parsing response headers."); diff --git a/apps/http-proxy/src/HTTP1.xMessageFastParser.h b/apps/http-proxy/src/HTTP1.xMessageFastParser.h index 79dbce19d..0ad38b9b6 100644 --- a/apps/http-proxy/src/HTTP1.xMessageFastParser.h +++ b/apps/http-proxy/src/HTTP1.xMessageFastParser.h @@ -15,16 +15,17 @@ #pragma once +#include + #include #include -#include - using transport::http::HTTPHeaders; class HTTPMessageFastParser { public: - static HTTPHeaders getHeaders(const uint8_t* headers, std::size_t length); + static HTTPHeaders getHeaders(const uint8_t* headers, std::size_t length, + bool request); static std::size_t hasBody(const uint8_t* headers, std::size_t length); static bool isMpdRequest(const uint8_t* headers, std::size_t length); static uint32_t parseCacheControl(const uint8_t* headers, std::size_t length); diff --git a/apps/http-proxy/src/IcnReceiver.cc b/apps/http-proxy/src/IcnReceiver.cc deleted file mode 100644 index 24b0eb5dc..000000000 --- a/apps/http-proxy/src/IcnReceiver.cc +++ /dev/null @@ -1,223 +0,0 @@ -/* - * Copyright (c) 2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "IcnReceiver.h" -#include "HTTP1.xMessageFastParser.h" - -#include -#include -#include -#include - -#include -#include - -namespace transport { - -core::Prefix generatePrefix(const std::string& prefix_url, - std::string& first_ipv6_word) { - const char* str = prefix_url.c_str(); - uint16_t pos = 0; - - if (strncmp("http://", str, 7) == 0) { - pos = 7; - } else if (strncmp("https://", str, 8) == 0) { - pos = 8; - } - - str += pos; - - uint32_t locator_hash = utils::hash::fnv32_buf(str, strlen(str)); - - std::stringstream stream; - stream << first_ipv6_word << ":0"; - - for (uint16_t* word = (uint16_t*)&locator_hash; - std::size_t(word) < (std::size_t(&locator_hash) + sizeof(locator_hash)); - word++) { - stream << ":" << std::hex << *word; - } - - stream << "::0"; - - return core::Prefix(stream.str(), 64); -} - -AsyncConsumerProducer::AsyncConsumerProducer( - const std::string& prefix, std::string& ip_address, std::string& port, - std::string& cache_size, std::string& mtu, std::string& first_ipv6_word, - unsigned long default_lifetime, bool manifest) - : prefix_(generatePrefix(prefix, first_ipv6_word)), - producer_socket_(), - ip_address_(ip_address), - port_(port), - cache_size_(std::stoul(cache_size)), - mtu_(std::stoul(mtu)), - request_counter_(0), - signals_(io_service_, SIGINT, SIGQUIT), - connector_(io_service_, ip_address_, port_, - std::bind(&AsyncConsumerProducer::publishContent, this, - std::placeholders::_1, std::placeholders::_2, - std::placeholders::_3, std::placeholders::_4), - [this]() { - std::queue empty; - std::swap(response_name_queue_, empty); - }), - default_content_lifetime_(default_lifetime) { - int ret = producer_socket_.setSocketOption( - interface::GeneralTransportOptions::OUTPUT_BUFFER_SIZE, cache_size_); - - if (ret != SOCKET_OPTION_SET) { - TRANSPORT_LOGD("Warning: output buffer size has not been set."); - } - - ret = producer_socket_.setSocketOption( - interface::GeneralTransportOptions::MAKE_MANIFEST, manifest); - - if (ret != SOCKET_OPTION_SET) { - TRANSPORT_LOGD("Warning: impossible to enable signatures."); - } - - ret = producer_socket_.setSocketOption( - interface::GeneralTransportOptions::DATA_PACKET_SIZE, mtu_); - - if (ret != SOCKET_OPTION_SET) { - TRANSPORT_LOGD("Warning: mtu has not been set."); - } - - producer_socket_.registerPrefix(prefix_); - - // Let the main thread to catch SIGINT and SIGQUIT - signals_.async_wait( - [this](const std::error_code& errorCode, int signal_number) { - TRANSPORT_LOGI("Number of requests processed by plugin: %lu", - (unsigned long)request_counter_); - producer_socket_.stop(); - connector_.close(); - }); -} - -void AsyncConsumerProducer::start() { - TRANSPORT_LOGD("Starting listening"); - doReceive(); -} - -void AsyncConsumerProducer::run() { - start(); - io_service_.run(); -} - -void AsyncConsumerProducer::doReceive() { - producer_socket_.setSocketOption( - interface::ProducerCallbacksOptions::CACHE_MISS, - [this](interface::ProducerSocket& producer, - interface::Interest& interest) { - // core::Name n(interest.getWritableName(), true); - io_service_.post(std::bind( - &AsyncConsumerProducer::manageIncomingInterest, this, - interest.getWritableName(), interest.acquireMemBufReference(), - interest.getPayload().release())); - }); - - producer_socket_.connect(); -} - -void AsyncConsumerProducer::manageIncomingInterest( - core::Name& name, core::Packet::MemBufPtr& packet, utils::MemBuf* payload) { - auto seg = name.getSuffix(); - name.setSuffix(0); - auto _it = chunk_number_map_.find(name); - auto _end = chunk_number_map_.end(); - - std::cout << "Received interest " << seg << std::endl; - - if (_it != _end) { - if (_it->second.second) { - // Content is in production - return; - } - - if (seg >= _it->second.first) { - TRANSPORT_LOGI( - "Ignoring interest with name %s for a content object which does not " - "exist. (Request: %u, max: %u)", - name.toString().c_str(), (uint32_t)seg, (uint32_t)_it->second.first); - return; - } - } - - std::cout << "Received interest " << seg << std::endl; - - bool is_mpd = - HTTPMessageFastParser::isMpdRequest(payload->data(), payload->length()); - - auto pair = chunk_number_map_.emplace(name, std::pair(0, 0)); - if (!pair.second) { - pair.first->second.first = 0; - } - - pair.first->second.second = true; - - response_name_queue_.emplace(std::move(name), - is_mpd ? 1000 : default_content_lifetime_); - - connector_.send(payload, [packet = std::move(packet)]() {}); -} - -void AsyncConsumerProducer::publishContent(const uint8_t* data, - std::size_t size, bool is_last, - bool headers) { - uint32_t start_suffix = 0; - - if (response_name_queue_.empty()) { - std::cerr << "Aborting due tue empty request queue" << std::endl; - abort(); - } - - interface::PublicationOptions& options = response_name_queue_.front(); - - int ret = producer_socket_.setSocketOption( - interface::GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME, - options.getLifetime()); - - if (TRANSPORT_EXPECT_FALSE(ret != SOCKET_OPTION_SET)) { - TRANSPORT_LOGD("Warning: content object lifetime has not been set."); - } - - const interface::Name& name = options.getName(); - - auto it = chunk_number_map_.find(name); - if (it == chunk_number_map_.end()) { - std::cerr << "Aborting due to response not found in ResposeInfo map." - << std::endl; - abort(); - } - - start_suffix = it->second.first; - - if (headers) { - request_counter_++; - } - - it->second.first += - producer_socket_.produce(name, data, size, is_last, start_suffix); - - if (is_last) { - it->second.second = false; - response_name_queue_.pop(); - } -} - -} // namespace transport diff --git a/apps/http-proxy/src/IcnReceiver.h b/apps/http-proxy/src/IcnReceiver.h deleted file mode 100644 index 9d0ab5172..000000000 --- a/apps/http-proxy/src/IcnReceiver.h +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright (c) 2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "ATSConnector.h" - -#include -#include -#include -#include - -#include -#include -#include -#include - -namespace transport { - -class AsyncConsumerProducer { - using SegmentProductionPair = std::pair; - using ResponseInfoMap = std::unordered_map; - using RequestQueue = std::queue; - - public: - explicit AsyncConsumerProducer(const std::string& prefix, - std::string& ip_address, std::string& port, - std::string& cache_size, std::string& mtu, - std::string& first_ipv6_word, - unsigned long default_content_lifetime, bool manifest); - - void start(); - - void run(); - - private: - void doSend(); - - void doReceive(); - - void publishContent(const uint8_t* data, std::size_t size, - bool is_last = true, bool headers = false); - - void manageIncomingInterest(core::Name& name, core::Packet::MemBufPtr& packet, - utils::MemBuf* payload); - - core::Prefix prefix_; - asio::io_service io_service_; - interface::ProducerSocket producer_socket_; - - std::string ip_address_; - std::string port_; - uint32_t cache_size_; - uint32_t mtu_; - - uint64_t request_counter_; - asio::signal_set signals_; - - // std::unordered_map> - // connection_map_; - ATSConnector connector_; - - unsigned long default_content_lifetime_; - - // ResponseInfoMap --> max_seq_number + bool indicating whether request is in - // production - ResponseInfoMap chunk_number_map_; - RequestQueue response_name_queue_; -}; - -} // namespace transport diff --git a/apps/http-proxy/src/http_proxy.cc b/apps/http-proxy/src/http_proxy.cc new file mode 100644 index 000000000..18e9bf727 --- /dev/null +++ b/apps/http-proxy/src/http_proxy.cc @@ -0,0 +1,243 @@ +/* + * Copyright (c) 2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "http_proxy.h" + +#include +#include + +#include "utils.h" + +namespace transport { + +using core::Interest; +using core::Name; +using interface::ConsumerCallbacksOptions; +using interface::ConsumerInterestCallback; +using interface::ConsumerSocket; +using interface::TransportProtocolAlgorithms; + +class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback { + public: + HTTPClientConnectionCallback(TcpReceiver& tcp_receiver, + utils::EventThread& thread, + const std::string& prefix, + const std::string& ipv6_first_word) + : tcp_receiver_(tcp_receiver), + thread_(thread), + prefix_hash_(generatePrefix(prefix, ipv6_first_word)), + consumer_(TransportProtocolAlgorithms::RAAQM, thread_.getIoService()), + session_(nullptr), + current_size_(0) { + consumer_.setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, this); + consumer_.setSocketOption( + ConsumerCallbacksOptions::INTEREST_OUTPUT, + (ConsumerInterestCallback)std::bind( + &HTTPClientConnectionCallback::processLeavingInterest, this, + std::placeholders::_1, std::placeholders::_2)); + consumer_.connect(); + } + + void setHttpSession(asio::ip::tcp::socket&& socket) { + session_ = std::make_unique( + std::move(socket), + std::bind(&HTTPClientConnectionCallback::readDataFromTcp, this, + std::placeholders::_1, std::placeholders::_2, + std::placeholders::_3, std::placeholders::_4), + [this](asio::ip::tcp::socket& socket) -> bool { + try { + std::string remote_address = + socket.remote_endpoint().address().to_string(); + std::uint16_t remote_port = socket.remote_endpoint().port(); + TRANSPORT_LOGD("Client %s:%d disconnected.", remote_address.c_str(), + remote_port); + } catch (std::system_error& e) { + // Do nothing + } + + consumer_.stop(); + tcp_receiver_.onClientDisconnect(this); + return false; + }); + + current_size_ = 0; + } + + private: + void consumeNextRequest() { + if (request_buffer_queue_.size() == 0) { + // No additiona requests to process. + return; + } + + auto& buffer = request_buffer_queue_.front(); + uint64_t request_hash = + utils::hash::fnv64_buf(buffer->data(), buffer->length()); + + std::stringstream name; + name << prefix_hash_.substr(0, prefix_hash_.length() - 2); + + for (uint16_t* word = (uint16_t*)&request_hash; + std::size_t(word) < + (std::size_t(&request_hash) + sizeof(request_hash)); + word++) { + name << ":" << std::hex << *word; + } + + name << "|0"; + + // Non blocking consume :) + consumer_.consume(Name(name.str())); + } + + // tcp callbacks + + void readDataFromTcp(const uint8_t* data, std::size_t size, bool is_last, + bool headers) { + if (headers) { + // Add the request to the request queue + tmp_buffer_ = utils::MemBuf::copyBuffer(data, size); + } else { + // Append payload chunk to last request added. Here we are assuming + // HTTP/1.1. + tmp_buffer_->prependChain(utils::MemBuf::copyBuffer(data, size)); + } + + current_size_ += size; + + if (is_last) { + TRANSPORT_LOGD( + "Request received: %s", + std::string((const char*)tmp_buffer_->data(), tmp_buffer_->length()) + .c_str()); + if (current_size_ < 1400) { + request_buffer_queue_.emplace_back(std::move(tmp_buffer_)); + } else { + TRANSPORT_LOGE("Ignoring client request due to size (%zu) > 1400.", + current_size_); + session_->close(); + current_size_ = 0; + return; + } + + if (!consumer_.isRunning()) { + TRANSPORT_LOGD( + "Consumer stopped, triggering consume from TCP session handler.."); + consumeNextRequest(); + } + + current_size_ = 0; + } + } + + // hicn callbacks + + void processLeavingInterest(interface::ConsumerSocket& c, + const core::Interest& interest) { + if (interest.payloadSize() == 0) { + Interest& int2 = const_cast(interest); + int2.appendPayload(request_buffer_queue_.front()->clone()); + } + } + + bool isBufferMovable() noexcept { return true; } + void getReadBuffer(uint8_t** application_buffer, size_t* max_length) {} + void readDataAvailable(size_t length) noexcept {} + size_t maxBufferSize() const { return 64 * 1024; } + + void readBufferAvailable(std::unique_ptr&& buffer) noexcept { + // Response received. Send it back to client + auto _buffer = buffer.release(); + TRANSPORT_LOGD("From hicn: %zu bytes.", _buffer->length()); + session_->send(_buffer, []() {}); + } + + void readError(const std::error_code ec) noexcept { + TRANSPORT_LOGE("Error reading from hicn consumer socket. Closing session."); + session_->close(); + } + + void readSuccess(std::size_t total_size) noexcept { + request_buffer_queue_.pop_front(); + consumeNextRequest(); + } + + private: + TcpReceiver& tcp_receiver_; + utils::EventThread& thread_; + std::string prefix_hash_; + ConsumerSocket consumer_; + std::unique_ptr session_; + std::deque> request_buffer_queue_; + std::unique_ptr tmp_buffer_; + std::size_t current_size_; +}; + +TcpReceiver::TcpReceiver(std::uint16_t port, const std::string& prefix, + const std::string& ipv6_first_word) + : Receiver(), + listener_(thread_.getIoService(), port, + std::bind(&TcpReceiver::onNewConnection, this, + std::placeholders::_1)), + prefix_(prefix), + ipv6_first_word_(ipv6_first_word) { + for (int i = 0; i < 10; i++) { + http_clients_.emplace_back(new HTTPClientConnectionCallback( + *this, thread_, prefix, ipv6_first_word)); + } +} + +void TcpReceiver::onClientDisconnect(HTTPClientConnectionCallback* client) { + http_clients_.emplace_front(client); + used_http_clients_.erase(client); +} + +void TcpReceiver::onNewConnection(asio::ip::tcp::socket&& socket) { + if (http_clients_.size() == 0) { + // Create new HTTPClientConnectionCallback + TRANSPORT_LOGD("Creating new HTTPClientConnectionCallback."); + http_clients_.emplace_back(new HTTPClientConnectionCallback( + *this, thread_, prefix_, ipv6_first_word_)); + } + + // Get new HTTPClientConnectionCallback + HTTPClientConnectionCallback* c = http_clients_.front(); + http_clients_.pop_front(); + + // Set http session + c->setHttpSession(std::move(socket)); + + // Move it to used clients + used_http_clients_.insert(c); +} + +HTTPProxy::HTTPProxy(ClientParams& params, std::size_t n_thread) { + for (uint16_t i = 0; i < n_thread; i++) { + // icn_receivers_.emplace_back(std::make_unique(icn_params)); + receivers_.emplace_back(std::make_unique( + params.tcp_listen_port, params.prefix, params.first_ipv6_word)); + } +} + +HTTPProxy::HTTPProxy(ServerParams& params, std::size_t n_thread) { + for (uint16_t i = 0; i < n_thread; i++) { + receivers_.emplace_back(std::make_unique( + params.prefix, params.first_ipv6_word, params.origin_address, + params.origin_port, params.cache_size, params.mtu, + params.content_lifetime, params.manifest)); + } +} + +} // namespace transport diff --git a/apps/http-proxy/src/http_proxy.h b/apps/http-proxy/src/http_proxy.h new file mode 100644 index 000000000..6fa394e28 --- /dev/null +++ b/apps/http-proxy/src/http_proxy.h @@ -0,0 +1,180 @@ +/* + * Copyright (c) 2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "http_session.h" +#include "icn_receiver.h" + +#define ASIO_STANDALONE +#include +#include +#include + +class TcpListener { + public: + using AcceptCallback = std::function; + + TcpListener(asio::io_service& io_service, short port, AcceptCallback callback) + : acceptor_(io_service, + asio::ip::tcp::endpoint(asio::ip::tcp::v4(), port)), +#if ((ASIO_VERSION / 100 % 1000) < 12) + socket_(io_service), +#endif + callback_(callback) { + do_accept(); + } + + private: + void do_accept() { +#if ((ASIO_VERSION / 100 % 1000) >= 12) + acceptor_.async_accept( + [this](std::error_code ec, asio::ip::tcp::socket socket) { +#else + acceptor_.async_accept(socket_, [this](std::error_code ec) { + auto socket = std::move(socket_); +#endif + if (!ec) { + callback_(std::move(socket)); + } + + do_accept(); + }); + } + + asio::ip::tcp::acceptor acceptor_; +#if ((ASIO_VERSION / 100 % 1000) < 12) + asio::ip::tcp::socket socket_; +#endif + AcceptCallback callback_; +}; + +namespace transport { + +class HTTPClientConnectionCallback; + +class Receiver { + public: + Receiver() : thread_() {} + + protected: + utils::EventThread thread_; +}; + +class TcpReceiver : public Receiver { + friend class HTTPClientConnectionCallback; + + public: + TcpReceiver(std::uint16_t port, const std::string& prefix, + const std::string& ipv6_first_word); + + private: + void onNewConnection(asio::ip::tcp::socket&& socket); + void onClientDisconnect(HTTPClientConnectionCallback* client); + + TcpListener listener_; + std::string prefix_; + std::string ipv6_first_word_; + std::deque http_clients_; + std::unordered_set used_http_clients_; +}; + +class IcnReceiver : public Receiver { + public: + template + IcnReceiver(Args&&... args) + : Receiver(), + icn_consum_producer_(thread_.getIoService(), + std::forward(args)...) { + icn_consum_producer_.run(); + } + + private: + AsyncConsumerProducer icn_consum_producer_; +}; + +class HTTPProxy { + public: + enum Server { CREATE }; + enum Client { WRAP_BUFFER }; + + struct CommonParams { + std::string prefix; + std::string first_ipv6_word; + + virtual void printParams() { std::cout << "Parameters: " << std::endl; }; + }; + + struct ClientParams : virtual CommonParams { + short tcp_listen_port; + void printParams() override { + std::cout << "Running HTTP/TCP -> HTTP/hICN proxy." << std::endl; + CommonParams::printParams(); + std::cout << "\t" + << "HTTP listen port: " << tcp_listen_port << std::endl; + std::cout << "\t" + << "Consumer Prefix: " << prefix << std::endl; + std::cout << "\t" + << "Prefix first word: " << first_ipv6_word << std::endl; + } + }; + + struct ServerParams : virtual CommonParams { + std::string origin_address; + std::string origin_port; + std::string cache_size; + std::string mtu; + std::string content_lifetime; + bool manifest; + + void printParams() override { + std::cout << "Running HTTP/hICN -> HTTP/TCP proxy." << std::endl; + CommonParams::printParams(); + std::cout << "\t" + << "Origin address: " << origin_address << std::endl; + std::cout << "\t" + << "Origin port: " << origin_port << std::endl; + std::cout << "\t" + << "Producer cache size: " << cache_size << std::endl; + std::cout << "\t" + << "hICN MTU: " << mtu << std::endl; + std::cout << "\t" + << "Default content lifetime: " << content_lifetime + << std::endl; + std::cout << "\t" + << "Producer Prefix: " << prefix << std::endl; + std::cout << "\t" + << "Prefix first word: " << first_ipv6_word << std::endl; + std::cout << "\t" + << "Use manifest: " << manifest << std::endl; + } + }; + + HTTPProxy(ClientParams& icn_params, std::size_t n_thread = 1); + HTTPProxy(ServerParams& icn_params, std::size_t n_thread = 1); + + void run() { sleep(1000000); } + + private: + void acceptTCPClient(asio::ip::tcp::socket&& socket); + + private: + std::vector> receivers_; +}; + +} // namespace transport \ No newline at end of file diff --git a/apps/http-proxy/src/http_session.cc b/apps/http-proxy/src/http_session.cc new file mode 100644 index 000000000..2c281468f --- /dev/null +++ b/apps/http-proxy/src/http_session.cc @@ -0,0 +1,321 @@ +/* + * Copyright (c) 2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "http_session.h" + +#include +#include + +#include + +#include "HTTP1.xMessageFastParser.h" + +namespace transport { + +HTTPSession::HTTPSession(asio::io_service &io_service, std::string &ip_address, + std::string &port, + ContentReceivedCallback receive_callback, + OnConnectionClosed on_connection_closed_callback, + bool reverse) + : io_service_(io_service), + socket_(io_service_), + resolver_(io_service_), + endpoint_iterator_(resolver_.resolve({ip_address, port})), + timer_(io_service), + reverse_(reverse), + is_reconnection_(false), + data_available_(false), + content_length_(0), + is_last_chunk_(false), + chunked_(false), + receive_callback_(receive_callback), + on_connection_closed_callback_(on_connection_closed_callback) { + input_buffer_.prepare(buffer_size + 2048); + state_ = ConnectorState::CONNECTING; + doConnect(); +} + +HTTPSession::HTTPSession(asio::ip::tcp::socket socket, + ContentReceivedCallback receive_callback, + OnConnectionClosed on_connection_closed_callback, + bool reverse) + : io_service_(socket.get_io_service()), + socket_(std::move(socket)), + resolver_(io_service_), + timer_(io_service_), + reverse_(reverse), + is_reconnection_(false), + data_available_(false), + content_length_(0), + is_last_chunk_(false), + chunked_(false), + receive_callback_(receive_callback), + on_connection_closed_callback_(on_connection_closed_callback) { + input_buffer_.prepare(buffer_size + 2048); + state_ = ConnectorState::CONNECTED; + asio::ip::tcp::no_delay noDelayOption(true); + socket_.set_option(noDelayOption); + doReadHeader(); +} + +HTTPSession::~HTTPSession() {} + +void HTTPSession::send(const uint8_t *packet, std::size_t len, + ContentSentCallback &&content_sent) { + asio::async_write( + socket_, asio::buffer(packet, len), + [content_sent = std::move(content_sent)]( + std::error_code ec, std::size_t /*length*/) { content_sent(); }); +} + +void HTTPSession::send(utils::MemBuf *buffer, + ContentSentCallback &&content_sent) { + io_service_.dispatch([this, buffer, callback = std::move(content_sent)]() { + bool write_in_progress = !write_msgs_.empty(); + write_msgs_.emplace_back(std::unique_ptr(buffer), + std::move(callback)); + if (TRANSPORT_EXPECT_TRUE(state_ == ConnectorState::CONNECTED)) { + if (!write_in_progress) { + doWrite(); + } + } else { + TRANSPORT_LOGD("Tell the handle connect it has data to write"); + data_available_ = true; + } + }); +} + +void HTTPSession::close() { + if (state_ != ConnectorState::CLOSED) { + state_ = ConnectorState::CLOSED; + if (socket_.is_open()) { + // socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); + socket_.close(); + // on_disconnect_callback_(); + } + } +} + +void HTTPSession::doWrite() { + auto &buffer = write_msgs_.front().first; + + asio::async_write(socket_, asio::buffer(buffer->data(), buffer->length()), + [this](std::error_code ec, std::size_t length) { + if (TRANSPORT_EXPECT_FALSE(!ec)) { + TRANSPORT_LOGD("Content successfully sent! %zu", + length); + write_msgs_.front().second(); + write_msgs_.pop_front(); + if (!write_msgs_.empty()) { + doWrite(); + } + } else { + TRANSPORT_LOGD("Content NOT sent!"); + } + }); +} // namespace transport + +void HTTPSession::handleRead(std::error_code ec, std::size_t length) { + if (TRANSPORT_EXPECT_TRUE(!ec)) { + content_length_ -= length; + const uint8_t *buffer = + asio::buffer_cast(input_buffer_.data()); + bool is_last = chunked_ ? (is_last_chunk_ ? !content_length_ : false) + : !content_length_; + receive_callback_(buffer, input_buffer_.size(), is_last, false); + input_buffer_.consume(input_buffer_.size()); + + if (!content_length_) { + if (!chunked_ || is_last_chunk_) { + doReadHeader(); + } else { + doReadChunkedHeader(); + } + } else { + auto to_read = + content_length_ >= buffer_size ? buffer_size : content_length_; + asio::async_read(socket_, input_buffer_, asio::transfer_exactly(to_read), + std::bind(&HTTPSession::handleRead, this, + std::placeholders::_1, std::placeholders::_2)); + } + } else if (ec == asio::error::eof) { + input_buffer_.consume(input_buffer_.size()); + tryReconnection(); + } +} + +void HTTPSession::doReadBody(std::size_t body_size, + std::size_t additional_bytes) { + auto bytes_to_read = + body_size > additional_bytes ? (body_size - additional_bytes) : 0; + + auto to_read = bytes_to_read >= buffer_size + ? (buffer_size - input_buffer_.size()) + : bytes_to_read; + + is_last_chunk_ = chunked_ && body_size == 5; + + if (to_read > 0) { + content_length_ = bytes_to_read; + asio::async_read(socket_, input_buffer_, asio::transfer_exactly(to_read), + std::bind(&HTTPSession::handleRead, this, + std::placeholders::_1, std::placeholders::_2)); + } else { + if (body_size) { + const uint8_t *buffer = + asio::buffer_cast(input_buffer_.data()); + receive_callback_(buffer, body_size, chunked_ ? is_last_chunk_ : !to_read, + false); + input_buffer_.consume(body_size); + } + + if (!chunked_ || is_last_chunk_) { + doReadHeader(); + } else { + doReadChunkedHeader(); + } + } +} + +void HTTPSession::doReadChunkedHeader() { + asio::async_read_until( + socket_, input_buffer_, "\r\n", + [this](std::error_code ec, std::size_t length) { + if (TRANSPORT_EXPECT_TRUE(!ec)) { + const uint8_t *buffer = + asio::buffer_cast(input_buffer_.data()); + std::size_t chunk_size = + std::stoul(reinterpret_cast(buffer), 0, 16) + 2 + + length; + auto additional_bytes = input_buffer_.size(); + doReadBody(chunk_size, additional_bytes); + } else { + input_buffer_.consume(input_buffer_.size()); + tryReconnection(); + } + }); +} + +void HTTPSession::doReadHeader() { + asio::async_read_until( + socket_, input_buffer_, "\r\n\r\n", + [this](std::error_code ec, std::size_t length) { + if (TRANSPORT_EXPECT_TRUE(!ec)) { + const uint8_t *buffer = + asio::buffer_cast(input_buffer_.data()); + auto headers = + HTTPMessageFastParser::getHeaders(buffer, length, reverse_); + + // Try to get content length, if available + auto it = headers.find(HTTPMessageFastParser::content_length); + std::size_t size = 0; + if (it != headers.end()) { + size = std::stoull(it->second); + chunked_ = false; + } else { + it = headers.find(HTTPMessageFastParser::transfer_encoding); + if (it != headers.end() && + it->second.compare(HTTPMessageFastParser::chunked) == 0) { + chunked_ = true; + } + } + + receive_callback_(buffer, length, !size && !chunked_, true); + auto additional_bytes = input_buffer_.size() - length; + input_buffer_.consume(length); + + if (!chunked_) { + doReadBody(size, additional_bytes); + } else { + doReadChunkedHeader(); + } + } else { + input_buffer_.consume(input_buffer_.size()); + tryReconnection(); + } + }); +} + +void HTTPSession::tryReconnection() { + if (on_connection_closed_callback_(socket_)) { + if (state_ == ConnectorState::CONNECTED) { + TRANSPORT_LOGD("Connection lost. Trying to reconnect...\n"); + state_ = ConnectorState::CONNECTING; + is_reconnection_ = true; + io_service_.post([this]() { + if (socket_.is_open()) { + // socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); + socket_.close(); + } + startConnectionTimer(); + doConnect(); + }); + } + } +} + +void HTTPSession::doConnect() { + asio::async_connect(socket_, endpoint_iterator_, + [this](std::error_code ec, tcp::resolver::iterator) { + if (!ec) { + timer_.cancel(); + state_ = ConnectorState::CONNECTED; + + asio::ip::tcp::no_delay noDelayOption(true); + socket_.set_option(noDelayOption); + + // on_reconnect_callback_(); + + doReadHeader(); + + if (data_available_ && !write_msgs_.empty()) { + data_available_ = false; + doWrite(); + } + + if (is_reconnection_) { + is_reconnection_ = false; + TRANSPORT_LOGD("Connection recovered!"); + } + + } else { + TRANSPORT_LOGE("Impossible to reconnect: %s", + ec.message().c_str()); + close(); + } + }); +} + +bool HTTPSession::checkConnected() { + return state_ == ConnectorState::CONNECTED; +} + +void HTTPSession::startConnectionTimer() { + timer_.expires_from_now(std::chrono::seconds(10)); + timer_.async_wait( + std::bind(&HTTPSession::handleDeadline, this, std::placeholders::_1)); +} + +void HTTPSession::handleDeadline(const std::error_code &ec) { + if (!ec) { + io_service_.post([this]() { + socket_.close(); + TRANSPORT_LOGE("Error connecting. Is the server running?\n"); + io_service_.stop(); + }); + } +} + +} // namespace transport diff --git a/apps/http-proxy/src/http_session.h b/apps/http-proxy/src/http_session.h new file mode 100644 index 000000000..20ebc5d7d --- /dev/null +++ b/apps/http-proxy/src/http_session.h @@ -0,0 +1,111 @@ +/* + * Copyright (c) 2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#define ASIO_STANDALONE +#include +#include +#include + +namespace transport { + +using asio::ip::tcp; + +typedef std::function + ContentReceivedCallback; +typedef std::function OnConnectionClosed; +typedef std::function ContentSentCallback; +typedef std::deque< + std::pair, ContentSentCallback>> + BufferQueue; + +class HTTPSession { + static constexpr uint32_t buffer_size = 1024 * 512; + + enum class ConnectorState { + CLOSED, + CONNECTING, + CONNECTED, + }; + + public: + HTTPSession(asio::io_service &io_service, std::string &ip_address, + std::string &port, ContentReceivedCallback receive_callback, + OnConnectionClosed on_reconnect_callback, bool reverse = false); + + HTTPSession(asio::ip::tcp::socket socket, + ContentReceivedCallback receive_callback, + OnConnectionClosed on_reconnect_callback, bool reverse = true); + + ~HTTPSession(); + + void send(const uint8_t *buffer, std::size_t len, + ContentSentCallback &&content_sent = 0); + + void send(utils::MemBuf *buffer, ContentSentCallback &&content_sent); + + void close(); + + private: + void doConnect(); + + void doReadHeader(); + + void doReadBody(std::size_t body_size, std::size_t additional_bytes); + + void doReadChunkedHeader(); + + void doWrite(); + + bool checkConnected(); + + private: + void handleRead(std::error_code ec, std::size_t length); + void tryReconnection(); + void startConnectionTimer(); + void handleDeadline(const std::error_code &ec); + + asio::io_service &io_service_; + asio::ip::tcp::socket socket_; + asio::ip::tcp::resolver resolver_; + asio::ip::tcp::resolver::iterator endpoint_iterator_; + asio::steady_timer timer_; + + BufferQueue write_msgs_; + + asio::streambuf input_buffer_; + + bool reverse_; + bool is_reconnection_; + bool data_available_; + + std::size_t content_length_; + + // Chunked encoding + bool is_last_chunk_; + bool chunked_; + + ContentReceivedCallback receive_callback_; + OnConnectionClosed on_connection_closed_callback_; + + // Connector state + ConnectorState state_; +}; + +} // namespace transport diff --git a/apps/http-proxy/src/icn_receiver.cc b/apps/http-proxy/src/icn_receiver.cc new file mode 100644 index 000000000..3bd5525cc --- /dev/null +++ b/apps/http-proxy/src/icn_receiver.cc @@ -0,0 +1,201 @@ +/* + * Copyright (c) 2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "icn_receiver.h" + +#include +#include +#include +#include + +#include +#include + +#include "HTTP1.xMessageFastParser.h" +#include "utils.h" + +namespace transport { + +AsyncConsumerProducer::AsyncConsumerProducer( + asio::io_service& io_service, const std::string& prefix, + const std::string& first_ipv6_word, const std::string& origin_address, + const std::string& origin_port, const std::string& cache_size, + const std::string& mtu, const std::string& content_lifetime, bool manifest) + : prefix_(core::Prefix(generatePrefix(prefix, first_ipv6_word), 64)), + io_service_(io_service), + external_io_service_(true), + producer_socket_(), + ip_address_(origin_address), + port_(origin_port), + cache_size_(std::stoul(cache_size)), + mtu_(std::stoul(mtu)), + request_counter_(0), + signals_(io_service_, SIGINT, SIGQUIT), + connector_(io_service_, ip_address_, port_, + std::bind(&AsyncConsumerProducer::publishContent, this, + std::placeholders::_1, std::placeholders::_2, + std::placeholders::_3, std::placeholders::_4), + [this](asio::ip::tcp::socket& socket) -> bool { + std::queue empty; + std::swap(response_name_queue_, empty); + + return true; + }), + default_content_lifetime_(std::stoul(content_lifetime)) { + int ret = producer_socket_.setSocketOption( + interface::GeneralTransportOptions::OUTPUT_BUFFER_SIZE, cache_size_); + + if (ret != SOCKET_OPTION_SET) { + TRANSPORT_LOGD("Warning: output buffer size has not been set."); + } + + ret = producer_socket_.setSocketOption( + interface::GeneralTransportOptions::MAKE_MANIFEST, manifest); + + if (ret != SOCKET_OPTION_SET) { + TRANSPORT_LOGD("Warning: impossible to enable signatures."); + } + + ret = producer_socket_.setSocketOption( + interface::GeneralTransportOptions::DATA_PACKET_SIZE, mtu_); + + if (ret != SOCKET_OPTION_SET) { + TRANSPORT_LOGD("Warning: mtu has not been set."); + } + + producer_socket_.registerPrefix(prefix_); + + // Let the main thread to catch SIGINT and SIGQUIT + signals_.async_wait( + [this](const std::error_code& errorCode, int signal_number) { + TRANSPORT_LOGI("Number of requests processed by plugin: %lu", + (unsigned long)request_counter_); + producer_socket_.stop(); + connector_.close(); + }); +} + +void AsyncConsumerProducer::start() { + TRANSPORT_LOGD("Starting listening"); + doReceive(); +} + +void AsyncConsumerProducer::run() { + start(); + + if (!external_io_service_) { + io_service_.run(); + } +} + +void AsyncConsumerProducer::doReceive() { + producer_socket_.setSocketOption( + interface::ProducerCallbacksOptions::CACHE_MISS, + [this](interface::ProducerSocket& producer, + interface::Interest& interest) { + // core::Name n(interest.getWritableName(), true); + io_service_.post(std::bind( + &AsyncConsumerProducer::manageIncomingInterest, this, + interest.getWritableName(), interest.acquireMemBufReference(), + interest.getPayload().release())); + }); + + producer_socket_.connect(); +} + +void AsyncConsumerProducer::manageIncomingInterest( + core::Name& name, core::Packet::MemBufPtr& packet, utils::MemBuf* payload) { + auto seg = name.getSuffix(); + name.setSuffix(0); + auto _it = chunk_number_map_.find(name); + auto _end = chunk_number_map_.end(); + + if (_it != _end) { + if (_it->second.second) { + TRANSPORT_LOGD( + "Content is in production, interest will be satisfied shortly."); + return; + } + + if (seg >= _it->second.first) { + TRANSPORT_LOGD( + "Ignoring interest with name %s for a content object which does not " + "exist. (Request: %u, max: %u)", + name.toString().c_str(), (uint32_t)seg, (uint32_t)_it->second.first); + return; + } + } + + bool is_mpd = + HTTPMessageFastParser::isMpdRequest(payload->data(), payload->length()); + + auto pair = chunk_number_map_.emplace(name, std::pair(0, 0)); + if (!pair.second) { + pair.first->second.first = 0; + } + + pair.first->second.second = true; + + response_name_queue_.emplace(std::move(name), + is_mpd ? 1000 : default_content_lifetime_); + + connector_.send(payload, [packet = std::move(packet)]() {}); +} + +void AsyncConsumerProducer::publishContent(const uint8_t* data, + std::size_t size, bool is_last, + bool headers) { + uint32_t start_suffix = 0; + + if (response_name_queue_.empty()) { + std::cerr << "Aborting due tue empty request queue" << std::endl; + abort(); + } + + interface::PublicationOptions& options = response_name_queue_.front(); + + int ret = producer_socket_.setSocketOption( + interface::GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME, + options.getLifetime()); + + if (TRANSPORT_EXPECT_FALSE(ret != SOCKET_OPTION_SET)) { + TRANSPORT_LOGD("Warning: content object lifetime has not been set."); + } + + const interface::Name& name = options.getName(); + + auto it = chunk_number_map_.find(name); + if (it == chunk_number_map_.end()) { + std::cerr << "Aborting due to response not found in ResposeInfo map." + << std::endl; + abort(); + } + + start_suffix = it->second.first; + + if (headers) { + request_counter_++; + } + + it->second.first += + producer_socket_.produce(name, data, size, is_last, start_suffix); + + if (is_last) { + it->second.second = false; + response_name_queue_.pop(); + } +} + +} // namespace transport diff --git a/apps/http-proxy/src/icn_receiver.h b/apps/http-proxy/src/icn_receiver.h new file mode 100644 index 000000000..31e9ae932 --- /dev/null +++ b/apps/http-proxy/src/icn_receiver.h @@ -0,0 +1,96 @@ +/* + * Copyright (c) 2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "http_session.h" + +namespace transport { + +class AsyncConsumerProducer { + using SegmentProductionPair = std::pair; + using ResponseInfoMap = std::unordered_map; + using RequestQueue = std::queue; + + public: + explicit AsyncConsumerProducer( + asio::io_service& io_service, const std::string& prefix, + const std::string& first_ipv6_word, const std::string& origin_address, + const std::string& origin_port, const std::string& cache_size, + const std::string& mtu, const std::string& content_lifetime, + bool manifest); + + explicit AsyncConsumerProducer( + const std::string& prefix, const std::string& first_ipv6_word, + const std::string& origin_address, const std::string& origin_port, + const std::string& cache_size, const std::string& mtu, + const std::string& content_lifetime, bool manifest) + : AsyncConsumerProducer(internal_io_service_, prefix, first_ipv6_word, + origin_address, origin_port, cache_size, mtu, + content_lifetime, manifest) { + external_io_service_ = false; + } + + void run(); + + private: + void start(); + + void doSend(); + + void doReceive(); + + void publishContent(const uint8_t* data, std::size_t size, + bool is_last = true, bool headers = false); + + void manageIncomingInterest(core::Name& name, core::Packet::MemBufPtr& packet, + utils::MemBuf* payload); + + core::Prefix prefix_; + asio::io_service& io_service_; + asio::io_service internal_io_service_; + bool external_io_service_; + interface::ProducerSocket producer_socket_; + + std::string ip_address_; + std::string port_; + uint32_t cache_size_; + uint32_t mtu_; + + uint64_t request_counter_; + asio::signal_set signals_; + + // std::unordered_map> + // connection_map_; + HTTPSession connector_; + + unsigned long default_content_lifetime_; + + // ResponseInfoMap --> max_seq_number + bool indicating whether request is in + // production + ResponseInfoMap chunk_number_map_; + RequestQueue response_name_queue_; +}; + +} // namespace transport diff --git a/apps/http-proxy/src/utils.h b/apps/http-proxy/src/utils.h new file mode 100644 index 000000000..d87c796d0 --- /dev/null +++ b/apps/http-proxy/src/utils.h @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include +#include + +#pragma once + +TRANSPORT_ALWAYS_INLINE std::string generatePrefix( + const std::string& prefix_url, const std::string& first_ipv6_word) { + const char* str = prefix_url.c_str(); + uint16_t pos = 0; + + if (strncmp("http://", str, 7) == 0) { + pos = 7; + } else if (strncmp("https://", str, 8) == 0) { + pos = 8; + } + + str += pos; + + uint32_t locator_hash = utils::hash::fnv32_buf(str, strlen(str)); + + std::stringstream stream; + stream << first_ipv6_word << ":0"; + + for (uint16_t* word = (uint16_t*)&locator_hash; + std::size_t(word) < (std::size_t(&locator_hash) + sizeof(locator_hash)); + word++) { + stream << ":" << std::hex << *word; + } + + stream << "::"; + + return stream.str(); +} \ No newline at end of file -- cgit 1.2.3-korg