aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/hicn/transport/protocols
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/hicn/transport/protocols')
-rw-r--r--libtransport/src/hicn/transport/protocols/CMakeLists.txt18
-rw-r--r--libtransport/src/hicn/transport/protocols/byte_stream_reassembly.cc121
-rw-r--r--libtransport/src/hicn/transport/protocols/byte_stream_reassembly.h54
-rw-r--r--libtransport/src/hicn/transport/protocols/data_processing_events.h33
-rw-r--r--libtransport/src/hicn/transport/protocols/datagram_reassembly.cc35
-rw-r--r--libtransport/src/hicn/transport/protocols/datagram_reassembly.h39
-rw-r--r--libtransport/src/hicn/transport/protocols/errors.cc60
-rw-r--r--libtransport/src/hicn/transport/protocols/errors.h91
-rw-r--r--libtransport/src/hicn/transport/protocols/incremental_indexer.cc52
-rw-r--r--libtransport/src/hicn/transport/protocols/incremental_indexer.h (renamed from libtransport/src/hicn/transport/protocols/indexing_manager.h)114
-rw-r--r--libtransport/src/hicn/transport/protocols/indexer.cc74
-rw-r--r--libtransport/src/hicn/transport/protocols/indexer.h102
-rw-r--r--libtransport/src/hicn/transport/protocols/manifest_incremental_indexer.cc232
-rw-r--r--libtransport/src/hicn/transport/protocols/manifest_incremental_indexer.h91
-rw-r--r--libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc293
-rw-r--r--libtransport/src/hicn/transport/protocols/manifest_indexing_manager.h82
-rw-r--r--libtransport/src/hicn/transport/protocols/protocol.cc31
-rw-r--r--libtransport/src/hicn/transport/protocols/protocol.h24
-rw-r--r--libtransport/src/hicn/transport/protocols/raaqm.cc118
-rw-r--r--libtransport/src/hicn/transport/protocols/raaqm.h14
-rw-r--r--libtransport/src/hicn/transport/protocols/reassembly.cc80
-rw-r--r--libtransport/src/hicn/transport/protocols/reassembly.h50
-rw-r--r--libtransport/src/hicn/transport/protocols/rtc.cc210
-rw-r--r--libtransport/src/hicn/transport/protocols/rtc.h106
-rw-r--r--libtransport/src/hicn/transport/protocols/verification_manager.cc71
-rw-r--r--libtransport/src/hicn/transport/protocols/verification_manager.h49
26 files changed, 1404 insertions, 840 deletions
diff --git a/libtransport/src/hicn/transport/protocols/CMakeLists.txt b/libtransport/src/hicn/transport/protocols/CMakeLists.txt
index 23aeca9bf..06515e0e2 100644
--- a/libtransport/src/hicn/transport/protocols/CMakeLists.txt
+++ b/libtransport/src/hicn/transport/protocols/CMakeLists.txt
@@ -14,8 +14,12 @@
cmake_minimum_required(VERSION 3.5 FATAL_ERROR)
list(APPEND HEADER_FILES
- ${CMAKE_CURRENT_SOURCE_DIR}/indexing_manager.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/indexer.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/incremental_indexer.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/manifest_incremental_indexer.h
${CMAKE_CURRENT_SOURCE_DIR}/reassembly.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/datagram_reassembly.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/byte_stream_reassembly.h
${CMAKE_CURRENT_SOURCE_DIR}/congestion_window_protocol.h
${CMAKE_CURRENT_SOURCE_DIR}/packet_manager.h
${CMAKE_CURRENT_SOURCE_DIR}/statistics.h
@@ -27,11 +31,18 @@ list(APPEND HEADER_FILES
${CMAKE_CURRENT_SOURCE_DIR}/cbr.h
${CMAKE_CURRENT_SOURCE_DIR}/rtc.h
${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.h
- ${CMAKE_CURRENT_SOURCE_DIR}/manifest_indexing_manager.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/errors.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/verification_manager.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/data_processing_events.h
)
list(APPEND SOURCE_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/indexer.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/incremental_indexer.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/manifest_incremental_indexer.cc
${CMAKE_CURRENT_SOURCE_DIR}/reassembly.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/datagram_reassembly.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/byte_stream_reassembly.cc
${CMAKE_CURRENT_SOURCE_DIR}/protocol.cc
${CMAKE_CURRENT_SOURCE_DIR}/raaqm.cc
${CMAKE_CURRENT_SOURCE_DIR}/rate_estimation.cc
@@ -39,7 +50,8 @@ list(APPEND SOURCE_FILES
${CMAKE_CURRENT_SOURCE_DIR}/cbr.cc
${CMAKE_CURRENT_SOURCE_DIR}/rtc.cc
${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.cc
- ${CMAKE_CURRENT_SOURCE_DIR}/manifest_indexing_manager.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/errors.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/verification_manager.cc
)
set(RAAQM_CONFIG_INSTALL_PREFIX
diff --git a/libtransport/src/hicn/transport/protocols/byte_stream_reassembly.cc b/libtransport/src/hicn/transport/protocols/byte_stream_reassembly.cc
new file mode 100644
index 000000000..2f1e5d8fd
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/byte_stream_reassembly.cc
@@ -0,0 +1,121 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/protocols/byte_stream_reassembly.h>
+
+#include <hicn/transport/interfaces/socket_consumer.h>
+#include <hicn/transport/protocols/errors.h>
+#include <hicn/transport/protocols/indexer.h>
+#include <hicn/transport/utils/array.h>
+#include <hicn/transport/utils/membuf.h>
+
+namespace transport {
+
+namespace protocol {
+
+ByteStreamReassembly::ByteStreamReassembly(
+ interface::ConsumerSocket *icn_socket,
+ TransportProtocol *transport_protocol)
+ : Reassembly(icn_socket, transport_protocol),
+ index_(IndexManager::invalid_index),
+ download_complete_(false) {}
+
+void ByteStreamReassembly::reassemble(
+ std::unique_ptr<ContentObjectManifest> &&manifest) {
+ if (TRANSPORT_EXPECT_TRUE(manifest != nullptr)) {
+ received_packets_.emplace(
+ std::make_pair(manifest->getName().getSuffix(), nullptr));
+ assembleContent();
+ }
+}
+
+void ByteStreamReassembly::reassemble(ContentObject::Ptr &&content_object) {
+ if (TRANSPORT_EXPECT_TRUE(content_object != nullptr)) {
+ received_packets_.emplace(std::make_pair(
+ content_object->getName().getSuffix(), std::move(content_object)));
+ assembleContent();
+ }
+}
+
+void ByteStreamReassembly::assembleContent() {
+ if (TRANSPORT_EXPECT_FALSE(index_ == IndexManager::invalid_index)) {
+ index_ = index_manager_->getNextReassemblySegment();
+ if (index_ == IndexManager::invalid_index) {
+ return;
+ }
+ }
+
+ auto it = received_packets_.find((const unsigned int)index_);
+ while (it != received_packets_.end()) {
+ // Check if valid packet
+ if (it->second) {
+ copyContent(*it->second);
+ }
+
+ received_packets_.erase(it);
+ index_ = index_manager_->getNextReassemblySegment();
+ it = received_packets_.find((const unsigned int)index_);
+ }
+
+ if (!download_complete_ && index_ != IndexManager::invalid_index) {
+ transport_protocol_->onReassemblyFailed(index_);
+ }
+}
+
+void ByteStreamReassembly::copyContent(const ContentObject &content_object) {
+ auto a = content_object.getPayload();
+ auto payload_length = a->length();
+ auto write_size = std::min(payload_length, read_buffer_->tailroom());
+ auto additional_bytes = payload_length > read_buffer_->tailroom()
+ ? payload_length - read_buffer_->tailroom()
+ : 0;
+
+ std::memcpy(read_buffer_->writableTail(), a->data(), write_size);
+ read_buffer_->append(write_size);
+
+ if (!read_buffer_->tailroom()) {
+ notifyApplication();
+ std::memcpy(read_buffer_->writableTail(), a->data() + write_size,
+ additional_bytes);
+ read_buffer_->append(additional_bytes);
+ }
+
+ download_complete_ =
+ index_manager_->getFinalSuffix() == content_object.getName().getSuffix();
+
+ if (TRANSPORT_EXPECT_FALSE(download_complete_)) {
+ notifyApplication();
+ transport_protocol_->onContentReassembled(
+ make_error_code(protocol_error::success));
+ }
+}
+
+void ByteStreamReassembly::reInitialize() {
+ index_ = IndexManager::invalid_index;
+ download_complete_ = false;
+
+ received_packets_.clear();
+
+ // reset read buffer
+ interface::ConsumerSocket::ReadCallback *read_callback;
+ reassembly_consumer_socket_->getSocketOption(
+ interface::ConsumerCallbacksOptions::READ_CALLBACK, &read_callback);
+
+ read_buffer_ = utils::MemBuf::create(read_callback->maxBufferSize());
+}
+
+} // namespace protocol
+
+} // namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/byte_stream_reassembly.h b/libtransport/src/hicn/transport/protocols/byte_stream_reassembly.h
new file mode 100644
index 000000000..7c77d486f
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/byte_stream_reassembly.h
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/protocols/reassembly.h>
+
+namespace transport {
+
+namespace protocol {
+
+class ByteStreamReassembly : public Reassembly {
+ public:
+ ByteStreamReassembly(interface::ConsumerSocket *icn_socket,
+ TransportProtocol *transport_protocol);
+
+ protected:
+ virtual void reassemble(core::ContentObject::Ptr &&content_object) override;
+
+ virtual void reassemble(
+ std::unique_ptr<core::ContentObjectManifest> &&manifest) override;
+
+ virtual void copyContent(const core::ContentObject &content_object);
+
+ virtual void reInitialize() override;
+
+ private:
+ void assembleContent();
+
+ protected:
+ // The consumer socket
+ // std::unique_ptr<IncrementalIndexManager> incremental_index_manager_;
+ // std::unique_ptr<ManifestIndexManager> manifest_index_manager_;
+ // IndexVerificationManager *index_manager_;
+ std::unordered_map<std::uint32_t, core::ContentObject::Ptr> received_packets_;
+ uint32_t index_;
+ bool download_complete_;
+};
+
+} // namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/data_processing_events.h b/libtransport/src/hicn/transport/protocols/data_processing_events.h
new file mode 100644
index 000000000..8975c2b4a
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/data_processing_events.h
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/core/content_object.h>
+#include <hicn/transport/core/interest.h>
+
+namespace transport {
+namespace protocol {
+
+class ContentObjectProcessingEventCallback {
+ public:
+ virtual ~ContentObjectProcessingEventCallback() = default;
+ virtual void onPacketDropped(core::Interest::Ptr &&i,
+ core::ContentObject::Ptr &&c) = 0;
+ virtual void onReassemblyFailed(std::uint32_t missing_segment) = 0;
+};
+
+} // namespace protocol
+} // namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/datagram_reassembly.cc b/libtransport/src/hicn/transport/protocols/datagram_reassembly.cc
new file mode 100644
index 000000000..7b01ad4bc
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/datagram_reassembly.cc
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/protocols/datagram_reassembly.h>
+
+namespace transport {
+
+namespace protocol {
+
+DatagramReassembly::DatagramReassembly(interface::ConsumerSocket* icn_socket,
+ TransportProtocol* transport_protocol)
+ : Reassembly(icn_socket, transport_protocol) {}
+
+void DatagramReassembly::reassemble(core::ContentObject::Ptr&& content_object) {
+ read_buffer_ = content_object->getPayload();
+ Reassembly::notifyApplication();
+}
+
+void DatagramReassembly::reInitialize() {}
+
+} // namespace protocol
+
+} // namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/datagram_reassembly.h b/libtransport/src/hicn/transport/protocols/datagram_reassembly.h
new file mode 100644
index 000000000..923b6f2c1
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/datagram_reassembly.h
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/protocols/reassembly.h>
+
+namespace transport {
+
+namespace protocol {
+
+class DatagramReassembly : public Reassembly {
+ public:
+ DatagramReassembly(interface::ConsumerSocket *icn_socket,
+ TransportProtocol *transport_protocol);
+
+ virtual void reassemble(core::ContentObject::Ptr &&content_object) override;
+ virtual void reInitialize() override;
+ virtual void reassemble(
+ std::unique_ptr<core::ContentObjectManifest> &&manifest) override {
+ return;
+ }
+};
+
+} // namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/errors.cc b/libtransport/src/hicn/transport/protocols/errors.cc
new file mode 100644
index 000000000..c2249ed4a
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/errors.cc
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/protocols/errors.h>
+
+namespace transport {
+namespace protocol {
+
+const std::error_category& protocol_category() {
+ static protocol_category_impl instance;
+
+ return instance;
+}
+
+const char* protocol_category_impl::name() const throw() {
+ return "transport::protocol::error";
+}
+
+std::string protocol_category_impl::message(int ev) const {
+ switch (static_cast<protocol_error>(ev)) {
+ case protocol_error::success: {
+ return "Success";
+ }
+ case protocol_error::signature_verification_failed: {
+ return "Signature verification failed.";
+ }
+ case protocol_error::integrity_verification_failed: {
+ return "Integrity verification failed";
+ }
+ case protocol_error::no_verifier_provided: {
+ return "Transport cannot get any verifier for the given data.";
+ }
+ case protocol_error::io_error: {
+ return "Conectivity error between transport and local forwarder";
+ }
+ case protocol_error::max_retransmissions_error: {
+ return "Transport protocol reached max number of retransmissions allowed "
+ "for the same interest.";
+ }
+ case protocol_error::session_aborted: {
+ return "The session has been aborted by the application.";
+ }
+ default: { return "Unknown protocol error"; }
+ }
+}
+
+} // namespace protocol
+} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/hicn/transport/protocols/errors.h b/libtransport/src/hicn/transport/protocols/errors.h
new file mode 100644
index 000000000..cb3d3474e
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/errors.h
@@ -0,0 +1,91 @@
+/*
+ * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <system_error>
+
+namespace transport {
+namespace protocol {
+
+/**
+ * @brief Get the default server error category.
+ * @return The default server error category instance.
+ *
+ * @warning The first call to this function is thread-safe only starting with
+ * C++11.
+ */
+const std::error_category& protocol_category();
+
+/**
+ * The list of errors.
+ */
+enum class protocol_error {
+ success = 0,
+ signature_verification_failed,
+ integrity_verification_failed,
+ no_verifier_provided,
+ io_error,
+ max_retransmissions_error,
+ session_aborted,
+};
+
+/**
+ * @brief Create an error_code instance for the given error.
+ * @param error The error.
+ * @return The error_code instance.
+ */
+inline std::error_code make_error_code(protocol_error error) {
+ return std::error_code(static_cast<int>(error), protocol_category());
+}
+
+/**
+ * @brief Create an error_condition instance for the given error.
+ * @param error The error.
+ * @return The error_condition instance.
+ */
+inline std::error_condition make_error_condition(protocol_error error) {
+ return std::error_condition(static_cast<int>(error), protocol_category());
+}
+
+/**
+ * @brief A server error category.
+ */
+class protocol_category_impl : public std::error_category {
+ public:
+ /**
+ * @brief Get the name of the category.
+ * @return The name of the category.
+ */
+ virtual const char* name() const throw();
+
+ /**
+ * @brief Get the error message for a given error.
+ * @param ev The error numeric value.
+ * @return The message associated to the error.
+ */
+ virtual std::string message(int ev) const;
+};
+} // namespace protocol
+} // namespace transport
+
+namespace std {
+// namespace system {
+template <>
+struct is_error_code_enum<::transport::protocol::protocol_error>
+ : public std::true_type {};
+// } // namespace system
+} // namespace std \ No newline at end of file
diff --git a/libtransport/src/hicn/transport/protocols/incremental_indexer.cc b/libtransport/src/hicn/transport/protocols/incremental_indexer.cc
new file mode 100644
index 000000000..5a8046daa
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/incremental_indexer.cc
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/protocols/incremental_indexer.h>
+
+#include <hicn/transport/interfaces/socket_consumer.h>
+
+namespace transport {
+namespace protocol {
+
+void IncrementalIndexer::onContentObject(
+ core::Interest::Ptr &&interest, core::ContentObject::Ptr &&content_object) {
+ using namespace interface;
+
+ if (TRANSPORT_EXPECT_FALSE(content_object->testRst())) {
+ final_suffix_ = content_object->getName().getSuffix();
+ }
+
+ auto ret = verification_manager_->onPacketToVerify(*content_object);
+
+ switch (ret) {
+ case VerificationPolicy::ACCEPT_PACKET: {
+ reassembly_->reassemble(std::move(content_object));
+ break;
+ }
+ case VerificationPolicy::DROP_PACKET: {
+ transport_protocol_->onPacketDropped(std::move(interest),
+ std::move(content_object));
+ break;
+ }
+ case VerificationPolicy::ABORT_SESSION: {
+ transport_protocol_->onContentReassembled(
+ make_error_code(protocol_error::session_aborted));
+ break;
+ }
+ }
+}
+
+} // namespace protocol
+} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/hicn/transport/protocols/indexing_manager.h b/libtransport/src/hicn/transport/protocols/incremental_indexer.h
index b6b8bb4a6..ea84d645a 100644
--- a/libtransport/src/hicn/transport/protocols/indexing_manager.h
+++ b/libtransport/src/hicn/transport/protocols/incremental_indexer.h
@@ -15,9 +15,11 @@
#pragma once
+#include <hicn/transport/protocols/indexer.h>
+
#include <hicn/transport/errors/runtime_exception.h>
#include <hicn/transport/errors/unexpected_manifest_exception.h>
-#include <hicn/transport/interfaces/socket_consumer.h>
+#include <hicn/transport/protocols/reassembly.h>
#include <hicn/transport/protocols/verification_manager.h>
#include <hicn/transport/utils/literals.h>
@@ -25,74 +27,59 @@
namespace transport {
-namespace protocol {
-
-class IndexManager {
- public:
- static constexpr uint32_t invalid_index = ~0;
-
- /**
- *
- */
- virtual ~IndexManager() = default;
- /**
- * Retrieve from the manifest the next suffix to retrieve.
- */
- virtual uint32_t getNextSuffix() = 0;
-
- virtual void setFirstSuffix(uint32_t suffix) = 0;
-
- /**
- * Retrive the next segment to be reassembled.
- */
- virtual uint32_t getNextReassemblySegment() = 0;
-
- virtual bool isFinalSuffixDiscovered() = 0;
-
- virtual uint32_t getFinalSuffix() = 0;
-
- virtual void reset() = 0;
-};
+namespace interface {
+class ConsumerSocket;
+}
-class IndexVerificationManager : public IndexManager {
- public:
- /**
- *
- */
- virtual ~IndexVerificationManager() = default;
+namespace protocol {
- /**
- * The ownership of the ContentObjectManifest is moved
- * from the caller to the VerificationManager
- */
- virtual bool onManifest(core::ContentObject::Ptr &&content_object) = 0;
+class Reassembly;
+class TransportProtocol;
- /**
- * The content object must just be verified; the ownership is still of the
- * caller.
- */
- virtual bool onContentObject(const core::ContentObject &content_object) = 0;
-};
-
-class IncrementalIndexManager : public IndexVerificationManager {
+class IncrementalIndexer : public Indexer {
public:
- IncrementalIndexManager(interface::ConsumerSocket *icn_socket)
+ IncrementalIndexer(interface::ConsumerSocket *icn_socket,
+ TransportProtocol *transport, Reassembly *reassembly)
: socket_(icn_socket),
+ reassembly_(reassembly),
+ transport_protocol_(transport),
final_suffix_(std::numeric_limits<uint32_t>::max()),
+ first_suffix_(0),
next_download_suffix_(0),
next_reassembly_suffix_(0),
verification_manager_(
- std::make_unique<SignatureVerificationManager>(icn_socket)) {}
+ std::make_unique<SignatureVerificationManager>(icn_socket)) {
+ if (reassembly_) {
+ reassembly_->setIndexer(this);
+ }
+ }
+
+ IncrementalIndexer(const IncrementalIndexer &) = delete;
+
+ IncrementalIndexer(IncrementalIndexer &&other)
+ : socket_(other.socket_),
+ reassembly_(other.reassembly_),
+ transport_protocol_(other.transport_protocol_),
+ final_suffix_(other.final_suffix_),
+ first_suffix_(other.first_suffix_),
+ next_download_suffix_(other.next_download_suffix_),
+ next_reassembly_suffix_(other.next_reassembly_suffix_),
+ verification_manager_(std::move(other.verification_manager_)) {
+ if (reassembly_) {
+ reassembly_->setIndexer(this);
+ }
+ }
/**
*
*/
- virtual ~IncrementalIndexManager() {}
+ virtual ~IncrementalIndexer() {}
- TRANSPORT_ALWAYS_INLINE virtual void reset() override {
+ TRANSPORT_ALWAYS_INLINE virtual void reset(
+ std::uint32_t offset = 0) override {
final_suffix_ = std::numeric_limits<uint32_t>::max();
- next_download_suffix_ = first_suffix_;
- next_reassembly_suffix_ = 0;
+ next_download_suffix_ = offset;
+ next_reassembly_suffix_ = offset;
}
/**
@@ -125,24 +112,21 @@ class IncrementalIndexManager : public IndexVerificationManager {
return final_suffix_;
}
- TRANSPORT_ALWAYS_INLINE bool onManifest(
- core::ContentObject::Ptr &&content_object) override {
- throw errors::UnexpectedManifestException();
- }
+ void onContentObject(core::Interest::Ptr &&interest,
+ core::ContentObject::Ptr &&content_object) override;
- TRANSPORT_ALWAYS_INLINE bool onContentObject(
- const core::ContentObject &content_object) override {
- auto ret = verification_manager_->onPacketToVerify(content_object);
+ TRANSPORT_ALWAYS_INLINE void setReassembly(Reassembly *reassembly) {
+ reassembly_ = reassembly;
- if (TRANSPORT_EXPECT_FALSE(content_object.testRst())) {
- final_suffix_ = content_object.getName().getSuffix();
+ if (reassembly_) {
+ reassembly_->setIndexer(this);
}
-
- return ret;
}
protected:
interface::ConsumerSocket *socket_;
+ Reassembly *reassembly_;
+ TransportProtocol *transport_protocol_;
uint32_t final_suffix_;
uint32_t first_suffix_;
uint32_t next_download_suffix_;
diff --git a/libtransport/src/hicn/transport/protocols/indexer.cc b/libtransport/src/hicn/transport/protocols/indexer.cc
new file mode 100644
index 000000000..c50c4236b
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/indexer.cc
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/protocols/indexer.h>
+
+#include <hicn/transport/protocols/incremental_indexer.h>
+#include <hicn/transport/protocols/manifest_incremental_indexer.h>
+#include <hicn/transport/protocols/protocol.h>
+#include <hicn/transport/utils/branch_prediction.h>
+
+namespace transport {
+namespace protocol {
+
+IndexManager::IndexManager(interface::ConsumerSocket *icn_socket,
+ TransportProtocol *transport, Reassembly *reassembly)
+ : indexer_(std::make_unique<IncrementalIndexer>(icn_socket, transport,
+ reassembly)),
+ first_segment_received_(false),
+ icn_socket_(icn_socket),
+ transport_(transport),
+ reassembly_(reassembly) {}
+
+void IndexManager::onContentObject(core::Interest::Ptr &&interest,
+ core::ContentObject::Ptr &&content_object) {
+ if (first_segment_received_) {
+ indexer_->onContentObject(std::move(interest), std::move(content_object));
+ } else {
+ std::uint32_t segment_number = interest->getName().getSuffix();
+
+ if (segment_number == 0) {
+ // Check if manifest
+ if (content_object->getPayloadType() == PayloadType::MANIFEST) {
+ IncrementalIndexer *indexer =
+ static_cast<IncrementalIndexer *>(indexer_.release());
+ indexer_ =
+ std::make_unique<ManifestIncrementalIndexer>(std::move(*indexer));
+ delete indexer;
+ }
+
+ indexer_->onContentObject(std::move(interest), std::move(content_object));
+ auto it = interest_data_set_.begin();
+ while (it != interest_data_set_.end()) {
+ indexer_->onContentObject(std::move(const_cast<core::Interest::Ptr &&>(it->first)), std::move(const_cast<core::ContentObject::Ptr &&>(it->second)));
+ it = interest_data_set_.erase(it);
+ }
+
+ first_segment_received_ = true;
+ } else {
+ interest_data_set_.emplace(std::move(interest), std::move(content_object));
+ }
+ }
+}
+
+void IndexManager::reset(std::uint32_t offset) {
+ indexer_ = std::make_unique<IncrementalIndexer>(icn_socket_, transport_,
+ reassembly_);
+ first_segment_received_ = false;
+ interest_data_set_.clear();
+}
+
+} // namespace protocol
+} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/hicn/transport/protocols/indexer.h b/libtransport/src/hicn/transport/protocols/indexer.h
new file mode 100644
index 000000000..89751095e
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/indexer.h
@@ -0,0 +1,102 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/core/content_object.h>
+#include <hicn/transport/core/interest.h>
+
+#include <set>
+
+namespace transport {
+
+namespace interface {
+class ConsumerSocket;
+}
+
+namespace protocol {
+
+class Reassembly;
+class TransportProtocol;
+
+class Indexer {
+ public:
+ /**
+ *
+ */
+ virtual ~Indexer() = default;
+ /**
+ * Retrieve from the manifest the next suffix to retrieve.
+ */
+ virtual uint32_t getNextSuffix() = 0;
+
+ virtual void setFirstSuffix(uint32_t suffix) = 0;
+
+ /**
+ * Retrive the next segment to be reassembled.
+ */
+ virtual uint32_t getNextReassemblySegment() = 0;
+
+ virtual bool isFinalSuffixDiscovered() = 0;
+
+ virtual uint32_t getFinalSuffix() = 0;
+
+ virtual void reset(std::uint32_t offset = 0) = 0;
+
+ virtual void onContentObject(core::Interest::Ptr &&interest,
+ core::ContentObject::Ptr &&content_object) = 0;
+};
+
+class IndexManager : Indexer {
+ public:
+ static constexpr uint32_t invalid_index = ~0;
+
+ IndexManager(interface::ConsumerSocket *icn_socket,
+ TransportProtocol *transport, Reassembly *reassembly);
+
+ uint32_t getNextSuffix() override { return indexer_->getNextSuffix(); }
+
+ void setFirstSuffix(uint32_t suffix) override {
+ indexer_->setFirstSuffix(suffix);
+ }
+
+ uint32_t getNextReassemblySegment() override {
+ return indexer_->getNextReassemblySegment();
+ }
+
+ bool isFinalSuffixDiscovered() override {
+ return indexer_->isFinalSuffixDiscovered();
+ }
+
+ uint32_t getFinalSuffix() override { return indexer_->getFinalSuffix(); }
+
+ void reset(std::uint32_t offset = 0) override;
+
+ void onContentObject(core::Interest::Ptr &&interest,
+ core::ContentObject::Ptr &&content_object) override;
+
+ private:
+ std::unique_ptr<Indexer> indexer_;
+ bool first_segment_received_;
+ std::set<std::pair<core::Interest::Ptr, core::ContentObject::Ptr>>
+ interest_data_set_;
+ interface::ConsumerSocket *icn_socket_;
+ TransportProtocol *transport_;
+ Reassembly *reassembly_;
+};
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/manifest_incremental_indexer.cc b/libtransport/src/hicn/transport/protocols/manifest_incremental_indexer.cc
new file mode 100644
index 000000000..592daa4d4
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/manifest_incremental_indexer.cc
@@ -0,0 +1,232 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/interfaces/socket_consumer.h>
+#include <hicn/transport/protocols/manifest_incremental_indexer.h>
+
+#include <cmath>
+#include <deque>
+
+namespace transport {
+
+namespace protocol {
+
+using namespace interface;
+
+ManifestIncrementalIndexer::ManifestIncrementalIndexer(
+ interface::ConsumerSocket *icn_socket, TransportProtocol *transport,
+ Reassembly *reassembly)
+ : IncrementalIndexer(icn_socket, transport, reassembly),
+ suffix_strategy_(utils::SuffixStrategyFactory::getSuffixStrategy(
+ NextSegmentCalculationStrategy::INCREMENTAL,
+ next_download_suffix_, 0)) {}
+
+void ManifestIncrementalIndexer::onContentObject(
+ core::Interest::Ptr &&interest, core::ContentObject::Ptr &&content_object) {
+ // Check if mainfiest or not
+ if (content_object->getPayloadType() == PayloadType::MANIFEST) {
+ onUntrustedManifest(std::move(interest), std::move(content_object));
+ } else if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) {
+ onUntrustedContentObject(std::move(interest), std::move(content_object));
+ }
+}
+
+void ManifestIncrementalIndexer::onUntrustedManifest(
+ core::Interest::Ptr &&interest, core::ContentObject::Ptr &&content_object) {
+ auto ret = verification_manager_->onPacketToVerify(*content_object);
+
+ switch (ret) {
+ case VerificationPolicy::ACCEPT_PACKET: {
+ processTrustedManifest(std::move(content_object));
+ break;
+ }
+ case VerificationPolicy::DROP_PACKET:
+ case VerificationPolicy::ABORT_SESSION: {
+ transport_protocol_->onContentReassembled(
+ make_error_code(protocol_error::session_aborted));
+ break;
+ }
+ }
+}
+
+void ManifestIncrementalIndexer::processTrustedManifest(
+ ContentObject::Ptr &&content_object) {
+ auto manifest =
+ std::make_unique<ContentObjectManifest>(std::move(*content_object));
+ manifest->decode();
+
+ if (TRANSPORT_EXPECT_FALSE(manifest->getVersion() !=
+ core::ManifestVersion::VERSION_1)) {
+ throw errors::RuntimeException("Received manifest with unknown version.");
+ }
+
+ switch (manifest->getManifestType()) {
+ case core::ManifestType::INLINE_MANIFEST: {
+ auto _it = manifest->getSuffixList().begin();
+ auto _end = manifest->getSuffixList().end();
+
+ suffix_strategy_->setFinalSuffix(manifest->getFinalBlockNumber());
+
+ for (; _it != _end; _it++) {
+ auto hash =
+ std::make_pair(std::vector<uint8_t>(_it->second, _it->second + 32),
+ manifest->getHashAlgorithm());
+
+ if (!checkUnverifiedSegments(_it->first, hash)) {
+ suffix_hash_map_[_it->first] = std::move(hash);
+ }
+ }
+
+ reassembly_->reassemble(std::move(manifest));
+
+ break;
+ }
+ case core::ManifestType::FLIC_MANIFEST: {
+ throw errors::NotImplementedException();
+ }
+ case core::ManifestType::FINAL_CHUNK_NUMBER: {
+ throw errors::NotImplementedException();
+ }
+ }
+}
+
+bool ManifestIncrementalIndexer::checkUnverifiedSegments(
+ std::uint32_t suffix, const HashEntry &hash) {
+ auto it = unverified_segments_.find(suffix);
+
+ if (it != unverified_segments_.end()) {
+ auto ret = verifyContentObject(hash, *it->second.second);
+
+ switch (ret) {
+ case VerificationPolicy::ACCEPT_PACKET: {
+ reassembly_->reassemble(std::move(it->second.second));
+ break;
+ }
+ case VerificationPolicy::DROP_PACKET: {
+ transport_protocol_->onPacketDropped(std::move(it->second.first),
+ std::move(it->second.second));
+ break;
+ }
+ case VerificationPolicy::ABORT_SESSION: {
+ transport_protocol_->onContentReassembled(
+ make_error_code(protocol_error::session_aborted));
+ break;
+ }
+ }
+
+ unverified_segments_.erase(it);
+ return true;
+ }
+
+ return false;
+}
+
+VerificationPolicy ManifestIncrementalIndexer::verifyContentObject(
+ const HashEntry &manifest_hash, const ContentObject &content_object) {
+ VerificationPolicy ret;
+
+ auto hash_type = static_cast<utils::CryptoHashType>(manifest_hash.second);
+ auto data_packet_digest = content_object.computeDigest(manifest_hash.second);
+ auto data_packet_digest_bytes =
+ data_packet_digest.getDigest<uint8_t>().data();
+ const std::vector<uint8_t> &manifest_digest_bytes = manifest_hash.first;
+
+ if (utils::CryptoHash::compareBinaryDigest(
+ data_packet_digest_bytes, manifest_digest_bytes.data(), hash_type)) {
+ ret = VerificationPolicy::ACCEPT_PACKET;
+ } else {
+ ConsumerContentObjectVerificationFailedCallback
+ *verification_failed_callback = VOID_HANDLER;
+ socket_->getSocketOption(ConsumerCallbacksOptions::VERIFICATION_FAILED,
+ &verification_failed_callback);
+ ret = (*verification_failed_callback)(
+ *socket_, content_object,
+ make_error_code(protocol_error::integrity_verification_failed));
+ }
+
+ return ret;
+}
+
+void ManifestIncrementalIndexer::onUntrustedContentObject(
+ Interest::Ptr &&i, ContentObject::Ptr &&c) {
+ auto suffix = c->getName().getSuffix();
+ auto it = suffix_hash_map_.find(suffix);
+
+ if (it != suffix_hash_map_.end()) {
+ auto ret = verifyContentObject(it->second, *c);
+
+ switch (ret) {
+ case VerificationPolicy::ACCEPT_PACKET: {
+ suffix_hash_map_.erase(it);
+ reassembly_->reassemble(std::move(c));
+ break;
+ }
+ case VerificationPolicy::DROP_PACKET: {
+ transport_protocol_->onPacketDropped(std::move(i), std::move(c));
+ break;
+ }
+ case VerificationPolicy::ABORT_SESSION: {
+ transport_protocol_->onContentReassembled(
+ make_error_code(protocol_error::session_aborted));
+ break;
+ }
+ }
+ } else {
+ unverified_segments_[suffix] = std::make_pair(std::move(i), std::move(c));
+ }
+}
+
+uint32_t ManifestIncrementalIndexer::getNextSuffix() {
+ auto ret = suffix_strategy_->getNextSuffix();
+
+ if (ret <= suffix_strategy_->getFinalSuffix() &&
+ ret != utils::SuffixStrategy::INVALID_SUFFIX) {
+ suffix_queue_.push(ret);
+ return ret;
+ }
+
+ return IndexManager::invalid_index;
+}
+
+uint32_t ManifestIncrementalIndexer::getFinalSuffix() {
+ return suffix_strategy_->getFinalSuffix();
+}
+
+bool ManifestIncrementalIndexer::isFinalSuffixDiscovered() {
+ return IncrementalIndexer::isFinalSuffixDiscovered();
+}
+
+uint32_t ManifestIncrementalIndexer::getNextReassemblySegment() {
+ if (suffix_queue_.empty()) {
+ return IndexManager::invalid_index;
+ }
+
+ auto ret = suffix_queue_.front();
+ suffix_queue_.pop();
+ return ret;
+}
+
+void ManifestIncrementalIndexer::reset(std::uint32_t offset) {
+ IncrementalIndexer::reset(offset);
+ suffix_hash_map_.clear();
+ unverified_segments_.clear();
+ SuffixQueue empty;
+ std::swap(suffix_queue_, empty);
+ suffix_strategy_->reset(offset);
+}
+
+} // namespace protocol
+
+} // namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/manifest_incremental_indexer.h b/libtransport/src/hicn/transport/protocols/manifest_incremental_indexer.h
new file mode 100644
index 000000000..6e991f86f
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/manifest_incremental_indexer.h
@@ -0,0 +1,91 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/interfaces/socket.h>
+#include <hicn/transport/protocols/incremental_indexer.h>
+#include <hicn/transport/utils/suffix_strategy.h>
+
+#include <list>
+
+namespace transport {
+
+namespace protocol {
+
+class ManifestIncrementalIndexer : public IncrementalIndexer {
+ static constexpr double alpha = 0.3;
+
+ public:
+ using SuffixQueue = std::queue<uint32_t>;
+ using HashEntry = std::pair<std::vector<uint8_t>, core::HashAlgorithm>;
+
+ ManifestIncrementalIndexer(interface::ConsumerSocket *icn_socket,
+ TransportProtocol *transport, Reassembly *reassembly);
+
+ ManifestIncrementalIndexer(IncrementalIndexer &&indexer)
+ : IncrementalIndexer(std::move(indexer)),
+ suffix_strategy_(utils::SuffixStrategyFactory::getSuffixStrategy(
+ core::NextSegmentCalculationStrategy::INCREMENTAL,
+ next_download_suffix_, 0)) {
+ for (uint32_t i = first_suffix_; i < next_download_suffix_; i++) {
+ suffix_queue_.push(i);
+ }
+ }
+
+ virtual ~ManifestIncrementalIndexer() = default;
+
+ void reset(std::uint32_t offset = 0) override;
+
+ void onContentObject(core::Interest::Ptr &&interest,
+ core::ContentObject::Ptr &&content_object) override;
+
+ uint32_t getNextSuffix() override;
+
+ uint32_t getNextReassemblySegment() override;
+
+ bool isFinalSuffixDiscovered() override;
+
+ uint32_t getFinalSuffix() override;
+
+ private:
+ void onUntrustedManifest(core::Interest::Ptr &&interest,
+ core::ContentObject::Ptr &&content_object);
+ void onUntrustedContentObject(core::Interest::Ptr &&interest,
+ core::ContentObject::Ptr &&content_object);
+ void processTrustedManifest(core::ContentObject::Ptr &&content_object);
+ void onManifestReceived(core::Interest::Ptr &&i,
+ core::ContentObject::Ptr &&c);
+ void onManifestTimeout(core::Interest::Ptr &&i);
+ VerificationPolicy verifyContentObject(
+ const HashEntry &manifest_hash,
+ const core::ContentObject &content_object);
+ bool checkUnverifiedSegments(std::uint32_t suffix, const HashEntry &hash);
+
+ protected:
+ std::unique_ptr<utils::SuffixStrategy> suffix_strategy_;
+ SuffixQueue suffix_queue_;
+
+ // Hash verification
+ std::unordered_map<uint32_t, HashEntry> suffix_hash_map_;
+
+ std::unordered_map<uint32_t,
+ std::pair<core::Interest::Ptr, core::ContentObject::Ptr>>
+ unverified_segments_;
+};
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc b/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc
deleted file mode 100644
index ea13bf9e6..000000000
--- a/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc
+++ /dev/null
@@ -1,293 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <hicn/transport/interfaces/socket_consumer.h>
-#include <hicn/transport/protocols/manifest_indexing_manager.h>
-
-#include <cmath>
-#include <deque>
-
-namespace transport {
-
-namespace protocol {
-
-using namespace interface;
-
-ManifestIndexManager::ManifestIndexManager(
- interface::ConsumerSocket *icn_socket, TransportProtocol *next_interest)
- : IncrementalIndexManager(icn_socket),
- PacketManager<Interest>(1024),
- next_to_retrieve_segment_(suffix_queue_.end()),
- suffix_manifest_(core::NextSegmentCalculationStrategy::INCREMENTAL, 0),
- next_reassembly_segment_(
- core::NextSegmentCalculationStrategy::INCREMENTAL, 1, true),
- ignored_segments_(),
- next_interest_(next_interest) {}
-
-bool ManifestIndexManager::onManifest(
- core::ContentObject::Ptr &&content_object) {
- auto manifest =
- std::make_unique<ContentObjectManifest>(std::move(*content_object));
- bool manifest_verified = verification_manager_->onPacketToVerify(*manifest);
-
- if (manifest_verified) {
- manifest->decode();
-
- if (TRANSPORT_EXPECT_FALSE(manifest->getVersion() !=
- core::ManifestVersion::VERSION_1)) {
- throw errors::RuntimeException("Received manifest with unknown version.");
- }
-
- switch (manifest->getManifestType()) {
- case core::ManifestType::INLINE_MANIFEST: {
- auto _it = manifest->getSuffixList().begin();
- auto _end = manifest->getSuffixList().end();
- size_t nb_segments = std::distance(_it, _end);
- final_suffix_ = manifest->getFinalBlockNumber(); // final block number
-
- suffix_hash_map_[_it->first] =
- std::make_pair(std::vector<uint8_t>(_it->second, _it->second + 32),
- manifest->getHashAlgorithm());
- suffix_queue_.push_back(_it->first);
-
- // If the transport protocol finished the list of segments to retrieve,
- // reset the next_to_retrieve_segment_ iterator to the next segment
- // provided by this manifest.
- if (TRANSPORT_EXPECT_FALSE(next_to_retrieve_segment_ ==
- suffix_queue_.end())) {
- next_to_retrieve_segment_ = --suffix_queue_.end();
- }
-
- std::advance(_it, 1);
- for (; _it != _end; _it++) {
- suffix_hash_map_[_it->first] = std::make_pair(
- std::vector<uint8_t>(_it->second, _it->second + 32),
- manifest->getHashAlgorithm());
- suffix_queue_.push_back(_it->first);
- }
-
- if (TRANSPORT_EXPECT_FALSE(manifest->getName().getSuffix()) == 0) {
- core::NextSegmentCalculationStrategy strategy =
- manifest->getNextSegmentCalculationStrategy();
-
- suffix_manifest_.reset(0);
- suffix_manifest_.setNbSegments(nb_segments);
- suffix_manifest_.setSuffixStrategy(strategy);
- TRANSPORT_LOGD("Capacity of 1st manifest %zu",
- suffix_manifest_.getNbSegments());
-
- next_reassembly_segment_.reset(*suffix_queue_.begin());
- next_reassembly_segment_.setNbSegments(nb_segments);
- suffix_manifest_.setSuffixStrategy(strategy);
- }
-
- // If the manifest is not full, we add the suffixes of missing segments
- // to the list of segments to ignore when computing the next reassembly
- // index.
- if (TRANSPORT_EXPECT_FALSE(
- suffix_manifest_.getNbSegments() - nb_segments > 0)) {
- auto start = manifest->getSuffixList().begin();
- auto last = --_end;
- for (uint32_t i = last->first + 1;
- i < start->first + suffix_manifest_.getNbSegments(); i++) {
- ignored_segments_.push_back(i);
- }
- }
-
- if (TRANSPORT_EXPECT_FALSE(manifest->isFinalManifest()) == 0) {
- fillWindow(manifest->getWritableName(),
- manifest->getName().getSuffix());
- }
-
- break;
- }
- case core::ManifestType::FLIC_MANIFEST: {
- throw errors::NotImplementedException();
- }
- case core::ManifestType::FINAL_CHUNK_NUMBER: {
- throw errors::NotImplementedException();
- }
- }
- }
-
- return manifest_verified;
-}
-
-void ManifestIndexManager::onManifestReceived(Interest::Ptr &&i,
- ContentObject::Ptr &&c) {
- onManifest(std::move(c));
- if (next_interest_) {
- next_interest_->scheduleNextInterests();
- }
-}
-
-void ManifestIndexManager::onManifestTimeout(Interest::Ptr &&i) {
- const Name &n = i->getName();
- uint32_t segment = n.getSuffix();
-
- if (segment > final_suffix_) {
- return;
- }
-
- // Get portal
- std::shared_ptr<interface::BasePortal> portal;
- socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal);
-
- // Send requests for manifest out of the congestion window (no
- // in_flight_interests++)
- portal->sendInterest(
- std::move(i),
- std::bind(&ManifestIndexManager::onManifestReceived, this,
- std::placeholders::_1, std::placeholders::_2),
- std::bind(&ManifestIndexManager::onManifestTimeout, this,
- std::placeholders::_1));
-}
-
-void ManifestIndexManager::fillWindow(Name &name, uint32_t current_manifest) {
- /* Send as many manifest as required for filling window. */
- uint32_t interest_lifetime;
- double window_size;
- std::shared_ptr<interface::BasePortal> portal;
- Interest::Ptr interest;
- uint32_t current_segment = *next_to_retrieve_segment_;
- // suffix_manifest_ now points to the next manifest to request
- uint32_t last_requested_manifest = (suffix_manifest_++).getSuffix();
-
- socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal);
- socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
- interest_lifetime);
- socket_->getSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE,
- window_size);
-
- if (TRANSPORT_EXPECT_FALSE(suffix_manifest_.getSuffix() >= final_suffix_)) {
- suffix_manifest_.updateSuffix(last_requested_manifest);
- return;
- }
-
- if (current_segment + window_size < suffix_manifest_.getSuffix() &&
- current_manifest != last_requested_manifest) {
- suffix_manifest_.updateSuffix(last_requested_manifest);
- return;
- }
-
- do {
- interest = getPacket();
- name.setSuffix(suffix_manifest_.getSuffix());
- interest->setName(name);
- interest->setLifetime(interest_lifetime);
-
- // Send interests for manifest out of the congestion window (no
- // in_flight_interests++)
- portal->sendInterest(
- std::move(interest),
- std::bind(&ManifestIndexManager::onManifestReceived, this,
- std::placeholders::_1, std::placeholders::_2),
- std::bind(&ManifestIndexManager::onManifestTimeout, this,
- std::placeholders::_1));
-
- last_requested_manifest = (suffix_manifest_++).getSuffix();
- } while (current_segment + window_size >= suffix_manifest_.getSuffix() &&
- suffix_manifest_.getSuffix() < final_suffix_);
-
- // suffix_manifest_ now points to the last requested manifest
- suffix_manifest_.updateSuffix(last_requested_manifest);
-}
-
-bool ManifestIndexManager::onContentObject(
- const core::ContentObject &content_object) {
- bool verify_signature;
- socket_->getSocketOption(GeneralTransportOptions::VERIFY_SIGNATURE,
- verify_signature);
-
- if (!verify_signature) {
- return true;
- }
-
- uint64_t segment = content_object.getName().getSuffix();
-
- bool ret = false;
-
- auto it = suffix_hash_map_.find((const unsigned int)segment);
- if (it != suffix_hash_map_.end()) {
- auto hash_type = static_cast<utils::CryptoHashType>(it->second.second);
- auto data_packet_digest = content_object.computeDigest(it->second.second);
- auto data_packet_digest_bytes =
- data_packet_digest.getDigest<uint8_t>().data();
- std::vector<uint8_t> &manifest_digest_bytes = it->second.first;
-
- if (utils::CryptoHash::compareBinaryDigest(data_packet_digest_bytes,
- manifest_digest_bytes.data(),
- hash_type)) {
- suffix_hash_map_.erase(it);
- ret = true;
- } else {
- throw errors::RuntimeException(
- "Verification failure policy has to be implemented.");
- }
- }
-
- return ret;
-}
-
-uint32_t ManifestIndexManager::getNextSuffix() {
- if (TRANSPORT_EXPECT_FALSE(next_to_retrieve_segment_ ==
- suffix_queue_.end())) {
- return invalid_index;
- }
-
- return *next_to_retrieve_segment_++;
-}
-
-uint32_t ManifestIndexManager::getFinalSuffix() { return final_suffix_; }
-
-bool ManifestIndexManager::isFinalSuffixDiscovered() {
- return IncrementalIndexManager::isFinalSuffixDiscovered();
-}
-
-uint32_t ManifestIndexManager::getNextReassemblySegment() {
- uint32_t current_reassembly_segment;
-
- while (true) {
- current_reassembly_segment = next_reassembly_segment_.getSuffix();
- next_reassembly_segment_++;
-
- if (TRANSPORT_EXPECT_FALSE(current_reassembly_segment > final_suffix_)) {
- return invalid_index;
- }
-
- if (ignored_segments_.empty()) break;
-
- auto is_ignored =
- std::find(ignored_segments_.begin(), ignored_segments_.end(),
- current_reassembly_segment);
-
- if (is_ignored == ignored_segments_.end()) break;
-
- ignored_segments_.erase(is_ignored);
- }
-
- return current_reassembly_segment;
-}
-
-void ManifestIndexManager::reset() {
- IncrementalIndexManager::reset();
- suffix_manifest_.reset(0);
- suffix_queue_.clear();
- suffix_hash_map_.clear();
-}
-
-} // end namespace protocol
-
-} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.h b/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.h
deleted file mode 100644
index 645b20e9a..000000000
--- a/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.h
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <hicn/transport/interfaces/socket.h>
-#include <hicn/transport/protocols/indexing_manager.h>
-#include <hicn/transport/utils/suffix_strategy.h>
-
-#include <list>
-
-namespace transport {
-
-namespace protocol {
-
-class ManifestIndexManager : public IncrementalIndexManager,
- public PacketManager<Interest> {
- static constexpr double alpha = 0.3;
-
- public:
- using SuffixQueue = std::list<uint32_t>;
- using HashEntry = std::pair<std::vector<uint8_t>, core::HashAlgorithm>;
-
- ManifestIndexManager(interface::ConsumerSocket *icn_socket,
- TransportProtocol *next_interest);
-
- virtual ~ManifestIndexManager() = default;
-
- void reset() override;
-
- bool onManifest(core::ContentObject::Ptr &&content_object) override;
-
- bool onContentObject(const core::ContentObject &content_object) override;
-
- uint32_t getNextSuffix() override;
-
- uint32_t getNextReassemblySegment() override;
-
- bool isFinalSuffixDiscovered() override;
-
- uint32_t getFinalSuffix() override;
-
- private:
- void onManifestReceived(Interest::Ptr &&i, ContentObject::Ptr &&c);
- void onManifestTimeout(Interest::Ptr &&i);
- void fillWindow(Name &name, uint32_t current_manifest);
-
- protected:
- SuffixQueue suffix_queue_;
- SuffixQueue::iterator next_to_retrieve_segment_;
- utils::SuffixManifest suffix_manifest_;
- utils::SuffixContent next_reassembly_segment_;
-
- // Holds segments that should not be requested. Useful when
- // computing the next reassembly segment because some manifests
- // may be incomplete.
- std::vector<uint32_t> ignored_segments_;
-
- // Hash verification
- std::unordered_map<uint32_t,
- std::pair<std::vector<uint8_t>, core::HashAlgorithm>>
- suffix_hash_map_;
-
- // (temporary) To call scheduleNextInterests() after receiving a manifest
- TransportProtocol *next_interest_;
-};
-
-} // end namespace protocol
-
-} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/protocol.cc b/libtransport/src/hicn/transport/protocols/protocol.cc
index 8da9529d6..a0f847453 100644
--- a/libtransport/src/hicn/transport/protocols/protocol.cc
+++ b/libtransport/src/hicn/transport/protocols/protocol.cc
@@ -22,9 +22,16 @@ namespace protocol {
using namespace interface;
-TransportProtocol::TransportProtocol(interface::ConsumerSocket *icn_socket)
- : socket_(icn_socket), is_running_(false), is_first_(false) {
+TransportProtocol::TransportProtocol(interface::ConsumerSocket *icn_socket,
+ Reassembly *reassembly_protocol)
+ : socket_(icn_socket),
+ reassembly_protocol_(reassembly_protocol),
+ index_manager_(
+ std::make_unique<IndexManager>(socket_, this, reassembly_protocol)),
+ is_running_(false),
+ is_first_(false) {
socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal_);
+ socket_->getSocketOption(OtherOptions::STATISTICS, &stats_);
}
int TransportProtocol::start() {
@@ -71,6 +78,26 @@ void TransportProtocol::resume() {
is_running_ = false;
}
+void TransportProtocol::onContentReassembled(std::error_code ec) {
+ interface::ConsumerSocket::ReadCallback *on_payload = VOID_HANDLER;
+ socket_->getSocketOption(READ_CALLBACK, &on_payload);
+
+ if (!on_payload) {
+ throw errors::RuntimeException(
+ "The read callback must be installed in the transport before "
+ "starting "
+ "the content retrieval.");
+ }
+
+ if (!ec) {
+ on_payload->readSuccess(stats_->getBytesRecv());
+ } else {
+ on_payload->readError(ec);
+ }
+
+ stop();
+}
+
} // end namespace protocol
} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/protocol.h b/libtransport/src/hicn/transport/protocols/protocol.h
index e4821b6a0..87fab588b 100644
--- a/libtransport/src/hicn/transport/protocols/protocol.h
+++ b/libtransport/src/hicn/transport/protocols/protocol.h
@@ -18,7 +18,10 @@
#include <atomic>
#include <hicn/transport/interfaces/socket.h>
+#include <hicn/transport/protocols/data_processing_events.h>
+#include <hicn/transport/protocols/indexer.h>
#include <hicn/transport/protocols/packet_manager.h>
+#include <hicn/transport/protocols/reassembly.h>
#include <hicn/transport/protocols/statistics.h>
#include <hicn/transport/utils/object_pool.h>
@@ -28,6 +31,8 @@ namespace protocol {
using namespace core;
+class IndexVerificationManager;
+
class TransportProtocolCallback {
virtual void onContentObject(const core::Interest &interest,
const core::ContentObject &content_object) = 0;
@@ -35,11 +40,15 @@ class TransportProtocolCallback {
};
class TransportProtocol : public interface::BasePortal::ConsumerCallback,
- public PacketManager<Interest> {
+ public PacketManager<Interest>,
+ public ContentObjectProcessingEventCallback {
static constexpr std::size_t interest_pool_size = 4096;
+ friend class ManifestIndexManager;
+
public:
- TransportProtocol(interface::ConsumerSocket *icn_socket);
+ TransportProtocol(interface::ConsumerSocket *icn_socket,
+ Reassembly *reassembly_protocol);
virtual ~TransportProtocol() = default;
@@ -53,6 +62,12 @@ class TransportProtocol : public interface::BasePortal::ConsumerCallback,
virtual void scheduleNextInterests() = 0;
+ // Events generated by the indexing
+ virtual void onContentReassembled(std::error_code ec);
+ virtual void onPacketDropped(Interest::Ptr &&interest,
+ ContentObject::Ptr &&content_object) = 0;
+ virtual void onReassemblyFailed(std::uint32_t missing_segment) = 0;
+
protected:
// Consumer Callback
virtual void reset() = 0;
@@ -61,13 +76,14 @@ class TransportProtocol : public interface::BasePortal::ConsumerCallback,
protected:
interface::ConsumerSocket *socket_;
+ std::unique_ptr<Reassembly> reassembly_protocol_;
+ std::unique_ptr<IndexManager> index_manager_;
std::shared_ptr<interface::BasePortal> portal_;
std::atomic<bool> is_running_;
// True if it si the first time we schedule an interest
std::atomic<bool> is_first_;
- TransportStatistics stats_;
+ TransportStatistics *stats_;
};
} // end namespace protocol
-
} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/raaqm.cc b/libtransport/src/hicn/transport/protocols/raaqm.cc
index a57eb7cd9..641ae45c3 100644
--- a/libtransport/src/hicn/transport/protocols/raaqm.cc
+++ b/libtransport/src/hicn/transport/protocols/raaqm.cc
@@ -14,8 +14,9 @@
*/
#include <hicn/transport/interfaces/socket_consumer.h>
-#include <hicn/transport/protocols/manifest_indexing_manager.h>
+#include <hicn/transport/protocols/indexer.h>
#include <hicn/transport/protocols/raaqm.h>
+#include <hicn/transport/protocols/errors.h>
#include <cstdlib>
#include <fstream>
@@ -26,9 +27,8 @@ namespace protocol {
using namespace interface;
-RaaqmTransportProtocol::RaaqmTransportProtocol(ConsumerSocket *icnet_socket)
- : TransportProtocol(icnet_socket),
- BaseReassembly(icnet_socket, this, this),
+RaaqmTransportProtocol::RaaqmTransportProtocol(ConsumerSocket *icn_socket)
+ : TransportProtocol(icn_socket, new ByteStreamReassembly(icn_socket, this)),
current_window_size_(1),
interests_in_flight_(0),
cur_path_(nullptr),
@@ -101,13 +101,14 @@ void RaaqmTransportProtocol::reset() {
// Set first segment to retrieve
core::Name *name;
socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name);
+ index_manager_->reset();
index_manager_->setFirstSuffix(name->getSuffix());
std::queue<Interest::Ptr> empty;
std::swap(interest_to_retransmit_, empty);
- stats_.reset();
+ stats_->reset();
// Reset reassembly component
- BaseReassembly::reset();
+ reassembly_protocol_->reInitialize();
// Reset protocol variables
interests_in_flight_ = 0;
@@ -309,8 +310,6 @@ void RaaqmTransportProtocol::init() {
void RaaqmTransportProtocol::onContentObject(
Interest::Ptr &&interest, ContentObject::Ptr &&content_object) {
- uint32_t incremental_suffix = content_object->getName().getSuffix();
-
// Check whether makes sense to continue
if (TRANSPORT_EXPECT_FALSE(!is_running_)) {
return;
@@ -331,27 +330,17 @@ void RaaqmTransportProtocol::onContentObject(
(*callback_interest)(*socket_, *interest);
}
- if (TRANSPORT_EXPECT_FALSE(content_object->getPayloadType() ==
- PayloadType::MANIFEST)) {
- if (TRANSPORT_EXPECT_FALSE(incremental_suffix == 0)) {
- index_manager_ = manifest_index_manager_.get();
- interests_in_flight_--;
- }
-
- index_manager_->onManifest(std::move(content_object));
-
- } else if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) {
- onContentSegment(std::move(interest), std::move(content_object));
+ if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) {
+ stats_->updateBytesRecv(content_object->payloadSize());
}
+ onContentSegment(std::move(interest), std::move(content_object));
scheduleNextInterests();
}
void RaaqmTransportProtocol::onContentSegment(
Interest::Ptr &&interest, ContentObject::Ptr &&content_object) {
uint32_t incremental_suffix = content_object->getName().getSuffix();
- bool virtual_download = false;
- socket_->getSocketOption(OtherOptions::VIRTUAL_DOWNLOAD, virtual_download);
// Decrease in-flight interests
interests_in_flight_--;
@@ -361,28 +350,55 @@ void RaaqmTransportProtocol::onContentSegment(
afterContentReception(*interest, *content_object);
}
- if (index_manager_->onContentObject(*content_object)) {
- stats_.updateBytesRecv(content_object->payloadSize());
+ index_manager_->onContentObject(std::move(interest),
+ std::move(content_object));
+}
- if (!virtual_download) {
- reassemble(std::move(content_object));
- } else if (TRANSPORT_EXPECT_FALSE(incremental_suffix ==
- index_manager_->getFinalSuffix())) {
- interface::ConsumerSocket::ReadCallback *on_payload = VOID_HANDLER;
- socket_->getSocketOption(READ_CALLBACK, &on_payload);
+void RaaqmTransportProtocol::onPacketDropped(
+ Interest::Ptr &&interest, ContentObject::Ptr &&content_object) {
+ uint32_t max_rtx = 0;
+ socket_->getSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, max_rtx);
- if (on_payload) {
- on_payload->readSuccess(stats_.getBytesRecv());
- }
+ uint64_t segment = interest->getName().getSuffix();
+ ConsumerInterestCallback *callback = VOID_HANDLER;
+ if (TRANSPORT_EXPECT_TRUE(interest_retransmissions_[segment & mask] <
+ max_rtx)) {
+ stats_->updateRetxCount(1);
+
+ callback = VOID_HANDLER;
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION,
+ &callback);
+ if (*callback) {
+ (*callback)(*socket_, *interest);
}
+
+ callback = VOID_HANDLER;
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT,
+ &callback);
+ if (*callback) {
+ (*callback)(*socket_, *interest);
+ }
+
+ if (!is_running_) {
+ return;
+ }
+
+ interest_retransmissions_[segment & mask]++;
+
+ interest_to_retransmit_.push(std::move(interest));
} else {
- // TODO Application policy check
- // unverified_segments_.emplace(
- // std::make_pair(incremental_suffix, std::move(content_object)));
- TRANSPORT_LOGE("Received not trusted segment.");
+ TRANSPORT_LOGE(
+ "Stop: received not trusted packet %llu times",
+ (unsigned long long)interest_retransmissions_[segment & mask]);
+ onContentReassembled(
+ make_error_code(protocol_error::max_retransmissions_error));
}
}
+void RaaqmTransportProtocol::onReassemblyFailed(std::uint32_t missing_segment) {
+
+}
+
void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) {
checkForStalePaths();
@@ -399,7 +415,7 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) {
uint64_t segment = n.getSuffix();
// Do not retransmit interests asking contents that do not exist.
- if (segment >= index_manager_->getFinalSuffix()) {
+ if (segment > index_manager_->getFinalSuffix()) {
return;
}
@@ -417,7 +433,7 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) {
if (TRANSPORT_EXPECT_TRUE(interest_retransmissions_[segment & mask] <
max_rtx)) {
- stats_.updateRetxCount(1);
+ stats_->updateRetxCount(1);
callback = VOID_HANDLER;
socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION,
@@ -515,24 +531,8 @@ void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) {
}
void RaaqmTransportProtocol::onContentReassembled(std::error_code ec) {
- interface::ConsumerSocket::ReadCallback *on_payload = VOID_HANDLER;
- socket_->getSocketOption(READ_CALLBACK, &on_payload);
-
- if (!on_payload) {
- throw errors::RuntimeException(
- "The read callback must be installed in the transport before "
- "starting "
- "the content retrieval.");
- }
-
- if (!ec) {
- on_payload->readSuccess(stats_.getBytesRecv());
- } else {
- on_payload->readError(ec);
- }
-
rate_estimator_->onDownloadFinished();
- stop();
+ TransportProtocol::onContentReassembled(ec);
}
void RaaqmTransportProtocol::updateRtt(uint64_t segment) {
@@ -567,7 +567,7 @@ void RaaqmTransportProtocol::RAAQM() {
// Change drop probability according to RTT statistics
cur_path_->updateDropProb();
- double coin = ((double) rand() / (RAND_MAX));
+ double coin = ((double)rand() / (RAND_MAX));
if (coin <= cur_path_->getDropProb()) {
decreaseWindow();
}
@@ -577,8 +577,8 @@ void RaaqmTransportProtocol::RAAQM() {
void RaaqmTransportProtocol::updateStats(uint32_t suffix, uint64_t rtt,
utils::TimePoint &now) {
// Update RTT statistics
- stats_.updateAverageRtt(rtt);
- stats_.updateAverageWindowSize(current_window_size_);
+ stats_->updateAverageRtt(rtt);
+ stats_->updateAverageWindowSize(current_window_size_);
// Call statistics callback
ConsumerTimerCallback *stats_callback = VOID_HANDLER;
@@ -591,7 +591,7 @@ void RaaqmTransportProtocol::updateStats(uint32_t suffix, uint64_t rtt,
socket_->getSocketOption(GeneralTransportOptions::STATS_INTERVAL,
timer_interval_milliseconds);
if (dt.count() > timer_interval_milliseconds) {
- (*stats_callback)(*socket_, stats_);
+ (*stats_callback)(*socket_, *stats_);
t0_ = utils::SteadyClock::now();
}
}
diff --git a/libtransport/src/hicn/transport/protocols/raaqm.h b/libtransport/src/hicn/transport/protocols/raaqm.h
index 09d22cd4f..7fc540c9f 100644
--- a/libtransport/src/hicn/transport/protocols/raaqm.h
+++ b/libtransport/src/hicn/transport/protocols/raaqm.h
@@ -15,11 +15,11 @@
#pragma once
+#include <hicn/transport/protocols/byte_stream_reassembly.h>
#include <hicn/transport/protocols/congestion_window_protocol.h>
#include <hicn/transport/protocols/protocol.h>
#include <hicn/transport/protocols/raaqm_data_path.h>
#include <hicn/transport/protocols/rate_estimation.h>
-#include <hicn/transport/protocols/reassembly.h>
#include <hicn/transport/utils/chrono_typedefs.h>
#include <queue>
@@ -29,11 +29,8 @@ namespace transport {
namespace protocol {
-class RaaqmTransportProtocol
- : public TransportProtocol,
- public BaseReassembly,
- public CWindowProtocol,
- public BaseReassembly::ContentReassembledCallback {
+class RaaqmTransportProtocol : public TransportProtocol,
+ public CWindowProtocol {
public:
RaaqmTransportProtocol(interface::ConsumerSocket *icnet_socket);
@@ -70,6 +67,11 @@ class RaaqmTransportProtocol
void onContentSegment(Interest::Ptr &&interest,
ContentObject::Ptr &&content_object);
+ void onPacketDropped(Interest::Ptr &&interest,
+ ContentObject::Ptr &&content_object) override;
+
+ void onReassemblyFailed(std::uint32_t missing_segment) override;
+
void onTimeout(Interest::Ptr &&i) override;
virtual void scheduleNextInterests() override;
diff --git a/libtransport/src/hicn/transport/protocols/reassembly.cc b/libtransport/src/hicn/transport/protocols/reassembly.cc
index c45d876a0..9682d338d 100644
--- a/libtransport/src/hicn/transport/protocols/reassembly.cc
+++ b/libtransport/src/hicn/transport/protocols/reassembly.cc
@@ -14,7 +14,8 @@
*/
#include <hicn/transport/interfaces/socket_consumer.h>
-#include <hicn/transport/protocols/indexing_manager.h>
+#include <hicn/transport/protocols/errors.h>
+#include <hicn/transport/protocols/indexer.h>
#include <hicn/transport/protocols/reassembly.h>
#include <hicn/transport/utils/array.h>
#include <hicn/transport/utils/membuf.h>
@@ -23,66 +24,7 @@ namespace transport {
namespace protocol {
-BaseReassembly::BaseReassembly(interface::ConsumerSocket *icn_socket,
- ContentReassembledCallback *content_callback,
- TransportProtocol *next_interest)
- : reassembly_consumer_socket_(icn_socket),
- incremental_index_manager_(
- std::make_unique<IncrementalIndexManager>(icn_socket)),
- manifest_index_manager_(
- std::make_unique<ManifestIndexManager>(icn_socket, next_interest)),
- index_manager_(incremental_index_manager_.get()),
- index_(0),
- read_buffer_(nullptr) {
- setContentCallback(content_callback);
-}
-
-void BaseReassembly::reassemble(ContentObject::Ptr &&content_object) {
- if (TRANSPORT_EXPECT_TRUE(content_object != nullptr)) {
- received_packets_.emplace(std::make_pair(
- content_object->getName().getSuffix(), std::move(content_object)));
- }
-
- auto it = received_packets_.find((const unsigned int)index_);
- while (it != received_packets_.end()) {
- if (it->second->getPayloadType() == PayloadType::CONTENT_OBJECT) {
- copyContent(*it->second);
- received_packets_.erase(it);
- }
-
- index_ = index_manager_->getNextReassemblySegment();
- it = received_packets_.find((const unsigned int)index_);
- }
-}
-
-void BaseReassembly::copyContent(const ContentObject &content_object) {
- auto a = content_object.getPayload();
- auto payload_length = a->length();
- auto write_size = std::min(payload_length, read_buffer_->tailroom());
- auto additional_bytes = payload_length > read_buffer_->tailroom()
- ? payload_length - read_buffer_->tailroom()
- : 0;
-
- std::memcpy(read_buffer_->writableTail(), a->data(), write_size);
- read_buffer_->append(write_size);
-
- if (!read_buffer_->tailroom()) {
- notifyApplication();
- std::memcpy(read_buffer_->writableTail(), a->data() + write_size,
- additional_bytes);
- read_buffer_->append(additional_bytes);
- }
-
- bool download_completed =
- index_manager_->getFinalSuffix() == content_object.getName().getSuffix();
-
- if (TRANSPORT_EXPECT_FALSE(download_completed)) {
- notifyApplication();
- content_callback_->onContentReassembled(std::make_error_code(std::errc(0)));
- }
-}
-
-void BaseReassembly::notifyApplication() {
+void Reassembly::notifyApplication() {
interface::ConsumerSocket::ReadCallback *read_callback = nullptr;
reassembly_consumer_socket_->getSocketOption(
interface::ConsumerCallbacksOptions::READ_CALLBACK, &read_callback);
@@ -124,21 +66,5 @@ void BaseReassembly::notifyApplication() {
}
}
-void BaseReassembly::reset() {
- manifest_index_manager_->reset();
- incremental_index_manager_->reset();
- index_ = index_manager_->getNextReassemblySegment();
-
- received_packets_.clear();
-
- // reset read buffer
- interface::ConsumerSocket::ReadCallback *read_callback;
- reassembly_consumer_socket_->getSocketOption(
- interface::ConsumerCallbacksOptions::READ_CALLBACK, &read_callback);
-
- read_buffer_ = utils::MemBuf::create(read_callback->maxBufferSize());
-}
-
} // namespace protocol
-
} // namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/reassembly.h b/libtransport/src/hicn/transport/protocols/reassembly.h
index e859ca294..34af2a70a 100644
--- a/libtransport/src/hicn/transport/protocols/reassembly.h
+++ b/libtransport/src/hicn/transport/protocols/reassembly.h
@@ -15,17 +15,20 @@
#pragma once
-#include <hicn/transport/core/content_object.h>
-#include <hicn/transport/protocols/manifest_indexing_manager.h>
+#include <hicn/transport/core/facade.h>
namespace transport {
namespace interface {
class ConsumerReadCallback;
-}
+class ConsumerSocket;
+} // namespace interface
namespace protocol {
+class TransportProtocol;
+class Indexer;
+
// Forward Declaration
class ManifestManager;
@@ -36,41 +39,26 @@ class Reassembly {
virtual void onContentReassembled(std::error_code ec) = 0;
};
- virtual void reassemble(ContentObject::Ptr &&content_object) = 0;
- virtual void reset() = 0;
- virtual void setContentCallback(ContentReassembledCallback *callback) {
- content_callback_ = callback;
- }
+ Reassembly(interface::ConsumerSocket *icn_socket,
+ TransportProtocol *transport_protocol)
+ : reassembly_consumer_socket_(icn_socket),
+ transport_protocol_(transport_protocol) {}
- protected:
- ContentReassembledCallback *content_callback_;
-};
+ virtual ~Reassembly() = default;
-class BaseReassembly : public Reassembly {
- public:
- BaseReassembly(interface::ConsumerSocket *icn_socket,
- ContentReassembledCallback *content_callback,
- TransportProtocol *next_interest);
+ virtual void reassemble(core::ContentObject::Ptr &&content_object) = 0;
+ virtual void reassemble(
+ std::unique_ptr<core::ContentObjectManifest> &&manifest) = 0;
+ virtual void reInitialize() = 0;
+ virtual void setIndexer(Indexer *indexer) { index_manager_ = indexer; }
protected:
- virtual void reassemble(ContentObject::Ptr &&content_object) override;
-
- virtual void copyContent(const ContentObject &content_object);
-
- virtual void reset() override;
-
- private:
- void notifyApplication();
+ virtual void notifyApplication();
protected:
- // The consumer socket
interface::ConsumerSocket *reassembly_consumer_socket_;
- std::unique_ptr<IncrementalIndexManager> incremental_index_manager_;
- std::unique_ptr<ManifestIndexManager> manifest_index_manager_;
- IndexVerificationManager *index_manager_;
- std::unordered_map<std::uint32_t, ContentObject::Ptr> received_packets_;
-
- uint32_t index_;
+ TransportProtocol *transport_protocol_;
+ Indexer *index_manager_;
std::unique_ptr<utils::MemBuf> read_buffer_;
};
diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc
index 559e86592..e371217f8 100644
--- a/libtransport/src/hicn/transport/protocols/rtc.cc
+++ b/libtransport/src/hicn/transport/protocols/rtc.cc
@@ -13,11 +13,12 @@
* limitations under the License.
*/
-#include <math.h>
-#include <random>
+#include <hicn/transport/protocols/rtc.h>
#include <hicn/transport/interfaces/socket_consumer.h>
-#include <hicn/transport/protocols/rtc.h>
+
+#include <math.h>
+#include <random>
namespace transport {
@@ -26,14 +27,16 @@ namespace protocol {
using namespace interface;
RTCTransportProtocol::RTCTransportProtocol(
- interface::ConsumerSocket *icnet_socket)
- : TransportProtocol(icnet_socket),
+ interface::ConsumerSocket *icn_socket)
+ : TransportProtocol(icn_socket, nullptr),
+ DatagramReassembly(icn_socket, this),
inflightInterests_(1 << default_values::log_2_default_buffer_size),
modMask_((1 << default_values::log_2_default_buffer_size) - 1) {
- icnet_socket->getSocketOption(PORTAL, portal_);
+ icn_socket->getSocketOption(PORTAL, portal_);
rtx_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
probe_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
- sentinel_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
+ sentinel_timer_ =
+ std::make_unique<asio::steady_timer>(portal_->getIoService());
round_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
reset();
}
@@ -147,8 +150,7 @@ uint32_t min(uint32_t a, uint32_t b) {
}
void RTCTransportProtocol::newRound() {
- round_timer_->expires_from_now(std::chrono::milliseconds(
- HICN_ROUND_LEN));
+ round_timer_->expires_from_now(std::chrono::milliseconds(HICN_ROUND_LEN));
round_timer_->async_wait([this](std::error_code ec) {
if (ec) return;
updateStats(HICN_ROUND_LEN);
@@ -281,10 +283,10 @@ void RTCTransportProtocol::updateStats(uint32_t round_duration) {
&stats_callback);
if (*stats_callback) {
// Send the stats to the app
- stats_.updateQueuingDelay(queuingDelay_);
- stats_.updateLossRatio(lossRate_);
- stats_.updateAverageRtt(pathTable_[producerPathLabels_[1]]->getMinRtt());
- (*stats_callback)(*socket_, stats_);
+ stats_->updateQueuingDelay(queuingDelay_);
+ stats_->updateLossRatio(lossRate_);
+ stats_->updateAverageRtt(pathTable_[producerPathLabels_[1]]->getMinRtt());
+ (*stats_callback)(*socket_, *stats_);
}
// bound also by interest lifitime* production rate
@@ -301,9 +303,9 @@ void RTCTransportProtocol::updateStats(uint32_t round_duration) {
updateCCState();
updateWindow();
- if(queuingDelay_ > 25.0){
- //this indicates that the client will go soon out of synch,
- //switch to synch mode
+ if (queuingDelay_ > 25.0) {
+ // this indicates that the client will go soon out of synch,
+ // switch to synch mode
if (currentState_ == HICN_RTC_NORMAL_STATE) {
currentState_ = HICN_RTC_SYNC_STATE;
}
@@ -358,8 +360,7 @@ void RTCTransportProtocol::computeMaxWindow(uint32_t productionRate,
maxCWin_ = min(maxWaintingInterest, maxCWin_);
}
- if(maxCWin_ < HICN_MIN_CWIN)
- maxCWin_ = HICN_MIN_CWIN;
+ if (maxCWin_ < HICN_MIN_CWIN) maxCWin_ = HICN_MIN_CWIN;
}
void RTCTransportProtocol::updateWindow() {
@@ -518,68 +519,64 @@ void RTCTransportProtocol::scheduleNextInterests() {
}
}
-void RTCTransportProtocol::sentinelTimer(){
+void RTCTransportProtocol::sentinelTimer() {
uint32_t wait = 50;
- if(pathTable_.find(producerPathLabels_[0]) != pathTable_.end() &&
- pathTable_.find(producerPathLabels_[1]) != pathTable_.end()){
- //we have all the info to set the timers
+ if (pathTable_.find(producerPathLabels_[0]) != pathTable_.end() &&
+ pathTable_.find(producerPathLabels_[1]) != pathTable_.end()) {
+ // we have all the info to set the timers
wait = round(pathTable_[producerPathLabels_[0]]->getInterArrivalGap());
- if(wait == 0)
- wait = 1;
+ if (wait == 0) wait = 1;
}
sentinel_timer_->expires_from_now(std::chrono::milliseconds(wait));
sentinel_timer_->async_wait([this](std::error_code ec) {
-
if (ec) return;
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
-
- if(pathTable_.find(producerPathLabels_[0]) == pathTable_.end() ||
- pathTable_.find(producerPathLabels_[1]) == pathTable_.end()){
- //we have no info, so we send again
-
- for(auto it = packets_in_window_.begin();
- it != packets_in_window_.end(); it++){
- uint32_t pkt = it->first & modMask_;
- if (inflightInterests_[pkt].sequence == it->first) {
- inflightInterests_[pkt].transmissionTime = now;
- Name *interest_name = nullptr;
- socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
- &interest_name);
- interest_name->setSuffix(it->first);
- it->second++;
- sendInterest(interest_name, true);
- }
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+
+ if (pathTable_.find(producerPathLabels_[0]) == pathTable_.end() ||
+ pathTable_.find(producerPathLabels_[1]) == pathTable_.end()) {
+ // we have no info, so we send again
+
+ for (auto it = packets_in_window_.begin(); it != packets_in_window_.end();
+ it++) {
+ uint32_t pkt = it->first & modMask_;
+ if (inflightInterests_[pkt].sequence == it->first) {
+ inflightInterests_[pkt].transmissionTime = now;
+ Name *interest_name = nullptr;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
+ &interest_name);
+ interest_name->setSuffix(it->first);
+ it->second++;
+ sendInterest(interest_name, true);
}
- }else{
- uint64_t max_waiting_time = //wait at least 50ms
- (pathTable_[producerPathLabels_[1]]->getMinRtt() -
- pathTable_[producerPathLabels_[0]]->getMinRtt()) +
- (ceil(pathTable_[producerPathLabels_[0]]->getInterArrivalGap()) * 50);
+ }
+ } else {
+ uint64_t max_waiting_time = // wait at least 50ms
+ (pathTable_[producerPathLabels_[1]]->getMinRtt() -
+ pathTable_[producerPathLabels_[0]]->getMinRtt()) +
+ (ceil(pathTable_[producerPathLabels_[0]]->getInterArrivalGap()) * 50);
- if((currentState_ == HICN_RTC_NORMAL_STATE) &&
+ if ((currentState_ == HICN_RTC_NORMAL_STATE) &&
(inflightInterestsCount_ >= currentCWin_) &&
- ((now - lastEvent_) > max_waiting_time) &&
- (lossRate_ >= 0.05)){
+ ((now - lastEvent_) > max_waiting_time) && (lossRate_ >= 0.05)) {
+ uint64_t RTT = pathTable_[producerPathLabels_[1]]->getMinRtt();
- uint64_t RTT = pathTable_[producerPathLabels_[1]]->getMinRtt();
-
- for(auto it = packets_in_window_.begin();
- it != packets_in_window_.end(); it++){
+ for (auto it = packets_in_window_.begin();
+ it != packets_in_window_.end(); it++) {
uint32_t pkt = it->first & modMask_;
if (inflightInterests_[pkt].sequence == it->first &&
- ((now - inflightInterests_[pkt].transmissionTime) >= RTT)){
- inflightInterests_[pkt].transmissionTime = now;
- Name *interest_name = nullptr;
- socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
- &interest_name);
- interest_name->setSuffix(it->first);
- it->second++;
- sendInterest(interest_name, true);
+ ((now - inflightInterests_[pkt].transmissionTime) >= RTT)) {
+ inflightInterests_[pkt].transmissionTime = now;
+ Name *interest_name = nullptr;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
+ &interest_name);
+ interest_name->setSuffix(it->first);
+ it->second++;
+ sendInterest(interest_name, true);
}
}
}
@@ -754,8 +751,8 @@ void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) {
// and over until we get at least a packet
inflightInterestsCount_--;
lastEvent_ = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
packets_in_window_.erase(segmentNumber);
scheduleNextInterests();
return;
@@ -763,8 +760,8 @@ void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) {
if (inflightInterests_[pkt].state == sent_) {
lastEvent_ = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
packets_in_window_.erase(segmentNumber);
inflightInterestsCount_--;
}
@@ -890,30 +887,29 @@ void RTCTransportProtocol::onContentObject(
return;
}
- //check if the packet is a rtx
+ // check if the packet is a rtx
bool is_rtx = false;
- if(interestRetransmissions_.find(segmentNumber) !=
- interestRetransmissions_.end()){
+ if (interestRetransmissions_.find(segmentNumber) !=
+ interestRetransmissions_.end()) {
is_rtx = true;
- }else{
+ } else {
auto it_win = packets_in_window_.find(segmentNumber);
- if(it_win != packets_in_window_.end() &&
- it_win->second != 0)
- is_rtx = true;
+ if (it_win != packets_in_window_.end() && it_win->second != 0)
+ is_rtx = true;
}
if (payload_size == HICN_NACK_HEADER_SIZE) {
if (inflightInterests_[pkt].state == sent_) {
lastEvent_ = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
packets_in_window_.erase(segmentNumber);
inflightInterestsCount_--;
}
bool old_nack = false;
- if (!is_rtx){
+ if (!is_rtx) {
// this is not a retransmitted packet
old_nack = onNack(*content_object, false);
updateDelayStats(*content_object);
@@ -924,8 +920,8 @@ void RTCTransportProtocol::onContentObject(
// the nacked_ state is used only to avoid to decrease
// inflightInterestsCount_ multiple times. In fact, every time that we
// receive an event related to an interest (timeout, nacked, content) we
- // cange the state. In this way we are sure that we do not decrease twice the
- // counter
+ // cange the state. In this way we are sure that we do not decrease twice
+ // the counter
if (old_nack) {
inflightInterests_[pkt].state = lost_;
interestRetransmissions_.erase(segmentNumber);
@@ -942,13 +938,13 @@ void RTCTransportProtocol::onContentObject(
if (inflightInterests_[pkt].state == sent_) {
lastEvent_ = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
packets_in_window_.erase(segmentNumber);
inflightInterestsCount_--; // packet sent without timeouts
}
- if (inflightInterests_[pkt].state == sent_ && !is_rtx){
+ if (inflightInterests_[pkt].state == sent_ && !is_rtx) {
// delay stats are computed only for non retransmitted data
updateDelayStats(*content_object);
}
@@ -979,52 +975,6 @@ void RTCTransportProtocol::onContentObject(
scheduleNextInterests();
}
-void RTCTransportProtocol::returnContentToApplication(
- const ContentObject &content_object) {
- // return content to the user
- auto read_buffer = content_object.getPayload();
-
- read_buffer->trimStart(HICN_TIMESTAMP_SIZE);
-
- interface::ConsumerSocket::ReadCallback *read_callback = nullptr;
- socket_->getSocketOption(READ_CALLBACK, &read_callback);
-
- if (read_callback == nullptr) {
- throw errors::RuntimeException(
- "The read callback must be installed in the transport before starting "
- "the content retrieval.");
- }
-
- if (read_callback->isBufferMovable()) {
- read_callback->readBufferAvailable(
- utils::MemBuf::copyBuffer(read_buffer->data(), read_buffer->length()));
- } else {
- // The buffer will be copied into the application-provided buffer
- uint8_t *buffer;
- std::size_t length;
- std::size_t total_length = read_buffer->length();
-
- while (read_buffer->length()) {
- buffer = nullptr;
- length = 0;
- read_callback->getReadBuffer(&buffer, &length);
-
- if (!buffer || !length) {
- throw errors::RuntimeException(
- "Invalid buffer provided by the application.");
- }
-
- auto to_copy = std::min(read_buffer->length(), length);
-
- std::memcpy(buffer, read_buffer->data(), to_copy);
- read_buffer->trimStart(to_copy);
- }
-
- read_callback->readDataAvailable(total_length);
- read_buffer->clear();
- }
-}
-
} // end namespace protocol
} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/rtc.h b/libtransport/src/hicn/transport/protocols/rtc.h
index 2b9ed10a6..9e1731e96 100644
--- a/libtransport/src/hicn/transport/protocols/rtc.h
+++ b/libtransport/src/hicn/transport/protocols/rtc.h
@@ -15,12 +15,12 @@
#pragma once
-#include <queue>
#include <map>
+#include <queue>
#include <unordered_map>
+#include <hicn/transport/protocols/datagram_reassembly.h>
#include <hicn/transport/protocols/protocol.h>
-#include <hicn/transport/protocols/reassembly.h>
#include <hicn/transport/protocols/rtc_data_path.h>
// algorithm state
@@ -35,26 +35,27 @@
#define HICN_TIMESTAMP_SIZE 8 // bytes
#define HICN_RTC_INTEREST_LIFETIME 1000 // ms
-//rtt measurement
-//normal interests for data goes from 0 to
-//HICN_MIN_PROBE_SEQ, the rest is reserverd for
-//probes
+// rtt measurement
+// normal interests for data goes from 0 to
+// HICN_MIN_PROBE_SEQ, the rest is reserverd for
+// probes
#define HICN_MIN_PROBE_SEQ 0xefffffff
#define HICN_MAX_PROBE_SEQ 0xffffffff
// controller constant
-#define HICN_ROUND_LEN 200 // ms interval of time on which
- // we take decisions / measurements
+#define HICN_ROUND_LEN \
+ 200 // ms interval of time on which
+ // we take decisions / measurements
#define HICN_MAX_RTX 10
#define HICN_MAX_RTX_SIZE 1024
#define HICN_MAX_RTX_MAX_AGE 10000
-#define HICN_MIN_RTT_WIN 30 // rounds
-#define HICN_MIN_INTER_ARRIVAL_GAP 100 //ms
+#define HICN_MIN_RTT_WIN 30 // rounds
+#define HICN_MIN_INTER_ARRIVAL_GAP 100 // ms
// cwin
#define HICN_INITIAL_CWIN 1 // packets
#define HICN_INITIAL_CWIN_MAX 100000 // packets
-#define HICN_MIN_CWIN 10 // packets
+#define HICN_MIN_CWIN 10 // packets
#define HICN_WIN_INCREASE_FACTOR 1.5
#define HICN_WIN_DECREASE_FACTOR 0.9
@@ -70,30 +71,23 @@
#define HICN_MICRO_IN_A_SEC 1000000
#define HICN_MILLI_IN_A_SEC 1000
-
namespace transport {
namespace protocol {
-enum packetState {
- sent_,
- nacked_,
- received_,
- timeout1_,
- timeout2_,
- lost_
-};
+enum packetState { sent_, nacked_, received_, timeout1_, timeout2_, lost_ };
typedef enum packetState packetState_t;
struct sentInterest {
uint64_t transmissionTime;
- uint32_t sequence; //sequence number of the interest sent
- //to handle seq % buffer_size
- packetState_t state; //see packet state
+ uint32_t sequence; // sequence number of the interest sent
+ // to handle seq % buffer_size
+ packetState_t state; // see packet state
};
-class RTCTransportProtocol : public TransportProtocol, public Reassembly {
+class RTCTransportProtocol : public TransportProtocol,
+ public DatagramReassembly {
public:
RTCTransportProtocol(interface::ConsumerSocket *icnet_socket);
@@ -133,11 +127,16 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly {
bool onNack(const ContentObject &content_object, bool rtx);
void onContentObject(Interest::Ptr &&interest,
ContentObject::Ptr &&content_object) override;
- void returnContentToApplication(const ContentObject &content_object);
+ void onPacketDropped(Interest::Ptr &&interest,
+ ContentObject::Ptr &&content_object) override {}
+ void onReassemblyFailed(std::uint32_t missing_segment) override {}
TRANSPORT_ALWAYS_INLINE virtual void reassemble(
ContentObject::Ptr &&content_object) override {
- returnContentToApplication(*content_object);
+ auto read_buffer = content_object->getPayload();
+ read_buffer->trimStart(HICN_TIMESTAMP_SIZE);
+ Reassembly::read_buffer_ = std::move(read_buffer);
+ Reassembly::notifyApplication();
}
// controller var
@@ -151,36 +150,36 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly {
// names/packets var
uint32_t actualSegment_;
uint32_t inflightInterestsCount_;
- //map seq to rtx
+ // map seq to rtx
std::map<uint32_t, uint8_t> interestRetransmissions_;
bool rtx_timer_used_;
std::unique_ptr<asio::steady_timer> rtx_timer_;
std::vector<sentInterest> inflightInterests_;
- uint32_t lastSegNacked_; //indicates the segment id in the last received
- // past Nack. we do not ask for retransmissions
- //for samething that is older than this value.
- uint32_t lastReceived_; //segment of the last content object received
- //indicates the base of the window on the client
- uint64_t lastReceivedTime_; //time at which we recevied the
- //lastReceived_ packet
-
- //sentinel
- //if all packets in the window get lost we need something that
- //wakes up our consumer socket. Interest timeouts set to 1 sec
- //expire too late. This timers expire much sooner and if it
- //detects that all the interest in the window may be lost
- //it sends all of them again
+ uint32_t lastSegNacked_; // indicates the segment id in the last received
+ // past Nack. we do not ask for retransmissions
+ // for samething that is older than this value.
+ uint32_t lastReceived_; // segment of the last content object received
+ // indicates the base of the window on the client
+ uint64_t lastReceivedTime_; // time at which we recevied the
+ // lastReceived_ packet
+
+ // sentinel
+ // if all packets in the window get lost we need something that
+ // wakes up our consumer socket. Interest timeouts set to 1 sec
+ // expire too late. This timers expire much sooner and if it
+ // detects that all the interest in the window may be lost
+ // it sends all of them again
std::unique_ptr<asio::steady_timer> sentinel_timer_;
- uint64_t lastEvent_; //time at which we removed a pending
- //interest from the window
+ uint64_t lastEvent_; // time at which we removed a pending
+ // interest from the window
std::unordered_map<uint32_t, uint8_t> packets_in_window_;
- //rtt probes
- //the RTC transport tends to overestimate the RTT
- //du to the production time on the server side
- //once per second we send an interest for wich we know
- //we will get a nack. This nack will keep our estimation
- //close to the reality
+ // rtt probes
+ // the RTC transport tends to overestimate the RTT
+ // du to the production time on the server side
+ // once per second we send an interest for wich we know
+ // we will get a nack. This nack will keep our estimation
+ // close to the reality
std::unique_ptr<asio::steady_timer> probe_timer_;
uint64_t time_sent_probe_;
uint32_t probe_seq_number_;
@@ -203,10 +202,10 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly {
uint32_t rounds_;
uint32_t roundsWithoutNacks_;
- //we keep track of up two paths (if only one path is in use
- //the two values in the vector will be the same)
- //position 0 stores the path with minRTT
- //position 1 stores the path with maxRTT
+ // we keep track of up two paths (if only one path is in use
+ // the two values in the vector will be the same)
+ // position 0 stores the path with minRTT
+ // position 1 stores the path with maxRTT
uint32_t producerPathLabels_[2];
std::unordered_map<uint32_t, std::shared_ptr<RTCDataPath>> pathTable_;
@@ -219,7 +218,6 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly {
unsigned protocolState_;
bool initied;
- TransportStatistics stats_;
};
} // namespace protocol
diff --git a/libtransport/src/hicn/transport/protocols/verification_manager.cc b/libtransport/src/hicn/transport/protocols/verification_manager.cc
new file mode 100644
index 000000000..f45cab743
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/verification_manager.cc
@@ -0,0 +1,71 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/protocols/verification_manager.h>
+
+#include <hicn/transport/interfaces/socket_consumer.h>
+
+namespace transport {
+
+namespace protocol {
+
+interface::VerificationPolicy SignatureVerificationManager::onPacketToVerify(
+ const Packet& packet) {
+ using namespace interface;
+
+ bool verify_signature;
+ VerificationPolicy ret = VerificationPolicy::DROP_PACKET;
+
+ ConsumerContentObjectVerificationFailedCallback*
+ verification_failed_callback = VOID_HANDLER;
+ icn_socket_->getSocketOption(GeneralTransportOptions::VERIFY_SIGNATURE,
+ verify_signature);
+
+ if (!verify_signature) {
+ return VerificationPolicy::ACCEPT_PACKET;
+ }
+
+ icn_socket_->getSocketOption(ConsumerCallbacksOptions::VERIFICATION_FAILED,
+ &verification_failed_callback);
+ if (!verification_failed_callback) {
+ throw errors::RuntimeException(
+ "No verification failed callback provided by application. "
+ "Aborting.");
+ }
+
+ std::shared_ptr<utils::Verifier> verifier;
+ icn_socket_->getSocketOption(GeneralTransportOptions::VERIFIER, verifier);
+
+ if (TRANSPORT_EXPECT_FALSE(!verifier)) {
+ ret = (*verification_failed_callback)(
+ *icn_socket_, dynamic_cast<const ContentObject&>(packet),
+ make_error_code(protocol_error::no_verifier_provided));
+ return ret;
+ }
+
+ if (!verifier->verify(packet)) {
+ ret = (*verification_failed_callback)(
+ *icn_socket_, dynamic_cast<const ContentObject&>(packet),
+ make_error_code(protocol_error::signature_verification_failed));
+ } else {
+ ret = VerificationPolicy::ACCEPT_PACKET;
+ }
+
+ return ret;
+}
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/verification_manager.h b/libtransport/src/hicn/transport/protocols/verification_manager.h
index da67e86f8..6e5d32127 100644
--- a/libtransport/src/hicn/transport/protocols/verification_manager.h
+++ b/libtransport/src/hicn/transport/protocols/verification_manager.h
@@ -15,56 +15,37 @@
#pragma once
-#include <hicn/transport/interfaces/socket_consumer.h>
-
-#include <deque>
+#include <hicn/transport/interfaces/callbacks.h>
+#include <hicn/transport/interfaces/verification_policy.h>
+#include <hicn/transport/protocols/errors.h>
namespace transport {
+namespace interface {
+class ConsumerSocket;
+}
+
namespace protocol {
+using Packet = core::Packet;
+using interface::ConsumerSocket;
+using interface::VerificationPolicy;
+
class VerificationManager {
public:
virtual ~VerificationManager() = default;
- virtual bool onPacketToVerify(const Packet& packet) = 0;
+ virtual VerificationPolicy onPacketToVerify(const Packet& packet) = 0;
};
class SignatureVerificationManager : public VerificationManager {
public:
- SignatureVerificationManager(interface::ConsumerSocket* icn_socket)
+ SignatureVerificationManager(ConsumerSocket* icn_socket)
: icn_socket_(icn_socket) {}
- TRANSPORT_ALWAYS_INLINE bool onPacketToVerify(const Packet& packet) override {
- using namespace interface;
-
- bool verify_signature, ret = false;
- icn_socket_->getSocketOption(GeneralTransportOptions::VERIFY_SIGNATURE,
- verify_signature);
-
- if (!verify_signature) {
- return true;
- }
-
- std::shared_ptr<utils::Verifier> verifier;
- icn_socket_->getSocketOption(GeneralTransportOptions::VERIFIER, verifier);
-
- if (TRANSPORT_EXPECT_FALSE(!verifier)) {
- throw errors::RuntimeException(
- "No certificate provided by the application.");
- }
-
- ret = verifier->verify(packet);
-
- if (!ret) {
- throw errors::RuntimeException(
- "Verification failure policy has to be implemented.");
- }
-
- return ret;
- }
+ interface::VerificationPolicy onPacketToVerify(const Packet& packet) override;
private:
- interface::ConsumerSocket* icn_socket_;
+ ConsumerSocket* icn_socket_;
};
} // end namespace protocol