diff options
Diffstat (limited to 'libtransport/src/protocols/transport_protocol.cc')
-rw-r--r-- | libtransport/src/protocols/transport_protocol.cc | 144 |
1 files changed, 75 insertions, 69 deletions
diff --git a/libtransport/src/protocols/transport_protocol.cc b/libtransport/src/protocols/transport_protocol.cc index d6954ac37..f1e49ec0b 100644 --- a/libtransport/src/protocols/transport_protocol.cc +++ b/libtransport/src/protocols/transport_protocol.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -25,7 +25,8 @@ using namespace interface; TransportProtocol::TransportProtocol(implementation::ConsumerSocket *icn_socket, Indexer *indexer, Reassembly *reassembly) - : socket_(icn_socket), + : Protocol(), + socket_(icn_socket), indexer_verifier_(indexer), reassembly_(reassembly), fec_decoder_(nullptr), @@ -36,90 +37,82 @@ TransportProtocol::TransportProtocol(implementation::ConsumerSocket *icn_socket, on_interest_satisfied_(VOID_HANDLER), on_content_object_input_(VOID_HANDLER), stats_summary_(VOID_HANDLER), + on_fwd_strategy_(VOID_HANDLER), + on_rec_strategy_(VOID_HANDLER), on_payload_(VOID_HANDLER), - fec_type_(fec::FECType::UNKNOWN), - is_running_(false) { + fec_type_(fec::FECType::UNKNOWN) { socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal_); socket_->getSocketOption(OtherOptions::STATISTICS, &stats_); - // Set this transport protocol as portal's consumer callback - portal_->setConsumerCallback(this); - indexer_verifier_->setReassembly(reassembly_.get()); reassembly->setIndexer(indexer_verifier_.get()); } +TransportProtocol::~TransportProtocol() {} + int TransportProtocol::start() { // If the protocol is already running, return otherwise set as running - if (is_running_) return -1; - - // Get all callbacks references - socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION, - &on_interest_retransmission_); - socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, - &on_interest_output_); - socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_EXPIRED, - &on_interest_timeout_); - socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_SATISFIED, - &on_interest_satisfied_); - socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT, - &on_content_object_input_); - socket_->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY, - &stats_summary_); - socket_->getSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, - &on_payload_); - - socket_->getSocketOption(GeneralTransportOptions::ASYNC_MODE, is_async_); - - // Set it is the first time we schedule an interest - is_first_ = true; - - // Reset the protocol state machine - reset(); - - // Schedule next interests - scheduleNextInterests(); - - is_first_ = false; - - // Set the protocol as running - is_running_ = true; - - if (!is_async_) { - // Start Event loop - portal_->runEventsLoop(); - - // Not running anymore - is_running_ = false; + if (isRunning()) { + return -1; } - return 0; -} - -void TransportProtocol::stop() { - is_running_ = false; + // Start protocol on its own thread + portal_->getThread().add([this]() { + // Get all callbacks references + socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION, + &on_interest_retransmission_); + socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, + &on_interest_output_); + socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_EXPIRED, + &on_interest_timeout_); + socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_SATISFIED, + &on_interest_satisfied_); + socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT, + &on_content_object_input_); + socket_->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY, + &stats_summary_); + socket_->getSocketOption(ConsumerCallbacksOptions::FWD_STRATEGY_CHANGE, + &on_fwd_strategy_); + socket_->getSocketOption(ConsumerCallbacksOptions::REC_STRATEGY_CHANGE, + &on_rec_strategy_); + socket_->getSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, + &on_payload_); + + socket_->getSocketOption(GeneralTransportOptions::ASYNC_MODE, is_async_); + + std::string fec_type_str = ""; + socket_->getSocketOption(GeneralTransportOptions::FEC_TYPE, fec_type_str); + if (fec_type_str != "") { + fec_type_ = fec::FECUtils::fecTypeFromString(fec_type_str.c_str()); + } + + // Set it is the first time we schedule an interest + is_first_ = true; + + // Reset the protocol state machine + reset(); + + // Set this transport protocol as portal's consumer callback + portal_->registerTransportCallback(this); + + // Schedule next interests + scheduleNextInterests(); + + is_first_ = false; + + // Set the protocol as running + setRunning(); + }); - if (!is_async_) { - portal_->stopEventsLoop(); - } else { - portal_->clear(); - } + return 0; } void TransportProtocol::resume() { if (isRunning()) return; - is_running_ = true; + setRunning(); - scheduleNextInterests(); - - if (!is_async_) { - // Start Event loop - portal_->runEventsLoop(); - - // Not running anymore - is_running_ = false; - } + portal_->getThread().tryRunHandlerNow([this]() { scheduleNextInterests(); }); } void TransportProtocol::reset() { @@ -130,7 +123,7 @@ void TransportProtocol::reset() { } } -void TransportProtocol::onContentReassembled(std::error_code ec) { +void TransportProtocol::onContentReassembled(const std::error_code &ec) { stop(); if (!on_payload_) { @@ -153,7 +146,12 @@ void TransportProtocol::sendInterest( uint32_t len) { DLOG_IF(INFO, VLOG_IS_ON(3)) << "Sending interest for name " << interest_name; - auto interest = core::PacketManager<>::getInstance().getPacket<Interest>(); + Packet::Format format; + socket_->getSocketOption(interface::GeneralTransportOptions::PACKET_FORMAT, + format); + + auto interest = + core::PacketManager<>::getInstance().getPacket<Interest>(format); interest->setName(interest_name); for (uint32_t i = 0; i < len; i++) { @@ -176,6 +174,14 @@ void TransportProtocol::sendInterest( portal_->sendInterest(std::move(interest)); } +void TransportProtocol::onError(const std::error_code &ec) { + // error from portal: stop socket + stop(); + + // signal error to application + on_payload_->readError(ec); +} + void TransportProtocol::onTimeout(Interest::Ptr &i, const Name &n) { if (TRANSPORT_EXPECT_FALSE(!isRunning())) { return; |