aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/hicn/transport/interfaces
diff options
context:
space:
mode:
authorMauro Sardara <msardara@cisco.com>2020-02-07 20:00:06 +0100
committerMauro Sardara <msardara@cisco.com>2020-02-12 18:40:52 +0100
commit3bce9bfdce707313de4f9cccdc867abd9edf82df (patch)
treebd7d75a7251888a3fc269fadebd59842c46a14a1 /libtransport/src/hicn/transport/interfaces
parentf9243a2bf823086404be1c41c7bcc1b27cfab7de (diff)
[HICN-508] [HICN-509] [HICN-506] Manifest rework
Change-Id: I992205148910be008d66b5acb7f6f1365770f9e8 Signed-off-by: Mauro Sardara <msardara@cisco.com>
Diffstat (limited to 'libtransport/src/hicn/transport/interfaces')
-rw-r--r--libtransport/src/hicn/transport/interfaces/CMakeLists.txt1
-rw-r--r--libtransport/src/hicn/transport/interfaces/callbacks.h15
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_consumer.cc60
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_consumer.h16
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_options_keys.h11
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_producer.cc75
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_producer.h3
-rw-r--r--libtransport/src/hicn/transport/interfaces/verification_policy.h33
8 files changed, 164 insertions, 50 deletions
diff --git a/libtransport/src/hicn/transport/interfaces/CMakeLists.txt b/libtransport/src/hicn/transport/interfaces/CMakeLists.txt
index 0c2c73623..1f3c29b1f 100644
--- a/libtransport/src/hicn/transport/interfaces/CMakeLists.txt
+++ b/libtransport/src/hicn/transport/interfaces/CMakeLists.txt
@@ -22,6 +22,7 @@ list(APPEND HEADER_FILES
${CMAKE_CURRENT_SOURCE_DIR}/socket_options_default_values.h
${CMAKE_CURRENT_SOURCE_DIR}/socket_options_keys.h
${CMAKE_CURRENT_SOURCE_DIR}/callbacks.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/verification_policy.h
)
list(APPEND SOURCE_FILES
diff --git a/libtransport/src/hicn/transport/interfaces/callbacks.h b/libtransport/src/hicn/transport/interfaces/callbacks.h
index 7194cca42..6de48d14b 100644
--- a/libtransport/src/hicn/transport/interfaces/callbacks.h
+++ b/libtransport/src/hicn/transport/interfaces/callbacks.h
@@ -15,11 +15,12 @@
#pragma once
+#include <hicn/transport/core/facade.h>
+#include <hicn/transport/interfaces/verification_policy.h>
+
#include <functional>
#include <system_error>
-#include <hicn/transport/core/facade.h>
-
namespace utils {
class MemBuf;
}
@@ -85,6 +86,16 @@ using ConsumerContentObjectVerificationCallback =
std::function<bool(ConsumerSocket &, const core::ContentObject &)>;
/**
+ * The ConsumerContentObjectVerificationFailedCallback will be caled by the
+ * transport if a data packet (either manifest or content object) cannot be
+ * verified. The application here decides what to do by returning a
+ * VerificationFailedPolicy object.
+ */
+using ConsumerContentObjectVerificationFailedCallback =
+ std::function<VerificationPolicy(
+ ConsumerSocket &, const core::ContentObject &, std::error_code ec)>;
+
+/**
* The ConsumerManifestCallback will be called by the consumer socket when a
* manifest is received.
*/
diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc
index fbe4bed1a..fba972fe5 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc
+++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc
@@ -417,6 +417,28 @@ int ConsumerSocket::setSocketOption(
});
}
+int ConsumerSocket::setSocketOption(
+ int socket_option_key,
+ ConsumerContentObjectVerificationFailedCallback socket_option_value) {
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](
+ int socket_option_key,
+ ConsumerContentObjectVerificationFailedCallback socket_option_value)
+ -> int {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::VERIFICATION_FAILED:
+ verification_failed_callback_ = socket_option_value;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ });
+}
+
int ConsumerSocket::setSocketOption(int socket_option_key,
IcnObserver *socket_option_value) {
utils::SpinLock::Acquire locked(guard_raaqm_params_);
@@ -712,6 +734,29 @@ int ConsumerSocket::getSocketOption(
}
int ConsumerSocket::getSocketOption(
+ int socket_option_key,
+ ConsumerContentObjectVerificationFailedCallback **socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in case
+ // setSocketOption is called while the io_service is running.
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](
+ int socket_option_key,
+ ConsumerContentObjectVerificationFailedCallback **socket_option_value)
+ -> int {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::VERIFICATION_FAILED:
+ *socket_option_value = &verification_failed_callback_;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ });
+}
+
+int ConsumerSocket::getSocketOption(
int socket_option_key, std::shared_ptr<Portal> &socket_option_value) {
switch (socket_option_key) {
case PORTAL:
@@ -767,6 +812,19 @@ int ConsumerSocket::getSocketOption(int socket_option_key,
return SOCKET_OPTION_GET;
}
+int ConsumerSocket::getSocketOption(int socket_option_key,
+ TransportStatistics **socket_option_value) {
+ switch (socket_option_key) {
+ case OtherOptions::STATISTICS:
+ *socket_option_value = &stats_;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+}
+
int ConsumerSocket::getSocketOption(
int socket_option_key, ConsumerTimerCallback **socket_option_value) {
// Reschedule the function on the io_service to avoid race condition in case
@@ -789,4 +847,4 @@ int ConsumerSocket::getSocketOption(
} // namespace interface
-} // end namespace transport
+} // namespace transport
diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.h b/libtransport/src/hicn/transport/interfaces/socket_consumer.h
index 0f83fd38f..acce28c1d 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_consumer.h
+++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.h
@@ -18,6 +18,7 @@
#include <hicn/transport/interfaces/socket.h>
#include <hicn/transport/interfaces/socket_options_default_values.h>
#include <hicn/transport/protocols/protocol.h>
+#include <hicn/transport/protocols/statistics.h>
#include <hicn/transport/utils/event_thread.h>
#include <hicn/transport/utils/verifier.h>
@@ -224,6 +225,10 @@ class ConsumerSocket : public BaseSocket {
virtual int setSocketOption(
int socket_option_key,
+ ConsumerContentObjectVerificationFailedCallback socket_option_value);
+
+ virtual int setSocketOption(
+ int socket_option_key,
ConsumerContentObjectVerificationCallback socket_option_value);
virtual int setSocketOption(int socket_option_key,
@@ -262,6 +267,10 @@ class ConsumerSocket : public BaseSocket {
virtual int getSocketOption(
int socket_option_key,
+ ConsumerContentObjectVerificationFailedCallback **socket_option_value);
+
+ virtual int getSocketOption(
+ int socket_option_key,
ConsumerContentObjectVerificationCallback **socket_option_value);
virtual int getSocketOption(int socket_option_key,
@@ -286,6 +295,9 @@ class ConsumerSocket : public BaseSocket {
virtual int getSocketOption(int socket_option_key,
ConsumerTimerCallback **socket_option_value);
+ virtual int getSocketOption(int socket_option_key,
+ TransportStatistics **socket_option_value);
+
protected:
// If the thread calling lambda_func is not the same of io_service, this
// function reschedule the function on it
@@ -364,6 +376,7 @@ class ConsumerSocket : public BaseSocket {
ConsumerContentObjectCallback on_content_object_;
ConsumerManifestCallback on_manifest_;
ConsumerTimerCallback stats_summary_;
+ ConsumerContentObjectVerificationFailedCallback verification_failed_callback_;
ReadCallback *read_callback_;
@@ -375,6 +388,9 @@ class ConsumerSocket : public BaseSocket {
// Transport protocol
std::unique_ptr<TransportProtocol> transport_protocol_;
+ // Statistic
+ TransportStatistics stats_;
+
utils::SpinLock guard_raaqm_params_;
};
diff --git a/libtransport/src/hicn/transport/interfaces/socket_options_keys.h b/libtransport/src/hicn/transport/interfaces/socket_options_keys.h
index e14f0f412..b25bacbb9 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_options_keys.h
+++ b/libtransport/src/hicn/transport/interfaces/socket_options_keys.h
@@ -77,8 +77,9 @@ typedef enum {
CONTENT_OBJECT_INPUT = 411,
MANIFEST_INPUT = 412,
CONTENT_OBJECT_TO_VERIFY = 413,
- READ_CALLBACK = 414,
- STATS_SUMMARY = 415
+ VERIFICATION_FAILED = 414,
+ READ_CALLBACK = 415,
+ STATS_SUMMARY = 416
} ConsumerCallbacksOptions;
typedef enum {
@@ -96,7 +97,11 @@ typedef enum {
typedef enum { OUTPUT_INTERFACE = 601 } DataLinkOptions;
-typedef enum { VIRTUAL_DOWNLOAD = 701, USE_CFG_FILE = 702 } OtherOptions;
+typedef enum {
+ VIRTUAL_DOWNLOAD = 701,
+ USE_CFG_FILE = 702,
+ STATISTICS
+} OtherOptions;
typedef enum {
SHA_256 = 801,
diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.cc b/libtransport/src/hicn/transport/interfaces/socket_producer.cc
index 6782000ac..4fef5d1e2 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_producer.cc
+++ b/libtransport/src/hicn/transport/interfaces/socket_producer.cc
@@ -38,8 +38,7 @@ ProducerSocket::ProducerSocket(asio::io_service &io_service)
registration_status_(REGISTRATION_NOT_ATTEMPTED),
making_manifest_(false),
hash_algorithm_(HashAlgorithm::SHA_256),
- suffix_manifest_(core::NextSegmentCalculationStrategy::INCREMENTAL, 0),
- suffix_content_(core::NextSegmentCalculationStrategy::INCREMENTAL, 0),
+ suffix_strategy_(core::NextSegmentCalculationStrategy::INCREMENTAL),
on_interest_input_(VOID_HANDLER),
on_interest_dropped_input_buffer_(VOID_HANDLER),
on_interest_inserted_input_buffer_(VOID_HANDLER),
@@ -159,8 +158,8 @@ uint32_t ProducerSocket::produce(Name content_name,
uint32_t content_object_expiry_time = content_object_expiry_time_;
HashAlgorithm hash_algo = hash_algorithm_;
bool making_manifest = making_manifest_;
- utils::SuffixContent suffix_content = suffix_content_;
- utils::SuffixManifest suffix_manifest = suffix_manifest_;
+ auto suffix_strategy = utils::SuffixStrategyFactory::getSuffixStrategy(
+ suffix_strategy_, start_offset);
std::shared_ptr<utils::Identity> identity;
getSocketOption(GeneralTransportOptions::IDENTITY, identity);
@@ -169,19 +168,16 @@ uint32_t ProducerSocket::produce(Name content_name,
std::size_t header_size;
std::size_t manifest_header_size = 0;
std::size_t signature_length = 0;
- std::uint32_t final_block_number = 0;
+ std::uint32_t final_block_number = start_offset;
uint64_t free_space_for_content = 0;
core::Packet::Format format;
std::shared_ptr<ContentObjectManifest> manifest;
bool is_last_manifest = false;
- suffix_content.updateSuffix(start_offset);
- suffix_content.setUsingManifest(making_manifest);
// TODO Manifest may still be used for indexing
if (making_manifest && !identity) {
- throw errors::RuntimeException(
- "Making manifests without setting producer identity. Aborting.");
+ TRANSPORT_LOGD("Making manifests without setting producer identity.");
}
core::Packet::Format hf_format = core::Packet::Format::HF_UNSPEC;
@@ -200,9 +196,9 @@ uint32_t ProducerSocket::produce(Name content_name,
format = hf_format;
if (making_manifest) {
- format = hf_format;
manifest_header_size = core::Packet::getHeaderSizeFromFormat(
- hf_format_ah, identity->getSignatureLength());
+ identity ? hf_format_ah : hf_format,
+ identity ? identity->getSignatureLength() : 0);
} else if (identity) {
format = hf_format_ah;
signature_length = identity->getSignatureLength();
@@ -225,28 +221,20 @@ uint32_t ProducerSocket::produce(Name content_name,
1.0);
uint32_t number_of_manifests = static_cast<uint32_t>(
std::ceil(float(number_of_segments) / segment_in_manifest));
- final_block_number = number_of_segments + number_of_manifests - 1;
-
- suffix_manifest.updateSuffix(start_offset);
- suffix_manifest.setNbSegments(segment_in_manifest);
- suffix_content.updateSuffix(start_offset + 1);
- suffix_content.setNbSegments(segment_in_manifest);
+ final_block_number += number_of_segments + number_of_manifests - 1;
manifest.reset(ContentObjectManifest::createManifest(
- content_name.setSuffix(suffix_manifest.getSuffix()),
+ content_name.setSuffix(suffix_strategy->getNextManifestSuffix()),
core::ManifestVersion::VERSION_1, core::ManifestType::INLINE_MANIFEST,
- hash_algo, is_last_manifest, content_name,
- core::NextSegmentCalculationStrategy::INCREMENTAL,
- identity->getSignatureLength()));
+ hash_algo, is_last_manifest, content_name, suffix_strategy_,
+ identity ? identity->getSignatureLength() : 0));
manifest->setLifetime(content_object_expiry_time);
- suffix_manifest++;
if (is_last) {
manifest->setFinalBlockNumber(final_block_number);
} else {
- manifest->setFinalBlockNumber(std::numeric_limits<uint32_t>::max());
+ manifest->setFinalBlockNumber(utils::SuffixStrategy::INVALID_SUFFIX);
}
-
}
for (unsigned int packaged_segments = 0;
@@ -256,7 +244,12 @@ uint32_t ProducerSocket::produce(Name content_name,
data_packet_size - manifest_header_size) {
// Send the current manifest
manifest->encode();
- identity->getSigner().sign(*manifest);
+
+ // If identity set, sign manifest
+ if (identity) {
+ identity->getSigner().sign(*manifest);
+ }
+
passContentObjectToCallbacks(manifest);
// Send content objects stored in the queue
@@ -269,25 +262,22 @@ uint32_t ProducerSocket::produce(Name content_name,
// acquired in the passContentObjectToCallbacks function, so we can
// safely release this reference
manifest.reset(ContentObjectManifest::createManifest(
- content_name.setSuffix(suffix_manifest.getSuffix()),
+ content_name.setSuffix(suffix_strategy->getNextManifestSuffix()),
core::ManifestVersion::VERSION_1,
core::ManifestType::INLINE_MANIFEST, hash_algo, is_last_manifest,
- content_name, core::NextSegmentCalculationStrategy::INCREMENTAL,
- identity->getSignatureLength()));
- manifest->setLifetime(content_object_expiry_time);
+ content_name, suffix_strategy_,
+ identity ? identity->getSignatureLength() : 0));
- if (is_last) {
- manifest->setFinalBlockNumber(final_block_number);
- } else {
- manifest->setFinalBlockNumber(std::numeric_limits<uint32_t>::max());
- }
-
- suffix_manifest++;
+ manifest->setLifetime(content_object_expiry_time);
+ manifest->setFinalBlockNumber(
+ is_last ? final_block_number
+ : utils::SuffixStrategy::INVALID_SUFFIX);
}
}
+ auto content_suffix = suffix_strategy->getNextContentSuffix();
auto content_object = std::make_shared<ContentObject>(
- content_name.setSuffix(suffix_content.getSuffix()), format);
+ content_name.setSuffix(content_suffix), format);
content_object->setLifetime(content_object_expiry_time);
auto b = buffer->cloneOne();
@@ -314,7 +304,7 @@ uint32_t ProducerSocket::produce(Name content_name,
if (making_manifest) {
using namespace std::chrono_literals;
utils::CryptoHash hash = content_object->computeDigest(hash_algo);
- manifest->addSuffixHash(suffix_content.getSuffix(), hash);
+ manifest->addSuffixHash(content_suffix, hash);
content_queue_.push(content_object);
} else {
if (identity) {
@@ -322,8 +312,6 @@ uint32_t ProducerSocket::produce(Name content_name,
}
passContentObjectToCallbacks(content_object);
}
-
- suffix_content++;
}
if (making_manifest) {
@@ -332,7 +320,10 @@ uint32_t ProducerSocket::produce(Name content_name,
}
manifest->encode();
- identity->getSigner().sign(*manifest);
+ if (identity) {
+ identity->getSigner().sign(*manifest);
+ }
+
passContentObjectToCallbacks(manifest);
while (!content_queue_.empty()) {
passContentObjectToCallbacks(content_queue_.front());
@@ -347,7 +338,7 @@ uint32_t ProducerSocket::produce(Name content_name,
});
}
- return suffix_content.getSuffix() - start_offset;
+ return suffix_strategy->getTotalCount();
}
void ProducerSocket::asyncProduce(ContentObject &content_object) {
diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.h b/libtransport/src/hicn/transport/interfaces/socket_producer.h
index 83d0f73f3..ff6f49723 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_producer.h
+++ b/libtransport/src/hicn/transport/interfaces/socket_producer.h
@@ -193,8 +193,7 @@ class ProducerSocket : public Socket<BasePortal>,
std::atomic<utils::CryptoSuite> crypto_suite_;
utils::SpinLock identity_lock_;
std::shared_ptr<utils::Identity> identity_;
- utils::SuffixManifest suffix_manifest_;
- utils::SuffixContent suffix_content_;
+ core::NextSegmentCalculationStrategy suffix_strategy_;
// While manifests are being built, contents are stored in a queue
std::queue<std::shared_ptr<ContentObject>> content_queue_;
diff --git a/libtransport/src/hicn/transport/interfaces/verification_policy.h b/libtransport/src/hicn/transport/interfaces/verification_policy.h
new file mode 100644
index 000000000..cb5140ac1
--- /dev/null
+++ b/libtransport/src/hicn/transport/interfaces/verification_policy.h
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+
+namespace transport {
+namespace interface {
+
+/**
+ * This policy allows the application to tell the transport what to do in case
+ * the verification of a content object fails.
+ */
+enum class VerificationPolicy : std::uint8_t {
+ DROP_PACKET,
+ ACCEPT_PACKET,
+ ABORT_SESSION
+};
+} // namespace interface
+} // namespace transport \ No newline at end of file