aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/protocols/rtc
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/protocols/rtc')
-rw-r--r--libtransport/src/protocols/rtc/probe_handler.cc3
-rw-r--r--libtransport/src/protocols/rtc/rtc.cc125
-rw-r--r--libtransport/src/protocols/rtc/rtc.h3
-rw-r--r--libtransport/src/protocols/rtc/rtc_consts.h76
-rw-r--r--libtransport/src/protocols/rtc/rtc_data_path.cc13
-rw-r--r--libtransport/src/protocols/rtc/rtc_data_path.h5
-rw-r--r--libtransport/src/protocols/rtc/rtc_forwarding_strategy.cc106
-rw-r--r--libtransport/src/protocols/rtc/rtc_forwarding_strategy.h6
-rw-r--r--libtransport/src/protocols/rtc/rtc_ldr.cc73
-rw-r--r--libtransport/src/protocols/rtc/rtc_ldr.h4
-rw-r--r--libtransport/src/protocols/rtc/rtc_packet.h64
-rw-r--r--libtransport/src/protocols/rtc/rtc_reassembly.cc2
-rw-r--r--libtransport/src/protocols/rtc/rtc_recovery_strategy.cc197
-rw-r--r--libtransport/src/protocols/rtc/rtc_recovery_strategy.h35
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_delay.cc17
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_delay.h1
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_fec_only.cc3
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_fec_only.h1
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_low_rate.cc4
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_low_rate.h1
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_recovery_off.cc3
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_recovery_off.h1
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_rtx_only.cc3
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_rtx_only.h1
-rw-r--r--libtransport/src/protocols/rtc/rtc_state.cc80
-rw-r--r--libtransport/src/protocols/rtc/rtc_state.h24
-rw-r--r--libtransport/src/protocols/rtc/rtc_verifier.cc157
-rw-r--r--libtransport/src/protocols/rtc/rtc_verifier.h81
28 files changed, 602 insertions, 487 deletions
diff --git a/libtransport/src/protocols/rtc/probe_handler.cc b/libtransport/src/protocols/rtc/probe_handler.cc
index 6a84914ab..60eceeb19 100644
--- a/libtransport/src/protocols/rtc/probe_handler.cc
+++ b/libtransport/src/protocols/rtc/probe_handler.cc
@@ -13,6 +13,7 @@
* limitations under the License.
*/
+#include <glog/logging.h>
#include <hicn/transport/utils/chrono_typedefs.h>
#include <protocols/rtc/probe_handler.h>
#include <protocols/rtc/rtc_consts.h>
@@ -64,7 +65,7 @@ double ProbeHandler::getProbeLossRate() {
}
void ProbeHandler::setSuffixRange(uint32_t min, uint32_t max) {
- assert(min <= max && min >= MIN_PROBE_SEQ);
+ DCHECK(min <= max && min >= MIN_PROBE_SEQ);
distr_ = std::uniform_int_distribution<uint32_t>(min, max);
}
diff --git a/libtransport/src/protocols/rtc/rtc.cc b/libtransport/src/protocols/rtc/rtc.cc
index d2682edfa..9a56269f3 100644
--- a/libtransport/src/protocols/rtc/rtc.cc
+++ b/libtransport/src/protocols/rtc/rtc.cc
@@ -38,6 +38,7 @@ RTCTransportProtocol::RTCTransportProtocol(
implementation::ConsumerSocket *icn_socket)
: TransportProtocol(icn_socket, new RtcIndexer<>(icn_socket, this),
new RtcReassembly(icn_socket, this)),
+ max_aggregated_interest_(1),
number_(0) {
icn_socket->getSocketOption(PORTAL, portal_);
round_timer_ =
@@ -55,9 +56,9 @@ void RTCTransportProtocol::resume() {
TransportProtocol::resume();
}
-std::size_t RTCTransportProtocol::transportHeaderLength() {
+std::size_t RTCTransportProtocol::transportHeaderLength(bool isFEC) {
return DATA_HEADER_SIZE +
- (fec_decoder_ != nullptr ? fec_decoder_->getFecHeaderSize() : 0);
+ (fec_decoder_ != nullptr ? fec_decoder_->getFecHeaderSize(isFEC) : 0);
}
// private
@@ -75,13 +76,13 @@ void RTCTransportProtocol::initParams() {
std::shared_ptr<auth::Verifier> verifier;
socket_->getSocketOption(GeneralTransportOptions::VERIFIER, verifier);
- uint32_t unverified_interval;
- socket_->getSocketOption(GeneralTransportOptions::UNVERIFIED_INTERVAL,
- unverified_interval);
+ uint32_t factor_relevant;
+ socket_->getSocketOption(GeneralTransportOptions::MANIFEST_FACTOR_RELEVANT,
+ factor_relevant);
- double unverified_ratio;
- socket_->getSocketOption(GeneralTransportOptions::UNVERIFIED_RATIO,
- unverified_ratio);
+ uint32_t factor_alert;
+ socket_->getSocketOption(GeneralTransportOptions::MANIFEST_FACTOR_ALERT,
+ factor_alert);
rc_ = std::make_shared<RTCRateControlCongestionDetection>();
ldr_ = std::make_shared<RTCLossDetectionAndRecovery>(
@@ -100,8 +101,8 @@ void RTCTransportProtocol::initParams() {
}
});
- verifier_ = std::make_shared<RTCVerifier>(verifier, unverified_interval,
- unverified_ratio);
+ verifier_ =
+ std::make_shared<RTCVerifier>(verifier, factor_relevant, factor_alert);
state_ = std::make_shared<RTCState>(
indexer_verifier_.get(),
@@ -138,19 +139,20 @@ void RTCTransportProtocol::initParams() {
last_interest_sent_time_ = 0;
last_interest_sent_seq_ = 0;
-#if 0
- if(portal_->isConnectedToFwd()){
- max_aggregated_interest_ = 1;
- }else{
- max_aggregated_interest_ = MAX_INTERESTS_IN_BATCH;
- }
-#else
- max_aggregated_interest_ = 1;
- if (const char *max_aggr = std::getenv("MAX_AGGREGATED_INTERESTS")) {
- LOG(INFO) << "Max Aggregated: " << max_aggr;
- max_aggregated_interest_ = std::stoul(std::string(max_aggr));
+ // Aggregated interests setup
+ bool aggregated_interests_on;
+ socket_->getSocketOption(RtcTransportOptions::AGGREGATED_INTERESTS,
+ aggregated_interests_on);
+ if (aggregated_interests_on) {
+ if (const char *max_aggr = std::getenv("MAX_AGGREGATED_INTERESTS"))
+ max_aggregated_interest_ = (uint32_t)std::stoul(std::string(max_aggr));
+ else
+ max_aggregated_interest_ = MAX_INTERESTS_IN_BATCH;
+
+ max_aggregated_interest_ = std::min<uint32_t>(max_aggregated_interest_,
+ 1 + MAX_SUFFIXES_IN_MANIFEST);
}
-#endif
+ LOG(INFO) << "Max Aggregated: " << max_aggregated_interest_;
max_sent_int_ =
std::ceil((double)MAX_PACING_BATCH / (double)max_aggregated_interest_);
@@ -263,6 +265,11 @@ void RTCTransportProtocol::discoveredRtt() {
socket_->getSocketOption(RtcTransportOptions::RECOVERY_STRATEGY, strategy);
ldr_->changeRecoveryStrategy(
(interface::RtcTransportRecoveryStrategies)strategy);
+
+ bool content_sharing_mode;
+ socket_->getSocketOption(RtcTransportOptions::CONTENT_SHARING_MODE,
+ content_sharing_mode);
+ if (content_sharing_mode) ldr_->setContentSharingMode();
ldr_->turnOnRecovery();
ldr_->onNewRound(false);
@@ -270,22 +277,9 @@ void RTCTransportProtocol::discoveredRtt() {
Name *name = nullptr;
socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name);
Prefix prefix(*name, 128);
- if ((interface::RtcTransportRecoveryStrategies)strategy ==
- interface::RtcTransportRecoveryStrategies::LOW_RATE_AND_BESTPATH) {
- fwd_strategy_.initFwdStrategy(portal_, prefix, state_.get(),
- RTCForwardingStrategy::BEST_PATH);
- } else if ((interface::RtcTransportRecoveryStrategies)strategy ==
- interface::RtcTransportRecoveryStrategies::
- LOW_RATE_AND_REPLICATION) {
- fwd_strategy_.initFwdStrategy(portal_, prefix, state_.get(),
- RTCForwardingStrategy::REPLICATION);
- } else if ((interface::RtcTransportRecoveryStrategies)strategy ==
- interface::RtcTransportRecoveryStrategies::
- LOW_RATE_AND_ALL_FWD_STRATEGIES) {
- fwd_strategy_.initFwdStrategy(portal_, prefix, state_.get(),
- RTCForwardingStrategy::BOTH);
- }
-
+ fwd_strategy_.initFwdStrategy(
+ portal_, prefix, state_.get(),
+ (interface::RtcTransportRecoveryStrategies)strategy);
updateSyncWindow();
}
@@ -302,6 +296,12 @@ void RTCTransportProtocol::computeMaxSyncWindow() {
return;
}
+ bool content_sharing_mode;
+ socket_->getSocketOption(RtcTransportOptions::CONTENT_SHARING_MODE,
+ content_sharing_mode);
+ if (content_sharing_mode && (production_rate < MIN_PROD_RATE_SHARING_MODE))
+ production_rate = MIN_PROD_RATE_SHARING_MODE;
+
production_rate += (production_rate * indexer_verifier_->getMaxFecOverhead());
uint32_t lifetime = default_values::interest_lifetime;
@@ -330,6 +330,11 @@ void RTCTransportProtocol::updateSyncWindow() {
double prod_rate = state_->getProducerRate();
double rtt = (double)state_->getMinRTT() / MILLI_IN_A_SEC;
double packet_size = state_->getAveragePacketSize();
+ bool content_sharing_mode;
+ socket_->getSocketOption(RtcTransportOptions::CONTENT_SHARING_MODE,
+ content_sharing_mode);
+ if (content_sharing_mode && (prod_rate < MIN_PROD_RATE_SHARING_MODE))
+ prod_rate = MIN_PROD_RATE_SHARING_MODE;
// if some of the info are not available do not update the current win
if (prod_rate != 0.0 && rtt != 0.0 && packet_size != 0.0) {
@@ -385,6 +390,19 @@ void RTCTransportProtocol::sendProbeInterest(uint32_t seq) {
sendInterest(*interest_name);
}
+void RTCTransportProtocol::sendInterestForTimeout(uint32_t seq) {
+ if (!isRunning() && !is_first_) return;
+
+ Name *interest_name = nullptr;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
+ &interest_name);
+
+ // we got a timeout for this packet so it is not pending anymore
+ interest_name->setSuffix(seq);
+ state_->onSendNewInterest(interest_name);
+ sendInterest(*interest_name);
+}
+
void RTCTransportProtocol::scheduleNextInterests() {
DLOG_IF(INFO, VLOG_IS_ON(3)) << "Schedule next interests";
@@ -475,9 +493,9 @@ void RTCTransportProtocol::scheduleNextInterests() {
}
// skip received packets
- if (indexer_verifier_->checkNextSuffix() <=
- state_->getHighestSeqReceivedInOrder()) {
- indexer_verifier_->jumpToIndex(state_->getHighestSeqReceivedInOrder() + 1);
+ uint32_t max_received = state_->getHighestSeqReceivedInOrder();
+ if (indexer_verifier_->checkNextSuffix() <= max_received) {
+ indexer_verifier_->jumpToIndex(max_received + 1);
}
uint32_t sent_interests = 0;
@@ -495,7 +513,6 @@ void RTCTransportProtocol::scheduleNextInterests() {
<< "In while loop. Window size: " << current_sync_win_;
uint32_t next_seg = indexer_verifier_->getNextSuffix();
-
name->setSuffix(next_seg);
// send the packet only if:
@@ -586,7 +603,6 @@ void RTCTransportProtocol::onInterestTimeout(Interest::Ptr &interest,
}
timeouts_or_nacks_.insert(segment_number);
-
if (TRANSPORT_EXPECT_TRUE(state_->isProducerActive()) &&
segment_number <= state_->getHighestSeqReceived()) {
// we retransmit packets only if the producer is active, otherwise we
@@ -627,11 +643,11 @@ void RTCTransportProtocol::onInterestTimeout(Interest::Ptr &interest,
<< "On timeout next seg = " << indexer_verifier_->checkNextSuffix()
<< ", jump to " << segment_number;
// add an extra space in the window
- current_sync_win_++;
indexer_verifier_->jumpToIndex(segment_number);
}
state_->onTimeout(segment_number, false);
+ sendInterestForTimeout(segment_number);
scheduleNextInterests();
}
@@ -672,7 +688,6 @@ void RTCTransportProtocol::onNack(const ContentObject &content_object) {
if (tn_it != timeouts_or_nacks_.end()) timeouts_or_nacks_.erase(tn_it);
state_->onJumpForward(production_seg);
- verifier_->onJumpForward(production_seg);
// the client is asking for content in the past
// switch to catch up state and increase the window
// this is true only if the packet is not an RTX
@@ -821,7 +836,8 @@ void RTCTransportProtocol::onContentObjectReceived(
// Check if the packet is a retransmission
if (ldr_->isRtx(segment_number) && state != PacketState::RECEIVED) {
if (is_data || is_manifest) {
- state_->onPacketRecoveredRtx(segment_number);
+ uint64_t rtt = ldr_->getRtxRtt(segment_number);
+ state_->onPacketRecoveredRtx(content_object, rtt);
if (*on_content_object_input_) {
(*on_content_object_input_)(*socket_->getInterface(), content_object);
@@ -842,7 +858,7 @@ void RTCTransportProtocol::onContentObjectReceived(
}
if (is_fec) {
- state_->onFecPacketRecoveredRtx(segment_number);
+ state_->onFecPacketRecoveredRtx(content_object);
}
}
@@ -920,7 +936,7 @@ void RTCTransportProtocol::sendStatsToApp(
stats_->updateAverageWindowSize(state_->getPendingInterestNumber());
stats_->updateLossRatio(state_->getPerSecondLossRate());
uint64_t rtt = state_->getAvgRTT();
- stats_->updateAverageRtt(utils::SteadyTime::Milliseconds(rtt));
+ stats_->updateAverageRtt(utils::SteadyTime::Microseconds(rtt * 1000));
stats_->updateQueuingDelay(state_->getQueuing());
stats_->updateLostData(lost_data);
@@ -960,9 +976,10 @@ void RTCTransportProtocol::decodePacket(ContentObject &content_object,
DLOG_IF(INFO, VLOG_IS_ON(4))
<< "Send packet " << content_object.getName() << " to FEC decoder";
- uint32_t offset = is_manifest
- ? content_object.headerSize()
- : content_object.headerSize() + rtc::DATA_HEADER_SIZE;
+ uint32_t offset =
+ is_manifest
+ ? (uint32_t)content_object.headerSize()
+ : (uint32_t)(content_object.headerSize() + rtc::DATA_HEADER_SIZE);
uint32_t metadata = static_cast<uint32_t>(content_object.getPayloadType());
fec_decoder_->onDataPacket(content_object, offset, metadata);
@@ -1016,7 +1033,7 @@ void RTCTransportProtocol::onFecPackets(fec::BufferArray &packets) {
processManifest(*interest, *content_object);
}
- state_->onPacketRecoveredFec(seq_number, buffer->length());
+ state_->onPacketRecoveredFec(seq_number, (uint32_t)buffer->length());
ldr_->onPacketRecoveredFec(seq_number);
if (payload_type == PayloadType::DATA) {
@@ -1038,11 +1055,11 @@ void RTCTransportProtocol::processManifest(Interest &interest,
ContentObject::Ptr RTCTransportProtocol::removeFecHeader(
const ContentObject &content_object) {
- if (!fec_decoder_ || !fec_decoder_->getFecHeaderSize()) {
+ if (!fec_decoder_ || !fec_decoder_->getFecHeaderSize(false)) {
return nullptr;
}
- size_t fec_header_size = fec_decoder_->getFecHeaderSize();
+ size_t fec_header_size = fec_decoder_->getFecHeaderSize(false);
const uint8_t *payload =
content_object.data() + content_object.headerSize() + fec_header_size;
size_t payload_size = content_object.payloadSize() - fec_header_size;
diff --git a/libtransport/src/protocols/rtc/rtc.h b/libtransport/src/protocols/rtc/rtc.h
index 3763f33c7..a8a474216 100644
--- a/libtransport/src/protocols/rtc/rtc.h
+++ b/libtransport/src/protocols/rtc/rtc.h
@@ -44,7 +44,7 @@ class RTCTransportProtocol : public TransportProtocol {
void resume() override;
- std::size_t transportHeaderLength() override;
+ std::size_t transportHeaderLength(bool isFEC) override;
auto shared_from_this() { return utils::shared_from(this); }
@@ -69,6 +69,7 @@ class RTCTransportProtocol : public TransportProtocol {
// packet functions
void sendRtxInterest(uint32_t seq);
void sendProbeInterest(uint32_t seq);
+ void sendInterestForTimeout(uint32_t seq);
void scheduleNextInterests() override;
void onInterestTimeout(Interest::Ptr &interest, const Name &name) override;
void onNack(const ContentObject &content_object);
diff --git a/libtransport/src/protocols/rtc/rtc_consts.h b/libtransport/src/protocols/rtc/rtc_consts.h
index 96e39d07e..29b5a3a12 100644
--- a/libtransport/src/protocols/rtc/rtc_consts.h
+++ b/libtransport/src/protocols/rtc/rtc_consts.h
@@ -54,7 +54,7 @@ const uint32_t PACING_WAIT = 1000; // usec to wait betwing two pacing batch. As
const uint32_t MAX_RTX_IN_BATCH = 10; // max rtx to send in loop
// packet const
-const uint32_t RTC_INTEREST_LIFETIME = 2000;
+const uint32_t RTC_INTEREST_LIFETIME = 4000;
// probes sequence range
const uint32_t MIN_PROBE_SEQ = 0xefffffff;
@@ -93,6 +93,7 @@ const double CATCH_UP_WIN_INCREMENT = 1.2;
// used in rate control
const double WIN_DECREASE_FACTOR = 0.5;
const double WIN_INCREASE_FACTOR = 1.5;
+const uint32_t MIN_PROD_RATE_SHARING_MODE = 125000; // 1Mbps in bytes
// round in congestion
const double ROUNDS_BEFORE_TAKE_ACTION = 5;
@@ -120,14 +121,14 @@ const uint64_t MAX_TIMER_RTX = ~0;
const uint32_t SENTINEL_TIMER_INTERVAL = 100; // ms
const uint32_t MAX_RTX_WITH_SENTINEL = 10; // packets
const double CATCH_UP_RTT_INCREMENT = 1.2;
-const double MAX_RESIDUAL_LOSS_RATE = 2.0; // %
-const uint32_t WAIT_BEFORE_FEC_UPDATE = ROUNDS_PER_SEC * 5;
+const double MAX_RESIDUAL_LOSS_RATE = 1.0; // %
+const uint32_t WAIT_BEFORE_FEC_UPDATE = ROUNDS_PER_SEC;
+const uint32_t MAX_RTT_BEFORE_FEC = 60; // ms
// used by producer
const uint32_t PRODUCER_STATS_INTERVAL = 200; // ms
const uint32_t MIN_PRODUCTION_RATE = 25; // pps, equal to min window *
// rounds in a second
-const uint32_t NACK_DELAY = 1500; // ms
const uint32_t FEC_PACING_TIME = 5; // ms
// aggregated data consts
@@ -139,6 +140,73 @@ const uint32_t AGGREGATED_PACKETS_TIMER = 2; // ms
const uint32_t MAX_RTT = 200; // ms
const double MAX_RESIDUAL_LOSSES = 0.05; // %
+const uint8_t FEC_MATRIX[64][10] = {
+ {1, 2, 2, 2, 3, 3, 4, 5, 5, 6}, // k = 1
+ {1, 2, 3, 3, 4, 5, 5, 6, 7, 9},
+ {2, 2, 3, 4, 5, 6, 7, 8, 9, 11},
+ {2, 3, 4, 5, 5, 7, 8, 9, 11, 13},
+ {2, 3, 4, 5, 6, 7, 9, 10, 12, 14}, // k = 5
+ {2, 3, 4, 6, 7, 8, 10, 12, 14, 16},
+ {2, 4, 5, 6, 8, 9, 11, 13, 15, 18},
+ {3, 4, 5, 7, 8, 10, 12, 14, 16, 19},
+ {3, 4, 6, 7, 9, 11, 13, 15, 18, 21},
+ {3, 4, 6, 8, 9, 11, 14, 16, 19, 23}, // k = 10
+ {3, 5, 6, 8, 10, 12, 14, 17, 20, 24},
+ {3, 5, 7, 8, 10, 13, 15, 18, 21, 26},
+ {3, 5, 7, 9, 11, 13, 16, 19, 23, 27},
+ {3, 5, 7, 9, 12, 14, 17, 20, 24, 28},
+ {4, 6, 8, 10, 12, 15, 18, 21, 25, 30}, // k = 15
+ {4, 6, 8, 10, 13, 15, 19, 22, 26, 31},
+ {4, 6, 8, 11, 13, 16, 19, 23, 27, 33},
+ {4, 6, 9, 11, 14, 17, 20, 24, 29, 34},
+ {4, 6, 9, 11, 14, 17, 21, 25, 30, 35},
+ {4, 7, 9, 12, 15, 18, 22, 26, 31, 37}, // k = 20
+ {4, 7, 9, 12, 15, 19, 22, 27, 32, 38},
+ {4, 7, 10, 13, 16, 19, 23, 28, 33, 40},
+ {5, 7, 10, 13, 16, 20, 24, 29, 34, 41},
+ {5, 7, 10, 13, 17, 20, 25, 30, 35, 42},
+ {5, 8, 11, 14, 17, 21, 26, 31, 37, 44}, // k = 25
+ {5, 8, 11, 14, 18, 22, 26, 31, 38, 45},
+ {5, 8, 11, 15, 18, 22, 27, 32, 39, 46},
+ {5, 8, 11, 15, 19, 23, 28, 33, 40, 48},
+ {5, 8, 12, 15, 19, 24, 28, 34, 41, 49},
+ {5, 9, 12, 16, 20, 24, 29, 35, 42, 50}, // k = 30
+ {5, 9, 12, 16, 20, 25, 30, 36, 43, 51},
+ {5, 9, 13, 16, 21, 25, 31, 37, 44, 53},
+ {6, 9, 13, 17, 21, 26, 31, 38, 45, 54},
+ {6, 9, 13, 17, 22, 26, 32, 39, 46, 55},
+ {6, 10, 13, 17, 22, 27, 33, 40, 47, 57}, // k = 35
+ {6, 10, 14, 18, 22, 28, 34, 40, 48, 58},
+ {6, 10, 14, 18, 23, 28, 34, 41, 49, 59},
+ {6, 10, 14, 19, 23, 29, 35, 42, 50, 60},
+ {6, 10, 14, 19, 24, 29, 36, 43, 52, 62},
+ {6, 10, 15, 19, 24, 30, 36, 44, 53, 63}, // k = 40
+ {6, 11, 15, 20, 25, 31, 37, 45, 54, 64},
+ {6, 11, 15, 20, 25, 31, 38, 46, 55, 65},
+ {7, 11, 15, 20, 26, 32, 39, 46, 56, 67},
+ {7, 11, 16, 21, 26, 32, 39, 47, 57, 68},
+ {7, 11, 16, 21, 27, 33, 40, 48, 58, 69}, // k = 45
+ {7, 11, 16, 21, 27, 33, 41, 49, 59, 70},
+ {7, 12, 16, 22, 27, 34, 41, 50, 60, 72},
+ {7, 12, 17, 22, 28, 34, 42, 51, 61, 73},
+ {7, 12, 17, 22, 28, 35, 43, 52, 62, 74},
+ {7, 12, 17, 23, 29, 36, 43, 52, 63, 75}, // k = 50
+ {7, 12, 17, 23, 29, 36, 44, 53, 64, 77},
+ {7, 12, 18, 23, 30, 37, 45, 54, 65, 78},
+ {7, 13, 18, 24, 30, 37, 45, 55, 66, 79},
+ {8, 13, 18, 24, 31, 38, 46, 56, 67, 80},
+ {8, 13, 18, 24, 31, 38, 47, 57, 68, 82}, // k = 55
+ {8, 13, 19, 25, 31, 39, 47, 57, 69, 83},
+ {8, 13, 19, 25, 32, 39, 48, 58, 70, 84},
+ {8, 13, 19, 25, 32, 40, 49, 59, 71, 85},
+ {8, 14, 19, 26, 33, 41, 50, 60, 72, 86},
+ {8, 14, 20, 26, 33, 41, 50, 61, 73, 88}, // k = 60
+ {8, 14, 20, 26, 34, 42, 51, 61, 74, 89},
+ {8, 14, 20, 27, 34, 42, 52, 62, 75, 90},
+ {8, 14, 20, 27, 34, 43, 52, 63, 76, 91},
+ {8, 14, 21, 27, 35, 43, 53, 64, 77, 92}, // k = 64
+};
+
} // namespace rtc
} // namespace protocol
diff --git a/libtransport/src/protocols/rtc/rtc_data_path.cc b/libtransport/src/protocols/rtc/rtc_data_path.cc
index b3abf5ea8..a421396b1 100644
--- a/libtransport/src/protocols/rtc/rtc_data_path.cc
+++ b/libtransport/src/protocols/rtc/rtc_data_path.cc
@@ -91,6 +91,8 @@ void RTCDataPath::insertRttSample(
rtt_samples_ = 0;
last_avg_rtt_compute_ = now;
}
+
+ received_packets_++;
}
void RTCDataPath::insertOwdSample(int64_t owd) {
@@ -115,10 +117,6 @@ void RTCDataPath::insertOwdSample(int64_t owd) {
int64_t diff = std::abs(owd - last_owd_);
last_owd_ = owd;
jitter_ += (1.0 / 16.0) * ((double)diff - jitter_);
-
- // owd is computed only for valid data packets so we count only
- // this for decide if we recevie traffic or not
- received_packets_++;
}
void RTCDataPath::computeInterArrivalGap(uint32_t segment_number) {
@@ -150,12 +148,17 @@ double RTCDataPath::getInterArrivalGap() {
return avg_inter_arrival_;
}
-bool RTCDataPath::isActive() {
+bool RTCDataPath::isValidProducer() {
if (received_nacks_ && rounds_without_packets_ < MAX_ROUNDS_WITHOUT_PKTS)
return true;
return false;
}
+bool RTCDataPath::isActive() {
+ if (rounds_without_packets_ < MAX_ROUNDS_WITHOUT_PKTS) return true;
+ return false;
+}
+
bool RTCDataPath::pathToProducer() {
if (received_nacks_) return true;
return false;
diff --git a/libtransport/src/protocols/rtc/rtc_data_path.h b/libtransport/src/protocols/rtc/rtc_data_path.h
index 5afbbb87f..ba5201fe8 100644
--- a/libtransport/src/protocols/rtc/rtc_data_path.h
+++ b/libtransport/src/protocols/rtc/rtc_data_path.h
@@ -49,8 +49,9 @@ class RTCDataPath {
double getQueuingDealy();
double getInterArrivalGap();
double getJitter();
- bool isActive();
- bool pathToProducer();
+ bool isActive(); // pakets recevied from this path in the last rounds
+ bool pathToProducer(); // path from a producer
+ bool isValidProducer(); // path from a producer that is also active
uint64_t getLastPacketTS();
uint32_t getPacketsLastRound();
diff --git a/libtransport/src/protocols/rtc/rtc_forwarding_strategy.cc b/libtransport/src/protocols/rtc/rtc_forwarding_strategy.cc
index c6bc751e6..4bbd7eac0 100644
--- a/libtransport/src/protocols/rtc/rtc_forwarding_strategy.cc
+++ b/libtransport/src/protocols/rtc/rtc_forwarding_strategy.cc
@@ -14,6 +14,7 @@
*/
#include <hicn/transport/interfaces/notification.h>
+#include <protocols/rtc/rtc_consts.h>
#include <protocols/rtc/rtc_forwarding_strategy.h>
namespace transport {
@@ -24,8 +25,13 @@ namespace rtc {
using namespace transport::interface;
+const double FWD_MAX_QUEUE = 30.0; // ms
+const double FWD_MAX_RTT = MAX_RTT_BEFORE_FEC; // ms
+const double FWD_MAX_LOSS_RATE = 0.1;
+
RTCForwardingStrategy::RTCForwardingStrategy()
- : init_(false),
+ : low_rate_app_(false),
+ init_(false),
forwarder_set_(false),
selected_strategy_(NONE),
current_strategy_(NONE),
@@ -42,17 +48,56 @@ void RTCForwardingStrategy::setCallback(
void RTCForwardingStrategy::initFwdStrategy(
std::shared_ptr<core::Portal> portal, core::Prefix& prefix, RTCState* state,
- strategy_t strategy) {
- init_ = true;
- selected_strategy_ = strategy;
- if (strategy == BOTH)
- current_strategy_ = BEST_PATH;
- else
- current_strategy_ = strategy;
- rounds_since_last_set_ = 0;
- prefix_ = prefix;
- portal_ = portal;
- state_ = state;
+ interface::RtcTransportRecoveryStrategies strategy) {
+ switch (strategy) {
+ case interface::RtcTransportRecoveryStrategies::LOW_RATE_AND_BESTPATH:
+ init_ = true;
+ low_rate_app_ = true;
+ selected_strategy_ = BEST_PATH;
+ current_strategy_ = BEST_PATH;
+ break;
+ case interface::RtcTransportRecoveryStrategies::LOW_RATE_AND_REPLICATION:
+ init_ = true;
+ low_rate_app_ = true;
+ selected_strategy_ = REPLICATION;
+ current_strategy_ = REPLICATION;
+ break;
+ case interface::RtcTransportRecoveryStrategies::
+ LOW_RATE_AND_ALL_FWD_STRATEGIES:
+ init_ = true;
+ low_rate_app_ = true;
+ selected_strategy_ = BEST_PATH;
+ current_strategy_ = BEST_PATH;
+ break;
+ case interface::RtcTransportRecoveryStrategies::DELAY_AND_BESTPATH:
+ init_ = true;
+ low_rate_app_ = false;
+ selected_strategy_ = BEST_PATH;
+ current_strategy_ = BEST_PATH;
+ break;
+ case interface::RtcTransportRecoveryStrategies::DELAY_AND_REPLICATION:
+ init_ = true;
+ low_rate_app_ = false;
+ selected_strategy_ = REPLICATION;
+ current_strategy_ = REPLICATION;
+ break;
+ case interface::RtcTransportRecoveryStrategies::RECOVERY_OFF:
+ case interface::RtcTransportRecoveryStrategies::RTX_ONLY:
+ case interface::RtcTransportRecoveryStrategies::FEC_ONLY:
+ case interface::RtcTransportRecoveryStrategies::DELAY_BASED:
+ case interface::RtcTransportRecoveryStrategies::LOW_RATE:
+ case interface::RtcTransportRecoveryStrategies::FEC_ONLY_LOW_RES_LOSSES:
+ default:
+ // fwd strategies are not used
+ init_ = false;
+ }
+
+ if (init_) {
+ rounds_since_last_set_ = 0;
+ prefix_ = prefix;
+ portal_ = portal;
+ state_ = state;
+ }
}
void RTCForwardingStrategy::checkStrategy() {
@@ -99,16 +144,35 @@ void RTCForwardingStrategy::checkStrategyBestPath() {
return;
}
- uint8_t qs = state_->getQualityScore();
+ if (low_rate_app_) {
+ // this is used for gaming
+ uint8_t qs = state_->getQualityScore();
- if (qs >= 4 || rounds_since_last_set_ < 25) { // wait a least 5 sec
- // between each switch
- rounds_since_last_set_++;
- return;
- }
+ if (qs >= 4 || rounds_since_last_set_ < 25) { // wait a least 5 sec
+ // between each switch
+ rounds_since_last_set_++;
+ return;
+ }
- // try to switch path
- setStrategy(BEST_PATH);
+ // try to switch path
+ setStrategy(BEST_PATH);
+ } else {
+ if (rounds_since_last_set_ < 25) { // wait a least 5 sec
+ // between each switch
+ rounds_since_last_set_++;
+ return;
+ }
+
+ double queue = state_->getQueuing();
+ double rtt = state_->getAvgRTT();
+ double loss_rate = state_->getPerSecondLossRate();
+
+ if (queue >= FWD_MAX_QUEUE || rtt >= FWD_MAX_RTT ||
+ loss_rate > FWD_MAX_LOSS_RATE) {
+ // try to switch path
+ setStrategy(BEST_PATH);
+ }
+ }
}
void RTCForwardingStrategy::checkStrategyReplication() {
@@ -133,7 +197,7 @@ void RTCForwardingStrategy::checkStrategyBoth() {
// TODO
// for the moment we use only best path.
- // but later:
+ // for later:
// 1. if both paths are bad use replication
// 2. while using replication compute the effectiveness. if the majority of
// the packets are coming from a single path, try to use bestpath
diff --git a/libtransport/src/protocols/rtc/rtc_forwarding_strategy.h b/libtransport/src/protocols/rtc/rtc_forwarding_strategy.h
index 9825877fd..c2227e09f 100644
--- a/libtransport/src/protocols/rtc/rtc_forwarding_strategy.h
+++ b/libtransport/src/protocols/rtc/rtc_forwarding_strategy.h
@@ -41,7 +41,7 @@ class RTCForwardingStrategy {
void initFwdStrategy(std::shared_ptr<core::Portal> portal,
core::Prefix& prefix, RTCState* state,
- strategy_t strategy);
+ interface::RtcTransportRecoveryStrategies strategy);
void checkStrategy();
void setCallback(interface::StrategyCallback&& callback);
@@ -56,6 +56,10 @@ class RTCForwardingStrategy {
std::array<std::string, 4> string_strategies_ = {"bestpath", "replication",
"both", "none"};
+ bool low_rate_app_; // if set to true the best path strategy will
+ // trigger a path switch based on the quality
+ // score, otherwise it will use the RTT,
+ // queuing delay and loss rate
bool init_; // true if all val are initializes
bool forwarder_set_; // true if the strategy is been set at least
// once
diff --git a/libtransport/src/protocols/rtc/rtc_ldr.cc b/libtransport/src/protocols/rtc/rtc_ldr.cc
index abf6cda2c..6e88a8636 100644
--- a/libtransport/src/protocols/rtc/rtc_ldr.cc
+++ b/libtransport/src/protocols/rtc/rtc_ldr.cc
@@ -37,16 +37,24 @@ RTCLossDetectionAndRecovery::RTCLossDetectionAndRecovery(
interface::RtcTransportRecoveryStrategies type,
RecoveryStrategy::SendRtxCallback &&callback,
interface::StrategyCallback &&external_callback) {
- rs_type_ = type;
if (type == interface::RtcTransportRecoveryStrategies::RECOVERY_OFF) {
rs_ = std::make_shared<RecoveryStrategyRecoveryOff>(
- indexer, std::move(callback), io_service, std::move(external_callback));
- } else if (type == interface::RtcTransportRecoveryStrategies::DELAY_BASED) {
+ indexer, std::move(callback), io_service, type,
+ std::move(external_callback));
+ } else if (type == interface::RtcTransportRecoveryStrategies::DELAY_BASED ||
+ type == interface::RtcTransportRecoveryStrategies::
+ DELAY_AND_BESTPATH ||
+ type == interface::RtcTransportRecoveryStrategies::
+ DELAY_AND_REPLICATION) {
rs_ = std::make_shared<RecoveryStrategyDelayBased>(
- indexer, std::move(callback), io_service, std::move(external_callback));
- } else if (type == interface::RtcTransportRecoveryStrategies::FEC_ONLY) {
+ indexer, std::move(callback), io_service, type,
+ std::move(external_callback));
+ } else if (type == interface::RtcTransportRecoveryStrategies::FEC_ONLY ||
+ type == interface::RtcTransportRecoveryStrategies::
+ FEC_ONLY_LOW_RES_LOSSES) {
rs_ = std::make_shared<RecoveryStrategyFecOnly>(
- indexer, std::move(callback), io_service, std::move(external_callback));
+ indexer, std::move(callback), io_service, type,
+ std::move(external_callback));
} else if (type == interface::RtcTransportRecoveryStrategies::LOW_RATE ||
type == interface::RtcTransportRecoveryStrategies::
LOW_RATE_AND_BESTPATH ||
@@ -55,12 +63,14 @@ RTCLossDetectionAndRecovery::RTCLossDetectionAndRecovery(
type == interface::RtcTransportRecoveryStrategies::
LOW_RATE_AND_ALL_FWD_STRATEGIES) {
rs_ = std::make_shared<RecoveryStrategyLowRate>(
- indexer, std::move(callback), io_service, std::move(external_callback));
+ indexer, std::move(callback), io_service, type,
+ std::move(external_callback));
} else {
// default
- rs_type_ = interface::RtcTransportRecoveryStrategies::RTX_ONLY;
+ type = interface::RtcTransportRecoveryStrategies::RTX_ONLY;
rs_ = std::make_shared<RecoveryStrategyRtxOnly>(
- indexer, std::move(callback), io_service, std::move(external_callback));
+ indexer, std::move(callback), io_service, type,
+ std::move(external_callback));
}
}
@@ -68,15 +78,21 @@ RTCLossDetectionAndRecovery::~RTCLossDetectionAndRecovery() {}
void RTCLossDetectionAndRecovery::changeRecoveryStrategy(
interface::RtcTransportRecoveryStrategies type) {
- if (type == rs_type_) return;
+ if (type == rs_->getType()) return;
- rs_type_ = type;
+ rs_->updateType(type);
if (type == interface::RtcTransportRecoveryStrategies::RECOVERY_OFF) {
rs_ =
std::make_shared<RecoveryStrategyRecoveryOff>(std::move(*(rs_.get())));
- } else if (type == interface::RtcTransportRecoveryStrategies::DELAY_BASED) {
+ } else if (type == interface::RtcTransportRecoveryStrategies::DELAY_BASED ||
+ type == interface::RtcTransportRecoveryStrategies::
+ DELAY_AND_BESTPATH ||
+ type == interface::RtcTransportRecoveryStrategies::
+ DELAY_AND_REPLICATION) {
rs_ = std::make_shared<RecoveryStrategyDelayBased>(std::move(*(rs_.get())));
- } else if (type == interface::RtcTransportRecoveryStrategies::FEC_ONLY) {
+ } else if (type == interface::RtcTransportRecoveryStrategies::FEC_ONLY ||
+ type == interface::RtcTransportRecoveryStrategies::
+ FEC_ONLY_LOW_RES_LOSSES) {
rs_ = std::make_shared<RecoveryStrategyFecOnly>(std::move(*(rs_.get())));
} else if (type == interface::RtcTransportRecoveryStrategies::LOW_RATE ||
type == interface::RtcTransportRecoveryStrategies::
@@ -116,14 +132,15 @@ bool RTCLossDetectionAndRecovery::onDataPacketReceived(
uint32_t seq = content_object.getName().getSuffix();
bool is_rtx = rs_->isRtx(seq);
rs_->receivedPacket(seq);
+ bool ret = false;
DLOG_IF(INFO, VLOG_IS_ON(3))
<< "received data. add from "
- << rs_->getState()->getHighestSeqReceivedInOrder() + 1 << " to " << seq;
+ << rs_->getState()->getHighestSeqReceived() + 1 << " to " << seq;
if (!is_rtx)
- return detectLoss(rs_->getState()->getHighestSeqReceivedInOrder() + 1, seq,
- false);
+ ret = detectLoss(rs_->getState()->getHighestSeqReceived() + 1, seq, false);
- return false;
+ rs_->getState()->updateHighestSeqReceived(seq);
+ return ret;
}
bool RTCLossDetectionAndRecovery::onNackPacketReceived(
@@ -141,10 +158,9 @@ bool RTCLossDetectionAndRecovery::onNackPacketReceived(
// may got lost and we should ask them
rs_->receivedPacket(seq);
- DLOG_IF(INFO, VLOG_IS_ON(3))
- << "received nack. add from "
- << rs_->getState()->getHighestSeqReceivedInOrder() + 1 << " to "
- << production_seq;
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "received nack. add from "
+ << rs_->getState()->getHighestSeqReceived() + 1
+ << " to " << production_seq;
// if it is a future nack store it in the list set of nacked seq
if (production_seq <= seq) rs_->receivedFutureNack(seq);
@@ -152,7 +168,7 @@ bool RTCLossDetectionAndRecovery::onNackPacketReceived(
// call the detectLoss function using the probe flag = true. in fact the
// losses detected using nacks are the same as the one detected using probes,
// we should not increase the loss counter
- return detectLoss(rs_->getState()->getHighestSeqReceivedInOrder() + 1,
+ return detectLoss(rs_->getState()->getHighestSeqReceived() + 1,
production_seq, true);
}
@@ -164,12 +180,11 @@ bool RTCLossDetectionAndRecovery::onProbePacketReceived(
uint32_t production_seq = RTCState::getProbeParams(probe).prod_seg;
- DLOG_IF(INFO, VLOG_IS_ON(3))
- << "received probe. add from "
- << rs_->getState()->getHighestSeqReceivedInOrder() + 1 << " to "
- << production_seq;
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "received probe. add from "
+ << rs_->getState()->getHighestSeqReceived() + 1
+ << " to " << production_seq;
- return detectLoss(rs_->getState()->getHighestSeqReceivedInOrder() + 1,
+ return detectLoss(rs_->getState()->getHighestSeqReceived() + 1,
production_seq, true);
}
@@ -183,8 +198,8 @@ bool RTCLossDetectionAndRecovery::detectLoss(uint32_t start, uint32_t stop,
}
// skip received or lost packets
- if (start <= rs_->getState()->getHighestSeqReceivedInOrder()) {
- start = rs_->getState()->getHighestSeqReceivedInOrder() + 1;
+ if (start <= rs_->getState()->getHighestSeqReceived()) {
+ start = rs_->getState()->getHighestSeqReceived() + 1;
}
bool loss_detected = false;
diff --git a/libtransport/src/protocols/rtc/rtc_ldr.h b/libtransport/src/protocols/rtc/rtc_ldr.h
index 7f683eaa6..24f22ffed 100644
--- a/libtransport/src/protocols/rtc/rtc_ldr.h
+++ b/libtransport/src/protocols/rtc/rtc_ldr.h
@@ -47,6 +47,7 @@ class RTCLossDetectionAndRecovery
void setFecParams(uint32_t n, uint32_t k) { rs_->setFecParams(n, k); }
+ void setContentSharingMode() { rs_->setContentSharingMode(); }
void turnOnRecovery() { rs_->turnOnRecovery(); }
bool isRtxOn() { return rs_->isRtxOn(); }
@@ -68,11 +69,12 @@ class RTCLossDetectionAndRecovery
return rs_->isPossibleLossWithNoRtx(seq);
}
+ uint64_t getRtxRtt(uint32_t seq) { return rs_->getRtxRtt(seq); }
+
private:
// returns true if a loss is detected, false otherwise
bool detectLoss(uint32_t start, uint32_t stop, bool recv_probe);
- interface::RtcTransportRecoveryStrategies rs_type_;
std::shared_ptr<RecoveryStrategy> rs_;
};
diff --git a/libtransport/src/protocols/rtc/rtc_packet.h b/libtransport/src/protocols/rtc/rtc_packet.h
index 391aedfc6..ffbbd78fd 100644
--- a/libtransport/src/protocols/rtc/rtc_packet.h
+++ b/libtransport/src/protocols/rtc/rtc_packet.h
@@ -52,6 +52,8 @@
#include <hicn/transport/portability/win_portability.h>
#endif
+#include <hicn/transport/portability/endianess.h>
+
#include <cstring>
namespace transport {
@@ -60,24 +62,6 @@ namespace protocol {
namespace rtc {
-inline uint64_t _ntohll(const uint64_t *input) {
- uint64_t return_val;
- uint8_t *tmp = (uint8_t *)&return_val;
-
- tmp[0] = (uint8_t)(*input >> 56);
- tmp[1] = (uint8_t)(*input >> 48);
- tmp[2] = (uint8_t)(*input >> 40);
- tmp[3] = (uint8_t)(*input >> 32);
- tmp[4] = (uint8_t)(*input >> 24);
- tmp[5] = (uint8_t)(*input >> 16);
- tmp[6] = (uint8_t)(*input >> 8);
- tmp[7] = (uint8_t)(*input >> 0);
-
- return return_val;
-}
-
-inline uint64_t _htonll(const uint64_t *input) { return (_ntohll(input)); }
-
const uint32_t DATA_HEADER_SIZE = 12; // bytes
// XXX: sizeof(data_packet_t) is 16
// beacuse of padding
@@ -87,11 +71,19 @@ struct data_packet_t {
uint64_t timestamp;
uint32_t prod_rate;
- inline uint64_t getTimestamp() const { return _ntohll(&timestamp); }
- inline void setTimestamp(uint64_t time) { timestamp = _htonll(&time); }
+ inline uint64_t getTimestamp() const {
+ return portability::net_to_host(timestamp);
+ }
+ inline void setTimestamp(uint64_t time) {
+ timestamp = portability::host_to_net(time);
+ }
- inline uint32_t getProductionRate() const { return ntohl(prod_rate); }
- inline void setProductionRate(uint32_t rate) { prod_rate = htonl(rate); }
+ inline uint32_t getProductionRate() const {
+ return portability::net_to_host(prod_rate);
+ }
+ inline void setProductionRate(uint32_t rate) {
+ prod_rate = portability::host_to_net(rate);
+ }
};
struct nack_packet_t {
@@ -99,14 +91,26 @@ struct nack_packet_t {
uint32_t prod_rate;
uint32_t prod_seg;
- inline uint64_t getTimestamp() const { return _ntohll(&timestamp); }
- inline void setTimestamp(uint64_t time) { timestamp = _htonll(&time); }
+ inline uint64_t getTimestamp() const {
+ return portability::net_to_host(timestamp);
+ }
+ inline void setTimestamp(uint64_t time) {
+ timestamp = portability::host_to_net(time);
+ }
- inline uint32_t getProductionRate() const { return ntohl(prod_rate); }
- inline void setProductionRate(uint32_t rate) { prod_rate = htonl(rate); }
+ inline uint32_t getProductionRate() const {
+ return portability::net_to_host(prod_rate);
+ }
+ inline void setProductionRate(uint32_t rate) {
+ prod_rate = portability::host_to_net(rate);
+ }
- inline uint32_t getProductionSegment() const { return ntohl(prod_seg); }
- inline void setProductionSegment(uint32_t seg) { prod_seg = htonl(seg); }
+ inline uint32_t getProductionSegment() const {
+ return portability::net_to_host(prod_seg);
+ }
+ inline void setProductionSegment(uint32_t seg) {
+ prod_seg = portability::host_to_net(seg);
+ }
};
class AggrPktHeader {
@@ -225,7 +229,7 @@ class AggrPktHeader {
return (uint16_t) * (buf_ + pkt_index);
} else { // 16 bits
uint16_t *buf_16 = (uint16_t *)buf_;
- return ntohs(*(buf_16 + pkt_index));
+ return portability::net_to_host(*(buf_16 + pkt_index));
}
}
@@ -235,7 +239,7 @@ class AggrPktHeader {
*(buf_ + pkt_index) = (uint8_t)len;
} else { // 16 bits
uint16_t *buf_16 = (uint16_t *)buf_;
- *(buf_16 + pkt_index) = htons(len);
+ *(buf_16 + pkt_index) = portability::host_to_net(len);
}
}
diff --git a/libtransport/src/protocols/rtc/rtc_reassembly.cc b/libtransport/src/protocols/rtc/rtc_reassembly.cc
index 992bab50e..b1b0fcaba 100644
--- a/libtransport/src/protocols/rtc/rtc_reassembly.cc
+++ b/libtransport/src/protocols/rtc/rtc_reassembly.cc
@@ -40,7 +40,7 @@ void RtcReassembly::reassemble(core::ContentObject& content_object) {
auto read_buffer = content_object.getPayload();
DLOG_IF(INFO, VLOG_IS_ON(3)) << "Size of payload: " << read_buffer->length();
- read_buffer->trimStart(transport_protocol_->transportHeaderLength());
+ read_buffer->trimStart(transport_protocol_->transportHeaderLength(false));
if (data_aggregation_) {
rtc::AggrPktHeader hdr((uint8_t*)read_buffer->data());
diff --git a/libtransport/src/protocols/rtc/rtc_recovery_strategy.cc b/libtransport/src/protocols/rtc/rtc_recovery_strategy.cc
index 66ae5086c..257fdd09b 100644
--- a/libtransport/src/protocols/rtc/rtc_recovery_strategy.cc
+++ b/libtransport/src/protocols/rtc/rtc_recovery_strategy.cc
@@ -29,8 +29,12 @@ using namespace transport::interface;
RecoveryStrategy::RecoveryStrategy(
Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service,
- bool use_rtx, bool use_fec, interface::StrategyCallback &&external_callback)
- : recovery_on_(false),
+ bool use_rtx, bool use_fec,
+ interface::RtcTransportRecoveryStrategies rs_type,
+ interface::StrategyCallback &&external_callback)
+ : rs_type_(rs_type),
+ recovery_on_(false),
+ content_sharing_mode_(false),
rtx_during_fec_(0),
next_rtx_timer_(MAX_TIMER_RTX),
send_rtx_callback_(std::move(callback)),
@@ -43,7 +47,9 @@ RecoveryStrategy::RecoveryStrategy(
}
RecoveryStrategy::RecoveryStrategy(RecoveryStrategy &&rs)
- : rtx_during_fec_(0),
+ : rs_type_(rs.rs_type_),
+ content_sharing_mode_(rs.content_sharing_mode_),
+ rtx_during_fec_(0),
rtx_state_(std::move(rs.rtx_state_)),
rtx_timers_(std::move(rs.rtx_timers_)),
recover_with_fec_(std::move(rs.recover_with_fec_)),
@@ -64,25 +70,52 @@ RecoveryStrategy::RecoveryStrategy(RecoveryStrategy &&rs)
RecoveryStrategy::~RecoveryStrategy() {}
void RecoveryStrategy::setFecParams(uint32_t n, uint32_t k) {
+ // if rs_type == FEC_ONLY_LOW_RES_LOSSES max k == 64
n_ = n;
k_ = k;
// XXX for the moment we go in steps of 5% loss rate.
- // max loss rate = 95%
+ uint32_t i = 0;
for (uint32_t loss_rate = 5; loss_rate < 100; loss_rate += 5) {
- double dec_loss_rate = (double)(loss_rate + 5) / 100.0;
- double exp_losses = (double)k_ * dec_loss_rate;
- uint32_t fec_to_ask = ceil(exp_losses / (1 - dec_loss_rate));
-
- fec_state_ f;
- f.fec_to_ask = std::min(fec_to_ask, (n_ - k_));
- f.last_update = round_id_;
- f.avg_residual_losses = 0.0;
- f.consecutive_use = 0;
- fec_per_loss_rate_.push_back(f);
+ uint32_t fec_to_ask = 0;
+ if (n_ != 0 && k_ != 0) {
+ if (rs_type_ ==
+ interface::RtcTransportRecoveryStrategies::FEC_ONLY_LOW_RES_LOSSES) {
+ // the max loss rate in the matrix is 50%
+ uint32_t index = i;
+ if (i > 9) index = 9;
+ fec_to_ask = FEC_MATRIX[k_ - 1][index];
+ } else {
+ double dec_loss_rate = (double)(loss_rate + 5);
+ if (dec_loss_rate == 100.0) dec_loss_rate = 95.0;
+ dec_loss_rate = dec_loss_rate / 100.0;
+ double exp_losses = ceil((double)k_ * dec_loss_rate);
+ fec_to_ask = ceil((exp_losses / (1 - dec_loss_rate)) * 1.25);
+ }
+ }
+ fec_to_ask = std::min(fec_to_ask, (n_ - k_));
+ fec_per_loss_rate_.push_back(fec_to_ask);
+
+ i++;
}
}
+uint64_t RecoveryStrategy::getRtxRtt(uint32_t seq) {
+ auto it = rtx_state_.find(seq);
+
+ if (it == rtx_state_.end()) return 0;
+
+ // we can compute the RTT of an RTX only if it was send once. Infact if the
+ // RTX was sent twice or more the data may be alredy in flight and the RTT
+ // will be underestimated. This may happen also for packets that we
+ // retransmitted too soon. in that case the RTT will be filtered out by
+ // checking the path label
+ if (it->second.rtx_count_ != 1) return 0;
+
+ // this a potentialy valid packet, compute the RTT
+ return (utils::SteadyTime::nowMs().count() - it->second.last_send_);
+}
+
bool RecoveryStrategy::lossDetected(uint32_t seq) {
if (isRtx(seq)) {
// this packet is already in the list of rtx
@@ -141,8 +174,10 @@ void RecoveryStrategy::addNewRtx(uint32_t seq, bool force) {
state.first_send_ = state_->getInterestSentTime(seq);
if (state.first_send_ == 0) // this interest was never sent before
state.first_send_ = getNow();
- state.next_send_ = computeNextSend(seq, true);
+ state.last_send_ = state.first_send_; // we didn't send an RTX for this
+ // packet yet
state.rtx_count_ = 0;
+ state.next_send_ = computeNextSend(seq, state.rtx_count_);
DLOG_IF(INFO, VLOG_IS_ON(4))
<< "Add " << seq << " to retransmissions. next rtx is in "
<< state.next_send_ - getNow() << " ms";
@@ -158,66 +193,50 @@ void RecoveryStrategy::addNewRtx(uint32_t seq, bool force) {
}
}
-uint64_t RecoveryStrategy::computeNextSend(uint32_t seq, bool new_rtx) {
+uint64_t RecoveryStrategy::computeNextSend(uint32_t seq, uint32_t rtx_counter) {
uint64_t now = getNow();
- if (new_rtx) {
- // for the new rtx we wait one estimated IAT after the loss detection. this
- // is bacause, assuming that packets arrive with a constant IAT, we should
- // get a new packet every IAT
- double prod_rate = state_->getProducerRate();
- uint32_t estimated_iat = SENTINEL_TIMER_INTERVAL;
- uint32_t jitter = 0;
+ if (rtx_counter == 0) {
+ uint32_t wait = 1;
+ if (content_sharing_mode_) return now + wait;
- if (prod_rate != 0) {
- double packet_size = state_->getAveragePacketSize();
- estimated_iat = ceil(1000.0 / (prod_rate / packet_size));
- jitter = ceil(state_->getJitter());
- }
+ uint32_t jitter = SENTINEL_TIMER_INTERVAL;
+ double prod_rate = state_->getProducerRate();
+ if (prod_rate != 0) jitter = ceil(state_->getJitter());
- uint32_t wait = 1;
- if (estimated_iat < 18) {
- // for low rate app we do not wait to send a RTX
- // we consider low rate stream with less than 50pps (iat >= 20ms)
- // (e.g. audio in videoconf, mobile games).
- // in the check we use 18ms to accomodate for measurements errors
- // for flows with higher rate wait 1 ait + jitter
- wait = estimated_iat + jitter;
- }
+ wait += jitter;
- DLOG_IF(INFO, VLOG_IS_ON(3))
- << "first rtx for " << seq << " in " << wait
- << " ms, rtt = " << state_->getMinRTT() << " ait = " << estimated_iat
- << " jttr = " << jitter;
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "first rtx for " << seq << " in " << wait
+ << " ms, jitter = " << jitter;
return now + wait;
} else {
- // wait one RTT
- uint32_t wait = SENTINEL_TIMER_INTERVAL;
-
+ // wait one RTT. if an edge is known use the edge RTT for the first 5 rtx
double prod_rate = state_->getProducerRate();
if (prod_rate == 0) {
return now + SENTINEL_TIMER_INTERVAL;
}
- double packet_size = state_->getAveragePacketSize();
- uint32_t estimated_iat = ceil(1000.0 / (prod_rate / packet_size));
+ uint64_t rtt = 0;
+ // if the transport detects an edge we try first to get the RTX from the
+ // edge. if no interest get a reply we move to the full RTT
+ if (rtx_counter < 5 && (state_->getEdgeRtt() != 0)) {
+ rtt = state_->getEdgeRtt();
+ } else {
+ rtt = state_->getAvgRTT();
+ }
- uint64_t rtt = state_->getMinRTT();
if (rtt == 0) rtt = SENTINEL_TIMER_INTERVAL;
- wait = rtt;
+
+ if (content_sharing_mode_) return now + rtt;
+
+ uint32_t wait = (uint32_t)rtt;
uint32_t jitter = ceil(state_->getJitter());
wait += jitter;
- // it may happen that the channel is congested and we have some additional
- // queuing delay to take into account
- uint32_t queue = ceil(state_->getQueuing());
- wait += queue;
-
DLOG_IF(INFO, VLOG_IS_ON(3))
- << "next rtx for " << seq << " in " << wait
- << " ms, rtt = " << state_->getMinRTT() << " ait = " << estimated_iat
- << " jttr = " << jitter << " queue = " << queue;
+ << "next rtx for " << seq << " in " << wait << " ms, rtt = " << rtt
+ << " jtter = " << jitter;
return now + wait;
}
@@ -252,7 +271,9 @@ void RecoveryStrategy::retransmit() {
state_->onRetransmission(seq);
double prod_rate = state_->getProducerRate();
if (prod_rate != 0) rtx_it->second.rtx_count_++;
- rtx_it->second.next_send_ = computeNextSend(seq, false);
+ rtx_it->second.last_send_ = now;
+ rtx_it->second.next_send_ =
+ computeNextSend(seq, rtx_it->second.rtx_count_);
it = rtx_timers_.erase(it);
rtx_timers_.insert(
std::pair<uint64_t, uint32_t>(rtx_it->second.next_send_, seq));
@@ -327,6 +348,7 @@ void RecoveryStrategy::deleteRtx(uint32_t seq) {
}
it_timers++;
}
+
// remove rtx
rtx_state_.erase(it_rtx);
}
@@ -339,53 +361,13 @@ uint32_t RecoveryStrategy::computeFecPacketsToAsk() {
if (loss_rate == 0) return 0;
- // once per minute try to reduce the fec rate. it may happen that for some bin
- // we ask too many fec packet. here we try to reduce this values gently
- if (round_id_ % ROUNDS_PER_MIN == 0) {
- reduceFec();
- }
-
// keep track of the last used fec. if we use a new bin on this round reset
// consecutive use and avg loss in the prev bin
uint32_t bin = ceil(loss_rate / 5.0) - 1;
- if (bin > fec_per_loss_rate_.size() - 1) bin = fec_per_loss_rate_.size() - 1;
+ if (bin > fec_per_loss_rate_.size() - 1)
+ bin = (uint32_t)fec_per_loss_rate_.size() - 1;
- if (bin != last_fec_used_) {
- fec_per_loss_rate_[last_fec_used_].consecutive_use = 0;
- fec_per_loss_rate_[last_fec_used_].avg_residual_losses = 0.0;
- }
- last_fec_used_ = bin;
- fec_per_loss_rate_[last_fec_used_].consecutive_use++;
-
- // we update the stats only once very 5 rounds (1sec) that is the rate at
- // which we compute residual losses
- if (round_id_ % ROUNDS_PER_SEC == 0) {
- double residual_losses = state_->getResidualLossRate() * 100;
- // update residual loss rate
- fec_per_loss_rate_[bin].avg_residual_losses =
- (fec_per_loss_rate_[bin].avg_residual_losses * MOVING_AVG_ALPHA) +
- (1 - MOVING_AVG_ALPHA) * residual_losses;
-
- if ((fec_per_loss_rate_[bin].last_update - round_id_) <
- WAIT_BEFORE_FEC_UPDATE) {
- // this bin is been updated recently so don't modify it and
- // return the current state
- return fec_per_loss_rate_[bin].fec_to_ask;
- }
-
- // if the residual loss rate is too high and we can ask more fec packets and
- // we are using this configuration since at least 5 sec update fec
- if (fec_per_loss_rate_[bin].avg_residual_losses > MAX_RESIDUAL_LOSS_RATE &&
- fec_per_loss_rate_[bin].fec_to_ask < (n_ - k_) &&
- fec_per_loss_rate_[bin].consecutive_use > WAIT_BEFORE_FEC_UPDATE) {
- // so increase the number of fec packets to ask
- fec_per_loss_rate_[bin].fec_to_ask++;
- fec_per_loss_rate_[bin].last_update = round_id_;
- fec_per_loss_rate_[bin].avg_residual_losses = 0.0;
- }
- }
-
- return fec_per_loss_rate_[bin].fec_to_ask;
+ return fec_per_loss_rate_[bin];
}
void RecoveryStrategy::setRtxFec(std::optional<bool> rtx_on,
@@ -431,21 +413,6 @@ void RecoveryStrategy::removePacketState(uint32_t seq) {
deleteRtx(seq);
}
-// private methods
-
-void RecoveryStrategy::reduceFec() {
- for (uint32_t loss_rate = 5; loss_rate < 100; loss_rate += 5) {
- double dec_loss_rate = (double)loss_rate / 100.0;
- double exp_losses = (double)k_ * dec_loss_rate;
- uint32_t fec_to_ask = ceil(exp_losses / (1 - dec_loss_rate));
-
- uint32_t bin = ceil(loss_rate / 5.0) - 1;
- if (fec_per_loss_rate_[bin].fec_to_ask > fec_to_ask) {
- fec_per_loss_rate_[bin].fec_to_ask--;
- }
- }
-}
-
} // end namespace rtc
} // end namespace protocol
diff --git a/libtransport/src/protocols/rtc/rtc_recovery_strategy.h b/libtransport/src/protocols/rtc/rtc_recovery_strategy.h
index 482aedc9d..aceb85888 100644
--- a/libtransport/src/protocols/rtc/rtc_recovery_strategy.h
+++ b/libtransport/src/protocols/rtc/rtc_recovery_strategy.h
@@ -32,9 +32,10 @@ namespace rtc {
class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> {
protected:
struct rtx_state_ {
- uint64_t first_send_;
- uint64_t next_send_;
- uint32_t rtx_count_;
+ uint64_t first_send_; // first time this interest was sent
+ uint64_t last_send_; // last time this rtx was sent
+ uint64_t next_send_; // next retransmission time
+ uint32_t rtx_count_; // number or rtx
};
using rtxState = struct rtx_state_;
@@ -44,6 +45,7 @@ class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> {
RecoveryStrategy(Indexer *indexer, SendRtxCallback &&callback,
asio::io_service &io_service, bool use_rtx, bool use_fec,
+ interface::RtcTransportRecoveryStrategies rs_type,
interface::StrategyCallback &&external_callback);
RecoveryStrategy(RecoveryStrategy &&rs);
@@ -55,6 +57,7 @@ class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> {
void setState(RTCState *state) { state_ = state; }
void setRateControl(RTCRateControl *rateControl) { rc_ = rateControl; }
void setFecParams(uint32_t n, uint32_t k);
+ void setContentSharingMode() { content_sharing_mode_ = true; }
bool isRtx(uint32_t seq) {
if (rtx_state_.find(seq) != rtx_state_.end()) return true;
@@ -71,10 +74,20 @@ class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> {
return false;
}
+ interface::RtcTransportRecoveryStrategies getType() {
+ return rs_type_;
+ }
+ void updateType(interface::RtcTransportRecoveryStrategies type) {
+ rs_type_ = type;
+ }
bool isRtxOn() { return rtx_on_; }
bool isFecOn() { return fec_on_; }
RTCState *getState() { return state_; }
+
+ // if the function returns 0 it means that the packet is not an RTX or it is
+ // not a valid packet to safely compute the RTT
+ uint64_t getRtxRtt(uint32_t seq);
bool lossDetected(uint32_t seq);
void notifyNewLossDetedcted(uint32_t seq);
void requestPossibleLostPacket(uint32_t seq);
@@ -98,7 +111,7 @@ class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> {
protected:
// rtx functions
void addNewRtx(uint32_t seq, bool force);
- uint64_t computeNextSend(uint32_t seq, bool new_rtx);
+ uint64_t computeNextSend(uint32_t seq, uint32_t rtx_counter);
void retransmit();
void scheduleNextRtx();
void deleteRtx(uint32_t seq);
@@ -109,9 +122,11 @@ class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> {
// common functons
void removePacketState(uint32_t seq);
+ interface::RtcTransportRecoveryStrategies rs_type_;
bool recovery_on_;
bool rtx_on_;
bool fec_on_;
+ bool content_sharing_mode_;
// number of RTX sent after fec turned on
// this is used to take into account jitter and out of order packets
@@ -152,19 +167,9 @@ class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> {
RTCRateControl *rc_;
private:
- struct fec_state_ {
- uint32_t fec_to_ask;
- uint32_t last_update; // round id of the last update
- // (wait 10 ruonds (2sec) between updates)
- uint32_t consecutive_use; // consecutive ruonds where this fec was used
- double avg_residual_losses;
- };
-
- void reduceFec();
-
uint32_t round_id_; // number of rounds
uint32_t last_fec_used_;
- std::vector<fec_state_> fec_per_loss_rate_;
+ std::vector<uint32_t> fec_per_loss_rate_;
interface::StrategyCallback callback_;
};
diff --git a/libtransport/src/protocols/rtc/rtc_rs_delay.cc b/libtransport/src/protocols/rtc/rtc_rs_delay.cc
index 4be751ec9..7d7a01133 100644
--- a/libtransport/src/protocols/rtc/rtc_rs_delay.cc
+++ b/libtransport/src/protocols/rtc/rtc_rs_delay.cc
@@ -25,8 +25,10 @@ namespace rtc {
RecoveryStrategyDelayBased::RecoveryStrategyDelayBased(
Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service,
+ interface::RtcTransportRecoveryStrategies rs_type,
interface::StrategyCallback &&external_callback)
: RecoveryStrategy(indexer, std::move(callback), io_service, true, false,
+ rs_type,
std::move(external_callback)), // start with rtx
congestion_state_(false),
probing_state_(false),
@@ -48,7 +50,7 @@ void RecoveryStrategyDelayBased::turnOnRecovery() {
recovery_on_ = true;
uint64_t rtt = state_->getMinRTT();
uint32_t fec_to_ask = computeFecPacketsToAsk();
- if (rtt > 80 && fec_to_ask != 0) {
+ if (rtt > MAX_RTT_BEFORE_FEC && fec_to_ask > 0) {
// we need to start FEC (see fec only strategy for more details)
setRtxFec(true, true);
rtx_during_fec_ = 1; // avoid to stop fec
@@ -84,16 +86,16 @@ void RecoveryStrategyDelayBased::onNewRound(bool in_sync) {
return;
}
- uint64_t rtt = state_->getMinRTT();
+ uint64_t rtt = state_->getAvgRTT();
- bool congestion = false;
// XXX at the moment we are not looking at congestion events
- // congestion = rc_->inCongestionState();
+ // bool congestion = rc_->inCongestionState();
- if ((!fec_on_ && rtt >= 100) || (fec_on_ && rtt > 80) || congestion) {
+ if ((!fec_on_ && rtt >= MAX_RTT_BEFORE_FEC) ||
+ (fec_on_ && rtt > (MAX_RTT_BEFORE_FEC - 10))) {
// switch from rtx to fec or keep use fec. Notice that if some rtx are
// waiting to be scheduled, they will be sent normally, but no new rtx will
- // be created If the loss rate is 0 keep to use RTX.
+ // be created if the loss rate is 0 keep to use RTX.
uint32_t fec_to_ask = computeFecPacketsToAsk();
softSwitchToFec(fec_to_ask);
if (rtx_during_fec_ == 0) // if we do not send any RTX the losses
@@ -104,7 +106,8 @@ void RecoveryStrategyDelayBased::onNewRound(bool in_sync) {
return;
}
- if ((fec_on_ && rtt <= 80) || (!rtx_on_ && rtt <= 100)) {
+ if ((fec_on_ && rtt <= (MAX_RTT_BEFORE_FEC - 10)) ||
+ (!rtx_on_ && rtt <= MAX_RTT_BEFORE_FEC)) {
// turn on rtx
softSwitchToFec(0);
indexer_->setNFec(0);
diff --git a/libtransport/src/protocols/rtc/rtc_rs_delay.h b/libtransport/src/protocols/rtc/rtc_rs_delay.h
index 5ca90f4cb..9e1c41388 100644
--- a/libtransport/src/protocols/rtc/rtc_rs_delay.h
+++ b/libtransport/src/protocols/rtc/rtc_rs_delay.h
@@ -26,6 +26,7 @@ class RecoveryStrategyDelayBased : public RecoveryStrategy {
public:
RecoveryStrategyDelayBased(Indexer *indexer, SendRtxCallback &&callback,
asio::io_service &io_service,
+ interface::RtcTransportRecoveryStrategies rs_type,
interface::StrategyCallback &&external_callback);
RecoveryStrategyDelayBased(RecoveryStrategy &&rs);
diff --git a/libtransport/src/protocols/rtc/rtc_rs_fec_only.cc b/libtransport/src/protocols/rtc/rtc_rs_fec_only.cc
index c44212bda..5b10823ec 100644
--- a/libtransport/src/protocols/rtc/rtc_rs_fec_only.cc
+++ b/libtransport/src/protocols/rtc/rtc_rs_fec_only.cc
@@ -25,9 +25,10 @@ namespace rtc {
RecoveryStrategyFecOnly::RecoveryStrategyFecOnly(
Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service,
+ interface::RtcTransportRecoveryStrategies rs_type,
interface::StrategyCallback &&external_callback)
: RecoveryStrategy(indexer, std::move(callback), io_service, true, false,
- std::move(external_callback)),
+ rs_type, std::move(external_callback)),
congestion_state_(false),
probing_state_(false),
switch_rounds_(0) {}
diff --git a/libtransport/src/protocols/rtc/rtc_rs_fec_only.h b/libtransport/src/protocols/rtc/rtc_rs_fec_only.h
index 1ab78b842..42df25bd9 100644
--- a/libtransport/src/protocols/rtc/rtc_rs_fec_only.h
+++ b/libtransport/src/protocols/rtc/rtc_rs_fec_only.h
@@ -26,6 +26,7 @@ class RecoveryStrategyFecOnly : public RecoveryStrategy {
public:
RecoveryStrategyFecOnly(Indexer *indexer, SendRtxCallback &&callback,
asio::io_service &io_service,
+ interface::RtcTransportRecoveryStrategies rs_type,
interface::StrategyCallback &&external_callback);
RecoveryStrategyFecOnly(RecoveryStrategy &&rs);
diff --git a/libtransport/src/protocols/rtc/rtc_rs_low_rate.cc b/libtransport/src/protocols/rtc/rtc_rs_low_rate.cc
index 48dd3e34f..dbad563cd 100644
--- a/libtransport/src/protocols/rtc/rtc_rs_low_rate.cc
+++ b/libtransport/src/protocols/rtc/rtc_rs_low_rate.cc
@@ -25,8 +25,10 @@ namespace rtc {
RecoveryStrategyLowRate::RecoveryStrategyLowRate(
Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service,
+ interface::RtcTransportRecoveryStrategies rs_type,
interface::StrategyCallback &&external_callback)
: RecoveryStrategy(indexer, std::move(callback), io_service, false, true,
+ rs_type,
std::move(external_callback)), // start with fec
fec_consecutive_rounds_((MILLI_IN_A_SEC / ROUND_LEN) * 5), // 5 sec
rtx_allowed_consecutive_rounds_(0) {
@@ -75,7 +77,7 @@ void RecoveryStrategyLowRate::selectRecoveryStrategy(bool in_sync) {
}
uint32_t loss_rate = std::round(state_->getPerSecondLossRate() * 100);
- uint32_t rtt = state_->getAvgRTT();
+ uint32_t rtt = (uint32_t)state_->getAvgRTT();
bool use_rtx = false;
for (size_t i = 0; i < switch_vector.size(); i++) {
diff --git a/libtransport/src/protocols/rtc/rtc_rs_low_rate.h b/libtransport/src/protocols/rtc/rtc_rs_low_rate.h
index d66b197e2..0e76efaca 100644
--- a/libtransport/src/protocols/rtc/rtc_rs_low_rate.h
+++ b/libtransport/src/protocols/rtc/rtc_rs_low_rate.h
@@ -34,6 +34,7 @@ class RecoveryStrategyLowRate : public RecoveryStrategy {
public:
RecoveryStrategyLowRate(Indexer *indexer, SendRtxCallback &&callback,
asio::io_service &io_service,
+ interface::RtcTransportRecoveryStrategies rs_type,
interface::StrategyCallback &&external_callback);
RecoveryStrategyLowRate(RecoveryStrategy &&rs);
diff --git a/libtransport/src/protocols/rtc/rtc_rs_recovery_off.cc b/libtransport/src/protocols/rtc/rtc_rs_recovery_off.cc
index 16b14eff6..00c6a0504 100644
--- a/libtransport/src/protocols/rtc/rtc_rs_recovery_off.cc
+++ b/libtransport/src/protocols/rtc/rtc_rs_recovery_off.cc
@@ -25,9 +25,10 @@ namespace rtc {
RecoveryStrategyRecoveryOff::RecoveryStrategyRecoveryOff(
Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service,
+ interface::RtcTransportRecoveryStrategies rs_type,
interface::StrategyCallback &&external_callback)
: RecoveryStrategy(indexer, std::move(callback), io_service, false, false,
- std::move(external_callback)) {}
+ rs_type, std::move(external_callback)) {}
RecoveryStrategyRecoveryOff::RecoveryStrategyRecoveryOff(RecoveryStrategy &&rs)
: RecoveryStrategy(std::move(rs)) {
diff --git a/libtransport/src/protocols/rtc/rtc_rs_recovery_off.h b/libtransport/src/protocols/rtc/rtc_rs_recovery_off.h
index 3a9e71e7d..3d59cc473 100644
--- a/libtransport/src/protocols/rtc/rtc_rs_recovery_off.h
+++ b/libtransport/src/protocols/rtc/rtc_rs_recovery_off.h
@@ -26,6 +26,7 @@ class RecoveryStrategyRecoveryOff : public RecoveryStrategy {
public:
RecoveryStrategyRecoveryOff(Indexer *indexer, SendRtxCallback &&callback,
asio::io_service &io_service,
+ interface::RtcTransportRecoveryStrategies rs_type,
interface::StrategyCallback &&external_callback);
RecoveryStrategyRecoveryOff(RecoveryStrategy &&rs);
diff --git a/libtransport/src/protocols/rtc/rtc_rs_rtx_only.cc b/libtransport/src/protocols/rtc/rtc_rs_rtx_only.cc
index 8e5db5439..4d7cf7a82 100644
--- a/libtransport/src/protocols/rtc/rtc_rs_rtx_only.cc
+++ b/libtransport/src/protocols/rtc/rtc_rs_rtx_only.cc
@@ -25,9 +25,10 @@ namespace rtc {
RecoveryStrategyRtxOnly::RecoveryStrategyRtxOnly(
Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service,
+ interface::RtcTransportRecoveryStrategies rs_type,
interface::StrategyCallback &&external_callback)
: RecoveryStrategy(indexer, std::move(callback), io_service, true, false,
- std::move(external_callback)) {}
+ rs_type, std::move(external_callback)) {}
RecoveryStrategyRtxOnly::RecoveryStrategyRtxOnly(RecoveryStrategy &&rs)
: RecoveryStrategy(std::move(rs)) {
diff --git a/libtransport/src/protocols/rtc/rtc_rs_rtx_only.h b/libtransport/src/protocols/rtc/rtc_rs_rtx_only.h
index e90e5ba13..03dbed1c7 100644
--- a/libtransport/src/protocols/rtc/rtc_rs_rtx_only.h
+++ b/libtransport/src/protocols/rtc/rtc_rs_rtx_only.h
@@ -26,6 +26,7 @@ class RecoveryStrategyRtxOnly : public RecoveryStrategy {
public:
RecoveryStrategyRtxOnly(Indexer *indexer, SendRtxCallback &&callback,
asio::io_service &io_service,
+ interface::RtcTransportRecoveryStrategies rs_type,
interface::StrategyCallback &&external_callback);
RecoveryStrategyRtxOnly(RecoveryStrategy &&rs);
diff --git a/libtransport/src/protocols/rtc/rtc_state.cc b/libtransport/src/protocols/rtc/rtc_state.cc
index 5b3b5e4c3..82ac0b9c1 100644
--- a/libtransport/src/protocols/rtc/rtc_state.cc
+++ b/libtransport/src/protocols/rtc/rtc_state.cc
@@ -106,6 +106,7 @@ void RTCState::initParams() {
// paths stats
path_table_.clear();
main_path_ = nullptr;
+ edge_path_ = nullptr;
// packet cache (not pending anymore)
packet_cache_.clear();
@@ -231,11 +232,9 @@ void RTCState::onDataPacketReceived(const core::ContentObject &content_object,
}
updatePacketSize(content_object);
- updateReceivedBytes(content_object);
+ updateReceivedBytes(content_object, false);
addRecvOrLost(seq, PacketState::RECEIVED);
- if (seq > highest_seq_received_) highest_seq_received_ = seq;
-
// the producer is responding
// it is generating valid data packets so we consider it active
producer_is_active_ = true;
@@ -245,11 +244,7 @@ void RTCState::onDataPacketReceived(const core::ContentObject &content_object,
void RTCState::onFecPacketReceived(const core::ContentObject &content_object) {
uint32_t seq = content_object.getName().getSuffix();
- // updateReceivedBytes(content_object);
- received_fec_bytes_ +=
- (uint32_t)(content_object.headerSize() + content_object.payloadSize());
-
- if (seq > highest_seq_received_) highest_seq_received_ = seq;
+ updateReceivedBytes(content_object, true);
PacketState state = getPacketState(seq);
if (state != PacketState::LOST) {
@@ -328,12 +323,14 @@ void RTCState::onPacketLost(uint32_t seq) {
DLOG_IF(INFO, VLOG_IS_ON(4)) << "packet " << seq << " is lost";
}
}
+
addRecvOrLost(seq, PacketState::DEFINITELY_LOST);
}
-void RTCState::onPacketRecoveredRtx(uint32_t seq) {
+void RTCState::onPacketRecoveredRtx(const core::ContentObject &content_object,
+ uint64_t rtt) {
+ uint32_t seq = content_object.getName().getSuffix();
packets_sent_to_app_++;
- if (seq > highest_seq_received_) highest_seq_received_ = seq;
// increase the recovered packet counter only if the packet was marked as LOST
// before.
@@ -341,13 +338,37 @@ void RTCState::onPacketRecoveredRtx(uint32_t seq) {
if (state == PacketState::LOST) losses_recovered_++;
addRecvOrLost(seq, PacketState::RECEIVED);
+ updateReceivedBytes(content_object, false);
+
+ if (rtt == 0) return; // nothing to do
+
+ uint32_t path_label = content_object.getPathLabel();
+ auto path_it = path_table_.find(path_label);
+ if (path_it == path_table_.end()) {
+ // this is a new path and it must be a cache
+ std::shared_ptr<RTCDataPath> newPath =
+ std::make_shared<RTCDataPath>(path_label);
+ auto ret = path_table_.insert(
+ std::pair<uint32_t, std::shared_ptr<RTCDataPath>>(path_label, newPath));
+ path_it = ret.first;
+ }
+
+ auto path = path_it->second;
+ if (path->pathToProducer())
+ return; // this packet is coming from a producer
+ // even if we sent an RTX. this may happen
+ // for RTX that are sent too fast or in
+ // case of multipath
+
+ path->insertRttSample(utils::SteadyTime::Milliseconds(rtt), true);
}
-void RTCState::onFecPacketRecoveredRtx(uint32_t seq) {
+void RTCState::onFecPacketRecoveredRtx(
+ const core::ContentObject &content_object) {
// This is the same as onPacketRecoveredRtx, but in this is case the
// pkt is also a FEC pkt, the addRecvOrLost will be called afterwards
- if (seq > highest_seq_received_) highest_seq_received_ = seq;
losses_recovered_++;
+ updateReceivedBytes(content_object, true);
}
void RTCState::onPacketRecoveredFec(uint32_t seq, uint32_t size) {
@@ -355,8 +376,6 @@ void RTCState::onPacketRecoveredFec(uint32_t seq, uint32_t size) {
packets_sent_to_app_++;
recovered_bytes_with_fec_ += size;
- if (seq > highest_seq_received_) highest_seq_received_ = seq;
-
// adding header to the count
recovered_bytes_with_fec_ += 60; // XXX get header size some where
@@ -487,21 +506,32 @@ void RTCState::onNewRound(double round_len, bool in_sync) {
// channel losses
uint32_t last_round_packets = 0;
+ uint64_t min_edge_rtt = UINT_MAX;
std::shared_ptr<RTCDataPath> old_main_path = main_path_;
main_path_ = nullptr;
+ edge_path_ = nullptr;
for (auto it = path_table_.begin(); it != path_table_.end(); it++) {
- if (it->second->isActive()) {
+ if (it->second->isValidProducer()) {
uint32_t pkt = it->second->getPacketsLastRound();
if (pkt > last_round_packets) {
last_round_packets = pkt;
main_path_ = it->second;
}
+ } else if (it->second->isActive() && !it->second->pathToProducer()) {
+ // this is a path to a cache from where we are receiving content
+ if (it->second->getMinRtt() < min_edge_rtt) {
+ min_edge_rtt = it->second->getMinRtt();
+ edge_path_ = it->second;
+ }
}
it->second->roundEnd();
}
if (main_path_ == nullptr) main_path_ = old_main_path;
+ if (edge_path_ == nullptr) edge_path_ = main_path_;
+ if (edge_path_->getMinRtt() >= main_path_->getMinRtt())
+ edge_path_ = main_path_;
// in case we get a new main path we reset the stats of the old one. this is
// beacuse, in case we need to switch back we don't what to take decisions on
@@ -551,9 +581,15 @@ void RTCState::onNewRound(double round_len, bool in_sync) {
rounds_++;
}
-void RTCState::updateReceivedBytes(const core::ContentObject &content_object) {
- received_bytes_ +=
- (uint32_t)(content_object.headerSize() + content_object.payloadSize());
+void RTCState::updateReceivedBytes(const core::ContentObject &content_object,
+ bool isFec) {
+ if (isFec) {
+ received_fec_bytes_ +=
+ (uint32_t)(content_object.headerSize() + content_object.payloadSize());
+ } else {
+ received_bytes_ +=
+ (uint32_t)(content_object.headerSize() + content_object.payloadSize());
+ }
}
void RTCState::updatePacketSize(const core::ContentObject &content_object) {
@@ -703,6 +739,10 @@ void RTCState::dataToBeReceived(uint32_t seq) {
addToPacketCache(seq, PacketState::TO_BE_RECEIVED);
}
+void RTCState::updateHighestSeqReceived(uint32_t seq) {
+ if (seq > highest_seq_received_) highest_seq_received_ = seq;
+}
+
void RTCState::addRecvOrLost(uint32_t seq, PacketState state) {
auto it = pending_interests_.find(seq);
if (it != pending_interests_.end()) {
@@ -803,7 +843,7 @@ core::ParamsRTC RTCState::getProbeParams(const core::ContentObject &probe) {
switch (ProbeHandler::getProbeType(seq)) {
case ProbeType::INIT: {
core::ContentObjectManifest manifest(
- const_cast<core::ContentObject &>(probe));
+ const_cast<core::ContentObject &>(probe).shared_from_this());
manifest.decode();
params = manifest.getParamsRTC();
break;
@@ -841,7 +881,7 @@ core::ParamsRTC RTCState::getDataParams(const core::ContentObject &data) {
}
case core::PayloadType::MANIFEST: {
core::ContentObjectManifest manifest(
- const_cast<core::ContentObject &>(data));
+ const_cast<core::ContentObject &>(data).shared_from_this());
manifest.decode();
params = manifest.getParamsRTC();
break;
diff --git a/libtransport/src/protocols/rtc/rtc_state.h b/libtransport/src/protocols/rtc/rtc_state.h
index 4bd2f76a0..ac3cc621f 100644
--- a/libtransport/src/protocols/rtc/rtc_state.h
+++ b/libtransport/src/protocols/rtc/rtc_state.h
@@ -84,8 +84,9 @@ class RTCState : public std::enable_shared_from_this<RTCState> {
void onNackPacketReceived(const core::ContentObject &nack,
bool compute_stats);
void onPacketLost(uint32_t seq);
- void onPacketRecoveredRtx(uint32_t seq);
- void onFecPacketRecoveredRtx(uint32_t seq);
+ void onPacketRecoveredRtx(const core::ContentObject &content_object,
+ uint64_t rtt);
+ void onFecPacketRecoveredRtx(const core::ContentObject &content_object);
void onPacketRecoveredFec(uint32_t seq, uint32_t size);
bool onProbePacketReceived(const core::ContentObject &probe);
void onJumpForward(uint32_t next_seq);
@@ -117,6 +118,11 @@ class RTCState : public std::enable_shared_from_this<RTCState> {
return 0;
}
+ uint64_t getEdgeRtt() const {
+ if (edge_path_ != nullptr) return edge_path_->getMinRtt();
+ return 0;
+ }
+
void resetRttStats() {
if (mainPathIsValid()) main_path_->clearRtt();
}
@@ -149,7 +155,7 @@ class RTCState : public std::enable_shared_from_this<RTCState> {
}
uint32_t getPendingInterestNumber() const {
- return pending_interests_.size();
+ return (uint32_t)pending_interests_.size();
}
PacketState getPacketState(uint32_t seq) {
@@ -242,6 +248,8 @@ class RTCState : public std::enable_shared_from_this<RTCState> {
// set it as TO_BE_RECEIVED.
void dataToBeReceived(uint32_t seq);
+ void updateHighestSeqReceived(uint32_t seq);
+
// Extract RTC parameters from probes (init or RTT probes) and data packets.
static core::ParamsRTC getProbeParams(const core::ContentObject &probe);
static core::ParamsRTC getDataParams(const core::ContentObject &data);
@@ -259,7 +267,8 @@ class RTCState : public std::enable_shared_from_this<RTCState> {
// update stats
void updateState();
- void updateReceivedBytes(const core::ContentObject &content_object);
+ void updateReceivedBytes(const core::ContentObject &content_object,
+ bool isFec);
void updatePacketSize(const core::ContentObject &content_object);
void updatePathStats(const core::ContentObject &content_object, bool is_nack);
void updateLossRate(bool in_sycn);
@@ -360,7 +369,12 @@ class RTCState : public std::enable_shared_from_this<RTCState> {
// paths stats
std::unordered_map<uint32_t, std::shared_ptr<RTCDataPath>> path_table_;
- std::shared_ptr<RTCDataPath> main_path_;
+ std::shared_ptr<RTCDataPath> main_path_; // this is the path that connects
+ // the consumer to the producer. in
+ // case of multipath the trasnport
+ // uses the most active path
+ std::shared_ptr<RTCDataPath> edge_path_; // path to the closest cache if it
+ // exists
// packet received
// cache where to store info about the last MAX_CACHED_PACKETS
diff --git a/libtransport/src/protocols/rtc/rtc_verifier.cc b/libtransport/src/protocols/rtc/rtc_verifier.cc
index 7b6330a1f..861ceee89 100644
--- a/libtransport/src/protocols/rtc/rtc_verifier.cc
+++ b/libtransport/src/protocols/rtc/rtc_verifier.cc
@@ -22,11 +22,11 @@ namespace protocol {
namespace rtc {
RTCVerifier::RTCVerifier(std::shared_ptr<auth::Verifier> verifier,
- uint32_t max_unverified_interval,
- double max_unverified_ratio)
+ uint32_t factor_relevant, uint32_t factor_alert)
: verifier_(verifier),
- max_unverified_interval_(max_unverified_interval),
- max_unverified_ratio_(max_unverified_ratio) {}
+ factor_relevant_(factor_relevant),
+ factor_alert_(factor_alert),
+ manifest_max_capacity_(std::numeric_limits<uint8_t>::max()) {}
void RTCVerifier::setState(std::shared_ptr<RTCState> rtc_state) {
rtc_state_ = rtc_state;
@@ -36,12 +36,16 @@ void RTCVerifier::setVerifier(std::shared_ptr<auth::Verifier> verifier) {
verifier_ = verifier;
}
-void RTCVerifier::setMaxUnverifiedInterval(uint32_t max_unverified_interval) {
- max_unverified_interval_ = max_unverified_interval;
+void RTCVerifier::setFactorRelevant(uint32_t factor_relevant) {
+ factor_relevant_ = factor_relevant;
}
-void RTCVerifier::setMaxUnverifiedRatio(double max_unverified_ratio) {
- max_unverified_ratio_ = max_unverified_ratio;
+void RTCVerifier::setFactorAlert(uint32_t factor_alert) {
+ factor_alert_ = factor_alert;
+}
+
+auth::VerificationPolicy RTCVerifier::verify(core::Interest &interest) {
+ return verifier_->verifyPackets(&interest);
}
auth::VerificationPolicy RTCVerifier::verify(
@@ -108,19 +112,27 @@ auth::VerificationPolicy RTCVerifier::verifyData(
auth::Suffix suffix = content_object.getName().getSuffix();
auth::VerificationPolicy policy = auth::VerificationPolicy::ABORT;
- Timestamp now = utils::SteadyTime::nowMs().count();
- // Flush old packets
- Timestamp oldest = flush_packets(now);
+ uint32_t threshold_relevant = factor_relevant_ * manifest_max_capacity_;
+ uint32_t threshold_alert = factor_alert_ * manifest_max_capacity_;
- // Add packet to map of unverified packets
- packets_unverif_.add(
- {.suffix = suffix, .timestamp = now, .size = content_object.length()},
- content_object.computeDigest(manifest_hash_algo_));
+ // Flush packets outside relevance window
+ for (auto it = packets_unverif_.set().begin();
+ it != packets_unverif_.set().end();) {
+ if (it->first > current_index_ - threshold_relevant) {
+ break;
+ }
+ packets_unverif_erased_.insert((unsigned int)it->first);
+ it = packets_unverif_.remove(it);
+ }
+
+ // Add packet to set of unverified packets
+ packets_unverif_.add({current_index_, suffix},
+ content_object.computeDigest(manifest_hash_algo_));
+ current_index_++;
- // Check that the ratio of unverified packets stays below the limit
- if (now - oldest < max_unverified_interval_ ||
- getBufferRatio() < max_unverified_ratio_) {
+ // Check that the number of unverified packets is below the alert threshold
+ if (packets_unverif_.set().size() <= threshold_alert) {
policy = auth::VerificationPolicy::ACCEPT;
}
@@ -139,18 +151,13 @@ auth::VerificationPolicy RTCVerifier::processManifest(
auth::VerificationPolicy accept_policy = auth::VerificationPolicy::ACCEPT;
// Decode manifest
- core::ContentObjectManifest manifest(content_object);
+ core::ContentObjectManifest manifest(content_object.shared_from_this());
manifest.decode();
- // Update last manifest
- if (suffix > last_manifest_) {
- last_manifest_ = suffix;
- }
-
- // Extract hash algorithm and hashes
+ // Extract manifest data
+ manifest_max_capacity_ = manifest.getMaxCapacity();
manifest_hash_algo_ = manifest.getHashAlgorithm();
- auth::Verifier::SuffixMap suffix_map =
- core::ContentObjectManifest::getSuffixMap(&manifest);
+ auth::Verifier::SuffixMap suffix_map = manifest.getSuffixMap();
// Return early if the manifest is empty
if (suffix_map.empty()) {
@@ -186,10 +193,7 @@ auth::VerificationPolicy RTCVerifier::processManifest(
for (const auto &p : policies) {
switch (p.second) {
case auth::VerificationPolicy::ACCEPT: {
- auto packet_unverif_it = packets_unverif_.packetIt(p.first);
- Packet packet_verif = *packet_unverif_it;
- packets_unverif_.remove(packet_unverif_it);
- packets_verif_.add(packet_verif);
+ packets_unverif_.remove(packets_unverif_.packet(p.first));
manifest_digests_.erase(p.first);
break;
}
@@ -209,69 +213,20 @@ void RTCVerifier::onDataRecoveredFec(uint32_t suffix) {
manifest_digests_.erase(suffix);
}
-void RTCVerifier::onJumpForward(uint32_t next_suffix) {
- if (next_suffix <= last_manifest_ + 1) {
- return;
- }
-
- // When we jump forward in the suffix sequence, we remove packets that won't
- // be verified. Those packets have a suffix in the range [last_manifest_ + 1,
- // next_suffix[.
- for (auth::Suffix suffix = last_manifest_ + 1; suffix < next_suffix;
- ++suffix) {
- auto packet_it = packets_unverif_.packetIt(suffix);
- if (packet_it != packets_unverif_.set().end()) {
- packets_unverif_.remove(packet_it);
- }
- }
-}
-
-double RTCVerifier::getBufferRatio() const {
- size_t total = packets_verif_.size() + packets_unverif_.size();
- double total_unverified = static_cast<double>(packets_unverif_.size());
- return total ? total_unverified / total : 0.0;
-}
-
-RTCVerifier::Timestamp RTCVerifier::flush_packets(Timestamp now) {
- Timestamp oldest_verified = packets_verif_.set().empty()
- ? now
- : packets_verif_.set().begin()->timestamp;
- Timestamp oldest_unverified = packets_unverif_.set().empty()
- ? now
- : packets_unverif_.set().begin()->timestamp;
-
- // Prune verified packets older than the unverified interval
- for (auto it = packets_verif_.set().begin();
- it != packets_verif_.set().end();) {
- if (now - it->timestamp < max_unverified_interval_) {
- break;
- }
- it = packets_verif_.remove(it);
- }
-
- // Prune unverified packets older than the unverified interval
- for (auto it = packets_unverif_.set().begin();
- it != packets_unverif_.set().end();) {
- if (now - it->timestamp < max_unverified_interval_) {
- break;
- }
- packets_unverif_erased_.insert(it->suffix);
- it = packets_unverif_.remove(it);
- }
-
- return std::min(oldest_verified, oldest_unverified);
-}
-
std::pair<RTCVerifier::PacketSet::iterator, bool> RTCVerifier::Packets::add(
- const Packet &packet) {
+ const Packet &packet, const auth::CryptoHash &digest) {
auto inserted = packets_.insert(packet);
- size_ += inserted.second ? packet.size : 0;
+ if (inserted.second) {
+ packets_map_[packet.second] = inserted.first;
+ suffix_map_[packet.second] = digest;
+ }
return inserted;
}
RTCVerifier::PacketSet::iterator RTCVerifier::Packets::remove(
PacketSet::iterator packet_it) {
- size_ -= packet_it->size;
+ packets_map_.erase(packet_it->second);
+ suffix_map_.erase(packet_it->second);
return packets_.erase(packet_it);
}
@@ -279,35 +234,13 @@ const std::set<RTCVerifier::Packet> &RTCVerifier::Packets::set() const {
return packets_;
};
-size_t RTCVerifier::Packets::size() const { return size_; };
-
-std::pair<RTCVerifier::PacketSet::iterator, bool>
-RTCVerifier::PacketsUnverif::add(const Packet &packet,
- const auth::CryptoHash &digest) {
- auto inserted = add(packet);
- if (inserted.second) {
- packets_map_[packet.suffix] = inserted.first;
- digests_map_[packet.suffix] = digest;
- }
- return inserted;
-}
-
-RTCVerifier::PacketSet::iterator RTCVerifier::PacketsUnverif::remove(
- PacketSet::iterator packet_it) {
- size_ -= packet_it->size;
- packets_map_.erase(packet_it->suffix);
- digests_map_.erase(packet_it->suffix);
- return packets_.erase(packet_it);
-}
-
-RTCVerifier::PacketSet::iterator RTCVerifier::PacketsUnverif::packetIt(
+RTCVerifier::PacketSet::iterator RTCVerifier::Packets::packet(
auth::Suffix suffix) {
return packets_map_.at(suffix);
};
-const auth::Verifier::SuffixMap &RTCVerifier::PacketsUnverif::suffixMap()
- const {
- return digests_map_;
+const auth::Verifier::SuffixMap &RTCVerifier::Packets::suffixMap() const {
+ return suffix_map_;
}
} // end namespace rtc
diff --git a/libtransport/src/protocols/rtc/rtc_verifier.h b/libtransport/src/protocols/rtc/rtc_verifier.h
index 098984057..c83faf08a 100644
--- a/libtransport/src/protocols/rtc/rtc_verifier.h
+++ b/libtransport/src/protocols/rtc/rtc_verifier.h
@@ -27,19 +27,16 @@ namespace rtc {
class RTCVerifier {
public:
explicit RTCVerifier(std::shared_ptr<auth::Verifier> verifier,
- uint32_t max_unverified_interval,
- double max_unverified_ratio);
+ uint32_t factor_relevant, uint32_t factor_alert);
virtual ~RTCVerifier() = default;
void setState(std::shared_ptr<RTCState> rtc_state);
-
void setVerifier(std::shared_ptr<auth::Verifier> verifier);
+ void setFactorRelevant(uint32_t factor_relevant);
+ void setFactorAlert(uint32_t factor_alert);
- void setMaxUnverifiedInterval(uint32_t max_unverified_interval);
-
- void setMaxUnverifiedRatio(double max_unverified_ratio);
-
+ auth::VerificationPolicy verify(core::Interest &interest);
auth::VerificationPolicy verify(core::ContentObject &content_object,
bool is_fec = false);
auth::VerificationPolicy verifyProbe(core::ContentObject &content_object);
@@ -51,81 +48,47 @@ class RTCVerifier {
auth::VerificationPolicy processManifest(core::ContentObject &content_object);
void onDataRecoveredFec(uint32_t suffix);
- void onJumpForward(uint32_t next_suffix);
-
- double getBufferRatio() const;
protected:
- struct Packet;
- using Timestamp = uint64_t;
+ using Index = uint64_t;
+ using Packet = std::pair<Index, auth::Suffix>;
using PacketSet = std::set<Packet>;
- struct Packet {
- auth::Suffix suffix;
- Timestamp timestamp;
- size_t size;
-
- bool operator==(const Packet &b) const {
- return timestamp == b.timestamp && suffix == b.suffix;
- }
- bool operator<(const Packet &b) const {
- return timestamp == b.timestamp ? suffix < b.suffix
- : timestamp < b.timestamp;
- }
- };
-
class Packets {
public:
- virtual std::pair<PacketSet::iterator, bool> add(const Packet &packet);
- virtual PacketSet::iterator remove(PacketSet::iterator packet_it);
- const PacketSet &set() const;
- size_t size() const;
-
- protected:
- PacketSet packets_;
- size_t size_;
- };
-
- class PacketsVerif : public Packets {};
-
- class PacketsUnverif : public Packets {
- public:
- using Packets::add;
std::pair<PacketSet::iterator, bool> add(const Packet &packet,
const auth::CryptoHash &digest);
- PacketSet::iterator remove(PacketSet::iterator packet_it) override;
- PacketSet::iterator packetIt(auth::Suffix suffix);
+ PacketSet::iterator remove(PacketSet::iterator packet_it);
+ const PacketSet &set() const;
+ PacketSet::iterator packet(auth::Suffix suffix);
const auth::Verifier::SuffixMap &suffixMap() const;
private:
+ PacketSet packets_;
std::unordered_map<auth::Suffix, PacketSet::iterator> packets_map_;
- auth::Verifier::SuffixMap digests_map_;
+ auth::Verifier::SuffixMap suffix_map_;
};
// The RTC state.
std::shared_ptr<RTCState> rtc_state_;
// The verifier instance.
std::shared_ptr<auth::Verifier> verifier_;
- // Window to consider when verifying packets.
- uint32_t max_unverified_interval_;
- // Ratio of unverified packets over which an alert is triggered.
- double max_unverified_ratio_;
- // The suffix of the last processed manifest.
- auth::Suffix last_manifest_;
+ // Used to compute the relevance windows size (in packets).
+ uint32_t factor_relevant_;
+ // Used to compute the alert threshold (in packets).
+ uint32_t factor_alert_;
+ // The maximum number of entries a manifest can contain.
+ uint8_t manifest_max_capacity_;
// Hash algorithm used by manifests.
auth::CryptoHashType manifest_hash_algo_;
// Digests extracted from all manifests received.
auth::Verifier::SuffixMap manifest_digests_;
- // Verified packets with timestamp >= now - max_unverified_interval_.
- PacketsVerif packets_verif_;
- // Unverified packets with timestamp >= now - max_unverified_interval_.
- PacketsUnverif packets_unverif_;
- // Unverified erased packets with timestamp < now - max_unverified_interval_.
+ // The number of data packets processed.
+ Index current_index_;
+ // Unverified packets with index in relevance window.
+ Packets packets_unverif_;
+ // Unverified erased packets with index outside relevance window.
std::unordered_set<auth::Suffix> packets_unverif_erased_;
-
- // Flushes all packets with timestamp < now - max_unverified_interval_.
- // Returns the timestamp of the oldest packet, verified or not.
- Timestamp flush_packets(Timestamp now);
};
} // namespace rtc