aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport')
-rw-r--r--libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc4
-rw-r--r--libtransport/src/hicn/transport/protocols/rtc.cc372
-rw-r--r--libtransport/src/hicn/transport/protocols/rtc.h58
-rw-r--r--libtransport/src/hicn/transport/protocols/rtc_data_path.cc91
-rw-r--r--libtransport/src/hicn/transport/protocols/rtc_data_path.h19
-rw-r--r--libtransport/src/hicn/transport/protocols/statistics.h23
6 files changed, 435 insertions, 132 deletions
diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
index 6a45019a4..481b42a10 100644
--- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
+++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
@@ -29,6 +29,8 @@
// is considered inactive
#define MILLI_IN_A_SEC 1000 // ms in a second
+#define HICN_MAX_DATA_SEQ 0xefffffff
+
// NACK HEADER
// +-----------------------------------------+
// | 4 bytes: current segment in production |
@@ -157,7 +159,7 @@ void RTCProducerSocket::produce(std::unique_ptr<utils::MemBuf> &&buffer) {
portal_->sendContentObject(content_object);
- currentSeg_++;
+ currentSeg_ = (currentSeg_ + 1) % HICN_MAX_DATA_SEQ;
}
void RTCProducerSocket::onInterest(Interest::Ptr &&interest) {
diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc
index 5ff0126b0..070ce2c6a 100644
--- a/libtransport/src/hicn/transport/protocols/rtc.cc
+++ b/libtransport/src/hicn/transport/protocols/rtc.cc
@@ -14,6 +14,7 @@
*/
#include <math.h>
+#include <random>
#include <hicn/transport/interfaces/socket_consumer.h>
#include <hicn/transport/protocols/rtc.h>
@@ -32,6 +33,7 @@ RTCTransportProtocol::RTCTransportProtocol(
icnet_socket->getSocketOption(PORTAL, portal_);
nack_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
rtx_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
+ probe_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
nack_timer_used_ = false;
reset();
}
@@ -43,7 +45,7 @@ RTCTransportProtocol::~RTCTransportProtocol() {
}
int RTCTransportProtocol::start() {
- checkRtx();
+ probeRtt();
return TransportProtocol::start();
}
@@ -62,8 +64,8 @@ void RTCTransportProtocol::resume() {
lastRoundBegin_ = std::chrono::steady_clock::now();
inflightInterestsCount_ = 0;
+ probeRtt();
scheduleNextInterests();
- checkRtx();
portal_->runEventsLoop();
@@ -87,10 +89,11 @@ void RTCTransportProtocol::reset() {
interestRetransmissions_.clear();
lastSegNacked_ = 0;
lastReceived_ = 0;
- nackedByProducer_.clear();
- nackedByProducerMaxSize_ = 512;
+ highestReceived_ = 0;
+ firstSequenceInRound_ = 0;
nack_timer_used_ = false;
+ rtx_timer_used_ = false;
for(int i = 0; i < (1 << default_values::log_2_default_buffer_size); i++){
inflightInterests_[i] = {0};
}
@@ -100,12 +103,12 @@ void RTCTransportProtocol::reset() {
sentInterest_ = 0;
receivedData_ = 0;
packetLost_ = 0;
+ lossRecovered_ = 0;
avgPacketSize_ = HICN_INIT_PACKET_SIZE;
gotNack_ = false;
gotFutureNack_ = 0;
roundsWithoutNacks_ = 0;
pathTable_.clear();
- minRtt_ = UINT_MAX;
// CC var
estimatedBw_ = 0.0;
@@ -113,7 +116,10 @@ void RTCTransportProtocol::reset() {
queuingDelay_ = 0.0;
protocolState_ = HICN_RTC_NORMAL_STATE;
- producerPathLabel_ = 0;
+ producerPathLabels_[0] = 0;
+ producerPathLabels_[1] = 0;
+ initied = false;
+
socket_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
(uint32_t)HICN_RTC_INTEREST_LIFETIME);
// XXX this should be done by the application
@@ -183,10 +189,16 @@ void RTCTransportProtocol::updateDelayStats(
*senderTimeStamp;
pathTable_[pathLabel]->insertOwdSample(OWD);
+ pathTable_[pathLabel]->computeInterArrivalGap(segmentNumber);
+ }else{
+ pathTable_[pathLabel]->receivedNack();
}
}
void RTCTransportProtocol::updateStats(uint32_t round_duration) {
+ if(pathTable_.empty())
+ return;
+
if (receivedBytes_ != 0) {
double bytesPerSec =
(double)(receivedBytes_ *
@@ -195,33 +207,78 @@ void RTCTransportProtocol::updateStats(uint32_t round_duration) {
((1 - HICN_ESTIMATED_BW_ALPHA) * bytesPerSec);
}
- auto it = pathTable_.find(producerPathLabel_);
- if (it == pathTable_.end()) return;
-
- minRtt_ = it->second->getMinRtt();
- queuingDelay_ = it->second->getQueuingDealy();
-
- if (minRtt_ == 0) minRtt_ = 1;
+ uint64_t minRtt = UINT_MAX;
+ uint64_t maxRtt = 0;
for (auto it = pathTable_.begin(); it != pathTable_.end(); it++) {
it->second->roundEnd();
+ if(it->second->isActive()){
+ if(it->second->getMinRtt() < minRtt){
+ minRtt = it->second->getMinRtt();
+ producerPathLabels_[0] = it->first;
+ }
+ if(it->second->getMinRtt() > maxRtt){
+ maxRtt = it->second->getMinRtt();
+ producerPathLabels_[1] = it->first;
+ }
+ }
}
+ if(pathTable_.find(producerPathLabels_[0]) == pathTable_.end() ||
+ pathTable_.find(producerPathLabels_[1]) == pathTable_.end())
+ return; //this should not happen
+
+ //as a queuing delay we keep the lowest one among the two paths
+ //if one path is congested the forwarder should decide to do not
+ //use it soon so it does not make sens to inform the application
+ //that maybe we have a problem
+ if(pathTable_[producerPathLabels_[0]]->getQueuingDealy() <
+ pathTable_[producerPathLabels_[1]]->getQueuingDealy())
+ queuingDelay_ = pathTable_[producerPathLabels_[0]]->getQueuingDealy();
+ else
+ queuingDelay_ = pathTable_[producerPathLabels_[1]]->getQueuingDealy();
+
if (sentInterest_ != 0 && currentState_ == HICN_RTC_NORMAL_STATE) {
- double lossRate = (double)((double)packetLost_ / (double)sentInterest_);
- lossRate_ = lossRate_ * HICN_ESTIMATED_LOSSES_ALPHA +
+ uint32_t numberTheoricallyReceivedPackets_ = highestReceived_ - firstSequenceInRound_;
+ double lossRate = 0;
+ if(numberTheoricallyReceivedPackets_ != 0)
+ lossRate = (double)((double)(packetLost_ - lossRecovered_) / (double)numberTheoricallyReceivedPackets_);
+
+ if(lossRate < 0)
+ lossRate = 0;
+
+ if(initied){
+ lossRate_ = lossRate_ * HICN_ESTIMATED_LOSSES_ALPHA +
(lossRate * (1 - HICN_ESTIMATED_LOSSES_ALPHA));
+ }else {
+ lossRate_ =lossRate;
+ initied = true;
+ }
}
if (avgPacketSize_ == 0) avgPacketSize_ = HICN_INIT_PACKET_SIZE;
+ //for the BDP we use the max rtt, so that we calibrate the window on the
+ //RTT of the slowest path. In this way we are sure that the window will
+ //never be too small
uint32_t BDP = (uint32_t)ceil(
- (estimatedBw_ * (double)((double)minRtt_ / (double)HICN_MILLI_IN_A_SEC) *
- HICN_BANDWIDTH_SLACK_FACTOR) /
- avgPacketSize_);
+ (estimatedBw_ * (double)((double) pathTable_[producerPathLabels_[1]]->getMinRtt() /
+ (double)HICN_MILLI_IN_A_SEC) *
+ HICN_BANDWIDTH_SLACK_FACTOR) /
+ avgPacketSize_);
uint32_t BW = (uint32_t)ceil(estimatedBw_);
computeMaxWindow(BW, BDP);
+ ConsumerTimerCallback *stats_callback = nullptr;
+ socket_->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY,
+ &stats_callback);
+ if (*stats_callback != VOID_HANDLER) {
+ //Send the stats to the app
+ stats_.updateQueuingDelay(queuingDelay_);
+ stats_.updateLossRatio(lossRate_);
+ (*stats_callback)(*socket_, stats_);
+ }
+
// bound also by interest lifitime* production rate
if (!gotNack_) {
roundsWithoutNacks_++;
@@ -244,6 +301,8 @@ void RTCTransportProtocol::updateStats(uint32_t round_duration) {
sentInterest_ = 0;
receivedData_ = 0;
packetLost_ = 0;
+ lossRecovered_ = 0;
+ firstSequenceInRound_ = highestReceived_;
}
void RTCTransportProtocol::updateCCState() {
@@ -320,6 +379,33 @@ void RTCTransportProtocol::increaseWindow() {
}
}
+void RTCTransportProtocol::probeRtt(){
+ time_sent_probe_ = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+
+ Name *interest_name = nullptr;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
+ &interest_name);
+ //get a random numbe in the probe seq range
+ std::default_random_engine eng((std::random_device())());
+ std::uniform_int_distribution<uint32_t> idis(
+ HICN_MIN_PROBE_SEQ, HICN_MAX_PROBE_SEQ);
+ probe_seq_number_ = idis(eng);
+ interest_name->setSuffix(probe_seq_number_);
+
+ //we considere the probe as a rtx so that we do not incresea inFlightInt
+ received_probe_ = false;
+ sendInterest(interest_name, true);
+
+ probe_timer_->expires_from_now(std::chrono::milliseconds(1000));
+ probe_timer_->async_wait([this](std::error_code ec) {
+ if (ec) return;
+ probeRtt();
+ });
+}
+
+
void RTCTransportProtocol::sendInterest(Name *interest_name, bool rtx) {
auto interest = getPacket();
interest->setName(*interest_name);
@@ -363,7 +449,7 @@ void RTCTransportProtocol::scheduleNextInterests() {
// we send the packet only if it is not pending yet
interest_name->setSuffix(actualSegment_);
if (portal_->interestIsPending(*interest_name)) {
- actualSegment_++;
+ actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ;
continue;
}
@@ -371,7 +457,14 @@ void RTCTransportProtocol::scheduleNextInterests() {
// if we already reacevied the content we don't ask it again
if (inflightInterests_[pkt].state == received_ &&
inflightInterests_[pkt].sequence == actualSegment_) {
- actualSegment_++;
+ actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ;
+ continue;
+ }
+
+ //same if the packet is lost
+ if (inflightInterests_[pkt].state == lost_ &&
+ inflightInterests_[pkt].sequence == actualSegment_){
+ actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ;
continue;
}
@@ -379,9 +472,11 @@ void RTCTransportProtocol::scheduleNextInterests() {
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
+
+ //here the packet can be in any state except for lost or recevied
inflightInterests_[pkt].state = sent_;
inflightInterests_[pkt].sequence = actualSegment_;
- actualSegment_++;
+ actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ;
sendInterest(interest_name, false);
checkRound();
@@ -394,20 +489,31 @@ void RTCTransportProtocol::addRetransmissions(uint32_t val) {
}
void RTCTransportProtocol::addRetransmissions(uint32_t start, uint32_t stop) {
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+
for (uint32_t i = start; i < stop; i++) {
auto it = interestRetransmissions_.find(i);
if (it == interestRetransmissions_.end()) {
if (lastSegNacked_ <= i) {
// i must be larger than the last past nack received
+ packetLost_++;
interestRetransmissions_[i] = 0;
+ uint32_t pkt = i & modMask_;
+ //we reset the transmission time setting to now, so that rtx will
+ //happne in one RTT on waint one inter arrival gap
+ inflightInterests_[pkt].transmissionTime = now;
}
} // if the retransmission is already there the rtx timer will
// take care of it
}
- retransmit(true);
+
+ if(!rtx_timer_used_)
+ checkRtx();
}
-void RTCTransportProtocol::retransmit(bool first_rtx) {
+void RTCTransportProtocol::retransmit() {
auto it = interestRetransmissions_.begin();
// cut len to max HICN_MAX_RTX_SIZE
@@ -441,51 +547,63 @@ void RTCTransportProtocol::retransmit(bool first_rtx) {
continue;
}
- if (first_rtx) {
- // TODO (optimization)
- // the rtx that we never sent (it->second == 0) are all at the
- // end, so we can go directly there
- if (it->second == 0) {
- inflightInterests_[pkt].transmissionTime =
- std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
- it->second++;
- Name *interest_name = nullptr;
- socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
- &interest_name);
- interest_name->setSuffix(it->first);
- sendInterest(interest_name, true);
+ uint64_t sent_time = inflightInterests_[pkt].transmissionTime;
+ uint64_t rtx_time = sent_time;
+
+ if(it->second == 0){
+ if(pathTable_.find(producerPathLabels_[0]) != pathTable_.end() &&
+ pathTable_.find(producerPathLabels_[1]) != pathTable_.end()){
+ //first rtx: wait RTTmax - RTTmin + gap
+ rtx_time = sent_time + pathTable_[producerPathLabels_[1]]->getMinRtt() -
+ pathTable_[producerPathLabels_[0]]->getMinRtt() +
+ pathTable_[producerPathLabels_[1]]->getInterArrivalGap();
}
- ++it;
- } else {
- // base on time
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ }else{
+ if(pathTable_.find(producerPathLabels_[0]) != pathTable_.end()){
+ //second+ rtx: waint min rtt
+ rtx_time = sent_time + pathTable_[producerPathLabels_[0]]->getMinRtt();
+ }
+ }
+
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
- if ((now - inflightInterests_[pkt].transmissionTime) > 20) {
- // XXX replace 20 with rtt
- inflightInterests_[pkt].transmissionTime =
- std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
- it->second++;
- Name *interest_name = nullptr;
- socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
+ if(now >= rtx_time){
+ inflightInterests_[pkt].transmissionTime = now;
+ it->second++;
+
+ Name *interest_name = nullptr;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
&interest_name);
- interest_name->setSuffix(it->first);
- sendInterest(interest_name, true);
- }
- ++it;
+ interest_name->setSuffix(it->first);
+ sendInterest(interest_name, true);
}
+
+ ++it;
}
}
void RTCTransportProtocol::checkRtx() {
- retransmit(false);
- rtx_timer_->expires_from_now(std::chrono::milliseconds(20));
+ if(interestRetransmissions_.empty()){
+ rtx_timer_used_ = false;
+ return;
+ }
+
+ //we use the packet intearriva time on the fastest path
+ //even if this stats should be the same on both
+ auto pathStats = pathTable_.find(producerPathLabels_[0]);
+ uint64_t wait = 1;
+ if(pathStats != pathTable_.end()){
+ wait = floor(pathStats->second->getInterArrivalGap() / 2.0);
+ if(wait < 1)
+ wait = 1;
+ }
+
+ rtx_timer_used_ = true;
+ retransmit();
+ rtx_timer_->expires_from_now(std::chrono::milliseconds(wait));
rtx_timer_->async_wait([this](std::error_code ec) {
if (ec) return;
checkRtx();
@@ -555,63 +673,62 @@ bool RTCTransportProtocol::checkIfProducerIsActive(
return true;
}
-void RTCTransportProtocol::onNack(const ContentObject &content_object) {
+bool RTCTransportProtocol::onNack(const ContentObject &content_object, bool rtx) {
uint32_t *payload = (uint32_t *)content_object.getPayload()->data();
uint32_t productionSeg = *payload;
uint32_t productionRate = *(++payload);
uint32_t nackSegment = content_object.getName().getSuffix();
- gotNack_ = true;
- // we synch the estimated production rate with the actual one
- estimatedBw_ = (double)productionRate;
+ bool old_nack = false;
+
+ if(!rtx){
+ gotNack_ = true;
+ // we synch the estimated production rate with the actual one
+ estimatedBw_ = (double)productionRate;
+ }
if (productionSeg > nackSegment) {
// we are asking for stuff produced in the past
- actualSegment_ = max(productionSeg + 1, actualSegment_);
- if (currentState_ == HICN_RTC_NORMAL_STATE) {
- currentState_ = HICN_RTC_SYNC_STATE;
+ actualSegment_ = max(productionSeg + 1, actualSegment_) % HICN_MIN_PROBE_SEQ;
+
+ if(!rtx) {
+ if (currentState_ == HICN_RTC_NORMAL_STATE) {
+ currentState_ = HICN_RTC_SYNC_STATE;
+ }
+
+ computeMaxWindow(productionRate, 0);
+ increaseWindow();
}
- computeMaxWindow(productionRate, 0);
- increaseWindow();
+ //we need to remove the rtx for packets with seq number
+ //< productionSeg
+ for(auto it = interestRetransmissions_.begin(); it !=
+ interestRetransmissions_.end();){
+ if(it->first < productionSeg)
+ it = interestRetransmissions_.erase(it);
+ else
+ ++it;
+ }
- interestRetransmissions_.clear();
lastSegNacked_ = productionSeg;
-
- if (nackedByProducer_.size() >= nackedByProducerMaxSize_)
- nackedByProducer_.erase(nackedByProducer_.begin());
- nackedByProducer_.insert(nackSegment);
+ old_nack = true;
} else if (productionSeg < nackSegment) {
- // we are asking stuff in the future
- gotFutureNack_++;
-
- actualSegment_ = productionSeg + 1;
+ actualSegment_ = (productionSeg + 1) % HICN_MIN_PROBE_SEQ;
- computeMaxWindow(productionRate, 0);
- decreaseWindow();
+ if(!rtx){
+ // we are asking stuff in the future
+ gotFutureNack_++;
+ computeMaxWindow(productionRate, 0);
+ decreaseWindow();
- if (currentState_ == HICN_RTC_SYNC_STATE) {
- currentState_ = HICN_RTC_NORMAL_STATE;
+ if (currentState_ == HICN_RTC_SYNC_STATE) {
+ currentState_ = HICN_RTC_NORMAL_STATE;
+ }
}
} // equal should not happen
-}
-
-void RTCTransportProtocol::onNackForRtx(const ContentObject &content_object) {
- uint32_t *payload = (uint32_t *)content_object.getPayload()->data();
- uint32_t productionSeg = *payload;
- uint32_t nackSegment = content_object.getName().getSuffix();
-
- if (productionSeg > nackSegment) {
- // we are asking for stuff produced in the past
- actualSegment_ = max(productionSeg + 1, actualSegment_);
-
- interestRetransmissions_.clear();
- lastSegNacked_ = productionSeg;
- } else if (productionSeg < nackSegment) {
- actualSegment_ = productionSeg + 1;
- } // equal should not happen
+ return old_nack;
}
void RTCTransportProtocol::onContentObject(
@@ -629,20 +746,63 @@ void RTCTransportProtocol::onContentObject(
(*callback_content_object)(*socket_, *content_object);
}
+ if(segmentNumber == probe_seq_number_){
+ if(payload_size == HICN_NACK_HEADER_SIZE){
+ if(!received_probe_){
+ received_probe_ = true;
+
+ uint32_t pathLabel = content_object->getPathLabel();
+ if (pathTable_.find(pathLabel) == pathTable_.end()){
+ //if this path does not exists we cannot create a new one so drop
+ return;
+ }
+
+ //this is the expected probe, update the RTT and drop the packet
+ uint64_t RTT = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count() - time_sent_probe_;
+
+ pathTable_[pathLabel]->insertRttSample(RTT);
+ pathTable_[pathLabel]->receivedNack();
+ return;
+ }
+ }else{
+ //this should never happen
+ //don't know what to do, let's try to process it as normal packet
+ }
+ }
+
if (payload_size == HICN_NACK_HEADER_SIZE) {
- // Nacks always come form the producer, so we set the producerPathLabel_;
- producerPathLabel_ = content_object->getPathLabel();
schedule_next_interest = checkIfProducerIsActive(*content_object);
+
if (inflightInterests_[pkt].state == sent_) {
inflightInterestsCount_--;
- // if checkIfProducerIsActive returns false, we did all we need to do
- // inside that function, no need to call onNack
- if (schedule_next_interest) onNack(*content_object);
- updateDelayStats(*content_object);
- } else {
- if (schedule_next_interest) onNackForRtx(*content_object);
}
+ // if checkIfProducerIsActive returns false, we did all we need to do
+ // inside that function, no need to call onNack
+ bool old_nack = false;
+
+ if (schedule_next_interest){
+ if (interestRetransmissions_.find(segmentNumber) ==
+ interestRetransmissions_.end()){
+ //this is not a retransmitted packet
+ old_nack = onNack(*content_object, false);
+ updateDelayStats(*content_object);
+ } else {
+ old_nack = onNack(*content_object, true);
+ }
+ }
+
+ //the nacked_ state is used only to avoid to decrease inflightInterestsCount_
+ //multiple times. In fact, every time that we receive an event related to an
+ //interest (timeout, nacked, content) we cange the state. In this way we are
+ //sure that we do not decrease twice the counter
+ if(old_nack)
+ inflightInterests_[pkt].state = lost_;
+ else
+ inflightInterests_[pkt].state = nacked_;
+
} else {
avgPacketSize_ = (HICN_ESTIMATED_PACKET_SIZE * avgPacketSize_) +
((1 - HICN_ESTIMATED_PACKET_SIZE) * payload->length());
@@ -661,6 +821,8 @@ void RTCTransportProtocol::onContentObject(
updateDelayStats(*content_object);
addRetransmissions(lastReceived_ + 1, segmentNumber);
+ if(segmentNumber > highestReceived_)
+ highestReceived_ = segmentNumber;
// lastReceived_ is updated only for data packets received without RTX
lastReceived_ = segmentNumber;
}
@@ -673,6 +835,10 @@ void RTCTransportProtocol::onContentObject(
}
// in any case we remove the packet from the rtx list
+ auto it = interestRetransmissions_.find(segmentNumber);
+ if(it != interestRetransmissions_.end())
+ lossRecovered_ ++;
+
interestRetransmissions_.erase(segmentNumber);
if (schedule_next_interest) {
diff --git a/libtransport/src/hicn/transport/protocols/rtc.h b/libtransport/src/hicn/transport/protocols/rtc.h
index 66ad05a88..770768e31 100644
--- a/libtransport/src/hicn/transport/protocols/rtc.h
+++ b/libtransport/src/hicn/transport/protocols/rtc.h
@@ -35,6 +35,13 @@
#define HICN_TIMESTAMP_SIZE 8 // bytes
#define HICN_RTC_INTEREST_LIFETIME 1000 // ms
+//rtt measurement
+//normal interests for data goes from 0 to
+//HICN_MIN_PROBE_SEQ, the rest is reserverd for
+//probes
+#define HICN_MIN_PROBE_SEQ 0xefffffff
+#define HICN_MAX_PROBE_SEQ 0xffffffff
+
// controller constant
#define HICN_ROUND_LEN 200 // ms interval of time on which
// we take decisions / measurements
@@ -62,12 +69,14 @@
#define HICN_MICRO_IN_A_SEC 1000000
#define HICN_MILLI_IN_A_SEC 1000
+
namespace transport {
namespace protocol {
enum packetState {
sent_,
+ nacked_,
received_,
timeout1_,
timeout2_,
@@ -115,16 +124,15 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly {
void scheduleNextInterests() override;
void addRetransmissions(uint32_t val);
void addRetransmissions(uint32_t start, uint32_t stop);
- void retransmit(bool first_rtx);
+ void retransmit();
void checkRtx();
+ void probeRtt();
void onTimeout(Interest::Ptr &&interest) override;
// checkIfProducerIsActive: return true if we need to schedule an interest
// immediatly after, false otherwise (this happens when the producer socket
// is not active)
bool checkIfProducerIsActive(const ContentObject &content_object);
- void onNack(const ContentObject &content_object);
- //funtcion used to handle nacks for retransmitted interests
- void onNackForRtx(const ContentObject &content_object);
+ bool onNack(const ContentObject &content_object, bool rtx);
void onContentObject(Interest::Ptr &&interest,
ContentObject::Ptr &&content_object) override;
void returnContentToApplication(const ContentObject &content_object);
@@ -148,48 +156,62 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly {
//map seq to rtx
std::map<uint32_t, uint8_t> interestRetransmissions_;
std::unique_ptr<asio::steady_timer> rtx_timer_;
- //std::queue<uint32_t> interestRetransmissions_;
std::vector<sentInterest> inflightInterests_;
uint32_t lastSegNacked_; //indicates the segment id in the last received
// past Nack. we do not ask for retransmissions
//for samething that is older than this value.
uint32_t lastReceived_; //segment of the last content object received
//indicates the base of the window on the client
- uint32_t nackedByProducerMaxSize_;
- std::set<uint32_t>
- nackedByProducer_; // this is used to avoid retransmissions from the
- // application for pakets for which we already got a
- // past NACK by the producer these packet are too old,
- // they will never be retrived
+
bool nack_timer_used_;
+ bool rtx_timer_used_;
std::unique_ptr<asio::steady_timer> nack_timer_; // timer used to schedule
- // a nack retransmission in
+ // a nack retransmission in case
// of inactive prod socket
+ //rtt probes
+ //the RTC transport tends to overestimate the RTT
+ //du to the production time on the server side
+ //once per second we send an interest for wich we know
+ //we will get a nack. This nack will keep our estimation
+ //close to the reality
+ std::unique_ptr<asio::steady_timer> probe_timer_;
+ uint64_t time_sent_probe_;
+ uint32_t probe_seq_number_;
+ bool received_probe_;
+
uint32_t modMask_;
// stats
uint32_t receivedBytes_;
uint32_t sentInterest_;
uint32_t receivedData_;
- uint32_t packetLost_;
+ int32_t packetLost_;
+ int32_t lossRecovered_;
+ uint32_t firstSequenceInRound_;
+ uint32_t highestReceived_;
double avgPacketSize_;
bool gotNack_;
uint32_t gotFutureNack_;
uint32_t roundsWithoutNacks_;
- uint32_t producerPathLabel_; // XXX we pick only one path lable for the
- // producer for now, assuming the usage of a
- // single path this should be extended to a
- // vector
+
+ //we keep track of up two paths (if only one path is in use
+ //the two values in the vector will be the same)
+ //position 0 stores the path with minRTT
+ //position 1 stores the path with maxRTT
+ uint32_t producerPathLabels_[2];
+
std::unordered_map<uint32_t, std::shared_ptr<RTCDataPath>> pathTable_;
uint32_t roundCounter_;
- uint64_t minRtt_;
// CC var
double estimatedBw_;
double lossRate_;
double queuingDelay_;
unsigned protocolState_;
+
+ bool initied;
+ TransportStatistics stats_;
};
} // namespace protocol
diff --git a/libtransport/src/hicn/transport/protocols/rtc_data_path.cc b/libtransport/src/hicn/transport/protocols/rtc_data_path.cc
index 6c9605fb2..0cbff0e3c 100644
--- a/libtransport/src/hicn/transport/protocols/rtc_data_path.cc
+++ b/libtransport/src/hicn/transport/protocols/rtc_data_path.cc
@@ -14,6 +14,10 @@
*/
#include <hicn/transport/protocols/rtc_data_path.h>
+#include <chrono>
+#include <cfloat>
+
+#define MAX_ROUNDS_WITHOUT_PKTS 10 //2sec
namespace transport {
@@ -30,7 +34,13 @@ RTCDataPath::RTCDataPath()
// (for congestion/quality control)
prev_min_owd(INT_MAX),
avg_owd(0.0),
- queuing_delay(0.0),
+ queuing_delay(DBL_MAX),
+ lastRecvSeq_(0),
+ lastRecvTime_(0),
+ avg_inter_arrival_(DBL_MAX),
+ received_nacks_(false),
+ received_packets_(false),
+ rounds_without_packets_(0),
RTThistory_(HISTORY_LEN),
OWDhistory_(HISTORY_LEN){};
@@ -43,14 +53,62 @@ void RTCDataPath::insertOwdSample(int64_t owd) {
// for owd we use both min and avg
if (owd < min_owd) min_owd = owd;
- avg_owd = (avg_owd * (1 - ALPHA_RTC)) + (owd * ALPHA_RTC);
+ if(avg_owd != DBL_MAX)
+ avg_owd = (avg_owd * (1 - ALPHA_RTC)) + (owd * ALPHA_RTC);
+ else {
+ avg_owd = owd;
+ }
+
+ //owd is computed only for valid data packets so we count only
+ //this for decide if we recevie traffic or not
+ received_packets_ = true;
}
-void RTCDataPath::roundEnd() {
- // compute queuing delay
- queuing_delay = avg_owd - getMinOwd();
+void RTCDataPath::computeInterArrivalGap(uint32_t segmentNumber){
+
+ //got packet in sequence, compute gap
+ if(lastRecvSeq_ == (segmentNumber - 1)){
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ uint64_t delta = now - lastRecvTime_;
+ lastRecvSeq_ = segmentNumber;
+ lastRecvTime_ = now;
+ if(avg_inter_arrival_ == DBL_MAX)
+ avg_inter_arrival_ = delta;
+ else
+ avg_inter_arrival_ = (avg_inter_arrival_ * (1 -ALPHA_RTC))
+ + (delta * ALPHA_RTC);
+ return;
+ }
+
+ //ooo packet, update the stasts if needed
+ if(lastRecvSeq_ <= segmentNumber){
+ lastRecvSeq_ = segmentNumber;
+ lastRecvTime_ = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ }
+}
+
+void RTCDataPath::receivedNack(){
+ received_nacks_ = true;
+}
+
+double RTCDataPath::getInterArrivalGap(){
+ if(avg_inter_arrival_ == DBL_MAX)
+ return 0;
+ return avg_inter_arrival_;
+}
+
+bool RTCDataPath::isActive(){
+ if(received_nacks_ && rounds_without_packets_ < MAX_ROUNDS_WITHOUT_PKTS)
+ return true;
+ return false;
+}
- // reset min_rtt and add it to the history
+void RTCDataPath::roundEnd() {
+ // reset min_rtt and add it to the history
if (min_rtt != UINT_MAX) {
prev_min_rtt = min_rtt;
} else {
@@ -60,6 +118,9 @@ void RTCDataPath::roundEnd() {
min_rtt = prev_min_rtt;
}
+ if(min_rtt == 0)
+ min_rtt = 1;
+
RTThistory_.pushBack(min_rtt);
min_rtt = UINT_MAX;
@@ -70,8 +131,22 @@ void RTCDataPath::roundEnd() {
min_owd = prev_min_owd;
}
- OWDhistory_.pushBack(min_owd);
- min_owd = INT_MAX;
+ if (min_owd != INT_MAX) {
+ OWDhistory_.pushBack(min_owd);
+ min_owd = INT_MAX;
+
+ // compute queuing delay
+ queuing_delay = avg_owd - getMinOwd();
+
+ } else {
+ queuing_delay = 0.0;
+ }
+
+ if(!received_packets_)
+ rounds_without_packets_++;
+ else
+ rounds_without_packets_ = 0;
+ received_packets_ = false;
}
double RTCDataPath::getQueuingDealy() { return queuing_delay; }
diff --git a/libtransport/src/hicn/transport/protocols/rtc_data_path.h b/libtransport/src/hicn/transport/protocols/rtc_data_path.h
index b55139d52..c8a049368 100644
--- a/libtransport/src/hicn/transport/protocols/rtc_data_path.h
+++ b/libtransport/src/hicn/transport/protocols/rtc_data_path.h
@@ -33,10 +33,13 @@ class RTCDataPath {
public:
void insertRttSample(uint64_t rtt);
void insertOwdSample(int64_t owd);
+ void computeInterArrivalGap(uint32_t segmentNumber);
+ void receivedNack();
uint64_t getMinRtt();
-
double getQueuingDealy();
+ double getInterArrivalGap();
+ bool isActive();
void roundEnd();
@@ -53,6 +56,20 @@ class RTCDataPath {
double queuing_delay;
+ uint32_t lastRecvSeq_;
+ uint64_t lastRecvTime_;
+ double avg_inter_arrival_;
+
+ //flags to check if a path is active
+ //we considere a path active if it reaches a producer
+ //(not a cache) --aka we got at least one nack on this path--
+ //and if we receives packets
+ bool received_nacks_;
+ bool received_packets_;
+ uint8_t rounds_without_packets_; //if we don't get any packet
+ //for MAX_ROUNDS_WITHOUT_PKTS
+ //we consider the path inactive
+
utils::MinFilter<uint64_t> RTThistory_;
utils::MinFilter<int64_t> OWDhistory_;
};
diff --git a/libtransport/src/hicn/transport/protocols/statistics.h b/libtransport/src/hicn/transport/protocols/statistics.h
index 47d164158..d5e89b96d 100644
--- a/libtransport/src/hicn/transport/protocols/statistics.h
+++ b/libtransport/src/hicn/transport/protocols/statistics.h
@@ -33,7 +33,9 @@ class TransportStatistics {
average_rtt_(0),
avg_window_size_(0),
interest_tx_(0),
- alpha_(alpha) {}
+ alpha_(alpha),
+ loss_ratio_(0.0),
+ queuing_delay_(0.0) {}
TRANSPORT_ALWAYS_INLINE void updateRetxCount(uint64_t retx) {
retx_count_ += retx;
@@ -56,6 +58,14 @@ class TransportStatistics {
interest_tx_ += int_tx;
}
+ TRANSPORT_ALWAYS_INLINE void updateLossRatio(double loss_ratio) {
+ loss_ratio_ = loss_ratio;
+ }
+
+ TRANSPORT_ALWAYS_INLINE void updateQueuingDelay(double queuing_delay) {
+ queuing_delay_ = queuing_delay;
+ }
+
TRANSPORT_ALWAYS_INLINE uint64_t getRetxCount() const { return retx_count_; }
TRANSPORT_ALWAYS_INLINE uint64_t getBytesRecv() const {
@@ -72,12 +82,21 @@ class TransportStatistics {
return interest_tx_;
}
+ TRANSPORT_ALWAYS_INLINE double getLossRatio() const {
+ return loss_ratio_;
+ }
+
+ TRANSPORT_ALWAYS_INLINE double getQueuingDelay() const {
+ return queuing_delay_;
+ }
+
TRANSPORT_ALWAYS_INLINE void reset() {
retx_count_ = 0;
bytes_received_ = 0;
average_rtt_ = 0;
avg_window_size_ = 0;
interest_tx_ = 0;
+ loss_ratio_ = 0;
}
private:
@@ -87,6 +106,8 @@ class TransportStatistics {
double avg_window_size_;
uint64_t interest_tx_;
double alpha_;
+ double loss_ratio_;
+ double queuing_delay_;
};
} // end namespace protocol