From bac3da61644515f05663789b122554dc77549286 Mon Sep 17 00:00:00 2001 From: Luca Muscariello Date: Thu, 17 Jan 2019 13:47:57 +0100 Subject: This is the first commit of the hicn project Change-Id: I6f2544ad9b9f8891c88cc4bcce3cf19bd3cc863f Signed-off-by: Luca Muscariello --- .../src/hicn/transport/http/server_publisher.cc | 173 +++++++++++++++++++++ 1 file changed, 173 insertions(+) create mode 100755 libtransport/src/hicn/transport/http/server_publisher.cc (limited to 'libtransport/src/hicn/transport/http/server_publisher.cc') diff --git a/libtransport/src/hicn/transport/http/server_publisher.cc b/libtransport/src/hicn/transport/http/server_publisher.cc new file mode 100755 index 000000000..012f36091 --- /dev/null +++ b/libtransport/src/hicn/transport/http/server_publisher.cc @@ -0,0 +1,173 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +namespace transport { + +namespace http { + +HTTPServerPublisher::HTTPServerPublisher(const core::Name &content_name) + : content_name_(content_name, true) { + std::string identity = "acceptor_producer"; + producer_ = std::make_unique(io_service_); + // utils::Identity::generateIdentity(identity)); + core::Prefix publisher_prefix(content_name_, 128); + producer_->registerPrefix(publisher_prefix); +} + +HTTPServerPublisher::~HTTPServerPublisher() { + if (timer_) { + this->timer_->cancel(); + } +} + +HTTPServerPublisher &HTTPServerPublisher::attachPublisher() { + // Create a new publisher + producer_->setSocketOption(GeneralTransportOptions::DATA_PACKET_SIZE, + 1410_U32); + producer_->connect(); + return *this; +} + +HTTPServerPublisher &HTTPServerPublisher::setTimeout( + const std::chrono::milliseconds &timeout, bool timeout_renewal) { + std::shared_ptr portal; + producer_->getSocketOption(GeneralTransportOptions::PORTAL, portal); + timer_ = + std::make_unique(portal->getIoService(), timeout); + + wait_callback_ = [this](const std::error_code &e) { + if (!e) { + producer_->stop(); + } + }; + + if (timeout_renewal) { + interest_enter_callback_ = [this, timeout](ProducerSocket &p, + const Interest &interest) { + this->timer_->cancel(); + this->timer_->expires_from_now(timeout); + this->timer_->async_wait(wait_callback_); + }; + + producer_->setSocketOption( + ProducerCallbacksOptions::CACHE_HIT, + (ProducerInterestCallback)interest_enter_callback_); + } + + timer_->async_wait(wait_callback_); + + return *this; +} + +void HTTPServerPublisher::publishContent( + const uint8_t *buf, size_t buffer_size, + std::chrono::milliseconds content_lifetime, bool is_last) { + if (producer_) { + producer_->setSocketOption( + GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME, + static_cast(content_lifetime.count())); + producer_->produce(content_name_, buf, buffer_size, is_last); + // producer_->setSocketOption(ProducerCallbacksOptions::CACHE_MISS, + // [this](ProducerSocket &p, const + // core::Interest &interest){ + // producer_->stop(); + // }); + } +} + +template +void HTTPServerPublisher::asyncPublishContent( + const uint8_t *buf, size_t buffer_size, + std::chrono::milliseconds content_lifetime, Handler &&handler, + bool is_last) { + if (producer_) { + producer_->setSocketOption( + GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME, + static_cast(content_lifetime.count())); + producer_->asyncProduce(content_name_, buf, buffer_size, + std::forward(handler), is_last); + } +} + +void HTTPServerPublisher::serveClients() { producer_->serveForever(); } + +void HTTPServerPublisher::stop() { + std::shared_ptr portal_ptr; + producer_->getSocketOption(GeneralTransportOptions::PORTAL, portal_ptr); + portal_ptr->getIoService().stop(); +} + +ProducerSocket &HTTPServerPublisher::getProducer() { return *producer_; } + +void HTTPServerPublisher::setPublisherName(std::string &name, + std::string &mask) { + // Name represents the last 64 bits of the ipv6 address. + // It is an ipv6 address with the first 64 bits set to 0 + uint16_t i; + std::string s = content_name_.toString(); + std::shared_ptr sockaddr = content_name_.getAddress(); + in6_addr name_ipv6 = ((core::Sockaddr6 *)sockaddr.get())->sin6_addr; + + in6_addr bitmask, new_address, _name; + + if (inet_pton(AF_INET6, mask.c_str(), &bitmask) != 1) { + throw errors::RuntimeException("Error during conversion to ipv6 address."); + } + + if (inet_pton(AF_INET6, name.c_str(), &_name) != 1) { + throw errors::RuntimeException("Error during conversion to ipv6 address."); + } + + for (i = 0; i < sizeof(new_address.s6_addr); i++) { + new_address.s6_addr[i] = name_ipv6.s6_addr[i] & bitmask.s6_addr[i]; + } + + for (i = 0; i < sizeof(new_address.s6_addr); i++) { + new_address.s6_addr[i] |= _name.s6_addr[i] & ~bitmask.s6_addr[i]; + } + + // Effectively change the name + char str[INET6_ADDRSTRLEN]; + inet_ntop(AF_INET6, &new_address, str, INET6_ADDRSTRLEN); + std::string str2(str); + + core::Name new_name(str2, 0); + + // If the new name differs from the one required by the consumer part, send a + // manifest + if (!new_name.equals(content_name_, false)) { + // Publish manifest pointing to the new name + + auto manifest = + std::make_shared(content_name_.setSuffix(0)); + + content_name_ = core::Name(str2, 0); + + // manifest->setNameList(content_name_); + manifest->setLifetime(4000 * 1000); + manifest->encode(); + producer_->produce(*manifest); + + core::Prefix ns(content_name_, 128); + producer_->registerPrefix(ns); + } +} + +} // namespace http + +} // namespace transport -- cgit 1.2.3-korg