diff options
Diffstat (limited to 'libtransport/src/protocols/prod_protocol_rtc.cc')
-rw-r--r-- | libtransport/src/protocols/prod_protocol_rtc.cc | 426 |
1 files changed, 121 insertions, 305 deletions
diff --git a/libtransport/src/protocols/prod_protocol_rtc.cc b/libtransport/src/protocols/prod_protocol_rtc.cc index e49f58167..cb8dff6e4 100644 --- a/libtransport/src/protocols/prod_protocol_rtc.cc +++ b/libtransport/src/protocols/prod_protocol_rtc.cc @@ -43,9 +43,6 @@ RTCProductionProtocol::RTCProductionProtocol( last_produced_data_ts_(0), last_round_(utils::SteadyTime::nowMs().count()), allow_delayed_nacks_(false), - queue_timer_on_(false), - consumer_in_sync_(false), - on_consumer_in_sync_(nullptr), pending_fec_pace_(false), max_len_(0), queue_len_(0), @@ -54,8 +51,6 @@ RTCProductionProtocol::RTCProductionProtocol( std::uniform_int_distribution<> dis(0, 255); prod_label_ = dis(gen_); cache_label_ = (prod_label_ + 1) % 256; - interests_queue_timer_ = - std::make_unique<asio::steady_timer>(portal_->getThread().getIoService()); round_timer_ = std::make_unique<asio::steady_timer>(portal_->getThread().getIoService()); fec_pacing_timer_ = @@ -69,16 +64,7 @@ RTCProductionProtocol::~RTCProductionProtocol() {} void RTCProductionProtocol::setProducerParam() { // Flow name: here we assume there is only one prefix registered in the portal - flow_name_ = portal_->getServedNamespaces().begin()->getName(); - - // Manifest - uint32_t making_manifest; - socket_->getSocketOption(interface::GeneralTransportOptions::MAKE_MANIFEST, - making_manifest); - - // Signer - std::shared_ptr<auth::Signer> signer; - socket_->getSocketOption(interface::GeneralTransportOptions::SIGNER, signer); + flow_name_ = portal_->getServedNamespaces().begin()->makeName(); // Default format core::Packet::Format default_format; @@ -94,15 +80,22 @@ void RTCProductionProtocol::setProducerParam() { socket_->getSocketOption(interface::RtcTransportOptions::AGGREGATED_DATA, data_aggregation_); - size_t signature_size = signer->getSignatureFieldSize(); - data_header_format_ = { - !making_manifest ? Packet::toAHFormat(default_format) : default_format, - !making_manifest ? signature_size : 0}; + size_t signature_size = signer_->getSignatureFieldSize(); + data_header_format_ = {!manifest_max_capacity_ + ? Packet::toAHFormat(default_format) + : default_format, + !manifest_max_capacity_ ? signature_size : 0}; manifest_header_format_ = {Packet::toAHFormat(default_format), signature_size}; nack_header_format_ = {Packet::toAHFormat(default_format), signature_size}; fec_header_format_ = {Packet::toAHFormat(default_format), signature_size}; + // Initialize verifier for aggregated interests + std::shared_ptr<auth::Verifier> verifier; + socket_->getSocketOption(implementation::GeneralTransportOptions::VERIFIER, + verifier); + verifier_ = std::make_shared<rtc::RTCVerifier>(verifier, 0, 0); + // Schedule round timer scheduleRoundTimer(); } @@ -143,15 +136,17 @@ void RTCProductionProtocol::updateStats(bool new_round) { packets_production_rate_ = ceil((double)(produced_packets_ + prev_produced_packets_) * per_second); - // add fec packets looking at the fec code. we don't use directly the number - // of fec packets produced in 1 round because it may happen that different - // numbers of blocks are generated during the rounds and this creates - // inconsistencies in the estimation of the production rate - uint32_t k = fec::FECUtils::getSourceSymbols(fec_type_); - uint32_t n = fec::FECUtils::getBlockSymbols(fec_type_); + if (fec_encoder_ && fec_type_ != fec::FECType::UNKNOWN) { + // add fec packets looking at the fec code. we don't use directly the number + // of fec packets produced in 1 round because it may happen that different + // numbers of blocks are generated during the rounds and this creates + // inconsistencies in the estimation of the production rate + uint32_t k = fec::FECUtils::getSourceSymbols(fec_type_); + uint32_t n = fec::FECUtils::getBlockSymbols(fec_type_); - packets_production_rate_ += - ceil((double)packets_production_rate_ / (double)k) * (n - k); + packets_production_rate_ += + ceil((double)packets_production_rate_ / (double)k) * (n - k); + } // update the production rate as soon as it increases by 10% with respect to // the last round @@ -168,11 +163,6 @@ void RTCProductionProtocol::updateStats(bool new_round) { allow_delayed_nacks_ = false; } - // check if the production rate is decreased. if yes send nacks if needed - if (prev_packets_production_rate < packets_production_rate_) { - sendNacksForPendingInterests(); - } - if (new_round) { prev_produced_bytes_ = produced_bytes_; prev_produced_packets_ = produced_packets_; @@ -203,16 +193,25 @@ void RTCProductionProtocol::produce(ContentObject &content_object) { uint32_t RTCProductionProtocol::produceDatagram( const Name &content_name, std::unique_ptr<utils::MemBuf> &&buffer) { std::size_t buffer_size = buffer->length(); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Maybe Sending content object: " << content_name; + if (TRANSPORT_EXPECT_FALSE(buffer_size == 0)) return 0; + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Sending content object: " << content_name; + uint32_t data_packet_size; socket_->getSocketOption(interface::GeneralTransportOptions::DATA_PACKET_SIZE, data_packet_size); - - if (TRANSPORT_EXPECT_FALSE( - (Packet::getHeaderSizeFromFormat(data_header_format_.first, - data_header_format_.second) + - rtc::DATA_HEADER_SIZE + buffer_size) > data_packet_size)) { + // this is a source packet but we check the fec header size of FEC packet in + // order to leave room for the header when FEC packets will be generated + uint32_t fec_header = 0; + if (fec_encoder_) fec_encoder_->getFecHeaderSize(true); + uint32_t headers_size = + (uint32_t)Packet::getHeaderSizeFromFormat(data_header_format_.first, + data_header_format_.second) + + rtc::DATA_HEADER_SIZE + fec_header; + if (TRANSPORT_EXPECT_FALSE((headers_size + buffer_size) > data_packet_size)) { return 0; } @@ -338,47 +337,42 @@ void RTCProductionProtocol::emptyQueue() { } void RTCProductionProtocol::sendManifest(const Name &name) { - if (!making_manifest_) { + if (!manifest_max_capacity_) { return; } - Name manifest_name(name); - - uint32_t data_packet_size; - socket_->getSocketOption(interface::GeneralTransportOptions::DATA_PACKET_SIZE, - data_packet_size); - - // The maximum number of entries a manifest can hold - uint32_t manifest_capacity = making_manifest_; + Name manifest_name = name; // If there is not enough hashes to fill a manifest, return early - if (manifest_entries_.size() < manifest_capacity) { + if (manifest_entries_.size() < manifest_max_capacity_) { return; } // Create a new manifest std::shared_ptr<core::ContentObjectManifest> manifest = createManifest(manifest_name.setSuffix(current_seg_)); + auto manifest_co = + std::dynamic_pointer_cast<ContentObject>(manifest->getPacket()); // Fill the manifest with packet hashes that were previously saved uint32_t nb_entries; - for (nb_entries = 0; nb_entries < manifest_capacity; ++nb_entries) { + for (nb_entries = 0; nb_entries < manifest_max_capacity_; ++nb_entries) { if (manifest_entries_.empty()) { break; } std::pair<uint32_t, auth::CryptoHash> front = manifest_entries_.front(); - manifest->addSuffixHash(front.first, front.second); + manifest->addEntry(front.first, front.second); manifest_entries_.pop(); } DLOG_IF(INFO, VLOG_IS_ON(3)) - << "Sending manifest " << manifest->getName().getSuffix() << " of size " - << nb_entries; + << "Sending manifest " << manifest_co->getName().getSuffix() + << " of size " << nb_entries; // Encode and send the manifest manifest->encode(); portal_->getThread().tryRunHandlerNow( - [this, content_object{std::move(manifest)}, manifest_name]() mutable { + [this, content_object{std::move(manifest_co)}, manifest_name]() mutable { produceInternal(std::move(content_object), manifest_name); }); } @@ -394,11 +388,12 @@ RTCProductionProtocol::createManifest(const Name &content_name) const { uint64_t now = utils::SteadyTime::nowMs().count(); // Create a new manifest - std::shared_ptr<core::ContentObjectManifest> manifest( - ContentObjectManifest::createManifest( - manifest_header_format_.first, name, core::ManifestVersion::VERSION_1, - core::ManifestType::INLINE_MANIFEST, false, name, hash_algo, - manifest_header_format_.second)); + std::shared_ptr<core::ContentObjectManifest> manifest = + ContentObjectManifest::createContentManifest( + manifest_header_format_.first, name, manifest_header_format_.second); + manifest->setHeaders(core::ManifestType::INLINE_MANIFEST, + manifest_max_capacity_, hash_algo, false /* is_last */, + name); // Set connection parameters manifest->setParamsRTC(ParamsRTC{ @@ -444,7 +439,15 @@ void RTCProductionProtocol::producePktInternal( // set hicn stuff Name n(content_name); content_object->setName(n.setSuffix(current_seg_)); - content_object->setLifetime(500); // XXX this should be set by the APP + + uint32_t expiry_time = 0; + socket_->getSocketOption( + interface::GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME, + expiry_time); + if (expiry_time == interface::default_values::content_object_expiry_time) + expiry_time = 500; // the data expiration time should be set by the App. if + // the App does not specify it the default is 500ms + content_object->setLifetime(expiry_time); content_object->setPathLabel(prod_label_); // update stats @@ -466,9 +469,9 @@ void RTCProductionProtocol::producePktInternal( // pass packet to FEC encoder if (fec_encoder_ && !fec) { - uint32_t offset = - is_manifest ? content_object->headerSize() - : content_object->headerSize() + rtc::DATA_HEADER_SIZE; + uint32_t offset = is_manifest ? (uint32_t)content_object->headerSize() + : (uint32_t)content_object->headerSize() + + rtc::DATA_HEADER_SIZE; uint32_t metadata = static_cast<uint32_t>(content_object->getPayloadType()); fec_encoder_->onPacketProduced(*content_object, offset, metadata); @@ -481,19 +484,14 @@ void RTCProductionProtocol::producePktInternal( *content_object); } - auto seq_it = seqs_map_.find(current_seg_); - if (seq_it != seqs_map_.end()) { - sendContentObject(content_object, false, fec); - } + // TODO we may want to send FEC only if an interest is pending in the pit in + sendContentObject(content_object, false, fec); if (*on_content_object_output_) { on_content_object_output_->operator()(*socket_->getInterface(), *content_object); } - // remove interests from the interest cache if it exists - removeFromInterestQueue(current_seg_); - if (!fec) last_produced_data_ts_ = now; // Update current segment @@ -563,59 +561,65 @@ void RTCProductionProtocol::onInterest(Interest &interest) { on_interest_input_->operator()(*socket_->getInterface(), interest); } - auto suffix = interest.firstSuffix(); - // numberOfSuffixes returns only the prefixes in the payalod - // we add + 1 to count also the seq in the name - auto n_suffixes = interest.numberOfSuffixes() + 1; - Name name = interest.getName(); - bool prev_consumer_state = consumer_in_sync_; - - for (uint32_t i = 0; i < n_suffixes; i++) { - if (i > 0) { - name.setSuffix(*(suffix + (i - 1))); - } + if (!interest.isValid()) throw std::runtime_error("Bad interest format"); + if (interest.hasManifest() && + verifier_->verify(interest) != auth::VerificationPolicy::ACCEPT) + throw std::runtime_error("Interset manifest verification failed"); - DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received interest " << name; + uint32_t *suffix = interest.firstSuffix(); + uint32_t n_suffixes_in_manifest = interest.numberOfSuffixes(); + uint32_t *request_bitmap = interest.getRequestBitmap(); - const std::shared_ptr<ContentObject> content_object = - output_buffer_.find(name); + Name name = interest.getName(); + uint32_t pos = 0; // Position of current suffix in manifest - if (content_object) { - if (*on_interest_satisfied_output_buffer_) { - on_interest_satisfied_output_buffer_->operator()( - *socket_->getInterface(), interest); - } + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Received interest " << name << " (" << n_suffixes_in_manifest + << " suffixes in manifest)"; + + // Process the suffix in the interest header + // (first loop iteration), then suffixes in the manifest + do { + if (!interest.hasManifest() || is_bit_set(request_bitmap, pos)) { + const std::shared_ptr<ContentObject> content_object = + output_buffer_.find(name); + + if (content_object) { + if (*on_interest_satisfied_output_buffer_) { + on_interest_satisfied_output_buffer_->operator()( + *socket_->getInterface(), interest); + } - if (*on_content_object_output_) { - on_content_object_output_->operator()(*socket_->getInterface(), - *content_object); - } + if (*on_content_object_output_) { + on_content_object_output_->operator()(*socket_->getInterface(), + *content_object); + } - DLOG_IF(INFO, VLOG_IS_ON(3)) - << "Send content %u (onInterest) " << content_object->getName(); - content_object->setPathLabel(cache_label_); - sendContentObject(content_object); - } else { - if (*on_interest_process_) { - on_interest_process_->operator()(*socket_->getInterface(), interest); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Send content %u (onInterest) " << content_object->getName(); + content_object->setPathLabel(cache_label_); + sendContentObject(content_object); + } else { + if (*on_interest_process_) { + on_interest_process_->operator()(*socket_->getInterface(), interest); + } + processInterest(name.getSuffix(), interest.getLifetime()); } - processInterest(name.getSuffix(), interest.getLifetime()); } - } - if (prev_consumer_state != consumer_in_sync_ && consumer_in_sync_) - on_consumer_in_sync_(*socket_->getInterface(), interest); + // Retrieve next suffix in the manifest + if (interest.hasManifest()) { + uint32_t seq = *suffix; + suffix++; + + name.setSuffix(seq); + interest.setName(name); + } + } while (pos++ < n_suffixes_in_manifest); } void RTCProductionProtocol::processInterest(uint32_t interest_seg, uint32_t lifetime) { - if (interest_seg == 0) { - // first packet from the consumer, reset sync state - consumer_in_sync_ = false; - } - - uint64_t now = utils::SteadyTime::nowMs().count(); - switch (rtc::ProbeHandler::getProbeType(interest_seg)) { case rtc::ProbeType::INIT: DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received init probe " << interest_seg; @@ -629,183 +633,7 @@ void RTCProductionProtocol::processInterest(uint32_t interest_seg, break; } - // if the production rate 0 use delayed nacks - if (allow_delayed_nacks_ && interest_seg >= current_seg_) { - uint64_t next_timer = UINT64_MAX; - if (!timers_map_.empty()) { - next_timer = timers_map_.begin()->first; - } - - uint64_t expiration = now + rtc::NACK_DELAY; - addToInterestQueue(interest_seg, expiration); - - // here we have at least one interest in the queue, we need to start or - // update the timer - if (!queue_timer_on_) { - // set timeout - queue_timer_on_ = true; - scheduleQueueTimer(timers_map_.begin()->first - now); - } else { - // re-schedule the timer because a new interest will expires sooner - if (next_timer > timers_map_.begin()->first) { - interests_queue_timer_->cancel(); - scheduleQueueTimer(timers_map_.begin()->first - now); - } - } - return; - } - - if (queue_timer_on_) { - // the producer is producing. Send nacks to packets that will expire - // before the data production and remove the timer - queue_timer_on_ = false; - interests_queue_timer_->cancel(); - sendNacksForPendingInterests(); - } - - uint32_t max_gap = (uint32_t)floor( - (double)((double)((double)lifetime * - rtc::INTEREST_LIFETIME_REDUCTION_FACTOR / - rtc::MILLI_IN_A_SEC) * - (double)(packets_production_rate_))); - - if (interest_seg < current_seg_ || interest_seg > (max_gap + current_seg_)) { - sendNack(interest_seg); - } else { - if (!consumer_in_sync_ && on_consumer_in_sync_) { - // we consider the remote consumer to be in sync as soon as it covers - // 70% of the production window with interests - uint32_t perc = ceil((double)max_gap * 0.7); - if (interest_seg > (perc + current_seg_)) { - consumer_in_sync_ = true; - // on_consumer_in_sync_(*socket_->getInterface(), interest); - } - } - uint64_t expiration = - now + floor((double)lifetime * rtc::INTEREST_LIFETIME_REDUCTION_FACTOR); - addToInterestQueue(interest_seg, expiration); - } -} - -void RTCProductionProtocol::scheduleQueueTimer(uint64_t wait) { - interests_queue_timer_->expires_from_now(std::chrono::milliseconds(wait)); - std::weak_ptr<RTCProductionProtocol> self = shared_from_this(); - interests_queue_timer_->async_wait([self](const std::error_code &ec) { - if (ec) { - return; - } - - auto sp = self.lock(); - if (sp && sp->isRunning()) { - sp->interestQueueTimer(); - } - }); -} - -void RTCProductionProtocol::addToInterestQueue(uint32_t interest_seg, - uint64_t expiration) { - // check if the seq number exists already - auto it_seqs = seqs_map_.find(interest_seg); - if (it_seqs != seqs_map_.end()) { - // the seq already exists - if (expiration < it_seqs->second) { - // we need to update the timer becasue we got a smaller one - // 1) remove the entry from the multimap - // 2) update this entry - auto range = timers_map_.equal_range(it_seqs->second); - for (auto it_timers = range.first; it_timers != range.second; - it_timers++) { - if (it_timers->second == it_seqs->first) { - timers_map_.erase(it_timers); - break; - } - } - timers_map_.insert( - std::pair<uint64_t, uint32_t>(expiration, interest_seg)); - it_seqs->second = expiration; - } else { - // nothing to do here - return; - } - } else { - // add the new seq - timers_map_.insert(std::pair<uint64_t, uint32_t>(expiration, interest_seg)); - seqs_map_.insert(std::pair<uint32_t, uint64_t>(interest_seg, expiration)); - } -} - -void RTCProductionProtocol::sendNacksForPendingInterests() { - std::unordered_set<uint32_t> to_remove; - - uint32_t pps = ceil((double)(packets_production_rate_)*rtc:: - INTEREST_LIFETIME_REDUCTION_FACTOR); - - uint64_t now = utils::SteadyTime::nowMs().count(); - for (auto it = seqs_map_.begin(); it != seqs_map_.end(); it++) { - if (it->first > current_seg_ && it->second > now) { - double exp_time_in_sec = - (double)(it->second - now) / (double)rtc::MILLI_IN_A_SEC; - uint32_t packets_prod_before_expire = ceil((double)pps * exp_time_in_sec); - - if (it->first > (current_seg_ + packets_prod_before_expire)) { - sendNack(it->first); - to_remove.insert(it->first); - } - } else if (TRANSPORT_EXPECT_FALSE(it->first < current_seg_ || - it->second <= now)) { - // this branch should never be execcuted - // first condition: the packet was already prdocued and we have and old - // interest pending. send a nack to notify the consumer if needed. the - // case it->first = current_seg_ is not handled because - // the interest will be satified by the next data packet. - // second condition: the interest is expired. - sendNack(it->first); - to_remove.insert(it->first); - } - } - - // delete nacked interests - for (auto it = to_remove.begin(); it != to_remove.end(); it++) { - removeFromInterestQueue(*it); - } -} - -void RTCProductionProtocol::removeFromInterestQueue(uint32_t interest_seg) { - auto seq_it = seqs_map_.find(interest_seg); - if (seq_it != seqs_map_.end()) { - auto range = timers_map_.equal_range(seq_it->second); - for (auto it_timers = range.first; it_timers != range.second; it_timers++) { - if (it_timers->second == seq_it->first) { - timers_map_.erase(it_timers); - break; - } - } - seqs_map_.erase(seq_it); - } -} - -void RTCProductionProtocol::interestQueueTimer() { - uint64_t now = utils::SteadyTime::nowMs().count(); - - for (auto it_timers = timers_map_.begin(); it_timers != timers_map_.end();) { - uint64_t expire = it_timers->first; - if (expire <= now) { - uint32_t seq = it_timers->second; - sendNack(seq); - // remove the interest from the other map - seqs_map_.erase(seq); - it_timers = timers_map_.erase(it_timers); - } else { - // stop, we are done! - break; - } - } - if (timers_map_.empty()) { - queue_timer_on_ = false; - } else { - queue_timer_on_ = true; - scheduleQueueTimer(timers_map_.begin()->first - now); - } + if (interest_seg < current_seg_) sendNack(interest_seg); } void RTCProductionProtocol::sendManifestProbe(uint32_t sequence) { @@ -814,18 +642,20 @@ void RTCProductionProtocol::sendManifestProbe(uint32_t sequence) { std::shared_ptr<core::ContentObjectManifest> manifest_probe = createManifest(manifest_name); + auto manifest_probe_co = + std::dynamic_pointer_cast<ContentObject>(manifest_probe->getPacket()); - manifest_probe->setLifetime(0); - manifest_probe->setPathLabel(prod_label_); + manifest_probe_co->setLifetime(0); + manifest_probe_co->setPathLabel(prod_label_); manifest_probe->encode(); if (*on_content_object_output_) { on_content_object_output_->operator()(*socket_->getInterface(), - *manifest_probe); + *manifest_probe_co); } DLOG_IF(INFO, VLOG_IS_ON(3)) << "Send init probe " << sequence; - sendContentObject(manifest_probe, true, false); + sendContentObject(manifest_probe_co, true, false); } void RTCProductionProtocol::sendNack(uint32_t sequence) { @@ -847,20 +677,6 @@ void RTCProductionProtocol::sendNack(uint32_t sequence) { nack->setLifetime(0); nack->setPathLabel(prod_label_); - if (!consumer_in_sync_ && on_consumer_in_sync_ && - rtc::ProbeHandler::getProbeType(sequence) == rtc::ProbeType::NOT_PROBE && - sequence > next_packet) { - consumer_in_sync_ = true; - Packet::Format format; - socket_->getSocketOption(interface::GeneralTransportOptions::PACKET_FORMAT, - format); - - auto interest = - core::PacketManager<>::getInstance().getPacket<Interest>(format); - interest->setName(n); - on_consumer_in_sync_(*socket_->getInterface(), *interest); - } - if (*on_content_object_output_) { on_content_object_output_->operator()(*socket_->getInterface(), *nack); } @@ -881,7 +697,7 @@ void RTCProductionProtocol::sendContentObject( portal_->sendContentObject(*content_object); // Compute and save data packet digest - if (making_manifest_ && !is_ah) { + if (manifest_max_capacity_ && !is_ah) { auth::CryptoHashType hash_algo; socket_->getSocketOption(interface::GeneralTransportOptions::HASH_ALGORITHM, hash_algo); |