aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/protocols/byte_stream_reassembly.cc
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/protocols/byte_stream_reassembly.cc')
-rw-r--r--libtransport/src/protocols/byte_stream_reassembly.cc59
1 files changed, 36 insertions, 23 deletions
diff --git a/libtransport/src/protocols/byte_stream_reassembly.cc b/libtransport/src/protocols/byte_stream_reassembly.cc
index 12631637e..d2bc961c4 100644
--- a/libtransport/src/protocols/byte_stream_reassembly.cc
+++ b/libtransport/src/protocols/byte_stream_reassembly.cc
@@ -16,12 +16,11 @@
#include <hicn/transport/interfaces/socket_consumer.h>
#include <hicn/transport/utils/array.h>
#include <hicn/transport/utils/membuf.h>
-
#include <implementation/socket_consumer.h>
#include <protocols/byte_stream_reassembly.h>
#include <protocols/errors.h>
#include <protocols/indexer.h>
-#include <protocols/protocol.h>
+#include <protocols/transport_protocol.h>
namespace transport {
@@ -46,11 +45,11 @@ void ByteStreamReassembly::reassemble(
}
}
-void ByteStreamReassembly::reassemble(ContentObject::Ptr &&content_object) {
- if (TRANSPORT_EXPECT_TRUE(content_object != nullptr) &&
- read_buffer_->capacity()) {
- received_packets_.emplace(std::make_pair(
- content_object->getName().getSuffix(), std::move(content_object)));
+void ByteStreamReassembly::reassemble(ContentObject &content_object) {
+ if (TRANSPORT_EXPECT_TRUE(read_buffer_->capacity())) {
+ received_packets_.emplace(
+ std::make_pair(content_object.getName().getSuffix(),
+ content_object.shared_from_this()));
assembleContent();
}
}
@@ -67,7 +66,9 @@ void ByteStreamReassembly::assembleContent() {
while (it != received_packets_.end()) {
// Check if valid packet
if (it->second) {
- copyContent(*it->second);
+ if (TRANSPORT_EXPECT_FALSE(copyContent(*it->second))) {
+ return;
+ }
}
received_packets_.erase(it);
@@ -80,32 +81,44 @@ void ByteStreamReassembly::assembleContent() {
}
}
-void ByteStreamReassembly::copyContent(const ContentObject &content_object) {
- auto payload = content_object.getPayloadReference();
- auto payload_length = payload.second;
- auto write_size = std::min(payload_length, read_buffer_->tailroom());
- auto additional_bytes = payload_length > read_buffer_->tailroom()
- ? payload_length - read_buffer_->tailroom()
- : 0;
+bool ByteStreamReassembly::copyContent(ContentObject &content_object) {
+ bool ret = false;
- std::memcpy(read_buffer_->writableTail(), payload.first, write_size);
- read_buffer_->append(write_size);
+ content_object.trimStart(content_object.headerSize());
- if (!read_buffer_->tailroom()) {
- notifyApplication();
- std::memcpy(read_buffer_->writableTail(), payload.first + write_size,
- additional_bytes);
- read_buffer_->append(additional_bytes);
- }
+ utils::MemBuf *current = &content_object;
+
+ do {
+ auto payload_length = current->length();
+ auto write_size = std::min(payload_length, read_buffer_->tailroom());
+ auto additional_bytes = payload_length > read_buffer_->tailroom()
+ ? payload_length - read_buffer_->tailroom()
+ : 0;
+
+ std::memcpy(read_buffer_->writableTail(), current->data(), write_size);
+ read_buffer_->append(write_size);
+
+ if (!read_buffer_->tailroom()) {
+ notifyApplication();
+ std::memcpy(read_buffer_->writableTail(), current->data() + write_size,
+ additional_bytes);
+ read_buffer_->append(additional_bytes);
+ }
+
+ current = current->next();
+ } while (current != &content_object);
download_complete_ =
index_manager_->getFinalSuffix() == content_object.getName().getSuffix();
if (TRANSPORT_EXPECT_FALSE(download_complete_)) {
+ ret = download_complete_;
notifyApplication();
transport_protocol_->onContentReassembled(
make_error_code(protocol_error::success));
}
+
+ return ret;
}
void ByteStreamReassembly::reInitialize() {