diff options
Diffstat (limited to 'libtransport/src/protocols/rtc')
39 files changed, 7165 insertions, 0 deletions
diff --git a/libtransport/src/protocols/rtc/CMakeLists.txt b/libtransport/src/protocols/rtc/CMakeLists.txt new file mode 100644 index 000000000..be8e0189c --- /dev/null +++ b/libtransport/src/protocols/rtc/CMakeLists.txt @@ -0,0 +1,59 @@ +# Copyright (c) 2021 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +list(APPEND HEADER_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/probe_handler.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_consts.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_forwarding_strategy.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_indexer.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_ldr.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_packet.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rc.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rc_congestion_detection.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rc_iat.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rc_queue.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_reassembly.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_recovery_strategy.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rs_delay.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rs_fec_only.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rs_low_rate.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rs_recovery_off.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rs_rtx_only.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_state.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_verifier.h +) + +list(APPEND SOURCE_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/probe_handler.cc + ${CMAKE_CURRENT_SOURCE_DIR}/rtc.cc + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.cc + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_forwarding_strategy.cc + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_ldr.cc + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rc_congestion_detection.cc + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rc_iat.cc + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rc_queue.cc + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_reassembly.cc + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_recovery_strategy.cc + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rs_delay.cc + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rs_fec_only.cc + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rs_low_rate.cc + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rs_recovery_off.cc + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rs_rtx_only.cc + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_state.cc + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_verifier.cc +) + +set(SOURCE_FILES ${SOURCE_FILES} PARENT_SCOPE) +set(HEADER_FILES ${HEADER_FILES} PARENT_SCOPE) diff --git a/libtransport/src/protocols/rtc/probe_handler.cc b/libtransport/src/protocols/rtc/probe_handler.cc new file mode 100644 index 000000000..60eceeb19 --- /dev/null +++ b/libtransport/src/protocols/rtc/probe_handler.cc @@ -0,0 +1,141 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <glog/logging.h> +#include <hicn/transport/utils/chrono_typedefs.h> +#include <protocols/rtc/probe_handler.h> +#include <protocols/rtc/rtc_consts.h> + +namespace transport { + +namespace protocol { + +namespace rtc { + +ProbeHandler::ProbeHandler(SendProbeCallback &&send_callback, + asio::io_service &io_service) + : probe_interval_(0), + max_probes_(0), + sent_probes_(0), + recv_probes_(0), + probe_timer_(std::make_unique<asio::steady_timer>(io_service)), + rand_eng_((std::random_device())()), + distr_(MIN_RTT_PROBE_SEQ, MAX_RTT_PROBE_SEQ), + send_probe_callback_(std::move(send_callback)) {} + +ProbeHandler::~ProbeHandler() {} + +uint64_t ProbeHandler::getRtt(uint32_t seq, bool is_valid) { + auto it = pending_probes_.find(seq); + + if (it == pending_probes_.end()) return 0; + + if (!is_valid) { + // delete the probe anyway + pending_probes_.erase(it); + valid_batch_ = false; + return 0; + } + + uint64_t now = utils::SteadyTime::nowMs().count(); + uint64_t rtt = now - it->second; + if (rtt < 1) rtt = 1; + + pending_probes_.erase(it); + recv_probes_++; + + return rtt; +} + +double ProbeHandler::getProbeLossRate() { + if (!valid_batch_) return 1.0; + return 1.0 - ((double)recv_probes_ / (double)sent_probes_); +} + +void ProbeHandler::setSuffixRange(uint32_t min, uint32_t max) { + DCHECK(min <= max && min >= MIN_PROBE_SEQ); + distr_ = std::uniform_int_distribution<uint32_t>(min, max); +} + +void ProbeHandler::setProbes(uint32_t probe_interval, uint32_t max_probes) { + stopProbes(); + probe_interval_ = probe_interval; + max_probes_ = max_probes; +} + +void ProbeHandler::stopProbes() { + probe_interval_ = 0; + max_probes_ = 0; + sent_probes_ = 0; + recv_probes_ = 0; + valid_batch_ = true; + probe_timer_->cancel(); +} + +void ProbeHandler::sendProbes() { + if (probe_interval_ == 0) return; + + std::weak_ptr<ProbeHandler> self(shared_from_this()); + probe_timer_->expires_from_now(std::chrono::microseconds(probe_interval_)); + probe_timer_->async_wait([self](const std::error_code &ec) { + if (ec) return; + auto s = self.lock(); + if (s) { + s->generateProbe(); + } + }); +} + +void ProbeHandler::generateProbe() { + if (probe_interval_ == 0) return; + if (max_probes_ != 0 && sent_probes_ >= max_probes_) return; + + uint64_t now = utils::SteadyTime::nowMs().count(); + + uint32_t seq = distr_(rand_eng_); + pending_probes_.insert(std::pair<uint32_t, uint64_t>(seq, now)); + send_probe_callback_(seq); + sent_probes_++; + + // clean up + // a probe may get lost. if the pending_probes_ size becomes bigger than + // MAX_PENDING_PROBES remove all the probes older than a seconds + if (pending_probes_.size() > MAX_PENDING_PROBES) { + for (auto it = pending_probes_.begin(); it != pending_probes_.end();) { + if ((now - it->second) > 1000) + it = pending_probes_.erase(it); + else + it++; + } + } + + sendProbes(); +} + +ProbeType ProbeHandler::getProbeType(uint32_t seq) { + if (MIN_INIT_PROBE_SEQ <= seq && seq <= MAX_INIT_PROBE_SEQ) { + return ProbeType::INIT; + } + if (MIN_RTT_PROBE_SEQ <= seq && seq <= MAX_RTT_PROBE_SEQ) { + return ProbeType::RTT; + } + return ProbeType::NOT_PROBE; +} + +} // namespace rtc + +} // namespace protocol + +} // namespace transport diff --git a/libtransport/src/protocols/rtc/probe_handler.h b/libtransport/src/protocols/rtc/probe_handler.h new file mode 100644 index 000000000..d989194d4 --- /dev/null +++ b/libtransport/src/protocols/rtc/probe_handler.h @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once +#include <hicn/transport/config.h> +#include <hicn/transport/core/asio_wrapper.h> + +#include <functional> +#include <random> +#include <unordered_map> + +namespace transport { + +namespace protocol { + +namespace rtc { + +enum class ProbeType { + NOT_PROBE, + INIT, + RTT, +}; + +class ProbeHandler : public std::enable_shared_from_this<ProbeHandler> { + public: + using SendProbeCallback = std::function<void(uint32_t)>; + + public: + ProbeHandler(SendProbeCallback &&send_callback, asio::io_service &io_service); + + ~ProbeHandler(); + + // If the function returns 0 the probe is not valid. + uint64_t getRtt(uint32_t seq, bool is_valid); + + // this function may return a residual loss rate higher than the real one if + // we don't wait enough time for the probes to come back + double getProbeLossRate(); + + // Set the probe suffix range [min, max] + void setSuffixRange(uint32_t min, uint32_t max); + + // Reset the probes parameters and stops the current probing. + // probe_interval = 0 means that no event will be scheduled. + // max_probe = 0 means no limit to the number of probe to send. + void setProbes(uint32_t probe_interval, uint32_t max_probes); + + void stopProbes(); + + void sendProbes(); + + static ProbeType getProbeType(uint32_t seq); + + private: + void generateProbe(); + + uint32_t probe_interval_; // us + uint32_t max_probes_; // packets + uint32_t sent_probes_; // packets + uint32_t recv_probes_; // packets + + bool valid_batch_; // if at least one probe in a batch is considered not + // valid (e.g. prod rate == ~0) the full batch is invalid. + // the bool is set to true when sendProbe is called + + std::unique_ptr<asio::steady_timer> probe_timer_; + + // Map from packet suffixes to timestamp + std::unordered_map<uint32_t, uint64_t> pending_probes_; + + // Random generator + std::default_random_engine rand_eng_; + std::uniform_int_distribution<uint32_t> distr_; + + SendProbeCallback send_probe_callback_; +}; + +} // namespace rtc + +} // namespace protocol + +} // namespace transport diff --git a/libtransport/src/protocols/rtc/rtc.cc b/libtransport/src/protocols/rtc/rtc.cc new file mode 100644 index 000000000..9a56269f3 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc.cc @@ -0,0 +1,1101 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/core/global_object_pool.h> +#include <hicn/transport/interfaces/socket_consumer.h> +#include <implementation/socket_consumer.h> +#include <math.h> +#include <protocols/errors.h> +#include <protocols/incremental_indexer_bytestream.h> +#include <protocols/rtc/rtc.h> +#include <protocols/rtc/rtc_consts.h> +#include <protocols/rtc/rtc_indexer.h> +#include <protocols/rtc/rtc_rc_congestion_detection.h> + +#include <algorithm> + +namespace transport { + +namespace protocol { + +namespace rtc { + +using namespace interface; + +RTCTransportProtocol::RTCTransportProtocol( + implementation::ConsumerSocket *icn_socket) + : TransportProtocol(icn_socket, new RtcIndexer<>(icn_socket, this), + new RtcReassembly(icn_socket, this)), + max_aggregated_interest_(1), + number_(0) { + icn_socket->getSocketOption(PORTAL, portal_); + round_timer_ = + std::make_unique<asio::steady_timer>(portal_->getThread().getIoService()); + scheduler_timer_ = + std::make_unique<asio::steady_timer>(portal_->getThread().getIoService()); + pacing_timer_ = + std::make_unique<asio::steady_timer>(portal_->getThread().getIoService()); +} + +RTCTransportProtocol::~RTCTransportProtocol() {} + +void RTCTransportProtocol::resume() { + newRound(); + TransportProtocol::resume(); +} + +std::size_t RTCTransportProtocol::transportHeaderLength(bool isFEC) { + return DATA_HEADER_SIZE + + (fec_decoder_ != nullptr ? fec_decoder_->getFecHeaderSize(isFEC) : 0); +} + +// private +void RTCTransportProtocol::initParams() { + TransportProtocol::reset(); + std::weak_ptr<RTCTransportProtocol> self = shared_from_this(); + + fwd_strategy_.setCallback([self](notification::Strategy strategy) { + auto ptr = self.lock(); + if (ptr && ptr->isRunning()) { + if (*ptr->on_fwd_strategy_) (*ptr->on_fwd_strategy_)(strategy); + } + }); + + std::shared_ptr<auth::Verifier> verifier; + socket_->getSocketOption(GeneralTransportOptions::VERIFIER, verifier); + + uint32_t factor_relevant; + socket_->getSocketOption(GeneralTransportOptions::MANIFEST_FACTOR_RELEVANT, + factor_relevant); + + uint32_t factor_alert; + socket_->getSocketOption(GeneralTransportOptions::MANIFEST_FACTOR_ALERT, + factor_alert); + + rc_ = std::make_shared<RTCRateControlCongestionDetection>(); + ldr_ = std::make_shared<RTCLossDetectionAndRecovery>( + indexer_verifier_.get(), portal_->getThread().getIoService(), + interface::RtcTransportRecoveryStrategies::RTX_ONLY, + [self](uint32_t seq) { + auto ptr = self.lock(); + if (ptr && ptr->isRunning()) { + ptr->sendRtxInterest(seq); + } + }, + [self](notification::Strategy strategy) { + auto ptr = self.lock(); + if (ptr && ptr->isRunning()) { + if (*ptr->on_rec_strategy_) (*ptr->on_rec_strategy_)(strategy); + } + }); + + verifier_ = + std::make_shared<RTCVerifier>(verifier, factor_relevant, factor_alert); + + state_ = std::make_shared<RTCState>( + indexer_verifier_.get(), + [self](uint32_t seq) { + auto ptr = self.lock(); + if (ptr && ptr->isRunning()) { + ptr->sendProbeInterest(seq); + } + }, + [self]() { + auto ptr = self.lock(); + if (ptr && ptr->isRunning()) { + ptr->discoveredRtt(); + } + }, + portal_->getThread().getIoService()); + + rc_->setState(state_); + rc_->turnOnRateControl(); + ldr_->setState(state_.get()); + ldr_->setRateControl(rc_.get()); + verifier_->setState(state_); + + // protocol state + start_send_interest_ = false; + current_state_ = SyncState::catch_up; + + // Cancel timer + number_++; + round_timer_->cancel(); + + scheduler_timer_->cancel(); + scheduler_timer_on_ = false; + last_interest_sent_time_ = 0; + last_interest_sent_seq_ = 0; + + // Aggregated interests setup + bool aggregated_interests_on; + socket_->getSocketOption(RtcTransportOptions::AGGREGATED_INTERESTS, + aggregated_interests_on); + if (aggregated_interests_on) { + if (const char *max_aggr = std::getenv("MAX_AGGREGATED_INTERESTS")) + max_aggregated_interest_ = (uint32_t)std::stoul(std::string(max_aggr)); + else + max_aggregated_interest_ = MAX_INTERESTS_IN_BATCH; + + max_aggregated_interest_ = std::min<uint32_t>(max_aggregated_interest_, + 1 + MAX_SUFFIXES_IN_MANIFEST); + } + LOG(INFO) << "Max Aggregated: " << max_aggregated_interest_; + + max_sent_int_ = + std::ceil((double)MAX_PACING_BATCH / (double)max_aggregated_interest_); + + pacing_timer_->cancel(); + pacing_timer_on_ = false; + + // delete all timeouts and future nacks + timeouts_or_nacks_.clear(); + + // cwin vars + current_sync_win_ = INITIAL_WIN; + max_sync_win_ = INITIAL_WIN_MAX; + + socket_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, + RTC_INTEREST_LIFETIME); + + // init state params + state_->initParams(); +} + +// private +void RTCTransportProtocol::reset() { + DLOG_IF(INFO, VLOG_IS_ON(3)) << "reset called"; + initParams(); + newRound(); +} + +void RTCTransportProtocol::inactiveProducer() { + // when the producer is inactive we reset the consumer state + // cwin vars + current_sync_win_ = INITIAL_WIN; + max_sync_win_ = INITIAL_WIN_MAX; + + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Current window: " << current_sync_win_ + << ", max_sync_win_: " << max_sync_win_; + + // names/packets var + indexer_verifier_->reset(); + indexer_verifier_->enableFec(fec_type_); + indexer_verifier_->setNFec(0); + + ldr_->clear(); +} + +void RTCTransportProtocol::newRound() { + round_timer_->expires_from_now(std::chrono::milliseconds(ROUND_LEN)); + + std::weak_ptr<RTCTransportProtocol> self = shared_from_this(); + round_timer_->async_wait([self](const std::error_code &ec) { + if (ec) { + return; + } + + auto ptr = self.lock(); + + if (!ptr || !ptr->isRunning()) { + return; + } + + auto &state = ptr->state_; + + // saving counters that will be reset on new round + uint32_t sent_retx = state->getSentRtxInRound(); + uint32_t received_bytes = + (state->getReceivedBytesInRound() + // data packets received + state->getReceivedFecBytesInRound()); // fec packets received + uint32_t sent_interest = state->getSentInterestInRound(); + uint32_t lost_data = state->getLostData(); + uint32_t definitely_lost = state->getDefinitelyLostPackets(); + uint32_t recovered_losses = state->getRecoveredLosses(); + uint32_t received_nacks = state->getReceivedNacksInRound(); + uint32_t received_fec = state->getReceivedFecPackets(); + + // update sync state if needed + double cache_rate = state->getPacketFromCacheRatio(); + uint32_t round_without_nacks = state->getRoundsWithoutNacks(); + + if (ptr->current_state_ == SyncState::in_sync) { + if (cache_rate > MAX_DATA_FROM_CACHE) { + ptr->current_state_ = SyncState::catch_up; + } + } else { + if (round_without_nacks >= ROUNDS_IN_SYNC_BEFORE_SWITCH && + cache_rate < MAX_DATA_FROM_CACHE) { + ptr->current_state_ = SyncState::in_sync; + } + } + + bool in_sync = (ptr->current_state_ == SyncState::in_sync); + ptr->ldr_->onNewRound(in_sync); + ptr->state_->onNewRound((double)ROUND_LEN, in_sync); + ptr->rc_->onNewRound((double)ROUND_LEN); + + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Calling updateSyncWindow in newRound function"; + ptr->updateSyncWindow(); + + ptr->sendStatsToApp(sent_retx, received_bytes, sent_interest, lost_data, + definitely_lost, recovered_losses, received_nacks, + received_fec); + ptr->fwd_strategy_.checkStrategy(); + ptr->newRound(); + }); +} + +void RTCTransportProtocol::discoveredRtt() { + start_send_interest_ = true; + uint32_t strategy; + socket_->getSocketOption(RtcTransportOptions::RECOVERY_STRATEGY, strategy); + ldr_->changeRecoveryStrategy( + (interface::RtcTransportRecoveryStrategies)strategy); + + bool content_sharing_mode; + socket_->getSocketOption(RtcTransportOptions::CONTENT_SHARING_MODE, + content_sharing_mode); + if (content_sharing_mode) ldr_->setContentSharingMode(); + ldr_->turnOnRecovery(); + ldr_->onNewRound(false); + + // set forwarding strategy switch if selected + Name *name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name); + Prefix prefix(*name, 128); + fwd_strategy_.initFwdStrategy( + portal_, prefix, state_.get(), + (interface::RtcTransportRecoveryStrategies)strategy); + updateSyncWindow(); +} + +void RTCTransportProtocol::computeMaxSyncWindow() { + double production_rate = state_->getProducerRate(); + double packet_size = state_->getAveragePacketSize(); + if (production_rate == 0.0 || packet_size == 0.0) { + // the consumer has no info about the producer, + // keep the previous maxCWin + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Returning in computeMaxSyncWindow because: prod_rate: " + << (production_rate == 0.0) + << " || packet_size: " << (packet_size == 0.0); + return; + } + + bool content_sharing_mode; + socket_->getSocketOption(RtcTransportOptions::CONTENT_SHARING_MODE, + content_sharing_mode); + if (content_sharing_mode && (production_rate < MIN_PROD_RATE_SHARING_MODE)) + production_rate = MIN_PROD_RATE_SHARING_MODE; + + production_rate += (production_rate * indexer_verifier_->getMaxFecOverhead()); + + uint32_t lifetime = default_values::interest_lifetime; + socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, + lifetime); + double lifetime_ms = (double)lifetime / MILLI_IN_A_SEC; + + max_sync_win_ = (uint32_t)ceil( + (production_rate * lifetime_ms * INTEREST_LIFETIME_REDUCTION_FACTOR) / + packet_size); + + max_sync_win_ = std::min(max_sync_win_, rc_->getCongestionWindow()); +} + +void RTCTransportProtocol::updateSyncWindow() { + computeMaxSyncWindow(); + + if (max_sync_win_ == INITIAL_WIN_MAX) { + if (TRANSPORT_EXPECT_FALSE(!state_->isProducerActive())) return; + + current_sync_win_ = INITIAL_WIN; + scheduleNextInterests(); + return; + } + + double prod_rate = state_->getProducerRate(); + double rtt = (double)state_->getMinRTT() / MILLI_IN_A_SEC; + double packet_size = state_->getAveragePacketSize(); + bool content_sharing_mode; + socket_->getSocketOption(RtcTransportOptions::CONTENT_SHARING_MODE, + content_sharing_mode); + if (content_sharing_mode && (prod_rate < MIN_PROD_RATE_SHARING_MODE)) + prod_rate = MIN_PROD_RATE_SHARING_MODE; + + // if some of the info are not available do not update the current win + if (prod_rate != 0.0 && rtt != 0.0 && packet_size != 0.0) { + current_sync_win_ = (uint32_t)ceil(prod_rate * rtt / packet_size); + uint32_t buffer = PRODUCER_BUFFER_MS + ((double)state_->getMinRTT() / 2.0); + + current_sync_win_ += + ceil(prod_rate * (buffer / MILLI_IN_A_SEC) / packet_size); + + if (current_state_ == SyncState::catch_up) { + current_sync_win_ = current_sync_win_ * CATCH_UP_WIN_INCREMENT; + } + + uint32_t min_win = WIN_MIN; + bool aggregated_data_on; + socket_->getSocketOption(RtcTransportOptions::AGGREGATED_DATA, + aggregated_data_on); + if (aggregated_data_on) { + min_win = WIN_MIN_WITH_AGGREGARED_DATA; + min_win += (min_win * (1 - (std::max(0.3, rtt) - rtt) / 0.3)); + } + + current_sync_win_ = std::min(current_sync_win_, max_sync_win_); + current_sync_win_ = std::max(current_sync_win_, min_win); + } + + scheduleNextInterests(); +} + +void RTCTransportProtocol::sendRtxInterest(uint32_t seq) { + if (!isRunning() && !is_first_) return; + + if (!start_send_interest_) return; + + Name *interest_name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, + &interest_name); + + DLOG_IF(INFO, VLOG_IS_ON(3)) << "send rtx " << seq; + interest_name->setSuffix(seq); + sendInterest(*interest_name); +} + +void RTCTransportProtocol::sendProbeInterest(uint32_t seq) { + if (!isRunning() && !is_first_) return; + + Name *interest_name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, + &interest_name); + + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Send probe " << seq; + interest_name->setSuffix(seq); + sendInterest(*interest_name); +} + +void RTCTransportProtocol::sendInterestForTimeout(uint32_t seq) { + if (!isRunning() && !is_first_) return; + + Name *interest_name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, + &interest_name); + + // we got a timeout for this packet so it is not pending anymore + interest_name->setSuffix(seq); + state_->onSendNewInterest(interest_name); + sendInterest(*interest_name); +} + +void RTCTransportProtocol::scheduleNextInterests() { + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Schedule next interests"; + + if (!isRunning() && !is_first_) { + return; + } + + if (pacing_timer_on_) { + return; // wait pacing timer for the next send + } + + if (!start_send_interest_) { + return; // RTT discovering phase is not finished so + // do not start to send interests + } + + if (TRANSPORT_EXPECT_FALSE(!state_->isProducerActive())) { + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Inactive producer."; + // here we keep seding the same interest until the producer + // does not start again + if (indexer_verifier_->checkNextSuffix() != 0) { + // the producer just become inactive, reset the state + inactiveProducer(); + } + + Name *interest_name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, + &interest_name); + + uint32_t next_seg = 0; + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Send interest " << next_seg; + interest_name->setSuffix(next_seg); + + if (portal_->interestIsPending(*interest_name)) { + // if interest 0 is already pending we return + return; + } + + sendInterest(*interest_name); + state_->onSendNewInterest(interest_name); + return; + } + + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Pending interest number: " << state_->getPendingInterestNumber() + << " -- current_sync_win_: " << current_sync_win_; + + uint32_t pending = state_->getPendingInterestNumber(); + uint32_t pending_fec = state_->getPendingFecPackets(); + + if ((pending - pending_fec) >= current_sync_win_) + return; // no space in the window + + // XXX double check if aggregated interests are still working here + if ((current_sync_win_ - (pending - pending_fec)) < + max_aggregated_interest_) { + if (scheduler_timer_on_) return; // timer already scheduled + + uint64_t now = utils::SteadyTime::nowMs().count(); + + uint64_t time = now - last_interest_sent_time_; + if (time < WAIT_FOR_INTEREST_BATCH) { + uint64_t next = WAIT_FOR_INTEREST_BATCH - time; + scheduler_timer_on_ = true; + scheduler_timer_->expires_from_now(std::chrono::milliseconds(next)); + + std::weak_ptr<RTCTransportProtocol> self = shared_from_this(); + scheduler_timer_->async_wait([self](const std::error_code &ec) { + if (ec) return; + + auto ptr = self.lock(); + if (ptr && ptr->isRunning()) { + if (!ptr->scheduler_timer_on_) return; + ptr->scheduler_timer_on_ = false; + ptr->scheduleNextInterests(); + } + }); + return; // wait for the timer + } + } + + scheduler_timer_on_ = false; + scheduler_timer_->cancel(); + + // skip nacked pacekts + if (indexer_verifier_->checkNextSuffix() <= state_->getLastSeqNacked()) { + indexer_verifier_->jumpToIndex(state_->getLastSeqNacked() + 1); + } + + // skip received packets + uint32_t max_received = state_->getHighestSeqReceivedInOrder(); + if (indexer_verifier_->checkNextSuffix() <= max_received) { + indexer_verifier_->jumpToIndex(max_received + 1); + } + + uint32_t sent_interests = 0; + uint32_t sent_packets = 0; + uint32_t aggregated_counter = 0; + Name *name = nullptr; + Name interest_name; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name); + std::array<uint32_t, MAX_AGGREGATED_INTEREST> additional_suffixes; + + while (((state_->getPendingInterestNumber() - + state_->getPendingFecPackets()) < current_sync_win_) && + (sent_interests < max_sent_int_)) { + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "In while loop. Window size: " << current_sync_win_; + + uint32_t next_seg = indexer_verifier_->getNextSuffix(); + name->setSuffix(next_seg); + + // send the packet only if: + // 1) it is not pending yet (not true for rtx) + // 2) the packet is not received or def lost + // 3) is not in the rtx list + // 4) is fec and is not in order (!= last sent + 1) + PacketState packet_state = state_->getPacketState(next_seg); + if (portal_->interestIsPending(*name) || + packet_state == PacketState::RECEIVED || + packet_state == PacketState::DEFINITELY_LOST || ldr_->isRtx(next_seg) || + (indexer_verifier_->isFec(next_seg) && + next_seg != last_interest_sent_seq_ + 1)) { + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "skip interest " << next_seg << " because: pending " + << portal_->interestIsPending(*name) << ", recv or lost" + << (int)packet_state << ", rtx " << (ldr_->isRtx(next_seg)) + << ", is old fec " + << ((indexer_verifier_->isFec(next_seg) && + next_seg != last_interest_sent_seq_ + 1)); + continue; + } + + if (aggregated_counter == 0) { + DLOG_IF(INFO, VLOG_IS_ON(3)) << "(name) send interest " << next_seg; + interest_name = *name; + } else { + DLOG_IF(INFO, VLOG_IS_ON(3)) << "(append) send interest " << next_seg; + additional_suffixes[aggregated_counter - 1] = next_seg; + } + + last_interest_sent_seq_ = next_seg; + state_->onSendNewInterest(name); + aggregated_counter++; + + if (aggregated_counter >= max_aggregated_interest_) { + sent_packets++; + sent_interests++; + sendInterest(interest_name, &additional_suffixes, aggregated_counter - 1); + last_interest_sent_time_ = utils::SteadyTime::nowMs().count(); + aggregated_counter = 0; + } + } + + // exiting the while we may have some pending interest to send + if (aggregated_counter != 0) { + sent_packets++; + last_interest_sent_time_ = utils::SteadyTime::nowMs().count(); + sendInterest(interest_name, &additional_suffixes, aggregated_counter - 1); + } + + if ((state_->getPendingInterestNumber() - state_->getPendingFecPackets()) < + current_sync_win_) { + // we still have space in the window but we already sent too many packets + // wait PACING_WAIT to avoid drops in the kernel + + pacing_timer_on_ = true; + pacing_timer_->expires_from_now(std::chrono::microseconds(PACING_WAIT)); + + std::weak_ptr<RTCTransportProtocol> self = shared_from_this(); + scheduler_timer_->async_wait([self](const std::error_code &ec) { + if (ec) return; + + auto ptr = self.lock(); + if (ptr && ptr->isRunning()) { + if (!ptr->pacing_timer_on_) return; + + ptr->pacing_timer_on_ = false; + ptr->scheduleNextInterests(); + } + }); + } +} + +void RTCTransportProtocol::onInterestTimeout(Interest::Ptr &interest, + const Name &name) { + uint32_t segment_number = name.getSuffix(); + + if (ProbeHandler::getProbeType(segment_number) != ProbeType::NOT_PROBE) { + // this is a timeout on a probe, do nothing + return; + } + + PacketState state = state_->getPacketState(segment_number); + if (state == PacketState::RECEIVED || state == PacketState::DEFINITELY_LOST) { + // we may recover a packets using fec, ignore this timer + return; + } + + timeouts_or_nacks_.insert(segment_number); + if (TRANSPORT_EXPECT_TRUE(state_->isProducerActive()) && + segment_number <= state_->getHighestSeqReceived()) { + // we retransmit packets only if the producer is active, otherwise we + // use timeouts to avoid to send too much traffic + // + // a timeout is sent using RTX only if it is an old packet. if it is for a + // seq number that we didn't reach yet, we send the packet using the normal + // schedule next interest + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "handle timeout for packet " << segment_number << " using rtx"; + if (ldr_->isRtxOn()) { + if (indexer_verifier_->isFec(segment_number)) { + // if this is a fec packet we do not recover it with rtx so we consider + // the packet to be lost + ldr_->onTimeout(segment_number, true); + state_->onTimeout(segment_number, true); + } else { + ldr_->onTimeout(segment_number, false); + state_->onTimeout(segment_number, false); + } + } else { + // in this case we wil never recover the timeout + ldr_->onTimeout(segment_number, true); + state_->onTimeout(segment_number, true); + } + scheduleNextInterests(); + return; + } + + DLOG_IF(INFO, VLOG_IS_ON(3)) << "handle timeout for packet " << segment_number + << " using normal interests"; + + if (segment_number < indexer_verifier_->checkNextSuffix()) { + // this is a timeout for a packet that will be generated in the future but + // we are asking for higher sequence numbers. we need to go back like in the + // case of future nacks + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "On timeout next seg = " << indexer_verifier_->checkNextSuffix() + << ", jump to " << segment_number; + // add an extra space in the window + indexer_verifier_->jumpToIndex(segment_number); + } + + state_->onTimeout(segment_number, false); + sendInterestForTimeout(segment_number); + scheduleNextInterests(); +} + +void RTCTransportProtocol::onNack(const ContentObject &content_object) { + struct nack_packet_t *nack = + (struct nack_packet_t *)content_object.getPayload()->data(); + uint32_t production_seg = nack->getProductionSegment(); + uint32_t nack_segment = content_object.getName().getSuffix(); + bool is_rtx = ldr_->isRtx(nack_segment); + + // check if the packet got a timeout + + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Nack received " << nack_segment + << ". Production segment: " << production_seg; + + bool compute_stats = true; + auto tn_it = timeouts_or_nacks_.find(nack_segment); + if (tn_it != timeouts_or_nacks_.end() || is_rtx) { + compute_stats = false; + // remove packets from timeouts_or_nacks only in case of a past nack + } + + state_->onNackPacketReceived(content_object, compute_stats); + ldr_->onNackPacketReceived(content_object); + + // both in case of past and future nack we jump to the + // production segment in the nack. In case of past nack we will skip unneded + // interest (this is already done in the scheduleNextInterest in any case) + // while in case of future nacks we can go back in time and ask again for the + // content that generated the nack + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "On nack next seg = " << indexer_verifier_->checkNextSuffix() + << ", jump to " << production_seg; + indexer_verifier_->jumpToIndex(production_seg); + + if (production_seg > nack_segment) { + // remove the nack is it exists + if (tn_it != timeouts_or_nacks_.end()) timeouts_or_nacks_.erase(tn_it); + + state_->onJumpForward(production_seg); + // the client is asking for content in the past + // switch to catch up state and increase the window + // this is true only if the packet is not an RTX + if (!is_rtx) current_state_ = SyncState::catch_up; + } else { + // if production_seg == nack_segment we consider this a future nack, since + // production_seg is not yet created. this may happen in case of low + // production rate (e.g. ping at 1pps) + + // if a future nack was also retransmitted add it to the timeout_or_nacks + // set + if (is_rtx) timeouts_or_nacks_.insert(nack_segment); + + // the client is asking for content in the future + // switch to in sync state and decrease the window + current_state_ = SyncState::in_sync; + } + updateSyncWindow(); +} + +void RTCTransportProtocol::onProbe(const ContentObject &content_object) { + uint32_t suffix = content_object.getName().getSuffix(); + ParamsRTC params = RTCState::getProbeParams(content_object); + + if (ProbeHandler::getProbeType(suffix) == ProbeType::INIT) { + fec::FECType fec_type = params.fec_type; + + if (fec_type != fec::FECType::UNKNOWN && !fec_decoder_) { + // Update FEC type + fec_type_ = fec_type; + + // Enable FEC + enableFEC(std::bind(&RTCTransportProtocol::onFecPackets, this, + std::placeholders::_1), + fec::FECBase::BufferRequested(0)); + + // Update FEC parameters + indexer_verifier_->enableFec(fec_type); + indexer_verifier_->setNFec(0); + ldr_->setFecParams(fec::FECUtils::getBlockSymbols(fec_type), + fec::FECUtils::getSourceSymbols(fec_type)); + fec_decoder_->setIOService(portal_->getThread().getIoService()); + } else if (fec_type == fec::FECType::UNKNOWN) { + indexer_verifier_->disableFec(); + } + } + + if (!state_->onProbePacketReceived(content_object)) return; + + // As for NACKs, set next_segment + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "on probe next seg = " << indexer_verifier_->checkNextSuffix() + << ", jump to " << params.prod_seg; + indexer_verifier_->jumpToIndex(params.prod_seg); + + bool loss_detected = ldr_->onProbePacketReceived(content_object); + // we are not out of sync here but we are starting to download content from + // the cache, maybe beacuse the production rate increased suddenly. for this + // reason we put the state to catch up to increase the window + if (loss_detected) current_state_ = SyncState::catch_up; + updateSyncWindow(); +} + +void RTCTransportProtocol::onContentObjectReceived( + Interest &interest, ContentObject &content_object, std::error_code &ec) { + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Received content object of size: " << content_object.payloadSize(); + + uint32_t segment_number = content_object.getName().getSuffix(); + PayloadType payload_type = content_object.getPayloadType(); + PacketState state; + + ContentObject *content_ptr = &content_object; + ContentObject::Ptr manifest_ptr = nullptr; + + bool is_probe = + ProbeHandler::getProbeType(segment_number) != ProbeType::NOT_PROBE; + bool is_nack = !is_probe && content_object.payloadSize() == NACK_HEADER_SIZE; + bool is_fec = indexer_verifier_->isFec(segment_number); + bool is_manifest = + !is_probe && !is_nack && !is_fec && payload_type == PayloadType::MANIFEST; + bool is_data = + !is_probe && !is_nack && !is_fec && payload_type == PayloadType::DATA; + bool compute_stats = is_data || is_manifest; + + ec = make_error_code(protocol_error::not_reassemblable); + + // A helper function to process manifests or data packets received + auto onDataPacketReceived = [this](ContentObject &content_object, + bool compute_stats) { + ldr_->onDataPacketReceived(content_object); + rc_->onDataPacketReceived(content_object, compute_stats); + updateSyncWindow(); + }; + + // First verify the packet signature and apply the corresponding policy + auth::VerificationPolicy policy = verifier_->verify(content_object, is_fec); + indexer_verifier_->applyPolicy(interest, content_object, false, policy); + + if (is_probe) { + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received probe " << segment_number; + if (*on_content_object_input_) { + (*on_content_object_input_)(*socket_->getInterface(), content_object); + } + onProbe(content_object); + return; + } + + if (is_nack) { + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received nack " << segment_number; + if (*on_content_object_input_) { + (*on_content_object_input_)(*socket_->getInterface(), content_object); + } + onNack(content_object); + return; + } + + // content_ptr will point either to the input data packet or to a manifest + // whose FEC header has been removed + if (is_manifest) { + manifest_ptr = removeFecHeader(content_object); + if (manifest_ptr) { + content_ptr = manifest_ptr.get(); + } + } + + // From there, the packet is either a FEC, a manifest or a data packet. + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received content " << segment_number; + + // Do not count timed out packets in stats + auto tn_it = timeouts_or_nacks_.find(segment_number); + if (tn_it != timeouts_or_nacks_.end()) { + compute_stats = false; + timeouts_or_nacks_.erase(tn_it); + } + + // Do not count retransmissions or losses in stats + if (ldr_->isRtx(segment_number) || + ldr_->isPossibleLossWithNoRtx(segment_number)) { + compute_stats = false; + } + + // Fetch packet state + state = state_->getPacketState(segment_number); + + // Check if the packet is a retransmission + if (ldr_->isRtx(segment_number) && state != PacketState::RECEIVED) { + if (is_data || is_manifest) { + uint64_t rtt = ldr_->getRtxRtt(segment_number); + state_->onPacketRecoveredRtx(content_object, rtt); + + if (*on_content_object_input_) { + (*on_content_object_input_)(*socket_->getInterface(), content_object); + } + + if (is_manifest) { + processManifest(interest, *content_ptr); + } + + ec = is_manifest ? make_error_code(protocol_error::not_reassemblable) + : make_error_code(protocol_error::success); + + // The packet is considered received, return early + onDataPacketReceived(*content_ptr, compute_stats); + // this is a rtx but we may need to feed it in the decoder + decodePacket(content_object, is_manifest); + return; + } + + if (is_fec) { + state_->onFecPacketRecoveredRtx(content_object); + } + } + + // Fetch packet state again; it may have changed + state = state_->getPacketState(segment_number); + + // Check if the packet was already received + if (state == PacketState::RECEIVED || state == PacketState::TO_BE_RECEIVED) { + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Received duplicated content " << segment_number << ", drop it"; + ec = make_error_code(protocol_error::duplicated_content); + onDataPacketReceived(*content_ptr, compute_stats); + return; + } + + if (!is_fec) { + state_->dataToBeReceived(segment_number); + } + + // send packet to the decoder + decodePacket(content_object, is_manifest); + + // We can return early if FEC + if (is_fec) { + DLOG_IF(INFO, VLOG_IS_ON(4)) << "Received FEC " << segment_number; + state_->onFecPacketReceived(content_object); + onDataPacketReceived(*content_ptr, compute_stats); + return; + } + + // The packet may have been already sent to the app by the decoder, check + // again if it is already received + state = state_->getPacketState( + segment_number); // state == RECEIVED or TO_BE_RECEIVED + + if (state != PacketState::RECEIVED) { + DLOG_IF(INFO, VLOG_IS_ON(4)) + << (is_manifest ? "Received manifest " : "Received data ") + << segment_number; + + if (is_manifest) { + processManifest(interest, *content_ptr); + } + + state_->onDataPacketReceived(*content_ptr, compute_stats); + + if (*on_content_object_input_) { + (*on_content_object_input_)(*socket_->getInterface(), content_object); + } + + ec = is_manifest ? make_error_code(protocol_error::not_reassemblable) + : make_error_code(protocol_error::success); + } + + onDataPacketReceived(*content_ptr, compute_stats); +} + +void RTCTransportProtocol::sendStatsToApp( + uint32_t retx_count, uint32_t received_bytes, uint32_t sent_interests, + uint32_t lost_data, uint32_t definitely_lost, uint32_t recovered_losses, + uint32_t received_nacks, uint32_t received_fec) { + if (*stats_summary_) { + // Send the stats to the app + stats_->updateQueuingDelay(state_->getQueuing()); + + // stats_->updateInterestFecTx(0); //todo must be implemented + // stats_->updateBytesFecRecv(0); //todo must be implemented + + stats_->updateRetxCount(retx_count); + stats_->updateBytesRecv(received_bytes); + stats_->updateInterestTx(sent_interests); + stats_->updateReceivedNacks(received_nacks); + stats_->updateReceivedFEC(received_fec); + + stats_->updateAverageWindowSize(state_->getPendingInterestNumber()); + stats_->updateLossRatio(state_->getPerSecondLossRate()); + uint64_t rtt = state_->getAvgRTT(); + stats_->updateAverageRtt(utils::SteadyTime::Microseconds(rtt * 1000)); + + stats_->updateQueuingDelay(state_->getQueuing()); + stats_->updateLostData(lost_data); + stats_->updateDefinitelyLostData(definitely_lost); + stats_->updateRecoveredData(recovered_losses); + stats_->updateCCState((unsigned int)current_state_ ? 1 : 0); + (*stats_summary_)(*socket_->getInterface(), *stats_); + bool in_congestion = rc_->inCongestionState(); + stats_->updateCongestionState(in_congestion); + double residual_losses = state_->getResidualLossRate(); + stats_->updateResidualLossRate(residual_losses); + stats_->updateQualityScore(state_->getQualityScore()); + + // set alerts + if (rtt > MAX_RTT) + stats_->setAlert(interface::TransportStatistics::statsAlerts::LATENCY); + else + stats_->clearAlert(interface::TransportStatistics::statsAlerts::LATENCY); + + if (in_congestion) + stats_->setAlert(interface::TransportStatistics::statsAlerts::CONGESTION); + else + stats_->clearAlert( + interface::TransportStatistics::statsAlerts::CONGESTION); + + if (residual_losses > MAX_RESIDUAL_LOSSES) + stats_->setAlert(interface::TransportStatistics::statsAlerts::LOSSES); + else + stats_->clearAlert(interface::TransportStatistics::statsAlerts::LOSSES); + } +} + +void RTCTransportProtocol::decodePacket(ContentObject &content_object, + bool is_manifest) { + if (!fec_decoder_) return; + + DLOG_IF(INFO, VLOG_IS_ON(4)) + << "Send packet " << content_object.getName() << " to FEC decoder"; + + uint32_t offset = + is_manifest + ? (uint32_t)content_object.headerSize() + : (uint32_t)(content_object.headerSize() + rtc::DATA_HEADER_SIZE); + uint32_t metadata = static_cast<uint32_t>(content_object.getPayloadType()); + + fec_decoder_->onDataPacket(content_object, offset, metadata); +} + +void RTCTransportProtocol::onFecPackets(fec::BufferArray &packets) { + Packet::Format format; + socket_->getSocketOption(interface::GeneralTransportOptions::PACKET_FORMAT, + format); + + Name *name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name); + + for (auto &packet : packets) { + uint32_t seq_number = packet.getIndex(); + uint32_t metadata = packet.getMetadata(); + fec::buffer buffer = packet.getBuffer(); + + PayloadType payload_type = static_cast<PayloadType>(metadata); + switch (payload_type) { + case PayloadType::DATA: + case PayloadType::MANIFEST: + break; + case PayloadType::UNSPECIFIED: + default: + payload_type = PayloadType::DATA; + break; + } + + switch (state_->getPacketState(seq_number)) { + case PacketState::RECEIVED: + case PacketState::TO_BE_RECEIVED: { + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Packet " << seq_number << " already received"; + break; + } + default: { + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Recovered packet " << seq_number << " through FEC"; + + if (payload_type == PayloadType::MANIFEST) { + name->setSuffix(seq_number); + + auto interest = + core::PacketManager<>::getInstance().getPacket<Interest>(format); + interest->setName(*name); + + auto content_object = toContentObject( + *name, format, payload_type, buffer->data(), buffer->length()); + + processManifest(*interest, *content_object); + } + + state_->onPacketRecoveredFec(seq_number, (uint32_t)buffer->length()); + ldr_->onPacketRecoveredFec(seq_number); + + if (payload_type == PayloadType::DATA) { + verifier_->onDataRecoveredFec(seq_number); + reassembly_->reassemble(*buffer, seq_number); + } + + break; + } + } + } +} + +void RTCTransportProtocol::processManifest(Interest &interest, + ContentObject &manifest) { + auth::VerificationPolicy policy = verifier_->processManifest(manifest); + indexer_verifier_->applyPolicy(interest, manifest, false, policy); +} + +ContentObject::Ptr RTCTransportProtocol::removeFecHeader( + const ContentObject &content_object) { + if (!fec_decoder_ || !fec_decoder_->getFecHeaderSize(false)) { + return nullptr; + } + + size_t fec_header_size = fec_decoder_->getFecHeaderSize(false); + const uint8_t *payload = + content_object.data() + content_object.headerSize() + fec_header_size; + size_t payload_size = content_object.payloadSize() - fec_header_size; + + ContentObject::Ptr co = + toContentObject(content_object.getName(), content_object.getFormat(), + content_object.getPayloadType(), payload, payload_size); + + return co; +} + +ContentObject::Ptr RTCTransportProtocol::toContentObject( + const Name &name, Packet::Format format, PayloadType payload_type, + const uint8_t *payload, std::size_t payload_size, + std::size_t additional_header_size) { + // Recreate ContentObject + ContentObject::Ptr co = + core::PacketManager<>::getInstance().getPacket<ContentObject>( + format, additional_header_size); + co->updateLength(payload_size); + co->append(payload_size); + co->trimStart(co->headerSize()); + + // Copy payload + std::memcpy(co->writableData(), payload, payload_size); + + // Restore network headers and some fields + co->prepend(co->headerSize()); + co->setName(name); + co->setPayloadType(payload_type); + + return co; +} + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc.h b/libtransport/src/protocols/rtc/rtc.h new file mode 100644 index 000000000..a8a474216 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc.h @@ -0,0 +1,151 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <protocols/rtc/rtc_forwarding_strategy.h> +#include <protocols/rtc/rtc_ldr.h> +#include <protocols/rtc/rtc_rc.h> +#include <protocols/rtc/rtc_reassembly.h> +#include <protocols/rtc/rtc_state.h> +#include <protocols/rtc/rtc_verifier.h> +#include <protocols/transport_protocol.h> + +#include <unordered_set> +#include <vector> + +namespace transport { + +namespace protocol { + +namespace rtc { + +class RTCTransportProtocol : public TransportProtocol { + public: + RTCTransportProtocol(implementation::ConsumerSocket *icnet_socket); + + ~RTCTransportProtocol(); + + using TransportProtocol::start; + + using TransportProtocol::stop; + + void resume() override; + + std::size_t transportHeaderLength(bool isFEC) override; + + auto shared_from_this() { return utils::shared_from(this); } + + private: + enum class SyncState { catch_up = 0, in_sync = 1, last }; + + private: + // setup functions + void initParams(); + void reset() override; + + void inactiveProducer(); + + // protocol functions + void discoveredRtt(); + void newRound(); + + // window functions + void computeMaxSyncWindow(); + void updateSyncWindow(); + + // packet functions + void sendRtxInterest(uint32_t seq); + void sendProbeInterest(uint32_t seq); + void sendInterestForTimeout(uint32_t seq); + void scheduleNextInterests() override; + void onInterestTimeout(Interest::Ptr &interest, const Name &name) override; + void onNack(const ContentObject &content_object); + void onProbe(const ContentObject &content_object); + void onContentObjectReceived(Interest &interest, + ContentObject &content_object, + std::error_code &ec) override; + void onPacketDropped(Interest &interest, ContentObject &content_object, + const std::error_code &reason) override {} + void onReassemblyFailed(std::uint32_t missing_segment) override {} + void processManifest(Interest &interest, ContentObject &manifest); + + // interaction with app functions + void sendStatsToApp(uint32_t retx_count, uint32_t received_bytes, + uint32_t sent_interests, uint32_t lost_data, + uint32_t definitely_lost, uint32_t recovered_losses, + uint32_t received_nacks, uint32_t received_fec); + + // FEC functions + // send the received content object to the decoder + void decodePacket(ContentObject &content_object, bool is_manifest); + void onFecPackets(fec::BufferArray &packets); + + // Utils + ContentObject::Ptr removeFecHeader(const ContentObject &content_object); + ContentObject::Ptr toContentObject(const Name &name, Packet::Format format, + PayloadType payload_type, + const uint8_t *payload, + std::size_t payload_size, + std::size_t additional_header_size = 0); + + // protocol state + bool start_send_interest_; + SyncState current_state_; + + // cwin vars + uint32_t current_sync_win_; + uint32_t max_sync_win_; + + // round timer + std::unique_ptr<asio::steady_timer> round_timer_; + + // scheduler timer (postpone interest sending to explot aggregated interests) + std::unique_ptr<asio::steady_timer> scheduler_timer_; + bool scheduler_timer_on_; + uint64_t last_interest_sent_time_; + uint64_t last_interest_sent_seq_; + + // maximum aggregated interest. if the transport is connected to the forwarder + // we cannot use aggregated interests + uint32_t max_aggregated_interest_; + // maximum number of intereset that can be sent in a loop to avoid packets + // dropped by the kernel + uint32_t max_sent_int_; + + // pacing timer (do not send too many interests in a short time to avoid + // packet drops in the kernel) + std::unique_ptr<asio::steady_timer> pacing_timer_; + bool pacing_timer_on_; + + // timeouts + std::unordered_set<uint32_t> timeouts_or_nacks_; + + std::shared_ptr<RTCState> state_; + std::shared_ptr<RTCRateControl> rc_; + std::shared_ptr<RTCLossDetectionAndRecovery> ldr_; + std::shared_ptr<RTCVerifier> verifier_; + + // forwarding strategy selection + RTCForwardingStrategy fwd_strategy_; + + uint32_t number_; +}; + +} // namespace rtc + +} // namespace protocol + +} // namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_consts.h b/libtransport/src/protocols/rtc/rtc_consts.h new file mode 100644 index 000000000..29b5a3a12 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_consts.h @@ -0,0 +1,214 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <protocols/rtc/rtc_packet.h> +#include <stdint.h> + +namespace transport { + +namespace protocol { + +namespace rtc { + +// used in rtc +// protocol consts +const uint32_t ROUND_LEN = 200; +// ms interval of time on which +// we take decisions / measurements +const double INTEREST_LIFETIME_REDUCTION_FACTOR = 0.8; +// how big (in ms) should be the buffer at the producer. +// increasing this number we increase the time that an +// interest will wait for the data packet to be produced +// at the producer socket +const uint32_t PRODUCER_BUFFER_MS = 300; // ms + +// interest scheduler +// const uint32_t MAX_INTERESTS_IN_BATCH = 5; +// const uint32_t WAIT_BETWEEN_INTEREST_BATCHES = 1000; // usec +const uint32_t MAX_INTERESTS_IN_BATCH = 5; // number of seq numbers per + // aggregated interest packet + // considering the name itself +const uint32_t WAIT_FOR_INTEREST_BATCH = 20; // msec. timer that we wait to try + // to aggregate interest in the + // same packet +const uint32_t MAX_PACING_BATCH = 5; // number of interest that we can send + // inside the loop before they get dropped + // by the kernel. +const uint32_t PACING_WAIT = 1000; // usec to wait betwing two pacing batch. As + // for MAX_PACING_BATCH this value was + // computed during tests +const uint32_t MAX_RTX_IN_BATCH = 10; // max rtx to send in loop + +// packet const +const uint32_t RTC_INTEREST_LIFETIME = 4000; + +// probes sequence range +const uint32_t MIN_PROBE_SEQ = 0xefffffff; +const uint32_t MIN_INIT_PROBE_SEQ = MIN_PROBE_SEQ; +const uint32_t MAX_INIT_PROBE_SEQ = 0xf7ffffff - 1; +const uint32_t MIN_RTT_PROBE_SEQ = MAX_INIT_PROBE_SEQ + 1; +const uint32_t MAX_RTT_PROBE_SEQ = 0xffffffff - 1; +// RTT_PROBE_INTERVAL will be used during the section while +// INIT_RTT_PROBE_INTERVAL is used at the beginning to +// quickily estimate the RTT +const uint32_t RTT_PROBE_INTERVAL = 200000; // us +const uint32_t INIT_RTT_PROBE_INTERVAL = 500; // us +const uint32_t INIT_RTT_PROBES = 40; // number of probes to init RTT +// if the produdcer is not yet started we need to probe multple times +// to get an answer. we wait 100ms between each try +const uint32_t INIT_RTT_PROBE_RESTART = 100; // ms +// once we get the first probe we wait at most 60ms for the others +const uint32_t INIT_RTT_PROBE_WAIT = + ((INIT_RTT_PROBES * INIT_RTT_PROBE_INTERVAL) / 1000) * 2; // ms +// we reuires at least 5 probes to be recevied +const uint32_t INIT_RTT_MIN_PROBES_TO_RECV = 5; // ms +const uint32_t MAX_PENDING_PROBES = 10; + +// congestion +const double MAX_QUEUING_DELAY = 50.0; // ms + +// data from cache +const double MAX_DATA_FROM_CACHE = 0.10; // 10% + +// window const +const uint32_t INITIAL_WIN = 5; // pkts +const uint32_t INITIAL_WIN_MAX = 1000000; // pkts +const uint32_t WIN_MIN = 5; // pkts +const uint32_t WIN_MIN_WITH_AGGREGARED_DATA = 10; // pkts +const double CATCH_UP_WIN_INCREMENT = 1.2; +// used in rate control +const double WIN_DECREASE_FACTOR = 0.5; +const double WIN_INCREASE_FACTOR = 1.5; +const uint32_t MIN_PROD_RATE_SHARING_MODE = 125000; // 1Mbps in bytes + +// round in congestion +const double ROUNDS_BEFORE_TAKE_ACTION = 5; + +// used in state +const uint8_t ROUNDS_IN_SYNC_BEFORE_SWITCH = 3; +const double PRODUCTION_RATE_FRACTION = 0.8; + +const uint32_t INIT_PACKET_SIZE = 1200; + +const double MOVING_AVG_ALPHA = 0.8; + +const double MILLI_IN_A_SEC = 1000.0; +const double MICRO_IN_A_SEC = 1000000.0; +const uint32_t ROUNDS_PER_SEC = (uint32_t)(MILLI_IN_A_SEC / ROUND_LEN); +const uint32_t ROUNDS_PER_MIN = (uint32_t)ROUNDS_PER_SEC * 60; + +const uint32_t MAX_ROUND_WHIOUT_PACKETS = + (20 * MILLI_IN_A_SEC) / ROUND_LEN; // 20 sec in rounds; + +// used in ldr +const uint32_t RTC_MAX_RTX = 100; +const uint32_t RTC_MAX_AGE = 60000; // in ms +const uint64_t MAX_TIMER_RTX = ~0; +const uint32_t SENTINEL_TIMER_INTERVAL = 100; // ms +const uint32_t MAX_RTX_WITH_SENTINEL = 10; // packets +const double CATCH_UP_RTT_INCREMENT = 1.2; +const double MAX_RESIDUAL_LOSS_RATE = 1.0; // % +const uint32_t WAIT_BEFORE_FEC_UPDATE = ROUNDS_PER_SEC; +const uint32_t MAX_RTT_BEFORE_FEC = 60; // ms + +// used by producer +const uint32_t PRODUCER_STATS_INTERVAL = 200; // ms +const uint32_t MIN_PRODUCTION_RATE = 25; // pps, equal to min window * + // rounds in a second +const uint32_t FEC_PACING_TIME = 5; // ms + +// aggregated data consts +const uint16_t MAX_RTC_PAYLOAD_SIZE = 1200; // bytes +const uint16_t MAX_AGGREGATED_PACKETS = 5; // pkt +const uint32_t AGGREGATED_PACKETS_TIMER = 2; // ms + +// alert thresholds +const uint32_t MAX_RTT = 200; // ms +const double MAX_RESIDUAL_LOSSES = 0.05; // % + +const uint8_t FEC_MATRIX[64][10] = { + {1, 2, 2, 2, 3, 3, 4, 5, 5, 6}, // k = 1 + {1, 2, 3, 3, 4, 5, 5, 6, 7, 9}, + {2, 2, 3, 4, 5, 6, 7, 8, 9, 11}, + {2, 3, 4, 5, 5, 7, 8, 9, 11, 13}, + {2, 3, 4, 5, 6, 7, 9, 10, 12, 14}, // k = 5 + {2, 3, 4, 6, 7, 8, 10, 12, 14, 16}, + {2, 4, 5, 6, 8, 9, 11, 13, 15, 18}, + {3, 4, 5, 7, 8, 10, 12, 14, 16, 19}, + {3, 4, 6, 7, 9, 11, 13, 15, 18, 21}, + {3, 4, 6, 8, 9, 11, 14, 16, 19, 23}, // k = 10 + {3, 5, 6, 8, 10, 12, 14, 17, 20, 24}, + {3, 5, 7, 8, 10, 13, 15, 18, 21, 26}, + {3, 5, 7, 9, 11, 13, 16, 19, 23, 27}, + {3, 5, 7, 9, 12, 14, 17, 20, 24, 28}, + {4, 6, 8, 10, 12, 15, 18, 21, 25, 30}, // k = 15 + {4, 6, 8, 10, 13, 15, 19, 22, 26, 31}, + {4, 6, 8, 11, 13, 16, 19, 23, 27, 33}, + {4, 6, 9, 11, 14, 17, 20, 24, 29, 34}, + {4, 6, 9, 11, 14, 17, 21, 25, 30, 35}, + {4, 7, 9, 12, 15, 18, 22, 26, 31, 37}, // k = 20 + {4, 7, 9, 12, 15, 19, 22, 27, 32, 38}, + {4, 7, 10, 13, 16, 19, 23, 28, 33, 40}, + {5, 7, 10, 13, 16, 20, 24, 29, 34, 41}, + {5, 7, 10, 13, 17, 20, 25, 30, 35, 42}, + {5, 8, 11, 14, 17, 21, 26, 31, 37, 44}, // k = 25 + {5, 8, 11, 14, 18, 22, 26, 31, 38, 45}, + {5, 8, 11, 15, 18, 22, 27, 32, 39, 46}, + {5, 8, 11, 15, 19, 23, 28, 33, 40, 48}, + {5, 8, 12, 15, 19, 24, 28, 34, 41, 49}, + {5, 9, 12, 16, 20, 24, 29, 35, 42, 50}, // k = 30 + {5, 9, 12, 16, 20, 25, 30, 36, 43, 51}, + {5, 9, 13, 16, 21, 25, 31, 37, 44, 53}, + {6, 9, 13, 17, 21, 26, 31, 38, 45, 54}, + {6, 9, 13, 17, 22, 26, 32, 39, 46, 55}, + {6, 10, 13, 17, 22, 27, 33, 40, 47, 57}, // k = 35 + {6, 10, 14, 18, 22, 28, 34, 40, 48, 58}, + {6, 10, 14, 18, 23, 28, 34, 41, 49, 59}, + {6, 10, 14, 19, 23, 29, 35, 42, 50, 60}, + {6, 10, 14, 19, 24, 29, 36, 43, 52, 62}, + {6, 10, 15, 19, 24, 30, 36, 44, 53, 63}, // k = 40 + {6, 11, 15, 20, 25, 31, 37, 45, 54, 64}, + {6, 11, 15, 20, 25, 31, 38, 46, 55, 65}, + {7, 11, 15, 20, 26, 32, 39, 46, 56, 67}, + {7, 11, 16, 21, 26, 32, 39, 47, 57, 68}, + {7, 11, 16, 21, 27, 33, 40, 48, 58, 69}, // k = 45 + {7, 11, 16, 21, 27, 33, 41, 49, 59, 70}, + {7, 12, 16, 22, 27, 34, 41, 50, 60, 72}, + {7, 12, 17, 22, 28, 34, 42, 51, 61, 73}, + {7, 12, 17, 22, 28, 35, 43, 52, 62, 74}, + {7, 12, 17, 23, 29, 36, 43, 52, 63, 75}, // k = 50 + {7, 12, 17, 23, 29, 36, 44, 53, 64, 77}, + {7, 12, 18, 23, 30, 37, 45, 54, 65, 78}, + {7, 13, 18, 24, 30, 37, 45, 55, 66, 79}, + {8, 13, 18, 24, 31, 38, 46, 56, 67, 80}, + {8, 13, 18, 24, 31, 38, 47, 57, 68, 82}, // k = 55 + {8, 13, 19, 25, 31, 39, 47, 57, 69, 83}, + {8, 13, 19, 25, 32, 39, 48, 58, 70, 84}, + {8, 13, 19, 25, 32, 40, 49, 59, 71, 85}, + {8, 14, 19, 26, 33, 41, 50, 60, 72, 86}, + {8, 14, 20, 26, 33, 41, 50, 61, 73, 88}, // k = 60 + {8, 14, 20, 26, 34, 42, 51, 61, 74, 89}, + {8, 14, 20, 27, 34, 42, 52, 62, 75, 90}, + {8, 14, 20, 27, 34, 43, 52, 63, 76, 91}, + {8, 14, 21, 27, 35, 43, 53, 64, 77, 92}, // k = 64 +}; + +} // namespace rtc + +} // namespace protocol + +} // namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_data_path.cc b/libtransport/src/protocols/rtc/rtc_data_path.cc new file mode 100644 index 000000000..a421396b1 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_data_path.cc @@ -0,0 +1,251 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/utils/chrono_typedefs.h> +#include <protocols/rtc/rtc_data_path.h> +#include <stdlib.h> + +#include <algorithm> +#include <cfloat> +#include <chrono> +#include <cmath> + +#define MAX_ROUNDS_WITHOUT_PKTS 10 // 2sec +#define AVG_RTT_TIME 1000 // (ms) 1sec + +namespace transport { + +namespace protocol { + +namespace rtc { + +RTCDataPath::RTCDataPath(uint32_t path_id) + : path_id_(path_id), + min_rtt(UINT_MAX), + prev_min_rtt(UINT_MAX), + max_rtt(0), + prev_max_rtt(0), + min_owd(INT_MAX), // this is computed like in LEDBAT, so it is not the + // real OWD, but the measured one, that depends on the + // clock of sender and receiver. the only meaningful + // value is is the queueing delay. for this reason we + // keep both RTT (for the windowd calculation) and OWD + // (for congestion/quality control) + prev_min_owd(INT_MAX), + avg_owd(DBL_MAX), + queuing_delay(DBL_MAX), + jitter_(0.0), + last_owd_(0), + largest_recv_seq_(0), + largest_recv_seq_time_(0), + avg_inter_arrival_(DBL_MAX), + rtt_sum_(0), + last_avg_rtt_compute_(0), + rtt_samples_(0), + avg_rtt_(0.0), + received_nacks_(false), + received_packets_(0), + rounds_without_packets_(0), + last_received_data_packet_(0), + min_RTT_history_(HISTORY_LEN), + max_RTT_history_(HISTORY_LEN), + OWD_history_(HISTORY_LEN){}; + +void RTCDataPath::insertRttSample( + const utils::SteadyTime::Milliseconds& rtt_milliseconds, bool is_probe) { + // compute min rtt + uint64_t rtt = rtt_milliseconds.count(); + if (rtt < min_rtt) min_rtt = rtt; + + uint64_t now = utils::SteadyTime::nowMs().count(); + last_received_data_packet_ = now; + + // compute avg rtt + if (is_probe) { + // max rtt is computed only on probes to avoid to take into account the + // production time at the server + if (rtt > max_rtt) max_rtt = rtt; + + rtt_sum_ += rtt; + rtt_samples_++; + } + + if ((now - last_avg_rtt_compute_) >= AVG_RTT_TIME) { + // compute a new avg rtt + // if rtt_samples_ = 0 keep the same rtt + if (rtt_samples_ != 0) avg_rtt_ = (double)rtt_sum_ / (double)rtt_samples_; + + rtt_sum_ = 0; + rtt_samples_ = 0; + last_avg_rtt_compute_ = now; + } + + received_packets_++; +} + +void RTCDataPath::insertOwdSample(int64_t owd) { + // for owd we use both min and avg + if (owd < min_owd) min_owd = owd; + + if (avg_owd != DBL_MAX) + avg_owd = (avg_owd * (1 - ALPHA_RTC)) + (owd * ALPHA_RTC); + else { + avg_owd = owd; + } + + int64_t queueVal = owd - std::min(getMinOwd(), min_owd); + + if (queuing_delay != DBL_MAX) + queuing_delay = (queuing_delay * (1 - ALPHA_RTC)) + (queueVal * ALPHA_RTC); + else { + queuing_delay = queueVal; + } + + // keep track of the jitter computed as for RTP (RFC 3550) + int64_t diff = std::abs(owd - last_owd_); + last_owd_ = owd; + jitter_ += (1.0 / 16.0) * ((double)diff - jitter_); +} + +void RTCDataPath::computeInterArrivalGap(uint32_t segment_number) { + // got packet in sequence, compute gap + if (largest_recv_seq_ == (segment_number - 1)) { + uint64_t now = utils::SteadyTime::nowMs().count(); + uint64_t delta = now - largest_recv_seq_time_; + largest_recv_seq_ = segment_number; + largest_recv_seq_time_ = now; + if (avg_inter_arrival_ == DBL_MAX) + avg_inter_arrival_ = delta; + else + avg_inter_arrival_ = + (avg_inter_arrival_ * (1 - ALPHA_RTC)) + (delta * ALPHA_RTC); + return; + } + + // ooo packet, update the stasts if needed + if (largest_recv_seq_ <= segment_number) { + largest_recv_seq_ = segment_number; + largest_recv_seq_time_ = utils::SteadyTime::nowMs().count(); + } +} + +void RTCDataPath::receivedNack() { received_nacks_ = true; } + +double RTCDataPath::getInterArrivalGap() { + if (avg_inter_arrival_ == DBL_MAX) return 0; + return avg_inter_arrival_; +} + +bool RTCDataPath::isValidProducer() { + if (received_nacks_ && rounds_without_packets_ < MAX_ROUNDS_WITHOUT_PKTS) + return true; + return false; +} + +bool RTCDataPath::isActive() { + if (rounds_without_packets_ < MAX_ROUNDS_WITHOUT_PKTS) return true; + return false; +} + +bool RTCDataPath::pathToProducer() { + if (received_nacks_) return true; + return false; +} + +void RTCDataPath::roundEnd() { + // reset min_rtt and add it to the history + if (min_rtt != UINT_MAX) { + prev_min_rtt = min_rtt; + } else { + // this may happen if we do not receive any packet + // from this path in the last round. in this case + // we use the measure from the previuos round + min_rtt = prev_min_rtt; + } + + // same for max_rtt + if (max_rtt != 0) { + prev_max_rtt = max_rtt; + } else { + max_rtt = prev_max_rtt; + } + + if (min_rtt == 0) min_rtt = 1; + if (max_rtt == 0) max_rtt = 1; + + min_RTT_history_.pushBack(min_rtt); + max_RTT_history_.pushBack(max_rtt); + min_rtt = UINT_MAX; + max_rtt = 0; + + // do the same for min owd + if (min_owd != INT_MAX) { + prev_min_owd = min_owd; + } else { + min_owd = prev_min_owd; + } + + if (min_owd != INT_MAX) { + OWD_history_.pushBack(min_owd); + min_owd = INT_MAX; + } + + if (received_packets_ == 0) + rounds_without_packets_++; + else + rounds_without_packets_ = 0; + received_packets_ = 0; +} + +uint32_t RTCDataPath::getPathId() { return path_id_; } + +double RTCDataPath::getQueuingDealy() { + if (queuing_delay == DBL_MAX) return 0; + return queuing_delay; +} + +uint64_t RTCDataPath::getMinRtt() { + if (min_RTT_history_.size() != 0) return min_RTT_history_.begin(); + return 0; +} + +uint64_t RTCDataPath::getAvgRtt() { return std::round(avg_rtt_); } + +uint64_t RTCDataPath::getMaxRtt() { + if (max_RTT_history_.size() != 0) return max_RTT_history_.begin(); + return 0; +} + +int64_t RTCDataPath::getMinOwd() { + if (OWD_history_.size() != 0) return OWD_history_.begin(); + return INT_MAX; +} + +double RTCDataPath::getJitter() { return jitter_; } + +uint64_t RTCDataPath::getLastPacketTS() { return last_received_data_packet_; } + +uint32_t RTCDataPath::getPacketsLastRound() { return received_packets_; } + +void RTCDataPath::clearRtt() { + min_RTT_history_.clear(); + max_RTT_history_.clear(); +} + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_data_path.h b/libtransport/src/protocols/rtc/rtc_data_path.h new file mode 100644 index 000000000..ba5201fe8 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_data_path.h @@ -0,0 +1,114 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/utils/chrono_typedefs.h> +#include <stdint.h> +#include <utils/max_filter.h> +#include <utils/min_filter.h> + +#include <climits> + +namespace transport { + +namespace protocol { + +namespace rtc { + +const double ALPHA_RTC = 0.125; +const uint32_t HISTORY_LEN = 20; // 4 sec + +class RTCDataPath { + public: + RTCDataPath(uint32_t path_id); + + public: + void insertRttSample(const utils::SteadyTime::Milliseconds &rtt, + bool is_probe); + void insertOwdSample(int64_t owd); + void computeInterArrivalGap(uint32_t segment_number); + void receivedNack(); + + uint32_t getPathId(); + uint64_t getMinRtt(); + uint64_t getAvgRtt(); + uint64_t getMaxRtt(); + double getQueuingDealy(); + double getInterArrivalGap(); + double getJitter(); + bool isActive(); // pakets recevied from this path in the last rounds + bool pathToProducer(); // path from a producer + bool isValidProducer(); // path from a producer that is also active + uint64_t getLastPacketTS(); + uint32_t getPacketsLastRound(); + + void clearRtt(); + + void roundEnd(); + + private: + uint32_t path_id_; + + int64_t getMinOwd(); + + uint64_t min_rtt; + uint64_t prev_min_rtt; + + uint64_t max_rtt; + uint64_t prev_max_rtt; + + int64_t min_owd; + int64_t prev_min_owd; + + double avg_owd; + + double queuing_delay; + + double jitter_; + int64_t last_owd_; + + uint32_t largest_recv_seq_; + uint64_t largest_recv_seq_time_; + double avg_inter_arrival_; + + // compute the avg rtt over one sec + uint64_t rtt_sum_; + uint64_t last_avg_rtt_compute_; + uint32_t rtt_samples_; + double avg_rtt_; + + // flags to check if a path is active + // we considere a path active if it reaches a producer + //(not a cache) --aka we got at least one nack on this path-- + // and if we receives packets + bool received_nacks_; + uint32_t received_packets_; + uint32_t rounds_without_packets_; // if we don't get any packet + // for MAX_ROUNDS_WITHOUT_PKTS + // we consider the path inactive + uint64_t last_received_data_packet_; // timestamp for the last data received + // on this path + + utils::MinFilter<uint64_t> min_RTT_history_; + utils::MaxFilter<uint64_t> max_RTT_history_; + utils::MinFilter<int64_t> OWD_history_; +}; + +} // namespace rtc + +} // namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_forwarding_strategy.cc b/libtransport/src/protocols/rtc/rtc_forwarding_strategy.cc new file mode 100644 index 000000000..4bbd7eac0 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_forwarding_strategy.cc @@ -0,0 +1,217 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/interfaces/notification.h> +#include <protocols/rtc/rtc_consts.h> +#include <protocols/rtc/rtc_forwarding_strategy.h> + +namespace transport { + +namespace protocol { + +namespace rtc { + +using namespace transport::interface; + +const double FWD_MAX_QUEUE = 30.0; // ms +const double FWD_MAX_RTT = MAX_RTT_BEFORE_FEC; // ms +const double FWD_MAX_LOSS_RATE = 0.1; + +RTCForwardingStrategy::RTCForwardingStrategy() + : low_rate_app_(false), + init_(false), + forwarder_set_(false), + selected_strategy_(NONE), + current_strategy_(NONE), + rounds_since_last_set_(0), + portal_(nullptr), + state_(nullptr) {} + +RTCForwardingStrategy::~RTCForwardingStrategy() {} + +void RTCForwardingStrategy::setCallback( + interface::StrategyCallback&& callback) { + callback_ = std::move(callback); +} + +void RTCForwardingStrategy::initFwdStrategy( + std::shared_ptr<core::Portal> portal, core::Prefix& prefix, RTCState* state, + interface::RtcTransportRecoveryStrategies strategy) { + switch (strategy) { + case interface::RtcTransportRecoveryStrategies::LOW_RATE_AND_BESTPATH: + init_ = true; + low_rate_app_ = true; + selected_strategy_ = BEST_PATH; + current_strategy_ = BEST_PATH; + break; + case interface::RtcTransportRecoveryStrategies::LOW_RATE_AND_REPLICATION: + init_ = true; + low_rate_app_ = true; + selected_strategy_ = REPLICATION; + current_strategy_ = REPLICATION; + break; + case interface::RtcTransportRecoveryStrategies:: + LOW_RATE_AND_ALL_FWD_STRATEGIES: + init_ = true; + low_rate_app_ = true; + selected_strategy_ = BEST_PATH; + current_strategy_ = BEST_PATH; + break; + case interface::RtcTransportRecoveryStrategies::DELAY_AND_BESTPATH: + init_ = true; + low_rate_app_ = false; + selected_strategy_ = BEST_PATH; + current_strategy_ = BEST_PATH; + break; + case interface::RtcTransportRecoveryStrategies::DELAY_AND_REPLICATION: + init_ = true; + low_rate_app_ = false; + selected_strategy_ = REPLICATION; + current_strategy_ = REPLICATION; + break; + case interface::RtcTransportRecoveryStrategies::RECOVERY_OFF: + case interface::RtcTransportRecoveryStrategies::RTX_ONLY: + case interface::RtcTransportRecoveryStrategies::FEC_ONLY: + case interface::RtcTransportRecoveryStrategies::DELAY_BASED: + case interface::RtcTransportRecoveryStrategies::LOW_RATE: + case interface::RtcTransportRecoveryStrategies::FEC_ONLY_LOW_RES_LOSSES: + default: + // fwd strategies are not used + init_ = false; + } + + if (init_) { + rounds_since_last_set_ = 0; + prefix_ = prefix; + portal_ = portal; + state_ = state; + } +} + +void RTCForwardingStrategy::checkStrategy() { + strategy_t used_strategy = selected_strategy_; + if (used_strategy == BOTH) used_strategy = current_strategy_; + assert(used_strategy == BEST_PATH || used_strategy == REPLICATION || + used_strategy == NONE); + + notification::ForwardingStrategy strategy = + notification::ForwardingStrategy::NONE; + switch (used_strategy) { + case BEST_PATH: + strategy = notification::ForwardingStrategy::BEST_PATH; + break; + case REPLICATION: + strategy = notification::ForwardingStrategy::REPLICATION; + break; + default: + break; + } + callback_(strategy); + + if (!init_) return; + + if (selected_strategy_ == NONE) return; + + if (selected_strategy_ == BEST_PATH) { + checkStrategyBestPath(); + return; + } + + if (selected_strategy_ == REPLICATION) { + checkStrategyReplication(); + return; + } + + checkStrategyBoth(); +} + +void RTCForwardingStrategy::checkStrategyBestPath() { + if (!forwarder_set_) { + setStrategy(BEST_PATH); + forwarder_set_ = true; + return; + } + + if (low_rate_app_) { + // this is used for gaming + uint8_t qs = state_->getQualityScore(); + + if (qs >= 4 || rounds_since_last_set_ < 25) { // wait a least 5 sec + // between each switch + rounds_since_last_set_++; + return; + } + + // try to switch path + setStrategy(BEST_PATH); + } else { + if (rounds_since_last_set_ < 25) { // wait a least 5 sec + // between each switch + rounds_since_last_set_++; + return; + } + + double queue = state_->getQueuing(); + double rtt = state_->getAvgRTT(); + double loss_rate = state_->getPerSecondLossRate(); + + if (queue >= FWD_MAX_QUEUE || rtt >= FWD_MAX_RTT || + loss_rate > FWD_MAX_LOSS_RATE) { + // try to switch path + setStrategy(BEST_PATH); + } + } +} + +void RTCForwardingStrategy::checkStrategyReplication() { + if (!forwarder_set_) { + setStrategy(REPLICATION); + forwarder_set_ = true; + return; + } + + // here we have nothing to do for the moment + return; +} + +void RTCForwardingStrategy::checkStrategyBoth() { + if (!forwarder_set_) { + setStrategy(current_strategy_); + forwarder_set_ = true; + return; + } + + checkStrategyBestPath(); + + // TODO + // for the moment we use only best path. + // for later: + // 1. if both paths are bad use replication + // 2. while using replication compute the effectiveness. if the majority of + // the packets are coming from a single path, try to use bestpath +} + +void RTCForwardingStrategy::setStrategy(strategy_t strategy) { + rounds_since_last_set_ = 0; + current_strategy_ = strategy; + portal_->setForwardingStrategy(prefix_, + string_strategies_[current_strategy_]); +} + +} // namespace rtc + +} // namespace protocol + +} // namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_forwarding_strategy.h b/libtransport/src/protocols/rtc/rtc_forwarding_strategy.h new file mode 100644 index 000000000..c2227e09f --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_forwarding_strategy.h @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <core/portal.h> +#include <hicn/transport/interfaces/callbacks.h> +#include <protocols/rtc/rtc_state.h> + +#include <array> + +namespace transport { + +namespace protocol { + +namespace rtc { + +class RTCForwardingStrategy { + public: + enum strategy_t { + BEST_PATH, + REPLICATION, + BOTH, + NONE, + }; + + RTCForwardingStrategy(); + ~RTCForwardingStrategy(); + + void initFwdStrategy(std::shared_ptr<core::Portal> portal, + core::Prefix& prefix, RTCState* state, + interface::RtcTransportRecoveryStrategies strategy); + + void checkStrategy(); + void setCallback(interface::StrategyCallback&& callback); + + private: + void checkStrategyBestPath(); + void checkStrategyReplication(); + void checkStrategyBoth(); + + void setStrategy(strategy_t strategy); + + std::array<std::string, 4> string_strategies_ = {"bestpath", "replication", + "both", "none"}; + + bool low_rate_app_; // if set to true the best path strategy will + // trigger a path switch based on the quality + // score, otherwise it will use the RTT, + // queuing delay and loss rate + bool init_; // true if all val are initializes + bool forwarder_set_; // true if the strategy is been set at least + // once + strategy_t selected_strategy_; // this is the strategy selected using socket + // options. this can also be equal to BOTH + strategy_t current_strategy_; // if both strategies can be used this + // indicates the one that is currently in use + // that can be only replication or best path + uint32_t rounds_since_last_set_; + core::Prefix prefix_; + std::shared_ptr<core::Portal> portal_; + RTCState* state_; + interface::StrategyCallback callback_; +}; + +} // namespace rtc + +} // namespace protocol + +} // namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_indexer.h b/libtransport/src/protocols/rtc/rtc_indexer.h new file mode 100644 index 000000000..f87fcaaa2 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_indexer.h @@ -0,0 +1,174 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <protocols/errors.h> +#include <protocols/fec_utils.h> +#include <protocols/indexer.h> +#include <protocols/rtc/probe_handler.h> +#include <protocols/rtc/rtc_consts.h> +#include <protocols/transport_protocol.h> +#include <utils/suffix_strategy.h> + +#include <deque> + +namespace transport { + +namespace interface { +class ConsumerSocket; +} + +namespace protocol { + +namespace rtc { + +template <uint32_t LIMIT = MIN_PROBE_SEQ> +class RtcIndexer : public Indexer { + public: + RtcIndexer(implementation::ConsumerSocket *icn_socket, + TransportProtocol *transport) + : Indexer(icn_socket, transport), + first_suffix_(1), + next_suffix_(first_suffix_), + fec_type_(fec::FECType::UNKNOWN), + n_fec_(0), + n_current_fec_(n_fec_) {} + + RtcIndexer(RtcIndexer &&other) : Indexer(other) {} + + ~RtcIndexer() {} + + void reset() override { + next_suffix_ = first_suffix_; + n_fec_ = 0; + } + + uint32_t checkNextSuffix() const override { return next_suffix_; } + + uint32_t getNextSuffix() override { + if (isFec(next_suffix_)) { + if (n_current_fec_) { + auto ret = next_suffix_++; + n_current_fec_--; + return ret; + } else { + n_current_fec_ = n_fec_; + next_suffix_ = nextSource(next_suffix_); + } + } else if (!n_current_fec_) { + n_current_fec_ = n_fec_; + } + + return (next_suffix_++ % LIMIT); + } + + void setFirstSuffix(uint32_t suffix) override { + first_suffix_ = suffix % LIMIT; + } + + uint32_t getFirstSuffix() const override { return first_suffix_; } + + uint32_t jumpToIndex(uint32_t index) override { + next_suffix_ = index % LIMIT; + return next_suffix_; + } + + void onContentObject(core::Interest &interest, + core::ContentObject &content_object, + bool reassembly) override { + if (reassembly) { + reassembly_->reassemble(content_object); + } + } + + /** + * Retrieve the next segment to be reassembled. + */ + uint32_t getNextReassemblySegment() override { + throw errors::RuntimeException( + "Get reassembly segment called on rtc indexer. RTC indexer does not " + "provide reassembly."); + } + + bool isFinalSuffixDiscovered() override { return true; } + + uint32_t getFinalSuffix() const override { return LIMIT; } + + void enableFec(fec::FECType fec_type) override { fec_type_ = fec_type; } + + void disableFec() override { fec_type_ = fec::FECType::UNKNOWN; } + + void setNFec(uint32_t n_fec) override { + n_fec_ = n_fec; + n_current_fec_ = n_fec_; + } + + uint32_t getNFec() const override { return n_fec_; } + + bool isFec(uint32_t index) override { + return isFec(fec_type_, index, first_suffix_); + } + + double getFecOverhead() const override { + if (fec_type_ == fec::FECType::UNKNOWN) { + return 0; + } + + double k = (double)fec::FECUtils::getSourceSymbols(fec_type_); + return (double)n_fec_ / k; + } + + double getMaxFecOverhead() const override { + if (fec_type_ == fec::FECType::UNKNOWN) { + return 0; + } + + double k = (double)fec::FECUtils::getSourceSymbols(fec_type_); + double n = (double)fec::FECUtils::getBlockSymbols(fec_type_); + return (double)(n - k) / k; + } + + static bool isFec(fec::FECType fec_type, uint32_t index, + uint32_t first_suffix) { + if (index < LIMIT) { + return fec::FECUtils::isFec(fec_type, index, first_suffix); + } + + return false; + } + + static uint32_t nextSource(fec::FECType fec_type, uint32_t index, + uint32_t first_suffix) { + return fec::FECUtils::nextSource(fec_type, index, first_suffix) % LIMIT; + } + + private: + uint32_t nextSource(uint32_t index) { + return nextSource(fec_type_, index, first_suffix_); + } + + private: + uint32_t first_suffix_; + uint32_t next_suffix_; + fec::FECType fec_type_; + bool fec_enabled_; + uint32_t n_fec_; + uint32_t n_current_fec_; +}; + +} // namespace rtc +} // namespace protocol +} // namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_ldr.cc b/libtransport/src/protocols/rtc/rtc_ldr.cc new file mode 100644 index 000000000..6e88a8636 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_ldr.cc @@ -0,0 +1,236 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <glog/logging.h> +#include <protocols/rtc/rtc_consts.h> +#include <protocols/rtc/rtc_ldr.h> +#include <protocols/rtc/rtc_rs_delay.h> +#include <protocols/rtc/rtc_rs_fec_only.h> +#include <protocols/rtc/rtc_rs_low_rate.h> +#include <protocols/rtc/rtc_rs_recovery_off.h> +#include <protocols/rtc/rtc_rs_rtx_only.h> +#include <protocols/rtc/rtc_state.h> + +#include <algorithm> +#include <unordered_set> + +namespace transport { + +namespace protocol { + +namespace rtc { + +RTCLossDetectionAndRecovery::RTCLossDetectionAndRecovery( + Indexer *indexer, asio::io_service &io_service, + interface::RtcTransportRecoveryStrategies type, + RecoveryStrategy::SendRtxCallback &&callback, + interface::StrategyCallback &&external_callback) { + if (type == interface::RtcTransportRecoveryStrategies::RECOVERY_OFF) { + rs_ = std::make_shared<RecoveryStrategyRecoveryOff>( + indexer, std::move(callback), io_service, type, + std::move(external_callback)); + } else if (type == interface::RtcTransportRecoveryStrategies::DELAY_BASED || + type == interface::RtcTransportRecoveryStrategies:: + DELAY_AND_BESTPATH || + type == interface::RtcTransportRecoveryStrategies:: + DELAY_AND_REPLICATION) { + rs_ = std::make_shared<RecoveryStrategyDelayBased>( + indexer, std::move(callback), io_service, type, + std::move(external_callback)); + } else if (type == interface::RtcTransportRecoveryStrategies::FEC_ONLY || + type == interface::RtcTransportRecoveryStrategies:: + FEC_ONLY_LOW_RES_LOSSES) { + rs_ = std::make_shared<RecoveryStrategyFecOnly>( + indexer, std::move(callback), io_service, type, + std::move(external_callback)); + } else if (type == interface::RtcTransportRecoveryStrategies::LOW_RATE || + type == interface::RtcTransportRecoveryStrategies:: + LOW_RATE_AND_BESTPATH || + type == interface::RtcTransportRecoveryStrategies:: + LOW_RATE_AND_REPLICATION || + type == interface::RtcTransportRecoveryStrategies:: + LOW_RATE_AND_ALL_FWD_STRATEGIES) { + rs_ = std::make_shared<RecoveryStrategyLowRate>( + indexer, std::move(callback), io_service, type, + std::move(external_callback)); + } else { + // default + type = interface::RtcTransportRecoveryStrategies::RTX_ONLY; + rs_ = std::make_shared<RecoveryStrategyRtxOnly>( + indexer, std::move(callback), io_service, type, + std::move(external_callback)); + } +} + +RTCLossDetectionAndRecovery::~RTCLossDetectionAndRecovery() {} + +void RTCLossDetectionAndRecovery::changeRecoveryStrategy( + interface::RtcTransportRecoveryStrategies type) { + if (type == rs_->getType()) return; + + rs_->updateType(type); + if (type == interface::RtcTransportRecoveryStrategies::RECOVERY_OFF) { + rs_ = + std::make_shared<RecoveryStrategyRecoveryOff>(std::move(*(rs_.get()))); + } else if (type == interface::RtcTransportRecoveryStrategies::DELAY_BASED || + type == interface::RtcTransportRecoveryStrategies:: + DELAY_AND_BESTPATH || + type == interface::RtcTransportRecoveryStrategies:: + DELAY_AND_REPLICATION) { + rs_ = std::make_shared<RecoveryStrategyDelayBased>(std::move(*(rs_.get()))); + } else if (type == interface::RtcTransportRecoveryStrategies::FEC_ONLY || + type == interface::RtcTransportRecoveryStrategies:: + FEC_ONLY_LOW_RES_LOSSES) { + rs_ = std::make_shared<RecoveryStrategyFecOnly>(std::move(*(rs_.get()))); + } else if (type == interface::RtcTransportRecoveryStrategies::LOW_RATE || + type == interface::RtcTransportRecoveryStrategies:: + LOW_RATE_AND_BESTPATH || + type == interface::RtcTransportRecoveryStrategies:: + LOW_RATE_AND_REPLICATION || + type == interface::RtcTransportRecoveryStrategies:: + LOW_RATE_AND_ALL_FWD_STRATEGIES) { + rs_ = std::make_shared<RecoveryStrategyLowRate>(std::move(*(rs_.get()))); + } else { + // default + rs_ = std::make_shared<RecoveryStrategyRtxOnly>(std::move(*(rs_.get()))); + } +} + +void RTCLossDetectionAndRecovery::onNewRound(bool in_sync) { + rs_->incRoundId(); + rs_->onNewRound(in_sync); +} + +bool RTCLossDetectionAndRecovery::onTimeout(uint32_t seq, bool lost) { + if (!lost) { + return detectLoss(seq, seq + 1, false); + } else { + rs_->onLostTimeout(seq); + } + return false; +} + +bool RTCLossDetectionAndRecovery::onPacketRecoveredFec(uint32_t seq) { + rs_->receivedPacket(seq); + return false; +} + +bool RTCLossDetectionAndRecovery::onDataPacketReceived( + const core::ContentObject &content_object) { + uint32_t seq = content_object.getName().getSuffix(); + bool is_rtx = rs_->isRtx(seq); + rs_->receivedPacket(seq); + bool ret = false; + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "received data. add from " + << rs_->getState()->getHighestSeqReceived() + 1 << " to " << seq; + if (!is_rtx) + ret = detectLoss(rs_->getState()->getHighestSeqReceived() + 1, seq, false); + + rs_->getState()->updateHighestSeqReceived(seq); + return ret; +} + +bool RTCLossDetectionAndRecovery::onNackPacketReceived( + const core::ContentObject &nack) { + struct nack_packet_t *nack_pkt = + (struct nack_packet_t *)nack.getPayload()->data(); + uint32_t production_seq = nack_pkt->getProductionSegment(); + uint32_t seq = nack.getName().getSuffix(); + + // received a nack. we can try to recover all data packets between the last + // received data and the production seq in the nack. this is similar to the + // recption of a probe + // e.g.: the client receives packets 10 11 12 20 where 20 is a nack + // with productionSeq = 18. this says that all the packets between 12 and 18 + // may got lost and we should ask them + + rs_->receivedPacket(seq); + DLOG_IF(INFO, VLOG_IS_ON(3)) << "received nack. add from " + << rs_->getState()->getHighestSeqReceived() + 1 + << " to " << production_seq; + + // if it is a future nack store it in the list set of nacked seq + if (production_seq <= seq) rs_->receivedFutureNack(seq); + + // call the detectLoss function using the probe flag = true. in fact the + // losses detected using nacks are the same as the one detected using probes, + // we should not increase the loss counter + return detectLoss(rs_->getState()->getHighestSeqReceived() + 1, + production_seq, true); +} + +bool RTCLossDetectionAndRecovery::onProbePacketReceived( + const core::ContentObject &probe) { + // we don't log the reception of a probe packet for the sentinel timer because + // probes are not taken into account into the sync window. we use them as + // future nacks to detect possible packets lost + + uint32_t production_seq = RTCState::getProbeParams(probe).prod_seg; + + DLOG_IF(INFO, VLOG_IS_ON(3)) << "received probe. add from " + << rs_->getState()->getHighestSeqReceived() + 1 + << " to " << production_seq; + + return detectLoss(rs_->getState()->getHighestSeqReceived() + 1, + production_seq, true); +} + +bool RTCLossDetectionAndRecovery::detectLoss(uint32_t start, uint32_t stop, + bool recv_probe) { + if (start >= stop) return false; + + // skip nacked packets + if (start <= rs_->getState()->getLastSeqNacked()) { + start = rs_->getState()->getLastSeqNacked() + 1; + } + + // skip received or lost packets + if (start <= rs_->getState()->getHighestSeqReceived()) { + start = rs_->getState()->getHighestSeqReceived() + 1; + } + + bool loss_detected = false; + for (uint32_t seq = start; seq < stop; seq++) { + if (rs_->getState()->getPacketState(seq) == PacketState::UNKNOWN) { + if (rs_->lossDetected(seq)) { + loss_detected = true; + if ((recv_probe || rs_->wasNacked(seq)) && !rs_->isFecOn()) { + // these losses were detected using a probe and fec is off. + // in this case most likelly the procotol is about to go out of sync + // and the packets are not really lost (e.g. increase in prod rate). + // for this reason we do not + // count the losses in the stats. Instead we do the following + // 1. send RTX for the packets in case they were really lost + // 2. return to the RTC protocol that a loss was detected using a + // probe. the protocol will switch to catch_up mode to increase the + // size of the window + rs_->requestPossibleLostPacket(seq); + } else { + // if fec is on we don't need to mask pontetial losses, so increase + // the loss rate + rs_->notifyNewLossDetedcted(seq); + } + } + } + } + return loss_detected; +} + +} // namespace rtc + +} // namespace protocol + +} // namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_ldr.h b/libtransport/src/protocols/rtc/rtc_ldr.h new file mode 100644 index 000000000..24f22ffed --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_ldr.h @@ -0,0 +1,85 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include <hicn/transport/config.h> +#include <hicn/transport/interfaces/socket_options_keys.h> +// RtcTransportRecoveryStrategies +#include <hicn/transport/core/asio_wrapper.h> +#include <hicn/transport/core/content_object.h> +#include <hicn/transport/core/name.h> +#include <protocols/rtc/rtc_recovery_strategy.h> + +#include <functional> + +namespace transport { + +namespace protocol { + +namespace rtc { + +class RTCLossDetectionAndRecovery + : public std::enable_shared_from_this<RTCLossDetectionAndRecovery> { + public: + RTCLossDetectionAndRecovery(Indexer *indexer, asio::io_service &io_service, + interface::RtcTransportRecoveryStrategies type, + RecoveryStrategy::SendRtxCallback &&callback, + interface::StrategyCallback &&external_callback); + + ~RTCLossDetectionAndRecovery(); + + void setState(RTCState *state) { rs_->setState(state); } + void setRateControl(RTCRateControl *rateControl) { + rs_->setRateControl(rateControl); + } + + void setFecParams(uint32_t n, uint32_t k) { rs_->setFecParams(n, k); } + + void setContentSharingMode() { rs_->setContentSharingMode(); } + void turnOnRecovery() { rs_->turnOnRecovery(); } + bool isRtxOn() { return rs_->isRtxOn(); } + + void changeRecoveryStrategy(interface::RtcTransportRecoveryStrategies type); + + void onNewRound(bool in_sync); + + // the following functions return true if a loss is detected, false otherwise + bool onTimeout(uint32_t seq, bool lost); + bool onPacketRecoveredFec(uint32_t seq); + bool onDataPacketReceived(const core::ContentObject &content_object); + bool onNackPacketReceived(const core::ContentObject &nack); + bool onProbePacketReceived(const core::ContentObject &probe); + + void clear() { rs_->clear(); } + + bool isRtx(uint32_t seq) { return rs_->isRtx(seq); } + bool isPossibleLossWithNoRtx(uint32_t seq) { + return rs_->isPossibleLossWithNoRtx(seq); + } + + uint64_t getRtxRtt(uint32_t seq) { return rs_->getRtxRtt(seq); } + + private: + // returns true if a loss is detected, false otherwise + bool detectLoss(uint32_t start, uint32_t stop, bool recv_probe); + + std::shared_ptr<RecoveryStrategy> rs_; +}; + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_packet.h b/libtransport/src/protocols/rtc/rtc_packet.h new file mode 100644 index 000000000..ffbbd78fd --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_packet.h @@ -0,0 +1,256 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + */ + +/* data packet + * +-----------------------------------------+ + * | uint64_t: timestamp | + * | | + * +-----------------------------------------+ + * | uint32_t: prod rate (bytes per sec) | + * +-----------------------------------------+ + * | payload | + * | ... | + */ + +/* nack packet + * +-----------------------------------------+ + * | uint64_t: timestamp | + * | | + * +-----------------------------------------+ + * | uint32_t: prod rate (bytes per sec) | + * +-----------------------------------------+ + * | uint32_t: current seg in production | + * +-----------------------------------------+ + */ + +/* aggregated packets + * +---------------------------------+ + * |c| #pkts | len1 | len2 | .... | + * +---------------------------------- + * + * +---------------------------------+ + * |c| #pkts | resv | len 1 | + * +---------------------------------- + * + * aggregated packets header. + * header position. just after the data packet header + * + * c: 1 bit: 0 8bit encoding, 1 16bit encoding + * #pkts: 7 bits: number of application packets contained + * 8bits encoding: + * lenX: 8 bits: len in bites of packet X + * 16bits econding: + * resv: 8 bits: reserved field (unused) + * lenX: 16bits: len in bytes of packet X + */ + +#pragma once +#ifndef _WIN32 +#include <arpa/inet.h> +#else +#include <hicn/transport/portability/win_portability.h> +#endif + +#include <hicn/transport/portability/endianess.h> + +#include <cstring> + +namespace transport { + +namespace protocol { + +namespace rtc { + +const uint32_t DATA_HEADER_SIZE = 12; // bytes + // XXX: sizeof(data_packet_t) is 16 + // beacuse of padding +const uint32_t NACK_HEADER_SIZE = 16; + +struct data_packet_t { + uint64_t timestamp; + uint32_t prod_rate; + + inline uint64_t getTimestamp() const { + return portability::net_to_host(timestamp); + } + inline void setTimestamp(uint64_t time) { + timestamp = portability::host_to_net(time); + } + + inline uint32_t getProductionRate() const { + return portability::net_to_host(prod_rate); + } + inline void setProductionRate(uint32_t rate) { + prod_rate = portability::host_to_net(rate); + } +}; + +struct nack_packet_t { + uint64_t timestamp; + uint32_t prod_rate; + uint32_t prod_seg; + + inline uint64_t getTimestamp() const { + return portability::net_to_host(timestamp); + } + inline void setTimestamp(uint64_t time) { + timestamp = portability::host_to_net(time); + } + + inline uint32_t getProductionRate() const { + return portability::net_to_host(prod_rate); + } + inline void setProductionRate(uint32_t rate) { + prod_rate = portability::host_to_net(rate); + } + + inline uint32_t getProductionSegment() const { + return portability::net_to_host(prod_seg); + } + inline void setProductionSegment(uint32_t seg) { + prod_seg = portability::host_to_net(seg); + } +}; + +class AggrPktHeader { + public: + // XXX buf always point to the payload after the data header + AggrPktHeader(uint8_t *buf, uint16_t max_packet_len, uint16_t pkt_number) + : buf_(buf), pkt_num_(pkt_number) { + *buf_ = 0; // reset the first byte to correctly add the header + // encoding and the packet number + if (max_packet_len > 0xff) { + setAggrPktEncoding16bit(); + } else { + setAggrPktEncoding8bit(); + } + setAggrPktNUmber(pkt_number); + header_len_ = computeHeaderLen(); + memset(buf_ + 1, 0, header_len_ - 1); + } + + // XXX buf always point to the payload after the data header + AggrPktHeader(uint8_t *buf) : buf_(buf) { + encoding_ = getAggrPktEncoding(); + pkt_num_ = getAggrPktNumber(); + header_len_ = computeHeaderLen(); + } + + ~AggrPktHeader(){}; + + int addPacketToHeader(uint8_t index, uint16_t len) { + if (index > pkt_num_) return -1; + + setAggrPktLen(index, len); + return 0; + } + + int getPointerToPacket(uint8_t index, uint8_t **pkt_ptr, uint16_t *pkt_len) { + if (index > pkt_num_) return -1; + + uint16_t len = 0; + for (int i = 0; i < index; i++) + len += getAggrPktLen(i); // sum the pkts len from 0 to index - 1 + + uint16_t offset = len + header_len_; + *pkt_ptr = buf_ + offset; + *pkt_len = getAggrPktLen(index); + return 0; + } + + int getPacketOffsets(uint8_t index, uint16_t *pkt_offset, uint16_t *pkt_len) { + if (index > pkt_num_) return -1; + + uint16_t len = 0; + for (int i = 0; i < index; i++) + len += getAggrPktLen(i); // sum the pkts len from 0 to index - 1 + + uint16_t offset = len + header_len_; + *pkt_offset = offset; + *pkt_len = getAggrPktLen(index); + + return 0; + } + + uint8_t *getPayloadAppendPtr() { return buf_ + header_len_; } + + uint16_t getHeaderLen() { return header_len_; } + + uint8_t getNumberOfPackets() { return pkt_num_; } + + private: + inline uint16_t computeHeaderLen() const { + uint16_t len = 4; // min len in bytes + if (!encoding_) { + while (pkt_num_ >= len) { + len += 4; + } + } else { + while (pkt_num_ * 2 >= len) { + len += 4; + } + } + return len; + } + + inline uint8_t getAggrPktEncoding() const { + // get the first bit of the first byte + return (*buf_ >> 7); + } + + inline void setAggrPktEncoding8bit() { + // reset the first bit of the first byte + encoding_ = 0; + *buf_ &= 0x7f; + } + + inline void setAggrPktEncoding16bit() { + // set the first bit of the first byte + encoding_ = 1; + *buf_ ^= 0x80; + } + + inline uint8_t getAggrPktNumber() const { + // return the first byte with the first bit = 0 + return (*buf_ & 0x7f); + } + + inline void setAggrPktNUmber(uint8_t val) { + // set the val without modifying the first bit + *buf_ &= 0x80; // reset everithing but the first bit + val &= 0x7f; // reset the first bit + *buf_ |= val; // or the vals, done! + } + + inline uint16_t getAggrPktLen(uint8_t pkt_index) const { + pkt_index++; + if (!encoding_) { // 8 bits + return (uint16_t) * (buf_ + pkt_index); + } else { // 16 bits + uint16_t *buf_16 = (uint16_t *)buf_; + return portability::net_to_host(*(buf_16 + pkt_index)); + } + } + + inline void setAggrPktLen(uint8_t pkt_index, uint16_t len) { + pkt_index++; + if (!encoding_) { // 8 bits + *(buf_ + pkt_index) = (uint8_t)len; + } else { // 16 bits + uint16_t *buf_16 = (uint16_t *)buf_; + *(buf_16 + pkt_index) = portability::host_to_net(len); + } + } + + uint8_t *buf_; + uint8_t encoding_; + uint8_t pkt_num_; + uint16_t header_len_; +}; + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_rc.h b/libtransport/src/protocols/rtc/rtc_rc.h new file mode 100644 index 000000000..62636ce40 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_rc.h @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include <protocols/rtc/rtc_state.h> + +namespace transport { + +namespace protocol { + +namespace rtc { + +class RTCRateControl : public std::enable_shared_from_this<RTCRateControl> { + public: + RTCRateControl() + : rc_on_(false), + congestion_win_(1000000), // init the win to a large number + congestion_state_(CongestionState::Normal), + protocol_state_(nullptr) {} + + virtual ~RTCRateControl() = default; + + void turnOnRateControl() { rc_on_ = true; } + void setState(std::shared_ptr<RTCState> state) { protocol_state_ = state; }; + uint32_t getCongestionWindow() { return congestion_win_; }; + bool inCongestionState() { + if (congestion_state_ == CongestionState::Congested) return true; + return false; + } + + virtual void onNewRound(double round_len) = 0; + virtual void onDataPacketReceived(const core::ContentObject &content_object, + bool compute_stats) = 0; + + protected: + enum class CongestionState { Normal = 0, Underuse = 1, Congested = 2, Last }; + + protected: + bool rc_on_; + uint32_t congestion_win_; + CongestionState congestion_state_; + + std::shared_ptr<RTCState> protocol_state_; +}; + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_rc_congestion_detection.cc b/libtransport/src/protocols/rtc/rtc_rc_congestion_detection.cc new file mode 100644 index 000000000..6cd3094b5 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_rc_congestion_detection.cc @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <protocols/rtc/rtc_consts.h> +#include <protocols/rtc/rtc_rc_congestion_detection.h> + +#include <algorithm> + +namespace transport { + +namespace protocol { + +namespace rtc { + +RTCRateControlCongestionDetection::RTCRateControlCongestionDetection() + : rounds_without_congestion_(4), last_queue_(0) {} // must be > 3 + +RTCRateControlCongestionDetection::~RTCRateControlCongestionDetection() {} + +void RTCRateControlCongestionDetection::onNewRound(double round_len) { + if (!rc_on_) return; + + double rtt = (double)protocol_state_->getMinRTT() / MILLI_IN_A_SEC; + double queue = protocol_state_->getQueuing(); + + if (rtt == 0.0) return; // no info from the producer + + if (last_queue_ == queue) { + // if last_queue == queue the consumer didn't receive any + // packet from the producer. we do not change the current congestion state. + // we just increase the counter of rounds whithout congestion if needed + // (in case of congestion the counter is already set to 0) + if (congestion_state_ == CongestionState::Normal) + rounds_without_congestion_++; + } else { + if (queue > MAX_QUEUING_DELAY) { + // here we detect congestion. + congestion_state_ = CongestionState::Congested; + rounds_without_congestion_ = 0; + } else { + // wait 3 rounds before switch back to no congestion + if (rounds_without_congestion_ > 3) { + // nothing bad is happening + congestion_state_ = CongestionState::Normal; + } + rounds_without_congestion_++; + } + last_queue_ = queue; + } +} + +void RTCRateControlCongestionDetection::onDataPacketReceived( + const core::ContentObject &content_object, bool compute_stats) { + // nothing to do + return; +} + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_rc_congestion_detection.h b/libtransport/src/protocols/rtc/rtc_rc_congestion_detection.h new file mode 100644 index 000000000..9afa6c39a --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_rc_congestion_detection.h @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include <hicn/transport/utils/shared_ptr_utils.h> +#include <protocols/rtc/rtc_rc.h> + +namespace transport { + +namespace protocol { + +namespace rtc { + +class RTCRateControlCongestionDetection : public RTCRateControl { + public: + RTCRateControlCongestionDetection(); + + ~RTCRateControlCongestionDetection(); + + void onNewRound(double round_len); + void onDataPacketReceived(const core::ContentObject &content_object, + bool compute_stats); + + auto shared_from_this() { return utils::shared_from(this); } + + private: + uint32_t rounds_without_congestion_; + double last_queue_; +}; + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_rc_iat.cc b/libtransport/src/protocols/rtc/rtc_rc_iat.cc new file mode 100644 index 000000000..f06f377f3 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_rc_iat.cc @@ -0,0 +1,287 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <protocols/rtc/rtc_consts.h> +#include <protocols/rtc/rtc_rc_iat.h> + +namespace transport { + +namespace protocol { + +namespace rtc { + +RTCRateControlIAT::RTCRateControlIAT() + : rounds_since_last_drop_(0), + rounds_without_congestion_(0), + rounds_with_congestion_(0), + last_queue_(0), + last_rcv_time_(0), + last_prod_time_(0), + last_seq_number_(0), + target_rate_avg_(0), + round_index_(0), + congestion_cause_(CongestionCause::UNKNOWN) {} + +RTCRateControlIAT::~RTCRateControlIAT() {} + +void RTCRateControlIAT::onNewRound(double round_len) { + if (!rc_on_) return; + + double received_rate = protocol_state_->getReceivedRate() + + protocol_state_->getRecoveredFecRate(); + + double target_rate = + protocol_state_->getProducerRate(); // * PRODUCTION_RATE_FRACTION; + double rtt = (double)protocol_state_->getMinRTT() / MILLI_IN_A_SEC; + // double packet_size = protocol_state_->getAveragePacketSize(); + double queue = protocol_state_->getQueuing(); + + if (rtt == 0.0) return; // no info from the producer + + CongestionState prev_congestion_state = congestion_state_; + + target_rate_avg_ = target_rate_avg_ * (1 - MOVING_AVG_ALPHA) + + target_rate * MOVING_AVG_ALPHA; + + if (prev_congestion_state == CongestionState::Congested) { + if (queue > MAX_QUEUING_DELAY || last_queue_ == queue) { + congestion_state_ = CongestionState::Congested; + + received_rate_.push_back(received_rate); + target_rate_.push_back(target_rate); + + // We assume the cause does not change + // Note that the first assumption about the cause could be wrong + // the cause of congestion could change + if (congestion_cause_ == CongestionCause::UNKNOWN) + if (rounds_with_congestion_ >= 1) + congestion_cause_ = apply_classification_tree( + rounds_with_congestion_ > ROUND_TO_WAIT_FORCE_DECISION); + + rounds_with_congestion_++; + } else { + congestion_state_ = CongestionState::Normal; + + // clear past history + reset_congestion_statistics(); + + // TODO maybe we can use some of these values for the stdev of the + // congestion mode + for (int i = 0; i < ROUND_HISTORY_SIZE; i++) { + iat_on_hold_[i].clear(); + } + } + } else if (queue > MAX_QUEUING_DELAY) { + if (prev_congestion_state == CongestionState::Normal) { + rounds_with_congestion_ = 0; + + if (rounds_without_congestion_ > ROUND_TO_RESET_CAUSE) + congestion_cause_ = CongestionCause::UNKNOWN; + } + congestion_state_ = CongestionState::Congested; + received_rate_.push_back(received_rate); + target_rate_.push_back(target_rate); + } else { + // nothing bad is happening + congestion_state_ = CongestionState::Normal; + reset_congestion_statistics(); + + int past_index = (round_index_ + 1) % ROUND_HISTORY_SIZE; + for (std::vector<double>::iterator it = iat_on_hold_[past_index].begin(); + it != iat_on_hold_[past_index].end(); ++it) { + congestion_free_iat_.push_back(*it); + if (congestion_free_iat_.size() > 50) { + congestion_free_iat_.erase(congestion_free_iat_.begin()); + } + } + iat_on_hold_[past_index].clear(); + round_index_ = (round_index_ + 1) % ROUND_HISTORY_SIZE; + } + + last_queue_ = queue; + + if (congestion_state_ == CongestionState::Congested) { + if (prev_congestion_state == CongestionState::Normal) { + // init the congetion window using the received rate + // disabling for the moment the congestion window setup + // congestion_win_ = (uint32_t)ceil(received_rate * rtt / packet_size); + rounds_since_last_drop_ = ROUNDS_BEFORE_TAKE_ACTION + 1; + } + + if (rounds_since_last_drop_ >= ROUNDS_BEFORE_TAKE_ACTION) { + // disabling for the moment the congestion window setup + // uint32_t win = congestion_win_ * WIN_DECREASE_FACTOR; + // congestion_win_ = std::max(win, WIN_MIN); + rounds_since_last_drop_ = 0; + return; + } + + rounds_since_last_drop_++; + } + + if (congestion_state_ == CongestionState::Normal) { + if (prev_congestion_state == CongestionState::Congested) { + rounds_without_congestion_ = 0; + } + + rounds_without_congestion_++; + if (rounds_without_congestion_ < ROUNDS_BEFORE_TAKE_ACTION) return; + + // disabling for the moment the congestion window setup + // congestion_win_ = congestion_win_ * WIN_INCREASE_FACTOR; + // congestion_win_ = std::min(congestion_win_, INITIAL_WIN_MAX); + } + + if (received_rate_.size() > 1000) + received_rate_.erase(received_rate_.begin()); + if (target_rate_.size() > 1000) target_rate_.erase(target_rate_.begin()); +} + +void RTCRateControlIAT::onDataPacketReceived( + const core::ContentObject &content_object, bool compute_stats) { + core::ParamsRTC params = RTCState::getDataParams(content_object); + + uint64_t now = utils::SteadyTime::nowMs().count(); + + uint32_t segment_number = content_object.getName().getSuffix(); + + if (segment_number == (last_seq_number_ + 1) && compute_stats) { + uint64_t iat = now - last_rcv_time_; + uint64_t ist = params.timestamp - last_prod_time_; + if (now >= last_rcv_time_ && params.timestamp > last_prod_time_) { + if (iat >= ist && ist < MIN_IST_VALUE) { + if (congestion_state_ == CongestionState::Congested) { + iat_.push_back((iat - ist)); + } else { + // no congestion, but we do not always add new values, but only when + // there is no sign of congestion + double queue = protocol_state_->getQueuing(); + if (queue <= CONGESTION_FREE_QUEUEING_DELAY) { + iat_on_hold_[round_index_].push_back((iat - ist)); + } + } + } + } + } + + last_seq_number_ = segment_number; + last_rcv_time_ = now; + last_prod_time_ = params.timestamp; + + if (iat_.size() > 1000) iat_.erase(iat_.begin()); + return; +} + +CongestionCause RTCRateControlIAT::apply_classification_tree(bool force_reply) { + if (iat_.size() <= 2 || received_rate_.size() < 2) + return CongestionCause::UNKNOWN; + + double received_ratio = 0; + double iat_ratio = 0; + double iat_stdev = compute_iat_stdev(iat_); + double iat_congestion_free_stdev = compute_iat_stdev(congestion_free_iat_); + + double iat_avg = 0.0; + + double recv_avg = 0.0; + double recv_max = 0.0; + + double target_rate_avg = 0.0; + + int counter = 0; + std::vector<double>::reverse_iterator target_it = target_rate_.rbegin(); + for (std::vector<double>::reverse_iterator it = received_rate_.rbegin(); + it != received_rate_.rend(); ++it) { + recv_avg += *it; + target_rate_avg += *target_it; + if (counter < ROUND_HISTORY_SIZE) + if (recv_max < *it) { + recv_max = *it; // we consider only the last 2 seconds + } + counter++; + target_it++; + } + recv_avg = recv_avg / received_rate_.size(); + target_rate_avg = target_rate_avg / target_rate_.size(); + + for (std::vector<double>::iterator it = iat_.begin(); it != iat_.end(); + ++it) { + iat_avg += *it; + } + iat_avg = iat_avg / iat_.size(); + + double congestion_free_iat_avg = 0.0; + for (std::vector<double>::iterator it = congestion_free_iat_.begin(); + it != congestion_free_iat_.end(); ++it) { + congestion_free_iat_avg += *it; + } + congestion_free_iat_avg = + congestion_free_iat_avg / congestion_free_iat_.size(); + + received_ratio = recv_avg / target_rate_avg; + + iat_ratio = iat_stdev / iat_congestion_free_stdev; + + CongestionCause congestion_cause = CongestionCause::UNKNOWN; + // applying classification tree model + if (received_ratio <= 0.87) + if (iat_stdev <= 6.48) + if (received_ratio <= 0.83) + congestion_cause = CongestionCause::LINK_CAPACITY; + else if (force_reply) + congestion_cause = CongestionCause::LINK_CAPACITY; + else + congestion_cause = CongestionCause::UNKNOWN; // accuracy is too low + else if (iat_ratio <= 2.46) + if (force_reply) + congestion_cause = CongestionCause::LINK_CAPACITY; + else + congestion_cause = CongestionCause::UNKNOWN; // accuracy is too low + else + congestion_cause = CongestionCause::COMPETING_CROSS_TRAFFIC; + else if (received_ratio <= 0.913 && iat_stdev <= 0.784) + congestion_cause = CongestionCause::LINK_CAPACITY; + else + congestion_cause = CongestionCause::COMPETING_CROSS_TRAFFIC; + + return congestion_cause; +} + +void RTCRateControlIAT::reset_congestion_statistics() { + iat_.clear(); + received_rate_.clear(); + target_rate_.clear(); +} + +double RTCRateControlIAT::compute_iat_stdev(std::vector<double> v) { + if (v.size() == 0) return 0; + + float sum = 0.0, mean, standard_deviation = 0.0; + for (std::vector<double>::iterator it = v.begin(); it != v.end(); it++) { + sum += *it; + } + + mean = sum / v.size(); + for (std::vector<double>::iterator it = v.begin(); it != v.end(); it++) { + standard_deviation += pow(*it - mean, 2); + } + return sqrt(standard_deviation / v.size()); +} + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_rc_iat.h b/libtransport/src/protocols/rtc/rtc_rc_iat.h new file mode 100644 index 000000000..715637807 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_rc_iat.h @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include <hicn/transport/utils/shared_ptr_utils.h> +#include <protocols/rtc/rtc_rc.h> + +namespace transport { + +namespace protocol { + +namespace rtc { + +const int ROUND_HISTORY_SIZE = 10; // equivalent to two seconds +const int ROUND_TO_WAIT_FORCE_DECISION = 5; + +// once congestion is gone, we need to wait for k rounds before changing the +// congestion cause in the case it appears again +const int ROUND_TO_RESET_CAUSE = 5; + +const int MIN_IST_VALUE = 150; // samples of ist larger than 150ms are + // discarded +const double CONGESTION_FREE_QUEUEING_DELAY = 10; + +enum class CongestionCause : uint8_t { + COMPETING_CROSS_TRAFFIC, + FRIENDLY_CROSS_TRAFFIC, + UNKNOWN_CROSS_TRAFFIC, + LINK_CAPACITY, + UNKNOWN +}; + +class RTCRateControlIAT : public RTCRateControl { + public: + RTCRateControlIAT(); + + ~RTCRateControlIAT(); + + void onNewRound(double round_len); + void onDataPacketReceived(const core::ContentObject &content_object, + bool compute_stats); + + auto shared_from_this() { return utils::shared_from(this); } + + private: + void reset_congestion_statistics(); + + double compute_iat_stdev(std::vector<double> v); + + CongestionCause apply_classification_tree(bool force_reply); + + private: + uint32_t rounds_since_last_drop_; + uint32_t rounds_without_congestion_; + uint32_t rounds_with_congestion_; + double last_queue_; + uint64_t last_rcv_time_; + uint64_t last_prod_time_; + uint32_t last_seq_number_; + double target_rate_avg_; + + // Iat values are not immediately added to the congestion free set of values + std::array<std::vector<double>, ROUND_HISTORY_SIZE> iat_on_hold_; + uint32_t round_index_; + + // with congestion statistics + std::vector<double> iat_; + std::vector<double> received_rate_; + std::vector<double> target_rate_; + + // congestion free statistics + std::vector<double> congestion_free_iat_; + + CongestionCause congestion_cause_; +}; + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_rc_queue.cc b/libtransport/src/protocols/rtc/rtc_rc_queue.cc new file mode 100644 index 000000000..ecabc5205 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_rc_queue.cc @@ -0,0 +1,106 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <protocols/rtc/rtc_consts.h> +#include <protocols/rtc/rtc_rc_queue.h> + +#include <algorithm> + +namespace transport { + +namespace protocol { + +namespace rtc { + +RTCRateControlQueue::RTCRateControlQueue() + : rounds_since_last_drop_(0), + rounds_without_congestion_(0), + last_queue_(0) {} + +RTCRateControlQueue::~RTCRateControlQueue() {} + +void RTCRateControlQueue::onNewRound(double round_len) { + if (!rc_on_) return; + + double received_rate = protocol_state_->getReceivedRate(); + double target_rate = + protocol_state_->getProducerRate() * PRODUCTION_RATE_FRACTION; + double rtt = (double)protocol_state_->getMinRTT() / MILLI_IN_A_SEC; + double packet_size = protocol_state_->getAveragePacketSize(); + double queue = protocol_state_->getQueuing(); + + if (rtt == 0.0) return; // no info from the producer + + CongestionState prev_congestion_state = congestion_state_; + + if (prev_congestion_state == CongestionState::Normal && + received_rate >= target_rate) { + // if the queue is high in this case we are most likelly fighting with + // a TCP flow and there is enough bandwidth to match the producer rate + congestion_state_ = CongestionState::Normal; + } else if (queue > MAX_QUEUING_DELAY || last_queue_ == queue) { + // here we detect congestion. in the case that last_queue == queue + // the consumer didn't receive any packet from the producer so we + // consider this case as congestion + // TODO: wath happen in case of high loss rate? + congestion_state_ = CongestionState::Congested; + } else { + // nothing bad is happening + congestion_state_ = CongestionState::Normal; + } + + last_queue_ = queue; + + if (congestion_state_ == CongestionState::Congested) { + if (prev_congestion_state == CongestionState::Normal) { + // init the congetion window using the received rate + congestion_win_ = (uint32_t)ceil(received_rate * rtt / packet_size); + rounds_since_last_drop_ = ROUNDS_BEFORE_TAKE_ACTION + 1; + } + + if (rounds_since_last_drop_ >= ROUNDS_BEFORE_TAKE_ACTION) { + uint32_t win = congestion_win_ * WIN_DECREASE_FACTOR; + congestion_win_ = std::max(win, WIN_MIN); + rounds_since_last_drop_ = 0; + return; + } + + rounds_since_last_drop_++; + } + + if (congestion_state_ == CongestionState::Normal) { + if (prev_congestion_state == CongestionState::Congested) { + rounds_without_congestion_ = 0; + } + + rounds_without_congestion_++; + if (rounds_without_congestion_ < ROUNDS_BEFORE_TAKE_ACTION) return; + + congestion_win_ = congestion_win_ * WIN_INCREASE_FACTOR; + congestion_win_ = std::min(congestion_win_, INITIAL_WIN_MAX); + } +} + +void RTCRateControlQueue::onDataPacketReceived( + const core::ContentObject &content_object, bool compute_stats) { + // nothing to do + return; +} + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_rc_queue.h b/libtransport/src/protocols/rtc/rtc_rc_queue.h new file mode 100644 index 000000000..cdf78fd47 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_rc_queue.h @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include <hicn/transport/utils/shared_ptr_utils.h> +#include <protocols/rtc/rtc_rc.h> + +namespace transport { + +namespace protocol { + +namespace rtc { + +class RTCRateControlQueue : public RTCRateControl { + public: + RTCRateControlQueue(); + + ~RTCRateControlQueue(); + + void onNewRound(double round_len); + void onDataPacketReceived(const core::ContentObject &content_object, + bool compute_stats); + + auto shared_from_this() { return utils::shared_from(this); } + + private: + uint32_t rounds_since_last_drop_; + uint32_t rounds_without_congestion_; + double last_queue_; +}; + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_reassembly.cc b/libtransport/src/protocols/rtc/rtc_reassembly.cc new file mode 100644 index 000000000..b1b0fcaba --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_reassembly.cc @@ -0,0 +1,109 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/interfaces/socket_consumer.h> +#include <implementation/socket_consumer.h> +#include <protocols/rtc/rtc_reassembly.h> +#include <protocols/transport_protocol.h> + +namespace transport { + +namespace protocol { + +namespace rtc { + +RtcReassembly::RtcReassembly(implementation::ConsumerSocket* icn_socket, + TransportProtocol* transport_protocol) + : DatagramReassembly(icn_socket, transport_protocol) { + is_setup_ = false; +} + +void RtcReassembly::reassemble(core::ContentObject& content_object) { + if (!is_setup_) { + is_setup_ = true; + reassembly_consumer_socket_->getSocketOption( + interface::RtcTransportOptions::AGGREGATED_DATA, data_aggregation_); + } + + auto read_buffer = content_object.getPayload(); + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Size of payload: " << read_buffer->length(); + + read_buffer->trimStart(transport_protocol_->transportHeaderLength(false)); + + if (data_aggregation_) { + rtc::AggrPktHeader hdr((uint8_t*)read_buffer->data()); + + for (uint8_t i = 0; i < hdr.getNumberOfPackets(); i++) { + std::unique_ptr<utils::MemBuf> segment = read_buffer->clone(); + + uint16_t pkt_start = 0; + uint16_t pkt_len = 0; + int res = hdr.getPacketOffsets(i, &pkt_start, &pkt_len); + if (res == -1) { + // this should not happen + break; + } + + segment->trimStart(pkt_start); + segment->trimEnd(segment->length() - pkt_len); + + Reassembly::read_buffer_ = std::move(segment); + Reassembly::notifyApplication(); + } + } else { + Reassembly::read_buffer_ = std::move(read_buffer); + Reassembly::notifyApplication(); + } +} + +void RtcReassembly::reassemble(utils::MemBuf& buffer, uint32_t suffix) { + if (!is_setup_) { + is_setup_ = true; + reassembly_consumer_socket_->getSocketOption( + interface::RtcTransportOptions::AGGREGATED_DATA, data_aggregation_); + } + + if (data_aggregation_) { + rtc::AggrPktHeader hdr((uint8_t*)buffer.data()); + + for (uint8_t i = 0; i < hdr.getNumberOfPackets(); i++) { + std::unique_ptr<utils::MemBuf> segment = buffer.clone(); + + uint16_t pkt_start = 0; + uint16_t pkt_len = 0; + int res = hdr.getPacketOffsets(i, &pkt_start, &pkt_len); + if (res == -1) { + // this should not happen + break; + } + + segment->trimStart(pkt_start); + segment->trimEnd(segment->length() - pkt_len); + + Reassembly::read_buffer_ = std::move(segment); + Reassembly::notifyApplication(); + } + + } else { + Reassembly::read_buffer_ = buffer.cloneOne(); + Reassembly::notifyApplication(); + } +} + +} // namespace rtc + +} // namespace protocol + +} // namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_reassembly.h b/libtransport/src/protocols/rtc/rtc_reassembly.h new file mode 100644 index 000000000..132004605 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_reassembly.h @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <glog/logging.h> +#include <protocols/datagram_reassembly.h> +#include <protocols/rtc/rtc_consts.h> + +namespace transport { + +namespace protocol { + +namespace rtc { + +class RtcReassembly : public DatagramReassembly { + public: + RtcReassembly(implementation::ConsumerSocket *icn_socket, + TransportProtocol *transport_protocol); + + void reassemble(core::ContentObject &content_object) override; + void reassemble(utils::MemBuf &buffer, uint32_t suffix) override; + + private: + bool is_setup_; + bool data_aggregation_; +}; + +} // namespace rtc +} // namespace protocol +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_recovery_strategy.cc b/libtransport/src/protocols/rtc/rtc_recovery_strategy.cc new file mode 100644 index 000000000..257fdd09b --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_recovery_strategy.cc @@ -0,0 +1,420 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <glog/logging.h> +#include <hicn/transport/interfaces/notification.h> +#include <hicn/transport/interfaces/socket_options_keys.h> +#include <protocols/rtc/rtc_consts.h> +#include <protocols/rtc/rtc_recovery_strategy.h> + +namespace transport { + +namespace protocol { + +namespace rtc { + +using namespace transport::interface; + +RecoveryStrategy::RecoveryStrategy( + Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service, + bool use_rtx, bool use_fec, + interface::RtcTransportRecoveryStrategies rs_type, + interface::StrategyCallback &&external_callback) + : rs_type_(rs_type), + recovery_on_(false), + content_sharing_mode_(false), + rtx_during_fec_(0), + next_rtx_timer_(MAX_TIMER_RTX), + send_rtx_callback_(std::move(callback)), + indexer_(indexer), + round_id_(0), + last_fec_used_(0), + callback_(std::move(external_callback)) { + setRtxFec(use_rtx, use_fec); + timer_ = std::make_unique<asio::steady_timer>(io_service); +} + +RecoveryStrategy::RecoveryStrategy(RecoveryStrategy &&rs) + : rs_type_(rs.rs_type_), + content_sharing_mode_(rs.content_sharing_mode_), + rtx_during_fec_(0), + rtx_state_(std::move(rs.rtx_state_)), + rtx_timers_(std::move(rs.rtx_timers_)), + recover_with_fec_(std::move(rs.recover_with_fec_)), + timer_(std::move(rs.timer_)), + next_rtx_timer_(std::move(rs.next_rtx_timer_)), + send_rtx_callback_(std::move(rs.send_rtx_callback_)), + n_(std::move(rs.n_)), + k_(std::move(rs.k_)), + indexer_(std::move(rs.indexer_)), + state_(std::move(rs.state_)), + rc_(std::move(rs.rc_)), + round_id_(std::move(rs.round_id_)), + last_fec_used_(std::move(rs.last_fec_used_)), + callback_(std::move(rs.callback_)) { + setFecParams(n_, k_); +} + +RecoveryStrategy::~RecoveryStrategy() {} + +void RecoveryStrategy::setFecParams(uint32_t n, uint32_t k) { + // if rs_type == FEC_ONLY_LOW_RES_LOSSES max k == 64 + n_ = n; + k_ = k; + + // XXX for the moment we go in steps of 5% loss rate. + uint32_t i = 0; + for (uint32_t loss_rate = 5; loss_rate < 100; loss_rate += 5) { + uint32_t fec_to_ask = 0; + if (n_ != 0 && k_ != 0) { + if (rs_type_ == + interface::RtcTransportRecoveryStrategies::FEC_ONLY_LOW_RES_LOSSES) { + // the max loss rate in the matrix is 50% + uint32_t index = i; + if (i > 9) index = 9; + fec_to_ask = FEC_MATRIX[k_ - 1][index]; + } else { + double dec_loss_rate = (double)(loss_rate + 5); + if (dec_loss_rate == 100.0) dec_loss_rate = 95.0; + dec_loss_rate = dec_loss_rate / 100.0; + double exp_losses = ceil((double)k_ * dec_loss_rate); + fec_to_ask = ceil((exp_losses / (1 - dec_loss_rate)) * 1.25); + } + } + fec_to_ask = std::min(fec_to_ask, (n_ - k_)); + fec_per_loss_rate_.push_back(fec_to_ask); + + i++; + } +} + +uint64_t RecoveryStrategy::getRtxRtt(uint32_t seq) { + auto it = rtx_state_.find(seq); + + if (it == rtx_state_.end()) return 0; + + // we can compute the RTT of an RTX only if it was send once. Infact if the + // RTX was sent twice or more the data may be alredy in flight and the RTT + // will be underestimated. This may happen also for packets that we + // retransmitted too soon. in that case the RTT will be filtered out by + // checking the path label + if (it->second.rtx_count_ != 1) return 0; + + // this a potentialy valid packet, compute the RTT + return (utils::SteadyTime::nowMs().count() - it->second.last_send_); +} + +bool RecoveryStrategy::lossDetected(uint32_t seq) { + if (isRtx(seq)) { + // this packet is already in the list of rtx + return false; + } + + auto it_fec = recover_with_fec_.find(seq); + if (it_fec != recover_with_fec_.end()) { + // this packet is already in list of packets to recover with fec + // this list contians also fec packets that will not be recovered with rtx + return false; + } + + auto it_nack = nacked_seq_.find(seq); + if (it_nack != nacked_seq_.end()) { + // this packet was nacked so we do not use it to determine the loss rate + return false; + } + + return true; +} + +void RecoveryStrategy::notifyNewLossDetedcted(uint32_t seq) { + // new loss detected + // first record the loss. second do what is needed to recover it + state_->onLossDetected(seq); + newPacketLoss(seq); +} + +void RecoveryStrategy::requestPossibleLostPacket(uint32_t seq) { + // these are packets for which we send a RTX but we do not increase the loss + // counter beacuse we don't know if they are lost or not + addNewRtx(seq, false); +} + +void RecoveryStrategy::receivedFutureNack(uint32_t seq) { + nacked_seq_.insert(seq); +} + +void RecoveryStrategy::clear() { + rtx_state_.clear(); + rtx_timers_.clear(); + recover_with_fec_.clear(); + + if (next_rtx_timer_ != MAX_TIMER_RTX) { + next_rtx_timer_ = MAX_TIMER_RTX; + timer_->cancel(); + } +} + +// rtx functions +void RecoveryStrategy::addNewRtx(uint32_t seq, bool force) { + if (!indexer_->isFec(seq) || force) { + // this packet needs to be re-transmitted + rtxState state; + state.first_send_ = state_->getInterestSentTime(seq); + if (state.first_send_ == 0) // this interest was never sent before + state.first_send_ = getNow(); + state.last_send_ = state.first_send_; // we didn't send an RTX for this + // packet yet + state.rtx_count_ = 0; + state.next_send_ = computeNextSend(seq, state.rtx_count_); + DLOG_IF(INFO, VLOG_IS_ON(4)) + << "Add " << seq << " to retransmissions. next rtx is in " + << state.next_send_ - getNow() << " ms"; + rtx_state_.insert(std::pair<uint32_t, rtxState>(seq, state)); + rtx_timers_.insert(std::pair<uint64_t, uint32_t>(state.next_send_, seq)); + + // if a new rtx is introduced, check the rtx timer + scheduleNextRtx(); + } else { + // do not re-send fec packets but keep track of them + recover_with_fec_.insert(seq); + state_->onPossibleLossWithNoRtx(seq); + } +} + +uint64_t RecoveryStrategy::computeNextSend(uint32_t seq, uint32_t rtx_counter) { + uint64_t now = getNow(); + if (rtx_counter == 0) { + uint32_t wait = 1; + if (content_sharing_mode_) return now + wait; + + uint32_t jitter = SENTINEL_TIMER_INTERVAL; + double prod_rate = state_->getProducerRate(); + if (prod_rate != 0) jitter = ceil(state_->getJitter()); + + wait += jitter; + + DLOG_IF(INFO, VLOG_IS_ON(3)) << "first rtx for " << seq << " in " << wait + << " ms, jitter = " << jitter; + + return now + wait; + } else { + // wait one RTT. if an edge is known use the edge RTT for the first 5 rtx + double prod_rate = state_->getProducerRate(); + if (prod_rate == 0) { + return now + SENTINEL_TIMER_INTERVAL; + } + + uint64_t rtt = 0; + // if the transport detects an edge we try first to get the RTX from the + // edge. if no interest get a reply we move to the full RTT + if (rtx_counter < 5 && (state_->getEdgeRtt() != 0)) { + rtt = state_->getEdgeRtt(); + } else { + rtt = state_->getAvgRTT(); + } + + if (rtt == 0) rtt = SENTINEL_TIMER_INTERVAL; + + if (content_sharing_mode_) return now + rtt; + + uint32_t wait = (uint32_t)rtt; + + uint32_t jitter = ceil(state_->getJitter()); + wait += jitter; + + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "next rtx for " << seq << " in " << wait << " ms, rtt = " << rtt + << " jtter = " << jitter; + + return now + wait; + } +} + +void RecoveryStrategy::retransmit() { + if (rtx_timers_.size() == 0) return; + + uint64_t now = getNow(); + + auto it = rtx_timers_.begin(); + std::unordered_set<uint32_t> lost_pkt; + uint32_t sent_counter = 0; + while (it != rtx_timers_.end() && it->first <= now && + sent_counter < MAX_RTX_IN_BATCH) { + uint32_t seq = it->second; + auto rtx_it = + rtx_state_.find(seq); // this should always return a valid iter + if (rtx_it->second.rtx_count_ >= RTC_MAX_RTX || + (now - rtx_it->second.first_send_) >= RTC_MAX_AGE || + seq < state_->getLastSeqNacked()) { + // max rtx reached or packet too old or packet nacked, this packet is lost + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "packet " << seq << " lost because 1) max rtx: " + << (rtx_it->second.rtx_count_ >= RTC_MAX_RTX) << " 2) max age: " + << ((now - rtx_it->second.first_send_) >= RTC_MAX_AGE) + << " 3) nacked: " << (seq < state_->getLastSeqNacked()); + lost_pkt.insert(seq); + it++; + } else { + // resend the packet + state_->onRetransmission(seq); + double prod_rate = state_->getProducerRate(); + if (prod_rate != 0) rtx_it->second.rtx_count_++; + rtx_it->second.last_send_ = now; + rtx_it->second.next_send_ = + computeNextSend(seq, rtx_it->second.rtx_count_); + it = rtx_timers_.erase(it); + rtx_timers_.insert( + std::pair<uint64_t, uint32_t>(rtx_it->second.next_send_, seq)); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "send rtx for sequence " << seq << ", next send in " + << (rtx_it->second.next_send_ - now); + + // if fec is on increase the number of RTX send during fec + if (fec_on_) rtx_during_fec_++; + send_rtx_callback_(seq); + sent_counter++; + } + } + + // remove packets if needed + for (auto lost_it = lost_pkt.begin(); lost_it != lost_pkt.end(); lost_it++) { + uint32_t seq = *lost_it; + state_->onPacketLost(seq); + deleteRtx(seq); + } +} + +void RecoveryStrategy::scheduleNextRtx() { + if (rtx_timers_.size() == 0) { + // all the rtx were removed, reset timer + next_rtx_timer_ = MAX_TIMER_RTX; + return; + } + + // check if timer is alreay set + if (next_rtx_timer_ != MAX_TIMER_RTX) { + // a new check for rtx is already scheduled + if (next_rtx_timer_ > rtx_timers_.begin()->first) { + // we need to re-schedule it + timer_->cancel(); + } else { + // wait for the next timer + return; + } + } + + // set a new timer + next_rtx_timer_ = rtx_timers_.begin()->first; + uint64_t now = utils::SteadyTime::nowMs().count(); + uint64_t wait = 1; + if (next_rtx_timer_ != MAX_TIMER_RTX && next_rtx_timer_ > now) + wait = next_rtx_timer_ - now; + + std::weak_ptr<RecoveryStrategy> self(shared_from_this()); + timer_->expires_from_now(std::chrono::milliseconds(wait)); + timer_->async_wait([self](const std::error_code &ec) { + if (ec) return; + if (auto s = self.lock()) { + s->retransmit(); + s->next_rtx_timer_ = MAX_TIMER_RTX; + s->scheduleNextRtx(); + } + }); +} + +void RecoveryStrategy::deleteRtx(uint32_t seq) { + auto it_rtx = rtx_state_.find(seq); + if (it_rtx == rtx_state_.end()) return; // rtx not found + + // remove the rtx from the timers list + uint64_t ts = it_rtx->second.next_send_; + auto it_timers = rtx_timers_.find(ts); + while (it_timers != rtx_timers_.end() && it_timers->first == ts) { + if (it_timers->second == seq) { + rtx_timers_.erase(it_timers); + break; + } + it_timers++; + } + + // remove rtx + rtx_state_.erase(it_rtx); +} + +// fec functions +uint32_t RecoveryStrategy::computeFecPacketsToAsk() { + double loss_rate = state_->getMaxLossRate() * 100; // use loss rate in % + + if (loss_rate > 95) loss_rate = 95; // max loss rate + + if (loss_rate == 0) return 0; + + // keep track of the last used fec. if we use a new bin on this round reset + // consecutive use and avg loss in the prev bin + uint32_t bin = ceil(loss_rate / 5.0) - 1; + if (bin > fec_per_loss_rate_.size() - 1) + bin = (uint32_t)fec_per_loss_rate_.size() - 1; + + return fec_per_loss_rate_[bin]; +} + +void RecoveryStrategy::setRtxFec(std::optional<bool> rtx_on, + std::optional<bool> fec_on) { + if (rtx_on) rtx_on_ = *rtx_on; + if (fec_on) { + if (fec_on_ == false && (*fec_on) == true) { // turn on fec + // reset the number of RTX sent during fec + rtx_during_fec_ = 0; + } + fec_on_ = *fec_on; + } + + notification::RecoveryStrategy strategy = + notification::RecoveryStrategy::RECOVERY_OFF; + + if (rtx_on_ && fec_on_) + strategy = notification::RecoveryStrategy::RTX_AND_FEC; + else if (rtx_on_) + strategy = notification::RecoveryStrategy::RTX_ONLY; + else if (fec_on_) + strategy = notification::RecoveryStrategy::FEC_ONLY; + + callback_(strategy); +} + +// common functions +void RecoveryStrategy::onLostTimeout(uint32_t seq) { removePacketState(seq); } + +void RecoveryStrategy::removePacketState(uint32_t seq) { + auto it_fec = recover_with_fec_.find(seq); + if (it_fec != recover_with_fec_.end()) { + recover_with_fec_.erase(it_fec); + return; + } + + auto it_nack = nacked_seq_.find(seq); + if (it_nack != nacked_seq_.end()) { + nacked_seq_.erase(it_nack); + return; + } + + deleteRtx(seq); +} + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_recovery_strategy.h b/libtransport/src/protocols/rtc/rtc_recovery_strategy.h new file mode 100644 index 000000000..405e1ebba --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_recovery_strategy.h @@ -0,0 +1,181 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include <hicn/transport/interfaces/callbacks.h> +#include <hicn/transport/utils/chrono_typedefs.h> +#include <protocols/indexer.h> +#include <protocols/rtc/rtc_rc.h> +#include <protocols/rtc/rtc_state.h> + +#include <map> +#include <optional> +#include <unordered_map> + +namespace transport { + +namespace protocol { + +namespace rtc { + +class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> { + protected: + struct rtx_state_ { + uint64_t first_send_; // first time this interest was sent + uint64_t last_send_; // last time this rtx was sent + uint64_t next_send_; // next retransmission time + uint32_t rtx_count_; // number or rtx + }; + + using rtxState = struct rtx_state_; + + public: + using SendRtxCallback = std::function<void(uint32_t)>; + + RecoveryStrategy(Indexer *indexer, SendRtxCallback &&callback, + asio::io_service &io_service, bool use_rtx, bool use_fec, + interface::RtcTransportRecoveryStrategies rs_type, + interface::StrategyCallback &&external_callback); + + RecoveryStrategy(RecoveryStrategy &&rs); + + virtual ~RecoveryStrategy(); + + void setRtxFec(std::optional<bool> rtx_on = {}, + std::optional<bool> fec_on = {}); + void setState(RTCState *state) { state_ = state; } + void setRateControl(RTCRateControl *rateControl) { rc_ = rateControl; } + void setFecParams(uint32_t n, uint32_t k); + void setContentSharingMode() { content_sharing_mode_ = true; } + + bool isRtx(uint32_t seq) { + if (rtx_state_.find(seq) != rtx_state_.end()) return true; + return false; + } + + bool isPossibleLossWithNoRtx(uint32_t seq) { + if (recover_with_fec_.find(seq) != recover_with_fec_.end()) return true; + return false; + } + + bool wasNacked(uint32_t seq) { + if (nacked_seq_.find(seq) != nacked_seq_.end()) return true; + return false; + } + + interface::RtcTransportRecoveryStrategies getType() { + return rs_type_; + } + void updateType(interface::RtcTransportRecoveryStrategies type) { + rs_type_ = type; + } + bool isRtxOn() { return rtx_on_; } + bool isFecOn() { return fec_on_; } + + RTCState *getState() { return state_; } + + // if the function returns 0 it means that the packet is not an RTX or it is + // not a valid packet to safely compute the RTT + uint64_t getRtxRtt(uint32_t seq); + bool lossDetected(uint32_t seq); + void notifyNewLossDetedcted(uint32_t seq); + void requestPossibleLostPacket(uint32_t seq); + void receivedFutureNack(uint32_t seq); + void clear(); + + virtual void turnOnRecovery() = 0; + virtual void onNewRound(bool in_sync) = 0; + virtual void newPacketLoss(uint32_t seq) = 0; + virtual void receivedPacket(uint32_t seq) = 0; + void onLostTimeout(uint32_t seq); + + void incRoundId() { round_id_++; } + + // utils + uint64_t getNow() { + uint64_t now = utils::SteadyTime::nowMs().count(); + return now; + } + + protected: + // rtx functions + void addNewRtx(uint32_t seq, bool force); + uint64_t computeNextSend(uint32_t seq, uint32_t rtx_counter); + void retransmit(); + void scheduleNextRtx(); + void deleteRtx(uint32_t seq); + + // fec functions + uint32_t computeFecPacketsToAsk(); + + // common functons + void removePacketState(uint32_t seq); + + interface::RtcTransportRecoveryStrategies rs_type_; + bool recovery_on_; + bool rtx_on_; + bool fec_on_; + bool content_sharing_mode_; + + // number of RTX sent after fec turned on + // this is used to take into account jitter and out of order packets + // if we detect losses but we do not sent any RTX it means that the holes in + // the sequence are caused by the jitter + uint32_t rtx_during_fec_; + + // this map keeps track of the retransmitted interest, ordered from the oldest + // to the newest one. the state contains the timer of the first send of the + // interest (from pendingIntetests_), the timer of the next send (key of the + // multimap) and the number of rtx + std::map<uint32_t, rtxState> rtx_state_; + // this map stored the rtx by timer. The key is the time at which the rtx + // should be sent, and the val is the interest seq number + std::multimap<uint64_t, uint32_t> rtx_timers_; + + // lost packets that will be recovered with fec + std::unordered_set<uint32_t> recover_with_fec_; + + // packet for which we recived a future nack + // in case we detect a loss for a nacked packet we send an RTX but we do not + // increase the loss counter. this is done because it may happen that the + // producer rate checkes over time and in flight interest may be satified by + // data packet after the reception of nacks + std::unordered_set<uint32_t> nacked_seq_; + + // rtx vars + std::unique_ptr<asio::steady_timer> timer_; + uint64_t next_rtx_timer_; + SendRtxCallback send_rtx_callback_; + + // fec vars + uint32_t n_; + uint32_t k_; + Indexer *indexer_; + + RTCState *state_; + RTCRateControl *rc_; + + private: + uint32_t round_id_; // number of rounds + uint32_t last_fec_used_; + std::vector<uint32_t> fec_per_loss_rate_; + interface::StrategyCallback callback_; +}; + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_rs_delay.cc b/libtransport/src/protocols/rtc/rtc_rs_delay.cc new file mode 100644 index 000000000..7d7a01133 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_rs_delay.cc @@ -0,0 +1,147 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <glog/logging.h> +#include <protocols/rtc/rtc_consts.h> +#include <protocols/rtc/rtc_rs_delay.h> + +namespace transport { + +namespace protocol { + +namespace rtc { + +RecoveryStrategyDelayBased::RecoveryStrategyDelayBased( + Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service, + interface::RtcTransportRecoveryStrategies rs_type, + interface::StrategyCallback &&external_callback) + : RecoveryStrategy(indexer, std::move(callback), io_service, true, false, + rs_type, + std::move(external_callback)), // start with rtx + congestion_state_(false), + probing_state_(false), + switch_rounds_(0) {} + +RecoveryStrategyDelayBased::RecoveryStrategyDelayBased(RecoveryStrategy &&rs) + : RecoveryStrategy(std::move(rs)) { + setRtxFec(true, false); + // we have to re-init congestion and + // probing + switch_rounds_ = 0; + congestion_state_ = false; + probing_state_ = false; +} + +RecoveryStrategyDelayBased::~RecoveryStrategyDelayBased() {} + +void RecoveryStrategyDelayBased::turnOnRecovery() { + recovery_on_ = true; + uint64_t rtt = state_->getMinRTT(); + uint32_t fec_to_ask = computeFecPacketsToAsk(); + if (rtt > MAX_RTT_BEFORE_FEC && fec_to_ask > 0) { + // we need to start FEC (see fec only strategy for more details) + setRtxFec(true, true); + rtx_during_fec_ = 1; // avoid to stop fec + indexer_->setNFec(fec_to_ask); + } else { + // use RTX + setRtxFec(true, false); + switch_rounds_ = 0; + } +} + +void RecoveryStrategyDelayBased::softSwitchToFec(uint32_t fec_to_ask) { + if (fec_to_ask == 0) { + setRtxFec(true, false); + switch_rounds_ = 0; + } else { + switch_rounds_++; + if (switch_rounds_ >= ((RTC_INTEREST_LIFETIME / ROUND_LEN) * 2) && + rtx_during_fec_ != 0) { // go to fec only if it is needed (RTX are on) + setRtxFec(false, true); + } else { + setRtxFec(true, true); + } + } +} + +void RecoveryStrategyDelayBased::onNewRound(bool in_sync) { + if (!recovery_on_) { + // disable fec so that no extra packet will be sent + // for rtx we check if recovery is on in newPacketLoss + setRtxFec(true, false); + indexer_->setNFec(0); + return; + } + + uint64_t rtt = state_->getAvgRTT(); + + // XXX at the moment we are not looking at congestion events + // bool congestion = rc_->inCongestionState(); + + if ((!fec_on_ && rtt >= MAX_RTT_BEFORE_FEC) || + (fec_on_ && rtt > (MAX_RTT_BEFORE_FEC - 10))) { + // switch from rtx to fec or keep use fec. Notice that if some rtx are + // waiting to be scheduled, they will be sent normally, but no new rtx will + // be created if the loss rate is 0 keep to use RTX. + uint32_t fec_to_ask = computeFecPacketsToAsk(); + softSwitchToFec(fec_to_ask); + if (rtx_during_fec_ == 0) // if we do not send any RTX the losses + // registered may be due to jitter + indexer_->setNFec(0); + else + indexer_->setNFec(fec_to_ask); + return; + } + + if ((fec_on_ && rtt <= (MAX_RTT_BEFORE_FEC - 10)) || + (!rtx_on_ && rtt <= MAX_RTT_BEFORE_FEC)) { + // turn on rtx + softSwitchToFec(0); + indexer_->setNFec(0); + return; + } +} + +void RecoveryStrategyDelayBased::newPacketLoss(uint32_t seq) { + if (rtx_on_ && recovery_on_) { + addNewRtx(seq, false); + } else { + if (!state_->isPending(seq) && !indexer_->isFec(seq)) { + addNewRtx(seq, true); + } else { + recover_with_fec_.insert(seq); + state_->onPossibleLossWithNoRtx(seq); + } + } +} + +void RecoveryStrategyDelayBased::receivedPacket(uint32_t seq) { + removePacketState(seq); +} + +void RecoveryStrategyDelayBased::probing() { + // TODO + // for the moment ask for all fec and exit the probing phase + probing_state_ = false; + setRtxFec(false, true); + indexer_->setNFec(computeFecPacketsToAsk()); +} + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_rs_delay.h b/libtransport/src/protocols/rtc/rtc_rs_delay.h new file mode 100644 index 000000000..9e1c41388 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_rs_delay.h @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include <protocols/rtc/rtc_recovery_strategy.h> + +namespace transport { + +namespace protocol { + +namespace rtc { + +class RecoveryStrategyDelayBased : public RecoveryStrategy { + public: + RecoveryStrategyDelayBased(Indexer *indexer, SendRtxCallback &&callback, + asio::io_service &io_service, + interface::RtcTransportRecoveryStrategies rs_type, + interface::StrategyCallback &&external_callback); + + RecoveryStrategyDelayBased(RecoveryStrategy &&rs); + + ~RecoveryStrategyDelayBased(); + + void turnOnRecovery(); + void onNewRound(bool in_sync); + void newPacketLoss(uint32_t seq); + void receivedPacket(uint32_t seq); + + private: + void softSwitchToFec(uint32_t fec_to_ask); + + bool congestion_state_; + bool probing_state_; + uint32_t switch_rounds_; + + void probing(); +}; + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_rs_fec_only.cc b/libtransport/src/protocols/rtc/rtc_rs_fec_only.cc new file mode 100644 index 000000000..5b10823ec --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_rs_fec_only.cc @@ -0,0 +1,143 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <glog/logging.h> +#include <protocols/rtc/rtc_consts.h> +#include <protocols/rtc/rtc_rs_fec_only.h> + +namespace transport { + +namespace protocol { + +namespace rtc { + +RecoveryStrategyFecOnly::RecoveryStrategyFecOnly( + Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service, + interface::RtcTransportRecoveryStrategies rs_type, + interface::StrategyCallback &&external_callback) + : RecoveryStrategy(indexer, std::move(callback), io_service, true, false, + rs_type, std::move(external_callback)), + congestion_state_(false), + probing_state_(false), + switch_rounds_(0) {} + +RecoveryStrategyFecOnly::RecoveryStrategyFecOnly(RecoveryStrategy &&rs) + : RecoveryStrategy(std::move(rs)) { + setRtxFec(true, false); + switch_rounds_ = 0; + congestion_state_ = false; + probing_state_ = false; +} + +RecoveryStrategyFecOnly::~RecoveryStrategyFecOnly() {} + +void RecoveryStrategyFecOnly::turnOnRecovery() { + recovery_on_ = true; + // init strategy + uint32_t fec_to_ask = computeFecPacketsToAsk(); + if (fec_to_ask > 0) { + // the probing phase detected a lossy link. we immedialty start the fec and + // we disable the check to prevent to send fec packets before RTX. in fact + // here we know that we have losses and it is not a problem of OOO packets + setRtxFec(true, true); + rtx_during_fec_ = 1; // avoid to stop fec + indexer_->setNFec(fec_to_ask); + } else { + // keep only RTX on + setRtxFec(true, true); + } +} + +void RecoveryStrategyFecOnly::onNewRound(bool in_sync) { + if (!recovery_on_) { + indexer_->setNFec(0); + return; + } + + // XXX for the moment we are considering congestion events + // if(rc_->inCongestionState()){ + // congestion_state_ = true; + // probing_state_ = false; + // indexer_->setNFec(0); + // return; + // } + + // no congestion + if (congestion_state_) { + // this is the first round after congestion + // enter probing phase + probing_state_ = true; + congestion_state_ = false; + } + + if (probing_state_) { + probing(); + } else { + uint32_t fec_to_ask = computeFecPacketsToAsk(); + // If fec_to_ask == 0 we use rtx even if in these strategy we use only fec. + // In this way the first packet loss that triggers the usage of fec can be + // recovered using rtx, otherwise it will always be lost + if (fec_to_ask == 0) { + setRtxFec(true, false); + switch_rounds_ = 0; + } else { + switch_rounds_++; + if (switch_rounds_ >= ((RTC_INTEREST_LIFETIME / ROUND_LEN) * 2) && + rtx_during_fec_ != + 0) { // go to fec only if it is needed (RTX are on) + setRtxFec(false, true); + } else { + setRtxFec(true, true); // keep both + } + } + if (rtx_during_fec_ == 0) // if we do not send any RTX the losses + // registered may be due to jitter + indexer_->setNFec(0); + else + indexer_->setNFec(fec_to_ask); + } +} + +void RecoveryStrategyFecOnly::newPacketLoss(uint32_t seq) { + if (rtx_on_ && recovery_on_) { + addNewRtx(seq, false); + } else { + if (!state_->isPending(seq) && !indexer_->isFec(seq)) { + addNewRtx(seq, true); + } else { + // if not pending add to list to recover with fec + recover_with_fec_.insert(seq); + state_->onPossibleLossWithNoRtx(seq); + } + } +} + +void RecoveryStrategyFecOnly::receivedPacket(uint32_t seq) { + removePacketState(seq); +} + +void RecoveryStrategyFecOnly::probing() { + // TODO + // for the moment ask for all fec and exit the probing phase + probing_state_ = false; + uint32_t fec_to_ask = computeFecPacketsToAsk(); + indexer_->setNFec(fec_to_ask); +} + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_rs_fec_only.h b/libtransport/src/protocols/rtc/rtc_rs_fec_only.h new file mode 100644 index 000000000..42df25bd9 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_rs_fec_only.h @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include <protocols/rtc/rtc_recovery_strategy.h> + +namespace transport { + +namespace protocol { + +namespace rtc { + +class RecoveryStrategyFecOnly : public RecoveryStrategy { + public: + RecoveryStrategyFecOnly(Indexer *indexer, SendRtxCallback &&callback, + asio::io_service &io_service, + interface::RtcTransportRecoveryStrategies rs_type, + interface::StrategyCallback &&external_callback); + + RecoveryStrategyFecOnly(RecoveryStrategy &&rs); + + ~RecoveryStrategyFecOnly(); + + void turnOnRecovery(); + void onNewRound(bool in_sync); + void newPacketLoss(uint32_t seq); + void receivedPacket(uint32_t seq); + + private: + bool congestion_state_; + bool probing_state_; + uint32_t switch_rounds_; + + void probing(); +}; + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_rs_low_rate.cc b/libtransport/src/protocols/rtc/rtc_rs_low_rate.cc new file mode 100644 index 000000000..dbad563cd --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_rs_low_rate.cc @@ -0,0 +1,174 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <glog/logging.h> +#include <protocols/rtc/rtc_consts.h> +#include <protocols/rtc/rtc_rs_low_rate.h> + +namespace transport { + +namespace protocol { + +namespace rtc { + +RecoveryStrategyLowRate::RecoveryStrategyLowRate( + Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service, + interface::RtcTransportRecoveryStrategies rs_type, + interface::StrategyCallback &&external_callback) + : RecoveryStrategy(indexer, std::move(callback), io_service, false, true, + rs_type, + std::move(external_callback)), // start with fec + fec_consecutive_rounds_((MILLI_IN_A_SEC / ROUND_LEN) * 5), // 5 sec + rtx_allowed_consecutive_rounds_(0) { + initSwitchVector(); +} + +RecoveryStrategyLowRate::RecoveryStrategyLowRate(RecoveryStrategy &&rs) + : RecoveryStrategy(std::move(rs)), + fec_consecutive_rounds_((MILLI_IN_A_SEC / ROUND_LEN) * 5), // 5 sec + rtx_allowed_consecutive_rounds_(0) { + setRtxFec(false, true); + initSwitchVector(); +} + +RecoveryStrategyLowRate::~RecoveryStrategyLowRate() {} + +void RecoveryStrategyLowRate::initSwitchVector() { + // TODO adjust thresholds here when new data are collected + // see resutls in + // https://confluence-eng-gpk1.cisco.com/conf/display/SPT/dailyreports + thresholds_t t1; + t1.rtt = 15; // 15ms + t1.loss_rtx_to_fec = 15; // 15% + t1.loss_fec_to_rtx = 10; // 10% + thresholds_t t2; + t2.rtt = 35; // 35ms + t2.loss_rtx_to_fec = 5; // 5% + t2.loss_fec_to_rtx = 1; // 1% + switch_vector.push_back(t1); + switch_vector.push_back(t2); +} + +void RecoveryStrategyLowRate::setRecoveryParameters(bool use_rtx, bool use_fec, + uint32_t fec_to_ask) { + setRtxFec(use_rtx, use_fec); + indexer_->setNFec(fec_to_ask); +} + +void RecoveryStrategyLowRate::selectRecoveryStrategy(bool in_sync) { + uint32_t fec_to_ask = computeFecPacketsToAsk(); + if (fec_to_ask == 0) { + // fec is off, turn on RTX immediatly to avoid packet losses + setRecoveryParameters(true, false, 0); + fec_consecutive_rounds_ = 0; + return; + } + + uint32_t loss_rate = std::round(state_->getPerSecondLossRate() * 100); + uint32_t rtt = (uint32_t)state_->getAvgRTT(); + + bool use_rtx = false; + for (size_t i = 0; i < switch_vector.size(); i++) { + uint32_t max_loss_rate = 0; + if (fec_on_) + max_loss_rate = switch_vector[i].loss_fec_to_rtx; + else + max_loss_rate = switch_vector[i].loss_rtx_to_fec; + + if (rtt < switch_vector[i].rtt && loss_rate < max_loss_rate) { + use_rtx = true; + rtx_allowed_consecutive_rounds_++; + break; + } + } + + if (!use_rtx) rtx_allowed_consecutive_rounds_ = 0; + + if (use_rtx) { + if (fec_on_) { + // here we should swtich from RTX to FEC + // wait 10sec where the switch is allowed before actually switch + if (rtx_allowed_consecutive_rounds_ >= + ((MILLI_IN_A_SEC / ROUND_LEN) * 10)) { // 10 sec + // use RTX + setRecoveryParameters(true, false, 0); + fec_consecutive_rounds_ = 0; + } else { + // keep using FEC (and maybe RTX) + setRecoveryParameters(true, true, fec_to_ask); + fec_consecutive_rounds_++; + } + } else { + // keep using RTX + setRecoveryParameters(true, false, 0); + fec_consecutive_rounds_ = 0; + } + } else { + // use FEC and RTX + setRecoveryParameters(true, true, fec_to_ask); + fec_consecutive_rounds_++; + } + + // everytime that we anable FEC we keep also RTX on. in this way the first + // losses that are not covered by FEC are recovered using RTX. after 5 sec we + // disable fec + if (fec_consecutive_rounds_ >= ((MILLI_IN_A_SEC / ROUND_LEN) * 5)) { + // turn off RTX + setRtxFec(false); + } +} + +void RecoveryStrategyLowRate::turnOnRecovery() { + recovery_on_ = 1; + // the stategy will be init in the new round function +} + +void RecoveryStrategyLowRate::onNewRound(bool in_sync) { + if (!recovery_on_) { + // disable fec so that no extra packet will be sent + // for rtx we check if recovery is on in newPacketLoss + setRtxFec(true, false); + indexer_->setNFec(0); + return; + } + + // XXX since this strategy will be used only for flow at low rate we do not + // consider congestion events like in other strategies + + selectRecoveryStrategy(in_sync); +} + +void RecoveryStrategyLowRate::newPacketLoss(uint32_t seq) { + if (rtx_on_ && recovery_on_) { + addNewRtx(seq, false); + } else { + if (!state_->isPending(seq) && !indexer_->isFec(seq)) { + addNewRtx(seq, true); + } else { + recover_with_fec_.insert(seq); + state_->onPossibleLossWithNoRtx(seq); + } + } +} + +void RecoveryStrategyLowRate::receivedPacket(uint32_t seq) { + removePacketState(seq); +} + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_rs_low_rate.h b/libtransport/src/protocols/rtc/rtc_rs_low_rate.h new file mode 100644 index 000000000..0e76efaca --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_rs_low_rate.h @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include <protocols/rtc/rtc_recovery_strategy.h> + +#include <vector> + +namespace transport { + +namespace protocol { + +namespace rtc { + +struct thresholds_t { + uint32_t rtt; + uint32_t loss_rtx_to_fec; // loss rate used to move from rtx to fec + uint32_t loss_fec_to_rtx; // loss rate used to move from fec to rtx +}; + +class RecoveryStrategyLowRate : public RecoveryStrategy { + public: + RecoveryStrategyLowRate(Indexer *indexer, SendRtxCallback &&callback, + asio::io_service &io_service, + interface::RtcTransportRecoveryStrategies rs_type, + interface::StrategyCallback &&external_callback); + + RecoveryStrategyLowRate(RecoveryStrategy &&rs); + + ~RecoveryStrategyLowRate(); + + void turnOnRecovery(); + void onNewRound(bool in_sync); + void newPacketLoss(uint32_t seq); + void receivedPacket(uint32_t seq); + + private: + void initSwitchVector(); + void setRecoveryParameters(bool use_rtx, bool use_fec, uint32_t fec_to_ask); + void selectRecoveryStrategy(bool in_sync); + + uint32_t fec_consecutive_rounds_; + uint32_t rtx_allowed_consecutive_rounds_; + + // this table contains the thresholds that indicates when to switch from RTX + // to FEC and viceversa. values in the vector are detected with a set of + // experiments. the vector is used in the following way: if rtt and loss rate + // are less than one of the values in the in the vector, losses are + // recovered using RTX. otherwive losses are recovered using FEC. as for FEC + // only and delay based strategy, the swith from RTX to FEC is smooth, + // meaning that FEC and RTX are used together for some rounds + std::vector<thresholds_t> switch_vector; +}; + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_rs_recovery_off.cc b/libtransport/src/protocols/rtc/rtc_rs_recovery_off.cc new file mode 100644 index 000000000..00c6a0504 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_rs_recovery_off.cc @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <glog/logging.h> +#include <protocols/rtc/rtc_consts.h> +#include <protocols/rtc/rtc_rs_recovery_off.h> + +namespace transport { + +namespace protocol { + +namespace rtc { + +RecoveryStrategyRecoveryOff::RecoveryStrategyRecoveryOff( + Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service, + interface::RtcTransportRecoveryStrategies rs_type, + interface::StrategyCallback &&external_callback) + : RecoveryStrategy(indexer, std::move(callback), io_service, false, false, + rs_type, std::move(external_callback)) {} + +RecoveryStrategyRecoveryOff::RecoveryStrategyRecoveryOff(RecoveryStrategy &&rs) + : RecoveryStrategy(std::move(rs)) { + setRtxFec(false, false); +} + +RecoveryStrategyRecoveryOff::~RecoveryStrategyRecoveryOff() {} + +void RecoveryStrategyRecoveryOff::turnOnRecovery() { + // nothing to do + return; +} +void RecoveryStrategyRecoveryOff::onNewRound(bool in_sync) { + // nothing to do + return; +} + +void RecoveryStrategyRecoveryOff::newPacketLoss(uint32_t seq) { + // here we only keep track of the lost packets to avoid to + // count them multple times in the counters. for this we + // use the recover_with_fec_ set + recover_with_fec_.insert(seq); + state_->onPossibleLossWithNoRtx(seq); +} + +void RecoveryStrategyRecoveryOff::receivedPacket(uint32_t seq) { + removePacketState(seq); +} + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_rs_recovery_off.h b/libtransport/src/protocols/rtc/rtc_rs_recovery_off.h new file mode 100644 index 000000000..3d59cc473 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_rs_recovery_off.h @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include <protocols/rtc/rtc_recovery_strategy.h> + +namespace transport { + +namespace protocol { + +namespace rtc { + +class RecoveryStrategyRecoveryOff : public RecoveryStrategy { + public: + RecoveryStrategyRecoveryOff(Indexer *indexer, SendRtxCallback &&callback, + asio::io_service &io_service, + interface::RtcTransportRecoveryStrategies rs_type, + interface::StrategyCallback &&external_callback); + + RecoveryStrategyRecoveryOff(RecoveryStrategy &&rs); + + ~RecoveryStrategyRecoveryOff(); + + void turnOnRecovery(); + void onNewRound(bool in_sync); + void newPacketLoss(uint32_t seq); + void receivedPacket(uint32_t seq); +}; + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_rs_rtx_only.cc b/libtransport/src/protocols/rtc/rtc_rs_rtx_only.cc new file mode 100644 index 000000000..4d7cf7a82 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_rs_rtx_only.cc @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <glog/logging.h> +#include <protocols/rtc/rtc_consts.h> +#include <protocols/rtc/rtc_rs_rtx_only.h> + +namespace transport { + +namespace protocol { + +namespace rtc { + +RecoveryStrategyRtxOnly::RecoveryStrategyRtxOnly( + Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service, + interface::RtcTransportRecoveryStrategies rs_type, + interface::StrategyCallback &&external_callback) + : RecoveryStrategy(indexer, std::move(callback), io_service, true, false, + rs_type, std::move(external_callback)) {} + +RecoveryStrategyRtxOnly::RecoveryStrategyRtxOnly(RecoveryStrategy &&rs) + : RecoveryStrategy(std::move(rs)) { + setRtxFec(true, false); +} + +RecoveryStrategyRtxOnly::~RecoveryStrategyRtxOnly() {} + +void RecoveryStrategyRtxOnly::turnOnRecovery() { + recovery_on_ = true; + setRtxFec(true, false); +} + +void RecoveryStrategyRtxOnly::onNewRound(bool in_sync) { + // nothing to do + return; +} + +void RecoveryStrategyRtxOnly::newPacketLoss(uint32_t seq) { + if (!recovery_on_) { + recover_with_fec_.insert(seq); + state_->onPossibleLossWithNoRtx(seq); + return; + } + addNewRtx(seq, false); +} + +void RecoveryStrategyRtxOnly::receivedPacket(uint32_t seq) { + removePacketState(seq); +} + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_rs_rtx_only.h b/libtransport/src/protocols/rtc/rtc_rs_rtx_only.h new file mode 100644 index 000000000..03dbed1c7 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_rs_rtx_only.h @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include <protocols/rtc/rtc_recovery_strategy.h> + +namespace transport { + +namespace protocol { + +namespace rtc { + +class RecoveryStrategyRtxOnly : public RecoveryStrategy { + public: + RecoveryStrategyRtxOnly(Indexer *indexer, SendRtxCallback &&callback, + asio::io_service &io_service, + interface::RtcTransportRecoveryStrategies rs_type, + interface::StrategyCallback &&external_callback); + + RecoveryStrategyRtxOnly(RecoveryStrategy &&rs); + + ~RecoveryStrategyRtxOnly(); + + void turnOnRecovery(); + void onNewRound(bool in_sync); + void newPacketLoss(uint32_t seq); + void receivedPacket(uint32_t seq); +}; + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_state.cc b/libtransport/src/protocols/rtc/rtc_state.cc new file mode 100644 index 000000000..82ac0b9c1 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_state.cc @@ -0,0 +1,900 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <glog/logging.h> +#include <protocols/rtc/rtc_consts.h> +#include <protocols/rtc/rtc_state.h> + +namespace transport { + +namespace protocol { + +namespace rtc { + +RTCState::RTCState(Indexer *indexer, + ProbeHandler::SendProbeCallback &&probe_callback, + DiscoveredRttCallback &&discovered_rtt_callback, + asio::io_service &io_service) + : loss_history_(10), // log 10sec history + indexer_(indexer), + probe_handler_(std::make_shared<ProbeHandler>(std::move(probe_callback), + io_service)), + discovered_rtt_callback_(std::move(discovered_rtt_callback)) { + init_rtt_timer_ = std::make_unique<asio::steady_timer>(io_service); +} + +RTCState::~RTCState() {} + +void RTCState::initParams() { + // packets counters (total) + sent_interests_ = 0; + sent_rtx_ = 0; + received_data_ = 0; + received_nacks_ = 0; + received_timeouts_ = 0; + received_probes_ = 0; + + // loss counters + packets_lost_ = 0; + definitely_lost_pkt_ = 0; + losses_recovered_ = 0; + first_seq_in_round_ = 0; + highest_seq_received_ = 0; + highest_seq_received_in_order_ = 0; + last_seq_nacked_ = 0; + loss_rate_ = 0.0; + avg_loss_rate_ = -1.0; + last_round_loss_rate_ = 0.0; + + // loss rate per sec + lost_per_sec_ = 0; + total_expected_packets_ = 0; + per_sec_loss_rate_ = 0.0; + + // residual losses counters + expected_packets_ = 0; + packets_sent_to_app_ = 0; + rounds_from_last_compute_ = 0; + residual_loss_rate_ = 0.0; + + // fec counters + pending_fec_pkt_ = 0; + received_fec_pkt_ = 0; + + // bw counters + received_bytes_ = 0; + received_fec_bytes_ = 0; + recovered_bytes_with_fec_ = 0; + + avg_packet_size_ = INIT_PACKET_SIZE; + production_rate_ = 0.0; + received_rate_ = 0.0; + fec_recovered_rate_ = 0.0; + + // nack counter + past_nack_on_last_round_ = false; + received_nacks_last_round_ = 0; + + // packets counter + received_packets_last_round_ = 0; + received_data_last_round_ = 0; + received_data_from_cache_ = 0; + sent_interests_last_round_ = 0; + sent_rtx_last_round_ = 0; + + // round conunters + rounds_ = 0; + rounds_without_nacks_ = 0; + rounds_without_packets_ = 0; + + last_production_seq_ = 0; + producer_is_active_ = false; + last_prod_update_seq_ = 0; + + // paths stats + path_table_.clear(); + main_path_ = nullptr; + edge_path_ = nullptr; + + // packet cache (not pending anymore) + packet_cache_.clear(); + + // pending interests + pending_interests_.clear(); + + // used to keep track of the skipped interest + last_interest_sent_ = 0; + + // init rtt + first_interest_sent_time_ = ~0; + first_interest_sent_seq_ = 0; + + // start probing the producer + init_rtt_ = false; + probe_handler_->setSuffixRange(MIN_INIT_PROBE_SEQ, MAX_INIT_PROBE_SEQ); + probe_handler_->setProbes(INIT_RTT_PROBE_INTERVAL, INIT_RTT_PROBES); + probe_handler_->sendProbes(); + setInitRttTimer(INIT_RTT_PROBE_RESTART); +} + +// packet events +void RTCState::onSendNewInterest(const core::Name *interest_name) { + uint64_t now = utils::SteadyTime::nowMs().count(); + uint32_t seq = interest_name->getSuffix(); + pending_interests_.insert(std::pair<uint32_t, uint64_t>(seq, now)); + + if (sent_interests_ == 0) { + first_interest_sent_time_ = now; + first_interest_sent_seq_ = seq; + } + + if (indexer_->isFec(seq)) { + pending_fec_pkt_++; + } + + if (last_interest_sent_ == 0 && seq != 0) { + last_interest_sent_ = seq; // init last interest sent + } + + // TODO what happen in case of jumps? + eraseFromPacketCache( + seq); // if we send this interest we don't know its state + for (uint32_t i = last_interest_sent_ + 1; i < seq; i++) { + if (indexer_->isFec(i)) { + // only fec packets can be skipped + addToPacketCache(i, PacketState::SKIPPED); + } + } + + last_interest_sent_ = seq; + + sent_interests_++; + sent_interests_last_round_++; +} + +void RTCState::onTimeout(uint32_t seq, bool lost) { + auto it = pending_interests_.find(seq); + if (it != pending_interests_.end()) { + pending_interests_.erase(it); + if (indexer_->isFec(seq)) pending_fec_pkt_--; + } + received_timeouts_++; + + if (lost) onPacketLost(seq); +} + +void RTCState::onLossDetected(uint32_t seq) { + PacketState state = getPacketState(seq); + + // if the packet is already marked with a state, do nothing + // to be considered lost the packet must be pending + if (state == PacketState::UNKNOWN && + pending_interests_.find(seq) != pending_interests_.end()) { + packets_lost_++; + addToPacketCache(seq, PacketState::LOST); + } +} + +void RTCState::onRetransmission(uint32_t seq) { + // remove the interest for the pendingInterest map only after the first rtx. + // in this way we can handle the ooo packets that come in late as normla + // packet. we consider a packet lost only if we sent at least an RTX for it. + // XXX this may become problematic if we stop the RTX transmissions + auto it = pending_interests_.find(seq); + if (it != pending_interests_.end()) { + pending_interests_.erase(it); + if (indexer_->isFec(seq)) pending_fec_pkt_--; + } + sent_rtx_++; + sent_rtx_last_round_++; +} + +void RTCState::onPossibleLossWithNoRtx(uint32_t seq) { + // if fec is on or rtx is disable we don't need to do anything to recover a + // packet. however in both cases we need to remove possible missing packets + // from the window of pendinig interest in order to free space without wating + // for the timeout. + auto it = pending_interests_.find(seq); + if (it != pending_interests_.end()) { + pending_interests_.erase(it); + if (indexer_->isFec(seq)) pending_fec_pkt_--; + } +} + +void RTCState::onDataPacketReceived(const core::ContentObject &content_object, + bool compute_stats) { + uint32_t seq = content_object.getName().getSuffix(); + + if (compute_stats) { + updatePathStats(content_object, false); + received_data_last_round_++; + } + received_data_++; + packets_sent_to_app_++; + + core::ParamsRTC params = RTCState::getDataParams(content_object); + + if (last_prod_update_seq_ < seq) { + last_prod_update_seq_ = seq; + production_rate_ = (double)params.prod_rate; + } + + updatePacketSize(content_object); + updateReceivedBytes(content_object, false); + addRecvOrLost(seq, PacketState::RECEIVED); + + // the producer is responding + // it is generating valid data packets so we consider it active + producer_is_active_ = true; + + received_packets_last_round_++; +} + +void RTCState::onFecPacketReceived(const core::ContentObject &content_object) { + uint32_t seq = content_object.getName().getSuffix(); + updateReceivedBytes(content_object, true); + + PacketState state = getPacketState(seq); + if (state != PacketState::LOST) { + // increase only for not lost packets + received_fec_pkt_++; + } + addRecvOrLost(seq, PacketState::RECEIVED); + // the producer is responding + // it is generating valid data packets so we consider it active + producer_is_active_ = true; +} + +void RTCState::onNackPacketReceived(const core::ContentObject &nack, + bool compute_stats) { + uint32_t seq = nack.getName().getSuffix(); + struct nack_packet_t *nack_pkt = + (struct nack_packet_t *)nack.getPayload()->data(); + uint32_t production_seq = nack_pkt->getProductionSegment(); + uint32_t production_rate = nack_pkt->getProductionRate(); + + if (TRANSPORT_EXPECT_FALSE(main_path_ == nullptr) || + last_prod_update_seq_ < production_seq) { + // update production rate + last_production_seq_ = production_seq; + production_rate_ = (double)production_rate; + } + + if (compute_stats) { + // this is not an RTX + updatePathStats(nack, true); + } + + // for statistics pourpose we log all nacks, also the one received for + // retransmitted packets + received_nacks_++; + received_nacks_last_round_++; + + bool to_delete = false; + if (production_seq > seq) { + // old nack, seq is lost + // update last nacked + if (last_seq_nacked_ < seq) last_seq_nacked_ = seq; + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "lost packet " << seq << " beacuse of a past nack"; + if (compute_stats) past_nack_on_last_round_ = true; + onPacketLost(seq); + } else if (seq > production_seq) { + // future nack + // remove the nack from the pending interest map + // (the packet is not received/lost yet) + to_delete = true; + } else { + // this should be a quite rear event. simply remove the + // packet from the pending interest list + to_delete = true; + } + + if (to_delete) { + auto it = pending_interests_.find(seq); + if (it != pending_interests_.end()) { + pending_interests_.erase(it); + if (indexer_->isFec(seq)) pending_fec_pkt_--; + } + } + + received_packets_last_round_++; +} + +void RTCState::onPacketLost(uint32_t seq) { + if (!indexer_->isFec(seq)) { + PacketState state = getPacketState(seq); + if (state == PacketState::LOST || + (state == PacketState::UNKNOWN && + pending_interests_.find(seq) != pending_interests_.end())) { + definitely_lost_pkt_++; + DLOG_IF(INFO, VLOG_IS_ON(4)) << "packet " << seq << " is lost"; + } + } + + addRecvOrLost(seq, PacketState::DEFINITELY_LOST); +} + +void RTCState::onPacketRecoveredRtx(const core::ContentObject &content_object, + uint64_t rtt) { + uint32_t seq = content_object.getName().getSuffix(); + packets_sent_to_app_++; + + // increase the recovered packet counter only if the packet was marked as LOST + // before. + PacketState state = getPacketState(seq); + if (state == PacketState::LOST) losses_recovered_++; + + addRecvOrLost(seq, PacketState::RECEIVED); + updateReceivedBytes(content_object, false); + + if (rtt == 0) return; // nothing to do + + uint32_t path_label = content_object.getPathLabel(); + auto path_it = path_table_.find(path_label); + if (path_it == path_table_.end()) { + // this is a new path and it must be a cache + std::shared_ptr<RTCDataPath> newPath = + std::make_shared<RTCDataPath>(path_label); + auto ret = path_table_.insert( + std::pair<uint32_t, std::shared_ptr<RTCDataPath>>(path_label, newPath)); + path_it = ret.first; + } + + auto path = path_it->second; + if (path->pathToProducer()) + return; // this packet is coming from a producer + // even if we sent an RTX. this may happen + // for RTX that are sent too fast or in + // case of multipath + + path->insertRttSample(utils::SteadyTime::Milliseconds(rtt), true); +} + +void RTCState::onFecPacketRecoveredRtx( + const core::ContentObject &content_object) { + // This is the same as onPacketRecoveredRtx, but in this is case the + // pkt is also a FEC pkt, the addRecvOrLost will be called afterwards + losses_recovered_++; + updateReceivedBytes(content_object, true); +} + +void RTCState::onPacketRecoveredFec(uint32_t seq, uint32_t size) { + losses_recovered_++; + packets_sent_to_app_++; + recovered_bytes_with_fec_ += size; + + // adding header to the count + recovered_bytes_with_fec_ += 60; // XXX get header size some where + + // the packet could be not marked as lost yet. onLossDetected checks if add in + // the packet in the lost count or not + onLossDetected(seq); + + addRecvOrLost(seq, PacketState::RECEIVED); +} + +bool RTCState::onProbePacketReceived(const core::ContentObject &probe) { + uint32_t seq = probe.getName().getSuffix(); + core::ParamsRTC params = RTCState::getProbeParams(probe); + + bool is_valid = true; + uint32_t max = UINT32_MAX; + if (params.prod_rate == max) is_valid = false; + + uint64_t rtt; + rtt = probe_handler_->getRtt(seq, is_valid); + if (rtt == 0) return false; // this is not a valid probe + + if (!is_valid) return false; // not a valid probe + + // if we are here the producer is active + producer_is_active_ = true; + + // Like for data and nacks update the path stats. Here the RTT is computed + // by the probe handler. Both probes for rtt and bw are good to estimate + // info on the path. + uint32_t path_label = probe.getPathLabel(); + auto path_it = path_table_.find(path_label); + + if (path_it == path_table_.end()) { + // found a new path + std::shared_ptr<RTCDataPath> newPath = + std::make_shared<RTCDataPath>(path_label); + auto ret = path_table_.insert( + std::pair<uint32_t, std::shared_ptr<RTCDataPath>>(path_label, newPath)); + path_it = ret.first; + } + + auto path = path_it->second; + + path->insertRttSample(utils::SteadyTime::Milliseconds(rtt), true); + path->receivedNack(); + + uint64_t now = utils::SteadyTime::nowMs().count(); + + int64_t OWD = now - params.timestamp; + path->insertOwdSample(OWD); + + if (last_prod_update_seq_ < params.prod_seg) { + last_production_seq_ = params.prod_seg; + production_rate_ = (double)params.prod_rate; + } + + // check for init RTT. if received_probes_ is equal to 0 schedule a timer to + // wait for the INIT_RTT_PROBES. in this way if some probes get lost we don't + // wait forever + received_probes_++; + + if (!init_rtt_ && received_probes_ <= INIT_RTT_PROBES) { + if (received_probes_ == 1) { + // we got the first probe, wait at most INIT_RTT_PROBE_WAIT sec for the + // others. + main_path_ = path; + setInitRttTimer(INIT_RTT_PROBE_WAIT); + } + if (received_probes_ == INIT_RTT_PROBES) { + // we are done + init_rtt_timer_->cancel(); + checkInitRttTimer(); + } + } + + received_packets_last_round_++; + + // ignore probes sent before the first interest + if ((now - rtt) <= first_interest_sent_time_) return false; + return true; +} + +void RTCState::onJumpForward(uint32_t next_seq) { + for (uint32_t seq = highest_seq_received_in_order_ + 1; seq < next_seq; + seq++) { + PacketState packet_state = getPacketState(seq); + if (packet_state != PacketState::RECEIVED && + packet_state != PacketState::DEFINITELY_LOST) { + // here we considere the packet as definitely lost whitout increase the + // lost packet counter because this loss is not due to the network + // condition but the transport wants to skip the packet + onPacketLost(seq); + } + } +} + +void RTCState::onNewRound(double round_len, bool in_sync) { + if (path_table_.empty()) return; + + double bytes_per_sec = + ((double)received_bytes_ * (MILLI_IN_A_SEC / round_len)); + if (received_rate_ == 0) + received_rate_ = bytes_per_sec; + else + received_rate_ = (received_rate_ * MOVING_AVG_ALPHA) + + ((1 - MOVING_AVG_ALPHA) * bytes_per_sec); + double fec_bytes_per_sec = + ((double)received_fec_bytes_ * (MILLI_IN_A_SEC / round_len)); + + if (fec_received_rate_ == 0) + fec_received_rate_ = fec_bytes_per_sec; + else + fec_received_rate_ = (fec_received_rate_ * 0.8) + (0.2 * fec_bytes_per_sec); + + double fec_recovered_bytes_per_sec = + ((double)recovered_bytes_with_fec_ * (MILLI_IN_A_SEC / round_len)); + + if (fec_recovered_rate_ == 0) + fec_recovered_rate_ = fec_recovered_bytes_per_sec; + else + fec_recovered_rate_ = + (fec_recovered_rate_ * 0.8) + (0.2 * fec_recovered_bytes_per_sec); + + // search for an active path. Is it possible to have multiple path that are + // used at the same time. We use as reference path the one from where we gets + // more packets. This means that the path should have better lantecy or less + // channel losses + + uint32_t last_round_packets = 0; + uint64_t min_edge_rtt = UINT_MAX; + std::shared_ptr<RTCDataPath> old_main_path = main_path_; + main_path_ = nullptr; + edge_path_ = nullptr; + + for (auto it = path_table_.begin(); it != path_table_.end(); it++) { + if (it->second->isValidProducer()) { + uint32_t pkt = it->second->getPacketsLastRound(); + if (pkt > last_round_packets) { + last_round_packets = pkt; + main_path_ = it->second; + } + } else if (it->second->isActive() && !it->second->pathToProducer()) { + // this is a path to a cache from where we are receiving content + if (it->second->getMinRtt() < min_edge_rtt) { + min_edge_rtt = it->second->getMinRtt(); + edge_path_ = it->second; + } + } + it->second->roundEnd(); + } + + if (main_path_ == nullptr) main_path_ = old_main_path; + if (edge_path_ == nullptr) edge_path_ = main_path_; + if (edge_path_->getMinRtt() >= main_path_->getMinRtt()) + edge_path_ = main_path_; + + // in case we get a new main path we reset the stats of the old one. this is + // beacuse, in case we need to switch back we don't what to take decisions on + // old stats that may be outdated. + if (main_path_ != old_main_path) old_main_path->clearRtt(); + + updateLossRate(in_sync); + + // handle nacks + if (!past_nack_on_last_round_ && received_bytes_ > 0) { + rounds_without_nacks_++; + } else { + rounds_without_nacks_ = 0; + } + + // check if the producer is active + if (received_packets_last_round_ != 0) { + rounds_without_packets_ = 0; + } else { + rounds_without_packets_++; + if (rounds_without_packets_ >= MAX_ROUND_WHIOUT_PACKETS && + producer_is_active_ != false) { + initParams(); + } + } + + // reset counters + received_bytes_ = 0; + received_fec_bytes_ = 0; + recovered_bytes_with_fec_ = 0; + packets_lost_ = 0; + definitely_lost_pkt_ = 0; + losses_recovered_ = 0; + first_seq_in_round_ = highest_seq_received_; + + past_nack_on_last_round_ = false; + received_nacks_last_round_ = 0; + + received_packets_last_round_ = 0; + received_data_last_round_ = 0; + received_data_from_cache_ = 0; + sent_interests_last_round_ = 0; + sent_rtx_last_round_ = 0; + + received_fec_pkt_ = 0; + + rounds_++; +} + +void RTCState::updateReceivedBytes(const core::ContentObject &content_object, + bool isFec) { + if (isFec) { + received_fec_bytes_ += + (uint32_t)(content_object.headerSize() + content_object.payloadSize()); + } else { + received_bytes_ += + (uint32_t)(content_object.headerSize() + content_object.payloadSize()); + } +} + +void RTCState::updatePacketSize(const core::ContentObject &content_object) { + uint32_t pkt_size = + (uint32_t)(content_object.headerSize() + content_object.payloadSize()); + avg_packet_size_ = (MOVING_AVG_ALPHA * avg_packet_size_) + + ((1 - MOVING_AVG_ALPHA) * pkt_size); +} + +void RTCState::updatePathStats(const core::ContentObject &content_object, + bool is_nack) { + // get packet path + uint32_t path_label = content_object.getPathLabel(); + auto path_it = path_table_.find(path_label); + + if (path_it == path_table_.end()) { + // found a new path + std::shared_ptr<RTCDataPath> newPath = + std::make_shared<RTCDataPath>(path_label); + auto ret = path_table_.insert( + std::pair<uint32_t, std::shared_ptr<RTCDataPath>>(path_label, newPath)); + path_it = ret.first; + } + + auto path = path_it->second; + + // compute rtt + uint32_t seq = content_object.getName().getSuffix(); + uint64_t interest_sent_time = getInterestSentTime(seq); + if (interest_sent_time == 0) + return; // this should not happen, + // it means that we are processing an interest + // that is not pending + + uint64_t now = utils::SteadyTime::nowMs().count(); + + uint64_t RTT = now - interest_sent_time; + + path->insertRttSample(utils::SteadyTime::Milliseconds(RTT), false); + + // compute OWD (the first part of the nack and data packet header are the + // same, so we cast to data data packet) + core::ParamsRTC params = RTCState::getDataParams(content_object); + int64_t OWD = now - params.timestamp; + path->insertOwdSample(OWD); + + // compute IAT or set path to producer + if (!is_nack) { + // compute the iat only for the content packets + uint32_t segment_number = content_object.getName().getSuffix(); + path->computeInterArrivalGap(segment_number); + if (!path->pathToProducer()) received_data_from_cache_++; + } else { + path->receivedNack(); + } +} + +void RTCState::updateLossRate(bool in_sync) { + last_round_loss_rate_ = loss_rate_; + loss_rate_ = 0.0; + + uint32_t number_theorically_received_packets_ = + highest_seq_received_ - first_seq_in_round_; + + // XXX this may be quite inefficient if the rate is high + // maybe is better to iterate over the set? + + uint32_t fec_packets = 0; + for (uint32_t i = (first_seq_in_round_ + 1); i < highest_seq_received_; i++) { + PacketState state = getPacketState(i); + if (state == PacketState::SKIPPED) { + if (number_theorically_received_packets_ > 0) + number_theorically_received_packets_--; + } + if (indexer_->isFec(i)) fec_packets++; + } + if (indexer_->isFec(highest_seq_received_)) fec_packets++; + + // in this case no new packet was received after the previous round, avoid + // division by 0 + if (number_theorically_received_packets_ == 0 && packets_lost_ == 0) return; + + if (number_theorically_received_packets_ != 0) + loss_rate_ = (double)((double)(packets_lost_) / + (double)number_theorically_received_packets_); + else + // we didn't receive anything except NACKs that triggered losses + loss_rate_ = 1.0; + + if (avg_loss_rate_ == -1.0) + avg_loss_rate_ = loss_rate_; + else + avg_loss_rate_ = + avg_loss_rate_ * MOVING_AVG_ALPHA + loss_rate_ * (1 - MOVING_AVG_ALPHA); + + // update counters for loss rate per second + total_expected_packets_ += number_theorically_received_packets_; + lost_per_sec_ += packets_lost_; + + if (in_sync) { + // update counters for residual losses + // fec packets are not sent to the app so we don't want to count them here + expected_packets_ += + ((highest_seq_received_ - first_seq_in_round_) - fec_packets); + } else { + expected_packets_ = 0; + packets_sent_to_app_ = 0; + } + + if (rounds_from_last_compute_ >= (MILLI_IN_A_SEC / ROUND_LEN)) { + // compute loss rate per second + if (lost_per_sec_ > total_expected_packets_) + lost_per_sec_ = total_expected_packets_; + + if (total_expected_packets_ == 0) + per_sec_loss_rate_ = 0; + else + per_sec_loss_rate_ = + (double)((double)(lost_per_sec_) / (double)total_expected_packets_); + + loss_history_.pushBack(per_sec_loss_rate_); + + if (in_sync && expected_packets_ != 0) { + // compute residual loss rate + if (packets_sent_to_app_ > expected_packets_) { + // this may happen if we get packet from the prev bin that get recovered + // on the current one + packets_sent_to_app_ = expected_packets_; + } + + residual_loss_rate_ = + 1.0 - ((double)packets_sent_to_app_ / (double)expected_packets_); + if (residual_loss_rate_ < 0.0) residual_loss_rate_ = 0.0; + } + + lost_per_sec_ = 0; + total_expected_packets_ = 0; + expected_packets_ = 0; + packets_sent_to_app_ = 0; + rounds_from_last_compute_ = 0; + } + + rounds_from_last_compute_++; +} + +void RTCState::dataToBeReceived(uint32_t seq) { + addToPacketCache(seq, PacketState::TO_BE_RECEIVED); +} + +void RTCState::updateHighestSeqReceived(uint32_t seq) { + if (seq > highest_seq_received_) highest_seq_received_ = seq; +} + +void RTCState::addRecvOrLost(uint32_t seq, PacketState state) { + auto it = pending_interests_.find(seq); + if (it != pending_interests_.end()) { + pending_interests_.erase(it); + if (indexer_->isFec(seq)) pending_fec_pkt_--; + } + + addToPacketCache(seq, state); + + // keep track of the last packet received/lost + // without holes. + if (highest_seq_received_in_order_ < last_seq_nacked_) { + highest_seq_received_in_order_ = last_seq_nacked_; + } + + if ((highest_seq_received_in_order_ + 1) == seq) { + highest_seq_received_in_order_ = seq; + } else if (seq <= highest_seq_received_in_order_) { + // here we do nothing + } else if (seq > highest_seq_received_in_order_) { + // 1) there is a gap in the sequence so we do not update + // highest_seq_received_in_order_ + // 2) all the packets from highest_seq_received_in_order_ to seq are + // received or lost or are fec packetis. In this case we increase + // highest_seq_received_in_order_ until we find an hole in the sequence + + for (uint32_t i = highest_seq_received_in_order_ + 1; i <= seq; i++) { + PacketState state = getPacketState(i); + if ((state == PacketState::UNKNOWN || state == PacketState::LOST)) { + if (indexer_->isFec(i)) { + // this is a fec packet and we don't care to receive it + // however we may need to increse the number or lost packets + // XXX: in case we want to use rtx to recover fec packets, + // this may prevent to detect a packet loss and no rtx will be sent + if (TRANSPORT_EXPECT_TRUE(i >= first_interest_sent_seq_)) { + onLossDetected(i); + } + } else { + // this is a data packet and we need to get it + break; + } + } + // this packet is in order so we can update the + // highest_seq_received_in_order_ + highest_seq_received_in_order_ = i; + } + } +} + +void RTCState::setInitRttTimer(uint32_t wait) { + init_rtt_timer_->cancel(); + init_rtt_timer_->expires_from_now(std::chrono::milliseconds(wait)); + + std::weak_ptr<RTCState> self = shared_from_this(); + init_rtt_timer_->async_wait([self](const std::error_code &ec) { + if (ec) return; + + if (auto ptr = self.lock()) { + ptr->checkInitRttTimer(); + } + }); +} + +void RTCState::checkInitRttTimer() { + if (received_probes_ < INIT_RTT_MIN_PROBES_TO_RECV || + probe_handler_->getProbeLossRate() == 1.0) { + // we didn't received enough probes or they were not valid, restart + received_probes_ = 0; + probe_handler_->setSuffixRange(MIN_INIT_PROBE_SEQ, MAX_INIT_PROBE_SEQ); + probe_handler_->setProbes(INIT_RTT_PROBE_INTERVAL, INIT_RTT_PROBES); + probe_handler_->sendProbes(); + setInitRttTimer(INIT_RTT_PROBE_RESTART); + return; + } + + init_rtt_ = true; + main_path_->roundEnd(); + loss_history_.pushBack(probe_handler_->getProbeLossRate()); + + probe_handler_->setSuffixRange(MIN_RTT_PROBE_SEQ, MAX_RTT_PROBE_SEQ); + probe_handler_->setProbes(RTT_PROBE_INTERVAL, 0); + probe_handler_->sendProbes(); + + // init last_seq_nacked_. skip packets that may come from the cache + double prod_rate = getProducerRate(); + double rtt = (double)getMinRTT() / MILLI_IN_A_SEC; + double packet_size = getAveragePacketSize(); + uint32_t pkt_in_rtt_ = std::floor(((prod_rate / packet_size) * rtt)); + last_seq_nacked_ = last_production_seq_ + pkt_in_rtt_; + + discovered_rtt_callback_(); +} + +core::ParamsRTC RTCState::getProbeParams(const core::ContentObject &probe) { + uint32_t seq = probe.getName().getSuffix(); + core::ParamsRTC params; + + switch (ProbeHandler::getProbeType(seq)) { + case ProbeType::INIT: { + core::ContentObjectManifest manifest( + const_cast<core::ContentObject &>(probe).shared_from_this()); + manifest.decode(); + params = manifest.getParamsRTC(); + break; + } + case ProbeType::RTT: { + struct nack_packet_t *probe_pkt = + (struct nack_packet_t *)probe.getPayload()->data(); + params = core::ParamsRTC{ + .timestamp = probe_pkt->getTimestamp(), + .prod_rate = probe_pkt->getProductionRate(), + .prod_seg = probe_pkt->getProductionSegment(), + }; + break; + } + default: + break; + } + + return params; +} + +core::ParamsRTC RTCState::getDataParams(const core::ContentObject &data) { + core::ParamsRTC params; + + switch (data.getPayloadType()) { + case core::PayloadType::DATA: { + struct data_packet_t *data_pkt = + (struct data_packet_t *)data.getPayload()->data(); + params = core::ParamsRTC{ + .timestamp = data_pkt->getTimestamp(), + .prod_rate = data_pkt->getProductionRate(), + .prod_seg = data.getName().getSuffix(), + }; + break; + } + case core::PayloadType::MANIFEST: { + core::ContentObjectManifest manifest( + const_cast<core::ContentObject &>(data).shared_from_this()); + manifest.decode(); + params = manifest.getParamsRTC(); + break; + } + default: + break; + } + + return params; +} + +} // namespace rtc + +} // namespace protocol + +} // namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_state.h b/libtransport/src/protocols/rtc/rtc_state.h new file mode 100644 index 000000000..ac3cc621f --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_state.h @@ -0,0 +1,410 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include <core/facade.h> +#include <hicn/transport/config.h> +#include <hicn/transport/core/asio_wrapper.h> +#include <hicn/transport/core/content_object.h> +#include <hicn/transport/core/name.h> +#include <hicn/transport/utils/rtc_quality_score.h> +#include <protocols/indexer.h> +#include <protocols/rtc/probe_handler.h> +#include <protocols/rtc/rtc_data_path.h> +#include <utils/max_filter.h> + +#include <map> +#include <set> + +namespace transport { + +namespace protocol { + +namespace rtc { + +// packet state +// RECEIVED: the packet was already received +// LOST: the packet is marked as lost but can be recovered +// DEFINITELY_LOST: the packet is lost and cannot be recovered +// TO_BE_RECEIVED: when a packet is received is sent to the FEC decoder. the fec +// decoder may decide to send the packet directly to the app. to avoid +// duplicated the packet is marked with this state +// SKIPPED: an interest that was not sent, only for FEC packets +// UNKNOWN: unknown state +enum class PacketState : uint8_t { + RECEIVED, + TO_BE_RECEIVED, + LOST, + DEFINITELY_LOST, + SKIPPED, + UNKNOWN +}; + +class RTCState : public std::enable_shared_from_this<RTCState> { + using PendingInterestsMap = std::map<uint32_t, uint64_t>; + + private: + const double MAX_CACHED_PACKETS = 8192; // XXX this value may be too small + // for high rate apps + + public: + using DiscoveredRttCallback = std::function<void()>; + + public: + RTCState(Indexer *indexer, ProbeHandler::SendProbeCallback &&probe_callback, + DiscoveredRttCallback &&discovered_rtt_callback, + asio::io_service &io_service); + + ~RTCState(); + + // initialization + void initParams(); + + // packet events + void onSendNewInterest(const core::Name *interest_name); + void onTimeout(uint32_t seq, bool lost); + void onLossDetected(uint32_t seq); + void onRetransmission(uint32_t seq); + void onPossibleLossWithNoRtx(uint32_t seq); + void onDataPacketReceived(const core::ContentObject &content_object, + bool compute_stats); + void onFecPacketReceived(const core::ContentObject &content_object); + void onNackPacketReceived(const core::ContentObject &nack, + bool compute_stats); + void onPacketLost(uint32_t seq); + void onPacketRecoveredRtx(const core::ContentObject &content_object, + uint64_t rtt); + void onFecPacketRecoveredRtx(const core::ContentObject &content_object); + void onPacketRecoveredFec(uint32_t seq, uint32_t size); + bool onProbePacketReceived(const core::ContentObject &probe); + void onJumpForward(uint32_t next_seq); + + // protocol state + void onNewRound(double round_len, bool in_sync); + + // main path + uint32_t getProducerPath() const { + if (mainPathIsValid()) return main_path_->getPathId(); + return 0; + } + + // delay metrics + bool isRttDiscovered() const { return init_rtt_; } + + uint64_t getMinRTT() const { + if (mainPathIsValid()) return main_path_->getMinRtt(); + return 0; + } + + uint64_t getAvgRTT() const { + if (mainPathIsValid()) return main_path_->getAvgRtt(); + return 0; + } + + uint64_t getMaxRTT() const { + if (mainPathIsValid()) return main_path_->getMaxRtt(); + return 0; + } + + uint64_t getEdgeRtt() const { + if (edge_path_ != nullptr) return edge_path_->getMinRtt(); + return 0; + } + + void resetRttStats() { + if (mainPathIsValid()) main_path_->clearRtt(); + } + + double getQueuing() const { + if (mainPathIsValid()) return main_path_->getQueuingDealy(); + return 0.0; + } + double getIAT() const { + if (mainPathIsValid()) return main_path_->getInterArrivalGap(); + return 0.0; + } + + double getJitter() const { + if (mainPathIsValid()) return main_path_->getJitter(); + return 0.0; + } + + // pending interests + uint64_t getInterestSentTime(uint32_t seq) { + auto it = pending_interests_.find(seq); + if (it != pending_interests_.end()) return it->second; + + return 0; + } + + bool isPending(uint32_t seq) { + if (pending_interests_.find(seq) != pending_interests_.end()) return true; + return false; + } + + uint32_t getPendingInterestNumber() const { + return (uint32_t)pending_interests_.size(); + } + + PacketState getPacketState(uint32_t seq) { + auto it = packet_cache_.find(seq); + if (it != packet_cache_.end()) return it->second; + return PacketState::UNKNOWN; + } + + // loss rate + double getPerRoundLossRate() const { return loss_rate_; } + double getPerSecondLossRate() const { return per_sec_loss_rate_; } + double getAvgLossRate() const { return avg_loss_rate_; } + double getMaxLossRate() const { + if (loss_history_.size() != 0) return loss_history_.begin(); + return 0; + } + + double getLastRoundLossRate() const { return last_round_loss_rate_; } + double getResidualLossRate() const { return residual_loss_rate_; } + + uint32_t getLostData() const { return packets_lost_; }; + uint32_t getRecoveredLosses() const { return losses_recovered_; } + + uint32_t getDefinitelyLostPackets() const { return definitely_lost_pkt_; } + + uint32_t getHighestSeqReceived() const { return highest_seq_received_; } + + uint32_t getHighestSeqReceivedInOrder() const { + return highest_seq_received_in_order_; + } + + // fec packets + uint32_t getReceivedFecPackets() const { return received_fec_pkt_; } + uint32_t getPendingFecPackets() const { return pending_fec_pkt_; } + + // generic stats + uint32_t getReceivedBytesInRound() const { return received_bytes_; } + uint32_t getReceivedFecBytesInRound() const { return received_fec_bytes_; } + uint32_t getRecoveredFecBytesInRound() const { + return recovered_bytes_with_fec_; + } + uint32_t getReceivedNacksInRound() const { + return received_nacks_last_round_; + } + uint32_t getReceivedDataInRound() const { return received_data_last_round_; } + uint32_t getSentInterestInRound() const { return sent_interests_last_round_; } + uint32_t getSentRtxInRound() const { return sent_rtx_last_round_; } + + // bandwidth/production metrics + double getAvailableBw() const { return 0.0; }; // TODO + double getProducerRate() const { return production_rate_; } + double getReceivedRate() const { return received_rate_; } + double getReceivedFecRate() const { return fec_received_rate_; } + double getRecoveredFecRate() const { return fec_recovered_rate_; } + + double getAveragePacketSize() const { return avg_packet_size_; } + + // nacks + uint32_t getRoundsWithoutNacks() const { return rounds_without_nacks_; } + uint32_t getLastSeqNacked() const { return last_seq_nacked_; } + + // producer state + bool isProducerActive() const { return producer_is_active_; } + + // packets from cache + // this should be called at the end of a round beacuse otherwise we may have + // not enough packets to get a good stat + double getPacketFromCacheRatio() const { + if (received_data_last_round_ == 0) return 0; + return (double)received_data_from_cache_ / + (double)received_data_last_round_; + } + + PendingInterestsMap::iterator getPendingInterestsMapBegin() { + return pending_interests_.begin(); + } + PendingInterestsMap::iterator getPendingInterestsMapEnd() { + return pending_interests_.end(); + } + + // quality + uint8_t getQualityScore() { + uint8_t qs = quality_score_.getQualityScore( + getMaxRTT(), std::round(getResidualLossRate() * 100)); + return qs; + } + + // We received a data pkt that will be set to RECEIVED, but first we have to + // go through FEC. We do not want to consider this pkt as recovered, thus we + // set it as TO_BE_RECEIVED. + void dataToBeReceived(uint32_t seq); + + void updateHighestSeqReceived(uint32_t seq); + + // Extract RTC parameters from probes (init or RTT probes) and data packets. + static core::ParamsRTC getProbeParams(const core::ContentObject &probe); + static core::ParamsRTC getDataParams(const core::ContentObject &data); + + private: + void addToPacketCache(uint32_t seq, PacketState state) { + // this function adds or updates the current state + if (packet_cache_.size() >= MAX_CACHED_PACKETS) { + packet_cache_.erase(packet_cache_.begin()); + } + packet_cache_[seq] = state; + } + + void eraseFromPacketCache(uint32_t seq) { packet_cache_.erase(seq); } + + // update stats + void updateState(); + void updateReceivedBytes(const core::ContentObject &content_object, + bool isFec); + void updatePacketSize(const core::ContentObject &content_object); + void updatePathStats(const core::ContentObject &content_object, bool is_nack); + void updateLossRate(bool in_sycn); + + void addRecvOrLost(uint32_t seq, PacketState state); + + void setInitRttTimer(uint32_t wait); + void checkInitRttTimer(); + + bool mainPathIsValid() const { + if (main_path_ != nullptr) + return true; + else + return false; + } + + // packets counters (total) + uint32_t sent_interests_; + uint32_t sent_rtx_; + uint32_t received_data_; + uint32_t received_nacks_; + uint32_t received_timeouts_; + uint32_t received_probes_; + + // loss counters + int32_t packets_lost_; + int32_t losses_recovered_; + uint32_t definitely_lost_pkt_; + uint32_t first_seq_in_round_; + uint32_t highest_seq_received_; + uint32_t highest_seq_received_in_order_; + uint32_t last_seq_nacked_; // segment for which we got an oldNack + double loss_rate_; + double avg_loss_rate_; + double last_round_loss_rate_; + utils::MaxFilter<double> loss_history_; + + // per second loss rate + uint32_t lost_per_sec_; + uint32_t total_expected_packets_; + double per_sec_loss_rate_; + + // conunters for residual losses + // residual losses are computed every second and are used + // as feedback to the upper levels (e.g application) + uint32_t expected_packets_; + uint32_t packets_sent_to_app_; + uint32_t rounds_from_last_compute_; + double residual_loss_rate_; + + // bw counters + uint32_t received_bytes_; + uint32_t received_fec_bytes_; + uint32_t recovered_bytes_with_fec_; + double avg_packet_size_; + double production_rate_; // rate communicated by the producer using nacks + double received_rate_; // rate recevied by the consumer (only data) + double fec_received_rate_; // fec rate recevied by the consumer + double fec_recovered_rate_; // rate recovered using fec + + // nack counters + // the bool takes tracks only about the valid past nacks (no rtx) and it is + // used to switch between the states. Instead received_nacks_last_round_ logs + // all the nacks for statistics + bool past_nack_on_last_round_; + uint32_t received_nacks_last_round_; + + // packets counters + uint32_t received_packets_last_round_; + uint32_t received_data_last_round_; + uint32_t received_data_from_cache_; + uint32_t sent_interests_last_round_; + uint32_t sent_rtx_last_round_; + + // fec counters + uint32_t received_fec_pkt_; + uint32_t pending_fec_pkt_; + + // round counters + uint32_t rounds_; + uint32_t rounds_without_nacks_; + uint32_t rounds_without_packets_; + + // init rtt + uint64_t first_interest_sent_time_; + uint32_t first_interest_sent_seq_; + + // producer state + bool + producer_is_active_; // the prodcuer is active if we receive some packets + uint32_t last_production_seq_; // last production seq received by the + // producer used to init the sync protcol + uint32_t last_prod_update_seq_; // seq number of the last packet used to + // update the update from the producer. + // assumption: the highest seq number carries + // the most up to date info. in case of + // probes we look at the produced seq number + + // paths stats + std::unordered_map<uint32_t, std::shared_ptr<RTCDataPath>> path_table_; + std::shared_ptr<RTCDataPath> main_path_; // this is the path that connects + // the consumer to the producer. in + // case of multipath the trasnport + // uses the most active path + std::shared_ptr<RTCDataPath> edge_path_; // path to the closest cache if it + // exists + + // packet received + // cache where to store info about the last MAX_CACHED_PACKETS + // these are packets that are received or lost or definitely lost and are not + // anymore in the pending intetest list + std::map<uint32_t, PacketState> packet_cache_; + + // pending interests + PendingInterestsMap pending_interests_; + + // indexer + Indexer *indexer_; + + // used to keep track of the skipped interests + uint32_t last_interest_sent_; + + // probes + std::shared_ptr<ProbeHandler> probe_handler_; + bool init_rtt_; + std::unique_ptr<asio::steady_timer> init_rtt_timer_; + + // quality score + RTCQualityScore quality_score_; + + // callbacks + DiscoveredRttCallback discovered_rtt_callback_; +}; + +} // namespace rtc + +} // namespace protocol + +} // namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_verifier.cc b/libtransport/src/protocols/rtc/rtc_verifier.cc new file mode 100644 index 000000000..60fce92a5 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_verifier.cc @@ -0,0 +1,248 @@ +/* + * Copyright (c) 2017-2022 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <core/facade.h> +#include <protocols/rtc/rtc_packet.h> +#include <protocols/rtc/rtc_verifier.h> + +namespace transport { +namespace protocol { +namespace rtc { + +RTCVerifier::RTCVerifier(std::shared_ptr<auth::Verifier> verifier, + uint32_t factor_relevant, uint32_t factor_alert) + : verifier_(verifier), + factor_relevant_(factor_relevant), + factor_alert_(factor_alert), + manifest_max_capacity_(std::numeric_limits<uint8_t>::max()) {} + +void RTCVerifier::setState(std::shared_ptr<RTCState> rtc_state) { + rtc_state_ = rtc_state; +} + +void RTCVerifier::setVerifier(std::shared_ptr<auth::Verifier> verifier) { + verifier_ = verifier; +} + +void RTCVerifier::setFactorRelevant(uint32_t factor_relevant) { + factor_relevant_ = factor_relevant; +} + +void RTCVerifier::setFactorAlert(uint32_t factor_alert) { + factor_alert_ = factor_alert; +} + +auth::VerificationPolicy RTCVerifier::verify(core::Interest &interest) { + return verifier_->verifyPackets(&interest); +} + +auth::VerificationPolicy RTCVerifier::verify( + core::ContentObject &content_object, bool is_fec) { + auth::Suffix suffix = content_object.getName().getSuffix(); + auth::VerificationPolicy default_policy = auth::VerificationPolicy::ABORT; + + core::PayloadType payload_type = content_object.getPayloadType(); + bool is_probe = ProbeHandler::getProbeType(suffix) != ProbeType::NOT_PROBE; + bool is_nack = !is_probe && content_object.payloadSize() == NACK_HEADER_SIZE; + bool is_manifest = !is_probe && !is_nack && !is_fec && + payload_type == core::PayloadType::MANIFEST; + bool is_data = !is_probe && !is_nack && !is_fec && + payload_type == core::PayloadType::DATA; + + if (is_probe) return verifyProbe(content_object); + if (is_nack) return verifyNack(content_object); + if (is_fec) return verifyFec(content_object); + if (is_data) return verifyData(content_object); + if (is_manifest) return verifyManifest(content_object); + + verifier_->callVerificationFailedCallback(suffix, default_policy); + return default_policy; +} + +auth::VerificationPolicy RTCVerifier::verifyProbe( + core::ContentObject &content_object) { + auth::Suffix suffix = content_object.getName().getSuffix(); + auth::VerificationPolicy policy = auth::VerificationPolicy::ABORT; + + switch (ProbeHandler::getProbeType(suffix)) { + case ProbeType::INIT: + policy = verifyManifest(content_object); + if (policy == auth::VerificationPolicy::ACCEPT) { + policy = processManifest(content_object); + } + break; + case ProbeType::RTT: + policy = verifyNack(content_object); + break; + default: + verifier_->callVerificationFailedCallback(suffix, policy); + break; + } + + return policy; +} + +auth::VerificationPolicy RTCVerifier::verifyNack( + core::ContentObject &content_object) { + return verifier_->verifyPackets(&content_object); +} + +auth::VerificationPolicy RTCVerifier::verifyFec( + core::ContentObject &content_object) { + return verifier_->verifyPackets(&content_object); +} + +auth::VerificationPolicy RTCVerifier::verifyData( + core::ContentObject &content_object) { + if (HICN_PACKET_FORMAT_IS_AH(content_object.getFormat())) { + return verifier_->verifyPackets(&content_object); + } + + auth::Suffix suffix = content_object.getName().getSuffix(); + auth::VerificationPolicy policy = auth::VerificationPolicy::ABORT; + + uint32_t threshold_relevant = factor_relevant_ * manifest_max_capacity_; + uint32_t threshold_alert = factor_alert_ * manifest_max_capacity_; + + // Flush packets outside relevance window + for (auto it = packets_unverif_.set().begin(); + it != packets_unverif_.set().end();) { + if (it->first > current_index_ - threshold_relevant) { + break; + } + packets_unverif_erased_.insert((unsigned int)it->first); + it = packets_unverif_.remove(it); + } + + // Add packet to set of unverified packets + packets_unverif_.add({current_index_, suffix}, + content_object.computeDigest(manifest_hash_algo_)); + current_index_++; + + // Check that the number of unverified packets is below the alert threshold + if (packets_unverif_.set().size() <= threshold_alert) { + policy = auth::VerificationPolicy::ACCEPT; + } + + verifier_->callVerificationFailedCallback(suffix, policy); + return policy; +} + +auth::VerificationPolicy RTCVerifier::verifyManifest( + core::ContentObject &content_object) { + return verifier_->verifyPackets(&content_object); +} + +auth::VerificationPolicy RTCVerifier::processManifest( + core::ContentObject &content_object) { + auth::Suffix suffix = content_object.getName().getSuffix(); + auth::VerificationPolicy accept_policy = auth::VerificationPolicy::ACCEPT; + + // Decode manifest + core::ContentObjectManifest manifest(content_object.shared_from_this()); + manifest.decode(); + + // Extract manifest data + manifest_max_capacity_ = manifest.getMaxCapacity(); + manifest_hash_algo_ = manifest.getHashAlgorithm(); + auth::Verifier::SuffixMap suffix_map = manifest.getSuffixMap(); + + // Return early if the manifest is empty + if (suffix_map.empty()) { + verifier_->callVerificationFailedCallback(suffix, accept_policy); + return accept_policy; + } + + // Add hashes to map of all manifest hashes + manifest_digests_.insert(suffix_map.begin(), suffix_map.end()); + + // Remove discarded and definitely lost packets from digest map + for (auto it = manifest_digests_.begin(); it != manifest_digests_.end();) { + auto it_erased = packets_unverif_erased_.find(it->first); + + if (it_erased != packets_unverif_erased_.end()) { + packets_unverif_erased_.erase(it_erased); + it = manifest_digests_.erase(it); + continue; + } + + if (rtc_state_->getPacketState(it->first) == PacketState::DEFINITELY_LOST) { + it = manifest_digests_.erase(it); + continue; + } + + ++it; + } + + // Verify packets + auth::Verifier::PolicyMap policies = + verifier_->verifyHashes(packets_unverif_.suffixMap(), manifest_digests_); + + for (const auto &p : policies) { + switch (p.second) { + case auth::VerificationPolicy::ACCEPT: { + packets_unverif_.remove(packets_unverif_.packet(p.first)); + manifest_digests_.erase(p.first); + break; + } + case auth::VerificationPolicy::UNKNOWN: + break; + case auth::VerificationPolicy::DROP: + case auth::VerificationPolicy::ABORT: + return p.second; + } + } + + verifier_->callVerificationFailedCallback(suffix, accept_policy); + return accept_policy; +} + +void RTCVerifier::onDataRecoveredFec(uint32_t suffix) { + manifest_digests_.erase(suffix); +} + +std::pair<RTCVerifier::PacketSet::iterator, bool> RTCVerifier::Packets::add( + const Packet &packet, const auth::CryptoHash &digest) { + auto inserted = packets_.insert(packet); + if (inserted.second) { + packets_map_[packet.second] = inserted.first; + suffix_map_[packet.second] = digest; + } + return inserted; +} + +RTCVerifier::PacketSet::iterator RTCVerifier::Packets::remove( + PacketSet::iterator packet_it) { + packets_map_.erase(packet_it->second); + suffix_map_.erase(packet_it->second); + return packets_.erase(packet_it); +} + +const std::set<RTCVerifier::Packet> &RTCVerifier::Packets::set() const { + return packets_; +}; + +RTCVerifier::PacketSet::iterator RTCVerifier::Packets::packet( + auth::Suffix suffix) { + return packets_map_.at(suffix); +}; + +const auth::Verifier::SuffixMap &RTCVerifier::Packets::suffixMap() const { + return suffix_map_; +} + +} // end namespace rtc +} // end namespace protocol +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_verifier.h b/libtransport/src/protocols/rtc/rtc_verifier.h new file mode 100644 index 000000000..c83faf08a --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_verifier.h @@ -0,0 +1,96 @@ +/* + * Copyright (c) 2017-2022 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <core/facade.h> +#include <hicn/transport/auth/verifier.h> +#include <hicn/transport/core/content_object.h> +#include <protocols/rtc/rtc_state.h> + +namespace transport { +namespace protocol { +namespace rtc { + +class RTCVerifier { + public: + explicit RTCVerifier(std::shared_ptr<auth::Verifier> verifier, + uint32_t factor_relevant, uint32_t factor_alert); + + virtual ~RTCVerifier() = default; + + void setState(std::shared_ptr<RTCState> rtc_state); + void setVerifier(std::shared_ptr<auth::Verifier> verifier); + void setFactorRelevant(uint32_t factor_relevant); + void setFactorAlert(uint32_t factor_alert); + + auth::VerificationPolicy verify(core::Interest &interest); + auth::VerificationPolicy verify(core::ContentObject &content_object, + bool is_fec = false); + auth::VerificationPolicy verifyProbe(core::ContentObject &content_object); + auth::VerificationPolicy verifyNack(core::ContentObject &content_object); + auth::VerificationPolicy verifyFec(core::ContentObject &content_object); + auth::VerificationPolicy verifyData(core::ContentObject &content_object); + auth::VerificationPolicy verifyManifest(core::ContentObject &content_object); + + auth::VerificationPolicy processManifest(core::ContentObject &content_object); + + void onDataRecoveredFec(uint32_t suffix); + + protected: + using Index = uint64_t; + using Packet = std::pair<Index, auth::Suffix>; + using PacketSet = std::set<Packet>; + + class Packets { + public: + std::pair<PacketSet::iterator, bool> add(const Packet &packet, + const auth::CryptoHash &digest); + PacketSet::iterator remove(PacketSet::iterator packet_it); + const PacketSet &set() const; + PacketSet::iterator packet(auth::Suffix suffix); + const auth::Verifier::SuffixMap &suffixMap() const; + + private: + PacketSet packets_; + std::unordered_map<auth::Suffix, PacketSet::iterator> packets_map_; + auth::Verifier::SuffixMap suffix_map_; + }; + + // The RTC state. + std::shared_ptr<RTCState> rtc_state_; + // The verifier instance. + std::shared_ptr<auth::Verifier> verifier_; + // Used to compute the relevance windows size (in packets). + uint32_t factor_relevant_; + // Used to compute the alert threshold (in packets). + uint32_t factor_alert_; + // The maximum number of entries a manifest can contain. + uint8_t manifest_max_capacity_; + // Hash algorithm used by manifests. + auth::CryptoHashType manifest_hash_algo_; + // Digests extracted from all manifests received. + auth::Verifier::SuffixMap manifest_digests_; + // The number of data packets processed. + Index current_index_; + // Unverified packets with index in relevance window. + Packets packets_unverif_; + // Unverified erased packets with index outside relevance window. + std::unordered_set<auth::Suffix> packets_unverif_erased_; +}; + +} // namespace rtc +} // namespace protocol +} // namespace transport |