summaryrefslogtreecommitdiffstats
path: root/libtransport/src/protocols/rtc/rtc_recovery_strategy.cc
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/protocols/rtc/rtc_recovery_strategy.cc')
-rw-r--r--libtransport/src/protocols/rtc/rtc_recovery_strategy.cc197
1 files changed, 82 insertions, 115 deletions
diff --git a/libtransport/src/protocols/rtc/rtc_recovery_strategy.cc b/libtransport/src/protocols/rtc/rtc_recovery_strategy.cc
index 66ae5086c..257fdd09b 100644
--- a/libtransport/src/protocols/rtc/rtc_recovery_strategy.cc
+++ b/libtransport/src/protocols/rtc/rtc_recovery_strategy.cc
@@ -29,8 +29,12 @@ using namespace transport::interface;
RecoveryStrategy::RecoveryStrategy(
Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service,
- bool use_rtx, bool use_fec, interface::StrategyCallback &&external_callback)
- : recovery_on_(false),
+ bool use_rtx, bool use_fec,
+ interface::RtcTransportRecoveryStrategies rs_type,
+ interface::StrategyCallback &&external_callback)
+ : rs_type_(rs_type),
+ recovery_on_(false),
+ content_sharing_mode_(false),
rtx_during_fec_(0),
next_rtx_timer_(MAX_TIMER_RTX),
send_rtx_callback_(std::move(callback)),
@@ -43,7 +47,9 @@ RecoveryStrategy::RecoveryStrategy(
}
RecoveryStrategy::RecoveryStrategy(RecoveryStrategy &&rs)
- : rtx_during_fec_(0),
+ : rs_type_(rs.rs_type_),
+ content_sharing_mode_(rs.content_sharing_mode_),
+ rtx_during_fec_(0),
rtx_state_(std::move(rs.rtx_state_)),
rtx_timers_(std::move(rs.rtx_timers_)),
recover_with_fec_(std::move(rs.recover_with_fec_)),
@@ -64,25 +70,52 @@ RecoveryStrategy::RecoveryStrategy(RecoveryStrategy &&rs)
RecoveryStrategy::~RecoveryStrategy() {}
void RecoveryStrategy::setFecParams(uint32_t n, uint32_t k) {
+ // if rs_type == FEC_ONLY_LOW_RES_LOSSES max k == 64
n_ = n;
k_ = k;
// XXX for the moment we go in steps of 5% loss rate.
- // max loss rate = 95%
+ uint32_t i = 0;
for (uint32_t loss_rate = 5; loss_rate < 100; loss_rate += 5) {
- double dec_loss_rate = (double)(loss_rate + 5) / 100.0;
- double exp_losses = (double)k_ * dec_loss_rate;
- uint32_t fec_to_ask = ceil(exp_losses / (1 - dec_loss_rate));
-
- fec_state_ f;
- f.fec_to_ask = std::min(fec_to_ask, (n_ - k_));
- f.last_update = round_id_;
- f.avg_residual_losses = 0.0;
- f.consecutive_use = 0;
- fec_per_loss_rate_.push_back(f);
+ uint32_t fec_to_ask = 0;
+ if (n_ != 0 && k_ != 0) {
+ if (rs_type_ ==
+ interface::RtcTransportRecoveryStrategies::FEC_ONLY_LOW_RES_LOSSES) {
+ // the max loss rate in the matrix is 50%
+ uint32_t index = i;
+ if (i > 9) index = 9;
+ fec_to_ask = FEC_MATRIX[k_ - 1][index];
+ } else {
+ double dec_loss_rate = (double)(loss_rate + 5);
+ if (dec_loss_rate == 100.0) dec_loss_rate = 95.0;
+ dec_loss_rate = dec_loss_rate / 100.0;
+ double exp_losses = ceil((double)k_ * dec_loss_rate);
+ fec_to_ask = ceil((exp_losses / (1 - dec_loss_rate)) * 1.25);
+ }
+ }
+ fec_to_ask = std::min(fec_to_ask, (n_ - k_));
+ fec_per_loss_rate_.push_back(fec_to_ask);
+
+ i++;
}
}
+uint64_t RecoveryStrategy::getRtxRtt(uint32_t seq) {
+ auto it = rtx_state_.find(seq);
+
+ if (it == rtx_state_.end()) return 0;
+
+ // we can compute the RTT of an RTX only if it was send once. Infact if the
+ // RTX was sent twice or more the data may be alredy in flight and the RTT
+ // will be underestimated. This may happen also for packets that we
+ // retransmitted too soon. in that case the RTT will be filtered out by
+ // checking the path label
+ if (it->second.rtx_count_ != 1) return 0;
+
+ // this a potentialy valid packet, compute the RTT
+ return (utils::SteadyTime::nowMs().count() - it->second.last_send_);
+}
+
bool RecoveryStrategy::lossDetected(uint32_t seq) {
if (isRtx(seq)) {
// this packet is already in the list of rtx
@@ -141,8 +174,10 @@ void RecoveryStrategy::addNewRtx(uint32_t seq, bool force) {
state.first_send_ = state_->getInterestSentTime(seq);
if (state.first_send_ == 0) // this interest was never sent before
state.first_send_ = getNow();
- state.next_send_ = computeNextSend(seq, true);
+ state.last_send_ = state.first_send_; // we didn't send an RTX for this
+ // packet yet
state.rtx_count_ = 0;
+ state.next_send_ = computeNextSend(seq, state.rtx_count_);
DLOG_IF(INFO, VLOG_IS_ON(4))
<< "Add " << seq << " to retransmissions. next rtx is in "
<< state.next_send_ - getNow() << " ms";
@@ -158,66 +193,50 @@ void RecoveryStrategy::addNewRtx(uint32_t seq, bool force) {
}
}
-uint64_t RecoveryStrategy::computeNextSend(uint32_t seq, bool new_rtx) {
+uint64_t RecoveryStrategy::computeNextSend(uint32_t seq, uint32_t rtx_counter) {
uint64_t now = getNow();
- if (new_rtx) {
- // for the new rtx we wait one estimated IAT after the loss detection. this
- // is bacause, assuming that packets arrive with a constant IAT, we should
- // get a new packet every IAT
- double prod_rate = state_->getProducerRate();
- uint32_t estimated_iat = SENTINEL_TIMER_INTERVAL;
- uint32_t jitter = 0;
+ if (rtx_counter == 0) {
+ uint32_t wait = 1;
+ if (content_sharing_mode_) return now + wait;
- if (prod_rate != 0) {
- double packet_size = state_->getAveragePacketSize();
- estimated_iat = ceil(1000.0 / (prod_rate / packet_size));
- jitter = ceil(state_->getJitter());
- }
+ uint32_t jitter = SENTINEL_TIMER_INTERVAL;
+ double prod_rate = state_->getProducerRate();
+ if (prod_rate != 0) jitter = ceil(state_->getJitter());
- uint32_t wait = 1;
- if (estimated_iat < 18) {
- // for low rate app we do not wait to send a RTX
- // we consider low rate stream with less than 50pps (iat >= 20ms)
- // (e.g. audio in videoconf, mobile games).
- // in the check we use 18ms to accomodate for measurements errors
- // for flows with higher rate wait 1 ait + jitter
- wait = estimated_iat + jitter;
- }
+ wait += jitter;
- DLOG_IF(INFO, VLOG_IS_ON(3))
- << "first rtx for " << seq << " in " << wait
- << " ms, rtt = " << state_->getMinRTT() << " ait = " << estimated_iat
- << " jttr = " << jitter;
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "first rtx for " << seq << " in " << wait
+ << " ms, jitter = " << jitter;
return now + wait;
} else {
- // wait one RTT
- uint32_t wait = SENTINEL_TIMER_INTERVAL;
-
+ // wait one RTT. if an edge is known use the edge RTT for the first 5 rtx
double prod_rate = state_->getProducerRate();
if (prod_rate == 0) {
return now + SENTINEL_TIMER_INTERVAL;
}
- double packet_size = state_->getAveragePacketSize();
- uint32_t estimated_iat = ceil(1000.0 / (prod_rate / packet_size));
+ uint64_t rtt = 0;
+ // if the transport detects an edge we try first to get the RTX from the
+ // edge. if no interest get a reply we move to the full RTT
+ if (rtx_counter < 5 && (state_->getEdgeRtt() != 0)) {
+ rtt = state_->getEdgeRtt();
+ } else {
+ rtt = state_->getAvgRTT();
+ }
- uint64_t rtt = state_->getMinRTT();
if (rtt == 0) rtt = SENTINEL_TIMER_INTERVAL;
- wait = rtt;
+
+ if (content_sharing_mode_) return now + rtt;
+
+ uint32_t wait = (uint32_t)rtt;
uint32_t jitter = ceil(state_->getJitter());
wait += jitter;
- // it may happen that the channel is congested and we have some additional
- // queuing delay to take into account
- uint32_t queue = ceil(state_->getQueuing());
- wait += queue;
-
DLOG_IF(INFO, VLOG_IS_ON(3))
- << "next rtx for " << seq << " in " << wait
- << " ms, rtt = " << state_->getMinRTT() << " ait = " << estimated_iat
- << " jttr = " << jitter << " queue = " << queue;
+ << "next rtx for " << seq << " in " << wait << " ms, rtt = " << rtt
+ << " jtter = " << jitter;
return now + wait;
}
@@ -252,7 +271,9 @@ void RecoveryStrategy::retransmit() {
state_->onRetransmission(seq);
double prod_rate = state_->getProducerRate();
if (prod_rate != 0) rtx_it->second.rtx_count_++;
- rtx_it->second.next_send_ = computeNextSend(seq, false);
+ rtx_it->second.last_send_ = now;
+ rtx_it->second.next_send_ =
+ computeNextSend(seq, rtx_it->second.rtx_count_);
it = rtx_timers_.erase(it);
rtx_timers_.insert(
std::pair<uint64_t, uint32_t>(rtx_it->second.next_send_, seq));
@@ -327,6 +348,7 @@ void RecoveryStrategy::deleteRtx(uint32_t seq) {
}
it_timers++;
}
+
// remove rtx
rtx_state_.erase(it_rtx);
}
@@ -339,53 +361,13 @@ uint32_t RecoveryStrategy::computeFecPacketsToAsk() {
if (loss_rate == 0) return 0;
- // once per minute try to reduce the fec rate. it may happen that for some bin
- // we ask too many fec packet. here we try to reduce this values gently
- if (round_id_ % ROUNDS_PER_MIN == 0) {
- reduceFec();
- }
-
// keep track of the last used fec. if we use a new bin on this round reset
// consecutive use and avg loss in the prev bin
uint32_t bin = ceil(loss_rate / 5.0) - 1;
- if (bin > fec_per_loss_rate_.size() - 1) bin = fec_per_loss_rate_.size() - 1;
+ if (bin > fec_per_loss_rate_.size() - 1)
+ bin = (uint32_t)fec_per_loss_rate_.size() - 1;
- if (bin != last_fec_used_) {
- fec_per_loss_rate_[last_fec_used_].consecutive_use = 0;
- fec_per_loss_rate_[last_fec_used_].avg_residual_losses = 0.0;
- }
- last_fec_used_ = bin;
- fec_per_loss_rate_[last_fec_used_].consecutive_use++;
-
- // we update the stats only once very 5 rounds (1sec) that is the rate at
- // which we compute residual losses
- if (round_id_ % ROUNDS_PER_SEC == 0) {
- double residual_losses = state_->getResidualLossRate() * 100;
- // update residual loss rate
- fec_per_loss_rate_[bin].avg_residual_losses =
- (fec_per_loss_rate_[bin].avg_residual_losses * MOVING_AVG_ALPHA) +
- (1 - MOVING_AVG_ALPHA) * residual_losses;
-
- if ((fec_per_loss_rate_[bin].last_update - round_id_) <
- WAIT_BEFORE_FEC_UPDATE) {
- // this bin is been updated recently so don't modify it and
- // return the current state
- return fec_per_loss_rate_[bin].fec_to_ask;
- }
-
- // if the residual loss rate is too high and we can ask more fec packets and
- // we are using this configuration since at least 5 sec update fec
- if (fec_per_loss_rate_[bin].avg_residual_losses > MAX_RESIDUAL_LOSS_RATE &&
- fec_per_loss_rate_[bin].fec_to_ask < (n_ - k_) &&
- fec_per_loss_rate_[bin].consecutive_use > WAIT_BEFORE_FEC_UPDATE) {
- // so increase the number of fec packets to ask
- fec_per_loss_rate_[bin].fec_to_ask++;
- fec_per_loss_rate_[bin].last_update = round_id_;
- fec_per_loss_rate_[bin].avg_residual_losses = 0.0;
- }
- }
-
- return fec_per_loss_rate_[bin].fec_to_ask;
+ return fec_per_loss_rate_[bin];
}
void RecoveryStrategy::setRtxFec(std::optional<bool> rtx_on,
@@ -431,21 +413,6 @@ void RecoveryStrategy::removePacketState(uint32_t seq) {
deleteRtx(seq);
}
-// private methods
-
-void RecoveryStrategy::reduceFec() {
- for (uint32_t loss_rate = 5; loss_rate < 100; loss_rate += 5) {
- double dec_loss_rate = (double)loss_rate / 100.0;
- double exp_losses = (double)k_ * dec_loss_rate;
- uint32_t fec_to_ask = ceil(exp_losses / (1 - dec_loss_rate));
-
- uint32_t bin = ceil(loss_rate / 5.0) - 1;
- if (fec_per_loss_rate_[bin].fec_to_ask > fec_to_ask) {
- fec_per_loss_rate_[bin].fec_to_ask--;
- }
- }
-}
-
} // end namespace rtc
} // end namespace protocol