From 3bce9bfdce707313de4f9cccdc867abd9edf82df Mon Sep 17 00:00:00 2001 From: Mauro Sardara Date: Fri, 7 Feb 2020 20:00:06 +0100 Subject: [HICN-508] [HICN-509] [HICN-506] Manifest rework Change-Id: I992205148910be008d66b5acb7f6f1365770f9e8 Signed-off-by: Mauro Sardara --- libtransport/src/hicn/transport/protocols/rtc.cc | 210 +++++++++-------------- 1 file changed, 80 insertions(+), 130 deletions(-) (limited to 'libtransport/src/hicn/transport/protocols/rtc.cc') diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc index 559e86592..e371217f8 100644 --- a/libtransport/src/hicn/transport/protocols/rtc.cc +++ b/libtransport/src/hicn/transport/protocols/rtc.cc @@ -13,11 +13,12 @@ * limitations under the License. */ -#include -#include +#include #include -#include + +#include +#include namespace transport { @@ -26,14 +27,16 @@ namespace protocol { using namespace interface; RTCTransportProtocol::RTCTransportProtocol( - interface::ConsumerSocket *icnet_socket) - : TransportProtocol(icnet_socket), + interface::ConsumerSocket *icn_socket) + : TransportProtocol(icn_socket, nullptr), + DatagramReassembly(icn_socket, this), inflightInterests_(1 << default_values::log_2_default_buffer_size), modMask_((1 << default_values::log_2_default_buffer_size) - 1) { - icnet_socket->getSocketOption(PORTAL, portal_); + icn_socket->getSocketOption(PORTAL, portal_); rtx_timer_ = std::make_unique(portal_->getIoService()); probe_timer_ = std::make_unique(portal_->getIoService()); - sentinel_timer_ = std::make_unique(portal_->getIoService()); + sentinel_timer_ = + std::make_unique(portal_->getIoService()); round_timer_ = std::make_unique(portal_->getIoService()); reset(); } @@ -147,8 +150,7 @@ uint32_t min(uint32_t a, uint32_t b) { } void RTCTransportProtocol::newRound() { - round_timer_->expires_from_now(std::chrono::milliseconds( - HICN_ROUND_LEN)); + round_timer_->expires_from_now(std::chrono::milliseconds(HICN_ROUND_LEN)); round_timer_->async_wait([this](std::error_code ec) { if (ec) return; updateStats(HICN_ROUND_LEN); @@ -281,10 +283,10 @@ void RTCTransportProtocol::updateStats(uint32_t round_duration) { &stats_callback); if (*stats_callback) { // Send the stats to the app - stats_.updateQueuingDelay(queuingDelay_); - stats_.updateLossRatio(lossRate_); - stats_.updateAverageRtt(pathTable_[producerPathLabels_[1]]->getMinRtt()); - (*stats_callback)(*socket_, stats_); + stats_->updateQueuingDelay(queuingDelay_); + stats_->updateLossRatio(lossRate_); + stats_->updateAverageRtt(pathTable_[producerPathLabels_[1]]->getMinRtt()); + (*stats_callback)(*socket_, *stats_); } // bound also by interest lifitime* production rate @@ -301,9 +303,9 @@ void RTCTransportProtocol::updateStats(uint32_t round_duration) { updateCCState(); updateWindow(); - if(queuingDelay_ > 25.0){ - //this indicates that the client will go soon out of synch, - //switch to synch mode + if (queuingDelay_ > 25.0) { + // this indicates that the client will go soon out of synch, + // switch to synch mode if (currentState_ == HICN_RTC_NORMAL_STATE) { currentState_ = HICN_RTC_SYNC_STATE; } @@ -358,8 +360,7 @@ void RTCTransportProtocol::computeMaxWindow(uint32_t productionRate, maxCWin_ = min(maxWaintingInterest, maxCWin_); } - if(maxCWin_ < HICN_MIN_CWIN) - maxCWin_ = HICN_MIN_CWIN; + if (maxCWin_ < HICN_MIN_CWIN) maxCWin_ = HICN_MIN_CWIN; } void RTCTransportProtocol::updateWindow() { @@ -518,68 +519,64 @@ void RTCTransportProtocol::scheduleNextInterests() { } } -void RTCTransportProtocol::sentinelTimer(){ +void RTCTransportProtocol::sentinelTimer() { uint32_t wait = 50; - if(pathTable_.find(producerPathLabels_[0]) != pathTable_.end() && - pathTable_.find(producerPathLabels_[1]) != pathTable_.end()){ - //we have all the info to set the timers + if (pathTable_.find(producerPathLabels_[0]) != pathTable_.end() && + pathTable_.find(producerPathLabels_[1]) != pathTable_.end()) { + // we have all the info to set the timers wait = round(pathTable_[producerPathLabels_[0]]->getInterArrivalGap()); - if(wait == 0) - wait = 1; + if (wait == 0) wait = 1; } sentinel_timer_->expires_from_now(std::chrono::milliseconds(wait)); sentinel_timer_->async_wait([this](std::error_code ec) { - if (ec) return; - uint64_t now = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); - - if(pathTable_.find(producerPathLabels_[0]) == pathTable_.end() || - pathTable_.find(producerPathLabels_[1]) == pathTable_.end()){ - //we have no info, so we send again - - for(auto it = packets_in_window_.begin(); - it != packets_in_window_.end(); it++){ - uint32_t pkt = it->first & modMask_; - if (inflightInterests_[pkt].sequence == it->first) { - inflightInterests_[pkt].transmissionTime = now; - Name *interest_name = nullptr; - socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, - &interest_name); - interest_name->setSuffix(it->first); - it->second++; - sendInterest(interest_name, true); - } + uint64_t now = std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + + if (pathTable_.find(producerPathLabels_[0]) == pathTable_.end() || + pathTable_.find(producerPathLabels_[1]) == pathTable_.end()) { + // we have no info, so we send again + + for (auto it = packets_in_window_.begin(); it != packets_in_window_.end(); + it++) { + uint32_t pkt = it->first & modMask_; + if (inflightInterests_[pkt].sequence == it->first) { + inflightInterests_[pkt].transmissionTime = now; + Name *interest_name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, + &interest_name); + interest_name->setSuffix(it->first); + it->second++; + sendInterest(interest_name, true); } - }else{ - uint64_t max_waiting_time = //wait at least 50ms - (pathTable_[producerPathLabels_[1]]->getMinRtt() - - pathTable_[producerPathLabels_[0]]->getMinRtt()) + - (ceil(pathTable_[producerPathLabels_[0]]->getInterArrivalGap()) * 50); + } + } else { + uint64_t max_waiting_time = // wait at least 50ms + (pathTable_[producerPathLabels_[1]]->getMinRtt() - + pathTable_[producerPathLabels_[0]]->getMinRtt()) + + (ceil(pathTable_[producerPathLabels_[0]]->getInterArrivalGap()) * 50); - if((currentState_ == HICN_RTC_NORMAL_STATE) && + if ((currentState_ == HICN_RTC_NORMAL_STATE) && (inflightInterestsCount_ >= currentCWin_) && - ((now - lastEvent_) > max_waiting_time) && - (lossRate_ >= 0.05)){ + ((now - lastEvent_) > max_waiting_time) && (lossRate_ >= 0.05)) { + uint64_t RTT = pathTable_[producerPathLabels_[1]]->getMinRtt(); - uint64_t RTT = pathTable_[producerPathLabels_[1]]->getMinRtt(); - - for(auto it = packets_in_window_.begin(); - it != packets_in_window_.end(); it++){ + for (auto it = packets_in_window_.begin(); + it != packets_in_window_.end(); it++) { uint32_t pkt = it->first & modMask_; if (inflightInterests_[pkt].sequence == it->first && - ((now - inflightInterests_[pkt].transmissionTime) >= RTT)){ - inflightInterests_[pkt].transmissionTime = now; - Name *interest_name = nullptr; - socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, - &interest_name); - interest_name->setSuffix(it->first); - it->second++; - sendInterest(interest_name, true); + ((now - inflightInterests_[pkt].transmissionTime) >= RTT)) { + inflightInterests_[pkt].transmissionTime = now; + Name *interest_name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, + &interest_name); + interest_name->setSuffix(it->first); + it->second++; + sendInterest(interest_name, true); } } } @@ -754,8 +751,8 @@ void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) { // and over until we get at least a packet inflightInterestsCount_--; lastEvent_ = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + std::chrono::steady_clock::now().time_since_epoch()) + .count(); packets_in_window_.erase(segmentNumber); scheduleNextInterests(); return; @@ -763,8 +760,8 @@ void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) { if (inflightInterests_[pkt].state == sent_) { lastEvent_ = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + std::chrono::steady_clock::now().time_since_epoch()) + .count(); packets_in_window_.erase(segmentNumber); inflightInterestsCount_--; } @@ -890,30 +887,29 @@ void RTCTransportProtocol::onContentObject( return; } - //check if the packet is a rtx + // check if the packet is a rtx bool is_rtx = false; - if(interestRetransmissions_.find(segmentNumber) != - interestRetransmissions_.end()){ + if (interestRetransmissions_.find(segmentNumber) != + interestRetransmissions_.end()) { is_rtx = true; - }else{ + } else { auto it_win = packets_in_window_.find(segmentNumber); - if(it_win != packets_in_window_.end() && - it_win->second != 0) - is_rtx = true; + if (it_win != packets_in_window_.end() && it_win->second != 0) + is_rtx = true; } if (payload_size == HICN_NACK_HEADER_SIZE) { if (inflightInterests_[pkt].state == sent_) { lastEvent_ = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + std::chrono::steady_clock::now().time_since_epoch()) + .count(); packets_in_window_.erase(segmentNumber); inflightInterestsCount_--; } bool old_nack = false; - if (!is_rtx){ + if (!is_rtx) { // this is not a retransmitted packet old_nack = onNack(*content_object, false); updateDelayStats(*content_object); @@ -924,8 +920,8 @@ void RTCTransportProtocol::onContentObject( // the nacked_ state is used only to avoid to decrease // inflightInterestsCount_ multiple times. In fact, every time that we // receive an event related to an interest (timeout, nacked, content) we - // cange the state. In this way we are sure that we do not decrease twice the - // counter + // cange the state. In this way we are sure that we do not decrease twice + // the counter if (old_nack) { inflightInterests_[pkt].state = lost_; interestRetransmissions_.erase(segmentNumber); @@ -942,13 +938,13 @@ void RTCTransportProtocol::onContentObject( if (inflightInterests_[pkt].state == sent_) { lastEvent_ = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + std::chrono::steady_clock::now().time_since_epoch()) + .count(); packets_in_window_.erase(segmentNumber); inflightInterestsCount_--; // packet sent without timeouts } - if (inflightInterests_[pkt].state == sent_ && !is_rtx){ + if (inflightInterests_[pkt].state == sent_ && !is_rtx) { // delay stats are computed only for non retransmitted data updateDelayStats(*content_object); } @@ -979,52 +975,6 @@ void RTCTransportProtocol::onContentObject( scheduleNextInterests(); } -void RTCTransportProtocol::returnContentToApplication( - const ContentObject &content_object) { - // return content to the user - auto read_buffer = content_object.getPayload(); - - read_buffer->trimStart(HICN_TIMESTAMP_SIZE); - - interface::ConsumerSocket::ReadCallback *read_callback = nullptr; - socket_->getSocketOption(READ_CALLBACK, &read_callback); - - if (read_callback == nullptr) { - throw errors::RuntimeException( - "The read callback must be installed in the transport before starting " - "the content retrieval."); - } - - if (read_callback->isBufferMovable()) { - read_callback->readBufferAvailable( - utils::MemBuf::copyBuffer(read_buffer->data(), read_buffer->length())); - } else { - // The buffer will be copied into the application-provided buffer - uint8_t *buffer; - std::size_t length; - std::size_t total_length = read_buffer->length(); - - while (read_buffer->length()) { - buffer = nullptr; - length = 0; - read_callback->getReadBuffer(&buffer, &length); - - if (!buffer || !length) { - throw errors::RuntimeException( - "Invalid buffer provided by the application."); - } - - auto to_copy = std::min(read_buffer->length(), length); - - std::memcpy(buffer, read_buffer->data(), to_copy); - read_buffer->trimStart(to_copy); - } - - read_callback->readDataAvailable(total_length); - read_buffer->clear(); - } -} - } // end namespace protocol } // end namespace transport -- cgit 1.2.3-korg