diff options
Diffstat (limited to 'libtransport/src/protocols/transport_protocol.h')
-rw-r--r-- | libtransport/src/protocols/transport_protocol.h | 150 |
1 files changed, 150 insertions, 0 deletions
diff --git a/libtransport/src/protocols/transport_protocol.h b/libtransport/src/protocols/transport_protocol.h new file mode 100644 index 000000000..e71992561 --- /dev/null +++ b/libtransport/src/protocols/transport_protocol.h @@ -0,0 +1,150 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/interfaces/callbacks.h> +#include <hicn/transport/interfaces/socket_consumer.h> +#include <hicn/transport/interfaces/statistics.h> +#include <hicn/transport/utils/object_pool.h> +#include <protocols/data_processing_events.h> +#include <protocols/fec_base.h> +#include <protocols/indexer.h> +#include <protocols/protocol.h> +#include <protocols/reassembly.h> + +#include <array> +#include <atomic> + +namespace transport { + +namespace protocol { + +using namespace core; + +class IndexVerificationManager; + +using ReadCallback = interface::ConsumerSocket::ReadCallback; + +class TransportProtocol + : public ContentObjectProcessingEventCallback, + public Protocol, + public std::enable_shared_from_this<TransportProtocol> { + static constexpr std::size_t interest_pool_size = 4096; + + friend class ManifestIndexManager; + + public: + TransportProtocol(implementation::ConsumerSocket *icn_socket, + Indexer *indexer, Reassembly *reassembly); + + virtual ~TransportProtocol(); + + virtual int start(); + + using Protocol::stop; + + virtual void resume(); + + /** + * @brief Get the size of any additional header added by the specific + * transport implementation. + * + * @return The header length in bytes. + */ + virtual std::size_t transportHeaderLength(bool isFEC) { return 0; } + + virtual void scheduleNextInterests() = 0; + + // Events generated by the indexing + virtual void onContentReassembled(const std::error_code &ec); + virtual void onPacketDropped(Interest &interest, + ContentObject &content_object, + const std::error_code &ec) override = 0; + virtual void onReassemblyFailed(std::uint32_t missing_segment) override = 0; + + protected: + virtual void onContentObjectReceived(Interest &i, ContentObject &c, + std::error_code &ec) = 0; + virtual void onInterestTimeout(Interest::Ptr &i, const Name &n) = 0; + + virtual void sendInterest(const Name &interest_name, + std::array<uint32_t, MAX_AGGREGATED_INTEREST> + *additional_suffixes = nullptr, + uint32_t len = 0); + + template <typename FECHandler, typename AllocatorHandler> + void enableFEC(FECHandler &&fec_handler, + AllocatorHandler &&allocator_handler) { + if (!fec_decoder_) { + // Try to get FEC from environment + const char *fec_str = std::getenv("TRANSPORT_FEC_TYPE"); + if (fec_str && (fec_type_ == fec::FECType::UNKNOWN)) { + LOG(INFO) << "Using FEC " << fec_str; + fec_type_ = fec::FECUtils::fecTypeFromString(fec_str); + } + + if (fec_type_ == fec::FECType::UNKNOWN) { + return; + } + + fec_decoder_ = fec::FECUtils::getDecoder( + fec_type_, indexer_verifier_->getFirstSuffix()); + fec_decoder_->setFECCallback(std::forward<FECHandler>(fec_handler)); + fec_decoder_->setBufferCallback( + std::forward<AllocatorHandler>(allocator_handler)); + indexer_verifier_->enableFec(fec_type_); + } + } + + virtual void reset(); + + private: + // Consumer Callback + void onContentObject(Interest &i, ContentObject &c) override; + void onTimeout(Interest::Ptr &i, const Name &n) override; + void onError(const std::error_code &ec) override; + + protected: + implementation::ConsumerSocket *socket_; + std::unique_ptr<Indexer> indexer_verifier_; + std::unique_ptr<Reassembly> reassembly_; + std::unique_ptr<fec::ConsumerFEC> fec_decoder_; + // True if it si the first time we schedule an interest + std::atomic<bool> is_first_; + interface::TransportStatistics *stats_; + + // Callbacks + interface::ConsumerInterestCallback *on_interest_retransmission_; + interface::ConsumerInterestCallback *on_interest_output_; + interface::ConsumerInterestCallback *on_interest_timeout_; + interface::ConsumerInterestCallback *on_interest_satisfied_; + interface::ConsumerContentObjectCallback *on_content_object_input_; + interface::ConsumerContentObjectCallback *on_content_object_; + interface::ConsumerTimerCallback *stats_summary_; + interface::StrategyCallback *on_fwd_strategy_; + interface::StrategyCallback *on_rec_strategy_; + ReadCallback *on_payload_; + + bool is_async_; + + fec::FECType fec_type_; + + // Signer for aggregated interests + std::shared_ptr<auth::Signer> signer_; +}; + +} // end namespace protocol +} // end namespace transport |