aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/protocols/transport_protocol.cc
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/protocols/transport_protocol.cc')
-rw-r--r--libtransport/src/protocols/transport_protocol.cc157
1 files changed, 87 insertions, 70 deletions
diff --git a/libtransport/src/protocols/transport_protocol.cc b/libtransport/src/protocols/transport_protocol.cc
index d6954ac37..29d140454 100644
--- a/libtransport/src/protocols/transport_protocol.cc
+++ b/libtransport/src/protocols/transport_protocol.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -25,7 +25,8 @@ using namespace interface;
TransportProtocol::TransportProtocol(implementation::ConsumerSocket *icn_socket,
Indexer *indexer, Reassembly *reassembly)
- : socket_(icn_socket),
+ : Protocol(),
+ socket_(icn_socket),
indexer_verifier_(indexer),
reassembly_(reassembly),
fec_decoder_(nullptr),
@@ -36,90 +37,77 @@ TransportProtocol::TransportProtocol(implementation::ConsumerSocket *icn_socket,
on_interest_satisfied_(VOID_HANDLER),
on_content_object_input_(VOID_HANDLER),
stats_summary_(VOID_HANDLER),
+ on_fwd_strategy_(VOID_HANDLER),
+ on_rec_strategy_(VOID_HANDLER),
on_payload_(VOID_HANDLER),
- fec_type_(fec::FECType::UNKNOWN),
- is_running_(false) {
+ fec_type_(fec::FECType::UNKNOWN) {
socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal_);
socket_->getSocketOption(OtherOptions::STATISTICS, &stats_);
- // Set this transport protocol as portal's consumer callback
- portal_->setConsumerCallback(this);
-
indexer_verifier_->setReassembly(reassembly_.get());
reassembly->setIndexer(indexer_verifier_.get());
}
+TransportProtocol::~TransportProtocol() {}
+
int TransportProtocol::start() {
// If the protocol is already running, return otherwise set as running
- if (is_running_) return -1;
-
- // Get all callbacks references
- socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION,
- &on_interest_retransmission_);
- socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT,
- &on_interest_output_);
- socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_EXPIRED,
- &on_interest_timeout_);
- socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_SATISFIED,
- &on_interest_satisfied_);
- socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT,
- &on_content_object_input_);
- socket_->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY,
- &stats_summary_);
- socket_->getSocketOption(ConsumerCallbacksOptions::READ_CALLBACK,
- &on_payload_);
-
- socket_->getSocketOption(GeneralTransportOptions::ASYNC_MODE, is_async_);
-
- // Set it is the first time we schedule an interest
- is_first_ = true;
-
- // Reset the protocol state machine
- reset();
-
- // Schedule next interests
- scheduleNextInterests();
-
- is_first_ = false;
-
- // Set the protocol as running
- is_running_ = true;
-
- if (!is_async_) {
- // Start Event loop
- portal_->runEventsLoop();
-
- // Not running anymore
- is_running_ = false;
+ if (isRunning()) {
+ return -1;
}
- return 0;
-}
-
-void TransportProtocol::stop() {
- is_running_ = false;
+ // Start protocol on its own thread
+ portal_->getThread().add([this]() {
+ // Get all callbacks references
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION,
+ &on_interest_retransmission_);
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT,
+ &on_interest_output_);
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_EXPIRED,
+ &on_interest_timeout_);
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_SATISFIED,
+ &on_interest_satisfied_);
+ socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT,
+ &on_content_object_input_);
+ socket_->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY,
+ &stats_summary_);
+ socket_->getSocketOption(ConsumerCallbacksOptions::FWD_STRATEGY_CHANGE,
+ &on_fwd_strategy_);
+ socket_->getSocketOption(ConsumerCallbacksOptions::REC_STRATEGY_CHANGE,
+ &on_rec_strategy_);
+ socket_->getSocketOption(ConsumerCallbacksOptions::READ_CALLBACK,
+ &on_payload_);
+
+ socket_->getSocketOption(GeneralTransportOptions::ASYNC_MODE, is_async_);
+ socket_->getSocketOption(GeneralTransportOptions::SIGNER, signer_);
+
+ // Set it is the first time we schedule an interest
+ is_first_ = true;
+
+ // Reset the protocol state machine
+ reset();
+
+ // Set this transport protocol as portal's consumer callback
+ portal_->registerTransportCallback(this);
+
+ // Schedule next interests
+ scheduleNextInterests();
+
+ is_first_ = false;
+
+ // Set the protocol as running
+ setRunning();
+ });
- if (!is_async_) {
- portal_->stopEventsLoop();
- } else {
- portal_->clear();
- }
+ return 0;
}
void TransportProtocol::resume() {
if (isRunning()) return;
- is_running_ = true;
-
- scheduleNextInterests();
-
- if (!is_async_) {
- // Start Event loop
- portal_->runEventsLoop();
+ setRunning();
- // Not running anymore
- is_running_ = false;
- }
+ portal_->getThread().tryRunHandlerNow([this]() { scheduleNextInterests(); });
}
void TransportProtocol::reset() {
@@ -130,7 +118,7 @@ void TransportProtocol::reset() {
}
}
-void TransportProtocol::onContentReassembled(std::error_code ec) {
+void TransportProtocol::onContentReassembled(const std::error_code &ec) {
stop();
if (!on_payload_) {
@@ -153,7 +141,19 @@ void TransportProtocol::sendInterest(
uint32_t len) {
DLOG_IF(INFO, VLOG_IS_ON(3)) << "Sending interest for name " << interest_name;
- auto interest = core::PacketManager<>::getInstance().getPacket<Interest>();
+ Packet::Format format;
+ socket_->getSocketOption(interface::GeneralTransportOptions::PACKET_FORMAT,
+ format);
+ size_t signature_size = 0;
+
+ // If aggregated interest, add spapce for signature
+ if (len > 0) {
+ format = Packet::toAHFormat(format);
+ signature_size = signer_->getSignatureFieldSize();
+ }
+
+ auto interest = core::PacketManager<>::getInstance().getPacket<Interest>(
+ format, signature_size);
interest->setName(interest_name);
for (uint32_t i = 0; i < len; i++) {
@@ -173,7 +173,24 @@ void TransportProtocol::sendInterest(
return;
}
- portal_->sendInterest(std::move(interest));
+ bool content_sharing_mode;
+ socket_->getSocketOption(RtcTransportOptions::CONTENT_SHARING_MODE,
+ content_sharing_mode);
+ if (content_sharing_mode) lifetime = ceil((double)lifetime * 0.9);
+
+ // Compute signature
+ bool is_ah = HICN_PACKET_FORMAT_IS_AH(interest->getFormat());
+ if (is_ah) signer_->signPacket(interest.get());
+
+ portal_->sendInterest(interest, lifetime);
+}
+
+void TransportProtocol::onError(const std::error_code &ec) {
+ // error from portal: stop socket
+ stop();
+
+ // signal error to application
+ on_payload_->readError(ec);
}
void TransportProtocol::onTimeout(Interest::Ptr &i, const Name &n) {