aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/hicn/transport/protocols
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/hicn/transport/protocols')
-rw-r--r--libtransport/src/hicn/transport/protocols/incremental_indexer.h4
-rw-r--r--libtransport/src/hicn/transport/protocols/indexer.cc6
-rw-r--r--libtransport/src/hicn/transport/protocols/indexer.h4
-rw-r--r--libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc297
-rw-r--r--libtransport/src/hicn/transport/protocols/protocol.cc2
-rw-r--r--libtransport/src/hicn/transport/protocols/protocol.h2
-rw-r--r--libtransport/src/hicn/transport/protocols/raaqm.cc14
-rw-r--r--libtransport/src/hicn/transport/protocols/raaqm.h4
-rw-r--r--libtransport/src/hicn/transport/protocols/rate_estimation.cc4
-rw-r--r--libtransport/src/hicn/transport/protocols/rtc.cc58
-rw-r--r--libtransport/src/hicn/transport/protocols/rtc.h6
-rw-r--r--libtransport/src/hicn/transport/protocols/statistics.h4
-rw-r--r--libtransport/src/hicn/transport/protocols/verification_manager.cc35
-rw-r--r--libtransport/src/hicn/transport/protocols/verification_manager.h18
14 files changed, 430 insertions, 28 deletions
diff --git a/libtransport/src/hicn/transport/protocols/incremental_indexer.h b/libtransport/src/hicn/transport/protocols/incremental_indexer.h
index ea84d645a..b587a8332 100644
--- a/libtransport/src/hicn/transport/protocols/incremental_indexer.h
+++ b/libtransport/src/hicn/transport/protocols/incremental_indexer.h
@@ -123,6 +123,10 @@ class IncrementalIndexer : public Indexer {
}
}
+ TRANSPORT_ALWAYS_INLINE bool onKeyToVerify() override {
+ return verification_manager_->onKeyToVerify();
+ }
+
protected:
interface::ConsumerSocket *socket_;
Reassembly *reassembly_;
diff --git a/libtransport/src/hicn/transport/protocols/indexer.cc b/libtransport/src/hicn/transport/protocols/indexer.cc
index c50c4236b..d95c10ff9 100644
--- a/libtransport/src/hicn/transport/protocols/indexer.cc
+++ b/libtransport/src/hicn/transport/protocols/indexer.cc
@@ -63,6 +63,10 @@ void IndexManager::onContentObject(core::Interest::Ptr &&interest,
}
}
+bool IndexManager::onKeyToVerify() {
+ return indexer_->onKeyToVerify();
+}
+
void IndexManager::reset(std::uint32_t offset) {
indexer_ = std::make_unique<IncrementalIndexer>(icn_socket_, transport_,
reassembly_);
@@ -71,4 +75,4 @@ void IndexManager::reset(std::uint32_t offset) {
}
} // namespace protocol
-} // namespace transport \ No newline at end of file
+} // namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/indexer.h b/libtransport/src/hicn/transport/protocols/indexer.h
index 89751095e..87cf9b307 100644
--- a/libtransport/src/hicn/transport/protocols/indexer.h
+++ b/libtransport/src/hicn/transport/protocols/indexer.h
@@ -57,6 +57,8 @@ class Indexer {
virtual void onContentObject(core::Interest::Ptr &&interest,
core::ContentObject::Ptr &&content_object) = 0;
+
+ virtual bool onKeyToVerify() = 0;
};
class IndexManager : Indexer {
@@ -87,6 +89,8 @@ class IndexManager : Indexer {
void onContentObject(core::Interest::Ptr &&interest,
core::ContentObject::Ptr &&content_object) override;
+ bool onKeyToVerify() override;
+
private:
std::unique_ptr<Indexer> indexer_;
bool first_segment_received_;
diff --git a/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc b/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc
new file mode 100644
index 000000000..5bf9c89f7
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc
@@ -0,0 +1,297 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/interfaces/socket_consumer.h>
+#include <hicn/transport/protocols/manifest_indexing_manager.h>
+
+#include <cmath>
+#include <deque>
+
+namespace transport {
+
+namespace protocol {
+
+using namespace interface;
+
+ManifestIndexManager::ManifestIndexManager(
+ interface::ConsumerSocket *icn_socket, TransportProtocol *next_interest)
+ : IncrementalIndexManager(icn_socket),
+ PacketManager<Interest>(1024),
+ next_to_retrieve_segment_(suffix_queue_.end()),
+ suffix_manifest_(core::NextSegmentCalculationStrategy::INCREMENTAL, 0),
+ next_reassembly_segment_(
+ core::NextSegmentCalculationStrategy::INCREMENTAL, 1, true),
+ ignored_segments_(),
+ next_interest_(next_interest) {}
+
+bool ManifestIndexManager::onManifest(
+ core::ContentObject::Ptr &&content_object) {
+ auto manifest =
+ std::make_unique<ContentObjectManifest>(std::move(*content_object));
+ bool manifest_verified = verification_manager_->onPacketToVerify(*manifest);
+
+ if (manifest_verified) {
+ manifest->decode();
+
+ if (TRANSPORT_EXPECT_FALSE(manifest->getVersion() !=
+ core::ManifestVersion::VERSION_1)) {
+ throw errors::RuntimeException("Received manifest with unknown version.");
+ }
+
+ switch (manifest->getManifestType()) {
+ case core::ManifestType::INLINE_MANIFEST: {
+ auto _it = manifest->getSuffixList().begin();
+ auto _end = manifest->getSuffixList().end();
+ size_t nb_segments = std::distance(_it, _end);
+ final_suffix_ = manifest->getFinalBlockNumber(); // final block number
+
+ TRANSPORT_LOGD("Received manifest %u",
+ manifest->getWritableName().getSuffix());
+ suffix_hash_map_[_it->first] =
+ std::make_pair(std::vector<uint8_t>(_it->second, _it->second + 32),
+ manifest->getHashAlgorithm());
+ suffix_queue_.push_back(_it->first);
+
+ // If the transport protocol finished the list of segments to retrieve,
+ // reset the next_to_retrieve_segment_ iterator to the next segment
+ // provided by this manifest.
+ if (TRANSPORT_EXPECT_FALSE(next_to_retrieve_segment_ ==
+ suffix_queue_.end())) {
+ next_to_retrieve_segment_ = --suffix_queue_.end();
+ }
+
+ std::advance(_it, 1);
+ for (; _it != _end; _it++) {
+ suffix_hash_map_[_it->first] = std::make_pair(
+ std::vector<uint8_t>(_it->second, _it->second + 32),
+ manifest->getHashAlgorithm());
+ suffix_queue_.push_back(_it->first);
+ }
+
+ if (TRANSPORT_EXPECT_FALSE(manifest->getName().getSuffix()) == 0) {
+ core::NextSegmentCalculationStrategy strategy =
+ manifest->getNextSegmentCalculationStrategy();
+
+ suffix_manifest_.reset(0);
+ suffix_manifest_.setNbSegments(nb_segments);
+ suffix_manifest_.setSuffixStrategy(strategy);
+ TRANSPORT_LOGD("Capacity of 1st manifest %zu",
+ suffix_manifest_.getNbSegments());
+
+ next_reassembly_segment_.reset(*suffix_queue_.begin());
+ next_reassembly_segment_.setNbSegments(nb_segments);
+ suffix_manifest_.setSuffixStrategy(strategy);
+ }
+
+ // If the manifest is not full, we add the suffixes of missing segments
+ // to the list of segments to ignore when computing the next reassembly
+ // index.
+ if (TRANSPORT_EXPECT_FALSE(
+ suffix_manifest_.getNbSegments() - nb_segments > 0)) {
+ auto start = manifest->getSuffixList().begin();
+ auto last = --_end;
+ for (uint32_t i = last->first + 1;
+ i < start->first + suffix_manifest_.getNbSegments(); i++) {
+ ignored_segments_.push_back(i);
+ }
+ }
+
+ if (TRANSPORT_EXPECT_FALSE(manifest->isFinalManifest()) == 0) {
+ fillWindow(manifest->getWritableName(),
+ manifest->getName().getSuffix());
+ }
+
+ break;
+ }
+ case core::ManifestType::FLIC_MANIFEST: {
+ throw errors::NotImplementedException();
+ }
+ case core::ManifestType::FINAL_CHUNK_NUMBER: {
+ throw errors::NotImplementedException();
+ }
+ }
+ }
+
+ return manifest_verified;
+}
+
+void ManifestIndexManager::onManifestReceived(Interest::Ptr &&i,
+ ContentObject::Ptr &&c) {
+ onManifest(std::move(c));
+ if (next_interest_) {
+ next_interest_->scheduleNextInterests();
+ }
+}
+
+void ManifestIndexManager::onManifestTimeout(Interest::Ptr &&i) {
+ const Name &n = i->getName();
+ uint32_t segment = n.getSuffix();
+
+ if (segment > final_suffix_) {
+ return;
+ }
+
+ TRANSPORT_LOGD("Timeout on manifest %u", segment);
+ // Get portal
+ std::shared_ptr<interface::BasePortal> portal;
+ socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal);
+
+ // Send requests for manifest out of the congestion window (no
+ // in_flight_interests++)
+ portal->sendInterest(
+ std::move(i),
+ std::bind(&ManifestIndexManager::onManifestReceived, this,
+ std::placeholders::_1, std::placeholders::_2),
+ std::bind(&ManifestIndexManager::onManifestTimeout, this,
+ std::placeholders::_1));
+}
+
+void ManifestIndexManager::fillWindow(Name &name, uint32_t current_manifest) {
+ /* Send as many manifest as required for filling window. */
+ uint32_t interest_lifetime;
+ double window_size;
+ std::shared_ptr<interface::BasePortal> portal;
+ Interest::Ptr interest;
+ uint32_t current_segment = *next_to_retrieve_segment_;
+ // suffix_manifest_ now points to the next manifest to request
+ uint32_t last_requested_manifest = (suffix_manifest_++).getSuffix();
+
+ socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal);
+ socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
+ interest_lifetime);
+ socket_->getSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE,
+ window_size);
+
+ if (TRANSPORT_EXPECT_FALSE(suffix_manifest_.getSuffix() >= final_suffix_)) {
+ suffix_manifest_.updateSuffix(last_requested_manifest);
+ return;
+ }
+
+ if (current_segment + window_size < suffix_manifest_.getSuffix() &&
+ current_manifest != last_requested_manifest) {
+ suffix_manifest_.updateSuffix(last_requested_manifest);
+ return;
+ }
+
+ do {
+ interest = getPacket();
+ name.setSuffix(suffix_manifest_.getSuffix());
+ interest->setName(name);
+ interest->setLifetime(interest_lifetime);
+
+ // Send interests for manifest out of the congestion window (no
+ // in_flight_interests++)
+ portal->sendInterest(
+ std::move(interest),
+ std::bind(&ManifestIndexManager::onManifestReceived, this,
+ std::placeholders::_1, std::placeholders::_2),
+ std::bind(&ManifestIndexManager::onManifestTimeout, this,
+ std::placeholders::_1));
+ TRANSPORT_LOGD("Send manifest interest %u", name.getSuffix());
+
+ last_requested_manifest = (suffix_manifest_++).getSuffix();
+ } while (current_segment + window_size >= suffix_manifest_.getSuffix() &&
+ suffix_manifest_.getSuffix() < final_suffix_);
+
+ // suffix_manifest_ now points to the last requested manifest
+ suffix_manifest_.updateSuffix(last_requested_manifest);
+}
+
+bool ManifestIndexManager::onContentObject(
+ const core::ContentObject &content_object) {
+ bool verify_signature;
+ socket_->getSocketOption(GeneralTransportOptions::VERIFY_SIGNATURE,
+ verify_signature);
+
+ if (!verify_signature) {
+ return true;
+ }
+
+ uint64_t segment = content_object.getName().getSuffix();
+
+ bool ret = false;
+
+ auto it = suffix_hash_map_.find((const unsigned int)segment);
+ if (it != suffix_hash_map_.end()) {
+ auto hash_type = static_cast<utils::CryptoHashType>(it->second.second);
+ auto data_packet_digest = content_object.computeDigest(it->second.second);
+ auto data_packet_digest_bytes =
+ data_packet_digest.getDigest<uint8_t>().data();
+ std::vector<uint8_t> &manifest_digest_bytes = it->second.first;
+
+ if (utils::CryptoHash::compareBinaryDigest(data_packet_digest_bytes,
+ manifest_digest_bytes.data(),
+ hash_type)) {
+ suffix_hash_map_.erase(it);
+ ret = true;
+ } else {
+ throw errors::RuntimeException(
+ "Verification failure policy has to be implemented.");
+ }
+ }
+
+ return ret;
+}
+
+uint32_t ManifestIndexManager::getNextSuffix() {
+ if (TRANSPORT_EXPECT_FALSE(next_to_retrieve_segment_ ==
+ suffix_queue_.end())) {
+ return invalid_index;
+ }
+
+ return *next_to_retrieve_segment_++;
+}
+
+uint32_t ManifestIndexManager::getFinalSuffix() { return final_suffix_; }
+
+bool ManifestIndexManager::isFinalSuffixDiscovered() {
+ return IncrementalIndexManager::isFinalSuffixDiscovered();
+}
+
+uint32_t ManifestIndexManager::getNextReassemblySegment() {
+ uint32_t current_reassembly_segment;
+
+ while (true) {
+ current_reassembly_segment = next_reassembly_segment_.getSuffix();
+ next_reassembly_segment_++;
+
+ if (TRANSPORT_EXPECT_FALSE(current_reassembly_segment > final_suffix_)) {
+ return invalid_index;
+ }
+
+ if (ignored_segments_.empty()) break;
+
+ auto is_ignored =
+ std::find(ignored_segments_.begin(), ignored_segments_.end(),
+ current_reassembly_segment);
+
+ if (is_ignored == ignored_segments_.end()) break;
+
+ ignored_segments_.erase(is_ignored);
+ }
+
+ return current_reassembly_segment;
+}
+
+void ManifestIndexManager::reset() {
+ IncrementalIndexManager::reset();
+ suffix_manifest_.reset(0);
+ suffix_queue_.clear();
+ suffix_hash_map_.clear();
+}
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/protocol.cc b/libtransport/src/hicn/transport/protocols/protocol.cc
index a0f847453..db461a66f 100644
--- a/libtransport/src/hicn/transport/protocols/protocol.cc
+++ b/libtransport/src/hicn/transport/protocols/protocol.cc
@@ -96,6 +96,8 @@ void TransportProtocol::onContentReassembled(std::error_code ec) {
}
stop();
+
+ on_payload->afterRead();
}
} // end namespace protocol
diff --git a/libtransport/src/hicn/transport/protocols/protocol.h b/libtransport/src/hicn/transport/protocols/protocol.h
index 87fab588b..4897da902 100644
--- a/libtransport/src/hicn/transport/protocols/protocol.h
+++ b/libtransport/src/hicn/transport/protocols/protocol.h
@@ -60,6 +60,8 @@ class TransportProtocol : public interface::BasePortal::ConsumerCallback,
virtual void resume();
+ virtual bool verifyKeyPackets() = 0;
+
virtual void scheduleNextInterests() = 0;
// Events generated by the indexing
diff --git a/libtransport/src/hicn/transport/protocols/raaqm.cc b/libtransport/src/hicn/transport/protocols/raaqm.cc
index 641ae45c3..984470edb 100644
--- a/libtransport/src/hicn/transport/protocols/raaqm.cc
+++ b/libtransport/src/hicn/transport/protocols/raaqm.cc
@@ -115,7 +115,12 @@ void RaaqmTransportProtocol::reset() {
t0_ = utils::SteadyClock::now();
}
+bool RaaqmTransportProtocol::verifyKeyPackets() {
+ return index_manager_->onKeyToVerify();
+}
+
void RaaqmTransportProtocol::increaseWindow() {
+ // return;
double max_window_size = 0.;
socket_->getSocketOption(GeneralTransportOptions::MAX_WINDOW_SIZE,
max_window_size);
@@ -131,6 +136,7 @@ void RaaqmTransportProtocol::increaseWindow() {
}
void RaaqmTransportProtocol::decreaseWindow() {
+ // return;
double min_window_size = 0.;
socket_->getSocketOption(GeneralTransportOptions::MIN_WINDOW_SIZE,
min_window_size);
@@ -404,7 +410,7 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) {
const Name &n = interest->getName();
- TRANSPORT_LOGW("Timeout on %s", n.toString().c_str());
+ TRANSPORT_LOGW("Timeout on content %s", n.toString().c_str());
if (TRANSPORT_EXPECT_FALSE(!is_running_)) {
return;
@@ -474,21 +480,27 @@ void RaaqmTransportProtocol::scheduleNextInterests() {
// send at least one interest if there are retransmissions to perform and
// there is no space left in the window
sendInterest(std::move(interest_to_retransmit_.front()));
+ TRANSPORT_LOGD("Window full, retransmit one content interest");
interest_to_retransmit_.pop();
}
uint32_t index = IndexManager::invalid_index;
+
// Send the interest needed for filling the window
while (interests_in_flight_ < current_window_size_) {
if (interest_to_retransmit_.size() > 0) {
sendInterest(std::move(interest_to_retransmit_.front()));
+ TRANSPORT_LOGD("Retransmit content interest");
interest_to_retransmit_.pop();
} else {
index = index_manager_->getNextSuffix();
if (index == IndexManager::invalid_index) {
+ TRANSPORT_LOGE("INVALID INDEX %d", index);
break;
}
+
sendInterest(index);
+ TRANSPORT_LOGD("Send content interest %u", index);
}
}
}
diff --git a/libtransport/src/hicn/transport/protocols/raaqm.h b/libtransport/src/hicn/transport/protocols/raaqm.h
index 7fc540c9f..f2d819ec5 100644
--- a/libtransport/src/hicn/transport/protocols/raaqm.h
+++ b/libtransport/src/hicn/transport/protocols/raaqm.h
@@ -42,6 +42,8 @@ class RaaqmTransportProtocol : public TransportProtocol,
void reset() override;
+ virtual bool verifyKeyPackets() override;
+
protected:
static constexpr uint32_t buffer_size =
1 << interface::default_values::log_2_default_buffer_size;
@@ -136,4 +138,4 @@ class RaaqmTransportProtocol : public TransportProtocol,
} // end namespace protocol
-} // end namespace transport \ No newline at end of file
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/rate_estimation.cc b/libtransport/src/hicn/transport/protocols/rate_estimation.cc
index 99fc78c3e..50306e6e5 100644
--- a/libtransport/src/hicn/transport/protocols/rate_estimation.cc
+++ b/libtransport/src/hicn/transport/protocols/rate_estimation.cc
@@ -186,7 +186,7 @@ void ALaTcpEstimator::onDataReceived(int packet_size) {
SimpleEstimator::SimpleEstimator(double alphaArg, int batching_param) {
this->estimation_ = 0.0;
this->estimated_ = false;
- this->observer_ = NULL;
+ this->observer_ = nullptr;
this->batching_param_ = batching_param;
this->total_size_ = 0.0;
this->number_of_packets_ = 0;
@@ -260,7 +260,7 @@ void SimpleEstimator::onDataReceived(int packet_size) {
void SimpleEstimator::onRttUpdate(double rtt) {
this->number_of_packets_++;
- if (number_of_packets_ == this->batching_param_) {
+ if (this->number_of_packets_ == this->batching_param_) {
TimePoint end = std::chrono::steady_clock::now();
auto delay =
std::chrono::duration_cast<Microseconds>(end - this->begin_batch_)
diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc
index e371217f8..fece95d03 100644
--- a/libtransport/src/hicn/transport/protocols/rtc.cc
+++ b/libtransport/src/hicn/transport/protocols/rtc.cc
@@ -48,10 +48,22 @@ RTCTransportProtocol::~RTCTransportProtocol() {
}
int RTCTransportProtocol::start() {
+ if (is_running_) return -1;
+
+ reset();
+ is_first_ = true;
+
probeRtt();
sentinelTimer();
newRound();
- return TransportProtocol::start();
+ scheduleNextInterests();
+
+ is_first_ = false;
+ is_running_ = true;
+ portal_->runEventsLoop();
+ is_running_ = false;
+
+ return 0;
}
void RTCTransportProtocol::stop() {
@@ -65,7 +77,6 @@ void RTCTransportProtocol::resume() {
if (is_running_) return;
is_running_ = true;
-
inflightInterestsCount_ = 0;
probeRtt();
@@ -74,7 +85,6 @@ void RTCTransportProtocol::resume() {
scheduleNextInterests();
portal_->runEventsLoop();
-
is_running_ = false;
}
@@ -190,7 +200,6 @@ void RTCTransportProtocol::updateDelayStats(
// we collect OWD only for datapackets
if (payload->length() != HICN_NACK_HEADER_SIZE) {
uint64_t *senderTimeStamp = (uint64_t *)payload->data();
-
int64_t OWD = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count() -
@@ -288,7 +297,6 @@ void RTCTransportProtocol::updateStats(uint32_t round_duration) {
stats_->updateAverageRtt(pathTable_[producerPathLabels_[1]]->getMinRtt());
(*stats_callback)(*socket_, *stats_);
}
-
// bound also by interest lifitime* production rate
if (!gotNack_) {
roundsWithoutNacks_++;
@@ -403,6 +411,15 @@ void RTCTransportProtocol::increaseWindow() {
}
void RTCTransportProtocol::probeRtt() {
+ probe_timer_->expires_from_now(std::chrono::milliseconds(1000));
+ probe_timer_->async_wait([this](std::error_code ec) {
+ if (ec) return;
+ probeRtt();
+ });
+
+ // To avoid sending the first probe, because the transport is not running yet
+ if (is_first_ && !is_running_) return;
+
time_sent_probe_ = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
@@ -419,13 +436,9 @@ void RTCTransportProtocol::probeRtt() {
// we considere the probe as a rtx so that we do not incresea inFlightInt
received_probe_ = false;
+ TRANSPORT_LOGD("Send content interest %u (probeRtt)",
+ interest_name->getSuffix());
sendInterest(interest_name, true);
-
- probe_timer_->expires_from_now(std::chrono::milliseconds(1000));
- probe_timer_->async_wait([this](std::error_code ec) {
- if (ec) return;
- probeRtt();
- });
}
void RTCTransportProtocol::sendInterest(Name *interest_name, bool rtx) {
@@ -463,6 +476,10 @@ void RTCTransportProtocol::sendInterest(Name *interest_name, bool rtx) {
void RTCTransportProtocol::scheduleNextInterests() {
if (!is_running_ && !is_first_) return;
+ TRANSPORT_LOGD("----- [window %u - inflight_interests %u = %d] -----",
+ currentCWin_, inflightInterestsCount_,
+ currentCWin_ - inflightInterestsCount_);
+
while (inflightInterestsCount_ < currentCWin_) {
Name *interest_name = nullptr;
socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
@@ -479,6 +496,9 @@ void RTCTransportProtocol::scheduleNextInterests() {
inflightInterests_[pkt].state = sent_;
inflightInterests_[pkt].sequence = actualSegment_;
actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ;
+ TRANSPORT_LOGD(
+ "Send content interest %u (scheduleNextInterests no replies)",
+ interest_name->getSuffix());
sendInterest(interest_name, false);
return;
}
@@ -515,8 +535,17 @@ void RTCTransportProtocol::scheduleNextInterests() {
inflightInterests_[pkt].sequence = actualSegment_;
actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ;
+ TRANSPORT_LOGD("Send content interest %u (scheduleNextInterests)",
+ interest_name->getSuffix());
sendInterest(interest_name, false);
}
+
+ TRANSPORT_LOGD("----- end of scheduleNextInterest -----");
+}
+
+bool RTCTransportProtocol::verifyKeyPackets() {
+ // Not yet implemented
+ return false;
}
void RTCTransportProtocol::sentinelTimer() {
@@ -703,6 +732,8 @@ uint64_t RTCTransportProtocol::retransmit() {
socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
&interest_name);
interest_name->setSuffix(it->first);
+ TRANSPORT_LOGD("Send content interest %u (retransmit)",
+ interest_name->getSuffix());
sendInterest(interest_name, true);
} else if (rtx_time < smallest_timeout) {
smallest_timeout = rtx_time;
@@ -866,6 +897,7 @@ void RTCTransportProtocol::onContentObject(
}
if (segmentNumber >= HICN_MIN_PROBE_SEQ) {
+ TRANSPORT_LOGD("Received probe %u", segmentNumber);
if (segmentNumber == probe_seq_number_ && !received_probe_) {
received_probe_ = true;
@@ -899,6 +931,8 @@ void RTCTransportProtocol::onContentObject(
}
if (payload_size == HICN_NACK_HEADER_SIZE) {
+ TRANSPORT_LOGD("Received nack %u", segmentNumber);
+
if (inflightInterests_[pkt].state == sent_) {
lastEvent_ = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now().time_since_epoch())
@@ -930,6 +964,8 @@ void RTCTransportProtocol::onContentObject(
}
} else {
+ TRANSPORT_LOGD("Received content %u", segmentNumber);
+
avgPacketSize_ = (HICN_ESTIMATED_PACKET_SIZE * avgPacketSize_) +
((1 - HICN_ESTIMATED_PACKET_SIZE) * payload->length());
diff --git a/libtransport/src/hicn/transport/protocols/rtc.h b/libtransport/src/hicn/transport/protocols/rtc.h
index 9e1731e96..f34afbb5f 100644
--- a/libtransport/src/hicn/transport/protocols/rtc.h
+++ b/libtransport/src/hicn/transport/protocols/rtc.h
@@ -81,8 +81,8 @@ typedef enum packetState packetState_t;
struct sentInterest {
uint64_t transmissionTime;
- uint32_t sequence; // sequence number of the interest sent
- // to handle seq % buffer_size
+ uint32_t sequence; // sequence number of the interest sent
+ // to handle seq % buffer_size
packetState_t state; // see packet state
};
@@ -99,6 +99,8 @@ class RTCTransportProtocol : public TransportProtocol,
void resume() override;
+ bool verifyKeyPackets() override;
+
private:
// algo functions
void reset() override;
diff --git a/libtransport/src/hicn/transport/protocols/statistics.h b/libtransport/src/hicn/transport/protocols/statistics.h
index d5e89b96d..c92940ab4 100644
--- a/libtransport/src/hicn/transport/protocols/statistics.h
+++ b/libtransport/src/hicn/transport/protocols/statistics.h
@@ -82,9 +82,7 @@ class TransportStatistics {
return interest_tx_;
}
- TRANSPORT_ALWAYS_INLINE double getLossRatio() const {
- return loss_ratio_;
- }
+ TRANSPORT_ALWAYS_INLINE double getLossRatio() const { return loss_ratio_; }
TRANSPORT_ALWAYS_INLINE double getQueuingDelay() const {
return queuing_delay_;
diff --git a/libtransport/src/hicn/transport/protocols/verification_manager.cc b/libtransport/src/hicn/transport/protocols/verification_manager.cc
index f45cab743..74faf0521 100644
--- a/libtransport/src/hicn/transport/protocols/verification_manager.cc
+++ b/libtransport/src/hicn/transport/protocols/verification_manager.cc
@@ -13,9 +13,8 @@
* limitations under the License.
*/
-#include <hicn/transport/protocols/verification_manager.h>
-
#include <hicn/transport/interfaces/socket_consumer.h>
+#include <hicn/transport/protocols/verification_manager.h>
namespace transport {
@@ -25,20 +24,30 @@ interface::VerificationPolicy SignatureVerificationManager::onPacketToVerify(
const Packet& packet) {
using namespace interface;
- bool verify_signature;
+ bool verify_signature = false, key_content = false;
VerificationPolicy ret = VerificationPolicy::DROP_PACKET;
- ConsumerContentObjectVerificationFailedCallback*
- verification_failed_callback = VOID_HANDLER;
icn_socket_->getSocketOption(GeneralTransportOptions::VERIFY_SIGNATURE,
verify_signature);
+ icn_socket_->getSocketOption(GeneralTransportOptions::KEY_CONTENT,
+ key_content);
if (!verify_signature) {
return VerificationPolicy::ACCEPT_PACKET;
}
+ if (key_content) {
+ key_packets_.push(copyPacket(packet));
+ return VerificationPolicy::ACCEPT_PACKET;
+ } else if (!key_packets_.empty()) {
+ std::queue<ContentObjectPtr>().swap(key_packets_);
+ }
+
+ ConsumerContentObjectVerificationFailedCallback*
+ verification_failed_callback = VOID_HANDLER;
icn_socket_->getSocketOption(ConsumerCallbacksOptions::VERIFICATION_FAILED,
&verification_failed_callback);
+
if (!verification_failed_callback) {
throw errors::RuntimeException(
"No verification failed callback provided by application. "
@@ -66,6 +75,22 @@ interface::VerificationPolicy SignatureVerificationManager::onPacketToVerify(
return ret;
}
+bool SignatureVerificationManager::onKeyToVerify() {
+ if (TRANSPORT_EXPECT_FALSE(key_packets_.empty())) {
+ throw errors::RuntimeException("No key to verify.");
+ }
+
+ while (!key_packets_.empty()) {
+ ContentObjectPtr packet_to_verify = key_packets_.front();
+ key_packets_.pop();
+ if (onPacketToVerify(*packet_to_verify) !=
+ VerificationPolicy::ACCEPT_PACKET)
+ return false;
+ }
+
+ return true;
+}
+
} // end namespace protocol
} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/verification_manager.h b/libtransport/src/hicn/transport/protocols/verification_manager.h
index 6e5d32127..293e8103a 100644
--- a/libtransport/src/hicn/transport/protocols/verification_manager.h
+++ b/libtransport/src/hicn/transport/protocols/verification_manager.h
@@ -30,22 +30,36 @@ namespace protocol {
using Packet = core::Packet;
using interface::ConsumerSocket;
using interface::VerificationPolicy;
+using ContentObjectPtr = std::shared_ptr<core::ContentObject>;
class VerificationManager {
public:
virtual ~VerificationManager() = default;
virtual VerificationPolicy onPacketToVerify(const Packet& packet) = 0;
+ virtual bool onKeyToVerify() { return false; }
};
class SignatureVerificationManager : public VerificationManager {
public:
- SignatureVerificationManager(ConsumerSocket* icn_socket)
- : icn_socket_(icn_socket) {}
+ SignatureVerificationManager(interface::ConsumerSocket* icn_socket)
+ : icn_socket_(icn_socket), key_packets_() {}
interface::VerificationPolicy onPacketToVerify(const Packet& packet) override;
+ bool onKeyToVerify() override;
private:
ConsumerSocket* icn_socket_;
+ std::queue<ContentObjectPtr> key_packets_;
+
+ ContentObjectPtr copyPacket(const Packet& packet) {
+ std::shared_ptr<utils::MemBuf> packet_copy =
+ packet.acquireMemBufReference();
+ ContentObjectPtr content_object_copy =
+ std::make_shared<core::ContentObject>(std::move(packet_copy));
+ std::unique_ptr<utils::MemBuf> payload_copy = packet.getPayload();
+ content_object_copy->appendPayload(std::move(payload_copy));
+ return content_object_copy;
+ }
};
} // end namespace protocol