aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/protocols/rtc
diff options
context:
space:
mode:
authorLuca Muscariello <lumuscar@cisco.com>2022-03-30 22:29:28 +0200
committerMauro Sardara <msardara@cisco.com>2022-03-31 19:51:47 +0200
commitc46e5df56b67bb8ea7a068d39324c640084ead2b (patch)
treeeddeb17785938e09bc42eec98ee09b8a28846de6 /libtransport/src/protocols/rtc
parent18fa668f25d3cc5463417ce7df6637e31578e898 (diff)
feat: boostrap hicn 22.02
The current patch provides several new features, improvements, bug fixes and also complete rewrite of entire components. - lib The hicn packet parser has been improved with a new packet format fully based on UDP. The TCP header is still temporarily supported but the UDP header will replace completely the new hicn packet format. Improvements have been made to make sure every packet parsing operation is made via this library. The current new header can be used as header between the payload and the UDP header or as trailer in the UDP surplus area to be tested when UDP options will start to be used. - hicn-light The portable packet forwarder has been completely rewritten from scratch with the twofold objective to improve performance and code size but also to drop dependencies such as libparc which is now removed by the current implementation. - hicn control the control library is the agent that is used to program the packet forwarders via their binary API. This component has benefited from significant improvements in terms of interaction model which is now event driven and more robust to failures. - VPP plugin has been updated to support VPP 22.02 - transport Major improvement have been made to the RTC protocol, to the support of IO modules and to the security sub system. Signed manifests are the default data authenticity and integrity framework. Confidentiality can be enabled by sharing the encryption key to the prod/cons layer. The library has been tested with group key based applications such as broadcast/multicast and real-time on-line meetings with trusted server keys or MLS. - testing Unit testing has been introduced using GoogleTest. One third of the code base is covered by unit testing with priority on critical features. Functional testing has also been introduce using Docker, linux bridging and Robot Framework to define test with Less Code techniques to facilitate the extension of the coverage. Co-authored-by: Mauro Sardara <msardara@cisco.com> Co-authored-by: Jordan Augé <jordan.auge+fdio@cisco.com> Co-authored-by: Michele Papalini <micpapal@cisco.com> Co-authored-by: Angelo Mantellini <manangel@cisco.com> Co-authored-by: Jacques Samain <jsamain@cisco.com> Co-authored-by: Olivier Roques <oroques+fdio@cisco.com> Co-authored-by: Enrico Loparco <eloparco@cisco.com> Co-authored-by: Giulio Grassi <gigrassi@cisco.com> Change-Id: I75d0ef70f86d921e3ef503c99271216ff583c215 Signed-off-by: Luca Muscariello <muscariello@ieee.org> Signed-off-by: Mauro Sardara <msardara@cisco.com>
Diffstat (limited to 'libtransport/src/protocols/rtc')
-rw-r--r--libtransport/src/protocols/rtc/CMakeLists.txt23
-rw-r--r--libtransport/src/protocols/rtc/probe_handler.cc38
-rw-r--r--libtransport/src/protocols/rtc/probe_handler.h32
-rw-r--r--libtransport/src/protocols/rtc/rtc.cc616
-rw-r--r--libtransport/src/protocols/rtc/rtc.h24
-rw-r--r--libtransport/src/protocols/rtc/rtc_consts.h47
-rw-r--r--libtransport/src/protocols/rtc/rtc_data_path.cc99
-rw-r--r--libtransport/src/protocols/rtc/rtc_data_path.h26
-rw-r--r--libtransport/src/protocols/rtc/rtc_forwarding_strategy.cc155
-rw-r--r--libtransport/src/protocols/rtc/rtc_forwarding_strategy.h78
-rw-r--r--libtransport/src/protocols/rtc/rtc_indexer.h45
-rw-r--r--libtransport/src/protocols/rtc/rtc_ldr.cc539
-rw-r--r--libtransport/src/protocols/rtc/rtc_ldr.h100
-rw-r--r--libtransport/src/protocols/rtc/rtc_packet.h163
-rw-r--r--libtransport/src/protocols/rtc/rtc_rc.h12
-rw-r--r--libtransport/src/protocols/rtc/rtc_rc_congestion_detection.cc74
-rw-r--r--libtransport/src/protocols/rtc/rtc_rc_congestion_detection.h47
-rw-r--r--libtransport/src/protocols/rtc/rtc_rc_iat.cc287
-rw-r--r--libtransport/src/protocols/rtc/rtc_rc_iat.h93
-rw-r--r--libtransport/src/protocols/rtc/rtc_rc_queue.cc6
-rw-r--r--libtransport/src/protocols/rtc/rtc_rc_queue.h5
-rw-r--r--libtransport/src/protocols/rtc/rtc_reassembly.cc109
-rw-r--r--libtransport/src/protocols/rtc/rtc_reassembly.h21
-rw-r--r--libtransport/src/protocols/rtc/rtc_recovery_strategy.cc418
-rw-r--r--libtransport/src/protocols/rtc/rtc_recovery_strategy.h154
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_delay.cc122
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_delay.h53
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_fec_only.cc118
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_fec_only.h51
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_low_rate.cc167
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_low_rate.h69
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_recovery_off.cc60
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_recovery_off.h44
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_rtx_only.cc61
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_rtx_only.h44
-rw-r--r--libtransport/src/protocols/rtc/rtc_state.cc486
-rw-r--r--libtransport/src/protocols/rtc/rtc_state.h150
-rw-r--r--libtransport/src/protocols/rtc/rtc_verifier.cc238
-rw-r--r--libtransport/src/protocols/rtc/rtc_verifier.h79
39 files changed, 4014 insertions, 939 deletions
diff --git a/libtransport/src/protocols/rtc/CMakeLists.txt b/libtransport/src/protocols/rtc/CMakeLists.txt
index 873b345d0..be8e0189c 100644
--- a/libtransport/src/protocols/rtc/CMakeLists.txt
+++ b/libtransport/src/protocols/rtc/CMakeLists.txt
@@ -1,4 +1,4 @@
-# Copyright (c) 2017-2019 Cisco and/or its affiliates.
+# Copyright (c) 2021 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
@@ -16,22 +16,43 @@ list(APPEND HEADER_FILES
${CMAKE_CURRENT_SOURCE_DIR}/rtc.h
${CMAKE_CURRENT_SOURCE_DIR}/rtc_consts.h
${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_forwarding_strategy.h
${CMAKE_CURRENT_SOURCE_DIR}/rtc_indexer.h
${CMAKE_CURRENT_SOURCE_DIR}/rtc_ldr.h
${CMAKE_CURRENT_SOURCE_DIR}/rtc_packet.h
${CMAKE_CURRENT_SOURCE_DIR}/rtc_rc.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rc_congestion_detection.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rc_iat.h
${CMAKE_CURRENT_SOURCE_DIR}/rtc_rc_queue.h
${CMAKE_CURRENT_SOURCE_DIR}/rtc_reassembly.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_recovery_strategy.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rs_delay.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rs_fec_only.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rs_low_rate.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rs_recovery_off.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rs_rtx_only.h
${CMAKE_CURRENT_SOURCE_DIR}/rtc_state.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_verifier.h
)
list(APPEND SOURCE_FILES
${CMAKE_CURRENT_SOURCE_DIR}/probe_handler.cc
${CMAKE_CURRENT_SOURCE_DIR}/rtc.cc
${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_forwarding_strategy.cc
${CMAKE_CURRENT_SOURCE_DIR}/rtc_ldr.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rc_congestion_detection.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rc_iat.cc
${CMAKE_CURRENT_SOURCE_DIR}/rtc_rc_queue.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_reassembly.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_recovery_strategy.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rs_delay.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rs_fec_only.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rs_low_rate.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rs_recovery_off.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rs_rtx_only.cc
${CMAKE_CURRENT_SOURCE_DIR}/rtc_state.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_verifier.cc
)
set(SOURCE_FILES ${SOURCE_FILES} PARENT_SCOPE)
diff --git a/libtransport/src/protocols/rtc/probe_handler.cc b/libtransport/src/protocols/rtc/probe_handler.cc
index abaca6ad9..abb234757 100644
--- a/libtransport/src/protocols/rtc/probe_handler.cc
+++ b/libtransport/src/protocols/rtc/probe_handler.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -13,6 +13,7 @@
* limitations under the License.
*/
+#include <hicn/transport/utils/chrono_typedefs.h>
#include <protocols/rtc/probe_handler.h>
#include <protocols/rtc/rtc_consts.h>
@@ -27,6 +28,7 @@ ProbeHandler::ProbeHandler(SendProbeCallback &&send_callback,
: probe_interval_(0),
max_probes_(0),
sent_probes_(0),
+ recv_probes_(0),
probe_timer_(std::make_unique<asio::steady_timer>(io_service)),
rand_eng_((std::random_device())()),
distr_(MIN_RTT_PROBE_SEQ, MAX_RTT_PROBE_SEQ),
@@ -39,17 +41,25 @@ uint64_t ProbeHandler::getRtt(uint32_t seq) {
if (it == pending_probes_.end()) return 0;
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
+ uint64_t now = utils::SteadyTime::nowMs().count();
uint64_t rtt = now - it->second;
if (rtt < 1) rtt = 1;
pending_probes_.erase(it);
+ recv_probes_++;
return rtt;
}
+double ProbeHandler::getProbeLossRate() {
+ return 1.0 - ((double)recv_probes_ / (double)sent_probes_);
+}
+
+void ProbeHandler::setSuffixRange(uint32_t min, uint32_t max) {
+ assert(min <= max && min >= MIN_PROBE_SEQ);
+ distr_ = std::uniform_int_distribution<uint32_t>(min, max);
+}
+
void ProbeHandler::setProbes(uint32_t probe_interval, uint32_t max_probes) {
stopProbes();
probe_interval_ = probe_interval;
@@ -60,6 +70,7 @@ void ProbeHandler::stopProbes() {
probe_interval_ = 0;
max_probes_ = 0;
sent_probes_ = 0;
+ recv_probes_ = 0;
probe_timer_->cancel();
}
@@ -67,9 +78,7 @@ void ProbeHandler::sendProbes() {
if (probe_interval_ == 0) return;
if (max_probes_ != 0 && sent_probes_ >= max_probes_) return;
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
+ uint64_t now = utils::SteadyTime::nowMs().count();
uint32_t seq = distr_(rand_eng_);
pending_probes_.insert(std::pair<uint32_t, uint64_t>(seq, now));
@@ -92,14 +101,25 @@ void ProbeHandler::sendProbes() {
std::weak_ptr<ProbeHandler> self(shared_from_this());
probe_timer_->expires_from_now(std::chrono::microseconds(probe_interval_));
- probe_timer_->async_wait([self](std::error_code ec) {
+ probe_timer_->async_wait([self](const std::error_code &ec) {
if (ec) return;
- if (auto s = self.lock()) {
+ auto s = self.lock();
+ if (s) {
s->sendProbes();
}
});
}
+ProbeType ProbeHandler::getProbeType(uint32_t seq) {
+ if (MIN_INIT_PROBE_SEQ <= seq && seq <= MAX_INIT_PROBE_SEQ) {
+ return ProbeType::INIT;
+ }
+ if (MIN_RTT_PROBE_SEQ <= seq && seq <= MAX_RTT_PROBE_SEQ) {
+ return ProbeType::RTT;
+ }
+ return ProbeType::NOT_PROBE;
+}
+
} // namespace rtc
} // namespace protocol
diff --git a/libtransport/src/protocols/rtc/probe_handler.h b/libtransport/src/protocols/rtc/probe_handler.h
index e34b23df0..2de908176 100644
--- a/libtransport/src/protocols/rtc/probe_handler.h
+++ b/libtransport/src/protocols/rtc/probe_handler.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -26,6 +26,12 @@ namespace protocol {
namespace rtc {
+enum class ProbeType {
+ NOT_PROBE,
+ INIT,
+ RTT,
+};
+
class ProbeHandler : public std::enable_shared_from_this<ProbeHandler> {
public:
using SendProbeCallback = std::function<void(uint32_t)>;
@@ -35,31 +41,39 @@ class ProbeHandler : public std::enable_shared_from_this<ProbeHandler> {
~ProbeHandler();
- // if the function returns 0 the probe is not valaid
+ // If the function returns 0 the probe is not valid.
uint64_t getRtt(uint32_t seq);
- // reset the probes parameters. it stop the current probing.
- // to restar call sendProbes.
- // probe_interval = 0 means that no event will be scheduled
- // max_probe = 0 means no limit to the number of probe to send
+ // this function may return a residual loss rate higher than the real one if
+ // we don't wait enough time for the probes to come back
+ double getProbeLossRate();
+
+ // Set the probe suffix range [min, max]
+ void setSuffixRange(uint32_t min, uint32_t max);
+
+ // Reset the probes parameters and stops the current probing.
+ // probe_interval = 0 means that no event will be scheduled.
+ // max_probe = 0 means no limit to the number of probe to send.
void setProbes(uint32_t probe_interval, uint32_t max_probes);
- // stop to schedule probes
void stopProbes();
void sendProbes();
+ static ProbeType getProbeType(uint32_t seq);
+
private:
uint32_t probe_interval_; // us
uint32_t max_probes_; // packets
uint32_t sent_probes_; // packets
+ uint32_t recv_probes_; // packets
std::unique_ptr<asio::steady_timer> probe_timer_;
- // map from seqnumber to timestamp
+ // Map from packet suffixes to timestamp
std::unordered_map<uint32_t, uint64_t> pending_probes_;
- // random generator
+ // Random generator
std::default_random_engine rand_eng_;
std::uniform_int_distribution<uint32_t> distr_;
diff --git a/libtransport/src/protocols/rtc/rtc.cc b/libtransport/src/protocols/rtc/rtc.cc
index 0cb4cda1d..df6522471 100644
--- a/libtransport/src/protocols/rtc/rtc.cc
+++ b/libtransport/src/protocols/rtc/rtc.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -22,7 +22,7 @@
#include <protocols/rtc/rtc.h>
#include <protocols/rtc/rtc_consts.h>
#include <protocols/rtc/rtc_indexer.h>
-#include <protocols/rtc/rtc_rc_queue.h>
+#include <protocols/rtc/rtc_rc_congestion_detection.h>
#include <algorithm>
@@ -37,13 +37,15 @@ using namespace interface;
RTCTransportProtocol::RTCTransportProtocol(
implementation::ConsumerSocket *icn_socket)
: TransportProtocol(icn_socket, new RtcIndexer<>(icn_socket, this),
- new DatagramReassembly(icn_socket, this)),
+ new RtcReassembly(icn_socket, this)),
number_(0) {
icn_socket->getSocketOption(PORTAL, portal_);
- round_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
+ round_timer_ =
+ std::make_unique<asio::steady_timer>(portal_->getThread().getIoService());
scheduler_timer_ =
- std::make_unique<asio::steady_timer>(portal_->getIoService());
- pacing_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
+ std::make_unique<asio::steady_timer>(portal_->getThread().getIoService());
+ pacing_timer_ =
+ std::make_unique<asio::steady_timer>(portal_->getThread().getIoService());
}
RTCTransportProtocol::~RTCTransportProtocol() {}
@@ -61,25 +63,52 @@ std::size_t RTCTransportProtocol::transportHeaderLength() {
// private
void RTCTransportProtocol::initParams() {
TransportProtocol::reset();
+ fwd_strategy_.setCallback(on_fwd_strategy_);
- rc_ = std::make_shared<RTCRateControlQueue>();
+ std::weak_ptr<RTCTransportProtocol> self = shared_from_this();
+
+ std::shared_ptr<auth::Verifier> verifier;
+ socket_->getSocketOption(GeneralTransportOptions::VERIFIER, verifier);
+
+ uint32_t max_unverified_delay;
+ socket_->getSocketOption(GeneralTransportOptions::MAX_UNVERIFIED_TIME,
+ max_unverified_delay);
+
+ rc_ = std::make_shared<RTCRateControlCongestionDetection>();
ldr_ = std::make_shared<RTCLossDetectionAndRecovery>(
- indexer_verifier_.get(),
- std::bind(&RTCTransportProtocol::sendRtxInterest, this,
- std::placeholders::_1),
- portal_->getIoService());
+ indexer_verifier_.get(), portal_->getThread().getIoService(),
+ interface::RtcTransportRecoveryStrategies::RTX_ONLY,
+ [self](uint32_t seq) {
+ auto ptr = self.lock();
+ if (ptr && ptr->isRunning()) {
+ ptr->sendRtxInterest(seq);
+ }
+ },
+ on_rec_strategy_);
+ verifier_ = std::make_shared<RTCVerifier>(verifier, max_unverified_delay);
state_ = std::make_shared<RTCState>(
indexer_verifier_.get(),
- std::bind(&RTCTransportProtocol::sendProbeInterest, this,
- std::placeholders::_1),
- std::bind(&RTCTransportProtocol::discoveredRtt, this),
- portal_->getIoService());
+ [self](uint32_t seq) {
+ auto ptr = self.lock();
+ if (ptr && ptr->isRunning()) {
+ ptr->sendProbeInterest(seq);
+ }
+ },
+ [self]() {
+ auto ptr = self.lock();
+ if (ptr && ptr->isRunning()) {
+ ptr->discoveredRtt();
+ }
+ },
+ portal_->getThread().getIoService());
+ state_->initParams();
rc_->setState(state_);
- // TODO: for the moment we keep the congestion control disabled
- // rc_->tunrOnRateControl();
- ldr_->setState(state_);
+ rc_->turnOnRateControl();
+ ldr_->setState(state_.get());
+ ldr_->setRateControl(rc_.get());
+ verifier_->setState(state_);
// protocol state
start_send_interest_ = false;
@@ -102,6 +131,10 @@ void RTCTransportProtocol::initParams() {
}
#else
max_aggregated_interest_ = 1;
+ if (const char *max_aggr = std::getenv("MAX_AGGREGATED_INTERESTS")) {
+ LOG(INFO) << "Max Aggregated: " << max_aggr;
+ max_aggregated_interest_ = std::stoul(std::string(max_aggr));
+ }
#endif
max_sent_int_ =
@@ -131,6 +164,7 @@ void RTCTransportProtocol::initParams() {
indexer_verifier_->setNFec(0);
ldr_->setFecParams(fec::FECUtils::getBlockSymbols(fec_type_),
fec::FECUtils::getSourceSymbols(fec_type_));
+ fec_decoder_->setIOService(portal_->getThread().getIoService());
} else {
indexer_verifier_->disableFec();
}
@@ -162,61 +196,97 @@ void RTCTransportProtocol::inactiveProducer() {
void RTCTransportProtocol::newRound() {
round_timer_->expires_from_now(std::chrono::milliseconds(ROUND_LEN));
- // TODO pass weak_ptr here
- round_timer_->async_wait([this, n{number_}](std::error_code ec) {
- if (ec) return;
- if (n != number_) {
+ std::weak_ptr<RTCTransportProtocol> self = shared_from_this();
+ round_timer_->async_wait([self](const std::error_code &ec) {
+ if (ec) {
return;
}
+ auto ptr = self.lock();
+
+ if (!ptr || !ptr->isRunning()) {
+ return;
+ }
+
+ auto &state = ptr->state_;
+
// saving counters that will be reset on new round
- uint32_t sent_retx = state_->getSentRtxInRound();
- uint32_t received_bytes = state_->getReceivedBytesInRound();
- uint32_t sent_interest = state_->getSentInterestInRound();
- uint32_t lost_data = state_->getLostData();
- uint32_t definitely_lost = state_->getDefinitelyLostPackets();
- uint32_t recovered_losses = state_->getRecoveredLosses();
- uint32_t received_nacks = state_->getReceivedNacksInRound();
- uint32_t received_fec = state_->getReceivedFecPackets();
-
- bool in_sync = (current_state_ == SyncState::in_sync);
- ldr_->onNewRound(in_sync);
- state_->onNewRound((double)ROUND_LEN, in_sync);
- rc_->onNewRound((double)ROUND_LEN);
+ uint32_t sent_retx = state->getSentRtxInRound();
+ uint32_t received_bytes =
+ (state->getReceivedBytesInRound() + // data packets received
+ state->getReceivedFecBytesInRound()); // fec packets received
+ uint32_t sent_interest = state->getSentInterestInRound();
+ uint32_t lost_data = state->getLostData();
+ uint32_t definitely_lost = state->getDefinitelyLostPackets();
+ uint32_t recovered_losses = state->getRecoveredLosses();
+ uint32_t received_nacks = state->getReceivedNacksInRound();
+ uint32_t received_fec = state->getReceivedFecPackets();
+
+ bool in_sync = (ptr->current_state_ == SyncState::in_sync);
+ ptr->ldr_->onNewRound(in_sync);
+ ptr->state_->onNewRound((double)ROUND_LEN, in_sync);
+ ptr->rc_->onNewRound((double)ROUND_LEN);
// update sync state if needed
- if (current_state_ == SyncState::in_sync) {
- double cache_rate = state_->getPacketFromCacheRatio();
+ if (ptr->current_state_ == SyncState::in_sync) {
+ double cache_rate = state->getPacketFromCacheRatio();
if (cache_rate > MAX_DATA_FROM_CACHE) {
- current_state_ = SyncState::catch_up;
+ ptr->current_state_ = SyncState::catch_up;
}
} else {
- double target_rate = state_->getProducerRate() * PRODUCTION_RATE_FRACTION;
- double received_rate = state_->getReceivedRate();
- uint32_t round_without_nacks = state_->getRoundsWithoutNacks();
- double cache_ratio = state_->getPacketFromCacheRatio();
+ double target_rate = state->getProducerRate() * PRODUCTION_RATE_FRACTION;
+ double received_rate =
+ state->getReceivedRate() + state->getRecoveredFecRate();
+ uint32_t round_without_nacks = state->getRoundsWithoutNacks();
+ double cache_ratio = state->getPacketFromCacheRatio();
if (round_without_nacks >= ROUNDS_IN_SYNC_BEFORE_SWITCH &&
received_rate >= target_rate && cache_ratio < MAX_DATA_FROM_CACHE) {
- current_state_ = SyncState::in_sync;
+ ptr->current_state_ = SyncState::in_sync;
}
}
DLOG_IF(INFO, VLOG_IS_ON(3))
<< "Calling updateSyncWindow in newRound function";
- updateSyncWindow();
+ ptr->updateSyncWindow();
- sendStatsToApp(sent_retx, received_bytes, sent_interest, lost_data,
- definitely_lost, recovered_losses, received_nacks,
- received_fec);
- newRound();
+ ptr->sendStatsToApp(sent_retx, received_bytes, sent_interest, lost_data,
+ definitely_lost, recovered_losses, received_nacks,
+ received_fec);
+ ptr->fwd_strategy_.checkStrategy();
+ ptr->newRound();
});
}
void RTCTransportProtocol::discoveredRtt() {
start_send_interest_ = true;
- ldr_->turnOnRTX();
+ uint32_t strategy;
+ socket_->getSocketOption(RtcTransportOptions::RECOVERY_STRATEGY, strategy);
+ ldr_->changeRecoveryStrategy(
+ (interface::RtcTransportRecoveryStrategies)strategy);
+ ldr_->turnOnRecovery();
ldr_->onNewRound(false);
+
+ // set forwarding strategy switch if selected
+ Name *name = nullptr;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name);
+ Prefix prefix(*name, 128);
+ if ((interface::RtcTransportRecoveryStrategies)strategy ==
+ interface::RtcTransportRecoveryStrategies::LOW_RATE_AND_BESTPATH) {
+ fwd_strategy_.initFwdStrategy(portal_, prefix, state_.get(),
+ RTCForwardingStrategy::BEST_PATH);
+ } else if ((interface::RtcTransportRecoveryStrategies)strategy ==
+ interface::RtcTransportRecoveryStrategies::
+ LOW_RATE_AND_REPLICATION) {
+ fwd_strategy_.initFwdStrategy(portal_, prefix, state_.get(),
+ RTCForwardingStrategy::REPLICATION);
+ } else if ((interface::RtcTransportRecoveryStrategies)strategy ==
+ interface::RtcTransportRecoveryStrategies::
+ LOW_RATE_AND_ALL_FWD_STRATEGIES) {
+ fwd_strategy_.initFwdStrategy(portal_, prefix, state_.get(),
+ RTCForwardingStrategy::BOTH);
+ }
+
updateSyncWindow();
}
@@ -244,7 +314,7 @@ void RTCTransportProtocol::computeMaxSyncWindow() {
(production_rate * lifetime_ms * INTEREST_LIFETIME_REDUCTION_FACTOR) /
packet_size);
- max_sync_win_ = std::min(max_sync_win_, rc_->getCongesionWindow());
+ max_sync_win_ = std::min(max_sync_win_, rc_->getCongestionWindow());
}
void RTCTransportProtocol::updateSyncWindow() {
@@ -259,25 +329,14 @@ void RTCTransportProtocol::updateSyncWindow() {
}
double prod_rate = state_->getProducerRate();
- double rtt = (double)state_->getRTT() / MILLI_IN_A_SEC;
+ double rtt = (double)state_->getMinRTT() / MILLI_IN_A_SEC;
double packet_size = state_->getAveragePacketSize();
// if some of the info are not available do not update the current win
if (prod_rate != 0.0 && rtt != 0.0 && packet_size != 0.0) {
- double fec_interest_overhead = (double)state_->getPendingFecPackets() /
- (double)(state_->getPendingInterestNumber() -
- state_->getPendingFecPackets());
-
- double fec_overhead =
- std::max(indexer_verifier_->getFecOverhead(), fec_interest_overhead);
-
- prod_rate += (prod_rate * fec_overhead);
-
current_sync_win_ = (uint32_t)ceil(prod_rate * rtt / packet_size);
uint32_t buffer = PRODUCER_BUFFER_MS;
- if (rtt > 150)
- buffer = buffer * 2; // if the RTT is too large we increase the
- // the size of the buffer
+
current_sync_win_ +=
ceil(prod_rate * (buffer / MILLI_IN_A_SEC) / packet_size);
@@ -285,8 +344,17 @@ void RTCTransportProtocol::updateSyncWindow() {
current_sync_win_ = current_sync_win_ * CATCH_UP_WIN_INCREMENT;
}
+ uint32_t min_win = WIN_MIN;
+ bool aggregated_data_on;
+ socket_->getSocketOption(RtcTransportOptions::AGGREGATED_DATA,
+ aggregated_data_on);
+ if (aggregated_data_on) {
+ min_win = WIN_MIN_WITH_AGGREGARED_DATA;
+ min_win += (min_win * (1 - (std::max(0.3, rtt) - rtt) / 0.3));
+ }
+
current_sync_win_ = std::min(current_sync_win_, max_sync_win_);
- current_sync_win_ = std::max(current_sync_win_, WIN_MIN);
+ current_sync_win_ = std::max(current_sync_win_, min_win);
}
scheduleNextInterests();
@@ -322,7 +390,7 @@ void RTCTransportProtocol::sendProbeInterest(uint32_t seq) {
socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
&interest_name);
- DLOG_IF(INFO, VLOG_IS_ON(3)) << "send probe " << seq;
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Send probe " << seq;
interest_name->setSuffix(seq);
sendInterest(*interest_name);
}
@@ -330,13 +398,18 @@ void RTCTransportProtocol::sendProbeInterest(uint32_t seq) {
void RTCTransportProtocol::scheduleNextInterests() {
DLOG_IF(INFO, VLOG_IS_ON(3)) << "Schedule next interests";
- if (!isRunning() && !is_first_) return;
+ if (!isRunning() && !is_first_) {
+ return;
+ }
- if (pacing_timer_on_) return; // wait pacing timer for the next send
+ if (pacing_timer_on_) {
+ return; // wait pacing timer for the next send
+ }
- if (!start_send_interest_)
+ if (!start_send_interest_) {
return; // RTT discovering phase is not finished so
// do not start to send interests
+ }
if (TRANSPORT_EXPECT_FALSE(!state_->isProducerActive())) {
DLOG_IF(INFO, VLOG_IS_ON(3)) << "Inactive producer.";
@@ -352,7 +425,7 @@ void RTCTransportProtocol::scheduleNextInterests() {
&interest_name);
uint32_t next_seg = 0;
- DLOG_IF(INFO, VLOG_IS_ON(3)) << "send interest " << next_seg;
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Send interest " << next_seg;
interest_name->setSuffix(next_seg);
if (portal_->interestIsPending(*interest_name)) {
@@ -370,28 +443,37 @@ void RTCTransportProtocol::scheduleNextInterests() {
<< " -- current_sync_win_: " << current_sync_win_;
uint32_t pending = state_->getPendingInterestNumber();
- if (pending >= current_sync_win_) return; // no space in the window
+ uint32_t pending_fec = state_->getPendingFecPackets();
+
+ if ((pending - pending_fec) >= current_sync_win_)
+ return; // no space in the window
- if ((current_sync_win_ - pending) < max_aggregated_interest_) {
+ // XXX double check if aggregated interests are still working here
+ if ((current_sync_win_ - (pending - pending_fec)) <
+ max_aggregated_interest_) {
if (scheduler_timer_on_) return; // timer already scheduled
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
+ uint64_t now = utils::SteadyTime::nowMs().count();
uint64_t time = now - last_interest_sent_time_;
if (time < WAIT_FOR_INTEREST_BATCH) {
uint64_t next = WAIT_FOR_INTEREST_BATCH - time;
scheduler_timer_on_ = true;
scheduler_timer_->expires_from_now(std::chrono::milliseconds(next));
- scheduler_timer_->async_wait([this](std::error_code ec) {
+
+ std::weak_ptr<RTCTransportProtocol> self = shared_from_this();
+ scheduler_timer_->async_wait([self](const std::error_code &ec) {
if (ec) return;
- if (!scheduler_timer_on_) return;
- scheduler_timer_on_ = false;
- scheduleNextInterests();
+ auto ptr = self.lock();
+ if (ptr && ptr->isRunning()) {
+ if (!ptr->scheduler_timer_on_) return;
+
+ ptr->scheduler_timer_on_ = false;
+ ptr->scheduleNextInterests();
+ }
});
- return; // whait for the timer
+ return; // wait for the timer
}
}
@@ -403,7 +485,7 @@ void RTCTransportProtocol::scheduleNextInterests() {
indexer_verifier_->jumpToIndex(state_->getLastSeqNacked() + 1);
}
- // skipe received packets
+ // skip received packets
if (indexer_verifier_->checkNextSuffix() <=
state_->getHighestSeqReceivedInOrder()) {
indexer_verifier_->jumpToIndex(state_->getHighestSeqReceivedInOrder() + 1);
@@ -417,7 +499,8 @@ void RTCTransportProtocol::scheduleNextInterests() {
socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name);
std::array<uint32_t, MAX_AGGREGATED_INTEREST> additional_suffixes;
- while ((state_->getPendingInterestNumber() < current_sync_win_) &&
+ while (((state_->getPendingInterestNumber() -
+ state_->getPendingFecPackets()) < current_sync_win_) &&
(sent_interests < max_sent_int_)) {
DLOG_IF(INFO, VLOG_IS_ON(3))
<< "In while loop. Window size: " << current_sync_win_;
@@ -428,19 +511,20 @@ void RTCTransportProtocol::scheduleNextInterests() {
// send the packet only if:
// 1) it is not pending yet (not true for rtx)
- // 2) the packet is not received or lost
+ // 2) the packet is not received or def lost
// 3) is not in the rtx list
// 4) is fec and is not in order (!= last sent + 1)
+ PacketState packet_state = state_->getPacketState(next_seg);
if (portal_->interestIsPending(*name) ||
- state_->isReceivedOrLost(next_seg) != PacketState::UNKNOWN ||
- ldr_->isRtx(next_seg) ||
+ packet_state == PacketState::RECEIVED ||
+ packet_state == PacketState::DEFINITELY_LOST || ldr_->isRtx(next_seg) ||
(indexer_verifier_->isFec(next_seg) &&
next_seg != last_interest_sent_seq_ + 1)) {
DLOG_IF(INFO, VLOG_IS_ON(3))
<< "skip interest " << next_seg << " because: pending "
- << portal_->interestIsPending(*name) << ", recv "
- << (state_->isReceivedOrLost(next_seg) != PacketState::UNKNOWN)
- << ", rtx " << (ldr_->isRtx(next_seg)) << ", is old fec "
+ << portal_->interestIsPending(*name) << ", recv or lost"
+ << (int)packet_state << ", rtx " << (ldr_->isRtx(next_seg))
+ << ", is old fec "
<< ((indexer_verifier_->isFec(next_seg) &&
next_seg != last_interest_sent_seq_ + 1));
continue;
@@ -462,10 +546,7 @@ void RTCTransportProtocol::scheduleNextInterests() {
sent_packets++;
sent_interests++;
sendInterest(interest_name, &additional_suffixes, aggregated_counter - 1);
- last_interest_sent_time_ =
- std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
+ last_interest_sent_time_ = utils::SteadyTime::nowMs().count();
aggregated_counter = 0;
}
}
@@ -473,25 +554,29 @@ void RTCTransportProtocol::scheduleNextInterests() {
// exiting the while we may have some pending interest to send
if (aggregated_counter != 0) {
sent_packets++;
- last_interest_sent_time_ =
- std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
+ last_interest_sent_time_ = utils::SteadyTime::nowMs().count();
sendInterest(interest_name, &additional_suffixes, aggregated_counter - 1);
}
- if (state_->getPendingInterestNumber() < current_sync_win_) {
+ if ((state_->getPendingInterestNumber() - state_->getPendingFecPackets()) <
+ current_sync_win_) {
// we still have space in the window but we already sent too many packets
// wait PACING_WAIT to avoid drops in the kernel
pacing_timer_on_ = true;
pacing_timer_->expires_from_now(std::chrono::microseconds(PACING_WAIT));
- scheduler_timer_->async_wait([this](std::error_code ec) {
+
+ std::weak_ptr<RTCTransportProtocol> self = shared_from_this();
+ scheduler_timer_->async_wait([self](const std::error_code &ec) {
if (ec) return;
- if (!pacing_timer_on_) return;
- pacing_timer_on_ = false;
- scheduleNextInterests();
+ auto ptr = self.lock();
+ if (ptr && ptr->isRunning()) {
+ if (!ptr->pacing_timer_on_) return;
+
+ ptr->pacing_timer_on_ = false;
+ ptr->scheduleNextInterests();
+ }
});
}
}
@@ -500,13 +585,13 @@ void RTCTransportProtocol::onInterestTimeout(Interest::Ptr &interest,
const Name &name) {
uint32_t segment_number = name.getSuffix();
- if (segment_number >= MIN_PROBE_SEQ) {
+ if (ProbeHandler::getProbeType(segment_number) != ProbeType::NOT_PROBE) {
// this is a timeout on a probe, do nothing
return;
}
- PacketState state = state_->isReceivedOrLost(segment_number);
- if (state != PacketState::UNKNOWN) {
+ PacketState state = state_->getPacketState(segment_number);
+ if (state == PacketState::RECEIVED || state == PacketState::DEFINITELY_LOST) {
// we may recover a packets using fec, ignore this timer
return;
}
@@ -524,13 +609,18 @@ void RTCTransportProtocol::onInterestTimeout(Interest::Ptr &interest,
DLOG_IF(INFO, VLOG_IS_ON(3))
<< "handle timeout for packet " << segment_number << " using rtx";
if (ldr_->isRtxOn()) {
- ldr_->onTimeout(segment_number);
- if (indexer_verifier_->isFec(segment_number))
+ if (indexer_verifier_->isFec(segment_number)) {
+ // if this is a fec packet we do not recover it with rtx so we consider
+ // the packet to be lost
+ ldr_->onTimeout(segment_number, true);
state_->onTimeout(segment_number, true);
- else
+ } else {
+ ldr_->onTimeout(segment_number, false);
state_->onTimeout(segment_number, false);
+ }
} else {
// in this case we wil never recover the timeout
+ ldr_->onTimeout(segment_number, true);
state_->onTimeout(segment_number, true);
}
scheduleNextInterests();
@@ -559,7 +649,7 @@ void RTCTransportProtocol::onInterestTimeout(Interest::Ptr &interest,
void RTCTransportProtocol::onNack(const ContentObject &content_object) {
struct nack_packet_t *nack =
(struct nack_packet_t *)content_object.getPayload()->data();
- uint32_t production_seg = nack->getProductionSegement();
+ uint32_t production_seg = nack->getProductionSegment();
uint32_t nack_segment = content_object.getName().getSuffix();
bool is_rtx = ldr_->isRtx(nack_segment);
@@ -592,6 +682,8 @@ void RTCTransportProtocol::onNack(const ContentObject &content_object) {
// remove the nack is it exists
if (tn_it != timeouts_or_nacks_.end()) timeouts_or_nacks_.erase(tn_it);
+ state_->onJumpForward(production_seg);
+ verifier_->onJumpForward(production_seg);
// the client is asking for content in the past
// switch to catch up state and increase the window
// this is true only if the packet is not an RTX
@@ -618,11 +710,9 @@ void RTCTransportProtocol::onProbe(const ContentObject &content_object) {
bool valid = state_->onProbePacketReceived(content_object);
if (!valid) return;
- struct nack_packet_t *probe =
- (struct nack_packet_t *)content_object.getPayload()->data();
- uint32_t production_seg = probe->getProductionSegement();
+ uint32_t production_seg = RTCState::getProbeParams(content_object).prod_seg;
- // as for the nacks set next_segment
+ // As for the nacks set next_segment
DLOG_IF(INFO, VLOG_IS_ON(3))
<< "on probe next seg = " << indexer_verifier_->checkNextSuffix()
<< ", jump to " << production_seg;
@@ -636,12 +726,39 @@ void RTCTransportProtocol::onContentObjectReceived(
Interest &interest, ContentObject &content_object, std::error_code &ec) {
DLOG_IF(INFO, VLOG_IS_ON(3))
<< "Received content object of size: " << content_object.payloadSize();
- uint32_t payload_size = content_object.payloadSize();
+
uint32_t segment_number = content_object.getName().getSuffix();
+ PayloadType payload_type = content_object.getPayloadType();
+ PacketState state;
+
+ ContentObject *content_ptr = &content_object;
+ ContentObject::Ptr manifest_ptr = nullptr;
+
+ bool is_probe =
+ ProbeHandler::getProbeType(segment_number) != ProbeType::NOT_PROBE;
+ bool is_nack = !is_probe && content_object.payloadSize() == NACK_HEADER_SIZE;
+ bool is_fec = indexer_verifier_->isFec(segment_number);
+ bool is_manifest =
+ !is_probe && !is_nack && !is_fec && payload_type == PayloadType::MANIFEST;
+ bool is_data =
+ !is_probe && !is_nack && !is_fec && payload_type == PayloadType::DATA;
+ bool compute_stats = is_data || is_manifest;
ec = make_error_code(protocol_error::not_reassemblable);
- if (segment_number >= MIN_PROBE_SEQ) {
+ // A helper function to process manifests or data packets received
+ auto onDataPacketReceived = [this](ContentObject &content_object,
+ bool compute_stats) {
+ ldr_->onDataPacketReceived(content_object);
+ rc_->onDataPacketReceived(content_object, compute_stats);
+ updateSyncWindow();
+ };
+
+ // First verify the packet signature and apply the corresponding policy
+ auth::VerificationPolicy policy = verifier_->verify(content_object, is_fec);
+ indexer_verifier_->applyPolicy(interest, content_object, false, policy);
+
+ if (is_probe) {
DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received probe " << segment_number;
if (*on_content_object_input_) {
(*on_content_object_input_)(*socket_->getInterface(), content_object);
@@ -650,7 +767,7 @@ void RTCTransportProtocol::onContentObjectReceived(
return;
}
- if (payload_size == NACK_HEADER_SIZE) {
+ if (is_nack) {
DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received nack " << segment_number;
if (*on_content_object_input_) {
(*on_content_object_input_)(*socket_->getInterface(), content_object);
@@ -659,57 +776,122 @@ void RTCTransportProtocol::onContentObjectReceived(
return;
}
+ // content_ptr will point either to the input data packet or to a manifest
+ // whose FEC header has been removed
+ if (is_manifest) {
+ manifest_ptr = removeFecHeader(content_object);
+ if (manifest_ptr) {
+ content_ptr = manifest_ptr.get();
+ }
+ }
+
+ // From there, the packet is either a FEC, a manifest or a data packet.
DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received content " << segment_number;
- bool compute_stats = true;
+ // Do not count timed out packets in stats
auto tn_it = timeouts_or_nacks_.find(segment_number);
if (tn_it != timeouts_or_nacks_.end()) {
compute_stats = false;
timeouts_or_nacks_.erase(tn_it);
}
- if (ldr_->isRtx(segment_number)) {
+
+ // Do not count retransmissions or losses in stats
+ if (ldr_->isRtx(segment_number) ||
+ ldr_->isPossibleLossWithNoRtx(segment_number)) {
compute_stats = false;
}
- // check if the packet was already received
- PacketState state = state_->isReceivedOrLost(segment_number);
+ // Fetch packet state
+ state = state_->getPacketState(segment_number);
- if (state != PacketState::RECEIVED) {
- // send packet to decoder
- if (fec_decoder_) {
- DLOG_IF(INFO, VLOG_IS_ON(4))
- << "send packet " << segment_number << " to FEC decoder";
- fec_decoder_->onDataPacket(
- content_object, content_object.headerSize() + rtc::DATA_HEADER_SIZE);
- }
- if (!indexer_verifier_->isFec(segment_number)) {
- // the packet may be alredy sent to the ap by the decoder, check again if
- // it is already received
- state = state_->isReceivedOrLost(segment_number);
- if (state != PacketState::RECEIVED) {
- DLOG_IF(INFO, VLOG_IS_ON(4)) << "Received content " << segment_number;
+ // Check if the packet is a retransmission
+ if (ldr_->isRtx(segment_number) && state != PacketState::RECEIVED) {
+ if (is_data || is_manifest) {
+ state_->onPacketRecoveredRtx(segment_number);
- state_->onDataPacketReceived(content_object, compute_stats);
+ if (*on_content_object_input_) {
+ (*on_content_object_input_)(*socket_->getInterface(), content_object);
+ }
- if (*on_content_object_input_) {
- (*on_content_object_input_)(*socket_->getInterface(), content_object);
- }
- ec = make_error_code(protocol_error::success);
+ if (is_manifest) {
+ processManifest(interest, *content_ptr);
}
- } else {
- DLOG_IF(INFO, VLOG_IS_ON(4)) << "Received fec " << segment_number;
- state_->onFecPacketReceived(content_object);
+
+ ec = is_manifest ? make_error_code(protocol_error::not_reassemblable)
+ : make_error_code(protocol_error::success);
+
+ // The packet is considered received, return early
+ onDataPacketReceived(*content_ptr, compute_stats);
+ return;
}
- } else {
+
+ if (is_fec) {
+ state_->onFecPacketRecoveredRtx(segment_number);
+ }
+ }
+
+ // Fetch packet state again; it may have changed
+ state = state_->getPacketState(segment_number);
+
+ // Check if the packet was already received
+ if (state == PacketState::RECEIVED || state == PacketState::TO_BE_RECEIVED) {
DLOG_IF(INFO, VLOG_IS_ON(3))
<< "Received duplicated content " << segment_number << ", drop it";
ec = make_error_code(protocol_error::duplicated_content);
+ onDataPacketReceived(*content_ptr, compute_stats);
+ return;
}
- ldr_->onDataPacketReceived(content_object);
- rc_->onDataPacketReceived(content_object);
+ if (!is_fec) {
+ state_->dataToBeReceived(segment_number);
+ }
- updateSyncWindow();
+ // Send packet to FEC decoder
+ if (fec_decoder_) {
+ DLOG_IF(INFO, VLOG_IS_ON(4))
+ << "Send packet " << segment_number << " to FEC decoder";
+
+ uint32_t offset = is_manifest
+ ? content_object.headerSize()
+ : content_object.headerSize() + rtc::DATA_HEADER_SIZE;
+ uint32_t metadata = static_cast<uint32_t>(content_object.getPayloadType());
+
+ fec_decoder_->onDataPacket(content_object, offset, metadata);
+ }
+
+ // We can return early if FEC
+ if (is_fec) {
+ DLOG_IF(INFO, VLOG_IS_ON(4)) << "Received FEC " << segment_number;
+ state_->onFecPacketReceived(content_object);
+ onDataPacketReceived(*content_ptr, compute_stats);
+ return;
+ }
+
+ // The packet may have been already sent to the app by the decoder, check
+ // again if it is already received
+ state = state_->getPacketState(
+ segment_number); // state == RECEIVED or TO_BE_RECEIVED
+
+ if (state != PacketState::RECEIVED) {
+ DLOG_IF(INFO, VLOG_IS_ON(4))
+ << (is_manifest ? "Received manifest " : "Received data ")
+ << segment_number;
+
+ if (is_manifest) {
+ processManifest(interest, *content_ptr);
+ }
+
+ state_->onDataPacketReceived(*content_ptr, compute_stats);
+
+ if (*on_content_object_input_) {
+ (*on_content_object_input_)(*socket_->getInterface(), content_object);
+ }
+
+ ec = is_manifest ? make_error_code(protocol_error::not_reassemblable)
+ : make_error_code(protocol_error::success);
+ }
+
+ onDataPacketReceived(*content_ptr, compute_stats);
}
void RTCTransportProtocol::sendStatsToApp(
@@ -729,33 +911,149 @@ void RTCTransportProtocol::sendStatsToApp(
stats_->updateReceivedNacks(received_nacks);
stats_->updateReceivedFEC(received_fec);
- stats_->updateAverageWindowSize(current_sync_win_);
- stats_->updateLossRatio(state_->getLossRate());
- stats_->updateAverageRtt(state_->getRTT());
+ stats_->updateAverageWindowSize(state_->getPendingInterestNumber());
+ stats_->updateLossRatio(state_->getPerSecondLossRate());
+ uint64_t rtt = state_->getAvgRTT();
+ stats_->updateAverageRtt(utils::SteadyTime::Milliseconds(rtt));
+
stats_->updateQueuingDelay(state_->getQueuing());
stats_->updateLostData(lost_data);
stats_->updateDefinitelyLostData(definitely_lost);
stats_->updateRecoveredData(recovered_losses);
stats_->updateCCState((unsigned int)current_state_ ? 1 : 0);
(*stats_summary_)(*socket_->getInterface(), *stats_);
+ bool in_congestion = rc_->inCongestionState();
+ stats_->updateCongestionState(in_congestion);
+ double residual_losses = state_->getResidualLossRate();
+ stats_->updateResidualLossRate(residual_losses);
+ stats_->updateQualityScore(state_->getQualityScore());
+
+ // set alerts
+ if (rtt > MAX_RTT)
+ stats_->setAlert(interface::TransportStatistics::statsAlerts::LATENCY);
+ else
+ stats_->clearAlert(interface::TransportStatistics::statsAlerts::LATENCY);
+
+ if (in_congestion)
+ stats_->setAlert(interface::TransportStatistics::statsAlerts::CONGESTION);
+ else
+ stats_->clearAlert(
+ interface::TransportStatistics::statsAlerts::CONGESTION);
+
+ if (residual_losses > MAX_RESIDUAL_LOSSES)
+ stats_->setAlert(interface::TransportStatistics::statsAlerts::LOSSES);
+ else
+ stats_->clearAlert(interface::TransportStatistics::statsAlerts::LOSSES);
}
}
-void RTCTransportProtocol::onFecPackets(
- std::vector<std::pair<uint32_t, fec::buffer>> &packets) {
+void RTCTransportProtocol::onFecPackets(fec::BufferArray &packets) {
+ Packet::Format format;
+ socket_->getSocketOption(interface::GeneralTransportOptions::PACKET_FORMAT,
+ format);
+
+ Name *name = nullptr;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name);
+
for (auto &packet : packets) {
- PacketState state = state_->isReceivedOrLost(packet.first);
- if (state != PacketState::RECEIVED) {
- state_->onPacketRecoveredFec(packet.first);
- ldr_->onPacketRecoveredFec(packet.first);
- DLOG_IF(INFO, VLOG_IS_ON(3))
- << "Recovered packet " << packet.first << " through FEC.";
- reassembly_->reassemble(*packet.second, packet.first);
- } else {
- DLOG_IF(INFO, VLOG_IS_ON(3))
- << "Packet" << packet.first << "already received.";
+ uint32_t seq_number = packet.getIndex();
+ uint32_t metadata = packet.getMetadata();
+ fec::buffer buffer = packet.getBuffer();
+
+ PayloadType payload_type = static_cast<PayloadType>(metadata);
+ switch (payload_type) {
+ case PayloadType::DATA:
+ case PayloadType::MANIFEST:
+ break;
+ case PayloadType::UNSPECIFIED:
+ default:
+ payload_type = PayloadType::DATA;
+ break;
}
+
+ switch (state_->getPacketState(seq_number)) {
+ case PacketState::RECEIVED:
+ case PacketState::TO_BE_RECEIVED: {
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Packet " << seq_number << " already received";
+ break;
+ }
+ default: {
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Recovered packet " << seq_number << " through FEC";
+
+ if (payload_type == PayloadType::MANIFEST) {
+ name->setSuffix(seq_number);
+
+ auto interest =
+ core::PacketManager<>::getInstance().getPacket<Interest>(format);
+ interest->setName(*name);
+
+ auto content_object = toContentObject(
+ *name, format, payload_type, buffer->data(), buffer->length());
+
+ processManifest(*interest, *content_object);
+ }
+
+ state_->onPacketRecoveredFec(seq_number, buffer->length());
+ ldr_->onPacketRecoveredFec(seq_number);
+
+ if (payload_type == PayloadType::DATA) {
+ verifier_->onDataRecoveredFec(seq_number);
+ reassembly_->reassemble(*buffer, seq_number);
+ }
+
+ break;
+ }
+ }
+ }
+}
+
+void RTCTransportProtocol::processManifest(Interest &interest,
+ ContentObject &manifest) {
+ auth::VerificationPolicy policy = verifier_->processManifest(manifest);
+ indexer_verifier_->applyPolicy(interest, manifest, false, policy);
+}
+
+ContentObject::Ptr RTCTransportProtocol::removeFecHeader(
+ const ContentObject &content_object) {
+ if (!fec_decoder_ || !fec_decoder_->getFecHeaderSize()) {
+ return nullptr;
}
+
+ size_t fec_header_size = fec_decoder_->getFecHeaderSize();
+ const uint8_t *payload =
+ content_object.data() + content_object.headerSize() + fec_header_size;
+ size_t payload_size = content_object.payloadSize() - fec_header_size;
+
+ ContentObject::Ptr co =
+ toContentObject(content_object.getName(), content_object.getFormat(),
+ content_object.getPayloadType(), payload, payload_size);
+
+ return co;
+}
+
+ContentObject::Ptr RTCTransportProtocol::toContentObject(
+ const Name &name, Packet::Format format, PayloadType payload_type,
+ const uint8_t *payload, std::size_t payload_size,
+ std::size_t additional_header_size) {
+ // Recreate ContentObject
+ ContentObject::Ptr co =
+ core::PacketManager<>::getInstance().getPacket<ContentObject>(
+ format, additional_header_size);
+ co->updateLength(payload_size);
+ co->append(payload_size);
+ co->trimStart(co->headerSize());
+
+ // Copy payload
+ std::memcpy(co->writableData(), payload, payload_size);
+
+ // Restore network headers and some fields
+ co->prepend(co->headerSize());
+ co->setName(name);
+ co->setPayloadType(payload_type);
+
+ return co;
}
} // end namespace rtc
diff --git a/libtransport/src/protocols/rtc/rtc.h b/libtransport/src/protocols/rtc/rtc.h
index e6431264d..37706eb1c 100644
--- a/libtransport/src/protocols/rtc/rtc.h
+++ b/libtransport/src/protocols/rtc/rtc.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -15,10 +15,12 @@
#pragma once
-#include <protocols/datagram_reassembly.h>
+#include <protocols/rtc/rtc_forwarding_strategy.h>
#include <protocols/rtc/rtc_ldr.h>
#include <protocols/rtc/rtc_rc.h>
+#include <protocols/rtc/rtc_reassembly.h>
#include <protocols/rtc/rtc_state.h>
+#include <protocols/rtc/rtc_verifier.h>
#include <protocols/transport_protocol.h>
#include <unordered_set>
@@ -44,6 +46,8 @@ class RTCTransportProtocol : public TransportProtocol {
std::size_t transportHeaderLength() override;
+ auto shared_from_this() { return utils::shared_from(this); }
+
private:
enum class SyncState { catch_up = 0, in_sync = 1, last };
@@ -76,6 +80,7 @@ class RTCTransportProtocol : public TransportProtocol {
void onPacketDropped(Interest &interest, ContentObject &content_object,
const std::error_code &reason) override {}
void onReassemblyFailed(std::uint32_t missing_segment) override {}
+ void processManifest(Interest &interest, ContentObject &manifest);
// interaction with app functions
void sendStatsToApp(uint32_t retx_count, uint32_t received_bytes,
@@ -84,11 +89,20 @@ class RTCTransportProtocol : public TransportProtocol {
uint32_t received_nacks, uint32_t received_fec);
// FEC functions
- void onFecPackets(std::vector<std::pair<uint32_t, fec::buffer>> &packets);
+ void onFecPackets(fec::BufferArray &packets);
+
+ // Utils
+ ContentObject::Ptr removeFecHeader(const ContentObject &content_object);
+ ContentObject::Ptr toContentObject(const Name &name, Packet::Format format,
+ PayloadType payload_type,
+ const uint8_t *payload,
+ std::size_t payload_size,
+ std::size_t additional_header_size = 0);
// protocol state
bool start_send_interest_;
SyncState current_state_;
+
// cwin vars
uint32_t current_sync_win_;
uint32_t max_sync_win_;
@@ -120,6 +134,10 @@ class RTCTransportProtocol : public TransportProtocol {
std::shared_ptr<RTCState> state_;
std::shared_ptr<RTCRateControl> rc_;
std::shared_ptr<RTCLossDetectionAndRecovery> ldr_;
+ std::shared_ptr<RTCVerifier> verifier_;
+
+ // forwarding strategy selection
+ RTCForwardingStrategy fwd_strategy_;
uint32_t number_;
};
diff --git a/libtransport/src/protocols/rtc/rtc_consts.h b/libtransport/src/protocols/rtc/rtc_consts.h
index d04bc1b1f..03efd8e84 100644
--- a/libtransport/src/protocols/rtc/rtc_consts.h
+++ b/libtransport/src/protocols/rtc/rtc_consts.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -45,22 +45,22 @@ const uint32_t MAX_INTERESTS_IN_BATCH = 5; // number of seq numbers per
const uint32_t WAIT_FOR_INTEREST_BATCH = 20; // msec. timer that we wait to try
// to aggregate interest in the
// same packet
-const uint32_t MAX_PACING_BATCH = 5;
-// number of interest that we can send inside
-// the loop before they get dropped by the
-// kernel.
+const uint32_t MAX_PACING_BATCH = 5; // number of interest that we can send
+ // inside the loop before they get dropped
+ // by the kernel.
const uint32_t PACING_WAIT = 1000; // usec to wait betwing two pacing batch. As
// for MAX_PACING_BATCH this value was
// computed during tests
const uint32_t MAX_RTX_IN_BATCH = 10; // max rtx to send in loop
// packet const
-const uint32_t HICN_HEADER_SIZE = 40 + 20; // IPv6 + TCP bytes
const uint32_t RTC_INTEREST_LIFETIME = 2000;
// probes sequence range
const uint32_t MIN_PROBE_SEQ = 0xefffffff;
-const uint32_t MIN_RTT_PROBE_SEQ = MIN_PROBE_SEQ;
+const uint32_t MIN_INIT_PROBE_SEQ = MIN_PROBE_SEQ;
+const uint32_t MAX_INIT_PROBE_SEQ = 0xf7ffffff - 1;
+const uint32_t MIN_RTT_PROBE_SEQ = MAX_INIT_PROBE_SEQ + 1;
const uint32_t MAX_RTT_PROBE_SEQ = 0xffffffff - 1;
// RTT_PROBE_INTERVAL will be used during the section while
// INIT_RTT_PROBE_INTERVAL is used at the beginning to
@@ -78,15 +78,16 @@ const uint32_t INIT_RTT_MIN_PROBES_TO_RECV = 5; // ms
const uint32_t MAX_PENDING_PROBES = 10;
// congestion
-const double MAX_QUEUING_DELAY = 100.0; // ms
+const double MAX_QUEUING_DELAY = 50.0; // ms
// data from cache
const double MAX_DATA_FROM_CACHE = 0.25; // 25%
// window const
-const uint32_t INITIAL_WIN = 5; // pkts
-const uint32_t INITIAL_WIN_MAX = 1000000; // pkts
-const uint32_t WIN_MIN = 5; // pkts
+const uint32_t INITIAL_WIN = 5; // pkts
+const uint32_t INITIAL_WIN_MAX = 1000000; // pkts
+const uint32_t WIN_MIN = 5; // pkts
+const uint32_t WIN_MIN_WITH_AGGREGARED_DATA = 10; // pkts
const double CATCH_UP_WIN_INCREMENT = 1.2;
// used in rate control
const double WIN_DECREASE_FACTOR = 0.5;
@@ -105,10 +106,8 @@ const double MOVING_AVG_ALPHA = 0.8;
const double MILLI_IN_A_SEC = 1000.0;
const double MICRO_IN_A_SEC = 1000000.0;
-
-const double MAX_CACHED_PACKETS = 262144; // 2^18
- // about 50 sec of traffic at 50Mbps
- // with 1200 bytes packets
+const uint32_t ROUNDS_PER_SEC = (uint32_t)(MILLI_IN_A_SEC / ROUND_LEN);
+const uint32_t ROUNDS_PER_MIN = (uint32_t)ROUNDS_PER_SEC * 60;
const uint32_t MAX_ROUND_WHIOUT_PACKETS =
(20 * MILLI_IN_A_SEC) / ROUND_LEN; // 20 sec in rounds;
@@ -120,12 +119,24 @@ const uint64_t MAX_TIMER_RTX = ~0;
const uint32_t SENTINEL_TIMER_INTERVAL = 100; // ms
const uint32_t MAX_RTX_WITH_SENTINEL = 10; // packets
const double CATCH_UP_RTT_INCREMENT = 1.2;
+const double MAX_RESIDUAL_LOSS_RATE = 2.0; // %
+const uint32_t WAIT_BEFORE_FEC_UPDATE = ROUNDS_PER_SEC * 5;
// used by producer
const uint32_t PRODUCER_STATS_INTERVAL = 200; // ms
-const uint32_t MIN_PRODUCTION_RATE = 10; // pps
- // min prod rate
- // set running several test
+const uint32_t MIN_PRODUCTION_RATE = 25; // pps, equal to min window *
+ // rounds in a second
+const uint32_t NACK_DELAY = 1500; // ms
+const uint32_t FEC_PACING_TIME = 5; // ms
+
+// aggregated data consts
+const uint16_t MAX_RTC_PAYLOAD_SIZE = 1200; // bytes
+const uint16_t MAX_AGGREGATED_PACKETS = 5; // pkt
+const uint32_t AGGREGATED_PACKETS_TIMER = 2; // ms
+
+// alert thresholds
+const uint32_t MAX_RTT = 200; // ms
+const double MAX_RESIDUAL_LOSSES = 0.05; // %
} // namespace rtc
diff --git a/libtransport/src/protocols/rtc/rtc_data_path.cc b/libtransport/src/protocols/rtc/rtc_data_path.cc
index c098088a3..b3abf5ea8 100644
--- a/libtransport/src/protocols/rtc/rtc_data_path.cc
+++ b/libtransport/src/protocols/rtc/rtc_data_path.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -13,14 +13,17 @@
* limitations under the License.
*/
+#include <hicn/transport/utils/chrono_typedefs.h>
#include <protocols/rtc/rtc_data_path.h>
#include <stdlib.h>
#include <algorithm>
#include <cfloat>
#include <chrono>
+#include <cmath>
#define MAX_ROUNDS_WITHOUT_PKTS 10 // 2sec
+#define AVG_RTT_TIME 1000 // (ms) 1sec
namespace transport {
@@ -32,6 +35,8 @@ RTCDataPath::RTCDataPath(uint32_t path_id)
: path_id_(path_id),
min_rtt(UINT_MAX),
prev_min_rtt(UINT_MAX),
+ max_rtt(0),
+ prev_max_rtt(0),
min_owd(INT_MAX), // this is computed like in LEDBAT, so it is not the
// real OWD, but the measured one, that depends on the
// clock of sender and receiver. the only meaningful
@@ -46,20 +51,46 @@ RTCDataPath::RTCDataPath(uint32_t path_id)
largest_recv_seq_(0),
largest_recv_seq_time_(0),
avg_inter_arrival_(DBL_MAX),
+ rtt_sum_(0),
+ last_avg_rtt_compute_(0),
+ rtt_samples_(0),
+ avg_rtt_(0.0),
received_nacks_(false),
- received_packets_(false),
+ received_packets_(0),
rounds_without_packets_(0),
last_received_data_packet_(0),
- RTT_history_(HISTORY_LEN),
+ min_RTT_history_(HISTORY_LEN),
+ max_RTT_history_(HISTORY_LEN),
OWD_history_(HISTORY_LEN){};
-void RTCDataPath::insertRttSample(uint64_t rtt) {
- // for the rtt we only keep track of the min one
+void RTCDataPath::insertRttSample(
+ const utils::SteadyTime::Milliseconds& rtt_milliseconds, bool is_probe) {
+ // compute min rtt
+ uint64_t rtt = rtt_milliseconds.count();
if (rtt < min_rtt) min_rtt = rtt;
- last_received_data_packet_ =
- std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
+
+ uint64_t now = utils::SteadyTime::nowMs().count();
+ last_received_data_packet_ = now;
+
+ // compute avg rtt
+ if (is_probe) {
+ // max rtt is computed only on probes to avoid to take into account the
+ // production time at the server
+ if (rtt > max_rtt) max_rtt = rtt;
+
+ rtt_sum_ += rtt;
+ rtt_samples_++;
+ }
+
+ if ((now - last_avg_rtt_compute_) >= AVG_RTT_TIME) {
+ // compute a new avg rtt
+ // if rtt_samples_ = 0 keep the same rtt
+ if (rtt_samples_ != 0) avg_rtt_ = (double)rtt_sum_ / (double)rtt_samples_;
+
+ rtt_sum_ = 0;
+ rtt_samples_ = 0;
+ last_avg_rtt_compute_ = now;
+ }
}
void RTCDataPath::insertOwdSample(int64_t owd) {
@@ -87,15 +118,13 @@ void RTCDataPath::insertOwdSample(int64_t owd) {
// owd is computed only for valid data packets so we count only
// this for decide if we recevie traffic or not
- received_packets_ = true;
+ received_packets_++;
}
void RTCDataPath::computeInterArrivalGap(uint32_t segment_number) {
// got packet in sequence, compute gap
if (largest_recv_seq_ == (segment_number - 1)) {
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
+ uint64_t now = utils::SteadyTime::nowMs().count();
uint64_t delta = now - largest_recv_seq_time_;
largest_recv_seq_ = segment_number;
largest_recv_seq_time_ = now;
@@ -110,10 +139,7 @@ void RTCDataPath::computeInterArrivalGap(uint32_t segment_number) {
// ooo packet, update the stasts if needed
if (largest_recv_seq_ <= segment_number) {
largest_recv_seq_ = segment_number;
- largest_recv_seq_time_ =
- std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
+ largest_recv_seq_time_ = utils::SteadyTime::nowMs().count();
}
}
@@ -146,10 +172,20 @@ void RTCDataPath::roundEnd() {
min_rtt = prev_min_rtt;
}
+ // same for max_rtt
+ if (max_rtt != 0) {
+ prev_max_rtt = max_rtt;
+ } else {
+ max_rtt = prev_max_rtt;
+ }
+
if (min_rtt == 0) min_rtt = 1;
+ if (max_rtt == 0) max_rtt = 1;
- RTT_history_.pushBack(min_rtt);
+ min_RTT_history_.pushBack(min_rtt);
+ max_RTT_history_.pushBack(max_rtt);
min_rtt = UINT_MAX;
+ max_rtt = 0;
// do the same for min owd
if (min_owd != INT_MAX) {
@@ -163,32 +199,47 @@ void RTCDataPath::roundEnd() {
min_owd = INT_MAX;
}
- if (!received_packets_)
+ if (received_packets_ == 0)
rounds_without_packets_++;
else
rounds_without_packets_ = 0;
- received_packets_ = false;
+ received_packets_ = 0;
}
uint32_t RTCDataPath::getPathId() { return path_id_; }
-double RTCDataPath::getQueuingDealy() { return queuing_delay; }
+double RTCDataPath::getQueuingDealy() {
+ if (queuing_delay == DBL_MAX) return 0;
+ return queuing_delay;
+}
uint64_t RTCDataPath::getMinRtt() {
- if (RTT_history_.size() != 0) return RTT_history_.begin();
+ if (min_RTT_history_.size() != 0) return min_RTT_history_.begin();
+ return 0;
+}
+
+uint64_t RTCDataPath::getAvgRtt() { return std::round(avg_rtt_); }
+
+uint64_t RTCDataPath::getMaxRtt() {
+ if (max_RTT_history_.size() != 0) return max_RTT_history_.begin();
return 0;
}
int64_t RTCDataPath::getMinOwd() {
if (OWD_history_.size() != 0) return OWD_history_.begin();
- return 0;
+ return INT_MAX;
}
double RTCDataPath::getJitter() { return jitter_; }
uint64_t RTCDataPath::getLastPacketTS() { return last_received_data_packet_; }
-void RTCDataPath::clearRtt() { RTT_history_.clear(); }
+uint32_t RTCDataPath::getPacketsLastRound() { return received_packets_; }
+
+void RTCDataPath::clearRtt() {
+ min_RTT_history_.clear();
+ max_RTT_history_.clear();
+}
} // end namespace rtc
diff --git a/libtransport/src/protocols/rtc/rtc_data_path.h b/libtransport/src/protocols/rtc/rtc_data_path.h
index c5c37fc0d..5afbbb87f 100644
--- a/libtransport/src/protocols/rtc/rtc_data_path.h
+++ b/libtransport/src/protocols/rtc/rtc_data_path.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -15,7 +15,9 @@
#pragma once
+#include <hicn/transport/utils/chrono_typedefs.h>
#include <stdint.h>
+#include <utils/max_filter.h>
#include <utils/min_filter.h>
#include <climits>
@@ -34,19 +36,23 @@ class RTCDataPath {
RTCDataPath(uint32_t path_id);
public:
- void insertRttSample(uint64_t rtt);
+ void insertRttSample(const utils::SteadyTime::Milliseconds &rtt,
+ bool is_probe);
void insertOwdSample(int64_t owd);
void computeInterArrivalGap(uint32_t segment_number);
void receivedNack();
uint32_t getPathId();
uint64_t getMinRtt();
+ uint64_t getAvgRtt();
+ uint64_t getMaxRtt();
double getQueuingDealy();
double getInterArrivalGap();
double getJitter();
bool isActive();
bool pathToProducer();
uint64_t getLastPacketTS();
+ uint32_t getPacketsLastRound();
void clearRtt();
@@ -60,6 +66,9 @@ class RTCDataPath {
uint64_t min_rtt;
uint64_t prev_min_rtt;
+ uint64_t max_rtt;
+ uint64_t prev_max_rtt;
+
int64_t min_owd;
int64_t prev_min_owd;
@@ -74,19 +83,26 @@ class RTCDataPath {
uint64_t largest_recv_seq_time_;
double avg_inter_arrival_;
+ // compute the avg rtt over one sec
+ uint64_t rtt_sum_;
+ uint64_t last_avg_rtt_compute_;
+ uint32_t rtt_samples_;
+ double avg_rtt_;
+
// flags to check if a path is active
// we considere a path active if it reaches a producer
//(not a cache) --aka we got at least one nack on this path--
// and if we receives packets
bool received_nacks_;
- bool received_packets_;
- uint8_t rounds_without_packets_; // if we don't get any packet
+ uint32_t received_packets_;
+ uint32_t rounds_without_packets_; // if we don't get any packet
// for MAX_ROUNDS_WITHOUT_PKTS
// we consider the path inactive
uint64_t last_received_data_packet_; // timestamp for the last data received
// on this path
- utils::MinFilter<uint64_t> RTT_history_;
+ utils::MinFilter<uint64_t> min_RTT_history_;
+ utils::MaxFilter<uint64_t> max_RTT_history_;
utils::MinFilter<int64_t> OWD_history_;
};
diff --git a/libtransport/src/protocols/rtc/rtc_forwarding_strategy.cc b/libtransport/src/protocols/rtc/rtc_forwarding_strategy.cc
new file mode 100644
index 000000000..9503eed3e
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_forwarding_strategy.cc
@@ -0,0 +1,155 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/interfaces/notification.h>
+#include <protocols/rtc/rtc_forwarding_strategy.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+using namespace transport::interface;
+
+RTCForwardingStrategy::RTCForwardingStrategy()
+ : init_(false),
+ forwarder_set_(false),
+ selected_strategy_(NONE),
+ current_strategy_(NONE),
+ rounds_since_last_set_(0),
+ portal_(nullptr),
+ state_(nullptr) {}
+
+RTCForwardingStrategy::~RTCForwardingStrategy() {}
+
+void RTCForwardingStrategy::setCallback(interface::StrategyCallback* callback) {
+ callback_ = callback;
+}
+
+void RTCForwardingStrategy::initFwdStrategy(
+ std::shared_ptr<core::Portal> portal, core::Prefix& prefix, RTCState* state,
+ strategy_t strategy) {
+ init_ = true;
+ selected_strategy_ = strategy;
+ if (strategy == BOTH)
+ current_strategy_ = BEST_PATH;
+ else
+ current_strategy_ = strategy;
+ rounds_since_last_set_ = 0;
+ prefix_ = prefix;
+ portal_ = portal;
+ state_ = state;
+}
+
+void RTCForwardingStrategy::checkStrategy() {
+ if (*callback_) {
+ strategy_t used_strategy = selected_strategy_;
+ if (used_strategy == BOTH) used_strategy = current_strategy_;
+ assert(used_strategy == BEST_PATH || used_strategy == REPLICATION ||
+ used_strategy == NONE);
+
+ notification::ForwardingStrategy strategy =
+ notification::ForwardingStrategy::NONE;
+ switch (used_strategy) {
+ case BEST_PATH:
+ strategy = notification::ForwardingStrategy::BEST_PATH;
+ break;
+ case REPLICATION:
+ strategy = notification::ForwardingStrategy::REPLICATION;
+ break;
+ default:
+ break;
+ }
+
+ (*callback_)(strategy);
+ }
+
+ if (!init_) return;
+
+ if (selected_strategy_ == NONE) return;
+
+ if (selected_strategy_ == BEST_PATH) {
+ checkStrategyBestPath();
+ return;
+ }
+
+ if (selected_strategy_ == REPLICATION) {
+ checkStrategyReplication();
+ return;
+ }
+
+ checkStrategyBoth();
+}
+
+void RTCForwardingStrategy::checkStrategyBestPath() {
+ if (!forwarder_set_) {
+ setStrategy(BEST_PATH);
+ forwarder_set_ = true;
+ return;
+ }
+
+ uint8_t qs = state_->getQualityScore();
+
+ if (qs >= 4 || rounds_since_last_set_ < 25) { // wait a least 5 sec
+ // between each switch
+ rounds_since_last_set_++;
+ return;
+ }
+
+ // try to switch path
+ setStrategy(BEST_PATH);
+}
+
+void RTCForwardingStrategy::checkStrategyReplication() {
+ if (!forwarder_set_) {
+ setStrategy(REPLICATION);
+ forwarder_set_ = true;
+ return;
+ }
+
+ // here we have nothing to do for the moment
+ return;
+}
+
+void RTCForwardingStrategy::checkStrategyBoth() {
+ if (!forwarder_set_) {
+ setStrategy(current_strategy_);
+ forwarder_set_ = true;
+ return;
+ }
+
+ checkStrategyBestPath();
+
+ // TODO
+ // for the moment we use only best path.
+ // but later:
+ // 1. if both paths are bad use replication
+ // 2. while using replication compute the effectiveness. if the majority of
+ // the packets are coming from a single path, try to use bestpath
+}
+
+void RTCForwardingStrategy::setStrategy(strategy_t strategy) {
+ rounds_since_last_set_ = 0;
+ current_strategy_ = strategy;
+ portal_->setForwardingStrategy(prefix_,
+ string_strategies_[current_strategy_]);
+}
+
+} // namespace rtc
+
+} // namespace protocol
+
+} // namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_forwarding_strategy.h b/libtransport/src/protocols/rtc/rtc_forwarding_strategy.h
new file mode 100644
index 000000000..821b28051
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_forwarding_strategy.h
@@ -0,0 +1,78 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <core/portal.h>
+#include <hicn/transport/interfaces/callbacks.h>
+#include <protocols/rtc/rtc_state.h>
+
+#include <array>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+class RTCForwardingStrategy {
+ public:
+ enum strategy_t {
+ BEST_PATH,
+ REPLICATION,
+ BOTH,
+ NONE,
+ };
+
+ RTCForwardingStrategy();
+ ~RTCForwardingStrategy();
+
+ void initFwdStrategy(std::shared_ptr<core::Portal> portal,
+ core::Prefix& prefix, RTCState* state,
+ strategy_t strategy);
+
+ void checkStrategy();
+ void setCallback(interface::StrategyCallback* callback);
+
+ private:
+ void checkStrategyBestPath();
+ void checkStrategyReplication();
+ void checkStrategyBoth();
+
+ void setStrategy(strategy_t strategy);
+
+ std::array<std::string, 4> string_strategies_ = {"bestpath", "replication",
+ "both", "none"};
+
+ bool init_; // true if all val are initializes
+ bool forwarder_set_; // true if the strategy is been set at least
+ // once
+ strategy_t selected_strategy_; // this is the strategy selected using socket
+ // options. this can also be equal to BOTH
+ strategy_t current_strategy_; // if both strategies can be used this
+ // indicates the one that is currently in use
+ // that can be only replication or best path
+ uint32_t rounds_since_last_set_;
+ core::Prefix prefix_;
+ std::shared_ptr<core::Portal> portal_;
+ RTCState* state_;
+ interface::StrategyCallback* callback_;
+};
+
+} // namespace rtc
+
+} // namespace protocol
+
+} // namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_indexer.h b/libtransport/src/protocols/rtc/rtc_indexer.h
index 4aee242bb..cda156b22 100644
--- a/libtransport/src/protocols/rtc/rtc_indexer.h
+++ b/libtransport/src/protocols/rtc/rtc_indexer.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -18,8 +18,10 @@
#include <protocols/errors.h>
#include <protocols/fec_utils.h>
#include <protocols/indexer.h>
+#include <protocols/rtc/probe_handler.h>
#include <protocols/rtc/rtc_consts.h>
#include <protocols/transport_protocol.h>
+#include <utils/suffix_strategy.h>
#include <deque>
@@ -54,7 +56,7 @@ class RtcIndexer : public Indexer {
n_fec_ = 0;
}
- uint32_t checkNextSuffix() override { return next_suffix_; }
+ uint32_t checkNextSuffix() const override { return next_suffix_; }
uint32_t getNextSuffix() override {
if (isFec(next_suffix_)) {
@@ -77,7 +79,7 @@ class RtcIndexer : public Indexer {
first_suffix_ = suffix % LIMIT;
}
- uint32_t getFirstSuffix() override { return first_suffix_; }
+ uint32_t getFirstSuffix() const override { return first_suffix_; }
uint32_t jumpToIndex(uint32_t index) override {
next_suffix_ = index % LIMIT;
@@ -87,30 +89,8 @@ class RtcIndexer : public Indexer {
void onContentObject(core::Interest &interest,
core::ContentObject &content_object,
bool reassembly) override {
- setVerifier();
- auto ret = verifier_->verifyPackets(&content_object);
-
- switch (ret) {
- case auth::VerificationPolicy::ACCEPT: {
- if (reassembly) {
- reassembly_->reassemble(content_object);
- }
- break;
- }
-
- case auth::VerificationPolicy::UNKNOWN:
- case auth::VerificationPolicy::DROP: {
- transport_->onPacketDropped(
- interest, content_object,
- make_error_code(protocol_error::verification_failed));
- break;
- }
-
- case auth::VerificationPolicy::ABORT: {
- transport_->onContentReassembled(
- make_error_code(protocol_error::session_aborted));
- break;
- }
+ if (reassembly) {
+ reassembly_->reassemble(content_object);
}
}
@@ -120,13 +100,12 @@ class RtcIndexer : public Indexer {
uint32_t getNextReassemblySegment() override {
throw errors::RuntimeException(
"Get reassembly segment called on rtc indexer. RTC indexer does not "
- "provide "
- "reassembly.");
+ "provide reassembly.");
}
bool isFinalSuffixDiscovered() override { return true; }
- uint32_t getFinalSuffix() override { return LIMIT; }
+ uint32_t getFinalSuffix() const override { return LIMIT; }
void enableFec(fec::FECType fec_type) override { fec_type_ = fec_type; }
@@ -137,13 +116,13 @@ class RtcIndexer : public Indexer {
n_current_fec_ = n_fec_;
}
- uint32_t getNFec() override { return n_fec_; }
+ uint32_t getNFec() const override { return n_fec_; }
bool isFec(uint32_t index) override {
return isFec(fec_type_, index, first_suffix_);
}
- double getFecOverhead() override {
+ double getFecOverhead() const override {
if (fec_type_ == fec::FECType::UNKNOWN) {
return 0;
}
@@ -152,7 +131,7 @@ class RtcIndexer : public Indexer {
return (double)n_fec_ / k;
}
- double getMaxFecOverhead() override {
+ double getMaxFecOverhead() const override {
if (fec_type_ == fec::FECType::UNKNOWN) {
return 0;
}
diff --git a/libtransport/src/protocols/rtc/rtc_ldr.cc b/libtransport/src/protocols/rtc/rtc_ldr.cc
index f0de48871..1ca1cf48d 100644
--- a/libtransport/src/protocols/rtc/rtc_ldr.cc
+++ b/libtransport/src/protocols/rtc/rtc_ldr.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -16,6 +16,12 @@
#include <glog/logging.h>
#include <protocols/rtc/rtc_consts.h>
#include <protocols/rtc/rtc_ldr.h>
+#include <protocols/rtc/rtc_rs_delay.h>
+#include <protocols/rtc/rtc_rs_fec_only.h>
+#include <protocols/rtc/rtc_rs_low_rate.h>
+#include <protocols/rtc/rtc_rs_recovery_off.h>
+#include <protocols/rtc/rtc_rs_rtx_only.h>
+#include <protocols/rtc/rtc_state.h>
#include <algorithm>
#include <unordered_set>
@@ -27,146 +33,115 @@ namespace protocol {
namespace rtc {
RTCLossDetectionAndRecovery::RTCLossDetectionAndRecovery(
- Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service)
- : rtx_on_(false),
- fec_on_(false),
- next_rtx_timer_(MAX_TIMER_RTX),
- last_event_(0),
- sentinel_timer_interval_(MAX_TIMER_RTX),
- indexer_(indexer),
- send_rtx_callback_(std::move(callback)) {
- timer_ = std::make_unique<asio::steady_timer>(io_service);
- sentinel_timer_ = std::make_unique<asio::steady_timer>(io_service);
+ Indexer *indexer, asio::io_service &io_service,
+ interface::RtcTransportRecoveryStrategies type,
+ RecoveryStrategy::SendRtxCallback &&callback,
+ interface::StrategyCallback *external_callback) {
+ rs_type_ = type;
+ if (type == interface::RtcTransportRecoveryStrategies::RECOVERY_OFF) {
+ rs_ = std::make_shared<RecoveryStrategyRecoveryOff>(
+ indexer, std::move(callback), io_service, external_callback);
+ } else if (type == interface::RtcTransportRecoveryStrategies::DELAY_BASED) {
+ rs_ = std::make_shared<RecoveryStrategyDelayBased>(
+ indexer, std::move(callback), io_service, external_callback);
+ } else if (type == interface::RtcTransportRecoveryStrategies::FEC_ONLY) {
+ rs_ = std::make_shared<RecoveryStrategyFecOnly>(
+ indexer, std::move(callback), io_service, external_callback);
+ } else if (type == interface::RtcTransportRecoveryStrategies::LOW_RATE ||
+ type == interface::RtcTransportRecoveryStrategies::
+ LOW_RATE_AND_BESTPATH ||
+ type == interface::RtcTransportRecoveryStrategies::
+ LOW_RATE_AND_REPLICATION ||
+ type == interface::RtcTransportRecoveryStrategies::
+ LOW_RATE_AND_ALL_FWD_STRATEGIES) {
+ rs_ = std::make_shared<RecoveryStrategyLowRate>(
+ indexer, std::move(callback), io_service, external_callback);
+ } else {
+ // default
+ rs_type_ = interface::RtcTransportRecoveryStrategies::RTX_ONLY;
+ rs_ = std::make_shared<RecoveryStrategyRtxOnly>(
+ indexer, std::move(callback), io_service, external_callback);
+ }
}
RTCLossDetectionAndRecovery::~RTCLossDetectionAndRecovery() {}
-void RTCLossDetectionAndRecovery::turnOnRTX() {
- rtx_on_ = true;
- scheduleSentinelTimer(state_->getRTT() * CATCH_UP_RTT_INCREMENT);
-}
-
-void RTCLossDetectionAndRecovery::turnOffRTX() {
- rtx_on_ = false;
- clear();
-}
-
-uint32_t RTCLossDetectionAndRecovery::computeFecPacketsToAsk(bool in_sync) {
- uint32_t current_fec = indexer_->getNFec();
- double current_loss_rate = state_->getLossRate();
- double last_loss_rate = state_->getLastRoundLossRate();
-
- // when in sync ask for fec only if there are losses for 2 rounds
- if (in_sync && current_fec == 0 &&
- (current_loss_rate == 0 || last_loss_rate == 0))
- return 0;
-
- double loss_rate = state_->getMaxLossRate() * 1.5;
-
- if (!in_sync && loss_rate == 0) loss_rate = 0.05;
- if (loss_rate > 0.5) loss_rate = 0.5;
-
- double exp_losses = (double)k_ * loss_rate;
- uint32_t fec_to_ask = ceil(exp_losses / (1 - loss_rate));
-
- if (fec_to_ask > (n_ - k_)) fec_to_ask = n_ - k_;
-
- return fec_to_ask;
+void RTCLossDetectionAndRecovery::changeRecoveryStrategy(
+ interface::RtcTransportRecoveryStrategies type) {
+ if (type == rs_type_) return;
+
+ rs_type_ = type;
+ if (type == interface::RtcTransportRecoveryStrategies::RECOVERY_OFF) {
+ rs_ =
+ std::make_shared<RecoveryStrategyRecoveryOff>(std::move(*(rs_.get())));
+ } else if (type == interface::RtcTransportRecoveryStrategies::DELAY_BASED) {
+ rs_ = std::make_shared<RecoveryStrategyDelayBased>(std::move(*(rs_.get())));
+ } else if (type == interface::RtcTransportRecoveryStrategies::FEC_ONLY) {
+ rs_ = std::make_shared<RecoveryStrategyFecOnly>(std::move(*(rs_.get())));
+ } else if (type == interface::RtcTransportRecoveryStrategies::LOW_RATE ||
+ type == interface::RtcTransportRecoveryStrategies::
+ LOW_RATE_AND_BESTPATH ||
+ type == interface::RtcTransportRecoveryStrategies::
+ LOW_RATE_AND_REPLICATION ||
+ type == interface::RtcTransportRecoveryStrategies::
+ LOW_RATE_AND_ALL_FWD_STRATEGIES) {
+ rs_ = std::make_shared<RecoveryStrategyLowRate>(std::move(*(rs_.get())));
+ } else {
+ // default
+ rs_ = std::make_shared<RecoveryStrategyRtxOnly>(std::move(*(rs_.get())));
+ }
}
void RTCLossDetectionAndRecovery::onNewRound(bool in_sync) {
- uint64_t rtt = state_->getRTT();
- if (!fec_on_ && rtt >= 100) {
- // turn on fec, here we may have no info so ask for all packets
- fec_on_ = true;
- turnOffRTX();
- indexer_->setNFec(computeFecPacketsToAsk(in_sync));
- return;
- }
-
- if (fec_on_ && rtt > 80) {
- // keep using fec, maybe update it
- indexer_->setNFec(computeFecPacketsToAsk(in_sync));
- return;
- }
-
- if ((fec_on_ && rtt <= 80) || (!rtx_on_ && rtt <= 100)) {
- // turn on rtx
- fec_on_ = false;
- indexer_->setNFec(0);
- turnOnRTX();
- return;
- }
+ rs_->incRoundId();
+ rs_->onNewRound(in_sync);
}
-void RTCLossDetectionAndRecovery::onTimeout(uint32_t seq) {
- // always add timeouts to the RTX list to avoid to send the same packet as if
- // it was not a rtx
- addToRetransmissions(seq, seq + 1);
- last_event_ = getNow();
+void RTCLossDetectionAndRecovery::onTimeout(uint32_t seq, bool lost) {
+ if (!lost) {
+ detectLoss(seq, seq + 1);
+ } else {
+ rs_->onLostTimeout(seq);
+ }
}
void RTCLossDetectionAndRecovery::onPacketRecoveredFec(uint32_t seq) {
- // if an RTX is scheduled for a packet recovered using FEC delete it
- deleteRtx(seq);
- recover_with_fec_.erase(seq);
+ rs_->receivedPacket(seq);
}
void RTCLossDetectionAndRecovery::onDataPacketReceived(
const core::ContentObject &content_object) {
- last_event_ = getNow();
-
uint32_t seq = content_object.getName().getSuffix();
- if (deleteRtx(seq)) {
- state_->onPacketRecoveredRtx(seq);
- } else {
- DLOG_IF(INFO, VLOG_IS_ON(3))
- << "received data. add from "
- << state_->getHighestSeqReceivedInOrder() + 1 << " to " << seq;
- addToRetransmissions(state_->getHighestSeqReceivedInOrder() + 1, seq);
- }
+ bool is_rtx = rs_->isRtx(seq);
+ rs_->receivedPacket(seq);
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "received data. add from "
+ << rs_->getState()->getHighestSeqReceivedInOrder() + 1 << " to " << seq;
+ if (!is_rtx)
+ detectLoss(rs_->getState()->getHighestSeqReceivedInOrder() + 1, seq);
}
void RTCLossDetectionAndRecovery::onNackPacketReceived(
const core::ContentObject &nack) {
- last_event_ = getNow();
-
- uint32_t seq = nack.getName().getSuffix();
-
struct nack_packet_t *nack_pkt =
(struct nack_packet_t *)nack.getPayload()->data();
- uint32_t production_seq = nack_pkt->getProductionSegement();
+ uint32_t production_seq = nack_pkt->getProductionSegment();
+ uint32_t seq = nack.getName().getSuffix();
- if (production_seq > seq) {
- // this is a past nack, all data before productionSeq are lost. if
- // productionSeq > state_->getHighestSeqReceivedInOrder() is impossible to
- // recover any packet. If this is not the case we can try to recover the
- // packets between state_->getHighestSeqReceivedInOrder() and productionSeq.
- // e.g.: the client receives packets 8 10 11 9 where 9 is a nack with
- // productionSeq = 14. 9 is lost but we can try to recover packets 12 13 and
- // 14 that are not arrived yet
- deleteRtx(seq);
- DLOG_IF(INFO, VLOG_IS_ON(3)) << "received past nack. add from "
- << state_->getHighestSeqReceivedInOrder() + 1
- << " to " << production_seq;
- addToRetransmissions(state_->getHighestSeqReceivedInOrder() + 1,
- production_seq);
- } else {
- // future nack. here there should be a gap between the last data received
- // and this packet and is it possible to recover the packets between the
- // last received data and the production seq. we should not use the seq
- // number of the nack since we know that is too early to ask for this seq
- // number
- // e.g.: // e.g.: the client receives packets 10 11 12 20 where 20 is a nack
- // with productionSeq = 18. this says that all the packets between 12 and 18
- // may got lost and we should ask them
- deleteRtx(seq);
- DLOG_IF(INFO, VLOG_IS_ON(3)) << "received futrue nack. add from "
- << state_->getHighestSeqReceivedInOrder() + 1
- << " to " << production_seq;
- addToRetransmissions(state_->getHighestSeqReceivedInOrder() + 1,
- production_seq);
- }
+ // received a nack. we can try to recover all data packets between the last
+ // received data and the production seq in the nack. this is similar to the
+ // recption of a probe
+ // e.g.: the client receives packets 10 11 12 20 where 20 is a nack
+ // with productionSeq = 18. this says that all the packets between 12 and 18
+ // may got lost and we should ask them
+
+ rs_->receivedPacket(seq);
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "received nack. add from "
+ << rs_->getState()->getHighestSeqReceivedInOrder() + 1 << " to "
+ << production_seq;
+ detectLoss(rs_->getState()->getHighestSeqReceivedInOrder() + 1,
+ production_seq);
}
void RTCLossDetectionAndRecovery::onProbePacketReceived(
@@ -174,336 +149,38 @@ void RTCLossDetectionAndRecovery::onProbePacketReceived(
// we don't log the reception of a probe packet for the sentinel timer because
// probes are not taken into account into the sync window. we use them as
// future nacks to detect possible packets lost
- struct nack_packet_t *probe_pkt =
- (struct nack_packet_t *)probe.getPayload()->data();
- uint32_t production_seq = probe_pkt->getProductionSegement();
+
+ uint32_t production_seq = RTCState::getProbeParams(probe).prod_seg;
+
DLOG_IF(INFO, VLOG_IS_ON(3))
<< "received probe. add from "
- << state_->getHighestSeqReceivedInOrder() + 1 << " to " << production_seq;
+ << rs_->getState()->getHighestSeqReceivedInOrder() + 1 << " to "
+ << production_seq;
- addToRetransmissions(state_->getHighestSeqReceivedInOrder() + 1,
- production_seq);
+ detectLoss(rs_->getState()->getHighestSeqReceivedInOrder() + 1,
+ production_seq);
}
-void RTCLossDetectionAndRecovery::clear() {
- rtx_state_.clear();
- rtx_timers_.clear();
- sentinel_timer_->cancel();
- if (next_rtx_timer_ != MAX_TIMER_RTX) {
- next_rtx_timer_ = MAX_TIMER_RTX;
- timer_->cancel();
- }
-}
+void RTCLossDetectionAndRecovery::detectLoss(uint32_t start, uint32_t stop) {
+ if (start >= stop) return;
-void RTCLossDetectionAndRecovery::addToRetransmissions(uint32_t start,
- uint32_t stop) {
// skip nacked packets
- if (start <= state_->getLastSeqNacked()) {
- start = state_->getLastSeqNacked() + 1;
+ if (start <= rs_->getState()->getLastSeqNacked()) {
+ start = rs_->getState()->getLastSeqNacked() + 1;
}
// skip received or lost packets
- if (start <= state_->getHighestSeqReceivedInOrder()) {
- start = state_->getHighestSeqReceivedInOrder() + 1;
+ if (start <= rs_->getState()->getHighestSeqReceivedInOrder()) {
+ start = rs_->getState()->getHighestSeqReceivedInOrder() + 1;
}
for (uint32_t seq = start; seq < stop; seq++) {
- if (state_->isReceivedOrLost(seq) == PacketState::UNKNOWN) {
- if (rtx_on_) {
- if (!indexer_->isFec(seq)) {
- // handle it with rtx
- if (!isRtx(seq)) {
- state_->onLossDetected(seq);
- rtxState state;
- state.first_send_ = state_->getInterestSentTime(seq);
- if (state.first_send_ == 0) // this interest was never sent before
- state.first_send_ = getNow();
- state.next_send_ = computeNextSend(seq, true);
- state.rtx_count_ = 0;
- DLOG_IF(INFO, VLOG_IS_ON(4))
- << "Add " << seq << " to retransmissions. next rtx is %lu "
- << state.next_send_ - getNow();
- rtx_state_.insert(std::pair<uint32_t, rtxState>(seq, state));
- rtx_timers_.insert(
- std::pair<uint64_t, uint32_t>(state.next_send_, seq));
- }
- } else {
- // is fec, do not send it
- auto it = recover_with_fec_.find(seq);
- if (it == recover_with_fec_.end()) {
- state_->onLossDetected(seq);
- recover_with_fec_.insert(seq);
- }
- }
- } else {
- // keep track of losses but recover with FEC
- auto it = recover_with_fec_.find(seq);
- if (it == recover_with_fec_.end()) {
- state_->onLossDetected(seq);
- recover_with_fec_.insert(seq);
- }
- }
- }
- }
- scheduleNextRtx();
-}
-
-uint64_t RTCLossDetectionAndRecovery::computeNextSend(uint32_t seq,
- bool new_rtx) {
- uint64_t now = getNow();
- if (new_rtx) {
- // for the new rtx we wait one estimated IAT after the loss detection. this
- // is bacause, assuming that packets arrive with a constant IAT, we should
- // get a new packet every IAT
- double prod_rate = state_->getProducerRate();
- uint32_t estimated_iat = SENTINEL_TIMER_INTERVAL;
- uint32_t jitter = 0;
-
- if (prod_rate != 0) {
- double packet_size = state_->getAveragePacketSize();
- estimated_iat = ceil(1000.0 / (prod_rate / packet_size));
- jitter = ceil(state_->getJitter());
- }
-
- uint32_t wait = estimated_iat + jitter;
- DLOG_IF(INFO, VLOG_IS_ON(3))
- << "first rtx for " << seq << " in " << wait
- << " ms, rtt = " << state_->getRTT() << " ait = " << estimated_iat
- << " jttr = " << jitter;
-
- return now + wait;
- } else {
- // wait one RTT
- // however if the IAT is larger than the RTT, wait one IAT
- uint32_t wait = SENTINEL_TIMER_INTERVAL;
-
- double prod_rate = state_->getProducerRate();
- if (prod_rate == 0) {
- return now + SENTINEL_TIMER_INTERVAL;
- }
-
- double packet_size = state_->getAveragePacketSize();
- uint32_t estimated_iat = ceil(1000.0 / (prod_rate / packet_size));
-
- uint64_t rtt = state_->getRTT();
- if (rtt == 0) rtt = SENTINEL_TIMER_INTERVAL;
- wait = rtt;
-
- if (estimated_iat > rtt) wait = estimated_iat;
-
- uint32_t jitter = ceil(state_->getJitter());
- wait += jitter;
-
- // it may happen that the channel is congested and we have some additional
- // queuing delay to take into account
- uint32_t queue = ceil(state_->getQueuing());
- wait += queue;
-
- DLOG_IF(INFO, VLOG_IS_ON(3))
- << "next rtx for " << seq << " in " << wait
- << " ms, rtt = " << state_->getRTT() << " ait = " << estimated_iat
- << " jttr = " << jitter << " queue = " << queue;
-
- return now + wait;
- }
-}
-
-void RTCLossDetectionAndRecovery::retransmit() {
- if (rtx_timers_.size() == 0) return;
-
- uint64_t now = getNow();
-
- auto it = rtx_timers_.begin();
- std::unordered_set<uint32_t> lost_pkt;
- uint32_t sent_counter = 0;
- while (it != rtx_timers_.end() && it->first <= now &&
- sent_counter < MAX_RTX_IN_BATCH) {
- uint32_t seq = it->second;
- auto rtx_it =
- rtx_state_.find(seq); // this should always return a valid iter
- if (rtx_it->second.rtx_count_ >= RTC_MAX_RTX ||
- (now - rtx_it->second.first_send_) >= RTC_MAX_AGE ||
- seq < state_->getLastSeqNacked()) {
- // max rtx reached or packet too old or packet nacked, this packet is lost
- DLOG_IF(INFO, VLOG_IS_ON(3))
- << "packet " << seq << " lost because 1) max rtx: "
- << (rtx_it->second.rtx_count_ >= RTC_MAX_RTX) << " 2) max age: "
- << ((now - rtx_it->second.first_send_) >= RTC_MAX_AGE)
- << " 3) nacked: " << (seq < state_->getLastSeqNacked());
- lost_pkt.insert(seq);
- it++;
- } else {
- // resend the packet
- state_->onRetransmission(seq);
- double prod_rate = state_->getProducerRate();
- if (prod_rate != 0) rtx_it->second.rtx_count_++;
- rtx_it->second.next_send_ = computeNextSend(seq, false);
- it = rtx_timers_.erase(it);
- rtx_timers_.insert(
- std::pair<uint64_t, uint32_t>(rtx_it->second.next_send_, seq));
- DLOG_IF(INFO, VLOG_IS_ON(3))
- << "send rtx for sequence " << seq << ", next send in "
- << (rtx_it->second.next_send_ - now);
- send_rtx_callback_(seq);
- sent_counter++;
- }
- }
-
- // remove packets if needed
- for (auto lost_it = lost_pkt.begin(); lost_it != lost_pkt.end(); lost_it++) {
- uint32_t seq = *lost_it;
- state_->onPacketLost(seq);
- deleteRtx(seq);
- }
-}
-
-void RTCLossDetectionAndRecovery::scheduleNextRtx() {
- if (rtx_timers_.size() == 0) {
- // all the rtx were removed, reset timer
- next_rtx_timer_ = MAX_TIMER_RTX;
- return;
- }
-
- // check if timer is alreay set
- if (next_rtx_timer_ != MAX_TIMER_RTX) {
- // a new check for rtx is already scheduled
- if (next_rtx_timer_ > rtx_timers_.begin()->first) {
- // we need to re-schedule it
- timer_->cancel();
- } else {
- // wait for the next timer
- return;
- }
- }
-
- // set a new timer
- next_rtx_timer_ = rtx_timers_.begin()->first;
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
- uint64_t wait = 1;
- if (next_rtx_timer_ != MAX_TIMER_RTX && next_rtx_timer_ > now)
- wait = next_rtx_timer_ - now;
-
- std::weak_ptr<RTCLossDetectionAndRecovery> self(shared_from_this());
- timer_->expires_from_now(std::chrono::milliseconds(wait));
- timer_->async_wait([self](std::error_code ec) {
- if (ec) return;
- if (auto s = self.lock()) {
- s->retransmit();
- s->next_rtx_timer_ = MAX_TIMER_RTX;
- s->scheduleNextRtx();
- }
- });
-}
-
-bool RTCLossDetectionAndRecovery::deleteRtx(uint32_t seq) {
- auto it_rtx = rtx_state_.find(seq);
- if (it_rtx == rtx_state_.end()) return false; // rtx not found
-
- uint64_t ts = it_rtx->second.next_send_;
- auto it_timers = rtx_timers_.find(ts);
- while (it_timers != rtx_timers_.end() && it_timers->first == ts) {
- if (it_timers->second == seq) {
- rtx_timers_.erase(it_timers);
- break;
- }
- it_timers++;
- }
-
- bool lost = it_rtx->second.rtx_count_ > 0;
- rtx_state_.erase(it_rtx);
-
- return lost;
-}
-
-void RTCLossDetectionAndRecovery::scheduleSentinelTimer(
- uint64_t expires_from_now) {
- std::weak_ptr<RTCLossDetectionAndRecovery> self(shared_from_this());
- sentinel_timer_->expires_from_now(
- std::chrono::milliseconds(expires_from_now));
- sentinel_timer_->async_wait([self](std::error_code ec) {
- if (ec) return;
- if (auto s = self.lock()) {
- s->sentinelTimer();
- }
- });
-}
-
-void RTCLossDetectionAndRecovery::sentinelTimer() {
- uint64_t now = getNow();
-
- bool expired = false;
- bool sent = false;
- if ((now - last_event_) >= sentinel_timer_interval_) {
- // at least a sentinel_timer_interval_ elapsed since last event
- expired = true;
- if (TRANSPORT_EXPECT_FALSE(!state_->isProducerActive())) {
- // this happens at the beginning (or if the producer stops for some
- // reason) we need to keep sending interest 0 until we get an answer
- DLOG_IF(INFO, VLOG_IS_ON(3))
- << "sentinel timer: the producer is not active, send packet 0";
- state_->onRetransmission(0);
- send_rtx_callback_(0);
- } else {
- DLOG_IF(INFO, VLOG_IS_ON(3)) << "sentinel timer: the producer is active, "
- "send the 10 oldest packets";
- sent = true;
- uint32_t rtx = 0;
- auto it = state_->getPendingInterestsMapBegin();
- auto end = state_->getPendingInterestsMapEnd();
- while (it != end && rtx < MAX_RTX_WITH_SENTINEL) {
- uint32_t seq = it->first;
- DLOG_IF(INFO, VLOG_IS_ON(3))
- << "sentinel timer, add " << seq << " to the rtx list";
- addToRetransmissions(seq, seq + 1);
- rtx++;
- it++;
- }
- }
- } else {
- // sentinel timer did not expire because we registered at least one event
- }
-
- uint32_t next_timer;
- double prod_rate = state_->getProducerRate();
- if (TRANSPORT_EXPECT_FALSE(!state_->isProducerActive()) || prod_rate == 0) {
- DLOG_IF(INFO, VLOG_IS_ON(3)) << "next timer in " << SENTINEL_TIMER_INTERVAL;
- next_timer = SENTINEL_TIMER_INTERVAL;
- } else {
- double prod_rate = state_->getProducerRate();
- double packet_size = state_->getAveragePacketSize();
- uint32_t estimated_iat = ceil(1000.0 / (prod_rate / packet_size));
- uint32_t jitter = ceil(state_->getJitter());
-
- // try to reduce the number of timers if the estimated IAT is too small
- next_timer = std::max((estimated_iat + jitter) * 20, (uint32_t)1);
- DLOG_IF(INFO, VLOG_IS_ON(3))
- << "next sentinel in " << next_timer
- << " ms, rate: " << ((prod_rate * 8.0) / 1000000.0)
- << ", iat: " << estimated_iat << ", jitter: " << jitter;
-
- if (!expired) {
- // discount the amout of time that is already passed
- uint32_t discount = now - last_event_;
- if (next_timer > discount) {
- next_timer = next_timer - discount;
- } else {
- // in this case we trigger the timer in 1 ms
- next_timer = 1;
+ if (rs_->getState()->getPacketState(seq) == PacketState::UNKNOWN) {
+ if (rs_->lossDetected(seq)) {
+ rs_->getState()->onLossDetected(seq);
}
- DLOG_IF(INFO, VLOG_IS_ON(3)) << "timer after discout: " << next_timer;
- } else if (sent) {
- // wait at least one producer stats interval + owd to check if the
- // production rate is reducing.
- uint32_t min_wait = PRODUCER_STATS_INTERVAL + ceil(state_->getQueuing());
- next_timer = std::max(next_timer, min_wait);
- DLOG_IF(INFO, VLOG_IS_ON(3))
- << "wait for updates from prod, next timer: " << next_timer;
}
}
-
- scheduleSentinelTimer(next_timer);
}
} // namespace rtc
diff --git a/libtransport/src/protocols/rtc/rtc_ldr.h b/libtransport/src/protocols/rtc/rtc_ldr.h
index 1b9f9afd6..e7f8ce5db 100644
--- a/libtransport/src/protocols/rtc/rtc_ldr.h
+++ b/libtransport/src/protocols/rtc/rtc_ldr.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -15,16 +15,14 @@
#pragma once
#include <hicn/transport/config.h>
+#include <hicn/transport/interfaces/socket_options_keys.h>
+// RtcTransportRecoveryStrategies
#include <hicn/transport/core/asio_wrapper.h>
#include <hicn/transport/core/content_object.h>
#include <hicn/transport/core/name.h>
-#include <protocols/indexer.h>
-#include <protocols/rtc/rtc_consts.h>
-#include <protocols/rtc/rtc_state.h>
+#include <protocols/rtc/rtc_recovery_strategy.h>
#include <functional>
-#include <map>
-#include <unordered_map>
namespace transport {
@@ -34,91 +32,45 @@ namespace rtc {
class RTCLossDetectionAndRecovery
: public std::enable_shared_from_this<RTCLossDetectionAndRecovery> {
- struct rtx_state_ {
- uint64_t first_send_;
- uint64_t next_send_;
- uint32_t rtx_count_;
- };
-
- using rtxState = struct rtx_state_;
- using SendRtxCallback = std::function<void(uint32_t)>;
-
public:
- RTCLossDetectionAndRecovery(Indexer *indexer, SendRtxCallback &&callback,
- asio::io_service &io_service);
+ RTCLossDetectionAndRecovery(Indexer *indexer, asio::io_service &io_service,
+ interface::RtcTransportRecoveryStrategies type,
+ RecoveryStrategy::SendRtxCallback &&callback,
+ interface::StrategyCallback *external_callback);
~RTCLossDetectionAndRecovery();
- void setState(std::shared_ptr<RTCState> state) { state_ = state; }
- void setFecParams(uint32_t n, uint32_t k) {
- n_ = n;
- k_ = k;
+ void setState(RTCState *state) { rs_->setState(state); }
+ void setRateControl(RTCRateControl *rateControl) {
+ rs_->setRateControl(rateControl);
}
- void turnOnRTX();
- void turnOffRTX();
- bool isRtxOn() { return rtx_on_; }
+
+ void setFecParams(uint32_t n, uint32_t k) { rs_->setFecParams(n, k); }
+
+ void turnOnRecovery() { rs_->tunrOnRecovery(); }
+ bool isRtxOn() { return rs_->isRtxOn(); }
+
+ void changeRecoveryStrategy(interface::RtcTransportRecoveryStrategies type);
void onNewRound(bool in_sync);
- void onTimeout(uint32_t seq);
+ void onTimeout(uint32_t seq, bool lost);
void onPacketRecoveredFec(uint32_t seq);
void onDataPacketReceived(const core::ContentObject &content_object);
void onNackPacketReceived(const core::ContentObject &nack);
void onProbePacketReceived(const core::ContentObject &probe);
- void clear();
+ void clear() { rs_->clear(); }
- bool isRtx(uint32_t seq) {
- if (rtx_state_.find(seq) != rtx_state_.end()) return true;
- return false;
+ bool isRtx(uint32_t seq) { return rs_->isRtx(seq); }
+ bool isPossibleLossWithNoRtx(uint32_t seq) {
+ return rs_->isPossibleLossWithNoRtx(seq);
}
private:
- void addToRetransmissions(uint32_t start, uint32_t stop);
- uint64_t computeNextSend(uint32_t seq, bool new_rtx);
- void retransmit();
- void scheduleNextRtx();
- bool deleteRtx(uint32_t seq);
- void scheduleSentinelTimer(uint64_t expires_from_now);
- void sentinelTimer();
- uint32_t computeFecPacketsToAsk(bool in_sync);
-
- uint64_t getNow() {
- using namespace std::chrono;
- uint64_t now =
- duration_cast<milliseconds>(steady_clock::now().time_since_epoch())
- .count();
- return now;
- }
-
- // this map keeps track of the retransmitted interest, ordered from the oldest
- // to the newest one. the state contains the timer of the first send of the
- // interest (from pendingIntetests_), the timer of the next send (key of the
- // multimap) and the number of rtx
- std::map<uint32_t, rtxState> rtx_state_;
- // this map stored the rtx by timer. The key is the time at which the rtx
- // should be sent, and the val is the interest seq number
- std::multimap<uint64_t, uint32_t> rtx_timers_;
-
- // lost packets that will be recovered with fec
- std::unordered_set<uint32_t> recover_with_fec_;
-
- bool rtx_on_;
- bool fec_on_;
- uint64_t next_rtx_timer_;
- uint64_t last_event_;
- uint64_t sentinel_timer_interval_;
-
- // fec params
- uint32_t n_;
- uint32_t k_;
-
- std::unique_ptr<asio::steady_timer> timer_;
- std::unique_ptr<asio::steady_timer> sentinel_timer_;
- std::shared_ptr<RTCState> state_;
-
- Indexer *indexer_;
+ void detectLoss(uint32_t start, uint32_t stop);
- SendRtxCallback send_rtx_callback_;
+ interface::RtcTransportRecoveryStrategies rs_type_;
+ std::shared_ptr<RecoveryStrategy> rs_;
};
} // end namespace rtc
diff --git a/libtransport/src/protocols/rtc/rtc_packet.h b/libtransport/src/protocols/rtc/rtc_packet.h
index 7dc2f82c3..391aedfc6 100644
--- a/libtransport/src/protocols/rtc/rtc_packet.h
+++ b/libtransport/src/protocols/rtc/rtc_packet.h
@@ -24,6 +24,27 @@
* +-----------------------------------------+
*/
+/* aggregated packets
+ * +---------------------------------+
+ * |c| #pkts | len1 | len2 | .... |
+ * +----------------------------------
+ *
+ * +---------------------------------+
+ * |c| #pkts | resv | len 1 |
+ * +----------------------------------
+ *
+ * aggregated packets header.
+ * header position. just after the data packet header
+ *
+ * c: 1 bit: 0 8bit encoding, 1 16bit encoding
+ * #pkts: 7 bits: number of application packets contained
+ * 8bits encoding:
+ * lenX: 8 bits: len in bites of packet X
+ * 16bits econding:
+ * resv: 8 bits: reserved field (unused)
+ * lenX: 16bits: len in bytes of packet X
+ */
+
#pragma once
#ifndef _WIN32
#include <arpa/inet.h>
@@ -31,6 +52,8 @@
#include <hicn/transport/portability/win_portability.h>
#endif
+#include <cstring>
+
namespace transport {
namespace protocol {
@@ -82,8 +105,144 @@ struct nack_packet_t {
inline uint32_t getProductionRate() const { return ntohl(prod_rate); }
inline void setProductionRate(uint32_t rate) { prod_rate = htonl(rate); }
- inline uint32_t getProductionSegement() const { return ntohl(prod_seg); }
- inline void setProductionSegement(uint32_t seg) { prod_seg = htonl(seg); }
+ inline uint32_t getProductionSegment() const { return ntohl(prod_seg); }
+ inline void setProductionSegment(uint32_t seg) { prod_seg = htonl(seg); }
+};
+
+class AggrPktHeader {
+ public:
+ // XXX buf always point to the payload after the data header
+ AggrPktHeader(uint8_t *buf, uint16_t max_packet_len, uint16_t pkt_number)
+ : buf_(buf), pkt_num_(pkt_number) {
+ *buf_ = 0; // reset the first byte to correctly add the header
+ // encoding and the packet number
+ if (max_packet_len > 0xff) {
+ setAggrPktEncoding16bit();
+ } else {
+ setAggrPktEncoding8bit();
+ }
+ setAggrPktNUmber(pkt_number);
+ header_len_ = computeHeaderLen();
+ memset(buf_ + 1, 0, header_len_ - 1);
+ }
+
+ // XXX buf always point to the payload after the data header
+ AggrPktHeader(uint8_t *buf) : buf_(buf) {
+ encoding_ = getAggrPktEncoding();
+ pkt_num_ = getAggrPktNumber();
+ header_len_ = computeHeaderLen();
+ }
+
+ ~AggrPktHeader(){};
+
+ int addPacketToHeader(uint8_t index, uint16_t len) {
+ if (index > pkt_num_) return -1;
+
+ setAggrPktLen(index, len);
+ return 0;
+ }
+
+ int getPointerToPacket(uint8_t index, uint8_t **pkt_ptr, uint16_t *pkt_len) {
+ if (index > pkt_num_) return -1;
+
+ uint16_t len = 0;
+ for (int i = 0; i < index; i++)
+ len += getAggrPktLen(i); // sum the pkts len from 0 to index - 1
+
+ uint16_t offset = len + header_len_;
+ *pkt_ptr = buf_ + offset;
+ *pkt_len = getAggrPktLen(index);
+ return 0;
+ }
+
+ int getPacketOffsets(uint8_t index, uint16_t *pkt_offset, uint16_t *pkt_len) {
+ if (index > pkt_num_) return -1;
+
+ uint16_t len = 0;
+ for (int i = 0; i < index; i++)
+ len += getAggrPktLen(i); // sum the pkts len from 0 to index - 1
+
+ uint16_t offset = len + header_len_;
+ *pkt_offset = offset;
+ *pkt_len = getAggrPktLen(index);
+
+ return 0;
+ }
+
+ uint8_t *getPayloadAppendPtr() { return buf_ + header_len_; }
+
+ uint16_t getHeaderLen() { return header_len_; }
+
+ uint8_t getNumberOfPackets() { return pkt_num_; }
+
+ private:
+ inline uint16_t computeHeaderLen() const {
+ uint16_t len = 4; // min len in bytes
+ if (!encoding_) {
+ while (pkt_num_ >= len) {
+ len += 4;
+ }
+ } else {
+ while (pkt_num_ * 2 >= len) {
+ len += 4;
+ }
+ }
+ return len;
+ }
+
+ inline uint8_t getAggrPktEncoding() const {
+ // get the first bit of the first byte
+ return (*buf_ >> 7);
+ }
+
+ inline void setAggrPktEncoding8bit() {
+ // reset the first bit of the first byte
+ encoding_ = 0;
+ *buf_ &= 0x7f;
+ }
+
+ inline void setAggrPktEncoding16bit() {
+ // set the first bit of the first byte
+ encoding_ = 1;
+ *buf_ ^= 0x80;
+ }
+
+ inline uint8_t getAggrPktNumber() const {
+ // return the first byte with the first bit = 0
+ return (*buf_ & 0x7f);
+ }
+
+ inline void setAggrPktNUmber(uint8_t val) {
+ // set the val without modifying the first bit
+ *buf_ &= 0x80; // reset everithing but the first bit
+ val &= 0x7f; // reset the first bit
+ *buf_ |= val; // or the vals, done!
+ }
+
+ inline uint16_t getAggrPktLen(uint8_t pkt_index) const {
+ pkt_index++;
+ if (!encoding_) { // 8 bits
+ return (uint16_t) * (buf_ + pkt_index);
+ } else { // 16 bits
+ uint16_t *buf_16 = (uint16_t *)buf_;
+ return ntohs(*(buf_16 + pkt_index));
+ }
+ }
+
+ inline void setAggrPktLen(uint8_t pkt_index, uint16_t len) {
+ pkt_index++;
+ if (!encoding_) { // 8 bits
+ *(buf_ + pkt_index) = (uint8_t)len;
+ } else { // 16 bits
+ uint16_t *buf_16 = (uint16_t *)buf_;
+ *(buf_16 + pkt_index) = htons(len);
+ }
+ }
+
+ uint8_t *buf_;
+ uint8_t encoding_;
+ uint8_t pkt_num_;
+ uint16_t header_len_;
};
} // end namespace rtc
diff --git a/libtransport/src/protocols/rtc/rtc_rc.h b/libtransport/src/protocols/rtc/rtc_rc.h
index 34d090092..62636ce40 100644
--- a/libtransport/src/protocols/rtc/rtc_rc.h
+++ b/libtransport/src/protocols/rtc/rtc_rc.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -34,11 +34,15 @@ class RTCRateControl : public std::enable_shared_from_this<RTCRateControl> {
void turnOnRateControl() { rc_on_ = true; }
void setState(std::shared_ptr<RTCState> state) { protocol_state_ = state; };
- uint32_t getCongesionWindow() { return congestion_win_; };
+ uint32_t getCongestionWindow() { return congestion_win_; };
+ bool inCongestionState() {
+ if (congestion_state_ == CongestionState::Congested) return true;
+ return false;
+ }
virtual void onNewRound(double round_len) = 0;
- virtual void onDataPacketReceived(
- const core::ContentObject &content_object) = 0;
+ virtual void onDataPacketReceived(const core::ContentObject &content_object,
+ bool compute_stats) = 0;
protected:
enum class CongestionState { Normal = 0, Underuse = 1, Congested = 2, Last };
diff --git a/libtransport/src/protocols/rtc/rtc_rc_congestion_detection.cc b/libtransport/src/protocols/rtc/rtc_rc_congestion_detection.cc
new file mode 100644
index 000000000..6cd3094b5
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_rc_congestion_detection.cc
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <protocols/rtc/rtc_consts.h>
+#include <protocols/rtc/rtc_rc_congestion_detection.h>
+
+#include <algorithm>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+RTCRateControlCongestionDetection::RTCRateControlCongestionDetection()
+ : rounds_without_congestion_(4), last_queue_(0) {} // must be > 3
+
+RTCRateControlCongestionDetection::~RTCRateControlCongestionDetection() {}
+
+void RTCRateControlCongestionDetection::onNewRound(double round_len) {
+ if (!rc_on_) return;
+
+ double rtt = (double)protocol_state_->getMinRTT() / MILLI_IN_A_SEC;
+ double queue = protocol_state_->getQueuing();
+
+ if (rtt == 0.0) return; // no info from the producer
+
+ if (last_queue_ == queue) {
+ // if last_queue == queue the consumer didn't receive any
+ // packet from the producer. we do not change the current congestion state.
+ // we just increase the counter of rounds whithout congestion if needed
+ // (in case of congestion the counter is already set to 0)
+ if (congestion_state_ == CongestionState::Normal)
+ rounds_without_congestion_++;
+ } else {
+ if (queue > MAX_QUEUING_DELAY) {
+ // here we detect congestion.
+ congestion_state_ = CongestionState::Congested;
+ rounds_without_congestion_ = 0;
+ } else {
+ // wait 3 rounds before switch back to no congestion
+ if (rounds_without_congestion_ > 3) {
+ // nothing bad is happening
+ congestion_state_ = CongestionState::Normal;
+ }
+ rounds_without_congestion_++;
+ }
+ last_queue_ = queue;
+ }
+}
+
+void RTCRateControlCongestionDetection::onDataPacketReceived(
+ const core::ContentObject &content_object, bool compute_stats) {
+ // nothing to do
+ return;
+}
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_rc_congestion_detection.h b/libtransport/src/protocols/rtc/rtc_rc_congestion_detection.h
new file mode 100644
index 000000000..9afa6c39a
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_rc_congestion_detection.h
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <hicn/transport/utils/shared_ptr_utils.h>
+#include <protocols/rtc/rtc_rc.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+class RTCRateControlCongestionDetection : public RTCRateControl {
+ public:
+ RTCRateControlCongestionDetection();
+
+ ~RTCRateControlCongestionDetection();
+
+ void onNewRound(double round_len);
+ void onDataPacketReceived(const core::ContentObject &content_object,
+ bool compute_stats);
+
+ auto shared_from_this() { return utils::shared_from(this); }
+
+ private:
+ uint32_t rounds_without_congestion_;
+ double last_queue_;
+};
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_rc_iat.cc b/libtransport/src/protocols/rtc/rtc_rc_iat.cc
new file mode 100644
index 000000000..f06f377f3
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_rc_iat.cc
@@ -0,0 +1,287 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <protocols/rtc/rtc_consts.h>
+#include <protocols/rtc/rtc_rc_iat.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+RTCRateControlIAT::RTCRateControlIAT()
+ : rounds_since_last_drop_(0),
+ rounds_without_congestion_(0),
+ rounds_with_congestion_(0),
+ last_queue_(0),
+ last_rcv_time_(0),
+ last_prod_time_(0),
+ last_seq_number_(0),
+ target_rate_avg_(0),
+ round_index_(0),
+ congestion_cause_(CongestionCause::UNKNOWN) {}
+
+RTCRateControlIAT::~RTCRateControlIAT() {}
+
+void RTCRateControlIAT::onNewRound(double round_len) {
+ if (!rc_on_) return;
+
+ double received_rate = protocol_state_->getReceivedRate() +
+ protocol_state_->getRecoveredFecRate();
+
+ double target_rate =
+ protocol_state_->getProducerRate(); // * PRODUCTION_RATE_FRACTION;
+ double rtt = (double)protocol_state_->getMinRTT() / MILLI_IN_A_SEC;
+ // double packet_size = protocol_state_->getAveragePacketSize();
+ double queue = protocol_state_->getQueuing();
+
+ if (rtt == 0.0) return; // no info from the producer
+
+ CongestionState prev_congestion_state = congestion_state_;
+
+ target_rate_avg_ = target_rate_avg_ * (1 - MOVING_AVG_ALPHA) +
+ target_rate * MOVING_AVG_ALPHA;
+
+ if (prev_congestion_state == CongestionState::Congested) {
+ if (queue > MAX_QUEUING_DELAY || last_queue_ == queue) {
+ congestion_state_ = CongestionState::Congested;
+
+ received_rate_.push_back(received_rate);
+ target_rate_.push_back(target_rate);
+
+ // We assume the cause does not change
+ // Note that the first assumption about the cause could be wrong
+ // the cause of congestion could change
+ if (congestion_cause_ == CongestionCause::UNKNOWN)
+ if (rounds_with_congestion_ >= 1)
+ congestion_cause_ = apply_classification_tree(
+ rounds_with_congestion_ > ROUND_TO_WAIT_FORCE_DECISION);
+
+ rounds_with_congestion_++;
+ } else {
+ congestion_state_ = CongestionState::Normal;
+
+ // clear past history
+ reset_congestion_statistics();
+
+ // TODO maybe we can use some of these values for the stdev of the
+ // congestion mode
+ for (int i = 0; i < ROUND_HISTORY_SIZE; i++) {
+ iat_on_hold_[i].clear();
+ }
+ }
+ } else if (queue > MAX_QUEUING_DELAY) {
+ if (prev_congestion_state == CongestionState::Normal) {
+ rounds_with_congestion_ = 0;
+
+ if (rounds_without_congestion_ > ROUND_TO_RESET_CAUSE)
+ congestion_cause_ = CongestionCause::UNKNOWN;
+ }
+ congestion_state_ = CongestionState::Congested;
+ received_rate_.push_back(received_rate);
+ target_rate_.push_back(target_rate);
+ } else {
+ // nothing bad is happening
+ congestion_state_ = CongestionState::Normal;
+ reset_congestion_statistics();
+
+ int past_index = (round_index_ + 1) % ROUND_HISTORY_SIZE;
+ for (std::vector<double>::iterator it = iat_on_hold_[past_index].begin();
+ it != iat_on_hold_[past_index].end(); ++it) {
+ congestion_free_iat_.push_back(*it);
+ if (congestion_free_iat_.size() > 50) {
+ congestion_free_iat_.erase(congestion_free_iat_.begin());
+ }
+ }
+ iat_on_hold_[past_index].clear();
+ round_index_ = (round_index_ + 1) % ROUND_HISTORY_SIZE;
+ }
+
+ last_queue_ = queue;
+
+ if (congestion_state_ == CongestionState::Congested) {
+ if (prev_congestion_state == CongestionState::Normal) {
+ // init the congetion window using the received rate
+ // disabling for the moment the congestion window setup
+ // congestion_win_ = (uint32_t)ceil(received_rate * rtt / packet_size);
+ rounds_since_last_drop_ = ROUNDS_BEFORE_TAKE_ACTION + 1;
+ }
+
+ if (rounds_since_last_drop_ >= ROUNDS_BEFORE_TAKE_ACTION) {
+ // disabling for the moment the congestion window setup
+ // uint32_t win = congestion_win_ * WIN_DECREASE_FACTOR;
+ // congestion_win_ = std::max(win, WIN_MIN);
+ rounds_since_last_drop_ = 0;
+ return;
+ }
+
+ rounds_since_last_drop_++;
+ }
+
+ if (congestion_state_ == CongestionState::Normal) {
+ if (prev_congestion_state == CongestionState::Congested) {
+ rounds_without_congestion_ = 0;
+ }
+
+ rounds_without_congestion_++;
+ if (rounds_without_congestion_ < ROUNDS_BEFORE_TAKE_ACTION) return;
+
+ // disabling for the moment the congestion window setup
+ // congestion_win_ = congestion_win_ * WIN_INCREASE_FACTOR;
+ // congestion_win_ = std::min(congestion_win_, INITIAL_WIN_MAX);
+ }
+
+ if (received_rate_.size() > 1000)
+ received_rate_.erase(received_rate_.begin());
+ if (target_rate_.size() > 1000) target_rate_.erase(target_rate_.begin());
+}
+
+void RTCRateControlIAT::onDataPacketReceived(
+ const core::ContentObject &content_object, bool compute_stats) {
+ core::ParamsRTC params = RTCState::getDataParams(content_object);
+
+ uint64_t now = utils::SteadyTime::nowMs().count();
+
+ uint32_t segment_number = content_object.getName().getSuffix();
+
+ if (segment_number == (last_seq_number_ + 1) && compute_stats) {
+ uint64_t iat = now - last_rcv_time_;
+ uint64_t ist = params.timestamp - last_prod_time_;
+ if (now >= last_rcv_time_ && params.timestamp > last_prod_time_) {
+ if (iat >= ist && ist < MIN_IST_VALUE) {
+ if (congestion_state_ == CongestionState::Congested) {
+ iat_.push_back((iat - ist));
+ } else {
+ // no congestion, but we do not always add new values, but only when
+ // there is no sign of congestion
+ double queue = protocol_state_->getQueuing();
+ if (queue <= CONGESTION_FREE_QUEUEING_DELAY) {
+ iat_on_hold_[round_index_].push_back((iat - ist));
+ }
+ }
+ }
+ }
+ }
+
+ last_seq_number_ = segment_number;
+ last_rcv_time_ = now;
+ last_prod_time_ = params.timestamp;
+
+ if (iat_.size() > 1000) iat_.erase(iat_.begin());
+ return;
+}
+
+CongestionCause RTCRateControlIAT::apply_classification_tree(bool force_reply) {
+ if (iat_.size() <= 2 || received_rate_.size() < 2)
+ return CongestionCause::UNKNOWN;
+
+ double received_ratio = 0;
+ double iat_ratio = 0;
+ double iat_stdev = compute_iat_stdev(iat_);
+ double iat_congestion_free_stdev = compute_iat_stdev(congestion_free_iat_);
+
+ double iat_avg = 0.0;
+
+ double recv_avg = 0.0;
+ double recv_max = 0.0;
+
+ double target_rate_avg = 0.0;
+
+ int counter = 0;
+ std::vector<double>::reverse_iterator target_it = target_rate_.rbegin();
+ for (std::vector<double>::reverse_iterator it = received_rate_.rbegin();
+ it != received_rate_.rend(); ++it) {
+ recv_avg += *it;
+ target_rate_avg += *target_it;
+ if (counter < ROUND_HISTORY_SIZE)
+ if (recv_max < *it) {
+ recv_max = *it; // we consider only the last 2 seconds
+ }
+ counter++;
+ target_it++;
+ }
+ recv_avg = recv_avg / received_rate_.size();
+ target_rate_avg = target_rate_avg / target_rate_.size();
+
+ for (std::vector<double>::iterator it = iat_.begin(); it != iat_.end();
+ ++it) {
+ iat_avg += *it;
+ }
+ iat_avg = iat_avg / iat_.size();
+
+ double congestion_free_iat_avg = 0.0;
+ for (std::vector<double>::iterator it = congestion_free_iat_.begin();
+ it != congestion_free_iat_.end(); ++it) {
+ congestion_free_iat_avg += *it;
+ }
+ congestion_free_iat_avg =
+ congestion_free_iat_avg / congestion_free_iat_.size();
+
+ received_ratio = recv_avg / target_rate_avg;
+
+ iat_ratio = iat_stdev / iat_congestion_free_stdev;
+
+ CongestionCause congestion_cause = CongestionCause::UNKNOWN;
+ // applying classification tree model
+ if (received_ratio <= 0.87)
+ if (iat_stdev <= 6.48)
+ if (received_ratio <= 0.83)
+ congestion_cause = CongestionCause::LINK_CAPACITY;
+ else if (force_reply)
+ congestion_cause = CongestionCause::LINK_CAPACITY;
+ else
+ congestion_cause = CongestionCause::UNKNOWN; // accuracy is too low
+ else if (iat_ratio <= 2.46)
+ if (force_reply)
+ congestion_cause = CongestionCause::LINK_CAPACITY;
+ else
+ congestion_cause = CongestionCause::UNKNOWN; // accuracy is too low
+ else
+ congestion_cause = CongestionCause::COMPETING_CROSS_TRAFFIC;
+ else if (received_ratio <= 0.913 && iat_stdev <= 0.784)
+ congestion_cause = CongestionCause::LINK_CAPACITY;
+ else
+ congestion_cause = CongestionCause::COMPETING_CROSS_TRAFFIC;
+
+ return congestion_cause;
+}
+
+void RTCRateControlIAT::reset_congestion_statistics() {
+ iat_.clear();
+ received_rate_.clear();
+ target_rate_.clear();
+}
+
+double RTCRateControlIAT::compute_iat_stdev(std::vector<double> v) {
+ if (v.size() == 0) return 0;
+
+ float sum = 0.0, mean, standard_deviation = 0.0;
+ for (std::vector<double>::iterator it = v.begin(); it != v.end(); it++) {
+ sum += *it;
+ }
+
+ mean = sum / v.size();
+ for (std::vector<double>::iterator it = v.begin(); it != v.end(); it++) {
+ standard_deviation += pow(*it - mean, 2);
+ }
+ return sqrt(standard_deviation / v.size());
+}
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_rc_iat.h b/libtransport/src/protocols/rtc/rtc_rc_iat.h
new file mode 100644
index 000000000..715637807
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_rc_iat.h
@@ -0,0 +1,93 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <hicn/transport/utils/shared_ptr_utils.h>
+#include <protocols/rtc/rtc_rc.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+const int ROUND_HISTORY_SIZE = 10; // equivalent to two seconds
+const int ROUND_TO_WAIT_FORCE_DECISION = 5;
+
+// once congestion is gone, we need to wait for k rounds before changing the
+// congestion cause in the case it appears again
+const int ROUND_TO_RESET_CAUSE = 5;
+
+const int MIN_IST_VALUE = 150; // samples of ist larger than 150ms are
+ // discarded
+const double CONGESTION_FREE_QUEUEING_DELAY = 10;
+
+enum class CongestionCause : uint8_t {
+ COMPETING_CROSS_TRAFFIC,
+ FRIENDLY_CROSS_TRAFFIC,
+ UNKNOWN_CROSS_TRAFFIC,
+ LINK_CAPACITY,
+ UNKNOWN
+};
+
+class RTCRateControlIAT : public RTCRateControl {
+ public:
+ RTCRateControlIAT();
+
+ ~RTCRateControlIAT();
+
+ void onNewRound(double round_len);
+ void onDataPacketReceived(const core::ContentObject &content_object,
+ bool compute_stats);
+
+ auto shared_from_this() { return utils::shared_from(this); }
+
+ private:
+ void reset_congestion_statistics();
+
+ double compute_iat_stdev(std::vector<double> v);
+
+ CongestionCause apply_classification_tree(bool force_reply);
+
+ private:
+ uint32_t rounds_since_last_drop_;
+ uint32_t rounds_without_congestion_;
+ uint32_t rounds_with_congestion_;
+ double last_queue_;
+ uint64_t last_rcv_time_;
+ uint64_t last_prod_time_;
+ uint32_t last_seq_number_;
+ double target_rate_avg_;
+
+ // Iat values are not immediately added to the congestion free set of values
+ std::array<std::vector<double>, ROUND_HISTORY_SIZE> iat_on_hold_;
+ uint32_t round_index_;
+
+ // with congestion statistics
+ std::vector<double> iat_;
+ std::vector<double> received_rate_;
+ std::vector<double> target_rate_;
+
+ // congestion free statistics
+ std::vector<double> congestion_free_iat_;
+
+ CongestionCause congestion_cause_;
+};
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_rc_queue.cc b/libtransport/src/protocols/rtc/rtc_rc_queue.cc
index a1c89e329..ecabc5205 100644
--- a/libtransport/src/protocols/rtc/rtc_rc_queue.cc
+++ b/libtransport/src/protocols/rtc/rtc_rc_queue.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -37,7 +37,7 @@ void RTCRateControlQueue::onNewRound(double round_len) {
double received_rate = protocol_state_->getReceivedRate();
double target_rate =
protocol_state_->getProducerRate() * PRODUCTION_RATE_FRACTION;
- double rtt = (double)protocol_state_->getRTT() / MILLI_IN_A_SEC;
+ double rtt = (double)protocol_state_->getMinRTT() / MILLI_IN_A_SEC;
double packet_size = protocol_state_->getAveragePacketSize();
double queue = protocol_state_->getQueuing();
@@ -94,7 +94,7 @@ void RTCRateControlQueue::onNewRound(double round_len) {
}
void RTCRateControlQueue::onDataPacketReceived(
- const core::ContentObject &content_object) {
+ const core::ContentObject &content_object, bool compute_stats) {
// nothing to do
return;
}
diff --git a/libtransport/src/protocols/rtc/rtc_rc_queue.h b/libtransport/src/protocols/rtc/rtc_rc_queue.h
index 407354d43..cdf78fd47 100644
--- a/libtransport/src/protocols/rtc/rtc_rc_queue.h
+++ b/libtransport/src/protocols/rtc/rtc_rc_queue.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -30,7 +30,8 @@ class RTCRateControlQueue : public RTCRateControl {
~RTCRateControlQueue();
void onNewRound(double round_len);
- void onDataPacketReceived(const core::ContentObject &content_object);
+ void onDataPacketReceived(const core::ContentObject &content_object,
+ bool compute_stats);
auto shared_from_this() { return utils::shared_from(this); }
diff --git a/libtransport/src/protocols/rtc/rtc_reassembly.cc b/libtransport/src/protocols/rtc/rtc_reassembly.cc
new file mode 100644
index 000000000..992bab50e
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_reassembly.cc
@@ -0,0 +1,109 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/interfaces/socket_consumer.h>
+#include <implementation/socket_consumer.h>
+#include <protocols/rtc/rtc_reassembly.h>
+#include <protocols/transport_protocol.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+RtcReassembly::RtcReassembly(implementation::ConsumerSocket* icn_socket,
+ TransportProtocol* transport_protocol)
+ : DatagramReassembly(icn_socket, transport_protocol) {
+ is_setup_ = false;
+}
+
+void RtcReassembly::reassemble(core::ContentObject& content_object) {
+ if (!is_setup_) {
+ is_setup_ = true;
+ reassembly_consumer_socket_->getSocketOption(
+ interface::RtcTransportOptions::AGGREGATED_DATA, data_aggregation_);
+ }
+
+ auto read_buffer = content_object.getPayload();
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Size of payload: " << read_buffer->length();
+
+ read_buffer->trimStart(transport_protocol_->transportHeaderLength());
+
+ if (data_aggregation_) {
+ rtc::AggrPktHeader hdr((uint8_t*)read_buffer->data());
+
+ for (uint8_t i = 0; i < hdr.getNumberOfPackets(); i++) {
+ std::unique_ptr<utils::MemBuf> segment = read_buffer->clone();
+
+ uint16_t pkt_start = 0;
+ uint16_t pkt_len = 0;
+ int res = hdr.getPacketOffsets(i, &pkt_start, &pkt_len);
+ if (res == -1) {
+ // this should not happen
+ break;
+ }
+
+ segment->trimStart(pkt_start);
+ segment->trimEnd(segment->length() - pkt_len);
+
+ Reassembly::read_buffer_ = std::move(segment);
+ Reassembly::notifyApplication();
+ }
+ } else {
+ Reassembly::read_buffer_ = std::move(read_buffer);
+ Reassembly::notifyApplication();
+ }
+}
+
+void RtcReassembly::reassemble(utils::MemBuf& buffer, uint32_t suffix) {
+ if (!is_setup_) {
+ is_setup_ = true;
+ reassembly_consumer_socket_->getSocketOption(
+ interface::RtcTransportOptions::AGGREGATED_DATA, data_aggregation_);
+ }
+
+ if (data_aggregation_) {
+ rtc::AggrPktHeader hdr((uint8_t*)buffer.data());
+
+ for (uint8_t i = 0; i < hdr.getNumberOfPackets(); i++) {
+ std::unique_ptr<utils::MemBuf> segment = buffer.clone();
+
+ uint16_t pkt_start = 0;
+ uint16_t pkt_len = 0;
+ int res = hdr.getPacketOffsets(i, &pkt_start, &pkt_len);
+ if (res == -1) {
+ // this should not happen
+ break;
+ }
+
+ segment->trimStart(pkt_start);
+ segment->trimEnd(segment->length() - pkt_len);
+
+ Reassembly::read_buffer_ = std::move(segment);
+ Reassembly::notifyApplication();
+ }
+
+ } else {
+ Reassembly::read_buffer_ = buffer.cloneOne();
+ Reassembly::notifyApplication();
+ }
+}
+
+} // namespace rtc
+
+} // namespace protocol
+
+} // namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_reassembly.h b/libtransport/src/protocols/rtc/rtc_reassembly.h
index 15722a6d5..132004605 100644
--- a/libtransport/src/protocols/rtc/rtc_reassembly.h
+++ b/libtransport/src/protocols/rtc/rtc_reassembly.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -28,17 +28,14 @@ namespace rtc {
class RtcReassembly : public DatagramReassembly {
public:
RtcReassembly(implementation::ConsumerSocket *icn_socket,
- TransportProtocol *transport_protocol)
- : DatagramReassembly(icn_socket, transport_protocol) {}
-
- void reassemble(core::ContentObject &content_object) override {
- auto read_buffer = content_object.getPayload();
- DLOG_IF(INFO, VLOG_IS_ON(3))
- << "Size of payload: " << read_buffer->length();
- read_buffer->trimStart(transport_protocol_->transportHeaderLength());
- Reassembly::read_buffer_ = std::move(read_buffer);
- Reassembly::notifyApplication();
- }
+ TransportProtocol *transport_protocol);
+
+ void reassemble(core::ContentObject &content_object) override;
+ void reassemble(utils::MemBuf &buffer, uint32_t suffix) override;
+
+ private:
+ bool is_setup_;
+ bool data_aggregation_;
};
} // namespace rtc
diff --git a/libtransport/src/protocols/rtc/rtc_recovery_strategy.cc b/libtransport/src/protocols/rtc/rtc_recovery_strategy.cc
new file mode 100644
index 000000000..888105eab
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_recovery_strategy.cc
@@ -0,0 +1,418 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <glog/logging.h>
+#include <hicn/transport/interfaces/notification.h>
+#include <hicn/transport/interfaces/socket_options_keys.h>
+#include <protocols/rtc/rtc_consts.h>
+#include <protocols/rtc/rtc_recovery_strategy.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+using namespace transport::interface;
+
+RecoveryStrategy::RecoveryStrategy(
+ Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service,
+ bool use_rtx, bool use_fec, interface::StrategyCallback *external_callback)
+ : recovery_on_(false),
+ next_rtx_timer_(MAX_TIMER_RTX),
+ send_rtx_callback_(std::move(callback)),
+ indexer_(indexer),
+ round_id_(0),
+ last_fec_used_(0),
+ callback_(external_callback) {
+ setRtxFec(use_rtx, use_fec);
+ timer_ = std::make_unique<asio::steady_timer>(io_service);
+}
+
+RecoveryStrategy::RecoveryStrategy(RecoveryStrategy &&rs)
+ : rtx_state_(std::move(rs.rtx_state_)),
+ rtx_timers_(std::move(rs.rtx_timers_)),
+ recover_with_fec_(std::move(rs.recover_with_fec_)),
+ timer_(std::move(rs.timer_)),
+ next_rtx_timer_(std::move(rs.next_rtx_timer_)),
+ send_rtx_callback_(std::move(rs.send_rtx_callback_)),
+ n_(std::move(rs.n_)),
+ k_(std::move(rs.k_)),
+ indexer_(std::move(rs.indexer_)),
+ state_(std::move(rs.state_)),
+ rc_(std::move(rs.rc_)),
+ round_id_(std::move(rs.round_id_)),
+ last_fec_used_(std::move(rs.last_fec_used_)),
+ callback_(rs.callback_) {
+ setFecParams(n_, k_);
+}
+
+RecoveryStrategy::~RecoveryStrategy() {}
+
+void RecoveryStrategy::setFecParams(uint32_t n, uint32_t k) {
+ n_ = n;
+ k_ = k;
+
+ // XXX for the moment we go in steps of 5% loss rate.
+ // max loss rate = 95%
+ for (uint32_t loss_rate = 5; loss_rate < 100; loss_rate += 5) {
+ double dec_loss_rate = (double)loss_rate / 100.0;
+ double exp_losses = (double)k_ * dec_loss_rate;
+ uint32_t fec_to_ask = ceil(exp_losses / (1 - dec_loss_rate));
+
+ fec_state_ f;
+ f.fec_to_ask = std::min(fec_to_ask, (n_ - k_));
+ f.last_update = round_id_;
+ f.avg_residual_losses = 0.0;
+ f.consecutive_use = 0;
+ fec_per_loss_rate_.push_back(f);
+ }
+}
+
+bool RecoveryStrategy::lossDetected(uint32_t seq) {
+ if (isRtx(seq)) {
+ // this packet is already in the list of rtx
+ return false;
+ }
+
+ auto it = recover_with_fec_.find(seq);
+ if (it != recover_with_fec_.end()) {
+ // this packet is already in list of packets to recover with fec
+ // this list contians also fec packets that will not be recovered with rtx
+ return false;
+ }
+
+ // new loss detected, recover it according to the strategy
+ newPacketLoss(seq);
+ return true;
+}
+
+void RecoveryStrategy::clear() {
+ rtx_state_.clear();
+ rtx_timers_.clear();
+ recover_with_fec_.clear();
+
+ if (next_rtx_timer_ != MAX_TIMER_RTX) {
+ next_rtx_timer_ = MAX_TIMER_RTX;
+ timer_->cancel();
+ }
+}
+
+// rtx functions
+void RecoveryStrategy::addNewRtx(uint32_t seq, bool force) {
+ if (!indexer_->isFec(seq) || force) {
+ // this packet needs to be re-transmitted
+ rtxState state;
+ state.first_send_ = state_->getInterestSentTime(seq);
+ if (state.first_send_ == 0) // this interest was never sent before
+ state.first_send_ = getNow();
+ state.next_send_ = computeNextSend(seq, true);
+ state.rtx_count_ = 0;
+ DLOG_IF(INFO, VLOG_IS_ON(4))
+ << "Add " << seq << " to retransmissions. next rtx is in "
+ << state.next_send_ - getNow() << " ms";
+ rtx_state_.insert(std::pair<uint32_t, rtxState>(seq, state));
+ rtx_timers_.insert(std::pair<uint64_t, uint32_t>(state.next_send_, seq));
+
+ // if a new rtx is introduced, check the rtx timer
+ scheduleNextRtx();
+ } else {
+ // do not re-send fec packets but keep track of them
+ recover_with_fec_.insert(seq);
+ state_->onPossibleLossWithNoRtx(seq);
+ }
+}
+
+uint64_t RecoveryStrategy::computeNextSend(uint32_t seq, bool new_rtx) {
+ uint64_t now = getNow();
+ if (new_rtx) {
+ // for the new rtx we wait one estimated IAT after the loss detection. this
+ // is bacause, assuming that packets arrive with a constant IAT, we should
+ // get a new packet every IAT
+ double prod_rate = state_->getProducerRate();
+ uint32_t estimated_iat = SENTINEL_TIMER_INTERVAL;
+ uint32_t jitter = 0;
+
+ if (prod_rate != 0) {
+ double packet_size = state_->getAveragePacketSize();
+ estimated_iat = ceil(1000.0 / (prod_rate / packet_size));
+ jitter = ceil(state_->getJitter());
+ }
+
+ uint32_t wait = 1;
+ if (estimated_iat < 18) {
+ // for low rate app we do not wait to send a RTX
+ // we consider low rate stream with less than 50pps (iat >= 20ms)
+ // (e.g. audio in videoconf, mobile games).
+ // in the check we use 18ms to accomodate for measurements errors
+ // for flows with higher rate wait 1 ait + jitter
+ wait = estimated_iat + jitter;
+ }
+
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "first rtx for " << seq << " in " << wait
+ << " ms, rtt = " << state_->getMinRTT() << " ait = " << estimated_iat
+ << " jttr = " << jitter;
+
+ return now + wait;
+ } else {
+ // wait one RTT
+ uint32_t wait = SENTINEL_TIMER_INTERVAL;
+
+ double prod_rate = state_->getProducerRate();
+ if (prod_rate == 0) {
+ return now + SENTINEL_TIMER_INTERVAL;
+ }
+
+ double packet_size = state_->getAveragePacketSize();
+ uint32_t estimated_iat = ceil(1000.0 / (prod_rate / packet_size));
+
+ uint64_t rtt = state_->getMinRTT();
+ if (rtt == 0) rtt = SENTINEL_TIMER_INTERVAL;
+ wait = rtt;
+
+ uint32_t jitter = ceil(state_->getJitter());
+ wait += jitter;
+
+ // it may happen that the channel is congested and we have some additional
+ // queuing delay to take into account
+ uint32_t queue = ceil(state_->getQueuing());
+ wait += queue;
+
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "next rtx for " << seq << " in " << wait
+ << " ms, rtt = " << state_->getMinRTT() << " ait = " << estimated_iat
+ << " jttr = " << jitter << " queue = " << queue;
+
+ return now + wait;
+ }
+}
+
+void RecoveryStrategy::retransmit() {
+ if (rtx_timers_.size() == 0) return;
+
+ uint64_t now = getNow();
+
+ auto it = rtx_timers_.begin();
+ std::unordered_set<uint32_t> lost_pkt;
+ uint32_t sent_counter = 0;
+ while (it != rtx_timers_.end() && it->first <= now &&
+ sent_counter < MAX_RTX_IN_BATCH) {
+ uint32_t seq = it->second;
+ auto rtx_it =
+ rtx_state_.find(seq); // this should always return a valid iter
+ if (rtx_it->second.rtx_count_ >= RTC_MAX_RTX ||
+ (now - rtx_it->second.first_send_) >= RTC_MAX_AGE ||
+ seq < state_->getLastSeqNacked()) {
+ // max rtx reached or packet too old or packet nacked, this packet is lost
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "packet " << seq << " lost because 1) max rtx: "
+ << (rtx_it->second.rtx_count_ >= RTC_MAX_RTX) << " 2) max age: "
+ << ((now - rtx_it->second.first_send_) >= RTC_MAX_AGE)
+ << " 3) nacked: " << (seq < state_->getLastSeqNacked());
+ lost_pkt.insert(seq);
+ it++;
+ } else {
+ // resend the packet
+ state_->onRetransmission(seq);
+ double prod_rate = state_->getProducerRate();
+ if (prod_rate != 0) rtx_it->second.rtx_count_++;
+ rtx_it->second.next_send_ = computeNextSend(seq, false);
+ it = rtx_timers_.erase(it);
+ rtx_timers_.insert(
+ std::pair<uint64_t, uint32_t>(rtx_it->second.next_send_, seq));
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "send rtx for sequence " << seq << ", next send in "
+ << (rtx_it->second.next_send_ - now);
+ send_rtx_callback_(seq);
+ sent_counter++;
+ }
+ }
+
+ // remove packets if needed
+ for (auto lost_it = lost_pkt.begin(); lost_it != lost_pkt.end(); lost_it++) {
+ uint32_t seq = *lost_it;
+ state_->onPacketLost(seq);
+ deleteRtx(seq);
+ }
+}
+
+void RecoveryStrategy::scheduleNextRtx() {
+ if (rtx_timers_.size() == 0) {
+ // all the rtx were removed, reset timer
+ next_rtx_timer_ = MAX_TIMER_RTX;
+ return;
+ }
+
+ // check if timer is alreay set
+ if (next_rtx_timer_ != MAX_TIMER_RTX) {
+ // a new check for rtx is already scheduled
+ if (next_rtx_timer_ > rtx_timers_.begin()->first) {
+ // we need to re-schedule it
+ timer_->cancel();
+ } else {
+ // wait for the next timer
+ return;
+ }
+ }
+
+ // set a new timer
+ next_rtx_timer_ = rtx_timers_.begin()->first;
+ uint64_t now = utils::SteadyTime::nowMs().count();
+ uint64_t wait = 1;
+ if (next_rtx_timer_ != MAX_TIMER_RTX && next_rtx_timer_ > now)
+ wait = next_rtx_timer_ - now;
+
+ std::weak_ptr<RecoveryStrategy> self(shared_from_this());
+ timer_->expires_from_now(std::chrono::milliseconds(wait));
+ timer_->async_wait([self](const std::error_code &ec) {
+ if (ec) return;
+ if (auto s = self.lock()) {
+ s->retransmit();
+ s->next_rtx_timer_ = MAX_TIMER_RTX;
+ s->scheduleNextRtx();
+ }
+ });
+}
+
+void RecoveryStrategy::deleteRtx(uint32_t seq) {
+ auto it_rtx = rtx_state_.find(seq);
+ if (it_rtx == rtx_state_.end()) return; // rtx not found
+
+ // remove the rtx from the timers list
+ uint64_t ts = it_rtx->second.next_send_;
+ auto it_timers = rtx_timers_.find(ts);
+ while (it_timers != rtx_timers_.end() && it_timers->first == ts) {
+ if (it_timers->second == seq) {
+ rtx_timers_.erase(it_timers);
+ break;
+ }
+ it_timers++;
+ }
+ // remove rtx
+ rtx_state_.erase(it_rtx);
+}
+
+// fec functions
+uint32_t RecoveryStrategy::computeFecPacketsToAsk(bool in_sync) {
+ double loss_rate = state_->getMaxLossRate() * 100; // use loss rate in %
+
+ if (loss_rate > 95) loss_rate = 95; // max loss rate
+
+ if (loss_rate == 0) return 0;
+
+ // once per minute try to reduce the fec rate. it may happen that for some bin
+ // we ask too many fec packet. here we try to reduce this values gently
+ if (round_id_ % ROUNDS_PER_MIN == 0) {
+ reduceFec();
+ }
+
+ // keep track of the last used fec. if we use a new bin on this round reset
+ // consecutive use and avg loss in the prev bin
+ uint32_t bin = ceil(loss_rate / 5.0) - 1;
+ if (bin > fec_per_loss_rate_.size() - 1) bin = fec_per_loss_rate_.size() - 1;
+
+ if (bin != last_fec_used_) {
+ fec_per_loss_rate_[last_fec_used_].consecutive_use = 0;
+ fec_per_loss_rate_[last_fec_used_].avg_residual_losses = 0.0;
+ }
+ last_fec_used_ = bin;
+ fec_per_loss_rate_[last_fec_used_].consecutive_use++;
+
+ // we update the stats only once very 5 rounds (1sec) that is the rate at
+ // which we compute residual losses
+ if (round_id_ % ROUNDS_PER_SEC == 0) {
+ double residual_losses = state_->getResidualLossRate() * 100;
+ // update residual loss rate
+ fec_per_loss_rate_[bin].avg_residual_losses =
+ (fec_per_loss_rate_[bin].avg_residual_losses * MOVING_AVG_ALPHA) +
+ (1 - MOVING_AVG_ALPHA) * residual_losses;
+
+ if ((fec_per_loss_rate_[bin].last_update - round_id_) <
+ WAIT_BEFORE_FEC_UPDATE) {
+ // this bin is been updated recently so don't modify it and
+ // return the current state
+ return fec_per_loss_rate_[bin].fec_to_ask;
+ }
+
+ // if the residual loss rate is too high and we can ask more fec packets and
+ // we are using this configuration since at least 5 sec update fec
+ if (fec_per_loss_rate_[bin].avg_residual_losses > MAX_RESIDUAL_LOSS_RATE &&
+ fec_per_loss_rate_[bin].fec_to_ask < (n_ - k_) &&
+ fec_per_loss_rate_[bin].consecutive_use > WAIT_BEFORE_FEC_UPDATE) {
+ // so increase the number of fec packets to ask
+ fec_per_loss_rate_[bin].fec_to_ask++;
+ fec_per_loss_rate_[bin].last_update = round_id_;
+ fec_per_loss_rate_[bin].avg_residual_losses = 0.0;
+ }
+ }
+
+ return fec_per_loss_rate_[bin].fec_to_ask;
+}
+
+void RecoveryStrategy::setRtxFec(std::optional<bool> rtx_on,
+ std::optional<bool> fec_on) {
+ if (rtx_on) rtx_on_ = *rtx_on;
+ if (fec_on) fec_on_ = *fec_on;
+
+ if (*callback_) {
+ notification::RecoveryStrategy strategy =
+ notification::RecoveryStrategy::RECOVERY_OFF;
+
+ if (rtx_on_ && fec_on_)
+ strategy = notification::RecoveryStrategy::RTX_AND_FEC;
+ else if (rtx_on_)
+ strategy = notification::RecoveryStrategy::RTX_ONLY;
+ else if (fec_on_)
+ strategy = notification::RecoveryStrategy::FEC_ONLY;
+
+ (*callback_)(strategy);
+ }
+}
+
+// common functions
+void RecoveryStrategy::onLostTimeout(uint32_t seq) { removePacketState(seq); }
+
+void RecoveryStrategy::removePacketState(uint32_t seq) {
+ auto it_fec = recover_with_fec_.find(seq);
+ if (it_fec != recover_with_fec_.end()) {
+ recover_with_fec_.erase(it_fec);
+ return;
+ }
+
+ deleteRtx(seq);
+}
+
+// private methods
+
+void RecoveryStrategy::reduceFec() {
+ for (uint32_t loss_rate = 5; loss_rate < 100; loss_rate += 5) {
+ double dec_loss_rate = (double)loss_rate / 100.0;
+ double exp_losses = (double)k_ * dec_loss_rate;
+ uint32_t fec_to_ask = ceil(exp_losses / (1 - dec_loss_rate));
+
+ uint32_t bin = ceil(loss_rate / 5.0) - 1;
+ if (fec_per_loss_rate_[bin].fec_to_ask > fec_to_ask) {
+ fec_per_loss_rate_[bin].fec_to_ask--;
+ // std::cout << "reduce fec to ask for bin " << bin << std::endl;
+ }
+ }
+}
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_recovery_strategy.h b/libtransport/src/protocols/rtc/rtc_recovery_strategy.h
new file mode 100644
index 000000000..9ffc69a1b
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_recovery_strategy.h
@@ -0,0 +1,154 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <hicn/transport/interfaces/callbacks.h>
+#include <hicn/transport/utils/chrono_typedefs.h>
+#include <protocols/indexer.h>
+#include <protocols/rtc/rtc_rc.h>
+#include <protocols/rtc/rtc_state.h>
+
+#include <map>
+#include <unordered_map>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> {
+ protected:
+ struct rtx_state_ {
+ uint64_t first_send_;
+ uint64_t next_send_;
+ uint32_t rtx_count_;
+ };
+
+ using rtxState = struct rtx_state_;
+
+ public:
+ using SendRtxCallback = std::function<void(uint32_t)>;
+
+ RecoveryStrategy(Indexer *indexer, SendRtxCallback &&callback,
+ asio::io_service &io_service, bool use_rtx, bool use_fec,
+ interface::StrategyCallback *external_callback);
+
+ RecoveryStrategy(RecoveryStrategy &&rs);
+
+ virtual ~RecoveryStrategy();
+
+ void setRtxFec(std::optional<bool> rtx_on = {},
+ std::optional<bool> fec_on = {});
+ void setState(RTCState *state) { state_ = state; }
+ void setRateControl(RTCRateControl *rateControl) { rc_ = rateControl; }
+ void setFecParams(uint32_t n, uint32_t k);
+
+ void tunrOnRecovery() { recovery_on_ = true; }
+
+ bool isRtx(uint32_t seq) {
+ if (rtx_state_.find(seq) != rtx_state_.end()) return true;
+ return false;
+ }
+
+ bool isPossibleLossWithNoRtx(uint32_t seq) {
+ if (recover_with_fec_.find(seq) != recover_with_fec_.end()) return true;
+ return false;
+ }
+
+ bool isRtxOn() { return rtx_on_; }
+
+ RTCState *getState() { return state_; }
+ bool lossDetected(uint32_t seq);
+ void clear();
+
+ virtual void onNewRound(bool in_sync) = 0;
+ virtual void newPacketLoss(uint32_t seq) = 0;
+ virtual void receivedPacket(uint32_t seq) = 0;
+ void onLostTimeout(uint32_t seq);
+
+ void incRoundId() { round_id_++; }
+
+ // utils
+ uint64_t getNow() {
+ uint64_t now = utils::SteadyTime::nowMs().count();
+ return now;
+ }
+
+ protected:
+ // rtx functions
+ void addNewRtx(uint32_t seq, bool force);
+ uint64_t computeNextSend(uint32_t seq, bool new_rtx);
+ void retransmit();
+ void scheduleNextRtx();
+ void deleteRtx(uint32_t seq);
+
+ // fec functions
+ uint32_t computeFecPacketsToAsk(bool in_sync);
+
+ // common functons
+ void removePacketState(uint32_t seq);
+
+ bool recovery_on_;
+ bool rtx_on_;
+ bool fec_on_;
+
+ // this map keeps track of the retransmitted interest, ordered from the oldest
+ // to the newest one. the state contains the timer of the first send of the
+ // interest (from pendingIntetests_), the timer of the next send (key of the
+ // multimap) and the number of rtx
+ std::map<uint32_t, rtxState> rtx_state_;
+ // this map stored the rtx by timer. The key is the time at which the rtx
+ // should be sent, and the val is the interest seq number
+ std::multimap<uint64_t, uint32_t> rtx_timers_;
+
+ // lost packets that will be recovered with fec
+ std::unordered_set<uint32_t> recover_with_fec_;
+
+ // rtx vars
+ std::unique_ptr<asio::steady_timer> timer_;
+ uint64_t next_rtx_timer_;
+ SendRtxCallback send_rtx_callback_;
+
+ // fec vars
+ uint32_t n_;
+ uint32_t k_;
+ Indexer *indexer_;
+
+ RTCState *state_;
+ RTCRateControl *rc_;
+
+ private:
+ struct fec_state_ {
+ uint32_t fec_to_ask;
+ uint32_t last_update; // round id of the last update
+ // (wait 10 ruonds (2sec) between updates)
+ uint32_t consecutive_use; // consecutive ruonds where this fec was used
+ double avg_residual_losses;
+ };
+
+ void reduceFec();
+
+ uint32_t round_id_; // number of rounds
+ uint32_t last_fec_used_;
+ std::vector<fec_state_> fec_per_loss_rate_;
+ interface::StrategyCallback *callback_;
+};
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_rs_delay.cc b/libtransport/src/protocols/rtc/rtc_rs_delay.cc
new file mode 100644
index 000000000..e2c60ca77
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_rs_delay.cc
@@ -0,0 +1,122 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <glog/logging.h>
+#include <protocols/rtc/rtc_consts.h>
+#include <protocols/rtc/rtc_rs_delay.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+RecoveryStrategyDelayBased::RecoveryStrategyDelayBased(
+ Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service,
+ interface::StrategyCallback *external_callback)
+ : RecoveryStrategy(indexer, std::move(callback), io_service, true, false,
+ external_callback), // start with rtx
+ congestion_state_(false),
+ probing_state_(false),
+ switch_rounds_(0) {}
+
+RecoveryStrategyDelayBased::RecoveryStrategyDelayBased(RecoveryStrategy &&rs)
+ : RecoveryStrategy(std::move(rs)) {
+ setRtxFec(true, false);
+ // we have to re-init congestion and
+ // probing
+ congestion_state_ = false;
+ probing_state_ = false;
+}
+
+RecoveryStrategyDelayBased::~RecoveryStrategyDelayBased() {}
+
+void RecoveryStrategyDelayBased::softSwitchToFec(uint32_t fec_to_ask) {
+ if (fec_to_ask == 0) {
+ setRtxFec(true, false);
+ switch_rounds_ = 0;
+ } else {
+ switch_rounds_++;
+ if (switch_rounds_ >= 5) {
+ setRtxFec(false, true);
+ } else {
+ setRtxFec({}, true);
+ }
+ }
+}
+
+void RecoveryStrategyDelayBased::onNewRound(bool in_sync) {
+ if (!recovery_on_) {
+ // disable fec so that no extra packet will be sent
+ // for rtx we check if recovery is on in newPacketLoss
+ setRtxFec(true, false);
+ indexer_->setNFec(0);
+ return;
+ }
+
+ uint64_t rtt = state_->getMinRTT();
+
+ bool congestion = false;
+ // XXX at the moment we are not looking at congestion events
+ // congestion = rc_->inCongestionState();
+
+ if ((!fec_on_ && rtt >= 100) || (fec_on_ && rtt > 80) || congestion) {
+ // switch from rtx to fec or keep use fec. Notice that if some rtx are
+ // waiting to be scheduled, they will be sent normally, but no new rtx will
+ // be created If the loss rate is 0 keep to use RTX.
+ uint32_t fec_to_ask = computeFecPacketsToAsk(in_sync);
+ softSwitchToFec(fec_to_ask);
+ indexer_->setNFec(fec_to_ask);
+ return;
+ }
+
+ if ((fec_on_ && rtt <= 80) || (!rtx_on_ && rtt <= 100)) {
+ // turn on rtx
+ softSwitchToFec(0);
+ indexer_->setNFec(0);
+ return;
+ }
+}
+
+void RecoveryStrategyDelayBased::newPacketLoss(uint32_t seq) {
+ if (rtx_on_ && recovery_on_) {
+ addNewRtx(seq, false);
+ } else {
+ if (!state_->isPending(seq) && !indexer_->isFec(seq)) {
+ addNewRtx(seq, true);
+ } else {
+ recover_with_fec_.insert(seq);
+ state_->onPossibleLossWithNoRtx(seq);
+ }
+ }
+}
+
+void RecoveryStrategyDelayBased::receivedPacket(uint32_t seq) {
+ removePacketState(seq);
+}
+
+void RecoveryStrategyDelayBased::probing() {
+ // TODO
+ // for the moment ask for all fec and exit the probing phase
+ probing_state_ = false;
+ setRtxFec(false, true);
+ indexer_->setNFec(computeFecPacketsToAsk(true));
+}
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_rs_delay.h b/libtransport/src/protocols/rtc/rtc_rs_delay.h
new file mode 100644
index 000000000..0dd199965
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_rs_delay.h
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <protocols/rtc/rtc_recovery_strategy.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+class RecoveryStrategyDelayBased : public RecoveryStrategy {
+ public:
+ RecoveryStrategyDelayBased(Indexer *indexer, SendRtxCallback &&callback,
+ asio::io_service &io_service,
+ interface::StrategyCallback *external_callback);
+
+ RecoveryStrategyDelayBased(RecoveryStrategy &&rs);
+
+ ~RecoveryStrategyDelayBased();
+
+ void onNewRound(bool in_sync);
+ void newPacketLoss(uint32_t seq);
+ void receivedPacket(uint32_t seq);
+
+ private:
+ void softSwitchToFec(uint32_t fec_to_ask);
+
+ bool congestion_state_;
+ bool probing_state_;
+ uint32_t switch_rounds_;
+
+ void probing();
+};
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_rs_fec_only.cc b/libtransport/src/protocols/rtc/rtc_rs_fec_only.cc
new file mode 100644
index 000000000..36d8e39f0
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_rs_fec_only.cc
@@ -0,0 +1,118 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <glog/logging.h>
+#include <protocols/rtc/rtc_consts.h>
+#include <protocols/rtc/rtc_rs_fec_only.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+RecoveryStrategyFecOnly::RecoveryStrategyFecOnly(
+ Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service,
+ interface::StrategyCallback *external_callback)
+ : RecoveryStrategy(indexer, std::move(callback), io_service, false, true,
+ external_callback),
+ congestion_state_(false),
+ probing_state_(false),
+ switch_rounds_(0) {}
+
+RecoveryStrategyFecOnly::RecoveryStrategyFecOnly(RecoveryStrategy &&rs)
+ : RecoveryStrategy(std::move(rs)) {
+ setRtxFec(false, true);
+ congestion_state_ = false;
+ probing_state_ = false;
+}
+
+RecoveryStrategyFecOnly::~RecoveryStrategyFecOnly() {}
+
+void RecoveryStrategyFecOnly::onNewRound(bool in_sync) {
+ if (!recovery_on_) {
+ indexer_->setNFec(0);
+ return;
+ }
+
+ // XXX for the moment we are considering congestion events
+ // if(rc_->inCongestionState()){
+ // congestion_state_ = true;
+ // probing_state_ = false;
+ // indexer_->setNFec(0);
+ // return;
+ // }
+
+ // no congestion
+ if (congestion_state_) {
+ // this is the first round after congestion
+ // enter probing phase
+ probing_state_ = true;
+ congestion_state_ = false;
+ }
+
+ if (probing_state_) {
+ probing();
+ } else {
+ uint32_t fec_to_ask = computeFecPacketsToAsk(in_sync);
+ // If fec_to_ask == 0 we use rtx even if in these strategy we use only fec.
+ // In this way the first packet loss that triggers the usage of fec can be
+ // recovered using rtx, otherwise it will always be lost
+ if (fec_to_ask == 0) {
+ setRtxFec(true, false);
+ switch_rounds_ = 0;
+ } else {
+ switch_rounds_++;
+ if (switch_rounds_ >= 5) {
+ setRtxFec(false, true);
+ } else {
+ setRtxFec({}, true);
+ }
+ }
+ indexer_->setNFec(fec_to_ask);
+ }
+}
+
+void RecoveryStrategyFecOnly::newPacketLoss(uint32_t seq) {
+ if (rtx_on_ && recovery_on_) {
+ addNewRtx(seq, false);
+ } else {
+ if (!state_->isPending(seq) && !indexer_->isFec(seq)) {
+ addNewRtx(seq, true);
+ } else {
+ // if not pending add rtc
+ recover_with_fec_.insert(seq);
+ state_->onPossibleLossWithNoRtx(seq);
+ }
+ }
+}
+
+void RecoveryStrategyFecOnly::receivedPacket(uint32_t seq) {
+ removePacketState(seq);
+}
+
+void RecoveryStrategyFecOnly::probing() {
+ // TODO
+ // for the moment ask for all fec and exit the probing phase
+ probing_state_ = false;
+ uint32_t fec_to_ask = computeFecPacketsToAsk(true);
+ indexer_->setNFec(fec_to_ask);
+}
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_rs_fec_only.h b/libtransport/src/protocols/rtc/rtc_rs_fec_only.h
new file mode 100644
index 000000000..37b505d35
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_rs_fec_only.h
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <protocols/rtc/rtc_recovery_strategy.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+class RecoveryStrategyFecOnly : public RecoveryStrategy {
+ public:
+ RecoveryStrategyFecOnly(Indexer *indexer, SendRtxCallback &&callback,
+ asio::io_service &io_service,
+ interface::StrategyCallback *external_callback);
+
+ RecoveryStrategyFecOnly(RecoveryStrategy &&rs);
+
+ ~RecoveryStrategyFecOnly();
+
+ void onNewRound(bool in_sync);
+ void newPacketLoss(uint32_t seq);
+ void receivedPacket(uint32_t seq);
+
+ private:
+ bool congestion_state_;
+ bool probing_state_;
+ uint32_t switch_rounds_;
+
+ void probing();
+};
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_rs_low_rate.cc b/libtransport/src/protocols/rtc/rtc_rs_low_rate.cc
new file mode 100644
index 000000000..bd153d209
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_rs_low_rate.cc
@@ -0,0 +1,167 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <glog/logging.h>
+#include <protocols/rtc/rtc_consts.h>
+#include <protocols/rtc/rtc_rs_low_rate.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+RecoveryStrategyLowRate::RecoveryStrategyLowRate(
+ Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service,
+ interface::StrategyCallback *external_callback)
+ : RecoveryStrategy(indexer, std::move(callback), io_service, false, true,
+ external_callback), // start with fec
+ fec_consecutive_rounds_((MILLI_IN_A_SEC / ROUND_LEN) * 5), // 5 sec
+ rtx_allowed_consecutive_rounds_(0) {
+ initSwitchVector();
+}
+
+RecoveryStrategyLowRate::RecoveryStrategyLowRate(RecoveryStrategy &&rs)
+ : RecoveryStrategy(std::move(rs)),
+ fec_consecutive_rounds_((MILLI_IN_A_SEC / ROUND_LEN) * 5), // 5 sec
+ rtx_allowed_consecutive_rounds_(0) {
+ setRtxFec(false, true);
+ initSwitchVector();
+}
+
+RecoveryStrategyLowRate::~RecoveryStrategyLowRate() {}
+
+void RecoveryStrategyLowRate::initSwitchVector() {
+ // TODO adjust thresholds here when new data are collected
+ // see resutls in
+ // https://confluence-eng-gpk1.cisco.com/conf/display/SPT/dailyreports
+ thresholds_t t1;
+ t1.rtt = 15; // 15ms
+ t1.loss_rtx_to_fec = 15; // 15%
+ t1.loss_fec_to_rtx = 10; // 10%
+ thresholds_t t2;
+ t2.rtt = 35; // 35ms
+ t2.loss_rtx_to_fec = 5; // 5%
+ t2.loss_fec_to_rtx = 1; // 1%
+ switch_vector.push_back(t1);
+ switch_vector.push_back(t2);
+}
+
+void RecoveryStrategyLowRate::setRecoveryParameters(bool use_rtx, bool use_fec,
+ uint32_t fec_to_ask) {
+ setRtxFec(use_rtx, use_fec);
+ indexer_->setNFec(fec_to_ask);
+}
+
+void RecoveryStrategyLowRate::selectRecoveryStrategy(bool in_sync) {
+ uint32_t fec_to_ask = computeFecPacketsToAsk(in_sync);
+ if (fec_to_ask == 0) {
+ // fec is off, turn on RTX immediatly to avoid packet losses
+ setRecoveryParameters(true, false, 0);
+ fec_consecutive_rounds_ = 0;
+ return;
+ }
+
+ uint32_t loss_rate = std::round(state_->getPerSecondLossRate() * 100);
+ uint32_t rtt = state_->getAvgRTT();
+
+ bool use_rtx = false;
+ for (size_t i = 0; i < switch_vector.size(); i++) {
+ uint32_t max_loss_rate = 0;
+ if (fec_on_)
+ max_loss_rate = switch_vector[i].loss_fec_to_rtx;
+ else
+ max_loss_rate = switch_vector[i].loss_rtx_to_fec;
+
+ if (rtt < switch_vector[i].rtt && loss_rate < max_loss_rate) {
+ use_rtx = true;
+ rtx_allowed_consecutive_rounds_++;
+ break;
+ }
+ }
+
+ if (!use_rtx) rtx_allowed_consecutive_rounds_ = 0;
+
+ if (use_rtx) {
+ if (fec_on_) {
+ // here we should swtich from RTX to FEC
+ // wait 10sec where the switch is allowed before actually switch
+ if (rtx_allowed_consecutive_rounds_ >=
+ ((MILLI_IN_A_SEC / ROUND_LEN) * 10)) { // 10 sec
+ // use RTX
+ setRecoveryParameters(true, false, 0);
+ fec_consecutive_rounds_ = 0;
+ } else {
+ // keep using FEC (and maybe RTX)
+ setRecoveryParameters(true, true, fec_to_ask);
+ fec_consecutive_rounds_++;
+ }
+ } else {
+ // keep using RTX
+ setRecoveryParameters(true, false, 0);
+ fec_consecutive_rounds_ = 0;
+ }
+ } else {
+ // use FEC and RTX
+ setRecoveryParameters(true, true, fec_to_ask);
+ fec_consecutive_rounds_++;
+ }
+
+ // everytime that we anable FEC we keep also RTX on. in this way the first
+ // losses that are not covered by FEC are recovered using RTX. after 5 sec we
+ // disable fec
+ if (fec_consecutive_rounds_ >= ((MILLI_IN_A_SEC / ROUND_LEN) * 5)) {
+ // turn off RTX
+ setRtxFec(false);
+ }
+}
+
+void RecoveryStrategyLowRate::onNewRound(bool in_sync) {
+ if (!recovery_on_) {
+ // disable fec so that no extra packet will be sent
+ // for rtx we check if recovery is on in newPacketLoss
+ setRtxFec(true, false);
+ indexer_->setNFec(0);
+ return;
+ }
+
+ // XXX since this strategy will be used only for flow at low rate we do not
+ // consider congestion events like in other strategies
+
+ selectRecoveryStrategy(in_sync);
+}
+
+void RecoveryStrategyLowRate::newPacketLoss(uint32_t seq) {
+ if (rtx_on_ && recovery_on_) {
+ addNewRtx(seq, false);
+ } else {
+ if (!state_->isPending(seq) && !indexer_->isFec(seq)) {
+ addNewRtx(seq, true);
+ } else {
+ recover_with_fec_.insert(seq);
+ state_->onPossibleLossWithNoRtx(seq);
+ }
+ }
+}
+
+void RecoveryStrategyLowRate::receivedPacket(uint32_t seq) {
+ removePacketState(seq);
+}
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_rs_low_rate.h b/libtransport/src/protocols/rtc/rtc_rs_low_rate.h
new file mode 100644
index 000000000..f0c7bd0d5
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_rs_low_rate.h
@@ -0,0 +1,69 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <protocols/rtc/rtc_recovery_strategy.h>
+
+#include <vector>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+struct thresholds_t {
+ uint32_t rtt;
+ uint32_t loss_rtx_to_fec; // loss rate used to move from rtx to fec
+ uint32_t loss_fec_to_rtx; // loss rate used to move from fec to rtx
+};
+
+class RecoveryStrategyLowRate : public RecoveryStrategy {
+ public:
+ RecoveryStrategyLowRate(Indexer *indexer, SendRtxCallback &&callback,
+ asio::io_service &io_service,
+ interface::StrategyCallback *external_callback);
+
+ RecoveryStrategyLowRate(RecoveryStrategy &&rs);
+
+ ~RecoveryStrategyLowRate();
+
+ void onNewRound(bool in_sync);
+ void newPacketLoss(uint32_t seq);
+ void receivedPacket(uint32_t seq);
+
+ private:
+ void initSwitchVector();
+ void setRecoveryParameters(bool use_rtx, bool use_fec, uint32_t fec_to_ask);
+ void selectRecoveryStrategy(bool in_sync);
+
+ uint32_t fec_consecutive_rounds_;
+ uint32_t rtx_allowed_consecutive_rounds_;
+
+ // this table contains the thresholds that indicates when to switch from RTX
+ // to FEC and viceversa. values in the vector are detected with a set of
+ // experiments. the vector is used in the following way: if rtt and loss rate
+ // are less than one of the values in the in the vector, losses are
+ // recovered using RTX. otherwive losses are recovered using FEC. as for FEC
+ // only and delay based strategy, the swith from RTX to FEC is smooth,
+ // meaning that FEC and RTX are used together for some rounds
+ std::vector<thresholds_t> switch_vector;
+};
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_rs_recovery_off.cc b/libtransport/src/protocols/rtc/rtc_rs_recovery_off.cc
new file mode 100644
index 000000000..499e978f1
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_rs_recovery_off.cc
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <glog/logging.h>
+#include <protocols/rtc/rtc_consts.h>
+#include <protocols/rtc/rtc_rs_recovery_off.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+RecoveryStrategyRecoveryOff::RecoveryStrategyRecoveryOff(
+ Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service,
+ interface::StrategyCallback *external_callback)
+ : RecoveryStrategy(indexer, std::move(callback), io_service, false, false,
+ external_callback) {}
+
+RecoveryStrategyRecoveryOff::RecoveryStrategyRecoveryOff(RecoveryStrategy &&rs)
+ : RecoveryStrategy(std::move(rs)) {
+ setRtxFec(false, false);
+}
+
+RecoveryStrategyRecoveryOff::~RecoveryStrategyRecoveryOff() {}
+
+void RecoveryStrategyRecoveryOff::onNewRound(bool in_sync) {
+ // nothing to do
+ return;
+}
+
+void RecoveryStrategyRecoveryOff::newPacketLoss(uint32_t seq) {
+ // here we only keep track of the lost packets to avoid to
+ // count them multple times in the counters. for this we
+ // use the recover_with_fec_ set
+ recover_with_fec_.insert(seq);
+ state_->onPossibleLossWithNoRtx(seq);
+}
+
+void RecoveryStrategyRecoveryOff::receivedPacket(uint32_t seq) {
+ removePacketState(seq);
+}
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_rs_recovery_off.h b/libtransport/src/protocols/rtc/rtc_rs_recovery_off.h
new file mode 100644
index 000000000..98cd1e6a5
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_rs_recovery_off.h
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <protocols/rtc/rtc_recovery_strategy.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+class RecoveryStrategyRecoveryOff : public RecoveryStrategy {
+ public:
+ RecoveryStrategyRecoveryOff(Indexer *indexer, SendRtxCallback &&callback,
+ asio::io_service &io_service,
+ interface::StrategyCallback *external_callback);
+
+ RecoveryStrategyRecoveryOff(RecoveryStrategy &&rs);
+
+ ~RecoveryStrategyRecoveryOff();
+
+ void onNewRound(bool in_sync);
+ void newPacketLoss(uint32_t seq);
+ void receivedPacket(uint32_t seq);
+};
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_rs_rtx_only.cc b/libtransport/src/protocols/rtc/rtc_rs_rtx_only.cc
new file mode 100644
index 000000000..c1ae9b53d
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_rs_rtx_only.cc
@@ -0,0 +1,61 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <glog/logging.h>
+#include <protocols/rtc/rtc_consts.h>
+#include <protocols/rtc/rtc_rs_rtx_only.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+RecoveryStrategyRtxOnly::RecoveryStrategyRtxOnly(
+ Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service,
+ interface::StrategyCallback *external_callback)
+ : RecoveryStrategy(indexer, std::move(callback), io_service, true, false,
+ external_callback) {}
+
+RecoveryStrategyRtxOnly::RecoveryStrategyRtxOnly(RecoveryStrategy &&rs)
+ : RecoveryStrategy(std::move(rs)) {
+ setRtxFec(true, false);
+}
+
+RecoveryStrategyRtxOnly::~RecoveryStrategyRtxOnly() {}
+
+void RecoveryStrategyRtxOnly::onNewRound(bool in_sync) {
+ // nothing to do
+ return;
+}
+
+void RecoveryStrategyRtxOnly::newPacketLoss(uint32_t seq) {
+ if (!recovery_on_) {
+ recover_with_fec_.insert(seq);
+ state_->onPossibleLossWithNoRtx(seq);
+ return;
+ }
+ addNewRtx(seq, false);
+}
+
+void RecoveryStrategyRtxOnly::receivedPacket(uint32_t seq) {
+ removePacketState(seq);
+}
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_rs_rtx_only.h b/libtransport/src/protocols/rtc/rtc_rs_rtx_only.h
new file mode 100644
index 000000000..7ae909454
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_rs_rtx_only.h
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <protocols/rtc/rtc_recovery_strategy.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+class RecoveryStrategyRtxOnly : public RecoveryStrategy {
+ public:
+ RecoveryStrategyRtxOnly(Indexer *indexer, SendRtxCallback &&callback,
+ asio::io_service &io_service,
+ interface::StrategyCallback *external_callback);
+
+ RecoveryStrategyRtxOnly(RecoveryStrategy &&rs);
+
+ ~RecoveryStrategyRtxOnly();
+
+ void onNewRound(bool in_sync);
+ void newPacketLoss(uint32_t seq);
+ void receivedPacket(uint32_t seq);
+};
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_state.cc b/libtransport/src/protocols/rtc/rtc_state.cc
index c99205a26..6a21531f8 100644
--- a/libtransport/src/protocols/rtc/rtc_state.cc
+++ b/libtransport/src/protocols/rtc/rtc_state.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -24,15 +24,15 @@ namespace protocol {
namespace rtc {
RTCState::RTCState(Indexer *indexer,
- ProbeHandler::SendProbeCallback &&rtt_probes_callback,
+ ProbeHandler::SendProbeCallback &&probe_callback,
DiscoveredRttCallback &&discovered_rtt_callback,
asio::io_service &io_service)
- : indexer_(indexer),
- rtt_probes_(std::make_shared<ProbeHandler>(std::move(rtt_probes_callback),
- io_service)),
+ : loss_history_(10), // log 10sec history
+ indexer_(indexer),
+ probe_handler_(std::make_shared<ProbeHandler>(std::move(probe_callback),
+ io_service)),
discovered_rtt_callback_(std::move(discovered_rtt_callback)) {
init_rtt_timer_ = std::make_unique<asio::steady_timer>(io_service);
- initParams();
}
RTCState::~RTCState() {}
@@ -55,9 +55,19 @@ void RTCState::initParams() {
highest_seq_received_in_order_ = 0;
last_seq_nacked_ = 0;
loss_rate_ = 0.0;
- avg_loss_rate_ = 0.0;
+ avg_loss_rate_ = -1.0;
max_loss_rate_ = 0.0;
last_round_loss_rate_ = 0.0;
+
+ // loss rate per sec
+ lost_per_sec_ = 0;
+ total_expected_packets_ = 0;
+ per_sec_loss_rate_ = 0.0;
+
+ // residual losses counters
+ expected_packets_ = 0;
+ packets_sent_to_app_ = 0;
+ rounds_from_last_compute_ = 0;
residual_loss_rate_ = 0.0;
// fec counters
@@ -66,9 +76,13 @@ void RTCState::initParams() {
// bw counters
received_bytes_ = 0;
+ received_fec_bytes_ = 0;
+ recovered_bytes_with_fec_ = 0;
+
avg_packet_size_ = INIT_PACKET_SIZE;
production_rate_ = 0.0;
received_rate_ = 0.0;
+ fec_recovered_rate_ = 0.0;
// nack counter
nack_on_last_round_ = false;
@@ -95,31 +109,30 @@ void RTCState::initParams() {
path_table_.clear();
main_path_ = nullptr;
- // packet received
- received_or_lost_packets_.clear();
+ // packet cache (not pending anymore)
+ packet_cache_.clear();
// pending interests
pending_interests_.clear();
- // skipped interest
+ // used to keep track of the skipped interest
last_interest_sent_ = 0;
- skipped_interests_.clear();
// init rtt
first_interest_sent_time_ = ~0;
first_interest_sent_seq_ = 0;
+ // start probing the producer
init_rtt_ = false;
- rtt_probes_->setProbes(INIT_RTT_PROBE_INTERVAL, INIT_RTT_PROBES);
- rtt_probes_->sendProbes();
+ probe_handler_->setSuffixRange(MIN_INIT_PROBE_SEQ, MAX_INIT_PROBE_SEQ);
+ probe_handler_->setProbes(INIT_RTT_PROBE_INTERVAL, INIT_RTT_PROBES);
+ probe_handler_->sendProbes();
setInitRttTimer(INIT_RTT_PROBE_RESTART);
}
// packet events
void RTCState::onSendNewInterest(const core::Name *interest_name) {
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
+ uint64_t now = utils::SteadyTime::nowMs().count();
uint32_t seq = interest_name->getSuffix();
pending_interests_.insert(std::pair<uint32_t, uint64_t>(seq, now));
@@ -137,11 +150,12 @@ void RTCState::onSendNewInterest(const core::Name *interest_name) {
}
// TODO what happen in case of jumps?
- // look for skipped interests
- skipped_interests_.erase(seq); // remove seq if it is there
+ eraseFromPacketCache(
+ seq); // if we send this interest we don't know its state
for (uint32_t i = last_interest_sent_ + 1; i < seq; i++) {
if (indexer_->isFec(i)) {
- skipped_interests_.insert(i);
+ // only fec packets can be skipped
+ addToPacketCache(i, PacketState::SKIPPED);
}
}
@@ -155,6 +169,7 @@ void RTCState::onTimeout(uint32_t seq, bool lost) {
auto it = pending_interests_.find(seq);
if (it != pending_interests_.end()) {
pending_interests_.erase(it);
+ if (indexer_->isFec(seq)) pending_fec_pkt_--;
}
received_timeouts_++;
@@ -162,11 +177,12 @@ void RTCState::onTimeout(uint32_t seq, bool lost) {
}
void RTCState::onLossDetected(uint32_t seq) {
- if (!indexer_->isFec(seq)) {
- packets_lost_++;
- } else if (skipped_interests_.find(seq) == skipped_interests_.end() &&
- seq >= first_interest_sent_seq_) {
+ PacketState state = getPacketState(seq);
+
+ // if the packet is already marked with a state, do nothing
+ if (state == PacketState::UNKNOWN) {
packets_lost_++;
+ addToPacketCache(seq, PacketState::LOST);
}
}
@@ -178,30 +194,40 @@ void RTCState::onRetransmission(uint32_t seq) {
auto it = pending_interests_.find(seq);
if (it != pending_interests_.end()) {
pending_interests_.erase(it);
-#if 0
- packets_lost_++;
-#endif
+ if (indexer_->isFec(seq)) pending_fec_pkt_--;
}
sent_rtx_++;
sent_rtx_last_round_++;
}
+void RTCState::onPossibleLossWithNoRtx(uint32_t seq) {
+ // if fec is on or rtx is disable we don't need to do anything to recover a
+ // packet. however in both cases we need to remove possible missing packets
+ // from the window of pendinig interest in order to free space without wating
+ // for the timeout.
+ auto it = pending_interests_.find(seq);
+ if (it != pending_interests_.end()) {
+ pending_interests_.erase(it);
+ if (indexer_->isFec(seq)) pending_fec_pkt_--;
+ }
+}
+
void RTCState::onDataPacketReceived(const core::ContentObject &content_object,
bool compute_stats) {
uint32_t seq = content_object.getName().getSuffix();
+
if (compute_stats) {
updatePathStats(content_object, false);
received_data_last_round_++;
}
received_data_++;
+ packets_sent_to_app_++;
- struct data_packet_t *data_pkt =
- (struct data_packet_t *)content_object.getPayload()->data();
- uint64_t production_time = data_pkt->getTimestamp();
- if (last_prod_update_ < production_time) {
- last_prod_update_ = production_time;
- uint32_t production_rate = data_pkt->getProductionRate();
- production_rate_ = (double)production_rate;
+ core::ParamsRTC params = RTCState::getDataParams(content_object);
+
+ if (last_prod_update_ < params.timestamp) {
+ last_prod_update_ = params.timestamp;
+ production_rate_ = (double)params.prod_rate;
}
updatePacketSize(content_object);
@@ -219,9 +245,18 @@ void RTCState::onDataPacketReceived(const core::ContentObject &content_object,
void RTCState::onFecPacketReceived(const core::ContentObject &content_object) {
uint32_t seq = content_object.getName().getSuffix();
- updateReceivedBytes(content_object);
+ // updateReceivedBytes(content_object);
+ received_fec_bytes_ +=
+ (uint32_t)(content_object.headerSize() + content_object.payloadSize());
+
+ if (seq > highest_seq_received_) highest_seq_received_ = seq;
+
+ PacketState state = getPacketState(seq);
+ if (state != PacketState::LOST) {
+ // increase only for not lost packets
+ received_fec_pkt_++;
+ }
addRecvOrLost(seq, PacketState::RECEIVED);
- received_fec_pkt_++;
// the producer is responding
// it is generating valid data packets so we consider it active
producer_is_active_ = true;
@@ -233,7 +268,7 @@ void RTCState::onNackPacketReceived(const core::ContentObject &nack,
struct nack_packet_t *nack_pkt =
(struct nack_packet_t *)nack.getPayload()->data();
uint64_t production_time = nack_pkt->getTimestamp();
- uint32_t production_seq = nack_pkt->getProductionSegement();
+ uint32_t production_seq = nack_pkt->getProductionSegment();
uint32_t production_rate = nack_pkt->getProductionRate();
if (TRANSPORT_EXPECT_FALSE(main_path_ == nullptr) ||
@@ -255,6 +290,7 @@ void RTCState::onNackPacketReceived(const core::ContentObject &nack,
received_nacks_++;
received_nacks_last_round_++;
+ bool to_delete = false;
if (production_seq > seq) {
// old nack, seq is lost
// update last nacked
@@ -266,12 +302,19 @@ void RTCState::onNackPacketReceived(const core::ContentObject &nack,
// future nack
// remove the nack from the pending interest map
// (the packet is not received/lost yet)
- if (indexer_->isFec(seq)) pending_fec_pkt_--;
- pending_interests_.erase(seq);
+ to_delete = true;
} else {
// this should be a quite rear event. simply remove the
// packet from the pending interest list
- pending_interests_.erase(seq);
+ to_delete = true;
+ }
+
+ if (to_delete) {
+ auto it = pending_interests_.find(seq);
+ if (it != pending_interests_.end()) {
+ pending_interests_.erase(it);
+ if (indexer_->isFec(seq)) pending_fec_pkt_--;
+ }
}
// the producer is responding
@@ -295,44 +338,58 @@ void RTCState::onPacketLost(uint32_t seq) {
}
#endif
if (!indexer_->isFec(seq)) {
- definitely_lost_pkt_++;
- DLOG_IF(INFO, VLOG_IS_ON(4)) << "packet " << seq << " is lost";
+ PacketState state = getPacketState(seq);
+ if (state == PacketState::LOST || state == PacketState::UNKNOWN) {
+ definitely_lost_pkt_++;
+ DLOG_IF(INFO, VLOG_IS_ON(4)) << "packet " << seq << " is lost";
+ }
}
- addRecvOrLost(seq, PacketState::LOST);
+ addRecvOrLost(seq, PacketState::DEFINITELY_LOST);
}
void RTCState::onPacketRecoveredRtx(uint32_t seq) {
+ packets_sent_to_app_++;
+ if (seq > highest_seq_received_) highest_seq_received_ = seq;
losses_recovered_++;
addRecvOrLost(seq, PacketState::RECEIVED);
}
-void RTCState::onPacketRecoveredFec(uint32_t seq) {
+void RTCState::onFecPacketRecoveredRtx(uint32_t seq) {
+ // This is the same as onPacketRecoveredRtx, but in this is case the
+ // pkt is also a FEC pkt, the addRecvOrLost will be called afterwards
+ if (seq > highest_seq_received_) highest_seq_received_ = seq;
+ losses_recovered_++;
+}
+
+void RTCState::onPacketRecoveredFec(uint32_t seq, uint32_t size) {
losses_recovered_++;
+ packets_sent_to_app_++;
+ recovered_bytes_with_fec_ += size;
+
+ if (seq > highest_seq_received_) highest_seq_received_ = seq;
+
+ // adding header to the count
+ recovered_bytes_with_fec_ += 60; // XXX get header size some where
+
+ if (getPacketState(seq) == PacketState::UNKNOWN)
+ onLossDetected(seq); // the pkt was lost but didn't account for it yet
+
addRecvOrLost(seq, PacketState::RECEIVED);
}
bool RTCState::onProbePacketReceived(const core::ContentObject &probe) {
uint32_t seq = probe.getName().getSuffix();
- uint64_t rtt;
-
- rtt = rtt_probes_->getRtt(seq);
+ uint64_t rtt;
+ rtt = probe_handler_->getRtt(seq);
if (rtt == 0) return false; // this is not a valid probe
- // like for data and nacks update the path stats. Here the RTT is computed
- // by the probe handler. Both probes for rtt and bw are good to esimate
- // info on the path
+ // Like for data and nacks update the path stats. Here the RTT is computed
+ // by the probe handler. Both probes for rtt and bw are good to estimate
+ // info on the path.
uint32_t path_label = probe.getPathLabel();
-
auto path_it = path_table_.find(path_label);
- // update production rate and last_seq_nacked like in case of a nack
- struct nack_packet_t *probe_pkt =
- (struct nack_packet_t *)probe.getPayload()->data();
- uint64_t sender_timestamp = probe_pkt->getTimestamp();
- uint32_t production_seq = probe_pkt->getProductionSegement();
- uint32_t production_rate = probe_pkt->getProductionRate();
-
if (path_it == path_table_.end()) {
// found a new path
std::shared_ptr<RTCDataPath> newPath =
@@ -344,26 +401,26 @@ bool RTCState::onProbePacketReceived(const core::ContentObject &probe) {
auto path = path_it->second;
- path->insertRttSample(rtt);
+ path->insertRttSample(utils::SteadyTime::Milliseconds(rtt), true);
path->receivedNack();
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
+ uint64_t now = utils::SteadyTime::nowMs().count();
+
+ core::ParamsRTC params = RTCState::getProbeParams(probe);
- int64_t OWD = now - sender_timestamp;
+ int64_t OWD = now - params.timestamp;
path->insertOwdSample(OWD);
- if (last_prod_update_ < sender_timestamp) {
- last_production_seq_ = production_seq;
- last_prod_update_ = sender_timestamp;
- production_rate_ = (double)production_rate;
+ if (last_prod_update_ < params.timestamp) {
+ last_production_seq_ = params.prod_seg;
+ last_prod_update_ = params.timestamp;
+ production_rate_ = (double)params.prod_rate;
}
// the producer is responding
// we consider it active only if the production rate is not 0
// or the production sequence numner is not 1
- if (production_rate_ != 0 || production_seq != 1) {
+ if (production_rate_ != 0 || params.prod_seg != 1) {
producer_is_active_ = true;
}
@@ -375,7 +432,7 @@ bool RTCState::onProbePacketReceived(const core::ContentObject &probe) {
if (!init_rtt_ && received_probes_ <= INIT_RTT_PROBES) {
if (received_probes_ == 1) {
// we got the first probe, wait at most INIT_RTT_PROBE_WAIT sec for the
- // others
+ // others.
main_path_ = path;
setInitRttTimer(INIT_RTT_PROBE_WAIT);
}
@@ -393,11 +450,21 @@ bool RTCState::onProbePacketReceived(const core::ContentObject &probe) {
return true;
}
-void RTCState::onNewRound(double round_len, bool in_sync) {
- // XXX
- // here we take into account only the single path case so we assume that we
- // don't use two paths in parellel for this single flow
+void RTCState::onJumpForward(uint32_t next_seq) {
+ for (uint32_t seq = highest_seq_received_in_order_ + 1; seq < next_seq;
+ seq++) {
+ auto it = pending_interests_.find(seq);
+ PacketState packet_state = getPacketState(seq);
+ if (it == pending_interests_.end() &&
+ packet_state != PacketState::RECEIVED &&
+ packet_state != PacketState::DEFINITELY_LOST) {
+ onLossDetected(seq);
+ onPacketLost(seq);
+ }
+ }
+}
+void RTCState::onNewRound(double round_len, bool in_sync) {
if (path_table_.empty()) return;
double bytes_per_sec =
@@ -407,7 +474,24 @@ void RTCState::onNewRound(double round_len, bool in_sync) {
else
received_rate_ = (received_rate_ * MOVING_AVG_ALPHA) +
((1 - MOVING_AVG_ALPHA) * bytes_per_sec);
+ double fec_bytes_per_sec =
+ ((double)received_fec_bytes_ * (MILLI_IN_A_SEC / round_len));
+
+ if (fec_received_rate_ == 0)
+ fec_received_rate_ = fec_bytes_per_sec;
+ else
+ fec_received_rate_ = (fec_received_rate_ * 0.8) + (0.2 * fec_bytes_per_sec);
+
+ double fec_recovered_bytes_per_sec =
+ ((double)recovered_bytes_with_fec_ * (MILLI_IN_A_SEC / round_len));
+ if (fec_recovered_rate_ == 0)
+ fec_recovered_rate_ = fec_recovered_bytes_per_sec;
+ else
+ fec_recovered_rate_ =
+ (fec_recovered_rate_ * 0.8) + (0.2 * fec_recovered_bytes_per_sec);
+
+#if 0
// search for an active path. There should be only one active path (meaning a
// path that leads to the producer socket -no cache- and from which we are
// currently getting data packets) at any time. However it may happen that
@@ -428,9 +512,36 @@ void RTCState::onNewRound(double round_len, bool in_sync) {
}
}
}
+#endif
+
+ // search for an active path. Is it possible to have multiple path that are
+ // used at the same time. We use as reference path the one from where we gets
+ // more packets. This means that the path should have better lantecy or less
+ // channel losses
+
+ uint32_t last_round_packets = 0;
+ std::shared_ptr<RTCDataPath> old_main_path = main_path_;
+ main_path_ = nullptr;
+
+ for (auto it = path_table_.begin(); it != path_table_.end(); it++) {
+ if (it->second->isActive()) {
+ uint32_t pkt = it->second->getPacketsLastRound();
+ if (pkt > last_round_packets) {
+ last_round_packets = pkt;
+ main_path_ = it->second;
+ }
+ }
+ it->second->roundEnd();
+ }
- // if (in_sync) updateLossRate();
- updateLossRate();
+ if (main_path_ == nullptr) main_path_ = old_main_path;
+
+ // in case we get a new main path we reset the stats of the old one. this is
+ // beacuse, in case we need to switch back we don't what to take decisions on
+ // old stats that may be outdated.
+ if (main_path_ != old_main_path) old_main_path->clearRtt();
+
+ updateLossRate(in_sync);
// handle nacks
if (!nack_on_last_round_ && received_bytes_ > 0) {
@@ -460,6 +571,8 @@ void RTCState::onNewRound(double round_len, bool in_sync) {
// reset counters
received_bytes_ = 0;
+ received_fec_bytes_ = 0;
+ recovered_bytes_with_fec_ = 0;
packets_lost_ = 0;
definitely_lost_pkt_ = 0;
losses_recovered_ = 0;
@@ -516,20 +629,16 @@ void RTCState::updatePathStats(const core::ContentObject &content_object,
// it means that we are processing an interest
// that is not pending
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
+ uint64_t now = utils::SteadyTime::nowMs().count();
uint64_t RTT = now - interest_sent_time;
- path->insertRttSample(RTT);
+ path->insertRttSample(utils::SteadyTime::Milliseconds(RTT), false);
// compute OWD (the first part of the nack and data packet header are the
// same, so we cast to data data packet)
- struct data_packet_t *packet =
- (struct data_packet_t *)content_object.getPayload()->data();
- uint64_t sender_timestamp = packet->getTimestamp();
- int64_t OWD = now - sender_timestamp;
+ core::ParamsRTC params = RTCState::getDataParams(content_object);
+ int64_t OWD = now - params.timestamp;
path->insertOwdSample(OWD);
// compute IAT or set path to producer
@@ -543,59 +652,106 @@ void RTCState::updatePathStats(const core::ContentObject &content_object,
}
}
-void RTCState::updateLossRate() {
+void RTCState::updateLossRate(bool in_sync) {
last_round_loss_rate_ = loss_rate_;
loss_rate_ = 0.0;
- residual_loss_rate_ = 0.0;
uint32_t number_theorically_received_packets_ =
highest_seq_received_ - first_seq_in_round_;
- // in this case no new packet was recevied after the previuos round, avoid
- // division by 0
- if (number_theorically_received_packets_ == 0) return;
-
// XXX this may be quite inefficient if the rate is high
// maybe is better to iterate over the set?
- for (uint32_t i = first_seq_in_round_; i < highest_seq_received_; i++) {
- auto it = skipped_interests_.find(i);
- if (it != skipped_interests_.end()) {
+
+ uint32_t fec_packets = 0;
+ for (uint32_t i = (first_seq_in_round_ + 1); i < highest_seq_received_; i++) {
+ PacketState state = getPacketState(i);
+ if (state == PacketState::SKIPPED) {
if (number_theorically_received_packets_ > 0)
number_theorically_received_packets_--;
- skipped_interests_.erase(it);
}
+ if (indexer_->isFec(i)) fec_packets++;
}
+ if (indexer_->isFec(highest_seq_received_)) fec_packets++;
- loss_rate_ = (double)((double)(packets_lost_) /
- (double)number_theorically_received_packets_);
+ // in this case no new packet was received after the previous round, avoid
+ // division by 0
+ if (number_theorically_received_packets_ == 0 && packets_lost_ == 0) return;
- if (rounds_ % 15 == 0) max_loss_rate_ = 0; // reset every 3 sec
- if (loss_rate_ > max_loss_rate_) max_loss_rate_ = loss_rate_;
+ if (number_theorically_received_packets_ != 0)
+ loss_rate_ = (double)((double)(packets_lost_) /
+ (double)number_theorically_received_packets_);
+ else
+ // we didn't receive anything except NACKs that triggered losses
+ loss_rate_ = 1.0;
- if (avg_loss_rate_ == 0)
+ if (avg_loss_rate_ == -1.0)
avg_loss_rate_ = loss_rate_;
else
avg_loss_rate_ =
avg_loss_rate_ * MOVING_AVG_ALPHA + loss_rate_ * (1 - MOVING_AVG_ALPHA);
- residual_loss_rate_ = (double)((double)(packets_lost_ - losses_recovered_) /
- (double)number_theorically_received_packets_);
+ // update counters for loss rate per second
+ total_expected_packets_ += number_theorically_received_packets_;
+ lost_per_sec_ += packets_lost_;
+
+ if (in_sync) {
+ // update counters for residual losses
+ // fec packets are not sent to the app so we don't want to count them here
+ expected_packets_ +=
+ ((highest_seq_received_ - first_seq_in_round_) - fec_packets);
+ } else {
+ packets_sent_to_app_ = 0;
+ }
+
+ if (rounds_from_last_compute_ >= (MILLI_IN_A_SEC / ROUND_LEN)) {
+ // compute loss rate per second
+ if (lost_per_sec_ > total_expected_packets_)
+ lost_per_sec_ = total_expected_packets_;
+
+ if (total_expected_packets_ == 0)
+ per_sec_loss_rate_ = 0;
+ else
+ per_sec_loss_rate_ =
+ (double)((double)(lost_per_sec_) / (double)total_expected_packets_);
+
+ loss_history_.pushBack(per_sec_loss_rate_);
+ max_loss_rate_ = getMaxLoss();
+
+ if (in_sync && expected_packets_ != 0) {
+ // compute residual loss rate
+ if (packets_sent_to_app_ > expected_packets_) {
+ // this may happen if we get packet from the prev bin that get recovered
+ // on the current one
+ packets_sent_to_app_ = expected_packets_;
+ }
+
+ residual_loss_rate_ =
+ 1.0 - ((double)packets_sent_to_app_ / (double)expected_packets_);
+ if (residual_loss_rate_ < 0.0) residual_loss_rate_ = 0.0;
+ }
+
+ lost_per_sec_ = 0;
+ total_expected_packets_ = 0;
+ expected_packets_ = 0;
+ packets_sent_to_app_ = 0;
+ rounds_from_last_compute_ = 0;
+ }
+
+ rounds_from_last_compute_++;
+}
- if (residual_loss_rate_ < 0) residual_loss_rate_ = 0;
+void RTCState::dataToBeReceived(uint32_t seq) {
+ addToPacketCache(seq, PacketState::TO_BE_RECEIVED);
}
void RTCState::addRecvOrLost(uint32_t seq, PacketState state) {
- if (indexer_->isFec(seq)) {
- pending_fec_pkt_--;
+ auto it = pending_interests_.find(seq);
+ if (it != pending_interests_.end()) {
+ pending_interests_.erase(it);
+ if (indexer_->isFec(seq)) pending_fec_pkt_--;
}
- pending_interests_.erase(seq);
- if (received_or_lost_packets_.size() >= MAX_CACHED_PACKETS) {
- received_or_lost_packets_.erase(received_or_lost_packets_.begin());
- }
- // notice that it may happen that a packet that we consider lost arrives after
- // some time, in this case we simply overwrite the packet state.
- received_or_lost_packets_[seq] = state;
+ addToPacketCache(seq, state);
// keep track of the last packet received/lost
// without holes.
@@ -608,16 +764,25 @@ void RTCState::addRecvOrLost(uint32_t seq, PacketState state) {
} else if (seq <= highest_seq_received_in_order_) {
// here we do nothing
} else if (seq > highest_seq_received_in_order_) {
- // 1) there is a gap in the sequence so we do not update largest_in_seq_
- // 2) all the packets from largest_in_seq_ to seq are in
- // received_or_lost_packets_ an we upate largest_in_seq_
- // or are FEC packets
+ // 1) there is a gap in the sequence so we do not update
+ // highest_seq_received_in_order_
+ // 2) all the packets from highest_seq_received_in_order_ to seq are
+ // received or lost or are fec packetis. In this case we increase
+ // highest_seq_received_in_order_ until we find an hole in the sequence
for (uint32_t i = highest_seq_received_in_order_ + 1; i <= seq; i++) {
- if (received_or_lost_packets_.find(i) ==
- received_or_lost_packets_.end() &&
- !indexer_->isFec(i)) {
- break;
+ PacketState state = getPacketState(i);
+ if ((state == PacketState::UNKNOWN || state == PacketState::LOST)) {
+ if (indexer_->isFec(i)) {
+ // this is a fec packet and we don't care to receive it
+ // however we may need to increse the number or lost packets
+ // XXX: in case we want to use rtx to recover fec packets,
+ // this may prevent to detect a packet loss and no rtx will be sent
+ onLossDetected(i);
+ } else {
+ // this is a data packet and we need to get it
+ break;
+ }
}
// this packet is in order so we can update the
// highest_seq_received_in_order_
@@ -629,9 +794,14 @@ void RTCState::addRecvOrLost(uint32_t seq, PacketState state) {
void RTCState::setInitRttTimer(uint32_t wait) {
init_rtt_timer_->cancel();
init_rtt_timer_->expires_from_now(std::chrono::milliseconds(wait));
- init_rtt_timer_->async_wait([this](std::error_code ec) {
+
+ std::weak_ptr<RTCState> self = shared_from_this();
+ init_rtt_timer_->async_wait([self](const std::error_code &ec) {
if (ec) return;
- checkInitRttTimer();
+
+ if (auto ptr = self.lock()) {
+ ptr->checkInitRttTimer();
+ }
});
}
@@ -639,19 +809,25 @@ void RTCState::checkInitRttTimer() {
if (received_probes_ < INIT_RTT_MIN_PROBES_TO_RECV) {
// we didn't received enough probes, restart
received_probes_ = 0;
- rtt_probes_->setProbes(INIT_RTT_PROBE_INTERVAL, INIT_RTT_PROBES);
- rtt_probes_->sendProbes();
+ probe_handler_->setSuffixRange(MIN_INIT_PROBE_SEQ, MAX_INIT_PROBE_SEQ);
+ probe_handler_->setProbes(INIT_RTT_PROBE_INTERVAL, INIT_RTT_PROBES);
+ probe_handler_->sendProbes();
setInitRttTimer(INIT_RTT_PROBE_RESTART);
return;
}
+
init_rtt_ = true;
main_path_->roundEnd();
- rtt_probes_->setProbes(RTT_PROBE_INTERVAL, 0);
- rtt_probes_->sendProbes();
+ loss_history_.pushBack(probe_handler_->getProbeLossRate());
+ max_loss_rate_ = getMaxLoss();
+
+ probe_handler_->setSuffixRange(MIN_RTT_PROBE_SEQ, MAX_RTT_PROBE_SEQ);
+ probe_handler_->setProbes(RTT_PROBE_INTERVAL, 0);
+ probe_handler_->sendProbes();
// init last_seq_nacked_. skip packets that may come from the cache
double prod_rate = getProducerRate();
- double rtt = (double)getRTT() / MILLI_IN_A_SEC;
+ double rtt = (double)getMinRTT() / MILLI_IN_A_SEC;
double packet_size = getAveragePacketSize();
uint32_t pkt_in_rtt_ = std::floor(((prod_rate / packet_size) * rtt) * 0.8);
last_seq_nacked_ = last_production_seq_ + pkt_in_rtt_;
@@ -659,6 +835,68 @@ void RTCState::checkInitRttTimer() {
discovered_rtt_callback_();
}
+double RTCState::getMaxLoss() {
+ if (loss_history_.size() != 0) return loss_history_.begin();
+ return 0;
+}
+
+core::ParamsRTC RTCState::getProbeParams(const core::ContentObject &probe) {
+ uint32_t seq = probe.getName().getSuffix();
+ core::ParamsRTC params;
+
+ switch (ProbeHandler::getProbeType(seq)) {
+ case ProbeType::INIT: {
+ core::ContentObjectManifest manifest(
+ const_cast<core::ContentObject &>(probe));
+ manifest.decode();
+ params = manifest.getParamsRTC();
+ break;
+ }
+ case ProbeType::RTT: {
+ struct nack_packet_t *probe_pkt =
+ (struct nack_packet_t *)probe.getPayload()->data();
+ params = core::ParamsRTC{
+ .timestamp = probe_pkt->getTimestamp(),
+ .prod_rate = probe_pkt->getProductionRate(),
+ .prod_seg = probe_pkt->getProductionSegment(),
+ };
+ break;
+ }
+ default:
+ break;
+ }
+
+ return params;
+}
+
+core::ParamsRTC RTCState::getDataParams(const core::ContentObject &data) {
+ core::ParamsRTC params;
+
+ switch (data.getPayloadType()) {
+ case core::PayloadType::DATA: {
+ struct data_packet_t *data_pkt =
+ (struct data_packet_t *)data.getPayload()->data();
+ params = core::ParamsRTC{
+ .timestamp = data_pkt->getTimestamp(),
+ .prod_rate = data_pkt->getProductionRate(),
+ .prod_seg = data.getName().getSuffix(),
+ };
+ break;
+ }
+ case core::PayloadType::MANIFEST: {
+ core::ContentObjectManifest manifest(
+ const_cast<core::ContentObject &>(data));
+ manifest.decode();
+ params = manifest.getParamsRTC();
+ break;
+ }
+ default:
+ break;
+ }
+
+ return params;
+}
+
} // namespace rtc
} // namespace protocol
diff --git a/libtransport/src/protocols/rtc/rtc_state.h b/libtransport/src/protocols/rtc/rtc_state.h
index 729ba7a1b..8bf48ccc2 100644
--- a/libtransport/src/protocols/rtc/rtc_state.h
+++ b/libtransport/src/protocols/rtc/rtc_state.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -14,13 +14,16 @@
*/
#pragma once
+#include <core/facade.h>
#include <hicn/transport/config.h>
#include <hicn/transport/core/asio_wrapper.h>
#include <hicn/transport/core/content_object.h>
#include <hicn/transport/core/name.h>
+#include <hicn/transport/utils/rtc_quality_score.h>
#include <protocols/indexer.h>
#include <protocols/rtc/probe_handler.h>
#include <protocols/rtc/rtc_data_path.h>
+#include <utils/max_filter.h>
#include <map>
#include <set>
@@ -31,25 +34,50 @@ namespace protocol {
namespace rtc {
-enum class PacketState : uint8_t { RECEIVED, LOST, UNKNOWN };
+// packet state
+// RECEIVED: the packet was already received
+// LOST: the packet is marked as lost but can be recovered
+// DEFINITELY_LOST: the packet is lost and cannot be recovered
+// TO_BE_RECEIVED: when a packet is received is sent to the FEC decoder. the fec
+// decoder may decide to send the packet directly to the app. to avoid
+// duplicated the packet is marked with this state
+// SKIPPED: an interest that was not sent, only for FEC packets
+// UNKNOWN: unknown state
+enum class PacketState : uint8_t {
+ RECEIVED,
+ TO_BE_RECEIVED,
+ LOST,
+ DEFINITELY_LOST,
+ SKIPPED,
+ UNKNOWN
+};
+
+class RTCState : public std::enable_shared_from_this<RTCState> {
+ using PendingInterestsMap = std::map<uint32_t, uint64_t>;
+
+ private:
+ const double MAX_CACHED_PACKETS = 8192; // XXX this value may be too small
+ // for high rate apps
-class RTCState : std::enable_shared_from_this<RTCState> {
public:
using DiscoveredRttCallback = std::function<void()>;
public:
- RTCState(Indexer *indexer,
- ProbeHandler::SendProbeCallback &&rtt_probes_callback,
+ RTCState(Indexer *indexer, ProbeHandler::SendProbeCallback &&probe_callback,
DiscoveredRttCallback &&discovered_rtt_callback,
asio::io_service &io_service);
~RTCState();
+ // initialization
+ void initParams();
+
// packet events
void onSendNewInterest(const core::Name *interest_name);
void onTimeout(uint32_t seq, bool lost);
void onLossDetected(uint32_t seq);
void onRetransmission(uint32_t seq);
+ void onPossibleLossWithNoRtx(uint32_t seq);
void onDataPacketReceived(const core::ContentObject &content_object,
bool compute_stats);
void onFecPacketReceived(const core::ContentObject &content_object);
@@ -57,8 +85,10 @@ class RTCState : std::enable_shared_from_this<RTCState> {
bool compute_stats);
void onPacketLost(uint32_t seq);
void onPacketRecoveredRtx(uint32_t seq);
- void onPacketRecoveredFec(uint32_t seq);
+ void onFecPacketRecoveredRtx(uint32_t seq);
+ void onPacketRecoveredFec(uint32_t seq, uint32_t size);
bool onProbePacketReceived(const core::ContentObject &probe);
+ void onJumpForward(uint32_t next_seq);
// protocol state
void onNewRound(double round_len, bool in_sync);
@@ -72,10 +102,21 @@ class RTCState : std::enable_shared_from_this<RTCState> {
// delay metrics
bool isRttDiscovered() const { return init_rtt_; }
- uint64_t getRTT() const {
+ uint64_t getMinRTT() const {
if (mainPathIsValid()) return main_path_->getMinRtt();
return 0;
}
+
+ uint64_t getAvgRTT() const {
+ if (mainPathIsValid()) return main_path_->getAvgRtt();
+ return 0;
+ }
+
+ uint64_t getMaxRTT() const {
+ if (mainPathIsValid()) return main_path_->getMaxRtt();
+ return 0;
+ }
+
void resetRttStats() {
if (mainPathIsValid()) main_path_->clearRtt();
}
@@ -98,6 +139,7 @@ class RTCState : std::enable_shared_from_this<RTCState> {
uint64_t getInterestSentTime(uint32_t seq) {
auto it = pending_interests_.find(seq);
if (it != pending_interests_.end()) return it->second;
+
return 0;
}
@@ -110,14 +152,15 @@ class RTCState : std::enable_shared_from_this<RTCState> {
return pending_interests_.size();
}
- PacketState isReceivedOrLost(uint32_t seq) {
- auto it = received_or_lost_packets_.find(seq);
- if (it != received_or_lost_packets_.end()) return it->second;
+ PacketState getPacketState(uint32_t seq) {
+ auto it = packet_cache_.find(seq);
+ if (it != packet_cache_.end()) return it->second;
return PacketState::UNKNOWN;
}
// loss rate
- double getLossRate() const { return loss_rate_; }
+ double getPerRoundLossRate() const { return loss_rate_; }
+ double getPerSecondLossRate() const { return per_sec_loss_rate_; }
double getAvgLossRate() const { return avg_loss_rate_; }
double getMaxLossRate() const { return max_loss_rate_; }
double getLastRoundLossRate() const { return last_round_loss_rate_; }
@@ -134,15 +177,22 @@ class RTCState : std::enable_shared_from_this<RTCState> {
return highest_seq_received_in_order_;
}
+ double getMaxLoss();
+
// fec packets
uint32_t getReceivedFecPackets() const { return received_fec_pkt_; }
uint32_t getPendingFecPackets() const { return pending_fec_pkt_; }
// generic stats
uint32_t getReceivedBytesInRound() const { return received_bytes_; }
+ uint32_t getReceivedFecBytesInRound() const { return received_fec_bytes_; }
+ uint32_t getRecoveredFecBytesInRound() const {
+ return recovered_bytes_with_fec_;
+ }
uint32_t getReceivedNacksInRound() const {
return received_nacks_last_round_;
}
+ uint32_t getReceivedDataInRound() const { return received_data_last_round_; }
uint32_t getSentInterestInRound() const { return sent_interests_last_round_; }
uint32_t getSentRtxInRound() const { return sent_rtx_last_round_; }
@@ -150,6 +200,9 @@ class RTCState : std::enable_shared_from_this<RTCState> {
double getAvailableBw() const { return 0.0; }; // TODO
double getProducerRate() const { return production_rate_; }
double getReceivedRate() const { return received_rate_; }
+ double getReceivedFecRate() const { return fec_received_rate_; }
+ double getRecoveredFecRate() const { return fec_recovered_rate_; }
+
double getAveragePacketSize() const { return avg_packet_size_; }
// nacks
@@ -162,22 +215,46 @@ class RTCState : std::enable_shared_from_this<RTCState> {
// packets from cache
double getPacketFromCacheRatio() const { return data_from_cache_rate_; }
- std::map<uint32_t, uint64_t>::iterator getPendingInterestsMapBegin() {
+ PendingInterestsMap::iterator getPendingInterestsMapBegin() {
return pending_interests_.begin();
}
- std::map<uint32_t, uint64_t>::iterator getPendingInterestsMapEnd() {
+ PendingInterestsMap::iterator getPendingInterestsMapEnd() {
return pending_interests_.end();
}
+ // quality
+ uint8_t getQualityScore() {
+ uint8_t qs = quality_score_.getQualityScore(
+ getMaxRTT(), std::round(getResidualLossRate() * 100));
+ return qs;
+ }
+
+ // We received a data pkt that will be set to RECEIVED, but first we have to
+ // go through FEC. We do not want to consider this pkt as recovered, thus we
+ // set it as TO_BE_RECEIVED.
+ void dataToBeReceived(uint32_t seq);
+
+ // Extract RTC parameters from probes (init or RTT probes) and data packets.
+ static core::ParamsRTC getProbeParams(const core::ContentObject &probe);
+ static core::ParamsRTC getDataParams(const core::ContentObject &data);
+
private:
- void initParams();
+ void addToPacketCache(uint32_t seq, PacketState state) {
+ // this function adds or updates the current state
+ if (packet_cache_.size() >= MAX_CACHED_PACKETS) {
+ packet_cache_.erase(packet_cache_.begin());
+ }
+ packet_cache_[seq] = state;
+ }
+
+ void eraseFromPacketCache(uint32_t seq) { packet_cache_.erase(seq); }
// update stats
void updateState();
void updateReceivedBytes(const core::ContentObject &content_object);
void updatePacketSize(const core::ContentObject &content_object);
void updatePathStats(const core::ContentObject &content_object, bool is_nack);
- void updateLossRate();
+ void updateLossRate(bool in_sycn);
void addRecvOrLost(uint32_t seq, PacketState state);
@@ -211,22 +288,39 @@ class RTCState : std::enable_shared_from_this<RTCState> {
double avg_loss_rate_;
double max_loss_rate_;
double last_round_loss_rate_;
+ utils::MaxFilter<double> loss_history_;
+
+ // per second loss rate
+ uint32_t lost_per_sec_;
+ uint32_t total_expected_packets_;
+ double per_sec_loss_rate_;
+
+ // conunters for residual losses
+ // residual losses are computed every second and are used
+ // as feedback to the upper levels (e.g application)
+ uint32_t expected_packets_;
+ uint32_t packets_sent_to_app_;
+ uint32_t rounds_from_last_compute_;
double residual_loss_rate_;
// bw counters
uint32_t received_bytes_;
+ uint32_t received_fec_bytes_;
+ uint32_t recovered_bytes_with_fec_;
double avg_packet_size_;
- double production_rate_; // rate communicated by the producer using nacks
- double received_rate_; // rate recevied by the consumer
+ double production_rate_; // rate communicated by the producer using nacks
+ double received_rate_; // rate recevied by the consumer (only data)
+ double fec_received_rate_; // fec rate recevied by the consumer
+ double fec_recovered_rate_; // rate recovered using fec
- // nack counter
+ // nack counters
// the bool takes tracks only about the valid nacks (no rtx) and it is used to
// switch between the states. Instead received_nacks_last_round_ logs all the
// nacks for statistics
bool nack_on_last_round_;
uint32_t received_nacks_last_round_;
- // packets counter
+ // packets counters
uint32_t received_packets_last_round_;
uint32_t received_data_last_round_;
uint32_t received_data_from_cache_;
@@ -234,11 +328,11 @@ class RTCState : std::enable_shared_from_this<RTCState> {
uint32_t sent_interests_last_round_;
uint32_t sent_rtx_last_round_;
- // fec counter
+ // fec counters
uint32_t received_fec_pkt_;
uint32_t pending_fec_pkt_;
- // round conunters
+ // round counters
uint32_t rounds_;
uint32_t rounds_without_nacks_;
uint32_t rounds_without_packets_;
@@ -261,23 +355,27 @@ class RTCState : std::enable_shared_from_this<RTCState> {
// packet received
// cache where to store info about the last MAX_CACHED_PACKETS
- std::map<uint32_t, PacketState> received_or_lost_packets_;
+ // these are packets that are received or lost or definitely lost and are not
+ // anymore in the pending intetest list
+ std::map<uint32_t, PacketState> packet_cache_;
// pending interests
- std::map<uint32_t, uint64_t> pending_interests_;
+ PendingInterestsMap pending_interests_;
// indexer
Indexer *indexer_;
- // skipped interests
+ // used to keep track of the skipped interests
uint32_t last_interest_sent_;
- std::unordered_set<uint32_t> skipped_interests_;
// probes
- std::shared_ptr<ProbeHandler> rtt_probes_;
+ std::shared_ptr<ProbeHandler> probe_handler_;
bool init_rtt_;
std::unique_ptr<asio::steady_timer> init_rtt_timer_;
+ // quality score
+ RTCQualityScore quality_score_;
+
// callbacks
DiscoveredRttCallback discovered_rtt_callback_;
};
diff --git a/libtransport/src/protocols/rtc/rtc_verifier.cc b/libtransport/src/protocols/rtc/rtc_verifier.cc
new file mode 100644
index 000000000..29968dd02
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_verifier.cc
@@ -0,0 +1,238 @@
+/*
+ * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <core/facade.h>
+#include <protocols/rtc/rtc_packet.h>
+#include <protocols/rtc/rtc_verifier.h>
+
+namespace transport {
+namespace protocol {
+namespace rtc {
+
+RTCVerifier::RTCVerifier(std::shared_ptr<auth::Verifier> verifier,
+ uint32_t max_unverified_delay)
+ : verifier_(verifier), max_unverified_delay_(max_unverified_delay) {}
+
+void RTCVerifier::setState(std::shared_ptr<RTCState> rtc_state) {
+ rtc_state_ = rtc_state;
+}
+
+void RTCVerifier::setVerifier(std::shared_ptr<auth::Verifier> verifier) {
+ verifier_ = verifier;
+}
+
+void RTCVerifier::setMaxUnverifiedDelay(uint32_t max_unverified_delay) {
+ max_unverified_delay_ = max_unverified_delay;
+}
+
+auth::VerificationPolicy RTCVerifier::verify(
+ core::ContentObject &content_object, bool is_fec) {
+ uint32_t suffix = content_object.getName().getSuffix();
+ core::PayloadType payload_type = content_object.getPayloadType();
+
+ bool is_probe = ProbeHandler::getProbeType(suffix) != ProbeType::NOT_PROBE;
+ bool is_nack = !is_probe && content_object.payloadSize() == NACK_HEADER_SIZE;
+ bool is_manifest = !is_probe && !is_nack && !is_fec &&
+ payload_type == core::PayloadType::MANIFEST;
+ bool is_data = !is_probe && !is_nack && !is_fec &&
+ payload_type == core::PayloadType::DATA;
+
+ if (is_probe) return verifyProbe(content_object);
+ if (is_nack) return verifyNack(content_object);
+ if (is_fec) return verifyFec(content_object);
+ if (is_data) return verifyData(content_object);
+ if (is_manifest) return verifyManifest(content_object);
+
+ auth::VerificationPolicy policy = auth::VerificationPolicy::ABORT;
+ verifier_->callVerificationFailedCallback(suffix, policy);
+ return policy;
+}
+
+auth::VerificationPolicy RTCVerifier::verifyProbe(
+ core::ContentObject &content_object) {
+ switch (ProbeHandler::getProbeType(content_object.getName().getSuffix())) {
+ case ProbeType::INIT: {
+ auth::VerificationPolicy policy = verifyManifest(content_object);
+ if (policy != auth::VerificationPolicy::ACCEPT) {
+ return policy;
+ }
+ return processManifest(content_object);
+ }
+ case ProbeType::RTT:
+ return verifyNack(content_object);
+ default:
+ auth::VerificationPolicy policy = auth::VerificationPolicy::ABORT;
+ verifier_->callVerificationFailedCallback(
+ content_object.getName().getSuffix(), policy);
+ return policy;
+ }
+}
+
+auth::VerificationPolicy RTCVerifier::verifyNack(
+ core::ContentObject &content_object) {
+ return verifier_->verifyPackets(&content_object);
+}
+
+auth::VerificationPolicy RTCVerifier::verifyFec(
+ core::ContentObject &content_object) {
+ return verifier_->verifyPackets(&content_object);
+}
+
+auth::VerificationPolicy RTCVerifier::verifyData(
+ core::ContentObject &content_object) {
+ uint32_t suffix = content_object.getName().getSuffix();
+
+ if (_is_ah(content_object.getFormat())) {
+ return verifier_->verifyPackets(&content_object);
+ }
+
+ unverified_bytes_[suffix] =
+ content_object.headerSize() + content_object.payloadSize();
+ unverified_packets_[suffix] =
+ content_object.computeDigest(manifest_hash_algo_);
+
+ // An alert is raised when too much packets remain unverified
+ if (getTotalUnverified() > max_unverified_bytes_) {
+ unverified_bytes_.clear();
+ unverified_packets_.clear();
+
+ auth::VerificationPolicy policy = auth::VerificationPolicy::ABORT;
+ verifier_->callVerificationFailedCallback(suffix, policy);
+ return policy;
+ }
+
+ return auth::VerificationPolicy::ACCEPT;
+}
+
+auth::VerificationPolicy RTCVerifier::verifyManifest(
+ core::ContentObject &content_object) {
+ return verifier_->verifyPackets(&content_object);
+}
+
+auth::VerificationPolicy RTCVerifier::processManifest(
+ core::ContentObject &content_object) {
+ uint32_t suffix = content_object.getName().getSuffix();
+
+ core::ContentObjectManifest manifest(content_object);
+ manifest.decode();
+
+ // Update last manifest
+ if (suffix > last_manifest_) {
+ last_manifest_ = suffix;
+ }
+
+ // Extract parameters
+ manifest_hash_algo_ = manifest.getHashAlgorithm();
+ core::ParamsRTC params = manifest.getParamsRTC();
+
+ if (params.prod_rate > 0) {
+ max_unverified_bytes_ = static_cast<uint64_t>(
+ (max_unverified_delay_ / 1000.0) * params.prod_rate);
+ }
+
+ if (max_unverified_bytes_ == 0 || !rtc_state_) {
+ auth::VerificationPolicy policy = auth::VerificationPolicy::ABORT;
+ verifier_->callVerificationFailedCallback(suffix, policy);
+ return policy;
+ }
+
+ // Extract hashes
+ auth::Verifier::SuffixMap suffix_map =
+ core::ContentObjectManifest::getSuffixMap(&manifest);
+
+ // Return early if the manifest is empty
+ if (suffix_map.empty()) {
+ return auth::VerificationPolicy::ACCEPT;
+ }
+
+ // Remove lost packets from digest map
+ manifest_digests_.insert(suffix_map.begin(), suffix_map.end());
+ for (auto it = manifest_digests_.begin(); it != manifest_digests_.end();) {
+ if (rtc_state_->getPacketState(it->first) == PacketState::DEFINITELY_LOST) {
+ unverified_packets_.erase(it->first);
+ unverified_bytes_.erase(it->first);
+ it = manifest_digests_.erase(it);
+ } else {
+ ++it;
+ }
+ }
+
+ // Verify packets
+ auth::Verifier::PolicyMap policies =
+ verifier_->verifyHashes(unverified_packets_, manifest_digests_);
+
+ for (const auto &policy : policies) {
+ switch (policy.second) {
+ case auth::VerificationPolicy::ACCEPT: {
+ manifest_digests_.erase(policy.first);
+ unverified_packets_.erase(policy.first);
+ unverified_bytes_.erase(policy.first);
+ break;
+ }
+ case auth::VerificationPolicy::UNKNOWN:
+ break;
+ case auth::VerificationPolicy::DROP:
+ case auth::VerificationPolicy::ABORT:
+ auth::VerificationPolicy p = policy.second;
+ verifier_->callVerificationFailedCallback(policy.first, p);
+ return p;
+ }
+ }
+
+ return auth::VerificationPolicy::ACCEPT;
+}
+
+void RTCVerifier::onDataRecoveredFec(uint32_t suffix) {
+ manifest_digests_.erase(suffix);
+}
+
+void RTCVerifier::onJumpForward(uint32_t next_suffix) {
+ if (next_suffix <= last_manifest_ + 1) {
+ return;
+ }
+
+ // When we jump forward in the suffix sequence, we remove packets that
+ // probably won't be verified. Those packets have a suffix in the range
+ // [last_manifest_ + 1, next_suffix[.
+ for (auto it = unverified_packets_.begin();
+ it != unverified_packets_.end();) {
+ if (it->first > last_manifest_) {
+ unverified_bytes_.erase(it->first);
+ it = unverified_packets_.erase(it);
+ } else {
+ ++it;
+ }
+ }
+}
+
+uint32_t RTCVerifier::getTotalUnverified() const {
+ uint32_t total = 0;
+
+ for (auto bytes : unverified_bytes_) {
+ if (bytes.second > UINT32_MAX - total) {
+ total = UINT32_MAX;
+ break;
+ }
+ total += bytes.second;
+ }
+
+ return total;
+}
+
+uint32_t RTCVerifier::getMaxUnverified() const { return max_unverified_bytes_; }
+
+} // end namespace rtc
+} // end namespace protocol
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_verifier.h b/libtransport/src/protocols/rtc/rtc_verifier.h
new file mode 100644
index 000000000..596bd8536
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_verifier.h
@@ -0,0 +1,79 @@
+/*
+ * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <core/facade.h>
+#include <hicn/transport/auth/verifier.h>
+#include <hicn/transport/core/content_object.h>
+#include <protocols/rtc/rtc_state.h>
+
+namespace transport {
+namespace protocol {
+namespace rtc {
+
+class RTCVerifier {
+ public:
+ RTCVerifier(std::shared_ptr<auth::Verifier> verifier,
+ uint32_t max_unverified_delay);
+
+ virtual ~RTCVerifier() = default;
+
+ void setState(std::shared_ptr<RTCState> rtc_state);
+
+ void setVerifier(std::shared_ptr<auth::Verifier> verifier);
+
+ void setMaxUnverifiedDelay(uint32_t max_unverified_delay);
+
+ void onDataRecoveredFec(uint32_t suffix);
+ void onJumpForward(uint32_t next_suffix);
+
+ uint32_t getTotalUnverified() const;
+ uint32_t getMaxUnverified() const;
+
+ auth::VerificationPolicy verify(core::ContentObject &content_object,
+ bool is_fec = false);
+ auth::VerificationPolicy verifyProbe(core::ContentObject &content_object);
+ auth::VerificationPolicy verifyNack(core::ContentObject &content_object);
+ auth::VerificationPolicy verifyFec(core::ContentObject &content_object);
+ auth::VerificationPolicy verifyData(core::ContentObject &content_object);
+ auth::VerificationPolicy verifyManifest(core::ContentObject &content_object);
+
+ auth::VerificationPolicy processManifest(core::ContentObject &content_object);
+
+ protected:
+ // The RTC state.
+ std::shared_ptr<RTCState> rtc_state_;
+ // The verifier instance.
+ std::shared_ptr<auth::Verifier> verifier_;
+ // Hash algorithm used by manifests.
+ auth::CryptoHashType manifest_hash_algo_;
+ // The last manifest processed.
+ auth::Suffix last_manifest_;
+ // Hold digests extracted from all manifests received.
+ auth::Verifier::SuffixMap manifest_digests_;
+ // Hold hashes of all content objects received before they are verified.
+ auth::Verifier::SuffixMap unverified_packets_;
+ // Hold number of unverified bytes.
+ std::unordered_map<auth::Suffix, uint32_t> unverified_bytes_;
+ // Maximum delay (in ms) for an unverified byte to become verifed.
+ uint32_t max_unverified_delay_;
+ // Maximum number of unverified bytes before aborting the connection.
+ uint64_t max_unverified_bytes_;
+};
+
+} // end namespace rtc
+} // namespace protocol
+} // namespace transport