aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/protocols/rtc/rtc_state.cc
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/protocols/rtc/rtc_state.cc')
-rw-r--r--libtransport/src/protocols/rtc/rtc_state.cc602
1 files changed, 418 insertions, 184 deletions
diff --git a/libtransport/src/protocols/rtc/rtc_state.cc b/libtransport/src/protocols/rtc/rtc_state.cc
index c99205a26..82ac0b9c1 100644
--- a/libtransport/src/protocols/rtc/rtc_state.cc
+++ b/libtransport/src/protocols/rtc/rtc_state.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -24,15 +24,15 @@ namespace protocol {
namespace rtc {
RTCState::RTCState(Indexer *indexer,
- ProbeHandler::SendProbeCallback &&rtt_probes_callback,
+ ProbeHandler::SendProbeCallback &&probe_callback,
DiscoveredRttCallback &&discovered_rtt_callback,
asio::io_service &io_service)
- : indexer_(indexer),
- rtt_probes_(std::make_shared<ProbeHandler>(std::move(rtt_probes_callback),
- io_service)),
+ : loss_history_(10), // log 10sec history
+ indexer_(indexer),
+ probe_handler_(std::make_shared<ProbeHandler>(std::move(probe_callback),
+ io_service)),
discovered_rtt_callback_(std::move(discovered_rtt_callback)) {
init_rtt_timer_ = std::make_unique<asio::steady_timer>(io_service);
- initParams();
}
RTCState::~RTCState() {}
@@ -55,9 +55,18 @@ void RTCState::initParams() {
highest_seq_received_in_order_ = 0;
last_seq_nacked_ = 0;
loss_rate_ = 0.0;
- avg_loss_rate_ = 0.0;
- max_loss_rate_ = 0.0;
+ avg_loss_rate_ = -1.0;
last_round_loss_rate_ = 0.0;
+
+ // loss rate per sec
+ lost_per_sec_ = 0;
+ total_expected_packets_ = 0;
+ per_sec_loss_rate_ = 0.0;
+
+ // residual losses counters
+ expected_packets_ = 0;
+ packets_sent_to_app_ = 0;
+ rounds_from_last_compute_ = 0;
residual_loss_rate_ = 0.0;
// fec counters
@@ -66,19 +75,22 @@ void RTCState::initParams() {
// bw counters
received_bytes_ = 0;
+ received_fec_bytes_ = 0;
+ recovered_bytes_with_fec_ = 0;
+
avg_packet_size_ = INIT_PACKET_SIZE;
production_rate_ = 0.0;
received_rate_ = 0.0;
+ fec_recovered_rate_ = 0.0;
// nack counter
- nack_on_last_round_ = false;
+ past_nack_on_last_round_ = false;
received_nacks_last_round_ = 0;
// packets counter
received_packets_last_round_ = 0;
received_data_last_round_ = 0;
received_data_from_cache_ = 0;
- data_from_cache_rate_ = 0;
sent_interests_last_round_ = 0;
sent_rtx_last_round_ = 0;
@@ -89,37 +101,37 @@ void RTCState::initParams() {
last_production_seq_ = 0;
producer_is_active_ = false;
- last_prod_update_ = 0;
+ last_prod_update_seq_ = 0;
// paths stats
path_table_.clear();
main_path_ = nullptr;
+ edge_path_ = nullptr;
- // packet received
- received_or_lost_packets_.clear();
+ // packet cache (not pending anymore)
+ packet_cache_.clear();
// pending interests
pending_interests_.clear();
- // skipped interest
+ // used to keep track of the skipped interest
last_interest_sent_ = 0;
- skipped_interests_.clear();
// init rtt
first_interest_sent_time_ = ~0;
first_interest_sent_seq_ = 0;
+ // start probing the producer
init_rtt_ = false;
- rtt_probes_->setProbes(INIT_RTT_PROBE_INTERVAL, INIT_RTT_PROBES);
- rtt_probes_->sendProbes();
+ probe_handler_->setSuffixRange(MIN_INIT_PROBE_SEQ, MAX_INIT_PROBE_SEQ);
+ probe_handler_->setProbes(INIT_RTT_PROBE_INTERVAL, INIT_RTT_PROBES);
+ probe_handler_->sendProbes();
setInitRttTimer(INIT_RTT_PROBE_RESTART);
}
// packet events
void RTCState::onSendNewInterest(const core::Name *interest_name) {
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
+ uint64_t now = utils::SteadyTime::nowMs().count();
uint32_t seq = interest_name->getSuffix();
pending_interests_.insert(std::pair<uint32_t, uint64_t>(seq, now));
@@ -137,11 +149,12 @@ void RTCState::onSendNewInterest(const core::Name *interest_name) {
}
// TODO what happen in case of jumps?
- // look for skipped interests
- skipped_interests_.erase(seq); // remove seq if it is there
+ eraseFromPacketCache(
+ seq); // if we send this interest we don't know its state
for (uint32_t i = last_interest_sent_ + 1; i < seq; i++) {
if (indexer_->isFec(i)) {
- skipped_interests_.insert(i);
+ // only fec packets can be skipped
+ addToPacketCache(i, PacketState::SKIPPED);
}
}
@@ -155,6 +168,7 @@ void RTCState::onTimeout(uint32_t seq, bool lost) {
auto it = pending_interests_.find(seq);
if (it != pending_interests_.end()) {
pending_interests_.erase(it);
+ if (indexer_->isFec(seq)) pending_fec_pkt_--;
}
received_timeouts_++;
@@ -162,11 +176,14 @@ void RTCState::onTimeout(uint32_t seq, bool lost) {
}
void RTCState::onLossDetected(uint32_t seq) {
- if (!indexer_->isFec(seq)) {
- packets_lost_++;
- } else if (skipped_interests_.find(seq) == skipped_interests_.end() &&
- seq >= first_interest_sent_seq_) {
+ PacketState state = getPacketState(seq);
+
+ // if the packet is already marked with a state, do nothing
+ // to be considered lost the packet must be pending
+ if (state == PacketState::UNKNOWN &&
+ pending_interests_.find(seq) != pending_interests_.end()) {
packets_lost_++;
+ addToPacketCache(seq, PacketState::LOST);
}
}
@@ -178,38 +195,46 @@ void RTCState::onRetransmission(uint32_t seq) {
auto it = pending_interests_.find(seq);
if (it != pending_interests_.end()) {
pending_interests_.erase(it);
-#if 0
- packets_lost_++;
-#endif
+ if (indexer_->isFec(seq)) pending_fec_pkt_--;
}
sent_rtx_++;
sent_rtx_last_round_++;
}
+void RTCState::onPossibleLossWithNoRtx(uint32_t seq) {
+ // if fec is on or rtx is disable we don't need to do anything to recover a
+ // packet. however in both cases we need to remove possible missing packets
+ // from the window of pendinig interest in order to free space without wating
+ // for the timeout.
+ auto it = pending_interests_.find(seq);
+ if (it != pending_interests_.end()) {
+ pending_interests_.erase(it);
+ if (indexer_->isFec(seq)) pending_fec_pkt_--;
+ }
+}
+
void RTCState::onDataPacketReceived(const core::ContentObject &content_object,
bool compute_stats) {
uint32_t seq = content_object.getName().getSuffix();
+
if (compute_stats) {
updatePathStats(content_object, false);
received_data_last_round_++;
}
received_data_++;
+ packets_sent_to_app_++;
- struct data_packet_t *data_pkt =
- (struct data_packet_t *)content_object.getPayload()->data();
- uint64_t production_time = data_pkt->getTimestamp();
- if (last_prod_update_ < production_time) {
- last_prod_update_ = production_time;
- uint32_t production_rate = data_pkt->getProductionRate();
- production_rate_ = (double)production_rate;
+ core::ParamsRTC params = RTCState::getDataParams(content_object);
+
+ if (last_prod_update_seq_ < seq) {
+ last_prod_update_seq_ = seq;
+ production_rate_ = (double)params.prod_rate;
}
updatePacketSize(content_object);
- updateReceivedBytes(content_object);
+ updateReceivedBytes(content_object, false);
addRecvOrLost(seq, PacketState::RECEIVED);
- if (seq > highest_seq_received_) highest_seq_received_ = seq;
-
// the producer is responding
// it is generating valid data packets so we consider it active
producer_is_active_ = true;
@@ -219,9 +244,14 @@ void RTCState::onDataPacketReceived(const core::ContentObject &content_object,
void RTCState::onFecPacketReceived(const core::ContentObject &content_object) {
uint32_t seq = content_object.getName().getSuffix();
- updateReceivedBytes(content_object);
+ updateReceivedBytes(content_object, true);
+
+ PacketState state = getPacketState(seq);
+ if (state != PacketState::LOST) {
+ // increase only for not lost packets
+ received_fec_pkt_++;
+ }
addRecvOrLost(seq, PacketState::RECEIVED);
- received_fec_pkt_++;
// the producer is responding
// it is generating valid data packets so we consider it active
producer_is_active_ = true;
@@ -232,14 +262,12 @@ void RTCState::onNackPacketReceived(const core::ContentObject &nack,
uint32_t seq = nack.getName().getSuffix();
struct nack_packet_t *nack_pkt =
(struct nack_packet_t *)nack.getPayload()->data();
- uint64_t production_time = nack_pkt->getTimestamp();
- uint32_t production_seq = nack_pkt->getProductionSegement();
+ uint32_t production_seq = nack_pkt->getProductionSegment();
uint32_t production_rate = nack_pkt->getProductionRate();
if (TRANSPORT_EXPECT_FALSE(main_path_ == nullptr) ||
- last_prod_update_ < production_time) {
+ last_prod_update_seq_ < production_seq) {
// update production rate
- last_prod_update_ = production_time;
last_production_seq_ = production_seq;
production_rate_ = (double)production_rate;
}
@@ -247,7 +275,6 @@ void RTCState::onNackPacketReceived(const core::ContentObject &nack,
if (compute_stats) {
// this is not an RTX
updatePathStats(nack, true);
- nack_on_last_round_ = true;
}
// for statistics pourpose we log all nacks, also the one received for
@@ -255,83 +282,132 @@ void RTCState::onNackPacketReceived(const core::ContentObject &nack,
received_nacks_++;
received_nacks_last_round_++;
+ bool to_delete = false;
if (production_seq > seq) {
// old nack, seq is lost
// update last nacked
if (last_seq_nacked_ < seq) last_seq_nacked_ = seq;
DLOG_IF(INFO, VLOG_IS_ON(3))
<< "lost packet " << seq << " beacuse of a past nack";
+ if (compute_stats) past_nack_on_last_round_ = true;
onPacketLost(seq);
} else if (seq > production_seq) {
// future nack
// remove the nack from the pending interest map
// (the packet is not received/lost yet)
- if (indexer_->isFec(seq)) pending_fec_pkt_--;
- pending_interests_.erase(seq);
+ to_delete = true;
} else {
// this should be a quite rear event. simply remove the
// packet from the pending interest list
- pending_interests_.erase(seq);
+ to_delete = true;
}
- // the producer is responding
- // we consider it active only if the production rate is not 0
- // or the production sequence number is not 1
- if (production_rate_ != 0 || production_seq != 1) {
- producer_is_active_ = true;
+ if (to_delete) {
+ auto it = pending_interests_.find(seq);
+ if (it != pending_interests_.end()) {
+ pending_interests_.erase(it);
+ if (indexer_->isFec(seq)) pending_fec_pkt_--;
+ }
}
received_packets_last_round_++;
}
void RTCState::onPacketLost(uint32_t seq) {
-#if 0
- DLOG_IF(INFO, VLOG_IS_ON(3)) << "packet " << seq << " is lost";
- auto it = pending_interests_.find(seq);
- if (it != pending_interests_.end()) {
- // this packet was never retransmitted so it does
- // not appear in the loss count
- packets_lost_++;
- }
-#endif
if (!indexer_->isFec(seq)) {
- definitely_lost_pkt_++;
- DLOG_IF(INFO, VLOG_IS_ON(4)) << "packet " << seq << " is lost";
+ PacketState state = getPacketState(seq);
+ if (state == PacketState::LOST ||
+ (state == PacketState::UNKNOWN &&
+ pending_interests_.find(seq) != pending_interests_.end())) {
+ definitely_lost_pkt_++;
+ DLOG_IF(INFO, VLOG_IS_ON(4)) << "packet " << seq << " is lost";
+ }
}
- addRecvOrLost(seq, PacketState::LOST);
+
+ addRecvOrLost(seq, PacketState::DEFINITELY_LOST);
}
-void RTCState::onPacketRecoveredRtx(uint32_t seq) {
- losses_recovered_++;
+void RTCState::onPacketRecoveredRtx(const core::ContentObject &content_object,
+ uint64_t rtt) {
+ uint32_t seq = content_object.getName().getSuffix();
+ packets_sent_to_app_++;
+
+ // increase the recovered packet counter only if the packet was marked as LOST
+ // before.
+ PacketState state = getPacketState(seq);
+ if (state == PacketState::LOST) losses_recovered_++;
+
addRecvOrLost(seq, PacketState::RECEIVED);
+ updateReceivedBytes(content_object, false);
+
+ if (rtt == 0) return; // nothing to do
+
+ uint32_t path_label = content_object.getPathLabel();
+ auto path_it = path_table_.find(path_label);
+ if (path_it == path_table_.end()) {
+ // this is a new path and it must be a cache
+ std::shared_ptr<RTCDataPath> newPath =
+ std::make_shared<RTCDataPath>(path_label);
+ auto ret = path_table_.insert(
+ std::pair<uint32_t, std::shared_ptr<RTCDataPath>>(path_label, newPath));
+ path_it = ret.first;
+ }
+
+ auto path = path_it->second;
+ if (path->pathToProducer())
+ return; // this packet is coming from a producer
+ // even if we sent an RTX. this may happen
+ // for RTX that are sent too fast or in
+ // case of multipath
+
+ path->insertRttSample(utils::SteadyTime::Milliseconds(rtt), true);
}
-void RTCState::onPacketRecoveredFec(uint32_t seq) {
+void RTCState::onFecPacketRecoveredRtx(
+ const core::ContentObject &content_object) {
+ // This is the same as onPacketRecoveredRtx, but in this is case the
+ // pkt is also a FEC pkt, the addRecvOrLost will be called afterwards
losses_recovered_++;
+ updateReceivedBytes(content_object, true);
+}
+
+void RTCState::onPacketRecoveredFec(uint32_t seq, uint32_t size) {
+ losses_recovered_++;
+ packets_sent_to_app_++;
+ recovered_bytes_with_fec_ += size;
+
+ // adding header to the count
+ recovered_bytes_with_fec_ += 60; // XXX get header size some where
+
+ // the packet could be not marked as lost yet. onLossDetected checks if add in
+ // the packet in the lost count or not
+ onLossDetected(seq);
+
addRecvOrLost(seq, PacketState::RECEIVED);
}
bool RTCState::onProbePacketReceived(const core::ContentObject &probe) {
uint32_t seq = probe.getName().getSuffix();
- uint64_t rtt;
+ core::ParamsRTC params = RTCState::getProbeParams(probe);
- rtt = rtt_probes_->getRtt(seq);
+ bool is_valid = true;
+ uint32_t max = UINT32_MAX;
+ if (params.prod_rate == max) is_valid = false;
+ uint64_t rtt;
+ rtt = probe_handler_->getRtt(seq, is_valid);
if (rtt == 0) return false; // this is not a valid probe
- // like for data and nacks update the path stats. Here the RTT is computed
- // by the probe handler. Both probes for rtt and bw are good to esimate
- // info on the path
- uint32_t path_label = probe.getPathLabel();
+ if (!is_valid) return false; // not a valid probe
- auto path_it = path_table_.find(path_label);
+ // if we are here the producer is active
+ producer_is_active_ = true;
- // update production rate and last_seq_nacked like in case of a nack
- struct nack_packet_t *probe_pkt =
- (struct nack_packet_t *)probe.getPayload()->data();
- uint64_t sender_timestamp = probe_pkt->getTimestamp();
- uint32_t production_seq = probe_pkt->getProductionSegement();
- uint32_t production_rate = probe_pkt->getProductionRate();
+ // Like for data and nacks update the path stats. Here the RTT is computed
+ // by the probe handler. Both probes for rtt and bw are good to estimate
+ // info on the path.
+ uint32_t path_label = probe.getPathLabel();
+ auto path_it = path_table_.find(path_label);
if (path_it == path_table_.end()) {
// found a new path
@@ -344,27 +420,17 @@ bool RTCState::onProbePacketReceived(const core::ContentObject &probe) {
auto path = path_it->second;
- path->insertRttSample(rtt);
+ path->insertRttSample(utils::SteadyTime::Milliseconds(rtt), true);
path->receivedNack();
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
+ uint64_t now = utils::SteadyTime::nowMs().count();
- int64_t OWD = now - sender_timestamp;
+ int64_t OWD = now - params.timestamp;
path->insertOwdSample(OWD);
- if (last_prod_update_ < sender_timestamp) {
- last_production_seq_ = production_seq;
- last_prod_update_ = sender_timestamp;
- production_rate_ = (double)production_rate;
- }
-
- // the producer is responding
- // we consider it active only if the production rate is not 0
- // or the production sequence numner is not 1
- if (production_rate_ != 0 || production_seq != 1) {
- producer_is_active_ = true;
+ if (last_prod_update_seq_ < params.prod_seg) {
+ last_production_seq_ = params.prod_seg;
+ production_rate_ = (double)params.prod_rate;
}
// check for init RTT. if received_probes_ is equal to 0 schedule a timer to
@@ -375,7 +441,7 @@ bool RTCState::onProbePacketReceived(const core::ContentObject &probe) {
if (!init_rtt_ && received_probes_ <= INIT_RTT_PROBES) {
if (received_probes_ == 1) {
// we got the first probe, wait at most INIT_RTT_PROBE_WAIT sec for the
- // others
+ // others.
main_path_ = path;
setInitRttTimer(INIT_RTT_PROBE_WAIT);
}
@@ -393,11 +459,21 @@ bool RTCState::onProbePacketReceived(const core::ContentObject &probe) {
return true;
}
-void RTCState::onNewRound(double round_len, bool in_sync) {
- // XXX
- // here we take into account only the single path case so we assume that we
- // don't use two paths in parellel for this single flow
+void RTCState::onJumpForward(uint32_t next_seq) {
+ for (uint32_t seq = highest_seq_received_in_order_ + 1; seq < next_seq;
+ seq++) {
+ PacketState packet_state = getPacketState(seq);
+ if (packet_state != PacketState::RECEIVED &&
+ packet_state != PacketState::DEFINITELY_LOST) {
+ // here we considere the packet as definitely lost whitout increase the
+ // lost packet counter because this loss is not due to the network
+ // condition but the transport wants to skip the packet
+ onPacketLost(seq);
+ }
+ }
+}
+void RTCState::onNewRound(double round_len, bool in_sync) {
if (path_table_.empty()) return;
double bytes_per_sec =
@@ -407,33 +483,65 @@ void RTCState::onNewRound(double round_len, bool in_sync) {
else
received_rate_ = (received_rate_ * MOVING_AVG_ALPHA) +
((1 - MOVING_AVG_ALPHA) * bytes_per_sec);
+ double fec_bytes_per_sec =
+ ((double)received_fec_bytes_ * (MILLI_IN_A_SEC / round_len));
+
+ if (fec_received_rate_ == 0)
+ fec_received_rate_ = fec_bytes_per_sec;
+ else
+ fec_received_rate_ = (fec_received_rate_ * 0.8) + (0.2 * fec_bytes_per_sec);
- // search for an active path. There should be only one active path (meaning a
- // path that leads to the producer socket -no cache- and from which we are
- // currently getting data packets) at any time. However it may happen that
- // there are mulitple active paths in case of mobility (the old path will
- // remain active for a short ammount of time). The main path is selected as
- // the active path from where the consumer received the latest data packet
+ double fec_recovered_bytes_per_sec =
+ ((double)recovered_bytes_with_fec_ * (MILLI_IN_A_SEC / round_len));
- uint64_t last_packet_ts = 0;
+ if (fec_recovered_rate_ == 0)
+ fec_recovered_rate_ = fec_recovered_bytes_per_sec;
+ else
+ fec_recovered_rate_ =
+ (fec_recovered_rate_ * 0.8) + (0.2 * fec_recovered_bytes_per_sec);
+
+ // search for an active path. Is it possible to have multiple path that are
+ // used at the same time. We use as reference path the one from where we gets
+ // more packets. This means that the path should have better lantecy or less
+ // channel losses
+
+ uint32_t last_round_packets = 0;
+ uint64_t min_edge_rtt = UINT_MAX;
+ std::shared_ptr<RTCDataPath> old_main_path = main_path_;
main_path_ = nullptr;
+ edge_path_ = nullptr;
for (auto it = path_table_.begin(); it != path_table_.end(); it++) {
- it->second->roundEnd();
- if (it->second->isActive()) {
- uint64_t ts = it->second->getLastPacketTS();
- if (ts > last_packet_ts) {
- last_packet_ts = ts;
+ if (it->second->isValidProducer()) {
+ uint32_t pkt = it->second->getPacketsLastRound();
+ if (pkt > last_round_packets) {
+ last_round_packets = pkt;
main_path_ = it->second;
}
+ } else if (it->second->isActive() && !it->second->pathToProducer()) {
+ // this is a path to a cache from where we are receiving content
+ if (it->second->getMinRtt() < min_edge_rtt) {
+ min_edge_rtt = it->second->getMinRtt();
+ edge_path_ = it->second;
+ }
}
+ it->second->roundEnd();
}
- // if (in_sync) updateLossRate();
- updateLossRate();
+ if (main_path_ == nullptr) main_path_ = old_main_path;
+ if (edge_path_ == nullptr) edge_path_ = main_path_;
+ if (edge_path_->getMinRtt() >= main_path_->getMinRtt())
+ edge_path_ = main_path_;
+
+ // in case we get a new main path we reset the stats of the old one. this is
+ // beacuse, in case we need to switch back we don't what to take decisions on
+ // old stats that may be outdated.
+ if (main_path_ != old_main_path) old_main_path->clearRtt();
+
+ updateLossRate(in_sync);
// handle nacks
- if (!nack_on_last_round_ && received_bytes_ > 0) {
+ if (!past_nack_on_last_round_ && received_bytes_ > 0) {
rounds_without_nacks_++;
} else {
rounds_without_nacks_ = 0;
@@ -450,22 +558,16 @@ void RTCState::onNewRound(double round_len, bool in_sync) {
}
}
- // compute cache/producer ratio
- if (received_data_last_round_ != 0) {
- double new_rate =
- (double)received_data_from_cache_ / (double)received_data_last_round_;
- data_from_cache_rate_ = data_from_cache_rate_ * MOVING_AVG_ALPHA +
- (new_rate * (1 - MOVING_AVG_ALPHA));
- }
-
// reset counters
received_bytes_ = 0;
+ received_fec_bytes_ = 0;
+ recovered_bytes_with_fec_ = 0;
packets_lost_ = 0;
definitely_lost_pkt_ = 0;
losses_recovered_ = 0;
first_seq_in_round_ = highest_seq_received_;
- nack_on_last_round_ = false;
+ past_nack_on_last_round_ = false;
received_nacks_last_round_ = 0;
received_packets_last_round_ = 0;
@@ -479,9 +581,15 @@ void RTCState::onNewRound(double round_len, bool in_sync) {
rounds_++;
}
-void RTCState::updateReceivedBytes(const core::ContentObject &content_object) {
- received_bytes_ +=
- (uint32_t)(content_object.headerSize() + content_object.payloadSize());
+void RTCState::updateReceivedBytes(const core::ContentObject &content_object,
+ bool isFec) {
+ if (isFec) {
+ received_fec_bytes_ +=
+ (uint32_t)(content_object.headerSize() + content_object.payloadSize());
+ } else {
+ received_bytes_ +=
+ (uint32_t)(content_object.headerSize() + content_object.payloadSize());
+ }
}
void RTCState::updatePacketSize(const core::ContentObject &content_object) {
@@ -516,20 +624,16 @@ void RTCState::updatePathStats(const core::ContentObject &content_object,
// it means that we are processing an interest
// that is not pending
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
+ uint64_t now = utils::SteadyTime::nowMs().count();
uint64_t RTT = now - interest_sent_time;
- path->insertRttSample(RTT);
+ path->insertRttSample(utils::SteadyTime::Milliseconds(RTT), false);
// compute OWD (the first part of the nack and data packet header are the
// same, so we cast to data data packet)
- struct data_packet_t *packet =
- (struct data_packet_t *)content_object.getPayload()->data();
- uint64_t sender_timestamp = packet->getTimestamp();
- int64_t OWD = now - sender_timestamp;
+ core::ParamsRTC params = RTCState::getDataParams(content_object);
+ int64_t OWD = now - params.timestamp;
path->insertOwdSample(OWD);
// compute IAT or set path to producer
@@ -543,59 +647,110 @@ void RTCState::updatePathStats(const core::ContentObject &content_object,
}
}
-void RTCState::updateLossRate() {
+void RTCState::updateLossRate(bool in_sync) {
last_round_loss_rate_ = loss_rate_;
loss_rate_ = 0.0;
- residual_loss_rate_ = 0.0;
uint32_t number_theorically_received_packets_ =
highest_seq_received_ - first_seq_in_round_;
- // in this case no new packet was recevied after the previuos round, avoid
- // division by 0
- if (number_theorically_received_packets_ == 0) return;
-
// XXX this may be quite inefficient if the rate is high
// maybe is better to iterate over the set?
- for (uint32_t i = first_seq_in_round_; i < highest_seq_received_; i++) {
- auto it = skipped_interests_.find(i);
- if (it != skipped_interests_.end()) {
+
+ uint32_t fec_packets = 0;
+ for (uint32_t i = (first_seq_in_round_ + 1); i < highest_seq_received_; i++) {
+ PacketState state = getPacketState(i);
+ if (state == PacketState::SKIPPED) {
if (number_theorically_received_packets_ > 0)
number_theorically_received_packets_--;
- skipped_interests_.erase(it);
}
+ if (indexer_->isFec(i)) fec_packets++;
}
+ if (indexer_->isFec(highest_seq_received_)) fec_packets++;
- loss_rate_ = (double)((double)(packets_lost_) /
- (double)number_theorically_received_packets_);
+ // in this case no new packet was received after the previous round, avoid
+ // division by 0
+ if (number_theorically_received_packets_ == 0 && packets_lost_ == 0) return;
- if (rounds_ % 15 == 0) max_loss_rate_ = 0; // reset every 3 sec
- if (loss_rate_ > max_loss_rate_) max_loss_rate_ = loss_rate_;
+ if (number_theorically_received_packets_ != 0)
+ loss_rate_ = (double)((double)(packets_lost_) /
+ (double)number_theorically_received_packets_);
+ else
+ // we didn't receive anything except NACKs that triggered losses
+ loss_rate_ = 1.0;
- if (avg_loss_rate_ == 0)
+ if (avg_loss_rate_ == -1.0)
avg_loss_rate_ = loss_rate_;
else
avg_loss_rate_ =
avg_loss_rate_ * MOVING_AVG_ALPHA + loss_rate_ * (1 - MOVING_AVG_ALPHA);
- residual_loss_rate_ = (double)((double)(packets_lost_ - losses_recovered_) /
- (double)number_theorically_received_packets_);
+ // update counters for loss rate per second
+ total_expected_packets_ += number_theorically_received_packets_;
+ lost_per_sec_ += packets_lost_;
+
+ if (in_sync) {
+ // update counters for residual losses
+ // fec packets are not sent to the app so we don't want to count them here
+ expected_packets_ +=
+ ((highest_seq_received_ - first_seq_in_round_) - fec_packets);
+ } else {
+ expected_packets_ = 0;
+ packets_sent_to_app_ = 0;
+ }
+
+ if (rounds_from_last_compute_ >= (MILLI_IN_A_SEC / ROUND_LEN)) {
+ // compute loss rate per second
+ if (lost_per_sec_ > total_expected_packets_)
+ lost_per_sec_ = total_expected_packets_;
+
+ if (total_expected_packets_ == 0)
+ per_sec_loss_rate_ = 0;
+ else
+ per_sec_loss_rate_ =
+ (double)((double)(lost_per_sec_) / (double)total_expected_packets_);
+
+ loss_history_.pushBack(per_sec_loss_rate_);
+
+ if (in_sync && expected_packets_ != 0) {
+ // compute residual loss rate
+ if (packets_sent_to_app_ > expected_packets_) {
+ // this may happen if we get packet from the prev bin that get recovered
+ // on the current one
+ packets_sent_to_app_ = expected_packets_;
+ }
+
+ residual_loss_rate_ =
+ 1.0 - ((double)packets_sent_to_app_ / (double)expected_packets_);
+ if (residual_loss_rate_ < 0.0) residual_loss_rate_ = 0.0;
+ }
- if (residual_loss_rate_ < 0) residual_loss_rate_ = 0;
+ lost_per_sec_ = 0;
+ total_expected_packets_ = 0;
+ expected_packets_ = 0;
+ packets_sent_to_app_ = 0;
+ rounds_from_last_compute_ = 0;
+ }
+
+ rounds_from_last_compute_++;
+}
+
+void RTCState::dataToBeReceived(uint32_t seq) {
+ addToPacketCache(seq, PacketState::TO_BE_RECEIVED);
+}
+
+void RTCState::updateHighestSeqReceived(uint32_t seq) {
+ if (seq > highest_seq_received_) highest_seq_received_ = seq;
}
void RTCState::addRecvOrLost(uint32_t seq, PacketState state) {
- if (indexer_->isFec(seq)) {
- pending_fec_pkt_--;
+ auto it = pending_interests_.find(seq);
+ if (it != pending_interests_.end()) {
+ pending_interests_.erase(it);
+ if (indexer_->isFec(seq)) pending_fec_pkt_--;
}
- pending_interests_.erase(seq);
- if (received_or_lost_packets_.size() >= MAX_CACHED_PACKETS) {
- received_or_lost_packets_.erase(received_or_lost_packets_.begin());
- }
- // notice that it may happen that a packet that we consider lost arrives after
- // some time, in this case we simply overwrite the packet state.
- received_or_lost_packets_[seq] = state;
+ addToPacketCache(seq, state);
// keep track of the last packet received/lost
// without holes.
@@ -608,16 +763,27 @@ void RTCState::addRecvOrLost(uint32_t seq, PacketState state) {
} else if (seq <= highest_seq_received_in_order_) {
// here we do nothing
} else if (seq > highest_seq_received_in_order_) {
- // 1) there is a gap in the sequence so we do not update largest_in_seq_
- // 2) all the packets from largest_in_seq_ to seq are in
- // received_or_lost_packets_ an we upate largest_in_seq_
- // or are FEC packets
+ // 1) there is a gap in the sequence so we do not update
+ // highest_seq_received_in_order_
+ // 2) all the packets from highest_seq_received_in_order_ to seq are
+ // received or lost or are fec packetis. In this case we increase
+ // highest_seq_received_in_order_ until we find an hole in the sequence
for (uint32_t i = highest_seq_received_in_order_ + 1; i <= seq; i++) {
- if (received_or_lost_packets_.find(i) ==
- received_or_lost_packets_.end() &&
- !indexer_->isFec(i)) {
- break;
+ PacketState state = getPacketState(i);
+ if ((state == PacketState::UNKNOWN || state == PacketState::LOST)) {
+ if (indexer_->isFec(i)) {
+ // this is a fec packet and we don't care to receive it
+ // however we may need to increse the number or lost packets
+ // XXX: in case we want to use rtx to recover fec packets,
+ // this may prevent to detect a packet loss and no rtx will be sent
+ if (TRANSPORT_EXPECT_TRUE(i >= first_interest_sent_seq_)) {
+ onLossDetected(i);
+ }
+ } else {
+ // this is a data packet and we need to get it
+ break;
+ }
}
// this packet is in order so we can update the
// highest_seq_received_in_order_
@@ -629,36 +795,104 @@ void RTCState::addRecvOrLost(uint32_t seq, PacketState state) {
void RTCState::setInitRttTimer(uint32_t wait) {
init_rtt_timer_->cancel();
init_rtt_timer_->expires_from_now(std::chrono::milliseconds(wait));
- init_rtt_timer_->async_wait([this](std::error_code ec) {
+
+ std::weak_ptr<RTCState> self = shared_from_this();
+ init_rtt_timer_->async_wait([self](const std::error_code &ec) {
if (ec) return;
- checkInitRttTimer();
+
+ if (auto ptr = self.lock()) {
+ ptr->checkInitRttTimer();
+ }
});
}
void RTCState::checkInitRttTimer() {
- if (received_probes_ < INIT_RTT_MIN_PROBES_TO_RECV) {
- // we didn't received enough probes, restart
+ if (received_probes_ < INIT_RTT_MIN_PROBES_TO_RECV ||
+ probe_handler_->getProbeLossRate() == 1.0) {
+ // we didn't received enough probes or they were not valid, restart
received_probes_ = 0;
- rtt_probes_->setProbes(INIT_RTT_PROBE_INTERVAL, INIT_RTT_PROBES);
- rtt_probes_->sendProbes();
+ probe_handler_->setSuffixRange(MIN_INIT_PROBE_SEQ, MAX_INIT_PROBE_SEQ);
+ probe_handler_->setProbes(INIT_RTT_PROBE_INTERVAL, INIT_RTT_PROBES);
+ probe_handler_->sendProbes();
setInitRttTimer(INIT_RTT_PROBE_RESTART);
return;
}
+
init_rtt_ = true;
main_path_->roundEnd();
- rtt_probes_->setProbes(RTT_PROBE_INTERVAL, 0);
- rtt_probes_->sendProbes();
+ loss_history_.pushBack(probe_handler_->getProbeLossRate());
+
+ probe_handler_->setSuffixRange(MIN_RTT_PROBE_SEQ, MAX_RTT_PROBE_SEQ);
+ probe_handler_->setProbes(RTT_PROBE_INTERVAL, 0);
+ probe_handler_->sendProbes();
// init last_seq_nacked_. skip packets that may come from the cache
double prod_rate = getProducerRate();
- double rtt = (double)getRTT() / MILLI_IN_A_SEC;
+ double rtt = (double)getMinRTT() / MILLI_IN_A_SEC;
double packet_size = getAveragePacketSize();
- uint32_t pkt_in_rtt_ = std::floor(((prod_rate / packet_size) * rtt) * 0.8);
+ uint32_t pkt_in_rtt_ = std::floor(((prod_rate / packet_size) * rtt));
last_seq_nacked_ = last_production_seq_ + pkt_in_rtt_;
discovered_rtt_callback_();
}
+core::ParamsRTC RTCState::getProbeParams(const core::ContentObject &probe) {
+ uint32_t seq = probe.getName().getSuffix();
+ core::ParamsRTC params;
+
+ switch (ProbeHandler::getProbeType(seq)) {
+ case ProbeType::INIT: {
+ core::ContentObjectManifest manifest(
+ const_cast<core::ContentObject &>(probe).shared_from_this());
+ manifest.decode();
+ params = manifest.getParamsRTC();
+ break;
+ }
+ case ProbeType::RTT: {
+ struct nack_packet_t *probe_pkt =
+ (struct nack_packet_t *)probe.getPayload()->data();
+ params = core::ParamsRTC{
+ .timestamp = probe_pkt->getTimestamp(),
+ .prod_rate = probe_pkt->getProductionRate(),
+ .prod_seg = probe_pkt->getProductionSegment(),
+ };
+ break;
+ }
+ default:
+ break;
+ }
+
+ return params;
+}
+
+core::ParamsRTC RTCState::getDataParams(const core::ContentObject &data) {
+ core::ParamsRTC params;
+
+ switch (data.getPayloadType()) {
+ case core::PayloadType::DATA: {
+ struct data_packet_t *data_pkt =
+ (struct data_packet_t *)data.getPayload()->data();
+ params = core::ParamsRTC{
+ .timestamp = data_pkt->getTimestamp(),
+ .prod_rate = data_pkt->getProductionRate(),
+ .prod_seg = data.getName().getSuffix(),
+ };
+ break;
+ }
+ case core::PayloadType::MANIFEST: {
+ core::ContentObjectManifest manifest(
+ const_cast<core::ContentObject &>(data).shared_from_this());
+ manifest.decode();
+ params = manifest.getParamsRTC();
+ break;
+ }
+ default:
+ break;
+ }
+
+ return params;
+}
+
} // namespace rtc
} // namespace protocol