aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/protocols/rtc/rtc_ldr.cc
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/protocols/rtc/rtc_ldr.cc')
-rw-r--r--libtransport/src/protocols/rtc/rtc_ldr.cc212
1 files changed, 149 insertions, 63 deletions
diff --git a/libtransport/src/protocols/rtc/rtc_ldr.cc b/libtransport/src/protocols/rtc/rtc_ldr.cc
index 0ef381fe1..f0de48871 100644
--- a/libtransport/src/protocols/rtc/rtc_ldr.cc
+++ b/libtransport/src/protocols/rtc/rtc_ldr.cc
@@ -13,6 +13,7 @@
* limitations under the License.
*/
+#include <glog/logging.h>
#include <protocols/rtc/rtc_consts.h>
#include <protocols/rtc/rtc_ldr.h>
@@ -26,11 +27,13 @@ namespace protocol {
namespace rtc {
RTCLossDetectionAndRecovery::RTCLossDetectionAndRecovery(
- SendRtxCallback &&callback, asio::io_service &io_service)
+ Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service)
: rtx_on_(false),
+ fec_on_(false),
next_rtx_timer_(MAX_TIMER_RTX),
last_event_(0),
sentinel_timer_interval_(MAX_TIMER_RTX),
+ indexer_(indexer),
send_rtx_callback_(std::move(callback)) {
timer_ = std::make_unique<asio::steady_timer>(io_service);
sentinel_timer_ = std::make_unique<asio::steady_timer>(io_service);
@@ -40,7 +43,7 @@ RTCLossDetectionAndRecovery::~RTCLossDetectionAndRecovery() {}
void RTCLossDetectionAndRecovery::turnOnRTX() {
rtx_on_ = true;
- scheduleSentinelTimer((uint32_t)(state_->getRTT() * CATCH_UP_RTT_INCREMENT));
+ scheduleSentinelTimer(state_->getRTT() * CATCH_UP_RTT_INCREMENT);
}
void RTCLossDetectionAndRecovery::turnOffRTX() {
@@ -48,6 +51,54 @@ void RTCLossDetectionAndRecovery::turnOffRTX() {
clear();
}
+uint32_t RTCLossDetectionAndRecovery::computeFecPacketsToAsk(bool in_sync) {
+ uint32_t current_fec = indexer_->getNFec();
+ double current_loss_rate = state_->getLossRate();
+ double last_loss_rate = state_->getLastRoundLossRate();
+
+ // when in sync ask for fec only if there are losses for 2 rounds
+ if (in_sync && current_fec == 0 &&
+ (current_loss_rate == 0 || last_loss_rate == 0))
+ return 0;
+
+ double loss_rate = state_->getMaxLossRate() * 1.5;
+
+ if (!in_sync && loss_rate == 0) loss_rate = 0.05;
+ if (loss_rate > 0.5) loss_rate = 0.5;
+
+ double exp_losses = (double)k_ * loss_rate;
+ uint32_t fec_to_ask = ceil(exp_losses / (1 - loss_rate));
+
+ if (fec_to_ask > (n_ - k_)) fec_to_ask = n_ - k_;
+
+ return fec_to_ask;
+}
+
+void RTCLossDetectionAndRecovery::onNewRound(bool in_sync) {
+ uint64_t rtt = state_->getRTT();
+ if (!fec_on_ && rtt >= 100) {
+ // turn on fec, here we may have no info so ask for all packets
+ fec_on_ = true;
+ turnOffRTX();
+ indexer_->setNFec(computeFecPacketsToAsk(in_sync));
+ return;
+ }
+
+ if (fec_on_ && rtt > 80) {
+ // keep using fec, maybe update it
+ indexer_->setNFec(computeFecPacketsToAsk(in_sync));
+ return;
+ }
+
+ if ((fec_on_ && rtt <= 80) || (!rtx_on_ && rtt <= 100)) {
+ // turn on rtx
+ fec_on_ = false;
+ indexer_->setNFec(0);
+ turnOnRTX();
+ return;
+ }
+}
+
void RTCLossDetectionAndRecovery::onTimeout(uint32_t seq) {
// always add timeouts to the RTX list to avoid to send the same packet as if
// it was not a rtx
@@ -55,17 +106,23 @@ void RTCLossDetectionAndRecovery::onTimeout(uint32_t seq) {
last_event_ = getNow();
}
+void RTCLossDetectionAndRecovery::onPacketRecoveredFec(uint32_t seq) {
+ // if an RTX is scheduled for a packet recovered using FEC delete it
+ deleteRtx(seq);
+ recover_with_fec_.erase(seq);
+}
+
void RTCLossDetectionAndRecovery::onDataPacketReceived(
const core::ContentObject &content_object) {
last_event_ = getNow();
uint32_t seq = content_object.getName().getSuffix();
if (deleteRtx(seq)) {
- state_->onPacketRecovered(seq);
+ state_->onPacketRecoveredRtx(seq);
} else {
- if (TRANSPORT_EXPECT_FALSE(!rtx_on_)) return; // do not add if RTX is off
- TRANSPORT_LOGD("received data. add from %u to %u ",
- state_->getHighestSeqReceivedInOrder() + 1, seq);
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "received data. add from "
+ << state_->getHighestSeqReceivedInOrder() + 1 << " to " << seq;
addToRetransmissions(state_->getHighestSeqReceivedInOrder() + 1, seq);
}
}
@@ -76,8 +133,6 @@ void RTCLossDetectionAndRecovery::onNackPacketReceived(
uint32_t seq = nack.getName().getSuffix();
- if (TRANSPORT_EXPECT_FALSE(!rtx_on_)) return; // do not add if RTX is off
-
struct nack_packet_t *nack_pkt =
(struct nack_packet_t *)nack.getPayload()->data();
uint32_t production_seq = nack_pkt->getProductionSegement();
@@ -91,8 +146,9 @@ void RTCLossDetectionAndRecovery::onNackPacketReceived(
// productionSeq = 14. 9 is lost but we can try to recover packets 12 13 and
// 14 that are not arrived yet
deleteRtx(seq);
- TRANSPORT_LOGD("received past nack. add from %u to %u ",
- state_->getHighestSeqReceivedInOrder() + 1, production_seq);
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "received past nack. add from "
+ << state_->getHighestSeqReceivedInOrder() + 1
+ << " to " << production_seq;
addToRetransmissions(state_->getHighestSeqReceivedInOrder() + 1,
production_seq);
} else {
@@ -105,8 +161,9 @@ void RTCLossDetectionAndRecovery::onNackPacketReceived(
// with productionSeq = 18. this says that all the packets between 12 and 18
// may got lost and we should ask them
deleteRtx(seq);
- TRANSPORT_LOGD("received futrue nack. add from %u to %u ",
- state_->getHighestSeqReceivedInOrder() + 1, production_seq);
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "received futrue nack. add from "
+ << state_->getHighestSeqReceivedInOrder() + 1
+ << " to " << production_seq;
addToRetransmissions(state_->getHighestSeqReceivedInOrder() + 1,
production_seq);
}
@@ -117,12 +174,13 @@ void RTCLossDetectionAndRecovery::onProbePacketReceived(
// we don't log the reception of a probe packet for the sentinel timer because
// probes are not taken into account into the sync window. we use them as
// future nacks to detect possible packets lost
- if (TRANSPORT_EXPECT_FALSE(!rtx_on_)) return; // do not add if RTX is off
struct nack_packet_t *probe_pkt =
(struct nack_packet_t *)probe.getPayload()->data();
uint32_t production_seq = probe_pkt->getProductionSegement();
- TRANSPORT_LOGD("received probe. add from %u to %u ",
- state_->getHighestSeqReceivedInOrder() + 1, production_seq);
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "received probe. add from "
+ << state_->getHighestSeqReceivedInOrder() + 1 << " to " << production_seq;
+
addToRetransmissions(state_->getHighestSeqReceivedInOrder() + 1,
production_seq);
}
@@ -150,20 +208,41 @@ void RTCLossDetectionAndRecovery::addToRetransmissions(uint32_t start,
}
for (uint32_t seq = start; seq < stop; seq++) {
- if (!isRtx(seq) && // is not already an rtx
- // is not received or lost
- state_->isReceivedOrLost(seq) == PacketState::UNKNOWN) {
- // add rtx
- rtxState state;
- state.first_send_ = state_->getInterestSentTime(seq);
- if (state.first_send_ == 0) // this interest was never sent before
- state.first_send_ = getNow();
- state.next_send_ = computeNextSend(seq, true);
- state.rtx_count_ = 0;
- TRANSPORT_LOGD("add %u to retransmissions. next rtx is %lu ", seq,
- (state.next_send_ - getNow()));
- rtx_state_.insert(std::pair<uint32_t, rtxState>(seq, state));
- rtx_timers_.insert(std::pair<uint64_t, uint32_t>(state.next_send_, seq));
+ if (state_->isReceivedOrLost(seq) == PacketState::UNKNOWN) {
+ if (rtx_on_) {
+ if (!indexer_->isFec(seq)) {
+ // handle it with rtx
+ if (!isRtx(seq)) {
+ state_->onLossDetected(seq);
+ rtxState state;
+ state.first_send_ = state_->getInterestSentTime(seq);
+ if (state.first_send_ == 0) // this interest was never sent before
+ state.first_send_ = getNow();
+ state.next_send_ = computeNextSend(seq, true);
+ state.rtx_count_ = 0;
+ DLOG_IF(INFO, VLOG_IS_ON(4))
+ << "Add " << seq << " to retransmissions. next rtx is %lu "
+ << state.next_send_ - getNow();
+ rtx_state_.insert(std::pair<uint32_t, rtxState>(seq, state));
+ rtx_timers_.insert(
+ std::pair<uint64_t, uint32_t>(state.next_send_, seq));
+ }
+ } else {
+ // is fec, do not send it
+ auto it = recover_with_fec_.find(seq);
+ if (it == recover_with_fec_.end()) {
+ state_->onLossDetected(seq);
+ recover_with_fec_.insert(seq);
+ }
+ }
+ } else {
+ // keep track of losses but recover with FEC
+ auto it = recover_with_fec_.find(seq);
+ if (it == recover_with_fec_.end()) {
+ state_->onLossDetected(seq);
+ recover_with_fec_.insert(seq);
+ }
+ }
}
}
scheduleNextRtx();
@@ -182,13 +261,15 @@ uint64_t RTCLossDetectionAndRecovery::computeNextSend(uint32_t seq,
if (prod_rate != 0) {
double packet_size = state_->getAveragePacketSize();
- estimated_iat = (uint32_t)ceil(1000.0 / (prod_rate / packet_size));
- jitter = (uint32_t)ceil(state_->getJitter());
+ estimated_iat = ceil(1000.0 / (prod_rate / packet_size));
+ jitter = ceil(state_->getJitter());
}
uint32_t wait = estimated_iat + jitter;
- TRANSPORT_LOGD("first rtx for %u in %u ms, rtt = %lu ait = %u jttr = %u",
- seq, wait, state_->getRTT(), estimated_iat, jitter);
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "first rtx for " << seq << " in " << wait
+ << " ms, rtt = " << state_->getRTT() << " ait = " << estimated_iat
+ << " jttr = " << jitter;
return now + wait;
} else {
@@ -202,25 +283,26 @@ uint64_t RTCLossDetectionAndRecovery::computeNextSend(uint32_t seq,
}
double packet_size = state_->getAveragePacketSize();
- uint32_t estimated_iat = (uint32_t)ceil(1000.0 / (prod_rate / packet_size));
+ uint32_t estimated_iat = ceil(1000.0 / (prod_rate / packet_size));
uint64_t rtt = state_->getRTT();
if (rtt == 0) rtt = SENTINEL_TIMER_INTERVAL;
- wait = (uint32_t)rtt;
+ wait = rtt;
if (estimated_iat > rtt) wait = estimated_iat;
- uint32_t jitter = (uint32_t)ceil(state_->getJitter());
+ uint32_t jitter = ceil(state_->getJitter());
wait += jitter;
// it may happen that the channel is congested and we have some additional
// queuing delay to take into account
- uint32_t queue = (uint32_t)ceil(state_->getQueuing());
+ uint32_t queue = ceil(state_->getQueuing());
wait += queue;
- TRANSPORT_LOGD(
- "next rtx for %u in %u ms, rtt = %lu ait = %u jttr = %u queue = %u",
- seq, wait, state_->getRTT(), estimated_iat, jitter, queue);
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "next rtx for " << seq << " in " << wait
+ << " ms, rtt = " << state_->getRTT() << " ait = " << estimated_iat
+ << " jttr = " << jitter << " queue = " << queue;
return now + wait;
}
@@ -235,7 +317,7 @@ void RTCLossDetectionAndRecovery::retransmit() {
std::unordered_set<uint32_t> lost_pkt;
uint32_t sent_counter = 0;
while (it != rtx_timers_.end() && it->first <= now &&
- sent_counter < MAX_INTERESTS_IN_BATCH) {
+ sent_counter < MAX_RTX_IN_BATCH) {
uint32_t seq = it->second;
auto rtx_it =
rtx_state_.find(seq); // this should always return a valid iter
@@ -243,11 +325,11 @@ void RTCLossDetectionAndRecovery::retransmit() {
(now - rtx_it->second.first_send_) >= RTC_MAX_AGE ||
seq < state_->getLastSeqNacked()) {
// max rtx reached or packet too old or packet nacked, this packet is lost
- TRANSPORT_LOGD(
- "packet %u lost because 1) max rtx: %u 2) max age: %u 3) naked: %u",
- seq, (rtx_it->second.rtx_count_ >= RTC_MAX_RTX),
- ((now - rtx_it->second.first_send_) >= RTC_MAX_AGE),
- (seq < state_->getLastSeqNacked()));
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "packet " << seq << " lost because 1) max rtx: "
+ << (rtx_it->second.rtx_count_ >= RTC_MAX_RTX) << " 2) max age: "
+ << ((now - rtx_it->second.first_send_) >= RTC_MAX_AGE)
+ << " 3) nacked: " << (seq < state_->getLastSeqNacked());
lost_pkt.insert(seq);
it++;
} else {
@@ -259,8 +341,9 @@ void RTCLossDetectionAndRecovery::retransmit() {
it = rtx_timers_.erase(it);
rtx_timers_.insert(
std::pair<uint64_t, uint32_t>(rtx_it->second.next_send_, seq));
- TRANSPORT_LOGD("send rtx for sequence %u, next send in %lu", seq,
- (rtx_it->second.next_send_ - now));
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "send rtx for sequence " << seq << ", next send in "
+ << (rtx_it->second.next_send_ - now);
send_rtx_callback_(seq);
sent_counter++;
}
@@ -358,20 +441,21 @@ void RTCLossDetectionAndRecovery::sentinelTimer() {
if (TRANSPORT_EXPECT_FALSE(!state_->isProducerActive())) {
// this happens at the beginning (or if the producer stops for some
// reason) we need to keep sending interest 0 until we get an answer
- TRANSPORT_LOGD(
- "sentinel timer: the producer is not active, send packet 0");
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "sentinel timer: the producer is not active, send packet 0";
state_->onRetransmission(0);
send_rtx_callback_(0);
} else {
- TRANSPORT_LOGD(
- "sentinel timer: the producer is active, send the 10 oldest packets");
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "sentinel timer: the producer is active, "
+ "send the 10 oldest packets";
sent = true;
uint32_t rtx = 0;
auto it = state_->getPendingInterestsMapBegin();
auto end = state_->getPendingInterestsMapEnd();
while (it != end && rtx < MAX_RTX_WITH_SENTINEL) {
uint32_t seq = it->first;
- TRANSPORT_LOGD("sentinel timer, add %u to the rtx list", seq);
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "sentinel timer, add " << seq << " to the rtx list";
addToRetransmissions(seq, seq + 1);
rtx++;
it++;
@@ -384,36 +468,38 @@ void RTCLossDetectionAndRecovery::sentinelTimer() {
uint32_t next_timer;
double prod_rate = state_->getProducerRate();
if (TRANSPORT_EXPECT_FALSE(!state_->isProducerActive()) || prod_rate == 0) {
- TRANSPORT_LOGD("next timer in %u", SENTINEL_TIMER_INTERVAL);
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "next timer in " << SENTINEL_TIMER_INTERVAL;
next_timer = SENTINEL_TIMER_INTERVAL;
} else {
double prod_rate = state_->getProducerRate();
double packet_size = state_->getAveragePacketSize();
- uint32_t estimated_iat = (uint32_t)ceil(1000.0 / (prod_rate / packet_size));
- uint32_t jitter = (uint32_t)ceil(state_->getJitter());
+ uint32_t estimated_iat = ceil(1000.0 / (prod_rate / packet_size));
+ uint32_t jitter = ceil(state_->getJitter());
// try to reduce the number of timers if the estimated IAT is too small
next_timer = std::max((estimated_iat + jitter) * 20, (uint32_t)1);
- TRANSPORT_LOGD("next sentinel in %u ms, rate: %f, iat: %u, jitter: %u",
- next_timer, ((prod_rate * 8.0) / 1000000.0), estimated_iat,
- jitter);
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "next sentinel in " << next_timer
+ << " ms, rate: " << ((prod_rate * 8.0) / 1000000.0)
+ << ", iat: " << estimated_iat << ", jitter: " << jitter;
if (!expired) {
// discount the amout of time that is already passed
- uint32_t discount = (uint32_t)(now - last_event_);
+ uint32_t discount = now - last_event_;
if (next_timer > discount) {
next_timer = next_timer - discount;
} else {
// in this case we trigger the timer in 1 ms
next_timer = 1;
}
- TRANSPORT_LOGD("timer after discout: %u", next_timer);
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "timer after discout: " << next_timer;
} else if (sent) {
// wait at least one producer stats interval + owd to check if the
// production rate is reducing.
- uint32_t min_wait = PRODUCER_STATS_INTERVAL + (uint32_t)ceil(state_->getQueuing());
+ uint32_t min_wait = PRODUCER_STATS_INTERVAL + ceil(state_->getQueuing());
next_timer = std::max(next_timer, min_wait);
- TRANSPORT_LOGD("wait for updates from prod, next timer: %u", next_timer);
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "wait for updates from prod, next timer: " << next_timer;
}
}