diff options
Diffstat (limited to 'libtransport/src/protocols/rtc')
39 files changed, 4014 insertions, 939 deletions
diff --git a/libtransport/src/protocols/rtc/CMakeLists.txt b/libtransport/src/protocols/rtc/CMakeLists.txt index 873b345d0..be8e0189c 100644 --- a/libtransport/src/protocols/rtc/CMakeLists.txt +++ b/libtransport/src/protocols/rtc/CMakeLists.txt @@ -1,4 +1,4 @@ -# Copyright (c) 2017-2019 Cisco and/or its affiliates. +# Copyright (c) 2021 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: @@ -16,22 +16,43 @@ list(APPEND HEADER_FILES ${CMAKE_CURRENT_SOURCE_DIR}/rtc.h ${CMAKE_CURRENT_SOURCE_DIR}/rtc_consts.h ${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_forwarding_strategy.h ${CMAKE_CURRENT_SOURCE_DIR}/rtc_indexer.h ${CMAKE_CURRENT_SOURCE_DIR}/rtc_ldr.h ${CMAKE_CURRENT_SOURCE_DIR}/rtc_packet.h ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rc.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rc_congestion_detection.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rc_iat.h ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rc_queue.h ${CMAKE_CURRENT_SOURCE_DIR}/rtc_reassembly.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_recovery_strategy.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rs_delay.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rs_fec_only.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rs_low_rate.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rs_recovery_off.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rs_rtx_only.h ${CMAKE_CURRENT_SOURCE_DIR}/rtc_state.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_verifier.h ) list(APPEND SOURCE_FILES ${CMAKE_CURRENT_SOURCE_DIR}/probe_handler.cc ${CMAKE_CURRENT_SOURCE_DIR}/rtc.cc ${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.cc + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_forwarding_strategy.cc ${CMAKE_CURRENT_SOURCE_DIR}/rtc_ldr.cc + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rc_congestion_detection.cc + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rc_iat.cc ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rc_queue.cc + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_reassembly.cc + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_recovery_strategy.cc + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rs_delay.cc + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rs_fec_only.cc + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rs_low_rate.cc + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rs_recovery_off.cc + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rs_rtx_only.cc ${CMAKE_CURRENT_SOURCE_DIR}/rtc_state.cc + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_verifier.cc ) set(SOURCE_FILES ${SOURCE_FILES} PARENT_SCOPE) diff --git a/libtransport/src/protocols/rtc/probe_handler.cc b/libtransport/src/protocols/rtc/probe_handler.cc index abaca6ad9..abb234757 100644 --- a/libtransport/src/protocols/rtc/probe_handler.cc +++ b/libtransport/src/protocols/rtc/probe_handler.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2021 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -13,6 +13,7 @@ * limitations under the License. */ +#include <hicn/transport/utils/chrono_typedefs.h> #include <protocols/rtc/probe_handler.h> #include <protocols/rtc/rtc_consts.h> @@ -27,6 +28,7 @@ ProbeHandler::ProbeHandler(SendProbeCallback &&send_callback, : probe_interval_(0), max_probes_(0), sent_probes_(0), + recv_probes_(0), probe_timer_(std::make_unique<asio::steady_timer>(io_service)), rand_eng_((std::random_device())()), distr_(MIN_RTT_PROBE_SEQ, MAX_RTT_PROBE_SEQ), @@ -39,17 +41,25 @@ uint64_t ProbeHandler::getRtt(uint32_t seq) { if (it == pending_probes_.end()) return 0; - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + uint64_t now = utils::SteadyTime::nowMs().count(); uint64_t rtt = now - it->second; if (rtt < 1) rtt = 1; pending_probes_.erase(it); + recv_probes_++; return rtt; } +double ProbeHandler::getProbeLossRate() { + return 1.0 - ((double)recv_probes_ / (double)sent_probes_); +} + +void ProbeHandler::setSuffixRange(uint32_t min, uint32_t max) { + assert(min <= max && min >= MIN_PROBE_SEQ); + distr_ = std::uniform_int_distribution<uint32_t>(min, max); +} + void ProbeHandler::setProbes(uint32_t probe_interval, uint32_t max_probes) { stopProbes(); probe_interval_ = probe_interval; @@ -60,6 +70,7 @@ void ProbeHandler::stopProbes() { probe_interval_ = 0; max_probes_ = 0; sent_probes_ = 0; + recv_probes_ = 0; probe_timer_->cancel(); } @@ -67,9 +78,7 @@ void ProbeHandler::sendProbes() { if (probe_interval_ == 0) return; if (max_probes_ != 0 && sent_probes_ >= max_probes_) return; - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + uint64_t now = utils::SteadyTime::nowMs().count(); uint32_t seq = distr_(rand_eng_); pending_probes_.insert(std::pair<uint32_t, uint64_t>(seq, now)); @@ -92,14 +101,25 @@ void ProbeHandler::sendProbes() { std::weak_ptr<ProbeHandler> self(shared_from_this()); probe_timer_->expires_from_now(std::chrono::microseconds(probe_interval_)); - probe_timer_->async_wait([self](std::error_code ec) { + probe_timer_->async_wait([self](const std::error_code &ec) { if (ec) return; - if (auto s = self.lock()) { + auto s = self.lock(); + if (s) { s->sendProbes(); } }); } +ProbeType ProbeHandler::getProbeType(uint32_t seq) { + if (MIN_INIT_PROBE_SEQ <= seq && seq <= MAX_INIT_PROBE_SEQ) { + return ProbeType::INIT; + } + if (MIN_RTT_PROBE_SEQ <= seq && seq <= MAX_RTT_PROBE_SEQ) { + return ProbeType::RTT; + } + return ProbeType::NOT_PROBE; +} + } // namespace rtc } // namespace protocol diff --git a/libtransport/src/protocols/rtc/probe_handler.h b/libtransport/src/protocols/rtc/probe_handler.h index e34b23df0..2de908176 100644 --- a/libtransport/src/protocols/rtc/probe_handler.h +++ b/libtransport/src/protocols/rtc/probe_handler.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2021 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -26,6 +26,12 @@ namespace protocol { namespace rtc { +enum class ProbeType { + NOT_PROBE, + INIT, + RTT, +}; + class ProbeHandler : public std::enable_shared_from_this<ProbeHandler> { public: using SendProbeCallback = std::function<void(uint32_t)>; @@ -35,31 +41,39 @@ class ProbeHandler : public std::enable_shared_from_this<ProbeHandler> { ~ProbeHandler(); - // if the function returns 0 the probe is not valaid + // If the function returns 0 the probe is not valid. uint64_t getRtt(uint32_t seq); - // reset the probes parameters. it stop the current probing. - // to restar call sendProbes. - // probe_interval = 0 means that no event will be scheduled - // max_probe = 0 means no limit to the number of probe to send + // this function may return a residual loss rate higher than the real one if + // we don't wait enough time for the probes to come back + double getProbeLossRate(); + + // Set the probe suffix range [min, max] + void setSuffixRange(uint32_t min, uint32_t max); + + // Reset the probes parameters and stops the current probing. + // probe_interval = 0 means that no event will be scheduled. + // max_probe = 0 means no limit to the number of probe to send. void setProbes(uint32_t probe_interval, uint32_t max_probes); - // stop to schedule probes void stopProbes(); void sendProbes(); + static ProbeType getProbeType(uint32_t seq); + private: uint32_t probe_interval_; // us uint32_t max_probes_; // packets uint32_t sent_probes_; // packets + uint32_t recv_probes_; // packets std::unique_ptr<asio::steady_timer> probe_timer_; - // map from seqnumber to timestamp + // Map from packet suffixes to timestamp std::unordered_map<uint32_t, uint64_t> pending_probes_; - // random generator + // Random generator std::default_random_engine rand_eng_; std::uniform_int_distribution<uint32_t> distr_; diff --git a/libtransport/src/protocols/rtc/rtc.cc b/libtransport/src/protocols/rtc/rtc.cc index 0cb4cda1d..df6522471 100644 --- a/libtransport/src/protocols/rtc/rtc.cc +++ b/libtransport/src/protocols/rtc/rtc.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -22,7 +22,7 @@ #include <protocols/rtc/rtc.h> #include <protocols/rtc/rtc_consts.h> #include <protocols/rtc/rtc_indexer.h> -#include <protocols/rtc/rtc_rc_queue.h> +#include <protocols/rtc/rtc_rc_congestion_detection.h> #include <algorithm> @@ -37,13 +37,15 @@ using namespace interface; RTCTransportProtocol::RTCTransportProtocol( implementation::ConsumerSocket *icn_socket) : TransportProtocol(icn_socket, new RtcIndexer<>(icn_socket, this), - new DatagramReassembly(icn_socket, this)), + new RtcReassembly(icn_socket, this)), number_(0) { icn_socket->getSocketOption(PORTAL, portal_); - round_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService()); + round_timer_ = + std::make_unique<asio::steady_timer>(portal_->getThread().getIoService()); scheduler_timer_ = - std::make_unique<asio::steady_timer>(portal_->getIoService()); - pacing_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService()); + std::make_unique<asio::steady_timer>(portal_->getThread().getIoService()); + pacing_timer_ = + std::make_unique<asio::steady_timer>(portal_->getThread().getIoService()); } RTCTransportProtocol::~RTCTransportProtocol() {} @@ -61,25 +63,52 @@ std::size_t RTCTransportProtocol::transportHeaderLength() { // private void RTCTransportProtocol::initParams() { TransportProtocol::reset(); + fwd_strategy_.setCallback(on_fwd_strategy_); - rc_ = std::make_shared<RTCRateControlQueue>(); + std::weak_ptr<RTCTransportProtocol> self = shared_from_this(); + + std::shared_ptr<auth::Verifier> verifier; + socket_->getSocketOption(GeneralTransportOptions::VERIFIER, verifier); + + uint32_t max_unverified_delay; + socket_->getSocketOption(GeneralTransportOptions::MAX_UNVERIFIED_TIME, + max_unverified_delay); + + rc_ = std::make_shared<RTCRateControlCongestionDetection>(); ldr_ = std::make_shared<RTCLossDetectionAndRecovery>( - indexer_verifier_.get(), - std::bind(&RTCTransportProtocol::sendRtxInterest, this, - std::placeholders::_1), - portal_->getIoService()); + indexer_verifier_.get(), portal_->getThread().getIoService(), + interface::RtcTransportRecoveryStrategies::RTX_ONLY, + [self](uint32_t seq) { + auto ptr = self.lock(); + if (ptr && ptr->isRunning()) { + ptr->sendRtxInterest(seq); + } + }, + on_rec_strategy_); + verifier_ = std::make_shared<RTCVerifier>(verifier, max_unverified_delay); state_ = std::make_shared<RTCState>( indexer_verifier_.get(), - std::bind(&RTCTransportProtocol::sendProbeInterest, this, - std::placeholders::_1), - std::bind(&RTCTransportProtocol::discoveredRtt, this), - portal_->getIoService()); + [self](uint32_t seq) { + auto ptr = self.lock(); + if (ptr && ptr->isRunning()) { + ptr->sendProbeInterest(seq); + } + }, + [self]() { + auto ptr = self.lock(); + if (ptr && ptr->isRunning()) { + ptr->discoveredRtt(); + } + }, + portal_->getThread().getIoService()); + state_->initParams(); rc_->setState(state_); - // TODO: for the moment we keep the congestion control disabled - // rc_->tunrOnRateControl(); - ldr_->setState(state_); + rc_->turnOnRateControl(); + ldr_->setState(state_.get()); + ldr_->setRateControl(rc_.get()); + verifier_->setState(state_); // protocol state start_send_interest_ = false; @@ -102,6 +131,10 @@ void RTCTransportProtocol::initParams() { } #else max_aggregated_interest_ = 1; + if (const char *max_aggr = std::getenv("MAX_AGGREGATED_INTERESTS")) { + LOG(INFO) << "Max Aggregated: " << max_aggr; + max_aggregated_interest_ = std::stoul(std::string(max_aggr)); + } #endif max_sent_int_ = @@ -131,6 +164,7 @@ void RTCTransportProtocol::initParams() { indexer_verifier_->setNFec(0); ldr_->setFecParams(fec::FECUtils::getBlockSymbols(fec_type_), fec::FECUtils::getSourceSymbols(fec_type_)); + fec_decoder_->setIOService(portal_->getThread().getIoService()); } else { indexer_verifier_->disableFec(); } @@ -162,61 +196,97 @@ void RTCTransportProtocol::inactiveProducer() { void RTCTransportProtocol::newRound() { round_timer_->expires_from_now(std::chrono::milliseconds(ROUND_LEN)); - // TODO pass weak_ptr here - round_timer_->async_wait([this, n{number_}](std::error_code ec) { - if (ec) return; - if (n != number_) { + std::weak_ptr<RTCTransportProtocol> self = shared_from_this(); + round_timer_->async_wait([self](const std::error_code &ec) { + if (ec) { return; } + auto ptr = self.lock(); + + if (!ptr || !ptr->isRunning()) { + return; + } + + auto &state = ptr->state_; + // saving counters that will be reset on new round - uint32_t sent_retx = state_->getSentRtxInRound(); - uint32_t received_bytes = state_->getReceivedBytesInRound(); - uint32_t sent_interest = state_->getSentInterestInRound(); - uint32_t lost_data = state_->getLostData(); - uint32_t definitely_lost = state_->getDefinitelyLostPackets(); - uint32_t recovered_losses = state_->getRecoveredLosses(); - uint32_t received_nacks = state_->getReceivedNacksInRound(); - uint32_t received_fec = state_->getReceivedFecPackets(); - - bool in_sync = (current_state_ == SyncState::in_sync); - ldr_->onNewRound(in_sync); - state_->onNewRound((double)ROUND_LEN, in_sync); - rc_->onNewRound((double)ROUND_LEN); + uint32_t sent_retx = state->getSentRtxInRound(); + uint32_t received_bytes = + (state->getReceivedBytesInRound() + // data packets received + state->getReceivedFecBytesInRound()); // fec packets received + uint32_t sent_interest = state->getSentInterestInRound(); + uint32_t lost_data = state->getLostData(); + uint32_t definitely_lost = state->getDefinitelyLostPackets(); + uint32_t recovered_losses = state->getRecoveredLosses(); + uint32_t received_nacks = state->getReceivedNacksInRound(); + uint32_t received_fec = state->getReceivedFecPackets(); + + bool in_sync = (ptr->current_state_ == SyncState::in_sync); + ptr->ldr_->onNewRound(in_sync); + ptr->state_->onNewRound((double)ROUND_LEN, in_sync); + ptr->rc_->onNewRound((double)ROUND_LEN); // update sync state if needed - if (current_state_ == SyncState::in_sync) { - double cache_rate = state_->getPacketFromCacheRatio(); + if (ptr->current_state_ == SyncState::in_sync) { + double cache_rate = state->getPacketFromCacheRatio(); if (cache_rate > MAX_DATA_FROM_CACHE) { - current_state_ = SyncState::catch_up; + ptr->current_state_ = SyncState::catch_up; } } else { - double target_rate = state_->getProducerRate() * PRODUCTION_RATE_FRACTION; - double received_rate = state_->getReceivedRate(); - uint32_t round_without_nacks = state_->getRoundsWithoutNacks(); - double cache_ratio = state_->getPacketFromCacheRatio(); + double target_rate = state->getProducerRate() * PRODUCTION_RATE_FRACTION; + double received_rate = + state->getReceivedRate() + state->getRecoveredFecRate(); + uint32_t round_without_nacks = state->getRoundsWithoutNacks(); + double cache_ratio = state->getPacketFromCacheRatio(); if (round_without_nacks >= ROUNDS_IN_SYNC_BEFORE_SWITCH && received_rate >= target_rate && cache_ratio < MAX_DATA_FROM_CACHE) { - current_state_ = SyncState::in_sync; + ptr->current_state_ = SyncState::in_sync; } } DLOG_IF(INFO, VLOG_IS_ON(3)) << "Calling updateSyncWindow in newRound function"; - updateSyncWindow(); + ptr->updateSyncWindow(); - sendStatsToApp(sent_retx, received_bytes, sent_interest, lost_data, - definitely_lost, recovered_losses, received_nacks, - received_fec); - newRound(); + ptr->sendStatsToApp(sent_retx, received_bytes, sent_interest, lost_data, + definitely_lost, recovered_losses, received_nacks, + received_fec); + ptr->fwd_strategy_.checkStrategy(); + ptr->newRound(); }); } void RTCTransportProtocol::discoveredRtt() { start_send_interest_ = true; - ldr_->turnOnRTX(); + uint32_t strategy; + socket_->getSocketOption(RtcTransportOptions::RECOVERY_STRATEGY, strategy); + ldr_->changeRecoveryStrategy( + (interface::RtcTransportRecoveryStrategies)strategy); + ldr_->turnOnRecovery(); ldr_->onNewRound(false); + + // set forwarding strategy switch if selected + Name *name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name); + Prefix prefix(*name, 128); + if ((interface::RtcTransportRecoveryStrategies)strategy == + interface::RtcTransportRecoveryStrategies::LOW_RATE_AND_BESTPATH) { + fwd_strategy_.initFwdStrategy(portal_, prefix, state_.get(), + RTCForwardingStrategy::BEST_PATH); + } else if ((interface::RtcTransportRecoveryStrategies)strategy == + interface::RtcTransportRecoveryStrategies:: + LOW_RATE_AND_REPLICATION) { + fwd_strategy_.initFwdStrategy(portal_, prefix, state_.get(), + RTCForwardingStrategy::REPLICATION); + } else if ((interface::RtcTransportRecoveryStrategies)strategy == + interface::RtcTransportRecoveryStrategies:: + LOW_RATE_AND_ALL_FWD_STRATEGIES) { + fwd_strategy_.initFwdStrategy(portal_, prefix, state_.get(), + RTCForwardingStrategy::BOTH); + } + updateSyncWindow(); } @@ -244,7 +314,7 @@ void RTCTransportProtocol::computeMaxSyncWindow() { (production_rate * lifetime_ms * INTEREST_LIFETIME_REDUCTION_FACTOR) / packet_size); - max_sync_win_ = std::min(max_sync_win_, rc_->getCongesionWindow()); + max_sync_win_ = std::min(max_sync_win_, rc_->getCongestionWindow()); } void RTCTransportProtocol::updateSyncWindow() { @@ -259,25 +329,14 @@ void RTCTransportProtocol::updateSyncWindow() { } double prod_rate = state_->getProducerRate(); - double rtt = (double)state_->getRTT() / MILLI_IN_A_SEC; + double rtt = (double)state_->getMinRTT() / MILLI_IN_A_SEC; double packet_size = state_->getAveragePacketSize(); // if some of the info are not available do not update the current win if (prod_rate != 0.0 && rtt != 0.0 && packet_size != 0.0) { - double fec_interest_overhead = (double)state_->getPendingFecPackets() / - (double)(state_->getPendingInterestNumber() - - state_->getPendingFecPackets()); - - double fec_overhead = - std::max(indexer_verifier_->getFecOverhead(), fec_interest_overhead); - - prod_rate += (prod_rate * fec_overhead); - current_sync_win_ = (uint32_t)ceil(prod_rate * rtt / packet_size); uint32_t buffer = PRODUCER_BUFFER_MS; - if (rtt > 150) - buffer = buffer * 2; // if the RTT is too large we increase the - // the size of the buffer + current_sync_win_ += ceil(prod_rate * (buffer / MILLI_IN_A_SEC) / packet_size); @@ -285,8 +344,17 @@ void RTCTransportProtocol::updateSyncWindow() { current_sync_win_ = current_sync_win_ * CATCH_UP_WIN_INCREMENT; } + uint32_t min_win = WIN_MIN; + bool aggregated_data_on; + socket_->getSocketOption(RtcTransportOptions::AGGREGATED_DATA, + aggregated_data_on); + if (aggregated_data_on) { + min_win = WIN_MIN_WITH_AGGREGARED_DATA; + min_win += (min_win * (1 - (std::max(0.3, rtt) - rtt) / 0.3)); + } + current_sync_win_ = std::min(current_sync_win_, max_sync_win_); - current_sync_win_ = std::max(current_sync_win_, WIN_MIN); + current_sync_win_ = std::max(current_sync_win_, min_win); } scheduleNextInterests(); @@ -322,7 +390,7 @@ void RTCTransportProtocol::sendProbeInterest(uint32_t seq) { socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &interest_name); - DLOG_IF(INFO, VLOG_IS_ON(3)) << "send probe " << seq; + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Send probe " << seq; interest_name->setSuffix(seq); sendInterest(*interest_name); } @@ -330,13 +398,18 @@ void RTCTransportProtocol::sendProbeInterest(uint32_t seq) { void RTCTransportProtocol::scheduleNextInterests() { DLOG_IF(INFO, VLOG_IS_ON(3)) << "Schedule next interests"; - if (!isRunning() && !is_first_) return; + if (!isRunning() && !is_first_) { + return; + } - if (pacing_timer_on_) return; // wait pacing timer for the next send + if (pacing_timer_on_) { + return; // wait pacing timer for the next send + } - if (!start_send_interest_) + if (!start_send_interest_) { return; // RTT discovering phase is not finished so // do not start to send interests + } if (TRANSPORT_EXPECT_FALSE(!state_->isProducerActive())) { DLOG_IF(INFO, VLOG_IS_ON(3)) << "Inactive producer."; @@ -352,7 +425,7 @@ void RTCTransportProtocol::scheduleNextInterests() { &interest_name); uint32_t next_seg = 0; - DLOG_IF(INFO, VLOG_IS_ON(3)) << "send interest " << next_seg; + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Send interest " << next_seg; interest_name->setSuffix(next_seg); if (portal_->interestIsPending(*interest_name)) { @@ -370,28 +443,37 @@ void RTCTransportProtocol::scheduleNextInterests() { << " -- current_sync_win_: " << current_sync_win_; uint32_t pending = state_->getPendingInterestNumber(); - if (pending >= current_sync_win_) return; // no space in the window + uint32_t pending_fec = state_->getPendingFecPackets(); + + if ((pending - pending_fec) >= current_sync_win_) + return; // no space in the window - if ((current_sync_win_ - pending) < max_aggregated_interest_) { + // XXX double check if aggregated interests are still working here + if ((current_sync_win_ - (pending - pending_fec)) < + max_aggregated_interest_) { if (scheduler_timer_on_) return; // timer already scheduled - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + uint64_t now = utils::SteadyTime::nowMs().count(); uint64_t time = now - last_interest_sent_time_; if (time < WAIT_FOR_INTEREST_BATCH) { uint64_t next = WAIT_FOR_INTEREST_BATCH - time; scheduler_timer_on_ = true; scheduler_timer_->expires_from_now(std::chrono::milliseconds(next)); - scheduler_timer_->async_wait([this](std::error_code ec) { + + std::weak_ptr<RTCTransportProtocol> self = shared_from_this(); + scheduler_timer_->async_wait([self](const std::error_code &ec) { if (ec) return; - if (!scheduler_timer_on_) return; - scheduler_timer_on_ = false; - scheduleNextInterests(); + auto ptr = self.lock(); + if (ptr && ptr->isRunning()) { + if (!ptr->scheduler_timer_on_) return; + + ptr->scheduler_timer_on_ = false; + ptr->scheduleNextInterests(); + } }); - return; // whait for the timer + return; // wait for the timer } } @@ -403,7 +485,7 @@ void RTCTransportProtocol::scheduleNextInterests() { indexer_verifier_->jumpToIndex(state_->getLastSeqNacked() + 1); } - // skipe received packets + // skip received packets if (indexer_verifier_->checkNextSuffix() <= state_->getHighestSeqReceivedInOrder()) { indexer_verifier_->jumpToIndex(state_->getHighestSeqReceivedInOrder() + 1); @@ -417,7 +499,8 @@ void RTCTransportProtocol::scheduleNextInterests() { socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name); std::array<uint32_t, MAX_AGGREGATED_INTEREST> additional_suffixes; - while ((state_->getPendingInterestNumber() < current_sync_win_) && + while (((state_->getPendingInterestNumber() - + state_->getPendingFecPackets()) < current_sync_win_) && (sent_interests < max_sent_int_)) { DLOG_IF(INFO, VLOG_IS_ON(3)) << "In while loop. Window size: " << current_sync_win_; @@ -428,19 +511,20 @@ void RTCTransportProtocol::scheduleNextInterests() { // send the packet only if: // 1) it is not pending yet (not true for rtx) - // 2) the packet is not received or lost + // 2) the packet is not received or def lost // 3) is not in the rtx list // 4) is fec and is not in order (!= last sent + 1) + PacketState packet_state = state_->getPacketState(next_seg); if (portal_->interestIsPending(*name) || - state_->isReceivedOrLost(next_seg) != PacketState::UNKNOWN || - ldr_->isRtx(next_seg) || + packet_state == PacketState::RECEIVED || + packet_state == PacketState::DEFINITELY_LOST || ldr_->isRtx(next_seg) || (indexer_verifier_->isFec(next_seg) && next_seg != last_interest_sent_seq_ + 1)) { DLOG_IF(INFO, VLOG_IS_ON(3)) << "skip interest " << next_seg << " because: pending " - << portal_->interestIsPending(*name) << ", recv " - << (state_->isReceivedOrLost(next_seg) != PacketState::UNKNOWN) - << ", rtx " << (ldr_->isRtx(next_seg)) << ", is old fec " + << portal_->interestIsPending(*name) << ", recv or lost" + << (int)packet_state << ", rtx " << (ldr_->isRtx(next_seg)) + << ", is old fec " << ((indexer_verifier_->isFec(next_seg) && next_seg != last_interest_sent_seq_ + 1)); continue; @@ -462,10 +546,7 @@ void RTCTransportProtocol::scheduleNextInterests() { sent_packets++; sent_interests++; sendInterest(interest_name, &additional_suffixes, aggregated_counter - 1); - last_interest_sent_time_ = - std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + last_interest_sent_time_ = utils::SteadyTime::nowMs().count(); aggregated_counter = 0; } } @@ -473,25 +554,29 @@ void RTCTransportProtocol::scheduleNextInterests() { // exiting the while we may have some pending interest to send if (aggregated_counter != 0) { sent_packets++; - last_interest_sent_time_ = - std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + last_interest_sent_time_ = utils::SteadyTime::nowMs().count(); sendInterest(interest_name, &additional_suffixes, aggregated_counter - 1); } - if (state_->getPendingInterestNumber() < current_sync_win_) { + if ((state_->getPendingInterestNumber() - state_->getPendingFecPackets()) < + current_sync_win_) { // we still have space in the window but we already sent too many packets // wait PACING_WAIT to avoid drops in the kernel pacing_timer_on_ = true; pacing_timer_->expires_from_now(std::chrono::microseconds(PACING_WAIT)); - scheduler_timer_->async_wait([this](std::error_code ec) { + + std::weak_ptr<RTCTransportProtocol> self = shared_from_this(); + scheduler_timer_->async_wait([self](const std::error_code &ec) { if (ec) return; - if (!pacing_timer_on_) return; - pacing_timer_on_ = false; - scheduleNextInterests(); + auto ptr = self.lock(); + if (ptr && ptr->isRunning()) { + if (!ptr->pacing_timer_on_) return; + + ptr->pacing_timer_on_ = false; + ptr->scheduleNextInterests(); + } }); } } @@ -500,13 +585,13 @@ void RTCTransportProtocol::onInterestTimeout(Interest::Ptr &interest, const Name &name) { uint32_t segment_number = name.getSuffix(); - if (segment_number >= MIN_PROBE_SEQ) { + if (ProbeHandler::getProbeType(segment_number) != ProbeType::NOT_PROBE) { // this is a timeout on a probe, do nothing return; } - PacketState state = state_->isReceivedOrLost(segment_number); - if (state != PacketState::UNKNOWN) { + PacketState state = state_->getPacketState(segment_number); + if (state == PacketState::RECEIVED || state == PacketState::DEFINITELY_LOST) { // we may recover a packets using fec, ignore this timer return; } @@ -524,13 +609,18 @@ void RTCTransportProtocol::onInterestTimeout(Interest::Ptr &interest, DLOG_IF(INFO, VLOG_IS_ON(3)) << "handle timeout for packet " << segment_number << " using rtx"; if (ldr_->isRtxOn()) { - ldr_->onTimeout(segment_number); - if (indexer_verifier_->isFec(segment_number)) + if (indexer_verifier_->isFec(segment_number)) { + // if this is a fec packet we do not recover it with rtx so we consider + // the packet to be lost + ldr_->onTimeout(segment_number, true); state_->onTimeout(segment_number, true); - else + } else { + ldr_->onTimeout(segment_number, false); state_->onTimeout(segment_number, false); + } } else { // in this case we wil never recover the timeout + ldr_->onTimeout(segment_number, true); state_->onTimeout(segment_number, true); } scheduleNextInterests(); @@ -559,7 +649,7 @@ void RTCTransportProtocol::onInterestTimeout(Interest::Ptr &interest, void RTCTransportProtocol::onNack(const ContentObject &content_object) { struct nack_packet_t *nack = (struct nack_packet_t *)content_object.getPayload()->data(); - uint32_t production_seg = nack->getProductionSegement(); + uint32_t production_seg = nack->getProductionSegment(); uint32_t nack_segment = content_object.getName().getSuffix(); bool is_rtx = ldr_->isRtx(nack_segment); @@ -592,6 +682,8 @@ void RTCTransportProtocol::onNack(const ContentObject &content_object) { // remove the nack is it exists if (tn_it != timeouts_or_nacks_.end()) timeouts_or_nacks_.erase(tn_it); + state_->onJumpForward(production_seg); + verifier_->onJumpForward(production_seg); // the client is asking for content in the past // switch to catch up state and increase the window // this is true only if the packet is not an RTX @@ -618,11 +710,9 @@ void RTCTransportProtocol::onProbe(const ContentObject &content_object) { bool valid = state_->onProbePacketReceived(content_object); if (!valid) return; - struct nack_packet_t *probe = - (struct nack_packet_t *)content_object.getPayload()->data(); - uint32_t production_seg = probe->getProductionSegement(); + uint32_t production_seg = RTCState::getProbeParams(content_object).prod_seg; - // as for the nacks set next_segment + // As for the nacks set next_segment DLOG_IF(INFO, VLOG_IS_ON(3)) << "on probe next seg = " << indexer_verifier_->checkNextSuffix() << ", jump to " << production_seg; @@ -636,12 +726,39 @@ void RTCTransportProtocol::onContentObjectReceived( Interest &interest, ContentObject &content_object, std::error_code &ec) { DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received content object of size: " << content_object.payloadSize(); - uint32_t payload_size = content_object.payloadSize(); + uint32_t segment_number = content_object.getName().getSuffix(); + PayloadType payload_type = content_object.getPayloadType(); + PacketState state; + + ContentObject *content_ptr = &content_object; + ContentObject::Ptr manifest_ptr = nullptr; + + bool is_probe = + ProbeHandler::getProbeType(segment_number) != ProbeType::NOT_PROBE; + bool is_nack = !is_probe && content_object.payloadSize() == NACK_HEADER_SIZE; + bool is_fec = indexer_verifier_->isFec(segment_number); + bool is_manifest = + !is_probe && !is_nack && !is_fec && payload_type == PayloadType::MANIFEST; + bool is_data = + !is_probe && !is_nack && !is_fec && payload_type == PayloadType::DATA; + bool compute_stats = is_data || is_manifest; ec = make_error_code(protocol_error::not_reassemblable); - if (segment_number >= MIN_PROBE_SEQ) { + // A helper function to process manifests or data packets received + auto onDataPacketReceived = [this](ContentObject &content_object, + bool compute_stats) { + ldr_->onDataPacketReceived(content_object); + rc_->onDataPacketReceived(content_object, compute_stats); + updateSyncWindow(); + }; + + // First verify the packet signature and apply the corresponding policy + auth::VerificationPolicy policy = verifier_->verify(content_object, is_fec); + indexer_verifier_->applyPolicy(interest, content_object, false, policy); + + if (is_probe) { DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received probe " << segment_number; if (*on_content_object_input_) { (*on_content_object_input_)(*socket_->getInterface(), content_object); @@ -650,7 +767,7 @@ void RTCTransportProtocol::onContentObjectReceived( return; } - if (payload_size == NACK_HEADER_SIZE) { + if (is_nack) { DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received nack " << segment_number; if (*on_content_object_input_) { (*on_content_object_input_)(*socket_->getInterface(), content_object); @@ -659,57 +776,122 @@ void RTCTransportProtocol::onContentObjectReceived( return; } + // content_ptr will point either to the input data packet or to a manifest + // whose FEC header has been removed + if (is_manifest) { + manifest_ptr = removeFecHeader(content_object); + if (manifest_ptr) { + content_ptr = manifest_ptr.get(); + } + } + + // From there, the packet is either a FEC, a manifest or a data packet. DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received content " << segment_number; - bool compute_stats = true; + // Do not count timed out packets in stats auto tn_it = timeouts_or_nacks_.find(segment_number); if (tn_it != timeouts_or_nacks_.end()) { compute_stats = false; timeouts_or_nacks_.erase(tn_it); } - if (ldr_->isRtx(segment_number)) { + + // Do not count retransmissions or losses in stats + if (ldr_->isRtx(segment_number) || + ldr_->isPossibleLossWithNoRtx(segment_number)) { compute_stats = false; } - // check if the packet was already received - PacketState state = state_->isReceivedOrLost(segment_number); + // Fetch packet state + state = state_->getPacketState(segment_number); - if (state != PacketState::RECEIVED) { - // send packet to decoder - if (fec_decoder_) { - DLOG_IF(INFO, VLOG_IS_ON(4)) - << "send packet " << segment_number << " to FEC decoder"; - fec_decoder_->onDataPacket( - content_object, content_object.headerSize() + rtc::DATA_HEADER_SIZE); - } - if (!indexer_verifier_->isFec(segment_number)) { - // the packet may be alredy sent to the ap by the decoder, check again if - // it is already received - state = state_->isReceivedOrLost(segment_number); - if (state != PacketState::RECEIVED) { - DLOG_IF(INFO, VLOG_IS_ON(4)) << "Received content " << segment_number; + // Check if the packet is a retransmission + if (ldr_->isRtx(segment_number) && state != PacketState::RECEIVED) { + if (is_data || is_manifest) { + state_->onPacketRecoveredRtx(segment_number); - state_->onDataPacketReceived(content_object, compute_stats); + if (*on_content_object_input_) { + (*on_content_object_input_)(*socket_->getInterface(), content_object); + } - if (*on_content_object_input_) { - (*on_content_object_input_)(*socket_->getInterface(), content_object); - } - ec = make_error_code(protocol_error::success); + if (is_manifest) { + processManifest(interest, *content_ptr); } - } else { - DLOG_IF(INFO, VLOG_IS_ON(4)) << "Received fec " << segment_number; - state_->onFecPacketReceived(content_object); + + ec = is_manifest ? make_error_code(protocol_error::not_reassemblable) + : make_error_code(protocol_error::success); + + // The packet is considered received, return early + onDataPacketReceived(*content_ptr, compute_stats); + return; } - } else { + + if (is_fec) { + state_->onFecPacketRecoveredRtx(segment_number); + } + } + + // Fetch packet state again; it may have changed + state = state_->getPacketState(segment_number); + + // Check if the packet was already received + if (state == PacketState::RECEIVED || state == PacketState::TO_BE_RECEIVED) { DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received duplicated content " << segment_number << ", drop it"; ec = make_error_code(protocol_error::duplicated_content); + onDataPacketReceived(*content_ptr, compute_stats); + return; } - ldr_->onDataPacketReceived(content_object); - rc_->onDataPacketReceived(content_object); + if (!is_fec) { + state_->dataToBeReceived(segment_number); + } - updateSyncWindow(); + // Send packet to FEC decoder + if (fec_decoder_) { + DLOG_IF(INFO, VLOG_IS_ON(4)) + << "Send packet " << segment_number << " to FEC decoder"; + + uint32_t offset = is_manifest + ? content_object.headerSize() + : content_object.headerSize() + rtc::DATA_HEADER_SIZE; + uint32_t metadata = static_cast<uint32_t>(content_object.getPayloadType()); + + fec_decoder_->onDataPacket(content_object, offset, metadata); + } + + // We can return early if FEC + if (is_fec) { + DLOG_IF(INFO, VLOG_IS_ON(4)) << "Received FEC " << segment_number; + state_->onFecPacketReceived(content_object); + onDataPacketReceived(*content_ptr, compute_stats); + return; + } + + // The packet may have been already sent to the app by the decoder, check + // again if it is already received + state = state_->getPacketState( + segment_number); // state == RECEIVED or TO_BE_RECEIVED + + if (state != PacketState::RECEIVED) { + DLOG_IF(INFO, VLOG_IS_ON(4)) + << (is_manifest ? "Received manifest " : "Received data ") + << segment_number; + + if (is_manifest) { + processManifest(interest, *content_ptr); + } + + state_->onDataPacketReceived(*content_ptr, compute_stats); + + if (*on_content_object_input_) { + (*on_content_object_input_)(*socket_->getInterface(), content_object); + } + + ec = is_manifest ? make_error_code(protocol_error::not_reassemblable) + : make_error_code(protocol_error::success); + } + + onDataPacketReceived(*content_ptr, compute_stats); } void RTCTransportProtocol::sendStatsToApp( @@ -729,33 +911,149 @@ void RTCTransportProtocol::sendStatsToApp( stats_->updateReceivedNacks(received_nacks); stats_->updateReceivedFEC(received_fec); - stats_->updateAverageWindowSize(current_sync_win_); - stats_->updateLossRatio(state_->getLossRate()); - stats_->updateAverageRtt(state_->getRTT()); + stats_->updateAverageWindowSize(state_->getPendingInterestNumber()); + stats_->updateLossRatio(state_->getPerSecondLossRate()); + uint64_t rtt = state_->getAvgRTT(); + stats_->updateAverageRtt(utils::SteadyTime::Milliseconds(rtt)); + stats_->updateQueuingDelay(state_->getQueuing()); stats_->updateLostData(lost_data); stats_->updateDefinitelyLostData(definitely_lost); stats_->updateRecoveredData(recovered_losses); stats_->updateCCState((unsigned int)current_state_ ? 1 : 0); (*stats_summary_)(*socket_->getInterface(), *stats_); + bool in_congestion = rc_->inCongestionState(); + stats_->updateCongestionState(in_congestion); + double residual_losses = state_->getResidualLossRate(); + stats_->updateResidualLossRate(residual_losses); + stats_->updateQualityScore(state_->getQualityScore()); + + // set alerts + if (rtt > MAX_RTT) + stats_->setAlert(interface::TransportStatistics::statsAlerts::LATENCY); + else + stats_->clearAlert(interface::TransportStatistics::statsAlerts::LATENCY); + + if (in_congestion) + stats_->setAlert(interface::TransportStatistics::statsAlerts::CONGESTION); + else + stats_->clearAlert( + interface::TransportStatistics::statsAlerts::CONGESTION); + + if (residual_losses > MAX_RESIDUAL_LOSSES) + stats_->setAlert(interface::TransportStatistics::statsAlerts::LOSSES); + else + stats_->clearAlert(interface::TransportStatistics::statsAlerts::LOSSES); } } -void RTCTransportProtocol::onFecPackets( - std::vector<std::pair<uint32_t, fec::buffer>> &packets) { +void RTCTransportProtocol::onFecPackets(fec::BufferArray &packets) { + Packet::Format format; + socket_->getSocketOption(interface::GeneralTransportOptions::PACKET_FORMAT, + format); + + Name *name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name); + for (auto &packet : packets) { - PacketState state = state_->isReceivedOrLost(packet.first); - if (state != PacketState::RECEIVED) { - state_->onPacketRecoveredFec(packet.first); - ldr_->onPacketRecoveredFec(packet.first); - DLOG_IF(INFO, VLOG_IS_ON(3)) - << "Recovered packet " << packet.first << " through FEC."; - reassembly_->reassemble(*packet.second, packet.first); - } else { - DLOG_IF(INFO, VLOG_IS_ON(3)) - << "Packet" << packet.first << "already received."; + uint32_t seq_number = packet.getIndex(); + uint32_t metadata = packet.getMetadata(); + fec::buffer buffer = packet.getBuffer(); + + PayloadType payload_type = static_cast<PayloadType>(metadata); + switch (payload_type) { + case PayloadType::DATA: + case PayloadType::MANIFEST: + break; + case PayloadType::UNSPECIFIED: + default: + payload_type = PayloadType::DATA; + break; } + + switch (state_->getPacketState(seq_number)) { + case PacketState::RECEIVED: + case PacketState::TO_BE_RECEIVED: { + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Packet " << seq_number << " already received"; + break; + } + default: { + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Recovered packet " << seq_number << " through FEC"; + + if (payload_type == PayloadType::MANIFEST) { + name->setSuffix(seq_number); + + auto interest = + core::PacketManager<>::getInstance().getPacket<Interest>(format); + interest->setName(*name); + + auto content_object = toContentObject( + *name, format, payload_type, buffer->data(), buffer->length()); + + processManifest(*interest, *content_object); + } + + state_->onPacketRecoveredFec(seq_number, buffer->length()); + ldr_->onPacketRecoveredFec(seq_number); + + if (payload_type == PayloadType::DATA) { + verifier_->onDataRecoveredFec(seq_number); + reassembly_->reassemble(*buffer, seq_number); + } + + break; + } + } + } +} + +void RTCTransportProtocol::processManifest(Interest &interest, + ContentObject &manifest) { + auth::VerificationPolicy policy = verifier_->processManifest(manifest); + indexer_verifier_->applyPolicy(interest, manifest, false, policy); +} + +ContentObject::Ptr RTCTransportProtocol::removeFecHeader( + const ContentObject &content_object) { + if (!fec_decoder_ || !fec_decoder_->getFecHeaderSize()) { + return nullptr; } + + size_t fec_header_size = fec_decoder_->getFecHeaderSize(); + const uint8_t *payload = + content_object.data() + content_object.headerSize() + fec_header_size; + size_t payload_size = content_object.payloadSize() - fec_header_size; + + ContentObject::Ptr co = + toContentObject(content_object.getName(), content_object.getFormat(), + content_object.getPayloadType(), payload, payload_size); + + return co; +} + +ContentObject::Ptr RTCTransportProtocol::toContentObject( + const Name &name, Packet::Format format, PayloadType payload_type, + const uint8_t *payload, std::size_t payload_size, + std::size_t additional_header_size) { + // Recreate ContentObject + ContentObject::Ptr co = + core::PacketManager<>::getInstance().getPacket<ContentObject>( + format, additional_header_size); + co->updateLength(payload_size); + co->append(payload_size); + co->trimStart(co->headerSize()); + + // Copy payload + std::memcpy(co->writableData(), payload, payload_size); + + // Restore network headers and some fields + co->prepend(co->headerSize()); + co->setName(name); + co->setPayloadType(payload_type); + + return co; } } // end namespace rtc diff --git a/libtransport/src/protocols/rtc/rtc.h b/libtransport/src/protocols/rtc/rtc.h index e6431264d..37706eb1c 100644 --- a/libtransport/src/protocols/rtc/rtc.h +++ b/libtransport/src/protocols/rtc/rtc.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2021 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -15,10 +15,12 @@ #pragma once -#include <protocols/datagram_reassembly.h> +#include <protocols/rtc/rtc_forwarding_strategy.h> #include <protocols/rtc/rtc_ldr.h> #include <protocols/rtc/rtc_rc.h> +#include <protocols/rtc/rtc_reassembly.h> #include <protocols/rtc/rtc_state.h> +#include <protocols/rtc/rtc_verifier.h> #include <protocols/transport_protocol.h> #include <unordered_set> @@ -44,6 +46,8 @@ class RTCTransportProtocol : public TransportProtocol { std::size_t transportHeaderLength() override; + auto shared_from_this() { return utils::shared_from(this); } + private: enum class SyncState { catch_up = 0, in_sync = 1, last }; @@ -76,6 +80,7 @@ class RTCTransportProtocol : public TransportProtocol { void onPacketDropped(Interest &interest, ContentObject &content_object, const std::error_code &reason) override {} void onReassemblyFailed(std::uint32_t missing_segment) override {} + void processManifest(Interest &interest, ContentObject &manifest); // interaction with app functions void sendStatsToApp(uint32_t retx_count, uint32_t received_bytes, @@ -84,11 +89,20 @@ class RTCTransportProtocol : public TransportProtocol { uint32_t received_nacks, uint32_t received_fec); // FEC functions - void onFecPackets(std::vector<std::pair<uint32_t, fec::buffer>> &packets); + void onFecPackets(fec::BufferArray &packets); + + // Utils + ContentObject::Ptr removeFecHeader(const ContentObject &content_object); + ContentObject::Ptr toContentObject(const Name &name, Packet::Format format, + PayloadType payload_type, + const uint8_t *payload, + std::size_t payload_size, + std::size_t additional_header_size = 0); // protocol state bool start_send_interest_; SyncState current_state_; + // cwin vars uint32_t current_sync_win_; uint32_t max_sync_win_; @@ -120,6 +134,10 @@ class RTCTransportProtocol : public TransportProtocol { std::shared_ptr<RTCState> state_; std::shared_ptr<RTCRateControl> rc_; std::shared_ptr<RTCLossDetectionAndRecovery> ldr_; + std::shared_ptr<RTCVerifier> verifier_; + + // forwarding strategy selection + RTCForwardingStrategy fwd_strategy_; uint32_t number_; }; diff --git a/libtransport/src/protocols/rtc/rtc_consts.h b/libtransport/src/protocols/rtc/rtc_consts.h index d04bc1b1f..03efd8e84 100644 --- a/libtransport/src/protocols/rtc/rtc_consts.h +++ b/libtransport/src/protocols/rtc/rtc_consts.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2021 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -45,22 +45,22 @@ const uint32_t MAX_INTERESTS_IN_BATCH = 5; // number of seq numbers per const uint32_t WAIT_FOR_INTEREST_BATCH = 20; // msec. timer that we wait to try // to aggregate interest in the // same packet -const uint32_t MAX_PACING_BATCH = 5; -// number of interest that we can send inside -// the loop before they get dropped by the -// kernel. +const uint32_t MAX_PACING_BATCH = 5; // number of interest that we can send + // inside the loop before they get dropped + // by the kernel. const uint32_t PACING_WAIT = 1000; // usec to wait betwing two pacing batch. As // for MAX_PACING_BATCH this value was // computed during tests const uint32_t MAX_RTX_IN_BATCH = 10; // max rtx to send in loop // packet const -const uint32_t HICN_HEADER_SIZE = 40 + 20; // IPv6 + TCP bytes const uint32_t RTC_INTEREST_LIFETIME = 2000; // probes sequence range const uint32_t MIN_PROBE_SEQ = 0xefffffff; -const uint32_t MIN_RTT_PROBE_SEQ = MIN_PROBE_SEQ; +const uint32_t MIN_INIT_PROBE_SEQ = MIN_PROBE_SEQ; +const uint32_t MAX_INIT_PROBE_SEQ = 0xf7ffffff - 1; +const uint32_t MIN_RTT_PROBE_SEQ = MAX_INIT_PROBE_SEQ + 1; const uint32_t MAX_RTT_PROBE_SEQ = 0xffffffff - 1; // RTT_PROBE_INTERVAL will be used during the section while // INIT_RTT_PROBE_INTERVAL is used at the beginning to @@ -78,15 +78,16 @@ const uint32_t INIT_RTT_MIN_PROBES_TO_RECV = 5; // ms const uint32_t MAX_PENDING_PROBES = 10; // congestion -const double MAX_QUEUING_DELAY = 100.0; // ms +const double MAX_QUEUING_DELAY = 50.0; // ms // data from cache const double MAX_DATA_FROM_CACHE = 0.25; // 25% // window const -const uint32_t INITIAL_WIN = 5; // pkts -const uint32_t INITIAL_WIN_MAX = 1000000; // pkts -const uint32_t WIN_MIN = 5; // pkts +const uint32_t INITIAL_WIN = 5; // pkts +const uint32_t INITIAL_WIN_MAX = 1000000; // pkts +const uint32_t WIN_MIN = 5; // pkts +const uint32_t WIN_MIN_WITH_AGGREGARED_DATA = 10; // pkts const double CATCH_UP_WIN_INCREMENT = 1.2; // used in rate control const double WIN_DECREASE_FACTOR = 0.5; @@ -105,10 +106,8 @@ const double MOVING_AVG_ALPHA = 0.8; const double MILLI_IN_A_SEC = 1000.0; const double MICRO_IN_A_SEC = 1000000.0; - -const double MAX_CACHED_PACKETS = 262144; // 2^18 - // about 50 sec of traffic at 50Mbps - // with 1200 bytes packets +const uint32_t ROUNDS_PER_SEC = (uint32_t)(MILLI_IN_A_SEC / ROUND_LEN); +const uint32_t ROUNDS_PER_MIN = (uint32_t)ROUNDS_PER_SEC * 60; const uint32_t MAX_ROUND_WHIOUT_PACKETS = (20 * MILLI_IN_A_SEC) / ROUND_LEN; // 20 sec in rounds; @@ -120,12 +119,24 @@ const uint64_t MAX_TIMER_RTX = ~0; const uint32_t SENTINEL_TIMER_INTERVAL = 100; // ms const uint32_t MAX_RTX_WITH_SENTINEL = 10; // packets const double CATCH_UP_RTT_INCREMENT = 1.2; +const double MAX_RESIDUAL_LOSS_RATE = 2.0; // % +const uint32_t WAIT_BEFORE_FEC_UPDATE = ROUNDS_PER_SEC * 5; // used by producer const uint32_t PRODUCER_STATS_INTERVAL = 200; // ms -const uint32_t MIN_PRODUCTION_RATE = 10; // pps - // min prod rate - // set running several test +const uint32_t MIN_PRODUCTION_RATE = 25; // pps, equal to min window * + // rounds in a second +const uint32_t NACK_DELAY = 1500; // ms +const uint32_t FEC_PACING_TIME = 5; // ms + +// aggregated data consts +const uint16_t MAX_RTC_PAYLOAD_SIZE = 1200; // bytes +const uint16_t MAX_AGGREGATED_PACKETS = 5; // pkt +const uint32_t AGGREGATED_PACKETS_TIMER = 2; // ms + +// alert thresholds +const uint32_t MAX_RTT = 200; // ms +const double MAX_RESIDUAL_LOSSES = 0.05; // % } // namespace rtc diff --git a/libtransport/src/protocols/rtc/rtc_data_path.cc b/libtransport/src/protocols/rtc/rtc_data_path.cc index c098088a3..b3abf5ea8 100644 --- a/libtransport/src/protocols/rtc/rtc_data_path.cc +++ b/libtransport/src/protocols/rtc/rtc_data_path.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -13,14 +13,17 @@ * limitations under the License. */ +#include <hicn/transport/utils/chrono_typedefs.h> #include <protocols/rtc/rtc_data_path.h> #include <stdlib.h> #include <algorithm> #include <cfloat> #include <chrono> +#include <cmath> #define MAX_ROUNDS_WITHOUT_PKTS 10 // 2sec +#define AVG_RTT_TIME 1000 // (ms) 1sec namespace transport { @@ -32,6 +35,8 @@ RTCDataPath::RTCDataPath(uint32_t path_id) : path_id_(path_id), min_rtt(UINT_MAX), prev_min_rtt(UINT_MAX), + max_rtt(0), + prev_max_rtt(0), min_owd(INT_MAX), // this is computed like in LEDBAT, so it is not the // real OWD, but the measured one, that depends on the // clock of sender and receiver. the only meaningful @@ -46,20 +51,46 @@ RTCDataPath::RTCDataPath(uint32_t path_id) largest_recv_seq_(0), largest_recv_seq_time_(0), avg_inter_arrival_(DBL_MAX), + rtt_sum_(0), + last_avg_rtt_compute_(0), + rtt_samples_(0), + avg_rtt_(0.0), received_nacks_(false), - received_packets_(false), + received_packets_(0), rounds_without_packets_(0), last_received_data_packet_(0), - RTT_history_(HISTORY_LEN), + min_RTT_history_(HISTORY_LEN), + max_RTT_history_(HISTORY_LEN), OWD_history_(HISTORY_LEN){}; -void RTCDataPath::insertRttSample(uint64_t rtt) { - // for the rtt we only keep track of the min one +void RTCDataPath::insertRttSample( + const utils::SteadyTime::Milliseconds& rtt_milliseconds, bool is_probe) { + // compute min rtt + uint64_t rtt = rtt_milliseconds.count(); if (rtt < min_rtt) min_rtt = rtt; - last_received_data_packet_ = - std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + + uint64_t now = utils::SteadyTime::nowMs().count(); + last_received_data_packet_ = now; + + // compute avg rtt + if (is_probe) { + // max rtt is computed only on probes to avoid to take into account the + // production time at the server + if (rtt > max_rtt) max_rtt = rtt; + + rtt_sum_ += rtt; + rtt_samples_++; + } + + if ((now - last_avg_rtt_compute_) >= AVG_RTT_TIME) { + // compute a new avg rtt + // if rtt_samples_ = 0 keep the same rtt + if (rtt_samples_ != 0) avg_rtt_ = (double)rtt_sum_ / (double)rtt_samples_; + + rtt_sum_ = 0; + rtt_samples_ = 0; + last_avg_rtt_compute_ = now; + } } void RTCDataPath::insertOwdSample(int64_t owd) { @@ -87,15 +118,13 @@ void RTCDataPath::insertOwdSample(int64_t owd) { // owd is computed only for valid data packets so we count only // this for decide if we recevie traffic or not - received_packets_ = true; + received_packets_++; } void RTCDataPath::computeInterArrivalGap(uint32_t segment_number) { // got packet in sequence, compute gap if (largest_recv_seq_ == (segment_number - 1)) { - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + uint64_t now = utils::SteadyTime::nowMs().count(); uint64_t delta = now - largest_recv_seq_time_; largest_recv_seq_ = segment_number; largest_recv_seq_time_ = now; @@ -110,10 +139,7 @@ void RTCDataPath::computeInterArrivalGap(uint32_t segment_number) { // ooo packet, update the stasts if needed if (largest_recv_seq_ <= segment_number) { largest_recv_seq_ = segment_number; - largest_recv_seq_time_ = - std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + largest_recv_seq_time_ = utils::SteadyTime::nowMs().count(); } } @@ -146,10 +172,20 @@ void RTCDataPath::roundEnd() { min_rtt = prev_min_rtt; } + // same for max_rtt + if (max_rtt != 0) { + prev_max_rtt = max_rtt; + } else { + max_rtt = prev_max_rtt; + } + if (min_rtt == 0) min_rtt = 1; + if (max_rtt == 0) max_rtt = 1; - RTT_history_.pushBack(min_rtt); + min_RTT_history_.pushBack(min_rtt); + max_RTT_history_.pushBack(max_rtt); min_rtt = UINT_MAX; + max_rtt = 0; // do the same for min owd if (min_owd != INT_MAX) { @@ -163,32 +199,47 @@ void RTCDataPath::roundEnd() { min_owd = INT_MAX; } - if (!received_packets_) + if (received_packets_ == 0) rounds_without_packets_++; else rounds_without_packets_ = 0; - received_packets_ = false; + received_packets_ = 0; } uint32_t RTCDataPath::getPathId() { return path_id_; } -double RTCDataPath::getQueuingDealy() { return queuing_delay; } +double RTCDataPath::getQueuingDealy() { + if (queuing_delay == DBL_MAX) return 0; + return queuing_delay; +} uint64_t RTCDataPath::getMinRtt() { - if (RTT_history_.size() != 0) return RTT_history_.begin(); + if (min_RTT_history_.size() != 0) return min_RTT_history_.begin(); + return 0; +} + +uint64_t RTCDataPath::getAvgRtt() { return std::round(avg_rtt_); } + +uint64_t RTCDataPath::getMaxRtt() { + if (max_RTT_history_.size() != 0) return max_RTT_history_.begin(); return 0; } int64_t RTCDataPath::getMinOwd() { if (OWD_history_.size() != 0) return OWD_history_.begin(); - return 0; + return INT_MAX; } double RTCDataPath::getJitter() { return jitter_; } uint64_t RTCDataPath::getLastPacketTS() { return last_received_data_packet_; } -void RTCDataPath::clearRtt() { RTT_history_.clear(); } +uint32_t RTCDataPath::getPacketsLastRound() { return received_packets_; } + +void RTCDataPath::clearRtt() { + min_RTT_history_.clear(); + max_RTT_history_.clear(); +} } // end namespace rtc diff --git a/libtransport/src/protocols/rtc/rtc_data_path.h b/libtransport/src/protocols/rtc/rtc_data_path.h index c5c37fc0d..5afbbb87f 100644 --- a/libtransport/src/protocols/rtc/rtc_data_path.h +++ b/libtransport/src/protocols/rtc/rtc_data_path.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -15,7 +15,9 @@ #pragma once +#include <hicn/transport/utils/chrono_typedefs.h> #include <stdint.h> +#include <utils/max_filter.h> #include <utils/min_filter.h> #include <climits> @@ -34,19 +36,23 @@ class RTCDataPath { RTCDataPath(uint32_t path_id); public: - void insertRttSample(uint64_t rtt); + void insertRttSample(const utils::SteadyTime::Milliseconds &rtt, + bool is_probe); void insertOwdSample(int64_t owd); void computeInterArrivalGap(uint32_t segment_number); void receivedNack(); uint32_t getPathId(); uint64_t getMinRtt(); + uint64_t getAvgRtt(); + uint64_t getMaxRtt(); double getQueuingDealy(); double getInterArrivalGap(); double getJitter(); bool isActive(); bool pathToProducer(); uint64_t getLastPacketTS(); + uint32_t getPacketsLastRound(); void clearRtt(); @@ -60,6 +66,9 @@ class RTCDataPath { uint64_t min_rtt; uint64_t prev_min_rtt; + uint64_t max_rtt; + uint64_t prev_max_rtt; + int64_t min_owd; int64_t prev_min_owd; @@ -74,19 +83,26 @@ class RTCDataPath { uint64_t largest_recv_seq_time_; double avg_inter_arrival_; + // compute the avg rtt over one sec + uint64_t rtt_sum_; + uint64_t last_avg_rtt_compute_; + uint32_t rtt_samples_; + double avg_rtt_; + // flags to check if a path is active // we considere a path active if it reaches a producer //(not a cache) --aka we got at least one nack on this path-- // and if we receives packets bool received_nacks_; - bool received_packets_; - uint8_t rounds_without_packets_; // if we don't get any packet + uint32_t received_packets_; + uint32_t rounds_without_packets_; // if we don't get any packet // for MAX_ROUNDS_WITHOUT_PKTS // we consider the path inactive uint64_t last_received_data_packet_; // timestamp for the last data received // on this path - utils::MinFilter<uint64_t> RTT_history_; + utils::MinFilter<uint64_t> min_RTT_history_; + utils::MaxFilter<uint64_t> max_RTT_history_; utils::MinFilter<int64_t> OWD_history_; }; diff --git a/libtransport/src/protocols/rtc/rtc_forwarding_strategy.cc b/libtransport/src/protocols/rtc/rtc_forwarding_strategy.cc new file mode 100644 index 000000000..9503eed3e --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_forwarding_strategy.cc @@ -0,0 +1,155 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/interfaces/notification.h> +#include <protocols/rtc/rtc_forwarding_strategy.h> + +namespace transport { + +namespace protocol { + +namespace rtc { + +using namespace transport::interface; + +RTCForwardingStrategy::RTCForwardingStrategy() + : init_(false), + forwarder_set_(false), + selected_strategy_(NONE), + current_strategy_(NONE), + rounds_since_last_set_(0), + portal_(nullptr), + state_(nullptr) {} + +RTCForwardingStrategy::~RTCForwardingStrategy() {} + +void RTCForwardingStrategy::setCallback(interface::StrategyCallback* callback) { + callback_ = callback; +} + +void RTCForwardingStrategy::initFwdStrategy( + std::shared_ptr<core::Portal> portal, core::Prefix& prefix, RTCState* state, + strategy_t strategy) { + init_ = true; + selected_strategy_ = strategy; + if (strategy == BOTH) + current_strategy_ = BEST_PATH; + else + current_strategy_ = strategy; + rounds_since_last_set_ = 0; + prefix_ = prefix; + portal_ = portal; + state_ = state; +} + +void RTCForwardingStrategy::checkStrategy() { + if (*callback_) { + strategy_t used_strategy = selected_strategy_; + if (used_strategy == BOTH) used_strategy = current_strategy_; + assert(used_strategy == BEST_PATH || used_strategy == REPLICATION || + used_strategy == NONE); + + notification::ForwardingStrategy strategy = + notification::ForwardingStrategy::NONE; + switch (used_strategy) { + case BEST_PATH: + strategy = notification::ForwardingStrategy::BEST_PATH; + break; + case REPLICATION: + strategy = notification::ForwardingStrategy::REPLICATION; + break; + default: + break; + } + + (*callback_)(strategy); + } + + if (!init_) return; + + if (selected_strategy_ == NONE) return; + + if (selected_strategy_ == BEST_PATH) { + checkStrategyBestPath(); + return; + } + + if (selected_strategy_ == REPLICATION) { + checkStrategyReplication(); + return; + } + + checkStrategyBoth(); +} + +void RTCForwardingStrategy::checkStrategyBestPath() { + if (!forwarder_set_) { + setStrategy(BEST_PATH); + forwarder_set_ = true; + return; + } + + uint8_t qs = state_->getQualityScore(); + + if (qs >= 4 || rounds_since_last_set_ < 25) { // wait a least 5 sec + // between each switch + rounds_since_last_set_++; + return; + } + + // try to switch path + setStrategy(BEST_PATH); +} + +void RTCForwardingStrategy::checkStrategyReplication() { + if (!forwarder_set_) { + setStrategy(REPLICATION); + forwarder_set_ = true; + return; + } + + // here we have nothing to do for the moment + return; +} + +void RTCForwardingStrategy::checkStrategyBoth() { + if (!forwarder_set_) { + setStrategy(current_strategy_); + forwarder_set_ = true; + return; + } + + checkStrategyBestPath(); + + // TODO + // for the moment we use only best path. + // but later: + // 1. if both paths are bad use replication + // 2. while using replication compute the effectiveness. if the majority of + // the packets are coming from a single path, try to use bestpath +} + +void RTCForwardingStrategy::setStrategy(strategy_t strategy) { + rounds_since_last_set_ = 0; + current_strategy_ = strategy; + portal_->setForwardingStrategy(prefix_, + string_strategies_[current_strategy_]); +} + +} // namespace rtc + +} // namespace protocol + +} // namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_forwarding_strategy.h b/libtransport/src/protocols/rtc/rtc_forwarding_strategy.h new file mode 100644 index 000000000..821b28051 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_forwarding_strategy.h @@ -0,0 +1,78 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <core/portal.h> +#include <hicn/transport/interfaces/callbacks.h> +#include <protocols/rtc/rtc_state.h> + +#include <array> + +namespace transport { + +namespace protocol { + +namespace rtc { + +class RTCForwardingStrategy { + public: + enum strategy_t { + BEST_PATH, + REPLICATION, + BOTH, + NONE, + }; + + RTCForwardingStrategy(); + ~RTCForwardingStrategy(); + + void initFwdStrategy(std::shared_ptr<core::Portal> portal, + core::Prefix& prefix, RTCState* state, + strategy_t strategy); + + void checkStrategy(); + void setCallback(interface::StrategyCallback* callback); + + private: + void checkStrategyBestPath(); + void checkStrategyReplication(); + void checkStrategyBoth(); + + void setStrategy(strategy_t strategy); + + std::array<std::string, 4> string_strategies_ = {"bestpath", "replication", + "both", "none"}; + + bool init_; // true if all val are initializes + bool forwarder_set_; // true if the strategy is been set at least + // once + strategy_t selected_strategy_; // this is the strategy selected using socket + // options. this can also be equal to BOTH + strategy_t current_strategy_; // if both strategies can be used this + // indicates the one that is currently in use + // that can be only replication or best path + uint32_t rounds_since_last_set_; + core::Prefix prefix_; + std::shared_ptr<core::Portal> portal_; + RTCState* state_; + interface::StrategyCallback* callback_; +}; + +} // namespace rtc + +} // namespace protocol + +} // namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_indexer.h b/libtransport/src/protocols/rtc/rtc_indexer.h index 4aee242bb..cda156b22 100644 --- a/libtransport/src/protocols/rtc/rtc_indexer.h +++ b/libtransport/src/protocols/rtc/rtc_indexer.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -18,8 +18,10 @@ #include <protocols/errors.h> #include <protocols/fec_utils.h> #include <protocols/indexer.h> +#include <protocols/rtc/probe_handler.h> #include <protocols/rtc/rtc_consts.h> #include <protocols/transport_protocol.h> +#include <utils/suffix_strategy.h> #include <deque> @@ -54,7 +56,7 @@ class RtcIndexer : public Indexer { n_fec_ = 0; } - uint32_t checkNextSuffix() override { return next_suffix_; } + uint32_t checkNextSuffix() const override { return next_suffix_; } uint32_t getNextSuffix() override { if (isFec(next_suffix_)) { @@ -77,7 +79,7 @@ class RtcIndexer : public Indexer { first_suffix_ = suffix % LIMIT; } - uint32_t getFirstSuffix() override { return first_suffix_; } + uint32_t getFirstSuffix() const override { return first_suffix_; } uint32_t jumpToIndex(uint32_t index) override { next_suffix_ = index % LIMIT; @@ -87,30 +89,8 @@ class RtcIndexer : public Indexer { void onContentObject(core::Interest &interest, core::ContentObject &content_object, bool reassembly) override { - setVerifier(); - auto ret = verifier_->verifyPackets(&content_object); - - switch (ret) { - case auth::VerificationPolicy::ACCEPT: { - if (reassembly) { - reassembly_->reassemble(content_object); - } - break; - } - - case auth::VerificationPolicy::UNKNOWN: - case auth::VerificationPolicy::DROP: { - transport_->onPacketDropped( - interest, content_object, - make_error_code(protocol_error::verification_failed)); - break; - } - - case auth::VerificationPolicy::ABORT: { - transport_->onContentReassembled( - make_error_code(protocol_error::session_aborted)); - break; - } + if (reassembly) { + reassembly_->reassemble(content_object); } } @@ -120,13 +100,12 @@ class RtcIndexer : public Indexer { uint32_t getNextReassemblySegment() override { throw errors::RuntimeException( "Get reassembly segment called on rtc indexer. RTC indexer does not " - "provide " - "reassembly."); + "provide reassembly."); } bool isFinalSuffixDiscovered() override { return true; } - uint32_t getFinalSuffix() override { return LIMIT; } + uint32_t getFinalSuffix() const override { return LIMIT; } void enableFec(fec::FECType fec_type) override { fec_type_ = fec_type; } @@ -137,13 +116,13 @@ class RtcIndexer : public Indexer { n_current_fec_ = n_fec_; } - uint32_t getNFec() override { return n_fec_; } + uint32_t getNFec() const override { return n_fec_; } bool isFec(uint32_t index) override { return isFec(fec_type_, index, first_suffix_); } - double getFecOverhead() override { + double getFecOverhead() const override { if (fec_type_ == fec::FECType::UNKNOWN) { return 0; } @@ -152,7 +131,7 @@ class RtcIndexer : public Indexer { return (double)n_fec_ / k; } - double getMaxFecOverhead() override { + double getMaxFecOverhead() const override { if (fec_type_ == fec::FECType::UNKNOWN) { return 0; } diff --git a/libtransport/src/protocols/rtc/rtc_ldr.cc b/libtransport/src/protocols/rtc/rtc_ldr.cc index f0de48871..1ca1cf48d 100644 --- a/libtransport/src/protocols/rtc/rtc_ldr.cc +++ b/libtransport/src/protocols/rtc/rtc_ldr.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2021 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -16,6 +16,12 @@ #include <glog/logging.h> #include <protocols/rtc/rtc_consts.h> #include <protocols/rtc/rtc_ldr.h> +#include <protocols/rtc/rtc_rs_delay.h> +#include <protocols/rtc/rtc_rs_fec_only.h> +#include <protocols/rtc/rtc_rs_low_rate.h> +#include <protocols/rtc/rtc_rs_recovery_off.h> +#include <protocols/rtc/rtc_rs_rtx_only.h> +#include <protocols/rtc/rtc_state.h> #include <algorithm> #include <unordered_set> @@ -27,146 +33,115 @@ namespace protocol { namespace rtc { RTCLossDetectionAndRecovery::RTCLossDetectionAndRecovery( - Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service) - : rtx_on_(false), - fec_on_(false), - next_rtx_timer_(MAX_TIMER_RTX), - last_event_(0), - sentinel_timer_interval_(MAX_TIMER_RTX), - indexer_(indexer), - send_rtx_callback_(std::move(callback)) { - timer_ = std::make_unique<asio::steady_timer>(io_service); - sentinel_timer_ = std::make_unique<asio::steady_timer>(io_service); + Indexer *indexer, asio::io_service &io_service, + interface::RtcTransportRecoveryStrategies type, + RecoveryStrategy::SendRtxCallback &&callback, + interface::StrategyCallback *external_callback) { + rs_type_ = type; + if (type == interface::RtcTransportRecoveryStrategies::RECOVERY_OFF) { + rs_ = std::make_shared<RecoveryStrategyRecoveryOff>( + indexer, std::move(callback), io_service, external_callback); + } else if (type == interface::RtcTransportRecoveryStrategies::DELAY_BASED) { + rs_ = std::make_shared<RecoveryStrategyDelayBased>( + indexer, std::move(callback), io_service, external_callback); + } else if (type == interface::RtcTransportRecoveryStrategies::FEC_ONLY) { + rs_ = std::make_shared<RecoveryStrategyFecOnly>( + indexer, std::move(callback), io_service, external_callback); + } else if (type == interface::RtcTransportRecoveryStrategies::LOW_RATE || + type == interface::RtcTransportRecoveryStrategies:: + LOW_RATE_AND_BESTPATH || + type == interface::RtcTransportRecoveryStrategies:: + LOW_RATE_AND_REPLICATION || + type == interface::RtcTransportRecoveryStrategies:: + LOW_RATE_AND_ALL_FWD_STRATEGIES) { + rs_ = std::make_shared<RecoveryStrategyLowRate>( + indexer, std::move(callback), io_service, external_callback); + } else { + // default + rs_type_ = interface::RtcTransportRecoveryStrategies::RTX_ONLY; + rs_ = std::make_shared<RecoveryStrategyRtxOnly>( + indexer, std::move(callback), io_service, external_callback); + } } RTCLossDetectionAndRecovery::~RTCLossDetectionAndRecovery() {} -void RTCLossDetectionAndRecovery::turnOnRTX() { - rtx_on_ = true; - scheduleSentinelTimer(state_->getRTT() * CATCH_UP_RTT_INCREMENT); -} - -void RTCLossDetectionAndRecovery::turnOffRTX() { - rtx_on_ = false; - clear(); -} - -uint32_t RTCLossDetectionAndRecovery::computeFecPacketsToAsk(bool in_sync) { - uint32_t current_fec = indexer_->getNFec(); - double current_loss_rate = state_->getLossRate(); - double last_loss_rate = state_->getLastRoundLossRate(); - - // when in sync ask for fec only if there are losses for 2 rounds - if (in_sync && current_fec == 0 && - (current_loss_rate == 0 || last_loss_rate == 0)) - return 0; - - double loss_rate = state_->getMaxLossRate() * 1.5; - - if (!in_sync && loss_rate == 0) loss_rate = 0.05; - if (loss_rate > 0.5) loss_rate = 0.5; - - double exp_losses = (double)k_ * loss_rate; - uint32_t fec_to_ask = ceil(exp_losses / (1 - loss_rate)); - - if (fec_to_ask > (n_ - k_)) fec_to_ask = n_ - k_; - - return fec_to_ask; +void RTCLossDetectionAndRecovery::changeRecoveryStrategy( + interface::RtcTransportRecoveryStrategies type) { + if (type == rs_type_) return; + + rs_type_ = type; + if (type == interface::RtcTransportRecoveryStrategies::RECOVERY_OFF) { + rs_ = + std::make_shared<RecoveryStrategyRecoveryOff>(std::move(*(rs_.get()))); + } else if (type == interface::RtcTransportRecoveryStrategies::DELAY_BASED) { + rs_ = std::make_shared<RecoveryStrategyDelayBased>(std::move(*(rs_.get()))); + } else if (type == interface::RtcTransportRecoveryStrategies::FEC_ONLY) { + rs_ = std::make_shared<RecoveryStrategyFecOnly>(std::move(*(rs_.get()))); + } else if (type == interface::RtcTransportRecoveryStrategies::LOW_RATE || + type == interface::RtcTransportRecoveryStrategies:: + LOW_RATE_AND_BESTPATH || + type == interface::RtcTransportRecoveryStrategies:: + LOW_RATE_AND_REPLICATION || + type == interface::RtcTransportRecoveryStrategies:: + LOW_RATE_AND_ALL_FWD_STRATEGIES) { + rs_ = std::make_shared<RecoveryStrategyLowRate>(std::move(*(rs_.get()))); + } else { + // default + rs_ = std::make_shared<RecoveryStrategyRtxOnly>(std::move(*(rs_.get()))); + } } void RTCLossDetectionAndRecovery::onNewRound(bool in_sync) { - uint64_t rtt = state_->getRTT(); - if (!fec_on_ && rtt >= 100) { - // turn on fec, here we may have no info so ask for all packets - fec_on_ = true; - turnOffRTX(); - indexer_->setNFec(computeFecPacketsToAsk(in_sync)); - return; - } - - if (fec_on_ && rtt > 80) { - // keep using fec, maybe update it - indexer_->setNFec(computeFecPacketsToAsk(in_sync)); - return; - } - - if ((fec_on_ && rtt <= 80) || (!rtx_on_ && rtt <= 100)) { - // turn on rtx - fec_on_ = false; - indexer_->setNFec(0); - turnOnRTX(); - return; - } + rs_->incRoundId(); + rs_->onNewRound(in_sync); } -void RTCLossDetectionAndRecovery::onTimeout(uint32_t seq) { - // always add timeouts to the RTX list to avoid to send the same packet as if - // it was not a rtx - addToRetransmissions(seq, seq + 1); - last_event_ = getNow(); +void RTCLossDetectionAndRecovery::onTimeout(uint32_t seq, bool lost) { + if (!lost) { + detectLoss(seq, seq + 1); + } else { + rs_->onLostTimeout(seq); + } } void RTCLossDetectionAndRecovery::onPacketRecoveredFec(uint32_t seq) { - // if an RTX is scheduled for a packet recovered using FEC delete it - deleteRtx(seq); - recover_with_fec_.erase(seq); + rs_->receivedPacket(seq); } void RTCLossDetectionAndRecovery::onDataPacketReceived( const core::ContentObject &content_object) { - last_event_ = getNow(); - uint32_t seq = content_object.getName().getSuffix(); - if (deleteRtx(seq)) { - state_->onPacketRecoveredRtx(seq); - } else { - DLOG_IF(INFO, VLOG_IS_ON(3)) - << "received data. add from " - << state_->getHighestSeqReceivedInOrder() + 1 << " to " << seq; - addToRetransmissions(state_->getHighestSeqReceivedInOrder() + 1, seq); - } + bool is_rtx = rs_->isRtx(seq); + rs_->receivedPacket(seq); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "received data. add from " + << rs_->getState()->getHighestSeqReceivedInOrder() + 1 << " to " << seq; + if (!is_rtx) + detectLoss(rs_->getState()->getHighestSeqReceivedInOrder() + 1, seq); } void RTCLossDetectionAndRecovery::onNackPacketReceived( const core::ContentObject &nack) { - last_event_ = getNow(); - - uint32_t seq = nack.getName().getSuffix(); - struct nack_packet_t *nack_pkt = (struct nack_packet_t *)nack.getPayload()->data(); - uint32_t production_seq = nack_pkt->getProductionSegement(); + uint32_t production_seq = nack_pkt->getProductionSegment(); + uint32_t seq = nack.getName().getSuffix(); - if (production_seq > seq) { - // this is a past nack, all data before productionSeq are lost. if - // productionSeq > state_->getHighestSeqReceivedInOrder() is impossible to - // recover any packet. If this is not the case we can try to recover the - // packets between state_->getHighestSeqReceivedInOrder() and productionSeq. - // e.g.: the client receives packets 8 10 11 9 where 9 is a nack with - // productionSeq = 14. 9 is lost but we can try to recover packets 12 13 and - // 14 that are not arrived yet - deleteRtx(seq); - DLOG_IF(INFO, VLOG_IS_ON(3)) << "received past nack. add from " - << state_->getHighestSeqReceivedInOrder() + 1 - << " to " << production_seq; - addToRetransmissions(state_->getHighestSeqReceivedInOrder() + 1, - production_seq); - } else { - // future nack. here there should be a gap between the last data received - // and this packet and is it possible to recover the packets between the - // last received data and the production seq. we should not use the seq - // number of the nack since we know that is too early to ask for this seq - // number - // e.g.: // e.g.: the client receives packets 10 11 12 20 where 20 is a nack - // with productionSeq = 18. this says that all the packets between 12 and 18 - // may got lost and we should ask them - deleteRtx(seq); - DLOG_IF(INFO, VLOG_IS_ON(3)) << "received futrue nack. add from " - << state_->getHighestSeqReceivedInOrder() + 1 - << " to " << production_seq; - addToRetransmissions(state_->getHighestSeqReceivedInOrder() + 1, - production_seq); - } + // received a nack. we can try to recover all data packets between the last + // received data and the production seq in the nack. this is similar to the + // recption of a probe + // e.g.: the client receives packets 10 11 12 20 where 20 is a nack + // with productionSeq = 18. this says that all the packets between 12 and 18 + // may got lost and we should ask them + + rs_->receivedPacket(seq); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "received nack. add from " + << rs_->getState()->getHighestSeqReceivedInOrder() + 1 << " to " + << production_seq; + detectLoss(rs_->getState()->getHighestSeqReceivedInOrder() + 1, + production_seq); } void RTCLossDetectionAndRecovery::onProbePacketReceived( @@ -174,336 +149,38 @@ void RTCLossDetectionAndRecovery::onProbePacketReceived( // we don't log the reception of a probe packet for the sentinel timer because // probes are not taken into account into the sync window. we use them as // future nacks to detect possible packets lost - struct nack_packet_t *probe_pkt = - (struct nack_packet_t *)probe.getPayload()->data(); - uint32_t production_seq = probe_pkt->getProductionSegement(); + + uint32_t production_seq = RTCState::getProbeParams(probe).prod_seg; + DLOG_IF(INFO, VLOG_IS_ON(3)) << "received probe. add from " - << state_->getHighestSeqReceivedInOrder() + 1 << " to " << production_seq; + << rs_->getState()->getHighestSeqReceivedInOrder() + 1 << " to " + << production_seq; - addToRetransmissions(state_->getHighestSeqReceivedInOrder() + 1, - production_seq); + detectLoss(rs_->getState()->getHighestSeqReceivedInOrder() + 1, + production_seq); } -void RTCLossDetectionAndRecovery::clear() { - rtx_state_.clear(); - rtx_timers_.clear(); - sentinel_timer_->cancel(); - if (next_rtx_timer_ != MAX_TIMER_RTX) { - next_rtx_timer_ = MAX_TIMER_RTX; - timer_->cancel(); - } -} +void RTCLossDetectionAndRecovery::detectLoss(uint32_t start, uint32_t stop) { + if (start >= stop) return; -void RTCLossDetectionAndRecovery::addToRetransmissions(uint32_t start, - uint32_t stop) { // skip nacked packets - if (start <= state_->getLastSeqNacked()) { - start = state_->getLastSeqNacked() + 1; + if (start <= rs_->getState()->getLastSeqNacked()) { + start = rs_->getState()->getLastSeqNacked() + 1; } // skip received or lost packets - if (start <= state_->getHighestSeqReceivedInOrder()) { - start = state_->getHighestSeqReceivedInOrder() + 1; + if (start <= rs_->getState()->getHighestSeqReceivedInOrder()) { + start = rs_->getState()->getHighestSeqReceivedInOrder() + 1; } for (uint32_t seq = start; seq < stop; seq++) { - if (state_->isReceivedOrLost(seq) == PacketState::UNKNOWN) { - if (rtx_on_) { - if (!indexer_->isFec(seq)) { - // handle it with rtx - if (!isRtx(seq)) { - state_->onLossDetected(seq); - rtxState state; - state.first_send_ = state_->getInterestSentTime(seq); - if (state.first_send_ == 0) // this interest was never sent before - state.first_send_ = getNow(); - state.next_send_ = computeNextSend(seq, true); - state.rtx_count_ = 0; - DLOG_IF(INFO, VLOG_IS_ON(4)) - << "Add " << seq << " to retransmissions. next rtx is %lu " - << state.next_send_ - getNow(); - rtx_state_.insert(std::pair<uint32_t, rtxState>(seq, state)); - rtx_timers_.insert( - std::pair<uint64_t, uint32_t>(state.next_send_, seq)); - } - } else { - // is fec, do not send it - auto it = recover_with_fec_.find(seq); - if (it == recover_with_fec_.end()) { - state_->onLossDetected(seq); - recover_with_fec_.insert(seq); - } - } - } else { - // keep track of losses but recover with FEC - auto it = recover_with_fec_.find(seq); - if (it == recover_with_fec_.end()) { - state_->onLossDetected(seq); - recover_with_fec_.insert(seq); - } - } - } - } - scheduleNextRtx(); -} - -uint64_t RTCLossDetectionAndRecovery::computeNextSend(uint32_t seq, - bool new_rtx) { - uint64_t now = getNow(); - if (new_rtx) { - // for the new rtx we wait one estimated IAT after the loss detection. this - // is bacause, assuming that packets arrive with a constant IAT, we should - // get a new packet every IAT - double prod_rate = state_->getProducerRate(); - uint32_t estimated_iat = SENTINEL_TIMER_INTERVAL; - uint32_t jitter = 0; - - if (prod_rate != 0) { - double packet_size = state_->getAveragePacketSize(); - estimated_iat = ceil(1000.0 / (prod_rate / packet_size)); - jitter = ceil(state_->getJitter()); - } - - uint32_t wait = estimated_iat + jitter; - DLOG_IF(INFO, VLOG_IS_ON(3)) - << "first rtx for " << seq << " in " << wait - << " ms, rtt = " << state_->getRTT() << " ait = " << estimated_iat - << " jttr = " << jitter; - - return now + wait; - } else { - // wait one RTT - // however if the IAT is larger than the RTT, wait one IAT - uint32_t wait = SENTINEL_TIMER_INTERVAL; - - double prod_rate = state_->getProducerRate(); - if (prod_rate == 0) { - return now + SENTINEL_TIMER_INTERVAL; - } - - double packet_size = state_->getAveragePacketSize(); - uint32_t estimated_iat = ceil(1000.0 / (prod_rate / packet_size)); - - uint64_t rtt = state_->getRTT(); - if (rtt == 0) rtt = SENTINEL_TIMER_INTERVAL; - wait = rtt; - - if (estimated_iat > rtt) wait = estimated_iat; - - uint32_t jitter = ceil(state_->getJitter()); - wait += jitter; - - // it may happen that the channel is congested and we have some additional - // queuing delay to take into account - uint32_t queue = ceil(state_->getQueuing()); - wait += queue; - - DLOG_IF(INFO, VLOG_IS_ON(3)) - << "next rtx for " << seq << " in " << wait - << " ms, rtt = " << state_->getRTT() << " ait = " << estimated_iat - << " jttr = " << jitter << " queue = " << queue; - - return now + wait; - } -} - -void RTCLossDetectionAndRecovery::retransmit() { - if (rtx_timers_.size() == 0) return; - - uint64_t now = getNow(); - - auto it = rtx_timers_.begin(); - std::unordered_set<uint32_t> lost_pkt; - uint32_t sent_counter = 0; - while (it != rtx_timers_.end() && it->first <= now && - sent_counter < MAX_RTX_IN_BATCH) { - uint32_t seq = it->second; - auto rtx_it = - rtx_state_.find(seq); // this should always return a valid iter - if (rtx_it->second.rtx_count_ >= RTC_MAX_RTX || - (now - rtx_it->second.first_send_) >= RTC_MAX_AGE || - seq < state_->getLastSeqNacked()) { - // max rtx reached or packet too old or packet nacked, this packet is lost - DLOG_IF(INFO, VLOG_IS_ON(3)) - << "packet " << seq << " lost because 1) max rtx: " - << (rtx_it->second.rtx_count_ >= RTC_MAX_RTX) << " 2) max age: " - << ((now - rtx_it->second.first_send_) >= RTC_MAX_AGE) - << " 3) nacked: " << (seq < state_->getLastSeqNacked()); - lost_pkt.insert(seq); - it++; - } else { - // resend the packet - state_->onRetransmission(seq); - double prod_rate = state_->getProducerRate(); - if (prod_rate != 0) rtx_it->second.rtx_count_++; - rtx_it->second.next_send_ = computeNextSend(seq, false); - it = rtx_timers_.erase(it); - rtx_timers_.insert( - std::pair<uint64_t, uint32_t>(rtx_it->second.next_send_, seq)); - DLOG_IF(INFO, VLOG_IS_ON(3)) - << "send rtx for sequence " << seq << ", next send in " - << (rtx_it->second.next_send_ - now); - send_rtx_callback_(seq); - sent_counter++; - } - } - - // remove packets if needed - for (auto lost_it = lost_pkt.begin(); lost_it != lost_pkt.end(); lost_it++) { - uint32_t seq = *lost_it; - state_->onPacketLost(seq); - deleteRtx(seq); - } -} - -void RTCLossDetectionAndRecovery::scheduleNextRtx() { - if (rtx_timers_.size() == 0) { - // all the rtx were removed, reset timer - next_rtx_timer_ = MAX_TIMER_RTX; - return; - } - - // check if timer is alreay set - if (next_rtx_timer_ != MAX_TIMER_RTX) { - // a new check for rtx is already scheduled - if (next_rtx_timer_ > rtx_timers_.begin()->first) { - // we need to re-schedule it - timer_->cancel(); - } else { - // wait for the next timer - return; - } - } - - // set a new timer - next_rtx_timer_ = rtx_timers_.begin()->first; - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); - uint64_t wait = 1; - if (next_rtx_timer_ != MAX_TIMER_RTX && next_rtx_timer_ > now) - wait = next_rtx_timer_ - now; - - std::weak_ptr<RTCLossDetectionAndRecovery> self(shared_from_this()); - timer_->expires_from_now(std::chrono::milliseconds(wait)); - timer_->async_wait([self](std::error_code ec) { - if (ec) return; - if (auto s = self.lock()) { - s->retransmit(); - s->next_rtx_timer_ = MAX_TIMER_RTX; - s->scheduleNextRtx(); - } - }); -} - -bool RTCLossDetectionAndRecovery::deleteRtx(uint32_t seq) { - auto it_rtx = rtx_state_.find(seq); - if (it_rtx == rtx_state_.end()) return false; // rtx not found - - uint64_t ts = it_rtx->second.next_send_; - auto it_timers = rtx_timers_.find(ts); - while (it_timers != rtx_timers_.end() && it_timers->first == ts) { - if (it_timers->second == seq) { - rtx_timers_.erase(it_timers); - break; - } - it_timers++; - } - - bool lost = it_rtx->second.rtx_count_ > 0; - rtx_state_.erase(it_rtx); - - return lost; -} - -void RTCLossDetectionAndRecovery::scheduleSentinelTimer( - uint64_t expires_from_now) { - std::weak_ptr<RTCLossDetectionAndRecovery> self(shared_from_this()); - sentinel_timer_->expires_from_now( - std::chrono::milliseconds(expires_from_now)); - sentinel_timer_->async_wait([self](std::error_code ec) { - if (ec) return; - if (auto s = self.lock()) { - s->sentinelTimer(); - } - }); -} - -void RTCLossDetectionAndRecovery::sentinelTimer() { - uint64_t now = getNow(); - - bool expired = false; - bool sent = false; - if ((now - last_event_) >= sentinel_timer_interval_) { - // at least a sentinel_timer_interval_ elapsed since last event - expired = true; - if (TRANSPORT_EXPECT_FALSE(!state_->isProducerActive())) { - // this happens at the beginning (or if the producer stops for some - // reason) we need to keep sending interest 0 until we get an answer - DLOG_IF(INFO, VLOG_IS_ON(3)) - << "sentinel timer: the producer is not active, send packet 0"; - state_->onRetransmission(0); - send_rtx_callback_(0); - } else { - DLOG_IF(INFO, VLOG_IS_ON(3)) << "sentinel timer: the producer is active, " - "send the 10 oldest packets"; - sent = true; - uint32_t rtx = 0; - auto it = state_->getPendingInterestsMapBegin(); - auto end = state_->getPendingInterestsMapEnd(); - while (it != end && rtx < MAX_RTX_WITH_SENTINEL) { - uint32_t seq = it->first; - DLOG_IF(INFO, VLOG_IS_ON(3)) - << "sentinel timer, add " << seq << " to the rtx list"; - addToRetransmissions(seq, seq + 1); - rtx++; - it++; - } - } - } else { - // sentinel timer did not expire because we registered at least one event - } - - uint32_t next_timer; - double prod_rate = state_->getProducerRate(); - if (TRANSPORT_EXPECT_FALSE(!state_->isProducerActive()) || prod_rate == 0) { - DLOG_IF(INFO, VLOG_IS_ON(3)) << "next timer in " << SENTINEL_TIMER_INTERVAL; - next_timer = SENTINEL_TIMER_INTERVAL; - } else { - double prod_rate = state_->getProducerRate(); - double packet_size = state_->getAveragePacketSize(); - uint32_t estimated_iat = ceil(1000.0 / (prod_rate / packet_size)); - uint32_t jitter = ceil(state_->getJitter()); - - // try to reduce the number of timers if the estimated IAT is too small - next_timer = std::max((estimated_iat + jitter) * 20, (uint32_t)1); - DLOG_IF(INFO, VLOG_IS_ON(3)) - << "next sentinel in " << next_timer - << " ms, rate: " << ((prod_rate * 8.0) / 1000000.0) - << ", iat: " << estimated_iat << ", jitter: " << jitter; - - if (!expired) { - // discount the amout of time that is already passed - uint32_t discount = now - last_event_; - if (next_timer > discount) { - next_timer = next_timer - discount; - } else { - // in this case we trigger the timer in 1 ms - next_timer = 1; + if (rs_->getState()->getPacketState(seq) == PacketState::UNKNOWN) { + if (rs_->lossDetected(seq)) { + rs_->getState()->onLossDetected(seq); } - DLOG_IF(INFO, VLOG_IS_ON(3)) << "timer after discout: " << next_timer; - } else if (sent) { - // wait at least one producer stats interval + owd to check if the - // production rate is reducing. - uint32_t min_wait = PRODUCER_STATS_INTERVAL + ceil(state_->getQueuing()); - next_timer = std::max(next_timer, min_wait); - DLOG_IF(INFO, VLOG_IS_ON(3)) - << "wait for updates from prod, next timer: " << next_timer; } } - - scheduleSentinelTimer(next_timer); } } // namespace rtc diff --git a/libtransport/src/protocols/rtc/rtc_ldr.h b/libtransport/src/protocols/rtc/rtc_ldr.h index 1b9f9afd6..e7f8ce5db 100644 --- a/libtransport/src/protocols/rtc/rtc_ldr.h +++ b/libtransport/src/protocols/rtc/rtc_ldr.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2021 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -15,16 +15,14 @@ #pragma once #include <hicn/transport/config.h> +#include <hicn/transport/interfaces/socket_options_keys.h> +// RtcTransportRecoveryStrategies #include <hicn/transport/core/asio_wrapper.h> #include <hicn/transport/core/content_object.h> #include <hicn/transport/core/name.h> -#include <protocols/indexer.h> -#include <protocols/rtc/rtc_consts.h> -#include <protocols/rtc/rtc_state.h> +#include <protocols/rtc/rtc_recovery_strategy.h> #include <functional> -#include <map> -#include <unordered_map> namespace transport { @@ -34,91 +32,45 @@ namespace rtc { class RTCLossDetectionAndRecovery : public std::enable_shared_from_this<RTCLossDetectionAndRecovery> { - struct rtx_state_ { - uint64_t first_send_; - uint64_t next_send_; - uint32_t rtx_count_; - }; - - using rtxState = struct rtx_state_; - using SendRtxCallback = std::function<void(uint32_t)>; - public: - RTCLossDetectionAndRecovery(Indexer *indexer, SendRtxCallback &&callback, - asio::io_service &io_service); + RTCLossDetectionAndRecovery(Indexer *indexer, asio::io_service &io_service, + interface::RtcTransportRecoveryStrategies type, + RecoveryStrategy::SendRtxCallback &&callback, + interface::StrategyCallback *external_callback); ~RTCLossDetectionAndRecovery(); - void setState(std::shared_ptr<RTCState> state) { state_ = state; } - void setFecParams(uint32_t n, uint32_t k) { - n_ = n; - k_ = k; + void setState(RTCState *state) { rs_->setState(state); } + void setRateControl(RTCRateControl *rateControl) { + rs_->setRateControl(rateControl); } - void turnOnRTX(); - void turnOffRTX(); - bool isRtxOn() { return rtx_on_; } + + void setFecParams(uint32_t n, uint32_t k) { rs_->setFecParams(n, k); } + + void turnOnRecovery() { rs_->tunrOnRecovery(); } + bool isRtxOn() { return rs_->isRtxOn(); } + + void changeRecoveryStrategy(interface::RtcTransportRecoveryStrategies type); void onNewRound(bool in_sync); - void onTimeout(uint32_t seq); + void onTimeout(uint32_t seq, bool lost); void onPacketRecoveredFec(uint32_t seq); void onDataPacketReceived(const core::ContentObject &content_object); void onNackPacketReceived(const core::ContentObject &nack); void onProbePacketReceived(const core::ContentObject &probe); - void clear(); + void clear() { rs_->clear(); } - bool isRtx(uint32_t seq) { - if (rtx_state_.find(seq) != rtx_state_.end()) return true; - return false; + bool isRtx(uint32_t seq) { return rs_->isRtx(seq); } + bool isPossibleLossWithNoRtx(uint32_t seq) { + return rs_->isPossibleLossWithNoRtx(seq); } private: - void addToRetransmissions(uint32_t start, uint32_t stop); - uint64_t computeNextSend(uint32_t seq, bool new_rtx); - void retransmit(); - void scheduleNextRtx(); - bool deleteRtx(uint32_t seq); - void scheduleSentinelTimer(uint64_t expires_from_now); - void sentinelTimer(); - uint32_t computeFecPacketsToAsk(bool in_sync); - - uint64_t getNow() { - using namespace std::chrono; - uint64_t now = - duration_cast<milliseconds>(steady_clock::now().time_since_epoch()) - .count(); - return now; - } - - // this map keeps track of the retransmitted interest, ordered from the oldest - // to the newest one. the state contains the timer of the first send of the - // interest (from pendingIntetests_), the timer of the next send (key of the - // multimap) and the number of rtx - std::map<uint32_t, rtxState> rtx_state_; - // this map stored the rtx by timer. The key is the time at which the rtx - // should be sent, and the val is the interest seq number - std::multimap<uint64_t, uint32_t> rtx_timers_; - - // lost packets that will be recovered with fec - std::unordered_set<uint32_t> recover_with_fec_; - - bool rtx_on_; - bool fec_on_; - uint64_t next_rtx_timer_; - uint64_t last_event_; - uint64_t sentinel_timer_interval_; - - // fec params - uint32_t n_; - uint32_t k_; - - std::unique_ptr<asio::steady_timer> timer_; - std::unique_ptr<asio::steady_timer> sentinel_timer_; - std::shared_ptr<RTCState> state_; - - Indexer *indexer_; + void detectLoss(uint32_t start, uint32_t stop); - SendRtxCallback send_rtx_callback_; + interface::RtcTransportRecoveryStrategies rs_type_; + std::shared_ptr<RecoveryStrategy> rs_; }; } // end namespace rtc diff --git a/libtransport/src/protocols/rtc/rtc_packet.h b/libtransport/src/protocols/rtc/rtc_packet.h index 7dc2f82c3..391aedfc6 100644 --- a/libtransport/src/protocols/rtc/rtc_packet.h +++ b/libtransport/src/protocols/rtc/rtc_packet.h @@ -24,6 +24,27 @@ * +-----------------------------------------+ */ +/* aggregated packets + * +---------------------------------+ + * |c| #pkts | len1 | len2 | .... | + * +---------------------------------- + * + * +---------------------------------+ + * |c| #pkts | resv | len 1 | + * +---------------------------------- + * + * aggregated packets header. + * header position. just after the data packet header + * + * c: 1 bit: 0 8bit encoding, 1 16bit encoding + * #pkts: 7 bits: number of application packets contained + * 8bits encoding: + * lenX: 8 bits: len in bites of packet X + * 16bits econding: + * resv: 8 bits: reserved field (unused) + * lenX: 16bits: len in bytes of packet X + */ + #pragma once #ifndef _WIN32 #include <arpa/inet.h> @@ -31,6 +52,8 @@ #include <hicn/transport/portability/win_portability.h> #endif +#include <cstring> + namespace transport { namespace protocol { @@ -82,8 +105,144 @@ struct nack_packet_t { inline uint32_t getProductionRate() const { return ntohl(prod_rate); } inline void setProductionRate(uint32_t rate) { prod_rate = htonl(rate); } - inline uint32_t getProductionSegement() const { return ntohl(prod_seg); } - inline void setProductionSegement(uint32_t seg) { prod_seg = htonl(seg); } + inline uint32_t getProductionSegment() const { return ntohl(prod_seg); } + inline void setProductionSegment(uint32_t seg) { prod_seg = htonl(seg); } +}; + +class AggrPktHeader { + public: + // XXX buf always point to the payload after the data header + AggrPktHeader(uint8_t *buf, uint16_t max_packet_len, uint16_t pkt_number) + : buf_(buf), pkt_num_(pkt_number) { + *buf_ = 0; // reset the first byte to correctly add the header + // encoding and the packet number + if (max_packet_len > 0xff) { + setAggrPktEncoding16bit(); + } else { + setAggrPktEncoding8bit(); + } + setAggrPktNUmber(pkt_number); + header_len_ = computeHeaderLen(); + memset(buf_ + 1, 0, header_len_ - 1); + } + + // XXX buf always point to the payload after the data header + AggrPktHeader(uint8_t *buf) : buf_(buf) { + encoding_ = getAggrPktEncoding(); + pkt_num_ = getAggrPktNumber(); + header_len_ = computeHeaderLen(); + } + + ~AggrPktHeader(){}; + + int addPacketToHeader(uint8_t index, uint16_t len) { + if (index > pkt_num_) return -1; + + setAggrPktLen(index, len); + return 0; + } + + int getPointerToPacket(uint8_t index, uint8_t **pkt_ptr, uint16_t *pkt_len) { + if (index > pkt_num_) return -1; + + uint16_t len = 0; + for (int i = 0; i < index; i++) + len += getAggrPktLen(i); // sum the pkts len from 0 to index - 1 + + uint16_t offset = len + header_len_; + *pkt_ptr = buf_ + offset; + *pkt_len = getAggrPktLen(index); + return 0; + } + + int getPacketOffsets(uint8_t index, uint16_t *pkt_offset, uint16_t *pkt_len) { + if (index > pkt_num_) return -1; + + uint16_t len = 0; + for (int i = 0; i < index; i++) + len += getAggrPktLen(i); // sum the pkts len from 0 to index - 1 + + uint16_t offset = len + header_len_; + *pkt_offset = offset; + *pkt_len = getAggrPktLen(index); + + return 0; + } + + uint8_t *getPayloadAppendPtr() { return buf_ + header_len_; } + + uint16_t getHeaderLen() { return header_len_; } + + uint8_t getNumberOfPackets() { return pkt_num_; } + + private: + inline uint16_t computeHeaderLen() const { + uint16_t len = 4; // min len in bytes + if (!encoding_) { + while (pkt_num_ >= len) { + len += 4; + } + } else { + while (pkt_num_ * 2 >= len) { + len += 4; + } + } + return len; + } + + inline uint8_t getAggrPktEncoding() const { + // get the first bit of the first byte + return (*buf_ >> 7); + } + + inline void setAggrPktEncoding8bit() { + // reset the first bit of the first byte + encoding_ = 0; + *buf_ &= 0x7f; + } + + inline void setAggrPktEncoding16bit() { + // set the first bit of the first byte + encoding_ = 1; + *buf_ ^= 0x80; + } + + inline uint8_t getAggrPktNumber() const { + // return the first byte with the first bit = 0 + return (*buf_ & 0x7f); + } + + inline void setAggrPktNUmber(uint8_t val) { + // set the val without modifying the first bit + *buf_ &= 0x80; // reset everithing but the first bit + val &= 0x7f; // reset the first bit + *buf_ |= val; // or the vals, done! + } + + inline uint16_t getAggrPktLen(uint8_t pkt_index) const { + pkt_index++; + if (!encoding_) { // 8 bits + return (uint16_t) * (buf_ + pkt_index); + } else { // 16 bits + uint16_t *buf_16 = (uint16_t *)buf_; + return ntohs(*(buf_16 + pkt_index)); + } + } + + inline void setAggrPktLen(uint8_t pkt_index, uint16_t len) { + pkt_index++; + if (!encoding_) { // 8 bits + *(buf_ + pkt_index) = (uint8_t)len; + } else { // 16 bits + uint16_t *buf_16 = (uint16_t *)buf_; + *(buf_16 + pkt_index) = htons(len); + } + } + + uint8_t *buf_; + uint8_t encoding_; + uint8_t pkt_num_; + uint16_t header_len_; }; } // end namespace rtc diff --git a/libtransport/src/protocols/rtc/rtc_rc.h b/libtransport/src/protocols/rtc/rtc_rc.h index 34d090092..62636ce40 100644 --- a/libtransport/src/protocols/rtc/rtc_rc.h +++ b/libtransport/src/protocols/rtc/rtc_rc.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2021 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -34,11 +34,15 @@ class RTCRateControl : public std::enable_shared_from_this<RTCRateControl> { void turnOnRateControl() { rc_on_ = true; } void setState(std::shared_ptr<RTCState> state) { protocol_state_ = state; }; - uint32_t getCongesionWindow() { return congestion_win_; }; + uint32_t getCongestionWindow() { return congestion_win_; }; + bool inCongestionState() { + if (congestion_state_ == CongestionState::Congested) return true; + return false; + } virtual void onNewRound(double round_len) = 0; - virtual void onDataPacketReceived( - const core::ContentObject &content_object) = 0; + virtual void onDataPacketReceived(const core::ContentObject &content_object, + bool compute_stats) = 0; protected: enum class CongestionState { Normal = 0, Underuse = 1, Congested = 2, Last }; diff --git a/libtransport/src/protocols/rtc/rtc_rc_congestion_detection.cc b/libtransport/src/protocols/rtc/rtc_rc_congestion_detection.cc new file mode 100644 index 000000000..6cd3094b5 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_rc_congestion_detection.cc @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <protocols/rtc/rtc_consts.h> +#include <protocols/rtc/rtc_rc_congestion_detection.h> + +#include <algorithm> + +namespace transport { + +namespace protocol { + +namespace rtc { + +RTCRateControlCongestionDetection::RTCRateControlCongestionDetection() + : rounds_without_congestion_(4), last_queue_(0) {} // must be > 3 + +RTCRateControlCongestionDetection::~RTCRateControlCongestionDetection() {} + +void RTCRateControlCongestionDetection::onNewRound(double round_len) { + if (!rc_on_) return; + + double rtt = (double)protocol_state_->getMinRTT() / MILLI_IN_A_SEC; + double queue = protocol_state_->getQueuing(); + + if (rtt == 0.0) return; // no info from the producer + + if (last_queue_ == queue) { + // if last_queue == queue the consumer didn't receive any + // packet from the producer. we do not change the current congestion state. + // we just increase the counter of rounds whithout congestion if needed + // (in case of congestion the counter is already set to 0) + if (congestion_state_ == CongestionState::Normal) + rounds_without_congestion_++; + } else { + if (queue > MAX_QUEUING_DELAY) { + // here we detect congestion. + congestion_state_ = CongestionState::Congested; + rounds_without_congestion_ = 0; + } else { + // wait 3 rounds before switch back to no congestion + if (rounds_without_congestion_ > 3) { + // nothing bad is happening + congestion_state_ = CongestionState::Normal; + } + rounds_without_congestion_++; + } + last_queue_ = queue; + } +} + +void RTCRateControlCongestionDetection::onDataPacketReceived( + const core::ContentObject &content_object, bool compute_stats) { + // nothing to do + return; +} + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_rc_congestion_detection.h b/libtransport/src/protocols/rtc/rtc_rc_congestion_detection.h new file mode 100644 index 000000000..9afa6c39a --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_rc_congestion_detection.h @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include <hicn/transport/utils/shared_ptr_utils.h> +#include <protocols/rtc/rtc_rc.h> + +namespace transport { + +namespace protocol { + +namespace rtc { + +class RTCRateControlCongestionDetection : public RTCRateControl { + public: + RTCRateControlCongestionDetection(); + + ~RTCRateControlCongestionDetection(); + + void onNewRound(double round_len); + void onDataPacketReceived(const core::ContentObject &content_object, + bool compute_stats); + + auto shared_from_this() { return utils::shared_from(this); } + + private: + uint32_t rounds_without_congestion_; + double last_queue_; +}; + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_rc_iat.cc b/libtransport/src/protocols/rtc/rtc_rc_iat.cc new file mode 100644 index 000000000..f06f377f3 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_rc_iat.cc @@ -0,0 +1,287 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <protocols/rtc/rtc_consts.h> +#include <protocols/rtc/rtc_rc_iat.h> + +namespace transport { + +namespace protocol { + +namespace rtc { + +RTCRateControlIAT::RTCRateControlIAT() + : rounds_since_last_drop_(0), + rounds_without_congestion_(0), + rounds_with_congestion_(0), + last_queue_(0), + last_rcv_time_(0), + last_prod_time_(0), + last_seq_number_(0), + target_rate_avg_(0), + round_index_(0), + congestion_cause_(CongestionCause::UNKNOWN) {} + +RTCRateControlIAT::~RTCRateControlIAT() {} + +void RTCRateControlIAT::onNewRound(double round_len) { + if (!rc_on_) return; + + double received_rate = protocol_state_->getReceivedRate() + + protocol_state_->getRecoveredFecRate(); + + double target_rate = + protocol_state_->getProducerRate(); // * PRODUCTION_RATE_FRACTION; + double rtt = (double)protocol_state_->getMinRTT() / MILLI_IN_A_SEC; + // double packet_size = protocol_state_->getAveragePacketSize(); + double queue = protocol_state_->getQueuing(); + + if (rtt == 0.0) return; // no info from the producer + + CongestionState prev_congestion_state = congestion_state_; + + target_rate_avg_ = target_rate_avg_ * (1 - MOVING_AVG_ALPHA) + + target_rate * MOVING_AVG_ALPHA; + + if (prev_congestion_state == CongestionState::Congested) { + if (queue > MAX_QUEUING_DELAY || last_queue_ == queue) { + congestion_state_ = CongestionState::Congested; + + received_rate_.push_back(received_rate); + target_rate_.push_back(target_rate); + + // We assume the cause does not change + // Note that the first assumption about the cause could be wrong + // the cause of congestion could change + if (congestion_cause_ == CongestionCause::UNKNOWN) + if (rounds_with_congestion_ >= 1) + congestion_cause_ = apply_classification_tree( + rounds_with_congestion_ > ROUND_TO_WAIT_FORCE_DECISION); + + rounds_with_congestion_++; + } else { + congestion_state_ = CongestionState::Normal; + + // clear past history + reset_congestion_statistics(); + + // TODO maybe we can use some of these values for the stdev of the + // congestion mode + for (int i = 0; i < ROUND_HISTORY_SIZE; i++) { + iat_on_hold_[i].clear(); + } + } + } else if (queue > MAX_QUEUING_DELAY) { + if (prev_congestion_state == CongestionState::Normal) { + rounds_with_congestion_ = 0; + + if (rounds_without_congestion_ > ROUND_TO_RESET_CAUSE) + congestion_cause_ = CongestionCause::UNKNOWN; + } + congestion_state_ = CongestionState::Congested; + received_rate_.push_back(received_rate); + target_rate_.push_back(target_rate); + } else { + // nothing bad is happening + congestion_state_ = CongestionState::Normal; + reset_congestion_statistics(); + + int past_index = (round_index_ + 1) % ROUND_HISTORY_SIZE; + for (std::vector<double>::iterator it = iat_on_hold_[past_index].begin(); + it != iat_on_hold_[past_index].end(); ++it) { + congestion_free_iat_.push_back(*it); + if (congestion_free_iat_.size() > 50) { + congestion_free_iat_.erase(congestion_free_iat_.begin()); + } + } + iat_on_hold_[past_index].clear(); + round_index_ = (round_index_ + 1) % ROUND_HISTORY_SIZE; + } + + last_queue_ = queue; + + if (congestion_state_ == CongestionState::Congested) { + if (prev_congestion_state == CongestionState::Normal) { + // init the congetion window using the received rate + // disabling for the moment the congestion window setup + // congestion_win_ = (uint32_t)ceil(received_rate * rtt / packet_size); + rounds_since_last_drop_ = ROUNDS_BEFORE_TAKE_ACTION + 1; + } + + if (rounds_since_last_drop_ >= ROUNDS_BEFORE_TAKE_ACTION) { + // disabling for the moment the congestion window setup + // uint32_t win = congestion_win_ * WIN_DECREASE_FACTOR; + // congestion_win_ = std::max(win, WIN_MIN); + rounds_since_last_drop_ = 0; + return; + } + + rounds_since_last_drop_++; + } + + if (congestion_state_ == CongestionState::Normal) { + if (prev_congestion_state == CongestionState::Congested) { + rounds_without_congestion_ = 0; + } + + rounds_without_congestion_++; + if (rounds_without_congestion_ < ROUNDS_BEFORE_TAKE_ACTION) return; + + // disabling for the moment the congestion window setup + // congestion_win_ = congestion_win_ * WIN_INCREASE_FACTOR; + // congestion_win_ = std::min(congestion_win_, INITIAL_WIN_MAX); + } + + if (received_rate_.size() > 1000) + received_rate_.erase(received_rate_.begin()); + if (target_rate_.size() > 1000) target_rate_.erase(target_rate_.begin()); +} + +void RTCRateControlIAT::onDataPacketReceived( + const core::ContentObject &content_object, bool compute_stats) { + core::ParamsRTC params = RTCState::getDataParams(content_object); + + uint64_t now = utils::SteadyTime::nowMs().count(); + + uint32_t segment_number = content_object.getName().getSuffix(); + + if (segment_number == (last_seq_number_ + 1) && compute_stats) { + uint64_t iat = now - last_rcv_time_; + uint64_t ist = params.timestamp - last_prod_time_; + if (now >= last_rcv_time_ && params.timestamp > last_prod_time_) { + if (iat >= ist && ist < MIN_IST_VALUE) { + if (congestion_state_ == CongestionState::Congested) { + iat_.push_back((iat - ist)); + } else { + // no congestion, but we do not always add new values, but only when + // there is no sign of congestion + double queue = protocol_state_->getQueuing(); + if (queue <= CONGESTION_FREE_QUEUEING_DELAY) { + iat_on_hold_[round_index_].push_back((iat - ist)); + } + } + } + } + } + + last_seq_number_ = segment_number; + last_rcv_time_ = now; + last_prod_time_ = params.timestamp; + + if (iat_.size() > 1000) iat_.erase(iat_.begin()); + return; +} + +CongestionCause RTCRateControlIAT::apply_classification_tree(bool force_reply) { + if (iat_.size() <= 2 || received_rate_.size() < 2) + return CongestionCause::UNKNOWN; + + double received_ratio = 0; + double iat_ratio = 0; + double iat_stdev = compute_iat_stdev(iat_); + double iat_congestion_free_stdev = compute_iat_stdev(congestion_free_iat_); + + double iat_avg = 0.0; + + double recv_avg = 0.0; + double recv_max = 0.0; + + double target_rate_avg = 0.0; + + int counter = 0; + std::vector<double>::reverse_iterator target_it = target_rate_.rbegin(); + for (std::vector<double>::reverse_iterator it = received_rate_.rbegin(); + it != received_rate_.rend(); ++it) { + recv_avg += *it; + target_rate_avg += *target_it; + if (counter < ROUND_HISTORY_SIZE) + if (recv_max < *it) { + recv_max = *it; // we consider only the last 2 seconds + } + counter++; + target_it++; + } + recv_avg = recv_avg / received_rate_.size(); + target_rate_avg = target_rate_avg / target_rate_.size(); + + for (std::vector<double>::iterator it = iat_.begin(); it != iat_.end(); + ++it) { + iat_avg += *it; + } + iat_avg = iat_avg / iat_.size(); + + double congestion_free_iat_avg = 0.0; + for (std::vector<double>::iterator it = congestion_free_iat_.begin(); + it != congestion_free_iat_.end(); ++it) { + congestion_free_iat_avg += *it; + } + congestion_free_iat_avg = + congestion_free_iat_avg / congestion_free_iat_.size(); + + received_ratio = recv_avg / target_rate_avg; + + iat_ratio = iat_stdev / iat_congestion_free_stdev; + + CongestionCause congestion_cause = CongestionCause::UNKNOWN; + // applying classification tree model + if (received_ratio <= 0.87) + if (iat_stdev <= 6.48) + if (received_ratio <= 0.83) + congestion_cause = CongestionCause::LINK_CAPACITY; + else if (force_reply) + congestion_cause = CongestionCause::LINK_CAPACITY; + else + congestion_cause = CongestionCause::UNKNOWN; // accuracy is too low + else if (iat_ratio <= 2.46) + if (force_reply) + congestion_cause = CongestionCause::LINK_CAPACITY; + else + congestion_cause = CongestionCause::UNKNOWN; // accuracy is too low + else + congestion_cause = CongestionCause::COMPETING_CROSS_TRAFFIC; + else if (received_ratio <= 0.913 && iat_stdev <= 0.784) + congestion_cause = CongestionCause::LINK_CAPACITY; + else + congestion_cause = CongestionCause::COMPETING_CROSS_TRAFFIC; + + return congestion_cause; +} + +void RTCRateControlIAT::reset_congestion_statistics() { + iat_.clear(); + received_rate_.clear(); + target_rate_.clear(); +} + +double RTCRateControlIAT::compute_iat_stdev(std::vector<double> v) { + if (v.size() == 0) return 0; + + float sum = 0.0, mean, standard_deviation = 0.0; + for (std::vector<double>::iterator it = v.begin(); it != v.end(); it++) { + sum += *it; + } + + mean = sum / v.size(); + for (std::vector<double>::iterator it = v.begin(); it != v.end(); it++) { + standard_deviation += pow(*it - mean, 2); + } + return sqrt(standard_deviation / v.size()); +} + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_rc_iat.h b/libtransport/src/protocols/rtc/rtc_rc_iat.h new file mode 100644 index 000000000..715637807 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_rc_iat.h @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include <hicn/transport/utils/shared_ptr_utils.h> +#include <protocols/rtc/rtc_rc.h> + +namespace transport { + +namespace protocol { + +namespace rtc { + +const int ROUND_HISTORY_SIZE = 10; // equivalent to two seconds +const int ROUND_TO_WAIT_FORCE_DECISION = 5; + +// once congestion is gone, we need to wait for k rounds before changing the +// congestion cause in the case it appears again +const int ROUND_TO_RESET_CAUSE = 5; + +const int MIN_IST_VALUE = 150; // samples of ist larger than 150ms are + // discarded +const double CONGESTION_FREE_QUEUEING_DELAY = 10; + +enum class CongestionCause : uint8_t { + COMPETING_CROSS_TRAFFIC, + FRIENDLY_CROSS_TRAFFIC, + UNKNOWN_CROSS_TRAFFIC, + LINK_CAPACITY, + UNKNOWN +}; + +class RTCRateControlIAT : public RTCRateControl { + public: + RTCRateControlIAT(); + + ~RTCRateControlIAT(); + + void onNewRound(double round_len); + void onDataPacketReceived(const core::ContentObject &content_object, + bool compute_stats); + + auto shared_from_this() { return utils::shared_from(this); } + + private: + void reset_congestion_statistics(); + + double compute_iat_stdev(std::vector<double> v); + + CongestionCause apply_classification_tree(bool force_reply); + + private: + uint32_t rounds_since_last_drop_; + uint32_t rounds_without_congestion_; + uint32_t rounds_with_congestion_; + double last_queue_; + uint64_t last_rcv_time_; + uint64_t last_prod_time_; + uint32_t last_seq_number_; + double target_rate_avg_; + + // Iat values are not immediately added to the congestion free set of values + std::array<std::vector<double>, ROUND_HISTORY_SIZE> iat_on_hold_; + uint32_t round_index_; + + // with congestion statistics + std::vector<double> iat_; + std::vector<double> received_rate_; + std::vector<double> target_rate_; + + // congestion free statistics + std::vector<double> congestion_free_iat_; + + CongestionCause congestion_cause_; +}; + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_rc_queue.cc b/libtransport/src/protocols/rtc/rtc_rc_queue.cc index a1c89e329..ecabc5205 100644 --- a/libtransport/src/protocols/rtc/rtc_rc_queue.cc +++ b/libtransport/src/protocols/rtc/rtc_rc_queue.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2021 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -37,7 +37,7 @@ void RTCRateControlQueue::onNewRound(double round_len) { double received_rate = protocol_state_->getReceivedRate(); double target_rate = protocol_state_->getProducerRate() * PRODUCTION_RATE_FRACTION; - double rtt = (double)protocol_state_->getRTT() / MILLI_IN_A_SEC; + double rtt = (double)protocol_state_->getMinRTT() / MILLI_IN_A_SEC; double packet_size = protocol_state_->getAveragePacketSize(); double queue = protocol_state_->getQueuing(); @@ -94,7 +94,7 @@ void RTCRateControlQueue::onNewRound(double round_len) { } void RTCRateControlQueue::onDataPacketReceived( - const core::ContentObject &content_object) { + const core::ContentObject &content_object, bool compute_stats) { // nothing to do return; } diff --git a/libtransport/src/protocols/rtc/rtc_rc_queue.h b/libtransport/src/protocols/rtc/rtc_rc_queue.h index 407354d43..cdf78fd47 100644 --- a/libtransport/src/protocols/rtc/rtc_rc_queue.h +++ b/libtransport/src/protocols/rtc/rtc_rc_queue.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2021 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -30,7 +30,8 @@ class RTCRateControlQueue : public RTCRateControl { ~RTCRateControlQueue(); void onNewRound(double round_len); - void onDataPacketReceived(const core::ContentObject &content_object); + void onDataPacketReceived(const core::ContentObject &content_object, + bool compute_stats); auto shared_from_this() { return utils::shared_from(this); } diff --git a/libtransport/src/protocols/rtc/rtc_reassembly.cc b/libtransport/src/protocols/rtc/rtc_reassembly.cc new file mode 100644 index 000000000..992bab50e --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_reassembly.cc @@ -0,0 +1,109 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/interfaces/socket_consumer.h> +#include <implementation/socket_consumer.h> +#include <protocols/rtc/rtc_reassembly.h> +#include <protocols/transport_protocol.h> + +namespace transport { + +namespace protocol { + +namespace rtc { + +RtcReassembly::RtcReassembly(implementation::ConsumerSocket* icn_socket, + TransportProtocol* transport_protocol) + : DatagramReassembly(icn_socket, transport_protocol) { + is_setup_ = false; +} + +void RtcReassembly::reassemble(core::ContentObject& content_object) { + if (!is_setup_) { + is_setup_ = true; + reassembly_consumer_socket_->getSocketOption( + interface::RtcTransportOptions::AGGREGATED_DATA, data_aggregation_); + } + + auto read_buffer = content_object.getPayload(); + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Size of payload: " << read_buffer->length(); + + read_buffer->trimStart(transport_protocol_->transportHeaderLength()); + + if (data_aggregation_) { + rtc::AggrPktHeader hdr((uint8_t*)read_buffer->data()); + + for (uint8_t i = 0; i < hdr.getNumberOfPackets(); i++) { + std::unique_ptr<utils::MemBuf> segment = read_buffer->clone(); + + uint16_t pkt_start = 0; + uint16_t pkt_len = 0; + int res = hdr.getPacketOffsets(i, &pkt_start, &pkt_len); + if (res == -1) { + // this should not happen + break; + } + + segment->trimStart(pkt_start); + segment->trimEnd(segment->length() - pkt_len); + + Reassembly::read_buffer_ = std::move(segment); + Reassembly::notifyApplication(); + } + } else { + Reassembly::read_buffer_ = std::move(read_buffer); + Reassembly::notifyApplication(); + } +} + +void RtcReassembly::reassemble(utils::MemBuf& buffer, uint32_t suffix) { + if (!is_setup_) { + is_setup_ = true; + reassembly_consumer_socket_->getSocketOption( + interface::RtcTransportOptions::AGGREGATED_DATA, data_aggregation_); + } + + if (data_aggregation_) { + rtc::AggrPktHeader hdr((uint8_t*)buffer.data()); + + for (uint8_t i = 0; i < hdr.getNumberOfPackets(); i++) { + std::unique_ptr<utils::MemBuf> segment = buffer.clone(); + + uint16_t pkt_start = 0; + uint16_t pkt_len = 0; + int res = hdr.getPacketOffsets(i, &pkt_start, &pkt_len); + if (res == -1) { + // this should not happen + break; + } + + segment->trimStart(pkt_start); + segment->trimEnd(segment->length() - pkt_len); + + Reassembly::read_buffer_ = std::move(segment); + Reassembly::notifyApplication(); + } + + } else { + Reassembly::read_buffer_ = buffer.cloneOne(); + Reassembly::notifyApplication(); + } +} + +} // namespace rtc + +} // namespace protocol + +} // namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_reassembly.h b/libtransport/src/protocols/rtc/rtc_reassembly.h index 15722a6d5..132004605 100644 --- a/libtransport/src/protocols/rtc/rtc_reassembly.h +++ b/libtransport/src/protocols/rtc/rtc_reassembly.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -28,17 +28,14 @@ namespace rtc { class RtcReassembly : public DatagramReassembly { public: RtcReassembly(implementation::ConsumerSocket *icn_socket, - TransportProtocol *transport_protocol) - : DatagramReassembly(icn_socket, transport_protocol) {} - - void reassemble(core::ContentObject &content_object) override { - auto read_buffer = content_object.getPayload(); - DLOG_IF(INFO, VLOG_IS_ON(3)) - << "Size of payload: " << read_buffer->length(); - read_buffer->trimStart(transport_protocol_->transportHeaderLength()); - Reassembly::read_buffer_ = std::move(read_buffer); - Reassembly::notifyApplication(); - } + TransportProtocol *transport_protocol); + + void reassemble(core::ContentObject &content_object) override; + void reassemble(utils::MemBuf &buffer, uint32_t suffix) override; + + private: + bool is_setup_; + bool data_aggregation_; }; } // namespace rtc diff --git a/libtransport/src/protocols/rtc/rtc_recovery_strategy.cc b/libtransport/src/protocols/rtc/rtc_recovery_strategy.cc new file mode 100644 index 000000000..888105eab --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_recovery_strategy.cc @@ -0,0 +1,418 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <glog/logging.h> +#include <hicn/transport/interfaces/notification.h> +#include <hicn/transport/interfaces/socket_options_keys.h> +#include <protocols/rtc/rtc_consts.h> +#include <protocols/rtc/rtc_recovery_strategy.h> + +namespace transport { + +namespace protocol { + +namespace rtc { + +using namespace transport::interface; + +RecoveryStrategy::RecoveryStrategy( + Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service, + bool use_rtx, bool use_fec, interface::StrategyCallback *external_callback) + : recovery_on_(false), + next_rtx_timer_(MAX_TIMER_RTX), + send_rtx_callback_(std::move(callback)), + indexer_(indexer), + round_id_(0), + last_fec_used_(0), + callback_(external_callback) { + setRtxFec(use_rtx, use_fec); + timer_ = std::make_unique<asio::steady_timer>(io_service); +} + +RecoveryStrategy::RecoveryStrategy(RecoveryStrategy &&rs) + : rtx_state_(std::move(rs.rtx_state_)), + rtx_timers_(std::move(rs.rtx_timers_)), + recover_with_fec_(std::move(rs.recover_with_fec_)), + timer_(std::move(rs.timer_)), + next_rtx_timer_(std::move(rs.next_rtx_timer_)), + send_rtx_callback_(std::move(rs.send_rtx_callback_)), + n_(std::move(rs.n_)), + k_(std::move(rs.k_)), + indexer_(std::move(rs.indexer_)), + state_(std::move(rs.state_)), + rc_(std::move(rs.rc_)), + round_id_(std::move(rs.round_id_)), + last_fec_used_(std::move(rs.last_fec_used_)), + callback_(rs.callback_) { + setFecParams(n_, k_); +} + +RecoveryStrategy::~RecoveryStrategy() {} + +void RecoveryStrategy::setFecParams(uint32_t n, uint32_t k) { + n_ = n; + k_ = k; + + // XXX for the moment we go in steps of 5% loss rate. + // max loss rate = 95% + for (uint32_t loss_rate = 5; loss_rate < 100; loss_rate += 5) { + double dec_loss_rate = (double)loss_rate / 100.0; + double exp_losses = (double)k_ * dec_loss_rate; + uint32_t fec_to_ask = ceil(exp_losses / (1 - dec_loss_rate)); + + fec_state_ f; + f.fec_to_ask = std::min(fec_to_ask, (n_ - k_)); + f.last_update = round_id_; + f.avg_residual_losses = 0.0; + f.consecutive_use = 0; + fec_per_loss_rate_.push_back(f); + } +} + +bool RecoveryStrategy::lossDetected(uint32_t seq) { + if (isRtx(seq)) { + // this packet is already in the list of rtx + return false; + } + + auto it = recover_with_fec_.find(seq); + if (it != recover_with_fec_.end()) { + // this packet is already in list of packets to recover with fec + // this list contians also fec packets that will not be recovered with rtx + return false; + } + + // new loss detected, recover it according to the strategy + newPacketLoss(seq); + return true; +} + +void RecoveryStrategy::clear() { + rtx_state_.clear(); + rtx_timers_.clear(); + recover_with_fec_.clear(); + + if (next_rtx_timer_ != MAX_TIMER_RTX) { + next_rtx_timer_ = MAX_TIMER_RTX; + timer_->cancel(); + } +} + +// rtx functions +void RecoveryStrategy::addNewRtx(uint32_t seq, bool force) { + if (!indexer_->isFec(seq) || force) { + // this packet needs to be re-transmitted + rtxState state; + state.first_send_ = state_->getInterestSentTime(seq); + if (state.first_send_ == 0) // this interest was never sent before + state.first_send_ = getNow(); + state.next_send_ = computeNextSend(seq, true); + state.rtx_count_ = 0; + DLOG_IF(INFO, VLOG_IS_ON(4)) + << "Add " << seq << " to retransmissions. next rtx is in " + << state.next_send_ - getNow() << " ms"; + rtx_state_.insert(std::pair<uint32_t, rtxState>(seq, state)); + rtx_timers_.insert(std::pair<uint64_t, uint32_t>(state.next_send_, seq)); + + // if a new rtx is introduced, check the rtx timer + scheduleNextRtx(); + } else { + // do not re-send fec packets but keep track of them + recover_with_fec_.insert(seq); + state_->onPossibleLossWithNoRtx(seq); + } +} + +uint64_t RecoveryStrategy::computeNextSend(uint32_t seq, bool new_rtx) { + uint64_t now = getNow(); + if (new_rtx) { + // for the new rtx we wait one estimated IAT after the loss detection. this + // is bacause, assuming that packets arrive with a constant IAT, we should + // get a new packet every IAT + double prod_rate = state_->getProducerRate(); + uint32_t estimated_iat = SENTINEL_TIMER_INTERVAL; + uint32_t jitter = 0; + + if (prod_rate != 0) { + double packet_size = state_->getAveragePacketSize(); + estimated_iat = ceil(1000.0 / (prod_rate / packet_size)); + jitter = ceil(state_->getJitter()); + } + + uint32_t wait = 1; + if (estimated_iat < 18) { + // for low rate app we do not wait to send a RTX + // we consider low rate stream with less than 50pps (iat >= 20ms) + // (e.g. audio in videoconf, mobile games). + // in the check we use 18ms to accomodate for measurements errors + // for flows with higher rate wait 1 ait + jitter + wait = estimated_iat + jitter; + } + + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "first rtx for " << seq << " in " << wait + << " ms, rtt = " << state_->getMinRTT() << " ait = " << estimated_iat + << " jttr = " << jitter; + + return now + wait; + } else { + // wait one RTT + uint32_t wait = SENTINEL_TIMER_INTERVAL; + + double prod_rate = state_->getProducerRate(); + if (prod_rate == 0) { + return now + SENTINEL_TIMER_INTERVAL; + } + + double packet_size = state_->getAveragePacketSize(); + uint32_t estimated_iat = ceil(1000.0 / (prod_rate / packet_size)); + + uint64_t rtt = state_->getMinRTT(); + if (rtt == 0) rtt = SENTINEL_TIMER_INTERVAL; + wait = rtt; + + uint32_t jitter = ceil(state_->getJitter()); + wait += jitter; + + // it may happen that the channel is congested and we have some additional + // queuing delay to take into account + uint32_t queue = ceil(state_->getQueuing()); + wait += queue; + + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "next rtx for " << seq << " in " << wait + << " ms, rtt = " << state_->getMinRTT() << " ait = " << estimated_iat + << " jttr = " << jitter << " queue = " << queue; + + return now + wait; + } +} + +void RecoveryStrategy::retransmit() { + if (rtx_timers_.size() == 0) return; + + uint64_t now = getNow(); + + auto it = rtx_timers_.begin(); + std::unordered_set<uint32_t> lost_pkt; + uint32_t sent_counter = 0; + while (it != rtx_timers_.end() && it->first <= now && + sent_counter < MAX_RTX_IN_BATCH) { + uint32_t seq = it->second; + auto rtx_it = + rtx_state_.find(seq); // this should always return a valid iter + if (rtx_it->second.rtx_count_ >= RTC_MAX_RTX || + (now - rtx_it->second.first_send_) >= RTC_MAX_AGE || + seq < state_->getLastSeqNacked()) { + // max rtx reached or packet too old or packet nacked, this packet is lost + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "packet " << seq << " lost because 1) max rtx: " + << (rtx_it->second.rtx_count_ >= RTC_MAX_RTX) << " 2) max age: " + << ((now - rtx_it->second.first_send_) >= RTC_MAX_AGE) + << " 3) nacked: " << (seq < state_->getLastSeqNacked()); + lost_pkt.insert(seq); + it++; + } else { + // resend the packet + state_->onRetransmission(seq); + double prod_rate = state_->getProducerRate(); + if (prod_rate != 0) rtx_it->second.rtx_count_++; + rtx_it->second.next_send_ = computeNextSend(seq, false); + it = rtx_timers_.erase(it); + rtx_timers_.insert( + std::pair<uint64_t, uint32_t>(rtx_it->second.next_send_, seq)); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "send rtx for sequence " << seq << ", next send in " + << (rtx_it->second.next_send_ - now); + send_rtx_callback_(seq); + sent_counter++; + } + } + + // remove packets if needed + for (auto lost_it = lost_pkt.begin(); lost_it != lost_pkt.end(); lost_it++) { + uint32_t seq = *lost_it; + state_->onPacketLost(seq); + deleteRtx(seq); + } +} + +void RecoveryStrategy::scheduleNextRtx() { + if (rtx_timers_.size() == 0) { + // all the rtx were removed, reset timer + next_rtx_timer_ = MAX_TIMER_RTX; + return; + } + + // check if timer is alreay set + if (next_rtx_timer_ != MAX_TIMER_RTX) { + // a new check for rtx is already scheduled + if (next_rtx_timer_ > rtx_timers_.begin()->first) { + // we need to re-schedule it + timer_->cancel(); + } else { + // wait for the next timer + return; + } + } + + // set a new timer + next_rtx_timer_ = rtx_timers_.begin()->first; + uint64_t now = utils::SteadyTime::nowMs().count(); + uint64_t wait = 1; + if (next_rtx_timer_ != MAX_TIMER_RTX && next_rtx_timer_ > now) + wait = next_rtx_timer_ - now; + + std::weak_ptr<RecoveryStrategy> self(shared_from_this()); + timer_->expires_from_now(std::chrono::milliseconds(wait)); + timer_->async_wait([self](const std::error_code &ec) { + if (ec) return; + if (auto s = self.lock()) { + s->retransmit(); + s->next_rtx_timer_ = MAX_TIMER_RTX; + s->scheduleNextRtx(); + } + }); +} + +void RecoveryStrategy::deleteRtx(uint32_t seq) { + auto it_rtx = rtx_state_.find(seq); + if (it_rtx == rtx_state_.end()) return; // rtx not found + + // remove the rtx from the timers list + uint64_t ts = it_rtx->second.next_send_; + auto it_timers = rtx_timers_.find(ts); + while (it_timers != rtx_timers_.end() && it_timers->first == ts) { + if (it_timers->second == seq) { + rtx_timers_.erase(it_timers); + break; + } + it_timers++; + } + // remove rtx + rtx_state_.erase(it_rtx); +} + +// fec functions +uint32_t RecoveryStrategy::computeFecPacketsToAsk(bool in_sync) { + double loss_rate = state_->getMaxLossRate() * 100; // use loss rate in % + + if (loss_rate > 95) loss_rate = 95; // max loss rate + + if (loss_rate == 0) return 0; + + // once per minute try to reduce the fec rate. it may happen that for some bin + // we ask too many fec packet. here we try to reduce this values gently + if (round_id_ % ROUNDS_PER_MIN == 0) { + reduceFec(); + } + + // keep track of the last used fec. if we use a new bin on this round reset + // consecutive use and avg loss in the prev bin + uint32_t bin = ceil(loss_rate / 5.0) - 1; + if (bin > fec_per_loss_rate_.size() - 1) bin = fec_per_loss_rate_.size() - 1; + + if (bin != last_fec_used_) { + fec_per_loss_rate_[last_fec_used_].consecutive_use = 0; + fec_per_loss_rate_[last_fec_used_].avg_residual_losses = 0.0; + } + last_fec_used_ = bin; + fec_per_loss_rate_[last_fec_used_].consecutive_use++; + + // we update the stats only once very 5 rounds (1sec) that is the rate at + // which we compute residual losses + if (round_id_ % ROUNDS_PER_SEC == 0) { + double residual_losses = state_->getResidualLossRate() * 100; + // update residual loss rate + fec_per_loss_rate_[bin].avg_residual_losses = + (fec_per_loss_rate_[bin].avg_residual_losses * MOVING_AVG_ALPHA) + + (1 - MOVING_AVG_ALPHA) * residual_losses; + + if ((fec_per_loss_rate_[bin].last_update - round_id_) < + WAIT_BEFORE_FEC_UPDATE) { + // this bin is been updated recently so don't modify it and + // return the current state + return fec_per_loss_rate_[bin].fec_to_ask; + } + + // if the residual loss rate is too high and we can ask more fec packets and + // we are using this configuration since at least 5 sec update fec + if (fec_per_loss_rate_[bin].avg_residual_losses > MAX_RESIDUAL_LOSS_RATE && + fec_per_loss_rate_[bin].fec_to_ask < (n_ - k_) && + fec_per_loss_rate_[bin].consecutive_use > WAIT_BEFORE_FEC_UPDATE) { + // so increase the number of fec packets to ask + fec_per_loss_rate_[bin].fec_to_ask++; + fec_per_loss_rate_[bin].last_update = round_id_; + fec_per_loss_rate_[bin].avg_residual_losses = 0.0; + } + } + + return fec_per_loss_rate_[bin].fec_to_ask; +} + +void RecoveryStrategy::setRtxFec(std::optional<bool> rtx_on, + std::optional<bool> fec_on) { + if (rtx_on) rtx_on_ = *rtx_on; + if (fec_on) fec_on_ = *fec_on; + + if (*callback_) { + notification::RecoveryStrategy strategy = + notification::RecoveryStrategy::RECOVERY_OFF; + + if (rtx_on_ && fec_on_) + strategy = notification::RecoveryStrategy::RTX_AND_FEC; + else if (rtx_on_) + strategy = notification::RecoveryStrategy::RTX_ONLY; + else if (fec_on_) + strategy = notification::RecoveryStrategy::FEC_ONLY; + + (*callback_)(strategy); + } +} + +// common functions +void RecoveryStrategy::onLostTimeout(uint32_t seq) { removePacketState(seq); } + +void RecoveryStrategy::removePacketState(uint32_t seq) { + auto it_fec = recover_with_fec_.find(seq); + if (it_fec != recover_with_fec_.end()) { + recover_with_fec_.erase(it_fec); + return; + } + + deleteRtx(seq); +} + +// private methods + +void RecoveryStrategy::reduceFec() { + for (uint32_t loss_rate = 5; loss_rate < 100; loss_rate += 5) { + double dec_loss_rate = (double)loss_rate / 100.0; + double exp_losses = (double)k_ * dec_loss_rate; + uint32_t fec_to_ask = ceil(exp_losses / (1 - dec_loss_rate)); + + uint32_t bin = ceil(loss_rate / 5.0) - 1; + if (fec_per_loss_rate_[bin].fec_to_ask > fec_to_ask) { + fec_per_loss_rate_[bin].fec_to_ask--; + // std::cout << "reduce fec to ask for bin " << bin << std::endl; + } + } +} + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_recovery_strategy.h b/libtransport/src/protocols/rtc/rtc_recovery_strategy.h new file mode 100644 index 000000000..9ffc69a1b --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_recovery_strategy.h @@ -0,0 +1,154 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include <hicn/transport/interfaces/callbacks.h> +#include <hicn/transport/utils/chrono_typedefs.h> +#include <protocols/indexer.h> +#include <protocols/rtc/rtc_rc.h> +#include <protocols/rtc/rtc_state.h> + +#include <map> +#include <unordered_map> + +namespace transport { + +namespace protocol { + +namespace rtc { + +class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> { + protected: + struct rtx_state_ { + uint64_t first_send_; + uint64_t next_send_; + uint32_t rtx_count_; + }; + + using rtxState = struct rtx_state_; + + public: + using SendRtxCallback = std::function<void(uint32_t)>; + + RecoveryStrategy(Indexer *indexer, SendRtxCallback &&callback, + asio::io_service &io_service, bool use_rtx, bool use_fec, + interface::StrategyCallback *external_callback); + + RecoveryStrategy(RecoveryStrategy &&rs); + + virtual ~RecoveryStrategy(); + + void setRtxFec(std::optional<bool> rtx_on = {}, + std::optional<bool> fec_on = {}); + void setState(RTCState *state) { state_ = state; } + void setRateControl(RTCRateControl *rateControl) { rc_ = rateControl; } + void setFecParams(uint32_t n, uint32_t k); + + void tunrOnRecovery() { recovery_on_ = true; } + + bool isRtx(uint32_t seq) { + if (rtx_state_.find(seq) != rtx_state_.end()) return true; + return false; + } + + bool isPossibleLossWithNoRtx(uint32_t seq) { + if (recover_with_fec_.find(seq) != recover_with_fec_.end()) return true; + return false; + } + + bool isRtxOn() { return rtx_on_; } + + RTCState *getState() { return state_; } + bool lossDetected(uint32_t seq); + void clear(); + + virtual void onNewRound(bool in_sync) = 0; + virtual void newPacketLoss(uint32_t seq) = 0; + virtual void receivedPacket(uint32_t seq) = 0; + void onLostTimeout(uint32_t seq); + + void incRoundId() { round_id_++; } + + // utils + uint64_t getNow() { + uint64_t now = utils::SteadyTime::nowMs().count(); + return now; + } + + protected: + // rtx functions + void addNewRtx(uint32_t seq, bool force); + uint64_t computeNextSend(uint32_t seq, bool new_rtx); + void retransmit(); + void scheduleNextRtx(); + void deleteRtx(uint32_t seq); + + // fec functions + uint32_t computeFecPacketsToAsk(bool in_sync); + + // common functons + void removePacketState(uint32_t seq); + + bool recovery_on_; + bool rtx_on_; + bool fec_on_; + + // this map keeps track of the retransmitted interest, ordered from the oldest + // to the newest one. the state contains the timer of the first send of the + // interest (from pendingIntetests_), the timer of the next send (key of the + // multimap) and the number of rtx + std::map<uint32_t, rtxState> rtx_state_; + // this map stored the rtx by timer. The key is the time at which the rtx + // should be sent, and the val is the interest seq number + std::multimap<uint64_t, uint32_t> rtx_timers_; + + // lost packets that will be recovered with fec + std::unordered_set<uint32_t> recover_with_fec_; + + // rtx vars + std::unique_ptr<asio::steady_timer> timer_; + uint64_t next_rtx_timer_; + SendRtxCallback send_rtx_callback_; + + // fec vars + uint32_t n_; + uint32_t k_; + Indexer *indexer_; + + RTCState *state_; + RTCRateControl *rc_; + + private: + struct fec_state_ { + uint32_t fec_to_ask; + uint32_t last_update; // round id of the last update + // (wait 10 ruonds (2sec) between updates) + uint32_t consecutive_use; // consecutive ruonds where this fec was used + double avg_residual_losses; + }; + + void reduceFec(); + + uint32_t round_id_; // number of rounds + uint32_t last_fec_used_; + std::vector<fec_state_> fec_per_loss_rate_; + interface::StrategyCallback *callback_; +}; + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_rs_delay.cc b/libtransport/src/protocols/rtc/rtc_rs_delay.cc new file mode 100644 index 000000000..e2c60ca77 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_rs_delay.cc @@ -0,0 +1,122 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <glog/logging.h> +#include <protocols/rtc/rtc_consts.h> +#include <protocols/rtc/rtc_rs_delay.h> + +namespace transport { + +namespace protocol { + +namespace rtc { + +RecoveryStrategyDelayBased::RecoveryStrategyDelayBased( + Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service, + interface::StrategyCallback *external_callback) + : RecoveryStrategy(indexer, std::move(callback), io_service, true, false, + external_callback), // start with rtx + congestion_state_(false), + probing_state_(false), + switch_rounds_(0) {} + +RecoveryStrategyDelayBased::RecoveryStrategyDelayBased(RecoveryStrategy &&rs) + : RecoveryStrategy(std::move(rs)) { + setRtxFec(true, false); + // we have to re-init congestion and + // probing + congestion_state_ = false; + probing_state_ = false; +} + +RecoveryStrategyDelayBased::~RecoveryStrategyDelayBased() {} + +void RecoveryStrategyDelayBased::softSwitchToFec(uint32_t fec_to_ask) { + if (fec_to_ask == 0) { + setRtxFec(true, false); + switch_rounds_ = 0; + } else { + switch_rounds_++; + if (switch_rounds_ >= 5) { + setRtxFec(false, true); + } else { + setRtxFec({}, true); + } + } +} + +void RecoveryStrategyDelayBased::onNewRound(bool in_sync) { + if (!recovery_on_) { + // disable fec so that no extra packet will be sent + // for rtx we check if recovery is on in newPacketLoss + setRtxFec(true, false); + indexer_->setNFec(0); + return; + } + + uint64_t rtt = state_->getMinRTT(); + + bool congestion = false; + // XXX at the moment we are not looking at congestion events + // congestion = rc_->inCongestionState(); + + if ((!fec_on_ && rtt >= 100) || (fec_on_ && rtt > 80) || congestion) { + // switch from rtx to fec or keep use fec. Notice that if some rtx are + // waiting to be scheduled, they will be sent normally, but no new rtx will + // be created If the loss rate is 0 keep to use RTX. + uint32_t fec_to_ask = computeFecPacketsToAsk(in_sync); + softSwitchToFec(fec_to_ask); + indexer_->setNFec(fec_to_ask); + return; + } + + if ((fec_on_ && rtt <= 80) || (!rtx_on_ && rtt <= 100)) { + // turn on rtx + softSwitchToFec(0); + indexer_->setNFec(0); + return; + } +} + +void RecoveryStrategyDelayBased::newPacketLoss(uint32_t seq) { + if (rtx_on_ && recovery_on_) { + addNewRtx(seq, false); + } else { + if (!state_->isPending(seq) && !indexer_->isFec(seq)) { + addNewRtx(seq, true); + } else { + recover_with_fec_.insert(seq); + state_->onPossibleLossWithNoRtx(seq); + } + } +} + +void RecoveryStrategyDelayBased::receivedPacket(uint32_t seq) { + removePacketState(seq); +} + +void RecoveryStrategyDelayBased::probing() { + // TODO + // for the moment ask for all fec and exit the probing phase + probing_state_ = false; + setRtxFec(false, true); + indexer_->setNFec(computeFecPacketsToAsk(true)); +} + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_rs_delay.h b/libtransport/src/protocols/rtc/rtc_rs_delay.h new file mode 100644 index 000000000..0dd199965 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_rs_delay.h @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include <protocols/rtc/rtc_recovery_strategy.h> + +namespace transport { + +namespace protocol { + +namespace rtc { + +class RecoveryStrategyDelayBased : public RecoveryStrategy { + public: + RecoveryStrategyDelayBased(Indexer *indexer, SendRtxCallback &&callback, + asio::io_service &io_service, + interface::StrategyCallback *external_callback); + + RecoveryStrategyDelayBased(RecoveryStrategy &&rs); + + ~RecoveryStrategyDelayBased(); + + void onNewRound(bool in_sync); + void newPacketLoss(uint32_t seq); + void receivedPacket(uint32_t seq); + + private: + void softSwitchToFec(uint32_t fec_to_ask); + + bool congestion_state_; + bool probing_state_; + uint32_t switch_rounds_; + + void probing(); +}; + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_rs_fec_only.cc b/libtransport/src/protocols/rtc/rtc_rs_fec_only.cc new file mode 100644 index 000000000..36d8e39f0 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_rs_fec_only.cc @@ -0,0 +1,118 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <glog/logging.h> +#include <protocols/rtc/rtc_consts.h> +#include <protocols/rtc/rtc_rs_fec_only.h> + +namespace transport { + +namespace protocol { + +namespace rtc { + +RecoveryStrategyFecOnly::RecoveryStrategyFecOnly( + Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service, + interface::StrategyCallback *external_callback) + : RecoveryStrategy(indexer, std::move(callback), io_service, false, true, + external_callback), + congestion_state_(false), + probing_state_(false), + switch_rounds_(0) {} + +RecoveryStrategyFecOnly::RecoveryStrategyFecOnly(RecoveryStrategy &&rs) + : RecoveryStrategy(std::move(rs)) { + setRtxFec(false, true); + congestion_state_ = false; + probing_state_ = false; +} + +RecoveryStrategyFecOnly::~RecoveryStrategyFecOnly() {} + +void RecoveryStrategyFecOnly::onNewRound(bool in_sync) { + if (!recovery_on_) { + indexer_->setNFec(0); + return; + } + + // XXX for the moment we are considering congestion events + // if(rc_->inCongestionState()){ + // congestion_state_ = true; + // probing_state_ = false; + // indexer_->setNFec(0); + // return; + // } + + // no congestion + if (congestion_state_) { + // this is the first round after congestion + // enter probing phase + probing_state_ = true; + congestion_state_ = false; + } + + if (probing_state_) { + probing(); + } else { + uint32_t fec_to_ask = computeFecPacketsToAsk(in_sync); + // If fec_to_ask == 0 we use rtx even if in these strategy we use only fec. + // In this way the first packet loss that triggers the usage of fec can be + // recovered using rtx, otherwise it will always be lost + if (fec_to_ask == 0) { + setRtxFec(true, false); + switch_rounds_ = 0; + } else { + switch_rounds_++; + if (switch_rounds_ >= 5) { + setRtxFec(false, true); + } else { + setRtxFec({}, true); + } + } + indexer_->setNFec(fec_to_ask); + } +} + +void RecoveryStrategyFecOnly::newPacketLoss(uint32_t seq) { + if (rtx_on_ && recovery_on_) { + addNewRtx(seq, false); + } else { + if (!state_->isPending(seq) && !indexer_->isFec(seq)) { + addNewRtx(seq, true); + } else { + // if not pending add rtc + recover_with_fec_.insert(seq); + state_->onPossibleLossWithNoRtx(seq); + } + } +} + +void RecoveryStrategyFecOnly::receivedPacket(uint32_t seq) { + removePacketState(seq); +} + +void RecoveryStrategyFecOnly::probing() { + // TODO + // for the moment ask for all fec and exit the probing phase + probing_state_ = false; + uint32_t fec_to_ask = computeFecPacketsToAsk(true); + indexer_->setNFec(fec_to_ask); +} + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_rs_fec_only.h b/libtransport/src/protocols/rtc/rtc_rs_fec_only.h new file mode 100644 index 000000000..37b505d35 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_rs_fec_only.h @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include <protocols/rtc/rtc_recovery_strategy.h> + +namespace transport { + +namespace protocol { + +namespace rtc { + +class RecoveryStrategyFecOnly : public RecoveryStrategy { + public: + RecoveryStrategyFecOnly(Indexer *indexer, SendRtxCallback &&callback, + asio::io_service &io_service, + interface::StrategyCallback *external_callback); + + RecoveryStrategyFecOnly(RecoveryStrategy &&rs); + + ~RecoveryStrategyFecOnly(); + + void onNewRound(bool in_sync); + void newPacketLoss(uint32_t seq); + void receivedPacket(uint32_t seq); + + private: + bool congestion_state_; + bool probing_state_; + uint32_t switch_rounds_; + + void probing(); +}; + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_rs_low_rate.cc b/libtransport/src/protocols/rtc/rtc_rs_low_rate.cc new file mode 100644 index 000000000..bd153d209 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_rs_low_rate.cc @@ -0,0 +1,167 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <glog/logging.h> +#include <protocols/rtc/rtc_consts.h> +#include <protocols/rtc/rtc_rs_low_rate.h> + +namespace transport { + +namespace protocol { + +namespace rtc { + +RecoveryStrategyLowRate::RecoveryStrategyLowRate( + Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service, + interface::StrategyCallback *external_callback) + : RecoveryStrategy(indexer, std::move(callback), io_service, false, true, + external_callback), // start with fec + fec_consecutive_rounds_((MILLI_IN_A_SEC / ROUND_LEN) * 5), // 5 sec + rtx_allowed_consecutive_rounds_(0) { + initSwitchVector(); +} + +RecoveryStrategyLowRate::RecoveryStrategyLowRate(RecoveryStrategy &&rs) + : RecoveryStrategy(std::move(rs)), + fec_consecutive_rounds_((MILLI_IN_A_SEC / ROUND_LEN) * 5), // 5 sec + rtx_allowed_consecutive_rounds_(0) { + setRtxFec(false, true); + initSwitchVector(); +} + +RecoveryStrategyLowRate::~RecoveryStrategyLowRate() {} + +void RecoveryStrategyLowRate::initSwitchVector() { + // TODO adjust thresholds here when new data are collected + // see resutls in + // https://confluence-eng-gpk1.cisco.com/conf/display/SPT/dailyreports + thresholds_t t1; + t1.rtt = 15; // 15ms + t1.loss_rtx_to_fec = 15; // 15% + t1.loss_fec_to_rtx = 10; // 10% + thresholds_t t2; + t2.rtt = 35; // 35ms + t2.loss_rtx_to_fec = 5; // 5% + t2.loss_fec_to_rtx = 1; // 1% + switch_vector.push_back(t1); + switch_vector.push_back(t2); +} + +void RecoveryStrategyLowRate::setRecoveryParameters(bool use_rtx, bool use_fec, + uint32_t fec_to_ask) { + setRtxFec(use_rtx, use_fec); + indexer_->setNFec(fec_to_ask); +} + +void RecoveryStrategyLowRate::selectRecoveryStrategy(bool in_sync) { + uint32_t fec_to_ask = computeFecPacketsToAsk(in_sync); + if (fec_to_ask == 0) { + // fec is off, turn on RTX immediatly to avoid packet losses + setRecoveryParameters(true, false, 0); + fec_consecutive_rounds_ = 0; + return; + } + + uint32_t loss_rate = std::round(state_->getPerSecondLossRate() * 100); + uint32_t rtt = state_->getAvgRTT(); + + bool use_rtx = false; + for (size_t i = 0; i < switch_vector.size(); i++) { + uint32_t max_loss_rate = 0; + if (fec_on_) + max_loss_rate = switch_vector[i].loss_fec_to_rtx; + else + max_loss_rate = switch_vector[i].loss_rtx_to_fec; + + if (rtt < switch_vector[i].rtt && loss_rate < max_loss_rate) { + use_rtx = true; + rtx_allowed_consecutive_rounds_++; + break; + } + } + + if (!use_rtx) rtx_allowed_consecutive_rounds_ = 0; + + if (use_rtx) { + if (fec_on_) { + // here we should swtich from RTX to FEC + // wait 10sec where the switch is allowed before actually switch + if (rtx_allowed_consecutive_rounds_ >= + ((MILLI_IN_A_SEC / ROUND_LEN) * 10)) { // 10 sec + // use RTX + setRecoveryParameters(true, false, 0); + fec_consecutive_rounds_ = 0; + } else { + // keep using FEC (and maybe RTX) + setRecoveryParameters(true, true, fec_to_ask); + fec_consecutive_rounds_++; + } + } else { + // keep using RTX + setRecoveryParameters(true, false, 0); + fec_consecutive_rounds_ = 0; + } + } else { + // use FEC and RTX + setRecoveryParameters(true, true, fec_to_ask); + fec_consecutive_rounds_++; + } + + // everytime that we anable FEC we keep also RTX on. in this way the first + // losses that are not covered by FEC are recovered using RTX. after 5 sec we + // disable fec + if (fec_consecutive_rounds_ >= ((MILLI_IN_A_SEC / ROUND_LEN) * 5)) { + // turn off RTX + setRtxFec(false); + } +} + +void RecoveryStrategyLowRate::onNewRound(bool in_sync) { + if (!recovery_on_) { + // disable fec so that no extra packet will be sent + // for rtx we check if recovery is on in newPacketLoss + setRtxFec(true, false); + indexer_->setNFec(0); + return; + } + + // XXX since this strategy will be used only for flow at low rate we do not + // consider congestion events like in other strategies + + selectRecoveryStrategy(in_sync); +} + +void RecoveryStrategyLowRate::newPacketLoss(uint32_t seq) { + if (rtx_on_ && recovery_on_) { + addNewRtx(seq, false); + } else { + if (!state_->isPending(seq) && !indexer_->isFec(seq)) { + addNewRtx(seq, true); + } else { + recover_with_fec_.insert(seq); + state_->onPossibleLossWithNoRtx(seq); + } + } +} + +void RecoveryStrategyLowRate::receivedPacket(uint32_t seq) { + removePacketState(seq); +} + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_rs_low_rate.h b/libtransport/src/protocols/rtc/rtc_rs_low_rate.h new file mode 100644 index 000000000..f0c7bd0d5 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_rs_low_rate.h @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include <protocols/rtc/rtc_recovery_strategy.h> + +#include <vector> + +namespace transport { + +namespace protocol { + +namespace rtc { + +struct thresholds_t { + uint32_t rtt; + uint32_t loss_rtx_to_fec; // loss rate used to move from rtx to fec + uint32_t loss_fec_to_rtx; // loss rate used to move from fec to rtx +}; + +class RecoveryStrategyLowRate : public RecoveryStrategy { + public: + RecoveryStrategyLowRate(Indexer *indexer, SendRtxCallback &&callback, + asio::io_service &io_service, + interface::StrategyCallback *external_callback); + + RecoveryStrategyLowRate(RecoveryStrategy &&rs); + + ~RecoveryStrategyLowRate(); + + void onNewRound(bool in_sync); + void newPacketLoss(uint32_t seq); + void receivedPacket(uint32_t seq); + + private: + void initSwitchVector(); + void setRecoveryParameters(bool use_rtx, bool use_fec, uint32_t fec_to_ask); + void selectRecoveryStrategy(bool in_sync); + + uint32_t fec_consecutive_rounds_; + uint32_t rtx_allowed_consecutive_rounds_; + + // this table contains the thresholds that indicates when to switch from RTX + // to FEC and viceversa. values in the vector are detected with a set of + // experiments. the vector is used in the following way: if rtt and loss rate + // are less than one of the values in the in the vector, losses are + // recovered using RTX. otherwive losses are recovered using FEC. as for FEC + // only and delay based strategy, the swith from RTX to FEC is smooth, + // meaning that FEC and RTX are used together for some rounds + std::vector<thresholds_t> switch_vector; +}; + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_rs_recovery_off.cc b/libtransport/src/protocols/rtc/rtc_rs_recovery_off.cc new file mode 100644 index 000000000..499e978f1 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_rs_recovery_off.cc @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <glog/logging.h> +#include <protocols/rtc/rtc_consts.h> +#include <protocols/rtc/rtc_rs_recovery_off.h> + +namespace transport { + +namespace protocol { + +namespace rtc { + +RecoveryStrategyRecoveryOff::RecoveryStrategyRecoveryOff( + Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service, + interface::StrategyCallback *external_callback) + : RecoveryStrategy(indexer, std::move(callback), io_service, false, false, + external_callback) {} + +RecoveryStrategyRecoveryOff::RecoveryStrategyRecoveryOff(RecoveryStrategy &&rs) + : RecoveryStrategy(std::move(rs)) { + setRtxFec(false, false); +} + +RecoveryStrategyRecoveryOff::~RecoveryStrategyRecoveryOff() {} + +void RecoveryStrategyRecoveryOff::onNewRound(bool in_sync) { + // nothing to do + return; +} + +void RecoveryStrategyRecoveryOff::newPacketLoss(uint32_t seq) { + // here we only keep track of the lost packets to avoid to + // count them multple times in the counters. for this we + // use the recover_with_fec_ set + recover_with_fec_.insert(seq); + state_->onPossibleLossWithNoRtx(seq); +} + +void RecoveryStrategyRecoveryOff::receivedPacket(uint32_t seq) { + removePacketState(seq); +} + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_rs_recovery_off.h b/libtransport/src/protocols/rtc/rtc_rs_recovery_off.h new file mode 100644 index 000000000..98cd1e6a5 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_rs_recovery_off.h @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include <protocols/rtc/rtc_recovery_strategy.h> + +namespace transport { + +namespace protocol { + +namespace rtc { + +class RecoveryStrategyRecoveryOff : public RecoveryStrategy { + public: + RecoveryStrategyRecoveryOff(Indexer *indexer, SendRtxCallback &&callback, + asio::io_service &io_service, + interface::StrategyCallback *external_callback); + + RecoveryStrategyRecoveryOff(RecoveryStrategy &&rs); + + ~RecoveryStrategyRecoveryOff(); + + void onNewRound(bool in_sync); + void newPacketLoss(uint32_t seq); + void receivedPacket(uint32_t seq); +}; + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_rs_rtx_only.cc b/libtransport/src/protocols/rtc/rtc_rs_rtx_only.cc new file mode 100644 index 000000000..c1ae9b53d --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_rs_rtx_only.cc @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <glog/logging.h> +#include <protocols/rtc/rtc_consts.h> +#include <protocols/rtc/rtc_rs_rtx_only.h> + +namespace transport { + +namespace protocol { + +namespace rtc { + +RecoveryStrategyRtxOnly::RecoveryStrategyRtxOnly( + Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service, + interface::StrategyCallback *external_callback) + : RecoveryStrategy(indexer, std::move(callback), io_service, true, false, + external_callback) {} + +RecoveryStrategyRtxOnly::RecoveryStrategyRtxOnly(RecoveryStrategy &&rs) + : RecoveryStrategy(std::move(rs)) { + setRtxFec(true, false); +} + +RecoveryStrategyRtxOnly::~RecoveryStrategyRtxOnly() {} + +void RecoveryStrategyRtxOnly::onNewRound(bool in_sync) { + // nothing to do + return; +} + +void RecoveryStrategyRtxOnly::newPacketLoss(uint32_t seq) { + if (!recovery_on_) { + recover_with_fec_.insert(seq); + state_->onPossibleLossWithNoRtx(seq); + return; + } + addNewRtx(seq, false); +} + +void RecoveryStrategyRtxOnly::receivedPacket(uint32_t seq) { + removePacketState(seq); +} + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_rs_rtx_only.h b/libtransport/src/protocols/rtc/rtc_rs_rtx_only.h new file mode 100644 index 000000000..7ae909454 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_rs_rtx_only.h @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include <protocols/rtc/rtc_recovery_strategy.h> + +namespace transport { + +namespace protocol { + +namespace rtc { + +class RecoveryStrategyRtxOnly : public RecoveryStrategy { + public: + RecoveryStrategyRtxOnly(Indexer *indexer, SendRtxCallback &&callback, + asio::io_service &io_service, + interface::StrategyCallback *external_callback); + + RecoveryStrategyRtxOnly(RecoveryStrategy &&rs); + + ~RecoveryStrategyRtxOnly(); + + void onNewRound(bool in_sync); + void newPacketLoss(uint32_t seq); + void receivedPacket(uint32_t seq); +}; + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_state.cc b/libtransport/src/protocols/rtc/rtc_state.cc index c99205a26..6a21531f8 100644 --- a/libtransport/src/protocols/rtc/rtc_state.cc +++ b/libtransport/src/protocols/rtc/rtc_state.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2021 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -24,15 +24,15 @@ namespace protocol { namespace rtc { RTCState::RTCState(Indexer *indexer, - ProbeHandler::SendProbeCallback &&rtt_probes_callback, + ProbeHandler::SendProbeCallback &&probe_callback, DiscoveredRttCallback &&discovered_rtt_callback, asio::io_service &io_service) - : indexer_(indexer), - rtt_probes_(std::make_shared<ProbeHandler>(std::move(rtt_probes_callback), - io_service)), + : loss_history_(10), // log 10sec history + indexer_(indexer), + probe_handler_(std::make_shared<ProbeHandler>(std::move(probe_callback), + io_service)), discovered_rtt_callback_(std::move(discovered_rtt_callback)) { init_rtt_timer_ = std::make_unique<asio::steady_timer>(io_service); - initParams(); } RTCState::~RTCState() {} @@ -55,9 +55,19 @@ void RTCState::initParams() { highest_seq_received_in_order_ = 0; last_seq_nacked_ = 0; loss_rate_ = 0.0; - avg_loss_rate_ = 0.0; + avg_loss_rate_ = -1.0; max_loss_rate_ = 0.0; last_round_loss_rate_ = 0.0; + + // loss rate per sec + lost_per_sec_ = 0; + total_expected_packets_ = 0; + per_sec_loss_rate_ = 0.0; + + // residual losses counters + expected_packets_ = 0; + packets_sent_to_app_ = 0; + rounds_from_last_compute_ = 0; residual_loss_rate_ = 0.0; // fec counters @@ -66,9 +76,13 @@ void RTCState::initParams() { // bw counters received_bytes_ = 0; + received_fec_bytes_ = 0; + recovered_bytes_with_fec_ = 0; + avg_packet_size_ = INIT_PACKET_SIZE; production_rate_ = 0.0; received_rate_ = 0.0; + fec_recovered_rate_ = 0.0; // nack counter nack_on_last_round_ = false; @@ -95,31 +109,30 @@ void RTCState::initParams() { path_table_.clear(); main_path_ = nullptr; - // packet received - received_or_lost_packets_.clear(); + // packet cache (not pending anymore) + packet_cache_.clear(); // pending interests pending_interests_.clear(); - // skipped interest + // used to keep track of the skipped interest last_interest_sent_ = 0; - skipped_interests_.clear(); // init rtt first_interest_sent_time_ = ~0; first_interest_sent_seq_ = 0; + // start probing the producer init_rtt_ = false; - rtt_probes_->setProbes(INIT_RTT_PROBE_INTERVAL, INIT_RTT_PROBES); - rtt_probes_->sendProbes(); + probe_handler_->setSuffixRange(MIN_INIT_PROBE_SEQ, MAX_INIT_PROBE_SEQ); + probe_handler_->setProbes(INIT_RTT_PROBE_INTERVAL, INIT_RTT_PROBES); + probe_handler_->sendProbes(); setInitRttTimer(INIT_RTT_PROBE_RESTART); } // packet events void RTCState::onSendNewInterest(const core::Name *interest_name) { - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + uint64_t now = utils::SteadyTime::nowMs().count(); uint32_t seq = interest_name->getSuffix(); pending_interests_.insert(std::pair<uint32_t, uint64_t>(seq, now)); @@ -137,11 +150,12 @@ void RTCState::onSendNewInterest(const core::Name *interest_name) { } // TODO what happen in case of jumps? - // look for skipped interests - skipped_interests_.erase(seq); // remove seq if it is there + eraseFromPacketCache( + seq); // if we send this interest we don't know its state for (uint32_t i = last_interest_sent_ + 1; i < seq; i++) { if (indexer_->isFec(i)) { - skipped_interests_.insert(i); + // only fec packets can be skipped + addToPacketCache(i, PacketState::SKIPPED); } } @@ -155,6 +169,7 @@ void RTCState::onTimeout(uint32_t seq, bool lost) { auto it = pending_interests_.find(seq); if (it != pending_interests_.end()) { pending_interests_.erase(it); + if (indexer_->isFec(seq)) pending_fec_pkt_--; } received_timeouts_++; @@ -162,11 +177,12 @@ void RTCState::onTimeout(uint32_t seq, bool lost) { } void RTCState::onLossDetected(uint32_t seq) { - if (!indexer_->isFec(seq)) { - packets_lost_++; - } else if (skipped_interests_.find(seq) == skipped_interests_.end() && - seq >= first_interest_sent_seq_) { + PacketState state = getPacketState(seq); + + // if the packet is already marked with a state, do nothing + if (state == PacketState::UNKNOWN) { packets_lost_++; + addToPacketCache(seq, PacketState::LOST); } } @@ -178,30 +194,40 @@ void RTCState::onRetransmission(uint32_t seq) { auto it = pending_interests_.find(seq); if (it != pending_interests_.end()) { pending_interests_.erase(it); -#if 0 - packets_lost_++; -#endif + if (indexer_->isFec(seq)) pending_fec_pkt_--; } sent_rtx_++; sent_rtx_last_round_++; } +void RTCState::onPossibleLossWithNoRtx(uint32_t seq) { + // if fec is on or rtx is disable we don't need to do anything to recover a + // packet. however in both cases we need to remove possible missing packets + // from the window of pendinig interest in order to free space without wating + // for the timeout. + auto it = pending_interests_.find(seq); + if (it != pending_interests_.end()) { + pending_interests_.erase(it); + if (indexer_->isFec(seq)) pending_fec_pkt_--; + } +} + void RTCState::onDataPacketReceived(const core::ContentObject &content_object, bool compute_stats) { uint32_t seq = content_object.getName().getSuffix(); + if (compute_stats) { updatePathStats(content_object, false); received_data_last_round_++; } received_data_++; + packets_sent_to_app_++; - struct data_packet_t *data_pkt = - (struct data_packet_t *)content_object.getPayload()->data(); - uint64_t production_time = data_pkt->getTimestamp(); - if (last_prod_update_ < production_time) { - last_prod_update_ = production_time; - uint32_t production_rate = data_pkt->getProductionRate(); - production_rate_ = (double)production_rate; + core::ParamsRTC params = RTCState::getDataParams(content_object); + + if (last_prod_update_ < params.timestamp) { + last_prod_update_ = params.timestamp; + production_rate_ = (double)params.prod_rate; } updatePacketSize(content_object); @@ -219,9 +245,18 @@ void RTCState::onDataPacketReceived(const core::ContentObject &content_object, void RTCState::onFecPacketReceived(const core::ContentObject &content_object) { uint32_t seq = content_object.getName().getSuffix(); - updateReceivedBytes(content_object); + // updateReceivedBytes(content_object); + received_fec_bytes_ += + (uint32_t)(content_object.headerSize() + content_object.payloadSize()); + + if (seq > highest_seq_received_) highest_seq_received_ = seq; + + PacketState state = getPacketState(seq); + if (state != PacketState::LOST) { + // increase only for not lost packets + received_fec_pkt_++; + } addRecvOrLost(seq, PacketState::RECEIVED); - received_fec_pkt_++; // the producer is responding // it is generating valid data packets so we consider it active producer_is_active_ = true; @@ -233,7 +268,7 @@ void RTCState::onNackPacketReceived(const core::ContentObject &nack, struct nack_packet_t *nack_pkt = (struct nack_packet_t *)nack.getPayload()->data(); uint64_t production_time = nack_pkt->getTimestamp(); - uint32_t production_seq = nack_pkt->getProductionSegement(); + uint32_t production_seq = nack_pkt->getProductionSegment(); uint32_t production_rate = nack_pkt->getProductionRate(); if (TRANSPORT_EXPECT_FALSE(main_path_ == nullptr) || @@ -255,6 +290,7 @@ void RTCState::onNackPacketReceived(const core::ContentObject &nack, received_nacks_++; received_nacks_last_round_++; + bool to_delete = false; if (production_seq > seq) { // old nack, seq is lost // update last nacked @@ -266,12 +302,19 @@ void RTCState::onNackPacketReceived(const core::ContentObject &nack, // future nack // remove the nack from the pending interest map // (the packet is not received/lost yet) - if (indexer_->isFec(seq)) pending_fec_pkt_--; - pending_interests_.erase(seq); + to_delete = true; } else { // this should be a quite rear event. simply remove the // packet from the pending interest list - pending_interests_.erase(seq); + to_delete = true; + } + + if (to_delete) { + auto it = pending_interests_.find(seq); + if (it != pending_interests_.end()) { + pending_interests_.erase(it); + if (indexer_->isFec(seq)) pending_fec_pkt_--; + } } // the producer is responding @@ -295,44 +338,58 @@ void RTCState::onPacketLost(uint32_t seq) { } #endif if (!indexer_->isFec(seq)) { - definitely_lost_pkt_++; - DLOG_IF(INFO, VLOG_IS_ON(4)) << "packet " << seq << " is lost"; + PacketState state = getPacketState(seq); + if (state == PacketState::LOST || state == PacketState::UNKNOWN) { + definitely_lost_pkt_++; + DLOG_IF(INFO, VLOG_IS_ON(4)) << "packet " << seq << " is lost"; + } } - addRecvOrLost(seq, PacketState::LOST); + addRecvOrLost(seq, PacketState::DEFINITELY_LOST); } void RTCState::onPacketRecoveredRtx(uint32_t seq) { + packets_sent_to_app_++; + if (seq > highest_seq_received_) highest_seq_received_ = seq; losses_recovered_++; addRecvOrLost(seq, PacketState::RECEIVED); } -void RTCState::onPacketRecoveredFec(uint32_t seq) { +void RTCState::onFecPacketRecoveredRtx(uint32_t seq) { + // This is the same as onPacketRecoveredRtx, but in this is case the + // pkt is also a FEC pkt, the addRecvOrLost will be called afterwards + if (seq > highest_seq_received_) highest_seq_received_ = seq; + losses_recovered_++; +} + +void RTCState::onPacketRecoveredFec(uint32_t seq, uint32_t size) { losses_recovered_++; + packets_sent_to_app_++; + recovered_bytes_with_fec_ += size; + + if (seq > highest_seq_received_) highest_seq_received_ = seq; + + // adding header to the count + recovered_bytes_with_fec_ += 60; // XXX get header size some where + + if (getPacketState(seq) == PacketState::UNKNOWN) + onLossDetected(seq); // the pkt was lost but didn't account for it yet + addRecvOrLost(seq, PacketState::RECEIVED); } bool RTCState::onProbePacketReceived(const core::ContentObject &probe) { uint32_t seq = probe.getName().getSuffix(); - uint64_t rtt; - - rtt = rtt_probes_->getRtt(seq); + uint64_t rtt; + rtt = probe_handler_->getRtt(seq); if (rtt == 0) return false; // this is not a valid probe - // like for data and nacks update the path stats. Here the RTT is computed - // by the probe handler. Both probes for rtt and bw are good to esimate - // info on the path + // Like for data and nacks update the path stats. Here the RTT is computed + // by the probe handler. Both probes for rtt and bw are good to estimate + // info on the path. uint32_t path_label = probe.getPathLabel(); - auto path_it = path_table_.find(path_label); - // update production rate and last_seq_nacked like in case of a nack - struct nack_packet_t *probe_pkt = - (struct nack_packet_t *)probe.getPayload()->data(); - uint64_t sender_timestamp = probe_pkt->getTimestamp(); - uint32_t production_seq = probe_pkt->getProductionSegement(); - uint32_t production_rate = probe_pkt->getProductionRate(); - if (path_it == path_table_.end()) { // found a new path std::shared_ptr<RTCDataPath> newPath = @@ -344,26 +401,26 @@ bool RTCState::onProbePacketReceived(const core::ContentObject &probe) { auto path = path_it->second; - path->insertRttSample(rtt); + path->insertRttSample(utils::SteadyTime::Milliseconds(rtt), true); path->receivedNack(); - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + uint64_t now = utils::SteadyTime::nowMs().count(); + + core::ParamsRTC params = RTCState::getProbeParams(probe); - int64_t OWD = now - sender_timestamp; + int64_t OWD = now - params.timestamp; path->insertOwdSample(OWD); - if (last_prod_update_ < sender_timestamp) { - last_production_seq_ = production_seq; - last_prod_update_ = sender_timestamp; - production_rate_ = (double)production_rate; + if (last_prod_update_ < params.timestamp) { + last_production_seq_ = params.prod_seg; + last_prod_update_ = params.timestamp; + production_rate_ = (double)params.prod_rate; } // the producer is responding // we consider it active only if the production rate is not 0 // or the production sequence numner is not 1 - if (production_rate_ != 0 || production_seq != 1) { + if (production_rate_ != 0 || params.prod_seg != 1) { producer_is_active_ = true; } @@ -375,7 +432,7 @@ bool RTCState::onProbePacketReceived(const core::ContentObject &probe) { if (!init_rtt_ && received_probes_ <= INIT_RTT_PROBES) { if (received_probes_ == 1) { // we got the first probe, wait at most INIT_RTT_PROBE_WAIT sec for the - // others + // others. main_path_ = path; setInitRttTimer(INIT_RTT_PROBE_WAIT); } @@ -393,11 +450,21 @@ bool RTCState::onProbePacketReceived(const core::ContentObject &probe) { return true; } -void RTCState::onNewRound(double round_len, bool in_sync) { - // XXX - // here we take into account only the single path case so we assume that we - // don't use two paths in parellel for this single flow +void RTCState::onJumpForward(uint32_t next_seq) { + for (uint32_t seq = highest_seq_received_in_order_ + 1; seq < next_seq; + seq++) { + auto it = pending_interests_.find(seq); + PacketState packet_state = getPacketState(seq); + if (it == pending_interests_.end() && + packet_state != PacketState::RECEIVED && + packet_state != PacketState::DEFINITELY_LOST) { + onLossDetected(seq); + onPacketLost(seq); + } + } +} +void RTCState::onNewRound(double round_len, bool in_sync) { if (path_table_.empty()) return; double bytes_per_sec = @@ -407,7 +474,24 @@ void RTCState::onNewRound(double round_len, bool in_sync) { else received_rate_ = (received_rate_ * MOVING_AVG_ALPHA) + ((1 - MOVING_AVG_ALPHA) * bytes_per_sec); + double fec_bytes_per_sec = + ((double)received_fec_bytes_ * (MILLI_IN_A_SEC / round_len)); + + if (fec_received_rate_ == 0) + fec_received_rate_ = fec_bytes_per_sec; + else + fec_received_rate_ = (fec_received_rate_ * 0.8) + (0.2 * fec_bytes_per_sec); + + double fec_recovered_bytes_per_sec = + ((double)recovered_bytes_with_fec_ * (MILLI_IN_A_SEC / round_len)); + if (fec_recovered_rate_ == 0) + fec_recovered_rate_ = fec_recovered_bytes_per_sec; + else + fec_recovered_rate_ = + (fec_recovered_rate_ * 0.8) + (0.2 * fec_recovered_bytes_per_sec); + +#if 0 // search for an active path. There should be only one active path (meaning a // path that leads to the producer socket -no cache- and from which we are // currently getting data packets) at any time. However it may happen that @@ -428,9 +512,36 @@ void RTCState::onNewRound(double round_len, bool in_sync) { } } } +#endif + + // search for an active path. Is it possible to have multiple path that are + // used at the same time. We use as reference path the one from where we gets + // more packets. This means that the path should have better lantecy or less + // channel losses + + uint32_t last_round_packets = 0; + std::shared_ptr<RTCDataPath> old_main_path = main_path_; + main_path_ = nullptr; + + for (auto it = path_table_.begin(); it != path_table_.end(); it++) { + if (it->second->isActive()) { + uint32_t pkt = it->second->getPacketsLastRound(); + if (pkt > last_round_packets) { + last_round_packets = pkt; + main_path_ = it->second; + } + } + it->second->roundEnd(); + } - // if (in_sync) updateLossRate(); - updateLossRate(); + if (main_path_ == nullptr) main_path_ = old_main_path; + + // in case we get a new main path we reset the stats of the old one. this is + // beacuse, in case we need to switch back we don't what to take decisions on + // old stats that may be outdated. + if (main_path_ != old_main_path) old_main_path->clearRtt(); + + updateLossRate(in_sync); // handle nacks if (!nack_on_last_round_ && received_bytes_ > 0) { @@ -460,6 +571,8 @@ void RTCState::onNewRound(double round_len, bool in_sync) { // reset counters received_bytes_ = 0; + received_fec_bytes_ = 0; + recovered_bytes_with_fec_ = 0; packets_lost_ = 0; definitely_lost_pkt_ = 0; losses_recovered_ = 0; @@ -516,20 +629,16 @@ void RTCState::updatePathStats(const core::ContentObject &content_object, // it means that we are processing an interest // that is not pending - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + uint64_t now = utils::SteadyTime::nowMs().count(); uint64_t RTT = now - interest_sent_time; - path->insertRttSample(RTT); + path->insertRttSample(utils::SteadyTime::Milliseconds(RTT), false); // compute OWD (the first part of the nack and data packet header are the // same, so we cast to data data packet) - struct data_packet_t *packet = - (struct data_packet_t *)content_object.getPayload()->data(); - uint64_t sender_timestamp = packet->getTimestamp(); - int64_t OWD = now - sender_timestamp; + core::ParamsRTC params = RTCState::getDataParams(content_object); + int64_t OWD = now - params.timestamp; path->insertOwdSample(OWD); // compute IAT or set path to producer @@ -543,59 +652,106 @@ void RTCState::updatePathStats(const core::ContentObject &content_object, } } -void RTCState::updateLossRate() { +void RTCState::updateLossRate(bool in_sync) { last_round_loss_rate_ = loss_rate_; loss_rate_ = 0.0; - residual_loss_rate_ = 0.0; uint32_t number_theorically_received_packets_ = highest_seq_received_ - first_seq_in_round_; - // in this case no new packet was recevied after the previuos round, avoid - // division by 0 - if (number_theorically_received_packets_ == 0) return; - // XXX this may be quite inefficient if the rate is high // maybe is better to iterate over the set? - for (uint32_t i = first_seq_in_round_; i < highest_seq_received_; i++) { - auto it = skipped_interests_.find(i); - if (it != skipped_interests_.end()) { + + uint32_t fec_packets = 0; + for (uint32_t i = (first_seq_in_round_ + 1); i < highest_seq_received_; i++) { + PacketState state = getPacketState(i); + if (state == PacketState::SKIPPED) { if (number_theorically_received_packets_ > 0) number_theorically_received_packets_--; - skipped_interests_.erase(it); } + if (indexer_->isFec(i)) fec_packets++; } + if (indexer_->isFec(highest_seq_received_)) fec_packets++; - loss_rate_ = (double)((double)(packets_lost_) / - (double)number_theorically_received_packets_); + // in this case no new packet was received after the previous round, avoid + // division by 0 + if (number_theorically_received_packets_ == 0 && packets_lost_ == 0) return; - if (rounds_ % 15 == 0) max_loss_rate_ = 0; // reset every 3 sec - if (loss_rate_ > max_loss_rate_) max_loss_rate_ = loss_rate_; + if (number_theorically_received_packets_ != 0) + loss_rate_ = (double)((double)(packets_lost_) / + (double)number_theorically_received_packets_); + else + // we didn't receive anything except NACKs that triggered losses + loss_rate_ = 1.0; - if (avg_loss_rate_ == 0) + if (avg_loss_rate_ == -1.0) avg_loss_rate_ = loss_rate_; else avg_loss_rate_ = avg_loss_rate_ * MOVING_AVG_ALPHA + loss_rate_ * (1 - MOVING_AVG_ALPHA); - residual_loss_rate_ = (double)((double)(packets_lost_ - losses_recovered_) / - (double)number_theorically_received_packets_); + // update counters for loss rate per second + total_expected_packets_ += number_theorically_received_packets_; + lost_per_sec_ += packets_lost_; + + if (in_sync) { + // update counters for residual losses + // fec packets are not sent to the app so we don't want to count them here + expected_packets_ += + ((highest_seq_received_ - first_seq_in_round_) - fec_packets); + } else { + packets_sent_to_app_ = 0; + } + + if (rounds_from_last_compute_ >= (MILLI_IN_A_SEC / ROUND_LEN)) { + // compute loss rate per second + if (lost_per_sec_ > total_expected_packets_) + lost_per_sec_ = total_expected_packets_; + + if (total_expected_packets_ == 0) + per_sec_loss_rate_ = 0; + else + per_sec_loss_rate_ = + (double)((double)(lost_per_sec_) / (double)total_expected_packets_); + + loss_history_.pushBack(per_sec_loss_rate_); + max_loss_rate_ = getMaxLoss(); + + if (in_sync && expected_packets_ != 0) { + // compute residual loss rate + if (packets_sent_to_app_ > expected_packets_) { + // this may happen if we get packet from the prev bin that get recovered + // on the current one + packets_sent_to_app_ = expected_packets_; + } + + residual_loss_rate_ = + 1.0 - ((double)packets_sent_to_app_ / (double)expected_packets_); + if (residual_loss_rate_ < 0.0) residual_loss_rate_ = 0.0; + } + + lost_per_sec_ = 0; + total_expected_packets_ = 0; + expected_packets_ = 0; + packets_sent_to_app_ = 0; + rounds_from_last_compute_ = 0; + } + + rounds_from_last_compute_++; +} - if (residual_loss_rate_ < 0) residual_loss_rate_ = 0; +void RTCState::dataToBeReceived(uint32_t seq) { + addToPacketCache(seq, PacketState::TO_BE_RECEIVED); } void RTCState::addRecvOrLost(uint32_t seq, PacketState state) { - if (indexer_->isFec(seq)) { - pending_fec_pkt_--; + auto it = pending_interests_.find(seq); + if (it != pending_interests_.end()) { + pending_interests_.erase(it); + if (indexer_->isFec(seq)) pending_fec_pkt_--; } - pending_interests_.erase(seq); - if (received_or_lost_packets_.size() >= MAX_CACHED_PACKETS) { - received_or_lost_packets_.erase(received_or_lost_packets_.begin()); - } - // notice that it may happen that a packet that we consider lost arrives after - // some time, in this case we simply overwrite the packet state. - received_or_lost_packets_[seq] = state; + addToPacketCache(seq, state); // keep track of the last packet received/lost // without holes. @@ -608,16 +764,25 @@ void RTCState::addRecvOrLost(uint32_t seq, PacketState state) { } else if (seq <= highest_seq_received_in_order_) { // here we do nothing } else if (seq > highest_seq_received_in_order_) { - // 1) there is a gap in the sequence so we do not update largest_in_seq_ - // 2) all the packets from largest_in_seq_ to seq are in - // received_or_lost_packets_ an we upate largest_in_seq_ - // or are FEC packets + // 1) there is a gap in the sequence so we do not update + // highest_seq_received_in_order_ + // 2) all the packets from highest_seq_received_in_order_ to seq are + // received or lost or are fec packetis. In this case we increase + // highest_seq_received_in_order_ until we find an hole in the sequence for (uint32_t i = highest_seq_received_in_order_ + 1; i <= seq; i++) { - if (received_or_lost_packets_.find(i) == - received_or_lost_packets_.end() && - !indexer_->isFec(i)) { - break; + PacketState state = getPacketState(i); + if ((state == PacketState::UNKNOWN || state == PacketState::LOST)) { + if (indexer_->isFec(i)) { + // this is a fec packet and we don't care to receive it + // however we may need to increse the number or lost packets + // XXX: in case we want to use rtx to recover fec packets, + // this may prevent to detect a packet loss and no rtx will be sent + onLossDetected(i); + } else { + // this is a data packet and we need to get it + break; + } } // this packet is in order so we can update the // highest_seq_received_in_order_ @@ -629,9 +794,14 @@ void RTCState::addRecvOrLost(uint32_t seq, PacketState state) { void RTCState::setInitRttTimer(uint32_t wait) { init_rtt_timer_->cancel(); init_rtt_timer_->expires_from_now(std::chrono::milliseconds(wait)); - init_rtt_timer_->async_wait([this](std::error_code ec) { + + std::weak_ptr<RTCState> self = shared_from_this(); + init_rtt_timer_->async_wait([self](const std::error_code &ec) { if (ec) return; - checkInitRttTimer(); + + if (auto ptr = self.lock()) { + ptr->checkInitRttTimer(); + } }); } @@ -639,19 +809,25 @@ void RTCState::checkInitRttTimer() { if (received_probes_ < INIT_RTT_MIN_PROBES_TO_RECV) { // we didn't received enough probes, restart received_probes_ = 0; - rtt_probes_->setProbes(INIT_RTT_PROBE_INTERVAL, INIT_RTT_PROBES); - rtt_probes_->sendProbes(); + probe_handler_->setSuffixRange(MIN_INIT_PROBE_SEQ, MAX_INIT_PROBE_SEQ); + probe_handler_->setProbes(INIT_RTT_PROBE_INTERVAL, INIT_RTT_PROBES); + probe_handler_->sendProbes(); setInitRttTimer(INIT_RTT_PROBE_RESTART); return; } + init_rtt_ = true; main_path_->roundEnd(); - rtt_probes_->setProbes(RTT_PROBE_INTERVAL, 0); - rtt_probes_->sendProbes(); + loss_history_.pushBack(probe_handler_->getProbeLossRate()); + max_loss_rate_ = getMaxLoss(); + + probe_handler_->setSuffixRange(MIN_RTT_PROBE_SEQ, MAX_RTT_PROBE_SEQ); + probe_handler_->setProbes(RTT_PROBE_INTERVAL, 0); + probe_handler_->sendProbes(); // init last_seq_nacked_. skip packets that may come from the cache double prod_rate = getProducerRate(); - double rtt = (double)getRTT() / MILLI_IN_A_SEC; + double rtt = (double)getMinRTT() / MILLI_IN_A_SEC; double packet_size = getAveragePacketSize(); uint32_t pkt_in_rtt_ = std::floor(((prod_rate / packet_size) * rtt) * 0.8); last_seq_nacked_ = last_production_seq_ + pkt_in_rtt_; @@ -659,6 +835,68 @@ void RTCState::checkInitRttTimer() { discovered_rtt_callback_(); } +double RTCState::getMaxLoss() { + if (loss_history_.size() != 0) return loss_history_.begin(); + return 0; +} + +core::ParamsRTC RTCState::getProbeParams(const core::ContentObject &probe) { + uint32_t seq = probe.getName().getSuffix(); + core::ParamsRTC params; + + switch (ProbeHandler::getProbeType(seq)) { + case ProbeType::INIT: { + core::ContentObjectManifest manifest( + const_cast<core::ContentObject &>(probe)); + manifest.decode(); + params = manifest.getParamsRTC(); + break; + } + case ProbeType::RTT: { + struct nack_packet_t *probe_pkt = + (struct nack_packet_t *)probe.getPayload()->data(); + params = core::ParamsRTC{ + .timestamp = probe_pkt->getTimestamp(), + .prod_rate = probe_pkt->getProductionRate(), + .prod_seg = probe_pkt->getProductionSegment(), + }; + break; + } + default: + break; + } + + return params; +} + +core::ParamsRTC RTCState::getDataParams(const core::ContentObject &data) { + core::ParamsRTC params; + + switch (data.getPayloadType()) { + case core::PayloadType::DATA: { + struct data_packet_t *data_pkt = + (struct data_packet_t *)data.getPayload()->data(); + params = core::ParamsRTC{ + .timestamp = data_pkt->getTimestamp(), + .prod_rate = data_pkt->getProductionRate(), + .prod_seg = data.getName().getSuffix(), + }; + break; + } + case core::PayloadType::MANIFEST: { + core::ContentObjectManifest manifest( + const_cast<core::ContentObject &>(data)); + manifest.decode(); + params = manifest.getParamsRTC(); + break; + } + default: + break; + } + + return params; +} + } // namespace rtc } // namespace protocol diff --git a/libtransport/src/protocols/rtc/rtc_state.h b/libtransport/src/protocols/rtc/rtc_state.h index 729ba7a1b..8bf48ccc2 100644 --- a/libtransport/src/protocols/rtc/rtc_state.h +++ b/libtransport/src/protocols/rtc/rtc_state.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2021 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -14,13 +14,16 @@ */ #pragma once +#include <core/facade.h> #include <hicn/transport/config.h> #include <hicn/transport/core/asio_wrapper.h> #include <hicn/transport/core/content_object.h> #include <hicn/transport/core/name.h> +#include <hicn/transport/utils/rtc_quality_score.h> #include <protocols/indexer.h> #include <protocols/rtc/probe_handler.h> #include <protocols/rtc/rtc_data_path.h> +#include <utils/max_filter.h> #include <map> #include <set> @@ -31,25 +34,50 @@ namespace protocol { namespace rtc { -enum class PacketState : uint8_t { RECEIVED, LOST, UNKNOWN }; +// packet state +// RECEIVED: the packet was already received +// LOST: the packet is marked as lost but can be recovered +// DEFINITELY_LOST: the packet is lost and cannot be recovered +// TO_BE_RECEIVED: when a packet is received is sent to the FEC decoder. the fec +// decoder may decide to send the packet directly to the app. to avoid +// duplicated the packet is marked with this state +// SKIPPED: an interest that was not sent, only for FEC packets +// UNKNOWN: unknown state +enum class PacketState : uint8_t { + RECEIVED, + TO_BE_RECEIVED, + LOST, + DEFINITELY_LOST, + SKIPPED, + UNKNOWN +}; + +class RTCState : public std::enable_shared_from_this<RTCState> { + using PendingInterestsMap = std::map<uint32_t, uint64_t>; + + private: + const double MAX_CACHED_PACKETS = 8192; // XXX this value may be too small + // for high rate apps -class RTCState : std::enable_shared_from_this<RTCState> { public: using DiscoveredRttCallback = std::function<void()>; public: - RTCState(Indexer *indexer, - ProbeHandler::SendProbeCallback &&rtt_probes_callback, + RTCState(Indexer *indexer, ProbeHandler::SendProbeCallback &&probe_callback, DiscoveredRttCallback &&discovered_rtt_callback, asio::io_service &io_service); ~RTCState(); + // initialization + void initParams(); + // packet events void onSendNewInterest(const core::Name *interest_name); void onTimeout(uint32_t seq, bool lost); void onLossDetected(uint32_t seq); void onRetransmission(uint32_t seq); + void onPossibleLossWithNoRtx(uint32_t seq); void onDataPacketReceived(const core::ContentObject &content_object, bool compute_stats); void onFecPacketReceived(const core::ContentObject &content_object); @@ -57,8 +85,10 @@ class RTCState : std::enable_shared_from_this<RTCState> { bool compute_stats); void onPacketLost(uint32_t seq); void onPacketRecoveredRtx(uint32_t seq); - void onPacketRecoveredFec(uint32_t seq); + void onFecPacketRecoveredRtx(uint32_t seq); + void onPacketRecoveredFec(uint32_t seq, uint32_t size); bool onProbePacketReceived(const core::ContentObject &probe); + void onJumpForward(uint32_t next_seq); // protocol state void onNewRound(double round_len, bool in_sync); @@ -72,10 +102,21 @@ class RTCState : std::enable_shared_from_this<RTCState> { // delay metrics bool isRttDiscovered() const { return init_rtt_; } - uint64_t getRTT() const { + uint64_t getMinRTT() const { if (mainPathIsValid()) return main_path_->getMinRtt(); return 0; } + + uint64_t getAvgRTT() const { + if (mainPathIsValid()) return main_path_->getAvgRtt(); + return 0; + } + + uint64_t getMaxRTT() const { + if (mainPathIsValid()) return main_path_->getMaxRtt(); + return 0; + } + void resetRttStats() { if (mainPathIsValid()) main_path_->clearRtt(); } @@ -98,6 +139,7 @@ class RTCState : std::enable_shared_from_this<RTCState> { uint64_t getInterestSentTime(uint32_t seq) { auto it = pending_interests_.find(seq); if (it != pending_interests_.end()) return it->second; + return 0; } @@ -110,14 +152,15 @@ class RTCState : std::enable_shared_from_this<RTCState> { return pending_interests_.size(); } - PacketState isReceivedOrLost(uint32_t seq) { - auto it = received_or_lost_packets_.find(seq); - if (it != received_or_lost_packets_.end()) return it->second; + PacketState getPacketState(uint32_t seq) { + auto it = packet_cache_.find(seq); + if (it != packet_cache_.end()) return it->second; return PacketState::UNKNOWN; } // loss rate - double getLossRate() const { return loss_rate_; } + double getPerRoundLossRate() const { return loss_rate_; } + double getPerSecondLossRate() const { return per_sec_loss_rate_; } double getAvgLossRate() const { return avg_loss_rate_; } double getMaxLossRate() const { return max_loss_rate_; } double getLastRoundLossRate() const { return last_round_loss_rate_; } @@ -134,15 +177,22 @@ class RTCState : std::enable_shared_from_this<RTCState> { return highest_seq_received_in_order_; } + double getMaxLoss(); + // fec packets uint32_t getReceivedFecPackets() const { return received_fec_pkt_; } uint32_t getPendingFecPackets() const { return pending_fec_pkt_; } // generic stats uint32_t getReceivedBytesInRound() const { return received_bytes_; } + uint32_t getReceivedFecBytesInRound() const { return received_fec_bytes_; } + uint32_t getRecoveredFecBytesInRound() const { + return recovered_bytes_with_fec_; + } uint32_t getReceivedNacksInRound() const { return received_nacks_last_round_; } + uint32_t getReceivedDataInRound() const { return received_data_last_round_; } uint32_t getSentInterestInRound() const { return sent_interests_last_round_; } uint32_t getSentRtxInRound() const { return sent_rtx_last_round_; } @@ -150,6 +200,9 @@ class RTCState : std::enable_shared_from_this<RTCState> { double getAvailableBw() const { return 0.0; }; // TODO double getProducerRate() const { return production_rate_; } double getReceivedRate() const { return received_rate_; } + double getReceivedFecRate() const { return fec_received_rate_; } + double getRecoveredFecRate() const { return fec_recovered_rate_; } + double getAveragePacketSize() const { return avg_packet_size_; } // nacks @@ -162,22 +215,46 @@ class RTCState : std::enable_shared_from_this<RTCState> { // packets from cache double getPacketFromCacheRatio() const { return data_from_cache_rate_; } - std::map<uint32_t, uint64_t>::iterator getPendingInterestsMapBegin() { + PendingInterestsMap::iterator getPendingInterestsMapBegin() { return pending_interests_.begin(); } - std::map<uint32_t, uint64_t>::iterator getPendingInterestsMapEnd() { + PendingInterestsMap::iterator getPendingInterestsMapEnd() { return pending_interests_.end(); } + // quality + uint8_t getQualityScore() { + uint8_t qs = quality_score_.getQualityScore( + getMaxRTT(), std::round(getResidualLossRate() * 100)); + return qs; + } + + // We received a data pkt that will be set to RECEIVED, but first we have to + // go through FEC. We do not want to consider this pkt as recovered, thus we + // set it as TO_BE_RECEIVED. + void dataToBeReceived(uint32_t seq); + + // Extract RTC parameters from probes (init or RTT probes) and data packets. + static core::ParamsRTC getProbeParams(const core::ContentObject &probe); + static core::ParamsRTC getDataParams(const core::ContentObject &data); + private: - void initParams(); + void addToPacketCache(uint32_t seq, PacketState state) { + // this function adds or updates the current state + if (packet_cache_.size() >= MAX_CACHED_PACKETS) { + packet_cache_.erase(packet_cache_.begin()); + } + packet_cache_[seq] = state; + } + + void eraseFromPacketCache(uint32_t seq) { packet_cache_.erase(seq); } // update stats void updateState(); void updateReceivedBytes(const core::ContentObject &content_object); void updatePacketSize(const core::ContentObject &content_object); void updatePathStats(const core::ContentObject &content_object, bool is_nack); - void updateLossRate(); + void updateLossRate(bool in_sycn); void addRecvOrLost(uint32_t seq, PacketState state); @@ -211,22 +288,39 @@ class RTCState : std::enable_shared_from_this<RTCState> { double avg_loss_rate_; double max_loss_rate_; double last_round_loss_rate_; + utils::MaxFilter<double> loss_history_; + + // per second loss rate + uint32_t lost_per_sec_; + uint32_t total_expected_packets_; + double per_sec_loss_rate_; + + // conunters for residual losses + // residual losses are computed every second and are used + // as feedback to the upper levels (e.g application) + uint32_t expected_packets_; + uint32_t packets_sent_to_app_; + uint32_t rounds_from_last_compute_; double residual_loss_rate_; // bw counters uint32_t received_bytes_; + uint32_t received_fec_bytes_; + uint32_t recovered_bytes_with_fec_; double avg_packet_size_; - double production_rate_; // rate communicated by the producer using nacks - double received_rate_; // rate recevied by the consumer + double production_rate_; // rate communicated by the producer using nacks + double received_rate_; // rate recevied by the consumer (only data) + double fec_received_rate_; // fec rate recevied by the consumer + double fec_recovered_rate_; // rate recovered using fec - // nack counter + // nack counters // the bool takes tracks only about the valid nacks (no rtx) and it is used to // switch between the states. Instead received_nacks_last_round_ logs all the // nacks for statistics bool nack_on_last_round_; uint32_t received_nacks_last_round_; - // packets counter + // packets counters uint32_t received_packets_last_round_; uint32_t received_data_last_round_; uint32_t received_data_from_cache_; @@ -234,11 +328,11 @@ class RTCState : std::enable_shared_from_this<RTCState> { uint32_t sent_interests_last_round_; uint32_t sent_rtx_last_round_; - // fec counter + // fec counters uint32_t received_fec_pkt_; uint32_t pending_fec_pkt_; - // round conunters + // round counters uint32_t rounds_; uint32_t rounds_without_nacks_; uint32_t rounds_without_packets_; @@ -261,23 +355,27 @@ class RTCState : std::enable_shared_from_this<RTCState> { // packet received // cache where to store info about the last MAX_CACHED_PACKETS - std::map<uint32_t, PacketState> received_or_lost_packets_; + // these are packets that are received or lost or definitely lost and are not + // anymore in the pending intetest list + std::map<uint32_t, PacketState> packet_cache_; // pending interests - std::map<uint32_t, uint64_t> pending_interests_; + PendingInterestsMap pending_interests_; // indexer Indexer *indexer_; - // skipped interests + // used to keep track of the skipped interests uint32_t last_interest_sent_; - std::unordered_set<uint32_t> skipped_interests_; // probes - std::shared_ptr<ProbeHandler> rtt_probes_; + std::shared_ptr<ProbeHandler> probe_handler_; bool init_rtt_; std::unique_ptr<asio::steady_timer> init_rtt_timer_; + // quality score + RTCQualityScore quality_score_; + // callbacks DiscoveredRttCallback discovered_rtt_callback_; }; diff --git a/libtransport/src/protocols/rtc/rtc_verifier.cc b/libtransport/src/protocols/rtc/rtc_verifier.cc new file mode 100644 index 000000000..29968dd02 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_verifier.cc @@ -0,0 +1,238 @@ +/* + * Copyright (c) 2017-2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <core/facade.h> +#include <protocols/rtc/rtc_packet.h> +#include <protocols/rtc/rtc_verifier.h> + +namespace transport { +namespace protocol { +namespace rtc { + +RTCVerifier::RTCVerifier(std::shared_ptr<auth::Verifier> verifier, + uint32_t max_unverified_delay) + : verifier_(verifier), max_unverified_delay_(max_unverified_delay) {} + +void RTCVerifier::setState(std::shared_ptr<RTCState> rtc_state) { + rtc_state_ = rtc_state; +} + +void RTCVerifier::setVerifier(std::shared_ptr<auth::Verifier> verifier) { + verifier_ = verifier; +} + +void RTCVerifier::setMaxUnverifiedDelay(uint32_t max_unverified_delay) { + max_unverified_delay_ = max_unverified_delay; +} + +auth::VerificationPolicy RTCVerifier::verify( + core::ContentObject &content_object, bool is_fec) { + uint32_t suffix = content_object.getName().getSuffix(); + core::PayloadType payload_type = content_object.getPayloadType(); + + bool is_probe = ProbeHandler::getProbeType(suffix) != ProbeType::NOT_PROBE; + bool is_nack = !is_probe && content_object.payloadSize() == NACK_HEADER_SIZE; + bool is_manifest = !is_probe && !is_nack && !is_fec && + payload_type == core::PayloadType::MANIFEST; + bool is_data = !is_probe && !is_nack && !is_fec && + payload_type == core::PayloadType::DATA; + + if (is_probe) return verifyProbe(content_object); + if (is_nack) return verifyNack(content_object); + if (is_fec) return verifyFec(content_object); + if (is_data) return verifyData(content_object); + if (is_manifest) return verifyManifest(content_object); + + auth::VerificationPolicy policy = auth::VerificationPolicy::ABORT; + verifier_->callVerificationFailedCallback(suffix, policy); + return policy; +} + +auth::VerificationPolicy RTCVerifier::verifyProbe( + core::ContentObject &content_object) { + switch (ProbeHandler::getProbeType(content_object.getName().getSuffix())) { + case ProbeType::INIT: { + auth::VerificationPolicy policy = verifyManifest(content_object); + if (policy != auth::VerificationPolicy::ACCEPT) { + return policy; + } + return processManifest(content_object); + } + case ProbeType::RTT: + return verifyNack(content_object); + default: + auth::VerificationPolicy policy = auth::VerificationPolicy::ABORT; + verifier_->callVerificationFailedCallback( + content_object.getName().getSuffix(), policy); + return policy; + } +} + +auth::VerificationPolicy RTCVerifier::verifyNack( + core::ContentObject &content_object) { + return verifier_->verifyPackets(&content_object); +} + +auth::VerificationPolicy RTCVerifier::verifyFec( + core::ContentObject &content_object) { + return verifier_->verifyPackets(&content_object); +} + +auth::VerificationPolicy RTCVerifier::verifyData( + core::ContentObject &content_object) { + uint32_t suffix = content_object.getName().getSuffix(); + + if (_is_ah(content_object.getFormat())) { + return verifier_->verifyPackets(&content_object); + } + + unverified_bytes_[suffix] = + content_object.headerSize() + content_object.payloadSize(); + unverified_packets_[suffix] = + content_object.computeDigest(manifest_hash_algo_); + + // An alert is raised when too much packets remain unverified + if (getTotalUnverified() > max_unverified_bytes_) { + unverified_bytes_.clear(); + unverified_packets_.clear(); + + auth::VerificationPolicy policy = auth::VerificationPolicy::ABORT; + verifier_->callVerificationFailedCallback(suffix, policy); + return policy; + } + + return auth::VerificationPolicy::ACCEPT; +} + +auth::VerificationPolicy RTCVerifier::verifyManifest( + core::ContentObject &content_object) { + return verifier_->verifyPackets(&content_object); +} + +auth::VerificationPolicy RTCVerifier::processManifest( + core::ContentObject &content_object) { + uint32_t suffix = content_object.getName().getSuffix(); + + core::ContentObjectManifest manifest(content_object); + manifest.decode(); + + // Update last manifest + if (suffix > last_manifest_) { + last_manifest_ = suffix; + } + + // Extract parameters + manifest_hash_algo_ = manifest.getHashAlgorithm(); + core::ParamsRTC params = manifest.getParamsRTC(); + + if (params.prod_rate > 0) { + max_unverified_bytes_ = static_cast<uint64_t>( + (max_unverified_delay_ / 1000.0) * params.prod_rate); + } + + if (max_unverified_bytes_ == 0 || !rtc_state_) { + auth::VerificationPolicy policy = auth::VerificationPolicy::ABORT; + verifier_->callVerificationFailedCallback(suffix, policy); + return policy; + } + + // Extract hashes + auth::Verifier::SuffixMap suffix_map = + core::ContentObjectManifest::getSuffixMap(&manifest); + + // Return early if the manifest is empty + if (suffix_map.empty()) { + return auth::VerificationPolicy::ACCEPT; + } + + // Remove lost packets from digest map + manifest_digests_.insert(suffix_map.begin(), suffix_map.end()); + for (auto it = manifest_digests_.begin(); it != manifest_digests_.end();) { + if (rtc_state_->getPacketState(it->first) == PacketState::DEFINITELY_LOST) { + unverified_packets_.erase(it->first); + unverified_bytes_.erase(it->first); + it = manifest_digests_.erase(it); + } else { + ++it; + } + } + + // Verify packets + auth::Verifier::PolicyMap policies = + verifier_->verifyHashes(unverified_packets_, manifest_digests_); + + for (const auto &policy : policies) { + switch (policy.second) { + case auth::VerificationPolicy::ACCEPT: { + manifest_digests_.erase(policy.first); + unverified_packets_.erase(policy.first); + unverified_bytes_.erase(policy.first); + break; + } + case auth::VerificationPolicy::UNKNOWN: + break; + case auth::VerificationPolicy::DROP: + case auth::VerificationPolicy::ABORT: + auth::VerificationPolicy p = policy.second; + verifier_->callVerificationFailedCallback(policy.first, p); + return p; + } + } + + return auth::VerificationPolicy::ACCEPT; +} + +void RTCVerifier::onDataRecoveredFec(uint32_t suffix) { + manifest_digests_.erase(suffix); +} + +void RTCVerifier::onJumpForward(uint32_t next_suffix) { + if (next_suffix <= last_manifest_ + 1) { + return; + } + + // When we jump forward in the suffix sequence, we remove packets that + // probably won't be verified. Those packets have a suffix in the range + // [last_manifest_ + 1, next_suffix[. + for (auto it = unverified_packets_.begin(); + it != unverified_packets_.end();) { + if (it->first > last_manifest_) { + unverified_bytes_.erase(it->first); + it = unverified_packets_.erase(it); + } else { + ++it; + } + } +} + +uint32_t RTCVerifier::getTotalUnverified() const { + uint32_t total = 0; + + for (auto bytes : unverified_bytes_) { + if (bytes.second > UINT32_MAX - total) { + total = UINT32_MAX; + break; + } + total += bytes.second; + } + + return total; +} + +uint32_t RTCVerifier::getMaxUnverified() const { return max_unverified_bytes_; } + +} // end namespace rtc +} // end namespace protocol +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_verifier.h b/libtransport/src/protocols/rtc/rtc_verifier.h new file mode 100644 index 000000000..596bd8536 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_verifier.h @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2017-2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <core/facade.h> +#include <hicn/transport/auth/verifier.h> +#include <hicn/transport/core/content_object.h> +#include <protocols/rtc/rtc_state.h> + +namespace transport { +namespace protocol { +namespace rtc { + +class RTCVerifier { + public: + RTCVerifier(std::shared_ptr<auth::Verifier> verifier, + uint32_t max_unverified_delay); + + virtual ~RTCVerifier() = default; + + void setState(std::shared_ptr<RTCState> rtc_state); + + void setVerifier(std::shared_ptr<auth::Verifier> verifier); + + void setMaxUnverifiedDelay(uint32_t max_unverified_delay); + + void onDataRecoveredFec(uint32_t suffix); + void onJumpForward(uint32_t next_suffix); + + uint32_t getTotalUnverified() const; + uint32_t getMaxUnverified() const; + + auth::VerificationPolicy verify(core::ContentObject &content_object, + bool is_fec = false); + auth::VerificationPolicy verifyProbe(core::ContentObject &content_object); + auth::VerificationPolicy verifyNack(core::ContentObject &content_object); + auth::VerificationPolicy verifyFec(core::ContentObject &content_object); + auth::VerificationPolicy verifyData(core::ContentObject &content_object); + auth::VerificationPolicy verifyManifest(core::ContentObject &content_object); + + auth::VerificationPolicy processManifest(core::ContentObject &content_object); + + protected: + // The RTC state. + std::shared_ptr<RTCState> rtc_state_; + // The verifier instance. + std::shared_ptr<auth::Verifier> verifier_; + // Hash algorithm used by manifests. + auth::CryptoHashType manifest_hash_algo_; + // The last manifest processed. + auth::Suffix last_manifest_; + // Hold digests extracted from all manifests received. + auth::Verifier::SuffixMap manifest_digests_; + // Hold hashes of all content objects received before they are verified. + auth::Verifier::SuffixMap unverified_packets_; + // Hold number of unverified bytes. + std::unordered_map<auth::Suffix, uint32_t> unverified_bytes_; + // Maximum delay (in ms) for an unverified byte to become verifed. + uint32_t max_unverified_delay_; + // Maximum number of unverified bytes before aborting the connection. + uint64_t max_unverified_bytes_; +}; + +} // end namespace rtc +} // namespace protocol +} // namespace transport |