aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/protocols/transport_protocol.h
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/protocols/transport_protocol.h')
-rw-r--r--libtransport/src/protocols/transport_protocol.h74
1 files changed, 59 insertions, 15 deletions
diff --git a/libtransport/src/protocols/transport_protocol.h b/libtransport/src/protocols/transport_protocol.h
index 124c57122..1008a238b 100644
--- a/libtransport/src/protocols/transport_protocol.h
+++ b/libtransport/src/protocols/transport_protocol.h
@@ -21,9 +21,11 @@
#include <hicn/transport/utils/object_pool.h>
#include <implementation/socket.h>
#include <protocols/data_processing_events.h>
+#include <protocols/fec_base.h>
#include <protocols/indexer.h>
#include <protocols/reassembly.h>
+#include <array>
#include <atomic>
namespace transport {
@@ -36,12 +38,6 @@ class IndexVerificationManager;
using ReadCallback = interface::ConsumerSocket::ReadCallback;
-class TransportProtocolCallback {
- virtual void onContentObject(const core::Interest &interest,
- const core::ContentObject &content_object) = 0;
- virtual void onTimeout(const core::Interest &interest) = 0;
-};
-
class TransportProtocol : public core::Portal::ConsumerCallback,
public ContentObjectProcessingEventCallback {
static constexpr std::size_t interest_pool_size = 4096;
@@ -50,7 +46,7 @@ class TransportProtocol : public core::Portal::ConsumerCallback,
public:
TransportProtocol(implementation::ConsumerSocket *icn_socket,
- Reassembly *reassembly_protocol);
+ Indexer *indexer, Reassembly *reassembly);
virtual ~TransportProtocol() = default;
@@ -62,27 +58,70 @@ class TransportProtocol : public core::Portal::ConsumerCallback,
virtual void resume();
+ /**
+ * @brief Get the size of any additional header added by the specific
+ * transport implementation.
+ *
+ * @return The header length in bytes.
+ */
+ virtual std::size_t transportHeaderLength() { return 0; }
+
virtual void scheduleNextInterests() = 0;
// Events generated by the indexing
virtual void onContentReassembled(std::error_code ec);
virtual void onPacketDropped(Interest &interest,
- ContentObject &content_object) override = 0;
+ ContentObject &content_object,
+ const std::error_code &ec) override = 0;
virtual void onReassemblyFailed(std::uint32_t missing_segment) override = 0;
protected:
+ virtual void onContentObjectReceived(Interest &i, ContentObject &c,
+ std::error_code &ec) = 0;
+ virtual void onInterestTimeout(Interest::Ptr &i, const Name &n) = 0;
+
+ virtual void sendInterest(const Name &interest_name,
+ std::array<uint32_t, MAX_AGGREGATED_INTEREST>
+ *additional_suffixes = nullptr,
+ uint32_t len = 0);
+
+ template <typename FECHandler, typename AllocatorHandler>
+ void enableFEC(FECHandler &&fec_handler,
+ AllocatorHandler &&allocator_handler) {
+ if (!fec_decoder_) {
+ // Try to get FEC from environment
+ if (const char *fec_str = std::getenv("TRANSPORT_FEC_TYPE")) {
+ LOG(INFO) << "Using FEC " << fec_str;
+ fec_type_ = fec::FECUtils::fecTypeFromString(fec_str);
+ }
+
+ if (fec_type_ == fec::FECType::UNKNOWN) {
+ return;
+ }
+
+ fec_decoder_ = fec::FECUtils::getDecoder(
+ fec_type_, indexer_verifier_->getFirstSuffix());
+ fec_decoder_->setFECCallback(std::forward<FECHandler>(fec_handler));
+ fec_decoder_->setBufferCallback(
+ std::forward<AllocatorHandler>(allocator_handler));
+ indexer_verifier_->enableFec(fec_type_);
+ }
+ }
+
+ virtual void reset();
+
+ private:
// Consumer Callback
- virtual void reset() = 0;
- virtual void onContentObject(Interest &i, ContentObject &c) override = 0;
- virtual void onTimeout(Interest::Ptr &&i) override = 0;
- virtual void onError(std::error_code ec) override {}
+ void onContentObject(Interest &i, ContentObject &c) override;
+ void onTimeout(Interest::Ptr &i, const Name &n) override;
+ void onError(std::error_code ec) override {}
protected:
implementation::ConsumerSocket *socket_;
- std::unique_ptr<Reassembly> reassembly_protocol_;
- std::unique_ptr<IndexManager> index_manager_;
+ std::unique_ptr<Indexer> indexer_verifier_;
+ std::unique_ptr<Reassembly> reassembly_;
+ std::unique_ptr<fec::ConsumerFEC> fec_decoder_;
std::shared_ptr<core::Portal> portal_;
- std::atomic<bool> is_running_;
// True if it si the first time we schedule an interest
std::atomic<bool> is_first_;
interface::TransportStatistics *stats_;
@@ -98,6 +137,11 @@ class TransportProtocol : public core::Portal::ConsumerCallback,
ReadCallback *on_payload_;
bool is_async_;
+
+ fec::FECType fec_type_;
+
+ private:
+ std::atomic<bool> is_running_;
};
} // end namespace protocol