summaryrefslogtreecommitdiffstats
path: root/libtransport/src/protocols/prod_protocol_rtc.h
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/protocols/prod_protocol_rtc.h')
-rw-r--r--libtransport/src/protocols/prod_protocol_rtc.h58
1 files changed, 49 insertions, 9 deletions
diff --git a/libtransport/src/protocols/prod_protocol_rtc.h b/libtransport/src/protocols/prod_protocol_rtc.h
index 96ad5673d..7f50a2505 100644
--- a/libtransport/src/protocols/prod_protocol_rtc.h
+++ b/libtransport/src/protocols/prod_protocol_rtc.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -32,6 +32,7 @@ class RTCProductionProtocol : public ProductionProtocol {
using ProductionProtocol::start;
using ProductionProtocol::stop;
+ void setProducerParam() override;
void produce(ContentObject &content_object) override;
uint32_t produceStream(const Name &content_name,
@@ -49,21 +50,31 @@ class RTCProductionProtocol : public ProductionProtocol {
buffer, buffer_size, buffer_size));
}
- void registerNamespaceWithNetwork(const Prefix &producer_namespace) override;
-
void setConsumerInSyncCallback(
interface::ProducerInterestCallback &&callback) {
on_consumer_in_sync_ = std::move(callback);
}
+ auto shared_from_this() { return utils::shared_from(this); }
+
private:
// packet handlers
void onInterest(Interest &interest) override;
- void onError(std::error_code ec) override;
+ void onError(const std::error_code &ec) override{};
void processInterest(uint32_t interest_seg, uint32_t lifetime);
+ void producePktInternal(std::shared_ptr<ContentObject> &&content_object,
+ const Name &content_name, bool fec = false);
void produceInternal(std::shared_ptr<ContentObject> &&content_object,
const Name &content_name, bool fec = false);
void sendNack(uint32_t sequence);
+ void sendContentObject(std::shared_ptr<ContentObject> content_object,
+ bool nac = false, bool fec = false);
+
+ // manifests
+ void sendManifestProbe(uint32_t sequence);
+ void sendManifest(const Name &content_name);
+ std::shared_ptr<core::ContentObjectManifest> createManifest(
+ const Name &name) const;
// stats
void updateStats();
@@ -77,15 +88,25 @@ class RTCProductionProtocol : public ProductionProtocol {
void interestQueueTimer();
// FEC functions
- void onFecPackets(std::vector<std::pair<uint32_t, fec::buffer>> &packets);
+ void onFecPackets(fec::BufferArray &packets);
fec::buffer getBuffer(std::size_t size);
+ void postponeFecPacket();
+ void dispatchFecPacket();
+ void flushFecPkts(uint32_t current_seq_num);
+ // aggregated data functions
+ void emptyQueue();
+ void addPacketToQueue(std::unique_ptr<utils::MemBuf> &&buffer);
core::Name flow_name_;
- uint32_t current_seg_; // seq id of the next packet produced
- uint32_t prod_label_; // path lable of the producer
- uint32_t cache_label_; // path lable for content from the producer cache
- uint16_t data_header_size_; // hicn data header size
+ std::pair<core::Packet::Format, size_t> data_header_format_;
+ std::pair<core::Packet::Format, size_t> manifest_header_format_;
+ std::pair<core::Packet::Format, size_t> fec_header_format_;
+ std::pair<core::Packet::Format, size_t> nack_header_format_;
+
+ uint32_t current_seg_; // seq id of the next packet produced
+ uint32_t prod_label_; // path label of the producer
+ uint32_t cache_label_; // path label for content from the producer cache
uint32_t produced_bytes_; // bytes produced in the last round
uint32_t produced_packets_; // packet produed in the last round
@@ -98,7 +119,11 @@ class RTCProductionProtocol : public ProductionProtocol {
uint32_t packets_production_rate_; // pps
uint32_t fec_packets_production_rate_; // pps
+ uint64_t last_produced_data_ts_; // ms
+
std::unique_ptr<asio::steady_timer> round_timer_;
+ std::unique_ptr<asio::steady_timer> fec_pacing_timer_;
+
uint64_t last_round_;
// delayed nacks are used by the producer to avoid to send too
@@ -131,6 +156,21 @@ class RTCProductionProtocol : public ProductionProtocol {
// Save FEC packets here before sending them
std::queue<ContentObject::Ptr> pending_fec_packets_;
+ std::queue<std::pair<uint64_t, ContentObject::Ptr>> paced_fec_packets_;
+ bool pending_fec_pace_;
+
+ // Save application packets if they are small
+ std::queue<std::unique_ptr<utils::MemBuf>> waiting_app_packets_;
+ uint16_t max_len_; // len of the largest packet
+ uint16_t queue_len_; // total size of all packet in the queue
+ bool data_aggregation_; // turns on/off data aggregation
+ // timer to check the queue len
+ std::unique_ptr<asio::steady_timer> app_packets_timer_;
+ bool data_aggregation_timer_switch_; // bool to check if the timer is on
+
+ // Manifest
+ std::queue<std::pair<uint32_t, auth::CryptoHash>>
+ manifest_entries_; // map a packet suffix to a packet hash
};
} // namespace protocol