From 6b94663b2455e212009a544ae23bb6a8c55407f8 Mon Sep 17 00:00:00 2001 From: Luca Muscariello Date: Thu, 9 Jun 2022 21:34:09 +0200 Subject: refactor(lib, hicn-light, vpp, hiperf): HICN-723 - move infra data structure into the shared lib - new packet cache using double hashing and lookup on prefix suffix - testing updates - authenticated requests using interest manifests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Mauro Sardara Co-authored-by: Jordan Augé Co-authored-by: Michele Papalini Co-authored-by: Olivier Roques Co-authored-by: Enrico Loparco Change-Id: Iaddebfe6aa5279ea8553433b0f519578f6b9ccd9 Signed-off-by: Luca Muscariello --- libtransport/src/protocols/rtc/rtc.cc | 125 +++++++++++++++++++--------------- 1 file changed, 71 insertions(+), 54 deletions(-) (limited to 'libtransport/src/protocols/rtc/rtc.cc') diff --git a/libtransport/src/protocols/rtc/rtc.cc b/libtransport/src/protocols/rtc/rtc.cc index d2682edfa..9a56269f3 100644 --- a/libtransport/src/protocols/rtc/rtc.cc +++ b/libtransport/src/protocols/rtc/rtc.cc @@ -38,6 +38,7 @@ RTCTransportProtocol::RTCTransportProtocol( implementation::ConsumerSocket *icn_socket) : TransportProtocol(icn_socket, new RtcIndexer<>(icn_socket, this), new RtcReassembly(icn_socket, this)), + max_aggregated_interest_(1), number_(0) { icn_socket->getSocketOption(PORTAL, portal_); round_timer_ = @@ -55,9 +56,9 @@ void RTCTransportProtocol::resume() { TransportProtocol::resume(); } -std::size_t RTCTransportProtocol::transportHeaderLength() { +std::size_t RTCTransportProtocol::transportHeaderLength(bool isFEC) { return DATA_HEADER_SIZE + - (fec_decoder_ != nullptr ? fec_decoder_->getFecHeaderSize() : 0); + (fec_decoder_ != nullptr ? fec_decoder_->getFecHeaderSize(isFEC) : 0); } // private @@ -75,13 +76,13 @@ void RTCTransportProtocol::initParams() { std::shared_ptr verifier; socket_->getSocketOption(GeneralTransportOptions::VERIFIER, verifier); - uint32_t unverified_interval; - socket_->getSocketOption(GeneralTransportOptions::UNVERIFIED_INTERVAL, - unverified_interval); + uint32_t factor_relevant; + socket_->getSocketOption(GeneralTransportOptions::MANIFEST_FACTOR_RELEVANT, + factor_relevant); - double unverified_ratio; - socket_->getSocketOption(GeneralTransportOptions::UNVERIFIED_RATIO, - unverified_ratio); + uint32_t factor_alert; + socket_->getSocketOption(GeneralTransportOptions::MANIFEST_FACTOR_ALERT, + factor_alert); rc_ = std::make_shared(); ldr_ = std::make_shared( @@ -100,8 +101,8 @@ void RTCTransportProtocol::initParams() { } }); - verifier_ = std::make_shared(verifier, unverified_interval, - unverified_ratio); + verifier_ = + std::make_shared(verifier, factor_relevant, factor_alert); state_ = std::make_shared( indexer_verifier_.get(), @@ -138,19 +139,20 @@ void RTCTransportProtocol::initParams() { last_interest_sent_time_ = 0; last_interest_sent_seq_ = 0; -#if 0 - if(portal_->isConnectedToFwd()){ - max_aggregated_interest_ = 1; - }else{ - max_aggregated_interest_ = MAX_INTERESTS_IN_BATCH; - } -#else - max_aggregated_interest_ = 1; - if (const char *max_aggr = std::getenv("MAX_AGGREGATED_INTERESTS")) { - LOG(INFO) << "Max Aggregated: " << max_aggr; - max_aggregated_interest_ = std::stoul(std::string(max_aggr)); + // Aggregated interests setup + bool aggregated_interests_on; + socket_->getSocketOption(RtcTransportOptions::AGGREGATED_INTERESTS, + aggregated_interests_on); + if (aggregated_interests_on) { + if (const char *max_aggr = std::getenv("MAX_AGGREGATED_INTERESTS")) + max_aggregated_interest_ = (uint32_t)std::stoul(std::string(max_aggr)); + else + max_aggregated_interest_ = MAX_INTERESTS_IN_BATCH; + + max_aggregated_interest_ = std::min(max_aggregated_interest_, + 1 + MAX_SUFFIXES_IN_MANIFEST); } -#endif + LOG(INFO) << "Max Aggregated: " << max_aggregated_interest_; max_sent_int_ = std::ceil((double)MAX_PACING_BATCH / (double)max_aggregated_interest_); @@ -263,6 +265,11 @@ void RTCTransportProtocol::discoveredRtt() { socket_->getSocketOption(RtcTransportOptions::RECOVERY_STRATEGY, strategy); ldr_->changeRecoveryStrategy( (interface::RtcTransportRecoveryStrategies)strategy); + + bool content_sharing_mode; + socket_->getSocketOption(RtcTransportOptions::CONTENT_SHARING_MODE, + content_sharing_mode); + if (content_sharing_mode) ldr_->setContentSharingMode(); ldr_->turnOnRecovery(); ldr_->onNewRound(false); @@ -270,22 +277,9 @@ void RTCTransportProtocol::discoveredRtt() { Name *name = nullptr; socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name); Prefix prefix(*name, 128); - if ((interface::RtcTransportRecoveryStrategies)strategy == - interface::RtcTransportRecoveryStrategies::LOW_RATE_AND_BESTPATH) { - fwd_strategy_.initFwdStrategy(portal_, prefix, state_.get(), - RTCForwardingStrategy::BEST_PATH); - } else if ((interface::RtcTransportRecoveryStrategies)strategy == - interface::RtcTransportRecoveryStrategies:: - LOW_RATE_AND_REPLICATION) { - fwd_strategy_.initFwdStrategy(portal_, prefix, state_.get(), - RTCForwardingStrategy::REPLICATION); - } else if ((interface::RtcTransportRecoveryStrategies)strategy == - interface::RtcTransportRecoveryStrategies:: - LOW_RATE_AND_ALL_FWD_STRATEGIES) { - fwd_strategy_.initFwdStrategy(portal_, prefix, state_.get(), - RTCForwardingStrategy::BOTH); - } - + fwd_strategy_.initFwdStrategy( + portal_, prefix, state_.get(), + (interface::RtcTransportRecoveryStrategies)strategy); updateSyncWindow(); } @@ -302,6 +296,12 @@ void RTCTransportProtocol::computeMaxSyncWindow() { return; } + bool content_sharing_mode; + socket_->getSocketOption(RtcTransportOptions::CONTENT_SHARING_MODE, + content_sharing_mode); + if (content_sharing_mode && (production_rate < MIN_PROD_RATE_SHARING_MODE)) + production_rate = MIN_PROD_RATE_SHARING_MODE; + production_rate += (production_rate * indexer_verifier_->getMaxFecOverhead()); uint32_t lifetime = default_values::interest_lifetime; @@ -330,6 +330,11 @@ void RTCTransportProtocol::updateSyncWindow() { double prod_rate = state_->getProducerRate(); double rtt = (double)state_->getMinRTT() / MILLI_IN_A_SEC; double packet_size = state_->getAveragePacketSize(); + bool content_sharing_mode; + socket_->getSocketOption(RtcTransportOptions::CONTENT_SHARING_MODE, + content_sharing_mode); + if (content_sharing_mode && (prod_rate < MIN_PROD_RATE_SHARING_MODE)) + prod_rate = MIN_PROD_RATE_SHARING_MODE; // if some of the info are not available do not update the current win if (prod_rate != 0.0 && rtt != 0.0 && packet_size != 0.0) { @@ -385,6 +390,19 @@ void RTCTransportProtocol::sendProbeInterest(uint32_t seq) { sendInterest(*interest_name); } +void RTCTransportProtocol::sendInterestForTimeout(uint32_t seq) { + if (!isRunning() && !is_first_) return; + + Name *interest_name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, + &interest_name); + + // we got a timeout for this packet so it is not pending anymore + interest_name->setSuffix(seq); + state_->onSendNewInterest(interest_name); + sendInterest(*interest_name); +} + void RTCTransportProtocol::scheduleNextInterests() { DLOG_IF(INFO, VLOG_IS_ON(3)) << "Schedule next interests"; @@ -475,9 +493,9 @@ void RTCTransportProtocol::scheduleNextInterests() { } // skip received packets - if (indexer_verifier_->checkNextSuffix() <= - state_->getHighestSeqReceivedInOrder()) { - indexer_verifier_->jumpToIndex(state_->getHighestSeqReceivedInOrder() + 1); + uint32_t max_received = state_->getHighestSeqReceivedInOrder(); + if (indexer_verifier_->checkNextSuffix() <= max_received) { + indexer_verifier_->jumpToIndex(max_received + 1); } uint32_t sent_interests = 0; @@ -495,7 +513,6 @@ void RTCTransportProtocol::scheduleNextInterests() { << "In while loop. Window size: " << current_sync_win_; uint32_t next_seg = indexer_verifier_->getNextSuffix(); - name->setSuffix(next_seg); // send the packet only if: @@ -586,7 +603,6 @@ void RTCTransportProtocol::onInterestTimeout(Interest::Ptr &interest, } timeouts_or_nacks_.insert(segment_number); - if (TRANSPORT_EXPECT_TRUE(state_->isProducerActive()) && segment_number <= state_->getHighestSeqReceived()) { // we retransmit packets only if the producer is active, otherwise we @@ -627,11 +643,11 @@ void RTCTransportProtocol::onInterestTimeout(Interest::Ptr &interest, << "On timeout next seg = " << indexer_verifier_->checkNextSuffix() << ", jump to " << segment_number; // add an extra space in the window - current_sync_win_++; indexer_verifier_->jumpToIndex(segment_number); } state_->onTimeout(segment_number, false); + sendInterestForTimeout(segment_number); scheduleNextInterests(); } @@ -672,7 +688,6 @@ void RTCTransportProtocol::onNack(const ContentObject &content_object) { if (tn_it != timeouts_or_nacks_.end()) timeouts_or_nacks_.erase(tn_it); state_->onJumpForward(production_seg); - verifier_->onJumpForward(production_seg); // the client is asking for content in the past // switch to catch up state and increase the window // this is true only if the packet is not an RTX @@ -821,7 +836,8 @@ void RTCTransportProtocol::onContentObjectReceived( // Check if the packet is a retransmission if (ldr_->isRtx(segment_number) && state != PacketState::RECEIVED) { if (is_data || is_manifest) { - state_->onPacketRecoveredRtx(segment_number); + uint64_t rtt = ldr_->getRtxRtt(segment_number); + state_->onPacketRecoveredRtx(content_object, rtt); if (*on_content_object_input_) { (*on_content_object_input_)(*socket_->getInterface(), content_object); @@ -842,7 +858,7 @@ void RTCTransportProtocol::onContentObjectReceived( } if (is_fec) { - state_->onFecPacketRecoveredRtx(segment_number); + state_->onFecPacketRecoveredRtx(content_object); } } @@ -920,7 +936,7 @@ void RTCTransportProtocol::sendStatsToApp( stats_->updateAverageWindowSize(state_->getPendingInterestNumber()); stats_->updateLossRatio(state_->getPerSecondLossRate()); uint64_t rtt = state_->getAvgRTT(); - stats_->updateAverageRtt(utils::SteadyTime::Milliseconds(rtt)); + stats_->updateAverageRtt(utils::SteadyTime::Microseconds(rtt * 1000)); stats_->updateQueuingDelay(state_->getQueuing()); stats_->updateLostData(lost_data); @@ -960,9 +976,10 @@ void RTCTransportProtocol::decodePacket(ContentObject &content_object, DLOG_IF(INFO, VLOG_IS_ON(4)) << "Send packet " << content_object.getName() << " to FEC decoder"; - uint32_t offset = is_manifest - ? content_object.headerSize() - : content_object.headerSize() + rtc::DATA_HEADER_SIZE; + uint32_t offset = + is_manifest + ? (uint32_t)content_object.headerSize() + : (uint32_t)(content_object.headerSize() + rtc::DATA_HEADER_SIZE); uint32_t metadata = static_cast(content_object.getPayloadType()); fec_decoder_->onDataPacket(content_object, offset, metadata); @@ -1016,7 +1033,7 @@ void RTCTransportProtocol::onFecPackets(fec::BufferArray &packets) { processManifest(*interest, *content_object); } - state_->onPacketRecoveredFec(seq_number, buffer->length()); + state_->onPacketRecoveredFec(seq_number, (uint32_t)buffer->length()); ldr_->onPacketRecoveredFec(seq_number); if (payload_type == PayloadType::DATA) { @@ -1038,11 +1055,11 @@ void RTCTransportProtocol::processManifest(Interest &interest, ContentObject::Ptr RTCTransportProtocol::removeFecHeader( const ContentObject &content_object) { - if (!fec_decoder_ || !fec_decoder_->getFecHeaderSize()) { + if (!fec_decoder_ || !fec_decoder_->getFecHeaderSize(false)) { return nullptr; } - size_t fec_header_size = fec_decoder_->getFecHeaderSize(); + size_t fec_header_size = fec_decoder_->getFecHeaderSize(false); const uint8_t *payload = content_object.data() + content_object.headerSize() + fec_header_size; size_t payload_size = content_object.payloadSize() - fec_header_size; -- cgit 1.2.3-korg