aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/protocols/rtc
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/protocols/rtc')
-rw-r--r--libtransport/src/protocols/rtc/CMakeLists.txt20
-rw-r--r--libtransport/src/protocols/rtc/congestion_detection.cc101
-rw-r--r--libtransport/src/protocols/rtc/congestion_detection.h138
-rw-r--r--libtransport/src/protocols/rtc/probe_handler.cc2
-rw-r--r--libtransport/src/protocols/rtc/probe_handler.h10
-rw-r--r--libtransport/src/protocols/rtc/rtc.cc466
-rw-r--r--libtransport/src/protocols/rtc/rtc.h46
-rw-r--r--libtransport/src/protocols/rtc/rtc_consts.h35
-rw-r--r--libtransport/src/protocols/rtc/rtc_data_path.cc6
-rw-r--r--libtransport/src/protocols/rtc/rtc_indexer.h195
-rw-r--r--libtransport/src/protocols/rtc/rtc_ldr.cc212
-rw-r--r--libtransport/src/protocols/rtc/rtc_ldr.h26
-rw-r--r--libtransport/src/protocols/rtc/rtc_packet.h2
-rw-r--r--libtransport/src/protocols/rtc/rtc_rc_frame.cc79
-rw-r--r--libtransport/src/protocols/rtc/rtc_rc_frame.h46
-rw-r--r--libtransport/src/protocols/rtc/rtc_rc_queue.cc6
-rw-r--r--libtransport/src/protocols/rtc/rtc_reassembly.h46
-rw-r--r--libtransport/src/protocols/rtc/rtc_state.cc152
-rw-r--r--libtransport/src/protocols/rtc/rtc_state.h62
-rw-r--r--libtransport/src/protocols/rtc/trendline_estimator.cc334
-rw-r--r--libtransport/src/protocols/rtc/trendline_estimator.h147
21 files changed, 981 insertions, 1150 deletions
diff --git a/libtransport/src/protocols/rtc/CMakeLists.txt b/libtransport/src/protocols/rtc/CMakeLists.txt
index 77f065d0e..873b345d0 100644
--- a/libtransport/src/protocols/rtc/CMakeLists.txt
+++ b/libtransport/src/protocols/rtc/CMakeLists.txt
@@ -11,27 +11,27 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-cmake_minimum_required(VERSION 3.5 FATAL_ERROR)
-
list(APPEND HEADER_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/probe_handler.h
${CMAKE_CURRENT_SOURCE_DIR}/rtc.h
- ${CMAKE_CURRENT_SOURCE_DIR}/rtc_state.h
- ${CMAKE_CURRENT_SOURCE_DIR}/rtc_ldr.h
- ${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.h
${CMAKE_CURRENT_SOURCE_DIR}/rtc_consts.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_indexer.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_ldr.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_packet.h
${CMAKE_CURRENT_SOURCE_DIR}/rtc_rc.h
${CMAKE_CURRENT_SOURCE_DIR}/rtc_rc_queue.h
- ${CMAKE_CURRENT_SOURCE_DIR}/probe_handler.h
- ${CMAKE_CURRENT_SOURCE_DIR}/rtc_packet.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_reassembly.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_state.h
)
list(APPEND SOURCE_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/probe_handler.cc
${CMAKE_CURRENT_SOURCE_DIR}/rtc.cc
- ${CMAKE_CURRENT_SOURCE_DIR}/rtc_state.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.cc
${CMAKE_CURRENT_SOURCE_DIR}/rtc_ldr.cc
${CMAKE_CURRENT_SOURCE_DIR}/rtc_rc_queue.cc
- ${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.cc
- ${CMAKE_CURRENT_SOURCE_DIR}/probe_handler.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_state.cc
)
set(SOURCE_FILES ${SOURCE_FILES} PARENT_SCOPE)
diff --git a/libtransport/src/protocols/rtc/congestion_detection.cc b/libtransport/src/protocols/rtc/congestion_detection.cc
deleted file mode 100644
index e2d44ae66..000000000
--- a/libtransport/src/protocols/rtc/congestion_detection.cc
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <hicn/transport/utils/log.h>
-#include <protocols/rtc/congestion_detection.h>
-
-namespace transport {
-
-namespace protocol {
-
-namespace rtc {
-
-CongestionDetection::CongestionDetection()
- : cc_estimator_(), last_processed_chunk_() {}
-
-CongestionDetection::~CongestionDetection() {}
-
-void CongestionDetection::updateStats() {
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
-
- if (chunks_number_.empty()) return;
-
- uint32_t chunk_number = chunks_number_.front();
-
- while (chunks_[chunk_number].getReceivedTime() + HICN_CC_STATS_MAX_DELAY_MS <
- now ||
- chunks_[chunk_number].isComplete()) {
- if (chunk_number == last_processed_chunk_.getFrameSeqNum() + 1) {
- chunks_[chunk_number].setPreviousSentTime(
- last_processed_chunk_.getSentTime());
-
- chunks_[chunk_number].setPreviousReceivedTime(
- last_processed_chunk_.getReceivedTime());
- cc_estimator_.Update(chunks_[chunk_number].getReceivedDelta(),
- chunks_[chunk_number].getSentDelta(),
- chunks_[chunk_number].getSentTime(),
- chunks_[chunk_number].getReceivedTime(),
- chunks_[chunk_number].getFrameSize(), true);
-
- } else {
- TRANSPORT_LOGD(
- "CongestionDetection::updateStats frame %u but not the \
- previous one, last one was %u currentFrame %u",
- chunk_number, last_processed_chunk_.getFrameSeqNum(),
- chunks_[chunk_number].getFrameSeqNum());
- }
-
- last_processed_chunk_ = chunks_[chunk_number];
-
- chunks_.erase(chunk_number);
-
- chunks_number_.pop();
- if (chunks_number_.empty()) break;
-
- chunk_number = chunks_number_.front();
- }
-}
-
-void CongestionDetection::addPacket(const core::ContentObject &content_object) {
- auto payload = content_object.getPayload();
- uint32_t payload_size = (uint32_t)payload->length();
- uint32_t segmentNumber = content_object.getName().getSuffix();
- // uint32_t pkt = segmentNumber & modMask_;
- uint64_t *sentTimePtr = (uint64_t *)payload->data();
-
- // this is just for testing with hiperf, assuming a frame is 10 pkts
- // in the final version, the split should be based on the timestamp in the pkt
- uint32_t frameNum = (int)(segmentNumber / HICN_CC_STATS_CHUNK_SIZE);
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
-
- if (chunks_.find(frameNum) == chunks_.end()) {
- // new chunk of pkts or out of order
- if (last_processed_chunk_.getFrameSeqNum() > frameNum)
- return; // out of order and we already processed the chunk
-
- chunks_[frameNum] = FrameStats(frameNum, HICN_CC_STATS_CHUNK_SIZE);
- chunks_number_.push(frameNum);
- }
-
- chunks_[frameNum].addPacket(*sentTimePtr, now, payload_size);
-}
-
-} // namespace rtc
-} // namespace protocol
-} // namespace transport
diff --git a/libtransport/src/protocols/rtc/congestion_detection.h b/libtransport/src/protocols/rtc/congestion_detection.h
deleted file mode 100644
index 17f4aa54c..000000000
--- a/libtransport/src/protocols/rtc/congestion_detection.h
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <hicn/transport/core/content_object.h>
-#include <protocols/rtc/trendline_estimator.h>
-
-#include <map>
-#include <queue>
-
-#define HICN_CC_STATS_CHUNK_SIZE 10
-#define HICN_CC_STATS_MAX_DELAY_MS 100
-
-namespace transport {
-
-namespace protocol {
-
-namespace rtc {
-
-class FrameStats {
- public:
- FrameStats()
- : frame_num_(0),
- sent_time_(0),
- received_time_(0),
- previous_sent_time_(0),
- previous_received_time_(0),
- size_(0),
- received_pkt_m(0),
- burst_size_m(HICN_CC_STATS_CHUNK_SIZE){};
-
- FrameStats(uint32_t burst_size)
- : frame_num_(0),
- sent_time_(0),
- received_time_(0),
- previous_sent_time_(0),
- previous_received_time_(0),
- size_(0),
- received_pkt_m(0),
- burst_size_m(burst_size){};
-
- FrameStats(uint32_t frame_num, uint32_t burst_size)
- : frame_num_(frame_num),
- sent_time_(0),
- received_time_(0),
- previous_sent_time_(0),
- previous_received_time_(0),
- size_(0),
- received_pkt_m(0),
- burst_size_m(burst_size){};
-
- FrameStats(uint32_t frame_num, uint64_t sent_time, uint64_t received_time,
- uint32_t size, FrameStats previousFrame, uint32_t burst_size)
- : frame_num_(frame_num),
- sent_time_(sent_time),
- received_time_(received_time),
- previous_sent_time_(previousFrame.getSentTime()),
- previous_received_time_(previousFrame.getReceivedTime()),
- size_(size),
- received_pkt_m(1),
- burst_size_m(burst_size){};
-
- void addPacket(uint64_t sent_time, uint64_t received_time, uint32_t size) {
- size_ += size;
- sent_time_ =
- (sent_time_ == 0) ? sent_time : std::min(sent_time_, sent_time);
- received_time_ = std::max(received_time, received_time_);
- received_pkt_m++;
- }
-
- bool isComplete() { return received_pkt_m == burst_size_m; }
-
- uint32_t getFrameSeqNum() const { return frame_num_; }
- uint64_t getSentTime() const { return sent_time_; }
- uint64_t getReceivedTime() const { return received_time_; }
- uint32_t getFrameSize() const { return size_; }
-
- void setPreviousReceivedTime(uint64_t time) {
- previous_received_time_ = time;
- }
- void setPreviousSentTime(uint64_t time) { previous_sent_time_ = time; }
-
- // todo manage first frame
- double getReceivedDelta() {
- return static_cast<double>(received_time_ - previous_received_time_);
- }
- double getSentDelta() {
- return static_cast<double>(sent_time_ - previous_sent_time_);
- }
-
- private:
- uint32_t frame_num_;
- uint64_t sent_time_;
- uint64_t received_time_;
-
- uint64_t previous_sent_time_;
- uint64_t previous_received_time_;
- uint32_t size_;
-
- uint32_t received_pkt_m;
- uint32_t burst_size_m;
-};
-
-class CongestionDetection {
- public:
- CongestionDetection();
- ~CongestionDetection();
-
- void addPacket(const core::ContentObject &content_object);
-
- BandwidthUsage getState() { return cc_estimator_.State(); }
-
- void updateStats();
-
- private:
- TrendlineEstimator cc_estimator_;
- std::map<uint32_t, FrameStats> chunks_;
- std::queue<uint32_t> chunks_number_;
-
- FrameStats last_processed_chunk_;
-};
-
-} // end namespace rtc
-
-} // end namespace protocol
-
-} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/probe_handler.cc b/libtransport/src/protocols/rtc/probe_handler.cc
index efba362d4..abaca6ad9 100644
--- a/libtransport/src/protocols/rtc/probe_handler.cc
+++ b/libtransport/src/protocols/rtc/probe_handler.cc
@@ -43,7 +43,7 @@ uint64_t ProbeHandler::getRtt(uint32_t seq) {
std::chrono::steady_clock::now().time_since_epoch())
.count();
uint64_t rtt = now - it->second;
- if(rtt < 1) rtt = 1;
+ if (rtt < 1) rtt = 1;
pending_probes_.erase(it);
diff --git a/libtransport/src/protocols/rtc/probe_handler.h b/libtransport/src/protocols/rtc/probe_handler.h
index b8ed84445..e34b23df0 100644
--- a/libtransport/src/protocols/rtc/probe_handler.h
+++ b/libtransport/src/protocols/rtc/probe_handler.h
@@ -14,9 +14,8 @@
*/
#pragma once
#include <hicn/transport/config.h>
+#include <hicn/transport/core/asio_wrapper.h>
-#include <asio.hpp>
-#include <asio/steady_timer.hpp>
#include <functional>
#include <random>
#include <unordered_map>
@@ -32,8 +31,7 @@ class ProbeHandler : public std::enable_shared_from_this<ProbeHandler> {
using SendProbeCallback = std::function<void(uint32_t)>;
public:
- ProbeHandler(SendProbeCallback &&send_callback,
- asio::io_service &io_service);
+ ProbeHandler(SendProbeCallback &&send_callback, asio::io_service &io_service);
~ProbeHandler();
@@ -53,8 +51,8 @@ class ProbeHandler : public std::enable_shared_from_this<ProbeHandler> {
private:
uint32_t probe_interval_; // us
- uint32_t max_probes_; // packets
- uint32_t sent_probes_; // packets
+ uint32_t max_probes_; // packets
+ uint32_t sent_probes_; // packets
std::unique_ptr<asio::steady_timer> probe_timer_;
diff --git a/libtransport/src/protocols/rtc/rtc.cc b/libtransport/src/protocols/rtc/rtc.cc
index 46659ac74..0cb4cda1d 100644
--- a/libtransport/src/protocols/rtc/rtc.cc
+++ b/libtransport/src/protocols/rtc/rtc.cc
@@ -17,8 +17,11 @@
#include <hicn/transport/interfaces/socket_consumer.h>
#include <implementation/socket_consumer.h>
#include <math.h>
+#include <protocols/errors.h>
+#include <protocols/incremental_indexer_bytestream.h>
#include <protocols/rtc/rtc.h>
#include <protocols/rtc/rtc_consts.h>
+#include <protocols/rtc/rtc_indexer.h>
#include <protocols/rtc/rtc_rc_queue.h>
#include <algorithm>
@@ -33,39 +36,41 @@ using namespace interface;
RTCTransportProtocol::RTCTransportProtocol(
implementation::ConsumerSocket *icn_socket)
- : TransportProtocol(icn_socket, nullptr),
- DatagramReassembly(icn_socket, this),
+ : TransportProtocol(icn_socket, new RtcIndexer<>(icn_socket, this),
+ new DatagramReassembly(icn_socket, this)),
number_(0) {
icn_socket->getSocketOption(PORTAL, portal_);
round_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
scheduler_timer_ =
std::make_unique<asio::steady_timer>(portal_->getIoService());
+ pacing_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
}
RTCTransportProtocol::~RTCTransportProtocol() {}
void RTCTransportProtocol::resume() {
- if (is_running_) return;
-
- is_running_ = true;
-
newRound();
+ TransportProtocol::resume();
+}
- portal_->runEventsLoop();
- is_running_ = false;
+std::size_t RTCTransportProtocol::transportHeaderLength() {
+ return DATA_HEADER_SIZE +
+ (fec_decoder_ != nullptr ? fec_decoder_->getFecHeaderSize() : 0);
}
// private
void RTCTransportProtocol::initParams() {
- portal_->setConsumerCallback(this);
+ TransportProtocol::reset();
rc_ = std::make_shared<RTCRateControlQueue>();
ldr_ = std::make_shared<RTCLossDetectionAndRecovery>(
+ indexer_verifier_.get(),
std::bind(&RTCTransportProtocol::sendRtxInterest, this,
std::placeholders::_1),
portal_->getIoService());
state_ = std::make_shared<RTCState>(
+ indexer_verifier_.get(),
std::bind(&RTCTransportProtocol::sendProbeInterest, this,
std::placeholders::_1),
std::bind(&RTCTransportProtocol::discoveredRtt, this),
@@ -83,8 +88,27 @@ void RTCTransportProtocol::initParams() {
// Cancel timer
number_++;
round_timer_->cancel();
+
scheduler_timer_->cancel();
scheduler_timer_on_ = false;
+ last_interest_sent_time_ = 0;
+ last_interest_sent_seq_ = 0;
+
+#if 0
+ if(portal_->isConnectedToFwd()){
+ max_aggregated_interest_ = 1;
+ }else{
+ max_aggregated_interest_ = MAX_INTERESTS_IN_BATCH;
+ }
+#else
+ max_aggregated_interest_ = 1;
+#endif
+
+ max_sent_int_ =
+ std::ceil((double)MAX_PACING_BATCH / (double)max_aggregated_interest_);
+
+ pacing_timer_->cancel();
+ pacing_timer_on_ = false;
// delete all timeouts and future nacks
timeouts_or_nacks_.clear();
@@ -93,16 +117,28 @@ void RTCTransportProtocol::initParams() {
current_sync_win_ = INITIAL_WIN;
max_sync_win_ = INITIAL_WIN_MAX;
- // names/packets var
- next_segment_ = 0;
-
socket_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
RTC_INTEREST_LIFETIME);
+
+ // FEC
+ using namespace std::placeholders;
+ enableFEC(std::bind(&RTCTransportProtocol::onFecPackets, this, _1),
+ /* We leave the buffer allocation to the fec decoder */
+ fec::FECBase::BufferRequested(0));
+
+ if (fec_decoder_) {
+ indexer_verifier_->enableFec(fec_type_);
+ indexer_verifier_->setNFec(0);
+ ldr_->setFecParams(fec::FECUtils::getBlockSymbols(fec_type_),
+ fec::FECUtils::getSourceSymbols(fec_type_));
+ } else {
+ indexer_verifier_->disableFec();
+ }
}
// private
void RTCTransportProtocol::reset() {
- TRANSPORT_LOGD("reset called");
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "reset called";
initParams();
newRound();
}
@@ -113,11 +149,13 @@ void RTCTransportProtocol::inactiveProducer() {
current_sync_win_ = INITIAL_WIN;
max_sync_win_ = INITIAL_WIN_MAX;
- TRANSPORT_LOGD("Current window: %u, max_sync_win_: %u", current_sync_win_,
- max_sync_win_);
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Current window: " << current_sync_win_
+ << ", max_sync_win_: " << max_sync_win_;
// names/packets var
- next_segment_ = 0;
+ indexer_verifier_->reset();
+ indexer_verifier_->enableFec(fec_type_);
+ indexer_verifier_->setNFec(0);
ldr_->clear();
}
@@ -137,10 +175,13 @@ void RTCTransportProtocol::newRound() {
uint32_t received_bytes = state_->getReceivedBytesInRound();
uint32_t sent_interest = state_->getSentInterestInRound();
uint32_t lost_data = state_->getLostData();
+ uint32_t definitely_lost = state_->getDefinitelyLostPackets();
uint32_t recovered_losses = state_->getRecoveredLosses();
uint32_t received_nacks = state_->getReceivedNacksInRound();
+ uint32_t received_fec = state_->getReceivedFecPackets();
bool in_sync = (current_state_ == SyncState::in_sync);
+ ldr_->onNewRound(in_sync);
state_->onNewRound((double)ROUND_LEN, in_sync);
rc_->onNewRound((double)ROUND_LEN);
@@ -161,11 +202,13 @@ void RTCTransportProtocol::newRound() {
}
}
- TRANSPORT_LOGD("Calling updateSyncWindow in newRound function");
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Calling updateSyncWindow in newRound function";
updateSyncWindow();
sendStatsToApp(sent_retx, received_bytes, sent_interest, lost_data,
- recovered_losses, received_nacks);
+ definitely_lost, recovered_losses, received_nacks,
+ received_fec);
newRound();
});
}
@@ -173,6 +216,7 @@ void RTCTransportProtocol::newRound() {
void RTCTransportProtocol::discoveredRtt() {
start_send_interest_ = true;
ldr_->turnOnRTX();
+ ldr_->onNewRound(false);
updateSyncWindow();
}
@@ -182,22 +226,23 @@ void RTCTransportProtocol::computeMaxSyncWindow() {
if (production_rate == 0.0 || packet_size == 0.0) {
// the consumer has no info about the producer,
// keep the previous maxCWin
- TRANSPORT_LOGD(
- "Returning in computeMaxSyncWindow because: prod_rate: %d || "
- "packet_size: %d",
- (int)(production_rate == 0.0), (int)(packet_size == 0.0));
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Returning in computeMaxSyncWindow because: prod_rate: "
+ << (production_rate == 0.0)
+ << " || packet_size: " << (packet_size == 0.0);
return;
}
+ production_rate += (production_rate * indexer_verifier_->getMaxFecOverhead());
+
uint32_t lifetime = default_values::interest_lifetime;
socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
lifetime);
double lifetime_ms = (double)lifetime / MILLI_IN_A_SEC;
-
- max_sync_win_ =
- (uint32_t)ceil((production_rate * lifetime_ms *
- INTEREST_LIFETIME_REDUCTION_FACTOR) / packet_size);
+ max_sync_win_ = (uint32_t)ceil(
+ (production_rate * lifetime_ms * INTEREST_LIFETIME_REDUCTION_FACTOR) /
+ packet_size);
max_sync_win_ = std::min(max_sync_win_, rc_->getCongesionWindow());
}
@@ -219,12 +264,25 @@ void RTCTransportProtocol::updateSyncWindow() {
// if some of the info are not available do not update the current win
if (prod_rate != 0.0 && rtt != 0.0 && packet_size != 0.0) {
- current_sync_win_ = (uint32_t)ceil(prod_rate * rtt / packet_size);
- current_sync_win_ += (uint32_t)
- ceil(prod_rate * (PRODUCER_BUFFER_MS / MILLI_IN_A_SEC) / packet_size);
+ double fec_interest_overhead = (double)state_->getPendingFecPackets() /
+ (double)(state_->getPendingInterestNumber() -
+ state_->getPendingFecPackets());
+
+ double fec_overhead =
+ std::max(indexer_verifier_->getFecOverhead(), fec_interest_overhead);
+
+ prod_rate += (prod_rate * fec_overhead);
- if(current_state_ == SyncState::catch_up) {
- current_sync_win_ = (uint32_t) (current_sync_win_ * CATCH_UP_WIN_INCREMENT);
+ current_sync_win_ = (uint32_t)ceil(prod_rate * rtt / packet_size);
+ uint32_t buffer = PRODUCER_BUFFER_MS;
+ if (rtt > 150)
+ buffer = buffer * 2; // if the RTT is too large we increase the
+ // the size of the buffer
+ current_sync_win_ +=
+ ceil(prod_rate * (buffer / MILLI_IN_A_SEC) / packet_size);
+
+ if (current_state_ == SyncState::catch_up) {
+ current_sync_win_ = current_sync_win_ * CATCH_UP_WIN_INCREMENT;
}
current_sync_win_ = std::min(current_sync_win_, max_sync_win_);
@@ -243,70 +301,48 @@ void RTCTransportProtocol::decreaseSyncWindow() {
scheduleNextInterests();
}
-void RTCTransportProtocol::sendInterest(Name *interest_name) {
- TRANSPORT_LOGD("Sending interest for name %s",
- interest_name->toString().c_str());
-
- auto interest = core::PacketManager<>::getInstance().getPacket<Interest>();
- interest->setName(*interest_name);
-
- uint32_t lifetime = default_values::interest_lifetime;
- socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
- lifetime);
- interest->setLifetime(uint32_t(lifetime));
-
- if (*on_interest_output_) {
- (*on_interest_output_)(*socket_->getInterface(), *interest);
- }
-
- if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) {
- return;
- }
-
- portal_->sendInterest(std::move(interest));
-}
-
void RTCTransportProtocol::sendRtxInterest(uint32_t seq) {
- if (!is_running_ && !is_first_) return;
+ if (!isRunning() && !is_first_) return;
- if(!start_send_interest_) return;
+ if (!start_send_interest_) return;
Name *interest_name = nullptr;
socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
&interest_name);
- TRANSPORT_LOGD("send rtx %u", seq);
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "send rtx " << seq;
interest_name->setSuffix(seq);
- sendInterest(interest_name);
+ sendInterest(*interest_name);
}
void RTCTransportProtocol::sendProbeInterest(uint32_t seq) {
- if (!is_running_ && !is_first_) return;
+ if (!isRunning() && !is_first_) return;
Name *interest_name = nullptr;
socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
&interest_name);
- TRANSPORT_LOGD("send probe %u", seq);
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "send probe " << seq;
interest_name->setSuffix(seq);
- sendInterest(interest_name);
+ sendInterest(*interest_name);
}
void RTCTransportProtocol::scheduleNextInterests() {
- TRANSPORT_LOGD("Schedule next interests");
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Schedule next interests";
- if (!is_running_ && !is_first_) return;
+ if (!isRunning() && !is_first_) return;
- if(!start_send_interest_) return; // RTT discovering phase is not finished so
- // do not start to send interests
+ if (pacing_timer_on_) return; // wait pacing timer for the next send
- if (scheduler_timer_on_) return; // wait befor send other interests
+ if (!start_send_interest_)
+ return; // RTT discovering phase is not finished so
+ // do not start to send interests
if (TRANSPORT_EXPECT_FALSE(!state_->isProducerActive())) {
- TRANSPORT_LOGD("Inactive producer.");
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Inactive producer.";
// here we keep seding the same interest until the producer
// does not start again
- if (next_segment_ != 0) {
+ if (indexer_verifier_->checkNextSuffix() != 0) {
// the producer just become inactive, reset the state
inactiveProducer();
}
@@ -315,125 +351,208 @@ void RTCTransportProtocol::scheduleNextInterests() {
socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
&interest_name);
- TRANSPORT_LOGD("send interest %u", next_segment_);
- interest_name->setSuffix(next_segment_);
+ uint32_t next_seg = 0;
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "send interest " << next_seg;
+ interest_name->setSuffix(next_seg);
if (portal_->interestIsPending(*interest_name)) {
// if interest 0 is already pending we return
return;
}
- sendInterest(interest_name);
+ sendInterest(*interest_name);
state_->onSendNewInterest(interest_name);
return;
}
- TRANSPORT_LOGD("Pending interest number: %d -- current_sync_win_: %d",
- state_->getPendingInterestNumber(), current_sync_win_);
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Pending interest number: " << state_->getPendingInterestNumber()
+ << " -- current_sync_win_: " << current_sync_win_;
+
+ uint32_t pending = state_->getPendingInterestNumber();
+ if (pending >= current_sync_win_) return; // no space in the window
+
+ if ((current_sync_win_ - pending) < max_aggregated_interest_) {
+ if (scheduler_timer_on_) return; // timer already scheduled
+
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+
+ uint64_t time = now - last_interest_sent_time_;
+ if (time < WAIT_FOR_INTEREST_BATCH) {
+ uint64_t next = WAIT_FOR_INTEREST_BATCH - time;
+ scheduler_timer_on_ = true;
+ scheduler_timer_->expires_from_now(std::chrono::milliseconds(next));
+ scheduler_timer_->async_wait([this](std::error_code ec) {
+ if (ec) return;
+ if (!scheduler_timer_on_) return;
+
+ scheduler_timer_on_ = false;
+ scheduleNextInterests();
+ });
+ return; // whait for the timer
+ }
+ }
+
+ scheduler_timer_on_ = false;
+ scheduler_timer_->cancel();
// skip nacked pacekts
- if (next_segment_ <= state_->getLastSeqNacked()) {
- next_segment_ = state_->getLastSeqNacked() + 1;
+ if (indexer_verifier_->checkNextSuffix() <= state_->getLastSeqNacked()) {
+ indexer_verifier_->jumpToIndex(state_->getLastSeqNacked() + 1);
}
// skipe received packets
- if (next_segment_ <= state_->getHighestSeqReceivedInOrder()) {
- next_segment_ = state_->getHighestSeqReceivedInOrder() + 1;
+ if (indexer_verifier_->checkNextSuffix() <=
+ state_->getHighestSeqReceivedInOrder()) {
+ indexer_verifier_->jumpToIndex(state_->getHighestSeqReceivedInOrder() + 1);
}
uint32_t sent_interests = 0;
+ uint32_t sent_packets = 0;
+ uint32_t aggregated_counter = 0;
+ Name *name = nullptr;
+ Name interest_name;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name);
+ std::array<uint32_t, MAX_AGGREGATED_INTEREST> additional_suffixes;
+
while ((state_->getPendingInterestNumber() < current_sync_win_) &&
- (sent_interests < MAX_INTERESTS_IN_BATCH)) {
- TRANSPORT_LOGD("In while loop. Window size: %u", current_sync_win_);
- Name *interest_name = nullptr;
- socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
- &interest_name);
+ (sent_interests < max_sent_int_)) {
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "In while loop. Window size: " << current_sync_win_;
+
+ uint32_t next_seg = indexer_verifier_->getNextSuffix();
- interest_name->setSuffix(next_segment_);
+ name->setSuffix(next_seg);
// send the packet only if:
// 1) it is not pending yet (not true for rtx)
// 2) the packet is not received or lost
// 3) is not in the rtx list
- if (portal_->interestIsPending(*interest_name) ||
- state_->isReceivedOrLost(next_segment_) != PacketState::UNKNOWN ||
- ldr_->isRtx(next_segment_)) {
- TRANSPORT_LOGD(
- "skip interest %u because: pending %u, recv %u, rtx %u",
- next_segment_, (portal_->interestIsPending(*interest_name)),
- (state_->isReceivedOrLost(next_segment_) != PacketState::UNKNOWN),
- (ldr_->isRtx(next_segment_)));
- next_segment_ = (next_segment_ + 1) % MIN_PROBE_SEQ;
+ // 4) is fec and is not in order (!= last sent + 1)
+ if (portal_->interestIsPending(*name) ||
+ state_->isReceivedOrLost(next_seg) != PacketState::UNKNOWN ||
+ ldr_->isRtx(next_seg) ||
+ (indexer_verifier_->isFec(next_seg) &&
+ next_seg != last_interest_sent_seq_ + 1)) {
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "skip interest " << next_seg << " because: pending "
+ << portal_->interestIsPending(*name) << ", recv "
+ << (state_->isReceivedOrLost(next_seg) != PacketState::UNKNOWN)
+ << ", rtx " << (ldr_->isRtx(next_seg)) << ", is old fec "
+ << ((indexer_verifier_->isFec(next_seg) &&
+ next_seg != last_interest_sent_seq_ + 1));
continue;
}
+ if (aggregated_counter == 0) {
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "(name) send interest " << next_seg;
+ interest_name = *name;
+ } else {
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "(append) send interest " << next_seg;
+ additional_suffixes[aggregated_counter - 1] = next_seg;
+ }
- sent_interests++;
- TRANSPORT_LOGD("send interest %u", next_segment_);
- sendInterest(interest_name);
- state_->onSendNewInterest(interest_name);
+ last_interest_sent_seq_ = next_seg;
+ state_->onSendNewInterest(name);
+ aggregated_counter++;
+
+ if (aggregated_counter >= max_aggregated_interest_) {
+ sent_packets++;
+ sent_interests++;
+ sendInterest(interest_name, &additional_suffixes, aggregated_counter - 1);
+ last_interest_sent_time_ =
+ std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ aggregated_counter = 0;
+ }
+ }
- next_segment_ = (next_segment_ + 1) % MIN_PROBE_SEQ;
+ // exiting the while we may have some pending interest to send
+ if (aggregated_counter != 0) {
+ sent_packets++;
+ last_interest_sent_time_ =
+ std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ sendInterest(interest_name, &additional_suffixes, aggregated_counter - 1);
}
if (state_->getPendingInterestNumber() < current_sync_win_) {
- // we still have space in the window but we already sent a batch of
- // MAX_INTERESTS_IN_BATCH interest. for the following ones wait one
- // WAIT_BETWEEN_INTEREST_BATCHES to avoid local packets drop
+ // we still have space in the window but we already sent too many packets
+ // wait PACING_WAIT to avoid drops in the kernel
- scheduler_timer_on_ = true;
- scheduler_timer_->expires_from_now(
- std::chrono::microseconds(WAIT_BETWEEN_INTEREST_BATCHES));
+ pacing_timer_on_ = true;
+ pacing_timer_->expires_from_now(std::chrono::microseconds(PACING_WAIT));
scheduler_timer_->async_wait([this](std::error_code ec) {
if (ec) return;
- if (!scheduler_timer_on_) return;
+ if (!pacing_timer_on_) return;
- scheduler_timer_on_ = false;
+ pacing_timer_on_ = false;
scheduleNextInterests();
});
}
}
-void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) {
- uint32_t segment_number = interest->getName().getSuffix();
-
- TRANSPORT_LOGD("timeout for packet %u", segment_number);
+void RTCTransportProtocol::onInterestTimeout(Interest::Ptr &interest,
+ const Name &name) {
+ uint32_t segment_number = name.getSuffix();
if (segment_number >= MIN_PROBE_SEQ) {
// this is a timeout on a probe, do nothing
return;
}
+ PacketState state = state_->isReceivedOrLost(segment_number);
+ if (state != PacketState::UNKNOWN) {
+ // we may recover a packets using fec, ignore this timer
+ return;
+ }
+
timeouts_or_nacks_.insert(segment_number);
if (TRANSPORT_EXPECT_TRUE(state_->isProducerActive()) &&
- segment_number <= state_->getHighestSeqReceivedInOrder()) {
+ segment_number <= state_->getHighestSeqReceived()) {
// we retransmit packets only if the producer is active, otherwise we
// use timeouts to avoid to send too much traffic
//
// a timeout is sent using RTX only if it is an old packet. if it is for a
// seq number that we didn't reach yet, we send the packet using the normal
// schedule next interest
- TRANSPORT_LOGD("handle timeout for packet %u using rtx", segment_number);
- ldr_->onTimeout(segment_number);
- state_->onTimeout(segment_number);
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "handle timeout for packet " << segment_number << " using rtx";
+ if (ldr_->isRtxOn()) {
+ ldr_->onTimeout(segment_number);
+ if (indexer_verifier_->isFec(segment_number))
+ state_->onTimeout(segment_number, true);
+ else
+ state_->onTimeout(segment_number, false);
+ } else {
+ // in this case we wil never recover the timeout
+ state_->onTimeout(segment_number, true);
+ }
scheduleNextInterests();
return;
}
- TRANSPORT_LOGD("handle timeout for packet %u using normal interests",
- segment_number);
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "handle timeout for packet " << segment_number
+ << " using normal interests";
- if (segment_number < next_segment_) {
+ if (segment_number < indexer_verifier_->checkNextSuffix()) {
// this is a timeout for a packet that will be generated in the future but
// we are asking for higher sequence numbers. we need to go back like in the
// case of future nacks
- TRANSPORT_LOGD("on timeout next seg = %u, jump to %u",
- next_segment_, segment_number);
- next_segment_ = segment_number;
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "On timeout next seg = " << indexer_verifier_->checkNextSuffix()
+ << ", jump to " << segment_number;
+ // add an extra space in the window
+ current_sync_win_++;
+ indexer_verifier_->jumpToIndex(segment_number);
}
- state_->onTimeout(segment_number);
+ state_->onTimeout(segment_number, false);
scheduleNextInterests();
}
@@ -446,8 +565,8 @@ void RTCTransportProtocol::onNack(const ContentObject &content_object) {
// check if the packet got a timeout
- TRANSPORT_LOGD("Nack received %u. Production segment: %u", nack_segment,
- production_seg);
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Nack received " << nack_segment
+ << ". Production segment: " << production_seg;
bool compute_stats = true;
auto tn_it = timeouts_or_nacks_.find(nack_segment);
@@ -459,14 +578,15 @@ void RTCTransportProtocol::onNack(const ContentObject &content_object) {
state_->onNackPacketReceived(content_object, compute_stats);
ldr_->onNackPacketReceived(content_object);
- // both in case of past and future nack we set next_segment_ equal to the
+ // both in case of past and future nack we jump to the
// production segment in the nack. In case of past nack we will skip unneded
// interest (this is already done in the scheduleNextInterest in any case)
// while in case of future nacks we can go back in time and ask again for the
// content that generated the nack
- TRANSPORT_LOGD("on nack next seg = %u, jump to %u",
- next_segment_, production_seg);
- next_segment_ = production_seg;
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "On nack next seg = " << indexer_verifier_->checkNextSuffix()
+ << ", jump to " << production_seg;
+ indexer_verifier_->jumpToIndex(production_seg);
if (production_seg > nack_segment) {
// remove the nack is it exists
@@ -496,30 +616,33 @@ void RTCTransportProtocol::onNack(const ContentObject &content_object) {
void RTCTransportProtocol::onProbe(const ContentObject &content_object) {
bool valid = state_->onProbePacketReceived(content_object);
- if(!valid) return;
+ if (!valid) return;
struct nack_packet_t *probe =
(struct nack_packet_t *)content_object.getPayload()->data();
uint32_t production_seg = probe->getProductionSegement();
- // as for the nacks set next_segment_
- TRANSPORT_LOGD("on probe next seg = %u, jump to %u",
- next_segment_, production_seg);
- next_segment_ = production_seg;
+ // as for the nacks set next_segment
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "on probe next seg = " << indexer_verifier_->checkNextSuffix()
+ << ", jump to " << production_seg;
+ indexer_verifier_->jumpToIndex(production_seg);
ldr_->onProbePacketReceived(content_object);
updateSyncWindow();
}
-void RTCTransportProtocol::onContentObject(Interest &interest,
- ContentObject &content_object) {
- TRANSPORT_LOGD("Received content object of size: %zu",
- content_object.payloadSize());
- uint32_t payload_size = (uint32_t) content_object.payloadSize();
+void RTCTransportProtocol::onContentObjectReceived(
+ Interest &interest, ContentObject &content_object, std::error_code &ec) {
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Received content object of size: " << content_object.payloadSize();
+ uint32_t payload_size = content_object.payloadSize();
uint32_t segment_number = content_object.getName().getSuffix();
+ ec = make_error_code(protocol_error::not_reassemblable);
+
if (segment_number >= MIN_PROBE_SEQ) {
- TRANSPORT_LOGD("Received probe %u", segment_number);
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received probe " << segment_number;
if (*on_content_object_input_) {
(*on_content_object_input_)(*socket_->getInterface(), content_object);
}
@@ -528,7 +651,7 @@ void RTCTransportProtocol::onContentObject(Interest &interest,
}
if (payload_size == NACK_HEADER_SIZE) {
- TRANSPORT_LOGD("Received nack %u", segment_number);
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received nack " << segment_number;
if (*on_content_object_input_) {
(*on_content_object_input_)(*socket_->getInterface(), content_object);
}
@@ -536,9 +659,8 @@ void RTCTransportProtocol::onContentObject(Interest &interest,
return;
}
- TRANSPORT_LOGD("Received content %u", segment_number);
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received content " << segment_number;
- rc_->onDataPacketReceived(content_object);
bool compute_stats = true;
auto tn_it = timeouts_or_nacks_.find(segment_number);
if (tn_it != timeouts_or_nacks_.end()) {
@@ -551,25 +673,49 @@ void RTCTransportProtocol::onContentObject(Interest &interest,
// check if the packet was already received
PacketState state = state_->isReceivedOrLost(segment_number);
- state_->onDataPacketReceived(content_object, compute_stats);
- ldr_->onDataPacketReceived(content_object);
- // if the stat for this seq number is received do not send the packet to app
if (state != PacketState::RECEIVED) {
- if (*on_content_object_input_) {
- (*on_content_object_input_)(*socket_->getInterface(), content_object);
+ // send packet to decoder
+ if (fec_decoder_) {
+ DLOG_IF(INFO, VLOG_IS_ON(4))
+ << "send packet " << segment_number << " to FEC decoder";
+ fec_decoder_->onDataPacket(
+ content_object, content_object.headerSize() + rtc::DATA_HEADER_SIZE);
+ }
+ if (!indexer_verifier_->isFec(segment_number)) {
+ // the packet may be alredy sent to the ap by the decoder, check again if
+ // it is already received
+ state = state_->isReceivedOrLost(segment_number);
+ if (state != PacketState::RECEIVED) {
+ DLOG_IF(INFO, VLOG_IS_ON(4)) << "Received content " << segment_number;
+
+ state_->onDataPacketReceived(content_object, compute_stats);
+
+ if (*on_content_object_input_) {
+ (*on_content_object_input_)(*socket_->getInterface(), content_object);
+ }
+ ec = make_error_code(protocol_error::success);
+ }
+ } else {
+ DLOG_IF(INFO, VLOG_IS_ON(4)) << "Received fec " << segment_number;
+ state_->onFecPacketReceived(content_object);
}
- reassemble(content_object);
} else {
- TRANSPORT_LOGD("Received duplicated content %u, drop it", segment_number);
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Received duplicated content " << segment_number << ", drop it";
+ ec = make_error_code(protocol_error::duplicated_content);
}
+ ldr_->onDataPacketReceived(content_object);
+ rc_->onDataPacketReceived(content_object);
+
updateSyncWindow();
}
void RTCTransportProtocol::sendStatsToApp(
uint32_t retx_count, uint32_t received_bytes, uint32_t sent_interests,
- uint32_t lost_data, uint32_t recovered_losses, uint32_t received_nacks) {
+ uint32_t lost_data, uint32_t definitely_lost, uint32_t recovered_losses,
+ uint32_t received_nacks, uint32_t received_fec) {
if (*stats_summary_) {
// Send the stats to the app
stats_->updateQueuingDelay(state_->getQueuing());
@@ -581,23 +727,35 @@ void RTCTransportProtocol::sendStatsToApp(
stats_->updateBytesRecv(received_bytes);
stats_->updateInterestTx(sent_interests);
stats_->updateReceivedNacks(received_nacks);
+ stats_->updateReceivedFEC(received_fec);
stats_->updateAverageWindowSize(current_sync_win_);
stats_->updateLossRatio(state_->getLossRate());
stats_->updateAverageRtt(state_->getRTT());
+ stats_->updateQueuingDelay(state_->getQueuing());
stats_->updateLostData(lost_data);
+ stats_->updateDefinitelyLostData(definitely_lost);
stats_->updateRecoveredData(recovered_losses);
stats_->updateCCState((unsigned int)current_state_ ? 1 : 0);
(*stats_summary_)(*socket_->getInterface(), *stats_);
}
}
-void RTCTransportProtocol::reassemble(ContentObject &content_object) {
- auto read_buffer = content_object.getPayload();
- TRANSPORT_LOGD("Size of payload: %zu", read_buffer->length());
- read_buffer->trimStart(DATA_HEADER_SIZE);
- Reassembly::read_buffer_ = std::move(read_buffer);
- Reassembly::notifyApplication();
+void RTCTransportProtocol::onFecPackets(
+ std::vector<std::pair<uint32_t, fec::buffer>> &packets) {
+ for (auto &packet : packets) {
+ PacketState state = state_->isReceivedOrLost(packet.first);
+ if (state != PacketState::RECEIVED) {
+ state_->onPacketRecoveredFec(packet.first);
+ ldr_->onPacketRecoveredFec(packet.first);
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Recovered packet " << packet.first << " through FEC.";
+ reassembly_->reassemble(*packet.second, packet.first);
+ } else {
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Packet" << packet.first << "already received.";
+ }
+ }
}
} // end namespace rtc
diff --git a/libtransport/src/protocols/rtc/rtc.h b/libtransport/src/protocols/rtc/rtc.h
index 596887067..e6431264d 100644
--- a/libtransport/src/protocols/rtc/rtc.h
+++ b/libtransport/src/protocols/rtc/rtc.h
@@ -30,8 +30,7 @@ namespace protocol {
namespace rtc {
-class RTCTransportProtocol : public TransportProtocol,
- public DatagramReassembly {
+class RTCTransportProtocol : public TransportProtocol {
public:
RTCTransportProtocol(implementation::ConsumerSocket *icnet_socket);
@@ -43,6 +42,8 @@ class RTCTransportProtocol : public TransportProtocol,
void resume() override;
+ std::size_t transportHeaderLength() override;
+
private:
enum class SyncState { catch_up = 0, in_sync = 1, last };
@@ -63,24 +64,28 @@ class RTCTransportProtocol : public TransportProtocol,
void decreaseSyncWindow();
// packet functions
- void sendInterest(Name *interest_name);
void sendRtxInterest(uint32_t seq);
void sendProbeInterest(uint32_t seq);
void scheduleNextInterests() override;
- void onTimeout(Interest::Ptr &&interest) override;
+ void onInterestTimeout(Interest::Ptr &interest, const Name &name) override;
void onNack(const ContentObject &content_object);
void onProbe(const ContentObject &content_object);
- void reassemble(ContentObject &content_object) override;
- void onContentObject(Interest &interest,
- ContentObject &content_object) override;
- void onPacketDropped(Interest &interest,
- ContentObject &content_object) override {}
+ void onContentObjectReceived(Interest &interest,
+ ContentObject &content_object,
+ std::error_code &ec) override;
+ void onPacketDropped(Interest &interest, ContentObject &content_object,
+ const std::error_code &reason) override {}
void onReassemblyFailed(std::uint32_t missing_segment) override {}
// interaction with app functions
void sendStatsToApp(uint32_t retx_count, uint32_t received_bytes,
uint32_t sent_interests, uint32_t lost_data,
- uint32_t recovered_losses, uint32_t received_nacks);
+ uint32_t definitely_lost, uint32_t recovered_losses,
+ uint32_t received_nacks, uint32_t received_fec);
+
+ // FEC functions
+ void onFecPackets(std::vector<std::pair<uint32_t, fec::buffer>> &packets);
+
// protocol state
bool start_send_interest_;
SyncState current_state_;
@@ -88,17 +93,30 @@ class RTCTransportProtocol : public TransportProtocol,
uint32_t current_sync_win_;
uint32_t max_sync_win_;
- // controller var
+ // round timer
std::unique_ptr<asio::steady_timer> round_timer_;
+
+ // scheduler timer (postpone interest sending to explot aggregated interests)
std::unique_ptr<asio::steady_timer> scheduler_timer_;
bool scheduler_timer_on_;
+ uint64_t last_interest_sent_time_;
+ uint64_t last_interest_sent_seq_;
+
+ // maximum aggregated interest. if the transport is connected to the forwarder
+ // we cannot use aggregated interests
+ uint32_t max_aggregated_interest_;
+ // maximum number of intereset that can be sent in a loop to avoid packets
+ // dropped by the kernel
+ uint32_t max_sent_int_;
+
+ // pacing timer (do not send too many interests in a short time to avoid
+ // packet drops in the kernel)
+ std::unique_ptr<asio::steady_timer> pacing_timer_;
+ bool pacing_timer_on_;
// timeouts
std::unordered_set<uint32_t> timeouts_or_nacks_;
- // names/packets var
- uint32_t next_segment_;
-
std::shared_ptr<RTCState> state_;
std::shared_ptr<RTCRateControl> rc_;
std::shared_ptr<RTCLossDetectionAndRecovery> ldr_;
diff --git a/libtransport/src/protocols/rtc/rtc_consts.h b/libtransport/src/protocols/rtc/rtc_consts.h
index e172fc7a1..d04bc1b1f 100644
--- a/libtransport/src/protocols/rtc/rtc_consts.h
+++ b/libtransport/src/protocols/rtc/rtc_consts.h
@@ -37,12 +37,26 @@ const double INTEREST_LIFETIME_REDUCTION_FACTOR = 0.8;
const uint32_t PRODUCER_BUFFER_MS = 200; // ms
// interest scheduler
-const uint32_t MAX_INTERESTS_IN_BATCH = 5;
-const uint32_t WAIT_BETWEEN_INTEREST_BATCHES = 1000; // usec
+// const uint32_t MAX_INTERESTS_IN_BATCH = 5;
+// const uint32_t WAIT_BETWEEN_INTEREST_BATCHES = 1000; // usec
+const uint32_t MAX_INTERESTS_IN_BATCH = 5; // number of seq numbers per
+ // aggregated interest packet
+ // considering the name itself
+const uint32_t WAIT_FOR_INTEREST_BATCH = 20; // msec. timer that we wait to try
+ // to aggregate interest in the
+ // same packet
+const uint32_t MAX_PACING_BATCH = 5;
+// number of interest that we can send inside
+// the loop before they get dropped by the
+// kernel.
+const uint32_t PACING_WAIT = 1000; // usec to wait betwing two pacing batch. As
+ // for MAX_PACING_BATCH this value was
+ // computed during tests
+const uint32_t MAX_RTX_IN_BATCH = 10; // max rtx to send in loop
// packet const
const uint32_t HICN_HEADER_SIZE = 40 + 20; // IPv6 + TCP bytes
-const uint32_t RTC_INTEREST_LIFETIME = 1000;
+const uint32_t RTC_INTEREST_LIFETIME = 2000;
// probes sequence range
const uint32_t MIN_PROBE_SEQ = 0xefffffff;
@@ -51,19 +65,18 @@ const uint32_t MAX_RTT_PROBE_SEQ = 0xffffffff - 1;
// RTT_PROBE_INTERVAL will be used during the section while
// INIT_RTT_PROBE_INTERVAL is used at the beginning to
// quickily estimate the RTT
-const uint32_t RTT_PROBE_INTERVAL = 200000; // us
-const uint32_t INIT_RTT_PROBE_INTERVAL = 500; // us
-const uint32_t INIT_RTT_PROBES = 40; // number of probes to init RTT
+const uint32_t RTT_PROBE_INTERVAL = 200000; // us
+const uint32_t INIT_RTT_PROBE_INTERVAL = 500; // us
+const uint32_t INIT_RTT_PROBES = 40; // number of probes to init RTT
// if the produdcer is not yet started we need to probe multple times
// to get an answer. we wait 100ms between each try
-const uint32_t INIT_RTT_PROBE_RESTART = 100; // ms
+const uint32_t INIT_RTT_PROBE_RESTART = 100; // ms
// once we get the first probe we wait at most 60ms for the others
-const uint32_t INIT_RTT_PROBE_WAIT = 30; // ms
+const uint32_t INIT_RTT_PROBE_WAIT = 30; // ms
// we reuires at least 5 probes to be recevied
-const uint32_t INIT_RTT_MIN_PROBES_TO_RECV = 5; //ms
+const uint32_t INIT_RTT_MIN_PROBES_TO_RECV = 5; // ms
const uint32_t MAX_PENDING_PROBES = 10;
-
// congestion
const double MAX_QUEUING_DELAY = 100.0; // ms
@@ -97,7 +110,7 @@ const double MAX_CACHED_PACKETS = 262144; // 2^18
// about 50 sec of traffic at 50Mbps
// with 1200 bytes packets
-const uint32_t MAX_ROUND_WHIOUT_PACKETS = (const uint32_t)
+const uint32_t MAX_ROUND_WHIOUT_PACKETS =
(20 * MILLI_IN_A_SEC) / ROUND_LEN; // 20 sec in rounds;
// used in ldr
diff --git a/libtransport/src/protocols/rtc/rtc_data_path.cc b/libtransport/src/protocols/rtc/rtc_data_path.cc
index a545225cb..c098088a3 100644
--- a/libtransport/src/protocols/rtc/rtc_data_path.cc
+++ b/libtransport/src/protocols/rtc/rtc_data_path.cc
@@ -69,7 +69,7 @@ void RTCDataPath::insertOwdSample(int64_t owd) {
if (avg_owd != DBL_MAX)
avg_owd = (avg_owd * (1 - ALPHA_RTC)) + (owd * ALPHA_RTC);
else {
- avg_owd = (double)owd;
+ avg_owd = owd;
}
int64_t queueVal = owd - std::min(getMinOwd(), min_owd);
@@ -77,7 +77,7 @@ void RTCDataPath::insertOwdSample(int64_t owd) {
if (queuing_delay != DBL_MAX)
queuing_delay = (queuing_delay * (1 - ALPHA_RTC)) + (queueVal * ALPHA_RTC);
else {
- queuing_delay = (double)queueVal;
+ queuing_delay = queueVal;
}
// keep track of the jitter computed as for RTP (RFC 3550)
@@ -100,7 +100,7 @@ void RTCDataPath::computeInterArrivalGap(uint32_t segment_number) {
largest_recv_seq_ = segment_number;
largest_recv_seq_time_ = now;
if (avg_inter_arrival_ == DBL_MAX)
- avg_inter_arrival_ = (double)delta;
+ avg_inter_arrival_ = delta;
else
avg_inter_arrival_ =
(avg_inter_arrival_ * (1 - ALPHA_RTC)) + (delta * ALPHA_RTC);
diff --git a/libtransport/src/protocols/rtc/rtc_indexer.h b/libtransport/src/protocols/rtc/rtc_indexer.h
new file mode 100644
index 000000000..4aee242bb
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_indexer.h
@@ -0,0 +1,195 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <protocols/errors.h>
+#include <protocols/fec_utils.h>
+#include <protocols/indexer.h>
+#include <protocols/rtc/rtc_consts.h>
+#include <protocols/transport_protocol.h>
+
+#include <deque>
+
+namespace transport {
+
+namespace interface {
+class ConsumerSocket;
+}
+
+namespace protocol {
+
+namespace rtc {
+
+template <uint32_t LIMIT = MIN_PROBE_SEQ>
+class RtcIndexer : public Indexer {
+ public:
+ RtcIndexer(implementation::ConsumerSocket *icn_socket,
+ TransportProtocol *transport)
+ : Indexer(icn_socket, transport),
+ first_suffix_(1),
+ next_suffix_(first_suffix_),
+ fec_type_(fec::FECType::UNKNOWN),
+ n_fec_(0),
+ n_current_fec_(n_fec_) {}
+
+ RtcIndexer(RtcIndexer &&other) : Indexer(std::forward<Indexer>(other)) {}
+
+ ~RtcIndexer() {}
+
+ void reset() override {
+ next_suffix_ = first_suffix_;
+ n_fec_ = 0;
+ }
+
+ uint32_t checkNextSuffix() override { return next_suffix_; }
+
+ uint32_t getNextSuffix() override {
+ if (isFec(next_suffix_)) {
+ if (n_current_fec_) {
+ auto ret = next_suffix_++;
+ n_current_fec_--;
+ return ret;
+ } else {
+ n_current_fec_ = n_fec_;
+ next_suffix_ = nextSource(next_suffix_);
+ }
+ } else if (!n_current_fec_) {
+ n_current_fec_ = n_fec_;
+ }
+
+ return (next_suffix_++ % LIMIT);
+ }
+
+ void setFirstSuffix(uint32_t suffix) override {
+ first_suffix_ = suffix % LIMIT;
+ }
+
+ uint32_t getFirstSuffix() override { return first_suffix_; }
+
+ uint32_t jumpToIndex(uint32_t index) override {
+ next_suffix_ = index % LIMIT;
+ return next_suffix_;
+ }
+
+ void onContentObject(core::Interest &interest,
+ core::ContentObject &content_object,
+ bool reassembly) override {
+ setVerifier();
+ auto ret = verifier_->verifyPackets(&content_object);
+
+ switch (ret) {
+ case auth::VerificationPolicy::ACCEPT: {
+ if (reassembly) {
+ reassembly_->reassemble(content_object);
+ }
+ break;
+ }
+
+ case auth::VerificationPolicy::UNKNOWN:
+ case auth::VerificationPolicy::DROP: {
+ transport_->onPacketDropped(
+ interest, content_object,
+ make_error_code(protocol_error::verification_failed));
+ break;
+ }
+
+ case auth::VerificationPolicy::ABORT: {
+ transport_->onContentReassembled(
+ make_error_code(protocol_error::session_aborted));
+ break;
+ }
+ }
+ }
+
+ /**
+ * Retrieve the next segment to be reassembled.
+ */
+ uint32_t getNextReassemblySegment() override {
+ throw errors::RuntimeException(
+ "Get reassembly segment called on rtc indexer. RTC indexer does not "
+ "provide "
+ "reassembly.");
+ }
+
+ bool isFinalSuffixDiscovered() override { return true; }
+
+ uint32_t getFinalSuffix() override { return LIMIT; }
+
+ void enableFec(fec::FECType fec_type) override { fec_type_ = fec_type; }
+
+ void disableFec() override { fec_type_ = fec::FECType::UNKNOWN; }
+
+ void setNFec(uint32_t n_fec) override {
+ n_fec_ = n_fec;
+ n_current_fec_ = n_fec_;
+ }
+
+ uint32_t getNFec() override { return n_fec_; }
+
+ bool isFec(uint32_t index) override {
+ return isFec(fec_type_, index, first_suffix_);
+ }
+
+ double getFecOverhead() override {
+ if (fec_type_ == fec::FECType::UNKNOWN) {
+ return 0;
+ }
+
+ double k = (double)fec::FECUtils::getSourceSymbols(fec_type_);
+ return (double)n_fec_ / k;
+ }
+
+ double getMaxFecOverhead() override {
+ if (fec_type_ == fec::FECType::UNKNOWN) {
+ return 0;
+ }
+
+ double k = (double)fec::FECUtils::getSourceSymbols(fec_type_);
+ double n = (double)fec::FECUtils::getBlockSymbols(fec_type_);
+ return (double)(n - k) / k;
+ }
+
+ static bool isFec(fec::FECType fec_type, uint32_t index,
+ uint32_t first_suffix) {
+ if (index < LIMIT) {
+ return fec::FECUtils::isFec(fec_type, index, first_suffix);
+ }
+
+ return false;
+ }
+
+ static uint32_t nextSource(fec::FECType fec_type, uint32_t index,
+ uint32_t first_suffix) {
+ return fec::FECUtils::nextSource(fec_type, index, first_suffix) % LIMIT;
+ }
+
+ private:
+ uint32_t nextSource(uint32_t index) {
+ return nextSource(fec_type_, index, first_suffix_);
+ }
+
+ private:
+ uint32_t first_suffix_;
+ uint32_t next_suffix_;
+ fec::FECType fec_type_;
+ bool fec_enabled_;
+ uint32_t n_fec_;
+ uint32_t n_current_fec_;
+};
+
+} // namespace rtc
+} // namespace protocol
+} // namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_ldr.cc b/libtransport/src/protocols/rtc/rtc_ldr.cc
index 0ef381fe1..f0de48871 100644
--- a/libtransport/src/protocols/rtc/rtc_ldr.cc
+++ b/libtransport/src/protocols/rtc/rtc_ldr.cc
@@ -13,6 +13,7 @@
* limitations under the License.
*/
+#include <glog/logging.h>
#include <protocols/rtc/rtc_consts.h>
#include <protocols/rtc/rtc_ldr.h>
@@ -26,11 +27,13 @@ namespace protocol {
namespace rtc {
RTCLossDetectionAndRecovery::RTCLossDetectionAndRecovery(
- SendRtxCallback &&callback, asio::io_service &io_service)
+ Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service)
: rtx_on_(false),
+ fec_on_(false),
next_rtx_timer_(MAX_TIMER_RTX),
last_event_(0),
sentinel_timer_interval_(MAX_TIMER_RTX),
+ indexer_(indexer),
send_rtx_callback_(std::move(callback)) {
timer_ = std::make_unique<asio::steady_timer>(io_service);
sentinel_timer_ = std::make_unique<asio::steady_timer>(io_service);
@@ -40,7 +43,7 @@ RTCLossDetectionAndRecovery::~RTCLossDetectionAndRecovery() {}
void RTCLossDetectionAndRecovery::turnOnRTX() {
rtx_on_ = true;
- scheduleSentinelTimer((uint32_t)(state_->getRTT() * CATCH_UP_RTT_INCREMENT));
+ scheduleSentinelTimer(state_->getRTT() * CATCH_UP_RTT_INCREMENT);
}
void RTCLossDetectionAndRecovery::turnOffRTX() {
@@ -48,6 +51,54 @@ void RTCLossDetectionAndRecovery::turnOffRTX() {
clear();
}
+uint32_t RTCLossDetectionAndRecovery::computeFecPacketsToAsk(bool in_sync) {
+ uint32_t current_fec = indexer_->getNFec();
+ double current_loss_rate = state_->getLossRate();
+ double last_loss_rate = state_->getLastRoundLossRate();
+
+ // when in sync ask for fec only if there are losses for 2 rounds
+ if (in_sync && current_fec == 0 &&
+ (current_loss_rate == 0 || last_loss_rate == 0))
+ return 0;
+
+ double loss_rate = state_->getMaxLossRate() * 1.5;
+
+ if (!in_sync && loss_rate == 0) loss_rate = 0.05;
+ if (loss_rate > 0.5) loss_rate = 0.5;
+
+ double exp_losses = (double)k_ * loss_rate;
+ uint32_t fec_to_ask = ceil(exp_losses / (1 - loss_rate));
+
+ if (fec_to_ask > (n_ - k_)) fec_to_ask = n_ - k_;
+
+ return fec_to_ask;
+}
+
+void RTCLossDetectionAndRecovery::onNewRound(bool in_sync) {
+ uint64_t rtt = state_->getRTT();
+ if (!fec_on_ && rtt >= 100) {
+ // turn on fec, here we may have no info so ask for all packets
+ fec_on_ = true;
+ turnOffRTX();
+ indexer_->setNFec(computeFecPacketsToAsk(in_sync));
+ return;
+ }
+
+ if (fec_on_ && rtt > 80) {
+ // keep using fec, maybe update it
+ indexer_->setNFec(computeFecPacketsToAsk(in_sync));
+ return;
+ }
+
+ if ((fec_on_ && rtt <= 80) || (!rtx_on_ && rtt <= 100)) {
+ // turn on rtx
+ fec_on_ = false;
+ indexer_->setNFec(0);
+ turnOnRTX();
+ return;
+ }
+}
+
void RTCLossDetectionAndRecovery::onTimeout(uint32_t seq) {
// always add timeouts to the RTX list to avoid to send the same packet as if
// it was not a rtx
@@ -55,17 +106,23 @@ void RTCLossDetectionAndRecovery::onTimeout(uint32_t seq) {
last_event_ = getNow();
}
+void RTCLossDetectionAndRecovery::onPacketRecoveredFec(uint32_t seq) {
+ // if an RTX is scheduled for a packet recovered using FEC delete it
+ deleteRtx(seq);
+ recover_with_fec_.erase(seq);
+}
+
void RTCLossDetectionAndRecovery::onDataPacketReceived(
const core::ContentObject &content_object) {
last_event_ = getNow();
uint32_t seq = content_object.getName().getSuffix();
if (deleteRtx(seq)) {
- state_->onPacketRecovered(seq);
+ state_->onPacketRecoveredRtx(seq);
} else {
- if (TRANSPORT_EXPECT_FALSE(!rtx_on_)) return; // do not add if RTX is off
- TRANSPORT_LOGD("received data. add from %u to %u ",
- state_->getHighestSeqReceivedInOrder() + 1, seq);
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "received data. add from "
+ << state_->getHighestSeqReceivedInOrder() + 1 << " to " << seq;
addToRetransmissions(state_->getHighestSeqReceivedInOrder() + 1, seq);
}
}
@@ -76,8 +133,6 @@ void RTCLossDetectionAndRecovery::onNackPacketReceived(
uint32_t seq = nack.getName().getSuffix();
- if (TRANSPORT_EXPECT_FALSE(!rtx_on_)) return; // do not add if RTX is off
-
struct nack_packet_t *nack_pkt =
(struct nack_packet_t *)nack.getPayload()->data();
uint32_t production_seq = nack_pkt->getProductionSegement();
@@ -91,8 +146,9 @@ void RTCLossDetectionAndRecovery::onNackPacketReceived(
// productionSeq = 14. 9 is lost but we can try to recover packets 12 13 and
// 14 that are not arrived yet
deleteRtx(seq);
- TRANSPORT_LOGD("received past nack. add from %u to %u ",
- state_->getHighestSeqReceivedInOrder() + 1, production_seq);
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "received past nack. add from "
+ << state_->getHighestSeqReceivedInOrder() + 1
+ << " to " << production_seq;
addToRetransmissions(state_->getHighestSeqReceivedInOrder() + 1,
production_seq);
} else {
@@ -105,8 +161,9 @@ void RTCLossDetectionAndRecovery::onNackPacketReceived(
// with productionSeq = 18. this says that all the packets between 12 and 18
// may got lost and we should ask them
deleteRtx(seq);
- TRANSPORT_LOGD("received futrue nack. add from %u to %u ",
- state_->getHighestSeqReceivedInOrder() + 1, production_seq);
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "received futrue nack. add from "
+ << state_->getHighestSeqReceivedInOrder() + 1
+ << " to " << production_seq;
addToRetransmissions(state_->getHighestSeqReceivedInOrder() + 1,
production_seq);
}
@@ -117,12 +174,13 @@ void RTCLossDetectionAndRecovery::onProbePacketReceived(
// we don't log the reception of a probe packet for the sentinel timer because
// probes are not taken into account into the sync window. we use them as
// future nacks to detect possible packets lost
- if (TRANSPORT_EXPECT_FALSE(!rtx_on_)) return; // do not add if RTX is off
struct nack_packet_t *probe_pkt =
(struct nack_packet_t *)probe.getPayload()->data();
uint32_t production_seq = probe_pkt->getProductionSegement();
- TRANSPORT_LOGD("received probe. add from %u to %u ",
- state_->getHighestSeqReceivedInOrder() + 1, production_seq);
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "received probe. add from "
+ << state_->getHighestSeqReceivedInOrder() + 1 << " to " << production_seq;
+
addToRetransmissions(state_->getHighestSeqReceivedInOrder() + 1,
production_seq);
}
@@ -150,20 +208,41 @@ void RTCLossDetectionAndRecovery::addToRetransmissions(uint32_t start,
}
for (uint32_t seq = start; seq < stop; seq++) {
- if (!isRtx(seq) && // is not already an rtx
- // is not received or lost
- state_->isReceivedOrLost(seq) == PacketState::UNKNOWN) {
- // add rtx
- rtxState state;
- state.first_send_ = state_->getInterestSentTime(seq);
- if (state.first_send_ == 0) // this interest was never sent before
- state.first_send_ = getNow();
- state.next_send_ = computeNextSend(seq, true);
- state.rtx_count_ = 0;
- TRANSPORT_LOGD("add %u to retransmissions. next rtx is %lu ", seq,
- (state.next_send_ - getNow()));
- rtx_state_.insert(std::pair<uint32_t, rtxState>(seq, state));
- rtx_timers_.insert(std::pair<uint64_t, uint32_t>(state.next_send_, seq));
+ if (state_->isReceivedOrLost(seq) == PacketState::UNKNOWN) {
+ if (rtx_on_) {
+ if (!indexer_->isFec(seq)) {
+ // handle it with rtx
+ if (!isRtx(seq)) {
+ state_->onLossDetected(seq);
+ rtxState state;
+ state.first_send_ = state_->getInterestSentTime(seq);
+ if (state.first_send_ == 0) // this interest was never sent before
+ state.first_send_ = getNow();
+ state.next_send_ = computeNextSend(seq, true);
+ state.rtx_count_ = 0;
+ DLOG_IF(INFO, VLOG_IS_ON(4))
+ << "Add " << seq << " to retransmissions. next rtx is %lu "
+ << state.next_send_ - getNow();
+ rtx_state_.insert(std::pair<uint32_t, rtxState>(seq, state));
+ rtx_timers_.insert(
+ std::pair<uint64_t, uint32_t>(state.next_send_, seq));
+ }
+ } else {
+ // is fec, do not send it
+ auto it = recover_with_fec_.find(seq);
+ if (it == recover_with_fec_.end()) {
+ state_->onLossDetected(seq);
+ recover_with_fec_.insert(seq);
+ }
+ }
+ } else {
+ // keep track of losses but recover with FEC
+ auto it = recover_with_fec_.find(seq);
+ if (it == recover_with_fec_.end()) {
+ state_->onLossDetected(seq);
+ recover_with_fec_.insert(seq);
+ }
+ }
}
}
scheduleNextRtx();
@@ -182,13 +261,15 @@ uint64_t RTCLossDetectionAndRecovery::computeNextSend(uint32_t seq,
if (prod_rate != 0) {
double packet_size = state_->getAveragePacketSize();
- estimated_iat = (uint32_t)ceil(1000.0 / (prod_rate / packet_size));
- jitter = (uint32_t)ceil(state_->getJitter());
+ estimated_iat = ceil(1000.0 / (prod_rate / packet_size));
+ jitter = ceil(state_->getJitter());
}
uint32_t wait = estimated_iat + jitter;
- TRANSPORT_LOGD("first rtx for %u in %u ms, rtt = %lu ait = %u jttr = %u",
- seq, wait, state_->getRTT(), estimated_iat, jitter);
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "first rtx for " << seq << " in " << wait
+ << " ms, rtt = " << state_->getRTT() << " ait = " << estimated_iat
+ << " jttr = " << jitter;
return now + wait;
} else {
@@ -202,25 +283,26 @@ uint64_t RTCLossDetectionAndRecovery::computeNextSend(uint32_t seq,
}
double packet_size = state_->getAveragePacketSize();
- uint32_t estimated_iat = (uint32_t)ceil(1000.0 / (prod_rate / packet_size));
+ uint32_t estimated_iat = ceil(1000.0 / (prod_rate / packet_size));
uint64_t rtt = state_->getRTT();
if (rtt == 0) rtt = SENTINEL_TIMER_INTERVAL;
- wait = (uint32_t)rtt;
+ wait = rtt;
if (estimated_iat > rtt) wait = estimated_iat;
- uint32_t jitter = (uint32_t)ceil(state_->getJitter());
+ uint32_t jitter = ceil(state_->getJitter());
wait += jitter;
// it may happen that the channel is congested and we have some additional
// queuing delay to take into account
- uint32_t queue = (uint32_t)ceil(state_->getQueuing());
+ uint32_t queue = ceil(state_->getQueuing());
wait += queue;
- TRANSPORT_LOGD(
- "next rtx for %u in %u ms, rtt = %lu ait = %u jttr = %u queue = %u",
- seq, wait, state_->getRTT(), estimated_iat, jitter, queue);
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "next rtx for " << seq << " in " << wait
+ << " ms, rtt = " << state_->getRTT() << " ait = " << estimated_iat
+ << " jttr = " << jitter << " queue = " << queue;
return now + wait;
}
@@ -235,7 +317,7 @@ void RTCLossDetectionAndRecovery::retransmit() {
std::unordered_set<uint32_t> lost_pkt;
uint32_t sent_counter = 0;
while (it != rtx_timers_.end() && it->first <= now &&
- sent_counter < MAX_INTERESTS_IN_BATCH) {
+ sent_counter < MAX_RTX_IN_BATCH) {
uint32_t seq = it->second;
auto rtx_it =
rtx_state_.find(seq); // this should always return a valid iter
@@ -243,11 +325,11 @@ void RTCLossDetectionAndRecovery::retransmit() {
(now - rtx_it->second.first_send_) >= RTC_MAX_AGE ||
seq < state_->getLastSeqNacked()) {
// max rtx reached or packet too old or packet nacked, this packet is lost
- TRANSPORT_LOGD(
- "packet %u lost because 1) max rtx: %u 2) max age: %u 3) naked: %u",
- seq, (rtx_it->second.rtx_count_ >= RTC_MAX_RTX),
- ((now - rtx_it->second.first_send_) >= RTC_MAX_AGE),
- (seq < state_->getLastSeqNacked()));
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "packet " << seq << " lost because 1) max rtx: "
+ << (rtx_it->second.rtx_count_ >= RTC_MAX_RTX) << " 2) max age: "
+ << ((now - rtx_it->second.first_send_) >= RTC_MAX_AGE)
+ << " 3) nacked: " << (seq < state_->getLastSeqNacked());
lost_pkt.insert(seq);
it++;
} else {
@@ -259,8 +341,9 @@ void RTCLossDetectionAndRecovery::retransmit() {
it = rtx_timers_.erase(it);
rtx_timers_.insert(
std::pair<uint64_t, uint32_t>(rtx_it->second.next_send_, seq));
- TRANSPORT_LOGD("send rtx for sequence %u, next send in %lu", seq,
- (rtx_it->second.next_send_ - now));
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "send rtx for sequence " << seq << ", next send in "
+ << (rtx_it->second.next_send_ - now);
send_rtx_callback_(seq);
sent_counter++;
}
@@ -358,20 +441,21 @@ void RTCLossDetectionAndRecovery::sentinelTimer() {
if (TRANSPORT_EXPECT_FALSE(!state_->isProducerActive())) {
// this happens at the beginning (or if the producer stops for some
// reason) we need to keep sending interest 0 until we get an answer
- TRANSPORT_LOGD(
- "sentinel timer: the producer is not active, send packet 0");
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "sentinel timer: the producer is not active, send packet 0";
state_->onRetransmission(0);
send_rtx_callback_(0);
} else {
- TRANSPORT_LOGD(
- "sentinel timer: the producer is active, send the 10 oldest packets");
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "sentinel timer: the producer is active, "
+ "send the 10 oldest packets";
sent = true;
uint32_t rtx = 0;
auto it = state_->getPendingInterestsMapBegin();
auto end = state_->getPendingInterestsMapEnd();
while (it != end && rtx < MAX_RTX_WITH_SENTINEL) {
uint32_t seq = it->first;
- TRANSPORT_LOGD("sentinel timer, add %u to the rtx list", seq);
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "sentinel timer, add " << seq << " to the rtx list";
addToRetransmissions(seq, seq + 1);
rtx++;
it++;
@@ -384,36 +468,38 @@ void RTCLossDetectionAndRecovery::sentinelTimer() {
uint32_t next_timer;
double prod_rate = state_->getProducerRate();
if (TRANSPORT_EXPECT_FALSE(!state_->isProducerActive()) || prod_rate == 0) {
- TRANSPORT_LOGD("next timer in %u", SENTINEL_TIMER_INTERVAL);
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "next timer in " << SENTINEL_TIMER_INTERVAL;
next_timer = SENTINEL_TIMER_INTERVAL;
} else {
double prod_rate = state_->getProducerRate();
double packet_size = state_->getAveragePacketSize();
- uint32_t estimated_iat = (uint32_t)ceil(1000.0 / (prod_rate / packet_size));
- uint32_t jitter = (uint32_t)ceil(state_->getJitter());
+ uint32_t estimated_iat = ceil(1000.0 / (prod_rate / packet_size));
+ uint32_t jitter = ceil(state_->getJitter());
// try to reduce the number of timers if the estimated IAT is too small
next_timer = std::max((estimated_iat + jitter) * 20, (uint32_t)1);
- TRANSPORT_LOGD("next sentinel in %u ms, rate: %f, iat: %u, jitter: %u",
- next_timer, ((prod_rate * 8.0) / 1000000.0), estimated_iat,
- jitter);
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "next sentinel in " << next_timer
+ << " ms, rate: " << ((prod_rate * 8.0) / 1000000.0)
+ << ", iat: " << estimated_iat << ", jitter: " << jitter;
if (!expired) {
// discount the amout of time that is already passed
- uint32_t discount = (uint32_t)(now - last_event_);
+ uint32_t discount = now - last_event_;
if (next_timer > discount) {
next_timer = next_timer - discount;
} else {
// in this case we trigger the timer in 1 ms
next_timer = 1;
}
- TRANSPORT_LOGD("timer after discout: %u", next_timer);
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "timer after discout: " << next_timer;
} else if (sent) {
// wait at least one producer stats interval + owd to check if the
// production rate is reducing.
- uint32_t min_wait = PRODUCER_STATS_INTERVAL + (uint32_t)ceil(state_->getQueuing());
+ uint32_t min_wait = PRODUCER_STATS_INTERVAL + ceil(state_->getQueuing());
next_timer = std::max(next_timer, min_wait);
- TRANSPORT_LOGD("wait for updates from prod, next timer: %u", next_timer);
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "wait for updates from prod, next timer: " << next_timer;
}
}
diff --git a/libtransport/src/protocols/rtc/rtc_ldr.h b/libtransport/src/protocols/rtc/rtc_ldr.h
index c0912303b..1b9f9afd6 100644
--- a/libtransport/src/protocols/rtc/rtc_ldr.h
+++ b/libtransport/src/protocols/rtc/rtc_ldr.h
@@ -15,15 +15,16 @@
#pragma once
#include <hicn/transport/config.h>
+#include <hicn/transport/core/asio_wrapper.h>
#include <hicn/transport/core/content_object.h>
#include <hicn/transport/core/name.h>
+#include <protocols/indexer.h>
#include <protocols/rtc/rtc_consts.h>
#include <protocols/rtc/rtc_state.h>
-#include <asio.hpp>
-#include <asio/steady_timer.hpp>
#include <functional>
#include <map>
+#include <unordered_map>
namespace transport {
@@ -43,16 +44,23 @@ class RTCLossDetectionAndRecovery
using SendRtxCallback = std::function<void(uint32_t)>;
public:
- RTCLossDetectionAndRecovery(SendRtxCallback &&callback,
+ RTCLossDetectionAndRecovery(Indexer *indexer, SendRtxCallback &&callback,
asio::io_service &io_service);
~RTCLossDetectionAndRecovery();
void setState(std::shared_ptr<RTCState> state) { state_ = state; }
+ void setFecParams(uint32_t n, uint32_t k) {
+ n_ = n;
+ k_ = k;
+ }
void turnOnRTX();
void turnOffRTX();
+ bool isRtxOn() { return rtx_on_; }
+ void onNewRound(bool in_sync);
void onTimeout(uint32_t seq);
+ void onPacketRecoveredFec(uint32_t seq);
void onDataPacketReceived(const core::ContentObject &content_object);
void onNackPacketReceived(const core::ContentObject &nack);
void onProbePacketReceived(const core::ContentObject &probe);
@@ -72,6 +80,7 @@ class RTCLossDetectionAndRecovery
bool deleteRtx(uint32_t seq);
void scheduleSentinelTimer(uint64_t expires_from_now);
void sentinelTimer();
+ uint32_t computeFecPacketsToAsk(bool in_sync);
uint64_t getNow() {
using namespace std::chrono;
@@ -90,14 +99,25 @@ class RTCLossDetectionAndRecovery
// should be sent, and the val is the interest seq number
std::multimap<uint64_t, uint32_t> rtx_timers_;
+ // lost packets that will be recovered with fec
+ std::unordered_set<uint32_t> recover_with_fec_;
+
bool rtx_on_;
+ bool fec_on_;
uint64_t next_rtx_timer_;
uint64_t last_event_;
uint64_t sentinel_timer_interval_;
+
+ // fec params
+ uint32_t n_;
+ uint32_t k_;
+
std::unique_ptr<asio::steady_timer> timer_;
std::unique_ptr<asio::steady_timer> sentinel_timer_;
std::shared_ptr<RTCState> state_;
+ Indexer *indexer_;
+
SendRtxCallback send_rtx_callback_;
};
diff --git a/libtransport/src/protocols/rtc/rtc_packet.h b/libtransport/src/protocols/rtc/rtc_packet.h
index 2f2b19fb9..7dc2f82c3 100644
--- a/libtransport/src/protocols/rtc/rtc_packet.h
+++ b/libtransport/src/protocols/rtc/rtc_packet.h
@@ -90,4 +90,4 @@ struct nack_packet_t {
} // end namespace protocol
-} // end namespace transport \ No newline at end of file
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_rc_frame.cc b/libtransport/src/protocols/rtc/rtc_rc_frame.cc
deleted file mode 100644
index b577b5bea..000000000
--- a/libtransport/src/protocols/rtc/rtc_rc_frame.cc
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Copyright (c) 2017-2021 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <protocols/rtc/rtc_consts.h>
-#include <protocols/rtc/rtc_rc_frame.h>
-
-#include <algorithm>
-
-namespace transport {
-
-namespace protocol {
-
-namespace rtc {
-
-RTCRateControlFrame::RTCRateControlFrame() : cc_detector_() {}
-
-RTCRateControlFrame::~RTCRateControlFrame() {}
-
-void RTCRateControlFrame::onNewRound(double round_len) {
- if (!rc_on_) return;
-
- CongestionState prev_congestion_state = congestion_state_;
- cc_detector_.updateStats();
- congestion_state_ = (CongestionState)cc_detector_.getState();
-
- if (congestion_state_ == CongestionState::Congested) {
- if (prev_congestion_state == CongestionState::Normal) {
- // congestion detected, notify app and init congestion win
- double prod_rate = protocol_state_->getReceivedRate();
- double rtt = (double)protocol_state_->getRTT() / MILLI_IN_A_SEC;
- double packet_size = protocol_state_->getAveragePacketSize();
-
- if (prod_rate == 0.0 || rtt == 0.0 || packet_size == 0.0) {
- // TODO do something
- return;
- }
-
- congestion_win_ = (uint32_t)ceil(prod_rate * rtt / packet_size);
- }
- uint32_t win = congestion_win_ * WIN_DECREASE_FACTOR;
- congestion_win_ = std::max(win, WIN_MIN);
- return;
- }
-}
-
-void RTCRateControlFrame::onDataPacketReceived(
- const core::ContentObject &content_object) {
- if (!rc_on_) return;
-
- uint32_t seq = content_object.getName().getSuffix();
- if (!protocol_state_->isPending(seq)) return;
-
- cc_detector_.addPacket(content_object);
-}
-
-void RTCRateControlFrame::receivedBwProbeTrain(uint64_t firts_probe_ts,
- uint64_t last_probe_ts,
- uint32_t total_probes) {
- // TODO
- return;
-}
-
-} // end namespace rtc
-
-} // end namespace protocol
-
-} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_rc_frame.h b/libtransport/src/protocols/rtc/rtc_rc_frame.h
deleted file mode 100644
index 25d5ddbb6..000000000
--- a/libtransport/src/protocols/rtc/rtc_rc_frame.h
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright (c) 2017-2021 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-#include <protocols/rtc/congestion_detection.h>
-#include <protocols/rtc/rtc_rc.h>
-
-namespace transport {
-
-namespace protocol {
-
-namespace rtc {
-
-class RTCRateControlFrame : public RTCRateControl {
- public:
- RTCRateControlFrame();
-
- ~RTCRateControlFrame();
-
- void onNewRound(double round_len);
- void onDataPacketReceived(const core::ContentObject &content_object);
-
- void receivedBwProbeTrain(uint64_t firts_probe_ts, uint64_t last_probe_ts,
- uint32_t total_probes);
-
- private:
- CongestionDetection cc_detector_;
-};
-
-} // end namespace rtc
-
-} // end namespace protocol
-
-} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_rc_queue.cc b/libtransport/src/protocols/rtc/rtc_rc_queue.cc
index 3c7318dae..a1c89e329 100644
--- a/libtransport/src/protocols/rtc/rtc_rc_queue.cc
+++ b/libtransport/src/protocols/rtc/rtc_rc_queue.cc
@@ -67,11 +67,11 @@ void RTCRateControlQueue::onNewRound(double round_len) {
if (prev_congestion_state == CongestionState::Normal) {
// init the congetion window using the received rate
congestion_win_ = (uint32_t)ceil(received_rate * rtt / packet_size);
- rounds_since_last_drop_ = (uint32_t)ROUNDS_BEFORE_TAKE_ACTION + 1;
+ rounds_since_last_drop_ = ROUNDS_BEFORE_TAKE_ACTION + 1;
}
if (rounds_since_last_drop_ >= ROUNDS_BEFORE_TAKE_ACTION) {
- uint32_t win = congestion_win_ * (uint32_t)WIN_DECREASE_FACTOR;
+ uint32_t win = congestion_win_ * WIN_DECREASE_FACTOR;
congestion_win_ = std::max(win, WIN_MIN);
rounds_since_last_drop_ = 0;
return;
@@ -88,7 +88,7 @@ void RTCRateControlQueue::onNewRound(double round_len) {
rounds_without_congestion_++;
if (rounds_without_congestion_ < ROUNDS_BEFORE_TAKE_ACTION) return;
- congestion_win_ = congestion_win_ * (uint32_t)WIN_INCREASE_FACTOR;
+ congestion_win_ = congestion_win_ * WIN_INCREASE_FACTOR;
congestion_win_ = std::min(congestion_win_, INITIAL_WIN_MAX);
}
}
diff --git a/libtransport/src/protocols/rtc/rtc_reassembly.h b/libtransport/src/protocols/rtc/rtc_reassembly.h
new file mode 100644
index 000000000..15722a6d5
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_reassembly.h
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <glog/logging.h>
+#include <protocols/datagram_reassembly.h>
+#include <protocols/rtc/rtc_consts.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+class RtcReassembly : public DatagramReassembly {
+ public:
+ RtcReassembly(implementation::ConsumerSocket *icn_socket,
+ TransportProtocol *transport_protocol)
+ : DatagramReassembly(icn_socket, transport_protocol) {}
+
+ void reassemble(core::ContentObject &content_object) override {
+ auto read_buffer = content_object.getPayload();
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Size of payload: " << read_buffer->length();
+ read_buffer->trimStart(transport_protocol_->transportHeaderLength());
+ Reassembly::read_buffer_ = std::move(read_buffer);
+ Reassembly::notifyApplication();
+ }
+};
+
+} // namespace rtc
+} // namespace protocol
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_state.cc b/libtransport/src/protocols/rtc/rtc_state.cc
index 9c965bfed..c99205a26 100644
--- a/libtransport/src/protocols/rtc/rtc_state.cc
+++ b/libtransport/src/protocols/rtc/rtc_state.cc
@@ -13,6 +13,7 @@
* limitations under the License.
*/
+#include <glog/logging.h>
#include <protocols/rtc/rtc_consts.h>
#include <protocols/rtc/rtc_state.h>
@@ -22,11 +23,13 @@ namespace protocol {
namespace rtc {
-RTCState::RTCState(ProbeHandler::SendProbeCallback &&rtt_probes_callback,
+RTCState::RTCState(Indexer *indexer,
+ ProbeHandler::SendProbeCallback &&rtt_probes_callback,
DiscoveredRttCallback &&discovered_rtt_callback,
asio::io_service &io_service)
- : rtt_probes_(std::make_shared<ProbeHandler>(
- std::move(rtt_probes_callback), io_service)),
+ : indexer_(indexer),
+ rtt_probes_(std::make_shared<ProbeHandler>(std::move(rtt_probes_callback),
+ io_service)),
discovered_rtt_callback_(std::move(discovered_rtt_callback)) {
init_rtt_timer_ = std::make_unique<asio::steady_timer>(io_service);
initParams();
@@ -45,14 +48,22 @@ void RTCState::initParams() {
// loss counters
packets_lost_ = 0;
+ definitely_lost_pkt_ = 0;
losses_recovered_ = 0;
first_seq_in_round_ = 0;
highest_seq_received_ = 0;
highest_seq_received_in_order_ = 0;
last_seq_nacked_ = 0;
loss_rate_ = 0.0;
+ avg_loss_rate_ = 0.0;
+ max_loss_rate_ = 0.0;
+ last_round_loss_rate_ = 0.0;
residual_loss_rate_ = 0.0;
+ // fec counters
+ pending_fec_pkt_ = 0;
+ received_fec_pkt_ = 0;
+
// bw counters
received_bytes_ = 0;
avg_packet_size_ = INIT_PACKET_SIZE;
@@ -90,8 +101,14 @@ void RTCState::initParams() {
// pending interests
pending_interests_.clear();
+ // skipped interest
+ last_interest_sent_ = 0;
+ skipped_interests_.clear();
+
// init rtt
- first_interest_sent_ = ~0;
+ first_interest_sent_time_ = ~0;
+ first_interest_sent_seq_ = 0;
+
init_rtt_ = false;
rtt_probes_->setProbes(INIT_RTT_PROBE_INTERVAL, INIT_RTT_PROBES);
rtt_probes_->sendProbes();
@@ -106,18 +123,51 @@ void RTCState::onSendNewInterest(const core::Name *interest_name) {
uint32_t seq = interest_name->getSuffix();
pending_interests_.insert(std::pair<uint32_t, uint64_t>(seq, now));
- if(sent_interests_ == 0) first_interest_sent_ = now;
+ if (sent_interests_ == 0) {
+ first_interest_sent_time_ = now;
+ first_interest_sent_seq_ = seq;
+ }
+
+ if (indexer_->isFec(seq)) {
+ pending_fec_pkt_++;
+ }
+
+ if (last_interest_sent_ == 0 && seq != 0) {
+ last_interest_sent_ = seq; // init last interest sent
+ }
+
+ // TODO what happen in case of jumps?
+ // look for skipped interests
+ skipped_interests_.erase(seq); // remove seq if it is there
+ for (uint32_t i = last_interest_sent_ + 1; i < seq; i++) {
+ if (indexer_->isFec(i)) {
+ skipped_interests_.insert(i);
+ }
+ }
+
+ last_interest_sent_ = seq;
sent_interests_++;
sent_interests_last_round_++;
}
-void RTCState::onTimeout(uint32_t seq) {
+void RTCState::onTimeout(uint32_t seq, bool lost) {
auto it = pending_interests_.find(seq);
if (it != pending_interests_.end()) {
pending_interests_.erase(it);
}
received_timeouts_++;
+
+ if (lost) onPacketLost(seq);
+}
+
+void RTCState::onLossDetected(uint32_t seq) {
+ if (!indexer_->isFec(seq)) {
+ packets_lost_++;
+ } else if (skipped_interests_.find(seq) == skipped_interests_.end() &&
+ seq >= first_interest_sent_seq_) {
+ packets_lost_++;
+ }
}
void RTCState::onRetransmission(uint32_t seq) {
@@ -128,7 +178,9 @@ void RTCState::onRetransmission(uint32_t seq) {
auto it = pending_interests_.find(seq);
if (it != pending_interests_.end()) {
pending_interests_.erase(it);
+#if 0
packets_lost_++;
+#endif
}
sent_rtx_++;
sent_rtx_last_round_++;
@@ -165,6 +217,16 @@ void RTCState::onDataPacketReceived(const core::ContentObject &content_object,
received_packets_last_round_++;
}
+void RTCState::onFecPacketReceived(const core::ContentObject &content_object) {
+ uint32_t seq = content_object.getName().getSuffix();
+ updateReceivedBytes(content_object);
+ addRecvOrLost(seq, PacketState::RECEIVED);
+ received_fec_pkt_++;
+ // the producer is responding
+ // it is generating valid data packets so we consider it active
+ producer_is_active_ = true;
+}
+
void RTCState::onNackPacketReceived(const core::ContentObject &nack,
bool compute_stats) {
uint32_t seq = nack.getName().getSuffix();
@@ -197,12 +259,14 @@ void RTCState::onNackPacketReceived(const core::ContentObject &nack,
// old nack, seq is lost
// update last nacked
if (last_seq_nacked_ < seq) last_seq_nacked_ = seq;
- TRANSPORT_LOGD("lost packet %u beacuse of a past nack", seq);
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "lost packet " << seq << " beacuse of a past nack";
onPacketLost(seq);
} else if (seq > production_seq) {
// future nack
// remove the nack from the pending interest map
// (the packet is not received/lost yet)
+ if (indexer_->isFec(seq)) pending_fec_pkt_--;
pending_interests_.erase(seq);
} else {
// this should be a quite rear event. simply remove the
@@ -221,17 +285,28 @@ void RTCState::onNackPacketReceived(const core::ContentObject &nack,
}
void RTCState::onPacketLost(uint32_t seq) {
- TRANSPORT_LOGD("packet %u is lost", seq);
+#if 0
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "packet " << seq << " is lost";
auto it = pending_interests_.find(seq);
if (it != pending_interests_.end()) {
// this packet was never retransmitted so it does
// not appear in the loss count
packets_lost_++;
}
+#endif
+ if (!indexer_->isFec(seq)) {
+ definitely_lost_pkt_++;
+ DLOG_IF(INFO, VLOG_IS_ON(4)) << "packet " << seq << " is lost";
+ }
addRecvOrLost(seq, PacketState::LOST);
}
-void RTCState::onPacketRecovered(uint32_t seq) {
+void RTCState::onPacketRecoveredRtx(uint32_t seq) {
+ losses_recovered_++;
+ addRecvOrLost(seq, PacketState::RECEIVED);
+}
+
+void RTCState::onPacketRecoveredFec(uint32_t seq) {
losses_recovered_++;
addRecvOrLost(seq, PacketState::RECEIVED);
}
@@ -258,7 +333,6 @@ bool RTCState::onProbePacketReceived(const core::ContentObject &probe) {
uint32_t production_seq = probe_pkt->getProductionSegement();
uint32_t production_rate = probe_pkt->getProductionRate();
-
if (path_it == path_table_.end()) {
// found a new path
std::shared_ptr<RTCDataPath> newPath =
@@ -298,13 +372,14 @@ bool RTCState::onProbePacketReceived(const core::ContentObject &probe) {
// wait forever
received_probes_++;
- if(!init_rtt_ && received_probes_ <= INIT_RTT_PROBES){
- if(received_probes_ == 1){
- // we got the first probe, wait at most INIT_RTT_PROBE_WAIT sec for the others
+ if (!init_rtt_ && received_probes_ <= INIT_RTT_PROBES) {
+ if (received_probes_ == 1) {
+ // we got the first probe, wait at most INIT_RTT_PROBE_WAIT sec for the
+ // others
main_path_ = path;
setInitRttTimer(INIT_RTT_PROBE_WAIT);
}
- if(received_probes_ == INIT_RTT_PROBES) {
+ if (received_probes_ == INIT_RTT_PROBES) {
// we are done
init_rtt_timer_->cancel();
checkInitRttTimer();
@@ -314,7 +389,7 @@ bool RTCState::onProbePacketReceived(const core::ContentObject &probe) {
received_packets_last_round_++;
// ignore probes sent before the first interest
- if((now - rtt) <= first_interest_sent_) return false;
+ if ((now - rtt) <= first_interest_sent_time_) return false;
return true;
}
@@ -327,11 +402,11 @@ void RTCState::onNewRound(double round_len, bool in_sync) {
double bytes_per_sec =
((double)received_bytes_ * (MILLI_IN_A_SEC / round_len));
- if(received_rate_ == 0)
+ if (received_rate_ == 0)
received_rate_ = bytes_per_sec;
else
received_rate_ = (received_rate_ * MOVING_AVG_ALPHA) +
- ((1 - MOVING_AVG_ALPHA) * bytes_per_sec);
+ ((1 - MOVING_AVG_ALPHA) * bytes_per_sec);
// search for an active path. There should be only one active path (meaning a
// path that leads to the producer socket -no cache- and from which we are
@@ -354,7 +429,8 @@ void RTCState::onNewRound(double round_len, bool in_sync) {
}
}
- if (in_sync) updateLossRate();
+ // if (in_sync) updateLossRate();
+ updateLossRate();
// handle nacks
if (!nack_on_last_round_ && received_bytes_ > 0) {
@@ -385,6 +461,7 @@ void RTCState::onNewRound(double round_len, bool in_sync) {
// reset counters
received_bytes_ = 0;
packets_lost_ = 0;
+ definitely_lost_pkt_ = 0;
losses_recovered_ = 0;
first_seq_in_round_ = highest_seq_received_;
@@ -397,6 +474,8 @@ void RTCState::onNewRound(double round_len, bool in_sync) {
sent_interests_last_round_ = 0;
sent_rtx_last_round_ = 0;
+ received_fec_pkt_ = 0;
+
rounds_++;
}
@@ -465,6 +544,7 @@ void RTCState::updatePathStats(const core::ContentObject &content_object,
}
void RTCState::updateLossRate() {
+ last_round_loss_rate_ = loss_rate_;
loss_rate_ = 0.0;
residual_loss_rate_ = 0.0;
@@ -475,9 +555,29 @@ void RTCState::updateLossRate() {
// division by 0
if (number_theorically_received_packets_ == 0) return;
+ // XXX this may be quite inefficient if the rate is high
+ // maybe is better to iterate over the set?
+ for (uint32_t i = first_seq_in_round_; i < highest_seq_received_; i++) {
+ auto it = skipped_interests_.find(i);
+ if (it != skipped_interests_.end()) {
+ if (number_theorically_received_packets_ > 0)
+ number_theorically_received_packets_--;
+ skipped_interests_.erase(it);
+ }
+ }
+
loss_rate_ = (double)((double)(packets_lost_) /
(double)number_theorically_received_packets_);
+ if (rounds_ % 15 == 0) max_loss_rate_ = 0; // reset every 3 sec
+ if (loss_rate_ > max_loss_rate_) max_loss_rate_ = loss_rate_;
+
+ if (avg_loss_rate_ == 0)
+ avg_loss_rate_ = loss_rate_;
+ else
+ avg_loss_rate_ =
+ avg_loss_rate_ * MOVING_AVG_ALPHA + loss_rate_ * (1 - MOVING_AVG_ALPHA);
+
residual_loss_rate_ = (double)((double)(packets_lost_ - losses_recovered_) /
(double)number_theorically_received_packets_);
@@ -485,6 +585,10 @@ void RTCState::updateLossRate() {
}
void RTCState::addRecvOrLost(uint32_t seq, PacketState state) {
+ if (indexer_->isFec(seq)) {
+ pending_fec_pkt_--;
+ }
+
pending_interests_.erase(seq);
if (received_or_lost_packets_.size() >= MAX_CACHED_PACKETS) {
received_or_lost_packets_.erase(received_or_lost_packets_.begin());
@@ -507,10 +611,12 @@ void RTCState::addRecvOrLost(uint32_t seq, PacketState state) {
// 1) there is a gap in the sequence so we do not update largest_in_seq_
// 2) all the packets from largest_in_seq_ to seq are in
// received_or_lost_packets_ an we upate largest_in_seq_
+ // or are FEC packets
for (uint32_t i = highest_seq_received_in_order_ + 1; i <= seq; i++) {
if (received_or_lost_packets_.find(i) ==
- received_or_lost_packets_.end()) {
+ received_or_lost_packets_.end() &&
+ !indexer_->isFec(i)) {
break;
}
// this packet is in order so we can update the
@@ -520,17 +626,17 @@ void RTCState::addRecvOrLost(uint32_t seq, PacketState state) {
}
}
-void RTCState::setInitRttTimer(uint32_t wait){
+void RTCState::setInitRttTimer(uint32_t wait) {
init_rtt_timer_->cancel();
init_rtt_timer_->expires_from_now(std::chrono::milliseconds(wait));
init_rtt_timer_->async_wait([this](std::error_code ec) {
- if(ec) return;
+ if (ec) return;
checkInitRttTimer();
});
}
void RTCState::checkInitRttTimer() {
- if(received_probes_ < INIT_RTT_MIN_PROBES_TO_RECV){
+ if (received_probes_ < INIT_RTT_MIN_PROBES_TO_RECV) {
// we didn't received enough probes, restart
received_probes_ = 0;
rtt_probes_->setProbes(INIT_RTT_PROBE_INTERVAL, INIT_RTT_PROBES);
@@ -547,7 +653,7 @@ void RTCState::checkInitRttTimer() {
double prod_rate = getProducerRate();
double rtt = (double)getRTT() / MILLI_IN_A_SEC;
double packet_size = getAveragePacketSize();
- uint32_t pkt_in_rtt_ = (uint32_t)std::floor(((prod_rate / packet_size) * rtt) * 0.8);
+ uint32_t pkt_in_rtt_ = std::floor(((prod_rate / packet_size) * rtt) * 0.8);
last_seq_nacked_ = last_production_seq_ + pkt_in_rtt_;
discovered_rtt_callback_();
diff --git a/libtransport/src/protocols/rtc/rtc_state.h b/libtransport/src/protocols/rtc/rtc_state.h
index e4fefaffe..729ba7a1b 100644
--- a/libtransport/src/protocols/rtc/rtc_state.h
+++ b/libtransport/src/protocols/rtc/rtc_state.h
@@ -15,13 +15,13 @@
#pragma once
#include <hicn/transport/config.h>
+#include <hicn/transport/core/asio_wrapper.h>
#include <hicn/transport/core/content_object.h>
#include <hicn/transport/core/name.h>
+#include <protocols/indexer.h>
#include <protocols/rtc/probe_handler.h>
#include <protocols/rtc/rtc_data_path.h>
-#include <asio.hpp>
-#include <asio/steady_timer.hpp>
#include <map>
#include <set>
@@ -36,8 +36,10 @@ enum class PacketState : uint8_t { RECEIVED, LOST, UNKNOWN };
class RTCState : std::enable_shared_from_this<RTCState> {
public:
using DiscoveredRttCallback = std::function<void()>;
+
public:
- RTCState(ProbeHandler::SendProbeCallback &&rtt_probes_callback,
+ RTCState(Indexer *indexer,
+ ProbeHandler::SendProbeCallback &&rtt_probes_callback,
DiscoveredRttCallback &&discovered_rtt_callback,
asio::io_service &io_service);
@@ -45,14 +47,17 @@ class RTCState : std::enable_shared_from_this<RTCState> {
// packet events
void onSendNewInterest(const core::Name *interest_name);
- void onTimeout(uint32_t seq);
+ void onTimeout(uint32_t seq, bool lost);
+ void onLossDetected(uint32_t seq);
void onRetransmission(uint32_t seq);
void onDataPacketReceived(const core::ContentObject &content_object,
bool compute_stats);
+ void onFecPacketReceived(const core::ContentObject &content_object);
void onNackPacketReceived(const core::ContentObject &nack,
bool compute_stats);
void onPacketLost(uint32_t seq);
- void onPacketRecovered(uint32_t seq);
+ void onPacketRecoveredRtx(uint32_t seq);
+ void onPacketRecoveredFec(uint32_t seq);
bool onProbePacketReceived(const core::ContentObject &probe);
// protocol state
@@ -65,9 +70,7 @@ class RTCState : std::enable_shared_from_this<RTCState> {
}
// delay metrics
- bool isRttDiscovered() const {
- return init_rtt_;
- }
+ bool isRttDiscovered() const { return init_rtt_; }
uint64_t getRTT() const {
if (mainPathIsValid()) return main_path_->getMinRtt();
@@ -97,13 +100,16 @@ class RTCState : std::enable_shared_from_this<RTCState> {
if (it != pending_interests_.end()) return it->second;
return 0;
}
+
bool isPending(uint32_t seq) {
if (pending_interests_.find(seq) != pending_interests_.end()) return true;
return false;
}
+
uint32_t getPendingInterestNumber() const {
- return (uint32_t)pending_interests_.size();
+ return pending_interests_.size();
}
+
PacketState isReceivedOrLost(uint32_t seq) {
auto it = received_or_lost_packets_.find(seq);
if (it != received_or_lost_packets_.end()) return it->second;
@@ -112,12 +118,25 @@ class RTCState : std::enable_shared_from_this<RTCState> {
// loss rate
double getLossRate() const { return loss_rate_; }
+ double getAvgLossRate() const { return avg_loss_rate_; }
+ double getMaxLossRate() const { return max_loss_rate_; }
+ double getLastRoundLossRate() const { return last_round_loss_rate_; }
double getResidualLossRate() const { return residual_loss_rate_; }
+
+ uint32_t getLostData() const { return packets_lost_; };
+ uint32_t getRecoveredLosses() const { return losses_recovered_; }
+
+ uint32_t getDefinitelyLostPackets() const { return definitely_lost_pkt_; }
+
+ uint32_t getHighestSeqReceived() const { return highest_seq_received_; }
+
uint32_t getHighestSeqReceivedInOrder() const {
return highest_seq_received_in_order_;
}
- uint32_t getLostData() const { return packets_lost_; };
- uint32_t getRecoveredLosses() const { return losses_recovered_; }
+
+ // fec packets
+ uint32_t getReceivedFecPackets() const { return received_fec_pkt_; }
+ uint32_t getPendingFecPackets() const { return pending_fec_pkt_; }
// generic stats
uint32_t getReceivedBytesInRound() const { return received_bytes_; }
@@ -183,11 +202,15 @@ class RTCState : std::enable_shared_from_this<RTCState> {
// loss counters
int32_t packets_lost_;
int32_t losses_recovered_;
+ uint32_t definitely_lost_pkt_;
uint32_t first_seq_in_round_;
uint32_t highest_seq_received_;
uint32_t highest_seq_received_in_order_;
uint32_t last_seq_nacked_; // segment for which we got an oldNack
double loss_rate_;
+ double avg_loss_rate_;
+ double max_loss_rate_;
+ double last_round_loss_rate_;
double residual_loss_rate_;
// bw counters
@@ -211,18 +234,24 @@ class RTCState : std::enable_shared_from_this<RTCState> {
uint32_t sent_interests_last_round_;
uint32_t sent_rtx_last_round_;
+ // fec counter
+ uint32_t received_fec_pkt_;
+ uint32_t pending_fec_pkt_;
+
// round conunters
uint32_t rounds_;
uint32_t rounds_without_nacks_;
uint32_t rounds_without_packets_;
// init rtt
- uint64_t first_interest_sent_;
+ uint64_t first_interest_sent_time_;
+ uint32_t first_interest_sent_seq_;
// producer state
bool
producer_is_active_; // the prodcuer is active if we receive some packets
- uint32_t last_production_seq_; // last production seq received by the producer
+ uint32_t
+ last_production_seq_; // last production seq received by the producer
uint64_t last_prod_update_; // timestamp of the last packets used to update
// stats from the producer
@@ -237,6 +266,13 @@ class RTCState : std::enable_shared_from_this<RTCState> {
// pending interests
std::map<uint32_t, uint64_t> pending_interests_;
+ // indexer
+ Indexer *indexer_;
+
+ // skipped interests
+ uint32_t last_interest_sent_;
+ std::unordered_set<uint32_t> skipped_interests_;
+
// probes
std::shared_ptr<ProbeHandler> rtt_probes_;
bool init_rtt_;
diff --git a/libtransport/src/protocols/rtc/trendline_estimator.cc b/libtransport/src/protocols/rtc/trendline_estimator.cc
deleted file mode 100644
index 7a0803857..000000000
--- a/libtransport/src/protocols/rtc/trendline_estimator.cc
+++ /dev/null
@@ -1,334 +0,0 @@
-/*
- * Copyright (c) 2016 The WebRTC project authors. All Rights Reserved.
- *
- * Use of this source code is governed by a BSD-style license
- * that can be found in the LICENSE file in the root of the source
- * tree. An additional intellectual property rights grant can be found
- * in the file PATENTS. All contributing project authors may
- * be found in the AUTHORS file in the root of the source tree.
- */
-
-// FROM
-// https://source.chromium.org/chromium/chromium/src/+/master:third_party/webrtc/modules/congestion_controller/goog_cc/trendline_estimator.cc
-
-#include "trendline_estimator.h"
-
-#include <math.h>
-
-#include <algorithm>
-#include <string>
-
-namespace transport {
-
-namespace protocol {
-
-namespace rtc {
-
-// Parameters for linear least squares fit of regression line to noisy data.
-constexpr double kDefaultTrendlineSmoothingCoeff = 0.9;
-constexpr double kDefaultTrendlineThresholdGain = 4.0;
-// const char kBweWindowSizeInPacketsExperiment[] =
-// "WebRTC-BweWindowSizeInPackets";
-
-/*size_t ReadTrendlineFilterWindowSize(
- const WebRtcKeyValueConfig* key_value_config) {
- std::string experiment_string =
- key_value_config->Lookup(kBweWindowSizeInPacketsExperiment);
- size_t window_size;
- int parsed_values =
- sscanf(experiment_string.c_str(), "Enabled-%zu", &window_size);
- if (parsed_values == 1) {
- if (window_size > 1)
- return window_size;
- RTC_LOG(WARNING) << "Window size must be greater than 1.";
- }
- RTC_LOG(LS_WARNING) << "Failed to parse parameters for BweWindowSizeInPackets"
- " experiment from field trial string. Using default.";
- return TrendlineEstimatorSettings::kDefaultTrendlineWindowSize;
-}
-*/
-
-OptionalDouble LinearFitSlope(
- const std::deque<TrendlineEstimator::PacketTiming>& packets) {
- // RTC_DCHECK(packets.size() >= 2);
- // Compute the "center of mass".
- double sum_x = 0;
- double sum_y = 0;
- for (const auto& packet : packets) {
- sum_x += packet.arrival_time_ms;
- sum_y += packet.smoothed_delay_ms;
- }
- double x_avg = sum_x / packets.size();
- double y_avg = sum_y / packets.size();
- // Compute the slope k = \sum (x_i-x_avg)(y_i-y_avg) / \sum (x_i-x_avg)^2
- double numerator = 0;
- double denominator = 0;
- for (const auto& packet : packets) {
- double x = packet.arrival_time_ms;
- double y = packet.smoothed_delay_ms;
- numerator += (x - x_avg) * (y - y_avg);
- denominator += (x - x_avg) * (x - x_avg);
- }
- if (denominator == 0) return OptionalDouble();
- return OptionalDouble(numerator / denominator);
-}
-
-OptionalDouble ComputeSlopeCap(
- const std::deque<TrendlineEstimator::PacketTiming>& packets,
- const TrendlineEstimatorSettings& settings) {
- /*RTC_DCHECK(1 <= settings.beginning_packets &&
- settings.beginning_packets < packets.size());
- RTC_DCHECK(1 <= settings.end_packets &&
- settings.end_packets < packets.size());
- RTC_DCHECK(settings.beginning_packets + settings.end_packets <=
- packets.size());*/
- TrendlineEstimator::PacketTiming early = packets[0];
- for (size_t i = 1; i < settings.beginning_packets; ++i) {
- if (packets[i].raw_delay_ms < early.raw_delay_ms) early = packets[i];
- }
- size_t late_start = packets.size() - settings.end_packets;
- TrendlineEstimator::PacketTiming late = packets[late_start];
- for (size_t i = late_start + 1; i < packets.size(); ++i) {
- if (packets[i].raw_delay_ms < late.raw_delay_ms) late = packets[i];
- }
- if (late.arrival_time_ms - early.arrival_time_ms < 1) {
- return OptionalDouble();
- }
- return OptionalDouble((late.raw_delay_ms - early.raw_delay_ms) /
- (late.arrival_time_ms - early.arrival_time_ms) +
- settings.cap_uncertainty);
-}
-
-constexpr double kMaxAdaptOffsetMs = 15.0;
-constexpr double kOverUsingTimeThreshold = 10;
-constexpr int kMinNumDeltas = 60;
-constexpr int kDeltaCounterMax = 1000;
-
-//} // namespace
-
-constexpr char TrendlineEstimatorSettings::kKey[];
-
-TrendlineEstimatorSettings::TrendlineEstimatorSettings(
- /*const WebRtcKeyValueConfig* key_value_config*/) {
- /*if (absl::StartsWith(
- key_value_config->Lookup(kBweWindowSizeInPacketsExperiment),
- "Enabled")) {
- window_size = ReadTrendlineFilterWindowSize(key_value_config);
- }
- Parser()->Parse(key_value_config->Lookup(TrendlineEstimatorSettings::kKey));*/
- window_size = kDefaultTrendlineWindowSize;
- enable_cap = false;
- beginning_packets = end_packets = 0;
- cap_uncertainty = 0.0;
-
- /*if (window_size < 10 || 200 < window_size) {
- RTC_LOG(LS_WARNING) << "Window size must be between 10 and 200 packets";
- window_size = kDefaultTrendlineWindowSize;
- }
- if (enable_cap) {
- if (beginning_packets < 1 || end_packets < 1 ||
- beginning_packets > window_size || end_packets > window_size) {
- RTC_LOG(LS_WARNING) << "Size of beginning and end must be between 1 and "
- << window_size;
- enable_cap = false;
- beginning_packets = end_packets = 0;
- cap_uncertainty = 0.0;
- }
- if (beginning_packets + end_packets > window_size) {
- RTC_LOG(LS_WARNING)
- << "Size of beginning plus end can't exceed the window size";
- enable_cap = false;
- beginning_packets = end_packets = 0;
- cap_uncertainty = 0.0;
- }
- if (cap_uncertainty < 0.0 || 0.025 < cap_uncertainty) {
- RTC_LOG(LS_WARNING) << "Cap uncertainty must be between 0 and 0.025";
- cap_uncertainty = 0.0;
- }
- }*/
-}
-
-/*std::unique_ptr<StructParametersParser> TrendlineEstimatorSettings::Parser() {
- return StructParametersParser::Create("sort", &enable_sort, //
- "cap", &enable_cap, //
- "beginning_packets",
- &beginning_packets, //
- "end_packets", &end_packets, //
- "cap_uncertainty", &cap_uncertainty, //
- "window_size", &window_size);
-}*/
-
-TrendlineEstimator::TrendlineEstimator(
- /*const WebRtcKeyValueConfig* key_value_config,
- NetworkStatePredictor* network_state_predictor*/)
- : settings_(),
- smoothing_coef_(kDefaultTrendlineSmoothingCoeff),
- threshold_gain_(kDefaultTrendlineThresholdGain),
- num_of_deltas_(0),
- first_arrival_time_ms_(-1),
- accumulated_delay_(0),
- smoothed_delay_(0),
- delay_hist_(),
- k_up_(0.0087),
- k_down_(0.039),
- overusing_time_threshold_(kOverUsingTimeThreshold),
- threshold_(12.5),
- prev_modified_trend_(NAN),
- last_update_ms_(-1),
- prev_trend_(0.0),
- time_over_using_(-1),
- overuse_counter_(0),
- hypothesis_(BandwidthUsage::kBwNormal){
- // hypothesis_predicted_(BandwidthUsage::kBwNormal){//},
- // network_state_predictor_(network_state_predictor) {
- /* RTC_LOG(LS_INFO)
- << "Using Trendline filter for delay change estimation with settings "
- << settings_.Parser()->Encode() << " and "
- // << (network_state_predictor_ ? "injected" : "no")
- << " network state predictor";*/
-}
-
-TrendlineEstimator::~TrendlineEstimator() {}
-
-void TrendlineEstimator::UpdateTrendline(double recv_delta_ms,
- double send_delta_ms,
- int64_t send_time_ms,
- int64_t arrival_time_ms,
- size_t packet_size) {
- const double delta_ms = recv_delta_ms - send_delta_ms;
- ++num_of_deltas_;
- num_of_deltas_ = std::min(num_of_deltas_, kDeltaCounterMax);
- if (first_arrival_time_ms_ == -1) first_arrival_time_ms_ = arrival_time_ms;
-
- // Exponential backoff filter.
- accumulated_delay_ += delta_ms;
- // BWE_TEST_LOGGING_PLOT(1, "accumulated_delay_ms", arrival_time_ms,
- // accumulated_delay_);
- smoothed_delay_ = smoothing_coef_ * smoothed_delay_ +
- (1 - smoothing_coef_) * accumulated_delay_;
- // BWE_TEST_LOGGING_PLOT(1, "smoothed_delay_ms", arrival_time_ms,
- // smoothed_delay_);
-
- // Maintain packet window
- delay_hist_.emplace_back(
- static_cast<double>(arrival_time_ms - first_arrival_time_ms_),
- smoothed_delay_, accumulated_delay_);
- if (settings_.enable_sort) {
- for (size_t i = delay_hist_.size() - 1;
- i > 0 &&
- delay_hist_[i].arrival_time_ms < delay_hist_[i - 1].arrival_time_ms;
- --i) {
- std::swap(delay_hist_[i], delay_hist_[i - 1]);
- }
- }
- if (delay_hist_.size() > settings_.window_size) delay_hist_.pop_front();
-
- // Simple linear regression.
- double trend = prev_trend_;
- if (delay_hist_.size() == settings_.window_size) {
- // Update trend_ if it is possible to fit a line to the data. The delay
- // trend can be seen as an estimate of (send_rate - capacity)/capacity.
- // 0 < trend < 1 -> the delay increases, queues are filling up
- // trend == 0 -> the delay does not change
- // trend < 0 -> the delay decreases, queues are being emptied
- OptionalDouble trendO = LinearFitSlope(delay_hist_);
- if (trendO.has_value()) trend = trendO.value();
- if (settings_.enable_cap) {
- OptionalDouble cap = ComputeSlopeCap(delay_hist_, settings_);
- // We only use the cap to filter out overuse detections, not
- // to detect additional underuses.
- if (trend >= 0 && cap.has_value() && trend > cap.value()) {
- trend = cap.value();
- }
- }
- }
- // BWE_TEST_LOGGING_PLOT(1, "trendline_slope", arrival_time_ms, trend);
-
- Detect(trend, send_delta_ms, arrival_time_ms);
-}
-
-void TrendlineEstimator::Update(double recv_delta_ms, double send_delta_ms,
- int64_t send_time_ms, int64_t arrival_time_ms,
- size_t packet_size, bool calculated_deltas) {
- if (calculated_deltas) {
- UpdateTrendline(recv_delta_ms, send_delta_ms, send_time_ms, arrival_time_ms,
- packet_size);
- }
- /*if (network_state_predictor_) {
- hypothesis_predicted_ = network_state_predictor_->Update(
- send_time_ms, arrival_time_ms, hypothesis_);
- }*/
-}
-
-BandwidthUsage TrendlineEstimator::State() const {
- return /*network_state_predictor_ ? hypothesis_predicted_ :*/ hypothesis_;
-}
-
-void TrendlineEstimator::Detect(double trend, double ts_delta, int64_t now_ms) {
- /*if (num_of_deltas_ < 2) {
- hypothesis_ = BandwidthUsage::kBwNormal;
- return;
- }*/
-
- const double modified_trend =
- std::min(num_of_deltas_, kMinNumDeltas) * trend * threshold_gain_;
- prev_modified_trend_ = modified_trend;
- // BWE_TEST_LOGGING_PLOT(1, "T", now_ms, modified_trend);
- // BWE_TEST_LOGGING_PLOT(1, "threshold", now_ms, threshold_);
- if (modified_trend > threshold_) {
- if (time_over_using_ == -1) {
- // Initialize the timer. Assume that we've been
- // over-using half of the time since the previous
- // sample.
- time_over_using_ = ts_delta / 2;
- } else {
- // Increment timer
- time_over_using_ += ts_delta;
- }
- overuse_counter_++;
- if (time_over_using_ > overusing_time_threshold_ && overuse_counter_ > 1) {
- if (trend >= prev_trend_) {
- time_over_using_ = 0;
- overuse_counter_ = 0;
- hypothesis_ = BandwidthUsage::kBwOverusing;
- }
- }
- } else if (modified_trend < -threshold_) {
- time_over_using_ = -1;
- overuse_counter_ = 0;
- hypothesis_ = BandwidthUsage::kBwUnderusing;
- } else {
- time_over_using_ = -1;
- overuse_counter_ = 0;
- hypothesis_ = BandwidthUsage::kBwNormal;
- }
- prev_trend_ = trend;
- UpdateThreshold(modified_trend, now_ms);
-}
-
-void TrendlineEstimator::UpdateThreshold(double modified_trend,
- int64_t now_ms) {
- if (last_update_ms_ == -1) last_update_ms_ = now_ms;
-
- if (fabs(modified_trend) > threshold_ + kMaxAdaptOffsetMs) {
- // Avoid adapting the threshold to big latency spikes, caused e.g.,
- // by a sudden capacity drop.
- last_update_ms_ = now_ms;
- return;
- }
-
- const double k = fabs(modified_trend) < threshold_ ? k_down_ : k_up_;
- const int64_t kMaxTimeDeltaMs = 100;
- int64_t time_delta_ms = std::min(now_ms - last_update_ms_, kMaxTimeDeltaMs);
- threshold_ += k * (fabs(modified_trend) - threshold_) * time_delta_ms;
- if (threshold_ < 6.f) threshold_ = 6.f;
- if (threshold_ > 600.f) threshold_ = 600.f;
- // threshold_ = rtc::SafeClamp(threshold_, 6.f, 600.f);
- last_update_ms_ = now_ms;
-}
-
-} // namespace rtc
-
-} // end namespace protocol
-
-} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/trendline_estimator.h b/libtransport/src/protocols/rtc/trendline_estimator.h
deleted file mode 100644
index 372acbc67..000000000
--- a/libtransport/src/protocols/rtc/trendline_estimator.h
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Copyright (c) 2016 The WebRTC project authors. All Rights Reserved.
- *
- * Use of this source code is governed by a BSD-style license
- * that can be found in the LICENSE file in the root of the source
- * tree. An additional intellectual property rights grant can be found
- * in the file PATENTS. All contributing project authors may
- * be found in the AUTHORS file in the root of the source tree.
- */
-
-// FROM
-// https://source.chromium.org/chromium/chromium/src/+/master:third_party/webrtc/modules/congestion_controller/goog_cc/trendline_estimator.h
-
-#ifndef MODULES_CONGESTION_CONTROLLER_GOOG_CC_TRENDLINE_ESTIMATOR_H_
-#define MODULES_CONGESTION_CONTROLLER_GOOG_CC_TRENDLINE_ESTIMATOR_H_
-
-#include <stddef.h>
-#include <stdint.h>
-
-#include <algorithm>
-#include <deque>
-#include <memory>
-#include <utility>
-
-namespace transport {
-
-namespace protocol {
-
-namespace rtc {
-
-class OptionalDouble {
- public:
- OptionalDouble() : val(0), has_val(false){};
- OptionalDouble(double val) : val(val), has_val(true){};
-
- double value() { return val; }
- bool has_value() { return has_val; }
-
- private:
- double val;
- bool has_val;
-};
-
-enum class BandwidthUsage {
- kBwNormal = 0,
- kBwUnderusing = 1,
- kBwOverusing = 2,
- kLast
-};
-
-struct TrendlineEstimatorSettings {
- static constexpr char kKey[] = "WebRTC-Bwe-TrendlineEstimatorSettings";
- static constexpr unsigned kDefaultTrendlineWindowSize = 20;
-
- // TrendlineEstimatorSettings() = delete;
- TrendlineEstimatorSettings(
- /*const WebRtcKeyValueConfig* key_value_config*/);
-
- // Sort the packets in the window. Should be redundant,
- // but then almost no cost.
- bool enable_sort = false;
-
- // Cap the trendline slope based on the minimum delay seen
- // in the beginning_packets and end_packets respectively.
- bool enable_cap = false;
- unsigned beginning_packets = 7;
- unsigned end_packets = 7;
- double cap_uncertainty = 0.0;
-
- // Size (in packets) of the window.
- unsigned window_size = kDefaultTrendlineWindowSize;
-
- // std::unique_ptr<StructParametersParser> Parser();
-};
-
-class TrendlineEstimator /*: public DelayIncreaseDetectorInterface */ {
- public:
- TrendlineEstimator(/*const WebRtcKeyValueConfig* key_value_config,
- NetworkStatePredictor* network_state_predictor*/);
-
- ~TrendlineEstimator();
-
- // Update the estimator with a new sample. The deltas should represent deltas
- // between timestamp groups as defined by the InterArrival class.
- void Update(double recv_delta_ms, double send_delta_ms, int64_t send_time_ms,
- int64_t arrival_time_ms, size_t packet_size,
- bool calculated_deltas);
-
- void UpdateTrendline(double recv_delta_ms, double send_delta_ms,
- int64_t send_time_ms, int64_t arrival_time_ms,
- size_t packet_size);
-
- BandwidthUsage State() const;
-
- struct PacketTiming {
- PacketTiming(double arrival_time_ms, double smoothed_delay_ms,
- double raw_delay_ms)
- : arrival_time_ms(arrival_time_ms),
- smoothed_delay_ms(smoothed_delay_ms),
- raw_delay_ms(raw_delay_ms) {}
- double arrival_time_ms;
- double smoothed_delay_ms;
- double raw_delay_ms;
- };
-
- private:
- // friend class GoogCcStatePrinter;
- void Detect(double trend, double ts_delta, int64_t now_ms);
-
- void UpdateThreshold(double modified_offset, int64_t now_ms);
-
- // Parameters.
- TrendlineEstimatorSettings settings_;
- const double smoothing_coef_;
- const double threshold_gain_;
- // Used by the existing threshold.
- int num_of_deltas_;
- // Keep the arrival times small by using the change from the first packet.
- int64_t first_arrival_time_ms_;
- // Exponential backoff filtering.
- double accumulated_delay_;
- double smoothed_delay_;
- // Linear least squares regression.
- std::deque<PacketTiming> delay_hist_;
-
- const double k_up_;
- const double k_down_;
- double overusing_time_threshold_;
- double threshold_;
- double prev_modified_trend_;
- int64_t last_update_ms_;
- double prev_trend_;
- double time_over_using_;
- int overuse_counter_;
- BandwidthUsage hypothesis_;
- // BandwidthUsage hypothesis_predicted_;
- // NetworkStatePredictor* network_state_predictor_;
-
- // RTC_DISALLOW_COPY_AND_ASSIGN(TrendlineEstimator);
-};
-
-} // namespace rtc
-
-} // end namespace protocol
-
-} // end namespace transport
-#endif // MODULES_CONGESTION_CONTROLLER_GOOG_CC_TRENDLINE_ESTIMATOR_H_