aboutsummaryrefslogtreecommitdiffstats
path: root/apps/http-proxy
diff options
context:
space:
mode:
authorMauro Sardara <msardara@cisco.com>2019-03-19 14:26:52 +0100
committerMauro Sardara <msardara@cisco.com>2019-03-23 15:05:53 +0100
commit1ad06afe9f952642a26f4d28239cf05eb3283eb7 (patch)
tree1ea58529d64a38597cd09f78653cc784c4b61d79 /apps/http-proxy
parente6d4612011483b267dc9f47c5d2b2444dd88f402 (diff)
[HICN-6] ATS Working, little refactoring of apps
Change-Id: I174815b70bf3a9fbe99ffab7dd2914be04d364b9 Signed-off-by: Mauro Sardara <msardara@cisco.com>
Diffstat (limited to 'apps/http-proxy')
-rw-r--r--apps/http-proxy/CMakeLists.txt66
-rw-r--r--apps/http-proxy/main.cc65
-rw-r--r--apps/http-proxy/src/ATSConnector.cc224
-rw-r--r--apps/http-proxy/src/ATSConnector.h99
-rw-r--r--apps/http-proxy/src/HTTP1.xMessageFastParser.cc71
-rw-r--r--apps/http-proxy/src/HTTP1.xMessageFastParser.h34
-rw-r--r--apps/http-proxy/src/IcnReceiver.cc174
-rw-r--r--apps/http-proxy/src/IcnReceiver.h69
8 files changed, 802 insertions, 0 deletions
diff --git a/apps/http-proxy/CMakeLists.txt b/apps/http-proxy/CMakeLists.txt
new file mode 100644
index 000000000..3d6b9c672
--- /dev/null
+++ b/apps/http-proxy/CMakeLists.txt
@@ -0,0 +1,66 @@
+# Copyright (c) 2019 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cmake_minimum_required(VERSION 3.5 FATAL_ERROR)
+set(CMAKE_CXX_STANDARD 14)
+
+if (NOT CMAKE_BUILD_TYPE)
+ message(STATUS "No build type selected, default to Release")
+ set(CMAKE_BUILD_TYPE "Release")
+endif()
+
+set(CMAKE_MODULE_PATH
+ ${CMAKE_MODULE_PATH}
+ "${CMAKE_SOURCE_DIR}/cmake/Modules/"
+)
+
+find_package(Threads REQUIRED)
+include_directories(
+ SYSTEM
+ ${CMAKE_BINARY_DIR}
+ ${LIB${TRANSPORT_LIBRARY}_INCLUDE_DIR}
+ http-server
+ http-client
+)
+
+set(LIB_SOURCE_FILES
+ src/ATSConnector.cc
+ src/HTTP1.xMessageFastParser.cc
+ src/IcnReceiver.cc
+)
+
+set(LIB_SERVER_HEADER_FILES
+ src/IcnReceiver.h
+ src/ATSConnector.h
+ src/HTTP1.xMessageFastParser.h
+)
+
+set(APP_SOURCE_FILES
+ main.cc
+)
+
+build_library(httpproxylib
+ STATIC
+ SOURCES ${LIB_SOURCE_FILES}
+ LINK_LIBRARIES ${LIBRARIES}
+ DEPENDS ${DEPENDENCIES}
+ INCLUDE_DIRS ${LIBTRANSPORT_INCLUDE_DIRS}
+)
+
+build_executable(hicn-http-proxy
+ SOURCES ${APP_SOURCE_FILES}
+ LINK_LIBRARIES httpproxylib
+ DEPENDS httpproxylib
+ COMPONENT hicn-http-proxy
+ DEFINITIONS ${COMPILER_DEFINITIONS}
+) \ No newline at end of file
diff --git a/apps/http-proxy/main.cc b/apps/http-proxy/main.cc
new file mode 100644
index 000000000..179da452b
--- /dev/null
+++ b/apps/http-proxy/main.cc
@@ -0,0 +1,65 @@
+/*
+ * Copyright (c) 2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/IcnReceiver.h"
+
+using namespace transport;
+
+int usage(char* program) {
+ std::cerr << "ICN Plugin not loaded!" << std::endl;
+ std::cerr << "USAGE: " << program
+ << "[HTTP_PREFIX] -a [SERVER_IP_ADDRESS] "
+ "-p [SERVER_PORT] -c [CACHE_SIZE]"
+ << std::endl;
+ return -1;
+}
+
+int main(int argc, char** argv) {
+ std::string prefix("http://hicn-http-proxy");
+ std::string ip_address("127.0.0.1");
+ std::string port("80");
+ std::string cache_size("50000");
+
+ int opt;
+ while ((opt = getopt(argc, argv, "a:p:c:")) != -1) {
+ switch (opt) {
+ case 'a':
+ prefix = optarg;
+ break;
+ case 'p':
+ port = optarg;
+ break;
+ case 'c':
+ cache_size = optarg;
+ break;
+ case 'h':
+ default:
+ usage(argv[0]);
+ break;
+ }
+ }
+
+ if (argv[optind] == 0) {
+ std::cerr << "Using default prefix " << prefix << std::endl;
+ } else {
+ prefix = argv[optind];
+ }
+
+ transport::AsyncConsumerProducer proxy(prefix, ip_address, port, cache_size);
+
+ proxy.run();
+
+ return 0;
+} \ No newline at end of file
diff --git a/apps/http-proxy/src/ATSConnector.cc b/apps/http-proxy/src/ATSConnector.cc
new file mode 100644
index 000000000..81f7a776a
--- /dev/null
+++ b/apps/http-proxy/src/ATSConnector.cc
@@ -0,0 +1,224 @@
+/*
+ * Copyright (c) 2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ATSConnector.h"
+#include "HTTP1.xMessageFastParser.h"
+
+#include <hicn/transport/utils/branch_prediction.h>
+#include <hicn/transport/utils/log.h>
+#include <iostream>
+
+namespace transport {
+
+ATSConnector::ATSConnector(asio::io_service &io_service,
+ std::string &ip_address, std::string &port,
+ ContentReceivedCallback receive_callback,
+ OnReconnect on_reconnect_callback)
+ : io_service_(io_service),
+ socket_(io_service_),
+ resolver_(io_service_),
+ endpoint_iterator_(resolver_.resolve({ip_address, port})),
+ timer_(io_service),
+ is_reconnection_(false),
+ data_available_(false),
+ receive_callback_(receive_callback),
+ on_reconnect_callback_(on_reconnect_callback) {
+ header_input_buffer_.prepare(2048);
+ state_ = ConnectorState::CONNECTING;
+ doConnect();
+}
+
+ATSConnector::~ATSConnector() {}
+
+void ATSConnector::send(const uint8_t *packet, std::size_t len,
+ ContentSentCallback &&content_sent) {
+ asio::async_write(
+ socket_, asio::buffer(packet, len),
+ [content_sent = std::move(content_sent)](
+ std::error_code ec, std::size_t /*length*/) { content_sent(); });
+}
+
+void ATSConnector::send(utils::MemBuf *buffer,
+ ContentSentCallback &&content_sent) {
+ io_service_.dispatch([this, buffer, callback = std::move(content_sent)]() {
+ bool write_in_progress = !write_msgs_.empty();
+ write_msgs_.emplace_back(std::unique_ptr<utils::MemBuf>(buffer),
+ std::move(callback));
+ if (TRANSPORT_EXPECT_TRUE(state_ == ConnectorState::CONNECTED)) {
+ if (!write_in_progress) {
+ doWrite();
+ }
+ } else {
+ TRANSPORT_LOGD(" Tell the handle connect it has data to write");
+ data_available_ = true;
+ }
+ });
+}
+
+void ATSConnector::close() {
+ if (state_ != ConnectorState::CLOSED) {
+ state_ = ConnectorState::CLOSED;
+ if (socket_.is_open()) {
+ socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
+ socket_.close();
+ // on_disconnect_callback_();
+ }
+ }
+}
+
+void ATSConnector::doWrite() {
+ auto &buffer = write_msgs_.front().first;
+
+ asio::async_write(socket_, asio::buffer(buffer->data(), buffer->length()),
+ [this](std::error_code ec, std::size_t length) {
+ if (TRANSPORT_EXPECT_FALSE(!ec)) {
+ TRANSPORT_LOGD("Content successfully sent!");
+ write_msgs_.front().second();
+ write_msgs_.pop_front();
+ if (!write_msgs_.empty()) {
+ doWrite();
+ }
+ } else {
+ TRANSPORT_LOGD("Content NOT sent!");
+ }
+ });
+} // namespace transport
+
+void ATSConnector::handleRead(std::error_code ec, std::size_t length,
+ std::size_t size) {
+ if (TRANSPORT_EXPECT_TRUE(!ec)) {
+ std::size_t bytes_in_buffer = length;
+ size -= bytes_in_buffer;
+ receive_callback_(input_buffer_, bytes_in_buffer, !size, false);
+
+ if (!size) {
+ doReadHeader();
+ } else {
+ auto to_read = size >= buffer_size ? buffer_size : size;
+ asio::async_read(
+ socket_, asio::buffer(input_buffer_, to_read),
+ std::bind(&ATSConnector::handleRead, this, std::placeholders::_1,
+ std::placeholders::_2, size));
+ }
+ } else if (ec == asio::error::eof) {
+ tryReconnection();
+ }
+}
+
+void ATSConnector::doReadBody(std::size_t size) {
+ auto to_read = size >= buffer_size ? buffer_size : size;
+ asio::async_read(
+ socket_, asio::buffer(input_buffer_, to_read),
+ std::bind(&ATSConnector::handleRead, this, std::placeholders::_1,
+ std::placeholders::_2, size));
+}
+
+void ATSConnector::doReadHeader() {
+ asio::async_read_until(
+ socket_, header_input_buffer_, "\r\n\r\n",
+ [this](std::error_code ec, std::size_t length) {
+ if (TRANSPORT_EXPECT_TRUE(!ec)) {
+ // TRANSPORT_LOGD("Headers received");
+
+ const uint8_t *buffer =
+ asio::buffer_cast<const uint8_t *>(header_input_buffer_.data());
+ std::size_t size = HTTPMessageFastParser::hasBody(buffer, length);
+
+ auto additional_bytes = header_input_buffer_.size() - length;
+ auto bytes_to_read = size - additional_bytes;
+ receive_callback_(buffer, header_input_buffer_.size(), !bytes_to_read,
+ true);
+ header_input_buffer_.consume(header_input_buffer_.size());
+
+ if (bytes_to_read) {
+ doReadBody(bytes_to_read);
+ } else {
+ doReadHeader();
+ }
+ } else {
+ header_input_buffer_.consume(header_input_buffer_.size());
+ tryReconnection();
+ }
+ });
+}
+
+void ATSConnector::tryReconnection() {
+ TRANSPORT_LOGD("Connection lost. Trying to reconnect...\n");
+ if (state_ == ConnectorState::CONNECTED) {
+ state_ = ConnectorState::CONNECTING;
+ is_reconnection_ = true;
+ io_service_.post([this]() {
+ if (socket_.is_open()) {
+ socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
+ socket_.close();
+ }
+ startConnectionTimer();
+ doConnect();
+ });
+ }
+}
+
+void ATSConnector::doConnect() {
+ asio::async_connect(socket_, endpoint_iterator_,
+ [this](std::error_code ec, tcp::resolver::iterator) {
+ if (!ec) {
+ timer_.cancel();
+ state_ = ConnectorState::CONNECTED;
+
+ asio::ip::tcp::no_delay noDelayOption(true);
+ socket_.set_option(noDelayOption);
+
+ // on_reconnect_callback_();
+
+ doReadHeader();
+
+ if (data_available_ && !write_msgs_.empty()) {
+ data_available_ = false;
+ doWrite();
+ }
+
+ if (is_reconnection_) {
+ is_reconnection_ = false;
+ TRANSPORT_LOGD("Connection recovered!");
+ }
+
+ } else {
+ TRANSPORT_LOGE("Impossible to reconnect.");
+ close();
+ }
+ });
+}
+
+bool ATSConnector::checkConnected() {
+ return state_ == ConnectorState::CONNECTED;
+}
+
+void ATSConnector::startConnectionTimer() {
+ timer_.expires_from_now(std::chrono::seconds(10));
+ timer_.async_wait(
+ std::bind(&ATSConnector::handleDeadline, this, std::placeholders::_1));
+}
+
+void ATSConnector::handleDeadline(const std::error_code &ec) {
+ if (!ec) {
+ io_service_.post([this]() {
+ socket_.close();
+ TRANSPORT_LOGE("Error connecting. Is the server running?\n");
+ io_service_.stop();
+ });
+ }
+}
+
+} // namespace transport
diff --git a/apps/http-proxy/src/ATSConnector.h b/apps/http-proxy/src/ATSConnector.h
new file mode 100644
index 000000000..be5c2c8d5
--- /dev/null
+++ b/apps/http-proxy/src/ATSConnector.h
@@ -0,0 +1,99 @@
+/*
+ * Copyright (c) 2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/core/packet.h>
+
+#define ASIO_STANDALONE
+#include <asio.hpp>
+#include <deque>
+#include <functional>
+
+namespace transport {
+
+using asio::ip::tcp;
+
+typedef std::function<void(const uint8_t *data, std::size_t size, bool is_last,
+ bool headers)>
+ ContentReceivedCallback;
+typedef std::function<void()> OnReconnect;
+typedef std::function<void()> ContentSentCallback;
+typedef std::deque<
+ std::pair<std::unique_ptr<utils::MemBuf>, ContentSentCallback>>
+ BufferQueue;
+
+class ATSConnector {
+ static constexpr uint32_t buffer_size = 1024 * 64;
+
+ enum class ConnectorState {
+ CLOSED,
+ CONNECTING,
+ CONNECTED,
+ };
+
+ public:
+ ATSConnector(asio::io_service &io_service, std::string &ip_address,
+ std::string &port, ContentReceivedCallback receive_callback,
+ OnReconnect on_reconnect_callback);
+
+ ~ATSConnector();
+
+ void send(const uint8_t *buffer, std::size_t len,
+ ContentSentCallback &&content_sent = 0);
+
+ void send(utils::MemBuf *buffer, ContentSentCallback &&content_sent);
+
+ void close();
+
+ private:
+ void doConnect();
+
+ void doReadHeader();
+
+ void doReadBody(std::size_t size);
+
+ void doWrite();
+
+ bool checkConnected();
+
+ private:
+ void handleRead(std::error_code ec, std::size_t length, std::size_t bytes);
+ void tryReconnection();
+ void startConnectionTimer();
+ void handleDeadline(const std::error_code &ec);
+
+ asio::io_service &io_service_;
+ asio::ip::tcp::socket socket_;
+ asio::ip::tcp::resolver resolver_;
+ asio::ip::tcp::resolver::iterator endpoint_iterator_;
+ asio::steady_timer timer_;
+
+ BufferQueue write_msgs_;
+
+ asio::streambuf header_input_buffer_;
+ uint8_t input_buffer_[buffer_size];
+
+ bool is_reconnection_;
+ bool data_available_;
+
+ ContentReceivedCallback receive_callback_;
+ OnReconnect on_reconnect_callback_;
+
+ // Connector state
+ ConnectorState state_;
+};
+
+} // namespace transport
diff --git a/apps/http-proxy/src/HTTP1.xMessageFastParser.cc b/apps/http-proxy/src/HTTP1.xMessageFastParser.cc
new file mode 100644
index 000000000..a03871649
--- /dev/null
+++ b/apps/http-proxy/src/HTTP1.xMessageFastParser.cc
@@ -0,0 +1,71 @@
+/*
+ * Copyright (c) 2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "HTTP1.xMessageFastParser.h"
+
+#include <experimental/algorithm>
+#include <experimental/functional>
+#include <iostream>
+
+std::string HTTPMessageFastParser::numbers = "0123456789";
+std::string HTTPMessageFastParser::content_length = "Content-Length";
+std::string HTTPMessageFastParser::cache_control = "Cache-Control";
+std::string HTTPMessageFastParser::mpd = "mpd";
+std::string HTTPMessageFastParser::connection = "Connection";
+std::string HTTPMessageFastParser::separator = "\r\n\r\n";
+
+std::size_t HTTPMessageFastParser::hasBody(const uint8_t *headers,
+ std::size_t length) {
+ const char *buffer = reinterpret_cast<const char *>(headers);
+ const char *begin = buffer;
+ const char *end = buffer + length;
+
+ using std::experimental::make_boyer_moore_searcher;
+ auto it = std::experimental::search(
+ begin, end,
+ make_boyer_moore_searcher(content_length.begin(), content_length.end()));
+
+ if (it != end) {
+ // Read header line
+ auto it2 = std::find_first_of(it, end, numbers.begin(), numbers.end());
+ auto it3 = std::find(it2, end, '\n');
+
+ return std::stoul(std::string(it2, it3));
+ }
+
+ return 0;
+}
+
+bool HTTPMessageFastParser::isMpdRequest(const uint8_t *headers,
+ std::size_t length) {
+ const char *buffer = reinterpret_cast<const char *>(headers);
+ const char *begin = buffer;
+ const char *end = buffer + length;
+
+ using std::experimental::make_boyer_moore_searcher;
+ auto it = std::experimental::search(
+ begin, end, make_boyer_moore_searcher(mpd.begin(), mpd.end()));
+
+ if (it != end) {
+ return true;
+ }
+
+ return false;
+}
+
+uint32_t HTTPMessageFastParser::parseCacheControl(const uint8_t *headers,
+ std::size_t length) {
+ return 0;
+}
diff --git a/apps/http-proxy/src/HTTP1.xMessageFastParser.h b/apps/http-proxy/src/HTTP1.xMessageFastParser.h
new file mode 100644
index 000000000..10a70c3e9
--- /dev/null
+++ b/apps/http-proxy/src/HTTP1.xMessageFastParser.h
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <algorithm>
+#include <string>
+
+class HTTPMessageFastParser {
+ public:
+ static std::size_t hasBody(const uint8_t* headers, std::size_t length);
+ static bool isMpdRequest(const uint8_t* headers, std::size_t length);
+ static uint32_t parseCacheControl(const uint8_t* headers, std::size_t length);
+
+ private:
+ static std::string numbers;
+ static std::string content_length;
+ static std::string cache_control;
+ static std::string connection;
+ static std::string mpd;
+ static std::string separator;
+};
diff --git a/apps/http-proxy/src/IcnReceiver.cc b/apps/http-proxy/src/IcnReceiver.cc
new file mode 100644
index 000000000..ee8ef0823
--- /dev/null
+++ b/apps/http-proxy/src/IcnReceiver.cc
@@ -0,0 +1,174 @@
+/*
+ * Copyright (c) 2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "IcnReceiver.h"
+#include "HTTP1.xMessageFastParser.h"
+
+#include <hicn/transport/http/default_values.h>
+#include <hicn/transport/utils/hash.h>
+
+#include <functional>
+#include <memory>
+
+namespace transport {
+
+core::Prefix generatePrefix(const std::string& prefix_url) {
+ const char* str = prefix_url.c_str();
+ uint16_t pos = 0;
+
+ if (strncmp("http://", str, 7) == 0) {
+ pos = 7;
+ } else if (strncmp("https://", str, 8) == 0) {
+ pos = 8;
+ }
+
+ str += pos;
+
+ uint32_t locator_hash = utils::hash::fnv32_buf(str, strlen(str));
+
+ std::stringstream stream;
+ stream << std::hex << http::default_values::ipv6_first_word << ":0";
+
+ for (uint16_t* word = (uint16_t*)&locator_hash;
+ std::size_t(word) < (std::size_t(&locator_hash) + sizeof(locator_hash));
+ word++) {
+ stream << ":" << std::hex << *word;
+ }
+
+ stream << "::0";
+
+ return core::Prefix(stream.str(), 64);
+}
+
+AsyncConsumerProducer::AsyncConsumerProducer(const std::string& prefix,
+ std::string& ip_address,
+ std::string& port,
+ std::string& cache_size)
+ : prefix_(generatePrefix(prefix)),
+ producer_socket_(),
+ ip_address_(ip_address),
+ port_(port),
+ cache_size_(std::stoul(cache_size)),
+ request_counter_(0),
+ signals_(io_service_, SIGINT, SIGQUIT),
+ connector_(io_service_, ip_address_, port_,
+ std::bind(&AsyncConsumerProducer::publishContent, this,
+ std::placeholders::_1, std::placeholders::_2,
+ std::placeholders::_3, std::placeholders::_4),
+ [this]() {
+ std::queue<interface::PublicationOptions> empty;
+ std::swap(response_name_queue_, empty);
+ }) {
+ int ret = producer_socket_.setSocketOption(
+ interface::GeneralTransportOptions::OUTPUT_BUFFER_SIZE, cache_size_);
+
+ if (ret != SOCKET_OPTION_SET) {
+ TRANSPORT_LOGD("Warning: output buffer size has not been set.");
+ }
+
+ producer_socket_.registerPrefix(prefix_);
+
+ // Let the main thread to catch SIGINT and SIGQUIT
+ signals_.async_wait(
+ [this](const std::error_code& errorCode, int signal_number) {
+ TRANSPORT_LOGI("Number of requests processed by plugin: %lu",
+ (unsigned long)request_counter_);
+ producer_socket_.stop();
+ connector_.close();
+ });
+}
+
+void AsyncConsumerProducer::start() {
+ TRANSPORT_LOGD("Starting listening");
+ doReceive();
+}
+
+void AsyncConsumerProducer::run() {
+ start();
+ io_service_.run();
+}
+
+void AsyncConsumerProducer::doReceive() {
+ producer_socket_.setSocketOption(
+ interface::ProducerCallbacksOptions::CACHE_MISS,
+ [this](interface::ProducerSocket& producer,
+ interface::Interest& interest) {
+ // core::Name n(interest.getWritableName(), true);
+ io_service_.post(std::bind(
+ &AsyncConsumerProducer::manageIncomingInterest, this,
+ interest.getWritableName(), interest.acquireMemBufReference(),
+ interest.getPayload().release()));
+ });
+
+ producer_socket_.connect();
+}
+
+void AsyncConsumerProducer::manageIncomingInterest(
+ core::Name& name, core::Packet::MemBufPtr& packet, utils::MemBuf* payload) {
+ // auto seg = name.getSuffix();
+ name.setSuffix(0);
+ auto _it = chunk_number_map_.find(name);
+ auto _end = chunk_number_map_.end();
+
+ if (_it != _end) {
+ return;
+ }
+
+ bool is_mpd =
+ HTTPMessageFastParser::isMpdRequest(payload->data(), payload->length());
+
+ chunk_number_map_.emplace(name, 0);
+ response_name_queue_.emplace(std::move(name), is_mpd ? 500 : 10000);
+
+ connector_.send(payload, [packet = std::move(packet)]() {});
+}
+
+void AsyncConsumerProducer::publishContent(const uint8_t* data,
+ std::size_t size, bool is_last,
+ bool headers) {
+ uint32_t start_suffix = 0;
+
+ if (response_name_queue_.empty()) {
+ abort();
+ }
+
+ interface::PublicationOptions& options = response_name_queue_.front();
+
+ int ret = producer_socket_.setSocketOption(
+ interface::GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME,
+ options.getLifetime());
+
+ if (TRANSPORT_EXPECT_FALSE(ret != SOCKET_OPTION_SET)) {
+ TRANSPORT_LOGD("Warning: content object lifetime has not been set.");
+ }
+
+ const interface::Name& name = options.getName();
+
+ start_suffix = chunk_number_map_[name];
+
+ if (headers) {
+ request_counter_++;
+ }
+
+ chunk_number_map_[name] +=
+ producer_socket_.produce(name, data, size, is_last, start_suffix);
+
+ if (is_last) {
+ chunk_number_map_.erase(name);
+ response_name_queue_.pop();
+ }
+}
+
+} // namespace transport
diff --git a/apps/http-proxy/src/IcnReceiver.h b/apps/http-proxy/src/IcnReceiver.h
new file mode 100644
index 000000000..7d5c5e4c8
--- /dev/null
+++ b/apps/http-proxy/src/IcnReceiver.h
@@ -0,0 +1,69 @@
+/*
+ * Copyright (c) 2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ATSConnector.h"
+
+#include <hicn/transport/core/prefix.h>
+#include <hicn/transport/interfaces/publication_options.h>
+#include <hicn/transport/interfaces/socket_producer.h>
+#include <hicn/transport/utils/spinlock.h>
+
+#include <cassert>
+#include <cstring>
+#include <queue>
+#include <utility>
+
+namespace transport {
+
+class AsyncConsumerProducer {
+ public:
+ explicit AsyncConsumerProducer(const std::string& prefix,
+ std::string& ip_address, std::string& port,
+ std::string& cache_size);
+
+ void start();
+
+ void run();
+
+ private:
+ void doSend();
+
+ void doReceive();
+
+ void publishContent(const uint8_t* data, std::size_t size,
+ bool is_last = true, bool headers = false);
+
+ void manageIncomingInterest(core::Name& name, core::Packet::MemBufPtr& packet,
+ utils::MemBuf* payload);
+
+ core::Prefix prefix_;
+ asio::io_service io_service_;
+ interface::ProducerSocket producer_socket_;
+
+ std::string ip_address_;
+ std::string port_;
+ uint32_t cache_size_;
+
+ uint64_t request_counter_;
+ asio::signal_set signals_;
+
+ // std::unordered_map<core::Name, std::shared_ptr<ATSConnector>>
+ // connection_map_;
+ ATSConnector connector_;
+ std::unordered_map<core::Name, uint32_t> chunk_number_map_;
+ std::queue<interface::PublicationOptions> response_name_queue_;
+};
+
+} // namespace transport