diff options
Diffstat (limited to 'libtransport/src/protocols/prod_protocol_rtc.h')
-rw-r--r-- | libtransport/src/protocols/prod_protocol_rtc.h | 108 |
1 files changed, 60 insertions, 48 deletions
diff --git a/libtransport/src/protocols/prod_protocol_rtc.h b/libtransport/src/protocols/prod_protocol_rtc.h index 96ad5673d..285ccb646 100644 --- a/libtransport/src/protocols/prod_protocol_rtc.h +++ b/libtransport/src/protocols/prod_protocol_rtc.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -17,6 +17,7 @@ #include <hicn/transport/core/name.h> #include <protocols/production_protocol.h> +#include <protocols/rtc/rtc_verifier.h> #include <atomic> #include <map> @@ -32,6 +33,7 @@ class RTCProductionProtocol : public ProductionProtocol { using ProductionProtocol::start; using ProductionProtocol::stop; + void setProducerParam() override; void produce(ContentObject &content_object) override; uint32_t produceStream(const Name &content_name, @@ -49,56 +51,69 @@ class RTCProductionProtocol : public ProductionProtocol { buffer, buffer_size, buffer_size)); } - void registerNamespaceWithNetwork(const Prefix &producer_namespace) override; - - void setConsumerInSyncCallback( - interface::ProducerInterestCallback &&callback) { - on_consumer_in_sync_ = std::move(callback); - } + auto shared_from_this() { return utils::shared_from(this); } private: // packet handlers void onInterest(Interest &interest) override; - void onError(std::error_code ec) override; + void onError(const std::error_code &ec) override{}; void processInterest(uint32_t interest_seg, uint32_t lifetime); + void producePktInternal(std::shared_ptr<ContentObject> &&content_object, + const Name &content_name, bool fec = false); void produceInternal(std::shared_ptr<ContentObject> &&content_object, const Name &content_name, bool fec = false); void sendNack(uint32_t sequence); + void sendContentObject(std::shared_ptr<ContentObject> content_object, + bool nac = false, bool fec = false); + + // manifests + void sendManifestProbe(uint32_t sequence); + void sendManifest(const Name &content_name); + std::shared_ptr<core::ContentObjectManifest> createManifest( + const Name &name) const; // stats - void updateStats(); + void updateStats(bool new_round); void scheduleRoundTimer(); - // pending intersts functions - void addToInterestQueue(uint32_t interest_seg, uint64_t expiration); - void sendNacksForPendingInterests(); - void removeFromInterestQueue(uint32_t interest_seg); - void scheduleQueueTimer(uint64_t wait); - void interestQueueTimer(); - // FEC functions - void onFecPackets(std::vector<std::pair<uint32_t, fec::buffer>> &packets); + void onFecPackets(fec::BufferArray &packets); fec::buffer getBuffer(std::size_t size); + void postponeFecPacket(); + void dispatchFecPacket(); + void flushFecPkts(uint32_t current_seq_num); + // aggregated data functions + void emptyQueue(); + void addPacketToQueue(std::unique_ptr<utils::MemBuf> &&buffer); core::Name flow_name_; - uint32_t current_seg_; // seq id of the next packet produced - uint32_t prod_label_; // path lable of the producer - uint32_t cache_label_; // path lable for content from the producer cache - uint16_t data_header_size_; // hicn data header size + std::pair<core::Packet::Format, size_t> data_header_format_; + std::pair<core::Packet::Format, size_t> manifest_header_format_; + std::pair<core::Packet::Format, size_t> fec_header_format_; + std::pair<core::Packet::Format, size_t> nack_header_format_; + + uint32_t current_seg_; // seq id of the next packet produced + uint32_t prod_label_; // path label of the producer + uint32_t cache_label_; // path label for content from the producer cache - uint32_t produced_bytes_; // bytes produced in the last round - uint32_t produced_packets_; // packet produed in the last round - uint32_t produced_fec_packets_; // fec packets produced last round + uint32_t prev_produced_bytes_; // XXX clearly explain all these new vars + uint32_t prev_produced_packets_; + + uint32_t produced_bytes_; // bytes produced in the last round + uint32_t produced_packets_; // packet produed in the last round uint32_t max_packet_production_; // never exceed this number of packets // without update stats - uint32_t bytes_production_rate_; // bytes per sec - uint32_t packets_production_rate_; // pps - uint32_t fec_packets_production_rate_; // pps + uint32_t bytes_production_rate_; // bytes per sec + uint32_t packets_production_rate_; // pps + + uint64_t last_produced_data_ts_; // ms std::unique_ptr<asio::steady_timer> round_timer_; + std::unique_ptr<asio::steady_timer> fec_pacing_timer_; + uint64_t last_round_; // delayed nacks are used by the producer to avoid to send too @@ -108,29 +123,26 @@ class RTCProductionProtocol : public ProductionProtocol { // of the new rate. bool allow_delayed_nacks_; - // queue for the received interests - // this map maps the expiration time of an interest to - // its sequence number. the map is sorted by timeouts - // the same timeout may be used for multiple sequence numbers - // but for each sequence number we store only the smallest - // expiry time. In this way the mapping from seqs_map_ to - // timers_map_ is unique - std::multimap<uint64_t, uint32_t> timers_map_; - - // this map does the opposite, this map is not ordered - std::unordered_map<uint32_t, uint64_t> seqs_map_; - bool queue_timer_on_; - std::unique_ptr<asio::steady_timer> interests_queue_timer_; - - // this callback is called when the remote consumer is in sync with high - // probability. it is called only the first time that the switch happen. - // XXX this makes sense only in P2P mode, while in standard mode is - // impossible to know the state of the consumers so it should not be used. - bool consumer_in_sync_; - interface::ProducerInterestCallback on_consumer_in_sync_; - // Save FEC packets here before sending them std::queue<ContentObject::Ptr> pending_fec_packets_; + std::queue<std::pair<uint64_t, ContentObject::Ptr>> paced_fec_packets_; + bool pending_fec_pace_; + + // Save application packets if they are small + std::queue<std::unique_ptr<utils::MemBuf>> waiting_app_packets_; + uint16_t max_len_; // len of the largest packet + uint16_t queue_len_; // total size of all packet in the queue + bool data_aggregation_; // turns on/off data aggregation + // timer to check the queue len + std::unique_ptr<asio::steady_timer> app_packets_timer_; + bool data_aggregation_timer_switch_; // bool to check if the timer is on + + // Manifest + std::queue<std::pair<uint32_t, auth::CryptoHash>> + manifest_entries_; // map a packet suffix to a packet hash + + // Verifier for aggregated interests + std::shared_ptr<rtc::RTCVerifier> verifier_; }; } // namespace protocol |