diff options
Diffstat (limited to 'libtransport/src/protocols/fec/rely.cc')
-rw-r--r-- | libtransport/src/protocols/fec/rely.cc | 103 |
1 files changed, 66 insertions, 37 deletions
diff --git a/libtransport/src/protocols/fec/rely.cc b/libtransport/src/protocols/fec/rely.cc index 7a30a62e2..d4d98a90b 100644 --- a/libtransport/src/protocols/fec/rely.cc +++ b/libtransport/src/protocols/fec/rely.cc @@ -23,38 +23,39 @@ namespace transport { namespace protocol { namespace fec { -RelyEncoder::RelyEncoder(uint32_t k, uint32_t n, uint32_t seq_offset) +RelyEncoder::RelyEncoder(uint32_t k, uint32_t n, uint32_t /* seq_offset */) : RelyBase(k, n) { configure(kmtu, ktimeout, kmax_stream_size); set_repair_trigger(k_, n_ - k_, n_ - k_); } void RelyEncoder::onPacketProduced(core::ContentObject &content_object, - uint32_t offset) { + uint32_t offset, uint32_t metadata) { // Get pointer to payload, leaving space to insert FEC header. // TODO Check if this additional header is really needed. - auto data = content_object.writableData() + offset - sizeof(fec_header); - auto length = content_object.length() - offset + sizeof(fec_header); + auto data = content_object.writableData() + offset - sizeof(fec_metadata); + auto length = content_object.length() - offset + sizeof(fec_metadata); // Check packet length does not exceed maximum length supported by the // encoder (otherwise segmentation would take place). - assert(length < max_packet_bytes()); + DCHECK(length < max_packet_bytes()); DLOG_IF(INFO, VLOG_IS_ON(4)) - << "Encoding packet of length " << length - sizeof(fec_header); + << "Encoding packet of length " << length - sizeof(fec_metadata); - // Get the suffix. With rely we need to write it in the fec_header in order to - // be able to recognize the seq number upon recovery. + // Get the suffix. With rely we need to write it in the fec_metadata in order + // to be able to recognize the seq number upon recovery. auto suffix = content_object.getName().getSuffix(); DLOG_IF(INFO, VLOG_IS_ON(4)) << "Producing packet " << suffix << " (index == " << current_index_ << ")"; - // Consume payload. Add fec_header in front before feeding payload to encoder, - // and copy original content of packet - fec_header *h = reinterpret_cast<fec_header *>(data); - fec_header copy = *h; + // Consume payload. Add fec_metadata in front before feeding payload to + // encoder, and copy original content of packet + fec_metadata *h = reinterpret_cast<fec_metadata *>(data); + fec_metadata copy = *h; h->setSeqNumberBase(suffix); + h->setMetadataBase(metadata); auto packets = consume(data, length, getCurrentTime()); - assert(packets == 1); + DCHECK(packets == 1); // Update packet counter current_index_ += packets; @@ -62,7 +63,7 @@ void RelyEncoder::onPacketProduced(core::ContentObject &content_object, // Restore original packet content and increment data pointer to the correct // position *h = copy; - data += sizeof(fec_header); + data += sizeof(fec_metadata); // Check position of this packet inside N size block auto i = current_index_ % n_; @@ -74,24 +75,24 @@ void RelyEncoder::onPacketProduced(core::ContentObject &content_object, // TODO Optimize it by copying only the RELY header // Be sure encoder can produce - assert(can_produce()); + DCHECK(can_produce()); // Check new payload size and make sure it fits in packet buffer auto new_payload_size = produce_bytes(); int difference = new_payload_size - length; - assert(difference > 0); - assert(content_object.ensureCapacity(difference)); + DCHECK(difference > 0); + DCHECK(content_object.ensureCapacity(difference)); // Update length DLOG_IF(INFO, VLOG_IS_ON(4)) << "The packet length will be incremented by " - << difference + sizeof(fec_header); - content_object.append(difference + sizeof(fec_header)); + << difference + sizeof(fec_metadata); + content_object.append(difference + sizeof(fec_metadata)); content_object.updateLength(); // Make sure we got a source packet, otherwise we would put a repair symbol // in a source packet - assert(rely::packet_is_systematic(produce_data())); + DCHECK(rely::packet_is_systematic(produce_data())); // Copy rely packet replacing old source packet. std::memcpy(data, produce_data(), new_payload_size); @@ -111,7 +112,7 @@ void RelyEncoder::onPacketProduced(core::ContentObject &content_object, while (can_produce()) { // The current index MUST be k_, because we enforce n - k repair to be // produced after k sources - assert(current_index_ == k_); + DCHECK(current_index_ == k_); buffer packet; if (!buffer_callback_) { @@ -130,7 +131,7 @@ void RelyEncoder::onPacketProduced(core::ContentObject &content_object, std::memcpy(packet->writableData(), produce_data(), produce_bytes()); // Push symbol in repair_packets - packets_.emplace_back(0, std::move(packet)); + packets_.emplace_back(0, metadata, std::move(packet)); // Advance the encoder produce_next(); @@ -143,7 +144,7 @@ void RelyEncoder::onPacketProduced(core::ContentObject &content_object, // If we have generated repair symbols, let's notify caller via the installed // callback if (packets_.size()) { - assert(packets_.size() == n_ - k_); + DCHECK(packets_.size() == n_ - k_); fec_callback_(packets_); packets_.clear(); current_index_ = 0; @@ -156,7 +157,7 @@ RelyDecoder::RelyDecoder(uint32_t k, uint32_t n, uint32_t seq_offset) } void RelyDecoder::onDataPacket(core::ContentObject &content_object, - uint32_t offset) { + uint32_t offset, uint32_t metadata) { // Adjust pointers to point to packet payload auto data = content_object.writableData() + offset; auto size = content_object.length() - offset; @@ -164,30 +165,38 @@ void RelyDecoder::onDataPacket(core::ContentObject &content_object, // Pass payload to decoder consume(data, size, getCurrentTime()); + producePackets(); +} + +void RelyDecoder::producePackets() { // Drain decoder if possible while (can_produce()) { - // Get size of decoded packet - auto size = produce_bytes(); - - // Get buffer to copy packet in - auto packet = core::PacketManager<>::getInstance().getMemBuf(); + auto fec_header_size = sizeof(fec_metadata); + auto payload_size = produce_bytes() - sizeof(fec_metadata); - // Copy buffer - packet->append(size); - std::memcpy(packet->writableData(), produce_data(), size); + buffer packet; + if (!buffer_callback_) { + packet = core::PacketManager<>::getInstance().getMemBuf(); + packet->append(payload_size); + } else { + packet = buffer_callback_(payload_size); + } // Read seq number - fec_header *h = reinterpret_cast<fec_header *>(packet->writableData()); + const fec_metadata *h = + reinterpret_cast<const fec_metadata *>(produce_data()); uint32_t index = h->getSeqNumberBase(); + uint32_t metadata = h->getMetadataBase(); DLOG_IF(INFO, VLOG_IS_ON(4)) << "The index written in the packet is " << index; - // Remove FEC header - packet->trimStart(sizeof(fec_header)); + // Copy payload + std::memcpy(packet->writableData(), produce_data() + fec_header_size, + payload_size); // Save packet in buffer - packets_.emplace_back(index, std::move(packet)); + packets_.emplace_back(index, metadata, std::move(packet)); // Advance to next packet produce_next(); @@ -198,8 +207,28 @@ void RelyDecoder::onDataPacket(core::ContentObject &content_object, fec_callback_(packets_); packets_.clear(); } + + flushOutOfOrder(); +} + +void RelyDecoder::flushOutOfOrder() { + if (flush_timer_ == nullptr) return; + flush_timer_->cancel(); + + if (has_upcoming_flush()) { + flush_timer_->expires_from_now(std::chrono::milliseconds( + std::max((int64_t)0, upcoming_flush(getCurrentTime())))); + + flush_timer_->async_wait([this](const std::error_code &ec) { + if (ec) return; + if (has_upcoming_flush()) { + flush(getCurrentTime()); + producePackets(); + } + }); + } } } // namespace fec } // namespace protocol -} // namespace transport
\ No newline at end of file +} // namespace transport |