aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/protocols/raaqm.cc
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/protocols/raaqm.cc')
-rw-r--r--libtransport/src/protocols/raaqm.cc58
1 files changed, 26 insertions, 32 deletions
diff --git a/libtransport/src/protocols/raaqm.cc b/libtransport/src/protocols/raaqm.cc
index 5023adf2e..bc8500227 100644
--- a/libtransport/src/protocols/raaqm.cc
+++ b/libtransport/src/protocols/raaqm.cc
@@ -13,6 +13,7 @@
* limitations under the License.
*/
+#include <hicn/transport/core/global_object_pool.h>
#include <hicn/transport/interfaces/socket_consumer.h>
#include <implementation/socket_consumer.h>
#include <protocols/errors.h>
@@ -126,10 +127,6 @@ void RaaqmTransportProtocol::reset() {
}
}
-bool RaaqmTransportProtocol::verifyKeyPackets() {
- return index_manager_->onKeyToVerify();
-}
-
void RaaqmTransportProtocol::increaseWindow() {
// return;
double max_window_size = 0.;
@@ -325,8 +322,8 @@ void RaaqmTransportProtocol::init() {
is.close();
}
-void RaaqmTransportProtocol::onContentObject(
- Interest::Ptr &&interest, ContentObject::Ptr &&content_object) {
+void RaaqmTransportProtocol::onContentObject(Interest &interest,
+ ContentObject &content_object) {
// Check whether makes sense to continue
if (TRANSPORT_EXPECT_FALSE(!is_running_)) {
return;
@@ -334,54 +331,53 @@ void RaaqmTransportProtocol::onContentObject(
// Call application-defined callbacks
if (*on_content_object_input_) {
- (*on_content_object_input_)(*socket_->getInterface(), *content_object);
+ (*on_content_object_input_)(*socket_->getInterface(), content_object);
}
if (*on_interest_satisfied_) {
- (*on_interest_satisfied_)(*socket_->getInterface(), *interest);
+ (*on_interest_satisfied_)(*socket_->getInterface(), interest);
}
- if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) {
- stats_->updateBytesRecv(content_object->payloadSize());
+ if (content_object.getPayloadType() == PayloadType::DATA) {
+ stats_->updateBytesRecv(content_object.payloadSize());
}
- onContentSegment(std::move(interest), std::move(content_object));
+ onContentSegment(interest, content_object);
scheduleNextInterests();
}
-void RaaqmTransportProtocol::onContentSegment(
- Interest::Ptr &&interest, ContentObject::Ptr &&content_object) {
- uint32_t incremental_suffix = content_object->getName().getSuffix();
+void RaaqmTransportProtocol::onContentSegment(Interest &interest,
+ ContentObject &content_object) {
+ uint32_t incremental_suffix = content_object.getName().getSuffix();
// Decrease in-flight interests
interests_in_flight_--;
// Update stats
if (!interest_retransmissions_[incremental_suffix & mask]) {
- afterContentReception(*interest, *content_object);
+ afterContentReception(interest, content_object);
}
- index_manager_->onContentObject(std::move(interest),
- std::move(content_object));
+ index_manager_->onContentObject(interest, content_object);
}
-void RaaqmTransportProtocol::onPacketDropped(
- Interest::Ptr &&interest, ContentObject::Ptr &&content_object) {
+void RaaqmTransportProtocol::onPacketDropped(Interest &interest,
+ ContentObject &content_object) {
uint32_t max_rtx = 0;
socket_->getSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, max_rtx);
- uint64_t segment = interest->getName().getSuffix();
+ uint64_t segment = interest.getName().getSuffix();
if (TRANSPORT_EXPECT_TRUE(interest_retransmissions_[segment & mask] <
max_rtx)) {
stats_->updateRetxCount(1);
if (*on_interest_retransmission_) {
- (*on_interest_retransmission_)(*socket_->getInterface(), *interest);
+ (*on_interest_retransmission_)(*socket_->getInterface(), interest);
}
if (*on_interest_output_) {
- (*on_interest_output_)(*socket_->getInterface(), *interest);
+ (*on_interest_output_)(*socket_->getInterface(), interest);
}
if (!is_running_) {
@@ -389,7 +385,7 @@ void RaaqmTransportProtocol::onPacketDropped(
}
interest_retransmissions_[segment & mask]++;
- interest_to_retransmit_.push(std::move(interest));
+ interest_to_retransmit_.push(interest.shared_from_this());
} else {
TRANSPORT_LOGE(
"Stop: received not trusted packet %llu times",
@@ -477,6 +473,11 @@ void RaaqmTransportProtocol::scheduleNextInterests() {
sendInterest(std::move(interest_to_retransmit_.front()));
interest_to_retransmit_.pop();
} else {
+ if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) {
+ TRANSPORT_LOGI("Adios");
+ break;
+ }
+
index = index_manager_->getNextSuffix();
if (index == IndexManager::invalid_index) {
break;
@@ -487,8 +488,8 @@ void RaaqmTransportProtocol::scheduleNextInterests() {
}
}
-bool RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) {
- auto interest = getPacket();
+void RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) {
+ auto interest = core::PacketManager<>::getInstance().getPacket<Interest>();
core::Name *name;
socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name);
name->setSuffix((uint32_t)next_suffix);
@@ -502,19 +503,12 @@ bool RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) {
if (*on_interest_output_) {
on_interest_output_->operator()(*socket_->getInterface(), *interest);
}
-
- if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) {
- return false;
- }
-
// This is set to ~0 so that the next interest_retransmissions_ + 1,
// performed by sendInterest, will result in 0
interest_retransmissions_[next_suffix & mask] = ~0;
interest_timepoints_[next_suffix & mask] = utils::SteadyClock::now();
sendInterest(std::move(interest));
-
- return true;
}
void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) {