summaryrefslogtreecommitdiffstats
path: root/libtransport/src/protocols/rtc/rtc.cc
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/protocols/rtc/rtc.cc')
-rw-r--r--libtransport/src/protocols/rtc/rtc.cc616
1 files changed, 457 insertions, 159 deletions
diff --git a/libtransport/src/protocols/rtc/rtc.cc b/libtransport/src/protocols/rtc/rtc.cc
index 0cb4cda1d..df6522471 100644
--- a/libtransport/src/protocols/rtc/rtc.cc
+++ b/libtransport/src/protocols/rtc/rtc.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -22,7 +22,7 @@
#include <protocols/rtc/rtc.h>
#include <protocols/rtc/rtc_consts.h>
#include <protocols/rtc/rtc_indexer.h>
-#include <protocols/rtc/rtc_rc_queue.h>
+#include <protocols/rtc/rtc_rc_congestion_detection.h>
#include <algorithm>
@@ -37,13 +37,15 @@ using namespace interface;
RTCTransportProtocol::RTCTransportProtocol(
implementation::ConsumerSocket *icn_socket)
: TransportProtocol(icn_socket, new RtcIndexer<>(icn_socket, this),
- new DatagramReassembly(icn_socket, this)),
+ new RtcReassembly(icn_socket, this)),
number_(0) {
icn_socket->getSocketOption(PORTAL, portal_);
- round_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
+ round_timer_ =
+ std::make_unique<asio::steady_timer>(portal_->getThread().getIoService());
scheduler_timer_ =
- std::make_unique<asio::steady_timer>(portal_->getIoService());
- pacing_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
+ std::make_unique<asio::steady_timer>(portal_->getThread().getIoService());
+ pacing_timer_ =
+ std::make_unique<asio::steady_timer>(portal_->getThread().getIoService());
}
RTCTransportProtocol::~RTCTransportProtocol() {}
@@ -61,25 +63,52 @@ std::size_t RTCTransportProtocol::transportHeaderLength() {
// private
void RTCTransportProtocol::initParams() {
TransportProtocol::reset();
+ fwd_strategy_.setCallback(on_fwd_strategy_);
- rc_ = std::make_shared<RTCRateControlQueue>();
+ std::weak_ptr<RTCTransportProtocol> self = shared_from_this();
+
+ std::shared_ptr<auth::Verifier> verifier;
+ socket_->getSocketOption(GeneralTransportOptions::VERIFIER, verifier);
+
+ uint32_t max_unverified_delay;
+ socket_->getSocketOption(GeneralTransportOptions::MAX_UNVERIFIED_TIME,
+ max_unverified_delay);
+
+ rc_ = std::make_shared<RTCRateControlCongestionDetection>();
ldr_ = std::make_shared<RTCLossDetectionAndRecovery>(
- indexer_verifier_.get(),
- std::bind(&RTCTransportProtocol::sendRtxInterest, this,
- std::placeholders::_1),
- portal_->getIoService());
+ indexer_verifier_.get(), portal_->getThread().getIoService(),
+ interface::RtcTransportRecoveryStrategies::RTX_ONLY,
+ [self](uint32_t seq) {
+ auto ptr = self.lock();
+ if (ptr && ptr->isRunning()) {
+ ptr->sendRtxInterest(seq);
+ }
+ },
+ on_rec_strategy_);
+ verifier_ = std::make_shared<RTCVerifier>(verifier, max_unverified_delay);
state_ = std::make_shared<RTCState>(
indexer_verifier_.get(),
- std::bind(&RTCTransportProtocol::sendProbeInterest, this,
- std::placeholders::_1),
- std::bind(&RTCTransportProtocol::discoveredRtt, this),
- portal_->getIoService());
+ [self](uint32_t seq) {
+ auto ptr = self.lock();
+ if (ptr && ptr->isRunning()) {
+ ptr->sendProbeInterest(seq);
+ }
+ },
+ [self]() {
+ auto ptr = self.lock();
+ if (ptr && ptr->isRunning()) {
+ ptr->discoveredRtt();
+ }
+ },
+ portal_->getThread().getIoService());
+ state_->initParams();
rc_->setState(state_);
- // TODO: for the moment we keep the congestion control disabled
- // rc_->tunrOnRateControl();
- ldr_->setState(state_);
+ rc_->turnOnRateControl();
+ ldr_->setState(state_.get());
+ ldr_->setRateControl(rc_.get());
+ verifier_->setState(state_);
// protocol state
start_send_interest_ = false;
@@ -102,6 +131,10 @@ void RTCTransportProtocol::initParams() {
}
#else
max_aggregated_interest_ = 1;
+ if (const char *max_aggr = std::getenv("MAX_AGGREGATED_INTERESTS")) {
+ LOG(INFO) << "Max Aggregated: " << max_aggr;
+ max_aggregated_interest_ = std::stoul(std::string(max_aggr));
+ }
#endif
max_sent_int_ =
@@ -131,6 +164,7 @@ void RTCTransportProtocol::initParams() {
indexer_verifier_->setNFec(0);
ldr_->setFecParams(fec::FECUtils::getBlockSymbols(fec_type_),
fec::FECUtils::getSourceSymbols(fec_type_));
+ fec_decoder_->setIOService(portal_->getThread().getIoService());
} else {
indexer_verifier_->disableFec();
}
@@ -162,61 +196,97 @@ void RTCTransportProtocol::inactiveProducer() {
void RTCTransportProtocol::newRound() {
round_timer_->expires_from_now(std::chrono::milliseconds(ROUND_LEN));
- // TODO pass weak_ptr here
- round_timer_->async_wait([this, n{number_}](std::error_code ec) {
- if (ec) return;
- if (n != number_) {
+ std::weak_ptr<RTCTransportProtocol> self = shared_from_this();
+ round_timer_->async_wait([self](const std::error_code &ec) {
+ if (ec) {
return;
}
+ auto ptr = self.lock();
+
+ if (!ptr || !ptr->isRunning()) {
+ return;
+ }
+
+ auto &state = ptr->state_;
+
// saving counters that will be reset on new round
- uint32_t sent_retx = state_->getSentRtxInRound();
- uint32_t received_bytes = state_->getReceivedBytesInRound();
- uint32_t sent_interest = state_->getSentInterestInRound();
- uint32_t lost_data = state_->getLostData();
- uint32_t definitely_lost = state_->getDefinitelyLostPackets();
- uint32_t recovered_losses = state_->getRecoveredLosses();
- uint32_t received_nacks = state_->getReceivedNacksInRound();
- uint32_t received_fec = state_->getReceivedFecPackets();
-
- bool in_sync = (current_state_ == SyncState::in_sync);
- ldr_->onNewRound(in_sync);
- state_->onNewRound((double)ROUND_LEN, in_sync);
- rc_->onNewRound((double)ROUND_LEN);
+ uint32_t sent_retx = state->getSentRtxInRound();
+ uint32_t received_bytes =
+ (state->getReceivedBytesInRound() + // data packets received
+ state->getReceivedFecBytesInRound()); // fec packets received
+ uint32_t sent_interest = state->getSentInterestInRound();
+ uint32_t lost_data = state->getLostData();
+ uint32_t definitely_lost = state->getDefinitelyLostPackets();
+ uint32_t recovered_losses = state->getRecoveredLosses();
+ uint32_t received_nacks = state->getReceivedNacksInRound();
+ uint32_t received_fec = state->getReceivedFecPackets();
+
+ bool in_sync = (ptr->current_state_ == SyncState::in_sync);
+ ptr->ldr_->onNewRound(in_sync);
+ ptr->state_->onNewRound((double)ROUND_LEN, in_sync);
+ ptr->rc_->onNewRound((double)ROUND_LEN);
// update sync state if needed
- if (current_state_ == SyncState::in_sync) {
- double cache_rate = state_->getPacketFromCacheRatio();
+ if (ptr->current_state_ == SyncState::in_sync) {
+ double cache_rate = state->getPacketFromCacheRatio();
if (cache_rate > MAX_DATA_FROM_CACHE) {
- current_state_ = SyncState::catch_up;
+ ptr->current_state_ = SyncState::catch_up;
}
} else {
- double target_rate = state_->getProducerRate() * PRODUCTION_RATE_FRACTION;
- double received_rate = state_->getReceivedRate();
- uint32_t round_without_nacks = state_->getRoundsWithoutNacks();
- double cache_ratio = state_->getPacketFromCacheRatio();
+ double target_rate = state->getProducerRate() * PRODUCTION_RATE_FRACTION;
+ double received_rate =
+ state->getReceivedRate() + state->getRecoveredFecRate();
+ uint32_t round_without_nacks = state->getRoundsWithoutNacks();
+ double cache_ratio = state->getPacketFromCacheRatio();
if (round_without_nacks >= ROUNDS_IN_SYNC_BEFORE_SWITCH &&
received_rate >= target_rate && cache_ratio < MAX_DATA_FROM_CACHE) {
- current_state_ = SyncState::in_sync;
+ ptr->current_state_ = SyncState::in_sync;
}
}
DLOG_IF(INFO, VLOG_IS_ON(3))
<< "Calling updateSyncWindow in newRound function";
- updateSyncWindow();
+ ptr->updateSyncWindow();
- sendStatsToApp(sent_retx, received_bytes, sent_interest, lost_data,
- definitely_lost, recovered_losses, received_nacks,
- received_fec);
- newRound();
+ ptr->sendStatsToApp(sent_retx, received_bytes, sent_interest, lost_data,
+ definitely_lost, recovered_losses, received_nacks,
+ received_fec);
+ ptr->fwd_strategy_.checkStrategy();
+ ptr->newRound();
});
}
void RTCTransportProtocol::discoveredRtt() {
start_send_interest_ = true;
- ldr_->turnOnRTX();
+ uint32_t strategy;
+ socket_->getSocketOption(RtcTransportOptions::RECOVERY_STRATEGY, strategy);
+ ldr_->changeRecoveryStrategy(
+ (interface::RtcTransportRecoveryStrategies)strategy);
+ ldr_->turnOnRecovery();
ldr_->onNewRound(false);
+
+ // set forwarding strategy switch if selected
+ Name *name = nullptr;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name);
+ Prefix prefix(*name, 128);
+ if ((interface::RtcTransportRecoveryStrategies)strategy ==
+ interface::RtcTransportRecoveryStrategies::LOW_RATE_AND_BESTPATH) {
+ fwd_strategy_.initFwdStrategy(portal_, prefix, state_.get(),
+ RTCForwardingStrategy::BEST_PATH);
+ } else if ((interface::RtcTransportRecoveryStrategies)strategy ==
+ interface::RtcTransportRecoveryStrategies::
+ LOW_RATE_AND_REPLICATION) {
+ fwd_strategy_.initFwdStrategy(portal_, prefix, state_.get(),
+ RTCForwardingStrategy::REPLICATION);
+ } else if ((interface::RtcTransportRecoveryStrategies)strategy ==
+ interface::RtcTransportRecoveryStrategies::
+ LOW_RATE_AND_ALL_FWD_STRATEGIES) {
+ fwd_strategy_.initFwdStrategy(portal_, prefix, state_.get(),
+ RTCForwardingStrategy::BOTH);
+ }
+
updateSyncWindow();
}
@@ -244,7 +314,7 @@ void RTCTransportProtocol::computeMaxSyncWindow() {
(production_rate * lifetime_ms * INTEREST_LIFETIME_REDUCTION_FACTOR) /
packet_size);
- max_sync_win_ = std::min(max_sync_win_, rc_->getCongesionWindow());
+ max_sync_win_ = std::min(max_sync_win_, rc_->getCongestionWindow());
}
void RTCTransportProtocol::updateSyncWindow() {
@@ -259,25 +329,14 @@ void RTCTransportProtocol::updateSyncWindow() {
}
double prod_rate = state_->getProducerRate();
- double rtt = (double)state_->getRTT() / MILLI_IN_A_SEC;
+ double rtt = (double)state_->getMinRTT() / MILLI_IN_A_SEC;
double packet_size = state_->getAveragePacketSize();
// if some of the info are not available do not update the current win
if (prod_rate != 0.0 && rtt != 0.0 && packet_size != 0.0) {
- double fec_interest_overhead = (double)state_->getPendingFecPackets() /
- (double)(state_->getPendingInterestNumber() -
- state_->getPendingFecPackets());
-
- double fec_overhead =
- std::max(indexer_verifier_->getFecOverhead(), fec_interest_overhead);
-
- prod_rate += (prod_rate * fec_overhead);
-
current_sync_win_ = (uint32_t)ceil(prod_rate * rtt / packet_size);
uint32_t buffer = PRODUCER_BUFFER_MS;
- if (rtt > 150)
- buffer = buffer * 2; // if the RTT is too large we increase the
- // the size of the buffer
+
current_sync_win_ +=
ceil(prod_rate * (buffer / MILLI_IN_A_SEC) / packet_size);
@@ -285,8 +344,17 @@ void RTCTransportProtocol::updateSyncWindow() {
current_sync_win_ = current_sync_win_ * CATCH_UP_WIN_INCREMENT;
}
+ uint32_t min_win = WIN_MIN;
+ bool aggregated_data_on;
+ socket_->getSocketOption(RtcTransportOptions::AGGREGATED_DATA,
+ aggregated_data_on);
+ if (aggregated_data_on) {
+ min_win = WIN_MIN_WITH_AGGREGARED_DATA;
+ min_win += (min_win * (1 - (std::max(0.3, rtt) - rtt) / 0.3));
+ }
+
current_sync_win_ = std::min(current_sync_win_, max_sync_win_);
- current_sync_win_ = std::max(current_sync_win_, WIN_MIN);
+ current_sync_win_ = std::max(current_sync_win_, min_win);
}
scheduleNextInterests();
@@ -322,7 +390,7 @@ void RTCTransportProtocol::sendProbeInterest(uint32_t seq) {
socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
&interest_name);
- DLOG_IF(INFO, VLOG_IS_ON(3)) << "send probe " << seq;
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Send probe " << seq;
interest_name->setSuffix(seq);
sendInterest(*interest_name);
}
@@ -330,13 +398,18 @@ void RTCTransportProtocol::sendProbeInterest(uint32_t seq) {
void RTCTransportProtocol::scheduleNextInterests() {
DLOG_IF(INFO, VLOG_IS_ON(3)) << "Schedule next interests";
- if (!isRunning() && !is_first_) return;
+ if (!isRunning() && !is_first_) {
+ return;
+ }
- if (pacing_timer_on_) return; // wait pacing timer for the next send
+ if (pacing_timer_on_) {
+ return; // wait pacing timer for the next send
+ }
- if (!start_send_interest_)
+ if (!start_send_interest_) {
return; // RTT discovering phase is not finished so
// do not start to send interests
+ }
if (TRANSPORT_EXPECT_FALSE(!state_->isProducerActive())) {
DLOG_IF(INFO, VLOG_IS_ON(3)) << "Inactive producer.";
@@ -352,7 +425,7 @@ void RTCTransportProtocol::scheduleNextInterests() {
&interest_name);
uint32_t next_seg = 0;
- DLOG_IF(INFO, VLOG_IS_ON(3)) << "send interest " << next_seg;
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Send interest " << next_seg;
interest_name->setSuffix(next_seg);
if (portal_->interestIsPending(*interest_name)) {
@@ -370,28 +443,37 @@ void RTCTransportProtocol::scheduleNextInterests() {
<< " -- current_sync_win_: " << current_sync_win_;
uint32_t pending = state_->getPendingInterestNumber();
- if (pending >= current_sync_win_) return; // no space in the window
+ uint32_t pending_fec = state_->getPendingFecPackets();
+
+ if ((pending - pending_fec) >= current_sync_win_)
+ return; // no space in the window
- if ((current_sync_win_ - pending) < max_aggregated_interest_) {
+ // XXX double check if aggregated interests are still working here
+ if ((current_sync_win_ - (pending - pending_fec)) <
+ max_aggregated_interest_) {
if (scheduler_timer_on_) return; // timer already scheduled
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
+ uint64_t now = utils::SteadyTime::nowMs().count();
uint64_t time = now - last_interest_sent_time_;
if (time < WAIT_FOR_INTEREST_BATCH) {
uint64_t next = WAIT_FOR_INTEREST_BATCH - time;
scheduler_timer_on_ = true;
scheduler_timer_->expires_from_now(std::chrono::milliseconds(next));
- scheduler_timer_->async_wait([this](std::error_code ec) {
+
+ std::weak_ptr<RTCTransportProtocol> self = shared_from_this();
+ scheduler_timer_->async_wait([self](const std::error_code &ec) {
if (ec) return;
- if (!scheduler_timer_on_) return;
- scheduler_timer_on_ = false;
- scheduleNextInterests();
+ auto ptr = self.lock();
+ if (ptr && ptr->isRunning()) {
+ if (!ptr->scheduler_timer_on_) return;
+
+ ptr->scheduler_timer_on_ = false;
+ ptr->scheduleNextInterests();
+ }
});
- return; // whait for the timer
+ return; // wait for the timer
}
}
@@ -403,7 +485,7 @@ void RTCTransportProtocol::scheduleNextInterests() {
indexer_verifier_->jumpToIndex(state_->getLastSeqNacked() + 1);
}
- // skipe received packets
+ // skip received packets
if (indexer_verifier_->checkNextSuffix() <=
state_->getHighestSeqReceivedInOrder()) {
indexer_verifier_->jumpToIndex(state_->getHighestSeqReceivedInOrder() + 1);
@@ -417,7 +499,8 @@ void RTCTransportProtocol::scheduleNextInterests() {
socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name);
std::array<uint32_t, MAX_AGGREGATED_INTEREST> additional_suffixes;
- while ((state_->getPendingInterestNumber() < current_sync_win_) &&
+ while (((state_->getPendingInterestNumber() -
+ state_->getPendingFecPackets()) < current_sync_win_) &&
(sent_interests < max_sent_int_)) {
DLOG_IF(INFO, VLOG_IS_ON(3))
<< "In while loop. Window size: " << current_sync_win_;
@@ -428,19 +511,20 @@ void RTCTransportProtocol::scheduleNextInterests() {
// send the packet only if:
// 1) it is not pending yet (not true for rtx)
- // 2) the packet is not received or lost
+ // 2) the packet is not received or def lost
// 3) is not in the rtx list
// 4) is fec and is not in order (!= last sent + 1)
+ PacketState packet_state = state_->getPacketState(next_seg);
if (portal_->interestIsPending(*name) ||
- state_->isReceivedOrLost(next_seg) != PacketState::UNKNOWN ||
- ldr_->isRtx(next_seg) ||
+ packet_state == PacketState::RECEIVED ||
+ packet_state == PacketState::DEFINITELY_LOST || ldr_->isRtx(next_seg) ||
(indexer_verifier_->isFec(next_seg) &&
next_seg != last_interest_sent_seq_ + 1)) {
DLOG_IF(INFO, VLOG_IS_ON(3))
<< "skip interest " << next_seg << " because: pending "
- << portal_->interestIsPending(*name) << ", recv "
- << (state_->isReceivedOrLost(next_seg) != PacketState::UNKNOWN)
- << ", rtx " << (ldr_->isRtx(next_seg)) << ", is old fec "
+ << portal_->interestIsPending(*name) << ", recv or lost"
+ << (int)packet_state << ", rtx " << (ldr_->isRtx(next_seg))
+ << ", is old fec "
<< ((indexer_verifier_->isFec(next_seg) &&
next_seg != last_interest_sent_seq_ + 1));
continue;
@@ -462,10 +546,7 @@ void RTCTransportProtocol::scheduleNextInterests() {
sent_packets++;
sent_interests++;
sendInterest(interest_name, &additional_suffixes, aggregated_counter - 1);
- last_interest_sent_time_ =
- std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
+ last_interest_sent_time_ = utils::SteadyTime::nowMs().count();
aggregated_counter = 0;
}
}
@@ -473,25 +554,29 @@ void RTCTransportProtocol::scheduleNextInterests() {
// exiting the while we may have some pending interest to send
if (aggregated_counter != 0) {
sent_packets++;
- last_interest_sent_time_ =
- std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
+ last_interest_sent_time_ = utils::SteadyTime::nowMs().count();
sendInterest(interest_name, &additional_suffixes, aggregated_counter - 1);
}
- if (state_->getPendingInterestNumber() < current_sync_win_) {
+ if ((state_->getPendingInterestNumber() - state_->getPendingFecPackets()) <
+ current_sync_win_) {
// we still have space in the window but we already sent too many packets
// wait PACING_WAIT to avoid drops in the kernel
pacing_timer_on_ = true;
pacing_timer_->expires_from_now(std::chrono::microseconds(PACING_WAIT));
- scheduler_timer_->async_wait([this](std::error_code ec) {
+
+ std::weak_ptr<RTCTransportProtocol> self = shared_from_this();
+ scheduler_timer_->async_wait([self](const std::error_code &ec) {
if (ec) return;
- if (!pacing_timer_on_) return;
- pacing_timer_on_ = false;
- scheduleNextInterests();
+ auto ptr = self.lock();
+ if (ptr && ptr->isRunning()) {
+ if (!ptr->pacing_timer_on_) return;
+
+ ptr->pacing_timer_on_ = false;
+ ptr->scheduleNextInterests();
+ }
});
}
}
@@ -500,13 +585,13 @@ void RTCTransportProtocol::onInterestTimeout(Interest::Ptr &interest,
const Name &name) {
uint32_t segment_number = name.getSuffix();
- if (segment_number >= MIN_PROBE_SEQ) {
+ if (ProbeHandler::getProbeType(segment_number) != ProbeType::NOT_PROBE) {
// this is a timeout on a probe, do nothing
return;
}
- PacketState state = state_->isReceivedOrLost(segment_number);
- if (state != PacketState::UNKNOWN) {
+ PacketState state = state_->getPacketState(segment_number);
+ if (state == PacketState::RECEIVED || state == PacketState::DEFINITELY_LOST) {
// we may recover a packets using fec, ignore this timer
return;
}
@@ -524,13 +609,18 @@ void RTCTransportProtocol::onInterestTimeout(Interest::Ptr &interest,
DLOG_IF(INFO, VLOG_IS_ON(3))
<< "handle timeout for packet " << segment_number << " using rtx";
if (ldr_->isRtxOn()) {
- ldr_->onTimeout(segment_number);
- if (indexer_verifier_->isFec(segment_number))
+ if (indexer_verifier_->isFec(segment_number)) {
+ // if this is a fec packet we do not recover it with rtx so we consider
+ // the packet to be lost
+ ldr_->onTimeout(segment_number, true);
state_->onTimeout(segment_number, true);
- else
+ } else {
+ ldr_->onTimeout(segment_number, false);
state_->onTimeout(segment_number, false);
+ }
} else {
// in this case we wil never recover the timeout
+ ldr_->onTimeout(segment_number, true);
state_->onTimeout(segment_number, true);
}
scheduleNextInterests();
@@ -559,7 +649,7 @@ void RTCTransportProtocol::onInterestTimeout(Interest::Ptr &interest,
void RTCTransportProtocol::onNack(const ContentObject &content_object) {
struct nack_packet_t *nack =
(struct nack_packet_t *)content_object.getPayload()->data();
- uint32_t production_seg = nack->getProductionSegement();
+ uint32_t production_seg = nack->getProductionSegment();
uint32_t nack_segment = content_object.getName().getSuffix();
bool is_rtx = ldr_->isRtx(nack_segment);
@@ -592,6 +682,8 @@ void RTCTransportProtocol::onNack(const ContentObject &content_object) {
// remove the nack is it exists
if (tn_it != timeouts_or_nacks_.end()) timeouts_or_nacks_.erase(tn_it);
+ state_->onJumpForward(production_seg);
+ verifier_->onJumpForward(production_seg);
// the client is asking for content in the past
// switch to catch up state and increase the window
// this is true only if the packet is not an RTX
@@ -618,11 +710,9 @@ void RTCTransportProtocol::onProbe(const ContentObject &content_object) {
bool valid = state_->onProbePacketReceived(content_object);
if (!valid) return;
- struct nack_packet_t *probe =
- (struct nack_packet_t *)content_object.getPayload()->data();
- uint32_t production_seg = probe->getProductionSegement();
+ uint32_t production_seg = RTCState::getProbeParams(content_object).prod_seg;
- // as for the nacks set next_segment
+ // As for the nacks set next_segment
DLOG_IF(INFO, VLOG_IS_ON(3))
<< "on probe next seg = " << indexer_verifier_->checkNextSuffix()
<< ", jump to " << production_seg;
@@ -636,12 +726,39 @@ void RTCTransportProtocol::onContentObjectReceived(
Interest &interest, ContentObject &content_object, std::error_code &ec) {
DLOG_IF(INFO, VLOG_IS_ON(3))
<< "Received content object of size: " << content_object.payloadSize();
- uint32_t payload_size = content_object.payloadSize();
+
uint32_t segment_number = content_object.getName().getSuffix();
+ PayloadType payload_type = content_object.getPayloadType();
+ PacketState state;
+
+ ContentObject *content_ptr = &content_object;
+ ContentObject::Ptr manifest_ptr = nullptr;
+
+ bool is_probe =
+ ProbeHandler::getProbeType(segment_number) != ProbeType::NOT_PROBE;
+ bool is_nack = !is_probe && content_object.payloadSize() == NACK_HEADER_SIZE;
+ bool is_fec = indexer_verifier_->isFec(segment_number);
+ bool is_manifest =
+ !is_probe && !is_nack && !is_fec && payload_type == PayloadType::MANIFEST;
+ bool is_data =
+ !is_probe && !is_nack && !is_fec && payload_type == PayloadType::DATA;
+ bool compute_stats = is_data || is_manifest;
ec = make_error_code(protocol_error::not_reassemblable);
- if (segment_number >= MIN_PROBE_SEQ) {
+ // A helper function to process manifests or data packets received
+ auto onDataPacketReceived = [this](ContentObject &content_object,
+ bool compute_stats) {
+ ldr_->onDataPacketReceived(content_object);
+ rc_->onDataPacketReceived(content_object, compute_stats);
+ updateSyncWindow();
+ };
+
+ // First verify the packet signature and apply the corresponding policy
+ auth::VerificationPolicy policy = verifier_->verify(content_object, is_fec);
+ indexer_verifier_->applyPolicy(interest, content_object, false, policy);
+
+ if (is_probe) {
DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received probe " << segment_number;
if (*on_content_object_input_) {
(*on_content_object_input_)(*socket_->getInterface(), content_object);
@@ -650,7 +767,7 @@ void RTCTransportProtocol::onContentObjectReceived(
return;
}
- if (payload_size == NACK_HEADER_SIZE) {
+ if (is_nack) {
DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received nack " << segment_number;
if (*on_content_object_input_) {
(*on_content_object_input_)(*socket_->getInterface(), content_object);
@@ -659,57 +776,122 @@ void RTCTransportProtocol::onContentObjectReceived(
return;
}
+ // content_ptr will point either to the input data packet or to a manifest
+ // whose FEC header has been removed
+ if (is_manifest) {
+ manifest_ptr = removeFecHeader(content_object);
+ if (manifest_ptr) {
+ content_ptr = manifest_ptr.get();
+ }
+ }
+
+ // From there, the packet is either a FEC, a manifest or a data packet.
DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received content " << segment_number;
- bool compute_stats = true;
+ // Do not count timed out packets in stats
auto tn_it = timeouts_or_nacks_.find(segment_number);
if (tn_it != timeouts_or_nacks_.end()) {
compute_stats = false;
timeouts_or_nacks_.erase(tn_it);
}
- if (ldr_->isRtx(segment_number)) {
+
+ // Do not count retransmissions or losses in stats
+ if (ldr_->isRtx(segment_number) ||
+ ldr_->isPossibleLossWithNoRtx(segment_number)) {
compute_stats = false;
}
- // check if the packet was already received
- PacketState state = state_->isReceivedOrLost(segment_number);
+ // Fetch packet state
+ state = state_->getPacketState(segment_number);
- if (state != PacketState::RECEIVED) {
- // send packet to decoder
- if (fec_decoder_) {
- DLOG_IF(INFO, VLOG_IS_ON(4))
- << "send packet " << segment_number << " to FEC decoder";
- fec_decoder_->onDataPacket(
- content_object, content_object.headerSize() + rtc::DATA_HEADER_SIZE);
- }
- if (!indexer_verifier_->isFec(segment_number)) {
- // the packet may be alredy sent to the ap by the decoder, check again if
- // it is already received
- state = state_->isReceivedOrLost(segment_number);
- if (state != PacketState::RECEIVED) {
- DLOG_IF(INFO, VLOG_IS_ON(4)) << "Received content " << segment_number;
+ // Check if the packet is a retransmission
+ if (ldr_->isRtx(segment_number) && state != PacketState::RECEIVED) {
+ if (is_data || is_manifest) {
+ state_->onPacketRecoveredRtx(segment_number);
- state_->onDataPacketReceived(content_object, compute_stats);
+ if (*on_content_object_input_) {
+ (*on_content_object_input_)(*socket_->getInterface(), content_object);
+ }
- if (*on_content_object_input_) {
- (*on_content_object_input_)(*socket_->getInterface(), content_object);
- }
- ec = make_error_code(protocol_error::success);
+ if (is_manifest) {
+ processManifest(interest, *content_ptr);
}
- } else {
- DLOG_IF(INFO, VLOG_IS_ON(4)) << "Received fec " << segment_number;
- state_->onFecPacketReceived(content_object);
+
+ ec = is_manifest ? make_error_code(protocol_error::not_reassemblable)
+ : make_error_code(protocol_error::success);
+
+ // The packet is considered received, return early
+ onDataPacketReceived(*content_ptr, compute_stats);
+ return;
}
- } else {
+
+ if (is_fec) {
+ state_->onFecPacketRecoveredRtx(segment_number);
+ }
+ }
+
+ // Fetch packet state again; it may have changed
+ state = state_->getPacketState(segment_number);
+
+ // Check if the packet was already received
+ if (state == PacketState::RECEIVED || state == PacketState::TO_BE_RECEIVED) {
DLOG_IF(INFO, VLOG_IS_ON(3))
<< "Received duplicated content " << segment_number << ", drop it";
ec = make_error_code(protocol_error::duplicated_content);
+ onDataPacketReceived(*content_ptr, compute_stats);
+ return;
}
- ldr_->onDataPacketReceived(content_object);
- rc_->onDataPacketReceived(content_object);
+ if (!is_fec) {
+ state_->dataToBeReceived(segment_number);
+ }
- updateSyncWindow();
+ // Send packet to FEC decoder
+ if (fec_decoder_) {
+ DLOG_IF(INFO, VLOG_IS_ON(4))
+ << "Send packet " << segment_number << " to FEC decoder";
+
+ uint32_t offset = is_manifest
+ ? content_object.headerSize()
+ : content_object.headerSize() + rtc::DATA_HEADER_SIZE;
+ uint32_t metadata = static_cast<uint32_t>(content_object.getPayloadType());
+
+ fec_decoder_->onDataPacket(content_object, offset, metadata);
+ }
+
+ // We can return early if FEC
+ if (is_fec) {
+ DLOG_IF(INFO, VLOG_IS_ON(4)) << "Received FEC " << segment_number;
+ state_->onFecPacketReceived(content_object);
+ onDataPacketReceived(*content_ptr, compute_stats);
+ return;
+ }
+
+ // The packet may have been already sent to the app by the decoder, check
+ // again if it is already received
+ state = state_->getPacketState(
+ segment_number); // state == RECEIVED or TO_BE_RECEIVED
+
+ if (state != PacketState::RECEIVED) {
+ DLOG_IF(INFO, VLOG_IS_ON(4))
+ << (is_manifest ? "Received manifest " : "Received data ")
+ << segment_number;
+
+ if (is_manifest) {
+ processManifest(interest, *content_ptr);
+ }
+
+ state_->onDataPacketReceived(*content_ptr, compute_stats);
+
+ if (*on_content_object_input_) {
+ (*on_content_object_input_)(*socket_->getInterface(), content_object);
+ }
+
+ ec = is_manifest ? make_error_code(protocol_error::not_reassemblable)
+ : make_error_code(protocol_error::success);
+ }
+
+ onDataPacketReceived(*content_ptr, compute_stats);
}
void RTCTransportProtocol::sendStatsToApp(
@@ -729,33 +911,149 @@ void RTCTransportProtocol::sendStatsToApp(
stats_->updateReceivedNacks(received_nacks);
stats_->updateReceivedFEC(received_fec);
- stats_->updateAverageWindowSize(current_sync_win_);
- stats_->updateLossRatio(state_->getLossRate());
- stats_->updateAverageRtt(state_->getRTT());
+ stats_->updateAverageWindowSize(state_->getPendingInterestNumber());
+ stats_->updateLossRatio(state_->getPerSecondLossRate());
+ uint64_t rtt = state_->getAvgRTT();
+ stats_->updateAverageRtt(utils::SteadyTime::Milliseconds(rtt));
+
stats_->updateQueuingDelay(state_->getQueuing());
stats_->updateLostData(lost_data);
stats_->updateDefinitelyLostData(definitely_lost);
stats_->updateRecoveredData(recovered_losses);
stats_->updateCCState((unsigned int)current_state_ ? 1 : 0);
(*stats_summary_)(*socket_->getInterface(), *stats_);
+ bool in_congestion = rc_->inCongestionState();
+ stats_->updateCongestionState(in_congestion);
+ double residual_losses = state_->getResidualLossRate();
+ stats_->updateResidualLossRate(residual_losses);
+ stats_->updateQualityScore(state_->getQualityScore());
+
+ // set alerts
+ if (rtt > MAX_RTT)
+ stats_->setAlert(interface::TransportStatistics::statsAlerts::LATENCY);
+ else
+ stats_->clearAlert(interface::TransportStatistics::statsAlerts::LATENCY);
+
+ if (in_congestion)
+ stats_->setAlert(interface::TransportStatistics::statsAlerts::CONGESTION);
+ else
+ stats_->clearAlert(
+ interface::TransportStatistics::statsAlerts::CONGESTION);
+
+ if (residual_losses > MAX_RESIDUAL_LOSSES)
+ stats_->setAlert(interface::TransportStatistics::statsAlerts::LOSSES);
+ else
+ stats_->clearAlert(interface::TransportStatistics::statsAlerts::LOSSES);
}
}
-void RTCTransportProtocol::onFecPackets(
- std::vector<std::pair<uint32_t, fec::buffer>> &packets) {
+void RTCTransportProtocol::onFecPackets(fec::BufferArray &packets) {
+ Packet::Format format;
+ socket_->getSocketOption(interface::GeneralTransportOptions::PACKET_FORMAT,
+ format);
+
+ Name *name = nullptr;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name);
+
for (auto &packet : packets) {
- PacketState state = state_->isReceivedOrLost(packet.first);
- if (state != PacketState::RECEIVED) {
- state_->onPacketRecoveredFec(packet.first);
- ldr_->onPacketRecoveredFec(packet.first);
- DLOG_IF(INFO, VLOG_IS_ON(3))
- << "Recovered packet " << packet.first << " through FEC.";
- reassembly_->reassemble(*packet.second, packet.first);
- } else {
- DLOG_IF(INFO, VLOG_IS_ON(3))
- << "Packet" << packet.first << "already received.";
+ uint32_t seq_number = packet.getIndex();
+ uint32_t metadata = packet.getMetadata();
+ fec::buffer buffer = packet.getBuffer();
+
+ PayloadType payload_type = static_cast<PayloadType>(metadata);
+ switch (payload_type) {
+ case PayloadType::DATA:
+ case PayloadType::MANIFEST:
+ break;
+ case PayloadType::UNSPECIFIED:
+ default:
+ payload_type = PayloadType::DATA;
+ break;
}
+
+ switch (state_->getPacketState(seq_number)) {
+ case PacketState::RECEIVED:
+ case PacketState::TO_BE_RECEIVED: {
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Packet " << seq_number << " already received";
+ break;
+ }
+ default: {
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Recovered packet " << seq_number << " through FEC";
+
+ if (payload_type == PayloadType::MANIFEST) {
+ name->setSuffix(seq_number);
+
+ auto interest =
+ core::PacketManager<>::getInstance().getPacket<Interest>(format);
+ interest->setName(*name);
+
+ auto content_object = toContentObject(
+ *name, format, payload_type, buffer->data(), buffer->length());
+
+ processManifest(*interest, *content_object);
+ }
+
+ state_->onPacketRecoveredFec(seq_number, buffer->length());
+ ldr_->onPacketRecoveredFec(seq_number);
+
+ if (payload_type == PayloadType::DATA) {
+ verifier_->onDataRecoveredFec(seq_number);
+ reassembly_->reassemble(*buffer, seq_number);
+ }
+
+ break;
+ }
+ }
+ }
+}
+
+void RTCTransportProtocol::processManifest(Interest &interest,
+ ContentObject &manifest) {
+ auth::VerificationPolicy policy = verifier_->processManifest(manifest);
+ indexer_verifier_->applyPolicy(interest, manifest, false, policy);
+}
+
+ContentObject::Ptr RTCTransportProtocol::removeFecHeader(
+ const ContentObject &content_object) {
+ if (!fec_decoder_ || !fec_decoder_->getFecHeaderSize()) {
+ return nullptr;
}
+
+ size_t fec_header_size = fec_decoder_->getFecHeaderSize();
+ const uint8_t *payload =
+ content_object.data() + content_object.headerSize() + fec_header_size;
+ size_t payload_size = content_object.payloadSize() - fec_header_size;
+
+ ContentObject::Ptr co =
+ toContentObject(content_object.getName(), content_object.getFormat(),
+ content_object.getPayloadType(), payload, payload_size);
+
+ return co;
+}
+
+ContentObject::Ptr RTCTransportProtocol::toContentObject(
+ const Name &name, Packet::Format format, PayloadType payload_type,
+ const uint8_t *payload, std::size_t payload_size,
+ std::size_t additional_header_size) {
+ // Recreate ContentObject
+ ContentObject::Ptr co =
+ core::PacketManager<>::getInstance().getPacket<ContentObject>(
+ format, additional_header_size);
+ co->updateLength(payload_size);
+ co->append(payload_size);
+ co->trimStart(co->headerSize());
+
+ // Copy payload
+ std::memcpy(co->writableData(), payload, payload_size);
+
+ // Restore network headers and some fields
+ co->prepend(co->headerSize());
+ co->setName(name);
+ co->setPayloadType(payload_type);
+
+ return co;
}
} // end namespace rtc