diff options
Diffstat (limited to 'libtransport/src/protocols/transport_protocol.cc')
-rw-r--r-- | libtransport/src/protocols/transport_protocol.cc | 102 |
1 files changed, 93 insertions, 9 deletions
diff --git a/libtransport/src/protocols/transport_protocol.cc b/libtransport/src/protocols/transport_protocol.cc index 611c39212..d6954ac37 100644 --- a/libtransport/src/protocols/transport_protocol.cc +++ b/libtransport/src/protocols/transport_protocol.cc @@ -24,12 +24,11 @@ namespace protocol { using namespace interface; TransportProtocol::TransportProtocol(implementation::ConsumerSocket *icn_socket, - Reassembly *reassembly_protocol) + Indexer *indexer, Reassembly *reassembly) : socket_(icn_socket), - reassembly_protocol_(reassembly_protocol), - index_manager_( - std::make_unique<IndexManager>(socket_, this, reassembly_protocol)), - is_running_(false), + indexer_verifier_(indexer), + reassembly_(reassembly), + fec_decoder_(nullptr), is_first_(false), on_interest_retransmission_(VOID_HANDLER), on_interest_output_(VOID_HANDLER), @@ -37,9 +36,17 @@ TransportProtocol::TransportProtocol(implementation::ConsumerSocket *icn_socket, on_interest_satisfied_(VOID_HANDLER), on_content_object_input_(VOID_HANDLER), stats_summary_(VOID_HANDLER), - on_payload_(VOID_HANDLER) { + on_payload_(VOID_HANDLER), + fec_type_(fec::FECType::UNKNOWN), + is_running_(false) { socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal_); socket_->getSocketOption(OtherOptions::STATISTICS, &stats_); + + // Set this transport protocol as portal's consumer callback + portal_->setConsumerCallback(this); + + indexer_verifier_->setReassembly(reassembly_.get()); + reassembly->setIndexer(indexer_verifier_.get()); } int TransportProtocol::start() { @@ -69,6 +76,7 @@ int TransportProtocol::start() { // Reset the protocol state machine reset(); + // Schedule next interests scheduleNextInterests(); @@ -99,15 +107,27 @@ void TransportProtocol::stop() { } void TransportProtocol::resume() { - if (is_running_) return; + if (isRunning()) return; is_running_ = true; scheduleNextInterests(); - portal_->runEventsLoop(); + if (!is_async_) { + // Start Event loop + portal_->runEventsLoop(); - is_running_ = false; + // Not running anymore + is_running_ = false; + } +} + +void TransportProtocol::reset() { + reassembly_->reInitialize(); + indexer_verifier_->reset(); + if (fec_decoder_) { + fec_decoder_->reset(); + } } void TransportProtocol::onContentReassembled(std::error_code ec) { @@ -127,6 +147,70 @@ void TransportProtocol::onContentReassembled(std::error_code ec) { } } +void TransportProtocol::sendInterest( + const Name &interest_name, + std::array<uint32_t, MAX_AGGREGATED_INTEREST> *additional_suffixes, + uint32_t len) { + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Sending interest for name " << interest_name; + + auto interest = core::PacketManager<>::getInstance().getPacket<Interest>(); + interest->setName(interest_name); + + for (uint32_t i = 0; i < len; i++) { + interest->appendSuffix(additional_suffixes->at(i)); + } + + uint32_t lifetime = default_values::interest_lifetime; + socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, + lifetime); + interest->setLifetime(uint32_t(lifetime)); + + if (*on_interest_output_) { + (*on_interest_output_)(*socket_->getInterface(), *interest); + } + + if (TRANSPORT_EXPECT_FALSE(!isRunning() && !is_first_)) { + return; + } + + portal_->sendInterest(std::move(interest)); +} + +void TransportProtocol::onTimeout(Interest::Ptr &i, const Name &n) { + if (TRANSPORT_EXPECT_FALSE(!isRunning())) { + return; + } + + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Timeout on content " << n; + + onInterestTimeout(i, n); +} + +void TransportProtocol::onContentObject(Interest &i, ContentObject &c) { + // Check whether it makes sense to continue + if (TRANSPORT_EXPECT_FALSE(!isRunning())) { + return; + } + + // Call transport protocol function + std::error_code ec; + onContentObjectReceived(i, c, ec); + + // Call reassemble function, if packet is eligible for reassemblying + bool reassemble = false; + if (!ec) { + reassemble = true; + } + + // Perform verification and update indexer. This step may be performed offline + // - i.e. we may not get a result here (e.g. we use manifest). Verification + // failures in that case will be handled in the onPacketDropped function. + // XXX This step should be done before calling onContentObjectReceived, but + // for now we do it here since currently the indexer does not need manifests + // to move forward. + indexer_verifier_->onContentObject(i, c, reassemble); +} + } // end namespace protocol } // end namespace transport |