/* * Copyright (c) 2017-2019 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include #include #include #include #include #include #include #include #include #include #include namespace transport { namespace protocol { using namespace core; class IndexVerificationManager; using ReadCallback = interface::ConsumerSocket::ReadCallback; class TransportProtocol : public core::Portal::ConsumerCallback, public ContentObjectProcessingEventCallback { static constexpr std::size_t interest_pool_size = 4096; friend class ManifestIndexManager; public: TransportProtocol(implementation::ConsumerSocket *icn_socket, Indexer *indexer, Reassembly *reassembly); virtual ~TransportProtocol() = default; TRANSPORT_ALWAYS_INLINE bool isRunning() { return is_running_; } virtual int start(); virtual void stop(); virtual void resume(); /** * @brief Get the size of any additional header added by the specific * transport implementation. * * @return The header length in bytes. */ virtual std::size_t transportHeaderLength() { return 0; } virtual void scheduleNextInterests() = 0; // Events generated by the indexing virtual void onContentReassembled(std::error_code ec); virtual void onPacketDropped(Interest &interest, ContentObject &content_object, const std::error_code &ec) override = 0; virtual void onReassemblyFailed(std::uint32_t missing_segment) override = 0; protected: virtual void onContentObjectReceived(Interest &i, ContentObject &c, std::error_code &ec) = 0; virtual void onInterestTimeout(Interest::Ptr &i, const Name &n) = 0; virtual void sendInterest(const Name &interest_name, std::array *additional_suffixes = nullptr, uint32_t len = 0); template void enableFEC(FECHandler &&fec_handler, AllocatorHandler &&allocator_handler) { if (!fec_decoder_) { // Try to get FEC from environment if (const char *fec_str = std::getenv("TRANSPORT_FEC_TYPE")) { LOG(INFO) << "Using FEC " << fec_str; fec_type_ = fec::FECUtils::fecTypeFromString(fec_str); } if (fec_type_ == fec::FECType::UNKNOWN) { return; } fec_decoder_ = fec::FECUtils::getDecoder( fec_type_, indexer_verifier_->getFirstSuffix()); fec_decoder_->setFECCallback(std::forward(fec_handler)); fec_decoder_->setBufferCallback( std::forward(allocator_handler)); indexer_verifier_->enableFec(fec_type_); } } virtual void reset(); private: // Consumer Callback void onContentObject(Interest &i, ContentObject &c) override; void onTimeout(Interest::Ptr &i, const Name &n) override; void onError(std::error_code ec) override {} protected: implementation::ConsumerSocket *socket_; std::unique_ptr indexer_verifier_; std::unique_ptr reassembly_; std::unique_ptr fec_decoder_; std::shared_ptr portal_; // True if it si the first time we schedule an interest std::atomic is_first_; interface::TransportStatistics *stats_; // Callbacks interface::ConsumerInterestCallback *on_interest_retransmission_; interface::ConsumerInterestCallback *on_interest_output_; interface::ConsumerInterestCallback *on_interest_timeout_; interface::ConsumerInterestCallback *on_interest_satisfied_; interface::ConsumerContentObjectCallback *on_content_object_input_; interface::ConsumerContentObjectCallback *on_content_object_; interface::ConsumerTimerCallback *stats_summary_; ReadCallback *on_payload_; bool is_async_; fec::FECType fec_type_; private: std::atomic is_running_; }; } // end namespace protocol } // end namespace transport