diff options
Diffstat (limited to 'libtransport/src/protocols/transport_protocol.h')
-rw-r--r-- | libtransport/src/protocols/transport_protocol.h | 74 |
1 files changed, 59 insertions, 15 deletions
diff --git a/libtransport/src/protocols/transport_protocol.h b/libtransport/src/protocols/transport_protocol.h index 124c57122..1008a238b 100644 --- a/libtransport/src/protocols/transport_protocol.h +++ b/libtransport/src/protocols/transport_protocol.h @@ -21,9 +21,11 @@ #include <hicn/transport/utils/object_pool.h> #include <implementation/socket.h> #include <protocols/data_processing_events.h> +#include <protocols/fec_base.h> #include <protocols/indexer.h> #include <protocols/reassembly.h> +#include <array> #include <atomic> namespace transport { @@ -36,12 +38,6 @@ class IndexVerificationManager; using ReadCallback = interface::ConsumerSocket::ReadCallback; -class TransportProtocolCallback { - virtual void onContentObject(const core::Interest &interest, - const core::ContentObject &content_object) = 0; - virtual void onTimeout(const core::Interest &interest) = 0; -}; - class TransportProtocol : public core::Portal::ConsumerCallback, public ContentObjectProcessingEventCallback { static constexpr std::size_t interest_pool_size = 4096; @@ -50,7 +46,7 @@ class TransportProtocol : public core::Portal::ConsumerCallback, public: TransportProtocol(implementation::ConsumerSocket *icn_socket, - Reassembly *reassembly_protocol); + Indexer *indexer, Reassembly *reassembly); virtual ~TransportProtocol() = default; @@ -62,27 +58,70 @@ class TransportProtocol : public core::Portal::ConsumerCallback, virtual void resume(); + /** + * @brief Get the size of any additional header added by the specific + * transport implementation. + * + * @return The header length in bytes. + */ + virtual std::size_t transportHeaderLength() { return 0; } + virtual void scheduleNextInterests() = 0; // Events generated by the indexing virtual void onContentReassembled(std::error_code ec); virtual void onPacketDropped(Interest &interest, - ContentObject &content_object) override = 0; + ContentObject &content_object, + const std::error_code &ec) override = 0; virtual void onReassemblyFailed(std::uint32_t missing_segment) override = 0; protected: + virtual void onContentObjectReceived(Interest &i, ContentObject &c, + std::error_code &ec) = 0; + virtual void onInterestTimeout(Interest::Ptr &i, const Name &n) = 0; + + virtual void sendInterest(const Name &interest_name, + std::array<uint32_t, MAX_AGGREGATED_INTEREST> + *additional_suffixes = nullptr, + uint32_t len = 0); + + template <typename FECHandler, typename AllocatorHandler> + void enableFEC(FECHandler &&fec_handler, + AllocatorHandler &&allocator_handler) { + if (!fec_decoder_) { + // Try to get FEC from environment + if (const char *fec_str = std::getenv("TRANSPORT_FEC_TYPE")) { + LOG(INFO) << "Using FEC " << fec_str; + fec_type_ = fec::FECUtils::fecTypeFromString(fec_str); + } + + if (fec_type_ == fec::FECType::UNKNOWN) { + return; + } + + fec_decoder_ = fec::FECUtils::getDecoder( + fec_type_, indexer_verifier_->getFirstSuffix()); + fec_decoder_->setFECCallback(std::forward<FECHandler>(fec_handler)); + fec_decoder_->setBufferCallback( + std::forward<AllocatorHandler>(allocator_handler)); + indexer_verifier_->enableFec(fec_type_); + } + } + + virtual void reset(); + + private: // Consumer Callback - virtual void reset() = 0; - virtual void onContentObject(Interest &i, ContentObject &c) override = 0; - virtual void onTimeout(Interest::Ptr &&i) override = 0; - virtual void onError(std::error_code ec) override {} + void onContentObject(Interest &i, ContentObject &c) override; + void onTimeout(Interest::Ptr &i, const Name &n) override; + void onError(std::error_code ec) override {} protected: implementation::ConsumerSocket *socket_; - std::unique_ptr<Reassembly> reassembly_protocol_; - std::unique_ptr<IndexManager> index_manager_; + std::unique_ptr<Indexer> indexer_verifier_; + std::unique_ptr<Reassembly> reassembly_; + std::unique_ptr<fec::ConsumerFEC> fec_decoder_; std::shared_ptr<core::Portal> portal_; - std::atomic<bool> is_running_; // True if it si the first time we schedule an interest std::atomic<bool> is_first_; interface::TransportStatistics *stats_; @@ -98,6 +137,11 @@ class TransportProtocol : public core::Portal::ConsumerCallback, ReadCallback *on_payload_; bool is_async_; + + fec::FECType fec_type_; + + private: + std::atomic<bool> is_running_; }; } // end namespace protocol |