diff options
Diffstat (limited to 'libtransport/src/protocols/prod_protocol_rtc.cc')
-rw-r--r-- | libtransport/src/protocols/prod_protocol_rtc.cc | 553 |
1 files changed, 434 insertions, 119 deletions
diff --git a/libtransport/src/protocols/prod_protocol_rtc.cc b/libtransport/src/protocols/prod_protocol_rtc.cc index cdc882d81..242abd30d 100644 --- a/libtransport/src/protocols/prod_protocol_rtc.cc +++ b/libtransport/src/protocols/prod_protocol_rtc.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -13,9 +13,11 @@ * limitations under the License. */ +#include <glog/logging.h> #include <hicn/transport/core/global_object_pool.h> #include <implementation/socket_producer.h> #include <protocols/prod_protocol_rtc.h> +#include <protocols/rtc/probe_handler.h> #include <protocols/rtc/rtc_consts.h> #include <stdlib.h> #include <time.h> @@ -38,71 +40,89 @@ RTCProductionProtocol::RTCProductionProtocol( bytes_production_rate_(0), packets_production_rate_(0), fec_packets_production_rate_(0), - last_round_(std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count()), + last_produced_data_ts_(0), + last_round_(utils::SteadyTime::nowMs().count()), allow_delayed_nacks_(false), queue_timer_on_(false), consumer_in_sync_(false), - on_consumer_in_sync_(nullptr) { - srand((unsigned int)time(NULL)); - prod_label_ = rand() % 256; + on_consumer_in_sync_(nullptr), + pending_fec_pace_(false), + max_len_(0), + queue_len_(0), + data_aggregation_(true), + data_aggregation_timer_switch_(false) { + std::uniform_int_distribution<> dis(0, 255); + prod_label_ = dis(gen_); cache_label_ = (prod_label_ + 1) % 256; interests_queue_timer_ = - std::make_unique<asio::steady_timer>(portal_->getIoService()); - round_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService()); + std::make_unique<asio::steady_timer>(portal_->getThread().getIoService()); + round_timer_ = + std::make_unique<asio::steady_timer>(portal_->getThread().getIoService()); + fec_pacing_timer_ = + std::make_unique<asio::steady_timer>(portal_->getThread().getIoService()); + app_packets_timer_ = + std::make_unique<asio::steady_timer>(portal_->getThread().getIoService()); setOutputBufferSize(10000); - scheduleRoundTimer(); +} + +RTCProductionProtocol::~RTCProductionProtocol() {} + +void RTCProductionProtocol::setProducerParam() { + // Flow name: here we assume there is only one prefix registered in the portal + flow_name_ = portal_->getServedNamespaces().begin()->getName(); + + // Manifest + uint32_t making_manifest; + socket_->getSocketOption(interface::GeneralTransportOptions::MAKE_MANIFEST, + making_manifest); + + // Signer + std::shared_ptr<auth::Signer> signer; + socket_->getSocketOption(interface::GeneralTransportOptions::SIGNER, signer); + + // Default format + core::Packet::Format default_format; + socket_->getSocketOption(interface::GeneralTransportOptions::PACKET_FORMAT, + default_format); // FEC using namespace std::placeholders; enableFEC(std::bind(&RTCProductionProtocol::onFecPackets, this, _1), std::bind(&RTCProductionProtocol::getBuffer, this, _1)); -} -RTCProductionProtocol::~RTCProductionProtocol() {} + // Aggregated data + socket_->getSocketOption(interface::RtcTransportOptions::AGGREGATED_DATA, + data_aggregation_); -void RTCProductionProtocol::registerNamespaceWithNetwork( - const Prefix &producer_namespace) { - ProductionProtocol::registerNamespaceWithNetwork(producer_namespace); - - flow_name_ = producer_namespace.getName(); - auto family = flow_name_.getAddressFamily(); - - switch (family) { - case AF_INET6: - data_header_size_ = - signer_ && !making_manifest_ - ? (uint32_t)Packet::getHeaderSizeFromFormat( - HF_INET6_TCP_AH, signer_->getSignatureFieldSize()) - : (uint32_t)Packet::getHeaderSizeFromFormat(HF_INET6_TCP); - ; - break; - case AF_INET: - data_header_size_ = - signer_ && !making_manifest_ - ? (uint32_t)Packet::getHeaderSizeFromFormat( - HF_INET_TCP_AH, signer_->getSignatureFieldSize()) - : (uint32_t)Packet::getHeaderSizeFromFormat(HF_INET_TCP); - break; - default: - throw errors::RuntimeException("Unknown name format."); - } + size_t signature_size = signer->getSignatureFieldSize(); + data_header_format_ = { + !making_manifest ? Packet::toAHFormat(default_format) : default_format, + !making_manifest ? signature_size : 0}; + manifest_header_format_ = {Packet::toAHFormat(default_format), + signature_size}; + nack_header_format_ = {Packet::toAHFormat(default_format), signature_size}; + fec_header_format_ = {Packet::toAHFormat(default_format), signature_size}; + + // Schedule round timer + scheduleRoundTimer(); } void RTCProductionProtocol::scheduleRoundTimer() { round_timer_->expires_from_now( std::chrono::milliseconds(rtc::PRODUCER_STATS_INTERVAL)); - round_timer_->async_wait([this](std::error_code ec) { + std::weak_ptr<RTCProductionProtocol> self = shared_from_this(); + round_timer_->async_wait([self](const std::error_code &ec) { if (ec) return; - updateStats(); + + auto sp = self.lock(); + if (sp && sp->isRunning()) { + sp->updateStats(); + } }); } void RTCProductionProtocol::updateStats() { - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + uint64_t now = utils::SteadyTime::nowMs().count(); uint64_t duration = now - last_round_; if (duration == 0) duration = 1; double per_second = rtc::MILLI_IN_A_SEC / duration; @@ -125,11 +145,12 @@ void RTCProductionProtocol::updateStats() { if (max_packet_production_ < rtc::WIN_MIN) max_packet_production_ = rtc::WIN_MIN; - if (packets_production_rate_ != 0) { - allow_delayed_nacks_ = false; - } else if (prev_packets_production_rate == 0) { - // at least 2 rounds with production rate = 0 + if (packets_production_rate_ <= rtc::MIN_PRODUCTION_RATE || + prev_packets_production_rate <= rtc::MIN_PRODUCTION_RATE) { allow_delayed_nacks_ = true; + } else { + // at least 2 rounds with enough packets + allow_delayed_nacks_ = false; } // check if the production rate is decreased. if yes send nacks if needed @@ -170,42 +191,237 @@ uint32_t RTCProductionProtocol::produceDatagram( socket_->getSocketOption(interface::GeneralTransportOptions::DATA_PACKET_SIZE, data_packet_size); - if (TRANSPORT_EXPECT_FALSE((buffer_size + data_header_size_ + - rtc::DATA_HEADER_SIZE) > data_packet_size)) { + if (TRANSPORT_EXPECT_FALSE( + (Packet::getHeaderSizeFromFormat(data_header_format_.first, + data_header_format_.second) + + rtc::DATA_HEADER_SIZE + buffer_size) > data_packet_size)) { return 0; } + if (!data_aggregation_) { + // if data aggregation is off emptyQueue will always return doing nothing + emptyQueue(); + + sendManifest(content_name); + + // create content object + auto content_object = + core::PacketManager<>::getInstance().getPacket<ContentObject>( + data_header_format_.first, data_header_format_.second); + + // add rtc header to the payload + struct rtc::data_packet_t header; + content_object->appendPayload((const uint8_t *)&header, + rtc::DATA_HEADER_SIZE); + content_object->appendPayload(buffer->data(), buffer->length()); + + // schedule actual sending on internal thread + portal_->getThread().tryRunHandlerNow( + [this, content_object{std::move(content_object)}, + content_name]() mutable { + produceInternal(std::move(content_object), content_name); + }); + } else { + // XXX here we assume that all the packets that we push to the queue have + // the same name + auto app_pkt = utils::MemBuf::copyBuffer(buffer->data(), buffer->length()); + addPacketToQueue(std::move(app_pkt)); + } + + return 1; +} + +void RTCProductionProtocol::addPacketToQueue( + std::unique_ptr<utils::MemBuf> &&buffer) { + std::size_t buffer_size = buffer->length(); + if ((queue_len_ + buffer_size) > rtc::MAX_RTC_PAYLOAD_SIZE) { + emptyQueue(); // this should guaranty that the generated packet will never + // be larger than an MTU + } + + waiting_app_packets_.push(std::move(buffer)); + if (max_len_ < buffer_size) max_len_ = buffer_size; + queue_len_ += buffer_size; + + if (waiting_app_packets_.size() >= rtc::MAX_AGGREGATED_PACKETS) { + emptyQueue(); + } + + if (waiting_app_packets_.size() >= 1 && !data_aggregation_timer_switch_) { + data_aggregation_timer_switch_ = true; + app_packets_timer_->expires_from_now( + std::chrono::milliseconds(rtc::AGGREGATED_PACKETS_TIMER)); + std::weak_ptr<RTCProductionProtocol> self = shared_from_this(); + app_packets_timer_->async_wait([self](const std::error_code &ec) { + if (ec) return; + + auto ptr = self.lock(); + if (ptr && ptr->isRunning()) { + if (!ptr->data_aggregation_timer_switch_) return; + ptr->emptyQueue(); + } + }); + } +} + +void RTCProductionProtocol::emptyQueue() { + if (waiting_app_packets_.size() == 0) return; // queue is empty + + Name n(flow_name_); + + // cancel timer is scheduled + if (data_aggregation_timer_switch_) { + data_aggregation_timer_switch_ = false; + app_packets_timer_->cancel(); + } + + // send a manifest beforehand if the hash buffer if full + sendManifest(n); + + // create content object auto content_object = core::PacketManager<>::getInstance().getPacket<ContentObject>( - signer_ ? Format::HF_INET6_TCP_AH : Format::HF_INET6_TCP, - signer_ ? signer_->getSignatureFieldSize() : 0); + data_header_format_.first, data_header_format_.second); + // add rtc header to the payload struct rtc::data_packet_t header; content_object->appendPayload((const uint8_t *)&header, rtc::DATA_HEADER_SIZE); - content_object->appendPayload(buffer->data(), buffer->length()); - // schedule actual sending on internal thread - portal_->getIoService().dispatch([this, - content_object{std::move(content_object)}, - content_name]() mutable { - produceInternal(std::move(content_object), content_name); + // init aggregated header + rtc::AggrPktHeader hdr( + (uint8_t *)(content_object->getPayload()->data() + rtc::DATA_HEADER_SIZE), + max_len_, waiting_app_packets_.size()); + uint32_t header_size = hdr.getHeaderLen(); + content_object->append(header_size); // leave space for the aggregated header + + uint8_t index = 0; + while (waiting_app_packets_.size() != 0) { + std::unique_ptr<utils::MemBuf> pkt = + std::move(waiting_app_packets_.front()); + waiting_app_packets_.pop(); + // XXX for the moment we have a single name, so this works, otherwise we + // need to do something else + hdr.addPacketToHeader(index, pkt->length()); + // append packet + content_object->appendPayload(pkt->data(), pkt->length()); + index++; + } + + // reset queue values + max_len_ = 0; + queue_len_ = 0; + + // the packet is ready we need to send it + portal_->getThread().tryRunHandlerNow( + [this, content_object{std::move(content_object)}, n]() mutable { + produceInternal(std::move(content_object), n); + }); +} + +void RTCProductionProtocol::sendManifest(const Name &name) { + if (!making_manifest_) { + return; + } + + Name manifest_name(name); + + uint32_t data_packet_size; + socket_->getSocketOption(interface::GeneralTransportOptions::DATA_PACKET_SIZE, + data_packet_size); + + // The maximum number of entries a manifest can hold + uint32_t manifest_capacity = making_manifest_; + + // If there is not enough hashes to fill a manifest, return early + if (manifest_entries_.size() < manifest_capacity) { + return; + } + + // Create a new manifest + std::shared_ptr<core::ContentObjectManifest> manifest = + createManifest(manifest_name.setSuffix(current_seg_)); + + // Fill the manifest with packet hashes that were previously saved + uint32_t nb_entries; + for (nb_entries = 0; nb_entries < manifest_capacity; ++nb_entries) { + if (manifest_entries_.empty()) { + break; + } + std::pair<uint32_t, auth::CryptoHash> front = manifest_entries_.front(); + manifest->addSuffixHash(front.first, front.second); + manifest_entries_.pop(); + } + + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Sending manifest " << manifest->getName().getSuffix() << " of size " + << nb_entries; + + // Encode and send the manifest + manifest->encode(); + portal_->getThread().tryRunHandlerNow( + [this, content_object{std::move(manifest)}, manifest_name]() mutable { + produceInternal(std::move(content_object), manifest_name); + }); +} + +std::shared_ptr<core::ContentObjectManifest> +RTCProductionProtocol::createManifest(const Name &content_name) const { + Name name(content_name); + + auth::CryptoHashType hash_algo; + socket_->getSocketOption(interface::GeneralTransportOptions::HASH_ALGORITHM, + hash_algo); + + uint64_t now = utils::SteadyTime::nowMs().count(); + + // Create a new manifest + std::shared_ptr<core::ContentObjectManifest> manifest( + ContentObjectManifest::createManifest( + manifest_header_format_.first, name, core::ManifestVersion::VERSION_1, + core::ManifestType::INLINE_MANIFEST, false, name, hash_algo, + manifest_header_format_.second)); + + // Set connection parameters + manifest->setParamsRTC(ParamsRTC{ + .timestamp = now, + .prod_rate = bytes_production_rate_, + .prod_seg = current_seg_, + .support_fec = false, }); - return 1; + return manifest; } void RTCProductionProtocol::produceInternal( std::shared_ptr<ContentObject> &&content_object, const Name &content_name, bool fec) { + uint64_t now = utils::SteadyTime::nowMs().count(); + + if (fec && (now - last_produced_data_ts_) < rtc::FEC_PACING_TIME) { + paced_fec_packets_.push(std::pair<uint64_t, ContentObject::Ptr>( + now, std::move(content_object))); + postponeFecPacket(); + } else { + // need to check if there are FEC packets waiting to be sent + flushFecPkts(current_seg_); + producePktInternal(std::move(content_object), content_name, fec); + } +} + +void RTCProductionProtocol::producePktInternal( + std::shared_ptr<ContentObject> &&content_object, const Name &content_name, + bool fec) { + bool is_manifest = content_object->getPayloadType() == PayloadType::MANIFEST; + uint64_t now = utils::SteadyTime::nowMs().count(); + // set rtc header - struct rtc::data_packet_t *data_pkt = - (struct rtc::data_packet_t *)content_object->getPayload()->data(); - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); - data_pkt->setTimestamp(now); - data_pkt->setProductionRate(bytes_production_rate_); + if (!is_manifest) { + struct rtc::data_packet_t *data_pkt = + (struct rtc::data_packet_t *)content_object->getPayload()->data(); + data_pkt->setTimestamp(now); + data_pkt->setProductionRate(bytes_production_rate_); + } // set hicn stuff Name n(content_name); @@ -213,11 +429,6 @@ void RTCProductionProtocol::produceInternal( content_object->setLifetime(500); // XXX this should be set by the APP content_object->setPathLabel(prod_label_); - // sign packet - if (signer_) { - signer_->signPacket(content_object.get()); - } - // update stats if (!fec) { produced_bytes_ += @@ -227,7 +438,7 @@ void RTCProductionProtocol::produceInternal( produced_fec_packets_++; } - if (produced_packets_ >= max_packet_production_) { + if (!data_aggregation_ && produced_packets_ >= max_packet_production_) { // in this case all the pending interests may be used to accomodate the // sudden increase in the production rate. calling the updateStats we will // notify all the clients @@ -240,8 +451,12 @@ void RTCProductionProtocol::produceInternal( // pass packet to FEC encoder if (fec_encoder_ && !fec) { - fec_encoder_->onPacketProduced( - *content_object, content_object->headerSize() + rtc::DATA_HEADER_SIZE); + uint32_t offset = + is_manifest ? content_object->headerSize() + : content_object->headerSize() + rtc::DATA_HEADER_SIZE; + uint32_t metadata = static_cast<uint32_t>(content_object->getPayloadType()); + + fec_encoder_->onPacketProduced(*content_object, offset, metadata); } output_buffer_.insert(content_object); @@ -253,7 +468,7 @@ void RTCProductionProtocol::produceInternal( auto seq_it = seqs_map_.find(current_seg_); if (seq_it != seqs_map_.end()) { - portal_->sendContentObject(*content_object); + sendContentObject(content_object, false, fec); } if (*on_content_object_output_) { @@ -264,6 +479,8 @@ void RTCProductionProtocol::produceInternal( // remove interests from the interest cache if it exists removeFromInterestQueue(current_seg_); + if (!fec) last_produced_data_ts_ = now; + // Update current segment current_seg_ = (current_seg_ + 1) % rtc::MIN_PROBE_SEQ; @@ -277,6 +494,55 @@ void RTCProductionProtocol::produceInternal( } } +void RTCProductionProtocol::flushFecPkts(uint32_t current_seq_num) { + // Currently we immediately send all the pending fec packets + // A pacing policy may be helpful, but we do not want to delay too much + // the packets at this moment. + while (paced_fec_packets_.size() > 0) { + producePktInternal(std::move(paced_fec_packets_.front().second), flow_name_, + true); + paced_fec_packets_.pop(); + } + fec_pacing_timer_->cancel(); + pending_fec_pace_ = false; + postponeFecPacket(); +} + +void RTCProductionProtocol::postponeFecPacket() { + if (paced_fec_packets_.size() == 0) return; + if (pending_fec_pace_) { + return; + } + + uint64_t produced_time = paced_fec_packets_.front().first; + uint64_t now = utils::SteadyTime::nowMs().count(); + + uint64_t wait_time = 0; + if ((produced_time + rtc::FEC_PACING_TIME) > now) + wait_time = produced_time + rtc::FEC_PACING_TIME - now; + + fec_pacing_timer_->expires_from_now(std::chrono::milliseconds(wait_time)); + pending_fec_pace_ = true; + + std::weak_ptr<RTCProductionProtocol> self = shared_from_this(); + fec_pacing_timer_->async_wait([self](const std::error_code &ec) { + if (ec) return; + + auto sp = self.lock(); + if (sp && sp->isRunning()) { + if (!sp->pending_fec_pace_) return; + + if (sp->paced_fec_packets_.size() > 0) { + sp->producePktInternal(std::move(sp->paced_fec_packets_.front().second), + sp->flow_name_, true); + sp->paced_fec_packets_.pop(); + } + sp->pending_fec_pace_ = false; + sp->postponeFecPacket(); + } + }); +} + void RTCProductionProtocol::onInterest(Interest &interest) { if (*on_interest_input_) { on_interest_input_->operator()(*socket_->getInterface(), interest); @@ -284,7 +550,7 @@ void RTCProductionProtocol::onInterest(Interest &interest) { auto suffix = interest.firstSuffix(); // numberOfSuffixes returns only the prefixes in the payalod - // we add + 1 to count anche the seq in the name + // we add + 1 to count also the seq in the name auto n_suffixes = interest.numberOfSuffixes() + 1; Name name = interest.getName(); bool prev_consumer_state = consumer_in_sync_; @@ -293,6 +559,7 @@ void RTCProductionProtocol::onInterest(Interest &interest) { if (i > 0) { name.setSuffix(*(suffix + (i - 1))); } + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received interest " << name; const std::shared_ptr<ContentObject> content_object = @@ -312,7 +579,7 @@ void RTCProductionProtocol::onInterest(Interest &interest) { DLOG_IF(INFO, VLOG_IS_ON(3)) << "Send content %u (onInterest) " << content_object->getName(); content_object->setPathLabel(cache_label_); - portal_->sendContentObject(*content_object); + sendContentObject(content_object); } else { if (*on_interest_process_) { on_interest_process_->operator()(*socket_->getInterface(), interest); @@ -332,14 +599,19 @@ void RTCProductionProtocol::processInterest(uint32_t interest_seg, consumer_in_sync_ = false; } - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + uint64_t now = utils::SteadyTime::nowMs().count(); - if (interest_seg > rtc::MIN_PROBE_SEQ) { - DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received probe " << interest_seg; - sendNack(interest_seg); - return; + switch (rtc::ProbeHandler::getProbeType(interest_seg)) { + case rtc::ProbeType::INIT: + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received init probe " << interest_seg; + sendManifestProbe(interest_seg); + return; + case rtc::ProbeType::RTT: + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received RTT probe " << interest_seg; + sendNack(interest_seg); + return; + default: + break; } // if the production rate 0 use delayed nacks @@ -349,7 +621,7 @@ void RTCProductionProtocol::processInterest(uint32_t interest_seg, next_timer = timers_map_.begin()->first; } - uint64_t expiration = now + rtc::SENTINEL_TIMER_INTERVAL; + uint64_t expiration = now + rtc::NACK_DELAY; addToInterestQueue(interest_seg, expiration); // here we have at least one interest in the queue, we need to start or @@ -369,8 +641,8 @@ void RTCProductionProtocol::processInterest(uint32_t interest_seg, } if (queue_timer_on_) { - // the producer is producing. Send nacks to packets that will expire before - // the data production and remove the timer + // the producer is producing. Send nacks to packets that will expire + // before the data production and remove the timer queue_timer_on_ = false; interests_queue_timer_->cancel(); sendNacksForPendingInterests(); @@ -387,8 +659,8 @@ void RTCProductionProtocol::processInterest(uint32_t interest_seg, sendNack(interest_seg); } else { if (!consumer_in_sync_ && on_consumer_in_sync_) { - // we consider the remote consumer to be in sync as soon as it covers 70% - // of the production window with interests + // we consider the remote consumer to be in sync as soon as it covers + // 70% of the production window with interests uint32_t perc = ceil((double)max_gap * 0.7); if (interest_seg > (perc + current_seg_)) { consumer_in_sync_ = true; @@ -401,13 +673,18 @@ void RTCProductionProtocol::processInterest(uint32_t interest_seg, } } -void RTCProductionProtocol::onError(std::error_code ec) {} - void RTCProductionProtocol::scheduleQueueTimer(uint64_t wait) { interests_queue_timer_->expires_from_now(std::chrono::milliseconds(wait)); - interests_queue_timer_->async_wait([this](std::error_code ec) { - if (ec) return; - interestQueueTimer(); + std::weak_ptr<RTCProductionProtocol> self = shared_from_this(); + interests_queue_timer_->async_wait([self](const std::error_code &ec) { + if (ec) { + return; + } + + auto sp = self.lock(); + if (sp && sp->isRunning()) { + sp->interestQueueTimer(); + } }); } @@ -450,9 +727,7 @@ void RTCProductionProtocol::sendNacksForPendingInterests() { if (packets_production_rate_ != 0) packet_gap = ceil(rtc::MILLI_IN_A_SEC / (double)packets_production_rate_); - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + uint64_t now = utils::SteadyTime::nowMs().count(); for (auto it = seqs_map_.begin(); it != seqs_map_.end(); it++) { if (it->first > current_seg_) { @@ -486,9 +761,7 @@ void RTCProductionProtocol::removeFromInterestQueue(uint32_t interest_seg) { } void RTCProductionProtocol::interestQueueTimer() { - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + uint64_t now = utils::SteadyTime::nowMs().count(); for (auto it_timers = timers_map_.begin(); it_timers != timers_map_.end();) { uint64_t expire = it_timers->first; @@ -511,20 +784,37 @@ void RTCProductionProtocol::interestQueueTimer() { } } +void RTCProductionProtocol::sendManifestProbe(uint32_t sequence) { + Name manifest_name(flow_name_); + manifest_name.setSuffix(sequence); + + std::shared_ptr<core::ContentObjectManifest> manifest_probe = + createManifest(manifest_name); + + manifest_probe->setLifetime(0); + manifest_probe->setPathLabel(prod_label_); + manifest_probe->encode(); + + if (*on_content_object_output_) { + on_content_object_output_->operator()(*socket_->getInterface(), + *manifest_probe); + } + + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Send init probe " << sequence; + sendContentObject(manifest_probe, true, false); +} + void RTCProductionProtocol::sendNack(uint32_t sequence) { auto nack = core::PacketManager<>::getInstance().getPacket<ContentObject>( - signer_ ? Format::HF_INET6_TCP_AH : Format::HF_INET6_TCP, - signer_ ? signer_->getSignatureFieldSize() : 0); - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + nack_header_format_.first, nack_header_format_.second); + uint64_t now = utils::SteadyTime::nowMs().count(); uint32_t next_packet = current_seg_; uint32_t prod_rate = bytes_production_rate_; struct rtc::nack_packet_t header; header.setTimestamp(now); header.setProductionRate(prod_rate); - header.setProductionSegement(next_packet); + header.setProductionSegment(next_packet); nack->appendPayload((const uint8_t *)&header, rtc::NACK_HEADER_SIZE); Name n(flow_name_); @@ -533,14 +823,16 @@ void RTCProductionProtocol::sendNack(uint32_t sequence) { nack->setLifetime(0); nack->setPathLabel(prod_label_); - if (signer_) { - signer_->signPacket(nack.get()); - } - if (!consumer_in_sync_ && on_consumer_in_sync_ && - sequence < rtc::MIN_PROBE_SEQ && sequence > next_packet) { + rtc::ProbeHandler::getProbeType(sequence) == rtc::ProbeType::NOT_PROBE && + sequence > next_packet) { consumer_in_sync_ = true; - auto interest = core::PacketManager<>::getInstance().getPacket<Interest>(); + Packet::Format format; + socket_->getSocketOption(interface::GeneralTransportOptions::PACKET_FORMAT, + format); + + auto interest = + core::PacketManager<>::getInstance().getPacket<Interest>(format); interest->setName(n); on_consumer_in_sync_(*socket_->getInterface(), *interest); } @@ -550,16 +842,37 @@ void RTCProductionProtocol::sendNack(uint32_t sequence) { } DLOG_IF(INFO, VLOG_IS_ON(3)) << "Send nack " << sequence; - portal_->sendContentObject(*nack); + sendContentObject(nack, true, false); } -void RTCProductionProtocol::onFecPackets( - std::vector<std::pair<uint32_t, fec::buffer>> &packets) { +void RTCProductionProtocol::sendContentObject( + std::shared_ptr<ContentObject> content_object, bool nack, bool fec) { + bool is_ah = _is_ah(content_object->getFormat()); + + // Compute signature + if (is_ah) { + signer_->signPacket(content_object.get()); + } + + portal_->sendContentObject(*content_object); + + // Compute and save data packet digest + if (making_manifest_ && !is_ah) { + auth::CryptoHashType hash_algo; + socket_->getSocketOption(interface::GeneralTransportOptions::HASH_ALGORITHM, + hash_algo); + manifest_entries_.push({content_object->getName().getSuffix(), + content_object->computeDigest(hash_algo)}); + } +} + +void RTCProductionProtocol::onFecPackets(fec::BufferArray &packets) { DLOG_IF(INFO, VLOG_IS_ON(3)) << "Produced " << packets.size() << " FEC packets"; + for (auto &packet : packets) { auto content_object = - std::static_pointer_cast<ContentObject>(packet.second); + std::static_pointer_cast<ContentObject>(packet.getBuffer()); content_object->prepend(content_object->headerSize() + rtc::DATA_HEADER_SIZE); pending_fec_packets_.push(std::move(content_object)); @@ -569,19 +882,21 @@ void RTCProductionProtocol::onFecPackets( fec::buffer RTCProductionProtocol::getBuffer(std::size_t size) { DLOG_IF(INFO, VLOG_IS_ON(3)) << "Asked buffer for FEC symbol of size " << size; + auto ret = core::PacketManager<>::getInstance().getPacket<ContentObject>( - signer_ ? Format::HF_INET6_TCP_AH : Format::HF_INET6_TCP, - signer_ ? signer_->getSignatureFieldSize() : 0); + fec_header_format_.first, fec_header_format_.second); + ret->updateLength(rtc::DATA_HEADER_SIZE + size); ret->append(rtc::DATA_HEADER_SIZE + size); ret->trimStart(ret->headerSize() + rtc::DATA_HEADER_SIZE); + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Responding with buffer of length " << ret->length(); - assert(ret->length() >= size); + DCHECK(ret->length() >= size); return ret; } } // namespace protocol -} // end namespace transport +} // namespace transport |