aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/hicn/transport/protocols/rtc.cc
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/hicn/transport/protocols/rtc.cc')
-rw-r--r--libtransport/src/hicn/transport/protocols/rtc.cc204
1 files changed, 113 insertions, 91 deletions
diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc
index 9402d3b02..4205ade4e 100644
--- a/libtransport/src/hicn/transport/protocols/rtc.cc
+++ b/libtransport/src/hicn/transport/protocols/rtc.cc
@@ -161,12 +161,11 @@ void RTCTransportProtocol::updateDelayStats(
uint32_t segmentNumber = content_object.getName().getSuffix();
uint32_t pkt = segmentNumber & modMask_;
- if (inflightInterests_[pkt].state != sent_)
- return;
+ if (inflightInterests_[pkt].state != sent_) return;
- if(interestRetransmissions_.find(segmentNumber) !=
+ if (interestRetransmissions_.find(segmentNumber) !=
interestRetransmissions_.end())
- //this packet was rtx at least once
+ // this packet was rtx at least once
return;
uint32_t pathLabel = content_object.getPathLabel();
@@ -329,8 +328,7 @@ void RTCTransportProtocol::increaseWindow() {
} else {
currentCWin_ = min(
maxCWin_,
- (uint32_t)ceil(currentCWin_ +
- (1.0 / (double)currentCWin_))); // linear
+ (uint32_t)ceil(currentCWin_ + (1.0 / (double)currentCWin_))); // linear
}
}
@@ -363,7 +361,6 @@ void RTCTransportProtocol::sendInterest(Name *interest_name, bool rtx) {
if (!rtx) {
inflightInterestsCount_++;
}
-
}
void RTCTransportProtocol::scheduleNextInterests() {
@@ -373,9 +370,9 @@ void RTCTransportProtocol::scheduleNextInterests() {
while (inflightInterestsCount_ < currentCWin_) {
Name *interest_name = nullptr;
socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
- &interest_name);
+ &interest_name);
- //we send the packet only if it is not pending yet
+ // we send the packet only if it is not pending yet
interest_name->setSuffix(actualSegment_);
if (portal_->interestIsPending(*interest_name)) {
actualSegment_++;
@@ -383,11 +380,11 @@ void RTCTransportProtocol::scheduleNextInterests() {
}
uint32_t pkt = actualSegment_ & modMask_;
- //if we already reacevied the content we don't ask it again
- if(inflightInterests_[pkt].state == received_ &&
- inflightInterests_[pkt].sequence == actualSegment_) {
- actualSegment_++;
- continue;
+ // if we already reacevied the content we don't ask it again
+ if (inflightInterests_[pkt].state == received_ &&
+ inflightInterests_[pkt].sequence == actualSegment_) {
+ actualSegment_++;
+ continue;
}
inflightInterests_[pkt].transmissionTime =
@@ -419,93 +416,93 @@ void RTCTransportProtocol::scheduleAppNackRtx(std::vector<uint32_t> &nacks) {
#endif
}
-void RTCTransportProtocol::addRetransmissions(uint32_t val){
- //add only val in the rtx list
+void RTCTransportProtocol::addRetransmissions(uint32_t val) {
+ // add only val in the rtx list
addRetransmissions(val, val + 1);
}
-void RTCTransportProtocol::addRetransmissions(uint32_t start, uint32_t stop){
- for(uint32_t i = start; i < stop; i++){
+void RTCTransportProtocol::addRetransmissions(uint32_t start, uint32_t stop) {
+ for (uint32_t i = start; i < stop; i++) {
auto it = interestRetransmissions_.find(i);
- if(it == interestRetransmissions_.end()){
+ if (it == interestRetransmissions_.end()) {
if (lastSegNacked_ <= i) {
- //i must be larger than the last past nack received
+ // i must be larger than the last past nack received
interestRetransmissions_[i] = 0;
}
- }//if the retransmission is already there the rtx timer will
- //take care of it
+ } // if the retransmission is already there the rtx timer will
+ // take care of it
}
retransmit(true);
}
-void RTCTransportProtocol::retransmit(bool first_rtx){
+void RTCTransportProtocol::retransmit(bool first_rtx) {
auto it = interestRetransmissions_.begin();
- //cut len to max HICN_MAX_RTX_SIZE
- //since we use a map, the smaller (and so the older) sequence number are at
- //the beginnin of the map
- while(interestRetransmissions_.size() > HICN_MAX_RTX_SIZE){
+ // cut len to max HICN_MAX_RTX_SIZE
+ // since we use a map, the smaller (and so the older) sequence number are at
+ // the beginnin of the map
+ while (interestRetransmissions_.size() > HICN_MAX_RTX_SIZE) {
it = interestRetransmissions_.erase(it);
}
it = interestRetransmissions_.begin();
- while (it != interestRetransmissions_.end()){
+ while (it != interestRetransmissions_.end()) {
uint32_t pkt = it->first & modMask_;
- if(inflightInterests_[pkt].sequence != it->first){
- //this packet is not anymore in the inflight buffer, erase it
+ if (inflightInterests_[pkt].sequence != it->first) {
+ // this packet is not anymore in the inflight buffer, erase it
it = interestRetransmissions_.erase(it);
continue;
}
- //we retransmitted the packet too many times
- if(it->second >= HICN_MAX_RTX){
+ // we retransmitted the packet too many times
+ if (it->second >= HICN_MAX_RTX) {
it = interestRetransmissions_.erase(it);
continue;
}
- //this packet is too old
- if((lastReceived_ > it->first) &&
- (lastReceived_ - it->first) > HICN_MAX_RTX_MAX_AGE){
+ // this packet is too old
+ if ((lastReceived_ > it->first) &&
+ (lastReceived_ - it->first) > HICN_MAX_RTX_MAX_AGE) {
it = interestRetransmissions_.erase(it);
continue;
}
- if(first_rtx){
- //TODO (optimization)
- //the rtx that we never sent (it->second == 0) are all at the
- //end, so we can go directly there
- if(it->second == 0){
+ if (first_rtx) {
+ // TODO (optimization)
+ // the rtx that we never sent (it->second == 0) are all at the
+ // end, so we can go directly there
+ if (it->second == 0) {
inflightInterests_[pkt].transmissionTime =
std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
it->second++;
Name *interest_name = nullptr;
socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
- &interest_name);
+ &interest_name);
interest_name->setSuffix(it->first);
sendInterest(interest_name, true);
}
++it;
- }else{
- //base on time
+ } else {
+ // base on time
uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
- if((now - inflightInterests_[pkt].transmissionTime) > 20){
- //XXX replace 20 with rtt
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ if ((now - inflightInterests_[pkt].transmissionTime) > 20) {
+ // XXX replace 20 with rtt
inflightInterests_[pkt].transmissionTime =
std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
it->second++;
Name *interest_name = nullptr;
socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
- &interest_name);
+ &interest_name);
interest_name->setSuffix(it->first);
sendInterest(interest_name, true);
}
@@ -514,13 +511,13 @@ void RTCTransportProtocol::retransmit(bool first_rtx){
}
}
-void RTCTransportProtocol::checkRtx(){
+void RTCTransportProtocol::checkRtx() {
retransmit(false);
rtx_timer_->expires_from_now(std::chrono::milliseconds(20));
rtx_timer_->async_wait([this](std::error_code ec) {
- if (ec) return;
- checkRtx();
- });
+ if (ec) return;
+ checkRtx();
+ });
}
void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) {
@@ -533,25 +530,25 @@ void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) {
inflightInterestsCount_--;
}
- //check how many times we sent this packet
- auto it = interestRetransmissions_.find(segmentNumber);
- if(it != interestRetransmissions_.end() && it->second >= HICN_MAX_RTX){
- inflightInterests_[pkt].state = lost_;
- }
+ // check how many times we sent this packet
+ auto it = interestRetransmissions_.find(segmentNumber);
+ if (it != interestRetransmissions_.end() && it->second >= HICN_MAX_RTX) {
+ inflightInterests_[pkt].state = lost_;
+ }
- if(inflightInterests_[pkt].state == sent_) {
- inflightInterests_[pkt].state = timeout1_;
- } else if (inflightInterests_[pkt].state == timeout1_) {
- inflightInterests_[pkt].state = timeout2_;
- } else if (inflightInterests_[pkt].state == timeout2_) {
- inflightInterests_[pkt].state = lost_;
- }
+ if (inflightInterests_[pkt].state == sent_) {
+ inflightInterests_[pkt].state = timeout1_;
+ } else if (inflightInterests_[pkt].state == timeout1_) {
+ inflightInterests_[pkt].state = timeout2_;
+ } else if (inflightInterests_[pkt].state == timeout2_) {
+ inflightInterests_[pkt].state = lost_;
+ }
- if(inflightInterests_[pkt].state == lost_) {
- interestRetransmissions_.erase(segmentNumber);
- }else{
- addRetransmissions(segmentNumber);
- }
+ if (inflightInterests_[pkt].state == lost_) {
+ interestRetransmissions_.erase(segmentNumber);
+ } else {
+ addRetransmissions(segmentNumber);
+ }
scheduleNextInterests();
}
@@ -562,12 +559,11 @@ bool RTCTransportProtocol::checkIfProducerIsActive(
uint32_t productionSeg = *payload;
uint32_t productionRate = *(++payload);
-
if (productionRate == 0) {
// the producer socket is not active
// in this case we consider only the first nack
if (nack_timer_used_) {
- return false;
+ return false;
}
nack_timer_used_ = true;
@@ -680,12 +676,12 @@ void RTCTransportProtocol::onContentObject(
((1 - HICN_ESTIMATED_PACKET_SIZE) * payload->length());
if (inflightInterests_[pkt].state == sent_) {
- inflightInterestsCount_--; //packet sent without timeouts
+ inflightInterestsCount_--; // packet sent without timeouts
}
if (inflightInterests_[pkt].state == sent_ &&
interestRetransmissions_.find(segmentNumber) ==
- interestRetransmissions_.end()){
+ interestRetransmissions_.end()) {
// we count only non retransmitted data in order to take into accunt only
// the transmition rate of the producer
receivedBytes_ += (uint32_t)(content_object->headerSize() +
@@ -693,7 +689,7 @@ void RTCTransportProtocol::onContentObject(
updateDelayStats(*content_object);
addRetransmissions(lastReceived_ + 1, segmentNumber);
- //lastReceived_ is updated only for data packets received without RTX
+ // lastReceived_ is updated only for data packets received without RTX
lastReceived_ = segmentNumber;
}
@@ -704,7 +700,7 @@ void RTCTransportProtocol::onContentObject(
increaseWindow();
}
- //in any case we remove the packet from the rtx list
+ // in any case we remove the packet from the rtx list
interestRetransmissions_.erase(segmentNumber);
if (schedule_next_interest) {
@@ -715,24 +711,50 @@ void RTCTransportProtocol::onContentObject(
void RTCTransportProtocol::returnContentToApplication(
const ContentObject &content_object) {
// return content to the user
- auto a = content_object.getPayload();
+ auto read_buffer = content_object.getPayload();
- a->trimStart(HICN_TIMESTAMP_SIZE);
- uint8_t *start = a->writableData();
- unsigned size = (unsigned)a->length();
+ read_buffer->trimStart(HICN_TIMESTAMP_SIZE);
// set offset between hICN and RTP packets
- uint16_t rtp_seq = ntohs(*(((uint16_t *)start) + 1));
+ uint16_t rtp_seq = ntohs(*(((uint16_t *)read_buffer->writableData()) + 1));
RTPhICN_offset_ = content_object.getName().getSuffix() - rtp_seq;
- std::shared_ptr<std::vector<uint8_t>> content_buffer;
- socket_->getSocketOption(APPLICATION_BUFFER, content_buffer);
- content_buffer->insert(content_buffer->end(), start, start + size);
+ interface::ConsumerSocket::ReadCallback *read_callback = nullptr;
+ socket_->getSocketOption(READ_CALLBACK, &read_callback);
+
+ if (read_callback == nullptr) {
+ throw errors::RuntimeException(
+ "The read callback must be installed in the transport before starting "
+ "the content retrieval.");
+ }
+
+ if (read_callback->isBufferMovable()) {
+ read_callback->readBufferAvailable(
+ utils::MemBuf::copyBuffer(read_buffer->data(), read_buffer->length()));
+ } else {
+ // The buffer will be copied into the application-provided buffer
+ uint8_t *buffer;
+ std::size_t length;
+ std::size_t total_length = read_buffer->length();
+
+ while (read_buffer->length()) {
+ buffer = nullptr;
+ length = 0;
+ read_callback->getReadBuffer(&buffer, &length);
+
+ if (!buffer || !length) {
+ throw errors::RuntimeException(
+ "Invalid buffer provided by the application.");
+ }
+
+ auto to_copy = std::min(read_buffer->length(), length);
+
+ std::memcpy(buffer, read_buffer->data(), to_copy);
+ read_buffer->trimStart(to_copy);
+ }
- ConsumerContentCallback *on_payload = nullptr;
- socket_->getSocketOption(CONTENT_RETRIEVED, &on_payload);
- if ((*on_payload) != VOID_HANDLER) {
- (*on_payload)(*socket_, size, std::make_error_code(std::errc(0)));
+ read_callback->readDataAvailable(total_length);
+ read_buffer->clear();
}
}