aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/protocols/fec_base.h
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/protocols/fec_base.h')
-rw-r--r--libtransport/src/protocols/fec_base.h86
1 files changed, 78 insertions, 8 deletions
diff --git a/libtransport/src/protocols/fec_base.h b/libtransport/src/protocols/fec_base.h
index a1929d85e..28f6a820a 100644
--- a/libtransport/src/protocols/fec_base.h
+++ b/libtransport/src/protocols/fec_base.h
@@ -15,6 +15,7 @@
#pragma once
+#include <hicn/transport/core/asio_wrapper.h>
#include <hicn/transport/core/content_object.h>
#include <hicn/transport/errors/not_implemented_exception.h>
@@ -25,12 +26,67 @@ namespace protocol {
namespace fec {
-using buffer = typename utils::MemBuf::Ptr;
-using BufferArray = std::vector<std::pair<uint32_t, buffer>>;
+using buffer = utils::MemBuf::Ptr;
+
+class FECBufferInfo {
+ public:
+ FECBufferInfo()
+ : index_(~0), metadata_(~0), received_(false), buffer_(nullptr) {}
+
+ template <typename T>
+ FECBufferInfo(uint32_t index, uint32_t metadata, T &&buffer)
+ : index_(index),
+ metadata_(metadata),
+ received_(false),
+ buffer_(std::forward<T>(buffer)) {}
+
+ // Getters
+ uint32_t getIndex() const { return index_; }
+
+ uint32_t getMetadata() const { return metadata_; }
+
+ bool getReceived() const { return received_; }
+
+ buffer &getBuffer() { return buffer_; }
+
+ // Setters
+ void setReceived() { received_ = true; }
+
+ FECBufferInfo &setIndex(uint32_t index) {
+ index_ = index;
+ return *this;
+ }
+
+ FECBufferInfo &setMetadata(uint32_t metadata) {
+ metadata_ = metadata;
+ return *this;
+ }
+
+ FECBufferInfo &setBuffer(buffer &buffer) {
+ buffer_ = buffer;
+ return *this;
+ }
+
+ FECBufferInfo &setBuffer(buffer &&buffer) {
+ buffer_ = std::move(buffer);
+ return *this;
+ }
+
+ private:
+ uint32_t index_;
+ uint32_t metadata_;
+ bool received_;
+ buffer buffer_;
+};
+
+using BufferArray = typename std::vector<FECBufferInfo>;
class FECBase {
public:
- virtual ~FECBase() = default;
+ static inline uint32_t INVALID_METADATA = ~0;
+ static inline uint32_t INVALID_INDEX = ~0;
+
+ virtual ~FECBase() {}
/**
* Callback to be called after the encode or the decode operations. In the
* former case it will contain the symbols, while in the latter the sources.
@@ -45,8 +101,10 @@ class FECBase {
/**
* @brief Get size of FEC header.
+ * the fec header size may be different if a packet is a data packet or a FEC
+ * packet
*/
- virtual std::size_t getFecHeaderSize() = 0;
+ virtual std::size_t getFecHeaderSize(bool isFEC) = 0;
/**
* Set callback to call after packet encoding / decoding
@@ -64,11 +122,21 @@ class FECBase {
buffer_callback_ = buffer_callback;
}
+ /**
+ * Creates the timer to flush packets. So far needed only if using Rely and
+ * want to avoid expired packets blocked by missing pkts to wait for a new
+ * packet to arrive and trigger the flush
+ */
+ void setIOService(asio::io_service &io_service) {
+ flush_timer_ = std::make_unique<asio::steady_timer>(io_service);
+ }
+
virtual void reset() = 0;
protected:
PacketsReady fec_callback_{0};
BufferRequested buffer_callback_{0};
+ std::unique_ptr<asio::steady_timer> flush_timer_;
};
/**
@@ -80,8 +148,9 @@ class ProducerFEC : public virtual FECBase {
/**
* Producers will call this function upon production of a new packet.
*/
- virtual void onPacketProduced(core::ContentObject &content_object,
- uint32_t offset) = 0;
+ virtual void onPacketProduced(
+ core::ContentObject &content_object, uint32_t offset,
+ uint32_t metadata = FECBase::INVALID_METADATA) = 0;
};
/**
@@ -95,9 +164,10 @@ class ConsumerFEC : public virtual FECBase {
* Consumers will call this function when they receive a data packet
*/
virtual void onDataPacket(core::ContentObject &content_object,
- uint32_t offset) = 0;
+ uint32_t offset,
+ uint32_t metadata = FECBase::INVALID_METADATA) = 0;
};
} // namespace fec
} // namespace protocol
-} // namespace transport \ No newline at end of file
+} // namespace transport