From 6d7704c1b497341fd6dd3c27e3f64d0db062ccc2 Mon Sep 17 00:00:00 2001 From: Mauro Sardara Date: Mon, 4 Feb 2019 11:06:18 +0100 Subject: [HICN-11] Rework on transport protocols improving components modularity Change-Id: I6683ec5b494238dc93591c103d25275e89b9f267 Signed-off-by: Mauro Sardara --- .../src/hicn/transport/protocols/CMakeLists.txt | 27 +- libtransport/src/hicn/transport/protocols/cbr.cc | 30 +- libtransport/src/hicn/transport/protocols/cbr.h | 18 +- .../protocols/congestion_window_protocol.h | 30 + .../hicn/transport/protocols/indexing_manager.h | 193 +++++++ .../protocols/manifest_indexing_manager.cc | 217 +++++++ .../protocols/manifest_indexing_manager.h | 69 +++ .../src/hicn/transport/protocols/packet_manager.h | 66 +++ .../src/hicn/transport/protocols/protocol.cc | 52 +- .../src/hicn/transport/protocols/protocol.h | 37 +- libtransport/src/hicn/transport/protocols/raaqm.cc | 575 +++++++++++++------ libtransport/src/hicn/transport/protocols/raaqm.h | 79 ++- .../hicn/transport/protocols/raaqm_data_path.cc | 16 +- .../src/hicn/transport/protocols/raaqm_data_path.h | 12 +- .../hicn/transport/protocols/rate_estimation.cc | 5 +- .../src/hicn/transport/protocols/rate_estimation.h | 5 - .../src/hicn/transport/protocols/reassembly.cc | 83 +++ .../src/hicn/transport/protocols/reassembly.h | 71 +++ libtransport/src/hicn/transport/protocols/rtc.cc | 74 +-- libtransport/src/hicn/transport/protocols/rtc.h | 28 +- .../src/hicn/transport/protocols/statistics.h | 94 +++ libtransport/src/hicn/transport/protocols/vegas.cc | 627 --------------------- libtransport/src/hicn/transport/protocols/vegas.h | 161 ------ .../transport/protocols/vegas_rto_estimator.cc | 57 -- .../hicn/transport/protocols/vegas_rto_estimator.h | 48 -- .../transport/protocols/verification_manager.h | 72 +++ 26 files changed, 1530 insertions(+), 1216 deletions(-) create mode 100644 libtransport/src/hicn/transport/protocols/congestion_window_protocol.h create mode 100644 libtransport/src/hicn/transport/protocols/indexing_manager.h create mode 100644 libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc create mode 100644 libtransport/src/hicn/transport/protocols/manifest_indexing_manager.h create mode 100644 libtransport/src/hicn/transport/protocols/packet_manager.h create mode 100644 libtransport/src/hicn/transport/protocols/reassembly.cc create mode 100644 libtransport/src/hicn/transport/protocols/reassembly.h create mode 100644 libtransport/src/hicn/transport/protocols/statistics.h delete mode 100644 libtransport/src/hicn/transport/protocols/vegas.cc delete mode 100644 libtransport/src/hicn/transport/protocols/vegas.h delete mode 100644 libtransport/src/hicn/transport/protocols/vegas_rto_estimator.cc delete mode 100644 libtransport/src/hicn/transport/protocols/vegas_rto_estimator.h create mode 100644 libtransport/src/hicn/transport/protocols/verification_manager.h (limited to 'libtransport/src/hicn/transport/protocols') diff --git a/libtransport/src/hicn/transport/protocols/CMakeLists.txt b/libtransport/src/hicn/transport/protocols/CMakeLists.txt index 1c3b76c24..84a4d5279 100644 --- a/libtransport/src/hicn/transport/protocols/CMakeLists.txt +++ b/libtransport/src/hicn/transport/protocols/CMakeLists.txt @@ -14,33 +14,52 @@ cmake_minimum_required(VERSION 3.5 FATAL_ERROR) list(APPEND HEADER_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/indexing_manager.h + ${CMAKE_CURRENT_SOURCE_DIR}/reassembly.h + ${CMAKE_CURRENT_SOURCE_DIR}/congestion_window_protocol.h + ${CMAKE_CURRENT_SOURCE_DIR}/packet_manager.h + ${CMAKE_CURRENT_SOURCE_DIR}/statistics.h ${CMAKE_CURRENT_SOURCE_DIR}/rate_estimation.h ${CMAKE_CURRENT_SOURCE_DIR}/download_observer.h - ${CMAKE_CURRENT_SOURCE_DIR}/vegas.h ${CMAKE_CURRENT_SOURCE_DIR}/protocol.h ${CMAKE_CURRENT_SOURCE_DIR}/raaqm.h - ${CMAKE_CURRENT_SOURCE_DIR}/vegas_rto_estimator.h ${CMAKE_CURRENT_SOURCE_DIR}/raaqm_data_path.h ${CMAKE_CURRENT_SOURCE_DIR}/cbr.h ${CMAKE_CURRENT_SOURCE_DIR}/rtc.h ${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.h + ${CMAKE_CURRENT_SOURCE_DIR}/manifest_indexing_manager.h ) list(APPEND SOURCE_FILES - ${CMAKE_CURRENT_SOURCE_DIR}/vegas.cc + ${CMAKE_CURRENT_SOURCE_DIR}/reassembly.cc ${CMAKE_CURRENT_SOURCE_DIR}/protocol.cc ${CMAKE_CURRENT_SOURCE_DIR}/raaqm.cc - ${CMAKE_CURRENT_SOURCE_DIR}/vegas_rto_estimator.cc ${CMAKE_CURRENT_SOURCE_DIR}/rate_estimation.cc ${CMAKE_CURRENT_SOURCE_DIR}/raaqm_data_path.cc ${CMAKE_CURRENT_SOURCE_DIR}/cbr.cc ${CMAKE_CURRENT_SOURCE_DIR}/rtc.cc ${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.cc + ${CMAKE_CURRENT_SOURCE_DIR}/manifest_indexing_manager.cc +) + +set(RAAQM_CONFIG_INSTALL_PREFIX +${CMAKE_INSTALL_PREFIX}/etc/hicn +) + +set(raaqm_config_path + ${RAAQM_CONFIG_INSTALL_PREFIX}/consumer.conf + PARENT_SCOPE ) set(TRANSPORT_CONFIG ${CMAKE_CURRENT_SOURCE_DIR}/consumer.conf ) +install( + FILES ${TRANSPORT_CONFIG} + DESTINATION etc/hicn + COMPONENT lib${LIBTRANSPORT} +) + set(SOURCE_FILES ${SOURCE_FILES} PARENT_SCOPE) set(HEADER_FILES ${HEADER_FILES} PARENT_SCOPE) \ No newline at end of file diff --git a/libtransport/src/hicn/transport/protocols/cbr.cc b/libtransport/src/hicn/transport/protocols/cbr.cc index 3da4819c3..efd2149ad 100644 --- a/libtransport/src/hicn/transport/protocols/cbr.cc +++ b/libtransport/src/hicn/transport/protocols/cbr.cc @@ -22,25 +22,27 @@ namespace protocol { using namespace interface; -CbrTransportProtocol::CbrTransportProtocol(BaseSocket *icnet_socket) - : VegasTransportProtocol(icnet_socket) {} - -void CbrTransportProtocol::start( - utils::SharableVector &receive_buffer) { - current_window_size_ = socket_->current_window_size_; - VegasTransportProtocol::start(receive_buffer); +CbrTransportProtocol::CbrTransportProtocol( + interface::ConsumerSocket *icnet_socket) + : RaaqmTransportProtocol(icnet_socket) {} + +int CbrTransportProtocol::start() { + socket_->getSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE, + current_window_size_); + return RaaqmTransportProtocol::start(); } -void CbrTransportProtocol::changeInterestLifetime(uint64_t segment) { return; } - -void CbrTransportProtocol::increaseWindow() {} - -void CbrTransportProtocol::decreaseWindow() {} - void CbrTransportProtocol::afterDataUnsatisfied(uint64_t segment) {} void CbrTransportProtocol::afterContentReception( - const Interest &interest, const ContentObject &content_object) {} + const Interest &interest, const ContentObject &content_object) { + auto segment = content_object.getName().getSuffix(); + auto now = utils::SteadyClock::now(); + auto rtt = std::chrono::duration_cast( + now - interest_timepoints_[segment & mask]); + // Update stats + updateStats(segment, rtt.count(), now); +} } // end namespace protocol diff --git a/libtransport/src/hicn/transport/protocols/cbr.h b/libtransport/src/hicn/transport/protocols/cbr.h index 0a572292a..a8eff2182 100644 --- a/libtransport/src/hicn/transport/protocols/cbr.h +++ b/libtransport/src/hicn/transport/protocols/cbr.h @@ -15,32 +15,22 @@ #pragma once -#include -#include -#include -#include +#include namespace transport { namespace protocol { -class CbrTransportProtocol : public VegasTransportProtocol { +class CbrTransportProtocol : public RaaqmTransportProtocol { public: - CbrTransportProtocol(interface::BaseSocket *icnet_socket); + CbrTransportProtocol(interface::ConsumerSocket *icnet_socket); - void start(utils::SharableVector &receive_buffer) override; + int start() override; private: void afterContentReception(const Interest &interest, const ContentObject &content_object) override; - void afterDataUnsatisfied(uint64_t segment) override; - - void increaseWindow() override; - - void decreaseWindow() override; - - void changeInterestLifetime(uint64_t segment) override; }; } // end namespace protocol diff --git a/libtransport/src/hicn/transport/protocols/congestion_window_protocol.h b/libtransport/src/hicn/transport/protocols/congestion_window_protocol.h new file mode 100644 index 000000000..36ac6eb17 --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/congestion_window_protocol.h @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +namespace transport { + +namespace protocol { + +class CWindowProtocol { + protected: + virtual void increaseWindow() = 0; + virtual void decreaseWindow() = 0; +}; + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/indexing_manager.h b/libtransport/src/hicn/transport/protocols/indexing_manager.h new file mode 100644 index 000000000..888b17d9d --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/indexing_manager.h @@ -0,0 +1,193 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include + +namespace transport { + +namespace protocol { + +class IndexManager { + public: + static constexpr uint32_t invalid_index = ~0; + + /** + * + */ + virtual ~IndexManager() = default; + /** + * Retrieve from the manifest the next suffix to retrieve. + */ + virtual uint32_t getNextSuffix() = 0; + + /** + * Retrive the next segment to be reassembled. + */ + virtual uint32_t getNextReassemblySegment() = 0; + + virtual bool isFinalSuffixDiscovered() = 0; + + virtual uint32_t getFinalSuffix() = 0; + + virtual void reset() = 0; +}; + +class IndexVerificationManager : public IndexManager { + public: + /** + * + */ + virtual ~IndexVerificationManager() = default; + + /** + * The ownership of the ContentObjectManifest is moved + * from the caller to the VerificationManager + */ + virtual bool onManifest(core::ContentObject::Ptr &&content_object) = 0; + + /** + * The content object must just be verified; the ownership is still of the + * caller. + */ + virtual bool onContentObject(const core::ContentObject &content_object) = 0; +}; + +class ZeroIndexManager : public IndexVerificationManager { + public: + ZeroIndexManager() : reset_(true) {} + + TRANSPORT_ALWAYS_INLINE virtual void reset() override { reset_ = true; } + + /** + * Retrieve from the manifest the next suffix to retrieve. + */ + TRANSPORT_ALWAYS_INLINE virtual uint32_t getNextSuffix() override { + uint32_t ret = reset_ ? 0 : IndexManager::invalid_index; + reset_ = false; + return ret; + } + + /** + * Retrive the next segment to be reassembled. + */ + TRANSPORT_ALWAYS_INLINE virtual uint32_t getNextReassemblySegment() override { + return IndexManager::invalid_index; + } + + TRANSPORT_ALWAYS_INLINE virtual bool isFinalSuffixDiscovered() override { + return false; + } + + TRANSPORT_ALWAYS_INLINE virtual uint32_t getFinalSuffix() override { + return IndexManager::invalid_index; + } + + TRANSPORT_ALWAYS_INLINE bool onManifest( + core::ContentObject::Ptr &&content_object) override { + throw errors::UnexpectedManifestException(); + } + + TRANSPORT_ALWAYS_INLINE bool onContentObject( + const core::ContentObject &content_object) override { + throw errors::RuntimeException( + "Called onContentObject on a ZeroIndexManager, which is not able to " + "process packets."); + } + + private: + bool reset_; +}; + +class IncrementalIndexManager : public IndexVerificationManager { + public: + IncrementalIndexManager(interface::ConsumerSocket *icn_socket) + : socket_(icn_socket), + final_suffix_(std::numeric_limits::max()), + next_download_suffix_(0), + next_reassembly_suffix_(0), + verification_manager_( + std::make_unique(icn_socket)) {} + + /** + * + */ + virtual ~IncrementalIndexManager() {} + + TRANSPORT_ALWAYS_INLINE virtual void reset() override { + final_suffix_ = std::numeric_limits::max(); + next_download_suffix_ = 0; + next_reassembly_suffix_ = 0; + } + + /** + * Retrieve from the manifest the next suffix to retrieve. + */ + TRANSPORT_ALWAYS_INLINE virtual uint32_t getNextSuffix() override { + return next_download_suffix_ <= final_suffix_ ? next_download_suffix_++ + : IndexManager::invalid_index; + } + + /** + * Retrive the next segment to be reassembled. + */ + TRANSPORT_ALWAYS_INLINE virtual uint32_t getNextReassemblySegment() override { + return next_reassembly_suffix_ <= final_suffix_ + ? next_reassembly_suffix_++ + : IndexManager::invalid_index; + } + + TRANSPORT_ALWAYS_INLINE virtual bool isFinalSuffixDiscovered() override { + return final_suffix_ != std::numeric_limits::max(); + } + + TRANSPORT_ALWAYS_INLINE virtual uint32_t getFinalSuffix() override { + return final_suffix_; + } + + TRANSPORT_ALWAYS_INLINE bool onManifest( + core::ContentObject::Ptr &&content_object) override { + throw errors::UnexpectedManifestException(); + } + + TRANSPORT_ALWAYS_INLINE bool onContentObject( + const core::ContentObject &content_object) override { + auto ret = verification_manager_->onPacketToVerify(content_object); + + if (TRANSPORT_EXPECT_FALSE(content_object.testRst())) { + final_suffix_ = content_object.getName().getSuffix(); + } + + return ret; + } + + protected: + interface::ConsumerSocket *socket_; + uint64_t final_suffix_; + uint64_t next_download_suffix_; + uint64_t next_reassembly_suffix_; + std::unique_ptr verification_manager_; +}; + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc b/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc new file mode 100644 index 000000000..1afb4eaac --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc @@ -0,0 +1,217 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include +#include + +namespace transport { + +namespace protocol { + +using namespace interface; + +ManifestIndexManager::ManifestIndexManager( + interface::ConsumerSocket *icn_socket) + : IncrementalIndexManager(icn_socket), + PacketManager(1024), + next_reassembly_segment_(suffix_queue_.end()), + next_to_retrieve_segment_(suffix_queue_.end()), + next_manifest_(0) {} + +bool ManifestIndexManager::onManifest( + core::ContentObject::Ptr &&content_object) { + auto manifest = + std::make_unique(std::move(*content_object)); + + bool manifest_verified = verification_manager_->onPacketToVerify(*manifest); + + if (manifest_verified) { + manifest->decode(); + + if (TRANSPORT_EXPECT_FALSE(manifest->getVersion() != + core::ManifestVersion::VERSION_1)) { + throw errors::RuntimeException("Received manifest with unknown version."); + } + + switch (manifest->getManifestType()) { + case core::ManifestType::INLINE_MANIFEST: { + auto _it = manifest->getSuffixList().begin(); + auto _end = --manifest->getSuffixList().end(); + + if (TRANSPORT_EXPECT_FALSE(manifest->isFinalManifest())) { + _end++; + } + + // Get final block number + final_suffix_ = manifest->getFinalBlockNumber(); + + suffix_hash_map_[_it->first] = + std::make_pair(std::vector(_it->second, _it->second + 32), + manifest->getHashAlgorithm()); + suffix_queue_.push_back(_it->first); + + // If the transport protocol finished the list of segments to retrieve, + // reset the next_to_retrieve_segment_ iterator to the next segment + // provided by this manifest. + if (TRANSPORT_EXPECT_FALSE(next_to_retrieve_segment_ == + suffix_queue_.end())) { + next_to_retrieve_segment_ = --suffix_queue_.end(); + } + + std::advance(_it, 1); + for (; _it != _end; _it++) { + suffix_hash_map_[_it->first] = std::make_pair( + std::vector(_it->second, _it->second + 32), + manifest->getHashAlgorithm()); + suffix_queue_.push_back(_it->first); + } + + if (TRANSPORT_EXPECT_FALSE(manifest->getName().getSuffix()) == 0) { + // Set the iterators to the beginning of the suffix queue + next_reassembly_segment_ = suffix_queue_.begin(); + } + + if (TRANSPORT_EXPECT_FALSE(manifest->isFinalManifest() || + next_manifest_ > final_suffix_)) { + break; + } + + // Get current window size + double current_window_size = 0.; + socket_->getSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE, + current_window_size); + + // Get portal + std::shared_ptr portal; + socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal); + + // Number of segments in manifest + std::size_t segments_in_manifest = std::distance( + manifest->getSuffixList().begin(), manifest->getSuffixList().end()); + std::size_t segment_count = 0; + + // Manifest namespace + Name &name = manifest->getWritableName(); + + // Send as many manifest as required for filling window. + do { + segment_count += segments_in_manifest; + next_manifest_ += segments_in_manifest; + + Interest::Ptr interest = getPacket(); + name.setSuffix(next_manifest_); + interest->setName(name); + + uint32_t interest_lifetime; + socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, + interest_lifetime); + interest->setLifetime(interest_lifetime); + + // Send requests for manifest out of the congestion window (no + // in_flight_interest++) + portal->sendInterest(std::move(interest)); + } while (segment_count < current_window_size && + next_manifest_ < final_suffix_); + + break; + } + case core::ManifestType::FLIC_MANIFEST: { + throw errors::NotImplementedException(); + } + case core::ManifestType::FINAL_CHUNK_NUMBER: { + throw errors::NotImplementedException(); + } + } + } + + return manifest_verified; +} + +bool ManifestIndexManager::onContentObject( + const core::ContentObject &content_object) { + bool verify_signature; + socket_->getSocketOption(GeneralTransportOptions::VERIFY_SIGNATURE, + verify_signature); + + if (!verify_signature) { + return true; + } + + uint64_t segment = content_object.getName().getSuffix(); + + bool ret = false; + + auto it = suffix_hash_map_.find(segment); + if (it != suffix_hash_map_.end()) { + auto hash_type = static_cast(it->second.second); + auto data_packet_digest = content_object.computeDigest(it->second.second); + auto data_packet_digest_bytes = + data_packet_digest.getDigest().data(); + std::vector &manifest_digest_bytes = it->second.first; + + if (utils::CryptoHash::compareBinaryDigest(data_packet_digest_bytes, + manifest_digest_bytes.data(), + hash_type)) { + suffix_hash_map_.erase(it); + ret = true; + } else { + throw errors::RuntimeException( + "Verification failure policy has to be implemented."); + } + } + + return ret; +} + +uint32_t ManifestIndexManager::getNextSuffix() { + if (TRANSPORT_EXPECT_FALSE(next_to_retrieve_segment_ == + suffix_queue_.end())) { + return invalid_index; + } + + return *next_to_retrieve_segment_++; +} + +uint32_t ManifestIndexManager::getFinalSuffix() { return final_suffix_; } + +bool ManifestIndexManager::isFinalSuffixDiscovered() { + return IncrementalIndexManager::isFinalSuffixDiscovered(); +} + +uint32_t ManifestIndexManager::getNextReassemblySegment() { + if (TRANSPORT_EXPECT_FALSE(next_reassembly_segment_ == suffix_queue_.end())) { + return invalid_index; + } + + if (TRANSPORT_EXPECT_TRUE(next_reassembly_segment_ != + suffix_queue_.begin())) { + suffix_queue_.erase(std::prev(next_reassembly_segment_)); + } + + return *next_reassembly_segment_++; +} + +void ManifestIndexManager::reset() { + IncrementalIndexManager::reset(); + suffix_queue_.clear(); + suffix_hash_map_.clear(); +} + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.h b/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.h new file mode 100644 index 000000000..ee2c531c7 --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.h @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include + +namespace transport { + +namespace protocol { + +class ManifestIndexManager : public IncrementalIndexManager, + public PacketManager { + static constexpr double alpha = 0.3; + + public: + using SuffixQueue = std::list; + using HashEntry = std::pair, core::HashAlgorithm>; + + ManifestIndexManager(interface::ConsumerSocket *icn_socket); + + virtual ~ManifestIndexManager() = default; + + void reset() override; + + bool onManifest(core::ContentObject::Ptr &&content_object) override; + + bool onContentObject(const core::ContentObject &content_object) override; + + uint32_t getNextSuffix() override; + + uint32_t getNextReassemblySegment() override; + + bool isFinalSuffixDiscovered() override; + + uint32_t getFinalSuffix() override; + + protected: + SuffixQueue suffix_queue_; + SuffixQueue::iterator next_reassembly_segment_; + SuffixQueue::iterator next_to_retrieve_segment_; + + // Hash verification + std::unordered_map, core::HashAlgorithm>> + suffix_hash_map_; + + // Next Manifest + std::uint32_t next_manifest_; +}; + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/packet_manager.h b/libtransport/src/hicn/transport/protocols/packet_manager.h new file mode 100644 index 000000000..53486edde --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/packet_manager.h @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +namespace transport { + +namespace protocol { + +using namespace core; + +template +class PacketManager { + static_assert(std::is_base_of::value, + "The packet manager support just Interest and Data."); + + static constexpr std::size_t packet_pool_size = 4096; + + public: + PacketManager(std::size_t size = packet_pool_size) : size_(0) { + // Create pool of interests + increasePoolSize(size); + } + + TRANSPORT_ALWAYS_INLINE void increasePoolSize(std::size_t size) { + for (std::size_t i = 0; i < size; i++) { + interest_pool_.add(new PacketType()); + } + + size_ += size; + } + + TRANSPORT_ALWAYS_INLINE typename PacketType::Ptr getPacket() { + auto result = interest_pool_.get(); + + while (TRANSPORT_EXPECT_FALSE(!result.first)) { + // Add packets to the pool + increasePoolSize(size_); + result = interest_pool_.get(); + } + + return std::move(result.second); + } + + private: + utils::ObjectPool interest_pool_; + std::size_t size_; +}; + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/protocol.cc b/libtransport/src/hicn/transport/protocols/protocol.cc index ea4fd6dbf..9caa2eca7 100644 --- a/libtransport/src/hicn/transport/protocols/protocol.cc +++ b/libtransport/src/hicn/transport/protocols/protocol.cc @@ -20,24 +20,50 @@ namespace transport { namespace protocol { -TransportProtocol::TransportProtocol(interface::BaseSocket *icn_socket) - : socket_(dynamic_cast(icn_socket)), - is_running_(false), - interest_pool_() { - // Create pool of interests - increasePoolSize(); +using namespace interface; + +TransportProtocol::TransportProtocol(interface::ConsumerSocket *icn_socket) + : socket_(icn_socket), is_running_(false) { + socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal_); +} + +int TransportProtocol::start() { + // If the protocol is already running, return + if (is_running_) return -1; + + // Set the protocol as running + is_running_ = true; + + // Reset the protocol state machine + reset(); + + // Schedule next interests + scheduleNextInterests(); + + // Start Event loop + portal_->runEventsLoop(); + + // Not running anymore + is_running_ = false; + + return 0; } -TransportProtocol::~TransportProtocol() {} +void TransportProtocol::stop() { + is_running_ = false; + portal_->stopEventsLoop(); +} + +void TransportProtocol::resume() { + if (is_running_) return; + + is_running_ = true; -void TransportProtocol::updatePortal() { portal_ = socket_->portal_; } + scheduleNextInterests(); -bool TransportProtocol::isRunning() { return is_running_; } + portal_->runEventsLoop(); -void TransportProtocol::increasePoolSize(std::size_t size) { - for (std::size_t i = 0; i < size; i++) { - interest_pool_.add(new Interest()); - } + is_running_ = false; } } // end namespace protocol diff --git a/libtransport/src/hicn/transport/protocols/protocol.h b/libtransport/src/hicn/transport/protocols/protocol.h index 56c57e025..6a3b23753 100644 --- a/libtransport/src/hicn/transport/protocols/protocol.h +++ b/libtransport/src/hicn/transport/protocols/protocol.h @@ -16,8 +16,9 @@ #pragma once #include +#include +#include #include -#include namespace transport { @@ -31,39 +32,28 @@ class TransportProtocolCallback { virtual void onTimeout(const core::Interest &interest) = 0; }; -class TransportProtocol : public interface::BasePortal::ConsumerCallback { +class TransportProtocol : public interface::BasePortal::ConsumerCallback, + public PacketManager { static constexpr std::size_t interest_pool_size = 4096; public: - TransportProtocol(interface::BaseSocket *icn_socket); + TransportProtocol(interface::ConsumerSocket *icn_socket); - virtual ~TransportProtocol(); + virtual ~TransportProtocol() { stop(); }; - void updatePortal(); + TRANSPORT_ALWAYS_INLINE bool isRunning() { return is_running_; } - bool isRunning(); + virtual int start(); - virtual void start(utils::SharableVector &content_buffer) = 0; + virtual void stop(); - virtual void stop() = 0; + virtual void resume(); - virtual void resume() = 0; + virtual void scheduleNextInterests() = 0; protected: - virtual void increasePoolSize(std::size_t size = interest_pool_size); - - TRANSPORT_ALWAYS_INLINE Interest::Ptr getInterest() { - auto result = interest_pool_.get(); - - while (TRANSPORT_EXPECT_FALSE(!result.first)) { - // Add packets to the pool - increasePoolSize(); - result = interest_pool_.get(); - } - - return std::move(result.second); - } // Consumer Callback + virtual void reset() = 0; virtual void onContentObject(Interest::Ptr &&i, ContentObject::Ptr &&c) = 0; virtual void onTimeout(Interest::Ptr &&i) = 0; @@ -71,7 +61,8 @@ class TransportProtocol : public interface::BasePortal::ConsumerCallback { interface::ConsumerSocket *socket_; std::shared_ptr portal_; volatile bool is_running_; - utils::ObjectPool interest_pool_; + TransportStatistics stats_; + std::shared_ptr> content_buffer_; }; } // end namespace protocol diff --git a/libtransport/src/hicn/transport/protocols/raaqm.cc b/libtransport/src/hicn/transport/protocols/raaqm.cc index 3e04689e9..f5eb48bd8 100644 --- a/libtransport/src/hicn/transport/protocols/raaqm.cc +++ b/libtransport/src/hicn/transport/protocols/raaqm.cc @@ -14,8 +14,10 @@ */ #include +#include #include +#include #include namespace transport { @@ -24,8 +26,14 @@ namespace protocol { using namespace interface; -RaaqmTransportProtocol::RaaqmTransportProtocol(BaseSocket *icnet_socket) - : VegasTransportProtocol(icnet_socket), rate_estimator_(NULL) { +RaaqmTransportProtocol::RaaqmTransportProtocol(ConsumerSocket *icnet_socket) + : TransportProtocol(icnet_socket), + BaseReassembly(icnet_socket, this), + current_window_size_(1), + interests_in_flight_(0), + cur_path_(nullptr), + t0_(utils::SteadyClock::now()), + rate_estimator_(nullptr) { init(); } @@ -35,16 +43,123 @@ RaaqmTransportProtocol::~RaaqmTransportProtocol() { } } +int RaaqmTransportProtocol::start() { + if (this->rate_estimator_) { + this->rate_estimator_->onStart(); + } + + if (!cur_path_) { + // RAAQM + double drop_factor; + double minimum_drop_probability; + uint32_t sample_number; + uint32_t interest_lifetime; + + socket_->getSocketOption(RaaqmTransportOptions::DROP_FACTOR, drop_factor); + socket_->getSocketOption(RaaqmTransportOptions::MINIMUM_DROP_PROBABILITY, + minimum_drop_probability); + socket_->getSocketOption(RaaqmTransportOptions::SAMPLE_NUMBER, + sample_number); + socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, + interest_lifetime); + + // Rate Estimation + double alpha = 0.0; + uint32_t batching_param = 0; + uint32_t choice_param = 0; + socket_->getSocketOption(RateEstimationOptions::RATE_ESTIMATION_ALPHA, + alpha); + socket_->getSocketOption( + RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER, batching_param); + socket_->getSocketOption(RateEstimationOptions::RATE_ESTIMATION_CHOICE, + choice_param); + + if (choice_param == 1) { + this->rate_estimator_ = new ALaTcpEstimator(); + } else { + this->rate_estimator_ = new SimpleEstimator(alpha, batching_param); + } + + socket_->getSocketOption(RateEstimationOptions::RATE_ESTIMATION_OBSERVER, + &this->rate_estimator_->observer_); + + // Current path + auto cur_path = std::make_unique( + drop_factor, minimum_drop_probability, interest_lifetime * 1000, + sample_number); + cur_path_ = cur_path.get(); + path_table_[default_values::path_id] = std::move(cur_path); + } + + portal_->setConsumerCallback(this); + return TransportProtocol::start(); +} + +void RaaqmTransportProtocol::resume() { return TransportProtocol::resume(); } + +void RaaqmTransportProtocol::reset() { + // Reset reassembly component + BaseReassembly::reset(); + + // Reset protocol variables + interests_in_flight_ = 0; + t0_ = utils::SteadyClock::now(); +} + +void RaaqmTransportProtocol::increaseWindow() { + double max_window_size = 0.; + socket_->getSocketOption(GeneralTransportOptions::MAX_WINDOW_SIZE, + max_window_size); + if (current_window_size_ < max_window_size) { + double gamma = 0.; + socket_->getSocketOption(RaaqmTransportOptions::GAMMA_VALUE, gamma); + + current_window_size_ += gamma / current_window_size_; + socket_->setSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE, + current_window_size_); + } + this->rate_estimator_->onWindowIncrease(current_window_size_); +} + +void RaaqmTransportProtocol::decreaseWindow() { + double min_window_size = 0.; + socket_->getSocketOption(GeneralTransportOptions::MIN_WINDOW_SIZE, + min_window_size); + if (current_window_size_ > min_window_size) { + double beta = 0.; + socket_->getSocketOption(RaaqmTransportOptions::BETA_VALUE, beta); + + current_window_size_ = current_window_size_ * beta; + if (current_window_size_ < min_window_size) { + current_window_size_ = min_window_size; + } + + socket_->setSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE, + current_window_size_); + } + this->rate_estimator_->onWindowDecrease(current_window_size_); +} + +void RaaqmTransportProtocol::afterDataUnsatisfied(uint64_t segment) { + // Decrease the window because the timeout happened + decreaseWindow(); +} + +void RaaqmTransportProtocol::afterContentReception( + const Interest &interest, const ContentObject &content_object) { + updatePathTable(content_object); + increaseWindow(); + updateRtt(interest.getName().getSuffix()); + this->rate_estimator_->onDataReceived((int)content_object.payloadSize() + + content_object.headerSize()); + // Set drop probablility and window size accordingly + RAAQM(); +} + void RaaqmTransportProtocol::init() { std::ifstream is(RAAQM_CONFIG_PATH); std::string line; - - socket_->beta_ = default_values::beta_value; - socket_->drop_factor_ = default_values::drop_factor; - socket_->interest_lifetime_ = default_values::interest_lifetime; - socket_->max_retransmissions_ = - default_values::transport_protocol_max_retransmissions; raaqm_autotune_ = false; default_beta_ = default_values::beta_value; default_drop_ = default_values::drop_factor; @@ -56,7 +171,9 @@ void RaaqmTransportProtocol::init() { lte_delay_ = 15000; if (!is) { - TRANSPORT_LOGW("WARNING: RAAQM parameters not found, set default values"); + TRANSPORT_LOGW( + "WARNING: RAAQM parameters not found at %s, set default values", + RAAQM_CONFIG_PATH); return; } @@ -86,7 +203,8 @@ void RaaqmTransportProtocol::init() { std::string tmp; uint32_t lifetime; line_s >> tmp >> lifetime; - socket_->interest_lifetime_ = lifetime; + socket_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, + lifetime); continue; } @@ -94,21 +212,23 @@ void RaaqmTransportProtocol::init() { std::string tmp; uint32_t rtx; line_s >> tmp >> rtx; - socket_->max_retransmissions_ = rtx; + socket_->setSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, rtx); continue; } if (command == "beta") { std::string tmp; line_s >> tmp >> default_beta_; - socket_->beta_ = default_beta_; + socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE, + default_beta_); continue; } if (command == "drop") { std::string tmp; line_s >> tmp >> default_drop_; - socket_->drop_factor_ = default_drop_; + socket_->setSocketOption(RaaqmTransportOptions::DROP_FACTOR, + default_drop_); continue; } @@ -151,7 +271,8 @@ void RaaqmTransportProtocol::init() { std::string tmp; double rate_alpha = 0.0; line_s >> tmp >> rate_alpha; - socket_->rate_estimation_alpha_ = rate_alpha; + socket_->setSocketOption(RateEstimationOptions::RATE_ESTIMATION_ALPHA, + rate_alpha); continue; } @@ -159,7 +280,9 @@ void RaaqmTransportProtocol::init() { std::string tmp; uint32_t batching_param = 0; line_s >> tmp >> batching_param; - socket_->rate_estimation_batching_parameter_ = batching_param; + socket_->setSocketOption( + RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER, + batching_param); continue; } @@ -167,133 +290,332 @@ void RaaqmTransportProtocol::init() { std::string tmp; uint32_t choice_param = 0; line_s >> tmp >> choice_param; - socket_->rate_estimation_choice_ = choice_param; + socket_->setSocketOption(RateEstimationOptions::RATE_ESTIMATION_CHOICE, + choice_param); continue; } } + is.close(); } -void RaaqmTransportProtocol::start( - utils::SharableVector &content_buffer) { - if (this->rate_estimator_) { - this->rate_estimator_->onStart(); +void RaaqmTransportProtocol::onContentObject( + Interest::Ptr &&interest, ContentObject::Ptr &&content_object) { + uint32_t incremental_suffix = content_object->getName().getSuffix(); + + // Check whether makes sense to continue + if (TRANSPORT_EXPECT_FALSE(!is_running_)) { + return; } - if (!cur_path_) { - double drop_factor; - double minimum_drop_probability; - uint32_t sample_number; - uint32_t interest_lifetime; - // double beta; + // Call application-defined callbacks + ConsumerContentObjectCallback *callback_content_object = nullptr; + socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT, + &callback_content_object); + if (*callback_content_object != VOID_HANDLER) { + (*callback_content_object)(*socket_, *content_object); + } - drop_factor = socket_->drop_factor_; - minimum_drop_probability = socket_->minimum_drop_probability_; - sample_number = socket_->sample_number_; - interest_lifetime = socket_->interest_lifetime_; - // beta = socket_->beta_; + ConsumerInterestCallback *callback_interest = nullptr; + socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_SATISFIED, + &callback_interest); + if (*callback_content_object != VOID_HANDLER) { + (*callback_interest)(*socket_, *interest); + } - double alpha = 0.0; - uint32_t batching_param = 0; - uint32_t choice_param = 0; - alpha = socket_->rate_estimation_alpha_; - batching_param = socket_->rate_estimation_batching_parameter_; - choice_param = socket_->rate_estimation_choice_; + if (TRANSPORT_EXPECT_FALSE(content_object->getPayloadType() == + PayloadType::MANIFEST)) { + if (TRANSPORT_EXPECT_FALSE(incremental_suffix == 0)) { + index_manager_ = manifest_index_manager_.get(); + interests_in_flight_--; + } - if (choice_param == 1) { - this->rate_estimator_ = new ALaTcpEstimator(); - } else { - this->rate_estimator_ = new SimpleEstimator(alpha, batching_param); + index_manager_->onManifest(std::move(content_object)); + + } else if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) { + if (TRANSPORT_EXPECT_FALSE(incremental_suffix == 0)) { + index_manager_ = incremental_index_manager_.get(); } - this->rate_estimator_->observer_ = socket_->rate_estimation_observer_; + onContentSegment(std::move(interest), std::move(content_object)); + } - cur_path_ = std::make_shared( - drop_factor, minimum_drop_probability, interest_lifetime * 1000, - sample_number); - path_table_[default_values::path_id] = cur_path_; + if (TRANSPORT_EXPECT_FALSE(incremental_suffix == 0)) { + BaseReassembly::index_ = index_manager_->getNextReassemblySegment(); } - VegasTransportProtocol::start(content_buffer); + scheduleNextInterests(); } -void RaaqmTransportProtocol::copyContent(const ContentObject &content_object) { - if (TRANSPORT_EXPECT_FALSE( - (content_object.getName().getSuffix() == final_block_number_) || - !(is_running_))) { - this->rate_estimator_->onDownloadFinished(); +void RaaqmTransportProtocol::onContentSegment( + Interest::Ptr &&interest, ContentObject::Ptr &&content_object) { + uint32_t incremental_suffix = content_object->getName().getSuffix(); + bool virtual_download = false; + socket_->getSocketOption(OtherOptions::VIRTUAL_DOWNLOAD, virtual_download); + + // Decrease in-flight interests + interests_in_flight_--; + + // Update stats + if (!interest_retransmissions_[incremental_suffix & mask]) { + afterContentReception(*interest, *content_object); + } + + if (index_manager_->onContentObject(*content_object)) { + stats_.updateBytesRecv(content_object->payloadSize()); + + if (!virtual_download) { + reassemble(std::move(content_object)); + } else if (TRANSPORT_EXPECT_FALSE(incremental_suffix == + index_manager_->getFinalSuffix())) { + onContentReassembled(std::make_error_code(std::errc(0))); + } + } else { + // TODO Application policy check + // unverified_segments_.emplace( + // std::make_pair(incremental_suffix, std::move(content_object))); + TRANSPORT_LOGE("Received not trusted segment."); } - VegasTransportProtocol::copyContent(content_object); } -void RaaqmTransportProtocol::updatePathTable( - const ContentObject &content_object) { - uint32_t path_id = content_object.getPathLabel(); +void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) { + checkForStalePaths(); - if (path_table_.find(path_id) == path_table_.end()) { - if (cur_path_) { - // Create a new path with some default param - if (path_table_.empty()) { - throw errors::RuntimeException( - "No path initialized for path table, error could be in default " - "path initialization."); - } else { - // Initiate the new path default param - std::shared_ptr new_path = - std::make_shared( - *(path_table_.at(default_values::path_id))); - // Insert the new path into hash table - path_table_[path_id] = new_path; - } + const Name &n = interest->getName(); + + TRANSPORT_LOGW("Timeout on %s", n.toString().c_str()); + + if (TRANSPORT_EXPECT_FALSE(!is_running_)) { + return; + } + + interests_in_flight_--; + + uint64_t segment = n.getSuffix(); + + // Do not retransmit interests asking contents that do not exist. + if (segment >= index_manager_->getFinalSuffix()) { + return; + } + + ConsumerInterestCallback *callback = nullptr; + socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_EXPIRED, + &callback); + if (*callback != VOID_HANDLER) { + (*callback)(*socket_, *interest); + } + + afterDataUnsatisfied(segment); + + uint32_t max_rtx = 0; + socket_->getSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, max_rtx); + + if (TRANSPORT_EXPECT_TRUE(interest_retransmissions_[segment & mask] < + max_rtx)) { + stats_.updateRetxCount(1); + + callback = nullptr; + socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION, + &callback); + if (*callback != VOID_HANDLER) { + (*callback)(*socket_, *interest); + } + + callback = nullptr; + socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, + &callback); + if ((*callback) != VOID_HANDLER) { + (*callback)(*socket_, *interest); + } + + if (!is_running_) { + return; + } + + interest_retransmissions_[segment & mask]++; + + interest_to_retransmit_.push(std::move(interest)); + + scheduleNextInterests(); + } else { + TRANSPORT_LOGE("Stop: reached max retx limit."); + onContentReassembled(std::make_error_code(std::errc(std::errc::io_error))); + } +} + +void RaaqmTransportProtocol::scheduleNextInterests() { + if (TRANSPORT_EXPECT_FALSE(!is_running_)) { + return; + } + + uint32_t index = IndexManager::invalid_index; + // Send the interest needed for filling the window + while (interests_in_flight_ < current_window_size_) { + if (interest_to_retransmit_.size() > 0) { + sendInterest(std::move(interest_to_retransmit_.front())); + interest_to_retransmit_.pop(); } else { - throw errors::RuntimeException( - "UNEXPECTED ERROR: when running,current path not found."); + index = index_manager_->getNextSuffix(); + if (index == IndexManager::invalid_index) { + break; + } + sendInterest(index); } } +} - cur_path_ = path_table_[path_id]; +void RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) { + auto interest = getPacket(); + core::Name *name; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name); + name->setSuffix(next_suffix); + interest->setName(*name); + + uint32_t interest_lifetime; + socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, + interest_lifetime); + interest->setLifetime(interest_lifetime); + + ConsumerInterestCallback *callback = nullptr; + socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, + &callback); + if (*callback != VOID_HANDLER) { + callback->operator()(*socket_, *interest); + } - size_t header_size = content_object.headerSize(); - size_t data_size = content_object.payloadSize(); + if (TRANSPORT_EXPECT_FALSE(!is_running_)) { + return; + } - // Update measurements for path - cur_path_->updateReceivedStats(header_size + data_size, data_size); + interest_retransmissions_[next_suffix & mask] = ~0; + interest_timepoints_[next_suffix & mask] = utils::SteadyClock::now(); + sendInterest(std::move(interest)); +} + +void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) { + interests_in_flight_++; + interest_retransmissions_[interest->getName().getSuffix() & mask]++; + + portal_->sendInterest(std::move(interest)); +} + +void RaaqmTransportProtocol::onContentReassembled(std::error_code ec) { + interface::ConsumerContentCallback *on_payload = nullptr; + socket_->getSocketOption(CONTENT_RETRIEVED, &on_payload); + if (*on_payload != VOID_HANDLER) { + std::shared_ptr> content_buffer; + socket_->getSocketOption( + interface::GeneralTransportOptions::APPLICATION_BUFFER, content_buffer); + (*on_payload)(*socket_, content_buffer->size(), ec); + } + + this->rate_estimator_->onDownloadFinished(); + stop(); } void RaaqmTransportProtocol::updateRtt(uint64_t segment) { if (TRANSPORT_EXPECT_FALSE(!cur_path_)) { - throw std::runtime_error("ERROR: no current path found, exit"); + throw std::runtime_error("RAAQM ERROR: no current path found, exit"); } else { - std::chrono::microseconds rtt; + auto now = utils::SteadyClock::now(); + utils::Microseconds rtt = std::chrono::duration_cast( + now - interest_timepoints_[segment & mask]); - std::chrono::steady_clock::duration duration = - std::chrono::steady_clock::now() - - interest_timepoints_[segment & mask_]; - rtt = std::chrono::duration_cast(duration); + // Update stats + updateStats(segment, rtt.count(), now); if (this->rate_estimator_) { this->rate_estimator_->onRttUpdate((double)rtt.count()); } + cur_path_->insertNewRtt(rtt.count()); cur_path_->smoothTimer(); if (cur_path_->newPropagationDelayAvailable()) { - check_drop_probability(); + checkDropProbability(); } } } -void RaaqmTransportProtocol::changeInterestLifetime(uint64_t segment) { - return; +void RaaqmTransportProtocol::RAAQM() { + if (!cur_path_) { + throw errors::RuntimeException("ERROR: no current path found, exit"); + exit(EXIT_FAILURE); + } else { + // Change drop probability according to RTT statistics + cur_path_->updateDropProb(); + + if (std::rand() % 10000 <= cur_path_->getDropProb() * 10000) { + decreaseWindow(); + } + } } -void RaaqmTransportProtocol::check_drop_probability() { +void RaaqmTransportProtocol::updateStats(uint32_t suffix, uint64_t rtt, + utils::TimePoint &now) { + // Update RTT statistics + stats_.updateAverageRtt(rtt); + stats_.updateAverageWindowSize(current_window_size_); + + // Call statistics callback + ConsumerTimerCallback *stats_callback = nullptr; + socket_->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY, + &stats_callback); + if (*stats_callback != VOID_HANDLER) { + auto dt = std::chrono::duration_cast(now - t0_); + + uint32_t timer_interval_milliseconds = 0; + socket_->getSocketOption(GeneralTransportOptions::STATS_INTERVAL, + timer_interval_milliseconds); + if (dt.count() > timer_interval_milliseconds) { + (*stats_callback)(*socket_, stats_); + t0_ = utils::SteadyClock::now(); + } + } +} + +void RaaqmTransportProtocol::updatePathTable( + const ContentObject &content_object) { + uint32_t path_id = content_object.getPathLabel(); + + if (path_table_.find(path_id) == path_table_.end()) { + if (TRANSPORT_EXPECT_TRUE(cur_path_ != nullptr)) { + // Create a new path with some default param + + if (TRANSPORT_EXPECT_FALSE(path_table_.empty())) { + throw errors::RuntimeException( + "[RAAQM] No path initialized for path table, error could be in " + "default path initialization."); + } + + // Initiate the new path default param + auto new_path = std::make_unique( + *(path_table_.at(default_values::path_id))); + + // Insert the new path into hash table + path_table_[path_id] = std::move(new_path); + } else { + throw errors::RuntimeException( + "UNEXPECTED ERROR: when running,current path not found."); + } + } + + cur_path_ = path_table_[path_id].get(); + + size_t header_size = content_object.headerSize(); + size_t data_size = content_object.payloadSize(); + + // Update measurements for path + cur_path_->updateReceivedStats(header_size + data_size, data_size); +} + +void RaaqmTransportProtocol::checkDropProbability() { if (!raaqm_autotune_) { return; } unsigned int max_pd = 0; - std::unordered_map>::iterator it; + PathTable::iterator it; for (auto it = path_table_.begin(); it != path_table_.end(); ++it) { if (it->second->getPropagationDelay() > max_pd && it->second->getPropagationDelay() != UINT_MAX && @@ -317,28 +639,28 @@ void RaaqmTransportProtocol::check_drop_probability() { double old_drop_prob = 0; double old_beta = 0; - old_beta = socket_->beta_; - old_drop_prob = socket_->drop_factor_; + socket_->getSocketOption(RaaqmTransportOptions::BETA_VALUE, old_beta); + socket_->getSocketOption(RaaqmTransportOptions::DROP_FACTOR, old_drop_prob); if (drop_prob == old_drop_prob && beta == old_beta) { return; } - socket_->beta_ = beta; - socket_->drop_factor_ = drop_prob; + socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE, beta); + socket_->setSocketOption(RaaqmTransportOptions::DROP_FACTOR, drop_prob); for (it = path_table_.begin(); it != path_table_.end(); it++) { it->second->setDropProb(drop_prob); } } -void RaaqmTransportProtocol::check_for_stale_paths() { +void RaaqmTransportProtocol::checkForStalePaths() { if (!raaqm_autotune_) { return; } bool stale = false; - std::unordered_map>::iterator it; + PathTable::iterator it; for (it = path_table_.begin(); it != path_table_.end(); ++it) { if (it->second->isStale()) { stale = true; @@ -346,71 +668,10 @@ void RaaqmTransportProtocol::check_for_stale_paths() { } } if (stale) { - check_drop_probability(); + checkDropProbability(); } } -void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) { - check_for_stale_paths(); - VegasTransportProtocol::onTimeout(std::move(interest)); -} - -void RaaqmTransportProtocol::increaseWindow() { - double max_window_size = socket_->max_window_size_; - if (current_window_size_ < max_window_size) { - double gamma = socket_->gamma_; - - current_window_size_ += gamma / current_window_size_; - socket_->current_window_size_ = current_window_size_; - } - this->rate_estimator_->onWindowIncrease(current_window_size_); -} - -void RaaqmTransportProtocol::decreaseWindow() { - double min_window_size = socket_->min_window_size_; - if (current_window_size_ > min_window_size) { - double beta = socket_->beta_; - - current_window_size_ = current_window_size_ * beta; - if (current_window_size_ < min_window_size) { - current_window_size_ = min_window_size; - } - - socket_->current_window_size_ = current_window_size_; - } - this->rate_estimator_->onWindowDecrease(current_window_size_); -} - -void RaaqmTransportProtocol::RAAQM() { - if (!cur_path_) { - throw errors::RuntimeException("ERROR: no current path found, exit"); - exit(EXIT_FAILURE); - } else { - // Change drop probability according to RTT statistics - cur_path_->updateDropProb(); - - if (rand() % 10000 <= cur_path_->getDropProb() * 10000) { - decreaseWindow(); - } - } -} - -void RaaqmTransportProtocol::afterDataUnsatisfied(uint64_t segment) { - // Decrease the window because the timeout happened - decreaseWindow(); -} - -void RaaqmTransportProtocol::afterContentReception( - const Interest &interest, const ContentObject &content_object) { - updatePathTable(content_object); - increaseWindow(); - updateRtt(interest.getName().getSuffix()); - this->rate_estimator_->onDataReceived( - (int)(content_object.payloadSize() + content_object.headerSize())); - // Set drop probablility and window size accordingly - RAAQM(); -} - } // end namespace protocol } // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/raaqm.h b/libtransport/src/hicn/transport/protocols/raaqm.h index 6ca410251..09d22cd4f 100644 --- a/libtransport/src/hicn/transport/protocols/raaqm.h +++ b/libtransport/src/hicn/transport/protocols/raaqm.h @@ -15,65 +15,108 @@ #pragma once +#include +#include #include #include -#include -#include +#include +#include + +#include +#include namespace transport { namespace protocol { -class RaaqmTransportProtocol : public VegasTransportProtocol { +class RaaqmTransportProtocol + : public TransportProtocol, + public BaseReassembly, + public CWindowProtocol, + public BaseReassembly::ContentReassembledCallback { public: - RaaqmTransportProtocol(interface::BaseSocket *icnet_socket); + RaaqmTransportProtocol(interface::ConsumerSocket *icnet_socket); ~RaaqmTransportProtocol(); - void start(utils::SharableVector &content_buffer) override; + int start() override; + + void resume() override; + + void reset() override; protected: - void copyContent(const ContentObject &content_object) override; + static constexpr uint32_t buffer_size = + 1 << interface::default_values::log_2_default_buffer_size; + static constexpr uint16_t mask = buffer_size - 1; + using PathTable = + std::unordered_map>; + + void increaseWindow() override; + void decreaseWindow() override; + + virtual void afterContentReception(const Interest &interest, + const ContentObject &content_object); + virtual void afterDataUnsatisfied(uint64_t segment); + + virtual void updateStats(uint32_t suffix, uint64_t rtt, + utils::TimePoint &now); private: void init(); - void afterContentReception(const Interest &interest, - const ContentObject &content_object) override; + void onContentObject(Interest::Ptr &&i, ContentObject::Ptr &&c) override; - void afterDataUnsatisfied(uint64_t segment) override; + void onContentSegment(Interest::Ptr &&interest, + ContentObject::Ptr &&content_object); - void increaseWindow() override; + void onTimeout(Interest::Ptr &&i) override; - void updateRtt(uint64_t segment); + virtual void scheduleNextInterests() override; - void decreaseWindow() override; + void sendInterest(std::uint64_t next_suffix); - void changeInterestLifetime(uint64_t segment) override; + void sendInterest(Interest::Ptr &&interest); - void onTimeout(Interest::Ptr &&interest) override; + void onContentReassembled(std::error_code ec) override; + + void updateRtt(uint64_t segment); void RAAQM(); void updatePathTable(const ContentObject &content_object); - void check_drop_probability(); + void checkDropProbability(); - void check_for_stale_paths(); + void checkForStalePaths(); void printRtt(); + protected: + // Congestion window management + double current_window_size_; + // Protocol management + uint64_t interests_in_flight_; + std::array interest_retransmissions_; + std::array interest_timepoints_; + std::queue interest_to_retransmit_; + + private: /** * Current download path */ - std::shared_ptr cur_path_; + RaaqmDataPath *cur_path_; /** * Hash table for path: each entry is a pair path ID(key) - path object */ - std::unordered_map> path_table_; + PathTable path_table_; + + // TimePoints for statistic + utils::TimePoint t0_; bool set_interest_filter_; + // for rate-estimation at packet level IcnRateEstimator *rate_estimator_; diff --git a/libtransport/src/hicn/transport/protocols/raaqm_data_path.cc b/libtransport/src/hicn/transport/protocols/raaqm_data_path.cc index ef26eabb5..e25646205 100644 --- a/libtransport/src/hicn/transport/protocols/raaqm_data_path.cc +++ b/libtransport/src/hicn/transport/protocols/raaqm_data_path.cc @@ -14,6 +14,7 @@ */ #include +#include namespace transport { @@ -42,7 +43,7 @@ RaaqmDataPath::RaaqmDataPath(double drop_factor, raw_data_bytes_received_(0), last_raw_data_bytes_received_(0), rtt_samples_(samples_), - last_received_pkt_(std::chrono::steady_clock::now()), + last_received_pkt_(utils::SteadyClock::now()), average_rtt_(0), alpha_(ALPHA) {} @@ -58,7 +59,7 @@ RaaqmDataPath &RaaqmDataPath::insertNewRtt(uint64_t new_rtt) { prop_delay_ = rtt_min_; } - last_received_pkt_ = std::chrono::steady_clock::now(); + last_received_pkt_ = utils::SteadyClock::now(); return *this; } @@ -124,10 +125,6 @@ RaaqmDataPath &RaaqmDataPath::updateDropProb() { return *this; } -double RaaqmDataPath::getMicroSeconds(struct timeval &time) { - return (double)(time.tv_sec) * 1000000 + (double)(time.tv_usec); -} - void RaaqmDataPath::setAlpha(double alpha) { if (alpha >= 0 && alpha <= 1) { alpha_ = alpha; @@ -145,9 +142,10 @@ unsigned int RaaqmDataPath::getPropagationDelay() { } bool RaaqmDataPath::isStale() { - TimePoint now = std::chrono::steady_clock::now(); - auto time = std::chrono::duration_cast(now - last_received_pkt_) - .count(); + utils::TimePoint now = utils::SteadyClock::now(); + auto time = + std::chrono::duration_cast(now - last_received_pkt_) + .count(); if (time > 2000000) { return true; } diff --git a/libtransport/src/hicn/transport/protocols/raaqm_data_path.h b/libtransport/src/hicn/transport/protocols/raaqm_data_path.h index a0b9ec9ca..9e4accfa5 100644 --- a/libtransport/src/hicn/transport/protocols/raaqm_data_path.h +++ b/libtransport/src/hicn/transport/protocols/raaqm_data_path.h @@ -15,6 +15,7 @@ #pragma once +#include #include #include @@ -30,9 +31,6 @@ namespace transport { namespace protocol { class RaaqmDataPath { - using TimePoint = std::chrono::steady_clock::time_point; - using Microseconds = std::chrono::microseconds; - public: RaaqmDataPath(double drop_factor, double minimum_drop_probability, unsigned new_timer, unsigned int samples, @@ -125,12 +123,6 @@ class RaaqmDataPath { */ RaaqmDataPath &updateDropProb(); - /** - * @brief This function convert the time from struct timeval to its value in - * microseconds - */ - static double getMicroSeconds(struct timeval &time); - void setAlpha(double alpha); /** @@ -222,7 +214,7 @@ class RaaqmDataPath { /** * Time of the last call to the path reporter method */ - TimePoint last_received_pkt_; + utils::TimePoint last_received_pkt_; double average_rtt_; double alpha_; diff --git a/libtransport/src/hicn/transport/protocols/rate_estimation.cc b/libtransport/src/hicn/transport/protocols/rate_estimation.cc index a0bb69153..99fc78c3e 100644 --- a/libtransport/src/hicn/transport/protocols/rate_estimation.cc +++ b/libtransport/src/hicn/transport/protocols/rate_estimation.cc @@ -13,6 +13,7 @@ * limitations under the License. */ +#include #include #include @@ -33,8 +34,8 @@ void *Timer(void *data) { pthread_mutex_unlock(&(estimator->mutex_)); while (estimator->is_running_) { - std::this_thread::sleep_for( - std::chrono::microseconds((uint64_t)(KV * dat_rtt))); + std::this_thread::sleep_for(std::chrono::microseconds( + (uint64_t)(interface::default_values::kv * dat_rtt))); pthread_mutex_lock(&(estimator->mutex_)); diff --git a/libtransport/src/hicn/transport/protocols/rate_estimation.h b/libtransport/src/hicn/transport/protocols/rate_estimation.h index 91964ec1d..616501b24 100644 --- a/libtransport/src/hicn/transport/protocols/rate_estimation.h +++ b/libtransport/src/hicn/transport/protocols/rate_estimation.h @@ -20,11 +20,6 @@ #include -#define BATCH 50 -#define KV 20 -#define ALPHA 0.8 -#define RATE_CHOICE 0 - namespace transport { namespace protocol { diff --git a/libtransport/src/hicn/transport/protocols/reassembly.cc b/libtransport/src/hicn/transport/protocols/reassembly.cc new file mode 100644 index 000000000..36cfb89a7 --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/reassembly.cc @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include + +namespace transport { + +namespace protocol { + +BaseReassembly::BaseReassembly(interface::ConsumerSocket *icn_socket, + ContentReassembledCallback *content_callback) + : reassembly_consumer_socket_(icn_socket), + zero_index_manager_(std::make_unique()), + incremental_index_manager_( + std::make_unique(icn_socket)), + manifest_index_manager_( + std::make_unique(icn_socket)), + index_manager_(zero_index_manager_.get()), + index_(0) { + setContentCallback(content_callback); +} + +void BaseReassembly::reassemble(ContentObject::Ptr &&content_object) { + if (TRANSPORT_EXPECT_TRUE(content_object != nullptr)) { + received_packets_.emplace(std::make_pair( + content_object->getName().getSuffix(), std::move(content_object))); + } + + auto it = received_packets_.find(index_); + while (it != received_packets_.end()) { + if (it->second->getPayloadType() == PayloadType::CONTENT_OBJECT) { + copyContent(*it->second); + received_packets_.erase(it); + } + + index_ = index_manager_->getNextReassemblySegment(); + it = received_packets_.find(index_); + } +} + +void BaseReassembly::copyContent(const ContentObject &content_object) { + utils::Array<> a = content_object.getPayload(); + + std::shared_ptr> content_buffer; + reassembly_consumer_socket_->getSocketOption( + interface::GeneralTransportOptions::APPLICATION_BUFFER, content_buffer); + + content_buffer->insert(content_buffer->end(), (uint8_t *)a.data(), + (uint8_t *)a.data() + a.length()); + + bool download_completed = + index_manager_->getFinalSuffix() == content_object.getName().getSuffix(); + + if (TRANSPORT_EXPECT_FALSE(download_completed)) { + content_callback_->onContentReassembled(std::make_error_code(std::errc(0))); + } +} + +void BaseReassembly::reset() { + manifest_index_manager_->reset(); + incremental_index_manager_->reset(); + + received_packets_.clear(); +} + +} // namespace protocol + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/reassembly.h b/libtransport/src/hicn/transport/protocols/reassembly.h new file mode 100644 index 000000000..ef3e99fc5 --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/reassembly.h @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +namespace transport { + +namespace protocol { + +// Forward Declaration +class ManifestManager; + +class Reassembly { + public: + class ContentReassembledCallback { + public: + virtual void onContentReassembled(std::error_code ec) = 0; + }; + + virtual void reassemble(ContentObject::Ptr &&content_object) = 0; + virtual void reset() = 0; + virtual void setContentCallback(ContentReassembledCallback *callback) { + content_callback_ = callback; + } + + protected: + ContentReassembledCallback *content_callback_; +}; + +class BaseReassembly : public Reassembly { + public: + BaseReassembly(interface::ConsumerSocket *icn_socket, + ContentReassembledCallback *content_callback); + + protected: + virtual void reassemble(ContentObject::Ptr &&content_object) override; + + virtual void copyContent(const ContentObject &content_object); + + virtual void reset() override; + + protected: + // The consumer socket + interface::ConsumerSocket *reassembly_consumer_socket_; + std::unique_ptr zero_index_manager_; + std::unique_ptr incremental_index_manager_; + std::unique_ptr manifest_index_manager_; + IndexVerificationManager *index_manager_; + std::unordered_map received_packets_; + + uint64_t index_; +}; + +} // namespace protocol + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc index 1356ad566..c2323345f 100644 --- a/libtransport/src/hicn/transport/protocols/rtc.cc +++ b/libtransport/src/hicn/transport/protocols/rtc.cc @@ -22,8 +22,8 @@ * TODO * 2) start/constructor/rest variable implementation * 3) interest retransmission: now I always recover, we should recover only if - * we have enough time 4) returnContentToUser: rememeber to remove the first - * 32bits from the payload + * we have enough time 4) returnContentToApplication: rememeber to remove the + * first 32bits from the payload */ namespace transport { @@ -32,7 +32,8 @@ namespace protocol { using namespace interface; -RTCTransportProtocol::RTCTransportProtocol(BaseSocket *icnet_socket) +RTCTransportProtocol::RTCTransportProtocol( + interface::ConsumerSocket *icnet_socket) : TransportProtocol(icnet_socket), inflightInterests_(1 << default_values::log_2_default_buffer_size), modMask_((1 << default_values::log_2_default_buffer_size) - 1) { @@ -46,19 +47,7 @@ RTCTransportProtocol::~RTCTransportProtocol() { } } -void RTCTransportProtocol::start( - utils::SharableVector &content_buffer) { - if (is_running_) return; - - is_running_ = true; - content_buffer_ = content_buffer.shared_from_this(); - - reset(); - scheduleNextInterest(); - - portal_->runEventsLoop(); - is_running_ = false; -} +int RTCTransportProtocol::start() { return TransportProtocol::start(); } void RTCTransportProtocol::stop() { if (!is_running_) return; @@ -74,9 +63,8 @@ void RTCTransportProtocol::resume() { lastRoundBegin_ = std::chrono::steady_clock::now(); inflightInterestsCount_ = 0; - if (content_buffer_) content_buffer_->clear(); - scheduleNextInterest(); + scheduleNextInterests(); portal_->runEventsLoop(); @@ -117,7 +105,6 @@ void RTCTransportProtocol::reset() { while (interestRetransmissions_.size() != 0) interestRetransmissions_.pop(); nackedByProducer_.clear(); nackedByProducerMaxSize_ = 512; - if (content_buffer_) content_buffer_->clear(); // stats receivedBytes_ = 0; @@ -364,9 +351,9 @@ void RTCTransportProtocol::increaseWindow() { } void RTCTransportProtocol::sendInterest() { - Name interest_name; + Name *interest_name = nullptr; socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, - interest_name); + &interest_name); bool isRTX = false; // uint32_t sentInt = 0; @@ -382,11 +369,11 @@ void RTCTransportProtocol::sendInterest() { packetLost_++; uint32_t pkt = rtxSeg & modMask_; - interest_name.setSuffix(rtxSeg); + interest_name->setSuffix(rtxSeg); // if the interest is not pending anymore we encrease the retrasnmission // counter in order to avoid to handle a recovered packt as a normal one - if (!portal_->interestIsPending(interest_name)) { + if (!portal_->interestIsPending(*interest_name)) { inflightInterests_[pkt].retransmissions++; } @@ -394,8 +381,8 @@ void RTCTransportProtocol::sendInterest() { isRTX = true; } else { // in this case we send the packet only if it is not pending yet - interest_name.setSuffix(actualSegment_); - if (portal_->interestIsPending(interest_name)) { + interest_name->setSuffix(actualSegment_); + if (portal_->interestIsPending(*interest_name)) { actualSegment_++; return; } @@ -410,21 +397,21 @@ void RTCTransportProtocol::sendInterest() { actualSegment_++; } - auto interest = getInterest(); - interest->setName(interest_name); + auto interest = getPacket(); + interest->setName(*interest_name); uint32_t interestLifetime = default_values::interest_lifetime; socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, interestLifetime); interest->setLifetime(uint32_t(interestLifetime)); - ConsumerInterestCallback on_interest_output = VOID_HANDLER; + ConsumerInterestCallback *on_interest_output = nullptr; socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, - on_interest_output); + &on_interest_output); - if (on_interest_output != VOID_HANDLER) { - on_interest_output(*dynamic_cast(socket_), *interest); + if (*on_interest_output != VOID_HANDLER) { + (*on_interest_output)(*socket_, *interest); } if (TRANSPORT_EXPECT_FALSE(!is_running_)) { @@ -441,7 +428,7 @@ void RTCTransportProtocol::sendInterest() { } } -void RTCTransportProtocol::scheduleNextInterest() { +void RTCTransportProtocol::scheduleNextInterests() { checkRound(); if (!is_running_) return; @@ -467,7 +454,7 @@ void RTCTransportProtocol::scheduleAppNackRtx(std::vector &nacks) { interestRetransmissions_.push(nacks[i]); } - scheduleNextInterest(); + scheduleNextInterests(); } void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) { // packetLost_++; @@ -483,7 +470,7 @@ void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) { interestRetransmissions_.push(segmentNumber); } - scheduleNextInterest(); + scheduleNextInterests(); } void RTCTransportProtocol::onNack(const ContentObject &content_object) { @@ -573,14 +560,14 @@ void RTCTransportProtocol::onContentObject( updateDelayStats(*content_object); } - returnContentToUser(*content_object); + reassemble(std::move(content_object)); increaseWindow(); } - scheduleNextInterest(); + scheduleNextInterests(); } -void RTCTransportProtocol::returnContentToUser( +void RTCTransportProtocol::returnContentToApplication( const ContentObject &content_object) { // return content to the user Array a = content_object.getPayload(); @@ -592,13 +579,14 @@ void RTCTransportProtocol::returnContentToUser( uint16_t rtp_seq = ntohs(*(((uint16_t *)start) + 1)); RTPhICN_offset_ = content_object.getName().getSuffix() - rtp_seq; - content_buffer_->insert(content_buffer_->end(), start, start + size); + std::shared_ptr> content_buffer; + socket_->getSocketOption(APPLICATION_BUFFER, content_buffer); + content_buffer->insert(content_buffer_->end(), start, start + size); - ConsumerContentCallback on_payload = VOID_HANDLER; - socket_->getSocketOption(CONTENT_RETRIEVED, on_payload); - if (on_payload != VOID_HANDLER) { - on_payload(*dynamic_cast(socket_), size, - std::make_error_code(std::errc(0))); + ConsumerContentCallback *on_payload = nullptr; + socket_->getSocketOption(CONTENT_RETRIEVED, &on_payload); + if ((*on_payload) != VOID_HANDLER) { + (*on_payload)(*socket_, size, std::make_error_code(std::errc(0))); } } diff --git a/libtransport/src/hicn/transport/protocols/rtc.h b/libtransport/src/hicn/transport/protocols/rtc.h index 61590fc8e..58c143988 100644 --- a/libtransport/src/hicn/transport/protocols/rtc.h +++ b/libtransport/src/hicn/transport/protocols/rtc.h @@ -20,6 +20,7 @@ #include #include +#include #include // algorithm state @@ -86,23 +87,23 @@ struct sentInterest { uint8_t retransmissions; }; -class RTCTransportProtocol : public TransportProtocol { +class RTCTransportProtocol : public TransportProtocol, public Reassembly { public: - RTCTransportProtocol(interface::BaseSocket *icnet_socket); + RTCTransportProtocol(interface::ConsumerSocket *icnet_socket); ~RTCTransportProtocol(); - void start(utils::SharableVector &content_buffer); + int start() override; - void stop(); + void stop() override; - void resume(); + void resume() override; void onRTCPPacket(uint8_t *packet, size_t len); private: // algo functions - void reset(); + void reset() override; void checkRound(); // CC functions @@ -117,13 +118,18 @@ class RTCTransportProtocol : public TransportProtocol { // packet functions void sendInterest(); - void scheduleNextInterest(); + void scheduleNextInterests() override; void scheduleAppNackRtx(std::vector &nacks); - void onTimeout(Interest::Ptr &&interest); + void onTimeout(Interest::Ptr &&interest) override; void onNack(const ContentObject &content_object); void onContentObject(Interest::Ptr &&interest, - ContentObject::Ptr &&content_object); - void returnContentToUser(const ContentObject &content_object); + ContentObject::Ptr &&content_object) override; + void returnContentToApplication(const ContentObject &content_object); + + TRANSPORT_ALWAYS_INLINE virtual void reassemble( + ContentObject::Ptr &&content_object) override { + returnContentToApplication(*content_object); + } // RTCP functions uint32_t hICN2RTP(uint32_t hicn_seq); @@ -161,7 +167,7 @@ class RTCTransportProtocol : public TransportProtocol { // application for pakets for which we already got a // past NACK by the producer these packet are too old, // they will never be retrived - std::shared_ptr> content_buffer_; + uint32_t modMask_; // stats diff --git a/libtransport/src/hicn/transport/protocols/statistics.h b/libtransport/src/hicn/transport/protocols/statistics.h new file mode 100644 index 000000000..47d164158 --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/statistics.h @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include + +namespace transport { + +namespace protocol { + +class TransportStatistics { + static constexpr double default_alpha = 0.7; + + public: + TransportStatistics(double alpha = default_alpha) + : retx_count_(0), + bytes_received_(0), + average_rtt_(0), + avg_window_size_(0), + interest_tx_(0), + alpha_(alpha) {} + + TRANSPORT_ALWAYS_INLINE void updateRetxCount(uint64_t retx) { + retx_count_ += retx; + } + + TRANSPORT_ALWAYS_INLINE void updateBytesRecv(uint64_t bytes) { + bytes_received_ += bytes; + } + + TRANSPORT_ALWAYS_INLINE void updateAverageRtt(uint64_t rtt) { + average_rtt_ = (alpha_ * average_rtt_) + ((1. - alpha_) * double(rtt)); + } + + TRANSPORT_ALWAYS_INLINE void updateAverageWindowSize(double current_window) { + avg_window_size_ = + (alpha_ * avg_window_size_) + ((1. - alpha_) * current_window); + } + + TRANSPORT_ALWAYS_INLINE void updateInterestTx(uint64_t int_tx) { + interest_tx_ += int_tx; + } + + TRANSPORT_ALWAYS_INLINE uint64_t getRetxCount() const { return retx_count_; } + + TRANSPORT_ALWAYS_INLINE uint64_t getBytesRecv() const { + return bytes_received_; + } + + TRANSPORT_ALWAYS_INLINE double getAverageRtt() const { return average_rtt_; } + + TRANSPORT_ALWAYS_INLINE double getAverageWindowSize() const { + return avg_window_size_; + } + + TRANSPORT_ALWAYS_INLINE uint64_t getInterestTx() const { + return interest_tx_; + } + + TRANSPORT_ALWAYS_INLINE void reset() { + retx_count_ = 0; + bytes_received_ = 0; + average_rtt_ = 0; + avg_window_size_ = 0; + interest_tx_ = 0; + } + + private: + uint64_t retx_count_; + uint64_t bytes_received_; + double average_rtt_; + double avg_window_size_; + uint64_t interest_tx_; + double alpha_; +}; + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/vegas.cc b/libtransport/src/hicn/transport/protocols/vegas.cc deleted file mode 100644 index f99eb83e0..000000000 --- a/libtransport/src/hicn/transport/protocols/vegas.cc +++ /dev/null @@ -1,627 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include -#include -#include - -#include - -namespace transport { - -namespace protocol { - -using namespace interface; - -VegasTransportProtocol::VegasTransportProtocol(BaseSocket *icnet_socket) - : TransportProtocol(icnet_socket), - is_final_block_number_discovered_(false), - final_block_number_(std::numeric_limits::max()), - last_reassembled_segment_(0), - content_buffer_size_(0), - current_window_size_(default_values::min_window_size), - interests_in_flight_(0), - next_suffix_(0), - interest_retransmissions_(1 << default_values::log_2_default_buffer_size), - interest_timepoints_(1 << default_values::log_2_default_buffer_size), - retx_count_(0), - receive_buffer_(1 << default_values::log_2_default_buffer_size), - unverified_segments_(1 << default_values::log_2_default_buffer_size), - verified_manifests_(1 << default_values::log_2_default_buffer_size), - mask_((1 << default_values::log_2_default_buffer_size) - 1), - incremental_suffix_index_(0), - suffix_queue_completed_(false), - download_with_manifest_(false), - next_manifest_interval_(0_U16), - interest_tx_(0), - interest_count_(0), - byte_count_(0), - average_rtt_(0.0) { - portal_ = socket_->portal_; - incremental_suffix_index_++; -} - -VegasTransportProtocol::~VegasTransportProtocol() { stop(); } - -void VegasTransportProtocol::reset() { - portal_->setConsumerCallback(this); - - is_final_block_number_discovered_ = false; - interest_pool_index_ = 0; - final_block_number_ = std::numeric_limits::max(); - next_suffix_ = 0; - interests_in_flight_ = 0; - last_reassembled_segment_ = 0; - content_buffer_size_ = 0; - content_buffer_->clear(); - interest_retransmissions_.clear(); - interest_retransmissions_.resize( - 1 << default_values::log_2_default_buffer_size, 0); - interest_timepoints_.clear(); - interest_timepoints_.resize(1 << default_values::log_2_default_buffer_size, - std::chrono::steady_clock::time_point()); - receive_buffer_.clear(); - unverified_segments_.clear(); - verified_manifests_.clear(); - next_manifest_interval_ = 0; - next_manifest_ = 0; - download_with_manifest_ = false; - incremental_suffix_index_ = 0; - - interest_tx_ = 0; - interest_count_ = 0; - byte_count_ = 0; - average_rtt_ = 0; - - // asio::io_service &io_service = portal_->getIoService(); - - // if (io_service.stopped()) { - // io_service.reset(); - // } -} - -void VegasTransportProtocol::start( - utils::SharableVector &content_buffer) { - if (is_running_) return; - - socket_->t0_ = std::chrono::steady_clock::now(); - - is_running_ = true; - content_buffer_ = content_buffer.shared_from_this(); - - reset(); - - sendInterest(next_suffix_++); - portal_->runEventsLoop(); - removeAllPendingInterests(); - is_running_ = false; -} - -void VegasTransportProtocol::resume() { - if (is_running_) return; - - is_running_ = true; - sendInterest(next_suffix_++); - portal_->runEventsLoop(); - removeAllPendingInterests(); - is_running_ = false; -} - -void VegasTransportProtocol::sendInterest(std::uint64_t next_suffix) { - auto interest = getInterest(); - socket_->network_name_.setSuffix((uint32_t)next_suffix); - interest->setName(socket_->network_name_); - - interest->setLifetime(uint32_t(socket_->interest_lifetime_)); - - if (socket_->on_interest_output_ != VOID_HANDLER) { - socket_->on_interest_output_(*socket_, *interest); - } - - if (TRANSPORT_EXPECT_FALSE(!is_running_)) { - return; - } - - interests_in_flight_++; - interest_retransmissions_[next_suffix & mask_] = 0; - interest_timepoints_[next_suffix & mask_] = std::chrono::steady_clock::now(); - - using namespace std::placeholders; - portal_->sendInterest(std::move(interest)); -} - -void VegasTransportProtocol::stop() { - is_running_ = false; - portal_->stopEventsLoop(); -} - -void VegasTransportProtocol::onContentSegment( - Interest::Ptr &&interest, ContentObject::Ptr &&content_object) { - uint32_t incremental_suffix = content_object->getName().getSuffix(); - bool virtual_download = socket_->virtual_download_; - - if (verifyContentObject(*content_object)) { - byte_count_ += content_object->getPayload().length(); - - if (TRANSPORT_EXPECT_FALSE(content_object->testRst())) { - is_final_block_number_discovered_ = true; - final_block_number_ = incremental_suffix; - } - - if (!virtual_download) { - receive_buffer_.emplace( - std::make_pair(incremental_suffix, std::move(content_object))); - reassemble(); - } else if (TRANSPORT_EXPECT_FALSE(is_final_block_number_discovered_ && - incremental_suffix == - final_block_number_)) { - returnContentToUser(); - } - } else { - unverified_segments_.emplace( - std::make_pair(incremental_suffix, std::move(content_object))); - } -} - -void VegasTransportProtocol::afterContentReception( - const Interest &interest, const ContentObject &content_object) { - increaseWindow(); -} - -void VegasTransportProtocol::afterDataUnsatisfied(uint64_t segment) { - decreaseWindow(); -} - -void VegasTransportProtocol::scheduleNextInterests() { - if (is_running_) { - uint32_t next_suffix; - while (interests_in_flight_ < current_window_size_) { - if (download_with_manifest_) { - if (suffix_queue_.size() * 2 < current_window_size_ && - next_manifest_ < final_block_number_ && next_manifest_interval_) { - next_manifest_ += next_manifest_interval_; - sendInterest(next_manifest_); - continue; - } - - if (suffix_queue_.pop(next_suffix)) { - // next_suffix = suffix_queue_.front(); - sendInterest(next_suffix); - // suffix_queue_.pop_front(); - } else { - if (!suffix_queue_completed_) { - TRANSPORT_LOGE("Empty queue!!!!!!"); - } - break; - } - } else { - if (is_final_block_number_discovered_) { - if (next_suffix_ > final_block_number_) { - return; - } - } - - sendInterest(next_suffix_++); - } - } - } -} - -void VegasTransportProtocol::decreaseWindow() { - if (current_window_size_ > socket_->min_window_size_) { - current_window_size_ = std::ceil(current_window_size_ / 2); - socket_->current_window_size_ = current_window_size_; - } -} - -void VegasTransportProtocol::increaseWindow() { - if (current_window_size_ < socket_->max_window_size_) { - current_window_size_++; - socket_->max_window_size_ = current_window_size_; - } -}; - -void VegasTransportProtocol::changeInterestLifetime(uint64_t segment) { - std::chrono::steady_clock::duration duration = - std::chrono::steady_clock::now() - interest_timepoints_[segment]; - rtt_estimator_.addMeasurement( - std::chrono::duration_cast(duration)); - - RtoEstimator::Duration rto = rtt_estimator_.computeRto(); - std::chrono::milliseconds lifetime = - std::chrono::duration_cast(rto); - - socket_->interest_lifetime_ = (int)lifetime.count(); -} - -void VegasTransportProtocol::returnContentToUser() { - if (socket_->on_payload_retrieved_ != VOID_HANDLER) { - socket_->on_payload_retrieved_(*socket_, byte_count_, - std::make_error_code(std::errc(0))); - } - - stop(); -} - -void VegasTransportProtocol::onManifest( - std::unique_ptr &&manifest) { - if (TRANSPORT_EXPECT_FALSE(!is_running_)) { - return; - } - - download_with_manifest_ = true; - - uint32_t segment = manifest->getName().getSuffix(); - - if (verifyManifest(*manifest)) { - manifest->decode(); - - if (TRANSPORT_EXPECT_TRUE(manifest->getVersion() == - core::ManifestVersion::VERSION_1)) { - switch (manifest->getManifestType()) { - case core::ManifestType::INLINE_MANIFEST: { - auto _it = manifest->getSuffixList().begin(); - auto _end = --manifest->getSuffixList().end(); - - if (TRANSPORT_EXPECT_FALSE(manifest->isFinalManifest())) { - _end++; - } - - // Get final block number - is_final_block_number_discovered_ = true; - final_block_number_ = manifest->getFinalBlockNumber(); - - for (; _it != _end; _it++) { - suffix_hash_map_[_it->first] = std::make_pair( - std::vector(_it->second, _it->second + 32), - manifest->getHashAlgorithm()); - suffix_queue_.push(_it->first); - } - - next_manifest_interval_ = - (unsigned short)manifest->getSuffixList().size(); - - if (manifest->isFinalManifest()) { - suffix_queue_completed_ = true; - // Give it a try - if (verifier_thread_) { - asio::io_service &io_service = portal_->getIoService(); - io_service.post([this]() { scheduleNextInterests(); }); - } - } - - break; - } - case core::ManifestType::FLIC_MANIFEST: { - throw errors::NotImplementedException(); - } - case core::ManifestType::FINAL_CHUNK_NUMBER: { - throw errors::NotImplementedException(); - } - } - } - - if (!socket_->virtual_download_) { - receive_buffer_.emplace( - std::make_pair(segment, std::move(manifest->getPacket()))); - reassemble(); - } else { - if (segment >= final_block_number_) { - stop(); - } - } - } -} - -bool VegasTransportProtocol::verifyManifest( - const ContentObjectManifest &manifest) { - if (!socket_->verify_signature_) { - return true; - } - - bool is_data_secure = false; - - if (socket_->on_content_object_verification_ == VOID_HANDLER) { - is_data_secure = static_cast(socket_->verifier_.verify(manifest)); - } else if (socket_->on_content_object_verification_(*socket_, manifest)) { - is_data_secure = true; - } - - if (TRANSPORT_EXPECT_FALSE(!is_data_secure)) { - TRANSPORT_LOGE("Verification failed for %s\n", - manifest.getName().toString().c_str()); - } - - return is_data_secure; -} - -// TODO Add the name in the digest computation! -void VegasTransportProtocol::onContentObject( - Interest::Ptr &&interest, ContentObject::Ptr &&content_object) { - uint32_t incremental_suffix = content_object->getName().getSuffix(); - - std::chrono::microseconds rtt; - Time now = std::chrono::steady_clock::now(); - std::chrono::steady_clock::duration duration = - now - interest_timepoints_[incremental_suffix & mask_]; - rtt = std::chrono::duration_cast(duration); - - average_rtt_ = (0.7 * average_rtt_) + (0.3 * (double)rtt.count()); - - if (socket_->on_timer_expires_ != VOID_HANDLER) { - auto dt = std::chrono::duration_cast(now - socket_->t0_); - if (dt.count() > socket_->timer_interval_milliseconds_) { - socket_->on_timer_expires_(*socket_, byte_count_, dt, - (float)current_window_size_, retx_count_, - (uint32_t)std::round(average_rtt_)); - socket_->t0_ = std::chrono::steady_clock::now(); - } - } - - interests_in_flight_--; - - if (TRANSPORT_EXPECT_FALSE(!is_running_ || incremental_suffix == ~0_U64 || - receive_buffer_.find(incremental_suffix) != - receive_buffer_.end())) { - return; - } - - changeInterestLifetime(incremental_suffix); - - if (socket_->on_content_object_input_ != VOID_HANDLER) { - socket_->on_content_object_input_(*socket_, *content_object); - } - - if (socket_->on_interest_satisfied_ != VOID_HANDLER) { - socket_->on_interest_satisfied_(*socket_, *interest); - } - - if (!interest_retransmissions_[incremental_suffix & mask_]) { - afterContentReception(*interest, *content_object); - } - - if (TRANSPORT_EXPECT_FALSE(content_object->getPayloadType() == - PayloadType::MANIFEST)) { - // TODO Fix manifest!! - auto manifest = - std::make_unique(std::move(content_object)); - - if (verifier_thread_ && incremental_suffix != 0) { - // verifier_thread_->add(std::bind(&VegasTransportProtocol::onManifest, - // this, std::move(manifest))); - } else { - onManifest(std::move(manifest)); - } - } else if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) { - if (verifier_thread_) { - // verifier_thread_->add(std::bind(&VegasTransportProtocol::onContentSegment, - // this, std::move(content_object))); - } else { - onContentSegment(std::move(interest), std::move(content_object)); - } - } - - scheduleNextInterests(); -} - -bool VegasTransportProtocol::verifyContentObject( - const ContentObject &content_object) { - if (!dynamic_cast(socket_)->verify_signature_) { - return true; - } - - uint64_t segment = content_object.getName().getSuffix(); - - bool ret = false; - - if (download_with_manifest_) { - auto it = suffix_hash_map_.find((const unsigned int)segment); - if (it != suffix_hash_map_.end()) { - auto hash_type = static_cast(it->second.second); - auto data_packet_digest = content_object.computeDigest(it->second.second); - auto data_packet_digest_bytes = - data_packet_digest.getDigest().data(); - std::vector &manifest_digest_bytes = it->second.first; - - if (utils::CryptoHash::compareBinaryDigest(data_packet_digest_bytes, - manifest_digest_bytes.data(), - hash_type)) { - suffix_hash_map_.erase(it); - ret = true; - } else { - throw errors::RuntimeException( - "Verification failure policy has to be implemented."); - } - } - } else { - ret = static_cast( - dynamic_cast(socket_)->verifier_.verify( - content_object)); - - if (!ret) { - throw errors::RuntimeException( - "Verification failure policy has to be implemented."); - } - } - - return ret; - ; -} - -void VegasTransportProtocol::onTimeout(Interest::Ptr &&interest) { - TRANSPORT_LOGW("Timeout on %s", interest->getName().toString().c_str()); - - if (TRANSPORT_EXPECT_FALSE(!is_running_)) { - return; - } - - interests_in_flight_--; - - uint64_t segment = interest->getName().getSuffix(); - - // Do not retransmit interests asking contents that do not exist. - if (is_final_block_number_discovered_) { - if (segment > final_block_number_) { - return; - } - } - - if (socket_->on_interest_timeout_ != VOID_HANDLER) { - socket_->on_interest_timeout_(*socket_, *interest); - } - - afterDataUnsatisfied(segment); - - if (TRANSPORT_EXPECT_TRUE(interest_retransmissions_[segment & mask_] < - socket_->max_retransmissions_)) { - retx_count_++; - - if (socket_->on_interest_retransmission_ != VOID_HANDLER) { - socket_->on_interest_retransmission_(*socket_, *interest); - } - - if (socket_->on_interest_output_ != VOID_HANDLER) { - socket_->on_interest_output_(*socket_, *interest); - } - - if (!is_running_) { - return; - } - - // retransmit - interests_in_flight_++; - interest_retransmissions_[segment & mask_]++; - - using namespace std::placeholders; - portal_->sendInterest(std::move(interest)); - } else { - TRANSPORT_LOGE("Stop: reached max retx limit."); - partialDownload(); - stop(); - } -} - -void VegasTransportProtocol::copyContent(const ContentObject &content_object) { - Array a = content_object.getPayload(); - - content_buffer_->insert(content_buffer_->end(), (uint8_t *)a.data(), - (uint8_t *)a.data() + a.length()); - - bool download_completed = - is_final_block_number_discovered_ && - content_object.getName().getSuffix() == final_block_number_; - - if (TRANSPORT_EXPECT_FALSE(download_completed || !is_running_)) { - // asio::io_service& io_service = portal_->getIoService(); - // io_service.post([this] () { - returnContentToUser(); - // }); - } -} - -void VegasTransportProtocol::reassemble() { - uint64_t index = last_reassembled_segment_; - auto it = receive_buffer_.find((const unsigned int)index); - - while (it != receive_buffer_.end()) { - if (it->second->getPayloadType() == PayloadType::CONTENT_OBJECT) { - copyContent(*it->second); - receive_buffer_.erase(it); - } - - index = ++last_reassembled_segment_; - it = receive_buffer_.find((const unsigned int)index); - } -} - -void VegasTransportProtocol::partialDownload() { - if (!socket_->virtual_download_) { - reassemble(); - } - - if (socket_->on_payload_retrieved_ != VOID_HANDLER) { - socket_->on_payload_retrieved_( - *socket_, byte_count_, - std::make_error_code(std::errc(std::errc::io_error))); - } -} - -// TODO Check vegas protocol -// void VegasTransportProtocol::checkForFastRetransmission(const Interest -// &interest) { -// uint64_t segNumber = interest.getName().getSuffix(); -// received_segments_[segNumber] = true; -// fast_retransmitted_segments.erase(segNumber); - -// uint64_t possibly_lost_segment = 0; -// uint64_t highest_received_segment = received_segments_.rbegin()->first; - -// for (uint64_t i = 0; i <= highest_received_segment; i++) { -// if (received_segments_.find(i) == received_segments_.end()) { -// if (fast_retransmitted_segments.find(i) == -// fast_retransmitted_segments.end()) { -// possibly_lost_segment = i; -// uint8_t out_of_order_segments = 0; -// for (uint64_t j = i; j <= highest_received_segment; j++) { -// if (received_segments_.find(j) != received_segments_.end()) { -// out_of_order_segments++; -// if (out_of_order_segments >= -// default_values::max_out_of_order_segments) { -// fast_retransmitted_segments[possibly_lost_segment] = true; -// fastRetransmit(interest, possibly_lost_segment); -// } -// } -// } -// } -// } -// } -// } - -// void VegasTransportProtocol::fastRetransmit(const Interest &interest, -// uint32_t chunk_number) { -// if (interest_retransmissions_[chunk_number & mask_] < -// socket_->max_retransmissions_) { -// Name name = interest.getName(); -// name.setSuffix(chunk_number); - -// std::shared_ptr retx_interest = -// std::make_shared(name); - -// if (socket_->on_interest_retransmission_ != VOID_HANDLER) { -// socket_->on_interest_retransmission_(*socket_, *retx_interest); -// } - -// if (socket_->on_interest_output_ != VOID_HANDLER) { -// socket_->on_interest_output_(*socket_, *retx_interest); -// } - -// if (!is_running_) { -// return; -// } - -// interests_in_flight_++; -// interest_retransmissions_[chunk_number & mask_]++; - -// using namespace std::placeholders; -// portal_->sendInterest(std::move(retx_interest)); -// } -// } - -void VegasTransportProtocol::removeAllPendingInterests() { portal_->clear(); } - -} // end namespace protocol - -} // namespace transport diff --git a/libtransport/src/hicn/transport/protocols/vegas.h b/libtransport/src/hicn/transport/protocols/vegas.h deleted file mode 100644 index 7791ffc94..000000000 --- a/libtransport/src/hicn/transport/protocols/vegas.h +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include -#include -#include -#include - -#include - -namespace transport { - -namespace protocol { - -typedef utils::CircularFifo SuffixQueue; -typedef std::chrono::time_point Time; -typedef std::chrono::milliseconds TimeDuration; - -class VegasTransportProtocol : public TransportProtocol { - public: - VegasTransportProtocol(interface::BaseSocket *icnet_socket); - - virtual ~VegasTransportProtocol(); - - virtual void start(utils::SharableVector &content_buffer) override; - - void stop() override; - - void resume() override; - - protected: - void reset(); - - void sendInterest(std::uint64_t next_suffix); - - void onContentSegment(Interest::Ptr &&interest, - ContentObject::Ptr &&content_object); - - bool verifyContentObject(const ContentObject &content_object); - - bool verifyManifest(const interface::ContentObjectManifest &manifest); - - virtual void onTimeout(Interest::Ptr &&interest) override; - - void onManifest(std::unique_ptr &&manifest); - - void onContentObject(Interest::Ptr &&interest, - ContentObject::Ptr &&content_object) override; - - virtual void changeInterestLifetime(uint64_t segment); - - void scheduleNextInterests(); - - virtual void decreaseWindow(); - - virtual void increaseWindow(); - - virtual void afterContentReception(const Interest &interest, - const ContentObject &content_object); - - virtual void afterDataUnsatisfied(uint64_t segment); - - void reassemble(); - - void returnContentToUser(); - - void partialDownload(); - - virtual void copyContent(const ContentObject &content_object); - - // virtual void checkForFastRetransmission(const Interest &interest); - - // void fastRetransmit(const Interest &interest, uint32_t chunk_number); - - void removeAllPendingInterests(); - - protected: - void handleTimeout(const std::error_code &ec); - - // reassembly variables - volatile bool is_final_block_number_discovered_; - std::atomic final_block_number_; - uint64_t last_reassembled_segment_; - std::shared_ptr> content_buffer_; - size_t content_buffer_size_; - - // transmission variablesis_final_block_number_discovered_ - double current_window_size_; - double pending_window_size_; - uint64_t interests_in_flight_; - uint64_t next_suffix_; - std::vector interest_retransmissions_; - std::vector interest_timepoints_; - RtoEstimator rtt_estimator_; - - uint32_t retx_count_; - - // buffers - std::unordered_map - receive_buffer_; // verified segments by segment number - std::unordered_map - unverified_segments_; // used with embedded manifests - std::unordered_map - verified_manifests_; // by segment number - - std::uint16_t interest_pool_index_; - std::uint16_t mask_; - - // suffix randomization: since the suffixes in the manifests could not be in a - // sequential order, we need to map those suffixes into an ordered sequence. - std::unordered_map - incremental_suffix_to_real_suffix_map_; - std::unordered_map - real_suffix_to_incremental_suffix_map_; - std::uint32_t incremental_suffix_index_; - - // verification - std::unordered_map, HashAlgorithm>> - suffix_hash_map_; - - // Fast Retransmission - std::map received_segments_; - std::unordered_map fast_retransmitted_segments; - - // Suffix queue - volatile bool suffix_queue_completed_; - SuffixQueue suffix_queue_; - - volatile bool download_with_manifest_; - uint32_t next_manifest_; - std::atomic next_manifest_interval_; - - std::unique_ptr verifier_thread_; - - uint32_t interest_tx_; - uint32_t interest_count_; - - uint64_t byte_count_; - double average_rtt_; - - std::unordered_map sign_time_; -}; - -} // namespace protocol - -} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/vegas_rto_estimator.cc b/libtransport/src/hicn/transport/protocols/vegas_rto_estimator.cc deleted file mode 100644 index a61fd05f0..000000000 --- a/libtransport/src/hicn/transport/protocols/vegas_rto_estimator.cc +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include - -#include -#include - -namespace transport { - -namespace protocol { - -using namespace interface; - -RtoEstimator::RtoEstimator(Duration min_rto) - : smoothed_rtt_((double)RtoEstimator::getInitialRtt().count()), - rtt_variation_(0), - first_measurement_(true), - last_rto_((double)min_rto.count()) {} - -void RtoEstimator::addMeasurement(Duration rtt) { - double duration = static_cast(rtt.count()); - if (first_measurement_) { - smoothed_rtt_ = duration; - rtt_variation_ = duration / 2; - first_measurement_ = false; - } else { - rtt_variation_ = (1 - default_values::beta) * rtt_variation_ + - default_values::beta * std::abs(smoothed_rtt_ - duration); - smoothed_rtt_ = (1 - default_values::alpha) * smoothed_rtt_ + - default_values::alpha * duration; - } -} - -RtoEstimator::Duration RtoEstimator::computeRto() const { - double rto = smoothed_rtt_ + - std::max(double(default_values::clock_granularity.count()), - default_values::k* rtt_variation_); - return Duration(static_cast(rto)); -} - -} // end namespace protocol - -} // end namespace transport \ No newline at end of file diff --git a/libtransport/src/hicn/transport/protocols/vegas_rto_estimator.h b/libtransport/src/hicn/transport/protocols/vegas_rto_estimator.h deleted file mode 100644 index e84afc49c..000000000 --- a/libtransport/src/hicn/transport/protocols/vegas_rto_estimator.h +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include - -// Implementation inspired from RFC6298 -// (https://tools.ietf.org/search/rfc6298#ref-JK88) - -namespace transport { - -namespace protocol { - -class RtoEstimator { - public: - typedef std::chrono::microseconds Duration; - - static Duration getInitialRtt() { return std::chrono::seconds(1); } - - RtoEstimator(Duration min_rto = std::chrono::seconds(1)); - - void addMeasurement(Duration measure); - - Duration computeRto() const; - - private: - double smoothed_rtt_; - double rtt_variation_; - bool first_measurement_; - double last_rto_; -}; - -} // end namespace protocol - -} // end namespace transport \ No newline at end of file diff --git a/libtransport/src/hicn/transport/protocols/verification_manager.h b/libtransport/src/hicn/transport/protocols/verification_manager.h new file mode 100644 index 000000000..da67e86f8 --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/verification_manager.h @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include + +namespace transport { + +namespace protocol { + +class VerificationManager { + public: + virtual ~VerificationManager() = default; + virtual bool onPacketToVerify(const Packet& packet) = 0; +}; + +class SignatureVerificationManager : public VerificationManager { + public: + SignatureVerificationManager(interface::ConsumerSocket* icn_socket) + : icn_socket_(icn_socket) {} + + TRANSPORT_ALWAYS_INLINE bool onPacketToVerify(const Packet& packet) override { + using namespace interface; + + bool verify_signature, ret = false; + icn_socket_->getSocketOption(GeneralTransportOptions::VERIFY_SIGNATURE, + verify_signature); + + if (!verify_signature) { + return true; + } + + std::shared_ptr verifier; + icn_socket_->getSocketOption(GeneralTransportOptions::VERIFIER, verifier); + + if (TRANSPORT_EXPECT_FALSE(!verifier)) { + throw errors::RuntimeException( + "No certificate provided by the application."); + } + + ret = verifier->verify(packet); + + if (!ret) { + throw errors::RuntimeException( + "Verification failure policy has to be implemented."); + } + + return ret; + } + + private: + interface::ConsumerSocket* icn_socket_; +}; + +} // end namespace protocol + +} // end namespace transport -- cgit 1.2.3-korg