From 755c6833ae2d2eee87e80ed3b84c75e968f48c46 Mon Sep 17 00:00:00 2001 From: Alberto Compagno Date: Tue, 15 Oct 2019 18:08:41 +0200 Subject: [HICN-328] Reworking setSocketOption and getSocketOption to be thread-safe Change-Id: Ie22572822b9ac1e6c300fd7982035c799546bd76 Signed-off-by: Alberto Compagno --- libtransport/src/hicn/transport/protocols/rtc.cc | 266 +++++++++++------------ 1 file changed, 133 insertions(+), 133 deletions(-) (limited to 'libtransport/src/hicn/transport/protocols/rtc.cc') diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc index 620523cbe..e6134f767 100644 --- a/libtransport/src/hicn/transport/protocols/rtc.cc +++ b/libtransport/src/hicn/transport/protocols/rtc.cc @@ -88,13 +88,13 @@ void RTCTransportProtocol::reset() { lastSegNacked_ = 0; lastReceived_ = 0; lastReceivedTime_ = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + std::chrono::steady_clock::now().time_since_epoch()) + .count(); highestReceived_ = 0; firstSequenceInRound_ = 0; rtx_timer_used_ = false; - for(int i = 0; i < (1 << default_values::log_2_default_buffer_size); i++){ + for (int i = 0; i < (1 << default_values::log_2_default_buffer_size); i++) { inflightInterests_[i] = {0}; } @@ -191,14 +191,13 @@ void RTCTransportProtocol::updateDelayStats( pathTable_[pathLabel]->insertOwdSample(OWD); pathTable_[pathLabel]->computeInterArrivalGap(segmentNumber); - }else{ + } else { pathTable_[pathLabel]->receivedNack(); } } void RTCTransportProtocol::updateStats(uint32_t round_duration) { - if(pathTable_.empty()) - return; + if (pathTable_.empty()) return; if (receivedBytes_ != 0) { double bytesPerSec = @@ -213,68 +212,70 @@ void RTCTransportProtocol::updateStats(uint32_t round_duration) { for (auto it = pathTable_.begin(); it != pathTable_.end(); it++) { it->second->roundEnd(); - if(it->second->isActive()){ - if(it->second->getMinRtt() < minRtt){ + if (it->second->isActive()) { + if (it->second->getMinRtt() < minRtt) { minRtt = it->second->getMinRtt(); producerPathLabels_[0] = it->first; } - if(it->second->getMinRtt() > maxRtt){ + if (it->second->getMinRtt() > maxRtt) { maxRtt = it->second->getMinRtt(); producerPathLabels_[1] = it->first; } } } - if(pathTable_.find(producerPathLabels_[0]) == pathTable_.end() || + if (pathTable_.find(producerPathLabels_[0]) == pathTable_.end() || pathTable_.find(producerPathLabels_[1]) == pathTable_.end()) - return; //this should not happen + return; // this should not happen - //as a queuing delay we keep the lowest one among the two paths - //if one path is congested the forwarder should decide to do not - //use it so it does not make sense to inform the application - //that maybe we have a problem - if(pathTable_[producerPathLabels_[0]]->getQueuingDealy() < + // as a queuing delay we keep the lowest one among the two paths + // if one path is congested the forwarder should decide to do not + // use it so it does not make sense to inform the application + // that maybe we have a problem + if (pathTable_[producerPathLabels_[0]]->getQueuingDealy() < pathTable_[producerPathLabels_[1]]->getQueuingDealy()) - queuingDelay_ = pathTable_[producerPathLabels_[0]]->getQueuingDealy(); + queuingDelay_ = pathTable_[producerPathLabels_[0]]->getQueuingDealy(); else - queuingDelay_ = pathTable_[producerPathLabels_[1]]->getQueuingDealy(); + queuingDelay_ = pathTable_[producerPathLabels_[1]]->getQueuingDealy(); if (sentInterest_ != 0 && currentState_ == HICN_RTC_NORMAL_STATE) { - uint32_t numberTheoricallyReceivedPackets_ = highestReceived_ - firstSequenceInRound_; + uint32_t numberTheoricallyReceivedPackets_ = + highestReceived_ - firstSequenceInRound_; double lossRate = 0; - if(numberTheoricallyReceivedPackets_ != 0) - lossRate = (double)((double)(packetLost_ - lossRecovered_) / (double)numberTheoricallyReceivedPackets_); + if (numberTheoricallyReceivedPackets_ != 0) + lossRate = (double)((double)(packetLost_ - lossRecovered_) / + (double)numberTheoricallyReceivedPackets_); - if(lossRate < 0) - lossRate = 0; + if (lossRate < 0) lossRate = 0; - if(initied){ + if (initied) { lossRate_ = lossRate_ * HICN_ESTIMATED_LOSSES_ALPHA + - (lossRate * (1 - HICN_ESTIMATED_LOSSES_ALPHA)); - }else { - lossRate_ =lossRate; + (lossRate * (1 - HICN_ESTIMATED_LOSSES_ALPHA)); + } else { + lossRate_ = lossRate; initied = true; } } if (avgPacketSize_ == 0) avgPacketSize_ = HICN_INIT_PACKET_SIZE; - //for the BDP we use the max rtt, so that we calibrate the window on the - //RTT of the slowest path. In this way we are sure that the window will - //never be too small + // for the BDP we use the max rtt, so that we calibrate the window on the + // RTT of the slowest path. In this way we are sure that the window will + // never be too small uint32_t BDP = (uint32_t)ceil( - (estimatedBw_ * (double)((double) pathTable_[producerPathLabels_[1]]->getMinRtt() / - (double)HICN_MILLI_IN_A_SEC) * - HICN_BANDWIDTH_SLACK_FACTOR) / - avgPacketSize_); + (estimatedBw_ * + (double)((double)pathTable_[producerPathLabels_[1]]->getMinRtt() / + (double)HICN_MILLI_IN_A_SEC) * + HICN_BANDWIDTH_SLACK_FACTOR) / + avgPacketSize_); uint32_t BW = (uint32_t)ceil(estimatedBw_); computeMaxWindow(BW, BDP); ConsumerTimerCallback *stats_callback = nullptr; socket_->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY, &stats_callback); - if (*stats_callback != VOID_HANDLER) { - //Send the stats to the app + if (*stats_callback) { + // Send the stats to the app stats_.updateQueuingDelay(queuingDelay_); stats_.updateLossRatio(lossRate_); (*stats_callback)(*socket_, stats_); @@ -334,8 +335,8 @@ void RTCTransportProtocol::computeMaxWindow(uint32_t productionRate, // currentState = RTC_NORMAL_STATE if (BDPWin != 0) { - maxCWin_ = - (uint32_t)ceil((double)BDPWin + (((double)BDPWin * 30.0) / 100.0)); // BDP + 30% + maxCWin_ = (uint32_t)ceil((double)BDPWin + + (((double)BDPWin * 30.0) / 100.0)); // BDP + 30% } else { maxCWin_ = min(maxWaintingInterest, maxCWin_); } @@ -380,22 +381,22 @@ void RTCTransportProtocol::increaseWindow() { } } -void RTCTransportProtocol::probeRtt(){ +void RTCTransportProtocol::probeRtt() { time_sent_probe_ = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + std::chrono::steady_clock::now().time_since_epoch()) + .count(); Name *interest_name = nullptr; socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, - &interest_name); - //get a random numbe in the probe seq range + &interest_name); + // get a random numbe in the probe seq range std::default_random_engine eng((std::random_device())()); - std::uniform_int_distribution idis( - HICN_MIN_PROBE_SEQ, HICN_MAX_PROBE_SEQ); + std::uniform_int_distribution idis(HICN_MIN_PROBE_SEQ, + HICN_MAX_PROBE_SEQ); probe_seq_number_ = idis(eng); interest_name->setSuffix(probe_seq_number_); - //we considere the probe as a rtx so that we do not incresea inFlightInt + // we considere the probe as a rtx so that we do not incresea inFlightInt received_probe_ = false; sendInterest(interest_name, true); @@ -406,7 +407,6 @@ void RTCTransportProtocol::probeRtt(){ }); } - void RTCTransportProtocol::sendInterest(Name *interest_name, bool rtx) { auto interest = getPacket(); interest->setName(*interest_name); @@ -421,11 +421,11 @@ void RTCTransportProtocol::sendInterest(Name *interest_name, bool rtx) { socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, &on_interest_output); - if (*on_interest_output != VOID_HANDLER) { + if (*on_interest_output) { (*on_interest_output)(*socket_, *interest); } - if (TRANSPORT_EXPECT_FALSE(!is_running_)) { + if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) { return; } @@ -440,20 +440,20 @@ void RTCTransportProtocol::sendInterest(Name *interest_name, bool rtx) { void RTCTransportProtocol::scheduleNextInterests() { checkRound(); - if (!is_running_) return; + if (!is_running_ && !is_first_) return; while (inflightInterestsCount_ < currentCWin_) { Name *interest_name = nullptr; socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &interest_name); - interest_name->setSuffix(actualSegment_); + interest_name->setSuffix(actualSegment_); // if the producer socket is not stated (does not reply even with nacks) // we keep asking for something without marking anything as lost (see // timeout). In this way when the producer socket will start the - //consumer socket will not miss any packet - if(TRANSPORT_EXPECT_FALSE(!firstPckReceived_)){ + // consumer socket will not miss any packet + if (TRANSPORT_EXPECT_FALSE(!firstPckReceived_)) { uint32_t pkt = actualSegment_ & modMask_; inflightInterests_[pkt].state = sent_; inflightInterests_[pkt].sequence = actualSegment_; @@ -477,9 +477,9 @@ void RTCTransportProtocol::scheduleNextInterests() { continue; } - //same if the packet is lost + // same if the packet is lost if (inflightInterests_[pkt].state == lost_ && - inflightInterests_[pkt].sequence == actualSegment_){ + inflightInterests_[pkt].sequence == actualSegment_) { actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ; continue; } @@ -489,7 +489,7 @@ void RTCTransportProtocol::scheduleNextInterests() { std::chrono::steady_clock::now().time_since_epoch()) .count(); - //here the packet can be in any state except for lost or recevied + // here the packet can be in any state except for lost or recevied inflightInterests_[pkt].state = sent_; inflightInterests_[pkt].sequence = actualSegment_; actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ; @@ -506,22 +506,21 @@ void RTCTransportProtocol::addRetransmissions(uint32_t val) { void RTCTransportProtocol::addRetransmissions(uint32_t start, uint32_t stop) { uint64_t now = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + std::chrono::steady_clock::now().time_since_epoch()) + .count(); bool new_rtx = false; for (uint32_t i = start; i < stop; i++) { auto it = interestRetransmissions_.find(i); if (it == interestRetransmissions_.end()) { uint32_t pkt = i & modMask_; - if (lastSegNacked_ <= i && - inflightInterests_[pkt].state != received_) { + if (lastSegNacked_ <= i && inflightInterests_[pkt].state != received_) { // it must be larger than the last past nack received packetLost_++; interestRetransmissions_[i] = 0; uint32_t pkt = i & modMask_; - //we reset the transmission time setting to now, so that rtx will - //happne in one RTT on waint one inter arrival gap + // we reset the transmission time setting to now, so that rtx will + // happne in one RTT on waint one inter arrival gap inflightInterests_[pkt].transmissionTime = now; new_rtx = true; } @@ -529,10 +528,10 @@ void RTCTransportProtocol::addRetransmissions(uint32_t start, uint32_t stop) { // take care of it } - //in case a new rtx is added to the map we need to run checkRtx() - if(new_rtx){ - if(rtx_timer_used_){ - //if a timer is pending we need to delete it + // in case a new rtx is added to the map we need to run checkRtx() + if (new_rtx) { + if (rtx_timer_used_) { + // if a timer is pending we need to delete it rtx_timer_->cancel(); rtx_timer_used_ = false; } @@ -553,8 +552,8 @@ uint64_t RTCTransportProtocol::retransmit() { it = interestRetransmissions_.begin(); uint64_t smallest_timeout = ULONG_MAX; uint64_t now = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + std::chrono::steady_clock::now().time_since_epoch()) + .count(); while (it != interestRetransmissions_.end()) { uint32_t pkt = it->first & modMask_; @@ -580,46 +579,46 @@ uint64_t RTCTransportProtocol::retransmit() { uint64_t rtx_time = now; - if(it->second == 0) { - //first rtx - if(producerPathLabels_[0] != producerPathLabels_[1]){ - //multipath - if(pathTable_.find(producerPathLabels_[0]) != pathTable_.end() && - pathTable_.find(producerPathLabels_[1]) != pathTable_.end() && - (pathTable_[producerPathLabels_[0]]->getInterArrivalGap() < - HICN_MIN_INTER_ARRIVAL_GAP)){ + if (it->second == 0) { + // first rtx + if (producerPathLabels_[0] != producerPathLabels_[1]) { + // multipath + if (pathTable_.find(producerPathLabels_[0]) != pathTable_.end() && + pathTable_.find(producerPathLabels_[1]) != pathTable_.end() && + (pathTable_[producerPathLabels_[0]]->getInterArrivalGap() < + HICN_MIN_INTER_ARRIVAL_GAP)) { rtx_time = lastReceivedTime_ + - (pathTable_[producerPathLabels_[1]]->getMinRtt() - - pathTable_[producerPathLabels_[0]]->getMinRtt()) + - pathTable_[producerPathLabels_[0]]->getInterArrivalGap(); - }//else low rate producer, send it immediatly - }else{ - //single path - if(pathTable_.find(producerPathLabels_[0]) != pathTable_.end() && + (pathTable_[producerPathLabels_[1]]->getMinRtt() - + pathTable_[producerPathLabels_[0]]->getMinRtt()) + + pathTable_[producerPathLabels_[0]]->getInterArrivalGap(); + } // else low rate producer, send it immediatly + } else { + // single path + if (pathTable_.find(producerPathLabels_[0]) != pathTable_.end() && (pathTable_[producerPathLabels_[0]]->getInterArrivalGap() < - HICN_MIN_INTER_ARRIVAL_GAP)){ + HICN_MIN_INTER_ARRIVAL_GAP)) { rtx_time = lastReceivedTime_ + - pathTable_[producerPathLabels_[0]]->getInterArrivalGap(); - }//else low rate producer send immediatly + pathTable_[producerPathLabels_[0]]->getInterArrivalGap(); + } // else low rate producer send immediatly } - }else{ - //second or plus rtx, wait for the min rtt - if(pathTable_.find(producerPathLabels_[0]) != pathTable_.end()){ + } else { + // second or plus rtx, wait for the min rtt + if (pathTable_.find(producerPathLabels_[0]) != pathTable_.end()) { uint64_t sent_time = inflightInterests_[pkt].transmissionTime; rtx_time = sent_time + pathTable_[producerPathLabels_[0]]->getMinRtt(); - }//if we don't have info we send it immediatly + } // if we don't have info we send it immediatly } - if(now >= rtx_time){ + if (now >= rtx_time) { inflightInterests_[pkt].transmissionTime = now; it->second++; Name *interest_name = nullptr; socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, - &interest_name); + &interest_name); interest_name->setSuffix(it->first); sendInterest(interest_name, true); - }else if(rtx_time < smallest_timeout){ + } else if (rtx_time < smallest_timeout) { smallest_timeout = rtx_time; } @@ -629,7 +628,7 @@ uint64_t RTCTransportProtocol::retransmit() { } void RTCTransportProtocol::checkRtx() { - if(interestRetransmissions_.empty()){ + if (interestRetransmissions_.empty()) { rtx_timer_used_ = false; return; } @@ -637,10 +636,10 @@ void RTCTransportProtocol::checkRtx() { uint64_t next_timeout = retransmit(); uint64_t wait = 1; uint64_t now = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); - if(next_timeout != ULONG_MAX && now < next_timeout){ - wait = next_timeout - now; + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + if (next_timeout != ULONG_MAX && now < next_timeout) { + wait = next_timeout - now; } rtx_timer_used_ = true; rtx_timer_->expires_from_now(std::chrono::milliseconds(wait)); @@ -656,14 +655,14 @@ void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) { uint32_t segmentNumber = interest->getName().getSuffix(); - if(segmentNumber >= HICN_MIN_PROBE_SEQ){ + if (segmentNumber >= HICN_MIN_PROBE_SEQ) { // this is a timeout on a probe, do nothing return; } uint32_t pkt = segmentNumber & modMask_; - if(TRANSPORT_EXPECT_FALSE(!firstPckReceived_)){ + if (TRANSPORT_EXPECT_FALSE(!firstPckReceived_)) { inflightInterestsCount_--; // we do nothing, and we keep asking the same stuff over // and over until we get at least a packet @@ -698,7 +697,8 @@ void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) { scheduleNextInterests(); } -bool RTCTransportProtocol::onNack(const ContentObject &content_object, bool rtx) { +bool RTCTransportProtocol::onNack(const ContentObject &content_object, + bool rtx) { uint32_t *payload = (uint32_t *)content_object.getPayload()->data(); uint32_t productionSeg = *payload; uint32_t productionRate = *(++payload); @@ -708,11 +708,11 @@ bool RTCTransportProtocol::onNack(const ContentObject &content_object, bool rtx) // if we did not received anything between lastReceived_ + 1 and productionSeg // most likelly some packets got lost - if(lastReceived_ != 0){ + if (lastReceived_ != 0) { addRetransmissions(lastReceived_ + 1, productionSeg); } - if(!rtx){ + if (!rtx) { gotNack_ = true; // we synch the estimated production rate with the actual one estimatedBw_ = (double)productionRate; @@ -722,7 +722,7 @@ bool RTCTransportProtocol::onNack(const ContentObject &content_object, bool rtx) // we are asking for stuff produced in the past actualSegment_ = max(productionSeg, actualSegment_) % HICN_MIN_PROBE_SEQ; - if(!rtx) { + if (!rtx) { if (currentState_ == HICN_RTC_NORMAL_STATE) { currentState_ = HICN_RTC_SYNC_STATE; } @@ -737,7 +737,7 @@ bool RTCTransportProtocol::onNack(const ContentObject &content_object, bool rtx) } else if (productionSeg < nackSegment) { actualSegment_ = productionSeg % HICN_MIN_PROBE_SEQ; - if(!rtx){ + if (!rtx) { // we are asking stuff in the future gotFutureNack_++; computeMaxWindow(productionRate, 0); @@ -748,8 +748,8 @@ bool RTCTransportProtocol::onNack(const ContentObject &content_object, bool rtx) } } } else { - //we are asking the right thing, but the producer is slow - //keep doing the same until the packet is produced + // we are asking the right thing, but the producer is slow + // keep doing the same until the packet is produced actualSegment_ = productionSeg % HICN_MIN_PROBE_SEQ; } @@ -758,7 +758,6 @@ bool RTCTransportProtocol::onNack(const ContentObject &content_object, bool rtx) void RTCTransportProtocol::onContentObject( Interest::Ptr &&interest, ContentObject::Ptr &&content_object) { - // as soon as we get a packet firstPckReceived_ will never be false firstPckReceived_ = true; @@ -770,24 +769,25 @@ void RTCTransportProtocol::onContentObject( ConsumerContentObjectCallback *callback_content_object = nullptr; socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT, &callback_content_object); - if (*callback_content_object != VOID_HANDLER) { + if (*callback_content_object) { (*callback_content_object)(*socket_, *content_object); } - if(segmentNumber >= HICN_MIN_PROBE_SEQ){ - if(segmentNumber == probe_seq_number_ && !received_probe_){ + if (segmentNumber >= HICN_MIN_PROBE_SEQ) { + if (segmentNumber == probe_seq_number_ && !received_probe_) { received_probe_ = true; uint32_t pathLabel = content_object->getPathLabel(); - if (pathTable_.find(pathLabel) == pathTable_.end()){ - //if this path does not exists we cannot create a new one so drop + if (pathTable_.find(pathLabel) == pathTable_.end()) { + // if this path does not exists we cannot create a new one so drop return; } - //this is the expected probe, update the RTT and drop the packet + // this is the expected probe, update the RTT and drop the packet uint64_t RTT = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()) - .count() - time_sent_probe_; + std::chrono::steady_clock::now().time_since_epoch()) + .count() - + time_sent_probe_; pathTable_[pathLabel]->insertRttSample(RTT); pathTable_[pathLabel]->receivedNack(); @@ -803,22 +803,23 @@ void RTCTransportProtocol::onContentObject( bool old_nack = false; if (interestRetransmissions_.find(segmentNumber) == - interestRetransmissions_.end()){ - //this is not a retransmitted packet + interestRetransmissions_.end()) { + // this is not a retransmitted packet old_nack = onNack(*content_object, false); updateDelayStats(*content_object); } else { old_nack = onNack(*content_object, true); } - //the nacked_ state is used only to avoid to decrease inflightInterestsCount_ - //multiple times. In fact, every time that we receive an event related to an - //interest (timeout, nacked, content) we cange the state. In this way we are - //sure that we do not decrease twice the counter - if(old_nack){ + // the nacked_ state is used only to avoid to decrease + // inflightInterestsCount_ multiple times. In fact, every time that we + // receive an event related to an interest (timeout, nacked, content) we + // cange the state. In this way we are sure that we do not decrease twice the + // counter + if (old_nack) { inflightInterests_[pkt].state = lost_; interestRetransmissions_.erase(segmentNumber); - }else{ + } else { inflightInterests_[pkt].state = nacked_; } @@ -827,7 +828,7 @@ void RTCTransportProtocol::onContentObject( ((1 - HICN_ESTIMATED_PACKET_SIZE) * payload->length()); receivedBytes_ += (uint32_t)(content_object->headerSize() + - content_object->payloadSize()); + content_object->payloadSize()); if (inflightInterests_[pkt].state == sent_) { inflightInterestsCount_--; // packet sent without timeouts @@ -841,22 +842,21 @@ void RTCTransportProtocol::onContentObject( } addRetransmissions(lastReceived_ + 1, segmentNumber); - if(segmentNumber > highestReceived_){ + if (segmentNumber > highestReceived_) { highestReceived_ = segmentNumber; } - if(segmentNumber > lastReceived_){ + if (segmentNumber > lastReceived_) { lastReceived_ = segmentNumber; - lastReceivedTime_ = std::chrono::duration_cast< - std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + lastReceivedTime_ = + std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); } receivedData_++; inflightInterests_[pkt].state = received_; auto it = interestRetransmissions_.find(segmentNumber); - if(it != interestRetransmissions_.end()) - lossRecovered_ ++; + if (it != interestRetransmissions_.end()) lossRecovered_++; interestRetransmissions_.erase(segmentNumber); -- cgit 1.2.3-korg