From 08233d44a6cfde878d7e10bca38ae935ed1c8fd5 Mon Sep 17 00:00:00 2001 From: Mauro Date: Wed, 30 Jun 2021 07:57:22 +0000 Subject: [HICN-713] Transport Library Major Refactoring 2 Co-authored-by: Luca Muscariello Co-authored-by: Michele Papalini Co-authored-by: Olivier Roques Co-authored-by: Giulio Grassi Signed-off-by: Mauro Sardara Change-Id: I5b2c667bad66feb45abdb5effe22ed0f6c85d1c2 --- libtransport/src/protocols/rtc/rtc_ldr.cc | 212 +++++++++++++++++++++--------- 1 file changed, 149 insertions(+), 63 deletions(-) (limited to 'libtransport/src/protocols/rtc/rtc_ldr.cc') diff --git a/libtransport/src/protocols/rtc/rtc_ldr.cc b/libtransport/src/protocols/rtc/rtc_ldr.cc index 0ef381fe1..f0de48871 100644 --- a/libtransport/src/protocols/rtc/rtc_ldr.cc +++ b/libtransport/src/protocols/rtc/rtc_ldr.cc @@ -13,6 +13,7 @@ * limitations under the License. */ +#include #include #include @@ -26,11 +27,13 @@ namespace protocol { namespace rtc { RTCLossDetectionAndRecovery::RTCLossDetectionAndRecovery( - SendRtxCallback &&callback, asio::io_service &io_service) + Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service) : rtx_on_(false), + fec_on_(false), next_rtx_timer_(MAX_TIMER_RTX), last_event_(0), sentinel_timer_interval_(MAX_TIMER_RTX), + indexer_(indexer), send_rtx_callback_(std::move(callback)) { timer_ = std::make_unique(io_service); sentinel_timer_ = std::make_unique(io_service); @@ -40,7 +43,7 @@ RTCLossDetectionAndRecovery::~RTCLossDetectionAndRecovery() {} void RTCLossDetectionAndRecovery::turnOnRTX() { rtx_on_ = true; - scheduleSentinelTimer((uint32_t)(state_->getRTT() * CATCH_UP_RTT_INCREMENT)); + scheduleSentinelTimer(state_->getRTT() * CATCH_UP_RTT_INCREMENT); } void RTCLossDetectionAndRecovery::turnOffRTX() { @@ -48,6 +51,54 @@ void RTCLossDetectionAndRecovery::turnOffRTX() { clear(); } +uint32_t RTCLossDetectionAndRecovery::computeFecPacketsToAsk(bool in_sync) { + uint32_t current_fec = indexer_->getNFec(); + double current_loss_rate = state_->getLossRate(); + double last_loss_rate = state_->getLastRoundLossRate(); + + // when in sync ask for fec only if there are losses for 2 rounds + if (in_sync && current_fec == 0 && + (current_loss_rate == 0 || last_loss_rate == 0)) + return 0; + + double loss_rate = state_->getMaxLossRate() * 1.5; + + if (!in_sync && loss_rate == 0) loss_rate = 0.05; + if (loss_rate > 0.5) loss_rate = 0.5; + + double exp_losses = (double)k_ * loss_rate; + uint32_t fec_to_ask = ceil(exp_losses / (1 - loss_rate)); + + if (fec_to_ask > (n_ - k_)) fec_to_ask = n_ - k_; + + return fec_to_ask; +} + +void RTCLossDetectionAndRecovery::onNewRound(bool in_sync) { + uint64_t rtt = state_->getRTT(); + if (!fec_on_ && rtt >= 100) { + // turn on fec, here we may have no info so ask for all packets + fec_on_ = true; + turnOffRTX(); + indexer_->setNFec(computeFecPacketsToAsk(in_sync)); + return; + } + + if (fec_on_ && rtt > 80) { + // keep using fec, maybe update it + indexer_->setNFec(computeFecPacketsToAsk(in_sync)); + return; + } + + if ((fec_on_ && rtt <= 80) || (!rtx_on_ && rtt <= 100)) { + // turn on rtx + fec_on_ = false; + indexer_->setNFec(0); + turnOnRTX(); + return; + } +} + void RTCLossDetectionAndRecovery::onTimeout(uint32_t seq) { // always add timeouts to the RTX list to avoid to send the same packet as if // it was not a rtx @@ -55,17 +106,23 @@ void RTCLossDetectionAndRecovery::onTimeout(uint32_t seq) { last_event_ = getNow(); } +void RTCLossDetectionAndRecovery::onPacketRecoveredFec(uint32_t seq) { + // if an RTX is scheduled for a packet recovered using FEC delete it + deleteRtx(seq); + recover_with_fec_.erase(seq); +} + void RTCLossDetectionAndRecovery::onDataPacketReceived( const core::ContentObject &content_object) { last_event_ = getNow(); uint32_t seq = content_object.getName().getSuffix(); if (deleteRtx(seq)) { - state_->onPacketRecovered(seq); + state_->onPacketRecoveredRtx(seq); } else { - if (TRANSPORT_EXPECT_FALSE(!rtx_on_)) return; // do not add if RTX is off - TRANSPORT_LOGD("received data. add from %u to %u ", - state_->getHighestSeqReceivedInOrder() + 1, seq); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "received data. add from " + << state_->getHighestSeqReceivedInOrder() + 1 << " to " << seq; addToRetransmissions(state_->getHighestSeqReceivedInOrder() + 1, seq); } } @@ -76,8 +133,6 @@ void RTCLossDetectionAndRecovery::onNackPacketReceived( uint32_t seq = nack.getName().getSuffix(); - if (TRANSPORT_EXPECT_FALSE(!rtx_on_)) return; // do not add if RTX is off - struct nack_packet_t *nack_pkt = (struct nack_packet_t *)nack.getPayload()->data(); uint32_t production_seq = nack_pkt->getProductionSegement(); @@ -91,8 +146,9 @@ void RTCLossDetectionAndRecovery::onNackPacketReceived( // productionSeq = 14. 9 is lost but we can try to recover packets 12 13 and // 14 that are not arrived yet deleteRtx(seq); - TRANSPORT_LOGD("received past nack. add from %u to %u ", - state_->getHighestSeqReceivedInOrder() + 1, production_seq); + DLOG_IF(INFO, VLOG_IS_ON(3)) << "received past nack. add from " + << state_->getHighestSeqReceivedInOrder() + 1 + << " to " << production_seq; addToRetransmissions(state_->getHighestSeqReceivedInOrder() + 1, production_seq); } else { @@ -105,8 +161,9 @@ void RTCLossDetectionAndRecovery::onNackPacketReceived( // with productionSeq = 18. this says that all the packets between 12 and 18 // may got lost and we should ask them deleteRtx(seq); - TRANSPORT_LOGD("received futrue nack. add from %u to %u ", - state_->getHighestSeqReceivedInOrder() + 1, production_seq); + DLOG_IF(INFO, VLOG_IS_ON(3)) << "received futrue nack. add from " + << state_->getHighestSeqReceivedInOrder() + 1 + << " to " << production_seq; addToRetransmissions(state_->getHighestSeqReceivedInOrder() + 1, production_seq); } @@ -117,12 +174,13 @@ void RTCLossDetectionAndRecovery::onProbePacketReceived( // we don't log the reception of a probe packet for the sentinel timer because // probes are not taken into account into the sync window. we use them as // future nacks to detect possible packets lost - if (TRANSPORT_EXPECT_FALSE(!rtx_on_)) return; // do not add if RTX is off struct nack_packet_t *probe_pkt = (struct nack_packet_t *)probe.getPayload()->data(); uint32_t production_seq = probe_pkt->getProductionSegement(); - TRANSPORT_LOGD("received probe. add from %u to %u ", - state_->getHighestSeqReceivedInOrder() + 1, production_seq); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "received probe. add from " + << state_->getHighestSeqReceivedInOrder() + 1 << " to " << production_seq; + addToRetransmissions(state_->getHighestSeqReceivedInOrder() + 1, production_seq); } @@ -150,20 +208,41 @@ void RTCLossDetectionAndRecovery::addToRetransmissions(uint32_t start, } for (uint32_t seq = start; seq < stop; seq++) { - if (!isRtx(seq) && // is not already an rtx - // is not received or lost - state_->isReceivedOrLost(seq) == PacketState::UNKNOWN) { - // add rtx - rtxState state; - state.first_send_ = state_->getInterestSentTime(seq); - if (state.first_send_ == 0) // this interest was never sent before - state.first_send_ = getNow(); - state.next_send_ = computeNextSend(seq, true); - state.rtx_count_ = 0; - TRANSPORT_LOGD("add %u to retransmissions. next rtx is %lu ", seq, - (state.next_send_ - getNow())); - rtx_state_.insert(std::pair(seq, state)); - rtx_timers_.insert(std::pair(state.next_send_, seq)); + if (state_->isReceivedOrLost(seq) == PacketState::UNKNOWN) { + if (rtx_on_) { + if (!indexer_->isFec(seq)) { + // handle it with rtx + if (!isRtx(seq)) { + state_->onLossDetected(seq); + rtxState state; + state.first_send_ = state_->getInterestSentTime(seq); + if (state.first_send_ == 0) // this interest was never sent before + state.first_send_ = getNow(); + state.next_send_ = computeNextSend(seq, true); + state.rtx_count_ = 0; + DLOG_IF(INFO, VLOG_IS_ON(4)) + << "Add " << seq << " to retransmissions. next rtx is %lu " + << state.next_send_ - getNow(); + rtx_state_.insert(std::pair(seq, state)); + rtx_timers_.insert( + std::pair(state.next_send_, seq)); + } + } else { + // is fec, do not send it + auto it = recover_with_fec_.find(seq); + if (it == recover_with_fec_.end()) { + state_->onLossDetected(seq); + recover_with_fec_.insert(seq); + } + } + } else { + // keep track of losses but recover with FEC + auto it = recover_with_fec_.find(seq); + if (it == recover_with_fec_.end()) { + state_->onLossDetected(seq); + recover_with_fec_.insert(seq); + } + } } } scheduleNextRtx(); @@ -182,13 +261,15 @@ uint64_t RTCLossDetectionAndRecovery::computeNextSend(uint32_t seq, if (prod_rate != 0) { double packet_size = state_->getAveragePacketSize(); - estimated_iat = (uint32_t)ceil(1000.0 / (prod_rate / packet_size)); - jitter = (uint32_t)ceil(state_->getJitter()); + estimated_iat = ceil(1000.0 / (prod_rate / packet_size)); + jitter = ceil(state_->getJitter()); } uint32_t wait = estimated_iat + jitter; - TRANSPORT_LOGD("first rtx for %u in %u ms, rtt = %lu ait = %u jttr = %u", - seq, wait, state_->getRTT(), estimated_iat, jitter); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "first rtx for " << seq << " in " << wait + << " ms, rtt = " << state_->getRTT() << " ait = " << estimated_iat + << " jttr = " << jitter; return now + wait; } else { @@ -202,25 +283,26 @@ uint64_t RTCLossDetectionAndRecovery::computeNextSend(uint32_t seq, } double packet_size = state_->getAveragePacketSize(); - uint32_t estimated_iat = (uint32_t)ceil(1000.0 / (prod_rate / packet_size)); + uint32_t estimated_iat = ceil(1000.0 / (prod_rate / packet_size)); uint64_t rtt = state_->getRTT(); if (rtt == 0) rtt = SENTINEL_TIMER_INTERVAL; - wait = (uint32_t)rtt; + wait = rtt; if (estimated_iat > rtt) wait = estimated_iat; - uint32_t jitter = (uint32_t)ceil(state_->getJitter()); + uint32_t jitter = ceil(state_->getJitter()); wait += jitter; // it may happen that the channel is congested and we have some additional // queuing delay to take into account - uint32_t queue = (uint32_t)ceil(state_->getQueuing()); + uint32_t queue = ceil(state_->getQueuing()); wait += queue; - TRANSPORT_LOGD( - "next rtx for %u in %u ms, rtt = %lu ait = %u jttr = %u queue = %u", - seq, wait, state_->getRTT(), estimated_iat, jitter, queue); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "next rtx for " << seq << " in " << wait + << " ms, rtt = " << state_->getRTT() << " ait = " << estimated_iat + << " jttr = " << jitter << " queue = " << queue; return now + wait; } @@ -235,7 +317,7 @@ void RTCLossDetectionAndRecovery::retransmit() { std::unordered_set lost_pkt; uint32_t sent_counter = 0; while (it != rtx_timers_.end() && it->first <= now && - sent_counter < MAX_INTERESTS_IN_BATCH) { + sent_counter < MAX_RTX_IN_BATCH) { uint32_t seq = it->second; auto rtx_it = rtx_state_.find(seq); // this should always return a valid iter @@ -243,11 +325,11 @@ void RTCLossDetectionAndRecovery::retransmit() { (now - rtx_it->second.first_send_) >= RTC_MAX_AGE || seq < state_->getLastSeqNacked()) { // max rtx reached or packet too old or packet nacked, this packet is lost - TRANSPORT_LOGD( - "packet %u lost because 1) max rtx: %u 2) max age: %u 3) naked: %u", - seq, (rtx_it->second.rtx_count_ >= RTC_MAX_RTX), - ((now - rtx_it->second.first_send_) >= RTC_MAX_AGE), - (seq < state_->getLastSeqNacked())); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "packet " << seq << " lost because 1) max rtx: " + << (rtx_it->second.rtx_count_ >= RTC_MAX_RTX) << " 2) max age: " + << ((now - rtx_it->second.first_send_) >= RTC_MAX_AGE) + << " 3) nacked: " << (seq < state_->getLastSeqNacked()); lost_pkt.insert(seq); it++; } else { @@ -259,8 +341,9 @@ void RTCLossDetectionAndRecovery::retransmit() { it = rtx_timers_.erase(it); rtx_timers_.insert( std::pair(rtx_it->second.next_send_, seq)); - TRANSPORT_LOGD("send rtx for sequence %u, next send in %lu", seq, - (rtx_it->second.next_send_ - now)); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "send rtx for sequence " << seq << ", next send in " + << (rtx_it->second.next_send_ - now); send_rtx_callback_(seq); sent_counter++; } @@ -358,20 +441,21 @@ void RTCLossDetectionAndRecovery::sentinelTimer() { if (TRANSPORT_EXPECT_FALSE(!state_->isProducerActive())) { // this happens at the beginning (or if the producer stops for some // reason) we need to keep sending interest 0 until we get an answer - TRANSPORT_LOGD( - "sentinel timer: the producer is not active, send packet 0"); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "sentinel timer: the producer is not active, send packet 0"; state_->onRetransmission(0); send_rtx_callback_(0); } else { - TRANSPORT_LOGD( - "sentinel timer: the producer is active, send the 10 oldest packets"); + DLOG_IF(INFO, VLOG_IS_ON(3)) << "sentinel timer: the producer is active, " + "send the 10 oldest packets"; sent = true; uint32_t rtx = 0; auto it = state_->getPendingInterestsMapBegin(); auto end = state_->getPendingInterestsMapEnd(); while (it != end && rtx < MAX_RTX_WITH_SENTINEL) { uint32_t seq = it->first; - TRANSPORT_LOGD("sentinel timer, add %u to the rtx list", seq); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "sentinel timer, add " << seq << " to the rtx list"; addToRetransmissions(seq, seq + 1); rtx++; it++; @@ -384,36 +468,38 @@ void RTCLossDetectionAndRecovery::sentinelTimer() { uint32_t next_timer; double prod_rate = state_->getProducerRate(); if (TRANSPORT_EXPECT_FALSE(!state_->isProducerActive()) || prod_rate == 0) { - TRANSPORT_LOGD("next timer in %u", SENTINEL_TIMER_INTERVAL); + DLOG_IF(INFO, VLOG_IS_ON(3)) << "next timer in " << SENTINEL_TIMER_INTERVAL; next_timer = SENTINEL_TIMER_INTERVAL; } else { double prod_rate = state_->getProducerRate(); double packet_size = state_->getAveragePacketSize(); - uint32_t estimated_iat = (uint32_t)ceil(1000.0 / (prod_rate / packet_size)); - uint32_t jitter = (uint32_t)ceil(state_->getJitter()); + uint32_t estimated_iat = ceil(1000.0 / (prod_rate / packet_size)); + uint32_t jitter = ceil(state_->getJitter()); // try to reduce the number of timers if the estimated IAT is too small next_timer = std::max((estimated_iat + jitter) * 20, (uint32_t)1); - TRANSPORT_LOGD("next sentinel in %u ms, rate: %f, iat: %u, jitter: %u", - next_timer, ((prod_rate * 8.0) / 1000000.0), estimated_iat, - jitter); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "next sentinel in " << next_timer + << " ms, rate: " << ((prod_rate * 8.0) / 1000000.0) + << ", iat: " << estimated_iat << ", jitter: " << jitter; if (!expired) { // discount the amout of time that is already passed - uint32_t discount = (uint32_t)(now - last_event_); + uint32_t discount = now - last_event_; if (next_timer > discount) { next_timer = next_timer - discount; } else { // in this case we trigger the timer in 1 ms next_timer = 1; } - TRANSPORT_LOGD("timer after discout: %u", next_timer); + DLOG_IF(INFO, VLOG_IS_ON(3)) << "timer after discout: " << next_timer; } else if (sent) { // wait at least one producer stats interval + owd to check if the // production rate is reducing. - uint32_t min_wait = PRODUCER_STATS_INTERVAL + (uint32_t)ceil(state_->getQueuing()); + uint32_t min_wait = PRODUCER_STATS_INTERVAL + ceil(state_->getQueuing()); next_timer = std::max(next_timer, min_wait); - TRANSPORT_LOGD("wait for updates from prod, next timer: %u", next_timer); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "wait for updates from prod, next timer: " << next_timer; } } -- cgit 1.2.3-korg