aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/hicn/transport/protocols/reassembly.cc
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/hicn/transport/protocols/reassembly.cc')
-rw-r--r--libtransport/src/hicn/transport/protocols/reassembly.cc80
1 files changed, 3 insertions, 77 deletions
diff --git a/libtransport/src/hicn/transport/protocols/reassembly.cc b/libtransport/src/hicn/transport/protocols/reassembly.cc
index c45d876a0..9682d338d 100644
--- a/libtransport/src/hicn/transport/protocols/reassembly.cc
+++ b/libtransport/src/hicn/transport/protocols/reassembly.cc
@@ -14,7 +14,8 @@
*/
#include <hicn/transport/interfaces/socket_consumer.h>
-#include <hicn/transport/protocols/indexing_manager.h>
+#include <hicn/transport/protocols/errors.h>
+#include <hicn/transport/protocols/indexer.h>
#include <hicn/transport/protocols/reassembly.h>
#include <hicn/transport/utils/array.h>
#include <hicn/transport/utils/membuf.h>
@@ -23,66 +24,7 @@ namespace transport {
namespace protocol {
-BaseReassembly::BaseReassembly(interface::ConsumerSocket *icn_socket,
- ContentReassembledCallback *content_callback,
- TransportProtocol *next_interest)
- : reassembly_consumer_socket_(icn_socket),
- incremental_index_manager_(
- std::make_unique<IncrementalIndexManager>(icn_socket)),
- manifest_index_manager_(
- std::make_unique<ManifestIndexManager>(icn_socket, next_interest)),
- index_manager_(incremental_index_manager_.get()),
- index_(0),
- read_buffer_(nullptr) {
- setContentCallback(content_callback);
-}
-
-void BaseReassembly::reassemble(ContentObject::Ptr &&content_object) {
- if (TRANSPORT_EXPECT_TRUE(content_object != nullptr)) {
- received_packets_.emplace(std::make_pair(
- content_object->getName().getSuffix(), std::move(content_object)));
- }
-
- auto it = received_packets_.find((const unsigned int)index_);
- while (it != received_packets_.end()) {
- if (it->second->getPayloadType() == PayloadType::CONTENT_OBJECT) {
- copyContent(*it->second);
- received_packets_.erase(it);
- }
-
- index_ = index_manager_->getNextReassemblySegment();
- it = received_packets_.find((const unsigned int)index_);
- }
-}
-
-void BaseReassembly::copyContent(const ContentObject &content_object) {
- auto a = content_object.getPayload();
- auto payload_length = a->length();
- auto write_size = std::min(payload_length, read_buffer_->tailroom());
- auto additional_bytes = payload_length > read_buffer_->tailroom()
- ? payload_length - read_buffer_->tailroom()
- : 0;
-
- std::memcpy(read_buffer_->writableTail(), a->data(), write_size);
- read_buffer_->append(write_size);
-
- if (!read_buffer_->tailroom()) {
- notifyApplication();
- std::memcpy(read_buffer_->writableTail(), a->data() + write_size,
- additional_bytes);
- read_buffer_->append(additional_bytes);
- }
-
- bool download_completed =
- index_manager_->getFinalSuffix() == content_object.getName().getSuffix();
-
- if (TRANSPORT_EXPECT_FALSE(download_completed)) {
- notifyApplication();
- content_callback_->onContentReassembled(std::make_error_code(std::errc(0)));
- }
-}
-
-void BaseReassembly::notifyApplication() {
+void Reassembly::notifyApplication() {
interface::ConsumerSocket::ReadCallback *read_callback = nullptr;
reassembly_consumer_socket_->getSocketOption(
interface::ConsumerCallbacksOptions::READ_CALLBACK, &read_callback);
@@ -124,21 +66,5 @@ void BaseReassembly::notifyApplication() {
}
}
-void BaseReassembly::reset() {
- manifest_index_manager_->reset();
- incremental_index_manager_->reset();
- index_ = index_manager_->getNextReassemblySegment();
-
- received_packets_.clear();
-
- // reset read buffer
- interface::ConsumerSocket::ReadCallback *read_callback;
- reassembly_consumer_socket_->getSocketOption(
- interface::ConsumerCallbacksOptions::READ_CALLBACK, &read_callback);
-
- read_buffer_ = utils::MemBuf::create(read_callback->maxBufferSize());
-}
-
} // namespace protocol
-
} // namespace transport