diff options
Diffstat (limited to 'libtransport/src/protocols/prod_protocol_rtc.cc')
-rw-r--r-- | libtransport/src/protocols/prod_protocol_rtc.cc | 834 |
1 files changed, 497 insertions, 337 deletions
diff --git a/libtransport/src/protocols/prod_protocol_rtc.cc b/libtransport/src/protocols/prod_protocol_rtc.cc index cdc882d81..83cd23ac6 100644 --- a/libtransport/src/protocols/prod_protocol_rtc.cc +++ b/libtransport/src/protocols/prod_protocol_rtc.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -13,15 +13,21 @@ * limitations under the License. */ +#include <glog/logging.h> #include <hicn/transport/core/global_object_pool.h> #include <implementation/socket_producer.h> #include <protocols/prod_protocol_rtc.h> +#include <protocols/rtc/probe_handler.h> #include <protocols/rtc/rtc_consts.h> #include <stdlib.h> #include <time.h> #include <unordered_set> +extern "C" { +#include <hicn/util/bitmap.h> +} + namespace transport { namespace protocol { @@ -31,117 +37,144 @@ RTCProductionProtocol::RTCProductionProtocol( implementation::ProducerSocket *icn_socket) : ProductionProtocol(icn_socket), current_seg_(1), + prev_produced_bytes_(0), + prev_produced_packets_(0), produced_bytes_(0), produced_packets_(0), - produced_fec_packets_(0), - max_packet_production_(1), - bytes_production_rate_(0), + max_packet_production_(UINT32_MAX), + bytes_production_rate_(UINT32_MAX), packets_production_rate_(0), - fec_packets_production_rate_(0), - last_round_(std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count()), + last_produced_data_ts_(0), + last_round_(utils::SteadyTime::nowMs().count()), allow_delayed_nacks_(false), - queue_timer_on_(false), - consumer_in_sync_(false), - on_consumer_in_sync_(nullptr) { - srand((unsigned int)time(NULL)); - prod_label_ = rand() % 256; + pending_fec_pace_(false), + max_len_(0), + queue_len_(0), + data_aggregation_(true), + data_aggregation_timer_switch_(false) { + std::uniform_int_distribution<> dis(0, 255); + prod_label_ = dis(gen_); cache_label_ = (prod_label_ + 1) % 256; - interests_queue_timer_ = - std::make_unique<asio::steady_timer>(portal_->getIoService()); - round_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService()); + round_timer_ = + std::make_unique<asio::steady_timer>(portal_->getThread().getIoService()); + fec_pacing_timer_ = + std::make_unique<asio::steady_timer>(portal_->getThread().getIoService()); + app_packets_timer_ = + std::make_unique<asio::steady_timer>(portal_->getThread().getIoService()); setOutputBufferSize(10000); - scheduleRoundTimer(); +} + +RTCProductionProtocol::~RTCProductionProtocol() {} + +void RTCProductionProtocol::setProducerParam() { + // Flow name: here we assume there is only one prefix registered in the portal + flow_name_ = portal_->getServedNamespaces().begin()->makeName(); + + // Default format + core::Packet::Format default_format; + socket_->getSocketOption(interface::GeneralTransportOptions::PACKET_FORMAT, + default_format); // FEC using namespace std::placeholders; enableFEC(std::bind(&RTCProductionProtocol::onFecPackets, this, _1), std::bind(&RTCProductionProtocol::getBuffer, this, _1)); -} -RTCProductionProtocol::~RTCProductionProtocol() {} - -void RTCProductionProtocol::registerNamespaceWithNetwork( - const Prefix &producer_namespace) { - ProductionProtocol::registerNamespaceWithNetwork(producer_namespace); - - flow_name_ = producer_namespace.getName(); - auto family = flow_name_.getAddressFamily(); - - switch (family) { - case AF_INET6: - data_header_size_ = - signer_ && !making_manifest_ - ? (uint32_t)Packet::getHeaderSizeFromFormat( - HF_INET6_TCP_AH, signer_->getSignatureFieldSize()) - : (uint32_t)Packet::getHeaderSizeFromFormat(HF_INET6_TCP); - ; - break; - case AF_INET: - data_header_size_ = - signer_ && !making_manifest_ - ? (uint32_t)Packet::getHeaderSizeFromFormat( - HF_INET_TCP_AH, signer_->getSignatureFieldSize()) - : (uint32_t)Packet::getHeaderSizeFromFormat(HF_INET_TCP); - break; - default: - throw errors::RuntimeException("Unknown name format."); - } + // Aggregated data + socket_->getSocketOption(interface::RtcTransportOptions::AGGREGATED_DATA, + data_aggregation_); + + size_t signature_size = signer_->getSignatureFieldSize(); + data_header_format_ = {!manifest_max_capacity_ + ? Packet::toAHFormat(default_format) + : default_format, + !manifest_max_capacity_ ? signature_size : 0}; + manifest_header_format_ = {Packet::toAHFormat(default_format), + signature_size}; + nack_header_format_ = {Packet::toAHFormat(default_format), signature_size}; + fec_header_format_ = {Packet::toAHFormat(default_format), signature_size}; + + // Initialize verifier for aggregated interests + std::shared_ptr<auth::Verifier> verifier; + socket_->getSocketOption(implementation::GeneralTransportOptions::VERIFIER, + verifier); + verifier_ = std::make_shared<rtc::RTCVerifier>(verifier, 0, 0); + + // Schedule round timer + scheduleRoundTimer(); } void RTCProductionProtocol::scheduleRoundTimer() { round_timer_->expires_from_now( std::chrono::milliseconds(rtc::PRODUCER_STATS_INTERVAL)); - round_timer_->async_wait([this](std::error_code ec) { + std::weak_ptr<RTCProductionProtocol> self = shared_from_this(); + round_timer_->async_wait([self](const std::error_code &ec) { if (ec) return; - updateStats(); + + auto sp = self.lock(); + if (sp && sp->isRunning()) { + sp->updateStats(true); + } }); } -void RTCProductionProtocol::updateStats() { - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); +void RTCProductionProtocol::updateStats(bool new_round) { + uint64_t now = utils::SteadyTime::nowMs().count(); uint64_t duration = now - last_round_; - if (duration == 0) duration = 1; + if (!new_round) { + duration += rtc::PRODUCER_STATS_INTERVAL; + } else { + prev_produced_bytes_ = 0; + prev_produced_packets_ = 0; + } + double per_second = rtc::MILLI_IN_A_SEC / duration; uint32_t prev_packets_production_rate = packets_production_rate_; - bytes_production_rate_ = ceil((double)produced_bytes_ * per_second); - packets_production_rate_ = ceil((double)produced_packets_ * per_second); - fec_packets_production_rate_ = - ceil((double)produced_fec_packets_ * per_second); - - DLOG_IF(INFO, VLOG_IS_ON(3)) - << "Updating production rate: produced_bytes_ = " << produced_bytes_ - << " bps = " << bytes_production_rate_; + // bytes_production_rate_ does not take into account FEC!!! this is because + // each client requests a differen amount of FEC packet so the client itself + // increase the production rate in the right way + bytes_production_rate_ = + ceil((double)(produced_bytes_ + prev_produced_bytes_) * per_second); + packets_production_rate_ = + ceil((double)(produced_packets_ + prev_produced_packets_) * per_second); + + if (fec_encoder_ && fec_type_ != fec::FECType::UNKNOWN) { + // add fec packets looking at the fec code. we don't use directly the number + // of fec packets produced in 1 round because it may happen that different + // numbers of blocks are generated during the rounds and this creates + // inconsistencies in the estimation of the production rate + uint32_t k = fec::FECUtils::getSourceSymbols(fec_type_); + uint32_t n = fec::FECUtils::getBlockSymbols(fec_type_); + + packets_production_rate_ += + ceil((double)packets_production_rate_ / (double)k) * (n - k); + } // update the production rate as soon as it increases by 10% with respect to // the last round max_packet_production_ = - produced_packets_ + ceil((double)produced_packets_ * 0.1); + produced_packets_ + ceil((double)produced_packets_ * 0.10); if (max_packet_production_ < rtc::WIN_MIN) max_packet_production_ = rtc::WIN_MIN; - if (packets_production_rate_ != 0) { - allow_delayed_nacks_ = false; - } else if (prev_packets_production_rate == 0) { - // at least 2 rounds with production rate = 0 + if (packets_production_rate_ <= rtc::MIN_PRODUCTION_RATE || + prev_packets_production_rate <= rtc::MIN_PRODUCTION_RATE) { allow_delayed_nacks_ = true; + } else { + // at least 2 rounds with enough packets + allow_delayed_nacks_ = false; } - // check if the production rate is decreased. if yes send nacks if needed - if (prev_packets_production_rate < packets_production_rate_) { - sendNacksForPendingInterests(); + if (new_round) { + prev_produced_bytes_ = produced_bytes_; + prev_produced_packets_ = produced_packets_; + produced_bytes_ = 0; + produced_packets_ = 0; + last_round_ = now; + scheduleRoundTimer(); } - - produced_bytes_ = 0; - produced_packets_ = 0; - produced_fec_packets_ = 0; - last_round_ = now; - scheduleRoundTimer(); } uint32_t RTCProductionProtocol::produceStream( @@ -164,75 +197,275 @@ void RTCProductionProtocol::produce(ContentObject &content_object) { uint32_t RTCProductionProtocol::produceDatagram( const Name &content_name, std::unique_ptr<utils::MemBuf> &&buffer) { std::size_t buffer_size = buffer->length(); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Maybe Sending content object: " << content_name; + if (TRANSPORT_EXPECT_FALSE(buffer_size == 0)) return 0; + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Sending content object: " << content_name; + uint32_t data_packet_size; socket_->getSocketOption(interface::GeneralTransportOptions::DATA_PACKET_SIZE, data_packet_size); - - if (TRANSPORT_EXPECT_FALSE((buffer_size + data_header_size_ + - rtc::DATA_HEADER_SIZE) > data_packet_size)) { + // this is a source packet but we check the fec header size of FEC packet in + // order to leave room for the header when FEC packets will be generated + uint32_t fec_header = 0; + if (fec_encoder_) fec_encoder_->getFecHeaderSize(true); + uint32_t headers_size = + (uint32_t)Packet::getHeaderSizeFromFormat(data_header_format_.first, + data_header_format_.second) + + rtc::DATA_HEADER_SIZE + fec_header; + if (TRANSPORT_EXPECT_FALSE((headers_size + buffer_size) > data_packet_size)) { return 0; } + if (!data_aggregation_) { + // if data aggregation is off emptyQueue will always return doing nothing + emptyQueue(); + + sendManifest(content_name); + + // create content object + auto content_object = + core::PacketManager<>::getInstance().getPacket<ContentObject>( + data_header_format_.first, data_header_format_.second); + + // add rtc header to the payload + struct rtc::data_packet_t header; + content_object->appendPayload((const uint8_t *)&header, + rtc::DATA_HEADER_SIZE); + content_object->appendPayload(buffer->data(), buffer->length()); + + // schedule actual sending on internal thread + portal_->getThread().tryRunHandlerNow( + [this, content_object{std::move(content_object)}, + content_name]() mutable { + produceInternal(std::move(content_object), content_name); + }); + } else { + // XXX here we assume that all the packets that we push to the queue have + // the same name + auto app_pkt = utils::MemBuf::copyBuffer(buffer->data(), buffer->length()); + addPacketToQueue(std::move(app_pkt)); + } + + return 1; +} + +void RTCProductionProtocol::addPacketToQueue( + std::unique_ptr<utils::MemBuf> &&buffer) { + std::size_t buffer_size = buffer->length(); + if ((queue_len_ + buffer_size) > rtc::MAX_RTC_PAYLOAD_SIZE) { + emptyQueue(); // this should guaranty that the generated packet will never + // be larger than an MTU + } + + waiting_app_packets_.push(std::move(buffer)); + if (max_len_ < buffer_size) max_len_ = buffer_size; + queue_len_ += buffer_size; + + if (waiting_app_packets_.size() >= rtc::MAX_AGGREGATED_PACKETS) { + emptyQueue(); + } + + if (waiting_app_packets_.size() >= 1 && !data_aggregation_timer_switch_) { + data_aggregation_timer_switch_ = true; + app_packets_timer_->expires_from_now( + std::chrono::milliseconds(rtc::AGGREGATED_PACKETS_TIMER)); + std::weak_ptr<RTCProductionProtocol> self = shared_from_this(); + app_packets_timer_->async_wait([self](const std::error_code &ec) { + if (ec) return; + + auto ptr = self.lock(); + if (ptr && ptr->isRunning()) { + if (!ptr->data_aggregation_timer_switch_) return; + ptr->emptyQueue(); + } + }); + } +} + +void RTCProductionProtocol::emptyQueue() { + if (waiting_app_packets_.size() == 0) return; // queue is empty + + Name n(flow_name_); + + // cancel timer is scheduled + if (data_aggregation_timer_switch_) { + data_aggregation_timer_switch_ = false; + app_packets_timer_->cancel(); + } + + // send a manifest beforehand if the hash buffer if full + sendManifest(n); + + // create content object auto content_object = core::PacketManager<>::getInstance().getPacket<ContentObject>( - signer_ ? Format::HF_INET6_TCP_AH : Format::HF_INET6_TCP, - signer_ ? signer_->getSignatureFieldSize() : 0); + data_header_format_.first, data_header_format_.second); + // add rtc header to the payload struct rtc::data_packet_t header; content_object->appendPayload((const uint8_t *)&header, rtc::DATA_HEADER_SIZE); - content_object->appendPayload(buffer->data(), buffer->length()); - // schedule actual sending on internal thread - portal_->getIoService().dispatch([this, - content_object{std::move(content_object)}, - content_name]() mutable { - produceInternal(std::move(content_object), content_name); + // init aggregated header + rtc::AggrPktHeader hdr( + (uint8_t *)(content_object->getPayload()->data() + rtc::DATA_HEADER_SIZE), + max_len_, waiting_app_packets_.size()); + uint32_t header_size = hdr.getHeaderLen(); + content_object->append(header_size); // leave space for the aggregated header + + uint8_t index = 0; + while (waiting_app_packets_.size() != 0) { + std::unique_ptr<utils::MemBuf> pkt = + std::move(waiting_app_packets_.front()); + waiting_app_packets_.pop(); + // XXX for the moment we have a single name, so this works, otherwise we + // need to do something else + hdr.addPacketToHeader(index, pkt->length()); + // append packet + content_object->appendPayload(pkt->data(), pkt->length()); + index++; + } + + // reset queue values + max_len_ = 0; + queue_len_ = 0; + + // the packet is ready we need to send it + portal_->getThread().tryRunHandlerNow( + [this, content_object{std::move(content_object)}, n]() mutable { + produceInternal(std::move(content_object), n); + }); +} + +void RTCProductionProtocol::sendManifest(const Name &name) { + if (!manifest_max_capacity_) { + return; + } + + Name manifest_name = name; + + // If there is not enough hashes to fill a manifest, return early + if (manifest_entries_.size() < manifest_max_capacity_) { + return; + } + + // Create a new manifest + std::shared_ptr<core::ContentObjectManifest> manifest = + createManifest(manifest_name.setSuffix(current_seg_)); + auto manifest_co = + std::dynamic_pointer_cast<ContentObject>(manifest->getPacket()); + + // Fill the manifest with packet hashes that were previously saved + uint32_t nb_entries; + for (nb_entries = 0; nb_entries < manifest_max_capacity_; ++nb_entries) { + if (manifest_entries_.empty()) { + break; + } + std::pair<uint32_t, auth::CryptoHash> front = manifest_entries_.front(); + manifest->addEntry(front.first, front.second); + manifest_entries_.pop(); + } + + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Sending manifest " << manifest_co->getName().getSuffix() + << " of size " << nb_entries; + + // Encode and send the manifest + manifest->encode(); + portal_->getThread().tryRunHandlerNow( + [this, content_object{std::move(manifest_co)}, manifest_name]() mutable { + produceInternal(std::move(content_object), manifest_name); + }); +} + +std::shared_ptr<core::ContentObjectManifest> +RTCProductionProtocol::createManifest(const Name &content_name) const { + Name name(content_name); + + auth::CryptoHashType hash_algo; + socket_->getSocketOption(interface::GeneralTransportOptions::HASH_ALGORITHM, + hash_algo); + + uint64_t now = utils::SteadyTime::nowMs().count(); + + // Create a new manifest + std::shared_ptr<core::ContentObjectManifest> manifest = + ContentObjectManifest::createContentManifest( + manifest_header_format_.first, name, manifest_header_format_.second); + manifest->setHeaders(core::ManifestType::INLINE_MANIFEST, + manifest_max_capacity_, hash_algo, false /* is_last */, + name); + + // Set connection parameters + manifest->setParamsRTC(ParamsRTC{ + .timestamp = now, + .prod_rate = bytes_production_rate_, + .prod_seg = current_seg_, + .fec_type = fec_type_, }); - return 1; + return manifest; } void RTCProductionProtocol::produceInternal( std::shared_ptr<ContentObject> &&content_object, const Name &content_name, bool fec) { + uint64_t now = utils::SteadyTime::nowMs().count(); + + if (fec && (now - last_produced_data_ts_) < rtc::FEC_PACING_TIME) { + paced_fec_packets_.push(std::pair<uint64_t, ContentObject::Ptr>( + now, std::move(content_object))); + postponeFecPacket(); + } else { + // need to check if there are FEC packets waiting to be sent + flushFecPkts(current_seg_); + producePktInternal(std::move(content_object), content_name, fec); + } +} + +void RTCProductionProtocol::producePktInternal( + std::shared_ptr<ContentObject> &&content_object, const Name &content_name, + bool fec) { + bool is_manifest = content_object->getPayloadType() == PayloadType::MANIFEST; + uint64_t now = utils::SteadyTime::nowMs().count(); + // set rtc header - struct rtc::data_packet_t *data_pkt = - (struct rtc::data_packet_t *)content_object->getPayload()->data(); - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); - data_pkt->setTimestamp(now); - data_pkt->setProductionRate(bytes_production_rate_); + if (!is_manifest) { + struct rtc::data_packet_t *data_pkt = + (struct rtc::data_packet_t *)content_object->getPayload()->data(); + data_pkt->setTimestamp(now); + data_pkt->setProductionRate(bytes_production_rate_); + } // set hicn stuff Name n(content_name); content_object->setName(n.setSuffix(current_seg_)); - content_object->setLifetime(500); // XXX this should be set by the APP - content_object->setPathLabel(prod_label_); - // sign packet - if (signer_) { - signer_->signPacket(content_object.get()); - } + uint32_t expiry_time = 0; + socket_->getSocketOption( + interface::GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME, + expiry_time); + if (expiry_time == interface::default_values::content_object_expiry_time) + expiry_time = 500; // the data expiration time should be set by the App. if + // the App does not specify it the default is 500ms + content_object->setLifetime(expiry_time); + content_object->setPathLabel(prod_label_); // update stats if (!fec) { produced_bytes_ += content_object->headerSize() + content_object->payloadSize(); produced_packets_++; - } else { - produced_fec_packets_++; } - if (produced_packets_ >= max_packet_production_) { + if (!data_aggregation_ && produced_packets_ >= max_packet_production_) { // in this case all the pending interests may be used to accomodate the // sudden increase in the production rate. calling the updateStats we will // notify all the clients - round_timer_->cancel(); - updateStats(); + updateStats(false); } DLOG_IF(INFO, VLOG_IS_ON(3)) @@ -240,8 +473,12 @@ void RTCProductionProtocol::produceInternal( // pass packet to FEC encoder if (fec_encoder_ && !fec) { - fec_encoder_->onPacketProduced( - *content_object, content_object->headerSize() + rtc::DATA_HEADER_SIZE); + uint32_t offset = is_manifest ? (uint32_t)content_object->headerSize() + : (uint32_t)content_object->headerSize() + + rtc::DATA_HEADER_SIZE; + uint32_t metadata = static_cast<uint32_t>(content_object->getPayloadType()); + + fec_encoder_->onPacketProduced(*content_object, offset, metadata); } output_buffer_.insert(content_object); @@ -251,18 +488,15 @@ void RTCProductionProtocol::produceInternal( *content_object); } - auto seq_it = seqs_map_.find(current_seg_); - if (seq_it != seqs_map_.end()) { - portal_->sendContentObject(*content_object); - } + // TODO we may want to send FEC only if an interest is pending in the pit in + sendContentObject(content_object, false, fec); if (*on_content_object_output_) { on_content_object_output_->operator()(*socket_->getInterface(), *content_object); } - // remove interests from the interest cache if it exists - removeFromInterestQueue(current_seg_); + if (!fec) last_produced_data_ts_ = now; // Update current segment current_seg_ = (current_seg_ + 1) % rtc::MIN_PROBE_SEQ; @@ -277,254 +511,169 @@ void RTCProductionProtocol::produceInternal( } } -void RTCProductionProtocol::onInterest(Interest &interest) { - if (*on_interest_input_) { - on_interest_input_->operator()(*socket_->getInterface(), interest); +void RTCProductionProtocol::flushFecPkts(uint32_t current_seq_num) { + // Currently we immediately send all the pending fec packets + // A pacing policy may be helpful, but we do not want to delay too much + // the packets at this moment. + while (paced_fec_packets_.size() > 0) { + producePktInternal(std::move(paced_fec_packets_.front().second), flow_name_, + true); + paced_fec_packets_.pop(); + } + fec_pacing_timer_->cancel(); + pending_fec_pace_ = false; + postponeFecPacket(); +} + +void RTCProductionProtocol::postponeFecPacket() { + if (paced_fec_packets_.size() == 0) return; + if (pending_fec_pace_) { + return; } - auto suffix = interest.firstSuffix(); - // numberOfSuffixes returns only the prefixes in the payalod - // we add + 1 to count anche the seq in the name - auto n_suffixes = interest.numberOfSuffixes() + 1; - Name name = interest.getName(); - bool prev_consumer_state = consumer_in_sync_; + uint64_t produced_time = paced_fec_packets_.front().first; + uint64_t now = utils::SteadyTime::nowMs().count(); - for (uint32_t i = 0; i < n_suffixes; i++) { - if (i > 0) { - name.setSuffix(*(suffix + (i - 1))); - } - DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received interest " << name; + uint64_t wait_time = 0; + if ((produced_time + rtc::FEC_PACING_TIME) > now) + wait_time = produced_time + rtc::FEC_PACING_TIME - now; - const std::shared_ptr<ContentObject> content_object = - output_buffer_.find(name); + fec_pacing_timer_->expires_from_now(std::chrono::milliseconds(wait_time)); + pending_fec_pace_ = true; - if (content_object) { - if (*on_interest_satisfied_output_buffer_) { - on_interest_satisfied_output_buffer_->operator()( - *socket_->getInterface(), interest); - } + std::weak_ptr<RTCProductionProtocol> self = shared_from_this(); + fec_pacing_timer_->async_wait([self](const std::error_code &ec) { + if (ec) return; - if (*on_content_object_output_) { - on_content_object_output_->operator()(*socket_->getInterface(), - *content_object); - } + auto sp = self.lock(); + if (sp && sp->isRunning()) { + if (!sp->pending_fec_pace_) return; - DLOG_IF(INFO, VLOG_IS_ON(3)) - << "Send content %u (onInterest) " << content_object->getName(); - content_object->setPathLabel(cache_label_); - portal_->sendContentObject(*content_object); - } else { - if (*on_interest_process_) { - on_interest_process_->operator()(*socket_->getInterface(), interest); + if (sp->paced_fec_packets_.size() > 0) { + sp->producePktInternal(std::move(sp->paced_fec_packets_.front().second), + sp->flow_name_, true); + sp->paced_fec_packets_.pop(); } - processInterest(name.getSuffix(), interest.getLifetime()); + sp->pending_fec_pace_ = false; + sp->postponeFecPacket(); } - } - - if (prev_consumer_state != consumer_in_sync_ && consumer_in_sync_) - on_consumer_in_sync_(*socket_->getInterface(), interest); + }); } -void RTCProductionProtocol::processInterest(uint32_t interest_seg, - uint32_t lifetime) { - if (interest_seg == 0) { - // first packet from the consumer, reset sync state - consumer_in_sync_ = false; +void RTCProductionProtocol::onInterest(Interest &interest) { + if (*on_interest_input_) { + on_interest_input_->operator()(*socket_->getInterface(), interest); } - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + if (!interest.isValid()) throw std::runtime_error("Bad interest format"); + if (interest.hasManifest() && + verifier_->verify(interest) != auth::VerificationPolicy::ACCEPT) + throw std::runtime_error("Interset manifest verification failed"); - if (interest_seg > rtc::MIN_PROBE_SEQ) { - DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received probe " << interest_seg; - sendNack(interest_seg); - return; - } - - // if the production rate 0 use delayed nacks - if (allow_delayed_nacks_ && interest_seg >= current_seg_) { - uint64_t next_timer = ~0; - if (!timers_map_.empty()) { - next_timer = timers_map_.begin()->first; - } + uint32_t *suffix = interest.firstSuffix(); + uint32_t n_suffixes_in_manifest = interest.numberOfSuffixes(); + hicn_uword *request_bitmap = interest.getRequestBitmap(); - uint64_t expiration = now + rtc::SENTINEL_TIMER_INTERVAL; - addToInterestQueue(interest_seg, expiration); - - // here we have at least one interest in the queue, we need to start or - // update the timer - if (!queue_timer_on_) { - // set timeout - queue_timer_on_ = true; - scheduleQueueTimer(timers_map_.begin()->first - now); - } else { - // re-schedule the timer because a new interest will expires sooner - if (next_timer > timers_map_.begin()->first) { - interests_queue_timer_->cancel(); - scheduleQueueTimer(timers_map_.begin()->first - now); - } - } - return; - } + Name name = interest.getName(); + uint32_t pos = 0; // Position of current suffix in manifest - if (queue_timer_on_) { - // the producer is producing. Send nacks to packets that will expire before - // the data production and remove the timer - queue_timer_on_ = false; - interests_queue_timer_->cancel(); - sendNacksForPendingInterests(); - } + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Received interest " << name << " (" << n_suffixes_in_manifest + << " suffixes in manifest)"; + + // Process the suffix in the interest header + // (first loop iteration), then suffixes in the manifest + do { + if (!interest.hasManifest() || + bitmap_is_set_no_check(request_bitmap, pos)) { + const std::shared_ptr<ContentObject> content_object = + output_buffer_.find(name); + + if (content_object) { + if (*on_interest_satisfied_output_buffer_) { + on_interest_satisfied_output_buffer_->operator()( + *socket_->getInterface(), interest); + } - uint32_t max_gap = (uint32_t)floor( - (double)((double)((double)lifetime * - rtc::INTEREST_LIFETIME_REDUCTION_FACTOR / - rtc::MILLI_IN_A_SEC) * - (double)(packets_production_rate_ + - fec_packets_production_rate_))); + if (*on_content_object_output_) { + on_content_object_output_->operator()(*socket_->getInterface(), + *content_object); + } - if (interest_seg < current_seg_ || interest_seg > (max_gap + current_seg_)) { - sendNack(interest_seg); - } else { - if (!consumer_in_sync_ && on_consumer_in_sync_) { - // we consider the remote consumer to be in sync as soon as it covers 70% - // of the production window with interests - uint32_t perc = ceil((double)max_gap * 0.7); - if (interest_seg > (perc + current_seg_)) { - consumer_in_sync_ = true; - // on_consumer_in_sync_(*socket_->getInterface(), interest); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Send content %u (onInterest) " << content_object->getName(); + content_object->setPathLabel(cache_label_); + sendContentObject(content_object); + } else { + if (*on_interest_process_) { + on_interest_process_->operator()(*socket_->getInterface(), interest); + } + processInterest(name.getSuffix(), interest.getLifetime()); } } - uint64_t expiration = - now + floor((double)lifetime * rtc::INTEREST_LIFETIME_REDUCTION_FACTOR); - addToInterestQueue(interest_seg, expiration); - } -} -void RTCProductionProtocol::onError(std::error_code ec) {} + // Retrieve next suffix in the manifest + if (interest.hasManifest()) { + uint32_t seq = *suffix; + suffix++; -void RTCProductionProtocol::scheduleQueueTimer(uint64_t wait) { - interests_queue_timer_->expires_from_now(std::chrono::milliseconds(wait)); - interests_queue_timer_->async_wait([this](std::error_code ec) { - if (ec) return; - interestQueueTimer(); - }); + name.setSuffix(seq); + interest.setName(name); + } + } while (pos++ < n_suffixes_in_manifest); } -void RTCProductionProtocol::addToInterestQueue(uint32_t interest_seg, - uint64_t expiration) { - // check if the seq number exists already - auto it_seqs = seqs_map_.find(interest_seg); - if (it_seqs != seqs_map_.end()) { - // the seq already exists - if (expiration < it_seqs->second) { - // we need to update the timer becasue we got a smaller one - // 1) remove the entry from the multimap - // 2) update this entry - auto range = timers_map_.equal_range(it_seqs->second); - for (auto it_timers = range.first; it_timers != range.second; - it_timers++) { - if (it_timers->second == it_seqs->first) { - timers_map_.erase(it_timers); - break; - } - } - timers_map_.insert( - std::pair<uint64_t, uint32_t>(expiration, interest_seg)); - it_seqs->second = expiration; - } else { - // nothing to do here +void RTCProductionProtocol::processInterest(uint32_t interest_seg, + uint32_t lifetime) { + switch (rtc::ProbeHandler::getProbeType(interest_seg)) { + case rtc::ProbeType::INIT: + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received init probe " << interest_seg; + sendManifestProbe(interest_seg); return; - } - } else { - // add the new seq - timers_map_.insert(std::pair<uint64_t, uint32_t>(expiration, interest_seg)); - seqs_map_.insert(std::pair<uint32_t, uint64_t>(interest_seg, expiration)); + case rtc::ProbeType::RTT: + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received RTT probe " << interest_seg; + sendNack(interest_seg); + return; + default: + break; } -} -void RTCProductionProtocol::sendNacksForPendingInterests() { - std::unordered_set<uint32_t> to_remove; + if (interest_seg < current_seg_) sendNack(interest_seg); +} - uint32_t packet_gap = 100000; // set it to a high value (100sec) - if (packets_production_rate_ != 0) - packet_gap = ceil(rtc::MILLI_IN_A_SEC / (double)packets_production_rate_); +void RTCProductionProtocol::sendManifestProbe(uint32_t sequence) { + Name manifest_name(flow_name_); + manifest_name.setSuffix(sequence); - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + std::shared_ptr<core::ContentObjectManifest> manifest_probe = + createManifest(manifest_name); + auto manifest_probe_co = + std::dynamic_pointer_cast<ContentObject>(manifest_probe->getPacket()); - for (auto it = seqs_map_.begin(); it != seqs_map_.end(); it++) { - if (it->first > current_seg_) { - uint64_t production_time = - ((it->first - current_seg_) * packet_gap) + now; - if (production_time >= it->second) { - sendNack(it->first); - to_remove.insert(it->first); - } - } - } + manifest_probe_co->setLifetime(0); + manifest_probe_co->setPathLabel(prod_label_); + manifest_probe->encode(); - // delete nacked interests - for (auto it = to_remove.begin(); it != to_remove.end(); it++) { - removeFromInterestQueue(*it); - } -} - -void RTCProductionProtocol::removeFromInterestQueue(uint32_t interest_seg) { - auto seq_it = seqs_map_.find(interest_seg); - if (seq_it != seqs_map_.end()) { - auto range = timers_map_.equal_range(seq_it->second); - for (auto it_timers = range.first; it_timers != range.second; it_timers++) { - if (it_timers->second == seq_it->first) { - timers_map_.erase(it_timers); - break; - } - } - seqs_map_.erase(seq_it); + if (*on_content_object_output_) { + on_content_object_output_->operator()(*socket_->getInterface(), + *manifest_probe_co); } -} -void RTCProductionProtocol::interestQueueTimer() { - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); - - for (auto it_timers = timers_map_.begin(); it_timers != timers_map_.end();) { - uint64_t expire = it_timers->first; - if (expire <= now) { - uint32_t seq = it_timers->second; - sendNack(seq); - // remove the interest from the other map - seqs_map_.erase(seq); - it_timers = timers_map_.erase(it_timers); - } else { - // stop, we are done! - break; - } - } - if (timers_map_.empty()) { - queue_timer_on_ = false; - } else { - queue_timer_on_ = true; - scheduleQueueTimer(timers_map_.begin()->first - now); - } + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Send init probe " << sequence; + sendContentObject(manifest_probe_co, true, false); } void RTCProductionProtocol::sendNack(uint32_t sequence) { auto nack = core::PacketManager<>::getInstance().getPacket<ContentObject>( - signer_ ? Format::HF_INET6_TCP_AH : Format::HF_INET6_TCP, - signer_ ? signer_->getSignatureFieldSize() : 0); - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + nack_header_format_.first, nack_header_format_.second); + uint64_t now = utils::SteadyTime::nowMs().count(); uint32_t next_packet = current_seg_; uint32_t prod_rate = bytes_production_rate_; struct rtc::nack_packet_t header; header.setTimestamp(now); header.setProductionRate(prod_rate); - header.setProductionSegement(next_packet); + header.setProductionSegment(next_packet); nack->appendPayload((const uint8_t *)&header, rtc::NACK_HEADER_SIZE); Name n(flow_name_); @@ -533,33 +682,42 @@ void RTCProductionProtocol::sendNack(uint32_t sequence) { nack->setLifetime(0); nack->setPathLabel(prod_label_); - if (signer_) { - signer_->signPacket(nack.get()); + if (*on_content_object_output_) { + on_content_object_output_->operator()(*socket_->getInterface(), *nack); } - if (!consumer_in_sync_ && on_consumer_in_sync_ && - sequence < rtc::MIN_PROBE_SEQ && sequence > next_packet) { - consumer_in_sync_ = true; - auto interest = core::PacketManager<>::getInstance().getPacket<Interest>(); - interest->setName(n); - on_consumer_in_sync_(*socket_->getInterface(), *interest); + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Send nack " << sequence; + sendContentObject(nack, true, false); +} + +void RTCProductionProtocol::sendContentObject( + std::shared_ptr<ContentObject> content_object, bool nack, bool fec) { + bool is_ah = HICN_PACKET_FORMAT_IS_AH(content_object->getFormat()); + + // Compute signature + if (is_ah) { + signer_->signPacket(content_object.get()); } - if (*on_content_object_output_) { - on_content_object_output_->operator()(*socket_->getInterface(), *nack); + // Compute and save data packet digest + if (manifest_max_capacity_ && !is_ah) { + auth::CryptoHashType hash_algo; + socket_->getSocketOption(interface::GeneralTransportOptions::HASH_ALGORITHM, + hash_algo); + manifest_entries_.push({content_object->getName().getSuffix(), + content_object->computeDigest(hash_algo)}); } - DLOG_IF(INFO, VLOG_IS_ON(3)) << "Send nack " << sequence; - portal_->sendContentObject(*nack); + portal_->sendContentObject(*content_object); } -void RTCProductionProtocol::onFecPackets( - std::vector<std::pair<uint32_t, fec::buffer>> &packets) { +void RTCProductionProtocol::onFecPackets(fec::BufferArray &packets) { DLOG_IF(INFO, VLOG_IS_ON(3)) << "Produced " << packets.size() << " FEC packets"; + for (auto &packet : packets) { auto content_object = - std::static_pointer_cast<ContentObject>(packet.second); + std::static_pointer_cast<ContentObject>(packet.getBuffer()); content_object->prepend(content_object->headerSize() + rtc::DATA_HEADER_SIZE); pending_fec_packets_.push(std::move(content_object)); @@ -569,19 +727,21 @@ void RTCProductionProtocol::onFecPackets( fec::buffer RTCProductionProtocol::getBuffer(std::size_t size) { DLOG_IF(INFO, VLOG_IS_ON(3)) << "Asked buffer for FEC symbol of size " << size; + auto ret = core::PacketManager<>::getInstance().getPacket<ContentObject>( - signer_ ? Format::HF_INET6_TCP_AH : Format::HF_INET6_TCP, - signer_ ? signer_->getSignatureFieldSize() : 0); + fec_header_format_.first, fec_header_format_.second); + ret->updateLength(rtc::DATA_HEADER_SIZE + size); ret->append(rtc::DATA_HEADER_SIZE + size); ret->trimStart(ret->headerSize() + rtc::DATA_HEADER_SIZE); + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Responding with buffer of length " << ret->length(); - assert(ret->length() >= size); + DCHECK(ret->length() >= size); return ret; } } // namespace protocol -} // end namespace transport +} // namespace transport |