diff options
-rw-r--r-- | apps/http-proxy/CMakeLists.txt | 10 | ||||
-rw-r--r-- | apps/http-proxy/main.cc | 108 | ||||
-rw-r--r-- | apps/http-proxy/src/HTTP1.xMessageFastParser.cc | 25 | ||||
-rw-r--r-- | apps/http-proxy/src/HTTP1.xMessageFastParser.h | 7 | ||||
-rw-r--r-- | apps/http-proxy/src/http_proxy.cc | 243 | ||||
-rw-r--r-- | apps/http-proxy/src/http_proxy.h | 180 | ||||
-rw-r--r-- | apps/http-proxy/src/http_session.cc (renamed from apps/http-proxy/src/ATSConnector.cc) | 131 | ||||
-rw-r--r-- | apps/http-proxy/src/http_session.h (renamed from apps/http-proxy/src/ATSConnector.h) | 22 | ||||
-rw-r--r-- | apps/http-proxy/src/icn_receiver.cc (renamed from apps/http-proxy/src/IcnReceiver.cc) | 70 | ||||
-rw-r--r-- | apps/http-proxy/src/icn_receiver.h (renamed from apps/http-proxy/src/IcnReceiver.h) | 37 | ||||
-rw-r--r-- | apps/http-proxy/src/utils.h | 51 |
11 files changed, 729 insertions, 155 deletions
diff --git a/apps/http-proxy/CMakeLists.txt b/apps/http-proxy/CMakeLists.txt index d6681097c..13839dbf7 100644 --- a/apps/http-proxy/CMakeLists.txt +++ b/apps/http-proxy/CMakeLists.txt @@ -31,14 +31,16 @@ include_directories( ) set(LIB_SOURCE_FILES - src/ATSConnector.cc + src/http_session.cc + src/http_proxy.cc src/HTTP1.xMessageFastParser.cc - src/IcnReceiver.cc + src/icn_receiver.cc ) set(LIB_SERVER_HEADER_FILES - src/IcnReceiver.h - src/ATSConnector.h + src/icn_receiver.h + src/http_session.h + src/http_proxy.h src/HTTP1.xMessageFastParser.h ) diff --git a/apps/http-proxy/main.cc b/apps/http-proxy/main.cc index 20655071c..8d407ba4c 100644 --- a/apps/http-proxy/main.cc +++ b/apps/http-proxy/main.cc @@ -13,12 +13,11 @@ * limitations under the License. */ -#include "src/IcnReceiver.h" +#include "src/http_proxy.h" using namespace transport; int usage(char* program) { - std::cerr << "ICN Plugin not loaded!" << std::endl; std::cerr << "USAGE: " << program << "\n" << "[HTTP_PREFIX] -a [SERVER_IP_ADDRESS] " "-p [SERVER_PORT] -c [CACHE_SIZE] -m [MTU] -l [DEFAULT_LIFETIME " @@ -27,39 +26,99 @@ int usage(char* program) { return -1; } +struct Params : HTTPProxy::ClientParams, HTTPProxy::ServerParams { + void printParams() override { + if (client) { + HTTPProxy::ClientParams::printParams(); + } else if (server) { + HTTPProxy::ServerParams::printParams(); + } else { + throw std::runtime_error( + "Proxy configured as client and server at the same time."); + } + + std::cout << "\t" + << "N Threads: " << n_thread << std::endl; + } + + HTTPProxy instantiateProxyAsValue() { + if (client) { + HTTPProxy::ClientParams* p = dynamic_cast<HTTPProxy::ClientParams*>(this); + return transport::HTTPProxy(*p, n_thread); + } else if (server) { + HTTPProxy::ServerParams* p = dynamic_cast<HTTPProxy::ServerParams*>(this); + return transport::HTTPProxy(*p, n_thread); + } else { + throw std::runtime_error( + "Proxy configured as client and server at the same time."); + } + } + + bool client = false; + bool server = false; + std::uint16_t n_thread = 1; +}; + int main(int argc, char** argv) { - std::string prefix("http://hicn-http-proxy"); - std::string ip_address("127.0.0.1"); - std::string port("80"); - std::string cache_size("50000"); - std::string mtu("1500"); - std::string first_ipv6_word("b001"); - std::string default_content_lifetime("7200"); // seconds - bool manifest = false; + Params params; + + params.prefix = "http://hicn-http-proxy"; + params.origin_address = "127.0.0.1"; + params.origin_port = "80"; + params.cache_size = "50000"; + params.mtu = "1500"; + params.first_ipv6_word = "b001"; + params.content_lifetime = "7200;"; // seconds + params.manifest = false; + params.tcp_listen_port = 8080; int opt; - while ((opt = getopt(argc, argv, "a:p:c:m:P:l:M")) != -1) { + while ((opt = getopt(argc, argv, "CSa:p:c:m:P:l:ML:t:")) != -1) { switch (opt) { + case 'C': + if (params.server) { + std::cerr << "Cannot be both client and server (both -C anc -S " + "options specified.)." + << std::endl; + return usage(argv[0]); + } + params.client = true; + break; + case 'S': + if (params.client) { + std::cerr << "Cannot be both client and server (both -C anc -S " + "options specified.)." + << std::endl; + return usage(argv[0]); + } + params.server = true; + break; case 'a': - ip_address = optarg; + params.origin_address = optarg; break; case 'p': - port = optarg; + params.origin_port = optarg; break; case 'c': - cache_size = optarg; + params.cache_size = optarg; break; case 'm': - mtu = optarg; + params.mtu = optarg; break; case 'P': - first_ipv6_word = optarg; + params.first_ipv6_word = optarg; break; case 'l': - default_content_lifetime = optarg; + params.content_lifetime = optarg; + break; + case 'L': + params.tcp_listen_port = std::stoul(optarg); break; case 'M': - manifest = true; + params.manifest = true; + break; + case 't': + params.n_thread = std::stoul(optarg); break; case 'h': default: @@ -69,18 +128,13 @@ int main(int argc, char** argv) { } if (argv[optind] == 0) { - std::cerr << "Using default prefix " << prefix << std::endl; + std::cerr << "Using default prefix " << params.prefix << std::endl; } else { - prefix = argv[optind]; + params.prefix = argv[optind]; } - std::cout << "Connecting to " << ip_address << " port " << port - << " Cache size " << cache_size << " Prefix " << prefix << " MTU " - << mtu << " IPv6 first word " << first_ipv6_word << std::endl; - transport::AsyncConsumerProducer proxy( - prefix, ip_address, port, cache_size, mtu, first_ipv6_word, - std::stoul(default_content_lifetime) * 1000, manifest); - + params.printParams(); + transport::HTTPProxy proxy = params.instantiateProxyAsValue(); proxy.run(); return 0; diff --git a/apps/http-proxy/src/HTTP1.xMessageFastParser.cc b/apps/http-proxy/src/HTTP1.xMessageFastParser.cc index 729eb3aeb..d1271ebdf 100644 --- a/apps/http-proxy/src/HTTP1.xMessageFastParser.cc +++ b/apps/http-proxy/src/HTTP1.xMessageFastParser.cc @@ -15,6 +15,7 @@ #include "HTTP1.xMessageFastParser.h" +#include <hicn/transport/http/request.h> #include <hicn/transport/http/response.h> #include <experimental/algorithm> @@ -31,15 +32,27 @@ std::string HTTPMessageFastParser::connection = "Connection"; std::string HTTPMessageFastParser::separator = "\r\n\r\n"; HTTPHeaders HTTPMessageFastParser::getHeaders(const uint8_t *headers, - std::size_t length) { + std::size_t length, + bool request) { HTTPHeaders ret; std::string http_version; - std::string status_code; - std::string status_string; - if (transport::http::HTTPResponse::parseHeaders(headers, length, ret, http_version, - status_code, status_string)) { - return ret; + if (request) { + std::string method; + std::string url; + + if (transport::http::HTTPRequest::parseHeaders(headers, length, ret, + http_version, method, url)) { + return ret; + } + } else { + std::string status_code; + std::string status_string; + + if (transport::http::HTTPResponse::parseHeaders( + headers, length, ret, http_version, status_code, status_string)) { + return ret; + } } throw std::runtime_error("Error parsing response headers."); diff --git a/apps/http-proxy/src/HTTP1.xMessageFastParser.h b/apps/http-proxy/src/HTTP1.xMessageFastParser.h index 79dbce19d..0ad38b9b6 100644 --- a/apps/http-proxy/src/HTTP1.xMessageFastParser.h +++ b/apps/http-proxy/src/HTTP1.xMessageFastParser.h @@ -15,16 +15,17 @@ #pragma once +#include <hicn/transport/http/message.h> + #include <algorithm> #include <string> -#include <hicn/transport/http/message.h> - using transport::http::HTTPHeaders; class HTTPMessageFastParser { public: - static HTTPHeaders getHeaders(const uint8_t* headers, std::size_t length); + static HTTPHeaders getHeaders(const uint8_t* headers, std::size_t length, + bool request); static std::size_t hasBody(const uint8_t* headers, std::size_t length); static bool isMpdRequest(const uint8_t* headers, std::size_t length); static uint32_t parseCacheControl(const uint8_t* headers, std::size_t length); diff --git a/apps/http-proxy/src/http_proxy.cc b/apps/http-proxy/src/http_proxy.cc new file mode 100644 index 000000000..18e9bf727 --- /dev/null +++ b/apps/http-proxy/src/http_proxy.cc @@ -0,0 +1,243 @@ +/* + * Copyright (c) 2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "http_proxy.h" + +#include <hicn/transport/core/interest.h> +#include <hicn/transport/utils/log.h> + +#include "utils.h" + +namespace transport { + +using core::Interest; +using core::Name; +using interface::ConsumerCallbacksOptions; +using interface::ConsumerInterestCallback; +using interface::ConsumerSocket; +using interface::TransportProtocolAlgorithms; + +class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback { + public: + HTTPClientConnectionCallback(TcpReceiver& tcp_receiver, + utils::EventThread& thread, + const std::string& prefix, + const std::string& ipv6_first_word) + : tcp_receiver_(tcp_receiver), + thread_(thread), + prefix_hash_(generatePrefix(prefix, ipv6_first_word)), + consumer_(TransportProtocolAlgorithms::RAAQM, thread_.getIoService()), + session_(nullptr), + current_size_(0) { + consumer_.setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, this); + consumer_.setSocketOption( + ConsumerCallbacksOptions::INTEREST_OUTPUT, + (ConsumerInterestCallback)std::bind( + &HTTPClientConnectionCallback::processLeavingInterest, this, + std::placeholders::_1, std::placeholders::_2)); + consumer_.connect(); + } + + void setHttpSession(asio::ip::tcp::socket&& socket) { + session_ = std::make_unique<HTTPSession>( + std::move(socket), + std::bind(&HTTPClientConnectionCallback::readDataFromTcp, this, + std::placeholders::_1, std::placeholders::_2, + std::placeholders::_3, std::placeholders::_4), + [this](asio::ip::tcp::socket& socket) -> bool { + try { + std::string remote_address = + socket.remote_endpoint().address().to_string(); + std::uint16_t remote_port = socket.remote_endpoint().port(); + TRANSPORT_LOGD("Client %s:%d disconnected.", remote_address.c_str(), + remote_port); + } catch (std::system_error& e) { + // Do nothing + } + + consumer_.stop(); + tcp_receiver_.onClientDisconnect(this); + return false; + }); + + current_size_ = 0; + } + + private: + void consumeNextRequest() { + if (request_buffer_queue_.size() == 0) { + // No additiona requests to process. + return; + } + + auto& buffer = request_buffer_queue_.front(); + uint64_t request_hash = + utils::hash::fnv64_buf(buffer->data(), buffer->length()); + + std::stringstream name; + name << prefix_hash_.substr(0, prefix_hash_.length() - 2); + + for (uint16_t* word = (uint16_t*)&request_hash; + std::size_t(word) < + (std::size_t(&request_hash) + sizeof(request_hash)); + word++) { + name << ":" << std::hex << *word; + } + + name << "|0"; + + // Non blocking consume :) + consumer_.consume(Name(name.str())); + } + + // tcp callbacks + + void readDataFromTcp(const uint8_t* data, std::size_t size, bool is_last, + bool headers) { + if (headers) { + // Add the request to the request queue + tmp_buffer_ = utils::MemBuf::copyBuffer(data, size); + } else { + // Append payload chunk to last request added. Here we are assuming + // HTTP/1.1. + tmp_buffer_->prependChain(utils::MemBuf::copyBuffer(data, size)); + } + + current_size_ += size; + + if (is_last) { + TRANSPORT_LOGD( + "Request received: %s", + std::string((const char*)tmp_buffer_->data(), tmp_buffer_->length()) + .c_str()); + if (current_size_ < 1400) { + request_buffer_queue_.emplace_back(std::move(tmp_buffer_)); + } else { + TRANSPORT_LOGE("Ignoring client request due to size (%zu) > 1400.", + current_size_); + session_->close(); + current_size_ = 0; + return; + } + + if (!consumer_.isRunning()) { + TRANSPORT_LOGD( + "Consumer stopped, triggering consume from TCP session handler.."); + consumeNextRequest(); + } + + current_size_ = 0; + } + } + + // hicn callbacks + + void processLeavingInterest(interface::ConsumerSocket& c, + const core::Interest& interest) { + if (interest.payloadSize() == 0) { + Interest& int2 = const_cast<Interest&>(interest); + int2.appendPayload(request_buffer_queue_.front()->clone()); + } + } + + bool isBufferMovable() noexcept { return true; } + void getReadBuffer(uint8_t** application_buffer, size_t* max_length) {} + void readDataAvailable(size_t length) noexcept {} + size_t maxBufferSize() const { return 64 * 1024; } + + void readBufferAvailable(std::unique_ptr<utils::MemBuf>&& buffer) noexcept { + // Response received. Send it back to client + auto _buffer = buffer.release(); + TRANSPORT_LOGD("From hicn: %zu bytes.", _buffer->length()); + session_->send(_buffer, []() {}); + } + + void readError(const std::error_code ec) noexcept { + TRANSPORT_LOGE("Error reading from hicn consumer socket. Closing session."); + session_->close(); + } + + void readSuccess(std::size_t total_size) noexcept { + request_buffer_queue_.pop_front(); + consumeNextRequest(); + } + + private: + TcpReceiver& tcp_receiver_; + utils::EventThread& thread_; + std::string prefix_hash_; + ConsumerSocket consumer_; + std::unique_ptr<HTTPSession> session_; + std::deque<std::unique_ptr<utils::MemBuf>> request_buffer_queue_; + std::unique_ptr<utils::MemBuf> tmp_buffer_; + std::size_t current_size_; +}; + +TcpReceiver::TcpReceiver(std::uint16_t port, const std::string& prefix, + const std::string& ipv6_first_word) + : Receiver(), + listener_(thread_.getIoService(), port, + std::bind(&TcpReceiver::onNewConnection, this, + std::placeholders::_1)), + prefix_(prefix), + ipv6_first_word_(ipv6_first_word) { + for (int i = 0; i < 10; i++) { + http_clients_.emplace_back(new HTTPClientConnectionCallback( + *this, thread_, prefix, ipv6_first_word)); + } +} + +void TcpReceiver::onClientDisconnect(HTTPClientConnectionCallback* client) { + http_clients_.emplace_front(client); + used_http_clients_.erase(client); +} + +void TcpReceiver::onNewConnection(asio::ip::tcp::socket&& socket) { + if (http_clients_.size() == 0) { + // Create new HTTPClientConnectionCallback + TRANSPORT_LOGD("Creating new HTTPClientConnectionCallback."); + http_clients_.emplace_back(new HTTPClientConnectionCallback( + *this, thread_, prefix_, ipv6_first_word_)); + } + + // Get new HTTPClientConnectionCallback + HTTPClientConnectionCallback* c = http_clients_.front(); + http_clients_.pop_front(); + + // Set http session + c->setHttpSession(std::move(socket)); + + // Move it to used clients + used_http_clients_.insert(c); +} + +HTTPProxy::HTTPProxy(ClientParams& params, std::size_t n_thread) { + for (uint16_t i = 0; i < n_thread; i++) { + // icn_receivers_.emplace_back(std::make_unique<IcnReceiver>(icn_params)); + receivers_.emplace_back(std::make_unique<TcpReceiver>( + params.tcp_listen_port, params.prefix, params.first_ipv6_word)); + } +} + +HTTPProxy::HTTPProxy(ServerParams& params, std::size_t n_thread) { + for (uint16_t i = 0; i < n_thread; i++) { + receivers_.emplace_back(std::make_unique<IcnReceiver>( + params.prefix, params.first_ipv6_word, params.origin_address, + params.origin_port, params.cache_size, params.mtu, + params.content_lifetime, params.manifest)); + } +} + +} // namespace transport diff --git a/apps/http-proxy/src/http_proxy.h b/apps/http-proxy/src/http_proxy.h new file mode 100644 index 000000000..6fa394e28 --- /dev/null +++ b/apps/http-proxy/src/http_proxy.h @@ -0,0 +1,180 @@ +/* + * Copyright (c) 2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/interfaces/socket_consumer.h> +#include <hicn/transport/utils/event_thread.h> + +#include "http_session.h" +#include "icn_receiver.h" + +#define ASIO_STANDALONE +#include <asio.hpp> +#include <asio/version.hpp> +#include <unordered_set> + +class TcpListener { + public: + using AcceptCallback = std::function<void(asio::ip::tcp::socket&&)>; + + TcpListener(asio::io_service& io_service, short port, AcceptCallback callback) + : acceptor_(io_service, + asio::ip::tcp::endpoint(asio::ip::tcp::v4(), port)), +#if ((ASIO_VERSION / 100 % 1000) < 12) + socket_(io_service), +#endif + callback_(callback) { + do_accept(); + } + + private: + void do_accept() { +#if ((ASIO_VERSION / 100 % 1000) >= 12) + acceptor_.async_accept( + [this](std::error_code ec, asio::ip::tcp::socket socket) { +#else + acceptor_.async_accept(socket_, [this](std::error_code ec) { + auto socket = std::move(socket_); +#endif + if (!ec) { + callback_(std::move(socket)); + } + + do_accept(); + }); + } + + asio::ip::tcp::acceptor acceptor_; +#if ((ASIO_VERSION / 100 % 1000) < 12) + asio::ip::tcp::socket socket_; +#endif + AcceptCallback callback_; +}; + +namespace transport { + +class HTTPClientConnectionCallback; + +class Receiver { + public: + Receiver() : thread_() {} + + protected: + utils::EventThread thread_; +}; + +class TcpReceiver : public Receiver { + friend class HTTPClientConnectionCallback; + + public: + TcpReceiver(std::uint16_t port, const std::string& prefix, + const std::string& ipv6_first_word); + + private: + void onNewConnection(asio::ip::tcp::socket&& socket); + void onClientDisconnect(HTTPClientConnectionCallback* client); + + TcpListener listener_; + std::string prefix_; + std::string ipv6_first_word_; + std::deque<HTTPClientConnectionCallback*> http_clients_; + std::unordered_set<HTTPClientConnectionCallback*> used_http_clients_; +}; + +class IcnReceiver : public Receiver { + public: + template <typename... Args> + IcnReceiver(Args&&... args) + : Receiver(), + icn_consum_producer_(thread_.getIoService(), + std::forward<Args>(args)...) { + icn_consum_producer_.run(); + } + + private: + AsyncConsumerProducer icn_consum_producer_; +}; + +class HTTPProxy { + public: + enum Server { CREATE }; + enum Client { WRAP_BUFFER }; + + struct CommonParams { + std::string prefix; + std::string first_ipv6_word; + + virtual void printParams() { std::cout << "Parameters: " << std::endl; }; + }; + + struct ClientParams : virtual CommonParams { + short tcp_listen_port; + void printParams() override { + std::cout << "Running HTTP/TCP -> HTTP/hICN proxy." << std::endl; + CommonParams::printParams(); + std::cout << "\t" + << "HTTP listen port: " << tcp_listen_port << std::endl; + std::cout << "\t" + << "Consumer Prefix: " << prefix << std::endl; + std::cout << "\t" + << "Prefix first word: " << first_ipv6_word << std::endl; + } + }; + + struct ServerParams : virtual CommonParams { + std::string origin_address; + std::string origin_port; + std::string cache_size; + std::string mtu; + std::string content_lifetime; + bool manifest; + + void printParams() override { + std::cout << "Running HTTP/hICN -> HTTP/TCP proxy." << std::endl; + CommonParams::printParams(); + std::cout << "\t" + << "Origin address: " << origin_address << std::endl; + std::cout << "\t" + << "Origin port: " << origin_port << std::endl; + std::cout << "\t" + << "Producer cache size: " << cache_size << std::endl; + std::cout << "\t" + << "hICN MTU: " << mtu << std::endl; + std::cout << "\t" + << "Default content lifetime: " << content_lifetime + << std::endl; + std::cout << "\t" + << "Producer Prefix: " << prefix << std::endl; + std::cout << "\t" + << "Prefix first word: " << first_ipv6_word << std::endl; + std::cout << "\t" + << "Use manifest: " << manifest << std::endl; + } + }; + + HTTPProxy(ClientParams& icn_params, std::size_t n_thread = 1); + HTTPProxy(ServerParams& icn_params, std::size_t n_thread = 1); + + void run() { sleep(1000000); } + + private: + void acceptTCPClient(asio::ip::tcp::socket&& socket); + + private: + std::vector<std::unique_ptr<Receiver>> receivers_; +}; + +} // namespace transport
\ No newline at end of file diff --git a/apps/http-proxy/src/ATSConnector.cc b/apps/http-proxy/src/http_session.cc index a9b889941..2c281468f 100644 --- a/apps/http-proxy/src/ATSConnector.cc +++ b/apps/http-proxy/src/http_session.cc @@ -13,48 +13,75 @@ * limitations under the License. */ -#include "ATSConnector.h" -#include "HTTP1.xMessageFastParser.h" +#include "http_session.h" #include <hicn/transport/utils/branch_prediction.h> #include <hicn/transport/utils/log.h> + #include <iostream> +#include "HTTP1.xMessageFastParser.h" + namespace transport { -ATSConnector::ATSConnector(asio::io_service &io_service, - std::string &ip_address, std::string &port, - ContentReceivedCallback receive_callback, - OnReconnect on_reconnect_callback) +HTTPSession::HTTPSession(asio::io_service &io_service, std::string &ip_address, + std::string &port, + ContentReceivedCallback receive_callback, + OnConnectionClosed on_connection_closed_callback, + bool reverse) : io_service_(io_service), socket_(io_service_), resolver_(io_service_), endpoint_iterator_(resolver_.resolve({ip_address, port})), timer_(io_service), + reverse_(reverse), is_reconnection_(false), data_available_(false), content_length_(0), is_last_chunk_(false), chunked_(false), receive_callback_(receive_callback), - on_reconnect_callback_(on_reconnect_callback) { + on_connection_closed_callback_(on_connection_closed_callback) { input_buffer_.prepare(buffer_size + 2048); state_ = ConnectorState::CONNECTING; doConnect(); } -ATSConnector::~ATSConnector() {} +HTTPSession::HTTPSession(asio::ip::tcp::socket socket, + ContentReceivedCallback receive_callback, + OnConnectionClosed on_connection_closed_callback, + bool reverse) + : io_service_(socket.get_io_service()), + socket_(std::move(socket)), + resolver_(io_service_), + timer_(io_service_), + reverse_(reverse), + is_reconnection_(false), + data_available_(false), + content_length_(0), + is_last_chunk_(false), + chunked_(false), + receive_callback_(receive_callback), + on_connection_closed_callback_(on_connection_closed_callback) { + input_buffer_.prepare(buffer_size + 2048); + state_ = ConnectorState::CONNECTED; + asio::ip::tcp::no_delay noDelayOption(true); + socket_.set_option(noDelayOption); + doReadHeader(); +} + +HTTPSession::~HTTPSession() {} -void ATSConnector::send(const uint8_t *packet, std::size_t len, - ContentSentCallback &&content_sent) { +void HTTPSession::send(const uint8_t *packet, std::size_t len, + ContentSentCallback &&content_sent) { asio::async_write( socket_, asio::buffer(packet, len), [content_sent = std::move(content_sent)]( std::error_code ec, std::size_t /*length*/) { content_sent(); }); } -void ATSConnector::send(utils::MemBuf *buffer, - ContentSentCallback &&content_sent) { +void HTTPSession::send(utils::MemBuf *buffer, + ContentSentCallback &&content_sent) { io_service_.dispatch([this, buffer, callback = std::move(content_sent)]() { bool write_in_progress = !write_msgs_.empty(); write_msgs_.emplace_back(std::unique_ptr<utils::MemBuf>(buffer), @@ -70,24 +97,25 @@ void ATSConnector::send(utils::MemBuf *buffer, }); } -void ATSConnector::close() { +void HTTPSession::close() { if (state_ != ConnectorState::CLOSED) { state_ = ConnectorState::CLOSED; if (socket_.is_open()) { - socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); + // socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); socket_.close(); // on_disconnect_callback_(); } } } -void ATSConnector::doWrite() { +void HTTPSession::doWrite() { auto &buffer = write_msgs_.front().first; asio::async_write(socket_, asio::buffer(buffer->data(), buffer->length()), [this](std::error_code ec, std::size_t length) { if (TRANSPORT_EXPECT_FALSE(!ec)) { - TRANSPORT_LOGD("Content successfully sent!"); + TRANSPORT_LOGD("Content successfully sent! %zu", + length); write_msgs_.front().second(); write_msgs_.pop_front(); if (!write_msgs_.empty()) { @@ -99,12 +127,14 @@ void ATSConnector::doWrite() { }); } // namespace transport -void ATSConnector::handleRead(std::error_code ec, std::size_t length) { +void HTTPSession::handleRead(std::error_code ec, std::size_t length) { if (TRANSPORT_EXPECT_TRUE(!ec)) { content_length_ -= length; const uint8_t *buffer = asio::buffer_cast<const uint8_t *>(input_buffer_.data()); - receive_callback_(buffer, input_buffer_.size(), !content_length_, false); + bool is_last = chunked_ ? (is_last_chunk_ ? !content_length_ : false) + : !content_length_; + receive_callback_(buffer, input_buffer_.size(), is_last, false); input_buffer_.consume(input_buffer_.size()); if (!content_length_) { @@ -117,7 +147,7 @@ void ATSConnector::handleRead(std::error_code ec, std::size_t length) { auto to_read = content_length_ >= buffer_size ? buffer_size : content_length_; asio::async_read(socket_, input_buffer_, asio::transfer_exactly(to_read), - std::bind(&ATSConnector::handleRead, this, + std::bind(&HTTPSession::handleRead, this, std::placeholders::_1, std::placeholders::_2)); } } else if (ec == asio::error::eof) { @@ -126,8 +156,8 @@ void ATSConnector::handleRead(std::error_code ec, std::size_t length) { } } -void ATSConnector::doReadBody(std::size_t body_size, - std::size_t additional_bytes) { +void HTTPSession::doReadBody(std::size_t body_size, + std::size_t additional_bytes) { auto bytes_to_read = body_size > additional_bytes ? (body_size - additional_bytes) : 0; @@ -140,14 +170,16 @@ void ATSConnector::doReadBody(std::size_t body_size, if (to_read > 0) { content_length_ = bytes_to_read; asio::async_read(socket_, input_buffer_, asio::transfer_exactly(to_read), - std::bind(&ATSConnector::handleRead, this, + std::bind(&HTTPSession::handleRead, this, std::placeholders::_1, std::placeholders::_2)); } else { - const uint8_t *buffer = - asio::buffer_cast<const uint8_t *>(input_buffer_.data()); - receive_callback_(buffer, body_size, chunked_ ? is_last_chunk_ : !to_read, - false); - input_buffer_.consume(body_size); + if (body_size) { + const uint8_t *buffer = + asio::buffer_cast<const uint8_t *>(input_buffer_.data()); + receive_callback_(buffer, body_size, chunked_ ? is_last_chunk_ : !to_read, + false); + input_buffer_.consume(body_size); + } if (!chunked_ || is_last_chunk_) { doReadHeader(); @@ -157,7 +189,7 @@ void ATSConnector::doReadBody(std::size_t body_size, } } -void ATSConnector::doReadChunkedHeader() { +void HTTPSession::doReadChunkedHeader() { asio::async_read_until( socket_, input_buffer_, "\r\n", [this](std::error_code ec, std::size_t length) { @@ -176,14 +208,15 @@ void ATSConnector::doReadChunkedHeader() { }); } -void ATSConnector::doReadHeader() { +void HTTPSession::doReadHeader() { asio::async_read_until( socket_, input_buffer_, "\r\n\r\n", [this](std::error_code ec, std::size_t length) { if (TRANSPORT_EXPECT_TRUE(!ec)) { const uint8_t *buffer = asio::buffer_cast<const uint8_t *>(input_buffer_.data()); - auto headers = HTTPMessageFastParser::getHeaders(buffer, length); + auto headers = + HTTPMessageFastParser::getHeaders(buffer, length, reverse_); // Try to get content length, if available auto it = headers.find(HTTPMessageFastParser::content_length); @@ -215,23 +248,25 @@ void ATSConnector::doReadHeader() { }); } -void ATSConnector::tryReconnection() { - TRANSPORT_LOGD("Connection lost. Trying to reconnect...\n"); - if (state_ == ConnectorState::CONNECTED) { - state_ = ConnectorState::CONNECTING; - is_reconnection_ = true; - io_service_.post([this]() { - if (socket_.is_open()) { - // socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); - socket_.close(); - } - startConnectionTimer(); - doConnect(); - }); +void HTTPSession::tryReconnection() { + if (on_connection_closed_callback_(socket_)) { + if (state_ == ConnectorState::CONNECTED) { + TRANSPORT_LOGD("Connection lost. Trying to reconnect...\n"); + state_ = ConnectorState::CONNECTING; + is_reconnection_ = true; + io_service_.post([this]() { + if (socket_.is_open()) { + // socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); + socket_.close(); + } + startConnectionTimer(); + doConnect(); + }); + } } } -void ATSConnector::doConnect() { +void HTTPSession::doConnect() { asio::async_connect(socket_, endpoint_iterator_, [this](std::error_code ec, tcp::resolver::iterator) { if (!ec) { @@ -263,17 +298,17 @@ void ATSConnector::doConnect() { }); } -bool ATSConnector::checkConnected() { +bool HTTPSession::checkConnected() { return state_ == ConnectorState::CONNECTED; } -void ATSConnector::startConnectionTimer() { +void HTTPSession::startConnectionTimer() { timer_.expires_from_now(std::chrono::seconds(10)); timer_.async_wait( - std::bind(&ATSConnector::handleDeadline, this, std::placeholders::_1)); + std::bind(&HTTPSession::handleDeadline, this, std::placeholders::_1)); } -void ATSConnector::handleDeadline(const std::error_code &ec) { +void HTTPSession::handleDeadline(const std::error_code &ec) { if (!ec) { io_service_.post([this]() { socket_.close(); diff --git a/apps/http-proxy/src/ATSConnector.h b/apps/http-proxy/src/http_session.h index 8d91b7b7b..20ebc5d7d 100644 --- a/apps/http-proxy/src/ATSConnector.h +++ b/apps/http-proxy/src/http_session.h @@ -29,13 +29,13 @@ using asio::ip::tcp; typedef std::function<void(const uint8_t *data, std::size_t size, bool is_last, bool headers)> ContentReceivedCallback; -typedef std::function<void()> OnReconnect; +typedef std::function<bool(asio::ip::tcp::socket &socket)> OnConnectionClosed; typedef std::function<void()> ContentSentCallback; typedef std::deque< std::pair<std::unique_ptr<utils::MemBuf>, ContentSentCallback>> BufferQueue; -class ATSConnector { +class HTTPSession { static constexpr uint32_t buffer_size = 1024 * 512; enum class ConnectorState { @@ -45,11 +45,15 @@ class ATSConnector { }; public: - ATSConnector(asio::io_service &io_service, std::string &ip_address, - std::string &port, ContentReceivedCallback receive_callback, - OnReconnect on_reconnect_callback); + HTTPSession(asio::io_service &io_service, std::string &ip_address, + std::string &port, ContentReceivedCallback receive_callback, + OnConnectionClosed on_reconnect_callback, bool reverse = false); - ~ATSConnector(); + HTTPSession(asio::ip::tcp::socket socket, + ContentReceivedCallback receive_callback, + OnConnectionClosed on_reconnect_callback, bool reverse = true); + + ~HTTPSession(); void send(const uint8_t *buffer, std::size_t len, ContentSentCallback &&content_sent = 0); @@ -65,9 +69,6 @@ class ATSConnector { void doReadBody(std::size_t body_size, std::size_t additional_bytes); - // void handleReadChunked(std::error_code ec, std::size_t length, - // std::size_t size); - void doReadChunkedHeader(); void doWrite(); @@ -90,6 +91,7 @@ class ATSConnector { asio::streambuf input_buffer_; + bool reverse_; bool is_reconnection_; bool data_available_; @@ -100,7 +102,7 @@ class ATSConnector { bool chunked_; ContentReceivedCallback receive_callback_; - OnReconnect on_reconnect_callback_; + OnConnectionClosed on_connection_closed_callback_; // Connector state ConnectorState state_; diff --git a/apps/http-proxy/src/IcnReceiver.cc b/apps/http-proxy/src/icn_receiver.cc index 24b0eb5dc..3bd5525cc 100644 --- a/apps/http-proxy/src/IcnReceiver.cc +++ b/apps/http-proxy/src/icn_receiver.cc @@ -13,8 +13,7 @@ * limitations under the License. */ -#include "IcnReceiver.h" -#include "HTTP1.xMessageFastParser.h" +#include "icn_receiver.h" #include <hicn/transport/core/interest.h> #include <hicn/transport/http/default_values.h> @@ -24,45 +23,22 @@ #include <functional> #include <memory> -namespace transport { - -core::Prefix generatePrefix(const std::string& prefix_url, - std::string& first_ipv6_word) { - const char* str = prefix_url.c_str(); - uint16_t pos = 0; - - if (strncmp("http://", str, 7) == 0) { - pos = 7; - } else if (strncmp("https://", str, 8) == 0) { - pos = 8; - } - - str += pos; - - uint32_t locator_hash = utils::hash::fnv32_buf(str, strlen(str)); - - std::stringstream stream; - stream << first_ipv6_word << ":0"; - - for (uint16_t* word = (uint16_t*)&locator_hash; - std::size_t(word) < (std::size_t(&locator_hash) + sizeof(locator_hash)); - word++) { - stream << ":" << std::hex << *word; - } - - stream << "::0"; +#include "HTTP1.xMessageFastParser.h" +#include "utils.h" - return core::Prefix(stream.str(), 64); -} +namespace transport { AsyncConsumerProducer::AsyncConsumerProducer( - const std::string& prefix, std::string& ip_address, std::string& port, - std::string& cache_size, std::string& mtu, std::string& first_ipv6_word, - unsigned long default_lifetime, bool manifest) - : prefix_(generatePrefix(prefix, first_ipv6_word)), + asio::io_service& io_service, const std::string& prefix, + const std::string& first_ipv6_word, const std::string& origin_address, + const std::string& origin_port, const std::string& cache_size, + const std::string& mtu, const std::string& content_lifetime, bool manifest) + : prefix_(core::Prefix(generatePrefix(prefix, first_ipv6_word), 64)), + io_service_(io_service), + external_io_service_(true), producer_socket_(), - ip_address_(ip_address), - port_(port), + ip_address_(origin_address), + port_(origin_port), cache_size_(std::stoul(cache_size)), mtu_(std::stoul(mtu)), request_counter_(0), @@ -71,11 +47,13 @@ AsyncConsumerProducer::AsyncConsumerProducer( std::bind(&AsyncConsumerProducer::publishContent, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4), - [this]() { + [this](asio::ip::tcp::socket& socket) -> bool { std::queue<interface::PublicationOptions> empty; std::swap(response_name_queue_, empty); + + return true; }), - default_content_lifetime_(default_lifetime) { + default_content_lifetime_(std::stoul(content_lifetime)) { int ret = producer_socket_.setSocketOption( interface::GeneralTransportOptions::OUTPUT_BUFFER_SIZE, cache_size_); @@ -116,7 +94,10 @@ void AsyncConsumerProducer::start() { void AsyncConsumerProducer::run() { start(); - io_service_.run(); + + if (!external_io_service_) { + io_service_.run(); + } } void AsyncConsumerProducer::doReceive() { @@ -141,16 +122,15 @@ void AsyncConsumerProducer::manageIncomingInterest( auto _it = chunk_number_map_.find(name); auto _end = chunk_number_map_.end(); - std::cout << "Received interest " << seg << std::endl; - if (_it != _end) { if (_it->second.second) { - // Content is in production + TRANSPORT_LOGD( + "Content is in production, interest will be satisfied shortly."); return; } if (seg >= _it->second.first) { - TRANSPORT_LOGI( + TRANSPORT_LOGD( "Ignoring interest with name %s for a content object which does not " "exist. (Request: %u, max: %u)", name.toString().c_str(), (uint32_t)seg, (uint32_t)_it->second.first); @@ -158,8 +138,6 @@ void AsyncConsumerProducer::manageIncomingInterest( } } - std::cout << "Received interest " << seg << std::endl; - bool is_mpd = HTTPMessageFastParser::isMpdRequest(payload->data(), payload->length()); diff --git a/apps/http-proxy/src/IcnReceiver.h b/apps/http-proxy/src/icn_receiver.h index 9d0ab5172..31e9ae932 100644 --- a/apps/http-proxy/src/IcnReceiver.h +++ b/apps/http-proxy/src/icn_receiver.h @@ -13,18 +13,19 @@ * limitations under the License. */ -#include "ATSConnector.h" - #include <hicn/transport/core/prefix.h> #include <hicn/transport/interfaces/publication_options.h> #include <hicn/transport/interfaces/socket_producer.h> #include <hicn/transport/utils/spinlock.h> +#include <asio.hpp> #include <cassert> #include <cstring> #include <queue> #include <utility> +#include "http_session.h" + namespace transport { class AsyncConsumerProducer { @@ -33,17 +34,29 @@ class AsyncConsumerProducer { using RequestQueue = std::queue<interface::PublicationOptions>; public: - explicit AsyncConsumerProducer(const std::string& prefix, - std::string& ip_address, std::string& port, - std::string& cache_size, std::string& mtu, - std::string& first_ipv6_word, - unsigned long default_content_lifetime, bool manifest); - - void start(); + explicit AsyncConsumerProducer( + asio::io_service& io_service, const std::string& prefix, + const std::string& first_ipv6_word, const std::string& origin_address, + const std::string& origin_port, const std::string& cache_size, + const std::string& mtu, const std::string& content_lifetime, + bool manifest); + + explicit AsyncConsumerProducer( + const std::string& prefix, const std::string& first_ipv6_word, + const std::string& origin_address, const std::string& origin_port, + const std::string& cache_size, const std::string& mtu, + const std::string& content_lifetime, bool manifest) + : AsyncConsumerProducer(internal_io_service_, prefix, first_ipv6_word, + origin_address, origin_port, cache_size, mtu, + content_lifetime, manifest) { + external_io_service_ = false; + } void run(); private: + void start(); + void doSend(); void doReceive(); @@ -55,7 +68,9 @@ class AsyncConsumerProducer { utils::MemBuf* payload); core::Prefix prefix_; - asio::io_service io_service_; + asio::io_service& io_service_; + asio::io_service internal_io_service_; + bool external_io_service_; interface::ProducerSocket producer_socket_; std::string ip_address_; @@ -68,7 +83,7 @@ class AsyncConsumerProducer { // std::unordered_map<core::Name, std::shared_ptr<ATSConnector>> // connection_map_; - ATSConnector connector_; + HTTPSession connector_; unsigned long default_content_lifetime_; diff --git a/apps/http-proxy/src/utils.h b/apps/http-proxy/src/utils.h new file mode 100644 index 000000000..d87c796d0 --- /dev/null +++ b/apps/http-proxy/src/utils.h @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/core/prefix.h> +#include <hicn/transport/utils/hash.h> + +#include <sstream> +#include <string> + +#pragma once + +TRANSPORT_ALWAYS_INLINE std::string generatePrefix( + const std::string& prefix_url, const std::string& first_ipv6_word) { + const char* str = prefix_url.c_str(); + uint16_t pos = 0; + + if (strncmp("http://", str, 7) == 0) { + pos = 7; + } else if (strncmp("https://", str, 8) == 0) { + pos = 8; + } + + str += pos; + + uint32_t locator_hash = utils::hash::fnv32_buf(str, strlen(str)); + + std::stringstream stream; + stream << first_ipv6_word << ":0"; + + for (uint16_t* word = (uint16_t*)&locator_hash; + std::size_t(word) < (std::size_t(&locator_hash) + sizeof(locator_hash)); + word++) { + stream << ":" << std::hex << *word; + } + + stream << "::"; + + return stream.str(); +}
\ No newline at end of file |