aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/protocols/rtc/rtc_state.cc
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/protocols/rtc/rtc_state.cc')
-rw-r--r--libtransport/src/protocols/rtc/rtc_state.cc152
1 files changed, 129 insertions, 23 deletions
diff --git a/libtransport/src/protocols/rtc/rtc_state.cc b/libtransport/src/protocols/rtc/rtc_state.cc
index 9c965bfed..c99205a26 100644
--- a/libtransport/src/protocols/rtc/rtc_state.cc
+++ b/libtransport/src/protocols/rtc/rtc_state.cc
@@ -13,6 +13,7 @@
* limitations under the License.
*/
+#include <glog/logging.h>
#include <protocols/rtc/rtc_consts.h>
#include <protocols/rtc/rtc_state.h>
@@ -22,11 +23,13 @@ namespace protocol {
namespace rtc {
-RTCState::RTCState(ProbeHandler::SendProbeCallback &&rtt_probes_callback,
+RTCState::RTCState(Indexer *indexer,
+ ProbeHandler::SendProbeCallback &&rtt_probes_callback,
DiscoveredRttCallback &&discovered_rtt_callback,
asio::io_service &io_service)
- : rtt_probes_(std::make_shared<ProbeHandler>(
- std::move(rtt_probes_callback), io_service)),
+ : indexer_(indexer),
+ rtt_probes_(std::make_shared<ProbeHandler>(std::move(rtt_probes_callback),
+ io_service)),
discovered_rtt_callback_(std::move(discovered_rtt_callback)) {
init_rtt_timer_ = std::make_unique<asio::steady_timer>(io_service);
initParams();
@@ -45,14 +48,22 @@ void RTCState::initParams() {
// loss counters
packets_lost_ = 0;
+ definitely_lost_pkt_ = 0;
losses_recovered_ = 0;
first_seq_in_round_ = 0;
highest_seq_received_ = 0;
highest_seq_received_in_order_ = 0;
last_seq_nacked_ = 0;
loss_rate_ = 0.0;
+ avg_loss_rate_ = 0.0;
+ max_loss_rate_ = 0.0;
+ last_round_loss_rate_ = 0.0;
residual_loss_rate_ = 0.0;
+ // fec counters
+ pending_fec_pkt_ = 0;
+ received_fec_pkt_ = 0;
+
// bw counters
received_bytes_ = 0;
avg_packet_size_ = INIT_PACKET_SIZE;
@@ -90,8 +101,14 @@ void RTCState::initParams() {
// pending interests
pending_interests_.clear();
+ // skipped interest
+ last_interest_sent_ = 0;
+ skipped_interests_.clear();
+
// init rtt
- first_interest_sent_ = ~0;
+ first_interest_sent_time_ = ~0;
+ first_interest_sent_seq_ = 0;
+
init_rtt_ = false;
rtt_probes_->setProbes(INIT_RTT_PROBE_INTERVAL, INIT_RTT_PROBES);
rtt_probes_->sendProbes();
@@ -106,18 +123,51 @@ void RTCState::onSendNewInterest(const core::Name *interest_name) {
uint32_t seq = interest_name->getSuffix();
pending_interests_.insert(std::pair<uint32_t, uint64_t>(seq, now));
- if(sent_interests_ == 0) first_interest_sent_ = now;
+ if (sent_interests_ == 0) {
+ first_interest_sent_time_ = now;
+ first_interest_sent_seq_ = seq;
+ }
+
+ if (indexer_->isFec(seq)) {
+ pending_fec_pkt_++;
+ }
+
+ if (last_interest_sent_ == 0 && seq != 0) {
+ last_interest_sent_ = seq; // init last interest sent
+ }
+
+ // TODO what happen in case of jumps?
+ // look for skipped interests
+ skipped_interests_.erase(seq); // remove seq if it is there
+ for (uint32_t i = last_interest_sent_ + 1; i < seq; i++) {
+ if (indexer_->isFec(i)) {
+ skipped_interests_.insert(i);
+ }
+ }
+
+ last_interest_sent_ = seq;
sent_interests_++;
sent_interests_last_round_++;
}
-void RTCState::onTimeout(uint32_t seq) {
+void RTCState::onTimeout(uint32_t seq, bool lost) {
auto it = pending_interests_.find(seq);
if (it != pending_interests_.end()) {
pending_interests_.erase(it);
}
received_timeouts_++;
+
+ if (lost) onPacketLost(seq);
+}
+
+void RTCState::onLossDetected(uint32_t seq) {
+ if (!indexer_->isFec(seq)) {
+ packets_lost_++;
+ } else if (skipped_interests_.find(seq) == skipped_interests_.end() &&
+ seq >= first_interest_sent_seq_) {
+ packets_lost_++;
+ }
}
void RTCState::onRetransmission(uint32_t seq) {
@@ -128,7 +178,9 @@ void RTCState::onRetransmission(uint32_t seq) {
auto it = pending_interests_.find(seq);
if (it != pending_interests_.end()) {
pending_interests_.erase(it);
+#if 0
packets_lost_++;
+#endif
}
sent_rtx_++;
sent_rtx_last_round_++;
@@ -165,6 +217,16 @@ void RTCState::onDataPacketReceived(const core::ContentObject &content_object,
received_packets_last_round_++;
}
+void RTCState::onFecPacketReceived(const core::ContentObject &content_object) {
+ uint32_t seq = content_object.getName().getSuffix();
+ updateReceivedBytes(content_object);
+ addRecvOrLost(seq, PacketState::RECEIVED);
+ received_fec_pkt_++;
+ // the producer is responding
+ // it is generating valid data packets so we consider it active
+ producer_is_active_ = true;
+}
+
void RTCState::onNackPacketReceived(const core::ContentObject &nack,
bool compute_stats) {
uint32_t seq = nack.getName().getSuffix();
@@ -197,12 +259,14 @@ void RTCState::onNackPacketReceived(const core::ContentObject &nack,
// old nack, seq is lost
// update last nacked
if (last_seq_nacked_ < seq) last_seq_nacked_ = seq;
- TRANSPORT_LOGD("lost packet %u beacuse of a past nack", seq);
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "lost packet " << seq << " beacuse of a past nack";
onPacketLost(seq);
} else if (seq > production_seq) {
// future nack
// remove the nack from the pending interest map
// (the packet is not received/lost yet)
+ if (indexer_->isFec(seq)) pending_fec_pkt_--;
pending_interests_.erase(seq);
} else {
// this should be a quite rear event. simply remove the
@@ -221,17 +285,28 @@ void RTCState::onNackPacketReceived(const core::ContentObject &nack,
}
void RTCState::onPacketLost(uint32_t seq) {
- TRANSPORT_LOGD("packet %u is lost", seq);
+#if 0
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "packet " << seq << " is lost";
auto it = pending_interests_.find(seq);
if (it != pending_interests_.end()) {
// this packet was never retransmitted so it does
// not appear in the loss count
packets_lost_++;
}
+#endif
+ if (!indexer_->isFec(seq)) {
+ definitely_lost_pkt_++;
+ DLOG_IF(INFO, VLOG_IS_ON(4)) << "packet " << seq << " is lost";
+ }
addRecvOrLost(seq, PacketState::LOST);
}
-void RTCState::onPacketRecovered(uint32_t seq) {
+void RTCState::onPacketRecoveredRtx(uint32_t seq) {
+ losses_recovered_++;
+ addRecvOrLost(seq, PacketState::RECEIVED);
+}
+
+void RTCState::onPacketRecoveredFec(uint32_t seq) {
losses_recovered_++;
addRecvOrLost(seq, PacketState::RECEIVED);
}
@@ -258,7 +333,6 @@ bool RTCState::onProbePacketReceived(const core::ContentObject &probe) {
uint32_t production_seq = probe_pkt->getProductionSegement();
uint32_t production_rate = probe_pkt->getProductionRate();
-
if (path_it == path_table_.end()) {
// found a new path
std::shared_ptr<RTCDataPath> newPath =
@@ -298,13 +372,14 @@ bool RTCState::onProbePacketReceived(const core::ContentObject &probe) {
// wait forever
received_probes_++;
- if(!init_rtt_ && received_probes_ <= INIT_RTT_PROBES){
- if(received_probes_ == 1){
- // we got the first probe, wait at most INIT_RTT_PROBE_WAIT sec for the others
+ if (!init_rtt_ && received_probes_ <= INIT_RTT_PROBES) {
+ if (received_probes_ == 1) {
+ // we got the first probe, wait at most INIT_RTT_PROBE_WAIT sec for the
+ // others
main_path_ = path;
setInitRttTimer(INIT_RTT_PROBE_WAIT);
}
- if(received_probes_ == INIT_RTT_PROBES) {
+ if (received_probes_ == INIT_RTT_PROBES) {
// we are done
init_rtt_timer_->cancel();
checkInitRttTimer();
@@ -314,7 +389,7 @@ bool RTCState::onProbePacketReceived(const core::ContentObject &probe) {
received_packets_last_round_++;
// ignore probes sent before the first interest
- if((now - rtt) <= first_interest_sent_) return false;
+ if ((now - rtt) <= first_interest_sent_time_) return false;
return true;
}
@@ -327,11 +402,11 @@ void RTCState::onNewRound(double round_len, bool in_sync) {
double bytes_per_sec =
((double)received_bytes_ * (MILLI_IN_A_SEC / round_len));
- if(received_rate_ == 0)
+ if (received_rate_ == 0)
received_rate_ = bytes_per_sec;
else
received_rate_ = (received_rate_ * MOVING_AVG_ALPHA) +
- ((1 - MOVING_AVG_ALPHA) * bytes_per_sec);
+ ((1 - MOVING_AVG_ALPHA) * bytes_per_sec);
// search for an active path. There should be only one active path (meaning a
// path that leads to the producer socket -no cache- and from which we are
@@ -354,7 +429,8 @@ void RTCState::onNewRound(double round_len, bool in_sync) {
}
}
- if (in_sync) updateLossRate();
+ // if (in_sync) updateLossRate();
+ updateLossRate();
// handle nacks
if (!nack_on_last_round_ && received_bytes_ > 0) {
@@ -385,6 +461,7 @@ void RTCState::onNewRound(double round_len, bool in_sync) {
// reset counters
received_bytes_ = 0;
packets_lost_ = 0;
+ definitely_lost_pkt_ = 0;
losses_recovered_ = 0;
first_seq_in_round_ = highest_seq_received_;
@@ -397,6 +474,8 @@ void RTCState::onNewRound(double round_len, bool in_sync) {
sent_interests_last_round_ = 0;
sent_rtx_last_round_ = 0;
+ received_fec_pkt_ = 0;
+
rounds_++;
}
@@ -465,6 +544,7 @@ void RTCState::updatePathStats(const core::ContentObject &content_object,
}
void RTCState::updateLossRate() {
+ last_round_loss_rate_ = loss_rate_;
loss_rate_ = 0.0;
residual_loss_rate_ = 0.0;
@@ -475,9 +555,29 @@ void RTCState::updateLossRate() {
// division by 0
if (number_theorically_received_packets_ == 0) return;
+ // XXX this may be quite inefficient if the rate is high
+ // maybe is better to iterate over the set?
+ for (uint32_t i = first_seq_in_round_; i < highest_seq_received_; i++) {
+ auto it = skipped_interests_.find(i);
+ if (it != skipped_interests_.end()) {
+ if (number_theorically_received_packets_ > 0)
+ number_theorically_received_packets_--;
+ skipped_interests_.erase(it);
+ }
+ }
+
loss_rate_ = (double)((double)(packets_lost_) /
(double)number_theorically_received_packets_);
+ if (rounds_ % 15 == 0) max_loss_rate_ = 0; // reset every 3 sec
+ if (loss_rate_ > max_loss_rate_) max_loss_rate_ = loss_rate_;
+
+ if (avg_loss_rate_ == 0)
+ avg_loss_rate_ = loss_rate_;
+ else
+ avg_loss_rate_ =
+ avg_loss_rate_ * MOVING_AVG_ALPHA + loss_rate_ * (1 - MOVING_AVG_ALPHA);
+
residual_loss_rate_ = (double)((double)(packets_lost_ - losses_recovered_) /
(double)number_theorically_received_packets_);
@@ -485,6 +585,10 @@ void RTCState::updateLossRate() {
}
void RTCState::addRecvOrLost(uint32_t seq, PacketState state) {
+ if (indexer_->isFec(seq)) {
+ pending_fec_pkt_--;
+ }
+
pending_interests_.erase(seq);
if (received_or_lost_packets_.size() >= MAX_CACHED_PACKETS) {
received_or_lost_packets_.erase(received_or_lost_packets_.begin());
@@ -507,10 +611,12 @@ void RTCState::addRecvOrLost(uint32_t seq, PacketState state) {
// 1) there is a gap in the sequence so we do not update largest_in_seq_
// 2) all the packets from largest_in_seq_ to seq are in
// received_or_lost_packets_ an we upate largest_in_seq_
+ // or are FEC packets
for (uint32_t i = highest_seq_received_in_order_ + 1; i <= seq; i++) {
if (received_or_lost_packets_.find(i) ==
- received_or_lost_packets_.end()) {
+ received_or_lost_packets_.end() &&
+ !indexer_->isFec(i)) {
break;
}
// this packet is in order so we can update the
@@ -520,17 +626,17 @@ void RTCState::addRecvOrLost(uint32_t seq, PacketState state) {
}
}
-void RTCState::setInitRttTimer(uint32_t wait){
+void RTCState::setInitRttTimer(uint32_t wait) {
init_rtt_timer_->cancel();
init_rtt_timer_->expires_from_now(std::chrono::milliseconds(wait));
init_rtt_timer_->async_wait([this](std::error_code ec) {
- if(ec) return;
+ if (ec) return;
checkInitRttTimer();
});
}
void RTCState::checkInitRttTimer() {
- if(received_probes_ < INIT_RTT_MIN_PROBES_TO_RECV){
+ if (received_probes_ < INIT_RTT_MIN_PROBES_TO_RECV) {
// we didn't received enough probes, restart
received_probes_ = 0;
rtt_probes_->setProbes(INIT_RTT_PROBE_INTERVAL, INIT_RTT_PROBES);
@@ -547,7 +653,7 @@ void RTCState::checkInitRttTimer() {
double prod_rate = getProducerRate();
double rtt = (double)getRTT() / MILLI_IN_A_SEC;
double packet_size = getAveragePacketSize();
- uint32_t pkt_in_rtt_ = (uint32_t)std::floor(((prod_rate / packet_size) * rtt) * 0.8);
+ uint32_t pkt_in_rtt_ = std::floor(((prod_rate / packet_size) * rtt) * 0.8);
last_seq_nacked_ = last_production_seq_ + pkt_in_rtt_;
discovered_rtt_callback_();