From 229385955109b866a23c4ac2aa03d4d11044c39d Mon Sep 17 00:00:00 2001 From: "Enrico Loparco (eloparco)" Date: Thu, 24 Jun 2021 09:15:41 +0200 Subject: [HICN-708] Rebase with master Signed-off-by: Enrico Loparco (eloparco) Change-Id: I2122e1d61dd3b2e039972624ffbdbcb3c5610159 --- libtransport/src/protocols/rtc/CMakeLists.txt | 38 ++ .../src/protocols/rtc/congestion_detection.cc | 101 ++++ .../src/protocols/rtc/congestion_detection.h | 138 +++++ libtransport/src/protocols/rtc/probe_handler.cc | 107 ++++ libtransport/src/protocols/rtc/probe_handler.h | 75 +++ libtransport/src/protocols/rtc/rtc.cc | 607 +++++++++++++++++++++ libtransport/src/protocols/rtc/rtc.h | 113 ++++ libtransport/src/protocols/rtc/rtc_consts.h | 121 ++++ libtransport/src/protocols/rtc/rtc_data_path.cc | 197 +++++++ libtransport/src/protocols/rtc/rtc_data_path.h | 97 ++++ libtransport/src/protocols/rtc/rtc_ldr.cc | 427 +++++++++++++++ libtransport/src/protocols/rtc/rtc_ldr.h | 108 ++++ libtransport/src/protocols/rtc/rtc_packet.h | 93 ++++ libtransport/src/protocols/rtc/rtc_rc.h | 58 ++ libtransport/src/protocols/rtc/rtc_rc_frame.cc | 79 +++ libtransport/src/protocols/rtc/rtc_rc_frame.h | 46 ++ libtransport/src/protocols/rtc/rtc_rc_queue.cc | 106 ++++ libtransport/src/protocols/rtc/rtc_rc_queue.h | 47 ++ libtransport/src/protocols/rtc/rtc_state.cc | 560 +++++++++++++++++++ libtransport/src/protocols/rtc/rtc_state.h | 253 +++++++++ .../src/protocols/rtc/trendline_estimator.cc | 334 ++++++++++++ .../src/protocols/rtc/trendline_estimator.h | 147 +++++ 22 files changed, 3852 insertions(+) create mode 100644 libtransport/src/protocols/rtc/CMakeLists.txt create mode 100644 libtransport/src/protocols/rtc/congestion_detection.cc create mode 100644 libtransport/src/protocols/rtc/congestion_detection.h create mode 100644 libtransport/src/protocols/rtc/probe_handler.cc create mode 100644 libtransport/src/protocols/rtc/probe_handler.h create mode 100644 libtransport/src/protocols/rtc/rtc.cc create mode 100644 libtransport/src/protocols/rtc/rtc.h create mode 100644 libtransport/src/protocols/rtc/rtc_consts.h create mode 100644 libtransport/src/protocols/rtc/rtc_data_path.cc create mode 100644 libtransport/src/protocols/rtc/rtc_data_path.h create mode 100644 libtransport/src/protocols/rtc/rtc_ldr.cc create mode 100644 libtransport/src/protocols/rtc/rtc_ldr.h create mode 100644 libtransport/src/protocols/rtc/rtc_packet.h create mode 100644 libtransport/src/protocols/rtc/rtc_rc.h create mode 100644 libtransport/src/protocols/rtc/rtc_rc_frame.cc create mode 100644 libtransport/src/protocols/rtc/rtc_rc_frame.h create mode 100644 libtransport/src/protocols/rtc/rtc_rc_queue.cc create mode 100644 libtransport/src/protocols/rtc/rtc_rc_queue.h create mode 100644 libtransport/src/protocols/rtc/rtc_state.cc create mode 100644 libtransport/src/protocols/rtc/rtc_state.h create mode 100644 libtransport/src/protocols/rtc/trendline_estimator.cc create mode 100644 libtransport/src/protocols/rtc/trendline_estimator.h (limited to 'libtransport/src/protocols/rtc') diff --git a/libtransport/src/protocols/rtc/CMakeLists.txt b/libtransport/src/protocols/rtc/CMakeLists.txt new file mode 100644 index 000000000..77f065d0e --- /dev/null +++ b/libtransport/src/protocols/rtc/CMakeLists.txt @@ -0,0 +1,38 @@ +# Copyright (c) 2017-2019 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cmake_minimum_required(VERSION 3.5 FATAL_ERROR) + +list(APPEND HEADER_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/rtc.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_state.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_ldr.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_consts.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rc.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rc_queue.h + ${CMAKE_CURRENT_SOURCE_DIR}/probe_handler.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_packet.h +) + +list(APPEND SOURCE_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/rtc.cc + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_state.cc + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_ldr.cc + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rc_queue.cc + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.cc + ${CMAKE_CURRENT_SOURCE_DIR}/probe_handler.cc +) + +set(SOURCE_FILES ${SOURCE_FILES} PARENT_SCOPE) +set(HEADER_FILES ${HEADER_FILES} PARENT_SCOPE) diff --git a/libtransport/src/protocols/rtc/congestion_detection.cc b/libtransport/src/protocols/rtc/congestion_detection.cc new file mode 100644 index 000000000..e2d44ae66 --- /dev/null +++ b/libtransport/src/protocols/rtc/congestion_detection.cc @@ -0,0 +1,101 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +namespace transport { + +namespace protocol { + +namespace rtc { + +CongestionDetection::CongestionDetection() + : cc_estimator_(), last_processed_chunk_() {} + +CongestionDetection::~CongestionDetection() {} + +void CongestionDetection::updateStats() { + uint64_t now = std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + + if (chunks_number_.empty()) return; + + uint32_t chunk_number = chunks_number_.front(); + + while (chunks_[chunk_number].getReceivedTime() + HICN_CC_STATS_MAX_DELAY_MS < + now || + chunks_[chunk_number].isComplete()) { + if (chunk_number == last_processed_chunk_.getFrameSeqNum() + 1) { + chunks_[chunk_number].setPreviousSentTime( + last_processed_chunk_.getSentTime()); + + chunks_[chunk_number].setPreviousReceivedTime( + last_processed_chunk_.getReceivedTime()); + cc_estimator_.Update(chunks_[chunk_number].getReceivedDelta(), + chunks_[chunk_number].getSentDelta(), + chunks_[chunk_number].getSentTime(), + chunks_[chunk_number].getReceivedTime(), + chunks_[chunk_number].getFrameSize(), true); + + } else { + TRANSPORT_LOGD( + "CongestionDetection::updateStats frame %u but not the \ + previous one, last one was %u currentFrame %u", + chunk_number, last_processed_chunk_.getFrameSeqNum(), + chunks_[chunk_number].getFrameSeqNum()); + } + + last_processed_chunk_ = chunks_[chunk_number]; + + chunks_.erase(chunk_number); + + chunks_number_.pop(); + if (chunks_number_.empty()) break; + + chunk_number = chunks_number_.front(); + } +} + +void CongestionDetection::addPacket(const core::ContentObject &content_object) { + auto payload = content_object.getPayload(); + uint32_t payload_size = (uint32_t)payload->length(); + uint32_t segmentNumber = content_object.getName().getSuffix(); + // uint32_t pkt = segmentNumber & modMask_; + uint64_t *sentTimePtr = (uint64_t *)payload->data(); + + // this is just for testing with hiperf, assuming a frame is 10 pkts + // in the final version, the split should be based on the timestamp in the pkt + uint32_t frameNum = (int)(segmentNumber / HICN_CC_STATS_CHUNK_SIZE); + uint64_t now = std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + + if (chunks_.find(frameNum) == chunks_.end()) { + // new chunk of pkts or out of order + if (last_processed_chunk_.getFrameSeqNum() > frameNum) + return; // out of order and we already processed the chunk + + chunks_[frameNum] = FrameStats(frameNum, HICN_CC_STATS_CHUNK_SIZE); + chunks_number_.push(frameNum); + } + + chunks_[frameNum].addPacket(*sentTimePtr, now, payload_size); +} + +} // namespace rtc +} // namespace protocol +} // namespace transport diff --git a/libtransport/src/protocols/rtc/congestion_detection.h b/libtransport/src/protocols/rtc/congestion_detection.h new file mode 100644 index 000000000..17f4aa54c --- /dev/null +++ b/libtransport/src/protocols/rtc/congestion_detection.h @@ -0,0 +1,138 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include +#include + +#define HICN_CC_STATS_CHUNK_SIZE 10 +#define HICN_CC_STATS_MAX_DELAY_MS 100 + +namespace transport { + +namespace protocol { + +namespace rtc { + +class FrameStats { + public: + FrameStats() + : frame_num_(0), + sent_time_(0), + received_time_(0), + previous_sent_time_(0), + previous_received_time_(0), + size_(0), + received_pkt_m(0), + burst_size_m(HICN_CC_STATS_CHUNK_SIZE){}; + + FrameStats(uint32_t burst_size) + : frame_num_(0), + sent_time_(0), + received_time_(0), + previous_sent_time_(0), + previous_received_time_(0), + size_(0), + received_pkt_m(0), + burst_size_m(burst_size){}; + + FrameStats(uint32_t frame_num, uint32_t burst_size) + : frame_num_(frame_num), + sent_time_(0), + received_time_(0), + previous_sent_time_(0), + previous_received_time_(0), + size_(0), + received_pkt_m(0), + burst_size_m(burst_size){}; + + FrameStats(uint32_t frame_num, uint64_t sent_time, uint64_t received_time, + uint32_t size, FrameStats previousFrame, uint32_t burst_size) + : frame_num_(frame_num), + sent_time_(sent_time), + received_time_(received_time), + previous_sent_time_(previousFrame.getSentTime()), + previous_received_time_(previousFrame.getReceivedTime()), + size_(size), + received_pkt_m(1), + burst_size_m(burst_size){}; + + void addPacket(uint64_t sent_time, uint64_t received_time, uint32_t size) { + size_ += size; + sent_time_ = + (sent_time_ == 0) ? sent_time : std::min(sent_time_, sent_time); + received_time_ = std::max(received_time, received_time_); + received_pkt_m++; + } + + bool isComplete() { return received_pkt_m == burst_size_m; } + + uint32_t getFrameSeqNum() const { return frame_num_; } + uint64_t getSentTime() const { return sent_time_; } + uint64_t getReceivedTime() const { return received_time_; } + uint32_t getFrameSize() const { return size_; } + + void setPreviousReceivedTime(uint64_t time) { + previous_received_time_ = time; + } + void setPreviousSentTime(uint64_t time) { previous_sent_time_ = time; } + + // todo manage first frame + double getReceivedDelta() { + return static_cast(received_time_ - previous_received_time_); + } + double getSentDelta() { + return static_cast(sent_time_ - previous_sent_time_); + } + + private: + uint32_t frame_num_; + uint64_t sent_time_; + uint64_t received_time_; + + uint64_t previous_sent_time_; + uint64_t previous_received_time_; + uint32_t size_; + + uint32_t received_pkt_m; + uint32_t burst_size_m; +}; + +class CongestionDetection { + public: + CongestionDetection(); + ~CongestionDetection(); + + void addPacket(const core::ContentObject &content_object); + + BandwidthUsage getState() { return cc_estimator_.State(); } + + void updateStats(); + + private: + TrendlineEstimator cc_estimator_; + std::map chunks_; + std::queue chunks_number_; + + FrameStats last_processed_chunk_; +}; + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/probe_handler.cc b/libtransport/src/protocols/rtc/probe_handler.cc new file mode 100644 index 000000000..efba362d4 --- /dev/null +++ b/libtransport/src/protocols/rtc/probe_handler.cc @@ -0,0 +1,107 @@ +/* + * Copyright (c) 2017-2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +namespace transport { + +namespace protocol { + +namespace rtc { + +ProbeHandler::ProbeHandler(SendProbeCallback &&send_callback, + asio::io_service &io_service) + : probe_interval_(0), + max_probes_(0), + sent_probes_(0), + probe_timer_(std::make_unique(io_service)), + rand_eng_((std::random_device())()), + distr_(MIN_RTT_PROBE_SEQ, MAX_RTT_PROBE_SEQ), + send_probe_callback_(std::move(send_callback)) {} + +ProbeHandler::~ProbeHandler() {} + +uint64_t ProbeHandler::getRtt(uint32_t seq) { + auto it = pending_probes_.find(seq); + + if (it == pending_probes_.end()) return 0; + + uint64_t now = std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + uint64_t rtt = now - it->second; + if(rtt < 1) rtt = 1; + + pending_probes_.erase(it); + + return rtt; +} + +void ProbeHandler::setProbes(uint32_t probe_interval, uint32_t max_probes) { + stopProbes(); + probe_interval_ = probe_interval; + max_probes_ = max_probes; +} + +void ProbeHandler::stopProbes() { + probe_interval_ = 0; + max_probes_ = 0; + sent_probes_ = 0; + probe_timer_->cancel(); +} + +void ProbeHandler::sendProbes() { + if (probe_interval_ == 0) return; + if (max_probes_ != 0 && sent_probes_ >= max_probes_) return; + + uint64_t now = std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + + uint32_t seq = distr_(rand_eng_); + pending_probes_.insert(std::pair(seq, now)); + send_probe_callback_(seq); + sent_probes_++; + + // clean up + // a probe may get lost. if the pending_probes_ size becomes bigger than + // MAX_PENDING_PROBES remove all the probes older than a seconds + if (pending_probes_.size() > MAX_PENDING_PROBES) { + for (auto it = pending_probes_.begin(); it != pending_probes_.end();) { + if ((now - it->second) > 1000) + it = pending_probes_.erase(it); + else + it++; + } + } + + if (probe_interval_ == 0) return; + + std::weak_ptr self(shared_from_this()); + probe_timer_->expires_from_now(std::chrono::microseconds(probe_interval_)); + probe_timer_->async_wait([self](std::error_code ec) { + if (ec) return; + if (auto s = self.lock()) { + s->sendProbes(); + } + }); +} + +} // namespace rtc + +} // namespace protocol + +} // namespace transport diff --git a/libtransport/src/protocols/rtc/probe_handler.h b/libtransport/src/protocols/rtc/probe_handler.h new file mode 100644 index 000000000..b8ed84445 --- /dev/null +++ b/libtransport/src/protocols/rtc/probe_handler.h @@ -0,0 +1,75 @@ +/* + * Copyright (c) 2017-2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once +#include + +#include +#include +#include +#include +#include + +namespace transport { + +namespace protocol { + +namespace rtc { + +class ProbeHandler : public std::enable_shared_from_this { + public: + using SendProbeCallback = std::function; + + public: + ProbeHandler(SendProbeCallback &&send_callback, + asio::io_service &io_service); + + ~ProbeHandler(); + + // if the function returns 0 the probe is not valaid + uint64_t getRtt(uint32_t seq); + + // reset the probes parameters. it stop the current probing. + // to restar call sendProbes. + // probe_interval = 0 means that no event will be scheduled + // max_probe = 0 means no limit to the number of probe to send + void setProbes(uint32_t probe_interval, uint32_t max_probes); + + // stop to schedule probes + void stopProbes(); + + void sendProbes(); + + private: + uint32_t probe_interval_; // us + uint32_t max_probes_; // packets + uint32_t sent_probes_; // packets + + std::unique_ptr probe_timer_; + + // map from seqnumber to timestamp + std::unordered_map pending_probes_; + + // random generator + std::default_random_engine rand_eng_; + std::uniform_int_distribution distr_; + + SendProbeCallback send_probe_callback_; +}; + +} // namespace rtc + +} // namespace protocol + +} // namespace transport diff --git a/libtransport/src/protocols/rtc/rtc.cc b/libtransport/src/protocols/rtc/rtc.cc new file mode 100644 index 000000000..46659ac74 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc.cc @@ -0,0 +1,607 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace transport { + +namespace protocol { + +namespace rtc { + +using namespace interface; + +RTCTransportProtocol::RTCTransportProtocol( + implementation::ConsumerSocket *icn_socket) + : TransportProtocol(icn_socket, nullptr), + DatagramReassembly(icn_socket, this), + number_(0) { + icn_socket->getSocketOption(PORTAL, portal_); + round_timer_ = std::make_unique(portal_->getIoService()); + scheduler_timer_ = + std::make_unique(portal_->getIoService()); +} + +RTCTransportProtocol::~RTCTransportProtocol() {} + +void RTCTransportProtocol::resume() { + if (is_running_) return; + + is_running_ = true; + + newRound(); + + portal_->runEventsLoop(); + is_running_ = false; +} + +// private +void RTCTransportProtocol::initParams() { + portal_->setConsumerCallback(this); + + rc_ = std::make_shared(); + ldr_ = std::make_shared( + std::bind(&RTCTransportProtocol::sendRtxInterest, this, + std::placeholders::_1), + portal_->getIoService()); + + state_ = std::make_shared( + std::bind(&RTCTransportProtocol::sendProbeInterest, this, + std::placeholders::_1), + std::bind(&RTCTransportProtocol::discoveredRtt, this), + portal_->getIoService()); + + rc_->setState(state_); + // TODO: for the moment we keep the congestion control disabled + // rc_->tunrOnRateControl(); + ldr_->setState(state_); + + // protocol state + start_send_interest_ = false; + current_state_ = SyncState::catch_up; + + // Cancel timer + number_++; + round_timer_->cancel(); + scheduler_timer_->cancel(); + scheduler_timer_on_ = false; + + // delete all timeouts and future nacks + timeouts_or_nacks_.clear(); + + // cwin vars + current_sync_win_ = INITIAL_WIN; + max_sync_win_ = INITIAL_WIN_MAX; + + // names/packets var + next_segment_ = 0; + + socket_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, + RTC_INTEREST_LIFETIME); +} + +// private +void RTCTransportProtocol::reset() { + TRANSPORT_LOGD("reset called"); + initParams(); + newRound(); +} + +void RTCTransportProtocol::inactiveProducer() { + // when the producer is inactive we reset the consumer state + // cwin vars + current_sync_win_ = INITIAL_WIN; + max_sync_win_ = INITIAL_WIN_MAX; + + TRANSPORT_LOGD("Current window: %u, max_sync_win_: %u", current_sync_win_, + max_sync_win_); + + // names/packets var + next_segment_ = 0; + + ldr_->clear(); +} + +void RTCTransportProtocol::newRound() { + round_timer_->expires_from_now(std::chrono::milliseconds(ROUND_LEN)); + // TODO pass weak_ptr here + round_timer_->async_wait([this, n{number_}](std::error_code ec) { + if (ec) return; + + if (n != number_) { + return; + } + + // saving counters that will be reset on new round + uint32_t sent_retx = state_->getSentRtxInRound(); + uint32_t received_bytes = state_->getReceivedBytesInRound(); + uint32_t sent_interest = state_->getSentInterestInRound(); + uint32_t lost_data = state_->getLostData(); + uint32_t recovered_losses = state_->getRecoveredLosses(); + uint32_t received_nacks = state_->getReceivedNacksInRound(); + + bool in_sync = (current_state_ == SyncState::in_sync); + state_->onNewRound((double)ROUND_LEN, in_sync); + rc_->onNewRound((double)ROUND_LEN); + + // update sync state if needed + if (current_state_ == SyncState::in_sync) { + double cache_rate = state_->getPacketFromCacheRatio(); + if (cache_rate > MAX_DATA_FROM_CACHE) { + current_state_ = SyncState::catch_up; + } + } else { + double target_rate = state_->getProducerRate() * PRODUCTION_RATE_FRACTION; + double received_rate = state_->getReceivedRate(); + uint32_t round_without_nacks = state_->getRoundsWithoutNacks(); + double cache_ratio = state_->getPacketFromCacheRatio(); + if (round_without_nacks >= ROUNDS_IN_SYNC_BEFORE_SWITCH && + received_rate >= target_rate && cache_ratio < MAX_DATA_FROM_CACHE) { + current_state_ = SyncState::in_sync; + } + } + + TRANSPORT_LOGD("Calling updateSyncWindow in newRound function"); + updateSyncWindow(); + + sendStatsToApp(sent_retx, received_bytes, sent_interest, lost_data, + recovered_losses, received_nacks); + newRound(); + }); +} + +void RTCTransportProtocol::discoveredRtt() { + start_send_interest_ = true; + ldr_->turnOnRTX(); + updateSyncWindow(); +} + +void RTCTransportProtocol::computeMaxSyncWindow() { + double production_rate = state_->getProducerRate(); + double packet_size = state_->getAveragePacketSize(); + if (production_rate == 0.0 || packet_size == 0.0) { + // the consumer has no info about the producer, + // keep the previous maxCWin + TRANSPORT_LOGD( + "Returning in computeMaxSyncWindow because: prod_rate: %d || " + "packet_size: %d", + (int)(production_rate == 0.0), (int)(packet_size == 0.0)); + return; + } + + uint32_t lifetime = default_values::interest_lifetime; + socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, + lifetime); + double lifetime_ms = (double)lifetime / MILLI_IN_A_SEC; + + + max_sync_win_ = + (uint32_t)ceil((production_rate * lifetime_ms * + INTEREST_LIFETIME_REDUCTION_FACTOR) / packet_size); + + max_sync_win_ = std::min(max_sync_win_, rc_->getCongesionWindow()); +} + +void RTCTransportProtocol::updateSyncWindow() { + computeMaxSyncWindow(); + + if (max_sync_win_ == INITIAL_WIN_MAX) { + if (TRANSPORT_EXPECT_FALSE(!state_->isProducerActive())) return; + + current_sync_win_ = INITIAL_WIN; + scheduleNextInterests(); + return; + } + + double prod_rate = state_->getProducerRate(); + double rtt = (double)state_->getRTT() / MILLI_IN_A_SEC; + double packet_size = state_->getAveragePacketSize(); + + // if some of the info are not available do not update the current win + if (prod_rate != 0.0 && rtt != 0.0 && packet_size != 0.0) { + current_sync_win_ = (uint32_t)ceil(prod_rate * rtt / packet_size); + current_sync_win_ += (uint32_t) + ceil(prod_rate * (PRODUCER_BUFFER_MS / MILLI_IN_A_SEC) / packet_size); + + if(current_state_ == SyncState::catch_up) { + current_sync_win_ = (uint32_t) (current_sync_win_ * CATCH_UP_WIN_INCREMENT); + } + + current_sync_win_ = std::min(current_sync_win_, max_sync_win_); + current_sync_win_ = std::max(current_sync_win_, WIN_MIN); + } + + scheduleNextInterests(); +} + +void RTCTransportProtocol::decreaseSyncWindow() { + // called on future nack + // we have a new sample of the production rate, so update max win first + computeMaxSyncWindow(); + current_sync_win_--; + current_sync_win_ = std::max(current_sync_win_, WIN_MIN); + scheduleNextInterests(); +} + +void RTCTransportProtocol::sendInterest(Name *interest_name) { + TRANSPORT_LOGD("Sending interest for name %s", + interest_name->toString().c_str()); + + auto interest = core::PacketManager<>::getInstance().getPacket(); + interest->setName(*interest_name); + + uint32_t lifetime = default_values::interest_lifetime; + socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, + lifetime); + interest->setLifetime(uint32_t(lifetime)); + + if (*on_interest_output_) { + (*on_interest_output_)(*socket_->getInterface(), *interest); + } + + if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) { + return; + } + + portal_->sendInterest(std::move(interest)); +} + +void RTCTransportProtocol::sendRtxInterest(uint32_t seq) { + if (!is_running_ && !is_first_) return; + + if(!start_send_interest_) return; + + Name *interest_name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, + &interest_name); + + TRANSPORT_LOGD("send rtx %u", seq); + interest_name->setSuffix(seq); + sendInterest(interest_name); +} + +void RTCTransportProtocol::sendProbeInterest(uint32_t seq) { + if (!is_running_ && !is_first_) return; + + Name *interest_name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, + &interest_name); + + TRANSPORT_LOGD("send probe %u", seq); + interest_name->setSuffix(seq); + sendInterest(interest_name); +} + +void RTCTransportProtocol::scheduleNextInterests() { + TRANSPORT_LOGD("Schedule next interests"); + + if (!is_running_ && !is_first_) return; + + if(!start_send_interest_) return; // RTT discovering phase is not finished so + // do not start to send interests + + if (scheduler_timer_on_) return; // wait befor send other interests + + if (TRANSPORT_EXPECT_FALSE(!state_->isProducerActive())) { + TRANSPORT_LOGD("Inactive producer."); + // here we keep seding the same interest until the producer + // does not start again + if (next_segment_ != 0) { + // the producer just become inactive, reset the state + inactiveProducer(); + } + + Name *interest_name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, + &interest_name); + + TRANSPORT_LOGD("send interest %u", next_segment_); + interest_name->setSuffix(next_segment_); + + if (portal_->interestIsPending(*interest_name)) { + // if interest 0 is already pending we return + return; + } + + sendInterest(interest_name); + state_->onSendNewInterest(interest_name); + return; + } + + TRANSPORT_LOGD("Pending interest number: %d -- current_sync_win_: %d", + state_->getPendingInterestNumber(), current_sync_win_); + + // skip nacked pacekts + if (next_segment_ <= state_->getLastSeqNacked()) { + next_segment_ = state_->getLastSeqNacked() + 1; + } + + // skipe received packets + if (next_segment_ <= state_->getHighestSeqReceivedInOrder()) { + next_segment_ = state_->getHighestSeqReceivedInOrder() + 1; + } + + uint32_t sent_interests = 0; + while ((state_->getPendingInterestNumber() < current_sync_win_) && + (sent_interests < MAX_INTERESTS_IN_BATCH)) { + TRANSPORT_LOGD("In while loop. Window size: %u", current_sync_win_); + Name *interest_name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, + &interest_name); + + interest_name->setSuffix(next_segment_); + + // send the packet only if: + // 1) it is not pending yet (not true for rtx) + // 2) the packet is not received or lost + // 3) is not in the rtx list + if (portal_->interestIsPending(*interest_name) || + state_->isReceivedOrLost(next_segment_) != PacketState::UNKNOWN || + ldr_->isRtx(next_segment_)) { + TRANSPORT_LOGD( + "skip interest %u because: pending %u, recv %u, rtx %u", + next_segment_, (portal_->interestIsPending(*interest_name)), + (state_->isReceivedOrLost(next_segment_) != PacketState::UNKNOWN), + (ldr_->isRtx(next_segment_))); + next_segment_ = (next_segment_ + 1) % MIN_PROBE_SEQ; + continue; + } + + + sent_interests++; + TRANSPORT_LOGD("send interest %u", next_segment_); + sendInterest(interest_name); + state_->onSendNewInterest(interest_name); + + next_segment_ = (next_segment_ + 1) % MIN_PROBE_SEQ; + } + + if (state_->getPendingInterestNumber() < current_sync_win_) { + // we still have space in the window but we already sent a batch of + // MAX_INTERESTS_IN_BATCH interest. for the following ones wait one + // WAIT_BETWEEN_INTEREST_BATCHES to avoid local packets drop + + scheduler_timer_on_ = true; + scheduler_timer_->expires_from_now( + std::chrono::microseconds(WAIT_BETWEEN_INTEREST_BATCHES)); + scheduler_timer_->async_wait([this](std::error_code ec) { + if (ec) return; + if (!scheduler_timer_on_) return; + + scheduler_timer_on_ = false; + scheduleNextInterests(); + }); + } +} + +void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) { + uint32_t segment_number = interest->getName().getSuffix(); + + TRANSPORT_LOGD("timeout for packet %u", segment_number); + + if (segment_number >= MIN_PROBE_SEQ) { + // this is a timeout on a probe, do nothing + return; + } + + timeouts_or_nacks_.insert(segment_number); + + if (TRANSPORT_EXPECT_TRUE(state_->isProducerActive()) && + segment_number <= state_->getHighestSeqReceivedInOrder()) { + // we retransmit packets only if the producer is active, otherwise we + // use timeouts to avoid to send too much traffic + // + // a timeout is sent using RTX only if it is an old packet. if it is for a + // seq number that we didn't reach yet, we send the packet using the normal + // schedule next interest + TRANSPORT_LOGD("handle timeout for packet %u using rtx", segment_number); + ldr_->onTimeout(segment_number); + state_->onTimeout(segment_number); + scheduleNextInterests(); + return; + } + + TRANSPORT_LOGD("handle timeout for packet %u using normal interests", + segment_number); + + if (segment_number < next_segment_) { + // this is a timeout for a packet that will be generated in the future but + // we are asking for higher sequence numbers. we need to go back like in the + // case of future nacks + TRANSPORT_LOGD("on timeout next seg = %u, jump to %u", + next_segment_, segment_number); + next_segment_ = segment_number; + } + + state_->onTimeout(segment_number); + scheduleNextInterests(); +} + +void RTCTransportProtocol::onNack(const ContentObject &content_object) { + struct nack_packet_t *nack = + (struct nack_packet_t *)content_object.getPayload()->data(); + uint32_t production_seg = nack->getProductionSegement(); + uint32_t nack_segment = content_object.getName().getSuffix(); + bool is_rtx = ldr_->isRtx(nack_segment); + + // check if the packet got a timeout + + TRANSPORT_LOGD("Nack received %u. Production segment: %u", nack_segment, + production_seg); + + bool compute_stats = true; + auto tn_it = timeouts_or_nacks_.find(nack_segment); + if (tn_it != timeouts_or_nacks_.end() || is_rtx) { + compute_stats = false; + // remove packets from timeouts_or_nacks only in case of a past nack + } + + state_->onNackPacketReceived(content_object, compute_stats); + ldr_->onNackPacketReceived(content_object); + + // both in case of past and future nack we set next_segment_ equal to the + // production segment in the nack. In case of past nack we will skip unneded + // interest (this is already done in the scheduleNextInterest in any case) + // while in case of future nacks we can go back in time and ask again for the + // content that generated the nack + TRANSPORT_LOGD("on nack next seg = %u, jump to %u", + next_segment_, production_seg); + next_segment_ = production_seg; + + if (production_seg > nack_segment) { + // remove the nack is it exists + if (tn_it != timeouts_or_nacks_.end()) timeouts_or_nacks_.erase(tn_it); + + // the client is asking for content in the past + // switch to catch up state and increase the window + // this is true only if the packet is not an RTX + if (!is_rtx) current_state_ = SyncState::catch_up; + + updateSyncWindow(); + } else { + // if production_seg == nack_segment we consider this a future nack, since + // production_seg is not yet created. this may happen in case of low + // production rate (e.g. ping at 1pps) + + // if a future nack was also retransmitted add it to the timeout_or_nacks + // set + if (is_rtx) timeouts_or_nacks_.insert(nack_segment); + + // the client is asking for content in the future + // switch to in sync state and decrease the window + current_state_ = SyncState::in_sync; + decreaseSyncWindow(); + } +} + +void RTCTransportProtocol::onProbe(const ContentObject &content_object) { + bool valid = state_->onProbePacketReceived(content_object); + if(!valid) return; + + struct nack_packet_t *probe = + (struct nack_packet_t *)content_object.getPayload()->data(); + uint32_t production_seg = probe->getProductionSegement(); + + // as for the nacks set next_segment_ + TRANSPORT_LOGD("on probe next seg = %u, jump to %u", + next_segment_, production_seg); + next_segment_ = production_seg; + + ldr_->onProbePacketReceived(content_object); + updateSyncWindow(); +} + +void RTCTransportProtocol::onContentObject(Interest &interest, + ContentObject &content_object) { + TRANSPORT_LOGD("Received content object of size: %zu", + content_object.payloadSize()); + uint32_t payload_size = (uint32_t) content_object.payloadSize(); + uint32_t segment_number = content_object.getName().getSuffix(); + + if (segment_number >= MIN_PROBE_SEQ) { + TRANSPORT_LOGD("Received probe %u", segment_number); + if (*on_content_object_input_) { + (*on_content_object_input_)(*socket_->getInterface(), content_object); + } + onProbe(content_object); + return; + } + + if (payload_size == NACK_HEADER_SIZE) { + TRANSPORT_LOGD("Received nack %u", segment_number); + if (*on_content_object_input_) { + (*on_content_object_input_)(*socket_->getInterface(), content_object); + } + onNack(content_object); + return; + } + + TRANSPORT_LOGD("Received content %u", segment_number); + + rc_->onDataPacketReceived(content_object); + bool compute_stats = true; + auto tn_it = timeouts_or_nacks_.find(segment_number); + if (tn_it != timeouts_or_nacks_.end()) { + compute_stats = false; + timeouts_or_nacks_.erase(tn_it); + } + if (ldr_->isRtx(segment_number)) { + compute_stats = false; + } + + // check if the packet was already received + PacketState state = state_->isReceivedOrLost(segment_number); + state_->onDataPacketReceived(content_object, compute_stats); + ldr_->onDataPacketReceived(content_object); + + // if the stat for this seq number is received do not send the packet to app + if (state != PacketState::RECEIVED) { + if (*on_content_object_input_) { + (*on_content_object_input_)(*socket_->getInterface(), content_object); + } + reassemble(content_object); + } else { + TRANSPORT_LOGD("Received duplicated content %u, drop it", segment_number); + } + + updateSyncWindow(); +} + +void RTCTransportProtocol::sendStatsToApp( + uint32_t retx_count, uint32_t received_bytes, uint32_t sent_interests, + uint32_t lost_data, uint32_t recovered_losses, uint32_t received_nacks) { + if (*stats_summary_) { + // Send the stats to the app + stats_->updateQueuingDelay(state_->getQueuing()); + + // stats_->updateInterestFecTx(0); //todo must be implemented + // stats_->updateBytesFecRecv(0); //todo must be implemented + + stats_->updateRetxCount(retx_count); + stats_->updateBytesRecv(received_bytes); + stats_->updateInterestTx(sent_interests); + stats_->updateReceivedNacks(received_nacks); + + stats_->updateAverageWindowSize(current_sync_win_); + stats_->updateLossRatio(state_->getLossRate()); + stats_->updateAverageRtt(state_->getRTT()); + stats_->updateLostData(lost_data); + stats_->updateRecoveredData(recovered_losses); + stats_->updateCCState((unsigned int)current_state_ ? 1 : 0); + (*stats_summary_)(*socket_->getInterface(), *stats_); + } +} + +void RTCTransportProtocol::reassemble(ContentObject &content_object) { + auto read_buffer = content_object.getPayload(); + TRANSPORT_LOGD("Size of payload: %zu", read_buffer->length()); + read_buffer->trimStart(DATA_HEADER_SIZE); + Reassembly::read_buffer_ = std::move(read_buffer); + Reassembly::notifyApplication(); +} + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc.h b/libtransport/src/protocols/rtc/rtc.h new file mode 100644 index 000000000..596887067 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc.h @@ -0,0 +1,113 @@ +/* + * Copyright (c) 2017-2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include +#include + +namespace transport { + +namespace protocol { + +namespace rtc { + +class RTCTransportProtocol : public TransportProtocol, + public DatagramReassembly { + public: + RTCTransportProtocol(implementation::ConsumerSocket *icnet_socket); + + ~RTCTransportProtocol(); + + using TransportProtocol::start; + + using TransportProtocol::stop; + + void resume() override; + + private: + enum class SyncState { catch_up = 0, in_sync = 1, last }; + + private: + // setup functions + void initParams(); + void reset() override; + + void inactiveProducer(); + + // protocol functions + void discoveredRtt(); + void newRound(); + + // window functions + void computeMaxSyncWindow(); + void updateSyncWindow(); + void decreaseSyncWindow(); + + // packet functions + void sendInterest(Name *interest_name); + void sendRtxInterest(uint32_t seq); + void sendProbeInterest(uint32_t seq); + void scheduleNextInterests() override; + void onTimeout(Interest::Ptr &&interest) override; + void onNack(const ContentObject &content_object); + void onProbe(const ContentObject &content_object); + void reassemble(ContentObject &content_object) override; + void onContentObject(Interest &interest, + ContentObject &content_object) override; + void onPacketDropped(Interest &interest, + ContentObject &content_object) override {} + void onReassemblyFailed(std::uint32_t missing_segment) override {} + + // interaction with app functions + void sendStatsToApp(uint32_t retx_count, uint32_t received_bytes, + uint32_t sent_interests, uint32_t lost_data, + uint32_t recovered_losses, uint32_t received_nacks); + // protocol state + bool start_send_interest_; + SyncState current_state_; + // cwin vars + uint32_t current_sync_win_; + uint32_t max_sync_win_; + + // controller var + std::unique_ptr round_timer_; + std::unique_ptr scheduler_timer_; + bool scheduler_timer_on_; + + // timeouts + std::unordered_set timeouts_or_nacks_; + + // names/packets var + uint32_t next_segment_; + + std::shared_ptr state_; + std::shared_ptr rc_; + std::shared_ptr ldr_; + + uint32_t number_; +}; + +} // namespace rtc + +} // namespace protocol + +} // namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_consts.h b/libtransport/src/protocols/rtc/rtc_consts.h new file mode 100644 index 000000000..e172fc7a1 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_consts.h @@ -0,0 +1,121 @@ +/* + * Copyright (c) 2017-2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +namespace transport { + +namespace protocol { + +namespace rtc { + +// used in rtc +// protocol consts +const uint32_t ROUND_LEN = 200; +// ms interval of time on which +// we take decisions / measurements +const double INTEREST_LIFETIME_REDUCTION_FACTOR = 0.8; +// how big (in ms) should be the buffer at the producer. +// increasing this number we increase the time that an +// interest will wait for the data packet to be produced +// at the producer socket +const uint32_t PRODUCER_BUFFER_MS = 200; // ms + +// interest scheduler +const uint32_t MAX_INTERESTS_IN_BATCH = 5; +const uint32_t WAIT_BETWEEN_INTEREST_BATCHES = 1000; // usec + +// packet const +const uint32_t HICN_HEADER_SIZE = 40 + 20; // IPv6 + TCP bytes +const uint32_t RTC_INTEREST_LIFETIME = 1000; + +// probes sequence range +const uint32_t MIN_PROBE_SEQ = 0xefffffff; +const uint32_t MIN_RTT_PROBE_SEQ = MIN_PROBE_SEQ; +const uint32_t MAX_RTT_PROBE_SEQ = 0xffffffff - 1; +// RTT_PROBE_INTERVAL will be used during the section while +// INIT_RTT_PROBE_INTERVAL is used at the beginning to +// quickily estimate the RTT +const uint32_t RTT_PROBE_INTERVAL = 200000; // us +const uint32_t INIT_RTT_PROBE_INTERVAL = 500; // us +const uint32_t INIT_RTT_PROBES = 40; // number of probes to init RTT +// if the produdcer is not yet started we need to probe multple times +// to get an answer. we wait 100ms between each try +const uint32_t INIT_RTT_PROBE_RESTART = 100; // ms +// once we get the first probe we wait at most 60ms for the others +const uint32_t INIT_RTT_PROBE_WAIT = 30; // ms +// we reuires at least 5 probes to be recevied +const uint32_t INIT_RTT_MIN_PROBES_TO_RECV = 5; //ms +const uint32_t MAX_PENDING_PROBES = 10; + + +// congestion +const double MAX_QUEUING_DELAY = 100.0; // ms + +// data from cache +const double MAX_DATA_FROM_CACHE = 0.25; // 25% + +// window const +const uint32_t INITIAL_WIN = 5; // pkts +const uint32_t INITIAL_WIN_MAX = 1000000; // pkts +const uint32_t WIN_MIN = 5; // pkts +const double CATCH_UP_WIN_INCREMENT = 1.2; +// used in rate control +const double WIN_DECREASE_FACTOR = 0.5; +const double WIN_INCREASE_FACTOR = 1.5; + +// round in congestion +const double ROUNDS_BEFORE_TAKE_ACTION = 5; + +// used in state +const uint8_t ROUNDS_IN_SYNC_BEFORE_SWITCH = 3; +const double PRODUCTION_RATE_FRACTION = 0.8; + +const uint32_t INIT_PACKET_SIZE = 1200; + +const double MOVING_AVG_ALPHA = 0.8; + +const double MILLI_IN_A_SEC = 1000.0; +const double MICRO_IN_A_SEC = 1000000.0; + +const double MAX_CACHED_PACKETS = 262144; // 2^18 + // about 50 sec of traffic at 50Mbps + // with 1200 bytes packets + +const uint32_t MAX_ROUND_WHIOUT_PACKETS = (const uint32_t) + (20 * MILLI_IN_A_SEC) / ROUND_LEN; // 20 sec in rounds; + +// used in ldr +const uint32_t RTC_MAX_RTX = 100; +const uint32_t RTC_MAX_AGE = 60000; // in ms +const uint64_t MAX_TIMER_RTX = ~0; +const uint32_t SENTINEL_TIMER_INTERVAL = 100; // ms +const uint32_t MAX_RTX_WITH_SENTINEL = 10; // packets +const double CATCH_UP_RTT_INCREMENT = 1.2; + +// used by producer +const uint32_t PRODUCER_STATS_INTERVAL = 200; // ms +const uint32_t MIN_PRODUCTION_RATE = 10; // pps + // min prod rate + // set running several test + +} // namespace rtc + +} // namespace protocol + +} // namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_data_path.cc b/libtransport/src/protocols/rtc/rtc_data_path.cc new file mode 100644 index 000000000..a545225cb --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_data_path.cc @@ -0,0 +1,197 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include +#include +#include + +#define MAX_ROUNDS_WITHOUT_PKTS 10 // 2sec + +namespace transport { + +namespace protocol { + +namespace rtc { + +RTCDataPath::RTCDataPath(uint32_t path_id) + : path_id_(path_id), + min_rtt(UINT_MAX), + prev_min_rtt(UINT_MAX), + min_owd(INT_MAX), // this is computed like in LEDBAT, so it is not the + // real OWD, but the measured one, that depends on the + // clock of sender and receiver. the only meaningful + // value is is the queueing delay. for this reason we + // keep both RTT (for the windowd calculation) and OWD + // (for congestion/quality control) + prev_min_owd(INT_MAX), + avg_owd(DBL_MAX), + queuing_delay(DBL_MAX), + jitter_(0.0), + last_owd_(0), + largest_recv_seq_(0), + largest_recv_seq_time_(0), + avg_inter_arrival_(DBL_MAX), + received_nacks_(false), + received_packets_(false), + rounds_without_packets_(0), + last_received_data_packet_(0), + RTT_history_(HISTORY_LEN), + OWD_history_(HISTORY_LEN){}; + +void RTCDataPath::insertRttSample(uint64_t rtt) { + // for the rtt we only keep track of the min one + if (rtt < min_rtt) min_rtt = rtt; + last_received_data_packet_ = + std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); +} + +void RTCDataPath::insertOwdSample(int64_t owd) { + // for owd we use both min and avg + if (owd < min_owd) min_owd = owd; + + if (avg_owd != DBL_MAX) + avg_owd = (avg_owd * (1 - ALPHA_RTC)) + (owd * ALPHA_RTC); + else { + avg_owd = (double)owd; + } + + int64_t queueVal = owd - std::min(getMinOwd(), min_owd); + + if (queuing_delay != DBL_MAX) + queuing_delay = (queuing_delay * (1 - ALPHA_RTC)) + (queueVal * ALPHA_RTC); + else { + queuing_delay = (double)queueVal; + } + + // keep track of the jitter computed as for RTP (RFC 3550) + int64_t diff = std::abs(owd - last_owd_); + last_owd_ = owd; + jitter_ += (1.0 / 16.0) * ((double)diff - jitter_); + + // owd is computed only for valid data packets so we count only + // this for decide if we recevie traffic or not + received_packets_ = true; +} + +void RTCDataPath::computeInterArrivalGap(uint32_t segment_number) { + // got packet in sequence, compute gap + if (largest_recv_seq_ == (segment_number - 1)) { + uint64_t now = std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + uint64_t delta = now - largest_recv_seq_time_; + largest_recv_seq_ = segment_number; + largest_recv_seq_time_ = now; + if (avg_inter_arrival_ == DBL_MAX) + avg_inter_arrival_ = (double)delta; + else + avg_inter_arrival_ = + (avg_inter_arrival_ * (1 - ALPHA_RTC)) + (delta * ALPHA_RTC); + return; + } + + // ooo packet, update the stasts if needed + if (largest_recv_seq_ <= segment_number) { + largest_recv_seq_ = segment_number; + largest_recv_seq_time_ = + std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + } +} + +void RTCDataPath::receivedNack() { received_nacks_ = true; } + +double RTCDataPath::getInterArrivalGap() { + if (avg_inter_arrival_ == DBL_MAX) return 0; + return avg_inter_arrival_; +} + +bool RTCDataPath::isActive() { + if (received_nacks_ && rounds_without_packets_ < MAX_ROUNDS_WITHOUT_PKTS) + return true; + return false; +} + +bool RTCDataPath::pathToProducer() { + if (received_nacks_) return true; + return false; +} + +void RTCDataPath::roundEnd() { + // reset min_rtt and add it to the history + if (min_rtt != UINT_MAX) { + prev_min_rtt = min_rtt; + } else { + // this may happen if we do not receive any packet + // from this path in the last round. in this case + // we use the measure from the previuos round + min_rtt = prev_min_rtt; + } + + if (min_rtt == 0) min_rtt = 1; + + RTT_history_.pushBack(min_rtt); + min_rtt = UINT_MAX; + + // do the same for min owd + if (min_owd != INT_MAX) { + prev_min_owd = min_owd; + } else { + min_owd = prev_min_owd; + } + + if (min_owd != INT_MAX) { + OWD_history_.pushBack(min_owd); + min_owd = INT_MAX; + } + + if (!received_packets_) + rounds_without_packets_++; + else + rounds_without_packets_ = 0; + received_packets_ = false; +} + +uint32_t RTCDataPath::getPathId() { return path_id_; } + +double RTCDataPath::getQueuingDealy() { return queuing_delay; } + +uint64_t RTCDataPath::getMinRtt() { + if (RTT_history_.size() != 0) return RTT_history_.begin(); + return 0; +} + +int64_t RTCDataPath::getMinOwd() { + if (OWD_history_.size() != 0) return OWD_history_.begin(); + return 0; +} + +double RTCDataPath::getJitter() { return jitter_; } + +uint64_t RTCDataPath::getLastPacketTS() { return last_received_data_packet_; } + +void RTCDataPath::clearRtt() { RTT_history_.clear(); } + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_data_path.h b/libtransport/src/protocols/rtc/rtc_data_path.h new file mode 100644 index 000000000..c5c37fc0d --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_data_path.h @@ -0,0 +1,97 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include + +namespace transport { + +namespace protocol { + +namespace rtc { + +const double ALPHA_RTC = 0.125; +const uint32_t HISTORY_LEN = 20; // 4 sec + +class RTCDataPath { + public: + RTCDataPath(uint32_t path_id); + + public: + void insertRttSample(uint64_t rtt); + void insertOwdSample(int64_t owd); + void computeInterArrivalGap(uint32_t segment_number); + void receivedNack(); + + uint32_t getPathId(); + uint64_t getMinRtt(); + double getQueuingDealy(); + double getInterArrivalGap(); + double getJitter(); + bool isActive(); + bool pathToProducer(); + uint64_t getLastPacketTS(); + + void clearRtt(); + + void roundEnd(); + + private: + uint32_t path_id_; + + int64_t getMinOwd(); + + uint64_t min_rtt; + uint64_t prev_min_rtt; + + int64_t min_owd; + int64_t prev_min_owd; + + double avg_owd; + + double queuing_delay; + + double jitter_; + int64_t last_owd_; + + uint32_t largest_recv_seq_; + uint64_t largest_recv_seq_time_; + double avg_inter_arrival_; + + // flags to check if a path is active + // we considere a path active if it reaches a producer + //(not a cache) --aka we got at least one nack on this path-- + // and if we receives packets + bool received_nacks_; + bool received_packets_; + uint8_t rounds_without_packets_; // if we don't get any packet + // for MAX_ROUNDS_WITHOUT_PKTS + // we consider the path inactive + uint64_t last_received_data_packet_; // timestamp for the last data received + // on this path + + utils::MinFilter RTT_history_; + utils::MinFilter OWD_history_; +}; + +} // namespace rtc + +} // namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_ldr.cc b/libtransport/src/protocols/rtc/rtc_ldr.cc new file mode 100644 index 000000000..0ef381fe1 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_ldr.cc @@ -0,0 +1,427 @@ +/* + * Copyright (c) 2017-2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include +#include + +namespace transport { + +namespace protocol { + +namespace rtc { + +RTCLossDetectionAndRecovery::RTCLossDetectionAndRecovery( + SendRtxCallback &&callback, asio::io_service &io_service) + : rtx_on_(false), + next_rtx_timer_(MAX_TIMER_RTX), + last_event_(0), + sentinel_timer_interval_(MAX_TIMER_RTX), + send_rtx_callback_(std::move(callback)) { + timer_ = std::make_unique(io_service); + sentinel_timer_ = std::make_unique(io_service); +} + +RTCLossDetectionAndRecovery::~RTCLossDetectionAndRecovery() {} + +void RTCLossDetectionAndRecovery::turnOnRTX() { + rtx_on_ = true; + scheduleSentinelTimer((uint32_t)(state_->getRTT() * CATCH_UP_RTT_INCREMENT)); +} + +void RTCLossDetectionAndRecovery::turnOffRTX() { + rtx_on_ = false; + clear(); +} + +void RTCLossDetectionAndRecovery::onTimeout(uint32_t seq) { + // always add timeouts to the RTX list to avoid to send the same packet as if + // it was not a rtx + addToRetransmissions(seq, seq + 1); + last_event_ = getNow(); +} + +void RTCLossDetectionAndRecovery::onDataPacketReceived( + const core::ContentObject &content_object) { + last_event_ = getNow(); + + uint32_t seq = content_object.getName().getSuffix(); + if (deleteRtx(seq)) { + state_->onPacketRecovered(seq); + } else { + if (TRANSPORT_EXPECT_FALSE(!rtx_on_)) return; // do not add if RTX is off + TRANSPORT_LOGD("received data. add from %u to %u ", + state_->getHighestSeqReceivedInOrder() + 1, seq); + addToRetransmissions(state_->getHighestSeqReceivedInOrder() + 1, seq); + } +} + +void RTCLossDetectionAndRecovery::onNackPacketReceived( + const core::ContentObject &nack) { + last_event_ = getNow(); + + uint32_t seq = nack.getName().getSuffix(); + + if (TRANSPORT_EXPECT_FALSE(!rtx_on_)) return; // do not add if RTX is off + + struct nack_packet_t *nack_pkt = + (struct nack_packet_t *)nack.getPayload()->data(); + uint32_t production_seq = nack_pkt->getProductionSegement(); + + if (production_seq > seq) { + // this is a past nack, all data before productionSeq are lost. if + // productionSeq > state_->getHighestSeqReceivedInOrder() is impossible to + // recover any packet. If this is not the case we can try to recover the + // packets between state_->getHighestSeqReceivedInOrder() and productionSeq. + // e.g.: the client receives packets 8 10 11 9 where 9 is a nack with + // productionSeq = 14. 9 is lost but we can try to recover packets 12 13 and + // 14 that are not arrived yet + deleteRtx(seq); + TRANSPORT_LOGD("received past nack. add from %u to %u ", + state_->getHighestSeqReceivedInOrder() + 1, production_seq); + addToRetransmissions(state_->getHighestSeqReceivedInOrder() + 1, + production_seq); + } else { + // future nack. here there should be a gap between the last data received + // and this packet and is it possible to recover the packets between the + // last received data and the production seq. we should not use the seq + // number of the nack since we know that is too early to ask for this seq + // number + // e.g.: // e.g.: the client receives packets 10 11 12 20 where 20 is a nack + // with productionSeq = 18. this says that all the packets between 12 and 18 + // may got lost and we should ask them + deleteRtx(seq); + TRANSPORT_LOGD("received futrue nack. add from %u to %u ", + state_->getHighestSeqReceivedInOrder() + 1, production_seq); + addToRetransmissions(state_->getHighestSeqReceivedInOrder() + 1, + production_seq); + } +} + +void RTCLossDetectionAndRecovery::onProbePacketReceived( + const core::ContentObject &probe) { + // we don't log the reception of a probe packet for the sentinel timer because + // probes are not taken into account into the sync window. we use them as + // future nacks to detect possible packets lost + if (TRANSPORT_EXPECT_FALSE(!rtx_on_)) return; // do not add if RTX is off + struct nack_packet_t *probe_pkt = + (struct nack_packet_t *)probe.getPayload()->data(); + uint32_t production_seq = probe_pkt->getProductionSegement(); + TRANSPORT_LOGD("received probe. add from %u to %u ", + state_->getHighestSeqReceivedInOrder() + 1, production_seq); + addToRetransmissions(state_->getHighestSeqReceivedInOrder() + 1, + production_seq); +} + +void RTCLossDetectionAndRecovery::clear() { + rtx_state_.clear(); + rtx_timers_.clear(); + sentinel_timer_->cancel(); + if (next_rtx_timer_ != MAX_TIMER_RTX) { + next_rtx_timer_ = MAX_TIMER_RTX; + timer_->cancel(); + } +} + +void RTCLossDetectionAndRecovery::addToRetransmissions(uint32_t start, + uint32_t stop) { + // skip nacked packets + if (start <= state_->getLastSeqNacked()) { + start = state_->getLastSeqNacked() + 1; + } + + // skip received or lost packets + if (start <= state_->getHighestSeqReceivedInOrder()) { + start = state_->getHighestSeqReceivedInOrder() + 1; + } + + for (uint32_t seq = start; seq < stop; seq++) { + if (!isRtx(seq) && // is not already an rtx + // is not received or lost + state_->isReceivedOrLost(seq) == PacketState::UNKNOWN) { + // add rtx + rtxState state; + state.first_send_ = state_->getInterestSentTime(seq); + if (state.first_send_ == 0) // this interest was never sent before + state.first_send_ = getNow(); + state.next_send_ = computeNextSend(seq, true); + state.rtx_count_ = 0; + TRANSPORT_LOGD("add %u to retransmissions. next rtx is %lu ", seq, + (state.next_send_ - getNow())); + rtx_state_.insert(std::pair(seq, state)); + rtx_timers_.insert(std::pair(state.next_send_, seq)); + } + } + scheduleNextRtx(); +} + +uint64_t RTCLossDetectionAndRecovery::computeNextSend(uint32_t seq, + bool new_rtx) { + uint64_t now = getNow(); + if (new_rtx) { + // for the new rtx we wait one estimated IAT after the loss detection. this + // is bacause, assuming that packets arrive with a constant IAT, we should + // get a new packet every IAT + double prod_rate = state_->getProducerRate(); + uint32_t estimated_iat = SENTINEL_TIMER_INTERVAL; + uint32_t jitter = 0; + + if (prod_rate != 0) { + double packet_size = state_->getAveragePacketSize(); + estimated_iat = (uint32_t)ceil(1000.0 / (prod_rate / packet_size)); + jitter = (uint32_t)ceil(state_->getJitter()); + } + + uint32_t wait = estimated_iat + jitter; + TRANSPORT_LOGD("first rtx for %u in %u ms, rtt = %lu ait = %u jttr = %u", + seq, wait, state_->getRTT(), estimated_iat, jitter); + + return now + wait; + } else { + // wait one RTT + // however if the IAT is larger than the RTT, wait one IAT + uint32_t wait = SENTINEL_TIMER_INTERVAL; + + double prod_rate = state_->getProducerRate(); + if (prod_rate == 0) { + return now + SENTINEL_TIMER_INTERVAL; + } + + double packet_size = state_->getAveragePacketSize(); + uint32_t estimated_iat = (uint32_t)ceil(1000.0 / (prod_rate / packet_size)); + + uint64_t rtt = state_->getRTT(); + if (rtt == 0) rtt = SENTINEL_TIMER_INTERVAL; + wait = (uint32_t)rtt; + + if (estimated_iat > rtt) wait = estimated_iat; + + uint32_t jitter = (uint32_t)ceil(state_->getJitter()); + wait += jitter; + + // it may happen that the channel is congested and we have some additional + // queuing delay to take into account + uint32_t queue = (uint32_t)ceil(state_->getQueuing()); + wait += queue; + + TRANSPORT_LOGD( + "next rtx for %u in %u ms, rtt = %lu ait = %u jttr = %u queue = %u", + seq, wait, state_->getRTT(), estimated_iat, jitter, queue); + + return now + wait; + } +} + +void RTCLossDetectionAndRecovery::retransmit() { + if (rtx_timers_.size() == 0) return; + + uint64_t now = getNow(); + + auto it = rtx_timers_.begin(); + std::unordered_set lost_pkt; + uint32_t sent_counter = 0; + while (it != rtx_timers_.end() && it->first <= now && + sent_counter < MAX_INTERESTS_IN_BATCH) { + uint32_t seq = it->second; + auto rtx_it = + rtx_state_.find(seq); // this should always return a valid iter + if (rtx_it->second.rtx_count_ >= RTC_MAX_RTX || + (now - rtx_it->second.first_send_) >= RTC_MAX_AGE || + seq < state_->getLastSeqNacked()) { + // max rtx reached or packet too old or packet nacked, this packet is lost + TRANSPORT_LOGD( + "packet %u lost because 1) max rtx: %u 2) max age: %u 3) naked: %u", + seq, (rtx_it->second.rtx_count_ >= RTC_MAX_RTX), + ((now - rtx_it->second.first_send_) >= RTC_MAX_AGE), + (seq < state_->getLastSeqNacked())); + lost_pkt.insert(seq); + it++; + } else { + // resend the packet + state_->onRetransmission(seq); + double prod_rate = state_->getProducerRate(); + if (prod_rate != 0) rtx_it->second.rtx_count_++; + rtx_it->second.next_send_ = computeNextSend(seq, false); + it = rtx_timers_.erase(it); + rtx_timers_.insert( + std::pair(rtx_it->second.next_send_, seq)); + TRANSPORT_LOGD("send rtx for sequence %u, next send in %lu", seq, + (rtx_it->second.next_send_ - now)); + send_rtx_callback_(seq); + sent_counter++; + } + } + + // remove packets if needed + for (auto lost_it = lost_pkt.begin(); lost_it != lost_pkt.end(); lost_it++) { + uint32_t seq = *lost_it; + state_->onPacketLost(seq); + deleteRtx(seq); + } +} + +void RTCLossDetectionAndRecovery::scheduleNextRtx() { + if (rtx_timers_.size() == 0) { + // all the rtx were removed, reset timer + next_rtx_timer_ = MAX_TIMER_RTX; + return; + } + + // check if timer is alreay set + if (next_rtx_timer_ != MAX_TIMER_RTX) { + // a new check for rtx is already scheduled + if (next_rtx_timer_ > rtx_timers_.begin()->first) { + // we need to re-schedule it + timer_->cancel(); + } else { + // wait for the next timer + return; + } + } + + // set a new timer + next_rtx_timer_ = rtx_timers_.begin()->first; + uint64_t now = std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + uint64_t wait = 1; + if (next_rtx_timer_ != MAX_TIMER_RTX && next_rtx_timer_ > now) + wait = next_rtx_timer_ - now; + + std::weak_ptr self(shared_from_this()); + timer_->expires_from_now(std::chrono::milliseconds(wait)); + timer_->async_wait([self](std::error_code ec) { + if (ec) return; + if (auto s = self.lock()) { + s->retransmit(); + s->next_rtx_timer_ = MAX_TIMER_RTX; + s->scheduleNextRtx(); + } + }); +} + +bool RTCLossDetectionAndRecovery::deleteRtx(uint32_t seq) { + auto it_rtx = rtx_state_.find(seq); + if (it_rtx == rtx_state_.end()) return false; // rtx not found + + uint64_t ts = it_rtx->second.next_send_; + auto it_timers = rtx_timers_.find(ts); + while (it_timers != rtx_timers_.end() && it_timers->first == ts) { + if (it_timers->second == seq) { + rtx_timers_.erase(it_timers); + break; + } + it_timers++; + } + + bool lost = it_rtx->second.rtx_count_ > 0; + rtx_state_.erase(it_rtx); + + return lost; +} + +void RTCLossDetectionAndRecovery::scheduleSentinelTimer( + uint64_t expires_from_now) { + std::weak_ptr self(shared_from_this()); + sentinel_timer_->expires_from_now( + std::chrono::milliseconds(expires_from_now)); + sentinel_timer_->async_wait([self](std::error_code ec) { + if (ec) return; + if (auto s = self.lock()) { + s->sentinelTimer(); + } + }); +} + +void RTCLossDetectionAndRecovery::sentinelTimer() { + uint64_t now = getNow(); + + bool expired = false; + bool sent = false; + if ((now - last_event_) >= sentinel_timer_interval_) { + // at least a sentinel_timer_interval_ elapsed since last event + expired = true; + if (TRANSPORT_EXPECT_FALSE(!state_->isProducerActive())) { + // this happens at the beginning (or if the producer stops for some + // reason) we need to keep sending interest 0 until we get an answer + TRANSPORT_LOGD( + "sentinel timer: the producer is not active, send packet 0"); + state_->onRetransmission(0); + send_rtx_callback_(0); + } else { + TRANSPORT_LOGD( + "sentinel timer: the producer is active, send the 10 oldest packets"); + sent = true; + uint32_t rtx = 0; + auto it = state_->getPendingInterestsMapBegin(); + auto end = state_->getPendingInterestsMapEnd(); + while (it != end && rtx < MAX_RTX_WITH_SENTINEL) { + uint32_t seq = it->first; + TRANSPORT_LOGD("sentinel timer, add %u to the rtx list", seq); + addToRetransmissions(seq, seq + 1); + rtx++; + it++; + } + } + } else { + // sentinel timer did not expire because we registered at least one event + } + + uint32_t next_timer; + double prod_rate = state_->getProducerRate(); + if (TRANSPORT_EXPECT_FALSE(!state_->isProducerActive()) || prod_rate == 0) { + TRANSPORT_LOGD("next timer in %u", SENTINEL_TIMER_INTERVAL); + next_timer = SENTINEL_TIMER_INTERVAL; + } else { + double prod_rate = state_->getProducerRate(); + double packet_size = state_->getAveragePacketSize(); + uint32_t estimated_iat = (uint32_t)ceil(1000.0 / (prod_rate / packet_size)); + uint32_t jitter = (uint32_t)ceil(state_->getJitter()); + + // try to reduce the number of timers if the estimated IAT is too small + next_timer = std::max((estimated_iat + jitter) * 20, (uint32_t)1); + TRANSPORT_LOGD("next sentinel in %u ms, rate: %f, iat: %u, jitter: %u", + next_timer, ((prod_rate * 8.0) / 1000000.0), estimated_iat, + jitter); + + if (!expired) { + // discount the amout of time that is already passed + uint32_t discount = (uint32_t)(now - last_event_); + if (next_timer > discount) { + next_timer = next_timer - discount; + } else { + // in this case we trigger the timer in 1 ms + next_timer = 1; + } + TRANSPORT_LOGD("timer after discout: %u", next_timer); + } else if (sent) { + // wait at least one producer stats interval + owd to check if the + // production rate is reducing. + uint32_t min_wait = PRODUCER_STATS_INTERVAL + (uint32_t)ceil(state_->getQueuing()); + next_timer = std::max(next_timer, min_wait); + TRANSPORT_LOGD("wait for updates from prod, next timer: %u", next_timer); + } + } + + scheduleSentinelTimer(next_timer); +} + +} // namespace rtc + +} // namespace protocol + +} // namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_ldr.h b/libtransport/src/protocols/rtc/rtc_ldr.h new file mode 100644 index 000000000..c0912303b --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_ldr.h @@ -0,0 +1,108 @@ +/* + * Copyright (c) 2017-2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +namespace transport { + +namespace protocol { + +namespace rtc { + +class RTCLossDetectionAndRecovery + : public std::enable_shared_from_this { + struct rtx_state_ { + uint64_t first_send_; + uint64_t next_send_; + uint32_t rtx_count_; + }; + + using rtxState = struct rtx_state_; + using SendRtxCallback = std::function; + + public: + RTCLossDetectionAndRecovery(SendRtxCallback &&callback, + asio::io_service &io_service); + + ~RTCLossDetectionAndRecovery(); + + void setState(std::shared_ptr state) { state_ = state; } + void turnOnRTX(); + void turnOffRTX(); + + void onTimeout(uint32_t seq); + void onDataPacketReceived(const core::ContentObject &content_object); + void onNackPacketReceived(const core::ContentObject &nack); + void onProbePacketReceived(const core::ContentObject &probe); + + void clear(); + + bool isRtx(uint32_t seq) { + if (rtx_state_.find(seq) != rtx_state_.end()) return true; + return false; + } + + private: + void addToRetransmissions(uint32_t start, uint32_t stop); + uint64_t computeNextSend(uint32_t seq, bool new_rtx); + void retransmit(); + void scheduleNextRtx(); + bool deleteRtx(uint32_t seq); + void scheduleSentinelTimer(uint64_t expires_from_now); + void sentinelTimer(); + + uint64_t getNow() { + using namespace std::chrono; + uint64_t now = + duration_cast(steady_clock::now().time_since_epoch()) + .count(); + return now; + } + + // this map keeps track of the retransmitted interest, ordered from the oldest + // to the newest one. the state contains the timer of the first send of the + // interest (from pendingIntetests_), the timer of the next send (key of the + // multimap) and the number of rtx + std::map rtx_state_; + // this map stored the rtx by timer. The key is the time at which the rtx + // should be sent, and the val is the interest seq number + std::multimap rtx_timers_; + + bool rtx_on_; + uint64_t next_rtx_timer_; + uint64_t last_event_; + uint64_t sentinel_timer_interval_; + std::unique_ptr timer_; + std::unique_ptr sentinel_timer_; + std::shared_ptr state_; + + SendRtxCallback send_rtx_callback_; +}; + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_packet.h b/libtransport/src/protocols/rtc/rtc_packet.h new file mode 100644 index 000000000..2f2b19fb9 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_packet.h @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + */ + +/* data packet + * +-----------------------------------------+ + * | uint64_t: timestamp | + * | | + * +-----------------------------------------+ + * | uint32_t: prod rate (bytes per sec) | + * +-----------------------------------------+ + * | payload | + * | ... | + */ + +/* nack packet + * +-----------------------------------------+ + * | uint64_t: timestamp | + * | | + * +-----------------------------------------+ + * | uint32_t: prod rate (bytes per sec) | + * +-----------------------------------------+ + * | uint32_t: current seg in production | + * +-----------------------------------------+ + */ + +#pragma once +#ifndef _WIN32 +#include +#else +#include +#endif + +namespace transport { + +namespace protocol { + +namespace rtc { + +inline uint64_t _ntohll(const uint64_t *input) { + uint64_t return_val; + uint8_t *tmp = (uint8_t *)&return_val; + + tmp[0] = (uint8_t)(*input >> 56); + tmp[1] = (uint8_t)(*input >> 48); + tmp[2] = (uint8_t)(*input >> 40); + tmp[3] = (uint8_t)(*input >> 32); + tmp[4] = (uint8_t)(*input >> 24); + tmp[5] = (uint8_t)(*input >> 16); + tmp[6] = (uint8_t)(*input >> 8); + tmp[7] = (uint8_t)(*input >> 0); + + return return_val; +} + +inline uint64_t _htonll(const uint64_t *input) { return (_ntohll(input)); } + +const uint32_t DATA_HEADER_SIZE = 12; // bytes + // XXX: sizeof(data_packet_t) is 16 + // beacuse of padding +const uint32_t NACK_HEADER_SIZE = 16; + +struct data_packet_t { + uint64_t timestamp; + uint32_t prod_rate; + + inline uint64_t getTimestamp() const { return _ntohll(×tamp); } + inline void setTimestamp(uint64_t time) { timestamp = _htonll(&time); } + + inline uint32_t getProductionRate() const { return ntohl(prod_rate); } + inline void setProductionRate(uint32_t rate) { prod_rate = htonl(rate); } +}; + +struct nack_packet_t { + uint64_t timestamp; + uint32_t prod_rate; + uint32_t prod_seg; + + inline uint64_t getTimestamp() const { return _ntohll(×tamp); } + inline void setTimestamp(uint64_t time) { timestamp = _htonll(&time); } + + inline uint32_t getProductionRate() const { return ntohl(prod_rate); } + inline void setProductionRate(uint32_t rate) { prod_rate = htonl(rate); } + + inline uint32_t getProductionSegement() const { return ntohl(prod_seg); } + inline void setProductionSegement(uint32_t seg) { prod_seg = htonl(seg); } +}; + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport \ No newline at end of file diff --git a/libtransport/src/protocols/rtc/rtc_rc.h b/libtransport/src/protocols/rtc/rtc_rc.h new file mode 100644 index 000000000..34d090092 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_rc.h @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2017-2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include + +namespace transport { + +namespace protocol { + +namespace rtc { + +class RTCRateControl : public std::enable_shared_from_this { + public: + RTCRateControl() + : rc_on_(false), + congestion_win_(1000000), // init the win to a large number + congestion_state_(CongestionState::Normal), + protocol_state_(nullptr) {} + + virtual ~RTCRateControl() = default; + + void turnOnRateControl() { rc_on_ = true; } + void setState(std::shared_ptr state) { protocol_state_ = state; }; + uint32_t getCongesionWindow() { return congestion_win_; }; + + virtual void onNewRound(double round_len) = 0; + virtual void onDataPacketReceived( + const core::ContentObject &content_object) = 0; + + protected: + enum class CongestionState { Normal = 0, Underuse = 1, Congested = 2, Last }; + + protected: + bool rc_on_; + uint32_t congestion_win_; + CongestionState congestion_state_; + + std::shared_ptr protocol_state_; +}; + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_rc_frame.cc b/libtransport/src/protocols/rtc/rtc_rc_frame.cc new file mode 100644 index 000000000..b577b5bea --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_rc_frame.cc @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2017-2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include + +namespace transport { + +namespace protocol { + +namespace rtc { + +RTCRateControlFrame::RTCRateControlFrame() : cc_detector_() {} + +RTCRateControlFrame::~RTCRateControlFrame() {} + +void RTCRateControlFrame::onNewRound(double round_len) { + if (!rc_on_) return; + + CongestionState prev_congestion_state = congestion_state_; + cc_detector_.updateStats(); + congestion_state_ = (CongestionState)cc_detector_.getState(); + + if (congestion_state_ == CongestionState::Congested) { + if (prev_congestion_state == CongestionState::Normal) { + // congestion detected, notify app and init congestion win + double prod_rate = protocol_state_->getReceivedRate(); + double rtt = (double)protocol_state_->getRTT() / MILLI_IN_A_SEC; + double packet_size = protocol_state_->getAveragePacketSize(); + + if (prod_rate == 0.0 || rtt == 0.0 || packet_size == 0.0) { + // TODO do something + return; + } + + congestion_win_ = (uint32_t)ceil(prod_rate * rtt / packet_size); + } + uint32_t win = congestion_win_ * WIN_DECREASE_FACTOR; + congestion_win_ = std::max(win, WIN_MIN); + return; + } +} + +void RTCRateControlFrame::onDataPacketReceived( + const core::ContentObject &content_object) { + if (!rc_on_) return; + + uint32_t seq = content_object.getName().getSuffix(); + if (!protocol_state_->isPending(seq)) return; + + cc_detector_.addPacket(content_object); +} + +void RTCRateControlFrame::receivedBwProbeTrain(uint64_t firts_probe_ts, + uint64_t last_probe_ts, + uint32_t total_probes) { + // TODO + return; +} + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_rc_frame.h b/libtransport/src/protocols/rtc/rtc_rc_frame.h new file mode 100644 index 000000000..25d5ddbb6 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_rc_frame.h @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2017-2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include +#include + +namespace transport { + +namespace protocol { + +namespace rtc { + +class RTCRateControlFrame : public RTCRateControl { + public: + RTCRateControlFrame(); + + ~RTCRateControlFrame(); + + void onNewRound(double round_len); + void onDataPacketReceived(const core::ContentObject &content_object); + + void receivedBwProbeTrain(uint64_t firts_probe_ts, uint64_t last_probe_ts, + uint32_t total_probes); + + private: + CongestionDetection cc_detector_; +}; + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_rc_queue.cc b/libtransport/src/protocols/rtc/rtc_rc_queue.cc new file mode 100644 index 000000000..3c7318dae --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_rc_queue.cc @@ -0,0 +1,106 @@ +/* + * Copyright (c) 2017-2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include + +namespace transport { + +namespace protocol { + +namespace rtc { + +RTCRateControlQueue::RTCRateControlQueue() + : rounds_since_last_drop_(0), + rounds_without_congestion_(0), + last_queue_(0) {} + +RTCRateControlQueue::~RTCRateControlQueue() {} + +void RTCRateControlQueue::onNewRound(double round_len) { + if (!rc_on_) return; + + double received_rate = protocol_state_->getReceivedRate(); + double target_rate = + protocol_state_->getProducerRate() * PRODUCTION_RATE_FRACTION; + double rtt = (double)protocol_state_->getRTT() / MILLI_IN_A_SEC; + double packet_size = protocol_state_->getAveragePacketSize(); + double queue = protocol_state_->getQueuing(); + + if (rtt == 0.0) return; // no info from the producer + + CongestionState prev_congestion_state = congestion_state_; + + if (prev_congestion_state == CongestionState::Normal && + received_rate >= target_rate) { + // if the queue is high in this case we are most likelly fighting with + // a TCP flow and there is enough bandwidth to match the producer rate + congestion_state_ = CongestionState::Normal; + } else if (queue > MAX_QUEUING_DELAY || last_queue_ == queue) { + // here we detect congestion. in the case that last_queue == queue + // the consumer didn't receive any packet from the producer so we + // consider this case as congestion + // TODO: wath happen in case of high loss rate? + congestion_state_ = CongestionState::Congested; + } else { + // nothing bad is happening + congestion_state_ = CongestionState::Normal; + } + + last_queue_ = queue; + + if (congestion_state_ == CongestionState::Congested) { + if (prev_congestion_state == CongestionState::Normal) { + // init the congetion window using the received rate + congestion_win_ = (uint32_t)ceil(received_rate * rtt / packet_size); + rounds_since_last_drop_ = (uint32_t)ROUNDS_BEFORE_TAKE_ACTION + 1; + } + + if (rounds_since_last_drop_ >= ROUNDS_BEFORE_TAKE_ACTION) { + uint32_t win = congestion_win_ * (uint32_t)WIN_DECREASE_FACTOR; + congestion_win_ = std::max(win, WIN_MIN); + rounds_since_last_drop_ = 0; + return; + } + + rounds_since_last_drop_++; + } + + if (congestion_state_ == CongestionState::Normal) { + if (prev_congestion_state == CongestionState::Congested) { + rounds_without_congestion_ = 0; + } + + rounds_without_congestion_++; + if (rounds_without_congestion_ < ROUNDS_BEFORE_TAKE_ACTION) return; + + congestion_win_ = congestion_win_ * (uint32_t)WIN_INCREASE_FACTOR; + congestion_win_ = std::min(congestion_win_, INITIAL_WIN_MAX); + } +} + +void RTCRateControlQueue::onDataPacketReceived( + const core::ContentObject &content_object) { + // nothing to do + return; +} + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_rc_queue.h b/libtransport/src/protocols/rtc/rtc_rc_queue.h new file mode 100644 index 000000000..407354d43 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_rc_queue.h @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2017-2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include +#include + +namespace transport { + +namespace protocol { + +namespace rtc { + +class RTCRateControlQueue : public RTCRateControl { + public: + RTCRateControlQueue(); + + ~RTCRateControlQueue(); + + void onNewRound(double round_len); + void onDataPacketReceived(const core::ContentObject &content_object); + + auto shared_from_this() { return utils::shared_from(this); } + + private: + uint32_t rounds_since_last_drop_; + uint32_t rounds_without_congestion_; + double last_queue_; +}; + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_state.cc b/libtransport/src/protocols/rtc/rtc_state.cc new file mode 100644 index 000000000..9c965bfed --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_state.cc @@ -0,0 +1,560 @@ +/* + * Copyright (c) 2017-2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +namespace transport { + +namespace protocol { + +namespace rtc { + +RTCState::RTCState(ProbeHandler::SendProbeCallback &&rtt_probes_callback, + DiscoveredRttCallback &&discovered_rtt_callback, + asio::io_service &io_service) + : rtt_probes_(std::make_shared( + std::move(rtt_probes_callback), io_service)), + discovered_rtt_callback_(std::move(discovered_rtt_callback)) { + init_rtt_timer_ = std::make_unique(io_service); + initParams(); +} + +RTCState::~RTCState() {} + +void RTCState::initParams() { + // packets counters (total) + sent_interests_ = 0; + sent_rtx_ = 0; + received_data_ = 0; + received_nacks_ = 0; + received_timeouts_ = 0; + received_probes_ = 0; + + // loss counters + packets_lost_ = 0; + losses_recovered_ = 0; + first_seq_in_round_ = 0; + highest_seq_received_ = 0; + highest_seq_received_in_order_ = 0; + last_seq_nacked_ = 0; + loss_rate_ = 0.0; + residual_loss_rate_ = 0.0; + + // bw counters + received_bytes_ = 0; + avg_packet_size_ = INIT_PACKET_SIZE; + production_rate_ = 0.0; + received_rate_ = 0.0; + + // nack counter + nack_on_last_round_ = false; + received_nacks_last_round_ = 0; + + // packets counter + received_packets_last_round_ = 0; + received_data_last_round_ = 0; + received_data_from_cache_ = 0; + data_from_cache_rate_ = 0; + sent_interests_last_round_ = 0; + sent_rtx_last_round_ = 0; + + // round conunters + rounds_ = 0; + rounds_without_nacks_ = 0; + rounds_without_packets_ = 0; + + last_production_seq_ = 0; + producer_is_active_ = false; + last_prod_update_ = 0; + + // paths stats + path_table_.clear(); + main_path_ = nullptr; + + // packet received + received_or_lost_packets_.clear(); + + // pending interests + pending_interests_.clear(); + + // init rtt + first_interest_sent_ = ~0; + init_rtt_ = false; + rtt_probes_->setProbes(INIT_RTT_PROBE_INTERVAL, INIT_RTT_PROBES); + rtt_probes_->sendProbes(); + setInitRttTimer(INIT_RTT_PROBE_RESTART); +} + +// packet events +void RTCState::onSendNewInterest(const core::Name *interest_name) { + uint64_t now = std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + uint32_t seq = interest_name->getSuffix(); + pending_interests_.insert(std::pair(seq, now)); + + if(sent_interests_ == 0) first_interest_sent_ = now; + + sent_interests_++; + sent_interests_last_round_++; +} + +void RTCState::onTimeout(uint32_t seq) { + auto it = pending_interests_.find(seq); + if (it != pending_interests_.end()) { + pending_interests_.erase(it); + } + received_timeouts_++; +} + +void RTCState::onRetransmission(uint32_t seq) { + // remove the interest for the pendingInterest map only after the first rtx. + // in this way we can handle the ooo packets that come in late as normla + // packet. we consider a packet lost only if we sent at least an RTX for it. + // XXX this may become problematic if we stop the RTX transmissions + auto it = pending_interests_.find(seq); + if (it != pending_interests_.end()) { + pending_interests_.erase(it); + packets_lost_++; + } + sent_rtx_++; + sent_rtx_last_round_++; +} + +void RTCState::onDataPacketReceived(const core::ContentObject &content_object, + bool compute_stats) { + uint32_t seq = content_object.getName().getSuffix(); + if (compute_stats) { + updatePathStats(content_object, false); + received_data_last_round_++; + } + received_data_++; + + struct data_packet_t *data_pkt = + (struct data_packet_t *)content_object.getPayload()->data(); + uint64_t production_time = data_pkt->getTimestamp(); + if (last_prod_update_ < production_time) { + last_prod_update_ = production_time; + uint32_t production_rate = data_pkt->getProductionRate(); + production_rate_ = (double)production_rate; + } + + updatePacketSize(content_object); + updateReceivedBytes(content_object); + addRecvOrLost(seq, PacketState::RECEIVED); + + if (seq > highest_seq_received_) highest_seq_received_ = seq; + + // the producer is responding + // it is generating valid data packets so we consider it active + producer_is_active_ = true; + + received_packets_last_round_++; +} + +void RTCState::onNackPacketReceived(const core::ContentObject &nack, + bool compute_stats) { + uint32_t seq = nack.getName().getSuffix(); + struct nack_packet_t *nack_pkt = + (struct nack_packet_t *)nack.getPayload()->data(); + uint64_t production_time = nack_pkt->getTimestamp(); + uint32_t production_seq = nack_pkt->getProductionSegement(); + uint32_t production_rate = nack_pkt->getProductionRate(); + + if (TRANSPORT_EXPECT_FALSE(main_path_ == nullptr) || + last_prod_update_ < production_time) { + // update production rate + last_prod_update_ = production_time; + last_production_seq_ = production_seq; + production_rate_ = (double)production_rate; + } + + if (compute_stats) { + // this is not an RTX + updatePathStats(nack, true); + nack_on_last_round_ = true; + } + + // for statistics pourpose we log all nacks, also the one received for + // retransmitted packets + received_nacks_++; + received_nacks_last_round_++; + + if (production_seq > seq) { + // old nack, seq is lost + // update last nacked + if (last_seq_nacked_ < seq) last_seq_nacked_ = seq; + TRANSPORT_LOGD("lost packet %u beacuse of a past nack", seq); + onPacketLost(seq); + } else if (seq > production_seq) { + // future nack + // remove the nack from the pending interest map + // (the packet is not received/lost yet) + pending_interests_.erase(seq); + } else { + // this should be a quite rear event. simply remove the + // packet from the pending interest list + pending_interests_.erase(seq); + } + + // the producer is responding + // we consider it active only if the production rate is not 0 + // or the production sequence number is not 1 + if (production_rate_ != 0 || production_seq != 1) { + producer_is_active_ = true; + } + + received_packets_last_round_++; +} + +void RTCState::onPacketLost(uint32_t seq) { + TRANSPORT_LOGD("packet %u is lost", seq); + auto it = pending_interests_.find(seq); + if (it != pending_interests_.end()) { + // this packet was never retransmitted so it does + // not appear in the loss count + packets_lost_++; + } + addRecvOrLost(seq, PacketState::LOST); +} + +void RTCState::onPacketRecovered(uint32_t seq) { + losses_recovered_++; + addRecvOrLost(seq, PacketState::RECEIVED); +} + +bool RTCState::onProbePacketReceived(const core::ContentObject &probe) { + uint32_t seq = probe.getName().getSuffix(); + uint64_t rtt; + + rtt = rtt_probes_->getRtt(seq); + + if (rtt == 0) return false; // this is not a valid probe + + // like for data and nacks update the path stats. Here the RTT is computed + // by the probe handler. Both probes for rtt and bw are good to esimate + // info on the path + uint32_t path_label = probe.getPathLabel(); + + auto path_it = path_table_.find(path_label); + + // update production rate and last_seq_nacked like in case of a nack + struct nack_packet_t *probe_pkt = + (struct nack_packet_t *)probe.getPayload()->data(); + uint64_t sender_timestamp = probe_pkt->getTimestamp(); + uint32_t production_seq = probe_pkt->getProductionSegement(); + uint32_t production_rate = probe_pkt->getProductionRate(); + + + if (path_it == path_table_.end()) { + // found a new path + std::shared_ptr newPath = + std::make_shared(path_label); + auto ret = path_table_.insert( + std::pair>(path_label, newPath)); + path_it = ret.first; + } + + auto path = path_it->second; + + path->insertRttSample(rtt); + path->receivedNack(); + + uint64_t now = std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + + int64_t OWD = now - sender_timestamp; + path->insertOwdSample(OWD); + + if (last_prod_update_ < sender_timestamp) { + last_production_seq_ = production_seq; + last_prod_update_ = sender_timestamp; + production_rate_ = (double)production_rate; + } + + // the producer is responding + // we consider it active only if the production rate is not 0 + // or the production sequence numner is not 1 + if (production_rate_ != 0 || production_seq != 1) { + producer_is_active_ = true; + } + + // check for init RTT. if received_probes_ is equal to 0 schedule a timer to + // wait for the INIT_RTT_PROBES. in this way if some probes get lost we don't + // wait forever + received_probes_++; + + if(!init_rtt_ && received_probes_ <= INIT_RTT_PROBES){ + if(received_probes_ == 1){ + // we got the first probe, wait at most INIT_RTT_PROBE_WAIT sec for the others + main_path_ = path; + setInitRttTimer(INIT_RTT_PROBE_WAIT); + } + if(received_probes_ == INIT_RTT_PROBES) { + // we are done + init_rtt_timer_->cancel(); + checkInitRttTimer(); + } + } + + received_packets_last_round_++; + + // ignore probes sent before the first interest + if((now - rtt) <= first_interest_sent_) return false; + return true; +} + +void RTCState::onNewRound(double round_len, bool in_sync) { + // XXX + // here we take into account only the single path case so we assume that we + // don't use two paths in parellel for this single flow + + if (path_table_.empty()) return; + + double bytes_per_sec = + ((double)received_bytes_ * (MILLI_IN_A_SEC / round_len)); + if(received_rate_ == 0) + received_rate_ = bytes_per_sec; + else + received_rate_ = (received_rate_ * MOVING_AVG_ALPHA) + + ((1 - MOVING_AVG_ALPHA) * bytes_per_sec); + + // search for an active path. There should be only one active path (meaning a + // path that leads to the producer socket -no cache- and from which we are + // currently getting data packets) at any time. However it may happen that + // there are mulitple active paths in case of mobility (the old path will + // remain active for a short ammount of time). The main path is selected as + // the active path from where the consumer received the latest data packet + + uint64_t last_packet_ts = 0; + main_path_ = nullptr; + + for (auto it = path_table_.begin(); it != path_table_.end(); it++) { + it->second->roundEnd(); + if (it->second->isActive()) { + uint64_t ts = it->second->getLastPacketTS(); + if (ts > last_packet_ts) { + last_packet_ts = ts; + main_path_ = it->second; + } + } + } + + if (in_sync) updateLossRate(); + + // handle nacks + if (!nack_on_last_round_ && received_bytes_ > 0) { + rounds_without_nacks_++; + } else { + rounds_without_nacks_ = 0; + } + + // check if the producer is active + if (received_packets_last_round_ != 0) { + rounds_without_packets_ = 0; + } else { + rounds_without_packets_++; + if (rounds_without_packets_ >= MAX_ROUND_WHIOUT_PACKETS && + producer_is_active_ != false) { + initParams(); + } + } + + // compute cache/producer ratio + if (received_data_last_round_ != 0) { + double new_rate = + (double)received_data_from_cache_ / (double)received_data_last_round_; + data_from_cache_rate_ = data_from_cache_rate_ * MOVING_AVG_ALPHA + + (new_rate * (1 - MOVING_AVG_ALPHA)); + } + + // reset counters + received_bytes_ = 0; + packets_lost_ = 0; + losses_recovered_ = 0; + first_seq_in_round_ = highest_seq_received_; + + nack_on_last_round_ = false; + received_nacks_last_round_ = 0; + + received_packets_last_round_ = 0; + received_data_last_round_ = 0; + received_data_from_cache_ = 0; + sent_interests_last_round_ = 0; + sent_rtx_last_round_ = 0; + + rounds_++; +} + +void RTCState::updateReceivedBytes(const core::ContentObject &content_object) { + received_bytes_ += + (uint32_t)(content_object.headerSize() + content_object.payloadSize()); +} + +void RTCState::updatePacketSize(const core::ContentObject &content_object) { + uint32_t pkt_size = + (uint32_t)(content_object.headerSize() + content_object.payloadSize()); + avg_packet_size_ = (MOVING_AVG_ALPHA * avg_packet_size_) + + ((1 - MOVING_AVG_ALPHA) * pkt_size); +} + +void RTCState::updatePathStats(const core::ContentObject &content_object, + bool is_nack) { + // get packet path + uint32_t path_label = content_object.getPathLabel(); + auto path_it = path_table_.find(path_label); + + if (path_it == path_table_.end()) { + // found a new path + std::shared_ptr newPath = + std::make_shared(path_label); + auto ret = path_table_.insert( + std::pair>(path_label, newPath)); + path_it = ret.first; + } + + auto path = path_it->second; + + // compute rtt + uint32_t seq = content_object.getName().getSuffix(); + uint64_t interest_sent_time = getInterestSentTime(seq); + if (interest_sent_time == 0) + return; // this should not happen, + // it means that we are processing an interest + // that is not pending + + uint64_t now = std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + + uint64_t RTT = now - interest_sent_time; + + path->insertRttSample(RTT); + + // compute OWD (the first part of the nack and data packet header are the + // same, so we cast to data data packet) + struct data_packet_t *packet = + (struct data_packet_t *)content_object.getPayload()->data(); + uint64_t sender_timestamp = packet->getTimestamp(); + int64_t OWD = now - sender_timestamp; + path->insertOwdSample(OWD); + + // compute IAT or set path to producer + if (!is_nack) { + // compute the iat only for the content packets + uint32_t segment_number = content_object.getName().getSuffix(); + path->computeInterArrivalGap(segment_number); + if (!path->pathToProducer()) received_data_from_cache_++; + } else { + path->receivedNack(); + } +} + +void RTCState::updateLossRate() { + loss_rate_ = 0.0; + residual_loss_rate_ = 0.0; + + uint32_t number_theorically_received_packets_ = + highest_seq_received_ - first_seq_in_round_; + + // in this case no new packet was recevied after the previuos round, avoid + // division by 0 + if (number_theorically_received_packets_ == 0) return; + + loss_rate_ = (double)((double)(packets_lost_) / + (double)number_theorically_received_packets_); + + residual_loss_rate_ = (double)((double)(packets_lost_ - losses_recovered_) / + (double)number_theorically_received_packets_); + + if (residual_loss_rate_ < 0) residual_loss_rate_ = 0; +} + +void RTCState::addRecvOrLost(uint32_t seq, PacketState state) { + pending_interests_.erase(seq); + if (received_or_lost_packets_.size() >= MAX_CACHED_PACKETS) { + received_or_lost_packets_.erase(received_or_lost_packets_.begin()); + } + // notice that it may happen that a packet that we consider lost arrives after + // some time, in this case we simply overwrite the packet state. + received_or_lost_packets_[seq] = state; + + // keep track of the last packet received/lost + // without holes. + if (highest_seq_received_in_order_ < last_seq_nacked_) { + highest_seq_received_in_order_ = last_seq_nacked_; + } + + if ((highest_seq_received_in_order_ + 1) == seq) { + highest_seq_received_in_order_ = seq; + } else if (seq <= highest_seq_received_in_order_) { + // here we do nothing + } else if (seq > highest_seq_received_in_order_) { + // 1) there is a gap in the sequence so we do not update largest_in_seq_ + // 2) all the packets from largest_in_seq_ to seq are in + // received_or_lost_packets_ an we upate largest_in_seq_ + + for (uint32_t i = highest_seq_received_in_order_ + 1; i <= seq; i++) { + if (received_or_lost_packets_.find(i) == + received_or_lost_packets_.end()) { + break; + } + // this packet is in order so we can update the + // highest_seq_received_in_order_ + highest_seq_received_in_order_ = i; + } + } +} + +void RTCState::setInitRttTimer(uint32_t wait){ + init_rtt_timer_->cancel(); + init_rtt_timer_->expires_from_now(std::chrono::milliseconds(wait)); + init_rtt_timer_->async_wait([this](std::error_code ec) { + if(ec) return; + checkInitRttTimer(); + }); +} + +void RTCState::checkInitRttTimer() { + if(received_probes_ < INIT_RTT_MIN_PROBES_TO_RECV){ + // we didn't received enough probes, restart + received_probes_ = 0; + rtt_probes_->setProbes(INIT_RTT_PROBE_INTERVAL, INIT_RTT_PROBES); + rtt_probes_->sendProbes(); + setInitRttTimer(INIT_RTT_PROBE_RESTART); + return; + } + init_rtt_ = true; + main_path_->roundEnd(); + rtt_probes_->setProbes(RTT_PROBE_INTERVAL, 0); + rtt_probes_->sendProbes(); + + // init last_seq_nacked_. skip packets that may come from the cache + double prod_rate = getProducerRate(); + double rtt = (double)getRTT() / MILLI_IN_A_SEC; + double packet_size = getAveragePacketSize(); + uint32_t pkt_in_rtt_ = (uint32_t)std::floor(((prod_rate / packet_size) * rtt) * 0.8); + last_seq_nacked_ = last_production_seq_ + pkt_in_rtt_; + + discovered_rtt_callback_(); +} + +} // namespace rtc + +} // namespace protocol + +} // namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_state.h b/libtransport/src/protocols/rtc/rtc_state.h new file mode 100644 index 000000000..e4fefaffe --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_state.h @@ -0,0 +1,253 @@ +/* + * Copyright (c) 2017-2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +namespace transport { + +namespace protocol { + +namespace rtc { + +enum class PacketState : uint8_t { RECEIVED, LOST, UNKNOWN }; + +class RTCState : std::enable_shared_from_this { + public: + using DiscoveredRttCallback = std::function; + public: + RTCState(ProbeHandler::SendProbeCallback &&rtt_probes_callback, + DiscoveredRttCallback &&discovered_rtt_callback, + asio::io_service &io_service); + + ~RTCState(); + + // packet events + void onSendNewInterest(const core::Name *interest_name); + void onTimeout(uint32_t seq); + void onRetransmission(uint32_t seq); + void onDataPacketReceived(const core::ContentObject &content_object, + bool compute_stats); + void onNackPacketReceived(const core::ContentObject &nack, + bool compute_stats); + void onPacketLost(uint32_t seq); + void onPacketRecovered(uint32_t seq); + bool onProbePacketReceived(const core::ContentObject &probe); + + // protocol state + void onNewRound(double round_len, bool in_sync); + + // main path + uint32_t getProducerPath() const { + if (mainPathIsValid()) return main_path_->getPathId(); + return 0; + } + + // delay metrics + bool isRttDiscovered() const { + return init_rtt_; + } + + uint64_t getRTT() const { + if (mainPathIsValid()) return main_path_->getMinRtt(); + return 0; + } + void resetRttStats() { + if (mainPathIsValid()) main_path_->clearRtt(); + } + + double getQueuing() const { + if (mainPathIsValid()) return main_path_->getQueuingDealy(); + return 0.0; + } + double getIAT() const { + if (mainPathIsValid()) return main_path_->getInterArrivalGap(); + return 0.0; + } + + double getJitter() const { + if (mainPathIsValid()) return main_path_->getJitter(); + return 0.0; + } + + // pending interests + uint64_t getInterestSentTime(uint32_t seq) { + auto it = pending_interests_.find(seq); + if (it != pending_interests_.end()) return it->second; + return 0; + } + bool isPending(uint32_t seq) { + if (pending_interests_.find(seq) != pending_interests_.end()) return true; + return false; + } + uint32_t getPendingInterestNumber() const { + return (uint32_t)pending_interests_.size(); + } + PacketState isReceivedOrLost(uint32_t seq) { + auto it = received_or_lost_packets_.find(seq); + if (it != received_or_lost_packets_.end()) return it->second; + return PacketState::UNKNOWN; + } + + // loss rate + double getLossRate() const { return loss_rate_; } + double getResidualLossRate() const { return residual_loss_rate_; } + uint32_t getHighestSeqReceivedInOrder() const { + return highest_seq_received_in_order_; + } + uint32_t getLostData() const { return packets_lost_; }; + uint32_t getRecoveredLosses() const { return losses_recovered_; } + + // generic stats + uint32_t getReceivedBytesInRound() const { return received_bytes_; } + uint32_t getReceivedNacksInRound() const { + return received_nacks_last_round_; + } + uint32_t getSentInterestInRound() const { return sent_interests_last_round_; } + uint32_t getSentRtxInRound() const { return sent_rtx_last_round_; } + + // bandwidth/production metrics + double getAvailableBw() const { return 0.0; }; // TODO + double getProducerRate() const { return production_rate_; } + double getReceivedRate() const { return received_rate_; } + double getAveragePacketSize() const { return avg_packet_size_; } + + // nacks + uint32_t getRoundsWithoutNacks() const { return rounds_without_nacks_; } + uint32_t getLastSeqNacked() const { return last_seq_nacked_; } + + // producer state + bool isProducerActive() const { return producer_is_active_; } + + // packets from cache + double getPacketFromCacheRatio() const { return data_from_cache_rate_; } + + std::map::iterator getPendingInterestsMapBegin() { + return pending_interests_.begin(); + } + std::map::iterator getPendingInterestsMapEnd() { + return pending_interests_.end(); + } + + private: + void initParams(); + + // update stats + void updateState(); + void updateReceivedBytes(const core::ContentObject &content_object); + void updatePacketSize(const core::ContentObject &content_object); + void updatePathStats(const core::ContentObject &content_object, bool is_nack); + void updateLossRate(); + + void addRecvOrLost(uint32_t seq, PacketState state); + + void setInitRttTimer(uint32_t wait); + void checkInitRttTimer(); + + bool mainPathIsValid() const { + if (main_path_ != nullptr) + return true; + else + return false; + } + + // packets counters (total) + uint32_t sent_interests_; + uint32_t sent_rtx_; + uint32_t received_data_; + uint32_t received_nacks_; + uint32_t received_timeouts_; + uint32_t received_probes_; + + // loss counters + int32_t packets_lost_; + int32_t losses_recovered_; + uint32_t first_seq_in_round_; + uint32_t highest_seq_received_; + uint32_t highest_seq_received_in_order_; + uint32_t last_seq_nacked_; // segment for which we got an oldNack + double loss_rate_; + double residual_loss_rate_; + + // bw counters + uint32_t received_bytes_; + double avg_packet_size_; + double production_rate_; // rate communicated by the producer using nacks + double received_rate_; // rate recevied by the consumer + + // nack counter + // the bool takes tracks only about the valid nacks (no rtx) and it is used to + // switch between the states. Instead received_nacks_last_round_ logs all the + // nacks for statistics + bool nack_on_last_round_; + uint32_t received_nacks_last_round_; + + // packets counter + uint32_t received_packets_last_round_; + uint32_t received_data_last_round_; + uint32_t received_data_from_cache_; + double data_from_cache_rate_; + uint32_t sent_interests_last_round_; + uint32_t sent_rtx_last_round_; + + // round conunters + uint32_t rounds_; + uint32_t rounds_without_nacks_; + uint32_t rounds_without_packets_; + + // init rtt + uint64_t first_interest_sent_; + + // producer state + bool + producer_is_active_; // the prodcuer is active if we receive some packets + uint32_t last_production_seq_; // last production seq received by the producer + uint64_t last_prod_update_; // timestamp of the last packets used to update + // stats from the producer + + // paths stats + std::unordered_map> path_table_; + std::shared_ptr main_path_; + + // packet received + // cache where to store info about the last MAX_CACHED_PACKETS + std::map received_or_lost_packets_; + + // pending interests + std::map pending_interests_; + + // probes + std::shared_ptr rtt_probes_; + bool init_rtt_; + std::unique_ptr init_rtt_timer_; + + // callbacks + DiscoveredRttCallback discovered_rtt_callback_; +}; + +} // namespace rtc + +} // namespace protocol + +} // namespace transport diff --git a/libtransport/src/protocols/rtc/trendline_estimator.cc b/libtransport/src/protocols/rtc/trendline_estimator.cc new file mode 100644 index 000000000..7a0803857 --- /dev/null +++ b/libtransport/src/protocols/rtc/trendline_estimator.cc @@ -0,0 +1,334 @@ +/* + * Copyright (c) 2016 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +// FROM +// https://source.chromium.org/chromium/chromium/src/+/master:third_party/webrtc/modules/congestion_controller/goog_cc/trendline_estimator.cc + +#include "trendline_estimator.h" + +#include + +#include +#include + +namespace transport { + +namespace protocol { + +namespace rtc { + +// Parameters for linear least squares fit of regression line to noisy data. +constexpr double kDefaultTrendlineSmoothingCoeff = 0.9; +constexpr double kDefaultTrendlineThresholdGain = 4.0; +// const char kBweWindowSizeInPacketsExperiment[] = +// "WebRTC-BweWindowSizeInPackets"; + +/*size_t ReadTrendlineFilterWindowSize( + const WebRtcKeyValueConfig* key_value_config) { + std::string experiment_string = + key_value_config->Lookup(kBweWindowSizeInPacketsExperiment); + size_t window_size; + int parsed_values = + sscanf(experiment_string.c_str(), "Enabled-%zu", &window_size); + if (parsed_values == 1) { + if (window_size > 1) + return window_size; + RTC_LOG(WARNING) << "Window size must be greater than 1."; + } + RTC_LOG(LS_WARNING) << "Failed to parse parameters for BweWindowSizeInPackets" + " experiment from field trial string. Using default."; + return TrendlineEstimatorSettings::kDefaultTrendlineWindowSize; +} +*/ + +OptionalDouble LinearFitSlope( + const std::deque& packets) { + // RTC_DCHECK(packets.size() >= 2); + // Compute the "center of mass". + double sum_x = 0; + double sum_y = 0; + for (const auto& packet : packets) { + sum_x += packet.arrival_time_ms; + sum_y += packet.smoothed_delay_ms; + } + double x_avg = sum_x / packets.size(); + double y_avg = sum_y / packets.size(); + // Compute the slope k = \sum (x_i-x_avg)(y_i-y_avg) / \sum (x_i-x_avg)^2 + double numerator = 0; + double denominator = 0; + for (const auto& packet : packets) { + double x = packet.arrival_time_ms; + double y = packet.smoothed_delay_ms; + numerator += (x - x_avg) * (y - y_avg); + denominator += (x - x_avg) * (x - x_avg); + } + if (denominator == 0) return OptionalDouble(); + return OptionalDouble(numerator / denominator); +} + +OptionalDouble ComputeSlopeCap( + const std::deque& packets, + const TrendlineEstimatorSettings& settings) { + /*RTC_DCHECK(1 <= settings.beginning_packets && + settings.beginning_packets < packets.size()); + RTC_DCHECK(1 <= settings.end_packets && + settings.end_packets < packets.size()); + RTC_DCHECK(settings.beginning_packets + settings.end_packets <= + packets.size());*/ + TrendlineEstimator::PacketTiming early = packets[0]; + for (size_t i = 1; i < settings.beginning_packets; ++i) { + if (packets[i].raw_delay_ms < early.raw_delay_ms) early = packets[i]; + } + size_t late_start = packets.size() - settings.end_packets; + TrendlineEstimator::PacketTiming late = packets[late_start]; + for (size_t i = late_start + 1; i < packets.size(); ++i) { + if (packets[i].raw_delay_ms < late.raw_delay_ms) late = packets[i]; + } + if (late.arrival_time_ms - early.arrival_time_ms < 1) { + return OptionalDouble(); + } + return OptionalDouble((late.raw_delay_ms - early.raw_delay_ms) / + (late.arrival_time_ms - early.arrival_time_ms) + + settings.cap_uncertainty); +} + +constexpr double kMaxAdaptOffsetMs = 15.0; +constexpr double kOverUsingTimeThreshold = 10; +constexpr int kMinNumDeltas = 60; +constexpr int kDeltaCounterMax = 1000; + +//} // namespace + +constexpr char TrendlineEstimatorSettings::kKey[]; + +TrendlineEstimatorSettings::TrendlineEstimatorSettings( + /*const WebRtcKeyValueConfig* key_value_config*/) { + /*if (absl::StartsWith( + key_value_config->Lookup(kBweWindowSizeInPacketsExperiment), + "Enabled")) { + window_size = ReadTrendlineFilterWindowSize(key_value_config); + } + Parser()->Parse(key_value_config->Lookup(TrendlineEstimatorSettings::kKey));*/ + window_size = kDefaultTrendlineWindowSize; + enable_cap = false; + beginning_packets = end_packets = 0; + cap_uncertainty = 0.0; + + /*if (window_size < 10 || 200 < window_size) { + RTC_LOG(LS_WARNING) << "Window size must be between 10 and 200 packets"; + window_size = kDefaultTrendlineWindowSize; + } + if (enable_cap) { + if (beginning_packets < 1 || end_packets < 1 || + beginning_packets > window_size || end_packets > window_size) { + RTC_LOG(LS_WARNING) << "Size of beginning and end must be between 1 and " + << window_size; + enable_cap = false; + beginning_packets = end_packets = 0; + cap_uncertainty = 0.0; + } + if (beginning_packets + end_packets > window_size) { + RTC_LOG(LS_WARNING) + << "Size of beginning plus end can't exceed the window size"; + enable_cap = false; + beginning_packets = end_packets = 0; + cap_uncertainty = 0.0; + } + if (cap_uncertainty < 0.0 || 0.025 < cap_uncertainty) { + RTC_LOG(LS_WARNING) << "Cap uncertainty must be between 0 and 0.025"; + cap_uncertainty = 0.0; + } + }*/ +} + +/*std::unique_ptr TrendlineEstimatorSettings::Parser() { + return StructParametersParser::Create("sort", &enable_sort, // + "cap", &enable_cap, // + "beginning_packets", + &beginning_packets, // + "end_packets", &end_packets, // + "cap_uncertainty", &cap_uncertainty, // + "window_size", &window_size); +}*/ + +TrendlineEstimator::TrendlineEstimator( + /*const WebRtcKeyValueConfig* key_value_config, + NetworkStatePredictor* network_state_predictor*/) + : settings_(), + smoothing_coef_(kDefaultTrendlineSmoothingCoeff), + threshold_gain_(kDefaultTrendlineThresholdGain), + num_of_deltas_(0), + first_arrival_time_ms_(-1), + accumulated_delay_(0), + smoothed_delay_(0), + delay_hist_(), + k_up_(0.0087), + k_down_(0.039), + overusing_time_threshold_(kOverUsingTimeThreshold), + threshold_(12.5), + prev_modified_trend_(NAN), + last_update_ms_(-1), + prev_trend_(0.0), + time_over_using_(-1), + overuse_counter_(0), + hypothesis_(BandwidthUsage::kBwNormal){ + // hypothesis_predicted_(BandwidthUsage::kBwNormal){//}, + // network_state_predictor_(network_state_predictor) { + /* RTC_LOG(LS_INFO) + << "Using Trendline filter for delay change estimation with settings " + << settings_.Parser()->Encode() << " and " + // << (network_state_predictor_ ? "injected" : "no") + << " network state predictor";*/ +} + +TrendlineEstimator::~TrendlineEstimator() {} + +void TrendlineEstimator::UpdateTrendline(double recv_delta_ms, + double send_delta_ms, + int64_t send_time_ms, + int64_t arrival_time_ms, + size_t packet_size) { + const double delta_ms = recv_delta_ms - send_delta_ms; + ++num_of_deltas_; + num_of_deltas_ = std::min(num_of_deltas_, kDeltaCounterMax); + if (first_arrival_time_ms_ == -1) first_arrival_time_ms_ = arrival_time_ms; + + // Exponential backoff filter. + accumulated_delay_ += delta_ms; + // BWE_TEST_LOGGING_PLOT(1, "accumulated_delay_ms", arrival_time_ms, + // accumulated_delay_); + smoothed_delay_ = smoothing_coef_ * smoothed_delay_ + + (1 - smoothing_coef_) * accumulated_delay_; + // BWE_TEST_LOGGING_PLOT(1, "smoothed_delay_ms", arrival_time_ms, + // smoothed_delay_); + + // Maintain packet window + delay_hist_.emplace_back( + static_cast(arrival_time_ms - first_arrival_time_ms_), + smoothed_delay_, accumulated_delay_); + if (settings_.enable_sort) { + for (size_t i = delay_hist_.size() - 1; + i > 0 && + delay_hist_[i].arrival_time_ms < delay_hist_[i - 1].arrival_time_ms; + --i) { + std::swap(delay_hist_[i], delay_hist_[i - 1]); + } + } + if (delay_hist_.size() > settings_.window_size) delay_hist_.pop_front(); + + // Simple linear regression. + double trend = prev_trend_; + if (delay_hist_.size() == settings_.window_size) { + // Update trend_ if it is possible to fit a line to the data. The delay + // trend can be seen as an estimate of (send_rate - capacity)/capacity. + // 0 < trend < 1 -> the delay increases, queues are filling up + // trend == 0 -> the delay does not change + // trend < 0 -> the delay decreases, queues are being emptied + OptionalDouble trendO = LinearFitSlope(delay_hist_); + if (trendO.has_value()) trend = trendO.value(); + if (settings_.enable_cap) { + OptionalDouble cap = ComputeSlopeCap(delay_hist_, settings_); + // We only use the cap to filter out overuse detections, not + // to detect additional underuses. + if (trend >= 0 && cap.has_value() && trend > cap.value()) { + trend = cap.value(); + } + } + } + // BWE_TEST_LOGGING_PLOT(1, "trendline_slope", arrival_time_ms, trend); + + Detect(trend, send_delta_ms, arrival_time_ms); +} + +void TrendlineEstimator::Update(double recv_delta_ms, double send_delta_ms, + int64_t send_time_ms, int64_t arrival_time_ms, + size_t packet_size, bool calculated_deltas) { + if (calculated_deltas) { + UpdateTrendline(recv_delta_ms, send_delta_ms, send_time_ms, arrival_time_ms, + packet_size); + } + /*if (network_state_predictor_) { + hypothesis_predicted_ = network_state_predictor_->Update( + send_time_ms, arrival_time_ms, hypothesis_); + }*/ +} + +BandwidthUsage TrendlineEstimator::State() const { + return /*network_state_predictor_ ? hypothesis_predicted_ :*/ hypothesis_; +} + +void TrendlineEstimator::Detect(double trend, double ts_delta, int64_t now_ms) { + /*if (num_of_deltas_ < 2) { + hypothesis_ = BandwidthUsage::kBwNormal; + return; + }*/ + + const double modified_trend = + std::min(num_of_deltas_, kMinNumDeltas) * trend * threshold_gain_; + prev_modified_trend_ = modified_trend; + // BWE_TEST_LOGGING_PLOT(1, "T", now_ms, modified_trend); + // BWE_TEST_LOGGING_PLOT(1, "threshold", now_ms, threshold_); + if (modified_trend > threshold_) { + if (time_over_using_ == -1) { + // Initialize the timer. Assume that we've been + // over-using half of the time since the previous + // sample. + time_over_using_ = ts_delta / 2; + } else { + // Increment timer + time_over_using_ += ts_delta; + } + overuse_counter_++; + if (time_over_using_ > overusing_time_threshold_ && overuse_counter_ > 1) { + if (trend >= prev_trend_) { + time_over_using_ = 0; + overuse_counter_ = 0; + hypothesis_ = BandwidthUsage::kBwOverusing; + } + } + } else if (modified_trend < -threshold_) { + time_over_using_ = -1; + overuse_counter_ = 0; + hypothesis_ = BandwidthUsage::kBwUnderusing; + } else { + time_over_using_ = -1; + overuse_counter_ = 0; + hypothesis_ = BandwidthUsage::kBwNormal; + } + prev_trend_ = trend; + UpdateThreshold(modified_trend, now_ms); +} + +void TrendlineEstimator::UpdateThreshold(double modified_trend, + int64_t now_ms) { + if (last_update_ms_ == -1) last_update_ms_ = now_ms; + + if (fabs(modified_trend) > threshold_ + kMaxAdaptOffsetMs) { + // Avoid adapting the threshold to big latency spikes, caused e.g., + // by a sudden capacity drop. + last_update_ms_ = now_ms; + return; + } + + const double k = fabs(modified_trend) < threshold_ ? k_down_ : k_up_; + const int64_t kMaxTimeDeltaMs = 100; + int64_t time_delta_ms = std::min(now_ms - last_update_ms_, kMaxTimeDeltaMs); + threshold_ += k * (fabs(modified_trend) - threshold_) * time_delta_ms; + if (threshold_ < 6.f) threshold_ = 6.f; + if (threshold_ > 600.f) threshold_ = 600.f; + // threshold_ = rtc::SafeClamp(threshold_, 6.f, 600.f); + last_update_ms_ = now_ms; +} + +} // namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/trendline_estimator.h b/libtransport/src/protocols/rtc/trendline_estimator.h new file mode 100644 index 000000000..372acbc67 --- /dev/null +++ b/libtransport/src/protocols/rtc/trendline_estimator.h @@ -0,0 +1,147 @@ +/* + * Copyright (c) 2016 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +// FROM +// https://source.chromium.org/chromium/chromium/src/+/master:third_party/webrtc/modules/congestion_controller/goog_cc/trendline_estimator.h + +#ifndef MODULES_CONGESTION_CONTROLLER_GOOG_CC_TRENDLINE_ESTIMATOR_H_ +#define MODULES_CONGESTION_CONTROLLER_GOOG_CC_TRENDLINE_ESTIMATOR_H_ + +#include +#include + +#include +#include +#include +#include + +namespace transport { + +namespace protocol { + +namespace rtc { + +class OptionalDouble { + public: + OptionalDouble() : val(0), has_val(false){}; + OptionalDouble(double val) : val(val), has_val(true){}; + + double value() { return val; } + bool has_value() { return has_val; } + + private: + double val; + bool has_val; +}; + +enum class BandwidthUsage { + kBwNormal = 0, + kBwUnderusing = 1, + kBwOverusing = 2, + kLast +}; + +struct TrendlineEstimatorSettings { + static constexpr char kKey[] = "WebRTC-Bwe-TrendlineEstimatorSettings"; + static constexpr unsigned kDefaultTrendlineWindowSize = 20; + + // TrendlineEstimatorSettings() = delete; + TrendlineEstimatorSettings( + /*const WebRtcKeyValueConfig* key_value_config*/); + + // Sort the packets in the window. Should be redundant, + // but then almost no cost. + bool enable_sort = false; + + // Cap the trendline slope based on the minimum delay seen + // in the beginning_packets and end_packets respectively. + bool enable_cap = false; + unsigned beginning_packets = 7; + unsigned end_packets = 7; + double cap_uncertainty = 0.0; + + // Size (in packets) of the window. + unsigned window_size = kDefaultTrendlineWindowSize; + + // std::unique_ptr Parser(); +}; + +class TrendlineEstimator /*: public DelayIncreaseDetectorInterface */ { + public: + TrendlineEstimator(/*const WebRtcKeyValueConfig* key_value_config, + NetworkStatePredictor* network_state_predictor*/); + + ~TrendlineEstimator(); + + // Update the estimator with a new sample. The deltas should represent deltas + // between timestamp groups as defined by the InterArrival class. + void Update(double recv_delta_ms, double send_delta_ms, int64_t send_time_ms, + int64_t arrival_time_ms, size_t packet_size, + bool calculated_deltas); + + void UpdateTrendline(double recv_delta_ms, double send_delta_ms, + int64_t send_time_ms, int64_t arrival_time_ms, + size_t packet_size); + + BandwidthUsage State() const; + + struct PacketTiming { + PacketTiming(double arrival_time_ms, double smoothed_delay_ms, + double raw_delay_ms) + : arrival_time_ms(arrival_time_ms), + smoothed_delay_ms(smoothed_delay_ms), + raw_delay_ms(raw_delay_ms) {} + double arrival_time_ms; + double smoothed_delay_ms; + double raw_delay_ms; + }; + + private: + // friend class GoogCcStatePrinter; + void Detect(double trend, double ts_delta, int64_t now_ms); + + void UpdateThreshold(double modified_offset, int64_t now_ms); + + // Parameters. + TrendlineEstimatorSettings settings_; + const double smoothing_coef_; + const double threshold_gain_; + // Used by the existing threshold. + int num_of_deltas_; + // Keep the arrival times small by using the change from the first packet. + int64_t first_arrival_time_ms_; + // Exponential backoff filtering. + double accumulated_delay_; + double smoothed_delay_; + // Linear least squares regression. + std::deque delay_hist_; + + const double k_up_; + const double k_down_; + double overusing_time_threshold_; + double threshold_; + double prev_modified_trend_; + int64_t last_update_ms_; + double prev_trend_; + double time_over_using_; + int overuse_counter_; + BandwidthUsage hypothesis_; + // BandwidthUsage hypothesis_predicted_; + // NetworkStatePredictor* network_state_predictor_; + + // RTC_DISALLOW_COPY_AND_ASSIGN(TrendlineEstimator); +}; + +} // namespace rtc + +} // end namespace protocol + +} // end namespace transport +#endif // MODULES_CONGESTION_CONTROLLER_GOOG_CC_TRENDLINE_ESTIMATOR_H_ -- cgit 1.2.3-korg