aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/protocols/rtc/rtc_state.h
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/protocols/rtc/rtc_state.h')
-rw-r--r--libtransport/src/protocols/rtc/rtc_state.h207
1 files changed, 164 insertions, 43 deletions
diff --git a/libtransport/src/protocols/rtc/rtc_state.h b/libtransport/src/protocols/rtc/rtc_state.h
index 729ba7a1b..ac3cc621f 100644
--- a/libtransport/src/protocols/rtc/rtc_state.h
+++ b/libtransport/src/protocols/rtc/rtc_state.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -14,13 +14,16 @@
*/
#pragma once
+#include <core/facade.h>
#include <hicn/transport/config.h>
#include <hicn/transport/core/asio_wrapper.h>
#include <hicn/transport/core/content_object.h>
#include <hicn/transport/core/name.h>
+#include <hicn/transport/utils/rtc_quality_score.h>
#include <protocols/indexer.h>
#include <protocols/rtc/probe_handler.h>
#include <protocols/rtc/rtc_data_path.h>
+#include <utils/max_filter.h>
#include <map>
#include <set>
@@ -31,34 +34,62 @@ namespace protocol {
namespace rtc {
-enum class PacketState : uint8_t { RECEIVED, LOST, UNKNOWN };
+// packet state
+// RECEIVED: the packet was already received
+// LOST: the packet is marked as lost but can be recovered
+// DEFINITELY_LOST: the packet is lost and cannot be recovered
+// TO_BE_RECEIVED: when a packet is received is sent to the FEC decoder. the fec
+// decoder may decide to send the packet directly to the app. to avoid
+// duplicated the packet is marked with this state
+// SKIPPED: an interest that was not sent, only for FEC packets
+// UNKNOWN: unknown state
+enum class PacketState : uint8_t {
+ RECEIVED,
+ TO_BE_RECEIVED,
+ LOST,
+ DEFINITELY_LOST,
+ SKIPPED,
+ UNKNOWN
+};
+
+class RTCState : public std::enable_shared_from_this<RTCState> {
+ using PendingInterestsMap = std::map<uint32_t, uint64_t>;
+
+ private:
+ const double MAX_CACHED_PACKETS = 8192; // XXX this value may be too small
+ // for high rate apps
-class RTCState : std::enable_shared_from_this<RTCState> {
public:
using DiscoveredRttCallback = std::function<void()>;
public:
- RTCState(Indexer *indexer,
- ProbeHandler::SendProbeCallback &&rtt_probes_callback,
+ RTCState(Indexer *indexer, ProbeHandler::SendProbeCallback &&probe_callback,
DiscoveredRttCallback &&discovered_rtt_callback,
asio::io_service &io_service);
~RTCState();
+ // initialization
+ void initParams();
+
// packet events
void onSendNewInterest(const core::Name *interest_name);
void onTimeout(uint32_t seq, bool lost);
void onLossDetected(uint32_t seq);
void onRetransmission(uint32_t seq);
+ void onPossibleLossWithNoRtx(uint32_t seq);
void onDataPacketReceived(const core::ContentObject &content_object,
bool compute_stats);
void onFecPacketReceived(const core::ContentObject &content_object);
void onNackPacketReceived(const core::ContentObject &nack,
bool compute_stats);
void onPacketLost(uint32_t seq);
- void onPacketRecoveredRtx(uint32_t seq);
- void onPacketRecoveredFec(uint32_t seq);
+ void onPacketRecoveredRtx(const core::ContentObject &content_object,
+ uint64_t rtt);
+ void onFecPacketRecoveredRtx(const core::ContentObject &content_object);
+ void onPacketRecoveredFec(uint32_t seq, uint32_t size);
bool onProbePacketReceived(const core::ContentObject &probe);
+ void onJumpForward(uint32_t next_seq);
// protocol state
void onNewRound(double round_len, bool in_sync);
@@ -72,10 +103,26 @@ class RTCState : std::enable_shared_from_this<RTCState> {
// delay metrics
bool isRttDiscovered() const { return init_rtt_; }
- uint64_t getRTT() const {
+ uint64_t getMinRTT() const {
if (mainPathIsValid()) return main_path_->getMinRtt();
return 0;
}
+
+ uint64_t getAvgRTT() const {
+ if (mainPathIsValid()) return main_path_->getAvgRtt();
+ return 0;
+ }
+
+ uint64_t getMaxRTT() const {
+ if (mainPathIsValid()) return main_path_->getMaxRtt();
+ return 0;
+ }
+
+ uint64_t getEdgeRtt() const {
+ if (edge_path_ != nullptr) return edge_path_->getMinRtt();
+ return 0;
+ }
+
void resetRttStats() {
if (mainPathIsValid()) main_path_->clearRtt();
}
@@ -98,6 +145,7 @@ class RTCState : std::enable_shared_from_this<RTCState> {
uint64_t getInterestSentTime(uint32_t seq) {
auto it = pending_interests_.find(seq);
if (it != pending_interests_.end()) return it->second;
+
return 0;
}
@@ -107,19 +155,24 @@ class RTCState : std::enable_shared_from_this<RTCState> {
}
uint32_t getPendingInterestNumber() const {
- return pending_interests_.size();
+ return (uint32_t)pending_interests_.size();
}
- PacketState isReceivedOrLost(uint32_t seq) {
- auto it = received_or_lost_packets_.find(seq);
- if (it != received_or_lost_packets_.end()) return it->second;
+ PacketState getPacketState(uint32_t seq) {
+ auto it = packet_cache_.find(seq);
+ if (it != packet_cache_.end()) return it->second;
return PacketState::UNKNOWN;
}
// loss rate
- double getLossRate() const { return loss_rate_; }
+ double getPerRoundLossRate() const { return loss_rate_; }
+ double getPerSecondLossRate() const { return per_sec_loss_rate_; }
double getAvgLossRate() const { return avg_loss_rate_; }
- double getMaxLossRate() const { return max_loss_rate_; }
+ double getMaxLossRate() const {
+ if (loss_history_.size() != 0) return loss_history_.begin();
+ return 0;
+ }
+
double getLastRoundLossRate() const { return last_round_loss_rate_; }
double getResidualLossRate() const { return residual_loss_rate_; }
@@ -140,9 +193,14 @@ class RTCState : std::enable_shared_from_this<RTCState> {
// generic stats
uint32_t getReceivedBytesInRound() const { return received_bytes_; }
+ uint32_t getReceivedFecBytesInRound() const { return received_fec_bytes_; }
+ uint32_t getRecoveredFecBytesInRound() const {
+ return recovered_bytes_with_fec_;
+ }
uint32_t getReceivedNacksInRound() const {
return received_nacks_last_round_;
}
+ uint32_t getReceivedDataInRound() const { return received_data_last_round_; }
uint32_t getSentInterestInRound() const { return sent_interests_last_round_; }
uint32_t getSentRtxInRound() const { return sent_rtx_last_round_; }
@@ -150,6 +208,9 @@ class RTCState : std::enable_shared_from_this<RTCState> {
double getAvailableBw() const { return 0.0; }; // TODO
double getProducerRate() const { return production_rate_; }
double getReceivedRate() const { return received_rate_; }
+ double getReceivedFecRate() const { return fec_received_rate_; }
+ double getRecoveredFecRate() const { return fec_recovered_rate_; }
+
double getAveragePacketSize() const { return avg_packet_size_; }
// nacks
@@ -160,24 +221,57 @@ class RTCState : std::enable_shared_from_this<RTCState> {
bool isProducerActive() const { return producer_is_active_; }
// packets from cache
- double getPacketFromCacheRatio() const { return data_from_cache_rate_; }
+ // this should be called at the end of a round beacuse otherwise we may have
+ // not enough packets to get a good stat
+ double getPacketFromCacheRatio() const {
+ if (received_data_last_round_ == 0) return 0;
+ return (double)received_data_from_cache_ /
+ (double)received_data_last_round_;
+ }
- std::map<uint32_t, uint64_t>::iterator getPendingInterestsMapBegin() {
+ PendingInterestsMap::iterator getPendingInterestsMapBegin() {
return pending_interests_.begin();
}
- std::map<uint32_t, uint64_t>::iterator getPendingInterestsMapEnd() {
+ PendingInterestsMap::iterator getPendingInterestsMapEnd() {
return pending_interests_.end();
}
+ // quality
+ uint8_t getQualityScore() {
+ uint8_t qs = quality_score_.getQualityScore(
+ getMaxRTT(), std::round(getResidualLossRate() * 100));
+ return qs;
+ }
+
+ // We received a data pkt that will be set to RECEIVED, but first we have to
+ // go through FEC. We do not want to consider this pkt as recovered, thus we
+ // set it as TO_BE_RECEIVED.
+ void dataToBeReceived(uint32_t seq);
+
+ void updateHighestSeqReceived(uint32_t seq);
+
+ // Extract RTC parameters from probes (init or RTT probes) and data packets.
+ static core::ParamsRTC getProbeParams(const core::ContentObject &probe);
+ static core::ParamsRTC getDataParams(const core::ContentObject &data);
+
private:
- void initParams();
+ void addToPacketCache(uint32_t seq, PacketState state) {
+ // this function adds or updates the current state
+ if (packet_cache_.size() >= MAX_CACHED_PACKETS) {
+ packet_cache_.erase(packet_cache_.begin());
+ }
+ packet_cache_[seq] = state;
+ }
+
+ void eraseFromPacketCache(uint32_t seq) { packet_cache_.erase(seq); }
// update stats
void updateState();
- void updateReceivedBytes(const core::ContentObject &content_object);
+ void updateReceivedBytes(const core::ContentObject &content_object,
+ bool isFec);
void updatePacketSize(const core::ContentObject &content_object);
void updatePathStats(const core::ContentObject &content_object, bool is_nack);
- void updateLossRate();
+ void updateLossRate(bool in_sycn);
void addRecvOrLost(uint32_t seq, PacketState state);
@@ -209,36 +303,51 @@ class RTCState : std::enable_shared_from_this<RTCState> {
uint32_t last_seq_nacked_; // segment for which we got an oldNack
double loss_rate_;
double avg_loss_rate_;
- double max_loss_rate_;
double last_round_loss_rate_;
+ utils::MaxFilter<double> loss_history_;
+
+ // per second loss rate
+ uint32_t lost_per_sec_;
+ uint32_t total_expected_packets_;
+ double per_sec_loss_rate_;
+
+ // conunters for residual losses
+ // residual losses are computed every second and are used
+ // as feedback to the upper levels (e.g application)
+ uint32_t expected_packets_;
+ uint32_t packets_sent_to_app_;
+ uint32_t rounds_from_last_compute_;
double residual_loss_rate_;
// bw counters
uint32_t received_bytes_;
+ uint32_t received_fec_bytes_;
+ uint32_t recovered_bytes_with_fec_;
double avg_packet_size_;
- double production_rate_; // rate communicated by the producer using nacks
- double received_rate_; // rate recevied by the consumer
-
- // nack counter
- // the bool takes tracks only about the valid nacks (no rtx) and it is used to
- // switch between the states. Instead received_nacks_last_round_ logs all the
- // nacks for statistics
- bool nack_on_last_round_;
+ double production_rate_; // rate communicated by the producer using nacks
+ double received_rate_; // rate recevied by the consumer (only data)
+ double fec_received_rate_; // fec rate recevied by the consumer
+ double fec_recovered_rate_; // rate recovered using fec
+
+ // nack counters
+ // the bool takes tracks only about the valid past nacks (no rtx) and it is
+ // used to switch between the states. Instead received_nacks_last_round_ logs
+ // all the nacks for statistics
+ bool past_nack_on_last_round_;
uint32_t received_nacks_last_round_;
- // packets counter
+ // packets counters
uint32_t received_packets_last_round_;
uint32_t received_data_last_round_;
uint32_t received_data_from_cache_;
- double data_from_cache_rate_;
uint32_t sent_interests_last_round_;
uint32_t sent_rtx_last_round_;
- // fec counter
+ // fec counters
uint32_t received_fec_pkt_;
uint32_t pending_fec_pkt_;
- // round conunters
+ // round counters
uint32_t rounds_;
uint32_t rounds_without_nacks_;
uint32_t rounds_without_packets_;
@@ -250,34 +359,46 @@ class RTCState : std::enable_shared_from_this<RTCState> {
// producer state
bool
producer_is_active_; // the prodcuer is active if we receive some packets
- uint32_t
- last_production_seq_; // last production seq received by the producer
- uint64_t last_prod_update_; // timestamp of the last packets used to update
- // stats from the producer
+ uint32_t last_production_seq_; // last production seq received by the
+ // producer used to init the sync protcol
+ uint32_t last_prod_update_seq_; // seq number of the last packet used to
+ // update the update from the producer.
+ // assumption: the highest seq number carries
+ // the most up to date info. in case of
+ // probes we look at the produced seq number
// paths stats
std::unordered_map<uint32_t, std::shared_ptr<RTCDataPath>> path_table_;
- std::shared_ptr<RTCDataPath> main_path_;
+ std::shared_ptr<RTCDataPath> main_path_; // this is the path that connects
+ // the consumer to the producer. in
+ // case of multipath the trasnport
+ // uses the most active path
+ std::shared_ptr<RTCDataPath> edge_path_; // path to the closest cache if it
+ // exists
// packet received
// cache where to store info about the last MAX_CACHED_PACKETS
- std::map<uint32_t, PacketState> received_or_lost_packets_;
+ // these are packets that are received or lost or definitely lost and are not
+ // anymore in the pending intetest list
+ std::map<uint32_t, PacketState> packet_cache_;
// pending interests
- std::map<uint32_t, uint64_t> pending_interests_;
+ PendingInterestsMap pending_interests_;
// indexer
Indexer *indexer_;
- // skipped interests
+ // used to keep track of the skipped interests
uint32_t last_interest_sent_;
- std::unordered_set<uint32_t> skipped_interests_;
// probes
- std::shared_ptr<ProbeHandler> rtt_probes_;
+ std::shared_ptr<ProbeHandler> probe_handler_;
bool init_rtt_;
std::unique_ptr<asio::steady_timer> init_rtt_timer_;
+ // quality score
+ RTCQualityScore quality_score_;
+
// callbacks
DiscoveredRttCallback discovered_rtt_callback_;
};