/*
 * Copyright (c) 2021 Cisco and/or its affiliates.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

// NOTE(review): the eleven #include directives below carry no header name in
// this copy of the file; the targets look like they were stripped during
// extraction. Restore them from the upstream header before building. Do not
// guess the project-local paths.
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

namespace transport {

namespace protocol {

// NOTE(review): a using-directive at namespace scope in a header makes every
// name of `core` visible to all files that include this header.
using namespace core;

// Forward declaration; not referenced again in this header.
class IndexVerificationManager;

// Shorthand for the read callback type nested in the consumer socket
// interface.
using ReadCallback = interface::ConsumerSocket::ReadCallback;

/**
 * @brief Abstract base class for transport protocols attached to a consumer
 * socket.
 *
 * The class combines three bases: the content-object processing event
 * callback, the generic Protocol base, and std::enable_shared_from_this.
 * It declares the pure virtual hooks a concrete protocol must implement
 * (interest scheduling, content-object reception, interest timeout, packet
 * drop and reassembly failure), and holds the state shared by those
 * implementations: the socket, the indexer, the reassembly object, the
 * optional FEC decoder and the user callbacks.
 *
 * NOTE(review): the template argument lists in this class (on
 * std::enable_shared_from_this, std::array, std::unique_ptr, std::atomic,
 * std::shared_ptr, std::forward and the `template` header of enableFEC) are
 * missing in this copy of the file and must be restored from upstream.
 */
