diff options
Diffstat (limited to 'libtransport/src/protocols/prod_protocol_rtc.cc')
-rw-r--r-- | libtransport/src/protocols/prod_protocol_rtc.cc | 481 |
1 files changed, 481 insertions, 0 deletions
diff --git a/libtransport/src/protocols/prod_protocol_rtc.cc b/libtransport/src/protocols/prod_protocol_rtc.cc new file mode 100644 index 000000000..049752876 --- /dev/null +++ b/libtransport/src/protocols/prod_protocol_rtc.cc @@ -0,0 +1,481 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/core/global_object_pool.h> +#include <implementation/socket_producer.h> +#include <protocols/prod_protocol_rtc.h> +#include <protocols/rtc/rtc_consts.h> +#include <stdlib.h> +#include <time.h> + +#include <unordered_set> + +namespace transport { +namespace protocol { + +RTCProductionProtocol::RTCProductionProtocol( + implementation::ProducerSocket *icn_socket) + : ProductionProtocol(icn_socket), + current_seg_(1), + produced_bytes_(0), + produced_packets_(0), + max_packet_production_(1), + bytes_production_rate_(0), + packets_production_rate_(0), + last_round_(std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count()), + allow_delayed_nacks_(false), + queue_timer_on_(false), + consumer_in_sync_(false), + on_consumer_in_sync_(nullptr) { + srand((unsigned int)time(NULL)); + prod_label_ = rand() % 256; + interests_queue_timer_ = + std::make_unique<asio::steady_timer>(portal_->getIoService()); + round_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService()); + setOutputBufferSize(10000); + scheduleRoundTimer(); +} + +RTCProductionProtocol::~RTCProductionProtocol() {} + +void RTCProductionProtocol::registerNamespaceWithNetwork( + const Prefix &producer_namespace) { + ProductionProtocol::registerNamespaceWithNetwork(producer_namespace); + + flow_name_ = producer_namespace.getName(); + auto family = flow_name_.getAddressFamily(); + + switch (family) { + case AF_INET6: + header_size_ = (uint32_t)Packet::getHeaderSizeFromFormat(HF_INET6_TCP); + break; + case AF_INET: + header_size_ = (uint32_t)Packet::getHeaderSizeFromFormat(HF_INET_TCP); + break; + default: + throw errors::RuntimeException("Unknown name format."); + } +} + +void RTCProductionProtocol::scheduleRoundTimer() { + round_timer_->expires_from_now( + std::chrono::milliseconds(rtc::PRODUCER_STATS_INTERVAL)); + round_timer_->async_wait([this](std::error_code ec) { + if (ec) return; + updateStats(); + }); +} + +void RTCProductionProtocol::updateStats() { + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + uint64_t duration = now - last_round_; + if (duration == 0) duration = 1; + double per_second = rtc::MILLI_IN_A_SEC / duration; + + uint32_t prev_packets_production_rate = packets_production_rate_; + + bytes_production_rate_ = (uint32_t)ceil((double)produced_bytes_ * per_second); + packets_production_rate_ = (uint32_t)ceil((double)produced_packets_ * per_second); + + TRANSPORT_LOGD("Updating production rate: produced_bytes_ = %u bps = %u", + produced_bytes_, bytes_production_rate_); + + // update the production rate as soon as it increases by 10% with respect to + // the last round + max_packet_production_ = + produced_packets_ + (uint32_t)ceil((double)produced_packets_ * 0.1); + if (max_packet_production_ < rtc::WIN_MIN) + max_packet_production_ = rtc::WIN_MIN; + + if (packets_production_rate_ != 0) { + allow_delayed_nacks_ = false; + } else if (prev_packets_production_rate == 0) { + // at least 2 rounds with production rate = 0 + allow_delayed_nacks_ = true; + } + + // check if the production rate is decreased. if yes send nacks if needed + if (prev_packets_production_rate < packets_production_rate_) { + sendNacksForPendingInterests(); + } + + produced_bytes_ = 0; + produced_packets_ = 0; + last_round_ = now; + scheduleRoundTimer(); +} + +uint32_t RTCProductionProtocol::produceStream( + const Name &content_name, std::unique_ptr<utils::MemBuf> &&buffer, + bool is_last, uint32_t start_offset) { + throw errors::NotImplementedException(); +} + +uint32_t RTCProductionProtocol::produceStream(const Name &content_name, + const uint8_t *buffer, + size_t buffer_size, bool is_last, + uint32_t start_offset) { + throw errors::NotImplementedException(); +} + +void RTCProductionProtocol::produce(ContentObject &content_object) { + throw errors::NotImplementedException(); +} + +uint32_t RTCProductionProtocol::produceDatagram( + const Name &content_name, std::unique_ptr<utils::MemBuf> &&buffer) { + std::size_t buffer_size = buffer->length(); + if (TRANSPORT_EXPECT_FALSE(buffer_size == 0)) return 0; + + uint32_t data_packet_size; + socket_->getSocketOption(interface::GeneralTransportOptions::DATA_PACKET_SIZE, + data_packet_size); + + if (TRANSPORT_EXPECT_FALSE((buffer_size + header_size_ + + rtc::DATA_HEADER_SIZE) > data_packet_size)) { + return 0; + } + + auto content_object = + core::PacketManager<>::getInstance().getPacket<ContentObject>(); + // add rtc header to the payload + struct rtc::data_packet_t header; + content_object->appendPayload((const uint8_t *)&header, + rtc::DATA_HEADER_SIZE); + content_object->appendPayload(buffer->data(), buffer->length()); + + std::shared_ptr<ContentObject> co = std::move(content_object); + + // schedule actual sending on internal thread + portal_->getIoService().dispatch( + [this, content_object{std::move(co)}, content_name]() mutable { + produceInternal(std::move(content_object), content_name); + }); + + return 1; +} + +void RTCProductionProtocol::produceInternal( + std::shared_ptr<ContentObject> &&content_object, const Name &content_name) { + // set rtc header + struct rtc::data_packet_t *data_pkt = + (struct rtc::data_packet_t *)content_object->getPayload()->data(); + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + data_pkt->setTimestamp(now); + data_pkt->setProductionRate(bytes_production_rate_); + + // set hicn stuff + Name n(content_name); + content_object->setName(n.setSuffix(current_seg_)); + content_object->setLifetime(500); // XXX this should be set by the APP + content_object->setPathLabel(prod_label_); + + // update stats + produced_bytes_ += (uint32_t)( + content_object->headerSize() + content_object->payloadSize()); + produced_packets_++; + + if (produced_packets_ >= max_packet_production_) { + // in this case all the pending interests may be used to accomodate the + // sudden increase in the production rate. calling the updateStats we will + // notify all the clients + round_timer_->cancel(); + updateStats(); + } + + TRANSPORT_LOGD("Sending content object: %s", n.toString().c_str()); + + output_buffer_.insert(content_object); + + if (*on_content_object_in_output_buffer_) { + on_content_object_in_output_buffer_->operator()(*socket_->getInterface(), + *content_object); + } + + portal_->sendContentObject(*content_object); + + if (*on_content_object_output_) { + on_content_object_output_->operator()(*socket_->getInterface(), + *content_object); + } + + // remove interests from the interest cache if it exists + removeFromInterestQueue(current_seg_); + + current_seg_ = (current_seg_ + 1) % rtc::MIN_PROBE_SEQ; +} + +void RTCProductionProtocol::onInterest(Interest &interest) { + uint32_t interest_seg = interest.getName().getSuffix(); + uint32_t lifetime = interest.getLifetime(); + + if (interest_seg == 0) { + // first packet from the consumer, reset sync state + consumer_in_sync_ = false; + } + + if (*on_interest_input_) { + on_interest_input_->operator()(*socket_->getInterface(), interest); + } + + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + + if (interest_seg > rtc::MIN_PROBE_SEQ) { + TRANSPORT_LOGD("received probe %u", interest_seg); + sendNack(interest_seg); + return; + } + + TRANSPORT_LOGD("received interest %u", interest_seg); + + const std::shared_ptr<ContentObject> content_object = + output_buffer_.find(interest); + + if (content_object) { + if (*on_interest_satisfied_output_buffer_) { + on_interest_satisfied_output_buffer_->operator()(*socket_->getInterface(), + interest); + } + + if (*on_content_object_output_) { + on_content_object_output_->operator()(*socket_->getInterface(), + *content_object); + } + + TRANSPORT_LOGD("Send content %u (onInterest)", + content_object->getName().getSuffix()); + portal_->sendContentObject(*content_object); + return; + } else { + if (*on_interest_process_) { + on_interest_process_->operator()(*socket_->getInterface(), interest); + } + } + + // if the production rate 0 use delayed nacks + if (allow_delayed_nacks_ && interest_seg >= current_seg_) { + uint64_t next_timer = ~0; + if (!timers_map_.empty()) { + next_timer = timers_map_.begin()->first; + } + + uint64_t expiration = now + rtc::SENTINEL_TIMER_INTERVAL; + addToInterestQueue(interest_seg, expiration); + + // here we have at least one interest in the queue, we need to start or + // update the timer + if (!queue_timer_on_) { + // set timeout + queue_timer_on_ = true; + scheduleQueueTimer(timers_map_.begin()->first - now); + } else { + // re-schedule the timer because a new interest will expires sooner + if (next_timer > timers_map_.begin()->first) { + interests_queue_timer_->cancel(); + scheduleQueueTimer(timers_map_.begin()->first - now); + } + } + return; + } + + if (queue_timer_on_) { + // the producer is producing. Send nacks to packets that will expire before + // the data production and remove the timer + queue_timer_on_ = false; + interests_queue_timer_->cancel(); + sendNacksForPendingInterests(); + } + + uint32_t max_gap = (uint32_t)floor( + (double)((double)((double)lifetime * + rtc::INTEREST_LIFETIME_REDUCTION_FACTOR / + rtc::MILLI_IN_A_SEC) * + (double)packets_production_rate_)); + + if (interest_seg < current_seg_ || interest_seg > (max_gap + current_seg_)) { + sendNack(interest_seg); + } else { + if (!consumer_in_sync_ && on_consumer_in_sync_) { + // we consider the remote consumer to be in sync as soon as it covers 70% + // of the production window with interests + uint32_t perc = (uint32_t)ceil((double)max_gap * 0.7); + if (interest_seg > (perc + current_seg_)) { + consumer_in_sync_ = true; + on_consumer_in_sync_(*socket_->getInterface(), interest); + } + } + uint64_t expiration =(uint32_t)( + now + floor((double)lifetime * rtc::INTEREST_LIFETIME_REDUCTION_FACTOR)); + addToInterestQueue(interest_seg, expiration); + } +} + +void RTCProductionProtocol::onError(std::error_code ec) {} + +void RTCProductionProtocol::scheduleQueueTimer(uint64_t wait) { + interests_queue_timer_->expires_from_now(std::chrono::milliseconds(wait)); + interests_queue_timer_->async_wait([this](std::error_code ec) { + if (ec) return; + interestQueueTimer(); + }); +} + +void RTCProductionProtocol::addToInterestQueue(uint32_t interest_seg, + uint64_t expiration) { + // check if the seq number exists already + auto it_seqs = seqs_map_.find(interest_seg); + if (it_seqs != seqs_map_.end()) { + // the seq already exists + if (expiration < it_seqs->second) { + // we need to update the timer becasue we got a smaller one + // 1) remove the entry from the multimap + // 2) update this entry + auto range = timers_map_.equal_range(it_seqs->second); + for (auto it_timers = range.first; it_timers != range.second; + it_timers++) { + if (it_timers->second == it_seqs->first) { + timers_map_.erase(it_timers); + break; + } + } + timers_map_.insert( + std::pair<uint64_t, uint32_t>(expiration, interest_seg)); + it_seqs->second = expiration; + } else { + // nothing to do here + return; + } + } else { + // add the new seq + timers_map_.insert(std::pair<uint64_t, uint32_t>(expiration, interest_seg)); + seqs_map_.insert(std::pair<uint32_t, uint64_t>(interest_seg, expiration)); + } +} + +void RTCProductionProtocol::sendNacksForPendingInterests() { + std::unordered_set<uint32_t> to_remove; + + uint32_t packet_gap = 100000; // set it to a high value (100sec) + if (packets_production_rate_ != 0) + packet_gap = (uint32_t)ceil(rtc::MILLI_IN_A_SEC / (double)packets_production_rate_); + + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + + for (auto it = seqs_map_.begin(); it != seqs_map_.end(); it++) { + if (it->first > current_seg_) { + uint64_t production_time = + ((it->first - current_seg_) * packet_gap) + now; + if (production_time >= it->second) { + sendNack(it->first); + to_remove.insert(it->first); + } + } + } + + // delete nacked interests + for (auto it = to_remove.begin(); it != to_remove.end(); it++) { + removeFromInterestQueue(*it); + } +} + +void RTCProductionProtocol::removeFromInterestQueue(uint32_t interest_seg) { + auto seq_it = seqs_map_.find(interest_seg); + if (seq_it != seqs_map_.end()) { + auto range = timers_map_.equal_range(seq_it->second); + for (auto it_timers = range.first; it_timers != range.second; it_timers++) { + if (it_timers->second == seq_it->first) { + timers_map_.erase(it_timers); + break; + } + } + seqs_map_.erase(seq_it); + } +} + +void RTCProductionProtocol::interestQueueTimer() { + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + + for (auto it_timers = timers_map_.begin(); it_timers != timers_map_.end();) { + uint64_t expire = it_timers->first; + if (expire <= now) { + uint32_t seq = it_timers->second; + sendNack(seq); + // remove the interest from the other map + seqs_map_.erase(seq); + it_timers = timers_map_.erase(it_timers); + } else { + // stop, we are done! + break; + } + } + if (timers_map_.empty()) { + queue_timer_on_ = false; + } else { + queue_timer_on_ = true; + scheduleQueueTimer(timers_map_.begin()->first - now); + } +} + +void RTCProductionProtocol::sendNack(uint32_t sequence) { + auto nack = core::PacketManager<>::getInstance().getPacket<ContentObject>(); + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + uint32_t next_packet = current_seg_; + uint32_t prod_rate = bytes_production_rate_; + + struct rtc::nack_packet_t header; + header.setTimestamp(now); + header.setProductionRate(prod_rate); + header.setProductionSegement(next_packet); + nack->appendPayload((const uint8_t *)&header, rtc::NACK_HEADER_SIZE); + + Name n(flow_name_); + n.setSuffix(sequence); + nack->setName(n); + nack->setLifetime(0); + nack->setPathLabel(prod_label_); + + if (!consumer_in_sync_ && on_consumer_in_sync_ && + sequence < rtc::MIN_PROBE_SEQ && sequence > next_packet) { + consumer_in_sync_ = true; + auto interest = core::PacketManager<>::getInstance().getPacket<Interest>(); + interest->setName(n); + on_consumer_in_sync_(*socket_->getInterface(), *interest); + } + + if (*on_content_object_output_) { + on_content_object_output_->operator()(*socket_->getInterface(), *nack); + } + + TRANSPORT_LOGD("Send nack %u", sequence); + portal_->sendContentObject(*nack); +} + +} // namespace protocol + +} // end namespace transport |