aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/hicn/transport/protocols
diff options
context:
space:
mode:
authorMauro Sardara <msardara@cisco.com>2019-04-03 10:03:56 +0200
committerMauro Sardara <msardara@cisco.com>2019-04-15 11:37:30 +0200
commitc365689250216861fd7727203ee6ba1049ad5778 (patch)
tree97f1f3d1a6cb7314f1292d97be6d8e8e06cc998b /libtransport/src/hicn/transport/protocols
parentd8ce6d98a2a726393655bd71eb81b8ef5222d6ba (diff)
[HICN-155] Consumer socket allows now to read N bytes from the network, where N is defined by the application.
Change-Id: Ib20309b40e43e4c0db09b9b484e18cd2e3ebf581 Signed-off-by: Mauro Sardara <msardara@cisco.com>
Diffstat (limited to 'libtransport/src/hicn/transport/protocols')
-rw-r--r--libtransport/src/hicn/transport/protocols/raaqm.cc62
-rw-r--r--libtransport/src/hicn/transport/protocols/reassembly.cc78
-rw-r--r--libtransport/src/hicn/transport/protocols/reassembly.h8
-rw-r--r--libtransport/src/hicn/transport/protocols/rtc.cc204
4 files changed, 227 insertions, 125 deletions
diff --git a/libtransport/src/hicn/transport/protocols/raaqm.cc b/libtransport/src/hicn/transport/protocols/raaqm.cc
index b8a7c9610..7f0310e7c 100644
--- a/libtransport/src/hicn/transport/protocols/raaqm.cc
+++ b/libtransport/src/hicn/transport/protocols/raaqm.cc
@@ -38,14 +38,14 @@ RaaqmTransportProtocol::RaaqmTransportProtocol(ConsumerSocket *icnet_socket)
}
RaaqmTransportProtocol::~RaaqmTransportProtocol() {
- if (this->rate_estimator_) {
- delete this->rate_estimator_;
+ if (rate_estimator_) {
+ delete rate_estimator_;
}
}
int RaaqmTransportProtocol::start() {
- if (this->rate_estimator_) {
- this->rate_estimator_->onStart();
+ if (rate_estimator_) {
+ rate_estimator_->onStart();
}
if (!cur_path_) {
@@ -75,13 +75,13 @@ int RaaqmTransportProtocol::start() {
choice_param);
if (choice_param == 1) {
- this->rate_estimator_ = new ALaTcpEstimator();
+ rate_estimator_ = new ALaTcpEstimator();
} else {
- this->rate_estimator_ = new SimpleEstimator(alpha, batching_param);
+ rate_estimator_ = new SimpleEstimator(alpha, batching_param);
}
socket_->getSocketOption(RateEstimationOptions::RATE_ESTIMATION_OBSERVER,
- &this->rate_estimator_->observer_);
+ &rate_estimator_->observer_);
// Current path
auto cur_path = std::make_unique<RaaqmDataPath>(
@@ -126,7 +126,7 @@ void RaaqmTransportProtocol::increaseWindow() {
socket_->setSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE,
current_window_size_);
}
- this->rate_estimator_->onWindowIncrease(current_window_size_);
+ rate_estimator_->onWindowIncrease(current_window_size_);
}
void RaaqmTransportProtocol::decreaseWindow() {
@@ -145,7 +145,7 @@ void RaaqmTransportProtocol::decreaseWindow() {
socket_->setSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE,
current_window_size_);
}
- this->rate_estimator_->onWindowDecrease(current_window_size_);
+ rate_estimator_->onWindowDecrease(current_window_size_);
}
void RaaqmTransportProtocol::afterDataUnsatisfied(uint64_t segment) {
@@ -158,8 +158,8 @@ void RaaqmTransportProtocol::afterContentReception(
updatePathTable(content_object);
increaseWindow();
updateRtt(interest.getName().getSuffix());
- this->rate_estimator_->onDataReceived((int)content_object.payloadSize() +
- (int)content_object.headerSize());
+ rate_estimator_->onDataReceived((int)content_object.payloadSize() +
+ (int)content_object.headerSize());
// Set drop probablility and window size accordingly
RAAQM();
}
@@ -368,7 +368,12 @@ void RaaqmTransportProtocol::onContentSegment(
reassemble(std::move(content_object));
} else if (TRANSPORT_EXPECT_FALSE(incremental_suffix ==
index_manager_->getFinalSuffix())) {
- onContentReassembled(std::make_error_code(std::errc(0)));
+ interface::ConsumerSocket::ReadCallback *on_payload = nullptr;
+ socket_->getSocketOption(READ_CALLBACK, &on_payload);
+
+ if (on_payload != nullptr) {
+ on_payload->readSuccess(stats_.getBytesRecv());
+ }
}
} else {
// TODO Application policy check
@@ -487,8 +492,8 @@ void RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) {
return;
}
- // This is set to ~0 so that the next interest_retransmissions_ + 1, performed
- // by sendInterest, will result in 0
+ // This is set to ~0 so that the next interest_retransmissions_ + 1,
+ // performed by sendInterest, will result in 0
interest_retransmissions_[next_suffix & mask] = ~0;
interest_timepoints_[next_suffix & mask] = utils::SteadyClock::now();
sendInterest(std::move(interest));
@@ -502,16 +507,23 @@ void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) {
}
void RaaqmTransportProtocol::onContentReassembled(std::error_code ec) {
- interface::ConsumerContentCallback *on_payload = nullptr;
- socket_->getSocketOption(CONTENT_RETRIEVED, &on_payload);
- if (*on_payload != VOID_HANDLER) {
- std::shared_ptr<std::vector<uint8_t>> content_buffer;
- socket_->getSocketOption(
- interface::GeneralTransportOptions::APPLICATION_BUFFER, content_buffer);
- (*on_payload)(*socket_, content_buffer->size(), ec);
+ interface::ConsumerSocket::ReadCallback *on_payload = nullptr;
+ socket_->getSocketOption(READ_CALLBACK, &on_payload);
+
+ if (on_payload == nullptr) {
+ throw errors::RuntimeException(
+ "The read callback must be installed in the transport before "
+ "starting "
+ "the content retrieval.");
+ }
+
+ if (!ec) {
+ on_payload->readSuccess(stats_.getBytesRecv());
+ } else {
+ on_payload->readError(ec);
}
- this->rate_estimator_->onDownloadFinished();
+ rate_estimator_->onDownloadFinished();
stop();
}
@@ -526,8 +538,8 @@ void RaaqmTransportProtocol::updateRtt(uint64_t segment) {
// Update stats
updateStats((uint32_t)segment, rtt.count(), now);
- if (this->rate_estimator_) {
- this->rate_estimator_->onRttUpdate((double)rtt.count());
+ if (rate_estimator_) {
+ rate_estimator_->onRttUpdate((double)rtt.count());
}
cur_path_->insertNewRtt(rtt.count());
@@ -676,4 +688,4 @@ void RaaqmTransportProtocol::checkForStalePaths() {
} // end namespace protocol
-} // end namespace transport
+} // namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/reassembly.cc b/libtransport/src/hicn/transport/protocols/reassembly.cc
index 899f701c7..a2062df93 100644
--- a/libtransport/src/hicn/transport/protocols/reassembly.cc
+++ b/libtransport/src/hicn/transport/protocols/reassembly.cc
@@ -17,6 +17,7 @@
#include <hicn/transport/protocols/indexing_manager.h>
#include <hicn/transport/protocols/reassembly.h>
#include <hicn/transport/utils/array.h>
+#include <hicn/transport/utils/membuf.h>
namespace transport {
@@ -30,7 +31,8 @@ BaseReassembly::BaseReassembly(interface::ConsumerSocket *icn_socket,
manifest_index_manager_(
std::make_unique<ManifestIndexManager>(icn_socket)),
index_manager_(incremental_index_manager_.get()),
- index_(0) {
+ index_(0),
+ read_buffer_(nullptr) {
setContentCallback(content_callback);
}
@@ -54,30 +56,88 @@ void BaseReassembly::reassemble(ContentObject::Ptr &&content_object) {
void BaseReassembly::copyContent(const ContentObject &content_object) {
auto a = content_object.getPayload();
-
- std::shared_ptr<std::vector<uint8_t>> content_buffer;
- reassembly_consumer_socket_->getSocketOption(
- interface::GeneralTransportOptions::APPLICATION_BUFFER, content_buffer);
-
- content_buffer->insert(content_buffer->end(), (uint8_t *)a->data(),
- (uint8_t *)a->data() + a->length());
+ auto payload_length = a->length();
+ auto write_size = std::min(payload_length, read_buffer_->tailroom());
+ auto additional_bytes = payload_length > read_buffer_->tailroom()
+ ? payload_length - read_buffer_->tailroom()
+ : 0;
+
+ std::memcpy(read_buffer_->writableTail(), a->data(), write_size);
+ read_buffer_->append(write_size);
+
+ if (!read_buffer_->tailroom()) {
+ notifyApplication();
+ std::memcpy(read_buffer_->writableTail(), a->data() + write_size,
+ additional_bytes);
+ read_buffer_->append(additional_bytes);
+ }
bool download_completed =
index_manager_->getFinalSuffix() == content_object.getName().getSuffix();
if (TRANSPORT_EXPECT_FALSE(download_completed)) {
+ notifyApplication();
content_callback_->onContentReassembled(std::make_error_code(std::errc(0)));
}
}
+void BaseReassembly::notifyApplication() {
+ interface::ConsumerSocket::ReadCallback *read_callback = nullptr;
+ reassembly_consumer_socket_->getSocketOption(
+ interface::ConsumerCallbacksOptions::READ_CALLBACK, &read_callback);
+
+ if (TRANSPORT_EXPECT_FALSE(!read_callback)) {
+ TRANSPORT_LOGE("Read callback not installed!");
+ return;
+ }
+
+ if (read_callback->isBufferMovable()) {
+ // No need to perform an additional copy. The whole buffer will be
+ // tranferred to the application.
+
+ read_callback->readBufferAvailable(std::move(read_buffer_));
+ read_buffer_ = utils::MemBuf::create(read_callback->maxBufferSize());
+ } else {
+ // The buffer will be copied into the application-provided buffer
+ uint8_t *buffer;
+ std::size_t length;
+ std::size_t total_length = read_buffer_->length();
+
+ while (read_buffer_->length()) {
+ buffer = nullptr;
+ length = 0;
+ read_callback->getReadBuffer(&buffer, &length);
+
+ if (!buffer || !length) {
+ throw errors::RuntimeException(
+ "Invalid buffer provided by the application.");
+ }
+
+ auto to_copy = std::min(read_buffer_->length(), length);
+ std::memcpy(buffer, read_buffer_->data(), to_copy);
+ read_buffer_->trimStart(to_copy);
+ }
+
+ read_callback->readDataAvailable(total_length);
+ read_buffer_->clear();
+ }
+}
+
void BaseReassembly::reset() {
manifest_index_manager_->reset();
incremental_index_manager_->reset();
index_ = index_manager_->getNextReassemblySegment();
received_packets_.clear();
+
+ // reset read buffer
+ interface::ConsumerSocket::ReadCallback *read_callback;
+ reassembly_consumer_socket_->getSocketOption(
+ interface::ConsumerCallbacksOptions::READ_CALLBACK, &read_callback);
+
+ read_buffer_ = utils::MemBuf::create(read_callback->maxBufferSize());
}
} // namespace protocol
-} // end namespace transport
+} // namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/reassembly.h b/libtransport/src/hicn/transport/protocols/reassembly.h
index 9efddb773..79f0ea4d2 100644
--- a/libtransport/src/hicn/transport/protocols/reassembly.h
+++ b/libtransport/src/hicn/transport/protocols/reassembly.h
@@ -20,6 +20,10 @@
namespace transport {
+namespace interface {
+class ConsumerReadCallback;
+}
+
namespace protocol {
// Forward Declaration
@@ -54,6 +58,9 @@ class BaseReassembly : public Reassembly {
virtual void reset() override;
+ private:
+ void notifyApplication();
+
protected:
// The consumer socket
interface::ConsumerSocket *reassembly_consumer_socket_;
@@ -63,6 +70,7 @@ class BaseReassembly : public Reassembly {
std::unordered_map<std::uint32_t, ContentObject::Ptr> received_packets_;
uint64_t index_;
+ std::unique_ptr<utils::MemBuf> read_buffer_;
};
} // namespace protocol
diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc
index 9402d3b02..4205ade4e 100644
--- a/libtransport/src/hicn/transport/protocols/rtc.cc
+++ b/libtransport/src/hicn/transport/protocols/rtc.cc
@@ -161,12 +161,11 @@ void RTCTransportProtocol::updateDelayStats(
uint32_t segmentNumber = content_object.getName().getSuffix();
uint32_t pkt = segmentNumber & modMask_;
- if (inflightInterests_[pkt].state != sent_)
- return;
+ if (inflightInterests_[pkt].state != sent_) return;
- if(interestRetransmissions_.find(segmentNumber) !=
+ if (interestRetransmissions_.find(segmentNumber) !=
interestRetransmissions_.end())
- //this packet was rtx at least once
+ // this packet was rtx at least once
return;
uint32_t pathLabel = content_object.getPathLabel();
@@ -329,8 +328,7 @@ void RTCTransportProtocol::increaseWindow() {
} else {
currentCWin_ = min(
maxCWin_,
- (uint32_t)ceil(currentCWin_ +
- (1.0 / (double)currentCWin_))); // linear
+ (uint32_t)ceil(currentCWin_ + (1.0 / (double)currentCWin_))); // linear
}
}
@@ -363,7 +361,6 @@ void RTCTransportProtocol::sendInterest(Name *interest_name, bool rtx) {
if (!rtx) {
inflightInterestsCount_++;
}
-
}
void RTCTransportProtocol::scheduleNextInterests() {
@@ -373,9 +370,9 @@ void RTCTransportProtocol::scheduleNextInterests() {
while (inflightInterestsCount_ < currentCWin_) {
Name *interest_name = nullptr;
socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
- &interest_name);
+ &interest_name);
- //we send the packet only if it is not pending yet
+ // we send the packet only if it is not pending yet
interest_name->setSuffix(actualSegment_);
if (portal_->interestIsPending(*interest_name)) {
actualSegment_++;
@@ -383,11 +380,11 @@ void RTCTransportProtocol::scheduleNextInterests() {
}
uint32_t pkt = actualSegment_ & modMask_;
- //if we already reacevied the content we don't ask it again
- if(inflightInterests_[pkt].state == received_ &&
- inflightInterests_[pkt].sequence == actualSegment_) {
- actualSegment_++;
- continue;
+ // if we already reacevied the content we don't ask it again
+ if (inflightInterests_[pkt].state == received_ &&
+ inflightInterests_[pkt].sequence == actualSegment_) {
+ actualSegment_++;
+ continue;
}
inflightInterests_[pkt].transmissionTime =
@@ -419,93 +416,93 @@ void RTCTransportProtocol::scheduleAppNackRtx(std::vector<uint32_t> &nacks) {
#endif
}
-void RTCTransportProtocol::addRetransmissions(uint32_t val){
- //add only val in the rtx list
+void RTCTransportProtocol::addRetransmissions(uint32_t val) {
+ // add only val in the rtx list
addRetransmissions(val, val + 1);
}
-void RTCTransportProtocol::addRetransmissions(uint32_t start, uint32_t stop){
- for(uint32_t i = start; i < stop; i++){
+void RTCTransportProtocol::addRetransmissions(uint32_t start, uint32_t stop) {
+ for (uint32_t i = start; i < stop; i++) {
auto it = interestRetransmissions_.find(i);
- if(it == interestRetransmissions_.end()){
+ if (it == interestRetransmissions_.end()) {
if (lastSegNacked_ <= i) {
- //i must be larger than the last past nack received
+ // i must be larger than the last past nack received
interestRetransmissions_[i] = 0;
}
- }//if the retransmission is already there the rtx timer will
- //take care of it
+ } // if the retransmission is already there the rtx timer will
+ // take care of it
}
retransmit(true);
}
-void RTCTransportProtocol::retransmit(bool first_rtx){
+void RTCTransportProtocol::retransmit(bool first_rtx) {
auto it = interestRetransmissions_.begin();
- //cut len to max HICN_MAX_RTX_SIZE
- //since we use a map, the smaller (and so the older) sequence number are at
- //the beginnin of the map
- while(interestRetransmissions_.size() > HICN_MAX_RTX_SIZE){
+ // cut len to max HICN_MAX_RTX_SIZE
+ // since we use a map, the smaller (and so the older) sequence number are at
+ // the beginnin of the map
+ while (interestRetransmissions_.size() > HICN_MAX_RTX_SIZE) {
it = interestRetransmissions_.erase(it);
}
it = interestRetransmissions_.begin();
- while (it != interestRetransmissions_.end()){
+ while (it != interestRetransmissions_.end()) {
uint32_t pkt = it->first & modMask_;
- if(inflightInterests_[pkt].sequence != it->first){
- //this packet is not anymore in the inflight buffer, erase it
+ if (inflightInterests_[pkt].sequence != it->first) {
+ // this packet is not anymore in the inflight buffer, erase it
it = interestRetransmissions_.erase(it);
continue;
}
- //we retransmitted the packet too many times
- if(it->second >= HICN_MAX_RTX){
+ // we retransmitted the packet too many times
+ if (it->second >= HICN_MAX_RTX) {
it = interestRetransmissions_.erase(it);
continue;
}
- //this packet is too old
- if((lastReceived_ > it->first) &&
- (lastReceived_ - it->first) > HICN_MAX_RTX_MAX_AGE){
+ // this packet is too old
+ if ((lastReceived_ > it->first) &&
+ (lastReceived_ - it->first) > HICN_MAX_RTX_MAX_AGE) {
it = interestRetransmissions_.erase(it);
continue;
}
- if(first_rtx){
- //TODO (optimization)
- //the rtx that we never sent (it->second == 0) are all at the
- //end, so we can go directly there
- if(it->second == 0){
+ if (first_rtx) {
+ // TODO (optimization)
+ // the rtx that we never sent (it->second == 0) are all at the
+ // end, so we can go directly there
+ if (it->second == 0) {
inflightInterests_[pkt].transmissionTime =
std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
it->second++;
Name *interest_name = nullptr;
socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
- &interest_name);
+ &interest_name);
interest_name->setSuffix(it->first);
sendInterest(interest_name, true);
}
++it;
- }else{
- //base on time
+ } else {
+ // base on time
uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
- if((now - inflightInterests_[pkt].transmissionTime) > 20){
- //XXX replace 20 with rtt
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ if ((now - inflightInterests_[pkt].transmissionTime) > 20) {
+ // XXX replace 20 with rtt
inflightInterests_[pkt].transmissionTime =
std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
it->second++;
Name *interest_name = nullptr;
socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
- &interest_name);
+ &interest_name);
interest_name->setSuffix(it->first);
sendInterest(interest_name, true);
}
@@ -514,13 +511,13 @@ void RTCTransportProtocol::retransmit(bool first_rtx){
}
}
-void RTCTransportProtocol::checkRtx(){
+void RTCTransportProtocol::checkRtx() {
retransmit(false);
rtx_timer_->expires_from_now(std::chrono::milliseconds(20));
rtx_timer_->async_wait([this](std::error_code ec) {
- if (ec) return;
- checkRtx();
- });
+ if (ec) return;
+ checkRtx();
+ });
}
void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) {
@@ -533,25 +530,25 @@ void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) {
inflightInterestsCount_--;
}
- //check how many times we sent this packet
- auto it = interestRetransmissions_.find(segmentNumber);
- if(it != interestRetransmissions_.end() && it->second >= HICN_MAX_RTX){
- inflightInterests_[pkt].state = lost_;
- }
+ // check how many times we sent this packet
+ auto it = interestRetransmissions_.find(segmentNumber);
+ if (it != interestRetransmissions_.end() && it->second >= HICN_MAX_RTX) {
+ inflightInterests_[pkt].state = lost_;
+ }
- if(inflightInterests_[pkt].state == sent_) {
- inflightInterests_[pkt].state = timeout1_;
- } else if (inflightInterests_[pkt].state == timeout1_) {
- inflightInterests_[pkt].state = timeout2_;
- } else if (inflightInterests_[pkt].state == timeout2_) {
- inflightInterests_[pkt].state = lost_;
- }
+ if (inflightInterests_[pkt].state == sent_) {
+ inflightInterests_[pkt].state = timeout1_;
+ } else if (inflightInterests_[pkt].state == timeout1_) {
+ inflightInterests_[pkt].state = timeout2_;
+ } else if (inflightInterests_[pkt].state == timeout2_) {
+ inflightInterests_[pkt].state = lost_;
+ }
- if(inflightInterests_[pkt].state == lost_) {
- interestRetransmissions_.erase(segmentNumber);
- }else{
- addRetransmissions(segmentNumber);
- }
+ if (inflightInterests_[pkt].state == lost_) {
+ interestRetransmissions_.erase(segmentNumber);
+ } else {
+ addRetransmissions(segmentNumber);
+ }
scheduleNextInterests();
}
@@ -562,12 +559,11 @@ bool RTCTransportProtocol::checkIfProducerIsActive(
uint32_t productionSeg = *payload;
uint32_t productionRate = *(++payload);
-
if (productionRate == 0) {
// the producer socket is not active
// in this case we consider only the first nack
if (nack_timer_used_) {
- return false;
+ return false;
}
nack_timer_used_ = true;
@@ -680,12 +676,12 @@ void RTCTransportProtocol::onContentObject(
((1 - HICN_ESTIMATED_PACKET_SIZE) * payload->length());
if (inflightInterests_[pkt].state == sent_) {
- inflightInterestsCount_--; //packet sent without timeouts
+ inflightInterestsCount_--; // packet sent without timeouts
}
if (inflightInterests_[pkt].state == sent_ &&
interestRetransmissions_.find(segmentNumber) ==
- interestRetransmissions_.end()){
+ interestRetransmissions_.end()) {
// we count only non retransmitted data in order to take into accunt only
// the transmition rate of the producer
receivedBytes_ += (uint32_t)(content_object->headerSize() +
@@ -693,7 +689,7 @@ void RTCTransportProtocol::onContentObject(
updateDelayStats(*content_object);
addRetransmissions(lastReceived_ + 1, segmentNumber);
- //lastReceived_ is updated only for data packets received without RTX
+ // lastReceived_ is updated only for data packets received without RTX
lastReceived_ = segmentNumber;
}
@@ -704,7 +700,7 @@ void RTCTransportProtocol::onContentObject(
increaseWindow();
}
- //in any case we remove the packet from the rtx list
+ // in any case we remove the packet from the rtx list
interestRetransmissions_.erase(segmentNumber);
if (schedule_next_interest) {
@@ -715,24 +711,50 @@ void RTCTransportProtocol::onContentObject(
void RTCTransportProtocol::returnContentToApplication(
const ContentObject &content_object) {
// return content to the user
- auto a = content_object.getPayload();
+ auto read_buffer = content_object.getPayload();
- a->trimStart(HICN_TIMESTAMP_SIZE);
- uint8_t *start = a->writableData();
- unsigned size = (unsigned)a->length();
+ read_buffer->trimStart(HICN_TIMESTAMP_SIZE);
// set offset between hICN and RTP packets
- uint16_t rtp_seq = ntohs(*(((uint16_t *)start) + 1));
+ uint16_t rtp_seq = ntohs(*(((uint16_t *)read_buffer->writableData()) + 1));
RTPhICN_offset_ = content_object.getName().getSuffix() - rtp_seq;
- std::shared_ptr<std::vector<uint8_t>> content_buffer;
- socket_->getSocketOption(APPLICATION_BUFFER, content_buffer);
- content_buffer->insert(content_buffer->end(), start, start + size);
+ interface::ConsumerSocket::ReadCallback *read_callback = nullptr;
+ socket_->getSocketOption(READ_CALLBACK, &read_callback);
+
+ if (read_callback == nullptr) {
+ throw errors::RuntimeException(
+ "The read callback must be installed in the transport before starting "
+ "the content retrieval.");
+ }
+
+ if (read_callback->isBufferMovable()) {
+ read_callback->readBufferAvailable(
+ utils::MemBuf::copyBuffer(read_buffer->data(), read_buffer->length()));
+ } else {
+ // The buffer will be copied into the application-provided buffer
+ uint8_t *buffer;
+ std::size_t length;
+ std::size_t total_length = read_buffer->length();
+
+ while (read_buffer->length()) {
+ buffer = nullptr;
+ length = 0;
+ read_callback->getReadBuffer(&buffer, &length);
+
+ if (!buffer || !length) {
+ throw errors::RuntimeException(
+ "Invalid buffer provided by the application.");
+ }
+
+ auto to_copy = std::min(read_buffer->length(), length);
+
+ std::memcpy(buffer, read_buffer->data(), to_copy);
+ read_buffer->trimStart(to_copy);
+ }
- ConsumerContentCallback *on_payload = nullptr;
- socket_->getSocketOption(CONTENT_RETRIEVED, &on_payload);
- if ((*on_payload) != VOID_HANDLER) {
- (*on_payload)(*socket_, size, std::make_error_code(std::errc(0)));
+ read_callback->readDataAvailable(total_length);
+ read_buffer->clear();
}
}