From a1ac96f497719b897793ac14b287cb8d840651c1 Mon Sep 17 00:00:00 2001 From: Luca Muscariello Date: Fri, 22 Apr 2022 17:55:01 +0200 Subject: HICN-722: Updates on transport, RTC, manifest usage for RTC, infra. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Mauro Sardara Co-authored-by: Jordan Augé Co-authored-by: Michele Papalini Co-authored-by: Angelo Mantellini Co-authored-by: Jacques Samain Co-authored-by: Olivier Roques Co-authored-by: Enrico Loparco Co-authored-by: Giulio Grassi manifest: optimize manifest processing manifest: add FEC parameters to manifests manifest: refactor verification process manifest: report auth alerts in hiperf instead of aborting manifest: remove FEC buffer callback in consumer manifest: refactor and enable manifests by default manifest: update manifest header with transport parameters manifest: batch interests for first manifest from RTC producer manifest: refactor processing of RTC manifests manifest: update manifest-related socket options of consumers manifest: update unit tests for manifests manifest: pack manifest headers manifest: verify FEC packets auth: add consumer socket option to set max unverified delay manifest: process manifests after full FEC decoding manifest: manage forward jumps in RTC verifier fec: remove useless fec codes rs: add new code rate rs: add new code rate rs: add new code rate rs: add new code rate libtransport: increase internal packet cache size remove internal cisco info in cmake manifest: add option to set manifest capacity data_input_node.c: add information about adj_index[VLIB_RX] on received data packetsi sysrepo plugin: update build Change-Id: I0cf64d91bd0a1b7cad4eeaa9871f58f5f10434af Signed-off-by: Mauro Sardara Signed-off-by: Luca Muscariello --- libtransport/src/protocols/rtc/rtc_state.cc | 138 ++++++++++------------------ 1 file changed, 47 insertions(+), 91 deletions(-) (limited to 'libtransport/src/protocols/rtc/rtc_state.cc') diff --git a/libtransport/src/protocols/rtc/rtc_state.cc b/libtransport/src/protocols/rtc/rtc_state.cc index 6a21531f8..5b3b5e4c3 100644 --- a/libtransport/src/protocols/rtc/rtc_state.cc +++ b/libtransport/src/protocols/rtc/rtc_state.cc @@ -56,7 +56,6 @@ void RTCState::initParams() { last_seq_nacked_ = 0; loss_rate_ = 0.0; avg_loss_rate_ = -1.0; - max_loss_rate_ = 0.0; last_round_loss_rate_ = 0.0; // loss rate per sec @@ -85,14 +84,13 @@ void RTCState::initParams() { fec_recovered_rate_ = 0.0; // nack counter - nack_on_last_round_ = false; + past_nack_on_last_round_ = false; received_nacks_last_round_ = 0; // packets counter received_packets_last_round_ = 0; received_data_last_round_ = 0; received_data_from_cache_ = 0; - data_from_cache_rate_ = 0; sent_interests_last_round_ = 0; sent_rtx_last_round_ = 0; @@ -103,7 +101,7 @@ void RTCState::initParams() { last_production_seq_ = 0; producer_is_active_ = false; - last_prod_update_ = 0; + last_prod_update_seq_ = 0; // paths stats path_table_.clear(); @@ -180,7 +178,9 @@ void RTCState::onLossDetected(uint32_t seq) { PacketState state = getPacketState(seq); // if the packet is already marked with a state, do nothing - if (state == PacketState::UNKNOWN) { + // to be considered lost the packet must be pending + if (state == PacketState::UNKNOWN && + pending_interests_.find(seq) != pending_interests_.end()) { packets_lost_++; addToPacketCache(seq, PacketState::LOST); } @@ -225,8 +225,8 @@ void RTCState::onDataPacketReceived(const core::ContentObject &content_object, core::ParamsRTC params = RTCState::getDataParams(content_object); - if (last_prod_update_ < params.timestamp) { - last_prod_update_ = params.timestamp; + if (last_prod_update_seq_ < seq) { + last_prod_update_seq_ = seq; production_rate_ = (double)params.prod_rate; } @@ -267,14 +267,12 @@ void RTCState::onNackPacketReceived(const core::ContentObject &nack, uint32_t seq = nack.getName().getSuffix(); struct nack_packet_t *nack_pkt = (struct nack_packet_t *)nack.getPayload()->data(); - uint64_t production_time = nack_pkt->getTimestamp(); uint32_t production_seq = nack_pkt->getProductionSegment(); uint32_t production_rate = nack_pkt->getProductionRate(); if (TRANSPORT_EXPECT_FALSE(main_path_ == nullptr) || - last_prod_update_ < production_time) { + last_prod_update_seq_ < production_seq) { // update production rate - last_prod_update_ = production_time; last_production_seq_ = production_seq; production_rate_ = (double)production_rate; } @@ -282,7 +280,6 @@ void RTCState::onNackPacketReceived(const core::ContentObject &nack, if (compute_stats) { // this is not an RTX updatePathStats(nack, true); - nack_on_last_round_ = true; } // for statistics pourpose we log all nacks, also the one received for @@ -297,6 +294,7 @@ void RTCState::onNackPacketReceived(const core::ContentObject &nack, if (last_seq_nacked_ < seq) last_seq_nacked_ = seq; DLOG_IF(INFO, VLOG_IS_ON(3)) << "lost packet " << seq << " beacuse of a past nack"; + if (compute_stats) past_nack_on_last_round_ = true; onPacketLost(seq); } else if (seq > production_seq) { // future nack @@ -317,29 +315,15 @@ void RTCState::onNackPacketReceived(const core::ContentObject &nack, } } - // the producer is responding - // we consider it active only if the production rate is not 0 - // or the production sequence number is not 1 - if (production_rate_ != 0 || production_seq != 1) { - producer_is_active_ = true; - } - received_packets_last_round_++; } void RTCState::onPacketLost(uint32_t seq) { -#if 0 - DLOG_IF(INFO, VLOG_IS_ON(3)) << "packet " << seq << " is lost"; - auto it = pending_interests_.find(seq); - if (it != pending_interests_.end()) { - // this packet was never retransmitted so it does - // not appear in the loss count - packets_lost_++; - } -#endif if (!indexer_->isFec(seq)) { PacketState state = getPacketState(seq); - if (state == PacketState::LOST || state == PacketState::UNKNOWN) { + if (state == PacketState::LOST || + (state == PacketState::UNKNOWN && + pending_interests_.find(seq) != pending_interests_.end())) { definitely_lost_pkt_++; DLOG_IF(INFO, VLOG_IS_ON(4)) << "packet " << seq << " is lost"; } @@ -350,7 +334,12 @@ void RTCState::onPacketLost(uint32_t seq) { void RTCState::onPacketRecoveredRtx(uint32_t seq) { packets_sent_to_app_++; if (seq > highest_seq_received_) highest_seq_received_ = seq; - losses_recovered_++; + + // increase the recovered packet counter only if the packet was marked as LOST + // before. + PacketState state = getPacketState(seq); + if (state == PacketState::LOST) losses_recovered_++; + addRecvOrLost(seq, PacketState::RECEIVED); } @@ -371,19 +360,30 @@ void RTCState::onPacketRecoveredFec(uint32_t seq, uint32_t size) { // adding header to the count recovered_bytes_with_fec_ += 60; // XXX get header size some where - if (getPacketState(seq) == PacketState::UNKNOWN) - onLossDetected(seq); // the pkt was lost but didn't account for it yet + // the packet could be not marked as lost yet. onLossDetected checks if add in + // the packet in the lost count or not + onLossDetected(seq); addRecvOrLost(seq, PacketState::RECEIVED); } bool RTCState::onProbePacketReceived(const core::ContentObject &probe) { uint32_t seq = probe.getName().getSuffix(); + core::ParamsRTC params = RTCState::getProbeParams(probe); + + bool is_valid = true; + uint32_t max = UINT32_MAX; + if (params.prod_rate == max) is_valid = false; uint64_t rtt; - rtt = probe_handler_->getRtt(seq); + rtt = probe_handler_->getRtt(seq, is_valid); if (rtt == 0) return false; // this is not a valid probe + if (!is_valid) return false; // not a valid probe + + // if we are here the producer is active + producer_is_active_ = true; + // Like for data and nacks update the path stats. Here the RTT is computed // by the probe handler. Both probes for rtt and bw are good to estimate // info on the path. @@ -406,24 +406,14 @@ bool RTCState::onProbePacketReceived(const core::ContentObject &probe) { uint64_t now = utils::SteadyTime::nowMs().count(); - core::ParamsRTC params = RTCState::getProbeParams(probe); - int64_t OWD = now - params.timestamp; path->insertOwdSample(OWD); - if (last_prod_update_ < params.timestamp) { + if (last_prod_update_seq_ < params.prod_seg) { last_production_seq_ = params.prod_seg; - last_prod_update_ = params.timestamp; production_rate_ = (double)params.prod_rate; } - // the producer is responding - // we consider it active only if the production rate is not 0 - // or the production sequence numner is not 1 - if (production_rate_ != 0 || params.prod_seg != 1) { - producer_is_active_ = true; - } - // check for init RTT. if received_probes_ is equal to 0 schedule a timer to // wait for the INIT_RTT_PROBES. in this way if some probes get lost we don't // wait forever @@ -453,12 +443,12 @@ bool RTCState::onProbePacketReceived(const core::ContentObject &probe) { void RTCState::onJumpForward(uint32_t next_seq) { for (uint32_t seq = highest_seq_received_in_order_ + 1; seq < next_seq; seq++) { - auto it = pending_interests_.find(seq); PacketState packet_state = getPacketState(seq); - if (it == pending_interests_.end() && - packet_state != PacketState::RECEIVED && + if (packet_state != PacketState::RECEIVED && packet_state != PacketState::DEFINITELY_LOST) { - onLossDetected(seq); + // here we considere the packet as definitely lost whitout increase the + // lost packet counter because this loss is not due to the network + // condition but the transport wants to skip the packet onPacketLost(seq); } } @@ -491,29 +481,6 @@ void RTCState::onNewRound(double round_len, bool in_sync) { fec_recovered_rate_ = (fec_recovered_rate_ * 0.8) + (0.2 * fec_recovered_bytes_per_sec); -#if 0 - // search for an active path. There should be only one active path (meaning a - // path that leads to the producer socket -no cache- and from which we are - // currently getting data packets) at any time. However it may happen that - // there are mulitple active paths in case of mobility (the old path will - // remain active for a short ammount of time). The main path is selected as - // the active path from where the consumer received the latest data packet - - uint64_t last_packet_ts = 0; - main_path_ = nullptr; - - for (auto it = path_table_.begin(); it != path_table_.end(); it++) { - it->second->roundEnd(); - if (it->second->isActive()) { - uint64_t ts = it->second->getLastPacketTS(); - if (ts > last_packet_ts) { - last_packet_ts = ts; - main_path_ = it->second; - } - } - } -#endif - // search for an active path. Is it possible to have multiple path that are // used at the same time. We use as reference path the one from where we gets // more packets. This means that the path should have better lantecy or less @@ -544,7 +511,7 @@ void RTCState::onNewRound(double round_len, bool in_sync) { updateLossRate(in_sync); // handle nacks - if (!nack_on_last_round_ && received_bytes_ > 0) { + if (!past_nack_on_last_round_ && received_bytes_ > 0) { rounds_without_nacks_++; } else { rounds_without_nacks_ = 0; @@ -561,14 +528,6 @@ void RTCState::onNewRound(double round_len, bool in_sync) { } } - // compute cache/producer ratio - if (received_data_last_round_ != 0) { - double new_rate = - (double)received_data_from_cache_ / (double)received_data_last_round_; - data_from_cache_rate_ = data_from_cache_rate_ * MOVING_AVG_ALPHA + - (new_rate * (1 - MOVING_AVG_ALPHA)); - } - // reset counters received_bytes_ = 0; received_fec_bytes_ = 0; @@ -578,7 +537,7 @@ void RTCState::onNewRound(double round_len, bool in_sync) { losses_recovered_ = 0; first_seq_in_round_ = highest_seq_received_; - nack_on_last_round_ = false; + past_nack_on_last_round_ = false; received_nacks_last_round_ = 0; received_packets_last_round_ = 0; @@ -700,6 +659,7 @@ void RTCState::updateLossRate(bool in_sync) { expected_packets_ += ((highest_seq_received_ - first_seq_in_round_) - fec_packets); } else { + expected_packets_ = 0; packets_sent_to_app_ = 0; } @@ -715,7 +675,6 @@ void RTCState::updateLossRate(bool in_sync) { (double)((double)(lost_per_sec_) / (double)total_expected_packets_); loss_history_.pushBack(per_sec_loss_rate_); - max_loss_rate_ = getMaxLoss(); if (in_sync && expected_packets_ != 0) { // compute residual loss rate @@ -778,7 +737,9 @@ void RTCState::addRecvOrLost(uint32_t seq, PacketState state) { // however we may need to increse the number or lost packets // XXX: in case we want to use rtx to recover fec packets, // this may prevent to detect a packet loss and no rtx will be sent - onLossDetected(i); + if (TRANSPORT_EXPECT_TRUE(i >= first_interest_sent_seq_)) { + onLossDetected(i); + } } else { // this is a data packet and we need to get it break; @@ -806,8 +767,9 @@ void RTCState::setInitRttTimer(uint32_t wait) { } void RTCState::checkInitRttTimer() { - if (received_probes_ < INIT_RTT_MIN_PROBES_TO_RECV) { - // we didn't received enough probes, restart + if (received_probes_ < INIT_RTT_MIN_PROBES_TO_RECV || + probe_handler_->getProbeLossRate() == 1.0) { + // we didn't received enough probes or they were not valid, restart received_probes_ = 0; probe_handler_->setSuffixRange(MIN_INIT_PROBE_SEQ, MAX_INIT_PROBE_SEQ); probe_handler_->setProbes(INIT_RTT_PROBE_INTERVAL, INIT_RTT_PROBES); @@ -819,7 +781,6 @@ void RTCState::checkInitRttTimer() { init_rtt_ = true; main_path_->roundEnd(); loss_history_.pushBack(probe_handler_->getProbeLossRate()); - max_loss_rate_ = getMaxLoss(); probe_handler_->setSuffixRange(MIN_RTT_PROBE_SEQ, MAX_RTT_PROBE_SEQ); probe_handler_->setProbes(RTT_PROBE_INTERVAL, 0); @@ -829,17 +790,12 @@ void RTCState::checkInitRttTimer() { double prod_rate = getProducerRate(); double rtt = (double)getMinRTT() / MILLI_IN_A_SEC; double packet_size = getAveragePacketSize(); - uint32_t pkt_in_rtt_ = std::floor(((prod_rate / packet_size) * rtt) * 0.8); + uint32_t pkt_in_rtt_ = std::floor(((prod_rate / packet_size) * rtt)); last_seq_nacked_ = last_production_seq_ + pkt_in_rtt_; discovered_rtt_callback_(); } -double RTCState::getMaxLoss() { - if (loss_history_.size() != 0) return loss_history_.begin(); - return 0; -} - core::ParamsRTC RTCState::getProbeParams(const core::ContentObject &probe) { uint32_t seq = probe.getName().getSuffix(); core::ParamsRTC params; -- cgit 1.2.3-korg