From 34007e52db95c5e49546e27e264d4bc7e1fb46a6 Mon Sep 17 00:00:00 2001 From: michele papalini Date: Wed, 16 Oct 2019 17:16:48 +0200 Subject: [HICN-332] add output buffer to the RTC producer socket Signed-off-by: michele papalini Change-Id: I119f2a4b4b7153e8da7ca891112f4f0ddf8251e5 --- .../transport/interfaces/rtc_socket_producer.cc | 45 ++++++++++++++++++---- 1 file changed, 37 insertions(+), 8 deletions(-) (limited to 'libtransport/src/hicn/transport/interfaces') diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc index 740f9f77c..8ca27d24c 100644 --- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc +++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc @@ -70,6 +70,7 @@ RTCProducerSocket::RTCProducerSocket(asio::io_service &io_service) this->getIoService()); round_timer_ = std::make_unique( this->getIoService()); + setSocketOption(GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 10000U); scheduleRoundTimer(); } @@ -88,6 +89,7 @@ RTCProducerSocket::RTCProducerSocket() this->getIoService()); round_timer_ = std::make_unique( this->getIoService()); + setSocketOption(GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 10000U); scheduleRoundTimer(); } @@ -148,24 +150,31 @@ void RTCProducerSocket::produce(std::unique_ptr &&buffer) { producedBytes_ += (uint32_t)(buffer_size + headerSize_ + TIMESTAMP_LEN); producedPackets_++; - ContentObject content_object(flowName_.setSuffix(currentSeg_)); - + auto content_object = std::make_shared( + flowName_.setSuffix(currentSeg_)); auto payload = utils::MemBuf::create(TIMESTAMP_LEN); memcpy(payload->writableData(), &now, TIMESTAMP_LEN); payload->append(TIMESTAMP_LEN); payload->prependChain(std::move(buffer)); - content_object.appendPayload(std::move(payload)); + content_object->appendPayload(std::move(payload)); - content_object.setLifetime(500); // XXX this should be set by the APP + content_object->setLifetime(500); // XXX this should be set by the APP - content_object.setPathLabel(prodLabel_); + content_object->setPathLabel(prodLabel_); - if (on_content_object_output_ != VOID_HANDLER) { - on_content_object_output_(*this, content_object); + output_buffer_.insert(std::static_pointer_cast( + content_object->shared_from_this())); + + if (on_content_object_in_output_buffer_ != VOID_HANDLER) { + on_content_object_in_output_buffer_(*this, *content_object); } - portal_->sendContentObject(content_object); + portal_->sendContentObject(*content_object); + + if (on_content_object_output_ != VOID_HANDLER) { + on_content_object_output_(*this, *content_object); + } //remove interests from the interest cache if it exists if(!seqs_map_.empty()){ @@ -205,6 +214,26 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) { return; } + const std::shared_ptr content_object = + output_buffer_.find(*interest); + + if (content_object) { + if (on_interest_satisfied_output_buffer_ != VOID_HANDLER) { + on_interest_satisfied_output_buffer_(*this, *interest); + } + + if (on_content_object_output_ != VOID_HANDLER) { + on_content_object_output_(*this, *content_object); + } + + portal_->sendContentObject(*content_object); + return; + } else { + if (on_interest_process_ != VOID_HANDLER) { + on_interest_process_(*this, *interest); + } + } + // if the production rate is less than MIN_PRODUCTION_RATE we put the // interest in a queue, otherwise we handle it in the usual way if(packetsProductionRate_.load() < MIN_PRODUCTION_RATE && interestSeg >= currentSeg_){ -- cgit 1.2.3-korg