aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc45
1 files changed, 37 insertions, 8 deletions
diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
index 740f9f77c..8ca27d24c 100644
--- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
+++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
@@ -70,6 +70,7 @@ RTCProducerSocket::RTCProducerSocket(asio::io_service &io_service)
this->getIoService());
round_timer_ = std::make_unique<asio::steady_timer>(
this->getIoService());
+ setSocketOption(GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 10000U);
scheduleRoundTimer();
}
@@ -88,6 +89,7 @@ RTCProducerSocket::RTCProducerSocket()
this->getIoService());
round_timer_ = std::make_unique<asio::steady_timer>(
this->getIoService());
+ setSocketOption(GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 10000U);
scheduleRoundTimer();
}
@@ -148,24 +150,31 @@ void RTCProducerSocket::produce(std::unique_ptr<utils::MemBuf> &&buffer) {
producedBytes_ += (uint32_t)(buffer_size + headerSize_ + TIMESTAMP_LEN);
producedPackets_++;
- ContentObject content_object(flowName_.setSuffix(currentSeg_));
-
+ auto content_object = std::make_shared<ContentObject>(
+ flowName_.setSuffix(currentSeg_));
auto payload = utils::MemBuf::create(TIMESTAMP_LEN);
memcpy(payload->writableData(), &now, TIMESTAMP_LEN);
payload->append(TIMESTAMP_LEN);
payload->prependChain(std::move(buffer));
- content_object.appendPayload(std::move(payload));
+ content_object->appendPayload(std::move(payload));
- content_object.setLifetime(500); // XXX this should be set by the APP
+ content_object->setLifetime(500); // XXX this should be set by the APP
- content_object.setPathLabel(prodLabel_);
+ content_object->setPathLabel(prodLabel_);
- if (on_content_object_output_ != VOID_HANDLER) {
- on_content_object_output_(*this, content_object);
+ output_buffer_.insert(std::static_pointer_cast<ContentObject>(
+ content_object->shared_from_this()));
+
+ if (on_content_object_in_output_buffer_ != VOID_HANDLER) {
+ on_content_object_in_output_buffer_(*this, *content_object);
}
- portal_->sendContentObject(content_object);
+ portal_->sendContentObject(*content_object);
+
+ if (on_content_object_output_ != VOID_HANDLER) {
+ on_content_object_output_(*this, *content_object);
+ }
//remove interests from the interest cache if it exists
if(!seqs_map_.empty()){
@@ -205,6 +214,26 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) {
return;
}
+ const std::shared_ptr<ContentObject> content_object =
+ output_buffer_.find(*interest);
+
+ if (content_object) {
+ if (on_interest_satisfied_output_buffer_ != VOID_HANDLER) {
+ on_interest_satisfied_output_buffer_(*this, *interest);
+ }
+
+ if (on_content_object_output_ != VOID_HANDLER) {
+ on_content_object_output_(*this, *content_object);
+ }
+
+ portal_->sendContentObject(*content_object);
+ return;
+ } else {
+ if (on_interest_process_ != VOID_HANDLER) {
+ on_interest_process_(*this, *interest);
+ }
+ }
+
// if the production rate is less than MIN_PRODUCTION_RATE we put the
// interest in a queue, otherwise we handle it in the usual way
if(packetsProductionRate_.load() < MIN_PRODUCTION_RATE && interestSeg >= currentSeg_){