diff options
Diffstat (limited to 'libtransport/src/protocols/prod_protocol_rtc.h')
-rw-r--r-- | libtransport/src/protocols/prod_protocol_rtc.h | 58 |
1 files changed, 49 insertions, 9 deletions
diff --git a/libtransport/src/protocols/prod_protocol_rtc.h b/libtransport/src/protocols/prod_protocol_rtc.h index 96ad5673d..7f50a2505 100644 --- a/libtransport/src/protocols/prod_protocol_rtc.h +++ b/libtransport/src/protocols/prod_protocol_rtc.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -32,6 +32,7 @@ class RTCProductionProtocol : public ProductionProtocol { using ProductionProtocol::start; using ProductionProtocol::stop; + void setProducerParam() override; void produce(ContentObject &content_object) override; uint32_t produceStream(const Name &content_name, @@ -49,21 +50,31 @@ class RTCProductionProtocol : public ProductionProtocol { buffer, buffer_size, buffer_size)); } - void registerNamespaceWithNetwork(const Prefix &producer_namespace) override; - void setConsumerInSyncCallback( interface::ProducerInterestCallback &&callback) { on_consumer_in_sync_ = std::move(callback); } + auto shared_from_this() { return utils::shared_from(this); } + private: // packet handlers void onInterest(Interest &interest) override; - void onError(std::error_code ec) override; + void onError(const std::error_code &ec) override{}; void processInterest(uint32_t interest_seg, uint32_t lifetime); + void producePktInternal(std::shared_ptr<ContentObject> &&content_object, + const Name &content_name, bool fec = false); void produceInternal(std::shared_ptr<ContentObject> &&content_object, const Name &content_name, bool fec = false); void sendNack(uint32_t sequence); + void sendContentObject(std::shared_ptr<ContentObject> content_object, + bool nac = false, bool fec = false); + + // manifests + void sendManifestProbe(uint32_t sequence); + void sendManifest(const Name &content_name); + std::shared_ptr<core::ContentObjectManifest> createManifest( + const Name &name) const; // stats void updateStats(); @@ -77,15 +88,25 @@ class RTCProductionProtocol : public ProductionProtocol { void interestQueueTimer(); // FEC functions - void onFecPackets(std::vector<std::pair<uint32_t, fec::buffer>> &packets); + void onFecPackets(fec::BufferArray &packets); fec::buffer getBuffer(std::size_t size); + void postponeFecPacket(); + void dispatchFecPacket(); + void flushFecPkts(uint32_t current_seq_num); + // aggregated data functions + void emptyQueue(); + void addPacketToQueue(std::unique_ptr<utils::MemBuf> &&buffer); core::Name flow_name_; - uint32_t current_seg_; // seq id of the next packet produced - uint32_t prod_label_; // path lable of the producer - uint32_t cache_label_; // path lable for content from the producer cache - uint16_t data_header_size_; // hicn data header size + std::pair<core::Packet::Format, size_t> data_header_format_; + std::pair<core::Packet::Format, size_t> manifest_header_format_; + std::pair<core::Packet::Format, size_t> fec_header_format_; + std::pair<core::Packet::Format, size_t> nack_header_format_; + + uint32_t current_seg_; // seq id of the next packet produced + uint32_t prod_label_; // path label of the producer + uint32_t cache_label_; // path label for content from the producer cache uint32_t produced_bytes_; // bytes produced in the last round uint32_t produced_packets_; // packet produed in the last round @@ -98,7 +119,11 @@ class RTCProductionProtocol : public ProductionProtocol { uint32_t packets_production_rate_; // pps uint32_t fec_packets_production_rate_; // pps + uint64_t last_produced_data_ts_; // ms + std::unique_ptr<asio::steady_timer> round_timer_; + std::unique_ptr<asio::steady_timer> fec_pacing_timer_; + uint64_t last_round_; // delayed nacks are used by the producer to avoid to send too @@ -131,6 +156,21 @@ class RTCProductionProtocol : public ProductionProtocol { // Save FEC packets here before sending them std::queue<ContentObject::Ptr> pending_fec_packets_; + std::queue<std::pair<uint64_t, ContentObject::Ptr>> paced_fec_packets_; + bool pending_fec_pace_; + + // Save application packets if they are small + std::queue<std::unique_ptr<utils::MemBuf>> waiting_app_packets_; + uint16_t max_len_; // len of the largest packet + uint16_t queue_len_; // total size of all packet in the queue + bool data_aggregation_; // turns on/off data aggregation + // timer to check the queue len + std::unique_ptr<asio::steady_timer> app_packets_timer_; + bool data_aggregation_timer_switch_; // bool to check if the timer is on + + // Manifest + std::queue<std::pair<uint32_t, auth::CryptoHash>> + manifest_entries_; // map a packet suffix to a packet hash }; } // namespace protocol |