aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/protocols/transport_protocol.h
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/protocols/transport_protocol.h')
-rw-r--r--libtransport/src/protocols/transport_protocol.h150
1 files changed, 150 insertions, 0 deletions
diff --git a/libtransport/src/protocols/transport_protocol.h b/libtransport/src/protocols/transport_protocol.h
new file mode 100644
index 000000000..e71992561
--- /dev/null
+++ b/libtransport/src/protocols/transport_protocol.h
@@ -0,0 +1,150 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/interfaces/callbacks.h>
+#include <hicn/transport/interfaces/socket_consumer.h>
+#include <hicn/transport/interfaces/statistics.h>
+#include <hicn/transport/utils/object_pool.h>
+#include <protocols/data_processing_events.h>
+#include <protocols/fec_base.h>
+#include <protocols/indexer.h>
+#include <protocols/protocol.h>
+#include <protocols/reassembly.h>
+
+#include <array>
+#include <atomic>
+
+namespace transport {
+
+namespace protocol {
+
+using namespace core;
+
+class IndexVerificationManager;
+
+using ReadCallback = interface::ConsumerSocket::ReadCallback;
+
+class TransportProtocol
+ : public ContentObjectProcessingEventCallback,
+ public Protocol,
+ public std::enable_shared_from_this<TransportProtocol> {
+ static constexpr std::size_t interest_pool_size = 4096;
+
+ friend class ManifestIndexManager;
+
+ public:
+ TransportProtocol(implementation::ConsumerSocket *icn_socket,
+ Indexer *indexer, Reassembly *reassembly);
+
+ virtual ~TransportProtocol();
+
+ virtual int start();
+
+ using Protocol::stop;
+
+ virtual void resume();
+
+ /**
+ * @brief Get the size of any additional header added by the specific
+ * transport implementation.
+ *
+ * @return The header length in bytes.
+ */
+ virtual std::size_t transportHeaderLength(bool isFEC) { return 0; }
+
+ virtual void scheduleNextInterests() = 0;
+
+ // Events generated by the indexing
+ virtual void onContentReassembled(const std::error_code &ec);
+ virtual void onPacketDropped(Interest &interest,
+ ContentObject &content_object,
+ const std::error_code &ec) override = 0;
+ virtual void onReassemblyFailed(std::uint32_t missing_segment) override = 0;
+
+ protected:
+ virtual void onContentObjectReceived(Interest &i, ContentObject &c,
+ std::error_code &ec) = 0;
+ virtual void onInterestTimeout(Interest::Ptr &i, const Name &n) = 0;
+
+ virtual void sendInterest(const Name &interest_name,
+ std::array<uint32_t, MAX_AGGREGATED_INTEREST>
+ *additional_suffixes = nullptr,
+ uint32_t len = 0);
+
+ template <typename FECHandler, typename AllocatorHandler>
+ void enableFEC(FECHandler &&fec_handler,
+ AllocatorHandler &&allocator_handler) {
+ if (!fec_decoder_) {
+ // Try to get FEC from environment
+ const char *fec_str = std::getenv("TRANSPORT_FEC_TYPE");
+ if (fec_str && (fec_type_ == fec::FECType::UNKNOWN)) {
+ LOG(INFO) << "Using FEC " << fec_str;
+ fec_type_ = fec::FECUtils::fecTypeFromString(fec_str);
+ }
+
+ if (fec_type_ == fec::FECType::UNKNOWN) {
+ return;
+ }
+
+ fec_decoder_ = fec::FECUtils::getDecoder(
+ fec_type_, indexer_verifier_->getFirstSuffix());
+ fec_decoder_->setFECCallback(std::forward<FECHandler>(fec_handler));
+ fec_decoder_->setBufferCallback(
+ std::forward<AllocatorHandler>(allocator_handler));
+ indexer_verifier_->enableFec(fec_type_);
+ }
+ }
+
+ virtual void reset();
+
+ private:
+ // Consumer Callback
+ void onContentObject(Interest &i, ContentObject &c) override;
+ void onTimeout(Interest::Ptr &i, const Name &n) override;
+ void onError(const std::error_code &ec) override;
+
+ protected:
+ implementation::ConsumerSocket *socket_;
+ std::unique_ptr<Indexer> indexer_verifier_;
+ std::unique_ptr<Reassembly> reassembly_;
+ std::unique_ptr<fec::ConsumerFEC> fec_decoder_;
+ // True if it si the first time we schedule an interest
+ std::atomic<bool> is_first_;
+ interface::TransportStatistics *stats_;
+
+ // Callbacks
+ interface::ConsumerInterestCallback *on_interest_retransmission_;
+ interface::ConsumerInterestCallback *on_interest_output_;
+ interface::ConsumerInterestCallback *on_interest_timeout_;
+ interface::ConsumerInterestCallback *on_interest_satisfied_;
+ interface::ConsumerContentObjectCallback *on_content_object_input_;
+ interface::ConsumerContentObjectCallback *on_content_object_;
+ interface::ConsumerTimerCallback *stats_summary_;
+ interface::StrategyCallback *on_fwd_strategy_;
+ interface::StrategyCallback *on_rec_strategy_;
+ ReadCallback *on_payload_;
+
+ bool is_async_;
+
+ fec::FECType fec_type_;
+
+ // Signer for aggregated interests
+ std::shared_ptr<auth::Signer> signer_;
+};
+
+} // end namespace protocol
+} // end namespace transport