aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src
diff options
context:
space:
mode:
authorLuca Muscariello <muscariello@ieee.org>2022-04-22 17:55:01 +0200
committerMauro Sardara <msardara@cisco.com>2022-04-26 15:30:21 +0200
commita1ac96f497719b897793ac14b287cb8d840651c1 (patch)
tree12c608fe352c21d944b0340ce8d3f0be0fb23b11 /libtransport/src
parent1ac07d842a3a6ce0fb7fa4039241c8ec1a71419b (diff)
HICN-722: Updates on transport, RTC, manifest usage for RTC, infra.
Co-authored-by: Mauro Sardara <msardara@cisco.com> Co-authored-by: Jordan Augé <jordan.auge+fdio@cisco.com> Co-authored-by: Michele Papalini <micpapal@cisco.com> Co-authored-by: Angelo Mantellini <manangel@cisco.com> Co-authored-by: Jacques Samain <jsamain@cisco.com> Co-authored-by: Olivier Roques <oroques+fdio@cisco.com> Co-authored-by: Enrico Loparco <eloparco@cisco.com> Co-authored-by: Giulio Grassi <gigrassi@cisco.com> manifest: optimize manifest processing manifest: add FEC parameters to manifests manifest: refactor verification process manifest: report auth alerts in hiperf instead of aborting manifest: remove FEC buffer callback in consumer manifest: refactor and enable manifests by default manifest: update manifest header with transport parameters manifest: batch interests for first manifest from RTC producer manifest: refactor processing of RTC manifests manifest: update manifest-related socket options of consumers manifest: update unit tests for manifests manifest: pack manifest headers manifest: verify FEC packets auth: add consumer socket option to set max unverified delay manifest: process manifests after full FEC decoding manifest: manage forward jumps in RTC verifier fec: remove useless fec codes rs: add new code rate rs: add new code rate rs: add new code rate rs: add new code rate libtransport: increase internal packet cache size remove internal cisco info in cmake manifest: add option to set manifest capacity data_input_node.c: add information about adj_index[VLIB_RX] on received data packetsi sysrepo plugin: update build Change-Id: I0cf64d91bd0a1b7cad4eeaa9871f58f5f10434af Signed-off-by: Mauro Sardara <msardara@cisco.com> Signed-off-by: Luca Muscariello <muscariello@ieee.org>
Diffstat (limited to 'libtransport/src')
-rw-r--r--libtransport/src/auth/crypto_hash.cc40
-rw-r--r--libtransport/src/auth/signer.cc53
-rw-r--r--libtransport/src/auth/verifier.cc49
-rw-r--r--libtransport/src/core/manifest_format.h5
-rw-r--r--libtransport/src/core/manifest_format_fixed.cc8
-rw-r--r--libtransport/src/core/manifest_format_fixed.h6
-rw-r--r--libtransport/src/core/packet.cc13
-rw-r--r--libtransport/src/implementation/socket_consumer.h35
-rw-r--r--libtransport/src/protocols/fec/fec.cc2
-rw-r--r--libtransport/src/protocols/prod_protocol_rtc.cc96
-rw-r--r--libtransport/src/protocols/prod_protocol_rtc.h7
-rw-r--r--libtransport/src/protocols/rtc/probe_handler.cc37
-rw-r--r--libtransport/src/protocols/rtc/probe_handler.h8
-rw-r--r--libtransport/src/protocols/rtc/rtc.cc159
-rw-r--r--libtransport/src/protocols/rtc/rtc.h3
-rw-r--r--libtransport/src/protocols/rtc/rtc_consts.h7
-rw-r--r--libtransport/src/protocols/rtc/rtc_forwarding_strategy.cc42
-rw-r--r--libtransport/src/protocols/rtc/rtc_forwarding_strategy.h4
-rw-r--r--libtransport/src/protocols/rtc/rtc_ldr.cc71
-rw-r--r--libtransport/src/protocols/rtc/rtc_ldr.h19
-rw-r--r--libtransport/src/protocols/rtc/rtc_recovery_strategy.cc81
-rw-r--r--libtransport/src/protocols/rtc/rtc_recovery_strategy.h31
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_delay.cc36
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_delay.h3
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_fec_only.cc44
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_fec_only.h3
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_low_rate.cc11
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_low_rate.h3
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_recovery_off.cc8
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_recovery_off.h3
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_rtx_only.cc9
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_rtx_only.h3
-rw-r--r--libtransport/src/protocols/rtc/rtc_state.cc138
-rw-r--r--libtransport/src/protocols/rtc/rtc_state.h37
-rw-r--r--libtransport/src/protocols/rtc/rtc_verifier.cc249
-rw-r--r--libtransport/src/protocols/rtc/rtc_verifier.h96
-rw-r--r--libtransport/src/protocols/transport_protocol.cc6
-rw-r--r--libtransport/src/test/test_auth.cc24
-rw-r--r--libtransport/src/test/test_core_manifest.cc2
-rw-r--r--libtransport/src/utils/max_filter.h6
-rw-r--r--libtransport/src/utils/min_filter.h6
41 files changed, 898 insertions, 565 deletions
diff --git a/libtransport/src/auth/crypto_hash.cc b/libtransport/src/auth/crypto_hash.cc
index f60f46051..08be47aff 100644
--- a/libtransport/src/auth/crypto_hash.cc
+++ b/libtransport/src/auth/crypto_hash.cc
@@ -14,6 +14,9 @@
*/
#include <hicn/transport/auth/crypto_hash.h>
+#include <hicn/transport/core/global_object_pool.h>
+
+#include "glog/logging.h"
namespace transport {
namespace auth {
@@ -27,18 +30,20 @@ CryptoHash::CryptoHash(const CryptoHash &other)
CryptoHash::CryptoHash(CryptoHash &&other)
: digest_type_(std::move(other.digest_type_)),
- digest_(other.digest_),
- digest_size_(other.digest_size_) {
- other.reset();
-}
+ digest_(std::move(other.digest_)),
+ digest_size_(other.digest_size_) {}
-CryptoHash::CryptoHash(CryptoHashType hash_type) { setType(hash_type); }
+CryptoHash::CryptoHash(CryptoHashType hash_type)
+ : digest_(core::PacketManager<>::getInstance().getMemBuf()) {
+ setType(hash_type);
+}
CryptoHash::CryptoHash(const uint8_t *hash, size_t size,
CryptoHashType hash_type)
: digest_type_(hash_type), digest_size_(size) {
- digest_.resize(size);
- memcpy(digest_.data(), hash, size);
+ digest_ = core::PacketManager<>::getInstance().getMemBuf();
+ digest_->append(size);
+ memcpy(digest_->writableData(), hash, size);
}
CryptoHash::CryptoHash(const std::vector<uint8_t> &hash,
@@ -55,7 +60,7 @@ CryptoHash &CryptoHash::operator=(const CryptoHash &other) {
}
bool CryptoHash::operator==(const CryptoHash &other) const {
- return (digest_type_ == other.digest_type_ && digest_ == other.digest_);
+ return (digest_type_ == other.digest_type_ && *digest_ == *other.digest_);
}
void CryptoHash::computeDigest(const uint8_t *buffer, size_t len) {
@@ -65,8 +70,8 @@ void CryptoHash::computeDigest(const uint8_t *buffer, size_t len) {
throw errors::RuntimeException("Unknown hash type");
}
- EVP_Digest(buffer, len, digest_.data(), (unsigned int *)&digest_size_,
- (*hash_evp)(), nullptr);
+ EVP_Digest(buffer, len, digest_->writableData(),
+ (unsigned int *)&digest_size_, (*hash_evp)(), nullptr);
}
void CryptoHash::computeDigest(const std::vector<uint8_t> &buffer) {
@@ -95,7 +100,7 @@ void CryptoHash::computeDigest(const utils::MemBuf *buffer) {
p = p->next();
} while (p != buffer);
- if (EVP_DigestFinal_ex(mcdtx, digest_.data(),
+ if (EVP_DigestFinal_ex(mcdtx, digest_->writableData(),
(unsigned int *)&digest_size_) != 1) {
throw errors::RuntimeException("Digest computation failed");
}
@@ -103,15 +108,16 @@ void CryptoHash::computeDigest(const utils::MemBuf *buffer) {
EVP_MD_CTX_free(mcdtx);
}
-std::vector<uint8_t> CryptoHash::getDigest() const { return digest_; }
+const utils::MemBuf::Ptr &CryptoHash::getDigest() const { return digest_; }
std::string CryptoHash::getStringDigest() const {
std::stringstream string_digest;
string_digest << std::hex << std::setfill('0');
- for (auto byte : digest_) {
- string_digest << std::hex << std::setw(2) << static_cast<int>(byte);
+ for (size_t i = 0; i < digest_size_; ++i) {
+ string_digest << std::hex << std::setw(2)
+ << static_cast<int>(digest_->data()[i]);
}
return string_digest.str();
@@ -122,10 +128,10 @@ CryptoHashType CryptoHash::getType() const { return digest_type_; }
size_t CryptoHash::getSize() const { return digest_size_; }
void CryptoHash::setType(CryptoHashType hash_type) {
- reset();
digest_type_ = hash_type;
digest_size_ = CryptoHash::getSize(hash_type);
- digest_.resize(digest_size_);
+ DCHECK(digest_size_ <= digest_->tailroom());
+ digest_->setLength(digest_size_);
}
void CryptoHash::display() {
@@ -152,8 +158,8 @@ void CryptoHash::display() {
void CryptoHash::reset() {
digest_type_ = CryptoHashType::UNKNOWN;
- digest_.clear();
digest_size_ = 0;
+ digest_->setLength(0);
}
CryptoHashEVP CryptoHash::getEVP(CryptoHashType hash_type) {
diff --git a/libtransport/src/auth/signer.cc b/libtransport/src/auth/signer.cc
index e74e2f1b8..918e271f5 100644
--- a/libtransport/src/auth/signer.cc
+++ b/libtransport/src/auth/signer.cc
@@ -17,6 +17,8 @@
#include <hicn/transport/auth/signer.h>
#include <hicn/transport/utils/chrono_typedefs.h>
+#include "hicn/transport/core/global_object_pool.h"
+
namespace transport {
namespace auth {
@@ -24,7 +26,10 @@ namespace auth {
// Base Signer
// ---------------------------------------------------------
Signer::Signer()
- : suite_(CryptoSuite::UNKNOWN), signature_len_(0), key_(nullptr) {}
+ : suite_(CryptoSuite::UNKNOWN),
+ signature_(core::PacketManager<>::getInstance().getMemBuf()),
+ signature_len_(0),
+ key_(nullptr) {}
Signer::~Signer() {}
@@ -51,8 +56,8 @@ void Signer::signPacket(PacketPtr packet) {
packet->setValidationAlgorithm(suite_);
// Set key ID
- std::vector<uint8_t> key_id = key_id_.getDigest();
- packet->setKeyId({key_id.data(), key_id.size()});
+ const utils::MemBuf::Ptr &key_id = key_id_.getDigest();
+ packet->setKeyId({key_id->writableData(), key_id->length()});
// Reset fields to compute the packet hash
packet->resetForHash();
@@ -93,14 +98,16 @@ void Signer::signBuffer(const std::vector<uint8_t> &buffer) {
throw errors::RuntimeException("Digest computation failed");
}
- signature_.resize(signature_len_);
+ DCHECK(signature_len_ <= signature_->tailroom());
+ signature_->setLength(signature_len_);
- if (EVP_DigestSignFinal(mdctx.get(), signature_.data(), &signature_len_) !=
- 1) {
+ if (EVP_DigestSignFinal(mdctx.get(), signature_->writableData(),
+ &signature_len_) != 1) {
throw errors::RuntimeException("Digest computation failed");
}
- signature_.resize(signature_len_);
+ DCHECK(signature_len_ <= signature_->tailroom());
+ signature_->setLength(signature_len_);
}
void Signer::signBuffer(const utils::MemBuf *buffer) {
@@ -135,24 +142,27 @@ void Signer::signBuffer(const utils::MemBuf *buffer) {
throw errors::RuntimeException("Digest computation failed");
}
- signature_.resize(signature_len_);
+ DCHECK(signature_len_ <= signature_->tailroom());
+ signature_->setLength(signature_len_);
- if (EVP_DigestSignFinal(mdctx.get(), signature_.data(), &signature_len_) !=
- 1) {
+ if (EVP_DigestSignFinal(mdctx.get(), signature_->writableData(),
+ &signature_len_) != 1) {
throw errors::RuntimeException("Digest computation failed");
}
- signature_.resize(signature_len_);
+ DCHECK(signature_len_ <= signature_->tailroom());
+ signature_->setLength(signature_len_);
}
-std::vector<uint8_t> Signer::getSignature() const { return signature_; }
+const utils::MemBuf::Ptr &Signer::getSignature() const { return signature_; }
std::string Signer::getStringSignature() const {
std::stringstream string_sig;
string_sig << std::hex << std::setfill('0');
- for (auto byte : signature_) {
- string_sig << std::hex << std::setw(2) << static_cast<int>(byte);
+ for (size_t i = 0; i < signature_len_; ++i) {
+ string_sig << std::hex << std::setw(2)
+ << static_cast<int>(signature_->data()[i]);
}
return string_sig.str();
@@ -193,12 +203,14 @@ void VoidSigner::signBuffer(const utils::MemBuf *buffer) {}
// ---------------------------------------------------------
AsymmetricSigner::AsymmetricSigner(CryptoSuite suite,
std::shared_ptr<EVP_PKEY> key,
- std::shared_ptr<EVP_PKEY> pub_key) {
+ std::shared_ptr<EVP_PKEY> pub_key)
+ : Signer() {
setKey(suite, key, pub_key);
}
AsymmetricSigner::AsymmetricSigner(std::string keystore_path,
- std::string password) {
+ std::string password)
+ : Signer() {
FILE *p12file = fopen(keystore_path.c_str(), "r");
if (p12file == nullptr) {
@@ -230,7 +242,8 @@ void AsymmetricSigner::setKey(CryptoSuite suite, std::shared_ptr<EVP_PKEY> key,
suite_ = suite;
key_ = key;
signature_len_ = EVP_PKEY_size(key.get());
- signature_.resize(signature_len_);
+ DCHECK(signature_len_ <= signature_->tailroom());
+ signature_->setLength(signature_len_);
std::vector<uint8_t> pbk(i2d_PublicKey(pub_key.get(), nullptr));
uint8_t *pbk_ptr = pbk.data();
@@ -254,7 +267,8 @@ size_t AsymmetricSigner::getSignatureFieldSize() const {
// Symmetric Signer
// ---------------------------------------------------------
SymmetricSigner::SymmetricSigner(CryptoSuite suite,
- const std::string &passphrase) {
+ const std::string &passphrase)
+ : Signer() {
suite_ = suite;
key_ = std::shared_ptr<EVP_PKEY>(
EVP_PKEY_new_raw_private_key(EVP_PKEY_HMAC, nullptr,
@@ -270,7 +284,8 @@ SymmetricSigner::SymmetricSigner(CryptoSuite suite,
}
signature_len_ = EVP_MD_size((*hash_evp)());
- signature_.resize(signature_len_);
+ DCHECK(signature_len_ <= signature_->tailroom());
+ signature_->setLength(signature_len_);
key_id_.computeDigest((uint8_t *)passphrase.c_str(), passphrase.size());
}
diff --git a/libtransport/src/auth/verifier.cc b/libtransport/src/auth/verifier.cc
index 0c35437f3..5d5f01711 100644
--- a/libtransport/src/auth/verifier.cc
+++ b/libtransport/src/auth/verifier.cc
@@ -14,8 +14,11 @@
*/
#include <hicn/transport/auth/verifier.h>
+#include <hicn/transport/core/global_object_pool.h>
#include <protocols/errors.h>
+#include "glog/logging.h"
+
namespace transport {
namespace auth {
@@ -49,8 +52,10 @@ bool Verifier::verifyPacket(PacketPtr packet) {
hicn_packet_copy_header(format, packet->packet_start_, &header_copy, false);
// Retrieve packet signature
- std::vector<uint8_t> signature_raw = packet->getSignature();
- signature_raw.resize(packet->getSignatureSize());
+ utils::MemBuf::Ptr signature_raw = packet->getSignature();
+ std::size_t signature_len = packet->getSignatureSize();
+ DCHECK(signature_len <= signature_raw->tailroom());
+ signature_raw->setLength(signature_len);
// Reset fields that are not used to compute signature
packet->resetForHash();
@@ -62,7 +67,7 @@ bool Verifier::verifyPacket(PacketPtr packet) {
// Restore header
hicn_packet_copy_header(format, &header_copy, packet->packet_start_, false);
packet->setSignature(signature_raw);
- packet->setSignatureSize(signature_raw.size());
+ packet->setSignatureSize(signature_raw->length());
return valid_packet;
}
@@ -165,13 +170,13 @@ void Verifier::callVerificationFailedCallback(Suffix suffix,
bool VoidVerifier::verifyPacket(PacketPtr packet) { return true; }
bool VoidVerifier::verifyBuffer(const std::vector<uint8_t> &buffer,
- const std::vector<uint8_t> &signature,
+ const utils::MemBuf::Ptr &signature,
CryptoHashType hash_type) {
return true;
}
bool VoidVerifier::verifyBuffer(const utils::MemBuf *buffer,
- const std::vector<uint8_t> &signature,
+ const utils::MemBuf::Ptr &signature,
CryptoHashType hash_type) {
return true;
}
@@ -232,7 +237,7 @@ void AsymmetricVerifier::useCertificate(std::shared_ptr<X509> cert) {
}
bool AsymmetricVerifier::verifyBuffer(const std::vector<uint8_t> &buffer,
- const std::vector<uint8_t> &signature,
+ const utils::MemBuf::Ptr &signature,
CryptoHashType hash_type) {
CryptoHashEVP hash_evp = CryptoHash::getEVP(hash_type);
@@ -255,12 +260,12 @@ bool AsymmetricVerifier::verifyBuffer(const std::vector<uint8_t> &buffer,
throw errors::RuntimeException("Digest update failed");
}
- return EVP_DigestVerifyFinal(mdctx.get(), signature.data(),
- signature.size()) == 1;
+ return EVP_DigestVerifyFinal(mdctx.get(), signature->data(),
+ signature->length()) == 1;
}
bool AsymmetricVerifier::verifyBuffer(const utils::MemBuf *buffer,
- const std::vector<uint8_t> &signature,
+ const utils::MemBuf::Ptr &signature,
CryptoHashType hash_type) {
CryptoHashEVP hash_evp = CryptoHash::getEVP(hash_type);
@@ -288,8 +293,8 @@ bool AsymmetricVerifier::verifyBuffer(const utils::MemBuf *buffer,
p = p->next();
} while (p != buffer);
- return EVP_DigestVerifyFinal(mdctx.get(), signature.data(),
- signature.size()) == 1;
+ return EVP_DigestVerifyFinal(mdctx.get(), signature->data(),
+ signature->length()) == 1;
}
// ---------------------------------------------------------
@@ -309,7 +314,7 @@ void SymmetricVerifier::setPassphrase(const std::string &passphrase) {
}
bool SymmetricVerifier::verifyBuffer(const std::vector<uint8_t> &buffer,
- const std::vector<uint8_t> &signature,
+ const utils::MemBuf::Ptr &signature,
CryptoHashType hash_type) {
CryptoHashEVP hash_evp = CryptoHash::getEVP(hash_type);
@@ -317,7 +322,9 @@ bool SymmetricVerifier::verifyBuffer(const std::vector<uint8_t> &buffer,
throw errors::RuntimeException("Unknown hash type");
}
- std::vector<uint8_t> signature_bis(signature.size());
+ const utils::MemBuf::Ptr &signature_bis =
+ core::PacketManager<>::getInstance().getMemBuf();
+ signature_bis->append(signature->length());
size_t signature_bis_len;
std::shared_ptr<EVP_MD_CTX> mdctx(EVP_MD_CTX_create(), EVP_MD_CTX_free);
@@ -334,16 +341,17 @@ bool SymmetricVerifier::verifyBuffer(const std::vector<uint8_t> &buffer,
throw errors::RuntimeException("Digest update failed");
}
- if (EVP_DigestSignFinal(mdctx.get(), signature_bis.data(),
+ if (EVP_DigestSignFinal(mdctx.get(), signature_bis->writableData(),
&signature_bis_len) != 1) {
throw errors::RuntimeException("Digest computation failed");
}
- return signature == signature_bis && signature.size() == signature_bis_len;
+ return signature->length() == signature_bis_len &&
+ *signature == *signature_bis;
}
bool SymmetricVerifier::verifyBuffer(const utils::MemBuf *buffer,
- const std::vector<uint8_t> &signature,
+ const utils::MemBuf::Ptr &signature,
CryptoHashType hash_type) {
CryptoHashEVP hash_evp = CryptoHash::getEVP(hash_type);
@@ -352,7 +360,9 @@ bool SymmetricVerifier::verifyBuffer(const utils::MemBuf *buffer,
}
const utils::MemBuf *p = buffer;
- std::vector<uint8_t> signature_bis(signature.size());
+ const utils::MemBuf::Ptr &signature_bis =
+ core::PacketManager<>::getInstance().getMemBuf();
+ signature_bis->append(signature->length());
size_t signature_bis_len;
std::shared_ptr<EVP_MD_CTX> mdctx(EVP_MD_CTX_create(), EVP_MD_CTX_free);
@@ -373,12 +383,13 @@ bool SymmetricVerifier::verifyBuffer(const utils::MemBuf *buffer,
p = p->next();
} while (p != buffer);
- if (EVP_DigestSignFinal(mdctx.get(), signature_bis.data(),
+ if (EVP_DigestSignFinal(mdctx.get(), signature_bis->writableData(),
&signature_bis_len) != 1) {
throw errors::RuntimeException("Digest computation failed");
}
- return signature == signature_bis && signature.size() == signature_bis_len;
+ return signature->length() == signature_bis_len &&
+ *signature == *signature_bis;
}
} // namespace auth
diff --git a/libtransport/src/core/manifest_format.h b/libtransport/src/core/manifest_format.h
index 38f26067e..caee210cd 100644
--- a/libtransport/src/core/manifest_format.h
+++ b/libtransport/src/core/manifest_format.h
@@ -18,6 +18,7 @@
#include <hicn/transport/auth/crypto_hash.h>
#include <hicn/transport/core/name.h>
#include <hicn/transport/interfaces/socket_options_keys.h>
+#include <protocols/fec_utils.h>
#include <cinttypes>
#include <type_traits>
@@ -41,11 +42,11 @@ struct ParamsRTC {
std::uint64_t timestamp;
std::uint32_t prod_rate;
std::uint32_t prod_seg;
- std::uint32_t support_fec;
+ protocol::fec::FECType fec_type;
bool operator==(const ParamsRTC &other) const {
return (timestamp == other.timestamp && prod_rate == other.prod_rate &&
- prod_seg == other.prod_seg && support_fec == other.support_fec);
+ prod_seg == other.prod_seg && fec_type == other.fec_type);
}
};
diff --git a/libtransport/src/core/manifest_format_fixed.cc b/libtransport/src/core/manifest_format_fixed.cc
index 4c8a5e031..428d6ad12 100644
--- a/libtransport/src/core/manifest_format_fixed.cc
+++ b/libtransport/src/core/manifest_format_fixed.cc
@@ -154,22 +154,20 @@ FixedManifestEncoder &FixedManifestEncoder::setParamsRTCImpl(
.timestamp = params.timestamp,
.prod_rate = params.prod_rate,
.prod_seg = params.prod_seg,
- .support_fec = params.support_fec,
+ .fec_type = static_cast<uint32_t>(params.fec_type),
};
return *this;
}
FixedManifestEncoder &FixedManifestEncoder::addSuffixAndHashImpl(
uint32_t suffix, const auth::CryptoHash &hash) {
- std::vector<uint8_t> _hash = hash.getDigest();
-
manifest_entries_.push_back(ManifestEntry{
.suffix = htonl(suffix),
.hash = {0},
});
std::memcpy(reinterpret_cast<uint8_t *>(manifest_entries_.back().hash),
- _hash.data(), _hash.size());
+ hash.getDigest()->data(), hash.getSize());
if (TRANSPORT_EXPECT_FALSE(estimateSerializedLengthImpl() > max_size_)) {
throw errors::RuntimeException("Manifest size exceeded the packet MTU!");
@@ -301,7 +299,7 @@ ParamsRTC FixedManifestDecoder::getParamsRTCImpl() const {
.timestamp = params_rtc_->timestamp,
.prod_rate = params_rtc_->prod_rate,
.prod_seg = params_rtc_->prod_seg,
- .support_fec = params_rtc_->support_fec,
+ .fec_type = static_cast<protocol::fec::FECType>(params_rtc_->fec_type),
};
}
diff --git a/libtransport/src/core/manifest_format_fixed.h b/libtransport/src/core/manifest_format_fixed.h
index ade4bf02c..5fd2a673d 100644
--- a/libtransport/src/core/manifest_format_fixed.h
+++ b/libtransport/src/core/manifest_format_fixed.h
@@ -65,9 +65,7 @@ namespace core {
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | Current Segment |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-// |F| |
-// + Reserved for future parameters +
-// | |
+// | FEC Type |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// Manifest Entry:
@@ -137,7 +135,7 @@ struct __attribute__((__packed__)) TransportParamsRTC {
std::uint64_t timestamp;
std::uint32_t prod_rate;
std::uint32_t prod_seg;
- std::uint32_t support_fec;
+ std::uint32_t fec_type;
};
static_assert(sizeof(TransportParamsRTC) == MANIFEST_PARAMS_RTC_SIZE);
diff --git a/libtransport/src/core/packet.cc b/libtransport/src/core/packet.cc
index df27444af..0c08246af 100644
--- a/libtransport/src/core/packet.cc
+++ b/libtransport/src/core/packet.cc
@@ -15,6 +15,7 @@
#include <glog/logging.h>
#include <hicn/transport/auth/crypto_hash.h>
+#include <hicn/transport/core/global_object_pool.h>
#include <hicn/transport/core/packet.h>
#include <hicn/transport/errors/malformed_packet_exception.h>
#include <hicn/transport/utils/hash.h>
@@ -481,7 +482,7 @@ uint8_t Packet::getTTL() const {
bool Packet::hasAH() const { return _is_ah(format_); }
-std::vector<uint8_t> Packet::getSignature() const {
+utils::MemBuf::Ptr Packet::getSignature() const {
if (!hasAH()) {
throw errors::RuntimeException("Packet without Authentication Header.");
}
@@ -493,7 +494,11 @@ std::vector<uint8_t> Packet::getSignature() const {
throw errors::RuntimeException("Error getting signature.");
}
- return std::vector<uint8_t>(signature, signature + getSignatureFieldSize());
+ utils::MemBuf::Ptr membuf = PacketManager<>::getInstance().getMemBuf();
+ membuf->append(getSignatureFieldSize());
+ memcpy(membuf->writableData(), signature, getSignatureFieldSize());
+
+ return membuf;
}
std::size_t Packet::getSignatureFieldSize() const {
@@ -570,7 +575,7 @@ auth::CryptoSuite Packet::getValidationAlgorithm() const {
return auth::CryptoSuite(return_value);
}
-void Packet::setSignature(const std::vector<uint8_t> &signature) {
+void Packet::setSignature(const utils::MemBuf::Ptr &signature) {
if (!hasAH()) {
throw errors::RuntimeException("Packet without Authentication Header.");
}
@@ -580,7 +585,7 @@ void Packet::setSignature(const std::vector<uint8_t> &signature) {
if (ret < 0) {
throw errors::RuntimeException("Error getting signature.");
}
- memcpy(signature_field, signature.data(), signature.size());
+ memcpy(signature_field, signature->data(), signature->length());
}
void Packet::setSignatureFieldSize(std::size_t size) {
diff --git a/libtransport/src/implementation/socket_consumer.h b/libtransport/src/implementation/socket_consumer.h
index ebdac7f93..33e70888f 100644
--- a/libtransport/src/implementation/socket_consumer.h
+++ b/libtransport/src/implementation/socket_consumer.h
@@ -56,7 +56,8 @@ class ConsumerSocket : public Socket {
rate_estimation_observer_(nullptr),
rate_estimation_batching_parameter_(default_values::batch),
rate_estimation_choice_(0),
- max_unverified_delay_(default_values::max_unverified_delay),
+ unverified_interval_(default_values::unverified_interval),
+ unverified_ratio_(default_values::unverified_ratio),
verifier_(std::make_shared<auth::VoidVerifier>()),
verify_signature_(false),
reset_window_(false),
@@ -71,7 +72,6 @@ class ConsumerSocket : public Socket {
timer_interval_milliseconds_(0),
recovery_strategy_(RtcTransportRecoveryStrategies::RTX_ONLY),
aggregated_data_(false),
- fec_setting_(""),
guard_raaqm_params_() {
switch (protocol) {
case TransportProtocolAlgorithms::CBR:
@@ -197,6 +197,10 @@ class ConsumerSocket : public Socket {
current_window_size_ = socket_option_value;
break;
+ case UNVERIFIED_RATIO:
+ unverified_ratio_ = socket_option_value;
+ break;
+
case GAMMA_VALUE:
gamma_ = socket_option_value;
break;
@@ -238,8 +242,8 @@ class ConsumerSocket : public Socket {
interest_lifetime_ = socket_option_value;
break;
- case GeneralTransportOptions::MAX_UNVERIFIED_TIME:
- max_unverified_delay_ = socket_option_value;
+ case GeneralTransportOptions::UNVERIFIED_INTERVAL:
+ unverified_interval_ = socket_option_value;
break;
case RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER:
@@ -437,11 +441,6 @@ class ConsumerSocket : public Socket {
result = SOCKET_OPTION_SET;
}
break;
- case GeneralTransportOptions::FEC_TYPE:
- fec_setting_ = socket_option_value;
- result = SOCKET_OPTION_SET;
- break;
-
default:
return result;
}
@@ -507,6 +506,10 @@ class ConsumerSocket : public Socket {
socket_option_value = current_window_size_;
break;
+ case GeneralTransportOptions::UNVERIFIED_RATIO:
+ socket_option_value = unverified_ratio_;
+ break;
+
// RAAQM parameters
case RaaqmTransportOptions::GAMMA_VALUE:
@@ -547,8 +550,8 @@ class ConsumerSocket : public Socket {
socket_option_value = interest_lifetime_;
break;
- case GeneralTransportOptions::MAX_UNVERIFIED_TIME:
- socket_option_value = max_unverified_delay_;
+ case GeneralTransportOptions::UNVERIFIED_INTERVAL:
+ socket_option_value = unverified_interval_;
break;
case RaaqmTransportOptions::SAMPLE_NUMBER:
@@ -702,13 +705,9 @@ class ConsumerSocket : public Socket {
case DataLinkOptions::OUTPUT_INTERFACE:
socket_option_value = output_interface_;
break;
- case GeneralTransportOptions::FEC_TYPE:
- socket_option_value = fec_setting_;
- break;
default:
return SOCKET_OPTION_NOT_GET;
}
-
return SOCKET_OPTION_GET;
}
@@ -828,7 +827,8 @@ class ConsumerSocket : public Socket {
int rate_estimation_choice_;
// Verification parameters
- int max_unverified_delay_;
+ uint32_t unverified_interval_;
+ double unverified_ratio_;
std::shared_ptr<auth::Verifier> verifier_;
transport::auth::KeyId *key_id_;
std::atomic_bool verify_signature_;
@@ -857,9 +857,6 @@ class ConsumerSocket : public Socket {
RtcTransportRecoveryStrategies recovery_strategy_;
bool aggregated_data_;
- // FEC setting
- std::string fec_setting_;
-
utils::SpinLock guard_raaqm_params_;
std::string output_interface_;
};
diff --git a/libtransport/src/protocols/fec/fec.cc b/libtransport/src/protocols/fec/fec.cc
index 912e7a40f..5881d4d92 100644
--- a/libtransport/src/protocols/fec/fec.cc
+++ b/libtransport/src/protocols/fec/fec.cc
@@ -719,4 +719,4 @@ int fec_decode(struct fec_parms *code, gf *pkt[], int index[], int sz) {
}
free(m_dec);
return 0;
-}
+} \ No newline at end of file
diff --git a/libtransport/src/protocols/prod_protocol_rtc.cc b/libtransport/src/protocols/prod_protocol_rtc.cc
index 242abd30d..e49f58167 100644
--- a/libtransport/src/protocols/prod_protocol_rtc.cc
+++ b/libtransport/src/protocols/prod_protocol_rtc.cc
@@ -33,13 +33,13 @@ RTCProductionProtocol::RTCProductionProtocol(
implementation::ProducerSocket *icn_socket)
: ProductionProtocol(icn_socket),
current_seg_(1),
+ prev_produced_bytes_(0),
+ prev_produced_packets_(0),
produced_bytes_(0),
produced_packets_(0),
- produced_fec_packets_(0),
- max_packet_production_(1),
- bytes_production_rate_(0),
+ max_packet_production_(UINT32_MAX),
+ bytes_production_rate_(UINT32_MAX),
packets_production_rate_(0),
- fec_packets_production_rate_(0),
last_produced_data_ts_(0),
last_round_(utils::SteadyTime::nowMs().count()),
allow_delayed_nacks_(false),
@@ -116,32 +116,47 @@ void RTCProductionProtocol::scheduleRoundTimer() {
auto sp = self.lock();
if (sp && sp->isRunning()) {
- sp->updateStats();
+ sp->updateStats(true);
}
});
}
-void RTCProductionProtocol::updateStats() {
+void RTCProductionProtocol::updateStats(bool new_round) {
uint64_t now = utils::SteadyTime::nowMs().count();
uint64_t duration = now - last_round_;
- if (duration == 0) duration = 1;
+ if (!new_round) {
+ duration += rtc::PRODUCER_STATS_INTERVAL;
+ } else {
+ prev_produced_bytes_ = 0;
+ prev_produced_packets_ = 0;
+ }
+
double per_second = rtc::MILLI_IN_A_SEC / duration;
uint32_t prev_packets_production_rate = packets_production_rate_;
- bytes_production_rate_ = ceil((double)produced_bytes_ * per_second);
- packets_production_rate_ = ceil((double)produced_packets_ * per_second);
- fec_packets_production_rate_ =
- ceil((double)produced_fec_packets_ * per_second);
+ // bytes_production_rate_ does not take into account FEC!!! this is because
+ // each client requests a differen amount of FEC packet so the client itself
+ // increase the production rate in the right way
+ bytes_production_rate_ =
+ ceil((double)(produced_bytes_ + prev_produced_bytes_) * per_second);
+ packets_production_rate_ =
+ ceil((double)(produced_packets_ + prev_produced_packets_) * per_second);
- DLOG_IF(INFO, VLOG_IS_ON(3))
- << "Updating production rate: produced_bytes_ = " << produced_bytes_
- << " bps = " << bytes_production_rate_;
+ // add fec packets looking at the fec code. we don't use directly the number
+ // of fec packets produced in 1 round because it may happen that different
+ // numbers of blocks are generated during the rounds and this creates
+ // inconsistencies in the estimation of the production rate
+ uint32_t k = fec::FECUtils::getSourceSymbols(fec_type_);
+ uint32_t n = fec::FECUtils::getBlockSymbols(fec_type_);
+
+ packets_production_rate_ +=
+ ceil((double)packets_production_rate_ / (double)k) * (n - k);
// update the production rate as soon as it increases by 10% with respect to
// the last round
max_packet_production_ =
- produced_packets_ + ceil((double)produced_packets_ * 0.1);
+ produced_packets_ + ceil((double)produced_packets_ * 0.10);
if (max_packet_production_ < rtc::WIN_MIN)
max_packet_production_ = rtc::WIN_MIN;
@@ -158,11 +173,14 @@ void RTCProductionProtocol::updateStats() {
sendNacksForPendingInterests();
}
- produced_bytes_ = 0;
- produced_packets_ = 0;
- produced_fec_packets_ = 0;
- last_round_ = now;
- scheduleRoundTimer();
+ if (new_round) {
+ prev_produced_bytes_ = produced_bytes_;
+ prev_produced_packets_ = produced_packets_;
+ produced_bytes_ = 0;
+ produced_packets_ = 0;
+ last_round_ = now;
+ scheduleRoundTimer();
+ }
}
uint32_t RTCProductionProtocol::produceStream(
@@ -387,7 +405,7 @@ RTCProductionProtocol::createManifest(const Name &content_name) const {
.timestamp = now,
.prod_rate = bytes_production_rate_,
.prod_seg = current_seg_,
- .support_fec = false,
+ .fec_type = fec_type_,
});
return manifest;
@@ -434,16 +452,13 @@ void RTCProductionProtocol::producePktInternal(
produced_bytes_ +=
content_object->headerSize() + content_object->payloadSize();
produced_packets_++;
- } else {
- produced_fec_packets_++;
}
if (!data_aggregation_ && produced_packets_ >= max_packet_production_) {
// in this case all the pending interests may be used to accomodate the
// sudden increase in the production rate. calling the updateStats we will
// notify all the clients
- round_timer_->cancel();
- updateStats();
+ updateStats(false);
}
DLOG_IF(INFO, VLOG_IS_ON(3))
@@ -616,7 +631,7 @@ void RTCProductionProtocol::processInterest(uint32_t interest_seg,
// if the production rate 0 use delayed nacks
if (allow_delayed_nacks_ && interest_seg >= current_seg_) {
- uint64_t next_timer = ~0;
+ uint64_t next_timer = UINT64_MAX;
if (!timers_map_.empty()) {
next_timer = timers_map_.begin()->first;
}
@@ -652,8 +667,7 @@ void RTCProductionProtocol::processInterest(uint32_t interest_seg,
(double)((double)((double)lifetime *
rtc::INTEREST_LIFETIME_REDUCTION_FACTOR /
rtc::MILLI_IN_A_SEC) *
- (double)(packets_production_rate_ +
- fec_packets_production_rate_)));
+ (double)(packets_production_rate_)));
if (interest_seg < current_seg_ || interest_seg > (max_gap + current_seg_)) {
sendNack(interest_seg);
@@ -723,20 +737,30 @@ void RTCProductionProtocol::addToInterestQueue(uint32_t interest_seg,
void RTCProductionProtocol::sendNacksForPendingInterests() {
std::unordered_set<uint32_t> to_remove;
- uint32_t packet_gap = 100000; // set it to a high value (100sec)
- if (packets_production_rate_ != 0)
- packet_gap = ceil(rtc::MILLI_IN_A_SEC / (double)packets_production_rate_);
+ uint32_t pps = ceil((double)(packets_production_rate_)*rtc::
+ INTEREST_LIFETIME_REDUCTION_FACTOR);
uint64_t now = utils::SteadyTime::nowMs().count();
-
for (auto it = seqs_map_.begin(); it != seqs_map_.end(); it++) {
- if (it->first > current_seg_) {
- uint64_t production_time =
- ((it->first - current_seg_) * packet_gap) + now;
- if (production_time >= it->second) {
+ if (it->first > current_seg_ && it->second > now) {
+ double exp_time_in_sec =
+ (double)(it->second - now) / (double)rtc::MILLI_IN_A_SEC;
+ uint32_t packets_prod_before_expire = ceil((double)pps * exp_time_in_sec);
+
+ if (it->first > (current_seg_ + packets_prod_before_expire)) {
sendNack(it->first);
to_remove.insert(it->first);
}
+ } else if (TRANSPORT_EXPECT_FALSE(it->first < current_seg_ ||
+ it->second <= now)) {
+ // this branch should never be execcuted
+ // first condition: the packet was already prdocued and we have and old
+ // interest pending. send a nack to notify the consumer if needed. the
+ // case it->first = current_seg_ is not handled because
+ // the interest will be satified by the next data packet.
+ // second condition: the interest is expired.
+ sendNack(it->first);
+ to_remove.insert(it->first);
}
}
diff --git a/libtransport/src/protocols/prod_protocol_rtc.h b/libtransport/src/protocols/prod_protocol_rtc.h
index 7f50a2505..c0424a39c 100644
--- a/libtransport/src/protocols/prod_protocol_rtc.h
+++ b/libtransport/src/protocols/prod_protocol_rtc.h
@@ -77,7 +77,7 @@ class RTCProductionProtocol : public ProductionProtocol {
const Name &name) const;
// stats
- void updateStats();
+ void updateStats(bool new_round);
void scheduleRoundTimer();
// pending intersts functions
@@ -108,16 +108,17 @@ class RTCProductionProtocol : public ProductionProtocol {
uint32_t prod_label_; // path label of the producer
uint32_t cache_label_; // path label for content from the producer cache
+ uint32_t prev_produced_bytes_; // XXX clearly explain all these new vars
+ uint32_t prev_produced_packets_;
+
uint32_t produced_bytes_; // bytes produced in the last round
uint32_t produced_packets_; // packet produed in the last round
- uint32_t produced_fec_packets_; // fec packets produced last round
uint32_t max_packet_production_; // never exceed this number of packets
// without update stats
uint32_t bytes_production_rate_; // bytes per sec
uint32_t packets_production_rate_; // pps
- uint32_t fec_packets_production_rate_; // pps
uint64_t last_produced_data_ts_; // ms
diff --git a/libtransport/src/protocols/rtc/probe_handler.cc b/libtransport/src/protocols/rtc/probe_handler.cc
index abb234757..6a84914ab 100644
--- a/libtransport/src/protocols/rtc/probe_handler.cc
+++ b/libtransport/src/protocols/rtc/probe_handler.cc
@@ -36,11 +36,18 @@ ProbeHandler::ProbeHandler(SendProbeCallback &&send_callback,
ProbeHandler::~ProbeHandler() {}
-uint64_t ProbeHandler::getRtt(uint32_t seq) {
+uint64_t ProbeHandler::getRtt(uint32_t seq, bool is_valid) {
auto it = pending_probes_.find(seq);
if (it == pending_probes_.end()) return 0;
+ if (!is_valid) {
+ // delete the probe anyway
+ pending_probes_.erase(it);
+ valid_batch_ = false;
+ return 0;
+ }
+
uint64_t now = utils::SteadyTime::nowMs().count();
uint64_t rtt = now - it->second;
if (rtt < 1) rtt = 1;
@@ -52,6 +59,7 @@ uint64_t ProbeHandler::getRtt(uint32_t seq) {
}
double ProbeHandler::getProbeLossRate() {
+ if (!valid_batch_) return 1.0;
return 1.0 - ((double)recv_probes_ / (double)sent_probes_);
}
@@ -71,11 +79,26 @@ void ProbeHandler::stopProbes() {
max_probes_ = 0;
sent_probes_ = 0;
recv_probes_ = 0;
+ valid_batch_ = true;
probe_timer_->cancel();
}
void ProbeHandler::sendProbes() {
if (probe_interval_ == 0) return;
+
+ std::weak_ptr<ProbeHandler> self(shared_from_this());
+ probe_timer_->expires_from_now(std::chrono::microseconds(probe_interval_));
+ probe_timer_->async_wait([self](const std::error_code &ec) {
+ if (ec) return;
+ auto s = self.lock();
+ if (s) {
+ s->generateProbe();
+ }
+ });
+}
+
+void ProbeHandler::generateProbe() {
+ if (probe_interval_ == 0) return;
if (max_probes_ != 0 && sent_probes_ >= max_probes_) return;
uint64_t now = utils::SteadyTime::nowMs().count();
@@ -97,17 +120,7 @@ void ProbeHandler::sendProbes() {
}
}
- if (probe_interval_ == 0) return;
-
- std::weak_ptr<ProbeHandler> self(shared_from_this());
- probe_timer_->expires_from_now(std::chrono::microseconds(probe_interval_));
- probe_timer_->async_wait([self](const std::error_code &ec) {
- if (ec) return;
- auto s = self.lock();
- if (s) {
- s->sendProbes();
- }
- });
+ sendProbes();
}
ProbeType ProbeHandler::getProbeType(uint32_t seq) {
diff --git a/libtransport/src/protocols/rtc/probe_handler.h b/libtransport/src/protocols/rtc/probe_handler.h
index 2de908176..d989194d4 100644
--- a/libtransport/src/protocols/rtc/probe_handler.h
+++ b/libtransport/src/protocols/rtc/probe_handler.h
@@ -42,7 +42,7 @@ class ProbeHandler : public std::enable_shared_from_this<ProbeHandler> {
~ProbeHandler();
// If the function returns 0 the probe is not valid.
- uint64_t getRtt(uint32_t seq);
+ uint64_t getRtt(uint32_t seq, bool is_valid);
// this function may return a residual loss rate higher than the real one if
// we don't wait enough time for the probes to come back
@@ -63,11 +63,17 @@ class ProbeHandler : public std::enable_shared_from_this<ProbeHandler> {
static ProbeType getProbeType(uint32_t seq);
private:
+ void generateProbe();
+
uint32_t probe_interval_; // us
uint32_t max_probes_; // packets
uint32_t sent_probes_; // packets
uint32_t recv_probes_; // packets
+ bool valid_batch_; // if at least one probe in a batch is considered not
+ // valid (e.g. prod rate == ~0) the full batch is invalid.
+ // the bool is set to true when sendProbe is called
+
std::unique_ptr<asio::steady_timer> probe_timer_;
// Map from packet suffixes to timestamp
diff --git a/libtransport/src/protocols/rtc/rtc.cc b/libtransport/src/protocols/rtc/rtc.cc
index df6522471..d2682edfa 100644
--- a/libtransport/src/protocols/rtc/rtc.cc
+++ b/libtransport/src/protocols/rtc/rtc.cc
@@ -63,16 +63,25 @@ std::size_t RTCTransportProtocol::transportHeaderLength() {
// private
void RTCTransportProtocol::initParams() {
TransportProtocol::reset();
- fwd_strategy_.setCallback(on_fwd_strategy_);
-
std::weak_ptr<RTCTransportProtocol> self = shared_from_this();
+ fwd_strategy_.setCallback([self](notification::Strategy strategy) {
+ auto ptr = self.lock();
+ if (ptr && ptr->isRunning()) {
+ if (*ptr->on_fwd_strategy_) (*ptr->on_fwd_strategy_)(strategy);
+ }
+ });
+
std::shared_ptr<auth::Verifier> verifier;
socket_->getSocketOption(GeneralTransportOptions::VERIFIER, verifier);
- uint32_t max_unverified_delay;
- socket_->getSocketOption(GeneralTransportOptions::MAX_UNVERIFIED_TIME,
- max_unverified_delay);
+ uint32_t unverified_interval;
+ socket_->getSocketOption(GeneralTransportOptions::UNVERIFIED_INTERVAL,
+ unverified_interval);
+
+ double unverified_ratio;
+ socket_->getSocketOption(GeneralTransportOptions::UNVERIFIED_RATIO,
+ unverified_ratio);
rc_ = std::make_shared<RTCRateControlCongestionDetection>();
ldr_ = std::make_shared<RTCLossDetectionAndRecovery>(
@@ -84,8 +93,15 @@ void RTCTransportProtocol::initParams() {
ptr->sendRtxInterest(seq);
}
},
- on_rec_strategy_);
- verifier_ = std::make_shared<RTCVerifier>(verifier, max_unverified_delay);
+ [self](notification::Strategy strategy) {
+ auto ptr = self.lock();
+ if (ptr && ptr->isRunning()) {
+ if (*ptr->on_rec_strategy_) (*ptr->on_rec_strategy_)(strategy);
+ }
+ });
+
+ verifier_ = std::make_shared<RTCVerifier>(verifier, unverified_interval,
+ unverified_ratio);
state_ = std::make_shared<RTCState>(
indexer_verifier_.get(),
@@ -102,7 +118,6 @@ void RTCTransportProtocol::initParams() {
}
},
portal_->getThread().getIoService());
- state_->initParams();
rc_->setState(state_);
rc_->turnOnRateControl();
@@ -153,21 +168,8 @@ void RTCTransportProtocol::initParams() {
socket_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
RTC_INTEREST_LIFETIME);
- // FEC
- using namespace std::placeholders;
- enableFEC(std::bind(&RTCTransportProtocol::onFecPackets, this, _1),
- /* We leave the buffer allocation to the fec decoder */
- fec::FECBase::BufferRequested(0));
-
- if (fec_decoder_) {
- indexer_verifier_->enableFec(fec_type_);
- indexer_verifier_->setNFec(0);
- ldr_->setFecParams(fec::FECUtils::getBlockSymbols(fec_type_),
- fec::FECUtils::getSourceSymbols(fec_type_));
- fec_decoder_->setIOService(portal_->getThread().getIoService());
- } else {
- indexer_verifier_->disableFec();
- }
+ // init state params
+ state_->initParams();
}
// private
@@ -223,29 +225,26 @@ void RTCTransportProtocol::newRound() {
uint32_t received_nacks = state->getReceivedNacksInRound();
uint32_t received_fec = state->getReceivedFecPackets();
- bool in_sync = (ptr->current_state_ == SyncState::in_sync);
- ptr->ldr_->onNewRound(in_sync);
- ptr->state_->onNewRound((double)ROUND_LEN, in_sync);
- ptr->rc_->onNewRound((double)ROUND_LEN);
-
// update sync state if needed
+ double cache_rate = state->getPacketFromCacheRatio();
+ uint32_t round_without_nacks = state->getRoundsWithoutNacks();
+
if (ptr->current_state_ == SyncState::in_sync) {
- double cache_rate = state->getPacketFromCacheRatio();
if (cache_rate > MAX_DATA_FROM_CACHE) {
ptr->current_state_ = SyncState::catch_up;
}
} else {
- double target_rate = state->getProducerRate() * PRODUCTION_RATE_FRACTION;
- double received_rate =
- state->getReceivedRate() + state->getRecoveredFecRate();
- uint32_t round_without_nacks = state->getRoundsWithoutNacks();
- double cache_ratio = state->getPacketFromCacheRatio();
if (round_without_nacks >= ROUNDS_IN_SYNC_BEFORE_SWITCH &&
- received_rate >= target_rate && cache_ratio < MAX_DATA_FROM_CACHE) {
+ cache_rate < MAX_DATA_FROM_CACHE) {
ptr->current_state_ = SyncState::in_sync;
}
}
+ bool in_sync = (ptr->current_state_ == SyncState::in_sync);
+ ptr->ldr_->onNewRound(in_sync);
+ ptr->state_->onNewRound((double)ROUND_LEN, in_sync);
+ ptr->rc_->onNewRound((double)ROUND_LEN);
+
DLOG_IF(INFO, VLOG_IS_ON(3))
<< "Calling updateSyncWindow in newRound function";
ptr->updateSyncWindow();
@@ -335,7 +334,7 @@ void RTCTransportProtocol::updateSyncWindow() {
// if some of the info are not available do not update the current win
if (prod_rate != 0.0 && rtt != 0.0 && packet_size != 0.0) {
current_sync_win_ = (uint32_t)ceil(prod_rate * rtt / packet_size);
- uint32_t buffer = PRODUCER_BUFFER_MS;
+ uint32_t buffer = PRODUCER_BUFFER_MS + ((double)state_->getMinRTT() / 2.0);
current_sync_win_ +=
ceil(prod_rate * (buffer / MILLI_IN_A_SEC) / packet_size);
@@ -360,15 +359,6 @@ void RTCTransportProtocol::updateSyncWindow() {
scheduleNextInterests();
}
-void RTCTransportProtocol::decreaseSyncWindow() {
- // called on future nack
- // we have a new sample of the production rate, so update max win first
- computeMaxSyncWindow();
- current_sync_win_--;
- current_sync_win_ = std::max(current_sync_win_, WIN_MIN);
- scheduleNextInterests();
-}
-
void RTCTransportProtocol::sendRtxInterest(uint32_t seq) {
if (!isRunning() && !is_first_) return;
@@ -468,7 +458,6 @@ void RTCTransportProtocol::scheduleNextInterests() {
auto ptr = self.lock();
if (ptr && ptr->isRunning()) {
if (!ptr->scheduler_timer_on_) return;
-
ptr->scheduler_timer_on_ = false;
ptr->scheduleNextInterests();
}
@@ -688,8 +677,6 @@ void RTCTransportProtocol::onNack(const ContentObject &content_object) {
// switch to catch up state and increase the window
// this is true only if the packet is not an RTX
if (!is_rtx) current_state_ = SyncState::catch_up;
-
- updateSyncWindow();
} else {
// if production_seg == nack_segment we consider this a future nack, since
// production_seg is not yet created. this may happen in case of low
@@ -702,23 +689,50 @@ void RTCTransportProtocol::onNack(const ContentObject &content_object) {
// the client is asking for content in the future
// switch to in sync state and decrease the window
current_state_ = SyncState::in_sync;
- decreaseSyncWindow();
}
+ updateSyncWindow();
}
void RTCTransportProtocol::onProbe(const ContentObject &content_object) {
- bool valid = state_->onProbePacketReceived(content_object);
- if (!valid) return;
+ uint32_t suffix = content_object.getName().getSuffix();
+ ParamsRTC params = RTCState::getProbeParams(content_object);
+
+ if (ProbeHandler::getProbeType(suffix) == ProbeType::INIT) {
+ fec::FECType fec_type = params.fec_type;
+
+ if (fec_type != fec::FECType::UNKNOWN && !fec_decoder_) {
+ // Update FEC type
+ fec_type_ = fec_type;
+
+ // Enable FEC
+ enableFEC(std::bind(&RTCTransportProtocol::onFecPackets, this,
+ std::placeholders::_1),
+ fec::FECBase::BufferRequested(0));
+
+ // Update FEC parameters
+ indexer_verifier_->enableFec(fec_type);
+ indexer_verifier_->setNFec(0);
+ ldr_->setFecParams(fec::FECUtils::getBlockSymbols(fec_type),
+ fec::FECUtils::getSourceSymbols(fec_type));
+ fec_decoder_->setIOService(portal_->getThread().getIoService());
+ } else if (fec_type == fec::FECType::UNKNOWN) {
+ indexer_verifier_->disableFec();
+ }
+ }
- uint32_t production_seg = RTCState::getProbeParams(content_object).prod_seg;
+ if (!state_->onProbePacketReceived(content_object)) return;
- // As for the nacks set next_segment
+ // As for NACKs, set next_segment
DLOG_IF(INFO, VLOG_IS_ON(3))
<< "on probe next seg = " << indexer_verifier_->checkNextSuffix()
- << ", jump to " << production_seg;
- indexer_verifier_->jumpToIndex(production_seg);
-
- ldr_->onProbePacketReceived(content_object);
+ << ", jump to " << params.prod_seg;
+ indexer_verifier_->jumpToIndex(params.prod_seg);
+
+ bool loss_detected = ldr_->onProbePacketReceived(content_object);
+ // we are not out of sync here but we are starting to download content from
+ // the cache, maybe beacuse the production rate increased suddenly. for this
+ // reason we put the state to catch up to increase the window
+ if (loss_detected) current_state_ = SyncState::catch_up;
updateSyncWindow();
}
@@ -822,6 +836,8 @@ void RTCTransportProtocol::onContentObjectReceived(
// The packet is considered received, return early
onDataPacketReceived(*content_ptr, compute_stats);
+ // this is a rtx but we may need to feed it in the decoder
+ decodePacket(content_object, is_manifest);
return;
}
@@ -846,18 +862,8 @@ void RTCTransportProtocol::onContentObjectReceived(
state_->dataToBeReceived(segment_number);
}
- // Send packet to FEC decoder
- if (fec_decoder_) {
- DLOG_IF(INFO, VLOG_IS_ON(4))
- << "Send packet " << segment_number << " to FEC decoder";
-
- uint32_t offset = is_manifest
- ? content_object.headerSize()
- : content_object.headerSize() + rtc::DATA_HEADER_SIZE;
- uint32_t metadata = static_cast<uint32_t>(content_object.getPayloadType());
-
- fec_decoder_->onDataPacket(content_object, offset, metadata);
- }
+ // send packet to the decoder
+ decodePacket(content_object, is_manifest);
// We can return early if FEC
if (is_fec) {
@@ -947,6 +953,21 @@ void RTCTransportProtocol::sendStatsToApp(
}
}
+void RTCTransportProtocol::decodePacket(ContentObject &content_object,
+ bool is_manifest) {
+ if (!fec_decoder_) return;
+
+ DLOG_IF(INFO, VLOG_IS_ON(4))
+ << "Send packet " << content_object.getName() << " to FEC decoder";
+
+ uint32_t offset = is_manifest
+ ? content_object.headerSize()
+ : content_object.headerSize() + rtc::DATA_HEADER_SIZE;
+ uint32_t metadata = static_cast<uint32_t>(content_object.getPayloadType());
+
+ fec_decoder_->onDataPacket(content_object, offset, metadata);
+}
+
void RTCTransportProtocol::onFecPackets(fec::BufferArray &packets) {
Packet::Format format;
socket_->getSocketOption(interface::GeneralTransportOptions::PACKET_FORMAT,
diff --git a/libtransport/src/protocols/rtc/rtc.h b/libtransport/src/protocols/rtc/rtc.h
index 37706eb1c..3763f33c7 100644
--- a/libtransport/src/protocols/rtc/rtc.h
+++ b/libtransport/src/protocols/rtc/rtc.h
@@ -65,7 +65,6 @@ class RTCTransportProtocol : public TransportProtocol {
// window functions
void computeMaxSyncWindow();
void updateSyncWindow();
- void decreaseSyncWindow();
// packet functions
void sendRtxInterest(uint32_t seq);
@@ -89,6 +88,8 @@ class RTCTransportProtocol : public TransportProtocol {
uint32_t received_nacks, uint32_t received_fec);
// FEC functions
+ // send the received content object to the decoder
+ void decodePacket(ContentObject &content_object, bool is_manifest);
void onFecPackets(fec::BufferArray &packets);
// Utils
diff --git a/libtransport/src/protocols/rtc/rtc_consts.h b/libtransport/src/protocols/rtc/rtc_consts.h
index 03efd8e84..96e39d07e 100644
--- a/libtransport/src/protocols/rtc/rtc_consts.h
+++ b/libtransport/src/protocols/rtc/rtc_consts.h
@@ -34,7 +34,7 @@ const double INTEREST_LIFETIME_REDUCTION_FACTOR = 0.8;
// increasing this number we increase the time that an
// interest will wait for the data packet to be produced
// at the producer socket
-const uint32_t PRODUCER_BUFFER_MS = 200; // ms
+const uint32_t PRODUCER_BUFFER_MS = 300; // ms
// interest scheduler
// const uint32_t MAX_INTERESTS_IN_BATCH = 5;
@@ -72,7 +72,8 @@ const uint32_t INIT_RTT_PROBES = 40; // number of probes to init RTT
// to get an answer. we wait 100ms between each try
const uint32_t INIT_RTT_PROBE_RESTART = 100; // ms
// once we get the first probe we wait at most 60ms for the others
-const uint32_t INIT_RTT_PROBE_WAIT = 30; // ms
+const uint32_t INIT_RTT_PROBE_WAIT =
+ ((INIT_RTT_PROBES * INIT_RTT_PROBE_INTERVAL) / 1000) * 2; // ms
// we reuires at least 5 probes to be recevied
const uint32_t INIT_RTT_MIN_PROBES_TO_RECV = 5; // ms
const uint32_t MAX_PENDING_PROBES = 10;
@@ -81,7 +82,7 @@ const uint32_t MAX_PENDING_PROBES = 10;
const double MAX_QUEUING_DELAY = 50.0; // ms
// data from cache
-const double MAX_DATA_FROM_CACHE = 0.25; // 25%
+const double MAX_DATA_FROM_CACHE = 0.10; // 10%
// window const
const uint32_t INITIAL_WIN = 5; // pkts
diff --git a/libtransport/src/protocols/rtc/rtc_forwarding_strategy.cc b/libtransport/src/protocols/rtc/rtc_forwarding_strategy.cc
index 9503eed3e..c6bc751e6 100644
--- a/libtransport/src/protocols/rtc/rtc_forwarding_strategy.cc
+++ b/libtransport/src/protocols/rtc/rtc_forwarding_strategy.cc
@@ -35,8 +35,9 @@ RTCForwardingStrategy::RTCForwardingStrategy()
RTCForwardingStrategy::~RTCForwardingStrategy() {}
-void RTCForwardingStrategy::setCallback(interface::StrategyCallback* callback) {
- callback_ = callback;
+void RTCForwardingStrategy::setCallback(
+ interface::StrategyCallback&& callback) {
+ callback_ = std::move(callback);
}
void RTCForwardingStrategy::initFwdStrategy(
@@ -55,27 +56,24 @@ void RTCForwardingStrategy::initFwdStrategy(
}
void RTCForwardingStrategy::checkStrategy() {
- if (*callback_) {
- strategy_t used_strategy = selected_strategy_;
- if (used_strategy == BOTH) used_strategy = current_strategy_;
- assert(used_strategy == BEST_PATH || used_strategy == REPLICATION ||
- used_strategy == NONE);
-
- notification::ForwardingStrategy strategy =
- notification::ForwardingStrategy::NONE;
- switch (used_strategy) {
- case BEST_PATH:
- strategy = notification::ForwardingStrategy::BEST_PATH;
- break;
- case REPLICATION:
- strategy = notification::ForwardingStrategy::REPLICATION;
- break;
- default:
- break;
- }
-
- (*callback_)(strategy);
+ strategy_t used_strategy = selected_strategy_;
+ if (used_strategy == BOTH) used_strategy = current_strategy_;
+ assert(used_strategy == BEST_PATH || used_strategy == REPLICATION ||
+ used_strategy == NONE);
+
+ notification::ForwardingStrategy strategy =
+ notification::ForwardingStrategy::NONE;
+ switch (used_strategy) {
+ case BEST_PATH:
+ strategy = notification::ForwardingStrategy::BEST_PATH;
+ break;
+ case REPLICATION:
+ strategy = notification::ForwardingStrategy::REPLICATION;
+ break;
+ default:
+ break;
}
+ callback_(strategy);
if (!init_) return;
diff --git a/libtransport/src/protocols/rtc/rtc_forwarding_strategy.h b/libtransport/src/protocols/rtc/rtc_forwarding_strategy.h
index 821b28051..9825877fd 100644
--- a/libtransport/src/protocols/rtc/rtc_forwarding_strategy.h
+++ b/libtransport/src/protocols/rtc/rtc_forwarding_strategy.h
@@ -44,7 +44,7 @@ class RTCForwardingStrategy {
strategy_t strategy);
void checkStrategy();
- void setCallback(interface::StrategyCallback* callback);
+ void setCallback(interface::StrategyCallback&& callback);
private:
void checkStrategyBestPath();
@@ -68,7 +68,7 @@ class RTCForwardingStrategy {
core::Prefix prefix_;
std::shared_ptr<core::Portal> portal_;
RTCState* state_;
- interface::StrategyCallback* callback_;
+ interface::StrategyCallback callback_;
};
} // namespace rtc
diff --git a/libtransport/src/protocols/rtc/rtc_ldr.cc b/libtransport/src/protocols/rtc/rtc_ldr.cc
index 1ca1cf48d..abf6cda2c 100644
--- a/libtransport/src/protocols/rtc/rtc_ldr.cc
+++ b/libtransport/src/protocols/rtc/rtc_ldr.cc
@@ -36,17 +36,17 @@ RTCLossDetectionAndRecovery::RTCLossDetectionAndRecovery(
Indexer *indexer, asio::io_service &io_service,
interface::RtcTransportRecoveryStrategies type,
RecoveryStrategy::SendRtxCallback &&callback,
- interface::StrategyCallback *external_callback) {
+ interface::StrategyCallback &&external_callback) {
rs_type_ = type;
if (type == interface::RtcTransportRecoveryStrategies::RECOVERY_OFF) {
rs_ = std::make_shared<RecoveryStrategyRecoveryOff>(
- indexer, std::move(callback), io_service, external_callback);
+ indexer, std::move(callback), io_service, std::move(external_callback));
} else if (type == interface::RtcTransportRecoveryStrategies::DELAY_BASED) {
rs_ = std::make_shared<RecoveryStrategyDelayBased>(
- indexer, std::move(callback), io_service, external_callback);
+ indexer, std::move(callback), io_service, std::move(external_callback));
} else if (type == interface::RtcTransportRecoveryStrategies::FEC_ONLY) {
rs_ = std::make_shared<RecoveryStrategyFecOnly>(
- indexer, std::move(callback), io_service, external_callback);
+ indexer, std::move(callback), io_service, std::move(external_callback));
} else if (type == interface::RtcTransportRecoveryStrategies::LOW_RATE ||
type == interface::RtcTransportRecoveryStrategies::
LOW_RATE_AND_BESTPATH ||
@@ -55,12 +55,12 @@ RTCLossDetectionAndRecovery::RTCLossDetectionAndRecovery(
type == interface::RtcTransportRecoveryStrategies::
LOW_RATE_AND_ALL_FWD_STRATEGIES) {
rs_ = std::make_shared<RecoveryStrategyLowRate>(
- indexer, std::move(callback), io_service, external_callback);
+ indexer, std::move(callback), io_service, std::move(external_callback));
} else {
// default
rs_type_ = interface::RtcTransportRecoveryStrategies::RTX_ONLY;
rs_ = std::make_shared<RecoveryStrategyRtxOnly>(
- indexer, std::move(callback), io_service, external_callback);
+ indexer, std::move(callback), io_service, std::move(external_callback));
}
}
@@ -97,19 +97,21 @@ void RTCLossDetectionAndRecovery::onNewRound(bool in_sync) {
rs_->onNewRound(in_sync);
}
-void RTCLossDetectionAndRecovery::onTimeout(uint32_t seq, bool lost) {
+bool RTCLossDetectionAndRecovery::onTimeout(uint32_t seq, bool lost) {
if (!lost) {
- detectLoss(seq, seq + 1);
+ return detectLoss(seq, seq + 1, false);
} else {
rs_->onLostTimeout(seq);
}
+ return false;
}
-void RTCLossDetectionAndRecovery::onPacketRecoveredFec(uint32_t seq) {
+bool RTCLossDetectionAndRecovery::onPacketRecoveredFec(uint32_t seq) {
rs_->receivedPacket(seq);
+ return false;
}
-void RTCLossDetectionAndRecovery::onDataPacketReceived(
+bool RTCLossDetectionAndRecovery::onDataPacketReceived(
const core::ContentObject &content_object) {
uint32_t seq = content_object.getName().getSuffix();
bool is_rtx = rs_->isRtx(seq);
@@ -118,10 +120,13 @@ void RTCLossDetectionAndRecovery::onDataPacketReceived(
<< "received data. add from "
<< rs_->getState()->getHighestSeqReceivedInOrder() + 1 << " to " << seq;
if (!is_rtx)
- detectLoss(rs_->getState()->getHighestSeqReceivedInOrder() + 1, seq);
+ return detectLoss(rs_->getState()->getHighestSeqReceivedInOrder() + 1, seq,
+ false);
+
+ return false;
}
-void RTCLossDetectionAndRecovery::onNackPacketReceived(
+bool RTCLossDetectionAndRecovery::onNackPacketReceived(
const core::ContentObject &nack) {
struct nack_packet_t *nack_pkt =
(struct nack_packet_t *)nack.getPayload()->data();
@@ -140,11 +145,18 @@ void RTCLossDetectionAndRecovery::onNackPacketReceived(
<< "received nack. add from "
<< rs_->getState()->getHighestSeqReceivedInOrder() + 1 << " to "
<< production_seq;
- detectLoss(rs_->getState()->getHighestSeqReceivedInOrder() + 1,
- production_seq);
+
+ // if it is a future nack store it in the list set of nacked seq
+ if (production_seq <= seq) rs_->receivedFutureNack(seq);
+
+ // call the detectLoss function using the probe flag = true. in fact the
+ // losses detected using nacks are the same as the one detected using probes,
+ // we should not increase the loss counter
+ return detectLoss(rs_->getState()->getHighestSeqReceivedInOrder() + 1,
+ production_seq, true);
}
-void RTCLossDetectionAndRecovery::onProbePacketReceived(
+bool RTCLossDetectionAndRecovery::onProbePacketReceived(
const core::ContentObject &probe) {
// we don't log the reception of a probe packet for the sentinel timer because
// probes are not taken into account into the sync window. we use them as
@@ -157,12 +169,13 @@ void RTCLossDetectionAndRecovery::onProbePacketReceived(
<< rs_->getState()->getHighestSeqReceivedInOrder() + 1 << " to "
<< production_seq;
- detectLoss(rs_->getState()->getHighestSeqReceivedInOrder() + 1,
- production_seq);
+ return detectLoss(rs_->getState()->getHighestSeqReceivedInOrder() + 1,
+ production_seq, true);
}
-void RTCLossDetectionAndRecovery::detectLoss(uint32_t start, uint32_t stop) {
- if (start >= stop) return;
+bool RTCLossDetectionAndRecovery::detectLoss(uint32_t start, uint32_t stop,
+ bool recv_probe) {
+ if (start >= stop) return false;
// skip nacked packets
if (start <= rs_->getState()->getLastSeqNacked()) {
@@ -174,13 +187,31 @@ void RTCLossDetectionAndRecovery::detectLoss(uint32_t start, uint32_t stop) {
start = rs_->getState()->getHighestSeqReceivedInOrder() + 1;
}
+ bool loss_detected = false;
for (uint32_t seq = start; seq < stop; seq++) {
if (rs_->getState()->getPacketState(seq) == PacketState::UNKNOWN) {
if (rs_->lossDetected(seq)) {
- rs_->getState()->onLossDetected(seq);
+ loss_detected = true;
+ if ((recv_probe || rs_->wasNacked(seq)) && !rs_->isFecOn()) {
+ // these losses were detected using a probe and fec is off.
+ // in this case most likelly the procotol is about to go out of sync
+ // and the packets are not really lost (e.g. increase in prod rate).
+ // for this reason we do not
+ // count the losses in the stats. Instead we do the following
+ // 1. send RTX for the packets in case they were really lost
+ // 2. return to the RTC protocol that a loss was detected using a
+ // probe. the protocol will switch to catch_up mode to increase the
+ // size of the window
+ rs_->requestPossibleLostPacket(seq);
+ } else {
+ // if fec is on we don't need to mask pontetial losses, so increase
+ // the loss rate
+ rs_->notifyNewLossDetedcted(seq);
+ }
}
}
}
+ return loss_detected;
}
} // namespace rtc
diff --git a/libtransport/src/protocols/rtc/rtc_ldr.h b/libtransport/src/protocols/rtc/rtc_ldr.h
index e7f8ce5db..7f683eaa6 100644
--- a/libtransport/src/protocols/rtc/rtc_ldr.h
+++ b/libtransport/src/protocols/rtc/rtc_ldr.h
@@ -36,7 +36,7 @@ class RTCLossDetectionAndRecovery
RTCLossDetectionAndRecovery(Indexer *indexer, asio::io_service &io_service,
interface::RtcTransportRecoveryStrategies type,
RecoveryStrategy::SendRtxCallback &&callback,
- interface::StrategyCallback *external_callback);
+ interface::StrategyCallback &&external_callback);
~RTCLossDetectionAndRecovery();
@@ -47,17 +47,19 @@ class RTCLossDetectionAndRecovery
void setFecParams(uint32_t n, uint32_t k) { rs_->setFecParams(n, k); }
- void turnOnRecovery() { rs_->tunrOnRecovery(); }
+ void turnOnRecovery() { rs_->turnOnRecovery(); }
bool isRtxOn() { return rs_->isRtxOn(); }
void changeRecoveryStrategy(interface::RtcTransportRecoveryStrategies type);
void onNewRound(bool in_sync);
- void onTimeout(uint32_t seq, bool lost);
- void onPacketRecoveredFec(uint32_t seq);
- void onDataPacketReceived(const core::ContentObject &content_object);
- void onNackPacketReceived(const core::ContentObject &nack);
- void onProbePacketReceived(const core::ContentObject &probe);
+
+ // the following functions return true if a loss is detected, false otherwise
+ bool onTimeout(uint32_t seq, bool lost);
+ bool onPacketRecoveredFec(uint32_t seq);
+ bool onDataPacketReceived(const core::ContentObject &content_object);
+ bool onNackPacketReceived(const core::ContentObject &nack);
+ bool onProbePacketReceived(const core::ContentObject &probe);
void clear() { rs_->clear(); }
@@ -67,7 +69,8 @@ class RTCLossDetectionAndRecovery
}
private:
- void detectLoss(uint32_t start, uint32_t stop);
+ // returns true if a loss is detected, false otherwise
+ bool detectLoss(uint32_t start, uint32_t stop, bool recv_probe);
interface::RtcTransportRecoveryStrategies rs_type_;
std::shared_ptr<RecoveryStrategy> rs_;
diff --git a/libtransport/src/protocols/rtc/rtc_recovery_strategy.cc b/libtransport/src/protocols/rtc/rtc_recovery_strategy.cc
index 888105eab..66ae5086c 100644
--- a/libtransport/src/protocols/rtc/rtc_recovery_strategy.cc
+++ b/libtransport/src/protocols/rtc/rtc_recovery_strategy.cc
@@ -29,20 +29,22 @@ using namespace transport::interface;
RecoveryStrategy::RecoveryStrategy(
Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service,
- bool use_rtx, bool use_fec, interface::StrategyCallback *external_callback)
+ bool use_rtx, bool use_fec, interface::StrategyCallback &&external_callback)
: recovery_on_(false),
+ rtx_during_fec_(0),
next_rtx_timer_(MAX_TIMER_RTX),
send_rtx_callback_(std::move(callback)),
indexer_(indexer),
round_id_(0),
last_fec_used_(0),
- callback_(external_callback) {
+ callback_(std::move(external_callback)) {
setRtxFec(use_rtx, use_fec);
timer_ = std::make_unique<asio::steady_timer>(io_service);
}
RecoveryStrategy::RecoveryStrategy(RecoveryStrategy &&rs)
- : rtx_state_(std::move(rs.rtx_state_)),
+ : rtx_during_fec_(0),
+ rtx_state_(std::move(rs.rtx_state_)),
rtx_timers_(std::move(rs.rtx_timers_)),
recover_with_fec_(std::move(rs.recover_with_fec_)),
timer_(std::move(rs.timer_)),
@@ -55,7 +57,7 @@ RecoveryStrategy::RecoveryStrategy(RecoveryStrategy &&rs)
rc_(std::move(rs.rc_)),
round_id_(std::move(rs.round_id_)),
last_fec_used_(std::move(rs.last_fec_used_)),
- callback_(rs.callback_) {
+ callback_(std::move(rs.callback_)) {
setFecParams(n_, k_);
}
@@ -68,7 +70,7 @@ void RecoveryStrategy::setFecParams(uint32_t n, uint32_t k) {
// XXX for the moment we go in steps of 5% loss rate.
// max loss rate = 95%
for (uint32_t loss_rate = 5; loss_rate < 100; loss_rate += 5) {
- double dec_loss_rate = (double)loss_rate / 100.0;
+ double dec_loss_rate = (double)(loss_rate + 5) / 100.0;
double exp_losses = (double)k_ * dec_loss_rate;
uint32_t fec_to_ask = ceil(exp_losses / (1 - dec_loss_rate));
@@ -87,18 +89,39 @@ bool RecoveryStrategy::lossDetected(uint32_t seq) {
return false;
}
- auto it = recover_with_fec_.find(seq);
- if (it != recover_with_fec_.end()) {
+ auto it_fec = recover_with_fec_.find(seq);
+ if (it_fec != recover_with_fec_.end()) {
// this packet is already in list of packets to recover with fec
// this list contians also fec packets that will not be recovered with rtx
return false;
}
- // new loss detected, recover it according to the strategy
- newPacketLoss(seq);
+ auto it_nack = nacked_seq_.find(seq);
+ if (it_nack != nacked_seq_.end()) {
+ // this packet was nacked so we do not use it to determine the loss rate
+ return false;
+ }
+
return true;
}
+void RecoveryStrategy::notifyNewLossDetedcted(uint32_t seq) {
+ // new loss detected
+ // first record the loss. second do what is needed to recover it
+ state_->onLossDetected(seq);
+ newPacketLoss(seq);
+}
+
+void RecoveryStrategy::requestPossibleLostPacket(uint32_t seq) {
+ // these are packets for which we send a RTX but we do not increase the loss
+ // counter beacuse we don't know if they are lost or not
+ addNewRtx(seq, false);
+}
+
+void RecoveryStrategy::receivedFutureNack(uint32_t seq) {
+ nacked_seq_.insert(seq);
+}
+
void RecoveryStrategy::clear() {
rtx_state_.clear();
rtx_timers_.clear();
@@ -236,6 +259,9 @@ void RecoveryStrategy::retransmit() {
DLOG_IF(INFO, VLOG_IS_ON(3))
<< "send rtx for sequence " << seq << ", next send in "
<< (rtx_it->second.next_send_ - now);
+
+ // if fec is on increase the number of RTX send during fec
+ if (fec_on_) rtx_during_fec_++;
send_rtx_callback_(seq);
sent_counter++;
}
@@ -306,7 +332,7 @@ void RecoveryStrategy::deleteRtx(uint32_t seq) {
}
// fec functions
-uint32_t RecoveryStrategy::computeFecPacketsToAsk(bool in_sync) {
+uint32_t RecoveryStrategy::computeFecPacketsToAsk() {
double loss_rate = state_->getMaxLossRate() * 100; // use loss rate in %
if (loss_rate > 95) loss_rate = 95; // max loss rate
@@ -365,21 +391,25 @@ uint32_t RecoveryStrategy::computeFecPacketsToAsk(bool in_sync) {
void RecoveryStrategy::setRtxFec(std::optional<bool> rtx_on,
std::optional<bool> fec_on) {
if (rtx_on) rtx_on_ = *rtx_on;
- if (fec_on) fec_on_ = *fec_on;
+ if (fec_on) {
+ if (fec_on_ == false && (*fec_on) == true) { // turn on fec
+ // reset the number of RTX sent during fec
+ rtx_during_fec_ = 0;
+ }
+ fec_on_ = *fec_on;
+ }
- if (*callback_) {
- notification::RecoveryStrategy strategy =
- notification::RecoveryStrategy::RECOVERY_OFF;
+ notification::RecoveryStrategy strategy =
+ notification::RecoveryStrategy::RECOVERY_OFF;
- if (rtx_on_ && fec_on_)
- strategy = notification::RecoveryStrategy::RTX_AND_FEC;
- else if (rtx_on_)
- strategy = notification::RecoveryStrategy::RTX_ONLY;
- else if (fec_on_)
- strategy = notification::RecoveryStrategy::FEC_ONLY;
+ if (rtx_on_ && fec_on_)
+ strategy = notification::RecoveryStrategy::RTX_AND_FEC;
+ else if (rtx_on_)
+ strategy = notification::RecoveryStrategy::RTX_ONLY;
+ else if (fec_on_)
+ strategy = notification::RecoveryStrategy::FEC_ONLY;
- (*callback_)(strategy);
- }
+ callback_(strategy);
}
// common functions
@@ -392,6 +422,12 @@ void RecoveryStrategy::removePacketState(uint32_t seq) {
return;
}
+ auto it_nack = nacked_seq_.find(seq);
+ if (it_nack != nacked_seq_.end()) {
+ nacked_seq_.erase(it_nack);
+ return;
+ }
+
deleteRtx(seq);
}
@@ -406,7 +442,6 @@ void RecoveryStrategy::reduceFec() {
uint32_t bin = ceil(loss_rate / 5.0) - 1;
if (fec_per_loss_rate_[bin].fec_to_ask > fec_to_ask) {
fec_per_loss_rate_[bin].fec_to_ask--;
- // std::cout << "reduce fec to ask for bin " << bin << std::endl;
}
}
}
diff --git a/libtransport/src/protocols/rtc/rtc_recovery_strategy.h b/libtransport/src/protocols/rtc/rtc_recovery_strategy.h
index 9ffc69a1b..482aedc9d 100644
--- a/libtransport/src/protocols/rtc/rtc_recovery_strategy.h
+++ b/libtransport/src/protocols/rtc/rtc_recovery_strategy.h
@@ -44,7 +44,7 @@ class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> {
RecoveryStrategy(Indexer *indexer, SendRtxCallback &&callback,
asio::io_service &io_service, bool use_rtx, bool use_fec,
- interface::StrategyCallback *external_callback);
+ interface::StrategyCallback &&external_callback);
RecoveryStrategy(RecoveryStrategy &&rs);
@@ -56,8 +56,6 @@ class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> {
void setRateControl(RTCRateControl *rateControl) { rc_ = rateControl; }
void setFecParams(uint32_t n, uint32_t k);
- void tunrOnRecovery() { recovery_on_ = true; }
-
bool isRtx(uint32_t seq) {
if (rtx_state_.find(seq) != rtx_state_.end()) return true;
return false;
@@ -68,12 +66,22 @@ class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> {
return false;
}
+ bool wasNacked(uint32_t seq) {
+ if (nacked_seq_.find(seq) != nacked_seq_.end()) return true;
+ return false;
+ }
+
bool isRtxOn() { return rtx_on_; }
+ bool isFecOn() { return fec_on_; }
RTCState *getState() { return state_; }
bool lossDetected(uint32_t seq);
+ void notifyNewLossDetedcted(uint32_t seq);
+ void requestPossibleLostPacket(uint32_t seq);
+ void receivedFutureNack(uint32_t seq);
void clear();
+ virtual void turnOnRecovery() = 0;
virtual void onNewRound(bool in_sync) = 0;
virtual void newPacketLoss(uint32_t seq) = 0;
virtual void receivedPacket(uint32_t seq) = 0;
@@ -96,7 +104,7 @@ class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> {
void deleteRtx(uint32_t seq);
// fec functions
- uint32_t computeFecPacketsToAsk(bool in_sync);
+ uint32_t computeFecPacketsToAsk();
// common functons
void removePacketState(uint32_t seq);
@@ -105,6 +113,12 @@ class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> {
bool rtx_on_;
bool fec_on_;
+ // number of RTX sent after fec turned on
+ // this is used to take into account jitter and out of order packets
+ // if we detect losses but we do not sent any RTX it means that the holes in
+ // the sequence are caused by the jitter
+ uint32_t rtx_during_fec_;
+
// this map keeps track of the retransmitted interest, ordered from the oldest
// to the newest one. the state contains the timer of the first send of the
// interest (from pendingIntetests_), the timer of the next send (key of the
@@ -117,6 +131,13 @@ class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> {
// lost packets that will be recovered with fec
std::unordered_set<uint32_t> recover_with_fec_;
+ // packet for which we recived a future nack
+ // in case we detect a loss for a nacked packet we send an RTX but we do not
+ // increase the loss counter. this is done because it may happen that the
+ // producer rate checkes over time and in flight interest may be satified by
+ // data packet after the reception of nacks
+ std::unordered_set<uint32_t> nacked_seq_;
+
// rtx vars
std::unique_ptr<asio::steady_timer> timer_;
uint64_t next_rtx_timer_;
@@ -144,7 +165,7 @@ class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> {
uint32_t round_id_; // number of rounds
uint32_t last_fec_used_;
std::vector<fec_state_> fec_per_loss_rate_;
- interface::StrategyCallback *callback_;
+ interface::StrategyCallback callback_;
};
} // end namespace rtc
diff --git a/libtransport/src/protocols/rtc/rtc_rs_delay.cc b/libtransport/src/protocols/rtc/rtc_rs_delay.cc
index e2c60ca77..4be751ec9 100644
--- a/libtransport/src/protocols/rtc/rtc_rs_delay.cc
+++ b/libtransport/src/protocols/rtc/rtc_rs_delay.cc
@@ -25,9 +25,9 @@ namespace rtc {
RecoveryStrategyDelayBased::RecoveryStrategyDelayBased(
Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service,
- interface::StrategyCallback *external_callback)
+ interface::StrategyCallback &&external_callback)
: RecoveryStrategy(indexer, std::move(callback), io_service, true, false,
- external_callback), // start with rtx
+ std::move(external_callback)), // start with rtx
congestion_state_(false),
probing_state_(false),
switch_rounds_(0) {}
@@ -37,22 +37,40 @@ RecoveryStrategyDelayBased::RecoveryStrategyDelayBased(RecoveryStrategy &&rs)
setRtxFec(true, false);
// we have to re-init congestion and
// probing
+ switch_rounds_ = 0;
congestion_state_ = false;
probing_state_ = false;
}
RecoveryStrategyDelayBased::~RecoveryStrategyDelayBased() {}
+void RecoveryStrategyDelayBased::turnOnRecovery() {
+ recovery_on_ = true;
+ uint64_t rtt = state_->getMinRTT();
+ uint32_t fec_to_ask = computeFecPacketsToAsk();
+ if (rtt > 80 && fec_to_ask != 0) {
+ // we need to start FEC (see fec only strategy for more details)
+ setRtxFec(true, true);
+ rtx_during_fec_ = 1; // avoid to stop fec
+ indexer_->setNFec(fec_to_ask);
+ } else {
+ // use RTX
+ setRtxFec(true, false);
+ switch_rounds_ = 0;
+ }
+}
+
void RecoveryStrategyDelayBased::softSwitchToFec(uint32_t fec_to_ask) {
if (fec_to_ask == 0) {
setRtxFec(true, false);
switch_rounds_ = 0;
} else {
switch_rounds_++;
- if (switch_rounds_ >= 5) {
+ if (switch_rounds_ >= ((RTC_INTEREST_LIFETIME / ROUND_LEN) * 2) &&
+ rtx_during_fec_ != 0) { // go to fec only if it is needed (RTX are on)
setRtxFec(false, true);
} else {
- setRtxFec({}, true);
+ setRtxFec(true, true);
}
}
}
@@ -76,9 +94,13 @@ void RecoveryStrategyDelayBased::onNewRound(bool in_sync) {
// switch from rtx to fec or keep use fec. Notice that if some rtx are
// waiting to be scheduled, they will be sent normally, but no new rtx will
// be created If the loss rate is 0 keep to use RTX.
- uint32_t fec_to_ask = computeFecPacketsToAsk(in_sync);
+ uint32_t fec_to_ask = computeFecPacketsToAsk();
softSwitchToFec(fec_to_ask);
- indexer_->setNFec(fec_to_ask);
+ if (rtx_during_fec_ == 0) // if we do not send any RTX the losses
+ // registered may be due to jitter
+ indexer_->setNFec(0);
+ else
+ indexer_->setNFec(fec_to_ask);
return;
}
@@ -112,7 +134,7 @@ void RecoveryStrategyDelayBased::probing() {
// for the moment ask for all fec and exit the probing phase
probing_state_ = false;
setRtxFec(false, true);
- indexer_->setNFec(computeFecPacketsToAsk(true));
+ indexer_->setNFec(computeFecPacketsToAsk());
}
} // end namespace rtc
diff --git a/libtransport/src/protocols/rtc/rtc_rs_delay.h b/libtransport/src/protocols/rtc/rtc_rs_delay.h
index 0dd199965..5ca90f4cb 100644
--- a/libtransport/src/protocols/rtc/rtc_rs_delay.h
+++ b/libtransport/src/protocols/rtc/rtc_rs_delay.h
@@ -26,12 +26,13 @@ class RecoveryStrategyDelayBased : public RecoveryStrategy {
public:
RecoveryStrategyDelayBased(Indexer *indexer, SendRtxCallback &&callback,
asio::io_service &io_service,
- interface::StrategyCallback *external_callback);
+ interface::StrategyCallback &&external_callback);
RecoveryStrategyDelayBased(RecoveryStrategy &&rs);
~RecoveryStrategyDelayBased();
+ void turnOnRecovery();
void onNewRound(bool in_sync);
void newPacketLoss(uint32_t seq);
void receivedPacket(uint32_t seq);
diff --git a/libtransport/src/protocols/rtc/rtc_rs_fec_only.cc b/libtransport/src/protocols/rtc/rtc_rs_fec_only.cc
index 36d8e39f0..c44212bda 100644
--- a/libtransport/src/protocols/rtc/rtc_rs_fec_only.cc
+++ b/libtransport/src/protocols/rtc/rtc_rs_fec_only.cc
@@ -25,22 +25,40 @@ namespace rtc {
RecoveryStrategyFecOnly::RecoveryStrategyFecOnly(
Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service,
- interface::StrategyCallback *external_callback)
- : RecoveryStrategy(indexer, std::move(callback), io_service, false, true,
- external_callback),
+ interface::StrategyCallback &&external_callback)
+ : RecoveryStrategy(indexer, std::move(callback), io_service, true, false,
+ std::move(external_callback)),
congestion_state_(false),
probing_state_(false),
switch_rounds_(0) {}
RecoveryStrategyFecOnly::RecoveryStrategyFecOnly(RecoveryStrategy &&rs)
: RecoveryStrategy(std::move(rs)) {
- setRtxFec(false, true);
+ setRtxFec(true, false);
+ switch_rounds_ = 0;
congestion_state_ = false;
probing_state_ = false;
}
RecoveryStrategyFecOnly::~RecoveryStrategyFecOnly() {}
+void RecoveryStrategyFecOnly::turnOnRecovery() {
+ recovery_on_ = true;
+ // init strategy
+ uint32_t fec_to_ask = computeFecPacketsToAsk();
+ if (fec_to_ask > 0) {
+ // the probing phase detected a lossy link. we immedialty start the fec and
+ // we disable the check to prevent to send fec packets before RTX. in fact
+ // here we know that we have losses and it is not a problem of OOO packets
+ setRtxFec(true, true);
+ rtx_during_fec_ = 1; // avoid to stop fec
+ indexer_->setNFec(fec_to_ask);
+ } else {
+ // keep only RTX on
+ setRtxFec(true, true);
+ }
+}
+
void RecoveryStrategyFecOnly::onNewRound(bool in_sync) {
if (!recovery_on_) {
indexer_->setNFec(0);
@@ -66,7 +84,7 @@ void RecoveryStrategyFecOnly::onNewRound(bool in_sync) {
if (probing_state_) {
probing();
} else {
- uint32_t fec_to_ask = computeFecPacketsToAsk(in_sync);
+ uint32_t fec_to_ask = computeFecPacketsToAsk();
// If fec_to_ask == 0 we use rtx even if in these strategy we use only fec.
// In this way the first packet loss that triggers the usage of fec can be
// recovered using rtx, otherwise it will always be lost
@@ -75,13 +93,19 @@ void RecoveryStrategyFecOnly::onNewRound(bool in_sync) {
switch_rounds_ = 0;
} else {
switch_rounds_++;
- if (switch_rounds_ >= 5) {
+ if (switch_rounds_ >= ((RTC_INTEREST_LIFETIME / ROUND_LEN) * 2) &&
+ rtx_during_fec_ !=
+ 0) { // go to fec only if it is needed (RTX are on)
setRtxFec(false, true);
} else {
- setRtxFec({}, true);
+ setRtxFec(true, true); // keep both
}
}
- indexer_->setNFec(fec_to_ask);
+ if (rtx_during_fec_ == 0) // if we do not send any RTX the losses
+ // registered may be due to jitter
+ indexer_->setNFec(0);
+ else
+ indexer_->setNFec(fec_to_ask);
}
}
@@ -92,7 +116,7 @@ void RecoveryStrategyFecOnly::newPacketLoss(uint32_t seq) {
if (!state_->isPending(seq) && !indexer_->isFec(seq)) {
addNewRtx(seq, true);
} else {
- // if not pending add rtc
+ // if not pending add to list to recover with fec
recover_with_fec_.insert(seq);
state_->onPossibleLossWithNoRtx(seq);
}
@@ -107,7 +131,7 @@ void RecoveryStrategyFecOnly::probing() {
// TODO
// for the moment ask for all fec and exit the probing phase
probing_state_ = false;
- uint32_t fec_to_ask = computeFecPacketsToAsk(true);
+ uint32_t fec_to_ask = computeFecPacketsToAsk();
indexer_->setNFec(fec_to_ask);
}
diff --git a/libtransport/src/protocols/rtc/rtc_rs_fec_only.h b/libtransport/src/protocols/rtc/rtc_rs_fec_only.h
index 37b505d35..1ab78b842 100644
--- a/libtransport/src/protocols/rtc/rtc_rs_fec_only.h
+++ b/libtransport/src/protocols/rtc/rtc_rs_fec_only.h
@@ -26,12 +26,13 @@ class RecoveryStrategyFecOnly : public RecoveryStrategy {
public:
RecoveryStrategyFecOnly(Indexer *indexer, SendRtxCallback &&callback,
asio::io_service &io_service,
- interface::StrategyCallback *external_callback);
+ interface::StrategyCallback &&external_callback);
RecoveryStrategyFecOnly(RecoveryStrategy &&rs);
~RecoveryStrategyFecOnly();
+ void turnOnRecovery();
void onNewRound(bool in_sync);
void newPacketLoss(uint32_t seq);
void receivedPacket(uint32_t seq);
diff --git a/libtransport/src/protocols/rtc/rtc_rs_low_rate.cc b/libtransport/src/protocols/rtc/rtc_rs_low_rate.cc
index bd153d209..48dd3e34f 100644
--- a/libtransport/src/protocols/rtc/rtc_rs_low_rate.cc
+++ b/libtransport/src/protocols/rtc/rtc_rs_low_rate.cc
@@ -25,9 +25,9 @@ namespace rtc {
RecoveryStrategyLowRate::RecoveryStrategyLowRate(
Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service,
- interface::StrategyCallback *external_callback)
+ interface::StrategyCallback &&external_callback)
: RecoveryStrategy(indexer, std::move(callback), io_service, false, true,
- external_callback), // start with fec
+ std::move(external_callback)), // start with fec
fec_consecutive_rounds_((MILLI_IN_A_SEC / ROUND_LEN) * 5), // 5 sec
rtx_allowed_consecutive_rounds_(0) {
initSwitchVector();
@@ -66,7 +66,7 @@ void RecoveryStrategyLowRate::setRecoveryParameters(bool use_rtx, bool use_fec,
}
void RecoveryStrategyLowRate::selectRecoveryStrategy(bool in_sync) {
- uint32_t fec_to_ask = computeFecPacketsToAsk(in_sync);
+ uint32_t fec_to_ask = computeFecPacketsToAsk();
if (fec_to_ask == 0) {
// fec is off, turn on RTX immediatly to avoid packet losses
setRecoveryParameters(true, false, 0);
@@ -128,6 +128,11 @@ void RecoveryStrategyLowRate::selectRecoveryStrategy(bool in_sync) {
}
}
+void RecoveryStrategyLowRate::turnOnRecovery() {
+ recovery_on_ = 1;
+ // the stategy will be init in the new round function
+}
+
void RecoveryStrategyLowRate::onNewRound(bool in_sync) {
if (!recovery_on_) {
// disable fec so that no extra packet will be sent
diff --git a/libtransport/src/protocols/rtc/rtc_rs_low_rate.h b/libtransport/src/protocols/rtc/rtc_rs_low_rate.h
index f0c7bd0d5..d66b197e2 100644
--- a/libtransport/src/protocols/rtc/rtc_rs_low_rate.h
+++ b/libtransport/src/protocols/rtc/rtc_rs_low_rate.h
@@ -34,12 +34,13 @@ class RecoveryStrategyLowRate : public RecoveryStrategy {
public:
RecoveryStrategyLowRate(Indexer *indexer, SendRtxCallback &&callback,
asio::io_service &io_service,
- interface::StrategyCallback *external_callback);
+ interface::StrategyCallback &&external_callback);
RecoveryStrategyLowRate(RecoveryStrategy &&rs);
~RecoveryStrategyLowRate();
+ void turnOnRecovery();
void onNewRound(bool in_sync);
void newPacketLoss(uint32_t seq);
void receivedPacket(uint32_t seq);
diff --git a/libtransport/src/protocols/rtc/rtc_rs_recovery_off.cc b/libtransport/src/protocols/rtc/rtc_rs_recovery_off.cc
index 499e978f1..16b14eff6 100644
--- a/libtransport/src/protocols/rtc/rtc_rs_recovery_off.cc
+++ b/libtransport/src/protocols/rtc/rtc_rs_recovery_off.cc
@@ -25,9 +25,9 @@ namespace rtc {
RecoveryStrategyRecoveryOff::RecoveryStrategyRecoveryOff(
Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service,
- interface::StrategyCallback *external_callback)
+ interface::StrategyCallback &&external_callback)
: RecoveryStrategy(indexer, std::move(callback), io_service, false, false,
- external_callback) {}
+ std::move(external_callback)) {}
RecoveryStrategyRecoveryOff::RecoveryStrategyRecoveryOff(RecoveryStrategy &&rs)
: RecoveryStrategy(std::move(rs)) {
@@ -36,6 +36,10 @@ RecoveryStrategyRecoveryOff::RecoveryStrategyRecoveryOff(RecoveryStrategy &&rs)
RecoveryStrategyRecoveryOff::~RecoveryStrategyRecoveryOff() {}
+void RecoveryStrategyRecoveryOff::turnOnRecovery() {
+ // nothing to do
+ return;
+}
void RecoveryStrategyRecoveryOff::onNewRound(bool in_sync) {
// nothing to do
return;
diff --git a/libtransport/src/protocols/rtc/rtc_rs_recovery_off.h b/libtransport/src/protocols/rtc/rtc_rs_recovery_off.h
index 98cd1e6a5..3a9e71e7d 100644
--- a/libtransport/src/protocols/rtc/rtc_rs_recovery_off.h
+++ b/libtransport/src/protocols/rtc/rtc_rs_recovery_off.h
@@ -26,12 +26,13 @@ class RecoveryStrategyRecoveryOff : public RecoveryStrategy {
public:
RecoveryStrategyRecoveryOff(Indexer *indexer, SendRtxCallback &&callback,
asio::io_service &io_service,
- interface::StrategyCallback *external_callback);
+ interface::StrategyCallback &&external_callback);
RecoveryStrategyRecoveryOff(RecoveryStrategy &&rs);
~RecoveryStrategyRecoveryOff();
+ void turnOnRecovery();
void onNewRound(bool in_sync);
void newPacketLoss(uint32_t seq);
void receivedPacket(uint32_t seq);
diff --git a/libtransport/src/protocols/rtc/rtc_rs_rtx_only.cc b/libtransport/src/protocols/rtc/rtc_rs_rtx_only.cc
index c1ae9b53d..8e5db5439 100644
--- a/libtransport/src/protocols/rtc/rtc_rs_rtx_only.cc
+++ b/libtransport/src/protocols/rtc/rtc_rs_rtx_only.cc
@@ -25,9 +25,9 @@ namespace rtc {
RecoveryStrategyRtxOnly::RecoveryStrategyRtxOnly(
Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service,
- interface::StrategyCallback *external_callback)
+ interface::StrategyCallback &&external_callback)
: RecoveryStrategy(indexer, std::move(callback), io_service, true, false,
- external_callback) {}
+ std::move(external_callback)) {}
RecoveryStrategyRtxOnly::RecoveryStrategyRtxOnly(RecoveryStrategy &&rs)
: RecoveryStrategy(std::move(rs)) {
@@ -36,6 +36,11 @@ RecoveryStrategyRtxOnly::RecoveryStrategyRtxOnly(RecoveryStrategy &&rs)
RecoveryStrategyRtxOnly::~RecoveryStrategyRtxOnly() {}
+void RecoveryStrategyRtxOnly::turnOnRecovery() {
+ recovery_on_ = true;
+ setRtxFec(true, false);
+}
+
void RecoveryStrategyRtxOnly::onNewRound(bool in_sync) {
// nothing to do
return;
diff --git a/libtransport/src/protocols/rtc/rtc_rs_rtx_only.h b/libtransport/src/protocols/rtc/rtc_rs_rtx_only.h
index 7ae909454..e90e5ba13 100644
--- a/libtransport/src/protocols/rtc/rtc_rs_rtx_only.h
+++ b/libtransport/src/protocols/rtc/rtc_rs_rtx_only.h
@@ -26,12 +26,13 @@ class RecoveryStrategyRtxOnly : public RecoveryStrategy {
public:
RecoveryStrategyRtxOnly(Indexer *indexer, SendRtxCallback &&callback,
asio::io_service &io_service,
- interface::StrategyCallback *external_callback);
+ interface::StrategyCallback &&external_callback);
RecoveryStrategyRtxOnly(RecoveryStrategy &&rs);
~RecoveryStrategyRtxOnly();
+ void turnOnRecovery();
void onNewRound(bool in_sync);
void newPacketLoss(uint32_t seq);
void receivedPacket(uint32_t seq);
diff --git a/libtransport/src/protocols/rtc/rtc_state.cc b/libtransport/src/protocols/rtc/rtc_state.cc
index 6a21531f8..5b3b5e4c3 100644
--- a/libtransport/src/protocols/rtc/rtc_state.cc
+++ b/libtransport/src/protocols/rtc/rtc_state.cc
@@ -56,7 +56,6 @@ void RTCState::initParams() {
last_seq_nacked_ = 0;
loss_rate_ = 0.0;
avg_loss_rate_ = -1.0;
- max_loss_rate_ = 0.0;
last_round_loss_rate_ = 0.0;
// loss rate per sec
@@ -85,14 +84,13 @@ void RTCState::initParams() {
fec_recovered_rate_ = 0.0;
// nack counter
- nack_on_last_round_ = false;
+ past_nack_on_last_round_ = false;
received_nacks_last_round_ = 0;
// packets counter
received_packets_last_round_ = 0;
received_data_last_round_ = 0;
received_data_from_cache_ = 0;
- data_from_cache_rate_ = 0;
sent_interests_last_round_ = 0;
sent_rtx_last_round_ = 0;
@@ -103,7 +101,7 @@ void RTCState::initParams() {
last_production_seq_ = 0;
producer_is_active_ = false;
- last_prod_update_ = 0;
+ last_prod_update_seq_ = 0;
// paths stats
path_table_.clear();
@@ -180,7 +178,9 @@ void RTCState::onLossDetected(uint32_t seq) {
PacketState state = getPacketState(seq);
// if the packet is already marked with a state, do nothing
- if (state == PacketState::UNKNOWN) {
+ // to be considered lost the packet must be pending
+ if (state == PacketState::UNKNOWN &&
+ pending_interests_.find(seq) != pending_interests_.end()) {
packets_lost_++;
addToPacketCache(seq, PacketState::LOST);
}
@@ -225,8 +225,8 @@ void RTCState::onDataPacketReceived(const core::ContentObject &content_object,
core::ParamsRTC params = RTCState::getDataParams(content_object);
- if (last_prod_update_ < params.timestamp) {
- last_prod_update_ = params.timestamp;
+ if (last_prod_update_seq_ < seq) {
+ last_prod_update_seq_ = seq;
production_rate_ = (double)params.prod_rate;
}
@@ -267,14 +267,12 @@ void RTCState::onNackPacketReceived(const core::ContentObject &nack,
uint32_t seq = nack.getName().getSuffix();
struct nack_packet_t *nack_pkt =
(struct nack_packet_t *)nack.getPayload()->data();
- uint64_t production_time = nack_pkt->getTimestamp();
uint32_t production_seq = nack_pkt->getProductionSegment();
uint32_t production_rate = nack_pkt->getProductionRate();
if (TRANSPORT_EXPECT_FALSE(main_path_ == nullptr) ||
- last_prod_update_ < production_time) {
+ last_prod_update_seq_ < production_seq) {
// update production rate
- last_prod_update_ = production_time;
last_production_seq_ = production_seq;
production_rate_ = (double)production_rate;
}
@@ -282,7 +280,6 @@ void RTCState::onNackPacketReceived(const core::ContentObject &nack,
if (compute_stats) {
// this is not an RTX
updatePathStats(nack, true);
- nack_on_last_round_ = true;
}
// for statistics pourpose we log all nacks, also the one received for
@@ -297,6 +294,7 @@ void RTCState::onNackPacketReceived(const core::ContentObject &nack,
if (last_seq_nacked_ < seq) last_seq_nacked_ = seq;
DLOG_IF(INFO, VLOG_IS_ON(3))
<< "lost packet " << seq << " beacuse of a past nack";
+ if (compute_stats) past_nack_on_last_round_ = true;
onPacketLost(seq);
} else if (seq > production_seq) {
// future nack
@@ -317,29 +315,15 @@ void RTCState::onNackPacketReceived(const core::ContentObject &nack,
}
}
- // the producer is responding
- // we consider it active only if the production rate is not 0
- // or the production sequence number is not 1
- if (production_rate_ != 0 || production_seq != 1) {
- producer_is_active_ = true;
- }
-
received_packets_last_round_++;
}
void RTCState::onPacketLost(uint32_t seq) {
-#if 0
- DLOG_IF(INFO, VLOG_IS_ON(3)) << "packet " << seq << " is lost";
- auto it = pending_interests_.find(seq);
- if (it != pending_interests_.end()) {
- // this packet was never retransmitted so it does
- // not appear in the loss count
- packets_lost_++;
- }
-#endif
if (!indexer_->isFec(seq)) {
PacketState state = getPacketState(seq);
- if (state == PacketState::LOST || state == PacketState::UNKNOWN) {
+ if (state == PacketState::LOST ||
+ (state == PacketState::UNKNOWN &&
+ pending_interests_.find(seq) != pending_interests_.end())) {
definitely_lost_pkt_++;
DLOG_IF(INFO, VLOG_IS_ON(4)) << "packet " << seq << " is lost";
}
@@ -350,7 +334,12 @@ void RTCState::onPacketLost(uint32_t seq) {
void RTCState::onPacketRecoveredRtx(uint32_t seq) {
packets_sent_to_app_++;
if (seq > highest_seq_received_) highest_seq_received_ = seq;
- losses_recovered_++;
+
+ // increase the recovered packet counter only if the packet was marked as LOST
+ // before.
+ PacketState state = getPacketState(seq);
+ if (state == PacketState::LOST) losses_recovered_++;
+
addRecvOrLost(seq, PacketState::RECEIVED);
}
@@ -371,19 +360,30 @@ void RTCState::onPacketRecoveredFec(uint32_t seq, uint32_t size) {
// adding header to the count
recovered_bytes_with_fec_ += 60; // XXX get header size some where
- if (getPacketState(seq) == PacketState::UNKNOWN)
- onLossDetected(seq); // the pkt was lost but didn't account for it yet
+ // the packet could be not marked as lost yet. onLossDetected checks if add in
+ // the packet in the lost count or not
+ onLossDetected(seq);
addRecvOrLost(seq, PacketState::RECEIVED);
}
bool RTCState::onProbePacketReceived(const core::ContentObject &probe) {
uint32_t seq = probe.getName().getSuffix();
+ core::ParamsRTC params = RTCState::getProbeParams(probe);
+
+ bool is_valid = true;
+ uint32_t max = UINT32_MAX;
+ if (params.prod_rate == max) is_valid = false;
uint64_t rtt;
- rtt = probe_handler_->getRtt(seq);
+ rtt = probe_handler_->getRtt(seq, is_valid);
if (rtt == 0) return false; // this is not a valid probe
+ if (!is_valid) return false; // not a valid probe
+
+ // if we are here the producer is active
+ producer_is_active_ = true;
+
// Like for data and nacks update the path stats. Here the RTT is computed
// by the probe handler. Both probes for rtt and bw are good to estimate
// info on the path.
@@ -406,24 +406,14 @@ bool RTCState::onProbePacketReceived(const core::ContentObject &probe) {
uint64_t now = utils::SteadyTime::nowMs().count();
- core::ParamsRTC params = RTCState::getProbeParams(probe);
-
int64_t OWD = now - params.timestamp;
path->insertOwdSample(OWD);
- if (last_prod_update_ < params.timestamp) {
+ if (last_prod_update_seq_ < params.prod_seg) {
last_production_seq_ = params.prod_seg;
- last_prod_update_ = params.timestamp;
production_rate_ = (double)params.prod_rate;
}
- // the producer is responding
- // we consider it active only if the production rate is not 0
- // or the production sequence numner is not 1
- if (production_rate_ != 0 || params.prod_seg != 1) {
- producer_is_active_ = true;
- }
-
// check for init RTT. if received_probes_ is equal to 0 schedule a timer to
// wait for the INIT_RTT_PROBES. in this way if some probes get lost we don't
// wait forever
@@ -453,12 +443,12 @@ bool RTCState::onProbePacketReceived(const core::ContentObject &probe) {
void RTCState::onJumpForward(uint32_t next_seq) {
for (uint32_t seq = highest_seq_received_in_order_ + 1; seq < next_seq;
seq++) {
- auto it = pending_interests_.find(seq);
PacketState packet_state = getPacketState(seq);
- if (it == pending_interests_.end() &&
- packet_state != PacketState::RECEIVED &&
+ if (packet_state != PacketState::RECEIVED &&
packet_state != PacketState::DEFINITELY_LOST) {
- onLossDetected(seq);
+ // here we considere the packet as definitely lost whitout increase the
+ // lost packet counter because this loss is not due to the network
+ // condition but the transport wants to skip the packet
onPacketLost(seq);
}
}
@@ -491,29 +481,6 @@ void RTCState::onNewRound(double round_len, bool in_sync) {
fec_recovered_rate_ =
(fec_recovered_rate_ * 0.8) + (0.2 * fec_recovered_bytes_per_sec);
-#if 0
- // search for an active path. There should be only one active path (meaning a
- // path that leads to the producer socket -no cache- and from which we are
- // currently getting data packets) at any time. However it may happen that
- // there are mulitple active paths in case of mobility (the old path will
- // remain active for a short ammount of time). The main path is selected as
- // the active path from where the consumer received the latest data packet
-
- uint64_t last_packet_ts = 0;
- main_path_ = nullptr;
-
- for (auto it = path_table_.begin(); it != path_table_.end(); it++) {
- it->second->roundEnd();
- if (it->second->isActive()) {
- uint64_t ts = it->second->getLastPacketTS();
- if (ts > last_packet_ts) {
- last_packet_ts = ts;
- main_path_ = it->second;
- }
- }
- }
-#endif
-
// search for an active path. Is it possible to have multiple path that are
// used at the same time. We use as reference path the one from where we gets
// more packets. This means that the path should have better lantecy or less
@@ -544,7 +511,7 @@ void RTCState::onNewRound(double round_len, bool in_sync) {
updateLossRate(in_sync);
// handle nacks
- if (!nack_on_last_round_ && received_bytes_ > 0) {
+ if (!past_nack_on_last_round_ && received_bytes_ > 0) {
rounds_without_nacks_++;
} else {
rounds_without_nacks_ = 0;
@@ -561,14 +528,6 @@ void RTCState::onNewRound(double round_len, bool in_sync) {
}
}
- // compute cache/producer ratio
- if (received_data_last_round_ != 0) {
- double new_rate =
- (double)received_data_from_cache_ / (double)received_data_last_round_;
- data_from_cache_rate_ = data_from_cache_rate_ * MOVING_AVG_ALPHA +
- (new_rate * (1 - MOVING_AVG_ALPHA));
- }
-
// reset counters
received_bytes_ = 0;
received_fec_bytes_ = 0;
@@ -578,7 +537,7 @@ void RTCState::onNewRound(double round_len, bool in_sync) {
losses_recovered_ = 0;
first_seq_in_round_ = highest_seq_received_;
- nack_on_last_round_ = false;
+ past_nack_on_last_round_ = false;
received_nacks_last_round_ = 0;
received_packets_last_round_ = 0;
@@ -700,6 +659,7 @@ void RTCState::updateLossRate(bool in_sync) {
expected_packets_ +=
((highest_seq_received_ - first_seq_in_round_) - fec_packets);
} else {
+ expected_packets_ = 0;
packets_sent_to_app_ = 0;
}
@@ -715,7 +675,6 @@ void RTCState::updateLossRate(bool in_sync) {
(double)((double)(lost_per_sec_) / (double)total_expected_packets_);
loss_history_.pushBack(per_sec_loss_rate_);
- max_loss_rate_ = getMaxLoss();
if (in_sync && expected_packets_ != 0) {
// compute residual loss rate
@@ -778,7 +737,9 @@ void RTCState::addRecvOrLost(uint32_t seq, PacketState state) {
// however we may need to increse the number or lost packets
// XXX: in case we want to use rtx to recover fec packets,
// this may prevent to detect a packet loss and no rtx will be sent
- onLossDetected(i);
+ if (TRANSPORT_EXPECT_TRUE(i >= first_interest_sent_seq_)) {
+ onLossDetected(i);
+ }
} else {
// this is a data packet and we need to get it
break;
@@ -806,8 +767,9 @@ void RTCState::setInitRttTimer(uint32_t wait) {
}
void RTCState::checkInitRttTimer() {
- if (received_probes_ < INIT_RTT_MIN_PROBES_TO_RECV) {
- // we didn't received enough probes, restart
+ if (received_probes_ < INIT_RTT_MIN_PROBES_TO_RECV ||
+ probe_handler_->getProbeLossRate() == 1.0) {
+ // we didn't received enough probes or they were not valid, restart
received_probes_ = 0;
probe_handler_->setSuffixRange(MIN_INIT_PROBE_SEQ, MAX_INIT_PROBE_SEQ);
probe_handler_->setProbes(INIT_RTT_PROBE_INTERVAL, INIT_RTT_PROBES);
@@ -819,7 +781,6 @@ void RTCState::checkInitRttTimer() {
init_rtt_ = true;
main_path_->roundEnd();
loss_history_.pushBack(probe_handler_->getProbeLossRate());
- max_loss_rate_ = getMaxLoss();
probe_handler_->setSuffixRange(MIN_RTT_PROBE_SEQ, MAX_RTT_PROBE_SEQ);
probe_handler_->setProbes(RTT_PROBE_INTERVAL, 0);
@@ -829,17 +790,12 @@ void RTCState::checkInitRttTimer() {
double prod_rate = getProducerRate();
double rtt = (double)getMinRTT() / MILLI_IN_A_SEC;
double packet_size = getAveragePacketSize();
- uint32_t pkt_in_rtt_ = std::floor(((prod_rate / packet_size) * rtt) * 0.8);
+ uint32_t pkt_in_rtt_ = std::floor(((prod_rate / packet_size) * rtt));
last_seq_nacked_ = last_production_seq_ + pkt_in_rtt_;
discovered_rtt_callback_();
}
-double RTCState::getMaxLoss() {
- if (loss_history_.size() != 0) return loss_history_.begin();
- return 0;
-}
-
core::ParamsRTC RTCState::getProbeParams(const core::ContentObject &probe) {
uint32_t seq = probe.getName().getSuffix();
core::ParamsRTC params;
diff --git a/libtransport/src/protocols/rtc/rtc_state.h b/libtransport/src/protocols/rtc/rtc_state.h
index 8bf48ccc2..4bd2f76a0 100644
--- a/libtransport/src/protocols/rtc/rtc_state.h
+++ b/libtransport/src/protocols/rtc/rtc_state.h
@@ -162,7 +162,11 @@ class RTCState : public std::enable_shared_from_this<RTCState> {
double getPerRoundLossRate() const { return loss_rate_; }
double getPerSecondLossRate() const { return per_sec_loss_rate_; }
double getAvgLossRate() const { return avg_loss_rate_; }
- double getMaxLossRate() const { return max_loss_rate_; }
+ double getMaxLossRate() const {
+ if (loss_history_.size() != 0) return loss_history_.begin();
+ return 0;
+ }
+
double getLastRoundLossRate() const { return last_round_loss_rate_; }
double getResidualLossRate() const { return residual_loss_rate_; }
@@ -177,8 +181,6 @@ class RTCState : public std::enable_shared_from_this<RTCState> {
return highest_seq_received_in_order_;
}
- double getMaxLoss();
-
// fec packets
uint32_t getReceivedFecPackets() const { return received_fec_pkt_; }
uint32_t getPendingFecPackets() const { return pending_fec_pkt_; }
@@ -213,7 +215,13 @@ class RTCState : public std::enable_shared_from_this<RTCState> {
bool isProducerActive() const { return producer_is_active_; }
// packets from cache
- double getPacketFromCacheRatio() const { return data_from_cache_rate_; }
+ // this should be called at the end of a round beacuse otherwise we may have
+ // not enough packets to get a good stat
+ double getPacketFromCacheRatio() const {
+ if (received_data_last_round_ == 0) return 0;
+ return (double)received_data_from_cache_ /
+ (double)received_data_last_round_;
+ }
PendingInterestsMap::iterator getPendingInterestsMapBegin() {
return pending_interests_.begin();
@@ -286,7 +294,6 @@ class RTCState : public std::enable_shared_from_this<RTCState> {
uint32_t last_seq_nacked_; // segment for which we got an oldNack
double loss_rate_;
double avg_loss_rate_;
- double max_loss_rate_;
double last_round_loss_rate_;
utils::MaxFilter<double> loss_history_;
@@ -314,17 +321,16 @@ class RTCState : public std::enable_shared_from_this<RTCState> {
double fec_recovered_rate_; // rate recovered using fec
// nack counters
- // the bool takes tracks only about the valid nacks (no rtx) and it is used to
- // switch between the states. Instead received_nacks_last_round_ logs all the
- // nacks for statistics
- bool nack_on_last_round_;
+ // the bool takes tracks only about the valid past nacks (no rtx) and it is
+ // used to switch between the states. Instead received_nacks_last_round_ logs
+ // all the nacks for statistics
+ bool past_nack_on_last_round_;
uint32_t received_nacks_last_round_;
// packets counters
uint32_t received_packets_last_round_;
uint32_t received_data_last_round_;
uint32_t received_data_from_cache_;
- double data_from_cache_rate_;
uint32_t sent_interests_last_round_;
uint32_t sent_rtx_last_round_;
@@ -344,10 +350,13 @@ class RTCState : public std::enable_shared_from_this<RTCState> {
// producer state
bool
producer_is_active_; // the prodcuer is active if we receive some packets
- uint32_t
- last_production_seq_; // last production seq received by the producer
- uint64_t last_prod_update_; // timestamp of the last packets used to update
- // stats from the producer
+ uint32_t last_production_seq_; // last production seq received by the
+ // producer used to init the sync protcol
+ uint32_t last_prod_update_seq_; // seq number of the last packet used to
+ // update the update from the producer.
+ // assumption: the highest seq number carries
+ // the most up to date info. in case of
+ // probes we look at the produced seq number
// paths stats
std::unordered_map<uint32_t, std::shared_ptr<RTCDataPath>> path_table_;
diff --git a/libtransport/src/protocols/rtc/rtc_verifier.cc b/libtransport/src/protocols/rtc/rtc_verifier.cc
index 29968dd02..7b6330a1f 100644
--- a/libtransport/src/protocols/rtc/rtc_verifier.cc
+++ b/libtransport/src/protocols/rtc/rtc_verifier.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Copyright (c) 2017-2022 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -22,8 +22,11 @@ namespace protocol {
namespace rtc {
RTCVerifier::RTCVerifier(std::shared_ptr<auth::Verifier> verifier,
- uint32_t max_unverified_delay)
- : verifier_(verifier), max_unverified_delay_(max_unverified_delay) {}
+ uint32_t max_unverified_interval,
+ double max_unverified_ratio)
+ : verifier_(verifier),
+ max_unverified_interval_(max_unverified_interval),
+ max_unverified_ratio_(max_unverified_ratio) {}
void RTCVerifier::setState(std::shared_ptr<RTCState> rtc_state) {
rtc_state_ = rtc_state;
@@ -33,15 +36,20 @@ void RTCVerifier::setVerifier(std::shared_ptr<auth::Verifier> verifier) {
verifier_ = verifier;
}
-void RTCVerifier::setMaxUnverifiedDelay(uint32_t max_unverified_delay) {
- max_unverified_delay_ = max_unverified_delay;
+void RTCVerifier::setMaxUnverifiedInterval(uint32_t max_unverified_interval) {
+ max_unverified_interval_ = max_unverified_interval;
+}
+
+void RTCVerifier::setMaxUnverifiedRatio(double max_unverified_ratio) {
+ max_unverified_ratio_ = max_unverified_ratio;
}
auth::VerificationPolicy RTCVerifier::verify(
core::ContentObject &content_object, bool is_fec) {
- uint32_t suffix = content_object.getName().getSuffix();
- core::PayloadType payload_type = content_object.getPayloadType();
+ auth::Suffix suffix = content_object.getName().getSuffix();
+ auth::VerificationPolicy default_policy = auth::VerificationPolicy::ABORT;
+ core::PayloadType payload_type = content_object.getPayloadType();
bool is_probe = ProbeHandler::getProbeType(suffix) != ProbeType::NOT_PROBE;
bool is_nack = !is_probe && content_object.payloadSize() == NACK_HEADER_SIZE;
bool is_manifest = !is_probe && !is_nack && !is_fec &&
@@ -55,29 +63,31 @@ auth::VerificationPolicy RTCVerifier::verify(
if (is_data) return verifyData(content_object);
if (is_manifest) return verifyManifest(content_object);
- auth::VerificationPolicy policy = auth::VerificationPolicy::ABORT;
- verifier_->callVerificationFailedCallback(suffix, policy);
- return policy;
+ verifier_->callVerificationFailedCallback(suffix, default_policy);
+ return default_policy;
}
auth::VerificationPolicy RTCVerifier::verifyProbe(
core::ContentObject &content_object) {
- switch (ProbeHandler::getProbeType(content_object.getName().getSuffix())) {
- case ProbeType::INIT: {
- auth::VerificationPolicy policy = verifyManifest(content_object);
- if (policy != auth::VerificationPolicy::ACCEPT) {
- return policy;
+ auth::Suffix suffix = content_object.getName().getSuffix();
+ auth::VerificationPolicy policy = auth::VerificationPolicy::ABORT;
+
+ switch (ProbeHandler::getProbeType(suffix)) {
+ case ProbeType::INIT:
+ policy = verifyManifest(content_object);
+ if (policy == auth::VerificationPolicy::ACCEPT) {
+ policy = processManifest(content_object);
}
- return processManifest(content_object);
- }
+ break;
case ProbeType::RTT:
- return verifyNack(content_object);
+ policy = verifyNack(content_object);
+ break;
default:
- auth::VerificationPolicy policy = auth::VerificationPolicy::ABORT;
- verifier_->callVerificationFailedCallback(
- content_object.getName().getSuffix(), policy);
- return policy;
+ verifier_->callVerificationFailedCallback(suffix, policy);
+ break;
}
+
+ return policy;
}
auth::VerificationPolicy RTCVerifier::verifyNack(
@@ -92,28 +102,30 @@ auth::VerificationPolicy RTCVerifier::verifyFec(
auth::VerificationPolicy RTCVerifier::verifyData(
core::ContentObject &content_object) {
- uint32_t suffix = content_object.getName().getSuffix();
-
if (_is_ah(content_object.getFormat())) {
return verifier_->verifyPackets(&content_object);
}
- unverified_bytes_[suffix] =
- content_object.headerSize() + content_object.payloadSize();
- unverified_packets_[suffix] =
- content_object.computeDigest(manifest_hash_algo_);
+ auth::Suffix suffix = content_object.getName().getSuffix();
+ auth::VerificationPolicy policy = auth::VerificationPolicy::ABORT;
+ Timestamp now = utils::SteadyTime::nowMs().count();
+
+ // Flush old packets
+ Timestamp oldest = flush_packets(now);
- // An alert is raised when too much packets remain unverified
- if (getTotalUnverified() > max_unverified_bytes_) {
- unverified_bytes_.clear();
- unverified_packets_.clear();
+ // Add packet to map of unverified packets
+ packets_unverif_.add(
+ {.suffix = suffix, .timestamp = now, .size = content_object.length()},
+ content_object.computeDigest(manifest_hash_algo_));
- auth::VerificationPolicy policy = auth::VerificationPolicy::ABORT;
- verifier_->callVerificationFailedCallback(suffix, policy);
- return policy;
+ // Check that the ratio of unverified packets stays below the limit
+ if (now - oldest < max_unverified_interval_ ||
+ getBufferRatio() < max_unverified_ratio_) {
+ policy = auth::VerificationPolicy::ACCEPT;
}
- return auth::VerificationPolicy::ACCEPT;
+ verifier_->callVerificationFailedCallback(suffix, policy);
+ return policy;
}
auth::VerificationPolicy RTCVerifier::verifyManifest(
@@ -123,8 +135,10 @@ auth::VerificationPolicy RTCVerifier::verifyManifest(
auth::VerificationPolicy RTCVerifier::processManifest(
core::ContentObject &content_object) {
- uint32_t suffix = content_object.getName().getSuffix();
+ auth::Suffix suffix = content_object.getName().getSuffix();
+ auth::VerificationPolicy accept_policy = auth::VerificationPolicy::ACCEPT;
+ // Decode manifest
core::ContentObjectManifest manifest(content_object);
manifest.decode();
@@ -133,65 +147,62 @@ auth::VerificationPolicy RTCVerifier::processManifest(
last_manifest_ = suffix;
}
- // Extract parameters
+ // Extract hash algorithm and hashes
manifest_hash_algo_ = manifest.getHashAlgorithm();
- core::ParamsRTC params = manifest.getParamsRTC();
-
- if (params.prod_rate > 0) {
- max_unverified_bytes_ = static_cast<uint64_t>(
- (max_unverified_delay_ / 1000.0) * params.prod_rate);
- }
-
- if (max_unverified_bytes_ == 0 || !rtc_state_) {
- auth::VerificationPolicy policy = auth::VerificationPolicy::ABORT;
- verifier_->callVerificationFailedCallback(suffix, policy);
- return policy;
- }
-
- // Extract hashes
auth::Verifier::SuffixMap suffix_map =
core::ContentObjectManifest::getSuffixMap(&manifest);
// Return early if the manifest is empty
if (suffix_map.empty()) {
- return auth::VerificationPolicy::ACCEPT;
+ verifier_->callVerificationFailedCallback(suffix, accept_policy);
+ return accept_policy;
}
- // Remove lost packets from digest map
+ // Add hashes to map of all manifest hashes
manifest_digests_.insert(suffix_map.begin(), suffix_map.end());
+
+ // Remove discarded and definitely lost packets from digest map
for (auto it = manifest_digests_.begin(); it != manifest_digests_.end();) {
+ auto it_erased = packets_unverif_erased_.find(it->first);
+
+ if (it_erased != packets_unverif_erased_.end()) {
+ packets_unverif_erased_.erase(it_erased);
+ it = manifest_digests_.erase(it);
+ continue;
+ }
+
if (rtc_state_->getPacketState(it->first) == PacketState::DEFINITELY_LOST) {
- unverified_packets_.erase(it->first);
- unverified_bytes_.erase(it->first);
it = manifest_digests_.erase(it);
- } else {
- ++it;
+ continue;
}
+
+ ++it;
}
// Verify packets
auth::Verifier::PolicyMap policies =
- verifier_->verifyHashes(unverified_packets_, manifest_digests_);
+ verifier_->verifyHashes(packets_unverif_.suffixMap(), manifest_digests_);
- for (const auto &policy : policies) {
- switch (policy.second) {
+ for (const auto &p : policies) {
+ switch (p.second) {
case auth::VerificationPolicy::ACCEPT: {
- manifest_digests_.erase(policy.first);
- unverified_packets_.erase(policy.first);
- unverified_bytes_.erase(policy.first);
+ auto packet_unverif_it = packets_unverif_.packetIt(p.first);
+ Packet packet_verif = *packet_unverif_it;
+ packets_unverif_.remove(packet_unverif_it);
+ packets_verif_.add(packet_verif);
+ manifest_digests_.erase(p.first);
break;
}
case auth::VerificationPolicy::UNKNOWN:
break;
case auth::VerificationPolicy::DROP:
case auth::VerificationPolicy::ABORT:
- auth::VerificationPolicy p = policy.second;
- verifier_->callVerificationFailedCallback(policy.first, p);
- return p;
+ return p.second;
}
}
- return auth::VerificationPolicy::ACCEPT;
+ verifier_->callVerificationFailedCallback(suffix, accept_policy);
+ return accept_policy;
}
void RTCVerifier::onDataRecoveredFec(uint32_t suffix) {
@@ -203,35 +214,101 @@ void RTCVerifier::onJumpForward(uint32_t next_suffix) {
return;
}
- // When we jump forward in the suffix sequence, we remove packets that
- // probably won't be verified. Those packets have a suffix in the range
- // [last_manifest_ + 1, next_suffix[.
- for (auto it = unverified_packets_.begin();
- it != unverified_packets_.end();) {
- if (it->first > last_manifest_) {
- unverified_bytes_.erase(it->first);
- it = unverified_packets_.erase(it);
- } else {
- ++it;
+ // When we jump forward in the suffix sequence, we remove packets that won't
+ // be verified. Those packets have a suffix in the range [last_manifest_ + 1,
+ // next_suffix[.
+ for (auth::Suffix suffix = last_manifest_ + 1; suffix < next_suffix;
+ ++suffix) {
+ auto packet_it = packets_unverif_.packetIt(suffix);
+ if (packet_it != packets_unverif_.set().end()) {
+ packets_unverif_.remove(packet_it);
}
}
}
-uint32_t RTCVerifier::getTotalUnverified() const {
- uint32_t total = 0;
+double RTCVerifier::getBufferRatio() const {
+ size_t total = packets_verif_.size() + packets_unverif_.size();
+ double total_unverified = static_cast<double>(packets_unverif_.size());
+ return total ? total_unverified / total : 0.0;
+}
+
+RTCVerifier::Timestamp RTCVerifier::flush_packets(Timestamp now) {
+ Timestamp oldest_verified = packets_verif_.set().empty()
+ ? now
+ : packets_verif_.set().begin()->timestamp;
+ Timestamp oldest_unverified = packets_unverif_.set().empty()
+ ? now
+ : packets_unverif_.set().begin()->timestamp;
+
+ // Prune verified packets older than the unverified interval
+ for (auto it = packets_verif_.set().begin();
+ it != packets_verif_.set().end();) {
+ if (now - it->timestamp < max_unverified_interval_) {
+ break;
+ }
+ it = packets_verif_.remove(it);
+ }
- for (auto bytes : unverified_bytes_) {
- if (bytes.second > UINT32_MAX - total) {
- total = UINT32_MAX;
+ // Prune unverified packets older than the unverified interval
+ for (auto it = packets_unverif_.set().begin();
+ it != packets_unverif_.set().end();) {
+ if (now - it->timestamp < max_unverified_interval_) {
break;
}
- total += bytes.second;
+ packets_unverif_erased_.insert(it->suffix);
+ it = packets_unverif_.remove(it);
+ }
+
+ return std::min(oldest_verified, oldest_unverified);
+}
+
+std::pair<RTCVerifier::PacketSet::iterator, bool> RTCVerifier::Packets::add(
+ const Packet &packet) {
+ auto inserted = packets_.insert(packet);
+ size_ += inserted.second ? packet.size : 0;
+ return inserted;
+}
+
+RTCVerifier::PacketSet::iterator RTCVerifier::Packets::remove(
+ PacketSet::iterator packet_it) {
+ size_ -= packet_it->size;
+ return packets_.erase(packet_it);
+}
+
+const std::set<RTCVerifier::Packet> &RTCVerifier::Packets::set() const {
+ return packets_;
+};
+
+size_t RTCVerifier::Packets::size() const { return size_; };
+
+std::pair<RTCVerifier::PacketSet::iterator, bool>
+RTCVerifier::PacketsUnverif::add(const Packet &packet,
+ const auth::CryptoHash &digest) {
+ auto inserted = add(packet);
+ if (inserted.second) {
+ packets_map_[packet.suffix] = inserted.first;
+ digests_map_[packet.suffix] = digest;
}
+ return inserted;
+}
- return total;
+RTCVerifier::PacketSet::iterator RTCVerifier::PacketsUnverif::remove(
+ PacketSet::iterator packet_it) {
+ size_ -= packet_it->size;
+ packets_map_.erase(packet_it->suffix);
+ digests_map_.erase(packet_it->suffix);
+ return packets_.erase(packet_it);
}
-uint32_t RTCVerifier::getMaxUnverified() const { return max_unverified_bytes_; }
+RTCVerifier::PacketSet::iterator RTCVerifier::PacketsUnverif::packetIt(
+ auth::Suffix suffix) {
+ return packets_map_.at(suffix);
+};
+
+const auth::Verifier::SuffixMap &RTCVerifier::PacketsUnverif::suffixMap()
+ const {
+ return digests_map_;
+}
} // end namespace rtc
} // end namespace protocol
diff --git a/libtransport/src/protocols/rtc/rtc_verifier.h b/libtransport/src/protocols/rtc/rtc_verifier.h
index 596bd8536..098984057 100644
--- a/libtransport/src/protocols/rtc/rtc_verifier.h
+++ b/libtransport/src/protocols/rtc/rtc_verifier.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Copyright (c) 2017-2022 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -26,8 +26,9 @@ namespace rtc {
class RTCVerifier {
public:
- RTCVerifier(std::shared_ptr<auth::Verifier> verifier,
- uint32_t max_unverified_delay);
+ explicit RTCVerifier(std::shared_ptr<auth::Verifier> verifier,
+ uint32_t max_unverified_interval,
+ double max_unverified_ratio);
virtual ~RTCVerifier() = default;
@@ -35,13 +36,9 @@ class RTCVerifier {
void setVerifier(std::shared_ptr<auth::Verifier> verifier);
- void setMaxUnverifiedDelay(uint32_t max_unverified_delay);
+ void setMaxUnverifiedInterval(uint32_t max_unverified_interval);
- void onDataRecoveredFec(uint32_t suffix);
- void onJumpForward(uint32_t next_suffix);
-
- uint32_t getTotalUnverified() const;
- uint32_t getMaxUnverified() const;
+ void setMaxUnverifiedRatio(double max_unverified_ratio);
auth::VerificationPolicy verify(core::ContentObject &content_object,
bool is_fec = false);
@@ -53,27 +50,84 @@ class RTCVerifier {
auth::VerificationPolicy processManifest(core::ContentObject &content_object);
+ void onDataRecoveredFec(uint32_t suffix);
+ void onJumpForward(uint32_t next_suffix);
+
+ double getBufferRatio() const;
+
protected:
+ struct Packet;
+ using Timestamp = uint64_t;
+ using PacketSet = std::set<Packet>;
+
+ struct Packet {
+ auth::Suffix suffix;
+ Timestamp timestamp;
+ size_t size;
+
+ bool operator==(const Packet &b) const {
+ return timestamp == b.timestamp && suffix == b.suffix;
+ }
+ bool operator<(const Packet &b) const {
+ return timestamp == b.timestamp ? suffix < b.suffix
+ : timestamp < b.timestamp;
+ }
+ };
+
+ class Packets {
+ public:
+ virtual std::pair<PacketSet::iterator, bool> add(const Packet &packet);
+ virtual PacketSet::iterator remove(PacketSet::iterator packet_it);
+ const PacketSet &set() const;
+ size_t size() const;
+
+ protected:
+ PacketSet packets_;
+ size_t size_;
+ };
+
+ class PacketsVerif : public Packets {};
+
+ class PacketsUnverif : public Packets {
+ public:
+ using Packets::add;
+ std::pair<PacketSet::iterator, bool> add(const Packet &packet,
+ const auth::CryptoHash &digest);
+ PacketSet::iterator remove(PacketSet::iterator packet_it) override;
+ PacketSet::iterator packetIt(auth::Suffix suffix);
+ const auth::Verifier::SuffixMap &suffixMap() const;
+
+ private:
+ std::unordered_map<auth::Suffix, PacketSet::iterator> packets_map_;
+ auth::Verifier::SuffixMap digests_map_;
+ };
+
// The RTC state.
std::shared_ptr<RTCState> rtc_state_;
// The verifier instance.
std::shared_ptr<auth::Verifier> verifier_;
+ // Window to consider when verifying packets.
+ uint32_t max_unverified_interval_;
+ // Ratio of unverified packets over which an alert is triggered.
+ double max_unverified_ratio_;
+ // The suffix of the last processed manifest.
+ auth::Suffix last_manifest_;
// Hash algorithm used by manifests.
auth::CryptoHashType manifest_hash_algo_;
- // The last manifest processed.
- auth::Suffix last_manifest_;
- // Hold digests extracted from all manifests received.
+ // Digests extracted from all manifests received.
auth::Verifier::SuffixMap manifest_digests_;
- // Hold hashes of all content objects received before they are verified.
- auth::Verifier::SuffixMap unverified_packets_;
- // Hold number of unverified bytes.
- std::unordered_map<auth::Suffix, uint32_t> unverified_bytes_;
- // Maximum delay (in ms) for an unverified byte to become verifed.
- uint32_t max_unverified_delay_;
- // Maximum number of unverified bytes before aborting the connection.
- uint64_t max_unverified_bytes_;
+ // Verified packets with timestamp >= now - max_unverified_interval_.
+ PacketsVerif packets_verif_;
+ // Unverified packets with timestamp >= now - max_unverified_interval_.
+ PacketsUnverif packets_unverif_;
+ // Unverified erased packets with timestamp < now - max_unverified_interval_.
+ std::unordered_set<auth::Suffix> packets_unverif_erased_;
+
+ // Flushes all packets with timestamp < now - max_unverified_interval_.
+ // Returns the timestamp of the oldest packet, verified or not.
+ Timestamp flush_packets(Timestamp now);
};
-} // end namespace rtc
+} // namespace rtc
} // namespace protocol
} // namespace transport
diff --git a/libtransport/src/protocols/transport_protocol.cc b/libtransport/src/protocols/transport_protocol.cc
index f1e49ec0b..a73b9fb7b 100644
--- a/libtransport/src/protocols/transport_protocol.cc
+++ b/libtransport/src/protocols/transport_protocol.cc
@@ -80,12 +80,6 @@ int TransportProtocol::start() {
socket_->getSocketOption(GeneralTransportOptions::ASYNC_MODE, is_async_);
- std::string fec_type_str = "";
- socket_->getSocketOption(GeneralTransportOptions::FEC_TYPE, fec_type_str);
- if (fec_type_str != "") {
- fec_type_ = fec::FECUtils::fecTypeFromString(fec_type_str.c_str());
- }
-
// Set it is the first time we schedule an interest
is_first_ = true;
diff --git a/libtransport/src/test/test_auth.cc b/libtransport/src/test/test_auth.cc
index d7fd55433..0c47dd958 100644
--- a/libtransport/src/test/test_auth.cc
+++ b/libtransport/src/test/test_auth.cc
@@ -117,13 +117,11 @@ TEST_F(AuthTest, AsymmetricBufferRSA) {
std::vector<uint8_t> buffer(payload.begin(), payload.end());
signer->signBuffer(buffer);
- std::vector<uint8_t> sig = signer->getSignature();
+ utils::MemBuf::Ptr sig = signer->getSignature();
std::shared_ptr<AsymmetricVerifier> verif =
std::make_shared<AsymmetricVerifier>(pubKey);
- bool res = verif->verifyBuffer(
- buffer, std::vector<uint8_t>(sig.data(), sig.data() + sig.size()),
- CryptoHashType::SHA256);
+ bool res = verif->verifyBuffer(buffer, sig, CryptoHashType::SHA256);
EXPECT_EQ(res, true);
}
@@ -157,13 +155,11 @@ TEST_F(AuthTest, AsymmetricBufferDSA) {
std::vector<uint8_t> buffer(payload.begin(), payload.end());
signer->signBuffer(buffer);
- std::vector<uint8_t> sig = signer->getSignature();
+ utils::MemBuf::Ptr sig = signer->getSignature();
std::shared_ptr<AsymmetricVerifier> verif =
std::make_shared<AsymmetricVerifier>(pubKey);
- bool res = verif->verifyBuffer(
- buffer, std::vector<uint8_t>(sig.data(), sig.data() + sig.size()),
- CryptoHashType::SHA256);
+ bool res = verif->verifyBuffer(buffer, sig, CryptoHashType::SHA256);
EXPECT_EQ(res, true);
}
@@ -233,13 +229,11 @@ TEST_F(AuthTest, AsymmetricBufferECDSA) {
std::vector<uint8_t> buffer(payload.begin(), payload.end());
signer->signBuffer(buffer);
- std::vector<uint8_t> sig = signer->getSignature();
+ utils::MemBuf::Ptr sig = signer->getSignature();
std::shared_ptr<AsymmetricVerifier> verif =
std::make_shared<AsymmetricVerifier>(pubKey);
- bool res = verif->verifyBuffer(
- buffer, std::vector<uint8_t>(sig.data(), sig.data() + sig.size()),
- CryptoHashType::SHA256);
+ bool res = verif->verifyBuffer(buffer, sig, CryptoHashType::SHA256);
EXPECT_EQ(res, true);
} // namespace auth
@@ -290,11 +284,9 @@ TEST_F(AuthTest, HMACbuffer) {
std::string payload = "bonjour";
std::vector<uint8_t> buffer(payload.begin(), payload.end());
signer->signBuffer(buffer);
- std::vector<uint8_t> sig = signer->getSignature();
+ utils::MemBuf::Ptr sig = signer->getSignature();
SymmetricVerifier hmac(PASSPHRASE);
- bool res = hmac.verifyBuffer(
- buffer, std::vector<uint8_t>(sig.data(), sig.data() + sig.size()),
- CryptoHashType::SHA256);
+ bool res = hmac.verifyBuffer(buffer, sig, CryptoHashType::SHA256);
EXPECT_EQ(res, true);
}
diff --git a/libtransport/src/test/test_core_manifest.cc b/libtransport/src/test/test_core_manifest.cc
index 23fd5e342..b998ce96b 100644
--- a/libtransport/src/test/test_core_manifest.cc
+++ b/libtransport/src/test/test_core_manifest.cc
@@ -173,7 +173,7 @@ TEST_F(ManifestTest, SetParamsRTC) {
.timestamp = 1,
.prod_rate = 2,
.prod_seg = 3,
- .support_fec = 1,
+ .fec_type = protocol::fec::FECType::UNKNOWN,
};
manifest1_.setParamsRTC(params);
diff --git a/libtransport/src/utils/max_filter.h b/libtransport/src/utils/max_filter.h
index 7a2c6aace..db1a1a565 100644
--- a/libtransport/src/utils/max_filter.h
+++ b/libtransport/src/utils/max_filter.h
@@ -28,7 +28,7 @@ class MaxFilter {
public:
MaxFilter(std::size_t size) : size_(size) {}
- std::size_t size() { return by_arrival_.size(); }
+ std::size_t size() const { return by_arrival_.size(); }
template <typename R>
void pushBack(R&& value) {
@@ -45,9 +45,9 @@ class MaxFilter {
by_order_.clear();
}
- const T& begin() { return *by_order_.crbegin(); }
+ const T& begin() const { return *by_order_.crbegin(); }
- const T& rBegin() { return *by_order_.rbegin(); }
+ const T& rBegin() const { return *by_order_.rbegin(); }
private:
std::multiset<T> by_order_;
diff --git a/libtransport/src/utils/min_filter.h b/libtransport/src/utils/min_filter.h
index 4c7ae81f1..4a3882601 100644
--- a/libtransport/src/utils/min_filter.h
+++ b/libtransport/src/utils/min_filter.h
@@ -28,7 +28,7 @@ class MinFilter {
public:
MinFilter(std::size_t size) : size_(size) {}
- std::size_t size() { return by_arrival_.size(); }
+ std::size_t size() const { return by_arrival_.size(); }
template <typename R>
void pushBack(R&& value) {
@@ -45,9 +45,9 @@ class MinFilter {
by_order_.clear();
}
- const T& begin() { return *by_order_.cbegin(); }
+ const T& begin() const { return *by_order_.cbegin(); }
- const T& rBegin() { return *by_order_.crbegin(); }
+ const T& rBegin() const { return *by_order_.crbegin(); }
private:
std::multiset<T> by_order_;