diff options
Diffstat (limited to 'libtransport/src/protocols/rtc/rtc.cc')
-rw-r--r-- | libtransport/src/protocols/rtc/rtc.cc | 1101 |
1 files changed, 1101 insertions, 0 deletions
diff --git a/libtransport/src/protocols/rtc/rtc.cc b/libtransport/src/protocols/rtc/rtc.cc new file mode 100644 index 000000000..9a56269f3 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc.cc @@ -0,0 +1,1101 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/core/global_object_pool.h> +#include <hicn/transport/interfaces/socket_consumer.h> +#include <implementation/socket_consumer.h> +#include <math.h> +#include <protocols/errors.h> +#include <protocols/incremental_indexer_bytestream.h> +#include <protocols/rtc/rtc.h> +#include <protocols/rtc/rtc_consts.h> +#include <protocols/rtc/rtc_indexer.h> +#include <protocols/rtc/rtc_rc_congestion_detection.h> + +#include <algorithm> + +namespace transport { + +namespace protocol { + +namespace rtc { + +using namespace interface; + +RTCTransportProtocol::RTCTransportProtocol( + implementation::ConsumerSocket *icn_socket) + : TransportProtocol(icn_socket, new RtcIndexer<>(icn_socket, this), + new RtcReassembly(icn_socket, this)), + max_aggregated_interest_(1), + number_(0) { + icn_socket->getSocketOption(PORTAL, portal_); + round_timer_ = + std::make_unique<asio::steady_timer>(portal_->getThread().getIoService()); + scheduler_timer_ = + std::make_unique<asio::steady_timer>(portal_->getThread().getIoService()); + pacing_timer_ = + std::make_unique<asio::steady_timer>(portal_->getThread().getIoService()); +} + +RTCTransportProtocol::~RTCTransportProtocol() {} + +void RTCTransportProtocol::resume() { + newRound(); + TransportProtocol::resume(); +} + +std::size_t RTCTransportProtocol::transportHeaderLength(bool isFEC) { + return DATA_HEADER_SIZE + + (fec_decoder_ != nullptr ? fec_decoder_->getFecHeaderSize(isFEC) : 0); +} + +// private +void RTCTransportProtocol::initParams() { + TransportProtocol::reset(); + std::weak_ptr<RTCTransportProtocol> self = shared_from_this(); + + fwd_strategy_.setCallback([self](notification::Strategy strategy) { + auto ptr = self.lock(); + if (ptr && ptr->isRunning()) { + if (*ptr->on_fwd_strategy_) (*ptr->on_fwd_strategy_)(strategy); + } + }); + + std::shared_ptr<auth::Verifier> verifier; + socket_->getSocketOption(GeneralTransportOptions::VERIFIER, verifier); + + uint32_t factor_relevant; + socket_->getSocketOption(GeneralTransportOptions::MANIFEST_FACTOR_RELEVANT, + factor_relevant); + + uint32_t factor_alert; + socket_->getSocketOption(GeneralTransportOptions::MANIFEST_FACTOR_ALERT, + factor_alert); + + rc_ = std::make_shared<RTCRateControlCongestionDetection>(); + ldr_ = std::make_shared<RTCLossDetectionAndRecovery>( + indexer_verifier_.get(), portal_->getThread().getIoService(), + interface::RtcTransportRecoveryStrategies::RTX_ONLY, + [self](uint32_t seq) { + auto ptr = self.lock(); + if (ptr && ptr->isRunning()) { + ptr->sendRtxInterest(seq); + } + }, + [self](notification::Strategy strategy) { + auto ptr = self.lock(); + if (ptr && ptr->isRunning()) { + if (*ptr->on_rec_strategy_) (*ptr->on_rec_strategy_)(strategy); + } + }); + + verifier_ = + std::make_shared<RTCVerifier>(verifier, factor_relevant, factor_alert); + + state_ = std::make_shared<RTCState>( + indexer_verifier_.get(), + [self](uint32_t seq) { + auto ptr = self.lock(); + if (ptr && ptr->isRunning()) { + ptr->sendProbeInterest(seq); + } + }, + [self]() { + auto ptr = self.lock(); + if (ptr && ptr->isRunning()) { + ptr->discoveredRtt(); + } + }, + portal_->getThread().getIoService()); + + rc_->setState(state_); + rc_->turnOnRateControl(); + ldr_->setState(state_.get()); + ldr_->setRateControl(rc_.get()); + verifier_->setState(state_); + + // protocol state + start_send_interest_ = false; + current_state_ = SyncState::catch_up; + + // Cancel timer + number_++; + round_timer_->cancel(); + + scheduler_timer_->cancel(); + scheduler_timer_on_ = false; + last_interest_sent_time_ = 0; + last_interest_sent_seq_ = 0; + + // Aggregated interests setup + bool aggregated_interests_on; + socket_->getSocketOption(RtcTransportOptions::AGGREGATED_INTERESTS, + aggregated_interests_on); + if (aggregated_interests_on) { + if (const char *max_aggr = std::getenv("MAX_AGGREGATED_INTERESTS")) + max_aggregated_interest_ = (uint32_t)std::stoul(std::string(max_aggr)); + else + max_aggregated_interest_ = MAX_INTERESTS_IN_BATCH; + + max_aggregated_interest_ = std::min<uint32_t>(max_aggregated_interest_, + 1 + MAX_SUFFIXES_IN_MANIFEST); + } + LOG(INFO) << "Max Aggregated: " << max_aggregated_interest_; + + max_sent_int_ = + std::ceil((double)MAX_PACING_BATCH / (double)max_aggregated_interest_); + + pacing_timer_->cancel(); + pacing_timer_on_ = false; + + // delete all timeouts and future nacks + timeouts_or_nacks_.clear(); + + // cwin vars + current_sync_win_ = INITIAL_WIN; + max_sync_win_ = INITIAL_WIN_MAX; + + socket_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, + RTC_INTEREST_LIFETIME); + + // init state params + state_->initParams(); +} + +// private +void RTCTransportProtocol::reset() { + DLOG_IF(INFO, VLOG_IS_ON(3)) << "reset called"; + initParams(); + newRound(); +} + +void RTCTransportProtocol::inactiveProducer() { + // when the producer is inactive we reset the consumer state + // cwin vars + current_sync_win_ = INITIAL_WIN; + max_sync_win_ = INITIAL_WIN_MAX; + + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Current window: " << current_sync_win_ + << ", max_sync_win_: " << max_sync_win_; + + // names/packets var + indexer_verifier_->reset(); + indexer_verifier_->enableFec(fec_type_); + indexer_verifier_->setNFec(0); + + ldr_->clear(); +} + +void RTCTransportProtocol::newRound() { + round_timer_->expires_from_now(std::chrono::milliseconds(ROUND_LEN)); + + std::weak_ptr<RTCTransportProtocol> self = shared_from_this(); + round_timer_->async_wait([self](const std::error_code &ec) { + if (ec) { + return; + } + + auto ptr = self.lock(); + + if (!ptr || !ptr->isRunning()) { + return; + } + + auto &state = ptr->state_; + + // saving counters that will be reset on new round + uint32_t sent_retx = state->getSentRtxInRound(); + uint32_t received_bytes = + (state->getReceivedBytesInRound() + // data packets received + state->getReceivedFecBytesInRound()); // fec packets received + uint32_t sent_interest = state->getSentInterestInRound(); + uint32_t lost_data = state->getLostData(); + uint32_t definitely_lost = state->getDefinitelyLostPackets(); + uint32_t recovered_losses = state->getRecoveredLosses(); + uint32_t received_nacks = state->getReceivedNacksInRound(); + uint32_t received_fec = state->getReceivedFecPackets(); + + // update sync state if needed + double cache_rate = state->getPacketFromCacheRatio(); + uint32_t round_without_nacks = state->getRoundsWithoutNacks(); + + if (ptr->current_state_ == SyncState::in_sync) { + if (cache_rate > MAX_DATA_FROM_CACHE) { + ptr->current_state_ = SyncState::catch_up; + } + } else { + if (round_without_nacks >= ROUNDS_IN_SYNC_BEFORE_SWITCH && + cache_rate < MAX_DATA_FROM_CACHE) { + ptr->current_state_ = SyncState::in_sync; + } + } + + bool in_sync = (ptr->current_state_ == SyncState::in_sync); + ptr->ldr_->onNewRound(in_sync); + ptr->state_->onNewRound((double)ROUND_LEN, in_sync); + ptr->rc_->onNewRound((double)ROUND_LEN); + + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Calling updateSyncWindow in newRound function"; + ptr->updateSyncWindow(); + + ptr->sendStatsToApp(sent_retx, received_bytes, sent_interest, lost_data, + definitely_lost, recovered_losses, received_nacks, + received_fec); + ptr->fwd_strategy_.checkStrategy(); + ptr->newRound(); + }); +} + +void RTCTransportProtocol::discoveredRtt() { + start_send_interest_ = true; + uint32_t strategy; + socket_->getSocketOption(RtcTransportOptions::RECOVERY_STRATEGY, strategy); + ldr_->changeRecoveryStrategy( + (interface::RtcTransportRecoveryStrategies)strategy); + + bool content_sharing_mode; + socket_->getSocketOption(RtcTransportOptions::CONTENT_SHARING_MODE, + content_sharing_mode); + if (content_sharing_mode) ldr_->setContentSharingMode(); + ldr_->turnOnRecovery(); + ldr_->onNewRound(false); + + // set forwarding strategy switch if selected + Name *name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name); + Prefix prefix(*name, 128); + fwd_strategy_.initFwdStrategy( + portal_, prefix, state_.get(), + (interface::RtcTransportRecoveryStrategies)strategy); + updateSyncWindow(); +} + +void RTCTransportProtocol::computeMaxSyncWindow() { + double production_rate = state_->getProducerRate(); + double packet_size = state_->getAveragePacketSize(); + if (production_rate == 0.0 || packet_size == 0.0) { + // the consumer has no info about the producer, + // keep the previous maxCWin + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Returning in computeMaxSyncWindow because: prod_rate: " + << (production_rate == 0.0) + << " || packet_size: " << (packet_size == 0.0); + return; + } + + bool content_sharing_mode; + socket_->getSocketOption(RtcTransportOptions::CONTENT_SHARING_MODE, + content_sharing_mode); + if (content_sharing_mode && (production_rate < MIN_PROD_RATE_SHARING_MODE)) + production_rate = MIN_PROD_RATE_SHARING_MODE; + + production_rate += (production_rate * indexer_verifier_->getMaxFecOverhead()); + + uint32_t lifetime = default_values::interest_lifetime; + socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, + lifetime); + double lifetime_ms = (double)lifetime / MILLI_IN_A_SEC; + + max_sync_win_ = (uint32_t)ceil( + (production_rate * lifetime_ms * INTEREST_LIFETIME_REDUCTION_FACTOR) / + packet_size); + + max_sync_win_ = std::min(max_sync_win_, rc_->getCongestionWindow()); +} + +void RTCTransportProtocol::updateSyncWindow() { + computeMaxSyncWindow(); + + if (max_sync_win_ == INITIAL_WIN_MAX) { + if (TRANSPORT_EXPECT_FALSE(!state_->isProducerActive())) return; + + current_sync_win_ = INITIAL_WIN; + scheduleNextInterests(); + return; + } + + double prod_rate = state_->getProducerRate(); + double rtt = (double)state_->getMinRTT() / MILLI_IN_A_SEC; + double packet_size = state_->getAveragePacketSize(); + bool content_sharing_mode; + socket_->getSocketOption(RtcTransportOptions::CONTENT_SHARING_MODE, + content_sharing_mode); + if (content_sharing_mode && (prod_rate < MIN_PROD_RATE_SHARING_MODE)) + prod_rate = MIN_PROD_RATE_SHARING_MODE; + + // if some of the info are not available do not update the current win + if (prod_rate != 0.0 && rtt != 0.0 && packet_size != 0.0) { + current_sync_win_ = (uint32_t)ceil(prod_rate * rtt / packet_size); + uint32_t buffer = PRODUCER_BUFFER_MS + ((double)state_->getMinRTT() / 2.0); + + current_sync_win_ += + ceil(prod_rate * (buffer / MILLI_IN_A_SEC) / packet_size); + + if (current_state_ == SyncState::catch_up) { + current_sync_win_ = current_sync_win_ * CATCH_UP_WIN_INCREMENT; + } + + uint32_t min_win = WIN_MIN; + bool aggregated_data_on; + socket_->getSocketOption(RtcTransportOptions::AGGREGATED_DATA, + aggregated_data_on); + if (aggregated_data_on) { + min_win = WIN_MIN_WITH_AGGREGARED_DATA; + min_win += (min_win * (1 - (std::max(0.3, rtt) - rtt) / 0.3)); + } + + current_sync_win_ = std::min(current_sync_win_, max_sync_win_); + current_sync_win_ = std::max(current_sync_win_, min_win); + } + + scheduleNextInterests(); +} + +void RTCTransportProtocol::sendRtxInterest(uint32_t seq) { + if (!isRunning() && !is_first_) return; + + if (!start_send_interest_) return; + + Name *interest_name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, + &interest_name); + + DLOG_IF(INFO, VLOG_IS_ON(3)) << "send rtx " << seq; + interest_name->setSuffix(seq); + sendInterest(*interest_name); +} + +void RTCTransportProtocol::sendProbeInterest(uint32_t seq) { + if (!isRunning() && !is_first_) return; + + Name *interest_name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, + &interest_name); + + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Send probe " << seq; + interest_name->setSuffix(seq); + sendInterest(*interest_name); +} + +void RTCTransportProtocol::sendInterestForTimeout(uint32_t seq) { + if (!isRunning() && !is_first_) return; + + Name *interest_name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, + &interest_name); + + // we got a timeout for this packet so it is not pending anymore + interest_name->setSuffix(seq); + state_->onSendNewInterest(interest_name); + sendInterest(*interest_name); +} + +void RTCTransportProtocol::scheduleNextInterests() { + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Schedule next interests"; + + if (!isRunning() && !is_first_) { + return; + } + + if (pacing_timer_on_) { + return; // wait pacing timer for the next send + } + + if (!start_send_interest_) { + return; // RTT discovering phase is not finished so + // do not start to send interests + } + + if (TRANSPORT_EXPECT_FALSE(!state_->isProducerActive())) { + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Inactive producer."; + // here we keep seding the same interest until the producer + // does not start again + if (indexer_verifier_->checkNextSuffix() != 0) { + // the producer just become inactive, reset the state + inactiveProducer(); + } + + Name *interest_name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, + &interest_name); + + uint32_t next_seg = 0; + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Send interest " << next_seg; + interest_name->setSuffix(next_seg); + + if (portal_->interestIsPending(*interest_name)) { + // if interest 0 is already pending we return + return; + } + + sendInterest(*interest_name); + state_->onSendNewInterest(interest_name); + return; + } + + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Pending interest number: " << state_->getPendingInterestNumber() + << " -- current_sync_win_: " << current_sync_win_; + + uint32_t pending = state_->getPendingInterestNumber(); + uint32_t pending_fec = state_->getPendingFecPackets(); + + if ((pending - pending_fec) >= current_sync_win_) + return; // no space in the window + + // XXX double check if aggregated interests are still working here + if ((current_sync_win_ - (pending - pending_fec)) < + max_aggregated_interest_) { + if (scheduler_timer_on_) return; // timer already scheduled + + uint64_t now = utils::SteadyTime::nowMs().count(); + + uint64_t time = now - last_interest_sent_time_; + if (time < WAIT_FOR_INTEREST_BATCH) { + uint64_t next = WAIT_FOR_INTEREST_BATCH - time; + scheduler_timer_on_ = true; + scheduler_timer_->expires_from_now(std::chrono::milliseconds(next)); + + std::weak_ptr<RTCTransportProtocol> self = shared_from_this(); + scheduler_timer_->async_wait([self](const std::error_code &ec) { + if (ec) return; + + auto ptr = self.lock(); + if (ptr && ptr->isRunning()) { + if (!ptr->scheduler_timer_on_) return; + ptr->scheduler_timer_on_ = false; + ptr->scheduleNextInterests(); + } + }); + return; // wait for the timer + } + } + + scheduler_timer_on_ = false; + scheduler_timer_->cancel(); + + // skip nacked pacekts + if (indexer_verifier_->checkNextSuffix() <= state_->getLastSeqNacked()) { + indexer_verifier_->jumpToIndex(state_->getLastSeqNacked() + 1); + } + + // skip received packets + uint32_t max_received = state_->getHighestSeqReceivedInOrder(); + if (indexer_verifier_->checkNextSuffix() <= max_received) { + indexer_verifier_->jumpToIndex(max_received + 1); + } + + uint32_t sent_interests = 0; + uint32_t sent_packets = 0; + uint32_t aggregated_counter = 0; + Name *name = nullptr; + Name interest_name; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name); + std::array<uint32_t, MAX_AGGREGATED_INTEREST> additional_suffixes; + + while (((state_->getPendingInterestNumber() - + state_->getPendingFecPackets()) < current_sync_win_) && + (sent_interests < max_sent_int_)) { + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "In while loop. Window size: " << current_sync_win_; + + uint32_t next_seg = indexer_verifier_->getNextSuffix(); + name->setSuffix(next_seg); + + // send the packet only if: + // 1) it is not pending yet (not true for rtx) + // 2) the packet is not received or def lost + // 3) is not in the rtx list + // 4) is fec and is not in order (!= last sent + 1) + PacketState packet_state = state_->getPacketState(next_seg); + if (portal_->interestIsPending(*name) || + packet_state == PacketState::RECEIVED || + packet_state == PacketState::DEFINITELY_LOST || ldr_->isRtx(next_seg) || + (indexer_verifier_->isFec(next_seg) && + next_seg != last_interest_sent_seq_ + 1)) { + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "skip interest " << next_seg << " because: pending " + << portal_->interestIsPending(*name) << ", recv or lost" + << (int)packet_state << ", rtx " << (ldr_->isRtx(next_seg)) + << ", is old fec " + << ((indexer_verifier_->isFec(next_seg) && + next_seg != last_interest_sent_seq_ + 1)); + continue; + } + + if (aggregated_counter == 0) { + DLOG_IF(INFO, VLOG_IS_ON(3)) << "(name) send interest " << next_seg; + interest_name = *name; + } else { + DLOG_IF(INFO, VLOG_IS_ON(3)) << "(append) send interest " << next_seg; + additional_suffixes[aggregated_counter - 1] = next_seg; + } + + last_interest_sent_seq_ = next_seg; + state_->onSendNewInterest(name); + aggregated_counter++; + + if (aggregated_counter >= max_aggregated_interest_) { + sent_packets++; + sent_interests++; + sendInterest(interest_name, &additional_suffixes, aggregated_counter - 1); + last_interest_sent_time_ = utils::SteadyTime::nowMs().count(); + aggregated_counter = 0; + } + } + + // exiting the while we may have some pending interest to send + if (aggregated_counter != 0) { + sent_packets++; + last_interest_sent_time_ = utils::SteadyTime::nowMs().count(); + sendInterest(interest_name, &additional_suffixes, aggregated_counter - 1); + } + + if ((state_->getPendingInterestNumber() - state_->getPendingFecPackets()) < + current_sync_win_) { + // we still have space in the window but we already sent too many packets + // wait PACING_WAIT to avoid drops in the kernel + + pacing_timer_on_ = true; + pacing_timer_->expires_from_now(std::chrono::microseconds(PACING_WAIT)); + + std::weak_ptr<RTCTransportProtocol> self = shared_from_this(); + scheduler_timer_->async_wait([self](const std::error_code &ec) { + if (ec) return; + + auto ptr = self.lock(); + if (ptr && ptr->isRunning()) { + if (!ptr->pacing_timer_on_) return; + + ptr->pacing_timer_on_ = false; + ptr->scheduleNextInterests(); + } + }); + } +} + +void RTCTransportProtocol::onInterestTimeout(Interest::Ptr &interest, + const Name &name) { + uint32_t segment_number = name.getSuffix(); + + if (ProbeHandler::getProbeType(segment_number) != ProbeType::NOT_PROBE) { + // this is a timeout on a probe, do nothing + return; + } + + PacketState state = state_->getPacketState(segment_number); + if (state == PacketState::RECEIVED || state == PacketState::DEFINITELY_LOST) { + // we may recover a packets using fec, ignore this timer + return; + } + + timeouts_or_nacks_.insert(segment_number); + if (TRANSPORT_EXPECT_TRUE(state_->isProducerActive()) && + segment_number <= state_->getHighestSeqReceived()) { + // we retransmit packets only if the producer is active, otherwise we + // use timeouts to avoid to send too much traffic + // + // a timeout is sent using RTX only if it is an old packet. if it is for a + // seq number that we didn't reach yet, we send the packet using the normal + // schedule next interest + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "handle timeout for packet " << segment_number << " using rtx"; + if (ldr_->isRtxOn()) { + if (indexer_verifier_->isFec(segment_number)) { + // if this is a fec packet we do not recover it with rtx so we consider + // the packet to be lost + ldr_->onTimeout(segment_number, true); + state_->onTimeout(segment_number, true); + } else { + ldr_->onTimeout(segment_number, false); + state_->onTimeout(segment_number, false); + } + } else { + // in this case we wil never recover the timeout + ldr_->onTimeout(segment_number, true); + state_->onTimeout(segment_number, true); + } + scheduleNextInterests(); + return; + } + + DLOG_IF(INFO, VLOG_IS_ON(3)) << "handle timeout for packet " << segment_number + << " using normal interests"; + + if (segment_number < indexer_verifier_->checkNextSuffix()) { + // this is a timeout for a packet that will be generated in the future but + // we are asking for higher sequence numbers. we need to go back like in the + // case of future nacks + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "On timeout next seg = " << indexer_verifier_->checkNextSuffix() + << ", jump to " << segment_number; + // add an extra space in the window + indexer_verifier_->jumpToIndex(segment_number); + } + + state_->onTimeout(segment_number, false); + sendInterestForTimeout(segment_number); + scheduleNextInterests(); +} + +void RTCTransportProtocol::onNack(const ContentObject &content_object) { + struct nack_packet_t *nack = + (struct nack_packet_t *)content_object.getPayload()->data(); + uint32_t production_seg = nack->getProductionSegment(); + uint32_t nack_segment = content_object.getName().getSuffix(); + bool is_rtx = ldr_->isRtx(nack_segment); + + // check if the packet got a timeout + + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Nack received " << nack_segment + << ". Production segment: " << production_seg; + + bool compute_stats = true; + auto tn_it = timeouts_or_nacks_.find(nack_segment); + if (tn_it != timeouts_or_nacks_.end() || is_rtx) { + compute_stats = false; + // remove packets from timeouts_or_nacks only in case of a past nack + } + + state_->onNackPacketReceived(content_object, compute_stats); + ldr_->onNackPacketReceived(content_object); + + // both in case of past and future nack we jump to the + // production segment in the nack. In case of past nack we will skip unneded + // interest (this is already done in the scheduleNextInterest in any case) + // while in case of future nacks we can go back in time and ask again for the + // content that generated the nack + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "On nack next seg = " << indexer_verifier_->checkNextSuffix() + << ", jump to " << production_seg; + indexer_verifier_->jumpToIndex(production_seg); + + if (production_seg > nack_segment) { + // remove the nack is it exists + if (tn_it != timeouts_or_nacks_.end()) timeouts_or_nacks_.erase(tn_it); + + state_->onJumpForward(production_seg); + // the client is asking for content in the past + // switch to catch up state and increase the window + // this is true only if the packet is not an RTX + if (!is_rtx) current_state_ = SyncState::catch_up; + } else { + // if production_seg == nack_segment we consider this a future nack, since + // production_seg is not yet created. this may happen in case of low + // production rate (e.g. ping at 1pps) + + // if a future nack was also retransmitted add it to the timeout_or_nacks + // set + if (is_rtx) timeouts_or_nacks_.insert(nack_segment); + + // the client is asking for content in the future + // switch to in sync state and decrease the window + current_state_ = SyncState::in_sync; + } + updateSyncWindow(); +} + +void RTCTransportProtocol::onProbe(const ContentObject &content_object) { + uint32_t suffix = content_object.getName().getSuffix(); + ParamsRTC params = RTCState::getProbeParams(content_object); + + if (ProbeHandler::getProbeType(suffix) == ProbeType::INIT) { + fec::FECType fec_type = params.fec_type; + + if (fec_type != fec::FECType::UNKNOWN && !fec_decoder_) { + // Update FEC type + fec_type_ = fec_type; + + // Enable FEC + enableFEC(std::bind(&RTCTransportProtocol::onFecPackets, this, + std::placeholders::_1), + fec::FECBase::BufferRequested(0)); + + // Update FEC parameters + indexer_verifier_->enableFec(fec_type); + indexer_verifier_->setNFec(0); + ldr_->setFecParams(fec::FECUtils::getBlockSymbols(fec_type), + fec::FECUtils::getSourceSymbols(fec_type)); + fec_decoder_->setIOService(portal_->getThread().getIoService()); + } else if (fec_type == fec::FECType::UNKNOWN) { + indexer_verifier_->disableFec(); + } + } + + if (!state_->onProbePacketReceived(content_object)) return; + + // As for NACKs, set next_segment + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "on probe next seg = " << indexer_verifier_->checkNextSuffix() + << ", jump to " << params.prod_seg; + indexer_verifier_->jumpToIndex(params.prod_seg); + + bool loss_detected = ldr_->onProbePacketReceived(content_object); + // we are not out of sync here but we are starting to download content from + // the cache, maybe beacuse the production rate increased suddenly. for this + // reason we put the state to catch up to increase the window + if (loss_detected) current_state_ = SyncState::catch_up; + updateSyncWindow(); +} + +void RTCTransportProtocol::onContentObjectReceived( + Interest &interest, ContentObject &content_object, std::error_code &ec) { + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Received content object of size: " << content_object.payloadSize(); + + uint32_t segment_number = content_object.getName().getSuffix(); + PayloadType payload_type = content_object.getPayloadType(); + PacketState state; + + ContentObject *content_ptr = &content_object; + ContentObject::Ptr manifest_ptr = nullptr; + + bool is_probe = + ProbeHandler::getProbeType(segment_number) != ProbeType::NOT_PROBE; + bool is_nack = !is_probe && content_object.payloadSize() == NACK_HEADER_SIZE; + bool is_fec = indexer_verifier_->isFec(segment_number); + bool is_manifest = + !is_probe && !is_nack && !is_fec && payload_type == PayloadType::MANIFEST; + bool is_data = + !is_probe && !is_nack && !is_fec && payload_type == PayloadType::DATA; + bool compute_stats = is_data || is_manifest; + + ec = make_error_code(protocol_error::not_reassemblable); + + // A helper function to process manifests or data packets received + auto onDataPacketReceived = [this](ContentObject &content_object, + bool compute_stats) { + ldr_->onDataPacketReceived(content_object); + rc_->onDataPacketReceived(content_object, compute_stats); + updateSyncWindow(); + }; + + // First verify the packet signature and apply the corresponding policy + auth::VerificationPolicy policy = verifier_->verify(content_object, is_fec); + indexer_verifier_->applyPolicy(interest, content_object, false, policy); + + if (is_probe) { + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received probe " << segment_number; + if (*on_content_object_input_) { + (*on_content_object_input_)(*socket_->getInterface(), content_object); + } + onProbe(content_object); + return; + } + + if (is_nack) { + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received nack " << segment_number; + if (*on_content_object_input_) { + (*on_content_object_input_)(*socket_->getInterface(), content_object); + } + onNack(content_object); + return; + } + + // content_ptr will point either to the input data packet or to a manifest + // whose FEC header has been removed + if (is_manifest) { + manifest_ptr = removeFecHeader(content_object); + if (manifest_ptr) { + content_ptr = manifest_ptr.get(); + } + } + + // From there, the packet is either a FEC, a manifest or a data packet. + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received content " << segment_number; + + // Do not count timed out packets in stats + auto tn_it = timeouts_or_nacks_.find(segment_number); + if (tn_it != timeouts_or_nacks_.end()) { + compute_stats = false; + timeouts_or_nacks_.erase(tn_it); + } + + // Do not count retransmissions or losses in stats + if (ldr_->isRtx(segment_number) || + ldr_->isPossibleLossWithNoRtx(segment_number)) { + compute_stats = false; + } + + // Fetch packet state + state = state_->getPacketState(segment_number); + + // Check if the packet is a retransmission + if (ldr_->isRtx(segment_number) && state != PacketState::RECEIVED) { + if (is_data || is_manifest) { + uint64_t rtt = ldr_->getRtxRtt(segment_number); + state_->onPacketRecoveredRtx(content_object, rtt); + + if (*on_content_object_input_) { + (*on_content_object_input_)(*socket_->getInterface(), content_object); + } + + if (is_manifest) { + processManifest(interest, *content_ptr); + } + + ec = is_manifest ? make_error_code(protocol_error::not_reassemblable) + : make_error_code(protocol_error::success); + + // The packet is considered received, return early + onDataPacketReceived(*content_ptr, compute_stats); + // this is a rtx but we may need to feed it in the decoder + decodePacket(content_object, is_manifest); + return; + } + + if (is_fec) { + state_->onFecPacketRecoveredRtx(content_object); + } + } + + // Fetch packet state again; it may have changed + state = state_->getPacketState(segment_number); + + // Check if the packet was already received + if (state == PacketState::RECEIVED || state == PacketState::TO_BE_RECEIVED) { + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Received duplicated content " << segment_number << ", drop it"; + ec = make_error_code(protocol_error::duplicated_content); + onDataPacketReceived(*content_ptr, compute_stats); + return; + } + + if (!is_fec) { + state_->dataToBeReceived(segment_number); + } + + // send packet to the decoder + decodePacket(content_object, is_manifest); + + // We can return early if FEC + if (is_fec) { + DLOG_IF(INFO, VLOG_IS_ON(4)) << "Received FEC " << segment_number; + state_->onFecPacketReceived(content_object); + onDataPacketReceived(*content_ptr, compute_stats); + return; + } + + // The packet may have been already sent to the app by the decoder, check + // again if it is already received + state = state_->getPacketState( + segment_number); // state == RECEIVED or TO_BE_RECEIVED + + if (state != PacketState::RECEIVED) { + DLOG_IF(INFO, VLOG_IS_ON(4)) + << (is_manifest ? "Received manifest " : "Received data ") + << segment_number; + + if (is_manifest) { + processManifest(interest, *content_ptr); + } + + state_->onDataPacketReceived(*content_ptr, compute_stats); + + if (*on_content_object_input_) { + (*on_content_object_input_)(*socket_->getInterface(), content_object); + } + + ec = is_manifest ? make_error_code(protocol_error::not_reassemblable) + : make_error_code(protocol_error::success); + } + + onDataPacketReceived(*content_ptr, compute_stats); +} + +void RTCTransportProtocol::sendStatsToApp( + uint32_t retx_count, uint32_t received_bytes, uint32_t sent_interests, + uint32_t lost_data, uint32_t definitely_lost, uint32_t recovered_losses, + uint32_t received_nacks, uint32_t received_fec) { + if (*stats_summary_) { + // Send the stats to the app + stats_->updateQueuingDelay(state_->getQueuing()); + + // stats_->updateInterestFecTx(0); //todo must be implemented + // stats_->updateBytesFecRecv(0); //todo must be implemented + + stats_->updateRetxCount(retx_count); + stats_->updateBytesRecv(received_bytes); + stats_->updateInterestTx(sent_interests); + stats_->updateReceivedNacks(received_nacks); + stats_->updateReceivedFEC(received_fec); + + stats_->updateAverageWindowSize(state_->getPendingInterestNumber()); + stats_->updateLossRatio(state_->getPerSecondLossRate()); + uint64_t rtt = state_->getAvgRTT(); + stats_->updateAverageRtt(utils::SteadyTime::Microseconds(rtt * 1000)); + + stats_->updateQueuingDelay(state_->getQueuing()); + stats_->updateLostData(lost_data); + stats_->updateDefinitelyLostData(definitely_lost); + stats_->updateRecoveredData(recovered_losses); + stats_->updateCCState((unsigned int)current_state_ ? 1 : 0); + (*stats_summary_)(*socket_->getInterface(), *stats_); + bool in_congestion = rc_->inCongestionState(); + stats_->updateCongestionState(in_congestion); + double residual_losses = state_->getResidualLossRate(); + stats_->updateResidualLossRate(residual_losses); + stats_->updateQualityScore(state_->getQualityScore()); + + // set alerts + if (rtt > MAX_RTT) + stats_->setAlert(interface::TransportStatistics::statsAlerts::LATENCY); + else + stats_->clearAlert(interface::TransportStatistics::statsAlerts::LATENCY); + + if (in_congestion) + stats_->setAlert(interface::TransportStatistics::statsAlerts::CONGESTION); + else + stats_->clearAlert( + interface::TransportStatistics::statsAlerts::CONGESTION); + + if (residual_losses > MAX_RESIDUAL_LOSSES) + stats_->setAlert(interface::TransportStatistics::statsAlerts::LOSSES); + else + stats_->clearAlert(interface::TransportStatistics::statsAlerts::LOSSES); + } +} + +void RTCTransportProtocol::decodePacket(ContentObject &content_object, + bool is_manifest) { + if (!fec_decoder_) return; + + DLOG_IF(INFO, VLOG_IS_ON(4)) + << "Send packet " << content_object.getName() << " to FEC decoder"; + + uint32_t offset = + is_manifest + ? (uint32_t)content_object.headerSize() + : (uint32_t)(content_object.headerSize() + rtc::DATA_HEADER_SIZE); + uint32_t metadata = static_cast<uint32_t>(content_object.getPayloadType()); + + fec_decoder_->onDataPacket(content_object, offset, metadata); +} + +void RTCTransportProtocol::onFecPackets(fec::BufferArray &packets) { + Packet::Format format; + socket_->getSocketOption(interface::GeneralTransportOptions::PACKET_FORMAT, + format); + + Name *name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name); + + for (auto &packet : packets) { + uint32_t seq_number = packet.getIndex(); + uint32_t metadata = packet.getMetadata(); + fec::buffer buffer = packet.getBuffer(); + + PayloadType payload_type = static_cast<PayloadType>(metadata); + switch (payload_type) { + case PayloadType::DATA: + case PayloadType::MANIFEST: + break; + case PayloadType::UNSPECIFIED: + default: + payload_type = PayloadType::DATA; + break; + } + + switch (state_->getPacketState(seq_number)) { + case PacketState::RECEIVED: + case PacketState::TO_BE_RECEIVED: { + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Packet " << seq_number << " already received"; + break; + } + default: { + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Recovered packet " << seq_number << " through FEC"; + + if (payload_type == PayloadType::MANIFEST) { + name->setSuffix(seq_number); + + auto interest = + core::PacketManager<>::getInstance().getPacket<Interest>(format); + interest->setName(*name); + + auto content_object = toContentObject( + *name, format, payload_type, buffer->data(), buffer->length()); + + processManifest(*interest, *content_object); + } + + state_->onPacketRecoveredFec(seq_number, (uint32_t)buffer->length()); + ldr_->onPacketRecoveredFec(seq_number); + + if (payload_type == PayloadType::DATA) { + verifier_->onDataRecoveredFec(seq_number); + reassembly_->reassemble(*buffer, seq_number); + } + + break; + } + } + } +} + +void RTCTransportProtocol::processManifest(Interest &interest, + ContentObject &manifest) { + auth::VerificationPolicy policy = verifier_->processManifest(manifest); + indexer_verifier_->applyPolicy(interest, manifest, false, policy); +} + +ContentObject::Ptr RTCTransportProtocol::removeFecHeader( + const ContentObject &content_object) { + if (!fec_decoder_ || !fec_decoder_->getFecHeaderSize(false)) { + return nullptr; + } + + size_t fec_header_size = fec_decoder_->getFecHeaderSize(false); + const uint8_t *payload = + content_object.data() + content_object.headerSize() + fec_header_size; + size_t payload_size = content_object.payloadSize() - fec_header_size; + + ContentObject::Ptr co = + toContentObject(content_object.getName(), content_object.getFormat(), + content_object.getPayloadType(), payload, payload_size); + + return co; +} + +ContentObject::Ptr RTCTransportProtocol::toContentObject( + const Name &name, Packet::Format format, PayloadType payload_type, + const uint8_t *payload, std::size_t payload_size, + std::size_t additional_header_size) { + // Recreate ContentObject + ContentObject::Ptr co = + core::PacketManager<>::getInstance().getPacket<ContentObject>( + format, additional_header_size); + co->updateLength(payload_size); + co->append(payload_size); + co->trimStart(co->headerSize()); + + // Copy payload + std::memcpy(co->writableData(), payload, payload_size); + + // Restore network headers and some fields + co->prepend(co->headerSize()); + co->setName(name); + co->setPayloadType(payload_type); + + return co; +} + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport |