diff options
Diffstat (limited to 'libtransport/src/protocols/rtc/rtc.cc')
-rw-r--r-- | libtransport/src/protocols/rtc/rtc.cc | 159 |
1 files changed, 90 insertions, 69 deletions
diff --git a/libtransport/src/protocols/rtc/rtc.cc b/libtransport/src/protocols/rtc/rtc.cc index df6522471..d2682edfa 100644 --- a/libtransport/src/protocols/rtc/rtc.cc +++ b/libtransport/src/protocols/rtc/rtc.cc @@ -63,16 +63,25 @@ std::size_t RTCTransportProtocol::transportHeaderLength() { // private void RTCTransportProtocol::initParams() { TransportProtocol::reset(); - fwd_strategy_.setCallback(on_fwd_strategy_); - std::weak_ptr<RTCTransportProtocol> self = shared_from_this(); + fwd_strategy_.setCallback([self](notification::Strategy strategy) { + auto ptr = self.lock(); + if (ptr && ptr->isRunning()) { + if (*ptr->on_fwd_strategy_) (*ptr->on_fwd_strategy_)(strategy); + } + }); + std::shared_ptr<auth::Verifier> verifier; socket_->getSocketOption(GeneralTransportOptions::VERIFIER, verifier); - uint32_t max_unverified_delay; - socket_->getSocketOption(GeneralTransportOptions::MAX_UNVERIFIED_TIME, - max_unverified_delay); + uint32_t unverified_interval; + socket_->getSocketOption(GeneralTransportOptions::UNVERIFIED_INTERVAL, + unverified_interval); + + double unverified_ratio; + socket_->getSocketOption(GeneralTransportOptions::UNVERIFIED_RATIO, + unverified_ratio); rc_ = std::make_shared<RTCRateControlCongestionDetection>(); ldr_ = std::make_shared<RTCLossDetectionAndRecovery>( @@ -84,8 +93,15 @@ void RTCTransportProtocol::initParams() { ptr->sendRtxInterest(seq); } }, - on_rec_strategy_); - verifier_ = std::make_shared<RTCVerifier>(verifier, max_unverified_delay); + [self](notification::Strategy strategy) { + auto ptr = self.lock(); + if (ptr && ptr->isRunning()) { + if (*ptr->on_rec_strategy_) (*ptr->on_rec_strategy_)(strategy); + } + }); + + verifier_ = std::make_shared<RTCVerifier>(verifier, unverified_interval, + unverified_ratio); state_ = std::make_shared<RTCState>( indexer_verifier_.get(), @@ -102,7 +118,6 @@ void RTCTransportProtocol::initParams() { } }, portal_->getThread().getIoService()); - state_->initParams(); rc_->setState(state_); rc_->turnOnRateControl(); @@ -153,21 +168,8 @@ void RTCTransportProtocol::initParams() { socket_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, RTC_INTEREST_LIFETIME); - // FEC - using namespace std::placeholders; - enableFEC(std::bind(&RTCTransportProtocol::onFecPackets, this, _1), - /* We leave the buffer allocation to the fec decoder */ - fec::FECBase::BufferRequested(0)); - - if (fec_decoder_) { - indexer_verifier_->enableFec(fec_type_); - indexer_verifier_->setNFec(0); - ldr_->setFecParams(fec::FECUtils::getBlockSymbols(fec_type_), - fec::FECUtils::getSourceSymbols(fec_type_)); - fec_decoder_->setIOService(portal_->getThread().getIoService()); - } else { - indexer_verifier_->disableFec(); - } + // init state params + state_->initParams(); } // private @@ -223,29 +225,26 @@ void RTCTransportProtocol::newRound() { uint32_t received_nacks = state->getReceivedNacksInRound(); uint32_t received_fec = state->getReceivedFecPackets(); - bool in_sync = (ptr->current_state_ == SyncState::in_sync); - ptr->ldr_->onNewRound(in_sync); - ptr->state_->onNewRound((double)ROUND_LEN, in_sync); - ptr->rc_->onNewRound((double)ROUND_LEN); - // update sync state if needed + double cache_rate = state->getPacketFromCacheRatio(); + uint32_t round_without_nacks = state->getRoundsWithoutNacks(); + if (ptr->current_state_ == SyncState::in_sync) { - double cache_rate = state->getPacketFromCacheRatio(); if (cache_rate > MAX_DATA_FROM_CACHE) { ptr->current_state_ = SyncState::catch_up; } } else { - double target_rate = state->getProducerRate() * PRODUCTION_RATE_FRACTION; - double received_rate = - state->getReceivedRate() + state->getRecoveredFecRate(); - uint32_t round_without_nacks = state->getRoundsWithoutNacks(); - double cache_ratio = state->getPacketFromCacheRatio(); if (round_without_nacks >= ROUNDS_IN_SYNC_BEFORE_SWITCH && - received_rate >= target_rate && cache_ratio < MAX_DATA_FROM_CACHE) { + cache_rate < MAX_DATA_FROM_CACHE) { ptr->current_state_ = SyncState::in_sync; } } + bool in_sync = (ptr->current_state_ == SyncState::in_sync); + ptr->ldr_->onNewRound(in_sync); + ptr->state_->onNewRound((double)ROUND_LEN, in_sync); + ptr->rc_->onNewRound((double)ROUND_LEN); + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Calling updateSyncWindow in newRound function"; ptr->updateSyncWindow(); @@ -335,7 +334,7 @@ void RTCTransportProtocol::updateSyncWindow() { // if some of the info are not available do not update the current win if (prod_rate != 0.0 && rtt != 0.0 && packet_size != 0.0) { current_sync_win_ = (uint32_t)ceil(prod_rate * rtt / packet_size); - uint32_t buffer = PRODUCER_BUFFER_MS; + uint32_t buffer = PRODUCER_BUFFER_MS + ((double)state_->getMinRTT() / 2.0); current_sync_win_ += ceil(prod_rate * (buffer / MILLI_IN_A_SEC) / packet_size); @@ -360,15 +359,6 @@ void RTCTransportProtocol::updateSyncWindow() { scheduleNextInterests(); } -void RTCTransportProtocol::decreaseSyncWindow() { - // called on future nack - // we have a new sample of the production rate, so update max win first - computeMaxSyncWindow(); - current_sync_win_--; - current_sync_win_ = std::max(current_sync_win_, WIN_MIN); - scheduleNextInterests(); -} - void RTCTransportProtocol::sendRtxInterest(uint32_t seq) { if (!isRunning() && !is_first_) return; @@ -468,7 +458,6 @@ void RTCTransportProtocol::scheduleNextInterests() { auto ptr = self.lock(); if (ptr && ptr->isRunning()) { if (!ptr->scheduler_timer_on_) return; - ptr->scheduler_timer_on_ = false; ptr->scheduleNextInterests(); } @@ -688,8 +677,6 @@ void RTCTransportProtocol::onNack(const ContentObject &content_object) { // switch to catch up state and increase the window // this is true only if the packet is not an RTX if (!is_rtx) current_state_ = SyncState::catch_up; - - updateSyncWindow(); } else { // if production_seg == nack_segment we consider this a future nack, since // production_seg is not yet created. this may happen in case of low @@ -702,23 +689,50 @@ void RTCTransportProtocol::onNack(const ContentObject &content_object) { // the client is asking for content in the future // switch to in sync state and decrease the window current_state_ = SyncState::in_sync; - decreaseSyncWindow(); } + updateSyncWindow(); } void RTCTransportProtocol::onProbe(const ContentObject &content_object) { - bool valid = state_->onProbePacketReceived(content_object); - if (!valid) return; + uint32_t suffix = content_object.getName().getSuffix(); + ParamsRTC params = RTCState::getProbeParams(content_object); + + if (ProbeHandler::getProbeType(suffix) == ProbeType::INIT) { + fec::FECType fec_type = params.fec_type; + + if (fec_type != fec::FECType::UNKNOWN && !fec_decoder_) { + // Update FEC type + fec_type_ = fec_type; + + // Enable FEC + enableFEC(std::bind(&RTCTransportProtocol::onFecPackets, this, + std::placeholders::_1), + fec::FECBase::BufferRequested(0)); + + // Update FEC parameters + indexer_verifier_->enableFec(fec_type); + indexer_verifier_->setNFec(0); + ldr_->setFecParams(fec::FECUtils::getBlockSymbols(fec_type), + fec::FECUtils::getSourceSymbols(fec_type)); + fec_decoder_->setIOService(portal_->getThread().getIoService()); + } else if (fec_type == fec::FECType::UNKNOWN) { + indexer_verifier_->disableFec(); + } + } - uint32_t production_seg = RTCState::getProbeParams(content_object).prod_seg; + if (!state_->onProbePacketReceived(content_object)) return; - // As for the nacks set next_segment + // As for NACKs, set next_segment DLOG_IF(INFO, VLOG_IS_ON(3)) << "on probe next seg = " << indexer_verifier_->checkNextSuffix() - << ", jump to " << production_seg; - indexer_verifier_->jumpToIndex(production_seg); - - ldr_->onProbePacketReceived(content_object); + << ", jump to " << params.prod_seg; + indexer_verifier_->jumpToIndex(params.prod_seg); + + bool loss_detected = ldr_->onProbePacketReceived(content_object); + // we are not out of sync here but we are starting to download content from + // the cache, maybe beacuse the production rate increased suddenly. for this + // reason we put the state to catch up to increase the window + if (loss_detected) current_state_ = SyncState::catch_up; updateSyncWindow(); } @@ -822,6 +836,8 @@ void RTCTransportProtocol::onContentObjectReceived( // The packet is considered received, return early onDataPacketReceived(*content_ptr, compute_stats); + // this is a rtx but we may need to feed it in the decoder + decodePacket(content_object, is_manifest); return; } @@ -846,18 +862,8 @@ void RTCTransportProtocol::onContentObjectReceived( state_->dataToBeReceived(segment_number); } - // Send packet to FEC decoder - if (fec_decoder_) { - DLOG_IF(INFO, VLOG_IS_ON(4)) - << "Send packet " << segment_number << " to FEC decoder"; - - uint32_t offset = is_manifest - ? content_object.headerSize() - : content_object.headerSize() + rtc::DATA_HEADER_SIZE; - uint32_t metadata = static_cast<uint32_t>(content_object.getPayloadType()); - - fec_decoder_->onDataPacket(content_object, offset, metadata); - } + // send packet to the decoder + decodePacket(content_object, is_manifest); // We can return early if FEC if (is_fec) { @@ -947,6 +953,21 @@ void RTCTransportProtocol::sendStatsToApp( } } +void RTCTransportProtocol::decodePacket(ContentObject &content_object, + bool is_manifest) { + if (!fec_decoder_) return; + + DLOG_IF(INFO, VLOG_IS_ON(4)) + << "Send packet " << content_object.getName() << " to FEC decoder"; + + uint32_t offset = is_manifest + ? content_object.headerSize() + : content_object.headerSize() + rtc::DATA_HEADER_SIZE; + uint32_t metadata = static_cast<uint32_t>(content_object.getPayloadType()); + + fec_decoder_->onDataPacket(content_object, offset, metadata); +} + void RTCTransportProtocol::onFecPackets(fec::BufferArray &packets) { Packet::Format format; socket_->getSocketOption(interface::GeneralTransportOptions::PACKET_FORMAT, |