diff options
Diffstat (limited to 'libtransport/src/protocols/fec/rs.cc')
-rw-r--r-- | libtransport/src/protocols/fec/rs.cc | 161 |
1 files changed, 96 insertions, 65 deletions
diff --git a/libtransport/src/protocols/fec/rs.cc b/libtransport/src/protocols/fec/rs.cc index 2c23d515d..9c0a3d4fb 100644 --- a/libtransport/src/protocols/fec/rs.cc +++ b/libtransport/src/protocols/fec/rs.cc @@ -46,23 +46,26 @@ bool BlockCode::addRepairSymbol(const fec::buffer &packet, uint32_t i, to_decode_ = true; DLOG_IF(INFO, VLOG_IS_ON(4)) << "Adding symbol of size " << packet->length(); return addSymbol(packet, i, offset, - packet->length() - sizeof(fec_header) - offset); + packet->length() - sizeof(fec_header) - offset, + FECBase::INVALID_METADATA); } bool BlockCode::addSourceSymbol(const fec::buffer &packet, uint32_t i, - uint32_t offset) { + uint32_t offset, uint32_t metadata) { DLOG_IF(INFO, VLOG_IS_ON(4)) << "Adding source symbol of size " << packet->length() << ", offset " << offset; - return addSymbol(packet, i, offset, packet->length() - offset); + return addSymbol(packet, i, offset, packet->length() - offset, metadata); } bool BlockCode::addSymbol(const fec::buffer &packet, uint32_t i, - uint32_t offset, std::size_t size) { + uint32_t offset, std::size_t size, + uint32_t metadata) { if (size > max_buffer_size_) { max_buffer_size_ = size; } - operator[](current_block_size_++) = std::make_tuple(i, packet, offset); + operator[](current_block_size_) = RSBufferInfo(offset, i, metadata, packet); + current_block_size_++; if (current_block_size_ >= k_) { if (to_decode_) { @@ -80,12 +83,13 @@ bool BlockCode::addSymbol(const fec::buffer &packet, uint32_t i, void BlockCode::encode() { gf *data[n_]; - uint32_t base = std::get<0>(operator[](0)); + uint32_t base = operator[](0).getIndex(); // Set packet length in first 2 bytes for (uint32_t i = 0; i < k_; i++) { - auto &packet = std::get<1>(operator[](i)); - auto offset = std::get<2>(operator[](i)); + auto &packet = operator[](i).getBuffer(); + auto offset = operator[](i).getOffset(); + auto metadata_base = operator[](i).getMetadata(); auto ret = packet->ensureCapacityAndFillUnused(max_buffer_size_ + offset, 0); @@ -98,10 +102,11 @@ void BlockCode::encode() { // Buffers should hold 2 *after* the padding, in order to be // able to set the length for the encoding operation. // packet->trimStart(offset); - uint16_t *length = reinterpret_cast<uint16_t *>(packet->writableData() + - max_buffer_size_ + offset); + fec_metadata *metadata = reinterpret_cast<fec_metadata *>( + packet->writableData() + max_buffer_size_ + offset); auto buffer_length = packet->length() - offset; - *length = htons(buffer_length); + metadata->setPacketLength(buffer_length); + metadata->setMetadataBase(metadata_base); DLOG_IF(INFO, VLOG_IS_ON(4)) << "Current buffer size: " << packet->length(); @@ -109,7 +114,7 @@ void BlockCode::encode() { } // Finish to fill source block with the buffers to hold the repair symbols - auto length = max_buffer_size_ + sizeof(fec_header) + LEN_SIZE_BYTES; + auto length = max_buffer_size_ + sizeof(fec_header) + METADATA_BYTES; for (uint32_t i = k_; i < n_; i++) { buffer packet; if (!params_.buffer_callback_) { @@ -133,19 +138,20 @@ void BlockCode::encode() { DLOG_IF(INFO, VLOG_IS_ON(4)) << "Current symbol size: " << packet->length(); data[i] = packet->writableData(); - operator[](i) = std::make_tuple(i, std::move(packet), uint32_t(0)); + operator[](i) = RSBufferInfo(uint32_t(0), i, FECBase::INVALID_METADATA, + std::move(packet)); } // Generate repair symbols and put them in corresponding buffers DLOG_IF(INFO, VLOG_IS_ON(4)) << "Calling encode with max_buffer_size_ = " << max_buffer_size_; for (uint32_t i = k_; i < n_; i++) { - fec_encode(code_, data, data[i], i, max_buffer_size_ + LEN_SIZE_BYTES); + fec_encode(code_, data, data[i], i, max_buffer_size_ + METADATA_BYTES); } // Re-include header in repair packets for (uint32_t i = k_; i < n_; i++) { - auto &packet = std::get<1>(operator[](i)); + auto &packet = operator[](i).getBuffer(); packet->prepend(sizeof(fec_header)); DLOG_IF(INFO, VLOG_IS_ON(4)) << "Produced repair symbol of size = " << packet->length(); @@ -153,16 +159,31 @@ void BlockCode::encode() { } void BlockCode::decode() { - gf *data[k_]; + gf *data[n_]; uint32_t index[k_]; + buffer aux_fec_packets[n_ - k_]; + // FEC packet number k0 + uint32_t k0 = 0; + + // Reorder block by index with in-place sorting + for (uint32_t i = 0; i < k_;) { + uint32_t idx = operator[](i).getIndex(); + if (idx >= k_ || idx == i) { + i++; + } else { + std::swap(operator[](i), operator[](idx)); + } + } for (uint32_t i = 0; i < k_; i++) { - auto &packet = std::get<1>(operator[](i)); - index[i] = std::get<0>(operator[](i)); - auto offset = std::get<2>(operator[](i)); + auto &packet = operator[](i).getBuffer(); + index[i] = operator[](i).getIndex(); + auto offset = operator[](i).getOffset(); + auto metadata_base = operator[](i).getMetadata(); sorted_index_[i] = index[i]; if (index[i] < k_) { + operator[](i).setReceived(); DLOG_IF(INFO, VLOG_IS_ON(4)) << "DECODE SOURCE - index " << index[i] << " - Current buffer size: " << packet->length(); @@ -173,49 +194,51 @@ void BlockCode::decode() { // able to set the length for the encoding operation packet->trimStart(offset); packet->ensureCapacityAndFillUnused(max_buffer_size_, 0); - uint16_t *length = reinterpret_cast<uint16_t *>( - packet->writableData() + max_buffer_size_ - LEN_SIZE_BYTES); - - *length = htons(packet->length()); + fec_metadata *metadata = reinterpret_cast<fec_metadata *>( + packet->writableData() + max_buffer_size_ - METADATA_BYTES); + metadata->setPacketLength(packet->length()); + metadata->setMetadataBase(metadata_base); } else { DLOG_IF(INFO, VLOG_IS_ON(4)) << "DECODE SYMBOL - index " << index[i] << " - Current buffer size: " << packet->length(); packet->trimStart(sizeof(fec_header) + offset); + aux_fec_packets[k0] = core::PacketManager<>::getInstance().getMemBuf(); + data[k_ + k0] = aux_fec_packets[k0]->writableData(); + k0++; } - data[i] = packet->writableData(); } - // We decode the source block DLOG_IF(INFO, VLOG_IS_ON(4)) << "Calling decode with max_buffer_size_ = " << max_buffer_size_; + fec_decode(code_, data, reinterpret_cast<int *>(index), max_buffer_size_); // Find the index in the block for recovered packets - for (uint32_t i = 0; i < k_; i++) { - if (index[i] != i) { - for (uint32_t j = 0; j < k_; j++) - if (sorted_index_[j] == uint32_t(index[i])) { - sorted_index_[j] = i; - } - } - } - - // Reorder block by index with in-place sorting - for (uint32_t i = 0; i < k_; i++) { - for (uint32_t j = sorted_index_[i]; j != i; j = sorted_index_[i]) { - std::swap(sorted_index_[j], sorted_index_[i]); - std::swap(operator[](j), operator[](i)); + for (uint32_t i = 0, j = 0; i < k_; i++) { + if (index[i] >= k_) { + operator[](i).setBuffer(aux_fec_packets[j++]); + operator[](i).setIndex(i); } } // Adjust length according to the one written in the source packet for (uint32_t i = 0; i < k_; i++) { - auto &packet = std::get<1>(operator[](i)); - uint16_t *length = reinterpret_cast<uint16_t *>( - packet->writableData() + max_buffer_size_ - LEN_SIZE_BYTES); - packet->setLength(ntohs(*length)); + auto &packet = operator[](i).getBuffer(); + fec_metadata *metadata = reinterpret_cast<fec_metadata *>( + packet->writableData() + max_buffer_size_ - METADATA_BYTES); + // Adjust buffer length + packet->setLength(metadata->getPacketLength()); + // Adjust metadata + operator[](i).setMetadata(metadata->getMetadataBase()); + + // reset the point to the beginning of the packets for all received packets + if (operator[](i).getReceived()) { + auto &packet = operator[](i).getBuffer(); + auto offset = operator[](i).getOffset(); + packet->prepend(offset); + } } } @@ -252,12 +275,11 @@ RSEncoder::RSEncoder(uint32_t k, uint32_t n, uint32_t seq_offset) source_block_(k_, n_, seq_offset_, current_code_, *this) {} void RSEncoder::consume(const fec::buffer &packet, uint32_t index, - uint32_t offset) { - if (!source_block_.addSourceSymbol(packet, index, offset)) { - std::vector<std::pair<uint32_t, buffer>> repair_packets; + uint32_t offset, uint32_t metadata) { + if (!source_block_.addSourceSymbol(packet, index, offset, metadata)) { + fec::BufferArray repair_packets; for (uint32_t i = k_; i < n_; i++) { - repair_packets.emplace_back(std::move(std::get<0>(source_block_[i])), - std::move(std::get<1>(source_block_[i]))); + repair_packets.emplace_back(std::move(source_block_[i])); } fec_callback_(repair_packets); @@ -265,9 +287,9 @@ void RSEncoder::consume(const fec::buffer &packet, uint32_t index, } void RSEncoder::onPacketProduced(core::ContentObject &content_object, - uint32_t offset) { + uint32_t offset, uint32_t metadata) { consume(content_object.shared_from_this(), - content_object.getName().getSuffix(), offset); + content_object.getName().getSuffix(), offset, metadata); } RSDecoder::RSDecoder(uint32_t k, uint32_t n, uint32_t seq_offset) @@ -276,10 +298,15 @@ RSDecoder::RSDecoder(uint32_t k, uint32_t n, uint32_t seq_offset) void RSDecoder::recoverPackets(SourceBlocks::iterator &src_block_it) { DLOG_IF(INFO, VLOG_IS_ON(4)) << "recoverPackets for " << k_; auto &src_block = src_block_it->second; - std::vector<std::pair<uint32_t, buffer>> source_packets(k_); + auto base_index = src_block_it->first; + BufferArray source_packets(k_); + + // Iterate over packets in the block and adjust indexed accordingly. This must + // be done because indexes are from 0 to (n - k - 1), but we need indexes from + // base_index to base_index + (n - k - 1) for (uint32_t i = 0; i < src_block.getK(); i++) { - source_packets[i] = std::make_pair(src_block_it->first + i, - std::move(std::get<1>(src_block[i]))); + src_block[i].setIndex(base_index + src_block[i].getIndex()); + source_packets[i] = FECBufferInfo(std::move(src_block[i])); } setProcessed(src_block_it->first); @@ -296,9 +323,9 @@ void RSDecoder::recoverPackets(SourceBlocks::iterator &src_block_it) { } void RSDecoder::consumeSource(const fec::buffer &packet, uint32_t index, - uint32_t offset) { + uint32_t offset, uint32_t metadata) { // Normalize index - assert(index >= seq_offset_); + DCHECK(index >= seq_offset_); auto i = (index - seq_offset_) % n_; // Get base @@ -324,16 +351,19 @@ void RSDecoder::consumeSource(const fec::buffer &packet, uint32_t index, // protocol that may come in the future. auto it = src_blocks_.find(base); if (it != src_blocks_.end()) { - auto ret = it->second.addSourceSymbol(packet, i, offset); + auto ret = it->second.addSourceSymbol(packet, i, offset, metadata); if (!ret) { recoverPackets(it); } } else { DLOG_IF(INFO, VLOG_IS_ON(4)) << "Adding to parked source packets"; - auto ret = parked_packets_.emplace( - base, std::vector<std::pair<buffer, uint32_t>>()); - ret.first->second.emplace_back(packet, i); + auto ret = parked_packets_.emplace(base, BufferInfoArray()); + ret.first->second.emplace_back(offset, i, metadata, packet); + /** + * If we reached k source packets, we do not have any missing packet to + * recover via FEC. Delete the block. + */ if (ret.first->second.size() >= k_) { setProcessed(ret.first->first); parked_packets_.erase(ret.first); @@ -356,8 +386,8 @@ void RSDecoder::consumeRepair(const fec::buffer &packet, uint32_t offset) { DLOG_IF(INFO, VLOG_IS_ON(4)) << "Decoder consume called for repair symbol. BASE = " << base - << ", index = " << base + i << " and i = " << i << ". K=" << k - << ", N=" << n; + << ", index = " << base + i << " and i = " << (int)i << ". K=" << (int)k + << ", N=" << (int)n; // check if a source block already exist for this symbol auto it = src_blocks_.find(base); @@ -380,8 +410,9 @@ void RSDecoder::consumeRepair(const fec::buffer &packet, uint32_t offset) { auto it2 = parked_packets_.find(base); if (it2 != parked_packets_.end()) { for (auto &packet_index : it2->second) { - auto ret = it->second.addSourceSymbol(packet_index.first, - packet_index.second, offset); + auto ret = it->second.addSourceSymbol( + packet_index.getBuffer(), packet_index.getIndex(), + packet_index.getOffset(), packet_index.getMetadata()); if (!ret) { recoverPackets(it); // Finish to delete packets in same source block that were @@ -399,7 +430,7 @@ void RSDecoder::consumeRepair(const fec::buffer &packet, uint32_t offset) { } void RSDecoder::onDataPacket(core::ContentObject &content_object, - uint32_t offset) { + uint32_t offset, uint32_t metadata) { DLOG_IF(INFO, VLOG_IS_ON(4)) << "Calling fec for data packet " << content_object.getName() << ". Offset: " << offset; @@ -409,7 +440,7 @@ void RSDecoder::onDataPacket(core::ContentObject &content_object, if (isSymbol(suffix)) { consumeRepair(content_object.shared_from_this(), offset); } else { - consumeSource(content_object.shared_from_this(), suffix, offset); + consumeSource(content_object.shared_from_this(), suffix, offset, metadata); } } |