diff options
Diffstat (limited to 'libtransport/src/protocols/rtc/rtc_recovery_strategy.cc')
-rw-r--r-- | libtransport/src/protocols/rtc/rtc_recovery_strategy.cc | 197 |
1 files changed, 82 insertions, 115 deletions
diff --git a/libtransport/src/protocols/rtc/rtc_recovery_strategy.cc b/libtransport/src/protocols/rtc/rtc_recovery_strategy.cc index 66ae5086c..257fdd09b 100644 --- a/libtransport/src/protocols/rtc/rtc_recovery_strategy.cc +++ b/libtransport/src/protocols/rtc/rtc_recovery_strategy.cc @@ -29,8 +29,12 @@ using namespace transport::interface; RecoveryStrategy::RecoveryStrategy( Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service, - bool use_rtx, bool use_fec, interface::StrategyCallback &&external_callback) - : recovery_on_(false), + bool use_rtx, bool use_fec, + interface::RtcTransportRecoveryStrategies rs_type, + interface::StrategyCallback &&external_callback) + : rs_type_(rs_type), + recovery_on_(false), + content_sharing_mode_(false), rtx_during_fec_(0), next_rtx_timer_(MAX_TIMER_RTX), send_rtx_callback_(std::move(callback)), @@ -43,7 +47,9 @@ RecoveryStrategy::RecoveryStrategy( } RecoveryStrategy::RecoveryStrategy(RecoveryStrategy &&rs) - : rtx_during_fec_(0), + : rs_type_(rs.rs_type_), + content_sharing_mode_(rs.content_sharing_mode_), + rtx_during_fec_(0), rtx_state_(std::move(rs.rtx_state_)), rtx_timers_(std::move(rs.rtx_timers_)), recover_with_fec_(std::move(rs.recover_with_fec_)), @@ -64,25 +70,52 @@ RecoveryStrategy::RecoveryStrategy(RecoveryStrategy &&rs) RecoveryStrategy::~RecoveryStrategy() {} void RecoveryStrategy::setFecParams(uint32_t n, uint32_t k) { + // if rs_type == FEC_ONLY_LOW_RES_LOSSES max k == 64 n_ = n; k_ = k; // XXX for the moment we go in steps of 5% loss rate. - // max loss rate = 95% + uint32_t i = 0; for (uint32_t loss_rate = 5; loss_rate < 100; loss_rate += 5) { - double dec_loss_rate = (double)(loss_rate + 5) / 100.0; - double exp_losses = (double)k_ * dec_loss_rate; - uint32_t fec_to_ask = ceil(exp_losses / (1 - dec_loss_rate)); - - fec_state_ f; - f.fec_to_ask = std::min(fec_to_ask, (n_ - k_)); - f.last_update = round_id_; - f.avg_residual_losses = 0.0; - f.consecutive_use = 0; - fec_per_loss_rate_.push_back(f); + uint32_t fec_to_ask = 0; + if (n_ != 0 && k_ != 0) { + if (rs_type_ == + interface::RtcTransportRecoveryStrategies::FEC_ONLY_LOW_RES_LOSSES) { + // the max loss rate in the matrix is 50% + uint32_t index = i; + if (i > 9) index = 9; + fec_to_ask = FEC_MATRIX[k_ - 1][index]; + } else { + double dec_loss_rate = (double)(loss_rate + 5); + if (dec_loss_rate == 100.0) dec_loss_rate = 95.0; + dec_loss_rate = dec_loss_rate / 100.0; + double exp_losses = ceil((double)k_ * dec_loss_rate); + fec_to_ask = ceil((exp_losses / (1 - dec_loss_rate)) * 1.25); + } + } + fec_to_ask = std::min(fec_to_ask, (n_ - k_)); + fec_per_loss_rate_.push_back(fec_to_ask); + + i++; } } +uint64_t RecoveryStrategy::getRtxRtt(uint32_t seq) { + auto it = rtx_state_.find(seq); + + if (it == rtx_state_.end()) return 0; + + // we can compute the RTT of an RTX only if it was send once. Infact if the + // RTX was sent twice or more the data may be alredy in flight and the RTT + // will be underestimated. This may happen also for packets that we + // retransmitted too soon. in that case the RTT will be filtered out by + // checking the path label + if (it->second.rtx_count_ != 1) return 0; + + // this a potentialy valid packet, compute the RTT + return (utils::SteadyTime::nowMs().count() - it->second.last_send_); +} + bool RecoveryStrategy::lossDetected(uint32_t seq) { if (isRtx(seq)) { // this packet is already in the list of rtx @@ -141,8 +174,10 @@ void RecoveryStrategy::addNewRtx(uint32_t seq, bool force) { state.first_send_ = state_->getInterestSentTime(seq); if (state.first_send_ == 0) // this interest was never sent before state.first_send_ = getNow(); - state.next_send_ = computeNextSend(seq, true); + state.last_send_ = state.first_send_; // we didn't send an RTX for this + // packet yet state.rtx_count_ = 0; + state.next_send_ = computeNextSend(seq, state.rtx_count_); DLOG_IF(INFO, VLOG_IS_ON(4)) << "Add " << seq << " to retransmissions. next rtx is in " << state.next_send_ - getNow() << " ms"; @@ -158,66 +193,50 @@ void RecoveryStrategy::addNewRtx(uint32_t seq, bool force) { } } -uint64_t RecoveryStrategy::computeNextSend(uint32_t seq, bool new_rtx) { +uint64_t RecoveryStrategy::computeNextSend(uint32_t seq, uint32_t rtx_counter) { uint64_t now = getNow(); - if (new_rtx) { - // for the new rtx we wait one estimated IAT after the loss detection. this - // is bacause, assuming that packets arrive with a constant IAT, we should - // get a new packet every IAT - double prod_rate = state_->getProducerRate(); - uint32_t estimated_iat = SENTINEL_TIMER_INTERVAL; - uint32_t jitter = 0; + if (rtx_counter == 0) { + uint32_t wait = 1; + if (content_sharing_mode_) return now + wait; - if (prod_rate != 0) { - double packet_size = state_->getAveragePacketSize(); - estimated_iat = ceil(1000.0 / (prod_rate / packet_size)); - jitter = ceil(state_->getJitter()); - } + uint32_t jitter = SENTINEL_TIMER_INTERVAL; + double prod_rate = state_->getProducerRate(); + if (prod_rate != 0) jitter = ceil(state_->getJitter()); - uint32_t wait = 1; - if (estimated_iat < 18) { - // for low rate app we do not wait to send a RTX - // we consider low rate stream with less than 50pps (iat >= 20ms) - // (e.g. audio in videoconf, mobile games). - // in the check we use 18ms to accomodate for measurements errors - // for flows with higher rate wait 1 ait + jitter - wait = estimated_iat + jitter; - } + wait += jitter; - DLOG_IF(INFO, VLOG_IS_ON(3)) - << "first rtx for " << seq << " in " << wait - << " ms, rtt = " << state_->getMinRTT() << " ait = " << estimated_iat - << " jttr = " << jitter; + DLOG_IF(INFO, VLOG_IS_ON(3)) << "first rtx for " << seq << " in " << wait + << " ms, jitter = " << jitter; return now + wait; } else { - // wait one RTT - uint32_t wait = SENTINEL_TIMER_INTERVAL; - + // wait one RTT. if an edge is known use the edge RTT for the first 5 rtx double prod_rate = state_->getProducerRate(); if (prod_rate == 0) { return now + SENTINEL_TIMER_INTERVAL; } - double packet_size = state_->getAveragePacketSize(); - uint32_t estimated_iat = ceil(1000.0 / (prod_rate / packet_size)); + uint64_t rtt = 0; + // if the transport detects an edge we try first to get the RTX from the + // edge. if no interest get a reply we move to the full RTT + if (rtx_counter < 5 && (state_->getEdgeRtt() != 0)) { + rtt = state_->getEdgeRtt(); + } else { + rtt = state_->getAvgRTT(); + } - uint64_t rtt = state_->getMinRTT(); if (rtt == 0) rtt = SENTINEL_TIMER_INTERVAL; - wait = rtt; + + if (content_sharing_mode_) return now + rtt; + + uint32_t wait = (uint32_t)rtt; uint32_t jitter = ceil(state_->getJitter()); wait += jitter; - // it may happen that the channel is congested and we have some additional - // queuing delay to take into account - uint32_t queue = ceil(state_->getQueuing()); - wait += queue; - DLOG_IF(INFO, VLOG_IS_ON(3)) - << "next rtx for " << seq << " in " << wait - << " ms, rtt = " << state_->getMinRTT() << " ait = " << estimated_iat - << " jttr = " << jitter << " queue = " << queue; + << "next rtx for " << seq << " in " << wait << " ms, rtt = " << rtt + << " jtter = " << jitter; return now + wait; } @@ -252,7 +271,9 @@ void RecoveryStrategy::retransmit() { state_->onRetransmission(seq); double prod_rate = state_->getProducerRate(); if (prod_rate != 0) rtx_it->second.rtx_count_++; - rtx_it->second.next_send_ = computeNextSend(seq, false); + rtx_it->second.last_send_ = now; + rtx_it->second.next_send_ = + computeNextSend(seq, rtx_it->second.rtx_count_); it = rtx_timers_.erase(it); rtx_timers_.insert( std::pair<uint64_t, uint32_t>(rtx_it->second.next_send_, seq)); @@ -327,6 +348,7 @@ void RecoveryStrategy::deleteRtx(uint32_t seq) { } it_timers++; } + // remove rtx rtx_state_.erase(it_rtx); } @@ -339,53 +361,13 @@ uint32_t RecoveryStrategy::computeFecPacketsToAsk() { if (loss_rate == 0) return 0; - // once per minute try to reduce the fec rate. it may happen that for some bin - // we ask too many fec packet. here we try to reduce this values gently - if (round_id_ % ROUNDS_PER_MIN == 0) { - reduceFec(); - } - // keep track of the last used fec. if we use a new bin on this round reset // consecutive use and avg loss in the prev bin uint32_t bin = ceil(loss_rate / 5.0) - 1; - if (bin > fec_per_loss_rate_.size() - 1) bin = fec_per_loss_rate_.size() - 1; + if (bin > fec_per_loss_rate_.size() - 1) + bin = (uint32_t)fec_per_loss_rate_.size() - 1; - if (bin != last_fec_used_) { - fec_per_loss_rate_[last_fec_used_].consecutive_use = 0; - fec_per_loss_rate_[last_fec_used_].avg_residual_losses = 0.0; - } - last_fec_used_ = bin; - fec_per_loss_rate_[last_fec_used_].consecutive_use++; - - // we update the stats only once very 5 rounds (1sec) that is the rate at - // which we compute residual losses - if (round_id_ % ROUNDS_PER_SEC == 0) { - double residual_losses = state_->getResidualLossRate() * 100; - // update residual loss rate - fec_per_loss_rate_[bin].avg_residual_losses = - (fec_per_loss_rate_[bin].avg_residual_losses * MOVING_AVG_ALPHA) + - (1 - MOVING_AVG_ALPHA) * residual_losses; - - if ((fec_per_loss_rate_[bin].last_update - round_id_) < - WAIT_BEFORE_FEC_UPDATE) { - // this bin is been updated recently so don't modify it and - // return the current state - return fec_per_loss_rate_[bin].fec_to_ask; - } - - // if the residual loss rate is too high and we can ask more fec packets and - // we are using this configuration since at least 5 sec update fec - if (fec_per_loss_rate_[bin].avg_residual_losses > MAX_RESIDUAL_LOSS_RATE && - fec_per_loss_rate_[bin].fec_to_ask < (n_ - k_) && - fec_per_loss_rate_[bin].consecutive_use > WAIT_BEFORE_FEC_UPDATE) { - // so increase the number of fec packets to ask - fec_per_loss_rate_[bin].fec_to_ask++; - fec_per_loss_rate_[bin].last_update = round_id_; - fec_per_loss_rate_[bin].avg_residual_losses = 0.0; - } - } - - return fec_per_loss_rate_[bin].fec_to_ask; + return fec_per_loss_rate_[bin]; } void RecoveryStrategy::setRtxFec(std::optional<bool> rtx_on, @@ -431,21 +413,6 @@ void RecoveryStrategy::removePacketState(uint32_t seq) { deleteRtx(seq); } -// private methods - -void RecoveryStrategy::reduceFec() { - for (uint32_t loss_rate = 5; loss_rate < 100; loss_rate += 5) { - double dec_loss_rate = (double)loss_rate / 100.0; - double exp_losses = (double)k_ * dec_loss_rate; - uint32_t fec_to_ask = ceil(exp_losses / (1 - dec_loss_rate)); - - uint32_t bin = ceil(loss_rate / 5.0) - 1; - if (fec_per_loss_rate_[bin].fec_to_ask > fec_to_ask) { - fec_per_loss_rate_[bin].fec_to_ask--; - } - } -} - } // end namespace rtc } // end namespace protocol |