diff options
author | Mauro Sardara <msardara@cisco.com> | 2019-03-19 16:21:55 +0100 |
---|---|---|
committer | Mauro Sardara <msardara@cisco.com> | 2019-03-19 16:23:22 +0100 |
commit | 393a0a4018936134fa1b343523bbafaab606973d (patch) | |
tree | eb587157a3ff5235ec02611287c02c5e08487eba | |
parent | 7266728a7857b038679fe35321ace30386f0c461 (diff) |
[HICN-125 HICN-126]
- Add MemBuf as return type of getPayload of transport::core::Packet
- Fix incremental index manager
Change-Id: Ib557d56b1bf42e3974364c611b825b21f1e3d3f1
Signed-off-by: Mauro Sardara <msardara@cisco.com>
12 files changed, 112 insertions, 176 deletions
diff --git a/libtransport/src/hicn/transport/core/manifest_format_fixed.cc b/libtransport/src/hicn/transport/core/manifest_format_fixed.cc index 73b33268c..b97aa7079 100644 --- a/libtransport/src/hicn/transport/core/manifest_format_fixed.cc +++ b/libtransport/src/hicn/transport/core/manifest_format_fixed.cc @@ -143,10 +143,9 @@ std::size_t FixedManifestEncoder::getManifestHeaderSizeImpl() { FixedManifestDecoder::FixedManifestDecoder(Packet &packet) : packet_(packet), manifest_header_(reinterpret_cast<ManifestHeader *>( - const_cast<uint8_t *>(packet_.getPayload().data()))), + packet_.getPayload()->writableData())), manifest_entries_(reinterpret_cast<ManifestEntry *>( - const_cast<uint8_t *>(packet_.getPayload().data()) + - sizeof(ManifestHeader))) {} + packet_.getPayload()->writableData() + sizeof(ManifestHeader))) {} FixedManifestDecoder::~FixedManifestDecoder() {} diff --git a/libtransport/src/hicn/transport/core/packet.cc b/libtransport/src/hicn/transport/core/packet.cc index d925375ce..ea9666ff7 100644 --- a/libtransport/src/hicn/transport/core/packet.cc +++ b/libtransport/src/hicn/transport/core/packet.cc @@ -34,11 +34,11 @@ const core::Name Packet::base_name("0::0|0"); Packet::Packet(Format format) : packet_(utils::MemBuf::create(getHeaderSizeFromFormat(format, 256)) .release()), - packet_start_(packet_->writableData()), + packet_start_(reinterpret_cast<hicn_header_t *>(packet_->writableData())), header_head_(packet_.get()), payload_head_(nullptr), format_(format) { - if (hicn_packet_init_header(format, (hicn_header_t *)packet_start_) < 0) { + if (hicn_packet_init_header(format, packet_start_) < 0) { throw errors::RuntimeException("Unexpected error initializing the packet."); } @@ -47,10 +47,10 @@ Packet::Packet(Format format) Packet::Packet(MemBufPtr &&buffer) : packet_(std::move(buffer)), - packet_start_(packet_->writableData()), + packet_start_(reinterpret_cast<hicn_header_t *>(packet_->writableData())), header_head_(packet_.get()), payload_head_(nullptr), - format_(getFormatFromBuffer(packet_start_)) {} + format_(getFormatFromBuffer(packet_->writableData())) {} Packet::Packet(const uint8_t *buffer, std::size_t size) : Packet(MemBufPtr(utils::MemBuf::copyBuffer(buffer, size).release())) {} @@ -124,25 +124,24 @@ std::size_t Packet::getPayloadSizeFromBuffer(Format format, void Packet::replace(MemBufPtr &&buffer) { packet_ = std::move(buffer); - packet_start_ = packet_->writableData(); + packet_start_ = reinterpret_cast<hicn_header_t *>(packet_->writableData()); header_head_ = packet_.get(); payload_head_ = nullptr; - format_ = getFormatFromBuffer(packet_start_); + format_ = getFormatFromBuffer(reinterpret_cast<uint8_t *>(packet_start_)); } std::size_t Packet::payloadSize() const { - return getPayloadSizeFromBuffer(format_, packet_start_); + return getPayloadSizeFromBuffer(format_, + reinterpret_cast<uint8_t *>(packet_start_)); } std::size_t Packet::headerSize() const { - return getHeaderSizeFromBuffer(format_, packet_start_); + return getHeaderSizeFromBuffer(format_, + reinterpret_cast<uint8_t *>(packet_start_)); } -const uint8_t *Packet::start() const { return packet_start_; } - void Packet::setLifetime(uint32_t lifetime) { - if (hicn_interest_set_lifetime((hicn_header_t *)packet_start_, lifetime) < - 0) { + if (hicn_interest_set_lifetime(packet_start_, lifetime) < 0) { throw errors::MalformedPacketException(); } } @@ -150,7 +149,7 @@ void Packet::setLifetime(uint32_t lifetime) { uint32_t Packet::getLifetime() const { uint32_t lifetime = 0; - if (hicn_packet_get_lifetime((hicn_header_t *)packet_start_, &lifetime) < 0) { + if (hicn_packet_get_lifetime(packet_start_, &lifetime) < 0) { throw errors::MalformedPacketException(); } @@ -190,20 +189,16 @@ Packet &Packet::appendHeader(const uint8_t *buffer, std::size_t length) { return appendHeader(utils::MemBuf::copyBuffer(buffer, length)); } -utils::Array<uint8_t> Packet::getPayload() const { +std::unique_ptr<utils::MemBuf> Packet::getPayload() const { const_cast<Packet *>(this)->separateHeaderPayload(); - if (TRANSPORT_EXPECT_FALSE(payload_head_ == nullptr)) { - return utils::Array<uint8_t>(); - } - // Hopefully the payload is contiguous - if (TRANSPORT_EXPECT_FALSE(payload_head_->next() != header_head_)) { + if (TRANSPORT_EXPECT_FALSE(payload_head_ && + payload_head_->next() != header_head_)) { payload_head_->gather(payloadSize()); } - return utils::Array<uint8_t>(payload_head_->writableData(), - payload_head_->length()); + return payload_head_->cloneOne(); } Packet &Packet::updateLength(std::size_t length) { @@ -214,8 +209,8 @@ Packet &Packet::updateLength(std::size_t length) { total_length += current->length(); } - if (hicn_packet_set_payload_length(format_, (hicn_header_t *)packet_start_, - total_length) < 0) { + if (hicn_packet_set_payload_length(format_, packet_start_, total_length) < + 0) { throw errors::RuntimeException("Error setting the packet payload."); } @@ -225,7 +220,7 @@ Packet &Packet::updateLength(std::size_t length) { PayloadType Packet::getPayloadType() const { hicn_payload_type_t ret = HPT_UNSPEC; - if (hicn_packet_get_payload_type((hicn_header_t *)packet_start_, &ret) < 0) { + if (hicn_packet_get_payload_type(packet_start_, &ret) < 0) { throw errors::RuntimeException("Impossible to retrieve payload type."); } @@ -233,7 +228,7 @@ PayloadType Packet::getPayloadType() const { } Packet &Packet::setPayloadType(PayloadType payload_type) { - if (hicn_packet_set_payload_type((hicn_header_t *)packet_start_, + if (hicn_packet_set_payload_type(packet_start_, hicn_payload_type_t(payload_type)) < 0) { throw errors::RuntimeException("Error setting payload type of the packet."); } @@ -243,7 +238,7 @@ Packet &Packet::setPayloadType(PayloadType payload_type) { Packet::Format Packet::getFormat() const { if (format_ == HF_UNSPEC) { - if (hicn_packet_get_format((hicn_header_t *)packet_start_, &format_) < 0) { + if (hicn_packet_get_format(packet_start_, &format_) < 0) { throw errors::MalformedPacketException(); } } @@ -268,8 +263,7 @@ void Packet::dump() const { } void Packet::setSignatureSize(std::size_t size_bytes) { - int ret = hicn_packet_set_signature_size( - format_, (hicn_header_t *)packet_start_, size_bytes); + int ret = hicn_packet_set_signature_size(format_, packet_start_, size_bytes); if (ret < 0) { throw errors::RuntimeException("Packet without Authentication Header."); @@ -281,8 +275,7 @@ void Packet::setSignatureSize(std::size_t size_bytes) { uint8_t *Packet::getSignature() const { uint8_t *signature; - int ret = hicn_packet_get_signature(format_, (hicn_header_t *)packet_start_, - &signature); + int ret = hicn_packet_get_signature(format_, packet_start_, &signature); if (ret < 0) { throw errors::RuntimeException("Packet without Authentication Header."); @@ -293,8 +286,7 @@ uint8_t *Packet::getSignature() const { std::size_t Packet::getSignatureSize() const { size_t size_bytes; - int ret = hicn_packet_get_signature_size( - format_, (hicn_header_t *)packet_start_, &size_bytes); + int ret = hicn_packet_get_signature_size(format_, packet_start_, &size_bytes); if (ret < 0) { throw errors::RuntimeException("Packet without Authentication Header."); @@ -304,8 +296,8 @@ std::size_t Packet::getSignatureSize() const { } void Packet::setSignatureTimestamp(const uint64_t ×tamp) { - int ret = hicn_packet_set_signature_timestamp( - format_, (hicn_header_t *)packet_start_, timestamp); + int ret = + hicn_packet_set_signature_timestamp(format_, packet_start_, timestamp); if (ret < 0) { throw errors::RuntimeException("Error setting the signature timestamp."); @@ -314,8 +306,8 @@ void Packet::setSignatureTimestamp(const uint64_t ×tamp) { uint64_t Packet::getSignatureTimestamp() const { uint64_t return_value; - int ret = hicn_packet_get_signature_timestamp( - format_, (hicn_header_t *)packet_start_, &return_value); + int ret = hicn_packet_get_signature_timestamp(format_, packet_start_, + &return_value); if (ret < 0) { throw errors::RuntimeException("Error getting the signature timestamp."); @@ -326,8 +318,8 @@ uint64_t Packet::getSignatureTimestamp() const { void Packet::setValidationAlgorithm( const utils::CryptoSuite &validation_algorithm) { - int ret = hicn_packet_set_validation_algorithm( - format_, (hicn_header_t *)packet_start_, uint8_t(validation_algorithm)); + int ret = hicn_packet_set_validation_algorithm(format_, packet_start_, + uint8_t(validation_algorithm)); if (ret < 0) { throw errors::RuntimeException("Error setting the validation algorithm."); @@ -336,8 +328,8 @@ void Packet::setValidationAlgorithm( utils::CryptoSuite Packet::getValidationAlgorithm() const { uint8_t return_value; - int ret = hicn_packet_get_validation_algorithm( - format_, (hicn_header_t *)packet_start_, &return_value); + int ret = hicn_packet_get_validation_algorithm(format_, packet_start_, + &return_value); if (ret < 0) { throw errors::RuntimeException("Error getting the validation algorithm."); @@ -347,8 +339,7 @@ utils::CryptoSuite Packet::getValidationAlgorithm() const { } void Packet::setKeyId(const utils::KeyId &key_id) { - int ret = hicn_packet_set_key_id(format_, (hicn_header_t *)packet_start_, - key_id.first); + int ret = hicn_packet_set_key_id(format_, packet_start_, key_id.first); if (ret < 0) { throw errors::RuntimeException("Error setting the key id."); @@ -357,8 +348,8 @@ void Packet::setKeyId(const utils::KeyId &key_id) { utils::KeyId Packet::getKeyId() const { utils::KeyId return_value; - int ret = hicn_packet_get_key_id(format_, (hicn_header_t *)packet_start_, - &return_value.first, &return_value.second); + int ret = hicn_packet_get_key_id(format_, packet_start_, &return_value.first, + &return_value.second); if (ret < 0) { throw errors::RuntimeException("Error getting the validation algorithm."); @@ -374,8 +365,7 @@ utils::CryptoHash Packet::computeDigest(HashAlgorithm algorithm) const { // Copy IP+TCP/ICMP header before zeroing them hicn_header_t header_copy; - hicn_packet_copy_header(format_, (hicn_header_t *)packet_start_, &header_copy, - false); + hicn_packet_copy_header(format_, packet_start_, &header_copy, false); const_cast<Packet *>(this)->resetForHash(); @@ -385,8 +375,7 @@ utils::CryptoHash Packet::computeDigest(HashAlgorithm algorithm) const { current = current->next(); } while (current != header_head_); - hicn_packet_copy_header(format_, &header_copy, (hicn_header_t *)packet_start_, - false); + hicn_packet_copy_header(format_, &header_copy, packet_start_, false); return hasher.finalize(); } @@ -401,15 +390,14 @@ void Packet::setChecksum() { } partial_csum = csum(current->data(), current->length(), partial_csum); } - if (hicn_packet_compute_header_checksum( - format_, (hicn_header_t *)packet_start_, partial_csum) < 0) { + if (hicn_packet_compute_header_checksum(format_, packet_start_, + partial_csum) < 0) { throw errors::MalformedPacketException(); } } bool Packet::checkIntegrity() const { - if (hicn_packet_check_integrity(format_, (hicn_header_t *)packet_start_) < - 0) { + if (hicn_packet_check_integrity(format_, packet_start_) < 0) { return false; } @@ -417,7 +405,7 @@ bool Packet::checkIntegrity() const { } Packet &Packet::setSyn() { - if (hicn_packet_set_syn((hicn_header_t *)packet_start_) < 0) { + if (hicn_packet_set_syn(packet_start_) < 0) { throw errors::RuntimeException("Error setting syn bit in the packet."); } @@ -425,7 +413,7 @@ Packet &Packet::setSyn() { } Packet &Packet::resetSyn() { - if (hicn_packet_reset_syn((hicn_header_t *)packet_start_) < 0) { + if (hicn_packet_reset_syn(packet_start_) < 0) { throw errors::RuntimeException("Error resetting syn bit in the packet."); } @@ -434,7 +422,7 @@ Packet &Packet::resetSyn() { bool Packet::testSyn() const { bool res = false; - if (hicn_packet_test_syn((hicn_header_t *)packet_start_, &res) < 0) { + if (hicn_packet_test_syn(packet_start_, &res) < 0) { throw errors::RuntimeException("Error testing syn bit in the packet."); } @@ -442,7 +430,7 @@ bool Packet::testSyn() const { } Packet &Packet::setAck() { - if (hicn_packet_set_ack((hicn_header_t *)packet_start_) < 0) { + if (hicn_packet_set_ack(packet_start_) < 0) { throw errors::RuntimeException("Error setting ack bit in the packet."); } @@ -450,7 +438,7 @@ Packet &Packet::setAck() { } Packet &Packet::resetAck() { - if (hicn_packet_reset_ack((hicn_header_t *)packet_start_) < 0) { + if (hicn_packet_reset_ack(packet_start_) < 0) { throw errors::RuntimeException("Error resetting ack bit in the packet."); } @@ -459,7 +447,7 @@ Packet &Packet::resetAck() { bool Packet::testAck() const { bool res = false; - if (hicn_packet_test_ack((hicn_header_t *)packet_start_, &res) < 0) { + if (hicn_packet_test_ack(packet_start_, &res) < 0) { throw errors::RuntimeException("Error testing ack bit in the packet."); } @@ -467,7 +455,7 @@ bool Packet::testAck() const { } Packet &Packet::setRst() { - if (hicn_packet_set_rst((hicn_header_t *)packet_start_) < 0) { + if (hicn_packet_set_rst(packet_start_) < 0) { throw errors::RuntimeException("Error setting rst bit in the packet."); } @@ -475,7 +463,7 @@ Packet &Packet::setRst() { } Packet &Packet::resetRst() { - if (hicn_packet_reset_rst((hicn_header_t *)packet_start_) < 0) { + if (hicn_packet_reset_rst(packet_start_) < 0) { throw errors::RuntimeException("Error resetting rst bit in the packet."); } @@ -484,7 +472,7 @@ Packet &Packet::resetRst() { bool Packet::testRst() const { bool res = false; - if (hicn_packet_test_rst((hicn_header_t *)packet_start_, &res) < 0) { + if (hicn_packet_test_rst(packet_start_, &res) < 0) { throw errors::RuntimeException("Error testing rst bit in the packet."); } @@ -492,7 +480,7 @@ bool Packet::testRst() const { } Packet &Packet::setFin() { - if (hicn_packet_set_fin((hicn_header_t *)packet_start_) < 0) { + if (hicn_packet_set_fin(packet_start_) < 0) { throw errors::RuntimeException("Error setting fin bit in the packet."); } @@ -500,7 +488,7 @@ Packet &Packet::setFin() { } Packet &Packet::resetFin() { - if (hicn_packet_reset_fin((hicn_header_t *)packet_start_) < 0) { + if (hicn_packet_reset_fin(packet_start_) < 0) { throw errors::RuntimeException("Error resetting fin bit in the packet."); } @@ -509,7 +497,7 @@ Packet &Packet::resetFin() { bool Packet::testFin() const { bool res = false; - if (hicn_packet_test_fin((hicn_header_t *)packet_start_, &res) < 0) { + if (hicn_packet_test_fin(packet_start_, &res) < 0) { throw errors::RuntimeException("Error testing fin bit in the packet."); } @@ -543,7 +531,7 @@ std::string Packet::printFlags() const { } Packet &Packet::setSrcPort(uint16_t srcPort) { - if (hicn_packet_set_src_port((hicn_header_t *)packet_start_, srcPort) < 0) { + if (hicn_packet_set_src_port(packet_start_, srcPort) < 0) { throw errors::RuntimeException("Error setting source port in the packet."); } @@ -551,7 +539,7 @@ Packet &Packet::setSrcPort(uint16_t srcPort) { } Packet &Packet::setDstPort(uint16_t dstPort) { - if (hicn_packet_set_dst_port((hicn_header_t *)packet_start_, dstPort) < 0) { + if (hicn_packet_set_dst_port(packet_start_, dstPort) < 0) { throw errors::RuntimeException( "Error setting destination port in the packet."); } @@ -562,7 +550,7 @@ Packet &Packet::setDstPort(uint16_t dstPort) { uint16_t Packet::getSrcPort() const { uint16_t port = 0; - if (hicn_packet_get_src_port((hicn_header_t *)packet_start_, &port) < 0) { + if (hicn_packet_get_src_port(packet_start_, &port) < 0) { throw errors::RuntimeException("Error reading source port in the packet."); } @@ -572,7 +560,7 @@ uint16_t Packet::getSrcPort() const { uint16_t Packet::getDstPort() const { uint16_t port = 0; - if (hicn_packet_get_dst_port((hicn_header_t *)packet_start_, &port) < 0) { + if (hicn_packet_get_dst_port(packet_start_, &port) < 0) { throw errors::RuntimeException( "Error reading destination port in the packet."); } @@ -581,7 +569,7 @@ uint16_t Packet::getDstPort() const { } Packet &Packet::setTTL(uint8_t hops) { - if (hicn_packet_set_hoplimit((hicn_header_t *)packet_start_, hops) < 0) { + if (hicn_packet_set_hoplimit(packet_start_, hops) < 0) { throw errors::RuntimeException("Error setting TTL."); } @@ -590,7 +578,7 @@ Packet &Packet::setTTL(uint8_t hops) { uint8_t Packet::getTTL() const { uint8_t hops = 0; - if (hicn_packet_get_hoplimit((hicn_header_t *)packet_start_, &hops) < 0) { + if (hicn_packet_get_hoplimit(packet_start_, &hops) < 0) { throw errors::RuntimeException("Error reading TTL."); } @@ -603,27 +591,21 @@ void Packet::separateHeaderPayload() { } int signature_size = 0; - if (_is_ah(format_)) { signature_size = (uint32_t)getSignatureSize(); } auto header_size = getHeaderSizeFromFormat(format_, signature_size); auto payload_length = packet_->length() - header_size; - if (!payload_length) { - return; - } packet_->trimEnd(packet_->length()); - if (payload_length) { - auto payload = packet_->cloneOne(); - payload_head_ = payload.get(); - payload_head_->advance(header_size); - payload_head_->append(payload_length); - packet_->prependChain(std::move(payload)); - packet_->append(header_size); - } + auto payload = packet_->cloneOne(); + payload_head_ = payload.get(); + payload_head_->advance(header_size); + payload_head_->append(payload_length); + packet_->prependChain(std::move(payload)); + packet_->append(header_size); } } // end namespace core diff --git a/libtransport/src/hicn/transport/core/packet.h b/libtransport/src/hicn/transport/core/packet.h index 78dbeae07..16191dd7f 100644 --- a/libtransport/src/hicn/transport/core/packet.h +++ b/libtransport/src/hicn/transport/core/packet.h @@ -101,8 +101,6 @@ class Packet : public std::enable_shared_from_this<Packet> { const std::shared_ptr<utils::MemBuf> data(); - const uint8_t *start() const; - virtual const Name &getName() const = 0; virtual Name &getWritableName() = 0; @@ -123,7 +121,7 @@ class Packet : public std::enable_shared_from_this<Packet> { Packet &appendHeader(const uint8_t *buffer, std::size_t length); - utils::Array<uint8_t> getPayload() const; + std::unique_ptr<utils::MemBuf> getPayload() const; Packet &updateLength(std::size_t length = 0); @@ -190,7 +188,7 @@ class Packet : public std::enable_shared_from_this<Packet> { protected: Name name_; MemBufPtr packet_; - uint8_t *packet_start_; + hicn_header_t *packet_start_; utils::MemBuf *header_head_; utils::MemBuf *payload_head_; mutable Format format_; diff --git a/libtransport/src/hicn/transport/http/server_acceptor.cc b/libtransport/src/hicn/transport/http/server_acceptor.cc index 615fa80d8..486b04c57 100644 --- a/libtransport/src/hicn/transport/http/server_acceptor.cc +++ b/libtransport/src/hicn/transport/http/server_acceptor.cc @@ -85,9 +85,9 @@ void HTTPServerAcceptor::listen(bool async) { void HTTPServerAcceptor::processIncomingInterest(ProducerSocket &p, Interest &interest) { // Temporary solution. With - utils::Array<uint8_t> payload = interest.getPayload(); + auto payload = interest.getPayload(); - int request_id = utils::hash::fnv32_buf(payload.data(), payload.length()); + int request_id = utils::hash::fnv32_buf(payload->data(), payload->length()); if (publishers_.find(request_id) != publishers_.end()) { if (publishers_[request_id]) { @@ -98,8 +98,8 @@ void HTTPServerAcceptor::processIncomingInterest(ProducerSocket &p, publishers_[request_id] = std::make_shared<HTTPServerPublisher>(interest.getName()); - callback_(publishers_[request_id], (uint8_t *)payload.data(), - payload.length(), request_id); + callback_(publishers_[request_id], (uint8_t *)payload->data(), + payload->length(), request_id); } std::map<int, std::shared_ptr<HTTPServerPublisher>> diff --git a/libtransport/src/hicn/transport/interfaces/full_duplex_socket.cc b/libtransport/src/hicn/transport/interfaces/full_duplex_socket.cc index 3bb51e72e..2e180cf34 100644 --- a/libtransport/src/hicn/transport/interfaces/full_duplex_socket.cc +++ b/libtransport/src/hicn/transport/interfaces/full_duplex_socket.cc @@ -261,7 +261,7 @@ void AsyncFullDuplexSocket::signalProductionToSubscribers( // Todo consider using preallocated pool of membufs auto _payload = utils::MemBuf::create(sizeof(ActionMessage)); _payload->append(sizeof(ActionMessage)); - auto payload = const_cast<uint8_t *>(interest->getPayload().data()); + auto payload = interest->getPayload()->writableData(); ActionMessage *produce_notification = reinterpret_cast<ActionMessage *>(payload); @@ -294,7 +294,7 @@ AsyncFullDuplexSocket::decodeSynchronizationMessage( const core::Interest &interest) { auto mesg = interest.getPayload(); const MessageHeader *header = - reinterpret_cast<const MessageHeader *>(mesg.data()); + reinterpret_cast<const MessageHeader *>(mesg->data()); switch (header->msg_type) { case MessageType::ACTION: { @@ -350,14 +350,12 @@ AsyncFullDuplexSocket::decodeSynchronizationMessage( // We saved one round trip :) auto buffer = ContentBuffer(); - const uint8_t *data = mesg.data() + sizeof(PayloadMessage); - buffer->assign(data, data + mesg.length() - sizeof(PayloadMessage)); + const uint8_t *data = mesg->data() + sizeof(PayloadMessage); + buffer->assign(data, data + mesg->length() - sizeof(PayloadMessage)); read_callback_->readBufferAvailable(std::move(buffer)); return createAck(); } - default: { - return std::shared_ptr<core::ContentObject>(nullptr); - } + default: { return std::shared_ptr<core::ContentObject>(nullptr); } } return std::shared_ptr<core::ContentObject>(nullptr); @@ -366,7 +364,7 @@ AsyncFullDuplexSocket::decodeSynchronizationMessage( void AsyncFullDuplexSocket::onControlInterest(ProducerSocket &s, const core::Interest &i) { auto payload = i.getPayload(); - if (payload.length()) { + if (payload->length()) { // Try to decode payload and see if starting an async pull operation auto response = decodeSynchronizationMessage(i); if (response) { @@ -412,14 +410,14 @@ void AsyncFullDuplexSocket::OnConnectCallback::onContentObject( // The ack message should contain the name to be used for notifying // the production of the content to the other part - if (content_object->getPayload().length() == 0) { + if (content_object->getPayload()->length() == 0) { TRANSPORT_LOGW("Connection response message empty...."); return; } SubscriptionResponseMessage *response = reinterpret_cast<SubscriptionResponseMessage *>( - content_object->getPayload().writableData()); + content_object->getPayload()->writableData()); if (response->response.header.msg_type == MessageType::RESPONSE) { if (response->response.return_code == ReturnCode::OK) { @@ -457,7 +455,7 @@ std::shared_ptr<core::ContentObject> AsyncFullDuplexSocket::createAck() { auto response = std::make_shared<core::ContentObject>(name); auto _payload = utils::MemBuf::create(sizeof(ActionMessage)); _payload->append(sizeof(ResponseMessage)); - auto payload = response->getPayload().data(); + auto payload = response->getPayload()->data(); ResponseMessage *response_message = (ResponseMessage *)payload; response_message->header.msg_type = MessageType::RESPONSE; response_message->header.reserved[0] = 0; diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc index 87cf27b75..00cc82543 100644 --- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc +++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc @@ -163,7 +163,7 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) { void RTCProducerSocket::sendNack(const Interest &interest) { nack_->setName(interest.getName()); - uint32_t *payload_ptr = (uint32_t *)nack_->getPayload().data(); + uint32_t *payload_ptr = (uint32_t *)nack_->getPayload()->data(); *payload_ptr = currentSeg_; *(++payload_ptr) = bytesProductionRate_; diff --git a/libtransport/src/hicn/transport/protocols/indexing_manager.h b/libtransport/src/hicn/transport/protocols/indexing_manager.h index c692695a3..b6b8bb4a6 100644 --- a/libtransport/src/hicn/transport/protocols/indexing_manager.h +++ b/libtransport/src/hicn/transport/protocols/indexing_manager.h @@ -40,6 +40,8 @@ class IndexManager { */ virtual uint32_t getNextSuffix() = 0; + virtual void setFirstSuffix(uint32_t suffix) = 0; + /** * Retrive the next segment to be reassembled. */ @@ -72,52 +74,6 @@ class IndexVerificationManager : public IndexManager { virtual bool onContentObject(const core::ContentObject &content_object) = 0; }; -class ZeroIndexManager : public IndexVerificationManager { - public: - ZeroIndexManager() : reset_(true) {} - - TRANSPORT_ALWAYS_INLINE virtual void reset() override { reset_ = true; } - - /** - * Retrieve from the manifest the next suffix to retrieve. - */ - TRANSPORT_ALWAYS_INLINE virtual uint32_t getNextSuffix() override { - uint32_t ret = reset_ ? 0 : IndexManager::invalid_index; - reset_ = false; - return ret; - } - - /** - * Retrive the next segment to be reassembled. - */ - TRANSPORT_ALWAYS_INLINE virtual uint32_t getNextReassemblySegment() override { - return IndexManager::invalid_index; - } - - TRANSPORT_ALWAYS_INLINE virtual bool isFinalSuffixDiscovered() override { - return false; - } - - TRANSPORT_ALWAYS_INLINE virtual uint32_t getFinalSuffix() override { - return IndexManager::invalid_index; - } - - TRANSPORT_ALWAYS_INLINE bool onManifest( - core::ContentObject::Ptr &&content_object) override { - throw errors::UnexpectedManifestException(); - } - - TRANSPORT_ALWAYS_INLINE bool onContentObject( - const core::ContentObject &content_object) override { - throw errors::RuntimeException( - "Called onContentObject on a ZeroIndexManager, which is not able to " - "process packets."); - } - - private: - bool reset_; -}; - class IncrementalIndexManager : public IndexVerificationManager { public: IncrementalIndexManager(interface::ConsumerSocket *icn_socket) @@ -135,7 +91,7 @@ class IncrementalIndexManager : public IndexVerificationManager { TRANSPORT_ALWAYS_INLINE virtual void reset() override { final_suffix_ = std::numeric_limits<uint32_t>::max(); - next_download_suffix_ = 0; + next_download_suffix_ = first_suffix_; next_reassembly_suffix_ = 0; } @@ -147,6 +103,11 @@ class IncrementalIndexManager : public IndexVerificationManager { : IndexManager::invalid_index; } + TRANSPORT_ALWAYS_INLINE virtual void setFirstSuffix( + uint32_t suffix) override { + first_suffix_ = suffix; + } + /** * Retrive the next segment to be reassembled. */ @@ -183,6 +144,7 @@ class IncrementalIndexManager : public IndexVerificationManager { protected: interface::ConsumerSocket *socket_; uint32_t final_suffix_; + uint32_t first_suffix_; uint32_t next_download_suffix_; uint32_t next_reassembly_suffix_; std::unique_ptr<VerificationManager> verification_manager_; diff --git a/libtransport/src/hicn/transport/protocols/raaqm.cc b/libtransport/src/hicn/transport/protocols/raaqm.cc index cd3a6cb85..be76f7c23 100644 --- a/libtransport/src/hicn/transport/protocols/raaqm.cc +++ b/libtransport/src/hicn/transport/protocols/raaqm.cc @@ -98,6 +98,11 @@ int RaaqmTransportProtocol::start() { void RaaqmTransportProtocol::resume() { return TransportProtocol::resume(); } void RaaqmTransportProtocol::reset() { + // Set first segment to retrieve + core::Name *name; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name); + index_manager_->setFirstSuffix(name->getSuffix()); + // Reset reassembly component BaseReassembly::reset(); @@ -333,17 +338,9 @@ void RaaqmTransportProtocol::onContentObject( index_manager_->onManifest(std::move(content_object)); } else if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) { - if (TRANSPORT_EXPECT_FALSE(incremental_suffix == 0)) { - index_manager_ = incremental_index_manager_.get(); - } - onContentSegment(std::move(interest), std::move(content_object)); } - if (TRANSPORT_EXPECT_FALSE(incremental_suffix == 0)) { - BaseReassembly::index_ = index_manager_->getNextReassemblySegment(); - } - scheduleNextInterests(); } diff --git a/libtransport/src/hicn/transport/protocols/reassembly.cc b/libtransport/src/hicn/transport/protocols/reassembly.cc index 53180935c..899f701c7 100644 --- a/libtransport/src/hicn/transport/protocols/reassembly.cc +++ b/libtransport/src/hicn/transport/protocols/reassembly.cc @@ -25,12 +25,11 @@ namespace protocol { BaseReassembly::BaseReassembly(interface::ConsumerSocket *icn_socket, ContentReassembledCallback *content_callback) : reassembly_consumer_socket_(icn_socket), - zero_index_manager_(std::make_unique<ZeroIndexManager>()), incremental_index_manager_( std::make_unique<IncrementalIndexManager>(icn_socket)), manifest_index_manager_( std::make_unique<ManifestIndexManager>(icn_socket)), - index_manager_(zero_index_manager_.get()), + index_manager_(incremental_index_manager_.get()), index_(0) { setContentCallback(content_callback); } @@ -54,14 +53,14 @@ void BaseReassembly::reassemble(ContentObject::Ptr &&content_object) { } void BaseReassembly::copyContent(const ContentObject &content_object) { - utils::Array<> a = content_object.getPayload(); + auto a = content_object.getPayload(); std::shared_ptr<std::vector<uint8_t>> content_buffer; reassembly_consumer_socket_->getSocketOption( interface::GeneralTransportOptions::APPLICATION_BUFFER, content_buffer); - content_buffer->insert(content_buffer->end(), (uint8_t *)a.data(), - (uint8_t *)a.data() + a.length()); + content_buffer->insert(content_buffer->end(), (uint8_t *)a->data(), + (uint8_t *)a->data() + a->length()); bool download_completed = index_manager_->getFinalSuffix() == content_object.getName().getSuffix(); @@ -74,6 +73,7 @@ void BaseReassembly::copyContent(const ContentObject &content_object) { void BaseReassembly::reset() { manifest_index_manager_->reset(); incremental_index_manager_->reset(); + index_ = index_manager_->getNextReassemblySegment(); received_packets_.clear(); } diff --git a/libtransport/src/hicn/transport/protocols/reassembly.h b/libtransport/src/hicn/transport/protocols/reassembly.h index ef3e99fc5..9efddb773 100644 --- a/libtransport/src/hicn/transport/protocols/reassembly.h +++ b/libtransport/src/hicn/transport/protocols/reassembly.h @@ -57,7 +57,6 @@ class BaseReassembly : public Reassembly { protected: // The consumer socket interface::ConsumerSocket *reassembly_consumer_socket_; - std::unique_ptr<ZeroIndexManager> zero_index_manager_; std::unique_ptr<IncrementalIndexManager> incremental_index_manager_; std::unique_ptr<ManifestIndexManager> manifest_index_manager_; IndexVerificationManager *index_manager_; diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc index c2323345f..48ba90f9e 100644 --- a/libtransport/src/hicn/transport/protocols/rtc.cc +++ b/libtransport/src/hicn/transport/protocols/rtc.cc @@ -186,8 +186,8 @@ void RTCTransportProtocol::updateDelayStats( pathTable_[pathLabel]->insertRttSample(RTT); // we collect OWD only for datapackets - if (content_object.getPayload().length() != HICN_NACK_HEADER_SIZE) { - uint64_t *senderTimeStamp = (uint64_t *)content_object.getPayload().data(); + if (content_object.getPayload()->length() != HICN_NACK_HEADER_SIZE) { + uint64_t *senderTimeStamp = (uint64_t *)content_object.getPayload()->data(); int64_t OWD = std::chrono::duration_cast<std::chrono::milliseconds>( std::chrono::system_clock::now().time_since_epoch()) @@ -474,7 +474,7 @@ void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) { } void RTCTransportProtocol::onNack(const ContentObject &content_object) { - uint32_t *payload = (uint32_t *)content_object.getPayload().data(); + uint32_t *payload = (uint32_t *)content_object.getPayload()->data(); uint32_t productionSeg = *payload; uint32_t productionRate = *(++payload); uint32_t nackSegment = content_object.getName().getSuffix(); @@ -531,7 +531,7 @@ void RTCTransportProtocol::onNack(const ContentObject &content_object) { void RTCTransportProtocol::onContentObject( Interest::Ptr &&interest, ContentObject::Ptr &&content_object) { - uint32_t payload_size = (uint32_t)content_object->getPayload().length(); + uint32_t payload_size = (uint32_t)content_object->getPayload()->length(); uint32_t segmentNumber = content_object->getName().getSuffix(); uint32_t pkt = segmentNumber & modMask_; @@ -549,7 +549,7 @@ void RTCTransportProtocol::onContentObject( avgPacketSize_ = (HICN_ESTIMATED_PACKET_SIZE * avgPacketSize_) + ((1 - HICN_ESTIMATED_PACKET_SIZE) * - content_object->getPayload().length()); + content_object->getPayload()->length()); if (inflightInterests_[pkt].retransmissions == 0) { inflightInterestsCount_--; @@ -570,10 +570,10 @@ void RTCTransportProtocol::onContentObject( void RTCTransportProtocol::returnContentToApplication( const ContentObject &content_object) { // return content to the user - Array a = content_object.getPayload(); + auto a = content_object.getPayload(); - uint8_t *start = ((uint8_t *)a.data()) + HICN_TIMESTAMP_SIZE; - unsigned size = (unsigned)(a.length() - HICN_TIMESTAMP_SIZE); + uint8_t *start = ((uint8_t *)a->data()) + HICN_TIMESTAMP_SIZE; + unsigned size = (unsigned)(a->length() - HICN_TIMESTAMP_SIZE); // set offset between hICN and RTP packets uint16_t rtp_seq = ntohs(*(((uint16_t *)start) + 1)); diff --git a/utils/src/hiperf.cc b/utils/src/hiperf.cc index e10907ccc..6f605aab9 100644 --- a/utils/src/hiperf.cc +++ b/utils/src/hiperf.cc @@ -533,7 +533,7 @@ class HIperfServer { if (!ec) { auto payload = content_objects_[content_objects_index_++ & mask_]->getPayload(); - producer_socket_->produce(payload.data(), payload.length()); + producer_socket_->produce(payload->data(), payload->length()); rtc_timer_.expires_from_now( configuration_.production_rate_.getMicrosecondsForPacket( configuration_.payload_size_)); @@ -625,7 +625,8 @@ void usage() { "the suffix to 0." << std::endl; std::cerr << "-B\t<bitrate>\t\t\t" - << "Bitrate for RTC producer, to be used with the -R option." << std::endl; + << "Bitrate for RTC producer, to be used with the -R option." + << std::endl; std::cerr << std::endl; std::cerr << "Client specific:" << std::endl; std::cerr << "-b\t<beta_parameter>\t\t" |