diff options
42 files changed, 1728 insertions, 1040 deletions
diff --git a/apps/http-proxy/src/ATSConnector.h b/apps/http-proxy/src/ATSConnector.h index aa5662e24..db485605f 100644 --- a/apps/http-proxy/src/ATSConnector.h +++ b/apps/http-proxy/src/ATSConnector.h @@ -36,7 +36,7 @@ typedef std::deque< BufferQueue; class ATSConnector { - static constexpr uint32_t buffer_size = 1024 * 64; + static constexpr uint32_t buffer_size = 1024 * 512; enum class ConnectorState { CLOSED, diff --git a/apps/http-proxy/src/IcnReceiver.cc b/apps/http-proxy/src/IcnReceiver.cc index 18553d84b..8d0fb4917 100644 --- a/apps/http-proxy/src/IcnReceiver.cc +++ b/apps/http-proxy/src/IcnReceiver.cc @@ -80,6 +80,13 @@ AsyncConsumerProducer::AsyncConsumerProducer( } ret = producer_socket_.setSocketOption( + interface::GeneralTransportOptions::MAKE_MANIFEST, true); + + if (ret != SOCKET_OPTION_SET) { + TRANSPORT_LOGD("Warning: impossible to enable signatures."); + } + + ret = producer_socket_.setSocketOption( interface::GeneralTransportOptions::DATA_PACKET_SIZE, mtu_); if (ret != SOCKET_OPTION_SET) { @@ -130,6 +137,8 @@ void AsyncConsumerProducer::manageIncomingInterest( auto _it = chunk_number_map_.find(name); auto _end = chunk_number_map_.end(); + std::cout << "Received interest " << seg << std::endl; + if (_it != _end) { if (_it->second.second) { // Content is in production @@ -137,7 +146,7 @@ void AsyncConsumerProducer::manageIncomingInterest( } if (seg >= _it->second.first) { - TRANSPORT_LOGD( + TRANSPORT_LOGI( "Ignoring interest with name %s for a content object which does not " "exist. (Request: %u, max: %u)", name.toString().c_str(), (uint32_t)seg, (uint32_t)_it->second.first); @@ -145,6 +154,8 @@ void AsyncConsumerProducer::manageIncomingInterest( } } + std::cout << "Received interest " << seg << std::endl; + bool is_mpd = HTTPMessageFastParser::isMpdRequest(payload->data(), payload->length()); @@ -194,6 +205,7 @@ void AsyncConsumerProducer::publishContent(const uint8_t* data, if (headers) { request_counter_++; } + it->second.first += producer_socket_.produce(name, data, size, is_last, start_suffix); diff --git a/libtransport/src/hicn/transport/core/manifest_format.h b/libtransport/src/hicn/transport/core/manifest_format.h index 451e3db6a..9b6777270 100644 --- a/libtransport/src/hicn/transport/core/manifest_format.h +++ b/libtransport/src/hicn/transport/core/manifest_format.h @@ -51,8 +51,18 @@ enum class HashAlgorithm : uint8_t { CRC32C = static_cast<uint8_t>(utils::CryptoHashType::CRC32C), }; +/** + * INCREMENTAL: Manifests will be received inline with the data with no specific + * assumption regarding the manifest capacity. Consumers can send interests + * using a +1 heuristic. + * + * MANIFEST_CAPACITY_BASED: manifests with capacity N have a suffix multiple of + * N+1: 0, N+1, 2(N+1) etc. Contents have a suffix incremented by 1 except when + * it conflicts with a manifest: 1, 2, ..., N, N+2, N+3, ..., 2N+1, 2N+3 + */ enum class NextSegmentCalculationStrategy : uint8_t { INCREMENTAL = 1, + MANIFEST_CAPACITY_BASED = 2, }; template <typename T> diff --git a/libtransport/src/hicn/transport/http/client_connection.cc b/libtransport/src/hicn/transport/http/client_connection.cc index 32269d49d..aa9cb0463 100644 --- a/libtransport/src/hicn/transport/http/client_connection.cc +++ b/libtransport/src/hicn/transport/http/client_connection.cc @@ -16,6 +16,8 @@ #include <hicn/transport/http/client_connection.h> #include <hicn/transport/utils/hash.h> +#include <fstream> + #define DEFAULT_BETA 0.99 #define DEFAULT_GAMMA 0.07 @@ -38,6 +40,12 @@ HTTPClientConnection::HTTPClientConnection() std::placeholders::_2)); consumer_.setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, this); + consumer_.setSocketOption( + ConsumerCallbacksOptions::VERIFICATION_FAILED, + (ConsumerContentObjectVerificationFailedCallback)std::bind( + &HTTPClientConnection::onSignatureVerificationFailed, this, + std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); + consumer_.setSocketOption(GeneralTransportOptions::VERIFY_SIGNATURE, false); consumer_.connect(); std::shared_ptr<typename ConsumerSocket::Portal> portal; @@ -87,6 +95,10 @@ HTTPClientConnection::RC HTTPClientConnection::sendRequest( return return_code_; } +void HTTPClientConnection::verifyPacketSignature(bool verify) { + consumer_.setSocketOption(GeneralTransportOptions::VERIFY_SIGNATURE, verify); +} + void HTTPClientConnection::sendRequestGetReply( const HTTPRequest &request, std::shared_ptr<HTTPResponse> &response, std::string &ipv6_first_word) { @@ -186,6 +198,12 @@ HTTPClientConnection &HTTPClientConnection::setCertificate( return *this; } +VerificationPolicy HTTPClientConnection::onSignatureVerificationFailed( + ConsumerSocket &consumer, const core::ContentObject &content_object, + std::error_code reason) { + return VerificationPolicy::ACCEPT_PACKET; +} + // Read buffer management void HTTPClientConnection::readBufferAvailable( std::unique_ptr<utils::MemBuf> &&buffer) noexcept { diff --git a/libtransport/src/hicn/transport/http/client_connection.h b/libtransport/src/hicn/transport/http/client_connection.h index 5bcf9c4c7..e001653ab 100644 --- a/libtransport/src/hicn/transport/http/client_connection.h +++ b/libtransport/src/hicn/transport/http/client_connection.h @@ -20,6 +20,7 @@ #include <hicn/transport/http/response.h> #include <hicn/transport/interfaces/socket_consumer.h> #include <hicn/transport/interfaces/socket_producer.h> +#include <hicn/transport/interfaces/verification_policy.h> #include <hicn/transport/utils/uri.h> #include <vector> @@ -68,6 +69,8 @@ class HTTPClientConnection : public ConsumerSocket::ReadCallback { HTTPClientConnection &setCertificate(const std::string &cert_path); + void verifyPacketSignature(bool verify); + private: void sendRequestGetReply(const HTTPRequest &request, std::shared_ptr<HTTPResponse> &response, @@ -80,6 +83,10 @@ class HTTPClientConnection : public ConsumerSocket::ReadCallback { const core::Interest &interest, std::string &payload); + VerificationPolicy onSignatureVerificationFailed( + ConsumerSocket &consumer, const core::ContentObject &content_object, + std::error_code reason); + // Read callback bool isBufferMovable() noexcept override { return true; } void getReadBuffer(uint8_t **application_buffer, diff --git a/libtransport/src/hicn/transport/interfaces/CMakeLists.txt b/libtransport/src/hicn/transport/interfaces/CMakeLists.txt index 0c2c73623..1f3c29b1f 100644 --- a/libtransport/src/hicn/transport/interfaces/CMakeLists.txt +++ b/libtransport/src/hicn/transport/interfaces/CMakeLists.txt @@ -22,6 +22,7 @@ list(APPEND HEADER_FILES ${CMAKE_CURRENT_SOURCE_DIR}/socket_options_default_values.h ${CMAKE_CURRENT_SOURCE_DIR}/socket_options_keys.h ${CMAKE_CURRENT_SOURCE_DIR}/callbacks.h + ${CMAKE_CURRENT_SOURCE_DIR}/verification_policy.h ) list(APPEND SOURCE_FILES diff --git a/libtransport/src/hicn/transport/interfaces/callbacks.h b/libtransport/src/hicn/transport/interfaces/callbacks.h index 7194cca42..6de48d14b 100644 --- a/libtransport/src/hicn/transport/interfaces/callbacks.h +++ b/libtransport/src/hicn/transport/interfaces/callbacks.h @@ -15,11 +15,12 @@ #pragma once +#include <hicn/transport/core/facade.h> +#include <hicn/transport/interfaces/verification_policy.h> + #include <functional> #include <system_error> -#include <hicn/transport/core/facade.h> - namespace utils { class MemBuf; } @@ -85,6 +86,16 @@ using ConsumerContentObjectVerificationCallback = std::function<bool(ConsumerSocket &, const core::ContentObject &)>; /** + * The ConsumerContentObjectVerificationFailedCallback will be caled by the + * transport if a data packet (either manifest or content object) cannot be + * verified. The application here decides what to do by returning a + * VerificationFailedPolicy object. + */ +using ConsumerContentObjectVerificationFailedCallback = + std::function<VerificationPolicy( + ConsumerSocket &, const core::ContentObject &, std::error_code ec)>; + +/** * The ConsumerManifestCallback will be called by the consumer socket when a * manifest is received. */ diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc index fbe4bed1a..fba972fe5 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc +++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc @@ -417,6 +417,28 @@ int ConsumerSocket::setSocketOption( }); } +int ConsumerSocket::setSocketOption( + int socket_option_key, + ConsumerContentObjectVerificationFailedCallback socket_option_value) { + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this]( + int socket_option_key, + ConsumerContentObjectVerificationFailedCallback socket_option_value) + -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::VERIFICATION_FAILED: + verification_failed_callback_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); +} + int ConsumerSocket::setSocketOption(int socket_option_key, IcnObserver *socket_option_value) { utils::SpinLock::Acquire locked(guard_raaqm_params_); @@ -712,6 +734,29 @@ int ConsumerSocket::getSocketOption( } int ConsumerSocket::getSocketOption( + int socket_option_key, + ConsumerContentObjectVerificationFailedCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this]( + int socket_option_key, + ConsumerContentObjectVerificationFailedCallback **socket_option_value) + -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::VERIFICATION_FAILED: + *socket_option_value = &verification_failed_callback_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); +} + +int ConsumerSocket::getSocketOption( int socket_option_key, std::shared_ptr<Portal> &socket_option_value) { switch (socket_option_key) { case PORTAL: @@ -767,6 +812,19 @@ int ConsumerSocket::getSocketOption(int socket_option_key, return SOCKET_OPTION_GET; } +int ConsumerSocket::getSocketOption(int socket_option_key, + TransportStatistics **socket_option_value) { + switch (socket_option_key) { + case OtherOptions::STATISTICS: + *socket_option_value = &stats_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + int ConsumerSocket::getSocketOption( int socket_option_key, ConsumerTimerCallback **socket_option_value) { // Reschedule the function on the io_service to avoid race condition in case @@ -789,4 +847,4 @@ int ConsumerSocket::getSocketOption( } // namespace interface -} // end namespace transport +} // namespace transport diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.h b/libtransport/src/hicn/transport/interfaces/socket_consumer.h index 0f83fd38f..acce28c1d 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_consumer.h +++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.h @@ -18,6 +18,7 @@ #include <hicn/transport/interfaces/socket.h> #include <hicn/transport/interfaces/socket_options_default_values.h> #include <hicn/transport/protocols/protocol.h> +#include <hicn/transport/protocols/statistics.h> #include <hicn/transport/utils/event_thread.h> #include <hicn/transport/utils/verifier.h> @@ -224,6 +225,10 @@ class ConsumerSocket : public BaseSocket { virtual int setSocketOption( int socket_option_key, + ConsumerContentObjectVerificationFailedCallback socket_option_value); + + virtual int setSocketOption( + int socket_option_key, ConsumerContentObjectVerificationCallback socket_option_value); virtual int setSocketOption(int socket_option_key, @@ -262,6 +267,10 @@ class ConsumerSocket : public BaseSocket { virtual int getSocketOption( int socket_option_key, + ConsumerContentObjectVerificationFailedCallback **socket_option_value); + + virtual int getSocketOption( + int socket_option_key, ConsumerContentObjectVerificationCallback **socket_option_value); virtual int getSocketOption(int socket_option_key, @@ -286,6 +295,9 @@ class ConsumerSocket : public BaseSocket { virtual int getSocketOption(int socket_option_key, ConsumerTimerCallback **socket_option_value); + virtual int getSocketOption(int socket_option_key, + TransportStatistics **socket_option_value); + protected: // If the thread calling lambda_func is not the same of io_service, this // function reschedule the function on it @@ -364,6 +376,7 @@ class ConsumerSocket : public BaseSocket { ConsumerContentObjectCallback on_content_object_; ConsumerManifestCallback on_manifest_; ConsumerTimerCallback stats_summary_; + ConsumerContentObjectVerificationFailedCallback verification_failed_callback_; ReadCallback *read_callback_; @@ -375,6 +388,9 @@ class ConsumerSocket : public BaseSocket { // Transport protocol std::unique_ptr<TransportProtocol> transport_protocol_; + // Statistic + TransportStatistics stats_; + utils::SpinLock guard_raaqm_params_; }; diff --git a/libtransport/src/hicn/transport/interfaces/socket_options_keys.h b/libtransport/src/hicn/transport/interfaces/socket_options_keys.h index e14f0f412..b25bacbb9 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_options_keys.h +++ b/libtransport/src/hicn/transport/interfaces/socket_options_keys.h @@ -77,8 +77,9 @@ typedef enum { CONTENT_OBJECT_INPUT = 411, MANIFEST_INPUT = 412, CONTENT_OBJECT_TO_VERIFY = 413, - READ_CALLBACK = 414, - STATS_SUMMARY = 415 + VERIFICATION_FAILED = 414, + READ_CALLBACK = 415, + STATS_SUMMARY = 416 } ConsumerCallbacksOptions; typedef enum { @@ -96,7 +97,11 @@ typedef enum { typedef enum { OUTPUT_INTERFACE = 601 } DataLinkOptions; -typedef enum { VIRTUAL_DOWNLOAD = 701, USE_CFG_FILE = 702 } OtherOptions; +typedef enum { + VIRTUAL_DOWNLOAD = 701, + USE_CFG_FILE = 702, + STATISTICS +} OtherOptions; typedef enum { SHA_256 = 801, diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.cc b/libtransport/src/hicn/transport/interfaces/socket_producer.cc index 6782000ac..4fef5d1e2 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_producer.cc +++ b/libtransport/src/hicn/transport/interfaces/socket_producer.cc @@ -38,8 +38,7 @@ ProducerSocket::ProducerSocket(asio::io_service &io_service) registration_status_(REGISTRATION_NOT_ATTEMPTED), making_manifest_(false), hash_algorithm_(HashAlgorithm::SHA_256), - suffix_manifest_(core::NextSegmentCalculationStrategy::INCREMENTAL, 0), - suffix_content_(core::NextSegmentCalculationStrategy::INCREMENTAL, 0), + suffix_strategy_(core::NextSegmentCalculationStrategy::INCREMENTAL), on_interest_input_(VOID_HANDLER), on_interest_dropped_input_buffer_(VOID_HANDLER), on_interest_inserted_input_buffer_(VOID_HANDLER), @@ -159,8 +158,8 @@ uint32_t ProducerSocket::produce(Name content_name, uint32_t content_object_expiry_time = content_object_expiry_time_; HashAlgorithm hash_algo = hash_algorithm_; bool making_manifest = making_manifest_; - utils::SuffixContent suffix_content = suffix_content_; - utils::SuffixManifest suffix_manifest = suffix_manifest_; + auto suffix_strategy = utils::SuffixStrategyFactory::getSuffixStrategy( + suffix_strategy_, start_offset); std::shared_ptr<utils::Identity> identity; getSocketOption(GeneralTransportOptions::IDENTITY, identity); @@ -169,19 +168,16 @@ uint32_t ProducerSocket::produce(Name content_name, std::size_t header_size; std::size_t manifest_header_size = 0; std::size_t signature_length = 0; - std::uint32_t final_block_number = 0; + std::uint32_t final_block_number = start_offset; uint64_t free_space_for_content = 0; core::Packet::Format format; std::shared_ptr<ContentObjectManifest> manifest; bool is_last_manifest = false; - suffix_content.updateSuffix(start_offset); - suffix_content.setUsingManifest(making_manifest); // TODO Manifest may still be used for indexing if (making_manifest && !identity) { - throw errors::RuntimeException( - "Making manifests without setting producer identity. Aborting."); + TRANSPORT_LOGD("Making manifests without setting producer identity."); } core::Packet::Format hf_format = core::Packet::Format::HF_UNSPEC; @@ -200,9 +196,9 @@ uint32_t ProducerSocket::produce(Name content_name, format = hf_format; if (making_manifest) { - format = hf_format; manifest_header_size = core::Packet::getHeaderSizeFromFormat( - hf_format_ah, identity->getSignatureLength()); + identity ? hf_format_ah : hf_format, + identity ? identity->getSignatureLength() : 0); } else if (identity) { format = hf_format_ah; signature_length = identity->getSignatureLength(); @@ -225,28 +221,20 @@ uint32_t ProducerSocket::produce(Name content_name, 1.0); uint32_t number_of_manifests = static_cast<uint32_t>( std::ceil(float(number_of_segments) / segment_in_manifest)); - final_block_number = number_of_segments + number_of_manifests - 1; - - suffix_manifest.updateSuffix(start_offset); - suffix_manifest.setNbSegments(segment_in_manifest); - suffix_content.updateSuffix(start_offset + 1); - suffix_content.setNbSegments(segment_in_manifest); + final_block_number += number_of_segments + number_of_manifests - 1; manifest.reset(ContentObjectManifest::createManifest( - content_name.setSuffix(suffix_manifest.getSuffix()), + content_name.setSuffix(suffix_strategy->getNextManifestSuffix()), core::ManifestVersion::VERSION_1, core::ManifestType::INLINE_MANIFEST, - hash_algo, is_last_manifest, content_name, - core::NextSegmentCalculationStrategy::INCREMENTAL, - identity->getSignatureLength())); + hash_algo, is_last_manifest, content_name, suffix_strategy_, + identity ? identity->getSignatureLength() : 0)); manifest->setLifetime(content_object_expiry_time); - suffix_manifest++; if (is_last) { manifest->setFinalBlockNumber(final_block_number); } else { - manifest->setFinalBlockNumber(std::numeric_limits<uint32_t>::max()); + manifest->setFinalBlockNumber(utils::SuffixStrategy::INVALID_SUFFIX); } - } for (unsigned int packaged_segments = 0; @@ -256,7 +244,12 @@ uint32_t ProducerSocket::produce(Name content_name, data_packet_size - manifest_header_size) { // Send the current manifest manifest->encode(); - identity->getSigner().sign(*manifest); + + // If identity set, sign manifest + if (identity) { + identity->getSigner().sign(*manifest); + } + passContentObjectToCallbacks(manifest); // Send content objects stored in the queue @@ -269,25 +262,22 @@ uint32_t ProducerSocket::produce(Name content_name, // acquired in the passContentObjectToCallbacks function, so we can // safely release this reference manifest.reset(ContentObjectManifest::createManifest( - content_name.setSuffix(suffix_manifest.getSuffix()), + content_name.setSuffix(suffix_strategy->getNextManifestSuffix()), core::ManifestVersion::VERSION_1, core::ManifestType::INLINE_MANIFEST, hash_algo, is_last_manifest, - content_name, core::NextSegmentCalculationStrategy::INCREMENTAL, - identity->getSignatureLength())); - manifest->setLifetime(content_object_expiry_time); + content_name, suffix_strategy_, + identity ? identity->getSignatureLength() : 0)); - if (is_last) { - manifest->setFinalBlockNumber(final_block_number); - } else { - manifest->setFinalBlockNumber(std::numeric_limits<uint32_t>::max()); - } - - suffix_manifest++; + manifest->setLifetime(content_object_expiry_time); + manifest->setFinalBlockNumber( + is_last ? final_block_number + : utils::SuffixStrategy::INVALID_SUFFIX); } } + auto content_suffix = suffix_strategy->getNextContentSuffix(); auto content_object = std::make_shared<ContentObject>( - content_name.setSuffix(suffix_content.getSuffix()), format); + content_name.setSuffix(content_suffix), format); content_object->setLifetime(content_object_expiry_time); auto b = buffer->cloneOne(); @@ -314,7 +304,7 @@ uint32_t ProducerSocket::produce(Name content_name, if (making_manifest) { using namespace std::chrono_literals; utils::CryptoHash hash = content_object->computeDigest(hash_algo); - manifest->addSuffixHash(suffix_content.getSuffix(), hash); + manifest->addSuffixHash(content_suffix, hash); content_queue_.push(content_object); } else { if (identity) { @@ -322,8 +312,6 @@ uint32_t ProducerSocket::produce(Name content_name, } passContentObjectToCallbacks(content_object); } - - suffix_content++; } if (making_manifest) { @@ -332,7 +320,10 @@ uint32_t ProducerSocket::produce(Name content_name, } manifest->encode(); - identity->getSigner().sign(*manifest); + if (identity) { + identity->getSigner().sign(*manifest); + } + passContentObjectToCallbacks(manifest); while (!content_queue_.empty()) { passContentObjectToCallbacks(content_queue_.front()); @@ -347,7 +338,7 @@ uint32_t ProducerSocket::produce(Name content_name, }); } - return suffix_content.getSuffix() - start_offset; + return suffix_strategy->getTotalCount(); } void ProducerSocket::asyncProduce(ContentObject &content_object) { diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.h b/libtransport/src/hicn/transport/interfaces/socket_producer.h index 83d0f73f3..ff6f49723 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_producer.h +++ b/libtransport/src/hicn/transport/interfaces/socket_producer.h @@ -193,8 +193,7 @@ class ProducerSocket : public Socket<BasePortal>, std::atomic<utils::CryptoSuite> crypto_suite_; utils::SpinLock identity_lock_; std::shared_ptr<utils::Identity> identity_; - utils::SuffixManifest suffix_manifest_; - utils::SuffixContent suffix_content_; + core::NextSegmentCalculationStrategy suffix_strategy_; // While manifests are being built, contents are stored in a queue std::queue<std::shared_ptr<ContentObject>> content_queue_; diff --git a/libtransport/src/hicn/transport/interfaces/verification_policy.h b/libtransport/src/hicn/transport/interfaces/verification_policy.h new file mode 100644 index 000000000..cb5140ac1 --- /dev/null +++ b/libtransport/src/hicn/transport/interfaces/verification_policy.h @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <cstdint> + +namespace transport { +namespace interface { + +/** + * This policy allows the application to tell the transport what to do in case + * the verification of a content object fails. + */ +enum class VerificationPolicy : std::uint8_t { + DROP_PACKET, + ACCEPT_PACKET, + ABORT_SESSION +}; +} // namespace interface +} // namespace transport
\ No newline at end of file diff --git a/libtransport/src/hicn/transport/protocols/CMakeLists.txt b/libtransport/src/hicn/transport/protocols/CMakeLists.txt index 23aeca9bf..06515e0e2 100644 --- a/libtransport/src/hicn/transport/protocols/CMakeLists.txt +++ b/libtransport/src/hicn/transport/protocols/CMakeLists.txt @@ -14,8 +14,12 @@ cmake_minimum_required(VERSION 3.5 FATAL_ERROR) list(APPEND HEADER_FILES - ${CMAKE_CURRENT_SOURCE_DIR}/indexing_manager.h + ${CMAKE_CURRENT_SOURCE_DIR}/indexer.h + ${CMAKE_CURRENT_SOURCE_DIR}/incremental_indexer.h + ${CMAKE_CURRENT_SOURCE_DIR}/manifest_incremental_indexer.h ${CMAKE_CURRENT_SOURCE_DIR}/reassembly.h + ${CMAKE_CURRENT_SOURCE_DIR}/datagram_reassembly.h + ${CMAKE_CURRENT_SOURCE_DIR}/byte_stream_reassembly.h ${CMAKE_CURRENT_SOURCE_DIR}/congestion_window_protocol.h ${CMAKE_CURRENT_SOURCE_DIR}/packet_manager.h ${CMAKE_CURRENT_SOURCE_DIR}/statistics.h @@ -27,11 +31,18 @@ list(APPEND HEADER_FILES ${CMAKE_CURRENT_SOURCE_DIR}/cbr.h ${CMAKE_CURRENT_SOURCE_DIR}/rtc.h ${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.h - ${CMAKE_CURRENT_SOURCE_DIR}/manifest_indexing_manager.h + ${CMAKE_CURRENT_SOURCE_DIR}/errors.h + ${CMAKE_CURRENT_SOURCE_DIR}/verification_manager.h + ${CMAKE_CURRENT_SOURCE_DIR}/data_processing_events.h ) list(APPEND SOURCE_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/indexer.cc + ${CMAKE_CURRENT_SOURCE_DIR}/incremental_indexer.cc + ${CMAKE_CURRENT_SOURCE_DIR}/manifest_incremental_indexer.cc ${CMAKE_CURRENT_SOURCE_DIR}/reassembly.cc + ${CMAKE_CURRENT_SOURCE_DIR}/datagram_reassembly.cc + ${CMAKE_CURRENT_SOURCE_DIR}/byte_stream_reassembly.cc ${CMAKE_CURRENT_SOURCE_DIR}/protocol.cc ${CMAKE_CURRENT_SOURCE_DIR}/raaqm.cc ${CMAKE_CURRENT_SOURCE_DIR}/rate_estimation.cc @@ -39,7 +50,8 @@ list(APPEND SOURCE_FILES ${CMAKE_CURRENT_SOURCE_DIR}/cbr.cc ${CMAKE_CURRENT_SOURCE_DIR}/rtc.cc ${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.cc - ${CMAKE_CURRENT_SOURCE_DIR}/manifest_indexing_manager.cc + ${CMAKE_CURRENT_SOURCE_DIR}/errors.cc + ${CMAKE_CURRENT_SOURCE_DIR}/verification_manager.cc ) set(RAAQM_CONFIG_INSTALL_PREFIX diff --git a/libtransport/src/hicn/transport/protocols/byte_stream_reassembly.cc b/libtransport/src/hicn/transport/protocols/byte_stream_reassembly.cc new file mode 100644 index 000000000..2f1e5d8fd --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/byte_stream_reassembly.cc @@ -0,0 +1,121 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/protocols/byte_stream_reassembly.h> + +#include <hicn/transport/interfaces/socket_consumer.h> +#include <hicn/transport/protocols/errors.h> +#include <hicn/transport/protocols/indexer.h> +#include <hicn/transport/utils/array.h> +#include <hicn/transport/utils/membuf.h> + +namespace transport { + +namespace protocol { + +ByteStreamReassembly::ByteStreamReassembly( + interface::ConsumerSocket *icn_socket, + TransportProtocol *transport_protocol) + : Reassembly(icn_socket, transport_protocol), + index_(IndexManager::invalid_index), + download_complete_(false) {} + +void ByteStreamReassembly::reassemble( + std::unique_ptr<ContentObjectManifest> &&manifest) { + if (TRANSPORT_EXPECT_TRUE(manifest != nullptr)) { + received_packets_.emplace( + std::make_pair(manifest->getName().getSuffix(), nullptr)); + assembleContent(); + } +} + +void ByteStreamReassembly::reassemble(ContentObject::Ptr &&content_object) { + if (TRANSPORT_EXPECT_TRUE(content_object != nullptr)) { + received_packets_.emplace(std::make_pair( + content_object->getName().getSuffix(), std::move(content_object))); + assembleContent(); + } +} + +void ByteStreamReassembly::assembleContent() { + if (TRANSPORT_EXPECT_FALSE(index_ == IndexManager::invalid_index)) { + index_ = index_manager_->getNextReassemblySegment(); + if (index_ == IndexManager::invalid_index) { + return; + } + } + + auto it = received_packets_.find((const unsigned int)index_); + while (it != received_packets_.end()) { + // Check if valid packet + if (it->second) { + copyContent(*it->second); + } + + received_packets_.erase(it); + index_ = index_manager_->getNextReassemblySegment(); + it = received_packets_.find((const unsigned int)index_); + } + + if (!download_complete_ && index_ != IndexManager::invalid_index) { + transport_protocol_->onReassemblyFailed(index_); + } +} + +void ByteStreamReassembly::copyContent(const ContentObject &content_object) { + auto a = content_object.getPayload(); + auto payload_length = a->length(); + auto write_size = std::min(payload_length, read_buffer_->tailroom()); + auto additional_bytes = payload_length > read_buffer_->tailroom() + ? payload_length - read_buffer_->tailroom() + : 0; + + std::memcpy(read_buffer_->writableTail(), a->data(), write_size); + read_buffer_->append(write_size); + + if (!read_buffer_->tailroom()) { + notifyApplication(); + std::memcpy(read_buffer_->writableTail(), a->data() + write_size, + additional_bytes); + read_buffer_->append(additional_bytes); + } + + download_complete_ = + index_manager_->getFinalSuffix() == content_object.getName().getSuffix(); + + if (TRANSPORT_EXPECT_FALSE(download_complete_)) { + notifyApplication(); + transport_protocol_->onContentReassembled( + make_error_code(protocol_error::success)); + } +} + +void ByteStreamReassembly::reInitialize() { + index_ = IndexManager::invalid_index; + download_complete_ = false; + + received_packets_.clear(); + + // reset read buffer + interface::ConsumerSocket::ReadCallback *read_callback; + reassembly_consumer_socket_->getSocketOption( + interface::ConsumerCallbacksOptions::READ_CALLBACK, &read_callback); + + read_buffer_ = utils::MemBuf::create(read_callback->maxBufferSize()); +} + +} // namespace protocol + +} // namespace transport diff --git a/libtransport/src/hicn/transport/protocols/byte_stream_reassembly.h b/libtransport/src/hicn/transport/protocols/byte_stream_reassembly.h new file mode 100644 index 000000000..7c77d486f --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/byte_stream_reassembly.h @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/protocols/reassembly.h> + +namespace transport { + +namespace protocol { + +class ByteStreamReassembly : public Reassembly { + public: + ByteStreamReassembly(interface::ConsumerSocket *icn_socket, + TransportProtocol *transport_protocol); + + protected: + virtual void reassemble(core::ContentObject::Ptr &&content_object) override; + + virtual void reassemble( + std::unique_ptr<core::ContentObjectManifest> &&manifest) override; + + virtual void copyContent(const core::ContentObject &content_object); + + virtual void reInitialize() override; + + private: + void assembleContent(); + + protected: + // The consumer socket + // std::unique_ptr<IncrementalIndexManager> incremental_index_manager_; + // std::unique_ptr<ManifestIndexManager> manifest_index_manager_; + // IndexVerificationManager *index_manager_; + std::unordered_map<std::uint32_t, core::ContentObject::Ptr> received_packets_; + uint32_t index_; + bool download_complete_; +}; + +} // namespace protocol + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/data_processing_events.h b/libtransport/src/hicn/transport/protocols/data_processing_events.h new file mode 100644 index 000000000..8975c2b4a --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/data_processing_events.h @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/core/content_object.h> +#include <hicn/transport/core/interest.h> + +namespace transport { +namespace protocol { + +class ContentObjectProcessingEventCallback { + public: + virtual ~ContentObjectProcessingEventCallback() = default; + virtual void onPacketDropped(core::Interest::Ptr &&i, + core::ContentObject::Ptr &&c) = 0; + virtual void onReassemblyFailed(std::uint32_t missing_segment) = 0; +}; + +} // namespace protocol +} // namespace transport diff --git a/libtransport/src/hicn/transport/protocols/datagram_reassembly.cc b/libtransport/src/hicn/transport/protocols/datagram_reassembly.cc new file mode 100644 index 000000000..7b01ad4bc --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/datagram_reassembly.cc @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/protocols/datagram_reassembly.h> + +namespace transport { + +namespace protocol { + +DatagramReassembly::DatagramReassembly(interface::ConsumerSocket* icn_socket, + TransportProtocol* transport_protocol) + : Reassembly(icn_socket, transport_protocol) {} + +void DatagramReassembly::reassemble(core::ContentObject::Ptr&& content_object) { + read_buffer_ = content_object->getPayload(); + Reassembly::notifyApplication(); +} + +void DatagramReassembly::reInitialize() {} + +} // namespace protocol + +} // namespace transport diff --git a/libtransport/src/hicn/transport/protocols/datagram_reassembly.h b/libtransport/src/hicn/transport/protocols/datagram_reassembly.h new file mode 100644 index 000000000..923b6f2c1 --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/datagram_reassembly.h @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/protocols/reassembly.h> + +namespace transport { + +namespace protocol { + +class DatagramReassembly : public Reassembly { + public: + DatagramReassembly(interface::ConsumerSocket *icn_socket, + TransportProtocol *transport_protocol); + + virtual void reassemble(core::ContentObject::Ptr &&content_object) override; + virtual void reInitialize() override; + virtual void reassemble( + std::unique_ptr<core::ContentObjectManifest> &&manifest) override { + return; + } +}; + +} // namespace protocol + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/errors.cc b/libtransport/src/hicn/transport/protocols/errors.cc new file mode 100644 index 000000000..c2249ed4a --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/errors.cc @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/protocols/errors.h> + +namespace transport { +namespace protocol { + +const std::error_category& protocol_category() { + static protocol_category_impl instance; + + return instance; +} + +const char* protocol_category_impl::name() const throw() { + return "transport::protocol::error"; +} + +std::string protocol_category_impl::message(int ev) const { + switch (static_cast<protocol_error>(ev)) { + case protocol_error::success: { + return "Success"; + } + case protocol_error::signature_verification_failed: { + return "Signature verification failed."; + } + case protocol_error::integrity_verification_failed: { + return "Integrity verification failed"; + } + case protocol_error::no_verifier_provided: { + return "Transport cannot get any verifier for the given data."; + } + case protocol_error::io_error: { + return "Conectivity error between transport and local forwarder"; + } + case protocol_error::max_retransmissions_error: { + return "Transport protocol reached max number of retransmissions allowed " + "for the same interest."; + } + case protocol_error::session_aborted: { + return "The session has been aborted by the application."; + } + default: { return "Unknown protocol error"; } + } +} + +} // namespace protocol +} // namespace transport
\ No newline at end of file diff --git a/libtransport/src/hicn/transport/protocols/errors.h b/libtransport/src/hicn/transport/protocols/errors.h new file mode 100644 index 000000000..cb3d3474e --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/errors.h @@ -0,0 +1,91 @@ +/* + * Copyright (c) 2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <string> +#include <system_error> + +namespace transport { +namespace protocol { + +/** + * @brief Get the default server error category. + * @return The default server error category instance. + * + * @warning The first call to this function is thread-safe only starting with + * C++11. + */ +const std::error_category& protocol_category(); + +/** + * The list of errors. + */ +enum class protocol_error { + success = 0, + signature_verification_failed, + integrity_verification_failed, + no_verifier_provided, + io_error, + max_retransmissions_error, + session_aborted, +}; + +/** + * @brief Create an error_code instance for the given error. + * @param error The error. + * @return The error_code instance. + */ +inline std::error_code make_error_code(protocol_error error) { + return std::error_code(static_cast<int>(error), protocol_category()); +} + +/** + * @brief Create an error_condition instance for the given error. + * @param error The error. + * @return The error_condition instance. + */ +inline std::error_condition make_error_condition(protocol_error error) { + return std::error_condition(static_cast<int>(error), protocol_category()); +} + +/** + * @brief A server error category. + */ +class protocol_category_impl : public std::error_category { + public: + /** + * @brief Get the name of the category. + * @return The name of the category. + */ + virtual const char* name() const throw(); + + /** + * @brief Get the error message for a given error. + * @param ev The error numeric value. + * @return The message associated to the error. + */ + virtual std::string message(int ev) const; +}; +} // namespace protocol +} // namespace transport + +namespace std { +// namespace system { +template <> +struct is_error_code_enum<::transport::protocol::protocol_error> + : public std::true_type {}; +// } // namespace system +} // namespace std
\ No newline at end of file diff --git a/libtransport/src/hicn/transport/protocols/incremental_indexer.cc b/libtransport/src/hicn/transport/protocols/incremental_indexer.cc new file mode 100644 index 000000000..5a8046daa --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/incremental_indexer.cc @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/protocols/incremental_indexer.h> + +#include <hicn/transport/interfaces/socket_consumer.h> + +namespace transport { +namespace protocol { + +void IncrementalIndexer::onContentObject( + core::Interest::Ptr &&interest, core::ContentObject::Ptr &&content_object) { + using namespace interface; + + if (TRANSPORT_EXPECT_FALSE(content_object->testRst())) { + final_suffix_ = content_object->getName().getSuffix(); + } + + auto ret = verification_manager_->onPacketToVerify(*content_object); + + switch (ret) { + case VerificationPolicy::ACCEPT_PACKET: { + reassembly_->reassemble(std::move(content_object)); + break; + } + case VerificationPolicy::DROP_PACKET: { + transport_protocol_->onPacketDropped(std::move(interest), + std::move(content_object)); + break; + } + case VerificationPolicy::ABORT_SESSION: { + transport_protocol_->onContentReassembled( + make_error_code(protocol_error::session_aborted)); + break; + } + } +} + +} // namespace protocol +} // namespace transport
\ No newline at end of file diff --git a/libtransport/src/hicn/transport/protocols/indexing_manager.h b/libtransport/src/hicn/transport/protocols/incremental_indexer.h index b6b8bb4a6..ea84d645a 100644 --- a/libtransport/src/hicn/transport/protocols/indexing_manager.h +++ b/libtransport/src/hicn/transport/protocols/incremental_indexer.h @@ -15,9 +15,11 @@ #pragma once +#include <hicn/transport/protocols/indexer.h> + #include <hicn/transport/errors/runtime_exception.h> #include <hicn/transport/errors/unexpected_manifest_exception.h> -#include <hicn/transport/interfaces/socket_consumer.h> +#include <hicn/transport/protocols/reassembly.h> #include <hicn/transport/protocols/verification_manager.h> #include <hicn/transport/utils/literals.h> @@ -25,74 +27,59 @@ namespace transport { -namespace protocol { - -class IndexManager { - public: - static constexpr uint32_t invalid_index = ~0; - - /** - * - */ - virtual ~IndexManager() = default; - /** - * Retrieve from the manifest the next suffix to retrieve. - */ - virtual uint32_t getNextSuffix() = 0; - - virtual void setFirstSuffix(uint32_t suffix) = 0; - - /** - * Retrive the next segment to be reassembled. - */ - virtual uint32_t getNextReassemblySegment() = 0; - - virtual bool isFinalSuffixDiscovered() = 0; - - virtual uint32_t getFinalSuffix() = 0; - - virtual void reset() = 0; -}; +namespace interface { +class ConsumerSocket; +} -class IndexVerificationManager : public IndexManager { - public: - /** - * - */ - virtual ~IndexVerificationManager() = default; +namespace protocol { - /** - * The ownership of the ContentObjectManifest is moved - * from the caller to the VerificationManager - */ - virtual bool onManifest(core::ContentObject::Ptr &&content_object) = 0; +class Reassembly; +class TransportProtocol; - /** - * The content object must just be verified; the ownership is still of the - * caller. - */ - virtual bool onContentObject(const core::ContentObject &content_object) = 0; -}; - -class IncrementalIndexManager : public IndexVerificationManager { +class IncrementalIndexer : public Indexer { public: - IncrementalIndexManager(interface::ConsumerSocket *icn_socket) + IncrementalIndexer(interface::ConsumerSocket *icn_socket, + TransportProtocol *transport, Reassembly *reassembly) : socket_(icn_socket), + reassembly_(reassembly), + transport_protocol_(transport), final_suffix_(std::numeric_limits<uint32_t>::max()), + first_suffix_(0), next_download_suffix_(0), next_reassembly_suffix_(0), verification_manager_( - std::make_unique<SignatureVerificationManager>(icn_socket)) {} + std::make_unique<SignatureVerificationManager>(icn_socket)) { + if (reassembly_) { + reassembly_->setIndexer(this); + } + } + + IncrementalIndexer(const IncrementalIndexer &) = delete; + + IncrementalIndexer(IncrementalIndexer &&other) + : socket_(other.socket_), + reassembly_(other.reassembly_), + transport_protocol_(other.transport_protocol_), + final_suffix_(other.final_suffix_), + first_suffix_(other.first_suffix_), + next_download_suffix_(other.next_download_suffix_), + next_reassembly_suffix_(other.next_reassembly_suffix_), + verification_manager_(std::move(other.verification_manager_)) { + if (reassembly_) { + reassembly_->setIndexer(this); + } + } /** * */ - virtual ~IncrementalIndexManager() {} + virtual ~IncrementalIndexer() {} - TRANSPORT_ALWAYS_INLINE virtual void reset() override { + TRANSPORT_ALWAYS_INLINE virtual void reset( + std::uint32_t offset = 0) override { final_suffix_ = std::numeric_limits<uint32_t>::max(); - next_download_suffix_ = first_suffix_; - next_reassembly_suffix_ = 0; + next_download_suffix_ = offset; + next_reassembly_suffix_ = offset; } /** @@ -125,24 +112,21 @@ class IncrementalIndexManager : public IndexVerificationManager { return final_suffix_; } - TRANSPORT_ALWAYS_INLINE bool onManifest( - core::ContentObject::Ptr &&content_object) override { - throw errors::UnexpectedManifestException(); - } + void onContentObject(core::Interest::Ptr &&interest, + core::ContentObject::Ptr &&content_object) override; - TRANSPORT_ALWAYS_INLINE bool onContentObject( - const core::ContentObject &content_object) override { - auto ret = verification_manager_->onPacketToVerify(content_object); + TRANSPORT_ALWAYS_INLINE void setReassembly(Reassembly *reassembly) { + reassembly_ = reassembly; - if (TRANSPORT_EXPECT_FALSE(content_object.testRst())) { - final_suffix_ = content_object.getName().getSuffix(); + if (reassembly_) { + reassembly_->setIndexer(this); } - - return ret; } protected: interface::ConsumerSocket *socket_; + Reassembly *reassembly_; + TransportProtocol *transport_protocol_; uint32_t final_suffix_; uint32_t first_suffix_; uint32_t next_download_suffix_; diff --git a/libtransport/src/hicn/transport/protocols/indexer.cc b/libtransport/src/hicn/transport/protocols/indexer.cc new file mode 100644 index 000000000..c50c4236b --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/indexer.cc @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/protocols/indexer.h> + +#include <hicn/transport/protocols/incremental_indexer.h> +#include <hicn/transport/protocols/manifest_incremental_indexer.h> +#include <hicn/transport/protocols/protocol.h> +#include <hicn/transport/utils/branch_prediction.h> + +namespace transport { +namespace protocol { + +IndexManager::IndexManager(interface::ConsumerSocket *icn_socket, + TransportProtocol *transport, Reassembly *reassembly) + : indexer_(std::make_unique<IncrementalIndexer>(icn_socket, transport, + reassembly)), + first_segment_received_(false), + icn_socket_(icn_socket), + transport_(transport), + reassembly_(reassembly) {} + +void IndexManager::onContentObject(core::Interest::Ptr &&interest, + core::ContentObject::Ptr &&content_object) { + if (first_segment_received_) { + indexer_->onContentObject(std::move(interest), std::move(content_object)); + } else { + std::uint32_t segment_number = interest->getName().getSuffix(); + + if (segment_number == 0) { + // Check if manifest + if (content_object->getPayloadType() == PayloadType::MANIFEST) { + IncrementalIndexer *indexer = + static_cast<IncrementalIndexer *>(indexer_.release()); + indexer_ = + std::make_unique<ManifestIncrementalIndexer>(std::move(*indexer)); + delete indexer; + } + + indexer_->onContentObject(std::move(interest), std::move(content_object)); + auto it = interest_data_set_.begin(); + while (it != interest_data_set_.end()) { + indexer_->onContentObject(std::move(const_cast<core::Interest::Ptr &&>(it->first)), std::move(const_cast<core::ContentObject::Ptr &&>(it->second))); + it = interest_data_set_.erase(it); + } + + first_segment_received_ = true; + } else { + interest_data_set_.emplace(std::move(interest), std::move(content_object)); + } + } +} + +void IndexManager::reset(std::uint32_t offset) { + indexer_ = std::make_unique<IncrementalIndexer>(icn_socket_, transport_, + reassembly_); + first_segment_received_ = false; + interest_data_set_.clear(); +} + +} // namespace protocol +} // namespace transport
\ No newline at end of file diff --git a/libtransport/src/hicn/transport/protocols/indexer.h b/libtransport/src/hicn/transport/protocols/indexer.h new file mode 100644 index 000000000..89751095e --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/indexer.h @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/core/content_object.h> +#include <hicn/transport/core/interest.h> + +#include <set> + +namespace transport { + +namespace interface { +class ConsumerSocket; +} + +namespace protocol { + +class Reassembly; +class TransportProtocol; + +class Indexer { + public: + /** + * + */ + virtual ~Indexer() = default; + /** + * Retrieve from the manifest the next suffix to retrieve. + */ + virtual uint32_t getNextSuffix() = 0; + + virtual void setFirstSuffix(uint32_t suffix) = 0; + + /** + * Retrive the next segment to be reassembled. + */ + virtual uint32_t getNextReassemblySegment() = 0; + + virtual bool isFinalSuffixDiscovered() = 0; + + virtual uint32_t getFinalSuffix() = 0; + + virtual void reset(std::uint32_t offset = 0) = 0; + + virtual void onContentObject(core::Interest::Ptr &&interest, + core::ContentObject::Ptr &&content_object) = 0; +}; + +class IndexManager : Indexer { + public: + static constexpr uint32_t invalid_index = ~0; + + IndexManager(interface::ConsumerSocket *icn_socket, + TransportProtocol *transport, Reassembly *reassembly); + + uint32_t getNextSuffix() override { return indexer_->getNextSuffix(); } + + void setFirstSuffix(uint32_t suffix) override { + indexer_->setFirstSuffix(suffix); + } + + uint32_t getNextReassemblySegment() override { + return indexer_->getNextReassemblySegment(); + } + + bool isFinalSuffixDiscovered() override { + return indexer_->isFinalSuffixDiscovered(); + } + + uint32_t getFinalSuffix() override { return indexer_->getFinalSuffix(); } + + void reset(std::uint32_t offset = 0) override; + + void onContentObject(core::Interest::Ptr &&interest, + core::ContentObject::Ptr &&content_object) override; + + private: + std::unique_ptr<Indexer> indexer_; + bool first_segment_received_; + std::set<std::pair<core::Interest::Ptr, core::ContentObject::Ptr>> + interest_data_set_; + interface::ConsumerSocket *icn_socket_; + TransportProtocol *transport_; + Reassembly *reassembly_; +}; + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/manifest_incremental_indexer.cc b/libtransport/src/hicn/transport/protocols/manifest_incremental_indexer.cc new file mode 100644 index 000000000..592daa4d4 --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/manifest_incremental_indexer.cc @@ -0,0 +1,232 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/interfaces/socket_consumer.h> +#include <hicn/transport/protocols/manifest_incremental_indexer.h> + +#include <cmath> +#include <deque> + +namespace transport { + +namespace protocol { + +using namespace interface; + +ManifestIncrementalIndexer::ManifestIncrementalIndexer( + interface::ConsumerSocket *icn_socket, TransportProtocol *transport, + Reassembly *reassembly) + : IncrementalIndexer(icn_socket, transport, reassembly), + suffix_strategy_(utils::SuffixStrategyFactory::getSuffixStrategy( + NextSegmentCalculationStrategy::INCREMENTAL, + next_download_suffix_, 0)) {} + +void ManifestIncrementalIndexer::onContentObject( + core::Interest::Ptr &&interest, core::ContentObject::Ptr &&content_object) { + // Check if mainfiest or not + if (content_object->getPayloadType() == PayloadType::MANIFEST) { + onUntrustedManifest(std::move(interest), std::move(content_object)); + } else if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) { + onUntrustedContentObject(std::move(interest), std::move(content_object)); + } +} + +void ManifestIncrementalIndexer::onUntrustedManifest( + core::Interest::Ptr &&interest, core::ContentObject::Ptr &&content_object) { + auto ret = verification_manager_->onPacketToVerify(*content_object); + + switch (ret) { + case VerificationPolicy::ACCEPT_PACKET: { + processTrustedManifest(std::move(content_object)); + break; + } + case VerificationPolicy::DROP_PACKET: + case VerificationPolicy::ABORT_SESSION: { + transport_protocol_->onContentReassembled( + make_error_code(protocol_error::session_aborted)); + break; + } + } +} + +void ManifestIncrementalIndexer::processTrustedManifest( + ContentObject::Ptr &&content_object) { + auto manifest = + std::make_unique<ContentObjectManifest>(std::move(*content_object)); + manifest->decode(); + + if (TRANSPORT_EXPECT_FALSE(manifest->getVersion() != + core::ManifestVersion::VERSION_1)) { + throw errors::RuntimeException("Received manifest with unknown version."); + } + + switch (manifest->getManifestType()) { + case core::ManifestType::INLINE_MANIFEST: { + auto _it = manifest->getSuffixList().begin(); + auto _end = manifest->getSuffixList().end(); + + suffix_strategy_->setFinalSuffix(manifest->getFinalBlockNumber()); + + for (; _it != _end; _it++) { + auto hash = + std::make_pair(std::vector<uint8_t>(_it->second, _it->second + 32), + manifest->getHashAlgorithm()); + + if (!checkUnverifiedSegments(_it->first, hash)) { + suffix_hash_map_[_it->first] = std::move(hash); + } + } + + reassembly_->reassemble(std::move(manifest)); + + break; + } + case core::ManifestType::FLIC_MANIFEST: { + throw errors::NotImplementedException(); + } + case core::ManifestType::FINAL_CHUNK_NUMBER: { + throw errors::NotImplementedException(); + } + } +} + +bool ManifestIncrementalIndexer::checkUnverifiedSegments( + std::uint32_t suffix, const HashEntry &hash) { + auto it = unverified_segments_.find(suffix); + + if (it != unverified_segments_.end()) { + auto ret = verifyContentObject(hash, *it->second.second); + + switch (ret) { + case VerificationPolicy::ACCEPT_PACKET: { + reassembly_->reassemble(std::move(it->second.second)); + break; + } + case VerificationPolicy::DROP_PACKET: { + transport_protocol_->onPacketDropped(std::move(it->second.first), + std::move(it->second.second)); + break; + } + case VerificationPolicy::ABORT_SESSION: { + transport_protocol_->onContentReassembled( + make_error_code(protocol_error::session_aborted)); + break; + } + } + + unverified_segments_.erase(it); + return true; + } + + return false; +} + +VerificationPolicy ManifestIncrementalIndexer::verifyContentObject( + const HashEntry &manifest_hash, const ContentObject &content_object) { + VerificationPolicy ret; + + auto hash_type = static_cast<utils::CryptoHashType>(manifest_hash.second); + auto data_packet_digest = content_object.computeDigest(manifest_hash.second); + auto data_packet_digest_bytes = + data_packet_digest.getDigest<uint8_t>().data(); + const std::vector<uint8_t> &manifest_digest_bytes = manifest_hash.first; + + if (utils::CryptoHash::compareBinaryDigest( + data_packet_digest_bytes, manifest_digest_bytes.data(), hash_type)) { + ret = VerificationPolicy::ACCEPT_PACKET; + } else { + ConsumerContentObjectVerificationFailedCallback + *verification_failed_callback = VOID_HANDLER; + socket_->getSocketOption(ConsumerCallbacksOptions::VERIFICATION_FAILED, + &verification_failed_callback); + ret = (*verification_failed_callback)( + *socket_, content_object, + make_error_code(protocol_error::integrity_verification_failed)); + } + + return ret; +} + +void ManifestIncrementalIndexer::onUntrustedContentObject( + Interest::Ptr &&i, ContentObject::Ptr &&c) { + auto suffix = c->getName().getSuffix(); + auto it = suffix_hash_map_.find(suffix); + + if (it != suffix_hash_map_.end()) { + auto ret = verifyContentObject(it->second, *c); + + switch (ret) { + case VerificationPolicy::ACCEPT_PACKET: { + suffix_hash_map_.erase(it); + reassembly_->reassemble(std::move(c)); + break; + } + case VerificationPolicy::DROP_PACKET: { + transport_protocol_->onPacketDropped(std::move(i), std::move(c)); + break; + } + case VerificationPolicy::ABORT_SESSION: { + transport_protocol_->onContentReassembled( + make_error_code(protocol_error::session_aborted)); + break; + } + } + } else { + unverified_segments_[suffix] = std::make_pair(std::move(i), std::move(c)); + } +} + +uint32_t ManifestIncrementalIndexer::getNextSuffix() { + auto ret = suffix_strategy_->getNextSuffix(); + + if (ret <= suffix_strategy_->getFinalSuffix() && + ret != utils::SuffixStrategy::INVALID_SUFFIX) { + suffix_queue_.push(ret); + return ret; + } + + return IndexManager::invalid_index; +} + +uint32_t ManifestIncrementalIndexer::getFinalSuffix() { + return suffix_strategy_->getFinalSuffix(); +} + +bool ManifestIncrementalIndexer::isFinalSuffixDiscovered() { + return IncrementalIndexer::isFinalSuffixDiscovered(); +} + +uint32_t ManifestIncrementalIndexer::getNextReassemblySegment() { + if (suffix_queue_.empty()) { + return IndexManager::invalid_index; + } + + auto ret = suffix_queue_.front(); + suffix_queue_.pop(); + return ret; +} + +void ManifestIncrementalIndexer::reset(std::uint32_t offset) { + IncrementalIndexer::reset(offset); + suffix_hash_map_.clear(); + unverified_segments_.clear(); + SuffixQueue empty; + std::swap(suffix_queue_, empty); + suffix_strategy_->reset(offset); +} + +} // namespace protocol + +} // namespace transport diff --git a/libtransport/src/hicn/transport/protocols/manifest_incremental_indexer.h b/libtransport/src/hicn/transport/protocols/manifest_incremental_indexer.h new file mode 100644 index 000000000..6e991f86f --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/manifest_incremental_indexer.h @@ -0,0 +1,91 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/interfaces/socket.h> +#include <hicn/transport/protocols/incremental_indexer.h> +#include <hicn/transport/utils/suffix_strategy.h> + +#include <list> + +namespace transport { + +namespace protocol { + +class ManifestIncrementalIndexer : public IncrementalIndexer { + static constexpr double alpha = 0.3; + + public: + using SuffixQueue = std::queue<uint32_t>; + using HashEntry = std::pair<std::vector<uint8_t>, core::HashAlgorithm>; + + ManifestIncrementalIndexer(interface::ConsumerSocket *icn_socket, + TransportProtocol *transport, Reassembly *reassembly); + + ManifestIncrementalIndexer(IncrementalIndexer &&indexer) + : IncrementalIndexer(std::move(indexer)), + suffix_strategy_(utils::SuffixStrategyFactory::getSuffixStrategy( + core::NextSegmentCalculationStrategy::INCREMENTAL, + next_download_suffix_, 0)) { + for (uint32_t i = first_suffix_; i < next_download_suffix_; i++) { + suffix_queue_.push(i); + } + } + + virtual ~ManifestIncrementalIndexer() = default; + + void reset(std::uint32_t offset = 0) override; + + void onContentObject(core::Interest::Ptr &&interest, + core::ContentObject::Ptr &&content_object) override; + + uint32_t getNextSuffix() override; + + uint32_t getNextReassemblySegment() override; + + bool isFinalSuffixDiscovered() override; + + uint32_t getFinalSuffix() override; + + private: + void onUntrustedManifest(core::Interest::Ptr &&interest, + core::ContentObject::Ptr &&content_object); + void onUntrustedContentObject(core::Interest::Ptr &&interest, + core::ContentObject::Ptr &&content_object); + void processTrustedManifest(core::ContentObject::Ptr &&content_object); + void onManifestReceived(core::Interest::Ptr &&i, + core::ContentObject::Ptr &&c); + void onManifestTimeout(core::Interest::Ptr &&i); + VerificationPolicy verifyContentObject( + const HashEntry &manifest_hash, + const core::ContentObject &content_object); + bool checkUnverifiedSegments(std::uint32_t suffix, const HashEntry &hash); + + protected: + std::unique_ptr<utils::SuffixStrategy> suffix_strategy_; + SuffixQueue suffix_queue_; + + // Hash verification + std::unordered_map<uint32_t, HashEntry> suffix_hash_map_; + + std::unordered_map<uint32_t, + std::pair<core::Interest::Ptr, core::ContentObject::Ptr>> + unverified_segments_; +}; + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc b/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc deleted file mode 100644 index ea13bf9e6..000000000 --- a/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc +++ /dev/null @@ -1,293 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <hicn/transport/interfaces/socket_consumer.h> -#include <hicn/transport/protocols/manifest_indexing_manager.h> - -#include <cmath> -#include <deque> - -namespace transport { - -namespace protocol { - -using namespace interface; - -ManifestIndexManager::ManifestIndexManager( - interface::ConsumerSocket *icn_socket, TransportProtocol *next_interest) - : IncrementalIndexManager(icn_socket), - PacketManager<Interest>(1024), - next_to_retrieve_segment_(suffix_queue_.end()), - suffix_manifest_(core::NextSegmentCalculationStrategy::INCREMENTAL, 0), - next_reassembly_segment_( - core::NextSegmentCalculationStrategy::INCREMENTAL, 1, true), - ignored_segments_(), - next_interest_(next_interest) {} - -bool ManifestIndexManager::onManifest( - core::ContentObject::Ptr &&content_object) { - auto manifest = - std::make_unique<ContentObjectManifest>(std::move(*content_object)); - bool manifest_verified = verification_manager_->onPacketToVerify(*manifest); - - if (manifest_verified) { - manifest->decode(); - - if (TRANSPORT_EXPECT_FALSE(manifest->getVersion() != - core::ManifestVersion::VERSION_1)) { - throw errors::RuntimeException("Received manifest with unknown version."); - } - - switch (manifest->getManifestType()) { - case core::ManifestType::INLINE_MANIFEST: { - auto _it = manifest->getSuffixList().begin(); - auto _end = manifest->getSuffixList().end(); - size_t nb_segments = std::distance(_it, _end); - final_suffix_ = manifest->getFinalBlockNumber(); // final block number - - suffix_hash_map_[_it->first] = - std::make_pair(std::vector<uint8_t>(_it->second, _it->second + 32), - manifest->getHashAlgorithm()); - suffix_queue_.push_back(_it->first); - - // If the transport protocol finished the list of segments to retrieve, - // reset the next_to_retrieve_segment_ iterator to the next segment - // provided by this manifest. - if (TRANSPORT_EXPECT_FALSE(next_to_retrieve_segment_ == - suffix_queue_.end())) { - next_to_retrieve_segment_ = --suffix_queue_.end(); - } - - std::advance(_it, 1); - for (; _it != _end; _it++) { - suffix_hash_map_[_it->first] = std::make_pair( - std::vector<uint8_t>(_it->second, _it->second + 32), - manifest->getHashAlgorithm()); - suffix_queue_.push_back(_it->first); - } - - if (TRANSPORT_EXPECT_FALSE(manifest->getName().getSuffix()) == 0) { - core::NextSegmentCalculationStrategy strategy = - manifest->getNextSegmentCalculationStrategy(); - - suffix_manifest_.reset(0); - suffix_manifest_.setNbSegments(nb_segments); - suffix_manifest_.setSuffixStrategy(strategy); - TRANSPORT_LOGD("Capacity of 1st manifest %zu", - suffix_manifest_.getNbSegments()); - - next_reassembly_segment_.reset(*suffix_queue_.begin()); - next_reassembly_segment_.setNbSegments(nb_segments); - suffix_manifest_.setSuffixStrategy(strategy); - } - - // If the manifest is not full, we add the suffixes of missing segments - // to the list of segments to ignore when computing the next reassembly - // index. - if (TRANSPORT_EXPECT_FALSE( - suffix_manifest_.getNbSegments() - nb_segments > 0)) { - auto start = manifest->getSuffixList().begin(); - auto last = --_end; - for (uint32_t i = last->first + 1; - i < start->first + suffix_manifest_.getNbSegments(); i++) { - ignored_segments_.push_back(i); - } - } - - if (TRANSPORT_EXPECT_FALSE(manifest->isFinalManifest()) == 0) { - fillWindow(manifest->getWritableName(), - manifest->getName().getSuffix()); - } - - break; - } - case core::ManifestType::FLIC_MANIFEST: { - throw errors::NotImplementedException(); - } - case core::ManifestType::FINAL_CHUNK_NUMBER: { - throw errors::NotImplementedException(); - } - } - } - - return manifest_verified; -} - -void ManifestIndexManager::onManifestReceived(Interest::Ptr &&i, - ContentObject::Ptr &&c) { - onManifest(std::move(c)); - if (next_interest_) { - next_interest_->scheduleNextInterests(); - } -} - -void ManifestIndexManager::onManifestTimeout(Interest::Ptr &&i) { - const Name &n = i->getName(); - uint32_t segment = n.getSuffix(); - - if (segment > final_suffix_) { - return; - } - - // Get portal - std::shared_ptr<interface::BasePortal> portal; - socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal); - - // Send requests for manifest out of the congestion window (no - // in_flight_interests++) - portal->sendInterest( - std::move(i), - std::bind(&ManifestIndexManager::onManifestReceived, this, - std::placeholders::_1, std::placeholders::_2), - std::bind(&ManifestIndexManager::onManifestTimeout, this, - std::placeholders::_1)); -} - -void ManifestIndexManager::fillWindow(Name &name, uint32_t current_manifest) { - /* Send as many manifest as required for filling window. */ - uint32_t interest_lifetime; - double window_size; - std::shared_ptr<interface::BasePortal> portal; - Interest::Ptr interest; - uint32_t current_segment = *next_to_retrieve_segment_; - // suffix_manifest_ now points to the next manifest to request - uint32_t last_requested_manifest = (suffix_manifest_++).getSuffix(); - - socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal); - socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, - interest_lifetime); - socket_->getSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE, - window_size); - - if (TRANSPORT_EXPECT_FALSE(suffix_manifest_.getSuffix() >= final_suffix_)) { - suffix_manifest_.updateSuffix(last_requested_manifest); - return; - } - - if (current_segment + window_size < suffix_manifest_.getSuffix() && - current_manifest != last_requested_manifest) { - suffix_manifest_.updateSuffix(last_requested_manifest); - return; - } - - do { - interest = getPacket(); - name.setSuffix(suffix_manifest_.getSuffix()); - interest->setName(name); - interest->setLifetime(interest_lifetime); - - // Send interests for manifest out of the congestion window (no - // in_flight_interests++) - portal->sendInterest( - std::move(interest), - std::bind(&ManifestIndexManager::onManifestReceived, this, - std::placeholders::_1, std::placeholders::_2), - std::bind(&ManifestIndexManager::onManifestTimeout, this, - std::placeholders::_1)); - - last_requested_manifest = (suffix_manifest_++).getSuffix(); - } while (current_segment + window_size >= suffix_manifest_.getSuffix() && - suffix_manifest_.getSuffix() < final_suffix_); - - // suffix_manifest_ now points to the last requested manifest - suffix_manifest_.updateSuffix(last_requested_manifest); -} - -bool ManifestIndexManager::onContentObject( - const core::ContentObject &content_object) { - bool verify_signature; - socket_->getSocketOption(GeneralTransportOptions::VERIFY_SIGNATURE, - verify_signature); - - if (!verify_signature) { - return true; - } - - uint64_t segment = content_object.getName().getSuffix(); - - bool ret = false; - - auto it = suffix_hash_map_.find((const unsigned int)segment); - if (it != suffix_hash_map_.end()) { - auto hash_type = static_cast<utils::CryptoHashType>(it->second.second); - auto data_packet_digest = content_object.computeDigest(it->second.second); - auto data_packet_digest_bytes = - data_packet_digest.getDigest<uint8_t>().data(); - std::vector<uint8_t> &manifest_digest_bytes = it->second.first; - - if (utils::CryptoHash::compareBinaryDigest(data_packet_digest_bytes, - manifest_digest_bytes.data(), - hash_type)) { - suffix_hash_map_.erase(it); - ret = true; - } else { - throw errors::RuntimeException( - "Verification failure policy has to be implemented."); - } - } - - return ret; -} - -uint32_t ManifestIndexManager::getNextSuffix() { - if (TRANSPORT_EXPECT_FALSE(next_to_retrieve_segment_ == - suffix_queue_.end())) { - return invalid_index; - } - - return *next_to_retrieve_segment_++; -} - -uint32_t ManifestIndexManager::getFinalSuffix() { return final_suffix_; } - -bool ManifestIndexManager::isFinalSuffixDiscovered() { - return IncrementalIndexManager::isFinalSuffixDiscovered(); -} - -uint32_t ManifestIndexManager::getNextReassemblySegment() { - uint32_t current_reassembly_segment; - - while (true) { - current_reassembly_segment = next_reassembly_segment_.getSuffix(); - next_reassembly_segment_++; - - if (TRANSPORT_EXPECT_FALSE(current_reassembly_segment > final_suffix_)) { - return invalid_index; - } - - if (ignored_segments_.empty()) break; - - auto is_ignored = - std::find(ignored_segments_.begin(), ignored_segments_.end(), - current_reassembly_segment); - - if (is_ignored == ignored_segments_.end()) break; - - ignored_segments_.erase(is_ignored); - } - - return current_reassembly_segment; -} - -void ManifestIndexManager::reset() { - IncrementalIndexManager::reset(); - suffix_manifest_.reset(0); - suffix_queue_.clear(); - suffix_hash_map_.clear(); -} - -} // end namespace protocol - -} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.h b/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.h deleted file mode 100644 index 645b20e9a..000000000 --- a/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.h +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include <hicn/transport/interfaces/socket.h> -#include <hicn/transport/protocols/indexing_manager.h> -#include <hicn/transport/utils/suffix_strategy.h> - -#include <list> - -namespace transport { - -namespace protocol { - -class ManifestIndexManager : public IncrementalIndexManager, - public PacketManager<Interest> { - static constexpr double alpha = 0.3; - - public: - using SuffixQueue = std::list<uint32_t>; - using HashEntry = std::pair<std::vector<uint8_t>, core::HashAlgorithm>; - - ManifestIndexManager(interface::ConsumerSocket *icn_socket, - TransportProtocol *next_interest); - - virtual ~ManifestIndexManager() = default; - - void reset() override; - - bool onManifest(core::ContentObject::Ptr &&content_object) override; - - bool onContentObject(const core::ContentObject &content_object) override; - - uint32_t getNextSuffix() override; - - uint32_t getNextReassemblySegment() override; - - bool isFinalSuffixDiscovered() override; - - uint32_t getFinalSuffix() override; - - private: - void onManifestReceived(Interest::Ptr &&i, ContentObject::Ptr &&c); - void onManifestTimeout(Interest::Ptr &&i); - void fillWindow(Name &name, uint32_t current_manifest); - - protected: - SuffixQueue suffix_queue_; - SuffixQueue::iterator next_to_retrieve_segment_; - utils::SuffixManifest suffix_manifest_; - utils::SuffixContent next_reassembly_segment_; - - // Holds segments that should not be requested. Useful when - // computing the next reassembly segment because some manifests - // may be incomplete. - std::vector<uint32_t> ignored_segments_; - - // Hash verification - std::unordered_map<uint32_t, - std::pair<std::vector<uint8_t>, core::HashAlgorithm>> - suffix_hash_map_; - - // (temporary) To call scheduleNextInterests() after receiving a manifest - TransportProtocol *next_interest_; -}; - -} // end namespace protocol - -} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/protocol.cc b/libtransport/src/hicn/transport/protocols/protocol.cc index 8da9529d6..a0f847453 100644 --- a/libtransport/src/hicn/transport/protocols/protocol.cc +++ b/libtransport/src/hicn/transport/protocols/protocol.cc @@ -22,9 +22,16 @@ namespace protocol { using namespace interface; -TransportProtocol::TransportProtocol(interface::ConsumerSocket *icn_socket) - : socket_(icn_socket), is_running_(false), is_first_(false) { +TransportProtocol::TransportProtocol(interface::ConsumerSocket *icn_socket, + Reassembly *reassembly_protocol) + : socket_(icn_socket), + reassembly_protocol_(reassembly_protocol), + index_manager_( + std::make_unique<IndexManager>(socket_, this, reassembly_protocol)), + is_running_(false), + is_first_(false) { socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal_); + socket_->getSocketOption(OtherOptions::STATISTICS, &stats_); } int TransportProtocol::start() { @@ -71,6 +78,26 @@ void TransportProtocol::resume() { is_running_ = false; } +void TransportProtocol::onContentReassembled(std::error_code ec) { + interface::ConsumerSocket::ReadCallback *on_payload = VOID_HANDLER; + socket_->getSocketOption(READ_CALLBACK, &on_payload); + + if (!on_payload) { + throw errors::RuntimeException( + "The read callback must be installed in the transport before " + "starting " + "the content retrieval."); + } + + if (!ec) { + on_payload->readSuccess(stats_->getBytesRecv()); + } else { + on_payload->readError(ec); + } + + stop(); +} + } // end namespace protocol } // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/protocol.h b/libtransport/src/hicn/transport/protocols/protocol.h index e4821b6a0..87fab588b 100644 --- a/libtransport/src/hicn/transport/protocols/protocol.h +++ b/libtransport/src/hicn/transport/protocols/protocol.h @@ -18,7 +18,10 @@ #include <atomic> #include <hicn/transport/interfaces/socket.h> +#include <hicn/transport/protocols/data_processing_events.h> +#include <hicn/transport/protocols/indexer.h> #include <hicn/transport/protocols/packet_manager.h> +#include <hicn/transport/protocols/reassembly.h> #include <hicn/transport/protocols/statistics.h> #include <hicn/transport/utils/object_pool.h> @@ -28,6 +31,8 @@ namespace protocol { using namespace core; +class IndexVerificationManager; + class TransportProtocolCallback { virtual void onContentObject(const core::Interest &interest, const core::ContentObject &content_object) = 0; @@ -35,11 +40,15 @@ class TransportProtocolCallback { }; class TransportProtocol : public interface::BasePortal::ConsumerCallback, - public PacketManager<Interest> { + public PacketManager<Interest>, + public ContentObjectProcessingEventCallback { static constexpr std::size_t interest_pool_size = 4096; + friend class ManifestIndexManager; + public: - TransportProtocol(interface::ConsumerSocket *icn_socket); + TransportProtocol(interface::ConsumerSocket *icn_socket, + Reassembly *reassembly_protocol); virtual ~TransportProtocol() = default; @@ -53,6 +62,12 @@ class TransportProtocol : public interface::BasePortal::ConsumerCallback, virtual void scheduleNextInterests() = 0; + // Events generated by the indexing + virtual void onContentReassembled(std::error_code ec); + virtual void onPacketDropped(Interest::Ptr &&interest, + ContentObject::Ptr &&content_object) = 0; + virtual void onReassemblyFailed(std::uint32_t missing_segment) = 0; + protected: // Consumer Callback virtual void reset() = 0; @@ -61,13 +76,14 @@ class TransportProtocol : public interface::BasePortal::ConsumerCallback, protected: interface::ConsumerSocket *socket_; + std::unique_ptr<Reassembly> reassembly_protocol_; + std::unique_ptr<IndexManager> index_manager_; std::shared_ptr<interface::BasePortal> portal_; std::atomic<bool> is_running_; // True if it si the first time we schedule an interest std::atomic<bool> is_first_; - TransportStatistics stats_; + TransportStatistics *stats_; }; } // end namespace protocol - } // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/raaqm.cc b/libtransport/src/hicn/transport/protocols/raaqm.cc index a57eb7cd9..641ae45c3 100644 --- a/libtransport/src/hicn/transport/protocols/raaqm.cc +++ b/libtransport/src/hicn/transport/protocols/raaqm.cc @@ -14,8 +14,9 @@ */ #include <hicn/transport/interfaces/socket_consumer.h> -#include <hicn/transport/protocols/manifest_indexing_manager.h> +#include <hicn/transport/protocols/indexer.h> #include <hicn/transport/protocols/raaqm.h> +#include <hicn/transport/protocols/errors.h> #include <cstdlib> #include <fstream> @@ -26,9 +27,8 @@ namespace protocol { using namespace interface; -RaaqmTransportProtocol::RaaqmTransportProtocol(ConsumerSocket *icnet_socket) - : TransportProtocol(icnet_socket), - BaseReassembly(icnet_socket, this, this), +RaaqmTransportProtocol::RaaqmTransportProtocol(ConsumerSocket *icn_socket) + : TransportProtocol(icn_socket, new ByteStreamReassembly(icn_socket, this)), current_window_size_(1), interests_in_flight_(0), cur_path_(nullptr), @@ -101,13 +101,14 @@ void RaaqmTransportProtocol::reset() { // Set first segment to retrieve core::Name *name; socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name); + index_manager_->reset(); index_manager_->setFirstSuffix(name->getSuffix()); std::queue<Interest::Ptr> empty; std::swap(interest_to_retransmit_, empty); - stats_.reset(); + stats_->reset(); // Reset reassembly component - BaseReassembly::reset(); + reassembly_protocol_->reInitialize(); // Reset protocol variables interests_in_flight_ = 0; @@ -309,8 +310,6 @@ void RaaqmTransportProtocol::init() { void RaaqmTransportProtocol::onContentObject( Interest::Ptr &&interest, ContentObject::Ptr &&content_object) { - uint32_t incremental_suffix = content_object->getName().getSuffix(); - // Check whether makes sense to continue if (TRANSPORT_EXPECT_FALSE(!is_running_)) { return; @@ -331,27 +330,17 @@ void RaaqmTransportProtocol::onContentObject( (*callback_interest)(*socket_, *interest); } - if (TRANSPORT_EXPECT_FALSE(content_object->getPayloadType() == - PayloadType::MANIFEST)) { - if (TRANSPORT_EXPECT_FALSE(incremental_suffix == 0)) { - index_manager_ = manifest_index_manager_.get(); - interests_in_flight_--; - } - - index_manager_->onManifest(std::move(content_object)); - - } else if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) { - onContentSegment(std::move(interest), std::move(content_object)); + if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) { + stats_->updateBytesRecv(content_object->payloadSize()); } + onContentSegment(std::move(interest), std::move(content_object)); scheduleNextInterests(); } void RaaqmTransportProtocol::onContentSegment( Interest::Ptr &&interest, ContentObject::Ptr &&content_object) { uint32_t incremental_suffix = content_object->getName().getSuffix(); - bool virtual_download = false; - socket_->getSocketOption(OtherOptions::VIRTUAL_DOWNLOAD, virtual_download); // Decrease in-flight interests interests_in_flight_--; @@ -361,28 +350,55 @@ void RaaqmTransportProtocol::onContentSegment( afterContentReception(*interest, *content_object); } - if (index_manager_->onContentObject(*content_object)) { - stats_.updateBytesRecv(content_object->payloadSize()); + index_manager_->onContentObject(std::move(interest), + std::move(content_object)); +} - if (!virtual_download) { - reassemble(std::move(content_object)); - } else if (TRANSPORT_EXPECT_FALSE(incremental_suffix == - index_manager_->getFinalSuffix())) { - interface::ConsumerSocket::ReadCallback *on_payload = VOID_HANDLER; - socket_->getSocketOption(READ_CALLBACK, &on_payload); +void RaaqmTransportProtocol::onPacketDropped( + Interest::Ptr &&interest, ContentObject::Ptr &&content_object) { + uint32_t max_rtx = 0; + socket_->getSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, max_rtx); - if (on_payload) { - on_payload->readSuccess(stats_.getBytesRecv()); - } + uint64_t segment = interest->getName().getSuffix(); + ConsumerInterestCallback *callback = VOID_HANDLER; + if (TRANSPORT_EXPECT_TRUE(interest_retransmissions_[segment & mask] < + max_rtx)) { + stats_->updateRetxCount(1); + + callback = VOID_HANDLER; + socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION, + &callback); + if (*callback) { + (*callback)(*socket_, *interest); } + + callback = VOID_HANDLER; + socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, + &callback); + if (*callback) { + (*callback)(*socket_, *interest); + } + + if (!is_running_) { + return; + } + + interest_retransmissions_[segment & mask]++; + + interest_to_retransmit_.push(std::move(interest)); } else { - // TODO Application policy check - // unverified_segments_.emplace( - // std::make_pair(incremental_suffix, std::move(content_object))); - TRANSPORT_LOGE("Received not trusted segment."); + TRANSPORT_LOGE( + "Stop: received not trusted packet %llu times", + (unsigned long long)interest_retransmissions_[segment & mask]); + onContentReassembled( + make_error_code(protocol_error::max_retransmissions_error)); } } +void RaaqmTransportProtocol::onReassemblyFailed(std::uint32_t missing_segment) { + +} + void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) { checkForStalePaths(); @@ -399,7 +415,7 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) { uint64_t segment = n.getSuffix(); // Do not retransmit interests asking contents that do not exist. - if (segment >= index_manager_->getFinalSuffix()) { + if (segment > index_manager_->getFinalSuffix()) { return; } @@ -417,7 +433,7 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) { if (TRANSPORT_EXPECT_TRUE(interest_retransmissions_[segment & mask] < max_rtx)) { - stats_.updateRetxCount(1); + stats_->updateRetxCount(1); callback = VOID_HANDLER; socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION, @@ -515,24 +531,8 @@ void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) { } void RaaqmTransportProtocol::onContentReassembled(std::error_code ec) { - interface::ConsumerSocket::ReadCallback *on_payload = VOID_HANDLER; - socket_->getSocketOption(READ_CALLBACK, &on_payload); - - if (!on_payload) { - throw errors::RuntimeException( - "The read callback must be installed in the transport before " - "starting " - "the content retrieval."); - } - - if (!ec) { - on_payload->readSuccess(stats_.getBytesRecv()); - } else { - on_payload->readError(ec); - } - rate_estimator_->onDownloadFinished(); - stop(); + TransportProtocol::onContentReassembled(ec); } void RaaqmTransportProtocol::updateRtt(uint64_t segment) { @@ -567,7 +567,7 @@ void RaaqmTransportProtocol::RAAQM() { // Change drop probability according to RTT statistics cur_path_->updateDropProb(); - double coin = ((double) rand() / (RAND_MAX)); + double coin = ((double)rand() / (RAND_MAX)); if (coin <= cur_path_->getDropProb()) { decreaseWindow(); } @@ -577,8 +577,8 @@ void RaaqmTransportProtocol::RAAQM() { void RaaqmTransportProtocol::updateStats(uint32_t suffix, uint64_t rtt, utils::TimePoint &now) { // Update RTT statistics - stats_.updateAverageRtt(rtt); - stats_.updateAverageWindowSize(current_window_size_); + stats_->updateAverageRtt(rtt); + stats_->updateAverageWindowSize(current_window_size_); // Call statistics callback ConsumerTimerCallback *stats_callback = VOID_HANDLER; @@ -591,7 +591,7 @@ void RaaqmTransportProtocol::updateStats(uint32_t suffix, uint64_t rtt, socket_->getSocketOption(GeneralTransportOptions::STATS_INTERVAL, timer_interval_milliseconds); if (dt.count() > timer_interval_milliseconds) { - (*stats_callback)(*socket_, stats_); + (*stats_callback)(*socket_, *stats_); t0_ = utils::SteadyClock::now(); } } diff --git a/libtransport/src/hicn/transport/protocols/raaqm.h b/libtransport/src/hicn/transport/protocols/raaqm.h index 09d22cd4f..7fc540c9f 100644 --- a/libtransport/src/hicn/transport/protocols/raaqm.h +++ b/libtransport/src/hicn/transport/protocols/raaqm.h @@ -15,11 +15,11 @@ #pragma once +#include <hicn/transport/protocols/byte_stream_reassembly.h> #include <hicn/transport/protocols/congestion_window_protocol.h> #include <hicn/transport/protocols/protocol.h> #include <hicn/transport/protocols/raaqm_data_path.h> #include <hicn/transport/protocols/rate_estimation.h> -#include <hicn/transport/protocols/reassembly.h> #include <hicn/transport/utils/chrono_typedefs.h> #include <queue> @@ -29,11 +29,8 @@ namespace transport { namespace protocol { -class RaaqmTransportProtocol - : public TransportProtocol, - public BaseReassembly, - public CWindowProtocol, - public BaseReassembly::ContentReassembledCallback { +class RaaqmTransportProtocol : public TransportProtocol, + public CWindowProtocol { public: RaaqmTransportProtocol(interface::ConsumerSocket *icnet_socket); @@ -70,6 +67,11 @@ class RaaqmTransportProtocol void onContentSegment(Interest::Ptr &&interest, ContentObject::Ptr &&content_object); + void onPacketDropped(Interest::Ptr &&interest, + ContentObject::Ptr &&content_object) override; + + void onReassemblyFailed(std::uint32_t missing_segment) override; + void onTimeout(Interest::Ptr &&i) override; virtual void scheduleNextInterests() override; diff --git a/libtransport/src/hicn/transport/protocols/reassembly.cc b/libtransport/src/hicn/transport/protocols/reassembly.cc index c45d876a0..9682d338d 100644 --- a/libtransport/src/hicn/transport/protocols/reassembly.cc +++ b/libtransport/src/hicn/transport/protocols/reassembly.cc @@ -14,7 +14,8 @@ */ #include <hicn/transport/interfaces/socket_consumer.h> -#include <hicn/transport/protocols/indexing_manager.h> +#include <hicn/transport/protocols/errors.h> +#include <hicn/transport/protocols/indexer.h> #include <hicn/transport/protocols/reassembly.h> #include <hicn/transport/utils/array.h> #include <hicn/transport/utils/membuf.h> @@ -23,66 +24,7 @@ namespace transport { namespace protocol { -BaseReassembly::BaseReassembly(interface::ConsumerSocket *icn_socket, - ContentReassembledCallback *content_callback, - TransportProtocol *next_interest) - : reassembly_consumer_socket_(icn_socket), - incremental_index_manager_( - std::make_unique<IncrementalIndexManager>(icn_socket)), - manifest_index_manager_( - std::make_unique<ManifestIndexManager>(icn_socket, next_interest)), - index_manager_(incremental_index_manager_.get()), - index_(0), - read_buffer_(nullptr) { - setContentCallback(content_callback); -} - -void BaseReassembly::reassemble(ContentObject::Ptr &&content_object) { - if (TRANSPORT_EXPECT_TRUE(content_object != nullptr)) { - received_packets_.emplace(std::make_pair( - content_object->getName().getSuffix(), std::move(content_object))); - } - - auto it = received_packets_.find((const unsigned int)index_); - while (it != received_packets_.end()) { - if (it->second->getPayloadType() == PayloadType::CONTENT_OBJECT) { - copyContent(*it->second); - received_packets_.erase(it); - } - - index_ = index_manager_->getNextReassemblySegment(); - it = received_packets_.find((const unsigned int)index_); - } -} - -void BaseReassembly::copyContent(const ContentObject &content_object) { - auto a = content_object.getPayload(); - auto payload_length = a->length(); - auto write_size = std::min(payload_length, read_buffer_->tailroom()); - auto additional_bytes = payload_length > read_buffer_->tailroom() - ? payload_length - read_buffer_->tailroom() - : 0; - - std::memcpy(read_buffer_->writableTail(), a->data(), write_size); - read_buffer_->append(write_size); - - if (!read_buffer_->tailroom()) { - notifyApplication(); - std::memcpy(read_buffer_->writableTail(), a->data() + write_size, - additional_bytes); - read_buffer_->append(additional_bytes); - } - - bool download_completed = - index_manager_->getFinalSuffix() == content_object.getName().getSuffix(); - - if (TRANSPORT_EXPECT_FALSE(download_completed)) { - notifyApplication(); - content_callback_->onContentReassembled(std::make_error_code(std::errc(0))); - } -} - -void BaseReassembly::notifyApplication() { +void Reassembly::notifyApplication() { interface::ConsumerSocket::ReadCallback *read_callback = nullptr; reassembly_consumer_socket_->getSocketOption( interface::ConsumerCallbacksOptions::READ_CALLBACK, &read_callback); @@ -124,21 +66,5 @@ void BaseReassembly::notifyApplication() { } } -void BaseReassembly::reset() { - manifest_index_manager_->reset(); - incremental_index_manager_->reset(); - index_ = index_manager_->getNextReassemblySegment(); - - received_packets_.clear(); - - // reset read buffer - interface::ConsumerSocket::ReadCallback *read_callback; - reassembly_consumer_socket_->getSocketOption( - interface::ConsumerCallbacksOptions::READ_CALLBACK, &read_callback); - - read_buffer_ = utils::MemBuf::create(read_callback->maxBufferSize()); -} - } // namespace protocol - } // namespace transport diff --git a/libtransport/src/hicn/transport/protocols/reassembly.h b/libtransport/src/hicn/transport/protocols/reassembly.h index e859ca294..34af2a70a 100644 --- a/libtransport/src/hicn/transport/protocols/reassembly.h +++ b/libtransport/src/hicn/transport/protocols/reassembly.h @@ -15,17 +15,20 @@ #pragma once -#include <hicn/transport/core/content_object.h> -#include <hicn/transport/protocols/manifest_indexing_manager.h> +#include <hicn/transport/core/facade.h> namespace transport { namespace interface { class ConsumerReadCallback; -} +class ConsumerSocket; +} // namespace interface namespace protocol { +class TransportProtocol; +class Indexer; + // Forward Declaration class ManifestManager; @@ -36,41 +39,26 @@ class Reassembly { virtual void onContentReassembled(std::error_code ec) = 0; }; - virtual void reassemble(ContentObject::Ptr &&content_object) = 0; - virtual void reset() = 0; - virtual void setContentCallback(ContentReassembledCallback *callback) { - content_callback_ = callback; - } + Reassembly(interface::ConsumerSocket *icn_socket, + TransportProtocol *transport_protocol) + : reassembly_consumer_socket_(icn_socket), + transport_protocol_(transport_protocol) {} - protected: - ContentReassembledCallback *content_callback_; -}; + virtual ~Reassembly() = default; -class BaseReassembly : public Reassembly { - public: - BaseReassembly(interface::ConsumerSocket *icn_socket, - ContentReassembledCallback *content_callback, - TransportProtocol *next_interest); + virtual void reassemble(core::ContentObject::Ptr &&content_object) = 0; + virtual void reassemble( + std::unique_ptr<core::ContentObjectManifest> &&manifest) = 0; + virtual void reInitialize() = 0; + virtual void setIndexer(Indexer *indexer) { index_manager_ = indexer; } protected: - virtual void reassemble(ContentObject::Ptr &&content_object) override; - - virtual void copyContent(const ContentObject &content_object); - - virtual void reset() override; - - private: - void notifyApplication(); + virtual void notifyApplication(); protected: - // The consumer socket interface::ConsumerSocket *reassembly_consumer_socket_; - std::unique_ptr<IncrementalIndexManager> incremental_index_manager_; - std::unique_ptr<ManifestIndexManager> manifest_index_manager_; - IndexVerificationManager *index_manager_; - std::unordered_map<std::uint32_t, ContentObject::Ptr> received_packets_; - - uint32_t index_; + TransportProtocol *transport_protocol_; + Indexer *index_manager_; std::unique_ptr<utils::MemBuf> read_buffer_; }; diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc index 559e86592..e371217f8 100644 --- a/libtransport/src/hicn/transport/protocols/rtc.cc +++ b/libtransport/src/hicn/transport/protocols/rtc.cc @@ -13,11 +13,12 @@ * limitations under the License. */ -#include <math.h> -#include <random> +#include <hicn/transport/protocols/rtc.h> #include <hicn/transport/interfaces/socket_consumer.h> -#include <hicn/transport/protocols/rtc.h> + +#include <math.h> +#include <random> namespace transport { @@ -26,14 +27,16 @@ namespace protocol { using namespace interface; RTCTransportProtocol::RTCTransportProtocol( - interface::ConsumerSocket *icnet_socket) - : TransportProtocol(icnet_socket), + interface::ConsumerSocket *icn_socket) + : TransportProtocol(icn_socket, nullptr), + DatagramReassembly(icn_socket, this), inflightInterests_(1 << default_values::log_2_default_buffer_size), modMask_((1 << default_values::log_2_default_buffer_size) - 1) { - icnet_socket->getSocketOption(PORTAL, portal_); + icn_socket->getSocketOption(PORTAL, portal_); rtx_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService()); probe_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService()); - sentinel_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService()); + sentinel_timer_ = + std::make_unique<asio::steady_timer>(portal_->getIoService()); round_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService()); reset(); } @@ -147,8 +150,7 @@ uint32_t min(uint32_t a, uint32_t b) { } void RTCTransportProtocol::newRound() { - round_timer_->expires_from_now(std::chrono::milliseconds( - HICN_ROUND_LEN)); + round_timer_->expires_from_now(std::chrono::milliseconds(HICN_ROUND_LEN)); round_timer_->async_wait([this](std::error_code ec) { if (ec) return; updateStats(HICN_ROUND_LEN); @@ -281,10 +283,10 @@ void RTCTransportProtocol::updateStats(uint32_t round_duration) { &stats_callback); if (*stats_callback) { // Send the stats to the app - stats_.updateQueuingDelay(queuingDelay_); - stats_.updateLossRatio(lossRate_); - stats_.updateAverageRtt(pathTable_[producerPathLabels_[1]]->getMinRtt()); - (*stats_callback)(*socket_, stats_); + stats_->updateQueuingDelay(queuingDelay_); + stats_->updateLossRatio(lossRate_); + stats_->updateAverageRtt(pathTable_[producerPathLabels_[1]]->getMinRtt()); + (*stats_callback)(*socket_, *stats_); } // bound also by interest lifitime* production rate @@ -301,9 +303,9 @@ void RTCTransportProtocol::updateStats(uint32_t round_duration) { updateCCState(); updateWindow(); - if(queuingDelay_ > 25.0){ - //this indicates that the client will go soon out of synch, - //switch to synch mode + if (queuingDelay_ > 25.0) { + // this indicates that the client will go soon out of synch, + // switch to synch mode if (currentState_ == HICN_RTC_NORMAL_STATE) { currentState_ = HICN_RTC_SYNC_STATE; } @@ -358,8 +360,7 @@ void RTCTransportProtocol::computeMaxWindow(uint32_t productionRate, maxCWin_ = min(maxWaintingInterest, maxCWin_); } - if(maxCWin_ < HICN_MIN_CWIN) - maxCWin_ = HICN_MIN_CWIN; + if (maxCWin_ < HICN_MIN_CWIN) maxCWin_ = HICN_MIN_CWIN; } void RTCTransportProtocol::updateWindow() { @@ -518,68 +519,64 @@ void RTCTransportProtocol::scheduleNextInterests() { } } -void RTCTransportProtocol::sentinelTimer(){ +void RTCTransportProtocol::sentinelTimer() { uint32_t wait = 50; - if(pathTable_.find(producerPathLabels_[0]) != pathTable_.end() && - pathTable_.find(producerPathLabels_[1]) != pathTable_.end()){ - //we have all the info to set the timers + if (pathTable_.find(producerPathLabels_[0]) != pathTable_.end() && + pathTable_.find(producerPathLabels_[1]) != pathTable_.end()) { + // we have all the info to set the timers wait = round(pathTable_[producerPathLabels_[0]]->getInterArrivalGap()); - if(wait == 0) - wait = 1; + if (wait == 0) wait = 1; } sentinel_timer_->expires_from_now(std::chrono::milliseconds(wait)); sentinel_timer_->async_wait([this](std::error_code ec) { - if (ec) return; - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); - - if(pathTable_.find(producerPathLabels_[0]) == pathTable_.end() || - pathTable_.find(producerPathLabels_[1]) == pathTable_.end()){ - //we have no info, so we send again - - for(auto it = packets_in_window_.begin(); - it != packets_in_window_.end(); it++){ - uint32_t pkt = it->first & modMask_; - if (inflightInterests_[pkt].sequence == it->first) { - inflightInterests_[pkt].transmissionTime = now; - Name *interest_name = nullptr; - socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, - &interest_name); - interest_name->setSuffix(it->first); - it->second++; - sendInterest(interest_name, true); - } + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + + if (pathTable_.find(producerPathLabels_[0]) == pathTable_.end() || + pathTable_.find(producerPathLabels_[1]) == pathTable_.end()) { + // we have no info, so we send again + + for (auto it = packets_in_window_.begin(); it != packets_in_window_.end(); + it++) { + uint32_t pkt = it->first & modMask_; + if (inflightInterests_[pkt].sequence == it->first) { + inflightInterests_[pkt].transmissionTime = now; + Name *interest_name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, + &interest_name); + interest_name->setSuffix(it->first); + it->second++; + sendInterest(interest_name, true); } - }else{ - uint64_t max_waiting_time = //wait at least 50ms - (pathTable_[producerPathLabels_[1]]->getMinRtt() - - pathTable_[producerPathLabels_[0]]->getMinRtt()) + - (ceil(pathTable_[producerPathLabels_[0]]->getInterArrivalGap()) * 50); + } + } else { + uint64_t max_waiting_time = // wait at least 50ms + (pathTable_[producerPathLabels_[1]]->getMinRtt() - + pathTable_[producerPathLabels_[0]]->getMinRtt()) + + (ceil(pathTable_[producerPathLabels_[0]]->getInterArrivalGap()) * 50); - if((currentState_ == HICN_RTC_NORMAL_STATE) && + if ((currentState_ == HICN_RTC_NORMAL_STATE) && (inflightInterestsCount_ >= currentCWin_) && - ((now - lastEvent_) > max_waiting_time) && - (lossRate_ >= 0.05)){ + ((now - lastEvent_) > max_waiting_time) && (lossRate_ >= 0.05)) { + uint64_t RTT = pathTable_[producerPathLabels_[1]]->getMinRtt(); - uint64_t RTT = pathTable_[producerPathLabels_[1]]->getMinRtt(); - - for(auto it = packets_in_window_.begin(); - it != packets_in_window_.end(); it++){ + for (auto it = packets_in_window_.begin(); + it != packets_in_window_.end(); it++) { uint32_t pkt = it->first & modMask_; if (inflightInterests_[pkt].sequence == it->first && - ((now - inflightInterests_[pkt].transmissionTime) >= RTT)){ - inflightInterests_[pkt].transmissionTime = now; - Name *interest_name = nullptr; - socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, - &interest_name); - interest_name->setSuffix(it->first); - it->second++; - sendInterest(interest_name, true); + ((now - inflightInterests_[pkt].transmissionTime) >= RTT)) { + inflightInterests_[pkt].transmissionTime = now; + Name *interest_name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, + &interest_name); + interest_name->setSuffix(it->first); + it->second++; + sendInterest(interest_name, true); } } } @@ -754,8 +751,8 @@ void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) { // and over until we get at least a packet inflightInterestsCount_--; lastEvent_ = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + std::chrono::steady_clock::now().time_since_epoch()) + .count(); packets_in_window_.erase(segmentNumber); scheduleNextInterests(); return; @@ -763,8 +760,8 @@ void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) { if (inflightInterests_[pkt].state == sent_) { lastEvent_ = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + std::chrono::steady_clock::now().time_since_epoch()) + .count(); packets_in_window_.erase(segmentNumber); inflightInterestsCount_--; } @@ -890,30 +887,29 @@ void RTCTransportProtocol::onContentObject( return; } - //check if the packet is a rtx + // check if the packet is a rtx bool is_rtx = false; - if(interestRetransmissions_.find(segmentNumber) != - interestRetransmissions_.end()){ + if (interestRetransmissions_.find(segmentNumber) != + interestRetransmissions_.end()) { is_rtx = true; - }else{ + } else { auto it_win = packets_in_window_.find(segmentNumber); - if(it_win != packets_in_window_.end() && - it_win->second != 0) - is_rtx = true; + if (it_win != packets_in_window_.end() && it_win->second != 0) + is_rtx = true; } if (payload_size == HICN_NACK_HEADER_SIZE) { if (inflightInterests_[pkt].state == sent_) { lastEvent_ = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + std::chrono::steady_clock::now().time_since_epoch()) + .count(); packets_in_window_.erase(segmentNumber); inflightInterestsCount_--; } bool old_nack = false; - if (!is_rtx){ + if (!is_rtx) { // this is not a retransmitted packet old_nack = onNack(*content_object, false); updateDelayStats(*content_object); @@ -924,8 +920,8 @@ void RTCTransportProtocol::onContentObject( // the nacked_ state is used only to avoid to decrease // inflightInterestsCount_ multiple times. In fact, every time that we // receive an event related to an interest (timeout, nacked, content) we - // cange the state. In this way we are sure that we do not decrease twice the - // counter + // cange the state. In this way we are sure that we do not decrease twice + // the counter if (old_nack) { inflightInterests_[pkt].state = lost_; interestRetransmissions_.erase(segmentNumber); @@ -942,13 +938,13 @@ void RTCTransportProtocol::onContentObject( if (inflightInterests_[pkt].state == sent_) { lastEvent_ = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + std::chrono::steady_clock::now().time_since_epoch()) + .count(); packets_in_window_.erase(segmentNumber); inflightInterestsCount_--; // packet sent without timeouts } - if (inflightInterests_[pkt].state == sent_ && !is_rtx){ + if (inflightInterests_[pkt].state == sent_ && !is_rtx) { // delay stats are computed only for non retransmitted data updateDelayStats(*content_object); } @@ -979,52 +975,6 @@ void RTCTransportProtocol::onContentObject( scheduleNextInterests(); } -void RTCTransportProtocol::returnContentToApplication( - const ContentObject &content_object) { - // return content to the user - auto read_buffer = content_object.getPayload(); - - read_buffer->trimStart(HICN_TIMESTAMP_SIZE); - - interface::ConsumerSocket::ReadCallback *read_callback = nullptr; - socket_->getSocketOption(READ_CALLBACK, &read_callback); - - if (read_callback == nullptr) { - throw errors::RuntimeException( - "The read callback must be installed in the transport before starting " - "the content retrieval."); - } - - if (read_callback->isBufferMovable()) { - read_callback->readBufferAvailable( - utils::MemBuf::copyBuffer(read_buffer->data(), read_buffer->length())); - } else { - // The buffer will be copied into the application-provided buffer - uint8_t *buffer; - std::size_t length; - std::size_t total_length = read_buffer->length(); - - while (read_buffer->length()) { - buffer = nullptr; - length = 0; - read_callback->getReadBuffer(&buffer, &length); - - if (!buffer || !length) { - throw errors::RuntimeException( - "Invalid buffer provided by the application."); - } - - auto to_copy = std::min(read_buffer->length(), length); - - std::memcpy(buffer, read_buffer->data(), to_copy); - read_buffer->trimStart(to_copy); - } - - read_callback->readDataAvailable(total_length); - read_buffer->clear(); - } -} - } // end namespace protocol } // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/rtc.h b/libtransport/src/hicn/transport/protocols/rtc.h index 2b9ed10a6..9e1731e96 100644 --- a/libtransport/src/hicn/transport/protocols/rtc.h +++ b/libtransport/src/hicn/transport/protocols/rtc.h @@ -15,12 +15,12 @@ #pragma once -#include <queue> #include <map> +#include <queue> #include <unordered_map> +#include <hicn/transport/protocols/datagram_reassembly.h> #include <hicn/transport/protocols/protocol.h> -#include <hicn/transport/protocols/reassembly.h> #include <hicn/transport/protocols/rtc_data_path.h> // algorithm state @@ -35,26 +35,27 @@ #define HICN_TIMESTAMP_SIZE 8 // bytes #define HICN_RTC_INTEREST_LIFETIME 1000 // ms -//rtt measurement -//normal interests for data goes from 0 to -//HICN_MIN_PROBE_SEQ, the rest is reserverd for -//probes +// rtt measurement +// normal interests for data goes from 0 to +// HICN_MIN_PROBE_SEQ, the rest is reserverd for +// probes #define HICN_MIN_PROBE_SEQ 0xefffffff #define HICN_MAX_PROBE_SEQ 0xffffffff // controller constant -#define HICN_ROUND_LEN 200 // ms interval of time on which - // we take decisions / measurements +#define HICN_ROUND_LEN \ + 200 // ms interval of time on which + // we take decisions / measurements #define HICN_MAX_RTX 10 #define HICN_MAX_RTX_SIZE 1024 #define HICN_MAX_RTX_MAX_AGE 10000 -#define HICN_MIN_RTT_WIN 30 // rounds -#define HICN_MIN_INTER_ARRIVAL_GAP 100 //ms +#define HICN_MIN_RTT_WIN 30 // rounds +#define HICN_MIN_INTER_ARRIVAL_GAP 100 // ms // cwin #define HICN_INITIAL_CWIN 1 // packets #define HICN_INITIAL_CWIN_MAX 100000 // packets -#define HICN_MIN_CWIN 10 // packets +#define HICN_MIN_CWIN 10 // packets #define HICN_WIN_INCREASE_FACTOR 1.5 #define HICN_WIN_DECREASE_FACTOR 0.9 @@ -70,30 +71,23 @@ #define HICN_MICRO_IN_A_SEC 1000000 #define HICN_MILLI_IN_A_SEC 1000 - namespace transport { namespace protocol { -enum packetState { - sent_, - nacked_, - received_, - timeout1_, - timeout2_, - lost_ -}; +enum packetState { sent_, nacked_, received_, timeout1_, timeout2_, lost_ }; typedef enum packetState packetState_t; struct sentInterest { uint64_t transmissionTime; - uint32_t sequence; //sequence number of the interest sent - //to handle seq % buffer_size - packetState_t state; //see packet state + uint32_t sequence; // sequence number of the interest sent + // to handle seq % buffer_size + packetState_t state; // see packet state }; -class RTCTransportProtocol : public TransportProtocol, public Reassembly { +class RTCTransportProtocol : public TransportProtocol, + public DatagramReassembly { public: RTCTransportProtocol(interface::ConsumerSocket *icnet_socket); @@ -133,11 +127,16 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly { bool onNack(const ContentObject &content_object, bool rtx); void onContentObject(Interest::Ptr &&interest, ContentObject::Ptr &&content_object) override; - void returnContentToApplication(const ContentObject &content_object); + void onPacketDropped(Interest::Ptr &&interest, + ContentObject::Ptr &&content_object) override {} + void onReassemblyFailed(std::uint32_t missing_segment) override {} TRANSPORT_ALWAYS_INLINE virtual void reassemble( ContentObject::Ptr &&content_object) override { - returnContentToApplication(*content_object); + auto read_buffer = content_object->getPayload(); + read_buffer->trimStart(HICN_TIMESTAMP_SIZE); + Reassembly::read_buffer_ = std::move(read_buffer); + Reassembly::notifyApplication(); } // controller var @@ -151,36 +150,36 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly { // names/packets var uint32_t actualSegment_; uint32_t inflightInterestsCount_; - //map seq to rtx + // map seq to rtx std::map<uint32_t, uint8_t> interestRetransmissions_; bool rtx_timer_used_; std::unique_ptr<asio::steady_timer> rtx_timer_; std::vector<sentInterest> inflightInterests_; - uint32_t lastSegNacked_; //indicates the segment id in the last received - // past Nack. we do not ask for retransmissions - //for samething that is older than this value. - uint32_t lastReceived_; //segment of the last content object received - //indicates the base of the window on the client - uint64_t lastReceivedTime_; //time at which we recevied the - //lastReceived_ packet - - //sentinel - //if all packets in the window get lost we need something that - //wakes up our consumer socket. Interest timeouts set to 1 sec - //expire too late. This timers expire much sooner and if it - //detects that all the interest in the window may be lost - //it sends all of them again + uint32_t lastSegNacked_; // indicates the segment id in the last received + // past Nack. we do not ask for retransmissions + // for samething that is older than this value. + uint32_t lastReceived_; // segment of the last content object received + // indicates the base of the window on the client + uint64_t lastReceivedTime_; // time at which we recevied the + // lastReceived_ packet + + // sentinel + // if all packets in the window get lost we need something that + // wakes up our consumer socket. Interest timeouts set to 1 sec + // expire too late. This timers expire much sooner and if it + // detects that all the interest in the window may be lost + // it sends all of them again std::unique_ptr<asio::steady_timer> sentinel_timer_; - uint64_t lastEvent_; //time at which we removed a pending - //interest from the window + uint64_t lastEvent_; // time at which we removed a pending + // interest from the window std::unordered_map<uint32_t, uint8_t> packets_in_window_; - //rtt probes - //the RTC transport tends to overestimate the RTT - //du to the production time on the server side - //once per second we send an interest for wich we know - //we will get a nack. This nack will keep our estimation - //close to the reality + // rtt probes + // the RTC transport tends to overestimate the RTT + // du to the production time on the server side + // once per second we send an interest for wich we know + // we will get a nack. This nack will keep our estimation + // close to the reality std::unique_ptr<asio::steady_timer> probe_timer_; uint64_t time_sent_probe_; uint32_t probe_seq_number_; @@ -203,10 +202,10 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly { uint32_t rounds_; uint32_t roundsWithoutNacks_; - //we keep track of up two paths (if only one path is in use - //the two values in the vector will be the same) - //position 0 stores the path with minRTT - //position 1 stores the path with maxRTT + // we keep track of up two paths (if only one path is in use + // the two values in the vector will be the same) + // position 0 stores the path with minRTT + // position 1 stores the path with maxRTT uint32_t producerPathLabels_[2]; std::unordered_map<uint32_t, std::shared_ptr<RTCDataPath>> pathTable_; @@ -219,7 +218,6 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly { unsigned protocolState_; bool initied; - TransportStatistics stats_; }; } // namespace protocol diff --git a/libtransport/src/hicn/transport/protocols/verification_manager.cc b/libtransport/src/hicn/transport/protocols/verification_manager.cc new file mode 100644 index 000000000..f45cab743 --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/verification_manager.cc @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/protocols/verification_manager.h> + +#include <hicn/transport/interfaces/socket_consumer.h> + +namespace transport { + +namespace protocol { + +interface::VerificationPolicy SignatureVerificationManager::onPacketToVerify( + const Packet& packet) { + using namespace interface; + + bool verify_signature; + VerificationPolicy ret = VerificationPolicy::DROP_PACKET; + + ConsumerContentObjectVerificationFailedCallback* + verification_failed_callback = VOID_HANDLER; + icn_socket_->getSocketOption(GeneralTransportOptions::VERIFY_SIGNATURE, + verify_signature); + + if (!verify_signature) { + return VerificationPolicy::ACCEPT_PACKET; + } + + icn_socket_->getSocketOption(ConsumerCallbacksOptions::VERIFICATION_FAILED, + &verification_failed_callback); + if (!verification_failed_callback) { + throw errors::RuntimeException( + "No verification failed callback provided by application. " + "Aborting."); + } + + std::shared_ptr<utils::Verifier> verifier; + icn_socket_->getSocketOption(GeneralTransportOptions::VERIFIER, verifier); + + if (TRANSPORT_EXPECT_FALSE(!verifier)) { + ret = (*verification_failed_callback)( + *icn_socket_, dynamic_cast<const ContentObject&>(packet), + make_error_code(protocol_error::no_verifier_provided)); + return ret; + } + + if (!verifier->verify(packet)) { + ret = (*verification_failed_callback)( + *icn_socket_, dynamic_cast<const ContentObject&>(packet), + make_error_code(protocol_error::signature_verification_failed)); + } else { + ret = VerificationPolicy::ACCEPT_PACKET; + } + + return ret; +} + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/verification_manager.h b/libtransport/src/hicn/transport/protocols/verification_manager.h index da67e86f8..6e5d32127 100644 --- a/libtransport/src/hicn/transport/protocols/verification_manager.h +++ b/libtransport/src/hicn/transport/protocols/verification_manager.h @@ -15,56 +15,37 @@ #pragma once -#include <hicn/transport/interfaces/socket_consumer.h> - -#include <deque> +#include <hicn/transport/interfaces/callbacks.h> +#include <hicn/transport/interfaces/verification_policy.h> +#include <hicn/transport/protocols/errors.h> namespace transport { +namespace interface { +class ConsumerSocket; +} + namespace protocol { +using Packet = core::Packet; +using interface::ConsumerSocket; +using interface::VerificationPolicy; + class VerificationManager { public: virtual ~VerificationManager() = default; - virtual bool onPacketToVerify(const Packet& packet) = 0; + virtual VerificationPolicy onPacketToVerify(const Packet& packet) = 0; }; class SignatureVerificationManager : public VerificationManager { public: - SignatureVerificationManager(interface::ConsumerSocket* icn_socket) + SignatureVerificationManager(ConsumerSocket* icn_socket) : icn_socket_(icn_socket) {} - TRANSPORT_ALWAYS_INLINE bool onPacketToVerify(const Packet& packet) override { - using namespace interface; - - bool verify_signature, ret = false; - icn_socket_->getSocketOption(GeneralTransportOptions::VERIFY_SIGNATURE, - verify_signature); - - if (!verify_signature) { - return true; - } - - std::shared_ptr<utils::Verifier> verifier; - icn_socket_->getSocketOption(GeneralTransportOptions::VERIFIER, verifier); - - if (TRANSPORT_EXPECT_FALSE(!verifier)) { - throw errors::RuntimeException( - "No certificate provided by the application."); - } - - ret = verifier->verify(packet); - - if (!ret) { - throw errors::RuntimeException( - "Verification failure policy has to be implemented."); - } - - return ret; - } + interface::VerificationPolicy onPacketToVerify(const Packet& packet) override; private: - interface::ConsumerSocket* icn_socket_; + ConsumerSocket* icn_socket_; }; } // end namespace protocol diff --git a/libtransport/src/hicn/transport/utils/CMakeLists.txt b/libtransport/src/hicn/transport/utils/CMakeLists.txt index cbbca86ed..5a7dbe9cc 100644 --- a/libtransport/src/hicn/transport/utils/CMakeLists.txt +++ b/libtransport/src/hicn/transport/utils/CMakeLists.txt @@ -19,7 +19,6 @@ list(APPEND SOURCE_FILES ${CMAKE_CURRENT_SOURCE_DIR}/signer.cc ${CMAKE_CURRENT_SOURCE_DIR}/verifier.cc ${CMAKE_CURRENT_SOURCE_DIR}/identity.cc - ${CMAKE_CURRENT_SOURCE_DIR}/suffix_strategy.cc ${CMAKE_CURRENT_SOURCE_DIR}/log.cc ${CMAKE_CURRENT_SOURCE_DIR}/membuf.cc ${CMAKE_CURRENT_SOURCE_DIR}/content_store.cc diff --git a/libtransport/src/hicn/transport/utils/suffix_strategy.cc b/libtransport/src/hicn/transport/utils/suffix_strategy.cc deleted file mode 100644 index f3bcc4562..000000000 --- a/libtransport/src/hicn/transport/utils/suffix_strategy.cc +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <hicn/transport/utils/suffix_strategy.h> - -using transport::core::NextSegmentCalculationStrategy; - -namespace utils { -std::uint32_t SuffixManifest::getNextSuffix() { - uint32_t next_suffix; - - switch (suffix_stragegy_) { - case NextSegmentCalculationStrategy::INCREMENTAL: - if (!nb_segments_) { - throw errors::RuntimeException( - "The number of segments in a manifest must be set " - "before assigning incremental suffixes."); - } - /* The current manifest's suffix + the number of segments in a */ - /* manifest give the suffix of the last segment in the manifest. */ - /* The next manifest's suffix is therefore that number plus one. */ - next_suffix = suffix_ + nb_segments_ + 1; - break; - - default: - throw errors::RuntimeException("Unknown suffix strategy."); - } - - return next_suffix; -} - -std::uint32_t SuffixContent::getNextSuffix() { - uint32_t next_suffix; - - switch (suffix_stragegy_) { - case NextSegmentCalculationStrategy::INCREMENTAL: - next_suffix = suffix_ + 1; - if (making_manifest_) { - if (!nb_segments_) { - throw errors::RuntimeException( - "The number of segments in a manifest must be set " - "before assigning incremental suffixes."); - } - - content_counter_++; - /* If the counter have reached the manifest's capacity, - * it means that the next suffix will be a manifest, so we skip it. */ - if (content_counter_ % nb_segments_ == 0) { - next_suffix++; - content_counter_ = 0; - } - } - break; - - default: - throw errors::RuntimeException("Unknown suffix strategy."); - } - - return next_suffix; -} -} // namespace utils diff --git a/libtransport/src/hicn/transport/utils/suffix_strategy.h b/libtransport/src/hicn/transport/utils/suffix_strategy.h index 3014855f6..0ed3c5b0e 100644 --- a/libtransport/src/hicn/transport/utils/suffix_strategy.h +++ b/libtransport/src/hicn/transport/utils/suffix_strategy.h @@ -18,111 +18,148 @@ #include <hicn/transport/core/manifest_format.h> namespace utils { + +using transport::core::NextSegmentCalculationStrategy; + class SuffixStrategy { public: - SuffixStrategy( - transport::core::NextSegmentCalculationStrategy suffix_stragegy, - std::uint32_t start_offset) - : suffix_stragegy_(suffix_stragegy), - suffix_(start_offset), - nb_segments_(0) {} - - transport::core::NextSegmentCalculationStrategy getSuffixStrategy() { - return suffix_stragegy_; - } + static constexpr uint32_t INVALID_SUFFIX = + std::numeric_limits<uint32_t>::max(); + + SuffixStrategy(NextSegmentCalculationStrategy strategy) + : suffix_stragegy_(strategy), + total_count_(0), + final_suffix_(INVALID_SUFFIX) {} + + virtual ~SuffixStrategy() = default; + + virtual uint32_t getNextSuffix() = 0; + + virtual uint32_t getFinalSuffix() { return final_suffix_; } - void setSuffixStrategy( - transport::core::NextSegmentCalculationStrategy strategy) { - suffix_stragegy_ = strategy; + virtual void setFinalSuffix(std::uint32_t final_suffix) { + if (final_suffix != INVALID_SUFFIX) { + final_suffix_ = final_suffix; + } } - std::uint32_t getSuffix() { return suffix_; } + virtual uint32_t getNextManifestSuffix() = 0; - void updateSuffix(std::uint32_t new_suffix) { suffix_ = new_suffix; } + virtual uint32_t getNextContentSuffix() = 0; - std::size_t getNbSegments() { return nb_segments_; } + virtual void reset(uint32_t offset = 0) = 0; - void setNbSegments(std::size_t nb_segments) { nb_segments_ = nb_segments; } + virtual uint32_t getManifestCapacity() = 0; - void reset(std::uint32_t reset_suffix) { - suffix_ = reset_suffix; - nb_segments_ = 0; + virtual void setManifestCapacity(uint32_t capacity) = 0; + + virtual uint32_t getTotalCount() { return total_count_; }; + + NextSegmentCalculationStrategy getSuffixStrategy() { + return suffix_stragegy_; } - ~SuffixStrategy() {} + protected: + inline void incrementTotalCount() { total_count_++; }; protected: - transport::core::NextSegmentCalculationStrategy suffix_stragegy_; - std::uint32_t suffix_; - std::size_t nb_segments_; - virtual std::uint32_t getNextSuffix() = 0; + NextSegmentCalculationStrategy suffix_stragegy_; + std::uint32_t total_count_; + std::uint32_t final_suffix_; }; -class SuffixManifest : public SuffixStrategy { +class IncrementalSuffixStrategy : public SuffixStrategy { public: - SuffixManifest( - transport::core::NextSegmentCalculationStrategy suffix_stragegy, - std::uint32_t start_offset) - : SuffixStrategy(suffix_stragegy, start_offset) {} - - SuffixManifest operator++() { - updateSuffix(getNextSuffix()); - SuffixManifest temp_suffix(suffix_stragegy_, suffix_); - temp_suffix.setNbSegments(getNbSegments()); - return temp_suffix; + IncrementalSuffixStrategy(std::uint32_t start_offset) + : SuffixStrategy(NextSegmentCalculationStrategy::INCREMENTAL), + next_suffix_(start_offset) {} + + TRANSPORT_ALWAYS_INLINE std::uint32_t getNextSuffix() override { + incrementTotalCount(); + return next_suffix_++; + } + + TRANSPORT_ALWAYS_INLINE std::uint32_t getNextContentSuffix() override { + return getNextSuffix(); } - SuffixManifest operator++(int) { - SuffixManifest temp_suffix(suffix_stragegy_, suffix_); - temp_suffix.setNbSegments(getNbSegments()); - updateSuffix(getNextSuffix()); - return temp_suffix; + TRANSPORT_ALWAYS_INLINE std::uint32_t getNextManifestSuffix() override { + return getNextSuffix(); } + uint32_t getManifestCapacity() override { + throw errors::RuntimeException( + "No manifest capacity in IncrementalSuffixStrategy."); + } + + void setManifestCapacity(uint32_t capacity) override { + throw errors::RuntimeException( + "No manifest capacity in IncrementalSuffixStrategy."); + } + + void reset(std::uint32_t offset = 0) override { next_suffix_ = offset; } + protected: - std::uint32_t getNextSuffix(); + std::uint32_t next_suffix_; }; -class SuffixContent : public SuffixStrategy { +class CapacityBasedSuffixStrategy : public SuffixStrategy { public: - SuffixContent(transport::core::NextSegmentCalculationStrategy suffix_stragegy, - std::uint32_t start_offset, bool making_manifest) - : SuffixStrategy(suffix_stragegy, start_offset), - making_manifest_(making_manifest), - content_counter_(0) {} - - SuffixContent(transport::core::NextSegmentCalculationStrategy suffix_stragegy, - std::uint32_t start_offset) - : SuffixContent(suffix_stragegy, start_offset, false) {} - - SuffixContent operator++() { - updateSuffix(getNextSuffix()); - SuffixContent temp_suffix(suffix_stragegy_, suffix_, making_manifest_); - temp_suffix.setNbSegments(getNbSegments()); - temp_suffix.content_counter_ = content_counter_; - return temp_suffix; + CapacityBasedSuffixStrategy(std::uint32_t start_offset, + std::uint32_t manifest_capacity) + : SuffixStrategy(NextSegmentCalculationStrategy::INCREMENTAL), + next_suffix_(start_offset), + segments_in_manifest_(manifest_capacity), + current_manifest_iteration_(0) {} + + TRANSPORT_ALWAYS_INLINE std::uint32_t getNextSuffix() override { + incrementTotalCount(); + return next_suffix_++; + } + + TRANSPORT_ALWAYS_INLINE std::uint32_t getNextContentSuffix() override { + incrementTotalCount(); + return next_suffix_ % segments_in_manifest_ == 0 ? next_suffix_++ + : ++next_suffix_; } - SuffixContent operator++(int) { - SuffixContent temp_suffix(suffix_stragegy_, suffix_, making_manifest_); - temp_suffix.setNbSegments(getNbSegments()); - temp_suffix.content_counter_ = content_counter_; - updateSuffix(getNextSuffix()); - return temp_suffix; + TRANSPORT_ALWAYS_INLINE std::uint32_t getNextManifestSuffix() override { + incrementTotalCount(); + return (current_manifest_iteration_++) * (segments_in_manifest_ + 1); } - void setUsingManifest(bool value) { making_manifest_ = value; } + TRANSPORT_ALWAYS_INLINE uint32_t getManifestCapacity() override { + return segments_in_manifest_; + } - void reset(std::uint32_t reset_suffix) { - SuffixStrategy::reset(reset_suffix); - content_counter_ = 0; + TRANSPORT_ALWAYS_INLINE void setManifestCapacity(uint32_t capacity) override { + segments_in_manifest_ = capacity; } + void reset(std::uint32_t offset = 0) override { next_suffix_ = offset; } + protected: - bool making_manifest_; - /* content_counter_ keeps track of the number of segments */ - /* between two manifests */ - uint32_t content_counter_; - std::uint32_t getNextSuffix(); + std::uint32_t next_suffix_; + std::uint32_t segments_in_manifest_; + std::uint32_t current_manifest_iteration_; }; + +class SuffixStrategyFactory { + public: + static std::unique_ptr<SuffixStrategy> getSuffixStrategy( + NextSegmentCalculationStrategy strategy, uint32_t start_offset, + uint32_t manifest_capacity = 0) { + switch (strategy) { + case NextSegmentCalculationStrategy::INCREMENTAL: + return std::make_unique<IncrementalSuffixStrategy>(start_offset); + case NextSegmentCalculationStrategy::MANIFEST_CAPACITY_BASED: + return std::make_unique<CapacityBasedSuffixStrategy>(start_offset, + manifest_capacity); + default: + throw errors::RuntimeException( + "No valid NextSegmentCalculationStrategy specified."); + } + } +}; + } // namespace utils |