class TransportProtocol
    : public ContentObjectProcessingEventCallback,
      public Protocol,
      public std::enable_shared_from_this {
  // Compile-time constant; not referenced elsewhere in this header.
  static constexpr std::size_t interest_pool_size = 4096;

  // Grants ManifestIndexManager access to private and protected members.
  friend class ManifestIndexManager;

 public:
  /**
   * @brief Construct the protocol for a given consumer socket.
   *
   * @param icn_socket Consumer socket implementation this protocol works for.
   * @param indexer Indexer instance. Presumably stored in indexer_verifier_,
   * which would transfer ownership; confirm in the constructor definition.
   * @param reassembly Reassembly instance. Presumably stored in reassembly_,
   * with the same ownership caveat.
   */
  TransportProtocol(implementation::ConsumerSocket *icn_socket,
                    Indexer *indexer, Reassembly *reassembly);

  virtual ~TransportProtocol();

  /**
   * @brief Start the protocol. Defined out of line.
   *
   * @return An int whose meaning is not visible from this header.
   */
  virtual int start();

  // Keep the stop() overloads of the Protocol base visible in this class.
  using Protocol::stop;

  /**
   * @brief Resume the protocol. Defined out of line.
   */
  virtual void resume();

  /**
   * @brief Get the size of any additional header added by the specific
   * transport implementation.
   *
   * This default implementation ignores its argument and reports no
   * additional header.
   *
   * @param isFEC Unused by the default implementation.
   *
   * @return The header length in bytes.
   */
  virtual std::size_t transportHeaderLength(bool isFEC) { return 0; }

  /**
   * @brief Schedule the next interests. Must be implemented by the concrete
   * protocol.
   */
  virtual void scheduleNextInterests() = 0;

  // Events generated by the indexing

  /**
   * @brief Notification carrying the error code associated with content
   * reassembly. Defined out of line.
   */
  virtual void onContentReassembled(const std::error_code &ec);

  /**
   * @brief Notification that a packet was dropped. Re-declared pure here, so
   * concrete protocols must provide it.
   *
   * @param interest Interest associated with the dropped packet.
   * @param content_object The content object involved.
   * @param ec Error code describing the drop.
   */
  virtual void onPacketDropped(Interest &interest,
                               ContentObject &content_object,
                               const std::error_code &ec) override = 0;

  /**
   * @brief Notification that reassembly failed. Re-declared pure here, so
   * concrete protocols must provide it.
   *
   * @param missing_segment Identifier of the segment reported as missing.
   */
  virtual void onReassemblyFailed(std::uint32_t missing_segment) override = 0;

 protected:
  /**
   * @brief Protocol-specific handling of a received content object.
   *
   * @param i Interest the content object relates to.
   * @param c The received content object.
   * @param ec Error code passed by non-const reference, so the callee is able
   * to modify it.
   */
  virtual void onContentObjectReceived(Interest &i, ContentObject &c,
                                       std::error_code &ec) = 0;

  /**
   * @brief Protocol-specific handling of an interest timeout.
   *
   * @param i The interest that timed out.
   * @param n Name associated with that interest.
   */
  virtual void onInterestTimeout(Interest::Ptr &i, const Name &n) = 0;

  /**
   * @brief Send an interest for the given name. Defined out of line.
   *
   * @param interest_name Name to request.
   * @param additional_suffixes Optional array of extra suffixes; defaults to
   * nullptr. NOTE(review): the element type and size of the std::array are
   * missing in this copy.
   * @param len Defaults to 0. Presumably the number of valid entries in
   * additional_suffixes; confirm in the definition.
   */
  virtual void sendInterest(const Name &interest_name,
                            std::array *additional_suffixes = nullptr,
                            uint32_t len = 0);

  /**
   * @brief Create and configure the FEC decoder, if not already done.
   *
   * Does nothing when fec_decoder_ is already set. Otherwise:
   *  1. Reads the TRANSPORT_FEC_TYPE environment variable. The value is used
   *     only when fec_type_ is still UNKNOWN, so a type that is already
   *     configured takes precedence over the environment.
   *  2. Returns without creating a decoder if fec_type_ is still UNKNOWN.
   *  3. Builds the decoder from fec_type_ and the indexer's first suffix,
   *     installs both handlers on it, and enables FEC on the indexer.
   *
   * @param fec_handler Forwarded to the decoder's setFECCallback().
   * @param allocator_handler Forwarded to the decoder's setBufferCallback().
   *
   * NOTE(review): the template parameter list and the explicit template
   * arguments of the two std::forward calls are missing in this copy.
   */
  template
  void enableFEC(FECHandler &&fec_handler,
                 AllocatorHandler &&allocator_handler) {
    if (!fec_decoder_) {
      // Try to get FEC from environment
      const char *fec_str = std::getenv("TRANSPORT_FEC_TYPE");
      if (fec_str && (fec_type_ == fec::FECType::UNKNOWN)) {
        LOG(INFO) << "Using FEC " << fec_str;
        fec_type_ = fec::FECUtils::fecTypeFromString(fec_str);
      }

      // No FEC type configured and none taken from the environment: leave
      // FEC disabled.
      if (fec_type_ == fec::FECType::UNKNOWN) {
        return;
      }

      // The decoder is dereferenced right away: this assumes getDecoder()
      // never returns null for a non-UNKNOWN type -- TODO confirm.
      fec_decoder_ = fec::FECUtils::getDecoder(
          fec_type_, indexer_verifier_->getFirstSuffix());
      fec_decoder_->setFECCallback(std::forward(fec_handler));
      fec_decoder_->setBufferCallback(
          std::forward(allocator_handler));
      indexer_verifier_->enableFec(fec_type_);
    }
  }

  /**
   * @brief Reset the protocol. Defined out of line.
   */
  virtual void reset();

 private:
  // Consumer Callback
  // Overrides of base-class callbacks, kept private in this class.
  void onContentObject(Interest &i, ContentObject &c) override;
  void onTimeout(Interest::Ptr &i, const Name &n) override;
  void onError(const std::error_code &ec) override;

 protected:
  // NOTE(review): none of the members below has an in-class initializer;
  // presumably they are all set by the constructor, which is defined
  // elsewhere -- confirm.

  // Consumer socket this protocol is attached to (raw pointer).
  implementation::ConsumerSocket *socket_;
  // NOTE(review): the pointee types of the three std::unique_ptr members are
  // missing in this copy.
  std::unique_ptr indexer_verifier_;
  std::unique_ptr reassembly_;
  // Null until enableFEC() creates the decoder.
  std::unique_ptr fec_decoder_;
  // True if it is the first time we schedule an interest
  // NOTE(review): the value type of the std::atomic is missing in this copy.
  std::atomic is_first_;
  interface::TransportStatistics *stats_;

  // Callbacks
  // Raw pointers; who owns the pointees, and how long they live, is not
  // visible from this header.
  interface::ConsumerInterestCallback *on_interest_retransmission_;
  interface::ConsumerInterestCallback *on_interest_output_;
  interface::ConsumerInterestCallback *on_interest_timeout_;
  interface::ConsumerInterestCallback *on_interest_satisfied_;
  interface::ConsumerContentObjectCallback *on_content_object_input_;
  interface::ConsumerContentObjectCallback *on_content_object_;
  interface::ConsumerTimerCallback *stats_summary_;
  interface::StrategyCallback *on_fwd_strategy_;
  interface::StrategyCallback *on_rec_strategy_;
  ReadCallback *on_payload_;

  bool is_async_;
  // FEC type in use; enableFEC() treats UNKNOWN as "FEC disabled".
  fec::FECType fec_type_;

  // Signer for aggregated interests
  // NOTE(review): the pointee type of the std::shared_ptr is missing in this
  // copy.
  std::shared_ptr signer_;
};

}  // end namespace protocol

}  // end namespace transport