diff options
Diffstat (limited to 'libtransport/src/protocols')
54 files changed, 887 insertions, 948 deletions
diff --git a/libtransport/src/protocols/byte_stream_reassembly.cc b/libtransport/src/protocols/byte_stream_reassembly.cc index 3278595b7..b9eaf3bec 100644 --- a/libtransport/src/protocols/byte_stream_reassembly.cc +++ b/libtransport/src/protocols/byte_stream_reassembly.cc @@ -36,15 +36,6 @@ ByteStreamReassembly::ByteStreamReassembly( index_(Indexer::invalid_index), download_complete_(false) {} -void ByteStreamReassembly::reassemble( - std::unique_ptr<ContentObjectManifest> &&manifest) { - if (TRANSPORT_EXPECT_TRUE(manifest != nullptr) && read_buffer_->capacity()) { - received_packets_.emplace( - std::make_pair(manifest->getName().getSuffix(), nullptr)); - assembleContent(); - } -} - void ByteStreamReassembly::reassemble(ContentObject &content_object) { if (TRANSPORT_EXPECT_TRUE(read_buffer_->capacity())) { received_packets_.emplace( diff --git a/libtransport/src/protocols/byte_stream_reassembly.h b/libtransport/src/protocols/byte_stream_reassembly.h index bfcac3181..a1f965d5c 100644 --- a/libtransport/src/protocols/byte_stream_reassembly.h +++ b/libtransport/src/protocols/byte_stream_reassembly.h @@ -29,9 +29,6 @@ class ByteStreamReassembly : public Reassembly { protected: void reassemble(core::ContentObject &content_object) override; - void reassemble( - std::unique_ptr<core::ContentObjectManifest> &&manifest) override; - void reassemble(utils::MemBuf &buffer, uint32_t suffix) override; bool copyContent(core::ContentObject &content_object); diff --git a/libtransport/src/protocols/cbr.cc b/libtransport/src/protocols/cbr.cc index 446ea8b99..e3f0f1336 100644 --- a/libtransport/src/protocols/cbr.cc +++ b/libtransport/src/protocols/cbr.cc @@ -38,7 +38,7 @@ void CbrTransportProtocol::afterContentReception( const Interest &interest, const ContentObject &content_object) { auto segment = content_object.getName().getSuffix(); auto now = utils::SteadyTime::Clock::now(); - auto rtt = utils::SteadyTime::getDurationMs( + auto rtt = utils::SteadyTime::getDurationUs( interest_timepoints_[segment & mask], now); // Update stats updateStats(segment, rtt, now); diff --git a/libtransport/src/protocols/datagram_reassembly.cc b/libtransport/src/protocols/datagram_reassembly.cc index 3a32c81f5..a04b0eecf 100644 --- a/libtransport/src/protocols/datagram_reassembly.cc +++ b/libtransport/src/protocols/datagram_reassembly.cc @@ -29,8 +29,9 @@ void DatagramReassembly::reassemble(core::ContentObject& content_object) { auto read_buffer = content_object.getPayload(); DLOG_IF(INFO, VLOG_IS_ON(4)) << "Size of payload: " << read_buffer->length() << ". Trimming " - << transport_protocol_->transportHeaderLength(); - read_buffer->trimStart(transport_protocol_->transportHeaderLength()); + << transport_protocol_->transportHeaderLength(false); + // here we have only src data packet + read_buffer->trimStart(transport_protocol_->transportHeaderLength(false)); Reassembly::read_buffer_ = std::move(read_buffer); Reassembly::notifyApplication(); } diff --git a/libtransport/src/protocols/datagram_reassembly.h b/libtransport/src/protocols/datagram_reassembly.h index 0def32dd2..cefdca93b 100644 --- a/libtransport/src/protocols/datagram_reassembly.h +++ b/libtransport/src/protocols/datagram_reassembly.h @@ -29,10 +29,6 @@ class DatagramReassembly : public Reassembly { virtual void reassemble(core::ContentObject &content_object) override; void reassemble(utils::MemBuf &buffer, uint32_t suffix) override; virtual void reInitialize() override; - virtual void reassemble( - std::unique_ptr<core::ContentObjectManifest> &&manifest) override { - return; - } bool reassembleUnverified() override { return true; } }; diff --git a/libtransport/src/protocols/fec/rely.cc b/libtransport/src/protocols/fec/rely.cc index d4d98a90b..9e0a06dd8 100644 --- a/libtransport/src/protocols/fec/rely.cc +++ b/libtransport/src/protocols/fec/rely.cc @@ -79,7 +79,7 @@ void RelyEncoder::onPacketProduced(core::ContentObject &content_object, // Check new payload size and make sure it fits in packet buffer auto new_payload_size = produce_bytes(); - int difference = new_payload_size - length; + int difference = (int)(new_payload_size - length); DCHECK(difference > 0); DCHECK(content_object.ensureCapacity(difference)); diff --git a/libtransport/src/protocols/fec/rely.h b/libtransport/src/protocols/fec/rely.h index 001a26002..cc81222b2 100644 --- a/libtransport/src/protocols/fec/rely.h +++ b/libtransport/src/protocols/fec/rely.h @@ -15,6 +15,7 @@ #pragma once +#include <hicn/transport/portability/endianess.h> #include <hicn/transport/utils/chrono_typedefs.h> #include <hicn/transport/utils/membuf.h> #include <protocols/fec/fec_info.h> @@ -80,11 +81,19 @@ class RelyBase : public virtual FECBase { */ class fec_metadata { public: - void setSeqNumberBase(uint32_t suffix) { seq_number = htonl(suffix); } - uint32_t getSeqNumberBase() const { return ntohl(seq_number); } - - void setMetadataBase(uint32_t value) { metadata = htonl(value); } - uint32_t getMetadataBase() const { return ntohl(metadata); } + void setSeqNumberBase(uint32_t suffix) { + seq_number = portability::host_to_net(suffix); + } + uint32_t getSeqNumberBase() const { + return portability::net_to_host(seq_number); + } + + void setMetadataBase(uint32_t value) { + metadata = portability::host_to_net(value); + } + uint32_t getMetadataBase() const { + return portability::net_to_host(metadata); + } private: uint32_t seq_number; @@ -162,8 +171,9 @@ class RelyEncoder : RelyBase, rely::encoder, public ProducerFEC { /** * @brief Get the fec header size, if added to source packets + * there is not need to distinguish between source and FEC packets here */ - std::size_t getFecHeaderSize() override { + std::size_t getFecHeaderSize(bool isFEC) override { return header_bytes() + sizeof(fec_metadata) + 4; } @@ -184,8 +194,9 @@ class RelyDecoder : RelyBase, rely::decoder, public ConsumerFEC { /** * @brief Get the fec header size, if added to source packets + * there is not need to distinguish between source and FEC packets here */ - std::size_t getFecHeaderSize() override { + std::size_t getFecHeaderSize(bool isFEC) override { return header_bytes() + sizeof(fec_metadata); } diff --git a/libtransport/src/protocols/fec/rs.cc b/libtransport/src/protocols/fec/rs.cc index 9c0a3d4fb..d42740c32 100644 --- a/libtransport/src/protocols/fec/rs.cc +++ b/libtransport/src/protocols/fec/rs.cc @@ -146,7 +146,8 @@ void BlockCode::encode() { DLOG_IF(INFO, VLOG_IS_ON(4)) << "Calling encode with max_buffer_size_ = " << max_buffer_size_; for (uint32_t i = k_; i < n_; i++) { - fec_encode(code_, data, data[i], i, max_buffer_size_ + METADATA_BYTES); + fec_encode(code_, data, data[i], i, + (int)(max_buffer_size_ + METADATA_BYTES)); } // Re-include header in repair packets @@ -213,7 +214,8 @@ void BlockCode::decode() { DLOG_IF(INFO, VLOG_IS_ON(4)) << "Calling decode with max_buffer_size_ = " << max_buffer_size_; - fec_decode(code_, data, reinterpret_cast<int *>(index), max_buffer_size_); + fec_decode(code_, data, reinterpret_cast<int *>(index), + (int)max_buffer_size_); // Find the index in the block for recovered packets for (uint32_t i = 0, j = 0; i < k_; i++) { @@ -228,6 +230,7 @@ void BlockCode::decode() { auto &packet = operator[](i).getBuffer(); fec_metadata *metadata = reinterpret_cast<fec_metadata *>( packet->writableData() + max_buffer_size_ - METADATA_BYTES); + DCHECK(metadata->getPacketLength() <= packet->capacity()); // Adjust buffer length packet->setLength(metadata->getPacketLength()); // Adjust metadata diff --git a/libtransport/src/protocols/fec/rs.h b/libtransport/src/protocols/fec/rs.h index 034c32bdc..6672eaa6b 100644 --- a/libtransport/src/protocols/fec/rs.h +++ b/libtransport/src/protocols/fec/rs.h @@ -18,6 +18,7 @@ #include <arpa/inet.h> #include <hicn/transport/portability/c_portability.h> +#include <hicn/transport/portability/endianess.h> #include <hicn/transport/utils/membuf.h> #include <protocols/fec/fec_info.h> #include <protocols/fec_base.h> @@ -153,8 +154,10 @@ struct fec_header { */ uint8_t padding; - void setSeqNumberBase(uint32_t suffix) { seq_number = htonl(suffix); } - uint32_t getSeqNumberBase() { return ntohl(seq_number); } + void setSeqNumberBase(uint32_t suffix) { + seq_number = portability::host_to_net(suffix); + } + uint32_t getSeqNumberBase() { return portability::net_to_host(seq_number); } void setEncodedSymbolId(uint8_t esi) { encoded_symbol_id = esi; } uint8_t getEncodedSymbolId() { return encoded_symbol_id; } void setSourceBlockLen(uint8_t k) { source_block_len = k; } @@ -163,6 +166,8 @@ struct fec_header { uint8_t getNFecSymbols() { return n_fec_symbols; } }; +static_assert(sizeof(fec_header) <= 8, "fec_header is too large"); + class rs; /** @@ -177,11 +182,17 @@ class BlockCode : public Packets { */ class __attribute__((__packed__)) fec_metadata { public: - void setPacketLength(uint16_t length) { packet_length = htons(length); } - uint32_t getPacketLength() { return ntohs(packet_length); } + void setPacketLength(uint16_t length) { + packet_length = portability::host_to_net(length); + } + uint32_t getPacketLength() { + return portability::net_to_host(packet_length); + } - void setMetadataBase(uint32_t value) { metadata = htonl(value); } - uint32_t getMetadataBase() { return ntohl(metadata); } + void setMetadataBase(uint32_t value) { + metadata = portability::host_to_net(value); + } + uint32_t getMetadataBase() { return portability::net_to_host(metadata); } private: uint16_t packet_length; /* Used to get the real size of the packet after we @@ -388,8 +399,11 @@ class RSEncoder : public rs, public ProducerFEC { /** * @brief Get the fec header size, if added to source packets + * in RS the source packets do not transport any FEC header */ - std::size_t getFecHeaderSize() override { return 0; } + std::size_t getFecHeaderSize(bool isFEC) override { + return isFEC ? sizeof(fec_header) : 0; + } void clear() override { rs::clear(); @@ -435,8 +449,11 @@ class RSDecoder : public rs, public ConsumerFEC { /** * @brief Get the fec header size, if added to source packets + * in RS the source packets do not transport any FEC header */ - std::size_t getFecHeaderSize() override { return 0; } + std::size_t getFecHeaderSize(bool isFEC) override { + return isFEC ? sizeof(fec_header) : 0; + } /** * Clear decoder to reuse diff --git a/libtransport/src/protocols/fec_base.h b/libtransport/src/protocols/fec_base.h index bda3ee756..28f6a820a 100644 --- a/libtransport/src/protocols/fec_base.h +++ b/libtransport/src/protocols/fec_base.h @@ -101,8 +101,10 @@ class FECBase { /** * @brief Get size of FEC header. + * the fec header size may be different if a packet is a data packet or a FEC + * packet */ - virtual std::size_t getFecHeaderSize() = 0; + virtual std::size_t getFecHeaderSize(bool isFEC) = 0; /** * Set callback to call after packet encoding / decoding diff --git a/libtransport/src/protocols/manifest_incremental_indexer_bytestream.cc b/libtransport/src/protocols/manifest_incremental_indexer_bytestream.cc index b5ab8184f..0b15559a4 100644 --- a/libtransport/src/protocols/manifest_incremental_indexer_bytestream.cc +++ b/libtransport/src/protocols/manifest_incremental_indexer_bytestream.cc @@ -66,40 +66,33 @@ void ManifestIncrementalIndexer::onUntrustedManifest( return; } - auto manifest = - std::make_unique<ContentObjectManifest>(std::move(content_object)); - manifest->decode(); + core::ContentObjectManifest manifest(content_object.shared_from_this()); + manifest.decode(); - processTrustedManifest(interest, std::move(manifest), reassembly); + processTrustedManifest(interest, manifest, reassembly); } void ManifestIncrementalIndexer::processTrustedManifest( - core::Interest &interest, std::unique_ptr<ContentObjectManifest> manifest, + core::Interest &interest, core::ContentObjectManifest &manifest, bool reassembly) { - if (TRANSPORT_EXPECT_FALSE(manifest->getVersion() != - core::ManifestVersion::VERSION_1)) { - throw errors::RuntimeException("Received manifest with unknown version."); - } - - switch (manifest->getType()) { + switch (manifest.getType()) { case core::ManifestType::INLINE_MANIFEST: { suffix_strategy_->setFinalSuffix( - manifest->getParamsBytestream().final_segment); + manifest.getParamsBytestream().final_segment); // The packets to verify with the received manifest std::vector<auth::PacketPtr> packets; // Convert the received manifest to a map of packet suffixes to hashes - auth::Verifier::SuffixMap current_manifest = - core::ContentObjectManifest::getSuffixMap(manifest.get()); + auth::Verifier::SuffixMap suffix_map = manifest.getSuffixMap(); // Update 'suffix_map_' with new hashes from the received manifest and // build 'packets' - for (auto it = current_manifest.begin(); it != current_manifest.end();) { + for (auto it = suffix_map.begin(); it != suffix_map.end();) { if (unverified_segments_.find(it->first) == unverified_segments_.end()) { suffix_map_[it->first] = std::move(it->second); - current_manifest.erase(it++); + suffix_map.erase(it++); continue; } @@ -109,7 +102,7 @@ void ManifestIncrementalIndexer::processTrustedManifest( // Verify unverified segments using the received manifest auth::Verifier::PolicyMap policies = - verifier_->verifyPackets(packets, current_manifest); + verifier_->verifyPackets(packets, suffix_map); for (unsigned int i = 0; i < packets.size(); ++i) { auth::Suffix suffix = packets[i]->getName().getSuffix(); @@ -126,7 +119,9 @@ void ManifestIncrementalIndexer::processTrustedManifest( } if (reassembly) { - reassembly_->reassemble(std::move(manifest)); + auto manifest_co = + std::dynamic_pointer_cast<ContentObject>(manifest.getPacket()); + reassembly_->reassemble(*manifest_co); } break; } diff --git a/libtransport/src/protocols/manifest_incremental_indexer_bytestream.h b/libtransport/src/protocols/manifest_incremental_indexer_bytestream.h index 12876f35c..8527b55c1 100644 --- a/libtransport/src/protocols/manifest_incremental_indexer_bytestream.h +++ b/libtransport/src/protocols/manifest_incremental_indexer_bytestream.h @@ -76,7 +76,7 @@ class ManifestIncrementalIndexer : public IncrementalIndexer { core::ContentObject &content_object, bool reassembly); void processTrustedManifest(core::Interest &interest, - std::unique_ptr<ContentObjectManifest> manifest, + core::ContentObjectManifest &manifest, bool reassembly); void onUntrustedContentObject(core::Interest &interest, core::ContentObject &content_object, diff --git a/libtransport/src/protocols/prod_protocol_bytestream.cc b/libtransport/src/protocols/prod_protocol_bytestream.cc index 2a3ec07e1..7f103e12b 100644 --- a/libtransport/src/protocols/prod_protocol_bytestream.cc +++ b/libtransport/src/protocols/prod_protocol_bytestream.cc @@ -111,18 +111,18 @@ uint32_t ByteStreamProductionProtocol::produceStream( uint64_t manifest_free_space; uint32_t nb_manifests; std::shared_ptr<core::ContentObjectManifest> manifest; - uint32_t manifest_capacity = making_manifest_; + uint32_t manifest_capacity = manifest_max_capacity_; bool is_last_manifest = false; ParamsBytestream transport_params; manifest_format = Packet::toAHFormat(default_format); - content_format = - !making_manifest_ ? Packet::toAHFormat(default_format) : default_format; + content_format = !manifest_max_capacity_ ? Packet::toAHFormat(default_format) + : default_format; - content_header_size = - core::Packet::getHeaderSizeFromFormat(content_format, signature_length); - manifest_header_size = - core::Packet::getHeaderSizeFromFormat(manifest_format, signature_length); + content_header_size = (uint32_t)core::Packet::getHeaderSizeFromFormat( + content_format, signature_length); + manifest_header_size = (uint32_t)core::Packet::getHeaderSizeFromFormat( + manifest_format, signature_length); content_free_space = std::min(max_segment_size, data_packet_size - content_header_size); manifest_free_space = @@ -135,34 +135,39 @@ uint32_t ByteStreamProductionProtocol::produceStream( nb_segments++; } - if (making_manifest_) { + if (manifest_max_capacity_) { nb_manifests = static_cast<uint32_t>( std::ceil(float(nb_segments) / manifest_capacity)); final_block_number += nb_segments + nb_manifests - 1; transport_params.final_segment = is_last ? final_block_number : utils::SuffixStrategy::MAX_SUFFIX; - manifest.reset(ContentObjectManifest::createManifest( + manifest = ContentObjectManifest::createContentManifest( manifest_format, name.setSuffix(suffix_strategy->getNextManifestSuffix()), - core::ManifestVersion::VERSION_1, core::ManifestType::INLINE_MANIFEST, - is_last_manifest, name, hash_algo, signature_length)); - - manifest->setLifetime(content_object_expiry_time); + signature_length); + manifest->setHeaders(core::ManifestType::INLINE_MANIFEST, + manifest_max_capacity_, hash_algo, is_last_manifest, + name); manifest->setParamsBytestream(transport_params); + manifest->getPacket()->setLifetime(content_object_expiry_time); } auto self = shared_from_this(); for (unsigned int packaged_segments = 0; packaged_segments < nb_segments; packaged_segments++) { - if (making_manifest_) { - if (manifest->estimateManifestSize(1) > manifest_free_space) { + if (manifest_max_capacity_) { + if (manifest->Encoder::manifestSize(1) > manifest_free_space) { manifest->encode(); - signer_->signPacket(manifest.get()); + auto manifest_co = + std::dynamic_pointer_cast<ContentObject>(manifest->getPacket()); + + signer_->signPacket(manifest_co.get()); // Send the current manifest - passContentObjectToCallbacks(manifest, self); - DLOG_IF(INFO, VLOG_IS_ON(3)) << "Send manifest " << manifest->getName(); + passContentObjectToCallbacks(manifest_co, self); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Send manifest " << manifest_co->getName(); // Send content objects stored in the queue while (!content_queue_.empty()) { @@ -175,15 +180,15 @@ uint32_t ByteStreamProductionProtocol::produceStream( // Create new manifest. The reference to the last manifest has been // acquired in the passContentObjectToCallbacks function, so we can // safely release this reference. - manifest.reset(ContentObjectManifest::createManifest( + manifest = ContentObjectManifest::createContentManifest( manifest_format, name.setSuffix(suffix_strategy->getNextManifestSuffix()), - core::ManifestVersion::VERSION_1, - core::ManifestType::INLINE_MANIFEST, is_last_manifest, name, - hash_algo, signature_length)); - - manifest->setLifetime(content_object_expiry_time); + signature_length); + manifest->setHeaders(core::ManifestType::INLINE_MANIFEST, + manifest_max_capacity_, hash_algo, + is_last_manifest, name); manifest->setParamsBytestream(transport_params); + manifest->getPacket()->setLifetime(content_object_expiry_time); } } @@ -191,7 +196,7 @@ uint32_t ByteStreamProductionProtocol::produceStream( uint32_t content_suffix = suffix_strategy->getNextContentSuffix(); auto content_object = std::make_shared<ContentObject>( name.setSuffix(content_suffix), content_format, - !making_manifest_ ? signature_length : 0); + !manifest_max_capacity_ ? signature_length : 0); content_object->setLifetime(content_object_expiry_time); auto b = buffer->cloneOne(); @@ -203,7 +208,7 @@ uint32_t ByteStreamProductionProtocol::produceStream( b->append(buffer_size - bytes_segmented); bytes_segmented += (int)(buffer_size - bytes_segmented); - if (is_last && making_manifest_) { + if (is_last && manifest_max_capacity_) { is_last_manifest = true; } else if (is_last) { content_object->setLast(); @@ -219,9 +224,9 @@ uint32_t ByteStreamProductionProtocol::produceStream( // Either we sign the content object or we save its hash into the current // manifest - if (making_manifest_) { + if (manifest_max_capacity_) { auth::CryptoHash hash = content_object->computeDigest(hash_algo); - manifest->addSuffixHash(content_suffix, hash); + manifest->addEntry(content_suffix, hash); content_queue_.push(content_object); } else { signer_->signPacket(content_object.get()); @@ -232,16 +237,19 @@ uint32_t ByteStreamProductionProtocol::produceStream( } // We send the manifest that hasn't been fully filled yet - if (making_manifest_) { + if (manifest_max_capacity_) { if (is_last_manifest) { manifest->setIsLast(is_last_manifest); } manifest->encode(); - signer_->signPacket(manifest.get()); + auto manifest_co = + std::dynamic_pointer_cast<ContentObject>(manifest->getPacket()); + + signer_->signPacket(manifest_co.get()); - passContentObjectToCallbacks(manifest, self); - DLOG_IF(INFO, VLOG_IS_ON(3)) << "Send manifest " << manifest->getName(); + passContentObjectToCallbacks(manifest_co, self); + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Send manifest " << manifest_co->getName(); while (!content_queue_.empty()) { passContentObjectToCallbacks(content_queue_.front(), self); diff --git a/libtransport/src/protocols/prod_protocol_rtc.cc b/libtransport/src/protocols/prod_protocol_rtc.cc index e49f58167..cb8dff6e4 100644 --- a/libtransport/src/protocols/prod_protocol_rtc.cc +++ b/libtransport/src/protocols/prod_protocol_rtc.cc @@ -43,9 +43,6 @@ RTCProductionProtocol::RTCProductionProtocol( last_produced_data_ts_(0), last_round_(utils::SteadyTime::nowMs().count()), allow_delayed_nacks_(false), - queue_timer_on_(false), - consumer_in_sync_(false), - on_consumer_in_sync_(nullptr), pending_fec_pace_(false), max_len_(0), queue_len_(0), @@ -54,8 +51,6 @@ RTCProductionProtocol::RTCProductionProtocol( std::uniform_int_distribution<> dis(0, 255); prod_label_ = dis(gen_); cache_label_ = (prod_label_ + 1) % 256; - interests_queue_timer_ = - std::make_unique<asio::steady_timer>(portal_->getThread().getIoService()); round_timer_ = std::make_unique<asio::steady_timer>(portal_->getThread().getIoService()); fec_pacing_timer_ = @@ -69,16 +64,7 @@ RTCProductionProtocol::~RTCProductionProtocol() {} void RTCProductionProtocol::setProducerParam() { // Flow name: here we assume there is only one prefix registered in the portal - flow_name_ = portal_->getServedNamespaces().begin()->getName(); - - // Manifest - uint32_t making_manifest; - socket_->getSocketOption(interface::GeneralTransportOptions::MAKE_MANIFEST, - making_manifest); - - // Signer - std::shared_ptr<auth::Signer> signer; - socket_->getSocketOption(interface::GeneralTransportOptions::SIGNER, signer); + flow_name_ = portal_->getServedNamespaces().begin()->makeName(); // Default format core::Packet::Format default_format; @@ -94,15 +80,22 @@ void RTCProductionProtocol::setProducerParam() { socket_->getSocketOption(interface::RtcTransportOptions::AGGREGATED_DATA, data_aggregation_); - size_t signature_size = signer->getSignatureFieldSize(); - data_header_format_ = { - !making_manifest ? Packet::toAHFormat(default_format) : default_format, - !making_manifest ? signature_size : 0}; + size_t signature_size = signer_->getSignatureFieldSize(); + data_header_format_ = {!manifest_max_capacity_ + ? Packet::toAHFormat(default_format) + : default_format, + !manifest_max_capacity_ ? signature_size : 0}; manifest_header_format_ = {Packet::toAHFormat(default_format), signature_size}; nack_header_format_ = {Packet::toAHFormat(default_format), signature_size}; fec_header_format_ = {Packet::toAHFormat(default_format), signature_size}; + // Initialize verifier for aggregated interests + std::shared_ptr<auth::Verifier> verifier; + socket_->getSocketOption(implementation::GeneralTransportOptions::VERIFIER, + verifier); + verifier_ = std::make_shared<rtc::RTCVerifier>(verifier, 0, 0); + // Schedule round timer scheduleRoundTimer(); } @@ -143,15 +136,17 @@ void RTCProductionProtocol::updateStats(bool new_round) { packets_production_rate_ = ceil((double)(produced_packets_ + prev_produced_packets_) * per_second); - // add fec packets looking at the fec code. we don't use directly the number - // of fec packets produced in 1 round because it may happen that different - // numbers of blocks are generated during the rounds and this creates - // inconsistencies in the estimation of the production rate - uint32_t k = fec::FECUtils::getSourceSymbols(fec_type_); - uint32_t n = fec::FECUtils::getBlockSymbols(fec_type_); + if (fec_encoder_ && fec_type_ != fec::FECType::UNKNOWN) { + // add fec packets looking at the fec code. we don't use directly the number + // of fec packets produced in 1 round because it may happen that different + // numbers of blocks are generated during the rounds and this creates + // inconsistencies in the estimation of the production rate + uint32_t k = fec::FECUtils::getSourceSymbols(fec_type_); + uint32_t n = fec::FECUtils::getBlockSymbols(fec_type_); - packets_production_rate_ += - ceil((double)packets_production_rate_ / (double)k) * (n - k); + packets_production_rate_ += + ceil((double)packets_production_rate_ / (double)k) * (n - k); + } // update the production rate as soon as it increases by 10% with respect to // the last round @@ -168,11 +163,6 @@ void RTCProductionProtocol::updateStats(bool new_round) { allow_delayed_nacks_ = false; } - // check if the production rate is decreased. if yes send nacks if needed - if (prev_packets_production_rate < packets_production_rate_) { - sendNacksForPendingInterests(); - } - if (new_round) { prev_produced_bytes_ = produced_bytes_; prev_produced_packets_ = produced_packets_; @@ -203,16 +193,25 @@ void RTCProductionProtocol::produce(ContentObject &content_object) { uint32_t RTCProductionProtocol::produceDatagram( const Name &content_name, std::unique_ptr<utils::MemBuf> &&buffer) { std::size_t buffer_size = buffer->length(); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Maybe Sending content object: " << content_name; + if (TRANSPORT_EXPECT_FALSE(buffer_size == 0)) return 0; + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Sending content object: " << content_name; + uint32_t data_packet_size; socket_->getSocketOption(interface::GeneralTransportOptions::DATA_PACKET_SIZE, data_packet_size); - - if (TRANSPORT_EXPECT_FALSE( - (Packet::getHeaderSizeFromFormat(data_header_format_.first, - data_header_format_.second) + - rtc::DATA_HEADER_SIZE + buffer_size) > data_packet_size)) { + // this is a source packet but we check the fec header size of FEC packet in + // order to leave room for the header when FEC packets will be generated + uint32_t fec_header = 0; + if (fec_encoder_) fec_encoder_->getFecHeaderSize(true); + uint32_t headers_size = + (uint32_t)Packet::getHeaderSizeFromFormat(data_header_format_.first, + data_header_format_.second) + + rtc::DATA_HEADER_SIZE + fec_header; + if (TRANSPORT_EXPECT_FALSE((headers_size + buffer_size) > data_packet_size)) { return 0; } @@ -338,47 +337,42 @@ void RTCProductionProtocol::emptyQueue() { } void RTCProductionProtocol::sendManifest(const Name &name) { - if (!making_manifest_) { + if (!manifest_max_capacity_) { return; } - Name manifest_name(name); - - uint32_t data_packet_size; - socket_->getSocketOption(interface::GeneralTransportOptions::DATA_PACKET_SIZE, - data_packet_size); - - // The maximum number of entries a manifest can hold - uint32_t manifest_capacity = making_manifest_; + Name manifest_name = name; // If there is not enough hashes to fill a manifest, return early - if (manifest_entries_.size() < manifest_capacity) { + if (manifest_entries_.size() < manifest_max_capacity_) { return; } // Create a new manifest std::shared_ptr<core::ContentObjectManifest> manifest = createManifest(manifest_name.setSuffix(current_seg_)); + auto manifest_co = + std::dynamic_pointer_cast<ContentObject>(manifest->getPacket()); // Fill the manifest with packet hashes that were previously saved uint32_t nb_entries; - for (nb_entries = 0; nb_entries < manifest_capacity; ++nb_entries) { + for (nb_entries = 0; nb_entries < manifest_max_capacity_; ++nb_entries) { if (manifest_entries_.empty()) { break; } std::pair<uint32_t, auth::CryptoHash> front = manifest_entries_.front(); - manifest->addSuffixHash(front.first, front.second); + manifest->addEntry(front.first, front.second); manifest_entries_.pop(); } DLOG_IF(INFO, VLOG_IS_ON(3)) - << "Sending manifest " << manifest->getName().getSuffix() << " of size " - << nb_entries; + << "Sending manifest " << manifest_co->getName().getSuffix() + << " of size " << nb_entries; // Encode and send the manifest manifest->encode(); portal_->getThread().tryRunHandlerNow( - [this, content_object{std::move(manifest)}, manifest_name]() mutable { + [this, content_object{std::move(manifest_co)}, manifest_name]() mutable { produceInternal(std::move(content_object), manifest_name); }); } @@ -394,11 +388,12 @@ RTCProductionProtocol::createManifest(const Name &content_name) const { uint64_t now = utils::SteadyTime::nowMs().count(); // Create a new manifest - std::shared_ptr<core::ContentObjectManifest> manifest( - ContentObjectManifest::createManifest( - manifest_header_format_.first, name, core::ManifestVersion::VERSION_1, - core::ManifestType::INLINE_MANIFEST, false, name, hash_algo, - manifest_header_format_.second)); + std::shared_ptr<core::ContentObjectManifest> manifest = + ContentObjectManifest::createContentManifest( + manifest_header_format_.first, name, manifest_header_format_.second); + manifest->setHeaders(core::ManifestType::INLINE_MANIFEST, + manifest_max_capacity_, hash_algo, false /* is_last */, + name); // Set connection parameters manifest->setParamsRTC(ParamsRTC{ @@ -444,7 +439,15 @@ void RTCProductionProtocol::producePktInternal( // set hicn stuff Name n(content_name); content_object->setName(n.setSuffix(current_seg_)); - content_object->setLifetime(500); // XXX this should be set by the APP + + uint32_t expiry_time = 0; + socket_->getSocketOption( + interface::GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME, + expiry_time); + if (expiry_time == interface::default_values::content_object_expiry_time) + expiry_time = 500; // the data expiration time should be set by the App. if + // the App does not specify it the default is 500ms + content_object->setLifetime(expiry_time); content_object->setPathLabel(prod_label_); // update stats @@ -466,9 +469,9 @@ void RTCProductionProtocol::producePktInternal( // pass packet to FEC encoder if (fec_encoder_ && !fec) { - uint32_t offset = - is_manifest ? content_object->headerSize() - : content_object->headerSize() + rtc::DATA_HEADER_SIZE; + uint32_t offset = is_manifest ? (uint32_t)content_object->headerSize() + : (uint32_t)content_object->headerSize() + + rtc::DATA_HEADER_SIZE; uint32_t metadata = static_cast<uint32_t>(content_object->getPayloadType()); fec_encoder_->onPacketProduced(*content_object, offset, metadata); @@ -481,19 +484,14 @@ void RTCProductionProtocol::producePktInternal( *content_object); } - auto seq_it = seqs_map_.find(current_seg_); - if (seq_it != seqs_map_.end()) { - sendContentObject(content_object, false, fec); - } + // TODO we may want to send FEC only if an interest is pending in the pit in + sendContentObject(content_object, false, fec); if (*on_content_object_output_) { on_content_object_output_->operator()(*socket_->getInterface(), *content_object); } - // remove interests from the interest cache if it exists - removeFromInterestQueue(current_seg_); - if (!fec) last_produced_data_ts_ = now; // Update current segment @@ -563,59 +561,65 @@ void RTCProductionProtocol::onInterest(Interest &interest) { on_interest_input_->operator()(*socket_->getInterface(), interest); } - auto suffix = interest.firstSuffix(); - // numberOfSuffixes returns only the prefixes in the payalod - // we add + 1 to count also the seq in the name - auto n_suffixes = interest.numberOfSuffixes() + 1; - Name name = interest.getName(); - bool prev_consumer_state = consumer_in_sync_; - - for (uint32_t i = 0; i < n_suffixes; i++) { - if (i > 0) { - name.setSuffix(*(suffix + (i - 1))); - } + if (!interest.isValid()) throw std::runtime_error("Bad interest format"); + if (interest.hasManifest() && + verifier_->verify(interest) != auth::VerificationPolicy::ACCEPT) + throw std::runtime_error("Interset manifest verification failed"); - DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received interest " << name; + uint32_t *suffix = interest.firstSuffix(); + uint32_t n_suffixes_in_manifest = interest.numberOfSuffixes(); + uint32_t *request_bitmap = interest.getRequestBitmap(); - const std::shared_ptr<ContentObject> content_object = - output_buffer_.find(name); + Name name = interest.getName(); + uint32_t pos = 0; // Position of current suffix in manifest - if (content_object) { - if (*on_interest_satisfied_output_buffer_) { - on_interest_satisfied_output_buffer_->operator()( - *socket_->getInterface(), interest); - } + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Received interest " << name << " (" << n_suffixes_in_manifest + << " suffixes in manifest)"; + + // Process the suffix in the interest header + // (first loop iteration), then suffixes in the manifest + do { + if (!interest.hasManifest() || is_bit_set(request_bitmap, pos)) { + const std::shared_ptr<ContentObject> content_object = + output_buffer_.find(name); + + if (content_object) { + if (*on_interest_satisfied_output_buffer_) { + on_interest_satisfied_output_buffer_->operator()( + *socket_->getInterface(), interest); + } - if (*on_content_object_output_) { - on_content_object_output_->operator()(*socket_->getInterface(), - *content_object); - } + if (*on_content_object_output_) { + on_content_object_output_->operator()(*socket_->getInterface(), + *content_object); + } - DLOG_IF(INFO, VLOG_IS_ON(3)) - << "Send content %u (onInterest) " << content_object->getName(); - content_object->setPathLabel(cache_label_); - sendContentObject(content_object); - } else { - if (*on_interest_process_) { - on_interest_process_->operator()(*socket_->getInterface(), interest); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Send content %u (onInterest) " << content_object->getName(); + content_object->setPathLabel(cache_label_); + sendContentObject(content_object); + } else { + if (*on_interest_process_) { + on_interest_process_->operator()(*socket_->getInterface(), interest); + } + processInterest(name.getSuffix(), interest.getLifetime()); } - processInterest(name.getSuffix(), interest.getLifetime()); } - } - if (prev_consumer_state != consumer_in_sync_ && consumer_in_sync_) - on_consumer_in_sync_(*socket_->getInterface(), interest); + // Retrieve next suffix in the manifest + if (interest.hasManifest()) { + uint32_t seq = *suffix; + suffix++; + + name.setSuffix(seq); + interest.setName(name); + } + } while (pos++ < n_suffixes_in_manifest); } void RTCProductionProtocol::processInterest(uint32_t interest_seg, uint32_t lifetime) { - if (interest_seg == 0) { - // first packet from the consumer, reset sync state - consumer_in_sync_ = false; - } - - uint64_t now = utils::SteadyTime::nowMs().count(); - switch (rtc::ProbeHandler::getProbeType(interest_seg)) { case rtc::ProbeType::INIT: DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received init probe " << interest_seg; @@ -629,183 +633,7 @@ void RTCProductionProtocol::processInterest(uint32_t interest_seg, break; } - // if the production rate 0 use delayed nacks - if (allow_delayed_nacks_ && interest_seg >= current_seg_) { - uint64_t next_timer = UINT64_MAX; - if (!timers_map_.empty()) { - next_timer = timers_map_.begin()->first; - } - - uint64_t expiration = now + rtc::NACK_DELAY; - addToInterestQueue(interest_seg, expiration); - - // here we have at least one interest in the queue, we need to start or - // update the timer - if (!queue_timer_on_) { - // set timeout - queue_timer_on_ = true; - scheduleQueueTimer(timers_map_.begin()->first - now); - } else { - // re-schedule the timer because a new interest will expires sooner - if (next_timer > timers_map_.begin()->first) { - interests_queue_timer_->cancel(); - scheduleQueueTimer(timers_map_.begin()->first - now); - } - } - return; - } - - if (queue_timer_on_) { - // the producer is producing. Send nacks to packets that will expire - // before the data production and remove the timer - queue_timer_on_ = false; - interests_queue_timer_->cancel(); - sendNacksForPendingInterests(); - } - - uint32_t max_gap = (uint32_t)floor( - (double)((double)((double)lifetime * - rtc::INTEREST_LIFETIME_REDUCTION_FACTOR / - rtc::MILLI_IN_A_SEC) * - (double)(packets_production_rate_))); - - if (interest_seg < current_seg_ || interest_seg > (max_gap + current_seg_)) { - sendNack(interest_seg); - } else { - if (!consumer_in_sync_ && on_consumer_in_sync_) { - // we consider the remote consumer to be in sync as soon as it covers - // 70% of the production window with interests - uint32_t perc = ceil((double)max_gap * 0.7); - if (interest_seg > (perc + current_seg_)) { - consumer_in_sync_ = true; - // on_consumer_in_sync_(*socket_->getInterface(), interest); - } - } - uint64_t expiration = - now + floor((double)lifetime * rtc::INTEREST_LIFETIME_REDUCTION_FACTOR); - addToInterestQueue(interest_seg, expiration); - } -} - -void RTCProductionProtocol::scheduleQueueTimer(uint64_t wait) { - interests_queue_timer_->expires_from_now(std::chrono::milliseconds(wait)); - std::weak_ptr<RTCProductionProtocol> self = shared_from_this(); - interests_queue_timer_->async_wait([self](const std::error_code &ec) { - if (ec) { - return; - } - - auto sp = self.lock(); - if (sp && sp->isRunning()) { - sp->interestQueueTimer(); - } - }); -} - -void RTCProductionProtocol::addToInterestQueue(uint32_t interest_seg, - uint64_t expiration) { - // check if the seq number exists already - auto it_seqs = seqs_map_.find(interest_seg); - if (it_seqs != seqs_map_.end()) { - // the seq already exists - if (expiration < it_seqs->second) { - // we need to update the timer becasue we got a smaller one - // 1) remove the entry from the multimap - // 2) update this entry - auto range = timers_map_.equal_range(it_seqs->second); - for (auto it_timers = range.first; it_timers != range.second; - it_timers++) { - if (it_timers->second == it_seqs->first) { - timers_map_.erase(it_timers); - break; - } - } - timers_map_.insert( - std::pair<uint64_t, uint32_t>(expiration, interest_seg)); - it_seqs->second = expiration; - } else { - // nothing to do here - return; - } - } else { - // add the new seq - timers_map_.insert(std::pair<uint64_t, uint32_t>(expiration, interest_seg)); - seqs_map_.insert(std::pair<uint32_t, uint64_t>(interest_seg, expiration)); - } -} - -void RTCProductionProtocol::sendNacksForPendingInterests() { - std::unordered_set<uint32_t> to_remove; - - uint32_t pps = ceil((double)(packets_production_rate_)*rtc:: - INTEREST_LIFETIME_REDUCTION_FACTOR); - - uint64_t now = utils::SteadyTime::nowMs().count(); - for (auto it = seqs_map_.begin(); it != seqs_map_.end(); it++) { - if (it->first > current_seg_ && it->second > now) { - double exp_time_in_sec = - (double)(it->second - now) / (double)rtc::MILLI_IN_A_SEC; - uint32_t packets_prod_before_expire = ceil((double)pps * exp_time_in_sec); - - if (it->first > (current_seg_ + packets_prod_before_expire)) { - sendNack(it->first); - to_remove.insert(it->first); - } - } else if (TRANSPORT_EXPECT_FALSE(it->first < current_seg_ || - it->second <= now)) { - // this branch should never be execcuted - // first condition: the packet was already prdocued and we have and old - // interest pending. send a nack to notify the consumer if needed. the - // case it->first = current_seg_ is not handled because - // the interest will be satified by the next data packet. - // second condition: the interest is expired. - sendNack(it->first); - to_remove.insert(it->first); - } - } - - // delete nacked interests - for (auto it = to_remove.begin(); it != to_remove.end(); it++) { - removeFromInterestQueue(*it); - } -} - -void RTCProductionProtocol::removeFromInterestQueue(uint32_t interest_seg) { - auto seq_it = seqs_map_.find(interest_seg); - if (seq_it != seqs_map_.end()) { - auto range = timers_map_.equal_range(seq_it->second); - for (auto it_timers = range.first; it_timers != range.second; it_timers++) { - if (it_timers->second == seq_it->first) { - timers_map_.erase(it_timers); - break; - } - } - seqs_map_.erase(seq_it); - } -} - -void RTCProductionProtocol::interestQueueTimer() { - uint64_t now = utils::SteadyTime::nowMs().count(); - - for (auto it_timers = timers_map_.begin(); it_timers != timers_map_.end();) { - uint64_t expire = it_timers->first; - if (expire <= now) { - uint32_t seq = it_timers->second; - sendNack(seq); - // remove the interest from the other map - seqs_map_.erase(seq); - it_timers = timers_map_.erase(it_timers); - } else { - // stop, we are done! - break; - } - } - if (timers_map_.empty()) { - queue_timer_on_ = false; - } else { - queue_timer_on_ = true; - scheduleQueueTimer(timers_map_.begin()->first - now); - } + if (interest_seg < current_seg_) sendNack(interest_seg); } void RTCProductionProtocol::sendManifestProbe(uint32_t sequence) { @@ -814,18 +642,20 @@ void RTCProductionProtocol::sendManifestProbe(uint32_t sequence) { std::shared_ptr<core::ContentObjectManifest> manifest_probe = createManifest(manifest_name); + auto manifest_probe_co = + std::dynamic_pointer_cast<ContentObject>(manifest_probe->getPacket()); - manifest_probe->setLifetime(0); - manifest_probe->setPathLabel(prod_label_); + manifest_probe_co->setLifetime(0); + manifest_probe_co->setPathLabel(prod_label_); manifest_probe->encode(); if (*on_content_object_output_) { on_content_object_output_->operator()(*socket_->getInterface(), - *manifest_probe); + *manifest_probe_co); } DLOG_IF(INFO, VLOG_IS_ON(3)) << "Send init probe " << sequence; - sendContentObject(manifest_probe, true, false); + sendContentObject(manifest_probe_co, true, false); } void RTCProductionProtocol::sendNack(uint32_t sequence) { @@ -847,20 +677,6 @@ void RTCProductionProtocol::sendNack(uint32_t sequence) { nack->setLifetime(0); nack->setPathLabel(prod_label_); - if (!consumer_in_sync_ && on_consumer_in_sync_ && - rtc::ProbeHandler::getProbeType(sequence) == rtc::ProbeType::NOT_PROBE && - sequence > next_packet) { - consumer_in_sync_ = true; - Packet::Format format; - socket_->getSocketOption(interface::GeneralTransportOptions::PACKET_FORMAT, - format); - - auto interest = - core::PacketManager<>::getInstance().getPacket<Interest>(format); - interest->setName(n); - on_consumer_in_sync_(*socket_->getInterface(), *interest); - } - if (*on_content_object_output_) { on_content_object_output_->operator()(*socket_->getInterface(), *nack); } @@ -881,7 +697,7 @@ void RTCProductionProtocol::sendContentObject( portal_->sendContentObject(*content_object); // Compute and save data packet digest - if (making_manifest_ && !is_ah) { + if (manifest_max_capacity_ && !is_ah) { auth::CryptoHashType hash_algo; socket_->getSocketOption(interface::GeneralTransportOptions::HASH_ALGORITHM, hash_algo); diff --git a/libtransport/src/protocols/prod_protocol_rtc.h b/libtransport/src/protocols/prod_protocol_rtc.h index c0424a39c..285ccb646 100644 --- a/libtransport/src/protocols/prod_protocol_rtc.h +++ b/libtransport/src/protocols/prod_protocol_rtc.h @@ -17,6 +17,7 @@ #include <hicn/transport/core/name.h> #include <protocols/production_protocol.h> +#include <protocols/rtc/rtc_verifier.h> #include <atomic> #include <map> @@ -50,11 +51,6 @@ class RTCProductionProtocol : public ProductionProtocol { buffer, buffer_size, buffer_size)); } - void setConsumerInSyncCallback( - interface::ProducerInterestCallback &&callback) { - on_consumer_in_sync_ = std::move(callback); - } - auto shared_from_this() { return utils::shared_from(this); } private: @@ -80,13 +76,6 @@ class RTCProductionProtocol : public ProductionProtocol { void updateStats(bool new_round); void scheduleRoundTimer(); - // pending intersts functions - void addToInterestQueue(uint32_t interest_seg, uint64_t expiration); - void sendNacksForPendingInterests(); - void removeFromInterestQueue(uint32_t interest_seg); - void scheduleQueueTimer(uint64_t wait); - void interestQueueTimer(); - // FEC functions void onFecPackets(fec::BufferArray &packets); fec::buffer getBuffer(std::size_t size); @@ -111,14 +100,14 @@ class RTCProductionProtocol : public ProductionProtocol { uint32_t prev_produced_bytes_; // XXX clearly explain all these new vars uint32_t prev_produced_packets_; - uint32_t produced_bytes_; // bytes produced in the last round - uint32_t produced_packets_; // packet produed in the last round + uint32_t produced_bytes_; // bytes produced in the last round + uint32_t produced_packets_; // packet produed in the last round uint32_t max_packet_production_; // never exceed this number of packets // without update stats - uint32_t bytes_production_rate_; // bytes per sec - uint32_t packets_production_rate_; // pps + uint32_t bytes_production_rate_; // bytes per sec + uint32_t packets_production_rate_; // pps uint64_t last_produced_data_ts_; // ms @@ -134,27 +123,6 @@ class RTCProductionProtocol : public ProductionProtocol { // of the new rate. bool allow_delayed_nacks_; - // queue for the received interests - // this map maps the expiration time of an interest to - // its sequence number. the map is sorted by timeouts - // the same timeout may be used for multiple sequence numbers - // but for each sequence number we store only the smallest - // expiry time. In this way the mapping from seqs_map_ to - // timers_map_ is unique - std::multimap<uint64_t, uint32_t> timers_map_; - - // this map does the opposite, this map is not ordered - std::unordered_map<uint32_t, uint64_t> seqs_map_; - bool queue_timer_on_; - std::unique_ptr<asio::steady_timer> interests_queue_timer_; - - // this callback is called when the remote consumer is in sync with high - // probability. it is called only the first time that the switch happen. - // XXX this makes sense only in P2P mode, while in standard mode is - // impossible to know the state of the consumers so it should not be used. - bool consumer_in_sync_; - interface::ProducerInterestCallback on_consumer_in_sync_; - // Save FEC packets here before sending them std::queue<ContentObject::Ptr> pending_fec_packets_; std::queue<std::pair<uint64_t, ContentObject::Ptr>> paced_fec_packets_; @@ -172,6 +140,9 @@ class RTCProductionProtocol : public ProductionProtocol { // Manifest std::queue<std::pair<uint32_t, auth::CryptoHash>> manifest_entries_; // map a packet suffix to a packet hash + + // Verifier for aggregated interests + std::shared_ptr<rtc::RTCVerifier> verifier_; }; } // namespace protocol diff --git a/libtransport/src/protocols/production_protocol.cc b/libtransport/src/protocols/production_protocol.cc index 8b781e38a..039a6a55a 100644 --- a/libtransport/src/protocols/production_protocol.cc +++ b/libtransport/src/protocols/production_protocol.cc @@ -78,8 +78,8 @@ int ProductionProtocol::start() { socket_->getSocketOption(GeneralTransportOptions::ASYNC_MODE, is_async_); socket_->getSocketOption(GeneralTransportOptions::SIGNER, signer_); - socket_->getSocketOption(GeneralTransportOptions::MAKE_MANIFEST, - making_manifest_); + socket_->getSocketOption(GeneralTransportOptions::MANIFEST_MAX_CAPACITY, + manifest_max_capacity_); std::string fec_type_str = ""; socket_->getSocketOption(GeneralTransportOptions::FEC_TYPE, fec_type_str); diff --git a/libtransport/src/protocols/production_protocol.h b/libtransport/src/protocols/production_protocol.h index 8e10d2f40..09718631f 100644 --- a/libtransport/src/protocols/production_protocol.h +++ b/libtransport/src/protocols/production_protocol.h @@ -79,6 +79,7 @@ class ProductionProtocol if (fec_str && (fec_type_ == fec::FECType::UNKNOWN)) { LOG(INFO) << "Using FEC " << fec_str; fec_type_ = fec::FECUtils::fecTypeFromString(fec_str); + CHECK(fec_type_ != fec::FECType::UNKNOWN); } if (fec_type_ == fec::FECType::UNKNOWN) { @@ -123,7 +124,7 @@ class ProductionProtocol // Signature and manifest std::shared_ptr<auth::Signer> signer_; - uint32_t making_manifest_; + uint32_t manifest_max_capacity_; bool is_async_; fec::FECType fec_type_; diff --git a/libtransport/src/protocols/raaqm.cc b/libtransport/src/protocols/raaqm.cc index 131367d78..bcbc15aef 100644 --- a/libtransport/src/protocols/raaqm.cc +++ b/libtransport/src/protocols/raaqm.cc @@ -371,7 +371,7 @@ void RaaqmTransportProtocol::onPacketDropped(Interest &interest, } interest_retransmissions_[segment & mask]++; - interest_to_retransmit_.push(segment); + interest_to_retransmit_.push((unsigned int)segment); } else { LOG(ERROR) << "Stop: received not trusted packet " << interest_retransmissions_[segment & mask] << " times"; @@ -429,7 +429,7 @@ void RaaqmTransportProtocol::onInterestTimeout(Interest::Ptr &interest, return; } - interest_to_retransmit_.push(segment); + interest_to_retransmit_.push((unsigned int)segment); scheduleNextInterests(); } else { LOG(ERROR) << "Stop: reached max retx limit."; @@ -491,7 +491,7 @@ void RaaqmTransportProtocol::updateRtt(uint64_t segment) { throw std::runtime_error("RAAQM ERROR: no current path found, exit"); } else { auto now = utils::SteadyTime::Clock::now(); - utils::SteadyTime::Milliseconds rtt = utils::SteadyTime::getDurationMs( + auto rtt = utils::SteadyTime::getDurationUs( interest_timepoints_[segment & mask], now); // Update stats @@ -525,7 +525,7 @@ void RaaqmTransportProtocol::RAAQM() { } void RaaqmTransportProtocol::updateStats( - uint32_t suffix, const utils::SteadyTime::Milliseconds &rtt, + uint32_t suffix, const utils::SteadyTime::Microseconds &rtt, utils::SteadyTime::TimePoint &now) { // Update RTT statistics stats_->updateAverageRtt(rtt); diff --git a/libtransport/src/protocols/raaqm.h b/libtransport/src/protocols/raaqm.h index ec344c23a..a7ef23b68 100644 --- a/libtransport/src/protocols/raaqm.h +++ b/libtransport/src/protocols/raaqm.h @@ -57,7 +57,7 @@ class RaaqmTransportProtocol : public TransportProtocol, virtual void afterDataUnsatisfied(uint64_t segment); virtual void updateStats(uint32_t suffix, - const utils::SteadyTime::Milliseconds &rtt, + const utils::SteadyTime::Microseconds &rtt, utils::SteadyTime::TimePoint &now); private: diff --git a/libtransport/src/protocols/raaqm_data_path.cc b/libtransport/src/protocols/raaqm_data_path.cc index d06fee918..b8e6e6285 100644 --- a/libtransport/src/protocols/raaqm_data_path.cc +++ b/libtransport/src/protocols/raaqm_data_path.cc @@ -50,9 +50,9 @@ RaaqmDataPath::RaaqmDataPath(double drop_factor, alpha_(ALPHA) {} RaaqmDataPath &RaaqmDataPath::insertNewRtt( - const utils::SteadyTime::Milliseconds &new_rtt, + const utils::SteadyTime::Microseconds &new_rtt, const utils::SteadyTime::TimePoint &now) { - rtt_ = new_rtt.count(); + rtt_ = new_rtt.count() / 1000; rtt_samples_.pushBack(rtt_); rtt_max_ = rtt_samples_.rBegin(); diff --git a/libtransport/src/protocols/raaqm_data_path.h b/libtransport/src/protocols/raaqm_data_path.h index b6f7c5ac1..dd24dad51 100644 --- a/libtransport/src/protocols/raaqm_data_path.h +++ b/libtransport/src/protocols/raaqm_data_path.h @@ -49,7 +49,7 @@ class RaaqmDataPath { * max of RTT. * @param new_rtt is the value of the new RTT */ - RaaqmDataPath &insertNewRtt(const utils::SteadyTime::Milliseconds &new_rtt, + RaaqmDataPath &insertNewRtt(const utils::SteadyTime::Microseconds &new_rtt, const utils::SteadyTime::TimePoint &now); /** diff --git a/libtransport/src/protocols/rate_estimation.cc b/libtransport/src/protocols/rate_estimation.cc index d834b53e6..01c18c6cb 100644 --- a/libtransport/src/protocols/rate_estimation.cc +++ b/libtransport/src/protocols/rate_estimation.cc @@ -107,9 +107,9 @@ InterRttEstimator::~InterRttEstimator() { } void InterRttEstimator::onRttUpdate( - const utils::SteadyTime::Milliseconds &rtt) { + const utils::SteadyTime::Microseconds &rtt) { pthread_mutex_lock(&(this->mutex_)); - this->rtt_ = rtt.count(); + this->rtt_ = rtt.count() / 1000.0; this->number_of_packets_++; this->avg_rtt_ += this->rtt_; pthread_mutex_unlock(&(this->mutex_)); @@ -256,7 +256,7 @@ void SimpleEstimator::onDataReceived(int packet_size) { this->total_size_ += packet_size; } -void SimpleEstimator::onRttUpdate(const utils::SteadyTime::Milliseconds &rtt) { +void SimpleEstimator::onRttUpdate(const utils::SteadyTime::Microseconds &rtt) { this->number_of_packets_++; if (this->number_of_packets_ == this->batching_param_) { @@ -300,9 +300,9 @@ BatchingPacketsEstimator::BatchingPacketsEstimator(double alpha_arg, } void BatchingPacketsEstimator::onRttUpdate( - const utils::SteadyTime::Milliseconds &rtt) { + const utils::SteadyTime::Microseconds &rtt) { this->number_of_packets_++; - this->avg_rtt_ += rtt.count(); + this->avg_rtt_ += rtt.count() / 1000.0; if (number_of_packets_ == this->batching_param_) { if (estimation_ == 0) { diff --git a/libtransport/src/protocols/rate_estimation.h b/libtransport/src/protocols/rate_estimation.h index b71de12e4..d809b2b7c 100644 --- a/libtransport/src/protocols/rate_estimation.h +++ b/libtransport/src/protocols/rate_estimation.h @@ -31,7 +31,7 @@ class IcnRateEstimator : utils::NonCopyable { virtual ~IcnRateEstimator(){}; - virtual void onRttUpdate(const utils::SteadyTime::Milliseconds &rtt){}; + virtual void onRttUpdate(const utils::SteadyTime::Microseconds &rtt){}; virtual void onDataReceived(int packetSize){}; @@ -66,7 +66,7 @@ class InterRttEstimator : public IcnRateEstimator { ~InterRttEstimator(); - void onRttUpdate(const utils::SteadyTime::Milliseconds &rtt); + void onRttUpdate(const utils::SteadyTime::Microseconds &rtt); void onDataReceived(int packet_size) { if (packet_size > this->max_packet_size_) { @@ -101,7 +101,7 @@ class BatchingPacketsEstimator : public IcnRateEstimator { public: BatchingPacketsEstimator(double alpha_arg, int batchingParam); - void onRttUpdate(const utils::SteadyTime::Milliseconds &rtt); + void onRttUpdate(const utils::SteadyTime::Microseconds &rtt); void onDataReceived(int packet_size) { if (packet_size > this->max_packet_size_) { @@ -148,7 +148,7 @@ class SimpleEstimator : public IcnRateEstimator { public: SimpleEstimator(double alpha, int batching_param); - void onRttUpdate(const utils::SteadyTime::Milliseconds &rtt); + void onRttUpdate(const utils::SteadyTime::Microseconds &rtt); void onDataReceived(int packet_size); diff --git a/libtransport/src/protocols/reassembly.h b/libtransport/src/protocols/reassembly.h index b0879201d..c0c4de3d8 100644 --- a/libtransport/src/protocols/reassembly.h +++ b/libtransport/src/protocols/reassembly.h @@ -57,12 +57,6 @@ class Reassembly { virtual void reassemble(utils::MemBuf &buffer, uint32_t suffix) = 0; /** - * Handle reassembly of manifest - */ - virtual void reassemble( - std::unique_ptr<core::ContentObjectManifest> &&manifest) = 0; - - /** * Reset reassembler for new round */ virtual void reInitialize() = 0; diff --git a/libtransport/src/protocols/rtc/probe_handler.cc b/libtransport/src/protocols/rtc/probe_handler.cc index 6a84914ab..60eceeb19 100644 --- a/libtransport/src/protocols/rtc/probe_handler.cc +++ b/libtransport/src/protocols/rtc/probe_handler.cc @@ -13,6 +13,7 @@ * limitations under the License. */ +#include <glog/logging.h> #include <hicn/transport/utils/chrono_typedefs.h> #include <protocols/rtc/probe_handler.h> #include <protocols/rtc/rtc_consts.h> @@ -64,7 +65,7 @@ double ProbeHandler::getProbeLossRate() { } void ProbeHandler::setSuffixRange(uint32_t min, uint32_t max) { - assert(min <= max && min >= MIN_PROBE_SEQ); + DCHECK(min <= max && min >= MIN_PROBE_SEQ); distr_ = std::uniform_int_distribution<uint32_t>(min, max); } diff --git a/libtransport/src/protocols/rtc/rtc.cc b/libtransport/src/protocols/rtc/rtc.cc index d2682edfa..9a56269f3 100644 --- a/libtransport/src/protocols/rtc/rtc.cc +++ b/libtransport/src/protocols/rtc/rtc.cc @@ -38,6 +38,7 @@ RTCTransportProtocol::RTCTransportProtocol( implementation::ConsumerSocket *icn_socket) : TransportProtocol(icn_socket, new RtcIndexer<>(icn_socket, this), new RtcReassembly(icn_socket, this)), + max_aggregated_interest_(1), number_(0) { icn_socket->getSocketOption(PORTAL, portal_); round_timer_ = @@ -55,9 +56,9 @@ void RTCTransportProtocol::resume() { TransportProtocol::resume(); } -std::size_t RTCTransportProtocol::transportHeaderLength() { +std::size_t RTCTransportProtocol::transportHeaderLength(bool isFEC) { return DATA_HEADER_SIZE + - (fec_decoder_ != nullptr ? fec_decoder_->getFecHeaderSize() : 0); + (fec_decoder_ != nullptr ? fec_decoder_->getFecHeaderSize(isFEC) : 0); } // private @@ -75,13 +76,13 @@ void RTCTransportProtocol::initParams() { std::shared_ptr<auth::Verifier> verifier; socket_->getSocketOption(GeneralTransportOptions::VERIFIER, verifier); - uint32_t unverified_interval; - socket_->getSocketOption(GeneralTransportOptions::UNVERIFIED_INTERVAL, - unverified_interval); + uint32_t factor_relevant; + socket_->getSocketOption(GeneralTransportOptions::MANIFEST_FACTOR_RELEVANT, + factor_relevant); - double unverified_ratio; - socket_->getSocketOption(GeneralTransportOptions::UNVERIFIED_RATIO, - unverified_ratio); + uint32_t factor_alert; + socket_->getSocketOption(GeneralTransportOptions::MANIFEST_FACTOR_ALERT, + factor_alert); rc_ = std::make_shared<RTCRateControlCongestionDetection>(); ldr_ = std::make_shared<RTCLossDetectionAndRecovery>( @@ -100,8 +101,8 @@ void RTCTransportProtocol::initParams() { } }); - verifier_ = std::make_shared<RTCVerifier>(verifier, unverified_interval, - unverified_ratio); + verifier_ = + std::make_shared<RTCVerifier>(verifier, factor_relevant, factor_alert); state_ = std::make_shared<RTCState>( indexer_verifier_.get(), @@ -138,19 +139,20 @@ void RTCTransportProtocol::initParams() { last_interest_sent_time_ = 0; last_interest_sent_seq_ = 0; -#if 0 - if(portal_->isConnectedToFwd()){ - max_aggregated_interest_ = 1; - }else{ - max_aggregated_interest_ = MAX_INTERESTS_IN_BATCH; - } -#else - max_aggregated_interest_ = 1; - if (const char *max_aggr = std::getenv("MAX_AGGREGATED_INTERESTS")) { - LOG(INFO) << "Max Aggregated: " << max_aggr; - max_aggregated_interest_ = std::stoul(std::string(max_aggr)); + // Aggregated interests setup + bool aggregated_interests_on; + socket_->getSocketOption(RtcTransportOptions::AGGREGATED_INTERESTS, + aggregated_interests_on); + if (aggregated_interests_on) { + if (const char *max_aggr = std::getenv("MAX_AGGREGATED_INTERESTS")) + max_aggregated_interest_ = (uint32_t)std::stoul(std::string(max_aggr)); + else + max_aggregated_interest_ = MAX_INTERESTS_IN_BATCH; + + max_aggregated_interest_ = std::min<uint32_t>(max_aggregated_interest_, + 1 + MAX_SUFFIXES_IN_MANIFEST); } -#endif + LOG(INFO) << "Max Aggregated: " << max_aggregated_interest_; max_sent_int_ = std::ceil((double)MAX_PACING_BATCH / (double)max_aggregated_interest_); @@ -263,6 +265,11 @@ void RTCTransportProtocol::discoveredRtt() { socket_->getSocketOption(RtcTransportOptions::RECOVERY_STRATEGY, strategy); ldr_->changeRecoveryStrategy( (interface::RtcTransportRecoveryStrategies)strategy); + + bool content_sharing_mode; + socket_->getSocketOption(RtcTransportOptions::CONTENT_SHARING_MODE, + content_sharing_mode); + if (content_sharing_mode) ldr_->setContentSharingMode(); ldr_->turnOnRecovery(); ldr_->onNewRound(false); @@ -270,22 +277,9 @@ void RTCTransportProtocol::discoveredRtt() { Name *name = nullptr; socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name); Prefix prefix(*name, 128); - if ((interface::RtcTransportRecoveryStrategies)strategy == - interface::RtcTransportRecoveryStrategies::LOW_RATE_AND_BESTPATH) { - fwd_strategy_.initFwdStrategy(portal_, prefix, state_.get(), - RTCForwardingStrategy::BEST_PATH); - } else if ((interface::RtcTransportRecoveryStrategies)strategy == - interface::RtcTransportRecoveryStrategies:: - LOW_RATE_AND_REPLICATION) { - fwd_strategy_.initFwdStrategy(portal_, prefix, state_.get(), - RTCForwardingStrategy::REPLICATION); - } else if ((interface::RtcTransportRecoveryStrategies)strategy == - interface::RtcTransportRecoveryStrategies:: - LOW_RATE_AND_ALL_FWD_STRATEGIES) { - fwd_strategy_.initFwdStrategy(portal_, prefix, state_.get(), - RTCForwardingStrategy::BOTH); - } - + fwd_strategy_.initFwdStrategy( + portal_, prefix, state_.get(), + (interface::RtcTransportRecoveryStrategies)strategy); updateSyncWindow(); } @@ -302,6 +296,12 @@ void RTCTransportProtocol::computeMaxSyncWindow() { return; } + bool content_sharing_mode; + socket_->getSocketOption(RtcTransportOptions::CONTENT_SHARING_MODE, + content_sharing_mode); + if (content_sharing_mode && (production_rate < MIN_PROD_RATE_SHARING_MODE)) + production_rate = MIN_PROD_RATE_SHARING_MODE; + production_rate += (production_rate * indexer_verifier_->getMaxFecOverhead()); uint32_t lifetime = default_values::interest_lifetime; @@ -330,6 +330,11 @@ void RTCTransportProtocol::updateSyncWindow() { double prod_rate = state_->getProducerRate(); double rtt = (double)state_->getMinRTT() / MILLI_IN_A_SEC; double packet_size = state_->getAveragePacketSize(); + bool content_sharing_mode; + socket_->getSocketOption(RtcTransportOptions::CONTENT_SHARING_MODE, + content_sharing_mode); + if (content_sharing_mode && (prod_rate < MIN_PROD_RATE_SHARING_MODE)) + prod_rate = MIN_PROD_RATE_SHARING_MODE; // if some of the info are not available do not update the current win if (prod_rate != 0.0 && rtt != 0.0 && packet_size != 0.0) { @@ -385,6 +390,19 @@ void RTCTransportProtocol::sendProbeInterest(uint32_t seq) { sendInterest(*interest_name); } +void RTCTransportProtocol::sendInterestForTimeout(uint32_t seq) { + if (!isRunning() && !is_first_) return; + + Name *interest_name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, + &interest_name); + + // we got a timeout for this packet so it is not pending anymore + interest_name->setSuffix(seq); + state_->onSendNewInterest(interest_name); + sendInterest(*interest_name); +} + void RTCTransportProtocol::scheduleNextInterests() { DLOG_IF(INFO, VLOG_IS_ON(3)) << "Schedule next interests"; @@ -475,9 +493,9 @@ void RTCTransportProtocol::scheduleNextInterests() { } // skip received packets - if (indexer_verifier_->checkNextSuffix() <= - state_->getHighestSeqReceivedInOrder()) { - indexer_verifier_->jumpToIndex(state_->getHighestSeqReceivedInOrder() + 1); + uint32_t max_received = state_->getHighestSeqReceivedInOrder(); + if (indexer_verifier_->checkNextSuffix() <= max_received) { + indexer_verifier_->jumpToIndex(max_received + 1); } uint32_t sent_interests = 0; @@ -495,7 +513,6 @@ void RTCTransportProtocol::scheduleNextInterests() { << "In while loop. Window size: " << current_sync_win_; uint32_t next_seg = indexer_verifier_->getNextSuffix(); - name->setSuffix(next_seg); // send the packet only if: @@ -586,7 +603,6 @@ void RTCTransportProtocol::onInterestTimeout(Interest::Ptr &interest, } timeouts_or_nacks_.insert(segment_number); - if (TRANSPORT_EXPECT_TRUE(state_->isProducerActive()) && segment_number <= state_->getHighestSeqReceived()) { // we retransmit packets only if the producer is active, otherwise we @@ -627,11 +643,11 @@ void RTCTransportProtocol::onInterestTimeout(Interest::Ptr &interest, << "On timeout next seg = " << indexer_verifier_->checkNextSuffix() << ", jump to " << segment_number; // add an extra space in the window - current_sync_win_++; indexer_verifier_->jumpToIndex(segment_number); } state_->onTimeout(segment_number, false); + sendInterestForTimeout(segment_number); scheduleNextInterests(); } @@ -672,7 +688,6 @@ void RTCTransportProtocol::onNack(const ContentObject &content_object) { if (tn_it != timeouts_or_nacks_.end()) timeouts_or_nacks_.erase(tn_it); state_->onJumpForward(production_seg); - verifier_->onJumpForward(production_seg); // the client is asking for content in the past // switch to catch up state and increase the window // this is true only if the packet is not an RTX @@ -821,7 +836,8 @@ void RTCTransportProtocol::onContentObjectReceived( // Check if the packet is a retransmission if (ldr_->isRtx(segment_number) && state != PacketState::RECEIVED) { if (is_data || is_manifest) { - state_->onPacketRecoveredRtx(segment_number); + uint64_t rtt = ldr_->getRtxRtt(segment_number); + state_->onPacketRecoveredRtx(content_object, rtt); if (*on_content_object_input_) { (*on_content_object_input_)(*socket_->getInterface(), content_object); @@ -842,7 +858,7 @@ void RTCTransportProtocol::onContentObjectReceived( } if (is_fec) { - state_->onFecPacketRecoveredRtx(segment_number); + state_->onFecPacketRecoveredRtx(content_object); } } @@ -920,7 +936,7 @@ void RTCTransportProtocol::sendStatsToApp( stats_->updateAverageWindowSize(state_->getPendingInterestNumber()); stats_->updateLossRatio(state_->getPerSecondLossRate()); uint64_t rtt = state_->getAvgRTT(); - stats_->updateAverageRtt(utils::SteadyTime::Milliseconds(rtt)); + stats_->updateAverageRtt(utils::SteadyTime::Microseconds(rtt * 1000)); stats_->updateQueuingDelay(state_->getQueuing()); stats_->updateLostData(lost_data); @@ -960,9 +976,10 @@ void RTCTransportProtocol::decodePacket(ContentObject &content_object, DLOG_IF(INFO, VLOG_IS_ON(4)) << "Send packet " << content_object.getName() << " to FEC decoder"; - uint32_t offset = is_manifest - ? content_object.headerSize() - : content_object.headerSize() + rtc::DATA_HEADER_SIZE; + uint32_t offset = + is_manifest + ? (uint32_t)content_object.headerSize() + : (uint32_t)(content_object.headerSize() + rtc::DATA_HEADER_SIZE); uint32_t metadata = static_cast<uint32_t>(content_object.getPayloadType()); fec_decoder_->onDataPacket(content_object, offset, metadata); @@ -1016,7 +1033,7 @@ void RTCTransportProtocol::onFecPackets(fec::BufferArray &packets) { processManifest(*interest, *content_object); } - state_->onPacketRecoveredFec(seq_number, buffer->length()); + state_->onPacketRecoveredFec(seq_number, (uint32_t)buffer->length()); ldr_->onPacketRecoveredFec(seq_number); if (payload_type == PayloadType::DATA) { @@ -1038,11 +1055,11 @@ void RTCTransportProtocol::processManifest(Interest &interest, ContentObject::Ptr RTCTransportProtocol::removeFecHeader( const ContentObject &content_object) { - if (!fec_decoder_ || !fec_decoder_->getFecHeaderSize()) { + if (!fec_decoder_ || !fec_decoder_->getFecHeaderSize(false)) { return nullptr; } - size_t fec_header_size = fec_decoder_->getFecHeaderSize(); + size_t fec_header_size = fec_decoder_->getFecHeaderSize(false); const uint8_t *payload = content_object.data() + content_object.headerSize() + fec_header_size; size_t payload_size = content_object.payloadSize() - fec_header_size; diff --git a/libtransport/src/protocols/rtc/rtc.h b/libtransport/src/protocols/rtc/rtc.h index 3763f33c7..a8a474216 100644 --- a/libtransport/src/protocols/rtc/rtc.h +++ b/libtransport/src/protocols/rtc/rtc.h @@ -44,7 +44,7 @@ class RTCTransportProtocol : public TransportProtocol { void resume() override; - std::size_t transportHeaderLength() override; + std::size_t transportHeaderLength(bool isFEC) override; auto shared_from_this() { return utils::shared_from(this); } @@ -69,6 +69,7 @@ class RTCTransportProtocol : public TransportProtocol { // packet functions void sendRtxInterest(uint32_t seq); void sendProbeInterest(uint32_t seq); + void sendInterestForTimeout(uint32_t seq); void scheduleNextInterests() override; void onInterestTimeout(Interest::Ptr &interest, const Name &name) override; void onNack(const ContentObject &content_object); diff --git a/libtransport/src/protocols/rtc/rtc_consts.h b/libtransport/src/protocols/rtc/rtc_consts.h index 96e39d07e..29b5a3a12 100644 --- a/libtransport/src/protocols/rtc/rtc_consts.h +++ b/libtransport/src/protocols/rtc/rtc_consts.h @@ -54,7 +54,7 @@ const uint32_t PACING_WAIT = 1000; // usec to wait betwing two pacing batch. As const uint32_t MAX_RTX_IN_BATCH = 10; // max rtx to send in loop // packet const -const uint32_t RTC_INTEREST_LIFETIME = 2000; +const uint32_t RTC_INTEREST_LIFETIME = 4000; // probes sequence range const uint32_t MIN_PROBE_SEQ = 0xefffffff; @@ -93,6 +93,7 @@ const double CATCH_UP_WIN_INCREMENT = 1.2; // used in rate control const double WIN_DECREASE_FACTOR = 0.5; const double WIN_INCREASE_FACTOR = 1.5; +const uint32_t MIN_PROD_RATE_SHARING_MODE = 125000; // 1Mbps in bytes // round in congestion const double ROUNDS_BEFORE_TAKE_ACTION = 5; @@ -120,14 +121,14 @@ const uint64_t MAX_TIMER_RTX = ~0; const uint32_t SENTINEL_TIMER_INTERVAL = 100; // ms const uint32_t MAX_RTX_WITH_SENTINEL = 10; // packets const double CATCH_UP_RTT_INCREMENT = 1.2; -const double MAX_RESIDUAL_LOSS_RATE = 2.0; // % -const uint32_t WAIT_BEFORE_FEC_UPDATE = ROUNDS_PER_SEC * 5; +const double MAX_RESIDUAL_LOSS_RATE = 1.0; // % +const uint32_t WAIT_BEFORE_FEC_UPDATE = ROUNDS_PER_SEC; +const uint32_t MAX_RTT_BEFORE_FEC = 60; // ms // used by producer const uint32_t PRODUCER_STATS_INTERVAL = 200; // ms const uint32_t MIN_PRODUCTION_RATE = 25; // pps, equal to min window * // rounds in a second -const uint32_t NACK_DELAY = 1500; // ms const uint32_t FEC_PACING_TIME = 5; // ms // aggregated data consts @@ -139,6 +140,73 @@ const uint32_t AGGREGATED_PACKETS_TIMER = 2; // ms const uint32_t MAX_RTT = 200; // ms const double MAX_RESIDUAL_LOSSES = 0.05; // % +const uint8_t FEC_MATRIX[64][10] = { + {1, 2, 2, 2, 3, 3, 4, 5, 5, 6}, // k = 1 + {1, 2, 3, 3, 4, 5, 5, 6, 7, 9}, + {2, 2, 3, 4, 5, 6, 7, 8, 9, 11}, + {2, 3, 4, 5, 5, 7, 8, 9, 11, 13}, + {2, 3, 4, 5, 6, 7, 9, 10, 12, 14}, // k = 5 + {2, 3, 4, 6, 7, 8, 10, 12, 14, 16}, + {2, 4, 5, 6, 8, 9, 11, 13, 15, 18}, + {3, 4, 5, 7, 8, 10, 12, 14, 16, 19}, + {3, 4, 6, 7, 9, 11, 13, 15, 18, 21}, + {3, 4, 6, 8, 9, 11, 14, 16, 19, 23}, // k = 10 + {3, 5, 6, 8, 10, 12, 14, 17, 20, 24}, + {3, 5, 7, 8, 10, 13, 15, 18, 21, 26}, + {3, 5, 7, 9, 11, 13, 16, 19, 23, 27}, + {3, 5, 7, 9, 12, 14, 17, 20, 24, 28}, + {4, 6, 8, 10, 12, 15, 18, 21, 25, 30}, // k = 15 + {4, 6, 8, 10, 13, 15, 19, 22, 26, 31}, + {4, 6, 8, 11, 13, 16, 19, 23, 27, 33}, + {4, 6, 9, 11, 14, 17, 20, 24, 29, 34}, + {4, 6, 9, 11, 14, 17, 21, 25, 30, 35}, + {4, 7, 9, 12, 15, 18, 22, 26, 31, 37}, // k = 20 + {4, 7, 9, 12, 15, 19, 22, 27, 32, 38}, + {4, 7, 10, 13, 16, 19, 23, 28, 33, 40}, + {5, 7, 10, 13, 16, 20, 24, 29, 34, 41}, + {5, 7, 10, 13, 17, 20, 25, 30, 35, 42}, + {5, 8, 11, 14, 17, 21, 26, 31, 37, 44}, // k = 25 + {5, 8, 11, 14, 18, 22, 26, 31, 38, 45}, + {5, 8, 11, 15, 18, 22, 27, 32, 39, 46}, + {5, 8, 11, 15, 19, 23, 28, 33, 40, 48}, + {5, 8, 12, 15, 19, 24, 28, 34, 41, 49}, + {5, 9, 12, 16, 20, 24, 29, 35, 42, 50}, // k = 30 + {5, 9, 12, 16, 20, 25, 30, 36, 43, 51}, + {5, 9, 13, 16, 21, 25, 31, 37, 44, 53}, + {6, 9, 13, 17, 21, 26, 31, 38, 45, 54}, + {6, 9, 13, 17, 22, 26, 32, 39, 46, 55}, + {6, 10, 13, 17, 22, 27, 33, 40, 47, 57}, // k = 35 + {6, 10, 14, 18, 22, 28, 34, 40, 48, 58}, + {6, 10, 14, 18, 23, 28, 34, 41, 49, 59}, + {6, 10, 14, 19, 23, 29, 35, 42, 50, 60}, + {6, 10, 14, 19, 24, 29, 36, 43, 52, 62}, + {6, 10, 15, 19, 24, 30, 36, 44, 53, 63}, // k = 40 + {6, 11, 15, 20, 25, 31, 37, 45, 54, 64}, + {6, 11, 15, 20, 25, 31, 38, 46, 55, 65}, + {7, 11, 15, 20, 26, 32, 39, 46, 56, 67}, + {7, 11, 16, 21, 26, 32, 39, 47, 57, 68}, + {7, 11, 16, 21, 27, 33, 40, 48, 58, 69}, // k = 45 + {7, 11, 16, 21, 27, 33, 41, 49, 59, 70}, + {7, 12, 16, 22, 27, 34, 41, 50, 60, 72}, + {7, 12, 17, 22, 28, 34, 42, 51, 61, 73}, + {7, 12, 17, 22, 28, 35, 43, 52, 62, 74}, + {7, 12, 17, 23, 29, 36, 43, 52, 63, 75}, // k = 50 + {7, 12, 17, 23, 29, 36, 44, 53, 64, 77}, + {7, 12, 18, 23, 30, 37, 45, 54, 65, 78}, + {7, 13, 18, 24, 30, 37, 45, 55, 66, 79}, + {8, 13, 18, 24, 31, 38, 46, 56, 67, 80}, + {8, 13, 18, 24, 31, 38, 47, 57, 68, 82}, // k = 55 + {8, 13, 19, 25, 31, 39, 47, 57, 69, 83}, + {8, 13, 19, 25, 32, 39, 48, 58, 70, 84}, + {8, 13, 19, 25, 32, 40, 49, 59, 71, 85}, + {8, 14, 19, 26, 33, 41, 50, 60, 72, 86}, + {8, 14, 20, 26, 33, 41, 50, 61, 73, 88}, // k = 60 + {8, 14, 20, 26, 34, 42, 51, 61, 74, 89}, + {8, 14, 20, 27, 34, 42, 52, 62, 75, 90}, + {8, 14, 20, 27, 34, 43, 52, 63, 76, 91}, + {8, 14, 21, 27, 35, 43, 53, 64, 77, 92}, // k = 64 +}; + } // namespace rtc } // namespace protocol diff --git a/libtransport/src/protocols/rtc/rtc_data_path.cc b/libtransport/src/protocols/rtc/rtc_data_path.cc index b3abf5ea8..a421396b1 100644 --- a/libtransport/src/protocols/rtc/rtc_data_path.cc +++ b/libtransport/src/protocols/rtc/rtc_data_path.cc @@ -91,6 +91,8 @@ void RTCDataPath::insertRttSample( rtt_samples_ = 0; last_avg_rtt_compute_ = now; } + + received_packets_++; } void RTCDataPath::insertOwdSample(int64_t owd) { @@ -115,10 +117,6 @@ void RTCDataPath::insertOwdSample(int64_t owd) { int64_t diff = std::abs(owd - last_owd_); last_owd_ = owd; jitter_ += (1.0 / 16.0) * ((double)diff - jitter_); - - // owd is computed only for valid data packets so we count only - // this for decide if we recevie traffic or not - received_packets_++; } void RTCDataPath::computeInterArrivalGap(uint32_t segment_number) { @@ -150,12 +148,17 @@ double RTCDataPath::getInterArrivalGap() { return avg_inter_arrival_; } -bool RTCDataPath::isActive() { +bool RTCDataPath::isValidProducer() { if (received_nacks_ && rounds_without_packets_ < MAX_ROUNDS_WITHOUT_PKTS) return true; return false; } +bool RTCDataPath::isActive() { + if (rounds_without_packets_ < MAX_ROUNDS_WITHOUT_PKTS) return true; + return false; +} + bool RTCDataPath::pathToProducer() { if (received_nacks_) return true; return false; diff --git a/libtransport/src/protocols/rtc/rtc_data_path.h b/libtransport/src/protocols/rtc/rtc_data_path.h index 5afbbb87f..ba5201fe8 100644 --- a/libtransport/src/protocols/rtc/rtc_data_path.h +++ b/libtransport/src/protocols/rtc/rtc_data_path.h @@ -49,8 +49,9 @@ class RTCDataPath { double getQueuingDealy(); double getInterArrivalGap(); double getJitter(); - bool isActive(); - bool pathToProducer(); + bool isActive(); // pakets recevied from this path in the last rounds + bool pathToProducer(); // path from a producer + bool isValidProducer(); // path from a producer that is also active uint64_t getLastPacketTS(); uint32_t getPacketsLastRound(); diff --git a/libtransport/src/protocols/rtc/rtc_forwarding_strategy.cc b/libtransport/src/protocols/rtc/rtc_forwarding_strategy.cc index c6bc751e6..4bbd7eac0 100644 --- a/libtransport/src/protocols/rtc/rtc_forwarding_strategy.cc +++ b/libtransport/src/protocols/rtc/rtc_forwarding_strategy.cc @@ -14,6 +14,7 @@ */ #include <hicn/transport/interfaces/notification.h> +#include <protocols/rtc/rtc_consts.h> #include <protocols/rtc/rtc_forwarding_strategy.h> namespace transport { @@ -24,8 +25,13 @@ namespace rtc { using namespace transport::interface; +const double FWD_MAX_QUEUE = 30.0; // ms +const double FWD_MAX_RTT = MAX_RTT_BEFORE_FEC; // ms +const double FWD_MAX_LOSS_RATE = 0.1; + RTCForwardingStrategy::RTCForwardingStrategy() - : init_(false), + : low_rate_app_(false), + init_(false), forwarder_set_(false), selected_strategy_(NONE), current_strategy_(NONE), @@ -42,17 +48,56 @@ void RTCForwardingStrategy::setCallback( void RTCForwardingStrategy::initFwdStrategy( std::shared_ptr<core::Portal> portal, core::Prefix& prefix, RTCState* state, - strategy_t strategy) { - init_ = true; - selected_strategy_ = strategy; - if (strategy == BOTH) - current_strategy_ = BEST_PATH; - else - current_strategy_ = strategy; - rounds_since_last_set_ = 0; - prefix_ = prefix; - portal_ = portal; - state_ = state; + interface::RtcTransportRecoveryStrategies strategy) { + switch (strategy) { + case interface::RtcTransportRecoveryStrategies::LOW_RATE_AND_BESTPATH: + init_ = true; + low_rate_app_ = true; + selected_strategy_ = BEST_PATH; + current_strategy_ = BEST_PATH; + break; + case interface::RtcTransportRecoveryStrategies::LOW_RATE_AND_REPLICATION: + init_ = true; + low_rate_app_ = true; + selected_strategy_ = REPLICATION; + current_strategy_ = REPLICATION; + break; + case interface::RtcTransportRecoveryStrategies:: + LOW_RATE_AND_ALL_FWD_STRATEGIES: + init_ = true; + low_rate_app_ = true; + selected_strategy_ = BEST_PATH; + current_strategy_ = BEST_PATH; + break; + case interface::RtcTransportRecoveryStrategies::DELAY_AND_BESTPATH: + init_ = true; + low_rate_app_ = false; + selected_strategy_ = BEST_PATH; + current_strategy_ = BEST_PATH; + break; + case interface::RtcTransportRecoveryStrategies::DELAY_AND_REPLICATION: + init_ = true; + low_rate_app_ = false; + selected_strategy_ = REPLICATION; + current_strategy_ = REPLICATION; + break; + case interface::RtcTransportRecoveryStrategies::RECOVERY_OFF: + case interface::RtcTransportRecoveryStrategies::RTX_ONLY: + case interface::RtcTransportRecoveryStrategies::FEC_ONLY: + case interface::RtcTransportRecoveryStrategies::DELAY_BASED: + case interface::RtcTransportRecoveryStrategies::LOW_RATE: + case interface::RtcTransportRecoveryStrategies::FEC_ONLY_LOW_RES_LOSSES: + default: + // fwd strategies are not used + init_ = false; + } + + if (init_) { + rounds_since_last_set_ = 0; + prefix_ = prefix; + portal_ = portal; + state_ = state; + } } void RTCForwardingStrategy::checkStrategy() { @@ -99,16 +144,35 @@ void RTCForwardingStrategy::checkStrategyBestPath() { return; } - uint8_t qs = state_->getQualityScore(); + if (low_rate_app_) { + // this is used for gaming + uint8_t qs = state_->getQualityScore(); - if (qs >= 4 || rounds_since_last_set_ < 25) { // wait a least 5 sec - // between each switch - rounds_since_last_set_++; - return; - } + if (qs >= 4 || rounds_since_last_set_ < 25) { // wait a least 5 sec + // between each switch + rounds_since_last_set_++; + return; + } - // try to switch path - setStrategy(BEST_PATH); + // try to switch path + setStrategy(BEST_PATH); + } else { + if (rounds_since_last_set_ < 25) { // wait a least 5 sec + // between each switch + rounds_since_last_set_++; + return; + } + + double queue = state_->getQueuing(); + double rtt = state_->getAvgRTT(); + double loss_rate = state_->getPerSecondLossRate(); + + if (queue >= FWD_MAX_QUEUE || rtt >= FWD_MAX_RTT || + loss_rate > FWD_MAX_LOSS_RATE) { + // try to switch path + setStrategy(BEST_PATH); + } + } } void RTCForwardingStrategy::checkStrategyReplication() { @@ -133,7 +197,7 @@ void RTCForwardingStrategy::checkStrategyBoth() { // TODO // for the moment we use only best path. - // but later: + // for later: // 1. if both paths are bad use replication // 2. while using replication compute the effectiveness. if the majority of // the packets are coming from a single path, try to use bestpath diff --git a/libtransport/src/protocols/rtc/rtc_forwarding_strategy.h b/libtransport/src/protocols/rtc/rtc_forwarding_strategy.h index 9825877fd..c2227e09f 100644 --- a/libtransport/src/protocols/rtc/rtc_forwarding_strategy.h +++ b/libtransport/src/protocols/rtc/rtc_forwarding_strategy.h @@ -41,7 +41,7 @@ class RTCForwardingStrategy { void initFwdStrategy(std::shared_ptr<core::Portal> portal, core::Prefix& prefix, RTCState* state, - strategy_t strategy); + interface::RtcTransportRecoveryStrategies strategy); void checkStrategy(); void setCallback(interface::StrategyCallback&& callback); @@ -56,6 +56,10 @@ class RTCForwardingStrategy { std::array<std::string, 4> string_strategies_ = {"bestpath", "replication", "both", "none"}; + bool low_rate_app_; // if set to true the best path strategy will + // trigger a path switch based on the quality + // score, otherwise it will use the RTT, + // queuing delay and loss rate bool init_; // true if all val are initializes bool forwarder_set_; // true if the strategy is been set at least // once diff --git a/libtransport/src/protocols/rtc/rtc_ldr.cc b/libtransport/src/protocols/rtc/rtc_ldr.cc index abf6cda2c..6e88a8636 100644 --- a/libtransport/src/protocols/rtc/rtc_ldr.cc +++ b/libtransport/src/protocols/rtc/rtc_ldr.cc @@ -37,16 +37,24 @@ RTCLossDetectionAndRecovery::RTCLossDetectionAndRecovery( interface::RtcTransportRecoveryStrategies type, RecoveryStrategy::SendRtxCallback &&callback, interface::StrategyCallback &&external_callback) { - rs_type_ = type; if (type == interface::RtcTransportRecoveryStrategies::RECOVERY_OFF) { rs_ = std::make_shared<RecoveryStrategyRecoveryOff>( - indexer, std::move(callback), io_service, std::move(external_callback)); - } else if (type == interface::RtcTransportRecoveryStrategies::DELAY_BASED) { + indexer, std::move(callback), io_service, type, + std::move(external_callback)); + } else if (type == interface::RtcTransportRecoveryStrategies::DELAY_BASED || + type == interface::RtcTransportRecoveryStrategies:: + DELAY_AND_BESTPATH || + type == interface::RtcTransportRecoveryStrategies:: + DELAY_AND_REPLICATION) { rs_ = std::make_shared<RecoveryStrategyDelayBased>( - indexer, std::move(callback), io_service, std::move(external_callback)); - } else if (type == interface::RtcTransportRecoveryStrategies::FEC_ONLY) { + indexer, std::move(callback), io_service, type, + std::move(external_callback)); + } else if (type == interface::RtcTransportRecoveryStrategies::FEC_ONLY || + type == interface::RtcTransportRecoveryStrategies:: + FEC_ONLY_LOW_RES_LOSSES) { rs_ = std::make_shared<RecoveryStrategyFecOnly>( - indexer, std::move(callback), io_service, std::move(external_callback)); + indexer, std::move(callback), io_service, type, + std::move(external_callback)); } else if (type == interface::RtcTransportRecoveryStrategies::LOW_RATE || type == interface::RtcTransportRecoveryStrategies:: LOW_RATE_AND_BESTPATH || @@ -55,12 +63,14 @@ RTCLossDetectionAndRecovery::RTCLossDetectionAndRecovery( type == interface::RtcTransportRecoveryStrategies:: LOW_RATE_AND_ALL_FWD_STRATEGIES) { rs_ = std::make_shared<RecoveryStrategyLowRate>( - indexer, std::move(callback), io_service, std::move(external_callback)); + indexer, std::move(callback), io_service, type, + std::move(external_callback)); } else { // default - rs_type_ = interface::RtcTransportRecoveryStrategies::RTX_ONLY; + type = interface::RtcTransportRecoveryStrategies::RTX_ONLY; rs_ = std::make_shared<RecoveryStrategyRtxOnly>( - indexer, std::move(callback), io_service, std::move(external_callback)); + indexer, std::move(callback), io_service, type, + std::move(external_callback)); } } @@ -68,15 +78,21 @@ RTCLossDetectionAndRecovery::~RTCLossDetectionAndRecovery() {} void RTCLossDetectionAndRecovery::changeRecoveryStrategy( interface::RtcTransportRecoveryStrategies type) { - if (type == rs_type_) return; + if (type == rs_->getType()) return; - rs_type_ = type; + rs_->updateType(type); if (type == interface::RtcTransportRecoveryStrategies::RECOVERY_OFF) { rs_ = std::make_shared<RecoveryStrategyRecoveryOff>(std::move(*(rs_.get()))); - } else if (type == interface::RtcTransportRecoveryStrategies::DELAY_BASED) { + } else if (type == interface::RtcTransportRecoveryStrategies::DELAY_BASED || + type == interface::RtcTransportRecoveryStrategies:: + DELAY_AND_BESTPATH || + type == interface::RtcTransportRecoveryStrategies:: + DELAY_AND_REPLICATION) { rs_ = std::make_shared<RecoveryStrategyDelayBased>(std::move(*(rs_.get()))); - } else if (type == interface::RtcTransportRecoveryStrategies::FEC_ONLY) { + } else if (type == interface::RtcTransportRecoveryStrategies::FEC_ONLY || + type == interface::RtcTransportRecoveryStrategies:: + FEC_ONLY_LOW_RES_LOSSES) { rs_ = std::make_shared<RecoveryStrategyFecOnly>(std::move(*(rs_.get()))); } else if (type == interface::RtcTransportRecoveryStrategies::LOW_RATE || type == interface::RtcTransportRecoveryStrategies:: @@ -116,14 +132,15 @@ bool RTCLossDetectionAndRecovery::onDataPacketReceived( uint32_t seq = content_object.getName().getSuffix(); bool is_rtx = rs_->isRtx(seq); rs_->receivedPacket(seq); + bool ret = false; DLOG_IF(INFO, VLOG_IS_ON(3)) << "received data. add from " - << rs_->getState()->getHighestSeqReceivedInOrder() + 1 << " to " << seq; + << rs_->getState()->getHighestSeqReceived() + 1 << " to " << seq; if (!is_rtx) - return detectLoss(rs_->getState()->getHighestSeqReceivedInOrder() + 1, seq, - false); + ret = detectLoss(rs_->getState()->getHighestSeqReceived() + 1, seq, false); - return false; + rs_->getState()->updateHighestSeqReceived(seq); + return ret; } bool RTCLossDetectionAndRecovery::onNackPacketReceived( @@ -141,10 +158,9 @@ bool RTCLossDetectionAndRecovery::onNackPacketReceived( // may got lost and we should ask them rs_->receivedPacket(seq); - DLOG_IF(INFO, VLOG_IS_ON(3)) - << "received nack. add from " - << rs_->getState()->getHighestSeqReceivedInOrder() + 1 << " to " - << production_seq; + DLOG_IF(INFO, VLOG_IS_ON(3)) << "received nack. add from " + << rs_->getState()->getHighestSeqReceived() + 1 + << " to " << production_seq; // if it is a future nack store it in the list set of nacked seq if (production_seq <= seq) rs_->receivedFutureNack(seq); @@ -152,7 +168,7 @@ bool RTCLossDetectionAndRecovery::onNackPacketReceived( // call the detectLoss function using the probe flag = true. in fact the // losses detected using nacks are the same as the one detected using probes, // we should not increase the loss counter - return detectLoss(rs_->getState()->getHighestSeqReceivedInOrder() + 1, + return detectLoss(rs_->getState()->getHighestSeqReceived() + 1, production_seq, true); } @@ -164,12 +180,11 @@ bool RTCLossDetectionAndRecovery::onProbePacketReceived( uint32_t production_seq = RTCState::getProbeParams(probe).prod_seg; - DLOG_IF(INFO, VLOG_IS_ON(3)) - << "received probe. add from " - << rs_->getState()->getHighestSeqReceivedInOrder() + 1 << " to " - << production_seq; + DLOG_IF(INFO, VLOG_IS_ON(3)) << "received probe. add from " + << rs_->getState()->getHighestSeqReceived() + 1 + << " to " << production_seq; - return detectLoss(rs_->getState()->getHighestSeqReceivedInOrder() + 1, + return detectLoss(rs_->getState()->getHighestSeqReceived() + 1, production_seq, true); } @@ -183,8 +198,8 @@ bool RTCLossDetectionAndRecovery::detectLoss(uint32_t start, uint32_t stop, } // skip received or lost packets - if (start <= rs_->getState()->getHighestSeqReceivedInOrder()) { - start = rs_->getState()->getHighestSeqReceivedInOrder() + 1; + if (start <= rs_->getState()->getHighestSeqReceived()) { + start = rs_->getState()->getHighestSeqReceived() + 1; } bool loss_detected = false; diff --git a/libtransport/src/protocols/rtc/rtc_ldr.h b/libtransport/src/protocols/rtc/rtc_ldr.h index 7f683eaa6..24f22ffed 100644 --- a/libtransport/src/protocols/rtc/rtc_ldr.h +++ b/libtransport/src/protocols/rtc/rtc_ldr.h @@ -47,6 +47,7 @@ class RTCLossDetectionAndRecovery void setFecParams(uint32_t n, uint32_t k) { rs_->setFecParams(n, k); } + void setContentSharingMode() { rs_->setContentSharingMode(); } void turnOnRecovery() { rs_->turnOnRecovery(); } bool isRtxOn() { return rs_->isRtxOn(); } @@ -68,11 +69,12 @@ class RTCLossDetectionAndRecovery return rs_->isPossibleLossWithNoRtx(seq); } + uint64_t getRtxRtt(uint32_t seq) { return rs_->getRtxRtt(seq); } + private: // returns true if a loss is detected, false otherwise bool detectLoss(uint32_t start, uint32_t stop, bool recv_probe); - interface::RtcTransportRecoveryStrategies rs_type_; std::shared_ptr<RecoveryStrategy> rs_; }; diff --git a/libtransport/src/protocols/rtc/rtc_packet.h b/libtransport/src/protocols/rtc/rtc_packet.h index 391aedfc6..ffbbd78fd 100644 --- a/libtransport/src/protocols/rtc/rtc_packet.h +++ b/libtransport/src/protocols/rtc/rtc_packet.h @@ -52,6 +52,8 @@ #include <hicn/transport/portability/win_portability.h> #endif +#include <hicn/transport/portability/endianess.h> + #include <cstring> namespace transport { @@ -60,24 +62,6 @@ namespace protocol { namespace rtc { -inline uint64_t _ntohll(const uint64_t *input) { - uint64_t return_val; - uint8_t *tmp = (uint8_t *)&return_val; - - tmp[0] = (uint8_t)(*input >> 56); - tmp[1] = (uint8_t)(*input >> 48); - tmp[2] = (uint8_t)(*input >> 40); - tmp[3] = (uint8_t)(*input >> 32); - tmp[4] = (uint8_t)(*input >> 24); - tmp[5] = (uint8_t)(*input >> 16); - tmp[6] = (uint8_t)(*input >> 8); - tmp[7] = (uint8_t)(*input >> 0); - - return return_val; -} - -inline uint64_t _htonll(const uint64_t *input) { return (_ntohll(input)); } - const uint32_t DATA_HEADER_SIZE = 12; // bytes // XXX: sizeof(data_packet_t) is 16 // beacuse of padding @@ -87,11 +71,19 @@ struct data_packet_t { uint64_t timestamp; uint32_t prod_rate; - inline uint64_t getTimestamp() const { return _ntohll(×tamp); } - inline void setTimestamp(uint64_t time) { timestamp = _htonll(&time); } + inline uint64_t getTimestamp() const { + return portability::net_to_host(timestamp); + } + inline void setTimestamp(uint64_t time) { + timestamp = portability::host_to_net(time); + } - inline uint32_t getProductionRate() const { return ntohl(prod_rate); } - inline void setProductionRate(uint32_t rate) { prod_rate = htonl(rate); } + inline uint32_t getProductionRate() const { + return portability::net_to_host(prod_rate); + } + inline void setProductionRate(uint32_t rate) { + prod_rate = portability::host_to_net(rate); + } }; struct nack_packet_t { @@ -99,14 +91,26 @@ struct nack_packet_t { uint32_t prod_rate; uint32_t prod_seg; - inline uint64_t getTimestamp() const { return _ntohll(×tamp); } - inline void setTimestamp(uint64_t time) { timestamp = _htonll(&time); } + inline uint64_t getTimestamp() const { + return portability::net_to_host(timestamp); + } + inline void setTimestamp(uint64_t time) { + timestamp = portability::host_to_net(time); + } - inline uint32_t getProductionRate() const { return ntohl(prod_rate); } - inline void setProductionRate(uint32_t rate) { prod_rate = htonl(rate); } + inline uint32_t getProductionRate() const { + return portability::net_to_host(prod_rate); + } + inline void setProductionRate(uint32_t rate) { + prod_rate = portability::host_to_net(rate); + } - inline uint32_t getProductionSegment() const { return ntohl(prod_seg); } - inline void setProductionSegment(uint32_t seg) { prod_seg = htonl(seg); } + inline uint32_t getProductionSegment() const { + return portability::net_to_host(prod_seg); + } + inline void setProductionSegment(uint32_t seg) { + prod_seg = portability::host_to_net(seg); + } }; class AggrPktHeader { @@ -225,7 +229,7 @@ class AggrPktHeader { return (uint16_t) * (buf_ + pkt_index); } else { // 16 bits uint16_t *buf_16 = (uint16_t *)buf_; - return ntohs(*(buf_16 + pkt_index)); + return portability::net_to_host(*(buf_16 + pkt_index)); } } @@ -235,7 +239,7 @@ class AggrPktHeader { *(buf_ + pkt_index) = (uint8_t)len; } else { // 16 bits uint16_t *buf_16 = (uint16_t *)buf_; - *(buf_16 + pkt_index) = htons(len); + *(buf_16 + pkt_index) = portability::host_to_net(len); } } diff --git a/libtransport/src/protocols/rtc/rtc_reassembly.cc b/libtransport/src/protocols/rtc/rtc_reassembly.cc index 992bab50e..b1b0fcaba 100644 --- a/libtransport/src/protocols/rtc/rtc_reassembly.cc +++ b/libtransport/src/protocols/rtc/rtc_reassembly.cc @@ -40,7 +40,7 @@ void RtcReassembly::reassemble(core::ContentObject& content_object) { auto read_buffer = content_object.getPayload(); DLOG_IF(INFO, VLOG_IS_ON(3)) << "Size of payload: " << read_buffer->length(); - read_buffer->trimStart(transport_protocol_->transportHeaderLength()); + read_buffer->trimStart(transport_protocol_->transportHeaderLength(false)); if (data_aggregation_) { rtc::AggrPktHeader hdr((uint8_t*)read_buffer->data()); diff --git a/libtransport/src/protocols/rtc/rtc_recovery_strategy.cc b/libtransport/src/protocols/rtc/rtc_recovery_strategy.cc index 66ae5086c..257fdd09b 100644 --- a/libtransport/src/protocols/rtc/rtc_recovery_strategy.cc +++ b/libtransport/src/protocols/rtc/rtc_recovery_strategy.cc @@ -29,8 +29,12 @@ using namespace transport::interface; RecoveryStrategy::RecoveryStrategy( Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service, - bool use_rtx, bool use_fec, interface::StrategyCallback &&external_callback) - : recovery_on_(false), + bool use_rtx, bool use_fec, + interface::RtcTransportRecoveryStrategies rs_type, + interface::StrategyCallback &&external_callback) + : rs_type_(rs_type), + recovery_on_(false), + content_sharing_mode_(false), rtx_during_fec_(0), next_rtx_timer_(MAX_TIMER_RTX), send_rtx_callback_(std::move(callback)), @@ -43,7 +47,9 @@ RecoveryStrategy::RecoveryStrategy( } RecoveryStrategy::RecoveryStrategy(RecoveryStrategy &&rs) - : rtx_during_fec_(0), + : rs_type_(rs.rs_type_), + content_sharing_mode_(rs.content_sharing_mode_), + rtx_during_fec_(0), rtx_state_(std::move(rs.rtx_state_)), rtx_timers_(std::move(rs.rtx_timers_)), recover_with_fec_(std::move(rs.recover_with_fec_)), @@ -64,25 +70,52 @@ RecoveryStrategy::RecoveryStrategy(RecoveryStrategy &&rs) RecoveryStrategy::~RecoveryStrategy() {} void RecoveryStrategy::setFecParams(uint32_t n, uint32_t k) { + // if rs_type == FEC_ONLY_LOW_RES_LOSSES max k == 64 n_ = n; k_ = k; // XXX for the moment we go in steps of 5% loss rate. - // max loss rate = 95% + uint32_t i = 0; for (uint32_t loss_rate = 5; loss_rate < 100; loss_rate += 5) { - double dec_loss_rate = (double)(loss_rate + 5) / 100.0; - double exp_losses = (double)k_ * dec_loss_rate; - uint32_t fec_to_ask = ceil(exp_losses / (1 - dec_loss_rate)); - - fec_state_ f; - f.fec_to_ask = std::min(fec_to_ask, (n_ - k_)); - f.last_update = round_id_; - f.avg_residual_losses = 0.0; - f.consecutive_use = 0; - fec_per_loss_rate_.push_back(f); + uint32_t fec_to_ask = 0; + if (n_ != 0 && k_ != 0) { + if (rs_type_ == + interface::RtcTransportRecoveryStrategies::FEC_ONLY_LOW_RES_LOSSES) { + // the max loss rate in the matrix is 50% + uint32_t index = i; + if (i > 9) index = 9; + fec_to_ask = FEC_MATRIX[k_ - 1][index]; + } else { + double dec_loss_rate = (double)(loss_rate + 5); + if (dec_loss_rate == 100.0) dec_loss_rate = 95.0; + dec_loss_rate = dec_loss_rate / 100.0; + double exp_losses = ceil((double)k_ * dec_loss_rate); + fec_to_ask = ceil((exp_losses / (1 - dec_loss_rate)) * 1.25); + } + } + fec_to_ask = std::min(fec_to_ask, (n_ - k_)); + fec_per_loss_rate_.push_back(fec_to_ask); + + i++; } } +uint64_t RecoveryStrategy::getRtxRtt(uint32_t seq) { + auto it = rtx_state_.find(seq); + + if (it == rtx_state_.end()) return 0; + + // we can compute the RTT of an RTX only if it was send once. Infact if the + // RTX was sent twice or more the data may be alredy in flight and the RTT + // will be underestimated. This may happen also for packets that we + // retransmitted too soon. in that case the RTT will be filtered out by + // checking the path label + if (it->second.rtx_count_ != 1) return 0; + + // this a potentialy valid packet, compute the RTT + return (utils::SteadyTime::nowMs().count() - it->second.last_send_); +} + bool RecoveryStrategy::lossDetected(uint32_t seq) { if (isRtx(seq)) { // this packet is already in the list of rtx @@ -141,8 +174,10 @@ void RecoveryStrategy::addNewRtx(uint32_t seq, bool force) { state.first_send_ = state_->getInterestSentTime(seq); if (state.first_send_ == 0) // this interest was never sent before state.first_send_ = getNow(); - state.next_send_ = computeNextSend(seq, true); + state.last_send_ = state.first_send_; // we didn't send an RTX for this + // packet yet state.rtx_count_ = 0; + state.next_send_ = computeNextSend(seq, state.rtx_count_); DLOG_IF(INFO, VLOG_IS_ON(4)) << "Add " << seq << " to retransmissions. next rtx is in " << state.next_send_ - getNow() << " ms"; @@ -158,66 +193,50 @@ void RecoveryStrategy::addNewRtx(uint32_t seq, bool force) { } } -uint64_t RecoveryStrategy::computeNextSend(uint32_t seq, bool new_rtx) { +uint64_t RecoveryStrategy::computeNextSend(uint32_t seq, uint32_t rtx_counter) { uint64_t now = getNow(); - if (new_rtx) { - // for the new rtx we wait one estimated IAT after the loss detection. this - // is bacause, assuming that packets arrive with a constant IAT, we should - // get a new packet every IAT - double prod_rate = state_->getProducerRate(); - uint32_t estimated_iat = SENTINEL_TIMER_INTERVAL; - uint32_t jitter = 0; + if (rtx_counter == 0) { + uint32_t wait = 1; + if (content_sharing_mode_) return now + wait; - if (prod_rate != 0) { - double packet_size = state_->getAveragePacketSize(); - estimated_iat = ceil(1000.0 / (prod_rate / packet_size)); - jitter = ceil(state_->getJitter()); - } + uint32_t jitter = SENTINEL_TIMER_INTERVAL; + double prod_rate = state_->getProducerRate(); + if (prod_rate != 0) jitter = ceil(state_->getJitter()); - uint32_t wait = 1; - if (estimated_iat < 18) { - // for low rate app we do not wait to send a RTX - // we consider low rate stream with less than 50pps (iat >= 20ms) - // (e.g. audio in videoconf, mobile games). - // in the check we use 18ms to accomodate for measurements errors - // for flows with higher rate wait 1 ait + jitter - wait = estimated_iat + jitter; - } + wait += jitter; - DLOG_IF(INFO, VLOG_IS_ON(3)) - << "first rtx for " << seq << " in " << wait - << " ms, rtt = " << state_->getMinRTT() << " ait = " << estimated_iat - << " jttr = " << jitter; + DLOG_IF(INFO, VLOG_IS_ON(3)) << "first rtx for " << seq << " in " << wait + << " ms, jitter = " << jitter; return now + wait; } else { - // wait one RTT - uint32_t wait = SENTINEL_TIMER_INTERVAL; - + // wait one RTT. if an edge is known use the edge RTT for the first 5 rtx double prod_rate = state_->getProducerRate(); if (prod_rate == 0) { return now + SENTINEL_TIMER_INTERVAL; } - double packet_size = state_->getAveragePacketSize(); - uint32_t estimated_iat = ceil(1000.0 / (prod_rate / packet_size)); + uint64_t rtt = 0; + // if the transport detects an edge we try first to get the RTX from the + // edge. if no interest get a reply we move to the full RTT + if (rtx_counter < 5 && (state_->getEdgeRtt() != 0)) { + rtt = state_->getEdgeRtt(); + } else { + rtt = state_->getAvgRTT(); + } - uint64_t rtt = state_->getMinRTT(); if (rtt == 0) rtt = SENTINEL_TIMER_INTERVAL; - wait = rtt; + + if (content_sharing_mode_) return now + rtt; + + uint32_t wait = (uint32_t)rtt; uint32_t jitter = ceil(state_->getJitter()); wait += jitter; - // it may happen that the channel is congested and we have some additional - // queuing delay to take into account - uint32_t queue = ceil(state_->getQueuing()); - wait += queue; - DLOG_IF(INFO, VLOG_IS_ON(3)) - << "next rtx for " << seq << " in " << wait - << " ms, rtt = " << state_->getMinRTT() << " ait = " << estimated_iat - << " jttr = " << jitter << " queue = " << queue; + << "next rtx for " << seq << " in " << wait << " ms, rtt = " << rtt + << " jtter = " << jitter; return now + wait; } @@ -252,7 +271,9 @@ void RecoveryStrategy::retransmit() { state_->onRetransmission(seq); double prod_rate = state_->getProducerRate(); if (prod_rate != 0) rtx_it->second.rtx_count_++; - rtx_it->second.next_send_ = computeNextSend(seq, false); + rtx_it->second.last_send_ = now; + rtx_it->second.next_send_ = + computeNextSend(seq, rtx_it->second.rtx_count_); it = rtx_timers_.erase(it); rtx_timers_.insert( std::pair<uint64_t, uint32_t>(rtx_it->second.next_send_, seq)); @@ -327,6 +348,7 @@ void RecoveryStrategy::deleteRtx(uint32_t seq) { } it_timers++; } + // remove rtx rtx_state_.erase(it_rtx); } @@ -339,53 +361,13 @@ uint32_t RecoveryStrategy::computeFecPacketsToAsk() { if (loss_rate == 0) return 0; - // once per minute try to reduce the fec rate. it may happen that for some bin - // we ask too many fec packet. here we try to reduce this values gently - if (round_id_ % ROUNDS_PER_MIN == 0) { - reduceFec(); - } - // keep track of the last used fec. if we use a new bin on this round reset // consecutive use and avg loss in the prev bin uint32_t bin = ceil(loss_rate / 5.0) - 1; - if (bin > fec_per_loss_rate_.size() - 1) bin = fec_per_loss_rate_.size() - 1; + if (bin > fec_per_loss_rate_.size() - 1) + bin = (uint32_t)fec_per_loss_rate_.size() - 1; - if (bin != last_fec_used_) { - fec_per_loss_rate_[last_fec_used_].consecutive_use = 0; - fec_per_loss_rate_[last_fec_used_].avg_residual_losses = 0.0; - } - last_fec_used_ = bin; - fec_per_loss_rate_[last_fec_used_].consecutive_use++; - - // we update the stats only once very 5 rounds (1sec) that is the rate at - // which we compute residual losses - if (round_id_ % ROUNDS_PER_SEC == 0) { - double residual_losses = state_->getResidualLossRate() * 100; - // update residual loss rate - fec_per_loss_rate_[bin].avg_residual_losses = - (fec_per_loss_rate_[bin].avg_residual_losses * MOVING_AVG_ALPHA) + - (1 - MOVING_AVG_ALPHA) * residual_losses; - - if ((fec_per_loss_rate_[bin].last_update - round_id_) < - WAIT_BEFORE_FEC_UPDATE) { - // this bin is been updated recently so don't modify it and - // return the current state - return fec_per_loss_rate_[bin].fec_to_ask; - } - - // if the residual loss rate is too high and we can ask more fec packets and - // we are using this configuration since at least 5 sec update fec - if (fec_per_loss_rate_[bin].avg_residual_losses > MAX_RESIDUAL_LOSS_RATE && - fec_per_loss_rate_[bin].fec_to_ask < (n_ - k_) && - fec_per_loss_rate_[bin].consecutive_use > WAIT_BEFORE_FEC_UPDATE) { - // so increase the number of fec packets to ask - fec_per_loss_rate_[bin].fec_to_ask++; - fec_per_loss_rate_[bin].last_update = round_id_; - fec_per_loss_rate_[bin].avg_residual_losses = 0.0; - } - } - - return fec_per_loss_rate_[bin].fec_to_ask; + return fec_per_loss_rate_[bin]; } void RecoveryStrategy::setRtxFec(std::optional<bool> rtx_on, @@ -431,21 +413,6 @@ void RecoveryStrategy::removePacketState(uint32_t seq) { deleteRtx(seq); } -// private methods - -void RecoveryStrategy::reduceFec() { - for (uint32_t loss_rate = 5; loss_rate < 100; loss_rate += 5) { - double dec_loss_rate = (double)loss_rate / 100.0; - double exp_losses = (double)k_ * dec_loss_rate; - uint32_t fec_to_ask = ceil(exp_losses / (1 - dec_loss_rate)); - - uint32_t bin = ceil(loss_rate / 5.0) - 1; - if (fec_per_loss_rate_[bin].fec_to_ask > fec_to_ask) { - fec_per_loss_rate_[bin].fec_to_ask--; - } - } -} - } // end namespace rtc } // end namespace protocol diff --git a/libtransport/src/protocols/rtc/rtc_recovery_strategy.h b/libtransport/src/protocols/rtc/rtc_recovery_strategy.h index 482aedc9d..aceb85888 100644 --- a/libtransport/src/protocols/rtc/rtc_recovery_strategy.h +++ b/libtransport/src/protocols/rtc/rtc_recovery_strategy.h @@ -32,9 +32,10 @@ namespace rtc { class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> { protected: struct rtx_state_ { - uint64_t first_send_; - uint64_t next_send_; - uint32_t rtx_count_; + uint64_t first_send_; // first time this interest was sent + uint64_t last_send_; // last time this rtx was sent + uint64_t next_send_; // next retransmission time + uint32_t rtx_count_; // number or rtx }; using rtxState = struct rtx_state_; @@ -44,6 +45,7 @@ class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> { RecoveryStrategy(Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service, bool use_rtx, bool use_fec, + interface::RtcTransportRecoveryStrategies rs_type, interface::StrategyCallback &&external_callback); RecoveryStrategy(RecoveryStrategy &&rs); @@ -55,6 +57,7 @@ class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> { void setState(RTCState *state) { state_ = state; } void setRateControl(RTCRateControl *rateControl) { rc_ = rateControl; } void setFecParams(uint32_t n, uint32_t k); + void setContentSharingMode() { content_sharing_mode_ = true; } bool isRtx(uint32_t seq) { if (rtx_state_.find(seq) != rtx_state_.end()) return true; @@ -71,10 +74,20 @@ class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> { return false; } + interface::RtcTransportRecoveryStrategies getType() { + return rs_type_; + } + void updateType(interface::RtcTransportRecoveryStrategies type) { + rs_type_ = type; + } bool isRtxOn() { return rtx_on_; } bool isFecOn() { return fec_on_; } RTCState *getState() { return state_; } + + // if the function returns 0 it means that the packet is not an RTX or it is + // not a valid packet to safely compute the RTT + uint64_t getRtxRtt(uint32_t seq); bool lossDetected(uint32_t seq); void notifyNewLossDetedcted(uint32_t seq); void requestPossibleLostPacket(uint32_t seq); @@ -98,7 +111,7 @@ class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> { protected: // rtx functions void addNewRtx(uint32_t seq, bool force); - uint64_t computeNextSend(uint32_t seq, bool new_rtx); + uint64_t computeNextSend(uint32_t seq, uint32_t rtx_counter); void retransmit(); void scheduleNextRtx(); void deleteRtx(uint32_t seq); @@ -109,9 +122,11 @@ class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> { // common functons void removePacketState(uint32_t seq); + interface::RtcTransportRecoveryStrategies rs_type_; bool recovery_on_; bool rtx_on_; bool fec_on_; + bool content_sharing_mode_; // number of RTX sent after fec turned on // this is used to take into account jitter and out of order packets @@ -152,19 +167,9 @@ class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> { RTCRateControl *rc_; private: - struct fec_state_ { - uint32_t fec_to_ask; - uint32_t last_update; // round id of the last update - // (wait 10 ruonds (2sec) between updates) - uint32_t consecutive_use; // consecutive ruonds where this fec was used - double avg_residual_losses; - }; - - void reduceFec(); - uint32_t round_id_; // number of rounds uint32_t last_fec_used_; - std::vector<fec_state_> fec_per_loss_rate_; + std::vector<uint32_t> fec_per_loss_rate_; interface::StrategyCallback callback_; }; diff --git a/libtransport/src/protocols/rtc/rtc_rs_delay.cc b/libtransport/src/protocols/rtc/rtc_rs_delay.cc index 4be751ec9..7d7a01133 100644 --- a/libtransport/src/protocols/rtc/rtc_rs_delay.cc +++ b/libtransport/src/protocols/rtc/rtc_rs_delay.cc @@ -25,8 +25,10 @@ namespace rtc { RecoveryStrategyDelayBased::RecoveryStrategyDelayBased( Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service, + interface::RtcTransportRecoveryStrategies rs_type, interface::StrategyCallback &&external_callback) : RecoveryStrategy(indexer, std::move(callback), io_service, true, false, + rs_type, std::move(external_callback)), // start with rtx congestion_state_(false), probing_state_(false), @@ -48,7 +50,7 @@ void RecoveryStrategyDelayBased::turnOnRecovery() { recovery_on_ = true; uint64_t rtt = state_->getMinRTT(); uint32_t fec_to_ask = computeFecPacketsToAsk(); - if (rtt > 80 && fec_to_ask != 0) { + if (rtt > MAX_RTT_BEFORE_FEC && fec_to_ask > 0) { // we need to start FEC (see fec only strategy for more details) setRtxFec(true, true); rtx_during_fec_ = 1; // avoid to stop fec @@ -84,16 +86,16 @@ void RecoveryStrategyDelayBased::onNewRound(bool in_sync) { return; } - uint64_t rtt = state_->getMinRTT(); + uint64_t rtt = state_->getAvgRTT(); - bool congestion = false; // XXX at the moment we are not looking at congestion events - // congestion = rc_->inCongestionState(); + // bool congestion = rc_->inCongestionState(); - if ((!fec_on_ && rtt >= 100) || (fec_on_ && rtt > 80) || congestion) { + if ((!fec_on_ && rtt >= MAX_RTT_BEFORE_FEC) || + (fec_on_ && rtt > (MAX_RTT_BEFORE_FEC - 10))) { // switch from rtx to fec or keep use fec. Notice that if some rtx are // waiting to be scheduled, they will be sent normally, but no new rtx will - // be created If the loss rate is 0 keep to use RTX. + // be created if the loss rate is 0 keep to use RTX. uint32_t fec_to_ask = computeFecPacketsToAsk(); softSwitchToFec(fec_to_ask); if (rtx_during_fec_ == 0) // if we do not send any RTX the losses @@ -104,7 +106,8 @@ void RecoveryStrategyDelayBased::onNewRound(bool in_sync) { return; } - if ((fec_on_ && rtt <= 80) || (!rtx_on_ && rtt <= 100)) { + if ((fec_on_ && rtt <= (MAX_RTT_BEFORE_FEC - 10)) || + (!rtx_on_ && rtt <= MAX_RTT_BEFORE_FEC)) { // turn on rtx softSwitchToFec(0); indexer_->setNFec(0); diff --git a/libtransport/src/protocols/rtc/rtc_rs_delay.h b/libtransport/src/protocols/rtc/rtc_rs_delay.h index 5ca90f4cb..9e1c41388 100644 --- a/libtransport/src/protocols/rtc/rtc_rs_delay.h +++ b/libtransport/src/protocols/rtc/rtc_rs_delay.h @@ -26,6 +26,7 @@ class RecoveryStrategyDelayBased : public RecoveryStrategy { public: RecoveryStrategyDelayBased(Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service, + interface::RtcTransportRecoveryStrategies rs_type, interface::StrategyCallback &&external_callback); RecoveryStrategyDelayBased(RecoveryStrategy &&rs); diff --git a/libtransport/src/protocols/rtc/rtc_rs_fec_only.cc b/libtransport/src/protocols/rtc/rtc_rs_fec_only.cc index c44212bda..5b10823ec 100644 --- a/libtransport/src/protocols/rtc/rtc_rs_fec_only.cc +++ b/libtransport/src/protocols/rtc/rtc_rs_fec_only.cc @@ -25,9 +25,10 @@ namespace rtc { RecoveryStrategyFecOnly::RecoveryStrategyFecOnly( Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service, + interface::RtcTransportRecoveryStrategies rs_type, interface::StrategyCallback &&external_callback) : RecoveryStrategy(indexer, std::move(callback), io_service, true, false, - std::move(external_callback)), + rs_type, std::move(external_callback)), congestion_state_(false), probing_state_(false), switch_rounds_(0) {} diff --git a/libtransport/src/protocols/rtc/rtc_rs_fec_only.h b/libtransport/src/protocols/rtc/rtc_rs_fec_only.h index 1ab78b842..42df25bd9 100644 --- a/libtransport/src/protocols/rtc/rtc_rs_fec_only.h +++ b/libtransport/src/protocols/rtc/rtc_rs_fec_only.h @@ -26,6 +26,7 @@ class RecoveryStrategyFecOnly : public RecoveryStrategy { public: RecoveryStrategyFecOnly(Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service, + interface::RtcTransportRecoveryStrategies rs_type, interface::StrategyCallback &&external_callback); RecoveryStrategyFecOnly(RecoveryStrategy &&rs); diff --git a/libtransport/src/protocols/rtc/rtc_rs_low_rate.cc b/libtransport/src/protocols/rtc/rtc_rs_low_rate.cc index 48dd3e34f..dbad563cd 100644 --- a/libtransport/src/protocols/rtc/rtc_rs_low_rate.cc +++ b/libtransport/src/protocols/rtc/rtc_rs_low_rate.cc @@ -25,8 +25,10 @@ namespace rtc { RecoveryStrategyLowRate::RecoveryStrategyLowRate( Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service, + interface::RtcTransportRecoveryStrategies rs_type, interface::StrategyCallback &&external_callback) : RecoveryStrategy(indexer, std::move(callback), io_service, false, true, + rs_type, std::move(external_callback)), // start with fec fec_consecutive_rounds_((MILLI_IN_A_SEC / ROUND_LEN) * 5), // 5 sec rtx_allowed_consecutive_rounds_(0) { @@ -75,7 +77,7 @@ void RecoveryStrategyLowRate::selectRecoveryStrategy(bool in_sync) { } uint32_t loss_rate = std::round(state_->getPerSecondLossRate() * 100); - uint32_t rtt = state_->getAvgRTT(); + uint32_t rtt = (uint32_t)state_->getAvgRTT(); bool use_rtx = false; for (size_t i = 0; i < switch_vector.size(); i++) { diff --git a/libtransport/src/protocols/rtc/rtc_rs_low_rate.h b/libtransport/src/protocols/rtc/rtc_rs_low_rate.h index d66b197e2..0e76efaca 100644 --- a/libtransport/src/protocols/rtc/rtc_rs_low_rate.h +++ b/libtransport/src/protocols/rtc/rtc_rs_low_rate.h @@ -34,6 +34,7 @@ class RecoveryStrategyLowRate : public RecoveryStrategy { public: RecoveryStrategyLowRate(Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service, + interface::RtcTransportRecoveryStrategies rs_type, interface::StrategyCallback &&external_callback); RecoveryStrategyLowRate(RecoveryStrategy &&rs); diff --git a/libtransport/src/protocols/rtc/rtc_rs_recovery_off.cc b/libtransport/src/protocols/rtc/rtc_rs_recovery_off.cc index 16b14eff6..00c6a0504 100644 --- a/libtransport/src/protocols/rtc/rtc_rs_recovery_off.cc +++ b/libtransport/src/protocols/rtc/rtc_rs_recovery_off.cc @@ -25,9 +25,10 @@ namespace rtc { RecoveryStrategyRecoveryOff::RecoveryStrategyRecoveryOff( Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service, + interface::RtcTransportRecoveryStrategies rs_type, interface::StrategyCallback &&external_callback) : RecoveryStrategy(indexer, std::move(callback), io_service, false, false, - std::move(external_callback)) {} + rs_type, std::move(external_callback)) {} RecoveryStrategyRecoveryOff::RecoveryStrategyRecoveryOff(RecoveryStrategy &&rs) : RecoveryStrategy(std::move(rs)) { diff --git a/libtransport/src/protocols/rtc/rtc_rs_recovery_off.h b/libtransport/src/protocols/rtc/rtc_rs_recovery_off.h index 3a9e71e7d..3d59cc473 100644 --- a/libtransport/src/protocols/rtc/rtc_rs_recovery_off.h +++ b/libtransport/src/protocols/rtc/rtc_rs_recovery_off.h @@ -26,6 +26,7 @@ class RecoveryStrategyRecoveryOff : public RecoveryStrategy { public: RecoveryStrategyRecoveryOff(Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service, + interface::RtcTransportRecoveryStrategies rs_type, interface::StrategyCallback &&external_callback); RecoveryStrategyRecoveryOff(RecoveryStrategy &&rs); diff --git a/libtransport/src/protocols/rtc/rtc_rs_rtx_only.cc b/libtransport/src/protocols/rtc/rtc_rs_rtx_only.cc index 8e5db5439..4d7cf7a82 100644 --- a/libtransport/src/protocols/rtc/rtc_rs_rtx_only.cc +++ b/libtransport/src/protocols/rtc/rtc_rs_rtx_only.cc @@ -25,9 +25,10 @@ namespace rtc { RecoveryStrategyRtxOnly::RecoveryStrategyRtxOnly( Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service, + interface::RtcTransportRecoveryStrategies rs_type, interface::StrategyCallback &&external_callback) : RecoveryStrategy(indexer, std::move(callback), io_service, true, false, - std::move(external_callback)) {} + rs_type, std::move(external_callback)) {} RecoveryStrategyRtxOnly::RecoveryStrategyRtxOnly(RecoveryStrategy &&rs) : RecoveryStrategy(std::move(rs)) { diff --git a/libtransport/src/protocols/rtc/rtc_rs_rtx_only.h b/libtransport/src/protocols/rtc/rtc_rs_rtx_only.h index e90e5ba13..03dbed1c7 100644 --- a/libtransport/src/protocols/rtc/rtc_rs_rtx_only.h +++ b/libtransport/src/protocols/rtc/rtc_rs_rtx_only.h @@ -26,6 +26,7 @@ class RecoveryStrategyRtxOnly : public RecoveryStrategy { public: RecoveryStrategyRtxOnly(Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service, + interface::RtcTransportRecoveryStrategies rs_type, interface::StrategyCallback &&external_callback); RecoveryStrategyRtxOnly(RecoveryStrategy &&rs); diff --git a/libtransport/src/protocols/rtc/rtc_state.cc b/libtransport/src/protocols/rtc/rtc_state.cc index 5b3b5e4c3..82ac0b9c1 100644 --- a/libtransport/src/protocols/rtc/rtc_state.cc +++ b/libtransport/src/protocols/rtc/rtc_state.cc @@ -106,6 +106,7 @@ void RTCState::initParams() { // paths stats path_table_.clear(); main_path_ = nullptr; + edge_path_ = nullptr; // packet cache (not pending anymore) packet_cache_.clear(); @@ -231,11 +232,9 @@ void RTCState::onDataPacketReceived(const core::ContentObject &content_object, } updatePacketSize(content_object); - updateReceivedBytes(content_object); + updateReceivedBytes(content_object, false); addRecvOrLost(seq, PacketState::RECEIVED); - if (seq > highest_seq_received_) highest_seq_received_ = seq; - // the producer is responding // it is generating valid data packets so we consider it active producer_is_active_ = true; @@ -245,11 +244,7 @@ void RTCState::onDataPacketReceived(const core::ContentObject &content_object, void RTCState::onFecPacketReceived(const core::ContentObject &content_object) { uint32_t seq = content_object.getName().getSuffix(); - // updateReceivedBytes(content_object); - received_fec_bytes_ += - (uint32_t)(content_object.headerSize() + content_object.payloadSize()); - - if (seq > highest_seq_received_) highest_seq_received_ = seq; + updateReceivedBytes(content_object, true); PacketState state = getPacketState(seq); if (state != PacketState::LOST) { @@ -328,12 +323,14 @@ void RTCState::onPacketLost(uint32_t seq) { DLOG_IF(INFO, VLOG_IS_ON(4)) << "packet " << seq << " is lost"; } } + addRecvOrLost(seq, PacketState::DEFINITELY_LOST); } -void RTCState::onPacketRecoveredRtx(uint32_t seq) { +void RTCState::onPacketRecoveredRtx(const core::ContentObject &content_object, + uint64_t rtt) { + uint32_t seq = content_object.getName().getSuffix(); packets_sent_to_app_++; - if (seq > highest_seq_received_) highest_seq_received_ = seq; // increase the recovered packet counter only if the packet was marked as LOST // before. @@ -341,13 +338,37 @@ void RTCState::onPacketRecoveredRtx(uint32_t seq) { if (state == PacketState::LOST) losses_recovered_++; addRecvOrLost(seq, PacketState::RECEIVED); + updateReceivedBytes(content_object, false); + + if (rtt == 0) return; // nothing to do + + uint32_t path_label = content_object.getPathLabel(); + auto path_it = path_table_.find(path_label); + if (path_it == path_table_.end()) { + // this is a new path and it must be a cache + std::shared_ptr<RTCDataPath> newPath = + std::make_shared<RTCDataPath>(path_label); + auto ret = path_table_.insert( + std::pair<uint32_t, std::shared_ptr<RTCDataPath>>(path_label, newPath)); + path_it = ret.first; + } + + auto path = path_it->second; + if (path->pathToProducer()) + return; // this packet is coming from a producer + // even if we sent an RTX. this may happen + // for RTX that are sent too fast or in + // case of multipath + + path->insertRttSample(utils::SteadyTime::Milliseconds(rtt), true); } -void RTCState::onFecPacketRecoveredRtx(uint32_t seq) { +void RTCState::onFecPacketRecoveredRtx( + const core::ContentObject &content_object) { // This is the same as onPacketRecoveredRtx, but in this is case the // pkt is also a FEC pkt, the addRecvOrLost will be called afterwards - if (seq > highest_seq_received_) highest_seq_received_ = seq; losses_recovered_++; + updateReceivedBytes(content_object, true); } void RTCState::onPacketRecoveredFec(uint32_t seq, uint32_t size) { @@ -355,8 +376,6 @@ void RTCState::onPacketRecoveredFec(uint32_t seq, uint32_t size) { packets_sent_to_app_++; recovered_bytes_with_fec_ += size; - if (seq > highest_seq_received_) highest_seq_received_ = seq; - // adding header to the count recovered_bytes_with_fec_ += 60; // XXX get header size some where @@ -487,21 +506,32 @@ void RTCState::onNewRound(double round_len, bool in_sync) { // channel losses uint32_t last_round_packets = 0; + uint64_t min_edge_rtt = UINT_MAX; std::shared_ptr<RTCDataPath> old_main_path = main_path_; main_path_ = nullptr; + edge_path_ = nullptr; for (auto it = path_table_.begin(); it != path_table_.end(); it++) { - if (it->second->isActive()) { + if (it->second->isValidProducer()) { uint32_t pkt = it->second->getPacketsLastRound(); if (pkt > last_round_packets) { last_round_packets = pkt; main_path_ = it->second; } + } else if (it->second->isActive() && !it->second->pathToProducer()) { + // this is a path to a cache from where we are receiving content + if (it->second->getMinRtt() < min_edge_rtt) { + min_edge_rtt = it->second->getMinRtt(); + edge_path_ = it->second; + } } it->second->roundEnd(); } if (main_path_ == nullptr) main_path_ = old_main_path; + if (edge_path_ == nullptr) edge_path_ = main_path_; + if (edge_path_->getMinRtt() >= main_path_->getMinRtt()) + edge_path_ = main_path_; // in case we get a new main path we reset the stats of the old one. this is // beacuse, in case we need to switch back we don't what to take decisions on @@ -551,9 +581,15 @@ void RTCState::onNewRound(double round_len, bool in_sync) { rounds_++; } -void RTCState::updateReceivedBytes(const core::ContentObject &content_object) { - received_bytes_ += - (uint32_t)(content_object.headerSize() + content_object.payloadSize()); +void RTCState::updateReceivedBytes(const core::ContentObject &content_object, + bool isFec) { + if (isFec) { + received_fec_bytes_ += + (uint32_t)(content_object.headerSize() + content_object.payloadSize()); + } else { + received_bytes_ += + (uint32_t)(content_object.headerSize() + content_object.payloadSize()); + } } void RTCState::updatePacketSize(const core::ContentObject &content_object) { @@ -703,6 +739,10 @@ void RTCState::dataToBeReceived(uint32_t seq) { addToPacketCache(seq, PacketState::TO_BE_RECEIVED); } +void RTCState::updateHighestSeqReceived(uint32_t seq) { + if (seq > highest_seq_received_) highest_seq_received_ = seq; +} + void RTCState::addRecvOrLost(uint32_t seq, PacketState state) { auto it = pending_interests_.find(seq); if (it != pending_interests_.end()) { @@ -803,7 +843,7 @@ core::ParamsRTC RTCState::getProbeParams(const core::ContentObject &probe) { switch (ProbeHandler::getProbeType(seq)) { case ProbeType::INIT: { core::ContentObjectManifest manifest( - const_cast<core::ContentObject &>(probe)); + const_cast<core::ContentObject &>(probe).shared_from_this()); manifest.decode(); params = manifest.getParamsRTC(); break; @@ -841,7 +881,7 @@ core::ParamsRTC RTCState::getDataParams(const core::ContentObject &data) { } case core::PayloadType::MANIFEST: { core::ContentObjectManifest manifest( - const_cast<core::ContentObject &>(data)); + const_cast<core::ContentObject &>(data).shared_from_this()); manifest.decode(); params = manifest.getParamsRTC(); break; diff --git a/libtransport/src/protocols/rtc/rtc_state.h b/libtransport/src/protocols/rtc/rtc_state.h index 4bd2f76a0..ac3cc621f 100644 --- a/libtransport/src/protocols/rtc/rtc_state.h +++ b/libtransport/src/protocols/rtc/rtc_state.h @@ -84,8 +84,9 @@ class RTCState : public std::enable_shared_from_this<RTCState> { void onNackPacketReceived(const core::ContentObject &nack, bool compute_stats); void onPacketLost(uint32_t seq); - void onPacketRecoveredRtx(uint32_t seq); - void onFecPacketRecoveredRtx(uint32_t seq); + void onPacketRecoveredRtx(const core::ContentObject &content_object, + uint64_t rtt); + void onFecPacketRecoveredRtx(const core::ContentObject &content_object); void onPacketRecoveredFec(uint32_t seq, uint32_t size); bool onProbePacketReceived(const core::ContentObject &probe); void onJumpForward(uint32_t next_seq); @@ -117,6 +118,11 @@ class RTCState : public std::enable_shared_from_this<RTCState> { return 0; } + uint64_t getEdgeRtt() const { + if (edge_path_ != nullptr) return edge_path_->getMinRtt(); + return 0; + } + void resetRttStats() { if (mainPathIsValid()) main_path_->clearRtt(); } @@ -149,7 +155,7 @@ class RTCState : public std::enable_shared_from_this<RTCState> { } uint32_t getPendingInterestNumber() const { - return pending_interests_.size(); + return (uint32_t)pending_interests_.size(); } PacketState getPacketState(uint32_t seq) { @@ -242,6 +248,8 @@ class RTCState : public std::enable_shared_from_this<RTCState> { // set it as TO_BE_RECEIVED. void dataToBeReceived(uint32_t seq); + void updateHighestSeqReceived(uint32_t seq); + // Extract RTC parameters from probes (init or RTT probes) and data packets. static core::ParamsRTC getProbeParams(const core::ContentObject &probe); static core::ParamsRTC getDataParams(const core::ContentObject &data); @@ -259,7 +267,8 @@ class RTCState : public std::enable_shared_from_this<RTCState> { // update stats void updateState(); - void updateReceivedBytes(const core::ContentObject &content_object); + void updateReceivedBytes(const core::ContentObject &content_object, + bool isFec); void updatePacketSize(const core::ContentObject &content_object); void updatePathStats(const core::ContentObject &content_object, bool is_nack); void updateLossRate(bool in_sycn); @@ -360,7 +369,12 @@ class RTCState : public std::enable_shared_from_this<RTCState> { // paths stats std::unordered_map<uint32_t, std::shared_ptr<RTCDataPath>> path_table_; - std::shared_ptr<RTCDataPath> main_path_; + std::shared_ptr<RTCDataPath> main_path_; // this is the path that connects + // the consumer to the producer. in + // case of multipath the trasnport + // uses the most active path + std::shared_ptr<RTCDataPath> edge_path_; // path to the closest cache if it + // exists // packet received // cache where to store info about the last MAX_CACHED_PACKETS diff --git a/libtransport/src/protocols/rtc/rtc_verifier.cc b/libtransport/src/protocols/rtc/rtc_verifier.cc index 7b6330a1f..861ceee89 100644 --- a/libtransport/src/protocols/rtc/rtc_verifier.cc +++ b/libtransport/src/protocols/rtc/rtc_verifier.cc @@ -22,11 +22,11 @@ namespace protocol { namespace rtc { RTCVerifier::RTCVerifier(std::shared_ptr<auth::Verifier> verifier, - uint32_t max_unverified_interval, - double max_unverified_ratio) + uint32_t factor_relevant, uint32_t factor_alert) : verifier_(verifier), - max_unverified_interval_(max_unverified_interval), - max_unverified_ratio_(max_unverified_ratio) {} + factor_relevant_(factor_relevant), + factor_alert_(factor_alert), + manifest_max_capacity_(std::numeric_limits<uint8_t>::max()) {} void RTCVerifier::setState(std::shared_ptr<RTCState> rtc_state) { rtc_state_ = rtc_state; @@ -36,12 +36,16 @@ void RTCVerifier::setVerifier(std::shared_ptr<auth::Verifier> verifier) { verifier_ = verifier; } -void RTCVerifier::setMaxUnverifiedInterval(uint32_t max_unverified_interval) { - max_unverified_interval_ = max_unverified_interval; +void RTCVerifier::setFactorRelevant(uint32_t factor_relevant) { + factor_relevant_ = factor_relevant; } -void RTCVerifier::setMaxUnverifiedRatio(double max_unverified_ratio) { - max_unverified_ratio_ = max_unverified_ratio; +void RTCVerifier::setFactorAlert(uint32_t factor_alert) { + factor_alert_ = factor_alert; +} + +auth::VerificationPolicy RTCVerifier::verify(core::Interest &interest) { + return verifier_->verifyPackets(&interest); } auth::VerificationPolicy RTCVerifier::verify( @@ -108,19 +112,27 @@ auth::VerificationPolicy RTCVerifier::verifyData( auth::Suffix suffix = content_object.getName().getSuffix(); auth::VerificationPolicy policy = auth::VerificationPolicy::ABORT; - Timestamp now = utils::SteadyTime::nowMs().count(); - // Flush old packets - Timestamp oldest = flush_packets(now); + uint32_t threshold_relevant = factor_relevant_ * manifest_max_capacity_; + uint32_t threshold_alert = factor_alert_ * manifest_max_capacity_; - // Add packet to map of unverified packets - packets_unverif_.add( - {.suffix = suffix, .timestamp = now, .size = content_object.length()}, - content_object.computeDigest(manifest_hash_algo_)); + // Flush packets outside relevance window + for (auto it = packets_unverif_.set().begin(); + it != packets_unverif_.set().end();) { + if (it->first > current_index_ - threshold_relevant) { + break; + } + packets_unverif_erased_.insert((unsigned int)it->first); + it = packets_unverif_.remove(it); + } + + // Add packet to set of unverified packets + packets_unverif_.add({current_index_, suffix}, + content_object.computeDigest(manifest_hash_algo_)); + current_index_++; - // Check that the ratio of unverified packets stays below the limit - if (now - oldest < max_unverified_interval_ || - getBufferRatio() < max_unverified_ratio_) { + // Check that the number of unverified packets is below the alert threshold + if (packets_unverif_.set().size() <= threshold_alert) { policy = auth::VerificationPolicy::ACCEPT; } @@ -139,18 +151,13 @@ auth::VerificationPolicy RTCVerifier::processManifest( auth::VerificationPolicy accept_policy = auth::VerificationPolicy::ACCEPT; // Decode manifest - core::ContentObjectManifest manifest(content_object); + core::ContentObjectManifest manifest(content_object.shared_from_this()); manifest.decode(); - // Update last manifest - if (suffix > last_manifest_) { - last_manifest_ = suffix; - } - - // Extract hash algorithm and hashes + // Extract manifest data + manifest_max_capacity_ = manifest.getMaxCapacity(); manifest_hash_algo_ = manifest.getHashAlgorithm(); - auth::Verifier::SuffixMap suffix_map = - core::ContentObjectManifest::getSuffixMap(&manifest); + auth::Verifier::SuffixMap suffix_map = manifest.getSuffixMap(); // Return early if the manifest is empty if (suffix_map.empty()) { @@ -186,10 +193,7 @@ auth::VerificationPolicy RTCVerifier::processManifest( for (const auto &p : policies) { switch (p.second) { case auth::VerificationPolicy::ACCEPT: { - auto packet_unverif_it = packets_unverif_.packetIt(p.first); - Packet packet_verif = *packet_unverif_it; - packets_unverif_.remove(packet_unverif_it); - packets_verif_.add(packet_verif); + packets_unverif_.remove(packets_unverif_.packet(p.first)); manifest_digests_.erase(p.first); break; } @@ -209,69 +213,20 @@ void RTCVerifier::onDataRecoveredFec(uint32_t suffix) { manifest_digests_.erase(suffix); } -void RTCVerifier::onJumpForward(uint32_t next_suffix) { - if (next_suffix <= last_manifest_ + 1) { - return; - } - - // When we jump forward in the suffix sequence, we remove packets that won't - // be verified. Those packets have a suffix in the range [last_manifest_ + 1, - // next_suffix[. - for (auth::Suffix suffix = last_manifest_ + 1; suffix < next_suffix; - ++suffix) { - auto packet_it = packets_unverif_.packetIt(suffix); - if (packet_it != packets_unverif_.set().end()) { - packets_unverif_.remove(packet_it); - } - } -} - -double RTCVerifier::getBufferRatio() const { - size_t total = packets_verif_.size() + packets_unverif_.size(); - double total_unverified = static_cast<double>(packets_unverif_.size()); - return total ? total_unverified / total : 0.0; -} - -RTCVerifier::Timestamp RTCVerifier::flush_packets(Timestamp now) { - Timestamp oldest_verified = packets_verif_.set().empty() - ? now - : packets_verif_.set().begin()->timestamp; - Timestamp oldest_unverified = packets_unverif_.set().empty() - ? now - : packets_unverif_.set().begin()->timestamp; - - // Prune verified packets older than the unverified interval - for (auto it = packets_verif_.set().begin(); - it != packets_verif_.set().end();) { - if (now - it->timestamp < max_unverified_interval_) { - break; - } - it = packets_verif_.remove(it); - } - - // Prune unverified packets older than the unverified interval - for (auto it = packets_unverif_.set().begin(); - it != packets_unverif_.set().end();) { - if (now - it->timestamp < max_unverified_interval_) { - break; - } - packets_unverif_erased_.insert(it->suffix); - it = packets_unverif_.remove(it); - } - - return std::min(oldest_verified, oldest_unverified); -} - std::pair<RTCVerifier::PacketSet::iterator, bool> RTCVerifier::Packets::add( - const Packet &packet) { + const Packet &packet, const auth::CryptoHash &digest) { auto inserted = packets_.insert(packet); - size_ += inserted.second ? packet.size : 0; + if (inserted.second) { + packets_map_[packet.second] = inserted.first; + suffix_map_[packet.second] = digest; + } return inserted; } RTCVerifier::PacketSet::iterator RTCVerifier::Packets::remove( PacketSet::iterator packet_it) { - size_ -= packet_it->size; + packets_map_.erase(packet_it->second); + suffix_map_.erase(packet_it->second); return packets_.erase(packet_it); } @@ -279,35 +234,13 @@ const std::set<RTCVerifier::Packet> &RTCVerifier::Packets::set() const { return packets_; }; -size_t RTCVerifier::Packets::size() const { return size_; }; - -std::pair<RTCVerifier::PacketSet::iterator, bool> -RTCVerifier::PacketsUnverif::add(const Packet &packet, - const auth::CryptoHash &digest) { - auto inserted = add(packet); - if (inserted.second) { - packets_map_[packet.suffix] = inserted.first; - digests_map_[packet.suffix] = digest; - } - return inserted; -} - -RTCVerifier::PacketSet::iterator RTCVerifier::PacketsUnverif::remove( - PacketSet::iterator packet_it) { - size_ -= packet_it->size; - packets_map_.erase(packet_it->suffix); - digests_map_.erase(packet_it->suffix); - return packets_.erase(packet_it); -} - -RTCVerifier::PacketSet::iterator RTCVerifier::PacketsUnverif::packetIt( +RTCVerifier::PacketSet::iterator RTCVerifier::Packets::packet( auth::Suffix suffix) { return packets_map_.at(suffix); }; -const auth::Verifier::SuffixMap &RTCVerifier::PacketsUnverif::suffixMap() - const { - return digests_map_; +const auth::Verifier::SuffixMap &RTCVerifier::Packets::suffixMap() const { + return suffix_map_; } } // end namespace rtc diff --git a/libtransport/src/protocols/rtc/rtc_verifier.h b/libtransport/src/protocols/rtc/rtc_verifier.h index 098984057..c83faf08a 100644 --- a/libtransport/src/protocols/rtc/rtc_verifier.h +++ b/libtransport/src/protocols/rtc/rtc_verifier.h @@ -27,19 +27,16 @@ namespace rtc { class RTCVerifier { public: explicit RTCVerifier(std::shared_ptr<auth::Verifier> verifier, - uint32_t max_unverified_interval, - double max_unverified_ratio); + uint32_t factor_relevant, uint32_t factor_alert); virtual ~RTCVerifier() = default; void setState(std::shared_ptr<RTCState> rtc_state); - void setVerifier(std::shared_ptr<auth::Verifier> verifier); + void setFactorRelevant(uint32_t factor_relevant); + void setFactorAlert(uint32_t factor_alert); - void setMaxUnverifiedInterval(uint32_t max_unverified_interval); - - void setMaxUnverifiedRatio(double max_unverified_ratio); - + auth::VerificationPolicy verify(core::Interest &interest); auth::VerificationPolicy verify(core::ContentObject &content_object, bool is_fec = false); auth::VerificationPolicy verifyProbe(core::ContentObject &content_object); @@ -51,81 +48,47 @@ class RTCVerifier { auth::VerificationPolicy processManifest(core::ContentObject &content_object); void onDataRecoveredFec(uint32_t suffix); - void onJumpForward(uint32_t next_suffix); - - double getBufferRatio() const; protected: - struct Packet; - using Timestamp = uint64_t; + using Index = uint64_t; + using Packet = std::pair<Index, auth::Suffix>; using PacketSet = std::set<Packet>; - struct Packet { - auth::Suffix suffix; - Timestamp timestamp; - size_t size; - - bool operator==(const Packet &b) const { - return timestamp == b.timestamp && suffix == b.suffix; - } - bool operator<(const Packet &b) const { - return timestamp == b.timestamp ? suffix < b.suffix - : timestamp < b.timestamp; - } - }; - class Packets { public: - virtual std::pair<PacketSet::iterator, bool> add(const Packet &packet); - virtual PacketSet::iterator remove(PacketSet::iterator packet_it); - const PacketSet &set() const; - size_t size() const; - - protected: - PacketSet packets_; - size_t size_; - }; - - class PacketsVerif : public Packets {}; - - class PacketsUnverif : public Packets { - public: - using Packets::add; std::pair<PacketSet::iterator, bool> add(const Packet &packet, const auth::CryptoHash &digest); - PacketSet::iterator remove(PacketSet::iterator packet_it) override; - PacketSet::iterator packetIt(auth::Suffix suffix); + PacketSet::iterator remove(PacketSet::iterator packet_it); + const PacketSet &set() const; + PacketSet::iterator packet(auth::Suffix suffix); const auth::Verifier::SuffixMap &suffixMap() const; private: + PacketSet packets_; std::unordered_map<auth::Suffix, PacketSet::iterator> packets_map_; - auth::Verifier::SuffixMap digests_map_; + auth::Verifier::SuffixMap suffix_map_; }; // The RTC state. std::shared_ptr<RTCState> rtc_state_; // The verifier instance. std::shared_ptr<auth::Verifier> verifier_; - // Window to consider when verifying packets. - uint32_t max_unverified_interval_; - // Ratio of unverified packets over which an alert is triggered. - double max_unverified_ratio_; - // The suffix of the last processed manifest. - auth::Suffix last_manifest_; + // Used to compute the relevance windows size (in packets). + uint32_t factor_relevant_; + // Used to compute the alert threshold (in packets). + uint32_t factor_alert_; + // The maximum number of entries a manifest can contain. + uint8_t manifest_max_capacity_; // Hash algorithm used by manifests. auth::CryptoHashType manifest_hash_algo_; // Digests extracted from all manifests received. auth::Verifier::SuffixMap manifest_digests_; - // Verified packets with timestamp >= now - max_unverified_interval_. - PacketsVerif packets_verif_; - // Unverified packets with timestamp >= now - max_unverified_interval_. - PacketsUnverif packets_unverif_; - // Unverified erased packets with timestamp < now - max_unverified_interval_. + // The number of data packets processed. + Index current_index_; + // Unverified packets with index in relevance window. + Packets packets_unverif_; + // Unverified erased packets with index outside relevance window. std::unordered_set<auth::Suffix> packets_unverif_erased_; - - // Flushes all packets with timestamp < now - max_unverified_interval_. - // Returns the timestamp of the oldest packet, verified or not. - Timestamp flush_packets(Timestamp now); }; } // namespace rtc diff --git a/libtransport/src/protocols/transport_protocol.cc b/libtransport/src/protocols/transport_protocol.cc index a73b9fb7b..b1803709b 100644 --- a/libtransport/src/protocols/transport_protocol.cc +++ b/libtransport/src/protocols/transport_protocol.cc @@ -79,6 +79,7 @@ int TransportProtocol::start() { &on_payload_); socket_->getSocketOption(GeneralTransportOptions::ASYNC_MODE, is_async_); + socket_->getSocketOption(GeneralTransportOptions::SIGNER, signer_); // Set it is the first time we schedule an interest is_first_ = true; @@ -143,14 +144,22 @@ void TransportProtocol::sendInterest( Packet::Format format; socket_->getSocketOption(interface::GeneralTransportOptions::PACKET_FORMAT, format); + size_t signature_size = 0; - auto interest = - core::PacketManager<>::getInstance().getPacket<Interest>(format); + // If aggregated interest, add spapce for signature + if (len > 0) { + format = Packet::toAHFormat(format); + signature_size = signer_->getSignatureFieldSize(); + } + + auto interest = core::PacketManager<>::getInstance().getPacket<Interest>( + format, signature_size); interest->setName(interest_name); for (uint32_t i = 0; i < len; i++) { interest->appendSuffix(additional_suffixes->at(i)); } + interest->encodeSuffixes(); uint32_t lifetime = default_values::interest_lifetime; socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, @@ -165,7 +174,16 @@ void TransportProtocol::sendInterest( return; } - portal_->sendInterest(std::move(interest)); + bool content_sharing_mode; + socket_->getSocketOption(RtcTransportOptions::CONTENT_SHARING_MODE, + content_sharing_mode); + if (content_sharing_mode) lifetime = ceil((double)lifetime * 0.9); + + // Compute signature + bool is_ah = _is_ah(interest->getFormat()); + if (is_ah) signer_->signPacket(interest.get()); + + portal_->sendInterest(interest, lifetime); } void TransportProtocol::onError(const std::error_code &ec) { diff --git a/libtransport/src/protocols/transport_protocol.h b/libtransport/src/protocols/transport_protocol.h index ad8cf0346..e71992561 100644 --- a/libtransport/src/protocols/transport_protocol.h +++ b/libtransport/src/protocols/transport_protocol.h @@ -64,7 +64,7 @@ class TransportProtocol * * @return The header length in bytes. */ - virtual std::size_t transportHeaderLength() { return 0; } + virtual std::size_t transportHeaderLength(bool isFEC) { return 0; } virtual void scheduleNextInterests() = 0; @@ -141,6 +141,9 @@ class TransportProtocol bool is_async_; fec::FECType fec_type_; + + // Signer for aggregated interests + std::shared_ptr<auth::Signer> signer_; }; } // end namespace protocol |