summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--apps/http-proxy/CMakeLists.txt10
-rw-r--r--apps/http-proxy/main.cc108
-rw-r--r--apps/http-proxy/src/HTTP1.xMessageFastParser.cc25
-rw-r--r--apps/http-proxy/src/HTTP1.xMessageFastParser.h7
-rw-r--r--apps/http-proxy/src/http_proxy.cc243
-rw-r--r--apps/http-proxy/src/http_proxy.h180
-rw-r--r--apps/http-proxy/src/http_session.cc (renamed from apps/http-proxy/src/ATSConnector.cc)131
-rw-r--r--apps/http-proxy/src/http_session.h (renamed from apps/http-proxy/src/ATSConnector.h)22
-rw-r--r--apps/http-proxy/src/icn_receiver.cc (renamed from apps/http-proxy/src/IcnReceiver.cc)70
-rw-r--r--apps/http-proxy/src/icn_receiver.h (renamed from apps/http-proxy/src/IcnReceiver.h)37
-rw-r--r--apps/http-proxy/src/utils.h51
11 files changed, 729 insertions, 155 deletions
diff --git a/apps/http-proxy/CMakeLists.txt b/apps/http-proxy/CMakeLists.txt
index d6681097c..13839dbf7 100644
--- a/apps/http-proxy/CMakeLists.txt
+++ b/apps/http-proxy/CMakeLists.txt
@@ -31,14 +31,16 @@ include_directories(
)
set(LIB_SOURCE_FILES
- src/ATSConnector.cc
+ src/http_session.cc
+ src/http_proxy.cc
src/HTTP1.xMessageFastParser.cc
- src/IcnReceiver.cc
+ src/icn_receiver.cc
)
set(LIB_SERVER_HEADER_FILES
- src/IcnReceiver.h
- src/ATSConnector.h
+ src/icn_receiver.h
+ src/http_session.h
+ src/http_proxy.h
src/HTTP1.xMessageFastParser.h
)
diff --git a/apps/http-proxy/main.cc b/apps/http-proxy/main.cc
index 20655071c..8d407ba4c 100644
--- a/apps/http-proxy/main.cc
+++ b/apps/http-proxy/main.cc
@@ -13,12 +13,11 @@
* limitations under the License.
*/
-#include "src/IcnReceiver.h"
+#include "src/http_proxy.h"
using namespace transport;
int usage(char* program) {
- std::cerr << "ICN Plugin not loaded!" << std::endl;
std::cerr << "USAGE: " << program << "\n"
<< "[HTTP_PREFIX] -a [SERVER_IP_ADDRESS] "
"-p [SERVER_PORT] -c [CACHE_SIZE] -m [MTU] -l [DEFAULT_LIFETIME "
@@ -27,39 +26,99 @@ int usage(char* program) {
return -1;
}
+struct Params : HTTPProxy::ClientParams, HTTPProxy::ServerParams {
+ void printParams() override {
+ if (client) {
+ HTTPProxy::ClientParams::printParams();
+ } else if (server) {
+ HTTPProxy::ServerParams::printParams();
+ } else {
+ throw std::runtime_error(
+ "Proxy configured as client and server at the same time.");
+ }
+
+ std::cout << "\t"
+ << "N Threads: " << n_thread << std::endl;
+ }
+
+ HTTPProxy instantiateProxyAsValue() {
+ if (client) {
+ HTTPProxy::ClientParams* p = dynamic_cast<HTTPProxy::ClientParams*>(this);
+ return transport::HTTPProxy(*p, n_thread);
+ } else if (server) {
+ HTTPProxy::ServerParams* p = dynamic_cast<HTTPProxy::ServerParams*>(this);
+ return transport::HTTPProxy(*p, n_thread);
+ } else {
+ throw std::runtime_error(
+ "Proxy configured as client and server at the same time.");
+ }
+ }
+
+ bool client = false;
+ bool server = false;
+ std::uint16_t n_thread = 1;
+};
+
int main(int argc, char** argv) {
- std::string prefix("http://hicn-http-proxy");
- std::string ip_address("127.0.0.1");
- std::string port("80");
- std::string cache_size("50000");
- std::string mtu("1500");
- std::string first_ipv6_word("b001");
- std::string default_content_lifetime("7200"); // seconds
- bool manifest = false;
+ Params params;
+
+ params.prefix = "http://hicn-http-proxy";
+ params.origin_address = "127.0.0.1";
+ params.origin_port = "80";
+ params.cache_size = "50000";
+ params.mtu = "1500";
+ params.first_ipv6_word = "b001";
+ params.content_lifetime = "7200;"; // seconds
+ params.manifest = false;
+ params.tcp_listen_port = 8080;
int opt;
- while ((opt = getopt(argc, argv, "a:p:c:m:P:l:M")) != -1) {
+ while ((opt = getopt(argc, argv, "CSa:p:c:m:P:l:ML:t:")) != -1) {
switch (opt) {
+ case 'C':
+ if (params.server) {
+ std::cerr << "Cannot be both client and server (both -C anc -S "
+ "options specified.)."
+ << std::endl;
+ return usage(argv[0]);
+ }
+ params.client = true;
+ break;
+ case 'S':
+ if (params.client) {
+ std::cerr << "Cannot be both client and server (both -C anc -S "
+ "options specified.)."
+ << std::endl;
+ return usage(argv[0]);
+ }
+ params.server = true;
+ break;
case 'a':
- ip_address = optarg;
+ params.origin_address = optarg;
break;
case 'p':
- port = optarg;
+ params.origin_port = optarg;
break;
case 'c':
- cache_size = optarg;
+ params.cache_size = optarg;
break;
case 'm':
- mtu = optarg;
+ params.mtu = optarg;
break;
case 'P':
- first_ipv6_word = optarg;
+ params.first_ipv6_word = optarg;
break;
case 'l':
- default_content_lifetime = optarg;
+ params.content_lifetime = optarg;
+ break;
+ case 'L':
+ params.tcp_listen_port = std::stoul(optarg);
break;
case 'M':
- manifest = true;
+ params.manifest = true;
+ break;
+ case 't':
+ params.n_thread = std::stoul(optarg);
break;
case 'h':
default:
@@ -69,18 +128,13 @@ int main(int argc, char** argv) {
}
if (argv[optind] == 0) {
- std::cerr << "Using default prefix " << prefix << std::endl;
+ std::cerr << "Using default prefix " << params.prefix << std::endl;
} else {
- prefix = argv[optind];
+ params.prefix = argv[optind];
}
- std::cout << "Connecting to " << ip_address << " port " << port
- << " Cache size " << cache_size << " Prefix " << prefix << " MTU "
- << mtu << " IPv6 first word " << first_ipv6_word << std::endl;
- transport::AsyncConsumerProducer proxy(
- prefix, ip_address, port, cache_size, mtu, first_ipv6_word,
- std::stoul(default_content_lifetime) * 1000, manifest);
-
+ params.printParams();
+ transport::HTTPProxy proxy = params.instantiateProxyAsValue();
proxy.run();
return 0;
diff --git a/apps/http-proxy/src/HTTP1.xMessageFastParser.cc b/apps/http-proxy/src/HTTP1.xMessageFastParser.cc
index 729eb3aeb..d1271ebdf 100644
--- a/apps/http-proxy/src/HTTP1.xMessageFastParser.cc
+++ b/apps/http-proxy/src/HTTP1.xMessageFastParser.cc
@@ -15,6 +15,7 @@
#include "HTTP1.xMessageFastParser.h"
+#include <hicn/transport/http/request.h>
#include <hicn/transport/http/response.h>
#include <experimental/algorithm>
@@ -31,15 +32,27 @@ std::string HTTPMessageFastParser::connection = "Connection";
std::string HTTPMessageFastParser::separator = "\r\n\r\n";
HTTPHeaders HTTPMessageFastParser::getHeaders(const uint8_t *headers,
- std::size_t length) {
+ std::size_t length,
+ bool request) {
HTTPHeaders ret;
std::string http_version;
- std::string status_code;
- std::string status_string;
- if (transport::http::HTTPResponse::parseHeaders(headers, length, ret, http_version,
- status_code, status_string)) {
- return ret;
+ if (request) {
+ std::string method;
+ std::string url;
+
+ if (transport::http::HTTPRequest::parseHeaders(headers, length, ret,
+ http_version, method, url)) {
+ return ret;
+ }
+ } else {
+ std::string status_code;
+ std::string status_string;
+
+ if (transport::http::HTTPResponse::parseHeaders(
+ headers, length, ret, http_version, status_code, status_string)) {
+ return ret;
+ }
}
throw std::runtime_error("Error parsing response headers.");
diff --git a/apps/http-proxy/src/HTTP1.xMessageFastParser.h b/apps/http-proxy/src/HTTP1.xMessageFastParser.h
index 79dbce19d..0ad38b9b6 100644
--- a/apps/http-proxy/src/HTTP1.xMessageFastParser.h
+++ b/apps/http-proxy/src/HTTP1.xMessageFastParser.h
@@ -15,16 +15,17 @@
#pragma once
+#include <hicn/transport/http/message.h>
+
#include <algorithm>
#include <string>
-#include <hicn/transport/http/message.h>
-
using transport::http::HTTPHeaders;
class HTTPMessageFastParser {
public:
- static HTTPHeaders getHeaders(const uint8_t* headers, std::size_t length);
+ static HTTPHeaders getHeaders(const uint8_t* headers, std::size_t length,
+ bool request);
static std::size_t hasBody(const uint8_t* headers, std::size_t length);
static bool isMpdRequest(const uint8_t* headers, std::size_t length);
static uint32_t parseCacheControl(const uint8_t* headers, std::size_t length);
diff --git a/apps/http-proxy/src/http_proxy.cc b/apps/http-proxy/src/http_proxy.cc
new file mode 100644
index 000000000..18e9bf727
--- /dev/null
+++ b/apps/http-proxy/src/http_proxy.cc
@@ -0,0 +1,243 @@
+/*
+ * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "http_proxy.h"
+
+#include <hicn/transport/core/interest.h>
+#include <hicn/transport/utils/log.h>
+
+#include "utils.h"
+
+namespace transport {
+
+using core::Interest;
+using core::Name;
+using interface::ConsumerCallbacksOptions;
+using interface::ConsumerInterestCallback;
+using interface::ConsumerSocket;
+using interface::TransportProtocolAlgorithms;
+
+class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback {
+ public:
+ HTTPClientConnectionCallback(TcpReceiver& tcp_receiver,
+ utils::EventThread& thread,
+ const std::string& prefix,
+ const std::string& ipv6_first_word)
+ : tcp_receiver_(tcp_receiver),
+ thread_(thread),
+ prefix_hash_(generatePrefix(prefix, ipv6_first_word)),
+ consumer_(TransportProtocolAlgorithms::RAAQM, thread_.getIoService()),
+ session_(nullptr),
+ current_size_(0) {
+ consumer_.setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, this);
+ consumer_.setSocketOption(
+ ConsumerCallbacksOptions::INTEREST_OUTPUT,
+ (ConsumerInterestCallback)std::bind(
+ &HTTPClientConnectionCallback::processLeavingInterest, this,
+ std::placeholders::_1, std::placeholders::_2));
+ consumer_.connect();
+ }
+
+ void setHttpSession(asio::ip::tcp::socket&& socket) {
+ session_ = std::make_unique<HTTPSession>(
+ std::move(socket),
+ std::bind(&HTTPClientConnectionCallback::readDataFromTcp, this,
+ std::placeholders::_1, std::placeholders::_2,
+ std::placeholders::_3, std::placeholders::_4),
+ [this](asio::ip::tcp::socket& socket) -> bool {
+ try {
+ std::string remote_address =
+ socket.remote_endpoint().address().to_string();
+ std::uint16_t remote_port = socket.remote_endpoint().port();
+ TRANSPORT_LOGD("Client %s:%d disconnected.", remote_address.c_str(),
+ remote_port);
+ } catch (std::system_error& e) {
+ // Do nothing
+ }
+
+ consumer_.stop();
+ tcp_receiver_.onClientDisconnect(this);
+ return false;
+ });
+
+ current_size_ = 0;
+ }
+
+ private:
+ void consumeNextRequest() {
+ if (request_buffer_queue_.size() == 0) {
+ // No additiona requests to process.
+ return;
+ }
+
+ auto& buffer = request_buffer_queue_.front();
+ uint64_t request_hash =
+ utils::hash::fnv64_buf(buffer->data(), buffer->length());
+
+ std::stringstream name;
+ name << prefix_hash_.substr(0, prefix_hash_.length() - 2);
+
+ for (uint16_t* word = (uint16_t*)&request_hash;
+ std::size_t(word) <
+ (std::size_t(&request_hash) + sizeof(request_hash));
+ word++) {
+ name << ":" << std::hex << *word;
+ }
+
+ name << "|0";
+
+ // Non blocking consume :)
+ consumer_.consume(Name(name.str()));
+ }
+
+ // tcp callbacks
+
+ void readDataFromTcp(const uint8_t* data, std::size_t size, bool is_last,
+ bool headers) {
+ if (headers) {
+ // Add the request to the request queue
+ tmp_buffer_ = utils::MemBuf::copyBuffer(data, size);
+ } else {
+ // Append payload chunk to last request added. Here we are assuming
+ // HTTP/1.1.
+ tmp_buffer_->prependChain(utils::MemBuf::copyBuffer(data, size));
+ }
+
+ current_size_ += size;
+
+ if (is_last) {
+ TRANSPORT_LOGD(
+ "Request received: %s",
+ std::string((const char*)tmp_buffer_->data(), tmp_buffer_->length())
+ .c_str());
+ if (current_size_ < 1400) {
+ request_buffer_queue_.emplace_back(std::move(tmp_buffer_));
+ } else {
+ TRANSPORT_LOGE("Ignoring client request due to size (%zu) > 1400.",
+ current_size_);
+ session_->close();
+ current_size_ = 0;
+ return;
+ }
+
+ if (!consumer_.isRunning()) {
+ TRANSPORT_LOGD(
+ "Consumer stopped, triggering consume from TCP session handler..");
+ consumeNextRequest();
+ }
+
+ current_size_ = 0;
+ }
+ }
+
+ // hicn callbacks
+
+ void processLeavingInterest(interface::ConsumerSocket& c,
+ const core::Interest& interest) {
+ if (interest.payloadSize() == 0) {
+ Interest& int2 = const_cast<Interest&>(interest);
+ int2.appendPayload(request_buffer_queue_.front()->clone());
+ }
+ }
+
+ bool isBufferMovable() noexcept { return true; }
+ void getReadBuffer(uint8_t** application_buffer, size_t* max_length) {}
+ void readDataAvailable(size_t length) noexcept {}
+ size_t maxBufferSize() const { return 64 * 1024; }
+
+ void readBufferAvailable(std::unique_ptr<utils::MemBuf>&& buffer) noexcept {
+ // Response received. Send it back to client
+ auto _buffer = buffer.release();
+ TRANSPORT_LOGD("From hicn: %zu bytes.", _buffer->length());
+ session_->send(_buffer, []() {});
+ }
+
+ void readError(const std::error_code ec) noexcept {
+ TRANSPORT_LOGE("Error reading from hicn consumer socket. Closing session.");
+ session_->close();
+ }
+
+ void readSuccess(std::size_t total_size) noexcept {
+ request_buffer_queue_.pop_front();
+ consumeNextRequest();
+ }
+
+ private:
+ TcpReceiver& tcp_receiver_;
+ utils::EventThread& thread_;
+ std::string prefix_hash_;
+ ConsumerSocket consumer_;
+ std::unique_ptr<HTTPSession> session_;
+ std::deque<std::unique_ptr<utils::MemBuf>> request_buffer_queue_;
+ std::unique_ptr<utils::MemBuf> tmp_buffer_;
+ std::size_t current_size_;
+};
+
+TcpReceiver::TcpReceiver(std::uint16_t port, const std::string& prefix,
+ const std::string& ipv6_first_word)
+ : Receiver(),
+ listener_(thread_.getIoService(), port,
+ std::bind(&TcpReceiver::onNewConnection, this,
+ std::placeholders::_1)),
+ prefix_(prefix),
+ ipv6_first_word_(ipv6_first_word) {
+ for (int i = 0; i < 10; i++) {
+ http_clients_.emplace_back(new HTTPClientConnectionCallback(
+ *this, thread_, prefix, ipv6_first_word));
+ }
+}
+
+void TcpReceiver::onClientDisconnect(HTTPClientConnectionCallback* client) {
+ http_clients_.emplace_front(client);
+ used_http_clients_.erase(client);
+}
+
+void TcpReceiver::onNewConnection(asio::ip::tcp::socket&& socket) {
+ if (http_clients_.size() == 0) {
+ // Create new HTTPClientConnectionCallback
+ TRANSPORT_LOGD("Creating new HTTPClientConnectionCallback.");
+ http_clients_.emplace_back(new HTTPClientConnectionCallback(
+ *this, thread_, prefix_, ipv6_first_word_));
+ }
+
+ // Get new HTTPClientConnectionCallback
+ HTTPClientConnectionCallback* c = http_clients_.front();
+ http_clients_.pop_front();
+
+ // Set http session
+ c->setHttpSession(std::move(socket));
+
+ // Move it to used clients
+ used_http_clients_.insert(c);
+}
+
+HTTPProxy::HTTPProxy(ClientParams& params, std::size_t n_thread) {
+ for (uint16_t i = 0; i < n_thread; i++) {
+ // icn_receivers_.emplace_back(std::make_unique<IcnReceiver>(icn_params));
+ receivers_.emplace_back(std::make_unique<TcpReceiver>(
+ params.tcp_listen_port, params.prefix, params.first_ipv6_word));
+ }
+}
+
+HTTPProxy::HTTPProxy(ServerParams& params, std::size_t n_thread) {
+ for (uint16_t i = 0; i < n_thread; i++) {
+ receivers_.emplace_back(std::make_unique<IcnReceiver>(
+ params.prefix, params.first_ipv6_word, params.origin_address,
+ params.origin_port, params.cache_size, params.mtu,
+ params.content_lifetime, params.manifest));
+ }
+}
+
+} // namespace transport
diff --git a/apps/http-proxy/src/http_proxy.h b/apps/http-proxy/src/http_proxy.h
new file mode 100644
index 000000000..6fa394e28
--- /dev/null
+++ b/apps/http-proxy/src/http_proxy.h
@@ -0,0 +1,180 @@
+/*
+ * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/interfaces/socket_consumer.h>
+#include <hicn/transport/utils/event_thread.h>
+
+#include "http_session.h"
+#include "icn_receiver.h"
+
+#define ASIO_STANDALONE
+#include <asio.hpp>
+#include <asio/version.hpp>
+#include <unordered_set>
+
+class TcpListener {
+ public:
+ using AcceptCallback = std::function<void(asio::ip::tcp::socket&&)>;
+
+ TcpListener(asio::io_service& io_service, short port, AcceptCallback callback)
+ : acceptor_(io_service,
+ asio::ip::tcp::endpoint(asio::ip::tcp::v4(), port)),
+#if ((ASIO_VERSION / 100 % 1000) < 12)
+ socket_(io_service),
+#endif
+ callback_(callback) {
+ do_accept();
+ }
+
+ private:
+ void do_accept() {
+#if ((ASIO_VERSION / 100 % 1000) >= 12)
+ acceptor_.async_accept(
+ [this](std::error_code ec, asio::ip::tcp::socket socket) {
+#else
+ acceptor_.async_accept(socket_, [this](std::error_code ec) {
+ auto socket = std::move(socket_);
+#endif
+ if (!ec) {
+ callback_(std::move(socket));
+ }
+
+ do_accept();
+ });
+ }
+
+ asio::ip::tcp::acceptor acceptor_;
+#if ((ASIO_VERSION / 100 % 1000) < 12)
+ asio::ip::tcp::socket socket_;
+#endif
+ AcceptCallback callback_;
+};
+
+namespace transport {
+
+class HTTPClientConnectionCallback;
+
+class Receiver {
+ public:
+ Receiver() : thread_() {}
+
+ protected:
+ utils::EventThread thread_;
+};
+
+class TcpReceiver : public Receiver {
+ friend class HTTPClientConnectionCallback;
+
+ public:
+ TcpReceiver(std::uint16_t port, const std::string& prefix,
+ const std::string& ipv6_first_word);
+
+ private:
+ void onNewConnection(asio::ip::tcp::socket&& socket);
+ void onClientDisconnect(HTTPClientConnectionCallback* client);
+
+ TcpListener listener_;
+ std::string prefix_;
+ std::string ipv6_first_word_;
+ std::deque<HTTPClientConnectionCallback*> http_clients_;
+ std::unordered_set<HTTPClientConnectionCallback*> used_http_clients_;
+};
+
+class IcnReceiver : public Receiver {
+ public:
+ template <typename... Args>
+ IcnReceiver(Args&&... args)
+ : Receiver(),
+ icn_consum_producer_(thread_.getIoService(),
+ std::forward<Args>(args)...) {
+ icn_consum_producer_.run();
+ }
+
+ private:
+ AsyncConsumerProducer icn_consum_producer_;
+};
+
+class HTTPProxy {
+ public:
+ enum Server { CREATE };
+ enum Client { WRAP_BUFFER };
+
+ struct CommonParams {
+ std::string prefix;
+ std::string first_ipv6_word;
+
+ virtual void printParams() { std::cout << "Parameters: " << std::endl; };
+ };
+
+ struct ClientParams : virtual CommonParams {
+ short tcp_listen_port;
+ void printParams() override {
+ std::cout << "Running HTTP/TCP -> HTTP/hICN proxy." << std::endl;
+ CommonParams::printParams();
+ std::cout << "\t"
+ << "HTTP listen port: " << tcp_listen_port << std::endl;
+ std::cout << "\t"
+ << "Consumer Prefix: " << prefix << std::endl;
+ std::cout << "\t"
+ << "Prefix first word: " << first_ipv6_word << std::endl;
+ }
+ };
+
+ struct ServerParams : virtual CommonParams {
+ std::string origin_address;
+ std::string origin_port;
+ std::string cache_size;
+ std::string mtu;
+ std::string content_lifetime;
+ bool manifest;
+
+ void printParams() override {
+ std::cout << "Running HTTP/hICN -> HTTP/TCP proxy." << std::endl;
+ CommonParams::printParams();
+ std::cout << "\t"
+ << "Origin address: " << origin_address << std::endl;
+ std::cout << "\t"
+ << "Origin port: " << origin_port << std::endl;
+ std::cout << "\t"
+ << "Producer cache size: " << cache_size << std::endl;
+ std::cout << "\t"
+ << "hICN MTU: " << mtu << std::endl;
+ std::cout << "\t"
+ << "Default content lifetime: " << content_lifetime
+ << std::endl;
+ std::cout << "\t"
+ << "Producer Prefix: " << prefix << std::endl;
+ std::cout << "\t"
+ << "Prefix first word: " << first_ipv6_word << std::endl;
+ std::cout << "\t"
+ << "Use manifest: " << manifest << std::endl;
+ }
+ };
+
+ HTTPProxy(ClientParams& icn_params, std::size_t n_thread = 1);
+ HTTPProxy(ServerParams& icn_params, std::size_t n_thread = 1);
+
+ void run() { sleep(1000000); }
+
+ private:
+ void acceptTCPClient(asio::ip::tcp::socket&& socket);
+
+ private:
+ std::vector<std::unique_ptr<Receiver>> receivers_;
+};
+
+} // namespace transport \ No newline at end of file
diff --git a/apps/http-proxy/src/ATSConnector.cc b/apps/http-proxy/src/http_session.cc
index a9b889941..2c281468f 100644
--- a/apps/http-proxy/src/ATSConnector.cc
+++ b/apps/http-proxy/src/http_session.cc
@@ -13,48 +13,75 @@
* limitations under the License.
*/
-#include "ATSConnector.h"
-#include "HTTP1.xMessageFastParser.h"
+#include "http_session.h"
#include <hicn/transport/utils/branch_prediction.h>
#include <hicn/transport/utils/log.h>
+
#include <iostream>
+#include "HTTP1.xMessageFastParser.h"
+
namespace transport {
-ATSConnector::ATSConnector(asio::io_service &io_service,
- std::string &ip_address, std::string &port,
- ContentReceivedCallback receive_callback,
- OnReconnect on_reconnect_callback)
+HTTPSession::HTTPSession(asio::io_service &io_service, std::string &ip_address,
+ std::string &port,
+ ContentReceivedCallback receive_callback,
+ OnConnectionClosed on_connection_closed_callback,
+ bool reverse)
: io_service_(io_service),
socket_(io_service_),
resolver_(io_service_),
endpoint_iterator_(resolver_.resolve({ip_address, port})),
timer_(io_service),
+ reverse_(reverse),
is_reconnection_(false),
data_available_(false),
content_length_(0),
is_last_chunk_(false),
chunked_(false),
receive_callback_(receive_callback),
- on_reconnect_callback_(on_reconnect_callback) {
+ on_connection_closed_callback_(on_connection_closed_callback) {
input_buffer_.prepare(buffer_size + 2048);
state_ = ConnectorState::CONNECTING;
doConnect();
}
-ATSConnector::~ATSConnector() {}
+HTTPSession::HTTPSession(asio::ip::tcp::socket socket,
+ ContentReceivedCallback receive_callback,
+ OnConnectionClosed on_connection_closed_callback,
+ bool reverse)
+ : io_service_(socket.get_io_service()),
+ socket_(std::move(socket)),
+ resolver_(io_service_),
+ timer_(io_service_),
+ reverse_(reverse),
+ is_reconnection_(false),
+ data_available_(false),
+ content_length_(0),
+ is_last_chunk_(false),
+ chunked_(false),
+ receive_callback_(receive_callback),
+ on_connection_closed_callback_(on_connection_closed_callback) {
+ input_buffer_.prepare(buffer_size + 2048);
+ state_ = ConnectorState::CONNECTED;
+ asio::ip::tcp::no_delay noDelayOption(true);
+ socket_.set_option(noDelayOption);
+ doReadHeader();
+}
+
+HTTPSession::~HTTPSession() {}
-void ATSConnector::send(const uint8_t *packet, std::size_t len,
- ContentSentCallback &&content_sent) {
+void HTTPSession::send(const uint8_t *packet, std::size_t len,
+ ContentSentCallback &&content_sent) {
asio::async_write(
socket_, asio::buffer(packet, len),
[content_sent = std::move(content_sent)](
std::error_code ec, std::size_t /*length*/) { content_sent(); });
}
-void ATSConnector::send(utils::MemBuf *buffer,
- ContentSentCallback &&content_sent) {
+void HTTPSession::send(utils::MemBuf *buffer,
+ ContentSentCallback &&content_sent) {
io_service_.dispatch([this, buffer, callback = std::move(content_sent)]() {
bool write_in_progress = !write_msgs_.empty();
write_msgs_.emplace_back(std::unique_ptr<utils::MemBuf>(buffer),
@@ -70,24 +97,25 @@ void ATSConnector::send(utils::MemBuf *buffer,
});
}
-void ATSConnector::close() {
+void HTTPSession::close() {
if (state_ != ConnectorState::CLOSED) {
state_ = ConnectorState::CLOSED;
if (socket_.is_open()) {
- socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
+ // socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
socket_.close();
// on_disconnect_callback_();
}
}
}
-void ATSConnector::doWrite() {
+void HTTPSession::doWrite() {
auto &buffer = write_msgs_.front().first;
asio::async_write(socket_, asio::buffer(buffer->data(), buffer->length()),
[this](std::error_code ec, std::size_t length) {
if (TRANSPORT_EXPECT_FALSE(!ec)) {
- TRANSPORT_LOGD("Content successfully sent!");
+ TRANSPORT_LOGD("Content successfully sent! %zu",
+ length);
write_msgs_.front().second();
write_msgs_.pop_front();
if (!write_msgs_.empty()) {
@@ -99,12 +127,14 @@ void ATSConnector::doWrite() {
});
} // namespace transport
-void ATSConnector::handleRead(std::error_code ec, std::size_t length) {
+void HTTPSession::handleRead(std::error_code ec, std::size_t length) {
if (TRANSPORT_EXPECT_TRUE(!ec)) {
content_length_ -= length;
const uint8_t *buffer =
asio::buffer_cast<const uint8_t *>(input_buffer_.data());
- receive_callback_(buffer, input_buffer_.size(), !content_length_, false);
+ bool is_last = chunked_ ? (is_last_chunk_ ? !content_length_ : false)
+ : !content_length_;
+ receive_callback_(buffer, input_buffer_.size(), is_last, false);
input_buffer_.consume(input_buffer_.size());
if (!content_length_) {
@@ -117,7 +147,7 @@ void ATSConnector::handleRead(std::error_code ec, std::size_t length) {
auto to_read =
content_length_ >= buffer_size ? buffer_size : content_length_;
asio::async_read(socket_, input_buffer_, asio::transfer_exactly(to_read),
- std::bind(&ATSConnector::handleRead, this,
+ std::bind(&HTTPSession::handleRead, this,
std::placeholders::_1, std::placeholders::_2));
}
} else if (ec == asio::error::eof) {
@@ -126,8 +156,8 @@ void ATSConnector::handleRead(std::error_code ec, std::size_t length) {
}
}
-void ATSConnector::doReadBody(std::size_t body_size,
- std::size_t additional_bytes) {
+void HTTPSession::doReadBody(std::size_t body_size,
+ std::size_t additional_bytes) {
auto bytes_to_read =
body_size > additional_bytes ? (body_size - additional_bytes) : 0;
@@ -140,14 +170,16 @@ void ATSConnector::doReadBody(std::size_t body_size,
if (to_read > 0) {
content_length_ = bytes_to_read;
asio::async_read(socket_, input_buffer_, asio::transfer_exactly(to_read),
- std::bind(&ATSConnector::handleRead, this,
+ std::bind(&HTTPSession::handleRead, this,
std::placeholders::_1, std::placeholders::_2));
} else {
- const uint8_t *buffer =
- asio::buffer_cast<const uint8_t *>(input_buffer_.data());
- receive_callback_(buffer, body_size, chunked_ ? is_last_chunk_ : !to_read,
- false);
- input_buffer_.consume(body_size);
+ if (body_size) {
+ const uint8_t *buffer =
+ asio::buffer_cast<const uint8_t *>(input_buffer_.data());
+ receive_callback_(buffer, body_size, chunked_ ? is_last_chunk_ : !to_read,
+ false);
+ input_buffer_.consume(body_size);
+ }
if (!chunked_ || is_last_chunk_) {
doReadHeader();
@@ -157,7 +189,7 @@ void ATSConnector::doReadBody(std::size_t body_size,
}
}
-void ATSConnector::doReadChunkedHeader() {
+void HTTPSession::doReadChunkedHeader() {
asio::async_read_until(
socket_, input_buffer_, "\r\n",
[this](std::error_code ec, std::size_t length) {
@@ -176,14 +208,15 @@ void ATSConnector::doReadChunkedHeader() {
});
}
-void ATSConnector::doReadHeader() {
+void HTTPSession::doReadHeader() {
asio::async_read_until(
socket_, input_buffer_, "\r\n\r\n",
[this](std::error_code ec, std::size_t length) {
if (TRANSPORT_EXPECT_TRUE(!ec)) {
const uint8_t *buffer =
asio::buffer_cast<const uint8_t *>(input_buffer_.data());
- auto headers = HTTPMessageFastParser::getHeaders(buffer, length);
+ auto headers =
+ HTTPMessageFastParser::getHeaders(buffer, length, reverse_);
// Try to get content length, if available
auto it = headers.find(HTTPMessageFastParser::content_length);
@@ -215,23 +248,25 @@ void ATSConnector::doReadHeader() {
});
}
-void ATSConnector::tryReconnection() {
- TRANSPORT_LOGD("Connection lost. Trying to reconnect...\n");
- if (state_ == ConnectorState::CONNECTED) {
- state_ = ConnectorState::CONNECTING;
- is_reconnection_ = true;
- io_service_.post([this]() {
- if (socket_.is_open()) {
- // socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
- socket_.close();
- }
- startConnectionTimer();
- doConnect();
- });
+void HTTPSession::tryReconnection() {
+ if (on_connection_closed_callback_(socket_)) {
+ if (state_ == ConnectorState::CONNECTED) {
+ TRANSPORT_LOGD("Connection lost. Trying to reconnect...\n");
+ state_ = ConnectorState::CONNECTING;
+ is_reconnection_ = true;
+ io_service_.post([this]() {
+ if (socket_.is_open()) {
+ // socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
+ socket_.close();
+ }
+ startConnectionTimer();
+ doConnect();
+ });
+ }
}
}
-void ATSConnector::doConnect() {
+void HTTPSession::doConnect() {
asio::async_connect(socket_, endpoint_iterator_,
[this](std::error_code ec, tcp::resolver::iterator) {
if (!ec) {
@@ -263,17 +298,17 @@ void ATSConnector::doConnect() {
});
}
-bool ATSConnector::checkConnected() {
+bool HTTPSession::checkConnected() {
return state_ == ConnectorState::CONNECTED;
}
-void ATSConnector::startConnectionTimer() {
+void HTTPSession::startConnectionTimer() {
timer_.expires_from_now(std::chrono::seconds(10));
timer_.async_wait(
- std::bind(&ATSConnector::handleDeadline, this, std::placeholders::_1));
+ std::bind(&HTTPSession::handleDeadline, this, std::placeholders::_1));
}
-void ATSConnector::handleDeadline(const std::error_code &ec) {
+void HTTPSession::handleDeadline(const std::error_code &ec) {
if (!ec) {
io_service_.post([this]() {
socket_.close();
diff --git a/apps/http-proxy/src/ATSConnector.h b/apps/http-proxy/src/http_session.h
index 8d91b7b7b..20ebc5d7d 100644
--- a/apps/http-proxy/src/ATSConnector.h
+++ b/apps/http-proxy/src/http_session.h
@@ -29,13 +29,13 @@ using asio::ip::tcp;
typedef std::function<void(const uint8_t *data, std::size_t size, bool is_last,
bool headers)>
ContentReceivedCallback;
-typedef std::function<void()> OnReconnect;
+typedef std::function<bool(asio::ip::tcp::socket &socket)> OnConnectionClosed;
typedef std::function<void()> ContentSentCallback;
typedef std::deque<
std::pair<std::unique_ptr<utils::MemBuf>, ContentSentCallback>>
BufferQueue;
-class ATSConnector {
+class HTTPSession {
static constexpr uint32_t buffer_size = 1024 * 512;
enum class ConnectorState {
@@ -45,11 +45,15 @@ class ATSConnector {
};
public:
- ATSConnector(asio::io_service &io_service, std::string &ip_address,
- std::string &port, ContentReceivedCallback receive_callback,
- OnReconnect on_reconnect_callback);
+ HTTPSession(asio::io_service &io_service, std::string &ip_address,
+ std::string &port, ContentReceivedCallback receive_callback,
+ OnConnectionClosed on_reconnect_callback, bool reverse = false);
- ~ATSConnector();
+ HTTPSession(asio::ip::tcp::socket socket,
+ ContentReceivedCallback receive_callback,
+ OnConnectionClosed on_reconnect_callback, bool reverse = true);
+
+ ~HTTPSession();
void send(const uint8_t *buffer, std::size_t len,
ContentSentCallback &&content_sent = 0);
@@ -65,9 +69,6 @@ class ATSConnector {
void doReadBody(std::size_t body_size, std::size_t additional_bytes);
- // void handleReadChunked(std::error_code ec, std::size_t length,
- // std::size_t size);
-
void doReadChunkedHeader();
void doWrite();
@@ -90,6 +91,7 @@ class ATSConnector {
asio::streambuf input_buffer_;
+ bool reverse_;
bool is_reconnection_;
bool data_available_;
@@ -100,7 +102,7 @@ class ATSConnector {
bool chunked_;
ContentReceivedCallback receive_callback_;
- OnReconnect on_reconnect_callback_;
+ OnConnectionClosed on_connection_closed_callback_;
// Connector state
ConnectorState state_;
diff --git a/apps/http-proxy/src/IcnReceiver.cc b/apps/http-proxy/src/icn_receiver.cc
index 24b0eb5dc..3bd5525cc 100644
--- a/apps/http-proxy/src/IcnReceiver.cc
+++ b/apps/http-proxy/src/icn_receiver.cc
@@ -13,8 +13,7 @@
* limitations under the License.
*/
-#include "IcnReceiver.h"
-#include "HTTP1.xMessageFastParser.h"
+#include "icn_receiver.h"
#include <hicn/transport/core/interest.h>
#include <hicn/transport/http/default_values.h>
@@ -24,45 +23,22 @@
#include <functional>
#include <memory>
-namespace transport {
-
-core::Prefix generatePrefix(const std::string& prefix_url,
- std::string& first_ipv6_word) {
- const char* str = prefix_url.c_str();
- uint16_t pos = 0;
-
- if (strncmp("http://", str, 7) == 0) {
- pos = 7;
- } else if (strncmp("https://", str, 8) == 0) {
- pos = 8;
- }
-
- str += pos;
-
- uint32_t locator_hash = utils::hash::fnv32_buf(str, strlen(str));
-
- std::stringstream stream;
- stream << first_ipv6_word << ":0";
-
- for (uint16_t* word = (uint16_t*)&locator_hash;
- std::size_t(word) < (std::size_t(&locator_hash) + sizeof(locator_hash));
- word++) {
- stream << ":" << std::hex << *word;
- }
-
- stream << "::0";
+#include "HTTP1.xMessageFastParser.h"
+#include "utils.h"
- return core::Prefix(stream.str(), 64);
-}
+namespace transport {
AsyncConsumerProducer::AsyncConsumerProducer(
- const std::string& prefix, std::string& ip_address, std::string& port,
- std::string& cache_size, std::string& mtu, std::string& first_ipv6_word,
- unsigned long default_lifetime, bool manifest)
- : prefix_(generatePrefix(prefix, first_ipv6_word)),
+ asio::io_service& io_service, const std::string& prefix,
+ const std::string& first_ipv6_word, const std::string& origin_address,
+ const std::string& origin_port, const std::string& cache_size,
+ const std::string& mtu, const std::string& content_lifetime, bool manifest)
+ : prefix_(core::Prefix(generatePrefix(prefix, first_ipv6_word), 64)),
+ io_service_(io_service),
+ external_io_service_(true),
producer_socket_(),
- ip_address_(ip_address),
- port_(port),
+ ip_address_(origin_address),
+ port_(origin_port),
cache_size_(std::stoul(cache_size)),
mtu_(std::stoul(mtu)),
request_counter_(0),
@@ -71,11 +47,13 @@ AsyncConsumerProducer::AsyncConsumerProducer(
std::bind(&AsyncConsumerProducer::publishContent, this,
std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3, std::placeholders::_4),
- [this]() {
+ [this](asio::ip::tcp::socket& socket) -> bool {
std::queue<interface::PublicationOptions> empty;
std::swap(response_name_queue_, empty);
+
+ return true;
}),
- default_content_lifetime_(default_lifetime) {
+ default_content_lifetime_(std::stoul(content_lifetime)) {
int ret = producer_socket_.setSocketOption(
interface::GeneralTransportOptions::OUTPUT_BUFFER_SIZE, cache_size_);
@@ -116,7 +94,10 @@ void AsyncConsumerProducer::start() {
void AsyncConsumerProducer::run() {
start();
- io_service_.run();
+
+ if (!external_io_service_) {
+ io_service_.run();
+ }
}
void AsyncConsumerProducer::doReceive() {
@@ -141,16 +122,15 @@ void AsyncConsumerProducer::manageIncomingInterest(
auto _it = chunk_number_map_.find(name);
auto _end = chunk_number_map_.end();
- std::cout << "Received interest " << seg << std::endl;
-
if (_it != _end) {
if (_it->second.second) {
- // Content is in production
+ TRANSPORT_LOGD(
+ "Content is in production, interest will be satisfied shortly.");
return;
}
if (seg >= _it->second.first) {
- TRANSPORT_LOGI(
+ TRANSPORT_LOGD(
"Ignoring interest with name %s for a content object which does not "
"exist. (Request: %u, max: %u)",
name.toString().c_str(), (uint32_t)seg, (uint32_t)_it->second.first);
@@ -158,8 +138,6 @@ void AsyncConsumerProducer::manageIncomingInterest(
}
}
- std::cout << "Received interest " << seg << std::endl;
-
bool is_mpd =
HTTPMessageFastParser::isMpdRequest(payload->data(), payload->length());
diff --git a/apps/http-proxy/src/IcnReceiver.h b/apps/http-proxy/src/icn_receiver.h
index 9d0ab5172..31e9ae932 100644
--- a/apps/http-proxy/src/IcnReceiver.h
+++ b/apps/http-proxy/src/icn_receiver.h
@@ -13,18 +13,19 @@
* limitations under the License.
*/
-#include "ATSConnector.h"
-
#include <hicn/transport/core/prefix.h>
#include <hicn/transport/interfaces/publication_options.h>
#include <hicn/transport/interfaces/socket_producer.h>
#include <hicn/transport/utils/spinlock.h>
+#include <asio.hpp>
#include <cassert>
#include <cstring>
#include <queue>
#include <utility>
+#include "http_session.h"
+
namespace transport {
class AsyncConsumerProducer {
@@ -33,17 +34,29 @@ class AsyncConsumerProducer {
using RequestQueue = std::queue<interface::PublicationOptions>;
public:
- explicit AsyncConsumerProducer(const std::string& prefix,
- std::string& ip_address, std::string& port,
- std::string& cache_size, std::string& mtu,
- std::string& first_ipv6_word,
- unsigned long default_content_lifetime, bool manifest);
-
- void start();
+ explicit AsyncConsumerProducer(
+ asio::io_service& io_service, const std::string& prefix,
+ const std::string& first_ipv6_word, const std::string& origin_address,
+ const std::string& origin_port, const std::string& cache_size,
+ const std::string& mtu, const std::string& content_lifetime,
+ bool manifest);
+
+ explicit AsyncConsumerProducer(
+ const std::string& prefix, const std::string& first_ipv6_word,
+ const std::string& origin_address, const std::string& origin_port,
+ const std::string& cache_size, const std::string& mtu,
+ const std::string& content_lifetime, bool manifest)
+ : AsyncConsumerProducer(internal_io_service_, prefix, first_ipv6_word,
+ origin_address, origin_port, cache_size, mtu,
+ content_lifetime, manifest) {
+ external_io_service_ = false;
+ }
void run();
private:
+ void start();
+
void doSend();
void doReceive();
@@ -55,7 +68,9 @@ class AsyncConsumerProducer {
utils::MemBuf* payload);
core::Prefix prefix_;
- asio::io_service io_service_;
+ asio::io_service& io_service_;
+ asio::io_service internal_io_service_;
+ bool external_io_service_;
interface::ProducerSocket producer_socket_;
std::string ip_address_;
@@ -68,7 +83,7 @@ class AsyncConsumerProducer {
// std::unordered_map<core::Name, std::shared_ptr<ATSConnector>>
// connection_map_;
- ATSConnector connector_;
+ HTTPSession connector_;
unsigned long default_content_lifetime_;
diff --git a/apps/http-proxy/src/utils.h b/apps/http-proxy/src/utils.h
new file mode 100644
index 000000000..d87c796d0
--- /dev/null
+++ b/apps/http-proxy/src/utils.h
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/core/prefix.h>
+#include <hicn/transport/utils/hash.h>
+
+#include <sstream>
+#include <string>
+
+#pragma once
+
+TRANSPORT_ALWAYS_INLINE std::string generatePrefix(
+ const std::string& prefix_url, const std::string& first_ipv6_word) {
+ const char* str = prefix_url.c_str();
+ uint16_t pos = 0;
+
+ if (strncmp("http://", str, 7) == 0) {
+ pos = 7;
+ } else if (strncmp("https://", str, 8) == 0) {
+ pos = 8;
+ }
+
+ str += pos;
+
+ uint32_t locator_hash = utils::hash::fnv32_buf(str, strlen(str));
+
+ std::stringstream stream;
+ stream << first_ipv6_word << ":0";
+
+ for (uint16_t* word = (uint16_t*)&locator_hash;
+ std::size_t(word) < (std::size_t(&locator_hash) + sizeof(locator_hash));
+ word++) {
+ stream << ":" << std::hex << *word;
+ }
+
+ stream << "::";
+
+ return stream.str();
+} \ No newline at end of file