aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/protocols/transport_protocol.cc
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/protocols/transport_protocol.cc')
-rw-r--r--libtransport/src/protocols/transport_protocol.cc102
1 files changed, 93 insertions, 9 deletions
diff --git a/libtransport/src/protocols/transport_protocol.cc b/libtransport/src/protocols/transport_protocol.cc
index 611c39212..d6954ac37 100644
--- a/libtransport/src/protocols/transport_protocol.cc
+++ b/libtransport/src/protocols/transport_protocol.cc
@@ -24,12 +24,11 @@ namespace protocol {
using namespace interface;
TransportProtocol::TransportProtocol(implementation::ConsumerSocket *icn_socket,
- Reassembly *reassembly_protocol)
+ Indexer *indexer, Reassembly *reassembly)
: socket_(icn_socket),
- reassembly_protocol_(reassembly_protocol),
- index_manager_(
- std::make_unique<IndexManager>(socket_, this, reassembly_protocol)),
- is_running_(false),
+ indexer_verifier_(indexer),
+ reassembly_(reassembly),
+ fec_decoder_(nullptr),
is_first_(false),
on_interest_retransmission_(VOID_HANDLER),
on_interest_output_(VOID_HANDLER),
@@ -37,9 +36,17 @@ TransportProtocol::TransportProtocol(implementation::ConsumerSocket *icn_socket,
on_interest_satisfied_(VOID_HANDLER),
on_content_object_input_(VOID_HANDLER),
stats_summary_(VOID_HANDLER),
- on_payload_(VOID_HANDLER) {
+ on_payload_(VOID_HANDLER),
+ fec_type_(fec::FECType::UNKNOWN),
+ is_running_(false) {
socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal_);
socket_->getSocketOption(OtherOptions::STATISTICS, &stats_);
+
+ // Set this transport protocol as portal's consumer callback
+ portal_->setConsumerCallback(this);
+
+ indexer_verifier_->setReassembly(reassembly_.get());
+ reassembly->setIndexer(indexer_verifier_.get());
}
int TransportProtocol::start() {
@@ -69,6 +76,7 @@ int TransportProtocol::start() {
// Reset the protocol state machine
reset();
+
// Schedule next interests
scheduleNextInterests();
@@ -99,15 +107,27 @@ void TransportProtocol::stop() {
}
void TransportProtocol::resume() {
- if (is_running_) return;
+ if (isRunning()) return;
is_running_ = true;
scheduleNextInterests();
- portal_->runEventsLoop();
+ if (!is_async_) {
+ // Start Event loop
+ portal_->runEventsLoop();
- is_running_ = false;
+ // Not running anymore
+ is_running_ = false;
+ }
+}
+
+void TransportProtocol::reset() {
+ reassembly_->reInitialize();
+ indexer_verifier_->reset();
+ if (fec_decoder_) {
+ fec_decoder_->reset();
+ }
}
void TransportProtocol::onContentReassembled(std::error_code ec) {
@@ -127,6 +147,70 @@ void TransportProtocol::onContentReassembled(std::error_code ec) {
}
}
+void TransportProtocol::sendInterest(
+ const Name &interest_name,
+ std::array<uint32_t, MAX_AGGREGATED_INTEREST> *additional_suffixes,
+ uint32_t len) {
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Sending interest for name " << interest_name;
+
+ auto interest = core::PacketManager<>::getInstance().getPacket<Interest>();
+ interest->setName(interest_name);
+
+ for (uint32_t i = 0; i < len; i++) {
+ interest->appendSuffix(additional_suffixes->at(i));
+ }
+
+ uint32_t lifetime = default_values::interest_lifetime;
+ socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
+ lifetime);
+ interest->setLifetime(uint32_t(lifetime));
+
+ if (*on_interest_output_) {
+ (*on_interest_output_)(*socket_->getInterface(), *interest);
+ }
+
+ if (TRANSPORT_EXPECT_FALSE(!isRunning() && !is_first_)) {
+ return;
+ }
+
+ portal_->sendInterest(std::move(interest));
+}
+
+void TransportProtocol::onTimeout(Interest::Ptr &i, const Name &n) {
+ if (TRANSPORT_EXPECT_FALSE(!isRunning())) {
+ return;
+ }
+
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Timeout on content " << n;
+
+ onInterestTimeout(i, n);
+}
+
+void TransportProtocol::onContentObject(Interest &i, ContentObject &c) {
+ // Check whether it makes sense to continue
+ if (TRANSPORT_EXPECT_FALSE(!isRunning())) {
+ return;
+ }
+
+ // Call transport protocol function
+ std::error_code ec;
+ onContentObjectReceived(i, c, ec);
+
+ // Call reassemble function, if packet is eligible for reassemblying
+ bool reassemble = false;
+ if (!ec) {
+ reassemble = true;
+ }
+
+ // Perform verification and update indexer. This step may be performed offline
+ // - i.e. we may not get a result here (e.g. we use manifest). Verification
+ // failures in that case will be handled in the onPacketDropped function.
+ // XXX This step should be done before calling onContentObjectReceived, but
+ // for now we do it here since currently the indexer does not need manifests
+ // to move forward.
+ indexer_verifier_->onContentObject(i, c, reassemble);
+}
+
} // end namespace protocol
} // end namespace transport