diff options
-rw-r--r-- | libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc | 45 |
1 files changed, 37 insertions, 8 deletions
diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc index 740f9f77c..8ca27d24c 100644 --- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc +++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc @@ -70,6 +70,7 @@ RTCProducerSocket::RTCProducerSocket(asio::io_service &io_service) this->getIoService()); round_timer_ = std::make_unique<asio::steady_timer>( this->getIoService()); + setSocketOption(GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 10000U); scheduleRoundTimer(); } @@ -88,6 +89,7 @@ RTCProducerSocket::RTCProducerSocket() this->getIoService()); round_timer_ = std::make_unique<asio::steady_timer>( this->getIoService()); + setSocketOption(GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 10000U); scheduleRoundTimer(); } @@ -148,24 +150,31 @@ void RTCProducerSocket::produce(std::unique_ptr<utils::MemBuf> &&buffer) { producedBytes_ += (uint32_t)(buffer_size + headerSize_ + TIMESTAMP_LEN); producedPackets_++; - ContentObject content_object(flowName_.setSuffix(currentSeg_)); - + auto content_object = std::make_shared<ContentObject>( + flowName_.setSuffix(currentSeg_)); auto payload = utils::MemBuf::create(TIMESTAMP_LEN); memcpy(payload->writableData(), &now, TIMESTAMP_LEN); payload->append(TIMESTAMP_LEN); payload->prependChain(std::move(buffer)); - content_object.appendPayload(std::move(payload)); + content_object->appendPayload(std::move(payload)); - content_object.setLifetime(500); // XXX this should be set by the APP + content_object->setLifetime(500); // XXX this should be set by the APP - content_object.setPathLabel(prodLabel_); + content_object->setPathLabel(prodLabel_); - if (on_content_object_output_ != VOID_HANDLER) { - on_content_object_output_(*this, content_object); + output_buffer_.insert(std::static_pointer_cast<ContentObject>( + content_object->shared_from_this())); + + if (on_content_object_in_output_buffer_ != VOID_HANDLER) { + on_content_object_in_output_buffer_(*this, *content_object); } - portal_->sendContentObject(content_object); + portal_->sendContentObject(*content_object); + + if (on_content_object_output_ != VOID_HANDLER) { + on_content_object_output_(*this, *content_object); + } //remove interests from the interest cache if it exists if(!seqs_map_.empty()){ @@ -205,6 +214,26 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) { return; } + const std::shared_ptr<ContentObject> content_object = + output_buffer_.find(*interest); + + if (content_object) { + if (on_interest_satisfied_output_buffer_ != VOID_HANDLER) { + on_interest_satisfied_output_buffer_(*this, *interest); + } + + if (on_content_object_output_ != VOID_HANDLER) { + on_content_object_output_(*this, *content_object); + } + + portal_->sendContentObject(*content_object); + return; + } else { + if (on_interest_process_ != VOID_HANDLER) { + on_interest_process_(*this, *interest); + } + } + // if the production rate is less than MIN_PRODUCTION_RATE we put the // interest in a queue, otherwise we handle it in the usual way if(packetsProductionRate_.load() < MIN_PRODUCTION_RATE && interestSeg >= currentSeg_){ |