diff options
Diffstat (limited to 'libtransport/src/protocols/byte_stream_reassembly.cc')
-rw-r--r-- | libtransport/src/protocols/byte_stream_reassembly.cc | 59 |
1 files changed, 36 insertions, 23 deletions
diff --git a/libtransport/src/protocols/byte_stream_reassembly.cc b/libtransport/src/protocols/byte_stream_reassembly.cc index 12631637e..d2bc961c4 100644 --- a/libtransport/src/protocols/byte_stream_reassembly.cc +++ b/libtransport/src/protocols/byte_stream_reassembly.cc @@ -16,12 +16,11 @@ #include <hicn/transport/interfaces/socket_consumer.h> #include <hicn/transport/utils/array.h> #include <hicn/transport/utils/membuf.h> - #include <implementation/socket_consumer.h> #include <protocols/byte_stream_reassembly.h> #include <protocols/errors.h> #include <protocols/indexer.h> -#include <protocols/protocol.h> +#include <protocols/transport_protocol.h> namespace transport { @@ -46,11 +45,11 @@ void ByteStreamReassembly::reassemble( } } -void ByteStreamReassembly::reassemble(ContentObject::Ptr &&content_object) { - if (TRANSPORT_EXPECT_TRUE(content_object != nullptr) && - read_buffer_->capacity()) { - received_packets_.emplace(std::make_pair( - content_object->getName().getSuffix(), std::move(content_object))); +void ByteStreamReassembly::reassemble(ContentObject &content_object) { + if (TRANSPORT_EXPECT_TRUE(read_buffer_->capacity())) { + received_packets_.emplace( + std::make_pair(content_object.getName().getSuffix(), + content_object.shared_from_this())); assembleContent(); } } @@ -67,7 +66,9 @@ void ByteStreamReassembly::assembleContent() { while (it != received_packets_.end()) { // Check if valid packet if (it->second) { - copyContent(*it->second); + if (TRANSPORT_EXPECT_FALSE(copyContent(*it->second))) { + return; + } } received_packets_.erase(it); @@ -80,32 +81,44 @@ void ByteStreamReassembly::assembleContent() { } } -void ByteStreamReassembly::copyContent(const ContentObject &content_object) { - auto payload = content_object.getPayloadReference(); - auto payload_length = payload.second; - auto write_size = std::min(payload_length, read_buffer_->tailroom()); - auto additional_bytes = payload_length > read_buffer_->tailroom() - ? payload_length - read_buffer_->tailroom() - : 0; +bool ByteStreamReassembly::copyContent(ContentObject &content_object) { + bool ret = false; - std::memcpy(read_buffer_->writableTail(), payload.first, write_size); - read_buffer_->append(write_size); + content_object.trimStart(content_object.headerSize()); - if (!read_buffer_->tailroom()) { - notifyApplication(); - std::memcpy(read_buffer_->writableTail(), payload.first + write_size, - additional_bytes); - read_buffer_->append(additional_bytes); - } + utils::MemBuf *current = &content_object; + + do { + auto payload_length = current->length(); + auto write_size = std::min(payload_length, read_buffer_->tailroom()); + auto additional_bytes = payload_length > read_buffer_->tailroom() + ? payload_length - read_buffer_->tailroom() + : 0; + + std::memcpy(read_buffer_->writableTail(), current->data(), write_size); + read_buffer_->append(write_size); + + if (!read_buffer_->tailroom()) { + notifyApplication(); + std::memcpy(read_buffer_->writableTail(), current->data() + write_size, + additional_bytes); + read_buffer_->append(additional_bytes); + } + + current = current->next(); + } while (current != &content_object); download_complete_ = index_manager_->getFinalSuffix() == content_object.getName().getSuffix(); if (TRANSPORT_EXPECT_FALSE(download_complete_)) { + ret = download_complete_; notifyApplication(); transport_protocol_->onContentReassembled( make_error_code(protocol_error::success)); } + + return ret; } void ByteStreamReassembly::reInitialize() { |