From c365689250216861fd7727203ee6ba1049ad5778 Mon Sep 17 00:00:00 2001 From: Mauro Sardara Date: Wed, 3 Apr 2019 10:03:56 +0200 Subject: [HICN-155] Consumer socket allows now to read N bytes from the network, where N is defined by the application. Change-Id: Ib20309b40e43e4c0db09b9b484e18cd2e3ebf581 Signed-off-by: Mauro Sardara --- libtransport/src/hicn/transport/protocols/rtc.cc | 204 +++++++++++++---------- 1 file changed, 113 insertions(+), 91 deletions(-) (limited to 'libtransport/src/hicn/transport/protocols/rtc.cc') diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc index 9402d3b02..4205ade4e 100644 --- a/libtransport/src/hicn/transport/protocols/rtc.cc +++ b/libtransport/src/hicn/transport/protocols/rtc.cc @@ -161,12 +161,11 @@ void RTCTransportProtocol::updateDelayStats( uint32_t segmentNumber = content_object.getName().getSuffix(); uint32_t pkt = segmentNumber & modMask_; - if (inflightInterests_[pkt].state != sent_) - return; + if (inflightInterests_[pkt].state != sent_) return; - if(interestRetransmissions_.find(segmentNumber) != + if (interestRetransmissions_.find(segmentNumber) != interestRetransmissions_.end()) - //this packet was rtx at least once + // this packet was rtx at least once return; uint32_t pathLabel = content_object.getPathLabel(); @@ -329,8 +328,7 @@ void RTCTransportProtocol::increaseWindow() { } else { currentCWin_ = min( maxCWin_, - (uint32_t)ceil(currentCWin_ + - (1.0 / (double)currentCWin_))); // linear + (uint32_t)ceil(currentCWin_ + (1.0 / (double)currentCWin_))); // linear } } @@ -363,7 +361,6 @@ void RTCTransportProtocol::sendInterest(Name *interest_name, bool rtx) { if (!rtx) { inflightInterestsCount_++; } - } void RTCTransportProtocol::scheduleNextInterests() { @@ -373,9 +370,9 @@ void RTCTransportProtocol::scheduleNextInterests() { while (inflightInterestsCount_ < currentCWin_) { Name *interest_name = nullptr; socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, - &interest_name); + &interest_name); - //we send the packet only if it is not pending yet + // we send the packet only if it is not pending yet interest_name->setSuffix(actualSegment_); if (portal_->interestIsPending(*interest_name)) { actualSegment_++; @@ -383,11 +380,11 @@ void RTCTransportProtocol::scheduleNextInterests() { } uint32_t pkt = actualSegment_ & modMask_; - //if we already reacevied the content we don't ask it again - if(inflightInterests_[pkt].state == received_ && - inflightInterests_[pkt].sequence == actualSegment_) { - actualSegment_++; - continue; + // if we already reacevied the content we don't ask it again + if (inflightInterests_[pkt].state == received_ && + inflightInterests_[pkt].sequence == actualSegment_) { + actualSegment_++; + continue; } inflightInterests_[pkt].transmissionTime = @@ -419,93 +416,93 @@ void RTCTransportProtocol::scheduleAppNackRtx(std::vector &nacks) { #endif } -void RTCTransportProtocol::addRetransmissions(uint32_t val){ - //add only val in the rtx list +void RTCTransportProtocol::addRetransmissions(uint32_t val) { + // add only val in the rtx list addRetransmissions(val, val + 1); } -void RTCTransportProtocol::addRetransmissions(uint32_t start, uint32_t stop){ - for(uint32_t i = start; i < stop; i++){ +void RTCTransportProtocol::addRetransmissions(uint32_t start, uint32_t stop) { + for (uint32_t i = start; i < stop; i++) { auto it = interestRetransmissions_.find(i); - if(it == interestRetransmissions_.end()){ + if (it == interestRetransmissions_.end()) { if (lastSegNacked_ <= i) { - //i must be larger than the last past nack received + // i must be larger than the last past nack received interestRetransmissions_[i] = 0; } - }//if the retransmission is already there the rtx timer will - //take care of it + } // if the retransmission is already there the rtx timer will + // take care of it } retransmit(true); } -void RTCTransportProtocol::retransmit(bool first_rtx){ +void RTCTransportProtocol::retransmit(bool first_rtx) { auto it = interestRetransmissions_.begin(); - //cut len to max HICN_MAX_RTX_SIZE - //since we use a map, the smaller (and so the older) sequence number are at - //the beginnin of the map - while(interestRetransmissions_.size() > HICN_MAX_RTX_SIZE){ + // cut len to max HICN_MAX_RTX_SIZE + // since we use a map, the smaller (and so the older) sequence number are at + // the beginnin of the map + while (interestRetransmissions_.size() > HICN_MAX_RTX_SIZE) { it = interestRetransmissions_.erase(it); } it = interestRetransmissions_.begin(); - while (it != interestRetransmissions_.end()){ + while (it != interestRetransmissions_.end()) { uint32_t pkt = it->first & modMask_; - if(inflightInterests_[pkt].sequence != it->first){ - //this packet is not anymore in the inflight buffer, erase it + if (inflightInterests_[pkt].sequence != it->first) { + // this packet is not anymore in the inflight buffer, erase it it = interestRetransmissions_.erase(it); continue; } - //we retransmitted the packet too many times - if(it->second >= HICN_MAX_RTX){ + // we retransmitted the packet too many times + if (it->second >= HICN_MAX_RTX) { it = interestRetransmissions_.erase(it); continue; } - //this packet is too old - if((lastReceived_ > it->first) && - (lastReceived_ - it->first) > HICN_MAX_RTX_MAX_AGE){ + // this packet is too old + if ((lastReceived_ > it->first) && + (lastReceived_ - it->first) > HICN_MAX_RTX_MAX_AGE) { it = interestRetransmissions_.erase(it); continue; } - if(first_rtx){ - //TODO (optimization) - //the rtx that we never sent (it->second == 0) are all at the - //end, so we can go directly there - if(it->second == 0){ + if (first_rtx) { + // TODO (optimization) + // the rtx that we never sent (it->second == 0) are all at the + // end, so we can go directly there + if (it->second == 0) { inflightInterests_[pkt].transmissionTime = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + std::chrono::steady_clock::now().time_since_epoch()) + .count(); it->second++; Name *interest_name = nullptr; socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, - &interest_name); + &interest_name); interest_name->setSuffix(it->first); sendInterest(interest_name, true); } ++it; - }else{ - //base on time + } else { + // base on time uint64_t now = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); - if((now - inflightInterests_[pkt].transmissionTime) > 20){ - //XXX replace 20 with rtt + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + if ((now - inflightInterests_[pkt].transmissionTime) > 20) { + // XXX replace 20 with rtt inflightInterests_[pkt].transmissionTime = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + std::chrono::steady_clock::now().time_since_epoch()) + .count(); it->second++; Name *interest_name = nullptr; socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, - &interest_name); + &interest_name); interest_name->setSuffix(it->first); sendInterest(interest_name, true); } @@ -514,13 +511,13 @@ void RTCTransportProtocol::retransmit(bool first_rtx){ } } -void RTCTransportProtocol::checkRtx(){ +void RTCTransportProtocol::checkRtx() { retransmit(false); rtx_timer_->expires_from_now(std::chrono::milliseconds(20)); rtx_timer_->async_wait([this](std::error_code ec) { - if (ec) return; - checkRtx(); - }); + if (ec) return; + checkRtx(); + }); } void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) { @@ -533,25 +530,25 @@ void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) { inflightInterestsCount_--; } - //check how many times we sent this packet - auto it = interestRetransmissions_.find(segmentNumber); - if(it != interestRetransmissions_.end() && it->second >= HICN_MAX_RTX){ - inflightInterests_[pkt].state = lost_; - } + // check how many times we sent this packet + auto it = interestRetransmissions_.find(segmentNumber); + if (it != interestRetransmissions_.end() && it->second >= HICN_MAX_RTX) { + inflightInterests_[pkt].state = lost_; + } - if(inflightInterests_[pkt].state == sent_) { - inflightInterests_[pkt].state = timeout1_; - } else if (inflightInterests_[pkt].state == timeout1_) { - inflightInterests_[pkt].state = timeout2_; - } else if (inflightInterests_[pkt].state == timeout2_) { - inflightInterests_[pkt].state = lost_; - } + if (inflightInterests_[pkt].state == sent_) { + inflightInterests_[pkt].state = timeout1_; + } else if (inflightInterests_[pkt].state == timeout1_) { + inflightInterests_[pkt].state = timeout2_; + } else if (inflightInterests_[pkt].state == timeout2_) { + inflightInterests_[pkt].state = lost_; + } - if(inflightInterests_[pkt].state == lost_) { - interestRetransmissions_.erase(segmentNumber); - }else{ - addRetransmissions(segmentNumber); - } + if (inflightInterests_[pkt].state == lost_) { + interestRetransmissions_.erase(segmentNumber); + } else { + addRetransmissions(segmentNumber); + } scheduleNextInterests(); } @@ -562,12 +559,11 @@ bool RTCTransportProtocol::checkIfProducerIsActive( uint32_t productionSeg = *payload; uint32_t productionRate = *(++payload); - if (productionRate == 0) { // the producer socket is not active // in this case we consider only the first nack if (nack_timer_used_) { - return false; + return false; } nack_timer_used_ = true; @@ -680,12 +676,12 @@ void RTCTransportProtocol::onContentObject( ((1 - HICN_ESTIMATED_PACKET_SIZE) * payload->length()); if (inflightInterests_[pkt].state == sent_) { - inflightInterestsCount_--; //packet sent without timeouts + inflightInterestsCount_--; // packet sent without timeouts } if (inflightInterests_[pkt].state == sent_ && interestRetransmissions_.find(segmentNumber) == - interestRetransmissions_.end()){ + interestRetransmissions_.end()) { // we count only non retransmitted data in order to take into accunt only // the transmition rate of the producer receivedBytes_ += (uint32_t)(content_object->headerSize() + @@ -693,7 +689,7 @@ void RTCTransportProtocol::onContentObject( updateDelayStats(*content_object); addRetransmissions(lastReceived_ + 1, segmentNumber); - //lastReceived_ is updated only for data packets received without RTX + // lastReceived_ is updated only for data packets received without RTX lastReceived_ = segmentNumber; } @@ -704,7 +700,7 @@ void RTCTransportProtocol::onContentObject( increaseWindow(); } - //in any case we remove the packet from the rtx list + // in any case we remove the packet from the rtx list interestRetransmissions_.erase(segmentNumber); if (schedule_next_interest) { @@ -715,24 +711,50 @@ void RTCTransportProtocol::onContentObject( void RTCTransportProtocol::returnContentToApplication( const ContentObject &content_object) { // return content to the user - auto a = content_object.getPayload(); + auto read_buffer = content_object.getPayload(); - a->trimStart(HICN_TIMESTAMP_SIZE); - uint8_t *start = a->writableData(); - unsigned size = (unsigned)a->length(); + read_buffer->trimStart(HICN_TIMESTAMP_SIZE); // set offset between hICN and RTP packets - uint16_t rtp_seq = ntohs(*(((uint16_t *)start) + 1)); + uint16_t rtp_seq = ntohs(*(((uint16_t *)read_buffer->writableData()) + 1)); RTPhICN_offset_ = content_object.getName().getSuffix() - rtp_seq; - std::shared_ptr> content_buffer; - socket_->getSocketOption(APPLICATION_BUFFER, content_buffer); - content_buffer->insert(content_buffer->end(), start, start + size); + interface::ConsumerSocket::ReadCallback *read_callback = nullptr; + socket_->getSocketOption(READ_CALLBACK, &read_callback); + + if (read_callback == nullptr) { + throw errors::RuntimeException( + "The read callback must be installed in the transport before starting " + "the content retrieval."); + } + + if (read_callback->isBufferMovable()) { + read_callback->readBufferAvailable( + utils::MemBuf::copyBuffer(read_buffer->data(), read_buffer->length())); + } else { + // The buffer will be copied into the application-provided buffer + uint8_t *buffer; + std::size_t length; + std::size_t total_length = read_buffer->length(); + + while (read_buffer->length()) { + buffer = nullptr; + length = 0; + read_callback->getReadBuffer(&buffer, &length); + + if (!buffer || !length) { + throw errors::RuntimeException( + "Invalid buffer provided by the application."); + } + + auto to_copy = std::min(read_buffer->length(), length); + + std::memcpy(buffer, read_buffer->data(), to_copy); + read_buffer->trimStart(to_copy); + } - ConsumerContentCallback *on_payload = nullptr; - socket_->getSocketOption(CONTENT_RETRIEVED, &on_payload); - if ((*on_payload) != VOID_HANDLER) { - (*on_payload)(*socket_, size, std::make_error_code(std::errc(0))); + read_callback->readDataAvailable(total_length); + read_buffer->clear(); } } -- cgit 1.2.3-korg