diff options
Diffstat (limited to 'libtransport/src/protocols/rtc/rtc_state.h')
-rw-r--r-- | libtransport/src/protocols/rtc/rtc_state.h | 150 |
1 files changed, 124 insertions, 26 deletions
diff --git a/libtransport/src/protocols/rtc/rtc_state.h b/libtransport/src/protocols/rtc/rtc_state.h index 729ba7a1b..8bf48ccc2 100644 --- a/libtransport/src/protocols/rtc/rtc_state.h +++ b/libtransport/src/protocols/rtc/rtc_state.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2021 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -14,13 +14,16 @@ */ #pragma once +#include <core/facade.h> #include <hicn/transport/config.h> #include <hicn/transport/core/asio_wrapper.h> #include <hicn/transport/core/content_object.h> #include <hicn/transport/core/name.h> +#include <hicn/transport/utils/rtc_quality_score.h> #include <protocols/indexer.h> #include <protocols/rtc/probe_handler.h> #include <protocols/rtc/rtc_data_path.h> +#include <utils/max_filter.h> #include <map> #include <set> @@ -31,25 +34,50 @@ namespace protocol { namespace rtc { -enum class PacketState : uint8_t { RECEIVED, LOST, UNKNOWN }; +// packet state +// RECEIVED: the packet was already received +// LOST: the packet is marked as lost but can be recovered +// DEFINITELY_LOST: the packet is lost and cannot be recovered +// TO_BE_RECEIVED: when a packet is received is sent to the FEC decoder. the fec +// decoder may decide to send the packet directly to the app. to avoid +// duplicated the packet is marked with this state +// SKIPPED: an interest that was not sent, only for FEC packets +// UNKNOWN: unknown state +enum class PacketState : uint8_t { + RECEIVED, + TO_BE_RECEIVED, + LOST, + DEFINITELY_LOST, + SKIPPED, + UNKNOWN +}; + +class RTCState : public std::enable_shared_from_this<RTCState> { + using PendingInterestsMap = std::map<uint32_t, uint64_t>; + + private: + const double MAX_CACHED_PACKETS = 8192; // XXX this value may be too small + // for high rate apps -class RTCState : std::enable_shared_from_this<RTCState> { public: using DiscoveredRttCallback = std::function<void()>; public: - RTCState(Indexer *indexer, - ProbeHandler::SendProbeCallback &&rtt_probes_callback, + RTCState(Indexer *indexer, ProbeHandler::SendProbeCallback &&probe_callback, DiscoveredRttCallback &&discovered_rtt_callback, asio::io_service &io_service); ~RTCState(); + // initialization + void initParams(); + // packet events void onSendNewInterest(const core::Name *interest_name); void onTimeout(uint32_t seq, bool lost); void onLossDetected(uint32_t seq); void onRetransmission(uint32_t seq); + void onPossibleLossWithNoRtx(uint32_t seq); void onDataPacketReceived(const core::ContentObject &content_object, bool compute_stats); void onFecPacketReceived(const core::ContentObject &content_object); @@ -57,8 +85,10 @@ class RTCState : std::enable_shared_from_this<RTCState> { bool compute_stats); void onPacketLost(uint32_t seq); void onPacketRecoveredRtx(uint32_t seq); - void onPacketRecoveredFec(uint32_t seq); + void onFecPacketRecoveredRtx(uint32_t seq); + void onPacketRecoveredFec(uint32_t seq, uint32_t size); bool onProbePacketReceived(const core::ContentObject &probe); + void onJumpForward(uint32_t next_seq); // protocol state void onNewRound(double round_len, bool in_sync); @@ -72,10 +102,21 @@ class RTCState : std::enable_shared_from_this<RTCState> { // delay metrics bool isRttDiscovered() const { return init_rtt_; } - uint64_t getRTT() const { + uint64_t getMinRTT() const { if (mainPathIsValid()) return main_path_->getMinRtt(); return 0; } + + uint64_t getAvgRTT() const { + if (mainPathIsValid()) return main_path_->getAvgRtt(); + return 0; + } + + uint64_t getMaxRTT() const { + if (mainPathIsValid()) return main_path_->getMaxRtt(); + return 0; + } + void resetRttStats() { if (mainPathIsValid()) main_path_->clearRtt(); } @@ -98,6 +139,7 @@ class RTCState : std::enable_shared_from_this<RTCState> { uint64_t getInterestSentTime(uint32_t seq) { auto it = pending_interests_.find(seq); if (it != pending_interests_.end()) return it->second; + return 0; } @@ -110,14 +152,15 @@ class RTCState : std::enable_shared_from_this<RTCState> { return pending_interests_.size(); } - PacketState isReceivedOrLost(uint32_t seq) { - auto it = received_or_lost_packets_.find(seq); - if (it != received_or_lost_packets_.end()) return it->second; + PacketState getPacketState(uint32_t seq) { + auto it = packet_cache_.find(seq); + if (it != packet_cache_.end()) return it->second; return PacketState::UNKNOWN; } // loss rate - double getLossRate() const { return loss_rate_; } + double getPerRoundLossRate() const { return loss_rate_; } + double getPerSecondLossRate() const { return per_sec_loss_rate_; } double getAvgLossRate() const { return avg_loss_rate_; } double getMaxLossRate() const { return max_loss_rate_; } double getLastRoundLossRate() const { return last_round_loss_rate_; } @@ -134,15 +177,22 @@ class RTCState : std::enable_shared_from_this<RTCState> { return highest_seq_received_in_order_; } + double getMaxLoss(); + // fec packets uint32_t getReceivedFecPackets() const { return received_fec_pkt_; } uint32_t getPendingFecPackets() const { return pending_fec_pkt_; } // generic stats uint32_t getReceivedBytesInRound() const { return received_bytes_; } + uint32_t getReceivedFecBytesInRound() const { return received_fec_bytes_; } + uint32_t getRecoveredFecBytesInRound() const { + return recovered_bytes_with_fec_; + } uint32_t getReceivedNacksInRound() const { return received_nacks_last_round_; } + uint32_t getReceivedDataInRound() const { return received_data_last_round_; } uint32_t getSentInterestInRound() const { return sent_interests_last_round_; } uint32_t getSentRtxInRound() const { return sent_rtx_last_round_; } @@ -150,6 +200,9 @@ class RTCState : std::enable_shared_from_this<RTCState> { double getAvailableBw() const { return 0.0; }; // TODO double getProducerRate() const { return production_rate_; } double getReceivedRate() const { return received_rate_; } + double getReceivedFecRate() const { return fec_received_rate_; } + double getRecoveredFecRate() const { return fec_recovered_rate_; } + double getAveragePacketSize() const { return avg_packet_size_; } // nacks @@ -162,22 +215,46 @@ class RTCState : std::enable_shared_from_this<RTCState> { // packets from cache double getPacketFromCacheRatio() const { return data_from_cache_rate_; } - std::map<uint32_t, uint64_t>::iterator getPendingInterestsMapBegin() { + PendingInterestsMap::iterator getPendingInterestsMapBegin() { return pending_interests_.begin(); } - std::map<uint32_t, uint64_t>::iterator getPendingInterestsMapEnd() { + PendingInterestsMap::iterator getPendingInterestsMapEnd() { return pending_interests_.end(); } + // quality + uint8_t getQualityScore() { + uint8_t qs = quality_score_.getQualityScore( + getMaxRTT(), std::round(getResidualLossRate() * 100)); + return qs; + } + + // We received a data pkt that will be set to RECEIVED, but first we have to + // go through FEC. We do not want to consider this pkt as recovered, thus we + // set it as TO_BE_RECEIVED. + void dataToBeReceived(uint32_t seq); + + // Extract RTC parameters from probes (init or RTT probes) and data packets. + static core::ParamsRTC getProbeParams(const core::ContentObject &probe); + static core::ParamsRTC getDataParams(const core::ContentObject &data); + private: - void initParams(); + void addToPacketCache(uint32_t seq, PacketState state) { + // this function adds or updates the current state + if (packet_cache_.size() >= MAX_CACHED_PACKETS) { + packet_cache_.erase(packet_cache_.begin()); + } + packet_cache_[seq] = state; + } + + void eraseFromPacketCache(uint32_t seq) { packet_cache_.erase(seq); } // update stats void updateState(); void updateReceivedBytes(const core::ContentObject &content_object); void updatePacketSize(const core::ContentObject &content_object); void updatePathStats(const core::ContentObject &content_object, bool is_nack); - void updateLossRate(); + void updateLossRate(bool in_sycn); void addRecvOrLost(uint32_t seq, PacketState state); @@ -211,22 +288,39 @@ class RTCState : std::enable_shared_from_this<RTCState> { double avg_loss_rate_; double max_loss_rate_; double last_round_loss_rate_; + utils::MaxFilter<double> loss_history_; + + // per second loss rate + uint32_t lost_per_sec_; + uint32_t total_expected_packets_; + double per_sec_loss_rate_; + + // conunters for residual losses + // residual losses are computed every second and are used + // as feedback to the upper levels (e.g application) + uint32_t expected_packets_; + uint32_t packets_sent_to_app_; + uint32_t rounds_from_last_compute_; double residual_loss_rate_; // bw counters uint32_t received_bytes_; + uint32_t received_fec_bytes_; + uint32_t recovered_bytes_with_fec_; double avg_packet_size_; - double production_rate_; // rate communicated by the producer using nacks - double received_rate_; // rate recevied by the consumer + double production_rate_; // rate communicated by the producer using nacks + double received_rate_; // rate recevied by the consumer (only data) + double fec_received_rate_; // fec rate recevied by the consumer + double fec_recovered_rate_; // rate recovered using fec - // nack counter + // nack counters // the bool takes tracks only about the valid nacks (no rtx) and it is used to // switch between the states. Instead received_nacks_last_round_ logs all the // nacks for statistics bool nack_on_last_round_; uint32_t received_nacks_last_round_; - // packets counter + // packets counters uint32_t received_packets_last_round_; uint32_t received_data_last_round_; uint32_t received_data_from_cache_; @@ -234,11 +328,11 @@ class RTCState : std::enable_shared_from_this<RTCState> { uint32_t sent_interests_last_round_; uint32_t sent_rtx_last_round_; - // fec counter + // fec counters uint32_t received_fec_pkt_; uint32_t pending_fec_pkt_; - // round conunters + // round counters uint32_t rounds_; uint32_t rounds_without_nacks_; uint32_t rounds_without_packets_; @@ -261,23 +355,27 @@ class RTCState : std::enable_shared_from_this<RTCState> { // packet received // cache where to store info about the last MAX_CACHED_PACKETS - std::map<uint32_t, PacketState> received_or_lost_packets_; + // these are packets that are received or lost or definitely lost and are not + // anymore in the pending intetest list + std::map<uint32_t, PacketState> packet_cache_; // pending interests - std::map<uint32_t, uint64_t> pending_interests_; + PendingInterestsMap pending_interests_; // indexer Indexer *indexer_; - // skipped interests + // used to keep track of the skipped interests uint32_t last_interest_sent_; - std::unordered_set<uint32_t> skipped_interests_; // probes - std::shared_ptr<ProbeHandler> rtt_probes_; + std::shared_ptr<ProbeHandler> probe_handler_; bool init_rtt_; std::unique_ptr<asio::steady_timer> init_rtt_timer_; + // quality score + RTCQualityScore quality_score_; + // callbacks DiscoveredRttCallback discovered_rtt_callback_; }; |