/* * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include #include #include namespace transport { namespace protocol { namespace rtc { RTCState::RTCState(Indexer *indexer, ProbeHandler::SendProbeCallback &&probe_callback, DiscoveredRttCallback &&discovered_rtt_callback, asio::io_service &io_service) : loss_history_(10), // log 10sec history indexer_(indexer), probe_handler_(std::make_shared(std::move(probe_callback), io_service)), discovered_rtt_callback_(std::move(discovered_rtt_callback)) { init_rtt_timer_ = std::make_unique(io_service); } RTCState::~RTCState() {} void RTCState::initParams() { // packets counters (total) sent_interests_ = 0; sent_rtx_ = 0; received_data_ = 0; received_nacks_ = 0; received_timeouts_ = 0; received_probes_ = 0; // loss counters packets_lost_ = 0; definitely_lost_pkt_ = 0; losses_recovered_ = 0; first_seq_in_round_ = 0; highest_seq_received_ = 0; highest_seq_received_in_order_ = 0; last_seq_nacked_ = 0; loss_rate_ = 0.0; avg_loss_rate_ = -1.0; last_round_loss_rate_ = 0.0; // loss rate per sec lost_per_sec_ = 0; total_expected_packets_ = 0; per_sec_loss_rate_ = 0.0; // residual losses counters expected_packets_ = 0; packets_sent_to_app_ = 0; rounds_from_last_compute_ = 0; residual_loss_rate_ = 0.0; // fec counters pending_fec_pkt_ = 0; received_fec_pkt_ = 0; // bw counters received_bytes_ = 0; received_fec_bytes_ = 0; recovered_bytes_with_fec_ = 0; avg_packet_size_ = INIT_PACKET_SIZE; production_rate_ = 0.0; received_rate_ = 0.0; fec_recovered_rate_ = 0.0; // nack counter past_nack_on_last_round_ = false; received_nacks_last_round_ = 0; // packets counter received_packets_last_round_ = 0; received_data_last_round_ = 0; received_data_from_cache_ = 0; sent_interests_last_round_ = 0; sent_rtx_last_round_ = 0; // round conunters rounds_ = 0; rounds_without_nacks_ = 0; rounds_without_packets_ = 0; last_production_seq_ = 0; producer_is_active_ = false; last_prod_update_seq_ = 0; // paths stats path_table_.clear(); main_path_ = nullptr; edge_path_ = nullptr; // packet cache (not pending anymore) packet_cache_.clear(); // pending interests pending_interests_.clear(); // used to keep track of the skipped interest last_interest_sent_ = 0; // init rtt first_interest_sent_time_ = ~0; first_interest_sent_seq_ = 0; // start probing the producer init_rtt_ = false; probe_handler_->setSuffixRange(MIN_INIT_PROBE_SEQ, MAX_INIT_PROBE_SEQ); probe_handler_->setProbes(INIT_RTT_PROBE_INTERVAL, INIT_RTT_PROBES); probe_handler_->sendProbes(); setInitRttTimer(INIT_RTT_PROBE_RESTART); } // packet events void RTCState::onSendNewInterest(const core::Name *interest_name) { uint64_t now = utils::SteadyTime::nowMs().count(); uint32_t seq = interest_name->getSuffix(); pending_interests_.insert(std::pair(seq, now)); if (sent_interests_ == 0) { first_interest_sent_time_ = now; first_interest_sent_seq_ = seq; } if (indexer_->isFec(seq)) { pending_fec_pkt_++; } if (last_interest_sent_ == 0 && seq != 0) { last_interest_sent_ = seq; // init last interest sent } // TODO what happen in case of jumps? eraseFromPacketCache( seq); // if we send this interest we don't know its state for (uint32_t i = last_interest_sent_ + 1; i < seq; i++) { if (indexer_->isFec(i)) { // only fec packets can be skipped addToPacketCache(i, PacketState::SKIPPED); } } last_interest_sent_ = seq; sent_interests_++; sent_interests_last_round_++; } void RTCState::onTimeout(uint32_t seq, bool lost) { auto it = pending_interests_.find(seq); if (it != pending_interests_.end()) { pending_interests_.erase(it); if (indexer_->isFec(seq)) pending_fec_pkt_--; } received_timeouts_++; if (lost) onPacketLost(seq); } void RTCState::onLossDetected(uint32_t seq) { PacketState state = getPacketState(seq); // if the packet is already marked with a state, do nothing // to be considered lost the packet must be pending if (state == PacketState::UNKNOWN && pending_interests_.find(seq) != pending_interests_.end()) { packets_lost_++; addToPacketCache(seq, PacketState::LOST); } } void RTCState::onRetransmission(uint32_t seq) { // remove the interest for the pendingInterest map only after the first rtx. // in this way we can handle the ooo packets that come in late as normla // packet. we consider a packet lost only if we sent at least an RTX for it. // XXX this may become problematic if we stop the RTX transmissions auto it = pending_interests_.find(seq); if (it != pending_interests_.end()) { pending_interests_.erase(it); if (indexer_->isFec(seq)) pending_fec_pkt_--; } sent_rtx_++; sent_rtx_last_round_++; } void RTCState::onPossibleLossWithNoRtx(uint32_t seq) { // if fec is on or rtx is disable we don't need to do anything to recover a // packet. however in both cases we need to remove possible missing packets // from the window of pendinig interest in order to free space without wating // for the timeout. auto it = pending_interests_.find(seq); if (it != pending_interests_.end()) { pending_interests_.erase(it); if (indexer_->isFec(seq)) pending_fec_pkt_--; } } void RTCState::onDataPacketReceived(const core::ContentObject &content_object, bool compute_stats) { uint32_t seq = content_object.getName().getSuffix(); if (compute_stats) { updatePathStats(content_object, false); received_data_last_round_++; } received_data_++; packets_sent_to_app_++; core::ParamsRTC params = RTCState::getDataParams(content_object); if (last_prod_update_seq_ < seq) { last_prod_update_seq_ = seq; production_rate_ = (double)params.prod_rate; } updatePacketSize(content_object); updateReceivedBytes(content_object, false); addRecvOrLost(seq, PacketState::RECEIVED); // the producer is responding // it is generating valid data packets so we consider it active producer_is_active_ = true; received_packets_last_round_++; } void RTCState::onFecPacketReceived(const core::ContentObject &content_object) { uint32_t seq = content_object.getName().getSuffix(); updateReceivedBytes(content_object, true); PacketState state = getPacketState(seq); if (state != PacketState::LOST) { // increase only for not lost packets received_fec_pkt_++; } addRecvOrLost(seq, PacketState::RECEIVED); // the producer is responding // it is generating valid data packets so we consider it active producer_is_active_ = true; } void RTCState::onNackPacketReceived(const core::ContentObject &nack, bool compute_stats) { uint32_t seq = nack.getName().getSuffix(); struct nack_packet_t *nack_pkt = (struct nack_packet_t *)nack.getPayload()->data(); uint32_t production_seq = nack_pkt->getProductionSegment(); uint32_t production_rate = nack_pkt->getProductionRate(); if (TRANSPORT_EXPECT_FALSE(main_path_ == nullptr) || last_prod_update_seq_ < production_seq) { // update production rate last_production_seq_ = production_seq; production_rate_ = (double)production_rate; } if (compute_stats) { // this is not an RTX updatePathStats(nack, true); } // for statistics pourpose we log all nacks, also the one received for // retransmitted packets received_nacks_++; received_nacks_last_round_++; bool to_delete = false; if (production_seq > seq) { // old nack, seq is lost // update last nacked if (last_seq_nacked_ < seq) last_seq_nacked_ = seq; DLOG_IF(INFO, VLOG_IS_ON(3)) << "lost packet " << seq << " beacuse of a past nack"; if (compute_stats) past_nack_on_last_round_ = true; onPacketLost(seq); } else if (seq > production_seq) { // future nack // remove the nack from the pending interest map // (the packet is not received/lost yet) to_delete = true; } else { // this should be a quite rear event. simply remove the // packet from the pending interest list to_delete = true; } if (to_delete) { auto it = pending_interests_.find(seq); if (it != pending_interests_.end()) { pending_interests_.erase(it); if (indexer_->isFec(seq)) pending_fec_pkt_--; } } received_packets_last_round_++; } void RTCState::onPacketLost(uint32_t seq) { if (!indexer_->isFec(seq)) { PacketState state = getPacketState(seq); if (state == PacketState::LOST || (state == PacketState::UNKNOWN && pending_interests_.find(seq) != pending_interests_.end())) { definitely_lost_pkt_++; DLOG_IF(INFO, VLOG_IS_ON(4)) << "packet " << seq << " is lost"; } } addRecvOrLost(seq, PacketState::DEFINITELY_LOST); } void RTCState::onPacketRecoveredRtx(const core::ContentObject &content_object, uint64_t rtt) { uint32_t seq = content_object.getName().getSuffix(); packets_sent_to_app_++; // increase the recovered packet counter only if the packet was marked as LOST // before. PacketState state = getPacketState(seq); if (state == PacketState::LOST) losses_recovered_++; addRecvOrLost(seq, PacketState::RECEIVED); updateReceivedBytes(content_object, false); if (rtt == 0) return; // nothing to do uint32_t path_label = content_object.getPathLabel(); auto path_it = path_table_.find(path_label); if (path_it == path_table_.end()) { // this is a new path and it must be a cache std::shared_ptr newPath = std::make_shared(path_label); auto ret = path_table_.insert( std::pair>(path_label, newPath)); path_it = ret.first; } auto path = path_it->second; if (path->pathToProducer()) return; // this packet is coming from a producer // even if we sent an RTX. this may happen // for RTX that are sent too fast or in // case of multipath path->insertRttSample(utils::SteadyTime::Milliseconds(rtt), true); } void RTCState::onFecPacketRecoveredRtx( const core::ContentObject &content_object) { // This is the same as onPacketRecoveredRtx, but in this is case the // pkt is also a FEC pkt, the addRecvOrLost will be called afterwards losses_recovered_++; updateReceivedBytes(content_object, true); } void RTCState::onPacketRecoveredFec(uint32_t seq, uint32_t size) { losses_recovered_++; packets_sent_to_app_++; recovered_bytes_with_fec_ += size; // adding header to the count recovered_bytes_with_fec_ += 60; // XXX get header size some where // the packet could be not marked as lost yet. onLossDetected checks if add in // the packet in the lost count or not onLossDetected(seq); addRecvOrLost(seq, PacketState::RECEIVED); } bool RTCState::onProbePacketReceived(const core::ContentObject &probe) { uint32_t seq = probe.getName().getSuffix(); core::ParamsRTC params = RTCState::getProbeParams(probe); bool is_valid = true; uint32_t max = UINT32_MAX; if (params.prod_rate == max) is_valid = false; uint64_t rtt; rtt = probe_handler_->getRtt(seq, is_valid); if (rtt == 0) return false; // this is not a valid probe if (!is_valid) return false; // not a valid probe // if we are here the producer is active producer_is_active_ = true; // Like for data and nacks update the path stats. Here the RTT is computed // by the probe handler. Both probes for rtt and bw are good to estimate // info on the path. uint32_t path_label = probe.getPathLabel(); auto path_it = path_table_.find(path_label); if (path_it == path_table_.end()) { // found a new path std::shared_ptr newPath = std::make_shared(path_label); auto ret = path_table_.insert( std::pair>(path_label, newPath)); path_it = ret.first; } auto path = path_it->second; path->insertRttSample(utils::SteadyTime::Milliseconds(rtt), true); path->receivedNack(); uint64_t now = utils::SteadyTime::nowMs().count(); int64_t OWD = now - params.timestamp; path->insertOwdSample(OWD); if (last_prod_update_seq_ < params.prod_seg) { last_production_seq_ = params.prod_seg; production_rate_ = (double)params.prod_rate; } // check for init RTT. if received_probes_ is equal to 0 schedule a timer to // wait for the INIT_RTT_PROBES. in this way if some probes get lost we don't // wait forever received_probes_++; if (!init_rtt_ && received_probes_ <= INIT_RTT_PROBES) { if (received_probes_ == 1) { // we got the first probe, wait at most INIT_RTT_PROBE_WAIT sec for the // others. main_path_ = path; setInitRttTimer(INIT_RTT_PROBE_WAIT); } if (received_probes_ == INIT_RTT_PROBES) { // we are done init_rtt_timer_->cancel(); checkInitRttTimer(); } } received_packets_last_round_++; // ignore probes sent before the first interest if ((now - rtt) <= first_interest_sent_time_) return false; return true; } void RTCState::onJumpForward(uint32_t next_seq) { for (uint32_t seq = highest_seq_received_in_order_ + 1; seq < next_seq; seq++) { PacketState packet_state = getPacketState(seq); if (packet_state != PacketState::RECEIVED && packet_state != PacketState::DEFINITELY_LOST) { // here we considere the packet as definitely lost whitout increase the // lost packet counter because this loss is not due to the network // condition but the transport wants to skip the packet onPacketLost(seq); } } } void RTCState::onNewRound(double round_len, bool in_sync) { if (path_table_.empty()) return; double bytes_per_sec = ((double)received_bytes_ * (MILLI_IN_A_SEC / round_len)); if (received_rate_ == 0) received_rate_ = bytes_per_sec; else received_rate_ = (received_rate_ * MOVING_AVG_ALPHA) + ((1 - MOVING_AVG_ALPHA) * bytes_per_sec); double fec_bytes_per_sec = ((double)received_fec_bytes_ * (MILLI_IN_A_SEC / round_len)); if (fec_received_rate_ == 0) fec_received_rate_ = fec_bytes_per_sec; else fec_received_rate_ = (fec_received_rate_ * 0.8) + (0.2 * fec_bytes_per_sec); double fec_recovered_bytes_per_sec = ((double)recovered_bytes_with_fec_ * (MILLI_IN_A_SEC / round_len)); if (fec_recovered_rate_ == 0) fec_recovered_rate_ = fec_recovered_bytes_per_sec; else fec_recovered_rate_ = (fec_recovered_rate_ * 0.8) + (0.2 * fec_recovered_bytes_per_sec); // search for an active path. Is it possible to have multiple path that are // used at the same time. We use as reference path the one from where we gets // more packets. This means that the path should have better lantecy or less // channel losses uint32_t last_round_packets = 0; uint64_t min_edge_rtt = UINT_MAX; std::shared_ptr old_main_path = main_path_; main_path_ = nullptr; edge_path_ = nullptr; for (auto it = path_table_.begin(); it != path_table_.end(); it++) { if (it->second->isValidProducer()) { uint32_t pkt = it->second->getPacketsLastRound(); if (pkt > last_round_packets) { last_round_packets = pkt; main_path_ = it->second; } } else if (it->second->isActive() && !it->second->pathToProducer()) { // this is a path to a cache from where we are receiving content if (it->second->getMinRtt() < min_edge_rtt) { min_edge_rtt = it->second->getMinRtt(); edge_path_ = it->second; } } it->second->roundEnd(); } if (main_path_ == nullptr) main_path_ = old_main_path; if (edge_path_ == nullptr) edge_path_ = main_path_; if (edge_path_->getMinRtt() >= main_path_->getMinRtt()) edge_path_ = main_path_; // in case we get a new main path we reset the stats of the old one. this is // beacuse, in case we need to switch back we don't what to take decisions on // old stats that may be outdated. if (main_path_ != old_main_path) old_main_path->clearRtt(); updateLossRate(in_sync); // handle nacks if (!past_nack_on_last_round_ && received_bytes_ > 0) { rounds_without_nacks_++; } else { rounds_without_nacks_ = 0; } // check if the producer is active if (received_packets_last_round_ != 0) { rounds_without_packets_ = 0; } else { rounds_without_packets_++; if (rounds_without_packets_ >= MAX_ROUND_WHIOUT_PACKETS && producer_is_active_ != false) { initParams(); } } // reset counters received_bytes_ = 0; received_fec_bytes_ = 0; recovered_bytes_with_fec_ = 0; packets_lost_ = 0; definitely_lost_pkt_ = 0; losses_recovered_ = 0; first_seq_in_round_ = highest_seq_received_; past_nack_on_last_round_ = false; received_nacks_last_round_ = 0; received_packets_last_round_ = 0; received_data_last_round_ = 0; received_data_from_cache_ = 0; sent_interests_last_round_ = 0; sent_rtx_last_round_ = 0; received_fec_pkt_ = 0; rounds_++; } void RTCState::updateReceivedBytes(const core::ContentObject &content_object, bool isFec) { if (isFec) { received_fec_bytes_ += (uint32_t)(content_object.headerSize() + content_object.payloadSize()); } else { received_bytes_ += (uint32_t)(content_object.headerSize() + content_object.payloadSize()); } } void RTCState::updatePacketSize(const core::ContentObject &content_object) { uint32_t pkt_size = (uint32_t)(content_object.headerSize() + content_object.payloadSize()); avg_packet_size_ = (MOVING_AVG_ALPHA * avg_packet_size_) + ((1 - MOVING_AVG_ALPHA) * pkt_size); } void RTCState::updatePathStats(const core::ContentObject &content_object, bool is_nack) { // get packet path uint32_t path_label = content_object.getPathLabel(); auto path_it = path_table_.find(path_label); if (path_it == path_table_.end()) { // found a new path std::shared_ptr newPath = std::make_shared(path_label); auto ret = path_table_.insert( std::pair>(path_label, newPath)); path_it = ret.first; } auto path = path_it->second; // compute rtt uint32_t seq = content_object.getName().getSuffix(); uint64_t interest_sent_time = getInterestSentTime(seq); if (interest_sent_time == 0) return; // this should not happen, // it means that we are processing an interest // that is not pending uint64_t now = utils::SteadyTime::nowMs().count(); uint64_t RTT = now - interest_sent_time; path->insertRttSample(utils::SteadyTime::Milliseconds(RTT), false); // compute OWD (the first part of the nack and data packet header are the // same, so we cast to data data packet) core::ParamsRTC params = RTCState::getDataParams(content_object); int64_t OWD = now - params.timestamp; path->insertOwdSample(OWD); // compute IAT or set path to producer if (!is_nack) { // compute the iat only for the content packets uint32_t segment_number = content_object.getName().getSuffix(); path->computeInterArrivalGap(segment_number); if (!path->pathToProducer()) received_data_from_cache_++; } else { path->receivedNack(); } } void RTCState::updateLossRate(bool in_sync) { last_round_loss_rate_ = loss_rate_; loss_rate_ = 0.0; uint32_t number_theorically_received_packets_ = highest_seq_received_ - first_seq_in_round_; // XXX this may be quite inefficient if the rate is high // maybe is better to iterate over the set? uint32_t fec_packets = 0; for (uint32_t i = (first_seq_in_round_ + 1); i < highest_seq_received_; i++) { PacketState state = getPacketState(i); if (state == PacketState::SKIPPED) { if (number_theorically_received_packets_ > 0) number_theorically_received_packets_--; } if (indexer_->isFec(i)) fec_packets++; } if (indexer_->isFec(highest_seq_received_)) fec_packets++; // in this case no new packet was received after the previous round, avoid // division by 0 if (number_theorically_received_packets_ == 0 && packets_lost_ == 0) return; if (number_theorically_received_packets_ != 0) loss_rate_ = (double)((double)(packets_lost_) / (double)number_theorically_received_packets_); else // we didn't receive anything except NACKs that triggered losses loss_rate_ = 1.0; if (avg_loss_rate_ == -1.0) avg_loss_rate_ = loss_rate_; else avg_loss_rate_ = avg_loss_rate_ * MOVING_AVG_ALPHA + loss_rate_ * (1 - MOVING_AVG_ALPHA); // update counters for loss rate per second total_expected_packets_ += number_theorically_received_packets_; lost_per_sec_ += packets_lost_; if (in_sync) { // update counters for residual losses // fec packets are not sent to the app so we don't want to count them here expected_packets_ += ((highest_seq_received_ - first_seq_in_round_) - fec_packets); } else { expected_packets_ = 0; packets_sent_to_app_ = 0; } if (rounds_from_last_compute_ >= (MILLI_IN_A_SEC / ROUND_LEN)) { // compute loss rate per second if (lost_per_sec_ > total_expected_packets_) lost_per_sec_ = total_expected_packets_; if (total_expected_packets_ == 0) per_sec_loss_rate_ = 0; else per_sec_loss_rate_ = (double)((double)(lost_per_sec_) / (double)total_expected_packets_); loss_history_.pushBack(per_sec_loss_rate_); if (in_sync && expected_packets_ != 0) { // compute residual loss rate if (packets_sent_to_app_ > expected_packets_) { // this may happen if we get packet from the prev bin that get recovered // on the current one packets_sent_to_app_ = expected_packets_; } residual_loss_rate_ = 1.0 - ((double)packets_sent_to_app_ / (double)expected_packets_); if (residual_loss_rate_ < 0.0) residual_loss_rate_ = 0.0; } lost_per_sec_ = 0; total_expected_packets_ = 0; expected_packets_ = 0; packets_sent_to_app_ = 0; rounds_from_last_compute_ = 0; } rounds_from_last_compute_++; } void RTCState::dataToBeReceived(uint32_t seq) { addToPacketCache(seq, PacketState::TO_BE_RECEIVED); } void RTCState::updateHighestSeqReceived(uint32_t seq) { if (seq > highest_seq_received_) highest_seq_received_ = seq; } void RTCState::addRecvOrLost(uint32_t seq, PacketState state) { auto it = pending_interests_.find(seq); if (it != pending_interests_.end()) { pending_interests_.erase(it); if (indexer_->isFec(seq)) pending_fec_pkt_--; } addToPacketCache(seq, state); // keep track of the last packet received/lost // without holes. if (highest_seq_received_in_order_ < last_seq_nacked_) { highest_seq_received_in_order_ = last_seq_nacked_; } if ((highest_seq_received_in_order_ + 1) == seq) { highest_seq_received_in_order_ = seq; } else if (seq <= highest_seq_received_in_order_) { // here we do nothing } else if (seq > highest_seq_received_in_order_) { // 1) there is a gap in the sequence so we do not update // highest_seq_received_in_order_ // 2) all the packets from highest_seq_received_in_order_ to seq are // received or lost or are fec packetis. In this case we increase // highest_seq_received_in_order_ until we find an hole in the sequence for (uint32_t i = highest_seq_received_in_order_ + 1; i <= seq; i++) { PacketState state = getPacketState(i); if ((state == PacketState::UNKNOWN || state == PacketState::LOST)) { if (indexer_->isFec(i)) { // this is a fec packet and we don't care to receive it // however we may need to increse the number or lost packets // XXX: in case we want to use rtx to recover fec packets, // this may prevent to detect a packet loss and no rtx will be sent if (TRANSPORT_EXPECT_TRUE(i >= first_interest_sent_seq_)) { onLossDetected(i); } } else { // this is a data packet and we need to get it break; } } // this packet is in order so we can update the // highest_seq_received_in_order_ highest_seq_received_in_order_ = i; } } } void RTCState::setInitRttTimer(uint32_t wait) { init_rtt_timer_->cancel(); init_rtt_timer_->expires_from_now(std::chrono::milliseconds(wait)); std::weak_ptr self = shared_from_this(); init_rtt_timer_->async_wait([self](const std::error_code &ec) { if (ec) return; if (auto ptr = self.lock()) { ptr->checkInitRttTimer(); } }); } void RTCState::checkInitRttTimer() { if (received_probes_ < INIT_RTT_MIN_PROBES_TO_RECV || probe_handler_->getProbeLossRate() == 1.0) { // we didn't received enough probes or they were not valid, restart received_probes_ = 0; probe_handler_->setSuffixRange(MIN_INIT_PROBE_SEQ, MAX_INIT_PROBE_SEQ); probe_handler_->setProbes(INIT_RTT_PROBE_INTERVAL, INIT_RTT_PROBES); probe_handler_->sendProbes(); setInitRttTimer(INIT_RTT_PROBE_RESTART); return; } init_rtt_ = true; main_path_->roundEnd(); loss_history_.pushBack(probe_handler_->getProbeLossRate()); probe_handler_->setSuffixRange(MIN_RTT_PROBE_SEQ, MAX_RTT_PROBE_SEQ); probe_handler_->setProbes(RTT_PROBE_INTERVAL, 0); probe_handler_->sendProbes(); // init last_seq_nacked_. skip packets that may come from the cache double prod_rate = getProducerRate(); double rtt = (double)getMinRTT() / MILLI_IN_A_SEC; double packet_size = getAveragePacketSize(); uint32_t pkt_in_rtt_ = std::floor(((prod_rate / packet_size) * rtt)); last_seq_nacked_ = last_production_seq_ + pkt_in_rtt_; discovered_rtt_callback_(); } core::ParamsRTC RTCState::getProbeParams(const core::ContentObject &probe) { uint32_t seq = probe.getName().getSuffix(); core::ParamsRTC params; switch (ProbeHandler::getProbeType(seq)) { case ProbeType::INIT: { core::ContentObjectManifest manifest( const_cast(probe).shared_from_this()); manifest.decode(); params = manifest.getParamsRTC(); break; } case ProbeType::RTT: { struct nack_packet_t *probe_pkt = (struct nack_packet_t *)probe.getPayload()->data(); params = core::ParamsRTC{ .timestamp = probe_pkt->getTimestamp(), .prod_rate = probe_pkt->getProductionRate(), .prod_seg = probe_pkt->getProductionSegment(), }; break; } default: break; } return params; } core::ParamsRTC RTCState::getDataParams(const core::ContentObject &data) { core::ParamsRTC params; switch (data.getPayloadType()) { case core::PayloadType::DATA: { struct data_packet_t *data_pkt = (struct data_packet_t *)data.getPayload()->data(); params = core::ParamsRTC{ .timestamp = data_pkt->getTimestamp(), .prod_rate = data_pkt->getProductionRate(), .prod_seg = data.getName().getSuffix(), }; break; } case core::PayloadType::MANIFEST: { core::ContentObjectManifest manifest( const_cast(data).shared_from_this()); manifest.decode(); params = manifest.getParamsRTC(); break; } default: break; } return params; } } // namespace rtc } // namespace protocol } // namespace transport