summaryrefslogtreecommitdiffstats
path: root/libtransport/src/protocols/fec/rely.cc
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/protocols/fec/rely.cc')
-rw-r--r--libtransport/src/protocols/fec/rely.cc103
1 files changed, 66 insertions, 37 deletions
diff --git a/libtransport/src/protocols/fec/rely.cc b/libtransport/src/protocols/fec/rely.cc
index 7a30a62e2..d4d98a90b 100644
--- a/libtransport/src/protocols/fec/rely.cc
+++ b/libtransport/src/protocols/fec/rely.cc
@@ -23,38 +23,39 @@ namespace transport {
namespace protocol {
namespace fec {
-RelyEncoder::RelyEncoder(uint32_t k, uint32_t n, uint32_t seq_offset)
+RelyEncoder::RelyEncoder(uint32_t k, uint32_t n, uint32_t /* seq_offset */)
: RelyBase(k, n) {
configure(kmtu, ktimeout, kmax_stream_size);
set_repair_trigger(k_, n_ - k_, n_ - k_);
}
void RelyEncoder::onPacketProduced(core::ContentObject &content_object,
- uint32_t offset) {
+ uint32_t offset, uint32_t metadata) {
// Get pointer to payload, leaving space to insert FEC header.
// TODO Check if this additional header is really needed.
- auto data = content_object.writableData() + offset - sizeof(fec_header);
- auto length = content_object.length() - offset + sizeof(fec_header);
+ auto data = content_object.writableData() + offset - sizeof(fec_metadata);
+ auto length = content_object.length() - offset + sizeof(fec_metadata);
// Check packet length does not exceed maximum length supported by the
// encoder (otherwise segmentation would take place).
- assert(length < max_packet_bytes());
+ DCHECK(length < max_packet_bytes());
DLOG_IF(INFO, VLOG_IS_ON(4))
- << "Encoding packet of length " << length - sizeof(fec_header);
+ << "Encoding packet of length " << length - sizeof(fec_metadata);
- // Get the suffix. With rely we need to write it in the fec_header in order to
- // be able to recognize the seq number upon recovery.
+ // Get the suffix. With rely we need to write it in the fec_metadata in order
+ // to be able to recognize the seq number upon recovery.
auto suffix = content_object.getName().getSuffix();
DLOG_IF(INFO, VLOG_IS_ON(4)) << "Producing packet " << suffix
<< " (index == " << current_index_ << ")";
- // Consume payload. Add fec_header in front before feeding payload to encoder,
- // and copy original content of packet
- fec_header *h = reinterpret_cast<fec_header *>(data);
- fec_header copy = *h;
+ // Consume payload. Add fec_metadata in front before feeding payload to
+ // encoder, and copy original content of packet
+ fec_metadata *h = reinterpret_cast<fec_metadata *>(data);
+ fec_metadata copy = *h;
h->setSeqNumberBase(suffix);
+ h->setMetadataBase(metadata);
auto packets = consume(data, length, getCurrentTime());
- assert(packets == 1);
+ DCHECK(packets == 1);
// Update packet counter
current_index_ += packets;
@@ -62,7 +63,7 @@ void RelyEncoder::onPacketProduced(core::ContentObject &content_object,
// Restore original packet content and increment data pointer to the correct
// position
*h = copy;
- data += sizeof(fec_header);
+ data += sizeof(fec_metadata);
// Check position of this packet inside N size block
auto i = current_index_ % n_;
@@ -74,24 +75,24 @@ void RelyEncoder::onPacketProduced(core::ContentObject &content_object,
// TODO Optimize it by copying only the RELY header
// Be sure encoder can produce
- assert(can_produce());
+ DCHECK(can_produce());
// Check new payload size and make sure it fits in packet buffer
auto new_payload_size = produce_bytes();
int difference = new_payload_size - length;
- assert(difference > 0);
- assert(content_object.ensureCapacity(difference));
+ DCHECK(difference > 0);
+ DCHECK(content_object.ensureCapacity(difference));
// Update length
DLOG_IF(INFO, VLOG_IS_ON(4)) << "The packet length will be incremented by "
- << difference + sizeof(fec_header);
- content_object.append(difference + sizeof(fec_header));
+ << difference + sizeof(fec_metadata);
+ content_object.append(difference + sizeof(fec_metadata));
content_object.updateLength();
// Make sure we got a source packet, otherwise we would put a repair symbol
// in a source packet
- assert(rely::packet_is_systematic(produce_data()));
+ DCHECK(rely::packet_is_systematic(produce_data()));
// Copy rely packet replacing old source packet.
std::memcpy(data, produce_data(), new_payload_size);
@@ -111,7 +112,7 @@ void RelyEncoder::onPacketProduced(core::ContentObject &content_object,
while (can_produce()) {
// The current index MUST be k_, because we enforce n - k repair to be
// produced after k sources
- assert(current_index_ == k_);
+ DCHECK(current_index_ == k_);
buffer packet;
if (!buffer_callback_) {
@@ -130,7 +131,7 @@ void RelyEncoder::onPacketProduced(core::ContentObject &content_object,
std::memcpy(packet->writableData(), produce_data(), produce_bytes());
// Push symbol in repair_packets
- packets_.emplace_back(0, std::move(packet));
+ packets_.emplace_back(0, metadata, std::move(packet));
// Advance the encoder
produce_next();
@@ -143,7 +144,7 @@ void RelyEncoder::onPacketProduced(core::ContentObject &content_object,
// If we have generated repair symbols, let's notify caller via the installed
// callback
if (packets_.size()) {
- assert(packets_.size() == n_ - k_);
+ DCHECK(packets_.size() == n_ - k_);
fec_callback_(packets_);
packets_.clear();
current_index_ = 0;
@@ -156,7 +157,7 @@ RelyDecoder::RelyDecoder(uint32_t k, uint32_t n, uint32_t seq_offset)
}
void RelyDecoder::onDataPacket(core::ContentObject &content_object,
- uint32_t offset) {
+ uint32_t offset, uint32_t metadata) {
// Adjust pointers to point to packet payload
auto data = content_object.writableData() + offset;
auto size = content_object.length() - offset;
@@ -164,30 +165,38 @@ void RelyDecoder::onDataPacket(core::ContentObject &content_object,
// Pass payload to decoder
consume(data, size, getCurrentTime());
+ producePackets();
+}
+
+void RelyDecoder::producePackets() {
// Drain decoder if possible
while (can_produce()) {
- // Get size of decoded packet
- auto size = produce_bytes();
-
- // Get buffer to copy packet in
- auto packet = core::PacketManager<>::getInstance().getMemBuf();
+ auto fec_header_size = sizeof(fec_metadata);
+ auto payload_size = produce_bytes() - sizeof(fec_metadata);
- // Copy buffer
- packet->append(size);
- std::memcpy(packet->writableData(), produce_data(), size);
+ buffer packet;
+ if (!buffer_callback_) {
+ packet = core::PacketManager<>::getInstance().getMemBuf();
+ packet->append(payload_size);
+ } else {
+ packet = buffer_callback_(payload_size);
+ }
// Read seq number
- fec_header *h = reinterpret_cast<fec_header *>(packet->writableData());
+ const fec_metadata *h =
+ reinterpret_cast<const fec_metadata *>(produce_data());
uint32_t index = h->getSeqNumberBase();
+ uint32_t metadata = h->getMetadataBase();
DLOG_IF(INFO, VLOG_IS_ON(4))
<< "The index written in the packet is " << index;
- // Remove FEC header
- packet->trimStart(sizeof(fec_header));
+ // Copy payload
+ std::memcpy(packet->writableData(), produce_data() + fec_header_size,
+ payload_size);
// Save packet in buffer
- packets_.emplace_back(index, std::move(packet));
+ packets_.emplace_back(index, metadata, std::move(packet));
// Advance to next packet
produce_next();
@@ -198,8 +207,28 @@ void RelyDecoder::onDataPacket(core::ContentObject &content_object,
fec_callback_(packets_);
packets_.clear();
}
+
+ flushOutOfOrder();
+}
+
+void RelyDecoder::flushOutOfOrder() {
+ if (flush_timer_ == nullptr) return;
+ flush_timer_->cancel();
+
+ if (has_upcoming_flush()) {
+ flush_timer_->expires_from_now(std::chrono::milliseconds(
+ std::max((int64_t)0, upcoming_flush(getCurrentTime()))));
+
+ flush_timer_->async_wait([this](const std::error_code &ec) {
+ if (ec) return;
+ if (has_upcoming_flush()) {
+ flush(getCurrentTime());
+ producePackets();
+ }
+ });
+ }
}
} // namespace fec
} // namespace protocol
-} // namespace transport \ No newline at end of file
+} // namespace transport