aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/protocols/rtc/rtc_state.cc
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/protocols/rtc/rtc_state.cc')
-rw-r--r--libtransport/src/protocols/rtc/rtc_state.cc138
1 files changed, 47 insertions, 91 deletions
diff --git a/libtransport/src/protocols/rtc/rtc_state.cc b/libtransport/src/protocols/rtc/rtc_state.cc
index 6a21531f8..5b3b5e4c3 100644
--- a/libtransport/src/protocols/rtc/rtc_state.cc
+++ b/libtransport/src/protocols/rtc/rtc_state.cc
@@ -56,7 +56,6 @@ void RTCState::initParams() {
last_seq_nacked_ = 0;
loss_rate_ = 0.0;
avg_loss_rate_ = -1.0;
- max_loss_rate_ = 0.0;
last_round_loss_rate_ = 0.0;
// loss rate per sec
@@ -85,14 +84,13 @@ void RTCState::initParams() {
fec_recovered_rate_ = 0.0;
// nack counter
- nack_on_last_round_ = false;
+ past_nack_on_last_round_ = false;
received_nacks_last_round_ = 0;
// packets counter
received_packets_last_round_ = 0;
received_data_last_round_ = 0;
received_data_from_cache_ = 0;
- data_from_cache_rate_ = 0;
sent_interests_last_round_ = 0;
sent_rtx_last_round_ = 0;
@@ -103,7 +101,7 @@ void RTCState::initParams() {
last_production_seq_ = 0;
producer_is_active_ = false;
- last_prod_update_ = 0;
+ last_prod_update_seq_ = 0;
// paths stats
path_table_.clear();
@@ -180,7 +178,9 @@ void RTCState::onLossDetected(uint32_t seq) {
PacketState state = getPacketState(seq);
// if the packet is already marked with a state, do nothing
- if (state == PacketState::UNKNOWN) {
+ // to be considered lost the packet must be pending
+ if (state == PacketState::UNKNOWN &&
+ pending_interests_.find(seq) != pending_interests_.end()) {
packets_lost_++;
addToPacketCache(seq, PacketState::LOST);
}
@@ -225,8 +225,8 @@ void RTCState::onDataPacketReceived(const core::ContentObject &content_object,
core::ParamsRTC params = RTCState::getDataParams(content_object);
- if (last_prod_update_ < params.timestamp) {
- last_prod_update_ = params.timestamp;
+ if (last_prod_update_seq_ < seq) {
+ last_prod_update_seq_ = seq;
production_rate_ = (double)params.prod_rate;
}
@@ -267,14 +267,12 @@ void RTCState::onNackPacketReceived(const core::ContentObject &nack,
uint32_t seq = nack.getName().getSuffix();
struct nack_packet_t *nack_pkt =
(struct nack_packet_t *)nack.getPayload()->data();
- uint64_t production_time = nack_pkt->getTimestamp();
uint32_t production_seq = nack_pkt->getProductionSegment();
uint32_t production_rate = nack_pkt->getProductionRate();
if (TRANSPORT_EXPECT_FALSE(main_path_ == nullptr) ||
- last_prod_update_ < production_time) {
+ last_prod_update_seq_ < production_seq) {
// update production rate
- last_prod_update_ = production_time;
last_production_seq_ = production_seq;
production_rate_ = (double)production_rate;
}
@@ -282,7 +280,6 @@ void RTCState::onNackPacketReceived(const core::ContentObject &nack,
if (compute_stats) {
// this is not an RTX
updatePathStats(nack, true);
- nack_on_last_round_ = true;
}
// for statistics pourpose we log all nacks, also the one received for
@@ -297,6 +294,7 @@ void RTCState::onNackPacketReceived(const core::ContentObject &nack,
if (last_seq_nacked_ < seq) last_seq_nacked_ = seq;
DLOG_IF(INFO, VLOG_IS_ON(3))
<< "lost packet " << seq << " beacuse of a past nack";
+ if (compute_stats) past_nack_on_last_round_ = true;
onPacketLost(seq);
} else if (seq > production_seq) {
// future nack
@@ -317,29 +315,15 @@ void RTCState::onNackPacketReceived(const core::ContentObject &nack,
}
}
- // the producer is responding
- // we consider it active only if the production rate is not 0
- // or the production sequence number is not 1
- if (production_rate_ != 0 || production_seq != 1) {
- producer_is_active_ = true;
- }
-
received_packets_last_round_++;
}
void RTCState::onPacketLost(uint32_t seq) {
-#if 0
- DLOG_IF(INFO, VLOG_IS_ON(3)) << "packet " << seq << " is lost";
- auto it = pending_interests_.find(seq);
- if (it != pending_interests_.end()) {
- // this packet was never retransmitted so it does
- // not appear in the loss count
- packets_lost_++;
- }
-#endif
if (!indexer_->isFec(seq)) {
PacketState state = getPacketState(seq);
- if (state == PacketState::LOST || state == PacketState::UNKNOWN) {
+ if (state == PacketState::LOST ||
+ (state == PacketState::UNKNOWN &&
+ pending_interests_.find(seq) != pending_interests_.end())) {
definitely_lost_pkt_++;
DLOG_IF(INFO, VLOG_IS_ON(4)) << "packet " << seq << " is lost";
}
@@ -350,7 +334,12 @@ void RTCState::onPacketLost(uint32_t seq) {
void RTCState::onPacketRecoveredRtx(uint32_t seq) {
packets_sent_to_app_++;
if (seq > highest_seq_received_) highest_seq_received_ = seq;
- losses_recovered_++;
+
+ // increase the recovered packet counter only if the packet was marked as LOST
+ // before.
+ PacketState state = getPacketState(seq);
+ if (state == PacketState::LOST) losses_recovered_++;
+
addRecvOrLost(seq, PacketState::RECEIVED);
}
@@ -371,19 +360,30 @@ void RTCState::onPacketRecoveredFec(uint32_t seq, uint32_t size) {
// adding header to the count
recovered_bytes_with_fec_ += 60; // XXX get header size some where
- if (getPacketState(seq) == PacketState::UNKNOWN)
- onLossDetected(seq); // the pkt was lost but didn't account for it yet
+ // the packet could be not marked as lost yet. onLossDetected checks if add in
+ // the packet in the lost count or not
+ onLossDetected(seq);
addRecvOrLost(seq, PacketState::RECEIVED);
}
bool RTCState::onProbePacketReceived(const core::ContentObject &probe) {
uint32_t seq = probe.getName().getSuffix();
+ core::ParamsRTC params = RTCState::getProbeParams(probe);
+
+ bool is_valid = true;
+ uint32_t max = UINT32_MAX;
+ if (params.prod_rate == max) is_valid = false;
uint64_t rtt;
- rtt = probe_handler_->getRtt(seq);
+ rtt = probe_handler_->getRtt(seq, is_valid);
if (rtt == 0) return false; // this is not a valid probe
+ if (!is_valid) return false; // not a valid probe
+
+ // if we are here the producer is active
+ producer_is_active_ = true;
+
// Like for data and nacks update the path stats. Here the RTT is computed
// by the probe handler. Both probes for rtt and bw are good to estimate
// info on the path.
@@ -406,24 +406,14 @@ bool RTCState::onProbePacketReceived(const core::ContentObject &probe) {
uint64_t now = utils::SteadyTime::nowMs().count();
- core::ParamsRTC params = RTCState::getProbeParams(probe);
-
int64_t OWD = now - params.timestamp;
path->insertOwdSample(OWD);
- if (last_prod_update_ < params.timestamp) {
+ if (last_prod_update_seq_ < params.prod_seg) {
last_production_seq_ = params.prod_seg;
- last_prod_update_ = params.timestamp;
production_rate_ = (double)params.prod_rate;
}
- // the producer is responding
- // we consider it active only if the production rate is not 0
- // or the production sequence numner is not 1
- if (production_rate_ != 0 || params.prod_seg != 1) {
- producer_is_active_ = true;
- }
-
// check for init RTT. if received_probes_ is equal to 0 schedule a timer to
// wait for the INIT_RTT_PROBES. in this way if some probes get lost we don't
// wait forever
@@ -453,12 +443,12 @@ bool RTCState::onProbePacketReceived(const core::ContentObject &probe) {
void RTCState::onJumpForward(uint32_t next_seq) {
for (uint32_t seq = highest_seq_received_in_order_ + 1; seq < next_seq;
seq++) {
- auto it = pending_interests_.find(seq);
PacketState packet_state = getPacketState(seq);
- if (it == pending_interests_.end() &&
- packet_state != PacketState::RECEIVED &&
+ if (packet_state != PacketState::RECEIVED &&
packet_state != PacketState::DEFINITELY_LOST) {
- onLossDetected(seq);
+ // here we considere the packet as definitely lost whitout increase the
+ // lost packet counter because this loss is not due to the network
+ // condition but the transport wants to skip the packet
onPacketLost(seq);
}
}
@@ -491,29 +481,6 @@ void RTCState::onNewRound(double round_len, bool in_sync) {
fec_recovered_rate_ =
(fec_recovered_rate_ * 0.8) + (0.2 * fec_recovered_bytes_per_sec);
-#if 0
- // search for an active path. There should be only one active path (meaning a
- // path that leads to the producer socket -no cache- and from which we are
- // currently getting data packets) at any time. However it may happen that
- // there are mulitple active paths in case of mobility (the old path will
- // remain active for a short ammount of time). The main path is selected as
- // the active path from where the consumer received the latest data packet
-
- uint64_t last_packet_ts = 0;
- main_path_ = nullptr;
-
- for (auto it = path_table_.begin(); it != path_table_.end(); it++) {
- it->second->roundEnd();
- if (it->second->isActive()) {
- uint64_t ts = it->second->getLastPacketTS();
- if (ts > last_packet_ts) {
- last_packet_ts = ts;
- main_path_ = it->second;
- }
- }
- }
-#endif
-
// search for an active path. Is it possible to have multiple path that are
// used at the same time. We use as reference path the one from where we gets
// more packets. This means that the path should have better lantecy or less
@@ -544,7 +511,7 @@ void RTCState::onNewRound(double round_len, bool in_sync) {
updateLossRate(in_sync);
// handle nacks
- if (!nack_on_last_round_ && received_bytes_ > 0) {
+ if (!past_nack_on_last_round_ && received_bytes_ > 0) {
rounds_without_nacks_++;
} else {
rounds_without_nacks_ = 0;
@@ -561,14 +528,6 @@ void RTCState::onNewRound(double round_len, bool in_sync) {
}
}
- // compute cache/producer ratio
- if (received_data_last_round_ != 0) {
- double new_rate =
- (double)received_data_from_cache_ / (double)received_data_last_round_;
- data_from_cache_rate_ = data_from_cache_rate_ * MOVING_AVG_ALPHA +
- (new_rate * (1 - MOVING_AVG_ALPHA));
- }
-
// reset counters
received_bytes_ = 0;
received_fec_bytes_ = 0;
@@ -578,7 +537,7 @@ void RTCState::onNewRound(double round_len, bool in_sync) {
losses_recovered_ = 0;
first_seq_in_round_ = highest_seq_received_;
- nack_on_last_round_ = false;
+ past_nack_on_last_round_ = false;
received_nacks_last_round_ = 0;
received_packets_last_round_ = 0;
@@ -700,6 +659,7 @@ void RTCState::updateLossRate(bool in_sync) {
expected_packets_ +=
((highest_seq_received_ - first_seq_in_round_) - fec_packets);
} else {
+ expected_packets_ = 0;
packets_sent_to_app_ = 0;
}
@@ -715,7 +675,6 @@ void RTCState::updateLossRate(bool in_sync) {
(double)((double)(lost_per_sec_) / (double)total_expected_packets_);
loss_history_.pushBack(per_sec_loss_rate_);
- max_loss_rate_ = getMaxLoss();
if (in_sync && expected_packets_ != 0) {
// compute residual loss rate
@@ -778,7 +737,9 @@ void RTCState::addRecvOrLost(uint32_t seq, PacketState state) {
// however we may need to increse the number or lost packets
// XXX: in case we want to use rtx to recover fec packets,
// this may prevent to detect a packet loss and no rtx will be sent
- onLossDetected(i);
+ if (TRANSPORT_EXPECT_TRUE(i >= first_interest_sent_seq_)) {
+ onLossDetected(i);
+ }
} else {
// this is a data packet and we need to get it
break;
@@ -806,8 +767,9 @@ void RTCState::setInitRttTimer(uint32_t wait) {
}
void RTCState::checkInitRttTimer() {
- if (received_probes_ < INIT_RTT_MIN_PROBES_TO_RECV) {
- // we didn't received enough probes, restart
+ if (received_probes_ < INIT_RTT_MIN_PROBES_TO_RECV ||
+ probe_handler_->getProbeLossRate() == 1.0) {
+ // we didn't received enough probes or they were not valid, restart
received_probes_ = 0;
probe_handler_->setSuffixRange(MIN_INIT_PROBE_SEQ, MAX_INIT_PROBE_SEQ);
probe_handler_->setProbes(INIT_RTT_PROBE_INTERVAL, INIT_RTT_PROBES);
@@ -819,7 +781,6 @@ void RTCState::checkInitRttTimer() {
init_rtt_ = true;
main_path_->roundEnd();
loss_history_.pushBack(probe_handler_->getProbeLossRate());
- max_loss_rate_ = getMaxLoss();
probe_handler_->setSuffixRange(MIN_RTT_PROBE_SEQ, MAX_RTT_PROBE_SEQ);
probe_handler_->setProbes(RTT_PROBE_INTERVAL, 0);
@@ -829,17 +790,12 @@ void RTCState::checkInitRttTimer() {
double prod_rate = getProducerRate();
double rtt = (double)getMinRTT() / MILLI_IN_A_SEC;
double packet_size = getAveragePacketSize();
- uint32_t pkt_in_rtt_ = std::floor(((prod_rate / packet_size) * rtt) * 0.8);
+ uint32_t pkt_in_rtt_ = std::floor(((prod_rate / packet_size) * rtt));
last_seq_nacked_ = last_production_seq_ + pkt_in_rtt_;
discovered_rtt_callback_();
}
-double RTCState::getMaxLoss() {
- if (loss_history_.size() != 0) return loss_history_.begin();
- return 0;
-}
-
core::ParamsRTC RTCState::getProbeParams(const core::ContentObject &probe) {
uint32_t seq = probe.getName().getSuffix();
core::ParamsRTC params;