aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/protocols/rtc
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/protocols/rtc')
-rw-r--r--libtransport/src/protocols/rtc/CMakeLists.txt38
-rw-r--r--libtransport/src/protocols/rtc/congestion_detection.cc101
-rw-r--r--libtransport/src/protocols/rtc/congestion_detection.h138
-rw-r--r--libtransport/src/protocols/rtc/probe_handler.cc107
-rw-r--r--libtransport/src/protocols/rtc/probe_handler.h75
-rw-r--r--libtransport/src/protocols/rtc/rtc.cc607
-rw-r--r--libtransport/src/protocols/rtc/rtc.h113
-rw-r--r--libtransport/src/protocols/rtc/rtc_consts.h121
-rw-r--r--libtransport/src/protocols/rtc/rtc_data_path.cc197
-rw-r--r--libtransport/src/protocols/rtc/rtc_data_path.h97
-rw-r--r--libtransport/src/protocols/rtc/rtc_ldr.cc427
-rw-r--r--libtransport/src/protocols/rtc/rtc_ldr.h108
-rw-r--r--libtransport/src/protocols/rtc/rtc_packet.h89
-rw-r--r--libtransport/src/protocols/rtc/rtc_rc.h58
-rw-r--r--libtransport/src/protocols/rtc/rtc_rc_frame.cc79
-rw-r--r--libtransport/src/protocols/rtc/rtc_rc_frame.h46
-rw-r--r--libtransport/src/protocols/rtc/rtc_rc_queue.cc106
-rw-r--r--libtransport/src/protocols/rtc/rtc_rc_queue.h47
-rw-r--r--libtransport/src/protocols/rtc/rtc_state.cc560
-rw-r--r--libtransport/src/protocols/rtc/rtc_state.h253
-rw-r--r--libtransport/src/protocols/rtc/trendline_estimator.cc334
-rw-r--r--libtransport/src/protocols/rtc/trendline_estimator.h147
22 files changed, 3848 insertions, 0 deletions
diff --git a/libtransport/src/protocols/rtc/CMakeLists.txt b/libtransport/src/protocols/rtc/CMakeLists.txt
new file mode 100644
index 000000000..77f065d0e
--- /dev/null
+++ b/libtransport/src/protocols/rtc/CMakeLists.txt
@@ -0,0 +1,38 @@
+# Copyright (c) 2017-2019 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cmake_minimum_required(VERSION 3.5 FATAL_ERROR)
+
+list(APPEND HEADER_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_state.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_ldr.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_consts.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rc.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rc_queue.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/probe_handler.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_packet.h
+)
+
+list(APPEND SOURCE_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_state.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_ldr.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rc_queue.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/probe_handler.cc
+)
+
+set(SOURCE_FILES ${SOURCE_FILES} PARENT_SCOPE)
+set(HEADER_FILES ${HEADER_FILES} PARENT_SCOPE)
diff --git a/libtransport/src/protocols/rtc/congestion_detection.cc b/libtransport/src/protocols/rtc/congestion_detection.cc
new file mode 100644
index 000000000..e2d44ae66
--- /dev/null
+++ b/libtransport/src/protocols/rtc/congestion_detection.cc
@@ -0,0 +1,101 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/utils/log.h>
+#include <protocols/rtc/congestion_detection.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+CongestionDetection::CongestionDetection()
+ : cc_estimator_(), last_processed_chunk_() {}
+
+CongestionDetection::~CongestionDetection() {}
+
+void CongestionDetection::updateStats() {
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+
+ if (chunks_number_.empty()) return;
+
+ uint32_t chunk_number = chunks_number_.front();
+
+ while (chunks_[chunk_number].getReceivedTime() + HICN_CC_STATS_MAX_DELAY_MS <
+ now ||
+ chunks_[chunk_number].isComplete()) {
+ if (chunk_number == last_processed_chunk_.getFrameSeqNum() + 1) {
+ chunks_[chunk_number].setPreviousSentTime(
+ last_processed_chunk_.getSentTime());
+
+ chunks_[chunk_number].setPreviousReceivedTime(
+ last_processed_chunk_.getReceivedTime());
+ cc_estimator_.Update(chunks_[chunk_number].getReceivedDelta(),
+ chunks_[chunk_number].getSentDelta(),
+ chunks_[chunk_number].getSentTime(),
+ chunks_[chunk_number].getReceivedTime(),
+ chunks_[chunk_number].getFrameSize(), true);
+
+ } else {
+ TRANSPORT_LOGD(
+ "CongestionDetection::updateStats frame %u but not the \
+ previous one, last one was %u currentFrame %u",
+ chunk_number, last_processed_chunk_.getFrameSeqNum(),
+ chunks_[chunk_number].getFrameSeqNum());
+ }
+
+ last_processed_chunk_ = chunks_[chunk_number];
+
+ chunks_.erase(chunk_number);
+
+ chunks_number_.pop();
+ if (chunks_number_.empty()) break;
+
+ chunk_number = chunks_number_.front();
+ }
+}
+
+void CongestionDetection::addPacket(const core::ContentObject &content_object) {
+ auto payload = content_object.getPayload();
+ uint32_t payload_size = (uint32_t)payload->length();
+ uint32_t segmentNumber = content_object.getName().getSuffix();
+ // uint32_t pkt = segmentNumber & modMask_;
+ uint64_t *sentTimePtr = (uint64_t *)payload->data();
+
+ // this is just for testing with hiperf, assuming a frame is 10 pkts
+ // in the final version, the split should be based on the timestamp in the pkt
+ uint32_t frameNum = (int)(segmentNumber / HICN_CC_STATS_CHUNK_SIZE);
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+
+ if (chunks_.find(frameNum) == chunks_.end()) {
+ // new chunk of pkts or out of order
+ if (last_processed_chunk_.getFrameSeqNum() > frameNum)
+ return; // out of order and we already processed the chunk
+
+ chunks_[frameNum] = FrameStats(frameNum, HICN_CC_STATS_CHUNK_SIZE);
+ chunks_number_.push(frameNum);
+ }
+
+ chunks_[frameNum].addPacket(*sentTimePtr, now, payload_size);
+}
+
+} // namespace rtc
+} // namespace protocol
+} // namespace transport
diff --git a/libtransport/src/protocols/rtc/congestion_detection.h b/libtransport/src/protocols/rtc/congestion_detection.h
new file mode 100644
index 000000000..17f4aa54c
--- /dev/null
+++ b/libtransport/src/protocols/rtc/congestion_detection.h
@@ -0,0 +1,138 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/core/content_object.h>
+#include <protocols/rtc/trendline_estimator.h>
+
+#include <map>
+#include <queue>
+
+#define HICN_CC_STATS_CHUNK_SIZE 10
+#define HICN_CC_STATS_MAX_DELAY_MS 100
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+class FrameStats {
+ public:
+ FrameStats()
+ : frame_num_(0),
+ sent_time_(0),
+ received_time_(0),
+ previous_sent_time_(0),
+ previous_received_time_(0),
+ size_(0),
+ received_pkt_m(0),
+ burst_size_m(HICN_CC_STATS_CHUNK_SIZE){};
+
+ FrameStats(uint32_t burst_size)
+ : frame_num_(0),
+ sent_time_(0),
+ received_time_(0),
+ previous_sent_time_(0),
+ previous_received_time_(0),
+ size_(0),
+ received_pkt_m(0),
+ burst_size_m(burst_size){};
+
+ FrameStats(uint32_t frame_num, uint32_t burst_size)
+ : frame_num_(frame_num),
+ sent_time_(0),
+ received_time_(0),
+ previous_sent_time_(0),
+ previous_received_time_(0),
+ size_(0),
+ received_pkt_m(0),
+ burst_size_m(burst_size){};
+
+ FrameStats(uint32_t frame_num, uint64_t sent_time, uint64_t received_time,
+ uint32_t size, FrameStats previousFrame, uint32_t burst_size)
+ : frame_num_(frame_num),
+ sent_time_(sent_time),
+ received_time_(received_time),
+ previous_sent_time_(previousFrame.getSentTime()),
+ previous_received_time_(previousFrame.getReceivedTime()),
+ size_(size),
+ received_pkt_m(1),
+ burst_size_m(burst_size){};
+
+ void addPacket(uint64_t sent_time, uint64_t received_time, uint32_t size) {
+ size_ += size;
+ sent_time_ =
+ (sent_time_ == 0) ? sent_time : std::min(sent_time_, sent_time);
+ received_time_ = std::max(received_time, received_time_);
+ received_pkt_m++;
+ }
+
+ bool isComplete() { return received_pkt_m == burst_size_m; }
+
+ uint32_t getFrameSeqNum() const { return frame_num_; }
+ uint64_t getSentTime() const { return sent_time_; }
+ uint64_t getReceivedTime() const { return received_time_; }
+ uint32_t getFrameSize() const { return size_; }
+
+ void setPreviousReceivedTime(uint64_t time) {
+ previous_received_time_ = time;
+ }
+ void setPreviousSentTime(uint64_t time) { previous_sent_time_ = time; }
+
+ // todo manage first frame
+ double getReceivedDelta() {
+ return static_cast<double>(received_time_ - previous_received_time_);
+ }
+ double getSentDelta() {
+ return static_cast<double>(sent_time_ - previous_sent_time_);
+ }
+
+ private:
+ uint32_t frame_num_;
+ uint64_t sent_time_;
+ uint64_t received_time_;
+
+ uint64_t previous_sent_time_;
+ uint64_t previous_received_time_;
+ uint32_t size_;
+
+ uint32_t received_pkt_m;
+ uint32_t burst_size_m;
+};
+
+class CongestionDetection {
+ public:
+ CongestionDetection();
+ ~CongestionDetection();
+
+ void addPacket(const core::ContentObject &content_object);
+
+ BandwidthUsage getState() { return cc_estimator_.State(); }
+
+ void updateStats();
+
+ private:
+ TrendlineEstimator cc_estimator_;
+ std::map<uint32_t, FrameStats> chunks_;
+ std::queue<uint32_t> chunks_number_;
+
+ FrameStats last_processed_chunk_;
+};
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/probe_handler.cc b/libtransport/src/protocols/rtc/probe_handler.cc
new file mode 100644
index 000000000..efba362d4
--- /dev/null
+++ b/libtransport/src/protocols/rtc/probe_handler.cc
@@ -0,0 +1,107 @@
+/*
+ * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <protocols/rtc/probe_handler.h>
+#include <protocols/rtc/rtc_consts.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+ProbeHandler::ProbeHandler(SendProbeCallback &&send_callback,
+ asio::io_service &io_service)
+ : probe_interval_(0),
+ max_probes_(0),
+ sent_probes_(0),
+ probe_timer_(std::make_unique<asio::steady_timer>(io_service)),
+ rand_eng_((std::random_device())()),
+ distr_(MIN_RTT_PROBE_SEQ, MAX_RTT_PROBE_SEQ),
+ send_probe_callback_(std::move(send_callback)) {}
+
+ProbeHandler::~ProbeHandler() {}
+
+uint64_t ProbeHandler::getRtt(uint32_t seq) {
+ auto it = pending_probes_.find(seq);
+
+ if (it == pending_probes_.end()) return 0;
+
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ uint64_t rtt = now - it->second;
+ if(rtt < 1) rtt = 1;
+
+ pending_probes_.erase(it);
+
+ return rtt;
+}
+
+void ProbeHandler::setProbes(uint32_t probe_interval, uint32_t max_probes) {
+ stopProbes();
+ probe_interval_ = probe_interval;
+ max_probes_ = max_probes;
+}
+
+void ProbeHandler::stopProbes() {
+ probe_interval_ = 0;
+ max_probes_ = 0;
+ sent_probes_ = 0;
+ probe_timer_->cancel();
+}
+
+void ProbeHandler::sendProbes() {
+ if (probe_interval_ == 0) return;
+ if (max_probes_ != 0 && sent_probes_ >= max_probes_) return;
+
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+
+ uint32_t seq = distr_(rand_eng_);
+ pending_probes_.insert(std::pair<uint32_t, uint64_t>(seq, now));
+ send_probe_callback_(seq);
+ sent_probes_++;
+
+ // clean up
+ // a probe may get lost. if the pending_probes_ size becomes bigger than
+ // MAX_PENDING_PROBES remove all the probes older than a seconds
+ if (pending_probes_.size() > MAX_PENDING_PROBES) {
+ for (auto it = pending_probes_.begin(); it != pending_probes_.end();) {
+ if ((now - it->second) > 1000)
+ it = pending_probes_.erase(it);
+ else
+ it++;
+ }
+ }
+
+ if (probe_interval_ == 0) return;
+
+ std::weak_ptr<ProbeHandler> self(shared_from_this());
+ probe_timer_->expires_from_now(std::chrono::microseconds(probe_interval_));
+ probe_timer_->async_wait([self](std::error_code ec) {
+ if (ec) return;
+ if (auto s = self.lock()) {
+ s->sendProbes();
+ }
+ });
+}
+
+} // namespace rtc
+
+} // namespace protocol
+
+} // namespace transport
diff --git a/libtransport/src/protocols/rtc/probe_handler.h b/libtransport/src/protocols/rtc/probe_handler.h
new file mode 100644
index 000000000..b8ed84445
--- /dev/null
+++ b/libtransport/src/protocols/rtc/probe_handler.h
@@ -0,0 +1,75 @@
+/*
+ * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+#include <hicn/transport/config.h>
+
+#include <asio.hpp>
+#include <asio/steady_timer.hpp>
+#include <functional>
+#include <random>
+#include <unordered_map>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+class ProbeHandler : public std::enable_shared_from_this<ProbeHandler> {
+ public:
+ using SendProbeCallback = std::function<void(uint32_t)>;
+
+ public:
+ ProbeHandler(SendProbeCallback &&send_callback,
+ asio::io_service &io_service);
+
+ ~ProbeHandler();
+
+ // if the function returns 0 the probe is not valaid
+ uint64_t getRtt(uint32_t seq);
+
+ // reset the probes parameters. it stop the current probing.
+ // to restar call sendProbes.
+ // probe_interval = 0 means that no event will be scheduled
+ // max_probe = 0 means no limit to the number of probe to send
+ void setProbes(uint32_t probe_interval, uint32_t max_probes);
+
+ // stop to schedule probes
+ void stopProbes();
+
+ void sendProbes();
+
+ private:
+ uint32_t probe_interval_; // us
+ uint32_t max_probes_; // packets
+ uint32_t sent_probes_; // packets
+
+ std::unique_ptr<asio::steady_timer> probe_timer_;
+
+ // map from seqnumber to timestamp
+ std::unordered_map<uint32_t, uint64_t> pending_probes_;
+
+ // random generator
+ std::default_random_engine rand_eng_;
+ std::uniform_int_distribution<uint32_t> distr_;
+
+ SendProbeCallback send_probe_callback_;
+};
+
+} // namespace rtc
+
+} // namespace protocol
+
+} // namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc.cc b/libtransport/src/protocols/rtc/rtc.cc
new file mode 100644
index 000000000..bb95ab686
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc.cc
@@ -0,0 +1,607 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/core/global_object_pool.h>
+#include <hicn/transport/interfaces/socket_consumer.h>
+#include <implementation/socket_consumer.h>
+#include <math.h>
+#include <protocols/rtc/rtc.h>
+#include <protocols/rtc/rtc_consts.h>
+#include <protocols/rtc/rtc_rc_queue.h>
+
+#include <algorithm>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+using namespace interface;
+
+RTCTransportProtocol::RTCTransportProtocol(
+ implementation::ConsumerSocket *icn_socket)
+ : TransportProtocol(icn_socket, nullptr),
+ DatagramReassembly(icn_socket, this),
+ number_(0) {
+ icn_socket->getSocketOption(PORTAL, portal_);
+ round_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
+ scheduler_timer_ =
+ std::make_unique<asio::steady_timer>(portal_->getIoService());
+}
+
+RTCTransportProtocol::~RTCTransportProtocol() {}
+
+void RTCTransportProtocol::resume() {
+ if (is_running_) return;
+
+ is_running_ = true;
+
+ newRound();
+
+ portal_->runEventsLoop();
+ is_running_ = false;
+}
+
+// private
+void RTCTransportProtocol::initParams() {
+ portal_->setConsumerCallback(this);
+
+ rc_ = std::make_shared<RTCRateControlQueue>();
+ ldr_ = std::make_shared<RTCLossDetectionAndRecovery>(
+ std::bind(&RTCTransportProtocol::sendRtxInterest, this,
+ std::placeholders::_1),
+ portal_->getIoService());
+
+ state_ = std::make_shared<RTCState>(
+ std::bind(&RTCTransportProtocol::sendProbeInterest, this,
+ std::placeholders::_1),
+ std::bind(&RTCTransportProtocol::discoveredRtt, this),
+ portal_->getIoService());
+
+ rc_->setState(state_);
+ // TODO: for the moment we keep the congestion control disabled
+ // rc_->tunrOnRateControl();
+ ldr_->setState(state_);
+
+ // protocol state
+ start_send_interest_ = false;
+ current_state_ = SyncState::catch_up;
+
+ // Cancel timer
+ number_++;
+ round_timer_->cancel();
+ scheduler_timer_->cancel();
+ scheduler_timer_on_ = false;
+
+ // delete all timeouts and future nacks
+ timeouts_or_nacks_.clear();
+
+ // cwin vars
+ current_sync_win_ = INITIAL_WIN;
+ max_sync_win_ = INITIAL_WIN_MAX;
+
+ // names/packets var
+ next_segment_ = 0;
+
+ socket_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
+ RTC_INTEREST_LIFETIME);
+}
+
+// private
+void RTCTransportProtocol::reset() {
+ TRANSPORT_LOGD("reset called");
+ initParams();
+ newRound();
+}
+
+void RTCTransportProtocol::inactiveProducer() {
+ // when the producer is inactive we reset the consumer state
+ // cwin vars
+ current_sync_win_ = INITIAL_WIN;
+ max_sync_win_ = INITIAL_WIN_MAX;
+
+ TRANSPORT_LOGD("Current window: %u, max_sync_win_: %u", current_sync_win_,
+ max_sync_win_);
+
+ // names/packets var
+ next_segment_ = 0;
+
+ ldr_->clear();
+}
+
+void RTCTransportProtocol::newRound() {
+ round_timer_->expires_from_now(std::chrono::milliseconds(ROUND_LEN));
+ // TODO pass weak_ptr here
+ round_timer_->async_wait([this, n{number_}](std::error_code ec) {
+ if (ec) return;
+
+ if (n != number_) {
+ return;
+ }
+
+ // saving counters that will be reset on new round
+ uint32_t sent_retx = state_->getSentRtxInRound();
+ uint32_t received_bytes = state_->getReceivedBytesInRound();
+ uint32_t sent_interest = state_->getSentInterestInRound();
+ uint32_t lost_data = state_->getLostData();
+ uint32_t recovered_losses = state_->getRecoveredLosses();
+ uint32_t received_nacks = state_->getReceivedNacksInRound();
+
+ bool in_sync = (current_state_ == SyncState::in_sync);
+ state_->onNewRound((double)ROUND_LEN, in_sync);
+ rc_->onNewRound((double)ROUND_LEN);
+
+ // update sync state if needed
+ if (current_state_ == SyncState::in_sync) {
+ double cache_rate = state_->getPacketFromCacheRatio();
+ if (cache_rate > MAX_DATA_FROM_CACHE) {
+ current_state_ = SyncState::catch_up;
+ }
+ } else {
+ double target_rate = state_->getProducerRate() * PRODUCTION_RATE_FRACTION;
+ double received_rate = state_->getReceivedRate();
+ uint32_t round_without_nacks = state_->getRoundsWithoutNacks();
+ double cache_ratio = state_->getPacketFromCacheRatio();
+ if (round_without_nacks >= ROUNDS_IN_SYNC_BEFORE_SWITCH &&
+ received_rate >= target_rate && cache_ratio < MAX_DATA_FROM_CACHE) {
+ current_state_ = SyncState::in_sync;
+ }
+ }
+
+ TRANSPORT_LOGD("Calling updateSyncWindow in newRound function");
+ updateSyncWindow();
+
+ sendStatsToApp(sent_retx, received_bytes, sent_interest, lost_data,
+ recovered_losses, received_nacks);
+ newRound();
+ });
+}
+
+void RTCTransportProtocol::discoveredRtt() {
+ start_send_interest_ = true;
+ ldr_->turnOnRTX();
+ updateSyncWindow();
+}
+
+void RTCTransportProtocol::computeMaxSyncWindow() {
+ double production_rate = state_->getProducerRate();
+ double packet_size = state_->getAveragePacketSize();
+ if (production_rate == 0.0 || packet_size == 0.0) {
+ // the consumer has no info about the producer,
+ // keep the previous maxCWin
+ TRANSPORT_LOGD(
+ "Returning in computeMaxSyncWindow because: prod_rate: %d || "
+ "packet_size: %d",
+ (int)(production_rate == 0.0), (int)(packet_size == 0.0));
+ return;
+ }
+
+ uint32_t lifetime = default_values::interest_lifetime;
+ socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
+ lifetime);
+ double lifetime_ms = (double)lifetime / MILLI_IN_A_SEC;
+
+
+ max_sync_win_ =
+ (uint32_t)ceil((production_rate * lifetime_ms *
+ INTEREST_LIFETIME_REDUCTION_FACTOR) / packet_size);
+
+ max_sync_win_ = std::min(max_sync_win_, rc_->getCongesionWindow());
+}
+
+void RTCTransportProtocol::updateSyncWindow() {
+ computeMaxSyncWindow();
+
+ if (max_sync_win_ == INITIAL_WIN_MAX) {
+ if (TRANSPORT_EXPECT_FALSE(!state_->isProducerActive())) return;
+
+ current_sync_win_ = INITIAL_WIN;
+ scheduleNextInterests();
+ return;
+ }
+
+ double prod_rate = state_->getProducerRate();
+ double rtt = (double)state_->getRTT() / MILLI_IN_A_SEC;
+ double packet_size = state_->getAveragePacketSize();
+
+ // if some of the info are not available do not update the current win
+ if (prod_rate != 0.0 && rtt != 0.0 && packet_size != 0.0) {
+ current_sync_win_ = (uint32_t)ceil(prod_rate * rtt / packet_size);
+ current_sync_win_ +=
+ ceil(prod_rate * (PRODUCER_BUFFER_MS / MILLI_IN_A_SEC) / packet_size);
+
+ if(current_state_ == SyncState::catch_up) {
+ current_sync_win_ = current_sync_win_ * CATCH_UP_WIN_INCREMENT;
+ }
+
+ current_sync_win_ = std::min(current_sync_win_, max_sync_win_);
+ current_sync_win_ = std::max(current_sync_win_, WIN_MIN);
+ }
+
+ scheduleNextInterests();
+}
+
+void RTCTransportProtocol::decreaseSyncWindow() {
+ // called on future nack
+ // we have a new sample of the production rate, so update max win first
+ computeMaxSyncWindow();
+ current_sync_win_--;
+ current_sync_win_ = std::max(current_sync_win_, WIN_MIN);
+ scheduleNextInterests();
+}
+
+void RTCTransportProtocol::sendInterest(Name *interest_name) {
+ TRANSPORT_LOGD("Sending interest for name %s",
+ interest_name->toString().c_str());
+
+ auto interest = core::PacketManager<>::getInstance().getPacket<Interest>();
+ interest->setName(*interest_name);
+
+ uint32_t lifetime = default_values::interest_lifetime;
+ socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
+ lifetime);
+ interest->setLifetime(uint32_t(lifetime));
+
+ if (*on_interest_output_) {
+ (*on_interest_output_)(*socket_->getInterface(), *interest);
+ }
+
+ if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) {
+ return;
+ }
+
+ portal_->sendInterest(std::move(interest));
+}
+
+void RTCTransportProtocol::sendRtxInterest(uint32_t seq) {
+ if (!is_running_ && !is_first_) return;
+
+ if(!start_send_interest_) return;
+
+ Name *interest_name = nullptr;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
+ &interest_name);
+
+ TRANSPORT_LOGD("send rtx %u", seq);
+ interest_name->setSuffix(seq);
+ sendInterest(interest_name);
+}
+
+void RTCTransportProtocol::sendProbeInterest(uint32_t seq) {
+ if (!is_running_ && !is_first_) return;
+
+ Name *interest_name = nullptr;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
+ &interest_name);
+
+ TRANSPORT_LOGD("send probe %u", seq);
+ interest_name->setSuffix(seq);
+ sendInterest(interest_name);
+}
+
+void RTCTransportProtocol::scheduleNextInterests() {
+ TRANSPORT_LOGD("Schedule next interests");
+
+ if (!is_running_ && !is_first_) return;
+
+ if(!start_send_interest_) return; // RTT discovering phase is not finished so
+ // do not start to send interests
+
+ if (scheduler_timer_on_) return; // wait befor send other interests
+
+ if (TRANSPORT_EXPECT_FALSE(!state_->isProducerActive())) {
+ TRANSPORT_LOGD("Inactive producer.");
+ // here we keep seding the same interest until the producer
+ // does not start again
+ if (next_segment_ != 0) {
+ // the producer just become inactive, reset the state
+ inactiveProducer();
+ }
+
+ Name *interest_name = nullptr;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
+ &interest_name);
+
+ TRANSPORT_LOGD("send interest %u", next_segment_);
+ interest_name->setSuffix(next_segment_);
+
+ if (portal_->interestIsPending(*interest_name)) {
+ // if interest 0 is already pending we return
+ return;
+ }
+
+ sendInterest(interest_name);
+ state_->onSendNewInterest(interest_name);
+ return;
+ }
+
+ TRANSPORT_LOGD("Pending interest number: %d -- current_sync_win_: %d",
+ state_->getPendingInterestNumber(), current_sync_win_);
+
+ // skip nacked pacekts
+ if (next_segment_ <= state_->getLastSeqNacked()) {
+ next_segment_ = state_->getLastSeqNacked() + 1;
+ }
+
+ // skipe received packets
+ if (next_segment_ <= state_->getHighestSeqReceivedInOrder()) {
+ next_segment_ = state_->getHighestSeqReceivedInOrder() + 1;
+ }
+
+ uint32_t sent_interests = 0;
+ while ((state_->getPendingInterestNumber() < current_sync_win_) &&
+ (sent_interests < MAX_INTERESTS_IN_BATCH)) {
+ TRANSPORT_LOGD("In while loop. Window size: %u", current_sync_win_);
+ Name *interest_name = nullptr;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
+ &interest_name);
+
+ interest_name->setSuffix(next_segment_);
+
+ // send the packet only if:
+ // 1) it is not pending yet (not true for rtx)
+ // 2) the packet is not received or lost
+ // 3) is not in the rtx list
+ if (portal_->interestIsPending(*interest_name) ||
+ state_->isReceivedOrLost(next_segment_) != PacketState::UNKNOWN ||
+ ldr_->isRtx(next_segment_)) {
+ TRANSPORT_LOGD(
+ "skip interest %u because: pending %u, recv %u, rtx %u",
+ next_segment_, (portal_->interestIsPending(*interest_name)),
+ (state_->isReceivedOrLost(next_segment_) != PacketState::UNKNOWN),
+ (ldr_->isRtx(next_segment_)));
+ next_segment_ = (next_segment_ + 1) % MIN_PROBE_SEQ;
+ continue;
+ }
+
+
+ sent_interests++;
+ TRANSPORT_LOGD("send interest %u", next_segment_);
+ sendInterest(interest_name);
+ state_->onSendNewInterest(interest_name);
+
+ next_segment_ = (next_segment_ + 1) % MIN_PROBE_SEQ;
+ }
+
+ if (state_->getPendingInterestNumber() < current_sync_win_) {
+ // we still have space in the window but we already sent a batch of
+ // MAX_INTERESTS_IN_BATCH interest. for the following ones wait one
+ // WAIT_BETWEEN_INTEREST_BATCHES to avoid local packets drop
+
+ scheduler_timer_on_ = true;
+ scheduler_timer_->expires_from_now(
+ std::chrono::microseconds(WAIT_BETWEEN_INTEREST_BATCHES));
+ scheduler_timer_->async_wait([this](std::error_code ec) {
+ if (ec) return;
+ if (!scheduler_timer_on_) return;
+
+ scheduler_timer_on_ = false;
+ scheduleNextInterests();
+ });
+ }
+}
+
+void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) {
+ uint32_t segment_number = interest->getName().getSuffix();
+
+ TRANSPORT_LOGD("timeout for packet %u", segment_number);
+
+ if (segment_number >= MIN_PROBE_SEQ) {
+ // this is a timeout on a probe, do nothing
+ return;
+ }
+
+ timeouts_or_nacks_.insert(segment_number);
+
+ if (TRANSPORT_EXPECT_TRUE(state_->isProducerActive()) &&
+ segment_number <= state_->getHighestSeqReceivedInOrder()) {
+ // we retransmit packets only if the producer is active, otherwise we
+ // use timeouts to avoid to send too much traffic
+ //
+ // a timeout is sent using RTX only if it is an old packet. if it is for a
+ // seq number that we didn't reach yet, we send the packet using the normal
+ // schedule next interest
+ TRANSPORT_LOGD("handle timeout for packet %u using rtx", segment_number);
+ ldr_->onTimeout(segment_number);
+ state_->onTimeout(segment_number);
+ scheduleNextInterests();
+ return;
+ }
+
+ TRANSPORT_LOGD("handle timeout for packet %u using normal interests",
+ segment_number);
+
+ if (segment_number < next_segment_) {
+ // this is a timeout for a packet that will be generated in the future but
+ // we are asking for higher sequence numbers. we need to go back like in the
+ // case of future nacks
+ TRANSPORT_LOGD("on timeout next seg = %u, jump to %u",
+ next_segment_, segment_number);
+ next_segment_ = segment_number;
+ }
+
+ state_->onTimeout(segment_number);
+ scheduleNextInterests();
+}
+
+void RTCTransportProtocol::onNack(const ContentObject &content_object) {
+ struct nack_packet_t *nack =
+ (struct nack_packet_t *)content_object.getPayload()->data();
+ uint32_t production_seg = nack->getProductionSegement();
+ uint32_t nack_segment = content_object.getName().getSuffix();
+ bool is_rtx = ldr_->isRtx(nack_segment);
+
+ // check if the packet got a timeout
+
+ TRANSPORT_LOGD("Nack received %u. Production segment: %u", nack_segment,
+ production_seg);
+
+ bool compute_stats = true;
+ auto tn_it = timeouts_or_nacks_.find(nack_segment);
+ if (tn_it != timeouts_or_nacks_.end() || is_rtx) {
+ compute_stats = false;
+ // remove packets from timeouts_or_nacks only in case of a past nack
+ }
+
+ state_->onNackPacketReceived(content_object, compute_stats);
+ ldr_->onNackPacketReceived(content_object);
+
+ // both in case of past and future nack we set next_segment_ equal to the
+ // production segment in the nack. In case of past nack we will skip unneded
+ // interest (this is already done in the scheduleNextInterest in any case)
+ // while in case of future nacks we can go back in time and ask again for the
+ // content that generated the nack
+ TRANSPORT_LOGD("on nack next seg = %u, jump to %u",
+ next_segment_, production_seg);
+ next_segment_ = production_seg;
+
+ if (production_seg > nack_segment) {
+ // remove the nack is it exists
+ if (tn_it != timeouts_or_nacks_.end()) timeouts_or_nacks_.erase(tn_it);
+
+ // the client is asking for content in the past
+ // switch to catch up state and increase the window
+ // this is true only if the packet is not an RTX
+ if (!is_rtx) current_state_ = SyncState::catch_up;
+
+ updateSyncWindow();
+ } else {
+ // if production_seg == nack_segment we consider this a future nack, since
+ // production_seg is not yet created. this may happen in case of low
+ // production rate (e.g. ping at 1pps)
+
+ // if a future nack was also retransmitted add it to the timeout_or_nacks
+ // set
+ if (is_rtx) timeouts_or_nacks_.insert(nack_segment);
+
+ // the client is asking for content in the future
+ // switch to in sync state and decrease the window
+ current_state_ = SyncState::in_sync;
+ decreaseSyncWindow();
+ }
+}
+
+void RTCTransportProtocol::onProbe(const ContentObject &content_object) {
+ bool valid = state_->onProbePacketReceived(content_object);
+ if(!valid) return;
+
+ struct nack_packet_t *probe =
+ (struct nack_packet_t *)content_object.getPayload()->data();
+ uint32_t production_seg = probe->getProductionSegement();
+
+ // as for the nacks set next_segment_
+ TRANSPORT_LOGD("on probe next seg = %u, jump to %u",
+ next_segment_, production_seg);
+ next_segment_ = production_seg;
+
+ ldr_->onProbePacketReceived(content_object);
+ updateSyncWindow();
+}
+
+void RTCTransportProtocol::onContentObject(Interest &interest,
+ ContentObject &content_object) {
+ TRANSPORT_LOGD("Received content object of size: %zu",
+ content_object.payloadSize());
+ uint32_t payload_size = content_object.payloadSize();
+ uint32_t segment_number = content_object.getName().getSuffix();
+
+ if (segment_number >= MIN_PROBE_SEQ) {
+ TRANSPORT_LOGD("Received probe %u", segment_number);
+ if (*on_content_object_input_) {
+ (*on_content_object_input_)(*socket_->getInterface(), content_object);
+ }
+ onProbe(content_object);
+ return;
+ }
+
+ if (payload_size == NACK_HEADER_SIZE) {
+ TRANSPORT_LOGD("Received nack %u", segment_number);
+ if (*on_content_object_input_) {
+ (*on_content_object_input_)(*socket_->getInterface(), content_object);
+ }
+ onNack(content_object);
+ return;
+ }
+
+ TRANSPORT_LOGD("Received content %u", segment_number);
+
+ rc_->onDataPacketReceived(content_object);
+ bool compute_stats = true;
+ auto tn_it = timeouts_or_nacks_.find(segment_number);
+ if (tn_it != timeouts_or_nacks_.end()) {
+ compute_stats = false;
+ timeouts_or_nacks_.erase(tn_it);
+ }
+ if (ldr_->isRtx(segment_number)) {
+ compute_stats = false;
+ }
+
+ // check if the packet was already received
+ PacketState state = state_->isReceivedOrLost(segment_number);
+ state_->onDataPacketReceived(content_object, compute_stats);
+ ldr_->onDataPacketReceived(content_object);
+
+ // if the stat for this seq number is received do not send the packet to app
+ if (state != PacketState::RECEIVED) {
+ if (*on_content_object_input_) {
+ (*on_content_object_input_)(*socket_->getInterface(), content_object);
+ }
+ reassemble(content_object);
+ } else {
+ TRANSPORT_LOGD("Received duplicated content %u, drop it", segment_number);
+ }
+
+ updateSyncWindow();
+}
+
+void RTCTransportProtocol::sendStatsToApp(
+ uint32_t retx_count, uint32_t received_bytes, uint32_t sent_interests,
+ uint32_t lost_data, uint32_t recovered_losses, uint32_t received_nacks) {
+ if (*stats_summary_) {
+ // Send the stats to the app
+ stats_->updateQueuingDelay(state_->getQueuing());
+
+ // stats_->updateInterestFecTx(0); //todo must be implemented
+ // stats_->updateBytesFecRecv(0); //todo must be implemented
+
+ stats_->updateRetxCount(retx_count);
+ stats_->updateBytesRecv(received_bytes);
+ stats_->updateInterestTx(sent_interests);
+ stats_->updateReceivedNacks(received_nacks);
+
+ stats_->updateAverageWindowSize(current_sync_win_);
+ stats_->updateLossRatio(state_->getLossRate());
+ stats_->updateAverageRtt(state_->getRTT());
+ stats_->updateLostData(lost_data);
+ stats_->updateRecoveredData(recovered_losses);
+ stats_->updateCCState((unsigned int)current_state_ ? 1 : 0);
+ (*stats_summary_)(*socket_->getInterface(), *stats_);
+ }
+}
+
+void RTCTransportProtocol::reassemble(ContentObject &content_object) {
+ auto read_buffer = content_object.getPayload();
+ TRANSPORT_LOGD("Size of payload: %zu", read_buffer->length());
+ read_buffer->trimStart(DATA_HEADER_SIZE);
+ Reassembly::read_buffer_ = std::move(read_buffer);
+ Reassembly::notifyApplication();
+}
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc.h b/libtransport/src/protocols/rtc/rtc.h
new file mode 100644
index 000000000..596887067
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc.h
@@ -0,0 +1,113 @@
+/*
+ * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <protocols/datagram_reassembly.h>
+#include <protocols/rtc/rtc_ldr.h>
+#include <protocols/rtc/rtc_rc.h>
+#include <protocols/rtc/rtc_state.h>
+#include <protocols/transport_protocol.h>
+
+#include <unordered_set>
+#include <vector>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+class RTCTransportProtocol : public TransportProtocol,
+ public DatagramReassembly {
+ public:
+ RTCTransportProtocol(implementation::ConsumerSocket *icnet_socket);
+
+ ~RTCTransportProtocol();
+
+ using TransportProtocol::start;
+
+ using TransportProtocol::stop;
+
+ void resume() override;
+
+ private:
+ enum class SyncState { catch_up = 0, in_sync = 1, last };
+
+ private:
+ // setup functions
+ void initParams();
+ void reset() override;
+
+ void inactiveProducer();
+
+ // protocol functions
+ void discoveredRtt();
+ void newRound();
+
+ // window functions
+ void computeMaxSyncWindow();
+ void updateSyncWindow();
+ void decreaseSyncWindow();
+
+ // packet functions
+ void sendInterest(Name *interest_name);
+ void sendRtxInterest(uint32_t seq);
+ void sendProbeInterest(uint32_t seq);
+ void scheduleNextInterests() override;
+ void onTimeout(Interest::Ptr &&interest) override;
+ void onNack(const ContentObject &content_object);
+ void onProbe(const ContentObject &content_object);
+ void reassemble(ContentObject &content_object) override;
+ void onContentObject(Interest &interest,
+ ContentObject &content_object) override;
+ void onPacketDropped(Interest &interest,
+ ContentObject &content_object) override {}
+ void onReassemblyFailed(std::uint32_t missing_segment) override {}
+
+ // interaction with app functions
+ void sendStatsToApp(uint32_t retx_count, uint32_t received_bytes,
+ uint32_t sent_interests, uint32_t lost_data,
+ uint32_t recovered_losses, uint32_t received_nacks);
+ // protocol state
+ bool start_send_interest_;
+ SyncState current_state_;
+ // cwin vars
+ uint32_t current_sync_win_;
+ uint32_t max_sync_win_;
+
+ // controller var
+ std::unique_ptr<asio::steady_timer> round_timer_;
+ std::unique_ptr<asio::steady_timer> scheduler_timer_;
+ bool scheduler_timer_on_;
+
+ // timeouts
+ std::unordered_set<uint32_t> timeouts_or_nacks_;
+
+ // names/packets var
+ uint32_t next_segment_;
+
+ std::shared_ptr<RTCState> state_;
+ std::shared_ptr<RTCRateControl> rc_;
+ std::shared_ptr<RTCLossDetectionAndRecovery> ldr_;
+
+ uint32_t number_;
+};
+
+} // namespace rtc
+
+} // namespace protocol
+
+} // namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_consts.h b/libtransport/src/protocols/rtc/rtc_consts.h
new file mode 100644
index 000000000..0cf9516ab
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_consts.h
@@ -0,0 +1,121 @@
+/*
+ * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <protocols/rtc/rtc_packet.h>
+#include <stdint.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+// used in rtc
+// protocol consts
+const uint32_t ROUND_LEN = 200;
+// ms interval of time on which
+// we take decisions / measurements
+const double INTEREST_LIFETIME_REDUCTION_FACTOR = 0.8;
+// how big (in ms) should be the buffer at the producer.
+// increasing this number we increase the time that an
+// interest will wait for the data packet to be produced
+// at the producer socket
+const uint32_t PRODUCER_BUFFER_MS = 200; // ms
+
+// interest scheduler
+const uint32_t MAX_INTERESTS_IN_BATCH = 5;
+const uint32_t WAIT_BETWEEN_INTEREST_BATCHES = 1000; // usec
+
+// packet const
+const uint32_t HICN_HEADER_SIZE = 40 + 20; // IPv6 + TCP bytes
+const uint32_t RTC_INTEREST_LIFETIME = 1000;
+
+// probes sequence range
+const uint32_t MIN_PROBE_SEQ = 0xefffffff;
+const uint32_t MIN_RTT_PROBE_SEQ = MIN_PROBE_SEQ;
+const uint32_t MAX_RTT_PROBE_SEQ = 0xffffffff - 1;
+// RTT_PROBE_INTERVAL will be used during the section while
+// INIT_RTT_PROBE_INTERVAL is used at the beginning to
+// quickily estimate the RTT
+const uint32_t RTT_PROBE_INTERVAL = 200000; // us
+const uint32_t INIT_RTT_PROBE_INTERVAL = 500; // us
+const uint32_t INIT_RTT_PROBES = 40; // number of probes to init RTT
+// if the produdcer is not yet started we need to probe multple times
+// to get an answer. we wait 100ms between each try
+const uint32_t INIT_RTT_PROBE_RESTART = 100; // ms
+// once we get the first probe we wait at most 60ms for the others
+const uint32_t INIT_RTT_PROBE_WAIT = 30; // ms
+// we reuires at least 5 probes to be recevied
+const uint32_t INIT_RTT_MIN_PROBES_TO_RECV = 5; //ms
+const uint32_t MAX_PENDING_PROBES = 10;
+
+
+// congestion
+const double MAX_QUEUING_DELAY = 100.0; // ms
+
+// data from cache
+const double MAX_DATA_FROM_CACHE = 0.25; // 25%
+
+// window const
+const uint32_t INITIAL_WIN = 5; // pkts
+const uint32_t INITIAL_WIN_MAX = 1000000; // pkts
+const uint32_t WIN_MIN = 5; // pkts
+const double CATCH_UP_WIN_INCREMENT = 1.2;
+// used in rate control
+const double WIN_DECREASE_FACTOR = 0.5;
+const double WIN_INCREASE_FACTOR = 1.5;
+
+// round in congestion
+const double ROUNDS_BEFORE_TAKE_ACTION = 5;
+
+// used in state
+const uint8_t ROUNDS_IN_SYNC_BEFORE_SWITCH = 3;
+const double PRODUCTION_RATE_FRACTION = 0.8;
+
+const uint32_t INIT_PACKET_SIZE = 1200;
+
+const double MOVING_AVG_ALPHA = 0.8;
+
+const double MILLI_IN_A_SEC = 1000.0;
+const double MICRO_IN_A_SEC = 1000000.0;
+
+const double MAX_CACHED_PACKETS = 262144; // 2^18
+ // about 50 sec of traffic at 50Mbps
+ // with 1200 bytes packets
+
+const uint32_t MAX_ROUND_WHIOUT_PACKETS =
+ (20 * MILLI_IN_A_SEC) / ROUND_LEN; // 20 sec in rounds;
+
+// used in ldr
+const uint32_t RTC_MAX_RTX = 100;
+const uint32_t RTC_MAX_AGE = 60000; // in ms
+const uint64_t MAX_TIMER_RTX = ~0;
+const uint32_t SENTINEL_TIMER_INTERVAL = 100; // ms
+const uint32_t MAX_RTX_WITH_SENTINEL = 10; // packets
+const double CATCH_UP_RTT_INCREMENT = 1.2;
+
+// used by producer
+const uint32_t PRODUCER_STATS_INTERVAL = 200; // ms
+const uint32_t MIN_PRODUCTION_RATE = 10; // pps
+ // min prod rate
+ // set running several test
+
+} // namespace rtc
+
+} // namespace protocol
+
+} // namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_data_path.cc b/libtransport/src/protocols/rtc/rtc_data_path.cc
new file mode 100644
index 000000000..c098088a3
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_data_path.cc
@@ -0,0 +1,197 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <protocols/rtc/rtc_data_path.h>
+#include <stdlib.h>
+
+#include <algorithm>
+#include <cfloat>
+#include <chrono>
+
+#define MAX_ROUNDS_WITHOUT_PKTS 10 // 2sec
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+RTCDataPath::RTCDataPath(uint32_t path_id)
+ : path_id_(path_id),
+ min_rtt(UINT_MAX),
+ prev_min_rtt(UINT_MAX),
+ min_owd(INT_MAX), // this is computed like in LEDBAT, so it is not the
+ // real OWD, but the measured one, that depends on the
+ // clock of sender and receiver. the only meaningful
+ // value is is the queueing delay. for this reason we
+ // keep both RTT (for the windowd calculation) and OWD
+ // (for congestion/quality control)
+ prev_min_owd(INT_MAX),
+ avg_owd(DBL_MAX),
+ queuing_delay(DBL_MAX),
+ jitter_(0.0),
+ last_owd_(0),
+ largest_recv_seq_(0),
+ largest_recv_seq_time_(0),
+ avg_inter_arrival_(DBL_MAX),
+ received_nacks_(false),
+ received_packets_(false),
+ rounds_without_packets_(0),
+ last_received_data_packet_(0),
+ RTT_history_(HISTORY_LEN),
+ OWD_history_(HISTORY_LEN){};
+
+void RTCDataPath::insertRttSample(uint64_t rtt) {
+ // for the rtt we only keep track of the min one
+ if (rtt < min_rtt) min_rtt = rtt;
+ last_received_data_packet_ =
+ std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+}
+
+void RTCDataPath::insertOwdSample(int64_t owd) {
+ // for owd we use both min and avg
+ if (owd < min_owd) min_owd = owd;
+
+ if (avg_owd != DBL_MAX)
+ avg_owd = (avg_owd * (1 - ALPHA_RTC)) + (owd * ALPHA_RTC);
+ else {
+ avg_owd = owd;
+ }
+
+ int64_t queueVal = owd - std::min(getMinOwd(), min_owd);
+
+ if (queuing_delay != DBL_MAX)
+ queuing_delay = (queuing_delay * (1 - ALPHA_RTC)) + (queueVal * ALPHA_RTC);
+ else {
+ queuing_delay = queueVal;
+ }
+
+ // keep track of the jitter computed as for RTP (RFC 3550)
+ int64_t diff = std::abs(owd - last_owd_);
+ last_owd_ = owd;
+ jitter_ += (1.0 / 16.0) * ((double)diff - jitter_);
+
+ // owd is computed only for valid data packets so we count only
+ // this for decide if we recevie traffic or not
+ received_packets_ = true;
+}
+
+void RTCDataPath::computeInterArrivalGap(uint32_t segment_number) {
+ // got packet in sequence, compute gap
+ if (largest_recv_seq_ == (segment_number - 1)) {
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ uint64_t delta = now - largest_recv_seq_time_;
+ largest_recv_seq_ = segment_number;
+ largest_recv_seq_time_ = now;
+ if (avg_inter_arrival_ == DBL_MAX)
+ avg_inter_arrival_ = delta;
+ else
+ avg_inter_arrival_ =
+ (avg_inter_arrival_ * (1 - ALPHA_RTC)) + (delta * ALPHA_RTC);
+ return;
+ }
+
+ // ooo packet, update the stasts if needed
+ if (largest_recv_seq_ <= segment_number) {
+ largest_recv_seq_ = segment_number;
+ largest_recv_seq_time_ =
+ std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ }
+}
+
+void RTCDataPath::receivedNack() { received_nacks_ = true; }
+
+double RTCDataPath::getInterArrivalGap() {
+ if (avg_inter_arrival_ == DBL_MAX) return 0;
+ return avg_inter_arrival_;
+}
+
+bool RTCDataPath::isActive() {
+ if (received_nacks_ && rounds_without_packets_ < MAX_ROUNDS_WITHOUT_PKTS)
+ return true;
+ return false;
+}
+
+bool RTCDataPath::pathToProducer() {
+ if (received_nacks_) return true;
+ return false;
+}
+
+void RTCDataPath::roundEnd() {
+ // reset min_rtt and add it to the history
+ if (min_rtt != UINT_MAX) {
+ prev_min_rtt = min_rtt;
+ } else {
+ // this may happen if we do not receive any packet
+ // from this path in the last round. in this case
+ // we use the measure from the previuos round
+ min_rtt = prev_min_rtt;
+ }
+
+ if (min_rtt == 0) min_rtt = 1;
+
+ RTT_history_.pushBack(min_rtt);
+ min_rtt = UINT_MAX;
+
+ // do the same for min owd
+ if (min_owd != INT_MAX) {
+ prev_min_owd = min_owd;
+ } else {
+ min_owd = prev_min_owd;
+ }
+
+ if (min_owd != INT_MAX) {
+ OWD_history_.pushBack(min_owd);
+ min_owd = INT_MAX;
+ }
+
+ if (!received_packets_)
+ rounds_without_packets_++;
+ else
+ rounds_without_packets_ = 0;
+ received_packets_ = false;
+}
+
+uint32_t RTCDataPath::getPathId() { return path_id_; }
+
+double RTCDataPath::getQueuingDealy() { return queuing_delay; }
+
+uint64_t RTCDataPath::getMinRtt() {
+ if (RTT_history_.size() != 0) return RTT_history_.begin();
+ return 0;
+}
+
+int64_t RTCDataPath::getMinOwd() {
+ if (OWD_history_.size() != 0) return OWD_history_.begin();
+ return 0;
+}
+
+double RTCDataPath::getJitter() { return jitter_; }
+
+uint64_t RTCDataPath::getLastPacketTS() { return last_received_data_packet_; }
+
+void RTCDataPath::clearRtt() { RTT_history_.clear(); }
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_data_path.h b/libtransport/src/protocols/rtc/rtc_data_path.h
new file mode 100644
index 000000000..c5c37fc0d
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_data_path.h
@@ -0,0 +1,97 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <stdint.h>
+#include <utils/min_filter.h>
+
+#include <climits>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+const double ALPHA_RTC = 0.125;
+const uint32_t HISTORY_LEN = 20; // 4 sec
+
+class RTCDataPath {
+ public:
+ RTCDataPath(uint32_t path_id);
+
+ public:
+ void insertRttSample(uint64_t rtt);
+ void insertOwdSample(int64_t owd);
+ void computeInterArrivalGap(uint32_t segment_number);
+ void receivedNack();
+
+ uint32_t getPathId();
+ uint64_t getMinRtt();
+ double getQueuingDealy();
+ double getInterArrivalGap();
+ double getJitter();
+ bool isActive();
+ bool pathToProducer();
+ uint64_t getLastPacketTS();
+
+ void clearRtt();
+
+ void roundEnd();
+
+ private:
+ uint32_t path_id_;
+
+ int64_t getMinOwd();
+
+ uint64_t min_rtt;
+ uint64_t prev_min_rtt;
+
+ int64_t min_owd;
+ int64_t prev_min_owd;
+
+ double avg_owd;
+
+ double queuing_delay;
+
+ double jitter_;
+ int64_t last_owd_;
+
+ uint32_t largest_recv_seq_;
+ uint64_t largest_recv_seq_time_;
+ double avg_inter_arrival_;
+
+ // flags to check if a path is active
+ // we considere a path active if it reaches a producer
+ //(not a cache) --aka we got at least one nack on this path--
+ // and if we receives packets
+ bool received_nacks_;
+ bool received_packets_;
+ uint8_t rounds_without_packets_; // if we don't get any packet
+ // for MAX_ROUNDS_WITHOUT_PKTS
+ // we consider the path inactive
+ uint64_t last_received_data_packet_; // timestamp for the last data received
+ // on this path
+
+ utils::MinFilter<uint64_t> RTT_history_;
+ utils::MinFilter<int64_t> OWD_history_;
+};
+
+} // namespace rtc
+
+} // namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_ldr.cc b/libtransport/src/protocols/rtc/rtc_ldr.cc
new file mode 100644
index 000000000..e91b29c04
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_ldr.cc
@@ -0,0 +1,427 @@
+/*
+ * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <protocols/rtc/rtc_consts.h>
+#include <protocols/rtc/rtc_ldr.h>
+
+#include <algorithm>
+#include <unordered_set>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+RTCLossDetectionAndRecovery::RTCLossDetectionAndRecovery(
+ SendRtxCallback &&callback, asio::io_service &io_service)
+ : rtx_on_(false),
+ next_rtx_timer_(MAX_TIMER_RTX),
+ last_event_(0),
+ sentinel_timer_interval_(MAX_TIMER_RTX),
+ send_rtx_callback_(std::move(callback)) {
+ timer_ = std::make_unique<asio::steady_timer>(io_service);
+ sentinel_timer_ = std::make_unique<asio::steady_timer>(io_service);
+}
+
+RTCLossDetectionAndRecovery::~RTCLossDetectionAndRecovery() {}
+
+void RTCLossDetectionAndRecovery::turnOnRTX() {
+ rtx_on_ = true;
+ scheduleSentinelTimer(state_->getRTT() * CATCH_UP_RTT_INCREMENT);
+}
+
+void RTCLossDetectionAndRecovery::turnOffRTX() {
+ rtx_on_ = false;
+ clear();
+}
+
+void RTCLossDetectionAndRecovery::onTimeout(uint32_t seq) {
+ // always add timeouts to the RTX list to avoid to send the same packet as if
+ // it was not a rtx
+ addToRetransmissions(seq, seq + 1);
+ last_event_ = getNow();
+}
+
+void RTCLossDetectionAndRecovery::onDataPacketReceived(
+ const core::ContentObject &content_object) {
+ last_event_ = getNow();
+
+ uint32_t seq = content_object.getName().getSuffix();
+ if (deleteRtx(seq)) {
+ state_->onPacketRecovered(seq);
+ } else {
+ if (TRANSPORT_EXPECT_FALSE(!rtx_on_)) return; // do not add if RTX is off
+ TRANSPORT_LOGD("received data. add from %u to %u ",
+ state_->getHighestSeqReceivedInOrder() + 1, seq);
+ addToRetransmissions(state_->getHighestSeqReceivedInOrder() + 1, seq);
+ }
+}
+
+void RTCLossDetectionAndRecovery::onNackPacketReceived(
+ const core::ContentObject &nack) {
+ last_event_ = getNow();
+
+ uint32_t seq = nack.getName().getSuffix();
+
+ if (TRANSPORT_EXPECT_FALSE(!rtx_on_)) return; // do not add if RTX is off
+
+ struct nack_packet_t *nack_pkt =
+ (struct nack_packet_t *)nack.getPayload()->data();
+ uint32_t production_seq = nack_pkt->getProductionSegement();
+
+ if (production_seq > seq) {
+ // this is a past nack, all data before productionSeq are lost. if
+ // productionSeq > state_->getHighestSeqReceivedInOrder() is impossible to
+ // recover any packet. If this is not the case we can try to recover the
+ // packets between state_->getHighestSeqReceivedInOrder() and productionSeq.
+ // e.g.: the client receives packets 8 10 11 9 where 9 is a nack with
+ // productionSeq = 14. 9 is lost but we can try to recover packets 12 13 and
+ // 14 that are not arrived yet
+ deleteRtx(seq);
+ TRANSPORT_LOGD("received past nack. add from %u to %u ",
+ state_->getHighestSeqReceivedInOrder() + 1, production_seq);
+ addToRetransmissions(state_->getHighestSeqReceivedInOrder() + 1,
+ production_seq);
+ } else {
+ // future nack. here there should be a gap between the last data received
+ // and this packet and is it possible to recover the packets between the
+ // last received data and the production seq. we should not use the seq
+ // number of the nack since we know that is too early to ask for this seq
+ // number
+ // e.g.: // e.g.: the client receives packets 10 11 12 20 where 20 is a nack
+ // with productionSeq = 18. this says that all the packets between 12 and 18
+ // may got lost and we should ask them
+ deleteRtx(seq);
+ TRANSPORT_LOGD("received futrue nack. add from %u to %u ",
+ state_->getHighestSeqReceivedInOrder() + 1, production_seq);
+ addToRetransmissions(state_->getHighestSeqReceivedInOrder() + 1,
+ production_seq);
+ }
+}
+
+void RTCLossDetectionAndRecovery::onProbePacketReceived(
+ const core::ContentObject &probe) {
+ // we don't log the reception of a probe packet for the sentinel timer because
+ // probes are not taken into account into the sync window. we use them as
+ // future nacks to detect possible packets lost
+ if (TRANSPORT_EXPECT_FALSE(!rtx_on_)) return; // do not add if RTX is off
+ struct nack_packet_t *probe_pkt =
+ (struct nack_packet_t *)probe.getPayload()->data();
+ uint32_t production_seq = probe_pkt->getProductionSegement();
+ TRANSPORT_LOGD("received probe. add from %u to %u ",
+ state_->getHighestSeqReceivedInOrder() + 1, production_seq);
+ addToRetransmissions(state_->getHighestSeqReceivedInOrder() + 1,
+ production_seq);
+}
+
+void RTCLossDetectionAndRecovery::clear() {
+ rtx_state_.clear();
+ rtx_timers_.clear();
+ sentinel_timer_->cancel();
+ if (next_rtx_timer_ != MAX_TIMER_RTX) {
+ next_rtx_timer_ = MAX_TIMER_RTX;
+ timer_->cancel();
+ }
+}
+
+void RTCLossDetectionAndRecovery::addToRetransmissions(uint32_t start,
+ uint32_t stop) {
+ // skip nacked packets
+ if (start <= state_->getLastSeqNacked()) {
+ start = state_->getLastSeqNacked() + 1;
+ }
+
+ // skip received or lost packets
+ if (start <= state_->getHighestSeqReceivedInOrder()) {
+ start = state_->getHighestSeqReceivedInOrder() + 1;
+ }
+
+ for (uint32_t seq = start; seq < stop; seq++) {
+ if (!isRtx(seq) && // is not already an rtx
+ // is not received or lost
+ state_->isReceivedOrLost(seq) == PacketState::UNKNOWN) {
+ // add rtx
+ rtxState state;
+ state.first_send_ = state_->getInterestSentTime(seq);
+ if (state.first_send_ == 0) // this interest was never sent before
+ state.first_send_ = getNow();
+ state.next_send_ = computeNextSend(seq, true);
+ state.rtx_count_ = 0;
+ TRANSPORT_LOGD("add %u to retransmissions. next rtx is %lu ", seq,
+ (state.next_send_ - getNow()));
+ rtx_state_.insert(std::pair<uint32_t, rtxState>(seq, state));
+ rtx_timers_.insert(std::pair<uint64_t, uint32_t>(state.next_send_, seq));
+ }
+ }
+ scheduleNextRtx();
+}
+
+uint64_t RTCLossDetectionAndRecovery::computeNextSend(uint32_t seq,
+ bool new_rtx) {
+ uint64_t now = getNow();
+ if (new_rtx) {
+ // for the new rtx we wait one estimated IAT after the loss detection. this
+ // is bacause, assuming that packets arrive with a constant IAT, we should
+ // get a new packet every IAT
+ double prod_rate = state_->getProducerRate();
+ uint32_t estimated_iat = SENTINEL_TIMER_INTERVAL;
+ uint32_t jitter = 0;
+
+ if (prod_rate != 0) {
+ double packet_size = state_->getAveragePacketSize();
+ estimated_iat = ceil(1000.0 / (prod_rate / packet_size));
+ jitter = ceil(state_->getJitter());
+ }
+
+ uint32_t wait = estimated_iat + jitter;
+ TRANSPORT_LOGD("first rtx for %u in %u ms, rtt = %lu ait = %u jttr = %u",
+ seq, wait, state_->getRTT(), estimated_iat, jitter);
+
+ return now + wait;
+ } else {
+ // wait one RTT
+ // however if the IAT is larger than the RTT, wait one IAT
+ uint32_t wait = SENTINEL_TIMER_INTERVAL;
+
+ double prod_rate = state_->getProducerRate();
+ if (prod_rate == 0) {
+ return now + SENTINEL_TIMER_INTERVAL;
+ }
+
+ double packet_size = state_->getAveragePacketSize();
+ uint32_t estimated_iat = ceil(1000.0 / (prod_rate / packet_size));
+
+ uint64_t rtt = state_->getRTT();
+ if (rtt == 0) rtt = SENTINEL_TIMER_INTERVAL;
+ wait = rtt;
+
+ if (estimated_iat > rtt) wait = estimated_iat;
+
+ uint32_t jitter = ceil(state_->getJitter());
+ wait += jitter;
+
+ // it may happen that the channel is congested and we have some additional
+ // queuing delay to take into account
+ uint32_t queue = ceil(state_->getQueuing());
+ wait += queue;
+
+ TRANSPORT_LOGD(
+ "next rtx for %u in %u ms, rtt = %lu ait = %u jttr = %u queue = %u",
+ seq, wait, state_->getRTT(), estimated_iat, jitter, queue);
+
+ return now + wait;
+ }
+}
+
+void RTCLossDetectionAndRecovery::retransmit() {
+ if (rtx_timers_.size() == 0) return;
+
+ uint64_t now = getNow();
+
+ auto it = rtx_timers_.begin();
+ std::unordered_set<uint32_t> lost_pkt;
+ uint32_t sent_counter = 0;
+ while (it != rtx_timers_.end() && it->first <= now &&
+ sent_counter < MAX_INTERESTS_IN_BATCH) {
+ uint32_t seq = it->second;
+ auto rtx_it =
+ rtx_state_.find(seq); // this should always return a valid iter
+ if (rtx_it->second.rtx_count_ >= RTC_MAX_RTX ||
+ (now - rtx_it->second.first_send_) >= RTC_MAX_AGE ||
+ seq < state_->getLastSeqNacked()) {
+ // max rtx reached or packet too old or packet nacked, this packet is lost
+ TRANSPORT_LOGD(
+ "packet %u lost because 1) max rtx: %u 2) max age: %u 3) naked: %u",
+ seq, (rtx_it->second.rtx_count_ >= RTC_MAX_RTX),
+ ((now - rtx_it->second.first_send_) >= RTC_MAX_AGE),
+ (seq < state_->getLastSeqNacked()));
+ lost_pkt.insert(seq);
+ it++;
+ } else {
+ // resend the packet
+ state_->onRetransmission(seq);
+ double prod_rate = state_->getProducerRate();
+ if (prod_rate != 0) rtx_it->second.rtx_count_++;
+ rtx_it->second.next_send_ = computeNextSend(seq, false);
+ it = rtx_timers_.erase(it);
+ rtx_timers_.insert(
+ std::pair<uint64_t, uint32_t>(rtx_it->second.next_send_, seq));
+ TRANSPORT_LOGD("send rtx for sequence %u, next send in %lu", seq,
+ (rtx_it->second.next_send_ - now));
+ send_rtx_callback_(seq);
+ sent_counter++;
+ }
+ }
+
+ // remove packets if needed
+ for (auto lost_it = lost_pkt.begin(); lost_it != lost_pkt.end(); lost_it++) {
+ uint32_t seq = *lost_it;
+ state_->onPacketLost(seq);
+ deleteRtx(seq);
+ }
+}
+
+void RTCLossDetectionAndRecovery::scheduleNextRtx() {
+ if (rtx_timers_.size() == 0) {
+ // all the rtx were removed, reset timer
+ next_rtx_timer_ = MAX_TIMER_RTX;
+ return;
+ }
+
+ // check if timer is alreay set
+ if (next_rtx_timer_ != MAX_TIMER_RTX) {
+ // a new check for rtx is already scheduled
+ if (next_rtx_timer_ > rtx_timers_.begin()->first) {
+ // we need to re-schedule it
+ timer_->cancel();
+ } else {
+ // wait for the next timer
+ return;
+ }
+ }
+
+ // set a new timer
+ next_rtx_timer_ = rtx_timers_.begin()->first;
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ uint64_t wait = 1;
+ if (next_rtx_timer_ != MAX_TIMER_RTX && next_rtx_timer_ > now)
+ wait = next_rtx_timer_ - now;
+
+ std::weak_ptr<RTCLossDetectionAndRecovery> self(shared_from_this());
+ timer_->expires_from_now(std::chrono::milliseconds(wait));
+ timer_->async_wait([self](std::error_code ec) {
+ if (ec) return;
+ if (auto s = self.lock()) {
+ s->retransmit();
+ s->next_rtx_timer_ = MAX_TIMER_RTX;
+ s->scheduleNextRtx();
+ }
+ });
+}
+
+bool RTCLossDetectionAndRecovery::deleteRtx(uint32_t seq) {
+ auto it_rtx = rtx_state_.find(seq);
+ if (it_rtx == rtx_state_.end()) return false; // rtx not found
+
+ uint64_t ts = it_rtx->second.next_send_;
+ auto it_timers = rtx_timers_.find(ts);
+ while (it_timers != rtx_timers_.end() && it_timers->first == ts) {
+ if (it_timers->second == seq) {
+ rtx_timers_.erase(it_timers);
+ break;
+ }
+ it_timers++;
+ }
+
+ bool lost = it_rtx->second.rtx_count_ > 0;
+ rtx_state_.erase(it_rtx);
+
+ return lost;
+}
+
+void RTCLossDetectionAndRecovery::scheduleSentinelTimer(
+ uint64_t expires_from_now) {
+ std::weak_ptr<RTCLossDetectionAndRecovery> self(shared_from_this());
+ sentinel_timer_->expires_from_now(
+ std::chrono::milliseconds(expires_from_now));
+ sentinel_timer_->async_wait([self](std::error_code ec) {
+ if (ec) return;
+ if (auto s = self.lock()) {
+ s->sentinelTimer();
+ }
+ });
+}
+
+void RTCLossDetectionAndRecovery::sentinelTimer() {
+ uint64_t now = getNow();
+
+ bool expired = false;
+ bool sent = false;
+ if ((now - last_event_) >= sentinel_timer_interval_) {
+ // at least a sentinel_timer_interval_ elapsed since last event
+ expired = true;
+ if (TRANSPORT_EXPECT_FALSE(!state_->isProducerActive())) {
+ // this happens at the beginning (or if the producer stops for some
+ // reason) we need to keep sending interest 0 until we get an answer
+ TRANSPORT_LOGD(
+ "sentinel timer: the producer is not active, send packet 0");
+ state_->onRetransmission(0);
+ send_rtx_callback_(0);
+ } else {
+ TRANSPORT_LOGD(
+ "sentinel timer: the producer is active, send the 10 oldest packets");
+ sent = true;
+ uint32_t rtx = 0;
+ auto it = state_->getPendingInterestsMapBegin();
+ auto end = state_->getPendingInterestsMapEnd();
+ while (it != end && rtx < MAX_RTX_WITH_SENTINEL) {
+ uint32_t seq = it->first;
+ TRANSPORT_LOGD("sentinel timer, add %u to the rtx list", seq);
+ addToRetransmissions(seq, seq + 1);
+ rtx++;
+ it++;
+ }
+ }
+ } else {
+ // sentinel timer did not expire because we registered at least one event
+ }
+
+ uint32_t next_timer;
+ double prod_rate = state_->getProducerRate();
+ if (TRANSPORT_EXPECT_FALSE(!state_->isProducerActive()) || prod_rate == 0) {
+ TRANSPORT_LOGD("next timer in %u", SENTINEL_TIMER_INTERVAL);
+ next_timer = SENTINEL_TIMER_INTERVAL;
+ } else {
+ double prod_rate = state_->getProducerRate();
+ double packet_size = state_->getAveragePacketSize();
+ uint32_t estimated_iat = ceil(1000.0 / (prod_rate / packet_size));
+ uint32_t jitter = ceil(state_->getJitter());
+
+ // try to reduce the number of timers if the estimated IAT is too small
+ next_timer = std::max((estimated_iat + jitter) * 20, (uint32_t)1);
+ TRANSPORT_LOGD("next sentinel in %u ms, rate: %f, iat: %u, jitter: %u",
+ next_timer, ((prod_rate * 8.0) / 1000000.0), estimated_iat,
+ jitter);
+
+ if (!expired) {
+ // discount the amout of time that is already passed
+ uint32_t discount = now - last_event_;
+ if (next_timer > discount) {
+ next_timer = next_timer - discount;
+ } else {
+ // in this case we trigger the timer in 1 ms
+ next_timer = 1;
+ }
+ TRANSPORT_LOGD("timer after discout: %u", next_timer);
+ } else if (sent) {
+ // wait at least one producer stats interval + owd to check if the
+ // production rate is reducing.
+ uint32_t min_wait = PRODUCER_STATS_INTERVAL + ceil(state_->getQueuing());
+ next_timer = std::max(next_timer, min_wait);
+ TRANSPORT_LOGD("wait for updates from prod, next timer: %u", next_timer);
+ }
+ }
+
+ scheduleSentinelTimer(next_timer);
+}
+
+} // namespace rtc
+
+} // namespace protocol
+
+} // namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_ldr.h b/libtransport/src/protocols/rtc/rtc_ldr.h
new file mode 100644
index 000000000..c0912303b
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_ldr.h
@@ -0,0 +1,108 @@
+/*
+ * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <hicn/transport/config.h>
+#include <hicn/transport/core/content_object.h>
+#include <hicn/transport/core/name.h>
+#include <protocols/rtc/rtc_consts.h>
+#include <protocols/rtc/rtc_state.h>
+
+#include <asio.hpp>
+#include <asio/steady_timer.hpp>
+#include <functional>
+#include <map>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+class RTCLossDetectionAndRecovery
+ : public std::enable_shared_from_this<RTCLossDetectionAndRecovery> {
+ struct rtx_state_ {
+ uint64_t first_send_;
+ uint64_t next_send_;
+ uint32_t rtx_count_;
+ };
+
+ using rtxState = struct rtx_state_;
+ using SendRtxCallback = std::function<void(uint32_t)>;
+
+ public:
+ RTCLossDetectionAndRecovery(SendRtxCallback &&callback,
+ asio::io_service &io_service);
+
+ ~RTCLossDetectionAndRecovery();
+
+ void setState(std::shared_ptr<RTCState> state) { state_ = state; }
+ void turnOnRTX();
+ void turnOffRTX();
+
+ void onTimeout(uint32_t seq);
+ void onDataPacketReceived(const core::ContentObject &content_object);
+ void onNackPacketReceived(const core::ContentObject &nack);
+ void onProbePacketReceived(const core::ContentObject &probe);
+
+ void clear();
+
+ bool isRtx(uint32_t seq) {
+ if (rtx_state_.find(seq) != rtx_state_.end()) return true;
+ return false;
+ }
+
+ private:
+ void addToRetransmissions(uint32_t start, uint32_t stop);
+ uint64_t computeNextSend(uint32_t seq, bool new_rtx);
+ void retransmit();
+ void scheduleNextRtx();
+ bool deleteRtx(uint32_t seq);
+ void scheduleSentinelTimer(uint64_t expires_from_now);
+ void sentinelTimer();
+
+ uint64_t getNow() {
+ using namespace std::chrono;
+ uint64_t now =
+ duration_cast<milliseconds>(steady_clock::now().time_since_epoch())
+ .count();
+ return now;
+ }
+
+ // this map keeps track of the retransmitted interest, ordered from the oldest
+ // to the newest one. the state contains the timer of the first send of the
+ // interest (from pendingIntetests_), the timer of the next send (key of the
+ // multimap) and the number of rtx
+ std::map<uint32_t, rtxState> rtx_state_;
+ // this map stored the rtx by timer. The key is the time at which the rtx
+ // should be sent, and the val is the interest seq number
+ std::multimap<uint64_t, uint32_t> rtx_timers_;
+
+ bool rtx_on_;
+ uint64_t next_rtx_timer_;
+ uint64_t last_event_;
+ uint64_t sentinel_timer_interval_;
+ std::unique_ptr<asio::steady_timer> timer_;
+ std::unique_ptr<asio::steady_timer> sentinel_timer_;
+ std::shared_ptr<RTCState> state_;
+
+ SendRtxCallback send_rtx_callback_;
+};
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_packet.h b/libtransport/src/protocols/rtc/rtc_packet.h
new file mode 100644
index 000000000..abb1323a3
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_packet.h
@@ -0,0 +1,89 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ */
+
+/* data packet
+ * +-----------------------------------------+
+ * | uint64_t: timestamp |
+ * | |
+ * +-----------------------------------------+
+ * | uint32_t: prod rate (bytes per sec) |
+ * +-----------------------------------------+
+ * | payload |
+ * | ... |
+ */
+
+/* nack packet
+ * +-----------------------------------------+
+ * | uint64_t: timestamp |
+ * | |
+ * +-----------------------------------------+
+ * | uint32_t: prod rate (bytes per sec) |
+ * +-----------------------------------------+
+ * | uint32_t: current seg in production |
+ * +-----------------------------------------+
+ */
+
+#pragma once
+#include <arpa/inet.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+inline uint64_t _ntohll(const uint64_t *input) {
+ uint64_t return_val;
+ uint8_t *tmp = (uint8_t *)&return_val;
+
+ tmp[0] = *input >> 56;
+ tmp[1] = *input >> 48;
+ tmp[2] = *input >> 40;
+ tmp[3] = *input >> 32;
+ tmp[4] = *input >> 24;
+ tmp[5] = *input >> 16;
+ tmp[6] = *input >> 8;
+ tmp[7] = *input >> 0;
+
+ return return_val;
+}
+
+inline uint64_t _htonll(const uint64_t *input) { return (_ntohll(input)); }
+
+const uint32_t DATA_HEADER_SIZE = 12; // bytes
+ // XXX: sizeof(data_packet_t) is 16
+ // beacuse of padding
+const uint32_t NACK_HEADER_SIZE = 16;
+
+struct data_packet_t {
+ uint64_t timestamp;
+ uint32_t prod_rate;
+
+ inline uint64_t getTimestamp() const { return _ntohll(&timestamp); }
+ inline void setTimestamp(uint64_t time) { timestamp = _htonll(&time); }
+
+ inline uint32_t getProductionRate() const { return ntohl(prod_rate); }
+ inline void setProductionRate(uint32_t rate) { prod_rate = htonl(rate); }
+};
+
+struct nack_packet_t {
+ uint64_t timestamp;
+ uint32_t prod_rate;
+ uint32_t prod_seg;
+
+ inline uint64_t getTimestamp() const { return _ntohll(&timestamp); }
+ inline void setTimestamp(uint64_t time) { timestamp = _htonll(&time); }
+
+ inline uint32_t getProductionRate() const { return ntohl(prod_rate); }
+ inline void setProductionRate(uint32_t rate) { prod_rate = htonl(rate); }
+
+ inline uint32_t getProductionSegement() const { return ntohl(prod_seg); }
+ inline void setProductionSegement(uint32_t seg) { prod_seg = htonl(seg); }
+};
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_rc.h b/libtransport/src/protocols/rtc/rtc_rc.h
new file mode 100644
index 000000000..34d090092
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_rc.h
@@ -0,0 +1,58 @@
+/*
+ * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <protocols/rtc/rtc_state.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+class RTCRateControl : public std::enable_shared_from_this<RTCRateControl> {
+ public:
+ RTCRateControl()
+ : rc_on_(false),
+ congestion_win_(1000000), // init the win to a large number
+ congestion_state_(CongestionState::Normal),
+ protocol_state_(nullptr) {}
+
+ virtual ~RTCRateControl() = default;
+
+ void turnOnRateControl() { rc_on_ = true; }
+ void setState(std::shared_ptr<RTCState> state) { protocol_state_ = state; };
+ uint32_t getCongesionWindow() { return congestion_win_; };
+
+ virtual void onNewRound(double round_len) = 0;
+ virtual void onDataPacketReceived(
+ const core::ContentObject &content_object) = 0;
+
+ protected:
+ enum class CongestionState { Normal = 0, Underuse = 1, Congested = 2, Last };
+
+ protected:
+ bool rc_on_;
+ uint32_t congestion_win_;
+ CongestionState congestion_state_;
+
+ std::shared_ptr<RTCState> protocol_state_;
+};
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_rc_frame.cc b/libtransport/src/protocols/rtc/rtc_rc_frame.cc
new file mode 100644
index 000000000..b577b5bea
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_rc_frame.cc
@@ -0,0 +1,79 @@
+/*
+ * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <protocols/rtc/rtc_consts.h>
+#include <protocols/rtc/rtc_rc_frame.h>
+
+#include <algorithm>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+RTCRateControlFrame::RTCRateControlFrame() : cc_detector_() {}
+
+RTCRateControlFrame::~RTCRateControlFrame() {}
+
+void RTCRateControlFrame::onNewRound(double round_len) {
+ if (!rc_on_) return;
+
+ CongestionState prev_congestion_state = congestion_state_;
+ cc_detector_.updateStats();
+ congestion_state_ = (CongestionState)cc_detector_.getState();
+
+ if (congestion_state_ == CongestionState::Congested) {
+ if (prev_congestion_state == CongestionState::Normal) {
+ // congestion detected, notify app and init congestion win
+ double prod_rate = protocol_state_->getReceivedRate();
+ double rtt = (double)protocol_state_->getRTT() / MILLI_IN_A_SEC;
+ double packet_size = protocol_state_->getAveragePacketSize();
+
+ if (prod_rate == 0.0 || rtt == 0.0 || packet_size == 0.0) {
+ // TODO do something
+ return;
+ }
+
+ congestion_win_ = (uint32_t)ceil(prod_rate * rtt / packet_size);
+ }
+ uint32_t win = congestion_win_ * WIN_DECREASE_FACTOR;
+ congestion_win_ = std::max(win, WIN_MIN);
+ return;
+ }
+}
+
+void RTCRateControlFrame::onDataPacketReceived(
+ const core::ContentObject &content_object) {
+ if (!rc_on_) return;
+
+ uint32_t seq = content_object.getName().getSuffix();
+ if (!protocol_state_->isPending(seq)) return;
+
+ cc_detector_.addPacket(content_object);
+}
+
+void RTCRateControlFrame::receivedBwProbeTrain(uint64_t firts_probe_ts,
+ uint64_t last_probe_ts,
+ uint32_t total_probes) {
+ // TODO
+ return;
+}
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_rc_frame.h b/libtransport/src/protocols/rtc/rtc_rc_frame.h
new file mode 100644
index 000000000..25d5ddbb6
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_rc_frame.h
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <protocols/rtc/congestion_detection.h>
+#include <protocols/rtc/rtc_rc.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+class RTCRateControlFrame : public RTCRateControl {
+ public:
+ RTCRateControlFrame();
+
+ ~RTCRateControlFrame();
+
+ void onNewRound(double round_len);
+ void onDataPacketReceived(const core::ContentObject &content_object);
+
+ void receivedBwProbeTrain(uint64_t firts_probe_ts, uint64_t last_probe_ts,
+ uint32_t total_probes);
+
+ private:
+ CongestionDetection cc_detector_;
+};
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_rc_queue.cc b/libtransport/src/protocols/rtc/rtc_rc_queue.cc
new file mode 100644
index 000000000..a1c89e329
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_rc_queue.cc
@@ -0,0 +1,106 @@
+/*
+ * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <protocols/rtc/rtc_consts.h>
+#include <protocols/rtc/rtc_rc_queue.h>
+
+#include <algorithm>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+RTCRateControlQueue::RTCRateControlQueue()
+ : rounds_since_last_drop_(0),
+ rounds_without_congestion_(0),
+ last_queue_(0) {}
+
+RTCRateControlQueue::~RTCRateControlQueue() {}
+
+void RTCRateControlQueue::onNewRound(double round_len) {
+ if (!rc_on_) return;
+
+ double received_rate = protocol_state_->getReceivedRate();
+ double target_rate =
+ protocol_state_->getProducerRate() * PRODUCTION_RATE_FRACTION;
+ double rtt = (double)protocol_state_->getRTT() / MILLI_IN_A_SEC;
+ double packet_size = protocol_state_->getAveragePacketSize();
+ double queue = protocol_state_->getQueuing();
+
+ if (rtt == 0.0) return; // no info from the producer
+
+ CongestionState prev_congestion_state = congestion_state_;
+
+ if (prev_congestion_state == CongestionState::Normal &&
+ received_rate >= target_rate) {
+ // if the queue is high in this case we are most likelly fighting with
+ // a TCP flow and there is enough bandwidth to match the producer rate
+ congestion_state_ = CongestionState::Normal;
+ } else if (queue > MAX_QUEUING_DELAY || last_queue_ == queue) {
+ // here we detect congestion. in the case that last_queue == queue
+ // the consumer didn't receive any packet from the producer so we
+ // consider this case as congestion
+ // TODO: wath happen in case of high loss rate?
+ congestion_state_ = CongestionState::Congested;
+ } else {
+ // nothing bad is happening
+ congestion_state_ = CongestionState::Normal;
+ }
+
+ last_queue_ = queue;
+
+ if (congestion_state_ == CongestionState::Congested) {
+ if (prev_congestion_state == CongestionState::Normal) {
+ // init the congetion window using the received rate
+ congestion_win_ = (uint32_t)ceil(received_rate * rtt / packet_size);
+ rounds_since_last_drop_ = ROUNDS_BEFORE_TAKE_ACTION + 1;
+ }
+
+ if (rounds_since_last_drop_ >= ROUNDS_BEFORE_TAKE_ACTION) {
+ uint32_t win = congestion_win_ * WIN_DECREASE_FACTOR;
+ congestion_win_ = std::max(win, WIN_MIN);
+ rounds_since_last_drop_ = 0;
+ return;
+ }
+
+ rounds_since_last_drop_++;
+ }
+
+ if (congestion_state_ == CongestionState::Normal) {
+ if (prev_congestion_state == CongestionState::Congested) {
+ rounds_without_congestion_ = 0;
+ }
+
+ rounds_without_congestion_++;
+ if (rounds_without_congestion_ < ROUNDS_BEFORE_TAKE_ACTION) return;
+
+ congestion_win_ = congestion_win_ * WIN_INCREASE_FACTOR;
+ congestion_win_ = std::min(congestion_win_, INITIAL_WIN_MAX);
+ }
+}
+
+void RTCRateControlQueue::onDataPacketReceived(
+ const core::ContentObject &content_object) {
+ // nothing to do
+ return;
+}
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_rc_queue.h b/libtransport/src/protocols/rtc/rtc_rc_queue.h
new file mode 100644
index 000000000..407354d43
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_rc_queue.h
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <hicn/transport/utils/shared_ptr_utils.h>
+#include <protocols/rtc/rtc_rc.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+class RTCRateControlQueue : public RTCRateControl {
+ public:
+ RTCRateControlQueue();
+
+ ~RTCRateControlQueue();
+
+ void onNewRound(double round_len);
+ void onDataPacketReceived(const core::ContentObject &content_object);
+
+ auto shared_from_this() { return utils::shared_from(this); }
+
+ private:
+ uint32_t rounds_since_last_drop_;
+ uint32_t rounds_without_congestion_;
+ double last_queue_;
+};
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_state.cc b/libtransport/src/protocols/rtc/rtc_state.cc
new file mode 100644
index 000000000..eabf8942c
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_state.cc
@@ -0,0 +1,560 @@
+/*
+ * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <protocols/rtc/rtc_consts.h>
+#include <protocols/rtc/rtc_state.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+RTCState::RTCState(ProbeHandler::SendProbeCallback &&rtt_probes_callback,
+ DiscoveredRttCallback &&discovered_rtt_callback,
+ asio::io_service &io_service)
+ : rtt_probes_(std::make_shared<ProbeHandler>(
+ std::move(rtt_probes_callback), io_service)),
+ discovered_rtt_callback_(std::move(discovered_rtt_callback)) {
+ init_rtt_timer_ = std::make_unique<asio::steady_timer>(io_service);
+ initParams();
+}
+
+RTCState::~RTCState() {}
+
+void RTCState::initParams() {
+ // packets counters (total)
+ sent_interests_ = 0;
+ sent_rtx_ = 0;
+ received_data_ = 0;
+ received_nacks_ = 0;
+ received_timeouts_ = 0;
+ received_probes_ = 0;
+
+ // loss counters
+ packets_lost_ = 0;
+ losses_recovered_ = 0;
+ first_seq_in_round_ = 0;
+ highest_seq_received_ = 0;
+ highest_seq_received_in_order_ = 0;
+ last_seq_nacked_ = 0;
+ loss_rate_ = 0.0;
+ residual_loss_rate_ = 0.0;
+
+ // bw counters
+ received_bytes_ = 0;
+ avg_packet_size_ = INIT_PACKET_SIZE;
+ production_rate_ = 0.0;
+ received_rate_ = 0.0;
+
+ // nack counter
+ nack_on_last_round_ = false;
+ received_nacks_last_round_ = 0;
+
+ // packets counter
+ received_packets_last_round_ = 0;
+ received_data_last_round_ = 0;
+ received_data_from_cache_ = 0;
+ data_from_cache_rate_ = 0;
+ sent_interests_last_round_ = 0;
+ sent_rtx_last_round_ = 0;
+
+ // round conunters
+ rounds_ = 0;
+ rounds_without_nacks_ = 0;
+ rounds_without_packets_ = 0;
+
+ last_production_seq_ = 0;
+ producer_is_active_ = false;
+ last_prod_update_ = 0;
+
+ // paths stats
+ path_table_.clear();
+ main_path_ = nullptr;
+
+ // packet received
+ received_or_lost_packets_.clear();
+
+ // pending interests
+ pending_interests_.clear();
+
+ // init rtt
+ first_interest_sent_ = ~0;
+ init_rtt_ = false;
+ rtt_probes_->setProbes(INIT_RTT_PROBE_INTERVAL, INIT_RTT_PROBES);
+ rtt_probes_->sendProbes();
+ setInitRttTimer(INIT_RTT_PROBE_RESTART);
+}
+
+// packet events
+void RTCState::onSendNewInterest(const core::Name *interest_name) {
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ uint32_t seq = interest_name->getSuffix();
+ pending_interests_.insert(std::pair<uint32_t, uint64_t>(seq, now));
+
+ if(sent_interests_ == 0) first_interest_sent_ = now;
+
+ sent_interests_++;
+ sent_interests_last_round_++;
+}
+
+void RTCState::onTimeout(uint32_t seq) {
+ auto it = pending_interests_.find(seq);
+ if (it != pending_interests_.end()) {
+ pending_interests_.erase(it);
+ }
+ received_timeouts_++;
+}
+
+void RTCState::onRetransmission(uint32_t seq) {
+ // remove the interest for the pendingInterest map only after the first rtx.
+ // in this way we can handle the ooo packets that come in late as normla
+ // packet. we consider a packet lost only if we sent at least an RTX for it.
+ // XXX this may become problematic if we stop the RTX transmissions
+ auto it = pending_interests_.find(seq);
+ if (it != pending_interests_.end()) {
+ pending_interests_.erase(it);
+ packets_lost_++;
+ }
+ sent_rtx_++;
+ sent_rtx_last_round_++;
+}
+
+void RTCState::onDataPacketReceived(const core::ContentObject &content_object,
+ bool compute_stats) {
+ uint32_t seq = content_object.getName().getSuffix();
+ if (compute_stats) {
+ updatePathStats(content_object, false);
+ received_data_last_round_++;
+ }
+ received_data_++;
+
+ struct data_packet_t *data_pkt =
+ (struct data_packet_t *)content_object.getPayload()->data();
+ uint64_t production_time = data_pkt->getTimestamp();
+ if (last_prod_update_ < production_time) {
+ last_prod_update_ = production_time;
+ uint32_t production_rate = data_pkt->getProductionRate();
+ production_rate_ = (double)production_rate;
+ }
+
+ updatePacketSize(content_object);
+ updateReceivedBytes(content_object);
+ addRecvOrLost(seq, PacketState::RECEIVED);
+
+ if (seq > highest_seq_received_) highest_seq_received_ = seq;
+
+ // the producer is responding
+ // it is generating valid data packets so we consider it active
+ producer_is_active_ = true;
+
+ received_packets_last_round_++;
+}
+
+void RTCState::onNackPacketReceived(const core::ContentObject &nack,
+ bool compute_stats) {
+ uint32_t seq = nack.getName().getSuffix();
+ struct nack_packet_t *nack_pkt =
+ (struct nack_packet_t *)nack.getPayload()->data();
+ uint64_t production_time = nack_pkt->getTimestamp();
+ uint32_t production_seq = nack_pkt->getProductionSegement();
+ uint32_t production_rate = nack_pkt->getProductionRate();
+
+ if (TRANSPORT_EXPECT_FALSE(main_path_ == nullptr) ||
+ last_prod_update_ < production_time) {
+ // update production rate
+ last_prod_update_ = production_time;
+ last_production_seq_ = production_seq;
+ production_rate_ = (double)production_rate;
+ }
+
+ if (compute_stats) {
+ // this is not an RTX
+ updatePathStats(nack, true);
+ nack_on_last_round_ = true;
+ }
+
+ // for statistics pourpose we log all nacks, also the one received for
+ // retransmitted packets
+ received_nacks_++;
+ received_nacks_last_round_++;
+
+ if (production_seq > seq) {
+ // old nack, seq is lost
+ // update last nacked
+ if (last_seq_nacked_ < seq) last_seq_nacked_ = seq;
+ TRANSPORT_LOGD("lost packet %u beacuse of a past nack", seq);
+ onPacketLost(seq);
+ } else if (seq > production_seq) {
+ // future nack
+ // remove the nack from the pending interest map
+ // (the packet is not received/lost yet)
+ pending_interests_.erase(seq);
+ } else {
+ // this should be a quite rear event. simply remove the
+ // packet from the pending interest list
+ pending_interests_.erase(seq);
+ }
+
+ // the producer is responding
+ // we consider it active only if the production rate is not 0
+ // or the production sequence number is not 1
+ if (production_rate_ != 0 || production_seq != 1) {
+ producer_is_active_ = true;
+ }
+
+ received_packets_last_round_++;
+}
+
+void RTCState::onPacketLost(uint32_t seq) {
+ TRANSPORT_LOGD("packet %u is lost", seq);
+ auto it = pending_interests_.find(seq);
+ if (it != pending_interests_.end()) {
+ // this packet was never retransmitted so it does
+ // not appear in the loss count
+ packets_lost_++;
+ }
+ addRecvOrLost(seq, PacketState::LOST);
+}
+
+void RTCState::onPacketRecovered(uint32_t seq) {
+ losses_recovered_++;
+ addRecvOrLost(seq, PacketState::RECEIVED);
+}
+
+bool RTCState::onProbePacketReceived(const core::ContentObject &probe) {
+ uint32_t seq = probe.getName().getSuffix();
+ uint64_t rtt;
+
+ rtt = rtt_probes_->getRtt(seq);
+
+ if (rtt == 0) return false; // this is not a valid probe
+
+ // like for data and nacks update the path stats. Here the RTT is computed
+ // by the probe handler. Both probes for rtt and bw are good to esimate
+ // info on the path
+ uint32_t path_label = probe.getPathLabel();
+
+ auto path_it = path_table_.find(path_label);
+
+ // update production rate and last_seq_nacked like in case of a nack
+ struct nack_packet_t *probe_pkt =
+ (struct nack_packet_t *)probe.getPayload()->data();
+ uint64_t sender_timestamp = probe_pkt->getTimestamp();
+ uint32_t production_seq = probe_pkt->getProductionSegement();
+ uint32_t production_rate = probe_pkt->getProductionRate();
+
+
+ if (path_it == path_table_.end()) {
+ // found a new path
+ std::shared_ptr<RTCDataPath> newPath =
+ std::make_shared<RTCDataPath>(path_label);
+ auto ret = path_table_.insert(
+ std::pair<uint32_t, std::shared_ptr<RTCDataPath>>(path_label, newPath));
+ path_it = ret.first;
+ }
+
+ auto path = path_it->second;
+
+ path->insertRttSample(rtt);
+ path->receivedNack();
+
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+
+ int64_t OWD = now - sender_timestamp;
+ path->insertOwdSample(OWD);
+
+ if (last_prod_update_ < sender_timestamp) {
+ last_production_seq_ = production_seq;
+ last_prod_update_ = sender_timestamp;
+ production_rate_ = (double)production_rate;
+ }
+
+ // the producer is responding
+ // we consider it active only if the production rate is not 0
+ // or the production sequence numner is not 1
+ if (production_rate_ != 0 || production_seq != 1) {
+ producer_is_active_ = true;
+ }
+
+ // check for init RTT. if received_probes_ is equal to 0 schedule a timer to
+ // wait for the INIT_RTT_PROBES. in this way if some probes get lost we don't
+ // wait forever
+ received_probes_++;
+
+ if(!init_rtt_ && received_probes_ <= INIT_RTT_PROBES){
+ if(received_probes_ == 1){
+ // we got the first probe, wait at most INIT_RTT_PROBE_WAIT sec for the others
+ main_path_ = path;
+ setInitRttTimer(INIT_RTT_PROBE_WAIT);
+ }
+ if(received_probes_ == INIT_RTT_PROBES) {
+ // we are done
+ init_rtt_timer_->cancel();
+ checkInitRttTimer();
+ }
+ }
+
+ received_packets_last_round_++;
+
+ // ignore probes sent before the first interest
+ if((now - rtt) <= first_interest_sent_) return false;
+ return true;
+}
+
+void RTCState::onNewRound(double round_len, bool in_sync) {
+ // XXX
+ // here we take into account only the single path case so we assume that we
+ // don't use two paths in parellel for this single flow
+
+ if (path_table_.empty()) return;
+
+ double bytes_per_sec =
+ ((double)received_bytes_ * (MILLI_IN_A_SEC / round_len));
+ if(received_rate_ == 0)
+ received_rate_ = bytes_per_sec;
+ else
+ received_rate_ = (received_rate_ * MOVING_AVG_ALPHA) +
+ ((1 - MOVING_AVG_ALPHA) * bytes_per_sec);
+
+ // search for an active path. There should be only one active path (meaning a
+ // path that leads to the producer socket -no cache- and from which we are
+ // currently getting data packets) at any time. However it may happen that
+ // there are mulitple active paths in case of mobility (the old path will
+ // remain active for a short ammount of time). The main path is selected as
+ // the active path from where the consumer received the latest data packet
+
+ uint64_t last_packet_ts = 0;
+ main_path_ = nullptr;
+
+ for (auto it = path_table_.begin(); it != path_table_.end(); it++) {
+ it->second->roundEnd();
+ if (it->second->isActive()) {
+ uint64_t ts = it->second->getLastPacketTS();
+ if (ts > last_packet_ts) {
+ last_packet_ts = ts;
+ main_path_ = it->second;
+ }
+ }
+ }
+
+ if (in_sync) updateLossRate();
+
+ // handle nacks
+ if (!nack_on_last_round_ && received_bytes_ > 0) {
+ rounds_without_nacks_++;
+ } else {
+ rounds_without_nacks_ = 0;
+ }
+
+ // check if the producer is active
+ if (received_packets_last_round_ != 0) {
+ rounds_without_packets_ = 0;
+ } else {
+ rounds_without_packets_++;
+ if (rounds_without_packets_ >= MAX_ROUND_WHIOUT_PACKETS &&
+ producer_is_active_ != false) {
+ initParams();
+ }
+ }
+
+ // compute cache/producer ratio
+ if (received_data_last_round_ != 0) {
+ double new_rate =
+ (double)received_data_from_cache_ / (double)received_data_last_round_;
+ data_from_cache_rate_ = data_from_cache_rate_ * MOVING_AVG_ALPHA +
+ (new_rate * (1 - MOVING_AVG_ALPHA));
+ }
+
+ // reset counters
+ received_bytes_ = 0;
+ packets_lost_ = 0;
+ losses_recovered_ = 0;
+ first_seq_in_round_ = highest_seq_received_;
+
+ nack_on_last_round_ = false;
+ received_nacks_last_round_ = 0;
+
+ received_packets_last_round_ = 0;
+ received_data_last_round_ = 0;
+ received_data_from_cache_ = 0;
+ sent_interests_last_round_ = 0;
+ sent_rtx_last_round_ = 0;
+
+ rounds_++;
+}
+
+void RTCState::updateReceivedBytes(const core::ContentObject &content_object) {
+ received_bytes_ +=
+ (uint32_t)(content_object.headerSize() + content_object.payloadSize());
+}
+
+void RTCState::updatePacketSize(const core::ContentObject &content_object) {
+ uint32_t pkt_size =
+ (uint32_t)(content_object.headerSize() + content_object.payloadSize());
+ avg_packet_size_ = (MOVING_AVG_ALPHA * avg_packet_size_) +
+ ((1 - MOVING_AVG_ALPHA) * pkt_size);
+}
+
+void RTCState::updatePathStats(const core::ContentObject &content_object,
+ bool is_nack) {
+ // get packet path
+ uint32_t path_label = content_object.getPathLabel();
+ auto path_it = path_table_.find(path_label);
+
+ if (path_it == path_table_.end()) {
+ // found a new path
+ std::shared_ptr<RTCDataPath> newPath =
+ std::make_shared<RTCDataPath>(path_label);
+ auto ret = path_table_.insert(
+ std::pair<uint32_t, std::shared_ptr<RTCDataPath>>(path_label, newPath));
+ path_it = ret.first;
+ }
+
+ auto path = path_it->second;
+
+ // compute rtt
+ uint32_t seq = content_object.getName().getSuffix();
+ uint64_t interest_sent_time = getInterestSentTime(seq);
+ if (interest_sent_time == 0)
+ return; // this should not happen,
+ // it means that we are processing an interest
+ // that is not pending
+
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+
+ uint64_t RTT = now - interest_sent_time;
+
+ path->insertRttSample(RTT);
+
+ // compute OWD (the first part of the nack and data packet header are the
+ // same, so we cast to data data packet)
+ struct data_packet_t *packet =
+ (struct data_packet_t *)content_object.getPayload()->data();
+ uint64_t sender_timestamp = packet->getTimestamp();
+ int64_t OWD = now - sender_timestamp;
+ path->insertOwdSample(OWD);
+
+ // compute IAT or set path to producer
+ if (!is_nack) {
+ // compute the iat only for the content packets
+ uint32_t segment_number = content_object.getName().getSuffix();
+ path->computeInterArrivalGap(segment_number);
+ if (!path->pathToProducer()) received_data_from_cache_++;
+ } else {
+ path->receivedNack();
+ }
+}
+
+void RTCState::updateLossRate() {
+ loss_rate_ = 0.0;
+ residual_loss_rate_ = 0.0;
+
+ uint32_t number_theorically_received_packets_ =
+ highest_seq_received_ - first_seq_in_round_;
+
+ // in this case no new packet was recevied after the previuos round, avoid
+ // division by 0
+ if (number_theorically_received_packets_ == 0) return;
+
+ loss_rate_ = (double)((double)(packets_lost_) /
+ (double)number_theorically_received_packets_);
+
+ residual_loss_rate_ = (double)((double)(packets_lost_ - losses_recovered_) /
+ (double)number_theorically_received_packets_);
+
+ if (residual_loss_rate_ < 0) residual_loss_rate_ = 0;
+}
+
+void RTCState::addRecvOrLost(uint32_t seq, PacketState state) {
+ pending_interests_.erase(seq);
+ if (received_or_lost_packets_.size() >= MAX_CACHED_PACKETS) {
+ received_or_lost_packets_.erase(received_or_lost_packets_.begin());
+ }
+ // notice that it may happen that a packet that we consider lost arrives after
+ // some time, in this case we simply overwrite the packet state.
+ received_or_lost_packets_[seq] = state;
+
+ // keep track of the last packet received/lost
+ // without holes.
+ if (highest_seq_received_in_order_ < last_seq_nacked_) {
+ highest_seq_received_in_order_ = last_seq_nacked_;
+ }
+
+ if ((highest_seq_received_in_order_ + 1) == seq) {
+ highest_seq_received_in_order_ = seq;
+ } else if (seq <= highest_seq_received_in_order_) {
+ // here we do nothing
+ } else if (seq > highest_seq_received_in_order_) {
+ // 1) there is a gap in the sequence so we do not update largest_in_seq_
+ // 2) all the packets from largest_in_seq_ to seq are in
+ // received_or_lost_packets_ an we upate largest_in_seq_
+
+ for (uint32_t i = highest_seq_received_in_order_ + 1; i <= seq; i++) {
+ if (received_or_lost_packets_.find(i) ==
+ received_or_lost_packets_.end()) {
+ break;
+ }
+ // this packet is in order so we can update the
+ // highest_seq_received_in_order_
+ highest_seq_received_in_order_ = i;
+ }
+ }
+}
+
+void RTCState::setInitRttTimer(uint32_t wait){
+ init_rtt_timer_->cancel();
+ init_rtt_timer_->expires_from_now(std::chrono::milliseconds(wait));
+ init_rtt_timer_->async_wait([this](std::error_code ec) {
+ if(ec) return;
+ checkInitRttTimer();
+ });
+}
+
+void RTCState::checkInitRttTimer() {
+ if(received_probes_ < INIT_RTT_MIN_PROBES_TO_RECV){
+ // we didn't received enough probes, restart
+ received_probes_ = 0;
+ rtt_probes_->setProbes(INIT_RTT_PROBE_INTERVAL, INIT_RTT_PROBES);
+ rtt_probes_->sendProbes();
+ setInitRttTimer(INIT_RTT_PROBE_RESTART);
+ return;
+ }
+ init_rtt_ = true;
+ main_path_->roundEnd();
+ rtt_probes_->setProbes(RTT_PROBE_INTERVAL, 0);
+ rtt_probes_->sendProbes();
+
+ // init last_seq_nacked_. skip packets that may come from the cache
+ double prod_rate = getProducerRate();
+ double rtt = (double)getRTT() / MILLI_IN_A_SEC;
+ double packet_size = getAveragePacketSize();
+ uint32_t pkt_in_rtt_ = std::floor(((prod_rate / packet_size) * rtt) * 0.8);
+ last_seq_nacked_ = last_production_seq_ + pkt_in_rtt_;
+
+ discovered_rtt_callback_();
+}
+
+} // namespace rtc
+
+} // namespace protocol
+
+} // namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_state.h b/libtransport/src/protocols/rtc/rtc_state.h
new file mode 100644
index 000000000..943a0a113
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_state.h
@@ -0,0 +1,253 @@
+/*
+ * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <hicn/transport/config.h>
+#include <hicn/transport/core/content_object.h>
+#include <hicn/transport/core/name.h>
+#include <protocols/rtc/probe_handler.h>
+#include <protocols/rtc/rtc_data_path.h>
+
+#include <asio.hpp>
+#include <asio/steady_timer.hpp>
+#include <map>
+#include <set>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+enum class PacketState : uint8_t { RECEIVED, LOST, UNKNOWN };
+
+class RTCState : std::enable_shared_from_this<RTCState> {
+ public:
+ using DiscoveredRttCallback = std::function<void()>;
+ public:
+ RTCState(ProbeHandler::SendProbeCallback &&rtt_probes_callback,
+ DiscoveredRttCallback &&discovered_rtt_callback,
+ asio::io_service &io_service);
+
+ ~RTCState();
+
+ // packet events
+ void onSendNewInterest(const core::Name *interest_name);
+ void onTimeout(uint32_t seq);
+ void onRetransmission(uint32_t seq);
+ void onDataPacketReceived(const core::ContentObject &content_object,
+ bool compute_stats);
+ void onNackPacketReceived(const core::ContentObject &nack,
+ bool compute_stats);
+ void onPacketLost(uint32_t seq);
+ void onPacketRecovered(uint32_t seq);
+ bool onProbePacketReceived(const core::ContentObject &probe);
+
+ // protocol state
+ void onNewRound(double round_len, bool in_sync);
+
+ // main path
+ uint32_t getProducerPath() const {
+ if (mainPathIsValid()) return main_path_->getPathId();
+ return 0;
+ }
+
+ // delay metrics
+ bool isRttDiscovered() const {
+ return init_rtt_;
+ }
+
+ uint64_t getRTT() const {
+ if (mainPathIsValid()) return main_path_->getMinRtt();
+ return 0;
+ }
+ void resetRttStats() {
+ if (mainPathIsValid()) main_path_->clearRtt();
+ }
+
+ double getQueuing() const {
+ if (mainPathIsValid()) return main_path_->getQueuingDealy();
+ return 0.0;
+ }
+ double getIAT() const {
+ if (mainPathIsValid()) return main_path_->getInterArrivalGap();
+ return 0.0;
+ }
+
+ double getJitter() const {
+ if (mainPathIsValid()) return main_path_->getJitter();
+ return 0.0;
+ }
+
+ // pending interests
+ uint64_t getInterestSentTime(uint32_t seq) {
+ auto it = pending_interests_.find(seq);
+ if (it != pending_interests_.end()) return it->second;
+ return 0;
+ }
+ bool isPending(uint32_t seq) {
+ if (pending_interests_.find(seq) != pending_interests_.end()) return true;
+ return false;
+ }
+ uint32_t getPendingInterestNumber() const {
+ return pending_interests_.size();
+ }
+ PacketState isReceivedOrLost(uint32_t seq) {
+ auto it = received_or_lost_packets_.find(seq);
+ if (it != received_or_lost_packets_.end()) return it->second;
+ return PacketState::UNKNOWN;
+ }
+
+ // loss rate
+ double getLossRate() const { return loss_rate_; }
+ double getResidualLossRate() const { return residual_loss_rate_; }
+ uint32_t getHighestSeqReceivedInOrder() const {
+ return highest_seq_received_in_order_;
+ }
+ uint32_t getLostData() const { return packets_lost_; };
+ uint32_t getRecoveredLosses() const { return losses_recovered_; }
+
+ // generic stats
+ uint32_t getReceivedBytesInRound() const { return received_bytes_; }
+ uint32_t getReceivedNacksInRound() const {
+ return received_nacks_last_round_;
+ }
+ uint32_t getSentInterestInRound() const { return sent_interests_last_round_; }
+ uint32_t getSentRtxInRound() const { return sent_rtx_last_round_; }
+
+ // bandwidth/production metrics
+ double getAvailableBw() const { return 0.0; }; // TODO
+ double getProducerRate() const { return production_rate_; }
+ double getReceivedRate() const { return received_rate_; }
+ double getAveragePacketSize() const { return avg_packet_size_; }
+
+ // nacks
+ uint32_t getRoundsWithoutNacks() const { return rounds_without_nacks_; }
+ uint32_t getLastSeqNacked() const { return last_seq_nacked_; }
+
+ // producer state
+ bool isProducerActive() const { return producer_is_active_; }
+
+ // packets from cache
+ double getPacketFromCacheRatio() const { return data_from_cache_rate_; }
+
+ std::map<uint32_t, uint64_t>::iterator getPendingInterestsMapBegin() {
+ return pending_interests_.begin();
+ }
+ std::map<uint32_t, uint64_t>::iterator getPendingInterestsMapEnd() {
+ return pending_interests_.end();
+ }
+
+ private:
+ void initParams();
+
+ // update stats
+ void updateState();
+ void updateReceivedBytes(const core::ContentObject &content_object);
+ void updatePacketSize(const core::ContentObject &content_object);
+ void updatePathStats(const core::ContentObject &content_object, bool is_nack);
+ void updateLossRate();
+
+ void addRecvOrLost(uint32_t seq, PacketState state);
+
+ void setInitRttTimer(uint32_t wait);
+ void checkInitRttTimer();
+
+ bool mainPathIsValid() const {
+ if (main_path_ != nullptr)
+ return true;
+ else
+ return false;
+ }
+
+ // packets counters (total)
+ uint32_t sent_interests_;
+ uint32_t sent_rtx_;
+ uint32_t received_data_;
+ uint32_t received_nacks_;
+ uint32_t received_timeouts_;
+ uint32_t received_probes_;
+
+ // loss counters
+ int32_t packets_lost_;
+ int32_t losses_recovered_;
+ uint32_t first_seq_in_round_;
+ uint32_t highest_seq_received_;
+ uint32_t highest_seq_received_in_order_;
+ uint32_t last_seq_nacked_; // segment for which we got an oldNack
+ double loss_rate_;
+ double residual_loss_rate_;
+
+ // bw counters
+ uint32_t received_bytes_;
+ double avg_packet_size_;
+ double production_rate_; // rate communicated by the producer using nacks
+ double received_rate_; // rate recevied by the consumer
+
+ // nack counter
+ // the bool takes tracks only about the valid nacks (no rtx) and it is used to
+ // switch between the states. Instead received_nacks_last_round_ logs all the
+ // nacks for statistics
+ bool nack_on_last_round_;
+ uint32_t received_nacks_last_round_;
+
+ // packets counter
+ uint32_t received_packets_last_round_;
+ uint32_t received_data_last_round_;
+ uint32_t received_data_from_cache_;
+ double data_from_cache_rate_;
+ uint32_t sent_interests_last_round_;
+ uint32_t sent_rtx_last_round_;
+
+ // round conunters
+ uint32_t rounds_;
+ uint32_t rounds_without_nacks_;
+ uint32_t rounds_without_packets_;
+
+ // init rtt
+ uint64_t first_interest_sent_;
+
+ // producer state
+ bool
+ producer_is_active_; // the prodcuer is active if we receive some packets
+ uint32_t last_production_seq_; // last production seq received by the producer
+ uint64_t last_prod_update_; // timestamp of the last packets used to update
+ // stats from the producer
+
+ // paths stats
+ std::unordered_map<uint32_t, std::shared_ptr<RTCDataPath>> path_table_;
+ std::shared_ptr<RTCDataPath> main_path_;
+
+ // packet received
+ // cache where to store info about the last MAX_CACHED_PACKETS
+ std::map<uint32_t, PacketState> received_or_lost_packets_;
+
+ // pending interests
+ std::map<uint32_t, uint64_t> pending_interests_;
+
+ // probes
+ std::shared_ptr<ProbeHandler> rtt_probes_;
+ bool init_rtt_;
+ std::unique_ptr<asio::steady_timer> init_rtt_timer_;
+
+ // callbacks
+ DiscoveredRttCallback discovered_rtt_callback_;
+};
+
+} // namespace rtc
+
+} // namespace protocol
+
+} // namespace transport
diff --git a/libtransport/src/protocols/rtc/trendline_estimator.cc b/libtransport/src/protocols/rtc/trendline_estimator.cc
new file mode 100644
index 000000000..7a0803857
--- /dev/null
+++ b/libtransport/src/protocols/rtc/trendline_estimator.cc
@@ -0,0 +1,334 @@
+/*
+ * Copyright (c) 2016 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+// FROM
+// https://source.chromium.org/chromium/chromium/src/+/master:third_party/webrtc/modules/congestion_controller/goog_cc/trendline_estimator.cc
+
+#include "trendline_estimator.h"
+
+#include <math.h>
+
+#include <algorithm>
+#include <string>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+// Parameters for linear least squares fit of regression line to noisy data.
+constexpr double kDefaultTrendlineSmoothingCoeff = 0.9;
+constexpr double kDefaultTrendlineThresholdGain = 4.0;
+// const char kBweWindowSizeInPacketsExperiment[] =
+// "WebRTC-BweWindowSizeInPackets";
+
+/*size_t ReadTrendlineFilterWindowSize(
+ const WebRtcKeyValueConfig* key_value_config) {
+ std::string experiment_string =
+ key_value_config->Lookup(kBweWindowSizeInPacketsExperiment);
+ size_t window_size;
+ int parsed_values =
+ sscanf(experiment_string.c_str(), "Enabled-%zu", &window_size);
+ if (parsed_values == 1) {
+ if (window_size > 1)
+ return window_size;
+ RTC_LOG(WARNING) << "Window size must be greater than 1.";
+ }
+ RTC_LOG(LS_WARNING) << "Failed to parse parameters for BweWindowSizeInPackets"
+ " experiment from field trial string. Using default.";
+ return TrendlineEstimatorSettings::kDefaultTrendlineWindowSize;
+}
+*/
+
+OptionalDouble LinearFitSlope(
+ const std::deque<TrendlineEstimator::PacketTiming>& packets) {
+ // RTC_DCHECK(packets.size() >= 2);
+ // Compute the "center of mass".
+ double sum_x = 0;
+ double sum_y = 0;
+ for (const auto& packet : packets) {
+ sum_x += packet.arrival_time_ms;
+ sum_y += packet.smoothed_delay_ms;
+ }
+ double x_avg = sum_x / packets.size();
+ double y_avg = sum_y / packets.size();
+ // Compute the slope k = \sum (x_i-x_avg)(y_i-y_avg) / \sum (x_i-x_avg)^2
+ double numerator = 0;
+ double denominator = 0;
+ for (const auto& packet : packets) {
+ double x = packet.arrival_time_ms;
+ double y = packet.smoothed_delay_ms;
+ numerator += (x - x_avg) * (y - y_avg);
+ denominator += (x - x_avg) * (x - x_avg);
+ }
+ if (denominator == 0) return OptionalDouble();
+ return OptionalDouble(numerator / denominator);
+}
+
+OptionalDouble ComputeSlopeCap(
+ const std::deque<TrendlineEstimator::PacketTiming>& packets,
+ const TrendlineEstimatorSettings& settings) {
+ /*RTC_DCHECK(1 <= settings.beginning_packets &&
+ settings.beginning_packets < packets.size());
+ RTC_DCHECK(1 <= settings.end_packets &&
+ settings.end_packets < packets.size());
+ RTC_DCHECK(settings.beginning_packets + settings.end_packets <=
+ packets.size());*/
+ TrendlineEstimator::PacketTiming early = packets[0];
+ for (size_t i = 1; i < settings.beginning_packets; ++i) {
+ if (packets[i].raw_delay_ms < early.raw_delay_ms) early = packets[i];
+ }
+ size_t late_start = packets.size() - settings.end_packets;
+ TrendlineEstimator::PacketTiming late = packets[late_start];
+ for (size_t i = late_start + 1; i < packets.size(); ++i) {
+ if (packets[i].raw_delay_ms < late.raw_delay_ms) late = packets[i];
+ }
+ if (late.arrival_time_ms - early.arrival_time_ms < 1) {
+ return OptionalDouble();
+ }
+ return OptionalDouble((late.raw_delay_ms - early.raw_delay_ms) /
+ (late.arrival_time_ms - early.arrival_time_ms) +
+ settings.cap_uncertainty);
+}
+
+constexpr double kMaxAdaptOffsetMs = 15.0;
+constexpr double kOverUsingTimeThreshold = 10;
+constexpr int kMinNumDeltas = 60;
+constexpr int kDeltaCounterMax = 1000;
+
+//} // namespace
+
+constexpr char TrendlineEstimatorSettings::kKey[];
+
+TrendlineEstimatorSettings::TrendlineEstimatorSettings(
+ /*const WebRtcKeyValueConfig* key_value_config*/) {
+ /*if (absl::StartsWith(
+ key_value_config->Lookup(kBweWindowSizeInPacketsExperiment),
+ "Enabled")) {
+ window_size = ReadTrendlineFilterWindowSize(key_value_config);
+ }
+ Parser()->Parse(key_value_config->Lookup(TrendlineEstimatorSettings::kKey));*/
+ window_size = kDefaultTrendlineWindowSize;
+ enable_cap = false;
+ beginning_packets = end_packets = 0;
+ cap_uncertainty = 0.0;
+
+ /*if (window_size < 10 || 200 < window_size) {
+ RTC_LOG(LS_WARNING) << "Window size must be between 10 and 200 packets";
+ window_size = kDefaultTrendlineWindowSize;
+ }
+ if (enable_cap) {
+ if (beginning_packets < 1 || end_packets < 1 ||
+ beginning_packets > window_size || end_packets > window_size) {
+ RTC_LOG(LS_WARNING) << "Size of beginning and end must be between 1 and "
+ << window_size;
+ enable_cap = false;
+ beginning_packets = end_packets = 0;
+ cap_uncertainty = 0.0;
+ }
+ if (beginning_packets + end_packets > window_size) {
+ RTC_LOG(LS_WARNING)
+ << "Size of beginning plus end can't exceed the window size";
+ enable_cap = false;
+ beginning_packets = end_packets = 0;
+ cap_uncertainty = 0.0;
+ }
+ if (cap_uncertainty < 0.0 || 0.025 < cap_uncertainty) {
+ RTC_LOG(LS_WARNING) << "Cap uncertainty must be between 0 and 0.025";
+ cap_uncertainty = 0.0;
+ }
+ }*/
+}
+
+/*std::unique_ptr<StructParametersParser> TrendlineEstimatorSettings::Parser() {
+ return StructParametersParser::Create("sort", &enable_sort, //
+ "cap", &enable_cap, //
+ "beginning_packets",
+ &beginning_packets, //
+ "end_packets", &end_packets, //
+ "cap_uncertainty", &cap_uncertainty, //
+ "window_size", &window_size);
+}*/
+
+TrendlineEstimator::TrendlineEstimator(
+ /*const WebRtcKeyValueConfig* key_value_config,
+ NetworkStatePredictor* network_state_predictor*/)
+ : settings_(),
+ smoothing_coef_(kDefaultTrendlineSmoothingCoeff),
+ threshold_gain_(kDefaultTrendlineThresholdGain),
+ num_of_deltas_(0),
+ first_arrival_time_ms_(-1),
+ accumulated_delay_(0),
+ smoothed_delay_(0),
+ delay_hist_(),
+ k_up_(0.0087),
+ k_down_(0.039),
+ overusing_time_threshold_(kOverUsingTimeThreshold),
+ threshold_(12.5),
+ prev_modified_trend_(NAN),
+ last_update_ms_(-1),
+ prev_trend_(0.0),
+ time_over_using_(-1),
+ overuse_counter_(0),
+ hypothesis_(BandwidthUsage::kBwNormal){
+ // hypothesis_predicted_(BandwidthUsage::kBwNormal){//},
+ // network_state_predictor_(network_state_predictor) {
+ /* RTC_LOG(LS_INFO)
+ << "Using Trendline filter for delay change estimation with settings "
+ << settings_.Parser()->Encode() << " and "
+ // << (network_state_predictor_ ? "injected" : "no")
+ << " network state predictor";*/
+}
+
+TrendlineEstimator::~TrendlineEstimator() {}
+
+void TrendlineEstimator::UpdateTrendline(double recv_delta_ms,
+ double send_delta_ms,
+ int64_t send_time_ms,
+ int64_t arrival_time_ms,
+ size_t packet_size) {
+ const double delta_ms = recv_delta_ms - send_delta_ms;
+ ++num_of_deltas_;
+ num_of_deltas_ = std::min(num_of_deltas_, kDeltaCounterMax);
+ if (first_arrival_time_ms_ == -1) first_arrival_time_ms_ = arrival_time_ms;
+
+ // Exponential backoff filter.
+ accumulated_delay_ += delta_ms;
+ // BWE_TEST_LOGGING_PLOT(1, "accumulated_delay_ms", arrival_time_ms,
+ // accumulated_delay_);
+ smoothed_delay_ = smoothing_coef_ * smoothed_delay_ +
+ (1 - smoothing_coef_) * accumulated_delay_;
+ // BWE_TEST_LOGGING_PLOT(1, "smoothed_delay_ms", arrival_time_ms,
+ // smoothed_delay_);
+
+ // Maintain packet window
+ delay_hist_.emplace_back(
+ static_cast<double>(arrival_time_ms - first_arrival_time_ms_),
+ smoothed_delay_, accumulated_delay_);
+ if (settings_.enable_sort) {
+ for (size_t i = delay_hist_.size() - 1;
+ i > 0 &&
+ delay_hist_[i].arrival_time_ms < delay_hist_[i - 1].arrival_time_ms;
+ --i) {
+ std::swap(delay_hist_[i], delay_hist_[i - 1]);
+ }
+ }
+ if (delay_hist_.size() > settings_.window_size) delay_hist_.pop_front();
+
+ // Simple linear regression.
+ double trend = prev_trend_;
+ if (delay_hist_.size() == settings_.window_size) {
+ // Update trend_ if it is possible to fit a line to the data. The delay
+ // trend can be seen as an estimate of (send_rate - capacity)/capacity.
+ // 0 < trend < 1 -> the delay increases, queues are filling up
+ // trend == 0 -> the delay does not change
+ // trend < 0 -> the delay decreases, queues are being emptied
+ OptionalDouble trendO = LinearFitSlope(delay_hist_);
+ if (trendO.has_value()) trend = trendO.value();
+ if (settings_.enable_cap) {
+ OptionalDouble cap = ComputeSlopeCap(delay_hist_, settings_);
+ // We only use the cap to filter out overuse detections, not
+ // to detect additional underuses.
+ if (trend >= 0 && cap.has_value() && trend > cap.value()) {
+ trend = cap.value();
+ }
+ }
+ }
+ // BWE_TEST_LOGGING_PLOT(1, "trendline_slope", arrival_time_ms, trend);
+
+ Detect(trend, send_delta_ms, arrival_time_ms);
+}
+
+void TrendlineEstimator::Update(double recv_delta_ms, double send_delta_ms,
+ int64_t send_time_ms, int64_t arrival_time_ms,
+ size_t packet_size, bool calculated_deltas) {
+ if (calculated_deltas) {
+ UpdateTrendline(recv_delta_ms, send_delta_ms, send_time_ms, arrival_time_ms,
+ packet_size);
+ }
+ /*if (network_state_predictor_) {
+ hypothesis_predicted_ = network_state_predictor_->Update(
+ send_time_ms, arrival_time_ms, hypothesis_);
+ }*/
+}
+
+BandwidthUsage TrendlineEstimator::State() const {
+ return /*network_state_predictor_ ? hypothesis_predicted_ :*/ hypothesis_;
+}
+
+void TrendlineEstimator::Detect(double trend, double ts_delta, int64_t now_ms) {
+ /*if (num_of_deltas_ < 2) {
+ hypothesis_ = BandwidthUsage::kBwNormal;
+ return;
+ }*/
+
+ const double modified_trend =
+ std::min(num_of_deltas_, kMinNumDeltas) * trend * threshold_gain_;
+ prev_modified_trend_ = modified_trend;
+ // BWE_TEST_LOGGING_PLOT(1, "T", now_ms, modified_trend);
+ // BWE_TEST_LOGGING_PLOT(1, "threshold", now_ms, threshold_);
+ if (modified_trend > threshold_) {
+ if (time_over_using_ == -1) {
+ // Initialize the timer. Assume that we've been
+ // over-using half of the time since the previous
+ // sample.
+ time_over_using_ = ts_delta / 2;
+ } else {
+ // Increment timer
+ time_over_using_ += ts_delta;
+ }
+ overuse_counter_++;
+ if (time_over_using_ > overusing_time_threshold_ && overuse_counter_ > 1) {
+ if (trend >= prev_trend_) {
+ time_over_using_ = 0;
+ overuse_counter_ = 0;
+ hypothesis_ = BandwidthUsage::kBwOverusing;
+ }
+ }
+ } else if (modified_trend < -threshold_) {
+ time_over_using_ = -1;
+ overuse_counter_ = 0;
+ hypothesis_ = BandwidthUsage::kBwUnderusing;
+ } else {
+ time_over_using_ = -1;
+ overuse_counter_ = 0;
+ hypothesis_ = BandwidthUsage::kBwNormal;
+ }
+ prev_trend_ = trend;
+ UpdateThreshold(modified_trend, now_ms);
+}
+
+void TrendlineEstimator::UpdateThreshold(double modified_trend,
+ int64_t now_ms) {
+ if (last_update_ms_ == -1) last_update_ms_ = now_ms;
+
+ if (fabs(modified_trend) > threshold_ + kMaxAdaptOffsetMs) {
+ // Avoid adapting the threshold to big latency spikes, caused e.g.,
+ // by a sudden capacity drop.
+ last_update_ms_ = now_ms;
+ return;
+ }
+
+ const double k = fabs(modified_trend) < threshold_ ? k_down_ : k_up_;
+ const int64_t kMaxTimeDeltaMs = 100;
+ int64_t time_delta_ms = std::min(now_ms - last_update_ms_, kMaxTimeDeltaMs);
+ threshold_ += k * (fabs(modified_trend) - threshold_) * time_delta_ms;
+ if (threshold_ < 6.f) threshold_ = 6.f;
+ if (threshold_ > 600.f) threshold_ = 600.f;
+ // threshold_ = rtc::SafeClamp(threshold_, 6.f, 600.f);
+ last_update_ms_ = now_ms;
+}
+
+} // namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/trendline_estimator.h b/libtransport/src/protocols/rtc/trendline_estimator.h
new file mode 100644
index 000000000..372acbc67
--- /dev/null
+++ b/libtransport/src/protocols/rtc/trendline_estimator.h
@@ -0,0 +1,147 @@
+/*
+ * Copyright (c) 2016 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+// FROM
+// https://source.chromium.org/chromium/chromium/src/+/master:third_party/webrtc/modules/congestion_controller/goog_cc/trendline_estimator.h
+
+#ifndef MODULES_CONGESTION_CONTROLLER_GOOG_CC_TRENDLINE_ESTIMATOR_H_
+#define MODULES_CONGESTION_CONTROLLER_GOOG_CC_TRENDLINE_ESTIMATOR_H_
+
+#include <stddef.h>
+#include <stdint.h>
+
+#include <algorithm>
+#include <deque>
+#include <memory>
+#include <utility>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+class OptionalDouble {
+ public:
+ OptionalDouble() : val(0), has_val(false){};
+ OptionalDouble(double val) : val(val), has_val(true){};
+
+ double value() { return val; }
+ bool has_value() { return has_val; }
+
+ private:
+ double val;
+ bool has_val;
+};
+
+enum class BandwidthUsage {
+ kBwNormal = 0,
+ kBwUnderusing = 1,
+ kBwOverusing = 2,
+ kLast
+};
+
+struct TrendlineEstimatorSettings {
+ static constexpr char kKey[] = "WebRTC-Bwe-TrendlineEstimatorSettings";
+ static constexpr unsigned kDefaultTrendlineWindowSize = 20;
+
+ // TrendlineEstimatorSettings() = delete;
+ TrendlineEstimatorSettings(
+ /*const WebRtcKeyValueConfig* key_value_config*/);
+
+ // Sort the packets in the window. Should be redundant,
+ // but then almost no cost.
+ bool enable_sort = false;
+
+ // Cap the trendline slope based on the minimum delay seen
+ // in the beginning_packets and end_packets respectively.
+ bool enable_cap = false;
+ unsigned beginning_packets = 7;
+ unsigned end_packets = 7;
+ double cap_uncertainty = 0.0;
+
+ // Size (in packets) of the window.
+ unsigned window_size = kDefaultTrendlineWindowSize;
+
+ // std::unique_ptr<StructParametersParser> Parser();
+};
+
+class TrendlineEstimator /*: public DelayIncreaseDetectorInterface */ {
+ public:
+ TrendlineEstimator(/*const WebRtcKeyValueConfig* key_value_config,
+ NetworkStatePredictor* network_state_predictor*/);
+
+ ~TrendlineEstimator();
+
+ // Update the estimator with a new sample. The deltas should represent deltas
+ // between timestamp groups as defined by the InterArrival class.
+ void Update(double recv_delta_ms, double send_delta_ms, int64_t send_time_ms,
+ int64_t arrival_time_ms, size_t packet_size,
+ bool calculated_deltas);
+
+ void UpdateTrendline(double recv_delta_ms, double send_delta_ms,
+ int64_t send_time_ms, int64_t arrival_time_ms,
+ size_t packet_size);
+
+ BandwidthUsage State() const;
+
+ struct PacketTiming {
+ PacketTiming(double arrival_time_ms, double smoothed_delay_ms,
+ double raw_delay_ms)
+ : arrival_time_ms(arrival_time_ms),
+ smoothed_delay_ms(smoothed_delay_ms),
+ raw_delay_ms(raw_delay_ms) {}
+ double arrival_time_ms;
+ double smoothed_delay_ms;
+ double raw_delay_ms;
+ };
+
+ private:
+ // friend class GoogCcStatePrinter;
+ void Detect(double trend, double ts_delta, int64_t now_ms);
+
+ void UpdateThreshold(double modified_offset, int64_t now_ms);
+
+ // Parameters.
+ TrendlineEstimatorSettings settings_;
+ const double smoothing_coef_;
+ const double threshold_gain_;
+ // Used by the existing threshold.
+ int num_of_deltas_;
+ // Keep the arrival times small by using the change from the first packet.
+ int64_t first_arrival_time_ms_;
+ // Exponential backoff filtering.
+ double accumulated_delay_;
+ double smoothed_delay_;
+ // Linear least squares regression.
+ std::deque<PacketTiming> delay_hist_;
+
+ const double k_up_;
+ const double k_down_;
+ double overusing_time_threshold_;
+ double threshold_;
+ double prev_modified_trend_;
+ int64_t last_update_ms_;
+ double prev_trend_;
+ double time_over_using_;
+ int overuse_counter_;
+ BandwidthUsage hypothesis_;
+ // BandwidthUsage hypothesis_predicted_;
+ // NetworkStatePredictor* network_state_predictor_;
+
+ // RTC_DISALLOW_COPY_AND_ASSIGN(TrendlineEstimator);
+};
+
+} // namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
+#endif // MODULES_CONGESTION_CONTROLLER_GOOG_CC_TRENDLINE_ESTIMATOR_H_