diff options
Diffstat (limited to 'libtransport/src/hicn/transport/protocols/reassembly.cc')
-rw-r--r-- | libtransport/src/hicn/transport/protocols/reassembly.cc | 78 |
1 files changed, 69 insertions, 9 deletions
diff --git a/libtransport/src/hicn/transport/protocols/reassembly.cc b/libtransport/src/hicn/transport/protocols/reassembly.cc index 899f701c7..a2062df93 100644 --- a/libtransport/src/hicn/transport/protocols/reassembly.cc +++ b/libtransport/src/hicn/transport/protocols/reassembly.cc @@ -17,6 +17,7 @@ #include <hicn/transport/protocols/indexing_manager.h> #include <hicn/transport/protocols/reassembly.h> #include <hicn/transport/utils/array.h> +#include <hicn/transport/utils/membuf.h> namespace transport { @@ -30,7 +31,8 @@ BaseReassembly::BaseReassembly(interface::ConsumerSocket *icn_socket, manifest_index_manager_( std::make_unique<ManifestIndexManager>(icn_socket)), index_manager_(incremental_index_manager_.get()), - index_(0) { + index_(0), + read_buffer_(nullptr) { setContentCallback(content_callback); } @@ -54,30 +56,88 @@ void BaseReassembly::reassemble(ContentObject::Ptr &&content_object) { void BaseReassembly::copyContent(const ContentObject &content_object) { auto a = content_object.getPayload(); - - std::shared_ptr<std::vector<uint8_t>> content_buffer; - reassembly_consumer_socket_->getSocketOption( - interface::GeneralTransportOptions::APPLICATION_BUFFER, content_buffer); - - content_buffer->insert(content_buffer->end(), (uint8_t *)a->data(), - (uint8_t *)a->data() + a->length()); + auto payload_length = a->length(); + auto write_size = std::min(payload_length, read_buffer_->tailroom()); + auto additional_bytes = payload_length > read_buffer_->tailroom() + ? payload_length - read_buffer_->tailroom() + : 0; + + std::memcpy(read_buffer_->writableTail(), a->data(), write_size); + read_buffer_->append(write_size); + + if (!read_buffer_->tailroom()) { + notifyApplication(); + std::memcpy(read_buffer_->writableTail(), a->data() + write_size, + additional_bytes); + read_buffer_->append(additional_bytes); + } bool download_completed = index_manager_->getFinalSuffix() == content_object.getName().getSuffix(); if (TRANSPORT_EXPECT_FALSE(download_completed)) { + notifyApplication(); content_callback_->onContentReassembled(std::make_error_code(std::errc(0))); } } +void BaseReassembly::notifyApplication() { + interface::ConsumerSocket::ReadCallback *read_callback = nullptr; + reassembly_consumer_socket_->getSocketOption( + interface::ConsumerCallbacksOptions::READ_CALLBACK, &read_callback); + + if (TRANSPORT_EXPECT_FALSE(!read_callback)) { + TRANSPORT_LOGE("Read callback not installed!"); + return; + } + + if (read_callback->isBufferMovable()) { + // No need to perform an additional copy. The whole buffer will be + // tranferred to the application. + + read_callback->readBufferAvailable(std::move(read_buffer_)); + read_buffer_ = utils::MemBuf::create(read_callback->maxBufferSize()); + } else { + // The buffer will be copied into the application-provided buffer + uint8_t *buffer; + std::size_t length; + std::size_t total_length = read_buffer_->length(); + + while (read_buffer_->length()) { + buffer = nullptr; + length = 0; + read_callback->getReadBuffer(&buffer, &length); + + if (!buffer || !length) { + throw errors::RuntimeException( + "Invalid buffer provided by the application."); + } + + auto to_copy = std::min(read_buffer_->length(), length); + std::memcpy(buffer, read_buffer_->data(), to_copy); + read_buffer_->trimStart(to_copy); + } + + read_callback->readDataAvailable(total_length); + read_buffer_->clear(); + } +} + void BaseReassembly::reset() { manifest_index_manager_->reset(); incremental_index_manager_->reset(); index_ = index_manager_->getNextReassemblySegment(); received_packets_.clear(); + + // reset read buffer + interface::ConsumerSocket::ReadCallback *read_callback; + reassembly_consumer_socket_->getSocketOption( + interface::ConsumerCallbacksOptions::READ_CALLBACK, &read_callback); + + read_buffer_ = utils::MemBuf::create(read_callback->maxBufferSize()); } } // namespace protocol -} // end namespace transport +} // namespace transport |