aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport
diff options
context:
space:
mode:
authorMauro Sardara <msardara@cisco.com>2020-02-07 20:00:06 +0100
committerMauro Sardara <msardara@cisco.com>2020-02-12 18:40:52 +0100
commit3bce9bfdce707313de4f9cccdc867abd9edf82df (patch)
treebd7d75a7251888a3fc269fadebd59842c46a14a1 /libtransport
parentf9243a2bf823086404be1c41c7bcc1b27cfab7de (diff)
[HICN-508] [HICN-509] [HICN-506] Manifest rework
Change-Id: I992205148910be008d66b5acb7f6f1365770f9e8 Signed-off-by: Mauro Sardara <msardara@cisco.com>
Diffstat (limited to 'libtransport')
-rw-r--r--libtransport/src/hicn/transport/core/manifest_format.h10
-rw-r--r--libtransport/src/hicn/transport/http/client_connection.cc18
-rw-r--r--libtransport/src/hicn/transport/http/client_connection.h7
-rw-r--r--libtransport/src/hicn/transport/interfaces/CMakeLists.txt1
-rw-r--r--libtransport/src/hicn/transport/interfaces/callbacks.h15
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_consumer.cc60
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_consumer.h16
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_options_keys.h11
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_producer.cc75
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_producer.h3
-rw-r--r--libtransport/src/hicn/transport/interfaces/verification_policy.h33
-rw-r--r--libtransport/src/hicn/transport/protocols/CMakeLists.txt18
-rw-r--r--libtransport/src/hicn/transport/protocols/byte_stream_reassembly.cc121
-rw-r--r--libtransport/src/hicn/transport/protocols/byte_stream_reassembly.h54
-rw-r--r--libtransport/src/hicn/transport/protocols/data_processing_events.h33
-rw-r--r--libtransport/src/hicn/transport/protocols/datagram_reassembly.cc35
-rw-r--r--libtransport/src/hicn/transport/protocols/datagram_reassembly.h39
-rw-r--r--libtransport/src/hicn/transport/protocols/errors.cc60
-rw-r--r--libtransport/src/hicn/transport/protocols/errors.h91
-rw-r--r--libtransport/src/hicn/transport/protocols/incremental_indexer.cc52
-rw-r--r--libtransport/src/hicn/transport/protocols/incremental_indexer.h (renamed from libtransport/src/hicn/transport/protocols/indexing_manager.h)114
-rw-r--r--libtransport/src/hicn/transport/protocols/indexer.cc74
-rw-r--r--libtransport/src/hicn/transport/protocols/indexer.h102
-rw-r--r--libtransport/src/hicn/transport/protocols/manifest_incremental_indexer.cc232
-rw-r--r--libtransport/src/hicn/transport/protocols/manifest_incremental_indexer.h91
-rw-r--r--libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc293
-rw-r--r--libtransport/src/hicn/transport/protocols/manifest_indexing_manager.h82
-rw-r--r--libtransport/src/hicn/transport/protocols/protocol.cc31
-rw-r--r--libtransport/src/hicn/transport/protocols/protocol.h24
-rw-r--r--libtransport/src/hicn/transport/protocols/raaqm.cc118
-rw-r--r--libtransport/src/hicn/transport/protocols/raaqm.h14
-rw-r--r--libtransport/src/hicn/transport/protocols/reassembly.cc80
-rw-r--r--libtransport/src/hicn/transport/protocols/reassembly.h50
-rw-r--r--libtransport/src/hicn/transport/protocols/rtc.cc210
-rw-r--r--libtransport/src/hicn/transport/protocols/rtc.h106
-rw-r--r--libtransport/src/hicn/transport/protocols/verification_manager.cc71
-rw-r--r--libtransport/src/hicn/transport/protocols/verification_manager.h49
-rw-r--r--libtransport/src/hicn/transport/utils/CMakeLists.txt1
-rw-r--r--libtransport/src/hicn/transport/utils/suffix_strategy.cc73
-rw-r--r--libtransport/src/hicn/transport/utils/suffix_strategy.h185
40 files changed, 1714 insertions, 1038 deletions
diff --git a/libtransport/src/hicn/transport/core/manifest_format.h b/libtransport/src/hicn/transport/core/manifest_format.h
index 451e3db6a..9b6777270 100644
--- a/libtransport/src/hicn/transport/core/manifest_format.h
+++ b/libtransport/src/hicn/transport/core/manifest_format.h
@@ -51,8 +51,18 @@ enum class HashAlgorithm : uint8_t {
CRC32C = static_cast<uint8_t>(utils::CryptoHashType::CRC32C),
};
+/**
+ * INCREMENTAL: Manifests will be received inline with the data with no specific
+ * assumption regarding the manifest capacity. Consumers can send interests
+ * using a +1 heuristic.
+ *
+ * MANIFEST_CAPACITY_BASED: manifests with capacity N have a suffix multiple of
+ * N+1: 0, N+1, 2(N+1) etc. Contents have a suffix incremented by 1 except when
+ * it conflicts with a manifest: 1, 2, ..., N, N+2, N+3, ..., 2N+1, 2N+3
+ */
enum class NextSegmentCalculationStrategy : uint8_t {
INCREMENTAL = 1,
+ MANIFEST_CAPACITY_BASED = 2,
};
template <typename T>
diff --git a/libtransport/src/hicn/transport/http/client_connection.cc b/libtransport/src/hicn/transport/http/client_connection.cc
index 32269d49d..aa9cb0463 100644
--- a/libtransport/src/hicn/transport/http/client_connection.cc
+++ b/libtransport/src/hicn/transport/http/client_connection.cc
@@ -16,6 +16,8 @@
#include <hicn/transport/http/client_connection.h>
#include <hicn/transport/utils/hash.h>
+#include <fstream>
+
#define DEFAULT_BETA 0.99
#define DEFAULT_GAMMA 0.07
@@ -38,6 +40,12 @@ HTTPClientConnection::HTTPClientConnection()
std::placeholders::_2));
consumer_.setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, this);
+ consumer_.setSocketOption(
+ ConsumerCallbacksOptions::VERIFICATION_FAILED,
+ (ConsumerContentObjectVerificationFailedCallback)std::bind(
+ &HTTPClientConnection::onSignatureVerificationFailed, this,
+ std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
+ consumer_.setSocketOption(GeneralTransportOptions::VERIFY_SIGNATURE, false);
consumer_.connect();
std::shared_ptr<typename ConsumerSocket::Portal> portal;
@@ -87,6 +95,10 @@ HTTPClientConnection::RC HTTPClientConnection::sendRequest(
return return_code_;
}
+void HTTPClientConnection::verifyPacketSignature(bool verify) {
+ consumer_.setSocketOption(GeneralTransportOptions::VERIFY_SIGNATURE, verify);
+}
+
void HTTPClientConnection::sendRequestGetReply(
const HTTPRequest &request, std::shared_ptr<HTTPResponse> &response,
std::string &ipv6_first_word) {
@@ -186,6 +198,12 @@ HTTPClientConnection &HTTPClientConnection::setCertificate(
return *this;
}
+VerificationPolicy HTTPClientConnection::onSignatureVerificationFailed(
+ ConsumerSocket &consumer, const core::ContentObject &content_object,
+ std::error_code reason) {
+ return VerificationPolicy::ACCEPT_PACKET;
+}
+
// Read buffer management
void HTTPClientConnection::readBufferAvailable(
std::unique_ptr<utils::MemBuf> &&buffer) noexcept {
diff --git a/libtransport/src/hicn/transport/http/client_connection.h b/libtransport/src/hicn/transport/http/client_connection.h
index 5bcf9c4c7..e001653ab 100644
--- a/libtransport/src/hicn/transport/http/client_connection.h
+++ b/libtransport/src/hicn/transport/http/client_connection.h
@@ -20,6 +20,7 @@
#include <hicn/transport/http/response.h>
#include <hicn/transport/interfaces/socket_consumer.h>
#include <hicn/transport/interfaces/socket_producer.h>
+#include <hicn/transport/interfaces/verification_policy.h>
#include <hicn/transport/utils/uri.h>
#include <vector>
@@ -68,6 +69,8 @@ class HTTPClientConnection : public ConsumerSocket::ReadCallback {
HTTPClientConnection &setCertificate(const std::string &cert_path);
+ void verifyPacketSignature(bool verify);
+
private:
void sendRequestGetReply(const HTTPRequest &request,
std::shared_ptr<HTTPResponse> &response,
@@ -80,6 +83,10 @@ class HTTPClientConnection : public ConsumerSocket::ReadCallback {
const core::Interest &interest,
std::string &payload);
+ VerificationPolicy onSignatureVerificationFailed(
+ ConsumerSocket &consumer, const core::ContentObject &content_object,
+ std::error_code reason);
+
// Read callback
bool isBufferMovable() noexcept override { return true; }
void getReadBuffer(uint8_t **application_buffer,
diff --git a/libtransport/src/hicn/transport/interfaces/CMakeLists.txt b/libtransport/src/hicn/transport/interfaces/CMakeLists.txt
index 0c2c73623..1f3c29b1f 100644
--- a/libtransport/src/hicn/transport/interfaces/CMakeLists.txt
+++ b/libtransport/src/hicn/transport/interfaces/CMakeLists.txt
@@ -22,6 +22,7 @@ list(APPEND HEADER_FILES
${CMAKE_CURRENT_SOURCE_DIR}/socket_options_default_values.h
${CMAKE_CURRENT_SOURCE_DIR}/socket_options_keys.h
${CMAKE_CURRENT_SOURCE_DIR}/callbacks.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/verification_policy.h
)
list(APPEND SOURCE_FILES
diff --git a/libtransport/src/hicn/transport/interfaces/callbacks.h b/libtransport/src/hicn/transport/interfaces/callbacks.h
index 7194cca42..6de48d14b 100644
--- a/libtransport/src/hicn/transport/interfaces/callbacks.h
+++ b/libtransport/src/hicn/transport/interfaces/callbacks.h
@@ -15,11 +15,12 @@
#pragma once
+#include <hicn/transport/core/facade.h>
+#include <hicn/transport/interfaces/verification_policy.h>
+
#include <functional>
#include <system_error>
-#include <hicn/transport/core/facade.h>
-
namespace utils {
class MemBuf;
}
@@ -85,6 +86,16 @@ using ConsumerContentObjectVerificationCallback =
std::function<bool(ConsumerSocket &, const core::ContentObject &)>;
/**
+ * The ConsumerContentObjectVerificationFailedCallback will be caled by the
+ * transport if a data packet (either manifest or content object) cannot be
+ * verified. The application here decides what to do by returning a
+ * VerificationFailedPolicy object.
+ */
+using ConsumerContentObjectVerificationFailedCallback =
+ std::function<VerificationPolicy(
+ ConsumerSocket &, const core::ContentObject &, std::error_code ec)>;
+
+/**
* The ConsumerManifestCallback will be called by the consumer socket when a
* manifest is received.
*/
diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc
index fbe4bed1a..fba972fe5 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc
+++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc
@@ -417,6 +417,28 @@ int ConsumerSocket::setSocketOption(
});
}
+int ConsumerSocket::setSocketOption(
+ int socket_option_key,
+ ConsumerContentObjectVerificationFailedCallback socket_option_value) {
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](
+ int socket_option_key,
+ ConsumerContentObjectVerificationFailedCallback socket_option_value)
+ -> int {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::VERIFICATION_FAILED:
+ verification_failed_callback_ = socket_option_value;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ });
+}
+
int ConsumerSocket::setSocketOption(int socket_option_key,
IcnObserver *socket_option_value) {
utils::SpinLock::Acquire locked(guard_raaqm_params_);
@@ -712,6 +734,29 @@ int ConsumerSocket::getSocketOption(
}
int ConsumerSocket::getSocketOption(
+ int socket_option_key,
+ ConsumerContentObjectVerificationFailedCallback **socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in case
+ // setSocketOption is called while the io_service is running.
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](
+ int socket_option_key,
+ ConsumerContentObjectVerificationFailedCallback **socket_option_value)
+ -> int {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::VERIFICATION_FAILED:
+ *socket_option_value = &verification_failed_callback_;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ });
+}
+
+int ConsumerSocket::getSocketOption(
int socket_option_key, std::shared_ptr<Portal> &socket_option_value) {
switch (socket_option_key) {
case PORTAL:
@@ -767,6 +812,19 @@ int ConsumerSocket::getSocketOption(int socket_option_key,
return SOCKET_OPTION_GET;
}
+int ConsumerSocket::getSocketOption(int socket_option_key,
+ TransportStatistics **socket_option_value) {
+ switch (socket_option_key) {
+ case OtherOptions::STATISTICS:
+ *socket_option_value = &stats_;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+}
+
int ConsumerSocket::getSocketOption(
int socket_option_key, ConsumerTimerCallback **socket_option_value) {
// Reschedule the function on the io_service to avoid race condition in case
@@ -789,4 +847,4 @@ int ConsumerSocket::getSocketOption(
} // namespace interface
-} // end namespace transport
+} // namespace transport
diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.h b/libtransport/src/hicn/transport/interfaces/socket_consumer.h
index 0f83fd38f..acce28c1d 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_consumer.h
+++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.h
@@ -18,6 +18,7 @@
#include <hicn/transport/interfaces/socket.h>
#include <hicn/transport/interfaces/socket_options_default_values.h>
#include <hicn/transport/protocols/protocol.h>
+#include <hicn/transport/protocols/statistics.h>
#include <hicn/transport/utils/event_thread.h>
#include <hicn/transport/utils/verifier.h>
@@ -224,6 +225,10 @@ class ConsumerSocket : public BaseSocket {
virtual int setSocketOption(
int socket_option_key,
+ ConsumerContentObjectVerificationFailedCallback socket_option_value);
+
+ virtual int setSocketOption(
+ int socket_option_key,
ConsumerContentObjectVerificationCallback socket_option_value);
virtual int setSocketOption(int socket_option_key,
@@ -262,6 +267,10 @@ class ConsumerSocket : public BaseSocket {
virtual int getSocketOption(
int socket_option_key,
+ ConsumerContentObjectVerificationFailedCallback **socket_option_value);
+
+ virtual int getSocketOption(
+ int socket_option_key,
ConsumerContentObjectVerificationCallback **socket_option_value);
virtual int getSocketOption(int socket_option_key,
@@ -286,6 +295,9 @@ class ConsumerSocket : public BaseSocket {
virtual int getSocketOption(int socket_option_key,
ConsumerTimerCallback **socket_option_value);
+ virtual int getSocketOption(int socket_option_key,
+ TransportStatistics **socket_option_value);
+
protected:
// If the thread calling lambda_func is not the same of io_service, this
// function reschedule the function on it
@@ -364,6 +376,7 @@ class ConsumerSocket : public BaseSocket {
ConsumerContentObjectCallback on_content_object_;
ConsumerManifestCallback on_manifest_;
ConsumerTimerCallback stats_summary_;
+ ConsumerContentObjectVerificationFailedCallback verification_failed_callback_;
ReadCallback *read_callback_;
@@ -375,6 +388,9 @@ class ConsumerSocket : public BaseSocket {
// Transport protocol
std::unique_ptr<TransportProtocol> transport_protocol_;
+ // Statistic
+ TransportStatistics stats_;
+
utils::SpinLock guard_raaqm_params_;
};
diff --git a/libtransport/src/hicn/transport/interfaces/socket_options_keys.h b/libtransport/src/hicn/transport/interfaces/socket_options_keys.h
index e14f0f412..b25bacbb9 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_options_keys.h
+++ b/libtransport/src/hicn/transport/interfaces/socket_options_keys.h
@@ -77,8 +77,9 @@ typedef enum {
CONTENT_OBJECT_INPUT = 411,
MANIFEST_INPUT = 412,
CONTENT_OBJECT_TO_VERIFY = 413,
- READ_CALLBACK = 414,
- STATS_SUMMARY = 415
+ VERIFICATION_FAILED = 414,
+ READ_CALLBACK = 415,
+ STATS_SUMMARY = 416
} ConsumerCallbacksOptions;
typedef enum {
@@ -96,7 +97,11 @@ typedef enum {
typedef enum { OUTPUT_INTERFACE = 601 } DataLinkOptions;
-typedef enum { VIRTUAL_DOWNLOAD = 701, USE_CFG_FILE = 702 } OtherOptions;
+typedef enum {
+ VIRTUAL_DOWNLOAD = 701,
+ USE_CFG_FILE = 702,
+ STATISTICS
+} OtherOptions;
typedef enum {
SHA_256 = 801,
diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.cc b/libtransport/src/hicn/transport/interfaces/socket_producer.cc
index 6782000ac..4fef5d1e2 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_producer.cc
+++ b/libtransport/src/hicn/transport/interfaces/socket_producer.cc
@@ -38,8 +38,7 @@ ProducerSocket::ProducerSocket(asio::io_service &io_service)
registration_status_(REGISTRATION_NOT_ATTEMPTED),
making_manifest_(false),
hash_algorithm_(HashAlgorithm::SHA_256),
- suffix_manifest_(core::NextSegmentCalculationStrategy::INCREMENTAL, 0),
- suffix_content_(core::NextSegmentCalculationStrategy::INCREMENTAL, 0),
+ suffix_strategy_(core::NextSegmentCalculationStrategy::INCREMENTAL),
on_interest_input_(VOID_HANDLER),
on_interest_dropped_input_buffer_(VOID_HANDLER),
on_interest_inserted_input_buffer_(VOID_HANDLER),
@@ -159,8 +158,8 @@ uint32_t ProducerSocket::produce(Name content_name,
uint32_t content_object_expiry_time = content_object_expiry_time_;
HashAlgorithm hash_algo = hash_algorithm_;
bool making_manifest = making_manifest_;
- utils::SuffixContent suffix_content = suffix_content_;
- utils::SuffixManifest suffix_manifest = suffix_manifest_;
+ auto suffix_strategy = utils::SuffixStrategyFactory::getSuffixStrategy(
+ suffix_strategy_, start_offset);
std::shared_ptr<utils::Identity> identity;
getSocketOption(GeneralTransportOptions::IDENTITY, identity);
@@ -169,19 +168,16 @@ uint32_t ProducerSocket::produce(Name content_name,
std::size_t header_size;
std::size_t manifest_header_size = 0;
std::size_t signature_length = 0;
- std::uint32_t final_block_number = 0;
+ std::uint32_t final_block_number = start_offset;
uint64_t free_space_for_content = 0;
core::Packet::Format format;
std::shared_ptr<ContentObjectManifest> manifest;
bool is_last_manifest = false;
- suffix_content.updateSuffix(start_offset);
- suffix_content.setUsingManifest(making_manifest);
// TODO Manifest may still be used for indexing
if (making_manifest && !identity) {
- throw errors::RuntimeException(
- "Making manifests without setting producer identity. Aborting.");
+ TRANSPORT_LOGD("Making manifests without setting producer identity.");
}
core::Packet::Format hf_format = core::Packet::Format::HF_UNSPEC;
@@ -200,9 +196,9 @@ uint32_t ProducerSocket::produce(Name content_name,
format = hf_format;
if (making_manifest) {
- format = hf_format;
manifest_header_size = core::Packet::getHeaderSizeFromFormat(
- hf_format_ah, identity->getSignatureLength());
+ identity ? hf_format_ah : hf_format,
+ identity ? identity->getSignatureLength() : 0);
} else if (identity) {
format = hf_format_ah;
signature_length = identity->getSignatureLength();
@@ -225,28 +221,20 @@ uint32_t ProducerSocket::produce(Name content_name,
1.0);
uint32_t number_of_manifests = static_cast<uint32_t>(
std::ceil(float(number_of_segments) / segment_in_manifest));
- final_block_number = number_of_segments + number_of_manifests - 1;
-
- suffix_manifest.updateSuffix(start_offset);
- suffix_manifest.setNbSegments(segment_in_manifest);
- suffix_content.updateSuffix(start_offset + 1);
- suffix_content.setNbSegments(segment_in_manifest);
+ final_block_number += number_of_segments + number_of_manifests - 1;
manifest.reset(ContentObjectManifest::createManifest(
- content_name.setSuffix(suffix_manifest.getSuffix()),
+ content_name.setSuffix(suffix_strategy->getNextManifestSuffix()),
core::ManifestVersion::VERSION_1, core::ManifestType::INLINE_MANIFEST,
- hash_algo, is_last_manifest, content_name,
- core::NextSegmentCalculationStrategy::INCREMENTAL,
- identity->getSignatureLength()));
+ hash_algo, is_last_manifest, content_name, suffix_strategy_,
+ identity ? identity->getSignatureLength() : 0));
manifest->setLifetime(content_object_expiry_time);
- suffix_manifest++;
if (is_last) {
manifest->setFinalBlockNumber(final_block_number);
} else {
- manifest->setFinalBlockNumber(std::numeric_limits<uint32_t>::max());
+ manifest->setFinalBlockNumber(utils::SuffixStrategy::INVALID_SUFFIX);
}
-
}
for (unsigned int packaged_segments = 0;
@@ -256,7 +244,12 @@ uint32_t ProducerSocket::produce(Name content_name,
data_packet_size - manifest_header_size) {
// Send the current manifest
manifest->encode();
- identity->getSigner().sign(*manifest);
+
+ // If identity set, sign manifest
+ if (identity) {
+ identity->getSigner().sign(*manifest);
+ }
+
passContentObjectToCallbacks(manifest);
// Send content objects stored in the queue
@@ -269,25 +262,22 @@ uint32_t ProducerSocket::produce(Name content_name,
// acquired in the passContentObjectToCallbacks function, so we can
// safely release this reference
manifest.reset(ContentObjectManifest::createManifest(
- content_name.setSuffix(suffix_manifest.getSuffix()),
+ content_name.setSuffix(suffix_strategy->getNextManifestSuffix()),
core::ManifestVersion::VERSION_1,
core::ManifestType::INLINE_MANIFEST, hash_algo, is_last_manifest,
- content_name, core::NextSegmentCalculationStrategy::INCREMENTAL,
- identity->getSignatureLength()));
- manifest->setLifetime(content_object_expiry_time);
+ content_name, suffix_strategy_,
+ identity ? identity->getSignatureLength() : 0));
- if (is_last) {
- manifest->setFinalBlockNumber(final_block_number);
- } else {
- manifest->setFinalBlockNumber(std::numeric_limits<uint32_t>::max());
- }
-
- suffix_manifest++;
+ manifest->setLifetime(content_object_expiry_time);
+ manifest->setFinalBlockNumber(
+ is_last ? final_block_number
+ : utils::SuffixStrategy::INVALID_SUFFIX);
}
}
+ auto content_suffix = suffix_strategy->getNextContentSuffix();
auto content_object = std::make_shared<ContentObject>(
- content_name.setSuffix(suffix_content.getSuffix()), format);
+ content_name.setSuffix(content_suffix), format);
content_object->setLifetime(content_object_expiry_time);
auto b = buffer->cloneOne();
@@ -314,7 +304,7 @@ uint32_t ProducerSocket::produce(Name content_name,
if (making_manifest) {
using namespace std::chrono_literals;
utils::CryptoHash hash = content_object->computeDigest(hash_algo);
- manifest->addSuffixHash(suffix_content.getSuffix(), hash);
+ manifest->addSuffixHash(content_suffix, hash);
content_queue_.push(content_object);
} else {
if (identity) {
@@ -322,8 +312,6 @@ uint32_t ProducerSocket::produce(Name content_name,
}
passContentObjectToCallbacks(content_object);
}
-
- suffix_content++;
}
if (making_manifest) {
@@ -332,7 +320,10 @@ uint32_t ProducerSocket::produce(Name content_name,
}
manifest->encode();
- identity->getSigner().sign(*manifest);
+ if (identity) {
+ identity->getSigner().sign(*manifest);
+ }
+
passContentObjectToCallbacks(manifest);
while (!content_queue_.empty()) {
passContentObjectToCallbacks(content_queue_.front());
@@ -347,7 +338,7 @@ uint32_t ProducerSocket::produce(Name content_name,
});
}
- return suffix_content.getSuffix() - start_offset;
+ return suffix_strategy->getTotalCount();
}
void ProducerSocket::asyncProduce(ContentObject &content_object) {
diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.h b/libtransport/src/hicn/transport/interfaces/socket_producer.h
index 83d0f73f3..ff6f49723 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_producer.h
+++ b/libtransport/src/hicn/transport/interfaces/socket_producer.h
@@ -193,8 +193,7 @@ class ProducerSocket : public Socket<BasePortal>,
std::atomic<utils::CryptoSuite> crypto_suite_;
utils::SpinLock identity_lock_;
std::shared_ptr<utils::Identity> identity_;
- utils::SuffixManifest suffix_manifest_;
- utils::SuffixContent suffix_content_;
+ core::NextSegmentCalculationStrategy suffix_strategy_;
// While manifests are being built, contents are stored in a queue
std::queue<std::shared_ptr<ContentObject>> content_queue_;
diff --git a/libtransport/src/hicn/transport/interfaces/verification_policy.h b/libtransport/src/hicn/transport/interfaces/verification_policy.h
new file mode 100644
index 000000000..cb5140ac1
--- /dev/null
+++ b/libtransport/src/hicn/transport/interfaces/verification_policy.h
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+
+namespace transport {
+namespace interface {
+
+/**
+ * This policy allows the application to tell the transport what to do in case
+ * the verification of a content object fails.
+ */
+enum class VerificationPolicy : std::uint8_t {
+ DROP_PACKET,
+ ACCEPT_PACKET,
+ ABORT_SESSION
+};
+} // namespace interface
+} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/hicn/transport/protocols/CMakeLists.txt b/libtransport/src/hicn/transport/protocols/CMakeLists.txt
index 23aeca9bf..06515e0e2 100644
--- a/libtransport/src/hicn/transport/protocols/CMakeLists.txt
+++ b/libtransport/src/hicn/transport/protocols/CMakeLists.txt
@@ -14,8 +14,12 @@
cmake_minimum_required(VERSION 3.5 FATAL_ERROR)
list(APPEND HEADER_FILES
- ${CMAKE_CURRENT_SOURCE_DIR}/indexing_manager.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/indexer.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/incremental_indexer.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/manifest_incremental_indexer.h
${CMAKE_CURRENT_SOURCE_DIR}/reassembly.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/datagram_reassembly.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/byte_stream_reassembly.h
${CMAKE_CURRENT_SOURCE_DIR}/congestion_window_protocol.h
${CMAKE_CURRENT_SOURCE_DIR}/packet_manager.h
${CMAKE_CURRENT_SOURCE_DIR}/statistics.h
@@ -27,11 +31,18 @@ list(APPEND HEADER_FILES
${CMAKE_CURRENT_SOURCE_DIR}/cbr.h
${CMAKE_CURRENT_SOURCE_DIR}/rtc.h
${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.h
- ${CMAKE_CURRENT_SOURCE_DIR}/manifest_indexing_manager.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/errors.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/verification_manager.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/data_processing_events.h
)
list(APPEND SOURCE_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/indexer.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/incremental_indexer.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/manifest_incremental_indexer.cc
${CMAKE_CURRENT_SOURCE_DIR}/reassembly.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/datagram_reassembly.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/byte_stream_reassembly.cc
${CMAKE_CURRENT_SOURCE_DIR}/protocol.cc
${CMAKE_CURRENT_SOURCE_DIR}/raaqm.cc
${CMAKE_CURRENT_SOURCE_DIR}/rate_estimation.cc
@@ -39,7 +50,8 @@ list(APPEND SOURCE_FILES
${CMAKE_CURRENT_SOURCE_DIR}/cbr.cc
${CMAKE_CURRENT_SOURCE_DIR}/rtc.cc
${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.cc
- ${CMAKE_CURRENT_SOURCE_DIR}/manifest_indexing_manager.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/errors.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/verification_manager.cc
)
set(RAAQM_CONFIG_INSTALL_PREFIX
diff --git a/libtransport/src/hicn/transport/protocols/byte_stream_reassembly.cc b/libtransport/src/hicn/transport/protocols/byte_stream_reassembly.cc
new file mode 100644
index 000000000..2f1e5d8fd
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/byte_stream_reassembly.cc
@@ -0,0 +1,121 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/protocols/byte_stream_reassembly.h>
+
+#include <hicn/transport/interfaces/socket_consumer.h>
+#include <hicn/transport/protocols/errors.h>
+#include <hicn/transport/protocols/indexer.h>
+#include <hicn/transport/utils/array.h>
+#include <hicn/transport/utils/membuf.h>
+
+namespace transport {
+
+namespace protocol {
+
+ByteStreamReassembly::ByteStreamReassembly(
+ interface::ConsumerSocket *icn_socket,
+ TransportProtocol *transport_protocol)
+ : Reassembly(icn_socket, transport_protocol),
+ index_(IndexManager::invalid_index),
+ download_complete_(false) {}
+
+void ByteStreamReassembly::reassemble(
+ std::unique_ptr<ContentObjectManifest> &&manifest) {
+ if (TRANSPORT_EXPECT_TRUE(manifest != nullptr)) {
+ received_packets_.emplace(
+ std::make_pair(manifest->getName().getSuffix(), nullptr));
+ assembleContent();
+ }
+}
+
+void ByteStreamReassembly::reassemble(ContentObject::Ptr &&content_object) {
+ if (TRANSPORT_EXPECT_TRUE(content_object != nullptr)) {
+ received_packets_.emplace(std::make_pair(
+ content_object->getName().getSuffix(), std::move(content_object)));
+ assembleContent();
+ }
+}
+
+void ByteStreamReassembly::assembleContent() {
+ if (TRANSPORT_EXPECT_FALSE(index_ == IndexManager::invalid_index)) {
+ index_ = index_manager_->getNextReassemblySegment();
+ if (index_ == IndexManager::invalid_index) {
+ return;
+ }
+ }
+
+ auto it = received_packets_.find((const unsigned int)index_);
+ while (it != received_packets_.end()) {
+ // Check if valid packet
+ if (it->second) {
+ copyContent(*it->second);
+ }
+
+ received_packets_.erase(it);
+ index_ = index_manager_->getNextReassemblySegment();
+ it = received_packets_.find((const unsigned int)index_);
+ }
+
+ if (!download_complete_ && index_ != IndexManager::invalid_index) {
+ transport_protocol_->onReassemblyFailed(index_);
+ }
+}
+
+void ByteStreamReassembly::copyContent(const ContentObject &content_object) {
+ auto a = content_object.getPayload();
+ auto payload_length = a->length();
+ auto write_size = std::min(payload_length, read_buffer_->tailroom());
+ auto additional_bytes = payload_length > read_buffer_->tailroom()
+ ? payload_length - read_buffer_->tailroom()
+ : 0;
+
+ std::memcpy(read_buffer_->writableTail(), a->data(), write_size);
+ read_buffer_->append(write_size);
+
+ if (!read_buffer_->tailroom()) {
+ notifyApplication();
+ std::memcpy(read_buffer_->writableTail(), a->data() + write_size,
+ additional_bytes);
+ read_buffer_->append(additional_bytes);
+ }
+
+ download_complete_ =
+ index_manager_->getFinalSuffix() == content_object.getName().getSuffix();
+
+ if (TRANSPORT_EXPECT_FALSE(download_complete_)) {
+ notifyApplication();
+ transport_protocol_->onContentReassembled(
+ make_error_code(protocol_error::success));
+ }
+}
+
+void ByteStreamReassembly::reInitialize() {
+ index_ = IndexManager::invalid_index;
+ download_complete_ = false;
+
+ received_packets_.clear();
+
+ // reset read buffer
+ interface::ConsumerSocket::ReadCallback *read_callback;
+ reassembly_consumer_socket_->getSocketOption(
+ interface::ConsumerCallbacksOptions::READ_CALLBACK, &read_callback);
+
+ read_buffer_ = utils::MemBuf::create(read_callback->maxBufferSize());
+}
+
+} // namespace protocol
+
+} // namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/byte_stream_reassembly.h b/libtransport/src/hicn/transport/protocols/byte_stream_reassembly.h
new file mode 100644
index 000000000..7c77d486f
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/byte_stream_reassembly.h
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/protocols/reassembly.h>
+
+namespace transport {
+
+namespace protocol {
+
+class ByteStreamReassembly : public Reassembly {
+ public:
+ ByteStreamReassembly(interface::ConsumerSocket *icn_socket,
+ TransportProtocol *transport_protocol);
+
+ protected:
+ virtual void reassemble(core::ContentObject::Ptr &&content_object) override;
+
+ virtual void reassemble(
+ std::unique_ptr<core::ContentObjectManifest> &&manifest) override;
+
+ virtual void copyContent(const core::ContentObject &content_object);
+
+ virtual void reInitialize() override;
+
+ private:
+ void assembleContent();
+
+ protected:
+ // The consumer socket
+ // std::unique_ptr<IncrementalIndexManager> incremental_index_manager_;
+ // std::unique_ptr<ManifestIndexManager> manifest_index_manager_;
+ // IndexVerificationManager *index_manager_;
+ std::unordered_map<std::uint32_t, core::ContentObject::Ptr> received_packets_;
+ uint32_t index_;
+ bool download_complete_;
+};
+
+} // namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/data_processing_events.h b/libtransport/src/hicn/transport/protocols/data_processing_events.h
new file mode 100644
index 000000000..8975c2b4a
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/data_processing_events.h
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/core/content_object.h>
+#include <hicn/transport/core/interest.h>
+
+namespace transport {
+namespace protocol {
+
+class ContentObjectProcessingEventCallback {
+ public:
+ virtual ~ContentObjectProcessingEventCallback() = default;
+ virtual void onPacketDropped(core::Interest::Ptr &&i,
+ core::ContentObject::Ptr &&c) = 0;
+ virtual void onReassemblyFailed(std::uint32_t missing_segment) = 0;
+};
+
+} // namespace protocol
+} // namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/datagram_reassembly.cc b/libtransport/src/hicn/transport/protocols/datagram_reassembly.cc
new file mode 100644
index 000000000..7b01ad4bc
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/datagram_reassembly.cc
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/protocols/datagram_reassembly.h>
+
+namespace transport {
+
+namespace protocol {
+
+DatagramReassembly::DatagramReassembly(interface::ConsumerSocket* icn_socket,
+ TransportProtocol* transport_protocol)
+ : Reassembly(icn_socket, transport_protocol) {}
+
+void DatagramReassembly::reassemble(core::ContentObject::Ptr&& content_object) {
+ read_buffer_ = content_object->getPayload();
+ Reassembly::notifyApplication();
+}
+
+void DatagramReassembly::reInitialize() {}
+
+} // namespace protocol
+
+} // namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/datagram_reassembly.h b/libtransport/src/hicn/transport/protocols/datagram_reassembly.h
new file mode 100644
index 000000000..923b6f2c1
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/datagram_reassembly.h
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/protocols/reassembly.h>
+
+namespace transport {
+
+namespace protocol {
+
+class DatagramReassembly : public Reassembly {
+ public:
+ DatagramReassembly(interface::ConsumerSocket *icn_socket,
+ TransportProtocol *transport_protocol);
+
+ virtual void reassemble(core::ContentObject::Ptr &&content_object) override;
+ virtual void reInitialize() override;
+ virtual void reassemble(
+ std::unique_ptr<core::ContentObjectManifest> &&manifest) override {
+ return;
+ }
+};
+
+} // namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/errors.cc b/libtransport/src/hicn/transport/protocols/errors.cc
new file mode 100644
index 000000000..c2249ed4a
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/errors.cc
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/protocols/errors.h>
+
+namespace transport {
+namespace protocol {
+
+const std::error_category& protocol_category() {
+ static protocol_category_impl instance;
+
+ return instance;
+}
+
+const char* protocol_category_impl::name() const throw() {
+ return "transport::protocol::error";
+}
+
+std::string protocol_category_impl::message(int ev) const {
+ switch (static_cast<protocol_error>(ev)) {
+ case protocol_error::success: {
+ return "Success";
+ }
+ case protocol_error::signature_verification_failed: {
+ return "Signature verification failed.";
+ }
+ case protocol_error::integrity_verification_failed: {
+ return "Integrity verification failed";
+ }
+ case protocol_error::no_verifier_provided: {
+ return "Transport cannot get any verifier for the given data.";
+ }
+ case protocol_error::io_error: {
+ return "Conectivity error between transport and local forwarder";
+ }
+ case protocol_error::max_retransmissions_error: {
+ return "Transport protocol reached max number of retransmissions allowed "
+ "for the same interest.";
+ }
+ case protocol_error::session_aborted: {
+ return "The session has been aborted by the application.";
+ }
+ default: { return "Unknown protocol error"; }
+ }
+}
+
+} // namespace protocol
+} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/hicn/transport/protocols/errors.h b/libtransport/src/hicn/transport/protocols/errors.h
new file mode 100644
index 000000000..cb3d3474e
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/errors.h
@@ -0,0 +1,91 @@
+/*
+ * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <system_error>
+
+namespace transport {
+namespace protocol {
+
+/**
+ * @brief Get the default server error category.
+ * @return The default server error category instance.
+ *
+ * @warning The first call to this function is thread-safe only starting with
+ * C++11.
+ */
+const std::error_category& protocol_category();
+
+/**
+ * The list of errors.
+ */
+enum class protocol_error {
+ success = 0,
+ signature_verification_failed,
+ integrity_verification_failed,
+ no_verifier_provided,
+ io_error,
+ max_retransmissions_error,
+ session_aborted,
+};
+
+/**
+ * @brief Create an error_code instance for the given error.
+ * @param error The error.
+ * @return The error_code instance.
+ */
+inline std::error_code make_error_code(protocol_error error) {
+ return std::error_code(static_cast<int>(error), protocol_category());
+}
+
+/**
+ * @brief Create an error_condition instance for the given error.
+ * @param error The error.
+ * @return The error_condition instance.
+ */
+inline std::error_condition make_error_condition(protocol_error error) {
+ return std::error_condition(static_cast<int>(error), protocol_category());
+}
+
+/**
+ * @brief A server error category.
+ */
+class protocol_category_impl : public std::error_category {
+ public:
+ /**
+ * @brief Get the name of the category.
+ * @return The name of the category.
+ */
+ virtual const char* name() const throw();
+
+ /**
+ * @brief Get the error message for a given error.
+ * @param ev The error numeric value.
+ * @return The message associated to the error.
+ */
+ virtual std::string message(int ev) const;
+};
+} // namespace protocol
+} // namespace transport
+
+namespace std {
+// namespace system {
+template <>
+struct is_error_code_enum<::transport::protocol::protocol_error>
+ : public std::true_type {};
+// } // namespace system
+} // namespace std \ No newline at end of file
diff --git a/libtransport/src/hicn/transport/protocols/incremental_indexer.cc b/libtransport/src/hicn/transport/protocols/incremental_indexer.cc
new file mode 100644
index 000000000..5a8046daa
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/incremental_indexer.cc
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/protocols/incremental_indexer.h>
+
+#include <hicn/transport/interfaces/socket_consumer.h>
+
+namespace transport {
+namespace protocol {
+
+void IncrementalIndexer::onContentObject(
+ core::Interest::Ptr &&interest, core::ContentObject::Ptr &&content_object) {
+ using namespace interface;
+
+ if (TRANSPORT_EXPECT_FALSE(content_object->testRst())) {
+ final_suffix_ = content_object->getName().getSuffix();
+ }
+
+ auto ret = verification_manager_->onPacketToVerify(*content_object);
+
+ switch (ret) {
+ case VerificationPolicy::ACCEPT_PACKET: {
+ reassembly_->reassemble(std::move(content_object));
+ break;
+ }
+ case VerificationPolicy::DROP_PACKET: {
+ transport_protocol_->onPacketDropped(std::move(interest),
+ std::move(content_object));
+ break;
+ }
+ case VerificationPolicy::ABORT_SESSION: {
+ transport_protocol_->onContentReassembled(
+ make_error_code(protocol_error::session_aborted));
+ break;
+ }
+ }
+}
+
+} // namespace protocol
+} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/hicn/transport/protocols/indexing_manager.h b/libtransport/src/hicn/transport/protocols/incremental_indexer.h
index b6b8bb4a6..ea84d645a 100644
--- a/libtransport/src/hicn/transport/protocols/indexing_manager.h
+++ b/libtransport/src/hicn/transport/protocols/incremental_indexer.h
@@ -15,9 +15,11 @@
#pragma once
+#include <hicn/transport/protocols/indexer.h>
+
#include <hicn/transport/errors/runtime_exception.h>
#include <hicn/transport/errors/unexpected_manifest_exception.h>
-#include <hicn/transport/interfaces/socket_consumer.h>
+#include <hicn/transport/protocols/reassembly.h>
#include <hicn/transport/protocols/verification_manager.h>
#include <hicn/transport/utils/literals.h>
@@ -25,74 +27,59 @@
namespace transport {
-namespace protocol {
-
-class IndexManager {
- public:
- static constexpr uint32_t invalid_index = ~0;
-
- /**
- *
- */
- virtual ~IndexManager() = default;
- /**
- * Retrieve from the manifest the next suffix to retrieve.
- */
- virtual uint32_t getNextSuffix() = 0;
-
- virtual void setFirstSuffix(uint32_t suffix) = 0;
-
- /**
- * Retrive the next segment to be reassembled.
- */
- virtual uint32_t getNextReassemblySegment() = 0;
-
- virtual bool isFinalSuffixDiscovered() = 0;
-
- virtual uint32_t getFinalSuffix() = 0;
-
- virtual void reset() = 0;
-};
+namespace interface {
+class ConsumerSocket;
+}
-class IndexVerificationManager : public IndexManager {
- public:
- /**
- *
- */
- virtual ~IndexVerificationManager() = default;
+namespace protocol {
- /**
- * The ownership of the ContentObjectManifest is moved
- * from the caller to the VerificationManager
- */
- virtual bool onManifest(core::ContentObject::Ptr &&content_object) = 0;
+class Reassembly;
+class TransportProtocol;
- /**
- * The content object must just be verified; the ownership is still of the
- * caller.
- */
- virtual bool onContentObject(const core::ContentObject &content_object) = 0;
-};
-
-class IncrementalIndexManager : public IndexVerificationManager {
+class IncrementalIndexer : public Indexer {
public:
- IncrementalIndexManager(interface::ConsumerSocket *icn_socket)
+ IncrementalIndexer(interface::ConsumerSocket *icn_socket,
+ TransportProtocol *transport, Reassembly *reassembly)
: socket_(icn_socket),
+ reassembly_(reassembly),
+ transport_protocol_(transport),
final_suffix_(std::numeric_limits<uint32_t>::max()),
+ first_suffix_(0),
next_download_suffix_(0),
next_reassembly_suffix_(0),
verification_manager_(
- std::make_unique<SignatureVerificationManager>(icn_socket)) {}
+ std::make_unique<SignatureVerificationManager>(icn_socket)) {
+ if (reassembly_) {
+ reassembly_->setIndexer(this);
+ }
+ }
+
+ IncrementalIndexer(const IncrementalIndexer &) = delete;
+
+ IncrementalIndexer(IncrementalIndexer &&other)
+ : socket_(other.socket_),
+ reassembly_(other.reassembly_),
+ transport_protocol_(other.transport_protocol_),
+ final_suffix_(other.final_suffix_),
+ first_suffix_(other.first_suffix_),
+ next_download_suffix_(other.next_download_suffix_),
+ next_reassembly_suffix_(other.next_reassembly_suffix_),
+ verification_manager_(std::move(other.verification_manager_)) {
+ if (reassembly_) {
+ reassembly_->setIndexer(this);
+ }
+ }
/**
*
*/
- virtual ~IncrementalIndexManager() {}
+ virtual ~IncrementalIndexer() {}
- TRANSPORT_ALWAYS_INLINE virtual void reset() override {
+ TRANSPORT_ALWAYS_INLINE virtual void reset(
+ std::uint32_t offset = 0) override {
final_suffix_ = std::numeric_limits<uint32_t>::max();
- next_download_suffix_ = first_suffix_;
- next_reassembly_suffix_ = 0;
+ next_download_suffix_ = offset;
+ next_reassembly_suffix_ = offset;
}
/**
@@ -125,24 +112,21 @@ class IncrementalIndexManager : public IndexVerificationManager {
return final_suffix_;
}
- TRANSPORT_ALWAYS_INLINE bool onManifest(
- core::ContentObject::Ptr &&content_object) override {
- throw errors::UnexpectedManifestException();
- }
+ void onContentObject(core::Interest::Ptr &&interest,
+ core::ContentObject::Ptr &&content_object) override;
- TRANSPORT_ALWAYS_INLINE bool onContentObject(
- const core::ContentObject &content_object) override {
- auto ret = verification_manager_->onPacketToVerify(content_object);
+ TRANSPORT_ALWAYS_INLINE void setReassembly(Reassembly *reassembly) {
+ reassembly_ = reassembly;
- if (TRANSPORT_EXPECT_FALSE(content_object.testRst())) {
- final_suffix_ = content_object.getName().getSuffix();
+ if (reassembly_) {
+ reassembly_->setIndexer(this);
}
-
- return ret;
}
protected:
interface::ConsumerSocket *socket_;
+ Reassembly *reassembly_;
+ TransportProtocol *transport_protocol_;
uint32_t final_suffix_;
uint32_t first_suffix_;
uint32_t next_download_suffix_;
diff --git a/libtransport/src/hicn/transport/protocols/indexer.cc b/libtransport/src/hicn/transport/protocols/indexer.cc
new file mode 100644
index 000000000..c50c4236b
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/indexer.cc
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/protocols/indexer.h>
+
+#include <hicn/transport/protocols/incremental_indexer.h>
+#include <hicn/transport/protocols/manifest_incremental_indexer.h>
+#include <hicn/transport/protocols/protocol.h>
+#include <hicn/transport/utils/branch_prediction.h>
+
+namespace transport {
+namespace protocol {
+
+IndexManager::IndexManager(interface::ConsumerSocket *icn_socket,
+ TransportProtocol *transport, Reassembly *reassembly)
+ : indexer_(std::make_unique<IncrementalIndexer>(icn_socket, transport,
+ reassembly)),
+ first_segment_received_(false),
+ icn_socket_(icn_socket),
+ transport_(transport),
+ reassembly_(reassembly) {}
+
+void IndexManager::onContentObject(core::Interest::Ptr &&interest,
+ core::ContentObject::Ptr &&content_object) {
+ if (first_segment_received_) {
+ indexer_->onContentObject(std::move(interest), std::move(content_object));
+ } else {
+ std::uint32_t segment_number = interest->getName().getSuffix();
+
+ if (segment_number == 0) {
+ // Check if manifest
+ if (content_object->getPayloadType() == PayloadType::MANIFEST) {
+ IncrementalIndexer *indexer =
+ static_cast<IncrementalIndexer *>(indexer_.release());
+ indexer_ =
+ std::make_unique<ManifestIncrementalIndexer>(std::move(*indexer));
+ delete indexer;
+ }
+
+ indexer_->onContentObject(std::move(interest), std::move(content_object));
+ auto it = interest_data_set_.begin();
+ while (it != interest_data_set_.end()) {
+ indexer_->onContentObject(std::move(const_cast<core::Interest::Ptr &&>(it->first)), std::move(const_cast<core::ContentObject::Ptr &&>(it->second)));
+ it = interest_data_set_.erase(it);
+ }
+
+ first_segment_received_ = true;
+ } else {
+ interest_data_set_.emplace(std::move(interest), std::move(content_object));
+ }
+ }
+}
+
+void IndexManager::reset(std::uint32_t offset) {
+ indexer_ = std::make_unique<IncrementalIndexer>(icn_socket_, transport_,
+ reassembly_);
+ first_segment_received_ = false;
+ interest_data_set_.clear();
+}
+
+} // namespace protocol
+} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/hicn/transport/protocols/indexer.h b/libtransport/src/hicn/transport/protocols/indexer.h
new file mode 100644
index 000000000..89751095e
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/indexer.h
@@ -0,0 +1,102 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/core/content_object.h>
+#include <hicn/transport/core/interest.h>
+
+#include <set>
+
+namespace transport {
+
+namespace interface {
+class ConsumerSocket;
+}
+
+namespace protocol {
+
+class Reassembly;
+class TransportProtocol;
+
+class Indexer {
+ public:
+ /**
+ *
+ */
+ virtual ~Indexer() = default;
+ /**
+ * Retrieve from the manifest the next suffix to retrieve.
+ */
+ virtual uint32_t getNextSuffix() = 0;
+
+ virtual void setFirstSuffix(uint32_t suffix) = 0;
+
+ /**
+ * Retrive the next segment to be reassembled.
+ */
+ virtual uint32_t getNextReassemblySegment() = 0;
+
+ virtual bool isFinalSuffixDiscovered() = 0;
+
+ virtual uint32_t getFinalSuffix() = 0;
+
+ virtual void reset(std::uint32_t offset = 0) = 0;
+
+ virtual void onContentObject(core::Interest::Ptr &&interest,
+ core::ContentObject::Ptr &&content_object) = 0;
+};
+
+class IndexManager : Indexer {
+ public:
+ static constexpr uint32_t invalid_index = ~0;
+
+ IndexManager(interface::ConsumerSocket *icn_socket,
+ TransportProtocol *transport, Reassembly *reassembly);
+
+ uint32_t getNextSuffix() override { return indexer_->getNextSuffix(); }
+
+ void setFirstSuffix(uint32_t suffix) override {
+ indexer_->setFirstSuffix(suffix);
+ }
+
+ uint32_t getNextReassemblySegment() override {
+ return indexer_->getNextReassemblySegment();
+ }
+
+ bool isFinalSuffixDiscovered() override {
+ return indexer_->isFinalSuffixDiscovered();
+ }
+
+ uint32_t getFinalSuffix() override { return indexer_->getFinalSuffix(); }
+
+ void reset(std::uint32_t offset = 0) override;
+
+ void onContentObject(core::Interest::Ptr &&interest,
+ core::ContentObject::Ptr &&content_object) override;
+
+ private:
+ std::unique_ptr<Indexer> indexer_;
+ bool first_segment_received_;
+ std::set<std::pair<core::Interest::Ptr, core::ContentObject::Ptr>>
+ interest_data_set_;
+ interface::ConsumerSocket *icn_socket_;
+ TransportProtocol *transport_;
+ Reassembly *reassembly_;
+};
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/manifest_incremental_indexer.cc b/libtransport/src/hicn/transport/protocols/manifest_incremental_indexer.cc
new file mode 100644
index 000000000..592daa4d4
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/manifest_incremental_indexer.cc
@@ -0,0 +1,232 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/interfaces/socket_consumer.h>
+#include <hicn/transport/protocols/manifest_incremental_indexer.h>
+
+#include <cmath>
+#include <deque>
+
+namespace transport {
+
+namespace protocol {
+
+using namespace interface;
+
+ManifestIncrementalIndexer::ManifestIncrementalIndexer(
+ interface::ConsumerSocket *icn_socket, TransportProtocol *transport,
+ Reassembly *reassembly)
+ : IncrementalIndexer(icn_socket, transport, reassembly),
+ suffix_strategy_(utils::SuffixStrategyFactory::getSuffixStrategy(
+ NextSegmentCalculationStrategy::INCREMENTAL,
+ next_download_suffix_, 0)) {}
+
+void ManifestIncrementalIndexer::onContentObject(
+ core::Interest::Ptr &&interest, core::ContentObject::Ptr &&content_object) {
+ // Check if mainfiest or not
+ if (content_object->getPayloadType() == PayloadType::MANIFEST) {
+ onUntrustedManifest(std::move(interest), std::move(content_object));
+ } else if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) {
+ onUntrustedContentObject(std::move(interest), std::move(content_object));
+ }
+}
+
+void ManifestIncrementalIndexer::onUntrustedManifest(
+ core::Interest::Ptr &&interest, core::ContentObject::Ptr &&content_object) {
+ auto ret = verification_manager_->onPacketToVerify(*content_object);
+
+ switch (ret) {
+ case VerificationPolicy::ACCEPT_PACKET: {
+ processTrustedManifest(std::move(content_object));
+ break;
+ }
+ case VerificationPolicy::DROP_PACKET:
+ case VerificationPolicy::ABORT_SESSION: {
+ transport_protocol_->onContentReassembled(
+ make_error_code(protocol_error::session_aborted));
+ break;
+ }
+ }
+}
+
+void ManifestIncrementalIndexer::processTrustedManifest(
+ ContentObject::Ptr &&content_object) {
+ auto manifest =
+ std::make_unique<ContentObjectManifest>(std::move(*content_object));
+ manifest->decode();
+
+ if (TRANSPORT_EXPECT_FALSE(manifest->getVersion() !=
+ core::ManifestVersion::VERSION_1)) {
+ throw errors::RuntimeException("Received manifest with unknown version.");
+ }
+
+ switch (manifest->getManifestType()) {
+ case core::ManifestType::INLINE_MANIFEST: {
+ auto _it = manifest->getSuffixList().begin();
+ auto _end = manifest->getSuffixList().end();
+
+ suffix_strategy_->setFinalSuffix(manifest->getFinalBlockNumber());
+
+ for (; _it != _end; _it++) {
+ auto hash =
+ std::make_pair(std::vector<uint8_t>(_it->second, _it->second + 32),
+ manifest->getHashAlgorithm());
+
+ if (!checkUnverifiedSegments(_it->first, hash)) {
+ suffix_hash_map_[_it->first] = std::move(hash);
+ }
+ }
+
+ reassembly_->reassemble(std::move(manifest));
+
+ break;
+ }
+ case core::ManifestType::FLIC_MANIFEST: {
+ throw errors::NotImplementedException();
+ }
+ case core::ManifestType::FINAL_CHUNK_NUMBER: {
+ throw errors::NotImplementedException();
+ }
+ }
+}
+
+bool ManifestIncrementalIndexer::checkUnverifiedSegments(
+ std::uint32_t suffix, const HashEntry &hash) {
+ auto it = unverified_segments_.find(suffix);
+
+ if (it != unverified_segments_.end()) {
+ auto ret = verifyContentObject(hash, *it->second.second);
+
+ switch (ret) {
+ case VerificationPolicy::ACCEPT_PACKET: {
+ reassembly_->reassemble(std::move(it->second.second));
+ break;
+ }
+ case VerificationPolicy::DROP_PACKET: {
+ transport_protocol_->onPacketDropped(std::move(it->second.first),
+ std::move(it->second.second));
+ break;
+ }
+ case VerificationPolicy::ABORT_SESSION: {
+ transport_protocol_->onContentReassembled(
+ make_error_code(protocol_error::session_aborted));
+ break;
+ }
+ }
+
+ unverified_segments_.erase(it);
+ return true;
+ }
+
+ return false;
+}
+
+VerificationPolicy ManifestIncrementalIndexer::verifyContentObject(
+ const HashEntry &manifest_hash, const ContentObject &content_object) {
+ VerificationPolicy ret;
+
+ auto hash_type = static_cast<utils::CryptoHashType>(manifest_hash.second);
+ auto data_packet_digest = content_object.computeDigest(manifest_hash.second);
+ auto data_packet_digest_bytes =
+ data_packet_digest.getDigest<uint8_t>().data();
+ const std::vector<uint8_t> &manifest_digest_bytes = manifest_hash.first;
+
+ if (utils::CryptoHash::compareBinaryDigest(
+ data_packet_digest_bytes, manifest_digest_bytes.data(), hash_type)) {
+ ret = VerificationPolicy::ACCEPT_PACKET;
+ } else {
+ ConsumerContentObjectVerificationFailedCallback
+ *verification_failed_callback = VOID_HANDLER;
+ socket_->getSocketOption(ConsumerCallbacksOptions::VERIFICATION_FAILED,
+ &verification_failed_callback);
+ ret = (*verification_failed_callback)(
+ *socket_, content_object,
+ make_error_code(protocol_error::integrity_verification_failed));
+ }
+
+ return ret;
+}
+
+void ManifestIncrementalIndexer::onUntrustedContentObject(
+ Interest::Ptr &&i, ContentObject::Ptr &&c) {
+ auto suffix = c->getName().getSuffix();
+ auto it = suffix_hash_map_.find(suffix);
+
+ if (it != suffix_hash_map_.end()) {
+ auto ret = verifyContentObject(it->second, *c);
+
+ switch (ret) {
+ case VerificationPolicy::ACCEPT_PACKET: {
+ suffix_hash_map_.erase(it);
+ reassembly_->reassemble(std::move(c));
+ break;
+ }
+ case VerificationPolicy::DROP_PACKET: {
+ transport_protocol_->onPacketDropped(std::move(i), std::move(c));
+ break;
+ }
+ case VerificationPolicy::ABORT_SESSION: {
+ transport_protocol_->onContentReassembled(
+ make_error_code(protocol_error::session_aborted));
+ break;
+ }
+ }
+ } else {
+ unverified_segments_[suffix] = std::make_pair(std::move(i), std::move(c));
+ }
+}
+
+uint32_t ManifestIncrementalIndexer::getNextSuffix() {
+ auto ret = suffix_strategy_->getNextSuffix();
+
+ if (ret <= suffix_strategy_->getFinalSuffix() &&
+ ret != utils::SuffixStrategy::INVALID_SUFFIX) {
+ suffix_queue_.push(ret);
+ return ret;
+ }
+
+ return IndexManager::invalid_index;
+}
+
+uint32_t ManifestIncrementalIndexer::getFinalSuffix() {
+ return suffix_strategy_->getFinalSuffix();
+}
+
+bool ManifestIncrementalIndexer::isFinalSuffixDiscovered() {
+ return IncrementalIndexer::isFinalSuffixDiscovered();
+}
+
+uint32_t ManifestIncrementalIndexer::getNextReassemblySegment() {
+ if (suffix_queue_.empty()) {
+ return IndexManager::invalid_index;
+ }
+
+ auto ret = suffix_queue_.front();
+ suffix_queue_.pop();
+ return ret;
+}
+
+void ManifestIncrementalIndexer::reset(std::uint32_t offset) {
+ IncrementalIndexer::reset(offset);
+ suffix_hash_map_.clear();
+ unverified_segments_.clear();
+ SuffixQueue empty;
+ std::swap(suffix_queue_, empty);
+ suffix_strategy_->reset(offset);
+}
+
+} // namespace protocol
+
+} // namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/manifest_incremental_indexer.h b/libtransport/src/hicn/transport/protocols/manifest_incremental_indexer.h
new file mode 100644
index 000000000..6e991f86f
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/manifest_incremental_indexer.h
@@ -0,0 +1,91 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/interfaces/socket.h>
+#include <hicn/transport/protocols/incremental_indexer.h>
+#include <hicn/transport/utils/suffix_strategy.h>
+
+#include <list>
+
+namespace transport {
+
+namespace protocol {
+
+class ManifestIncrementalIndexer : public IncrementalIndexer {
+ static constexpr double alpha = 0.3;
+
+ public:
+ using SuffixQueue = std::queue<uint32_t>;
+ using HashEntry = std::pair<std::vector<uint8_t>, core::HashAlgorithm>;
+
+ ManifestIncrementalIndexer(interface::ConsumerSocket *icn_socket,
+ TransportProtocol *transport, Reassembly *reassembly);
+
+ ManifestIncrementalIndexer(IncrementalIndexer &&indexer)
+ : IncrementalIndexer(std::move(indexer)),
+ suffix_strategy_(utils::SuffixStrategyFactory::getSuffixStrategy(
+ core::NextSegmentCalculationStrategy::INCREMENTAL,
+ next_download_suffix_, 0)) {
+ for (uint32_t i = first_suffix_; i < next_download_suffix_; i++) {
+ suffix_queue_.push(i);
+ }
+ }
+
+ virtual ~ManifestIncrementalIndexer() = default;
+
+ void reset(std::uint32_t offset = 0) override;
+
+ void onContentObject(core::Interest::Ptr &&interest,
+ core::ContentObject::Ptr &&content_object) override;
+
+ uint32_t getNextSuffix() override;
+
+ uint32_t getNextReassemblySegment() override;
+
+ bool isFinalSuffixDiscovered() override;
+
+ uint32_t getFinalSuffix() override;
+
+ private:
+ void onUntrustedManifest(core::Interest::Ptr &&interest,
+ core::ContentObject::Ptr &&content_object);
+ void onUntrustedContentObject(core::Interest::Ptr &&interest,
+ core::ContentObject::Ptr &&content_object);
+ void processTrustedManifest(core::ContentObject::Ptr &&content_object);
+ void onManifestReceived(core::Interest::Ptr &&i,
+ core::ContentObject::Ptr &&c);
+ void onManifestTimeout(core::Interest::Ptr &&i);
+ VerificationPolicy verifyContentObject(
+ const HashEntry &manifest_hash,
+ const core::ContentObject &content_object);
+ bool checkUnverifiedSegments(std::uint32_t suffix, const HashEntry &hash);
+
+ protected:
+ std::unique_ptr<utils::SuffixStrategy> suffix_strategy_;
+ SuffixQueue suffix_queue_;
+
+ // Hash verification
+ std::unordered_map<uint32_t, HashEntry> suffix_hash_map_;
+
+ std::unordered_map<uint32_t,
+ std::pair<core::Interest::Ptr, core::ContentObject::Ptr>>
+ unverified_segments_;
+};
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc b/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc
deleted file mode 100644
index ea13bf9e6..000000000
--- a/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc
+++ /dev/null
@@ -1,293 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <hicn/transport/interfaces/socket_consumer.h>
-#include <hicn/transport/protocols/manifest_indexing_manager.h>
-
-#include <cmath>
-#include <deque>
-
-namespace transport {
-
-namespace protocol {
-
-using namespace interface;
-
-ManifestIndexManager::ManifestIndexManager(
- interface::ConsumerSocket *icn_socket, TransportProtocol *next_interest)
- : IncrementalIndexManager(icn_socket),
- PacketManager<Interest>(1024),
- next_to_retrieve_segment_(suffix_queue_.end()),
- suffix_manifest_(core::NextSegmentCalculationStrategy::INCREMENTAL, 0),
- next_reassembly_segment_(
- core::NextSegmentCalculationStrategy::INCREMENTAL, 1, true),
- ignored_segments_(),
- next_interest_(next_interest) {}
-
-bool ManifestIndexManager::onManifest(
- core::ContentObject::Ptr &&content_object) {
- auto manifest =
- std::make_unique<ContentObjectManifest>(std::move(*content_object));
- bool manifest_verified = verification_manager_->onPacketToVerify(*manifest);
-
- if (manifest_verified) {
- manifest->decode();
-
- if (TRANSPORT_EXPECT_FALSE(manifest->getVersion() !=
- core::ManifestVersion::VERSION_1)) {
- throw errors::RuntimeException("Received manifest with unknown version.");
- }
-
- switch (manifest->getManifestType()) {
- case core::ManifestType::INLINE_MANIFEST: {
- auto _it = manifest->getSuffixList().begin();
- auto _end = manifest->getSuffixList().end();
- size_t nb_segments = std::distance(_it, _end);
- final_suffix_ = manifest->getFinalBlockNumber(); // final block number
-
- suffix_hash_map_[_it->first] =
- std::make_pair(std::vector<uint8_t>(_it->second, _it->second + 32),
- manifest->getHashAlgorithm());
- suffix_queue_.push_back(_it->first);
-
- // If the transport protocol finished the list of segments to retrieve,
- // reset the next_to_retrieve_segment_ iterator to the next segment
- // provided by this manifest.
- if (TRANSPORT_EXPECT_FALSE(next_to_retrieve_segment_ ==
- suffix_queue_.end())) {
- next_to_retrieve_segment_ = --suffix_queue_.end();
- }
-
- std::advance(_it, 1);
- for (; _it != _end; _it++) {
- suffix_hash_map_[_it->first] = std::make_pair(
- std::vector<uint8_t>(_it->second, _it->second + 32),
- manifest->getHashAlgorithm());
- suffix_queue_.push_back(_it->first);
- }
-
- if (TRANSPORT_EXPECT_FALSE(manifest->getName().getSuffix()) == 0) {
- core::NextSegmentCalculationStrategy strategy =
- manifest->getNextSegmentCalculationStrategy();
-
- suffix_manifest_.reset(0);
- suffix_manifest_.setNbSegments(nb_segments);
- suffix_manifest_.setSuffixStrategy(strategy);
- TRANSPORT_LOGD("Capacity of 1st manifest %zu",
- suffix_manifest_.getNbSegments());
-
- next_reassembly_segment_.reset(*suffix_queue_.begin());
- next_reassembly_segment_.setNbSegments(nb_segments);
- suffix_manifest_.setSuffixStrategy(strategy);
- }
-
- // If the manifest is not full, we add the suffixes of missing segments
- // to the list of segments to ignore when computing the next reassembly
- // index.
- if (TRANSPORT_EXPECT_FALSE(
- suffix_manifest_.getNbSegments() - nb_segments > 0)) {
- auto start = manifest->getSuffixList().begin();
- auto last = --_end;
- for (uint32_t i = last->first + 1;
- i < start->first + suffix_manifest_.getNbSegments(); i++) {
- ignored_segments_.push_back(i);
- }
- }
-
- if (TRANSPORT_EXPECT_FALSE(manifest->isFinalManifest()) == 0) {
- fillWindow(manifest->getWritableName(),
- manifest->getName().getSuffix());
- }
-
- break;
- }
- case core::ManifestType::FLIC_MANIFEST: {
- throw errors::NotImplementedException();
- }
- case core::ManifestType::FINAL_CHUNK_NUMBER: {
- throw errors::NotImplementedException();
- }
- }
- }
-
- return manifest_verified;
-}
-
-void ManifestIndexManager::onManifestReceived(Interest::Ptr &&i,
- ContentObject::Ptr &&c) {
- onManifest(std::move(c));
- if (next_interest_) {
- next_interest_->scheduleNextInterests();
- }
-}
-
-void ManifestIndexManager::onManifestTimeout(Interest::Ptr &&i) {
- const Name &n = i->getName();
- uint32_t segment = n.getSuffix();
-
- if (segment > final_suffix_) {
- return;
- }
-
- // Get portal
- std::shared_ptr<interface::BasePortal> portal;
- socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal);
-
- // Send requests for manifest out of the congestion window (no
- // in_flight_interests++)
- portal->sendInterest(
- std::move(i),
- std::bind(&ManifestIndexManager::onManifestReceived, this,
- std::placeholders::_1, std::placeholders::_2),
- std::bind(&ManifestIndexManager::onManifestTimeout, this,
- std::placeholders::_1));
-}
-
-void ManifestIndexManager::fillWindow(Name &name, uint32_t current_manifest) {
- /* Send as many manifest as required for filling window. */
- uint32_t interest_lifetime;
- double window_size;
- std::shared_ptr<interface::BasePortal> portal;
- Interest::Ptr interest;
- uint32_t current_segment = *next_to_retrieve_segment_;
- // suffix_manifest_ now points to the next manifest to request
- uint32_t last_requested_manifest = (suffix_manifest_++).getSuffix();
-
- socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal);
- socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
- interest_lifetime);
- socket_->getSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE,
- window_size);
-
- if (TRANSPORT_EXPECT_FALSE(suffix_manifest_.getSuffix() >= final_suffix_)) {
- suffix_manifest_.updateSuffix(last_requested_manifest);
- return;
- }
-
- if (current_segment + window_size < suffix_manifest_.getSuffix() &&
- current_manifest != last_requested_manifest) {
- suffix_manifest_.updateSuffix(last_requested_manifest);
- return;
- }
-
- do {
- interest = getPacket();
- name.setSuffix(suffix_manifest_.getSuffix());
- interest->setName(name);
- interest->setLifetime(interest_lifetime);
-
- // Send interests for manifest out of the congestion window (no
- // in_flight_interests++)
- portal->sendInterest(
- std::move(interest),
- std::bind(&ManifestIndexManager::onManifestReceived, this,
- std::placeholders::_1, std::placeholders::_2),
- std::bind(&ManifestIndexManager::onManifestTimeout, this,
- std::placeholders::_1));
-
- last_requested_manifest = (suffix_manifest_++).getSuffix();
- } while (current_segment + window_size >= suffix_manifest_.getSuffix() &&
- suffix_manifest_.getSuffix() < final_suffix_);
-
- // suffix_manifest_ now points to the last requested manifest
- suffix_manifest_.updateSuffix(last_requested_manifest);
-}
-
-bool ManifestIndexManager::onContentObject(
- const core::ContentObject &content_object) {
- bool verify_signature;
- socket_->getSocketOption(GeneralTransportOptions::VERIFY_SIGNATURE,
- verify_signature);
-
- if (!verify_signature) {
- return true;
- }
-
- uint64_t segment = content_object.getName().getSuffix();
-
- bool ret = false;
-
- auto it = suffix_hash_map_.find((const unsigned int)segment);
- if (it != suffix_hash_map_.end()) {
- auto hash_type = static_cast<utils::CryptoHashType>(it->second.second);
- auto data_packet_digest = content_object.computeDigest(it->second.second);
- auto data_packet_digest_bytes =
- data_packet_digest.getDigest<uint8_t>().data();
- std::vector<uint8_t> &manifest_digest_bytes = it->second.first;
-
- if (utils::CryptoHash::compareBinaryDigest(data_packet_digest_bytes,
- manifest_digest_bytes.data(),
- hash_type)) {
- suffix_hash_map_.erase(it);
- ret = true;
- } else {
- throw errors::RuntimeException(
- "Verification failure policy has to be implemented.");
- }
- }
-
- return ret;
-}
-
-uint32_t ManifestIndexManager::getNextSuffix() {
- if (TRANSPORT_EXPECT_FALSE(next_to_retrieve_segment_ ==
- suffix_queue_.end())) {
- return invalid_index;
- }
-
- return *next_to_retrieve_segment_++;
-}
-
-uint32_t ManifestIndexManager::getFinalSuffix() { return final_suffix_; }
-
-bool ManifestIndexManager::isFinalSuffixDiscovered() {
- return IncrementalIndexManager::isFinalSuffixDiscovered();
-}
-
-uint32_t ManifestIndexManager::getNextReassemblySegment() {
- uint32_t current_reassembly_segment;
-
- while (true) {
- current_reassembly_segment = next_reassembly_segment_.getSuffix();
- next_reassembly_segment_++;
-
- if (TRANSPORT_EXPECT_FALSE(current_reassembly_segment > final_suffix_)) {
- return invalid_index;
- }
-
- if (ignored_segments_.empty()) break;
-
- auto is_ignored =
- std::find(ignored_segments_.begin(), ignored_segments_.end(),
- current_reassembly_segment);
-
- if (is_ignored == ignored_segments_.end()) break;
-
- ignored_segments_.erase(is_ignored);
- }
-
- return current_reassembly_segment;
-}
-
-void ManifestIndexManager::reset() {
- IncrementalIndexManager::reset();
- suffix_manifest_.reset(0);
- suffix_queue_.clear();
- suffix_hash_map_.clear();
-}
-
-} // end namespace protocol
-
-} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.h b/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.h
deleted file mode 100644
index 645b20e9a..000000000
--- a/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.h
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <hicn/transport/interfaces/socket.h>
-#include <hicn/transport/protocols/indexing_manager.h>
-#include <hicn/transport/utils/suffix_strategy.h>
-
-#include <list>
-
-namespace transport {
-
-namespace protocol {
-
-class ManifestIndexManager : public IncrementalIndexManager,
- public PacketManager<Interest> {
- static constexpr double alpha = 0.3;
-
- public:
- using SuffixQueue = std::list<uint32_t>;
- using HashEntry = std::pair<std::vector<uint8_t>, core::HashAlgorithm>;
-
- ManifestIndexManager(interface::ConsumerSocket *icn_socket,
- TransportProtocol *next_interest);
-
- virtual ~ManifestIndexManager() = default;
-
- void reset() override;
-
- bool onManifest(core::ContentObject::Ptr &&content_object) override;
-
- bool onContentObject(const core::ContentObject &content_object) override;
-
- uint32_t getNextSuffix() override;
-
- uint32_t getNextReassemblySegment() override;
-
- bool isFinalSuffixDiscovered() override;
-
- uint32_t getFinalSuffix() override;
-
- private:
- void onManifestReceived(Interest::Ptr &&i, ContentObject::Ptr &&c);
- void onManifestTimeout(Interest::Ptr &&i);
- void fillWindow(Name &name, uint32_t current_manifest);
-
- protected:
- SuffixQueue suffix_queue_;
- SuffixQueue::iterator next_to_retrieve_segment_;
- utils::SuffixManifest suffix_manifest_;
- utils::SuffixContent next_reassembly_segment_;
-
- // Holds segments that should not be requested. Useful when
- // computing the next reassembly segment because some manifests
- // may be incomplete.
- std::vector<uint32_t> ignored_segments_;
-
- // Hash verification
- std::unordered_map<uint32_t,
- std::pair<std::vector<uint8_t>, core::HashAlgorithm>>
- suffix_hash_map_;
-
- // (temporary) To call scheduleNextInterests() after receiving a manifest
- TransportProtocol *next_interest_;
-};
-
-} // end namespace protocol
-
-} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/protocol.cc b/libtransport/src/hicn/transport/protocols/protocol.cc
index 8da9529d6..a0f847453 100644
--- a/libtransport/src/hicn/transport/protocols/protocol.cc
+++ b/libtransport/src/hicn/transport/protocols/protocol.cc
@@ -22,9 +22,16 @@ namespace protocol {
using namespace interface;
-TransportProtocol::TransportProtocol(interface::ConsumerSocket *icn_socket)
- : socket_(icn_socket), is_running_(false), is_first_(false) {
+TransportProtocol::TransportProtocol(interface::ConsumerSocket *icn_socket,
+ Reassembly *reassembly_protocol)
+ : socket_(icn_socket),
+ reassembly_protocol_(reassembly_protocol),
+ index_manager_(
+ std::make_unique<IndexManager>(socket_, this, reassembly_protocol)),
+ is_running_(false),
+ is_first_(false) {
socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal_);
+ socket_->getSocketOption(OtherOptions::STATISTICS, &stats_);
}
int TransportProtocol::start() {
@@ -71,6 +78,26 @@ void TransportProtocol::resume() {
is_running_ = false;
}
+void TransportProtocol::onContentReassembled(std::error_code ec) {
+ interface::ConsumerSocket::ReadCallback *on_payload = VOID_HANDLER;
+ socket_->getSocketOption(READ_CALLBACK, &on_payload);
+
+ if (!on_payload) {
+ throw errors::RuntimeException(
+ "The read callback must be installed in the transport before "
+ "starting "
+ "the content retrieval.");
+ }
+
+ if (!ec) {
+ on_payload->readSuccess(stats_->getBytesRecv());
+ } else {
+ on_payload->readError(ec);
+ }
+
+ stop();
+}
+
} // end namespace protocol
} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/protocol.h b/libtransport/src/hicn/transport/protocols/protocol.h
index e4821b6a0..87fab588b 100644
--- a/libtransport/src/hicn/transport/protocols/protocol.h
+++ b/libtransport/src/hicn/transport/protocols/protocol.h
@@ -18,7 +18,10 @@
#include <atomic>
#include <hicn/transport/interfaces/socket.h>
+#include <hicn/transport/protocols/data_processing_events.h>
+#include <hicn/transport/protocols/indexer.h>
#include <hicn/transport/protocols/packet_manager.h>
+#include <hicn/transport/protocols/reassembly.h>
#include <hicn/transport/protocols/statistics.h>
#include <hicn/transport/utils/object_pool.h>
@@ -28,6 +31,8 @@ namespace protocol {
using namespace core;
+class IndexVerificationManager;
+
class TransportProtocolCallback {
virtual void onContentObject(const core::Interest &interest,
const core::ContentObject &content_object) = 0;
@@ -35,11 +40,15 @@ class TransportProtocolCallback {
};
class TransportProtocol : public interface::BasePortal::ConsumerCallback,
- public PacketManager<Interest> {
+ public PacketManager<Interest>,
+ public ContentObjectProcessingEventCallback {
static constexpr std::size_t interest_pool_size = 4096;
+ friend class ManifestIndexManager;
+
public:
- TransportProtocol(interface::ConsumerSocket *icn_socket);
+ TransportProtocol(interface::ConsumerSocket *icn_socket,
+ Reassembly *reassembly_protocol);
virtual ~TransportProtocol() = default;
@@ -53,6 +62,12 @@ class TransportProtocol : public interface::BasePortal::ConsumerCallback,
virtual void scheduleNextInterests() = 0;
+ // Events generated by the indexing
+ virtual void onContentReassembled(std::error_code ec);
+ virtual void onPacketDropped(Interest::Ptr &&interest,
+ ContentObject::Ptr &&content_object) = 0;
+ virtual void onReassemblyFailed(std::uint32_t missing_segment) = 0;
+
protected:
// Consumer Callback
virtual void reset() = 0;
@@ -61,13 +76,14 @@ class TransportProtocol : public interface::BasePortal::ConsumerCallback,
protected:
interface::ConsumerSocket *socket_;
+ std::unique_ptr<Reassembly> reassembly_protocol_;
+ std::unique_ptr<IndexManager> index_manager_;
std::shared_ptr<interface::BasePortal> portal_;
std::atomic<bool> is_running_;
// True if it si the first time we schedule an interest
std::atomic<bool> is_first_;
- TransportStatistics stats_;
+ TransportStatistics *stats_;
};
} // end namespace protocol
-
} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/raaqm.cc b/libtransport/src/hicn/transport/protocols/raaqm.cc
index a57eb7cd9..641ae45c3 100644
--- a/libtransport/src/hicn/transport/protocols/raaqm.cc
+++ b/libtransport/src/hicn/transport/protocols/raaqm.cc
@@ -14,8 +14,9 @@
*/
#include <hicn/transport/interfaces/socket_consumer.h>
-#include <hicn/transport/protocols/manifest_indexing_manager.h>
+#include <hicn/transport/protocols/indexer.h>
#include <hicn/transport/protocols/raaqm.h>
+#include <hicn/transport/protocols/errors.h>
#include <cstdlib>
#include <fstream>
@@ -26,9 +27,8 @@ namespace protocol {
using namespace interface;
-RaaqmTransportProtocol::RaaqmTransportProtocol(ConsumerSocket *icnet_socket)
- : TransportProtocol(icnet_socket),
- BaseReassembly(icnet_socket, this, this),
+RaaqmTransportProtocol::RaaqmTransportProtocol(ConsumerSocket *icn_socket)
+ : TransportProtocol(icn_socket, new ByteStreamReassembly(icn_socket, this)),
current_window_size_(1),
interests_in_flight_(0),
cur_path_(nullptr),
@@ -101,13 +101,14 @@ void RaaqmTransportProtocol::reset() {
// Set first segment to retrieve
core::Name *name;
socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name);
+ index_manager_->reset();
index_manager_->setFirstSuffix(name->getSuffix());
std::queue<Interest::Ptr> empty;
std::swap(interest_to_retransmit_, empty);
- stats_.reset();
+ stats_->reset();
// Reset reassembly component
- BaseReassembly::reset();
+ reassembly_protocol_->reInitialize();
// Reset protocol variables
interests_in_flight_ = 0;
@@ -309,8 +310,6 @@ void RaaqmTransportProtocol::init() {
void RaaqmTransportProtocol::onContentObject(
Interest::Ptr &&interest, ContentObject::Ptr &&content_object) {
- uint32_t incremental_suffix = content_object->getName().getSuffix();
-
// Check whether makes sense to continue
if (TRANSPORT_EXPECT_FALSE(!is_running_)) {
return;
@@ -331,27 +330,17 @@ void RaaqmTransportProtocol::onContentObject(
(*callback_interest)(*socket_, *interest);
}
- if (TRANSPORT_EXPECT_FALSE(content_object->getPayloadType() ==
- PayloadType::MANIFEST)) {
- if (TRANSPORT_EXPECT_FALSE(incremental_suffix == 0)) {
- index_manager_ = manifest_index_manager_.get();
- interests_in_flight_--;
- }
-
- index_manager_->onManifest(std::move(content_object));
-
- } else if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) {
- onContentSegment(std::move(interest), std::move(content_object));
+ if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) {
+ stats_->updateBytesRecv(content_object->payloadSize());
}
+ onContentSegment(std::move(interest), std::move(content_object));
scheduleNextInterests();
}
void RaaqmTransportProtocol::onContentSegment(
Interest::Ptr &&interest, ContentObject::Ptr &&content_object) {
uint32_t incremental_suffix = content_object->getName().getSuffix();
- bool virtual_download = false;
- socket_->getSocketOption(OtherOptions::VIRTUAL_DOWNLOAD, virtual_download);
// Decrease in-flight interests
interests_in_flight_--;
@@ -361,28 +350,55 @@ void RaaqmTransportProtocol::onContentSegment(
afterContentReception(*interest, *content_object);
}
- if (index_manager_->onContentObject(*content_object)) {
- stats_.updateBytesRecv(content_object->payloadSize());
+ index_manager_->onContentObject(std::move(interest),
+ std::move(content_object));
+}
- if (!virtual_download) {
- reassemble(std::move(content_object));
- } else if (TRANSPORT_EXPECT_FALSE(incremental_suffix ==
- index_manager_->getFinalSuffix())) {
- interface::ConsumerSocket::ReadCallback *on_payload = VOID_HANDLER;
- socket_->getSocketOption(READ_CALLBACK, &on_payload);
+void RaaqmTransportProtocol::onPacketDropped(
+ Interest::Ptr &&interest, ContentObject::Ptr &&content_object) {
+ uint32_t max_rtx = 0;
+ socket_->getSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, max_rtx);
- if (on_payload) {
- on_payload->readSuccess(stats_.getBytesRecv());
- }
+ uint64_t segment = interest->getName().getSuffix();
+ ConsumerInterestCallback *callback = VOID_HANDLER;
+ if (TRANSPORT_EXPECT_TRUE(interest_retransmissions_[segment & mask] <
+ max_rtx)) {
+ stats_->updateRetxCount(1);
+
+ callback = VOID_HANDLER;
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION,
+ &callback);
+ if (*callback) {
+ (*callback)(*socket_, *interest);
}
+
+ callback = VOID_HANDLER;
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT,
+ &callback);
+ if (*callback) {
+ (*callback)(*socket_, *interest);
+ }
+
+ if (!is_running_) {
+ return;
+ }
+
+ interest_retransmissions_[segment & mask]++;
+
+ interest_to_retransmit_.push(std::move(interest));
} else {
- // TODO Application policy check
- // unverified_segments_.emplace(
- // std::make_pair(incremental_suffix, std::move(content_object)));
- TRANSPORT_LOGE("Received not trusted segment.");
+ TRANSPORT_LOGE(
+ "Stop: received not trusted packet %llu times",
+ (unsigned long long)interest_retransmissions_[segment & mask]);
+ onContentReassembled(
+ make_error_code(protocol_error::max_retransmissions_error));
}
}
+void RaaqmTransportProtocol::onReassemblyFailed(std::uint32_t missing_segment) {
+
+}
+
void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) {
checkForStalePaths();
@@ -399,7 +415,7 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) {
uint64_t segment = n.getSuffix();
// Do not retransmit interests asking contents that do not exist.
- if (segment >= index_manager_->getFinalSuffix()) {
+ if (segment > index_manager_->getFinalSuffix()) {
return;
}
@@ -417,7 +433,7 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) {
if (TRANSPORT_EXPECT_TRUE(interest_retransmissions_[segment & mask] <
max_rtx)) {
- stats_.updateRetxCount(1);
+ stats_->updateRetxCount(1);
callback = VOID_HANDLER;
socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION,
@@ -515,24 +531,8 @@ void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) {
}
void RaaqmTransportProtocol::onContentReassembled(std::error_code ec) {
- interface::ConsumerSocket::ReadCallback *on_payload = VOID_HANDLER;
- socket_->getSocketOption(READ_CALLBACK, &on_payload);
-
- if (!on_payload) {
- throw errors::RuntimeException(
- "The read callback must be installed in the transport before "
- "starting "
- "the content retrieval.");
- }
-
- if (!ec) {
- on_payload->readSuccess(stats_.getBytesRecv());
- } else {
- on_payload->readError(ec);
- }
-
rate_estimator_->onDownloadFinished();
- stop();
+ TransportProtocol::onContentReassembled(ec);
}
void RaaqmTransportProtocol::updateRtt(uint64_t segment) {
@@ -567,7 +567,7 @@ void RaaqmTransportProtocol::RAAQM() {
// Change drop probability according to RTT statistics
cur_path_->updateDropProb();
- double coin = ((double) rand() / (RAND_MAX));
+ double coin = ((double)rand() / (RAND_MAX));
if (coin <= cur_path_->getDropProb()) {
decreaseWindow();
}
@@ -577,8 +577,8 @@ void RaaqmTransportProtocol::RAAQM() {
void RaaqmTransportProtocol::updateStats(uint32_t suffix, uint64_t rtt,
utils::TimePoint &now) {
// Update RTT statistics
- stats_.updateAverageRtt(rtt);
- stats_.updateAverageWindowSize(current_window_size_);
+ stats_->updateAverageRtt(rtt);
+ stats_->updateAverageWindowSize(current_window_size_);
// Call statistics callback
ConsumerTimerCallback *stats_callback = VOID_HANDLER;
@@ -591,7 +591,7 @@ void RaaqmTransportProtocol::updateStats(uint32_t suffix, uint64_t rtt,
socket_->getSocketOption(GeneralTransportOptions::STATS_INTERVAL,
timer_interval_milliseconds);
if (dt.count() > timer_interval_milliseconds) {
- (*stats_callback)(*socket_, stats_);
+ (*stats_callback)(*socket_, *stats_);
t0_ = utils::SteadyClock::now();
}
}
diff --git a/libtransport/src/hicn/transport/protocols/raaqm.h b/libtransport/src/hicn/transport/protocols/raaqm.h
index 09d22cd4f..7fc540c9f 100644
--- a/libtransport/src/hicn/transport/protocols/raaqm.h
+++ b/libtransport/src/hicn/transport/protocols/raaqm.h
@@ -15,11 +15,11 @@
#pragma once
+#include <hicn/transport/protocols/byte_stream_reassembly.h>
#include <hicn/transport/protocols/congestion_window_protocol.h>
#include <hicn/transport/protocols/protocol.h>
#include <hicn/transport/protocols/raaqm_data_path.h>
#include <hicn/transport/protocols/rate_estimation.h>
-#include <hicn/transport/protocols/reassembly.h>
#include <hicn/transport/utils/chrono_typedefs.h>
#include <queue>
@@ -29,11 +29,8 @@ namespace transport {
namespace protocol {
-class RaaqmTransportProtocol
- : public TransportProtocol,
- public BaseReassembly,
- public CWindowProtocol,
- public BaseReassembly::ContentReassembledCallback {
+class RaaqmTransportProtocol : public TransportProtocol,
+ public CWindowProtocol {
public:
RaaqmTransportProtocol(interface::ConsumerSocket *icnet_socket);
@@ -70,6 +67,11 @@ class RaaqmTransportProtocol
void onContentSegment(Interest::Ptr &&interest,
ContentObject::Ptr &&content_object);
+ void onPacketDropped(Interest::Ptr &&interest,
+ ContentObject::Ptr &&content_object) override;
+
+ void onReassemblyFailed(std::uint32_t missing_segment) override;
+
void onTimeout(Interest::Ptr &&i) override;
virtual void scheduleNextInterests() override;
diff --git a/libtransport/src/hicn/transport/protocols/reassembly.cc b/libtransport/src/hicn/transport/protocols/reassembly.cc
index c45d876a0..9682d338d 100644
--- a/libtransport/src/hicn/transport/protocols/reassembly.cc
+++ b/libtransport/src/hicn/transport/protocols/reassembly.cc
@@ -14,7 +14,8 @@
*/
#include <hicn/transport/interfaces/socket_consumer.h>
-#include <hicn/transport/protocols/indexing_manager.h>
+#include <hicn/transport/protocols/errors.h>
+#include <hicn/transport/protocols/indexer.h>
#include <hicn/transport/protocols/reassembly.h>
#include <hicn/transport/utils/array.h>
#include <hicn/transport/utils/membuf.h>
@@ -23,66 +24,7 @@ namespace transport {
namespace protocol {
-BaseReassembly::BaseReassembly(interface::ConsumerSocket *icn_socket,
- ContentReassembledCallback *content_callback,
- TransportProtocol *next_interest)
- : reassembly_consumer_socket_(icn_socket),
- incremental_index_manager_(
- std::make_unique<IncrementalIndexManager>(icn_socket)),
- manifest_index_manager_(
- std::make_unique<ManifestIndexManager>(icn_socket, next_interest)),
- index_manager_(incremental_index_manager_.get()),
- index_(0),
- read_buffer_(nullptr) {
- setContentCallback(content_callback);
-}
-
-void BaseReassembly::reassemble(ContentObject::Ptr &&content_object) {
- if (TRANSPORT_EXPECT_TRUE(content_object != nullptr)) {
- received_packets_.emplace(std::make_pair(
- content_object->getName().getSuffix(), std::move(content_object)));
- }
-
- auto it = received_packets_.find((const unsigned int)index_);
- while (it != received_packets_.end()) {
- if (it->second->getPayloadType() == PayloadType::CONTENT_OBJECT) {
- copyContent(*it->second);
- received_packets_.erase(it);
- }
-
- index_ = index_manager_->getNextReassemblySegment();
- it = received_packets_.find((const unsigned int)index_);
- }
-}
-
-void BaseReassembly::copyContent(const ContentObject &content_object) {
- auto a = content_object.getPayload();
- auto payload_length = a->length();
- auto write_size = std::min(payload_length, read_buffer_->tailroom());
- auto additional_bytes = payload_length > read_buffer_->tailroom()
- ? payload_length - read_buffer_->tailroom()
- : 0;
-
- std::memcpy(read_buffer_->writableTail(), a->data(), write_size);
- read_buffer_->append(write_size);
-
- if (!read_buffer_->tailroom()) {
- notifyApplication();
- std::memcpy(read_buffer_->writableTail(), a->data() + write_size,
- additional_bytes);
- read_buffer_->append(additional_bytes);
- }
-
- bool download_completed =
- index_manager_->getFinalSuffix() == content_object.getName().getSuffix();
-
- if (TRANSPORT_EXPECT_FALSE(download_completed)) {
- notifyApplication();
- content_callback_->onContentReassembled(std::make_error_code(std::errc(0)));
- }
-}
-
-void BaseReassembly::notifyApplication() {
+void Reassembly::notifyApplication() {
interface::ConsumerSocket::ReadCallback *read_callback = nullptr;
reassembly_consumer_socket_->getSocketOption(
interface::ConsumerCallbacksOptions::READ_CALLBACK, &read_callback);
@@ -124,21 +66,5 @@ void BaseReassembly::notifyApplication() {
}
}
-void BaseReassembly::reset() {
- manifest_index_manager_->reset();
- incremental_index_manager_->reset();
- index_ = index_manager_->getNextReassemblySegment();
-
- received_packets_.clear();
-
- // reset read buffer
- interface::ConsumerSocket::ReadCallback *read_callback;
- reassembly_consumer_socket_->getSocketOption(
- interface::ConsumerCallbacksOptions::READ_CALLBACK, &read_callback);
-
- read_buffer_ = utils::MemBuf::create(read_callback->maxBufferSize());
-}
-
} // namespace protocol
-
} // namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/reassembly.h b/libtransport/src/hicn/transport/protocols/reassembly.h
index e859ca294..34af2a70a 100644
--- a/libtransport/src/hicn/transport/protocols/reassembly.h
+++ b/libtransport/src/hicn/transport/protocols/reassembly.h
@@ -15,17 +15,20 @@
#pragma once
-#include <hicn/transport/core/content_object.h>
-#include <hicn/transport/protocols/manifest_indexing_manager.h>
+#include <hicn/transport/core/facade.h>
namespace transport {
namespace interface {
class ConsumerReadCallback;
-}
+class ConsumerSocket;
+} // namespace interface
namespace protocol {
+class TransportProtocol;
+class Indexer;
+
// Forward Declaration
class ManifestManager;
@@ -36,41 +39,26 @@ class Reassembly {
virtual void onContentReassembled(std::error_code ec) = 0;
};
- virtual void reassemble(ContentObject::Ptr &&content_object) = 0;
- virtual void reset() = 0;
- virtual void setContentCallback(ContentReassembledCallback *callback) {
- content_callback_ = callback;
- }
+ Reassembly(interface::ConsumerSocket *icn_socket,
+ TransportProtocol *transport_protocol)
+ : reassembly_consumer_socket_(icn_socket),
+ transport_protocol_(transport_protocol) {}
- protected:
- ContentReassembledCallback *content_callback_;
-};
+ virtual ~Reassembly() = default;
-class BaseReassembly : public Reassembly {
- public:
- BaseReassembly(interface::ConsumerSocket *icn_socket,
- ContentReassembledCallback *content_callback,
- TransportProtocol *next_interest);
+ virtual void reassemble(core::ContentObject::Ptr &&content_object) = 0;
+ virtual void reassemble(
+ std::unique_ptr<core::ContentObjectManifest> &&manifest) = 0;
+ virtual void reInitialize() = 0;
+ virtual void setIndexer(Indexer *indexer) { index_manager_ = indexer; }
protected:
- virtual void reassemble(ContentObject::Ptr &&content_object) override;
-
- virtual void copyContent(const ContentObject &content_object);
-
- virtual void reset() override;
-
- private:
- void notifyApplication();
+ virtual void notifyApplication();
protected:
- // The consumer socket
interface::ConsumerSocket *reassembly_consumer_socket_;
- std::unique_ptr<IncrementalIndexManager> incremental_index_manager_;
- std::unique_ptr<ManifestIndexManager> manifest_index_manager_;
- IndexVerificationManager *index_manager_;
- std::unordered_map<std::uint32_t, ContentObject::Ptr> received_packets_;
-
- uint32_t index_;
+ TransportProtocol *transport_protocol_;
+ Indexer *index_manager_;
std::unique_ptr<utils::MemBuf> read_buffer_;
};
diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc
index 559e86592..e371217f8 100644
--- a/libtransport/src/hicn/transport/protocols/rtc.cc
+++ b/libtransport/src/hicn/transport/protocols/rtc.cc
@@ -13,11 +13,12 @@
* limitations under the License.
*/
-#include <math.h>
-#include <random>
+#include <hicn/transport/protocols/rtc.h>
#include <hicn/transport/interfaces/socket_consumer.h>
-#include <hicn/transport/protocols/rtc.h>
+
+#include <math.h>
+#include <random>
namespace transport {
@@ -26,14 +27,16 @@ namespace protocol {
using namespace interface;
RTCTransportProtocol::RTCTransportProtocol(
- interface::ConsumerSocket *icnet_socket)
- : TransportProtocol(icnet_socket),
+ interface::ConsumerSocket *icn_socket)
+ : TransportProtocol(icn_socket, nullptr),
+ DatagramReassembly(icn_socket, this),
inflightInterests_(1 << default_values::log_2_default_buffer_size),
modMask_((1 << default_values::log_2_default_buffer_size) - 1) {
- icnet_socket->getSocketOption(PORTAL, portal_);
+ icn_socket->getSocketOption(PORTAL, portal_);
rtx_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
probe_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
- sentinel_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
+ sentinel_timer_ =
+ std::make_unique<asio::steady_timer>(portal_->getIoService());
round_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
reset();
}
@@ -147,8 +150,7 @@ uint32_t min(uint32_t a, uint32_t b) {
}
void RTCTransportProtocol::newRound() {
- round_timer_->expires_from_now(std::chrono::milliseconds(
- HICN_ROUND_LEN));
+ round_timer_->expires_from_now(std::chrono::milliseconds(HICN_ROUND_LEN));
round_timer_->async_wait([this](std::error_code ec) {
if (ec) return;
updateStats(HICN_ROUND_LEN);
@@ -281,10 +283,10 @@ void RTCTransportProtocol::updateStats(uint32_t round_duration) {
&stats_callback);
if (*stats_callback) {
// Send the stats to the app
- stats_.updateQueuingDelay(queuingDelay_);
- stats_.updateLossRatio(lossRate_);
- stats_.updateAverageRtt(pathTable_[producerPathLabels_[1]]->getMinRtt());
- (*stats_callback)(*socket_, stats_);
+ stats_->updateQueuingDelay(queuingDelay_);
+ stats_->updateLossRatio(lossRate_);
+ stats_->updateAverageRtt(pathTable_[producerPathLabels_[1]]->getMinRtt());
+ (*stats_callback)(*socket_, *stats_);
}
// bound also by interest lifitime* production rate
@@ -301,9 +303,9 @@ void RTCTransportProtocol::updateStats(uint32_t round_duration) {
updateCCState();
updateWindow();
- if(queuingDelay_ > 25.0){
- //this indicates that the client will go soon out of synch,
- //switch to synch mode
+ if (queuingDelay_ > 25.0) {
+ // this indicates that the client will go soon out of synch,
+ // switch to synch mode
if (currentState_ == HICN_RTC_NORMAL_STATE) {
currentState_ = HICN_RTC_SYNC_STATE;
}
@@ -358,8 +360,7 @@ void RTCTransportProtocol::computeMaxWindow(uint32_t productionRate,
maxCWin_ = min(maxWaintingInterest, maxCWin_);
}
- if(maxCWin_ < HICN_MIN_CWIN)
- maxCWin_ = HICN_MIN_CWIN;
+ if (maxCWin_ < HICN_MIN_CWIN) maxCWin_ = HICN_MIN_CWIN;
}
void RTCTransportProtocol::updateWindow() {
@@ -518,68 +519,64 @@ void RTCTransportProtocol::scheduleNextInterests() {
}
}
-void RTCTransportProtocol::sentinelTimer(){
+void RTCTransportProtocol::sentinelTimer() {
uint32_t wait = 50;
- if(pathTable_.find(producerPathLabels_[0]) != pathTable_.end() &&
- pathTable_.find(producerPathLabels_[1]) != pathTable_.end()){
- //we have all the info to set the timers
+ if (pathTable_.find(producerPathLabels_[0]) != pathTable_.end() &&
+ pathTable_.find(producerPathLabels_[1]) != pathTable_.end()) {
+ // we have all the info to set the timers
wait = round(pathTable_[producerPathLabels_[0]]->getInterArrivalGap());
- if(wait == 0)
- wait = 1;
+ if (wait == 0) wait = 1;
}
sentinel_timer_->expires_from_now(std::chrono::milliseconds(wait));
sentinel_timer_->async_wait([this](std::error_code ec) {
-
if (ec) return;
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
-
- if(pathTable_.find(producerPathLabels_[0]) == pathTable_.end() ||
- pathTable_.find(producerPathLabels_[1]) == pathTable_.end()){
- //we have no info, so we send again
-
- for(auto it = packets_in_window_.begin();
- it != packets_in_window_.end(); it++){
- uint32_t pkt = it->first & modMask_;
- if (inflightInterests_[pkt].sequence == it->first) {
- inflightInterests_[pkt].transmissionTime = now;
- Name *interest_name = nullptr;
- socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
- &interest_name);
- interest_name->setSuffix(it->first);
- it->second++;
- sendInterest(interest_name, true);
- }
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+
+ if (pathTable_.find(producerPathLabels_[0]) == pathTable_.end() ||
+ pathTable_.find(producerPathLabels_[1]) == pathTable_.end()) {
+ // we have no info, so we send again
+
+ for (auto it = packets_in_window_.begin(); it != packets_in_window_.end();
+ it++) {
+ uint32_t pkt = it->first & modMask_;
+ if (inflightInterests_[pkt].sequence == it->first) {
+ inflightInterests_[pkt].transmissionTime = now;
+ Name *interest_name = nullptr;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
+ &interest_name);
+ interest_name->setSuffix(it->first);
+ it->second++;
+ sendInterest(interest_name, true);
}
- }else{
- uint64_t max_waiting_time = //wait at least 50ms
- (pathTable_[producerPathLabels_[1]]->getMinRtt() -
- pathTable_[producerPathLabels_[0]]->getMinRtt()) +
- (ceil(pathTable_[producerPathLabels_[0]]->getInterArrivalGap()) * 50);
+ }
+ } else {
+ uint64_t max_waiting_time = // wait at least 50ms
+ (pathTable_[producerPathLabels_[1]]->getMinRtt() -
+ pathTable_[producerPathLabels_[0]]->getMinRtt()) +
+ (ceil(pathTable_[producerPathLabels_[0]]->getInterArrivalGap()) * 50);
- if((currentState_ == HICN_RTC_NORMAL_STATE) &&
+ if ((currentState_ == HICN_RTC_NORMAL_STATE) &&
(inflightInterestsCount_ >= currentCWin_) &&
- ((now - lastEvent_) > max_waiting_time) &&
- (lossRate_ >= 0.05)){
+ ((now - lastEvent_) > max_waiting_time) && (lossRate_ >= 0.05)) {
+ uint64_t RTT = pathTable_[producerPathLabels_[1]]->getMinRtt();
- uint64_t RTT = pathTable_[producerPathLabels_[1]]->getMinRtt();
-
- for(auto it = packets_in_window_.begin();
- it != packets_in_window_.end(); it++){
+ for (auto it = packets_in_window_.begin();
+ it != packets_in_window_.end(); it++) {
uint32_t pkt = it->first & modMask_;
if (inflightInterests_[pkt].sequence == it->first &&
- ((now - inflightInterests_[pkt].transmissionTime) >= RTT)){
- inflightInterests_[pkt].transmissionTime = now;
- Name *interest_name = nullptr;
- socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
- &interest_name);
- interest_name->setSuffix(it->first);
- it->second++;
- sendInterest(interest_name, true);
+ ((now - inflightInterests_[pkt].transmissionTime) >= RTT)) {
+ inflightInterests_[pkt].transmissionTime = now;
+ Name *interest_name = nullptr;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
+ &interest_name);
+ interest_name->setSuffix(it->first);
+ it->second++;
+ sendInterest(interest_name, true);
}
}
}
@@ -754,8 +751,8 @@ void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) {
// and over until we get at least a packet
inflightInterestsCount_--;
lastEvent_ = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
packets_in_window_.erase(segmentNumber);
scheduleNextInterests();
return;
@@ -763,8 +760,8 @@ void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) {
if (inflightInterests_[pkt].state == sent_) {
lastEvent_ = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
packets_in_window_.erase(segmentNumber);
inflightInterestsCount_--;
}
@@ -890,30 +887,29 @@ void RTCTransportProtocol::onContentObject(
return;
}
- //check if the packet is a rtx
+ // check if the packet is a rtx
bool is_rtx = false;
- if(interestRetransmissions_.find(segmentNumber) !=
- interestRetransmissions_.end()){
+ if (interestRetransmissions_.find(segmentNumber) !=
+ interestRetransmissions_.end()) {
is_rtx = true;
- }else{
+ } else {
auto it_win = packets_in_window_.find(segmentNumber);
- if(it_win != packets_in_window_.end() &&
- it_win->second != 0)
- is_rtx = true;
+ if (it_win != packets_in_window_.end() && it_win->second != 0)
+ is_rtx = true;
}
if (payload_size == HICN_NACK_HEADER_SIZE) {
if (inflightInterests_[pkt].state == sent_) {
lastEvent_ = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
packets_in_window_.erase(segmentNumber);
inflightInterestsCount_--;
}
bool old_nack = false;
- if (!is_rtx){
+ if (!is_rtx) {
// this is not a retransmitted packet
old_nack = onNack(*content_object, false);
updateDelayStats(*content_object);
@@ -924,8 +920,8 @@ void RTCTransportProtocol::onContentObject(
// the nacked_ state is used only to avoid to decrease
// inflightInterestsCount_ multiple times. In fact, every time that we
// receive an event related to an interest (timeout, nacked, content) we
- // cange the state. In this way we are sure that we do not decrease twice the
- // counter
+ // cange the state. In this way we are sure that we do not decrease twice
+ // the counter
if (old_nack) {
inflightInterests_[pkt].state = lost_;
interestRetransmissions_.erase(segmentNumber);
@@ -942,13 +938,13 @@ void RTCTransportProtocol::onContentObject(
if (inflightInterests_[pkt].state == sent_) {
lastEvent_ = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
packets_in_window_.erase(segmentNumber);
inflightInterestsCount_--; // packet sent without timeouts
}
- if (inflightInterests_[pkt].state == sent_ && !is_rtx){
+ if (inflightInterests_[pkt].state == sent_ && !is_rtx) {
// delay stats are computed only for non retransmitted data
updateDelayStats(*content_object);
}
@@ -979,52 +975,6 @@ void RTCTransportProtocol::onContentObject(
scheduleNextInterests();
}
-void RTCTransportProtocol::returnContentToApplication(
- const ContentObject &content_object) {
- // return content to the user
- auto read_buffer = content_object.getPayload();
-
- read_buffer->trimStart(HICN_TIMESTAMP_SIZE);
-
- interface::ConsumerSocket::ReadCallback *read_callback = nullptr;
- socket_->getSocketOption(READ_CALLBACK, &read_callback);
-
- if (read_callback == nullptr) {
- throw errors::RuntimeException(
- "The read callback must be installed in the transport before starting "
- "the content retrieval.");
- }
-
- if (read_callback->isBufferMovable()) {
- read_callback->readBufferAvailable(
- utils::MemBuf::copyBuffer(read_buffer->data(), read_buffer->length()));
- } else {
- // The buffer will be copied into the application-provided buffer
- uint8_t *buffer;
- std::size_t length;
- std::size_t total_length = read_buffer->length();
-
- while (read_buffer->length()) {
- buffer = nullptr;
- length = 0;
- read_callback->getReadBuffer(&buffer, &length);
-
- if (!buffer || !length) {
- throw errors::RuntimeException(
- "Invalid buffer provided by the application.");
- }
-
- auto to_copy = std::min(read_buffer->length(), length);
-
- std::memcpy(buffer, read_buffer->data(), to_copy);
- read_buffer->trimStart(to_copy);
- }
-
- read_callback->readDataAvailable(total_length);
- read_buffer->clear();
- }
-}
-
} // end namespace protocol
} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/rtc.h b/libtransport/src/hicn/transport/protocols/rtc.h
index 2b9ed10a6..9e1731e96 100644
--- a/libtransport/src/hicn/transport/protocols/rtc.h
+++ b/libtransport/src/hicn/transport/protocols/rtc.h
@@ -15,12 +15,12 @@
#pragma once
-#include <queue>
#include <map>
+#include <queue>
#include <unordered_map>
+#include <hicn/transport/protocols/datagram_reassembly.h>
#include <hicn/transport/protocols/protocol.h>
-#include <hicn/transport/protocols/reassembly.h>
#include <hicn/transport/protocols/rtc_data_path.h>
// algorithm state
@@ -35,26 +35,27 @@
#define HICN_TIMESTAMP_SIZE 8 // bytes
#define HICN_RTC_INTEREST_LIFETIME 1000 // ms
-//rtt measurement
-//normal interests for data goes from 0 to
-//HICN_MIN_PROBE_SEQ, the rest is reserverd for
-//probes
+// rtt measurement
+// normal interests for data goes from 0 to
+// HICN_MIN_PROBE_SEQ, the rest is reserverd for
+// probes
#define HICN_MIN_PROBE_SEQ 0xefffffff
#define HICN_MAX_PROBE_SEQ 0xffffffff
// controller constant
-#define HICN_ROUND_LEN 200 // ms interval of time on which
- // we take decisions / measurements
+#define HICN_ROUND_LEN \
+ 200 // ms interval of time on which
+ // we take decisions / measurements
#define HICN_MAX_RTX 10
#define HICN_MAX_RTX_SIZE 1024
#define HICN_MAX_RTX_MAX_AGE 10000
-#define HICN_MIN_RTT_WIN 30 // rounds
-#define HICN_MIN_INTER_ARRIVAL_GAP 100 //ms
+#define HICN_MIN_RTT_WIN 30 // rounds
+#define HICN_MIN_INTER_ARRIVAL_GAP 100 // ms
// cwin
#define HICN_INITIAL_CWIN 1 // packets
#define HICN_INITIAL_CWIN_MAX 100000 // packets
-#define HICN_MIN_CWIN 10 // packets
+#define HICN_MIN_CWIN 10 // packets
#define HICN_WIN_INCREASE_FACTOR 1.5
#define HICN_WIN_DECREASE_FACTOR 0.9
@@ -70,30 +71,23 @@
#define HICN_MICRO_IN_A_SEC 1000000
#define HICN_MILLI_IN_A_SEC 1000
-
namespace transport {
namespace protocol {
-enum packetState {
- sent_,
- nacked_,
- received_,
- timeout1_,
- timeout2_,
- lost_
-};
+enum packetState { sent_, nacked_, received_, timeout1_, timeout2_, lost_ };
typedef enum packetState packetState_t;
struct sentInterest {
uint64_t transmissionTime;
- uint32_t sequence; //sequence number of the interest sent
- //to handle seq % buffer_size
- packetState_t state; //see packet state
+ uint32_t sequence; // sequence number of the interest sent
+ // to handle seq % buffer_size
+ packetState_t state; // see packet state
};
-class RTCTransportProtocol : public TransportProtocol, public Reassembly {
+class RTCTransportProtocol : public TransportProtocol,
+ public DatagramReassembly {
public:
RTCTransportProtocol(interface::ConsumerSocket *icnet_socket);
@@ -133,11 +127,16 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly {
bool onNack(const ContentObject &content_object, bool rtx);
void onContentObject(Interest::Ptr &&interest,
ContentObject::Ptr &&content_object) override;
- void returnContentToApplication(const ContentObject &content_object);
+ void onPacketDropped(Interest::Ptr &&interest,
+ ContentObject::Ptr &&content_object) override {}
+ void onReassemblyFailed(std::uint32_t missing_segment) override {}
TRANSPORT_ALWAYS_INLINE virtual void reassemble(
ContentObject::Ptr &&content_object) override {
- returnContentToApplication(*content_object);
+ auto read_buffer = content_object->getPayload();
+ read_buffer->trimStart(HICN_TIMESTAMP_SIZE);
+ Reassembly::read_buffer_ = std::move(read_buffer);
+ Reassembly::notifyApplication();
}
// controller var
@@ -151,36 +150,36 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly {
// names/packets var
uint32_t actualSegment_;
uint32_t inflightInterestsCount_;
- //map seq to rtx
+ // map seq to rtx
std::map<uint32_t, uint8_t> interestRetransmissions_;
bool rtx_timer_used_;
std::unique_ptr<asio::steady_timer> rtx_timer_;
std::vector<sentInterest> inflightInterests_;
- uint32_t lastSegNacked_; //indicates the segment id in the last received
- // past Nack. we do not ask for retransmissions
- //for samething that is older than this value.
- uint32_t lastReceived_; //segment of the last content object received
- //indicates the base of the window on the client
- uint64_t lastReceivedTime_; //time at which we recevied the
- //lastReceived_ packet
-
- //sentinel
- //if all packets in the window get lost we need something that
- //wakes up our consumer socket. Interest timeouts set to 1 sec
- //expire too late. This timers expire much sooner and if it
- //detects that all the interest in the window may be lost
- //it sends all of them again
+ uint32_t lastSegNacked_; // indicates the segment id in the last received
+ // past Nack. we do not ask for retransmissions
+ // for samething that is older than this value.
+ uint32_t lastReceived_; // segment of the last content object received
+ // indicates the base of the window on the client
+ uint64_t lastReceivedTime_; // time at which we recevied the
+ // lastReceived_ packet
+
+ // sentinel
+ // if all packets in the window get lost we need something that
+ // wakes up our consumer socket. Interest timeouts set to 1 sec
+ // expire too late. This timers expire much sooner and if it
+ // detects that all the interest in the window may be lost
+ // it sends all of them again
std::unique_ptr<asio::steady_timer> sentinel_timer_;
- uint64_t lastEvent_; //time at which we removed a pending
- //interest from the window
+ uint64_t lastEvent_; // time at which we removed a pending
+ // interest from the window
std::unordered_map<uint32_t, uint8_t> packets_in_window_;
- //rtt probes
- //the RTC transport tends to overestimate the RTT
- //du to the production time on the server side
- //once per second we send an interest for wich we know
- //we will get a nack. This nack will keep our estimation
- //close to the reality
+ // rtt probes
+ // the RTC transport tends to overestimate the RTT
+ // du to the production time on the server side
+ // once per second we send an interest for wich we know
+ // we will get a nack. This nack will keep our estimation
+ // close to the reality
std::unique_ptr<asio::steady_timer> probe_timer_;
uint64_t time_sent_probe_;
uint32_t probe_seq_number_;
@@ -203,10 +202,10 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly {
uint32_t rounds_;
uint32_t roundsWithoutNacks_;
- //we keep track of up two paths (if only one path is in use
- //the two values in the vector will be the same)
- //position 0 stores the path with minRTT
- //position 1 stores the path with maxRTT
+ // we keep track of up two paths (if only one path is in use
+ // the two values in the vector will be the same)
+ // position 0 stores the path with minRTT
+ // position 1 stores the path with maxRTT
uint32_t producerPathLabels_[2];
std::unordered_map<uint32_t, std::shared_ptr<RTCDataPath>> pathTable_;
@@ -219,7 +218,6 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly {
unsigned protocolState_;
bool initied;
- TransportStatistics stats_;
};
} // namespace protocol
diff --git a/libtransport/src/hicn/transport/protocols/verification_manager.cc b/libtransport/src/hicn/transport/protocols/verification_manager.cc
new file mode 100644
index 000000000..f45cab743
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/verification_manager.cc
@@ -0,0 +1,71 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/protocols/verification_manager.h>
+
+#include <hicn/transport/interfaces/socket_consumer.h>
+
+namespace transport {
+
+namespace protocol {
+
+interface::VerificationPolicy SignatureVerificationManager::onPacketToVerify(
+ const Packet& packet) {
+ using namespace interface;
+
+ bool verify_signature;
+ VerificationPolicy ret = VerificationPolicy::DROP_PACKET;
+
+ ConsumerContentObjectVerificationFailedCallback*
+ verification_failed_callback = VOID_HANDLER;
+ icn_socket_->getSocketOption(GeneralTransportOptions::VERIFY_SIGNATURE,
+ verify_signature);
+
+ if (!verify_signature) {
+ return VerificationPolicy::ACCEPT_PACKET;
+ }
+
+ icn_socket_->getSocketOption(ConsumerCallbacksOptions::VERIFICATION_FAILED,
+ &verification_failed_callback);
+ if (!verification_failed_callback) {
+ throw errors::RuntimeException(
+ "No verification failed callback provided by application. "
+ "Aborting.");
+ }
+
+ std::shared_ptr<utils::Verifier> verifier;
+ icn_socket_->getSocketOption(GeneralTransportOptions::VERIFIER, verifier);
+
+ if (TRANSPORT_EXPECT_FALSE(!verifier)) {
+ ret = (*verification_failed_callback)(
+ *icn_socket_, dynamic_cast<const ContentObject&>(packet),
+ make_error_code(protocol_error::no_verifier_provided));
+ return ret;
+ }
+
+ if (!verifier->verify(packet)) {
+ ret = (*verification_failed_callback)(
+ *icn_socket_, dynamic_cast<const ContentObject&>(packet),
+ make_error_code(protocol_error::signature_verification_failed));
+ } else {
+ ret = VerificationPolicy::ACCEPT_PACKET;
+ }
+
+ return ret;
+}
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/verification_manager.h b/libtransport/src/hicn/transport/protocols/verification_manager.h
index da67e86f8..6e5d32127 100644
--- a/libtransport/src/hicn/transport/protocols/verification_manager.h
+++ b/libtransport/src/hicn/transport/protocols/verification_manager.h
@@ -15,56 +15,37 @@
#pragma once
-#include <hicn/transport/interfaces/socket_consumer.h>
-
-#include <deque>
+#include <hicn/transport/interfaces/callbacks.h>
+#include <hicn/transport/interfaces/verification_policy.h>
+#include <hicn/transport/protocols/errors.h>
namespace transport {
+namespace interface {
+class ConsumerSocket;
+}
+
namespace protocol {
+using Packet = core::Packet;
+using interface::ConsumerSocket;
+using interface::VerificationPolicy;
+
class VerificationManager {
public:
virtual ~VerificationManager() = default;
- virtual bool onPacketToVerify(const Packet& packet) = 0;
+ virtual VerificationPolicy onPacketToVerify(const Packet& packet) = 0;
};
class SignatureVerificationManager : public VerificationManager {
public:
- SignatureVerificationManager(interface::ConsumerSocket* icn_socket)
+ SignatureVerificationManager(ConsumerSocket* icn_socket)
: icn_socket_(icn_socket) {}
- TRANSPORT_ALWAYS_INLINE bool onPacketToVerify(const Packet& packet) override {
- using namespace interface;
-
- bool verify_signature, ret = false;
- icn_socket_->getSocketOption(GeneralTransportOptions::VERIFY_SIGNATURE,
- verify_signature);
-
- if (!verify_signature) {
- return true;
- }
-
- std::shared_ptr<utils::Verifier> verifier;
- icn_socket_->getSocketOption(GeneralTransportOptions::VERIFIER, verifier);
-
- if (TRANSPORT_EXPECT_FALSE(!verifier)) {
- throw errors::RuntimeException(
- "No certificate provided by the application.");
- }
-
- ret = verifier->verify(packet);
-
- if (!ret) {
- throw errors::RuntimeException(
- "Verification failure policy has to be implemented.");
- }
-
- return ret;
- }
+ interface::VerificationPolicy onPacketToVerify(const Packet& packet) override;
private:
- interface::ConsumerSocket* icn_socket_;
+ ConsumerSocket* icn_socket_;
};
} // end namespace protocol
diff --git a/libtransport/src/hicn/transport/utils/CMakeLists.txt b/libtransport/src/hicn/transport/utils/CMakeLists.txt
index cbbca86ed..5a7dbe9cc 100644
--- a/libtransport/src/hicn/transport/utils/CMakeLists.txt
+++ b/libtransport/src/hicn/transport/utils/CMakeLists.txt
@@ -19,7 +19,6 @@ list(APPEND SOURCE_FILES
${CMAKE_CURRENT_SOURCE_DIR}/signer.cc
${CMAKE_CURRENT_SOURCE_DIR}/verifier.cc
${CMAKE_CURRENT_SOURCE_DIR}/identity.cc
- ${CMAKE_CURRENT_SOURCE_DIR}/suffix_strategy.cc
${CMAKE_CURRENT_SOURCE_DIR}/log.cc
${CMAKE_CURRENT_SOURCE_DIR}/membuf.cc
${CMAKE_CURRENT_SOURCE_DIR}/content_store.cc
diff --git a/libtransport/src/hicn/transport/utils/suffix_strategy.cc b/libtransport/src/hicn/transport/utils/suffix_strategy.cc
deleted file mode 100644
index f3bcc4562..000000000
--- a/libtransport/src/hicn/transport/utils/suffix_strategy.cc
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <hicn/transport/utils/suffix_strategy.h>
-
-using transport::core::NextSegmentCalculationStrategy;
-
-namespace utils {
-std::uint32_t SuffixManifest::getNextSuffix() {
- uint32_t next_suffix;
-
- switch (suffix_stragegy_) {
- case NextSegmentCalculationStrategy::INCREMENTAL:
- if (!nb_segments_) {
- throw errors::RuntimeException(
- "The number of segments in a manifest must be set "
- "before assigning incremental suffixes.");
- }
- /* The current manifest's suffix + the number of segments in a */
- /* manifest give the suffix of the last segment in the manifest. */
- /* The next manifest's suffix is therefore that number plus one. */
- next_suffix = suffix_ + nb_segments_ + 1;
- break;
-
- default:
- throw errors::RuntimeException("Unknown suffix strategy.");
- }
-
- return next_suffix;
-}
-
-std::uint32_t SuffixContent::getNextSuffix() {
- uint32_t next_suffix;
-
- switch (suffix_stragegy_) {
- case NextSegmentCalculationStrategy::INCREMENTAL:
- next_suffix = suffix_ + 1;
- if (making_manifest_) {
- if (!nb_segments_) {
- throw errors::RuntimeException(
- "The number of segments in a manifest must be set "
- "before assigning incremental suffixes.");
- }
-
- content_counter_++;
- /* If the counter have reached the manifest's capacity,
- * it means that the next suffix will be a manifest, so we skip it. */
- if (content_counter_ % nb_segments_ == 0) {
- next_suffix++;
- content_counter_ = 0;
- }
- }
- break;
-
- default:
- throw errors::RuntimeException("Unknown suffix strategy.");
- }
-
- return next_suffix;
-}
-} // namespace utils
diff --git a/libtransport/src/hicn/transport/utils/suffix_strategy.h b/libtransport/src/hicn/transport/utils/suffix_strategy.h
index 3014855f6..0ed3c5b0e 100644
--- a/libtransport/src/hicn/transport/utils/suffix_strategy.h
+++ b/libtransport/src/hicn/transport/utils/suffix_strategy.h
@@ -18,111 +18,148 @@
#include <hicn/transport/core/manifest_format.h>
namespace utils {
+
+using transport::core::NextSegmentCalculationStrategy;
+
class SuffixStrategy {
public:
- SuffixStrategy(
- transport::core::NextSegmentCalculationStrategy suffix_stragegy,
- std::uint32_t start_offset)
- : suffix_stragegy_(suffix_stragegy),
- suffix_(start_offset),
- nb_segments_(0) {}
-
- transport::core::NextSegmentCalculationStrategy getSuffixStrategy() {
- return suffix_stragegy_;
- }
+ static constexpr uint32_t INVALID_SUFFIX =
+ std::numeric_limits<uint32_t>::max();
+
+ SuffixStrategy(NextSegmentCalculationStrategy strategy)
+ : suffix_stragegy_(strategy),
+ total_count_(0),
+ final_suffix_(INVALID_SUFFIX) {}
+
+ virtual ~SuffixStrategy() = default;
+
+ virtual uint32_t getNextSuffix() = 0;
+
+ virtual uint32_t getFinalSuffix() { return final_suffix_; }
- void setSuffixStrategy(
- transport::core::NextSegmentCalculationStrategy strategy) {
- suffix_stragegy_ = strategy;
+ virtual void setFinalSuffix(std::uint32_t final_suffix) {
+ if (final_suffix != INVALID_SUFFIX) {
+ final_suffix_ = final_suffix;
+ }
}
- std::uint32_t getSuffix() { return suffix_; }
+ virtual uint32_t getNextManifestSuffix() = 0;
- void updateSuffix(std::uint32_t new_suffix) { suffix_ = new_suffix; }
+ virtual uint32_t getNextContentSuffix() = 0;
- std::size_t getNbSegments() { return nb_segments_; }
+ virtual void reset(uint32_t offset = 0) = 0;
- void setNbSegments(std::size_t nb_segments) { nb_segments_ = nb_segments; }
+ virtual uint32_t getManifestCapacity() = 0;
- void reset(std::uint32_t reset_suffix) {
- suffix_ = reset_suffix;
- nb_segments_ = 0;
+ virtual void setManifestCapacity(uint32_t capacity) = 0;
+
+ virtual uint32_t getTotalCount() { return total_count_; };
+
+ NextSegmentCalculationStrategy getSuffixStrategy() {
+ return suffix_stragegy_;
}
- ~SuffixStrategy() {}
+ protected:
+ inline void incrementTotalCount() { total_count_++; };
protected:
- transport::core::NextSegmentCalculationStrategy suffix_stragegy_;
- std::uint32_t suffix_;
- std::size_t nb_segments_;
- virtual std::uint32_t getNextSuffix() = 0;
+ NextSegmentCalculationStrategy suffix_stragegy_;
+ std::uint32_t total_count_;
+ std::uint32_t final_suffix_;
};
-class SuffixManifest : public SuffixStrategy {
+class IncrementalSuffixStrategy : public SuffixStrategy {
public:
- SuffixManifest(
- transport::core::NextSegmentCalculationStrategy suffix_stragegy,
- std::uint32_t start_offset)
- : SuffixStrategy(suffix_stragegy, start_offset) {}
-
- SuffixManifest operator++() {
- updateSuffix(getNextSuffix());
- SuffixManifest temp_suffix(suffix_stragegy_, suffix_);
- temp_suffix.setNbSegments(getNbSegments());
- return temp_suffix;
+ IncrementalSuffixStrategy(std::uint32_t start_offset)
+ : SuffixStrategy(NextSegmentCalculationStrategy::INCREMENTAL),
+ next_suffix_(start_offset) {}
+
+ TRANSPORT_ALWAYS_INLINE std::uint32_t getNextSuffix() override {
+ incrementTotalCount();
+ return next_suffix_++;
+ }
+
+ TRANSPORT_ALWAYS_INLINE std::uint32_t getNextContentSuffix() override {
+ return getNextSuffix();
}
- SuffixManifest operator++(int) {
- SuffixManifest temp_suffix(suffix_stragegy_, suffix_);
- temp_suffix.setNbSegments(getNbSegments());
- updateSuffix(getNextSuffix());
- return temp_suffix;
+ TRANSPORT_ALWAYS_INLINE std::uint32_t getNextManifestSuffix() override {
+ return getNextSuffix();
}
+ uint32_t getManifestCapacity() override {
+ throw errors::RuntimeException(
+ "No manifest capacity in IncrementalSuffixStrategy.");
+ }
+
+ void setManifestCapacity(uint32_t capacity) override {
+ throw errors::RuntimeException(
+ "No manifest capacity in IncrementalSuffixStrategy.");
+ }
+
+ void reset(std::uint32_t offset = 0) override { next_suffix_ = offset; }
+
protected:
- std::uint32_t getNextSuffix();
+ std::uint32_t next_suffix_;
};
-class SuffixContent : public SuffixStrategy {
+class CapacityBasedSuffixStrategy : public SuffixStrategy {
public:
- SuffixContent(transport::core::NextSegmentCalculationStrategy suffix_stragegy,
- std::uint32_t start_offset, bool making_manifest)
- : SuffixStrategy(suffix_stragegy, start_offset),
- making_manifest_(making_manifest),
- content_counter_(0) {}
-
- SuffixContent(transport::core::NextSegmentCalculationStrategy suffix_stragegy,
- std::uint32_t start_offset)
- : SuffixContent(suffix_stragegy, start_offset, false) {}
-
- SuffixContent operator++() {
- updateSuffix(getNextSuffix());
- SuffixContent temp_suffix(suffix_stragegy_, suffix_, making_manifest_);
- temp_suffix.setNbSegments(getNbSegments());
- temp_suffix.content_counter_ = content_counter_;
- return temp_suffix;
+ CapacityBasedSuffixStrategy(std::uint32_t start_offset,
+ std::uint32_t manifest_capacity)
+ : SuffixStrategy(NextSegmentCalculationStrategy::INCREMENTAL),
+ next_suffix_(start_offset),
+ segments_in_manifest_(manifest_capacity),
+ current_manifest_iteration_(0) {}
+
+ TRANSPORT_ALWAYS_INLINE std::uint32_t getNextSuffix() override {
+ incrementTotalCount();
+ return next_suffix_++;
+ }
+
+ TRANSPORT_ALWAYS_INLINE std::uint32_t getNextContentSuffix() override {
+ incrementTotalCount();
+ return next_suffix_ % segments_in_manifest_ == 0 ? next_suffix_++
+ : ++next_suffix_;
}
- SuffixContent operator++(int) {
- SuffixContent temp_suffix(suffix_stragegy_, suffix_, making_manifest_);
- temp_suffix.setNbSegments(getNbSegments());
- temp_suffix.content_counter_ = content_counter_;
- updateSuffix(getNextSuffix());
- return temp_suffix;
+ TRANSPORT_ALWAYS_INLINE std::uint32_t getNextManifestSuffix() override {
+ incrementTotalCount();
+ return (current_manifest_iteration_++) * (segments_in_manifest_ + 1);
}
- void setUsingManifest(bool value) { making_manifest_ = value; }
+ TRANSPORT_ALWAYS_INLINE uint32_t getManifestCapacity() override {
+ return segments_in_manifest_;
+ }
- void reset(std::uint32_t reset_suffix) {
- SuffixStrategy::reset(reset_suffix);
- content_counter_ = 0;
+ TRANSPORT_ALWAYS_INLINE void setManifestCapacity(uint32_t capacity) override {
+ segments_in_manifest_ = capacity;
}
+ void reset(std::uint32_t offset = 0) override { next_suffix_ = offset; }
+
protected:
- bool making_manifest_;
- /* content_counter_ keeps track of the number of segments */
- /* between two manifests */
- uint32_t content_counter_;
- std::uint32_t getNextSuffix();
+ std::uint32_t next_suffix_;
+ std::uint32_t segments_in_manifest_;
+ std::uint32_t current_manifest_iteration_;
};
+
+class SuffixStrategyFactory {
+ public:
+ static std::unique_ptr<SuffixStrategy> getSuffixStrategy(
+ NextSegmentCalculationStrategy strategy, uint32_t start_offset,
+ uint32_t manifest_capacity = 0) {
+ switch (strategy) {
+ case NextSegmentCalculationStrategy::INCREMENTAL:
+ return std::make_unique<IncrementalSuffixStrategy>(start_offset);
+ case NextSegmentCalculationStrategy::MANIFEST_CAPACITY_BASED:
+ return std::make_unique<CapacityBasedSuffixStrategy>(start_offset,
+ manifest_capacity);
+ default:
+ throw errors::RuntimeException(
+ "No valid NextSegmentCalculationStrategy specified.");
+ }
+ }
+};
+
} // namespace utils