aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/protocols/raaqm.cc
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/protocols/raaqm.cc')
-rw-r--r--libtransport/src/protocols/raaqm.cc82
1 files changed, 41 insertions, 41 deletions
diff --git a/libtransport/src/protocols/raaqm.cc b/libtransport/src/protocols/raaqm.cc
index 0a93dec44..bc8500227 100644
--- a/libtransport/src/protocols/raaqm.cc
+++ b/libtransport/src/protocols/raaqm.cc
@@ -13,8 +13,8 @@
* limitations under the License.
*/
+#include <hicn/transport/core/global_object_pool.h>
#include <hicn/transport/interfaces/socket_consumer.h>
-
#include <implementation/socket_consumer.h>
#include <protocols/errors.h>
#include <protocols/indexer.h>
@@ -36,7 +36,8 @@ RaaqmTransportProtocol::RaaqmTransportProtocol(
interests_in_flight_(0),
cur_path_(nullptr),
t0_(utils::SteadyClock::now()),
- rate_estimator_(nullptr) {
+ rate_estimator_(nullptr),
+ schedule_interests_(true) {
init();
}
@@ -116,10 +117,14 @@ void RaaqmTransportProtocol::reset() {
// Reset protocol variables
interests_in_flight_ = 0;
t0_ = utils::SteadyClock::now();
-}
-bool RaaqmTransportProtocol::verifyKeyPackets() {
- return index_manager_->onKeyToVerify();
+ // Optionally reset congestion window
+ bool reset_window;
+ socket_->getSocketOption(RaaqmTransportOptions::PER_SESSION_CWINDOW_RESET,
+ reset_window);
+ if (reset_window) {
+ current_window_size_ = 1;
+ }
}
void RaaqmTransportProtocol::increaseWindow() {
@@ -317,8 +322,8 @@ void RaaqmTransportProtocol::init() {
is.close();
}
-void RaaqmTransportProtocol::onContentObject(
- Interest::Ptr &&interest, ContentObject::Ptr &&content_object) {
+void RaaqmTransportProtocol::onContentObject(Interest &interest,
+ ContentObject &content_object) {
// Check whether makes sense to continue
if (TRANSPORT_EXPECT_FALSE(!is_running_)) {
return;
@@ -326,54 +331,53 @@ void RaaqmTransportProtocol::onContentObject(
// Call application-defined callbacks
if (*on_content_object_input_) {
- (*on_content_object_input_)(*socket_->getInterface(), *content_object);
+ (*on_content_object_input_)(*socket_->getInterface(), content_object);
}
if (*on_interest_satisfied_) {
- (*on_interest_satisfied_)(*socket_->getInterface(), *interest);
+ (*on_interest_satisfied_)(*socket_->getInterface(), interest);
}
- if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) {
- stats_->updateBytesRecv(content_object->payloadSize());
+ if (content_object.getPayloadType() == PayloadType::DATA) {
+ stats_->updateBytesRecv(content_object.payloadSize());
}
- onContentSegment(std::move(interest), std::move(content_object));
+ onContentSegment(interest, content_object);
scheduleNextInterests();
}
-void RaaqmTransportProtocol::onContentSegment(
- Interest::Ptr &&interest, ContentObject::Ptr &&content_object) {
- uint32_t incremental_suffix = content_object->getName().getSuffix();
+void RaaqmTransportProtocol::onContentSegment(Interest &interest,
+ ContentObject &content_object) {
+ uint32_t incremental_suffix = content_object.getName().getSuffix();
// Decrease in-flight interests
interests_in_flight_--;
// Update stats
if (!interest_retransmissions_[incremental_suffix & mask]) {
- afterContentReception(*interest, *content_object);
+ afterContentReception(interest, content_object);
}
- index_manager_->onContentObject(std::move(interest),
- std::move(content_object));
+ index_manager_->onContentObject(interest, content_object);
}
-void RaaqmTransportProtocol::onPacketDropped(
- Interest::Ptr &&interest, ContentObject::Ptr &&content_object) {
+void RaaqmTransportProtocol::onPacketDropped(Interest &interest,
+ ContentObject &content_object) {
uint32_t max_rtx = 0;
socket_->getSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, max_rtx);
- uint64_t segment = interest->getName().getSuffix();
+ uint64_t segment = interest.getName().getSuffix();
if (TRANSPORT_EXPECT_TRUE(interest_retransmissions_[segment & mask] <
max_rtx)) {
stats_->updateRetxCount(1);
if (*on_interest_retransmission_) {
- (*on_interest_retransmission_)(*socket_->getInterface(), *interest);
+ (*on_interest_retransmission_)(*socket_->getInterface(), interest);
}
if (*on_interest_output_) {
- (*on_interest_output_)(*socket_->getInterface(), *interest);
+ (*on_interest_output_)(*socket_->getInterface(), interest);
}
if (!is_running_) {
@@ -381,7 +385,7 @@ void RaaqmTransportProtocol::onPacketDropped(
}
interest_retransmissions_[segment & mask]++;
- interest_to_retransmit_.push(std::move(interest));
+ interest_to_retransmit_.push(interest.shared_from_this());
} else {
TRANSPORT_LOGE(
"Stop: received not trusted packet %llu times",
@@ -432,10 +436,6 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) {
(*on_interest_retransmission_)(*socket_->getInterface(), *interest);
}
- if (*on_interest_output_) {
- (*on_interest_output_)(*socket_->getInterface(), *interest);
- }
-
if (!is_running_) {
return;
}
@@ -451,7 +451,9 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) {
}
void RaaqmTransportProtocol::scheduleNextInterests() {
- if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) {
+ bool cancel = (!is_running_ && !is_first_) || !schedule_interests_;
+ if (TRANSPORT_EXPECT_FALSE(cancel)) {
+ schedule_interests_ = true;
return;
}
@@ -460,7 +462,6 @@ void RaaqmTransportProtocol::scheduleNextInterests() {
// send at least one interest if there are retransmissions to perform and
// there is no space left in the window
sendInterest(std::move(interest_to_retransmit_.front()));
- TRANSPORT_LOGD("Window full, retransmit one content interest");
interest_to_retransmit_.pop();
}
@@ -470,22 +471,25 @@ void RaaqmTransportProtocol::scheduleNextInterests() {
while (interests_in_flight_ < current_window_size_) {
if (interest_to_retransmit_.size() > 0) {
sendInterest(std::move(interest_to_retransmit_.front()));
- TRANSPORT_LOGD("Retransmit content interest");
interest_to_retransmit_.pop();
} else {
+ if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) {
+ TRANSPORT_LOGI("Adios");
+ break;
+ }
+
index = index_manager_->getNextSuffix();
if (index == IndexManager::invalid_index) {
break;
}
sendInterest(index);
- TRANSPORT_LOGD("Send content interest %u", index);
}
}
}
-bool RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) {
- auto interest = getPacket();
+void RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) {
+ auto interest = core::PacketManager<>::getInstance().getPacket<Interest>();
core::Name *name;
socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name);
name->setSuffix((uint32_t)next_suffix);
@@ -499,30 +503,26 @@ bool RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) {
if (*on_interest_output_) {
on_interest_output_->operator()(*socket_->getInterface(), *interest);
}
-
- if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) {
- return false;
- }
-
// This is set to ~0 so that the next interest_retransmissions_ + 1,
// performed by sendInterest, will result in 0
interest_retransmissions_[next_suffix & mask] = ~0;
interest_timepoints_[next_suffix & mask] = utils::SteadyClock::now();
- sendInterest(std::move(interest));
- return true;
+ sendInterest(std::move(interest));
}
void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) {
interests_in_flight_++;
interest_retransmissions_[interest->getName().getSuffix() & mask]++;
+ TRANSPORT_LOGD("Send interest %s", interest->getName().toString().c_str());
portal_->sendInterest(std::move(interest));
}
void RaaqmTransportProtocol::onContentReassembled(std::error_code ec) {
rate_estimator_->onDownloadFinished();
TransportProtocol::onContentReassembled(ec);
+ schedule_interests_ = false;
}
void RaaqmTransportProtocol::updateRtt(uint64_t segment) {