From 3bce9bfdce707313de4f9cccdc867abd9edf82df Mon Sep 17 00:00:00 2001 From: Mauro Sardara Date: Fri, 7 Feb 2020 20:00:06 +0100 Subject: [HICN-508] [HICN-509] [HICN-506] Manifest rework Change-Id: I992205148910be008d66b5acb7f6f1365770f9e8 Signed-off-by: Mauro Sardara --- .../src/hicn/transport/interfaces/CMakeLists.txt | 1 + .../src/hicn/transport/interfaces/callbacks.h | 15 ++++- .../hicn/transport/interfaces/socket_consumer.cc | 60 ++++++++++++++++- .../hicn/transport/interfaces/socket_consumer.h | 16 +++++ .../transport/interfaces/socket_options_keys.h | 11 +++- .../hicn/transport/interfaces/socket_producer.cc | 75 ++++++++++------------ .../hicn/transport/interfaces/socket_producer.h | 3 +- .../transport/interfaces/verification_policy.h | 33 ++++++++++ 8 files changed, 164 insertions(+), 50 deletions(-) create mode 100644 libtransport/src/hicn/transport/interfaces/verification_policy.h (limited to 'libtransport/src/hicn/transport/interfaces') diff --git a/libtransport/src/hicn/transport/interfaces/CMakeLists.txt b/libtransport/src/hicn/transport/interfaces/CMakeLists.txt index 0c2c73623..1f3c29b1f 100644 --- a/libtransport/src/hicn/transport/interfaces/CMakeLists.txt +++ b/libtransport/src/hicn/transport/interfaces/CMakeLists.txt @@ -22,6 +22,7 @@ list(APPEND HEADER_FILES ${CMAKE_CURRENT_SOURCE_DIR}/socket_options_default_values.h ${CMAKE_CURRENT_SOURCE_DIR}/socket_options_keys.h ${CMAKE_CURRENT_SOURCE_DIR}/callbacks.h + ${CMAKE_CURRENT_SOURCE_DIR}/verification_policy.h ) list(APPEND SOURCE_FILES diff --git a/libtransport/src/hicn/transport/interfaces/callbacks.h b/libtransport/src/hicn/transport/interfaces/callbacks.h index 7194cca42..6de48d14b 100644 --- a/libtransport/src/hicn/transport/interfaces/callbacks.h +++ b/libtransport/src/hicn/transport/interfaces/callbacks.h @@ -15,11 +15,12 @@ #pragma once +#include +#include + #include #include -#include - namespace utils { class MemBuf; } @@ -84,6 +85,16 @@ using ConsumerContentObjectCallback = using ConsumerContentObjectVerificationCallback = std::function; +/** + * The ConsumerContentObjectVerificationFailedCallback will be caled by the + * transport if a data packet (either manifest or content object) cannot be + * verified. The application here decides what to do by returning a + * VerificationFailedPolicy object. + */ +using ConsumerContentObjectVerificationFailedCallback = + std::function; + /** * The ConsumerManifestCallback will be called by the consumer socket when a * manifest is received. diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc index fbe4bed1a..fba972fe5 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc +++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc @@ -417,6 +417,28 @@ int ConsumerSocket::setSocketOption( }); } +int ConsumerSocket::setSocketOption( + int socket_option_key, + ConsumerContentObjectVerificationFailedCallback socket_option_value) { + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this]( + int socket_option_key, + ConsumerContentObjectVerificationFailedCallback socket_option_value) + -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::VERIFICATION_FAILED: + verification_failed_callback_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); +} + int ConsumerSocket::setSocketOption(int socket_option_key, IcnObserver *socket_option_value) { utils::SpinLock::Acquire locked(guard_raaqm_params_); @@ -711,6 +733,29 @@ int ConsumerSocket::getSocketOption( }); } +int ConsumerSocket::getSocketOption( + int socket_option_key, + ConsumerContentObjectVerificationFailedCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this]( + int socket_option_key, + ConsumerContentObjectVerificationFailedCallback **socket_option_value) + -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::VERIFICATION_FAILED: + *socket_option_value = &verification_failed_callback_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); +} + int ConsumerSocket::getSocketOption( int socket_option_key, std::shared_ptr &socket_option_value) { switch (socket_option_key) { @@ -767,6 +812,19 @@ int ConsumerSocket::getSocketOption(int socket_option_key, return SOCKET_OPTION_GET; } +int ConsumerSocket::getSocketOption(int socket_option_key, + TransportStatistics **socket_option_value) { + switch (socket_option_key) { + case OtherOptions::STATISTICS: + *socket_option_value = &stats_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + int ConsumerSocket::getSocketOption( int socket_option_key, ConsumerTimerCallback **socket_option_value) { // Reschedule the function on the io_service to avoid race condition in case @@ -789,4 +847,4 @@ int ConsumerSocket::getSocketOption( } // namespace interface -} // end namespace transport +} // namespace transport diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.h b/libtransport/src/hicn/transport/interfaces/socket_consumer.h index 0f83fd38f..acce28c1d 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_consumer.h +++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.h @@ -18,6 +18,7 @@ #include #include #include +#include #include #include @@ -222,6 +223,10 @@ class ConsumerSocket : public BaseSocket { virtual int setSocketOption( int socket_option_key, ConsumerContentObjectCallback socket_option_value); + virtual int setSocketOption( + int socket_option_key, + ConsumerContentObjectVerificationFailedCallback socket_option_value); + virtual int setSocketOption( int socket_option_key, ConsumerContentObjectVerificationCallback socket_option_value); @@ -260,6 +265,10 @@ class ConsumerSocket : public BaseSocket { int socket_option_key, ConsumerContentObjectCallback **socket_option_value); + virtual int getSocketOption( + int socket_option_key, + ConsumerContentObjectVerificationFailedCallback **socket_option_value); + virtual int getSocketOption( int socket_option_key, ConsumerContentObjectVerificationCallback **socket_option_value); @@ -286,6 +295,9 @@ class ConsumerSocket : public BaseSocket { virtual int getSocketOption(int socket_option_key, ConsumerTimerCallback **socket_option_value); + virtual int getSocketOption(int socket_option_key, + TransportStatistics **socket_option_value); + protected: // If the thread calling lambda_func is not the same of io_service, this // function reschedule the function on it @@ -364,6 +376,7 @@ class ConsumerSocket : public BaseSocket { ConsumerContentObjectCallback on_content_object_; ConsumerManifestCallback on_manifest_; ConsumerTimerCallback stats_summary_; + ConsumerContentObjectVerificationFailedCallback verification_failed_callback_; ReadCallback *read_callback_; @@ -375,6 +388,9 @@ class ConsumerSocket : public BaseSocket { // Transport protocol std::unique_ptr transport_protocol_; + // Statistic + TransportStatistics stats_; + utils::SpinLock guard_raaqm_params_; }; diff --git a/libtransport/src/hicn/transport/interfaces/socket_options_keys.h b/libtransport/src/hicn/transport/interfaces/socket_options_keys.h index e14f0f412..b25bacbb9 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_options_keys.h +++ b/libtransport/src/hicn/transport/interfaces/socket_options_keys.h @@ -77,8 +77,9 @@ typedef enum { CONTENT_OBJECT_INPUT = 411, MANIFEST_INPUT = 412, CONTENT_OBJECT_TO_VERIFY = 413, - READ_CALLBACK = 414, - STATS_SUMMARY = 415 + VERIFICATION_FAILED = 414, + READ_CALLBACK = 415, + STATS_SUMMARY = 416 } ConsumerCallbacksOptions; typedef enum { @@ -96,7 +97,11 @@ typedef enum { typedef enum { OUTPUT_INTERFACE = 601 } DataLinkOptions; -typedef enum { VIRTUAL_DOWNLOAD = 701, USE_CFG_FILE = 702 } OtherOptions; +typedef enum { + VIRTUAL_DOWNLOAD = 701, + USE_CFG_FILE = 702, + STATISTICS +} OtherOptions; typedef enum { SHA_256 = 801, diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.cc b/libtransport/src/hicn/transport/interfaces/socket_producer.cc index 6782000ac..4fef5d1e2 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_producer.cc +++ b/libtransport/src/hicn/transport/interfaces/socket_producer.cc @@ -38,8 +38,7 @@ ProducerSocket::ProducerSocket(asio::io_service &io_service) registration_status_(REGISTRATION_NOT_ATTEMPTED), making_manifest_(false), hash_algorithm_(HashAlgorithm::SHA_256), - suffix_manifest_(core::NextSegmentCalculationStrategy::INCREMENTAL, 0), - suffix_content_(core::NextSegmentCalculationStrategy::INCREMENTAL, 0), + suffix_strategy_(core::NextSegmentCalculationStrategy::INCREMENTAL), on_interest_input_(VOID_HANDLER), on_interest_dropped_input_buffer_(VOID_HANDLER), on_interest_inserted_input_buffer_(VOID_HANDLER), @@ -159,8 +158,8 @@ uint32_t ProducerSocket::produce(Name content_name, uint32_t content_object_expiry_time = content_object_expiry_time_; HashAlgorithm hash_algo = hash_algorithm_; bool making_manifest = making_manifest_; - utils::SuffixContent suffix_content = suffix_content_; - utils::SuffixManifest suffix_manifest = suffix_manifest_; + auto suffix_strategy = utils::SuffixStrategyFactory::getSuffixStrategy( + suffix_strategy_, start_offset); std::shared_ptr identity; getSocketOption(GeneralTransportOptions::IDENTITY, identity); @@ -169,19 +168,16 @@ uint32_t ProducerSocket::produce(Name content_name, std::size_t header_size; std::size_t manifest_header_size = 0; std::size_t signature_length = 0; - std::uint32_t final_block_number = 0; + std::uint32_t final_block_number = start_offset; uint64_t free_space_for_content = 0; core::Packet::Format format; std::shared_ptr manifest; bool is_last_manifest = false; - suffix_content.updateSuffix(start_offset); - suffix_content.setUsingManifest(making_manifest); // TODO Manifest may still be used for indexing if (making_manifest && !identity) { - throw errors::RuntimeException( - "Making manifests without setting producer identity. Aborting."); + TRANSPORT_LOGD("Making manifests without setting producer identity."); } core::Packet::Format hf_format = core::Packet::Format::HF_UNSPEC; @@ -200,9 +196,9 @@ uint32_t ProducerSocket::produce(Name content_name, format = hf_format; if (making_manifest) { - format = hf_format; manifest_header_size = core::Packet::getHeaderSizeFromFormat( - hf_format_ah, identity->getSignatureLength()); + identity ? hf_format_ah : hf_format, + identity ? identity->getSignatureLength() : 0); } else if (identity) { format = hf_format_ah; signature_length = identity->getSignatureLength(); @@ -225,28 +221,20 @@ uint32_t ProducerSocket::produce(Name content_name, 1.0); uint32_t number_of_manifests = static_cast( std::ceil(float(number_of_segments) / segment_in_manifest)); - final_block_number = number_of_segments + number_of_manifests - 1; - - suffix_manifest.updateSuffix(start_offset); - suffix_manifest.setNbSegments(segment_in_manifest); - suffix_content.updateSuffix(start_offset + 1); - suffix_content.setNbSegments(segment_in_manifest); + final_block_number += number_of_segments + number_of_manifests - 1; manifest.reset(ContentObjectManifest::createManifest( - content_name.setSuffix(suffix_manifest.getSuffix()), + content_name.setSuffix(suffix_strategy->getNextManifestSuffix()), core::ManifestVersion::VERSION_1, core::ManifestType::INLINE_MANIFEST, - hash_algo, is_last_manifest, content_name, - core::NextSegmentCalculationStrategy::INCREMENTAL, - identity->getSignatureLength())); + hash_algo, is_last_manifest, content_name, suffix_strategy_, + identity ? identity->getSignatureLength() : 0)); manifest->setLifetime(content_object_expiry_time); - suffix_manifest++; if (is_last) { manifest->setFinalBlockNumber(final_block_number); } else { - manifest->setFinalBlockNumber(std::numeric_limits::max()); + manifest->setFinalBlockNumber(utils::SuffixStrategy::INVALID_SUFFIX); } - } for (unsigned int packaged_segments = 0; @@ -256,7 +244,12 @@ uint32_t ProducerSocket::produce(Name content_name, data_packet_size - manifest_header_size) { // Send the current manifest manifest->encode(); - identity->getSigner().sign(*manifest); + + // If identity set, sign manifest + if (identity) { + identity->getSigner().sign(*manifest); + } + passContentObjectToCallbacks(manifest); // Send content objects stored in the queue @@ -269,25 +262,22 @@ uint32_t ProducerSocket::produce(Name content_name, // acquired in the passContentObjectToCallbacks function, so we can // safely release this reference manifest.reset(ContentObjectManifest::createManifest( - content_name.setSuffix(suffix_manifest.getSuffix()), + content_name.setSuffix(suffix_strategy->getNextManifestSuffix()), core::ManifestVersion::VERSION_1, core::ManifestType::INLINE_MANIFEST, hash_algo, is_last_manifest, - content_name, core::NextSegmentCalculationStrategy::INCREMENTAL, - identity->getSignatureLength())); - manifest->setLifetime(content_object_expiry_time); + content_name, suffix_strategy_, + identity ? identity->getSignatureLength() : 0)); - if (is_last) { - manifest->setFinalBlockNumber(final_block_number); - } else { - manifest->setFinalBlockNumber(std::numeric_limits::max()); - } - - suffix_manifest++; + manifest->setLifetime(content_object_expiry_time); + manifest->setFinalBlockNumber( + is_last ? final_block_number + : utils::SuffixStrategy::INVALID_SUFFIX); } } + auto content_suffix = suffix_strategy->getNextContentSuffix(); auto content_object = std::make_shared( - content_name.setSuffix(suffix_content.getSuffix()), format); + content_name.setSuffix(content_suffix), format); content_object->setLifetime(content_object_expiry_time); auto b = buffer->cloneOne(); @@ -314,7 +304,7 @@ uint32_t ProducerSocket::produce(Name content_name, if (making_manifest) { using namespace std::chrono_literals; utils::CryptoHash hash = content_object->computeDigest(hash_algo); - manifest->addSuffixHash(suffix_content.getSuffix(), hash); + manifest->addSuffixHash(content_suffix, hash); content_queue_.push(content_object); } else { if (identity) { @@ -322,8 +312,6 @@ uint32_t ProducerSocket::produce(Name content_name, } passContentObjectToCallbacks(content_object); } - - suffix_content++; } if (making_manifest) { @@ -332,7 +320,10 @@ uint32_t ProducerSocket::produce(Name content_name, } manifest->encode(); - identity->getSigner().sign(*manifest); + if (identity) { + identity->getSigner().sign(*manifest); + } + passContentObjectToCallbacks(manifest); while (!content_queue_.empty()) { passContentObjectToCallbacks(content_queue_.front()); @@ -347,7 +338,7 @@ uint32_t ProducerSocket::produce(Name content_name, }); } - return suffix_content.getSuffix() - start_offset; + return suffix_strategy->getTotalCount(); } void ProducerSocket::asyncProduce(ContentObject &content_object) { diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.h b/libtransport/src/hicn/transport/interfaces/socket_producer.h index 83d0f73f3..ff6f49723 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_producer.h +++ b/libtransport/src/hicn/transport/interfaces/socket_producer.h @@ -193,8 +193,7 @@ class ProducerSocket : public Socket, std::atomic crypto_suite_; utils::SpinLock identity_lock_; std::shared_ptr identity_; - utils::SuffixManifest suffix_manifest_; - utils::SuffixContent suffix_content_; + core::NextSegmentCalculationStrategy suffix_strategy_; // While manifests are being built, contents are stored in a queue std::queue> content_queue_; diff --git a/libtransport/src/hicn/transport/interfaces/verification_policy.h b/libtransport/src/hicn/transport/interfaces/verification_policy.h new file mode 100644 index 000000000..cb5140ac1 --- /dev/null +++ b/libtransport/src/hicn/transport/interfaces/verification_policy.h @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +namespace transport { +namespace interface { + +/** + * This policy allows the application to tell the transport what to do in case + * the verification of a content object fails. + */ +enum class VerificationPolicy : std::uint8_t { + DROP_PACKET, + ACCEPT_PACKET, + ABORT_SESSION +}; +} // namespace interface +} // namespace transport \ No newline at end of file -- cgit 1.2.3-korg