aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/hicn/transport/protocols/raaqm.cc
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/hicn/transport/protocols/raaqm.cc')
-rw-r--r--libtransport/src/hicn/transport/protocols/raaqm.cc62
1 files changed, 37 insertions, 25 deletions
diff --git a/libtransport/src/hicn/transport/protocols/raaqm.cc b/libtransport/src/hicn/transport/protocols/raaqm.cc
index b8a7c9610..7f0310e7c 100644
--- a/libtransport/src/hicn/transport/protocols/raaqm.cc
+++ b/libtransport/src/hicn/transport/protocols/raaqm.cc
@@ -38,14 +38,14 @@ RaaqmTransportProtocol::RaaqmTransportProtocol(ConsumerSocket *icnet_socket)
}
RaaqmTransportProtocol::~RaaqmTransportProtocol() {
- if (this->rate_estimator_) {
- delete this->rate_estimator_;
+ if (rate_estimator_) {
+ delete rate_estimator_;
}
}
int RaaqmTransportProtocol::start() {
- if (this->rate_estimator_) {
- this->rate_estimator_->onStart();
+ if (rate_estimator_) {
+ rate_estimator_->onStart();
}
if (!cur_path_) {
@@ -75,13 +75,13 @@ int RaaqmTransportProtocol::start() {
choice_param);
if (choice_param == 1) {
- this->rate_estimator_ = new ALaTcpEstimator();
+ rate_estimator_ = new ALaTcpEstimator();
} else {
- this->rate_estimator_ = new SimpleEstimator(alpha, batching_param);
+ rate_estimator_ = new SimpleEstimator(alpha, batching_param);
}
socket_->getSocketOption(RateEstimationOptions::RATE_ESTIMATION_OBSERVER,
- &this->rate_estimator_->observer_);
+ &rate_estimator_->observer_);
// Current path
auto cur_path = std::make_unique<RaaqmDataPath>(
@@ -126,7 +126,7 @@ void RaaqmTransportProtocol::increaseWindow() {
socket_->setSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE,
current_window_size_);
}
- this->rate_estimator_->onWindowIncrease(current_window_size_);
+ rate_estimator_->onWindowIncrease(current_window_size_);
}
void RaaqmTransportProtocol::decreaseWindow() {
@@ -145,7 +145,7 @@ void RaaqmTransportProtocol::decreaseWindow() {
socket_->setSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE,
current_window_size_);
}
- this->rate_estimator_->onWindowDecrease(current_window_size_);
+ rate_estimator_->onWindowDecrease(current_window_size_);
}
void RaaqmTransportProtocol::afterDataUnsatisfied(uint64_t segment) {
@@ -158,8 +158,8 @@ void RaaqmTransportProtocol::afterContentReception(
updatePathTable(content_object);
increaseWindow();
updateRtt(interest.getName().getSuffix());
- this->rate_estimator_->onDataReceived((int)content_object.payloadSize() +
- (int)content_object.headerSize());
+ rate_estimator_->onDataReceived((int)content_object.payloadSize() +
+ (int)content_object.headerSize());
// Set drop probablility and window size accordingly
RAAQM();
}
@@ -368,7 +368,12 @@ void RaaqmTransportProtocol::onContentSegment(
reassemble(std::move(content_object));
} else if (TRANSPORT_EXPECT_FALSE(incremental_suffix ==
index_manager_->getFinalSuffix())) {
- onContentReassembled(std::make_error_code(std::errc(0)));
+ interface::ConsumerSocket::ReadCallback *on_payload = nullptr;
+ socket_->getSocketOption(READ_CALLBACK, &on_payload);
+
+ if (on_payload != nullptr) {
+ on_payload->readSuccess(stats_.getBytesRecv());
+ }
}
} else {
// TODO Application policy check
@@ -487,8 +492,8 @@ void RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) {
return;
}
- // This is set to ~0 so that the next interest_retransmissions_ + 1, performed
- // by sendInterest, will result in 0
+ // This is set to ~0 so that the next interest_retransmissions_ + 1,
+ // performed by sendInterest, will result in 0
interest_retransmissions_[next_suffix & mask] = ~0;
interest_timepoints_[next_suffix & mask] = utils::SteadyClock::now();
sendInterest(std::move(interest));
@@ -502,16 +507,23 @@ void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) {
}
void RaaqmTransportProtocol::onContentReassembled(std::error_code ec) {
- interface::ConsumerContentCallback *on_payload = nullptr;
- socket_->getSocketOption(CONTENT_RETRIEVED, &on_payload);
- if (*on_payload != VOID_HANDLER) {
- std::shared_ptr<std::vector<uint8_t>> content_buffer;
- socket_->getSocketOption(
- interface::GeneralTransportOptions::APPLICATION_BUFFER, content_buffer);
- (*on_payload)(*socket_, content_buffer->size(), ec);
+ interface::ConsumerSocket::ReadCallback *on_payload = nullptr;
+ socket_->getSocketOption(READ_CALLBACK, &on_payload);
+
+ if (on_payload == nullptr) {
+ throw errors::RuntimeException(
+ "The read callback must be installed in the transport before "
+ "starting "
+ "the content retrieval.");
+ }
+
+ if (!ec) {
+ on_payload->readSuccess(stats_.getBytesRecv());
+ } else {
+ on_payload->readError(ec);
}
- this->rate_estimator_->onDownloadFinished();
+ rate_estimator_->onDownloadFinished();
stop();
}
@@ -526,8 +538,8 @@ void RaaqmTransportProtocol::updateRtt(uint64_t segment) {
// Update stats
updateStats((uint32_t)segment, rtt.count(), now);
- if (this->rate_estimator_) {
- this->rate_estimator_->onRttUpdate((double)rtt.count());
+ if (rate_estimator_) {
+ rate_estimator_->onRttUpdate((double)rtt.count());
}
cur_path_->insertNewRtt(rtt.count());
@@ -676,4 +688,4 @@ void RaaqmTransportProtocol::checkForStalePaths() {
} // end namespace protocol
-} // end namespace transport
+} // namespace transport