aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/protocols
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/protocols')
-rw-r--r--libtransport/src/protocols/byte_stream_reassembly.cc9
-rw-r--r--libtransport/src/protocols/byte_stream_reassembly.h3
-rw-r--r--libtransport/src/protocols/cbr.cc2
-rw-r--r--libtransport/src/protocols/datagram_reassembly.cc5
-rw-r--r--libtransport/src/protocols/datagram_reassembly.h4
-rw-r--r--libtransport/src/protocols/fec/rely.cc2
-rw-r--r--libtransport/src/protocols/fec/rely.h25
-rw-r--r--libtransport/src/protocols/fec/rs.cc7
-rw-r--r--libtransport/src/protocols/fec/rs.h33
-rw-r--r--libtransport/src/protocols/fec_base.h4
-rw-r--r--libtransport/src/protocols/manifest_incremental_indexer_bytestream.cc31
-rw-r--r--libtransport/src/protocols/manifest_incremental_indexer_bytestream.h2
-rw-r--r--libtransport/src/protocols/prod_protocol_bytestream.cc72
-rw-r--r--libtransport/src/protocols/prod_protocol_rtc.cc426
-rw-r--r--libtransport/src/protocols/prod_protocol_rtc.h45
-rw-r--r--libtransport/src/protocols/production_protocol.cc4
-rw-r--r--libtransport/src/protocols/production_protocol.h3
-rw-r--r--libtransport/src/protocols/raaqm.cc8
-rw-r--r--libtransport/src/protocols/raaqm.h2
-rw-r--r--libtransport/src/protocols/raaqm_data_path.cc4
-rw-r--r--libtransport/src/protocols/raaqm_data_path.h2
-rw-r--r--libtransport/src/protocols/rate_estimation.cc10
-rw-r--r--libtransport/src/protocols/rate_estimation.h8
-rw-r--r--libtransport/src/protocols/reassembly.h6
-rw-r--r--libtransport/src/protocols/rtc/probe_handler.cc3
-rw-r--r--libtransport/src/protocols/rtc/rtc.cc125
-rw-r--r--libtransport/src/protocols/rtc/rtc.h3
-rw-r--r--libtransport/src/protocols/rtc/rtc_consts.h76
-rw-r--r--libtransport/src/protocols/rtc/rtc_data_path.cc13
-rw-r--r--libtransport/src/protocols/rtc/rtc_data_path.h5
-rw-r--r--libtransport/src/protocols/rtc/rtc_forwarding_strategy.cc106
-rw-r--r--libtransport/src/protocols/rtc/rtc_forwarding_strategy.h6
-rw-r--r--libtransport/src/protocols/rtc/rtc_ldr.cc73
-rw-r--r--libtransport/src/protocols/rtc/rtc_ldr.h4
-rw-r--r--libtransport/src/protocols/rtc/rtc_packet.h64
-rw-r--r--libtransport/src/protocols/rtc/rtc_reassembly.cc2
-rw-r--r--libtransport/src/protocols/rtc/rtc_recovery_strategy.cc197
-rw-r--r--libtransport/src/protocols/rtc/rtc_recovery_strategy.h35
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_delay.cc17
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_delay.h1
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_fec_only.cc3
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_fec_only.h1
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_low_rate.cc4
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_low_rate.h1
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_recovery_off.cc3
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_recovery_off.h1
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_rtx_only.cc3
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_rtx_only.h1
-rw-r--r--libtransport/src/protocols/rtc/rtc_state.cc80
-rw-r--r--libtransport/src/protocols/rtc/rtc_state.h24
-rw-r--r--libtransport/src/protocols/rtc/rtc_verifier.cc157
-rw-r--r--libtransport/src/protocols/rtc/rtc_verifier.h81
-rw-r--r--libtransport/src/protocols/transport_protocol.cc24
-rw-r--r--libtransport/src/protocols/transport_protocol.h5
54 files changed, 887 insertions, 948 deletions
diff --git a/libtransport/src/protocols/byte_stream_reassembly.cc b/libtransport/src/protocols/byte_stream_reassembly.cc
index 3278595b7..b9eaf3bec 100644
--- a/libtransport/src/protocols/byte_stream_reassembly.cc
+++ b/libtransport/src/protocols/byte_stream_reassembly.cc
@@ -36,15 +36,6 @@ ByteStreamReassembly::ByteStreamReassembly(
index_(Indexer::invalid_index),
download_complete_(false) {}
-void ByteStreamReassembly::reassemble(
- std::unique_ptr<ContentObjectManifest> &&manifest) {
- if (TRANSPORT_EXPECT_TRUE(manifest != nullptr) && read_buffer_->capacity()) {
- received_packets_.emplace(
- std::make_pair(manifest->getName().getSuffix(), nullptr));
- assembleContent();
- }
-}
-
void ByteStreamReassembly::reassemble(ContentObject &content_object) {
if (TRANSPORT_EXPECT_TRUE(read_buffer_->capacity())) {
received_packets_.emplace(
diff --git a/libtransport/src/protocols/byte_stream_reassembly.h b/libtransport/src/protocols/byte_stream_reassembly.h
index bfcac3181..a1f965d5c 100644
--- a/libtransport/src/protocols/byte_stream_reassembly.h
+++ b/libtransport/src/protocols/byte_stream_reassembly.h
@@ -29,9 +29,6 @@ class ByteStreamReassembly : public Reassembly {
protected:
void reassemble(core::ContentObject &content_object) override;
- void reassemble(
- std::unique_ptr<core::ContentObjectManifest> &&manifest) override;
-
void reassemble(utils::MemBuf &buffer, uint32_t suffix) override;
bool copyContent(core::ContentObject &content_object);
diff --git a/libtransport/src/protocols/cbr.cc b/libtransport/src/protocols/cbr.cc
index 446ea8b99..e3f0f1336 100644
--- a/libtransport/src/protocols/cbr.cc
+++ b/libtransport/src/protocols/cbr.cc
@@ -38,7 +38,7 @@ void CbrTransportProtocol::afterContentReception(
const Interest &interest, const ContentObject &content_object) {
auto segment = content_object.getName().getSuffix();
auto now = utils::SteadyTime::Clock::now();
- auto rtt = utils::SteadyTime::getDurationMs(
+ auto rtt = utils::SteadyTime::getDurationUs(
interest_timepoints_[segment & mask], now);
// Update stats
updateStats(segment, rtt, now);
diff --git a/libtransport/src/protocols/datagram_reassembly.cc b/libtransport/src/protocols/datagram_reassembly.cc
index 3a32c81f5..a04b0eecf 100644
--- a/libtransport/src/protocols/datagram_reassembly.cc
+++ b/libtransport/src/protocols/datagram_reassembly.cc
@@ -29,8 +29,9 @@ void DatagramReassembly::reassemble(core::ContentObject& content_object) {
auto read_buffer = content_object.getPayload();
DLOG_IF(INFO, VLOG_IS_ON(4))
<< "Size of payload: " << read_buffer->length() << ". Trimming "
- << transport_protocol_->transportHeaderLength();
- read_buffer->trimStart(transport_protocol_->transportHeaderLength());
+ << transport_protocol_->transportHeaderLength(false);
+ // here we have only src data packet
+ read_buffer->trimStart(transport_protocol_->transportHeaderLength(false));
Reassembly::read_buffer_ = std::move(read_buffer);
Reassembly::notifyApplication();
}
diff --git a/libtransport/src/protocols/datagram_reassembly.h b/libtransport/src/protocols/datagram_reassembly.h
index 0def32dd2..cefdca93b 100644
--- a/libtransport/src/protocols/datagram_reassembly.h
+++ b/libtransport/src/protocols/datagram_reassembly.h
@@ -29,10 +29,6 @@ class DatagramReassembly : public Reassembly {
virtual void reassemble(core::ContentObject &content_object) override;
void reassemble(utils::MemBuf &buffer, uint32_t suffix) override;
virtual void reInitialize() override;
- virtual void reassemble(
- std::unique_ptr<core::ContentObjectManifest> &&manifest) override {
- return;
- }
bool reassembleUnverified() override { return true; }
};
diff --git a/libtransport/src/protocols/fec/rely.cc b/libtransport/src/protocols/fec/rely.cc
index d4d98a90b..9e0a06dd8 100644
--- a/libtransport/src/protocols/fec/rely.cc
+++ b/libtransport/src/protocols/fec/rely.cc
@@ -79,7 +79,7 @@ void RelyEncoder::onPacketProduced(core::ContentObject &content_object,
// Check new payload size and make sure it fits in packet buffer
auto new_payload_size = produce_bytes();
- int difference = new_payload_size - length;
+ int difference = (int)(new_payload_size - length);
DCHECK(difference > 0);
DCHECK(content_object.ensureCapacity(difference));
diff --git a/libtransport/src/protocols/fec/rely.h b/libtransport/src/protocols/fec/rely.h
index 001a26002..cc81222b2 100644
--- a/libtransport/src/protocols/fec/rely.h
+++ b/libtransport/src/protocols/fec/rely.h
@@ -15,6 +15,7 @@
#pragma once
+#include <hicn/transport/portability/endianess.h>
#include <hicn/transport/utils/chrono_typedefs.h>
#include <hicn/transport/utils/membuf.h>
#include <protocols/fec/fec_info.h>
@@ -80,11 +81,19 @@ class RelyBase : public virtual FECBase {
*/
class fec_metadata {
public:
- void setSeqNumberBase(uint32_t suffix) { seq_number = htonl(suffix); }
- uint32_t getSeqNumberBase() const { return ntohl(seq_number); }
-
- void setMetadataBase(uint32_t value) { metadata = htonl(value); }
- uint32_t getMetadataBase() const { return ntohl(metadata); }
+ void setSeqNumberBase(uint32_t suffix) {
+ seq_number = portability::host_to_net(suffix);
+ }
+ uint32_t getSeqNumberBase() const {
+ return portability::net_to_host(seq_number);
+ }
+
+ void setMetadataBase(uint32_t value) {
+ metadata = portability::host_to_net(value);
+ }
+ uint32_t getMetadataBase() const {
+ return portability::net_to_host(metadata);
+ }
private:
uint32_t seq_number;
@@ -162,8 +171,9 @@ class RelyEncoder : RelyBase, rely::encoder, public ProducerFEC {
/**
* @brief Get the fec header size, if added to source packets
+ * there is not need to distinguish between source and FEC packets here
*/
- std::size_t getFecHeaderSize() override {
+ std::size_t getFecHeaderSize(bool isFEC) override {
return header_bytes() + sizeof(fec_metadata) + 4;
}
@@ -184,8 +194,9 @@ class RelyDecoder : RelyBase, rely::decoder, public ConsumerFEC {
/**
* @brief Get the fec header size, if added to source packets
+ * there is not need to distinguish between source and FEC packets here
*/
- std::size_t getFecHeaderSize() override {
+ std::size_t getFecHeaderSize(bool isFEC) override {
return header_bytes() + sizeof(fec_metadata);
}
diff --git a/libtransport/src/protocols/fec/rs.cc b/libtransport/src/protocols/fec/rs.cc
index 9c0a3d4fb..d42740c32 100644
--- a/libtransport/src/protocols/fec/rs.cc
+++ b/libtransport/src/protocols/fec/rs.cc
@@ -146,7 +146,8 @@ void BlockCode::encode() {
DLOG_IF(INFO, VLOG_IS_ON(4))
<< "Calling encode with max_buffer_size_ = " << max_buffer_size_;
for (uint32_t i = k_; i < n_; i++) {
- fec_encode(code_, data, data[i], i, max_buffer_size_ + METADATA_BYTES);
+ fec_encode(code_, data, data[i], i,
+ (int)(max_buffer_size_ + METADATA_BYTES));
}
// Re-include header in repair packets
@@ -213,7 +214,8 @@ void BlockCode::decode() {
DLOG_IF(INFO, VLOG_IS_ON(4))
<< "Calling decode with max_buffer_size_ = " << max_buffer_size_;
- fec_decode(code_, data, reinterpret_cast<int *>(index), max_buffer_size_);
+ fec_decode(code_, data, reinterpret_cast<int *>(index),
+ (int)max_buffer_size_);
// Find the index in the block for recovered packets
for (uint32_t i = 0, j = 0; i < k_; i++) {
@@ -228,6 +230,7 @@ void BlockCode::decode() {
auto &packet = operator[](i).getBuffer();
fec_metadata *metadata = reinterpret_cast<fec_metadata *>(
packet->writableData() + max_buffer_size_ - METADATA_BYTES);
+ DCHECK(metadata->getPacketLength() <= packet->capacity());
// Adjust buffer length
packet->setLength(metadata->getPacketLength());
// Adjust metadata
diff --git a/libtransport/src/protocols/fec/rs.h b/libtransport/src/protocols/fec/rs.h
index 034c32bdc..6672eaa6b 100644
--- a/libtransport/src/protocols/fec/rs.h
+++ b/libtransport/src/protocols/fec/rs.h
@@ -18,6 +18,7 @@
#include <arpa/inet.h>
#include <hicn/transport/portability/c_portability.h>
+#include <hicn/transport/portability/endianess.h>
#include <hicn/transport/utils/membuf.h>
#include <protocols/fec/fec_info.h>
#include <protocols/fec_base.h>
@@ -153,8 +154,10 @@ struct fec_header {
*/
uint8_t padding;
- void setSeqNumberBase(uint32_t suffix) { seq_number = htonl(suffix); }
- uint32_t getSeqNumberBase() { return ntohl(seq_number); }
+ void setSeqNumberBase(uint32_t suffix) {
+ seq_number = portability::host_to_net(suffix);
+ }
+ uint32_t getSeqNumberBase() { return portability::net_to_host(seq_number); }
void setEncodedSymbolId(uint8_t esi) { encoded_symbol_id = esi; }
uint8_t getEncodedSymbolId() { return encoded_symbol_id; }
void setSourceBlockLen(uint8_t k) { source_block_len = k; }
@@ -163,6 +166,8 @@ struct fec_header {
uint8_t getNFecSymbols() { return n_fec_symbols; }
};
+static_assert(sizeof(fec_header) <= 8, "fec_header is too large");
+
class rs;
/**
@@ -177,11 +182,17 @@ class BlockCode : public Packets {
*/
class __attribute__((__packed__)) fec_metadata {
public:
- void setPacketLength(uint16_t length) { packet_length = htons(length); }
- uint32_t getPacketLength() { return ntohs(packet_length); }
+ void setPacketLength(uint16_t length) {
+ packet_length = portability::host_to_net(length);
+ }
+ uint32_t getPacketLength() {
+ return portability::net_to_host(packet_length);
+ }
- void setMetadataBase(uint32_t value) { metadata = htonl(value); }
- uint32_t getMetadataBase() { return ntohl(metadata); }
+ void setMetadataBase(uint32_t value) {
+ metadata = portability::host_to_net(value);
+ }
+ uint32_t getMetadataBase() { return portability::net_to_host(metadata); }
private:
uint16_t packet_length; /* Used to get the real size of the packet after we
@@ -388,8 +399,11 @@ class RSEncoder : public rs, public ProducerFEC {
/**
* @brief Get the fec header size, if added to source packets
+ * in RS the source packets do not transport any FEC header
*/
- std::size_t getFecHeaderSize() override { return 0; }
+ std::size_t getFecHeaderSize(bool isFEC) override {
+ return isFEC ? sizeof(fec_header) : 0;
+ }
void clear() override {
rs::clear();
@@ -435,8 +449,11 @@ class RSDecoder : public rs, public ConsumerFEC {
/**
* @brief Get the fec header size, if added to source packets
+ * in RS the source packets do not transport any FEC header
*/
- std::size_t getFecHeaderSize() override { return 0; }
+ std::size_t getFecHeaderSize(bool isFEC) override {
+ return isFEC ? sizeof(fec_header) : 0;
+ }
/**
* Clear decoder to reuse
diff --git a/libtransport/src/protocols/fec_base.h b/libtransport/src/protocols/fec_base.h
index bda3ee756..28f6a820a 100644
--- a/libtransport/src/protocols/fec_base.h
+++ b/libtransport/src/protocols/fec_base.h
@@ -101,8 +101,10 @@ class FECBase {
/**
* @brief Get size of FEC header.
+ * the fec header size may be different if a packet is a data packet or a FEC
+ * packet
*/
- virtual std::size_t getFecHeaderSize() = 0;
+ virtual std::size_t getFecHeaderSize(bool isFEC) = 0;
/**
* Set callback to call after packet encoding / decoding
diff --git a/libtransport/src/protocols/manifest_incremental_indexer_bytestream.cc b/libtransport/src/protocols/manifest_incremental_indexer_bytestream.cc
index b5ab8184f..0b15559a4 100644
--- a/libtransport/src/protocols/manifest_incremental_indexer_bytestream.cc
+++ b/libtransport/src/protocols/manifest_incremental_indexer_bytestream.cc
@@ -66,40 +66,33 @@ void ManifestIncrementalIndexer::onUntrustedManifest(
return;
}
- auto manifest =
- std::make_unique<ContentObjectManifest>(std::move(content_object));
- manifest->decode();
+ core::ContentObjectManifest manifest(content_object.shared_from_this());
+ manifest.decode();
- processTrustedManifest(interest, std::move(manifest), reassembly);
+ processTrustedManifest(interest, manifest, reassembly);
}
void ManifestIncrementalIndexer::processTrustedManifest(
- core::Interest &interest, std::unique_ptr<ContentObjectManifest> manifest,
+ core::Interest &interest, core::ContentObjectManifest &manifest,
bool reassembly) {
- if (TRANSPORT_EXPECT_FALSE(manifest->getVersion() !=
- core::ManifestVersion::VERSION_1)) {
- throw errors::RuntimeException("Received manifest with unknown version.");
- }
-
- switch (manifest->getType()) {
+ switch (manifest.getType()) {
case core::ManifestType::INLINE_MANIFEST: {
suffix_strategy_->setFinalSuffix(
- manifest->getParamsBytestream().final_segment);
+ manifest.getParamsBytestream().final_segment);
// The packets to verify with the received manifest
std::vector<auth::PacketPtr> packets;
// Convert the received manifest to a map of packet suffixes to hashes
- auth::Verifier::SuffixMap current_manifest =
- core::ContentObjectManifest::getSuffixMap(manifest.get());
+ auth::Verifier::SuffixMap suffix_map = manifest.getSuffixMap();
// Update 'suffix_map_' with new hashes from the received manifest and
// build 'packets'
- for (auto it = current_manifest.begin(); it != current_manifest.end();) {
+ for (auto it = suffix_map.begin(); it != suffix_map.end();) {
if (unverified_segments_.find(it->first) ==
unverified_segments_.end()) {
suffix_map_[it->first] = std::move(it->second);
- current_manifest.erase(it++);
+ suffix_map.erase(it++);
continue;
}
@@ -109,7 +102,7 @@ void ManifestIncrementalIndexer::processTrustedManifest(
// Verify unverified segments using the received manifest
auth::Verifier::PolicyMap policies =
- verifier_->verifyPackets(packets, current_manifest);
+ verifier_->verifyPackets(packets, suffix_map);
for (unsigned int i = 0; i < packets.size(); ++i) {
auth::Suffix suffix = packets[i]->getName().getSuffix();
@@ -126,7 +119,9 @@ void ManifestIncrementalIndexer::processTrustedManifest(
}
if (reassembly) {
- reassembly_->reassemble(std::move(manifest));
+ auto manifest_co =
+ std::dynamic_pointer_cast<ContentObject>(manifest.getPacket());
+ reassembly_->reassemble(*manifest_co);
}
break;
}
diff --git a/libtransport/src/protocols/manifest_incremental_indexer_bytestream.h b/libtransport/src/protocols/manifest_incremental_indexer_bytestream.h
index 12876f35c..8527b55c1 100644
--- a/libtransport/src/protocols/manifest_incremental_indexer_bytestream.h
+++ b/libtransport/src/protocols/manifest_incremental_indexer_bytestream.h
@@ -76,7 +76,7 @@ class ManifestIncrementalIndexer : public IncrementalIndexer {
core::ContentObject &content_object,
bool reassembly);
void processTrustedManifest(core::Interest &interest,
- std::unique_ptr<ContentObjectManifest> manifest,
+ core::ContentObjectManifest &manifest,
bool reassembly);
void onUntrustedContentObject(core::Interest &interest,
core::ContentObject &content_object,
diff --git a/libtransport/src/protocols/prod_protocol_bytestream.cc b/libtransport/src/protocols/prod_protocol_bytestream.cc
index 2a3ec07e1..7f103e12b 100644
--- a/libtransport/src/protocols/prod_protocol_bytestream.cc
+++ b/libtransport/src/protocols/prod_protocol_bytestream.cc
@@ -111,18 +111,18 @@ uint32_t ByteStreamProductionProtocol::produceStream(
uint64_t manifest_free_space;
uint32_t nb_manifests;
std::shared_ptr<core::ContentObjectManifest> manifest;
- uint32_t manifest_capacity = making_manifest_;
+ uint32_t manifest_capacity = manifest_max_capacity_;
bool is_last_manifest = false;
ParamsBytestream transport_params;
manifest_format = Packet::toAHFormat(default_format);
- content_format =
- !making_manifest_ ? Packet::toAHFormat(default_format) : default_format;
+ content_format = !manifest_max_capacity_ ? Packet::toAHFormat(default_format)
+ : default_format;
- content_header_size =
- core::Packet::getHeaderSizeFromFormat(content_format, signature_length);
- manifest_header_size =
- core::Packet::getHeaderSizeFromFormat(manifest_format, signature_length);
+ content_header_size = (uint32_t)core::Packet::getHeaderSizeFromFormat(
+ content_format, signature_length);
+ manifest_header_size = (uint32_t)core::Packet::getHeaderSizeFromFormat(
+ manifest_format, signature_length);
content_free_space =
std::min(max_segment_size, data_packet_size - content_header_size);
manifest_free_space =
@@ -135,34 +135,39 @@ uint32_t ByteStreamProductionProtocol::produceStream(
nb_segments++;
}
- if (making_manifest_) {
+ if (manifest_max_capacity_) {
nb_manifests = static_cast<uint32_t>(
std::ceil(float(nb_segments) / manifest_capacity));
final_block_number += nb_segments + nb_manifests - 1;
transport_params.final_segment =
is_last ? final_block_number : utils::SuffixStrategy::MAX_SUFFIX;
- manifest.reset(ContentObjectManifest::createManifest(
+ manifest = ContentObjectManifest::createContentManifest(
manifest_format,
name.setSuffix(suffix_strategy->getNextManifestSuffix()),
- core::ManifestVersion::VERSION_1, core::ManifestType::INLINE_MANIFEST,
- is_last_manifest, name, hash_algo, signature_length));
-
- manifest->setLifetime(content_object_expiry_time);
+ signature_length);
+ manifest->setHeaders(core::ManifestType::INLINE_MANIFEST,
+ manifest_max_capacity_, hash_algo, is_last_manifest,
+ name);
manifest->setParamsBytestream(transport_params);
+ manifest->getPacket()->setLifetime(content_object_expiry_time);
}
auto self = shared_from_this();
for (unsigned int packaged_segments = 0; packaged_segments < nb_segments;
packaged_segments++) {
- if (making_manifest_) {
- if (manifest->estimateManifestSize(1) > manifest_free_space) {
+ if (manifest_max_capacity_) {
+ if (manifest->Encoder::manifestSize(1) > manifest_free_space) {
manifest->encode();
- signer_->signPacket(manifest.get());
+ auto manifest_co =
+ std::dynamic_pointer_cast<ContentObject>(manifest->getPacket());
+
+ signer_->signPacket(manifest_co.get());
// Send the current manifest
- passContentObjectToCallbacks(manifest, self);
- DLOG_IF(INFO, VLOG_IS_ON(3)) << "Send manifest " << manifest->getName();
+ passContentObjectToCallbacks(manifest_co, self);
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Send manifest " << manifest_co->getName();
// Send content objects stored in the queue
while (!content_queue_.empty()) {
@@ -175,15 +180,15 @@ uint32_t ByteStreamProductionProtocol::produceStream(
// Create new manifest. The reference to the last manifest has been
// acquired in the passContentObjectToCallbacks function, so we can
// safely release this reference.
- manifest.reset(ContentObjectManifest::createManifest(
+ manifest = ContentObjectManifest::createContentManifest(
manifest_format,
name.setSuffix(suffix_strategy->getNextManifestSuffix()),
- core::ManifestVersion::VERSION_1,
- core::ManifestType::INLINE_MANIFEST, is_last_manifest, name,
- hash_algo, signature_length));
-
- manifest->setLifetime(content_object_expiry_time);
+ signature_length);
+ manifest->setHeaders(core::ManifestType::INLINE_MANIFEST,
+ manifest_max_capacity_, hash_algo,
+ is_last_manifest, name);
manifest->setParamsBytestream(transport_params);
+ manifest->getPacket()->setLifetime(content_object_expiry_time);
}
}
@@ -191,7 +196,7 @@ uint32_t ByteStreamProductionProtocol::produceStream(
uint32_t content_suffix = suffix_strategy->getNextContentSuffix();
auto content_object = std::make_shared<ContentObject>(
name.setSuffix(content_suffix), content_format,
- !making_manifest_ ? signature_length : 0);
+ !manifest_max_capacity_ ? signature_length : 0);
content_object->setLifetime(content_object_expiry_time);
auto b = buffer->cloneOne();
@@ -203,7 +208,7 @@ uint32_t ByteStreamProductionProtocol::produceStream(
b->append(buffer_size - bytes_segmented);
bytes_segmented += (int)(buffer_size - bytes_segmented);
- if (is_last && making_manifest_) {
+ if (is_last && manifest_max_capacity_) {
is_last_manifest = true;
} else if (is_last) {
content_object->setLast();
@@ -219,9 +224,9 @@ uint32_t ByteStreamProductionProtocol::produceStream(
// Either we sign the content object or we save its hash into the current
// manifest
- if (making_manifest_) {
+ if (manifest_max_capacity_) {
auth::CryptoHash hash = content_object->computeDigest(hash_algo);
- manifest->addSuffixHash(content_suffix, hash);
+ manifest->addEntry(content_suffix, hash);
content_queue_.push(content_object);
} else {
signer_->signPacket(content_object.get());
@@ -232,16 +237,19 @@ uint32_t ByteStreamProductionProtocol::produceStream(
}
// We send the manifest that hasn't been fully filled yet
- if (making_manifest_) {
+ if (manifest_max_capacity_) {
if (is_last_manifest) {
manifest->setIsLast(is_last_manifest);
}
manifest->encode();
- signer_->signPacket(manifest.get());
+ auto manifest_co =
+ std::dynamic_pointer_cast<ContentObject>(manifest->getPacket());
+
+ signer_->signPacket(manifest_co.get());
- passContentObjectToCallbacks(manifest, self);
- DLOG_IF(INFO, VLOG_IS_ON(3)) << "Send manifest " << manifest->getName();
+ passContentObjectToCallbacks(manifest_co, self);
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Send manifest " << manifest_co->getName();
while (!content_queue_.empty()) {
passContentObjectToCallbacks(content_queue_.front(), self);
diff --git a/libtransport/src/protocols/prod_protocol_rtc.cc b/libtransport/src/protocols/prod_protocol_rtc.cc
index e49f58167..cb8dff6e4 100644
--- a/libtransport/src/protocols/prod_protocol_rtc.cc
+++ b/libtransport/src/protocols/prod_protocol_rtc.cc
@@ -43,9 +43,6 @@ RTCProductionProtocol::RTCProductionProtocol(
last_produced_data_ts_(0),
last_round_(utils::SteadyTime::nowMs().count()),
allow_delayed_nacks_(false),
- queue_timer_on_(false),
- consumer_in_sync_(false),
- on_consumer_in_sync_(nullptr),
pending_fec_pace_(false),
max_len_(0),
queue_len_(0),
@@ -54,8 +51,6 @@ RTCProductionProtocol::RTCProductionProtocol(
std::uniform_int_distribution<> dis(0, 255);
prod_label_ = dis(gen_);
cache_label_ = (prod_label_ + 1) % 256;
- interests_queue_timer_ =
- std::make_unique<asio::steady_timer>(portal_->getThread().getIoService());
round_timer_ =
std::make_unique<asio::steady_timer>(portal_->getThread().getIoService());
fec_pacing_timer_ =
@@ -69,16 +64,7 @@ RTCProductionProtocol::~RTCProductionProtocol() {}
void RTCProductionProtocol::setProducerParam() {
// Flow name: here we assume there is only one prefix registered in the portal
- flow_name_ = portal_->getServedNamespaces().begin()->getName();
-
- // Manifest
- uint32_t making_manifest;
- socket_->getSocketOption(interface::GeneralTransportOptions::MAKE_MANIFEST,
- making_manifest);
-
- // Signer
- std::shared_ptr<auth::Signer> signer;
- socket_->getSocketOption(interface::GeneralTransportOptions::SIGNER, signer);
+ flow_name_ = portal_->getServedNamespaces().begin()->makeName();
// Default format
core::Packet::Format default_format;
@@ -94,15 +80,22 @@ void RTCProductionProtocol::setProducerParam() {
socket_->getSocketOption(interface::RtcTransportOptions::AGGREGATED_DATA,
data_aggregation_);
- size_t signature_size = signer->getSignatureFieldSize();
- data_header_format_ = {
- !making_manifest ? Packet::toAHFormat(default_format) : default_format,
- !making_manifest ? signature_size : 0};
+ size_t signature_size = signer_->getSignatureFieldSize();
+ data_header_format_ = {!manifest_max_capacity_
+ ? Packet::toAHFormat(default_format)
+ : default_format,
+ !manifest_max_capacity_ ? signature_size : 0};
manifest_header_format_ = {Packet::toAHFormat(default_format),
signature_size};
nack_header_format_ = {Packet::toAHFormat(default_format), signature_size};
fec_header_format_ = {Packet::toAHFormat(default_format), signature_size};
+ // Initialize verifier for aggregated interests
+ std::shared_ptr<auth::Verifier> verifier;
+ socket_->getSocketOption(implementation::GeneralTransportOptions::VERIFIER,
+ verifier);
+ verifier_ = std::make_shared<rtc::RTCVerifier>(verifier, 0, 0);
+
// Schedule round timer
scheduleRoundTimer();
}
@@ -143,15 +136,17 @@ void RTCProductionProtocol::updateStats(bool new_round) {
packets_production_rate_ =
ceil((double)(produced_packets_ + prev_produced_packets_) * per_second);
- // add fec packets looking at the fec code. we don't use directly the number
- // of fec packets produced in 1 round because it may happen that different
- // numbers of blocks are generated during the rounds and this creates
- // inconsistencies in the estimation of the production rate
- uint32_t k = fec::FECUtils::getSourceSymbols(fec_type_);
- uint32_t n = fec::FECUtils::getBlockSymbols(fec_type_);
+ if (fec_encoder_ && fec_type_ != fec::FECType::UNKNOWN) {
+ // add fec packets looking at the fec code. we don't use directly the number
+ // of fec packets produced in 1 round because it may happen that different
+ // numbers of blocks are generated during the rounds and this creates
+ // inconsistencies in the estimation of the production rate
+ uint32_t k = fec::FECUtils::getSourceSymbols(fec_type_);
+ uint32_t n = fec::FECUtils::getBlockSymbols(fec_type_);
- packets_production_rate_ +=
- ceil((double)packets_production_rate_ / (double)k) * (n - k);
+ packets_production_rate_ +=
+ ceil((double)packets_production_rate_ / (double)k) * (n - k);
+ }
// update the production rate as soon as it increases by 10% with respect to
// the last round
@@ -168,11 +163,6 @@ void RTCProductionProtocol::updateStats(bool new_round) {
allow_delayed_nacks_ = false;
}
- // check if the production rate is decreased. if yes send nacks if needed
- if (prev_packets_production_rate < packets_production_rate_) {
- sendNacksForPendingInterests();
- }
-
if (new_round) {
prev_produced_bytes_ = produced_bytes_;
prev_produced_packets_ = produced_packets_;
@@ -203,16 +193,25 @@ void RTCProductionProtocol::produce(ContentObject &content_object) {
uint32_t RTCProductionProtocol::produceDatagram(
const Name &content_name, std::unique_ptr<utils::MemBuf> &&buffer) {
std::size_t buffer_size = buffer->length();
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Maybe Sending content object: " << content_name;
+
if (TRANSPORT_EXPECT_FALSE(buffer_size == 0)) return 0;
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Sending content object: " << content_name;
+
uint32_t data_packet_size;
socket_->getSocketOption(interface::GeneralTransportOptions::DATA_PACKET_SIZE,
data_packet_size);
-
- if (TRANSPORT_EXPECT_FALSE(
- (Packet::getHeaderSizeFromFormat(data_header_format_.first,
- data_header_format_.second) +
- rtc::DATA_HEADER_SIZE + buffer_size) > data_packet_size)) {
+ // this is a source packet but we check the fec header size of FEC packet in
+ // order to leave room for the header when FEC packets will be generated
+ uint32_t fec_header = 0;
+ if (fec_encoder_) fec_encoder_->getFecHeaderSize(true);
+ uint32_t headers_size =
+ (uint32_t)Packet::getHeaderSizeFromFormat(data_header_format_.first,
+ data_header_format_.second) +
+ rtc::DATA_HEADER_SIZE + fec_header;
+ if (TRANSPORT_EXPECT_FALSE((headers_size + buffer_size) > data_packet_size)) {
return 0;
}
@@ -338,47 +337,42 @@ void RTCProductionProtocol::emptyQueue() {
}
void RTCProductionProtocol::sendManifest(const Name &name) {
- if (!making_manifest_) {
+ if (!manifest_max_capacity_) {
return;
}
- Name manifest_name(name);
-
- uint32_t data_packet_size;
- socket_->getSocketOption(interface::GeneralTransportOptions::DATA_PACKET_SIZE,
- data_packet_size);
-
- // The maximum number of entries a manifest can hold
- uint32_t manifest_capacity = making_manifest_;
+ Name manifest_name = name;
// If there is not enough hashes to fill a manifest, return early
- if (manifest_entries_.size() < manifest_capacity) {
+ if (manifest_entries_.size() < manifest_max_capacity_) {
return;
}
// Create a new manifest
std::shared_ptr<core::ContentObjectManifest> manifest =
createManifest(manifest_name.setSuffix(current_seg_));
+ auto manifest_co =
+ std::dynamic_pointer_cast<ContentObject>(manifest->getPacket());
// Fill the manifest with packet hashes that were previously saved
uint32_t nb_entries;
- for (nb_entries = 0; nb_entries < manifest_capacity; ++nb_entries) {
+ for (nb_entries = 0; nb_entries < manifest_max_capacity_; ++nb_entries) {
if (manifest_entries_.empty()) {
break;
}
std::pair<uint32_t, auth::CryptoHash> front = manifest_entries_.front();
- manifest->addSuffixHash(front.first, front.second);
+ manifest->addEntry(front.first, front.second);
manifest_entries_.pop();
}
DLOG_IF(INFO, VLOG_IS_ON(3))
- << "Sending manifest " << manifest->getName().getSuffix() << " of size "
- << nb_entries;
+ << "Sending manifest " << manifest_co->getName().getSuffix()
+ << " of size " << nb_entries;
// Encode and send the manifest
manifest->encode();
portal_->getThread().tryRunHandlerNow(
- [this, content_object{std::move(manifest)}, manifest_name]() mutable {
+ [this, content_object{std::move(manifest_co)}, manifest_name]() mutable {
produceInternal(std::move(content_object), manifest_name);
});
}
@@ -394,11 +388,12 @@ RTCProductionProtocol::createManifest(const Name &content_name) const {
uint64_t now = utils::SteadyTime::nowMs().count();
// Create a new manifest
- std::shared_ptr<core::ContentObjectManifest> manifest(
- ContentObjectManifest::createManifest(
- manifest_header_format_.first, name, core::ManifestVersion::VERSION_1,
- core::ManifestType::INLINE_MANIFEST, false, name, hash_algo,
- manifest_header_format_.second));
+ std::shared_ptr<core::ContentObjectManifest> manifest =
+ ContentObjectManifest::createContentManifest(
+ manifest_header_format_.first, name, manifest_header_format_.second);
+ manifest->setHeaders(core::ManifestType::INLINE_MANIFEST,
+ manifest_max_capacity_, hash_algo, false /* is_last */,
+ name);
// Set connection parameters
manifest->setParamsRTC(ParamsRTC{
@@ -444,7 +439,15 @@ void RTCProductionProtocol::producePktInternal(
// set hicn stuff
Name n(content_name);
content_object->setName(n.setSuffix(current_seg_));
- content_object->setLifetime(500); // XXX this should be set by the APP
+
+ uint32_t expiry_time = 0;
+ socket_->getSocketOption(
+ interface::GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME,
+ expiry_time);
+ if (expiry_time == interface::default_values::content_object_expiry_time)
+ expiry_time = 500; // the data expiration time should be set by the App. if
+ // the App does not specify it the default is 500ms
+ content_object->setLifetime(expiry_time);
content_object->setPathLabel(prod_label_);
// update stats
@@ -466,9 +469,9 @@ void RTCProductionProtocol::producePktInternal(
// pass packet to FEC encoder
if (fec_encoder_ && !fec) {
- uint32_t offset =
- is_manifest ? content_object->headerSize()
- : content_object->headerSize() + rtc::DATA_HEADER_SIZE;
+ uint32_t offset = is_manifest ? (uint32_t)content_object->headerSize()
+ : (uint32_t)content_object->headerSize() +
+ rtc::DATA_HEADER_SIZE;
uint32_t metadata = static_cast<uint32_t>(content_object->getPayloadType());
fec_encoder_->onPacketProduced(*content_object, offset, metadata);
@@ -481,19 +484,14 @@ void RTCProductionProtocol::producePktInternal(
*content_object);
}
- auto seq_it = seqs_map_.find(current_seg_);
- if (seq_it != seqs_map_.end()) {
- sendContentObject(content_object, false, fec);
- }
+ // TODO we may want to send FEC only if an interest is pending in the pit in
+ sendContentObject(content_object, false, fec);
if (*on_content_object_output_) {
on_content_object_output_->operator()(*socket_->getInterface(),
*content_object);
}
- // remove interests from the interest cache if it exists
- removeFromInterestQueue(current_seg_);
-
if (!fec) last_produced_data_ts_ = now;
// Update current segment
@@ -563,59 +561,65 @@ void RTCProductionProtocol::onInterest(Interest &interest) {
on_interest_input_->operator()(*socket_->getInterface(), interest);
}
- auto suffix = interest.firstSuffix();
- // numberOfSuffixes returns only the prefixes in the payalod
- // we add + 1 to count also the seq in the name
- auto n_suffixes = interest.numberOfSuffixes() + 1;
- Name name = interest.getName();
- bool prev_consumer_state = consumer_in_sync_;
-
- for (uint32_t i = 0; i < n_suffixes; i++) {
- if (i > 0) {
- name.setSuffix(*(suffix + (i - 1)));
- }
+ if (!interest.isValid()) throw std::runtime_error("Bad interest format");
+ if (interest.hasManifest() &&
+ verifier_->verify(interest) != auth::VerificationPolicy::ACCEPT)
+ throw std::runtime_error("Interset manifest verification failed");
- DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received interest " << name;
+ uint32_t *suffix = interest.firstSuffix();
+ uint32_t n_suffixes_in_manifest = interest.numberOfSuffixes();
+ uint32_t *request_bitmap = interest.getRequestBitmap();
- const std::shared_ptr<ContentObject> content_object =
- output_buffer_.find(name);
+ Name name = interest.getName();
+ uint32_t pos = 0; // Position of current suffix in manifest
- if (content_object) {
- if (*on_interest_satisfied_output_buffer_) {
- on_interest_satisfied_output_buffer_->operator()(
- *socket_->getInterface(), interest);
- }
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Received interest " << name << " (" << n_suffixes_in_manifest
+ << " suffixes in manifest)";
+
+ // Process the suffix in the interest header
+ // (first loop iteration), then suffixes in the manifest
+ do {
+ if (!interest.hasManifest() || is_bit_set(request_bitmap, pos)) {
+ const std::shared_ptr<ContentObject> content_object =
+ output_buffer_.find(name);
+
+ if (content_object) {
+ if (*on_interest_satisfied_output_buffer_) {
+ on_interest_satisfied_output_buffer_->operator()(
+ *socket_->getInterface(), interest);
+ }
- if (*on_content_object_output_) {
- on_content_object_output_->operator()(*socket_->getInterface(),
- *content_object);
- }
+ if (*on_content_object_output_) {
+ on_content_object_output_->operator()(*socket_->getInterface(),
+ *content_object);
+ }
- DLOG_IF(INFO, VLOG_IS_ON(3))
- << "Send content %u (onInterest) " << content_object->getName();
- content_object->setPathLabel(cache_label_);
- sendContentObject(content_object);
- } else {
- if (*on_interest_process_) {
- on_interest_process_->operator()(*socket_->getInterface(), interest);
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Send content %u (onInterest) " << content_object->getName();
+ content_object->setPathLabel(cache_label_);
+ sendContentObject(content_object);
+ } else {
+ if (*on_interest_process_) {
+ on_interest_process_->operator()(*socket_->getInterface(), interest);
+ }
+ processInterest(name.getSuffix(), interest.getLifetime());
}
- processInterest(name.getSuffix(), interest.getLifetime());
}
- }
- if (prev_consumer_state != consumer_in_sync_ && consumer_in_sync_)
- on_consumer_in_sync_(*socket_->getInterface(), interest);
+ // Retrieve next suffix in the manifest
+ if (interest.hasManifest()) {
+ uint32_t seq = *suffix;
+ suffix++;
+
+ name.setSuffix(seq);
+ interest.setName(name);
+ }
+ } while (pos++ < n_suffixes_in_manifest);
}
void RTCProductionProtocol::processInterest(uint32_t interest_seg,
uint32_t lifetime) {
- if (interest_seg == 0) {
- // first packet from the consumer, reset sync state
- consumer_in_sync_ = false;
- }
-
- uint64_t now = utils::SteadyTime::nowMs().count();
-
switch (rtc::ProbeHandler::getProbeType(interest_seg)) {
case rtc::ProbeType::INIT:
DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received init probe " << interest_seg;
@@ -629,183 +633,7 @@ void RTCProductionProtocol::processInterest(uint32_t interest_seg,
break;
}
- // if the production rate 0 use delayed nacks
- if (allow_delayed_nacks_ && interest_seg >= current_seg_) {
- uint64_t next_timer = UINT64_MAX;
- if (!timers_map_.empty()) {
- next_timer = timers_map_.begin()->first;
- }
-
- uint64_t expiration = now + rtc::NACK_DELAY;
- addToInterestQueue(interest_seg, expiration);
-
- // here we have at least one interest in the queue, we need to start or
- // update the timer
- if (!queue_timer_on_) {
- // set timeout
- queue_timer_on_ = true;
- scheduleQueueTimer(timers_map_.begin()->first - now);
- } else {
- // re-schedule the timer because a new interest will expires sooner
- if (next_timer > timers_map_.begin()->first) {
- interests_queue_timer_->cancel();
- scheduleQueueTimer(timers_map_.begin()->first - now);
- }
- }
- return;
- }
-
- if (queue_timer_on_) {
- // the producer is producing. Send nacks to packets that will expire
- // before the data production and remove the timer
- queue_timer_on_ = false;
- interests_queue_timer_->cancel();
- sendNacksForPendingInterests();
- }
-
- uint32_t max_gap = (uint32_t)floor(
- (double)((double)((double)lifetime *
- rtc::INTEREST_LIFETIME_REDUCTION_FACTOR /
- rtc::MILLI_IN_A_SEC) *
- (double)(packets_production_rate_)));
-
- if (interest_seg < current_seg_ || interest_seg > (max_gap + current_seg_)) {
- sendNack(interest_seg);
- } else {
- if (!consumer_in_sync_ && on_consumer_in_sync_) {
- // we consider the remote consumer to be in sync as soon as it covers
- // 70% of the production window with interests
- uint32_t perc = ceil((double)max_gap * 0.7);
- if (interest_seg > (perc + current_seg_)) {
- consumer_in_sync_ = true;
- // on_consumer_in_sync_(*socket_->getInterface(), interest);
- }
- }
- uint64_t expiration =
- now + floor((double)lifetime * rtc::INTEREST_LIFETIME_REDUCTION_FACTOR);
- addToInterestQueue(interest_seg, expiration);
- }
-}
-
-void RTCProductionProtocol::scheduleQueueTimer(uint64_t wait) {
- interests_queue_timer_->expires_from_now(std::chrono::milliseconds(wait));
- std::weak_ptr<RTCProductionProtocol> self = shared_from_this();
- interests_queue_timer_->async_wait([self](const std::error_code &ec) {
- if (ec) {
- return;
- }
-
- auto sp = self.lock();
- if (sp && sp->isRunning()) {
- sp->interestQueueTimer();
- }
- });
-}
-
-void RTCProductionProtocol::addToInterestQueue(uint32_t interest_seg,
- uint64_t expiration) {
- // check if the seq number exists already
- auto it_seqs = seqs_map_.find(interest_seg);
- if (it_seqs != seqs_map_.end()) {
- // the seq already exists
- if (expiration < it_seqs->second) {
- // we need to update the timer becasue we got a smaller one
- // 1) remove the entry from the multimap
- // 2) update this entry
- auto range = timers_map_.equal_range(it_seqs->second);
- for (auto it_timers = range.first; it_timers != range.second;
- it_timers++) {
- if (it_timers->second == it_seqs->first) {
- timers_map_.erase(it_timers);
- break;
- }
- }
- timers_map_.insert(
- std::pair<uint64_t, uint32_t>(expiration, interest_seg));
- it_seqs->second = expiration;
- } else {
- // nothing to do here
- return;
- }
- } else {
- // add the new seq
- timers_map_.insert(std::pair<uint64_t, uint32_t>(expiration, interest_seg));
- seqs_map_.insert(std::pair<uint32_t, uint64_t>(interest_seg, expiration));
- }
-}
-
-void RTCProductionProtocol::sendNacksForPendingInterests() {
- std::unordered_set<uint32_t> to_remove;
-
- uint32_t pps = ceil((double)(packets_production_rate_)*rtc::
- INTEREST_LIFETIME_REDUCTION_FACTOR);
-
- uint64_t now = utils::SteadyTime::nowMs().count();
- for (auto it = seqs_map_.begin(); it != seqs_map_.end(); it++) {
- if (it->first > current_seg_ && it->second > now) {
- double exp_time_in_sec =
- (double)(it->second - now) / (double)rtc::MILLI_IN_A_SEC;
- uint32_t packets_prod_before_expire = ceil((double)pps * exp_time_in_sec);
-
- if (it->first > (current_seg_ + packets_prod_before_expire)) {
- sendNack(it->first);
- to_remove.insert(it->first);
- }
- } else if (TRANSPORT_EXPECT_FALSE(it->first < current_seg_ ||
- it->second <= now)) {
- // this branch should never be execcuted
- // first condition: the packet was already prdocued and we have and old
- // interest pending. send a nack to notify the consumer if needed. the
- // case it->first = current_seg_ is not handled because
- // the interest will be satified by the next data packet.
- // second condition: the interest is expired.
- sendNack(it->first);
- to_remove.insert(it->first);
- }
- }
-
- // delete nacked interests
- for (auto it = to_remove.begin(); it != to_remove.end(); it++) {
- removeFromInterestQueue(*it);
- }
-}
-
-void RTCProductionProtocol::removeFromInterestQueue(uint32_t interest_seg) {
- auto seq_it = seqs_map_.find(interest_seg);
- if (seq_it != seqs_map_.end()) {
- auto range = timers_map_.equal_range(seq_it->second);
- for (auto it_timers = range.first; it_timers != range.second; it_timers++) {
- if (it_timers->second == seq_it->first) {
- timers_map_.erase(it_timers);
- break;
- }
- }
- seqs_map_.erase(seq_it);
- }
-}
-
-void RTCProductionProtocol::interestQueueTimer() {
- uint64_t now = utils::SteadyTime::nowMs().count();
-
- for (auto it_timers = timers_map_.begin(); it_timers != timers_map_.end();) {
- uint64_t expire = it_timers->first;
- if (expire <= now) {
- uint32_t seq = it_timers->second;
- sendNack(seq);
- // remove the interest from the other map
- seqs_map_.erase(seq);
- it_timers = timers_map_.erase(it_timers);
- } else {
- // stop, we are done!
- break;
- }
- }
- if (timers_map_.empty()) {
- queue_timer_on_ = false;
- } else {
- queue_timer_on_ = true;
- scheduleQueueTimer(timers_map_.begin()->first - now);
- }
+ if (interest_seg < current_seg_) sendNack(interest_seg);
}
void RTCProductionProtocol::sendManifestProbe(uint32_t sequence) {
@@ -814,18 +642,20 @@ void RTCProductionProtocol::sendManifestProbe(uint32_t sequence) {
std::shared_ptr<core::ContentObjectManifest> manifest_probe =
createManifest(manifest_name);
+ auto manifest_probe_co =
+ std::dynamic_pointer_cast<ContentObject>(manifest_probe->getPacket());
- manifest_probe->setLifetime(0);
- manifest_probe->setPathLabel(prod_label_);
+ manifest_probe_co->setLifetime(0);
+ manifest_probe_co->setPathLabel(prod_label_);
manifest_probe->encode();
if (*on_content_object_output_) {
on_content_object_output_->operator()(*socket_->getInterface(),
- *manifest_probe);
+ *manifest_probe_co);
}
DLOG_IF(INFO, VLOG_IS_ON(3)) << "Send init probe " << sequence;
- sendContentObject(manifest_probe, true, false);
+ sendContentObject(manifest_probe_co, true, false);
}
void RTCProductionProtocol::sendNack(uint32_t sequence) {
@@ -847,20 +677,6 @@ void RTCProductionProtocol::sendNack(uint32_t sequence) {
nack->setLifetime(0);
nack->setPathLabel(prod_label_);
- if (!consumer_in_sync_ && on_consumer_in_sync_ &&
- rtc::ProbeHandler::getProbeType(sequence) == rtc::ProbeType::NOT_PROBE &&
- sequence > next_packet) {
- consumer_in_sync_ = true;
- Packet::Format format;
- socket_->getSocketOption(interface::GeneralTransportOptions::PACKET_FORMAT,
- format);
-
- auto interest =
- core::PacketManager<>::getInstance().getPacket<Interest>(format);
- interest->setName(n);
- on_consumer_in_sync_(*socket_->getInterface(), *interest);
- }
-
if (*on_content_object_output_) {
on_content_object_output_->operator()(*socket_->getInterface(), *nack);
}
@@ -881,7 +697,7 @@ void RTCProductionProtocol::sendContentObject(
portal_->sendContentObject(*content_object);
// Compute and save data packet digest
- if (making_manifest_ && !is_ah) {
+ if (manifest_max_capacity_ && !is_ah) {
auth::CryptoHashType hash_algo;
socket_->getSocketOption(interface::GeneralTransportOptions::HASH_ALGORITHM,
hash_algo);
diff --git a/libtransport/src/protocols/prod_protocol_rtc.h b/libtransport/src/protocols/prod_protocol_rtc.h
index c0424a39c..285ccb646 100644
--- a/libtransport/src/protocols/prod_protocol_rtc.h
+++ b/libtransport/src/protocols/prod_protocol_rtc.h
@@ -17,6 +17,7 @@
#include <hicn/transport/core/name.h>
#include <protocols/production_protocol.h>
+#include <protocols/rtc/rtc_verifier.h>
#include <atomic>
#include <map>
@@ -50,11 +51,6 @@ class RTCProductionProtocol : public ProductionProtocol {
buffer, buffer_size, buffer_size));
}
- void setConsumerInSyncCallback(
- interface::ProducerInterestCallback &&callback) {
- on_consumer_in_sync_ = std::move(callback);
- }
-
auto shared_from_this() { return utils::shared_from(this); }
private:
@@ -80,13 +76,6 @@ class RTCProductionProtocol : public ProductionProtocol {
void updateStats(bool new_round);
void scheduleRoundTimer();
- // pending intersts functions
- void addToInterestQueue(uint32_t interest_seg, uint64_t expiration);
- void sendNacksForPendingInterests();
- void removeFromInterestQueue(uint32_t interest_seg);
- void scheduleQueueTimer(uint64_t wait);
- void interestQueueTimer();
-
// FEC functions
void onFecPackets(fec::BufferArray &packets);
fec::buffer getBuffer(std::size_t size);
@@ -111,14 +100,14 @@ class RTCProductionProtocol : public ProductionProtocol {
uint32_t prev_produced_bytes_; // XXX clearly explain all these new vars
uint32_t prev_produced_packets_;
- uint32_t produced_bytes_; // bytes produced in the last round
- uint32_t produced_packets_; // packet produed in the last round
+ uint32_t produced_bytes_; // bytes produced in the last round
+ uint32_t produced_packets_; // packet produed in the last round
uint32_t max_packet_production_; // never exceed this number of packets
// without update stats
- uint32_t bytes_production_rate_; // bytes per sec
- uint32_t packets_production_rate_; // pps
+ uint32_t bytes_production_rate_; // bytes per sec
+ uint32_t packets_production_rate_; // pps
uint64_t last_produced_data_ts_; // ms
@@ -134,27 +123,6 @@ class RTCProductionProtocol : public ProductionProtocol {
// of the new rate.
bool allow_delayed_nacks_;
- // queue for the received interests
- // this map maps the expiration time of an interest to
- // its sequence number. the map is sorted by timeouts
- // the same timeout may be used for multiple sequence numbers
- // but for each sequence number we store only the smallest
- // expiry time. In this way the mapping from seqs_map_ to
- // timers_map_ is unique
- std::multimap<uint64_t, uint32_t> timers_map_;
-
- // this map does the opposite, this map is not ordered
- std::unordered_map<uint32_t, uint64_t> seqs_map_;
- bool queue_timer_on_;
- std::unique_ptr<asio::steady_timer> interests_queue_timer_;
-
- // this callback is called when the remote consumer is in sync with high
- // probability. it is called only the first time that the switch happen.
- // XXX this makes sense only in P2P mode, while in standard mode is
- // impossible to know the state of the consumers so it should not be used.
- bool consumer_in_sync_;
- interface::ProducerInterestCallback on_consumer_in_sync_;
-
// Save FEC packets here before sending them
std::queue<ContentObject::Ptr> pending_fec_packets_;
std::queue<std::pair<uint64_t, ContentObject::Ptr>> paced_fec_packets_;
@@ -172,6 +140,9 @@ class RTCProductionProtocol : public ProductionProtocol {
// Manifest
std::queue<std::pair<uint32_t, auth::CryptoHash>>
manifest_entries_; // map a packet suffix to a packet hash
+
+ // Verifier for aggregated interests
+ std::shared_ptr<rtc::RTCVerifier> verifier_;
};
} // namespace protocol
diff --git a/libtransport/src/protocols/production_protocol.cc b/libtransport/src/protocols/production_protocol.cc
index 8b781e38a..039a6a55a 100644
--- a/libtransport/src/protocols/production_protocol.cc
+++ b/libtransport/src/protocols/production_protocol.cc
@@ -78,8 +78,8 @@ int ProductionProtocol::start() {
socket_->getSocketOption(GeneralTransportOptions::ASYNC_MODE, is_async_);
socket_->getSocketOption(GeneralTransportOptions::SIGNER, signer_);
- socket_->getSocketOption(GeneralTransportOptions::MAKE_MANIFEST,
- making_manifest_);
+ socket_->getSocketOption(GeneralTransportOptions::MANIFEST_MAX_CAPACITY,
+ manifest_max_capacity_);
std::string fec_type_str = "";
socket_->getSocketOption(GeneralTransportOptions::FEC_TYPE, fec_type_str);
diff --git a/libtransport/src/protocols/production_protocol.h b/libtransport/src/protocols/production_protocol.h
index 8e10d2f40..09718631f 100644
--- a/libtransport/src/protocols/production_protocol.h
+++ b/libtransport/src/protocols/production_protocol.h
@@ -79,6 +79,7 @@ class ProductionProtocol
if (fec_str && (fec_type_ == fec::FECType::UNKNOWN)) {
LOG(INFO) << "Using FEC " << fec_str;
fec_type_ = fec::FECUtils::fecTypeFromString(fec_str);
+ CHECK(fec_type_ != fec::FECType::UNKNOWN);
}
if (fec_type_ == fec::FECType::UNKNOWN) {
@@ -123,7 +124,7 @@ class ProductionProtocol
// Signature and manifest
std::shared_ptr<auth::Signer> signer_;
- uint32_t making_manifest_;
+ uint32_t manifest_max_capacity_;
bool is_async_;
fec::FECType fec_type_;
diff --git a/libtransport/src/protocols/raaqm.cc b/libtransport/src/protocols/raaqm.cc
index 131367d78..bcbc15aef 100644
--- a/libtransport/src/protocols/raaqm.cc
+++ b/libtransport/src/protocols/raaqm.cc
@@ -371,7 +371,7 @@ void RaaqmTransportProtocol::onPacketDropped(Interest &interest,
}
interest_retransmissions_[segment & mask]++;
- interest_to_retransmit_.push(segment);
+ interest_to_retransmit_.push((unsigned int)segment);
} else {
LOG(ERROR) << "Stop: received not trusted packet "
<< interest_retransmissions_[segment & mask] << " times";
@@ -429,7 +429,7 @@ void RaaqmTransportProtocol::onInterestTimeout(Interest::Ptr &interest,
return;
}
- interest_to_retransmit_.push(segment);
+ interest_to_retransmit_.push((unsigned int)segment);
scheduleNextInterests();
} else {
LOG(ERROR) << "Stop: reached max retx limit.";
@@ -491,7 +491,7 @@ void RaaqmTransportProtocol::updateRtt(uint64_t segment) {
throw std::runtime_error("RAAQM ERROR: no current path found, exit");
} else {
auto now = utils::SteadyTime::Clock::now();
- utils::SteadyTime::Milliseconds rtt = utils::SteadyTime::getDurationMs(
+ auto rtt = utils::SteadyTime::getDurationUs(
interest_timepoints_[segment & mask], now);
// Update stats
@@ -525,7 +525,7 @@ void RaaqmTransportProtocol::RAAQM() {
}
void RaaqmTransportProtocol::updateStats(
- uint32_t suffix, const utils::SteadyTime::Milliseconds &rtt,
+ uint32_t suffix, const utils::SteadyTime::Microseconds &rtt,
utils::SteadyTime::TimePoint &now) {
// Update RTT statistics
stats_->updateAverageRtt(rtt);
diff --git a/libtransport/src/protocols/raaqm.h b/libtransport/src/protocols/raaqm.h
index ec344c23a..a7ef23b68 100644
--- a/libtransport/src/protocols/raaqm.h
+++ b/libtransport/src/protocols/raaqm.h
@@ -57,7 +57,7 @@ class RaaqmTransportProtocol : public TransportProtocol,
virtual void afterDataUnsatisfied(uint64_t segment);
virtual void updateStats(uint32_t suffix,
- const utils::SteadyTime::Milliseconds &rtt,
+ const utils::SteadyTime::Microseconds &rtt,
utils::SteadyTime::TimePoint &now);
private:
diff --git a/libtransport/src/protocols/raaqm_data_path.cc b/libtransport/src/protocols/raaqm_data_path.cc
index d06fee918..b8e6e6285 100644
--- a/libtransport/src/protocols/raaqm_data_path.cc
+++ b/libtransport/src/protocols/raaqm_data_path.cc
@@ -50,9 +50,9 @@ RaaqmDataPath::RaaqmDataPath(double drop_factor,
alpha_(ALPHA) {}
RaaqmDataPath &RaaqmDataPath::insertNewRtt(
- const utils::SteadyTime::Milliseconds &new_rtt,
+ const utils::SteadyTime::Microseconds &new_rtt,
const utils::SteadyTime::TimePoint &now) {
- rtt_ = new_rtt.count();
+ rtt_ = new_rtt.count() / 1000;
rtt_samples_.pushBack(rtt_);
rtt_max_ = rtt_samples_.rBegin();
diff --git a/libtransport/src/protocols/raaqm_data_path.h b/libtransport/src/protocols/raaqm_data_path.h
index b6f7c5ac1..dd24dad51 100644
--- a/libtransport/src/protocols/raaqm_data_path.h
+++ b/libtransport/src/protocols/raaqm_data_path.h
@@ -49,7 +49,7 @@ class RaaqmDataPath {
* max of RTT.
* @param new_rtt is the value of the new RTT
*/
- RaaqmDataPath &insertNewRtt(const utils::SteadyTime::Milliseconds &new_rtt,
+ RaaqmDataPath &insertNewRtt(const utils::SteadyTime::Microseconds &new_rtt,
const utils::SteadyTime::TimePoint &now);
/**
diff --git a/libtransport/src/protocols/rate_estimation.cc b/libtransport/src/protocols/rate_estimation.cc
index d834b53e6..01c18c6cb 100644
--- a/libtransport/src/protocols/rate_estimation.cc
+++ b/libtransport/src/protocols/rate_estimation.cc
@@ -107,9 +107,9 @@ InterRttEstimator::~InterRttEstimator() {
}
void InterRttEstimator::onRttUpdate(
- const utils::SteadyTime::Milliseconds &rtt) {
+ const utils::SteadyTime::Microseconds &rtt) {
pthread_mutex_lock(&(this->mutex_));
- this->rtt_ = rtt.count();
+ this->rtt_ = rtt.count() / 1000.0;
this->number_of_packets_++;
this->avg_rtt_ += this->rtt_;
pthread_mutex_unlock(&(this->mutex_));
@@ -256,7 +256,7 @@ void SimpleEstimator::onDataReceived(int packet_size) {
this->total_size_ += packet_size;
}
-void SimpleEstimator::onRttUpdate(const utils::SteadyTime::Milliseconds &rtt) {
+void SimpleEstimator::onRttUpdate(const utils::SteadyTime::Microseconds &rtt) {
this->number_of_packets_++;
if (this->number_of_packets_ == this->batching_param_) {
@@ -300,9 +300,9 @@ BatchingPacketsEstimator::BatchingPacketsEstimator(double alpha_arg,
}
void BatchingPacketsEstimator::onRttUpdate(
- const utils::SteadyTime::Milliseconds &rtt) {
+ const utils::SteadyTime::Microseconds &rtt) {
this->number_of_packets_++;
- this->avg_rtt_ += rtt.count();
+ this->avg_rtt_ += rtt.count() / 1000.0;
if (number_of_packets_ == this->batching_param_) {
if (estimation_ == 0) {
diff --git a/libtransport/src/protocols/rate_estimation.h b/libtransport/src/protocols/rate_estimation.h
index b71de12e4..d809b2b7c 100644
--- a/libtransport/src/protocols/rate_estimation.h
+++ b/libtransport/src/protocols/rate_estimation.h
@@ -31,7 +31,7 @@ class IcnRateEstimator : utils::NonCopyable {
virtual ~IcnRateEstimator(){};
- virtual void onRttUpdate(const utils::SteadyTime::Milliseconds &rtt){};
+ virtual void onRttUpdate(const utils::SteadyTime::Microseconds &rtt){};
virtual void onDataReceived(int packetSize){};
@@ -66,7 +66,7 @@ class InterRttEstimator : public IcnRateEstimator {
~InterRttEstimator();
- void onRttUpdate(const utils::SteadyTime::Milliseconds &rtt);
+ void onRttUpdate(const utils::SteadyTime::Microseconds &rtt);
void onDataReceived(int packet_size) {
if (packet_size > this->max_packet_size_) {
@@ -101,7 +101,7 @@ class BatchingPacketsEstimator : public IcnRateEstimator {
public:
BatchingPacketsEstimator(double alpha_arg, int batchingParam);
- void onRttUpdate(const utils::SteadyTime::Milliseconds &rtt);
+ void onRttUpdate(const utils::SteadyTime::Microseconds &rtt);
void onDataReceived(int packet_size) {
if (packet_size > this->max_packet_size_) {
@@ -148,7 +148,7 @@ class SimpleEstimator : public IcnRateEstimator {
public:
SimpleEstimator(double alpha, int batching_param);
- void onRttUpdate(const utils::SteadyTime::Milliseconds &rtt);
+ void onRttUpdate(const utils::SteadyTime::Microseconds &rtt);
void onDataReceived(int packet_size);
diff --git a/libtransport/src/protocols/reassembly.h b/libtransport/src/protocols/reassembly.h
index b0879201d..c0c4de3d8 100644
--- a/libtransport/src/protocols/reassembly.h
+++ b/libtransport/src/protocols/reassembly.h
@@ -57,12 +57,6 @@ class Reassembly {
virtual void reassemble(utils::MemBuf &buffer, uint32_t suffix) = 0;
/**
- * Handle reassembly of manifest
- */
- virtual void reassemble(
- std::unique_ptr<core::ContentObjectManifest> &&manifest) = 0;
-
- /**
* Reset reassembler for new round
*/
virtual void reInitialize() = 0;
diff --git a/libtransport/src/protocols/rtc/probe_handler.cc b/libtransport/src/protocols/rtc/probe_handler.cc
index 6a84914ab..60eceeb19 100644
--- a/libtransport/src/protocols/rtc/probe_handler.cc
+++ b/libtransport/src/protocols/rtc/probe_handler.cc
@@ -13,6 +13,7 @@
* limitations under the License.
*/
+#include <glog/logging.h>
#include <hicn/transport/utils/chrono_typedefs.h>
#include <protocols/rtc/probe_handler.h>
#include <protocols/rtc/rtc_consts.h>
@@ -64,7 +65,7 @@ double ProbeHandler::getProbeLossRate() {
}
void ProbeHandler::setSuffixRange(uint32_t min, uint32_t max) {
- assert(min <= max && min >= MIN_PROBE_SEQ);
+ DCHECK(min <= max && min >= MIN_PROBE_SEQ);
distr_ = std::uniform_int_distribution<uint32_t>(min, max);
}
diff --git a/libtransport/src/protocols/rtc/rtc.cc b/libtransport/src/protocols/rtc/rtc.cc
index d2682edfa..9a56269f3 100644
--- a/libtransport/src/protocols/rtc/rtc.cc
+++ b/libtransport/src/protocols/rtc/rtc.cc
@@ -38,6 +38,7 @@ RTCTransportProtocol::RTCTransportProtocol(
implementation::ConsumerSocket *icn_socket)
: TransportProtocol(icn_socket, new RtcIndexer<>(icn_socket, this),
new RtcReassembly(icn_socket, this)),
+ max_aggregated_interest_(1),
number_(0) {
icn_socket->getSocketOption(PORTAL, portal_);
round_timer_ =
@@ -55,9 +56,9 @@ void RTCTransportProtocol::resume() {
TransportProtocol::resume();
}
-std::size_t RTCTransportProtocol::transportHeaderLength() {
+std::size_t RTCTransportProtocol::transportHeaderLength(bool isFEC) {
return DATA_HEADER_SIZE +
- (fec_decoder_ != nullptr ? fec_decoder_->getFecHeaderSize() : 0);
+ (fec_decoder_ != nullptr ? fec_decoder_->getFecHeaderSize(isFEC) : 0);
}
// private
@@ -75,13 +76,13 @@ void RTCTransportProtocol::initParams() {
std::shared_ptr<auth::Verifier> verifier;
socket_->getSocketOption(GeneralTransportOptions::VERIFIER, verifier);
- uint32_t unverified_interval;
- socket_->getSocketOption(GeneralTransportOptions::UNVERIFIED_INTERVAL,
- unverified_interval);
+ uint32_t factor_relevant;
+ socket_->getSocketOption(GeneralTransportOptions::MANIFEST_FACTOR_RELEVANT,
+ factor_relevant);
- double unverified_ratio;
- socket_->getSocketOption(GeneralTransportOptions::UNVERIFIED_RATIO,
- unverified_ratio);
+ uint32_t factor_alert;
+ socket_->getSocketOption(GeneralTransportOptions::MANIFEST_FACTOR_ALERT,
+ factor_alert);
rc_ = std::make_shared<RTCRateControlCongestionDetection>();
ldr_ = std::make_shared<RTCLossDetectionAndRecovery>(
@@ -100,8 +101,8 @@ void RTCTransportProtocol::initParams() {
}
});
- verifier_ = std::make_shared<RTCVerifier>(verifier, unverified_interval,
- unverified_ratio);
+ verifier_ =
+ std::make_shared<RTCVerifier>(verifier, factor_relevant, factor_alert);
state_ = std::make_shared<RTCState>(
indexer_verifier_.get(),
@@ -138,19 +139,20 @@ void RTCTransportProtocol::initParams() {
last_interest_sent_time_ = 0;
last_interest_sent_seq_ = 0;
-#if 0
- if(portal_->isConnectedToFwd()){
- max_aggregated_interest_ = 1;
- }else{
- max_aggregated_interest_ = MAX_INTERESTS_IN_BATCH;
- }
-#else
- max_aggregated_interest_ = 1;
- if (const char *max_aggr = std::getenv("MAX_AGGREGATED_INTERESTS")) {
- LOG(INFO) << "Max Aggregated: " << max_aggr;
- max_aggregated_interest_ = std::stoul(std::string(max_aggr));
+ // Aggregated interests setup
+ bool aggregated_interests_on;
+ socket_->getSocketOption(RtcTransportOptions::AGGREGATED_INTERESTS,
+ aggregated_interests_on);
+ if (aggregated_interests_on) {
+ if (const char *max_aggr = std::getenv("MAX_AGGREGATED_INTERESTS"))
+ max_aggregated_interest_ = (uint32_t)std::stoul(std::string(max_aggr));
+ else
+ max_aggregated_interest_ = MAX_INTERESTS_IN_BATCH;
+
+ max_aggregated_interest_ = std::min<uint32_t>(max_aggregated_interest_,
+ 1 + MAX_SUFFIXES_IN_MANIFEST);
}
-#endif
+ LOG(INFO) << "Max Aggregated: " << max_aggregated_interest_;
max_sent_int_ =
std::ceil((double)MAX_PACING_BATCH / (double)max_aggregated_interest_);
@@ -263,6 +265,11 @@ void RTCTransportProtocol::discoveredRtt() {
socket_->getSocketOption(RtcTransportOptions::RECOVERY_STRATEGY, strategy);
ldr_->changeRecoveryStrategy(
(interface::RtcTransportRecoveryStrategies)strategy);
+
+ bool content_sharing_mode;
+ socket_->getSocketOption(RtcTransportOptions::CONTENT_SHARING_MODE,
+ content_sharing_mode);
+ if (content_sharing_mode) ldr_->setContentSharingMode();
ldr_->turnOnRecovery();
ldr_->onNewRound(false);
@@ -270,22 +277,9 @@ void RTCTransportProtocol::discoveredRtt() {
Name *name = nullptr;
socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name);
Prefix prefix(*name, 128);
- if ((interface::RtcTransportRecoveryStrategies)strategy ==
- interface::RtcTransportRecoveryStrategies::LOW_RATE_AND_BESTPATH) {
- fwd_strategy_.initFwdStrategy(portal_, prefix, state_.get(),
- RTCForwardingStrategy::BEST_PATH);
- } else if ((interface::RtcTransportRecoveryStrategies)strategy ==
- interface::RtcTransportRecoveryStrategies::
- LOW_RATE_AND_REPLICATION) {
- fwd_strategy_.initFwdStrategy(portal_, prefix, state_.get(),
- RTCForwardingStrategy::REPLICATION);
- } else if ((interface::RtcTransportRecoveryStrategies)strategy ==
- interface::RtcTransportRecoveryStrategies::
- LOW_RATE_AND_ALL_FWD_STRATEGIES) {
- fwd_strategy_.initFwdStrategy(portal_, prefix, state_.get(),
- RTCForwardingStrategy::BOTH);
- }
-
+ fwd_strategy_.initFwdStrategy(
+ portal_, prefix, state_.get(),
+ (interface::RtcTransportRecoveryStrategies)strategy);
updateSyncWindow();
}
@@ -302,6 +296,12 @@ void RTCTransportProtocol::computeMaxSyncWindow() {
return;
}
+ bool content_sharing_mode;
+ socket_->getSocketOption(RtcTransportOptions::CONTENT_SHARING_MODE,
+ content_sharing_mode);
+ if (content_sharing_mode && (production_rate < MIN_PROD_RATE_SHARING_MODE))
+ production_rate = MIN_PROD_RATE_SHARING_MODE;
+
production_rate += (production_rate * indexer_verifier_->getMaxFecOverhead());
uint32_t lifetime = default_values::interest_lifetime;
@@ -330,6 +330,11 @@ void RTCTransportProtocol::updateSyncWindow() {
double prod_rate = state_->getProducerRate();
double rtt = (double)state_->getMinRTT() / MILLI_IN_A_SEC;
double packet_size = state_->getAveragePacketSize();
+ bool content_sharing_mode;
+ socket_->getSocketOption(RtcTransportOptions::CONTENT_SHARING_MODE,
+ content_sharing_mode);
+ if (content_sharing_mode && (prod_rate < MIN_PROD_RATE_SHARING_MODE))
+ prod_rate = MIN_PROD_RATE_SHARING_MODE;
// if some of the info are not available do not update the current win
if (prod_rate != 0.0 && rtt != 0.0 && packet_size != 0.0) {
@@ -385,6 +390,19 @@ void RTCTransportProtocol::sendProbeInterest(uint32_t seq) {
sendInterest(*interest_name);
}
+void RTCTransportProtocol::sendInterestForTimeout(uint32_t seq) {
+ if (!isRunning() && !is_first_) return;
+
+ Name *interest_name = nullptr;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
+ &interest_name);
+
+ // we got a timeout for this packet so it is not pending anymore
+ interest_name->setSuffix(seq);
+ state_->onSendNewInterest(interest_name);
+ sendInterest(*interest_name);
+}
+
void RTCTransportProtocol::scheduleNextInterests() {
DLOG_IF(INFO, VLOG_IS_ON(3)) << "Schedule next interests";
@@ -475,9 +493,9 @@ void RTCTransportProtocol::scheduleNextInterests() {
}
// skip received packets
- if (indexer_verifier_->checkNextSuffix() <=
- state_->getHighestSeqReceivedInOrder()) {
- indexer_verifier_->jumpToIndex(state_->getHighestSeqReceivedInOrder() + 1);
+ uint32_t max_received = state_->getHighestSeqReceivedInOrder();
+ if (indexer_verifier_->checkNextSuffix() <= max_received) {
+ indexer_verifier_->jumpToIndex(max_received + 1);
}
uint32_t sent_interests = 0;
@@ -495,7 +513,6 @@ void RTCTransportProtocol::scheduleNextInterests() {
<< "In while loop. Window size: " << current_sync_win_;
uint32_t next_seg = indexer_verifier_->getNextSuffix();
-
name->setSuffix(next_seg);
// send the packet only if:
@@ -586,7 +603,6 @@ void RTCTransportProtocol::onInterestTimeout(Interest::Ptr &interest,
}
timeouts_or_nacks_.insert(segment_number);
-
if (TRANSPORT_EXPECT_TRUE(state_->isProducerActive()) &&
segment_number <= state_->getHighestSeqReceived()) {
// we retransmit packets only if the producer is active, otherwise we
@@ -627,11 +643,11 @@ void RTCTransportProtocol::onInterestTimeout(Interest::Ptr &interest,
<< "On timeout next seg = " << indexer_verifier_->checkNextSuffix()
<< ", jump to " << segment_number;
// add an extra space in the window
- current_sync_win_++;
indexer_verifier_->jumpToIndex(segment_number);
}
state_->onTimeout(segment_number, false);
+ sendInterestForTimeout(segment_number);
scheduleNextInterests();
}
@@ -672,7 +688,6 @@ void RTCTransportProtocol::onNack(const ContentObject &content_object) {
if (tn_it != timeouts_or_nacks_.end()) timeouts_or_nacks_.erase(tn_it);
state_->onJumpForward(production_seg);
- verifier_->onJumpForward(production_seg);
// the client is asking for content in the past
// switch to catch up state and increase the window
// this is true only if the packet is not an RTX
@@ -821,7 +836,8 @@ void RTCTransportProtocol::onContentObjectReceived(
// Check if the packet is a retransmission
if (ldr_->isRtx(segment_number) && state != PacketState::RECEIVED) {
if (is_data || is_manifest) {
- state_->onPacketRecoveredRtx(segment_number);
+ uint64_t rtt = ldr_->getRtxRtt(segment_number);
+ state_->onPacketRecoveredRtx(content_object, rtt);
if (*on_content_object_input_) {
(*on_content_object_input_)(*socket_->getInterface(), content_object);
@@ -842,7 +858,7 @@ void RTCTransportProtocol::onContentObjectReceived(
}
if (is_fec) {
- state_->onFecPacketRecoveredRtx(segment_number);
+ state_->onFecPacketRecoveredRtx(content_object);
}
}
@@ -920,7 +936,7 @@ void RTCTransportProtocol::sendStatsToApp(
stats_->updateAverageWindowSize(state_->getPendingInterestNumber());
stats_->updateLossRatio(state_->getPerSecondLossRate());
uint64_t rtt = state_->getAvgRTT();
- stats_->updateAverageRtt(utils::SteadyTime::Milliseconds(rtt));
+ stats_->updateAverageRtt(utils::SteadyTime::Microseconds(rtt * 1000));
stats_->updateQueuingDelay(state_->getQueuing());
stats_->updateLostData(lost_data);
@@ -960,9 +976,10 @@ void RTCTransportProtocol::decodePacket(ContentObject &content_object,
DLOG_IF(INFO, VLOG_IS_ON(4))
<< "Send packet " << content_object.getName() << " to FEC decoder";
- uint32_t offset = is_manifest
- ? content_object.headerSize()
- : content_object.headerSize() + rtc::DATA_HEADER_SIZE;
+ uint32_t offset =
+ is_manifest
+ ? (uint32_t)content_object.headerSize()
+ : (uint32_t)(content_object.headerSize() + rtc::DATA_HEADER_SIZE);
uint32_t metadata = static_cast<uint32_t>(content_object.getPayloadType());
fec_decoder_->onDataPacket(content_object, offset, metadata);
@@ -1016,7 +1033,7 @@ void RTCTransportProtocol::onFecPackets(fec::BufferArray &packets) {
processManifest(*interest, *content_object);
}
- state_->onPacketRecoveredFec(seq_number, buffer->length());
+ state_->onPacketRecoveredFec(seq_number, (uint32_t)buffer->length());
ldr_->onPacketRecoveredFec(seq_number);
if (payload_type == PayloadType::DATA) {
@@ -1038,11 +1055,11 @@ void RTCTransportProtocol::processManifest(Interest &interest,
ContentObject::Ptr RTCTransportProtocol::removeFecHeader(
const ContentObject &content_object) {
- if (!fec_decoder_ || !fec_decoder_->getFecHeaderSize()) {
+ if (!fec_decoder_ || !fec_decoder_->getFecHeaderSize(false)) {
return nullptr;
}
- size_t fec_header_size = fec_decoder_->getFecHeaderSize();
+ size_t fec_header_size = fec_decoder_->getFecHeaderSize(false);
const uint8_t *payload =
content_object.data() + content_object.headerSize() + fec_header_size;
size_t payload_size = content_object.payloadSize() - fec_header_size;
diff --git a/libtransport/src/protocols/rtc/rtc.h b/libtransport/src/protocols/rtc/rtc.h
index 3763f33c7..a8a474216 100644
--- a/libtransport/src/protocols/rtc/rtc.h
+++ b/libtransport/src/protocols/rtc/rtc.h
@@ -44,7 +44,7 @@ class RTCTransportProtocol : public TransportProtocol {
void resume() override;
- std::size_t transportHeaderLength() override;
+ std::size_t transportHeaderLength(bool isFEC) override;
auto shared_from_this() { return utils::shared_from(this); }
@@ -69,6 +69,7 @@ class RTCTransportProtocol : public TransportProtocol {
// packet functions
void sendRtxInterest(uint32_t seq);
void sendProbeInterest(uint32_t seq);
+ void sendInterestForTimeout(uint32_t seq);
void scheduleNextInterests() override;
void onInterestTimeout(Interest::Ptr &interest, const Name &name) override;
void onNack(const ContentObject &content_object);
diff --git a/libtransport/src/protocols/rtc/rtc_consts.h b/libtransport/src/protocols/rtc/rtc_consts.h
index 96e39d07e..29b5a3a12 100644
--- a/libtransport/src/protocols/rtc/rtc_consts.h
+++ b/libtransport/src/protocols/rtc/rtc_consts.h
@@ -54,7 +54,7 @@ const uint32_t PACING_WAIT = 1000; // usec to wait betwing two pacing batch. As
const uint32_t MAX_RTX_IN_BATCH = 10; // max rtx to send in loop
// packet const
-const uint32_t RTC_INTEREST_LIFETIME = 2000;
+const uint32_t RTC_INTEREST_LIFETIME = 4000;
// probes sequence range
const uint32_t MIN_PROBE_SEQ = 0xefffffff;
@@ -93,6 +93,7 @@ const double CATCH_UP_WIN_INCREMENT = 1.2;
// used in rate control
const double WIN_DECREASE_FACTOR = 0.5;
const double WIN_INCREASE_FACTOR = 1.5;
+const uint32_t MIN_PROD_RATE_SHARING_MODE = 125000; // 1Mbps in bytes
// round in congestion
const double ROUNDS_BEFORE_TAKE_ACTION = 5;
@@ -120,14 +121,14 @@ const uint64_t MAX_TIMER_RTX = ~0;
const uint32_t SENTINEL_TIMER_INTERVAL = 100; // ms
const uint32_t MAX_RTX_WITH_SENTINEL = 10; // packets
const double CATCH_UP_RTT_INCREMENT = 1.2;
-const double MAX_RESIDUAL_LOSS_RATE = 2.0; // %
-const uint32_t WAIT_BEFORE_FEC_UPDATE = ROUNDS_PER_SEC * 5;
+const double MAX_RESIDUAL_LOSS_RATE = 1.0; // %
+const uint32_t WAIT_BEFORE_FEC_UPDATE = ROUNDS_PER_SEC;
+const uint32_t MAX_RTT_BEFORE_FEC = 60; // ms
// used by producer
const uint32_t PRODUCER_STATS_INTERVAL = 200; // ms
const uint32_t MIN_PRODUCTION_RATE = 25; // pps, equal to min window *
// rounds in a second
-const uint32_t NACK_DELAY = 1500; // ms
const uint32_t FEC_PACING_TIME = 5; // ms
// aggregated data consts
@@ -139,6 +140,73 @@ const uint32_t AGGREGATED_PACKETS_TIMER = 2; // ms
const uint32_t MAX_RTT = 200; // ms
const double MAX_RESIDUAL_LOSSES = 0.05; // %
+const uint8_t FEC_MATRIX[64][10] = {
+ {1, 2, 2, 2, 3, 3, 4, 5, 5, 6}, // k = 1
+ {1, 2, 3, 3, 4, 5, 5, 6, 7, 9},
+ {2, 2, 3, 4, 5, 6, 7, 8, 9, 11},
+ {2, 3, 4, 5, 5, 7, 8, 9, 11, 13},
+ {2, 3, 4, 5, 6, 7, 9, 10, 12, 14}, // k = 5
+ {2, 3, 4, 6, 7, 8, 10, 12, 14, 16},
+ {2, 4, 5, 6, 8, 9, 11, 13, 15, 18},
+ {3, 4, 5, 7, 8, 10, 12, 14, 16, 19},
+ {3, 4, 6, 7, 9, 11, 13, 15, 18, 21},
+ {3, 4, 6, 8, 9, 11, 14, 16, 19, 23}, // k = 10
+ {3, 5, 6, 8, 10, 12, 14, 17, 20, 24},
+ {3, 5, 7, 8, 10, 13, 15, 18, 21, 26},
+ {3, 5, 7, 9, 11, 13, 16, 19, 23, 27},
+ {3, 5, 7, 9, 12, 14, 17, 20, 24, 28},
+ {4, 6, 8, 10, 12, 15, 18, 21, 25, 30}, // k = 15
+ {4, 6, 8, 10, 13, 15, 19, 22, 26, 31},
+ {4, 6, 8, 11, 13, 16, 19, 23, 27, 33},
+ {4, 6, 9, 11, 14, 17, 20, 24, 29, 34},
+ {4, 6, 9, 11, 14, 17, 21, 25, 30, 35},
+ {4, 7, 9, 12, 15, 18, 22, 26, 31, 37}, // k = 20
+ {4, 7, 9, 12, 15, 19, 22, 27, 32, 38},
+ {4, 7, 10, 13, 16, 19, 23, 28, 33, 40},
+ {5, 7, 10, 13, 16, 20, 24, 29, 34, 41},
+ {5, 7, 10, 13, 17, 20, 25, 30, 35, 42},
+ {5, 8, 11, 14, 17, 21, 26, 31, 37, 44}, // k = 25
+ {5, 8, 11, 14, 18, 22, 26, 31, 38, 45},
+ {5, 8, 11, 15, 18, 22, 27, 32, 39, 46},
+ {5, 8, 11, 15, 19, 23, 28, 33, 40, 48},
+ {5, 8, 12, 15, 19, 24, 28, 34, 41, 49},
+ {5, 9, 12, 16, 20, 24, 29, 35, 42, 50}, // k = 30
+ {5, 9, 12, 16, 20, 25, 30, 36, 43, 51},
+ {5, 9, 13, 16, 21, 25, 31, 37, 44, 53},
+ {6, 9, 13, 17, 21, 26, 31, 38, 45, 54},
+ {6, 9, 13, 17, 22, 26, 32, 39, 46, 55},
+ {6, 10, 13, 17, 22, 27, 33, 40, 47, 57}, // k = 35
+ {6, 10, 14, 18, 22, 28, 34, 40, 48, 58},
+ {6, 10, 14, 18, 23, 28, 34, 41, 49, 59},
+ {6, 10, 14, 19, 23, 29, 35, 42, 50, 60},
+ {6, 10, 14, 19, 24, 29, 36, 43, 52, 62},
+ {6, 10, 15, 19, 24, 30, 36, 44, 53, 63}, // k = 40
+ {6, 11, 15, 20, 25, 31, 37, 45, 54, 64},
+ {6, 11, 15, 20, 25, 31, 38, 46, 55, 65},
+ {7, 11, 15, 20, 26, 32, 39, 46, 56, 67},
+ {7, 11, 16, 21, 26, 32, 39, 47, 57, 68},
+ {7, 11, 16, 21, 27, 33, 40, 48, 58, 69}, // k = 45
+ {7, 11, 16, 21, 27, 33, 41, 49, 59, 70},
+ {7, 12, 16, 22, 27, 34, 41, 50, 60, 72},
+ {7, 12, 17, 22, 28, 34, 42, 51, 61, 73},
+ {7, 12, 17, 22, 28, 35, 43, 52, 62, 74},
+ {7, 12, 17, 23, 29, 36, 43, 52, 63, 75}, // k = 50
+ {7, 12, 17, 23, 29, 36, 44, 53, 64, 77},
+ {7, 12, 18, 23, 30, 37, 45, 54, 65, 78},
+ {7, 13, 18, 24, 30, 37, 45, 55, 66, 79},
+ {8, 13, 18, 24, 31, 38, 46, 56, 67, 80},
+ {8, 13, 18, 24, 31, 38, 47, 57, 68, 82}, // k = 55
+ {8, 13, 19, 25, 31, 39, 47, 57, 69, 83},
+ {8, 13, 19, 25, 32, 39, 48, 58, 70, 84},
+ {8, 13, 19, 25, 32, 40, 49, 59, 71, 85},
+ {8, 14, 19, 26, 33, 41, 50, 60, 72, 86},
+ {8, 14, 20, 26, 33, 41, 50, 61, 73, 88}, // k = 60
+ {8, 14, 20, 26, 34, 42, 51, 61, 74, 89},
+ {8, 14, 20, 27, 34, 42, 52, 62, 75, 90},
+ {8, 14, 20, 27, 34, 43, 52, 63, 76, 91},
+ {8, 14, 21, 27, 35, 43, 53, 64, 77, 92}, // k = 64
+};
+
} // namespace rtc
} // namespace protocol
diff --git a/libtransport/src/protocols/rtc/rtc_data_path.cc b/libtransport/src/protocols/rtc/rtc_data_path.cc
index b3abf5ea8..a421396b1 100644
--- a/libtransport/src/protocols/rtc/rtc_data_path.cc
+++ b/libtransport/src/protocols/rtc/rtc_data_path.cc
@@ -91,6 +91,8 @@ void RTCDataPath::insertRttSample(
rtt_samples_ = 0;
last_avg_rtt_compute_ = now;
}
+
+ received_packets_++;
}
void RTCDataPath::insertOwdSample(int64_t owd) {
@@ -115,10 +117,6 @@ void RTCDataPath::insertOwdSample(int64_t owd) {
int64_t diff = std::abs(owd - last_owd_);
last_owd_ = owd;
jitter_ += (1.0 / 16.0) * ((double)diff - jitter_);
-
- // owd is computed only for valid data packets so we count only
- // this for decide if we recevie traffic or not
- received_packets_++;
}
void RTCDataPath::computeInterArrivalGap(uint32_t segment_number) {
@@ -150,12 +148,17 @@ double RTCDataPath::getInterArrivalGap() {
return avg_inter_arrival_;
}
-bool RTCDataPath::isActive() {
+bool RTCDataPath::isValidProducer() {
if (received_nacks_ && rounds_without_packets_ < MAX_ROUNDS_WITHOUT_PKTS)
return true;
return false;
}
+bool RTCDataPath::isActive() {
+ if (rounds_without_packets_ < MAX_ROUNDS_WITHOUT_PKTS) return true;
+ return false;
+}
+
bool RTCDataPath::pathToProducer() {
if (received_nacks_) return true;
return false;
diff --git a/libtransport/src/protocols/rtc/rtc_data_path.h b/libtransport/src/protocols/rtc/rtc_data_path.h
index 5afbbb87f..ba5201fe8 100644
--- a/libtransport/src/protocols/rtc/rtc_data_path.h
+++ b/libtransport/src/protocols/rtc/rtc_data_path.h
@@ -49,8 +49,9 @@ class RTCDataPath {
double getQueuingDealy();
double getInterArrivalGap();
double getJitter();
- bool isActive();
- bool pathToProducer();
+ bool isActive(); // pakets recevied from this path in the last rounds
+ bool pathToProducer(); // path from a producer
+ bool isValidProducer(); // path from a producer that is also active
uint64_t getLastPacketTS();
uint32_t getPacketsLastRound();
diff --git a/libtransport/src/protocols/rtc/rtc_forwarding_strategy.cc b/libtransport/src/protocols/rtc/rtc_forwarding_strategy.cc
index c6bc751e6..4bbd7eac0 100644
--- a/libtransport/src/protocols/rtc/rtc_forwarding_strategy.cc
+++ b/libtransport/src/protocols/rtc/rtc_forwarding_strategy.cc
@@ -14,6 +14,7 @@
*/
#include <hicn/transport/interfaces/notification.h>
+#include <protocols/rtc/rtc_consts.h>
#include <protocols/rtc/rtc_forwarding_strategy.h>
namespace transport {
@@ -24,8 +25,13 @@ namespace rtc {
using namespace transport::interface;
+const double FWD_MAX_QUEUE = 30.0; // ms
+const double FWD_MAX_RTT = MAX_RTT_BEFORE_FEC; // ms
+const double FWD_MAX_LOSS_RATE = 0.1;
+
RTCForwardingStrategy::RTCForwardingStrategy()
- : init_(false),
+ : low_rate_app_(false),
+ init_(false),
forwarder_set_(false),
selected_strategy_(NONE),
current_strategy_(NONE),
@@ -42,17 +48,56 @@ void RTCForwardingStrategy::setCallback(
void RTCForwardingStrategy::initFwdStrategy(
std::shared_ptr<core::Portal> portal, core::Prefix& prefix, RTCState* state,
- strategy_t strategy) {
- init_ = true;
- selected_strategy_ = strategy;
- if (strategy == BOTH)
- current_strategy_ = BEST_PATH;
- else
- current_strategy_ = strategy;
- rounds_since_last_set_ = 0;
- prefix_ = prefix;
- portal_ = portal;
- state_ = state;
+ interface::RtcTransportRecoveryStrategies strategy) {
+ switch (strategy) {
+ case interface::RtcTransportRecoveryStrategies::LOW_RATE_AND_BESTPATH:
+ init_ = true;
+ low_rate_app_ = true;
+ selected_strategy_ = BEST_PATH;
+ current_strategy_ = BEST_PATH;
+ break;
+ case interface::RtcTransportRecoveryStrategies::LOW_RATE_AND_REPLICATION:
+ init_ = true;
+ low_rate_app_ = true;
+ selected_strategy_ = REPLICATION;
+ current_strategy_ = REPLICATION;
+ break;
+ case interface::RtcTransportRecoveryStrategies::
+ LOW_RATE_AND_ALL_FWD_STRATEGIES:
+ init_ = true;
+ low_rate_app_ = true;
+ selected_strategy_ = BEST_PATH;
+ current_strategy_ = BEST_PATH;
+ break;
+ case interface::RtcTransportRecoveryStrategies::DELAY_AND_BESTPATH:
+ init_ = true;
+ low_rate_app_ = false;
+ selected_strategy_ = BEST_PATH;
+ current_strategy_ = BEST_PATH;
+ break;
+ case interface::RtcTransportRecoveryStrategies::DELAY_AND_REPLICATION:
+ init_ = true;
+ low_rate_app_ = false;
+ selected_strategy_ = REPLICATION;
+ current_strategy_ = REPLICATION;
+ break;
+ case interface::RtcTransportRecoveryStrategies::RECOVERY_OFF:
+ case interface::RtcTransportRecoveryStrategies::RTX_ONLY:
+ case interface::RtcTransportRecoveryStrategies::FEC_ONLY:
+ case interface::RtcTransportRecoveryStrategies::DELAY_BASED:
+ case interface::RtcTransportRecoveryStrategies::LOW_RATE:
+ case interface::RtcTransportRecoveryStrategies::FEC_ONLY_LOW_RES_LOSSES:
+ default:
+ // fwd strategies are not used
+ init_ = false;
+ }
+
+ if (init_) {
+ rounds_since_last_set_ = 0;
+ prefix_ = prefix;
+ portal_ = portal;
+ state_ = state;
+ }
}
void RTCForwardingStrategy::checkStrategy() {
@@ -99,16 +144,35 @@ void RTCForwardingStrategy::checkStrategyBestPath() {
return;
}
- uint8_t qs = state_->getQualityScore();
+ if (low_rate_app_) {
+ // this is used for gaming
+ uint8_t qs = state_->getQualityScore();
- if (qs >= 4 || rounds_since_last_set_ < 25) { // wait a least 5 sec
- // between each switch
- rounds_since_last_set_++;
- return;
- }
+ if (qs >= 4 || rounds_since_last_set_ < 25) { // wait a least 5 sec
+ // between each switch
+ rounds_since_last_set_++;
+ return;
+ }
- // try to switch path
- setStrategy(BEST_PATH);
+ // try to switch path
+ setStrategy(BEST_PATH);
+ } else {
+ if (rounds_since_last_set_ < 25) { // wait a least 5 sec
+ // between each switch
+ rounds_since_last_set_++;
+ return;
+ }
+
+ double queue = state_->getQueuing();
+ double rtt = state_->getAvgRTT();
+ double loss_rate = state_->getPerSecondLossRate();
+
+ if (queue >= FWD_MAX_QUEUE || rtt >= FWD_MAX_RTT ||
+ loss_rate > FWD_MAX_LOSS_RATE) {
+ // try to switch path
+ setStrategy(BEST_PATH);
+ }
+ }
}
void RTCForwardingStrategy::checkStrategyReplication() {
@@ -133,7 +197,7 @@ void RTCForwardingStrategy::checkStrategyBoth() {
// TODO
// for the moment we use only best path.
- // but later:
+ // for later:
// 1. if both paths are bad use replication
// 2. while using replication compute the effectiveness. if the majority of
// the packets are coming from a single path, try to use bestpath
diff --git a/libtransport/src/protocols/rtc/rtc_forwarding_strategy.h b/libtransport/src/protocols/rtc/rtc_forwarding_strategy.h
index 9825877fd..c2227e09f 100644
--- a/libtransport/src/protocols/rtc/rtc_forwarding_strategy.h
+++ b/libtransport/src/protocols/rtc/rtc_forwarding_strategy.h
@@ -41,7 +41,7 @@ class RTCForwardingStrategy {
void initFwdStrategy(std::shared_ptr<core::Portal> portal,
core::Prefix& prefix, RTCState* state,
- strategy_t strategy);
+ interface::RtcTransportRecoveryStrategies strategy);
void checkStrategy();
void setCallback(interface::StrategyCallback&& callback);
@@ -56,6 +56,10 @@ class RTCForwardingStrategy {
std::array<std::string, 4> string_strategies_ = {"bestpath", "replication",
"both", "none"};
+ bool low_rate_app_; // if set to true the best path strategy will
+ // trigger a path switch based on the quality
+ // score, otherwise it will use the RTT,
+ // queuing delay and loss rate
bool init_; // true if all val are initializes
bool forwarder_set_; // true if the strategy is been set at least
// once
diff --git a/libtransport/src/protocols/rtc/rtc_ldr.cc b/libtransport/src/protocols/rtc/rtc_ldr.cc
index abf6cda2c..6e88a8636 100644
--- a/libtransport/src/protocols/rtc/rtc_ldr.cc
+++ b/libtransport/src/protocols/rtc/rtc_ldr.cc
@@ -37,16 +37,24 @@ RTCLossDetectionAndRecovery::RTCLossDetectionAndRecovery(
interface::RtcTransportRecoveryStrategies type,
RecoveryStrategy::SendRtxCallback &&callback,
interface::StrategyCallback &&external_callback) {
- rs_type_ = type;
if (type == interface::RtcTransportRecoveryStrategies::RECOVERY_OFF) {
rs_ = std::make_shared<RecoveryStrategyRecoveryOff>(
- indexer, std::move(callback), io_service, std::move(external_callback));
- } else if (type == interface::RtcTransportRecoveryStrategies::DELAY_BASED) {
+ indexer, std::move(callback), io_service, type,
+ std::move(external_callback));
+ } else if (type == interface::RtcTransportRecoveryStrategies::DELAY_BASED ||
+ type == interface::RtcTransportRecoveryStrategies::
+ DELAY_AND_BESTPATH ||
+ type == interface::RtcTransportRecoveryStrategies::
+ DELAY_AND_REPLICATION) {
rs_ = std::make_shared<RecoveryStrategyDelayBased>(
- indexer, std::move(callback), io_service, std::move(external_callback));
- } else if (type == interface::RtcTransportRecoveryStrategies::FEC_ONLY) {
+ indexer, std::move(callback), io_service, type,
+ std::move(external_callback));
+ } else if (type == interface::RtcTransportRecoveryStrategies::FEC_ONLY ||
+ type == interface::RtcTransportRecoveryStrategies::
+ FEC_ONLY_LOW_RES_LOSSES) {
rs_ = std::make_shared<RecoveryStrategyFecOnly>(
- indexer, std::move(callback), io_service, std::move(external_callback));
+ indexer, std::move(callback), io_service, type,
+ std::move(external_callback));
} else if (type == interface::RtcTransportRecoveryStrategies::LOW_RATE ||
type == interface::RtcTransportRecoveryStrategies::
LOW_RATE_AND_BESTPATH ||
@@ -55,12 +63,14 @@ RTCLossDetectionAndRecovery::RTCLossDetectionAndRecovery(
type == interface::RtcTransportRecoveryStrategies::
LOW_RATE_AND_ALL_FWD_STRATEGIES) {
rs_ = std::make_shared<RecoveryStrategyLowRate>(
- indexer, std::move(callback), io_service, std::move(external_callback));
+ indexer, std::move(callback), io_service, type,
+ std::move(external_callback));
} else {
// default
- rs_type_ = interface::RtcTransportRecoveryStrategies::RTX_ONLY;
+ type = interface::RtcTransportRecoveryStrategies::RTX_ONLY;
rs_ = std::make_shared<RecoveryStrategyRtxOnly>(
- indexer, std::move(callback), io_service, std::move(external_callback));
+ indexer, std::move(callback), io_service, type,
+ std::move(external_callback));
}
}
@@ -68,15 +78,21 @@ RTCLossDetectionAndRecovery::~RTCLossDetectionAndRecovery() {}
void RTCLossDetectionAndRecovery::changeRecoveryStrategy(
interface::RtcTransportRecoveryStrategies type) {
- if (type == rs_type_) return;
+ if (type == rs_->getType()) return;
- rs_type_ = type;
+ rs_->updateType(type);
if (type == interface::RtcTransportRecoveryStrategies::RECOVERY_OFF) {
rs_ =
std::make_shared<RecoveryStrategyRecoveryOff>(std::move(*(rs_.get())));
- } else if (type == interface::RtcTransportRecoveryStrategies::DELAY_BASED) {
+ } else if (type == interface::RtcTransportRecoveryStrategies::DELAY_BASED ||
+ type == interface::RtcTransportRecoveryStrategies::
+ DELAY_AND_BESTPATH ||
+ type == interface::RtcTransportRecoveryStrategies::
+ DELAY_AND_REPLICATION) {
rs_ = std::make_shared<RecoveryStrategyDelayBased>(std::move(*(rs_.get())));
- } else if (type == interface::RtcTransportRecoveryStrategies::FEC_ONLY) {
+ } else if (type == interface::RtcTransportRecoveryStrategies::FEC_ONLY ||
+ type == interface::RtcTransportRecoveryStrategies::
+ FEC_ONLY_LOW_RES_LOSSES) {
rs_ = std::make_shared<RecoveryStrategyFecOnly>(std::move(*(rs_.get())));
} else if (type == interface::RtcTransportRecoveryStrategies::LOW_RATE ||
type == interface::RtcTransportRecoveryStrategies::
@@ -116,14 +132,15 @@ bool RTCLossDetectionAndRecovery::onDataPacketReceived(
uint32_t seq = content_object.getName().getSuffix();
bool is_rtx = rs_->isRtx(seq);
rs_->receivedPacket(seq);
+ bool ret = false;
DLOG_IF(INFO, VLOG_IS_ON(3))
<< "received data. add from "
- << rs_->getState()->getHighestSeqReceivedInOrder() + 1 << " to " << seq;
+ << rs_->getState()->getHighestSeqReceived() + 1 << " to " << seq;
if (!is_rtx)
- return detectLoss(rs_->getState()->getHighestSeqReceivedInOrder() + 1, seq,
- false);
+ ret = detectLoss(rs_->getState()->getHighestSeqReceived() + 1, seq, false);
- return false;
+ rs_->getState()->updateHighestSeqReceived(seq);
+ return ret;
}
bool RTCLossDetectionAndRecovery::onNackPacketReceived(
@@ -141,10 +158,9 @@ bool RTCLossDetectionAndRecovery::onNackPacketReceived(
// may got lost and we should ask them
rs_->receivedPacket(seq);
- DLOG_IF(INFO, VLOG_IS_ON(3))
- << "received nack. add from "
- << rs_->getState()->getHighestSeqReceivedInOrder() + 1 << " to "
- << production_seq;
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "received nack. add from "
+ << rs_->getState()->getHighestSeqReceived() + 1
+ << " to " << production_seq;
// if it is a future nack store it in the list set of nacked seq
if (production_seq <= seq) rs_->receivedFutureNack(seq);
@@ -152,7 +168,7 @@ bool RTCLossDetectionAndRecovery::onNackPacketReceived(
// call the detectLoss function using the probe flag = true. in fact the
// losses detected using nacks are the same as the one detected using probes,
// we should not increase the loss counter
- return detectLoss(rs_->getState()->getHighestSeqReceivedInOrder() + 1,
+ return detectLoss(rs_->getState()->getHighestSeqReceived() + 1,
production_seq, true);
}
@@ -164,12 +180,11 @@ bool RTCLossDetectionAndRecovery::onProbePacketReceived(
uint32_t production_seq = RTCState::getProbeParams(probe).prod_seg;
- DLOG_IF(INFO, VLOG_IS_ON(3))
- << "received probe. add from "
- << rs_->getState()->getHighestSeqReceivedInOrder() + 1 << " to "
- << production_seq;
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "received probe. add from "
+ << rs_->getState()->getHighestSeqReceived() + 1
+ << " to " << production_seq;
- return detectLoss(rs_->getState()->getHighestSeqReceivedInOrder() + 1,
+ return detectLoss(rs_->getState()->getHighestSeqReceived() + 1,
production_seq, true);
}
@@ -183,8 +198,8 @@ bool RTCLossDetectionAndRecovery::detectLoss(uint32_t start, uint32_t stop,
}
// skip received or lost packets
- if (start <= rs_->getState()->getHighestSeqReceivedInOrder()) {
- start = rs_->getState()->getHighestSeqReceivedInOrder() + 1;
+ if (start <= rs_->getState()->getHighestSeqReceived()) {
+ start = rs_->getState()->getHighestSeqReceived() + 1;
}
bool loss_detected = false;
diff --git a/libtransport/src/protocols/rtc/rtc_ldr.h b/libtransport/src/protocols/rtc/rtc_ldr.h
index 7f683eaa6..24f22ffed 100644
--- a/libtransport/src/protocols/rtc/rtc_ldr.h
+++ b/libtransport/src/protocols/rtc/rtc_ldr.h
@@ -47,6 +47,7 @@ class RTCLossDetectionAndRecovery
void setFecParams(uint32_t n, uint32_t k) { rs_->setFecParams(n, k); }
+ void setContentSharingMode() { rs_->setContentSharingMode(); }
void turnOnRecovery() { rs_->turnOnRecovery(); }
bool isRtxOn() { return rs_->isRtxOn(); }
@@ -68,11 +69,12 @@ class RTCLossDetectionAndRecovery
return rs_->isPossibleLossWithNoRtx(seq);
}
+ uint64_t getRtxRtt(uint32_t seq) { return rs_->getRtxRtt(seq); }
+
private:
// returns true if a loss is detected, false otherwise
bool detectLoss(uint32_t start, uint32_t stop, bool recv_probe);
- interface::RtcTransportRecoveryStrategies rs_type_;
std::shared_ptr<RecoveryStrategy> rs_;
};
diff --git a/libtransport/src/protocols/rtc/rtc_packet.h b/libtransport/src/protocols/rtc/rtc_packet.h
index 391aedfc6..ffbbd78fd 100644
--- a/libtransport/src/protocols/rtc/rtc_packet.h
+++ b/libtransport/src/protocols/rtc/rtc_packet.h
@@ -52,6 +52,8 @@
#include <hicn/transport/portability/win_portability.h>
#endif
+#include <hicn/transport/portability/endianess.h>
+
#include <cstring>
namespace transport {
@@ -60,24 +62,6 @@ namespace protocol {
namespace rtc {
-inline uint64_t _ntohll(const uint64_t *input) {
- uint64_t return_val;
- uint8_t *tmp = (uint8_t *)&return_val;
-
- tmp[0] = (uint8_t)(*input >> 56);
- tmp[1] = (uint8_t)(*input >> 48);
- tmp[2] = (uint8_t)(*input >> 40);
- tmp[3] = (uint8_t)(*input >> 32);
- tmp[4] = (uint8_t)(*input >> 24);
- tmp[5] = (uint8_t)(*input >> 16);
- tmp[6] = (uint8_t)(*input >> 8);
- tmp[7] = (uint8_t)(*input >> 0);
-
- return return_val;
-}
-
-inline uint64_t _htonll(const uint64_t *input) { return (_ntohll(input)); }
-
const uint32_t DATA_HEADER_SIZE = 12; // bytes
// XXX: sizeof(data_packet_t) is 16
// beacuse of padding
@@ -87,11 +71,19 @@ struct data_packet_t {
uint64_t timestamp;
uint32_t prod_rate;
- inline uint64_t getTimestamp() const { return _ntohll(&timestamp); }
- inline void setTimestamp(uint64_t time) { timestamp = _htonll(&time); }
+ inline uint64_t getTimestamp() const {
+ return portability::net_to_host(timestamp);
+ }
+ inline void setTimestamp(uint64_t time) {
+ timestamp = portability::host_to_net(time);
+ }
- inline uint32_t getProductionRate() const { return ntohl(prod_rate); }
- inline void setProductionRate(uint32_t rate) { prod_rate = htonl(rate); }
+ inline uint32_t getProductionRate() const {
+ return portability::net_to_host(prod_rate);
+ }
+ inline void setProductionRate(uint32_t rate) {
+ prod_rate = portability::host_to_net(rate);
+ }
};
struct nack_packet_t {
@@ -99,14 +91,26 @@ struct nack_packet_t {
uint32_t prod_rate;
uint32_t prod_seg;
- inline uint64_t getTimestamp() const { return _ntohll(&timestamp); }
- inline void setTimestamp(uint64_t time) { timestamp = _htonll(&time); }
+ inline uint64_t getTimestamp() const {
+ return portability::net_to_host(timestamp);
+ }
+ inline void setTimestamp(uint64_t time) {
+ timestamp = portability::host_to_net(time);
+ }
- inline uint32_t getProductionRate() const { return ntohl(prod_rate); }
- inline void setProductionRate(uint32_t rate) { prod_rate = htonl(rate); }
+ inline uint32_t getProductionRate() const {
+ return portability::net_to_host(prod_rate);
+ }
+ inline void setProductionRate(uint32_t rate) {
+ prod_rate = portability::host_to_net(rate);
+ }
- inline uint32_t getProductionSegment() const { return ntohl(prod_seg); }
- inline void setProductionSegment(uint32_t seg) { prod_seg = htonl(seg); }
+ inline uint32_t getProductionSegment() const {
+ return portability::net_to_host(prod_seg);
+ }
+ inline void setProductionSegment(uint32_t seg) {
+ prod_seg = portability::host_to_net(seg);
+ }
};
class AggrPktHeader {
@@ -225,7 +229,7 @@ class AggrPktHeader {
return (uint16_t) * (buf_ + pkt_index);
} else { // 16 bits
uint16_t *buf_16 = (uint16_t *)buf_;
- return ntohs(*(buf_16 + pkt_index));
+ return portability::net_to_host(*(buf_16 + pkt_index));
}
}
@@ -235,7 +239,7 @@ class AggrPktHeader {
*(buf_ + pkt_index) = (uint8_t)len;
} else { // 16 bits
uint16_t *buf_16 = (uint16_t *)buf_;
- *(buf_16 + pkt_index) = htons(len);
+ *(buf_16 + pkt_index) = portability::host_to_net(len);
}
}
diff --git a/libtransport/src/protocols/rtc/rtc_reassembly.cc b/libtransport/src/protocols/rtc/rtc_reassembly.cc
index 992bab50e..b1b0fcaba 100644
--- a/libtransport/src/protocols/rtc/rtc_reassembly.cc
+++ b/libtransport/src/protocols/rtc/rtc_reassembly.cc
@@ -40,7 +40,7 @@ void RtcReassembly::reassemble(core::ContentObject& content_object) {
auto read_buffer = content_object.getPayload();
DLOG_IF(INFO, VLOG_IS_ON(3)) << "Size of payload: " << read_buffer->length();
- read_buffer->trimStart(transport_protocol_->transportHeaderLength());
+ read_buffer->trimStart(transport_protocol_->transportHeaderLength(false));
if (data_aggregation_) {
rtc::AggrPktHeader hdr((uint8_t*)read_buffer->data());
diff --git a/libtransport/src/protocols/rtc/rtc_recovery_strategy.cc b/libtransport/src/protocols/rtc/rtc_recovery_strategy.cc
index 66ae5086c..257fdd09b 100644
--- a/libtransport/src/protocols/rtc/rtc_recovery_strategy.cc
+++ b/libtransport/src/protocols/rtc/rtc_recovery_strategy.cc
@@ -29,8 +29,12 @@ using namespace transport::interface;
RecoveryStrategy::RecoveryStrategy(
Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service,
- bool use_rtx, bool use_fec, interface::StrategyCallback &&external_callback)
- : recovery_on_(false),
+ bool use_rtx, bool use_fec,
+ interface::RtcTransportRecoveryStrategies rs_type,
+ interface::StrategyCallback &&external_callback)
+ : rs_type_(rs_type),
+ recovery_on_(false),
+ content_sharing_mode_(false),
rtx_during_fec_(0),
next_rtx_timer_(MAX_TIMER_RTX),
send_rtx_callback_(std::move(callback)),
@@ -43,7 +47,9 @@ RecoveryStrategy::RecoveryStrategy(
}
RecoveryStrategy::RecoveryStrategy(RecoveryStrategy &&rs)
- : rtx_during_fec_(0),
+ : rs_type_(rs.rs_type_),
+ content_sharing_mode_(rs.content_sharing_mode_),
+ rtx_during_fec_(0),
rtx_state_(std::move(rs.rtx_state_)),
rtx_timers_(std::move(rs.rtx_timers_)),
recover_with_fec_(std::move(rs.recover_with_fec_)),
@@ -64,25 +70,52 @@ RecoveryStrategy::RecoveryStrategy(RecoveryStrategy &&rs)
RecoveryStrategy::~RecoveryStrategy() {}
void RecoveryStrategy::setFecParams(uint32_t n, uint32_t k) {
+ // if rs_type == FEC_ONLY_LOW_RES_LOSSES max k == 64
n_ = n;
k_ = k;
// XXX for the moment we go in steps of 5% loss rate.
- // max loss rate = 95%
+ uint32_t i = 0;
for (uint32_t loss_rate = 5; loss_rate < 100; loss_rate += 5) {
- double dec_loss_rate = (double)(loss_rate + 5) / 100.0;
- double exp_losses = (double)k_ * dec_loss_rate;
- uint32_t fec_to_ask = ceil(exp_losses / (1 - dec_loss_rate));
-
- fec_state_ f;
- f.fec_to_ask = std::min(fec_to_ask, (n_ - k_));
- f.last_update = round_id_;
- f.avg_residual_losses = 0.0;
- f.consecutive_use = 0;
- fec_per_loss_rate_.push_back(f);
+ uint32_t fec_to_ask = 0;
+ if (n_ != 0 && k_ != 0) {
+ if (rs_type_ ==
+ interface::RtcTransportRecoveryStrategies::FEC_ONLY_LOW_RES_LOSSES) {
+ // the max loss rate in the matrix is 50%
+ uint32_t index = i;
+ if (i > 9) index = 9;
+ fec_to_ask = FEC_MATRIX[k_ - 1][index];
+ } else {
+ double dec_loss_rate = (double)(loss_rate + 5);
+ if (dec_loss_rate == 100.0) dec_loss_rate = 95.0;
+ dec_loss_rate = dec_loss_rate / 100.0;
+ double exp_losses = ceil((double)k_ * dec_loss_rate);
+ fec_to_ask = ceil((exp_losses / (1 - dec_loss_rate)) * 1.25);
+ }
+ }
+ fec_to_ask = std::min(fec_to_ask, (n_ - k_));
+ fec_per_loss_rate_.push_back(fec_to_ask);
+
+ i++;
}
}
+uint64_t RecoveryStrategy::getRtxRtt(uint32_t seq) {
+ auto it = rtx_state_.find(seq);
+
+ if (it == rtx_state_.end()) return 0;
+
+ // we can compute the RTT of an RTX only if it was send once. Infact if the
+ // RTX was sent twice or more the data may be alredy in flight and the RTT
+ // will be underestimated. This may happen also for packets that we
+ // retransmitted too soon. in that case the RTT will be filtered out by
+ // checking the path label
+ if (it->second.rtx_count_ != 1) return 0;
+
+ // this a potentialy valid packet, compute the RTT
+ return (utils::SteadyTime::nowMs().count() - it->second.last_send_);
+}
+
bool RecoveryStrategy::lossDetected(uint32_t seq) {
if (isRtx(seq)) {
// this packet is already in the list of rtx
@@ -141,8 +174,10 @@ void RecoveryStrategy::addNewRtx(uint32_t seq, bool force) {
state.first_send_ = state_->getInterestSentTime(seq);
if (state.first_send_ == 0) // this interest was never sent before
state.first_send_ = getNow();
- state.next_send_ = computeNextSend(seq, true);
+ state.last_send_ = state.first_send_; // we didn't send an RTX for this
+ // packet yet
state.rtx_count_ = 0;
+ state.next_send_ = computeNextSend(seq, state.rtx_count_);
DLOG_IF(INFO, VLOG_IS_ON(4))
<< "Add " << seq << " to retransmissions. next rtx is in "
<< state.next_send_ - getNow() << " ms";
@@ -158,66 +193,50 @@ void RecoveryStrategy::addNewRtx(uint32_t seq, bool force) {
}
}
-uint64_t RecoveryStrategy::computeNextSend(uint32_t seq, bool new_rtx) {
+uint64_t RecoveryStrategy::computeNextSend(uint32_t seq, uint32_t rtx_counter) {
uint64_t now = getNow();
- if (new_rtx) {
- // for the new rtx we wait one estimated IAT after the loss detection. this
- // is bacause, assuming that packets arrive with a constant IAT, we should
- // get a new packet every IAT
- double prod_rate = state_->getProducerRate();
- uint32_t estimated_iat = SENTINEL_TIMER_INTERVAL;
- uint32_t jitter = 0;
+ if (rtx_counter == 0) {
+ uint32_t wait = 1;
+ if (content_sharing_mode_) return now + wait;
- if (prod_rate != 0) {
- double packet_size = state_->getAveragePacketSize();
- estimated_iat = ceil(1000.0 / (prod_rate / packet_size));
- jitter = ceil(state_->getJitter());
- }
+ uint32_t jitter = SENTINEL_TIMER_INTERVAL;
+ double prod_rate = state_->getProducerRate();
+ if (prod_rate != 0) jitter = ceil(state_->getJitter());
- uint32_t wait = 1;
- if (estimated_iat < 18) {
- // for low rate app we do not wait to send a RTX
- // we consider low rate stream with less than 50pps (iat >= 20ms)
- // (e.g. audio in videoconf, mobile games).
- // in the check we use 18ms to accomodate for measurements errors
- // for flows with higher rate wait 1 ait + jitter
- wait = estimated_iat + jitter;
- }
+ wait += jitter;
- DLOG_IF(INFO, VLOG_IS_ON(3))
- << "first rtx for " << seq << " in " << wait
- << " ms, rtt = " << state_->getMinRTT() << " ait = " << estimated_iat
- << " jttr = " << jitter;
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "first rtx for " << seq << " in " << wait
+ << " ms, jitter = " << jitter;
return now + wait;
} else {
- // wait one RTT
- uint32_t wait = SENTINEL_TIMER_INTERVAL;
-
+ // wait one RTT. if an edge is known use the edge RTT for the first 5 rtx
double prod_rate = state_->getProducerRate();
if (prod_rate == 0) {
return now + SENTINEL_TIMER_INTERVAL;
}
- double packet_size = state_->getAveragePacketSize();
- uint32_t estimated_iat = ceil(1000.0 / (prod_rate / packet_size));
+ uint64_t rtt = 0;
+ // if the transport detects an edge we try first to get the RTX from the
+ // edge. if no interest get a reply we move to the full RTT
+ if (rtx_counter < 5 && (state_->getEdgeRtt() != 0)) {
+ rtt = state_->getEdgeRtt();
+ } else {
+ rtt = state_->getAvgRTT();
+ }
- uint64_t rtt = state_->getMinRTT();
if (rtt == 0) rtt = SENTINEL_TIMER_INTERVAL;
- wait = rtt;
+
+ if (content_sharing_mode_) return now + rtt;
+
+ uint32_t wait = (uint32_t)rtt;
uint32_t jitter = ceil(state_->getJitter());
wait += jitter;
- // it may happen that the channel is congested and we have some additional
- // queuing delay to take into account
- uint32_t queue = ceil(state_->getQueuing());
- wait += queue;
-
DLOG_IF(INFO, VLOG_IS_ON(3))
- << "next rtx for " << seq << " in " << wait
- << " ms, rtt = " << state_->getMinRTT() << " ait = " << estimated_iat
- << " jttr = " << jitter << " queue = " << queue;
+ << "next rtx for " << seq << " in " << wait << " ms, rtt = " << rtt
+ << " jtter = " << jitter;
return now + wait;
}
@@ -252,7 +271,9 @@ void RecoveryStrategy::retransmit() {
state_->onRetransmission(seq);
double prod_rate = state_->getProducerRate();
if (prod_rate != 0) rtx_it->second.rtx_count_++;
- rtx_it->second.next_send_ = computeNextSend(seq, false);
+ rtx_it->second.last_send_ = now;
+ rtx_it->second.next_send_ =
+ computeNextSend(seq, rtx_it->second.rtx_count_);
it = rtx_timers_.erase(it);
rtx_timers_.insert(
std::pair<uint64_t, uint32_t>(rtx_it->second.next_send_, seq));
@@ -327,6 +348,7 @@ void RecoveryStrategy::deleteRtx(uint32_t seq) {
}
it_timers++;
}
+
// remove rtx
rtx_state_.erase(it_rtx);
}
@@ -339,53 +361,13 @@ uint32_t RecoveryStrategy::computeFecPacketsToAsk() {
if (loss_rate == 0) return 0;
- // once per minute try to reduce the fec rate. it may happen that for some bin
- // we ask too many fec packet. here we try to reduce this values gently
- if (round_id_ % ROUNDS_PER_MIN == 0) {
- reduceFec();
- }
-
// keep track of the last used fec. if we use a new bin on this round reset
// consecutive use and avg loss in the prev bin
uint32_t bin = ceil(loss_rate / 5.0) - 1;
- if (bin > fec_per_loss_rate_.size() - 1) bin = fec_per_loss_rate_.size() - 1;
+ if (bin > fec_per_loss_rate_.size() - 1)
+ bin = (uint32_t)fec_per_loss_rate_.size() - 1;
- if (bin != last_fec_used_) {
- fec_per_loss_rate_[last_fec_used_].consecutive_use = 0;
- fec_per_loss_rate_[last_fec_used_].avg_residual_losses = 0.0;
- }
- last_fec_used_ = bin;
- fec_per_loss_rate_[last_fec_used_].consecutive_use++;
-
- // we update the stats only once very 5 rounds (1sec) that is the rate at
- // which we compute residual losses
- if (round_id_ % ROUNDS_PER_SEC == 0) {
- double residual_losses = state_->getResidualLossRate() * 100;
- // update residual loss rate
- fec_per_loss_rate_[bin].avg_residual_losses =
- (fec_per_loss_rate_[bin].avg_residual_losses * MOVING_AVG_ALPHA) +
- (1 - MOVING_AVG_ALPHA) * residual_losses;
-
- if ((fec_per_loss_rate_[bin].last_update - round_id_) <
- WAIT_BEFORE_FEC_UPDATE) {
- // this bin is been updated recently so don't modify it and
- // return the current state
- return fec_per_loss_rate_[bin].fec_to_ask;
- }
-
- // if the residual loss rate is too high and we can ask more fec packets and
- // we are using this configuration since at least 5 sec update fec
- if (fec_per_loss_rate_[bin].avg_residual_losses > MAX_RESIDUAL_LOSS_RATE &&
- fec_per_loss_rate_[bin].fec_to_ask < (n_ - k_) &&
- fec_per_loss_rate_[bin].consecutive_use > WAIT_BEFORE_FEC_UPDATE) {
- // so increase the number of fec packets to ask
- fec_per_loss_rate_[bin].fec_to_ask++;
- fec_per_loss_rate_[bin].last_update = round_id_;
- fec_per_loss_rate_[bin].avg_residual_losses = 0.0;
- }
- }
-
- return fec_per_loss_rate_[bin].fec_to_ask;
+ return fec_per_loss_rate_[bin];
}
void RecoveryStrategy::setRtxFec(std::optional<bool> rtx_on,
@@ -431,21 +413,6 @@ void RecoveryStrategy::removePacketState(uint32_t seq) {
deleteRtx(seq);
}
-// private methods
-
-void RecoveryStrategy::reduceFec() {
- for (uint32_t loss_rate = 5; loss_rate < 100; loss_rate += 5) {
- double dec_loss_rate = (double)loss_rate / 100.0;
- double exp_losses = (double)k_ * dec_loss_rate;
- uint32_t fec_to_ask = ceil(exp_losses / (1 - dec_loss_rate));
-
- uint32_t bin = ceil(loss_rate / 5.0) - 1;
- if (fec_per_loss_rate_[bin].fec_to_ask > fec_to_ask) {
- fec_per_loss_rate_[bin].fec_to_ask--;
- }
- }
-}
-
} // end namespace rtc
} // end namespace protocol
diff --git a/libtransport/src/protocols/rtc/rtc_recovery_strategy.h b/libtransport/src/protocols/rtc/rtc_recovery_strategy.h
index 482aedc9d..aceb85888 100644
--- a/libtransport/src/protocols/rtc/rtc_recovery_strategy.h
+++ b/libtransport/src/protocols/rtc/rtc_recovery_strategy.h
@@ -32,9 +32,10 @@ namespace rtc {
class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> {
protected:
struct rtx_state_ {
- uint64_t first_send_;
- uint64_t next_send_;
- uint32_t rtx_count_;
+ uint64_t first_send_; // first time this interest was sent
+ uint64_t last_send_; // last time this rtx was sent
+ uint64_t next_send_; // next retransmission time
+ uint32_t rtx_count_; // number or rtx
};
using rtxState = struct rtx_state_;
@@ -44,6 +45,7 @@ class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> {
RecoveryStrategy(Indexer *indexer, SendRtxCallback &&callback,
asio::io_service &io_service, bool use_rtx, bool use_fec,
+ interface::RtcTransportRecoveryStrategies rs_type,
interface::StrategyCallback &&external_callback);
RecoveryStrategy(RecoveryStrategy &&rs);
@@ -55,6 +57,7 @@ class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> {
void setState(RTCState *state) { state_ = state; }
void setRateControl(RTCRateControl *rateControl) { rc_ = rateControl; }
void setFecParams(uint32_t n, uint32_t k);
+ void setContentSharingMode() { content_sharing_mode_ = true; }
bool isRtx(uint32_t seq) {
if (rtx_state_.find(seq) != rtx_state_.end()) return true;
@@ -71,10 +74,20 @@ class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> {
return false;
}
+ interface::RtcTransportRecoveryStrategies getType() {
+ return rs_type_;
+ }
+ void updateType(interface::RtcTransportRecoveryStrategies type) {
+ rs_type_ = type;
+ }
bool isRtxOn() { return rtx_on_; }
bool isFecOn() { return fec_on_; }
RTCState *getState() { return state_; }
+
+ // if the function returns 0 it means that the packet is not an RTX or it is
+ // not a valid packet to safely compute the RTT
+ uint64_t getRtxRtt(uint32_t seq);
bool lossDetected(uint32_t seq);
void notifyNewLossDetedcted(uint32_t seq);
void requestPossibleLostPacket(uint32_t seq);
@@ -98,7 +111,7 @@ class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> {
protected:
// rtx functions
void addNewRtx(uint32_t seq, bool force);
- uint64_t computeNextSend(uint32_t seq, bool new_rtx);
+ uint64_t computeNextSend(uint32_t seq, uint32_t rtx_counter);
void retransmit();
void scheduleNextRtx();
void deleteRtx(uint32_t seq);
@@ -109,9 +122,11 @@ class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> {
// common functons
void removePacketState(uint32_t seq);
+ interface::RtcTransportRecoveryStrategies rs_type_;
bool recovery_on_;
bool rtx_on_;
bool fec_on_;
+ bool content_sharing_mode_;
// number of RTX sent after fec turned on
// this is used to take into account jitter and out of order packets
@@ -152,19 +167,9 @@ class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> {
RTCRateControl *rc_;
private:
- struct fec_state_ {
- uint32_t fec_to_ask;
- uint32_t last_update; // round id of the last update
- // (wait 10 ruonds (2sec) between updates)
- uint32_t consecutive_use; // consecutive ruonds where this fec was used
- double avg_residual_losses;
- };
-
- void reduceFec();
-
uint32_t round_id_; // number of rounds
uint32_t last_fec_used_;
- std::vector<fec_state_> fec_per_loss_rate_;
+ std::vector<uint32_t> fec_per_loss_rate_;
interface::StrategyCallback callback_;
};
diff --git a/libtransport/src/protocols/rtc/rtc_rs_delay.cc b/libtransport/src/protocols/rtc/rtc_rs_delay.cc
index 4be751ec9..7d7a01133 100644
--- a/libtransport/src/protocols/rtc/rtc_rs_delay.cc
+++ b/libtransport/src/protocols/rtc/rtc_rs_delay.cc
@@ -25,8 +25,10 @@ namespace rtc {
RecoveryStrategyDelayBased::RecoveryStrategyDelayBased(
Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service,
+ interface::RtcTransportRecoveryStrategies rs_type,
interface::StrategyCallback &&external_callback)
: RecoveryStrategy(indexer, std::move(callback), io_service, true, false,
+ rs_type,
std::move(external_callback)), // start with rtx
congestion_state_(false),
probing_state_(false),
@@ -48,7 +50,7 @@ void RecoveryStrategyDelayBased::turnOnRecovery() {
recovery_on_ = true;
uint64_t rtt = state_->getMinRTT();
uint32_t fec_to_ask = computeFecPacketsToAsk();
- if (rtt > 80 && fec_to_ask != 0) {
+ if (rtt > MAX_RTT_BEFORE_FEC && fec_to_ask > 0) {
// we need to start FEC (see fec only strategy for more details)
setRtxFec(true, true);
rtx_during_fec_ = 1; // avoid to stop fec
@@ -84,16 +86,16 @@ void RecoveryStrategyDelayBased::onNewRound(bool in_sync) {
return;
}
- uint64_t rtt = state_->getMinRTT();
+ uint64_t rtt = state_->getAvgRTT();
- bool congestion = false;
// XXX at the moment we are not looking at congestion events
- // congestion = rc_->inCongestionState();
+ // bool congestion = rc_->inCongestionState();
- if ((!fec_on_ && rtt >= 100) || (fec_on_ && rtt > 80) || congestion) {
+ if ((!fec_on_ && rtt >= MAX_RTT_BEFORE_FEC) ||
+ (fec_on_ && rtt > (MAX_RTT_BEFORE_FEC - 10))) {
// switch from rtx to fec or keep use fec. Notice that if some rtx are
// waiting to be scheduled, they will be sent normally, but no new rtx will
- // be created If the loss rate is 0 keep to use RTX.
+ // be created if the loss rate is 0 keep to use RTX.
uint32_t fec_to_ask = computeFecPacketsToAsk();
softSwitchToFec(fec_to_ask);
if (rtx_during_fec_ == 0) // if we do not send any RTX the losses
@@ -104,7 +106,8 @@ void RecoveryStrategyDelayBased::onNewRound(bool in_sync) {
return;
}
- if ((fec_on_ && rtt <= 80) || (!rtx_on_ && rtt <= 100)) {
+ if ((fec_on_ && rtt <= (MAX_RTT_BEFORE_FEC - 10)) ||
+ (!rtx_on_ && rtt <= MAX_RTT_BEFORE_FEC)) {
// turn on rtx
softSwitchToFec(0);
indexer_->setNFec(0);
diff --git a/libtransport/src/protocols/rtc/rtc_rs_delay.h b/libtransport/src/protocols/rtc/rtc_rs_delay.h
index 5ca90f4cb..9e1c41388 100644
--- a/libtransport/src/protocols/rtc/rtc_rs_delay.h
+++ b/libtransport/src/protocols/rtc/rtc_rs_delay.h
@@ -26,6 +26,7 @@ class RecoveryStrategyDelayBased : public RecoveryStrategy {
public:
RecoveryStrategyDelayBased(Indexer *indexer, SendRtxCallback &&callback,
asio::io_service &io_service,
+ interface::RtcTransportRecoveryStrategies rs_type,
interface::StrategyCallback &&external_callback);
RecoveryStrategyDelayBased(RecoveryStrategy &&rs);
diff --git a/libtransport/src/protocols/rtc/rtc_rs_fec_only.cc b/libtransport/src/protocols/rtc/rtc_rs_fec_only.cc
index c44212bda..5b10823ec 100644
--- a/libtransport/src/protocols/rtc/rtc_rs_fec_only.cc
+++ b/libtransport/src/protocols/rtc/rtc_rs_fec_only.cc
@@ -25,9 +25,10 @@ namespace rtc {
RecoveryStrategyFecOnly::RecoveryStrategyFecOnly(
Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service,
+ interface::RtcTransportRecoveryStrategies rs_type,
interface::StrategyCallback &&external_callback)
: RecoveryStrategy(indexer, std::move(callback), io_service, true, false,
- std::move(external_callback)),
+ rs_type, std::move(external_callback)),
congestion_state_(false),
probing_state_(false),
switch_rounds_(0) {}
diff --git a/libtransport/src/protocols/rtc/rtc_rs_fec_only.h b/libtransport/src/protocols/rtc/rtc_rs_fec_only.h
index 1ab78b842..42df25bd9 100644
--- a/libtransport/src/protocols/rtc/rtc_rs_fec_only.h
+++ b/libtransport/src/protocols/rtc/rtc_rs_fec_only.h
@@ -26,6 +26,7 @@ class RecoveryStrategyFecOnly : public RecoveryStrategy {
public:
RecoveryStrategyFecOnly(Indexer *indexer, SendRtxCallback &&callback,
asio::io_service &io_service,
+ interface::RtcTransportRecoveryStrategies rs_type,
interface::StrategyCallback &&external_callback);
RecoveryStrategyFecOnly(RecoveryStrategy &&rs);
diff --git a/libtransport/src/protocols/rtc/rtc_rs_low_rate.cc b/libtransport/src/protocols/rtc/rtc_rs_low_rate.cc
index 48dd3e34f..dbad563cd 100644
--- a/libtransport/src/protocols/rtc/rtc_rs_low_rate.cc
+++ b/libtransport/src/protocols/rtc/rtc_rs_low_rate.cc
@@ -25,8 +25,10 @@ namespace rtc {
RecoveryStrategyLowRate::RecoveryStrategyLowRate(
Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service,
+ interface::RtcTransportRecoveryStrategies rs_type,
interface::StrategyCallback &&external_callback)
: RecoveryStrategy(indexer, std::move(callback), io_service, false, true,
+ rs_type,
std::move(external_callback)), // start with fec
fec_consecutive_rounds_((MILLI_IN_A_SEC / ROUND_LEN) * 5), // 5 sec
rtx_allowed_consecutive_rounds_(0) {
@@ -75,7 +77,7 @@ void RecoveryStrategyLowRate::selectRecoveryStrategy(bool in_sync) {
}
uint32_t loss_rate = std::round(state_->getPerSecondLossRate() * 100);
- uint32_t rtt = state_->getAvgRTT();
+ uint32_t rtt = (uint32_t)state_->getAvgRTT();
bool use_rtx = false;
for (size_t i = 0; i < switch_vector.size(); i++) {
diff --git a/libtransport/src/protocols/rtc/rtc_rs_low_rate.h b/libtransport/src/protocols/rtc/rtc_rs_low_rate.h
index d66b197e2..0e76efaca 100644
--- a/libtransport/src/protocols/rtc/rtc_rs_low_rate.h
+++ b/libtransport/src/protocols/rtc/rtc_rs_low_rate.h
@@ -34,6 +34,7 @@ class RecoveryStrategyLowRate : public RecoveryStrategy {
public:
RecoveryStrategyLowRate(Indexer *indexer, SendRtxCallback &&callback,
asio::io_service &io_service,
+ interface::RtcTransportRecoveryStrategies rs_type,
interface::StrategyCallback &&external_callback);
RecoveryStrategyLowRate(RecoveryStrategy &&rs);
diff --git a/libtransport/src/protocols/rtc/rtc_rs_recovery_off.cc b/libtransport/src/protocols/rtc/rtc_rs_recovery_off.cc
index 16b14eff6..00c6a0504 100644
--- a/libtransport/src/protocols/rtc/rtc_rs_recovery_off.cc
+++ b/libtransport/src/protocols/rtc/rtc_rs_recovery_off.cc
@@ -25,9 +25,10 @@ namespace rtc {
RecoveryStrategyRecoveryOff::RecoveryStrategyRecoveryOff(
Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service,
+ interface::RtcTransportRecoveryStrategies rs_type,
interface::StrategyCallback &&external_callback)
: RecoveryStrategy(indexer, std::move(callback), io_service, false, false,
- std::move(external_callback)) {}
+ rs_type, std::move(external_callback)) {}
RecoveryStrategyRecoveryOff::RecoveryStrategyRecoveryOff(RecoveryStrategy &&rs)
: RecoveryStrategy(std::move(rs)) {
diff --git a/libtransport/src/protocols/rtc/rtc_rs_recovery_off.h b/libtransport/src/protocols/rtc/rtc_rs_recovery_off.h
index 3a9e71e7d..3d59cc473 100644
--- a/libtransport/src/protocols/rtc/rtc_rs_recovery_off.h
+++ b/libtransport/src/protocols/rtc/rtc_rs_recovery_off.h
@@ -26,6 +26,7 @@ class RecoveryStrategyRecoveryOff : public RecoveryStrategy {
public:
RecoveryStrategyRecoveryOff(Indexer *indexer, SendRtxCallback &&callback,
asio::io_service &io_service,
+ interface::RtcTransportRecoveryStrategies rs_type,
interface::StrategyCallback &&external_callback);
RecoveryStrategyRecoveryOff(RecoveryStrategy &&rs);
diff --git a/libtransport/src/protocols/rtc/rtc_rs_rtx_only.cc b/libtransport/src/protocols/rtc/rtc_rs_rtx_only.cc
index 8e5db5439..4d7cf7a82 100644
--- a/libtransport/src/protocols/rtc/rtc_rs_rtx_only.cc
+++ b/libtransport/src/protocols/rtc/rtc_rs_rtx_only.cc
@@ -25,9 +25,10 @@ namespace rtc {
RecoveryStrategyRtxOnly::RecoveryStrategyRtxOnly(
Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service,
+ interface::RtcTransportRecoveryStrategies rs_type,
interface::StrategyCallback &&external_callback)
: RecoveryStrategy(indexer, std::move(callback), io_service, true, false,
- std::move(external_callback)) {}
+ rs_type, std::move(external_callback)) {}
RecoveryStrategyRtxOnly::RecoveryStrategyRtxOnly(RecoveryStrategy &&rs)
: RecoveryStrategy(std::move(rs)) {
diff --git a/libtransport/src/protocols/rtc/rtc_rs_rtx_only.h b/libtransport/src/protocols/rtc/rtc_rs_rtx_only.h
index e90e5ba13..03dbed1c7 100644
--- a/libtransport/src/protocols/rtc/rtc_rs_rtx_only.h
+++ b/libtransport/src/protocols/rtc/rtc_rs_rtx_only.h
@@ -26,6 +26,7 @@ class RecoveryStrategyRtxOnly : public RecoveryStrategy {
public:
RecoveryStrategyRtxOnly(Indexer *indexer, SendRtxCallback &&callback,
asio::io_service &io_service,
+ interface::RtcTransportRecoveryStrategies rs_type,
interface::StrategyCallback &&external_callback);
RecoveryStrategyRtxOnly(RecoveryStrategy &&rs);
diff --git a/libtransport/src/protocols/rtc/rtc_state.cc b/libtransport/src/protocols/rtc/rtc_state.cc
index 5b3b5e4c3..82ac0b9c1 100644
--- a/libtransport/src/protocols/rtc/rtc_state.cc
+++ b/libtransport/src/protocols/rtc/rtc_state.cc
@@ -106,6 +106,7 @@ void RTCState::initParams() {
// paths stats
path_table_.clear();
main_path_ = nullptr;
+ edge_path_ = nullptr;
// packet cache (not pending anymore)
packet_cache_.clear();
@@ -231,11 +232,9 @@ void RTCState::onDataPacketReceived(const core::ContentObject &content_object,
}
updatePacketSize(content_object);
- updateReceivedBytes(content_object);
+ updateReceivedBytes(content_object, false);
addRecvOrLost(seq, PacketState::RECEIVED);
- if (seq > highest_seq_received_) highest_seq_received_ = seq;
-
// the producer is responding
// it is generating valid data packets so we consider it active
producer_is_active_ = true;
@@ -245,11 +244,7 @@ void RTCState::onDataPacketReceived(const core::ContentObject &content_object,
void RTCState::onFecPacketReceived(const core::ContentObject &content_object) {
uint32_t seq = content_object.getName().getSuffix();
- // updateReceivedBytes(content_object);
- received_fec_bytes_ +=
- (uint32_t)(content_object.headerSize() + content_object.payloadSize());
-
- if (seq > highest_seq_received_) highest_seq_received_ = seq;
+ updateReceivedBytes(content_object, true);
PacketState state = getPacketState(seq);
if (state != PacketState::LOST) {
@@ -328,12 +323,14 @@ void RTCState::onPacketLost(uint32_t seq) {
DLOG_IF(INFO, VLOG_IS_ON(4)) << "packet " << seq << " is lost";
}
}
+
addRecvOrLost(seq, PacketState::DEFINITELY_LOST);
}
-void RTCState::onPacketRecoveredRtx(uint32_t seq) {
+void RTCState::onPacketRecoveredRtx(const core::ContentObject &content_object,
+ uint64_t rtt) {
+ uint32_t seq = content_object.getName().getSuffix();
packets_sent_to_app_++;
- if (seq > highest_seq_received_) highest_seq_received_ = seq;
// increase the recovered packet counter only if the packet was marked as LOST
// before.
@@ -341,13 +338,37 @@ void RTCState::onPacketRecoveredRtx(uint32_t seq) {
if (state == PacketState::LOST) losses_recovered_++;
addRecvOrLost(seq, PacketState::RECEIVED);
+ updateReceivedBytes(content_object, false);
+
+ if (rtt == 0) return; // nothing to do
+
+ uint32_t path_label = content_object.getPathLabel();
+ auto path_it = path_table_.find(path_label);
+ if (path_it == path_table_.end()) {
+ // this is a new path and it must be a cache
+ std::shared_ptr<RTCDataPath> newPath =
+ std::make_shared<RTCDataPath>(path_label);
+ auto ret = path_table_.insert(
+ std::pair<uint32_t, std::shared_ptr<RTCDataPath>>(path_label, newPath));
+ path_it = ret.first;
+ }
+
+ auto path = path_it->second;
+ if (path->pathToProducer())
+ return; // this packet is coming from a producer
+ // even if we sent an RTX. this may happen
+ // for RTX that are sent too fast or in
+ // case of multipath
+
+ path->insertRttSample(utils::SteadyTime::Milliseconds(rtt), true);
}
-void RTCState::onFecPacketRecoveredRtx(uint32_t seq) {
+void RTCState::onFecPacketRecoveredRtx(
+ const core::ContentObject &content_object) {
// This is the same as onPacketRecoveredRtx, but in this is case the
// pkt is also a FEC pkt, the addRecvOrLost will be called afterwards
- if (seq > highest_seq_received_) highest_seq_received_ = seq;
losses_recovered_++;
+ updateReceivedBytes(content_object, true);
}
void RTCState::onPacketRecoveredFec(uint32_t seq, uint32_t size) {
@@ -355,8 +376,6 @@ void RTCState::onPacketRecoveredFec(uint32_t seq, uint32_t size) {
packets_sent_to_app_++;
recovered_bytes_with_fec_ += size;
- if (seq > highest_seq_received_) highest_seq_received_ = seq;
-
// adding header to the count
recovered_bytes_with_fec_ += 60; // XXX get header size some where
@@ -487,21 +506,32 @@ void RTCState::onNewRound(double round_len, bool in_sync) {
// channel losses
uint32_t last_round_packets = 0;
+ uint64_t min_edge_rtt = UINT_MAX;
std::shared_ptr<RTCDataPath> old_main_path = main_path_;
main_path_ = nullptr;
+ edge_path_ = nullptr;
for (auto it = path_table_.begin(); it != path_table_.end(); it++) {
- if (it->second->isActive()) {
+ if (it->second->isValidProducer()) {
uint32_t pkt = it->second->getPacketsLastRound();
if (pkt > last_round_packets) {
last_round_packets = pkt;
main_path_ = it->second;
}
+ } else if (it->second->isActive() && !it->second->pathToProducer()) {
+ // this is a path to a cache from where we are receiving content
+ if (it->second->getMinRtt() < min_edge_rtt) {
+ min_edge_rtt = it->second->getMinRtt();
+ edge_path_ = it->second;
+ }
}
it->second->roundEnd();
}
if (main_path_ == nullptr) main_path_ = old_main_path;
+ if (edge_path_ == nullptr) edge_path_ = main_path_;
+ if (edge_path_->getMinRtt() >= main_path_->getMinRtt())
+ edge_path_ = main_path_;
// in case we get a new main path we reset the stats of the old one. this is
// beacuse, in case we need to switch back we don't what to take decisions on
@@ -551,9 +581,15 @@ void RTCState::onNewRound(double round_len, bool in_sync) {
rounds_++;
}
-void RTCState::updateReceivedBytes(const core::ContentObject &content_object) {
- received_bytes_ +=
- (uint32_t)(content_object.headerSize() + content_object.payloadSize());
+void RTCState::updateReceivedBytes(const core::ContentObject &content_object,
+ bool isFec) {
+ if (isFec) {
+ received_fec_bytes_ +=
+ (uint32_t)(content_object.headerSize() + content_object.payloadSize());
+ } else {
+ received_bytes_ +=
+ (uint32_t)(content_object.headerSize() + content_object.payloadSize());
+ }
}
void RTCState::updatePacketSize(const core::ContentObject &content_object) {
@@ -703,6 +739,10 @@ void RTCState::dataToBeReceived(uint32_t seq) {
addToPacketCache(seq, PacketState::TO_BE_RECEIVED);
}
+void RTCState::updateHighestSeqReceived(uint32_t seq) {
+ if (seq > highest_seq_received_) highest_seq_received_ = seq;
+}
+
void RTCState::addRecvOrLost(uint32_t seq, PacketState state) {
auto it = pending_interests_.find(seq);
if (it != pending_interests_.end()) {
@@ -803,7 +843,7 @@ core::ParamsRTC RTCState::getProbeParams(const core::ContentObject &probe) {
switch (ProbeHandler::getProbeType(seq)) {
case ProbeType::INIT: {
core::ContentObjectManifest manifest(
- const_cast<core::ContentObject &>(probe));
+ const_cast<core::ContentObject &>(probe).shared_from_this());
manifest.decode();
params = manifest.getParamsRTC();
break;
@@ -841,7 +881,7 @@ core::ParamsRTC RTCState::getDataParams(const core::ContentObject &data) {
}
case core::PayloadType::MANIFEST: {
core::ContentObjectManifest manifest(
- const_cast<core::ContentObject &>(data));
+ const_cast<core::ContentObject &>(data).shared_from_this());
manifest.decode();
params = manifest.getParamsRTC();
break;
diff --git a/libtransport/src/protocols/rtc/rtc_state.h b/libtransport/src/protocols/rtc/rtc_state.h
index 4bd2f76a0..ac3cc621f 100644
--- a/libtransport/src/protocols/rtc/rtc_state.h
+++ b/libtransport/src/protocols/rtc/rtc_state.h
@@ -84,8 +84,9 @@ class RTCState : public std::enable_shared_from_this<RTCState> {
void onNackPacketReceived(const core::ContentObject &nack,
bool compute_stats);
void onPacketLost(uint32_t seq);
- void onPacketRecoveredRtx(uint32_t seq);
- void onFecPacketRecoveredRtx(uint32_t seq);
+ void onPacketRecoveredRtx(const core::ContentObject &content_object,
+ uint64_t rtt);
+ void onFecPacketRecoveredRtx(const core::ContentObject &content_object);
void onPacketRecoveredFec(uint32_t seq, uint32_t size);
bool onProbePacketReceived(const core::ContentObject &probe);
void onJumpForward(uint32_t next_seq);
@@ -117,6 +118,11 @@ class RTCState : public std::enable_shared_from_this<RTCState> {
return 0;
}
+ uint64_t getEdgeRtt() const {
+ if (edge_path_ != nullptr) return edge_path_->getMinRtt();
+ return 0;
+ }
+
void resetRttStats() {
if (mainPathIsValid()) main_path_->clearRtt();
}
@@ -149,7 +155,7 @@ class RTCState : public std::enable_shared_from_this<RTCState> {
}
uint32_t getPendingInterestNumber() const {
- return pending_interests_.size();
+ return (uint32_t)pending_interests_.size();
}
PacketState getPacketState(uint32_t seq) {
@@ -242,6 +248,8 @@ class RTCState : public std::enable_shared_from_this<RTCState> {
// set it as TO_BE_RECEIVED.
void dataToBeReceived(uint32_t seq);
+ void updateHighestSeqReceived(uint32_t seq);
+
// Extract RTC parameters from probes (init or RTT probes) and data packets.
static core::ParamsRTC getProbeParams(const core::ContentObject &probe);
static core::ParamsRTC getDataParams(const core::ContentObject &data);
@@ -259,7 +267,8 @@ class RTCState : public std::enable_shared_from_this<RTCState> {
// update stats
void updateState();
- void updateReceivedBytes(const core::ContentObject &content_object);
+ void updateReceivedBytes(const core::ContentObject &content_object,
+ bool isFec);
void updatePacketSize(const core::ContentObject &content_object);
void updatePathStats(const core::ContentObject &content_object, bool is_nack);
void updateLossRate(bool in_sycn);
@@ -360,7 +369,12 @@ class RTCState : public std::enable_shared_from_this<RTCState> {
// paths stats
std::unordered_map<uint32_t, std::shared_ptr<RTCDataPath>> path_table_;
- std::shared_ptr<RTCDataPath> main_path_;
+ std::shared_ptr<RTCDataPath> main_path_; // this is the path that connects
+ // the consumer to the producer. in
+ // case of multipath the trasnport
+ // uses the most active path
+ std::shared_ptr<RTCDataPath> edge_path_; // path to the closest cache if it
+ // exists
// packet received
// cache where to store info about the last MAX_CACHED_PACKETS
diff --git a/libtransport/src/protocols/rtc/rtc_verifier.cc b/libtransport/src/protocols/rtc/rtc_verifier.cc
index 7b6330a1f..861ceee89 100644
--- a/libtransport/src/protocols/rtc/rtc_verifier.cc
+++ b/libtransport/src/protocols/rtc/rtc_verifier.cc
@@ -22,11 +22,11 @@ namespace protocol {
namespace rtc {
RTCVerifier::RTCVerifier(std::shared_ptr<auth::Verifier> verifier,
- uint32_t max_unverified_interval,
- double max_unverified_ratio)
+ uint32_t factor_relevant, uint32_t factor_alert)
: verifier_(verifier),
- max_unverified_interval_(max_unverified_interval),
- max_unverified_ratio_(max_unverified_ratio) {}
+ factor_relevant_(factor_relevant),
+ factor_alert_(factor_alert),
+ manifest_max_capacity_(std::numeric_limits<uint8_t>::max()) {}
void RTCVerifier::setState(std::shared_ptr<RTCState> rtc_state) {
rtc_state_ = rtc_state;
@@ -36,12 +36,16 @@ void RTCVerifier::setVerifier(std::shared_ptr<auth::Verifier> verifier) {
verifier_ = verifier;
}
-void RTCVerifier::setMaxUnverifiedInterval(uint32_t max_unverified_interval) {
- max_unverified_interval_ = max_unverified_interval;
+void RTCVerifier::setFactorRelevant(uint32_t factor_relevant) {
+ factor_relevant_ = factor_relevant;
}
-void RTCVerifier::setMaxUnverifiedRatio(double max_unverified_ratio) {
- max_unverified_ratio_ = max_unverified_ratio;
+void RTCVerifier::setFactorAlert(uint32_t factor_alert) {
+ factor_alert_ = factor_alert;
+}
+
+auth::VerificationPolicy RTCVerifier::verify(core::Interest &interest) {
+ return verifier_->verifyPackets(&interest);
}
auth::VerificationPolicy RTCVerifier::verify(
@@ -108,19 +112,27 @@ auth::VerificationPolicy RTCVerifier::verifyData(
auth::Suffix suffix = content_object.getName().getSuffix();
auth::VerificationPolicy policy = auth::VerificationPolicy::ABORT;
- Timestamp now = utils::SteadyTime::nowMs().count();
- // Flush old packets
- Timestamp oldest = flush_packets(now);
+ uint32_t threshold_relevant = factor_relevant_ * manifest_max_capacity_;
+ uint32_t threshold_alert = factor_alert_ * manifest_max_capacity_;
- // Add packet to map of unverified packets
- packets_unverif_.add(
- {.suffix = suffix, .timestamp = now, .size = content_object.length()},
- content_object.computeDigest(manifest_hash_algo_));
+ // Flush packets outside relevance window
+ for (auto it = packets_unverif_.set().begin();
+ it != packets_unverif_.set().end();) {
+ if (it->first > current_index_ - threshold_relevant) {
+ break;
+ }
+ packets_unverif_erased_.insert((unsigned int)it->first);
+ it = packets_unverif_.remove(it);
+ }
+
+ // Add packet to set of unverified packets
+ packets_unverif_.add({current_index_, suffix},
+ content_object.computeDigest(manifest_hash_algo_));
+ current_index_++;
- // Check that the ratio of unverified packets stays below the limit
- if (now - oldest < max_unverified_interval_ ||
- getBufferRatio() < max_unverified_ratio_) {
+ // Check that the number of unverified packets is below the alert threshold
+ if (packets_unverif_.set().size() <= threshold_alert) {
policy = auth::VerificationPolicy::ACCEPT;
}
@@ -139,18 +151,13 @@ auth::VerificationPolicy RTCVerifier::processManifest(
auth::VerificationPolicy accept_policy = auth::VerificationPolicy::ACCEPT;
// Decode manifest
- core::ContentObjectManifest manifest(content_object);
+ core::ContentObjectManifest manifest(content_object.shared_from_this());
manifest.decode();
- // Update last manifest
- if (suffix > last_manifest_) {
- last_manifest_ = suffix;
- }
-
- // Extract hash algorithm and hashes
+ // Extract manifest data
+ manifest_max_capacity_ = manifest.getMaxCapacity();
manifest_hash_algo_ = manifest.getHashAlgorithm();
- auth::Verifier::SuffixMap suffix_map =
- core::ContentObjectManifest::getSuffixMap(&manifest);
+ auth::Verifier::SuffixMap suffix_map = manifest.getSuffixMap();
// Return early if the manifest is empty
if (suffix_map.empty()) {
@@ -186,10 +193,7 @@ auth::VerificationPolicy RTCVerifier::processManifest(
for (const auto &p : policies) {
switch (p.second) {
case auth::VerificationPolicy::ACCEPT: {
- auto packet_unverif_it = packets_unverif_.packetIt(p.first);
- Packet packet_verif = *packet_unverif_it;
- packets_unverif_.remove(packet_unverif_it);
- packets_verif_.add(packet_verif);
+ packets_unverif_.remove(packets_unverif_.packet(p.first));
manifest_digests_.erase(p.first);
break;
}
@@ -209,69 +213,20 @@ void RTCVerifier::onDataRecoveredFec(uint32_t suffix) {
manifest_digests_.erase(suffix);
}
-void RTCVerifier::onJumpForward(uint32_t next_suffix) {
- if (next_suffix <= last_manifest_ + 1) {
- return;
- }
-
- // When we jump forward in the suffix sequence, we remove packets that won't
- // be verified. Those packets have a suffix in the range [last_manifest_ + 1,
- // next_suffix[.
- for (auth::Suffix suffix = last_manifest_ + 1; suffix < next_suffix;
- ++suffix) {
- auto packet_it = packets_unverif_.packetIt(suffix);
- if (packet_it != packets_unverif_.set().end()) {
- packets_unverif_.remove(packet_it);
- }
- }
-}
-
-double RTCVerifier::getBufferRatio() const {
- size_t total = packets_verif_.size() + packets_unverif_.size();
- double total_unverified = static_cast<double>(packets_unverif_.size());
- return total ? total_unverified / total : 0.0;
-}
-
-RTCVerifier::Timestamp RTCVerifier::flush_packets(Timestamp now) {
- Timestamp oldest_verified = packets_verif_.set().empty()
- ? now
- : packets_verif_.set().begin()->timestamp;
- Timestamp oldest_unverified = packets_unverif_.set().empty()
- ? now
- : packets_unverif_.set().begin()->timestamp;
-
- // Prune verified packets older than the unverified interval
- for (auto it = packets_verif_.set().begin();
- it != packets_verif_.set().end();) {
- if (now - it->timestamp < max_unverified_interval_) {
- break;
- }
- it = packets_verif_.remove(it);
- }
-
- // Prune unverified packets older than the unverified interval
- for (auto it = packets_unverif_.set().begin();
- it != packets_unverif_.set().end();) {
- if (now - it->timestamp < max_unverified_interval_) {
- break;
- }
- packets_unverif_erased_.insert(it->suffix);
- it = packets_unverif_.remove(it);
- }
-
- return std::min(oldest_verified, oldest_unverified);
-}
-
std::pair<RTCVerifier::PacketSet::iterator, bool> RTCVerifier::Packets::add(
- const Packet &packet) {
+ const Packet &packet, const auth::CryptoHash &digest) {
auto inserted = packets_.insert(packet);
- size_ += inserted.second ? packet.size : 0;
+ if (inserted.second) {
+ packets_map_[packet.second] = inserted.first;
+ suffix_map_[packet.second] = digest;
+ }
return inserted;
}
RTCVerifier::PacketSet::iterator RTCVerifier::Packets::remove(
PacketSet::iterator packet_it) {
- size_ -= packet_it->size;
+ packets_map_.erase(packet_it->second);
+ suffix_map_.erase(packet_it->second);
return packets_.erase(packet_it);
}
@@ -279,35 +234,13 @@ const std::set<RTCVerifier::Packet> &RTCVerifier::Packets::set() const {
return packets_;
};
-size_t RTCVerifier::Packets::size() const { return size_; };
-
-std::pair<RTCVerifier::PacketSet::iterator, bool>
-RTCVerifier::PacketsUnverif::add(const Packet &packet,
- const auth::CryptoHash &digest) {
- auto inserted = add(packet);
- if (inserted.second) {
- packets_map_[packet.suffix] = inserted.first;
- digests_map_[packet.suffix] = digest;
- }
- return inserted;
-}
-
-RTCVerifier::PacketSet::iterator RTCVerifier::PacketsUnverif::remove(
- PacketSet::iterator packet_it) {
- size_ -= packet_it->size;
- packets_map_.erase(packet_it->suffix);
- digests_map_.erase(packet_it->suffix);
- return packets_.erase(packet_it);
-}
-
-RTCVerifier::PacketSet::iterator RTCVerifier::PacketsUnverif::packetIt(
+RTCVerifier::PacketSet::iterator RTCVerifier::Packets::packet(
auth::Suffix suffix) {
return packets_map_.at(suffix);
};
-const auth::Verifier::SuffixMap &RTCVerifier::PacketsUnverif::suffixMap()
- const {
- return digests_map_;
+const auth::Verifier::SuffixMap &RTCVerifier::Packets::suffixMap() const {
+ return suffix_map_;
}
} // end namespace rtc
diff --git a/libtransport/src/protocols/rtc/rtc_verifier.h b/libtransport/src/protocols/rtc/rtc_verifier.h
index 098984057..c83faf08a 100644
--- a/libtransport/src/protocols/rtc/rtc_verifier.h
+++ b/libtransport/src/protocols/rtc/rtc_verifier.h
@@ -27,19 +27,16 @@ namespace rtc {
class RTCVerifier {
public:
explicit RTCVerifier(std::shared_ptr<auth::Verifier> verifier,
- uint32_t max_unverified_interval,
- double max_unverified_ratio);
+ uint32_t factor_relevant, uint32_t factor_alert);
virtual ~RTCVerifier() = default;
void setState(std::shared_ptr<RTCState> rtc_state);
-
void setVerifier(std::shared_ptr<auth::Verifier> verifier);
+ void setFactorRelevant(uint32_t factor_relevant);
+ void setFactorAlert(uint32_t factor_alert);
- void setMaxUnverifiedInterval(uint32_t max_unverified_interval);
-
- void setMaxUnverifiedRatio(double max_unverified_ratio);
-
+ auth::VerificationPolicy verify(core::Interest &interest);
auth::VerificationPolicy verify(core::ContentObject &content_object,
bool is_fec = false);
auth::VerificationPolicy verifyProbe(core::ContentObject &content_object);
@@ -51,81 +48,47 @@ class RTCVerifier {
auth::VerificationPolicy processManifest(core::ContentObject &content_object);
void onDataRecoveredFec(uint32_t suffix);
- void onJumpForward(uint32_t next_suffix);
-
- double getBufferRatio() const;
protected:
- struct Packet;
- using Timestamp = uint64_t;
+ using Index = uint64_t;
+ using Packet = std::pair<Index, auth::Suffix>;
using PacketSet = std::set<Packet>;
- struct Packet {
- auth::Suffix suffix;
- Timestamp timestamp;
- size_t size;
-
- bool operator==(const Packet &b) const {
- return timestamp == b.timestamp && suffix == b.suffix;
- }
- bool operator<(const Packet &b) const {
- return timestamp == b.timestamp ? suffix < b.suffix
- : timestamp < b.timestamp;
- }
- };
-
class Packets {
public:
- virtual std::pair<PacketSet::iterator, bool> add(const Packet &packet);
- virtual PacketSet::iterator remove(PacketSet::iterator packet_it);
- const PacketSet &set() const;
- size_t size() const;
-
- protected:
- PacketSet packets_;
- size_t size_;
- };
-
- class PacketsVerif : public Packets {};
-
- class PacketsUnverif : public Packets {
- public:
- using Packets::add;
std::pair<PacketSet::iterator, bool> add(const Packet &packet,
const auth::CryptoHash &digest);
- PacketSet::iterator remove(PacketSet::iterator packet_it) override;
- PacketSet::iterator packetIt(auth::Suffix suffix);
+ PacketSet::iterator remove(PacketSet::iterator packet_it);
+ const PacketSet &set() const;
+ PacketSet::iterator packet(auth::Suffix suffix);
const auth::Verifier::SuffixMap &suffixMap() const;
private:
+ PacketSet packets_;
std::unordered_map<auth::Suffix, PacketSet::iterator> packets_map_;
- auth::Verifier::SuffixMap digests_map_;
+ auth::Verifier::SuffixMap suffix_map_;
};
// The RTC state.
std::shared_ptr<RTCState> rtc_state_;
// The verifier instance.
std::shared_ptr<auth::Verifier> verifier_;
- // Window to consider when verifying packets.
- uint32_t max_unverified_interval_;
- // Ratio of unverified packets over which an alert is triggered.
- double max_unverified_ratio_;
- // The suffix of the last processed manifest.
- auth::Suffix last_manifest_;
+ // Used to compute the relevance windows size (in packets).
+ uint32_t factor_relevant_;
+ // Used to compute the alert threshold (in packets).
+ uint32_t factor_alert_;
+ // The maximum number of entries a manifest can contain.
+ uint8_t manifest_max_capacity_;
// Hash algorithm used by manifests.
auth::CryptoHashType manifest_hash_algo_;
// Digests extracted from all manifests received.
auth::Verifier::SuffixMap manifest_digests_;
- // Verified packets with timestamp >= now - max_unverified_interval_.
- PacketsVerif packets_verif_;
- // Unverified packets with timestamp >= now - max_unverified_interval_.
- PacketsUnverif packets_unverif_;
- // Unverified erased packets with timestamp < now - max_unverified_interval_.
+ // The number of data packets processed.
+ Index current_index_;
+ // Unverified packets with index in relevance window.
+ Packets packets_unverif_;
+ // Unverified erased packets with index outside relevance window.
std::unordered_set<auth::Suffix> packets_unverif_erased_;
-
- // Flushes all packets with timestamp < now - max_unverified_interval_.
- // Returns the timestamp of the oldest packet, verified or not.
- Timestamp flush_packets(Timestamp now);
};
} // namespace rtc
diff --git a/libtransport/src/protocols/transport_protocol.cc b/libtransport/src/protocols/transport_protocol.cc
index a73b9fb7b..b1803709b 100644
--- a/libtransport/src/protocols/transport_protocol.cc
+++ b/libtransport/src/protocols/transport_protocol.cc
@@ -79,6 +79,7 @@ int TransportProtocol::start() {
&on_payload_);
socket_->getSocketOption(GeneralTransportOptions::ASYNC_MODE, is_async_);
+ socket_->getSocketOption(GeneralTransportOptions::SIGNER, signer_);
// Set it is the first time we schedule an interest
is_first_ = true;
@@ -143,14 +144,22 @@ void TransportProtocol::sendInterest(
Packet::Format format;
socket_->getSocketOption(interface::GeneralTransportOptions::PACKET_FORMAT,
format);
+ size_t signature_size = 0;
- auto interest =
- core::PacketManager<>::getInstance().getPacket<Interest>(format);
+ // If aggregated interest, add spapce for signature
+ if (len > 0) {
+ format = Packet::toAHFormat(format);
+ signature_size = signer_->getSignatureFieldSize();
+ }
+
+ auto interest = core::PacketManager<>::getInstance().getPacket<Interest>(
+ format, signature_size);
interest->setName(interest_name);
for (uint32_t i = 0; i < len; i++) {
interest->appendSuffix(additional_suffixes->at(i));
}
+ interest->encodeSuffixes();
uint32_t lifetime = default_values::interest_lifetime;
socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
@@ -165,7 +174,16 @@ void TransportProtocol::sendInterest(
return;
}
- portal_->sendInterest(std::move(interest));
+ bool content_sharing_mode;
+ socket_->getSocketOption(RtcTransportOptions::CONTENT_SHARING_MODE,
+ content_sharing_mode);
+ if (content_sharing_mode) lifetime = ceil((double)lifetime * 0.9);
+
+ // Compute signature
+ bool is_ah = _is_ah(interest->getFormat());
+ if (is_ah) signer_->signPacket(interest.get());
+
+ portal_->sendInterest(interest, lifetime);
}
void TransportProtocol::onError(const std::error_code &ec) {
diff --git a/libtransport/src/protocols/transport_protocol.h b/libtransport/src/protocols/transport_protocol.h
index ad8cf0346..e71992561 100644
--- a/libtransport/src/protocols/transport_protocol.h
+++ b/libtransport/src/protocols/transport_protocol.h
@@ -64,7 +64,7 @@ class TransportProtocol
*
* @return The header length in bytes.
*/
- virtual std::size_t transportHeaderLength() { return 0; }
+ virtual std::size_t transportHeaderLength(bool isFEC) { return 0; }
virtual void scheduleNextInterests() = 0;
@@ -141,6 +141,9 @@ class TransportProtocol
bool is_async_;
fec::FECType fec_type_;
+
+ // Signer for aggregated interests
+ std::shared_ptr<auth::Signer> signer_;
};
} // end namespace protocol