diff options
Diffstat (limited to 'libtransport/src/protocols/rtc/rtc.cc')
-rw-r--r-- | libtransport/src/protocols/rtc/rtc.cc | 616 |
1 files changed, 457 insertions, 159 deletions
diff --git a/libtransport/src/protocols/rtc/rtc.cc b/libtransport/src/protocols/rtc/rtc.cc index 0cb4cda1d..df6522471 100644 --- a/libtransport/src/protocols/rtc/rtc.cc +++ b/libtransport/src/protocols/rtc/rtc.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -22,7 +22,7 @@ #include <protocols/rtc/rtc.h> #include <protocols/rtc/rtc_consts.h> #include <protocols/rtc/rtc_indexer.h> -#include <protocols/rtc/rtc_rc_queue.h> +#include <protocols/rtc/rtc_rc_congestion_detection.h> #include <algorithm> @@ -37,13 +37,15 @@ using namespace interface; RTCTransportProtocol::RTCTransportProtocol( implementation::ConsumerSocket *icn_socket) : TransportProtocol(icn_socket, new RtcIndexer<>(icn_socket, this), - new DatagramReassembly(icn_socket, this)), + new RtcReassembly(icn_socket, this)), number_(0) { icn_socket->getSocketOption(PORTAL, portal_); - round_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService()); + round_timer_ = + std::make_unique<asio::steady_timer>(portal_->getThread().getIoService()); scheduler_timer_ = - std::make_unique<asio::steady_timer>(portal_->getIoService()); - pacing_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService()); + std::make_unique<asio::steady_timer>(portal_->getThread().getIoService()); + pacing_timer_ = + std::make_unique<asio::steady_timer>(portal_->getThread().getIoService()); } RTCTransportProtocol::~RTCTransportProtocol() {} @@ -61,25 +63,52 @@ std::size_t RTCTransportProtocol::transportHeaderLength() { // private void RTCTransportProtocol::initParams() { TransportProtocol::reset(); + fwd_strategy_.setCallback(on_fwd_strategy_); - rc_ = std::make_shared<RTCRateControlQueue>(); + std::weak_ptr<RTCTransportProtocol> self = shared_from_this(); + + std::shared_ptr<auth::Verifier> verifier; + socket_->getSocketOption(GeneralTransportOptions::VERIFIER, verifier); + + uint32_t max_unverified_delay; + socket_->getSocketOption(GeneralTransportOptions::MAX_UNVERIFIED_TIME, + max_unverified_delay); + + rc_ = std::make_shared<RTCRateControlCongestionDetection>(); ldr_ = std::make_shared<RTCLossDetectionAndRecovery>( - indexer_verifier_.get(), - std::bind(&RTCTransportProtocol::sendRtxInterest, this, - std::placeholders::_1), - portal_->getIoService()); + indexer_verifier_.get(), portal_->getThread().getIoService(), + interface::RtcTransportRecoveryStrategies::RTX_ONLY, + [self](uint32_t seq) { + auto ptr = self.lock(); + if (ptr && ptr->isRunning()) { + ptr->sendRtxInterest(seq); + } + }, + on_rec_strategy_); + verifier_ = std::make_shared<RTCVerifier>(verifier, max_unverified_delay); state_ = std::make_shared<RTCState>( indexer_verifier_.get(), - std::bind(&RTCTransportProtocol::sendProbeInterest, this, - std::placeholders::_1), - std::bind(&RTCTransportProtocol::discoveredRtt, this), - portal_->getIoService()); + [self](uint32_t seq) { + auto ptr = self.lock(); + if (ptr && ptr->isRunning()) { + ptr->sendProbeInterest(seq); + } + }, + [self]() { + auto ptr = self.lock(); + if (ptr && ptr->isRunning()) { + ptr->discoveredRtt(); + } + }, + portal_->getThread().getIoService()); + state_->initParams(); rc_->setState(state_); - // TODO: for the moment we keep the congestion control disabled - // rc_->tunrOnRateControl(); - ldr_->setState(state_); + rc_->turnOnRateControl(); + ldr_->setState(state_.get()); + ldr_->setRateControl(rc_.get()); + verifier_->setState(state_); // protocol state start_send_interest_ = false; @@ -102,6 +131,10 @@ void RTCTransportProtocol::initParams() { } #else max_aggregated_interest_ = 1; + if (const char *max_aggr = std::getenv("MAX_AGGREGATED_INTERESTS")) { + LOG(INFO) << "Max Aggregated: " << max_aggr; + max_aggregated_interest_ = std::stoul(std::string(max_aggr)); + } #endif max_sent_int_ = @@ -131,6 +164,7 @@ void RTCTransportProtocol::initParams() { indexer_verifier_->setNFec(0); ldr_->setFecParams(fec::FECUtils::getBlockSymbols(fec_type_), fec::FECUtils::getSourceSymbols(fec_type_)); + fec_decoder_->setIOService(portal_->getThread().getIoService()); } else { indexer_verifier_->disableFec(); } @@ -162,61 +196,97 @@ void RTCTransportProtocol::inactiveProducer() { void RTCTransportProtocol::newRound() { round_timer_->expires_from_now(std::chrono::milliseconds(ROUND_LEN)); - // TODO pass weak_ptr here - round_timer_->async_wait([this, n{number_}](std::error_code ec) { - if (ec) return; - if (n != number_) { + std::weak_ptr<RTCTransportProtocol> self = shared_from_this(); + round_timer_->async_wait([self](const std::error_code &ec) { + if (ec) { return; } + auto ptr = self.lock(); + + if (!ptr || !ptr->isRunning()) { + return; + } + + auto &state = ptr->state_; + // saving counters that will be reset on new round - uint32_t sent_retx = state_->getSentRtxInRound(); - uint32_t received_bytes = state_->getReceivedBytesInRound(); - uint32_t sent_interest = state_->getSentInterestInRound(); - uint32_t lost_data = state_->getLostData(); - uint32_t definitely_lost = state_->getDefinitelyLostPackets(); - uint32_t recovered_losses = state_->getRecoveredLosses(); - uint32_t received_nacks = state_->getReceivedNacksInRound(); - uint32_t received_fec = state_->getReceivedFecPackets(); - - bool in_sync = (current_state_ == SyncState::in_sync); - ldr_->onNewRound(in_sync); - state_->onNewRound((double)ROUND_LEN, in_sync); - rc_->onNewRound((double)ROUND_LEN); + uint32_t sent_retx = state->getSentRtxInRound(); + uint32_t received_bytes = + (state->getReceivedBytesInRound() + // data packets received + state->getReceivedFecBytesInRound()); // fec packets received + uint32_t sent_interest = state->getSentInterestInRound(); + uint32_t lost_data = state->getLostData(); + uint32_t definitely_lost = state->getDefinitelyLostPackets(); + uint32_t recovered_losses = state->getRecoveredLosses(); + uint32_t received_nacks = state->getReceivedNacksInRound(); + uint32_t received_fec = state->getReceivedFecPackets(); + + bool in_sync = (ptr->current_state_ == SyncState::in_sync); + ptr->ldr_->onNewRound(in_sync); + ptr->state_->onNewRound((double)ROUND_LEN, in_sync); + ptr->rc_->onNewRound((double)ROUND_LEN); // update sync state if needed - if (current_state_ == SyncState::in_sync) { - double cache_rate = state_->getPacketFromCacheRatio(); + if (ptr->current_state_ == SyncState::in_sync) { + double cache_rate = state->getPacketFromCacheRatio(); if (cache_rate > MAX_DATA_FROM_CACHE) { - current_state_ = SyncState::catch_up; + ptr->current_state_ = SyncState::catch_up; } } else { - double target_rate = state_->getProducerRate() * PRODUCTION_RATE_FRACTION; - double received_rate = state_->getReceivedRate(); - uint32_t round_without_nacks = state_->getRoundsWithoutNacks(); - double cache_ratio = state_->getPacketFromCacheRatio(); + double target_rate = state->getProducerRate() * PRODUCTION_RATE_FRACTION; + double received_rate = + state->getReceivedRate() + state->getRecoveredFecRate(); + uint32_t round_without_nacks = state->getRoundsWithoutNacks(); + double cache_ratio = state->getPacketFromCacheRatio(); if (round_without_nacks >= ROUNDS_IN_SYNC_BEFORE_SWITCH && received_rate >= target_rate && cache_ratio < MAX_DATA_FROM_CACHE) { - current_state_ = SyncState::in_sync; + ptr->current_state_ = SyncState::in_sync; } } DLOG_IF(INFO, VLOG_IS_ON(3)) << "Calling updateSyncWindow in newRound function"; - updateSyncWindow(); + ptr->updateSyncWindow(); - sendStatsToApp(sent_retx, received_bytes, sent_interest, lost_data, - definitely_lost, recovered_losses, received_nacks, - received_fec); - newRound(); + ptr->sendStatsToApp(sent_retx, received_bytes, sent_interest, lost_data, + definitely_lost, recovered_losses, received_nacks, + received_fec); + ptr->fwd_strategy_.checkStrategy(); + ptr->newRound(); }); } void RTCTransportProtocol::discoveredRtt() { start_send_interest_ = true; - ldr_->turnOnRTX(); + uint32_t strategy; + socket_->getSocketOption(RtcTransportOptions::RECOVERY_STRATEGY, strategy); + ldr_->changeRecoveryStrategy( + (interface::RtcTransportRecoveryStrategies)strategy); + ldr_->turnOnRecovery(); ldr_->onNewRound(false); + + // set forwarding strategy switch if selected + Name *name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name); + Prefix prefix(*name, 128); + if ((interface::RtcTransportRecoveryStrategies)strategy == + interface::RtcTransportRecoveryStrategies::LOW_RATE_AND_BESTPATH) { + fwd_strategy_.initFwdStrategy(portal_, prefix, state_.get(), + RTCForwardingStrategy::BEST_PATH); + } else if ((interface::RtcTransportRecoveryStrategies)strategy == + interface::RtcTransportRecoveryStrategies:: + LOW_RATE_AND_REPLICATION) { + fwd_strategy_.initFwdStrategy(portal_, prefix, state_.get(), + RTCForwardingStrategy::REPLICATION); + } else if ((interface::RtcTransportRecoveryStrategies)strategy == + interface::RtcTransportRecoveryStrategies:: + LOW_RATE_AND_ALL_FWD_STRATEGIES) { + fwd_strategy_.initFwdStrategy(portal_, prefix, state_.get(), + RTCForwardingStrategy::BOTH); + } + updateSyncWindow(); } @@ -244,7 +314,7 @@ void RTCTransportProtocol::computeMaxSyncWindow() { (production_rate * lifetime_ms * INTEREST_LIFETIME_REDUCTION_FACTOR) / packet_size); - max_sync_win_ = std::min(max_sync_win_, rc_->getCongesionWindow()); + max_sync_win_ = std::min(max_sync_win_, rc_->getCongestionWindow()); } void RTCTransportProtocol::updateSyncWindow() { @@ -259,25 +329,14 @@ void RTCTransportProtocol::updateSyncWindow() { } double prod_rate = state_->getProducerRate(); - double rtt = (double)state_->getRTT() / MILLI_IN_A_SEC; + double rtt = (double)state_->getMinRTT() / MILLI_IN_A_SEC; double packet_size = state_->getAveragePacketSize(); // if some of the info are not available do not update the current win if (prod_rate != 0.0 && rtt != 0.0 && packet_size != 0.0) { - double fec_interest_overhead = (double)state_->getPendingFecPackets() / - (double)(state_->getPendingInterestNumber() - - state_->getPendingFecPackets()); - - double fec_overhead = - std::max(indexer_verifier_->getFecOverhead(), fec_interest_overhead); - - prod_rate += (prod_rate * fec_overhead); - current_sync_win_ = (uint32_t)ceil(prod_rate * rtt / packet_size); uint32_t buffer = PRODUCER_BUFFER_MS; - if (rtt > 150) - buffer = buffer * 2; // if the RTT is too large we increase the - // the size of the buffer + current_sync_win_ += ceil(prod_rate * (buffer / MILLI_IN_A_SEC) / packet_size); @@ -285,8 +344,17 @@ void RTCTransportProtocol::updateSyncWindow() { current_sync_win_ = current_sync_win_ * CATCH_UP_WIN_INCREMENT; } + uint32_t min_win = WIN_MIN; + bool aggregated_data_on; + socket_->getSocketOption(RtcTransportOptions::AGGREGATED_DATA, + aggregated_data_on); + if (aggregated_data_on) { + min_win = WIN_MIN_WITH_AGGREGARED_DATA; + min_win += (min_win * (1 - (std::max(0.3, rtt) - rtt) / 0.3)); + } + current_sync_win_ = std::min(current_sync_win_, max_sync_win_); - current_sync_win_ = std::max(current_sync_win_, WIN_MIN); + current_sync_win_ = std::max(current_sync_win_, min_win); } scheduleNextInterests(); @@ -322,7 +390,7 @@ void RTCTransportProtocol::sendProbeInterest(uint32_t seq) { socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &interest_name); - DLOG_IF(INFO, VLOG_IS_ON(3)) << "send probe " << seq; + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Send probe " << seq; interest_name->setSuffix(seq); sendInterest(*interest_name); } @@ -330,13 +398,18 @@ void RTCTransportProtocol::sendProbeInterest(uint32_t seq) { void RTCTransportProtocol::scheduleNextInterests() { DLOG_IF(INFO, VLOG_IS_ON(3)) << "Schedule next interests"; - if (!isRunning() && !is_first_) return; + if (!isRunning() && !is_first_) { + return; + } - if (pacing_timer_on_) return; // wait pacing timer for the next send + if (pacing_timer_on_) { + return; // wait pacing timer for the next send + } - if (!start_send_interest_) + if (!start_send_interest_) { return; // RTT discovering phase is not finished so // do not start to send interests + } if (TRANSPORT_EXPECT_FALSE(!state_->isProducerActive())) { DLOG_IF(INFO, VLOG_IS_ON(3)) << "Inactive producer."; @@ -352,7 +425,7 @@ void RTCTransportProtocol::scheduleNextInterests() { &interest_name); uint32_t next_seg = 0; - DLOG_IF(INFO, VLOG_IS_ON(3)) << "send interest " << next_seg; + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Send interest " << next_seg; interest_name->setSuffix(next_seg); if (portal_->interestIsPending(*interest_name)) { @@ -370,28 +443,37 @@ void RTCTransportProtocol::scheduleNextInterests() { << " -- current_sync_win_: " << current_sync_win_; uint32_t pending = state_->getPendingInterestNumber(); - if (pending >= current_sync_win_) return; // no space in the window + uint32_t pending_fec = state_->getPendingFecPackets(); + + if ((pending - pending_fec) >= current_sync_win_) + return; // no space in the window - if ((current_sync_win_ - pending) < max_aggregated_interest_) { + // XXX double check if aggregated interests are still working here + if ((current_sync_win_ - (pending - pending_fec)) < + max_aggregated_interest_) { if (scheduler_timer_on_) return; // timer already scheduled - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + uint64_t now = utils::SteadyTime::nowMs().count(); uint64_t time = now - last_interest_sent_time_; if (time < WAIT_FOR_INTEREST_BATCH) { uint64_t next = WAIT_FOR_INTEREST_BATCH - time; scheduler_timer_on_ = true; scheduler_timer_->expires_from_now(std::chrono::milliseconds(next)); - scheduler_timer_->async_wait([this](std::error_code ec) { + + std::weak_ptr<RTCTransportProtocol> self = shared_from_this(); + scheduler_timer_->async_wait([self](const std::error_code &ec) { if (ec) return; - if (!scheduler_timer_on_) return; - scheduler_timer_on_ = false; - scheduleNextInterests(); + auto ptr = self.lock(); + if (ptr && ptr->isRunning()) { + if (!ptr->scheduler_timer_on_) return; + + ptr->scheduler_timer_on_ = false; + ptr->scheduleNextInterests(); + } }); - return; // whait for the timer + return; // wait for the timer } } @@ -403,7 +485,7 @@ void RTCTransportProtocol::scheduleNextInterests() { indexer_verifier_->jumpToIndex(state_->getLastSeqNacked() + 1); } - // skipe received packets + // skip received packets if (indexer_verifier_->checkNextSuffix() <= state_->getHighestSeqReceivedInOrder()) { indexer_verifier_->jumpToIndex(state_->getHighestSeqReceivedInOrder() + 1); @@ -417,7 +499,8 @@ void RTCTransportProtocol::scheduleNextInterests() { socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name); std::array<uint32_t, MAX_AGGREGATED_INTEREST> additional_suffixes; - while ((state_->getPendingInterestNumber() < current_sync_win_) && + while (((state_->getPendingInterestNumber() - + state_->getPendingFecPackets()) < current_sync_win_) && (sent_interests < max_sent_int_)) { DLOG_IF(INFO, VLOG_IS_ON(3)) << "In while loop. Window size: " << current_sync_win_; @@ -428,19 +511,20 @@ void RTCTransportProtocol::scheduleNextInterests() { // send the packet only if: // 1) it is not pending yet (not true for rtx) - // 2) the packet is not received or lost + // 2) the packet is not received or def lost // 3) is not in the rtx list // 4) is fec and is not in order (!= last sent + 1) + PacketState packet_state = state_->getPacketState(next_seg); if (portal_->interestIsPending(*name) || - state_->isReceivedOrLost(next_seg) != PacketState::UNKNOWN || - ldr_->isRtx(next_seg) || + packet_state == PacketState::RECEIVED || + packet_state == PacketState::DEFINITELY_LOST || ldr_->isRtx(next_seg) || (indexer_verifier_->isFec(next_seg) && next_seg != last_interest_sent_seq_ + 1)) { DLOG_IF(INFO, VLOG_IS_ON(3)) << "skip interest " << next_seg << " because: pending " - << portal_->interestIsPending(*name) << ", recv " - << (state_->isReceivedOrLost(next_seg) != PacketState::UNKNOWN) - << ", rtx " << (ldr_->isRtx(next_seg)) << ", is old fec " + << portal_->interestIsPending(*name) << ", recv or lost" + << (int)packet_state << ", rtx " << (ldr_->isRtx(next_seg)) + << ", is old fec " << ((indexer_verifier_->isFec(next_seg) && next_seg != last_interest_sent_seq_ + 1)); continue; @@ -462,10 +546,7 @@ void RTCTransportProtocol::scheduleNextInterests() { sent_packets++; sent_interests++; sendInterest(interest_name, &additional_suffixes, aggregated_counter - 1); - last_interest_sent_time_ = - std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + last_interest_sent_time_ = utils::SteadyTime::nowMs().count(); aggregated_counter = 0; } } @@ -473,25 +554,29 @@ void RTCTransportProtocol::scheduleNextInterests() { // exiting the while we may have some pending interest to send if (aggregated_counter != 0) { sent_packets++; - last_interest_sent_time_ = - std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + last_interest_sent_time_ = utils::SteadyTime::nowMs().count(); sendInterest(interest_name, &additional_suffixes, aggregated_counter - 1); } - if (state_->getPendingInterestNumber() < current_sync_win_) { + if ((state_->getPendingInterestNumber() - state_->getPendingFecPackets()) < + current_sync_win_) { // we still have space in the window but we already sent too many packets // wait PACING_WAIT to avoid drops in the kernel pacing_timer_on_ = true; pacing_timer_->expires_from_now(std::chrono::microseconds(PACING_WAIT)); - scheduler_timer_->async_wait([this](std::error_code ec) { + + std::weak_ptr<RTCTransportProtocol> self = shared_from_this(); + scheduler_timer_->async_wait([self](const std::error_code &ec) { if (ec) return; - if (!pacing_timer_on_) return; - pacing_timer_on_ = false; - scheduleNextInterests(); + auto ptr = self.lock(); + if (ptr && ptr->isRunning()) { + if (!ptr->pacing_timer_on_) return; + + ptr->pacing_timer_on_ = false; + ptr->scheduleNextInterests(); + } }); } } @@ -500,13 +585,13 @@ void RTCTransportProtocol::onInterestTimeout(Interest::Ptr &interest, const Name &name) { uint32_t segment_number = name.getSuffix(); - if (segment_number >= MIN_PROBE_SEQ) { + if (ProbeHandler::getProbeType(segment_number) != ProbeType::NOT_PROBE) { // this is a timeout on a probe, do nothing return; } - PacketState state = state_->isReceivedOrLost(segment_number); - if (state != PacketState::UNKNOWN) { + PacketState state = state_->getPacketState(segment_number); + if (state == PacketState::RECEIVED || state == PacketState::DEFINITELY_LOST) { // we may recover a packets using fec, ignore this timer return; } @@ -524,13 +609,18 @@ void RTCTransportProtocol::onInterestTimeout(Interest::Ptr &interest, DLOG_IF(INFO, VLOG_IS_ON(3)) << "handle timeout for packet " << segment_number << " using rtx"; if (ldr_->isRtxOn()) { - ldr_->onTimeout(segment_number); - if (indexer_verifier_->isFec(segment_number)) + if (indexer_verifier_->isFec(segment_number)) { + // if this is a fec packet we do not recover it with rtx so we consider + // the packet to be lost + ldr_->onTimeout(segment_number, true); state_->onTimeout(segment_number, true); - else + } else { + ldr_->onTimeout(segment_number, false); state_->onTimeout(segment_number, false); + } } else { // in this case we wil never recover the timeout + ldr_->onTimeout(segment_number, true); state_->onTimeout(segment_number, true); } scheduleNextInterests(); @@ -559,7 +649,7 @@ void RTCTransportProtocol::onInterestTimeout(Interest::Ptr &interest, void RTCTransportProtocol::onNack(const ContentObject &content_object) { struct nack_packet_t *nack = (struct nack_packet_t *)content_object.getPayload()->data(); - uint32_t production_seg = nack->getProductionSegement(); + uint32_t production_seg = nack->getProductionSegment(); uint32_t nack_segment = content_object.getName().getSuffix(); bool is_rtx = ldr_->isRtx(nack_segment); @@ -592,6 +682,8 @@ void RTCTransportProtocol::onNack(const ContentObject &content_object) { // remove the nack is it exists if (tn_it != timeouts_or_nacks_.end()) timeouts_or_nacks_.erase(tn_it); + state_->onJumpForward(production_seg); + verifier_->onJumpForward(production_seg); // the client is asking for content in the past // switch to catch up state and increase the window // this is true only if the packet is not an RTX @@ -618,11 +710,9 @@ void RTCTransportProtocol::onProbe(const ContentObject &content_object) { bool valid = state_->onProbePacketReceived(content_object); if (!valid) return; - struct nack_packet_t *probe = - (struct nack_packet_t *)content_object.getPayload()->data(); - uint32_t production_seg = probe->getProductionSegement(); + uint32_t production_seg = RTCState::getProbeParams(content_object).prod_seg; - // as for the nacks set next_segment + // As for the nacks set next_segment DLOG_IF(INFO, VLOG_IS_ON(3)) << "on probe next seg = " << indexer_verifier_->checkNextSuffix() << ", jump to " << production_seg; @@ -636,12 +726,39 @@ void RTCTransportProtocol::onContentObjectReceived( Interest &interest, ContentObject &content_object, std::error_code &ec) { DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received content object of size: " << content_object.payloadSize(); - uint32_t payload_size = content_object.payloadSize(); + uint32_t segment_number = content_object.getName().getSuffix(); + PayloadType payload_type = content_object.getPayloadType(); + PacketState state; + + ContentObject *content_ptr = &content_object; + ContentObject::Ptr manifest_ptr = nullptr; + + bool is_probe = + ProbeHandler::getProbeType(segment_number) != ProbeType::NOT_PROBE; + bool is_nack = !is_probe && content_object.payloadSize() == NACK_HEADER_SIZE; + bool is_fec = indexer_verifier_->isFec(segment_number); + bool is_manifest = + !is_probe && !is_nack && !is_fec && payload_type == PayloadType::MANIFEST; + bool is_data = + !is_probe && !is_nack && !is_fec && payload_type == PayloadType::DATA; + bool compute_stats = is_data || is_manifest; ec = make_error_code(protocol_error::not_reassemblable); - if (segment_number >= MIN_PROBE_SEQ) { + // A helper function to process manifests or data packets received + auto onDataPacketReceived = [this](ContentObject &content_object, + bool compute_stats) { + ldr_->onDataPacketReceived(content_object); + rc_->onDataPacketReceived(content_object, compute_stats); + updateSyncWindow(); + }; + + // First verify the packet signature and apply the corresponding policy + auth::VerificationPolicy policy = verifier_->verify(content_object, is_fec); + indexer_verifier_->applyPolicy(interest, content_object, false, policy); + + if (is_probe) { DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received probe " << segment_number; if (*on_content_object_input_) { (*on_content_object_input_)(*socket_->getInterface(), content_object); @@ -650,7 +767,7 @@ void RTCTransportProtocol::onContentObjectReceived( return; } - if (payload_size == NACK_HEADER_SIZE) { + if (is_nack) { DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received nack " << segment_number; if (*on_content_object_input_) { (*on_content_object_input_)(*socket_->getInterface(), content_object); @@ -659,57 +776,122 @@ void RTCTransportProtocol::onContentObjectReceived( return; } + // content_ptr will point either to the input data packet or to a manifest + // whose FEC header has been removed + if (is_manifest) { + manifest_ptr = removeFecHeader(content_object); + if (manifest_ptr) { + content_ptr = manifest_ptr.get(); + } + } + + // From there, the packet is either a FEC, a manifest or a data packet. DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received content " << segment_number; - bool compute_stats = true; + // Do not count timed out packets in stats auto tn_it = timeouts_or_nacks_.find(segment_number); if (tn_it != timeouts_or_nacks_.end()) { compute_stats = false; timeouts_or_nacks_.erase(tn_it); } - if (ldr_->isRtx(segment_number)) { + + // Do not count retransmissions or losses in stats + if (ldr_->isRtx(segment_number) || + ldr_->isPossibleLossWithNoRtx(segment_number)) { compute_stats = false; } - // check if the packet was already received - PacketState state = state_->isReceivedOrLost(segment_number); + // Fetch packet state + state = state_->getPacketState(segment_number); - if (state != PacketState::RECEIVED) { - // send packet to decoder - if (fec_decoder_) { - DLOG_IF(INFO, VLOG_IS_ON(4)) - << "send packet " << segment_number << " to FEC decoder"; - fec_decoder_->onDataPacket( - content_object, content_object.headerSize() + rtc::DATA_HEADER_SIZE); - } - if (!indexer_verifier_->isFec(segment_number)) { - // the packet may be alredy sent to the ap by the decoder, check again if - // it is already received - state = state_->isReceivedOrLost(segment_number); - if (state != PacketState::RECEIVED) { - DLOG_IF(INFO, VLOG_IS_ON(4)) << "Received content " << segment_number; + // Check if the packet is a retransmission + if (ldr_->isRtx(segment_number) && state != PacketState::RECEIVED) { + if (is_data || is_manifest) { + state_->onPacketRecoveredRtx(segment_number); - state_->onDataPacketReceived(content_object, compute_stats); + if (*on_content_object_input_) { + (*on_content_object_input_)(*socket_->getInterface(), content_object); + } - if (*on_content_object_input_) { - (*on_content_object_input_)(*socket_->getInterface(), content_object); - } - ec = make_error_code(protocol_error::success); + if (is_manifest) { + processManifest(interest, *content_ptr); } - } else { - DLOG_IF(INFO, VLOG_IS_ON(4)) << "Received fec " << segment_number; - state_->onFecPacketReceived(content_object); + + ec = is_manifest ? make_error_code(protocol_error::not_reassemblable) + : make_error_code(protocol_error::success); + + // The packet is considered received, return early + onDataPacketReceived(*content_ptr, compute_stats); + return; } - } else { + + if (is_fec) { + state_->onFecPacketRecoveredRtx(segment_number); + } + } + + // Fetch packet state again; it may have changed + state = state_->getPacketState(segment_number); + + // Check if the packet was already received + if (state == PacketState::RECEIVED || state == PacketState::TO_BE_RECEIVED) { DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received duplicated content " << segment_number << ", drop it"; ec = make_error_code(protocol_error::duplicated_content); + onDataPacketReceived(*content_ptr, compute_stats); + return; } - ldr_->onDataPacketReceived(content_object); - rc_->onDataPacketReceived(content_object); + if (!is_fec) { + state_->dataToBeReceived(segment_number); + } - updateSyncWindow(); + // Send packet to FEC decoder + if (fec_decoder_) { + DLOG_IF(INFO, VLOG_IS_ON(4)) + << "Send packet " << segment_number << " to FEC decoder"; + + uint32_t offset = is_manifest + ? content_object.headerSize() + : content_object.headerSize() + rtc::DATA_HEADER_SIZE; + uint32_t metadata = static_cast<uint32_t>(content_object.getPayloadType()); + + fec_decoder_->onDataPacket(content_object, offset, metadata); + } + + // We can return early if FEC + if (is_fec) { + DLOG_IF(INFO, VLOG_IS_ON(4)) << "Received FEC " << segment_number; + state_->onFecPacketReceived(content_object); + onDataPacketReceived(*content_ptr, compute_stats); + return; + } + + // The packet may have been already sent to the app by the decoder, check + // again if it is already received + state = state_->getPacketState( + segment_number); // state == RECEIVED or TO_BE_RECEIVED + + if (state != PacketState::RECEIVED) { + DLOG_IF(INFO, VLOG_IS_ON(4)) + << (is_manifest ? "Received manifest " : "Received data ") + << segment_number; + + if (is_manifest) { + processManifest(interest, *content_ptr); + } + + state_->onDataPacketReceived(*content_ptr, compute_stats); + + if (*on_content_object_input_) { + (*on_content_object_input_)(*socket_->getInterface(), content_object); + } + + ec = is_manifest ? make_error_code(protocol_error::not_reassemblable) + : make_error_code(protocol_error::success); + } + + onDataPacketReceived(*content_ptr, compute_stats); } void RTCTransportProtocol::sendStatsToApp( @@ -729,33 +911,149 @@ void RTCTransportProtocol::sendStatsToApp( stats_->updateReceivedNacks(received_nacks); stats_->updateReceivedFEC(received_fec); - stats_->updateAverageWindowSize(current_sync_win_); - stats_->updateLossRatio(state_->getLossRate()); - stats_->updateAverageRtt(state_->getRTT()); + stats_->updateAverageWindowSize(state_->getPendingInterestNumber()); + stats_->updateLossRatio(state_->getPerSecondLossRate()); + uint64_t rtt = state_->getAvgRTT(); + stats_->updateAverageRtt(utils::SteadyTime::Milliseconds(rtt)); + stats_->updateQueuingDelay(state_->getQueuing()); stats_->updateLostData(lost_data); stats_->updateDefinitelyLostData(definitely_lost); stats_->updateRecoveredData(recovered_losses); stats_->updateCCState((unsigned int)current_state_ ? 1 : 0); (*stats_summary_)(*socket_->getInterface(), *stats_); + bool in_congestion = rc_->inCongestionState(); + stats_->updateCongestionState(in_congestion); + double residual_losses = state_->getResidualLossRate(); + stats_->updateResidualLossRate(residual_losses); + stats_->updateQualityScore(state_->getQualityScore()); + + // set alerts + if (rtt > MAX_RTT) + stats_->setAlert(interface::TransportStatistics::statsAlerts::LATENCY); + else + stats_->clearAlert(interface::TransportStatistics::statsAlerts::LATENCY); + + if (in_congestion) + stats_->setAlert(interface::TransportStatistics::statsAlerts::CONGESTION); + else + stats_->clearAlert( + interface::TransportStatistics::statsAlerts::CONGESTION); + + if (residual_losses > MAX_RESIDUAL_LOSSES) + stats_->setAlert(interface::TransportStatistics::statsAlerts::LOSSES); + else + stats_->clearAlert(interface::TransportStatistics::statsAlerts::LOSSES); } } -void RTCTransportProtocol::onFecPackets( - std::vector<std::pair<uint32_t, fec::buffer>> &packets) { +void RTCTransportProtocol::onFecPackets(fec::BufferArray &packets) { + Packet::Format format; + socket_->getSocketOption(interface::GeneralTransportOptions::PACKET_FORMAT, + format); + + Name *name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name); + for (auto &packet : packets) { - PacketState state = state_->isReceivedOrLost(packet.first); - if (state != PacketState::RECEIVED) { - state_->onPacketRecoveredFec(packet.first); - ldr_->onPacketRecoveredFec(packet.first); - DLOG_IF(INFO, VLOG_IS_ON(3)) - << "Recovered packet " << packet.first << " through FEC."; - reassembly_->reassemble(*packet.second, packet.first); - } else { - DLOG_IF(INFO, VLOG_IS_ON(3)) - << "Packet" << packet.first << "already received."; + uint32_t seq_number = packet.getIndex(); + uint32_t metadata = packet.getMetadata(); + fec::buffer buffer = packet.getBuffer(); + + PayloadType payload_type = static_cast<PayloadType>(metadata); + switch (payload_type) { + case PayloadType::DATA: + case PayloadType::MANIFEST: + break; + case PayloadType::UNSPECIFIED: + default: + payload_type = PayloadType::DATA; + break; } + + switch (state_->getPacketState(seq_number)) { + case PacketState::RECEIVED: + case PacketState::TO_BE_RECEIVED: { + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Packet " << seq_number << " already received"; + break; + } + default: { + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Recovered packet " << seq_number << " through FEC"; + + if (payload_type == PayloadType::MANIFEST) { + name->setSuffix(seq_number); + + auto interest = + core::PacketManager<>::getInstance().getPacket<Interest>(format); + interest->setName(*name); + + auto content_object = toContentObject( + *name, format, payload_type, buffer->data(), buffer->length()); + + processManifest(*interest, *content_object); + } + + state_->onPacketRecoveredFec(seq_number, buffer->length()); + ldr_->onPacketRecoveredFec(seq_number); + + if (payload_type == PayloadType::DATA) { + verifier_->onDataRecoveredFec(seq_number); + reassembly_->reassemble(*buffer, seq_number); + } + + break; + } + } + } +} + +void RTCTransportProtocol::processManifest(Interest &interest, + ContentObject &manifest) { + auth::VerificationPolicy policy = verifier_->processManifest(manifest); + indexer_verifier_->applyPolicy(interest, manifest, false, policy); +} + +ContentObject::Ptr RTCTransportProtocol::removeFecHeader( + const ContentObject &content_object) { + if (!fec_decoder_ || !fec_decoder_->getFecHeaderSize()) { + return nullptr; } + + size_t fec_header_size = fec_decoder_->getFecHeaderSize(); + const uint8_t *payload = + content_object.data() + content_object.headerSize() + fec_header_size; + size_t payload_size = content_object.payloadSize() - fec_header_size; + + ContentObject::Ptr co = + toContentObject(content_object.getName(), content_object.getFormat(), + content_object.getPayloadType(), payload, payload_size); + + return co; +} + +ContentObject::Ptr RTCTransportProtocol::toContentObject( + const Name &name, Packet::Format format, PayloadType payload_type, + const uint8_t *payload, std::size_t payload_size, + std::size_t additional_header_size) { + // Recreate ContentObject + ContentObject::Ptr co = + core::PacketManager<>::getInstance().getPacket<ContentObject>( + format, additional_header_size); + co->updateLength(payload_size); + co->append(payload_size); + co->trimStart(co->headerSize()); + + // Copy payload + std::memcpy(co->writableData(), payload, payload_size); + + // Restore network headers and some fields + co->prepend(co->headerSize()); + co->setName(name); + co->setPayloadType(payload_type); + + return co; } } // end namespace rtc |