From eaaff7fa111c821ed6710dec7b6c49c5ecac6ad4 Mon Sep 17 00:00:00 2001 From: michele papalini Date: Thu, 3 Oct 2019 12:09:01 +0200 Subject: [HICN-291] handle multiple paths in RTC Change-Id: I69d331aa6e953e802e2f4b3e60325f852941fd94 Signed-off-by: michele papalini --- libtransport/src/hicn/transport/protocols/rtc.cc | 372 ++++++++++++++++------- 1 file changed, 269 insertions(+), 103 deletions(-) (limited to 'libtransport/src/hicn/transport/protocols/rtc.cc') diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc index 5ff0126b0..070ce2c6a 100644 --- a/libtransport/src/hicn/transport/protocols/rtc.cc +++ b/libtransport/src/hicn/transport/protocols/rtc.cc @@ -14,6 +14,7 @@ */ #include +#include #include #include @@ -32,6 +33,7 @@ RTCTransportProtocol::RTCTransportProtocol( icnet_socket->getSocketOption(PORTAL, portal_); nack_timer_ = std::make_unique(portal_->getIoService()); rtx_timer_ = std::make_unique(portal_->getIoService()); + probe_timer_ = std::make_unique(portal_->getIoService()); nack_timer_used_ = false; reset(); } @@ -43,7 +45,7 @@ RTCTransportProtocol::~RTCTransportProtocol() { } int RTCTransportProtocol::start() { - checkRtx(); + probeRtt(); return TransportProtocol::start(); } @@ -62,8 +64,8 @@ void RTCTransportProtocol::resume() { lastRoundBegin_ = std::chrono::steady_clock::now(); inflightInterestsCount_ = 0; + probeRtt(); scheduleNextInterests(); - checkRtx(); portal_->runEventsLoop(); @@ -87,10 +89,11 @@ void RTCTransportProtocol::reset() { interestRetransmissions_.clear(); lastSegNacked_ = 0; lastReceived_ = 0; - nackedByProducer_.clear(); - nackedByProducerMaxSize_ = 512; + highestReceived_ = 0; + firstSequenceInRound_ = 0; nack_timer_used_ = false; + rtx_timer_used_ = false; for(int i = 0; i < (1 << default_values::log_2_default_buffer_size); i++){ inflightInterests_[i] = {0}; } @@ -100,12 +103,12 @@ void RTCTransportProtocol::reset() { sentInterest_ = 0; receivedData_ = 0; packetLost_ = 0; + lossRecovered_ = 0; avgPacketSize_ = HICN_INIT_PACKET_SIZE; gotNack_ = false; gotFutureNack_ = 0; roundsWithoutNacks_ = 0; pathTable_.clear(); - minRtt_ = UINT_MAX; // CC var estimatedBw_ = 0.0; @@ -113,7 +116,10 @@ void RTCTransportProtocol::reset() { queuingDelay_ = 0.0; protocolState_ = HICN_RTC_NORMAL_STATE; - producerPathLabel_ = 0; + producerPathLabels_[0] = 0; + producerPathLabels_[1] = 0; + initied = false; + socket_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, (uint32_t)HICN_RTC_INTEREST_LIFETIME); // XXX this should be done by the application @@ -183,10 +189,16 @@ void RTCTransportProtocol::updateDelayStats( *senderTimeStamp; pathTable_[pathLabel]->insertOwdSample(OWD); + pathTable_[pathLabel]->computeInterArrivalGap(segmentNumber); + }else{ + pathTable_[pathLabel]->receivedNack(); } } void RTCTransportProtocol::updateStats(uint32_t round_duration) { + if(pathTable_.empty()) + return; + if (receivedBytes_ != 0) { double bytesPerSec = (double)(receivedBytes_ * @@ -195,33 +207,78 @@ void RTCTransportProtocol::updateStats(uint32_t round_duration) { ((1 - HICN_ESTIMATED_BW_ALPHA) * bytesPerSec); } - auto it = pathTable_.find(producerPathLabel_); - if (it == pathTable_.end()) return; - - minRtt_ = it->second->getMinRtt(); - queuingDelay_ = it->second->getQueuingDealy(); - - if (minRtt_ == 0) minRtt_ = 1; + uint64_t minRtt = UINT_MAX; + uint64_t maxRtt = 0; for (auto it = pathTable_.begin(); it != pathTable_.end(); it++) { it->second->roundEnd(); + if(it->second->isActive()){ + if(it->second->getMinRtt() < minRtt){ + minRtt = it->second->getMinRtt(); + producerPathLabels_[0] = it->first; + } + if(it->second->getMinRtt() > maxRtt){ + maxRtt = it->second->getMinRtt(); + producerPathLabels_[1] = it->first; + } + } } + if(pathTable_.find(producerPathLabels_[0]) == pathTable_.end() || + pathTable_.find(producerPathLabels_[1]) == pathTable_.end()) + return; //this should not happen + + //as a queuing delay we keep the lowest one among the two paths + //if one path is congested the forwarder should decide to do not + //use it soon so it does not make sens to inform the application + //that maybe we have a problem + if(pathTable_[producerPathLabels_[0]]->getQueuingDealy() < + pathTable_[producerPathLabels_[1]]->getQueuingDealy()) + queuingDelay_ = pathTable_[producerPathLabels_[0]]->getQueuingDealy(); + else + queuingDelay_ = pathTable_[producerPathLabels_[1]]->getQueuingDealy(); + if (sentInterest_ != 0 && currentState_ == HICN_RTC_NORMAL_STATE) { - double lossRate = (double)((double)packetLost_ / (double)sentInterest_); - lossRate_ = lossRate_ * HICN_ESTIMATED_LOSSES_ALPHA + + uint32_t numberTheoricallyReceivedPackets_ = highestReceived_ - firstSequenceInRound_; + double lossRate = 0; + if(numberTheoricallyReceivedPackets_ != 0) + lossRate = (double)((double)(packetLost_ - lossRecovered_) / (double)numberTheoricallyReceivedPackets_); + + if(lossRate < 0) + lossRate = 0; + + if(initied){ + lossRate_ = lossRate_ * HICN_ESTIMATED_LOSSES_ALPHA + (lossRate * (1 - HICN_ESTIMATED_LOSSES_ALPHA)); + }else { + lossRate_ =lossRate; + initied = true; + } } if (avgPacketSize_ == 0) avgPacketSize_ = HICN_INIT_PACKET_SIZE; + //for the BDP we use the max rtt, so that we calibrate the window on the + //RTT of the slowest path. In this way we are sure that the window will + //never be too small uint32_t BDP = (uint32_t)ceil( - (estimatedBw_ * (double)((double)minRtt_ / (double)HICN_MILLI_IN_A_SEC) * - HICN_BANDWIDTH_SLACK_FACTOR) / - avgPacketSize_); + (estimatedBw_ * (double)((double) pathTable_[producerPathLabels_[1]]->getMinRtt() / + (double)HICN_MILLI_IN_A_SEC) * + HICN_BANDWIDTH_SLACK_FACTOR) / + avgPacketSize_); uint32_t BW = (uint32_t)ceil(estimatedBw_); computeMaxWindow(BW, BDP); + ConsumerTimerCallback *stats_callback = nullptr; + socket_->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY, + &stats_callback); + if (*stats_callback != VOID_HANDLER) { + //Send the stats to the app + stats_.updateQueuingDelay(queuingDelay_); + stats_.updateLossRatio(lossRate_); + (*stats_callback)(*socket_, stats_); + } + // bound also by interest lifitime* production rate if (!gotNack_) { roundsWithoutNacks_++; @@ -244,6 +301,8 @@ void RTCTransportProtocol::updateStats(uint32_t round_duration) { sentInterest_ = 0; receivedData_ = 0; packetLost_ = 0; + lossRecovered_ = 0; + firstSequenceInRound_ = highestReceived_; } void RTCTransportProtocol::updateCCState() { @@ -320,6 +379,33 @@ void RTCTransportProtocol::increaseWindow() { } } +void RTCTransportProtocol::probeRtt(){ + time_sent_probe_ = std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + + Name *interest_name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, + &interest_name); + //get a random numbe in the probe seq range + std::default_random_engine eng((std::random_device())()); + std::uniform_int_distribution idis( + HICN_MIN_PROBE_SEQ, HICN_MAX_PROBE_SEQ); + probe_seq_number_ = idis(eng); + interest_name->setSuffix(probe_seq_number_); + + //we considere the probe as a rtx so that we do not incresea inFlightInt + received_probe_ = false; + sendInterest(interest_name, true); + + probe_timer_->expires_from_now(std::chrono::milliseconds(1000)); + probe_timer_->async_wait([this](std::error_code ec) { + if (ec) return; + probeRtt(); + }); +} + + void RTCTransportProtocol::sendInterest(Name *interest_name, bool rtx) { auto interest = getPacket(); interest->setName(*interest_name); @@ -363,7 +449,7 @@ void RTCTransportProtocol::scheduleNextInterests() { // we send the packet only if it is not pending yet interest_name->setSuffix(actualSegment_); if (portal_->interestIsPending(*interest_name)) { - actualSegment_++; + actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ; continue; } @@ -371,7 +457,14 @@ void RTCTransportProtocol::scheduleNextInterests() { // if we already reacevied the content we don't ask it again if (inflightInterests_[pkt].state == received_ && inflightInterests_[pkt].sequence == actualSegment_) { - actualSegment_++; + actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ; + continue; + } + + //same if the packet is lost + if (inflightInterests_[pkt].state == lost_ && + inflightInterests_[pkt].sequence == actualSegment_){ + actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ; continue; } @@ -379,9 +472,11 @@ void RTCTransportProtocol::scheduleNextInterests() { std::chrono::duration_cast( std::chrono::steady_clock::now().time_since_epoch()) .count(); + + //here the packet can be in any state except for lost or recevied inflightInterests_[pkt].state = sent_; inflightInterests_[pkt].sequence = actualSegment_; - actualSegment_++; + actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ; sendInterest(interest_name, false); checkRound(); @@ -394,20 +489,31 @@ void RTCTransportProtocol::addRetransmissions(uint32_t val) { } void RTCTransportProtocol::addRetransmissions(uint32_t start, uint32_t stop) { + uint64_t now = std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + for (uint32_t i = start; i < stop; i++) { auto it = interestRetransmissions_.find(i); if (it == interestRetransmissions_.end()) { if (lastSegNacked_ <= i) { // i must be larger than the last past nack received + packetLost_++; interestRetransmissions_[i] = 0; + uint32_t pkt = i & modMask_; + //we reset the transmission time setting to now, so that rtx will + //happne in one RTT on waint one inter arrival gap + inflightInterests_[pkt].transmissionTime = now; } } // if the retransmission is already there the rtx timer will // take care of it } - retransmit(true); + + if(!rtx_timer_used_) + checkRtx(); } -void RTCTransportProtocol::retransmit(bool first_rtx) { +void RTCTransportProtocol::retransmit() { auto it = interestRetransmissions_.begin(); // cut len to max HICN_MAX_RTX_SIZE @@ -441,51 +547,63 @@ void RTCTransportProtocol::retransmit(bool first_rtx) { continue; } - if (first_rtx) { - // TODO (optimization) - // the rtx that we never sent (it->second == 0) are all at the - // end, so we can go directly there - if (it->second == 0) { - inflightInterests_[pkt].transmissionTime = - std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); - it->second++; - Name *interest_name = nullptr; - socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, - &interest_name); - interest_name->setSuffix(it->first); - sendInterest(interest_name, true); + uint64_t sent_time = inflightInterests_[pkt].transmissionTime; + uint64_t rtx_time = sent_time; + + if(it->second == 0){ + if(pathTable_.find(producerPathLabels_[0]) != pathTable_.end() && + pathTable_.find(producerPathLabels_[1]) != pathTable_.end()){ + //first rtx: wait RTTmax - RTTmin + gap + rtx_time = sent_time + pathTable_[producerPathLabels_[1]]->getMinRtt() - + pathTable_[producerPathLabels_[0]]->getMinRtt() + + pathTable_[producerPathLabels_[1]]->getInterArrivalGap(); } - ++it; - } else { - // base on time - uint64_t now = std::chrono::duration_cast( + }else{ + if(pathTable_.find(producerPathLabels_[0]) != pathTable_.end()){ + //second+ rtx: waint min rtt + rtx_time = sent_time + pathTable_[producerPathLabels_[0]]->getMinRtt(); + } + } + + uint64_t now = std::chrono::duration_cast( std::chrono::steady_clock::now().time_since_epoch()) .count(); - if ((now - inflightInterests_[pkt].transmissionTime) > 20) { - // XXX replace 20 with rtt - inflightInterests_[pkt].transmissionTime = - std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); - it->second++; - Name *interest_name = nullptr; - socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, + if(now >= rtx_time){ + inflightInterests_[pkt].transmissionTime = now; + it->second++; + + Name *interest_name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &interest_name); - interest_name->setSuffix(it->first); - sendInterest(interest_name, true); - } - ++it; + interest_name->setSuffix(it->first); + sendInterest(interest_name, true); } + + ++it; } } void RTCTransportProtocol::checkRtx() { - retransmit(false); - rtx_timer_->expires_from_now(std::chrono::milliseconds(20)); + if(interestRetransmissions_.empty()){ + rtx_timer_used_ = false; + return; + } + + //we use the packet intearriva time on the fastest path + //even if this stats should be the same on both + auto pathStats = pathTable_.find(producerPathLabels_[0]); + uint64_t wait = 1; + if(pathStats != pathTable_.end()){ + wait = floor(pathStats->second->getInterArrivalGap() / 2.0); + if(wait < 1) + wait = 1; + } + + rtx_timer_used_ = true; + retransmit(); + rtx_timer_->expires_from_now(std::chrono::milliseconds(wait)); rtx_timer_->async_wait([this](std::error_code ec) { if (ec) return; checkRtx(); @@ -555,63 +673,62 @@ bool RTCTransportProtocol::checkIfProducerIsActive( return true; } -void RTCTransportProtocol::onNack(const ContentObject &content_object) { +bool RTCTransportProtocol::onNack(const ContentObject &content_object, bool rtx) { uint32_t *payload = (uint32_t *)content_object.getPayload()->data(); uint32_t productionSeg = *payload; uint32_t productionRate = *(++payload); uint32_t nackSegment = content_object.getName().getSuffix(); - gotNack_ = true; - // we synch the estimated production rate with the actual one - estimatedBw_ = (double)productionRate; + bool old_nack = false; + + if(!rtx){ + gotNack_ = true; + // we synch the estimated production rate with the actual one + estimatedBw_ = (double)productionRate; + } if (productionSeg > nackSegment) { // we are asking for stuff produced in the past - actualSegment_ = max(productionSeg + 1, actualSegment_); - if (currentState_ == HICN_RTC_NORMAL_STATE) { - currentState_ = HICN_RTC_SYNC_STATE; + actualSegment_ = max(productionSeg + 1, actualSegment_) % HICN_MIN_PROBE_SEQ; + + if(!rtx) { + if (currentState_ == HICN_RTC_NORMAL_STATE) { + currentState_ = HICN_RTC_SYNC_STATE; + } + + computeMaxWindow(productionRate, 0); + increaseWindow(); } - computeMaxWindow(productionRate, 0); - increaseWindow(); + //we need to remove the rtx for packets with seq number + //< productionSeg + for(auto it = interestRetransmissions_.begin(); it != + interestRetransmissions_.end();){ + if(it->first < productionSeg) + it = interestRetransmissions_.erase(it); + else + ++it; + } - interestRetransmissions_.clear(); lastSegNacked_ = productionSeg; - - if (nackedByProducer_.size() >= nackedByProducerMaxSize_) - nackedByProducer_.erase(nackedByProducer_.begin()); - nackedByProducer_.insert(nackSegment); + old_nack = true; } else if (productionSeg < nackSegment) { - // we are asking stuff in the future - gotFutureNack_++; - - actualSegment_ = productionSeg + 1; + actualSegment_ = (productionSeg + 1) % HICN_MIN_PROBE_SEQ; - computeMaxWindow(productionRate, 0); - decreaseWindow(); + if(!rtx){ + // we are asking stuff in the future + gotFutureNack_++; + computeMaxWindow(productionRate, 0); + decreaseWindow(); - if (currentState_ == HICN_RTC_SYNC_STATE) { - currentState_ = HICN_RTC_NORMAL_STATE; + if (currentState_ == HICN_RTC_SYNC_STATE) { + currentState_ = HICN_RTC_NORMAL_STATE; + } } } // equal should not happen -} - -void RTCTransportProtocol::onNackForRtx(const ContentObject &content_object) { - uint32_t *payload = (uint32_t *)content_object.getPayload()->data(); - uint32_t productionSeg = *payload; - uint32_t nackSegment = content_object.getName().getSuffix(); - - if (productionSeg > nackSegment) { - // we are asking for stuff produced in the past - actualSegment_ = max(productionSeg + 1, actualSegment_); - - interestRetransmissions_.clear(); - lastSegNacked_ = productionSeg; - } else if (productionSeg < nackSegment) { - actualSegment_ = productionSeg + 1; - } // equal should not happen + return old_nack; } void RTCTransportProtocol::onContentObject( @@ -629,20 +746,63 @@ void RTCTransportProtocol::onContentObject( (*callback_content_object)(*socket_, *content_object); } + if(segmentNumber == probe_seq_number_){ + if(payload_size == HICN_NACK_HEADER_SIZE){ + if(!received_probe_){ + received_probe_ = true; + + uint32_t pathLabel = content_object->getPathLabel(); + if (pathTable_.find(pathLabel) == pathTable_.end()){ + //if this path does not exists we cannot create a new one so drop + return; + } + + //this is the expected probe, update the RTT and drop the packet + uint64_t RTT = std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count() - time_sent_probe_; + + pathTable_[pathLabel]->insertRttSample(RTT); + pathTable_[pathLabel]->receivedNack(); + return; + } + }else{ + //this should never happen + //don't know what to do, let's try to process it as normal packet + } + } + if (payload_size == HICN_NACK_HEADER_SIZE) { - // Nacks always come form the producer, so we set the producerPathLabel_; - producerPathLabel_ = content_object->getPathLabel(); schedule_next_interest = checkIfProducerIsActive(*content_object); + if (inflightInterests_[pkt].state == sent_) { inflightInterestsCount_--; - // if checkIfProducerIsActive returns false, we did all we need to do - // inside that function, no need to call onNack - if (schedule_next_interest) onNack(*content_object); - updateDelayStats(*content_object); - } else { - if (schedule_next_interest) onNackForRtx(*content_object); } + // if checkIfProducerIsActive returns false, we did all we need to do + // inside that function, no need to call onNack + bool old_nack = false; + + if (schedule_next_interest){ + if (interestRetransmissions_.find(segmentNumber) == + interestRetransmissions_.end()){ + //this is not a retransmitted packet + old_nack = onNack(*content_object, false); + updateDelayStats(*content_object); + } else { + old_nack = onNack(*content_object, true); + } + } + + //the nacked_ state is used only to avoid to decrease inflightInterestsCount_ + //multiple times. In fact, every time that we receive an event related to an + //interest (timeout, nacked, content) we cange the state. In this way we are + //sure that we do not decrease twice the counter + if(old_nack) + inflightInterests_[pkt].state = lost_; + else + inflightInterests_[pkt].state = nacked_; + } else { avgPacketSize_ = (HICN_ESTIMATED_PACKET_SIZE * avgPacketSize_) + ((1 - HICN_ESTIMATED_PACKET_SIZE) * payload->length()); @@ -661,6 +821,8 @@ void RTCTransportProtocol::onContentObject( updateDelayStats(*content_object); addRetransmissions(lastReceived_ + 1, segmentNumber); + if(segmentNumber > highestReceived_) + highestReceived_ = segmentNumber; // lastReceived_ is updated only for data packets received without RTX lastReceived_ = segmentNumber; } @@ -673,6 +835,10 @@ void RTCTransportProtocol::onContentObject( } // in any case we remove the packet from the rtx list + auto it = interestRetransmissions_.find(segmentNumber); + if(it != interestRetransmissions_.end()) + lossRecovered_ ++; + interestRetransmissions_.erase(segmentNumber); if (schedule_next_interest) { -- cgit 1.2.3-korg