aboutsummaryrefslogtreecommitdiffstats
path: root/apps/http-proxy/src
diff options
context:
space:
mode:
Diffstat (limited to 'apps/http-proxy/src')
-rw-r--r--apps/http-proxy/src/ATSConnector.h109
-rw-r--r--apps/http-proxy/src/HTTP1.xMessageFastParser.h40
-rw-r--r--apps/http-proxy/src/IcnReceiver.h81
-rw-r--r--apps/http-proxy/src/forwarder_interface.cc263
-rw-r--r--apps/http-proxy/src/http_1x_message_fast_parser.cc (renamed from apps/http-proxy/src/HTTP1.xMessageFastParser.cc)47
-rw-r--r--apps/http-proxy/src/http_proxy.cc376
-rw-r--r--apps/http-proxy/src/http_session.cc (renamed from apps/http-proxy/src/ATSConnector.cc)166
-rw-r--r--apps/http-proxy/src/icn_receiver.cc (renamed from apps/http-proxy/src/IcnReceiver.cc)103
8 files changed, 822 insertions, 363 deletions
diff --git a/apps/http-proxy/src/ATSConnector.h b/apps/http-proxy/src/ATSConnector.h
deleted file mode 100644
index 8d91b7b7b..000000000
--- a/apps/http-proxy/src/ATSConnector.h
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Copyright (c) 2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <hicn/transport/core/packet.h>
-
-#define ASIO_STANDALONE
-#include <asio.hpp>
-#include <deque>
-#include <functional>
-
-namespace transport {
-
-using asio::ip::tcp;
-
-typedef std::function<void(const uint8_t *data, std::size_t size, bool is_last,
- bool headers)>
- ContentReceivedCallback;
-typedef std::function<void()> OnReconnect;
-typedef std::function<void()> ContentSentCallback;
-typedef std::deque<
- std::pair<std::unique_ptr<utils::MemBuf>, ContentSentCallback>>
- BufferQueue;
-
-class ATSConnector {
- static constexpr uint32_t buffer_size = 1024 * 512;
-
- enum class ConnectorState {
- CLOSED,
- CONNECTING,
- CONNECTED,
- };
-
- public:
- ATSConnector(asio::io_service &io_service, std::string &ip_address,
- std::string &port, ContentReceivedCallback receive_callback,
- OnReconnect on_reconnect_callback);
-
- ~ATSConnector();
-
- void send(const uint8_t *buffer, std::size_t len,
- ContentSentCallback &&content_sent = 0);
-
- void send(utils::MemBuf *buffer, ContentSentCallback &&content_sent);
-
- void close();
-
- private:
- void doConnect();
-
- void doReadHeader();
-
- void doReadBody(std::size_t body_size, std::size_t additional_bytes);
-
- // void handleReadChunked(std::error_code ec, std::size_t length,
- // std::size_t size);
-
- void doReadChunkedHeader();
-
- void doWrite();
-
- bool checkConnected();
-
- private:
- void handleRead(std::error_code ec, std::size_t length);
- void tryReconnection();
- void startConnectionTimer();
- void handleDeadline(const std::error_code &ec);
-
- asio::io_service &io_service_;
- asio::ip::tcp::socket socket_;
- asio::ip::tcp::resolver resolver_;
- asio::ip::tcp::resolver::iterator endpoint_iterator_;
- asio::steady_timer timer_;
-
- BufferQueue write_msgs_;
-
- asio::streambuf input_buffer_;
-
- bool is_reconnection_;
- bool data_available_;
-
- std::size_t content_length_;
-
- // Chunked encoding
- bool is_last_chunk_;
- bool chunked_;
-
- ContentReceivedCallback receive_callback_;
- OnReconnect on_reconnect_callback_;
-
- // Connector state
- ConnectorState state_;
-};
-
-} // namespace transport
diff --git a/apps/http-proxy/src/HTTP1.xMessageFastParser.h b/apps/http-proxy/src/HTTP1.xMessageFastParser.h
deleted file mode 100644
index 79dbce19d..000000000
--- a/apps/http-proxy/src/HTTP1.xMessageFastParser.h
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright (c) 2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <algorithm>
-#include <string>
-
-#include <hicn/transport/http/message.h>
-
-using transport::http::HTTPHeaders;
-
-class HTTPMessageFastParser {
- public:
- static HTTPHeaders getHeaders(const uint8_t* headers, std::size_t length);
- static std::size_t hasBody(const uint8_t* headers, std::size_t length);
- static bool isMpdRequest(const uint8_t* headers, std::size_t length);
- static uint32_t parseCacheControl(const uint8_t* headers, std::size_t length);
-
- static std::string numbers;
- static std::string content_length;
- static std::string transfer_encoding;
- static std::string chunked;
- static std::string cache_control;
- static std::string connection;
- static std::string mpd;
- static std::string separator;
-};
diff --git a/apps/http-proxy/src/IcnReceiver.h b/apps/http-proxy/src/IcnReceiver.h
deleted file mode 100644
index 9d0ab5172..000000000
--- a/apps/http-proxy/src/IcnReceiver.h
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Copyright (c) 2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "ATSConnector.h"
-
-#include <hicn/transport/core/prefix.h>
-#include <hicn/transport/interfaces/publication_options.h>
-#include <hicn/transport/interfaces/socket_producer.h>
-#include <hicn/transport/utils/spinlock.h>
-
-#include <cassert>
-#include <cstring>
-#include <queue>
-#include <utility>
-
-namespace transport {
-
-class AsyncConsumerProducer {
- using SegmentProductionPair = std::pair<uint32_t, bool>;
- using ResponseInfoMap = std::unordered_map<core::Name, SegmentProductionPair>;
- using RequestQueue = std::queue<interface::PublicationOptions>;
-
- public:
- explicit AsyncConsumerProducer(const std::string& prefix,
- std::string& ip_address, std::string& port,
- std::string& cache_size, std::string& mtu,
- std::string& first_ipv6_word,
- unsigned long default_content_lifetime, bool manifest);
-
- void start();
-
- void run();
-
- private:
- void doSend();
-
- void doReceive();
-
- void publishContent(const uint8_t* data, std::size_t size,
- bool is_last = true, bool headers = false);
-
- void manageIncomingInterest(core::Name& name, core::Packet::MemBufPtr& packet,
- utils::MemBuf* payload);
-
- core::Prefix prefix_;
- asio::io_service io_service_;
- interface::ProducerSocket producer_socket_;
-
- std::string ip_address_;
- std::string port_;
- uint32_t cache_size_;
- uint32_t mtu_;
-
- uint64_t request_counter_;
- asio::signal_set signals_;
-
- // std::unordered_map<core::Name, std::shared_ptr<ATSConnector>>
- // connection_map_;
- ATSConnector connector_;
-
- unsigned long default_content_lifetime_;
-
- // ResponseInfoMap --> max_seq_number + bool indicating whether request is in
- // production
- ResponseInfoMap chunk_number_map_;
- RequestQueue response_name_queue_;
-};
-
-} // namespace transport
diff --git a/apps/http-proxy/src/forwarder_interface.cc b/apps/http-proxy/src/forwarder_interface.cc
new file mode 100644
index 000000000..7d8235ac6
--- /dev/null
+++ b/apps/http-proxy/src/forwarder_interface.cc
@@ -0,0 +1,263 @@
+/*
+ * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <arpa/inet.h>
+#include <hicn/http-proxy/forwarder_interface.h>
+#include <hicn/transport/utils/log.h>
+
+#include <chrono>
+#include <iostream>
+#include <thread>
+#include <unordered_set>
+
+namespace transport {
+
+ForwarderInterface::~ForwarderInterface() {}
+
+int ForwarderInterface::connectToForwarder() {
+ sock_ = hc_sock_create();
+ if (!sock_) return -1;
+
+ if (hc_sock_connect(sock_) < 0) {
+ hc_sock_free(sock_);
+ sock_ = nullptr;
+ return -1;
+ }
+
+ return 0;
+}
+
+void ForwarderInterface::close() {
+ if (!closed_) {
+ internal_ioservice_.post([this]() {
+ work_.reset();
+ if (sock_) {
+ hc_sock_free(sock_);
+ sock_ = nullptr;
+ }
+ });
+
+ if (thread_->joinable()) {
+ thread_->join();
+ }
+ }
+}
+
+void ForwarderInterface::removeConnectedUserNow(uint32_t route_id) {
+ internalRemoveConnectedUser(route_id);
+}
+
+void ForwarderInterface::scheduleRemoveConnectedUser(uint32_t route_id) {
+ internal_ioservice_.post(
+ [this, route_id]() { internalRemoveConnectedUser(route_id); });
+}
+
+int32_t ForwarderInterface::getMainListenerPort() {
+ if (!sock_) return -1;
+
+ hc_data_t *data;
+ if (hc_listener_list(sock_, &data) < 0) return -1;
+
+ int ret = -1;
+ foreach_listener(l, data) {
+ std::string interface = std::string(l->interface_name);
+ if (interface.compare("lo") != 0) {
+ ret = l->local_port;
+ break;
+ }
+ }
+
+ hc_data_free(data);
+ return ret;
+}
+
+void ForwarderInterface::internalRemoveConnectedUser(uint32_t route_id) {
+ auto it = route_status_.find(route_id);
+ if (it == route_status_.end()) return;
+
+ if (!sock_) return;
+
+ // remove route
+ hc_data_t *data;
+ if (hc_route_list(sock_, &data) < 0) return;
+
+ std::vector<hc_route_t *> routes_to_remove;
+ foreach_route(r, data) {
+ char remote_addr[INET6_ADDRSTRLEN];
+ int ret = ip_address_ntop(&r->remote_addr, remote_addr, r->len, r->family);
+ if (ret < 0) continue;
+
+ std::string route_addr(remote_addr);
+ if (route_addr.compare(it->second->route_addr) == 0 &&
+ r->len == it->second->route_len) {
+ // route found
+ routes_to_remove.push_back(r);
+ }
+ }
+
+ route_status_.erase(it);
+
+ if (routes_to_remove.size() == 0) {
+ // nothing to do here
+ hc_data_free(data);
+ return;
+ }
+
+ std::unordered_set<uint32_t> connids_to_remove;
+ for (unsigned i = 0; i < routes_to_remove.size(); i++) {
+ connids_to_remove.insert(routes_to_remove[i]->face_id);
+ if (hc_route_delete(sock_, routes_to_remove[i]) < 0) {
+ TRANSPORT_LOGE("Error removing route from forwarder.");
+ }
+ }
+
+ // remove connection
+ if (hc_connection_list(sock_, &data) < 0) {
+ hc_data_free(data);
+ return;
+ }
+
+ // collects pointerst to the connections using the conn IDs
+ std::vector<hc_connection_t *> conns_to_remove;
+ foreach_connection(c, data) {
+ if (connids_to_remove.find(c->id) != connids_to_remove.end()) {
+ // conn found
+ conns_to_remove.push_back(c);
+ }
+ }
+
+ if (conns_to_remove.size() == 0) {
+ // nothing else to do here
+ hc_data_free(data);
+ return;
+ }
+
+ for (unsigned i = 0; i < conns_to_remove.size(); i++) {
+ if (hc_connection_delete(sock_, conns_to_remove[i]) < 0) {
+ TRANSPORT_LOGE("Error removing connection from forwarder.");
+ }
+ }
+
+ hc_data_free(data);
+}
+
+void ForwarderInterface::internalCreateFaceAndRoute(RouteInfoPtr route_info,
+ uint8_t max_try,
+ asio::steady_timer *timer,
+ SetRouteCallback callback) {
+ int ret = tryToCreateFaceAndRoute(route_info.get());
+
+ if (ret < 0 && max_try > 0) {
+ max_try--;
+ timer->expires_from_now(std::chrono::milliseconds(500));
+ timer->async_wait([this, _route_info = std::move(route_info), max_try,
+ timer, callback](std::error_code ec) {
+ if (ec) return;
+ internalCreateFaceAndRoute(std::move(_route_info), max_try, timer,
+ std::move(callback));
+ });
+ return;
+ }
+
+ if (max_try == 0 && ret < 0) {
+ pending_add_route_counter_--;
+ external_ioservice_.post([callback]() { callback(false, ~0); });
+ } else {
+ pending_add_route_counter_--;
+ route_status_[route_id_] = std::move(route_info);
+ external_ioservice_.post(
+ [route_id = route_id_, callback]() { callback(route_id, true); });
+ route_id_++;
+ }
+
+ delete timer;
+}
+
+int ForwarderInterface::tryToCreateFaceAndRoute(route_info_t *route_info) {
+ if (!sock_) return -1;
+
+ hc_data_t *data;
+ if (hc_listener_list(sock_, &data) < 0) {
+ return -1;
+ }
+
+ bool found = false;
+ uint32_t face_id;
+
+ foreach_listener(l, data) {
+ std::string interface = std::string(l->interface_name);
+ if (interface.compare("lo") != 0) {
+ found = true;
+
+ ip_address_t remote_ip;
+ if (ip_address_pton(route_info->remote_addr.c_str(), &remote_ip) < 0) {
+ hc_data_free(data);
+ return -1;
+ }
+
+ hc_face_t face;
+ memset(&face, 0, sizeof(hc_face_t));
+
+ face.face.type = FACE_TYPE_UDP;
+ face.face.family = route_info->family;
+ face.face.local_addr = l->local_addr;
+ face.face.remote_addr = remote_ip;
+ face.face.local_port = l->local_port;
+ face.face.remote_port = route_info->remote_port;
+
+ if (netdevice_set_name(&face.face.netdevice, l->interface_name) < 0) {
+ hc_data_free(data);
+ return -1;
+ }
+
+ if (hc_face_create(sock_, &face) < 0) {
+ hc_data_free(data);
+ return -1;
+ }
+
+ face_id = face.id;
+ break;
+ }
+ }
+
+ if (!found) {
+ hc_data_free(data);
+ return -1;
+ }
+
+ ip_address_t route_ip;
+ hc_route_t route;
+
+ if (ip_address_pton(route_info->route_addr.c_str(), &route_ip) < 0) {
+ hc_data_free(data);
+ return -1;
+ }
+
+ route.face_id = face_id;
+ route.family = AF_INET6;
+ route.remote_addr = route_ip;
+ route.len = route_info->route_len;
+ route.cost = 1;
+
+ if (hc_route_create(sock_, &route) < 0) {
+ hc_data_free(data);
+ return -1;
+ }
+
+ hc_data_free(data);
+ return 0;
+}
+
+} // namespace transport
diff --git a/apps/http-proxy/src/HTTP1.xMessageFastParser.cc b/apps/http-proxy/src/http_1x_message_fast_parser.cc
index 729eb3aeb..4b6b78d55 100644
--- a/apps/http-proxy/src/HTTP1.xMessageFastParser.cc
+++ b/apps/http-proxy/src/http_1x_message_fast_parser.cc
@@ -13,33 +13,48 @@
* limitations under the License.
*/
-#include "HTTP1.xMessageFastParser.h"
-
+#include <hicn/http-proxy/http_session.h>
+#include <hicn/transport/http/request.h>
#include <hicn/transport/http/response.h>
#include <experimental/algorithm>
#include <experimental/functional>
#include <iostream>
+constexpr char HTTPMessageFastParser::http_ok[];
+constexpr char HTTPMessageFastParser::http_cors[];
+constexpr char HTTPMessageFastParser::http_failed[];
+
std::string HTTPMessageFastParser::numbers = "0123456789";
-std::string HTTPMessageFastParser::content_length = "Content-Length";
-std::string HTTPMessageFastParser::transfer_encoding = "Transfer-Encoding";
+std::string HTTPMessageFastParser::content_length = "content-length";
+std::string HTTPMessageFastParser::transfer_encoding = "transfer-encoding";
std::string HTTPMessageFastParser::chunked = "chunked";
-std::string HTTPMessageFastParser::cache_control = "Cache-Control";
+std::string HTTPMessageFastParser::cache_control = "cache-control";
std::string HTTPMessageFastParser::mpd = "mpd";
-std::string HTTPMessageFastParser::connection = "Connection";
+std::string HTTPMessageFastParser::connection = "connection";
std::string HTTPMessageFastParser::separator = "\r\n\r\n";
-HTTPHeaders HTTPMessageFastParser::getHeaders(const uint8_t *headers,
- std::size_t length) {
- HTTPHeaders ret;
- std::string http_version;
- std::string status_code;
- std::string status_string;
-
- if (transport::http::HTTPResponse::parseHeaders(headers, length, ret, http_version,
- status_code, status_string)) {
- return ret;
+void HTTPMessageFastParser::getHeaders(const uint8_t *headers,
+ std::size_t length, bool request,
+ transport::Metadata *metadata) {
+ if (request) {
+ transport::RequestMetadata *_metadata =
+ (transport::RequestMetadata *)(metadata);
+
+ if (transport::http::HTTPRequest::parseHeaders(
+ headers, length, _metadata->headers, _metadata->http_version,
+ _metadata->method, _metadata->path)) {
+ return;
+ }
+ } else {
+ transport::ResponseMetadata *_metadata =
+ (transport::ResponseMetadata *)(metadata);
+
+ if (transport::http::HTTPResponse::parseHeaders(
+ headers, length, _metadata->headers, _metadata->http_version,
+ _metadata->status_code, _metadata->status_string)) {
+ return;
+ }
}
throw std::runtime_error("Error parsing response headers.");
diff --git a/apps/http-proxy/src/http_proxy.cc b/apps/http-proxy/src/http_proxy.cc
new file mode 100644
index 000000000..262fcb8e1
--- /dev/null
+++ b/apps/http-proxy/src/http_proxy.cc
@@ -0,0 +1,376 @@
+/*
+ * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/http-proxy/http_proxy.h>
+#include <hicn/http-proxy/http_session.h>
+#include <hicn/http-proxy/utils.h>
+#include <hicn/transport/core/interest.h>
+#include <hicn/transport/utils/log.h>
+#include <hicn/transport/utils/string_utils.h>
+
+namespace transport {
+
+using core::Interest;
+using core::Name;
+using interface::ConsumerCallbacksOptions;
+using interface::ConsumerInterestCallback;
+using interface::ConsumerSocket;
+using interface::TransportProtocolAlgorithms;
+
+class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback {
+ public:
+ HTTPClientConnectionCallback(TcpReceiver& tcp_receiver,
+ utils::EventThread& thread)
+ : tcp_receiver_(tcp_receiver),
+ thread_(thread),
+ prefix_hash_(tcp_receiver_.prefix_hash_),
+ consumer_(TransportProtocolAlgorithms::RAAQM, thread_.getIoService()),
+ session_(nullptr),
+ current_size_(0) {
+ consumer_.setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, this);
+ consumer_.setSocketOption(
+ ConsumerCallbacksOptions::INTEREST_OUTPUT,
+ (ConsumerInterestCallback)std::bind(
+ &HTTPClientConnectionCallback::processLeavingInterest, this,
+ std::placeholders::_1, std::placeholders::_2));
+ consumer_.setSocketOption(
+ ConsumerCallbacksOptions::INTEREST_RETRANSMISSION,
+ (ConsumerInterestCallback)std::bind(
+ &HTTPClientConnectionCallback::processInterestRetx, this,
+ std::placeholders::_1, std::placeholders::_2));
+ consumer_.connect();
+ }
+
+ void stop() { session_->close(); }
+
+ void setHttpSession(asio::ip::tcp::socket&& socket) {
+ session_ = std::make_unique<HTTPSession>(
+ std::move(socket),
+ std::bind(&HTTPClientConnectionCallback::readDataFromTcp, this,
+ std::placeholders::_1, std::placeholders::_2,
+ std::placeholders::_3, std::placeholders::_4,
+ std::placeholders::_5),
+ [this](asio::ip::tcp::socket& socket) -> bool {
+ try {
+ std::string remote_address =
+ socket.remote_endpoint().address().to_string();
+ std::uint16_t remote_port = socket.remote_endpoint().port();
+ TRANSPORT_LOGD("Client %s:%d disconnected.", remote_address.c_str(),
+ remote_port);
+ } catch (std::system_error& e) {
+ // Do nothing
+ }
+
+ consumer_.stop();
+ request_buffer_queue_.clear();
+ tcp_receiver_.onClientDisconnect(this);
+ return false;
+ });
+
+ current_size_ = 0;
+ }
+
+ private:
+ void consumeNextRequest() {
+ if (request_buffer_queue_.size() == 0) {
+ TRANSPORT_LOGD("No additional requests to process.");
+ return;
+ }
+
+ auto& buffer = request_buffer_queue_.front().second;
+ uint64_t request_hash =
+ utils::hash::fnv64_buf(buffer.data(), buffer.size());
+
+ std::stringstream name;
+ name << prefix_hash_.substr(0, prefix_hash_.length() - 2);
+
+ for (uint16_t* word = (uint16_t*)&request_hash;
+ std::size_t(word) <
+ (std::size_t(&request_hash) + sizeof(request_hash));
+ word++) {
+ name << ":" << std::hex << *word;
+ }
+
+ name << "|0";
+
+ // Non blocking consume :)
+ consumer_.consume(Name(name.str()));
+ }
+
+ // tcp callbacks
+
+ void readDataFromTcp(const uint8_t* data, std::size_t size, bool is_last,
+ bool headers, Metadata* metadata) {
+ if (headers) {
+ // Add the request to the request queue
+ RequestMetadata* _metadata = reinterpret_cast<RequestMetadata*>(metadata);
+ tmp_buffer_ = std::make_pair(utils::MemBuf::copyBuffer(data, size),
+ _metadata->path);
+ if (TRANSPORT_EXPECT_FALSE(
+ _metadata->path.compare("/isHicnProxyOn") == 0 && is_last)) {
+ /**
+ * It seems this request is for us.
+ * Get hicn parameters.
+ */
+ processClientRequest(_metadata);
+ return;
+ }
+ } else {
+ // Append payload chunk to last request added. Here we are assuming
+ // HTTP/1.1.
+ tmp_buffer_.first->prependChain(utils::MemBuf::copyBuffer(data, size));
+ }
+
+ current_size_ += size;
+
+ if (is_last) {
+ TRANSPORT_LOGD("Request received: %s",
+ std::string((const char*)tmp_buffer_.first->data(),
+ tmp_buffer_.first->length())
+ .c_str());
+ if (current_size_ < 1400) {
+ request_buffer_queue_.emplace_back(std::move(tmp_buffer_));
+ } else {
+ TRANSPORT_LOGE("Ignoring client request due to size (%zu) > 1400.",
+ current_size_);
+ session_->close();
+ current_size_ = 0;
+ return;
+ }
+
+ if (!consumer_.isRunning()) {
+ TRANSPORT_LOGD(
+ "Consumer stopped, triggering consume from TCP session "
+ "handler..");
+ consumeNextRequest();
+ }
+
+ current_size_ = 0;
+ }
+ }
+
+ // hicn callbacks
+
+ void processLeavingInterest(interface::ConsumerSocket& c,
+ const core::Interest& interest) {
+ if (interest.getName().getSuffix() == 0 && interest.payloadSize() == 0) {
+ Interest& int2 = const_cast<Interest&>(interest);
+ int2.appendPayload(request_buffer_queue_.front().first->clone());
+ }
+ }
+
+ void processInterestRetx(interface::ConsumerSocket& c,
+ const core::Interest& interest) {
+ if (interest.payloadSize() == 0) {
+ Interest& int2 = const_cast<Interest&>(interest);
+ int2.appendPayload(request_buffer_queue_.front().first->clone());
+ }
+ }
+
+ bool isBufferMovable() noexcept { return true; }
+ void getReadBuffer(uint8_t** application_buffer, size_t* max_length) {}
+ void readDataAvailable(size_t length) noexcept {}
+ size_t maxBufferSize() const { return 64 * 1024; }
+
+ void readBufferAvailable(std::unique_ptr<utils::MemBuf>&& buffer) noexcept {
+ // Response received. Send it back to client
+ auto _buffer = buffer.release();
+ TRANSPORT_LOGD("From hicn: %zu bytes.", _buffer->length());
+ session_->send(_buffer, []() {});
+ }
+
+ void readError(const std::error_code ec) noexcept {
+ TRANSPORT_LOGE("Error reading from hicn consumer socket. Closing session.");
+ session_->close();
+ }
+
+ void readSuccess(std::size_t total_size) noexcept {
+ request_buffer_queue_.pop_front();
+ consumeNextRequest();
+ }
+
+ void processClientRequest(RequestMetadata* metadata) {
+ auto it = metadata->headers.find("hicn");
+ if (it == metadata->headers.end()) {
+ /*
+ * Probably it is an OPTION message for access control.
+ * Let's grant it!
+ */
+ if (metadata->method == "OPTIONS") {
+ session_->send(
+ (const uint8_t*)HTTPMessageFastParser::http_cors,
+ std::strlen(HTTPMessageFastParser::http_cors), [this]() {
+ auto& socket = session_->socket_;
+ TRANSPORT_LOGI(
+ "Sent OPTIONS to client %s:%d",
+ socket.remote_endpoint().address().to_string().c_str(),
+ socket.remote_endpoint().port());
+ });
+ }
+ } else {
+ tcp_receiver_.parseHicnHeader(
+ it->second, [this](bool result, std::string configured_prefix) {
+ const char* reply = nullptr;
+ if (result) {
+ reply = HTTPMessageFastParser::http_ok;
+ } else {
+ reply = HTTPMessageFastParser::http_failed;
+ }
+
+ /* Route created. Send back a 200 OK to client */
+ session_->send(
+ (const uint8_t*)reply, std::strlen(reply), [this, result]() {
+ auto& socket = session_->socket_;
+ TRANSPORT_LOGI(
+ "Sent %d response to client %s:%d", result,
+ socket.remote_endpoint().address().to_string().c_str(),
+ socket.remote_endpoint().port());
+ });
+ });
+ }
+ }
+
+ private:
+ TcpReceiver& tcp_receiver_;
+ utils::EventThread& thread_;
+ std::string& prefix_hash_;
+ ConsumerSocket consumer_;
+ std::unique_ptr<HTTPSession> session_;
+ std::deque<std::pair<std::unique_ptr<utils::MemBuf>, std::string>>
+ request_buffer_queue_;
+ std::pair<std::unique_ptr<utils::MemBuf>, std::string> tmp_buffer_;
+ std::size_t current_size_;
+};
+
+TcpReceiver::TcpReceiver(std::uint16_t port, const std::string& prefix,
+ const std::string& ipv6_first_word)
+ : Receiver(),
+ listener_(thread_.getIoService(), port,
+ std::bind(&TcpReceiver::onNewConnection, this,
+ std::placeholders::_1)),
+ prefix_(prefix),
+ ipv6_first_word_(ipv6_first_word),
+ prefix_hash_(generatePrefix(prefix_, ipv6_first_word_)),
+ forwarder_config_(
+ thread_.getIoService(),
+ [this](std::error_code ec) {
+ if (!ec) {
+ listener_.doAccept();
+ for (int i = 0; i < 10; i++) {
+ http_clients_.emplace_back(
+ new HTTPClientConnectionCallback(*this, thread_));
+ }
+ }
+ }),
+ stopped_(false) {
+ forwarder_config_.tryToConnectToForwarder();
+}
+
+void TcpReceiver::stop() {
+ thread_.add([this]() {
+ stopped_ = true;
+
+ /* Stop the listener */
+ listener_.stop();
+
+ /* Close connection with forwarder */
+ forwarder_config_.close();
+
+ /* Stop the used http clients */
+ for (auto& client : used_http_clients_) {
+ client->stop();
+ }
+
+ /* Delete unused clients */
+ for (auto& client : http_clients_) {
+ delete client;
+ }
+ });
+}
+
+void TcpReceiver::onClientDisconnect(HTTPClientConnectionCallback* client) {
+ if (stopped_) {
+ delete client;
+ return;
+ }
+
+ http_clients_.emplace_front(client);
+ used_http_clients_.erase(client);
+}
+
+void TcpReceiver::onNewConnection(asio::ip::tcp::socket&& socket) {
+ if (http_clients_.size() == 0) {
+ // Create new HTTPClientConnectionCallback
+ TRANSPORT_LOGD("Creating new HTTPClientConnectionCallback.");
+ http_clients_.emplace_back(
+ new HTTPClientConnectionCallback(*this, thread_));
+ }
+
+ // Get new HTTPClientConnectionCallback
+ HTTPClientConnectionCallback* c = http_clients_.front();
+ http_clients_.pop_front();
+
+ // Set http session
+ c->setHttpSession(std::move(socket));
+
+ // Move it to used clients
+ used_http_clients_.insert(c);
+}
+
+void HTTPProxy::setupSignalHandler() {
+ signals_.async_wait([this](const std::error_code& ec, int signal_number) {
+ if (!ec) {
+ TRANSPORT_LOGI("Received signal %d. Stopping gracefully.", signal_number);
+ stop();
+ }
+ });
+}
+
+void HTTPProxy::stop() {
+ for (auto& receiver : receivers_) {
+ receiver->stop();
+ }
+
+ for (auto& receiver : receivers_) {
+ receiver->stopAndJoinThread();
+ }
+
+ signals_.cancel();
+}
+
+HTTPProxy::HTTPProxy(ClientParams& params, std::size_t n_thread)
+ : signals_(main_io_context_, SIGINT, SIGQUIT) {
+ for (uint16_t i = 0; i < n_thread; i++) {
+ // icn_receivers_.emplace_back(std::make_unique<IcnReceiver>(icn_params));
+ receivers_.emplace_back(std::make_unique<TcpReceiver>(
+ params.tcp_listen_port, params.prefix, params.first_ipv6_word));
+ }
+
+ setupSignalHandler();
+}
+
+HTTPProxy::HTTPProxy(ServerParams& params, std::size_t n_thread)
+ : signals_(main_io_context_, SIGINT, SIGQUIT) {
+ for (uint16_t i = 0; i < n_thread; i++) {
+ receivers_.emplace_back(std::make_unique<IcnReceiver>(
+ params.prefix, params.first_ipv6_word, params.origin_address,
+ params.origin_port, params.cache_size, params.mtu,
+ params.content_lifetime, params.manifest));
+ }
+
+ setupSignalHandler();
+}
+
+} // namespace transport
diff --git a/apps/http-proxy/src/ATSConnector.cc b/apps/http-proxy/src/http_session.cc
index a9b889941..6b91c12c3 100644
--- a/apps/http-proxy/src/ATSConnector.cc
+++ b/apps/http-proxy/src/http_session.cc
@@ -13,48 +13,95 @@
* limitations under the License.
*/
-#include "ATSConnector.h"
-#include "HTTP1.xMessageFastParser.h"
-
+#include <hicn/http-proxy/http_proxy.h>
#include <hicn/transport/utils/branch_prediction.h>
#include <hicn/transport/utils/log.h>
+
#include <iostream>
namespace transport {
-ATSConnector::ATSConnector(asio::io_service &io_service,
- std::string &ip_address, std::string &port,
- ContentReceivedCallback receive_callback,
- OnReconnect on_reconnect_callback)
+HTTPSession::HTTPSession(asio::io_service &io_service, std::string &ip_address,
+ std::string &port,
+ ContentReceivedCallback receive_callback,
+ OnConnectionClosed on_connection_closed_callback,
+ bool client)
: io_service_(io_service),
socket_(io_service_),
resolver_(io_service_),
endpoint_iterator_(resolver_.resolve({ip_address, port})),
timer_(io_service),
+ reverse_(client),
is_reconnection_(false),
data_available_(false),
content_length_(0),
is_last_chunk_(false),
chunked_(false),
receive_callback_(receive_callback),
- on_reconnect_callback_(on_reconnect_callback) {
+ on_connection_closed_callback_(on_connection_closed_callback) {
input_buffer_.prepare(buffer_size + 2048);
state_ = ConnectorState::CONNECTING;
+
+ if (reverse_) {
+ header_info_ = std::make_unique<RequestMetadata>();
+ } else {
+ header_info_ = std::make_unique<ResponseMetadata>();
+ }
+
doConnect();
}
-ATSConnector::~ATSConnector() {}
+HTTPSession::HTTPSession(asio::ip::tcp::socket socket,
+ ContentReceivedCallback receive_callback,
+ OnConnectionClosed on_connection_closed_callback,
+ bool client)
+ :
+#if ((ASIO_VERSION / 100 % 1000) < 12)
+ io_service_(socket.get_io_service()),
+#else
+ io_service_((asio::io_context &)(socket.get_executor().context())),
+#endif
+ socket_(std::move(socket)),
+ resolver_(io_service_),
+ timer_(io_service_),
+ reverse_(client),
+ is_reconnection_(false),
+ data_available_(false),
+ content_length_(0),
+ is_last_chunk_(false),
+ chunked_(false),
+ receive_callback_(receive_callback),
+ on_connection_closed_callback_(on_connection_closed_callback) {
+ input_buffer_.prepare(buffer_size + 2048);
+ state_ = ConnectorState::CONNECTED;
+ asio::ip::tcp::no_delay noDelayOption(true);
+ socket_.set_option(noDelayOption);
-void ATSConnector::send(const uint8_t *packet, std::size_t len,
- ContentSentCallback &&content_sent) {
- asio::async_write(
- socket_, asio::buffer(packet, len),
- [content_sent = std::move(content_sent)](
- std::error_code ec, std::size_t /*length*/) { content_sent(); });
+ if (reverse_) {
+ header_info_ = std::make_unique<RequestMetadata>();
+ } else {
+ header_info_ = std::make_unique<ResponseMetadata>();
+ }
+ doReadHeader();
}
-void ATSConnector::send(utils::MemBuf *buffer,
- ContentSentCallback &&content_sent) {
+HTTPSession::~HTTPSession() {}
+
+void HTTPSession::send(const uint8_t *packet, std::size_t len,
+ ContentSentCallback &&content_sent) {
+ io_service_.dispatch([this, packet, len, content_sent]() {
+ asio::async_write(socket_, asio::buffer(packet, len),
+ [content_sent = std::move(content_sent)](
+ std::error_code ec, std::size_t /*length*/) {
+ if (!ec) {
+ content_sent();
+ }
+ });
+ });
+}
+
+void HTTPSession::send(utils::MemBuf *buffer,
+ ContentSentCallback &&content_sent) {
io_service_.dispatch([this, buffer, callback = std::move(content_sent)]() {
bool write_in_progress = !write_msgs_.empty();
write_msgs_.emplace_back(std::unique_ptr<utils::MemBuf>(buffer),
@@ -70,24 +117,25 @@ void ATSConnector::send(utils::MemBuf *buffer,
});
}
-void ATSConnector::close() {
+void HTTPSession::close() {
if (state_ != ConnectorState::CLOSED) {
state_ = ConnectorState::CLOSED;
if (socket_.is_open()) {
- socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
+ // socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
socket_.close();
// on_disconnect_callback_();
}
}
}
-void ATSConnector::doWrite() {
+void HTTPSession::doWrite() {
auto &buffer = write_msgs_.front().first;
asio::async_write(socket_, asio::buffer(buffer->data(), buffer->length()),
[this](std::error_code ec, std::size_t length) {
if (TRANSPORT_EXPECT_FALSE(!ec)) {
- TRANSPORT_LOGD("Content successfully sent!");
+ TRANSPORT_LOGD("Content successfully sent! %zu",
+ length);
write_msgs_.front().second();
write_msgs_.pop_front();
if (!write_msgs_.empty()) {
@@ -99,12 +147,14 @@ void ATSConnector::doWrite() {
});
} // namespace transport
-void ATSConnector::handleRead(std::error_code ec, std::size_t length) {
+void HTTPSession::handleRead(std::error_code ec, std::size_t length) {
if (TRANSPORT_EXPECT_TRUE(!ec)) {
content_length_ -= length;
const uint8_t *buffer =
asio::buffer_cast<const uint8_t *>(input_buffer_.data());
- receive_callback_(buffer, input_buffer_.size(), !content_length_, false);
+ bool is_last = chunked_ ? (is_last_chunk_ ? !content_length_ : false)
+ : !content_length_;
+ receive_callback_(buffer, input_buffer_.size(), is_last, false, nullptr);
input_buffer_.consume(input_buffer_.size());
if (!content_length_) {
@@ -117,7 +167,7 @@ void ATSConnector::handleRead(std::error_code ec, std::size_t length) {
auto to_read =
content_length_ >= buffer_size ? buffer_size : content_length_;
asio::async_read(socket_, input_buffer_, asio::transfer_exactly(to_read),
- std::bind(&ATSConnector::handleRead, this,
+ std::bind(&HTTPSession::handleRead, this,
std::placeholders::_1, std::placeholders::_2));
}
} else if (ec == asio::error::eof) {
@@ -126,8 +176,8 @@ void ATSConnector::handleRead(std::error_code ec, std::size_t length) {
}
}
-void ATSConnector::doReadBody(std::size_t body_size,
- std::size_t additional_bytes) {
+void HTTPSession::doReadBody(std::size_t body_size,
+ std::size_t additional_bytes) {
auto bytes_to_read =
body_size > additional_bytes ? (body_size - additional_bytes) : 0;
@@ -140,14 +190,16 @@ void ATSConnector::doReadBody(std::size_t body_size,
if (to_read > 0) {
content_length_ = bytes_to_read;
asio::async_read(socket_, input_buffer_, asio::transfer_exactly(to_read),
- std::bind(&ATSConnector::handleRead, this,
+ std::bind(&HTTPSession::handleRead, this,
std::placeholders::_1, std::placeholders::_2));
} else {
- const uint8_t *buffer =
- asio::buffer_cast<const uint8_t *>(input_buffer_.data());
- receive_callback_(buffer, body_size, chunked_ ? is_last_chunk_ : !to_read,
- false);
- input_buffer_.consume(body_size);
+ if (body_size) {
+ const uint8_t *buffer =
+ asio::buffer_cast<const uint8_t *>(input_buffer_.data());
+ receive_callback_(buffer, body_size, chunked_ ? is_last_chunk_ : !to_read,
+ false, nullptr);
+ input_buffer_.consume(body_size);
+ }
if (!chunked_ || is_last_chunk_) {
doReadHeader();
@@ -157,7 +209,7 @@ void ATSConnector::doReadBody(std::size_t body_size,
}
}
-void ATSConnector::doReadChunkedHeader() {
+void HTTPSession::doReadChunkedHeader() {
asio::async_read_until(
socket_, input_buffer_, "\r\n",
[this](std::error_code ec, std::size_t length) {
@@ -176,14 +228,17 @@ void ATSConnector::doReadChunkedHeader() {
});
}
-void ATSConnector::doReadHeader() {
+void HTTPSession::doReadHeader() {
asio::async_read_until(
socket_, input_buffer_, "\r\n\r\n",
[this](std::error_code ec, std::size_t length) {
if (TRANSPORT_EXPECT_TRUE(!ec)) {
const uint8_t *buffer =
asio::buffer_cast<const uint8_t *>(input_buffer_.data());
- auto headers = HTTPMessageFastParser::getHeaders(buffer, length);
+ HTTPMessageFastParser::getHeaders(buffer, length, reverse_,
+ header_info_.get());
+
+ auto &headers = header_info_->headers;
// Try to get content length, if available
auto it = headers.find(HTTPMessageFastParser::content_length);
@@ -199,7 +254,8 @@ void ATSConnector::doReadHeader() {
}
}
- receive_callback_(buffer, length, !size && !chunked_, true);
+ receive_callback_(buffer, length, !size && !chunked_, true,
+ header_info_.get());
auto additional_bytes = input_buffer_.size() - length;
input_buffer_.consume(length);
@@ -215,23 +271,25 @@ void ATSConnector::doReadHeader() {
});
}
-void ATSConnector::tryReconnection() {
- TRANSPORT_LOGD("Connection lost. Trying to reconnect...\n");
- if (state_ == ConnectorState::CONNECTED) {
- state_ = ConnectorState::CONNECTING;
- is_reconnection_ = true;
- io_service_.post([this]() {
- if (socket_.is_open()) {
- // socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
- socket_.close();
- }
- startConnectionTimer();
- doConnect();
- });
+void HTTPSession::tryReconnection() {
+ if (on_connection_closed_callback_(socket_)) {
+ if (state_ == ConnectorState::CONNECTED) {
+ TRANSPORT_LOGD("Connection lost. Trying to reconnect...\n");
+ state_ = ConnectorState::CONNECTING;
+ is_reconnection_ = true;
+ io_service_.post([this]() {
+ if (socket_.is_open()) {
+ // socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
+ socket_.close();
+ }
+ startConnectionTimer();
+ doConnect();
+ });
+ }
}
}
-void ATSConnector::doConnect() {
+void HTTPSession::doConnect() {
asio::async_connect(socket_, endpoint_iterator_,
[this](std::error_code ec, tcp::resolver::iterator) {
if (!ec) {
@@ -263,17 +321,17 @@ void ATSConnector::doConnect() {
});
}
-bool ATSConnector::checkConnected() {
+bool HTTPSession::checkConnected() {
return state_ == ConnectorState::CONNECTED;
}
-void ATSConnector::startConnectionTimer() {
+void HTTPSession::startConnectionTimer() {
timer_.expires_from_now(std::chrono::seconds(10));
timer_.async_wait(
- std::bind(&ATSConnector::handleDeadline, this, std::placeholders::_1));
+ std::bind(&HTTPSession::handleDeadline, this, std::placeholders::_1));
}
-void ATSConnector::handleDeadline(const std::error_code &ec) {
+void HTTPSession::handleDeadline(const std::error_code &ec) {
if (!ec) {
io_service_.post([this]() {
socket_.close();
diff --git a/apps/http-proxy/src/IcnReceiver.cc b/apps/http-proxy/src/icn_receiver.cc
index 24b0eb5dc..23e5b5623 100644
--- a/apps/http-proxy/src/IcnReceiver.cc
+++ b/apps/http-proxy/src/icn_receiver.cc
@@ -13,9 +13,9 @@
* limitations under the License.
*/
-#include "IcnReceiver.h"
-#include "HTTP1.xMessageFastParser.h"
-
+#include <hicn/http-proxy/http_1x_message_fast_parser.h>
+#include <hicn/http-proxy/icn_receiver.h>
+#include <hicn/http-proxy/utils.h>
#include <hicn/transport/core/interest.h>
#include <hicn/transport/http/default_values.h>
#include <hicn/transport/utils/hash.h>
@@ -26,56 +26,31 @@
namespace transport {
-core::Prefix generatePrefix(const std::string& prefix_url,
- std::string& first_ipv6_word) {
- const char* str = prefix_url.c_str();
- uint16_t pos = 0;
-
- if (strncmp("http://", str, 7) == 0) {
- pos = 7;
- } else if (strncmp("https://", str, 8) == 0) {
- pos = 8;
- }
-
- str += pos;
-
- uint32_t locator_hash = utils::hash::fnv32_buf(str, strlen(str));
-
- std::stringstream stream;
- stream << first_ipv6_word << ":0";
-
- for (uint16_t* word = (uint16_t*)&locator_hash;
- std::size_t(word) < (std::size_t(&locator_hash) + sizeof(locator_hash));
- word++) {
- stream << ":" << std::hex << *word;
- }
-
- stream << "::0";
-
- return core::Prefix(stream.str(), 64);
-}
-
AsyncConsumerProducer::AsyncConsumerProducer(
- const std::string& prefix, std::string& ip_address, std::string& port,
- std::string& cache_size, std::string& mtu, std::string& first_ipv6_word,
- unsigned long default_lifetime, bool manifest)
- : prefix_(generatePrefix(prefix, first_ipv6_word)),
+ asio::io_service& io_service, const std::string& prefix,
+ const std::string& first_ipv6_word, const std::string& origin_address,
+ const std::string& origin_port, const std::string& cache_size,
+ const std::string& mtu, const std::string& content_lifetime, bool manifest)
+ : prefix_(core::Prefix(generatePrefix(prefix, first_ipv6_word), 64)),
+ io_service_(io_service),
+ external_io_service_(true),
producer_socket_(),
- ip_address_(ip_address),
- port_(port),
+ ip_address_(origin_address),
+ port_(origin_port),
cache_size_(std::stoul(cache_size)),
mtu_(std::stoul(mtu)),
request_counter_(0),
- signals_(io_service_, SIGINT, SIGQUIT),
connector_(io_service_, ip_address_, port_,
std::bind(&AsyncConsumerProducer::publishContent, this,
std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3, std::placeholders::_4),
- [this]() {
+ [this](asio::ip::tcp::socket& socket) -> bool {
std::queue<interface::PublicationOptions> empty;
std::swap(response_name_queue_, empty);
+
+ return true;
}),
- default_content_lifetime_(default_lifetime) {
+ default_content_lifetime_(std::stoul(content_lifetime)) {
int ret = producer_socket_.setSocketOption(
interface::GeneralTransportOptions::OUTPUT_BUFFER_SIZE, cache_size_);
@@ -98,15 +73,6 @@ AsyncConsumerProducer::AsyncConsumerProducer(
}
producer_socket_.registerPrefix(prefix_);
-
- // Let the main thread to catch SIGINT and SIGQUIT
- signals_.async_wait(
- [this](const std::error_code& errorCode, int signal_number) {
- TRANSPORT_LOGI("Number of requests processed by plugin: %lu",
- (unsigned long)request_counter_);
- producer_socket_.stop();
- connector_.close();
- });
}
void AsyncConsumerProducer::start() {
@@ -116,7 +82,19 @@ void AsyncConsumerProducer::start() {
void AsyncConsumerProducer::run() {
start();
- io_service_.run();
+
+ if (!external_io_service_) {
+ io_service_.run();
+ }
+}
+
+void AsyncConsumerProducer::stop() {
+ io_service_.post([this]() {
+ TRANSPORT_LOGI("Number of requests processed by plugin: %lu",
+ (unsigned long)request_counter_);
+ producer_socket_.stop();
+ connector_.close();
+ });
}
void AsyncConsumerProducer::doReceive() {
@@ -124,11 +102,13 @@ void AsyncConsumerProducer::doReceive() {
interface::ProducerCallbacksOptions::CACHE_MISS,
[this](interface::ProducerSocket& producer,
interface::Interest& interest) {
- // core::Name n(interest.getWritableName(), true);
- io_service_.post(std::bind(
- &AsyncConsumerProducer::manageIncomingInterest, this,
- interest.getWritableName(), interest.acquireMemBufReference(),
- interest.getPayload().release()));
+ if (interest.payloadSize() > 0) {
+ // Interest may contain http request
+ io_service_.post(std::bind(
+ &AsyncConsumerProducer::manageIncomingInterest, this,
+ interest.getWritableName(), interest.acquireMemBufReference(),
+ interest.getPayload().release()));
+ }
});
producer_socket_.connect();
@@ -141,16 +121,15 @@ void AsyncConsumerProducer::manageIncomingInterest(
auto _it = chunk_number_map_.find(name);
auto _end = chunk_number_map_.end();
- std::cout << "Received interest " << seg << std::endl;
-
if (_it != _end) {
if (_it->second.second) {
- // Content is in production
+ TRANSPORT_LOGD(
+ "Content is in production, interests will be satisfied shortly.");
return;
}
if (seg >= _it->second.first) {
- TRANSPORT_LOGI(
+ TRANSPORT_LOGD(
"Ignoring interest with name %s for a content object which does not "
"exist. (Request: %u, max: %u)",
name.toString().c_str(), (uint32_t)seg, (uint32_t)_it->second.first);
@@ -158,8 +137,6 @@ void AsyncConsumerProducer::manageIncomingInterest(
}
}
- std::cout << "Received interest " << seg << std::endl;
-
bool is_mpd =
HTTPMessageFastParser::isMpdRequest(payload->data(), payload->length());
@@ -212,7 +189,7 @@ void AsyncConsumerProducer::publishContent(const uint8_t* data,
}
it->second.first +=
- producer_socket_.produce(name, data, size, is_last, start_suffix);
+ producer_socket_.produceStream(name, data, size, is_last, start_suffix);
if (is_last) {
it->second.second = false;