diff options
author | Mauro Sardara <msardara@cisco.com> | 2020-02-21 11:52:28 +0100 |
---|---|---|
committer | Mauro Sardara <msardara@cisco.com> | 2020-02-26 13:19:16 +0100 |
commit | f4433f28b509a9f67ca85d79000ccf9c2f4b7a24 (patch) | |
tree | 0f754bc9d8222f3ace11849165753acd85be3b38 /libtransport/src/protocols/rtc.cc | |
parent | 0e7669445b6be1163189521eabed7dd0124043c8 (diff) |
[HICN-534] Major rework on libtransport organization
Change-Id: I361b83a18b4fd59be136d5f0817fc28e17e89884
Signed-off-by: Mauro Sardara <msardara@cisco.com>
Diffstat (limited to 'libtransport/src/protocols/rtc.cc')
-rw-r--r-- | libtransport/src/protocols/rtc.cc | 1017 |
1 files changed, 1017 insertions, 0 deletions
diff --git a/libtransport/src/protocols/rtc.cc b/libtransport/src/protocols/rtc.cc new file mode 100644 index 000000000..0ac3839dd --- /dev/null +++ b/libtransport/src/protocols/rtc.cc @@ -0,0 +1,1017 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <protocols/rtc.h> + +#include <hicn/transport/interfaces/socket_consumer.h> +#include <implementation/socket_consumer.h> + +#include <math.h> +#include <random> + +namespace transport { + +namespace protocol { + +using namespace interface; + +RTCTransportProtocol::RTCTransportProtocol( + implementation::ConsumerSocket *icn_socket) + : TransportProtocol(icn_socket, nullptr), + DatagramReassembly(icn_socket, this), + inflightInterests_(1 << default_values::log_2_default_buffer_size), + modMask_((1 << default_values::log_2_default_buffer_size) - 1) { + icn_socket->getSocketOption(PORTAL, portal_); + rtx_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService()); + probe_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService()); + sentinel_timer_ = + std::make_unique<asio::steady_timer>(portal_->getIoService()); + round_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService()); + reset(); +} + +RTCTransportProtocol::~RTCTransportProtocol() { + if (is_running_) { + stop(); + } +} + +int RTCTransportProtocol::start() { + if (is_running_) return -1; + + reset(); + is_first_ = true; + + probeRtt(); + sentinelTimer(); + newRound(); + scheduleNextInterests(); + + is_first_ = false; + is_running_ = true; + portal_->runEventsLoop(); + is_running_ = false; + + return 0; +} + +void RTCTransportProtocol::stop() { + if (!is_running_) return; + + is_running_ = false; + portal_->stopEventsLoop(); +} + +void RTCTransportProtocol::resume() { + if (is_running_) return; + + is_running_ = true; + inflightInterestsCount_ = 0; + + probeRtt(); + sentinelTimer(); + newRound(); + scheduleNextInterests(); + + portal_->runEventsLoop(); + is_running_ = false; +} + +// private +void RTCTransportProtocol::reset() { + portal_->setConsumerCallback(this); + // controller var + currentState_ = HICN_RTC_SYNC_STATE; + + // cwin var + currentCWin_ = HICN_INITIAL_CWIN; + maxCWin_ = HICN_INITIAL_CWIN_MAX; + + // names/packets var + actualSegment_ = 0; + inflightInterestsCount_ = 0; + interestRetransmissions_.clear(); + lastSegNacked_ = 0; + lastReceived_ = 0; + lastReceivedTime_ = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + lastEvent_ = lastReceivedTime_; + highestReceived_ = 0; + firstSequenceInRound_ = 0; + + rtx_timer_used_ = false; + for (int i = 0; i < (1 << default_values::log_2_default_buffer_size); i++) { + inflightInterests_[i] = {0}; + } + + // stats + firstPckReceived_ = false; + receivedBytes_ = 0; + sentInterest_ = 0; + receivedData_ = 0; + packetLost_ = 0; + lossRecovered_ = 0; + avgPacketSize_ = HICN_INIT_PACKET_SIZE; + gotNack_ = false; + gotFutureNack_ = 0; + rounds_ = 0; + roundsWithoutNacks_ = 0; + pathTable_.clear(); + + // CC var + estimatedBw_ = 0.0; + lossRate_ = 0.0; + queuingDelay_ = 0.0; + protocolState_ = HICN_RTC_NORMAL_STATE; + + producerPathLabels_[0] = 0; + producerPathLabels_[1] = 0; + initied = false; + + socket_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, + (uint32_t)HICN_RTC_INTEREST_LIFETIME); + // XXX this should be done by the application +} + +uint32_t max(uint32_t a, uint32_t b) { + if (a > b) + return a; + else + return b; +} + +uint32_t min(uint32_t a, uint32_t b) { + if (a < b) + return a; + else + return b; +} + +void RTCTransportProtocol::newRound() { + round_timer_->expires_from_now(std::chrono::milliseconds(HICN_ROUND_LEN)); + round_timer_->async_wait([this](std::error_code ec) { + if (ec) return; + updateStats(HICN_ROUND_LEN); + newRound(); + }); +} + +void RTCTransportProtocol::updateDelayStats( + const ContentObject &content_object) { + uint32_t segmentNumber = content_object.getName().getSuffix(); + uint32_t pkt = segmentNumber & modMask_; + + if (inflightInterests_[pkt].state != sent_) return; + + if (interestRetransmissions_.find(segmentNumber) != + interestRetransmissions_.end()) + // this packet was rtx at least once + return; + + uint32_t pathLabel = content_object.getPathLabel(); + + if (pathTable_.find(pathLabel) == pathTable_.end()) { + // found a new path + std::shared_ptr<RTCDataPath> newPath = std::make_shared<RTCDataPath>(); + pathTable_[pathLabel] = newPath; + } + + // RTT measurements are useful both from NACKs and data packets + uint64_t RTT = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count() - + inflightInterests_[pkt].transmissionTime; + + pathTable_[pathLabel]->insertRttSample(RTT); + auto payload = content_object.getPayload(); + + // we collect OWD only for datapackets + if (payload->length() != HICN_NACK_HEADER_SIZE) { + uint64_t *senderTimeStamp = (uint64_t *)payload->data(); + int64_t OWD = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count() - + *senderTimeStamp; + + pathTable_[pathLabel]->insertOwdSample(OWD); + pathTable_[pathLabel]->computeInterArrivalGap(segmentNumber); + } else { + pathTable_[pathLabel]->receivedNack(); + } +} + +void RTCTransportProtocol::updateStats(uint32_t round_duration) { + if (pathTable_.empty()) return; + + if (receivedBytes_ != 0) { + double bytesPerSec = + (double)(receivedBytes_ * + ((double)HICN_MILLI_IN_A_SEC / (double)round_duration)); + estimatedBw_ = (estimatedBw_ * HICN_ESTIMATED_BW_ALPHA) + + ((1 - HICN_ESTIMATED_BW_ALPHA) * bytesPerSec); + } + + uint64_t minRtt = UINT_MAX; + uint64_t maxRtt = 0; + + for (auto it = pathTable_.begin(); it != pathTable_.end(); it++) { + it->second->roundEnd(); + if (it->second->isActive()) { + if (it->second->getMinRtt() < minRtt) { + minRtt = it->second->getMinRtt(); + producerPathLabels_[0] = it->first; + } + if (it->second->getMinRtt() > maxRtt) { + maxRtt = it->second->getMinRtt(); + producerPathLabels_[1] = it->first; + } + } + } + + if (pathTable_.find(producerPathLabels_[0]) == pathTable_.end() || + pathTable_.find(producerPathLabels_[1]) == pathTable_.end()) + return; // this should not happen + + // as a queuing delay we keep the lowest one among the two paths + // if one path is congested the forwarder should decide to do not + // use it so it does not make sense to inform the application + // that maybe we have a problem + if (pathTable_[producerPathLabels_[0]]->getQueuingDealy() < + pathTable_[producerPathLabels_[1]]->getQueuingDealy()) + queuingDelay_ = pathTable_[producerPathLabels_[0]]->getQueuingDealy(); + else + queuingDelay_ = pathTable_[producerPathLabels_[1]]->getQueuingDealy(); + + if (sentInterest_ != 0 && currentState_ == HICN_RTC_NORMAL_STATE) { + uint32_t numberTheoricallyReceivedPackets_ = + highestReceived_ - firstSequenceInRound_; + double lossRate = 0; + if (numberTheoricallyReceivedPackets_ != 0) + lossRate = (double)((double)(packetLost_ - lossRecovered_) / + (double)numberTheoricallyReceivedPackets_); + + if (lossRate < 0) lossRate = 0; + + if (initied) { + lossRate_ = lossRate_ * HICN_ESTIMATED_LOSSES_ALPHA + + (lossRate * (1 - HICN_ESTIMATED_LOSSES_ALPHA)); + } else { + lossRate_ = lossRate; + initied = true; + } + } + + if (avgPacketSize_ == 0) avgPacketSize_ = HICN_INIT_PACKET_SIZE; + + // for the BDP we use the max rtt, so that we calibrate the window on the + // RTT of the slowest path. In this way we are sure that the window will + // never be too small + uint32_t BDP = (uint32_t)ceil( + (estimatedBw_ * + (double)((double)pathTable_[producerPathLabels_[1]]->getMinRtt() / + (double)HICN_MILLI_IN_A_SEC) * + HICN_BANDWIDTH_SLACK_FACTOR) / + avgPacketSize_); + uint32_t BW = (uint32_t)ceil(estimatedBw_); + computeMaxWindow(BW, BDP); + + ConsumerTimerCallback *stats_callback = nullptr; + socket_->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY, + &stats_callback); + if (*stats_callback) { + // Send the stats to the app + stats_->updateQueuingDelay(queuingDelay_); + stats_->updateLossRatio(lossRate_); + stats_->updateAverageRtt(pathTable_[producerPathLabels_[1]]->getMinRtt()); + (*stats_callback)(*socket_->getInterface(), *stats_); + } + // bound also by interest lifitime* production rate + if (!gotNack_) { + roundsWithoutNacks_++; + if (currentState_ == HICN_RTC_SYNC_STATE && + roundsWithoutNacks_ >= HICN_ROUNDS_IN_SYNC_BEFORE_SWITCH) { + currentState_ = HICN_RTC_NORMAL_STATE; + } + } else { + roundsWithoutNacks_ = 0; + } + + updateCCState(); + updateWindow(); + + if (queuingDelay_ > 25.0) { + // this indicates that the client will go soon out of synch, + // switch to synch mode + if (currentState_ == HICN_RTC_NORMAL_STATE) { + currentState_ = HICN_RTC_SYNC_STATE; + } + computeMaxWindow(BW, 0); + increaseWindow(); + } + + // in any case we reset all the counters + + gotNack_ = false; + gotFutureNack_ = 0; + receivedBytes_ = 0; + sentInterest_ = 0; + receivedData_ = 0; + packetLost_ = 0; + lossRecovered_ = 0; + rounds_++; + firstSequenceInRound_ = highestReceived_; +} + +void RTCTransportProtocol::updateCCState() { + // TODO +} + +void RTCTransportProtocol::computeMaxWindow(uint32_t productionRate, + uint32_t BDPWin) { + if (productionRate == + 0) // we have no info about the producer, keep the previous maxCWin + return; + + uint32_t interestLifetime = default_values::interest_lifetime; + socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, + interestLifetime); + uint32_t maxWaintingInterest = (uint32_t)ceil( + (productionRate / avgPacketSize_) * + (double)((double)(interestLifetime * + HICN_INTEREST_LIFETIME_REDUCTION_FACTOR) / + (double)HICN_MILLI_IN_A_SEC)); + + if (currentState_ == HICN_RTC_SYNC_STATE) { + // in this case we do not limit the window with the BDP, beacuse most + // likely it is wrong + maxCWin_ = maxWaintingInterest; + return; + } + + // currentState = RTC_NORMAL_STATE + if (BDPWin != 0) { + maxCWin_ = (uint32_t)ceil((double)BDPWin + + (((double)BDPWin * 30.0) / 100.0)); // BDP + 30% + } else { + maxCWin_ = min(maxWaintingInterest, maxCWin_); + } + + if (maxCWin_ < HICN_MIN_CWIN) maxCWin_ = HICN_MIN_CWIN; +} + +void RTCTransportProtocol::updateWindow() { + if (currentState_ == HICN_RTC_SYNC_STATE) return; + + if (currentCWin_ < maxCWin_ * 0.9) { + currentCWin_ = + min(maxCWin_, (uint32_t)(currentCWin_ * HICN_WIN_INCREASE_FACTOR)); + } else if (currentCWin_ > maxCWin_) { + currentCWin_ = + max((uint32_t)(currentCWin_ * HICN_WIN_DECREASE_FACTOR), HICN_MIN_CWIN); + } +} + +void RTCTransportProtocol::decreaseWindow() { + // this is used only in SYNC mode + if (currentState_ == HICN_RTC_NORMAL_STATE) return; + + if (gotFutureNack_ == 1) + currentCWin_ = min((currentCWin_ - 1), + (uint32_t)ceil((double)maxCWin_ * 0.66)); // 2/3 + else + currentCWin_--; + + currentCWin_ = max(currentCWin_, HICN_MIN_CWIN); +} + +void RTCTransportProtocol::increaseWindow() { + // this is used only in SYNC mode + if (currentState_ == HICN_RTC_NORMAL_STATE) return; + + // we need to be carefull to do not increase the window to much + if (currentCWin_ < ((double)maxCWin_ * 0.7)) { + currentCWin_ = currentCWin_ + 1; // exponential + } else { + currentCWin_ = min( + maxCWin_, + (uint32_t)ceil(currentCWin_ + (1.0 / (double)currentCWin_))); // linear + } +} + +void RTCTransportProtocol::probeRtt() { + probe_timer_->expires_from_now(std::chrono::milliseconds(1000)); + probe_timer_->async_wait([this](std::error_code ec) { + if (ec) return; + probeRtt(); + }); + + // To avoid sending the first probe, because the transport is not running yet + if (is_first_ && !is_running_) return; + + time_sent_probe_ = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + + Name *interest_name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, + &interest_name); + // get a random numbe in the probe seq range + std::default_random_engine eng((std::random_device())()); + std::uniform_int_distribution<uint32_t> idis(HICN_MIN_PROBE_SEQ, + HICN_MAX_PROBE_SEQ); + probe_seq_number_ = idis(eng); + interest_name->setSuffix(probe_seq_number_); + + // we considere the probe as a rtx so that we do not incresea inFlightInt + received_probe_ = false; + TRANSPORT_LOGD("Send content interest %u (probeRtt)", + interest_name->getSuffix()); + sendInterest(interest_name, true); +} + +void RTCTransportProtocol::sendInterest(Name *interest_name, bool rtx) { + auto interest = getPacket(); + interest->setName(*interest_name); + + uint32_t interestLifetime = default_values::interest_lifetime; + socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, + interestLifetime); + interest->setLifetime(uint32_t(interestLifetime)); + + ConsumerInterestCallback *on_interest_output = nullptr; + + socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, + &on_interest_output); + + if (*on_interest_output) { + (*on_interest_output)(*socket_->getInterface(), *interest); + } + + if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) { + return; + } + + portal_->sendInterest(std::move(interest)); + + sentInterest_++; + + if (!rtx) { + packets_in_window_[interest_name->getSuffix()] = 0; + inflightInterestsCount_++; + } +} + +void RTCTransportProtocol::scheduleNextInterests() { + if (!is_running_ && !is_first_) return; + + TRANSPORT_LOGD("----- [window %u - inflight_interests %u = %d] -----", + currentCWin_, inflightInterestsCount_, + currentCWin_ - inflightInterestsCount_); + + while (inflightInterestsCount_ < currentCWin_) { + Name *interest_name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, + &interest_name); + + interest_name->setSuffix(actualSegment_); + + // if the producer socket is not stated (does not reply even with nacks) + // we keep asking for something without marking anything as lost (see + // timeout). In this way when the producer socket will start the + // consumer socket will not miss any packet + if (TRANSPORT_EXPECT_FALSE(!firstPckReceived_)) { + uint32_t pkt = actualSegment_ & modMask_; + inflightInterests_[pkt].state = sent_; + inflightInterests_[pkt].sequence = actualSegment_; + actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ; + TRANSPORT_LOGD( + "Send content interest %u (scheduleNextInterests no replies)", + interest_name->getSuffix()); + sendInterest(interest_name, false); + return; + } + + // we send the packet only if it is not pending yet + // notice that this is not true for rtx packets + if (portal_->interestIsPending(*interest_name)) { + actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ; + continue; + } + + uint32_t pkt = actualSegment_ & modMask_; + // if we already reacevied the content we don't ask it again + if (inflightInterests_[pkt].state == received_ && + inflightInterests_[pkt].sequence == actualSegment_) { + actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ; + continue; + } + + // same if the packet is lost + if (inflightInterests_[pkt].state == lost_ && + inflightInterests_[pkt].sequence == actualSegment_) { + actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ; + continue; + } + + inflightInterests_[pkt].transmissionTime = + std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + + // here the packet can be in any state except for lost or recevied + inflightInterests_[pkt].state = sent_; + inflightInterests_[pkt].sequence = actualSegment_; + actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ; + + TRANSPORT_LOGD("Send content interest %u (scheduleNextInterests)", + interest_name->getSuffix()); + sendInterest(interest_name, false); + } + + TRANSPORT_LOGD("----- end of scheduleNextInterest -----"); +} + +bool RTCTransportProtocol::verifyKeyPackets() { + // Not yet implemented + return false; +} + +void RTCTransportProtocol::sentinelTimer() { + uint32_t wait = 50; + + if (pathTable_.find(producerPathLabels_[0]) != pathTable_.end() && + pathTable_.find(producerPathLabels_[1]) != pathTable_.end()) { + // we have all the info to set the timers + wait = round(pathTable_[producerPathLabels_[0]]->getInterArrivalGap()); + if (wait == 0) wait = 1; + } + + sentinel_timer_->expires_from_now(std::chrono::milliseconds(wait)); + sentinel_timer_->async_wait([this](std::error_code ec) { + if (ec) return; + + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + + if (pathTable_.find(producerPathLabels_[0]) == pathTable_.end() || + pathTable_.find(producerPathLabels_[1]) == pathTable_.end()) { + // we have no info, so we send again + + for (auto it = packets_in_window_.begin(); it != packets_in_window_.end(); + it++) { + uint32_t pkt = it->first & modMask_; + if (inflightInterests_[pkt].sequence == it->first) { + inflightInterests_[pkt].transmissionTime = now; + Name *interest_name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, + &interest_name); + interest_name->setSuffix(it->first); + it->second++; + sendInterest(interest_name, true); + } + } + } else { + uint64_t max_waiting_time = // wait at least 50ms + (pathTable_[producerPathLabels_[1]]->getMinRtt() - + pathTable_[producerPathLabels_[0]]->getMinRtt()) + + (ceil(pathTable_[producerPathLabels_[0]]->getInterArrivalGap()) * 50); + + if ((currentState_ == HICN_RTC_NORMAL_STATE) && + (inflightInterestsCount_ >= currentCWin_) && + ((now - lastEvent_) > max_waiting_time) && (lossRate_ >= 0.05)) { + uint64_t RTT = pathTable_[producerPathLabels_[1]]->getMinRtt(); + + for (auto it = packets_in_window_.begin(); + it != packets_in_window_.end(); it++) { + uint32_t pkt = it->first & modMask_; + if (inflightInterests_[pkt].sequence == it->first && + ((now - inflightInterests_[pkt].transmissionTime) >= RTT)) { + inflightInterests_[pkt].transmissionTime = now; + Name *interest_name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, + &interest_name); + interest_name->setSuffix(it->first); + it->second++; + sendInterest(interest_name, true); + } + } + } + } + + sentinelTimer(); + }); +} +void RTCTransportProtocol::addRetransmissions(uint32_t val) { + // add only val in the rtx list + addRetransmissions(val, val + 1); +} + +void RTCTransportProtocol::addRetransmissions(uint32_t start, uint32_t stop) { + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + + bool new_rtx = false; + for (uint32_t i = start; i < stop; i++) { + auto it = interestRetransmissions_.find(i); + if (it == interestRetransmissions_.end()) { + uint32_t pkt = i & modMask_; + if (lastSegNacked_ <= i && inflightInterests_[pkt].state != received_) { + // it must be larger than the last past nack received + packetLost_++; + interestRetransmissions_[i] = 0; + uint32_t pkt = i & modMask_; + // we reset the transmission time setting to now, so that rtx will + // happne in one RTT on waint one inter arrival gap + inflightInterests_[pkt].transmissionTime = now; + new_rtx = true; + } + } // if the retransmission is already there the rtx timer will + // take care of it + } + + // in case a new rtx is added to the map we need to run checkRtx() + if (new_rtx) { + if (rtx_timer_used_) { + // if a timer is pending we need to delete it + rtx_timer_->cancel(); + rtx_timer_used_ = false; + } + checkRtx(); + } +} + +uint64_t RTCTransportProtocol::retransmit() { + auto it = interestRetransmissions_.begin(); + + // cut len to max HICN_MAX_RTX_SIZE + // since we use a map, the smaller (and so the older) sequence number are at + // the beginnin of the map + while (interestRetransmissions_.size() > HICN_MAX_RTX_SIZE) { + it = interestRetransmissions_.erase(it); + } + + it = interestRetransmissions_.begin(); + uint64_t smallest_timeout = ULONG_MAX; + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + + while (it != interestRetransmissions_.end()) { + uint32_t pkt = it->first & modMask_; + + if (inflightInterests_[pkt].sequence != it->first) { + // this packet is not anymore in the inflight buffer, erase it + it = interestRetransmissions_.erase(it); + continue; + } + + // we retransmitted the packet too many times + if (it->second >= HICN_MAX_RTX) { + it = interestRetransmissions_.erase(it); + continue; + } + + // this packet is too old + if ((lastReceived_ > it->first) && + (lastReceived_ - it->first) > HICN_MAX_RTX_MAX_AGE) { + it = interestRetransmissions_.erase(it); + continue; + } + + uint64_t rtx_time = now; + + if (it->second == 0) { + // first rtx + if (producerPathLabels_[0] != producerPathLabels_[1]) { + // multipath + if (pathTable_.find(producerPathLabels_[0]) != pathTable_.end() && + pathTable_.find(producerPathLabels_[1]) != pathTable_.end() && + (pathTable_[producerPathLabels_[0]]->getInterArrivalGap() < + HICN_MIN_INTER_ARRIVAL_GAP)) { + rtx_time = lastReceivedTime_ + + (pathTable_[producerPathLabels_[1]]->getMinRtt() - + pathTable_[producerPathLabels_[0]]->getMinRtt()) + + pathTable_[producerPathLabels_[0]]->getInterArrivalGap(); + } // else low rate producer, send it immediatly + } else { + // single path + if (pathTable_.find(producerPathLabels_[0]) != pathTable_.end() && + (pathTable_[producerPathLabels_[0]]->getInterArrivalGap() < + HICN_MIN_INTER_ARRIVAL_GAP)) { + rtx_time = lastReceivedTime_ + + pathTable_[producerPathLabels_[0]]->getInterArrivalGap(); + } // else low rate producer send immediatly + } + } else { + // second or plus rtx, wait for the min rtt + if (pathTable_.find(producerPathLabels_[0]) != pathTable_.end()) { + uint64_t sent_time = inflightInterests_[pkt].transmissionTime; + rtx_time = sent_time + pathTable_[producerPathLabels_[0]]->getMinRtt(); + } // if we don't have info we send it immediatly + } + + if (now >= rtx_time) { + inflightInterests_[pkt].transmissionTime = now; + it->second++; + + Name *interest_name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, + &interest_name); + interest_name->setSuffix(it->first); + TRANSPORT_LOGD("Send content interest %u (retransmit)", + interest_name->getSuffix()); + sendInterest(interest_name, true); + } else if (rtx_time < smallest_timeout) { + smallest_timeout = rtx_time; + } + + ++it; + } + return smallest_timeout; +} + +void RTCTransportProtocol::checkRtx() { + if (interestRetransmissions_.empty()) { + rtx_timer_used_ = false; + return; + } + + uint64_t next_timeout = retransmit(); + uint64_t wait = 1; + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + if (next_timeout != ULONG_MAX && now < next_timeout) { + wait = next_timeout - now; + } + rtx_timer_used_ = true; + rtx_timer_->expires_from_now(std::chrono::milliseconds(wait)); + rtx_timer_->async_wait([this](std::error_code ec) { + if (ec) return; + rtx_timer_used_ = false; + checkRtx(); + }); +} + +void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) { + uint32_t segmentNumber = interest->getName().getSuffix(); + + if (segmentNumber >= HICN_MIN_PROBE_SEQ) { + // this is a timeout on a probe, do nothing + return; + } + + uint32_t pkt = segmentNumber & modMask_; + + if (TRANSPORT_EXPECT_FALSE(!firstPckReceived_)) { + // we do nothing, and we keep asking the same stuff over + // and over until we get at least a packet + inflightInterestsCount_--; + lastEvent_ = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + packets_in_window_.erase(segmentNumber); + scheduleNextInterests(); + return; + } + + if (inflightInterests_[pkt].state == sent_) { + lastEvent_ = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + packets_in_window_.erase(segmentNumber); + inflightInterestsCount_--; + } + + // check how many times we sent this packet + auto it = interestRetransmissions_.find(segmentNumber); + if (it != interestRetransmissions_.end() && it->second >= HICN_MAX_RTX) { + inflightInterests_[pkt].state = lost_; + } + + if (inflightInterests_[pkt].state == sent_) { + inflightInterests_[pkt].state = timeout1_; + } else if (inflightInterests_[pkt].state == timeout1_) { + inflightInterests_[pkt].state = timeout2_; + } else if (inflightInterests_[pkt].state == timeout2_) { + inflightInterests_[pkt].state = lost_; + } + + if (inflightInterests_[pkt].state == lost_) { + interestRetransmissions_.erase(segmentNumber); + } else { + addRetransmissions(segmentNumber); + } + + scheduleNextInterests(); +} + +bool RTCTransportProtocol::onNack(const ContentObject &content_object, + bool rtx) { + uint32_t *payload = (uint32_t *)content_object.getPayload()->data(); + uint32_t productionSeg = *payload; + uint32_t productionRate = *(++payload); + uint32_t nackSegment = content_object.getName().getSuffix(); + + bool old_nack = false; + + // if we did not received anything between lastReceived_ + 1 and productionSeg + // most likelly some packets got lost + if (lastReceived_ != 0) { + addRetransmissions(lastReceived_ + 1, productionSeg); + } + + if (!rtx) { + gotNack_ = true; + // we synch the estimated production rate with the actual one + estimatedBw_ = (double)productionRate; + } + + if (productionSeg > nackSegment) { + // we are asking for stuff produced in the past + actualSegment_ = max(productionSeg, actualSegment_) % HICN_MIN_PROBE_SEQ; + + if (!rtx) { + if (currentState_ == HICN_RTC_NORMAL_STATE) { + currentState_ = HICN_RTC_SYNC_STATE; + } + + computeMaxWindow(productionRate, 0); + increaseWindow(); + } + + lastSegNacked_ = productionSeg; + old_nack = true; + + } else if (productionSeg < nackSegment) { + actualSegment_ = productionSeg % HICN_MIN_PROBE_SEQ; + + if (!rtx) { + // we are asking stuff in the future + gotFutureNack_++; + computeMaxWindow(productionRate, 0); + decreaseWindow(); + + if (currentState_ == HICN_RTC_SYNC_STATE) { + currentState_ = HICN_RTC_NORMAL_STATE; + } + } + } else { + // we are asking the right thing, but the producer is slow + // keep doing the same until the packet is produced + actualSegment_ = productionSeg % HICN_MIN_PROBE_SEQ; + } + + return old_nack; +} + +void RTCTransportProtocol::onContentObject( + Interest::Ptr &&interest, ContentObject::Ptr &&content_object) { + // as soon as we get a packet firstPckReceived_ will never be false + firstPckReceived_ = true; + + auto payload = content_object->getPayload(); + uint32_t payload_size = (uint32_t)payload->length(); + uint32_t segmentNumber = content_object->getName().getSuffix(); + uint32_t pkt = segmentNumber & modMask_; + + ConsumerContentObjectCallback *callback_content_object = nullptr; + socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT, + &callback_content_object); + if (*callback_content_object) { + (*callback_content_object)(*socket_->getInterface(), *content_object); + } + + if (segmentNumber >= HICN_MIN_PROBE_SEQ) { + TRANSPORT_LOGD("Received probe %u", segmentNumber); + if (segmentNumber == probe_seq_number_ && !received_probe_) { + received_probe_ = true; + + uint32_t pathLabel = content_object->getPathLabel(); + if (pathTable_.find(pathLabel) == pathTable_.end()) { + std::shared_ptr<RTCDataPath> newPath = std::make_shared<RTCDataPath>(); + pathTable_[pathLabel] = newPath; + } + + // this is the expected probe, update the RTT and drop the packet + uint64_t RTT = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count() - + time_sent_probe_; + + pathTable_[pathLabel]->insertRttSample(RTT); + pathTable_[pathLabel]->receivedNack(); + } + return; + } + + // check if the packet is a rtx + bool is_rtx = false; + if (interestRetransmissions_.find(segmentNumber) != + interestRetransmissions_.end()) { + is_rtx = true; + } else { + auto it_win = packets_in_window_.find(segmentNumber); + if (it_win != packets_in_window_.end() && it_win->second != 0) + is_rtx = true; + } + + if (payload_size == HICN_NACK_HEADER_SIZE) { + TRANSPORT_LOGD("Received nack %u", segmentNumber); + + if (inflightInterests_[pkt].state == sent_) { + lastEvent_ = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + packets_in_window_.erase(segmentNumber); + inflightInterestsCount_--; + } + + bool old_nack = false; + + if (!is_rtx) { + // this is not a retransmitted packet + old_nack = onNack(*content_object, false); + updateDelayStats(*content_object); + } else { + old_nack = onNack(*content_object, true); + } + + // the nacked_ state is used only to avoid to decrease + // inflightInterestsCount_ multiple times. In fact, every time that we + // receive an event related to an interest (timeout, nacked, content) we + // cange the state. In this way we are sure that we do not decrease twice + // the counter + if (old_nack) { + inflightInterests_[pkt].state = lost_; + interestRetransmissions_.erase(segmentNumber); + } else { + inflightInterests_[pkt].state = nacked_; + } + + } else { + TRANSPORT_LOGD("Received content %u", segmentNumber); + + avgPacketSize_ = (HICN_ESTIMATED_PACKET_SIZE * avgPacketSize_) + + ((1 - HICN_ESTIMATED_PACKET_SIZE) * payload->length()); + + receivedBytes_ += (uint32_t)(content_object->headerSize() + + content_object->payloadSize()); + + if (inflightInterests_[pkt].state == sent_) { + lastEvent_ = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + packets_in_window_.erase(segmentNumber); + inflightInterestsCount_--; // packet sent without timeouts + } + + if (inflightInterests_[pkt].state == sent_ && !is_rtx) { + // delay stats are computed only for non retransmitted data + updateDelayStats(*content_object); + } + + addRetransmissions(lastReceived_ + 1, segmentNumber); + if (segmentNumber > highestReceived_) { + highestReceived_ = segmentNumber; + } + if (segmentNumber > lastReceived_) { + lastReceived_ = segmentNumber; + lastReceivedTime_ = + std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + } + receivedData_++; + inflightInterests_[pkt].state = received_; + + auto it = interestRetransmissions_.find(segmentNumber); + if (it != interestRetransmissions_.end()) lossRecovered_++; + + interestRetransmissions_.erase(segmentNumber); + + reassemble(std::move(content_object)); + increaseWindow(); + } + + scheduleNextInterests(); +} + +} // end namespace protocol + +} // end namespace transport |