diff options
author | michele papalini <micpapal@cisco.com> | 2019-10-16 17:16:48 +0200 |
---|---|---|
committer | michele papalini <micpapal@cisco.com> | 2019-10-16 17:16:48 +0200 |
commit | 34007e52db95c5e49546e27e264d4bc7e1fb46a6 (patch) | |
tree | 3f038a010defd8b63284f53d80f32dfd582f2764 /libtransport/src | |
parent | d173417e820c98d4a8734703f36dbeb3a82ec03f (diff) |
[HICN-332] add output buffer to the RTC producer socket
Signed-off-by: michele papalini <micpapal@cisco.com>
Change-Id: I119f2a4b4b7153e8da7ca891112f4f0ddf8251e5
Diffstat (limited to 'libtransport/src')
-rw-r--r-- | libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc | 45 |
1 files changed, 37 insertions, 8 deletions
diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc index 740f9f77c..8ca27d24c 100644 --- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc +++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc @@ -70,6 +70,7 @@ RTCProducerSocket::RTCProducerSocket(asio::io_service &io_service) this->getIoService()); round_timer_ = std::make_unique<asio::steady_timer>( this->getIoService()); + setSocketOption(GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 10000U); scheduleRoundTimer(); } @@ -88,6 +89,7 @@ RTCProducerSocket::RTCProducerSocket() this->getIoService()); round_timer_ = std::make_unique<asio::steady_timer>( this->getIoService()); + setSocketOption(GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 10000U); scheduleRoundTimer(); } @@ -148,24 +150,31 @@ void RTCProducerSocket::produce(std::unique_ptr<utils::MemBuf> &&buffer) { producedBytes_ += (uint32_t)(buffer_size + headerSize_ + TIMESTAMP_LEN); producedPackets_++; - ContentObject content_object(flowName_.setSuffix(currentSeg_)); - + auto content_object = std::make_shared<ContentObject>( + flowName_.setSuffix(currentSeg_)); auto payload = utils::MemBuf::create(TIMESTAMP_LEN); memcpy(payload->writableData(), &now, TIMESTAMP_LEN); payload->append(TIMESTAMP_LEN); payload->prependChain(std::move(buffer)); - content_object.appendPayload(std::move(payload)); + content_object->appendPayload(std::move(payload)); - content_object.setLifetime(500); // XXX this should be set by the APP + content_object->setLifetime(500); // XXX this should be set by the APP - content_object.setPathLabel(prodLabel_); + content_object->setPathLabel(prodLabel_); - if (on_content_object_output_ != VOID_HANDLER) { - on_content_object_output_(*this, content_object); + output_buffer_.insert(std::static_pointer_cast<ContentObject>( + content_object->shared_from_this())); + + if (on_content_object_in_output_buffer_ != VOID_HANDLER) { + on_content_object_in_output_buffer_(*this, *content_object); } - portal_->sendContentObject(content_object); + portal_->sendContentObject(*content_object); + + if (on_content_object_output_ != VOID_HANDLER) { + on_content_object_output_(*this, *content_object); + } //remove interests from the interest cache if it exists if(!seqs_map_.empty()){ @@ -205,6 +214,26 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) { return; } + const std::shared_ptr<ContentObject> content_object = + output_buffer_.find(*interest); + + if (content_object) { + if (on_interest_satisfied_output_buffer_ != VOID_HANDLER) { + on_interest_satisfied_output_buffer_(*this, *interest); + } + + if (on_content_object_output_ != VOID_HANDLER) { + on_content_object_output_(*this, *content_object); + } + + portal_->sendContentObject(*content_object); + return; + } else { + if (on_interest_process_ != VOID_HANDLER) { + on_interest_process_(*this, *interest); + } + } + // if the production rate is less than MIN_PRODUCTION_RATE we put the // interest in a queue, otherwise we handle it in the usual way if(packetsProductionRate_.load() < MIN_PRODUCTION_RATE && interestSeg >= currentSeg_){ |