From a27536f3d1ce6c2f46aef61a29dd1f516644e663 Mon Sep 17 00:00:00 2001 From: Mauro Sardara Date: Thu, 14 May 2020 20:27:11 +0200 Subject: [HICN-614] Add client HTTP proxy (TCP->hICN) Change-Id: Ieaa875edff98404676083bab91233b98ce50c8d0 Signed-off-by: Mauro Sardara --- apps/http-proxy/src/IcnReceiver.cc | 223 ------------------------------------- 1 file changed, 223 deletions(-) delete mode 100644 apps/http-proxy/src/IcnReceiver.cc (limited to 'apps/http-proxy/src/IcnReceiver.cc') diff --git a/apps/http-proxy/src/IcnReceiver.cc b/apps/http-proxy/src/IcnReceiver.cc deleted file mode 100644 index 24b0eb5dc..000000000 --- a/apps/http-proxy/src/IcnReceiver.cc +++ /dev/null @@ -1,223 +0,0 @@ -/* - * Copyright (c) 2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "IcnReceiver.h" -#include "HTTP1.xMessageFastParser.h" - -#include -#include -#include -#include - -#include -#include - -namespace transport { - -core::Prefix generatePrefix(const std::string& prefix_url, - std::string& first_ipv6_word) { - const char* str = prefix_url.c_str(); - uint16_t pos = 0; - - if (strncmp("http://", str, 7) == 0) { - pos = 7; - } else if (strncmp("https://", str, 8) == 0) { - pos = 8; - } - - str += pos; - - uint32_t locator_hash = utils::hash::fnv32_buf(str, strlen(str)); - - std::stringstream stream; - stream << first_ipv6_word << ":0"; - - for (uint16_t* word = (uint16_t*)&locator_hash; - std::size_t(word) < (std::size_t(&locator_hash) + sizeof(locator_hash)); - word++) { - stream << ":" << std::hex << *word; - } - - stream << "::0"; - - return core::Prefix(stream.str(), 64); -} - -AsyncConsumerProducer::AsyncConsumerProducer( - const std::string& prefix, std::string& ip_address, std::string& port, - std::string& cache_size, std::string& mtu, std::string& first_ipv6_word, - unsigned long default_lifetime, bool manifest) - : prefix_(generatePrefix(prefix, first_ipv6_word)), - producer_socket_(), - ip_address_(ip_address), - port_(port), - cache_size_(std::stoul(cache_size)), - mtu_(std::stoul(mtu)), - request_counter_(0), - signals_(io_service_, SIGINT, SIGQUIT), - connector_(io_service_, ip_address_, port_, - std::bind(&AsyncConsumerProducer::publishContent, this, - std::placeholders::_1, std::placeholders::_2, - std::placeholders::_3, std::placeholders::_4), - [this]() { - std::queue empty; - std::swap(response_name_queue_, empty); - }), - default_content_lifetime_(default_lifetime) { - int ret = producer_socket_.setSocketOption( - interface::GeneralTransportOptions::OUTPUT_BUFFER_SIZE, cache_size_); - - if (ret != SOCKET_OPTION_SET) { - TRANSPORT_LOGD("Warning: output buffer size has not been set."); - } - - ret = producer_socket_.setSocketOption( - interface::GeneralTransportOptions::MAKE_MANIFEST, manifest); - - if (ret != SOCKET_OPTION_SET) { - TRANSPORT_LOGD("Warning: impossible to enable signatures."); - } - - ret = producer_socket_.setSocketOption( - interface::GeneralTransportOptions::DATA_PACKET_SIZE, mtu_); - - if (ret != SOCKET_OPTION_SET) { - TRANSPORT_LOGD("Warning: mtu has not been set."); - } - - producer_socket_.registerPrefix(prefix_); - - // Let the main thread to catch SIGINT and SIGQUIT - signals_.async_wait( - [this](const std::error_code& errorCode, int signal_number) { - TRANSPORT_LOGI("Number of requests processed by plugin: %lu", - (unsigned long)request_counter_); - producer_socket_.stop(); - connector_.close(); - }); -} - -void AsyncConsumerProducer::start() { - TRANSPORT_LOGD("Starting listening"); - doReceive(); -} - -void AsyncConsumerProducer::run() { - start(); - io_service_.run(); -} - -void AsyncConsumerProducer::doReceive() { - producer_socket_.setSocketOption( - interface::ProducerCallbacksOptions::CACHE_MISS, - [this](interface::ProducerSocket& producer, - interface::Interest& interest) { - // core::Name n(interest.getWritableName(), true); - io_service_.post(std::bind( - &AsyncConsumerProducer::manageIncomingInterest, this, - interest.getWritableName(), interest.acquireMemBufReference(), - interest.getPayload().release())); - }); - - producer_socket_.connect(); -} - -void AsyncConsumerProducer::manageIncomingInterest( - core::Name& name, core::Packet::MemBufPtr& packet, utils::MemBuf* payload) { - auto seg = name.getSuffix(); - name.setSuffix(0); - auto _it = chunk_number_map_.find(name); - auto _end = chunk_number_map_.end(); - - std::cout << "Received interest " << seg << std::endl; - - if (_it != _end) { - if (_it->second.second) { - // Content is in production - return; - } - - if (seg >= _it->second.first) { - TRANSPORT_LOGI( - "Ignoring interest with name %s for a content object which does not " - "exist. (Request: %u, max: %u)", - name.toString().c_str(), (uint32_t)seg, (uint32_t)_it->second.first); - return; - } - } - - std::cout << "Received interest " << seg << std::endl; - - bool is_mpd = - HTTPMessageFastParser::isMpdRequest(payload->data(), payload->length()); - - auto pair = chunk_number_map_.emplace(name, std::pair(0, 0)); - if (!pair.second) { - pair.first->second.first = 0; - } - - pair.first->second.second = true; - - response_name_queue_.emplace(std::move(name), - is_mpd ? 1000 : default_content_lifetime_); - - connector_.send(payload, [packet = std::move(packet)]() {}); -} - -void AsyncConsumerProducer::publishContent(const uint8_t* data, - std::size_t size, bool is_last, - bool headers) { - uint32_t start_suffix = 0; - - if (response_name_queue_.empty()) { - std::cerr << "Aborting due tue empty request queue" << std::endl; - abort(); - } - - interface::PublicationOptions& options = response_name_queue_.front(); - - int ret = producer_socket_.setSocketOption( - interface::GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME, - options.getLifetime()); - - if (TRANSPORT_EXPECT_FALSE(ret != SOCKET_OPTION_SET)) { - TRANSPORT_LOGD("Warning: content object lifetime has not been set."); - } - - const interface::Name& name = options.getName(); - - auto it = chunk_number_map_.find(name); - if (it == chunk_number_map_.end()) { - std::cerr << "Aborting due to response not found in ResposeInfo map." - << std::endl; - abort(); - } - - start_suffix = it->second.first; - - if (headers) { - request_counter_++; - } - - it->second.first += - producer_socket_.produce(name, data, size, is_last, start_suffix); - - if (is_last) { - it->second.second = false; - response_name_queue_.pop(); - } -} - -} // namespace transport -- cgit 1.2.3-korg