aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/hicn/transport/protocols/rtc.cc
diff options
context:
space:
mode:
authormichele papalini <micpapal@cisco.com>2019-10-03 12:09:01 +0200
committermichele papalini <micpapal@cisco.com>2019-10-03 13:23:48 +0200
commiteaaff7fa111c821ed6710dec7b6c49c5ecac6ad4 (patch)
tree5faa53d3e0f73f703b0526f2c1295f1359d96062 /libtransport/src/hicn/transport/protocols/rtc.cc
parent523f12c368a3067527546efae5922150281ac203 (diff)
[HICN-291] handle multiple paths in RTC
Change-Id: I69d331aa6e953e802e2f4b3e60325f852941fd94 Signed-off-by: michele papalini <micpapal@cisco.com>
Diffstat (limited to 'libtransport/src/hicn/transport/protocols/rtc.cc')
-rw-r--r--libtransport/src/hicn/transport/protocols/rtc.cc372
1 files changed, 269 insertions, 103 deletions
diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc
index 5ff0126b0..070ce2c6a 100644
--- a/libtransport/src/hicn/transport/protocols/rtc.cc
+++ b/libtransport/src/hicn/transport/protocols/rtc.cc
@@ -14,6 +14,7 @@
*/
#include <math.h>
+#include <random>
#include <hicn/transport/interfaces/socket_consumer.h>
#include <hicn/transport/protocols/rtc.h>
@@ -32,6 +33,7 @@ RTCTransportProtocol::RTCTransportProtocol(
icnet_socket->getSocketOption(PORTAL, portal_);
nack_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
rtx_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
+ probe_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
nack_timer_used_ = false;
reset();
}
@@ -43,7 +45,7 @@ RTCTransportProtocol::~RTCTransportProtocol() {
}
int RTCTransportProtocol::start() {
- checkRtx();
+ probeRtt();
return TransportProtocol::start();
}
@@ -62,8 +64,8 @@ void RTCTransportProtocol::resume() {
lastRoundBegin_ = std::chrono::steady_clock::now();
inflightInterestsCount_ = 0;
+ probeRtt();
scheduleNextInterests();
- checkRtx();
portal_->runEventsLoop();
@@ -87,10 +89,11 @@ void RTCTransportProtocol::reset() {
interestRetransmissions_.clear();
lastSegNacked_ = 0;
lastReceived_ = 0;
- nackedByProducer_.clear();
- nackedByProducerMaxSize_ = 512;
+ highestReceived_ = 0;
+ firstSequenceInRound_ = 0;
nack_timer_used_ = false;
+ rtx_timer_used_ = false;
for(int i = 0; i < (1 << default_values::log_2_default_buffer_size); i++){
inflightInterests_[i] = {0};
}
@@ -100,12 +103,12 @@ void RTCTransportProtocol::reset() {
sentInterest_ = 0;
receivedData_ = 0;
packetLost_ = 0;
+ lossRecovered_ = 0;
avgPacketSize_ = HICN_INIT_PACKET_SIZE;
gotNack_ = false;
gotFutureNack_ = 0;
roundsWithoutNacks_ = 0;
pathTable_.clear();
- minRtt_ = UINT_MAX;
// CC var
estimatedBw_ = 0.0;
@@ -113,7 +116,10 @@ void RTCTransportProtocol::reset() {
queuingDelay_ = 0.0;
protocolState_ = HICN_RTC_NORMAL_STATE;
- producerPathLabel_ = 0;
+ producerPathLabels_[0] = 0;
+ producerPathLabels_[1] = 0;
+ initied = false;
+
socket_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
(uint32_t)HICN_RTC_INTEREST_LIFETIME);
// XXX this should be done by the application
@@ -183,10 +189,16 @@ void RTCTransportProtocol::updateDelayStats(
*senderTimeStamp;
pathTable_[pathLabel]->insertOwdSample(OWD);
+ pathTable_[pathLabel]->computeInterArrivalGap(segmentNumber);
+ }else{
+ pathTable_[pathLabel]->receivedNack();
}
}
void RTCTransportProtocol::updateStats(uint32_t round_duration) {
+ if(pathTable_.empty())
+ return;
+
if (receivedBytes_ != 0) {
double bytesPerSec =
(double)(receivedBytes_ *
@@ -195,33 +207,78 @@ void RTCTransportProtocol::updateStats(uint32_t round_duration) {
((1 - HICN_ESTIMATED_BW_ALPHA) * bytesPerSec);
}
- auto it = pathTable_.find(producerPathLabel_);
- if (it == pathTable_.end()) return;
-
- minRtt_ = it->second->getMinRtt();
- queuingDelay_ = it->second->getQueuingDealy();
-
- if (minRtt_ == 0) minRtt_ = 1;
+ uint64_t minRtt = UINT_MAX;
+ uint64_t maxRtt = 0;
for (auto it = pathTable_.begin(); it != pathTable_.end(); it++) {
it->second->roundEnd();
+ if(it->second->isActive()){
+ if(it->second->getMinRtt() < minRtt){
+ minRtt = it->second->getMinRtt();
+ producerPathLabels_[0] = it->first;
+ }
+ if(it->second->getMinRtt() > maxRtt){
+ maxRtt = it->second->getMinRtt();
+ producerPathLabels_[1] = it->first;
+ }
+ }
}
+ if(pathTable_.find(producerPathLabels_[0]) == pathTable_.end() ||
+ pathTable_.find(producerPathLabels_[1]) == pathTable_.end())
+ return; //this should not happen
+
+ //as a queuing delay we keep the lowest one among the two paths
+ //if one path is congested the forwarder should decide to do not
+ //use it soon so it does not make sens to inform the application
+ //that maybe we have a problem
+ if(pathTable_[producerPathLabels_[0]]->getQueuingDealy() <
+ pathTable_[producerPathLabels_[1]]->getQueuingDealy())
+ queuingDelay_ = pathTable_[producerPathLabels_[0]]->getQueuingDealy();
+ else
+ queuingDelay_ = pathTable_[producerPathLabels_[1]]->getQueuingDealy();
+
if (sentInterest_ != 0 && currentState_ == HICN_RTC_NORMAL_STATE) {
- double lossRate = (double)((double)packetLost_ / (double)sentInterest_);
- lossRate_ = lossRate_ * HICN_ESTIMATED_LOSSES_ALPHA +
+ uint32_t numberTheoricallyReceivedPackets_ = highestReceived_ - firstSequenceInRound_;
+ double lossRate = 0;
+ if(numberTheoricallyReceivedPackets_ != 0)
+ lossRate = (double)((double)(packetLost_ - lossRecovered_) / (double)numberTheoricallyReceivedPackets_);
+
+ if(lossRate < 0)
+ lossRate = 0;
+
+ if(initied){
+ lossRate_ = lossRate_ * HICN_ESTIMATED_LOSSES_ALPHA +
(lossRate * (1 - HICN_ESTIMATED_LOSSES_ALPHA));
+ }else {
+ lossRate_ =lossRate;
+ initied = true;
+ }
}
if (avgPacketSize_ == 0) avgPacketSize_ = HICN_INIT_PACKET_SIZE;
+ //for the BDP we use the max rtt, so that we calibrate the window on the
+ //RTT of the slowest path. In this way we are sure that the window will
+ //never be too small
uint32_t BDP = (uint32_t)ceil(
- (estimatedBw_ * (double)((double)minRtt_ / (double)HICN_MILLI_IN_A_SEC) *
- HICN_BANDWIDTH_SLACK_FACTOR) /
- avgPacketSize_);
+ (estimatedBw_ * (double)((double) pathTable_[producerPathLabels_[1]]->getMinRtt() /
+ (double)HICN_MILLI_IN_A_SEC) *
+ HICN_BANDWIDTH_SLACK_FACTOR) /
+ avgPacketSize_);
uint32_t BW = (uint32_t)ceil(estimatedBw_);
computeMaxWindow(BW, BDP);
+ ConsumerTimerCallback *stats_callback = nullptr;
+ socket_->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY,
+ &stats_callback);
+ if (*stats_callback != VOID_HANDLER) {
+ //Send the stats to the app
+ stats_.updateQueuingDelay(queuingDelay_);
+ stats_.updateLossRatio(lossRate_);
+ (*stats_callback)(*socket_, stats_);
+ }
+
// bound also by interest lifitime* production rate
if (!gotNack_) {
roundsWithoutNacks_++;
@@ -244,6 +301,8 @@ void RTCTransportProtocol::updateStats(uint32_t round_duration) {
sentInterest_ = 0;
receivedData_ = 0;
packetLost_ = 0;
+ lossRecovered_ = 0;
+ firstSequenceInRound_ = highestReceived_;
}
void RTCTransportProtocol::updateCCState() {
@@ -320,6 +379,33 @@ void RTCTransportProtocol::increaseWindow() {
}
}
+void RTCTransportProtocol::probeRtt(){
+ time_sent_probe_ = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+
+ Name *interest_name = nullptr;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
+ &interest_name);
+ //get a random numbe in the probe seq range
+ std::default_random_engine eng((std::random_device())());
+ std::uniform_int_distribution<uint32_t> idis(
+ HICN_MIN_PROBE_SEQ, HICN_MAX_PROBE_SEQ);
+ probe_seq_number_ = idis(eng);
+ interest_name->setSuffix(probe_seq_number_);
+
+ //we considere the probe as a rtx so that we do not incresea inFlightInt
+ received_probe_ = false;
+ sendInterest(interest_name, true);
+
+ probe_timer_->expires_from_now(std::chrono::milliseconds(1000));
+ probe_timer_->async_wait([this](std::error_code ec) {
+ if (ec) return;
+ probeRtt();
+ });
+}
+
+
void RTCTransportProtocol::sendInterest(Name *interest_name, bool rtx) {
auto interest = getPacket();
interest->setName(*interest_name);
@@ -363,7 +449,7 @@ void RTCTransportProtocol::scheduleNextInterests() {
// we send the packet only if it is not pending yet
interest_name->setSuffix(actualSegment_);
if (portal_->interestIsPending(*interest_name)) {
- actualSegment_++;
+ actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ;
continue;
}
@@ -371,7 +457,14 @@ void RTCTransportProtocol::scheduleNextInterests() {
// if we already reacevied the content we don't ask it again
if (inflightInterests_[pkt].state == received_ &&
inflightInterests_[pkt].sequence == actualSegment_) {
- actualSegment_++;
+ actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ;
+ continue;
+ }
+
+ //same if the packet is lost
+ if (inflightInterests_[pkt].state == lost_ &&
+ inflightInterests_[pkt].sequence == actualSegment_){
+ actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ;
continue;
}
@@ -379,9 +472,11 @@ void RTCTransportProtocol::scheduleNextInterests() {
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
+
+ //here the packet can be in any state except for lost or recevied
inflightInterests_[pkt].state = sent_;
inflightInterests_[pkt].sequence = actualSegment_;
- actualSegment_++;
+ actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ;
sendInterest(interest_name, false);
checkRound();
@@ -394,20 +489,31 @@ void RTCTransportProtocol::addRetransmissions(uint32_t val) {
}
void RTCTransportProtocol::addRetransmissions(uint32_t start, uint32_t stop) {
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+
for (uint32_t i = start; i < stop; i++) {
auto it = interestRetransmissions_.find(i);
if (it == interestRetransmissions_.end()) {
if (lastSegNacked_ <= i) {
// i must be larger than the last past nack received
+ packetLost_++;
interestRetransmissions_[i] = 0;
+ uint32_t pkt = i & modMask_;
+ //we reset the transmission time setting to now, so that rtx will
+ //happne in one RTT on waint one inter arrival gap
+ inflightInterests_[pkt].transmissionTime = now;
}
} // if the retransmission is already there the rtx timer will
// take care of it
}
- retransmit(true);
+
+ if(!rtx_timer_used_)
+ checkRtx();
}
-void RTCTransportProtocol::retransmit(bool first_rtx) {
+void RTCTransportProtocol::retransmit() {
auto it = interestRetransmissions_.begin();
// cut len to max HICN_MAX_RTX_SIZE
@@ -441,51 +547,63 @@ void RTCTransportProtocol::retransmit(bool first_rtx) {
continue;
}
- if (first_rtx) {
- // TODO (optimization)
- // the rtx that we never sent (it->second == 0) are all at the
- // end, so we can go directly there
- if (it->second == 0) {
- inflightInterests_[pkt].transmissionTime =
- std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
- it->second++;
- Name *interest_name = nullptr;
- socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
- &interest_name);
- interest_name->setSuffix(it->first);
- sendInterest(interest_name, true);
+ uint64_t sent_time = inflightInterests_[pkt].transmissionTime;
+ uint64_t rtx_time = sent_time;
+
+ if(it->second == 0){
+ if(pathTable_.find(producerPathLabels_[0]) != pathTable_.end() &&
+ pathTable_.find(producerPathLabels_[1]) != pathTable_.end()){
+ //first rtx: wait RTTmax - RTTmin + gap
+ rtx_time = sent_time + pathTable_[producerPathLabels_[1]]->getMinRtt() -
+ pathTable_[producerPathLabels_[0]]->getMinRtt() +
+ pathTable_[producerPathLabels_[1]]->getInterArrivalGap();
}
- ++it;
- } else {
- // base on time
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ }else{
+ if(pathTable_.find(producerPathLabels_[0]) != pathTable_.end()){
+ //second+ rtx: waint min rtt
+ rtx_time = sent_time + pathTable_[producerPathLabels_[0]]->getMinRtt();
+ }
+ }
+
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
- if ((now - inflightInterests_[pkt].transmissionTime) > 20) {
- // XXX replace 20 with rtt
- inflightInterests_[pkt].transmissionTime =
- std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
- it->second++;
- Name *interest_name = nullptr;
- socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
+ if(now >= rtx_time){
+ inflightInterests_[pkt].transmissionTime = now;
+ it->second++;
+
+ Name *interest_name = nullptr;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
&interest_name);
- interest_name->setSuffix(it->first);
- sendInterest(interest_name, true);
- }
- ++it;
+ interest_name->setSuffix(it->first);
+ sendInterest(interest_name, true);
}
+
+ ++it;
}
}
void RTCTransportProtocol::checkRtx() {
- retransmit(false);
- rtx_timer_->expires_from_now(std::chrono::milliseconds(20));
+ if(interestRetransmissions_.empty()){
+ rtx_timer_used_ = false;
+ return;
+ }
+
+ //we use the packet intearriva time on the fastest path
+ //even if this stats should be the same on both
+ auto pathStats = pathTable_.find(producerPathLabels_[0]);
+ uint64_t wait = 1;
+ if(pathStats != pathTable_.end()){
+ wait = floor(pathStats->second->getInterArrivalGap() / 2.0);
+ if(wait < 1)
+ wait = 1;
+ }
+
+ rtx_timer_used_ = true;
+ retransmit();
+ rtx_timer_->expires_from_now(std::chrono::milliseconds(wait));
rtx_timer_->async_wait([this](std::error_code ec) {
if (ec) return;
checkRtx();
@@ -555,63 +673,62 @@ bool RTCTransportProtocol::checkIfProducerIsActive(
return true;
}
-void RTCTransportProtocol::onNack(const ContentObject &content_object) {
+bool RTCTransportProtocol::onNack(const ContentObject &content_object, bool rtx) {
uint32_t *payload = (uint32_t *)content_object.getPayload()->data();
uint32_t productionSeg = *payload;
uint32_t productionRate = *(++payload);
uint32_t nackSegment = content_object.getName().getSuffix();
- gotNack_ = true;
- // we synch the estimated production rate with the actual one
- estimatedBw_ = (double)productionRate;
+ bool old_nack = false;
+
+ if(!rtx){
+ gotNack_ = true;
+ // we synch the estimated production rate with the actual one
+ estimatedBw_ = (double)productionRate;
+ }
if (productionSeg > nackSegment) {
// we are asking for stuff produced in the past
- actualSegment_ = max(productionSeg + 1, actualSegment_);
- if (currentState_ == HICN_RTC_NORMAL_STATE) {
- currentState_ = HICN_RTC_SYNC_STATE;
+ actualSegment_ = max(productionSeg + 1, actualSegment_) % HICN_MIN_PROBE_SEQ;
+
+ if(!rtx) {
+ if (currentState_ == HICN_RTC_NORMAL_STATE) {
+ currentState_ = HICN_RTC_SYNC_STATE;
+ }
+
+ computeMaxWindow(productionRate, 0);
+ increaseWindow();
}
- computeMaxWindow(productionRate, 0);
- increaseWindow();
+ //we need to remove the rtx for packets with seq number
+ //< productionSeg
+ for(auto it = interestRetransmissions_.begin(); it !=
+ interestRetransmissions_.end();){
+ if(it->first < productionSeg)
+ it = interestRetransmissions_.erase(it);
+ else
+ ++it;
+ }
- interestRetransmissions_.clear();
lastSegNacked_ = productionSeg;
-
- if (nackedByProducer_.size() >= nackedByProducerMaxSize_)
- nackedByProducer_.erase(nackedByProducer_.begin());
- nackedByProducer_.insert(nackSegment);
+ old_nack = true;
} else if (productionSeg < nackSegment) {
- // we are asking stuff in the future
- gotFutureNack_++;
-
- actualSegment_ = productionSeg + 1;
+ actualSegment_ = (productionSeg + 1) % HICN_MIN_PROBE_SEQ;
- computeMaxWindow(productionRate, 0);
- decreaseWindow();
+ if(!rtx){
+ // we are asking stuff in the future
+ gotFutureNack_++;
+ computeMaxWindow(productionRate, 0);
+ decreaseWindow();
- if (currentState_ == HICN_RTC_SYNC_STATE) {
- currentState_ = HICN_RTC_NORMAL_STATE;
+ if (currentState_ == HICN_RTC_SYNC_STATE) {
+ currentState_ = HICN_RTC_NORMAL_STATE;
+ }
}
} // equal should not happen
-}
-
-void RTCTransportProtocol::onNackForRtx(const ContentObject &content_object) {
- uint32_t *payload = (uint32_t *)content_object.getPayload()->data();
- uint32_t productionSeg = *payload;
- uint32_t nackSegment = content_object.getName().getSuffix();
-
- if (productionSeg > nackSegment) {
- // we are asking for stuff produced in the past
- actualSegment_ = max(productionSeg + 1, actualSegment_);
-
- interestRetransmissions_.clear();
- lastSegNacked_ = productionSeg;
- } else if (productionSeg < nackSegment) {
- actualSegment_ = productionSeg + 1;
- } // equal should not happen
+ return old_nack;
}
void RTCTransportProtocol::onContentObject(
@@ -629,20 +746,63 @@ void RTCTransportProtocol::onContentObject(
(*callback_content_object)(*socket_, *content_object);
}
+ if(segmentNumber == probe_seq_number_){
+ if(payload_size == HICN_NACK_HEADER_SIZE){
+ if(!received_probe_){
+ received_probe_ = true;
+
+ uint32_t pathLabel = content_object->getPathLabel();
+ if (pathTable_.find(pathLabel) == pathTable_.end()){
+ //if this path does not exists we cannot create a new one so drop
+ return;
+ }
+
+ //this is the expected probe, update the RTT and drop the packet
+ uint64_t RTT = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count() - time_sent_probe_;
+
+ pathTable_[pathLabel]->insertRttSample(RTT);
+ pathTable_[pathLabel]->receivedNack();
+ return;
+ }
+ }else{
+ //this should never happen
+ //don't know what to do, let's try to process it as normal packet
+ }
+ }
+
if (payload_size == HICN_NACK_HEADER_SIZE) {
- // Nacks always come form the producer, so we set the producerPathLabel_;
- producerPathLabel_ = content_object->getPathLabel();
schedule_next_interest = checkIfProducerIsActive(*content_object);
+
if (inflightInterests_[pkt].state == sent_) {
inflightInterestsCount_--;
- // if checkIfProducerIsActive returns false, we did all we need to do
- // inside that function, no need to call onNack
- if (schedule_next_interest) onNack(*content_object);
- updateDelayStats(*content_object);
- } else {
- if (schedule_next_interest) onNackForRtx(*content_object);
}
+ // if checkIfProducerIsActive returns false, we did all we need to do
+ // inside that function, no need to call onNack
+ bool old_nack = false;
+
+ if (schedule_next_interest){
+ if (interestRetransmissions_.find(segmentNumber) ==
+ interestRetransmissions_.end()){
+ //this is not a retransmitted packet
+ old_nack = onNack(*content_object, false);
+ updateDelayStats(*content_object);
+ } else {
+ old_nack = onNack(*content_object, true);
+ }
+ }
+
+ //the nacked_ state is used only to avoid to decrease inflightInterestsCount_
+ //multiple times. In fact, every time that we receive an event related to an
+ //interest (timeout, nacked, content) we cange the state. In this way we are
+ //sure that we do not decrease twice the counter
+ if(old_nack)
+ inflightInterests_[pkt].state = lost_;
+ else
+ inflightInterests_[pkt].state = nacked_;
+
} else {
avgPacketSize_ = (HICN_ESTIMATED_PACKET_SIZE * avgPacketSize_) +
((1 - HICN_ESTIMATED_PACKET_SIZE) * payload->length());
@@ -661,6 +821,8 @@ void RTCTransportProtocol::onContentObject(
updateDelayStats(*content_object);
addRetransmissions(lastReceived_ + 1, segmentNumber);
+ if(segmentNumber > highestReceived_)
+ highestReceived_ = segmentNumber;
// lastReceived_ is updated only for data packets received without RTX
lastReceived_ = segmentNumber;
}
@@ -673,6 +835,10 @@ void RTCTransportProtocol::onContentObject(
}
// in any case we remove the packet from the rtx list
+ auto it = interestRetransmissions_.find(segmentNumber);
+ if(it != interestRetransmissions_.end())
+ lossRecovered_ ++;
+
interestRetransmissions_.erase(segmentNumber);
if (schedule_next_interest) {