diff options
Diffstat (limited to 'libtransport/src/protocols/rtc/rtc_state.cc')
-rw-r--r-- | libtransport/src/protocols/rtc/rtc_state.cc | 486 |
1 files changed, 362 insertions, 124 deletions
diff --git a/libtransport/src/protocols/rtc/rtc_state.cc b/libtransport/src/protocols/rtc/rtc_state.cc index c99205a26..6a21531f8 100644 --- a/libtransport/src/protocols/rtc/rtc_state.cc +++ b/libtransport/src/protocols/rtc/rtc_state.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2021 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -24,15 +24,15 @@ namespace protocol { namespace rtc { RTCState::RTCState(Indexer *indexer, - ProbeHandler::SendProbeCallback &&rtt_probes_callback, + ProbeHandler::SendProbeCallback &&probe_callback, DiscoveredRttCallback &&discovered_rtt_callback, asio::io_service &io_service) - : indexer_(indexer), - rtt_probes_(std::make_shared<ProbeHandler>(std::move(rtt_probes_callback), - io_service)), + : loss_history_(10), // log 10sec history + indexer_(indexer), + probe_handler_(std::make_shared<ProbeHandler>(std::move(probe_callback), + io_service)), discovered_rtt_callback_(std::move(discovered_rtt_callback)) { init_rtt_timer_ = std::make_unique<asio::steady_timer>(io_service); - initParams(); } RTCState::~RTCState() {} @@ -55,9 +55,19 @@ void RTCState::initParams() { highest_seq_received_in_order_ = 0; last_seq_nacked_ = 0; loss_rate_ = 0.0; - avg_loss_rate_ = 0.0; + avg_loss_rate_ = -1.0; max_loss_rate_ = 0.0; last_round_loss_rate_ = 0.0; + + // loss rate per sec + lost_per_sec_ = 0; + total_expected_packets_ = 0; + per_sec_loss_rate_ = 0.0; + + // residual losses counters + expected_packets_ = 0; + packets_sent_to_app_ = 0; + rounds_from_last_compute_ = 0; residual_loss_rate_ = 0.0; // fec counters @@ -66,9 +76,13 @@ void RTCState::initParams() { // bw counters received_bytes_ = 0; + received_fec_bytes_ = 0; + recovered_bytes_with_fec_ = 0; + avg_packet_size_ = INIT_PACKET_SIZE; production_rate_ = 0.0; received_rate_ = 0.0; + fec_recovered_rate_ = 0.0; // nack counter nack_on_last_round_ = false; @@ -95,31 +109,30 @@ void RTCState::initParams() { path_table_.clear(); main_path_ = nullptr; - // packet received - received_or_lost_packets_.clear(); + // packet cache (not pending anymore) + packet_cache_.clear(); // pending interests pending_interests_.clear(); - // skipped interest + // used to keep track of the skipped interest last_interest_sent_ = 0; - skipped_interests_.clear(); // init rtt first_interest_sent_time_ = ~0; first_interest_sent_seq_ = 0; + // start probing the producer init_rtt_ = false; - rtt_probes_->setProbes(INIT_RTT_PROBE_INTERVAL, INIT_RTT_PROBES); - rtt_probes_->sendProbes(); + probe_handler_->setSuffixRange(MIN_INIT_PROBE_SEQ, MAX_INIT_PROBE_SEQ); + probe_handler_->setProbes(INIT_RTT_PROBE_INTERVAL, INIT_RTT_PROBES); + probe_handler_->sendProbes(); setInitRttTimer(INIT_RTT_PROBE_RESTART); } // packet events void RTCState::onSendNewInterest(const core::Name *interest_name) { - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + uint64_t now = utils::SteadyTime::nowMs().count(); uint32_t seq = interest_name->getSuffix(); pending_interests_.insert(std::pair<uint32_t, uint64_t>(seq, now)); @@ -137,11 +150,12 @@ void RTCState::onSendNewInterest(const core::Name *interest_name) { } // TODO what happen in case of jumps? - // look for skipped interests - skipped_interests_.erase(seq); // remove seq if it is there + eraseFromPacketCache( + seq); // if we send this interest we don't know its state for (uint32_t i = last_interest_sent_ + 1; i < seq; i++) { if (indexer_->isFec(i)) { - skipped_interests_.insert(i); + // only fec packets can be skipped + addToPacketCache(i, PacketState::SKIPPED); } } @@ -155,6 +169,7 @@ void RTCState::onTimeout(uint32_t seq, bool lost) { auto it = pending_interests_.find(seq); if (it != pending_interests_.end()) { pending_interests_.erase(it); + if (indexer_->isFec(seq)) pending_fec_pkt_--; } received_timeouts_++; @@ -162,11 +177,12 @@ void RTCState::onTimeout(uint32_t seq, bool lost) { } void RTCState::onLossDetected(uint32_t seq) { - if (!indexer_->isFec(seq)) { - packets_lost_++; - } else if (skipped_interests_.find(seq) == skipped_interests_.end() && - seq >= first_interest_sent_seq_) { + PacketState state = getPacketState(seq); + + // if the packet is already marked with a state, do nothing + if (state == PacketState::UNKNOWN) { packets_lost_++; + addToPacketCache(seq, PacketState::LOST); } } @@ -178,30 +194,40 @@ void RTCState::onRetransmission(uint32_t seq) { auto it = pending_interests_.find(seq); if (it != pending_interests_.end()) { pending_interests_.erase(it); -#if 0 - packets_lost_++; -#endif + if (indexer_->isFec(seq)) pending_fec_pkt_--; } sent_rtx_++; sent_rtx_last_round_++; } +void RTCState::onPossibleLossWithNoRtx(uint32_t seq) { + // if fec is on or rtx is disable we don't need to do anything to recover a + // packet. however in both cases we need to remove possible missing packets + // from the window of pendinig interest in order to free space without wating + // for the timeout. + auto it = pending_interests_.find(seq); + if (it != pending_interests_.end()) { + pending_interests_.erase(it); + if (indexer_->isFec(seq)) pending_fec_pkt_--; + } +} + void RTCState::onDataPacketReceived(const core::ContentObject &content_object, bool compute_stats) { uint32_t seq = content_object.getName().getSuffix(); + if (compute_stats) { updatePathStats(content_object, false); received_data_last_round_++; } received_data_++; + packets_sent_to_app_++; - struct data_packet_t *data_pkt = - (struct data_packet_t *)content_object.getPayload()->data(); - uint64_t production_time = data_pkt->getTimestamp(); - if (last_prod_update_ < production_time) { - last_prod_update_ = production_time; - uint32_t production_rate = data_pkt->getProductionRate(); - production_rate_ = (double)production_rate; + core::ParamsRTC params = RTCState::getDataParams(content_object); + + if (last_prod_update_ < params.timestamp) { + last_prod_update_ = params.timestamp; + production_rate_ = (double)params.prod_rate; } updatePacketSize(content_object); @@ -219,9 +245,18 @@ void RTCState::onDataPacketReceived(const core::ContentObject &content_object, void RTCState::onFecPacketReceived(const core::ContentObject &content_object) { uint32_t seq = content_object.getName().getSuffix(); - updateReceivedBytes(content_object); + // updateReceivedBytes(content_object); + received_fec_bytes_ += + (uint32_t)(content_object.headerSize() + content_object.payloadSize()); + + if (seq > highest_seq_received_) highest_seq_received_ = seq; + + PacketState state = getPacketState(seq); + if (state != PacketState::LOST) { + // increase only for not lost packets + received_fec_pkt_++; + } addRecvOrLost(seq, PacketState::RECEIVED); - received_fec_pkt_++; // the producer is responding // it is generating valid data packets so we consider it active producer_is_active_ = true; @@ -233,7 +268,7 @@ void RTCState::onNackPacketReceived(const core::ContentObject &nack, struct nack_packet_t *nack_pkt = (struct nack_packet_t *)nack.getPayload()->data(); uint64_t production_time = nack_pkt->getTimestamp(); - uint32_t production_seq = nack_pkt->getProductionSegement(); + uint32_t production_seq = nack_pkt->getProductionSegment(); uint32_t production_rate = nack_pkt->getProductionRate(); if (TRANSPORT_EXPECT_FALSE(main_path_ == nullptr) || @@ -255,6 +290,7 @@ void RTCState::onNackPacketReceived(const core::ContentObject &nack, received_nacks_++; received_nacks_last_round_++; + bool to_delete = false; if (production_seq > seq) { // old nack, seq is lost // update last nacked @@ -266,12 +302,19 @@ void RTCState::onNackPacketReceived(const core::ContentObject &nack, // future nack // remove the nack from the pending interest map // (the packet is not received/lost yet) - if (indexer_->isFec(seq)) pending_fec_pkt_--; - pending_interests_.erase(seq); + to_delete = true; } else { // this should be a quite rear event. simply remove the // packet from the pending interest list - pending_interests_.erase(seq); + to_delete = true; + } + + if (to_delete) { + auto it = pending_interests_.find(seq); + if (it != pending_interests_.end()) { + pending_interests_.erase(it); + if (indexer_->isFec(seq)) pending_fec_pkt_--; + } } // the producer is responding @@ -295,44 +338,58 @@ void RTCState::onPacketLost(uint32_t seq) { } #endif if (!indexer_->isFec(seq)) { - definitely_lost_pkt_++; - DLOG_IF(INFO, VLOG_IS_ON(4)) << "packet " << seq << " is lost"; + PacketState state = getPacketState(seq); + if (state == PacketState::LOST || state == PacketState::UNKNOWN) { + definitely_lost_pkt_++; + DLOG_IF(INFO, VLOG_IS_ON(4)) << "packet " << seq << " is lost"; + } } - addRecvOrLost(seq, PacketState::LOST); + addRecvOrLost(seq, PacketState::DEFINITELY_LOST); } void RTCState::onPacketRecoveredRtx(uint32_t seq) { + packets_sent_to_app_++; + if (seq > highest_seq_received_) highest_seq_received_ = seq; losses_recovered_++; addRecvOrLost(seq, PacketState::RECEIVED); } -void RTCState::onPacketRecoveredFec(uint32_t seq) { +void RTCState::onFecPacketRecoveredRtx(uint32_t seq) { + // This is the same as onPacketRecoveredRtx, but in this is case the + // pkt is also a FEC pkt, the addRecvOrLost will be called afterwards + if (seq > highest_seq_received_) highest_seq_received_ = seq; + losses_recovered_++; +} + +void RTCState::onPacketRecoveredFec(uint32_t seq, uint32_t size) { losses_recovered_++; + packets_sent_to_app_++; + recovered_bytes_with_fec_ += size; + + if (seq > highest_seq_received_) highest_seq_received_ = seq; + + // adding header to the count + recovered_bytes_with_fec_ += 60; // XXX get header size some where + + if (getPacketState(seq) == PacketState::UNKNOWN) + onLossDetected(seq); // the pkt was lost but didn't account for it yet + addRecvOrLost(seq, PacketState::RECEIVED); } bool RTCState::onProbePacketReceived(const core::ContentObject &probe) { uint32_t seq = probe.getName().getSuffix(); - uint64_t rtt; - - rtt = rtt_probes_->getRtt(seq); + uint64_t rtt; + rtt = probe_handler_->getRtt(seq); if (rtt == 0) return false; // this is not a valid probe - // like for data and nacks update the path stats. Here the RTT is computed - // by the probe handler. Both probes for rtt and bw are good to esimate - // info on the path + // Like for data and nacks update the path stats. Here the RTT is computed + // by the probe handler. Both probes for rtt and bw are good to estimate + // info on the path. uint32_t path_label = probe.getPathLabel(); - auto path_it = path_table_.find(path_label); - // update production rate and last_seq_nacked like in case of a nack - struct nack_packet_t *probe_pkt = - (struct nack_packet_t *)probe.getPayload()->data(); - uint64_t sender_timestamp = probe_pkt->getTimestamp(); - uint32_t production_seq = probe_pkt->getProductionSegement(); - uint32_t production_rate = probe_pkt->getProductionRate(); - if (path_it == path_table_.end()) { // found a new path std::shared_ptr<RTCDataPath> newPath = @@ -344,26 +401,26 @@ bool RTCState::onProbePacketReceived(const core::ContentObject &probe) { auto path = path_it->second; - path->insertRttSample(rtt); + path->insertRttSample(utils::SteadyTime::Milliseconds(rtt), true); path->receivedNack(); - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + uint64_t now = utils::SteadyTime::nowMs().count(); + + core::ParamsRTC params = RTCState::getProbeParams(probe); - int64_t OWD = now - sender_timestamp; + int64_t OWD = now - params.timestamp; path->insertOwdSample(OWD); - if (last_prod_update_ < sender_timestamp) { - last_production_seq_ = production_seq; - last_prod_update_ = sender_timestamp; - production_rate_ = (double)production_rate; + if (last_prod_update_ < params.timestamp) { + last_production_seq_ = params.prod_seg; + last_prod_update_ = params.timestamp; + production_rate_ = (double)params.prod_rate; } // the producer is responding // we consider it active only if the production rate is not 0 // or the production sequence numner is not 1 - if (production_rate_ != 0 || production_seq != 1) { + if (production_rate_ != 0 || params.prod_seg != 1) { producer_is_active_ = true; } @@ -375,7 +432,7 @@ bool RTCState::onProbePacketReceived(const core::ContentObject &probe) { if (!init_rtt_ && received_probes_ <= INIT_RTT_PROBES) { if (received_probes_ == 1) { // we got the first probe, wait at most INIT_RTT_PROBE_WAIT sec for the - // others + // others. main_path_ = path; setInitRttTimer(INIT_RTT_PROBE_WAIT); } @@ -393,11 +450,21 @@ bool RTCState::onProbePacketReceived(const core::ContentObject &probe) { return true; } -void RTCState::onNewRound(double round_len, bool in_sync) { - // XXX - // here we take into account only the single path case so we assume that we - // don't use two paths in parellel for this single flow +void RTCState::onJumpForward(uint32_t next_seq) { + for (uint32_t seq = highest_seq_received_in_order_ + 1; seq < next_seq; + seq++) { + auto it = pending_interests_.find(seq); + PacketState packet_state = getPacketState(seq); + if (it == pending_interests_.end() && + packet_state != PacketState::RECEIVED && + packet_state != PacketState::DEFINITELY_LOST) { + onLossDetected(seq); + onPacketLost(seq); + } + } +} +void RTCState::onNewRound(double round_len, bool in_sync) { if (path_table_.empty()) return; double bytes_per_sec = @@ -407,7 +474,24 @@ void RTCState::onNewRound(double round_len, bool in_sync) { else received_rate_ = (received_rate_ * MOVING_AVG_ALPHA) + ((1 - MOVING_AVG_ALPHA) * bytes_per_sec); + double fec_bytes_per_sec = + ((double)received_fec_bytes_ * (MILLI_IN_A_SEC / round_len)); + + if (fec_received_rate_ == 0) + fec_received_rate_ = fec_bytes_per_sec; + else + fec_received_rate_ = (fec_received_rate_ * 0.8) + (0.2 * fec_bytes_per_sec); + + double fec_recovered_bytes_per_sec = + ((double)recovered_bytes_with_fec_ * (MILLI_IN_A_SEC / round_len)); + if (fec_recovered_rate_ == 0) + fec_recovered_rate_ = fec_recovered_bytes_per_sec; + else + fec_recovered_rate_ = + (fec_recovered_rate_ * 0.8) + (0.2 * fec_recovered_bytes_per_sec); + +#if 0 // search for an active path. There should be only one active path (meaning a // path that leads to the producer socket -no cache- and from which we are // currently getting data packets) at any time. However it may happen that @@ -428,9 +512,36 @@ void RTCState::onNewRound(double round_len, bool in_sync) { } } } +#endif + + // search for an active path. Is it possible to have multiple path that are + // used at the same time. We use as reference path the one from where we gets + // more packets. This means that the path should have better lantecy or less + // channel losses + + uint32_t last_round_packets = 0; + std::shared_ptr<RTCDataPath> old_main_path = main_path_; + main_path_ = nullptr; + + for (auto it = path_table_.begin(); it != path_table_.end(); it++) { + if (it->second->isActive()) { + uint32_t pkt = it->second->getPacketsLastRound(); + if (pkt > last_round_packets) { + last_round_packets = pkt; + main_path_ = it->second; + } + } + it->second->roundEnd(); + } - // if (in_sync) updateLossRate(); - updateLossRate(); + if (main_path_ == nullptr) main_path_ = old_main_path; + + // in case we get a new main path we reset the stats of the old one. this is + // beacuse, in case we need to switch back we don't what to take decisions on + // old stats that may be outdated. + if (main_path_ != old_main_path) old_main_path->clearRtt(); + + updateLossRate(in_sync); // handle nacks if (!nack_on_last_round_ && received_bytes_ > 0) { @@ -460,6 +571,8 @@ void RTCState::onNewRound(double round_len, bool in_sync) { // reset counters received_bytes_ = 0; + received_fec_bytes_ = 0; + recovered_bytes_with_fec_ = 0; packets_lost_ = 0; definitely_lost_pkt_ = 0; losses_recovered_ = 0; @@ -516,20 +629,16 @@ void RTCState::updatePathStats(const core::ContentObject &content_object, // it means that we are processing an interest // that is not pending - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + uint64_t now = utils::SteadyTime::nowMs().count(); uint64_t RTT = now - interest_sent_time; - path->insertRttSample(RTT); + path->insertRttSample(utils::SteadyTime::Milliseconds(RTT), false); // compute OWD (the first part of the nack and data packet header are the // same, so we cast to data data packet) - struct data_packet_t *packet = - (struct data_packet_t *)content_object.getPayload()->data(); - uint64_t sender_timestamp = packet->getTimestamp(); - int64_t OWD = now - sender_timestamp; + core::ParamsRTC params = RTCState::getDataParams(content_object); + int64_t OWD = now - params.timestamp; path->insertOwdSample(OWD); // compute IAT or set path to producer @@ -543,59 +652,106 @@ void RTCState::updatePathStats(const core::ContentObject &content_object, } } -void RTCState::updateLossRate() { +void RTCState::updateLossRate(bool in_sync) { last_round_loss_rate_ = loss_rate_; loss_rate_ = 0.0; - residual_loss_rate_ = 0.0; uint32_t number_theorically_received_packets_ = highest_seq_received_ - first_seq_in_round_; - // in this case no new packet was recevied after the previuos round, avoid - // division by 0 - if (number_theorically_received_packets_ == 0) return; - // XXX this may be quite inefficient if the rate is high // maybe is better to iterate over the set? - for (uint32_t i = first_seq_in_round_; i < highest_seq_received_; i++) { - auto it = skipped_interests_.find(i); - if (it != skipped_interests_.end()) { + + uint32_t fec_packets = 0; + for (uint32_t i = (first_seq_in_round_ + 1); i < highest_seq_received_; i++) { + PacketState state = getPacketState(i); + if (state == PacketState::SKIPPED) { if (number_theorically_received_packets_ > 0) number_theorically_received_packets_--; - skipped_interests_.erase(it); } + if (indexer_->isFec(i)) fec_packets++; } + if (indexer_->isFec(highest_seq_received_)) fec_packets++; - loss_rate_ = (double)((double)(packets_lost_) / - (double)number_theorically_received_packets_); + // in this case no new packet was received after the previous round, avoid + // division by 0 + if (number_theorically_received_packets_ == 0 && packets_lost_ == 0) return; - if (rounds_ % 15 == 0) max_loss_rate_ = 0; // reset every 3 sec - if (loss_rate_ > max_loss_rate_) max_loss_rate_ = loss_rate_; + if (number_theorically_received_packets_ != 0) + loss_rate_ = (double)((double)(packets_lost_) / + (double)number_theorically_received_packets_); + else + // we didn't receive anything except NACKs that triggered losses + loss_rate_ = 1.0; - if (avg_loss_rate_ == 0) + if (avg_loss_rate_ == -1.0) avg_loss_rate_ = loss_rate_; else avg_loss_rate_ = avg_loss_rate_ * MOVING_AVG_ALPHA + loss_rate_ * (1 - MOVING_AVG_ALPHA); - residual_loss_rate_ = (double)((double)(packets_lost_ - losses_recovered_) / - (double)number_theorically_received_packets_); + // update counters for loss rate per second + total_expected_packets_ += number_theorically_received_packets_; + lost_per_sec_ += packets_lost_; + + if (in_sync) { + // update counters for residual losses + // fec packets are not sent to the app so we don't want to count them here + expected_packets_ += + ((highest_seq_received_ - first_seq_in_round_) - fec_packets); + } else { + packets_sent_to_app_ = 0; + } + + if (rounds_from_last_compute_ >= (MILLI_IN_A_SEC / ROUND_LEN)) { + // compute loss rate per second + if (lost_per_sec_ > total_expected_packets_) + lost_per_sec_ = total_expected_packets_; + + if (total_expected_packets_ == 0) + per_sec_loss_rate_ = 0; + else + per_sec_loss_rate_ = + (double)((double)(lost_per_sec_) / (double)total_expected_packets_); + + loss_history_.pushBack(per_sec_loss_rate_); + max_loss_rate_ = getMaxLoss(); + + if (in_sync && expected_packets_ != 0) { + // compute residual loss rate + if (packets_sent_to_app_ > expected_packets_) { + // this may happen if we get packet from the prev bin that get recovered + // on the current one + packets_sent_to_app_ = expected_packets_; + } + + residual_loss_rate_ = + 1.0 - ((double)packets_sent_to_app_ / (double)expected_packets_); + if (residual_loss_rate_ < 0.0) residual_loss_rate_ = 0.0; + } + + lost_per_sec_ = 0; + total_expected_packets_ = 0; + expected_packets_ = 0; + packets_sent_to_app_ = 0; + rounds_from_last_compute_ = 0; + } + + rounds_from_last_compute_++; +} - if (residual_loss_rate_ < 0) residual_loss_rate_ = 0; +void RTCState::dataToBeReceived(uint32_t seq) { + addToPacketCache(seq, PacketState::TO_BE_RECEIVED); } void RTCState::addRecvOrLost(uint32_t seq, PacketState state) { - if (indexer_->isFec(seq)) { - pending_fec_pkt_--; + auto it = pending_interests_.find(seq); + if (it != pending_interests_.end()) { + pending_interests_.erase(it); + if (indexer_->isFec(seq)) pending_fec_pkt_--; } - pending_interests_.erase(seq); - if (received_or_lost_packets_.size() >= MAX_CACHED_PACKETS) { - received_or_lost_packets_.erase(received_or_lost_packets_.begin()); - } - // notice that it may happen that a packet that we consider lost arrives after - // some time, in this case we simply overwrite the packet state. - received_or_lost_packets_[seq] = state; + addToPacketCache(seq, state); // keep track of the last packet received/lost // without holes. @@ -608,16 +764,25 @@ void RTCState::addRecvOrLost(uint32_t seq, PacketState state) { } else if (seq <= highest_seq_received_in_order_) { // here we do nothing } else if (seq > highest_seq_received_in_order_) { - // 1) there is a gap in the sequence so we do not update largest_in_seq_ - // 2) all the packets from largest_in_seq_ to seq are in - // received_or_lost_packets_ an we upate largest_in_seq_ - // or are FEC packets + // 1) there is a gap in the sequence so we do not update + // highest_seq_received_in_order_ + // 2) all the packets from highest_seq_received_in_order_ to seq are + // received or lost or are fec packetis. In this case we increase + // highest_seq_received_in_order_ until we find an hole in the sequence for (uint32_t i = highest_seq_received_in_order_ + 1; i <= seq; i++) { - if (received_or_lost_packets_.find(i) == - received_or_lost_packets_.end() && - !indexer_->isFec(i)) { - break; + PacketState state = getPacketState(i); + if ((state == PacketState::UNKNOWN || state == PacketState::LOST)) { + if (indexer_->isFec(i)) { + // this is a fec packet and we don't care to receive it + // however we may need to increse the number or lost packets + // XXX: in case we want to use rtx to recover fec packets, + // this may prevent to detect a packet loss and no rtx will be sent + onLossDetected(i); + } else { + // this is a data packet and we need to get it + break; + } } // this packet is in order so we can update the // highest_seq_received_in_order_ @@ -629,9 +794,14 @@ void RTCState::addRecvOrLost(uint32_t seq, PacketState state) { void RTCState::setInitRttTimer(uint32_t wait) { init_rtt_timer_->cancel(); init_rtt_timer_->expires_from_now(std::chrono::milliseconds(wait)); - init_rtt_timer_->async_wait([this](std::error_code ec) { + + std::weak_ptr<RTCState> self = shared_from_this(); + init_rtt_timer_->async_wait([self](const std::error_code &ec) { if (ec) return; - checkInitRttTimer(); + + if (auto ptr = self.lock()) { + ptr->checkInitRttTimer(); + } }); } @@ -639,19 +809,25 @@ void RTCState::checkInitRttTimer() { if (received_probes_ < INIT_RTT_MIN_PROBES_TO_RECV) { // we didn't received enough probes, restart received_probes_ = 0; - rtt_probes_->setProbes(INIT_RTT_PROBE_INTERVAL, INIT_RTT_PROBES); - rtt_probes_->sendProbes(); + probe_handler_->setSuffixRange(MIN_INIT_PROBE_SEQ, MAX_INIT_PROBE_SEQ); + probe_handler_->setProbes(INIT_RTT_PROBE_INTERVAL, INIT_RTT_PROBES); + probe_handler_->sendProbes(); setInitRttTimer(INIT_RTT_PROBE_RESTART); return; } + init_rtt_ = true; main_path_->roundEnd(); - rtt_probes_->setProbes(RTT_PROBE_INTERVAL, 0); - rtt_probes_->sendProbes(); + loss_history_.pushBack(probe_handler_->getProbeLossRate()); + max_loss_rate_ = getMaxLoss(); + + probe_handler_->setSuffixRange(MIN_RTT_PROBE_SEQ, MAX_RTT_PROBE_SEQ); + probe_handler_->setProbes(RTT_PROBE_INTERVAL, 0); + probe_handler_->sendProbes(); // init last_seq_nacked_. skip packets that may come from the cache double prod_rate = getProducerRate(); - double rtt = (double)getRTT() / MILLI_IN_A_SEC; + double rtt = (double)getMinRTT() / MILLI_IN_A_SEC; double packet_size = getAveragePacketSize(); uint32_t pkt_in_rtt_ = std::floor(((prod_rate / packet_size) * rtt) * 0.8); last_seq_nacked_ = last_production_seq_ + pkt_in_rtt_; @@ -659,6 +835,68 @@ void RTCState::checkInitRttTimer() { discovered_rtt_callback_(); } +double RTCState::getMaxLoss() { + if (loss_history_.size() != 0) return loss_history_.begin(); + return 0; +} + +core::ParamsRTC RTCState::getProbeParams(const core::ContentObject &probe) { + uint32_t seq = probe.getName().getSuffix(); + core::ParamsRTC params; + + switch (ProbeHandler::getProbeType(seq)) { + case ProbeType::INIT: { + core::ContentObjectManifest manifest( + const_cast<core::ContentObject &>(probe)); + manifest.decode(); + params = manifest.getParamsRTC(); + break; + } + case ProbeType::RTT: { + struct nack_packet_t *probe_pkt = + (struct nack_packet_t *)probe.getPayload()->data(); + params = core::ParamsRTC{ + .timestamp = probe_pkt->getTimestamp(), + .prod_rate = probe_pkt->getProductionRate(), + .prod_seg = probe_pkt->getProductionSegment(), + }; + break; + } + default: + break; + } + + return params; +} + +core::ParamsRTC RTCState::getDataParams(const core::ContentObject &data) { + core::ParamsRTC params; + + switch (data.getPayloadType()) { + case core::PayloadType::DATA: { + struct data_packet_t *data_pkt = + (struct data_packet_t *)data.getPayload()->data(); + params = core::ParamsRTC{ + .timestamp = data_pkt->getTimestamp(), + .prod_rate = data_pkt->getProductionRate(), + .prod_seg = data.getName().getSuffix(), + }; + break; + } + case core::PayloadType::MANIFEST: { + core::ContentObjectManifest manifest( + const_cast<core::ContentObject &>(data)); + manifest.decode(); + params = manifest.getParamsRTC(); + break; + } + default: + break; + } + + return params; +} + } // namespace rtc } // namespace protocol |