aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/protocols/rtc
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/protocols/rtc')
-rw-r--r--libtransport/src/protocols/rtc/CMakeLists.txt59
-rw-r--r--libtransport/src/protocols/rtc/probe_handler.cc141
-rw-r--r--libtransport/src/protocols/rtc/probe_handler.h93
-rw-r--r--libtransport/src/protocols/rtc/rtc.cc1101
-rw-r--r--libtransport/src/protocols/rtc/rtc.h151
-rw-r--r--libtransport/src/protocols/rtc/rtc_consts.h214
-rw-r--r--libtransport/src/protocols/rtc/rtc_data_path.cc251
-rw-r--r--libtransport/src/protocols/rtc/rtc_data_path.h114
-rw-r--r--libtransport/src/protocols/rtc/rtc_forwarding_strategy.cc217
-rw-r--r--libtransport/src/protocols/rtc/rtc_forwarding_strategy.h82
-rw-r--r--libtransport/src/protocols/rtc/rtc_indexer.h174
-rw-r--r--libtransport/src/protocols/rtc/rtc_ldr.cc236
-rw-r--r--libtransport/src/protocols/rtc/rtc_ldr.h85
-rw-r--r--libtransport/src/protocols/rtc/rtc_packet.h256
-rw-r--r--libtransport/src/protocols/rtc/rtc_rc.h62
-rw-r--r--libtransport/src/protocols/rtc/rtc_rc_congestion_detection.cc74
-rw-r--r--libtransport/src/protocols/rtc/rtc_rc_congestion_detection.h47
-rw-r--r--libtransport/src/protocols/rtc/rtc_rc_iat.cc287
-rw-r--r--libtransport/src/protocols/rtc/rtc_rc_iat.h93
-rw-r--r--libtransport/src/protocols/rtc/rtc_rc_queue.cc106
-rw-r--r--libtransport/src/protocols/rtc/rtc_rc_queue.h48
-rw-r--r--libtransport/src/protocols/rtc/rtc_reassembly.cc109
-rw-r--r--libtransport/src/protocols/rtc/rtc_reassembly.h43
-rw-r--r--libtransport/src/protocols/rtc/rtc_recovery_strategy.cc420
-rw-r--r--libtransport/src/protocols/rtc/rtc_recovery_strategy.h181
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_delay.cc147
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_delay.h55
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_fec_only.cc143
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_fec_only.h53
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_low_rate.cc174
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_low_rate.h71
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_recovery_off.cc65
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_recovery_off.h46
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_rtx_only.cc67
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_rtx_only.h46
-rw-r--r--libtransport/src/protocols/rtc/rtc_state.cc900
-rw-r--r--libtransport/src/protocols/rtc/rtc_state.h410
-rw-r--r--libtransport/src/protocols/rtc/rtc_verifier.cc248
-rw-r--r--libtransport/src/protocols/rtc/rtc_verifier.h96
39 files changed, 7165 insertions, 0 deletions
diff --git a/libtransport/src/protocols/rtc/CMakeLists.txt b/libtransport/src/protocols/rtc/CMakeLists.txt
new file mode 100644
index 000000000..be8e0189c
--- /dev/null
+++ b/libtransport/src/protocols/rtc/CMakeLists.txt
@@ -0,0 +1,59 @@
+# Copyright (c) 2021 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+list(APPEND HEADER_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/probe_handler.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_consts.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_forwarding_strategy.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_indexer.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_ldr.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_packet.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rc.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rc_congestion_detection.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rc_iat.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rc_queue.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_reassembly.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_recovery_strategy.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rs_delay.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rs_fec_only.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rs_low_rate.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rs_recovery_off.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rs_rtx_only.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_state.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_verifier.h
+)
+
+list(APPEND SOURCE_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/probe_handler.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_forwarding_strategy.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_ldr.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rc_congestion_detection.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rc_iat.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rc_queue.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_reassembly.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_recovery_strategy.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rs_delay.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rs_fec_only.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rs_low_rate.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rs_recovery_off.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rs_rtx_only.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_state.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_verifier.cc
+)
+
+set(SOURCE_FILES ${SOURCE_FILES} PARENT_SCOPE)
+set(HEADER_FILES ${HEADER_FILES} PARENT_SCOPE)
diff --git a/libtransport/src/protocols/rtc/probe_handler.cc b/libtransport/src/protocols/rtc/probe_handler.cc
new file mode 100644
index 000000000..60eceeb19
--- /dev/null
+++ b/libtransport/src/protocols/rtc/probe_handler.cc
@@ -0,0 +1,141 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <glog/logging.h>
+#include <hicn/transport/utils/chrono_typedefs.h>
+#include <protocols/rtc/probe_handler.h>
+#include <protocols/rtc/rtc_consts.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+ProbeHandler::ProbeHandler(SendProbeCallback &&send_callback,
+ asio::io_service &io_service)
+ : probe_interval_(0),
+ max_probes_(0),
+ sent_probes_(0),
+ recv_probes_(0),
+ probe_timer_(std::make_unique<asio::steady_timer>(io_service)),
+ rand_eng_((std::random_device())()),
+ distr_(MIN_RTT_PROBE_SEQ, MAX_RTT_PROBE_SEQ),
+ send_probe_callback_(std::move(send_callback)) {}
+
+ProbeHandler::~ProbeHandler() {}
+
+uint64_t ProbeHandler::getRtt(uint32_t seq, bool is_valid) {
+ auto it = pending_probes_.find(seq);
+
+ if (it == pending_probes_.end()) return 0;
+
+ if (!is_valid) {
+ // delete the probe anyway
+ pending_probes_.erase(it);
+ valid_batch_ = false;
+ return 0;
+ }
+
+ uint64_t now = utils::SteadyTime::nowMs().count();
+ uint64_t rtt = now - it->second;
+ if (rtt < 1) rtt = 1;
+
+ pending_probes_.erase(it);
+ recv_probes_++;
+
+ return rtt;
+}
+
+double ProbeHandler::getProbeLossRate() {
+ if (!valid_batch_) return 1.0;
+ return 1.0 - ((double)recv_probes_ / (double)sent_probes_);
+}
+
+void ProbeHandler::setSuffixRange(uint32_t min, uint32_t max) {
+ DCHECK(min <= max && min >= MIN_PROBE_SEQ);
+ distr_ = std::uniform_int_distribution<uint32_t>(min, max);
+}
+
+void ProbeHandler::setProbes(uint32_t probe_interval, uint32_t max_probes) {
+ stopProbes();
+ probe_interval_ = probe_interval;
+ max_probes_ = max_probes;
+}
+
+void ProbeHandler::stopProbes() {
+ probe_interval_ = 0;
+ max_probes_ = 0;
+ sent_probes_ = 0;
+ recv_probes_ = 0;
+ valid_batch_ = true;
+ probe_timer_->cancel();
+}
+
+void ProbeHandler::sendProbes() {
+ if (probe_interval_ == 0) return;
+
+ std::weak_ptr<ProbeHandler> self(shared_from_this());
+ probe_timer_->expires_from_now(std::chrono::microseconds(probe_interval_));
+ probe_timer_->async_wait([self](const std::error_code &ec) {
+ if (ec) return;
+ auto s = self.lock();
+ if (s) {
+ s->generateProbe();
+ }
+ });
+}
+
+void ProbeHandler::generateProbe() {
+ if (probe_interval_ == 0) return;
+ if (max_probes_ != 0 && sent_probes_ >= max_probes_) return;
+
+ uint64_t now = utils::SteadyTime::nowMs().count();
+
+ uint32_t seq = distr_(rand_eng_);
+ pending_probes_.insert(std::pair<uint32_t, uint64_t>(seq, now));
+ send_probe_callback_(seq);
+ sent_probes_++;
+
+ // clean up
+ // a probe may get lost. if the pending_probes_ size becomes bigger than
+ // MAX_PENDING_PROBES remove all the probes older than a seconds
+ if (pending_probes_.size() > MAX_PENDING_PROBES) {
+ for (auto it = pending_probes_.begin(); it != pending_probes_.end();) {
+ if ((now - it->second) > 1000)
+ it = pending_probes_.erase(it);
+ else
+ it++;
+ }
+ }
+
+ sendProbes();
+}
+
+ProbeType ProbeHandler::getProbeType(uint32_t seq) {
+ if (MIN_INIT_PROBE_SEQ <= seq && seq <= MAX_INIT_PROBE_SEQ) {
+ return ProbeType::INIT;
+ }
+ if (MIN_RTT_PROBE_SEQ <= seq && seq <= MAX_RTT_PROBE_SEQ) {
+ return ProbeType::RTT;
+ }
+ return ProbeType::NOT_PROBE;
+}
+
+} // namespace rtc
+
+} // namespace protocol
+
+} // namespace transport
diff --git a/libtransport/src/protocols/rtc/probe_handler.h b/libtransport/src/protocols/rtc/probe_handler.h
new file mode 100644
index 000000000..d989194d4
--- /dev/null
+++ b/libtransport/src/protocols/rtc/probe_handler.h
@@ -0,0 +1,93 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+#include <hicn/transport/config.h>
+#include <hicn/transport/core/asio_wrapper.h>
+
+#include <functional>
+#include <random>
+#include <unordered_map>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+enum class ProbeType {
+ NOT_PROBE,
+ INIT,
+ RTT,
+};
+
+class ProbeHandler : public std::enable_shared_from_this<ProbeHandler> {
+ public:
+ using SendProbeCallback = std::function<void(uint32_t)>;
+
+ public:
+ ProbeHandler(SendProbeCallback &&send_callback, asio::io_service &io_service);
+
+ ~ProbeHandler();
+
+ // If the function returns 0 the probe is not valid.
+ uint64_t getRtt(uint32_t seq, bool is_valid);
+
+ // this function may return a residual loss rate higher than the real one if
+ // we don't wait enough time for the probes to come back
+ double getProbeLossRate();
+
+ // Set the probe suffix range [min, max]
+ void setSuffixRange(uint32_t min, uint32_t max);
+
+ // Reset the probes parameters and stops the current probing.
+ // probe_interval = 0 means that no event will be scheduled.
+ // max_probe = 0 means no limit to the number of probe to send.
+ void setProbes(uint32_t probe_interval, uint32_t max_probes);
+
+ void stopProbes();
+
+ void sendProbes();
+
+ static ProbeType getProbeType(uint32_t seq);
+
+ private:
+ void generateProbe();
+
+ uint32_t probe_interval_; // us
+ uint32_t max_probes_; // packets
+ uint32_t sent_probes_; // packets
+ uint32_t recv_probes_; // packets
+
+ bool valid_batch_; // if at least one probe in a batch is considered not
+ // valid (e.g. prod rate == ~0) the full batch is invalid.
+ // the bool is set to true when sendProbe is called
+
+ std::unique_ptr<asio::steady_timer> probe_timer_;
+
+ // Map from packet suffixes to timestamp
+ std::unordered_map<uint32_t, uint64_t> pending_probes_;
+
+ // Random generator
+ std::default_random_engine rand_eng_;
+ std::uniform_int_distribution<uint32_t> distr_;
+
+ SendProbeCallback send_probe_callback_;
+};
+
+} // namespace rtc
+
+} // namespace protocol
+
+} // namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc.cc b/libtransport/src/protocols/rtc/rtc.cc
new file mode 100644
index 000000000..9a56269f3
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc.cc
@@ -0,0 +1,1101 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/core/global_object_pool.h>
+#include <hicn/transport/interfaces/socket_consumer.h>
+#include <implementation/socket_consumer.h>
+#include <math.h>
+#include <protocols/errors.h>
+#include <protocols/incremental_indexer_bytestream.h>
+#include <protocols/rtc/rtc.h>
+#include <protocols/rtc/rtc_consts.h>
+#include <protocols/rtc/rtc_indexer.h>
+#include <protocols/rtc/rtc_rc_congestion_detection.h>
+
+#include <algorithm>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+using namespace interface;
+
+RTCTransportProtocol::RTCTransportProtocol(
+ implementation::ConsumerSocket *icn_socket)
+ : TransportProtocol(icn_socket, new RtcIndexer<>(icn_socket, this),
+ new RtcReassembly(icn_socket, this)),
+ max_aggregated_interest_(1),
+ number_(0) {
+ icn_socket->getSocketOption(PORTAL, portal_);
+ round_timer_ =
+ std::make_unique<asio::steady_timer>(portal_->getThread().getIoService());
+ scheduler_timer_ =
+ std::make_unique<asio::steady_timer>(portal_->getThread().getIoService());
+ pacing_timer_ =
+ std::make_unique<asio::steady_timer>(portal_->getThread().getIoService());
+}
+
+RTCTransportProtocol::~RTCTransportProtocol() {}
+
+void RTCTransportProtocol::resume() {
+ newRound();
+ TransportProtocol::resume();
+}
+
+std::size_t RTCTransportProtocol::transportHeaderLength(bool isFEC) {
+ return DATA_HEADER_SIZE +
+ (fec_decoder_ != nullptr ? fec_decoder_->getFecHeaderSize(isFEC) : 0);
+}
+
+// private
+void RTCTransportProtocol::initParams() {
+ TransportProtocol::reset();
+ std::weak_ptr<RTCTransportProtocol> self = shared_from_this();
+
+ fwd_strategy_.setCallback([self](notification::Strategy strategy) {
+ auto ptr = self.lock();
+ if (ptr && ptr->isRunning()) {
+ if (*ptr->on_fwd_strategy_) (*ptr->on_fwd_strategy_)(strategy);
+ }
+ });
+
+ std::shared_ptr<auth::Verifier> verifier;
+ socket_->getSocketOption(GeneralTransportOptions::VERIFIER, verifier);
+
+ uint32_t factor_relevant;
+ socket_->getSocketOption(GeneralTransportOptions::MANIFEST_FACTOR_RELEVANT,
+ factor_relevant);
+
+ uint32_t factor_alert;
+ socket_->getSocketOption(GeneralTransportOptions::MANIFEST_FACTOR_ALERT,
+ factor_alert);
+
+ rc_ = std::make_shared<RTCRateControlCongestionDetection>();
+ ldr_ = std::make_shared<RTCLossDetectionAndRecovery>(
+ indexer_verifier_.get(), portal_->getThread().getIoService(),
+ interface::RtcTransportRecoveryStrategies::RTX_ONLY,
+ [self](uint32_t seq) {
+ auto ptr = self.lock();
+ if (ptr && ptr->isRunning()) {
+ ptr->sendRtxInterest(seq);
+ }
+ },
+ [self](notification::Strategy strategy) {
+ auto ptr = self.lock();
+ if (ptr && ptr->isRunning()) {
+ if (*ptr->on_rec_strategy_) (*ptr->on_rec_strategy_)(strategy);
+ }
+ });
+
+ verifier_ =
+ std::make_shared<RTCVerifier>(verifier, factor_relevant, factor_alert);
+
+ state_ = std::make_shared<RTCState>(
+ indexer_verifier_.get(),
+ [self](uint32_t seq) {
+ auto ptr = self.lock();
+ if (ptr && ptr->isRunning()) {
+ ptr->sendProbeInterest(seq);
+ }
+ },
+ [self]() {
+ auto ptr = self.lock();
+ if (ptr && ptr->isRunning()) {
+ ptr->discoveredRtt();
+ }
+ },
+ portal_->getThread().getIoService());
+
+ rc_->setState(state_);
+ rc_->turnOnRateControl();
+ ldr_->setState(state_.get());
+ ldr_->setRateControl(rc_.get());
+ verifier_->setState(state_);
+
+ // protocol state
+ start_send_interest_ = false;
+ current_state_ = SyncState::catch_up;
+
+ // Cancel timer
+ number_++;
+ round_timer_->cancel();
+
+ scheduler_timer_->cancel();
+ scheduler_timer_on_ = false;
+ last_interest_sent_time_ = 0;
+ last_interest_sent_seq_ = 0;
+
+ // Aggregated interests setup
+ bool aggregated_interests_on;
+ socket_->getSocketOption(RtcTransportOptions::AGGREGATED_INTERESTS,
+ aggregated_interests_on);
+ if (aggregated_interests_on) {
+ if (const char *max_aggr = std::getenv("MAX_AGGREGATED_INTERESTS"))
+ max_aggregated_interest_ = (uint32_t)std::stoul(std::string(max_aggr));
+ else
+ max_aggregated_interest_ = MAX_INTERESTS_IN_BATCH;
+
+ max_aggregated_interest_ = std::min<uint32_t>(max_aggregated_interest_,
+ 1 + MAX_SUFFIXES_IN_MANIFEST);
+ }
+ LOG(INFO) << "Max Aggregated: " << max_aggregated_interest_;
+
+ max_sent_int_ =
+ std::ceil((double)MAX_PACING_BATCH / (double)max_aggregated_interest_);
+
+ pacing_timer_->cancel();
+ pacing_timer_on_ = false;
+
+ // delete all timeouts and future nacks
+ timeouts_or_nacks_.clear();
+
+ // cwin vars
+ current_sync_win_ = INITIAL_WIN;
+ max_sync_win_ = INITIAL_WIN_MAX;
+
+ socket_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
+ RTC_INTEREST_LIFETIME);
+
+ // init state params
+ state_->initParams();
+}
+
+// private
+void RTCTransportProtocol::reset() {
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "reset called";
+ initParams();
+ newRound();
+}
+
+void RTCTransportProtocol::inactiveProducer() {
+ // when the producer is inactive we reset the consumer state
+ // cwin vars
+ current_sync_win_ = INITIAL_WIN;
+ max_sync_win_ = INITIAL_WIN_MAX;
+
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Current window: " << current_sync_win_
+ << ", max_sync_win_: " << max_sync_win_;
+
+ // names/packets var
+ indexer_verifier_->reset();
+ indexer_verifier_->enableFec(fec_type_);
+ indexer_verifier_->setNFec(0);
+
+ ldr_->clear();
+}
+
+void RTCTransportProtocol::newRound() {
+ round_timer_->expires_from_now(std::chrono::milliseconds(ROUND_LEN));
+
+ std::weak_ptr<RTCTransportProtocol> self = shared_from_this();
+ round_timer_->async_wait([self](const std::error_code &ec) {
+ if (ec) {
+ return;
+ }
+
+ auto ptr = self.lock();
+
+ if (!ptr || !ptr->isRunning()) {
+ return;
+ }
+
+ auto &state = ptr->state_;
+
+ // saving counters that will be reset on new round
+ uint32_t sent_retx = state->getSentRtxInRound();
+ uint32_t received_bytes =
+ (state->getReceivedBytesInRound() + // data packets received
+ state->getReceivedFecBytesInRound()); // fec packets received
+ uint32_t sent_interest = state->getSentInterestInRound();
+ uint32_t lost_data = state->getLostData();
+ uint32_t definitely_lost = state->getDefinitelyLostPackets();
+ uint32_t recovered_losses = state->getRecoveredLosses();
+ uint32_t received_nacks = state->getReceivedNacksInRound();
+ uint32_t received_fec = state->getReceivedFecPackets();
+
+ // update sync state if needed
+ double cache_rate = state->getPacketFromCacheRatio();
+ uint32_t round_without_nacks = state->getRoundsWithoutNacks();
+
+ if (ptr->current_state_ == SyncState::in_sync) {
+ if (cache_rate > MAX_DATA_FROM_CACHE) {
+ ptr->current_state_ = SyncState::catch_up;
+ }
+ } else {
+ if (round_without_nacks >= ROUNDS_IN_SYNC_BEFORE_SWITCH &&
+ cache_rate < MAX_DATA_FROM_CACHE) {
+ ptr->current_state_ = SyncState::in_sync;
+ }
+ }
+
+ bool in_sync = (ptr->current_state_ == SyncState::in_sync);
+ ptr->ldr_->onNewRound(in_sync);
+ ptr->state_->onNewRound((double)ROUND_LEN, in_sync);
+ ptr->rc_->onNewRound((double)ROUND_LEN);
+
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Calling updateSyncWindow in newRound function";
+ ptr->updateSyncWindow();
+
+ ptr->sendStatsToApp(sent_retx, received_bytes, sent_interest, lost_data,
+ definitely_lost, recovered_losses, received_nacks,
+ received_fec);
+ ptr->fwd_strategy_.checkStrategy();
+ ptr->newRound();
+ });
+}
+
+void RTCTransportProtocol::discoveredRtt() {
+ start_send_interest_ = true;
+ uint32_t strategy;
+ socket_->getSocketOption(RtcTransportOptions::RECOVERY_STRATEGY, strategy);
+ ldr_->changeRecoveryStrategy(
+ (interface::RtcTransportRecoveryStrategies)strategy);
+
+ bool content_sharing_mode;
+ socket_->getSocketOption(RtcTransportOptions::CONTENT_SHARING_MODE,
+ content_sharing_mode);
+ if (content_sharing_mode) ldr_->setContentSharingMode();
+ ldr_->turnOnRecovery();
+ ldr_->onNewRound(false);
+
+ // set forwarding strategy switch if selected
+ Name *name = nullptr;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name);
+ Prefix prefix(*name, 128);
+ fwd_strategy_.initFwdStrategy(
+ portal_, prefix, state_.get(),
+ (interface::RtcTransportRecoveryStrategies)strategy);
+ updateSyncWindow();
+}
+
+void RTCTransportProtocol::computeMaxSyncWindow() {
+ double production_rate = state_->getProducerRate();
+ double packet_size = state_->getAveragePacketSize();
+ if (production_rate == 0.0 || packet_size == 0.0) {
+ // the consumer has no info about the producer,
+ // keep the previous maxCWin
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Returning in computeMaxSyncWindow because: prod_rate: "
+ << (production_rate == 0.0)
+ << " || packet_size: " << (packet_size == 0.0);
+ return;
+ }
+
+ bool content_sharing_mode;
+ socket_->getSocketOption(RtcTransportOptions::CONTENT_SHARING_MODE,
+ content_sharing_mode);
+ if (content_sharing_mode && (production_rate < MIN_PROD_RATE_SHARING_MODE))
+ production_rate = MIN_PROD_RATE_SHARING_MODE;
+
+ production_rate += (production_rate * indexer_verifier_->getMaxFecOverhead());
+
+ uint32_t lifetime = default_values::interest_lifetime;
+ socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
+ lifetime);
+ double lifetime_ms = (double)lifetime / MILLI_IN_A_SEC;
+
+ max_sync_win_ = (uint32_t)ceil(
+ (production_rate * lifetime_ms * INTEREST_LIFETIME_REDUCTION_FACTOR) /
+ packet_size);
+
+ max_sync_win_ = std::min(max_sync_win_, rc_->getCongestionWindow());
+}
+
+void RTCTransportProtocol::updateSyncWindow() {
+ computeMaxSyncWindow();
+
+ if (max_sync_win_ == INITIAL_WIN_MAX) {
+ if (TRANSPORT_EXPECT_FALSE(!state_->isProducerActive())) return;
+
+ current_sync_win_ = INITIAL_WIN;
+ scheduleNextInterests();
+ return;
+ }
+
+ double prod_rate = state_->getProducerRate();
+ double rtt = (double)state_->getMinRTT() / MILLI_IN_A_SEC;
+ double packet_size = state_->getAveragePacketSize();
+ bool content_sharing_mode;
+ socket_->getSocketOption(RtcTransportOptions::CONTENT_SHARING_MODE,
+ content_sharing_mode);
+ if (content_sharing_mode && (prod_rate < MIN_PROD_RATE_SHARING_MODE))
+ prod_rate = MIN_PROD_RATE_SHARING_MODE;
+
+ // if some of the info are not available do not update the current win
+ if (prod_rate != 0.0 && rtt != 0.0 && packet_size != 0.0) {
+ current_sync_win_ = (uint32_t)ceil(prod_rate * rtt / packet_size);
+ uint32_t buffer = PRODUCER_BUFFER_MS + ((double)state_->getMinRTT() / 2.0);
+
+ current_sync_win_ +=
+ ceil(prod_rate * (buffer / MILLI_IN_A_SEC) / packet_size);
+
+ if (current_state_ == SyncState::catch_up) {
+ current_sync_win_ = current_sync_win_ * CATCH_UP_WIN_INCREMENT;
+ }
+
+ uint32_t min_win = WIN_MIN;
+ bool aggregated_data_on;
+ socket_->getSocketOption(RtcTransportOptions::AGGREGATED_DATA,
+ aggregated_data_on);
+ if (aggregated_data_on) {
+ min_win = WIN_MIN_WITH_AGGREGARED_DATA;
+ min_win += (min_win * (1 - (std::max(0.3, rtt) - rtt) / 0.3));
+ }
+
+ current_sync_win_ = std::min(current_sync_win_, max_sync_win_);
+ current_sync_win_ = std::max(current_sync_win_, min_win);
+ }
+
+ scheduleNextInterests();
+}
+
+void RTCTransportProtocol::sendRtxInterest(uint32_t seq) {
+ if (!isRunning() && !is_first_) return;
+
+ if (!start_send_interest_) return;
+
+ Name *interest_name = nullptr;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
+ &interest_name);
+
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "send rtx " << seq;
+ interest_name->setSuffix(seq);
+ sendInterest(*interest_name);
+}
+
+void RTCTransportProtocol::sendProbeInterest(uint32_t seq) {
+ if (!isRunning() && !is_first_) return;
+
+ Name *interest_name = nullptr;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
+ &interest_name);
+
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Send probe " << seq;
+ interest_name->setSuffix(seq);
+ sendInterest(*interest_name);
+}
+
+void RTCTransportProtocol::sendInterestForTimeout(uint32_t seq) {
+ if (!isRunning() && !is_first_) return;
+
+ Name *interest_name = nullptr;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
+ &interest_name);
+
+ // we got a timeout for this packet so it is not pending anymore
+ interest_name->setSuffix(seq);
+ state_->onSendNewInterest(interest_name);
+ sendInterest(*interest_name);
+}
+
+void RTCTransportProtocol::scheduleNextInterests() {
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Schedule next interests";
+
+ if (!isRunning() && !is_first_) {
+ return;
+ }
+
+ if (pacing_timer_on_) {
+ return; // wait pacing timer for the next send
+ }
+
+ if (!start_send_interest_) {
+ return; // RTT discovering phase is not finished so
+ // do not start to send interests
+ }
+
+ if (TRANSPORT_EXPECT_FALSE(!state_->isProducerActive())) {
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Inactive producer.";
+ // here we keep seding the same interest until the producer
+ // does not start again
+ if (indexer_verifier_->checkNextSuffix() != 0) {
+ // the producer just become inactive, reset the state
+ inactiveProducer();
+ }
+
+ Name *interest_name = nullptr;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
+ &interest_name);
+
+ uint32_t next_seg = 0;
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Send interest " << next_seg;
+ interest_name->setSuffix(next_seg);
+
+ if (portal_->interestIsPending(*interest_name)) {
+ // if interest 0 is already pending we return
+ return;
+ }
+
+ sendInterest(*interest_name);
+ state_->onSendNewInterest(interest_name);
+ return;
+ }
+
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Pending interest number: " << state_->getPendingInterestNumber()
+ << " -- current_sync_win_: " << current_sync_win_;
+
+ uint32_t pending = state_->getPendingInterestNumber();
+ uint32_t pending_fec = state_->getPendingFecPackets();
+
+ if ((pending - pending_fec) >= current_sync_win_)
+ return; // no space in the window
+
+ // XXX double check if aggregated interests are still working here
+ if ((current_sync_win_ - (pending - pending_fec)) <
+ max_aggregated_interest_) {
+ if (scheduler_timer_on_) return; // timer already scheduled
+
+ uint64_t now = utils::SteadyTime::nowMs().count();
+
+ uint64_t time = now - last_interest_sent_time_;
+ if (time < WAIT_FOR_INTEREST_BATCH) {
+ uint64_t next = WAIT_FOR_INTEREST_BATCH - time;
+ scheduler_timer_on_ = true;
+ scheduler_timer_->expires_from_now(std::chrono::milliseconds(next));
+
+ std::weak_ptr<RTCTransportProtocol> self = shared_from_this();
+ scheduler_timer_->async_wait([self](const std::error_code &ec) {
+ if (ec) return;
+
+ auto ptr = self.lock();
+ if (ptr && ptr->isRunning()) {
+ if (!ptr->scheduler_timer_on_) return;
+ ptr->scheduler_timer_on_ = false;
+ ptr->scheduleNextInterests();
+ }
+ });
+ return; // wait for the timer
+ }
+ }
+
+ scheduler_timer_on_ = false;
+ scheduler_timer_->cancel();
+
+ // skip nacked pacekts
+ if (indexer_verifier_->checkNextSuffix() <= state_->getLastSeqNacked()) {
+ indexer_verifier_->jumpToIndex(state_->getLastSeqNacked() + 1);
+ }
+
+ // skip received packets
+ uint32_t max_received = state_->getHighestSeqReceivedInOrder();
+ if (indexer_verifier_->checkNextSuffix() <= max_received) {
+ indexer_verifier_->jumpToIndex(max_received + 1);
+ }
+
+ uint32_t sent_interests = 0;
+ uint32_t sent_packets = 0;
+ uint32_t aggregated_counter = 0;
+ Name *name = nullptr;
+ Name interest_name;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name);
+ std::array<uint32_t, MAX_AGGREGATED_INTEREST> additional_suffixes;
+
+ while (((state_->getPendingInterestNumber() -
+ state_->getPendingFecPackets()) < current_sync_win_) &&
+ (sent_interests < max_sent_int_)) {
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "In while loop. Window size: " << current_sync_win_;
+
+ uint32_t next_seg = indexer_verifier_->getNextSuffix();
+ name->setSuffix(next_seg);
+
+ // send the packet only if:
+ // 1) it is not pending yet (not true for rtx)
+ // 2) the packet is not received or def lost
+ // 3) is not in the rtx list
+ // 4) is fec and is not in order (!= last sent + 1)
+ PacketState packet_state = state_->getPacketState(next_seg);
+ if (portal_->interestIsPending(*name) ||
+ packet_state == PacketState::RECEIVED ||
+ packet_state == PacketState::DEFINITELY_LOST || ldr_->isRtx(next_seg) ||
+ (indexer_verifier_->isFec(next_seg) &&
+ next_seg != last_interest_sent_seq_ + 1)) {
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "skip interest " << next_seg << " because: pending "
+ << portal_->interestIsPending(*name) << ", recv or lost"
+ << (int)packet_state << ", rtx " << (ldr_->isRtx(next_seg))
+ << ", is old fec "
+ << ((indexer_verifier_->isFec(next_seg) &&
+ next_seg != last_interest_sent_seq_ + 1));
+ continue;
+ }
+
+ if (aggregated_counter == 0) {
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "(name) send interest " << next_seg;
+ interest_name = *name;
+ } else {
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "(append) send interest " << next_seg;
+ additional_suffixes[aggregated_counter - 1] = next_seg;
+ }
+
+ last_interest_sent_seq_ = next_seg;
+ state_->onSendNewInterest(name);
+ aggregated_counter++;
+
+ if (aggregated_counter >= max_aggregated_interest_) {
+ sent_packets++;
+ sent_interests++;
+ sendInterest(interest_name, &additional_suffixes, aggregated_counter - 1);
+ last_interest_sent_time_ = utils::SteadyTime::nowMs().count();
+ aggregated_counter = 0;
+ }
+ }
+
+ // exiting the while we may have some pending interest to send
+ if (aggregated_counter != 0) {
+ sent_packets++;
+ last_interest_sent_time_ = utils::SteadyTime::nowMs().count();
+ sendInterest(interest_name, &additional_suffixes, aggregated_counter - 1);
+ }
+
+ if ((state_->getPendingInterestNumber() - state_->getPendingFecPackets()) <
+ current_sync_win_) {
+ // we still have space in the window but we already sent too many packets
+ // wait PACING_WAIT to avoid drops in the kernel
+
+ pacing_timer_on_ = true;
+ pacing_timer_->expires_from_now(std::chrono::microseconds(PACING_WAIT));
+
+ std::weak_ptr<RTCTransportProtocol> self = shared_from_this();
+ scheduler_timer_->async_wait([self](const std::error_code &ec) {
+ if (ec) return;
+
+ auto ptr = self.lock();
+ if (ptr && ptr->isRunning()) {
+ if (!ptr->pacing_timer_on_) return;
+
+ ptr->pacing_timer_on_ = false;
+ ptr->scheduleNextInterests();
+ }
+ });
+ }
+}
+
+void RTCTransportProtocol::onInterestTimeout(Interest::Ptr &interest,
+ const Name &name) {
+ uint32_t segment_number = name.getSuffix();
+
+ if (ProbeHandler::getProbeType(segment_number) != ProbeType::NOT_PROBE) {
+ // this is a timeout on a probe, do nothing
+ return;
+ }
+
+ PacketState state = state_->getPacketState(segment_number);
+ if (state == PacketState::RECEIVED || state == PacketState::DEFINITELY_LOST) {
+ // we may recover a packets using fec, ignore this timer
+ return;
+ }
+
+ timeouts_or_nacks_.insert(segment_number);
+ if (TRANSPORT_EXPECT_TRUE(state_->isProducerActive()) &&
+ segment_number <= state_->getHighestSeqReceived()) {
+ // we retransmit packets only if the producer is active, otherwise we
+ // use timeouts to avoid to send too much traffic
+ //
+ // a timeout is sent using RTX only if it is an old packet. if it is for a
+ // seq number that we didn't reach yet, we send the packet using the normal
+ // schedule next interest
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "handle timeout for packet " << segment_number << " using rtx";
+ if (ldr_->isRtxOn()) {
+ if (indexer_verifier_->isFec(segment_number)) {
+ // if this is a fec packet we do not recover it with rtx so we consider
+ // the packet to be lost
+ ldr_->onTimeout(segment_number, true);
+ state_->onTimeout(segment_number, true);
+ } else {
+ ldr_->onTimeout(segment_number, false);
+ state_->onTimeout(segment_number, false);
+ }
+ } else {
+ // in this case we wil never recover the timeout
+ ldr_->onTimeout(segment_number, true);
+ state_->onTimeout(segment_number, true);
+ }
+ scheduleNextInterests();
+ return;
+ }
+
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "handle timeout for packet " << segment_number
+ << " using normal interests";
+
+ if (segment_number < indexer_verifier_->checkNextSuffix()) {
+ // this is a timeout for a packet that will be generated in the future but
+ // we are asking for higher sequence numbers. we need to go back like in the
+ // case of future nacks
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "On timeout next seg = " << indexer_verifier_->checkNextSuffix()
+ << ", jump to " << segment_number;
+ // add an extra space in the window
+ indexer_verifier_->jumpToIndex(segment_number);
+ }
+
+ state_->onTimeout(segment_number, false);
+ sendInterestForTimeout(segment_number);
+ scheduleNextInterests();
+}
+
+void RTCTransportProtocol::onNack(const ContentObject &content_object) {
+ struct nack_packet_t *nack =
+ (struct nack_packet_t *)content_object.getPayload()->data();
+ uint32_t production_seg = nack->getProductionSegment();
+ uint32_t nack_segment = content_object.getName().getSuffix();
+ bool is_rtx = ldr_->isRtx(nack_segment);
+
+ // check if the packet got a timeout
+
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Nack received " << nack_segment
+ << ". Production segment: " << production_seg;
+
+ bool compute_stats = true;
+ auto tn_it = timeouts_or_nacks_.find(nack_segment);
+ if (tn_it != timeouts_or_nacks_.end() || is_rtx) {
+ compute_stats = false;
+ // remove packets from timeouts_or_nacks only in case of a past nack
+ }
+
+ state_->onNackPacketReceived(content_object, compute_stats);
+ ldr_->onNackPacketReceived(content_object);
+
+ // both in case of past and future nack we jump to the
+ // production segment in the nack. In case of past nack we will skip unneded
+ // interest (this is already done in the scheduleNextInterest in any case)
+ // while in case of future nacks we can go back in time and ask again for the
+ // content that generated the nack
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "On nack next seg = " << indexer_verifier_->checkNextSuffix()
+ << ", jump to " << production_seg;
+ indexer_verifier_->jumpToIndex(production_seg);
+
+ if (production_seg > nack_segment) {
+ // remove the nack is it exists
+ if (tn_it != timeouts_or_nacks_.end()) timeouts_or_nacks_.erase(tn_it);
+
+ state_->onJumpForward(production_seg);
+ // the client is asking for content in the past
+ // switch to catch up state and increase the window
+ // this is true only if the packet is not an RTX
+ if (!is_rtx) current_state_ = SyncState::catch_up;
+ } else {
+ // if production_seg == nack_segment we consider this a future nack, since
+ // production_seg is not yet created. this may happen in case of low
+ // production rate (e.g. ping at 1pps)
+
+ // if a future nack was also retransmitted add it to the timeout_or_nacks
+ // set
+ if (is_rtx) timeouts_or_nacks_.insert(nack_segment);
+
+ // the client is asking for content in the future
+ // switch to in sync state and decrease the window
+ current_state_ = SyncState::in_sync;
+ }
+ updateSyncWindow();
+}
+
+void RTCTransportProtocol::onProbe(const ContentObject &content_object) {
+ uint32_t suffix = content_object.getName().getSuffix();
+ ParamsRTC params = RTCState::getProbeParams(content_object);
+
+ if (ProbeHandler::getProbeType(suffix) == ProbeType::INIT) {
+ fec::FECType fec_type = params.fec_type;
+
+ if (fec_type != fec::FECType::UNKNOWN && !fec_decoder_) {
+ // Update FEC type
+ fec_type_ = fec_type;
+
+ // Enable FEC
+ enableFEC(std::bind(&RTCTransportProtocol::onFecPackets, this,
+ std::placeholders::_1),
+ fec::FECBase::BufferRequested(0));
+
+ // Update FEC parameters
+ indexer_verifier_->enableFec(fec_type);
+ indexer_verifier_->setNFec(0);
+ ldr_->setFecParams(fec::FECUtils::getBlockSymbols(fec_type),
+ fec::FECUtils::getSourceSymbols(fec_type));
+ fec_decoder_->setIOService(portal_->getThread().getIoService());
+ } else if (fec_type == fec::FECType::UNKNOWN) {
+ indexer_verifier_->disableFec();
+ }
+ }
+
+ if (!state_->onProbePacketReceived(content_object)) return;
+
+ // As for NACKs, set next_segment
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "on probe next seg = " << indexer_verifier_->checkNextSuffix()
+ << ", jump to " << params.prod_seg;
+ indexer_verifier_->jumpToIndex(params.prod_seg);
+
+ bool loss_detected = ldr_->onProbePacketReceived(content_object);
+ // we are not out of sync here but we are starting to download content from
+ // the cache, maybe beacuse the production rate increased suddenly. for this
+ // reason we put the state to catch up to increase the window
+ if (loss_detected) current_state_ = SyncState::catch_up;
+ updateSyncWindow();
+}
+
+void RTCTransportProtocol::onContentObjectReceived(
+ Interest &interest, ContentObject &content_object, std::error_code &ec) {
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Received content object of size: " << content_object.payloadSize();
+
+ uint32_t segment_number = content_object.getName().getSuffix();
+ PayloadType payload_type = content_object.getPayloadType();
+ PacketState state;
+
+ ContentObject *content_ptr = &content_object;
+ ContentObject::Ptr manifest_ptr = nullptr;
+
+ bool is_probe =
+ ProbeHandler::getProbeType(segment_number) != ProbeType::NOT_PROBE;
+ bool is_nack = !is_probe && content_object.payloadSize() == NACK_HEADER_SIZE;
+ bool is_fec = indexer_verifier_->isFec(segment_number);
+ bool is_manifest =
+ !is_probe && !is_nack && !is_fec && payload_type == PayloadType::MANIFEST;
+ bool is_data =
+ !is_probe && !is_nack && !is_fec && payload_type == PayloadType::DATA;
+ bool compute_stats = is_data || is_manifest;
+
+ ec = make_error_code(protocol_error::not_reassemblable);
+
+ // A helper function to process manifests or data packets received
+ auto onDataPacketReceived = [this](ContentObject &content_object,
+ bool compute_stats) {
+ ldr_->onDataPacketReceived(content_object);
+ rc_->onDataPacketReceived(content_object, compute_stats);
+ updateSyncWindow();
+ };
+
+ // First verify the packet signature and apply the corresponding policy
+ auth::VerificationPolicy policy = verifier_->verify(content_object, is_fec);
+ indexer_verifier_->applyPolicy(interest, content_object, false, policy);
+
+ if (is_probe) {
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received probe " << segment_number;
+ if (*on_content_object_input_) {
+ (*on_content_object_input_)(*socket_->getInterface(), content_object);
+ }
+ onProbe(content_object);
+ return;
+ }
+
+ if (is_nack) {
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received nack " << segment_number;
+ if (*on_content_object_input_) {
+ (*on_content_object_input_)(*socket_->getInterface(), content_object);
+ }
+ onNack(content_object);
+ return;
+ }
+
+ // content_ptr will point either to the input data packet or to a manifest
+ // whose FEC header has been removed
+ if (is_manifest) {
+ manifest_ptr = removeFecHeader(content_object);
+ if (manifest_ptr) {
+ content_ptr = manifest_ptr.get();
+ }
+ }
+
+ // From there, the packet is either a FEC, a manifest or a data packet.
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received content " << segment_number;
+
+ // Do not count timed out packets in stats
+ auto tn_it = timeouts_or_nacks_.find(segment_number);
+ if (tn_it != timeouts_or_nacks_.end()) {
+ compute_stats = false;
+ timeouts_or_nacks_.erase(tn_it);
+ }
+
+ // Do not count retransmissions or losses in stats
+ if (ldr_->isRtx(segment_number) ||
+ ldr_->isPossibleLossWithNoRtx(segment_number)) {
+ compute_stats = false;
+ }
+
+ // Fetch packet state
+ state = state_->getPacketState(segment_number);
+
+ // Check if the packet is a retransmission
+ if (ldr_->isRtx(segment_number) && state != PacketState::RECEIVED) {
+ if (is_data || is_manifest) {
+ uint64_t rtt = ldr_->getRtxRtt(segment_number);
+ state_->onPacketRecoveredRtx(content_object, rtt);
+
+ if (*on_content_object_input_) {
+ (*on_content_object_input_)(*socket_->getInterface(), content_object);
+ }
+
+ if (is_manifest) {
+ processManifest(interest, *content_ptr);
+ }
+
+ ec = is_manifest ? make_error_code(protocol_error::not_reassemblable)
+ : make_error_code(protocol_error::success);
+
+ // The packet is considered received, return early
+ onDataPacketReceived(*content_ptr, compute_stats);
+ // this is a rtx but we may need to feed it in the decoder
+ decodePacket(content_object, is_manifest);
+ return;
+ }
+
+ if (is_fec) {
+ state_->onFecPacketRecoveredRtx(content_object);
+ }
+ }
+
+ // Fetch packet state again; it may have changed
+ state = state_->getPacketState(segment_number);
+
+ // Check if the packet was already received
+ if (state == PacketState::RECEIVED || state == PacketState::TO_BE_RECEIVED) {
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Received duplicated content " << segment_number << ", drop it";
+ ec = make_error_code(protocol_error::duplicated_content);
+ onDataPacketReceived(*content_ptr, compute_stats);
+ return;
+ }
+
+ if (!is_fec) {
+ state_->dataToBeReceived(segment_number);
+ }
+
+ // send packet to the decoder
+ decodePacket(content_object, is_manifest);
+
+ // We can return early if FEC
+ if (is_fec) {
+ DLOG_IF(INFO, VLOG_IS_ON(4)) << "Received FEC " << segment_number;
+ state_->onFecPacketReceived(content_object);
+ onDataPacketReceived(*content_ptr, compute_stats);
+ return;
+ }
+
+ // The packet may have been already sent to the app by the decoder, check
+ // again if it is already received
+ state = state_->getPacketState(
+ segment_number); // state == RECEIVED or TO_BE_RECEIVED
+
+ if (state != PacketState::RECEIVED) {
+ DLOG_IF(INFO, VLOG_IS_ON(4))
+ << (is_manifest ? "Received manifest " : "Received data ")
+ << segment_number;
+
+ if (is_manifest) {
+ processManifest(interest, *content_ptr);
+ }
+
+ state_->onDataPacketReceived(*content_ptr, compute_stats);
+
+ if (*on_content_object_input_) {
+ (*on_content_object_input_)(*socket_->getInterface(), content_object);
+ }
+
+ ec = is_manifest ? make_error_code(protocol_error::not_reassemblable)
+ : make_error_code(protocol_error::success);
+ }
+
+ onDataPacketReceived(*content_ptr, compute_stats);
+}
+
+void RTCTransportProtocol::sendStatsToApp(
+ uint32_t retx_count, uint32_t received_bytes, uint32_t sent_interests,
+ uint32_t lost_data, uint32_t definitely_lost, uint32_t recovered_losses,
+ uint32_t received_nacks, uint32_t received_fec) {
+ if (*stats_summary_) {
+ // Send the stats to the app
+ stats_->updateQueuingDelay(state_->getQueuing());
+
+ // stats_->updateInterestFecTx(0); //todo must be implemented
+ // stats_->updateBytesFecRecv(0); //todo must be implemented
+
+ stats_->updateRetxCount(retx_count);
+ stats_->updateBytesRecv(received_bytes);
+ stats_->updateInterestTx(sent_interests);
+ stats_->updateReceivedNacks(received_nacks);
+ stats_->updateReceivedFEC(received_fec);
+
+ stats_->updateAverageWindowSize(state_->getPendingInterestNumber());
+ stats_->updateLossRatio(state_->getPerSecondLossRate());
+ uint64_t rtt = state_->getAvgRTT();
+ stats_->updateAverageRtt(utils::SteadyTime::Microseconds(rtt * 1000));
+
+ stats_->updateQueuingDelay(state_->getQueuing());
+ stats_->updateLostData(lost_data);
+ stats_->updateDefinitelyLostData(definitely_lost);
+ stats_->updateRecoveredData(recovered_losses);
+ stats_->updateCCState((unsigned int)current_state_ ? 1 : 0);
+ (*stats_summary_)(*socket_->getInterface(), *stats_);
+ bool in_congestion = rc_->inCongestionState();
+ stats_->updateCongestionState(in_congestion);
+ double residual_losses = state_->getResidualLossRate();
+ stats_->updateResidualLossRate(residual_losses);
+ stats_->updateQualityScore(state_->getQualityScore());
+
+ // set alerts
+ if (rtt > MAX_RTT)
+ stats_->setAlert(interface::TransportStatistics::statsAlerts::LATENCY);
+ else
+ stats_->clearAlert(interface::TransportStatistics::statsAlerts::LATENCY);
+
+ if (in_congestion)
+ stats_->setAlert(interface::TransportStatistics::statsAlerts::CONGESTION);
+ else
+ stats_->clearAlert(
+ interface::TransportStatistics::statsAlerts::CONGESTION);
+
+ if (residual_losses > MAX_RESIDUAL_LOSSES)
+ stats_->setAlert(interface::TransportStatistics::statsAlerts::LOSSES);
+ else
+ stats_->clearAlert(interface::TransportStatistics::statsAlerts::LOSSES);
+ }
+}
+
+void RTCTransportProtocol::decodePacket(ContentObject &content_object,
+ bool is_manifest) {
+ if (!fec_decoder_) return;
+
+ DLOG_IF(INFO, VLOG_IS_ON(4))
+ << "Send packet " << content_object.getName() << " to FEC decoder";
+
+ uint32_t offset =
+ is_manifest
+ ? (uint32_t)content_object.headerSize()
+ : (uint32_t)(content_object.headerSize() + rtc::DATA_HEADER_SIZE);
+ uint32_t metadata = static_cast<uint32_t>(content_object.getPayloadType());
+
+ fec_decoder_->onDataPacket(content_object, offset, metadata);
+}
+
+void RTCTransportProtocol::onFecPackets(fec::BufferArray &packets) {
+ Packet::Format format;
+ socket_->getSocketOption(interface::GeneralTransportOptions::PACKET_FORMAT,
+ format);
+
+ Name *name = nullptr;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name);
+
+ for (auto &packet : packets) {
+ uint32_t seq_number = packet.getIndex();
+ uint32_t metadata = packet.getMetadata();
+ fec::buffer buffer = packet.getBuffer();
+
+ PayloadType payload_type = static_cast<PayloadType>(metadata);
+ switch (payload_type) {
+ case PayloadType::DATA:
+ case PayloadType::MANIFEST:
+ break;
+ case PayloadType::UNSPECIFIED:
+ default:
+ payload_type = PayloadType::DATA;
+ break;
+ }
+
+ switch (state_->getPacketState(seq_number)) {
+ case PacketState::RECEIVED:
+ case PacketState::TO_BE_RECEIVED: {
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Packet " << seq_number << " already received";
+ break;
+ }
+ default: {
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Recovered packet " << seq_number << " through FEC";
+
+ if (payload_type == PayloadType::MANIFEST) {
+ name->setSuffix(seq_number);
+
+ auto interest =
+ core::PacketManager<>::getInstance().getPacket<Interest>(format);
+ interest->setName(*name);
+
+ auto content_object = toContentObject(
+ *name, format, payload_type, buffer->data(), buffer->length());
+
+ processManifest(*interest, *content_object);
+ }
+
+ state_->onPacketRecoveredFec(seq_number, (uint32_t)buffer->length());
+ ldr_->onPacketRecoveredFec(seq_number);
+
+ if (payload_type == PayloadType::DATA) {
+ verifier_->onDataRecoveredFec(seq_number);
+ reassembly_->reassemble(*buffer, seq_number);
+ }
+
+ break;
+ }
+ }
+ }
+}
+
+void RTCTransportProtocol::processManifest(Interest &interest,
+ ContentObject &manifest) {
+ auth::VerificationPolicy policy = verifier_->processManifest(manifest);
+ indexer_verifier_->applyPolicy(interest, manifest, false, policy);
+}
+
+ContentObject::Ptr RTCTransportProtocol::removeFecHeader(
+ const ContentObject &content_object) {
+ if (!fec_decoder_ || !fec_decoder_->getFecHeaderSize(false)) {
+ return nullptr;
+ }
+
+ size_t fec_header_size = fec_decoder_->getFecHeaderSize(false);
+ const uint8_t *payload =
+ content_object.data() + content_object.headerSize() + fec_header_size;
+ size_t payload_size = content_object.payloadSize() - fec_header_size;
+
+ ContentObject::Ptr co =
+ toContentObject(content_object.getName(), content_object.getFormat(),
+ content_object.getPayloadType(), payload, payload_size);
+
+ return co;
+}
+
+ContentObject::Ptr RTCTransportProtocol::toContentObject(
+ const Name &name, Packet::Format format, PayloadType payload_type,
+ const uint8_t *payload, std::size_t payload_size,
+ std::size_t additional_header_size) {
+ // Recreate ContentObject
+ ContentObject::Ptr co =
+ core::PacketManager<>::getInstance().getPacket<ContentObject>(
+ format, additional_header_size);
+ co->updateLength(payload_size);
+ co->append(payload_size);
+ co->trimStart(co->headerSize());
+
+ // Copy payload
+ std::memcpy(co->writableData(), payload, payload_size);
+
+ // Restore network headers and some fields
+ co->prepend(co->headerSize());
+ co->setName(name);
+ co->setPayloadType(payload_type);
+
+ return co;
+}
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc.h b/libtransport/src/protocols/rtc/rtc.h
new file mode 100644
index 000000000..a8a474216
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc.h
@@ -0,0 +1,151 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <protocols/rtc/rtc_forwarding_strategy.h>
+#include <protocols/rtc/rtc_ldr.h>
+#include <protocols/rtc/rtc_rc.h>
+#include <protocols/rtc/rtc_reassembly.h>
+#include <protocols/rtc/rtc_state.h>
+#include <protocols/rtc/rtc_verifier.h>
+#include <protocols/transport_protocol.h>
+
+#include <unordered_set>
+#include <vector>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+class RTCTransportProtocol : public TransportProtocol {
+ public:
+ RTCTransportProtocol(implementation::ConsumerSocket *icnet_socket);
+
+ ~RTCTransportProtocol();
+
+ using TransportProtocol::start;
+
+ using TransportProtocol::stop;
+
+ void resume() override;
+
+ std::size_t transportHeaderLength(bool isFEC) override;
+
+ auto shared_from_this() { return utils::shared_from(this); }
+
+ private:
+ enum class SyncState { catch_up = 0, in_sync = 1, last };
+
+ private:
+ // setup functions
+ void initParams();
+ void reset() override;
+
+ void inactiveProducer();
+
+ // protocol functions
+ void discoveredRtt();
+ void newRound();
+
+ // window functions
+ void computeMaxSyncWindow();
+ void updateSyncWindow();
+
+ // packet functions
+ void sendRtxInterest(uint32_t seq);
+ void sendProbeInterest(uint32_t seq);
+ void sendInterestForTimeout(uint32_t seq);
+ void scheduleNextInterests() override;
+ void onInterestTimeout(Interest::Ptr &interest, const Name &name) override;
+ void onNack(const ContentObject &content_object);
+ void onProbe(const ContentObject &content_object);
+ void onContentObjectReceived(Interest &interest,
+ ContentObject &content_object,
+ std::error_code &ec) override;
+ void onPacketDropped(Interest &interest, ContentObject &content_object,
+ const std::error_code &reason) override {}
+ void onReassemblyFailed(std::uint32_t missing_segment) override {}
+ void processManifest(Interest &interest, ContentObject &manifest);
+
+ // interaction with app functions
+ void sendStatsToApp(uint32_t retx_count, uint32_t received_bytes,
+ uint32_t sent_interests, uint32_t lost_data,
+ uint32_t definitely_lost, uint32_t recovered_losses,
+ uint32_t received_nacks, uint32_t received_fec);
+
+ // FEC functions
+ // send the received content object to the decoder
+ void decodePacket(ContentObject &content_object, bool is_manifest);
+ void onFecPackets(fec::BufferArray &packets);
+
+ // Utils
+ ContentObject::Ptr removeFecHeader(const ContentObject &content_object);
+ ContentObject::Ptr toContentObject(const Name &name, Packet::Format format,
+ PayloadType payload_type,
+ const uint8_t *payload,
+ std::size_t payload_size,
+ std::size_t additional_header_size = 0);
+
+ // protocol state
+ bool start_send_interest_;
+ SyncState current_state_;
+
+ // cwin vars
+ uint32_t current_sync_win_;
+ uint32_t max_sync_win_;
+
+ // round timer
+ std::unique_ptr<asio::steady_timer> round_timer_;
+
+ // scheduler timer (postpone interest sending to explot aggregated interests)
+ std::unique_ptr<asio::steady_timer> scheduler_timer_;
+ bool scheduler_timer_on_;
+ uint64_t last_interest_sent_time_;
+ uint64_t last_interest_sent_seq_;
+
+ // maximum aggregated interest. if the transport is connected to the forwarder
+ // we cannot use aggregated interests
+ uint32_t max_aggregated_interest_;
+ // maximum number of intereset that can be sent in a loop to avoid packets
+ // dropped by the kernel
+ uint32_t max_sent_int_;
+
+ // pacing timer (do not send too many interests in a short time to avoid
+ // packet drops in the kernel)
+ std::unique_ptr<asio::steady_timer> pacing_timer_;
+ bool pacing_timer_on_;
+
+ // timeouts
+ std::unordered_set<uint32_t> timeouts_or_nacks_;
+
+ std::shared_ptr<RTCState> state_;
+ std::shared_ptr<RTCRateControl> rc_;
+ std::shared_ptr<RTCLossDetectionAndRecovery> ldr_;
+ std::shared_ptr<RTCVerifier> verifier_;
+
+ // forwarding strategy selection
+ RTCForwardingStrategy fwd_strategy_;
+
+ uint32_t number_;
+};
+
+} // namespace rtc
+
+} // namespace protocol
+
+} // namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_consts.h b/libtransport/src/protocols/rtc/rtc_consts.h
new file mode 100644
index 000000000..29b5a3a12
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_consts.h
@@ -0,0 +1,214 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <protocols/rtc/rtc_packet.h>
+#include <stdint.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+// used in rtc
+// protocol consts
+const uint32_t ROUND_LEN = 200;
+// ms interval of time on which
+// we take decisions / measurements
+const double INTEREST_LIFETIME_REDUCTION_FACTOR = 0.8;
+// how big (in ms) should be the buffer at the producer.
+// increasing this number we increase the time that an
+// interest will wait for the data packet to be produced
+// at the producer socket
+const uint32_t PRODUCER_BUFFER_MS = 300; // ms
+
+// interest scheduler
+// const uint32_t MAX_INTERESTS_IN_BATCH = 5;
+// const uint32_t WAIT_BETWEEN_INTEREST_BATCHES = 1000; // usec
+const uint32_t MAX_INTERESTS_IN_BATCH = 5; // number of seq numbers per
+ // aggregated interest packet
+ // considering the name itself
+const uint32_t WAIT_FOR_INTEREST_BATCH = 20; // msec. timer that we wait to try
+ // to aggregate interest in the
+ // same packet
+const uint32_t MAX_PACING_BATCH = 5; // number of interest that we can send
+ // inside the loop before they get dropped
+ // by the kernel.
+const uint32_t PACING_WAIT = 1000; // usec to wait betwing two pacing batch. As
+ // for MAX_PACING_BATCH this value was
+ // computed during tests
+const uint32_t MAX_RTX_IN_BATCH = 10; // max rtx to send in loop
+
+// packet const
+const uint32_t RTC_INTEREST_LIFETIME = 4000;
+
+// probes sequence range
+const uint32_t MIN_PROBE_SEQ = 0xefffffff;
+const uint32_t MIN_INIT_PROBE_SEQ = MIN_PROBE_SEQ;
+const uint32_t MAX_INIT_PROBE_SEQ = 0xf7ffffff - 1;
+const uint32_t MIN_RTT_PROBE_SEQ = MAX_INIT_PROBE_SEQ + 1;
+const uint32_t MAX_RTT_PROBE_SEQ = 0xffffffff - 1;
+// RTT_PROBE_INTERVAL will be used during the section while
+// INIT_RTT_PROBE_INTERVAL is used at the beginning to
+// quickily estimate the RTT
+const uint32_t RTT_PROBE_INTERVAL = 200000; // us
+const uint32_t INIT_RTT_PROBE_INTERVAL = 500; // us
+const uint32_t INIT_RTT_PROBES = 40; // number of probes to init RTT
+// if the produdcer is not yet started we need to probe multple times
+// to get an answer. we wait 100ms between each try
+const uint32_t INIT_RTT_PROBE_RESTART = 100; // ms
+// once we get the first probe we wait at most 60ms for the others
+const uint32_t INIT_RTT_PROBE_WAIT =
+ ((INIT_RTT_PROBES * INIT_RTT_PROBE_INTERVAL) / 1000) * 2; // ms
+// we reuires at least 5 probes to be recevied
+const uint32_t INIT_RTT_MIN_PROBES_TO_RECV = 5; // ms
+const uint32_t MAX_PENDING_PROBES = 10;
+
+// congestion
+const double MAX_QUEUING_DELAY = 50.0; // ms
+
+// data from cache
+const double MAX_DATA_FROM_CACHE = 0.10; // 10%
+
+// window const
+const uint32_t INITIAL_WIN = 5; // pkts
+const uint32_t INITIAL_WIN_MAX = 1000000; // pkts
+const uint32_t WIN_MIN = 5; // pkts
+const uint32_t WIN_MIN_WITH_AGGREGARED_DATA = 10; // pkts
+const double CATCH_UP_WIN_INCREMENT = 1.2;
+// used in rate control
+const double WIN_DECREASE_FACTOR = 0.5;
+const double WIN_INCREASE_FACTOR = 1.5;
+const uint32_t MIN_PROD_RATE_SHARING_MODE = 125000; // 1Mbps in bytes
+
+// round in congestion
+const double ROUNDS_BEFORE_TAKE_ACTION = 5;
+
+// used in state
+const uint8_t ROUNDS_IN_SYNC_BEFORE_SWITCH = 3;
+const double PRODUCTION_RATE_FRACTION = 0.8;
+
+const uint32_t INIT_PACKET_SIZE = 1200;
+
+const double MOVING_AVG_ALPHA = 0.8;
+
+const double MILLI_IN_A_SEC = 1000.0;
+const double MICRO_IN_A_SEC = 1000000.0;
+const uint32_t ROUNDS_PER_SEC = (uint32_t)(MILLI_IN_A_SEC / ROUND_LEN);
+const uint32_t ROUNDS_PER_MIN = (uint32_t)ROUNDS_PER_SEC * 60;
+
+const uint32_t MAX_ROUND_WHIOUT_PACKETS =
+ (20 * MILLI_IN_A_SEC) / ROUND_LEN; // 20 sec in rounds;
+
+// used in ldr
+const uint32_t RTC_MAX_RTX = 100;
+const uint32_t RTC_MAX_AGE = 60000; // in ms
+const uint64_t MAX_TIMER_RTX = ~0;
+const uint32_t SENTINEL_TIMER_INTERVAL = 100; // ms
+const uint32_t MAX_RTX_WITH_SENTINEL = 10; // packets
+const double CATCH_UP_RTT_INCREMENT = 1.2;
+const double MAX_RESIDUAL_LOSS_RATE = 1.0; // %
+const uint32_t WAIT_BEFORE_FEC_UPDATE = ROUNDS_PER_SEC;
+const uint32_t MAX_RTT_BEFORE_FEC = 60; // ms
+
+// used by producer
+const uint32_t PRODUCER_STATS_INTERVAL = 200; // ms
+const uint32_t MIN_PRODUCTION_RATE = 25; // pps, equal to min window *
+ // rounds in a second
+const uint32_t FEC_PACING_TIME = 5; // ms
+
+// aggregated data consts
+const uint16_t MAX_RTC_PAYLOAD_SIZE = 1200; // bytes
+const uint16_t MAX_AGGREGATED_PACKETS = 5; // pkt
+const uint32_t AGGREGATED_PACKETS_TIMER = 2; // ms
+
+// alert thresholds
+const uint32_t MAX_RTT = 200; // ms
+const double MAX_RESIDUAL_LOSSES = 0.05; // %
+
+const uint8_t FEC_MATRIX[64][10] = {
+ {1, 2, 2, 2, 3, 3, 4, 5, 5, 6}, // k = 1
+ {1, 2, 3, 3, 4, 5, 5, 6, 7, 9},
+ {2, 2, 3, 4, 5, 6, 7, 8, 9, 11},
+ {2, 3, 4, 5, 5, 7, 8, 9, 11, 13},
+ {2, 3, 4, 5, 6, 7, 9, 10, 12, 14}, // k = 5
+ {2, 3, 4, 6, 7, 8, 10, 12, 14, 16},
+ {2, 4, 5, 6, 8, 9, 11, 13, 15, 18},
+ {3, 4, 5, 7, 8, 10, 12, 14, 16, 19},
+ {3, 4, 6, 7, 9, 11, 13, 15, 18, 21},
+ {3, 4, 6, 8, 9, 11, 14, 16, 19, 23}, // k = 10
+ {3, 5, 6, 8, 10, 12, 14, 17, 20, 24},
+ {3, 5, 7, 8, 10, 13, 15, 18, 21, 26},
+ {3, 5, 7, 9, 11, 13, 16, 19, 23, 27},
+ {3, 5, 7, 9, 12, 14, 17, 20, 24, 28},
+ {4, 6, 8, 10, 12, 15, 18, 21, 25, 30}, // k = 15
+ {4, 6, 8, 10, 13, 15, 19, 22, 26, 31},
+ {4, 6, 8, 11, 13, 16, 19, 23, 27, 33},
+ {4, 6, 9, 11, 14, 17, 20, 24, 29, 34},
+ {4, 6, 9, 11, 14, 17, 21, 25, 30, 35},
+ {4, 7, 9, 12, 15, 18, 22, 26, 31, 37}, // k = 20
+ {4, 7, 9, 12, 15, 19, 22, 27, 32, 38},
+ {4, 7, 10, 13, 16, 19, 23, 28, 33, 40},
+ {5, 7, 10, 13, 16, 20, 24, 29, 34, 41},
+ {5, 7, 10, 13, 17, 20, 25, 30, 35, 42},
+ {5, 8, 11, 14, 17, 21, 26, 31, 37, 44}, // k = 25
+ {5, 8, 11, 14, 18, 22, 26, 31, 38, 45},
+ {5, 8, 11, 15, 18, 22, 27, 32, 39, 46},
+ {5, 8, 11, 15, 19, 23, 28, 33, 40, 48},
+ {5, 8, 12, 15, 19, 24, 28, 34, 41, 49},
+ {5, 9, 12, 16, 20, 24, 29, 35, 42, 50}, // k = 30
+ {5, 9, 12, 16, 20, 25, 30, 36, 43, 51},
+ {5, 9, 13, 16, 21, 25, 31, 37, 44, 53},
+ {6, 9, 13, 17, 21, 26, 31, 38, 45, 54},
+ {6, 9, 13, 17, 22, 26, 32, 39, 46, 55},
+ {6, 10, 13, 17, 22, 27, 33, 40, 47, 57}, // k = 35
+ {6, 10, 14, 18, 22, 28, 34, 40, 48, 58},
+ {6, 10, 14, 18, 23, 28, 34, 41, 49, 59},
+ {6, 10, 14, 19, 23, 29, 35, 42, 50, 60},
+ {6, 10, 14, 19, 24, 29, 36, 43, 52, 62},
+ {6, 10, 15, 19, 24, 30, 36, 44, 53, 63}, // k = 40
+ {6, 11, 15, 20, 25, 31, 37, 45, 54, 64},
+ {6, 11, 15, 20, 25, 31, 38, 46, 55, 65},
+ {7, 11, 15, 20, 26, 32, 39, 46, 56, 67},
+ {7, 11, 16, 21, 26, 32, 39, 47, 57, 68},
+ {7, 11, 16, 21, 27, 33, 40, 48, 58, 69}, // k = 45
+ {7, 11, 16, 21, 27, 33, 41, 49, 59, 70},
+ {7, 12, 16, 22, 27, 34, 41, 50, 60, 72},
+ {7, 12, 17, 22, 28, 34, 42, 51, 61, 73},
+ {7, 12, 17, 22, 28, 35, 43, 52, 62, 74},
+ {7, 12, 17, 23, 29, 36, 43, 52, 63, 75}, // k = 50
+ {7, 12, 17, 23, 29, 36, 44, 53, 64, 77},
+ {7, 12, 18, 23, 30, 37, 45, 54, 65, 78},
+ {7, 13, 18, 24, 30, 37, 45, 55, 66, 79},
+ {8, 13, 18, 24, 31, 38, 46, 56, 67, 80},
+ {8, 13, 18, 24, 31, 38, 47, 57, 68, 82}, // k = 55
+ {8, 13, 19, 25, 31, 39, 47, 57, 69, 83},
+ {8, 13, 19, 25, 32, 39, 48, 58, 70, 84},
+ {8, 13, 19, 25, 32, 40, 49, 59, 71, 85},
+ {8, 14, 19, 26, 33, 41, 50, 60, 72, 86},
+ {8, 14, 20, 26, 33, 41, 50, 61, 73, 88}, // k = 60
+ {8, 14, 20, 26, 34, 42, 51, 61, 74, 89},
+ {8, 14, 20, 27, 34, 42, 52, 62, 75, 90},
+ {8, 14, 20, 27, 34, 43, 52, 63, 76, 91},
+ {8, 14, 21, 27, 35, 43, 53, 64, 77, 92}, // k = 64
+};
+
+} // namespace rtc
+
+} // namespace protocol
+
+} // namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_data_path.cc b/libtransport/src/protocols/rtc/rtc_data_path.cc
new file mode 100644
index 000000000..a421396b1
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_data_path.cc
@@ -0,0 +1,251 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/utils/chrono_typedefs.h>
+#include <protocols/rtc/rtc_data_path.h>
+#include <stdlib.h>
+
+#include <algorithm>
+#include <cfloat>
+#include <chrono>
+#include <cmath>
+
+#define MAX_ROUNDS_WITHOUT_PKTS 10 // 2sec
+#define AVG_RTT_TIME 1000 // (ms) 1sec
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+RTCDataPath::RTCDataPath(uint32_t path_id)
+ : path_id_(path_id),
+ min_rtt(UINT_MAX),
+ prev_min_rtt(UINT_MAX),
+ max_rtt(0),
+ prev_max_rtt(0),
+ min_owd(INT_MAX), // this is computed like in LEDBAT, so it is not the
+ // real OWD, but the measured one, that depends on the
+ // clock of sender and receiver. the only meaningful
+ // value is is the queueing delay. for this reason we
+ // keep both RTT (for the windowd calculation) and OWD
+ // (for congestion/quality control)
+ prev_min_owd(INT_MAX),
+ avg_owd(DBL_MAX),
+ queuing_delay(DBL_MAX),
+ jitter_(0.0),
+ last_owd_(0),
+ largest_recv_seq_(0),
+ largest_recv_seq_time_(0),
+ avg_inter_arrival_(DBL_MAX),
+ rtt_sum_(0),
+ last_avg_rtt_compute_(0),
+ rtt_samples_(0),
+ avg_rtt_(0.0),
+ received_nacks_(false),
+ received_packets_(0),
+ rounds_without_packets_(0),
+ last_received_data_packet_(0),
+ min_RTT_history_(HISTORY_LEN),
+ max_RTT_history_(HISTORY_LEN),
+ OWD_history_(HISTORY_LEN){};
+
+void RTCDataPath::insertRttSample(
+ const utils::SteadyTime::Milliseconds& rtt_milliseconds, bool is_probe) {
+ // compute min rtt
+ uint64_t rtt = rtt_milliseconds.count();
+ if (rtt < min_rtt) min_rtt = rtt;
+
+ uint64_t now = utils::SteadyTime::nowMs().count();
+ last_received_data_packet_ = now;
+
+ // compute avg rtt
+ if (is_probe) {
+ // max rtt is computed only on probes to avoid to take into account the
+ // production time at the server
+ if (rtt > max_rtt) max_rtt = rtt;
+
+ rtt_sum_ += rtt;
+ rtt_samples_++;
+ }
+
+ if ((now - last_avg_rtt_compute_) >= AVG_RTT_TIME) {
+ // compute a new avg rtt
+ // if rtt_samples_ = 0 keep the same rtt
+ if (rtt_samples_ != 0) avg_rtt_ = (double)rtt_sum_ / (double)rtt_samples_;
+
+ rtt_sum_ = 0;
+ rtt_samples_ = 0;
+ last_avg_rtt_compute_ = now;
+ }
+
+ received_packets_++;
+}
+
+void RTCDataPath::insertOwdSample(int64_t owd) {
+ // for owd we use both min and avg
+ if (owd < min_owd) min_owd = owd;
+
+ if (avg_owd != DBL_MAX)
+ avg_owd = (avg_owd * (1 - ALPHA_RTC)) + (owd * ALPHA_RTC);
+ else {
+ avg_owd = owd;
+ }
+
+ int64_t queueVal = owd - std::min(getMinOwd(), min_owd);
+
+ if (queuing_delay != DBL_MAX)
+ queuing_delay = (queuing_delay * (1 - ALPHA_RTC)) + (queueVal * ALPHA_RTC);
+ else {
+ queuing_delay = queueVal;
+ }
+
+ // keep track of the jitter computed as for RTP (RFC 3550)
+ int64_t diff = std::abs(owd - last_owd_);
+ last_owd_ = owd;
+ jitter_ += (1.0 / 16.0) * ((double)diff - jitter_);
+}
+
+void RTCDataPath::computeInterArrivalGap(uint32_t segment_number) {
+ // got packet in sequence, compute gap
+ if (largest_recv_seq_ == (segment_number - 1)) {
+ uint64_t now = utils::SteadyTime::nowMs().count();
+ uint64_t delta = now - largest_recv_seq_time_;
+ largest_recv_seq_ = segment_number;
+ largest_recv_seq_time_ = now;
+ if (avg_inter_arrival_ == DBL_MAX)
+ avg_inter_arrival_ = delta;
+ else
+ avg_inter_arrival_ =
+ (avg_inter_arrival_ * (1 - ALPHA_RTC)) + (delta * ALPHA_RTC);
+ return;
+ }
+
+ // ooo packet, update the stasts if needed
+ if (largest_recv_seq_ <= segment_number) {
+ largest_recv_seq_ = segment_number;
+ largest_recv_seq_time_ = utils::SteadyTime::nowMs().count();
+ }
+}
+
+void RTCDataPath::receivedNack() { received_nacks_ = true; }
+
+double RTCDataPath::getInterArrivalGap() {
+ if (avg_inter_arrival_ == DBL_MAX) return 0;
+ return avg_inter_arrival_;
+}
+
+bool RTCDataPath::isValidProducer() {
+ if (received_nacks_ && rounds_without_packets_ < MAX_ROUNDS_WITHOUT_PKTS)
+ return true;
+ return false;
+}
+
+bool RTCDataPath::isActive() {
+ if (rounds_without_packets_ < MAX_ROUNDS_WITHOUT_PKTS) return true;
+ return false;
+}
+
+bool RTCDataPath::pathToProducer() {
+ if (received_nacks_) return true;
+ return false;
+}
+
+void RTCDataPath::roundEnd() {
+ // reset min_rtt and add it to the history
+ if (min_rtt != UINT_MAX) {
+ prev_min_rtt = min_rtt;
+ } else {
+ // this may happen if we do not receive any packet
+ // from this path in the last round. in this case
+ // we use the measure from the previuos round
+ min_rtt = prev_min_rtt;
+ }
+
+ // same for max_rtt
+ if (max_rtt != 0) {
+ prev_max_rtt = max_rtt;
+ } else {
+ max_rtt = prev_max_rtt;
+ }
+
+ if (min_rtt == 0) min_rtt = 1;
+ if (max_rtt == 0) max_rtt = 1;
+
+ min_RTT_history_.pushBack(min_rtt);
+ max_RTT_history_.pushBack(max_rtt);
+ min_rtt = UINT_MAX;
+ max_rtt = 0;
+
+ // do the same for min owd
+ if (min_owd != INT_MAX) {
+ prev_min_owd = min_owd;
+ } else {
+ min_owd = prev_min_owd;
+ }
+
+ if (min_owd != INT_MAX) {
+ OWD_history_.pushBack(min_owd);
+ min_owd = INT_MAX;
+ }
+
+ if (received_packets_ == 0)
+ rounds_without_packets_++;
+ else
+ rounds_without_packets_ = 0;
+ received_packets_ = 0;
+}
+
+uint32_t RTCDataPath::getPathId() { return path_id_; }
+
+double RTCDataPath::getQueuingDealy() {
+ if (queuing_delay == DBL_MAX) return 0;
+ return queuing_delay;
+}
+
+uint64_t RTCDataPath::getMinRtt() {
+ if (min_RTT_history_.size() != 0) return min_RTT_history_.begin();
+ return 0;
+}
+
+uint64_t RTCDataPath::getAvgRtt() { return std::round(avg_rtt_); }
+
+uint64_t RTCDataPath::getMaxRtt() {
+ if (max_RTT_history_.size() != 0) return max_RTT_history_.begin();
+ return 0;
+}
+
+int64_t RTCDataPath::getMinOwd() {
+ if (OWD_history_.size() != 0) return OWD_history_.begin();
+ return INT_MAX;
+}
+
+double RTCDataPath::getJitter() { return jitter_; }
+
+uint64_t RTCDataPath::getLastPacketTS() { return last_received_data_packet_; }
+
+uint32_t RTCDataPath::getPacketsLastRound() { return received_packets_; }
+
+void RTCDataPath::clearRtt() {
+ min_RTT_history_.clear();
+ max_RTT_history_.clear();
+}
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_data_path.h b/libtransport/src/protocols/rtc/rtc_data_path.h
new file mode 100644
index 000000000..ba5201fe8
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_data_path.h
@@ -0,0 +1,114 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/utils/chrono_typedefs.h>
+#include <stdint.h>
+#include <utils/max_filter.h>
+#include <utils/min_filter.h>
+
+#include <climits>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+const double ALPHA_RTC = 0.125;
+const uint32_t HISTORY_LEN = 20; // 4 sec
+
+class RTCDataPath {
+ public:
+ RTCDataPath(uint32_t path_id);
+
+ public:
+ void insertRttSample(const utils::SteadyTime::Milliseconds &rtt,
+ bool is_probe);
+ void insertOwdSample(int64_t owd);
+ void computeInterArrivalGap(uint32_t segment_number);
+ void receivedNack();
+
+ uint32_t getPathId();
+ uint64_t getMinRtt();
+ uint64_t getAvgRtt();
+ uint64_t getMaxRtt();
+ double getQueuingDealy();
+ double getInterArrivalGap();
+ double getJitter();
+ bool isActive(); // pakets recevied from this path in the last rounds
+ bool pathToProducer(); // path from a producer
+ bool isValidProducer(); // path from a producer that is also active
+ uint64_t getLastPacketTS();
+ uint32_t getPacketsLastRound();
+
+ void clearRtt();
+
+ void roundEnd();
+
+ private:
+ uint32_t path_id_;
+
+ int64_t getMinOwd();
+
+ uint64_t min_rtt;
+ uint64_t prev_min_rtt;
+
+ uint64_t max_rtt;
+ uint64_t prev_max_rtt;
+
+ int64_t min_owd;
+ int64_t prev_min_owd;
+
+ double avg_owd;
+
+ double queuing_delay;
+
+ double jitter_;
+ int64_t last_owd_;
+
+ uint32_t largest_recv_seq_;
+ uint64_t largest_recv_seq_time_;
+ double avg_inter_arrival_;
+
+ // compute the avg rtt over one sec
+ uint64_t rtt_sum_;
+ uint64_t last_avg_rtt_compute_;
+ uint32_t rtt_samples_;
+ double avg_rtt_;
+
+ // flags to check if a path is active
+ // we considere a path active if it reaches a producer
+ //(not a cache) --aka we got at least one nack on this path--
+ // and if we receives packets
+ bool received_nacks_;
+ uint32_t received_packets_;
+ uint32_t rounds_without_packets_; // if we don't get any packet
+ // for MAX_ROUNDS_WITHOUT_PKTS
+ // we consider the path inactive
+ uint64_t last_received_data_packet_; // timestamp for the last data received
+ // on this path
+
+ utils::MinFilter<uint64_t> min_RTT_history_;
+ utils::MaxFilter<uint64_t> max_RTT_history_;
+ utils::MinFilter<int64_t> OWD_history_;
+};
+
+} // namespace rtc
+
+} // namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_forwarding_strategy.cc b/libtransport/src/protocols/rtc/rtc_forwarding_strategy.cc
new file mode 100644
index 000000000..4bbd7eac0
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_forwarding_strategy.cc
@@ -0,0 +1,217 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/interfaces/notification.h>
+#include <protocols/rtc/rtc_consts.h>
+#include <protocols/rtc/rtc_forwarding_strategy.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+using namespace transport::interface;
+
+const double FWD_MAX_QUEUE = 30.0; // ms
+const double FWD_MAX_RTT = MAX_RTT_BEFORE_FEC; // ms
+const double FWD_MAX_LOSS_RATE = 0.1;
+
+RTCForwardingStrategy::RTCForwardingStrategy()
+ : low_rate_app_(false),
+ init_(false),
+ forwarder_set_(false),
+ selected_strategy_(NONE),
+ current_strategy_(NONE),
+ rounds_since_last_set_(0),
+ portal_(nullptr),
+ state_(nullptr) {}
+
+RTCForwardingStrategy::~RTCForwardingStrategy() {}
+
+void RTCForwardingStrategy::setCallback(
+ interface::StrategyCallback&& callback) {
+ callback_ = std::move(callback);
+}
+
+void RTCForwardingStrategy::initFwdStrategy(
+ std::shared_ptr<core::Portal> portal, core::Prefix& prefix, RTCState* state,
+ interface::RtcTransportRecoveryStrategies strategy) {
+ switch (strategy) {
+ case interface::RtcTransportRecoveryStrategies::LOW_RATE_AND_BESTPATH:
+ init_ = true;
+ low_rate_app_ = true;
+ selected_strategy_ = BEST_PATH;
+ current_strategy_ = BEST_PATH;
+ break;
+ case interface::RtcTransportRecoveryStrategies::LOW_RATE_AND_REPLICATION:
+ init_ = true;
+ low_rate_app_ = true;
+ selected_strategy_ = REPLICATION;
+ current_strategy_ = REPLICATION;
+ break;
+ case interface::RtcTransportRecoveryStrategies::
+ LOW_RATE_AND_ALL_FWD_STRATEGIES:
+ init_ = true;
+ low_rate_app_ = true;
+ selected_strategy_ = BEST_PATH;
+ current_strategy_ = BEST_PATH;
+ break;
+ case interface::RtcTransportRecoveryStrategies::DELAY_AND_BESTPATH:
+ init_ = true;
+ low_rate_app_ = false;
+ selected_strategy_ = BEST_PATH;
+ current_strategy_ = BEST_PATH;
+ break;
+ case interface::RtcTransportRecoveryStrategies::DELAY_AND_REPLICATION:
+ init_ = true;
+ low_rate_app_ = false;
+ selected_strategy_ = REPLICATION;
+ current_strategy_ = REPLICATION;
+ break;
+ case interface::RtcTransportRecoveryStrategies::RECOVERY_OFF:
+ case interface::RtcTransportRecoveryStrategies::RTX_ONLY:
+ case interface::RtcTransportRecoveryStrategies::FEC_ONLY:
+ case interface::RtcTransportRecoveryStrategies::DELAY_BASED:
+ case interface::RtcTransportRecoveryStrategies::LOW_RATE:
+ case interface::RtcTransportRecoveryStrategies::FEC_ONLY_LOW_RES_LOSSES:
+ default:
+ // fwd strategies are not used
+ init_ = false;
+ }
+
+ if (init_) {
+ rounds_since_last_set_ = 0;
+ prefix_ = prefix;
+ portal_ = portal;
+ state_ = state;
+ }
+}
+
+void RTCForwardingStrategy::checkStrategy() {
+ strategy_t used_strategy = selected_strategy_;
+ if (used_strategy == BOTH) used_strategy = current_strategy_;
+ assert(used_strategy == BEST_PATH || used_strategy == REPLICATION ||
+ used_strategy == NONE);
+
+ notification::ForwardingStrategy strategy =
+ notification::ForwardingStrategy::NONE;
+ switch (used_strategy) {
+ case BEST_PATH:
+ strategy = notification::ForwardingStrategy::BEST_PATH;
+ break;
+ case REPLICATION:
+ strategy = notification::ForwardingStrategy::REPLICATION;
+ break;
+ default:
+ break;
+ }
+ callback_(strategy);
+
+ if (!init_) return;
+
+ if (selected_strategy_ == NONE) return;
+
+ if (selected_strategy_ == BEST_PATH) {
+ checkStrategyBestPath();
+ return;
+ }
+
+ if (selected_strategy_ == REPLICATION) {
+ checkStrategyReplication();
+ return;
+ }
+
+ checkStrategyBoth();
+}
+
+void RTCForwardingStrategy::checkStrategyBestPath() {
+ if (!forwarder_set_) {
+ setStrategy(BEST_PATH);
+ forwarder_set_ = true;
+ return;
+ }
+
+ if (low_rate_app_) {
+ // this is used for gaming
+ uint8_t qs = state_->getQualityScore();
+
+ if (qs >= 4 || rounds_since_last_set_ < 25) { // wait a least 5 sec
+ // between each switch
+ rounds_since_last_set_++;
+ return;
+ }
+
+ // try to switch path
+ setStrategy(BEST_PATH);
+ } else {
+ if (rounds_since_last_set_ < 25) { // wait a least 5 sec
+ // between each switch
+ rounds_since_last_set_++;
+ return;
+ }
+
+ double queue = state_->getQueuing();
+ double rtt = state_->getAvgRTT();
+ double loss_rate = state_->getPerSecondLossRate();
+
+ if (queue >= FWD_MAX_QUEUE || rtt >= FWD_MAX_RTT ||
+ loss_rate > FWD_MAX_LOSS_RATE) {
+ // try to switch path
+ setStrategy(BEST_PATH);
+ }
+ }
+}
+
+void RTCForwardingStrategy::checkStrategyReplication() {
+ if (!forwarder_set_) {
+ setStrategy(REPLICATION);
+ forwarder_set_ = true;
+ return;
+ }
+
+ // here we have nothing to do for the moment
+ return;
+}
+
+void RTCForwardingStrategy::checkStrategyBoth() {
+ if (!forwarder_set_) {
+ setStrategy(current_strategy_);
+ forwarder_set_ = true;
+ return;
+ }
+
+ checkStrategyBestPath();
+
+ // TODO
+ // for the moment we use only best path.
+ // for later:
+ // 1. if both paths are bad use replication
+ // 2. while using replication compute the effectiveness. if the majority of
+ // the packets are coming from a single path, try to use bestpath
+}
+
+void RTCForwardingStrategy::setStrategy(strategy_t strategy) {
+ rounds_since_last_set_ = 0;
+ current_strategy_ = strategy;
+ portal_->setForwardingStrategy(prefix_,
+ string_strategies_[current_strategy_]);
+}
+
+} // namespace rtc
+
+} // namespace protocol
+
+} // namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_forwarding_strategy.h b/libtransport/src/protocols/rtc/rtc_forwarding_strategy.h
new file mode 100644
index 000000000..c2227e09f
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_forwarding_strategy.h
@@ -0,0 +1,82 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <core/portal.h>
+#include <hicn/transport/interfaces/callbacks.h>
+#include <protocols/rtc/rtc_state.h>
+
+#include <array>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+class RTCForwardingStrategy {
+ public:
+ enum strategy_t {
+ BEST_PATH,
+ REPLICATION,
+ BOTH,
+ NONE,
+ };
+
+ RTCForwardingStrategy();
+ ~RTCForwardingStrategy();
+
+ void initFwdStrategy(std::shared_ptr<core::Portal> portal,
+ core::Prefix& prefix, RTCState* state,
+ interface::RtcTransportRecoveryStrategies strategy);
+
+ void checkStrategy();
+ void setCallback(interface::StrategyCallback&& callback);
+
+ private:
+ void checkStrategyBestPath();
+ void checkStrategyReplication();
+ void checkStrategyBoth();
+
+ void setStrategy(strategy_t strategy);
+
+ std::array<std::string, 4> string_strategies_ = {"bestpath", "replication",
+ "both", "none"};
+
+ bool low_rate_app_; // if set to true the best path strategy will
+ // trigger a path switch based on the quality
+ // score, otherwise it will use the RTT,
+ // queuing delay and loss rate
+ bool init_; // true if all val are initializes
+ bool forwarder_set_; // true if the strategy is been set at least
+ // once
+ strategy_t selected_strategy_; // this is the strategy selected using socket
+ // options. this can also be equal to BOTH
+ strategy_t current_strategy_; // if both strategies can be used this
+ // indicates the one that is currently in use
+ // that can be only replication or best path
+ uint32_t rounds_since_last_set_;
+ core::Prefix prefix_;
+ std::shared_ptr<core::Portal> portal_;
+ RTCState* state_;
+ interface::StrategyCallback callback_;
+};
+
+} // namespace rtc
+
+} // namespace protocol
+
+} // namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_indexer.h b/libtransport/src/protocols/rtc/rtc_indexer.h
new file mode 100644
index 000000000..f87fcaaa2
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_indexer.h
@@ -0,0 +1,174 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <protocols/errors.h>
+#include <protocols/fec_utils.h>
+#include <protocols/indexer.h>
+#include <protocols/rtc/probe_handler.h>
+#include <protocols/rtc/rtc_consts.h>
+#include <protocols/transport_protocol.h>
+#include <utils/suffix_strategy.h>
+
+#include <deque>
+
+namespace transport {
+
+namespace interface {
+class ConsumerSocket;
+}
+
+namespace protocol {
+
+namespace rtc {
+
+template <uint32_t LIMIT = MIN_PROBE_SEQ>
+class RtcIndexer : public Indexer {
+ public:
+ RtcIndexer(implementation::ConsumerSocket *icn_socket,
+ TransportProtocol *transport)
+ : Indexer(icn_socket, transport),
+ first_suffix_(1),
+ next_suffix_(first_suffix_),
+ fec_type_(fec::FECType::UNKNOWN),
+ n_fec_(0),
+ n_current_fec_(n_fec_) {}
+
+ RtcIndexer(RtcIndexer &&other) : Indexer(other) {}
+
+ ~RtcIndexer() {}
+
+ void reset() override {
+ next_suffix_ = first_suffix_;
+ n_fec_ = 0;
+ }
+
+ uint32_t checkNextSuffix() const override { return next_suffix_; }
+
+ uint32_t getNextSuffix() override {
+ if (isFec(next_suffix_)) {
+ if (n_current_fec_) {
+ auto ret = next_suffix_++;
+ n_current_fec_--;
+ return ret;
+ } else {
+ n_current_fec_ = n_fec_;
+ next_suffix_ = nextSource(next_suffix_);
+ }
+ } else if (!n_current_fec_) {
+ n_current_fec_ = n_fec_;
+ }
+
+ return (next_suffix_++ % LIMIT);
+ }
+
+ void setFirstSuffix(uint32_t suffix) override {
+ first_suffix_ = suffix % LIMIT;
+ }
+
+ uint32_t getFirstSuffix() const override { return first_suffix_; }
+
+ uint32_t jumpToIndex(uint32_t index) override {
+ next_suffix_ = index % LIMIT;
+ return next_suffix_;
+ }
+
+ void onContentObject(core::Interest &interest,
+ core::ContentObject &content_object,
+ bool reassembly) override {
+ if (reassembly) {
+ reassembly_->reassemble(content_object);
+ }
+ }
+
+ /**
+ * Retrieve the next segment to be reassembled.
+ */
+ uint32_t getNextReassemblySegment() override {
+ throw errors::RuntimeException(
+ "Get reassembly segment called on rtc indexer. RTC indexer does not "
+ "provide reassembly.");
+ }
+
+ bool isFinalSuffixDiscovered() override { return true; }
+
+ uint32_t getFinalSuffix() const override { return LIMIT; }
+
+ void enableFec(fec::FECType fec_type) override { fec_type_ = fec_type; }
+
+ void disableFec() override { fec_type_ = fec::FECType::UNKNOWN; }
+
+ void setNFec(uint32_t n_fec) override {
+ n_fec_ = n_fec;
+ n_current_fec_ = n_fec_;
+ }
+
+ uint32_t getNFec() const override { return n_fec_; }
+
+ bool isFec(uint32_t index) override {
+ return isFec(fec_type_, index, first_suffix_);
+ }
+
+ double getFecOverhead() const override {
+ if (fec_type_ == fec::FECType::UNKNOWN) {
+ return 0;
+ }
+
+ double k = (double)fec::FECUtils::getSourceSymbols(fec_type_);
+ return (double)n_fec_ / k;
+ }
+
+ double getMaxFecOverhead() const override {
+ if (fec_type_ == fec::FECType::UNKNOWN) {
+ return 0;
+ }
+
+ double k = (double)fec::FECUtils::getSourceSymbols(fec_type_);
+ double n = (double)fec::FECUtils::getBlockSymbols(fec_type_);
+ return (double)(n - k) / k;
+ }
+
+ static bool isFec(fec::FECType fec_type, uint32_t index,
+ uint32_t first_suffix) {
+ if (index < LIMIT) {
+ return fec::FECUtils::isFec(fec_type, index, first_suffix);
+ }
+
+ return false;
+ }
+
+ static uint32_t nextSource(fec::FECType fec_type, uint32_t index,
+ uint32_t first_suffix) {
+ return fec::FECUtils::nextSource(fec_type, index, first_suffix) % LIMIT;
+ }
+
+ private:
+ uint32_t nextSource(uint32_t index) {
+ return nextSource(fec_type_, index, first_suffix_);
+ }
+
+ private:
+ uint32_t first_suffix_;
+ uint32_t next_suffix_;
+ fec::FECType fec_type_;
+ bool fec_enabled_;
+ uint32_t n_fec_;
+ uint32_t n_current_fec_;
+};
+
+} // namespace rtc
+} // namespace protocol
+} // namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_ldr.cc b/libtransport/src/protocols/rtc/rtc_ldr.cc
new file mode 100644
index 000000000..6e88a8636
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_ldr.cc
@@ -0,0 +1,236 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <glog/logging.h>
+#include <protocols/rtc/rtc_consts.h>
+#include <protocols/rtc/rtc_ldr.h>
+#include <protocols/rtc/rtc_rs_delay.h>
+#include <protocols/rtc/rtc_rs_fec_only.h>
+#include <protocols/rtc/rtc_rs_low_rate.h>
+#include <protocols/rtc/rtc_rs_recovery_off.h>
+#include <protocols/rtc/rtc_rs_rtx_only.h>
+#include <protocols/rtc/rtc_state.h>
+
+#include <algorithm>
+#include <unordered_set>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+RTCLossDetectionAndRecovery::RTCLossDetectionAndRecovery(
+ Indexer *indexer, asio::io_service &io_service,
+ interface::RtcTransportRecoveryStrategies type,
+ RecoveryStrategy::SendRtxCallback &&callback,
+ interface::StrategyCallback &&external_callback) {
+ if (type == interface::RtcTransportRecoveryStrategies::RECOVERY_OFF) {
+ rs_ = std::make_shared<RecoveryStrategyRecoveryOff>(
+ indexer, std::move(callback), io_service, type,
+ std::move(external_callback));
+ } else if (type == interface::RtcTransportRecoveryStrategies::DELAY_BASED ||
+ type == interface::RtcTransportRecoveryStrategies::
+ DELAY_AND_BESTPATH ||
+ type == interface::RtcTransportRecoveryStrategies::
+ DELAY_AND_REPLICATION) {
+ rs_ = std::make_shared<RecoveryStrategyDelayBased>(
+ indexer, std::move(callback), io_service, type,
+ std::move(external_callback));
+ } else if (type == interface::RtcTransportRecoveryStrategies::FEC_ONLY ||
+ type == interface::RtcTransportRecoveryStrategies::
+ FEC_ONLY_LOW_RES_LOSSES) {
+ rs_ = std::make_shared<RecoveryStrategyFecOnly>(
+ indexer, std::move(callback), io_service, type,
+ std::move(external_callback));
+ } else if (type == interface::RtcTransportRecoveryStrategies::LOW_RATE ||
+ type == interface::RtcTransportRecoveryStrategies::
+ LOW_RATE_AND_BESTPATH ||
+ type == interface::RtcTransportRecoveryStrategies::
+ LOW_RATE_AND_REPLICATION ||
+ type == interface::RtcTransportRecoveryStrategies::
+ LOW_RATE_AND_ALL_FWD_STRATEGIES) {
+ rs_ = std::make_shared<RecoveryStrategyLowRate>(
+ indexer, std::move(callback), io_service, type,
+ std::move(external_callback));
+ } else {
+ // default
+ type = interface::RtcTransportRecoveryStrategies::RTX_ONLY;
+ rs_ = std::make_shared<RecoveryStrategyRtxOnly>(
+ indexer, std::move(callback), io_service, type,
+ std::move(external_callback));
+ }
+}
+
+RTCLossDetectionAndRecovery::~RTCLossDetectionAndRecovery() {}
+
+void RTCLossDetectionAndRecovery::changeRecoveryStrategy(
+ interface::RtcTransportRecoveryStrategies type) {
+ if (type == rs_->getType()) return;
+
+ rs_->updateType(type);
+ if (type == interface::RtcTransportRecoveryStrategies::RECOVERY_OFF) {
+ rs_ =
+ std::make_shared<RecoveryStrategyRecoveryOff>(std::move(*(rs_.get())));
+ } else if (type == interface::RtcTransportRecoveryStrategies::DELAY_BASED ||
+ type == interface::RtcTransportRecoveryStrategies::
+ DELAY_AND_BESTPATH ||
+ type == interface::RtcTransportRecoveryStrategies::
+ DELAY_AND_REPLICATION) {
+ rs_ = std::make_shared<RecoveryStrategyDelayBased>(std::move(*(rs_.get())));
+ } else if (type == interface::RtcTransportRecoveryStrategies::FEC_ONLY ||
+ type == interface::RtcTransportRecoveryStrategies::
+ FEC_ONLY_LOW_RES_LOSSES) {
+ rs_ = std::make_shared<RecoveryStrategyFecOnly>(std::move(*(rs_.get())));
+ } else if (type == interface::RtcTransportRecoveryStrategies::LOW_RATE ||
+ type == interface::RtcTransportRecoveryStrategies::
+ LOW_RATE_AND_BESTPATH ||
+ type == interface::RtcTransportRecoveryStrategies::
+ LOW_RATE_AND_REPLICATION ||
+ type == interface::RtcTransportRecoveryStrategies::
+ LOW_RATE_AND_ALL_FWD_STRATEGIES) {
+ rs_ = std::make_shared<RecoveryStrategyLowRate>(std::move(*(rs_.get())));
+ } else {
+ // default
+ rs_ = std::make_shared<RecoveryStrategyRtxOnly>(std::move(*(rs_.get())));
+ }
+}
+
+void RTCLossDetectionAndRecovery::onNewRound(bool in_sync) {
+ rs_->incRoundId();
+ rs_->onNewRound(in_sync);
+}
+
+bool RTCLossDetectionAndRecovery::onTimeout(uint32_t seq, bool lost) {
+ if (!lost) {
+ return detectLoss(seq, seq + 1, false);
+ } else {
+ rs_->onLostTimeout(seq);
+ }
+ return false;
+}
+
+bool RTCLossDetectionAndRecovery::onPacketRecoveredFec(uint32_t seq) {
+ rs_->receivedPacket(seq);
+ return false;
+}
+
+bool RTCLossDetectionAndRecovery::onDataPacketReceived(
+ const core::ContentObject &content_object) {
+ uint32_t seq = content_object.getName().getSuffix();
+ bool is_rtx = rs_->isRtx(seq);
+ rs_->receivedPacket(seq);
+ bool ret = false;
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "received data. add from "
+ << rs_->getState()->getHighestSeqReceived() + 1 << " to " << seq;
+ if (!is_rtx)
+ ret = detectLoss(rs_->getState()->getHighestSeqReceived() + 1, seq, false);
+
+ rs_->getState()->updateHighestSeqReceived(seq);
+ return ret;
+}
+
+bool RTCLossDetectionAndRecovery::onNackPacketReceived(
+ const core::ContentObject &nack) {
+ struct nack_packet_t *nack_pkt =
+ (struct nack_packet_t *)nack.getPayload()->data();
+ uint32_t production_seq = nack_pkt->getProductionSegment();
+ uint32_t seq = nack.getName().getSuffix();
+
+ // received a nack. we can try to recover all data packets between the last
+ // received data and the production seq in the nack. this is similar to the
+ // recption of a probe
+ // e.g.: the client receives packets 10 11 12 20 where 20 is a nack
+ // with productionSeq = 18. this says that all the packets between 12 and 18
+ // may got lost and we should ask them
+
+ rs_->receivedPacket(seq);
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "received nack. add from "
+ << rs_->getState()->getHighestSeqReceived() + 1
+ << " to " << production_seq;
+
+ // if it is a future nack store it in the list set of nacked seq
+ if (production_seq <= seq) rs_->receivedFutureNack(seq);
+
+ // call the detectLoss function using the probe flag = true. in fact the
+ // losses detected using nacks are the same as the one detected using probes,
+ // we should not increase the loss counter
+ return detectLoss(rs_->getState()->getHighestSeqReceived() + 1,
+ production_seq, true);
+}
+
+bool RTCLossDetectionAndRecovery::onProbePacketReceived(
+ const core::ContentObject &probe) {
+ // we don't log the reception of a probe packet for the sentinel timer because
+ // probes are not taken into account into the sync window. we use them as
+ // future nacks to detect possible packets lost
+
+ uint32_t production_seq = RTCState::getProbeParams(probe).prod_seg;
+
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "received probe. add from "
+ << rs_->getState()->getHighestSeqReceived() + 1
+ << " to " << production_seq;
+
+ return detectLoss(rs_->getState()->getHighestSeqReceived() + 1,
+ production_seq, true);
+}
+
+bool RTCLossDetectionAndRecovery::detectLoss(uint32_t start, uint32_t stop,
+ bool recv_probe) {
+ if (start >= stop) return false;
+
+ // skip nacked packets
+ if (start <= rs_->getState()->getLastSeqNacked()) {
+ start = rs_->getState()->getLastSeqNacked() + 1;
+ }
+
+ // skip received or lost packets
+ if (start <= rs_->getState()->getHighestSeqReceived()) {
+ start = rs_->getState()->getHighestSeqReceived() + 1;
+ }
+
+ bool loss_detected = false;
+ for (uint32_t seq = start; seq < stop; seq++) {
+ if (rs_->getState()->getPacketState(seq) == PacketState::UNKNOWN) {
+ if (rs_->lossDetected(seq)) {
+ loss_detected = true;
+ if ((recv_probe || rs_->wasNacked(seq)) && !rs_->isFecOn()) {
+ // these losses were detected using a probe and fec is off.
+ // in this case most likelly the procotol is about to go out of sync
+ // and the packets are not really lost (e.g. increase in prod rate).
+ // for this reason we do not
+ // count the losses in the stats. Instead we do the following
+ // 1. send RTX for the packets in case they were really lost
+ // 2. return to the RTC protocol that a loss was detected using a
+ // probe. the protocol will switch to catch_up mode to increase the
+ // size of the window
+ rs_->requestPossibleLostPacket(seq);
+ } else {
+ // if fec is on we don't need to mask pontetial losses, so increase
+ // the loss rate
+ rs_->notifyNewLossDetedcted(seq);
+ }
+ }
+ }
+ }
+ return loss_detected;
+}
+
+} // namespace rtc
+
+} // namespace protocol
+
+} // namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_ldr.h b/libtransport/src/protocols/rtc/rtc_ldr.h
new file mode 100644
index 000000000..24f22ffed
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_ldr.h
@@ -0,0 +1,85 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <hicn/transport/config.h>
+#include <hicn/transport/interfaces/socket_options_keys.h>
+// RtcTransportRecoveryStrategies
+#include <hicn/transport/core/asio_wrapper.h>
+#include <hicn/transport/core/content_object.h>
+#include <hicn/transport/core/name.h>
+#include <protocols/rtc/rtc_recovery_strategy.h>
+
+#include <functional>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+class RTCLossDetectionAndRecovery
+ : public std::enable_shared_from_this<RTCLossDetectionAndRecovery> {
+ public:
+ RTCLossDetectionAndRecovery(Indexer *indexer, asio::io_service &io_service,
+ interface::RtcTransportRecoveryStrategies type,
+ RecoveryStrategy::SendRtxCallback &&callback,
+ interface::StrategyCallback &&external_callback);
+
+ ~RTCLossDetectionAndRecovery();
+
+ void setState(RTCState *state) { rs_->setState(state); }
+ void setRateControl(RTCRateControl *rateControl) {
+ rs_->setRateControl(rateControl);
+ }
+
+ void setFecParams(uint32_t n, uint32_t k) { rs_->setFecParams(n, k); }
+
+ void setContentSharingMode() { rs_->setContentSharingMode(); }
+ void turnOnRecovery() { rs_->turnOnRecovery(); }
+ bool isRtxOn() { return rs_->isRtxOn(); }
+
+ void changeRecoveryStrategy(interface::RtcTransportRecoveryStrategies type);
+
+ void onNewRound(bool in_sync);
+
+ // the following functions return true if a loss is detected, false otherwise
+ bool onTimeout(uint32_t seq, bool lost);
+ bool onPacketRecoveredFec(uint32_t seq);
+ bool onDataPacketReceived(const core::ContentObject &content_object);
+ bool onNackPacketReceived(const core::ContentObject &nack);
+ bool onProbePacketReceived(const core::ContentObject &probe);
+
+ void clear() { rs_->clear(); }
+
+ bool isRtx(uint32_t seq) { return rs_->isRtx(seq); }
+ bool isPossibleLossWithNoRtx(uint32_t seq) {
+ return rs_->isPossibleLossWithNoRtx(seq);
+ }
+
+ uint64_t getRtxRtt(uint32_t seq) { return rs_->getRtxRtt(seq); }
+
+ private:
+ // returns true if a loss is detected, false otherwise
+ bool detectLoss(uint32_t start, uint32_t stop, bool recv_probe);
+
+ std::shared_ptr<RecoveryStrategy> rs_;
+};
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_packet.h b/libtransport/src/protocols/rtc/rtc_packet.h
new file mode 100644
index 000000000..ffbbd78fd
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_packet.h
@@ -0,0 +1,256 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ */
+
+/* data packet
+ * +-----------------------------------------+
+ * | uint64_t: timestamp |
+ * | |
+ * +-----------------------------------------+
+ * | uint32_t: prod rate (bytes per sec) |
+ * +-----------------------------------------+
+ * | payload |
+ * | ... |
+ */
+
+/* nack packet
+ * +-----------------------------------------+
+ * | uint64_t: timestamp |
+ * | |
+ * +-----------------------------------------+
+ * | uint32_t: prod rate (bytes per sec) |
+ * +-----------------------------------------+
+ * | uint32_t: current seg in production |
+ * +-----------------------------------------+
+ */
+
+/* aggregated packets
+ * +---------------------------------+
+ * |c| #pkts | len1 | len2 | .... |
+ * +----------------------------------
+ *
+ * +---------------------------------+
+ * |c| #pkts | resv | len 1 |
+ * +----------------------------------
+ *
+ * aggregated packets header.
+ * header position. just after the data packet header
+ *
+ * c: 1 bit: 0 8bit encoding, 1 16bit encoding
+ * #pkts: 7 bits: number of application packets contained
+ * 8bits encoding:
+ * lenX: 8 bits: len in bites of packet X
+ * 16bits econding:
+ * resv: 8 bits: reserved field (unused)
+ * lenX: 16bits: len in bytes of packet X
+ */
+
+#pragma once
+#ifndef _WIN32
+#include <arpa/inet.h>
+#else
+#include <hicn/transport/portability/win_portability.h>
+#endif
+
+#include <hicn/transport/portability/endianess.h>
+
+#include <cstring>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+const uint32_t DATA_HEADER_SIZE = 12; // bytes
+ // XXX: sizeof(data_packet_t) is 16
+ // beacuse of padding
+const uint32_t NACK_HEADER_SIZE = 16;
+
+struct data_packet_t {
+ uint64_t timestamp;
+ uint32_t prod_rate;
+
+ inline uint64_t getTimestamp() const {
+ return portability::net_to_host(timestamp);
+ }
+ inline void setTimestamp(uint64_t time) {
+ timestamp = portability::host_to_net(time);
+ }
+
+ inline uint32_t getProductionRate() const {
+ return portability::net_to_host(prod_rate);
+ }
+ inline void setProductionRate(uint32_t rate) {
+ prod_rate = portability::host_to_net(rate);
+ }
+};
+
+struct nack_packet_t {
+ uint64_t timestamp;
+ uint32_t prod_rate;
+ uint32_t prod_seg;
+
+ inline uint64_t getTimestamp() const {
+ return portability::net_to_host(timestamp);
+ }
+ inline void setTimestamp(uint64_t time) {
+ timestamp = portability::host_to_net(time);
+ }
+
+ inline uint32_t getProductionRate() const {
+ return portability::net_to_host(prod_rate);
+ }
+ inline void setProductionRate(uint32_t rate) {
+ prod_rate = portability::host_to_net(rate);
+ }
+
+ inline uint32_t getProductionSegment() const {
+ return portability::net_to_host(prod_seg);
+ }
+ inline void setProductionSegment(uint32_t seg) {
+ prod_seg = portability::host_to_net(seg);
+ }
+};
+
+class AggrPktHeader {
+ public:
+ // XXX buf always point to the payload after the data header
+ AggrPktHeader(uint8_t *buf, uint16_t max_packet_len, uint16_t pkt_number)
+ : buf_(buf), pkt_num_(pkt_number) {
+ *buf_ = 0; // reset the first byte to correctly add the header
+ // encoding and the packet number
+ if (max_packet_len > 0xff) {
+ setAggrPktEncoding16bit();
+ } else {
+ setAggrPktEncoding8bit();
+ }
+ setAggrPktNUmber(pkt_number);
+ header_len_ = computeHeaderLen();
+ memset(buf_ + 1, 0, header_len_ - 1);
+ }
+
+ // XXX buf always point to the payload after the data header
+ AggrPktHeader(uint8_t *buf) : buf_(buf) {
+ encoding_ = getAggrPktEncoding();
+ pkt_num_ = getAggrPktNumber();
+ header_len_ = computeHeaderLen();
+ }
+
+ ~AggrPktHeader(){};
+
+ int addPacketToHeader(uint8_t index, uint16_t len) {
+ if (index > pkt_num_) return -1;
+
+ setAggrPktLen(index, len);
+ return 0;
+ }
+
+ int getPointerToPacket(uint8_t index, uint8_t **pkt_ptr, uint16_t *pkt_len) {
+ if (index > pkt_num_) return -1;
+
+ uint16_t len = 0;
+ for (int i = 0; i < index; i++)
+ len += getAggrPktLen(i); // sum the pkts len from 0 to index - 1
+
+ uint16_t offset = len + header_len_;
+ *pkt_ptr = buf_ + offset;
+ *pkt_len = getAggrPktLen(index);
+ return 0;
+ }
+
+ int getPacketOffsets(uint8_t index, uint16_t *pkt_offset, uint16_t *pkt_len) {
+ if (index > pkt_num_) return -1;
+
+ uint16_t len = 0;
+ for (int i = 0; i < index; i++)
+ len += getAggrPktLen(i); // sum the pkts len from 0 to index - 1
+
+ uint16_t offset = len + header_len_;
+ *pkt_offset = offset;
+ *pkt_len = getAggrPktLen(index);
+
+ return 0;
+ }
+
+ uint8_t *getPayloadAppendPtr() { return buf_ + header_len_; }
+
+ uint16_t getHeaderLen() { return header_len_; }
+
+ uint8_t getNumberOfPackets() { return pkt_num_; }
+
+ private:
+ inline uint16_t computeHeaderLen() const {
+ uint16_t len = 4; // min len in bytes
+ if (!encoding_) {
+ while (pkt_num_ >= len) {
+ len += 4;
+ }
+ } else {
+ while (pkt_num_ * 2 >= len) {
+ len += 4;
+ }
+ }
+ return len;
+ }
+
+ inline uint8_t getAggrPktEncoding() const {
+ // get the first bit of the first byte
+ return (*buf_ >> 7);
+ }
+
+ inline void setAggrPktEncoding8bit() {
+ // reset the first bit of the first byte
+ encoding_ = 0;
+ *buf_ &= 0x7f;
+ }
+
+ inline void setAggrPktEncoding16bit() {
+ // set the first bit of the first byte
+ encoding_ = 1;
+ *buf_ ^= 0x80;
+ }
+
+ inline uint8_t getAggrPktNumber() const {
+ // return the first byte with the first bit = 0
+ return (*buf_ & 0x7f);
+ }
+
+ inline void setAggrPktNUmber(uint8_t val) {
+ // set the val without modifying the first bit
+ *buf_ &= 0x80; // reset everithing but the first bit
+ val &= 0x7f; // reset the first bit
+ *buf_ |= val; // or the vals, done!
+ }
+
+ inline uint16_t getAggrPktLen(uint8_t pkt_index) const {
+ pkt_index++;
+ if (!encoding_) { // 8 bits
+ return (uint16_t) * (buf_ + pkt_index);
+ } else { // 16 bits
+ uint16_t *buf_16 = (uint16_t *)buf_;
+ return portability::net_to_host(*(buf_16 + pkt_index));
+ }
+ }
+
+ inline void setAggrPktLen(uint8_t pkt_index, uint16_t len) {
+ pkt_index++;
+ if (!encoding_) { // 8 bits
+ *(buf_ + pkt_index) = (uint8_t)len;
+ } else { // 16 bits
+ uint16_t *buf_16 = (uint16_t *)buf_;
+ *(buf_16 + pkt_index) = portability::host_to_net(len);
+ }
+ }
+
+ uint8_t *buf_;
+ uint8_t encoding_;
+ uint8_t pkt_num_;
+ uint16_t header_len_;
+};
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_rc.h b/libtransport/src/protocols/rtc/rtc_rc.h
new file mode 100644
index 000000000..62636ce40
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_rc.h
@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <protocols/rtc/rtc_state.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+class RTCRateControl : public std::enable_shared_from_this<RTCRateControl> {
+ public:
+ RTCRateControl()
+ : rc_on_(false),
+ congestion_win_(1000000), // init the win to a large number
+ congestion_state_(CongestionState::Normal),
+ protocol_state_(nullptr) {}
+
+ virtual ~RTCRateControl() = default;
+
+ void turnOnRateControl() { rc_on_ = true; }
+ void setState(std::shared_ptr<RTCState> state) { protocol_state_ = state; };
+ uint32_t getCongestionWindow() { return congestion_win_; };
+ bool inCongestionState() {
+ if (congestion_state_ == CongestionState::Congested) return true;
+ return false;
+ }
+
+ virtual void onNewRound(double round_len) = 0;
+ virtual void onDataPacketReceived(const core::ContentObject &content_object,
+ bool compute_stats) = 0;
+
+ protected:
+ enum class CongestionState { Normal = 0, Underuse = 1, Congested = 2, Last };
+
+ protected:
+ bool rc_on_;
+ uint32_t congestion_win_;
+ CongestionState congestion_state_;
+
+ std::shared_ptr<RTCState> protocol_state_;
+};
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_rc_congestion_detection.cc b/libtransport/src/protocols/rtc/rtc_rc_congestion_detection.cc
new file mode 100644
index 000000000..6cd3094b5
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_rc_congestion_detection.cc
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <protocols/rtc/rtc_consts.h>
+#include <protocols/rtc/rtc_rc_congestion_detection.h>
+
+#include <algorithm>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+RTCRateControlCongestionDetection::RTCRateControlCongestionDetection()
+ : rounds_without_congestion_(4), last_queue_(0) {} // must be > 3
+
+RTCRateControlCongestionDetection::~RTCRateControlCongestionDetection() {}
+
+void RTCRateControlCongestionDetection::onNewRound(double round_len) {
+ if (!rc_on_) return;
+
+ double rtt = (double)protocol_state_->getMinRTT() / MILLI_IN_A_SEC;
+ double queue = protocol_state_->getQueuing();
+
+ if (rtt == 0.0) return; // no info from the producer
+
+ if (last_queue_ == queue) {
+ // if last_queue == queue the consumer didn't receive any
+ // packet from the producer. we do not change the current congestion state.
+ // we just increase the counter of rounds whithout congestion if needed
+ // (in case of congestion the counter is already set to 0)
+ if (congestion_state_ == CongestionState::Normal)
+ rounds_without_congestion_++;
+ } else {
+ if (queue > MAX_QUEUING_DELAY) {
+ // here we detect congestion.
+ congestion_state_ = CongestionState::Congested;
+ rounds_without_congestion_ = 0;
+ } else {
+ // wait 3 rounds before switch back to no congestion
+ if (rounds_without_congestion_ > 3) {
+ // nothing bad is happening
+ congestion_state_ = CongestionState::Normal;
+ }
+ rounds_without_congestion_++;
+ }
+ last_queue_ = queue;
+ }
+}
+
+void RTCRateControlCongestionDetection::onDataPacketReceived(
+ const core::ContentObject &content_object, bool compute_stats) {
+ // nothing to do
+ return;
+}
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_rc_congestion_detection.h b/libtransport/src/protocols/rtc/rtc_rc_congestion_detection.h
new file mode 100644
index 000000000..9afa6c39a
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_rc_congestion_detection.h
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <hicn/transport/utils/shared_ptr_utils.h>
+#include <protocols/rtc/rtc_rc.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+class RTCRateControlCongestionDetection : public RTCRateControl {
+ public:
+ RTCRateControlCongestionDetection();
+
+ ~RTCRateControlCongestionDetection();
+
+ void onNewRound(double round_len);
+ void onDataPacketReceived(const core::ContentObject &content_object,
+ bool compute_stats);
+
+ auto shared_from_this() { return utils::shared_from(this); }
+
+ private:
+ uint32_t rounds_without_congestion_;
+ double last_queue_;
+};
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_rc_iat.cc b/libtransport/src/protocols/rtc/rtc_rc_iat.cc
new file mode 100644
index 000000000..f06f377f3
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_rc_iat.cc
@@ -0,0 +1,287 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <protocols/rtc/rtc_consts.h>
+#include <protocols/rtc/rtc_rc_iat.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+RTCRateControlIAT::RTCRateControlIAT()
+ : rounds_since_last_drop_(0),
+ rounds_without_congestion_(0),
+ rounds_with_congestion_(0),
+ last_queue_(0),
+ last_rcv_time_(0),
+ last_prod_time_(0),
+ last_seq_number_(0),
+ target_rate_avg_(0),
+ round_index_(0),
+ congestion_cause_(CongestionCause::UNKNOWN) {}
+
+RTCRateControlIAT::~RTCRateControlIAT() {}
+
+void RTCRateControlIAT::onNewRound(double round_len) {
+ if (!rc_on_) return;
+
+ double received_rate = protocol_state_->getReceivedRate() +
+ protocol_state_->getRecoveredFecRate();
+
+ double target_rate =
+ protocol_state_->getProducerRate(); // * PRODUCTION_RATE_FRACTION;
+ double rtt = (double)protocol_state_->getMinRTT() / MILLI_IN_A_SEC;
+ // double packet_size = protocol_state_->getAveragePacketSize();
+ double queue = protocol_state_->getQueuing();
+
+ if (rtt == 0.0) return; // no info from the producer
+
+ CongestionState prev_congestion_state = congestion_state_;
+
+ target_rate_avg_ = target_rate_avg_ * (1 - MOVING_AVG_ALPHA) +
+ target_rate * MOVING_AVG_ALPHA;
+
+ if (prev_congestion_state == CongestionState::Congested) {
+ if (queue > MAX_QUEUING_DELAY || last_queue_ == queue) {
+ congestion_state_ = CongestionState::Congested;
+
+ received_rate_.push_back(received_rate);
+ target_rate_.push_back(target_rate);
+
+ // We assume the cause does not change
+ // Note that the first assumption about the cause could be wrong
+ // the cause of congestion could change
+ if (congestion_cause_ == CongestionCause::UNKNOWN)
+ if (rounds_with_congestion_ >= 1)
+ congestion_cause_ = apply_classification_tree(
+ rounds_with_congestion_ > ROUND_TO_WAIT_FORCE_DECISION);
+
+ rounds_with_congestion_++;
+ } else {
+ congestion_state_ = CongestionState::Normal;
+
+ // clear past history
+ reset_congestion_statistics();
+
+ // TODO maybe we can use some of these values for the stdev of the
+ // congestion mode
+ for (int i = 0; i < ROUND_HISTORY_SIZE; i++) {
+ iat_on_hold_[i].clear();
+ }
+ }
+ } else if (queue > MAX_QUEUING_DELAY) {
+ if (prev_congestion_state == CongestionState::Normal) {
+ rounds_with_congestion_ = 0;
+
+ if (rounds_without_congestion_ > ROUND_TO_RESET_CAUSE)
+ congestion_cause_ = CongestionCause::UNKNOWN;
+ }
+ congestion_state_ = CongestionState::Congested;
+ received_rate_.push_back(received_rate);
+ target_rate_.push_back(target_rate);
+ } else {
+ // nothing bad is happening
+ congestion_state_ = CongestionState::Normal;
+ reset_congestion_statistics();
+
+ int past_index = (round_index_ + 1) % ROUND_HISTORY_SIZE;
+ for (std::vector<double>::iterator it = iat_on_hold_[past_index].begin();
+ it != iat_on_hold_[past_index].end(); ++it) {
+ congestion_free_iat_.push_back(*it);
+ if (congestion_free_iat_.size() > 50) {
+ congestion_free_iat_.erase(congestion_free_iat_.begin());
+ }
+ }
+ iat_on_hold_[past_index].clear();
+ round_index_ = (round_index_ + 1) % ROUND_HISTORY_SIZE;
+ }
+
+ last_queue_ = queue;
+
+ if (congestion_state_ == CongestionState::Congested) {
+ if (prev_congestion_state == CongestionState::Normal) {
+ // init the congetion window using the received rate
+ // disabling for the moment the congestion window setup
+ // congestion_win_ = (uint32_t)ceil(received_rate * rtt / packet_size);
+ rounds_since_last_drop_ = ROUNDS_BEFORE_TAKE_ACTION + 1;
+ }
+
+ if (rounds_since_last_drop_ >= ROUNDS_BEFORE_TAKE_ACTION) {
+ // disabling for the moment the congestion window setup
+ // uint32_t win = congestion_win_ * WIN_DECREASE_FACTOR;
+ // congestion_win_ = std::max(win, WIN_MIN);
+ rounds_since_last_drop_ = 0;
+ return;
+ }
+
+ rounds_since_last_drop_++;
+ }
+
+ if (congestion_state_ == CongestionState::Normal) {
+ if (prev_congestion_state == CongestionState::Congested) {
+ rounds_without_congestion_ = 0;
+ }
+
+ rounds_without_congestion_++;
+ if (rounds_without_congestion_ < ROUNDS_BEFORE_TAKE_ACTION) return;
+
+ // disabling for the moment the congestion window setup
+ // congestion_win_ = congestion_win_ * WIN_INCREASE_FACTOR;
+ // congestion_win_ = std::min(congestion_win_, INITIAL_WIN_MAX);
+ }
+
+ if (received_rate_.size() > 1000)
+ received_rate_.erase(received_rate_.begin());
+ if (target_rate_.size() > 1000) target_rate_.erase(target_rate_.begin());
+}
+
+void RTCRateControlIAT::onDataPacketReceived(
+ const core::ContentObject &content_object, bool compute_stats) {
+ core::ParamsRTC params = RTCState::getDataParams(content_object);
+
+ uint64_t now = utils::SteadyTime::nowMs().count();
+
+ uint32_t segment_number = content_object.getName().getSuffix();
+
+ if (segment_number == (last_seq_number_ + 1) && compute_stats) {
+ uint64_t iat = now - last_rcv_time_;
+ uint64_t ist = params.timestamp - last_prod_time_;
+ if (now >= last_rcv_time_ && params.timestamp > last_prod_time_) {
+ if (iat >= ist && ist < MIN_IST_VALUE) {
+ if (congestion_state_ == CongestionState::Congested) {
+ iat_.push_back((iat - ist));
+ } else {
+ // no congestion, but we do not always add new values, but only when
+ // there is no sign of congestion
+ double queue = protocol_state_->getQueuing();
+ if (queue <= CONGESTION_FREE_QUEUEING_DELAY) {
+ iat_on_hold_[round_index_].push_back((iat - ist));
+ }
+ }
+ }
+ }
+ }
+
+ last_seq_number_ = segment_number;
+ last_rcv_time_ = now;
+ last_prod_time_ = params.timestamp;
+
+ if (iat_.size() > 1000) iat_.erase(iat_.begin());
+ return;
+}
+
+CongestionCause RTCRateControlIAT::apply_classification_tree(bool force_reply) {
+ if (iat_.size() <= 2 || received_rate_.size() < 2)
+ return CongestionCause::UNKNOWN;
+
+ double received_ratio = 0;
+ double iat_ratio = 0;
+ double iat_stdev = compute_iat_stdev(iat_);
+ double iat_congestion_free_stdev = compute_iat_stdev(congestion_free_iat_);
+
+ double iat_avg = 0.0;
+
+ double recv_avg = 0.0;
+ double recv_max = 0.0;
+
+ double target_rate_avg = 0.0;
+
+ int counter = 0;
+ std::vector<double>::reverse_iterator target_it = target_rate_.rbegin();
+ for (std::vector<double>::reverse_iterator it = received_rate_.rbegin();
+ it != received_rate_.rend(); ++it) {
+ recv_avg += *it;
+ target_rate_avg += *target_it;
+ if (counter < ROUND_HISTORY_SIZE)
+ if (recv_max < *it) {
+ recv_max = *it; // we consider only the last 2 seconds
+ }
+ counter++;
+ target_it++;
+ }
+ recv_avg = recv_avg / received_rate_.size();
+ target_rate_avg = target_rate_avg / target_rate_.size();
+
+ for (std::vector<double>::iterator it = iat_.begin(); it != iat_.end();
+ ++it) {
+ iat_avg += *it;
+ }
+ iat_avg = iat_avg / iat_.size();
+
+ double congestion_free_iat_avg = 0.0;
+ for (std::vector<double>::iterator it = congestion_free_iat_.begin();
+ it != congestion_free_iat_.end(); ++it) {
+ congestion_free_iat_avg += *it;
+ }
+ congestion_free_iat_avg =
+ congestion_free_iat_avg / congestion_free_iat_.size();
+
+ received_ratio = recv_avg / target_rate_avg;
+
+ iat_ratio = iat_stdev / iat_congestion_free_stdev;
+
+ CongestionCause congestion_cause = CongestionCause::UNKNOWN;
+ // applying classification tree model
+ if (received_ratio <= 0.87)
+ if (iat_stdev <= 6.48)
+ if (received_ratio <= 0.83)
+ congestion_cause = CongestionCause::LINK_CAPACITY;
+ else if (force_reply)
+ congestion_cause = CongestionCause::LINK_CAPACITY;
+ else
+ congestion_cause = CongestionCause::UNKNOWN; // accuracy is too low
+ else if (iat_ratio <= 2.46)
+ if (force_reply)
+ congestion_cause = CongestionCause::LINK_CAPACITY;
+ else
+ congestion_cause = CongestionCause::UNKNOWN; // accuracy is too low
+ else
+ congestion_cause = CongestionCause::COMPETING_CROSS_TRAFFIC;
+ else if (received_ratio <= 0.913 && iat_stdev <= 0.784)
+ congestion_cause = CongestionCause::LINK_CAPACITY;
+ else
+ congestion_cause = CongestionCause::COMPETING_CROSS_TRAFFIC;
+
+ return congestion_cause;
+}
+
+void RTCRateControlIAT::reset_congestion_statistics() {
+ iat_.clear();
+ received_rate_.clear();
+ target_rate_.clear();
+}
+
+double RTCRateControlIAT::compute_iat_stdev(std::vector<double> v) {
+ if (v.size() == 0) return 0;
+
+ float sum = 0.0, mean, standard_deviation = 0.0;
+ for (std::vector<double>::iterator it = v.begin(); it != v.end(); it++) {
+ sum += *it;
+ }
+
+ mean = sum / v.size();
+ for (std::vector<double>::iterator it = v.begin(); it != v.end(); it++) {
+ standard_deviation += pow(*it - mean, 2);
+ }
+ return sqrt(standard_deviation / v.size());
+}
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_rc_iat.h b/libtransport/src/protocols/rtc/rtc_rc_iat.h
new file mode 100644
index 000000000..715637807
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_rc_iat.h
@@ -0,0 +1,93 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <hicn/transport/utils/shared_ptr_utils.h>
+#include <protocols/rtc/rtc_rc.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+const int ROUND_HISTORY_SIZE = 10; // equivalent to two seconds
+const int ROUND_TO_WAIT_FORCE_DECISION = 5;
+
+// once congestion is gone, we need to wait for k rounds before changing the
+// congestion cause in the case it appears again
+const int ROUND_TO_RESET_CAUSE = 5;
+
+const int MIN_IST_VALUE = 150; // samples of ist larger than 150ms are
+ // discarded
+const double CONGESTION_FREE_QUEUEING_DELAY = 10;
+
+enum class CongestionCause : uint8_t {
+ COMPETING_CROSS_TRAFFIC,
+ FRIENDLY_CROSS_TRAFFIC,
+ UNKNOWN_CROSS_TRAFFIC,
+ LINK_CAPACITY,
+ UNKNOWN
+};
+
+class RTCRateControlIAT : public RTCRateControl {
+ public:
+ RTCRateControlIAT();
+
+ ~RTCRateControlIAT();
+
+ void onNewRound(double round_len);
+ void onDataPacketReceived(const core::ContentObject &content_object,
+ bool compute_stats);
+
+ auto shared_from_this() { return utils::shared_from(this); }
+
+ private:
+ void reset_congestion_statistics();
+
+ double compute_iat_stdev(std::vector<double> v);
+
+ CongestionCause apply_classification_tree(bool force_reply);
+
+ private:
+ uint32_t rounds_since_last_drop_;
+ uint32_t rounds_without_congestion_;
+ uint32_t rounds_with_congestion_;
+ double last_queue_;
+ uint64_t last_rcv_time_;
+ uint64_t last_prod_time_;
+ uint32_t last_seq_number_;
+ double target_rate_avg_;
+
+ // Iat values are not immediately added to the congestion free set of values
+ std::array<std::vector<double>, ROUND_HISTORY_SIZE> iat_on_hold_;
+ uint32_t round_index_;
+
+ // with congestion statistics
+ std::vector<double> iat_;
+ std::vector<double> received_rate_;
+ std::vector<double> target_rate_;
+
+ // congestion free statistics
+ std::vector<double> congestion_free_iat_;
+
+ CongestionCause congestion_cause_;
+};
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_rc_queue.cc b/libtransport/src/protocols/rtc/rtc_rc_queue.cc
new file mode 100644
index 000000000..ecabc5205
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_rc_queue.cc
@@ -0,0 +1,106 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <protocols/rtc/rtc_consts.h>
+#include <protocols/rtc/rtc_rc_queue.h>
+
+#include <algorithm>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+RTCRateControlQueue::RTCRateControlQueue()
+ : rounds_since_last_drop_(0),
+ rounds_without_congestion_(0),
+ last_queue_(0) {}
+
+RTCRateControlQueue::~RTCRateControlQueue() {}
+
+void RTCRateControlQueue::onNewRound(double round_len) {
+ if (!rc_on_) return;
+
+ double received_rate = protocol_state_->getReceivedRate();
+ double target_rate =
+ protocol_state_->getProducerRate() * PRODUCTION_RATE_FRACTION;
+ double rtt = (double)protocol_state_->getMinRTT() / MILLI_IN_A_SEC;
+ double packet_size = protocol_state_->getAveragePacketSize();
+ double queue = protocol_state_->getQueuing();
+
+ if (rtt == 0.0) return; // no info from the producer
+
+ CongestionState prev_congestion_state = congestion_state_;
+
+ if (prev_congestion_state == CongestionState::Normal &&
+ received_rate >= target_rate) {
+ // if the queue is high in this case we are most likelly fighting with
+ // a TCP flow and there is enough bandwidth to match the producer rate
+ congestion_state_ = CongestionState::Normal;
+ } else if (queue > MAX_QUEUING_DELAY || last_queue_ == queue) {
+ // here we detect congestion. in the case that last_queue == queue
+ // the consumer didn't receive any packet from the producer so we
+ // consider this case as congestion
+ // TODO: wath happen in case of high loss rate?
+ congestion_state_ = CongestionState::Congested;
+ } else {
+ // nothing bad is happening
+ congestion_state_ = CongestionState::Normal;
+ }
+
+ last_queue_ = queue;
+
+ if (congestion_state_ == CongestionState::Congested) {
+ if (prev_congestion_state == CongestionState::Normal) {
+ // init the congetion window using the received rate
+ congestion_win_ = (uint32_t)ceil(received_rate * rtt / packet_size);
+ rounds_since_last_drop_ = ROUNDS_BEFORE_TAKE_ACTION + 1;
+ }
+
+ if (rounds_since_last_drop_ >= ROUNDS_BEFORE_TAKE_ACTION) {
+ uint32_t win = congestion_win_ * WIN_DECREASE_FACTOR;
+ congestion_win_ = std::max(win, WIN_MIN);
+ rounds_since_last_drop_ = 0;
+ return;
+ }
+
+ rounds_since_last_drop_++;
+ }
+
+ if (congestion_state_ == CongestionState::Normal) {
+ if (prev_congestion_state == CongestionState::Congested) {
+ rounds_without_congestion_ = 0;
+ }
+
+ rounds_without_congestion_++;
+ if (rounds_without_congestion_ < ROUNDS_BEFORE_TAKE_ACTION) return;
+
+ congestion_win_ = congestion_win_ * WIN_INCREASE_FACTOR;
+ congestion_win_ = std::min(congestion_win_, INITIAL_WIN_MAX);
+ }
+}
+
+void RTCRateControlQueue::onDataPacketReceived(
+ const core::ContentObject &content_object, bool compute_stats) {
+ // nothing to do
+ return;
+}
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_rc_queue.h b/libtransport/src/protocols/rtc/rtc_rc_queue.h
new file mode 100644
index 000000000..cdf78fd47
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_rc_queue.h
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <hicn/transport/utils/shared_ptr_utils.h>
+#include <protocols/rtc/rtc_rc.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+class RTCRateControlQueue : public RTCRateControl {
+ public:
+ RTCRateControlQueue();
+
+ ~RTCRateControlQueue();
+
+ void onNewRound(double round_len);
+ void onDataPacketReceived(const core::ContentObject &content_object,
+ bool compute_stats);
+
+ auto shared_from_this() { return utils::shared_from(this); }
+
+ private:
+ uint32_t rounds_since_last_drop_;
+ uint32_t rounds_without_congestion_;
+ double last_queue_;
+};
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_reassembly.cc b/libtransport/src/protocols/rtc/rtc_reassembly.cc
new file mode 100644
index 000000000..b1b0fcaba
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_reassembly.cc
@@ -0,0 +1,109 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/interfaces/socket_consumer.h>
+#include <implementation/socket_consumer.h>
+#include <protocols/rtc/rtc_reassembly.h>
+#include <protocols/transport_protocol.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+RtcReassembly::RtcReassembly(implementation::ConsumerSocket* icn_socket,
+ TransportProtocol* transport_protocol)
+ : DatagramReassembly(icn_socket, transport_protocol) {
+ is_setup_ = false;
+}
+
+void RtcReassembly::reassemble(core::ContentObject& content_object) {
+ if (!is_setup_) {
+ is_setup_ = true;
+ reassembly_consumer_socket_->getSocketOption(
+ interface::RtcTransportOptions::AGGREGATED_DATA, data_aggregation_);
+ }
+
+ auto read_buffer = content_object.getPayload();
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Size of payload: " << read_buffer->length();
+
+ read_buffer->trimStart(transport_protocol_->transportHeaderLength(false));
+
+ if (data_aggregation_) {
+ rtc::AggrPktHeader hdr((uint8_t*)read_buffer->data());
+
+ for (uint8_t i = 0; i < hdr.getNumberOfPackets(); i++) {
+ std::unique_ptr<utils::MemBuf> segment = read_buffer->clone();
+
+ uint16_t pkt_start = 0;
+ uint16_t pkt_len = 0;
+ int res = hdr.getPacketOffsets(i, &pkt_start, &pkt_len);
+ if (res == -1) {
+ // this should not happen
+ break;
+ }
+
+ segment->trimStart(pkt_start);
+ segment->trimEnd(segment->length() - pkt_len);
+
+ Reassembly::read_buffer_ = std::move(segment);
+ Reassembly::notifyApplication();
+ }
+ } else {
+ Reassembly::read_buffer_ = std::move(read_buffer);
+ Reassembly::notifyApplication();
+ }
+}
+
+void RtcReassembly::reassemble(utils::MemBuf& buffer, uint32_t suffix) {
+ if (!is_setup_) {
+ is_setup_ = true;
+ reassembly_consumer_socket_->getSocketOption(
+ interface::RtcTransportOptions::AGGREGATED_DATA, data_aggregation_);
+ }
+
+ if (data_aggregation_) {
+ rtc::AggrPktHeader hdr((uint8_t*)buffer.data());
+
+ for (uint8_t i = 0; i < hdr.getNumberOfPackets(); i++) {
+ std::unique_ptr<utils::MemBuf> segment = buffer.clone();
+
+ uint16_t pkt_start = 0;
+ uint16_t pkt_len = 0;
+ int res = hdr.getPacketOffsets(i, &pkt_start, &pkt_len);
+ if (res == -1) {
+ // this should not happen
+ break;
+ }
+
+ segment->trimStart(pkt_start);
+ segment->trimEnd(segment->length() - pkt_len);
+
+ Reassembly::read_buffer_ = std::move(segment);
+ Reassembly::notifyApplication();
+ }
+
+ } else {
+ Reassembly::read_buffer_ = buffer.cloneOne();
+ Reassembly::notifyApplication();
+ }
+}
+
+} // namespace rtc
+
+} // namespace protocol
+
+} // namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_reassembly.h b/libtransport/src/protocols/rtc/rtc_reassembly.h
new file mode 100644
index 000000000..132004605
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_reassembly.h
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <glog/logging.h>
+#include <protocols/datagram_reassembly.h>
+#include <protocols/rtc/rtc_consts.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+class RtcReassembly : public DatagramReassembly {
+ public:
+ RtcReassembly(implementation::ConsumerSocket *icn_socket,
+ TransportProtocol *transport_protocol);
+
+ void reassemble(core::ContentObject &content_object) override;
+ void reassemble(utils::MemBuf &buffer, uint32_t suffix) override;
+
+ private:
+ bool is_setup_;
+ bool data_aggregation_;
+};
+
+} // namespace rtc
+} // namespace protocol
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_recovery_strategy.cc b/libtransport/src/protocols/rtc/rtc_recovery_strategy.cc
new file mode 100644
index 000000000..257fdd09b
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_recovery_strategy.cc
@@ -0,0 +1,420 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <glog/logging.h>
+#include <hicn/transport/interfaces/notification.h>
+#include <hicn/transport/interfaces/socket_options_keys.h>
+#include <protocols/rtc/rtc_consts.h>
+#include <protocols/rtc/rtc_recovery_strategy.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+using namespace transport::interface;
+
+RecoveryStrategy::RecoveryStrategy(
+ Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service,
+ bool use_rtx, bool use_fec,
+ interface::RtcTransportRecoveryStrategies rs_type,
+ interface::StrategyCallback &&external_callback)
+ : rs_type_(rs_type),
+ recovery_on_(false),
+ content_sharing_mode_(false),
+ rtx_during_fec_(0),
+ next_rtx_timer_(MAX_TIMER_RTX),
+ send_rtx_callback_(std::move(callback)),
+ indexer_(indexer),
+ round_id_(0),
+ last_fec_used_(0),
+ callback_(std::move(external_callback)) {
+ setRtxFec(use_rtx, use_fec);
+ timer_ = std::make_unique<asio::steady_timer>(io_service);
+}
+
+RecoveryStrategy::RecoveryStrategy(RecoveryStrategy &&rs)
+ : rs_type_(rs.rs_type_),
+ content_sharing_mode_(rs.content_sharing_mode_),
+ rtx_during_fec_(0),
+ rtx_state_(std::move(rs.rtx_state_)),
+ rtx_timers_(std::move(rs.rtx_timers_)),
+ recover_with_fec_(std::move(rs.recover_with_fec_)),
+ timer_(std::move(rs.timer_)),
+ next_rtx_timer_(std::move(rs.next_rtx_timer_)),
+ send_rtx_callback_(std::move(rs.send_rtx_callback_)),
+ n_(std::move(rs.n_)),
+ k_(std::move(rs.k_)),
+ indexer_(std::move(rs.indexer_)),
+ state_(std::move(rs.state_)),
+ rc_(std::move(rs.rc_)),
+ round_id_(std::move(rs.round_id_)),
+ last_fec_used_(std::move(rs.last_fec_used_)),
+ callback_(std::move(rs.callback_)) {
+ setFecParams(n_, k_);
+}
+
+RecoveryStrategy::~RecoveryStrategy() {}
+
+void RecoveryStrategy::setFecParams(uint32_t n, uint32_t k) {
+ // if rs_type == FEC_ONLY_LOW_RES_LOSSES max k == 64
+ n_ = n;
+ k_ = k;
+
+ // XXX for the moment we go in steps of 5% loss rate.
+ uint32_t i = 0;
+ for (uint32_t loss_rate = 5; loss_rate < 100; loss_rate += 5) {
+ uint32_t fec_to_ask = 0;
+ if (n_ != 0 && k_ != 0) {
+ if (rs_type_ ==
+ interface::RtcTransportRecoveryStrategies::FEC_ONLY_LOW_RES_LOSSES) {
+ // the max loss rate in the matrix is 50%
+ uint32_t index = i;
+ if (i > 9) index = 9;
+ fec_to_ask = FEC_MATRIX[k_ - 1][index];
+ } else {
+ double dec_loss_rate = (double)(loss_rate + 5);
+ if (dec_loss_rate == 100.0) dec_loss_rate = 95.0;
+ dec_loss_rate = dec_loss_rate / 100.0;
+ double exp_losses = ceil((double)k_ * dec_loss_rate);
+ fec_to_ask = ceil((exp_losses / (1 - dec_loss_rate)) * 1.25);
+ }
+ }
+ fec_to_ask = std::min(fec_to_ask, (n_ - k_));
+ fec_per_loss_rate_.push_back(fec_to_ask);
+
+ i++;
+ }
+}
+
+uint64_t RecoveryStrategy::getRtxRtt(uint32_t seq) {
+ auto it = rtx_state_.find(seq);
+
+ if (it == rtx_state_.end()) return 0;
+
+ // we can compute the RTT of an RTX only if it was send once. Infact if the
+ // RTX was sent twice or more the data may be alredy in flight and the RTT
+ // will be underestimated. This may happen also for packets that we
+ // retransmitted too soon. in that case the RTT will be filtered out by
+ // checking the path label
+ if (it->second.rtx_count_ != 1) return 0;
+
+ // this a potentialy valid packet, compute the RTT
+ return (utils::SteadyTime::nowMs().count() - it->second.last_send_);
+}
+
+bool RecoveryStrategy::lossDetected(uint32_t seq) {
+ if (isRtx(seq)) {
+ // this packet is already in the list of rtx
+ return false;
+ }
+
+ auto it_fec = recover_with_fec_.find(seq);
+ if (it_fec != recover_with_fec_.end()) {
+ // this packet is already in list of packets to recover with fec
+ // this list contians also fec packets that will not be recovered with rtx
+ return false;
+ }
+
+ auto it_nack = nacked_seq_.find(seq);
+ if (it_nack != nacked_seq_.end()) {
+ // this packet was nacked so we do not use it to determine the loss rate
+ return false;
+ }
+
+ return true;
+}
+
+void RecoveryStrategy::notifyNewLossDetedcted(uint32_t seq) {
+ // new loss detected
+ // first record the loss. second do what is needed to recover it
+ state_->onLossDetected(seq);
+ newPacketLoss(seq);
+}
+
+void RecoveryStrategy::requestPossibleLostPacket(uint32_t seq) {
+ // these are packets for which we send a RTX but we do not increase the loss
+ // counter beacuse we don't know if they are lost or not
+ addNewRtx(seq, false);
+}
+
+void RecoveryStrategy::receivedFutureNack(uint32_t seq) {
+ nacked_seq_.insert(seq);
+}
+
+void RecoveryStrategy::clear() {
+ rtx_state_.clear();
+ rtx_timers_.clear();
+ recover_with_fec_.clear();
+
+ if (next_rtx_timer_ != MAX_TIMER_RTX) {
+ next_rtx_timer_ = MAX_TIMER_RTX;
+ timer_->cancel();
+ }
+}
+
+// rtx functions
+void RecoveryStrategy::addNewRtx(uint32_t seq, bool force) {
+ if (!indexer_->isFec(seq) || force) {
+ // this packet needs to be re-transmitted
+ rtxState state;
+ state.first_send_ = state_->getInterestSentTime(seq);
+ if (state.first_send_ == 0) // this interest was never sent before
+ state.first_send_ = getNow();
+ state.last_send_ = state.first_send_; // we didn't send an RTX for this
+ // packet yet
+ state.rtx_count_ = 0;
+ state.next_send_ = computeNextSend(seq, state.rtx_count_);
+ DLOG_IF(INFO, VLOG_IS_ON(4))
+ << "Add " << seq << " to retransmissions. next rtx is in "
+ << state.next_send_ - getNow() << " ms";
+ rtx_state_.insert(std::pair<uint32_t, rtxState>(seq, state));
+ rtx_timers_.insert(std::pair<uint64_t, uint32_t>(state.next_send_, seq));
+
+ // if a new rtx is introduced, check the rtx timer
+ scheduleNextRtx();
+ } else {
+ // do not re-send fec packets but keep track of them
+ recover_with_fec_.insert(seq);
+ state_->onPossibleLossWithNoRtx(seq);
+ }
+}
+
+uint64_t RecoveryStrategy::computeNextSend(uint32_t seq, uint32_t rtx_counter) {
+ uint64_t now = getNow();
+ if (rtx_counter == 0) {
+ uint32_t wait = 1;
+ if (content_sharing_mode_) return now + wait;
+
+ uint32_t jitter = SENTINEL_TIMER_INTERVAL;
+ double prod_rate = state_->getProducerRate();
+ if (prod_rate != 0) jitter = ceil(state_->getJitter());
+
+ wait += jitter;
+
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "first rtx for " << seq << " in " << wait
+ << " ms, jitter = " << jitter;
+
+ return now + wait;
+ } else {
+ // wait one RTT. if an edge is known use the edge RTT for the first 5 rtx
+ double prod_rate = state_->getProducerRate();
+ if (prod_rate == 0) {
+ return now + SENTINEL_TIMER_INTERVAL;
+ }
+
+ uint64_t rtt = 0;
+ // if the transport detects an edge we try first to get the RTX from the
+ // edge. if no interest get a reply we move to the full RTT
+ if (rtx_counter < 5 && (state_->getEdgeRtt() != 0)) {
+ rtt = state_->getEdgeRtt();
+ } else {
+ rtt = state_->getAvgRTT();
+ }
+
+ if (rtt == 0) rtt = SENTINEL_TIMER_INTERVAL;
+
+ if (content_sharing_mode_) return now + rtt;
+
+ uint32_t wait = (uint32_t)rtt;
+
+ uint32_t jitter = ceil(state_->getJitter());
+ wait += jitter;
+
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "next rtx for " << seq << " in " << wait << " ms, rtt = " << rtt
+ << " jtter = " << jitter;
+
+ return now + wait;
+ }
+}
+
+void RecoveryStrategy::retransmit() {
+ if (rtx_timers_.size() == 0) return;
+
+ uint64_t now = getNow();
+
+ auto it = rtx_timers_.begin();
+ std::unordered_set<uint32_t> lost_pkt;
+ uint32_t sent_counter = 0;
+ while (it != rtx_timers_.end() && it->first <= now &&
+ sent_counter < MAX_RTX_IN_BATCH) {
+ uint32_t seq = it->second;
+ auto rtx_it =
+ rtx_state_.find(seq); // this should always return a valid iter
+ if (rtx_it->second.rtx_count_ >= RTC_MAX_RTX ||
+ (now - rtx_it->second.first_send_) >= RTC_MAX_AGE ||
+ seq < state_->getLastSeqNacked()) {
+ // max rtx reached or packet too old or packet nacked, this packet is lost
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "packet " << seq << " lost because 1) max rtx: "
+ << (rtx_it->second.rtx_count_ >= RTC_MAX_RTX) << " 2) max age: "
+ << ((now - rtx_it->second.first_send_) >= RTC_MAX_AGE)
+ << " 3) nacked: " << (seq < state_->getLastSeqNacked());
+ lost_pkt.insert(seq);
+ it++;
+ } else {
+ // resend the packet
+ state_->onRetransmission(seq);
+ double prod_rate = state_->getProducerRate();
+ if (prod_rate != 0) rtx_it->second.rtx_count_++;
+ rtx_it->second.last_send_ = now;
+ rtx_it->second.next_send_ =
+ computeNextSend(seq, rtx_it->second.rtx_count_);
+ it = rtx_timers_.erase(it);
+ rtx_timers_.insert(
+ std::pair<uint64_t, uint32_t>(rtx_it->second.next_send_, seq));
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "send rtx for sequence " << seq << ", next send in "
+ << (rtx_it->second.next_send_ - now);
+
+ // if fec is on increase the number of RTX send during fec
+ if (fec_on_) rtx_during_fec_++;
+ send_rtx_callback_(seq);
+ sent_counter++;
+ }
+ }
+
+ // remove packets if needed
+ for (auto lost_it = lost_pkt.begin(); lost_it != lost_pkt.end(); lost_it++) {
+ uint32_t seq = *lost_it;
+ state_->onPacketLost(seq);
+ deleteRtx(seq);
+ }
+}
+
+void RecoveryStrategy::scheduleNextRtx() {
+ if (rtx_timers_.size() == 0) {
+ // all the rtx were removed, reset timer
+ next_rtx_timer_ = MAX_TIMER_RTX;
+ return;
+ }
+
+ // check if timer is alreay set
+ if (next_rtx_timer_ != MAX_TIMER_RTX) {
+ // a new check for rtx is already scheduled
+ if (next_rtx_timer_ > rtx_timers_.begin()->first) {
+ // we need to re-schedule it
+ timer_->cancel();
+ } else {
+ // wait for the next timer
+ return;
+ }
+ }
+
+ // set a new timer
+ next_rtx_timer_ = rtx_timers_.begin()->first;
+ uint64_t now = utils::SteadyTime::nowMs().count();
+ uint64_t wait = 1;
+ if (next_rtx_timer_ != MAX_TIMER_RTX && next_rtx_timer_ > now)
+ wait = next_rtx_timer_ - now;
+
+ std::weak_ptr<RecoveryStrategy> self(shared_from_this());
+ timer_->expires_from_now(std::chrono::milliseconds(wait));
+ timer_->async_wait([self](const std::error_code &ec) {
+ if (ec) return;
+ if (auto s = self.lock()) {
+ s->retransmit();
+ s->next_rtx_timer_ = MAX_TIMER_RTX;
+ s->scheduleNextRtx();
+ }
+ });
+}
+
+void RecoveryStrategy::deleteRtx(uint32_t seq) {
+ auto it_rtx = rtx_state_.find(seq);
+ if (it_rtx == rtx_state_.end()) return; // rtx not found
+
+ // remove the rtx from the timers list
+ uint64_t ts = it_rtx->second.next_send_;
+ auto it_timers = rtx_timers_.find(ts);
+ while (it_timers != rtx_timers_.end() && it_timers->first == ts) {
+ if (it_timers->second == seq) {
+ rtx_timers_.erase(it_timers);
+ break;
+ }
+ it_timers++;
+ }
+
+ // remove rtx
+ rtx_state_.erase(it_rtx);
+}
+
+// fec functions
+uint32_t RecoveryStrategy::computeFecPacketsToAsk() {
+ double loss_rate = state_->getMaxLossRate() * 100; // use loss rate in %
+
+ if (loss_rate > 95) loss_rate = 95; // max loss rate
+
+ if (loss_rate == 0) return 0;
+
+ // keep track of the last used fec. if we use a new bin on this round reset
+ // consecutive use and avg loss in the prev bin
+ uint32_t bin = ceil(loss_rate / 5.0) - 1;
+ if (bin > fec_per_loss_rate_.size() - 1)
+ bin = (uint32_t)fec_per_loss_rate_.size() - 1;
+
+ return fec_per_loss_rate_[bin];
+}
+
+void RecoveryStrategy::setRtxFec(std::optional<bool> rtx_on,
+ std::optional<bool> fec_on) {
+ if (rtx_on) rtx_on_ = *rtx_on;
+ if (fec_on) {
+ if (fec_on_ == false && (*fec_on) == true) { // turn on fec
+ // reset the number of RTX sent during fec
+ rtx_during_fec_ = 0;
+ }
+ fec_on_ = *fec_on;
+ }
+
+ notification::RecoveryStrategy strategy =
+ notification::RecoveryStrategy::RECOVERY_OFF;
+
+ if (rtx_on_ && fec_on_)
+ strategy = notification::RecoveryStrategy::RTX_AND_FEC;
+ else if (rtx_on_)
+ strategy = notification::RecoveryStrategy::RTX_ONLY;
+ else if (fec_on_)
+ strategy = notification::RecoveryStrategy::FEC_ONLY;
+
+ callback_(strategy);
+}
+
+// common functions
+void RecoveryStrategy::onLostTimeout(uint32_t seq) { removePacketState(seq); }
+
+void RecoveryStrategy::removePacketState(uint32_t seq) {
+ auto it_fec = recover_with_fec_.find(seq);
+ if (it_fec != recover_with_fec_.end()) {
+ recover_with_fec_.erase(it_fec);
+ return;
+ }
+
+ auto it_nack = nacked_seq_.find(seq);
+ if (it_nack != nacked_seq_.end()) {
+ nacked_seq_.erase(it_nack);
+ return;
+ }
+
+ deleteRtx(seq);
+}
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_recovery_strategy.h b/libtransport/src/protocols/rtc/rtc_recovery_strategy.h
new file mode 100644
index 000000000..405e1ebba
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_recovery_strategy.h
@@ -0,0 +1,181 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <hicn/transport/interfaces/callbacks.h>
+#include <hicn/transport/utils/chrono_typedefs.h>
+#include <protocols/indexer.h>
+#include <protocols/rtc/rtc_rc.h>
+#include <protocols/rtc/rtc_state.h>
+
+#include <map>
+#include <optional>
+#include <unordered_map>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> {
+ protected:
+ struct rtx_state_ {
+ uint64_t first_send_; // first time this interest was sent
+ uint64_t last_send_; // last time this rtx was sent
+ uint64_t next_send_; // next retransmission time
+ uint32_t rtx_count_; // number or rtx
+ };
+
+ using rtxState = struct rtx_state_;
+
+ public:
+ using SendRtxCallback = std::function<void(uint32_t)>;
+
+ RecoveryStrategy(Indexer *indexer, SendRtxCallback &&callback,
+ asio::io_service &io_service, bool use_rtx, bool use_fec,
+ interface::RtcTransportRecoveryStrategies rs_type,
+ interface::StrategyCallback &&external_callback);
+
+ RecoveryStrategy(RecoveryStrategy &&rs);
+
+ virtual ~RecoveryStrategy();
+
+ void setRtxFec(std::optional<bool> rtx_on = {},
+ std::optional<bool> fec_on = {});
+ void setState(RTCState *state) { state_ = state; }
+ void setRateControl(RTCRateControl *rateControl) { rc_ = rateControl; }
+ void setFecParams(uint32_t n, uint32_t k);
+ void setContentSharingMode() { content_sharing_mode_ = true; }
+
+ bool isRtx(uint32_t seq) {
+ if (rtx_state_.find(seq) != rtx_state_.end()) return true;
+ return false;
+ }
+
+ bool isPossibleLossWithNoRtx(uint32_t seq) {
+ if (recover_with_fec_.find(seq) != recover_with_fec_.end()) return true;
+ return false;
+ }
+
+ bool wasNacked(uint32_t seq) {
+ if (nacked_seq_.find(seq) != nacked_seq_.end()) return true;
+ return false;
+ }
+
+ interface::RtcTransportRecoveryStrategies getType() {
+ return rs_type_;
+ }
+ void updateType(interface::RtcTransportRecoveryStrategies type) {
+ rs_type_ = type;
+ }
+ bool isRtxOn() { return rtx_on_; }
+ bool isFecOn() { return fec_on_; }
+
+ RTCState *getState() { return state_; }
+
+ // if the function returns 0 it means that the packet is not an RTX or it is
+ // not a valid packet to safely compute the RTT
+ uint64_t getRtxRtt(uint32_t seq);
+ bool lossDetected(uint32_t seq);
+ void notifyNewLossDetedcted(uint32_t seq);
+ void requestPossibleLostPacket(uint32_t seq);
+ void receivedFutureNack(uint32_t seq);
+ void clear();
+
+ virtual void turnOnRecovery() = 0;
+ virtual void onNewRound(bool in_sync) = 0;
+ virtual void newPacketLoss(uint32_t seq) = 0;
+ virtual void receivedPacket(uint32_t seq) = 0;
+ void onLostTimeout(uint32_t seq);
+
+ void incRoundId() { round_id_++; }
+
+ // utils
+ uint64_t getNow() {
+ uint64_t now = utils::SteadyTime::nowMs().count();
+ return now;
+ }
+
+ protected:
+ // rtx functions
+ void addNewRtx(uint32_t seq, bool force);
+ uint64_t computeNextSend(uint32_t seq, uint32_t rtx_counter);
+ void retransmit();
+ void scheduleNextRtx();
+ void deleteRtx(uint32_t seq);
+
+ // fec functions
+ uint32_t computeFecPacketsToAsk();
+
+ // common functons
+ void removePacketState(uint32_t seq);
+
+ interface::RtcTransportRecoveryStrategies rs_type_;
+ bool recovery_on_;
+ bool rtx_on_;
+ bool fec_on_;
+ bool content_sharing_mode_;
+
+ // number of RTX sent after fec turned on
+ // this is used to take into account jitter and out of order packets
+ // if we detect losses but we do not sent any RTX it means that the holes in
+ // the sequence are caused by the jitter
+ uint32_t rtx_during_fec_;
+
+ // this map keeps track of the retransmitted interest, ordered from the oldest
+ // to the newest one. the state contains the timer of the first send of the
+ // interest (from pendingIntetests_), the timer of the next send (key of the
+ // multimap) and the number of rtx
+ std::map<uint32_t, rtxState> rtx_state_;
+ // this map stored the rtx by timer. The key is the time at which the rtx
+ // should be sent, and the val is the interest seq number
+ std::multimap<uint64_t, uint32_t> rtx_timers_;
+
+ // lost packets that will be recovered with fec
+ std::unordered_set<uint32_t> recover_with_fec_;
+
+ // packet for which we recived a future nack
+ // in case we detect a loss for a nacked packet we send an RTX but we do not
+ // increase the loss counter. this is done because it may happen that the
+ // producer rate checkes over time and in flight interest may be satified by
+ // data packet after the reception of nacks
+ std::unordered_set<uint32_t> nacked_seq_;
+
+ // rtx vars
+ std::unique_ptr<asio::steady_timer> timer_;
+ uint64_t next_rtx_timer_;
+ SendRtxCallback send_rtx_callback_;
+
+ // fec vars
+ uint32_t n_;
+ uint32_t k_;
+ Indexer *indexer_;
+
+ RTCState *state_;
+ RTCRateControl *rc_;
+
+ private:
+ uint32_t round_id_; // number of rounds
+ uint32_t last_fec_used_;
+ std::vector<uint32_t> fec_per_loss_rate_;
+ interface::StrategyCallback callback_;
+};
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_rs_delay.cc b/libtransport/src/protocols/rtc/rtc_rs_delay.cc
new file mode 100644
index 000000000..7d7a01133
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_rs_delay.cc
@@ -0,0 +1,147 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <glog/logging.h>
+#include <protocols/rtc/rtc_consts.h>
+#include <protocols/rtc/rtc_rs_delay.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+RecoveryStrategyDelayBased::RecoveryStrategyDelayBased(
+ Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service,
+ interface::RtcTransportRecoveryStrategies rs_type,
+ interface::StrategyCallback &&external_callback)
+ : RecoveryStrategy(indexer, std::move(callback), io_service, true, false,
+ rs_type,
+ std::move(external_callback)), // start with rtx
+ congestion_state_(false),
+ probing_state_(false),
+ switch_rounds_(0) {}
+
+RecoveryStrategyDelayBased::RecoveryStrategyDelayBased(RecoveryStrategy &&rs)
+ : RecoveryStrategy(std::move(rs)) {
+ setRtxFec(true, false);
+ // we have to re-init congestion and
+ // probing
+ switch_rounds_ = 0;
+ congestion_state_ = false;
+ probing_state_ = false;
+}
+
+RecoveryStrategyDelayBased::~RecoveryStrategyDelayBased() {}
+
+void RecoveryStrategyDelayBased::turnOnRecovery() {
+ recovery_on_ = true;
+ uint64_t rtt = state_->getMinRTT();
+ uint32_t fec_to_ask = computeFecPacketsToAsk();
+ if (rtt > MAX_RTT_BEFORE_FEC && fec_to_ask > 0) {
+ // we need to start FEC (see fec only strategy for more details)
+ setRtxFec(true, true);
+ rtx_during_fec_ = 1; // avoid to stop fec
+ indexer_->setNFec(fec_to_ask);
+ } else {
+ // use RTX
+ setRtxFec(true, false);
+ switch_rounds_ = 0;
+ }
+}
+
+void RecoveryStrategyDelayBased::softSwitchToFec(uint32_t fec_to_ask) {
+ if (fec_to_ask == 0) {
+ setRtxFec(true, false);
+ switch_rounds_ = 0;
+ } else {
+ switch_rounds_++;
+ if (switch_rounds_ >= ((RTC_INTEREST_LIFETIME / ROUND_LEN) * 2) &&
+ rtx_during_fec_ != 0) { // go to fec only if it is needed (RTX are on)
+ setRtxFec(false, true);
+ } else {
+ setRtxFec(true, true);
+ }
+ }
+}
+
+void RecoveryStrategyDelayBased::onNewRound(bool in_sync) {
+ if (!recovery_on_) {
+ // disable fec so that no extra packet will be sent
+ // for rtx we check if recovery is on in newPacketLoss
+ setRtxFec(true, false);
+ indexer_->setNFec(0);
+ return;
+ }
+
+ uint64_t rtt = state_->getAvgRTT();
+
+ // XXX at the moment we are not looking at congestion events
+ // bool congestion = rc_->inCongestionState();
+
+ if ((!fec_on_ && rtt >= MAX_RTT_BEFORE_FEC) ||
+ (fec_on_ && rtt > (MAX_RTT_BEFORE_FEC - 10))) {
+ // switch from rtx to fec or keep use fec. Notice that if some rtx are
+ // waiting to be scheduled, they will be sent normally, but no new rtx will
+ // be created if the loss rate is 0 keep to use RTX.
+ uint32_t fec_to_ask = computeFecPacketsToAsk();
+ softSwitchToFec(fec_to_ask);
+ if (rtx_during_fec_ == 0) // if we do not send any RTX the losses
+ // registered may be due to jitter
+ indexer_->setNFec(0);
+ else
+ indexer_->setNFec(fec_to_ask);
+ return;
+ }
+
+ if ((fec_on_ && rtt <= (MAX_RTT_BEFORE_FEC - 10)) ||
+ (!rtx_on_ && rtt <= MAX_RTT_BEFORE_FEC)) {
+ // turn on rtx
+ softSwitchToFec(0);
+ indexer_->setNFec(0);
+ return;
+ }
+}
+
+void RecoveryStrategyDelayBased::newPacketLoss(uint32_t seq) {
+ if (rtx_on_ && recovery_on_) {
+ addNewRtx(seq, false);
+ } else {
+ if (!state_->isPending(seq) && !indexer_->isFec(seq)) {
+ addNewRtx(seq, true);
+ } else {
+ recover_with_fec_.insert(seq);
+ state_->onPossibleLossWithNoRtx(seq);
+ }
+ }
+}
+
+void RecoveryStrategyDelayBased::receivedPacket(uint32_t seq) {
+ removePacketState(seq);
+}
+
+void RecoveryStrategyDelayBased::probing() {
+ // TODO
+ // for the moment ask for all fec and exit the probing phase
+ probing_state_ = false;
+ setRtxFec(false, true);
+ indexer_->setNFec(computeFecPacketsToAsk());
+}
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_rs_delay.h b/libtransport/src/protocols/rtc/rtc_rs_delay.h
new file mode 100644
index 000000000..9e1c41388
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_rs_delay.h
@@ -0,0 +1,55 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <protocols/rtc/rtc_recovery_strategy.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+class RecoveryStrategyDelayBased : public RecoveryStrategy {
+ public:
+ RecoveryStrategyDelayBased(Indexer *indexer, SendRtxCallback &&callback,
+ asio::io_service &io_service,
+ interface::RtcTransportRecoveryStrategies rs_type,
+ interface::StrategyCallback &&external_callback);
+
+ RecoveryStrategyDelayBased(RecoveryStrategy &&rs);
+
+ ~RecoveryStrategyDelayBased();
+
+ void turnOnRecovery();
+ void onNewRound(bool in_sync);
+ void newPacketLoss(uint32_t seq);
+ void receivedPacket(uint32_t seq);
+
+ private:
+ void softSwitchToFec(uint32_t fec_to_ask);
+
+ bool congestion_state_;
+ bool probing_state_;
+ uint32_t switch_rounds_;
+
+ void probing();
+};
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_rs_fec_only.cc b/libtransport/src/protocols/rtc/rtc_rs_fec_only.cc
new file mode 100644
index 000000000..5b10823ec
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_rs_fec_only.cc
@@ -0,0 +1,143 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <glog/logging.h>
+#include <protocols/rtc/rtc_consts.h>
+#include <protocols/rtc/rtc_rs_fec_only.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+RecoveryStrategyFecOnly::RecoveryStrategyFecOnly(
+ Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service,
+ interface::RtcTransportRecoveryStrategies rs_type,
+ interface::StrategyCallback &&external_callback)
+ : RecoveryStrategy(indexer, std::move(callback), io_service, true, false,
+ rs_type, std::move(external_callback)),
+ congestion_state_(false),
+ probing_state_(false),
+ switch_rounds_(0) {}
+
+RecoveryStrategyFecOnly::RecoveryStrategyFecOnly(RecoveryStrategy &&rs)
+ : RecoveryStrategy(std::move(rs)) {
+ setRtxFec(true, false);
+ switch_rounds_ = 0;
+ congestion_state_ = false;
+ probing_state_ = false;
+}
+
+RecoveryStrategyFecOnly::~RecoveryStrategyFecOnly() {}
+
+void RecoveryStrategyFecOnly::turnOnRecovery() {
+ recovery_on_ = true;
+ // init strategy
+ uint32_t fec_to_ask = computeFecPacketsToAsk();
+ if (fec_to_ask > 0) {
+ // the probing phase detected a lossy link. we immedialty start the fec and
+ // we disable the check to prevent to send fec packets before RTX. in fact
+ // here we know that we have losses and it is not a problem of OOO packets
+ setRtxFec(true, true);
+ rtx_during_fec_ = 1; // avoid to stop fec
+ indexer_->setNFec(fec_to_ask);
+ } else {
+ // keep only RTX on
+ setRtxFec(true, true);
+ }
+}
+
+void RecoveryStrategyFecOnly::onNewRound(bool in_sync) {
+ if (!recovery_on_) {
+ indexer_->setNFec(0);
+ return;
+ }
+
+ // XXX for the moment we are considering congestion events
+ // if(rc_->inCongestionState()){
+ // congestion_state_ = true;
+ // probing_state_ = false;
+ // indexer_->setNFec(0);
+ // return;
+ // }
+
+ // no congestion
+ if (congestion_state_) {
+ // this is the first round after congestion
+ // enter probing phase
+ probing_state_ = true;
+ congestion_state_ = false;
+ }
+
+ if (probing_state_) {
+ probing();
+ } else {
+ uint32_t fec_to_ask = computeFecPacketsToAsk();
+ // If fec_to_ask == 0 we use rtx even if in these strategy we use only fec.
+ // In this way the first packet loss that triggers the usage of fec can be
+ // recovered using rtx, otherwise it will always be lost
+ if (fec_to_ask == 0) {
+ setRtxFec(true, false);
+ switch_rounds_ = 0;
+ } else {
+ switch_rounds_++;
+ if (switch_rounds_ >= ((RTC_INTEREST_LIFETIME / ROUND_LEN) * 2) &&
+ rtx_during_fec_ !=
+ 0) { // go to fec only if it is needed (RTX are on)
+ setRtxFec(false, true);
+ } else {
+ setRtxFec(true, true); // keep both
+ }
+ }
+ if (rtx_during_fec_ == 0) // if we do not send any RTX the losses
+ // registered may be due to jitter
+ indexer_->setNFec(0);
+ else
+ indexer_->setNFec(fec_to_ask);
+ }
+}
+
+void RecoveryStrategyFecOnly::newPacketLoss(uint32_t seq) {
+ if (rtx_on_ && recovery_on_) {
+ addNewRtx(seq, false);
+ } else {
+ if (!state_->isPending(seq) && !indexer_->isFec(seq)) {
+ addNewRtx(seq, true);
+ } else {
+ // if not pending add to list to recover with fec
+ recover_with_fec_.insert(seq);
+ state_->onPossibleLossWithNoRtx(seq);
+ }
+ }
+}
+
+void RecoveryStrategyFecOnly::receivedPacket(uint32_t seq) {
+ removePacketState(seq);
+}
+
+void RecoveryStrategyFecOnly::probing() {
+ // TODO
+ // for the moment ask for all fec and exit the probing phase
+ probing_state_ = false;
+ uint32_t fec_to_ask = computeFecPacketsToAsk();
+ indexer_->setNFec(fec_to_ask);
+}
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_rs_fec_only.h b/libtransport/src/protocols/rtc/rtc_rs_fec_only.h
new file mode 100644
index 000000000..42df25bd9
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_rs_fec_only.h
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <protocols/rtc/rtc_recovery_strategy.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+class RecoveryStrategyFecOnly : public RecoveryStrategy {
+ public:
+ RecoveryStrategyFecOnly(Indexer *indexer, SendRtxCallback &&callback,
+ asio::io_service &io_service,
+ interface::RtcTransportRecoveryStrategies rs_type,
+ interface::StrategyCallback &&external_callback);
+
+ RecoveryStrategyFecOnly(RecoveryStrategy &&rs);
+
+ ~RecoveryStrategyFecOnly();
+
+ void turnOnRecovery();
+ void onNewRound(bool in_sync);
+ void newPacketLoss(uint32_t seq);
+ void receivedPacket(uint32_t seq);
+
+ private:
+ bool congestion_state_;
+ bool probing_state_;
+ uint32_t switch_rounds_;
+
+ void probing();
+};
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_rs_low_rate.cc b/libtransport/src/protocols/rtc/rtc_rs_low_rate.cc
new file mode 100644
index 000000000..dbad563cd
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_rs_low_rate.cc
@@ -0,0 +1,174 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <glog/logging.h>
+#include <protocols/rtc/rtc_consts.h>
+#include <protocols/rtc/rtc_rs_low_rate.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+RecoveryStrategyLowRate::RecoveryStrategyLowRate(
+ Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service,
+ interface::RtcTransportRecoveryStrategies rs_type,
+ interface::StrategyCallback &&external_callback)
+ : RecoveryStrategy(indexer, std::move(callback), io_service, false, true,
+ rs_type,
+ std::move(external_callback)), // start with fec
+ fec_consecutive_rounds_((MILLI_IN_A_SEC / ROUND_LEN) * 5), // 5 sec
+ rtx_allowed_consecutive_rounds_(0) {
+ initSwitchVector();
+}
+
+RecoveryStrategyLowRate::RecoveryStrategyLowRate(RecoveryStrategy &&rs)
+ : RecoveryStrategy(std::move(rs)),
+ fec_consecutive_rounds_((MILLI_IN_A_SEC / ROUND_LEN) * 5), // 5 sec
+ rtx_allowed_consecutive_rounds_(0) {
+ setRtxFec(false, true);
+ initSwitchVector();
+}
+
+RecoveryStrategyLowRate::~RecoveryStrategyLowRate() {}
+
+void RecoveryStrategyLowRate::initSwitchVector() {
+ // TODO adjust thresholds here when new data are collected
+ // see resutls in
+ // https://confluence-eng-gpk1.cisco.com/conf/display/SPT/dailyreports
+ thresholds_t t1;
+ t1.rtt = 15; // 15ms
+ t1.loss_rtx_to_fec = 15; // 15%
+ t1.loss_fec_to_rtx = 10; // 10%
+ thresholds_t t2;
+ t2.rtt = 35; // 35ms
+ t2.loss_rtx_to_fec = 5; // 5%
+ t2.loss_fec_to_rtx = 1; // 1%
+ switch_vector.push_back(t1);
+ switch_vector.push_back(t2);
+}
+
+void RecoveryStrategyLowRate::setRecoveryParameters(bool use_rtx, bool use_fec,
+ uint32_t fec_to_ask) {
+ setRtxFec(use_rtx, use_fec);
+ indexer_->setNFec(fec_to_ask);
+}
+
+void RecoveryStrategyLowRate::selectRecoveryStrategy(bool in_sync) {
+ uint32_t fec_to_ask = computeFecPacketsToAsk();
+ if (fec_to_ask == 0) {
+ // fec is off, turn on RTX immediatly to avoid packet losses
+ setRecoveryParameters(true, false, 0);
+ fec_consecutive_rounds_ = 0;
+ return;
+ }
+
+ uint32_t loss_rate = std::round(state_->getPerSecondLossRate() * 100);
+ uint32_t rtt = (uint32_t)state_->getAvgRTT();
+
+ bool use_rtx = false;
+ for (size_t i = 0; i < switch_vector.size(); i++) {
+ uint32_t max_loss_rate = 0;
+ if (fec_on_)
+ max_loss_rate = switch_vector[i].loss_fec_to_rtx;
+ else
+ max_loss_rate = switch_vector[i].loss_rtx_to_fec;
+
+ if (rtt < switch_vector[i].rtt && loss_rate < max_loss_rate) {
+ use_rtx = true;
+ rtx_allowed_consecutive_rounds_++;
+ break;
+ }
+ }
+
+ if (!use_rtx) rtx_allowed_consecutive_rounds_ = 0;
+
+ if (use_rtx) {
+ if (fec_on_) {
+ // here we should swtich from RTX to FEC
+ // wait 10sec where the switch is allowed before actually switch
+ if (rtx_allowed_consecutive_rounds_ >=
+ ((MILLI_IN_A_SEC / ROUND_LEN) * 10)) { // 10 sec
+ // use RTX
+ setRecoveryParameters(true, false, 0);
+ fec_consecutive_rounds_ = 0;
+ } else {
+ // keep using FEC (and maybe RTX)
+ setRecoveryParameters(true, true, fec_to_ask);
+ fec_consecutive_rounds_++;
+ }
+ } else {
+ // keep using RTX
+ setRecoveryParameters(true, false, 0);
+ fec_consecutive_rounds_ = 0;
+ }
+ } else {
+ // use FEC and RTX
+ setRecoveryParameters(true, true, fec_to_ask);
+ fec_consecutive_rounds_++;
+ }
+
+ // everytime that we anable FEC we keep also RTX on. in this way the first
+ // losses that are not covered by FEC are recovered using RTX. after 5 sec we
+ // disable fec
+ if (fec_consecutive_rounds_ >= ((MILLI_IN_A_SEC / ROUND_LEN) * 5)) {
+ // turn off RTX
+ setRtxFec(false);
+ }
+}
+
+void RecoveryStrategyLowRate::turnOnRecovery() {
+ recovery_on_ = 1;
+ // the stategy will be init in the new round function
+}
+
+void RecoveryStrategyLowRate::onNewRound(bool in_sync) {
+ if (!recovery_on_) {
+ // disable fec so that no extra packet will be sent
+ // for rtx we check if recovery is on in newPacketLoss
+ setRtxFec(true, false);
+ indexer_->setNFec(0);
+ return;
+ }
+
+ // XXX since this strategy will be used only for flow at low rate we do not
+ // consider congestion events like in other strategies
+
+ selectRecoveryStrategy(in_sync);
+}
+
+void RecoveryStrategyLowRate::newPacketLoss(uint32_t seq) {
+ if (rtx_on_ && recovery_on_) {
+ addNewRtx(seq, false);
+ } else {
+ if (!state_->isPending(seq) && !indexer_->isFec(seq)) {
+ addNewRtx(seq, true);
+ } else {
+ recover_with_fec_.insert(seq);
+ state_->onPossibleLossWithNoRtx(seq);
+ }
+ }
+}
+
+void RecoveryStrategyLowRate::receivedPacket(uint32_t seq) {
+ removePacketState(seq);
+}
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_rs_low_rate.h b/libtransport/src/protocols/rtc/rtc_rs_low_rate.h
new file mode 100644
index 000000000..0e76efaca
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_rs_low_rate.h
@@ -0,0 +1,71 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <protocols/rtc/rtc_recovery_strategy.h>
+
+#include <vector>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+struct thresholds_t {
+ uint32_t rtt;
+ uint32_t loss_rtx_to_fec; // loss rate used to move from rtx to fec
+ uint32_t loss_fec_to_rtx; // loss rate used to move from fec to rtx
+};
+
+class RecoveryStrategyLowRate : public RecoveryStrategy {
+ public:
+ RecoveryStrategyLowRate(Indexer *indexer, SendRtxCallback &&callback,
+ asio::io_service &io_service,
+ interface::RtcTransportRecoveryStrategies rs_type,
+ interface::StrategyCallback &&external_callback);
+
+ RecoveryStrategyLowRate(RecoveryStrategy &&rs);
+
+ ~RecoveryStrategyLowRate();
+
+ void turnOnRecovery();
+ void onNewRound(bool in_sync);
+ void newPacketLoss(uint32_t seq);
+ void receivedPacket(uint32_t seq);
+
+ private:
+ void initSwitchVector();
+ void setRecoveryParameters(bool use_rtx, bool use_fec, uint32_t fec_to_ask);
+ void selectRecoveryStrategy(bool in_sync);
+
+ uint32_t fec_consecutive_rounds_;
+ uint32_t rtx_allowed_consecutive_rounds_;
+
+ // this table contains the thresholds that indicates when to switch from RTX
+ // to FEC and viceversa. values in the vector are detected with a set of
+ // experiments. the vector is used in the following way: if rtt and loss rate
+ // are less than one of the values in the in the vector, losses are
+ // recovered using RTX. otherwive losses are recovered using FEC. as for FEC
+ // only and delay based strategy, the swith from RTX to FEC is smooth,
+ // meaning that FEC and RTX are used together for some rounds
+ std::vector<thresholds_t> switch_vector;
+};
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_rs_recovery_off.cc b/libtransport/src/protocols/rtc/rtc_rs_recovery_off.cc
new file mode 100644
index 000000000..00c6a0504
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_rs_recovery_off.cc
@@ -0,0 +1,65 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <glog/logging.h>
+#include <protocols/rtc/rtc_consts.h>
+#include <protocols/rtc/rtc_rs_recovery_off.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+RecoveryStrategyRecoveryOff::RecoveryStrategyRecoveryOff(
+ Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service,
+ interface::RtcTransportRecoveryStrategies rs_type,
+ interface::StrategyCallback &&external_callback)
+ : RecoveryStrategy(indexer, std::move(callback), io_service, false, false,
+ rs_type, std::move(external_callback)) {}
+
+RecoveryStrategyRecoveryOff::RecoveryStrategyRecoveryOff(RecoveryStrategy &&rs)
+ : RecoveryStrategy(std::move(rs)) {
+ setRtxFec(false, false);
+}
+
+RecoveryStrategyRecoveryOff::~RecoveryStrategyRecoveryOff() {}
+
+void RecoveryStrategyRecoveryOff::turnOnRecovery() {
+ // nothing to do
+ return;
+}
+void RecoveryStrategyRecoveryOff::onNewRound(bool in_sync) {
+ // nothing to do
+ return;
+}
+
+void RecoveryStrategyRecoveryOff::newPacketLoss(uint32_t seq) {
+ // here we only keep track of the lost packets to avoid to
+ // count them multple times in the counters. for this we
+ // use the recover_with_fec_ set
+ recover_with_fec_.insert(seq);
+ state_->onPossibleLossWithNoRtx(seq);
+}
+
+void RecoveryStrategyRecoveryOff::receivedPacket(uint32_t seq) {
+ removePacketState(seq);
+}
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_rs_recovery_off.h b/libtransport/src/protocols/rtc/rtc_rs_recovery_off.h
new file mode 100644
index 000000000..3d59cc473
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_rs_recovery_off.h
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <protocols/rtc/rtc_recovery_strategy.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+class RecoveryStrategyRecoveryOff : public RecoveryStrategy {
+ public:
+ RecoveryStrategyRecoveryOff(Indexer *indexer, SendRtxCallback &&callback,
+ asio::io_service &io_service,
+ interface::RtcTransportRecoveryStrategies rs_type,
+ interface::StrategyCallback &&external_callback);
+
+ RecoveryStrategyRecoveryOff(RecoveryStrategy &&rs);
+
+ ~RecoveryStrategyRecoveryOff();
+
+ void turnOnRecovery();
+ void onNewRound(bool in_sync);
+ void newPacketLoss(uint32_t seq);
+ void receivedPacket(uint32_t seq);
+};
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_rs_rtx_only.cc b/libtransport/src/protocols/rtc/rtc_rs_rtx_only.cc
new file mode 100644
index 000000000..4d7cf7a82
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_rs_rtx_only.cc
@@ -0,0 +1,67 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <glog/logging.h>
+#include <protocols/rtc/rtc_consts.h>
+#include <protocols/rtc/rtc_rs_rtx_only.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+RecoveryStrategyRtxOnly::RecoveryStrategyRtxOnly(
+ Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service,
+ interface::RtcTransportRecoveryStrategies rs_type,
+ interface::StrategyCallback &&external_callback)
+ : RecoveryStrategy(indexer, std::move(callback), io_service, true, false,
+ rs_type, std::move(external_callback)) {}
+
+RecoveryStrategyRtxOnly::RecoveryStrategyRtxOnly(RecoveryStrategy &&rs)
+ : RecoveryStrategy(std::move(rs)) {
+ setRtxFec(true, false);
+}
+
+RecoveryStrategyRtxOnly::~RecoveryStrategyRtxOnly() {}
+
+void RecoveryStrategyRtxOnly::turnOnRecovery() {
+ recovery_on_ = true;
+ setRtxFec(true, false);
+}
+
+void RecoveryStrategyRtxOnly::onNewRound(bool in_sync) {
+ // nothing to do
+ return;
+}
+
+void RecoveryStrategyRtxOnly::newPacketLoss(uint32_t seq) {
+ if (!recovery_on_) {
+ recover_with_fec_.insert(seq);
+ state_->onPossibleLossWithNoRtx(seq);
+ return;
+ }
+ addNewRtx(seq, false);
+}
+
+void RecoveryStrategyRtxOnly::receivedPacket(uint32_t seq) {
+ removePacketState(seq);
+}
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_rs_rtx_only.h b/libtransport/src/protocols/rtc/rtc_rs_rtx_only.h
new file mode 100644
index 000000000..03dbed1c7
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_rs_rtx_only.h
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <protocols/rtc/rtc_recovery_strategy.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+class RecoveryStrategyRtxOnly : public RecoveryStrategy {
+ public:
+ RecoveryStrategyRtxOnly(Indexer *indexer, SendRtxCallback &&callback,
+ asio::io_service &io_service,
+ interface::RtcTransportRecoveryStrategies rs_type,
+ interface::StrategyCallback &&external_callback);
+
+ RecoveryStrategyRtxOnly(RecoveryStrategy &&rs);
+
+ ~RecoveryStrategyRtxOnly();
+
+ void turnOnRecovery();
+ void onNewRound(bool in_sync);
+ void newPacketLoss(uint32_t seq);
+ void receivedPacket(uint32_t seq);
+};
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_state.cc b/libtransport/src/protocols/rtc/rtc_state.cc
new file mode 100644
index 000000000..82ac0b9c1
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_state.cc
@@ -0,0 +1,900 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <glog/logging.h>
+#include <protocols/rtc/rtc_consts.h>
+#include <protocols/rtc/rtc_state.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+RTCState::RTCState(Indexer *indexer,
+ ProbeHandler::SendProbeCallback &&probe_callback,
+ DiscoveredRttCallback &&discovered_rtt_callback,
+ asio::io_service &io_service)
+ : loss_history_(10), // log 10sec history
+ indexer_(indexer),
+ probe_handler_(std::make_shared<ProbeHandler>(std::move(probe_callback),
+ io_service)),
+ discovered_rtt_callback_(std::move(discovered_rtt_callback)) {
+ init_rtt_timer_ = std::make_unique<asio::steady_timer>(io_service);
+}
+
+RTCState::~RTCState() {}
+
+void RTCState::initParams() {
+ // packets counters (total)
+ sent_interests_ = 0;
+ sent_rtx_ = 0;
+ received_data_ = 0;
+ received_nacks_ = 0;
+ received_timeouts_ = 0;
+ received_probes_ = 0;
+
+ // loss counters
+ packets_lost_ = 0;
+ definitely_lost_pkt_ = 0;
+ losses_recovered_ = 0;
+ first_seq_in_round_ = 0;
+ highest_seq_received_ = 0;
+ highest_seq_received_in_order_ = 0;
+ last_seq_nacked_ = 0;
+ loss_rate_ = 0.0;
+ avg_loss_rate_ = -1.0;
+ last_round_loss_rate_ = 0.0;
+
+ // loss rate per sec
+ lost_per_sec_ = 0;
+ total_expected_packets_ = 0;
+ per_sec_loss_rate_ = 0.0;
+
+ // residual losses counters
+ expected_packets_ = 0;
+ packets_sent_to_app_ = 0;
+ rounds_from_last_compute_ = 0;
+ residual_loss_rate_ = 0.0;
+
+ // fec counters
+ pending_fec_pkt_ = 0;
+ received_fec_pkt_ = 0;
+
+ // bw counters
+ received_bytes_ = 0;
+ received_fec_bytes_ = 0;
+ recovered_bytes_with_fec_ = 0;
+
+ avg_packet_size_ = INIT_PACKET_SIZE;
+ production_rate_ = 0.0;
+ received_rate_ = 0.0;
+ fec_recovered_rate_ = 0.0;
+
+ // nack counter
+ past_nack_on_last_round_ = false;
+ received_nacks_last_round_ = 0;
+
+ // packets counter
+ received_packets_last_round_ = 0;
+ received_data_last_round_ = 0;
+ received_data_from_cache_ = 0;
+ sent_interests_last_round_ = 0;
+ sent_rtx_last_round_ = 0;
+
+ // round conunters
+ rounds_ = 0;
+ rounds_without_nacks_ = 0;
+ rounds_without_packets_ = 0;
+
+ last_production_seq_ = 0;
+ producer_is_active_ = false;
+ last_prod_update_seq_ = 0;
+
+ // paths stats
+ path_table_.clear();
+ main_path_ = nullptr;
+ edge_path_ = nullptr;
+
+ // packet cache (not pending anymore)
+ packet_cache_.clear();
+
+ // pending interests
+ pending_interests_.clear();
+
+ // used to keep track of the skipped interest
+ last_interest_sent_ = 0;
+
+ // init rtt
+ first_interest_sent_time_ = ~0;
+ first_interest_sent_seq_ = 0;
+
+ // start probing the producer
+ init_rtt_ = false;
+ probe_handler_->setSuffixRange(MIN_INIT_PROBE_SEQ, MAX_INIT_PROBE_SEQ);
+ probe_handler_->setProbes(INIT_RTT_PROBE_INTERVAL, INIT_RTT_PROBES);
+ probe_handler_->sendProbes();
+ setInitRttTimer(INIT_RTT_PROBE_RESTART);
+}
+
+// packet events
+void RTCState::onSendNewInterest(const core::Name *interest_name) {
+ uint64_t now = utils::SteadyTime::nowMs().count();
+ uint32_t seq = interest_name->getSuffix();
+ pending_interests_.insert(std::pair<uint32_t, uint64_t>(seq, now));
+
+ if (sent_interests_ == 0) {
+ first_interest_sent_time_ = now;
+ first_interest_sent_seq_ = seq;
+ }
+
+ if (indexer_->isFec(seq)) {
+ pending_fec_pkt_++;
+ }
+
+ if (last_interest_sent_ == 0 && seq != 0) {
+ last_interest_sent_ = seq; // init last interest sent
+ }
+
+ // TODO what happen in case of jumps?
+ eraseFromPacketCache(
+ seq); // if we send this interest we don't know its state
+ for (uint32_t i = last_interest_sent_ + 1; i < seq; i++) {
+ if (indexer_->isFec(i)) {
+ // only fec packets can be skipped
+ addToPacketCache(i, PacketState::SKIPPED);
+ }
+ }
+
+ last_interest_sent_ = seq;
+
+ sent_interests_++;
+ sent_interests_last_round_++;
+}
+
+void RTCState::onTimeout(uint32_t seq, bool lost) {
+ auto it = pending_interests_.find(seq);
+ if (it != pending_interests_.end()) {
+ pending_interests_.erase(it);
+ if (indexer_->isFec(seq)) pending_fec_pkt_--;
+ }
+ received_timeouts_++;
+
+ if (lost) onPacketLost(seq);
+}
+
+void RTCState::onLossDetected(uint32_t seq) {
+ PacketState state = getPacketState(seq);
+
+ // if the packet is already marked with a state, do nothing
+ // to be considered lost the packet must be pending
+ if (state == PacketState::UNKNOWN &&
+ pending_interests_.find(seq) != pending_interests_.end()) {
+ packets_lost_++;
+ addToPacketCache(seq, PacketState::LOST);
+ }
+}
+
+void RTCState::onRetransmission(uint32_t seq) {
+ // remove the interest for the pendingInterest map only after the first rtx.
+ // in this way we can handle the ooo packets that come in late as normla
+ // packet. we consider a packet lost only if we sent at least an RTX for it.
+ // XXX this may become problematic if we stop the RTX transmissions
+ auto it = pending_interests_.find(seq);
+ if (it != pending_interests_.end()) {
+ pending_interests_.erase(it);
+ if (indexer_->isFec(seq)) pending_fec_pkt_--;
+ }
+ sent_rtx_++;
+ sent_rtx_last_round_++;
+}
+
+void RTCState::onPossibleLossWithNoRtx(uint32_t seq) {
+ // if fec is on or rtx is disable we don't need to do anything to recover a
+ // packet. however in both cases we need to remove possible missing packets
+ // from the window of pendinig interest in order to free space without wating
+ // for the timeout.
+ auto it = pending_interests_.find(seq);
+ if (it != pending_interests_.end()) {
+ pending_interests_.erase(it);
+ if (indexer_->isFec(seq)) pending_fec_pkt_--;
+ }
+}
+
+void RTCState::onDataPacketReceived(const core::ContentObject &content_object,
+ bool compute_stats) {
+ uint32_t seq = content_object.getName().getSuffix();
+
+ if (compute_stats) {
+ updatePathStats(content_object, false);
+ received_data_last_round_++;
+ }
+ received_data_++;
+ packets_sent_to_app_++;
+
+ core::ParamsRTC params = RTCState::getDataParams(content_object);
+
+ if (last_prod_update_seq_ < seq) {
+ last_prod_update_seq_ = seq;
+ production_rate_ = (double)params.prod_rate;
+ }
+
+ updatePacketSize(content_object);
+ updateReceivedBytes(content_object, false);
+ addRecvOrLost(seq, PacketState::RECEIVED);
+
+ // the producer is responding
+ // it is generating valid data packets so we consider it active
+ producer_is_active_ = true;
+
+ received_packets_last_round_++;
+}
+
+void RTCState::onFecPacketReceived(const core::ContentObject &content_object) {
+ uint32_t seq = content_object.getName().getSuffix();
+ updateReceivedBytes(content_object, true);
+
+ PacketState state = getPacketState(seq);
+ if (state != PacketState::LOST) {
+ // increase only for not lost packets
+ received_fec_pkt_++;
+ }
+ addRecvOrLost(seq, PacketState::RECEIVED);
+ // the producer is responding
+ // it is generating valid data packets so we consider it active
+ producer_is_active_ = true;
+}
+
+void RTCState::onNackPacketReceived(const core::ContentObject &nack,
+ bool compute_stats) {
+ uint32_t seq = nack.getName().getSuffix();
+ struct nack_packet_t *nack_pkt =
+ (struct nack_packet_t *)nack.getPayload()->data();
+ uint32_t production_seq = nack_pkt->getProductionSegment();
+ uint32_t production_rate = nack_pkt->getProductionRate();
+
+ if (TRANSPORT_EXPECT_FALSE(main_path_ == nullptr) ||
+ last_prod_update_seq_ < production_seq) {
+ // update production rate
+ last_production_seq_ = production_seq;
+ production_rate_ = (double)production_rate;
+ }
+
+ if (compute_stats) {
+ // this is not an RTX
+ updatePathStats(nack, true);
+ }
+
+ // for statistics pourpose we log all nacks, also the one received for
+ // retransmitted packets
+ received_nacks_++;
+ received_nacks_last_round_++;
+
+ bool to_delete = false;
+ if (production_seq > seq) {
+ // old nack, seq is lost
+ // update last nacked
+ if (last_seq_nacked_ < seq) last_seq_nacked_ = seq;
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "lost packet " << seq << " beacuse of a past nack";
+ if (compute_stats) past_nack_on_last_round_ = true;
+ onPacketLost(seq);
+ } else if (seq > production_seq) {
+ // future nack
+ // remove the nack from the pending interest map
+ // (the packet is not received/lost yet)
+ to_delete = true;
+ } else {
+ // this should be a quite rear event. simply remove the
+ // packet from the pending interest list
+ to_delete = true;
+ }
+
+ if (to_delete) {
+ auto it = pending_interests_.find(seq);
+ if (it != pending_interests_.end()) {
+ pending_interests_.erase(it);
+ if (indexer_->isFec(seq)) pending_fec_pkt_--;
+ }
+ }
+
+ received_packets_last_round_++;
+}
+
+void RTCState::onPacketLost(uint32_t seq) {
+ if (!indexer_->isFec(seq)) {
+ PacketState state = getPacketState(seq);
+ if (state == PacketState::LOST ||
+ (state == PacketState::UNKNOWN &&
+ pending_interests_.find(seq) != pending_interests_.end())) {
+ definitely_lost_pkt_++;
+ DLOG_IF(INFO, VLOG_IS_ON(4)) << "packet " << seq << " is lost";
+ }
+ }
+
+ addRecvOrLost(seq, PacketState::DEFINITELY_LOST);
+}
+
+void RTCState::onPacketRecoveredRtx(const core::ContentObject &content_object,
+ uint64_t rtt) {
+ uint32_t seq = content_object.getName().getSuffix();
+ packets_sent_to_app_++;
+
+ // increase the recovered packet counter only if the packet was marked as LOST
+ // before.
+ PacketState state = getPacketState(seq);
+ if (state == PacketState::LOST) losses_recovered_++;
+
+ addRecvOrLost(seq, PacketState::RECEIVED);
+ updateReceivedBytes(content_object, false);
+
+ if (rtt == 0) return; // nothing to do
+
+ uint32_t path_label = content_object.getPathLabel();
+ auto path_it = path_table_.find(path_label);
+ if (path_it == path_table_.end()) {
+ // this is a new path and it must be a cache
+ std::shared_ptr<RTCDataPath> newPath =
+ std::make_shared<RTCDataPath>(path_label);
+ auto ret = path_table_.insert(
+ std::pair<uint32_t, std::shared_ptr<RTCDataPath>>(path_label, newPath));
+ path_it = ret.first;
+ }
+
+ auto path = path_it->second;
+ if (path->pathToProducer())
+ return; // this packet is coming from a producer
+ // even if we sent an RTX. this may happen
+ // for RTX that are sent too fast or in
+ // case of multipath
+
+ path->insertRttSample(utils::SteadyTime::Milliseconds(rtt), true);
+}
+
+void RTCState::onFecPacketRecoveredRtx(
+ const core::ContentObject &content_object) {
+ // This is the same as onPacketRecoveredRtx, but in this is case the
+ // pkt is also a FEC pkt, the addRecvOrLost will be called afterwards
+ losses_recovered_++;
+ updateReceivedBytes(content_object, true);
+}
+
+void RTCState::onPacketRecoveredFec(uint32_t seq, uint32_t size) {
+ losses_recovered_++;
+ packets_sent_to_app_++;
+ recovered_bytes_with_fec_ += size;
+
+ // adding header to the count
+ recovered_bytes_with_fec_ += 60; // XXX get header size some where
+
+ // the packet could be not marked as lost yet. onLossDetected checks if add in
+ // the packet in the lost count or not
+ onLossDetected(seq);
+
+ addRecvOrLost(seq, PacketState::RECEIVED);
+}
+
+bool RTCState::onProbePacketReceived(const core::ContentObject &probe) {
+ uint32_t seq = probe.getName().getSuffix();
+ core::ParamsRTC params = RTCState::getProbeParams(probe);
+
+ bool is_valid = true;
+ uint32_t max = UINT32_MAX;
+ if (params.prod_rate == max) is_valid = false;
+
+ uint64_t rtt;
+ rtt = probe_handler_->getRtt(seq, is_valid);
+ if (rtt == 0) return false; // this is not a valid probe
+
+ if (!is_valid) return false; // not a valid probe
+
+ // if we are here the producer is active
+ producer_is_active_ = true;
+
+ // Like for data and nacks update the path stats. Here the RTT is computed
+ // by the probe handler. Both probes for rtt and bw are good to estimate
+ // info on the path.
+ uint32_t path_label = probe.getPathLabel();
+ auto path_it = path_table_.find(path_label);
+
+ if (path_it == path_table_.end()) {
+ // found a new path
+ std::shared_ptr<RTCDataPath> newPath =
+ std::make_shared<RTCDataPath>(path_label);
+ auto ret = path_table_.insert(
+ std::pair<uint32_t, std::shared_ptr<RTCDataPath>>(path_label, newPath));
+ path_it = ret.first;
+ }
+
+ auto path = path_it->second;
+
+ path->insertRttSample(utils::SteadyTime::Milliseconds(rtt), true);
+ path->receivedNack();
+
+ uint64_t now = utils::SteadyTime::nowMs().count();
+
+ int64_t OWD = now - params.timestamp;
+ path->insertOwdSample(OWD);
+
+ if (last_prod_update_seq_ < params.prod_seg) {
+ last_production_seq_ = params.prod_seg;
+ production_rate_ = (double)params.prod_rate;
+ }
+
+ // check for init RTT. if received_probes_ is equal to 0 schedule a timer to
+ // wait for the INIT_RTT_PROBES. in this way if some probes get lost we don't
+ // wait forever
+ received_probes_++;
+
+ if (!init_rtt_ && received_probes_ <= INIT_RTT_PROBES) {
+ if (received_probes_ == 1) {
+ // we got the first probe, wait at most INIT_RTT_PROBE_WAIT sec for the
+ // others.
+ main_path_ = path;
+ setInitRttTimer(INIT_RTT_PROBE_WAIT);
+ }
+ if (received_probes_ == INIT_RTT_PROBES) {
+ // we are done
+ init_rtt_timer_->cancel();
+ checkInitRttTimer();
+ }
+ }
+
+ received_packets_last_round_++;
+
+ // ignore probes sent before the first interest
+ if ((now - rtt) <= first_interest_sent_time_) return false;
+ return true;
+}
+
+void RTCState::onJumpForward(uint32_t next_seq) {
+ for (uint32_t seq = highest_seq_received_in_order_ + 1; seq < next_seq;
+ seq++) {
+ PacketState packet_state = getPacketState(seq);
+ if (packet_state != PacketState::RECEIVED &&
+ packet_state != PacketState::DEFINITELY_LOST) {
+ // here we considere the packet as definitely lost whitout increase the
+ // lost packet counter because this loss is not due to the network
+ // condition but the transport wants to skip the packet
+ onPacketLost(seq);
+ }
+ }
+}
+
+void RTCState::onNewRound(double round_len, bool in_sync) {
+ if (path_table_.empty()) return;
+
+ double bytes_per_sec =
+ ((double)received_bytes_ * (MILLI_IN_A_SEC / round_len));
+ if (received_rate_ == 0)
+ received_rate_ = bytes_per_sec;
+ else
+ received_rate_ = (received_rate_ * MOVING_AVG_ALPHA) +
+ ((1 - MOVING_AVG_ALPHA) * bytes_per_sec);
+ double fec_bytes_per_sec =
+ ((double)received_fec_bytes_ * (MILLI_IN_A_SEC / round_len));
+
+ if (fec_received_rate_ == 0)
+ fec_received_rate_ = fec_bytes_per_sec;
+ else
+ fec_received_rate_ = (fec_received_rate_ * 0.8) + (0.2 * fec_bytes_per_sec);
+
+ double fec_recovered_bytes_per_sec =
+ ((double)recovered_bytes_with_fec_ * (MILLI_IN_A_SEC / round_len));
+
+ if (fec_recovered_rate_ == 0)
+ fec_recovered_rate_ = fec_recovered_bytes_per_sec;
+ else
+ fec_recovered_rate_ =
+ (fec_recovered_rate_ * 0.8) + (0.2 * fec_recovered_bytes_per_sec);
+
+ // search for an active path. Is it possible to have multiple path that are
+ // used at the same time. We use as reference path the one from where we gets
+ // more packets. This means that the path should have better lantecy or less
+ // channel losses
+
+ uint32_t last_round_packets = 0;
+ uint64_t min_edge_rtt = UINT_MAX;
+ std::shared_ptr<RTCDataPath> old_main_path = main_path_;
+ main_path_ = nullptr;
+ edge_path_ = nullptr;
+
+ for (auto it = path_table_.begin(); it != path_table_.end(); it++) {
+ if (it->second->isValidProducer()) {
+ uint32_t pkt = it->second->getPacketsLastRound();
+ if (pkt > last_round_packets) {
+ last_round_packets = pkt;
+ main_path_ = it->second;
+ }
+ } else if (it->second->isActive() && !it->second->pathToProducer()) {
+ // this is a path to a cache from where we are receiving content
+ if (it->second->getMinRtt() < min_edge_rtt) {
+ min_edge_rtt = it->second->getMinRtt();
+ edge_path_ = it->second;
+ }
+ }
+ it->second->roundEnd();
+ }
+
+ if (main_path_ == nullptr) main_path_ = old_main_path;
+ if (edge_path_ == nullptr) edge_path_ = main_path_;
+ if (edge_path_->getMinRtt() >= main_path_->getMinRtt())
+ edge_path_ = main_path_;
+
+ // in case we get a new main path we reset the stats of the old one. this is
+ // beacuse, in case we need to switch back we don't what to take decisions on
+ // old stats that may be outdated.
+ if (main_path_ != old_main_path) old_main_path->clearRtt();
+
+ updateLossRate(in_sync);
+
+ // handle nacks
+ if (!past_nack_on_last_round_ && received_bytes_ > 0) {
+ rounds_without_nacks_++;
+ } else {
+ rounds_without_nacks_ = 0;
+ }
+
+ // check if the producer is active
+ if (received_packets_last_round_ != 0) {
+ rounds_without_packets_ = 0;
+ } else {
+ rounds_without_packets_++;
+ if (rounds_without_packets_ >= MAX_ROUND_WHIOUT_PACKETS &&
+ producer_is_active_ != false) {
+ initParams();
+ }
+ }
+
+ // reset counters
+ received_bytes_ = 0;
+ received_fec_bytes_ = 0;
+ recovered_bytes_with_fec_ = 0;
+ packets_lost_ = 0;
+ definitely_lost_pkt_ = 0;
+ losses_recovered_ = 0;
+ first_seq_in_round_ = highest_seq_received_;
+
+ past_nack_on_last_round_ = false;
+ received_nacks_last_round_ = 0;
+
+ received_packets_last_round_ = 0;
+ received_data_last_round_ = 0;
+ received_data_from_cache_ = 0;
+ sent_interests_last_round_ = 0;
+ sent_rtx_last_round_ = 0;
+
+ received_fec_pkt_ = 0;
+
+ rounds_++;
+}
+
+void RTCState::updateReceivedBytes(const core::ContentObject &content_object,
+ bool isFec) {
+ if (isFec) {
+ received_fec_bytes_ +=
+ (uint32_t)(content_object.headerSize() + content_object.payloadSize());
+ } else {
+ received_bytes_ +=
+ (uint32_t)(content_object.headerSize() + content_object.payloadSize());
+ }
+}
+
+void RTCState::updatePacketSize(const core::ContentObject &content_object) {
+ uint32_t pkt_size =
+ (uint32_t)(content_object.headerSize() + content_object.payloadSize());
+ avg_packet_size_ = (MOVING_AVG_ALPHA * avg_packet_size_) +
+ ((1 - MOVING_AVG_ALPHA) * pkt_size);
+}
+
+void RTCState::updatePathStats(const core::ContentObject &content_object,
+ bool is_nack) {
+ // get packet path
+ uint32_t path_label = content_object.getPathLabel();
+ auto path_it = path_table_.find(path_label);
+
+ if (path_it == path_table_.end()) {
+ // found a new path
+ std::shared_ptr<RTCDataPath> newPath =
+ std::make_shared<RTCDataPath>(path_label);
+ auto ret = path_table_.insert(
+ std::pair<uint32_t, std::shared_ptr<RTCDataPath>>(path_label, newPath));
+ path_it = ret.first;
+ }
+
+ auto path = path_it->second;
+
+ // compute rtt
+ uint32_t seq = content_object.getName().getSuffix();
+ uint64_t interest_sent_time = getInterestSentTime(seq);
+ if (interest_sent_time == 0)
+ return; // this should not happen,
+ // it means that we are processing an interest
+ // that is not pending
+
+ uint64_t now = utils::SteadyTime::nowMs().count();
+
+ uint64_t RTT = now - interest_sent_time;
+
+ path->insertRttSample(utils::SteadyTime::Milliseconds(RTT), false);
+
+ // compute OWD (the first part of the nack and data packet header are the
+ // same, so we cast to data data packet)
+ core::ParamsRTC params = RTCState::getDataParams(content_object);
+ int64_t OWD = now - params.timestamp;
+ path->insertOwdSample(OWD);
+
+ // compute IAT or set path to producer
+ if (!is_nack) {
+ // compute the iat only for the content packets
+ uint32_t segment_number = content_object.getName().getSuffix();
+ path->computeInterArrivalGap(segment_number);
+ if (!path->pathToProducer()) received_data_from_cache_++;
+ } else {
+ path->receivedNack();
+ }
+}
+
+void RTCState::updateLossRate(bool in_sync) {
+ last_round_loss_rate_ = loss_rate_;
+ loss_rate_ = 0.0;
+
+ uint32_t number_theorically_received_packets_ =
+ highest_seq_received_ - first_seq_in_round_;
+
+ // XXX this may be quite inefficient if the rate is high
+ // maybe is better to iterate over the set?
+
+ uint32_t fec_packets = 0;
+ for (uint32_t i = (first_seq_in_round_ + 1); i < highest_seq_received_; i++) {
+ PacketState state = getPacketState(i);
+ if (state == PacketState::SKIPPED) {
+ if (number_theorically_received_packets_ > 0)
+ number_theorically_received_packets_--;
+ }
+ if (indexer_->isFec(i)) fec_packets++;
+ }
+ if (indexer_->isFec(highest_seq_received_)) fec_packets++;
+
+ // in this case no new packet was received after the previous round, avoid
+ // division by 0
+ if (number_theorically_received_packets_ == 0 && packets_lost_ == 0) return;
+
+ if (number_theorically_received_packets_ != 0)
+ loss_rate_ = (double)((double)(packets_lost_) /
+ (double)number_theorically_received_packets_);
+ else
+ // we didn't receive anything except NACKs that triggered losses
+ loss_rate_ = 1.0;
+
+ if (avg_loss_rate_ == -1.0)
+ avg_loss_rate_ = loss_rate_;
+ else
+ avg_loss_rate_ =
+ avg_loss_rate_ * MOVING_AVG_ALPHA + loss_rate_ * (1 - MOVING_AVG_ALPHA);
+
+ // update counters for loss rate per second
+ total_expected_packets_ += number_theorically_received_packets_;
+ lost_per_sec_ += packets_lost_;
+
+ if (in_sync) {
+ // update counters for residual losses
+ // fec packets are not sent to the app so we don't want to count them here
+ expected_packets_ +=
+ ((highest_seq_received_ - first_seq_in_round_) - fec_packets);
+ } else {
+ expected_packets_ = 0;
+ packets_sent_to_app_ = 0;
+ }
+
+ if (rounds_from_last_compute_ >= (MILLI_IN_A_SEC / ROUND_LEN)) {
+ // compute loss rate per second
+ if (lost_per_sec_ > total_expected_packets_)
+ lost_per_sec_ = total_expected_packets_;
+
+ if (total_expected_packets_ == 0)
+ per_sec_loss_rate_ = 0;
+ else
+ per_sec_loss_rate_ =
+ (double)((double)(lost_per_sec_) / (double)total_expected_packets_);
+
+ loss_history_.pushBack(per_sec_loss_rate_);
+
+ if (in_sync && expected_packets_ != 0) {
+ // compute residual loss rate
+ if (packets_sent_to_app_ > expected_packets_) {
+ // this may happen if we get packet from the prev bin that get recovered
+ // on the current one
+ packets_sent_to_app_ = expected_packets_;
+ }
+
+ residual_loss_rate_ =
+ 1.0 - ((double)packets_sent_to_app_ / (double)expected_packets_);
+ if (residual_loss_rate_ < 0.0) residual_loss_rate_ = 0.0;
+ }
+
+ lost_per_sec_ = 0;
+ total_expected_packets_ = 0;
+ expected_packets_ = 0;
+ packets_sent_to_app_ = 0;
+ rounds_from_last_compute_ = 0;
+ }
+
+ rounds_from_last_compute_++;
+}
+
+void RTCState::dataToBeReceived(uint32_t seq) {
+ addToPacketCache(seq, PacketState::TO_BE_RECEIVED);
+}
+
+void RTCState::updateHighestSeqReceived(uint32_t seq) {
+ if (seq > highest_seq_received_) highest_seq_received_ = seq;
+}
+
+void RTCState::addRecvOrLost(uint32_t seq, PacketState state) {
+ auto it = pending_interests_.find(seq);
+ if (it != pending_interests_.end()) {
+ pending_interests_.erase(it);
+ if (indexer_->isFec(seq)) pending_fec_pkt_--;
+ }
+
+ addToPacketCache(seq, state);
+
+ // keep track of the last packet received/lost
+ // without holes.
+ if (highest_seq_received_in_order_ < last_seq_nacked_) {
+ highest_seq_received_in_order_ = last_seq_nacked_;
+ }
+
+ if ((highest_seq_received_in_order_ + 1) == seq) {
+ highest_seq_received_in_order_ = seq;
+ } else if (seq <= highest_seq_received_in_order_) {
+ // here we do nothing
+ } else if (seq > highest_seq_received_in_order_) {
+ // 1) there is a gap in the sequence so we do not update
+ // highest_seq_received_in_order_
+ // 2) all the packets from highest_seq_received_in_order_ to seq are
+ // received or lost or are fec packetis. In this case we increase
+ // highest_seq_received_in_order_ until we find an hole in the sequence
+
+ for (uint32_t i = highest_seq_received_in_order_ + 1; i <= seq; i++) {
+ PacketState state = getPacketState(i);
+ if ((state == PacketState::UNKNOWN || state == PacketState::LOST)) {
+ if (indexer_->isFec(i)) {
+ // this is a fec packet and we don't care to receive it
+ // however we may need to increse the number or lost packets
+ // XXX: in case we want to use rtx to recover fec packets,
+ // this may prevent to detect a packet loss and no rtx will be sent
+ if (TRANSPORT_EXPECT_TRUE(i >= first_interest_sent_seq_)) {
+ onLossDetected(i);
+ }
+ } else {
+ // this is a data packet and we need to get it
+ break;
+ }
+ }
+ // this packet is in order so we can update the
+ // highest_seq_received_in_order_
+ highest_seq_received_in_order_ = i;
+ }
+ }
+}
+
+void RTCState::setInitRttTimer(uint32_t wait) {
+ init_rtt_timer_->cancel();
+ init_rtt_timer_->expires_from_now(std::chrono::milliseconds(wait));
+
+ std::weak_ptr<RTCState> self = shared_from_this();
+ init_rtt_timer_->async_wait([self](const std::error_code &ec) {
+ if (ec) return;
+
+ if (auto ptr = self.lock()) {
+ ptr->checkInitRttTimer();
+ }
+ });
+}
+
+void RTCState::checkInitRttTimer() {
+ if (received_probes_ < INIT_RTT_MIN_PROBES_TO_RECV ||
+ probe_handler_->getProbeLossRate() == 1.0) {
+ // we didn't received enough probes or they were not valid, restart
+ received_probes_ = 0;
+ probe_handler_->setSuffixRange(MIN_INIT_PROBE_SEQ, MAX_INIT_PROBE_SEQ);
+ probe_handler_->setProbes(INIT_RTT_PROBE_INTERVAL, INIT_RTT_PROBES);
+ probe_handler_->sendProbes();
+ setInitRttTimer(INIT_RTT_PROBE_RESTART);
+ return;
+ }
+
+ init_rtt_ = true;
+ main_path_->roundEnd();
+ loss_history_.pushBack(probe_handler_->getProbeLossRate());
+
+ probe_handler_->setSuffixRange(MIN_RTT_PROBE_SEQ, MAX_RTT_PROBE_SEQ);
+ probe_handler_->setProbes(RTT_PROBE_INTERVAL, 0);
+ probe_handler_->sendProbes();
+
+ // init last_seq_nacked_. skip packets that may come from the cache
+ double prod_rate = getProducerRate();
+ double rtt = (double)getMinRTT() / MILLI_IN_A_SEC;
+ double packet_size = getAveragePacketSize();
+ uint32_t pkt_in_rtt_ = std::floor(((prod_rate / packet_size) * rtt));
+ last_seq_nacked_ = last_production_seq_ + pkt_in_rtt_;
+
+ discovered_rtt_callback_();
+}
+
+core::ParamsRTC RTCState::getProbeParams(const core::ContentObject &probe) {
+ uint32_t seq = probe.getName().getSuffix();
+ core::ParamsRTC params;
+
+ switch (ProbeHandler::getProbeType(seq)) {
+ case ProbeType::INIT: {
+ core::ContentObjectManifest manifest(
+ const_cast<core::ContentObject &>(probe).shared_from_this());
+ manifest.decode();
+ params = manifest.getParamsRTC();
+ break;
+ }
+ case ProbeType::RTT: {
+ struct nack_packet_t *probe_pkt =
+ (struct nack_packet_t *)probe.getPayload()->data();
+ params = core::ParamsRTC{
+ .timestamp = probe_pkt->getTimestamp(),
+ .prod_rate = probe_pkt->getProductionRate(),
+ .prod_seg = probe_pkt->getProductionSegment(),
+ };
+ break;
+ }
+ default:
+ break;
+ }
+
+ return params;
+}
+
+core::ParamsRTC RTCState::getDataParams(const core::ContentObject &data) {
+ core::ParamsRTC params;
+
+ switch (data.getPayloadType()) {
+ case core::PayloadType::DATA: {
+ struct data_packet_t *data_pkt =
+ (struct data_packet_t *)data.getPayload()->data();
+ params = core::ParamsRTC{
+ .timestamp = data_pkt->getTimestamp(),
+ .prod_rate = data_pkt->getProductionRate(),
+ .prod_seg = data.getName().getSuffix(),
+ };
+ break;
+ }
+ case core::PayloadType::MANIFEST: {
+ core::ContentObjectManifest manifest(
+ const_cast<core::ContentObject &>(data).shared_from_this());
+ manifest.decode();
+ params = manifest.getParamsRTC();
+ break;
+ }
+ default:
+ break;
+ }
+
+ return params;
+}
+
+} // namespace rtc
+
+} // namespace protocol
+
+} // namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_state.h b/libtransport/src/protocols/rtc/rtc_state.h
new file mode 100644
index 000000000..ac3cc621f
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_state.h
@@ -0,0 +1,410 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <core/facade.h>
+#include <hicn/transport/config.h>
+#include <hicn/transport/core/asio_wrapper.h>
+#include <hicn/transport/core/content_object.h>
+#include <hicn/transport/core/name.h>
+#include <hicn/transport/utils/rtc_quality_score.h>
+#include <protocols/indexer.h>
+#include <protocols/rtc/probe_handler.h>
+#include <protocols/rtc/rtc_data_path.h>
+#include <utils/max_filter.h>
+
+#include <map>
+#include <set>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+// packet state
+// RECEIVED: the packet was already received
+// LOST: the packet is marked as lost but can be recovered
+// DEFINITELY_LOST: the packet is lost and cannot be recovered
+// TO_BE_RECEIVED: when a packet is received is sent to the FEC decoder. the fec
+// decoder may decide to send the packet directly to the app. to avoid
+// duplicated the packet is marked with this state
+// SKIPPED: an interest that was not sent, only for FEC packets
+// UNKNOWN: unknown state
+enum class PacketState : uint8_t {
+ RECEIVED,
+ TO_BE_RECEIVED,
+ LOST,
+ DEFINITELY_LOST,
+ SKIPPED,
+ UNKNOWN
+};
+
+class RTCState : public std::enable_shared_from_this<RTCState> {
+ using PendingInterestsMap = std::map<uint32_t, uint64_t>;
+
+ private:
+ const double MAX_CACHED_PACKETS = 8192; // XXX this value may be too small
+ // for high rate apps
+
+ public:
+ using DiscoveredRttCallback = std::function<void()>;
+
+ public:
+ RTCState(Indexer *indexer, ProbeHandler::SendProbeCallback &&probe_callback,
+ DiscoveredRttCallback &&discovered_rtt_callback,
+ asio::io_service &io_service);
+
+ ~RTCState();
+
+ // initialization
+ void initParams();
+
+ // packet events
+ void onSendNewInterest(const core::Name *interest_name);
+ void onTimeout(uint32_t seq, bool lost);
+ void onLossDetected(uint32_t seq);
+ void onRetransmission(uint32_t seq);
+ void onPossibleLossWithNoRtx(uint32_t seq);
+ void onDataPacketReceived(const core::ContentObject &content_object,
+ bool compute_stats);
+ void onFecPacketReceived(const core::ContentObject &content_object);
+ void onNackPacketReceived(const core::ContentObject &nack,
+ bool compute_stats);
+ void onPacketLost(uint32_t seq);
+ void onPacketRecoveredRtx(const core::ContentObject &content_object,
+ uint64_t rtt);
+ void onFecPacketRecoveredRtx(const core::ContentObject &content_object);
+ void onPacketRecoveredFec(uint32_t seq, uint32_t size);
+ bool onProbePacketReceived(const core::ContentObject &probe);
+ void onJumpForward(uint32_t next_seq);
+
+ // protocol state
+ void onNewRound(double round_len, bool in_sync);
+
+ // main path
+ uint32_t getProducerPath() const {
+ if (mainPathIsValid()) return main_path_->getPathId();
+ return 0;
+ }
+
+ // delay metrics
+ bool isRttDiscovered() const { return init_rtt_; }
+
+ uint64_t getMinRTT() const {
+ if (mainPathIsValid()) return main_path_->getMinRtt();
+ return 0;
+ }
+
+ uint64_t getAvgRTT() const {
+ if (mainPathIsValid()) return main_path_->getAvgRtt();
+ return 0;
+ }
+
+ uint64_t getMaxRTT() const {
+ if (mainPathIsValid()) return main_path_->getMaxRtt();
+ return 0;
+ }
+
+ uint64_t getEdgeRtt() const {
+ if (edge_path_ != nullptr) return edge_path_->getMinRtt();
+ return 0;
+ }
+
+ void resetRttStats() {
+ if (mainPathIsValid()) main_path_->clearRtt();
+ }
+
+ double getQueuing() const {
+ if (mainPathIsValid()) return main_path_->getQueuingDealy();
+ return 0.0;
+ }
+ double getIAT() const {
+ if (mainPathIsValid()) return main_path_->getInterArrivalGap();
+ return 0.0;
+ }
+
+ double getJitter() const {
+ if (mainPathIsValid()) return main_path_->getJitter();
+ return 0.0;
+ }
+
+ // pending interests
+ uint64_t getInterestSentTime(uint32_t seq) {
+ auto it = pending_interests_.find(seq);
+ if (it != pending_interests_.end()) return it->second;
+
+ return 0;
+ }
+
+ bool isPending(uint32_t seq) {
+ if (pending_interests_.find(seq) != pending_interests_.end()) return true;
+ return false;
+ }
+
+ uint32_t getPendingInterestNumber() const {
+ return (uint32_t)pending_interests_.size();
+ }
+
+ PacketState getPacketState(uint32_t seq) {
+ auto it = packet_cache_.find(seq);
+ if (it != packet_cache_.end()) return it->second;
+ return PacketState::UNKNOWN;
+ }
+
+ // loss rate
+ double getPerRoundLossRate() const { return loss_rate_; }
+ double getPerSecondLossRate() const { return per_sec_loss_rate_; }
+ double getAvgLossRate() const { return avg_loss_rate_; }
+ double getMaxLossRate() const {
+ if (loss_history_.size() != 0) return loss_history_.begin();
+ return 0;
+ }
+
+ double getLastRoundLossRate() const { return last_round_loss_rate_; }
+ double getResidualLossRate() const { return residual_loss_rate_; }
+
+ uint32_t getLostData() const { return packets_lost_; };
+ uint32_t getRecoveredLosses() const { return losses_recovered_; }
+
+ uint32_t getDefinitelyLostPackets() const { return definitely_lost_pkt_; }
+
+ uint32_t getHighestSeqReceived() const { return highest_seq_received_; }
+
+ uint32_t getHighestSeqReceivedInOrder() const {
+ return highest_seq_received_in_order_;
+ }
+
+ // fec packets
+ uint32_t getReceivedFecPackets() const { return received_fec_pkt_; }
+ uint32_t getPendingFecPackets() const { return pending_fec_pkt_; }
+
+ // generic stats
+ uint32_t getReceivedBytesInRound() const { return received_bytes_; }
+ uint32_t getReceivedFecBytesInRound() const { return received_fec_bytes_; }
+ uint32_t getRecoveredFecBytesInRound() const {
+ return recovered_bytes_with_fec_;
+ }
+ uint32_t getReceivedNacksInRound() const {
+ return received_nacks_last_round_;
+ }
+ uint32_t getReceivedDataInRound() const { return received_data_last_round_; }
+ uint32_t getSentInterestInRound() const { return sent_interests_last_round_; }
+ uint32_t getSentRtxInRound() const { return sent_rtx_last_round_; }
+
+ // bandwidth/production metrics
+ double getAvailableBw() const { return 0.0; }; // TODO
+ double getProducerRate() const { return production_rate_; }
+ double getReceivedRate() const { return received_rate_; }
+ double getReceivedFecRate() const { return fec_received_rate_; }
+ double getRecoveredFecRate() const { return fec_recovered_rate_; }
+
+ double getAveragePacketSize() const { return avg_packet_size_; }
+
+ // nacks
+ uint32_t getRoundsWithoutNacks() const { return rounds_without_nacks_; }
+ uint32_t getLastSeqNacked() const { return last_seq_nacked_; }
+
+ // producer state
+ bool isProducerActive() const { return producer_is_active_; }
+
+ // packets from cache
+ // this should be called at the end of a round beacuse otherwise we may have
+ // not enough packets to get a good stat
+ double getPacketFromCacheRatio() const {
+ if (received_data_last_round_ == 0) return 0;
+ return (double)received_data_from_cache_ /
+ (double)received_data_last_round_;
+ }
+
+ PendingInterestsMap::iterator getPendingInterestsMapBegin() {
+ return pending_interests_.begin();
+ }
+ PendingInterestsMap::iterator getPendingInterestsMapEnd() {
+ return pending_interests_.end();
+ }
+
+ // quality
+ uint8_t getQualityScore() {
+ uint8_t qs = quality_score_.getQualityScore(
+ getMaxRTT(), std::round(getResidualLossRate() * 100));
+ return qs;
+ }
+
+ // We received a data pkt that will be set to RECEIVED, but first we have to
+ // go through FEC. We do not want to consider this pkt as recovered, thus we
+ // set it as TO_BE_RECEIVED.
+ void dataToBeReceived(uint32_t seq);
+
+ void updateHighestSeqReceived(uint32_t seq);
+
+ // Extract RTC parameters from probes (init or RTT probes) and data packets.
+ static core::ParamsRTC getProbeParams(const core::ContentObject &probe);
+ static core::ParamsRTC getDataParams(const core::ContentObject &data);
+
+ private:
+ void addToPacketCache(uint32_t seq, PacketState state) {
+ // this function adds or updates the current state
+ if (packet_cache_.size() >= MAX_CACHED_PACKETS) {
+ packet_cache_.erase(packet_cache_.begin());
+ }
+ packet_cache_[seq] = state;
+ }
+
+ void eraseFromPacketCache(uint32_t seq) { packet_cache_.erase(seq); }
+
+ // update stats
+ void updateState();
+ void updateReceivedBytes(const core::ContentObject &content_object,
+ bool isFec);
+ void updatePacketSize(const core::ContentObject &content_object);
+ void updatePathStats(const core::ContentObject &content_object, bool is_nack);
+ void updateLossRate(bool in_sycn);
+
+ void addRecvOrLost(uint32_t seq, PacketState state);
+
+ void setInitRttTimer(uint32_t wait);
+ void checkInitRttTimer();
+
+ bool mainPathIsValid() const {
+ if (main_path_ != nullptr)
+ return true;
+ else
+ return false;
+ }
+
+ // packets counters (total)
+ uint32_t sent_interests_;
+ uint32_t sent_rtx_;
+ uint32_t received_data_;
+ uint32_t received_nacks_;
+ uint32_t received_timeouts_;
+ uint32_t received_probes_;
+
+ // loss counters
+ int32_t packets_lost_;
+ int32_t losses_recovered_;
+ uint32_t definitely_lost_pkt_;
+ uint32_t first_seq_in_round_;
+ uint32_t highest_seq_received_;
+ uint32_t highest_seq_received_in_order_;
+ uint32_t last_seq_nacked_; // segment for which we got an oldNack
+ double loss_rate_;
+ double avg_loss_rate_;
+ double last_round_loss_rate_;
+ utils::MaxFilter<double> loss_history_;
+
+ // per second loss rate
+ uint32_t lost_per_sec_;
+ uint32_t total_expected_packets_;
+ double per_sec_loss_rate_;
+
+ // conunters for residual losses
+ // residual losses are computed every second and are used
+ // as feedback to the upper levels (e.g application)
+ uint32_t expected_packets_;
+ uint32_t packets_sent_to_app_;
+ uint32_t rounds_from_last_compute_;
+ double residual_loss_rate_;
+
+ // bw counters
+ uint32_t received_bytes_;
+ uint32_t received_fec_bytes_;
+ uint32_t recovered_bytes_with_fec_;
+ double avg_packet_size_;
+ double production_rate_; // rate communicated by the producer using nacks
+ double received_rate_; // rate recevied by the consumer (only data)
+ double fec_received_rate_; // fec rate recevied by the consumer
+ double fec_recovered_rate_; // rate recovered using fec
+
+ // nack counters
+ // the bool takes tracks only about the valid past nacks (no rtx) and it is
+ // used to switch between the states. Instead received_nacks_last_round_ logs
+ // all the nacks for statistics
+ bool past_nack_on_last_round_;
+ uint32_t received_nacks_last_round_;
+
+ // packets counters
+ uint32_t received_packets_last_round_;
+ uint32_t received_data_last_round_;
+ uint32_t received_data_from_cache_;
+ uint32_t sent_interests_last_round_;
+ uint32_t sent_rtx_last_round_;
+
+ // fec counters
+ uint32_t received_fec_pkt_;
+ uint32_t pending_fec_pkt_;
+
+ // round counters
+ uint32_t rounds_;
+ uint32_t rounds_without_nacks_;
+ uint32_t rounds_without_packets_;
+
+ // init rtt
+ uint64_t first_interest_sent_time_;
+ uint32_t first_interest_sent_seq_;
+
+ // producer state
+ bool
+ producer_is_active_; // the prodcuer is active if we receive some packets
+ uint32_t last_production_seq_; // last production seq received by the
+ // producer used to init the sync protcol
+ uint32_t last_prod_update_seq_; // seq number of the last packet used to
+ // update the update from the producer.
+ // assumption: the highest seq number carries
+ // the most up to date info. in case of
+ // probes we look at the produced seq number
+
+ // paths stats
+ std::unordered_map<uint32_t, std::shared_ptr<RTCDataPath>> path_table_;
+ std::shared_ptr<RTCDataPath> main_path_; // this is the path that connects
+ // the consumer to the producer. in
+ // case of multipath the trasnport
+ // uses the most active path
+ std::shared_ptr<RTCDataPath> edge_path_; // path to the closest cache if it
+ // exists
+
+ // packet received
+ // cache where to store info about the last MAX_CACHED_PACKETS
+ // these are packets that are received or lost or definitely lost and are not
+ // anymore in the pending intetest list
+ std::map<uint32_t, PacketState> packet_cache_;
+
+ // pending interests
+ PendingInterestsMap pending_interests_;
+
+ // indexer
+ Indexer *indexer_;
+
+ // used to keep track of the skipped interests
+ uint32_t last_interest_sent_;
+
+ // probes
+ std::shared_ptr<ProbeHandler> probe_handler_;
+ bool init_rtt_;
+ std::unique_ptr<asio::steady_timer> init_rtt_timer_;
+
+ // quality score
+ RTCQualityScore quality_score_;
+
+ // callbacks
+ DiscoveredRttCallback discovered_rtt_callback_;
+};
+
+} // namespace rtc
+
+} // namespace protocol
+
+} // namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_verifier.cc b/libtransport/src/protocols/rtc/rtc_verifier.cc
new file mode 100644
index 000000000..60fce92a5
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_verifier.cc
@@ -0,0 +1,248 @@
+/*
+ * Copyright (c) 2017-2022 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <core/facade.h>
+#include <protocols/rtc/rtc_packet.h>
+#include <protocols/rtc/rtc_verifier.h>
+
+namespace transport {
+namespace protocol {
+namespace rtc {
+
+RTCVerifier::RTCVerifier(std::shared_ptr<auth::Verifier> verifier,
+ uint32_t factor_relevant, uint32_t factor_alert)
+ : verifier_(verifier),
+ factor_relevant_(factor_relevant),
+ factor_alert_(factor_alert),
+ manifest_max_capacity_(std::numeric_limits<uint8_t>::max()) {}
+
+void RTCVerifier::setState(std::shared_ptr<RTCState> rtc_state) {
+ rtc_state_ = rtc_state;
+}
+
+void RTCVerifier::setVerifier(std::shared_ptr<auth::Verifier> verifier) {
+ verifier_ = verifier;
+}
+
+void RTCVerifier::setFactorRelevant(uint32_t factor_relevant) {
+ factor_relevant_ = factor_relevant;
+}
+
+void RTCVerifier::setFactorAlert(uint32_t factor_alert) {
+ factor_alert_ = factor_alert;
+}
+
+auth::VerificationPolicy RTCVerifier::verify(core::Interest &interest) {
+ return verifier_->verifyPackets(&interest);
+}
+
+auth::VerificationPolicy RTCVerifier::verify(
+ core::ContentObject &content_object, bool is_fec) {
+ auth::Suffix suffix = content_object.getName().getSuffix();
+ auth::VerificationPolicy default_policy = auth::VerificationPolicy::ABORT;
+
+ core::PayloadType payload_type = content_object.getPayloadType();
+ bool is_probe = ProbeHandler::getProbeType(suffix) != ProbeType::NOT_PROBE;
+ bool is_nack = !is_probe && content_object.payloadSize() == NACK_HEADER_SIZE;
+ bool is_manifest = !is_probe && !is_nack && !is_fec &&
+ payload_type == core::PayloadType::MANIFEST;
+ bool is_data = !is_probe && !is_nack && !is_fec &&
+ payload_type == core::PayloadType::DATA;
+
+ if (is_probe) return verifyProbe(content_object);
+ if (is_nack) return verifyNack(content_object);
+ if (is_fec) return verifyFec(content_object);
+ if (is_data) return verifyData(content_object);
+ if (is_manifest) return verifyManifest(content_object);
+
+ verifier_->callVerificationFailedCallback(suffix, default_policy);
+ return default_policy;
+}
+
+auth::VerificationPolicy RTCVerifier::verifyProbe(
+ core::ContentObject &content_object) {
+ auth::Suffix suffix = content_object.getName().getSuffix();
+ auth::VerificationPolicy policy = auth::VerificationPolicy::ABORT;
+
+ switch (ProbeHandler::getProbeType(suffix)) {
+ case ProbeType::INIT:
+ policy = verifyManifest(content_object);
+ if (policy == auth::VerificationPolicy::ACCEPT) {
+ policy = processManifest(content_object);
+ }
+ break;
+ case ProbeType::RTT:
+ policy = verifyNack(content_object);
+ break;
+ default:
+ verifier_->callVerificationFailedCallback(suffix, policy);
+ break;
+ }
+
+ return policy;
+}
+
+auth::VerificationPolicy RTCVerifier::verifyNack(
+ core::ContentObject &content_object) {
+ return verifier_->verifyPackets(&content_object);
+}
+
+auth::VerificationPolicy RTCVerifier::verifyFec(
+ core::ContentObject &content_object) {
+ return verifier_->verifyPackets(&content_object);
+}
+
+auth::VerificationPolicy RTCVerifier::verifyData(
+ core::ContentObject &content_object) {
+ if (HICN_PACKET_FORMAT_IS_AH(content_object.getFormat())) {
+ return verifier_->verifyPackets(&content_object);
+ }
+
+ auth::Suffix suffix = content_object.getName().getSuffix();
+ auth::VerificationPolicy policy = auth::VerificationPolicy::ABORT;
+
+ uint32_t threshold_relevant = factor_relevant_ * manifest_max_capacity_;
+ uint32_t threshold_alert = factor_alert_ * manifest_max_capacity_;
+
+ // Flush packets outside relevance window
+ for (auto it = packets_unverif_.set().begin();
+ it != packets_unverif_.set().end();) {
+ if (it->first > current_index_ - threshold_relevant) {
+ break;
+ }
+ packets_unverif_erased_.insert((unsigned int)it->first);
+ it = packets_unverif_.remove(it);
+ }
+
+ // Add packet to set of unverified packets
+ packets_unverif_.add({current_index_, suffix},
+ content_object.computeDigest(manifest_hash_algo_));
+ current_index_++;
+
+ // Check that the number of unverified packets is below the alert threshold
+ if (packets_unverif_.set().size() <= threshold_alert) {
+ policy = auth::VerificationPolicy::ACCEPT;
+ }
+
+ verifier_->callVerificationFailedCallback(suffix, policy);
+ return policy;
+}
+
+auth::VerificationPolicy RTCVerifier::verifyManifest(
+ core::ContentObject &content_object) {
+ return verifier_->verifyPackets(&content_object);
+}
+
+auth::VerificationPolicy RTCVerifier::processManifest(
+ core::ContentObject &content_object) {
+ auth::Suffix suffix = content_object.getName().getSuffix();
+ auth::VerificationPolicy accept_policy = auth::VerificationPolicy::ACCEPT;
+
+ // Decode manifest
+ core::ContentObjectManifest manifest(content_object.shared_from_this());
+ manifest.decode();
+
+ // Extract manifest data
+ manifest_max_capacity_ = manifest.getMaxCapacity();
+ manifest_hash_algo_ = manifest.getHashAlgorithm();
+ auth::Verifier::SuffixMap suffix_map = manifest.getSuffixMap();
+
+ // Return early if the manifest is empty
+ if (suffix_map.empty()) {
+ verifier_->callVerificationFailedCallback(suffix, accept_policy);
+ return accept_policy;
+ }
+
+ // Add hashes to map of all manifest hashes
+ manifest_digests_.insert(suffix_map.begin(), suffix_map.end());
+
+ // Remove discarded and definitely lost packets from digest map
+ for (auto it = manifest_digests_.begin(); it != manifest_digests_.end();) {
+ auto it_erased = packets_unverif_erased_.find(it->first);
+
+ if (it_erased != packets_unverif_erased_.end()) {
+ packets_unverif_erased_.erase(it_erased);
+ it = manifest_digests_.erase(it);
+ continue;
+ }
+
+ if (rtc_state_->getPacketState(it->first) == PacketState::DEFINITELY_LOST) {
+ it = manifest_digests_.erase(it);
+ continue;
+ }
+
+ ++it;
+ }
+
+ // Verify packets
+ auth::Verifier::PolicyMap policies =
+ verifier_->verifyHashes(packets_unverif_.suffixMap(), manifest_digests_);
+
+ for (const auto &p : policies) {
+ switch (p.second) {
+ case auth::VerificationPolicy::ACCEPT: {
+ packets_unverif_.remove(packets_unverif_.packet(p.first));
+ manifest_digests_.erase(p.first);
+ break;
+ }
+ case auth::VerificationPolicy::UNKNOWN:
+ break;
+ case auth::VerificationPolicy::DROP:
+ case auth::VerificationPolicy::ABORT:
+ return p.second;
+ }
+ }
+
+ verifier_->callVerificationFailedCallback(suffix, accept_policy);
+ return accept_policy;
+}
+
+void RTCVerifier::onDataRecoveredFec(uint32_t suffix) {
+ manifest_digests_.erase(suffix);
+}
+
+std::pair<RTCVerifier::PacketSet::iterator, bool> RTCVerifier::Packets::add(
+ const Packet &packet, const auth::CryptoHash &digest) {
+ auto inserted = packets_.insert(packet);
+ if (inserted.second) {
+ packets_map_[packet.second] = inserted.first;
+ suffix_map_[packet.second] = digest;
+ }
+ return inserted;
+}
+
+RTCVerifier::PacketSet::iterator RTCVerifier::Packets::remove(
+ PacketSet::iterator packet_it) {
+ packets_map_.erase(packet_it->second);
+ suffix_map_.erase(packet_it->second);
+ return packets_.erase(packet_it);
+}
+
+const std::set<RTCVerifier::Packet> &RTCVerifier::Packets::set() const {
+ return packets_;
+};
+
+RTCVerifier::PacketSet::iterator RTCVerifier::Packets::packet(
+ auth::Suffix suffix) {
+ return packets_map_.at(suffix);
+};
+
+const auth::Verifier::SuffixMap &RTCVerifier::Packets::suffixMap() const {
+ return suffix_map_;
+}
+
+} // end namespace rtc
+} // end namespace protocol
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_verifier.h b/libtransport/src/protocols/rtc/rtc_verifier.h
new file mode 100644
index 000000000..c83faf08a
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_verifier.h
@@ -0,0 +1,96 @@
+/*
+ * Copyright (c) 2017-2022 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <core/facade.h>
+#include <hicn/transport/auth/verifier.h>
+#include <hicn/transport/core/content_object.h>
+#include <protocols/rtc/rtc_state.h>
+
+namespace transport {
+namespace protocol {
+namespace rtc {
+
+class RTCVerifier {
+ public:
+ explicit RTCVerifier(std::shared_ptr<auth::Verifier> verifier,
+ uint32_t factor_relevant, uint32_t factor_alert);
+
+ virtual ~RTCVerifier() = default;
+
+ void setState(std::shared_ptr<RTCState> rtc_state);
+ void setVerifier(std::shared_ptr<auth::Verifier> verifier);
+ void setFactorRelevant(uint32_t factor_relevant);
+ void setFactorAlert(uint32_t factor_alert);
+
+ auth::VerificationPolicy verify(core::Interest &interest);
+ auth::VerificationPolicy verify(core::ContentObject &content_object,
+ bool is_fec = false);
+ auth::VerificationPolicy verifyProbe(core::ContentObject &content_object);
+ auth::VerificationPolicy verifyNack(core::ContentObject &content_object);
+ auth::VerificationPolicy verifyFec(core::ContentObject &content_object);
+ auth::VerificationPolicy verifyData(core::ContentObject &content_object);
+ auth::VerificationPolicy verifyManifest(core::ContentObject &content_object);
+
+ auth::VerificationPolicy processManifest(core::ContentObject &content_object);
+
+ void onDataRecoveredFec(uint32_t suffix);
+
+ protected:
+ using Index = uint64_t;
+ using Packet = std::pair<Index, auth::Suffix>;
+ using PacketSet = std::set<Packet>;
+
+ class Packets {
+ public:
+ std::pair<PacketSet::iterator, bool> add(const Packet &packet,
+ const auth::CryptoHash &digest);
+ PacketSet::iterator remove(PacketSet::iterator packet_it);
+ const PacketSet &set() const;
+ PacketSet::iterator packet(auth::Suffix suffix);
+ const auth::Verifier::SuffixMap &suffixMap() const;
+
+ private:
+ PacketSet packets_;
+ std::unordered_map<auth::Suffix, PacketSet::iterator> packets_map_;
+ auth::Verifier::SuffixMap suffix_map_;
+ };
+
+ // The RTC state.
+ std::shared_ptr<RTCState> rtc_state_;
+ // The verifier instance.
+ std::shared_ptr<auth::Verifier> verifier_;
+ // Used to compute the relevance windows size (in packets).
+ uint32_t factor_relevant_;
+ // Used to compute the alert threshold (in packets).
+ uint32_t factor_alert_;
+ // The maximum number of entries a manifest can contain.
+ uint8_t manifest_max_capacity_;
+ // Hash algorithm used by manifests.
+ auth::CryptoHashType manifest_hash_algo_;
+ // Digests extracted from all manifests received.
+ auth::Verifier::SuffixMap manifest_digests_;
+ // The number of data packets processed.
+ Index current_index_;
+ // Unverified packets with index in relevance window.
+ Packets packets_unverif_;
+ // Unverified erased packets with index outside relevance window.
+ std::unordered_set<auth::Suffix> packets_unverif_erased_;
+};
+
+} // namespace rtc
+} // namespace protocol
+} // namespace transport