diff options
Diffstat (limited to 'libtransport/src/protocols/transport_protocol.cc')
-rw-r--r-- | libtransport/src/protocols/transport_protocol.cc | 233 |
1 files changed, 233 insertions, 0 deletions
diff --git a/libtransport/src/protocols/transport_protocol.cc b/libtransport/src/protocols/transport_protocol.cc new file mode 100644 index 000000000..29d140454 --- /dev/null +++ b/libtransport/src/protocols/transport_protocol.cc @@ -0,0 +1,233 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/interfaces/socket_consumer.h> +#include <implementation/socket_consumer.h> +#include <protocols/transport_protocol.h> + +namespace transport { + +namespace protocol { + +using namespace interface; + +TransportProtocol::TransportProtocol(implementation::ConsumerSocket *icn_socket, + Indexer *indexer, Reassembly *reassembly) + : Protocol(), + socket_(icn_socket), + indexer_verifier_(indexer), + reassembly_(reassembly), + fec_decoder_(nullptr), + is_first_(false), + on_interest_retransmission_(VOID_HANDLER), + on_interest_output_(VOID_HANDLER), + on_interest_timeout_(VOID_HANDLER), + on_interest_satisfied_(VOID_HANDLER), + on_content_object_input_(VOID_HANDLER), + stats_summary_(VOID_HANDLER), + on_fwd_strategy_(VOID_HANDLER), + on_rec_strategy_(VOID_HANDLER), + on_payload_(VOID_HANDLER), + fec_type_(fec::FECType::UNKNOWN) { + socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal_); + socket_->getSocketOption(OtherOptions::STATISTICS, &stats_); + + indexer_verifier_->setReassembly(reassembly_.get()); + reassembly->setIndexer(indexer_verifier_.get()); +} + +TransportProtocol::~TransportProtocol() {} + +int TransportProtocol::start() { + // If the protocol is already running, return otherwise set as running + if (isRunning()) { + return -1; + } + + // Start protocol on its own thread + portal_->getThread().add([this]() { + // Get all callbacks references + socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION, + &on_interest_retransmission_); + socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, + &on_interest_output_); + socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_EXPIRED, + &on_interest_timeout_); + socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_SATISFIED, + &on_interest_satisfied_); + socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT, + &on_content_object_input_); + socket_->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY, + &stats_summary_); + socket_->getSocketOption(ConsumerCallbacksOptions::FWD_STRATEGY_CHANGE, + &on_fwd_strategy_); + socket_->getSocketOption(ConsumerCallbacksOptions::REC_STRATEGY_CHANGE, + &on_rec_strategy_); + socket_->getSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, + &on_payload_); + + socket_->getSocketOption(GeneralTransportOptions::ASYNC_MODE, is_async_); + socket_->getSocketOption(GeneralTransportOptions::SIGNER, signer_); + + // Set it is the first time we schedule an interest + is_first_ = true; + + // Reset the protocol state machine + reset(); + + // Set this transport protocol as portal's consumer callback + portal_->registerTransportCallback(this); + + // Schedule next interests + scheduleNextInterests(); + + is_first_ = false; + + // Set the protocol as running + setRunning(); + }); + + return 0; +} + +void TransportProtocol::resume() { + if (isRunning()) return; + + setRunning(); + + portal_->getThread().tryRunHandlerNow([this]() { scheduleNextInterests(); }); +} + +void TransportProtocol::reset() { + reassembly_->reInitialize(); + indexer_verifier_->reset(); + if (fec_decoder_) { + fec_decoder_->reset(); + } +} + +void TransportProtocol::onContentReassembled(const std::error_code &ec) { + stop(); + + if (!on_payload_) { + throw errors::RuntimeException( + "The read callback must be installed in the transport before " + "starting " + "the content retrieval."); + } + + if (!ec) { + on_payload_->readSuccess(stats_->getBytesRecv()); + } else { + on_payload_->readError(ec); + } +} + +void TransportProtocol::sendInterest( + const Name &interest_name, + std::array<uint32_t, MAX_AGGREGATED_INTEREST> *additional_suffixes, + uint32_t len) { + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Sending interest for name " << interest_name; + + Packet::Format format; + socket_->getSocketOption(interface::GeneralTransportOptions::PACKET_FORMAT, + format); + size_t signature_size = 0; + + // If aggregated interest, add spapce for signature + if (len > 0) { + format = Packet::toAHFormat(format); + signature_size = signer_->getSignatureFieldSize(); + } + + auto interest = core::PacketManager<>::getInstance().getPacket<Interest>( + format, signature_size); + interest->setName(interest_name); + + for (uint32_t i = 0; i < len; i++) { + interest->appendSuffix(additional_suffixes->at(i)); + } + + uint32_t lifetime = default_values::interest_lifetime; + socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, + lifetime); + interest->setLifetime(uint32_t(lifetime)); + + if (*on_interest_output_) { + (*on_interest_output_)(*socket_->getInterface(), *interest); + } + + if (TRANSPORT_EXPECT_FALSE(!isRunning() && !is_first_)) { + return; + } + + bool content_sharing_mode; + socket_->getSocketOption(RtcTransportOptions::CONTENT_SHARING_MODE, + content_sharing_mode); + if (content_sharing_mode) lifetime = ceil((double)lifetime * 0.9); + + // Compute signature + bool is_ah = HICN_PACKET_FORMAT_IS_AH(interest->getFormat()); + if (is_ah) signer_->signPacket(interest.get()); + + portal_->sendInterest(interest, lifetime); +} + +void TransportProtocol::onError(const std::error_code &ec) { + // error from portal: stop socket + stop(); + + // signal error to application + on_payload_->readError(ec); +} + +void TransportProtocol::onTimeout(Interest::Ptr &i, const Name &n) { + if (TRANSPORT_EXPECT_FALSE(!isRunning())) { + return; + } + + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Timeout on content " << n; + + onInterestTimeout(i, n); +} + +void TransportProtocol::onContentObject(Interest &i, ContentObject &c) { + // Check whether it makes sense to continue + if (TRANSPORT_EXPECT_FALSE(!isRunning())) { + return; + } + + // Call transport protocol function + std::error_code ec; + onContentObjectReceived(i, c, ec); + + // Call reassemble function, if packet is eligible for reassemblying + bool reassemble = false; + if (!ec) { + reassemble = true; + } + + // Perform verification and update indexer. This step may be performed offline + // - i.e. we may not get a result here (e.g. we use manifest). Verification + // failures in that case will be handled in the onPacketDropped function. + // XXX This step should be done before calling onContentObjectReceived, but + // for now we do it here since currently the indexer does not need manifests + // to move forward. + indexer_verifier_->onContentObject(i, c, reassemble); +} + +} // end namespace protocol + +} // end namespace transport |