From 08233d44a6cfde878d7e10bca38ae935ed1c8fd5 Mon Sep 17 00:00:00 2001 From: Mauro Date: Wed, 30 Jun 2021 07:57:22 +0000 Subject: [HICN-713] Transport Library Major Refactoring 2 Co-authored-by: Luca Muscariello Co-authored-by: Michele Papalini Co-authored-by: Olivier Roques Co-authored-by: Giulio Grassi Signed-off-by: Mauro Sardara Change-Id: I5b2c667bad66feb45abdb5effe22ed0f6c85d1c2 --- libtransport/src/protocols/transport_protocol.cc | 102 +++++++++++++++++++++-- 1 file changed, 93 insertions(+), 9 deletions(-) (limited to 'libtransport/src/protocols/transport_protocol.cc') diff --git a/libtransport/src/protocols/transport_protocol.cc b/libtransport/src/protocols/transport_protocol.cc index 611c39212..d6954ac37 100644 --- a/libtransport/src/protocols/transport_protocol.cc +++ b/libtransport/src/protocols/transport_protocol.cc @@ -24,12 +24,11 @@ namespace protocol { using namespace interface; TransportProtocol::TransportProtocol(implementation::ConsumerSocket *icn_socket, - Reassembly *reassembly_protocol) + Indexer *indexer, Reassembly *reassembly) : socket_(icn_socket), - reassembly_protocol_(reassembly_protocol), - index_manager_( - std::make_unique(socket_, this, reassembly_protocol)), - is_running_(false), + indexer_verifier_(indexer), + reassembly_(reassembly), + fec_decoder_(nullptr), is_first_(false), on_interest_retransmission_(VOID_HANDLER), on_interest_output_(VOID_HANDLER), @@ -37,9 +36,17 @@ TransportProtocol::TransportProtocol(implementation::ConsumerSocket *icn_socket, on_interest_satisfied_(VOID_HANDLER), on_content_object_input_(VOID_HANDLER), stats_summary_(VOID_HANDLER), - on_payload_(VOID_HANDLER) { + on_payload_(VOID_HANDLER), + fec_type_(fec::FECType::UNKNOWN), + is_running_(false) { socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal_); socket_->getSocketOption(OtherOptions::STATISTICS, &stats_); + + // Set this transport protocol as portal's consumer callback + portal_->setConsumerCallback(this); + + indexer_verifier_->setReassembly(reassembly_.get()); + reassembly->setIndexer(indexer_verifier_.get()); } int TransportProtocol::start() { @@ -69,6 +76,7 @@ int TransportProtocol::start() { // Reset the protocol state machine reset(); + // Schedule next interests scheduleNextInterests(); @@ -99,15 +107,27 @@ void TransportProtocol::stop() { } void TransportProtocol::resume() { - if (is_running_) return; + if (isRunning()) return; is_running_ = true; scheduleNextInterests(); - portal_->runEventsLoop(); + if (!is_async_) { + // Start Event loop + portal_->runEventsLoop(); - is_running_ = false; + // Not running anymore + is_running_ = false; + } +} + +void TransportProtocol::reset() { + reassembly_->reInitialize(); + indexer_verifier_->reset(); + if (fec_decoder_) { + fec_decoder_->reset(); + } } void TransportProtocol::onContentReassembled(std::error_code ec) { @@ -127,6 +147,70 @@ void TransportProtocol::onContentReassembled(std::error_code ec) { } } +void TransportProtocol::sendInterest( + const Name &interest_name, + std::array *additional_suffixes, + uint32_t len) { + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Sending interest for name " << interest_name; + + auto interest = core::PacketManager<>::getInstance().getPacket(); + interest->setName(interest_name); + + for (uint32_t i = 0; i < len; i++) { + interest->appendSuffix(additional_suffixes->at(i)); + } + + uint32_t lifetime = default_values::interest_lifetime; + socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, + lifetime); + interest->setLifetime(uint32_t(lifetime)); + + if (*on_interest_output_) { + (*on_interest_output_)(*socket_->getInterface(), *interest); + } + + if (TRANSPORT_EXPECT_FALSE(!isRunning() && !is_first_)) { + return; + } + + portal_->sendInterest(std::move(interest)); +} + +void TransportProtocol::onTimeout(Interest::Ptr &i, const Name &n) { + if (TRANSPORT_EXPECT_FALSE(!isRunning())) { + return; + } + + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Timeout on content " << n; + + onInterestTimeout(i, n); +} + +void TransportProtocol::onContentObject(Interest &i, ContentObject &c) { + // Check whether it makes sense to continue + if (TRANSPORT_EXPECT_FALSE(!isRunning())) { + return; + } + + // Call transport protocol function + std::error_code ec; + onContentObjectReceived(i, c, ec); + + // Call reassemble function, if packet is eligible for reassemblying + bool reassemble = false; + if (!ec) { + reassemble = true; + } + + // Perform verification and update indexer. This step may be performed offline + // - i.e. we may not get a result here (e.g. we use manifest). Verification + // failures in that case will be handled in the onPacketDropped function. + // XXX This step should be done before calling onContentObjectReceived, but + // for now we do it here since currently the indexer does not need manifests + // to move forward. + indexer_verifier_->onContentObject(i, c, reassemble); +} + } // end namespace protocol } // end namespace transport -- cgit 1.2.3-korg