diff options
Diffstat (limited to 'libtransport/src/protocols')
52 files changed, 5726 insertions, 284 deletions
diff --git a/libtransport/src/protocols/CMakeLists.txt b/libtransport/src/protocols/CMakeLists.txt index 8bfbdd6ad..eba8d1aab 100644 --- a/libtransport/src/protocols/CMakeLists.txt +++ b/libtransport/src/protocols/CMakeLists.txt @@ -21,16 +21,15 @@ list(APPEND HEADER_FILES ${CMAKE_CURRENT_SOURCE_DIR}/datagram_reassembly.h ${CMAKE_CURRENT_SOURCE_DIR}/byte_stream_reassembly.h ${CMAKE_CURRENT_SOURCE_DIR}/congestion_window_protocol.h - ${CMAKE_CURRENT_SOURCE_DIR}/packet_manager.h ${CMAKE_CURRENT_SOURCE_DIR}/rate_estimation.h - ${CMAKE_CURRENT_SOURCE_DIR}/protocol.h + ${CMAKE_CURRENT_SOURCE_DIR}/transport_protocol.h + ${CMAKE_CURRENT_SOURCE_DIR}/production_protocol.h + ${CMAKE_CURRENT_SOURCE_DIR}/prod_protocol_bytestream.h + ${CMAKE_CURRENT_SOURCE_DIR}/prod_protocol_rtc.h ${CMAKE_CURRENT_SOURCE_DIR}/raaqm.h ${CMAKE_CURRENT_SOURCE_DIR}/raaqm_data_path.h ${CMAKE_CURRENT_SOURCE_DIR}/cbr.h - ${CMAKE_CURRENT_SOURCE_DIR}/rtc.h - ${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.h ${CMAKE_CURRENT_SOURCE_DIR}/errors.h - ${CMAKE_CURRENT_SOURCE_DIR}/verification_manager.h ${CMAKE_CURRENT_SOURCE_DIR}/data_processing_events.h ) @@ -41,15 +40,15 @@ list(APPEND SOURCE_FILES ${CMAKE_CURRENT_SOURCE_DIR}/reassembly.cc ${CMAKE_CURRENT_SOURCE_DIR}/datagram_reassembly.cc ${CMAKE_CURRENT_SOURCE_DIR}/byte_stream_reassembly.cc - ${CMAKE_CURRENT_SOURCE_DIR}/protocol.cc + ${CMAKE_CURRENT_SOURCE_DIR}/transport_protocol.cc + ${CMAKE_CURRENT_SOURCE_DIR}/production_protocol.cc + ${CMAKE_CURRENT_SOURCE_DIR}/prod_protocol_bytestream.cc + ${CMAKE_CURRENT_SOURCE_DIR}/prod_protocol_rtc.cc ${CMAKE_CURRENT_SOURCE_DIR}/raaqm.cc ${CMAKE_CURRENT_SOURCE_DIR}/rate_estimation.cc ${CMAKE_CURRENT_SOURCE_DIR}/raaqm_data_path.cc ${CMAKE_CURRENT_SOURCE_DIR}/cbr.cc - ${CMAKE_CURRENT_SOURCE_DIR}/rtc.cc - ${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.cc ${CMAKE_CURRENT_SOURCE_DIR}/errors.cc - ${CMAKE_CURRENT_SOURCE_DIR}/verification_manager.cc ) set(RAAQM_CONFIG_INSTALL_PREFIX @@ -71,5 +70,7 @@ install( COMPONENT lib${LIBTRANSPORT} ) +add_subdirectory(rtc) + set(SOURCE_FILES ${SOURCE_FILES} PARENT_SCOPE) -set(HEADER_FILES ${HEADER_FILES} PARENT_SCOPE)
\ No newline at end of file +set(HEADER_FILES ${HEADER_FILES} PARENT_SCOPE) diff --git a/libtransport/src/protocols/byte_stream_reassembly.cc b/libtransport/src/protocols/byte_stream_reassembly.cc index 6662bec3f..d2bc961c4 100644 --- a/libtransport/src/protocols/byte_stream_reassembly.cc +++ b/libtransport/src/protocols/byte_stream_reassembly.cc @@ -20,7 +20,7 @@ #include <protocols/byte_stream_reassembly.h> #include <protocols/errors.h> #include <protocols/indexer.h> -#include <protocols/protocol.h> +#include <protocols/transport_protocol.h> namespace transport { @@ -45,11 +45,11 @@ void ByteStreamReassembly::reassemble( } } -void ByteStreamReassembly::reassemble(ContentObject::Ptr &&content_object) { - if (TRANSPORT_EXPECT_TRUE(content_object != nullptr) && - read_buffer_->capacity()) { - received_packets_.emplace(std::make_pair( - content_object->getName().getSuffix(), std::move(content_object))); +void ByteStreamReassembly::reassemble(ContentObject &content_object) { + if (TRANSPORT_EXPECT_TRUE(read_buffer_->capacity())) { + received_packets_.emplace( + std::make_pair(content_object.getName().getSuffix(), + content_object.shared_from_this())); assembleContent(); } } @@ -81,25 +81,32 @@ void ByteStreamReassembly::assembleContent() { } } -bool ByteStreamReassembly::copyContent(const ContentObject &content_object) { +bool ByteStreamReassembly::copyContent(ContentObject &content_object) { bool ret = false; - auto payload = content_object.getPayloadReference(); - auto payload_length = payload.second; - auto write_size = std::min(payload_length, read_buffer_->tailroom()); - auto additional_bytes = payload_length > read_buffer_->tailroom() - ? payload_length - read_buffer_->tailroom() - : 0; + content_object.trimStart(content_object.headerSize()); - std::memcpy(read_buffer_->writableTail(), payload.first, write_size); - read_buffer_->append(write_size); + utils::MemBuf *current = &content_object; - if (!read_buffer_->tailroom()) { - notifyApplication(); - std::memcpy(read_buffer_->writableTail(), payload.first + write_size, - additional_bytes); - read_buffer_->append(additional_bytes); - } + do { + auto payload_length = current->length(); + auto write_size = std::min(payload_length, read_buffer_->tailroom()); + auto additional_bytes = payload_length > read_buffer_->tailroom() + ? payload_length - read_buffer_->tailroom() + : 0; + + std::memcpy(read_buffer_->writableTail(), current->data(), write_size); + read_buffer_->append(write_size); + + if (!read_buffer_->tailroom()) { + notifyApplication(); + std::memcpy(read_buffer_->writableTail(), current->data() + write_size, + additional_bytes); + read_buffer_->append(additional_bytes); + } + + current = current->next(); + } while (current != &content_object); download_complete_ = index_manager_->getFinalSuffix() == content_object.getName().getSuffix(); diff --git a/libtransport/src/protocols/byte_stream_reassembly.h b/libtransport/src/protocols/byte_stream_reassembly.h index e4f62b3a8..c682d58cb 100644 --- a/libtransport/src/protocols/byte_stream_reassembly.h +++ b/libtransport/src/protocols/byte_stream_reassembly.h @@ -27,12 +27,12 @@ class ByteStreamReassembly : public Reassembly { TransportProtocol *transport_protocol); protected: - virtual void reassemble(core::ContentObject::Ptr &&content_object) override; + virtual void reassemble(core::ContentObject &content_object) override; virtual void reassemble( std::unique_ptr<core::ContentObjectManifest> &&manifest) override; - bool copyContent(const core::ContentObject &content_object); + bool copyContent(core::ContentObject &content_object); virtual void reInitialize() override; diff --git a/libtransport/src/protocols/data_processing_events.h b/libtransport/src/protocols/data_processing_events.h index 8975c2b4a..5c8c16157 100644 --- a/libtransport/src/protocols/data_processing_events.h +++ b/libtransport/src/protocols/data_processing_events.h @@ -24,8 +24,7 @@ namespace protocol { class ContentObjectProcessingEventCallback { public: virtual ~ContentObjectProcessingEventCallback() = default; - virtual void onPacketDropped(core::Interest::Ptr &&i, - core::ContentObject::Ptr &&c) = 0; + virtual void onPacketDropped(core::Interest &i, core::ContentObject &c) = 0; virtual void onReassemblyFailed(std::uint32_t missing_segment) = 0; }; diff --git a/libtransport/src/protocols/datagram_reassembly.cc b/libtransport/src/protocols/datagram_reassembly.cc index abd7e984d..962c1e020 100644 --- a/libtransport/src/protocols/datagram_reassembly.cc +++ b/libtransport/src/protocols/datagram_reassembly.cc @@ -24,8 +24,8 @@ DatagramReassembly::DatagramReassembly( TransportProtocol* transport_protocol) : Reassembly(icn_socket, transport_protocol) {} -void DatagramReassembly::reassemble(core::ContentObject::Ptr&& content_object) { - read_buffer_ = content_object->getPayload(); +void DatagramReassembly::reassemble(core::ContentObject& content_object) { + read_buffer_ = content_object.getPayload(); Reassembly::notifyApplication(); } diff --git a/libtransport/src/protocols/datagram_reassembly.h b/libtransport/src/protocols/datagram_reassembly.h index 2427ae62f..3462212d3 100644 --- a/libtransport/src/protocols/datagram_reassembly.h +++ b/libtransport/src/protocols/datagram_reassembly.h @@ -26,7 +26,7 @@ class DatagramReassembly : public Reassembly { DatagramReassembly(implementation::ConsumerSocket *icn_socket, TransportProtocol *transport_protocol); - virtual void reassemble(core::ContentObject::Ptr &&content_object) override; + virtual void reassemble(core::ContentObject &content_object) override; virtual void reInitialize() override; virtual void reassemble( std::unique_ptr<core::ContentObjectManifest> &&manifest) override { diff --git a/libtransport/src/protocols/errors.cc b/libtransport/src/protocols/errors.cc index eefb6f957..ae7b6e634 100644 --- a/libtransport/src/protocols/errors.cc +++ b/libtransport/src/protocols/errors.cc @@ -52,7 +52,9 @@ std::string protocol_category_impl::message(int ev) const { case protocol_error::session_aborted: { return "The session has been aborted by the application."; } - default: { return "Unknown protocol error"; } + default: { + return "Unknown protocol error"; + } } } diff --git a/libtransport/src/protocols/fec_base.h b/libtransport/src/protocols/fec_base.h new file mode 100644 index 000000000..a135c474f --- /dev/null +++ b/libtransport/src/protocols/fec_base.h @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/core/content_object.h> + +#include <functional> + +namespace transport { +namespace protocol { + +/** + * Interface classes to integrate FEC inside any producer transport protocol + */ +class ProducerFECBase { + public: + /** + * Callback, to be called by implementations as soon as a repair packet is + * ready. + */ + using RepairPacketsReady = + std::function<void(std::vector<core::ContentObject::Ptr> &)>; + + /** + * Producers will call this function upon production of a new packet. + */ + virtual void onPacketProduced(const core::ContentObject &content_object) = 0; + + /** + * Set callback to signal production protocol the repair packet is ready. + */ + void setFECCallback(const RepairPacketsReady &on_repair_packet) { + rep_packet_ready_callback_ = on_repair_packet; + } + + protected: + RepairPacketsReady rep_packet_ready_callback_; +}; + +/** + * Interface classes to integrate FEC inside any consumer transport protocol + */ +class ConsumerFECBase { + public: + /** + * Callback, to be called by implemrntations as soon as a packet is recovered. + */ + using OnPacketsRecovered = + std::function<void(std::vector<core::ContentObject::Ptr> &)>; + + /** + * Consumers will call this function when they receive a FEC packet. + */ + virtual void onFECPacket(const core::ContentObject &content_object) = 0; + + /** + * Consumers will call this function when they receive a data packet + */ + virtual void onDataPacket(const core::ContentObject &content_object) = 0; + + /** + * Set callback to signal consumer protocol the repair packet is ready. + */ + void setFECCallback(const OnPacketsRecovered &on_repair_packet) { + packet_recovered_callback_ = on_repair_packet; + } + + protected: + OnPacketsRecovered packet_recovered_callback_; +}; + +} // namespace protocol +} // namespace transport
\ No newline at end of file diff --git a/libtransport/src/protocols/incremental_indexer.cc b/libtransport/src/protocols/incremental_indexer.cc index 0872c4554..95daa0a3e 100644 --- a/libtransport/src/protocols/incremental_indexer.cc +++ b/libtransport/src/protocols/incremental_indexer.cc @@ -13,37 +13,38 @@ * limitations under the License. */ -#include <protocols/incremental_indexer.h> - #include <hicn/transport/interfaces/socket_consumer.h> -#include <protocols/protocol.h> +#include <protocols/errors.h> +#include <protocols/incremental_indexer.h> +#include <protocols/transport_protocol.h> namespace transport { namespace protocol { -void IncrementalIndexer::onContentObject( - core::Interest::Ptr &&interest, core::ContentObject::Ptr &&content_object) { +void IncrementalIndexer::onContentObject(core::Interest &interest, + core::ContentObject &content_object) { using namespace interface; - TRANSPORT_LOGD("Receive content %s", content_object->getName().toString().c_str()); + TRANSPORT_LOGD("Received content %s", + content_object.getName().toString().c_str()); - if (TRANSPORT_EXPECT_FALSE(content_object->testRst())) { - final_suffix_ = content_object->getName().getSuffix(); + if (TRANSPORT_EXPECT_FALSE(content_object.testRst())) { + final_suffix_ = content_object.getName().getSuffix(); } - auto ret = verification_manager_->onPacketToVerify(*content_object); + auto ret = verifier_->verifyPackets(&content_object); switch (ret) { - case VerificationPolicy::ACCEPT_PACKET: { - reassembly_->reassemble(std::move(content_object)); + case auth::VerificationPolicy::ACCEPT: { + reassembly_->reassemble(content_object); break; } - case VerificationPolicy::DROP_PACKET: { - transport_protocol_->onPacketDropped(std::move(interest), - std::move(content_object)); + case auth::VerificationPolicy::UNKNOWN: + case auth::VerificationPolicy::DROP: { + transport_protocol_->onPacketDropped(interest, content_object); break; } - case VerificationPolicy::ABORT_SESSION: { + case auth::VerificationPolicy::ABORT: { transport_protocol_->onContentReassembled( make_error_code(protocol_error::session_aborted)); break; diff --git a/libtransport/src/protocols/incremental_indexer.h b/libtransport/src/protocols/incremental_indexer.h index 20c5e4759..d7760f8e6 100644 --- a/libtransport/src/protocols/incremental_indexer.h +++ b/libtransport/src/protocols/incremental_indexer.h @@ -15,13 +15,13 @@ #pragma once -#include <hicn/transport/errors/runtime_exception.h> -#include <hicn/transport/errors/unexpected_manifest_exception.h> +#include <hicn/transport/errors/errors.h> +#include <hicn/transport/interfaces/callbacks.h> +#include <hicn/transport/auth/verifier.h> #include <hicn/transport/utils/literals.h> - +#include <implementation/socket_consumer.h> #include <protocols/indexer.h> #include <protocols/reassembly.h> -#include <protocols/verification_manager.h> #include <deque> @@ -47,11 +47,12 @@ class IncrementalIndexer : public Indexer { first_suffix_(0), next_download_suffix_(0), next_reassembly_suffix_(0), - verification_manager_( - std::make_unique<SignatureVerificationManager>(icn_socket)) { + verifier_(nullptr) { if (reassembly_) { reassembly_->setIndexer(this); } + socket_->getSocketOption(implementation::GeneralTransportOptions::VERIFIER, + verifier_); } IncrementalIndexer(const IncrementalIndexer &) = delete; @@ -64,15 +65,14 @@ class IncrementalIndexer : public Indexer { first_suffix_(other.first_suffix_), next_download_suffix_(other.next_download_suffix_), next_reassembly_suffix_(other.next_reassembly_suffix_), - verification_manager_(std::move(other.verification_manager_)) { + verifier_(nullptr) { if (reassembly_) { reassembly_->setIndexer(this); } + socket_->getSocketOption(implementation::GeneralTransportOptions::VERIFIER, + verifier_); } - /** - * - */ virtual ~IncrementalIndexer() {} TRANSPORT_ALWAYS_INLINE virtual void reset( @@ -112,8 +112,8 @@ class IncrementalIndexer : public Indexer { return final_suffix_; } - void onContentObject(core::Interest::Ptr &&interest, - core::ContentObject::Ptr &&content_object) override; + void onContentObject(core::Interest &interest, + core::ContentObject &content_object) override; TRANSPORT_ALWAYS_INLINE void setReassembly(Reassembly *reassembly) { reassembly_ = reassembly; @@ -123,10 +123,6 @@ class IncrementalIndexer : public Indexer { } } - TRANSPORT_ALWAYS_INLINE bool onKeyToVerify() override { - return verification_manager_->onKeyToVerify(); - } - protected: implementation::ConsumerSocket *socket_; Reassembly *reassembly_; @@ -135,9 +131,8 @@ class IncrementalIndexer : public Indexer { uint32_t first_suffix_; uint32_t next_download_suffix_; uint32_t next_reassembly_suffix_; - std::unique_ptr<VerificationManager> verification_manager_; + std::shared_ptr<auth::Verifier> verifier_; }; -} // end namespace protocol - -} // end namespace transport +} // namespace protocol +} // namespace transport diff --git a/libtransport/src/protocols/indexer.cc b/libtransport/src/protocols/indexer.cc index ca12330a6..1379a609c 100644 --- a/libtransport/src/protocols/indexer.cc +++ b/libtransport/src/protocols/indexer.cc @@ -14,11 +14,9 @@ */ #include <hicn/transport/utils/branch_prediction.h> - #include <protocols/incremental_indexer.h> #include <protocols/indexer.h> #include <protocols/manifest_incremental_indexer.h> -#include <protocols/protocol.h> namespace transport { namespace protocol { @@ -32,16 +30,16 @@ IndexManager::IndexManager(implementation::ConsumerSocket *icn_socket, transport_(transport), reassembly_(reassembly) {} -void IndexManager::onContentObject(core::Interest::Ptr &&interest, - core::ContentObject::Ptr &&content_object) { +void IndexManager::onContentObject(core::Interest &interest, + core::ContentObject &content_object) { if (first_segment_received_) { - indexer_->onContentObject(std::move(interest), std::move(content_object)); + indexer_->onContentObject(interest, content_object); } else { - std::uint32_t segment_number = interest->getName().getSuffix(); + std::uint32_t segment_number = interest.getName().getSuffix(); if (segment_number == 0) { // Check if manifest - if (content_object->getPayloadType() == PayloadType::MANIFEST) { + if (content_object.getPayloadType() == core::PayloadType::MANIFEST) { IncrementalIndexer *indexer = static_cast<IncrementalIndexer *>(indexer_.release()); indexer_ = @@ -49,25 +47,21 @@ void IndexManager::onContentObject(core::Interest::Ptr &&interest, delete indexer; } - indexer_->onContentObject(std::move(interest), std::move(content_object)); + indexer_->onContentObject(interest, content_object); auto it = interest_data_set_.begin(); while (it != interest_data_set_.end()) { - indexer_->onContentObject( - std::move(const_cast<core::Interest::Ptr &&>(it->first)), - std::move(const_cast<core::ContentObject::Ptr &&>(it->second))); + indexer_->onContentObject(*it->first, *it->second); it = interest_data_set_.erase(it); } first_segment_received_ = true; } else { - interest_data_set_.emplace(std::move(interest), - std::move(content_object)); + interest_data_set_.emplace(interest.shared_from_this(), + content_object.shared_from_this()); } } } -bool IndexManager::onKeyToVerify() { return indexer_->onKeyToVerify(); } - void IndexManager::reset(std::uint32_t offset) { indexer_ = std::make_unique<IncrementalIndexer>(icn_socket_, transport_, reassembly_); diff --git a/libtransport/src/protocols/indexer.h b/libtransport/src/protocols/indexer.h index 8213a1503..49e22a4cf 100644 --- a/libtransport/src/protocols/indexer.h +++ b/libtransport/src/protocols/indexer.h @@ -33,10 +33,8 @@ class TransportProtocol; class Indexer { public: - /** - * - */ virtual ~Indexer() = default; + /** * Retrieve from the manifest the next suffix to retrieve. */ @@ -55,10 +53,8 @@ class Indexer { virtual void reset(std::uint32_t offset = 0) = 0; - virtual void onContentObject(core::Interest::Ptr &&interest, - core::ContentObject::Ptr &&content_object) = 0; - - virtual bool onKeyToVerify() = 0; + virtual void onContentObject(core::Interest &interest, + core::ContentObject &content_object) = 0; }; class IndexManager : Indexer { @@ -86,10 +82,8 @@ class IndexManager : Indexer { void reset(std::uint32_t offset = 0) override; - void onContentObject(core::Interest::Ptr &&interest, - core::ContentObject::Ptr &&content_object) override; - - bool onKeyToVerify() override; + void onContentObject(core::Interest &interest, + core::ContentObject &content_object) override; private: std::unique_ptr<Indexer> indexer_; diff --git a/libtransport/src/protocols/manifest_incremental_indexer.cc b/libtransport/src/protocols/manifest_incremental_indexer.cc index da835b577..a6312ca90 100644 --- a/libtransport/src/protocols/manifest_incremental_indexer.cc +++ b/libtransport/src/protocols/manifest_incremental_indexer.cc @@ -14,9 +14,9 @@ */ #include <implementation/socket_consumer.h> - +#include <protocols/errors.h> #include <protocols/manifest_incremental_indexer.h> -#include <protocols/protocol.h> +#include <protocols/transport_protocol.h> #include <cmath> #include <deque> @@ -36,41 +36,46 @@ ManifestIncrementalIndexer::ManifestIncrementalIndexer( 0)) {} void ManifestIncrementalIndexer::onContentObject( - core::Interest::Ptr &&interest, core::ContentObject::Ptr &&content_object) { - // Check if manifest or not - if (content_object->getPayloadType() == PayloadType::MANIFEST) { - TRANSPORT_LOGD("Receive content %s", content_object->getName().toString().c_str()); - onUntrustedManifest(std::move(interest), std::move(content_object)); - } else if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) { - TRANSPORT_LOGD("Receive manifest %s", content_object->getName().toString().c_str()); - onUntrustedContentObject(std::move(interest), std::move(content_object)); - } -} - -void ManifestIncrementalIndexer::onUntrustedManifest( - core::Interest::Ptr &&interest, core::ContentObject::Ptr &&content_object) { - auto ret = verification_manager_->onPacketToVerify(*content_object); - - switch (ret) { - case VerificationPolicy::ACCEPT_PACKET: { - processTrustedManifest(std::move(content_object)); + core::Interest &interest, core::ContentObject &content_object) { + switch (content_object.getPayloadType()) { + case PayloadType::DATA: { + TRANSPORT_LOGD("Received content %s", + content_object.getName().toString().c_str()); + onUntrustedContentObject(interest, content_object); break; } - case VerificationPolicy::DROP_PACKET: - case VerificationPolicy::ABORT_SESSION: { - transport_protocol_->onContentReassembled( - make_error_code(protocol_error::session_aborted)); + case PayloadType::MANIFEST: { + TRANSPORT_LOGD("Received manifest %s", + content_object.getName().toString().c_str()); + onUntrustedManifest(interest, content_object); break; } + default: { + return; + } } } -void ManifestIncrementalIndexer::processTrustedManifest( - ContentObject::Ptr &&content_object) { +void ManifestIncrementalIndexer::onUntrustedManifest( + core::Interest &interest, core::ContentObject &content_object) { auto manifest = - std::make_unique<ContentObjectManifest>(std::move(*content_object)); + std::make_unique<ContentObjectManifest>(std::move(content_object)); + + auth::VerificationPolicy policy = verifier_->verifyPackets(manifest.get()); + manifest->decode(); + if (policy != auth::VerificationPolicy::ACCEPT) { + transport_protocol_->onContentReassembled( + make_error_code(protocol_error::session_aborted)); + return; + } + + processTrustedManifest(interest, std::move(manifest)); +} + +void ManifestIncrementalIndexer::processTrustedManifest( + core::Interest &interest, std::unique_ptr<ContentObjectManifest> manifest) { if (TRANSPORT_EXPECT_FALSE(manifest->getVersion() != core::ManifestVersion::VERSION_1)) { throw errors::RuntimeException("Received manifest with unknown version."); @@ -78,23 +83,45 @@ void ManifestIncrementalIndexer::processTrustedManifest( switch (manifest->getManifestType()) { case core::ManifestType::INLINE_MANIFEST: { - auto _it = manifest->getSuffixList().begin(); - auto _end = manifest->getSuffixList().end(); - suffix_strategy_->setFinalSuffix(manifest->getFinalBlockNumber()); - for (; _it != _end; _it++) { - auto hash = - std::make_pair(std::vector<uint8_t>(_it->second, _it->second + 32), - manifest->getHashAlgorithm()); + // The packets to verify with the received manifest + std::vector<auth::PacketPtr> packets; + + // Convert the received manifest to a map of packet suffixes to hashes + std::unordered_map<auth::Suffix, auth::HashEntry> current_manifest = + core::ContentObjectManifest::getSuffixMap(manifest.get()); + + // Update 'suffix_map_' with new hashes from the received manifest and + // build 'packets' + for (auto it = current_manifest.begin(); it != current_manifest.end();) { + if (unverified_segments_.find(it->first) == + unverified_segments_.end()) { + suffix_map_[it->first] = std::move(it->second); + current_manifest.erase(it++); + continue; + } - if (!checkUnverifiedSegments(_it->first, hash)) { - suffix_hash_map_[_it->first] = std::move(hash); + packets.push_back(unverified_segments_[it->first].second.get()); + it++; + } + + // Verify unverified segments using the received manifest + std::vector<auth::VerificationPolicy> policies = + verifier_->verifyPackets(packets, current_manifest); + + for (unsigned int i = 0; i < packets.size(); ++i) { + auth::Suffix suffix = packets[i]->getName().getSuffix(); + + if (policies[i] != auth::VerificationPolicy::UNKNOWN) { + unverified_segments_.erase(suffix); } + + applyPolicy(*unverified_segments_[suffix].first, + *unverified_segments_[suffix].second, policies[i]); } reassembly_->reassemble(std::move(manifest)); - break; } case core::ManifestType::FLIC_MANIFEST: { @@ -106,89 +133,47 @@ void ManifestIncrementalIndexer::processTrustedManifest( } } -bool ManifestIncrementalIndexer::checkUnverifiedSegments( - std::uint32_t suffix, const HashEntry &hash) { - auto it = unverified_segments_.find(suffix); - - if (it != unverified_segments_.end()) { - auto ret = verifyContentObject(hash, *it->second.second); - - switch (ret) { - case VerificationPolicy::ACCEPT_PACKET: { - reassembly_->reassemble(std::move(it->second.second)); - break; - } - case VerificationPolicy::DROP_PACKET: { - transport_protocol_->onPacketDropped(std::move(it->second.first), - std::move(it->second.second)); - break; - } - case VerificationPolicy::ABORT_SESSION: { - transport_protocol_->onContentReassembled( - make_error_code(protocol_error::session_aborted)); - break; - } +void ManifestIncrementalIndexer::onUntrustedContentObject( + Interest &interest, ContentObject &content_object) { + auth::Suffix suffix = content_object.getName().getSuffix(); + auth::VerificationPolicy policy = + verifier_->verifyPackets(&content_object, suffix_map_); + + switch (policy) { + case auth::VerificationPolicy::UNKNOWN: { + unverified_segments_[suffix] = std::make_pair( + interest.shared_from_this(), content_object.shared_from_this()); + break; + } + default: { + suffix_map_.erase(suffix); + break; } - - unverified_segments_.erase(it); - return true; - } - - return false; -} - -VerificationPolicy ManifestIncrementalIndexer::verifyContentObject( - const HashEntry &manifest_hash, const ContentObject &content_object) { - VerificationPolicy ret; - - auto hash_type = static_cast<utils::CryptoHashType>(manifest_hash.second); - auto data_packet_digest = content_object.computeDigest(manifest_hash.second); - auto data_packet_digest_bytes = - data_packet_digest.getDigest<uint8_t>().data(); - const std::vector<uint8_t> &manifest_digest_bytes = manifest_hash.first; - - if (utils::CryptoHash::compareBinaryDigest( - data_packet_digest_bytes, manifest_digest_bytes.data(), hash_type)) { - ret = VerificationPolicy::ACCEPT_PACKET; - } else { - ConsumerContentObjectVerificationFailedCallback - *verification_failed_callback = VOID_HANDLER; - socket_->getSocketOption(ConsumerCallbacksOptions::VERIFICATION_FAILED, - &verification_failed_callback); - ret = (*verification_failed_callback)( - *socket_->getInterface(), content_object, - make_error_code(protocol_error::integrity_verification_failed)); } - return ret; + applyPolicy(interest, content_object, policy); } -void ManifestIncrementalIndexer::onUntrustedContentObject( - Interest::Ptr &&i, ContentObject::Ptr &&c) { - auto suffix = c->getName().getSuffix(); - auto it = suffix_hash_map_.find(suffix); - - if (it != suffix_hash_map_.end()) { - auto ret = verifyContentObject(it->second, *c); - - switch (ret) { - case VerificationPolicy::ACCEPT_PACKET: { - suffix_hash_map_.erase(it); - reassembly_->reassemble(std::move(c)); - break; - } - case VerificationPolicy::DROP_PACKET: { - transport_protocol_->onPacketDropped(std::move(i), std::move(c)); - break; - } - case VerificationPolicy::ABORT_SESSION: { - transport_protocol_->onContentReassembled( - make_error_code(protocol_error::session_aborted)); - break; - } +void ManifestIncrementalIndexer::applyPolicy( + core::Interest &interest, core::ContentObject &content_object, + auth::VerificationPolicy policy) { + switch (policy) { + case auth::VerificationPolicy::ACCEPT: { + reassembly_->reassemble(content_object); + break; + } + case auth::VerificationPolicy::DROP: { + transport_protocol_->onPacketDropped(interest, content_object); + break; + } + case auth::VerificationPolicy::ABORT: { + transport_protocol_->onContentReassembled( + make_error_code(protocol_error::session_aborted)); + break; + } + default: { + break; } - } else { - unverified_segments_[suffix] = std::make_pair(std::move(i), std::move(c)); } } @@ -224,7 +209,7 @@ uint32_t ManifestIncrementalIndexer::getNextReassemblySegment() { void ManifestIncrementalIndexer::reset(std::uint32_t offset) { IncrementalIndexer::reset(offset); - suffix_hash_map_.clear(); + suffix_map_.clear(); unverified_segments_.clear(); SuffixQueue empty; std::swap(suffix_queue_, empty); diff --git a/libtransport/src/protocols/manifest_incremental_indexer.h b/libtransport/src/protocols/manifest_incremental_indexer.h index 38b01533e..1bb76eb87 100644 --- a/libtransport/src/protocols/manifest_incremental_indexer.h +++ b/libtransport/src/protocols/manifest_incremental_indexer.h @@ -15,6 +15,7 @@ #pragma once +#include <hicn/transport/auth/common.h> #include <implementation/socket.h> #include <protocols/incremental_indexer.h> #include <utils/suffix_strategy.h> @@ -22,7 +23,6 @@ #include <list> namespace transport { - namespace protocol { class ManifestIncrementalIndexer : public IncrementalIndexer { @@ -30,7 +30,8 @@ class ManifestIncrementalIndexer : public IncrementalIndexer { public: using SuffixQueue = std::queue<uint32_t>; - using HashEntry = std::pair<std::vector<uint8_t>, utils::CryptoHashType>; + using InterestContentPair = + std::pair<core::Interest::Ptr, core::ContentObject::Ptr>; ManifestIncrementalIndexer(implementation::ConsumerSocket *icn_socket, TransportProtocol *transport, @@ -50,8 +51,8 @@ class ManifestIncrementalIndexer : public IncrementalIndexer { void reset(std::uint32_t offset = 0) override; - void onContentObject(core::Interest::Ptr &&interest, - core::ContentObject::Ptr &&content_object) override; + void onContentObject(core::Interest &interest, + core::ContentObject &content_object) override; uint32_t getNextSuffix() override; @@ -61,30 +62,24 @@ class ManifestIncrementalIndexer : public IncrementalIndexer { uint32_t getFinalSuffix() override; - private: - void onUntrustedManifest(core::Interest::Ptr &&interest, - core::ContentObject::Ptr &&content_object); - void onUntrustedContentObject(core::Interest::Ptr &&interest, - core::ContentObject::Ptr &&content_object); - void processTrustedManifest(core::ContentObject::Ptr &&content_object); - void onManifestReceived(core::Interest::Ptr &&i, - core::ContentObject::Ptr &&c); - void onManifestTimeout(core::Interest::Ptr &&i); - VerificationPolicy verifyContentObject( - const HashEntry &manifest_hash, - const core::ContentObject &content_object); - bool checkUnverifiedSegments(std::uint32_t suffix, const HashEntry &hash); - protected: std::unique_ptr<utils::SuffixStrategy> suffix_strategy_; SuffixQueue suffix_queue_; // Hash verification - std::unordered_map<uint32_t, HashEntry> suffix_hash_map_; + std::unordered_map<auth::Suffix, auth::HashEntry> suffix_map_; + std::unordered_map<auth::Suffix, InterestContentPair> unverified_segments_; - std::unordered_map<uint32_t, - std::pair<core::Interest::Ptr, core::ContentObject::Ptr>> - unverified_segments_; + private: + void onUntrustedManifest(core::Interest &interest, + core::ContentObject &content_object); + void processTrustedManifest(core::Interest &interest, + std::unique_ptr<ContentObjectManifest> manifest); + void onUntrustedContentObject(core::Interest &interest, + core::ContentObject &content_object); + void applyPolicy(core::Interest &interest, + core::ContentObject &content_object, + auth::VerificationPolicy policy); }; } // end namespace protocol diff --git a/libtransport/src/protocols/prod_protocol_bytestream.cc b/libtransport/src/protocols/prod_protocol_bytestream.cc new file mode 100644 index 000000000..6bd989fe4 --- /dev/null +++ b/libtransport/src/protocols/prod_protocol_bytestream.cc @@ -0,0 +1,390 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <implementation/socket_producer.h> +#include <protocols/prod_protocol_bytestream.h> + +#include <atomic> + +namespace transport { + +namespace protocol { + +using namespace core; +using namespace implementation; + +ByteStreamProductionProtocol::ByteStreamProductionProtocol( + implementation::ProducerSocket *icn_socket) + : ProductionProtocol(icn_socket) {} + +ByteStreamProductionProtocol::~ByteStreamProductionProtocol() { + stop(); + if (listening_thread_.joinable()) { + listening_thread_.join(); + } +} + +uint32_t ByteStreamProductionProtocol::produceDatagram( + const Name &content_name, std::unique_ptr<utils::MemBuf> &&buffer) { + throw errors::NotImplementedException(); +} + +uint32_t ByteStreamProductionProtocol::produceDatagram(const Name &content_name, + const uint8_t *buffer, + size_t buffer_size) { + throw errors::NotImplementedException(); +} + +uint32_t ByteStreamProductionProtocol::produceStream(const Name &content_name, + const uint8_t *buffer, + size_t buffer_size, + bool is_last, + uint32_t start_offset) { + if (!buffer_size) { + return 0; + } + + return produceStream(content_name, + utils::MemBuf::copyBuffer(buffer, buffer_size), is_last, + start_offset); +} + +uint32_t ByteStreamProductionProtocol::produceStream( + const Name &content_name, std::unique_ptr<utils::MemBuf> &&buffer, + bool is_last, uint32_t start_offset) { + if (TRANSPORT_EXPECT_FALSE(buffer->length() == 0)) { + return 0; + } + + Name name(content_name); + + // Get the atomic variables to ensure they keep the same value + // during the production + + // Total size of the data packet + uint32_t data_packet_size; + socket_->getSocketOption(GeneralTransportOptions::DATA_PACKET_SIZE, + data_packet_size); + + // Expiry time + uint32_t content_object_expiry_time; + socket_->getSocketOption(GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME, + content_object_expiry_time); + + // Hash algorithm + auth::CryptoHashType hash_algo; + socket_->getSocketOption(GeneralTransportOptions::HASH_ALGORITHM, hash_algo); + + // Use manifest + bool making_manifest; + socket_->getSocketOption(GeneralTransportOptions::MAKE_MANIFEST, + making_manifest); + + // Suffix calculation strategy + core::NextSegmentCalculationStrategy _suffix_strategy; + socket_->getSocketOption(GeneralTransportOptions::SUFFIX_STRATEGY, + _suffix_strategy); + auto suffix_strategy = utils::SuffixStrategyFactory::getSuffixStrategy( + _suffix_strategy, start_offset); + + std::shared_ptr<auth::Signer> signer; + socket_->getSocketOption(GeneralTransportOptions::SIGNER, signer); + + auto buffer_size = buffer->length(); + int bytes_segmented = 0; + std::size_t header_size; + std::size_t manifest_header_size = 0; + std::size_t signature_length = 0; + std::uint32_t final_block_number = start_offset; + uint64_t free_space_for_content = 0; + + core::Packet::Format format; + std::shared_ptr<core::ContentObjectManifest> manifest; + bool is_last_manifest = false; + + // TODO Manifest may still be used for indexing + if (making_manifest && !signer) { + TRANSPORT_LOGE("Making manifests without setting producer identity."); + } + + core::Packet::Format hf_format = core::Packet::Format::HF_UNSPEC; + core::Packet::Format hf_format_ah = core::Packet::Format::HF_UNSPEC; + + if (name.getType() == HNT_CONTIGUOUS_V4 || name.getType() == HNT_IOV_V4) { + hf_format = core::Packet::Format::HF_INET_TCP; + hf_format_ah = core::Packet::Format::HF_INET_TCP_AH; + } else if (name.getType() == HNT_CONTIGUOUS_V6 || + name.getType() == HNT_IOV_V6) { + hf_format = core::Packet::Format::HF_INET6_TCP; + hf_format_ah = core::Packet::Format::HF_INET6_TCP_AH; + } else { + throw errors::RuntimeException("Unknown name format."); + } + + format = hf_format; + if (making_manifest) { + manifest_header_size = core::Packet::getHeaderSizeFromFormat( + signer ? hf_format_ah : hf_format, + signer ? signer->getSignatureSize() : 0); + } else if (signer) { + format = hf_format_ah; + signature_length = signer->getSignatureSize(); + } + + header_size = core::Packet::getHeaderSizeFromFormat(format, signature_length); + free_space_for_content = data_packet_size - header_size; + uint32_t number_of_segments = + uint32_t(std::ceil(double(buffer_size) / double(free_space_for_content))); + if (free_space_for_content * number_of_segments < buffer_size) { + number_of_segments++; + } + + // TODO allocate space for all the headers + if (making_manifest) { + uint32_t segment_in_manifest = static_cast<uint32_t>( + std::floor(double(data_packet_size - manifest_header_size - + ContentObjectManifest::getManifestHeaderSize()) / + ContentObjectManifest::getManifestEntrySize()) - + 1.0); + uint32_t number_of_manifests = static_cast<uint32_t>( + std::ceil(float(number_of_segments) / segment_in_manifest)); + final_block_number += number_of_segments + number_of_manifests - 1; + + manifest.reset(ContentObjectManifest::createManifest( + name.setSuffix(suffix_strategy->getNextManifestSuffix()), + core::ManifestVersion::VERSION_1, core::ManifestType::INLINE_MANIFEST, + hash_algo, is_last_manifest, name, _suffix_strategy, + signer ? signer->getSignatureSize() : 0)); + manifest->setLifetime(content_object_expiry_time); + + if (is_last) { + manifest->setFinalBlockNumber(final_block_number); + } else { + manifest->setFinalBlockNumber(utils::SuffixStrategy::INVALID_SUFFIX); + } + } + + for (unsigned int packaged_segments = 0; + packaged_segments < number_of_segments; packaged_segments++) { + if (making_manifest) { + if (manifest->estimateManifestSize(2) > + data_packet_size - manifest_header_size) { + manifest->encode(); + + // If identity set, sign manifest + if (signer) { + signer->signPacket(manifest.get()); + } + + // Send the current manifest + passContentObjectToCallbacks(manifest); + + TRANSPORT_LOGD("Send manifest %s", + manifest->getName().toString().c_str()); + + // Send content objects stored in the queue + while (!content_queue_.empty()) { + passContentObjectToCallbacks(content_queue_.front()); + TRANSPORT_LOGD("Send content %s", + content_queue_.front()->getName().toString().c_str()); + content_queue_.pop(); + } + + // Create new manifest. The reference to the last manifest has been + // acquired in the passContentObjectToCallbacks function, so we can + // safely release this reference + manifest.reset(ContentObjectManifest::createManifest( + name.setSuffix(suffix_strategy->getNextManifestSuffix()), + core::ManifestVersion::VERSION_1, + core::ManifestType::INLINE_MANIFEST, hash_algo, is_last_manifest, + name, _suffix_strategy, signer ? signer->getSignatureSize() : 0)); + + manifest->setLifetime(content_object_expiry_time); + manifest->setFinalBlockNumber( + is_last ? final_block_number + : utils::SuffixStrategy::INVALID_SUFFIX); + } + } + + auto content_suffix = suffix_strategy->getNextContentSuffix(); + auto content_object = std::make_shared<ContentObject>( + name.setSuffix(content_suffix), format, + signer && !making_manifest ? signer->getSignatureSize() : 0); + content_object->setLifetime(content_object_expiry_time); + + auto b = buffer->cloneOne(); + b->trimStart(free_space_for_content * packaged_segments); + b->trimEnd(b->length()); + + if (TRANSPORT_EXPECT_FALSE(packaged_segments == number_of_segments - 1)) { + b->append(buffer_size - bytes_segmented); + bytes_segmented += (int)(buffer_size - bytes_segmented); + + if (is_last && making_manifest) { + is_last_manifest = true; + } else if (is_last) { + content_object->setRst(); + } + + } else { + b->append(free_space_for_content); + bytes_segmented += (int)(free_space_for_content); + } + + content_object->appendPayload(std::move(b)); + + if (making_manifest) { + using namespace std::chrono_literals; + auth::CryptoHash hash = content_object->computeDigest(hash_algo); + manifest->addSuffixHash(content_suffix, hash); + content_queue_.push(content_object); + } else { + if (signer) { + signer->signPacket(content_object.get()); + } + passContentObjectToCallbacks(content_object); + TRANSPORT_LOGD("Send content %s", + content_object->getName().toString().c_str()); + } + } + + if (making_manifest) { + if (is_last_manifest) { + manifest->setFinalManifest(is_last_manifest); + } + + manifest->encode(); + + if (signer) { + signer->signPacket(manifest.get()); + } + + passContentObjectToCallbacks(manifest); + TRANSPORT_LOGD("Send manifest %s", manifest->getName().toString().c_str()); + + while (!content_queue_.empty()) { + passContentObjectToCallbacks(content_queue_.front()); + TRANSPORT_LOGD("Send content %s", + content_queue_.front()->getName().toString().c_str()); + content_queue_.pop(); + } + } + + portal_->getIoService().post([this]() { + std::shared_ptr<ContentObject> co; + while (object_queue_for_callbacks_.pop(co)) { + if (*on_new_segment_) { + on_new_segment_->operator()(*socket_->getInterface(), *co); + } + + if (*on_content_object_to_sign_) { + on_content_object_to_sign_->operator()(*socket_->getInterface(), *co); + } + + if (*on_content_object_in_output_buffer_) { + on_content_object_in_output_buffer_->operator()( + *socket_->getInterface(), *co); + } + + if (*on_content_object_output_) { + on_content_object_output_->operator()(*socket_->getInterface(), *co); + } + } + }); + + portal_->getIoService().dispatch([this, buffer_size]() { + if (*on_content_produced_) { + on_content_produced_->operator()(*socket_->getInterface(), + std::make_error_code(std::errc(0)), + buffer_size); + } + }); + + return suffix_strategy->getTotalCount(); +} + +void ByteStreamProductionProtocol::scheduleSendBurst() { + portal_->getIoService().post([this]() { + std::shared_ptr<ContentObject> co; + + for (uint32_t i = 0; i < burst_size; i++) { + if (object_queue_for_callbacks_.pop(co)) { + if (*on_new_segment_) { + on_new_segment_->operator()(*socket_->getInterface(), *co); + } + + if (*on_content_object_to_sign_) { + on_content_object_to_sign_->operator()(*socket_->getInterface(), *co); + } + + if (*on_content_object_in_output_buffer_) { + on_content_object_in_output_buffer_->operator()( + *socket_->getInterface(), *co); + } + + if (*on_content_object_output_) { + on_content_object_output_->operator()(*socket_->getInterface(), *co); + } + } else { + break; + } + } + }); +} + +void ByteStreamProductionProtocol::passContentObjectToCallbacks( + const std::shared_ptr<ContentObject> &content_object) { + output_buffer_.insert(content_object); + portal_->sendContentObject(*content_object); + object_queue_for_callbacks_.push(std::move(content_object)); + + if (object_queue_for_callbacks_.size() >= burst_size) { + scheduleSendBurst(); + } +} + +void ByteStreamProductionProtocol::onInterest(Interest &interest) { + TRANSPORT_LOGD("Received interest for %s", + interest.getName().toString().c_str()); + if (*on_interest_input_) { + on_interest_input_->operator()(*socket_->getInterface(), interest); + } + + const std::shared_ptr<ContentObject> content_object = + output_buffer_.find(interest); + + if (content_object) { + if (*on_interest_satisfied_output_buffer_) { + on_interest_satisfied_output_buffer_->operator()(*socket_->getInterface(), + interest); + } + + if (*on_content_object_output_) { + on_content_object_output_->operator()(*socket_->getInterface(), + *content_object); + } + + portal_->sendContentObject(*content_object); + } else { + if (*on_interest_process_) { + on_interest_process_->operator()(*socket_->getInterface(), interest); + } + } +} + +void ByteStreamProductionProtocol::onError(std::error_code ec) {} + +} // namespace protocol +} // end namespace transport diff --git a/libtransport/src/protocols/prod_protocol_bytestream.h b/libtransport/src/protocols/prod_protocol_bytestream.h new file mode 100644 index 000000000..cf36b90a5 --- /dev/null +++ b/libtransport/src/protocols/prod_protocol_bytestream.h @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/utils/ring_buffer.h> +#include <protocols/production_protocol.h> + +#include <atomic> +#include <queue> + +namespace transport { + +namespace protocol { + +using namespace core; + +class ByteStreamProductionProtocol : public ProductionProtocol { + static constexpr uint32_t burst_size = 256; + + public: + ByteStreamProductionProtocol(implementation::ProducerSocket *icn_socket); + + ~ByteStreamProductionProtocol() override; + + using ProductionProtocol::start; + using ProductionProtocol::stop; + + uint32_t produceStream(const Name &content_name, + std::unique_ptr<utils::MemBuf> &&buffer, + bool is_last = true, + uint32_t start_offset = 0) override; + uint32_t produceStream(const Name &content_name, const uint8_t *buffer, + size_t buffer_size, bool is_last = true, + uint32_t start_offset = 0) override; + uint32_t produceDatagram(const Name &content_name, + std::unique_ptr<utils::MemBuf> &&buffer) override; + uint32_t produceDatagram(const Name &content_name, const uint8_t *buffer, + size_t buffer_size) override; + + protected: + // Consumer Callback + // void reset() override; + void onInterest(core::Interest &i) override; + void onError(std::error_code ec) override; + + private: + void passContentObjectToCallbacks( + const std::shared_ptr<ContentObject> &content_object); + void scheduleSendBurst(); + + private: + // While manifests are being built, contents are stored in a queue + std::queue<std::shared_ptr<ContentObject>> content_queue_; + utils::CircularFifo<std::shared_ptr<ContentObject>, 2048> + object_queue_for_callbacks_; +}; + +} // end namespace protocol +} // end namespace transport diff --git a/libtransport/src/protocols/prod_protocol_rtc.cc b/libtransport/src/protocols/prod_protocol_rtc.cc new file mode 100644 index 000000000..8081923e3 --- /dev/null +++ b/libtransport/src/protocols/prod_protocol_rtc.cc @@ -0,0 +1,481 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/core/global_object_pool.h> +#include <implementation/socket_producer.h> +#include <protocols/prod_protocol_rtc.h> +#include <protocols/rtc/rtc_consts.h> +#include <stdlib.h> +#include <time.h> + +#include <unordered_set> + +namespace transport { +namespace protocol { + +RTCProductionProtocol::RTCProductionProtocol( + implementation::ProducerSocket *icn_socket) + : ProductionProtocol(icn_socket), + current_seg_(1), + produced_bytes_(0), + produced_packets_(0), + max_packet_production_(1), + bytes_production_rate_(0), + packets_production_rate_(0), + last_round_(std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count()), + allow_delayed_nacks_(false), + queue_timer_on_(false), + consumer_in_sync_(false), + on_consumer_in_sync_(nullptr) { + srand((unsigned int)time(NULL)); + prod_label_ = rand() % 256; + interests_queue_timer_ = + std::make_unique<asio::steady_timer>(portal_->getIoService()); + round_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService()); + setOutputBufferSize(10000); + scheduleRoundTimer(); +} + +RTCProductionProtocol::~RTCProductionProtocol() {} + +void RTCProductionProtocol::registerNamespaceWithNetwork( + const Prefix &producer_namespace) { + ProductionProtocol::registerNamespaceWithNetwork(producer_namespace); + + flow_name_ = producer_namespace.getName(); + auto family = flow_name_.getAddressFamily(); + + switch (family) { + case AF_INET6: + header_size_ = (uint32_t)Packet::getHeaderSizeFromFormat(HF_INET6_TCP); + break; + case AF_INET: + header_size_ = (uint32_t)Packet::getHeaderSizeFromFormat(HF_INET_TCP); + break; + default: + throw errors::RuntimeException("Unknown name format."); + } +} + +void RTCProductionProtocol::scheduleRoundTimer() { + round_timer_->expires_from_now( + std::chrono::milliseconds(rtc::PRODUCER_STATS_INTERVAL)); + round_timer_->async_wait([this](std::error_code ec) { + if (ec) return; + updateStats(); + }); +} + +void RTCProductionProtocol::updateStats() { + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + uint64_t duration = now - last_round_; + if (duration == 0) duration = 1; + double per_second = rtc::MILLI_IN_A_SEC / duration; + + uint32_t prev_packets_production_rate = packets_production_rate_; + + bytes_production_rate_ = ceil((double)produced_bytes_ * per_second); + packets_production_rate_ = ceil((double)produced_packets_ * per_second); + + TRANSPORT_LOGD("Updating production rate: produced_bytes_ = %u bps = %u", + produced_bytes_, bytes_production_rate_); + + // update the production rate as soon as it increases by 10% with respect to + // the last round + max_packet_production_ = + produced_packets_ + ceil((double)produced_packets_ * 0.1); + if (max_packet_production_ < rtc::WIN_MIN) + max_packet_production_ = rtc::WIN_MIN; + + if (packets_production_rate_ != 0) { + allow_delayed_nacks_ = false; + } else if (prev_packets_production_rate == 0) { + // at least 2 rounds with production rate = 0 + allow_delayed_nacks_ = true; + } + + // check if the production rate is decreased. if yes send nacks if needed + if (prev_packets_production_rate < packets_production_rate_) { + sendNacksForPendingInterests(); + } + + produced_bytes_ = 0; + produced_packets_ = 0; + last_round_ = now; + scheduleRoundTimer(); +} + +uint32_t RTCProductionProtocol::produceStream( + const Name &content_name, std::unique_ptr<utils::MemBuf> &&buffer, + bool is_last, uint32_t start_offset) { + throw errors::NotImplementedException(); +} + +uint32_t RTCProductionProtocol::produceStream(const Name &content_name, + const uint8_t *buffer, + size_t buffer_size, bool is_last, + uint32_t start_offset) { + throw errors::NotImplementedException(); +} + +void RTCProductionProtocol::produce(ContentObject &content_object) { + throw errors::NotImplementedException(); +} + +uint32_t RTCProductionProtocol::produceDatagram( + const Name &content_name, std::unique_ptr<utils::MemBuf> &&buffer) { + std::size_t buffer_size = buffer->length(); + if (TRANSPORT_EXPECT_FALSE(buffer_size == 0)) return 0; + + uint32_t data_packet_size; + socket_->getSocketOption(interface::GeneralTransportOptions::DATA_PACKET_SIZE, + data_packet_size); + + if (TRANSPORT_EXPECT_FALSE((buffer_size + header_size_ + + rtc::DATA_HEADER_SIZE) > data_packet_size)) { + return 0; + } + + auto content_object = + core::PacketManager<>::getInstance().getPacket<ContentObject>(); + // add rtc header to the payload + struct rtc::data_packet_t header; + content_object->appendPayload((const uint8_t *)&header, + rtc::DATA_HEADER_SIZE); + content_object->appendPayload(buffer->data(), buffer->length()); + + std::shared_ptr<ContentObject> co = std::move(content_object); + + // schedule actual sending on internal thread + portal_->getIoService().dispatch( + [this, content_object{std::move(co)}, content_name]() mutable { + produceInternal(std::move(content_object), content_name); + }); + + return 1; +} + +void RTCProductionProtocol::produceInternal( + std::shared_ptr<ContentObject> &&content_object, const Name &content_name) { + // set rtc header + struct rtc::data_packet_t *data_pkt = + (struct rtc::data_packet_t *)content_object->getPayload()->data(); + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + data_pkt->setTimestamp(now); + data_pkt->setProductionRate(bytes_production_rate_); + + // set hicn stuff + Name n(content_name); + content_object->setName(n.setSuffix(current_seg_)); + content_object->setLifetime(500); // XXX this should be set by the APP + content_object->setPathLabel(prod_label_); + + // update stats + produced_bytes_ += + content_object->headerSize() + content_object->payloadSize(); + produced_packets_++; + + if (produced_packets_ >= max_packet_production_) { + // in this case all the pending interests may be used to accomodate the + // sudden increase in the production rate. calling the updateStats we will + // notify all the clients + round_timer_->cancel(); + updateStats(); + } + + TRANSPORT_LOGD("Sending content object: %s", n.toString().c_str()); + + output_buffer_.insert(content_object); + + if (*on_content_object_in_output_buffer_) { + on_content_object_in_output_buffer_->operator()(*socket_->getInterface(), + *content_object); + } + + portal_->sendContentObject(*content_object); + + if (*on_content_object_output_) { + on_content_object_output_->operator()(*socket_->getInterface(), + *content_object); + } + + // remove interests from the interest cache if it exists + removeFromInterestQueue(current_seg_); + + current_seg_ = (current_seg_ + 1) % rtc::MIN_PROBE_SEQ; +} + +void RTCProductionProtocol::onInterest(Interest &interest) { + uint32_t interest_seg = interest.getName().getSuffix(); + uint32_t lifetime = interest.getLifetime(); + + if (interest_seg == 0) { + // first packet from the consumer, reset sync state + consumer_in_sync_ = false; + } + + if (*on_interest_input_) { + on_interest_input_->operator()(*socket_->getInterface(), interest); + } + + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + + if (interest_seg > rtc::MIN_PROBE_SEQ) { + TRANSPORT_LOGD("received probe %u", interest_seg); + sendNack(interest_seg); + return; + } + + TRANSPORT_LOGD("received interest %u", interest_seg); + + const std::shared_ptr<ContentObject> content_object = + output_buffer_.find(interest); + + if (content_object) { + if (*on_interest_satisfied_output_buffer_) { + on_interest_satisfied_output_buffer_->operator()(*socket_->getInterface(), + interest); + } + + if (*on_content_object_output_) { + on_content_object_output_->operator()(*socket_->getInterface(), + *content_object); + } + + TRANSPORT_LOGD("Send content %u (onInterest)", + content_object->getName().getSuffix()); + portal_->sendContentObject(*content_object); + return; + } else { + if (*on_interest_process_) { + on_interest_process_->operator()(*socket_->getInterface(), interest); + } + } + + // if the production rate 0 use delayed nacks + if (allow_delayed_nacks_ && interest_seg >= current_seg_) { + uint64_t next_timer = ~0; + if (!timers_map_.empty()) { + next_timer = timers_map_.begin()->first; + } + + uint64_t expiration = now + rtc::SENTINEL_TIMER_INTERVAL; + addToInterestQueue(interest_seg, expiration); + + // here we have at least one interest in the queue, we need to start or + // update the timer + if (!queue_timer_on_) { + // set timeout + queue_timer_on_ = true; + scheduleQueueTimer(timers_map_.begin()->first - now); + } else { + // re-schedule the timer because a new interest will expires sooner + if (next_timer > timers_map_.begin()->first) { + interests_queue_timer_->cancel(); + scheduleQueueTimer(timers_map_.begin()->first - now); + } + } + return; + } + + if (queue_timer_on_) { + // the producer is producing. Send nacks to packets that will expire before + // the data production and remove the timer + queue_timer_on_ = false; + interests_queue_timer_->cancel(); + sendNacksForPendingInterests(); + } + + uint32_t max_gap = (uint32_t)floor( + (double)((double)((double)lifetime * + rtc::INTEREST_LIFETIME_REDUCTION_FACTOR / + rtc::MILLI_IN_A_SEC) * + (double)packets_production_rate_)); + + if (interest_seg < current_seg_ || interest_seg > (max_gap + current_seg_)) { + sendNack(interest_seg); + } else { + if (!consumer_in_sync_ && on_consumer_in_sync_) { + // we consider the remote consumer to be in sync as soon as it covers 70% + // of the production window with interests + uint32_t perc = ceil((double)max_gap * 0.7); + if (interest_seg > (perc + current_seg_)) { + consumer_in_sync_ = true; + on_consumer_in_sync_(*socket_->getInterface(), interest); + } + } + uint64_t expiration = + now + floor((double)lifetime * rtc::INTEREST_LIFETIME_REDUCTION_FACTOR); + addToInterestQueue(interest_seg, expiration); + } +} + +void RTCProductionProtocol::onError(std::error_code ec) {} + +void RTCProductionProtocol::scheduleQueueTimer(uint64_t wait) { + interests_queue_timer_->expires_from_now(std::chrono::milliseconds(wait)); + interests_queue_timer_->async_wait([this](std::error_code ec) { + if (ec) return; + interestQueueTimer(); + }); +} + +void RTCProductionProtocol::addToInterestQueue(uint32_t interest_seg, + uint64_t expiration) { + // check if the seq number exists already + auto it_seqs = seqs_map_.find(interest_seg); + if (it_seqs != seqs_map_.end()) { + // the seq already exists + if (expiration < it_seqs->second) { + // we need to update the timer becasue we got a smaller one + // 1) remove the entry from the multimap + // 2) update this entry + auto range = timers_map_.equal_range(it_seqs->second); + for (auto it_timers = range.first; it_timers != range.second; + it_timers++) { + if (it_timers->second == it_seqs->first) { + timers_map_.erase(it_timers); + break; + } + } + timers_map_.insert( + std::pair<uint64_t, uint32_t>(expiration, interest_seg)); + it_seqs->second = expiration; + } else { + // nothing to do here + return; + } + } else { + // add the new seq + timers_map_.insert(std::pair<uint64_t, uint32_t>(expiration, interest_seg)); + seqs_map_.insert(std::pair<uint32_t, uint64_t>(interest_seg, expiration)); + } +} + +void RTCProductionProtocol::sendNacksForPendingInterests() { + std::unordered_set<uint32_t> to_remove; + + uint32_t packet_gap = 100000; // set it to a high value (100sec) + if (packets_production_rate_ != 0) + packet_gap = ceil(rtc::MILLI_IN_A_SEC / (double)packets_production_rate_); + + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + + for (auto it = seqs_map_.begin(); it != seqs_map_.end(); it++) { + if (it->first > current_seg_) { + uint64_t production_time = + ((it->first - current_seg_) * packet_gap) + now; + if (production_time >= it->second) { + sendNack(it->first); + to_remove.insert(it->first); + } + } + } + + // delete nacked interests + for (auto it = to_remove.begin(); it != to_remove.end(); it++) { + removeFromInterestQueue(*it); + } +} + +void RTCProductionProtocol::removeFromInterestQueue(uint32_t interest_seg) { + auto seq_it = seqs_map_.find(interest_seg); + if (seq_it != seqs_map_.end()) { + auto range = timers_map_.equal_range(seq_it->second); + for (auto it_timers = range.first; it_timers != range.second; it_timers++) { + if (it_timers->second == seq_it->first) { + timers_map_.erase(it_timers); + break; + } + } + seqs_map_.erase(seq_it); + } +} + +void RTCProductionProtocol::interestQueueTimer() { + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + + for (auto it_timers = timers_map_.begin(); it_timers != timers_map_.end();) { + uint64_t expire = it_timers->first; + if (expire <= now) { + uint32_t seq = it_timers->second; + sendNack(seq); + // remove the interest from the other map + seqs_map_.erase(seq); + it_timers = timers_map_.erase(it_timers); + } else { + // stop, we are done! + break; + } + } + if (timers_map_.empty()) { + queue_timer_on_ = false; + } else { + queue_timer_on_ = true; + scheduleQueueTimer(timers_map_.begin()->first - now); + } +} + +void RTCProductionProtocol::sendNack(uint32_t sequence) { + auto nack = core::PacketManager<>::getInstance().getPacket<ContentObject>(); + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + uint32_t next_packet = current_seg_; + uint32_t prod_rate = bytes_production_rate_; + + struct rtc::nack_packet_t header; + header.setTimestamp(now); + header.setProductionRate(prod_rate); + header.setProductionSegement(next_packet); + nack->appendPayload((const uint8_t *)&header, rtc::NACK_HEADER_SIZE); + + Name n(flow_name_); + n.setSuffix(sequence); + nack->setName(n); + nack->setLifetime(0); + nack->setPathLabel(prod_label_); + + if (!consumer_in_sync_ && on_consumer_in_sync_ && + sequence < rtc::MIN_PROBE_SEQ && sequence > next_packet) { + consumer_in_sync_ = true; + auto interest = core::PacketManager<>::getInstance().getPacket<Interest>(); + interest->setName(n); + on_consumer_in_sync_(*socket_->getInterface(), *interest); + } + + if (*on_content_object_output_) { + on_content_object_output_->operator()(*socket_->getInterface(), *nack); + } + + TRANSPORT_LOGD("Send nack %u", sequence); + portal_->sendContentObject(*nack); +} + +} // namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/prod_protocol_rtc.h b/libtransport/src/protocols/prod_protocol_rtc.h new file mode 100644 index 000000000..f3584f74a --- /dev/null +++ b/libtransport/src/protocols/prod_protocol_rtc.h @@ -0,0 +1,127 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/core/name.h> +#include <protocols/production_protocol.h> + +#include <atomic> +#include <map> +#include <mutex> + +namespace transport { +namespace protocol { + +class RTCProductionProtocol : public ProductionProtocol { + public: + RTCProductionProtocol(implementation::ProducerSocket *icn_socket); + ~RTCProductionProtocol() override; + + using ProductionProtocol::start; + using ProductionProtocol::stop; + + void produce(ContentObject &content_object) override; + uint32_t produceStream(const Name &content_name, + std::unique_ptr<utils::MemBuf> &&buffer, + bool is_last = true, + uint32_t start_offset = 0) override; + uint32_t produceStream(const Name &content_name, const uint8_t *buffer, + size_t buffer_size, bool is_last = true, + uint32_t start_offset = 0) override; + uint32_t produceDatagram(const Name &content_name, + std::unique_ptr<utils::MemBuf> &&buffer) override; + uint32_t produceDatagram(const Name &content_name, const uint8_t *buffer, + size_t buffer_size) override { + return produceDatagram(content_name, utils::MemBuf::wrapBuffer( + buffer, buffer_size, buffer_size)); + } + + void registerNamespaceWithNetwork(const Prefix &producer_namespace) override; + + void setConsumerInSyncCallback( + interface::ProducerInterestCallback &&callback) { + on_consumer_in_sync_ = std::move(callback); + } + + private: + // packet handlers + void onInterest(Interest &interest) override; + void onError(std::error_code ec) override; + void produceInternal(std::shared_ptr<ContentObject> &&content_object, + const Name &content_name); + void sendNack(uint32_t sequence); + + // stats + void updateStats(); + void scheduleRoundTimer(); + + // pending intersts functions + void addToInterestQueue(uint32_t interest_seg, uint64_t expiration); + void sendNacksForPendingInterests(); + void removeFromInterestQueue(uint32_t interest_seg); + void scheduleQueueTimer(uint64_t wait); + void interestQueueTimer(); + + core::Name flow_name_; + + uint32_t current_seg_; // seq id of the next packet produced + uint32_t prod_label_; // path lable of the producer + uint16_t header_size_; // hicn header size + + uint32_t produced_bytes_; // bytes produced in the last round + uint32_t produced_packets_; // packet produed in the last round + + uint32_t max_packet_production_; // never exceed this number of packets + // without update stats + + uint32_t bytes_production_rate_; // bytes per sec + uint32_t packets_production_rate_; // pps + + std::unique_ptr<asio::steady_timer> round_timer_; + uint64_t last_round_; + + // delayed nacks are used by the producer to avoid to send too + // many nacks we the producer rate is 0. however, if the producer moves + // from a production rate higher than 0 to 0 the first round the dealyed + // should be avoided in order to notify the consumer as fast as possible + // of the new rate. + bool allow_delayed_nacks_; + + // queue for the received interests + // this map maps the expiration time of an interest to + // its sequence number. the map is sorted by timeouts + // the same timeout may be used for multiple sequence numbers + // but for each sequence number we store only the smallest + // expiry time. In this way the mapping from seqs_map_ to + // timers_map_ is unique + std::multimap<uint64_t, uint32_t> timers_map_; + + // this map does the opposite, this map is not ordered + std::unordered_map<uint32_t, uint64_t> seqs_map_; + bool queue_timer_on_; + std::unique_ptr<asio::steady_timer> interests_queue_timer_; + + // this callback is called when the remote consumer is in sync with high + // probability. it is called only the first time that the switch happen. + // XXX this makes sense only in P2P mode, while in standard mode is + // impossible to know the state of the consumers so it should not be used. + bool consumer_in_sync_; + interface::ProducerInterestCallback on_consumer_in_sync_; +}; + +} // namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/production_protocol.cc b/libtransport/src/protocols/production_protocol.cc new file mode 100644 index 000000000..8addf52d1 --- /dev/null +++ b/libtransport/src/protocols/production_protocol.cc @@ -0,0 +1,135 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <implementation/socket_producer.h> +#include <protocols/production_protocol.h> + +namespace transport { + +namespace protocol { + +using namespace interface; + +ProductionProtocol::ProductionProtocol( + implementation::ProducerSocket *icn_socket) + : socket_(icn_socket), + is_running_(false), + on_interest_input_(VOID_HANDLER), + on_interest_dropped_input_buffer_(VOID_HANDLER), + on_interest_inserted_input_buffer_(VOID_HANDLER), + on_interest_satisfied_output_buffer_(VOID_HANDLER), + on_interest_process_(VOID_HANDLER), + on_new_segment_(VOID_HANDLER), + on_content_object_to_sign_(VOID_HANDLER), + on_content_object_in_output_buffer_(VOID_HANDLER), + on_content_object_output_(VOID_HANDLER), + on_content_object_evicted_from_output_buffer_(VOID_HANDLER), + on_content_produced_(VOID_HANDLER) { + socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal_); + // TODO add statistics for producer + // socket_->getSocketOption(OtherOptions::STATISTICS, &stats_); +} + +ProductionProtocol::~ProductionProtocol() { + if (!is_async_ && is_running_) { + stop(); + } + + if (listening_thread_.joinable()) { + listening_thread_.join(); + } +} + +int ProductionProtocol::start() { + socket_->getSocketOption(ProducerCallbacksOptions::INTEREST_INPUT, + &on_interest_input_); + socket_->getSocketOption(ProducerCallbacksOptions::INTEREST_DROP, + &on_interest_dropped_input_buffer_); + socket_->getSocketOption(ProducerCallbacksOptions::INTEREST_PASS, + &on_interest_inserted_input_buffer_); + socket_->getSocketOption(ProducerCallbacksOptions::CACHE_HIT, + &on_interest_satisfied_output_buffer_); + socket_->getSocketOption(ProducerCallbacksOptions::CACHE_MISS, + &on_interest_process_); + socket_->getSocketOption(ProducerCallbacksOptions::NEW_CONTENT_OBJECT, + &on_new_segment_); + socket_->getSocketOption(ProducerCallbacksOptions::CONTENT_OBJECT_READY, + &on_content_object_in_output_buffer_); + socket_->getSocketOption(ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT, + &on_content_object_output_); + socket_->getSocketOption(ProducerCallbacksOptions::CONTENT_OBJECT_TO_SIGN, + &on_content_object_to_sign_); + socket_->getSocketOption(ProducerCallbacksOptions::CONTENT_PRODUCED, + &on_content_produced_); + + socket_->getSocketOption(GeneralTransportOptions::ASYNC_MODE, is_async_); + + bool first = true; + + for (core::Prefix &producer_namespace : served_namespaces_) { + if (first) { + core::BindConfig bind_config(producer_namespace, 1000); + portal_->bind(bind_config); + portal_->setProducerCallback(this); + first = !first; + } else { + portal_->registerRoute(producer_namespace); + } + } + + is_running_ = true; + + if (!is_async_) { + listening_thread_ = std::thread([this]() { portal_->runEventsLoop(); }); + } + + return 0; +} + +void ProductionProtocol::stop() { + is_running_ = false; + + if (!is_async_) { + portal_->stopEventsLoop(); + } else { + portal_->clear(); + } +} + +void ProductionProtocol::produce(ContentObject &content_object) { + if (*on_content_object_in_output_buffer_) { + on_content_object_in_output_buffer_->operator()(*socket_->getInterface(), + content_object); + } + + output_buffer_.insert(std::static_pointer_cast<ContentObject>( + content_object.shared_from_this())); + + if (*on_content_object_output_) { + on_content_object_output_->operator()(*socket_->getInterface(), + content_object); + } + + portal_->sendContentObject(content_object); +} + +void ProductionProtocol::registerNamespaceWithNetwork( + const Prefix &producer_namespace) { + served_namespaces_.push_back(producer_namespace); +} + +} // namespace protocol + +} // namespace transport diff --git a/libtransport/src/protocols/production_protocol.h b/libtransport/src/protocols/production_protocol.h new file mode 100644 index 000000000..780972321 --- /dev/null +++ b/libtransport/src/protocols/production_protocol.h @@ -0,0 +1,108 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/interfaces/callbacks.h> +#include <hicn/transport/interfaces/socket_producer.h> +#include <hicn/transport/interfaces/statistics.h> +#include <hicn/transport/utils/object_pool.h> +#include <implementation/socket.h> +#include <utils/content_store.h> + +#include <atomic> +#include <thread> + +namespace transport { + +namespace protocol { + +using namespace core; + +class ProductionProtocol : public Portal::ProducerCallback { + public: + ProductionProtocol(implementation::ProducerSocket *icn_socket); + virtual ~ProductionProtocol(); + + bool isRunning() { return is_running_; } + + virtual int start(); + virtual void stop(); + + virtual void produce(ContentObject &content_object); + virtual uint32_t produceStream(const Name &content_name, + std::unique_ptr<utils::MemBuf> &&buffer, + bool is_last = true, + uint32_t start_offset = 0) = 0; + virtual uint32_t produceStream(const Name &content_name, + const uint8_t *buffer, size_t buffer_size, + bool is_last = true, + uint32_t start_offset = 0) = 0; + virtual uint32_t produceDatagram(const Name &content_name, + std::unique_ptr<utils::MemBuf> &&buffer) = 0; + virtual uint32_t produceDatagram(const Name &content_name, + const uint8_t *buffer, + size_t buffer_size) = 0; + + void setOutputBufferSize(std::size_t size) { output_buffer_.setLimit(size); } + std::size_t getOutputBufferSize() { return output_buffer_.getLimit(); } + + virtual void registerNamespaceWithNetwork(const Prefix &producer_namespace); + const std::list<Prefix> &getNamespaces() const { return served_namespaces_; } + + protected: + // Producer callback + virtual void onInterest(core::Interest &i) override = 0; + virtual void onError(std::error_code ec) override{}; + + protected: + implementation::ProducerSocket *socket_; + + // Thread pool responsible for IO operations (send data / receive interests) + std::vector<utils::EventThread> io_threads_; + + // TODO remove this thread + std::thread listening_thread_; + std::shared_ptr<Portal> portal_; + std::atomic<bool> is_running_; + interface::ProductionStatistics *stats_; + + // Callbacks + interface::ProducerInterestCallback *on_interest_input_; + interface::ProducerInterestCallback *on_interest_dropped_input_buffer_; + interface::ProducerInterestCallback *on_interest_inserted_input_buffer_; + interface::ProducerInterestCallback *on_interest_satisfied_output_buffer_; + interface::ProducerInterestCallback *on_interest_process_; + + interface::ProducerContentObjectCallback *on_new_segment_; + interface::ProducerContentObjectCallback *on_content_object_to_sign_; + interface::ProducerContentObjectCallback *on_content_object_in_output_buffer_; + interface::ProducerContentObjectCallback *on_content_object_output_; + interface::ProducerContentObjectCallback + *on_content_object_evicted_from_output_buffer_; + + interface::ProducerContentCallback *on_content_produced_; + + // Output buffer + utils::ContentStore output_buffer_; + + // List ot routes served by current producer protocol + std::list<Prefix> served_namespaces_; + + bool is_async_; +}; + +} // end namespace protocol +} // end namespace transport diff --git a/libtransport/src/protocols/raaqm.cc b/libtransport/src/protocols/raaqm.cc index 5023adf2e..bc8500227 100644 --- a/libtransport/src/protocols/raaqm.cc +++ b/libtransport/src/protocols/raaqm.cc @@ -13,6 +13,7 @@ * limitations under the License. */ +#include <hicn/transport/core/global_object_pool.h> #include <hicn/transport/interfaces/socket_consumer.h> #include <implementation/socket_consumer.h> #include <protocols/errors.h> @@ -126,10 +127,6 @@ void RaaqmTransportProtocol::reset() { } } -bool RaaqmTransportProtocol::verifyKeyPackets() { - return index_manager_->onKeyToVerify(); -} - void RaaqmTransportProtocol::increaseWindow() { // return; double max_window_size = 0.; @@ -325,8 +322,8 @@ void RaaqmTransportProtocol::init() { is.close(); } -void RaaqmTransportProtocol::onContentObject( - Interest::Ptr &&interest, ContentObject::Ptr &&content_object) { +void RaaqmTransportProtocol::onContentObject(Interest &interest, + ContentObject &content_object) { // Check whether makes sense to continue if (TRANSPORT_EXPECT_FALSE(!is_running_)) { return; @@ -334,54 +331,53 @@ void RaaqmTransportProtocol::onContentObject( // Call application-defined callbacks if (*on_content_object_input_) { - (*on_content_object_input_)(*socket_->getInterface(), *content_object); + (*on_content_object_input_)(*socket_->getInterface(), content_object); } if (*on_interest_satisfied_) { - (*on_interest_satisfied_)(*socket_->getInterface(), *interest); + (*on_interest_satisfied_)(*socket_->getInterface(), interest); } - if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) { - stats_->updateBytesRecv(content_object->payloadSize()); + if (content_object.getPayloadType() == PayloadType::DATA) { + stats_->updateBytesRecv(content_object.payloadSize()); } - onContentSegment(std::move(interest), std::move(content_object)); + onContentSegment(interest, content_object); scheduleNextInterests(); } -void RaaqmTransportProtocol::onContentSegment( - Interest::Ptr &&interest, ContentObject::Ptr &&content_object) { - uint32_t incremental_suffix = content_object->getName().getSuffix(); +void RaaqmTransportProtocol::onContentSegment(Interest &interest, + ContentObject &content_object) { + uint32_t incremental_suffix = content_object.getName().getSuffix(); // Decrease in-flight interests interests_in_flight_--; // Update stats if (!interest_retransmissions_[incremental_suffix & mask]) { - afterContentReception(*interest, *content_object); + afterContentReception(interest, content_object); } - index_manager_->onContentObject(std::move(interest), - std::move(content_object)); + index_manager_->onContentObject(interest, content_object); } -void RaaqmTransportProtocol::onPacketDropped( - Interest::Ptr &&interest, ContentObject::Ptr &&content_object) { +void RaaqmTransportProtocol::onPacketDropped(Interest &interest, + ContentObject &content_object) { uint32_t max_rtx = 0; socket_->getSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, max_rtx); - uint64_t segment = interest->getName().getSuffix(); + uint64_t segment = interest.getName().getSuffix(); if (TRANSPORT_EXPECT_TRUE(interest_retransmissions_[segment & mask] < max_rtx)) { stats_->updateRetxCount(1); if (*on_interest_retransmission_) { - (*on_interest_retransmission_)(*socket_->getInterface(), *interest); + (*on_interest_retransmission_)(*socket_->getInterface(), interest); } if (*on_interest_output_) { - (*on_interest_output_)(*socket_->getInterface(), *interest); + (*on_interest_output_)(*socket_->getInterface(), interest); } if (!is_running_) { @@ -389,7 +385,7 @@ void RaaqmTransportProtocol::onPacketDropped( } interest_retransmissions_[segment & mask]++; - interest_to_retransmit_.push(std::move(interest)); + interest_to_retransmit_.push(interest.shared_from_this()); } else { TRANSPORT_LOGE( "Stop: received not trusted packet %llu times", @@ -477,6 +473,11 @@ void RaaqmTransportProtocol::scheduleNextInterests() { sendInterest(std::move(interest_to_retransmit_.front())); interest_to_retransmit_.pop(); } else { + if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) { + TRANSPORT_LOGI("Adios"); + break; + } + index = index_manager_->getNextSuffix(); if (index == IndexManager::invalid_index) { break; @@ -487,8 +488,8 @@ void RaaqmTransportProtocol::scheduleNextInterests() { } } -bool RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) { - auto interest = getPacket(); +void RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) { + auto interest = core::PacketManager<>::getInstance().getPacket<Interest>(); core::Name *name; socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name); name->setSuffix((uint32_t)next_suffix); @@ -502,19 +503,12 @@ bool RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) { if (*on_interest_output_) { on_interest_output_->operator()(*socket_->getInterface(), *interest); } - - if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) { - return false; - } - // This is set to ~0 so that the next interest_retransmissions_ + 1, // performed by sendInterest, will result in 0 interest_retransmissions_[next_suffix & mask] = ~0; interest_timepoints_[next_suffix & mask] = utils::SteadyClock::now(); sendInterest(std::move(interest)); - - return true; } void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) { diff --git a/libtransport/src/protocols/raaqm.h b/libtransport/src/protocols/raaqm.h index fce4194d4..be477d39f 100644 --- a/libtransport/src/protocols/raaqm.h +++ b/libtransport/src/protocols/raaqm.h @@ -18,9 +18,9 @@ #include <hicn/transport/utils/chrono_typedefs.h> #include <protocols/byte_stream_reassembly.h> #include <protocols/congestion_window_protocol.h> -#include <protocols/protocol.h> #include <protocols/raaqm_data_path.h> #include <protocols/rate_estimation.h> +#include <protocols/transport_protocol.h> #include <queue> #include <vector> @@ -42,8 +42,6 @@ class RaaqmTransportProtocol : public TransportProtocol, void reset() override; - virtual bool verifyKeyPackets() override; - protected: static constexpr uint32_t buffer_size = 1 << interface::default_values::log_2_default_buffer_size; @@ -64,13 +62,12 @@ class RaaqmTransportProtocol : public TransportProtocol, private: void init(); - void onContentObject(Interest::Ptr &&i, ContentObject::Ptr &&c) override; + void onContentObject(Interest &i, ContentObject &c) override; - void onContentSegment(Interest::Ptr &&interest, - ContentObject::Ptr &&content_object); + void onContentSegment(Interest &interest, ContentObject &content_object); - void onPacketDropped(Interest::Ptr &&interest, - ContentObject::Ptr &&content_object) override; + void onPacketDropped(Interest &interest, + ContentObject &content_object) override; void onReassemblyFailed(std::uint32_t missing_segment) override; @@ -78,7 +75,7 @@ class RaaqmTransportProtocol : public TransportProtocol, virtual void scheduleNextInterests() override; - bool sendInterest(std::uint64_t next_suffix); + void sendInterest(std::uint64_t next_suffix); void sendInterest(Interest::Ptr &&interest); diff --git a/libtransport/src/protocols/raaqm_data_path.cc b/libtransport/src/protocols/raaqm_data_path.cc index 8bbbadcf2..f2c21b9ef 100644 --- a/libtransport/src/protocols/raaqm_data_path.cc +++ b/libtransport/src/protocols/raaqm_data_path.cc @@ -14,7 +14,6 @@ */ #include <hicn/transport/utils/chrono_typedefs.h> - #include <protocols/raaqm_data_path.h> namespace transport { diff --git a/libtransport/src/protocols/raaqm_data_path.h b/libtransport/src/protocols/raaqm_data_path.h index 3f037bc76..c0b53a690 100644 --- a/libtransport/src/protocols/raaqm_data_path.h +++ b/libtransport/src/protocols/raaqm_data_path.h @@ -16,7 +16,6 @@ #pragma once #include <hicn/transport/utils/chrono_typedefs.h> - #include <utils/min_filter.h> #include <chrono> diff --git a/libtransport/src/protocols/rate_estimation.cc b/libtransport/src/protocols/rate_estimation.cc index a2cf1aefe..5ca925760 100644 --- a/libtransport/src/protocols/rate_estimation.cc +++ b/libtransport/src/protocols/rate_estimation.cc @@ -15,7 +15,6 @@ #include <hicn/transport/interfaces/socket_options_default_values.h> #include <hicn/transport/utils/log.h> - #include <protocols/rate_estimation.h> #include <thread> diff --git a/libtransport/src/protocols/rate_estimation.h b/libtransport/src/protocols/rate_estimation.h index 17f39e0b9..42ae74194 100644 --- a/libtransport/src/protocols/rate_estimation.h +++ b/libtransport/src/protocols/rate_estimation.h @@ -16,7 +16,6 @@ #pragma once #include <hicn/transport/interfaces/statistics.h> - #include <protocols/raaqm_data_path.h> #include <chrono> diff --git a/libtransport/src/protocols/reassembly.cc b/libtransport/src/protocols/reassembly.cc index c6602153c..0e59832dc 100644 --- a/libtransport/src/protocols/reassembly.cc +++ b/libtransport/src/protocols/reassembly.cc @@ -16,7 +16,6 @@ #include <hicn/transport/interfaces/socket_consumer.h> #include <hicn/transport/utils/array.h> #include <hicn/transport/utils/membuf.h> - #include <implementation/socket_consumer.h> #include <protocols/errors.h> #include <protocols/indexer.h> diff --git a/libtransport/src/protocols/reassembly.h b/libtransport/src/protocols/reassembly.h index fdc9f2a05..385122c53 100644 --- a/libtransport/src/protocols/reassembly.h +++ b/libtransport/src/protocols/reassembly.h @@ -46,7 +46,7 @@ class Reassembly { virtual ~Reassembly() = default; - virtual void reassemble(core::ContentObject::Ptr &&content_object) = 0; + virtual void reassemble(core::ContentObject &content_object) = 0; virtual void reassemble( std::unique_ptr<core::ContentObjectManifest> &&manifest) = 0; virtual void reInitialize() = 0; diff --git a/libtransport/src/protocols/rtc/CMakeLists.txt b/libtransport/src/protocols/rtc/CMakeLists.txt new file mode 100644 index 000000000..77f065d0e --- /dev/null +++ b/libtransport/src/protocols/rtc/CMakeLists.txt @@ -0,0 +1,38 @@ +# Copyright (c) 2017-2019 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cmake_minimum_required(VERSION 3.5 FATAL_ERROR) + +list(APPEND HEADER_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/rtc.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_state.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_ldr.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_consts.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rc.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rc_queue.h + ${CMAKE_CURRENT_SOURCE_DIR}/probe_handler.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_packet.h +) + +list(APPEND SOURCE_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/rtc.cc + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_state.cc + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_ldr.cc + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rc_queue.cc + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.cc + ${CMAKE_CURRENT_SOURCE_DIR}/probe_handler.cc +) + +set(SOURCE_FILES ${SOURCE_FILES} PARENT_SCOPE) +set(HEADER_FILES ${HEADER_FILES} PARENT_SCOPE) diff --git a/libtransport/src/protocols/rtc/congestion_detection.cc b/libtransport/src/protocols/rtc/congestion_detection.cc new file mode 100644 index 000000000..e2d44ae66 --- /dev/null +++ b/libtransport/src/protocols/rtc/congestion_detection.cc @@ -0,0 +1,101 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/utils/log.h> +#include <protocols/rtc/congestion_detection.h> + +namespace transport { + +namespace protocol { + +namespace rtc { + +CongestionDetection::CongestionDetection() + : cc_estimator_(), last_processed_chunk_() {} + +CongestionDetection::~CongestionDetection() {} + +void CongestionDetection::updateStats() { + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + + if (chunks_number_.empty()) return; + + uint32_t chunk_number = chunks_number_.front(); + + while (chunks_[chunk_number].getReceivedTime() + HICN_CC_STATS_MAX_DELAY_MS < + now || + chunks_[chunk_number].isComplete()) { + if (chunk_number == last_processed_chunk_.getFrameSeqNum() + 1) { + chunks_[chunk_number].setPreviousSentTime( + last_processed_chunk_.getSentTime()); + + chunks_[chunk_number].setPreviousReceivedTime( + last_processed_chunk_.getReceivedTime()); + cc_estimator_.Update(chunks_[chunk_number].getReceivedDelta(), + chunks_[chunk_number].getSentDelta(), + chunks_[chunk_number].getSentTime(), + chunks_[chunk_number].getReceivedTime(), + chunks_[chunk_number].getFrameSize(), true); + + } else { + TRANSPORT_LOGD( + "CongestionDetection::updateStats frame %u but not the \ + previous one, last one was %u currentFrame %u", + chunk_number, last_processed_chunk_.getFrameSeqNum(), + chunks_[chunk_number].getFrameSeqNum()); + } + + last_processed_chunk_ = chunks_[chunk_number]; + + chunks_.erase(chunk_number); + + chunks_number_.pop(); + if (chunks_number_.empty()) break; + + chunk_number = chunks_number_.front(); + } +} + +void CongestionDetection::addPacket(const core::ContentObject &content_object) { + auto payload = content_object.getPayload(); + uint32_t payload_size = (uint32_t)payload->length(); + uint32_t segmentNumber = content_object.getName().getSuffix(); + // uint32_t pkt = segmentNumber & modMask_; + uint64_t *sentTimePtr = (uint64_t *)payload->data(); + + // this is just for testing with hiperf, assuming a frame is 10 pkts + // in the final version, the split should be based on the timestamp in the pkt + uint32_t frameNum = (int)(segmentNumber / HICN_CC_STATS_CHUNK_SIZE); + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + + if (chunks_.find(frameNum) == chunks_.end()) { + // new chunk of pkts or out of order + if (last_processed_chunk_.getFrameSeqNum() > frameNum) + return; // out of order and we already processed the chunk + + chunks_[frameNum] = FrameStats(frameNum, HICN_CC_STATS_CHUNK_SIZE); + chunks_number_.push(frameNum); + } + + chunks_[frameNum].addPacket(*sentTimePtr, now, payload_size); +} + +} // namespace rtc +} // namespace protocol +} // namespace transport diff --git a/libtransport/src/protocols/rtc/congestion_detection.h b/libtransport/src/protocols/rtc/congestion_detection.h new file mode 100644 index 000000000..17f4aa54c --- /dev/null +++ b/libtransport/src/protocols/rtc/congestion_detection.h @@ -0,0 +1,138 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/core/content_object.h> +#include <protocols/rtc/trendline_estimator.h> + +#include <map> +#include <queue> + +#define HICN_CC_STATS_CHUNK_SIZE 10 +#define HICN_CC_STATS_MAX_DELAY_MS 100 + +namespace transport { + +namespace protocol { + +namespace rtc { + +class FrameStats { + public: + FrameStats() + : frame_num_(0), + sent_time_(0), + received_time_(0), + previous_sent_time_(0), + previous_received_time_(0), + size_(0), + received_pkt_m(0), + burst_size_m(HICN_CC_STATS_CHUNK_SIZE){}; + + FrameStats(uint32_t burst_size) + : frame_num_(0), + sent_time_(0), + received_time_(0), + previous_sent_time_(0), + previous_received_time_(0), + size_(0), + received_pkt_m(0), + burst_size_m(burst_size){}; + + FrameStats(uint32_t frame_num, uint32_t burst_size) + : frame_num_(frame_num), + sent_time_(0), + received_time_(0), + previous_sent_time_(0), + previous_received_time_(0), + size_(0), + received_pkt_m(0), + burst_size_m(burst_size){}; + + FrameStats(uint32_t frame_num, uint64_t sent_time, uint64_t received_time, + uint32_t size, FrameStats previousFrame, uint32_t burst_size) + : frame_num_(frame_num), + sent_time_(sent_time), + received_time_(received_time), + previous_sent_time_(previousFrame.getSentTime()), + previous_received_time_(previousFrame.getReceivedTime()), + size_(size), + received_pkt_m(1), + burst_size_m(burst_size){}; + + void addPacket(uint64_t sent_time, uint64_t received_time, uint32_t size) { + size_ += size; + sent_time_ = + (sent_time_ == 0) ? sent_time : std::min(sent_time_, sent_time); + received_time_ = std::max(received_time, received_time_); + received_pkt_m++; + } + + bool isComplete() { return received_pkt_m == burst_size_m; } + + uint32_t getFrameSeqNum() const { return frame_num_; } + uint64_t getSentTime() const { return sent_time_; } + uint64_t getReceivedTime() const { return received_time_; } + uint32_t getFrameSize() const { return size_; } + + void setPreviousReceivedTime(uint64_t time) { + previous_received_time_ = time; + } + void setPreviousSentTime(uint64_t time) { previous_sent_time_ = time; } + + // todo manage first frame + double getReceivedDelta() { + return static_cast<double>(received_time_ - previous_received_time_); + } + double getSentDelta() { + return static_cast<double>(sent_time_ - previous_sent_time_); + } + + private: + uint32_t frame_num_; + uint64_t sent_time_; + uint64_t received_time_; + + uint64_t previous_sent_time_; + uint64_t previous_received_time_; + uint32_t size_; + + uint32_t received_pkt_m; + uint32_t burst_size_m; +}; + +class CongestionDetection { + public: + CongestionDetection(); + ~CongestionDetection(); + + void addPacket(const core::ContentObject &content_object); + + BandwidthUsage getState() { return cc_estimator_.State(); } + + void updateStats(); + + private: + TrendlineEstimator cc_estimator_; + std::map<uint32_t, FrameStats> chunks_; + std::queue<uint32_t> chunks_number_; + + FrameStats last_processed_chunk_; +}; + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/probe_handler.cc b/libtransport/src/protocols/rtc/probe_handler.cc new file mode 100644 index 000000000..efba362d4 --- /dev/null +++ b/libtransport/src/protocols/rtc/probe_handler.cc @@ -0,0 +1,107 @@ +/* + * Copyright (c) 2017-2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <protocols/rtc/probe_handler.h> +#include <protocols/rtc/rtc_consts.h> + +namespace transport { + +namespace protocol { + +namespace rtc { + +ProbeHandler::ProbeHandler(SendProbeCallback &&send_callback, + asio::io_service &io_service) + : probe_interval_(0), + max_probes_(0), + sent_probes_(0), + probe_timer_(std::make_unique<asio::steady_timer>(io_service)), + rand_eng_((std::random_device())()), + distr_(MIN_RTT_PROBE_SEQ, MAX_RTT_PROBE_SEQ), + send_probe_callback_(std::move(send_callback)) {} + +ProbeHandler::~ProbeHandler() {} + +uint64_t ProbeHandler::getRtt(uint32_t seq) { + auto it = pending_probes_.find(seq); + + if (it == pending_probes_.end()) return 0; + + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + uint64_t rtt = now - it->second; + if(rtt < 1) rtt = 1; + + pending_probes_.erase(it); + + return rtt; +} + +void ProbeHandler::setProbes(uint32_t probe_interval, uint32_t max_probes) { + stopProbes(); + probe_interval_ = probe_interval; + max_probes_ = max_probes; +} + +void ProbeHandler::stopProbes() { + probe_interval_ = 0; + max_probes_ = 0; + sent_probes_ = 0; + probe_timer_->cancel(); +} + +void ProbeHandler::sendProbes() { + if (probe_interval_ == 0) return; + if (max_probes_ != 0 && sent_probes_ >= max_probes_) return; + + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + + uint32_t seq = distr_(rand_eng_); + pending_probes_.insert(std::pair<uint32_t, uint64_t>(seq, now)); + send_probe_callback_(seq); + sent_probes_++; + + // clean up + // a probe may get lost. if the pending_probes_ size becomes bigger than + // MAX_PENDING_PROBES remove all the probes older than a seconds + if (pending_probes_.size() > MAX_PENDING_PROBES) { + for (auto it = pending_probes_.begin(); it != pending_probes_.end();) { + if ((now - it->second) > 1000) + it = pending_probes_.erase(it); + else + it++; + } + } + + if (probe_interval_ == 0) return; + + std::weak_ptr<ProbeHandler> self(shared_from_this()); + probe_timer_->expires_from_now(std::chrono::microseconds(probe_interval_)); + probe_timer_->async_wait([self](std::error_code ec) { + if (ec) return; + if (auto s = self.lock()) { + s->sendProbes(); + } + }); +} + +} // namespace rtc + +} // namespace protocol + +} // namespace transport diff --git a/libtransport/src/protocols/rtc/probe_handler.h b/libtransport/src/protocols/rtc/probe_handler.h new file mode 100644 index 000000000..b8ed84445 --- /dev/null +++ b/libtransport/src/protocols/rtc/probe_handler.h @@ -0,0 +1,75 @@ +/* + * Copyright (c) 2017-2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once +#include <hicn/transport/config.h> + +#include <asio.hpp> +#include <asio/steady_timer.hpp> +#include <functional> +#include <random> +#include <unordered_map> + +namespace transport { + +namespace protocol { + +namespace rtc { + +class ProbeHandler : public std::enable_shared_from_this<ProbeHandler> { + public: + using SendProbeCallback = std::function<void(uint32_t)>; + + public: + ProbeHandler(SendProbeCallback &&send_callback, + asio::io_service &io_service); + + ~ProbeHandler(); + + // if the function returns 0 the probe is not valaid + uint64_t getRtt(uint32_t seq); + + // reset the probes parameters. it stop the current probing. + // to restar call sendProbes. + // probe_interval = 0 means that no event will be scheduled + // max_probe = 0 means no limit to the number of probe to send + void setProbes(uint32_t probe_interval, uint32_t max_probes); + + // stop to schedule probes + void stopProbes(); + + void sendProbes(); + + private: + uint32_t probe_interval_; // us + uint32_t max_probes_; // packets + uint32_t sent_probes_; // packets + + std::unique_ptr<asio::steady_timer> probe_timer_; + + // map from seqnumber to timestamp + std::unordered_map<uint32_t, uint64_t> pending_probes_; + + // random generator + std::default_random_engine rand_eng_; + std::uniform_int_distribution<uint32_t> distr_; + + SendProbeCallback send_probe_callback_; +}; + +} // namespace rtc + +} // namespace protocol + +} // namespace transport diff --git a/libtransport/src/protocols/rtc/rtc.cc b/libtransport/src/protocols/rtc/rtc.cc new file mode 100644 index 000000000..bb95ab686 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc.cc @@ -0,0 +1,607 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/core/global_object_pool.h> +#include <hicn/transport/interfaces/socket_consumer.h> +#include <implementation/socket_consumer.h> +#include <math.h> +#include <protocols/rtc/rtc.h> +#include <protocols/rtc/rtc_consts.h> +#include <protocols/rtc/rtc_rc_queue.h> + +#include <algorithm> + +namespace transport { + +namespace protocol { + +namespace rtc { + +using namespace interface; + +RTCTransportProtocol::RTCTransportProtocol( + implementation::ConsumerSocket *icn_socket) + : TransportProtocol(icn_socket, nullptr), + DatagramReassembly(icn_socket, this), + number_(0) { + icn_socket->getSocketOption(PORTAL, portal_); + round_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService()); + scheduler_timer_ = + std::make_unique<asio::steady_timer>(portal_->getIoService()); +} + +RTCTransportProtocol::~RTCTransportProtocol() {} + +void RTCTransportProtocol::resume() { + if (is_running_) return; + + is_running_ = true; + + newRound(); + + portal_->runEventsLoop(); + is_running_ = false; +} + +// private +void RTCTransportProtocol::initParams() { + portal_->setConsumerCallback(this); + + rc_ = std::make_shared<RTCRateControlQueue>(); + ldr_ = std::make_shared<RTCLossDetectionAndRecovery>( + std::bind(&RTCTransportProtocol::sendRtxInterest, this, + std::placeholders::_1), + portal_->getIoService()); + + state_ = std::make_shared<RTCState>( + std::bind(&RTCTransportProtocol::sendProbeInterest, this, + std::placeholders::_1), + std::bind(&RTCTransportProtocol::discoveredRtt, this), + portal_->getIoService()); + + rc_->setState(state_); + // TODO: for the moment we keep the congestion control disabled + // rc_->tunrOnRateControl(); + ldr_->setState(state_); + + // protocol state + start_send_interest_ = false; + current_state_ = SyncState::catch_up; + + // Cancel timer + number_++; + round_timer_->cancel(); + scheduler_timer_->cancel(); + scheduler_timer_on_ = false; + + // delete all timeouts and future nacks + timeouts_or_nacks_.clear(); + + // cwin vars + current_sync_win_ = INITIAL_WIN; + max_sync_win_ = INITIAL_WIN_MAX; + + // names/packets var + next_segment_ = 0; + + socket_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, + RTC_INTEREST_LIFETIME); +} + +// private +void RTCTransportProtocol::reset() { + TRANSPORT_LOGD("reset called"); + initParams(); + newRound(); +} + +void RTCTransportProtocol::inactiveProducer() { + // when the producer is inactive we reset the consumer state + // cwin vars + current_sync_win_ = INITIAL_WIN; + max_sync_win_ = INITIAL_WIN_MAX; + + TRANSPORT_LOGD("Current window: %u, max_sync_win_: %u", current_sync_win_, + max_sync_win_); + + // names/packets var + next_segment_ = 0; + + ldr_->clear(); +} + +void RTCTransportProtocol::newRound() { + round_timer_->expires_from_now(std::chrono::milliseconds(ROUND_LEN)); + // TODO pass weak_ptr here + round_timer_->async_wait([this, n{number_}](std::error_code ec) { + if (ec) return; + + if (n != number_) { + return; + } + + // saving counters that will be reset on new round + uint32_t sent_retx = state_->getSentRtxInRound(); + uint32_t received_bytes = state_->getReceivedBytesInRound(); + uint32_t sent_interest = state_->getSentInterestInRound(); + uint32_t lost_data = state_->getLostData(); + uint32_t recovered_losses = state_->getRecoveredLosses(); + uint32_t received_nacks = state_->getReceivedNacksInRound(); + + bool in_sync = (current_state_ == SyncState::in_sync); + state_->onNewRound((double)ROUND_LEN, in_sync); + rc_->onNewRound((double)ROUND_LEN); + + // update sync state if needed + if (current_state_ == SyncState::in_sync) { + double cache_rate = state_->getPacketFromCacheRatio(); + if (cache_rate > MAX_DATA_FROM_CACHE) { + current_state_ = SyncState::catch_up; + } + } else { + double target_rate = state_->getProducerRate() * PRODUCTION_RATE_FRACTION; + double received_rate = state_->getReceivedRate(); + uint32_t round_without_nacks = state_->getRoundsWithoutNacks(); + double cache_ratio = state_->getPacketFromCacheRatio(); + if (round_without_nacks >= ROUNDS_IN_SYNC_BEFORE_SWITCH && + received_rate >= target_rate && cache_ratio < MAX_DATA_FROM_CACHE) { + current_state_ = SyncState::in_sync; + } + } + + TRANSPORT_LOGD("Calling updateSyncWindow in newRound function"); + updateSyncWindow(); + + sendStatsToApp(sent_retx, received_bytes, sent_interest, lost_data, + recovered_losses, received_nacks); + newRound(); + }); +} + +void RTCTransportProtocol::discoveredRtt() { + start_send_interest_ = true; + ldr_->turnOnRTX(); + updateSyncWindow(); +} + +void RTCTransportProtocol::computeMaxSyncWindow() { + double production_rate = state_->getProducerRate(); + double packet_size = state_->getAveragePacketSize(); + if (production_rate == 0.0 || packet_size == 0.0) { + // the consumer has no info about the producer, + // keep the previous maxCWin + TRANSPORT_LOGD( + "Returning in computeMaxSyncWindow because: prod_rate: %d || " + "packet_size: %d", + (int)(production_rate == 0.0), (int)(packet_size == 0.0)); + return; + } + + uint32_t lifetime = default_values::interest_lifetime; + socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, + lifetime); + double lifetime_ms = (double)lifetime / MILLI_IN_A_SEC; + + + max_sync_win_ = + (uint32_t)ceil((production_rate * lifetime_ms * + INTEREST_LIFETIME_REDUCTION_FACTOR) / packet_size); + + max_sync_win_ = std::min(max_sync_win_, rc_->getCongesionWindow()); +} + +void RTCTransportProtocol::updateSyncWindow() { + computeMaxSyncWindow(); + + if (max_sync_win_ == INITIAL_WIN_MAX) { + if (TRANSPORT_EXPECT_FALSE(!state_->isProducerActive())) return; + + current_sync_win_ = INITIAL_WIN; + scheduleNextInterests(); + return; + } + + double prod_rate = state_->getProducerRate(); + double rtt = (double)state_->getRTT() / MILLI_IN_A_SEC; + double packet_size = state_->getAveragePacketSize(); + + // if some of the info are not available do not update the current win + if (prod_rate != 0.0 && rtt != 0.0 && packet_size != 0.0) { + current_sync_win_ = (uint32_t)ceil(prod_rate * rtt / packet_size); + current_sync_win_ += + ceil(prod_rate * (PRODUCER_BUFFER_MS / MILLI_IN_A_SEC) / packet_size); + + if(current_state_ == SyncState::catch_up) { + current_sync_win_ = current_sync_win_ * CATCH_UP_WIN_INCREMENT; + } + + current_sync_win_ = std::min(current_sync_win_, max_sync_win_); + current_sync_win_ = std::max(current_sync_win_, WIN_MIN); + } + + scheduleNextInterests(); +} + +void RTCTransportProtocol::decreaseSyncWindow() { + // called on future nack + // we have a new sample of the production rate, so update max win first + computeMaxSyncWindow(); + current_sync_win_--; + current_sync_win_ = std::max(current_sync_win_, WIN_MIN); + scheduleNextInterests(); +} + +void RTCTransportProtocol::sendInterest(Name *interest_name) { + TRANSPORT_LOGD("Sending interest for name %s", + interest_name->toString().c_str()); + + auto interest = core::PacketManager<>::getInstance().getPacket<Interest>(); + interest->setName(*interest_name); + + uint32_t lifetime = default_values::interest_lifetime; + socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, + lifetime); + interest->setLifetime(uint32_t(lifetime)); + + if (*on_interest_output_) { + (*on_interest_output_)(*socket_->getInterface(), *interest); + } + + if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) { + return; + } + + portal_->sendInterest(std::move(interest)); +} + +void RTCTransportProtocol::sendRtxInterest(uint32_t seq) { + if (!is_running_ && !is_first_) return; + + if(!start_send_interest_) return; + + Name *interest_name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, + &interest_name); + + TRANSPORT_LOGD("send rtx %u", seq); + interest_name->setSuffix(seq); + sendInterest(interest_name); +} + +void RTCTransportProtocol::sendProbeInterest(uint32_t seq) { + if (!is_running_ && !is_first_) return; + + Name *interest_name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, + &interest_name); + + TRANSPORT_LOGD("send probe %u", seq); + interest_name->setSuffix(seq); + sendInterest(interest_name); +} + +void RTCTransportProtocol::scheduleNextInterests() { + TRANSPORT_LOGD("Schedule next interests"); + + if (!is_running_ && !is_first_) return; + + if(!start_send_interest_) return; // RTT discovering phase is not finished so + // do not start to send interests + + if (scheduler_timer_on_) return; // wait befor send other interests + + if (TRANSPORT_EXPECT_FALSE(!state_->isProducerActive())) { + TRANSPORT_LOGD("Inactive producer."); + // here we keep seding the same interest until the producer + // does not start again + if (next_segment_ != 0) { + // the producer just become inactive, reset the state + inactiveProducer(); + } + + Name *interest_name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, + &interest_name); + + TRANSPORT_LOGD("send interest %u", next_segment_); + interest_name->setSuffix(next_segment_); + + if (portal_->interestIsPending(*interest_name)) { + // if interest 0 is already pending we return + return; + } + + sendInterest(interest_name); + state_->onSendNewInterest(interest_name); + return; + } + + TRANSPORT_LOGD("Pending interest number: %d -- current_sync_win_: %d", + state_->getPendingInterestNumber(), current_sync_win_); + + // skip nacked pacekts + if (next_segment_ <= state_->getLastSeqNacked()) { + next_segment_ = state_->getLastSeqNacked() + 1; + } + + // skipe received packets + if (next_segment_ <= state_->getHighestSeqReceivedInOrder()) { + next_segment_ = state_->getHighestSeqReceivedInOrder() + 1; + } + + uint32_t sent_interests = 0; + while ((state_->getPendingInterestNumber() < current_sync_win_) && + (sent_interests < MAX_INTERESTS_IN_BATCH)) { + TRANSPORT_LOGD("In while loop. Window size: %u", current_sync_win_); + Name *interest_name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, + &interest_name); + + interest_name->setSuffix(next_segment_); + + // send the packet only if: + // 1) it is not pending yet (not true for rtx) + // 2) the packet is not received or lost + // 3) is not in the rtx list + if (portal_->interestIsPending(*interest_name) || + state_->isReceivedOrLost(next_segment_) != PacketState::UNKNOWN || + ldr_->isRtx(next_segment_)) { + TRANSPORT_LOGD( + "skip interest %u because: pending %u, recv %u, rtx %u", + next_segment_, (portal_->interestIsPending(*interest_name)), + (state_->isReceivedOrLost(next_segment_) != PacketState::UNKNOWN), + (ldr_->isRtx(next_segment_))); + next_segment_ = (next_segment_ + 1) % MIN_PROBE_SEQ; + continue; + } + + + sent_interests++; + TRANSPORT_LOGD("send interest %u", next_segment_); + sendInterest(interest_name); + state_->onSendNewInterest(interest_name); + + next_segment_ = (next_segment_ + 1) % MIN_PROBE_SEQ; + } + + if (state_->getPendingInterestNumber() < current_sync_win_) { + // we still have space in the window but we already sent a batch of + // MAX_INTERESTS_IN_BATCH interest. for the following ones wait one + // WAIT_BETWEEN_INTEREST_BATCHES to avoid local packets drop + + scheduler_timer_on_ = true; + scheduler_timer_->expires_from_now( + std::chrono::microseconds(WAIT_BETWEEN_INTEREST_BATCHES)); + scheduler_timer_->async_wait([this](std::error_code ec) { + if (ec) return; + if (!scheduler_timer_on_) return; + + scheduler_timer_on_ = false; + scheduleNextInterests(); + }); + } +} + +void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) { + uint32_t segment_number = interest->getName().getSuffix(); + + TRANSPORT_LOGD("timeout for packet %u", segment_number); + + if (segment_number >= MIN_PROBE_SEQ) { + // this is a timeout on a probe, do nothing + return; + } + + timeouts_or_nacks_.insert(segment_number); + + if (TRANSPORT_EXPECT_TRUE(state_->isProducerActive()) && + segment_number <= state_->getHighestSeqReceivedInOrder()) { + // we retransmit packets only if the producer is active, otherwise we + // use timeouts to avoid to send too much traffic + // + // a timeout is sent using RTX only if it is an old packet. if it is for a + // seq number that we didn't reach yet, we send the packet using the normal + // schedule next interest + TRANSPORT_LOGD("handle timeout for packet %u using rtx", segment_number); + ldr_->onTimeout(segment_number); + state_->onTimeout(segment_number); + scheduleNextInterests(); + return; + } + + TRANSPORT_LOGD("handle timeout for packet %u using normal interests", + segment_number); + + if (segment_number < next_segment_) { + // this is a timeout for a packet that will be generated in the future but + // we are asking for higher sequence numbers. we need to go back like in the + // case of future nacks + TRANSPORT_LOGD("on timeout next seg = %u, jump to %u", + next_segment_, segment_number); + next_segment_ = segment_number; + } + + state_->onTimeout(segment_number); + scheduleNextInterests(); +} + +void RTCTransportProtocol::onNack(const ContentObject &content_object) { + struct nack_packet_t *nack = + (struct nack_packet_t *)content_object.getPayload()->data(); + uint32_t production_seg = nack->getProductionSegement(); + uint32_t nack_segment = content_object.getName().getSuffix(); + bool is_rtx = ldr_->isRtx(nack_segment); + + // check if the packet got a timeout + + TRANSPORT_LOGD("Nack received %u. Production segment: %u", nack_segment, + production_seg); + + bool compute_stats = true; + auto tn_it = timeouts_or_nacks_.find(nack_segment); + if (tn_it != timeouts_or_nacks_.end() || is_rtx) { + compute_stats = false; + // remove packets from timeouts_or_nacks only in case of a past nack + } + + state_->onNackPacketReceived(content_object, compute_stats); + ldr_->onNackPacketReceived(content_object); + + // both in case of past and future nack we set next_segment_ equal to the + // production segment in the nack. In case of past nack we will skip unneded + // interest (this is already done in the scheduleNextInterest in any case) + // while in case of future nacks we can go back in time and ask again for the + // content that generated the nack + TRANSPORT_LOGD("on nack next seg = %u, jump to %u", + next_segment_, production_seg); + next_segment_ = production_seg; + + if (production_seg > nack_segment) { + // remove the nack is it exists + if (tn_it != timeouts_or_nacks_.end()) timeouts_or_nacks_.erase(tn_it); + + // the client is asking for content in the past + // switch to catch up state and increase the window + // this is true only if the packet is not an RTX + if (!is_rtx) current_state_ = SyncState::catch_up; + + updateSyncWindow(); + } else { + // if production_seg == nack_segment we consider this a future nack, since + // production_seg is not yet created. this may happen in case of low + // production rate (e.g. ping at 1pps) + + // if a future nack was also retransmitted add it to the timeout_or_nacks + // set + if (is_rtx) timeouts_or_nacks_.insert(nack_segment); + + // the client is asking for content in the future + // switch to in sync state and decrease the window + current_state_ = SyncState::in_sync; + decreaseSyncWindow(); + } +} + +void RTCTransportProtocol::onProbe(const ContentObject &content_object) { + bool valid = state_->onProbePacketReceived(content_object); + if(!valid) return; + + struct nack_packet_t *probe = + (struct nack_packet_t *)content_object.getPayload()->data(); + uint32_t production_seg = probe->getProductionSegement(); + + // as for the nacks set next_segment_ + TRANSPORT_LOGD("on probe next seg = %u, jump to %u", + next_segment_, production_seg); + next_segment_ = production_seg; + + ldr_->onProbePacketReceived(content_object); + updateSyncWindow(); +} + +void RTCTransportProtocol::onContentObject(Interest &interest, + ContentObject &content_object) { + TRANSPORT_LOGD("Received content object of size: %zu", + content_object.payloadSize()); + uint32_t payload_size = content_object.payloadSize(); + uint32_t segment_number = content_object.getName().getSuffix(); + + if (segment_number >= MIN_PROBE_SEQ) { + TRANSPORT_LOGD("Received probe %u", segment_number); + if (*on_content_object_input_) { + (*on_content_object_input_)(*socket_->getInterface(), content_object); + } + onProbe(content_object); + return; + } + + if (payload_size == NACK_HEADER_SIZE) { + TRANSPORT_LOGD("Received nack %u", segment_number); + if (*on_content_object_input_) { + (*on_content_object_input_)(*socket_->getInterface(), content_object); + } + onNack(content_object); + return; + } + + TRANSPORT_LOGD("Received content %u", segment_number); + + rc_->onDataPacketReceived(content_object); + bool compute_stats = true; + auto tn_it = timeouts_or_nacks_.find(segment_number); + if (tn_it != timeouts_or_nacks_.end()) { + compute_stats = false; + timeouts_or_nacks_.erase(tn_it); + } + if (ldr_->isRtx(segment_number)) { + compute_stats = false; + } + + // check if the packet was already received + PacketState state = state_->isReceivedOrLost(segment_number); + state_->onDataPacketReceived(content_object, compute_stats); + ldr_->onDataPacketReceived(content_object); + + // if the stat for this seq number is received do not send the packet to app + if (state != PacketState::RECEIVED) { + if (*on_content_object_input_) { + (*on_content_object_input_)(*socket_->getInterface(), content_object); + } + reassemble(content_object); + } else { + TRANSPORT_LOGD("Received duplicated content %u, drop it", segment_number); + } + + updateSyncWindow(); +} + +void RTCTransportProtocol::sendStatsToApp( + uint32_t retx_count, uint32_t received_bytes, uint32_t sent_interests, + uint32_t lost_data, uint32_t recovered_losses, uint32_t received_nacks) { + if (*stats_summary_) { + // Send the stats to the app + stats_->updateQueuingDelay(state_->getQueuing()); + + // stats_->updateInterestFecTx(0); //todo must be implemented + // stats_->updateBytesFecRecv(0); //todo must be implemented + + stats_->updateRetxCount(retx_count); + stats_->updateBytesRecv(received_bytes); + stats_->updateInterestTx(sent_interests); + stats_->updateReceivedNacks(received_nacks); + + stats_->updateAverageWindowSize(current_sync_win_); + stats_->updateLossRatio(state_->getLossRate()); + stats_->updateAverageRtt(state_->getRTT()); + stats_->updateLostData(lost_data); + stats_->updateRecoveredData(recovered_losses); + stats_->updateCCState((unsigned int)current_state_ ? 1 : 0); + (*stats_summary_)(*socket_->getInterface(), *stats_); + } +} + +void RTCTransportProtocol::reassemble(ContentObject &content_object) { + auto read_buffer = content_object.getPayload(); + TRANSPORT_LOGD("Size of payload: %zu", read_buffer->length()); + read_buffer->trimStart(DATA_HEADER_SIZE); + Reassembly::read_buffer_ = std::move(read_buffer); + Reassembly::notifyApplication(); +} + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc.h b/libtransport/src/protocols/rtc/rtc.h new file mode 100644 index 000000000..596887067 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc.h @@ -0,0 +1,113 @@ +/* + * Copyright (c) 2017-2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <protocols/datagram_reassembly.h> +#include <protocols/rtc/rtc_ldr.h> +#include <protocols/rtc/rtc_rc.h> +#include <protocols/rtc/rtc_state.h> +#include <protocols/transport_protocol.h> + +#include <unordered_set> +#include <vector> + +namespace transport { + +namespace protocol { + +namespace rtc { + +class RTCTransportProtocol : public TransportProtocol, + public DatagramReassembly { + public: + RTCTransportProtocol(implementation::ConsumerSocket *icnet_socket); + + ~RTCTransportProtocol(); + + using TransportProtocol::start; + + using TransportProtocol::stop; + + void resume() override; + + private: + enum class SyncState { catch_up = 0, in_sync = 1, last }; + + private: + // setup functions + void initParams(); + void reset() override; + + void inactiveProducer(); + + // protocol functions + void discoveredRtt(); + void newRound(); + + // window functions + void computeMaxSyncWindow(); + void updateSyncWindow(); + void decreaseSyncWindow(); + + // packet functions + void sendInterest(Name *interest_name); + void sendRtxInterest(uint32_t seq); + void sendProbeInterest(uint32_t seq); + void scheduleNextInterests() override; + void onTimeout(Interest::Ptr &&interest) override; + void onNack(const ContentObject &content_object); + void onProbe(const ContentObject &content_object); + void reassemble(ContentObject &content_object) override; + void onContentObject(Interest &interest, + ContentObject &content_object) override; + void onPacketDropped(Interest &interest, + ContentObject &content_object) override {} + void onReassemblyFailed(std::uint32_t missing_segment) override {} + + // interaction with app functions + void sendStatsToApp(uint32_t retx_count, uint32_t received_bytes, + uint32_t sent_interests, uint32_t lost_data, + uint32_t recovered_losses, uint32_t received_nacks); + // protocol state + bool start_send_interest_; + SyncState current_state_; + // cwin vars + uint32_t current_sync_win_; + uint32_t max_sync_win_; + + // controller var + std::unique_ptr<asio::steady_timer> round_timer_; + std::unique_ptr<asio::steady_timer> scheduler_timer_; + bool scheduler_timer_on_; + + // timeouts + std::unordered_set<uint32_t> timeouts_or_nacks_; + + // names/packets var + uint32_t next_segment_; + + std::shared_ptr<RTCState> state_; + std::shared_ptr<RTCRateControl> rc_; + std::shared_ptr<RTCLossDetectionAndRecovery> ldr_; + + uint32_t number_; +}; + +} // namespace rtc + +} // namespace protocol + +} // namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_consts.h b/libtransport/src/protocols/rtc/rtc_consts.h new file mode 100644 index 000000000..0cf9516ab --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_consts.h @@ -0,0 +1,121 @@ +/* + * Copyright (c) 2017-2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <protocols/rtc/rtc_packet.h> +#include <stdint.h> + +namespace transport { + +namespace protocol { + +namespace rtc { + +// used in rtc +// protocol consts +const uint32_t ROUND_LEN = 200; +// ms interval of time on which +// we take decisions / measurements +const double INTEREST_LIFETIME_REDUCTION_FACTOR = 0.8; +// how big (in ms) should be the buffer at the producer. +// increasing this number we increase the time that an +// interest will wait for the data packet to be produced +// at the producer socket +const uint32_t PRODUCER_BUFFER_MS = 200; // ms + +// interest scheduler +const uint32_t MAX_INTERESTS_IN_BATCH = 5; +const uint32_t WAIT_BETWEEN_INTEREST_BATCHES = 1000; // usec + +// packet const +const uint32_t HICN_HEADER_SIZE = 40 + 20; // IPv6 + TCP bytes +const uint32_t RTC_INTEREST_LIFETIME = 1000; + +// probes sequence range +const uint32_t MIN_PROBE_SEQ = 0xefffffff; +const uint32_t MIN_RTT_PROBE_SEQ = MIN_PROBE_SEQ; +const uint32_t MAX_RTT_PROBE_SEQ = 0xffffffff - 1; +// RTT_PROBE_INTERVAL will be used during the section while +// INIT_RTT_PROBE_INTERVAL is used at the beginning to +// quickily estimate the RTT +const uint32_t RTT_PROBE_INTERVAL = 200000; // us +const uint32_t INIT_RTT_PROBE_INTERVAL = 500; // us +const uint32_t INIT_RTT_PROBES = 40; // number of probes to init RTT +// if the produdcer is not yet started we need to probe multple times +// to get an answer. we wait 100ms between each try +const uint32_t INIT_RTT_PROBE_RESTART = 100; // ms +// once we get the first probe we wait at most 60ms for the others +const uint32_t INIT_RTT_PROBE_WAIT = 30; // ms +// we reuires at least 5 probes to be recevied +const uint32_t INIT_RTT_MIN_PROBES_TO_RECV = 5; //ms +const uint32_t MAX_PENDING_PROBES = 10; + + +// congestion +const double MAX_QUEUING_DELAY = 100.0; // ms + +// data from cache +const double MAX_DATA_FROM_CACHE = 0.25; // 25% + +// window const +const uint32_t INITIAL_WIN = 5; // pkts +const uint32_t INITIAL_WIN_MAX = 1000000; // pkts +const uint32_t WIN_MIN = 5; // pkts +const double CATCH_UP_WIN_INCREMENT = 1.2; +// used in rate control +const double WIN_DECREASE_FACTOR = 0.5; +const double WIN_INCREASE_FACTOR = 1.5; + +// round in congestion +const double ROUNDS_BEFORE_TAKE_ACTION = 5; + +// used in state +const uint8_t ROUNDS_IN_SYNC_BEFORE_SWITCH = 3; +const double PRODUCTION_RATE_FRACTION = 0.8; + +const uint32_t INIT_PACKET_SIZE = 1200; + +const double MOVING_AVG_ALPHA = 0.8; + +const double MILLI_IN_A_SEC = 1000.0; +const double MICRO_IN_A_SEC = 1000000.0; + +const double MAX_CACHED_PACKETS = 262144; // 2^18 + // about 50 sec of traffic at 50Mbps + // with 1200 bytes packets + +const uint32_t MAX_ROUND_WHIOUT_PACKETS = + (20 * MILLI_IN_A_SEC) / ROUND_LEN; // 20 sec in rounds; + +// used in ldr +const uint32_t RTC_MAX_RTX = 100; +const uint32_t RTC_MAX_AGE = 60000; // in ms +const uint64_t MAX_TIMER_RTX = ~0; +const uint32_t SENTINEL_TIMER_INTERVAL = 100; // ms +const uint32_t MAX_RTX_WITH_SENTINEL = 10; // packets +const double CATCH_UP_RTT_INCREMENT = 1.2; + +// used by producer +const uint32_t PRODUCER_STATS_INTERVAL = 200; // ms +const uint32_t MIN_PRODUCTION_RATE = 10; // pps + // min prod rate + // set running several test + +} // namespace rtc + +} // namespace protocol + +} // namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_data_path.cc b/libtransport/src/protocols/rtc/rtc_data_path.cc new file mode 100644 index 000000000..c098088a3 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_data_path.cc @@ -0,0 +1,197 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <protocols/rtc/rtc_data_path.h> +#include <stdlib.h> + +#include <algorithm> +#include <cfloat> +#include <chrono> + +#define MAX_ROUNDS_WITHOUT_PKTS 10 // 2sec + +namespace transport { + +namespace protocol { + +namespace rtc { + +RTCDataPath::RTCDataPath(uint32_t path_id) + : path_id_(path_id), + min_rtt(UINT_MAX), + prev_min_rtt(UINT_MAX), + min_owd(INT_MAX), // this is computed like in LEDBAT, so it is not the + // real OWD, but the measured one, that depends on the + // clock of sender and receiver. the only meaningful + // value is is the queueing delay. for this reason we + // keep both RTT (for the windowd calculation) and OWD + // (for congestion/quality control) + prev_min_owd(INT_MAX), + avg_owd(DBL_MAX), + queuing_delay(DBL_MAX), + jitter_(0.0), + last_owd_(0), + largest_recv_seq_(0), + largest_recv_seq_time_(0), + avg_inter_arrival_(DBL_MAX), + received_nacks_(false), + received_packets_(false), + rounds_without_packets_(0), + last_received_data_packet_(0), + RTT_history_(HISTORY_LEN), + OWD_history_(HISTORY_LEN){}; + +void RTCDataPath::insertRttSample(uint64_t rtt) { + // for the rtt we only keep track of the min one + if (rtt < min_rtt) min_rtt = rtt; + last_received_data_packet_ = + std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); +} + +void RTCDataPath::insertOwdSample(int64_t owd) { + // for owd we use both min and avg + if (owd < min_owd) min_owd = owd; + + if (avg_owd != DBL_MAX) + avg_owd = (avg_owd * (1 - ALPHA_RTC)) + (owd * ALPHA_RTC); + else { + avg_owd = owd; + } + + int64_t queueVal = owd - std::min(getMinOwd(), min_owd); + + if (queuing_delay != DBL_MAX) + queuing_delay = (queuing_delay * (1 - ALPHA_RTC)) + (queueVal * ALPHA_RTC); + else { + queuing_delay = queueVal; + } + + // keep track of the jitter computed as for RTP (RFC 3550) + int64_t diff = std::abs(owd - last_owd_); + last_owd_ = owd; + jitter_ += (1.0 / 16.0) * ((double)diff - jitter_); + + // owd is computed only for valid data packets so we count only + // this for decide if we recevie traffic or not + received_packets_ = true; +} + +void RTCDataPath::computeInterArrivalGap(uint32_t segment_number) { + // got packet in sequence, compute gap + if (largest_recv_seq_ == (segment_number - 1)) { + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + uint64_t delta = now - largest_recv_seq_time_; + largest_recv_seq_ = segment_number; + largest_recv_seq_time_ = now; + if (avg_inter_arrival_ == DBL_MAX) + avg_inter_arrival_ = delta; + else + avg_inter_arrival_ = + (avg_inter_arrival_ * (1 - ALPHA_RTC)) + (delta * ALPHA_RTC); + return; + } + + // ooo packet, update the stasts if needed + if (largest_recv_seq_ <= segment_number) { + largest_recv_seq_ = segment_number; + largest_recv_seq_time_ = + std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + } +} + +void RTCDataPath::receivedNack() { received_nacks_ = true; } + +double RTCDataPath::getInterArrivalGap() { + if (avg_inter_arrival_ == DBL_MAX) return 0; + return avg_inter_arrival_; +} + +bool RTCDataPath::isActive() { + if (received_nacks_ && rounds_without_packets_ < MAX_ROUNDS_WITHOUT_PKTS) + return true; + return false; +} + +bool RTCDataPath::pathToProducer() { + if (received_nacks_) return true; + return false; +} + +void RTCDataPath::roundEnd() { + // reset min_rtt and add it to the history + if (min_rtt != UINT_MAX) { + prev_min_rtt = min_rtt; + } else { + // this may happen if we do not receive any packet + // from this path in the last round. in this case + // we use the measure from the previuos round + min_rtt = prev_min_rtt; + } + + if (min_rtt == 0) min_rtt = 1; + + RTT_history_.pushBack(min_rtt); + min_rtt = UINT_MAX; + + // do the same for min owd + if (min_owd != INT_MAX) { + prev_min_owd = min_owd; + } else { + min_owd = prev_min_owd; + } + + if (min_owd != INT_MAX) { + OWD_history_.pushBack(min_owd); + min_owd = INT_MAX; + } + + if (!received_packets_) + rounds_without_packets_++; + else + rounds_without_packets_ = 0; + received_packets_ = false; +} + +uint32_t RTCDataPath::getPathId() { return path_id_; } + +double RTCDataPath::getQueuingDealy() { return queuing_delay; } + +uint64_t RTCDataPath::getMinRtt() { + if (RTT_history_.size() != 0) return RTT_history_.begin(); + return 0; +} + +int64_t RTCDataPath::getMinOwd() { + if (OWD_history_.size() != 0) return OWD_history_.begin(); + return 0; +} + +double RTCDataPath::getJitter() { return jitter_; } + +uint64_t RTCDataPath::getLastPacketTS() { return last_received_data_packet_; } + +void RTCDataPath::clearRtt() { RTT_history_.clear(); } + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_data_path.h b/libtransport/src/protocols/rtc/rtc_data_path.h new file mode 100644 index 000000000..c5c37fc0d --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_data_path.h @@ -0,0 +1,97 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <stdint.h> +#include <utils/min_filter.h> + +#include <climits> + +namespace transport { + +namespace protocol { + +namespace rtc { + +const double ALPHA_RTC = 0.125; +const uint32_t HISTORY_LEN = 20; // 4 sec + +class RTCDataPath { + public: + RTCDataPath(uint32_t path_id); + + public: + void insertRttSample(uint64_t rtt); + void insertOwdSample(int64_t owd); + void computeInterArrivalGap(uint32_t segment_number); + void receivedNack(); + + uint32_t getPathId(); + uint64_t getMinRtt(); + double getQueuingDealy(); + double getInterArrivalGap(); + double getJitter(); + bool isActive(); + bool pathToProducer(); + uint64_t getLastPacketTS(); + + void clearRtt(); + + void roundEnd(); + + private: + uint32_t path_id_; + + int64_t getMinOwd(); + + uint64_t min_rtt; + uint64_t prev_min_rtt; + + int64_t min_owd; + int64_t prev_min_owd; + + double avg_owd; + + double queuing_delay; + + double jitter_; + int64_t last_owd_; + + uint32_t largest_recv_seq_; + uint64_t largest_recv_seq_time_; + double avg_inter_arrival_; + + // flags to check if a path is active + // we considere a path active if it reaches a producer + //(not a cache) --aka we got at least one nack on this path-- + // and if we receives packets + bool received_nacks_; + bool received_packets_; + uint8_t rounds_without_packets_; // if we don't get any packet + // for MAX_ROUNDS_WITHOUT_PKTS + // we consider the path inactive + uint64_t last_received_data_packet_; // timestamp for the last data received + // on this path + + utils::MinFilter<uint64_t> RTT_history_; + utils::MinFilter<int64_t> OWD_history_; +}; + +} // namespace rtc + +} // namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_ldr.cc b/libtransport/src/protocols/rtc/rtc_ldr.cc new file mode 100644 index 000000000..e91b29c04 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_ldr.cc @@ -0,0 +1,427 @@ +/* + * Copyright (c) 2017-2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <protocols/rtc/rtc_consts.h> +#include <protocols/rtc/rtc_ldr.h> + +#include <algorithm> +#include <unordered_set> + +namespace transport { + +namespace protocol { + +namespace rtc { + +RTCLossDetectionAndRecovery::RTCLossDetectionAndRecovery( + SendRtxCallback &&callback, asio::io_service &io_service) + : rtx_on_(false), + next_rtx_timer_(MAX_TIMER_RTX), + last_event_(0), + sentinel_timer_interval_(MAX_TIMER_RTX), + send_rtx_callback_(std::move(callback)) { + timer_ = std::make_unique<asio::steady_timer>(io_service); + sentinel_timer_ = std::make_unique<asio::steady_timer>(io_service); +} + +RTCLossDetectionAndRecovery::~RTCLossDetectionAndRecovery() {} + +void RTCLossDetectionAndRecovery::turnOnRTX() { + rtx_on_ = true; + scheduleSentinelTimer(state_->getRTT() * CATCH_UP_RTT_INCREMENT); +} + +void RTCLossDetectionAndRecovery::turnOffRTX() { + rtx_on_ = false; + clear(); +} + +void RTCLossDetectionAndRecovery::onTimeout(uint32_t seq) { + // always add timeouts to the RTX list to avoid to send the same packet as if + // it was not a rtx + addToRetransmissions(seq, seq + 1); + last_event_ = getNow(); +} + +void RTCLossDetectionAndRecovery::onDataPacketReceived( + const core::ContentObject &content_object) { + last_event_ = getNow(); + + uint32_t seq = content_object.getName().getSuffix(); + if (deleteRtx(seq)) { + state_->onPacketRecovered(seq); + } else { + if (TRANSPORT_EXPECT_FALSE(!rtx_on_)) return; // do not add if RTX is off + TRANSPORT_LOGD("received data. add from %u to %u ", + state_->getHighestSeqReceivedInOrder() + 1, seq); + addToRetransmissions(state_->getHighestSeqReceivedInOrder() + 1, seq); + } +} + +void RTCLossDetectionAndRecovery::onNackPacketReceived( + const core::ContentObject &nack) { + last_event_ = getNow(); + + uint32_t seq = nack.getName().getSuffix(); + + if (TRANSPORT_EXPECT_FALSE(!rtx_on_)) return; // do not add if RTX is off + + struct nack_packet_t *nack_pkt = + (struct nack_packet_t *)nack.getPayload()->data(); + uint32_t production_seq = nack_pkt->getProductionSegement(); + + if (production_seq > seq) { + // this is a past nack, all data before productionSeq are lost. if + // productionSeq > state_->getHighestSeqReceivedInOrder() is impossible to + // recover any packet. If this is not the case we can try to recover the + // packets between state_->getHighestSeqReceivedInOrder() and productionSeq. + // e.g.: the client receives packets 8 10 11 9 where 9 is a nack with + // productionSeq = 14. 9 is lost but we can try to recover packets 12 13 and + // 14 that are not arrived yet + deleteRtx(seq); + TRANSPORT_LOGD("received past nack. add from %u to %u ", + state_->getHighestSeqReceivedInOrder() + 1, production_seq); + addToRetransmissions(state_->getHighestSeqReceivedInOrder() + 1, + production_seq); + } else { + // future nack. here there should be a gap between the last data received + // and this packet and is it possible to recover the packets between the + // last received data and the production seq. we should not use the seq + // number of the nack since we know that is too early to ask for this seq + // number + // e.g.: // e.g.: the client receives packets 10 11 12 20 where 20 is a nack + // with productionSeq = 18. this says that all the packets between 12 and 18 + // may got lost and we should ask them + deleteRtx(seq); + TRANSPORT_LOGD("received futrue nack. add from %u to %u ", + state_->getHighestSeqReceivedInOrder() + 1, production_seq); + addToRetransmissions(state_->getHighestSeqReceivedInOrder() + 1, + production_seq); + } +} + +void RTCLossDetectionAndRecovery::onProbePacketReceived( + const core::ContentObject &probe) { + // we don't log the reception of a probe packet for the sentinel timer because + // probes are not taken into account into the sync window. we use them as + // future nacks to detect possible packets lost + if (TRANSPORT_EXPECT_FALSE(!rtx_on_)) return; // do not add if RTX is off + struct nack_packet_t *probe_pkt = + (struct nack_packet_t *)probe.getPayload()->data(); + uint32_t production_seq = probe_pkt->getProductionSegement(); + TRANSPORT_LOGD("received probe. add from %u to %u ", + state_->getHighestSeqReceivedInOrder() + 1, production_seq); + addToRetransmissions(state_->getHighestSeqReceivedInOrder() + 1, + production_seq); +} + +void RTCLossDetectionAndRecovery::clear() { + rtx_state_.clear(); + rtx_timers_.clear(); + sentinel_timer_->cancel(); + if (next_rtx_timer_ != MAX_TIMER_RTX) { + next_rtx_timer_ = MAX_TIMER_RTX; + timer_->cancel(); + } +} + +void RTCLossDetectionAndRecovery::addToRetransmissions(uint32_t start, + uint32_t stop) { + // skip nacked packets + if (start <= state_->getLastSeqNacked()) { + start = state_->getLastSeqNacked() + 1; + } + + // skip received or lost packets + if (start <= state_->getHighestSeqReceivedInOrder()) { + start = state_->getHighestSeqReceivedInOrder() + 1; + } + + for (uint32_t seq = start; seq < stop; seq++) { + if (!isRtx(seq) && // is not already an rtx + // is not received or lost + state_->isReceivedOrLost(seq) == PacketState::UNKNOWN) { + // add rtx + rtxState state; + state.first_send_ = state_->getInterestSentTime(seq); + if (state.first_send_ == 0) // this interest was never sent before + state.first_send_ = getNow(); + state.next_send_ = computeNextSend(seq, true); + state.rtx_count_ = 0; + TRANSPORT_LOGD("add %u to retransmissions. next rtx is %lu ", seq, + (state.next_send_ - getNow())); + rtx_state_.insert(std::pair<uint32_t, rtxState>(seq, state)); + rtx_timers_.insert(std::pair<uint64_t, uint32_t>(state.next_send_, seq)); + } + } + scheduleNextRtx(); +} + +uint64_t RTCLossDetectionAndRecovery::computeNextSend(uint32_t seq, + bool new_rtx) { + uint64_t now = getNow(); + if (new_rtx) { + // for the new rtx we wait one estimated IAT after the loss detection. this + // is bacause, assuming that packets arrive with a constant IAT, we should + // get a new packet every IAT + double prod_rate = state_->getProducerRate(); + uint32_t estimated_iat = SENTINEL_TIMER_INTERVAL; + uint32_t jitter = 0; + + if (prod_rate != 0) { + double packet_size = state_->getAveragePacketSize(); + estimated_iat = ceil(1000.0 / (prod_rate / packet_size)); + jitter = ceil(state_->getJitter()); + } + + uint32_t wait = estimated_iat + jitter; + TRANSPORT_LOGD("first rtx for %u in %u ms, rtt = %lu ait = %u jttr = %u", + seq, wait, state_->getRTT(), estimated_iat, jitter); + + return now + wait; + } else { + // wait one RTT + // however if the IAT is larger than the RTT, wait one IAT + uint32_t wait = SENTINEL_TIMER_INTERVAL; + + double prod_rate = state_->getProducerRate(); + if (prod_rate == 0) { + return now + SENTINEL_TIMER_INTERVAL; + } + + double packet_size = state_->getAveragePacketSize(); + uint32_t estimated_iat = ceil(1000.0 / (prod_rate / packet_size)); + + uint64_t rtt = state_->getRTT(); + if (rtt == 0) rtt = SENTINEL_TIMER_INTERVAL; + wait = rtt; + + if (estimated_iat > rtt) wait = estimated_iat; + + uint32_t jitter = ceil(state_->getJitter()); + wait += jitter; + + // it may happen that the channel is congested and we have some additional + // queuing delay to take into account + uint32_t queue = ceil(state_->getQueuing()); + wait += queue; + + TRANSPORT_LOGD( + "next rtx for %u in %u ms, rtt = %lu ait = %u jttr = %u queue = %u", + seq, wait, state_->getRTT(), estimated_iat, jitter, queue); + + return now + wait; + } +} + +void RTCLossDetectionAndRecovery::retransmit() { + if (rtx_timers_.size() == 0) return; + + uint64_t now = getNow(); + + auto it = rtx_timers_.begin(); + std::unordered_set<uint32_t> lost_pkt; + uint32_t sent_counter = 0; + while (it != rtx_timers_.end() && it->first <= now && + sent_counter < MAX_INTERESTS_IN_BATCH) { + uint32_t seq = it->second; + auto rtx_it = + rtx_state_.find(seq); // this should always return a valid iter + if (rtx_it->second.rtx_count_ >= RTC_MAX_RTX || + (now - rtx_it->second.first_send_) >= RTC_MAX_AGE || + seq < state_->getLastSeqNacked()) { + // max rtx reached or packet too old or packet nacked, this packet is lost + TRANSPORT_LOGD( + "packet %u lost because 1) max rtx: %u 2) max age: %u 3) naked: %u", + seq, (rtx_it->second.rtx_count_ >= RTC_MAX_RTX), + ((now - rtx_it->second.first_send_) >= RTC_MAX_AGE), + (seq < state_->getLastSeqNacked())); + lost_pkt.insert(seq); + it++; + } else { + // resend the packet + state_->onRetransmission(seq); + double prod_rate = state_->getProducerRate(); + if (prod_rate != 0) rtx_it->second.rtx_count_++; + rtx_it->second.next_send_ = computeNextSend(seq, false); + it = rtx_timers_.erase(it); + rtx_timers_.insert( + std::pair<uint64_t, uint32_t>(rtx_it->second.next_send_, seq)); + TRANSPORT_LOGD("send rtx for sequence %u, next send in %lu", seq, + (rtx_it->second.next_send_ - now)); + send_rtx_callback_(seq); + sent_counter++; + } + } + + // remove packets if needed + for (auto lost_it = lost_pkt.begin(); lost_it != lost_pkt.end(); lost_it++) { + uint32_t seq = *lost_it; + state_->onPacketLost(seq); + deleteRtx(seq); + } +} + +void RTCLossDetectionAndRecovery::scheduleNextRtx() { + if (rtx_timers_.size() == 0) { + // all the rtx were removed, reset timer + next_rtx_timer_ = MAX_TIMER_RTX; + return; + } + + // check if timer is alreay set + if (next_rtx_timer_ != MAX_TIMER_RTX) { + // a new check for rtx is already scheduled + if (next_rtx_timer_ > rtx_timers_.begin()->first) { + // we need to re-schedule it + timer_->cancel(); + } else { + // wait for the next timer + return; + } + } + + // set a new timer + next_rtx_timer_ = rtx_timers_.begin()->first; + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + uint64_t wait = 1; + if (next_rtx_timer_ != MAX_TIMER_RTX && next_rtx_timer_ > now) + wait = next_rtx_timer_ - now; + + std::weak_ptr<RTCLossDetectionAndRecovery> self(shared_from_this()); + timer_->expires_from_now(std::chrono::milliseconds(wait)); + timer_->async_wait([self](std::error_code ec) { + if (ec) return; + if (auto s = self.lock()) { + s->retransmit(); + s->next_rtx_timer_ = MAX_TIMER_RTX; + s->scheduleNextRtx(); + } + }); +} + +bool RTCLossDetectionAndRecovery::deleteRtx(uint32_t seq) { + auto it_rtx = rtx_state_.find(seq); + if (it_rtx == rtx_state_.end()) return false; // rtx not found + + uint64_t ts = it_rtx->second.next_send_; + auto it_timers = rtx_timers_.find(ts); + while (it_timers != rtx_timers_.end() && it_timers->first == ts) { + if (it_timers->second == seq) { + rtx_timers_.erase(it_timers); + break; + } + it_timers++; + } + + bool lost = it_rtx->second.rtx_count_ > 0; + rtx_state_.erase(it_rtx); + + return lost; +} + +void RTCLossDetectionAndRecovery::scheduleSentinelTimer( + uint64_t expires_from_now) { + std::weak_ptr<RTCLossDetectionAndRecovery> self(shared_from_this()); + sentinel_timer_->expires_from_now( + std::chrono::milliseconds(expires_from_now)); + sentinel_timer_->async_wait([self](std::error_code ec) { + if (ec) return; + if (auto s = self.lock()) { + s->sentinelTimer(); + } + }); +} + +void RTCLossDetectionAndRecovery::sentinelTimer() { + uint64_t now = getNow(); + + bool expired = false; + bool sent = false; + if ((now - last_event_) >= sentinel_timer_interval_) { + // at least a sentinel_timer_interval_ elapsed since last event + expired = true; + if (TRANSPORT_EXPECT_FALSE(!state_->isProducerActive())) { + // this happens at the beginning (or if the producer stops for some + // reason) we need to keep sending interest 0 until we get an answer + TRANSPORT_LOGD( + "sentinel timer: the producer is not active, send packet 0"); + state_->onRetransmission(0); + send_rtx_callback_(0); + } else { + TRANSPORT_LOGD( + "sentinel timer: the producer is active, send the 10 oldest packets"); + sent = true; + uint32_t rtx = 0; + auto it = state_->getPendingInterestsMapBegin(); + auto end = state_->getPendingInterestsMapEnd(); + while (it != end && rtx < MAX_RTX_WITH_SENTINEL) { + uint32_t seq = it->first; + TRANSPORT_LOGD("sentinel timer, add %u to the rtx list", seq); + addToRetransmissions(seq, seq + 1); + rtx++; + it++; + } + } + } else { + // sentinel timer did not expire because we registered at least one event + } + + uint32_t next_timer; + double prod_rate = state_->getProducerRate(); + if (TRANSPORT_EXPECT_FALSE(!state_->isProducerActive()) || prod_rate == 0) { + TRANSPORT_LOGD("next timer in %u", SENTINEL_TIMER_INTERVAL); + next_timer = SENTINEL_TIMER_INTERVAL; + } else { + double prod_rate = state_->getProducerRate(); + double packet_size = state_->getAveragePacketSize(); + uint32_t estimated_iat = ceil(1000.0 / (prod_rate / packet_size)); + uint32_t jitter = ceil(state_->getJitter()); + + // try to reduce the number of timers if the estimated IAT is too small + next_timer = std::max((estimated_iat + jitter) * 20, (uint32_t)1); + TRANSPORT_LOGD("next sentinel in %u ms, rate: %f, iat: %u, jitter: %u", + next_timer, ((prod_rate * 8.0) / 1000000.0), estimated_iat, + jitter); + + if (!expired) { + // discount the amout of time that is already passed + uint32_t discount = now - last_event_; + if (next_timer > discount) { + next_timer = next_timer - discount; + } else { + // in this case we trigger the timer in 1 ms + next_timer = 1; + } + TRANSPORT_LOGD("timer after discout: %u", next_timer); + } else if (sent) { + // wait at least one producer stats interval + owd to check if the + // production rate is reducing. + uint32_t min_wait = PRODUCER_STATS_INTERVAL + ceil(state_->getQueuing()); + next_timer = std::max(next_timer, min_wait); + TRANSPORT_LOGD("wait for updates from prod, next timer: %u", next_timer); + } + } + + scheduleSentinelTimer(next_timer); +} + +} // namespace rtc + +} // namespace protocol + +} // namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_ldr.h b/libtransport/src/protocols/rtc/rtc_ldr.h new file mode 100644 index 000000000..c0912303b --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_ldr.h @@ -0,0 +1,108 @@ +/* + * Copyright (c) 2017-2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include <hicn/transport/config.h> +#include <hicn/transport/core/content_object.h> +#include <hicn/transport/core/name.h> +#include <protocols/rtc/rtc_consts.h> +#include <protocols/rtc/rtc_state.h> + +#include <asio.hpp> +#include <asio/steady_timer.hpp> +#include <functional> +#include <map> + +namespace transport { + +namespace protocol { + +namespace rtc { + +class RTCLossDetectionAndRecovery + : public std::enable_shared_from_this<RTCLossDetectionAndRecovery> { + struct rtx_state_ { + uint64_t first_send_; + uint64_t next_send_; + uint32_t rtx_count_; + }; + + using rtxState = struct rtx_state_; + using SendRtxCallback = std::function<void(uint32_t)>; + + public: + RTCLossDetectionAndRecovery(SendRtxCallback &&callback, + asio::io_service &io_service); + + ~RTCLossDetectionAndRecovery(); + + void setState(std::shared_ptr<RTCState> state) { state_ = state; } + void turnOnRTX(); + void turnOffRTX(); + + void onTimeout(uint32_t seq); + void onDataPacketReceived(const core::ContentObject &content_object); + void onNackPacketReceived(const core::ContentObject &nack); + void onProbePacketReceived(const core::ContentObject &probe); + + void clear(); + + bool isRtx(uint32_t seq) { + if (rtx_state_.find(seq) != rtx_state_.end()) return true; + return false; + } + + private: + void addToRetransmissions(uint32_t start, uint32_t stop); + uint64_t computeNextSend(uint32_t seq, bool new_rtx); + void retransmit(); + void scheduleNextRtx(); + bool deleteRtx(uint32_t seq); + void scheduleSentinelTimer(uint64_t expires_from_now); + void sentinelTimer(); + + uint64_t getNow() { + using namespace std::chrono; + uint64_t now = + duration_cast<milliseconds>(steady_clock::now().time_since_epoch()) + .count(); + return now; + } + + // this map keeps track of the retransmitted interest, ordered from the oldest + // to the newest one. the state contains the timer of the first send of the + // interest (from pendingIntetests_), the timer of the next send (key of the + // multimap) and the number of rtx + std::map<uint32_t, rtxState> rtx_state_; + // this map stored the rtx by timer. The key is the time at which the rtx + // should be sent, and the val is the interest seq number + std::multimap<uint64_t, uint32_t> rtx_timers_; + + bool rtx_on_; + uint64_t next_rtx_timer_; + uint64_t last_event_; + uint64_t sentinel_timer_interval_; + std::unique_ptr<asio::steady_timer> timer_; + std::unique_ptr<asio::steady_timer> sentinel_timer_; + std::shared_ptr<RTCState> state_; + + SendRtxCallback send_rtx_callback_; +}; + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_packet.h b/libtransport/src/protocols/rtc/rtc_packet.h new file mode 100644 index 000000000..abb1323a3 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_packet.h @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + */ + +/* data packet + * +-----------------------------------------+ + * | uint64_t: timestamp | + * | | + * +-----------------------------------------+ + * | uint32_t: prod rate (bytes per sec) | + * +-----------------------------------------+ + * | payload | + * | ... | + */ + +/* nack packet + * +-----------------------------------------+ + * | uint64_t: timestamp | + * | | + * +-----------------------------------------+ + * | uint32_t: prod rate (bytes per sec) | + * +-----------------------------------------+ + * | uint32_t: current seg in production | + * +-----------------------------------------+ + */ + +#pragma once +#include <arpa/inet.h> + +namespace transport { + +namespace protocol { + +namespace rtc { + +inline uint64_t _ntohll(const uint64_t *input) { + uint64_t return_val; + uint8_t *tmp = (uint8_t *)&return_val; + + tmp[0] = *input >> 56; + tmp[1] = *input >> 48; + tmp[2] = *input >> 40; + tmp[3] = *input >> 32; + tmp[4] = *input >> 24; + tmp[5] = *input >> 16; + tmp[6] = *input >> 8; + tmp[7] = *input >> 0; + + return return_val; +} + +inline uint64_t _htonll(const uint64_t *input) { return (_ntohll(input)); } + +const uint32_t DATA_HEADER_SIZE = 12; // bytes + // XXX: sizeof(data_packet_t) is 16 + // beacuse of padding +const uint32_t NACK_HEADER_SIZE = 16; + +struct data_packet_t { + uint64_t timestamp; + uint32_t prod_rate; + + inline uint64_t getTimestamp() const { return _ntohll(×tamp); } + inline void setTimestamp(uint64_t time) { timestamp = _htonll(&time); } + + inline uint32_t getProductionRate() const { return ntohl(prod_rate); } + inline void setProductionRate(uint32_t rate) { prod_rate = htonl(rate); } +}; + +struct nack_packet_t { + uint64_t timestamp; + uint32_t prod_rate; + uint32_t prod_seg; + + inline uint64_t getTimestamp() const { return _ntohll(×tamp); } + inline void setTimestamp(uint64_t time) { timestamp = _htonll(&time); } + + inline uint32_t getProductionRate() const { return ntohl(prod_rate); } + inline void setProductionRate(uint32_t rate) { prod_rate = htonl(rate); } + + inline uint32_t getProductionSegement() const { return ntohl(prod_seg); } + inline void setProductionSegement(uint32_t seg) { prod_seg = htonl(seg); } +}; + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_rc.h b/libtransport/src/protocols/rtc/rtc_rc.h new file mode 100644 index 000000000..34d090092 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_rc.h @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2017-2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include <protocols/rtc/rtc_state.h> + +namespace transport { + +namespace protocol { + +namespace rtc { + +class RTCRateControl : public std::enable_shared_from_this<RTCRateControl> { + public: + RTCRateControl() + : rc_on_(false), + congestion_win_(1000000), // init the win to a large number + congestion_state_(CongestionState::Normal), + protocol_state_(nullptr) {} + + virtual ~RTCRateControl() = default; + + void turnOnRateControl() { rc_on_ = true; } + void setState(std::shared_ptr<RTCState> state) { protocol_state_ = state; }; + uint32_t getCongesionWindow() { return congestion_win_; }; + + virtual void onNewRound(double round_len) = 0; + virtual void onDataPacketReceived( + const core::ContentObject &content_object) = 0; + + protected: + enum class CongestionState { Normal = 0, Underuse = 1, Congested = 2, Last }; + + protected: + bool rc_on_; + uint32_t congestion_win_; + CongestionState congestion_state_; + + std::shared_ptr<RTCState> protocol_state_; +}; + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_rc_frame.cc b/libtransport/src/protocols/rtc/rtc_rc_frame.cc new file mode 100644 index 000000000..b577b5bea --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_rc_frame.cc @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2017-2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <protocols/rtc/rtc_consts.h> +#include <protocols/rtc/rtc_rc_frame.h> + +#include <algorithm> + +namespace transport { + +namespace protocol { + +namespace rtc { + +RTCRateControlFrame::RTCRateControlFrame() : cc_detector_() {} + +RTCRateControlFrame::~RTCRateControlFrame() {} + +void RTCRateControlFrame::onNewRound(double round_len) { + if (!rc_on_) return; + + CongestionState prev_congestion_state = congestion_state_; + cc_detector_.updateStats(); + congestion_state_ = (CongestionState)cc_detector_.getState(); + + if (congestion_state_ == CongestionState::Congested) { + if (prev_congestion_state == CongestionState::Normal) { + // congestion detected, notify app and init congestion win + double prod_rate = protocol_state_->getReceivedRate(); + double rtt = (double)protocol_state_->getRTT() / MILLI_IN_A_SEC; + double packet_size = protocol_state_->getAveragePacketSize(); + + if (prod_rate == 0.0 || rtt == 0.0 || packet_size == 0.0) { + // TODO do something + return; + } + + congestion_win_ = (uint32_t)ceil(prod_rate * rtt / packet_size); + } + uint32_t win = congestion_win_ * WIN_DECREASE_FACTOR; + congestion_win_ = std::max(win, WIN_MIN); + return; + } +} + +void RTCRateControlFrame::onDataPacketReceived( + const core::ContentObject &content_object) { + if (!rc_on_) return; + + uint32_t seq = content_object.getName().getSuffix(); + if (!protocol_state_->isPending(seq)) return; + + cc_detector_.addPacket(content_object); +} + +void RTCRateControlFrame::receivedBwProbeTrain(uint64_t firts_probe_ts, + uint64_t last_probe_ts, + uint32_t total_probes) { + // TODO + return; +} + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_rc_frame.h b/libtransport/src/protocols/rtc/rtc_rc_frame.h new file mode 100644 index 000000000..25d5ddbb6 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_rc_frame.h @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2017-2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include <protocols/rtc/congestion_detection.h> +#include <protocols/rtc/rtc_rc.h> + +namespace transport { + +namespace protocol { + +namespace rtc { + +class RTCRateControlFrame : public RTCRateControl { + public: + RTCRateControlFrame(); + + ~RTCRateControlFrame(); + + void onNewRound(double round_len); + void onDataPacketReceived(const core::ContentObject &content_object); + + void receivedBwProbeTrain(uint64_t firts_probe_ts, uint64_t last_probe_ts, + uint32_t total_probes); + + private: + CongestionDetection cc_detector_; +}; + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_rc_queue.cc b/libtransport/src/protocols/rtc/rtc_rc_queue.cc new file mode 100644 index 000000000..a1c89e329 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_rc_queue.cc @@ -0,0 +1,106 @@ +/* + * Copyright (c) 2017-2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <protocols/rtc/rtc_consts.h> +#include <protocols/rtc/rtc_rc_queue.h> + +#include <algorithm> + +namespace transport { + +namespace protocol { + +namespace rtc { + +RTCRateControlQueue::RTCRateControlQueue() + : rounds_since_last_drop_(0), + rounds_without_congestion_(0), + last_queue_(0) {} + +RTCRateControlQueue::~RTCRateControlQueue() {} + +void RTCRateControlQueue::onNewRound(double round_len) { + if (!rc_on_) return; + + double received_rate = protocol_state_->getReceivedRate(); + double target_rate = + protocol_state_->getProducerRate() * PRODUCTION_RATE_FRACTION; + double rtt = (double)protocol_state_->getRTT() / MILLI_IN_A_SEC; + double packet_size = protocol_state_->getAveragePacketSize(); + double queue = protocol_state_->getQueuing(); + + if (rtt == 0.0) return; // no info from the producer + + CongestionState prev_congestion_state = congestion_state_; + + if (prev_congestion_state == CongestionState::Normal && + received_rate >= target_rate) { + // if the queue is high in this case we are most likelly fighting with + // a TCP flow and there is enough bandwidth to match the producer rate + congestion_state_ = CongestionState::Normal; + } else if (queue > MAX_QUEUING_DELAY || last_queue_ == queue) { + // here we detect congestion. in the case that last_queue == queue + // the consumer didn't receive any packet from the producer so we + // consider this case as congestion + // TODO: wath happen in case of high loss rate? + congestion_state_ = CongestionState::Congested; + } else { + // nothing bad is happening + congestion_state_ = CongestionState::Normal; + } + + last_queue_ = queue; + + if (congestion_state_ == CongestionState::Congested) { + if (prev_congestion_state == CongestionState::Normal) { + // init the congetion window using the received rate + congestion_win_ = (uint32_t)ceil(received_rate * rtt / packet_size); + rounds_since_last_drop_ = ROUNDS_BEFORE_TAKE_ACTION + 1; + } + + if (rounds_since_last_drop_ >= ROUNDS_BEFORE_TAKE_ACTION) { + uint32_t win = congestion_win_ * WIN_DECREASE_FACTOR; + congestion_win_ = std::max(win, WIN_MIN); + rounds_since_last_drop_ = 0; + return; + } + + rounds_since_last_drop_++; + } + + if (congestion_state_ == CongestionState::Normal) { + if (prev_congestion_state == CongestionState::Congested) { + rounds_without_congestion_ = 0; + } + + rounds_without_congestion_++; + if (rounds_without_congestion_ < ROUNDS_BEFORE_TAKE_ACTION) return; + + congestion_win_ = congestion_win_ * WIN_INCREASE_FACTOR; + congestion_win_ = std::min(congestion_win_, INITIAL_WIN_MAX); + } +} + +void RTCRateControlQueue::onDataPacketReceived( + const core::ContentObject &content_object) { + // nothing to do + return; +} + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_rc_queue.h b/libtransport/src/protocols/rtc/rtc_rc_queue.h new file mode 100644 index 000000000..407354d43 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_rc_queue.h @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2017-2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include <hicn/transport/utils/shared_ptr_utils.h> +#include <protocols/rtc/rtc_rc.h> + +namespace transport { + +namespace protocol { + +namespace rtc { + +class RTCRateControlQueue : public RTCRateControl { + public: + RTCRateControlQueue(); + + ~RTCRateControlQueue(); + + void onNewRound(double round_len); + void onDataPacketReceived(const core::ContentObject &content_object); + + auto shared_from_this() { return utils::shared_from(this); } + + private: + uint32_t rounds_since_last_drop_; + uint32_t rounds_without_congestion_; + double last_queue_; +}; + +} // end namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_state.cc b/libtransport/src/protocols/rtc/rtc_state.cc new file mode 100644 index 000000000..eabf8942c --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_state.cc @@ -0,0 +1,560 @@ +/* + * Copyright (c) 2017-2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <protocols/rtc/rtc_consts.h> +#include <protocols/rtc/rtc_state.h> + +namespace transport { + +namespace protocol { + +namespace rtc { + +RTCState::RTCState(ProbeHandler::SendProbeCallback &&rtt_probes_callback, + DiscoveredRttCallback &&discovered_rtt_callback, + asio::io_service &io_service) + : rtt_probes_(std::make_shared<ProbeHandler>( + std::move(rtt_probes_callback), io_service)), + discovered_rtt_callback_(std::move(discovered_rtt_callback)) { + init_rtt_timer_ = std::make_unique<asio::steady_timer>(io_service); + initParams(); +} + +RTCState::~RTCState() {} + +void RTCState::initParams() { + // packets counters (total) + sent_interests_ = 0; + sent_rtx_ = 0; + received_data_ = 0; + received_nacks_ = 0; + received_timeouts_ = 0; + received_probes_ = 0; + + // loss counters + packets_lost_ = 0; + losses_recovered_ = 0; + first_seq_in_round_ = 0; + highest_seq_received_ = 0; + highest_seq_received_in_order_ = 0; + last_seq_nacked_ = 0; + loss_rate_ = 0.0; + residual_loss_rate_ = 0.0; + + // bw counters + received_bytes_ = 0; + avg_packet_size_ = INIT_PACKET_SIZE; + production_rate_ = 0.0; + received_rate_ = 0.0; + + // nack counter + nack_on_last_round_ = false; + received_nacks_last_round_ = 0; + + // packets counter + received_packets_last_round_ = 0; + received_data_last_round_ = 0; + received_data_from_cache_ = 0; + data_from_cache_rate_ = 0; + sent_interests_last_round_ = 0; + sent_rtx_last_round_ = 0; + + // round conunters + rounds_ = 0; + rounds_without_nacks_ = 0; + rounds_without_packets_ = 0; + + last_production_seq_ = 0; + producer_is_active_ = false; + last_prod_update_ = 0; + + // paths stats + path_table_.clear(); + main_path_ = nullptr; + + // packet received + received_or_lost_packets_.clear(); + + // pending interests + pending_interests_.clear(); + + // init rtt + first_interest_sent_ = ~0; + init_rtt_ = false; + rtt_probes_->setProbes(INIT_RTT_PROBE_INTERVAL, INIT_RTT_PROBES); + rtt_probes_->sendProbes(); + setInitRttTimer(INIT_RTT_PROBE_RESTART); +} + +// packet events +void RTCState::onSendNewInterest(const core::Name *interest_name) { + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + uint32_t seq = interest_name->getSuffix(); + pending_interests_.insert(std::pair<uint32_t, uint64_t>(seq, now)); + + if(sent_interests_ == 0) first_interest_sent_ = now; + + sent_interests_++; + sent_interests_last_round_++; +} + +void RTCState::onTimeout(uint32_t seq) { + auto it = pending_interests_.find(seq); + if (it != pending_interests_.end()) { + pending_interests_.erase(it); + } + received_timeouts_++; +} + +void RTCState::onRetransmission(uint32_t seq) { + // remove the interest for the pendingInterest map only after the first rtx. + // in this way we can handle the ooo packets that come in late as normla + // packet. we consider a packet lost only if we sent at least an RTX for it. + // XXX this may become problematic if we stop the RTX transmissions + auto it = pending_interests_.find(seq); + if (it != pending_interests_.end()) { + pending_interests_.erase(it); + packets_lost_++; + } + sent_rtx_++; + sent_rtx_last_round_++; +} + +void RTCState::onDataPacketReceived(const core::ContentObject &content_object, + bool compute_stats) { + uint32_t seq = content_object.getName().getSuffix(); + if (compute_stats) { + updatePathStats(content_object, false); + received_data_last_round_++; + } + received_data_++; + + struct data_packet_t *data_pkt = + (struct data_packet_t *)content_object.getPayload()->data(); + uint64_t production_time = data_pkt->getTimestamp(); + if (last_prod_update_ < production_time) { + last_prod_update_ = production_time; + uint32_t production_rate = data_pkt->getProductionRate(); + production_rate_ = (double)production_rate; + } + + updatePacketSize(content_object); + updateReceivedBytes(content_object); + addRecvOrLost(seq, PacketState::RECEIVED); + + if (seq > highest_seq_received_) highest_seq_received_ = seq; + + // the producer is responding + // it is generating valid data packets so we consider it active + producer_is_active_ = true; + + received_packets_last_round_++; +} + +void RTCState::onNackPacketReceived(const core::ContentObject &nack, + bool compute_stats) { + uint32_t seq = nack.getName().getSuffix(); + struct nack_packet_t *nack_pkt = + (struct nack_packet_t *)nack.getPayload()->data(); + uint64_t production_time = nack_pkt->getTimestamp(); + uint32_t production_seq = nack_pkt->getProductionSegement(); + uint32_t production_rate = nack_pkt->getProductionRate(); + + if (TRANSPORT_EXPECT_FALSE(main_path_ == nullptr) || + last_prod_update_ < production_time) { + // update production rate + last_prod_update_ = production_time; + last_production_seq_ = production_seq; + production_rate_ = (double)production_rate; + } + + if (compute_stats) { + // this is not an RTX + updatePathStats(nack, true); + nack_on_last_round_ = true; + } + + // for statistics pourpose we log all nacks, also the one received for + // retransmitted packets + received_nacks_++; + received_nacks_last_round_++; + + if (production_seq > seq) { + // old nack, seq is lost + // update last nacked + if (last_seq_nacked_ < seq) last_seq_nacked_ = seq; + TRANSPORT_LOGD("lost packet %u beacuse of a past nack", seq); + onPacketLost(seq); + } else if (seq > production_seq) { + // future nack + // remove the nack from the pending interest map + // (the packet is not received/lost yet) + pending_interests_.erase(seq); + } else { + // this should be a quite rear event. simply remove the + // packet from the pending interest list + pending_interests_.erase(seq); + } + + // the producer is responding + // we consider it active only if the production rate is not 0 + // or the production sequence number is not 1 + if (production_rate_ != 0 || production_seq != 1) { + producer_is_active_ = true; + } + + received_packets_last_round_++; +} + +void RTCState::onPacketLost(uint32_t seq) { + TRANSPORT_LOGD("packet %u is lost", seq); + auto it = pending_interests_.find(seq); + if (it != pending_interests_.end()) { + // this packet was never retransmitted so it does + // not appear in the loss count + packets_lost_++; + } + addRecvOrLost(seq, PacketState::LOST); +} + +void RTCState::onPacketRecovered(uint32_t seq) { + losses_recovered_++; + addRecvOrLost(seq, PacketState::RECEIVED); +} + +bool RTCState::onProbePacketReceived(const core::ContentObject &probe) { + uint32_t seq = probe.getName().getSuffix(); + uint64_t rtt; + + rtt = rtt_probes_->getRtt(seq); + + if (rtt == 0) return false; // this is not a valid probe + + // like for data and nacks update the path stats. Here the RTT is computed + // by the probe handler. Both probes for rtt and bw are good to esimate + // info on the path + uint32_t path_label = probe.getPathLabel(); + + auto path_it = path_table_.find(path_label); + + // update production rate and last_seq_nacked like in case of a nack + struct nack_packet_t *probe_pkt = + (struct nack_packet_t *)probe.getPayload()->data(); + uint64_t sender_timestamp = probe_pkt->getTimestamp(); + uint32_t production_seq = probe_pkt->getProductionSegement(); + uint32_t production_rate = probe_pkt->getProductionRate(); + + + if (path_it == path_table_.end()) { + // found a new path + std::shared_ptr<RTCDataPath> newPath = + std::make_shared<RTCDataPath>(path_label); + auto ret = path_table_.insert( + std::pair<uint32_t, std::shared_ptr<RTCDataPath>>(path_label, newPath)); + path_it = ret.first; + } + + auto path = path_it->second; + + path->insertRttSample(rtt); + path->receivedNack(); + + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + + int64_t OWD = now - sender_timestamp; + path->insertOwdSample(OWD); + + if (last_prod_update_ < sender_timestamp) { + last_production_seq_ = production_seq; + last_prod_update_ = sender_timestamp; + production_rate_ = (double)production_rate; + } + + // the producer is responding + // we consider it active only if the production rate is not 0 + // or the production sequence numner is not 1 + if (production_rate_ != 0 || production_seq != 1) { + producer_is_active_ = true; + } + + // check for init RTT. if received_probes_ is equal to 0 schedule a timer to + // wait for the INIT_RTT_PROBES. in this way if some probes get lost we don't + // wait forever + received_probes_++; + + if(!init_rtt_ && received_probes_ <= INIT_RTT_PROBES){ + if(received_probes_ == 1){ + // we got the first probe, wait at most INIT_RTT_PROBE_WAIT sec for the others + main_path_ = path; + setInitRttTimer(INIT_RTT_PROBE_WAIT); + } + if(received_probes_ == INIT_RTT_PROBES) { + // we are done + init_rtt_timer_->cancel(); + checkInitRttTimer(); + } + } + + received_packets_last_round_++; + + // ignore probes sent before the first interest + if((now - rtt) <= first_interest_sent_) return false; + return true; +} + +void RTCState::onNewRound(double round_len, bool in_sync) { + // XXX + // here we take into account only the single path case so we assume that we + // don't use two paths in parellel for this single flow + + if (path_table_.empty()) return; + + double bytes_per_sec = + ((double)received_bytes_ * (MILLI_IN_A_SEC / round_len)); + if(received_rate_ == 0) + received_rate_ = bytes_per_sec; + else + received_rate_ = (received_rate_ * MOVING_AVG_ALPHA) + + ((1 - MOVING_AVG_ALPHA) * bytes_per_sec); + + // search for an active path. There should be only one active path (meaning a + // path that leads to the producer socket -no cache- and from which we are + // currently getting data packets) at any time. However it may happen that + // there are mulitple active paths in case of mobility (the old path will + // remain active for a short ammount of time). The main path is selected as + // the active path from where the consumer received the latest data packet + + uint64_t last_packet_ts = 0; + main_path_ = nullptr; + + for (auto it = path_table_.begin(); it != path_table_.end(); it++) { + it->second->roundEnd(); + if (it->second->isActive()) { + uint64_t ts = it->second->getLastPacketTS(); + if (ts > last_packet_ts) { + last_packet_ts = ts; + main_path_ = it->second; + } + } + } + + if (in_sync) updateLossRate(); + + // handle nacks + if (!nack_on_last_round_ && received_bytes_ > 0) { + rounds_without_nacks_++; + } else { + rounds_without_nacks_ = 0; + } + + // check if the producer is active + if (received_packets_last_round_ != 0) { + rounds_without_packets_ = 0; + } else { + rounds_without_packets_++; + if (rounds_without_packets_ >= MAX_ROUND_WHIOUT_PACKETS && + producer_is_active_ != false) { + initParams(); + } + } + + // compute cache/producer ratio + if (received_data_last_round_ != 0) { + double new_rate = + (double)received_data_from_cache_ / (double)received_data_last_round_; + data_from_cache_rate_ = data_from_cache_rate_ * MOVING_AVG_ALPHA + + (new_rate * (1 - MOVING_AVG_ALPHA)); + } + + // reset counters + received_bytes_ = 0; + packets_lost_ = 0; + losses_recovered_ = 0; + first_seq_in_round_ = highest_seq_received_; + + nack_on_last_round_ = false; + received_nacks_last_round_ = 0; + + received_packets_last_round_ = 0; + received_data_last_round_ = 0; + received_data_from_cache_ = 0; + sent_interests_last_round_ = 0; + sent_rtx_last_round_ = 0; + + rounds_++; +} + +void RTCState::updateReceivedBytes(const core::ContentObject &content_object) { + received_bytes_ += + (uint32_t)(content_object.headerSize() + content_object.payloadSize()); +} + +void RTCState::updatePacketSize(const core::ContentObject &content_object) { + uint32_t pkt_size = + (uint32_t)(content_object.headerSize() + content_object.payloadSize()); + avg_packet_size_ = (MOVING_AVG_ALPHA * avg_packet_size_) + + ((1 - MOVING_AVG_ALPHA) * pkt_size); +} + +void RTCState::updatePathStats(const core::ContentObject &content_object, + bool is_nack) { + // get packet path + uint32_t path_label = content_object.getPathLabel(); + auto path_it = path_table_.find(path_label); + + if (path_it == path_table_.end()) { + // found a new path + std::shared_ptr<RTCDataPath> newPath = + std::make_shared<RTCDataPath>(path_label); + auto ret = path_table_.insert( + std::pair<uint32_t, std::shared_ptr<RTCDataPath>>(path_label, newPath)); + path_it = ret.first; + } + + auto path = path_it->second; + + // compute rtt + uint32_t seq = content_object.getName().getSuffix(); + uint64_t interest_sent_time = getInterestSentTime(seq); + if (interest_sent_time == 0) + return; // this should not happen, + // it means that we are processing an interest + // that is not pending + + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + + uint64_t RTT = now - interest_sent_time; + + path->insertRttSample(RTT); + + // compute OWD (the first part of the nack and data packet header are the + // same, so we cast to data data packet) + struct data_packet_t *packet = + (struct data_packet_t *)content_object.getPayload()->data(); + uint64_t sender_timestamp = packet->getTimestamp(); + int64_t OWD = now - sender_timestamp; + path->insertOwdSample(OWD); + + // compute IAT or set path to producer + if (!is_nack) { + // compute the iat only for the content packets + uint32_t segment_number = content_object.getName().getSuffix(); + path->computeInterArrivalGap(segment_number); + if (!path->pathToProducer()) received_data_from_cache_++; + } else { + path->receivedNack(); + } +} + +void RTCState::updateLossRate() { + loss_rate_ = 0.0; + residual_loss_rate_ = 0.0; + + uint32_t number_theorically_received_packets_ = + highest_seq_received_ - first_seq_in_round_; + + // in this case no new packet was recevied after the previuos round, avoid + // division by 0 + if (number_theorically_received_packets_ == 0) return; + + loss_rate_ = (double)((double)(packets_lost_) / + (double)number_theorically_received_packets_); + + residual_loss_rate_ = (double)((double)(packets_lost_ - losses_recovered_) / + (double)number_theorically_received_packets_); + + if (residual_loss_rate_ < 0) residual_loss_rate_ = 0; +} + +void RTCState::addRecvOrLost(uint32_t seq, PacketState state) { + pending_interests_.erase(seq); + if (received_or_lost_packets_.size() >= MAX_CACHED_PACKETS) { + received_or_lost_packets_.erase(received_or_lost_packets_.begin()); + } + // notice that it may happen that a packet that we consider lost arrives after + // some time, in this case we simply overwrite the packet state. + received_or_lost_packets_[seq] = state; + + // keep track of the last packet received/lost + // without holes. + if (highest_seq_received_in_order_ < last_seq_nacked_) { + highest_seq_received_in_order_ = last_seq_nacked_; + } + + if ((highest_seq_received_in_order_ + 1) == seq) { + highest_seq_received_in_order_ = seq; + } else if (seq <= highest_seq_received_in_order_) { + // here we do nothing + } else if (seq > highest_seq_received_in_order_) { + // 1) there is a gap in the sequence so we do not update largest_in_seq_ + // 2) all the packets from largest_in_seq_ to seq are in + // received_or_lost_packets_ an we upate largest_in_seq_ + + for (uint32_t i = highest_seq_received_in_order_ + 1; i <= seq; i++) { + if (received_or_lost_packets_.find(i) == + received_or_lost_packets_.end()) { + break; + } + // this packet is in order so we can update the + // highest_seq_received_in_order_ + highest_seq_received_in_order_ = i; + } + } +} + +void RTCState::setInitRttTimer(uint32_t wait){ + init_rtt_timer_->cancel(); + init_rtt_timer_->expires_from_now(std::chrono::milliseconds(wait)); + init_rtt_timer_->async_wait([this](std::error_code ec) { + if(ec) return; + checkInitRttTimer(); + }); +} + +void RTCState::checkInitRttTimer() { + if(received_probes_ < INIT_RTT_MIN_PROBES_TO_RECV){ + // we didn't received enough probes, restart + received_probes_ = 0; + rtt_probes_->setProbes(INIT_RTT_PROBE_INTERVAL, INIT_RTT_PROBES); + rtt_probes_->sendProbes(); + setInitRttTimer(INIT_RTT_PROBE_RESTART); + return; + } + init_rtt_ = true; + main_path_->roundEnd(); + rtt_probes_->setProbes(RTT_PROBE_INTERVAL, 0); + rtt_probes_->sendProbes(); + + // init last_seq_nacked_. skip packets that may come from the cache + double prod_rate = getProducerRate(); + double rtt = (double)getRTT() / MILLI_IN_A_SEC; + double packet_size = getAveragePacketSize(); + uint32_t pkt_in_rtt_ = std::floor(((prod_rate / packet_size) * rtt) * 0.8); + last_seq_nacked_ = last_production_seq_ + pkt_in_rtt_; + + discovered_rtt_callback_(); +} + +} // namespace rtc + +} // namespace protocol + +} // namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_state.h b/libtransport/src/protocols/rtc/rtc_state.h new file mode 100644 index 000000000..943a0a113 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_state.h @@ -0,0 +1,253 @@ +/* + * Copyright (c) 2017-2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include <hicn/transport/config.h> +#include <hicn/transport/core/content_object.h> +#include <hicn/transport/core/name.h> +#include <protocols/rtc/probe_handler.h> +#include <protocols/rtc/rtc_data_path.h> + +#include <asio.hpp> +#include <asio/steady_timer.hpp> +#include <map> +#include <set> + +namespace transport { + +namespace protocol { + +namespace rtc { + +enum class PacketState : uint8_t { RECEIVED, LOST, UNKNOWN }; + +class RTCState : std::enable_shared_from_this<RTCState> { + public: + using DiscoveredRttCallback = std::function<void()>; + public: + RTCState(ProbeHandler::SendProbeCallback &&rtt_probes_callback, + DiscoveredRttCallback &&discovered_rtt_callback, + asio::io_service &io_service); + + ~RTCState(); + + // packet events + void onSendNewInterest(const core::Name *interest_name); + void onTimeout(uint32_t seq); + void onRetransmission(uint32_t seq); + void onDataPacketReceived(const core::ContentObject &content_object, + bool compute_stats); + void onNackPacketReceived(const core::ContentObject &nack, + bool compute_stats); + void onPacketLost(uint32_t seq); + void onPacketRecovered(uint32_t seq); + bool onProbePacketReceived(const core::ContentObject &probe); + + // protocol state + void onNewRound(double round_len, bool in_sync); + + // main path + uint32_t getProducerPath() const { + if (mainPathIsValid()) return main_path_->getPathId(); + return 0; + } + + // delay metrics + bool isRttDiscovered() const { + return init_rtt_; + } + + uint64_t getRTT() const { + if (mainPathIsValid()) return main_path_->getMinRtt(); + return 0; + } + void resetRttStats() { + if (mainPathIsValid()) main_path_->clearRtt(); + } + + double getQueuing() const { + if (mainPathIsValid()) return main_path_->getQueuingDealy(); + return 0.0; + } + double getIAT() const { + if (mainPathIsValid()) return main_path_->getInterArrivalGap(); + return 0.0; + } + + double getJitter() const { + if (mainPathIsValid()) return main_path_->getJitter(); + return 0.0; + } + + // pending interests + uint64_t getInterestSentTime(uint32_t seq) { + auto it = pending_interests_.find(seq); + if (it != pending_interests_.end()) return it->second; + return 0; + } + bool isPending(uint32_t seq) { + if (pending_interests_.find(seq) != pending_interests_.end()) return true; + return false; + } + uint32_t getPendingInterestNumber() const { + return pending_interests_.size(); + } + PacketState isReceivedOrLost(uint32_t seq) { + auto it = received_or_lost_packets_.find(seq); + if (it != received_or_lost_packets_.end()) return it->second; + return PacketState::UNKNOWN; + } + + // loss rate + double getLossRate() const { return loss_rate_; } + double getResidualLossRate() const { return residual_loss_rate_; } + uint32_t getHighestSeqReceivedInOrder() const { + return highest_seq_received_in_order_; + } + uint32_t getLostData() const { return packets_lost_; }; + uint32_t getRecoveredLosses() const { return losses_recovered_; } + + // generic stats + uint32_t getReceivedBytesInRound() const { return received_bytes_; } + uint32_t getReceivedNacksInRound() const { + return received_nacks_last_round_; + } + uint32_t getSentInterestInRound() const { return sent_interests_last_round_; } + uint32_t getSentRtxInRound() const { return sent_rtx_last_round_; } + + // bandwidth/production metrics + double getAvailableBw() const { return 0.0; }; // TODO + double getProducerRate() const { return production_rate_; } + double getReceivedRate() const { return received_rate_; } + double getAveragePacketSize() const { return avg_packet_size_; } + + // nacks + uint32_t getRoundsWithoutNacks() const { return rounds_without_nacks_; } + uint32_t getLastSeqNacked() const { return last_seq_nacked_; } + + // producer state + bool isProducerActive() const { return producer_is_active_; } + + // packets from cache + double getPacketFromCacheRatio() const { return data_from_cache_rate_; } + + std::map<uint32_t, uint64_t>::iterator getPendingInterestsMapBegin() { + return pending_interests_.begin(); + } + std::map<uint32_t, uint64_t>::iterator getPendingInterestsMapEnd() { + return pending_interests_.end(); + } + + private: + void initParams(); + + // update stats + void updateState(); + void updateReceivedBytes(const core::ContentObject &content_object); + void updatePacketSize(const core::ContentObject &content_object); + void updatePathStats(const core::ContentObject &content_object, bool is_nack); + void updateLossRate(); + + void addRecvOrLost(uint32_t seq, PacketState state); + + void setInitRttTimer(uint32_t wait); + void checkInitRttTimer(); + + bool mainPathIsValid() const { + if (main_path_ != nullptr) + return true; + else + return false; + } + + // packets counters (total) + uint32_t sent_interests_; + uint32_t sent_rtx_; + uint32_t received_data_; + uint32_t received_nacks_; + uint32_t received_timeouts_; + uint32_t received_probes_; + + // loss counters + int32_t packets_lost_; + int32_t losses_recovered_; + uint32_t first_seq_in_round_; + uint32_t highest_seq_received_; + uint32_t highest_seq_received_in_order_; + uint32_t last_seq_nacked_; // segment for which we got an oldNack + double loss_rate_; + double residual_loss_rate_; + + // bw counters + uint32_t received_bytes_; + double avg_packet_size_; + double production_rate_; // rate communicated by the producer using nacks + double received_rate_; // rate recevied by the consumer + + // nack counter + // the bool takes tracks only about the valid nacks (no rtx) and it is used to + // switch between the states. Instead received_nacks_last_round_ logs all the + // nacks for statistics + bool nack_on_last_round_; + uint32_t received_nacks_last_round_; + + // packets counter + uint32_t received_packets_last_round_; + uint32_t received_data_last_round_; + uint32_t received_data_from_cache_; + double data_from_cache_rate_; + uint32_t sent_interests_last_round_; + uint32_t sent_rtx_last_round_; + + // round conunters + uint32_t rounds_; + uint32_t rounds_without_nacks_; + uint32_t rounds_without_packets_; + + // init rtt + uint64_t first_interest_sent_; + + // producer state + bool + producer_is_active_; // the prodcuer is active if we receive some packets + uint32_t last_production_seq_; // last production seq received by the producer + uint64_t last_prod_update_; // timestamp of the last packets used to update + // stats from the producer + + // paths stats + std::unordered_map<uint32_t, std::shared_ptr<RTCDataPath>> path_table_; + std::shared_ptr<RTCDataPath> main_path_; + + // packet received + // cache where to store info about the last MAX_CACHED_PACKETS + std::map<uint32_t, PacketState> received_or_lost_packets_; + + // pending interests + std::map<uint32_t, uint64_t> pending_interests_; + + // probes + std::shared_ptr<ProbeHandler> rtt_probes_; + bool init_rtt_; + std::unique_ptr<asio::steady_timer> init_rtt_timer_; + + // callbacks + DiscoveredRttCallback discovered_rtt_callback_; +}; + +} // namespace rtc + +} // namespace protocol + +} // namespace transport diff --git a/libtransport/src/protocols/rtc/trendline_estimator.cc b/libtransport/src/protocols/rtc/trendline_estimator.cc new file mode 100644 index 000000000..7a0803857 --- /dev/null +++ b/libtransport/src/protocols/rtc/trendline_estimator.cc @@ -0,0 +1,334 @@ +/* + * Copyright (c) 2016 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +// FROM +// https://source.chromium.org/chromium/chromium/src/+/master:third_party/webrtc/modules/congestion_controller/goog_cc/trendline_estimator.cc + +#include "trendline_estimator.h" + +#include <math.h> + +#include <algorithm> +#include <string> + +namespace transport { + +namespace protocol { + +namespace rtc { + +// Parameters for linear least squares fit of regression line to noisy data. +constexpr double kDefaultTrendlineSmoothingCoeff = 0.9; +constexpr double kDefaultTrendlineThresholdGain = 4.0; +// const char kBweWindowSizeInPacketsExperiment[] = +// "WebRTC-BweWindowSizeInPackets"; + +/*size_t ReadTrendlineFilterWindowSize( + const WebRtcKeyValueConfig* key_value_config) { + std::string experiment_string = + key_value_config->Lookup(kBweWindowSizeInPacketsExperiment); + size_t window_size; + int parsed_values = + sscanf(experiment_string.c_str(), "Enabled-%zu", &window_size); + if (parsed_values == 1) { + if (window_size > 1) + return window_size; + RTC_LOG(WARNING) << "Window size must be greater than 1."; + } + RTC_LOG(LS_WARNING) << "Failed to parse parameters for BweWindowSizeInPackets" + " experiment from field trial string. Using default."; + return TrendlineEstimatorSettings::kDefaultTrendlineWindowSize; +} +*/ + +OptionalDouble LinearFitSlope( + const std::deque<TrendlineEstimator::PacketTiming>& packets) { + // RTC_DCHECK(packets.size() >= 2); + // Compute the "center of mass". + double sum_x = 0; + double sum_y = 0; + for (const auto& packet : packets) { + sum_x += packet.arrival_time_ms; + sum_y += packet.smoothed_delay_ms; + } + double x_avg = sum_x / packets.size(); + double y_avg = sum_y / packets.size(); + // Compute the slope k = \sum (x_i-x_avg)(y_i-y_avg) / \sum (x_i-x_avg)^2 + double numerator = 0; + double denominator = 0; + for (const auto& packet : packets) { + double x = packet.arrival_time_ms; + double y = packet.smoothed_delay_ms; + numerator += (x - x_avg) * (y - y_avg); + denominator += (x - x_avg) * (x - x_avg); + } + if (denominator == 0) return OptionalDouble(); + return OptionalDouble(numerator / denominator); +} + +OptionalDouble ComputeSlopeCap( + const std::deque<TrendlineEstimator::PacketTiming>& packets, + const TrendlineEstimatorSettings& settings) { + /*RTC_DCHECK(1 <= settings.beginning_packets && + settings.beginning_packets < packets.size()); + RTC_DCHECK(1 <= settings.end_packets && + settings.end_packets < packets.size()); + RTC_DCHECK(settings.beginning_packets + settings.end_packets <= + packets.size());*/ + TrendlineEstimator::PacketTiming early = packets[0]; + for (size_t i = 1; i < settings.beginning_packets; ++i) { + if (packets[i].raw_delay_ms < early.raw_delay_ms) early = packets[i]; + } + size_t late_start = packets.size() - settings.end_packets; + TrendlineEstimator::PacketTiming late = packets[late_start]; + for (size_t i = late_start + 1; i < packets.size(); ++i) { + if (packets[i].raw_delay_ms < late.raw_delay_ms) late = packets[i]; + } + if (late.arrival_time_ms - early.arrival_time_ms < 1) { + return OptionalDouble(); + } + return OptionalDouble((late.raw_delay_ms - early.raw_delay_ms) / + (late.arrival_time_ms - early.arrival_time_ms) + + settings.cap_uncertainty); +} + +constexpr double kMaxAdaptOffsetMs = 15.0; +constexpr double kOverUsingTimeThreshold = 10; +constexpr int kMinNumDeltas = 60; +constexpr int kDeltaCounterMax = 1000; + +//} // namespace + +constexpr char TrendlineEstimatorSettings::kKey[]; + +TrendlineEstimatorSettings::TrendlineEstimatorSettings( + /*const WebRtcKeyValueConfig* key_value_config*/) { + /*if (absl::StartsWith( + key_value_config->Lookup(kBweWindowSizeInPacketsExperiment), + "Enabled")) { + window_size = ReadTrendlineFilterWindowSize(key_value_config); + } + Parser()->Parse(key_value_config->Lookup(TrendlineEstimatorSettings::kKey));*/ + window_size = kDefaultTrendlineWindowSize; + enable_cap = false; + beginning_packets = end_packets = 0; + cap_uncertainty = 0.0; + + /*if (window_size < 10 || 200 < window_size) { + RTC_LOG(LS_WARNING) << "Window size must be between 10 and 200 packets"; + window_size = kDefaultTrendlineWindowSize; + } + if (enable_cap) { + if (beginning_packets < 1 || end_packets < 1 || + beginning_packets > window_size || end_packets > window_size) { + RTC_LOG(LS_WARNING) << "Size of beginning and end must be between 1 and " + << window_size; + enable_cap = false; + beginning_packets = end_packets = 0; + cap_uncertainty = 0.0; + } + if (beginning_packets + end_packets > window_size) { + RTC_LOG(LS_WARNING) + << "Size of beginning plus end can't exceed the window size"; + enable_cap = false; + beginning_packets = end_packets = 0; + cap_uncertainty = 0.0; + } + if (cap_uncertainty < 0.0 || 0.025 < cap_uncertainty) { + RTC_LOG(LS_WARNING) << "Cap uncertainty must be between 0 and 0.025"; + cap_uncertainty = 0.0; + } + }*/ +} + +/*std::unique_ptr<StructParametersParser> TrendlineEstimatorSettings::Parser() { + return StructParametersParser::Create("sort", &enable_sort, // + "cap", &enable_cap, // + "beginning_packets", + &beginning_packets, // + "end_packets", &end_packets, // + "cap_uncertainty", &cap_uncertainty, // + "window_size", &window_size); +}*/ + +TrendlineEstimator::TrendlineEstimator( + /*const WebRtcKeyValueConfig* key_value_config, + NetworkStatePredictor* network_state_predictor*/) + : settings_(), + smoothing_coef_(kDefaultTrendlineSmoothingCoeff), + threshold_gain_(kDefaultTrendlineThresholdGain), + num_of_deltas_(0), + first_arrival_time_ms_(-1), + accumulated_delay_(0), + smoothed_delay_(0), + delay_hist_(), + k_up_(0.0087), + k_down_(0.039), + overusing_time_threshold_(kOverUsingTimeThreshold), + threshold_(12.5), + prev_modified_trend_(NAN), + last_update_ms_(-1), + prev_trend_(0.0), + time_over_using_(-1), + overuse_counter_(0), + hypothesis_(BandwidthUsage::kBwNormal){ + // hypothesis_predicted_(BandwidthUsage::kBwNormal){//}, + // network_state_predictor_(network_state_predictor) { + /* RTC_LOG(LS_INFO) + << "Using Trendline filter for delay change estimation with settings " + << settings_.Parser()->Encode() << " and " + // << (network_state_predictor_ ? "injected" : "no") + << " network state predictor";*/ +} + +TrendlineEstimator::~TrendlineEstimator() {} + +void TrendlineEstimator::UpdateTrendline(double recv_delta_ms, + double send_delta_ms, + int64_t send_time_ms, + int64_t arrival_time_ms, + size_t packet_size) { + const double delta_ms = recv_delta_ms - send_delta_ms; + ++num_of_deltas_; + num_of_deltas_ = std::min(num_of_deltas_, kDeltaCounterMax); + if (first_arrival_time_ms_ == -1) first_arrival_time_ms_ = arrival_time_ms; + + // Exponential backoff filter. + accumulated_delay_ += delta_ms; + // BWE_TEST_LOGGING_PLOT(1, "accumulated_delay_ms", arrival_time_ms, + // accumulated_delay_); + smoothed_delay_ = smoothing_coef_ * smoothed_delay_ + + (1 - smoothing_coef_) * accumulated_delay_; + // BWE_TEST_LOGGING_PLOT(1, "smoothed_delay_ms", arrival_time_ms, + // smoothed_delay_); + + // Maintain packet window + delay_hist_.emplace_back( + static_cast<double>(arrival_time_ms - first_arrival_time_ms_), + smoothed_delay_, accumulated_delay_); + if (settings_.enable_sort) { + for (size_t i = delay_hist_.size() - 1; + i > 0 && + delay_hist_[i].arrival_time_ms < delay_hist_[i - 1].arrival_time_ms; + --i) { + std::swap(delay_hist_[i], delay_hist_[i - 1]); + } + } + if (delay_hist_.size() > settings_.window_size) delay_hist_.pop_front(); + + // Simple linear regression. + double trend = prev_trend_; + if (delay_hist_.size() == settings_.window_size) { + // Update trend_ if it is possible to fit a line to the data. The delay + // trend can be seen as an estimate of (send_rate - capacity)/capacity. + // 0 < trend < 1 -> the delay increases, queues are filling up + // trend == 0 -> the delay does not change + // trend < 0 -> the delay decreases, queues are being emptied + OptionalDouble trendO = LinearFitSlope(delay_hist_); + if (trendO.has_value()) trend = trendO.value(); + if (settings_.enable_cap) { + OptionalDouble cap = ComputeSlopeCap(delay_hist_, settings_); + // We only use the cap to filter out overuse detections, not + // to detect additional underuses. + if (trend >= 0 && cap.has_value() && trend > cap.value()) { + trend = cap.value(); + } + } + } + // BWE_TEST_LOGGING_PLOT(1, "trendline_slope", arrival_time_ms, trend); + + Detect(trend, send_delta_ms, arrival_time_ms); +} + +void TrendlineEstimator::Update(double recv_delta_ms, double send_delta_ms, + int64_t send_time_ms, int64_t arrival_time_ms, + size_t packet_size, bool calculated_deltas) { + if (calculated_deltas) { + UpdateTrendline(recv_delta_ms, send_delta_ms, send_time_ms, arrival_time_ms, + packet_size); + } + /*if (network_state_predictor_) { + hypothesis_predicted_ = network_state_predictor_->Update( + send_time_ms, arrival_time_ms, hypothesis_); + }*/ +} + +BandwidthUsage TrendlineEstimator::State() const { + return /*network_state_predictor_ ? hypothesis_predicted_ :*/ hypothesis_; +} + +void TrendlineEstimator::Detect(double trend, double ts_delta, int64_t now_ms) { + /*if (num_of_deltas_ < 2) { + hypothesis_ = BandwidthUsage::kBwNormal; + return; + }*/ + + const double modified_trend = + std::min(num_of_deltas_, kMinNumDeltas) * trend * threshold_gain_; + prev_modified_trend_ = modified_trend; + // BWE_TEST_LOGGING_PLOT(1, "T", now_ms, modified_trend); + // BWE_TEST_LOGGING_PLOT(1, "threshold", now_ms, threshold_); + if (modified_trend > threshold_) { + if (time_over_using_ == -1) { + // Initialize the timer. Assume that we've been + // over-using half of the time since the previous + // sample. + time_over_using_ = ts_delta / 2; + } else { + // Increment timer + time_over_using_ += ts_delta; + } + overuse_counter_++; + if (time_over_using_ > overusing_time_threshold_ && overuse_counter_ > 1) { + if (trend >= prev_trend_) { + time_over_using_ = 0; + overuse_counter_ = 0; + hypothesis_ = BandwidthUsage::kBwOverusing; + } + } + } else if (modified_trend < -threshold_) { + time_over_using_ = -1; + overuse_counter_ = 0; + hypothesis_ = BandwidthUsage::kBwUnderusing; + } else { + time_over_using_ = -1; + overuse_counter_ = 0; + hypothesis_ = BandwidthUsage::kBwNormal; + } + prev_trend_ = trend; + UpdateThreshold(modified_trend, now_ms); +} + +void TrendlineEstimator::UpdateThreshold(double modified_trend, + int64_t now_ms) { + if (last_update_ms_ == -1) last_update_ms_ = now_ms; + + if (fabs(modified_trend) > threshold_ + kMaxAdaptOffsetMs) { + // Avoid adapting the threshold to big latency spikes, caused e.g., + // by a sudden capacity drop. + last_update_ms_ = now_ms; + return; + } + + const double k = fabs(modified_trend) < threshold_ ? k_down_ : k_up_; + const int64_t kMaxTimeDeltaMs = 100; + int64_t time_delta_ms = std::min(now_ms - last_update_ms_, kMaxTimeDeltaMs); + threshold_ += k * (fabs(modified_trend) - threshold_) * time_delta_ms; + if (threshold_ < 6.f) threshold_ = 6.f; + if (threshold_ > 600.f) threshold_ = 600.f; + // threshold_ = rtc::SafeClamp(threshold_, 6.f, 600.f); + last_update_ms_ = now_ms; +} + +} // namespace rtc + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/trendline_estimator.h b/libtransport/src/protocols/rtc/trendline_estimator.h new file mode 100644 index 000000000..372acbc67 --- /dev/null +++ b/libtransport/src/protocols/rtc/trendline_estimator.h @@ -0,0 +1,147 @@ +/* + * Copyright (c) 2016 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +// FROM +// https://source.chromium.org/chromium/chromium/src/+/master:third_party/webrtc/modules/congestion_controller/goog_cc/trendline_estimator.h + +#ifndef MODULES_CONGESTION_CONTROLLER_GOOG_CC_TRENDLINE_ESTIMATOR_H_ +#define MODULES_CONGESTION_CONTROLLER_GOOG_CC_TRENDLINE_ESTIMATOR_H_ + +#include <stddef.h> +#include <stdint.h> + +#include <algorithm> +#include <deque> +#include <memory> +#include <utility> + +namespace transport { + +namespace protocol { + +namespace rtc { + +class OptionalDouble { + public: + OptionalDouble() : val(0), has_val(false){}; + OptionalDouble(double val) : val(val), has_val(true){}; + + double value() { return val; } + bool has_value() { return has_val; } + + private: + double val; + bool has_val; +}; + +enum class BandwidthUsage { + kBwNormal = 0, + kBwUnderusing = 1, + kBwOverusing = 2, + kLast +}; + +struct TrendlineEstimatorSettings { + static constexpr char kKey[] = "WebRTC-Bwe-TrendlineEstimatorSettings"; + static constexpr unsigned kDefaultTrendlineWindowSize = 20; + + // TrendlineEstimatorSettings() = delete; + TrendlineEstimatorSettings( + /*const WebRtcKeyValueConfig* key_value_config*/); + + // Sort the packets in the window. Should be redundant, + // but then almost no cost. + bool enable_sort = false; + + // Cap the trendline slope based on the minimum delay seen + // in the beginning_packets and end_packets respectively. + bool enable_cap = false; + unsigned beginning_packets = 7; + unsigned end_packets = 7; + double cap_uncertainty = 0.0; + + // Size (in packets) of the window. + unsigned window_size = kDefaultTrendlineWindowSize; + + // std::unique_ptr<StructParametersParser> Parser(); +}; + +class TrendlineEstimator /*: public DelayIncreaseDetectorInterface */ { + public: + TrendlineEstimator(/*const WebRtcKeyValueConfig* key_value_config, + NetworkStatePredictor* network_state_predictor*/); + + ~TrendlineEstimator(); + + // Update the estimator with a new sample. The deltas should represent deltas + // between timestamp groups as defined by the InterArrival class. + void Update(double recv_delta_ms, double send_delta_ms, int64_t send_time_ms, + int64_t arrival_time_ms, size_t packet_size, + bool calculated_deltas); + + void UpdateTrendline(double recv_delta_ms, double send_delta_ms, + int64_t send_time_ms, int64_t arrival_time_ms, + size_t packet_size); + + BandwidthUsage State() const; + + struct PacketTiming { + PacketTiming(double arrival_time_ms, double smoothed_delay_ms, + double raw_delay_ms) + : arrival_time_ms(arrival_time_ms), + smoothed_delay_ms(smoothed_delay_ms), + raw_delay_ms(raw_delay_ms) {} + double arrival_time_ms; + double smoothed_delay_ms; + double raw_delay_ms; + }; + + private: + // friend class GoogCcStatePrinter; + void Detect(double trend, double ts_delta, int64_t now_ms); + + void UpdateThreshold(double modified_offset, int64_t now_ms); + + // Parameters. + TrendlineEstimatorSettings settings_; + const double smoothing_coef_; + const double threshold_gain_; + // Used by the existing threshold. + int num_of_deltas_; + // Keep the arrival times small by using the change from the first packet. + int64_t first_arrival_time_ms_; + // Exponential backoff filtering. + double accumulated_delay_; + double smoothed_delay_; + // Linear least squares regression. + std::deque<PacketTiming> delay_hist_; + + const double k_up_; + const double k_down_; + double overusing_time_threshold_; + double threshold_; + double prev_modified_trend_; + int64_t last_update_ms_; + double prev_trend_; + double time_over_using_; + int overuse_counter_; + BandwidthUsage hypothesis_; + // BandwidthUsage hypothesis_predicted_; + // NetworkStatePredictor* network_state_predictor_; + + // RTC_DISALLOW_COPY_AND_ASSIGN(TrendlineEstimator); +}; + +} // namespace rtc + +} // end namespace protocol + +} // end namespace transport +#endif // MODULES_CONGESTION_CONTROLLER_GOOG_CC_TRENDLINE_ESTIMATOR_H_ diff --git a/libtransport/src/protocols/transport_protocol.cc b/libtransport/src/protocols/transport_protocol.cc new file mode 100644 index 000000000..611c39212 --- /dev/null +++ b/libtransport/src/protocols/transport_protocol.cc @@ -0,0 +1,132 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/interfaces/socket_consumer.h> +#include <implementation/socket_consumer.h> +#include <protocols/transport_protocol.h> + +namespace transport { + +namespace protocol { + +using namespace interface; + +TransportProtocol::TransportProtocol(implementation::ConsumerSocket *icn_socket, + Reassembly *reassembly_protocol) + : socket_(icn_socket), + reassembly_protocol_(reassembly_protocol), + index_manager_( + std::make_unique<IndexManager>(socket_, this, reassembly_protocol)), + is_running_(false), + is_first_(false), + on_interest_retransmission_(VOID_HANDLER), + on_interest_output_(VOID_HANDLER), + on_interest_timeout_(VOID_HANDLER), + on_interest_satisfied_(VOID_HANDLER), + on_content_object_input_(VOID_HANDLER), + stats_summary_(VOID_HANDLER), + on_payload_(VOID_HANDLER) { + socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal_); + socket_->getSocketOption(OtherOptions::STATISTICS, &stats_); +} + +int TransportProtocol::start() { + // If the protocol is already running, return otherwise set as running + if (is_running_) return -1; + + // Get all callbacks references + socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION, + &on_interest_retransmission_); + socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, + &on_interest_output_); + socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_EXPIRED, + &on_interest_timeout_); + socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_SATISFIED, + &on_interest_satisfied_); + socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT, + &on_content_object_input_); + socket_->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY, + &stats_summary_); + socket_->getSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, + &on_payload_); + + socket_->getSocketOption(GeneralTransportOptions::ASYNC_MODE, is_async_); + + // Set it is the first time we schedule an interest + is_first_ = true; + + // Reset the protocol state machine + reset(); + // Schedule next interests + scheduleNextInterests(); + + is_first_ = false; + + // Set the protocol as running + is_running_ = true; + + if (!is_async_) { + // Start Event loop + portal_->runEventsLoop(); + + // Not running anymore + is_running_ = false; + } + + return 0; +} + +void TransportProtocol::stop() { + is_running_ = false; + + if (!is_async_) { + portal_->stopEventsLoop(); + } else { + portal_->clear(); + } +} + +void TransportProtocol::resume() { + if (is_running_) return; + + is_running_ = true; + + scheduleNextInterests(); + + portal_->runEventsLoop(); + + is_running_ = false; +} + +void TransportProtocol::onContentReassembled(std::error_code ec) { + stop(); + + if (!on_payload_) { + throw errors::RuntimeException( + "The read callback must be installed in the transport before " + "starting " + "the content retrieval."); + } + + if (!ec) { + on_payload_->readSuccess(stats_->getBytesRecv()); + } else { + on_payload_->readError(ec); + } +} + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/transport_protocol.h b/libtransport/src/protocols/transport_protocol.h new file mode 100644 index 000000000..124c57122 --- /dev/null +++ b/libtransport/src/protocols/transport_protocol.h @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/interfaces/callbacks.h> +#include <hicn/transport/interfaces/socket_consumer.h> +#include <hicn/transport/interfaces/statistics.h> +#include <hicn/transport/utils/object_pool.h> +#include <implementation/socket.h> +#include <protocols/data_processing_events.h> +#include <protocols/indexer.h> +#include <protocols/reassembly.h> + +#include <atomic> + +namespace transport { + +namespace protocol { + +using namespace core; + +class IndexVerificationManager; + +using ReadCallback = interface::ConsumerSocket::ReadCallback; + +class TransportProtocolCallback { + virtual void onContentObject(const core::Interest &interest, + const core::ContentObject &content_object) = 0; + virtual void onTimeout(const core::Interest &interest) = 0; +}; + +class TransportProtocol : public core::Portal::ConsumerCallback, + public ContentObjectProcessingEventCallback { + static constexpr std::size_t interest_pool_size = 4096; + + friend class ManifestIndexManager; + + public: + TransportProtocol(implementation::ConsumerSocket *icn_socket, + Reassembly *reassembly_protocol); + + virtual ~TransportProtocol() = default; + + TRANSPORT_ALWAYS_INLINE bool isRunning() { return is_running_; } + + virtual int start(); + + virtual void stop(); + + virtual void resume(); + + virtual void scheduleNextInterests() = 0; + + // Events generated by the indexing + virtual void onContentReassembled(std::error_code ec); + virtual void onPacketDropped(Interest &interest, + ContentObject &content_object) override = 0; + virtual void onReassemblyFailed(std::uint32_t missing_segment) override = 0; + + protected: + // Consumer Callback + virtual void reset() = 0; + virtual void onContentObject(Interest &i, ContentObject &c) override = 0; + virtual void onTimeout(Interest::Ptr &&i) override = 0; + virtual void onError(std::error_code ec) override {} + + protected: + implementation::ConsumerSocket *socket_; + std::unique_ptr<Reassembly> reassembly_protocol_; + std::unique_ptr<IndexManager> index_manager_; + std::shared_ptr<core::Portal> portal_; + std::atomic<bool> is_running_; + // True if it si the first time we schedule an interest + std::atomic<bool> is_first_; + interface::TransportStatistics *stats_; + + // Callbacks + interface::ConsumerInterestCallback *on_interest_retransmission_; + interface::ConsumerInterestCallback *on_interest_output_; + interface::ConsumerInterestCallback *on_interest_timeout_; + interface::ConsumerInterestCallback *on_interest_satisfied_; + interface::ConsumerContentObjectCallback *on_content_object_input_; + interface::ConsumerContentObjectCallback *on_content_object_; + interface::ConsumerTimerCallback *stats_summary_; + ReadCallback *on_payload_; + + bool is_async_; +}; + +} // end namespace protocol +} // end namespace transport |