diff options
author | Mauro <you@example.com> | 2021-06-30 07:57:22 +0000 |
---|---|---|
committer | Mauro Sardara <msardara@cisco.com> | 2021-07-06 16:16:04 +0000 |
commit | 08233d44a6cfde878d7e10bca38ae935ed1c8fd5 (patch) | |
tree | 7ecc534d55bdc7e8dd15ecab084720910bcdf4d9 /libtransport/src/protocols/manifest_incremental_indexer_bytestream.cc | |
parent | 147ba39bed26887f5eba84757e2463ab8e370a9a (diff) |
[HICN-713] Transport Library Major Refactoring 2
Co-authored-by: Luca Muscariello <muscariello@ieee.org>
Co-authored-by: Michele Papalini <micpapal@cisco.com>
Co-authored-by: Olivier Roques <oroques+fdio@cisco.com>
Co-authored-by: Giulio Grassi <gigrassi@cisco.com>
Signed-off-by: Mauro Sardara <msardara@cisco.com>
Change-Id: I5b2c667bad66feb45abdb5effe22ed0f6c85d1c2
Diffstat (limited to 'libtransport/src/protocols/manifest_incremental_indexer_bytestream.cc')
-rw-r--r-- | libtransport/src/protocols/manifest_incremental_indexer_bytestream.cc | 240 |
1 files changed, 240 insertions, 0 deletions
diff --git a/libtransport/src/protocols/manifest_incremental_indexer_bytestream.cc b/libtransport/src/protocols/manifest_incremental_indexer_bytestream.cc new file mode 100644 index 000000000..168aa57af --- /dev/null +++ b/libtransport/src/protocols/manifest_incremental_indexer_bytestream.cc @@ -0,0 +1,240 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <implementation/socket_consumer.h> +#include <protocols/errors.h> +#include <protocols/manifest_incremental_indexer_bytestream.h> +#include <protocols/transport_protocol.h> + +#include <cmath> +#include <deque> + +namespace transport { + +namespace protocol { + +using namespace interface; + +ManifestIncrementalIndexer::ManifestIncrementalIndexer( + implementation::ConsumerSocket *icn_socket, TransportProtocol *transport) + : IncrementalIndexer(icn_socket, transport), + suffix_strategy_(utils::SuffixStrategyFactory::getSuffixStrategy( + NextSegmentCalculationStrategy::INCREMENTAL, next_download_suffix_, + 0)) {} + +void ManifestIncrementalIndexer::onContentObject( + core::Interest &interest, core::ContentObject &content_object, + bool reassembly) { + switch (content_object.getPayloadType()) { + case PayloadType::DATA: { + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Received content " << content_object.getName(); + onUntrustedContentObject(interest, content_object, reassembly); + break; + } + case PayloadType::MANIFEST: { + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Received manifest " << content_object.getName(); + onUntrustedManifest(interest, content_object, reassembly); + break; + } + default: { + return; + } + } +} + +void ManifestIncrementalIndexer::onUntrustedManifest( + core::Interest &interest, core::ContentObject &content_object, + bool reassembly) { + auto manifest = + std::make_unique<ContentObjectManifest>(std::move(content_object)); + + auth::VerificationPolicy policy = verifier_->verifyPackets(manifest.get()); + + manifest->decode(); + + if (policy != auth::VerificationPolicy::ACCEPT) { + transport_->onContentReassembled( + make_error_code(protocol_error::session_aborted)); + return; + } + + processTrustedManifest(interest, std::move(manifest), reassembly); +} + +void ManifestIncrementalIndexer::processTrustedManifest( + core::Interest &interest, std::unique_ptr<ContentObjectManifest> manifest, + bool reassembly) { + if (TRANSPORT_EXPECT_FALSE(manifest->getVersion() != + core::ManifestVersion::VERSION_1)) { + throw errors::RuntimeException("Received manifest with unknown version."); + } + + switch (manifest->getManifestType()) { + case core::ManifestType::INLINE_MANIFEST: { + suffix_strategy_->setFinalSuffix(manifest->getFinalBlockNumber()); + + // The packets to verify with the received manifest + std::vector<auth::PacketPtr> packets; + + // Convert the received manifest to a map of packet suffixes to hashes + auth::Verifier::SuffixMap current_manifest = + core::ContentObjectManifest::getSuffixMap(manifest.get()); + + // Update 'suffix_map_' with new hashes from the received manifest and + // build 'packets' + for (auto it = current_manifest.begin(); it != current_manifest.end();) { + if (unverified_segments_.find(it->first) == + unverified_segments_.end()) { + suffix_map_[it->first] = std::move(it->second); + current_manifest.erase(it++); + continue; + } + + packets.push_back(std::get<1>(unverified_segments_[it->first]).get()); + it++; + } + + // Verify unverified segments using the received manifest + auth::Verifier::PolicyMap policies = + verifier_->verifyPackets(packets, current_manifest); + + for (unsigned int i = 0; i < packets.size(); ++i) { + auth::Suffix suffix = packets[i]->getName().getSuffix(); + + auto it = unverified_segments_.find(suffix); + + if (policies[suffix] != auth::VerificationPolicy::UNKNOWN) { + unverified_segments_.erase(it); + continue; + } + + applyPolicy(*std::get<0>(it->second), *std::get<1>(it->second), + std::get<2>(it->second), policies[suffix]); + } + + if (reassembly) { + reassembly_->reassemble(std::move(manifest)); + } + break; + } + case core::ManifestType::FLIC_MANIFEST: { + throw errors::NotImplementedException(); + } + case core::ManifestType::FINAL_CHUNK_NUMBER: { + throw errors::NotImplementedException(); + } + } +} + +void ManifestIncrementalIndexer::onUntrustedContentObject( + Interest &interest, ContentObject &content_object, bool reassembly) { + auth::Suffix suffix = content_object.getName().getSuffix(); + auth::VerificationPolicy policy = + verifier_->verifyPackets(&content_object, suffix_map_); + + switch (policy) { + case auth::VerificationPolicy::UNKNOWN: { + unverified_segments_[suffix] = + std::make_tuple(interest.shared_from_this(), + content_object.shared_from_this(), reassembly); + break; + } + default: { + suffix_map_.erase(suffix); + break; + } + } + + applyPolicy(interest, content_object, reassembly, policy); +} + +void ManifestIncrementalIndexer::applyPolicy( + core::Interest &interest, core::ContentObject &content_object, + bool reassembly, auth::VerificationPolicy policy) { + assert(reassembly_); + switch (policy) { + case auth::VerificationPolicy::ACCEPT: { + if (reassembly && !reassembly_->reassembleUnverified()) { + reassembly_->reassemble(content_object); + } + break; + } + case auth::VerificationPolicy::DROP: { + transport_->onPacketDropped( + interest, content_object, + make_error_code(protocol_error::verification_failed)); + break; + } + case auth::VerificationPolicy::ABORT: { + transport_->onContentReassembled( + make_error_code(protocol_error::session_aborted)); + break; + } + case auth::VerificationPolicy::UNKNOWN: { + if (reassembly && reassembly_->reassembleUnverified()) { + reassembly_->reassemble(content_object); + } + } + } +} + +uint32_t ManifestIncrementalIndexer::checkNextSuffix() { + return suffix_strategy_->getNextSuffix(); +} + +uint32_t ManifestIncrementalIndexer::getNextSuffix() { + auto ret = suffix_strategy_->getNextSuffix(); + + if (ret <= suffix_strategy_->getFinalSuffix() && + ret != utils::SuffixStrategy::INVALID_SUFFIX) { + suffix_queue_.push(ret); + return ret; + } + + return Indexer::invalid_index; +} + +uint32_t ManifestIncrementalIndexer::getFinalSuffix() { + return suffix_strategy_->getFinalSuffix(); +} + +bool ManifestIncrementalIndexer::isFinalSuffixDiscovered() { + return IncrementalIndexer::isFinalSuffixDiscovered(); +} + +uint32_t ManifestIncrementalIndexer::getNextReassemblySegment() { + if (suffix_queue_.empty()) { + return Indexer::invalid_index; + } + + auto ret = suffix_queue_.front(); + suffix_queue_.pop(); + return ret; +} + +void ManifestIncrementalIndexer::reset() { + IncrementalIndexer::reset(); + suffix_map_.clear(); + unverified_segments_.clear(); + SuffixQueue empty; + std::swap(suffix_queue_, empty); + suffix_strategy_->reset(first_suffix_); +} + +} // namespace protocol + +} // namespace transport |