diff options
Diffstat (limited to 'libtransport/src/protocols/fec/rely.cc')
-rw-r--r-- | libtransport/src/protocols/fec/rely.cc | 205 |
1 files changed, 205 insertions, 0 deletions
diff --git a/libtransport/src/protocols/fec/rely.cc b/libtransport/src/protocols/fec/rely.cc new file mode 100644 index 000000000..7a30a62e2 --- /dev/null +++ b/libtransport/src/protocols/fec/rely.cc @@ -0,0 +1,205 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <glog/logging.h> +#include <hicn/transport/core/global_object_pool.h> +#include <protocols/fec/rely.h> + +#include <rely/packet.hpp> + +namespace transport { +namespace protocol { +namespace fec { + +RelyEncoder::RelyEncoder(uint32_t k, uint32_t n, uint32_t seq_offset) + : RelyBase(k, n) { + configure(kmtu, ktimeout, kmax_stream_size); + set_repair_trigger(k_, n_ - k_, n_ - k_); +} + +void RelyEncoder::onPacketProduced(core::ContentObject &content_object, + uint32_t offset) { + // Get pointer to payload, leaving space to insert FEC header. + // TODO Check if this additional header is really needed. + auto data = content_object.writableData() + offset - sizeof(fec_header); + auto length = content_object.length() - offset + sizeof(fec_header); + + // Check packet length does not exceed maximum length supported by the + // encoder (otherwise segmentation would take place). + assert(length < max_packet_bytes()); + DLOG_IF(INFO, VLOG_IS_ON(4)) + << "Encoding packet of length " << length - sizeof(fec_header); + + // Get the suffix. With rely we need to write it in the fec_header in order to + // be able to recognize the seq number upon recovery. + auto suffix = content_object.getName().getSuffix(); + DLOG_IF(INFO, VLOG_IS_ON(4)) << "Producing packet " << suffix + << " (index == " << current_index_ << ")"; + + // Consume payload. Add fec_header in front before feeding payload to encoder, + // and copy original content of packet + fec_header *h = reinterpret_cast<fec_header *>(data); + fec_header copy = *h; + h->setSeqNumberBase(suffix); + auto packets = consume(data, length, getCurrentTime()); + assert(packets == 1); + + // Update packet counter + current_index_ += packets; + + // Restore original packet content and increment data pointer to the correct + // position + *h = copy; + data += sizeof(fec_header); + + // Check position of this packet inside N size block + auto i = current_index_ % n_; + + // encoder will produce a source packet + if (i <= k_) { + // Rely modifies the payload of the packet. We replace the packet with the + // one returned by rely. + // TODO Optimize it by copying only the RELY header + + // Be sure encoder can produce + assert(can_produce()); + + // Check new payload size and make sure it fits in packet buffer + auto new_payload_size = produce_bytes(); + int difference = new_payload_size - length; + + assert(difference > 0); + assert(content_object.ensureCapacity(difference)); + + // Update length + DLOG_IF(INFO, VLOG_IS_ON(4)) << "The packet length will be incremented by " + << difference + sizeof(fec_header); + content_object.append(difference + sizeof(fec_header)); + content_object.updateLength(); + + // Make sure we got a source packet, otherwise we would put a repair symbol + // in a source packet + assert(rely::packet_is_systematic(produce_data())); + + // Copy rely packet replacing old source packet. + std::memcpy(data, produce_data(), new_payload_size); + + // Advance the encoder to next symbol. + produce_next(); + } + +#if 0 + if (i == k_) { + // Ensure repair are generated after k source packets + flush_repair(); + } +#endif + + // Here we should produce all the repair packets + while (can_produce()) { + // The current index MUST be k_, because we enforce n - k repair to be + // produced after k sources + assert(current_index_ == k_); + + buffer packet; + if (!buffer_callback_) { + // If no callback is installed, let's allocate a buffer from global pool + packet = core::PacketManager<>::getInstance().getMemBuf(); + packet->append(produce_bytes()); + } else { + // Otherwise let's ask a buffer to the caller. + packet = buffer_callback_(produce_bytes()); + } + + DLOG_IF(INFO, VLOG_IS_ON(4)) + << "Producing symbol of size " << produce_bytes(); + + // Copy symbol to packet buffer + std::memcpy(packet->writableData(), produce_data(), produce_bytes()); + + // Push symbol in repair_packets + packets_.emplace_back(0, std::move(packet)); + + // Advance the encoder + produce_next(); + } + + // Print number of unprotected symbols + DLOG_IF(INFO, VLOG_IS_ON(4)) + << "Number of unprotected symbols: " << unprotected_symbols(); + + // If we have generated repair symbols, let's notify caller via the installed + // callback + if (packets_.size()) { + assert(packets_.size() == n_ - k_); + fec_callback_(packets_); + packets_.clear(); + current_index_ = 0; + } +} + +RelyDecoder::RelyDecoder(uint32_t k, uint32_t n, uint32_t seq_offset) + : RelyBase(k, n, seq_offset) { + configure(kmtu, ktimeout, kmax_stream_size); +} + +void RelyDecoder::onDataPacket(core::ContentObject &content_object, + uint32_t offset) { + // Adjust pointers to point to packet payload + auto data = content_object.writableData() + offset; + auto size = content_object.length() - offset; + + // Pass payload to decoder + consume(data, size, getCurrentTime()); + + // Drain decoder if possible + while (can_produce()) { + // Get size of decoded packet + auto size = produce_bytes(); + + // Get buffer to copy packet in + auto packet = core::PacketManager<>::getInstance().getMemBuf(); + + // Copy buffer + packet->append(size); + std::memcpy(packet->writableData(), produce_data(), size); + + // Read seq number + fec_header *h = reinterpret_cast<fec_header *>(packet->writableData()); + uint32_t index = h->getSeqNumberBase(); + + DLOG_IF(INFO, VLOG_IS_ON(4)) + << "The index written in the packet is " << index; + + // Remove FEC header + packet->trimStart(sizeof(fec_header)); + + // Save packet in buffer + packets_.emplace_back(index, std::move(packet)); + + // Advance to next packet + produce_next(); + } + + // If we produced packets, lets notify the caller via the callback + if (packets_.size() > 0) { + fec_callback_(packets_); + packets_.clear(); + } +} + +} // namespace fec +} // namespace protocol +} // namespace transport
\ No newline at end of file |