From c365689250216861fd7727203ee6ba1049ad5778 Mon Sep 17 00:00:00 2001 From: Mauro Sardara Date: Wed, 3 Apr 2019 10:03:56 +0200 Subject: [HICN-155] Consumer socket allows now to read N bytes from the network, where N is defined by the application. Change-Id: Ib20309b40e43e4c0db09b9b484e18cd2e3ebf581 Signed-off-by: Mauro Sardara --- libtransport/src/hicn/transport/protocols/raaqm.cc | 62 ++++--- .../src/hicn/transport/protocols/reassembly.cc | 78 +++++++- .../src/hicn/transport/protocols/reassembly.h | 8 + libtransport/src/hicn/transport/protocols/rtc.cc | 204 ++++++++++++--------- 4 files changed, 227 insertions(+), 125 deletions(-) (limited to 'libtransport/src/hicn/transport/protocols') diff --git a/libtransport/src/hicn/transport/protocols/raaqm.cc b/libtransport/src/hicn/transport/protocols/raaqm.cc index b8a7c9610..7f0310e7c 100644 --- a/libtransport/src/hicn/transport/protocols/raaqm.cc +++ b/libtransport/src/hicn/transport/protocols/raaqm.cc @@ -38,14 +38,14 @@ RaaqmTransportProtocol::RaaqmTransportProtocol(ConsumerSocket *icnet_socket) } RaaqmTransportProtocol::~RaaqmTransportProtocol() { - if (this->rate_estimator_) { - delete this->rate_estimator_; + if (rate_estimator_) { + delete rate_estimator_; } } int RaaqmTransportProtocol::start() { - if (this->rate_estimator_) { - this->rate_estimator_->onStart(); + if (rate_estimator_) { + rate_estimator_->onStart(); } if (!cur_path_) { @@ -75,13 +75,13 @@ int RaaqmTransportProtocol::start() { choice_param); if (choice_param == 1) { - this->rate_estimator_ = new ALaTcpEstimator(); + rate_estimator_ = new ALaTcpEstimator(); } else { - this->rate_estimator_ = new SimpleEstimator(alpha, batching_param); + rate_estimator_ = new SimpleEstimator(alpha, batching_param); } socket_->getSocketOption(RateEstimationOptions::RATE_ESTIMATION_OBSERVER, - &this->rate_estimator_->observer_); + &rate_estimator_->observer_); // Current path auto cur_path = std::make_unique( @@ -126,7 +126,7 @@ void RaaqmTransportProtocol::increaseWindow() { socket_->setSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE, current_window_size_); } - this->rate_estimator_->onWindowIncrease(current_window_size_); + rate_estimator_->onWindowIncrease(current_window_size_); } void RaaqmTransportProtocol::decreaseWindow() { @@ -145,7 +145,7 @@ void RaaqmTransportProtocol::decreaseWindow() { socket_->setSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE, current_window_size_); } - this->rate_estimator_->onWindowDecrease(current_window_size_); + rate_estimator_->onWindowDecrease(current_window_size_); } void RaaqmTransportProtocol::afterDataUnsatisfied(uint64_t segment) { @@ -158,8 +158,8 @@ void RaaqmTransportProtocol::afterContentReception( updatePathTable(content_object); increaseWindow(); updateRtt(interest.getName().getSuffix()); - this->rate_estimator_->onDataReceived((int)content_object.payloadSize() + - (int)content_object.headerSize()); + rate_estimator_->onDataReceived((int)content_object.payloadSize() + + (int)content_object.headerSize()); // Set drop probablility and window size accordingly RAAQM(); } @@ -368,7 +368,12 @@ void RaaqmTransportProtocol::onContentSegment( reassemble(std::move(content_object)); } else if (TRANSPORT_EXPECT_FALSE(incremental_suffix == index_manager_->getFinalSuffix())) { - onContentReassembled(std::make_error_code(std::errc(0))); + interface::ConsumerSocket::ReadCallback *on_payload = nullptr; + socket_->getSocketOption(READ_CALLBACK, &on_payload); + + if (on_payload != nullptr) { + on_payload->readSuccess(stats_.getBytesRecv()); + } } } else { // TODO Application policy check @@ -487,8 +492,8 @@ void RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) { return; } - // This is set to ~0 so that the next interest_retransmissions_ + 1, performed - // by sendInterest, will result in 0 + // This is set to ~0 so that the next interest_retransmissions_ + 1, + // performed by sendInterest, will result in 0 interest_retransmissions_[next_suffix & mask] = ~0; interest_timepoints_[next_suffix & mask] = utils::SteadyClock::now(); sendInterest(std::move(interest)); @@ -502,16 +507,23 @@ void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) { } void RaaqmTransportProtocol::onContentReassembled(std::error_code ec) { - interface::ConsumerContentCallback *on_payload = nullptr; - socket_->getSocketOption(CONTENT_RETRIEVED, &on_payload); - if (*on_payload != VOID_HANDLER) { - std::shared_ptr> content_buffer; - socket_->getSocketOption( - interface::GeneralTransportOptions::APPLICATION_BUFFER, content_buffer); - (*on_payload)(*socket_, content_buffer->size(), ec); + interface::ConsumerSocket::ReadCallback *on_payload = nullptr; + socket_->getSocketOption(READ_CALLBACK, &on_payload); + + if (on_payload == nullptr) { + throw errors::RuntimeException( + "The read callback must be installed in the transport before " + "starting " + "the content retrieval."); + } + + if (!ec) { + on_payload->readSuccess(stats_.getBytesRecv()); + } else { + on_payload->readError(ec); } - this->rate_estimator_->onDownloadFinished(); + rate_estimator_->onDownloadFinished(); stop(); } @@ -526,8 +538,8 @@ void RaaqmTransportProtocol::updateRtt(uint64_t segment) { // Update stats updateStats((uint32_t)segment, rtt.count(), now); - if (this->rate_estimator_) { - this->rate_estimator_->onRttUpdate((double)rtt.count()); + if (rate_estimator_) { + rate_estimator_->onRttUpdate((double)rtt.count()); } cur_path_->insertNewRtt(rtt.count()); @@ -676,4 +688,4 @@ void RaaqmTransportProtocol::checkForStalePaths() { } // end namespace protocol -} // end namespace transport +} // namespace transport diff --git a/libtransport/src/hicn/transport/protocols/reassembly.cc b/libtransport/src/hicn/transport/protocols/reassembly.cc index 899f701c7..a2062df93 100644 --- a/libtransport/src/hicn/transport/protocols/reassembly.cc +++ b/libtransport/src/hicn/transport/protocols/reassembly.cc @@ -17,6 +17,7 @@ #include #include #include +#include namespace transport { @@ -30,7 +31,8 @@ BaseReassembly::BaseReassembly(interface::ConsumerSocket *icn_socket, manifest_index_manager_( std::make_unique(icn_socket)), index_manager_(incremental_index_manager_.get()), - index_(0) { + index_(0), + read_buffer_(nullptr) { setContentCallback(content_callback); } @@ -54,30 +56,88 @@ void BaseReassembly::reassemble(ContentObject::Ptr &&content_object) { void BaseReassembly::copyContent(const ContentObject &content_object) { auto a = content_object.getPayload(); - - std::shared_ptr> content_buffer; - reassembly_consumer_socket_->getSocketOption( - interface::GeneralTransportOptions::APPLICATION_BUFFER, content_buffer); - - content_buffer->insert(content_buffer->end(), (uint8_t *)a->data(), - (uint8_t *)a->data() + a->length()); + auto payload_length = a->length(); + auto write_size = std::min(payload_length, read_buffer_->tailroom()); + auto additional_bytes = payload_length > read_buffer_->tailroom() + ? payload_length - read_buffer_->tailroom() + : 0; + + std::memcpy(read_buffer_->writableTail(), a->data(), write_size); + read_buffer_->append(write_size); + + if (!read_buffer_->tailroom()) { + notifyApplication(); + std::memcpy(read_buffer_->writableTail(), a->data() + write_size, + additional_bytes); + read_buffer_->append(additional_bytes); + } bool download_completed = index_manager_->getFinalSuffix() == content_object.getName().getSuffix(); if (TRANSPORT_EXPECT_FALSE(download_completed)) { + notifyApplication(); content_callback_->onContentReassembled(std::make_error_code(std::errc(0))); } } +void BaseReassembly::notifyApplication() { + interface::ConsumerSocket::ReadCallback *read_callback = nullptr; + reassembly_consumer_socket_->getSocketOption( + interface::ConsumerCallbacksOptions::READ_CALLBACK, &read_callback); + + if (TRANSPORT_EXPECT_FALSE(!read_callback)) { + TRANSPORT_LOGE("Read callback not installed!"); + return; + } + + if (read_callback->isBufferMovable()) { + // No need to perform an additional copy. The whole buffer will be + // tranferred to the application. + + read_callback->readBufferAvailable(std::move(read_buffer_)); + read_buffer_ = utils::MemBuf::create(read_callback->maxBufferSize()); + } else { + // The buffer will be copied into the application-provided buffer + uint8_t *buffer; + std::size_t length; + std::size_t total_length = read_buffer_->length(); + + while (read_buffer_->length()) { + buffer = nullptr; + length = 0; + read_callback->getReadBuffer(&buffer, &length); + + if (!buffer || !length) { + throw errors::RuntimeException( + "Invalid buffer provided by the application."); + } + + auto to_copy = std::min(read_buffer_->length(), length); + std::memcpy(buffer, read_buffer_->data(), to_copy); + read_buffer_->trimStart(to_copy); + } + + read_callback->readDataAvailable(total_length); + read_buffer_->clear(); + } +} + void BaseReassembly::reset() { manifest_index_manager_->reset(); incremental_index_manager_->reset(); index_ = index_manager_->getNextReassemblySegment(); received_packets_.clear(); + + // reset read buffer + interface::ConsumerSocket::ReadCallback *read_callback; + reassembly_consumer_socket_->getSocketOption( + interface::ConsumerCallbacksOptions::READ_CALLBACK, &read_callback); + + read_buffer_ = utils::MemBuf::create(read_callback->maxBufferSize()); } } // namespace protocol -} // end namespace transport +} // namespace transport diff --git a/libtransport/src/hicn/transport/protocols/reassembly.h b/libtransport/src/hicn/transport/protocols/reassembly.h index 9efddb773..79f0ea4d2 100644 --- a/libtransport/src/hicn/transport/protocols/reassembly.h +++ b/libtransport/src/hicn/transport/protocols/reassembly.h @@ -20,6 +20,10 @@ namespace transport { +namespace interface { +class ConsumerReadCallback; +} + namespace protocol { // Forward Declaration @@ -54,6 +58,9 @@ class BaseReassembly : public Reassembly { virtual void reset() override; + private: + void notifyApplication(); + protected: // The consumer socket interface::ConsumerSocket *reassembly_consumer_socket_; @@ -63,6 +70,7 @@ class BaseReassembly : public Reassembly { std::unordered_map received_packets_; uint64_t index_; + std::unique_ptr read_buffer_; }; } // namespace protocol diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc index 9402d3b02..4205ade4e 100644 --- a/libtransport/src/hicn/transport/protocols/rtc.cc +++ b/libtransport/src/hicn/transport/protocols/rtc.cc @@ -161,12 +161,11 @@ void RTCTransportProtocol::updateDelayStats( uint32_t segmentNumber = content_object.getName().getSuffix(); uint32_t pkt = segmentNumber & modMask_; - if (inflightInterests_[pkt].state != sent_) - return; + if (inflightInterests_[pkt].state != sent_) return; - if(interestRetransmissions_.find(segmentNumber) != + if (interestRetransmissions_.find(segmentNumber) != interestRetransmissions_.end()) - //this packet was rtx at least once + // this packet was rtx at least once return; uint32_t pathLabel = content_object.getPathLabel(); @@ -329,8 +328,7 @@ void RTCTransportProtocol::increaseWindow() { } else { currentCWin_ = min( maxCWin_, - (uint32_t)ceil(currentCWin_ + - (1.0 / (double)currentCWin_))); // linear + (uint32_t)ceil(currentCWin_ + (1.0 / (double)currentCWin_))); // linear } } @@ -363,7 +361,6 @@ void RTCTransportProtocol::sendInterest(Name *interest_name, bool rtx) { if (!rtx) { inflightInterestsCount_++; } - } void RTCTransportProtocol::scheduleNextInterests() { @@ -373,9 +370,9 @@ void RTCTransportProtocol::scheduleNextInterests() { while (inflightInterestsCount_ < currentCWin_) { Name *interest_name = nullptr; socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, - &interest_name); + &interest_name); - //we send the packet only if it is not pending yet + // we send the packet only if it is not pending yet interest_name->setSuffix(actualSegment_); if (portal_->interestIsPending(*interest_name)) { actualSegment_++; @@ -383,11 +380,11 @@ void RTCTransportProtocol::scheduleNextInterests() { } uint32_t pkt = actualSegment_ & modMask_; - //if we already reacevied the content we don't ask it again - if(inflightInterests_[pkt].state == received_ && - inflightInterests_[pkt].sequence == actualSegment_) { - actualSegment_++; - continue; + // if we already reacevied the content we don't ask it again + if (inflightInterests_[pkt].state == received_ && + inflightInterests_[pkt].sequence == actualSegment_) { + actualSegment_++; + continue; } inflightInterests_[pkt].transmissionTime = @@ -419,93 +416,93 @@ void RTCTransportProtocol::scheduleAppNackRtx(std::vector &nacks) { #endif } -void RTCTransportProtocol::addRetransmissions(uint32_t val){ - //add only val in the rtx list +void RTCTransportProtocol::addRetransmissions(uint32_t val) { + // add only val in the rtx list addRetransmissions(val, val + 1); } -void RTCTransportProtocol::addRetransmissions(uint32_t start, uint32_t stop){ - for(uint32_t i = start; i < stop; i++){ +void RTCTransportProtocol::addRetransmissions(uint32_t start, uint32_t stop) { + for (uint32_t i = start; i < stop; i++) { auto it = interestRetransmissions_.find(i); - if(it == interestRetransmissions_.end()){ + if (it == interestRetransmissions_.end()) { if (lastSegNacked_ <= i) { - //i must be larger than the last past nack received + // i must be larger than the last past nack received interestRetransmissions_[i] = 0; } - }//if the retransmission is already there the rtx timer will - //take care of it + } // if the retransmission is already there the rtx timer will + // take care of it } retransmit(true); } -void RTCTransportProtocol::retransmit(bool first_rtx){ +void RTCTransportProtocol::retransmit(bool first_rtx) { auto it = interestRetransmissions_.begin(); - //cut len to max HICN_MAX_RTX_SIZE - //since we use a map, the smaller (and so the older) sequence number are at - //the beginnin of the map - while(interestRetransmissions_.size() > HICN_MAX_RTX_SIZE){ + // cut len to max HICN_MAX_RTX_SIZE + // since we use a map, the smaller (and so the older) sequence number are at + // the beginnin of the map + while (interestRetransmissions_.size() > HICN_MAX_RTX_SIZE) { it = interestRetransmissions_.erase(it); } it = interestRetransmissions_.begin(); - while (it != interestRetransmissions_.end()){ + while (it != interestRetransmissions_.end()) { uint32_t pkt = it->first & modMask_; - if(inflightInterests_[pkt].sequence != it->first){ - //this packet is not anymore in the inflight buffer, erase it + if (inflightInterests_[pkt].sequence != it->first) { + // this packet is not anymore in the inflight buffer, erase it it = interestRetransmissions_.erase(it); continue; } - //we retransmitted the packet too many times - if(it->second >= HICN_MAX_RTX){ + // we retransmitted the packet too many times + if (it->second >= HICN_MAX_RTX) { it = interestRetransmissions_.erase(it); continue; } - //this packet is too old - if((lastReceived_ > it->first) && - (lastReceived_ - it->first) > HICN_MAX_RTX_MAX_AGE){ + // this packet is too old + if ((lastReceived_ > it->first) && + (lastReceived_ - it->first) > HICN_MAX_RTX_MAX_AGE) { it = interestRetransmissions_.erase(it); continue; } - if(first_rtx){ - //TODO (optimization) - //the rtx that we never sent (it->second == 0) are all at the - //end, so we can go directly there - if(it->second == 0){ + if (first_rtx) { + // TODO (optimization) + // the rtx that we never sent (it->second == 0) are all at the + // end, so we can go directly there + if (it->second == 0) { inflightInterests_[pkt].transmissionTime = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + std::chrono::steady_clock::now().time_since_epoch()) + .count(); it->second++; Name *interest_name = nullptr; socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, - &interest_name); + &interest_name); interest_name->setSuffix(it->first); sendInterest(interest_name, true); } ++it; - }else{ - //base on time + } else { + // base on time uint64_t now = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); - if((now - inflightInterests_[pkt].transmissionTime) > 20){ - //XXX replace 20 with rtt + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + if ((now - inflightInterests_[pkt].transmissionTime) > 20) { + // XXX replace 20 with rtt inflightInterests_[pkt].transmissionTime = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + std::chrono::steady_clock::now().time_since_epoch()) + .count(); it->second++; Name *interest_name = nullptr; socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, - &interest_name); + &interest_name); interest_name->setSuffix(it->first); sendInterest(interest_name, true); } @@ -514,13 +511,13 @@ void RTCTransportProtocol::retransmit(bool first_rtx){ } } -void RTCTransportProtocol::checkRtx(){ +void RTCTransportProtocol::checkRtx() { retransmit(false); rtx_timer_->expires_from_now(std::chrono::milliseconds(20)); rtx_timer_->async_wait([this](std::error_code ec) { - if (ec) return; - checkRtx(); - }); + if (ec) return; + checkRtx(); + }); } void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) { @@ -533,25 +530,25 @@ void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) { inflightInterestsCount_--; } - //check how many times we sent this packet - auto it = interestRetransmissions_.find(segmentNumber); - if(it != interestRetransmissions_.end() && it->second >= HICN_MAX_RTX){ - inflightInterests_[pkt].state = lost_; - } + // check how many times we sent this packet + auto it = interestRetransmissions_.find(segmentNumber); + if (it != interestRetransmissions_.end() && it->second >= HICN_MAX_RTX) { + inflightInterests_[pkt].state = lost_; + } - if(inflightInterests_[pkt].state == sent_) { - inflightInterests_[pkt].state = timeout1_; - } else if (inflightInterests_[pkt].state == timeout1_) { - inflightInterests_[pkt].state = timeout2_; - } else if (inflightInterests_[pkt].state == timeout2_) { - inflightInterests_[pkt].state = lost_; - } + if (inflightInterests_[pkt].state == sent_) { + inflightInterests_[pkt].state = timeout1_; + } else if (inflightInterests_[pkt].state == timeout1_) { + inflightInterests_[pkt].state = timeout2_; + } else if (inflightInterests_[pkt].state == timeout2_) { + inflightInterests_[pkt].state = lost_; + } - if(inflightInterests_[pkt].state == lost_) { - interestRetransmissions_.erase(segmentNumber); - }else{ - addRetransmissions(segmentNumber); - } + if (inflightInterests_[pkt].state == lost_) { + interestRetransmissions_.erase(segmentNumber); + } else { + addRetransmissions(segmentNumber); + } scheduleNextInterests(); } @@ -562,12 +559,11 @@ bool RTCTransportProtocol::checkIfProducerIsActive( uint32_t productionSeg = *payload; uint32_t productionRate = *(++payload); - if (productionRate == 0) { // the producer socket is not active // in this case we consider only the first nack if (nack_timer_used_) { - return false; + return false; } nack_timer_used_ = true; @@ -680,12 +676,12 @@ void RTCTransportProtocol::onContentObject( ((1 - HICN_ESTIMATED_PACKET_SIZE) * payload->length()); if (inflightInterests_[pkt].state == sent_) { - inflightInterestsCount_--; //packet sent without timeouts + inflightInterestsCount_--; // packet sent without timeouts } if (inflightInterests_[pkt].state == sent_ && interestRetransmissions_.find(segmentNumber) == - interestRetransmissions_.end()){ + interestRetransmissions_.end()) { // we count only non retransmitted data in order to take into accunt only // the transmition rate of the producer receivedBytes_ += (uint32_t)(content_object->headerSize() + @@ -693,7 +689,7 @@ void RTCTransportProtocol::onContentObject( updateDelayStats(*content_object); addRetransmissions(lastReceived_ + 1, segmentNumber); - //lastReceived_ is updated only for data packets received without RTX + // lastReceived_ is updated only for data packets received without RTX lastReceived_ = segmentNumber; } @@ -704,7 +700,7 @@ void RTCTransportProtocol::onContentObject( increaseWindow(); } - //in any case we remove the packet from the rtx list + // in any case we remove the packet from the rtx list interestRetransmissions_.erase(segmentNumber); if (schedule_next_interest) { @@ -715,24 +711,50 @@ void RTCTransportProtocol::onContentObject( void RTCTransportProtocol::returnContentToApplication( const ContentObject &content_object) { // return content to the user - auto a = content_object.getPayload(); + auto read_buffer = content_object.getPayload(); - a->trimStart(HICN_TIMESTAMP_SIZE); - uint8_t *start = a->writableData(); - unsigned size = (unsigned)a->length(); + read_buffer->trimStart(HICN_TIMESTAMP_SIZE); // set offset between hICN and RTP packets - uint16_t rtp_seq = ntohs(*(((uint16_t *)start) + 1)); + uint16_t rtp_seq = ntohs(*(((uint16_t *)read_buffer->writableData()) + 1)); RTPhICN_offset_ = content_object.getName().getSuffix() - rtp_seq; - std::shared_ptr> content_buffer; - socket_->getSocketOption(APPLICATION_BUFFER, content_buffer); - content_buffer->insert(content_buffer->end(), start, start + size); + interface::ConsumerSocket::ReadCallback *read_callback = nullptr; + socket_->getSocketOption(READ_CALLBACK, &read_callback); + + if (read_callback == nullptr) { + throw errors::RuntimeException( + "The read callback must be installed in the transport before starting " + "the content retrieval."); + } + + if (read_callback->isBufferMovable()) { + read_callback->readBufferAvailable( + utils::MemBuf::copyBuffer(read_buffer->data(), read_buffer->length())); + } else { + // The buffer will be copied into the application-provided buffer + uint8_t *buffer; + std::size_t length; + std::size_t total_length = read_buffer->length(); + + while (read_buffer->length()) { + buffer = nullptr; + length = 0; + read_callback->getReadBuffer(&buffer, &length); + + if (!buffer || !length) { + throw errors::RuntimeException( + "Invalid buffer provided by the application."); + } + + auto to_copy = std::min(read_buffer->length(), length); + + std::memcpy(buffer, read_buffer->data(), to_copy); + read_buffer->trimStart(to_copy); + } - ConsumerContentCallback *on_payload = nullptr; - socket_->getSocketOption(CONTENT_RETRIEVED, &on_payload); - if ((*on_payload) != VOID_HANDLER) { - (*on_payload)(*socket_, size, std::make_error_code(std::errc(0))); + read_callback->readDataAvailable(total_length); + read_buffer->clear(); } } -- cgit 1.2.3-korg