From 393a0a4018936134fa1b343523bbafaab606973d Mon Sep 17 00:00:00 2001 From: Mauro Sardara Date: Tue, 19 Mar 2019 16:21:55 +0100 Subject: [HICN-125 HICN-126] - Add MemBuf as return type of getPayload of transport::core::Packet - Fix incremental index manager Change-Id: Ib557d56b1bf42e3974364c611b825b21f1e3d3f1 Signed-off-by: Mauro Sardara --- .../hicn/transport/protocols/indexing_manager.h | 56 ++++------------------ libtransport/src/hicn/transport/protocols/raaqm.cc | 13 ++--- .../src/hicn/transport/protocols/reassembly.cc | 10 ++-- .../src/hicn/transport/protocols/reassembly.h | 1 - libtransport/src/hicn/transport/protocols/rtc.cc | 16 +++---- 5 files changed, 27 insertions(+), 69 deletions(-) (limited to 'libtransport/src/hicn/transport/protocols') diff --git a/libtransport/src/hicn/transport/protocols/indexing_manager.h b/libtransport/src/hicn/transport/protocols/indexing_manager.h index c692695a3..b6b8bb4a6 100644 --- a/libtransport/src/hicn/transport/protocols/indexing_manager.h +++ b/libtransport/src/hicn/transport/protocols/indexing_manager.h @@ -40,6 +40,8 @@ class IndexManager { */ virtual uint32_t getNextSuffix() = 0; + virtual void setFirstSuffix(uint32_t suffix) = 0; + /** * Retrive the next segment to be reassembled. */ @@ -72,52 +74,6 @@ class IndexVerificationManager : public IndexManager { virtual bool onContentObject(const core::ContentObject &content_object) = 0; }; -class ZeroIndexManager : public IndexVerificationManager { - public: - ZeroIndexManager() : reset_(true) {} - - TRANSPORT_ALWAYS_INLINE virtual void reset() override { reset_ = true; } - - /** - * Retrieve from the manifest the next suffix to retrieve. - */ - TRANSPORT_ALWAYS_INLINE virtual uint32_t getNextSuffix() override { - uint32_t ret = reset_ ? 0 : IndexManager::invalid_index; - reset_ = false; - return ret; - } - - /** - * Retrive the next segment to be reassembled. - */ - TRANSPORT_ALWAYS_INLINE virtual uint32_t getNextReassemblySegment() override { - return IndexManager::invalid_index; - } - - TRANSPORT_ALWAYS_INLINE virtual bool isFinalSuffixDiscovered() override { - return false; - } - - TRANSPORT_ALWAYS_INLINE virtual uint32_t getFinalSuffix() override { - return IndexManager::invalid_index; - } - - TRANSPORT_ALWAYS_INLINE bool onManifest( - core::ContentObject::Ptr &&content_object) override { - throw errors::UnexpectedManifestException(); - } - - TRANSPORT_ALWAYS_INLINE bool onContentObject( - const core::ContentObject &content_object) override { - throw errors::RuntimeException( - "Called onContentObject on a ZeroIndexManager, which is not able to " - "process packets."); - } - - private: - bool reset_; -}; - class IncrementalIndexManager : public IndexVerificationManager { public: IncrementalIndexManager(interface::ConsumerSocket *icn_socket) @@ -135,7 +91,7 @@ class IncrementalIndexManager : public IndexVerificationManager { TRANSPORT_ALWAYS_INLINE virtual void reset() override { final_suffix_ = std::numeric_limits::max(); - next_download_suffix_ = 0; + next_download_suffix_ = first_suffix_; next_reassembly_suffix_ = 0; } @@ -147,6 +103,11 @@ class IncrementalIndexManager : public IndexVerificationManager { : IndexManager::invalid_index; } + TRANSPORT_ALWAYS_INLINE virtual void setFirstSuffix( + uint32_t suffix) override { + first_suffix_ = suffix; + } + /** * Retrive the next segment to be reassembled. */ @@ -183,6 +144,7 @@ class IncrementalIndexManager : public IndexVerificationManager { protected: interface::ConsumerSocket *socket_; uint32_t final_suffix_; + uint32_t first_suffix_; uint32_t next_download_suffix_; uint32_t next_reassembly_suffix_; std::unique_ptr verification_manager_; diff --git a/libtransport/src/hicn/transport/protocols/raaqm.cc b/libtransport/src/hicn/transport/protocols/raaqm.cc index cd3a6cb85..be76f7c23 100644 --- a/libtransport/src/hicn/transport/protocols/raaqm.cc +++ b/libtransport/src/hicn/transport/protocols/raaqm.cc @@ -98,6 +98,11 @@ int RaaqmTransportProtocol::start() { void RaaqmTransportProtocol::resume() { return TransportProtocol::resume(); } void RaaqmTransportProtocol::reset() { + // Set first segment to retrieve + core::Name *name; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name); + index_manager_->setFirstSuffix(name->getSuffix()); + // Reset reassembly component BaseReassembly::reset(); @@ -333,17 +338,9 @@ void RaaqmTransportProtocol::onContentObject( index_manager_->onManifest(std::move(content_object)); } else if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) { - if (TRANSPORT_EXPECT_FALSE(incremental_suffix == 0)) { - index_manager_ = incremental_index_manager_.get(); - } - onContentSegment(std::move(interest), std::move(content_object)); } - if (TRANSPORT_EXPECT_FALSE(incremental_suffix == 0)) { - BaseReassembly::index_ = index_manager_->getNextReassemblySegment(); - } - scheduleNextInterests(); } diff --git a/libtransport/src/hicn/transport/protocols/reassembly.cc b/libtransport/src/hicn/transport/protocols/reassembly.cc index 53180935c..899f701c7 100644 --- a/libtransport/src/hicn/transport/protocols/reassembly.cc +++ b/libtransport/src/hicn/transport/protocols/reassembly.cc @@ -25,12 +25,11 @@ namespace protocol { BaseReassembly::BaseReassembly(interface::ConsumerSocket *icn_socket, ContentReassembledCallback *content_callback) : reassembly_consumer_socket_(icn_socket), - zero_index_manager_(std::make_unique()), incremental_index_manager_( std::make_unique(icn_socket)), manifest_index_manager_( std::make_unique(icn_socket)), - index_manager_(zero_index_manager_.get()), + index_manager_(incremental_index_manager_.get()), index_(0) { setContentCallback(content_callback); } @@ -54,14 +53,14 @@ void BaseReassembly::reassemble(ContentObject::Ptr &&content_object) { } void BaseReassembly::copyContent(const ContentObject &content_object) { - utils::Array<> a = content_object.getPayload(); + auto a = content_object.getPayload(); std::shared_ptr> content_buffer; reassembly_consumer_socket_->getSocketOption( interface::GeneralTransportOptions::APPLICATION_BUFFER, content_buffer); - content_buffer->insert(content_buffer->end(), (uint8_t *)a.data(), - (uint8_t *)a.data() + a.length()); + content_buffer->insert(content_buffer->end(), (uint8_t *)a->data(), + (uint8_t *)a->data() + a->length()); bool download_completed = index_manager_->getFinalSuffix() == content_object.getName().getSuffix(); @@ -74,6 +73,7 @@ void BaseReassembly::copyContent(const ContentObject &content_object) { void BaseReassembly::reset() { manifest_index_manager_->reset(); incremental_index_manager_->reset(); + index_ = index_manager_->getNextReassemblySegment(); received_packets_.clear(); } diff --git a/libtransport/src/hicn/transport/protocols/reassembly.h b/libtransport/src/hicn/transport/protocols/reassembly.h index ef3e99fc5..9efddb773 100644 --- a/libtransport/src/hicn/transport/protocols/reassembly.h +++ b/libtransport/src/hicn/transport/protocols/reassembly.h @@ -57,7 +57,6 @@ class BaseReassembly : public Reassembly { protected: // The consumer socket interface::ConsumerSocket *reassembly_consumer_socket_; - std::unique_ptr zero_index_manager_; std::unique_ptr incremental_index_manager_; std::unique_ptr manifest_index_manager_; IndexVerificationManager *index_manager_; diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc index c2323345f..48ba90f9e 100644 --- a/libtransport/src/hicn/transport/protocols/rtc.cc +++ b/libtransport/src/hicn/transport/protocols/rtc.cc @@ -186,8 +186,8 @@ void RTCTransportProtocol::updateDelayStats( pathTable_[pathLabel]->insertRttSample(RTT); // we collect OWD only for datapackets - if (content_object.getPayload().length() != HICN_NACK_HEADER_SIZE) { - uint64_t *senderTimeStamp = (uint64_t *)content_object.getPayload().data(); + if (content_object.getPayload()->length() != HICN_NACK_HEADER_SIZE) { + uint64_t *senderTimeStamp = (uint64_t *)content_object.getPayload()->data(); int64_t OWD = std::chrono::duration_cast( std::chrono::system_clock::now().time_since_epoch()) @@ -474,7 +474,7 @@ void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) { } void RTCTransportProtocol::onNack(const ContentObject &content_object) { - uint32_t *payload = (uint32_t *)content_object.getPayload().data(); + uint32_t *payload = (uint32_t *)content_object.getPayload()->data(); uint32_t productionSeg = *payload; uint32_t productionRate = *(++payload); uint32_t nackSegment = content_object.getName().getSuffix(); @@ -531,7 +531,7 @@ void RTCTransportProtocol::onNack(const ContentObject &content_object) { void RTCTransportProtocol::onContentObject( Interest::Ptr &&interest, ContentObject::Ptr &&content_object) { - uint32_t payload_size = (uint32_t)content_object->getPayload().length(); + uint32_t payload_size = (uint32_t)content_object->getPayload()->length(); uint32_t segmentNumber = content_object->getName().getSuffix(); uint32_t pkt = segmentNumber & modMask_; @@ -549,7 +549,7 @@ void RTCTransportProtocol::onContentObject( avgPacketSize_ = (HICN_ESTIMATED_PACKET_SIZE * avgPacketSize_) + ((1 - HICN_ESTIMATED_PACKET_SIZE) * - content_object->getPayload().length()); + content_object->getPayload()->length()); if (inflightInterests_[pkt].retransmissions == 0) { inflightInterestsCount_--; @@ -570,10 +570,10 @@ void RTCTransportProtocol::onContentObject( void RTCTransportProtocol::returnContentToApplication( const ContentObject &content_object) { // return content to the user - Array a = content_object.getPayload(); + auto a = content_object.getPayload(); - uint8_t *start = ((uint8_t *)a.data()) + HICN_TIMESTAMP_SIZE; - unsigned size = (unsigned)(a.length() - HICN_TIMESTAMP_SIZE); + uint8_t *start = ((uint8_t *)a->data()) + HICN_TIMESTAMP_SIZE; + unsigned size = (unsigned)(a->length() - HICN_TIMESTAMP_SIZE); // set offset between hICN and RTP packets uint16_t rtp_seq = ntohs(*(((uint16_t *)start) + 1)); -- cgit 1.2.3-korg