diff options
Diffstat (limited to 'libtransport/src')
41 files changed, 898 insertions, 565 deletions
diff --git a/libtransport/src/auth/crypto_hash.cc b/libtransport/src/auth/crypto_hash.cc index f60f46051..08be47aff 100644 --- a/libtransport/src/auth/crypto_hash.cc +++ b/libtransport/src/auth/crypto_hash.cc @@ -14,6 +14,9 @@ */ #include <hicn/transport/auth/crypto_hash.h> +#include <hicn/transport/core/global_object_pool.h> + +#include "glog/logging.h" namespace transport { namespace auth { @@ -27,18 +30,20 @@ CryptoHash::CryptoHash(const CryptoHash &other) CryptoHash::CryptoHash(CryptoHash &&other) : digest_type_(std::move(other.digest_type_)), - digest_(other.digest_), - digest_size_(other.digest_size_) { - other.reset(); -} + digest_(std::move(other.digest_)), + digest_size_(other.digest_size_) {} -CryptoHash::CryptoHash(CryptoHashType hash_type) { setType(hash_type); } +CryptoHash::CryptoHash(CryptoHashType hash_type) + : digest_(core::PacketManager<>::getInstance().getMemBuf()) { + setType(hash_type); +} CryptoHash::CryptoHash(const uint8_t *hash, size_t size, CryptoHashType hash_type) : digest_type_(hash_type), digest_size_(size) { - digest_.resize(size); - memcpy(digest_.data(), hash, size); + digest_ = core::PacketManager<>::getInstance().getMemBuf(); + digest_->append(size); + memcpy(digest_->writableData(), hash, size); } CryptoHash::CryptoHash(const std::vector<uint8_t> &hash, @@ -55,7 +60,7 @@ CryptoHash &CryptoHash::operator=(const CryptoHash &other) { } bool CryptoHash::operator==(const CryptoHash &other) const { - return (digest_type_ == other.digest_type_ && digest_ == other.digest_); + return (digest_type_ == other.digest_type_ && *digest_ == *other.digest_); } void CryptoHash::computeDigest(const uint8_t *buffer, size_t len) { @@ -65,8 +70,8 @@ void CryptoHash::computeDigest(const uint8_t *buffer, size_t len) { throw errors::RuntimeException("Unknown hash type"); } - EVP_Digest(buffer, len, digest_.data(), (unsigned int *)&digest_size_, - (*hash_evp)(), nullptr); + EVP_Digest(buffer, len, digest_->writableData(), + (unsigned int *)&digest_size_, (*hash_evp)(), nullptr); } void CryptoHash::computeDigest(const std::vector<uint8_t> &buffer) { @@ -95,7 +100,7 @@ void CryptoHash::computeDigest(const utils::MemBuf *buffer) { p = p->next(); } while (p != buffer); - if (EVP_DigestFinal_ex(mcdtx, digest_.data(), + if (EVP_DigestFinal_ex(mcdtx, digest_->writableData(), (unsigned int *)&digest_size_) != 1) { throw errors::RuntimeException("Digest computation failed"); } @@ -103,15 +108,16 @@ void CryptoHash::computeDigest(const utils::MemBuf *buffer) { EVP_MD_CTX_free(mcdtx); } -std::vector<uint8_t> CryptoHash::getDigest() const { return digest_; } +const utils::MemBuf::Ptr &CryptoHash::getDigest() const { return digest_; } std::string CryptoHash::getStringDigest() const { std::stringstream string_digest; string_digest << std::hex << std::setfill('0'); - for (auto byte : digest_) { - string_digest << std::hex << std::setw(2) << static_cast<int>(byte); + for (size_t i = 0; i < digest_size_; ++i) { + string_digest << std::hex << std::setw(2) + << static_cast<int>(digest_->data()[i]); } return string_digest.str(); @@ -122,10 +128,10 @@ CryptoHashType CryptoHash::getType() const { return digest_type_; } size_t CryptoHash::getSize() const { return digest_size_; } void CryptoHash::setType(CryptoHashType hash_type) { - reset(); digest_type_ = hash_type; digest_size_ = CryptoHash::getSize(hash_type); - digest_.resize(digest_size_); + DCHECK(digest_size_ <= digest_->tailroom()); + digest_->setLength(digest_size_); } void CryptoHash::display() { @@ -152,8 +158,8 @@ void CryptoHash::display() { void CryptoHash::reset() { digest_type_ = CryptoHashType::UNKNOWN; - digest_.clear(); digest_size_ = 0; + digest_->setLength(0); } CryptoHashEVP CryptoHash::getEVP(CryptoHashType hash_type) { diff --git a/libtransport/src/auth/signer.cc b/libtransport/src/auth/signer.cc index e74e2f1b8..918e271f5 100644 --- a/libtransport/src/auth/signer.cc +++ b/libtransport/src/auth/signer.cc @@ -17,6 +17,8 @@ #include <hicn/transport/auth/signer.h> #include <hicn/transport/utils/chrono_typedefs.h> +#include "hicn/transport/core/global_object_pool.h" + namespace transport { namespace auth { @@ -24,7 +26,10 @@ namespace auth { // Base Signer // --------------------------------------------------------- Signer::Signer() - : suite_(CryptoSuite::UNKNOWN), signature_len_(0), key_(nullptr) {} + : suite_(CryptoSuite::UNKNOWN), + signature_(core::PacketManager<>::getInstance().getMemBuf()), + signature_len_(0), + key_(nullptr) {} Signer::~Signer() {} @@ -51,8 +56,8 @@ void Signer::signPacket(PacketPtr packet) { packet->setValidationAlgorithm(suite_); // Set key ID - std::vector<uint8_t> key_id = key_id_.getDigest(); - packet->setKeyId({key_id.data(), key_id.size()}); + const utils::MemBuf::Ptr &key_id = key_id_.getDigest(); + packet->setKeyId({key_id->writableData(), key_id->length()}); // Reset fields to compute the packet hash packet->resetForHash(); @@ -93,14 +98,16 @@ void Signer::signBuffer(const std::vector<uint8_t> &buffer) { throw errors::RuntimeException("Digest computation failed"); } - signature_.resize(signature_len_); + DCHECK(signature_len_ <= signature_->tailroom()); + signature_->setLength(signature_len_); - if (EVP_DigestSignFinal(mdctx.get(), signature_.data(), &signature_len_) != - 1) { + if (EVP_DigestSignFinal(mdctx.get(), signature_->writableData(), + &signature_len_) != 1) { throw errors::RuntimeException("Digest computation failed"); } - signature_.resize(signature_len_); + DCHECK(signature_len_ <= signature_->tailroom()); + signature_->setLength(signature_len_); } void Signer::signBuffer(const utils::MemBuf *buffer) { @@ -135,24 +142,27 @@ void Signer::signBuffer(const utils::MemBuf *buffer) { throw errors::RuntimeException("Digest computation failed"); } - signature_.resize(signature_len_); + DCHECK(signature_len_ <= signature_->tailroom()); + signature_->setLength(signature_len_); - if (EVP_DigestSignFinal(mdctx.get(), signature_.data(), &signature_len_) != - 1) { + if (EVP_DigestSignFinal(mdctx.get(), signature_->writableData(), + &signature_len_) != 1) { throw errors::RuntimeException("Digest computation failed"); } - signature_.resize(signature_len_); + DCHECK(signature_len_ <= signature_->tailroom()); + signature_->setLength(signature_len_); } -std::vector<uint8_t> Signer::getSignature() const { return signature_; } +const utils::MemBuf::Ptr &Signer::getSignature() const { return signature_; } std::string Signer::getStringSignature() const { std::stringstream string_sig; string_sig << std::hex << std::setfill('0'); - for (auto byte : signature_) { - string_sig << std::hex << std::setw(2) << static_cast<int>(byte); + for (size_t i = 0; i < signature_len_; ++i) { + string_sig << std::hex << std::setw(2) + << static_cast<int>(signature_->data()[i]); } return string_sig.str(); @@ -193,12 +203,14 @@ void VoidSigner::signBuffer(const utils::MemBuf *buffer) {} // --------------------------------------------------------- AsymmetricSigner::AsymmetricSigner(CryptoSuite suite, std::shared_ptr<EVP_PKEY> key, - std::shared_ptr<EVP_PKEY> pub_key) { + std::shared_ptr<EVP_PKEY> pub_key) + : Signer() { setKey(suite, key, pub_key); } AsymmetricSigner::AsymmetricSigner(std::string keystore_path, - std::string password) { + std::string password) + : Signer() { FILE *p12file = fopen(keystore_path.c_str(), "r"); if (p12file == nullptr) { @@ -230,7 +242,8 @@ void AsymmetricSigner::setKey(CryptoSuite suite, std::shared_ptr<EVP_PKEY> key, suite_ = suite; key_ = key; signature_len_ = EVP_PKEY_size(key.get()); - signature_.resize(signature_len_); + DCHECK(signature_len_ <= signature_->tailroom()); + signature_->setLength(signature_len_); std::vector<uint8_t> pbk(i2d_PublicKey(pub_key.get(), nullptr)); uint8_t *pbk_ptr = pbk.data(); @@ -254,7 +267,8 @@ size_t AsymmetricSigner::getSignatureFieldSize() const { // Symmetric Signer // --------------------------------------------------------- SymmetricSigner::SymmetricSigner(CryptoSuite suite, - const std::string &passphrase) { + const std::string &passphrase) + : Signer() { suite_ = suite; key_ = std::shared_ptr<EVP_PKEY>( EVP_PKEY_new_raw_private_key(EVP_PKEY_HMAC, nullptr, @@ -270,7 +284,8 @@ SymmetricSigner::SymmetricSigner(CryptoSuite suite, } signature_len_ = EVP_MD_size((*hash_evp)()); - signature_.resize(signature_len_); + DCHECK(signature_len_ <= signature_->tailroom()); + signature_->setLength(signature_len_); key_id_.computeDigest((uint8_t *)passphrase.c_str(), passphrase.size()); } diff --git a/libtransport/src/auth/verifier.cc b/libtransport/src/auth/verifier.cc index 0c35437f3..5d5f01711 100644 --- a/libtransport/src/auth/verifier.cc +++ b/libtransport/src/auth/verifier.cc @@ -14,8 +14,11 @@ */ #include <hicn/transport/auth/verifier.h> +#include <hicn/transport/core/global_object_pool.h> #include <protocols/errors.h> +#include "glog/logging.h" + namespace transport { namespace auth { @@ -49,8 +52,10 @@ bool Verifier::verifyPacket(PacketPtr packet) { hicn_packet_copy_header(format, packet->packet_start_, &header_copy, false); // Retrieve packet signature - std::vector<uint8_t> signature_raw = packet->getSignature(); - signature_raw.resize(packet->getSignatureSize()); + utils::MemBuf::Ptr signature_raw = packet->getSignature(); + std::size_t signature_len = packet->getSignatureSize(); + DCHECK(signature_len <= signature_raw->tailroom()); + signature_raw->setLength(signature_len); // Reset fields that are not used to compute signature packet->resetForHash(); @@ -62,7 +67,7 @@ bool Verifier::verifyPacket(PacketPtr packet) { // Restore header hicn_packet_copy_header(format, &header_copy, packet->packet_start_, false); packet->setSignature(signature_raw); - packet->setSignatureSize(signature_raw.size()); + packet->setSignatureSize(signature_raw->length()); return valid_packet; } @@ -165,13 +170,13 @@ void Verifier::callVerificationFailedCallback(Suffix suffix, bool VoidVerifier::verifyPacket(PacketPtr packet) { return true; } bool VoidVerifier::verifyBuffer(const std::vector<uint8_t> &buffer, - const std::vector<uint8_t> &signature, + const utils::MemBuf::Ptr &signature, CryptoHashType hash_type) { return true; } bool VoidVerifier::verifyBuffer(const utils::MemBuf *buffer, - const std::vector<uint8_t> &signature, + const utils::MemBuf::Ptr &signature, CryptoHashType hash_type) { return true; } @@ -232,7 +237,7 @@ void AsymmetricVerifier::useCertificate(std::shared_ptr<X509> cert) { } bool AsymmetricVerifier::verifyBuffer(const std::vector<uint8_t> &buffer, - const std::vector<uint8_t> &signature, + const utils::MemBuf::Ptr &signature, CryptoHashType hash_type) { CryptoHashEVP hash_evp = CryptoHash::getEVP(hash_type); @@ -255,12 +260,12 @@ bool AsymmetricVerifier::verifyBuffer(const std::vector<uint8_t> &buffer, throw errors::RuntimeException("Digest update failed"); } - return EVP_DigestVerifyFinal(mdctx.get(), signature.data(), - signature.size()) == 1; + return EVP_DigestVerifyFinal(mdctx.get(), signature->data(), + signature->length()) == 1; } bool AsymmetricVerifier::verifyBuffer(const utils::MemBuf *buffer, - const std::vector<uint8_t> &signature, + const utils::MemBuf::Ptr &signature, CryptoHashType hash_type) { CryptoHashEVP hash_evp = CryptoHash::getEVP(hash_type); @@ -288,8 +293,8 @@ bool AsymmetricVerifier::verifyBuffer(const utils::MemBuf *buffer, p = p->next(); } while (p != buffer); - return EVP_DigestVerifyFinal(mdctx.get(), signature.data(), - signature.size()) == 1; + return EVP_DigestVerifyFinal(mdctx.get(), signature->data(), + signature->length()) == 1; } // --------------------------------------------------------- @@ -309,7 +314,7 @@ void SymmetricVerifier::setPassphrase(const std::string &passphrase) { } bool SymmetricVerifier::verifyBuffer(const std::vector<uint8_t> &buffer, - const std::vector<uint8_t> &signature, + const utils::MemBuf::Ptr &signature, CryptoHashType hash_type) { CryptoHashEVP hash_evp = CryptoHash::getEVP(hash_type); @@ -317,7 +322,9 @@ bool SymmetricVerifier::verifyBuffer(const std::vector<uint8_t> &buffer, throw errors::RuntimeException("Unknown hash type"); } - std::vector<uint8_t> signature_bis(signature.size()); + const utils::MemBuf::Ptr &signature_bis = + core::PacketManager<>::getInstance().getMemBuf(); + signature_bis->append(signature->length()); size_t signature_bis_len; std::shared_ptr<EVP_MD_CTX> mdctx(EVP_MD_CTX_create(), EVP_MD_CTX_free); @@ -334,16 +341,17 @@ bool SymmetricVerifier::verifyBuffer(const std::vector<uint8_t> &buffer, throw errors::RuntimeException("Digest update failed"); } - if (EVP_DigestSignFinal(mdctx.get(), signature_bis.data(), + if (EVP_DigestSignFinal(mdctx.get(), signature_bis->writableData(), &signature_bis_len) != 1) { throw errors::RuntimeException("Digest computation failed"); } - return signature == signature_bis && signature.size() == signature_bis_len; + return signature->length() == signature_bis_len && + *signature == *signature_bis; } bool SymmetricVerifier::verifyBuffer(const utils::MemBuf *buffer, - const std::vector<uint8_t> &signature, + const utils::MemBuf::Ptr &signature, CryptoHashType hash_type) { CryptoHashEVP hash_evp = CryptoHash::getEVP(hash_type); @@ -352,7 +360,9 @@ bool SymmetricVerifier::verifyBuffer(const utils::MemBuf *buffer, } const utils::MemBuf *p = buffer; - std::vector<uint8_t> signature_bis(signature.size()); + const utils::MemBuf::Ptr &signature_bis = + core::PacketManager<>::getInstance().getMemBuf(); + signature_bis->append(signature->length()); size_t signature_bis_len; std::shared_ptr<EVP_MD_CTX> mdctx(EVP_MD_CTX_create(), EVP_MD_CTX_free); @@ -373,12 +383,13 @@ bool SymmetricVerifier::verifyBuffer(const utils::MemBuf *buffer, p = p->next(); } while (p != buffer); - if (EVP_DigestSignFinal(mdctx.get(), signature_bis.data(), + if (EVP_DigestSignFinal(mdctx.get(), signature_bis->writableData(), &signature_bis_len) != 1) { throw errors::RuntimeException("Digest computation failed"); } - return signature == signature_bis && signature.size() == signature_bis_len; + return signature->length() == signature_bis_len && + *signature == *signature_bis; } } // namespace auth diff --git a/libtransport/src/core/manifest_format.h b/libtransport/src/core/manifest_format.h index 38f26067e..caee210cd 100644 --- a/libtransport/src/core/manifest_format.h +++ b/libtransport/src/core/manifest_format.h @@ -18,6 +18,7 @@ #include <hicn/transport/auth/crypto_hash.h> #include <hicn/transport/core/name.h> #include <hicn/transport/interfaces/socket_options_keys.h> +#include <protocols/fec_utils.h> #include <cinttypes> #include <type_traits> @@ -41,11 +42,11 @@ struct ParamsRTC { std::uint64_t timestamp; std::uint32_t prod_rate; std::uint32_t prod_seg; - std::uint32_t support_fec; + protocol::fec::FECType fec_type; bool operator==(const ParamsRTC &other) const { return (timestamp == other.timestamp && prod_rate == other.prod_rate && - prod_seg == other.prod_seg && support_fec == other.support_fec); + prod_seg == other.prod_seg && fec_type == other.fec_type); } }; diff --git a/libtransport/src/core/manifest_format_fixed.cc b/libtransport/src/core/manifest_format_fixed.cc index 4c8a5e031..428d6ad12 100644 --- a/libtransport/src/core/manifest_format_fixed.cc +++ b/libtransport/src/core/manifest_format_fixed.cc @@ -154,22 +154,20 @@ FixedManifestEncoder &FixedManifestEncoder::setParamsRTCImpl( .timestamp = params.timestamp, .prod_rate = params.prod_rate, .prod_seg = params.prod_seg, - .support_fec = params.support_fec, + .fec_type = static_cast<uint32_t>(params.fec_type), }; return *this; } FixedManifestEncoder &FixedManifestEncoder::addSuffixAndHashImpl( uint32_t suffix, const auth::CryptoHash &hash) { - std::vector<uint8_t> _hash = hash.getDigest(); - manifest_entries_.push_back(ManifestEntry{ .suffix = htonl(suffix), .hash = {0}, }); std::memcpy(reinterpret_cast<uint8_t *>(manifest_entries_.back().hash), - _hash.data(), _hash.size()); + hash.getDigest()->data(), hash.getSize()); if (TRANSPORT_EXPECT_FALSE(estimateSerializedLengthImpl() > max_size_)) { throw errors::RuntimeException("Manifest size exceeded the packet MTU!"); @@ -301,7 +299,7 @@ ParamsRTC FixedManifestDecoder::getParamsRTCImpl() const { .timestamp = params_rtc_->timestamp, .prod_rate = params_rtc_->prod_rate, .prod_seg = params_rtc_->prod_seg, - .support_fec = params_rtc_->support_fec, + .fec_type = static_cast<protocol::fec::FECType>(params_rtc_->fec_type), }; } diff --git a/libtransport/src/core/manifest_format_fixed.h b/libtransport/src/core/manifest_format_fixed.h index ade4bf02c..5fd2a673d 100644 --- a/libtransport/src/core/manifest_format_fixed.h +++ b/libtransport/src/core/manifest_format_fixed.h @@ -65,9 +65,7 @@ namespace core { // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ // | Current Segment | // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ -// |F| | -// + Reserved for future parameters + -// | | +// | FEC Type | // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ // Manifest Entry: @@ -137,7 +135,7 @@ struct __attribute__((__packed__)) TransportParamsRTC { std::uint64_t timestamp; std::uint32_t prod_rate; std::uint32_t prod_seg; - std::uint32_t support_fec; + std::uint32_t fec_type; }; static_assert(sizeof(TransportParamsRTC) == MANIFEST_PARAMS_RTC_SIZE); diff --git a/libtransport/src/core/packet.cc b/libtransport/src/core/packet.cc index df27444af..0c08246af 100644 --- a/libtransport/src/core/packet.cc +++ b/libtransport/src/core/packet.cc @@ -15,6 +15,7 @@ #include <glog/logging.h> #include <hicn/transport/auth/crypto_hash.h> +#include <hicn/transport/core/global_object_pool.h> #include <hicn/transport/core/packet.h> #include <hicn/transport/errors/malformed_packet_exception.h> #include <hicn/transport/utils/hash.h> @@ -481,7 +482,7 @@ uint8_t Packet::getTTL() const { bool Packet::hasAH() const { return _is_ah(format_); } -std::vector<uint8_t> Packet::getSignature() const { +utils::MemBuf::Ptr Packet::getSignature() const { if (!hasAH()) { throw errors::RuntimeException("Packet without Authentication Header."); } @@ -493,7 +494,11 @@ std::vector<uint8_t> Packet::getSignature() const { throw errors::RuntimeException("Error getting signature."); } - return std::vector<uint8_t>(signature, signature + getSignatureFieldSize()); + utils::MemBuf::Ptr membuf = PacketManager<>::getInstance().getMemBuf(); + membuf->append(getSignatureFieldSize()); + memcpy(membuf->writableData(), signature, getSignatureFieldSize()); + + return membuf; } std::size_t Packet::getSignatureFieldSize() const { @@ -570,7 +575,7 @@ auth::CryptoSuite Packet::getValidationAlgorithm() const { return auth::CryptoSuite(return_value); } -void Packet::setSignature(const std::vector<uint8_t> &signature) { +void Packet::setSignature(const utils::MemBuf::Ptr &signature) { if (!hasAH()) { throw errors::RuntimeException("Packet without Authentication Header."); } @@ -580,7 +585,7 @@ void Packet::setSignature(const std::vector<uint8_t> &signature) { if (ret < 0) { throw errors::RuntimeException("Error getting signature."); } - memcpy(signature_field, signature.data(), signature.size()); + memcpy(signature_field, signature->data(), signature->length()); } void Packet::setSignatureFieldSize(std::size_t size) { diff --git a/libtransport/src/implementation/socket_consumer.h b/libtransport/src/implementation/socket_consumer.h index ebdac7f93..33e70888f 100644 --- a/libtransport/src/implementation/socket_consumer.h +++ b/libtransport/src/implementation/socket_consumer.h @@ -56,7 +56,8 @@ class ConsumerSocket : public Socket { rate_estimation_observer_(nullptr), rate_estimation_batching_parameter_(default_values::batch), rate_estimation_choice_(0), - max_unverified_delay_(default_values::max_unverified_delay), + unverified_interval_(default_values::unverified_interval), + unverified_ratio_(default_values::unverified_ratio), verifier_(std::make_shared<auth::VoidVerifier>()), verify_signature_(false), reset_window_(false), @@ -71,7 +72,6 @@ class ConsumerSocket : public Socket { timer_interval_milliseconds_(0), recovery_strategy_(RtcTransportRecoveryStrategies::RTX_ONLY), aggregated_data_(false), - fec_setting_(""), guard_raaqm_params_() { switch (protocol) { case TransportProtocolAlgorithms::CBR: @@ -197,6 +197,10 @@ class ConsumerSocket : public Socket { current_window_size_ = socket_option_value; break; + case UNVERIFIED_RATIO: + unverified_ratio_ = socket_option_value; + break; + case GAMMA_VALUE: gamma_ = socket_option_value; break; @@ -238,8 +242,8 @@ class ConsumerSocket : public Socket { interest_lifetime_ = socket_option_value; break; - case GeneralTransportOptions::MAX_UNVERIFIED_TIME: - max_unverified_delay_ = socket_option_value; + case GeneralTransportOptions::UNVERIFIED_INTERVAL: + unverified_interval_ = socket_option_value; break; case RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER: @@ -437,11 +441,6 @@ class ConsumerSocket : public Socket { result = SOCKET_OPTION_SET; } break; - case GeneralTransportOptions::FEC_TYPE: - fec_setting_ = socket_option_value; - result = SOCKET_OPTION_SET; - break; - default: return result; } @@ -507,6 +506,10 @@ class ConsumerSocket : public Socket { socket_option_value = current_window_size_; break; + case GeneralTransportOptions::UNVERIFIED_RATIO: + socket_option_value = unverified_ratio_; + break; + // RAAQM parameters case RaaqmTransportOptions::GAMMA_VALUE: @@ -547,8 +550,8 @@ class ConsumerSocket : public Socket { socket_option_value = interest_lifetime_; break; - case GeneralTransportOptions::MAX_UNVERIFIED_TIME: - socket_option_value = max_unverified_delay_; + case GeneralTransportOptions::UNVERIFIED_INTERVAL: + socket_option_value = unverified_interval_; break; case RaaqmTransportOptions::SAMPLE_NUMBER: @@ -702,13 +705,9 @@ class ConsumerSocket : public Socket { case DataLinkOptions::OUTPUT_INTERFACE: socket_option_value = output_interface_; break; - case GeneralTransportOptions::FEC_TYPE: - socket_option_value = fec_setting_; - break; default: return SOCKET_OPTION_NOT_GET; } - return SOCKET_OPTION_GET; } @@ -828,7 +827,8 @@ class ConsumerSocket : public Socket { int rate_estimation_choice_; // Verification parameters - int max_unverified_delay_; + uint32_t unverified_interval_; + double unverified_ratio_; std::shared_ptr<auth::Verifier> verifier_; transport::auth::KeyId *key_id_; std::atomic_bool verify_signature_; @@ -857,9 +857,6 @@ class ConsumerSocket : public Socket { RtcTransportRecoveryStrategies recovery_strategy_; bool aggregated_data_; - // FEC setting - std::string fec_setting_; - utils::SpinLock guard_raaqm_params_; std::string output_interface_; }; diff --git a/libtransport/src/protocols/fec/fec.cc b/libtransport/src/protocols/fec/fec.cc index 912e7a40f..5881d4d92 100644 --- a/libtransport/src/protocols/fec/fec.cc +++ b/libtransport/src/protocols/fec/fec.cc @@ -719,4 +719,4 @@ int fec_decode(struct fec_parms *code, gf *pkt[], int index[], int sz) { } free(m_dec); return 0; -} +}
\ No newline at end of file diff --git a/libtransport/src/protocols/prod_protocol_rtc.cc b/libtransport/src/protocols/prod_protocol_rtc.cc index 242abd30d..e49f58167 100644 --- a/libtransport/src/protocols/prod_protocol_rtc.cc +++ b/libtransport/src/protocols/prod_protocol_rtc.cc @@ -33,13 +33,13 @@ RTCProductionProtocol::RTCProductionProtocol( implementation::ProducerSocket *icn_socket) : ProductionProtocol(icn_socket), current_seg_(1), + prev_produced_bytes_(0), + prev_produced_packets_(0), produced_bytes_(0), produced_packets_(0), - produced_fec_packets_(0), - max_packet_production_(1), - bytes_production_rate_(0), + max_packet_production_(UINT32_MAX), + bytes_production_rate_(UINT32_MAX), packets_production_rate_(0), - fec_packets_production_rate_(0), last_produced_data_ts_(0), last_round_(utils::SteadyTime::nowMs().count()), allow_delayed_nacks_(false), @@ -116,32 +116,47 @@ void RTCProductionProtocol::scheduleRoundTimer() { auto sp = self.lock(); if (sp && sp->isRunning()) { - sp->updateStats(); + sp->updateStats(true); } }); } -void RTCProductionProtocol::updateStats() { +void RTCProductionProtocol::updateStats(bool new_round) { uint64_t now = utils::SteadyTime::nowMs().count(); uint64_t duration = now - last_round_; - if (duration == 0) duration = 1; + if (!new_round) { + duration += rtc::PRODUCER_STATS_INTERVAL; + } else { + prev_produced_bytes_ = 0; + prev_produced_packets_ = 0; + } + double per_second = rtc::MILLI_IN_A_SEC / duration; uint32_t prev_packets_production_rate = packets_production_rate_; - bytes_production_rate_ = ceil((double)produced_bytes_ * per_second); - packets_production_rate_ = ceil((double)produced_packets_ * per_second); - fec_packets_production_rate_ = - ceil((double)produced_fec_packets_ * per_second); + // bytes_production_rate_ does not take into account FEC!!! this is because + // each client requests a differen amount of FEC packet so the client itself + // increase the production rate in the right way + bytes_production_rate_ = + ceil((double)(produced_bytes_ + prev_produced_bytes_) * per_second); + packets_production_rate_ = + ceil((double)(produced_packets_ + prev_produced_packets_) * per_second); - DLOG_IF(INFO, VLOG_IS_ON(3)) - << "Updating production rate: produced_bytes_ = " << produced_bytes_ - << " bps = " << bytes_production_rate_; + // add fec packets looking at the fec code. we don't use directly the number + // of fec packets produced in 1 round because it may happen that different + // numbers of blocks are generated during the rounds and this creates + // inconsistencies in the estimation of the production rate + uint32_t k = fec::FECUtils::getSourceSymbols(fec_type_); + uint32_t n = fec::FECUtils::getBlockSymbols(fec_type_); + + packets_production_rate_ += + ceil((double)packets_production_rate_ / (double)k) * (n - k); // update the production rate as soon as it increases by 10% with respect to // the last round max_packet_production_ = - produced_packets_ + ceil((double)produced_packets_ * 0.1); + produced_packets_ + ceil((double)produced_packets_ * 0.10); if (max_packet_production_ < rtc::WIN_MIN) max_packet_production_ = rtc::WIN_MIN; @@ -158,11 +173,14 @@ void RTCProductionProtocol::updateStats() { sendNacksForPendingInterests(); } - produced_bytes_ = 0; - produced_packets_ = 0; - produced_fec_packets_ = 0; - last_round_ = now; - scheduleRoundTimer(); + if (new_round) { + prev_produced_bytes_ = produced_bytes_; + prev_produced_packets_ = produced_packets_; + produced_bytes_ = 0; + produced_packets_ = 0; + last_round_ = now; + scheduleRoundTimer(); + } } uint32_t RTCProductionProtocol::produceStream( @@ -387,7 +405,7 @@ RTCProductionProtocol::createManifest(const Name &content_name) const { .timestamp = now, .prod_rate = bytes_production_rate_, .prod_seg = current_seg_, - .support_fec = false, + .fec_type = fec_type_, }); return manifest; @@ -434,16 +452,13 @@ void RTCProductionProtocol::producePktInternal( produced_bytes_ += content_object->headerSize() + content_object->payloadSize(); produced_packets_++; - } else { - produced_fec_packets_++; } if (!data_aggregation_ && produced_packets_ >= max_packet_production_) { // in this case all the pending interests may be used to accomodate the // sudden increase in the production rate. calling the updateStats we will // notify all the clients - round_timer_->cancel(); - updateStats(); + updateStats(false); } DLOG_IF(INFO, VLOG_IS_ON(3)) @@ -616,7 +631,7 @@ void RTCProductionProtocol::processInterest(uint32_t interest_seg, // if the production rate 0 use delayed nacks if (allow_delayed_nacks_ && interest_seg >= current_seg_) { - uint64_t next_timer = ~0; + uint64_t next_timer = UINT64_MAX; if (!timers_map_.empty()) { next_timer = timers_map_.begin()->first; } @@ -652,8 +667,7 @@ void RTCProductionProtocol::processInterest(uint32_t interest_seg, (double)((double)((double)lifetime * rtc::INTEREST_LIFETIME_REDUCTION_FACTOR / rtc::MILLI_IN_A_SEC) * - (double)(packets_production_rate_ + - fec_packets_production_rate_))); + (double)(packets_production_rate_))); if (interest_seg < current_seg_ || interest_seg > (max_gap + current_seg_)) { sendNack(interest_seg); @@ -723,20 +737,30 @@ void RTCProductionProtocol::addToInterestQueue(uint32_t interest_seg, void RTCProductionProtocol::sendNacksForPendingInterests() { std::unordered_set<uint32_t> to_remove; - uint32_t packet_gap = 100000; // set it to a high value (100sec) - if (packets_production_rate_ != 0) - packet_gap = ceil(rtc::MILLI_IN_A_SEC / (double)packets_production_rate_); + uint32_t pps = ceil((double)(packets_production_rate_)*rtc:: + INTEREST_LIFETIME_REDUCTION_FACTOR); uint64_t now = utils::SteadyTime::nowMs().count(); - for (auto it = seqs_map_.begin(); it != seqs_map_.end(); it++) { - if (it->first > current_seg_) { - uint64_t production_time = - ((it->first - current_seg_) * packet_gap) + now; - if (production_time >= it->second) { + if (it->first > current_seg_ && it->second > now) { + double exp_time_in_sec = + (double)(it->second - now) / (double)rtc::MILLI_IN_A_SEC; + uint32_t packets_prod_before_expire = ceil((double)pps * exp_time_in_sec); + + if (it->first > (current_seg_ + packets_prod_before_expire)) { sendNack(it->first); to_remove.insert(it->first); } + } else if (TRANSPORT_EXPECT_FALSE(it->first < current_seg_ || + it->second <= now)) { + // this branch should never be execcuted + // first condition: the packet was already prdocued and we have and old + // interest pending. send a nack to notify the consumer if needed. the + // case it->first = current_seg_ is not handled because + // the interest will be satified by the next data packet. + // second condition: the interest is expired. + sendNack(it->first); + to_remove.insert(it->first); } } diff --git a/libtransport/src/protocols/prod_protocol_rtc.h b/libtransport/src/protocols/prod_protocol_rtc.h index 7f50a2505..c0424a39c 100644 --- a/libtransport/src/protocols/prod_protocol_rtc.h +++ b/libtransport/src/protocols/prod_protocol_rtc.h @@ -77,7 +77,7 @@ class RTCProductionProtocol : public ProductionProtocol { const Name &name) const; // stats - void updateStats(); + void updateStats(bool new_round); void scheduleRoundTimer(); // pending intersts functions @@ -108,16 +108,17 @@ class RTCProductionProtocol : public ProductionProtocol { uint32_t prod_label_; // path label of the producer uint32_t cache_label_; // path label for content from the producer cache + uint32_t prev_produced_bytes_; // XXX clearly explain all these new vars + uint32_t prev_produced_packets_; + uint32_t produced_bytes_; // bytes produced in the last round uint32_t produced_packets_; // packet produed in the last round - uint32_t produced_fec_packets_; // fec packets produced last round uint32_t max_packet_production_; // never exceed this number of packets // without update stats uint32_t bytes_production_rate_; // bytes per sec uint32_t packets_production_rate_; // pps - uint32_t fec_packets_production_rate_; // pps uint64_t last_produced_data_ts_; // ms diff --git a/libtransport/src/protocols/rtc/probe_handler.cc b/libtransport/src/protocols/rtc/probe_handler.cc index abb234757..6a84914ab 100644 --- a/libtransport/src/protocols/rtc/probe_handler.cc +++ b/libtransport/src/protocols/rtc/probe_handler.cc @@ -36,11 +36,18 @@ ProbeHandler::ProbeHandler(SendProbeCallback &&send_callback, ProbeHandler::~ProbeHandler() {} -uint64_t ProbeHandler::getRtt(uint32_t seq) { +uint64_t ProbeHandler::getRtt(uint32_t seq, bool is_valid) { auto it = pending_probes_.find(seq); if (it == pending_probes_.end()) return 0; + if (!is_valid) { + // delete the probe anyway + pending_probes_.erase(it); + valid_batch_ = false; + return 0; + } + uint64_t now = utils::SteadyTime::nowMs().count(); uint64_t rtt = now - it->second; if (rtt < 1) rtt = 1; @@ -52,6 +59,7 @@ uint64_t ProbeHandler::getRtt(uint32_t seq) { } double ProbeHandler::getProbeLossRate() { + if (!valid_batch_) return 1.0; return 1.0 - ((double)recv_probes_ / (double)sent_probes_); } @@ -71,11 +79,26 @@ void ProbeHandler::stopProbes() { max_probes_ = 0; sent_probes_ = 0; recv_probes_ = 0; + valid_batch_ = true; probe_timer_->cancel(); } void ProbeHandler::sendProbes() { if (probe_interval_ == 0) return; + + std::weak_ptr<ProbeHandler> self(shared_from_this()); + probe_timer_->expires_from_now(std::chrono::microseconds(probe_interval_)); + probe_timer_->async_wait([self](const std::error_code &ec) { + if (ec) return; + auto s = self.lock(); + if (s) { + s->generateProbe(); + } + }); +} + +void ProbeHandler::generateProbe() { + if (probe_interval_ == 0) return; if (max_probes_ != 0 && sent_probes_ >= max_probes_) return; uint64_t now = utils::SteadyTime::nowMs().count(); @@ -97,17 +120,7 @@ void ProbeHandler::sendProbes() { } } - if (probe_interval_ == 0) return; - - std::weak_ptr<ProbeHandler> self(shared_from_this()); - probe_timer_->expires_from_now(std::chrono::microseconds(probe_interval_)); - probe_timer_->async_wait([self](const std::error_code &ec) { - if (ec) return; - auto s = self.lock(); - if (s) { - s->sendProbes(); - } - }); + sendProbes(); } ProbeType ProbeHandler::getProbeType(uint32_t seq) { diff --git a/libtransport/src/protocols/rtc/probe_handler.h b/libtransport/src/protocols/rtc/probe_handler.h index 2de908176..d989194d4 100644 --- a/libtransport/src/protocols/rtc/probe_handler.h +++ b/libtransport/src/protocols/rtc/probe_handler.h @@ -42,7 +42,7 @@ class ProbeHandler : public std::enable_shared_from_this<ProbeHandler> { ~ProbeHandler(); // If the function returns 0 the probe is not valid. - uint64_t getRtt(uint32_t seq); + uint64_t getRtt(uint32_t seq, bool is_valid); // this function may return a residual loss rate higher than the real one if // we don't wait enough time for the probes to come back @@ -63,11 +63,17 @@ class ProbeHandler : public std::enable_shared_from_this<ProbeHandler> { static ProbeType getProbeType(uint32_t seq); private: + void generateProbe(); + uint32_t probe_interval_; // us uint32_t max_probes_; // packets uint32_t sent_probes_; // packets uint32_t recv_probes_; // packets + bool valid_batch_; // if at least one probe in a batch is considered not + // valid (e.g. prod rate == ~0) the full batch is invalid. + // the bool is set to true when sendProbe is called + std::unique_ptr<asio::steady_timer> probe_timer_; // Map from packet suffixes to timestamp diff --git a/libtransport/src/protocols/rtc/rtc.cc b/libtransport/src/protocols/rtc/rtc.cc index df6522471..d2682edfa 100644 --- a/libtransport/src/protocols/rtc/rtc.cc +++ b/libtransport/src/protocols/rtc/rtc.cc @@ -63,16 +63,25 @@ std::size_t RTCTransportProtocol::transportHeaderLength() { // private void RTCTransportProtocol::initParams() { TransportProtocol::reset(); - fwd_strategy_.setCallback(on_fwd_strategy_); - std::weak_ptr<RTCTransportProtocol> self = shared_from_this(); + fwd_strategy_.setCallback([self](notification::Strategy strategy) { + auto ptr = self.lock(); + if (ptr && ptr->isRunning()) { + if (*ptr->on_fwd_strategy_) (*ptr->on_fwd_strategy_)(strategy); + } + }); + std::shared_ptr<auth::Verifier> verifier; socket_->getSocketOption(GeneralTransportOptions::VERIFIER, verifier); - uint32_t max_unverified_delay; - socket_->getSocketOption(GeneralTransportOptions::MAX_UNVERIFIED_TIME, - max_unverified_delay); + uint32_t unverified_interval; + socket_->getSocketOption(GeneralTransportOptions::UNVERIFIED_INTERVAL, + unverified_interval); + + double unverified_ratio; + socket_->getSocketOption(GeneralTransportOptions::UNVERIFIED_RATIO, + unverified_ratio); rc_ = std::make_shared<RTCRateControlCongestionDetection>(); ldr_ = std::make_shared<RTCLossDetectionAndRecovery>( @@ -84,8 +93,15 @@ void RTCTransportProtocol::initParams() { ptr->sendRtxInterest(seq); } }, - on_rec_strategy_); - verifier_ = std::make_shared<RTCVerifier>(verifier, max_unverified_delay); + [self](notification::Strategy strategy) { + auto ptr = self.lock(); + if (ptr && ptr->isRunning()) { + if (*ptr->on_rec_strategy_) (*ptr->on_rec_strategy_)(strategy); + } + }); + + verifier_ = std::make_shared<RTCVerifier>(verifier, unverified_interval, + unverified_ratio); state_ = std::make_shared<RTCState>( indexer_verifier_.get(), @@ -102,7 +118,6 @@ void RTCTransportProtocol::initParams() { } }, portal_->getThread().getIoService()); - state_->initParams(); rc_->setState(state_); rc_->turnOnRateControl(); @@ -153,21 +168,8 @@ void RTCTransportProtocol::initParams() { socket_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, RTC_INTEREST_LIFETIME); - // FEC - using namespace std::placeholders; - enableFEC(std::bind(&RTCTransportProtocol::onFecPackets, this, _1), - /* We leave the buffer allocation to the fec decoder */ - fec::FECBase::BufferRequested(0)); - - if (fec_decoder_) { - indexer_verifier_->enableFec(fec_type_); - indexer_verifier_->setNFec(0); - ldr_->setFecParams(fec::FECUtils::getBlockSymbols(fec_type_), - fec::FECUtils::getSourceSymbols(fec_type_)); - fec_decoder_->setIOService(portal_->getThread().getIoService()); - } else { - indexer_verifier_->disableFec(); - } + // init state params + state_->initParams(); } // private @@ -223,29 +225,26 @@ void RTCTransportProtocol::newRound() { uint32_t received_nacks = state->getReceivedNacksInRound(); uint32_t received_fec = state->getReceivedFecPackets(); - bool in_sync = (ptr->current_state_ == SyncState::in_sync); - ptr->ldr_->onNewRound(in_sync); - ptr->state_->onNewRound((double)ROUND_LEN, in_sync); - ptr->rc_->onNewRound((double)ROUND_LEN); - // update sync state if needed + double cache_rate = state->getPacketFromCacheRatio(); + uint32_t round_without_nacks = state->getRoundsWithoutNacks(); + if (ptr->current_state_ == SyncState::in_sync) { - double cache_rate = state->getPacketFromCacheRatio(); if (cache_rate > MAX_DATA_FROM_CACHE) { ptr->current_state_ = SyncState::catch_up; } } else { - double target_rate = state->getProducerRate() * PRODUCTION_RATE_FRACTION; - double received_rate = - state->getReceivedRate() + state->getRecoveredFecRate(); - uint32_t round_without_nacks = state->getRoundsWithoutNacks(); - double cache_ratio = state->getPacketFromCacheRatio(); if (round_without_nacks >= ROUNDS_IN_SYNC_BEFORE_SWITCH && - received_rate >= target_rate && cache_ratio < MAX_DATA_FROM_CACHE) { + cache_rate < MAX_DATA_FROM_CACHE) { ptr->current_state_ = SyncState::in_sync; } } + bool in_sync = (ptr->current_state_ == SyncState::in_sync); + ptr->ldr_->onNewRound(in_sync); + ptr->state_->onNewRound((double)ROUND_LEN, in_sync); + ptr->rc_->onNewRound((double)ROUND_LEN); + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Calling updateSyncWindow in newRound function"; ptr->updateSyncWindow(); @@ -335,7 +334,7 @@ void RTCTransportProtocol::updateSyncWindow() { // if some of the info are not available do not update the current win if (prod_rate != 0.0 && rtt != 0.0 && packet_size != 0.0) { current_sync_win_ = (uint32_t)ceil(prod_rate * rtt / packet_size); - uint32_t buffer = PRODUCER_BUFFER_MS; + uint32_t buffer = PRODUCER_BUFFER_MS + ((double)state_->getMinRTT() / 2.0); current_sync_win_ += ceil(prod_rate * (buffer / MILLI_IN_A_SEC) / packet_size); @@ -360,15 +359,6 @@ void RTCTransportProtocol::updateSyncWindow() { scheduleNextInterests(); } -void RTCTransportProtocol::decreaseSyncWindow() { - // called on future nack - // we have a new sample of the production rate, so update max win first - computeMaxSyncWindow(); - current_sync_win_--; - current_sync_win_ = std::max(current_sync_win_, WIN_MIN); - scheduleNextInterests(); -} - void RTCTransportProtocol::sendRtxInterest(uint32_t seq) { if (!isRunning() && !is_first_) return; @@ -468,7 +458,6 @@ void RTCTransportProtocol::scheduleNextInterests() { auto ptr = self.lock(); if (ptr && ptr->isRunning()) { if (!ptr->scheduler_timer_on_) return; - ptr->scheduler_timer_on_ = false; ptr->scheduleNextInterests(); } @@ -688,8 +677,6 @@ void RTCTransportProtocol::onNack(const ContentObject &content_object) { // switch to catch up state and increase the window // this is true only if the packet is not an RTX if (!is_rtx) current_state_ = SyncState::catch_up; - - updateSyncWindow(); } else { // if production_seg == nack_segment we consider this a future nack, since // production_seg is not yet created. this may happen in case of low @@ -702,23 +689,50 @@ void RTCTransportProtocol::onNack(const ContentObject &content_object) { // the client is asking for content in the future // switch to in sync state and decrease the window current_state_ = SyncState::in_sync; - decreaseSyncWindow(); } + updateSyncWindow(); } void RTCTransportProtocol::onProbe(const ContentObject &content_object) { - bool valid = state_->onProbePacketReceived(content_object); - if (!valid) return; + uint32_t suffix = content_object.getName().getSuffix(); + ParamsRTC params = RTCState::getProbeParams(content_object); + + if (ProbeHandler::getProbeType(suffix) == ProbeType::INIT) { + fec::FECType fec_type = params.fec_type; + + if (fec_type != fec::FECType::UNKNOWN && !fec_decoder_) { + // Update FEC type + fec_type_ = fec_type; + + // Enable FEC + enableFEC(std::bind(&RTCTransportProtocol::onFecPackets, this, + std::placeholders::_1), + fec::FECBase::BufferRequested(0)); + + // Update FEC parameters + indexer_verifier_->enableFec(fec_type); + indexer_verifier_->setNFec(0); + ldr_->setFecParams(fec::FECUtils::getBlockSymbols(fec_type), + fec::FECUtils::getSourceSymbols(fec_type)); + fec_decoder_->setIOService(portal_->getThread().getIoService()); + } else if (fec_type == fec::FECType::UNKNOWN) { + indexer_verifier_->disableFec(); + } + } - uint32_t production_seg = RTCState::getProbeParams(content_object).prod_seg; + if (!state_->onProbePacketReceived(content_object)) return; - // As for the nacks set next_segment + // As for NACKs, set next_segment DLOG_IF(INFO, VLOG_IS_ON(3)) << "on probe next seg = " << indexer_verifier_->checkNextSuffix() - << ", jump to " << production_seg; - indexer_verifier_->jumpToIndex(production_seg); - - ldr_->onProbePacketReceived(content_object); + << ", jump to " << params.prod_seg; + indexer_verifier_->jumpToIndex(params.prod_seg); + + bool loss_detected = ldr_->onProbePacketReceived(content_object); + // we are not out of sync here but we are starting to download content from + // the cache, maybe beacuse the production rate increased suddenly. for this + // reason we put the state to catch up to increase the window + if (loss_detected) current_state_ = SyncState::catch_up; updateSyncWindow(); } @@ -822,6 +836,8 @@ void RTCTransportProtocol::onContentObjectReceived( // The packet is considered received, return early onDataPacketReceived(*content_ptr, compute_stats); + // this is a rtx but we may need to feed it in the decoder + decodePacket(content_object, is_manifest); return; } @@ -846,18 +862,8 @@ void RTCTransportProtocol::onContentObjectReceived( state_->dataToBeReceived(segment_number); } - // Send packet to FEC decoder - if (fec_decoder_) { - DLOG_IF(INFO, VLOG_IS_ON(4)) - << "Send packet " << segment_number << " to FEC decoder"; - - uint32_t offset = is_manifest - ? content_object.headerSize() - : content_object.headerSize() + rtc::DATA_HEADER_SIZE; - uint32_t metadata = static_cast<uint32_t>(content_object.getPayloadType()); - - fec_decoder_->onDataPacket(content_object, offset, metadata); - } + // send packet to the decoder + decodePacket(content_object, is_manifest); // We can return early if FEC if (is_fec) { @@ -947,6 +953,21 @@ void RTCTransportProtocol::sendStatsToApp( } } +void RTCTransportProtocol::decodePacket(ContentObject &content_object, + bool is_manifest) { + if (!fec_decoder_) return; + + DLOG_IF(INFO, VLOG_IS_ON(4)) + << "Send packet " << content_object.getName() << " to FEC decoder"; + + uint32_t offset = is_manifest + ? content_object.headerSize() + : content_object.headerSize() + rtc::DATA_HEADER_SIZE; + uint32_t metadata = static_cast<uint32_t>(content_object.getPayloadType()); + + fec_decoder_->onDataPacket(content_object, offset, metadata); +} + void RTCTransportProtocol::onFecPackets(fec::BufferArray &packets) { Packet::Format format; socket_->getSocketOption(interface::GeneralTransportOptions::PACKET_FORMAT, diff --git a/libtransport/src/protocols/rtc/rtc.h b/libtransport/src/protocols/rtc/rtc.h index 37706eb1c..3763f33c7 100644 --- a/libtransport/src/protocols/rtc/rtc.h +++ b/libtransport/src/protocols/rtc/rtc.h @@ -65,7 +65,6 @@ class RTCTransportProtocol : public TransportProtocol { // window functions void computeMaxSyncWindow(); void updateSyncWindow(); - void decreaseSyncWindow(); // packet functions void sendRtxInterest(uint32_t seq); @@ -89,6 +88,8 @@ class RTCTransportProtocol : public TransportProtocol { uint32_t received_nacks, uint32_t received_fec); // FEC functions + // send the received content object to the decoder + void decodePacket(ContentObject &content_object, bool is_manifest); void onFecPackets(fec::BufferArray &packets); // Utils diff --git a/libtransport/src/protocols/rtc/rtc_consts.h b/libtransport/src/protocols/rtc/rtc_consts.h index 03efd8e84..96e39d07e 100644 --- a/libtransport/src/protocols/rtc/rtc_consts.h +++ b/libtransport/src/protocols/rtc/rtc_consts.h @@ -34,7 +34,7 @@ const double INTEREST_LIFETIME_REDUCTION_FACTOR = 0.8; // increasing this number we increase the time that an // interest will wait for the data packet to be produced // at the producer socket -const uint32_t PRODUCER_BUFFER_MS = 200; // ms +const uint32_t PRODUCER_BUFFER_MS = 300; // ms // interest scheduler // const uint32_t MAX_INTERESTS_IN_BATCH = 5; @@ -72,7 +72,8 @@ const uint32_t INIT_RTT_PROBES = 40; // number of probes to init RTT // to get an answer. we wait 100ms between each try const uint32_t INIT_RTT_PROBE_RESTART = 100; // ms // once we get the first probe we wait at most 60ms for the others -const uint32_t INIT_RTT_PROBE_WAIT = 30; // ms +const uint32_t INIT_RTT_PROBE_WAIT = + ((INIT_RTT_PROBES * INIT_RTT_PROBE_INTERVAL) / 1000) * 2; // ms // we reuires at least 5 probes to be recevied const uint32_t INIT_RTT_MIN_PROBES_TO_RECV = 5; // ms const uint32_t MAX_PENDING_PROBES = 10; @@ -81,7 +82,7 @@ const uint32_t MAX_PENDING_PROBES = 10; const double MAX_QUEUING_DELAY = 50.0; // ms // data from cache -const double MAX_DATA_FROM_CACHE = 0.25; // 25% +const double MAX_DATA_FROM_CACHE = 0.10; // 10% // window const const uint32_t INITIAL_WIN = 5; // pkts diff --git a/libtransport/src/protocols/rtc/rtc_forwarding_strategy.cc b/libtransport/src/protocols/rtc/rtc_forwarding_strategy.cc index 9503eed3e..c6bc751e6 100644 --- a/libtransport/src/protocols/rtc/rtc_forwarding_strategy.cc +++ b/libtransport/src/protocols/rtc/rtc_forwarding_strategy.cc @@ -35,8 +35,9 @@ RTCForwardingStrategy::RTCForwardingStrategy() RTCForwardingStrategy::~RTCForwardingStrategy() {} -void RTCForwardingStrategy::setCallback(interface::StrategyCallback* callback) { - callback_ = callback; +void RTCForwardingStrategy::setCallback( + interface::StrategyCallback&& callback) { + callback_ = std::move(callback); } void RTCForwardingStrategy::initFwdStrategy( @@ -55,27 +56,24 @@ void RTCForwardingStrategy::initFwdStrategy( } void RTCForwardingStrategy::checkStrategy() { - if (*callback_) { - strategy_t used_strategy = selected_strategy_; - if (used_strategy == BOTH) used_strategy = current_strategy_; - assert(used_strategy == BEST_PATH || used_strategy == REPLICATION || - used_strategy == NONE); - - notification::ForwardingStrategy strategy = - notification::ForwardingStrategy::NONE; - switch (used_strategy) { - case BEST_PATH: - strategy = notification::ForwardingStrategy::BEST_PATH; - break; - case REPLICATION: - strategy = notification::ForwardingStrategy::REPLICATION; - break; - default: - break; - } - - (*callback_)(strategy); + strategy_t used_strategy = selected_strategy_; + if (used_strategy == BOTH) used_strategy = current_strategy_; + assert(used_strategy == BEST_PATH || used_strategy == REPLICATION || + used_strategy == NONE); + + notification::ForwardingStrategy strategy = + notification::ForwardingStrategy::NONE; + switch (used_strategy) { + case BEST_PATH: + strategy = notification::ForwardingStrategy::BEST_PATH; + break; + case REPLICATION: + strategy = notification::ForwardingStrategy::REPLICATION; + break; + default: + break; } + callback_(strategy); if (!init_) return; diff --git a/libtransport/src/protocols/rtc/rtc_forwarding_strategy.h b/libtransport/src/protocols/rtc/rtc_forwarding_strategy.h index 821b28051..9825877fd 100644 --- a/libtransport/src/protocols/rtc/rtc_forwarding_strategy.h +++ b/libtransport/src/protocols/rtc/rtc_forwarding_strategy.h @@ -44,7 +44,7 @@ class RTCForwardingStrategy { strategy_t strategy); void checkStrategy(); - void setCallback(interface::StrategyCallback* callback); + void setCallback(interface::StrategyCallback&& callback); private: void checkStrategyBestPath(); @@ -68,7 +68,7 @@ class RTCForwardingStrategy { core::Prefix prefix_; std::shared_ptr<core::Portal> portal_; RTCState* state_; - interface::StrategyCallback* callback_; + interface::StrategyCallback callback_; }; } // namespace rtc diff --git a/libtransport/src/protocols/rtc/rtc_ldr.cc b/libtransport/src/protocols/rtc/rtc_ldr.cc index 1ca1cf48d..abf6cda2c 100644 --- a/libtransport/src/protocols/rtc/rtc_ldr.cc +++ b/libtransport/src/protocols/rtc/rtc_ldr.cc @@ -36,17 +36,17 @@ RTCLossDetectionAndRecovery::RTCLossDetectionAndRecovery( Indexer *indexer, asio::io_service &io_service, interface::RtcTransportRecoveryStrategies type, RecoveryStrategy::SendRtxCallback &&callback, - interface::StrategyCallback *external_callback) { + interface::StrategyCallback &&external_callback) { rs_type_ = type; if (type == interface::RtcTransportRecoveryStrategies::RECOVERY_OFF) { rs_ = std::make_shared<RecoveryStrategyRecoveryOff>( - indexer, std::move(callback), io_service, external_callback); + indexer, std::move(callback), io_service, std::move(external_callback)); } else if (type == interface::RtcTransportRecoveryStrategies::DELAY_BASED) { rs_ = std::make_shared<RecoveryStrategyDelayBased>( - indexer, std::move(callback), io_service, external_callback); + indexer, std::move(callback), io_service, std::move(external_callback)); } else if (type == interface::RtcTransportRecoveryStrategies::FEC_ONLY) { rs_ = std::make_shared<RecoveryStrategyFecOnly>( - indexer, std::move(callback), io_service, external_callback); + indexer, std::move(callback), io_service, std::move(external_callback)); } else if (type == interface::RtcTransportRecoveryStrategies::LOW_RATE || type == interface::RtcTransportRecoveryStrategies:: LOW_RATE_AND_BESTPATH || @@ -55,12 +55,12 @@ RTCLossDetectionAndRecovery::RTCLossDetectionAndRecovery( type == interface::RtcTransportRecoveryStrategies:: LOW_RATE_AND_ALL_FWD_STRATEGIES) { rs_ = std::make_shared<RecoveryStrategyLowRate>( - indexer, std::move(callback), io_service, external_callback); + indexer, std::move(callback), io_service, std::move(external_callback)); } else { // default rs_type_ = interface::RtcTransportRecoveryStrategies::RTX_ONLY; rs_ = std::make_shared<RecoveryStrategyRtxOnly>( - indexer, std::move(callback), io_service, external_callback); + indexer, std::move(callback), io_service, std::move(external_callback)); } } @@ -97,19 +97,21 @@ void RTCLossDetectionAndRecovery::onNewRound(bool in_sync) { rs_->onNewRound(in_sync); } -void RTCLossDetectionAndRecovery::onTimeout(uint32_t seq, bool lost) { +bool RTCLossDetectionAndRecovery::onTimeout(uint32_t seq, bool lost) { if (!lost) { - detectLoss(seq, seq + 1); + return detectLoss(seq, seq + 1, false); } else { rs_->onLostTimeout(seq); } + return false; } -void RTCLossDetectionAndRecovery::onPacketRecoveredFec(uint32_t seq) { +bool RTCLossDetectionAndRecovery::onPacketRecoveredFec(uint32_t seq) { rs_->receivedPacket(seq); + return false; } -void RTCLossDetectionAndRecovery::onDataPacketReceived( +bool RTCLossDetectionAndRecovery::onDataPacketReceived( const core::ContentObject &content_object) { uint32_t seq = content_object.getName().getSuffix(); bool is_rtx = rs_->isRtx(seq); @@ -118,10 +120,13 @@ void RTCLossDetectionAndRecovery::onDataPacketReceived( << "received data. add from " << rs_->getState()->getHighestSeqReceivedInOrder() + 1 << " to " << seq; if (!is_rtx) - detectLoss(rs_->getState()->getHighestSeqReceivedInOrder() + 1, seq); + return detectLoss(rs_->getState()->getHighestSeqReceivedInOrder() + 1, seq, + false); + + return false; } -void RTCLossDetectionAndRecovery::onNackPacketReceived( +bool RTCLossDetectionAndRecovery::onNackPacketReceived( const core::ContentObject &nack) { struct nack_packet_t *nack_pkt = (struct nack_packet_t *)nack.getPayload()->data(); @@ -140,11 +145,18 @@ void RTCLossDetectionAndRecovery::onNackPacketReceived( << "received nack. add from " << rs_->getState()->getHighestSeqReceivedInOrder() + 1 << " to " << production_seq; - detectLoss(rs_->getState()->getHighestSeqReceivedInOrder() + 1, - production_seq); + + // if it is a future nack store it in the list set of nacked seq + if (production_seq <= seq) rs_->receivedFutureNack(seq); + + // call the detectLoss function using the probe flag = true. in fact the + // losses detected using nacks are the same as the one detected using probes, + // we should not increase the loss counter + return detectLoss(rs_->getState()->getHighestSeqReceivedInOrder() + 1, + production_seq, true); } -void RTCLossDetectionAndRecovery::onProbePacketReceived( +bool RTCLossDetectionAndRecovery::onProbePacketReceived( const core::ContentObject &probe) { // we don't log the reception of a probe packet for the sentinel timer because // probes are not taken into account into the sync window. we use them as @@ -157,12 +169,13 @@ void RTCLossDetectionAndRecovery::onProbePacketReceived( << rs_->getState()->getHighestSeqReceivedInOrder() + 1 << " to " << production_seq; - detectLoss(rs_->getState()->getHighestSeqReceivedInOrder() + 1, - production_seq); + return detectLoss(rs_->getState()->getHighestSeqReceivedInOrder() + 1, + production_seq, true); } -void RTCLossDetectionAndRecovery::detectLoss(uint32_t start, uint32_t stop) { - if (start >= stop) return; +bool RTCLossDetectionAndRecovery::detectLoss(uint32_t start, uint32_t stop, + bool recv_probe) { + if (start >= stop) return false; // skip nacked packets if (start <= rs_->getState()->getLastSeqNacked()) { @@ -174,13 +187,31 @@ void RTCLossDetectionAndRecovery::detectLoss(uint32_t start, uint32_t stop) { start = rs_->getState()->getHighestSeqReceivedInOrder() + 1; } + bool loss_detected = false; for (uint32_t seq = start; seq < stop; seq++) { if (rs_->getState()->getPacketState(seq) == PacketState::UNKNOWN) { if (rs_->lossDetected(seq)) { - rs_->getState()->onLossDetected(seq); + loss_detected = true; + if ((recv_probe || rs_->wasNacked(seq)) && !rs_->isFecOn()) { + // these losses were detected using a probe and fec is off. + // in this case most likelly the procotol is about to go out of sync + // and the packets are not really lost (e.g. increase in prod rate). + // for this reason we do not + // count the losses in the stats. Instead we do the following + // 1. send RTX for the packets in case they were really lost + // 2. return to the RTC protocol that a loss was detected using a + // probe. the protocol will switch to catch_up mode to increase the + // size of the window + rs_->requestPossibleLostPacket(seq); + } else { + // if fec is on we don't need to mask pontetial losses, so increase + // the loss rate + rs_->notifyNewLossDetedcted(seq); + } } } } + return loss_detected; } } // namespace rtc diff --git a/libtransport/src/protocols/rtc/rtc_ldr.h b/libtransport/src/protocols/rtc/rtc_ldr.h index e7f8ce5db..7f683eaa6 100644 --- a/libtransport/src/protocols/rtc/rtc_ldr.h +++ b/libtransport/src/protocols/rtc/rtc_ldr.h @@ -36,7 +36,7 @@ class RTCLossDetectionAndRecovery RTCLossDetectionAndRecovery(Indexer *indexer, asio::io_service &io_service, interface::RtcTransportRecoveryStrategies type, RecoveryStrategy::SendRtxCallback &&callback, - interface::StrategyCallback *external_callback); + interface::StrategyCallback &&external_callback); ~RTCLossDetectionAndRecovery(); @@ -47,17 +47,19 @@ class RTCLossDetectionAndRecovery void setFecParams(uint32_t n, uint32_t k) { rs_->setFecParams(n, k); } - void turnOnRecovery() { rs_->tunrOnRecovery(); } + void turnOnRecovery() { rs_->turnOnRecovery(); } bool isRtxOn() { return rs_->isRtxOn(); } void changeRecoveryStrategy(interface::RtcTransportRecoveryStrategies type); void onNewRound(bool in_sync); - void onTimeout(uint32_t seq, bool lost); - void onPacketRecoveredFec(uint32_t seq); - void onDataPacketReceived(const core::ContentObject &content_object); - void onNackPacketReceived(const core::ContentObject &nack); - void onProbePacketReceived(const core::ContentObject &probe); + + // the following functions return true if a loss is detected, false otherwise + bool onTimeout(uint32_t seq, bool lost); + bool onPacketRecoveredFec(uint32_t seq); + bool onDataPacketReceived(const core::ContentObject &content_object); + bool onNackPacketReceived(const core::ContentObject &nack); + bool onProbePacketReceived(const core::ContentObject &probe); void clear() { rs_->clear(); } @@ -67,7 +69,8 @@ class RTCLossDetectionAndRecovery } private: - void detectLoss(uint32_t start, uint32_t stop); + // returns true if a loss is detected, false otherwise + bool detectLoss(uint32_t start, uint32_t stop, bool recv_probe); interface::RtcTransportRecoveryStrategies rs_type_; std::shared_ptr<RecoveryStrategy> rs_; diff --git a/libtransport/src/protocols/rtc/rtc_recovery_strategy.cc b/libtransport/src/protocols/rtc/rtc_recovery_strategy.cc index 888105eab..66ae5086c 100644 --- a/libtransport/src/protocols/rtc/rtc_recovery_strategy.cc +++ b/libtransport/src/protocols/rtc/rtc_recovery_strategy.cc @@ -29,20 +29,22 @@ using namespace transport::interface; RecoveryStrategy::RecoveryStrategy( Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service, - bool use_rtx, bool use_fec, interface::StrategyCallback *external_callback) + bool use_rtx, bool use_fec, interface::StrategyCallback &&external_callback) : recovery_on_(false), + rtx_during_fec_(0), next_rtx_timer_(MAX_TIMER_RTX), send_rtx_callback_(std::move(callback)), indexer_(indexer), round_id_(0), last_fec_used_(0), - callback_(external_callback) { + callback_(std::move(external_callback)) { setRtxFec(use_rtx, use_fec); timer_ = std::make_unique<asio::steady_timer>(io_service); } RecoveryStrategy::RecoveryStrategy(RecoveryStrategy &&rs) - : rtx_state_(std::move(rs.rtx_state_)), + : rtx_during_fec_(0), + rtx_state_(std::move(rs.rtx_state_)), rtx_timers_(std::move(rs.rtx_timers_)), recover_with_fec_(std::move(rs.recover_with_fec_)), timer_(std::move(rs.timer_)), @@ -55,7 +57,7 @@ RecoveryStrategy::RecoveryStrategy(RecoveryStrategy &&rs) rc_(std::move(rs.rc_)), round_id_(std::move(rs.round_id_)), last_fec_used_(std::move(rs.last_fec_used_)), - callback_(rs.callback_) { + callback_(std::move(rs.callback_)) { setFecParams(n_, k_); } @@ -68,7 +70,7 @@ void RecoveryStrategy::setFecParams(uint32_t n, uint32_t k) { // XXX for the moment we go in steps of 5% loss rate. // max loss rate = 95% for (uint32_t loss_rate = 5; loss_rate < 100; loss_rate += 5) { - double dec_loss_rate = (double)loss_rate / 100.0; + double dec_loss_rate = (double)(loss_rate + 5) / 100.0; double exp_losses = (double)k_ * dec_loss_rate; uint32_t fec_to_ask = ceil(exp_losses / (1 - dec_loss_rate)); @@ -87,18 +89,39 @@ bool RecoveryStrategy::lossDetected(uint32_t seq) { return false; } - auto it = recover_with_fec_.find(seq); - if (it != recover_with_fec_.end()) { + auto it_fec = recover_with_fec_.find(seq); + if (it_fec != recover_with_fec_.end()) { // this packet is already in list of packets to recover with fec // this list contians also fec packets that will not be recovered with rtx return false; } - // new loss detected, recover it according to the strategy - newPacketLoss(seq); + auto it_nack = nacked_seq_.find(seq); + if (it_nack != nacked_seq_.end()) { + // this packet was nacked so we do not use it to determine the loss rate + return false; + } + return true; } +void RecoveryStrategy::notifyNewLossDetedcted(uint32_t seq) { + // new loss detected + // first record the loss. second do what is needed to recover it + state_->onLossDetected(seq); + newPacketLoss(seq); +} + +void RecoveryStrategy::requestPossibleLostPacket(uint32_t seq) { + // these are packets for which we send a RTX but we do not increase the loss + // counter beacuse we don't know if they are lost or not + addNewRtx(seq, false); +} + +void RecoveryStrategy::receivedFutureNack(uint32_t seq) { + nacked_seq_.insert(seq); +} + void RecoveryStrategy::clear() { rtx_state_.clear(); rtx_timers_.clear(); @@ -236,6 +259,9 @@ void RecoveryStrategy::retransmit() { DLOG_IF(INFO, VLOG_IS_ON(3)) << "send rtx for sequence " << seq << ", next send in " << (rtx_it->second.next_send_ - now); + + // if fec is on increase the number of RTX send during fec + if (fec_on_) rtx_during_fec_++; send_rtx_callback_(seq); sent_counter++; } @@ -306,7 +332,7 @@ void RecoveryStrategy::deleteRtx(uint32_t seq) { } // fec functions -uint32_t RecoveryStrategy::computeFecPacketsToAsk(bool in_sync) { +uint32_t RecoveryStrategy::computeFecPacketsToAsk() { double loss_rate = state_->getMaxLossRate() * 100; // use loss rate in % if (loss_rate > 95) loss_rate = 95; // max loss rate @@ -365,21 +391,25 @@ uint32_t RecoveryStrategy::computeFecPacketsToAsk(bool in_sync) { void RecoveryStrategy::setRtxFec(std::optional<bool> rtx_on, std::optional<bool> fec_on) { if (rtx_on) rtx_on_ = *rtx_on; - if (fec_on) fec_on_ = *fec_on; + if (fec_on) { + if (fec_on_ == false && (*fec_on) == true) { // turn on fec + // reset the number of RTX sent during fec + rtx_during_fec_ = 0; + } + fec_on_ = *fec_on; + } - if (*callback_) { - notification::RecoveryStrategy strategy = - notification::RecoveryStrategy::RECOVERY_OFF; + notification::RecoveryStrategy strategy = + notification::RecoveryStrategy::RECOVERY_OFF; - if (rtx_on_ && fec_on_) - strategy = notification::RecoveryStrategy::RTX_AND_FEC; - else if (rtx_on_) - strategy = notification::RecoveryStrategy::RTX_ONLY; - else if (fec_on_) - strategy = notification::RecoveryStrategy::FEC_ONLY; + if (rtx_on_ && fec_on_) + strategy = notification::RecoveryStrategy::RTX_AND_FEC; + else if (rtx_on_) + strategy = notification::RecoveryStrategy::RTX_ONLY; + else if (fec_on_) + strategy = notification::RecoveryStrategy::FEC_ONLY; - (*callback_)(strategy); - } + callback_(strategy); } // common functions @@ -392,6 +422,12 @@ void RecoveryStrategy::removePacketState(uint32_t seq) { return; } + auto it_nack = nacked_seq_.find(seq); + if (it_nack != nacked_seq_.end()) { + nacked_seq_.erase(it_nack); + return; + } + deleteRtx(seq); } @@ -406,7 +442,6 @@ void RecoveryStrategy::reduceFec() { uint32_t bin = ceil(loss_rate / 5.0) - 1; if (fec_per_loss_rate_[bin].fec_to_ask > fec_to_ask) { fec_per_loss_rate_[bin].fec_to_ask--; - // std::cout << "reduce fec to ask for bin " << bin << std::endl; } } } diff --git a/libtransport/src/protocols/rtc/rtc_recovery_strategy.h b/libtransport/src/protocols/rtc/rtc_recovery_strategy.h index 9ffc69a1b..482aedc9d 100644 --- a/libtransport/src/protocols/rtc/rtc_recovery_strategy.h +++ b/libtransport/src/protocols/rtc/rtc_recovery_strategy.h @@ -44,7 +44,7 @@ class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> { RecoveryStrategy(Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service, bool use_rtx, bool use_fec, - interface::StrategyCallback *external_callback); + interface::StrategyCallback &&external_callback); RecoveryStrategy(RecoveryStrategy &&rs); @@ -56,8 +56,6 @@ class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> { void setRateControl(RTCRateControl *rateControl) { rc_ = rateControl; } void setFecParams(uint32_t n, uint32_t k); - void tunrOnRecovery() { recovery_on_ = true; } - bool isRtx(uint32_t seq) { if (rtx_state_.find(seq) != rtx_state_.end()) return true; return false; @@ -68,12 +66,22 @@ class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> { return false; } + bool wasNacked(uint32_t seq) { + if (nacked_seq_.find(seq) != nacked_seq_.end()) return true; + return false; + } + bool isRtxOn() { return rtx_on_; } + bool isFecOn() { return fec_on_; } RTCState *getState() { return state_; } bool lossDetected(uint32_t seq); + void notifyNewLossDetedcted(uint32_t seq); + void requestPossibleLostPacket(uint32_t seq); + void receivedFutureNack(uint32_t seq); void clear(); + virtual void turnOnRecovery() = 0; virtual void onNewRound(bool in_sync) = 0; virtual void newPacketLoss(uint32_t seq) = 0; virtual void receivedPacket(uint32_t seq) = 0; @@ -96,7 +104,7 @@ class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> { void deleteRtx(uint32_t seq); // fec functions - uint32_t computeFecPacketsToAsk(bool in_sync); + uint32_t computeFecPacketsToAsk(); // common functons void removePacketState(uint32_t seq); @@ -105,6 +113,12 @@ class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> { bool rtx_on_; bool fec_on_; + // number of RTX sent after fec turned on + // this is used to take into account jitter and out of order packets + // if we detect losses but we do not sent any RTX it means that the holes in + // the sequence are caused by the jitter + uint32_t rtx_during_fec_; + // this map keeps track of the retransmitted interest, ordered from the oldest // to the newest one. the state contains the timer of the first send of the // interest (from pendingIntetests_), the timer of the next send (key of the @@ -117,6 +131,13 @@ class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> { // lost packets that will be recovered with fec std::unordered_set<uint32_t> recover_with_fec_; + // packet for which we recived a future nack + // in case we detect a loss for a nacked packet we send an RTX but we do not + // increase the loss counter. this is done because it may happen that the + // producer rate checkes over time and in flight interest may be satified by + // data packet after the reception of nacks + std::unordered_set<uint32_t> nacked_seq_; + // rtx vars std::unique_ptr<asio::steady_timer> timer_; uint64_t next_rtx_timer_; @@ -144,7 +165,7 @@ class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> { uint32_t round_id_; // number of rounds uint32_t last_fec_used_; std::vector<fec_state_> fec_per_loss_rate_; - interface::StrategyCallback *callback_; + interface::StrategyCallback callback_; }; } // end namespace rtc diff --git a/libtransport/src/protocols/rtc/rtc_rs_delay.cc b/libtransport/src/protocols/rtc/rtc_rs_delay.cc index e2c60ca77..4be751ec9 100644 --- a/libtransport/src/protocols/rtc/rtc_rs_delay.cc +++ b/libtransport/src/protocols/rtc/rtc_rs_delay.cc @@ -25,9 +25,9 @@ namespace rtc { RecoveryStrategyDelayBased::RecoveryStrategyDelayBased( Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service, - interface::StrategyCallback *external_callback) + interface::StrategyCallback &&external_callback) : RecoveryStrategy(indexer, std::move(callback), io_service, true, false, - external_callback), // start with rtx + std::move(external_callback)), // start with rtx congestion_state_(false), probing_state_(false), switch_rounds_(0) {} @@ -37,22 +37,40 @@ RecoveryStrategyDelayBased::RecoveryStrategyDelayBased(RecoveryStrategy &&rs) setRtxFec(true, false); // we have to re-init congestion and // probing + switch_rounds_ = 0; congestion_state_ = false; probing_state_ = false; } RecoveryStrategyDelayBased::~RecoveryStrategyDelayBased() {} +void RecoveryStrategyDelayBased::turnOnRecovery() { + recovery_on_ = true; + uint64_t rtt = state_->getMinRTT(); + uint32_t fec_to_ask = computeFecPacketsToAsk(); + if (rtt > 80 && fec_to_ask != 0) { + // we need to start FEC (see fec only strategy for more details) + setRtxFec(true, true); + rtx_during_fec_ = 1; // avoid to stop fec + indexer_->setNFec(fec_to_ask); + } else { + // use RTX + setRtxFec(true, false); + switch_rounds_ = 0; + } +} + void RecoveryStrategyDelayBased::softSwitchToFec(uint32_t fec_to_ask) { if (fec_to_ask == 0) { setRtxFec(true, false); switch_rounds_ = 0; } else { switch_rounds_++; - if (switch_rounds_ >= 5) { + if (switch_rounds_ >= ((RTC_INTEREST_LIFETIME / ROUND_LEN) * 2) && + rtx_during_fec_ != 0) { // go to fec only if it is needed (RTX are on) setRtxFec(false, true); } else { - setRtxFec({}, true); + setRtxFec(true, true); } } } @@ -76,9 +94,13 @@ void RecoveryStrategyDelayBased::onNewRound(bool in_sync) { // switch from rtx to fec or keep use fec. Notice that if some rtx are // waiting to be scheduled, they will be sent normally, but no new rtx will // be created If the loss rate is 0 keep to use RTX. - uint32_t fec_to_ask = computeFecPacketsToAsk(in_sync); + uint32_t fec_to_ask = computeFecPacketsToAsk(); softSwitchToFec(fec_to_ask); - indexer_->setNFec(fec_to_ask); + if (rtx_during_fec_ == 0) // if we do not send any RTX the losses + // registered may be due to jitter + indexer_->setNFec(0); + else + indexer_->setNFec(fec_to_ask); return; } @@ -112,7 +134,7 @@ void RecoveryStrategyDelayBased::probing() { // for the moment ask for all fec and exit the probing phase probing_state_ = false; setRtxFec(false, true); - indexer_->setNFec(computeFecPacketsToAsk(true)); + indexer_->setNFec(computeFecPacketsToAsk()); } } // end namespace rtc diff --git a/libtransport/src/protocols/rtc/rtc_rs_delay.h b/libtransport/src/protocols/rtc/rtc_rs_delay.h index 0dd199965..5ca90f4cb 100644 --- a/libtransport/src/protocols/rtc/rtc_rs_delay.h +++ b/libtransport/src/protocols/rtc/rtc_rs_delay.h @@ -26,12 +26,13 @@ class RecoveryStrategyDelayBased : public RecoveryStrategy { public: RecoveryStrategyDelayBased(Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service, - interface::StrategyCallback *external_callback); + interface::StrategyCallback &&external_callback); RecoveryStrategyDelayBased(RecoveryStrategy &&rs); ~RecoveryStrategyDelayBased(); + void turnOnRecovery(); void onNewRound(bool in_sync); void newPacketLoss(uint32_t seq); void receivedPacket(uint32_t seq); diff --git a/libtransport/src/protocols/rtc/rtc_rs_fec_only.cc b/libtransport/src/protocols/rtc/rtc_rs_fec_only.cc index 36d8e39f0..c44212bda 100644 --- a/libtransport/src/protocols/rtc/rtc_rs_fec_only.cc +++ b/libtransport/src/protocols/rtc/rtc_rs_fec_only.cc @@ -25,22 +25,40 @@ namespace rtc { RecoveryStrategyFecOnly::RecoveryStrategyFecOnly( Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service, - interface::StrategyCallback *external_callback) - : RecoveryStrategy(indexer, std::move(callback), io_service, false, true, - external_callback), + interface::StrategyCallback &&external_callback) + : RecoveryStrategy(indexer, std::move(callback), io_service, true, false, + std::move(external_callback)), congestion_state_(false), probing_state_(false), switch_rounds_(0) {} RecoveryStrategyFecOnly::RecoveryStrategyFecOnly(RecoveryStrategy &&rs) : RecoveryStrategy(std::move(rs)) { - setRtxFec(false, true); + setRtxFec(true, false); + switch_rounds_ = 0; congestion_state_ = false; probing_state_ = false; } RecoveryStrategyFecOnly::~RecoveryStrategyFecOnly() {} +void RecoveryStrategyFecOnly::turnOnRecovery() { + recovery_on_ = true; + // init strategy + uint32_t fec_to_ask = computeFecPacketsToAsk(); + if (fec_to_ask > 0) { + // the probing phase detected a lossy link. we immedialty start the fec and + // we disable the check to prevent to send fec packets before RTX. in fact + // here we know that we have losses and it is not a problem of OOO packets + setRtxFec(true, true); + rtx_during_fec_ = 1; // avoid to stop fec + indexer_->setNFec(fec_to_ask); + } else { + // keep only RTX on + setRtxFec(true, true); + } +} + void RecoveryStrategyFecOnly::onNewRound(bool in_sync) { if (!recovery_on_) { indexer_->setNFec(0); @@ -66,7 +84,7 @@ void RecoveryStrategyFecOnly::onNewRound(bool in_sync) { if (probing_state_) { probing(); } else { - uint32_t fec_to_ask = computeFecPacketsToAsk(in_sync); + uint32_t fec_to_ask = computeFecPacketsToAsk(); // If fec_to_ask == 0 we use rtx even if in these strategy we use only fec. // In this way the first packet loss that triggers the usage of fec can be // recovered using rtx, otherwise it will always be lost @@ -75,13 +93,19 @@ void RecoveryStrategyFecOnly::onNewRound(bool in_sync) { switch_rounds_ = 0; } else { switch_rounds_++; - if (switch_rounds_ >= 5) { + if (switch_rounds_ >= ((RTC_INTEREST_LIFETIME / ROUND_LEN) * 2) && + rtx_during_fec_ != + 0) { // go to fec only if it is needed (RTX are on) setRtxFec(false, true); } else { - setRtxFec({}, true); + setRtxFec(true, true); // keep both } } - indexer_->setNFec(fec_to_ask); + if (rtx_during_fec_ == 0) // if we do not send any RTX the losses + // registered may be due to jitter + indexer_->setNFec(0); + else + indexer_->setNFec(fec_to_ask); } } @@ -92,7 +116,7 @@ void RecoveryStrategyFecOnly::newPacketLoss(uint32_t seq) { if (!state_->isPending(seq) && !indexer_->isFec(seq)) { addNewRtx(seq, true); } else { - // if not pending add rtc + // if not pending add to list to recover with fec recover_with_fec_.insert(seq); state_->onPossibleLossWithNoRtx(seq); } @@ -107,7 +131,7 @@ void RecoveryStrategyFecOnly::probing() { // TODO // for the moment ask for all fec and exit the probing phase probing_state_ = false; - uint32_t fec_to_ask = computeFecPacketsToAsk(true); + uint32_t fec_to_ask = computeFecPacketsToAsk(); indexer_->setNFec(fec_to_ask); } diff --git a/libtransport/src/protocols/rtc/rtc_rs_fec_only.h b/libtransport/src/protocols/rtc/rtc_rs_fec_only.h index 37b505d35..1ab78b842 100644 --- a/libtransport/src/protocols/rtc/rtc_rs_fec_only.h +++ b/libtransport/src/protocols/rtc/rtc_rs_fec_only.h @@ -26,12 +26,13 @@ class RecoveryStrategyFecOnly : public RecoveryStrategy { public: RecoveryStrategyFecOnly(Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service, - interface::StrategyCallback *external_callback); + interface::StrategyCallback &&external_callback); RecoveryStrategyFecOnly(RecoveryStrategy &&rs); ~RecoveryStrategyFecOnly(); + void turnOnRecovery(); void onNewRound(bool in_sync); void newPacketLoss(uint32_t seq); void receivedPacket(uint32_t seq); diff --git a/libtransport/src/protocols/rtc/rtc_rs_low_rate.cc b/libtransport/src/protocols/rtc/rtc_rs_low_rate.cc index bd153d209..48dd3e34f 100644 --- a/libtransport/src/protocols/rtc/rtc_rs_low_rate.cc +++ b/libtransport/src/protocols/rtc/rtc_rs_low_rate.cc @@ -25,9 +25,9 @@ namespace rtc { RecoveryStrategyLowRate::RecoveryStrategyLowRate( Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service, - interface::StrategyCallback *external_callback) + interface::StrategyCallback &&external_callback) : RecoveryStrategy(indexer, std::move(callback), io_service, false, true, - external_callback), // start with fec + std::move(external_callback)), // start with fec fec_consecutive_rounds_((MILLI_IN_A_SEC / ROUND_LEN) * 5), // 5 sec rtx_allowed_consecutive_rounds_(0) { initSwitchVector(); @@ -66,7 +66,7 @@ void RecoveryStrategyLowRate::setRecoveryParameters(bool use_rtx, bool use_fec, } void RecoveryStrategyLowRate::selectRecoveryStrategy(bool in_sync) { - uint32_t fec_to_ask = computeFecPacketsToAsk(in_sync); + uint32_t fec_to_ask = computeFecPacketsToAsk(); if (fec_to_ask == 0) { // fec is off, turn on RTX immediatly to avoid packet losses setRecoveryParameters(true, false, 0); @@ -128,6 +128,11 @@ void RecoveryStrategyLowRate::selectRecoveryStrategy(bool in_sync) { } } +void RecoveryStrategyLowRate::turnOnRecovery() { + recovery_on_ = 1; + // the stategy will be init in the new round function +} + void RecoveryStrategyLowRate::onNewRound(bool in_sync) { if (!recovery_on_) { // disable fec so that no extra packet will be sent diff --git a/libtransport/src/protocols/rtc/rtc_rs_low_rate.h b/libtransport/src/protocols/rtc/rtc_rs_low_rate.h index f0c7bd0d5..d66b197e2 100644 --- a/libtransport/src/protocols/rtc/rtc_rs_low_rate.h +++ b/libtransport/src/protocols/rtc/rtc_rs_low_rate.h @@ -34,12 +34,13 @@ class RecoveryStrategyLowRate : public RecoveryStrategy { public: RecoveryStrategyLowRate(Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service, - interface::StrategyCallback *external_callback); + interface::StrategyCallback &&external_callback); RecoveryStrategyLowRate(RecoveryStrategy &&rs); ~RecoveryStrategyLowRate(); + void turnOnRecovery(); void onNewRound(bool in_sync); void newPacketLoss(uint32_t seq); void receivedPacket(uint32_t seq); diff --git a/libtransport/src/protocols/rtc/rtc_rs_recovery_off.cc b/libtransport/src/protocols/rtc/rtc_rs_recovery_off.cc index 499e978f1..16b14eff6 100644 --- a/libtransport/src/protocols/rtc/rtc_rs_recovery_off.cc +++ b/libtransport/src/protocols/rtc/rtc_rs_recovery_off.cc @@ -25,9 +25,9 @@ namespace rtc { RecoveryStrategyRecoveryOff::RecoveryStrategyRecoveryOff( Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service, - interface::StrategyCallback *external_callback) + interface::StrategyCallback &&external_callback) : RecoveryStrategy(indexer, std::move(callback), io_service, false, false, - external_callback) {} + std::move(external_callback)) {} RecoveryStrategyRecoveryOff::RecoveryStrategyRecoveryOff(RecoveryStrategy &&rs) : RecoveryStrategy(std::move(rs)) { @@ -36,6 +36,10 @@ RecoveryStrategyRecoveryOff::RecoveryStrategyRecoveryOff(RecoveryStrategy &&rs) RecoveryStrategyRecoveryOff::~RecoveryStrategyRecoveryOff() {} +void RecoveryStrategyRecoveryOff::turnOnRecovery() { + // nothing to do + return; +} void RecoveryStrategyRecoveryOff::onNewRound(bool in_sync) { // nothing to do return; diff --git a/libtransport/src/protocols/rtc/rtc_rs_recovery_off.h b/libtransport/src/protocols/rtc/rtc_rs_recovery_off.h index 98cd1e6a5..3a9e71e7d 100644 --- a/libtransport/src/protocols/rtc/rtc_rs_recovery_off.h +++ b/libtransport/src/protocols/rtc/rtc_rs_recovery_off.h @@ -26,12 +26,13 @@ class RecoveryStrategyRecoveryOff : public RecoveryStrategy { public: RecoveryStrategyRecoveryOff(Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service, - interface::StrategyCallback *external_callback); + interface::StrategyCallback &&external_callback); RecoveryStrategyRecoveryOff(RecoveryStrategy &&rs); ~RecoveryStrategyRecoveryOff(); + void turnOnRecovery(); void onNewRound(bool in_sync); void newPacketLoss(uint32_t seq); void receivedPacket(uint32_t seq); diff --git a/libtransport/src/protocols/rtc/rtc_rs_rtx_only.cc b/libtransport/src/protocols/rtc/rtc_rs_rtx_only.cc index c1ae9b53d..8e5db5439 100644 --- a/libtransport/src/protocols/rtc/rtc_rs_rtx_only.cc +++ b/libtransport/src/protocols/rtc/rtc_rs_rtx_only.cc @@ -25,9 +25,9 @@ namespace rtc { RecoveryStrategyRtxOnly::RecoveryStrategyRtxOnly( Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service, - interface::StrategyCallback *external_callback) + interface::StrategyCallback &&external_callback) : RecoveryStrategy(indexer, std::move(callback), io_service, true, false, - external_callback) {} + std::move(external_callback)) {} RecoveryStrategyRtxOnly::RecoveryStrategyRtxOnly(RecoveryStrategy &&rs) : RecoveryStrategy(std::move(rs)) { @@ -36,6 +36,11 @@ RecoveryStrategyRtxOnly::RecoveryStrategyRtxOnly(RecoveryStrategy &&rs) RecoveryStrategyRtxOnly::~RecoveryStrategyRtxOnly() {} +void RecoveryStrategyRtxOnly::turnOnRecovery() { + recovery_on_ = true; + setRtxFec(true, false); +} + void RecoveryStrategyRtxOnly::onNewRound(bool in_sync) { // nothing to do return; diff --git a/libtransport/src/protocols/rtc/rtc_rs_rtx_only.h b/libtransport/src/protocols/rtc/rtc_rs_rtx_only.h index 7ae909454..e90e5ba13 100644 --- a/libtransport/src/protocols/rtc/rtc_rs_rtx_only.h +++ b/libtransport/src/protocols/rtc/rtc_rs_rtx_only.h @@ -26,12 +26,13 @@ class RecoveryStrategyRtxOnly : public RecoveryStrategy { public: RecoveryStrategyRtxOnly(Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service, - interface::StrategyCallback *external_callback); + interface::StrategyCallback &&external_callback); RecoveryStrategyRtxOnly(RecoveryStrategy &&rs); ~RecoveryStrategyRtxOnly(); + void turnOnRecovery(); void onNewRound(bool in_sync); void newPacketLoss(uint32_t seq); void receivedPacket(uint32_t seq); diff --git a/libtransport/src/protocols/rtc/rtc_state.cc b/libtransport/src/protocols/rtc/rtc_state.cc index 6a21531f8..5b3b5e4c3 100644 --- a/libtransport/src/protocols/rtc/rtc_state.cc +++ b/libtransport/src/protocols/rtc/rtc_state.cc @@ -56,7 +56,6 @@ void RTCState::initParams() { last_seq_nacked_ = 0; loss_rate_ = 0.0; avg_loss_rate_ = -1.0; - max_loss_rate_ = 0.0; last_round_loss_rate_ = 0.0; // loss rate per sec @@ -85,14 +84,13 @@ void RTCState::initParams() { fec_recovered_rate_ = 0.0; // nack counter - nack_on_last_round_ = false; + past_nack_on_last_round_ = false; received_nacks_last_round_ = 0; // packets counter received_packets_last_round_ = 0; received_data_last_round_ = 0; received_data_from_cache_ = 0; - data_from_cache_rate_ = 0; sent_interests_last_round_ = 0; sent_rtx_last_round_ = 0; @@ -103,7 +101,7 @@ void RTCState::initParams() { last_production_seq_ = 0; producer_is_active_ = false; - last_prod_update_ = 0; + last_prod_update_seq_ = 0; // paths stats path_table_.clear(); @@ -180,7 +178,9 @@ void RTCState::onLossDetected(uint32_t seq) { PacketState state = getPacketState(seq); // if the packet is already marked with a state, do nothing - if (state == PacketState::UNKNOWN) { + // to be considered lost the packet must be pending + if (state == PacketState::UNKNOWN && + pending_interests_.find(seq) != pending_interests_.end()) { packets_lost_++; addToPacketCache(seq, PacketState::LOST); } @@ -225,8 +225,8 @@ void RTCState::onDataPacketReceived(const core::ContentObject &content_object, core::ParamsRTC params = RTCState::getDataParams(content_object); - if (last_prod_update_ < params.timestamp) { - last_prod_update_ = params.timestamp; + if (last_prod_update_seq_ < seq) { + last_prod_update_seq_ = seq; production_rate_ = (double)params.prod_rate; } @@ -267,14 +267,12 @@ void RTCState::onNackPacketReceived(const core::ContentObject &nack, uint32_t seq = nack.getName().getSuffix(); struct nack_packet_t *nack_pkt = (struct nack_packet_t *)nack.getPayload()->data(); - uint64_t production_time = nack_pkt->getTimestamp(); uint32_t production_seq = nack_pkt->getProductionSegment(); uint32_t production_rate = nack_pkt->getProductionRate(); if (TRANSPORT_EXPECT_FALSE(main_path_ == nullptr) || - last_prod_update_ < production_time) { + last_prod_update_seq_ < production_seq) { // update production rate - last_prod_update_ = production_time; last_production_seq_ = production_seq; production_rate_ = (double)production_rate; } @@ -282,7 +280,6 @@ void RTCState::onNackPacketReceived(const core::ContentObject &nack, if (compute_stats) { // this is not an RTX updatePathStats(nack, true); - nack_on_last_round_ = true; } // for statistics pourpose we log all nacks, also the one received for @@ -297,6 +294,7 @@ void RTCState::onNackPacketReceived(const core::ContentObject &nack, if (last_seq_nacked_ < seq) last_seq_nacked_ = seq; DLOG_IF(INFO, VLOG_IS_ON(3)) << "lost packet " << seq << " beacuse of a past nack"; + if (compute_stats) past_nack_on_last_round_ = true; onPacketLost(seq); } else if (seq > production_seq) { // future nack @@ -317,29 +315,15 @@ void RTCState::onNackPacketReceived(const core::ContentObject &nack, } } - // the producer is responding - // we consider it active only if the production rate is not 0 - // or the production sequence number is not 1 - if (production_rate_ != 0 || production_seq != 1) { - producer_is_active_ = true; - } - received_packets_last_round_++; } void RTCState::onPacketLost(uint32_t seq) { -#if 0 - DLOG_IF(INFO, VLOG_IS_ON(3)) << "packet " << seq << " is lost"; - auto it = pending_interests_.find(seq); - if (it != pending_interests_.end()) { - // this packet was never retransmitted so it does - // not appear in the loss count - packets_lost_++; - } -#endif if (!indexer_->isFec(seq)) { PacketState state = getPacketState(seq); - if (state == PacketState::LOST || state == PacketState::UNKNOWN) { + if (state == PacketState::LOST || + (state == PacketState::UNKNOWN && + pending_interests_.find(seq) != pending_interests_.end())) { definitely_lost_pkt_++; DLOG_IF(INFO, VLOG_IS_ON(4)) << "packet " << seq << " is lost"; } @@ -350,7 +334,12 @@ void RTCState::onPacketLost(uint32_t seq) { void RTCState::onPacketRecoveredRtx(uint32_t seq) { packets_sent_to_app_++; if (seq > highest_seq_received_) highest_seq_received_ = seq; - losses_recovered_++; + + // increase the recovered packet counter only if the packet was marked as LOST + // before. + PacketState state = getPacketState(seq); + if (state == PacketState::LOST) losses_recovered_++; + addRecvOrLost(seq, PacketState::RECEIVED); } @@ -371,19 +360,30 @@ void RTCState::onPacketRecoveredFec(uint32_t seq, uint32_t size) { // adding header to the count recovered_bytes_with_fec_ += 60; // XXX get header size some where - if (getPacketState(seq) == PacketState::UNKNOWN) - onLossDetected(seq); // the pkt was lost but didn't account for it yet + // the packet could be not marked as lost yet. onLossDetected checks if add in + // the packet in the lost count or not + onLossDetected(seq); addRecvOrLost(seq, PacketState::RECEIVED); } bool RTCState::onProbePacketReceived(const core::ContentObject &probe) { uint32_t seq = probe.getName().getSuffix(); + core::ParamsRTC params = RTCState::getProbeParams(probe); + + bool is_valid = true; + uint32_t max = UINT32_MAX; + if (params.prod_rate == max) is_valid = false; uint64_t rtt; - rtt = probe_handler_->getRtt(seq); + rtt = probe_handler_->getRtt(seq, is_valid); if (rtt == 0) return false; // this is not a valid probe + if (!is_valid) return false; // not a valid probe + + // if we are here the producer is active + producer_is_active_ = true; + // Like for data and nacks update the path stats. Here the RTT is computed // by the probe handler. Both probes for rtt and bw are good to estimate // info on the path. @@ -406,24 +406,14 @@ bool RTCState::onProbePacketReceived(const core::ContentObject &probe) { uint64_t now = utils::SteadyTime::nowMs().count(); - core::ParamsRTC params = RTCState::getProbeParams(probe); - int64_t OWD = now - params.timestamp; path->insertOwdSample(OWD); - if (last_prod_update_ < params.timestamp) { + if (last_prod_update_seq_ < params.prod_seg) { last_production_seq_ = params.prod_seg; - last_prod_update_ = params.timestamp; production_rate_ = (double)params.prod_rate; } - // the producer is responding - // we consider it active only if the production rate is not 0 - // or the production sequence numner is not 1 - if (production_rate_ != 0 || params.prod_seg != 1) { - producer_is_active_ = true; - } - // check for init RTT. if received_probes_ is equal to 0 schedule a timer to // wait for the INIT_RTT_PROBES. in this way if some probes get lost we don't // wait forever @@ -453,12 +443,12 @@ bool RTCState::onProbePacketReceived(const core::ContentObject &probe) { void RTCState::onJumpForward(uint32_t next_seq) { for (uint32_t seq = highest_seq_received_in_order_ + 1; seq < next_seq; seq++) { - auto it = pending_interests_.find(seq); PacketState packet_state = getPacketState(seq); - if (it == pending_interests_.end() && - packet_state != PacketState::RECEIVED && + if (packet_state != PacketState::RECEIVED && packet_state != PacketState::DEFINITELY_LOST) { - onLossDetected(seq); + // here we considere the packet as definitely lost whitout increase the + // lost packet counter because this loss is not due to the network + // condition but the transport wants to skip the packet onPacketLost(seq); } } @@ -491,29 +481,6 @@ void RTCState::onNewRound(double round_len, bool in_sync) { fec_recovered_rate_ = (fec_recovered_rate_ * 0.8) + (0.2 * fec_recovered_bytes_per_sec); -#if 0 - // search for an active path. There should be only one active path (meaning a - // path that leads to the producer socket -no cache- and from which we are - // currently getting data packets) at any time. However it may happen that - // there are mulitple active paths in case of mobility (the old path will - // remain active for a short ammount of time). The main path is selected as - // the active path from where the consumer received the latest data packet - - uint64_t last_packet_ts = 0; - main_path_ = nullptr; - - for (auto it = path_table_.begin(); it != path_table_.end(); it++) { - it->second->roundEnd(); - if (it->second->isActive()) { - uint64_t ts = it->second->getLastPacketTS(); - if (ts > last_packet_ts) { - last_packet_ts = ts; - main_path_ = it->second; - } - } - } -#endif - // search for an active path. Is it possible to have multiple path that are // used at the same time. We use as reference path the one from where we gets // more packets. This means that the path should have better lantecy or less @@ -544,7 +511,7 @@ void RTCState::onNewRound(double round_len, bool in_sync) { updateLossRate(in_sync); // handle nacks - if (!nack_on_last_round_ && received_bytes_ > 0) { + if (!past_nack_on_last_round_ && received_bytes_ > 0) { rounds_without_nacks_++; } else { rounds_without_nacks_ = 0; @@ -561,14 +528,6 @@ void RTCState::onNewRound(double round_len, bool in_sync) { } } - // compute cache/producer ratio - if (received_data_last_round_ != 0) { - double new_rate = - (double)received_data_from_cache_ / (double)received_data_last_round_; - data_from_cache_rate_ = data_from_cache_rate_ * MOVING_AVG_ALPHA + - (new_rate * (1 - MOVING_AVG_ALPHA)); - } - // reset counters received_bytes_ = 0; received_fec_bytes_ = 0; @@ -578,7 +537,7 @@ void RTCState::onNewRound(double round_len, bool in_sync) { losses_recovered_ = 0; first_seq_in_round_ = highest_seq_received_; - nack_on_last_round_ = false; + past_nack_on_last_round_ = false; received_nacks_last_round_ = 0; received_packets_last_round_ = 0; @@ -700,6 +659,7 @@ void RTCState::updateLossRate(bool in_sync) { expected_packets_ += ((highest_seq_received_ - first_seq_in_round_) - fec_packets); } else { + expected_packets_ = 0; packets_sent_to_app_ = 0; } @@ -715,7 +675,6 @@ void RTCState::updateLossRate(bool in_sync) { (double)((double)(lost_per_sec_) / (double)total_expected_packets_); loss_history_.pushBack(per_sec_loss_rate_); - max_loss_rate_ = getMaxLoss(); if (in_sync && expected_packets_ != 0) { // compute residual loss rate @@ -778,7 +737,9 @@ void RTCState::addRecvOrLost(uint32_t seq, PacketState state) { // however we may need to increse the number or lost packets // XXX: in case we want to use rtx to recover fec packets, // this may prevent to detect a packet loss and no rtx will be sent - onLossDetected(i); + if (TRANSPORT_EXPECT_TRUE(i >= first_interest_sent_seq_)) { + onLossDetected(i); + } } else { // this is a data packet and we need to get it break; @@ -806,8 +767,9 @@ void RTCState::setInitRttTimer(uint32_t wait) { } void RTCState::checkInitRttTimer() { - if (received_probes_ < INIT_RTT_MIN_PROBES_TO_RECV) { - // we didn't received enough probes, restart + if (received_probes_ < INIT_RTT_MIN_PROBES_TO_RECV || + probe_handler_->getProbeLossRate() == 1.0) { + // we didn't received enough probes or they were not valid, restart received_probes_ = 0; probe_handler_->setSuffixRange(MIN_INIT_PROBE_SEQ, MAX_INIT_PROBE_SEQ); probe_handler_->setProbes(INIT_RTT_PROBE_INTERVAL, INIT_RTT_PROBES); @@ -819,7 +781,6 @@ void RTCState::checkInitRttTimer() { init_rtt_ = true; main_path_->roundEnd(); loss_history_.pushBack(probe_handler_->getProbeLossRate()); - max_loss_rate_ = getMaxLoss(); probe_handler_->setSuffixRange(MIN_RTT_PROBE_SEQ, MAX_RTT_PROBE_SEQ); probe_handler_->setProbes(RTT_PROBE_INTERVAL, 0); @@ -829,17 +790,12 @@ void RTCState::checkInitRttTimer() { double prod_rate = getProducerRate(); double rtt = (double)getMinRTT() / MILLI_IN_A_SEC; double packet_size = getAveragePacketSize(); - uint32_t pkt_in_rtt_ = std::floor(((prod_rate / packet_size) * rtt) * 0.8); + uint32_t pkt_in_rtt_ = std::floor(((prod_rate / packet_size) * rtt)); last_seq_nacked_ = last_production_seq_ + pkt_in_rtt_; discovered_rtt_callback_(); } -double RTCState::getMaxLoss() { - if (loss_history_.size() != 0) return loss_history_.begin(); - return 0; -} - core::ParamsRTC RTCState::getProbeParams(const core::ContentObject &probe) { uint32_t seq = probe.getName().getSuffix(); core::ParamsRTC params; diff --git a/libtransport/src/protocols/rtc/rtc_state.h b/libtransport/src/protocols/rtc/rtc_state.h index 8bf48ccc2..4bd2f76a0 100644 --- a/libtransport/src/protocols/rtc/rtc_state.h +++ b/libtransport/src/protocols/rtc/rtc_state.h @@ -162,7 +162,11 @@ class RTCState : public std::enable_shared_from_this<RTCState> { double getPerRoundLossRate() const { return loss_rate_; } double getPerSecondLossRate() const { return per_sec_loss_rate_; } double getAvgLossRate() const { return avg_loss_rate_; } - double getMaxLossRate() const { return max_loss_rate_; } + double getMaxLossRate() const { + if (loss_history_.size() != 0) return loss_history_.begin(); + return 0; + } + double getLastRoundLossRate() const { return last_round_loss_rate_; } double getResidualLossRate() const { return residual_loss_rate_; } @@ -177,8 +181,6 @@ class RTCState : public std::enable_shared_from_this<RTCState> { return highest_seq_received_in_order_; } - double getMaxLoss(); - // fec packets uint32_t getReceivedFecPackets() const { return received_fec_pkt_; } uint32_t getPendingFecPackets() const { return pending_fec_pkt_; } @@ -213,7 +215,13 @@ class RTCState : public std::enable_shared_from_this<RTCState> { bool isProducerActive() const { return producer_is_active_; } // packets from cache - double getPacketFromCacheRatio() const { return data_from_cache_rate_; } + // this should be called at the end of a round beacuse otherwise we may have + // not enough packets to get a good stat + double getPacketFromCacheRatio() const { + if (received_data_last_round_ == 0) return 0; + return (double)received_data_from_cache_ / + (double)received_data_last_round_; + } PendingInterestsMap::iterator getPendingInterestsMapBegin() { return pending_interests_.begin(); @@ -286,7 +294,6 @@ class RTCState : public std::enable_shared_from_this<RTCState> { uint32_t last_seq_nacked_; // segment for which we got an oldNack double loss_rate_; double avg_loss_rate_; - double max_loss_rate_; double last_round_loss_rate_; utils::MaxFilter<double> loss_history_; @@ -314,17 +321,16 @@ class RTCState : public std::enable_shared_from_this<RTCState> { double fec_recovered_rate_; // rate recovered using fec // nack counters - // the bool takes tracks only about the valid nacks (no rtx) and it is used to - // switch between the states. Instead received_nacks_last_round_ logs all the - // nacks for statistics - bool nack_on_last_round_; + // the bool takes tracks only about the valid past nacks (no rtx) and it is + // used to switch between the states. Instead received_nacks_last_round_ logs + // all the nacks for statistics + bool past_nack_on_last_round_; uint32_t received_nacks_last_round_; // packets counters uint32_t received_packets_last_round_; uint32_t received_data_last_round_; uint32_t received_data_from_cache_; - double data_from_cache_rate_; uint32_t sent_interests_last_round_; uint32_t sent_rtx_last_round_; @@ -344,10 +350,13 @@ class RTCState : public std::enable_shared_from_this<RTCState> { // producer state bool producer_is_active_; // the prodcuer is active if we receive some packets - uint32_t - last_production_seq_; // last production seq received by the producer - uint64_t last_prod_update_; // timestamp of the last packets used to update - // stats from the producer + uint32_t last_production_seq_; // last production seq received by the + // producer used to init the sync protcol + uint32_t last_prod_update_seq_; // seq number of the last packet used to + // update the update from the producer. + // assumption: the highest seq number carries + // the most up to date info. in case of + // probes we look at the produced seq number // paths stats std::unordered_map<uint32_t, std::shared_ptr<RTCDataPath>> path_table_; diff --git a/libtransport/src/protocols/rtc/rtc_verifier.cc b/libtransport/src/protocols/rtc/rtc_verifier.cc index 29968dd02..7b6330a1f 100644 --- a/libtransport/src/protocols/rtc/rtc_verifier.cc +++ b/libtransport/src/protocols/rtc/rtc_verifier.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2021 Cisco and/or its affiliates. + * Copyright (c) 2017-2022 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -22,8 +22,11 @@ namespace protocol { namespace rtc { RTCVerifier::RTCVerifier(std::shared_ptr<auth::Verifier> verifier, - uint32_t max_unverified_delay) - : verifier_(verifier), max_unverified_delay_(max_unverified_delay) {} + uint32_t max_unverified_interval, + double max_unverified_ratio) + : verifier_(verifier), + max_unverified_interval_(max_unverified_interval), + max_unverified_ratio_(max_unverified_ratio) {} void RTCVerifier::setState(std::shared_ptr<RTCState> rtc_state) { rtc_state_ = rtc_state; @@ -33,15 +36,20 @@ void RTCVerifier::setVerifier(std::shared_ptr<auth::Verifier> verifier) { verifier_ = verifier; } -void RTCVerifier::setMaxUnverifiedDelay(uint32_t max_unverified_delay) { - max_unverified_delay_ = max_unverified_delay; +void RTCVerifier::setMaxUnverifiedInterval(uint32_t max_unverified_interval) { + max_unverified_interval_ = max_unverified_interval; +} + +void RTCVerifier::setMaxUnverifiedRatio(double max_unverified_ratio) { + max_unverified_ratio_ = max_unverified_ratio; } auth::VerificationPolicy RTCVerifier::verify( core::ContentObject &content_object, bool is_fec) { - uint32_t suffix = content_object.getName().getSuffix(); - core::PayloadType payload_type = content_object.getPayloadType(); + auth::Suffix suffix = content_object.getName().getSuffix(); + auth::VerificationPolicy default_policy = auth::VerificationPolicy::ABORT; + core::PayloadType payload_type = content_object.getPayloadType(); bool is_probe = ProbeHandler::getProbeType(suffix) != ProbeType::NOT_PROBE; bool is_nack = !is_probe && content_object.payloadSize() == NACK_HEADER_SIZE; bool is_manifest = !is_probe && !is_nack && !is_fec && @@ -55,29 +63,31 @@ auth::VerificationPolicy RTCVerifier::verify( if (is_data) return verifyData(content_object); if (is_manifest) return verifyManifest(content_object); - auth::VerificationPolicy policy = auth::VerificationPolicy::ABORT; - verifier_->callVerificationFailedCallback(suffix, policy); - return policy; + verifier_->callVerificationFailedCallback(suffix, default_policy); + return default_policy; } auth::VerificationPolicy RTCVerifier::verifyProbe( core::ContentObject &content_object) { - switch (ProbeHandler::getProbeType(content_object.getName().getSuffix())) { - case ProbeType::INIT: { - auth::VerificationPolicy policy = verifyManifest(content_object); - if (policy != auth::VerificationPolicy::ACCEPT) { - return policy; + auth::Suffix suffix = content_object.getName().getSuffix(); + auth::VerificationPolicy policy = auth::VerificationPolicy::ABORT; + + switch (ProbeHandler::getProbeType(suffix)) { + case ProbeType::INIT: + policy = verifyManifest(content_object); + if (policy == auth::VerificationPolicy::ACCEPT) { + policy = processManifest(content_object); } - return processManifest(content_object); - } + break; case ProbeType::RTT: - return verifyNack(content_object); + policy = verifyNack(content_object); + break; default: - auth::VerificationPolicy policy = auth::VerificationPolicy::ABORT; - verifier_->callVerificationFailedCallback( - content_object.getName().getSuffix(), policy); - return policy; + verifier_->callVerificationFailedCallback(suffix, policy); + break; } + + return policy; } auth::VerificationPolicy RTCVerifier::verifyNack( @@ -92,28 +102,30 @@ auth::VerificationPolicy RTCVerifier::verifyFec( auth::VerificationPolicy RTCVerifier::verifyData( core::ContentObject &content_object) { - uint32_t suffix = content_object.getName().getSuffix(); - if (_is_ah(content_object.getFormat())) { return verifier_->verifyPackets(&content_object); } - unverified_bytes_[suffix] = - content_object.headerSize() + content_object.payloadSize(); - unverified_packets_[suffix] = - content_object.computeDigest(manifest_hash_algo_); + auth::Suffix suffix = content_object.getName().getSuffix(); + auth::VerificationPolicy policy = auth::VerificationPolicy::ABORT; + Timestamp now = utils::SteadyTime::nowMs().count(); + + // Flush old packets + Timestamp oldest = flush_packets(now); - // An alert is raised when too much packets remain unverified - if (getTotalUnverified() > max_unverified_bytes_) { - unverified_bytes_.clear(); - unverified_packets_.clear(); + // Add packet to map of unverified packets + packets_unverif_.add( + {.suffix = suffix, .timestamp = now, .size = content_object.length()}, + content_object.computeDigest(manifest_hash_algo_)); - auth::VerificationPolicy policy = auth::VerificationPolicy::ABORT; - verifier_->callVerificationFailedCallback(suffix, policy); - return policy; + // Check that the ratio of unverified packets stays below the limit + if (now - oldest < max_unverified_interval_ || + getBufferRatio() < max_unverified_ratio_) { + policy = auth::VerificationPolicy::ACCEPT; } - return auth::VerificationPolicy::ACCEPT; + verifier_->callVerificationFailedCallback(suffix, policy); + return policy; } auth::VerificationPolicy RTCVerifier::verifyManifest( @@ -123,8 +135,10 @@ auth::VerificationPolicy RTCVerifier::verifyManifest( auth::VerificationPolicy RTCVerifier::processManifest( core::ContentObject &content_object) { - uint32_t suffix = content_object.getName().getSuffix(); + auth::Suffix suffix = content_object.getName().getSuffix(); + auth::VerificationPolicy accept_policy = auth::VerificationPolicy::ACCEPT; + // Decode manifest core::ContentObjectManifest manifest(content_object); manifest.decode(); @@ -133,65 +147,62 @@ auth::VerificationPolicy RTCVerifier::processManifest( last_manifest_ = suffix; } - // Extract parameters + // Extract hash algorithm and hashes manifest_hash_algo_ = manifest.getHashAlgorithm(); - core::ParamsRTC params = manifest.getParamsRTC(); - - if (params.prod_rate > 0) { - max_unverified_bytes_ = static_cast<uint64_t>( - (max_unverified_delay_ / 1000.0) * params.prod_rate); - } - - if (max_unverified_bytes_ == 0 || !rtc_state_) { - auth::VerificationPolicy policy = auth::VerificationPolicy::ABORT; - verifier_->callVerificationFailedCallback(suffix, policy); - return policy; - } - - // Extract hashes auth::Verifier::SuffixMap suffix_map = core::ContentObjectManifest::getSuffixMap(&manifest); // Return early if the manifest is empty if (suffix_map.empty()) { - return auth::VerificationPolicy::ACCEPT; + verifier_->callVerificationFailedCallback(suffix, accept_policy); + return accept_policy; } - // Remove lost packets from digest map + // Add hashes to map of all manifest hashes manifest_digests_.insert(suffix_map.begin(), suffix_map.end()); + + // Remove discarded and definitely lost packets from digest map for (auto it = manifest_digests_.begin(); it != manifest_digests_.end();) { + auto it_erased = packets_unverif_erased_.find(it->first); + + if (it_erased != packets_unverif_erased_.end()) { + packets_unverif_erased_.erase(it_erased); + it = manifest_digests_.erase(it); + continue; + } + if (rtc_state_->getPacketState(it->first) == PacketState::DEFINITELY_LOST) { - unverified_packets_.erase(it->first); - unverified_bytes_.erase(it->first); it = manifest_digests_.erase(it); - } else { - ++it; + continue; } + + ++it; } // Verify packets auth::Verifier::PolicyMap policies = - verifier_->verifyHashes(unverified_packets_, manifest_digests_); + verifier_->verifyHashes(packets_unverif_.suffixMap(), manifest_digests_); - for (const auto &policy : policies) { - switch (policy.second) { + for (const auto &p : policies) { + switch (p.second) { case auth::VerificationPolicy::ACCEPT: { - manifest_digests_.erase(policy.first); - unverified_packets_.erase(policy.first); - unverified_bytes_.erase(policy.first); + auto packet_unverif_it = packets_unverif_.packetIt(p.first); + Packet packet_verif = *packet_unverif_it; + packets_unverif_.remove(packet_unverif_it); + packets_verif_.add(packet_verif); + manifest_digests_.erase(p.first); break; } case auth::VerificationPolicy::UNKNOWN: break; case auth::VerificationPolicy::DROP: case auth::VerificationPolicy::ABORT: - auth::VerificationPolicy p = policy.second; - verifier_->callVerificationFailedCallback(policy.first, p); - return p; + return p.second; } } - return auth::VerificationPolicy::ACCEPT; + verifier_->callVerificationFailedCallback(suffix, accept_policy); + return accept_policy; } void RTCVerifier::onDataRecoveredFec(uint32_t suffix) { @@ -203,35 +214,101 @@ void RTCVerifier::onJumpForward(uint32_t next_suffix) { return; } - // When we jump forward in the suffix sequence, we remove packets that - // probably won't be verified. Those packets have a suffix in the range - // [last_manifest_ + 1, next_suffix[. - for (auto it = unverified_packets_.begin(); - it != unverified_packets_.end();) { - if (it->first > last_manifest_) { - unverified_bytes_.erase(it->first); - it = unverified_packets_.erase(it); - } else { - ++it; + // When we jump forward in the suffix sequence, we remove packets that won't + // be verified. Those packets have a suffix in the range [last_manifest_ + 1, + // next_suffix[. + for (auth::Suffix suffix = last_manifest_ + 1; suffix < next_suffix; + ++suffix) { + auto packet_it = packets_unverif_.packetIt(suffix); + if (packet_it != packets_unverif_.set().end()) { + packets_unverif_.remove(packet_it); } } } -uint32_t RTCVerifier::getTotalUnverified() const { - uint32_t total = 0; +double RTCVerifier::getBufferRatio() const { + size_t total = packets_verif_.size() + packets_unverif_.size(); + double total_unverified = static_cast<double>(packets_unverif_.size()); + return total ? total_unverified / total : 0.0; +} + +RTCVerifier::Timestamp RTCVerifier::flush_packets(Timestamp now) { + Timestamp oldest_verified = packets_verif_.set().empty() + ? now + : packets_verif_.set().begin()->timestamp; + Timestamp oldest_unverified = packets_unverif_.set().empty() + ? now + : packets_unverif_.set().begin()->timestamp; + + // Prune verified packets older than the unverified interval + for (auto it = packets_verif_.set().begin(); + it != packets_verif_.set().end();) { + if (now - it->timestamp < max_unverified_interval_) { + break; + } + it = packets_verif_.remove(it); + } - for (auto bytes : unverified_bytes_) { - if (bytes.second > UINT32_MAX - total) { - total = UINT32_MAX; + // Prune unverified packets older than the unverified interval + for (auto it = packets_unverif_.set().begin(); + it != packets_unverif_.set().end();) { + if (now - it->timestamp < max_unverified_interval_) { break; } - total += bytes.second; + packets_unverif_erased_.insert(it->suffix); + it = packets_unverif_.remove(it); + } + + return std::min(oldest_verified, oldest_unverified); +} + +std::pair<RTCVerifier::PacketSet::iterator, bool> RTCVerifier::Packets::add( + const Packet &packet) { + auto inserted = packets_.insert(packet); + size_ += inserted.second ? packet.size : 0; + return inserted; +} + +RTCVerifier::PacketSet::iterator RTCVerifier::Packets::remove( + PacketSet::iterator packet_it) { + size_ -= packet_it->size; + return packets_.erase(packet_it); +} + +const std::set<RTCVerifier::Packet> &RTCVerifier::Packets::set() const { + return packets_; +}; + +size_t RTCVerifier::Packets::size() const { return size_; }; + +std::pair<RTCVerifier::PacketSet::iterator, bool> +RTCVerifier::PacketsUnverif::add(const Packet &packet, + const auth::CryptoHash &digest) { + auto inserted = add(packet); + if (inserted.second) { + packets_map_[packet.suffix] = inserted.first; + digests_map_[packet.suffix] = digest; } + return inserted; +} - return total; +RTCVerifier::PacketSet::iterator RTCVerifier::PacketsUnverif::remove( + PacketSet::iterator packet_it) { + size_ -= packet_it->size; + packets_map_.erase(packet_it->suffix); + digests_map_.erase(packet_it->suffix); + return packets_.erase(packet_it); } -uint32_t RTCVerifier::getMaxUnverified() const { return max_unverified_bytes_; } +RTCVerifier::PacketSet::iterator RTCVerifier::PacketsUnverif::packetIt( + auth::Suffix suffix) { + return packets_map_.at(suffix); +}; + +const auth::Verifier::SuffixMap &RTCVerifier::PacketsUnverif::suffixMap() + const { + return digests_map_; +} } // end namespace rtc } // end namespace protocol diff --git a/libtransport/src/protocols/rtc/rtc_verifier.h b/libtransport/src/protocols/rtc/rtc_verifier.h index 596bd8536..098984057 100644 --- a/libtransport/src/protocols/rtc/rtc_verifier.h +++ b/libtransport/src/protocols/rtc/rtc_verifier.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2021 Cisco and/or its affiliates. + * Copyright (c) 2017-2022 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -26,8 +26,9 @@ namespace rtc { class RTCVerifier { public: - RTCVerifier(std::shared_ptr<auth::Verifier> verifier, - uint32_t max_unverified_delay); + explicit RTCVerifier(std::shared_ptr<auth::Verifier> verifier, + uint32_t max_unverified_interval, + double max_unverified_ratio); virtual ~RTCVerifier() = default; @@ -35,13 +36,9 @@ class RTCVerifier { void setVerifier(std::shared_ptr<auth::Verifier> verifier); - void setMaxUnverifiedDelay(uint32_t max_unverified_delay); + void setMaxUnverifiedInterval(uint32_t max_unverified_interval); - void onDataRecoveredFec(uint32_t suffix); - void onJumpForward(uint32_t next_suffix); - - uint32_t getTotalUnverified() const; - uint32_t getMaxUnverified() const; + void setMaxUnverifiedRatio(double max_unverified_ratio); auth::VerificationPolicy verify(core::ContentObject &content_object, bool is_fec = false); @@ -53,27 +50,84 @@ class RTCVerifier { auth::VerificationPolicy processManifest(core::ContentObject &content_object); + void onDataRecoveredFec(uint32_t suffix); + void onJumpForward(uint32_t next_suffix); + + double getBufferRatio() const; + protected: + struct Packet; + using Timestamp = uint64_t; + using PacketSet = std::set<Packet>; + + struct Packet { + auth::Suffix suffix; + Timestamp timestamp; + size_t size; + + bool operator==(const Packet &b) const { + return timestamp == b.timestamp && suffix == b.suffix; + } + bool operator<(const Packet &b) const { + return timestamp == b.timestamp ? suffix < b.suffix + : timestamp < b.timestamp; + } + }; + + class Packets { + public: + virtual std::pair<PacketSet::iterator, bool> add(const Packet &packet); + virtual PacketSet::iterator remove(PacketSet::iterator packet_it); + const PacketSet &set() const; + size_t size() const; + + protected: + PacketSet packets_; + size_t size_; + }; + + class PacketsVerif : public Packets {}; + + class PacketsUnverif : public Packets { + public: + using Packets::add; + std::pair<PacketSet::iterator, bool> add(const Packet &packet, + const auth::CryptoHash &digest); + PacketSet::iterator remove(PacketSet::iterator packet_it) override; + PacketSet::iterator packetIt(auth::Suffix suffix); + const auth::Verifier::SuffixMap &suffixMap() const; + + private: + std::unordered_map<auth::Suffix, PacketSet::iterator> packets_map_; + auth::Verifier::SuffixMap digests_map_; + }; + // The RTC state. std::shared_ptr<RTCState> rtc_state_; // The verifier instance. std::shared_ptr<auth::Verifier> verifier_; + // Window to consider when verifying packets. + uint32_t max_unverified_interval_; + // Ratio of unverified packets over which an alert is triggered. + double max_unverified_ratio_; + // The suffix of the last processed manifest. + auth::Suffix last_manifest_; // Hash algorithm used by manifests. auth::CryptoHashType manifest_hash_algo_; - // The last manifest processed. - auth::Suffix last_manifest_; - // Hold digests extracted from all manifests received. + // Digests extracted from all manifests received. auth::Verifier::SuffixMap manifest_digests_; - // Hold hashes of all content objects received before they are verified. - auth::Verifier::SuffixMap unverified_packets_; - // Hold number of unverified bytes. - std::unordered_map<auth::Suffix, uint32_t> unverified_bytes_; - // Maximum delay (in ms) for an unverified byte to become verifed. - uint32_t max_unverified_delay_; - // Maximum number of unverified bytes before aborting the connection. - uint64_t max_unverified_bytes_; + // Verified packets with timestamp >= now - max_unverified_interval_. + PacketsVerif packets_verif_; + // Unverified packets with timestamp >= now - max_unverified_interval_. + PacketsUnverif packets_unverif_; + // Unverified erased packets with timestamp < now - max_unverified_interval_. + std::unordered_set<auth::Suffix> packets_unverif_erased_; + + // Flushes all packets with timestamp < now - max_unverified_interval_. + // Returns the timestamp of the oldest packet, verified or not. + Timestamp flush_packets(Timestamp now); }; -} // end namespace rtc +} // namespace rtc } // namespace protocol } // namespace transport diff --git a/libtransport/src/protocols/transport_protocol.cc b/libtransport/src/protocols/transport_protocol.cc index f1e49ec0b..a73b9fb7b 100644 --- a/libtransport/src/protocols/transport_protocol.cc +++ b/libtransport/src/protocols/transport_protocol.cc @@ -80,12 +80,6 @@ int TransportProtocol::start() { socket_->getSocketOption(GeneralTransportOptions::ASYNC_MODE, is_async_); - std::string fec_type_str = ""; - socket_->getSocketOption(GeneralTransportOptions::FEC_TYPE, fec_type_str); - if (fec_type_str != "") { - fec_type_ = fec::FECUtils::fecTypeFromString(fec_type_str.c_str()); - } - // Set it is the first time we schedule an interest is_first_ = true; diff --git a/libtransport/src/test/test_auth.cc b/libtransport/src/test/test_auth.cc index d7fd55433..0c47dd958 100644 --- a/libtransport/src/test/test_auth.cc +++ b/libtransport/src/test/test_auth.cc @@ -117,13 +117,11 @@ TEST_F(AuthTest, AsymmetricBufferRSA) { std::vector<uint8_t> buffer(payload.begin(), payload.end()); signer->signBuffer(buffer); - std::vector<uint8_t> sig = signer->getSignature(); + utils::MemBuf::Ptr sig = signer->getSignature(); std::shared_ptr<AsymmetricVerifier> verif = std::make_shared<AsymmetricVerifier>(pubKey); - bool res = verif->verifyBuffer( - buffer, std::vector<uint8_t>(sig.data(), sig.data() + sig.size()), - CryptoHashType::SHA256); + bool res = verif->verifyBuffer(buffer, sig, CryptoHashType::SHA256); EXPECT_EQ(res, true); } @@ -157,13 +155,11 @@ TEST_F(AuthTest, AsymmetricBufferDSA) { std::vector<uint8_t> buffer(payload.begin(), payload.end()); signer->signBuffer(buffer); - std::vector<uint8_t> sig = signer->getSignature(); + utils::MemBuf::Ptr sig = signer->getSignature(); std::shared_ptr<AsymmetricVerifier> verif = std::make_shared<AsymmetricVerifier>(pubKey); - bool res = verif->verifyBuffer( - buffer, std::vector<uint8_t>(sig.data(), sig.data() + sig.size()), - CryptoHashType::SHA256); + bool res = verif->verifyBuffer(buffer, sig, CryptoHashType::SHA256); EXPECT_EQ(res, true); } @@ -233,13 +229,11 @@ TEST_F(AuthTest, AsymmetricBufferECDSA) { std::vector<uint8_t> buffer(payload.begin(), payload.end()); signer->signBuffer(buffer); - std::vector<uint8_t> sig = signer->getSignature(); + utils::MemBuf::Ptr sig = signer->getSignature(); std::shared_ptr<AsymmetricVerifier> verif = std::make_shared<AsymmetricVerifier>(pubKey); - bool res = verif->verifyBuffer( - buffer, std::vector<uint8_t>(sig.data(), sig.data() + sig.size()), - CryptoHashType::SHA256); + bool res = verif->verifyBuffer(buffer, sig, CryptoHashType::SHA256); EXPECT_EQ(res, true); } // namespace auth @@ -290,11 +284,9 @@ TEST_F(AuthTest, HMACbuffer) { std::string payload = "bonjour"; std::vector<uint8_t> buffer(payload.begin(), payload.end()); signer->signBuffer(buffer); - std::vector<uint8_t> sig = signer->getSignature(); + utils::MemBuf::Ptr sig = signer->getSignature(); SymmetricVerifier hmac(PASSPHRASE); - bool res = hmac.verifyBuffer( - buffer, std::vector<uint8_t>(sig.data(), sig.data() + sig.size()), - CryptoHashType::SHA256); + bool res = hmac.verifyBuffer(buffer, sig, CryptoHashType::SHA256); EXPECT_EQ(res, true); } diff --git a/libtransport/src/test/test_core_manifest.cc b/libtransport/src/test/test_core_manifest.cc index 23fd5e342..b998ce96b 100644 --- a/libtransport/src/test/test_core_manifest.cc +++ b/libtransport/src/test/test_core_manifest.cc @@ -173,7 +173,7 @@ TEST_F(ManifestTest, SetParamsRTC) { .timestamp = 1, .prod_rate = 2, .prod_seg = 3, - .support_fec = 1, + .fec_type = protocol::fec::FECType::UNKNOWN, }; manifest1_.setParamsRTC(params); diff --git a/libtransport/src/utils/max_filter.h b/libtransport/src/utils/max_filter.h index 7a2c6aace..db1a1a565 100644 --- a/libtransport/src/utils/max_filter.h +++ b/libtransport/src/utils/max_filter.h @@ -28,7 +28,7 @@ class MaxFilter { public: MaxFilter(std::size_t size) : size_(size) {} - std::size_t size() { return by_arrival_.size(); } + std::size_t size() const { return by_arrival_.size(); } template <typename R> void pushBack(R&& value) { @@ -45,9 +45,9 @@ class MaxFilter { by_order_.clear(); } - const T& begin() { return *by_order_.crbegin(); } + const T& begin() const { return *by_order_.crbegin(); } - const T& rBegin() { return *by_order_.rbegin(); } + const T& rBegin() const { return *by_order_.rbegin(); } private: std::multiset<T> by_order_; diff --git a/libtransport/src/utils/min_filter.h b/libtransport/src/utils/min_filter.h index 4c7ae81f1..4a3882601 100644 --- a/libtransport/src/utils/min_filter.h +++ b/libtransport/src/utils/min_filter.h @@ -28,7 +28,7 @@ class MinFilter { public: MinFilter(std::size_t size) : size_(size) {} - std::size_t size() { return by_arrival_.size(); } + std::size_t size() const { return by_arrival_.size(); } template <typename R> void pushBack(R&& value) { @@ -45,9 +45,9 @@ class MinFilter { by_order_.clear(); } - const T& begin() { return *by_order_.cbegin(); } + const T& begin() const { return *by_order_.cbegin(); } - const T& rBegin() { return *by_order_.crbegin(); } + const T& rBegin() const { return *by_order_.crbegin(); } private: std::multiset<T> by_order_; |