diff options
Diffstat (limited to 'libtransport/src/protocols/rtc/rtc_state.cc')
-rw-r--r-- | libtransport/src/protocols/rtc/rtc_state.cc | 152 |
1 files changed, 129 insertions, 23 deletions
diff --git a/libtransport/src/protocols/rtc/rtc_state.cc b/libtransport/src/protocols/rtc/rtc_state.cc index 9c965bfed..c99205a26 100644 --- a/libtransport/src/protocols/rtc/rtc_state.cc +++ b/libtransport/src/protocols/rtc/rtc_state.cc @@ -13,6 +13,7 @@ * limitations under the License. */ +#include <glog/logging.h> #include <protocols/rtc/rtc_consts.h> #include <protocols/rtc/rtc_state.h> @@ -22,11 +23,13 @@ namespace protocol { namespace rtc { -RTCState::RTCState(ProbeHandler::SendProbeCallback &&rtt_probes_callback, +RTCState::RTCState(Indexer *indexer, + ProbeHandler::SendProbeCallback &&rtt_probes_callback, DiscoveredRttCallback &&discovered_rtt_callback, asio::io_service &io_service) - : rtt_probes_(std::make_shared<ProbeHandler>( - std::move(rtt_probes_callback), io_service)), + : indexer_(indexer), + rtt_probes_(std::make_shared<ProbeHandler>(std::move(rtt_probes_callback), + io_service)), discovered_rtt_callback_(std::move(discovered_rtt_callback)) { init_rtt_timer_ = std::make_unique<asio::steady_timer>(io_service); initParams(); @@ -45,14 +48,22 @@ void RTCState::initParams() { // loss counters packets_lost_ = 0; + definitely_lost_pkt_ = 0; losses_recovered_ = 0; first_seq_in_round_ = 0; highest_seq_received_ = 0; highest_seq_received_in_order_ = 0; last_seq_nacked_ = 0; loss_rate_ = 0.0; + avg_loss_rate_ = 0.0; + max_loss_rate_ = 0.0; + last_round_loss_rate_ = 0.0; residual_loss_rate_ = 0.0; + // fec counters + pending_fec_pkt_ = 0; + received_fec_pkt_ = 0; + // bw counters received_bytes_ = 0; avg_packet_size_ = INIT_PACKET_SIZE; @@ -90,8 +101,14 @@ void RTCState::initParams() { // pending interests pending_interests_.clear(); + // skipped interest + last_interest_sent_ = 0; + skipped_interests_.clear(); + // init rtt - first_interest_sent_ = ~0; + first_interest_sent_time_ = ~0; + first_interest_sent_seq_ = 0; + init_rtt_ = false; rtt_probes_->setProbes(INIT_RTT_PROBE_INTERVAL, INIT_RTT_PROBES); rtt_probes_->sendProbes(); @@ -106,18 +123,51 @@ void RTCState::onSendNewInterest(const core::Name *interest_name) { uint32_t seq = interest_name->getSuffix(); pending_interests_.insert(std::pair<uint32_t, uint64_t>(seq, now)); - if(sent_interests_ == 0) first_interest_sent_ = now; + if (sent_interests_ == 0) { + first_interest_sent_time_ = now; + first_interest_sent_seq_ = seq; + } + + if (indexer_->isFec(seq)) { + pending_fec_pkt_++; + } + + if (last_interest_sent_ == 0 && seq != 0) { + last_interest_sent_ = seq; // init last interest sent + } + + // TODO what happen in case of jumps? + // look for skipped interests + skipped_interests_.erase(seq); // remove seq if it is there + for (uint32_t i = last_interest_sent_ + 1; i < seq; i++) { + if (indexer_->isFec(i)) { + skipped_interests_.insert(i); + } + } + + last_interest_sent_ = seq; sent_interests_++; sent_interests_last_round_++; } -void RTCState::onTimeout(uint32_t seq) { +void RTCState::onTimeout(uint32_t seq, bool lost) { auto it = pending_interests_.find(seq); if (it != pending_interests_.end()) { pending_interests_.erase(it); } received_timeouts_++; + + if (lost) onPacketLost(seq); +} + +void RTCState::onLossDetected(uint32_t seq) { + if (!indexer_->isFec(seq)) { + packets_lost_++; + } else if (skipped_interests_.find(seq) == skipped_interests_.end() && + seq >= first_interest_sent_seq_) { + packets_lost_++; + } } void RTCState::onRetransmission(uint32_t seq) { @@ -128,7 +178,9 @@ void RTCState::onRetransmission(uint32_t seq) { auto it = pending_interests_.find(seq); if (it != pending_interests_.end()) { pending_interests_.erase(it); +#if 0 packets_lost_++; +#endif } sent_rtx_++; sent_rtx_last_round_++; @@ -165,6 +217,16 @@ void RTCState::onDataPacketReceived(const core::ContentObject &content_object, received_packets_last_round_++; } +void RTCState::onFecPacketReceived(const core::ContentObject &content_object) { + uint32_t seq = content_object.getName().getSuffix(); + updateReceivedBytes(content_object); + addRecvOrLost(seq, PacketState::RECEIVED); + received_fec_pkt_++; + // the producer is responding + // it is generating valid data packets so we consider it active + producer_is_active_ = true; +} + void RTCState::onNackPacketReceived(const core::ContentObject &nack, bool compute_stats) { uint32_t seq = nack.getName().getSuffix(); @@ -197,12 +259,14 @@ void RTCState::onNackPacketReceived(const core::ContentObject &nack, // old nack, seq is lost // update last nacked if (last_seq_nacked_ < seq) last_seq_nacked_ = seq; - TRANSPORT_LOGD("lost packet %u beacuse of a past nack", seq); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "lost packet " << seq << " beacuse of a past nack"; onPacketLost(seq); } else if (seq > production_seq) { // future nack // remove the nack from the pending interest map // (the packet is not received/lost yet) + if (indexer_->isFec(seq)) pending_fec_pkt_--; pending_interests_.erase(seq); } else { // this should be a quite rear event. simply remove the @@ -221,17 +285,28 @@ void RTCState::onNackPacketReceived(const core::ContentObject &nack, } void RTCState::onPacketLost(uint32_t seq) { - TRANSPORT_LOGD("packet %u is lost", seq); +#if 0 + DLOG_IF(INFO, VLOG_IS_ON(3)) << "packet " << seq << " is lost"; auto it = pending_interests_.find(seq); if (it != pending_interests_.end()) { // this packet was never retransmitted so it does // not appear in the loss count packets_lost_++; } +#endif + if (!indexer_->isFec(seq)) { + definitely_lost_pkt_++; + DLOG_IF(INFO, VLOG_IS_ON(4)) << "packet " << seq << " is lost"; + } addRecvOrLost(seq, PacketState::LOST); } -void RTCState::onPacketRecovered(uint32_t seq) { +void RTCState::onPacketRecoveredRtx(uint32_t seq) { + losses_recovered_++; + addRecvOrLost(seq, PacketState::RECEIVED); +} + +void RTCState::onPacketRecoveredFec(uint32_t seq) { losses_recovered_++; addRecvOrLost(seq, PacketState::RECEIVED); } @@ -258,7 +333,6 @@ bool RTCState::onProbePacketReceived(const core::ContentObject &probe) { uint32_t production_seq = probe_pkt->getProductionSegement(); uint32_t production_rate = probe_pkt->getProductionRate(); - if (path_it == path_table_.end()) { // found a new path std::shared_ptr<RTCDataPath> newPath = @@ -298,13 +372,14 @@ bool RTCState::onProbePacketReceived(const core::ContentObject &probe) { // wait forever received_probes_++; - if(!init_rtt_ && received_probes_ <= INIT_RTT_PROBES){ - if(received_probes_ == 1){ - // we got the first probe, wait at most INIT_RTT_PROBE_WAIT sec for the others + if (!init_rtt_ && received_probes_ <= INIT_RTT_PROBES) { + if (received_probes_ == 1) { + // we got the first probe, wait at most INIT_RTT_PROBE_WAIT sec for the + // others main_path_ = path; setInitRttTimer(INIT_RTT_PROBE_WAIT); } - if(received_probes_ == INIT_RTT_PROBES) { + if (received_probes_ == INIT_RTT_PROBES) { // we are done init_rtt_timer_->cancel(); checkInitRttTimer(); @@ -314,7 +389,7 @@ bool RTCState::onProbePacketReceived(const core::ContentObject &probe) { received_packets_last_round_++; // ignore probes sent before the first interest - if((now - rtt) <= first_interest_sent_) return false; + if ((now - rtt) <= first_interest_sent_time_) return false; return true; } @@ -327,11 +402,11 @@ void RTCState::onNewRound(double round_len, bool in_sync) { double bytes_per_sec = ((double)received_bytes_ * (MILLI_IN_A_SEC / round_len)); - if(received_rate_ == 0) + if (received_rate_ == 0) received_rate_ = bytes_per_sec; else received_rate_ = (received_rate_ * MOVING_AVG_ALPHA) + - ((1 - MOVING_AVG_ALPHA) * bytes_per_sec); + ((1 - MOVING_AVG_ALPHA) * bytes_per_sec); // search for an active path. There should be only one active path (meaning a // path that leads to the producer socket -no cache- and from which we are @@ -354,7 +429,8 @@ void RTCState::onNewRound(double round_len, bool in_sync) { } } - if (in_sync) updateLossRate(); + // if (in_sync) updateLossRate(); + updateLossRate(); // handle nacks if (!nack_on_last_round_ && received_bytes_ > 0) { @@ -385,6 +461,7 @@ void RTCState::onNewRound(double round_len, bool in_sync) { // reset counters received_bytes_ = 0; packets_lost_ = 0; + definitely_lost_pkt_ = 0; losses_recovered_ = 0; first_seq_in_round_ = highest_seq_received_; @@ -397,6 +474,8 @@ void RTCState::onNewRound(double round_len, bool in_sync) { sent_interests_last_round_ = 0; sent_rtx_last_round_ = 0; + received_fec_pkt_ = 0; + rounds_++; } @@ -465,6 +544,7 @@ void RTCState::updatePathStats(const core::ContentObject &content_object, } void RTCState::updateLossRate() { + last_round_loss_rate_ = loss_rate_; loss_rate_ = 0.0; residual_loss_rate_ = 0.0; @@ -475,9 +555,29 @@ void RTCState::updateLossRate() { // division by 0 if (number_theorically_received_packets_ == 0) return; + // XXX this may be quite inefficient if the rate is high + // maybe is better to iterate over the set? + for (uint32_t i = first_seq_in_round_; i < highest_seq_received_; i++) { + auto it = skipped_interests_.find(i); + if (it != skipped_interests_.end()) { + if (number_theorically_received_packets_ > 0) + number_theorically_received_packets_--; + skipped_interests_.erase(it); + } + } + loss_rate_ = (double)((double)(packets_lost_) / (double)number_theorically_received_packets_); + if (rounds_ % 15 == 0) max_loss_rate_ = 0; // reset every 3 sec + if (loss_rate_ > max_loss_rate_) max_loss_rate_ = loss_rate_; + + if (avg_loss_rate_ == 0) + avg_loss_rate_ = loss_rate_; + else + avg_loss_rate_ = + avg_loss_rate_ * MOVING_AVG_ALPHA + loss_rate_ * (1 - MOVING_AVG_ALPHA); + residual_loss_rate_ = (double)((double)(packets_lost_ - losses_recovered_) / (double)number_theorically_received_packets_); @@ -485,6 +585,10 @@ void RTCState::updateLossRate() { } void RTCState::addRecvOrLost(uint32_t seq, PacketState state) { + if (indexer_->isFec(seq)) { + pending_fec_pkt_--; + } + pending_interests_.erase(seq); if (received_or_lost_packets_.size() >= MAX_CACHED_PACKETS) { received_or_lost_packets_.erase(received_or_lost_packets_.begin()); @@ -507,10 +611,12 @@ void RTCState::addRecvOrLost(uint32_t seq, PacketState state) { // 1) there is a gap in the sequence so we do not update largest_in_seq_ // 2) all the packets from largest_in_seq_ to seq are in // received_or_lost_packets_ an we upate largest_in_seq_ + // or are FEC packets for (uint32_t i = highest_seq_received_in_order_ + 1; i <= seq; i++) { if (received_or_lost_packets_.find(i) == - received_or_lost_packets_.end()) { + received_or_lost_packets_.end() && + !indexer_->isFec(i)) { break; } // this packet is in order so we can update the @@ -520,17 +626,17 @@ void RTCState::addRecvOrLost(uint32_t seq, PacketState state) { } } -void RTCState::setInitRttTimer(uint32_t wait){ +void RTCState::setInitRttTimer(uint32_t wait) { init_rtt_timer_->cancel(); init_rtt_timer_->expires_from_now(std::chrono::milliseconds(wait)); init_rtt_timer_->async_wait([this](std::error_code ec) { - if(ec) return; + if (ec) return; checkInitRttTimer(); }); } void RTCState::checkInitRttTimer() { - if(received_probes_ < INIT_RTT_MIN_PROBES_TO_RECV){ + if (received_probes_ < INIT_RTT_MIN_PROBES_TO_RECV) { // we didn't received enough probes, restart received_probes_ = 0; rtt_probes_->setProbes(INIT_RTT_PROBE_INTERVAL, INIT_RTT_PROBES); @@ -547,7 +653,7 @@ void RTCState::checkInitRttTimer() { double prod_rate = getProducerRate(); double rtt = (double)getRTT() / MILLI_IN_A_SEC; double packet_size = getAveragePacketSize(); - uint32_t pkt_in_rtt_ = (uint32_t)std::floor(((prod_rate / packet_size) * rtt) * 0.8); + uint32_t pkt_in_rtt_ = std::floor(((prod_rate / packet_size) * rtt) * 0.8); last_seq_nacked_ = last_production_seq_ + pkt_in_rtt_; discovered_rtt_callback_(); |