From eaaff7fa111c821ed6710dec7b6c49c5ecac6ad4 Mon Sep 17 00:00:00 2001 From: michele papalini Date: Thu, 3 Oct 2019 12:09:01 +0200 Subject: [HICN-291] handle multiple paths in RTC Change-Id: I69d331aa6e953e802e2f4b3e60325f852941fd94 Signed-off-by: michele papalini --- libtransport/src/hicn/transport/protocols/rtc.cc | 372 +++++++++++++++------ libtransport/src/hicn/transport/protocols/rtc.h | 58 +++- .../src/hicn/transport/protocols/rtc_data_path.cc | 91 ++++- .../src/hicn/transport/protocols/rtc_data_path.h | 19 +- .../src/hicn/transport/protocols/statistics.h | 23 +- 5 files changed, 432 insertions(+), 131 deletions(-) (limited to 'libtransport/src/hicn/transport/protocols') diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc index 5ff0126b0..070ce2c6a 100644 --- a/libtransport/src/hicn/transport/protocols/rtc.cc +++ b/libtransport/src/hicn/transport/protocols/rtc.cc @@ -14,6 +14,7 @@ */ #include +#include #include #include @@ -32,6 +33,7 @@ RTCTransportProtocol::RTCTransportProtocol( icnet_socket->getSocketOption(PORTAL, portal_); nack_timer_ = std::make_unique(portal_->getIoService()); rtx_timer_ = std::make_unique(portal_->getIoService()); + probe_timer_ = std::make_unique(portal_->getIoService()); nack_timer_used_ = false; reset(); } @@ -43,7 +45,7 @@ RTCTransportProtocol::~RTCTransportProtocol() { } int RTCTransportProtocol::start() { - checkRtx(); + probeRtt(); return TransportProtocol::start(); } @@ -62,8 +64,8 @@ void RTCTransportProtocol::resume() { lastRoundBegin_ = std::chrono::steady_clock::now(); inflightInterestsCount_ = 0; + probeRtt(); scheduleNextInterests(); - checkRtx(); portal_->runEventsLoop(); @@ -87,10 +89,11 @@ void RTCTransportProtocol::reset() { interestRetransmissions_.clear(); lastSegNacked_ = 0; lastReceived_ = 0; - nackedByProducer_.clear(); - nackedByProducerMaxSize_ = 512; + highestReceived_ = 0; + firstSequenceInRound_ = 0; nack_timer_used_ = false; + rtx_timer_used_ = false; for(int i = 0; i < (1 << default_values::log_2_default_buffer_size); i++){ inflightInterests_[i] = {0}; } @@ -100,12 +103,12 @@ void RTCTransportProtocol::reset() { sentInterest_ = 0; receivedData_ = 0; packetLost_ = 0; + lossRecovered_ = 0; avgPacketSize_ = HICN_INIT_PACKET_SIZE; gotNack_ = false; gotFutureNack_ = 0; roundsWithoutNacks_ = 0; pathTable_.clear(); - minRtt_ = UINT_MAX; // CC var estimatedBw_ = 0.0; @@ -113,7 +116,10 @@ void RTCTransportProtocol::reset() { queuingDelay_ = 0.0; protocolState_ = HICN_RTC_NORMAL_STATE; - producerPathLabel_ = 0; + producerPathLabels_[0] = 0; + producerPathLabels_[1] = 0; + initied = false; + socket_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, (uint32_t)HICN_RTC_INTEREST_LIFETIME); // XXX this should be done by the application @@ -183,10 +189,16 @@ void RTCTransportProtocol::updateDelayStats( *senderTimeStamp; pathTable_[pathLabel]->insertOwdSample(OWD); + pathTable_[pathLabel]->computeInterArrivalGap(segmentNumber); + }else{ + pathTable_[pathLabel]->receivedNack(); } } void RTCTransportProtocol::updateStats(uint32_t round_duration) { + if(pathTable_.empty()) + return; + if (receivedBytes_ != 0) { double bytesPerSec = (double)(receivedBytes_ * @@ -195,33 +207,78 @@ void RTCTransportProtocol::updateStats(uint32_t round_duration) { ((1 - HICN_ESTIMATED_BW_ALPHA) * bytesPerSec); } - auto it = pathTable_.find(producerPathLabel_); - if (it == pathTable_.end()) return; - - minRtt_ = it->second->getMinRtt(); - queuingDelay_ = it->second->getQueuingDealy(); - - if (minRtt_ == 0) minRtt_ = 1; + uint64_t minRtt = UINT_MAX; + uint64_t maxRtt = 0; for (auto it = pathTable_.begin(); it != pathTable_.end(); it++) { it->second->roundEnd(); + if(it->second->isActive()){ + if(it->second->getMinRtt() < minRtt){ + minRtt = it->second->getMinRtt(); + producerPathLabels_[0] = it->first; + } + if(it->second->getMinRtt() > maxRtt){ + maxRtt = it->second->getMinRtt(); + producerPathLabels_[1] = it->first; + } + } } + if(pathTable_.find(producerPathLabels_[0]) == pathTable_.end() || + pathTable_.find(producerPathLabels_[1]) == pathTable_.end()) + return; //this should not happen + + //as a queuing delay we keep the lowest one among the two paths + //if one path is congested the forwarder should decide to do not + //use it soon so it does not make sens to inform the application + //that maybe we have a problem + if(pathTable_[producerPathLabels_[0]]->getQueuingDealy() < + pathTable_[producerPathLabels_[1]]->getQueuingDealy()) + queuingDelay_ = pathTable_[producerPathLabels_[0]]->getQueuingDealy(); + else + queuingDelay_ = pathTable_[producerPathLabels_[1]]->getQueuingDealy(); + if (sentInterest_ != 0 && currentState_ == HICN_RTC_NORMAL_STATE) { - double lossRate = (double)((double)packetLost_ / (double)sentInterest_); - lossRate_ = lossRate_ * HICN_ESTIMATED_LOSSES_ALPHA + + uint32_t numberTheoricallyReceivedPackets_ = highestReceived_ - firstSequenceInRound_; + double lossRate = 0; + if(numberTheoricallyReceivedPackets_ != 0) + lossRate = (double)((double)(packetLost_ - lossRecovered_) / (double)numberTheoricallyReceivedPackets_); + + if(lossRate < 0) + lossRate = 0; + + if(initied){ + lossRate_ = lossRate_ * HICN_ESTIMATED_LOSSES_ALPHA + (lossRate * (1 - HICN_ESTIMATED_LOSSES_ALPHA)); + }else { + lossRate_ =lossRate; + initied = true; + } } if (avgPacketSize_ == 0) avgPacketSize_ = HICN_INIT_PACKET_SIZE; + //for the BDP we use the max rtt, so that we calibrate the window on the + //RTT of the slowest path. In this way we are sure that the window will + //never be too small uint32_t BDP = (uint32_t)ceil( - (estimatedBw_ * (double)((double)minRtt_ / (double)HICN_MILLI_IN_A_SEC) * - HICN_BANDWIDTH_SLACK_FACTOR) / - avgPacketSize_); + (estimatedBw_ * (double)((double) pathTable_[producerPathLabels_[1]]->getMinRtt() / + (double)HICN_MILLI_IN_A_SEC) * + HICN_BANDWIDTH_SLACK_FACTOR) / + avgPacketSize_); uint32_t BW = (uint32_t)ceil(estimatedBw_); computeMaxWindow(BW, BDP); + ConsumerTimerCallback *stats_callback = nullptr; + socket_->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY, + &stats_callback); + if (*stats_callback != VOID_HANDLER) { + //Send the stats to the app + stats_.updateQueuingDelay(queuingDelay_); + stats_.updateLossRatio(lossRate_); + (*stats_callback)(*socket_, stats_); + } + // bound also by interest lifitime* production rate if (!gotNack_) { roundsWithoutNacks_++; @@ -244,6 +301,8 @@ void RTCTransportProtocol::updateStats(uint32_t round_duration) { sentInterest_ = 0; receivedData_ = 0; packetLost_ = 0; + lossRecovered_ = 0; + firstSequenceInRound_ = highestReceived_; } void RTCTransportProtocol::updateCCState() { @@ -320,6 +379,33 @@ void RTCTransportProtocol::increaseWindow() { } } +void RTCTransportProtocol::probeRtt(){ + time_sent_probe_ = std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + + Name *interest_name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, + &interest_name); + //get a random numbe in the probe seq range + std::default_random_engine eng((std::random_device())()); + std::uniform_int_distribution idis( + HICN_MIN_PROBE_SEQ, HICN_MAX_PROBE_SEQ); + probe_seq_number_ = idis(eng); + interest_name->setSuffix(probe_seq_number_); + + //we considere the probe as a rtx so that we do not incresea inFlightInt + received_probe_ = false; + sendInterest(interest_name, true); + + probe_timer_->expires_from_now(std::chrono::milliseconds(1000)); + probe_timer_->async_wait([this](std::error_code ec) { + if (ec) return; + probeRtt(); + }); +} + + void RTCTransportProtocol::sendInterest(Name *interest_name, bool rtx) { auto interest = getPacket(); interest->setName(*interest_name); @@ -363,7 +449,7 @@ void RTCTransportProtocol::scheduleNextInterests() { // we send the packet only if it is not pending yet interest_name->setSuffix(actualSegment_); if (portal_->interestIsPending(*interest_name)) { - actualSegment_++; + actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ; continue; } @@ -371,7 +457,14 @@ void RTCTransportProtocol::scheduleNextInterests() { // if we already reacevied the content we don't ask it again if (inflightInterests_[pkt].state == received_ && inflightInterests_[pkt].sequence == actualSegment_) { - actualSegment_++; + actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ; + continue; + } + + //same if the packet is lost + if (inflightInterests_[pkt].state == lost_ && + inflightInterests_[pkt].sequence == actualSegment_){ + actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ; continue; } @@ -379,9 +472,11 @@ void RTCTransportProtocol::scheduleNextInterests() { std::chrono::duration_cast( std::chrono::steady_clock::now().time_since_epoch()) .count(); + + //here the packet can be in any state except for lost or recevied inflightInterests_[pkt].state = sent_; inflightInterests_[pkt].sequence = actualSegment_; - actualSegment_++; + actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ; sendInterest(interest_name, false); checkRound(); @@ -394,20 +489,31 @@ void RTCTransportProtocol::addRetransmissions(uint32_t val) { } void RTCTransportProtocol::addRetransmissions(uint32_t start, uint32_t stop) { + uint64_t now = std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + for (uint32_t i = start; i < stop; i++) { auto it = interestRetransmissions_.find(i); if (it == interestRetransmissions_.end()) { if (lastSegNacked_ <= i) { // i must be larger than the last past nack received + packetLost_++; interestRetransmissions_[i] = 0; + uint32_t pkt = i & modMask_; + //we reset the transmission time setting to now, so that rtx will + //happne in one RTT on waint one inter arrival gap + inflightInterests_[pkt].transmissionTime = now; } } // if the retransmission is already there the rtx timer will // take care of it } - retransmit(true); + + if(!rtx_timer_used_) + checkRtx(); } -void RTCTransportProtocol::retransmit(bool first_rtx) { +void RTCTransportProtocol::retransmit() { auto it = interestRetransmissions_.begin(); // cut len to max HICN_MAX_RTX_SIZE @@ -441,51 +547,63 @@ void RTCTransportProtocol::retransmit(bool first_rtx) { continue; } - if (first_rtx) { - // TODO (optimization) - // the rtx that we never sent (it->second == 0) are all at the - // end, so we can go directly there - if (it->second == 0) { - inflightInterests_[pkt].transmissionTime = - std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); - it->second++; - Name *interest_name = nullptr; - socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, - &interest_name); - interest_name->setSuffix(it->first); - sendInterest(interest_name, true); + uint64_t sent_time = inflightInterests_[pkt].transmissionTime; + uint64_t rtx_time = sent_time; + + if(it->second == 0){ + if(pathTable_.find(producerPathLabels_[0]) != pathTable_.end() && + pathTable_.find(producerPathLabels_[1]) != pathTable_.end()){ + //first rtx: wait RTTmax - RTTmin + gap + rtx_time = sent_time + pathTable_[producerPathLabels_[1]]->getMinRtt() - + pathTable_[producerPathLabels_[0]]->getMinRtt() + + pathTable_[producerPathLabels_[1]]->getInterArrivalGap(); } - ++it; - } else { - // base on time - uint64_t now = std::chrono::duration_cast( + }else{ + if(pathTable_.find(producerPathLabels_[0]) != pathTable_.end()){ + //second+ rtx: waint min rtt + rtx_time = sent_time + pathTable_[producerPathLabels_[0]]->getMinRtt(); + } + } + + uint64_t now = std::chrono::duration_cast( std::chrono::steady_clock::now().time_since_epoch()) .count(); - if ((now - inflightInterests_[pkt].transmissionTime) > 20) { - // XXX replace 20 with rtt - inflightInterests_[pkt].transmissionTime = - std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); - it->second++; - Name *interest_name = nullptr; - socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, + if(now >= rtx_time){ + inflightInterests_[pkt].transmissionTime = now; + it->second++; + + Name *interest_name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &interest_name); - interest_name->setSuffix(it->first); - sendInterest(interest_name, true); - } - ++it; + interest_name->setSuffix(it->first); + sendInterest(interest_name, true); } + + ++it; } } void RTCTransportProtocol::checkRtx() { - retransmit(false); - rtx_timer_->expires_from_now(std::chrono::milliseconds(20)); + if(interestRetransmissions_.empty()){ + rtx_timer_used_ = false; + return; + } + + //we use the packet intearriva time on the fastest path + //even if this stats should be the same on both + auto pathStats = pathTable_.find(producerPathLabels_[0]); + uint64_t wait = 1; + if(pathStats != pathTable_.end()){ + wait = floor(pathStats->second->getInterArrivalGap() / 2.0); + if(wait < 1) + wait = 1; + } + + rtx_timer_used_ = true; + retransmit(); + rtx_timer_->expires_from_now(std::chrono::milliseconds(wait)); rtx_timer_->async_wait([this](std::error_code ec) { if (ec) return; checkRtx(); @@ -555,63 +673,62 @@ bool RTCTransportProtocol::checkIfProducerIsActive( return true; } -void RTCTransportProtocol::onNack(const ContentObject &content_object) { +bool RTCTransportProtocol::onNack(const ContentObject &content_object, bool rtx) { uint32_t *payload = (uint32_t *)content_object.getPayload()->data(); uint32_t productionSeg = *payload; uint32_t productionRate = *(++payload); uint32_t nackSegment = content_object.getName().getSuffix(); - gotNack_ = true; - // we synch the estimated production rate with the actual one - estimatedBw_ = (double)productionRate; + bool old_nack = false; + + if(!rtx){ + gotNack_ = true; + // we synch the estimated production rate with the actual one + estimatedBw_ = (double)productionRate; + } if (productionSeg > nackSegment) { // we are asking for stuff produced in the past - actualSegment_ = max(productionSeg + 1, actualSegment_); - if (currentState_ == HICN_RTC_NORMAL_STATE) { - currentState_ = HICN_RTC_SYNC_STATE; + actualSegment_ = max(productionSeg + 1, actualSegment_) % HICN_MIN_PROBE_SEQ; + + if(!rtx) { + if (currentState_ == HICN_RTC_NORMAL_STATE) { + currentState_ = HICN_RTC_SYNC_STATE; + } + + computeMaxWindow(productionRate, 0); + increaseWindow(); } - computeMaxWindow(productionRate, 0); - increaseWindow(); + //we need to remove the rtx for packets with seq number + //< productionSeg + for(auto it = interestRetransmissions_.begin(); it != + interestRetransmissions_.end();){ + if(it->first < productionSeg) + it = interestRetransmissions_.erase(it); + else + ++it; + } - interestRetransmissions_.clear(); lastSegNacked_ = productionSeg; - - if (nackedByProducer_.size() >= nackedByProducerMaxSize_) - nackedByProducer_.erase(nackedByProducer_.begin()); - nackedByProducer_.insert(nackSegment); + old_nack = true; } else if (productionSeg < nackSegment) { - // we are asking stuff in the future - gotFutureNack_++; - - actualSegment_ = productionSeg + 1; + actualSegment_ = (productionSeg + 1) % HICN_MIN_PROBE_SEQ; - computeMaxWindow(productionRate, 0); - decreaseWindow(); + if(!rtx){ + // we are asking stuff in the future + gotFutureNack_++; + computeMaxWindow(productionRate, 0); + decreaseWindow(); - if (currentState_ == HICN_RTC_SYNC_STATE) { - currentState_ = HICN_RTC_NORMAL_STATE; + if (currentState_ == HICN_RTC_SYNC_STATE) { + currentState_ = HICN_RTC_NORMAL_STATE; + } } } // equal should not happen -} - -void RTCTransportProtocol::onNackForRtx(const ContentObject &content_object) { - uint32_t *payload = (uint32_t *)content_object.getPayload()->data(); - uint32_t productionSeg = *payload; - uint32_t nackSegment = content_object.getName().getSuffix(); - - if (productionSeg > nackSegment) { - // we are asking for stuff produced in the past - actualSegment_ = max(productionSeg + 1, actualSegment_); - - interestRetransmissions_.clear(); - lastSegNacked_ = productionSeg; - } else if (productionSeg < nackSegment) { - actualSegment_ = productionSeg + 1; - } // equal should not happen + return old_nack; } void RTCTransportProtocol::onContentObject( @@ -629,20 +746,63 @@ void RTCTransportProtocol::onContentObject( (*callback_content_object)(*socket_, *content_object); } + if(segmentNumber == probe_seq_number_){ + if(payload_size == HICN_NACK_HEADER_SIZE){ + if(!received_probe_){ + received_probe_ = true; + + uint32_t pathLabel = content_object->getPathLabel(); + if (pathTable_.find(pathLabel) == pathTable_.end()){ + //if this path does not exists we cannot create a new one so drop + return; + } + + //this is the expected probe, update the RTT and drop the packet + uint64_t RTT = std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count() - time_sent_probe_; + + pathTable_[pathLabel]->insertRttSample(RTT); + pathTable_[pathLabel]->receivedNack(); + return; + } + }else{ + //this should never happen + //don't know what to do, let's try to process it as normal packet + } + } + if (payload_size == HICN_NACK_HEADER_SIZE) { - // Nacks always come form the producer, so we set the producerPathLabel_; - producerPathLabel_ = content_object->getPathLabel(); schedule_next_interest = checkIfProducerIsActive(*content_object); + if (inflightInterests_[pkt].state == sent_) { inflightInterestsCount_--; - // if checkIfProducerIsActive returns false, we did all we need to do - // inside that function, no need to call onNack - if (schedule_next_interest) onNack(*content_object); - updateDelayStats(*content_object); - } else { - if (schedule_next_interest) onNackForRtx(*content_object); } + // if checkIfProducerIsActive returns false, we did all we need to do + // inside that function, no need to call onNack + bool old_nack = false; + + if (schedule_next_interest){ + if (interestRetransmissions_.find(segmentNumber) == + interestRetransmissions_.end()){ + //this is not a retransmitted packet + old_nack = onNack(*content_object, false); + updateDelayStats(*content_object); + } else { + old_nack = onNack(*content_object, true); + } + } + + //the nacked_ state is used only to avoid to decrease inflightInterestsCount_ + //multiple times. In fact, every time that we receive an event related to an + //interest (timeout, nacked, content) we cange the state. In this way we are + //sure that we do not decrease twice the counter + if(old_nack) + inflightInterests_[pkt].state = lost_; + else + inflightInterests_[pkt].state = nacked_; + } else { avgPacketSize_ = (HICN_ESTIMATED_PACKET_SIZE * avgPacketSize_) + ((1 - HICN_ESTIMATED_PACKET_SIZE) * payload->length()); @@ -661,6 +821,8 @@ void RTCTransportProtocol::onContentObject( updateDelayStats(*content_object); addRetransmissions(lastReceived_ + 1, segmentNumber); + if(segmentNumber > highestReceived_) + highestReceived_ = segmentNumber; // lastReceived_ is updated only for data packets received without RTX lastReceived_ = segmentNumber; } @@ -673,6 +835,10 @@ void RTCTransportProtocol::onContentObject( } // in any case we remove the packet from the rtx list + auto it = interestRetransmissions_.find(segmentNumber); + if(it != interestRetransmissions_.end()) + lossRecovered_ ++; + interestRetransmissions_.erase(segmentNumber); if (schedule_next_interest) { diff --git a/libtransport/src/hicn/transport/protocols/rtc.h b/libtransport/src/hicn/transport/protocols/rtc.h index 66ad05a88..770768e31 100644 --- a/libtransport/src/hicn/transport/protocols/rtc.h +++ b/libtransport/src/hicn/transport/protocols/rtc.h @@ -35,6 +35,13 @@ #define HICN_TIMESTAMP_SIZE 8 // bytes #define HICN_RTC_INTEREST_LIFETIME 1000 // ms +//rtt measurement +//normal interests for data goes from 0 to +//HICN_MIN_PROBE_SEQ, the rest is reserverd for +//probes +#define HICN_MIN_PROBE_SEQ 0xefffffff +#define HICN_MAX_PROBE_SEQ 0xffffffff + // controller constant #define HICN_ROUND_LEN 200 // ms interval of time on which // we take decisions / measurements @@ -62,12 +69,14 @@ #define HICN_MICRO_IN_A_SEC 1000000 #define HICN_MILLI_IN_A_SEC 1000 + namespace transport { namespace protocol { enum packetState { sent_, + nacked_, received_, timeout1_, timeout2_, @@ -115,16 +124,15 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly { void scheduleNextInterests() override; void addRetransmissions(uint32_t val); void addRetransmissions(uint32_t start, uint32_t stop); - void retransmit(bool first_rtx); + void retransmit(); void checkRtx(); + void probeRtt(); void onTimeout(Interest::Ptr &&interest) override; // checkIfProducerIsActive: return true if we need to schedule an interest // immediatly after, false otherwise (this happens when the producer socket // is not active) bool checkIfProducerIsActive(const ContentObject &content_object); - void onNack(const ContentObject &content_object); - //funtcion used to handle nacks for retransmitted interests - void onNackForRtx(const ContentObject &content_object); + bool onNack(const ContentObject &content_object, bool rtx); void onContentObject(Interest::Ptr &&interest, ContentObject::Ptr &&content_object) override; void returnContentToApplication(const ContentObject &content_object); @@ -148,48 +156,62 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly { //map seq to rtx std::map interestRetransmissions_; std::unique_ptr rtx_timer_; - //std::queue interestRetransmissions_; std::vector inflightInterests_; uint32_t lastSegNacked_; //indicates the segment id in the last received // past Nack. we do not ask for retransmissions //for samething that is older than this value. uint32_t lastReceived_; //segment of the last content object received //indicates the base of the window on the client - uint32_t nackedByProducerMaxSize_; - std::set - nackedByProducer_; // this is used to avoid retransmissions from the - // application for pakets for which we already got a - // past NACK by the producer these packet are too old, - // they will never be retrived + bool nack_timer_used_; + bool rtx_timer_used_; std::unique_ptr nack_timer_; // timer used to schedule - // a nack retransmission in + // a nack retransmission in case // of inactive prod socket + //rtt probes + //the RTC transport tends to overestimate the RTT + //du to the production time on the server side + //once per second we send an interest for wich we know + //we will get a nack. This nack will keep our estimation + //close to the reality + std::unique_ptr probe_timer_; + uint64_t time_sent_probe_; + uint32_t probe_seq_number_; + bool received_probe_; + uint32_t modMask_; // stats uint32_t receivedBytes_; uint32_t sentInterest_; uint32_t receivedData_; - uint32_t packetLost_; + int32_t packetLost_; + int32_t lossRecovered_; + uint32_t firstSequenceInRound_; + uint32_t highestReceived_; double avgPacketSize_; bool gotNack_; uint32_t gotFutureNack_; uint32_t roundsWithoutNacks_; - uint32_t producerPathLabel_; // XXX we pick only one path lable for the - // producer for now, assuming the usage of a - // single path this should be extended to a - // vector + + //we keep track of up two paths (if only one path is in use + //the two values in the vector will be the same) + //position 0 stores the path with minRTT + //position 1 stores the path with maxRTT + uint32_t producerPathLabels_[2]; + std::unordered_map> pathTable_; uint32_t roundCounter_; - uint64_t minRtt_; // CC var double estimatedBw_; double lossRate_; double queuingDelay_; unsigned protocolState_; + + bool initied; + TransportStatistics stats_; }; } // namespace protocol diff --git a/libtransport/src/hicn/transport/protocols/rtc_data_path.cc b/libtransport/src/hicn/transport/protocols/rtc_data_path.cc index 6c9605fb2..0cbff0e3c 100644 --- a/libtransport/src/hicn/transport/protocols/rtc_data_path.cc +++ b/libtransport/src/hicn/transport/protocols/rtc_data_path.cc @@ -14,6 +14,10 @@ */ #include +#include +#include + +#define MAX_ROUNDS_WITHOUT_PKTS 10 //2sec namespace transport { @@ -30,7 +34,13 @@ RTCDataPath::RTCDataPath() // (for congestion/quality control) prev_min_owd(INT_MAX), avg_owd(0.0), - queuing_delay(0.0), + queuing_delay(DBL_MAX), + lastRecvSeq_(0), + lastRecvTime_(0), + avg_inter_arrival_(DBL_MAX), + received_nacks_(false), + received_packets_(false), + rounds_without_packets_(0), RTThistory_(HISTORY_LEN), OWDhistory_(HISTORY_LEN){}; @@ -43,14 +53,62 @@ void RTCDataPath::insertOwdSample(int64_t owd) { // for owd we use both min and avg if (owd < min_owd) min_owd = owd; - avg_owd = (avg_owd * (1 - ALPHA_RTC)) + (owd * ALPHA_RTC); + if(avg_owd != DBL_MAX) + avg_owd = (avg_owd * (1 - ALPHA_RTC)) + (owd * ALPHA_RTC); + else { + avg_owd = owd; + } + + //owd is computed only for valid data packets so we count only + //this for decide if we recevie traffic or not + received_packets_ = true; } -void RTCDataPath::roundEnd() { - // compute queuing delay - queuing_delay = avg_owd - getMinOwd(); +void RTCDataPath::computeInterArrivalGap(uint32_t segmentNumber){ + + //got packet in sequence, compute gap + if(lastRecvSeq_ == (segmentNumber - 1)){ + uint64_t now = std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + uint64_t delta = now - lastRecvTime_; + lastRecvSeq_ = segmentNumber; + lastRecvTime_ = now; + if(avg_inter_arrival_ == DBL_MAX) + avg_inter_arrival_ = delta; + else + avg_inter_arrival_ = (avg_inter_arrival_ * (1 -ALPHA_RTC)) + + (delta * ALPHA_RTC); + return; + } + + //ooo packet, update the stasts if needed + if(lastRecvSeq_ <= segmentNumber){ + lastRecvSeq_ = segmentNumber; + lastRecvTime_ = std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + } +} + +void RTCDataPath::receivedNack(){ + received_nacks_ = true; +} + +double RTCDataPath::getInterArrivalGap(){ + if(avg_inter_arrival_ == DBL_MAX) + return 0; + return avg_inter_arrival_; +} + +bool RTCDataPath::isActive(){ + if(received_nacks_ && rounds_without_packets_ < MAX_ROUNDS_WITHOUT_PKTS) + return true; + return false; +} - // reset min_rtt and add it to the history +void RTCDataPath::roundEnd() { + // reset min_rtt and add it to the history if (min_rtt != UINT_MAX) { prev_min_rtt = min_rtt; } else { @@ -60,6 +118,9 @@ void RTCDataPath::roundEnd() { min_rtt = prev_min_rtt; } + if(min_rtt == 0) + min_rtt = 1; + RTThistory_.pushBack(min_rtt); min_rtt = UINT_MAX; @@ -70,8 +131,22 @@ void RTCDataPath::roundEnd() { min_owd = prev_min_owd; } - OWDhistory_.pushBack(min_owd); - min_owd = INT_MAX; + if (min_owd != INT_MAX) { + OWDhistory_.pushBack(min_owd); + min_owd = INT_MAX; + + // compute queuing delay + queuing_delay = avg_owd - getMinOwd(); + + } else { + queuing_delay = 0.0; + } + + if(!received_packets_) + rounds_without_packets_++; + else + rounds_without_packets_ = 0; + received_packets_ = false; } double RTCDataPath::getQueuingDealy() { return queuing_delay; } diff --git a/libtransport/src/hicn/transport/protocols/rtc_data_path.h b/libtransport/src/hicn/transport/protocols/rtc_data_path.h index b55139d52..c8a049368 100644 --- a/libtransport/src/hicn/transport/protocols/rtc_data_path.h +++ b/libtransport/src/hicn/transport/protocols/rtc_data_path.h @@ -33,10 +33,13 @@ class RTCDataPath { public: void insertRttSample(uint64_t rtt); void insertOwdSample(int64_t owd); + void computeInterArrivalGap(uint32_t segmentNumber); + void receivedNack(); uint64_t getMinRtt(); - double getQueuingDealy(); + double getInterArrivalGap(); + bool isActive(); void roundEnd(); @@ -53,6 +56,20 @@ class RTCDataPath { double queuing_delay; + uint32_t lastRecvSeq_; + uint64_t lastRecvTime_; + double avg_inter_arrival_; + + //flags to check if a path is active + //we considere a path active if it reaches a producer + //(not a cache) --aka we got at least one nack on this path-- + //and if we receives packets + bool received_nacks_; + bool received_packets_; + uint8_t rounds_without_packets_; //if we don't get any packet + //for MAX_ROUNDS_WITHOUT_PKTS + //we consider the path inactive + utils::MinFilter RTThistory_; utils::MinFilter OWDhistory_; }; diff --git a/libtransport/src/hicn/transport/protocols/statistics.h b/libtransport/src/hicn/transport/protocols/statistics.h index 47d164158..d5e89b96d 100644 --- a/libtransport/src/hicn/transport/protocols/statistics.h +++ b/libtransport/src/hicn/transport/protocols/statistics.h @@ -33,7 +33,9 @@ class TransportStatistics { average_rtt_(0), avg_window_size_(0), interest_tx_(0), - alpha_(alpha) {} + alpha_(alpha), + loss_ratio_(0.0), + queuing_delay_(0.0) {} TRANSPORT_ALWAYS_INLINE void updateRetxCount(uint64_t retx) { retx_count_ += retx; @@ -56,6 +58,14 @@ class TransportStatistics { interest_tx_ += int_tx; } + TRANSPORT_ALWAYS_INLINE void updateLossRatio(double loss_ratio) { + loss_ratio_ = loss_ratio; + } + + TRANSPORT_ALWAYS_INLINE void updateQueuingDelay(double queuing_delay) { + queuing_delay_ = queuing_delay; + } + TRANSPORT_ALWAYS_INLINE uint64_t getRetxCount() const { return retx_count_; } TRANSPORT_ALWAYS_INLINE uint64_t getBytesRecv() const { @@ -72,12 +82,21 @@ class TransportStatistics { return interest_tx_; } + TRANSPORT_ALWAYS_INLINE double getLossRatio() const { + return loss_ratio_; + } + + TRANSPORT_ALWAYS_INLINE double getQueuingDelay() const { + return queuing_delay_; + } + TRANSPORT_ALWAYS_INLINE void reset() { retx_count_ = 0; bytes_received_ = 0; average_rtt_ = 0; avg_window_size_ = 0; interest_tx_ = 0; + loss_ratio_ = 0; } private: @@ -87,6 +106,8 @@ class TransportStatistics { double avg_window_size_; uint64_t interest_tx_; double alpha_; + double loss_ratio_; + double queuing_delay_; }; } // end namespace protocol -- cgit 1.2.3-korg