diff options
Diffstat (limited to 'libtransport/src/hicn/transport/protocols/raaqm.cc')
-rw-r--r-- | libtransport/src/hicn/transport/protocols/raaqm.cc | 62 |
1 files changed, 37 insertions, 25 deletions
diff --git a/libtransport/src/hicn/transport/protocols/raaqm.cc b/libtransport/src/hicn/transport/protocols/raaqm.cc index b8a7c9610..7f0310e7c 100644 --- a/libtransport/src/hicn/transport/protocols/raaqm.cc +++ b/libtransport/src/hicn/transport/protocols/raaqm.cc @@ -38,14 +38,14 @@ RaaqmTransportProtocol::RaaqmTransportProtocol(ConsumerSocket *icnet_socket) } RaaqmTransportProtocol::~RaaqmTransportProtocol() { - if (this->rate_estimator_) { - delete this->rate_estimator_; + if (rate_estimator_) { + delete rate_estimator_; } } int RaaqmTransportProtocol::start() { - if (this->rate_estimator_) { - this->rate_estimator_->onStart(); + if (rate_estimator_) { + rate_estimator_->onStart(); } if (!cur_path_) { @@ -75,13 +75,13 @@ int RaaqmTransportProtocol::start() { choice_param); if (choice_param == 1) { - this->rate_estimator_ = new ALaTcpEstimator(); + rate_estimator_ = new ALaTcpEstimator(); } else { - this->rate_estimator_ = new SimpleEstimator(alpha, batching_param); + rate_estimator_ = new SimpleEstimator(alpha, batching_param); } socket_->getSocketOption(RateEstimationOptions::RATE_ESTIMATION_OBSERVER, - &this->rate_estimator_->observer_); + &rate_estimator_->observer_); // Current path auto cur_path = std::make_unique<RaaqmDataPath>( @@ -126,7 +126,7 @@ void RaaqmTransportProtocol::increaseWindow() { socket_->setSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE, current_window_size_); } - this->rate_estimator_->onWindowIncrease(current_window_size_); + rate_estimator_->onWindowIncrease(current_window_size_); } void RaaqmTransportProtocol::decreaseWindow() { @@ -145,7 +145,7 @@ void RaaqmTransportProtocol::decreaseWindow() { socket_->setSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE, current_window_size_); } - this->rate_estimator_->onWindowDecrease(current_window_size_); + rate_estimator_->onWindowDecrease(current_window_size_); } void RaaqmTransportProtocol::afterDataUnsatisfied(uint64_t segment) { @@ -158,8 +158,8 @@ void RaaqmTransportProtocol::afterContentReception( updatePathTable(content_object); increaseWindow(); updateRtt(interest.getName().getSuffix()); - this->rate_estimator_->onDataReceived((int)content_object.payloadSize() + - (int)content_object.headerSize()); + rate_estimator_->onDataReceived((int)content_object.payloadSize() + + (int)content_object.headerSize()); // Set drop probablility and window size accordingly RAAQM(); } @@ -368,7 +368,12 @@ void RaaqmTransportProtocol::onContentSegment( reassemble(std::move(content_object)); } else if (TRANSPORT_EXPECT_FALSE(incremental_suffix == index_manager_->getFinalSuffix())) { - onContentReassembled(std::make_error_code(std::errc(0))); + interface::ConsumerSocket::ReadCallback *on_payload = nullptr; + socket_->getSocketOption(READ_CALLBACK, &on_payload); + + if (on_payload != nullptr) { + on_payload->readSuccess(stats_.getBytesRecv()); + } } } else { // TODO Application policy check @@ -487,8 +492,8 @@ void RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) { return; } - // This is set to ~0 so that the next interest_retransmissions_ + 1, performed - // by sendInterest, will result in 0 + // This is set to ~0 so that the next interest_retransmissions_ + 1, + // performed by sendInterest, will result in 0 interest_retransmissions_[next_suffix & mask] = ~0; interest_timepoints_[next_suffix & mask] = utils::SteadyClock::now(); sendInterest(std::move(interest)); @@ -502,16 +507,23 @@ void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) { } void RaaqmTransportProtocol::onContentReassembled(std::error_code ec) { - interface::ConsumerContentCallback *on_payload = nullptr; - socket_->getSocketOption(CONTENT_RETRIEVED, &on_payload); - if (*on_payload != VOID_HANDLER) { - std::shared_ptr<std::vector<uint8_t>> content_buffer; - socket_->getSocketOption( - interface::GeneralTransportOptions::APPLICATION_BUFFER, content_buffer); - (*on_payload)(*socket_, content_buffer->size(), ec); + interface::ConsumerSocket::ReadCallback *on_payload = nullptr; + socket_->getSocketOption(READ_CALLBACK, &on_payload); + + if (on_payload == nullptr) { + throw errors::RuntimeException( + "The read callback must be installed in the transport before " + "starting " + "the content retrieval."); + } + + if (!ec) { + on_payload->readSuccess(stats_.getBytesRecv()); + } else { + on_payload->readError(ec); } - this->rate_estimator_->onDownloadFinished(); + rate_estimator_->onDownloadFinished(); stop(); } @@ -526,8 +538,8 @@ void RaaqmTransportProtocol::updateRtt(uint64_t segment) { // Update stats updateStats((uint32_t)segment, rtt.count(), now); - if (this->rate_estimator_) { - this->rate_estimator_->onRttUpdate((double)rtt.count()); + if (rate_estimator_) { + rate_estimator_->onRttUpdate((double)rtt.count()); } cur_path_->insertNewRtt(rtt.count()); @@ -676,4 +688,4 @@ void RaaqmTransportProtocol::checkForStalePaths() { } // end namespace protocol -} // end namespace transport +} // namespace transport |