diff options
Diffstat (limited to 'apps/http-proxy/src/icn_receiver.cc')
-rw-r--r-- | apps/http-proxy/src/icn_receiver.cc | 201 |
1 files changed, 201 insertions, 0 deletions
diff --git a/apps/http-proxy/src/icn_receiver.cc b/apps/http-proxy/src/icn_receiver.cc new file mode 100644 index 000000000..3bd5525cc --- /dev/null +++ b/apps/http-proxy/src/icn_receiver.cc @@ -0,0 +1,201 @@ +/* + * Copyright (c) 2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "icn_receiver.h" + +#include <hicn/transport/core/interest.h> +#include <hicn/transport/http/default_values.h> +#include <hicn/transport/utils/hash.h> +#include <hicn/transport/utils/log.h> + +#include <functional> +#include <memory> + +#include "HTTP1.xMessageFastParser.h" +#include "utils.h" + +namespace transport { + +AsyncConsumerProducer::AsyncConsumerProducer( + asio::io_service& io_service, const std::string& prefix, + const std::string& first_ipv6_word, const std::string& origin_address, + const std::string& origin_port, const std::string& cache_size, + const std::string& mtu, const std::string& content_lifetime, bool manifest) + : prefix_(core::Prefix(generatePrefix(prefix, first_ipv6_word), 64)), + io_service_(io_service), + external_io_service_(true), + producer_socket_(), + ip_address_(origin_address), + port_(origin_port), + cache_size_(std::stoul(cache_size)), + mtu_(std::stoul(mtu)), + request_counter_(0), + signals_(io_service_, SIGINT, SIGQUIT), + connector_(io_service_, ip_address_, port_, + std::bind(&AsyncConsumerProducer::publishContent, this, + std::placeholders::_1, std::placeholders::_2, + std::placeholders::_3, std::placeholders::_4), + [this](asio::ip::tcp::socket& socket) -> bool { + std::queue<interface::PublicationOptions> empty; + std::swap(response_name_queue_, empty); + + return true; + }), + default_content_lifetime_(std::stoul(content_lifetime)) { + int ret = producer_socket_.setSocketOption( + interface::GeneralTransportOptions::OUTPUT_BUFFER_SIZE, cache_size_); + + if (ret != SOCKET_OPTION_SET) { + TRANSPORT_LOGD("Warning: output buffer size has not been set."); + } + + ret = producer_socket_.setSocketOption( + interface::GeneralTransportOptions::MAKE_MANIFEST, manifest); + + if (ret != SOCKET_OPTION_SET) { + TRANSPORT_LOGD("Warning: impossible to enable signatures."); + } + + ret = producer_socket_.setSocketOption( + interface::GeneralTransportOptions::DATA_PACKET_SIZE, mtu_); + + if (ret != SOCKET_OPTION_SET) { + TRANSPORT_LOGD("Warning: mtu has not been set."); + } + + producer_socket_.registerPrefix(prefix_); + + // Let the main thread to catch SIGINT and SIGQUIT + signals_.async_wait( + [this](const std::error_code& errorCode, int signal_number) { + TRANSPORT_LOGI("Number of requests processed by plugin: %lu", + (unsigned long)request_counter_); + producer_socket_.stop(); + connector_.close(); + }); +} + +void AsyncConsumerProducer::start() { + TRANSPORT_LOGD("Starting listening"); + doReceive(); +} + +void AsyncConsumerProducer::run() { + start(); + + if (!external_io_service_) { + io_service_.run(); + } +} + +void AsyncConsumerProducer::doReceive() { + producer_socket_.setSocketOption( + interface::ProducerCallbacksOptions::CACHE_MISS, + [this](interface::ProducerSocket& producer, + interface::Interest& interest) { + // core::Name n(interest.getWritableName(), true); + io_service_.post(std::bind( + &AsyncConsumerProducer::manageIncomingInterest, this, + interest.getWritableName(), interest.acquireMemBufReference(), + interest.getPayload().release())); + }); + + producer_socket_.connect(); +} + +void AsyncConsumerProducer::manageIncomingInterest( + core::Name& name, core::Packet::MemBufPtr& packet, utils::MemBuf* payload) { + auto seg = name.getSuffix(); + name.setSuffix(0); + auto _it = chunk_number_map_.find(name); + auto _end = chunk_number_map_.end(); + + if (_it != _end) { + if (_it->second.second) { + TRANSPORT_LOGD( + "Content is in production, interest will be satisfied shortly."); + return; + } + + if (seg >= _it->second.first) { + TRANSPORT_LOGD( + "Ignoring interest with name %s for a content object which does not " + "exist. (Request: %u, max: %u)", + name.toString().c_str(), (uint32_t)seg, (uint32_t)_it->second.first); + return; + } + } + + bool is_mpd = + HTTPMessageFastParser::isMpdRequest(payload->data(), payload->length()); + + auto pair = chunk_number_map_.emplace(name, std::pair<uint32_t, bool>(0, 0)); + if (!pair.second) { + pair.first->second.first = 0; + } + + pair.first->second.second = true; + + response_name_queue_.emplace(std::move(name), + is_mpd ? 1000 : default_content_lifetime_); + + connector_.send(payload, [packet = std::move(packet)]() {}); +} + +void AsyncConsumerProducer::publishContent(const uint8_t* data, + std::size_t size, bool is_last, + bool headers) { + uint32_t start_suffix = 0; + + if (response_name_queue_.empty()) { + std::cerr << "Aborting due tue empty request queue" << std::endl; + abort(); + } + + interface::PublicationOptions& options = response_name_queue_.front(); + + int ret = producer_socket_.setSocketOption( + interface::GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME, + options.getLifetime()); + + if (TRANSPORT_EXPECT_FALSE(ret != SOCKET_OPTION_SET)) { + TRANSPORT_LOGD("Warning: content object lifetime has not been set."); + } + + const interface::Name& name = options.getName(); + + auto it = chunk_number_map_.find(name); + if (it == chunk_number_map_.end()) { + std::cerr << "Aborting due to response not found in ResposeInfo map." + << std::endl; + abort(); + } + + start_suffix = it->second.first; + + if (headers) { + request_counter_++; + } + + it->second.first += + producer_socket_.produce(name, data, size, is_last, start_suffix); + + if (is_last) { + it->second.second = false; + response_name_queue_.pop(); + } +} + +} // namespace transport |