aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/protocols/rtc/rtc.cc
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/protocols/rtc/rtc.cc')
-rw-r--r--libtransport/src/protocols/rtc/rtc.cc159
1 files changed, 90 insertions, 69 deletions
diff --git a/libtransport/src/protocols/rtc/rtc.cc b/libtransport/src/protocols/rtc/rtc.cc
index df6522471..d2682edfa 100644
--- a/libtransport/src/protocols/rtc/rtc.cc
+++ b/libtransport/src/protocols/rtc/rtc.cc
@@ -63,16 +63,25 @@ std::size_t RTCTransportProtocol::transportHeaderLength() {
// private
void RTCTransportProtocol::initParams() {
TransportProtocol::reset();
- fwd_strategy_.setCallback(on_fwd_strategy_);
-
std::weak_ptr<RTCTransportProtocol> self = shared_from_this();
+ fwd_strategy_.setCallback([self](notification::Strategy strategy) {
+ auto ptr = self.lock();
+ if (ptr && ptr->isRunning()) {
+ if (*ptr->on_fwd_strategy_) (*ptr->on_fwd_strategy_)(strategy);
+ }
+ });
+
std::shared_ptr<auth::Verifier> verifier;
socket_->getSocketOption(GeneralTransportOptions::VERIFIER, verifier);
- uint32_t max_unverified_delay;
- socket_->getSocketOption(GeneralTransportOptions::MAX_UNVERIFIED_TIME,
- max_unverified_delay);
+ uint32_t unverified_interval;
+ socket_->getSocketOption(GeneralTransportOptions::UNVERIFIED_INTERVAL,
+ unverified_interval);
+
+ double unverified_ratio;
+ socket_->getSocketOption(GeneralTransportOptions::UNVERIFIED_RATIO,
+ unverified_ratio);
rc_ = std::make_shared<RTCRateControlCongestionDetection>();
ldr_ = std::make_shared<RTCLossDetectionAndRecovery>(
@@ -84,8 +93,15 @@ void RTCTransportProtocol::initParams() {
ptr->sendRtxInterest(seq);
}
},
- on_rec_strategy_);
- verifier_ = std::make_shared<RTCVerifier>(verifier, max_unverified_delay);
+ [self](notification::Strategy strategy) {
+ auto ptr = self.lock();
+ if (ptr && ptr->isRunning()) {
+ if (*ptr->on_rec_strategy_) (*ptr->on_rec_strategy_)(strategy);
+ }
+ });
+
+ verifier_ = std::make_shared<RTCVerifier>(verifier, unverified_interval,
+ unverified_ratio);
state_ = std::make_shared<RTCState>(
indexer_verifier_.get(),
@@ -102,7 +118,6 @@ void RTCTransportProtocol::initParams() {
}
},
portal_->getThread().getIoService());
- state_->initParams();
rc_->setState(state_);
rc_->turnOnRateControl();
@@ -153,21 +168,8 @@ void RTCTransportProtocol::initParams() {
socket_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
RTC_INTEREST_LIFETIME);
- // FEC
- using namespace std::placeholders;
- enableFEC(std::bind(&RTCTransportProtocol::onFecPackets, this, _1),
- /* We leave the buffer allocation to the fec decoder */
- fec::FECBase::BufferRequested(0));
-
- if (fec_decoder_) {
- indexer_verifier_->enableFec(fec_type_);
- indexer_verifier_->setNFec(0);
- ldr_->setFecParams(fec::FECUtils::getBlockSymbols(fec_type_),
- fec::FECUtils::getSourceSymbols(fec_type_));
- fec_decoder_->setIOService(portal_->getThread().getIoService());
- } else {
- indexer_verifier_->disableFec();
- }
+ // init state params
+ state_->initParams();
}
// private
@@ -223,29 +225,26 @@ void RTCTransportProtocol::newRound() {
uint32_t received_nacks = state->getReceivedNacksInRound();
uint32_t received_fec = state->getReceivedFecPackets();
- bool in_sync = (ptr->current_state_ == SyncState::in_sync);
- ptr->ldr_->onNewRound(in_sync);
- ptr->state_->onNewRound((double)ROUND_LEN, in_sync);
- ptr->rc_->onNewRound((double)ROUND_LEN);
-
// update sync state if needed
+ double cache_rate = state->getPacketFromCacheRatio();
+ uint32_t round_without_nacks = state->getRoundsWithoutNacks();
+
if (ptr->current_state_ == SyncState::in_sync) {
- double cache_rate = state->getPacketFromCacheRatio();
if (cache_rate > MAX_DATA_FROM_CACHE) {
ptr->current_state_ = SyncState::catch_up;
}
} else {
- double target_rate = state->getProducerRate() * PRODUCTION_RATE_FRACTION;
- double received_rate =
- state->getReceivedRate() + state->getRecoveredFecRate();
- uint32_t round_without_nacks = state->getRoundsWithoutNacks();
- double cache_ratio = state->getPacketFromCacheRatio();
if (round_without_nacks >= ROUNDS_IN_SYNC_BEFORE_SWITCH &&
- received_rate >= target_rate && cache_ratio < MAX_DATA_FROM_CACHE) {
+ cache_rate < MAX_DATA_FROM_CACHE) {
ptr->current_state_ = SyncState::in_sync;
}
}
+ bool in_sync = (ptr->current_state_ == SyncState::in_sync);
+ ptr->ldr_->onNewRound(in_sync);
+ ptr->state_->onNewRound((double)ROUND_LEN, in_sync);
+ ptr->rc_->onNewRound((double)ROUND_LEN);
+
DLOG_IF(INFO, VLOG_IS_ON(3))
<< "Calling updateSyncWindow in newRound function";
ptr->updateSyncWindow();
@@ -335,7 +334,7 @@ void RTCTransportProtocol::updateSyncWindow() {
// if some of the info are not available do not update the current win
if (prod_rate != 0.0 && rtt != 0.0 && packet_size != 0.0) {
current_sync_win_ = (uint32_t)ceil(prod_rate * rtt / packet_size);
- uint32_t buffer = PRODUCER_BUFFER_MS;
+ uint32_t buffer = PRODUCER_BUFFER_MS + ((double)state_->getMinRTT() / 2.0);
current_sync_win_ +=
ceil(prod_rate * (buffer / MILLI_IN_A_SEC) / packet_size);
@@ -360,15 +359,6 @@ void RTCTransportProtocol::updateSyncWindow() {
scheduleNextInterests();
}
-void RTCTransportProtocol::decreaseSyncWindow() {
- // called on future nack
- // we have a new sample of the production rate, so update max win first
- computeMaxSyncWindow();
- current_sync_win_--;
- current_sync_win_ = std::max(current_sync_win_, WIN_MIN);
- scheduleNextInterests();
-}
-
void RTCTransportProtocol::sendRtxInterest(uint32_t seq) {
if (!isRunning() && !is_first_) return;
@@ -468,7 +458,6 @@ void RTCTransportProtocol::scheduleNextInterests() {
auto ptr = self.lock();
if (ptr && ptr->isRunning()) {
if (!ptr->scheduler_timer_on_) return;
-
ptr->scheduler_timer_on_ = false;
ptr->scheduleNextInterests();
}
@@ -688,8 +677,6 @@ void RTCTransportProtocol::onNack(const ContentObject &content_object) {
// switch to catch up state and increase the window
// this is true only if the packet is not an RTX
if (!is_rtx) current_state_ = SyncState::catch_up;
-
- updateSyncWindow();
} else {
// if production_seg == nack_segment we consider this a future nack, since
// production_seg is not yet created. this may happen in case of low
@@ -702,23 +689,50 @@ void RTCTransportProtocol::onNack(const ContentObject &content_object) {
// the client is asking for content in the future
// switch to in sync state and decrease the window
current_state_ = SyncState::in_sync;
- decreaseSyncWindow();
}
+ updateSyncWindow();
}
void RTCTransportProtocol::onProbe(const ContentObject &content_object) {
- bool valid = state_->onProbePacketReceived(content_object);
- if (!valid) return;
+ uint32_t suffix = content_object.getName().getSuffix();
+ ParamsRTC params = RTCState::getProbeParams(content_object);
+
+ if (ProbeHandler::getProbeType(suffix) == ProbeType::INIT) {
+ fec::FECType fec_type = params.fec_type;
+
+ if (fec_type != fec::FECType::UNKNOWN && !fec_decoder_) {
+ // Update FEC type
+ fec_type_ = fec_type;
+
+ // Enable FEC
+ enableFEC(std::bind(&RTCTransportProtocol::onFecPackets, this,
+ std::placeholders::_1),
+ fec::FECBase::BufferRequested(0));
+
+ // Update FEC parameters
+ indexer_verifier_->enableFec(fec_type);
+ indexer_verifier_->setNFec(0);
+ ldr_->setFecParams(fec::FECUtils::getBlockSymbols(fec_type),
+ fec::FECUtils::getSourceSymbols(fec_type));
+ fec_decoder_->setIOService(portal_->getThread().getIoService());
+ } else if (fec_type == fec::FECType::UNKNOWN) {
+ indexer_verifier_->disableFec();
+ }
+ }
- uint32_t production_seg = RTCState::getProbeParams(content_object).prod_seg;
+ if (!state_->onProbePacketReceived(content_object)) return;
- // As for the nacks set next_segment
+ // As for NACKs, set next_segment
DLOG_IF(INFO, VLOG_IS_ON(3))
<< "on probe next seg = " << indexer_verifier_->checkNextSuffix()
- << ", jump to " << production_seg;
- indexer_verifier_->jumpToIndex(production_seg);
-
- ldr_->onProbePacketReceived(content_object);
+ << ", jump to " << params.prod_seg;
+ indexer_verifier_->jumpToIndex(params.prod_seg);
+
+ bool loss_detected = ldr_->onProbePacketReceived(content_object);
+ // we are not out of sync here but we are starting to download content from
+ // the cache, maybe beacuse the production rate increased suddenly. for this
+ // reason we put the state to catch up to increase the window
+ if (loss_detected) current_state_ = SyncState::catch_up;
updateSyncWindow();
}
@@ -822,6 +836,8 @@ void RTCTransportProtocol::onContentObjectReceived(
// The packet is considered received, return early
onDataPacketReceived(*content_ptr, compute_stats);
+ // this is a rtx but we may need to feed it in the decoder
+ decodePacket(content_object, is_manifest);
return;
}
@@ -846,18 +862,8 @@ void RTCTransportProtocol::onContentObjectReceived(
state_->dataToBeReceived(segment_number);
}
- // Send packet to FEC decoder
- if (fec_decoder_) {
- DLOG_IF(INFO, VLOG_IS_ON(4))
- << "Send packet " << segment_number << " to FEC decoder";
-
- uint32_t offset = is_manifest
- ? content_object.headerSize()
- : content_object.headerSize() + rtc::DATA_HEADER_SIZE;
- uint32_t metadata = static_cast<uint32_t>(content_object.getPayloadType());
-
- fec_decoder_->onDataPacket(content_object, offset, metadata);
- }
+ // send packet to the decoder
+ decodePacket(content_object, is_manifest);
// We can return early if FEC
if (is_fec) {
@@ -947,6 +953,21 @@ void RTCTransportProtocol::sendStatsToApp(
}
}
+void RTCTransportProtocol::decodePacket(ContentObject &content_object,
+ bool is_manifest) {
+ if (!fec_decoder_) return;
+
+ DLOG_IF(INFO, VLOG_IS_ON(4))
+ << "Send packet " << content_object.getName() << " to FEC decoder";
+
+ uint32_t offset = is_manifest
+ ? content_object.headerSize()
+ : content_object.headerSize() + rtc::DATA_HEADER_SIZE;
+ uint32_t metadata = static_cast<uint32_t>(content_object.getPayloadType());
+
+ fec_decoder_->onDataPacket(content_object, offset, metadata);
+}
+
void RTCTransportProtocol::onFecPackets(fec::BufferArray &packets) {
Packet::Format format;
socket_->getSocketOption(interface::GeneralTransportOptions::PACKET_FORMAT,