summaryrefslogtreecommitdiffstats
path: root/libtransport
diff options
context:
space:
mode:
authorMauro Sardara <msardara@cisco.com>2019-03-19 16:21:55 +0100
committerMauro Sardara <msardara@cisco.com>2019-03-19 16:23:22 +0100
commit393a0a4018936134fa1b343523bbafaab606973d (patch)
treeeb587157a3ff5235ec02611287c02c5e08487eba /libtransport
parent7266728a7857b038679fe35321ace30386f0c461 (diff)
[HICN-125 HICN-126]
- Add MemBuf as return type of getPayload of transport::core::Packet - Fix incremental index manager Change-Id: Ib557d56b1bf42e3974364c611b825b21f1e3d3f1 Signed-off-by: Mauro Sardara <msardara@cisco.com>
Diffstat (limited to 'libtransport')
-rw-r--r--libtransport/src/hicn/transport/core/manifest_format_fixed.cc5
-rw-r--r--libtransport/src/hicn/transport/core/packet.cc146
-rw-r--r--libtransport/src/hicn/transport/core/packet.h6
-rw-r--r--libtransport/src/hicn/transport/http/server_acceptor.cc8
-rw-r--r--libtransport/src/hicn/transport/interfaces/full_duplex_socket.cc20
-rw-r--r--libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc2
-rw-r--r--libtransport/src/hicn/transport/protocols/indexing_manager.h56
-rw-r--r--libtransport/src/hicn/transport/protocols/raaqm.cc13
-rw-r--r--libtransport/src/hicn/transport/protocols/reassembly.cc10
-rw-r--r--libtransport/src/hicn/transport/protocols/reassembly.h1
-rw-r--r--libtransport/src/hicn/transport/protocols/rtc.cc16
11 files changed, 109 insertions, 174 deletions
diff --git a/libtransport/src/hicn/transport/core/manifest_format_fixed.cc b/libtransport/src/hicn/transport/core/manifest_format_fixed.cc
index 73b33268c..b97aa7079 100644
--- a/libtransport/src/hicn/transport/core/manifest_format_fixed.cc
+++ b/libtransport/src/hicn/transport/core/manifest_format_fixed.cc
@@ -143,10 +143,9 @@ std::size_t FixedManifestEncoder::getManifestHeaderSizeImpl() {
FixedManifestDecoder::FixedManifestDecoder(Packet &packet)
: packet_(packet),
manifest_header_(reinterpret_cast<ManifestHeader *>(
- const_cast<uint8_t *>(packet_.getPayload().data()))),
+ packet_.getPayload()->writableData())),
manifest_entries_(reinterpret_cast<ManifestEntry *>(
- const_cast<uint8_t *>(packet_.getPayload().data()) +
- sizeof(ManifestHeader))) {}
+ packet_.getPayload()->writableData() + sizeof(ManifestHeader))) {}
FixedManifestDecoder::~FixedManifestDecoder() {}
diff --git a/libtransport/src/hicn/transport/core/packet.cc b/libtransport/src/hicn/transport/core/packet.cc
index d925375ce..ea9666ff7 100644
--- a/libtransport/src/hicn/transport/core/packet.cc
+++ b/libtransport/src/hicn/transport/core/packet.cc
@@ -34,11 +34,11 @@ const core::Name Packet::base_name("0::0|0");
Packet::Packet(Format format)
: packet_(utils::MemBuf::create(getHeaderSizeFromFormat(format, 256))
.release()),
- packet_start_(packet_->writableData()),
+ packet_start_(reinterpret_cast<hicn_header_t *>(packet_->writableData())),
header_head_(packet_.get()),
payload_head_(nullptr),
format_(format) {
- if (hicn_packet_init_header(format, (hicn_header_t *)packet_start_) < 0) {
+ if (hicn_packet_init_header(format, packet_start_) < 0) {
throw errors::RuntimeException("Unexpected error initializing the packet.");
}
@@ -47,10 +47,10 @@ Packet::Packet(Format format)
Packet::Packet(MemBufPtr &&buffer)
: packet_(std::move(buffer)),
- packet_start_(packet_->writableData()),
+ packet_start_(reinterpret_cast<hicn_header_t *>(packet_->writableData())),
header_head_(packet_.get()),
payload_head_(nullptr),
- format_(getFormatFromBuffer(packet_start_)) {}
+ format_(getFormatFromBuffer(packet_->writableData())) {}
Packet::Packet(const uint8_t *buffer, std::size_t size)
: Packet(MemBufPtr(utils::MemBuf::copyBuffer(buffer, size).release())) {}
@@ -124,25 +124,24 @@ std::size_t Packet::getPayloadSizeFromBuffer(Format format,
void Packet::replace(MemBufPtr &&buffer) {
packet_ = std::move(buffer);
- packet_start_ = packet_->writableData();
+ packet_start_ = reinterpret_cast<hicn_header_t *>(packet_->writableData());
header_head_ = packet_.get();
payload_head_ = nullptr;
- format_ = getFormatFromBuffer(packet_start_);
+ format_ = getFormatFromBuffer(reinterpret_cast<uint8_t *>(packet_start_));
}
std::size_t Packet::payloadSize() const {
- return getPayloadSizeFromBuffer(format_, packet_start_);
+ return getPayloadSizeFromBuffer(format_,
+ reinterpret_cast<uint8_t *>(packet_start_));
}
std::size_t Packet::headerSize() const {
- return getHeaderSizeFromBuffer(format_, packet_start_);
+ return getHeaderSizeFromBuffer(format_,
+ reinterpret_cast<uint8_t *>(packet_start_));
}
-const uint8_t *Packet::start() const { return packet_start_; }
-
void Packet::setLifetime(uint32_t lifetime) {
- if (hicn_interest_set_lifetime((hicn_header_t *)packet_start_, lifetime) <
- 0) {
+ if (hicn_interest_set_lifetime(packet_start_, lifetime) < 0) {
throw errors::MalformedPacketException();
}
}
@@ -150,7 +149,7 @@ void Packet::setLifetime(uint32_t lifetime) {
uint32_t Packet::getLifetime() const {
uint32_t lifetime = 0;
- if (hicn_packet_get_lifetime((hicn_header_t *)packet_start_, &lifetime) < 0) {
+ if (hicn_packet_get_lifetime(packet_start_, &lifetime) < 0) {
throw errors::MalformedPacketException();
}
@@ -190,20 +189,16 @@ Packet &Packet::appendHeader(const uint8_t *buffer, std::size_t length) {
return appendHeader(utils::MemBuf::copyBuffer(buffer, length));
}
-utils::Array<uint8_t> Packet::getPayload() const {
+std::unique_ptr<utils::MemBuf> Packet::getPayload() const {
const_cast<Packet *>(this)->separateHeaderPayload();
- if (TRANSPORT_EXPECT_FALSE(payload_head_ == nullptr)) {
- return utils::Array<uint8_t>();
- }
-
// Hopefully the payload is contiguous
- if (TRANSPORT_EXPECT_FALSE(payload_head_->next() != header_head_)) {
+ if (TRANSPORT_EXPECT_FALSE(payload_head_ &&
+ payload_head_->next() != header_head_)) {
payload_head_->gather(payloadSize());
}
- return utils::Array<uint8_t>(payload_head_->writableData(),
- payload_head_->length());
+ return payload_head_->cloneOne();
}
Packet &Packet::updateLength(std::size_t length) {
@@ -214,8 +209,8 @@ Packet &Packet::updateLength(std::size_t length) {
total_length += current->length();
}
- if (hicn_packet_set_payload_length(format_, (hicn_header_t *)packet_start_,
- total_length) < 0) {
+ if (hicn_packet_set_payload_length(format_, packet_start_, total_length) <
+ 0) {
throw errors::RuntimeException("Error setting the packet payload.");
}
@@ -225,7 +220,7 @@ Packet &Packet::updateLength(std::size_t length) {
PayloadType Packet::getPayloadType() const {
hicn_payload_type_t ret = HPT_UNSPEC;
- if (hicn_packet_get_payload_type((hicn_header_t *)packet_start_, &ret) < 0) {
+ if (hicn_packet_get_payload_type(packet_start_, &ret) < 0) {
throw errors::RuntimeException("Impossible to retrieve payload type.");
}
@@ -233,7 +228,7 @@ PayloadType Packet::getPayloadType() const {
}
Packet &Packet::setPayloadType(PayloadType payload_type) {
- if (hicn_packet_set_payload_type((hicn_header_t *)packet_start_,
+ if (hicn_packet_set_payload_type(packet_start_,
hicn_payload_type_t(payload_type)) < 0) {
throw errors::RuntimeException("Error setting payload type of the packet.");
}
@@ -243,7 +238,7 @@ Packet &Packet::setPayloadType(PayloadType payload_type) {
Packet::Format Packet::getFormat() const {
if (format_ == HF_UNSPEC) {
- if (hicn_packet_get_format((hicn_header_t *)packet_start_, &format_) < 0) {
+ if (hicn_packet_get_format(packet_start_, &format_) < 0) {
throw errors::MalformedPacketException();
}
}
@@ -268,8 +263,7 @@ void Packet::dump() const {
}
void Packet::setSignatureSize(std::size_t size_bytes) {
- int ret = hicn_packet_set_signature_size(
- format_, (hicn_header_t *)packet_start_, size_bytes);
+ int ret = hicn_packet_set_signature_size(format_, packet_start_, size_bytes);
if (ret < 0) {
throw errors::RuntimeException("Packet without Authentication Header.");
@@ -281,8 +275,7 @@ void Packet::setSignatureSize(std::size_t size_bytes) {
uint8_t *Packet::getSignature() const {
uint8_t *signature;
- int ret = hicn_packet_get_signature(format_, (hicn_header_t *)packet_start_,
- &signature);
+ int ret = hicn_packet_get_signature(format_, packet_start_, &signature);
if (ret < 0) {
throw errors::RuntimeException("Packet without Authentication Header.");
@@ -293,8 +286,7 @@ uint8_t *Packet::getSignature() const {
std::size_t Packet::getSignatureSize() const {
size_t size_bytes;
- int ret = hicn_packet_get_signature_size(
- format_, (hicn_header_t *)packet_start_, &size_bytes);
+ int ret = hicn_packet_get_signature_size(format_, packet_start_, &size_bytes);
if (ret < 0) {
throw errors::RuntimeException("Packet without Authentication Header.");
@@ -304,8 +296,8 @@ std::size_t Packet::getSignatureSize() const {
}
void Packet::setSignatureTimestamp(const uint64_t &timestamp) {
- int ret = hicn_packet_set_signature_timestamp(
- format_, (hicn_header_t *)packet_start_, timestamp);
+ int ret =
+ hicn_packet_set_signature_timestamp(format_, packet_start_, timestamp);
if (ret < 0) {
throw errors::RuntimeException("Error setting the signature timestamp.");
@@ -314,8 +306,8 @@ void Packet::setSignatureTimestamp(const uint64_t &timestamp) {
uint64_t Packet::getSignatureTimestamp() const {
uint64_t return_value;
- int ret = hicn_packet_get_signature_timestamp(
- format_, (hicn_header_t *)packet_start_, &return_value);
+ int ret = hicn_packet_get_signature_timestamp(format_, packet_start_,
+ &return_value);
if (ret < 0) {
throw errors::RuntimeException("Error getting the signature timestamp.");
@@ -326,8 +318,8 @@ uint64_t Packet::getSignatureTimestamp() const {
void Packet::setValidationAlgorithm(
const utils::CryptoSuite &validation_algorithm) {
- int ret = hicn_packet_set_validation_algorithm(
- format_, (hicn_header_t *)packet_start_, uint8_t(validation_algorithm));
+ int ret = hicn_packet_set_validation_algorithm(format_, packet_start_,
+ uint8_t(validation_algorithm));
if (ret < 0) {
throw errors::RuntimeException("Error setting the validation algorithm.");
@@ -336,8 +328,8 @@ void Packet::setValidationAlgorithm(
utils::CryptoSuite Packet::getValidationAlgorithm() const {
uint8_t return_value;
- int ret = hicn_packet_get_validation_algorithm(
- format_, (hicn_header_t *)packet_start_, &return_value);
+ int ret = hicn_packet_get_validation_algorithm(format_, packet_start_,
+ &return_value);
if (ret < 0) {
throw errors::RuntimeException("Error getting the validation algorithm.");
@@ -347,8 +339,7 @@ utils::CryptoSuite Packet::getValidationAlgorithm() const {
}
void Packet::setKeyId(const utils::KeyId &key_id) {
- int ret = hicn_packet_set_key_id(format_, (hicn_header_t *)packet_start_,
- key_id.first);
+ int ret = hicn_packet_set_key_id(format_, packet_start_, key_id.first);
if (ret < 0) {
throw errors::RuntimeException("Error setting the key id.");
@@ -357,8 +348,8 @@ void Packet::setKeyId(const utils::KeyId &key_id) {
utils::KeyId Packet::getKeyId() const {
utils::KeyId return_value;
- int ret = hicn_packet_get_key_id(format_, (hicn_header_t *)packet_start_,
- &return_value.first, &return_value.second);
+ int ret = hicn_packet_get_key_id(format_, packet_start_, &return_value.first,
+ &return_value.second);
if (ret < 0) {
throw errors::RuntimeException("Error getting the validation algorithm.");
@@ -374,8 +365,7 @@ utils::CryptoHash Packet::computeDigest(HashAlgorithm algorithm) const {
// Copy IP+TCP/ICMP header before zeroing them
hicn_header_t header_copy;
- hicn_packet_copy_header(format_, (hicn_header_t *)packet_start_, &header_copy,
- false);
+ hicn_packet_copy_header(format_, packet_start_, &header_copy, false);
const_cast<Packet *>(this)->resetForHash();
@@ -385,8 +375,7 @@ utils::CryptoHash Packet::computeDigest(HashAlgorithm algorithm) const {
current = current->next();
} while (current != header_head_);
- hicn_packet_copy_header(format_, &header_copy, (hicn_header_t *)packet_start_,
- false);
+ hicn_packet_copy_header(format_, &header_copy, packet_start_, false);
return hasher.finalize();
}
@@ -401,15 +390,14 @@ void Packet::setChecksum() {
}
partial_csum = csum(current->data(), current->length(), partial_csum);
}
- if (hicn_packet_compute_header_checksum(
- format_, (hicn_header_t *)packet_start_, partial_csum) < 0) {
+ if (hicn_packet_compute_header_checksum(format_, packet_start_,
+ partial_csum) < 0) {
throw errors::MalformedPacketException();
}
}
bool Packet::checkIntegrity() const {
- if (hicn_packet_check_integrity(format_, (hicn_header_t *)packet_start_) <
- 0) {
+ if (hicn_packet_check_integrity(format_, packet_start_) < 0) {
return false;
}
@@ -417,7 +405,7 @@ bool Packet::checkIntegrity() const {
}
Packet &Packet::setSyn() {
- if (hicn_packet_set_syn((hicn_header_t *)packet_start_) < 0) {
+ if (hicn_packet_set_syn(packet_start_) < 0) {
throw errors::RuntimeException("Error setting syn bit in the packet.");
}
@@ -425,7 +413,7 @@ Packet &Packet::setSyn() {
}
Packet &Packet::resetSyn() {
- if (hicn_packet_reset_syn((hicn_header_t *)packet_start_) < 0) {
+ if (hicn_packet_reset_syn(packet_start_) < 0) {
throw errors::RuntimeException("Error resetting syn bit in the packet.");
}
@@ -434,7 +422,7 @@ Packet &Packet::resetSyn() {
bool Packet::testSyn() const {
bool res = false;
- if (hicn_packet_test_syn((hicn_header_t *)packet_start_, &res) < 0) {
+ if (hicn_packet_test_syn(packet_start_, &res) < 0) {
throw errors::RuntimeException("Error testing syn bit in the packet.");
}
@@ -442,7 +430,7 @@ bool Packet::testSyn() const {
}
Packet &Packet::setAck() {
- if (hicn_packet_set_ack((hicn_header_t *)packet_start_) < 0) {
+ if (hicn_packet_set_ack(packet_start_) < 0) {
throw errors::RuntimeException("Error setting ack bit in the packet.");
}
@@ -450,7 +438,7 @@ Packet &Packet::setAck() {
}
Packet &Packet::resetAck() {
- if (hicn_packet_reset_ack((hicn_header_t *)packet_start_) < 0) {
+ if (hicn_packet_reset_ack(packet_start_) < 0) {
throw errors::RuntimeException("Error resetting ack bit in the packet.");
}
@@ -459,7 +447,7 @@ Packet &Packet::resetAck() {
bool Packet::testAck() const {
bool res = false;
- if (hicn_packet_test_ack((hicn_header_t *)packet_start_, &res) < 0) {
+ if (hicn_packet_test_ack(packet_start_, &res) < 0) {
throw errors::RuntimeException("Error testing ack bit in the packet.");
}
@@ -467,7 +455,7 @@ bool Packet::testAck() const {
}
Packet &Packet::setRst() {
- if (hicn_packet_set_rst((hicn_header_t *)packet_start_) < 0) {
+ if (hicn_packet_set_rst(packet_start_) < 0) {
throw errors::RuntimeException("Error setting rst bit in the packet.");
}
@@ -475,7 +463,7 @@ Packet &Packet::setRst() {
}
Packet &Packet::resetRst() {
- if (hicn_packet_reset_rst((hicn_header_t *)packet_start_) < 0) {
+ if (hicn_packet_reset_rst(packet_start_) < 0) {
throw errors::RuntimeException("Error resetting rst bit in the packet.");
}
@@ -484,7 +472,7 @@ Packet &Packet::resetRst() {
bool Packet::testRst() const {
bool res = false;
- if (hicn_packet_test_rst((hicn_header_t *)packet_start_, &res) < 0) {
+ if (hicn_packet_test_rst(packet_start_, &res) < 0) {
throw errors::RuntimeException("Error testing rst bit in the packet.");
}
@@ -492,7 +480,7 @@ bool Packet::testRst() const {
}
Packet &Packet::setFin() {
- if (hicn_packet_set_fin((hicn_header_t *)packet_start_) < 0) {
+ if (hicn_packet_set_fin(packet_start_) < 0) {
throw errors::RuntimeException("Error setting fin bit in the packet.");
}
@@ -500,7 +488,7 @@ Packet &Packet::setFin() {
}
Packet &Packet::resetFin() {
- if (hicn_packet_reset_fin((hicn_header_t *)packet_start_) < 0) {
+ if (hicn_packet_reset_fin(packet_start_) < 0) {
throw errors::RuntimeException("Error resetting fin bit in the packet.");
}
@@ -509,7 +497,7 @@ Packet &Packet::resetFin() {
bool Packet::testFin() const {
bool res = false;
- if (hicn_packet_test_fin((hicn_header_t *)packet_start_, &res) < 0) {
+ if (hicn_packet_test_fin(packet_start_, &res) < 0) {
throw errors::RuntimeException("Error testing fin bit in the packet.");
}
@@ -543,7 +531,7 @@ std::string Packet::printFlags() const {
}
Packet &Packet::setSrcPort(uint16_t srcPort) {
- if (hicn_packet_set_src_port((hicn_header_t *)packet_start_, srcPort) < 0) {
+ if (hicn_packet_set_src_port(packet_start_, srcPort) < 0) {
throw errors::RuntimeException("Error setting source port in the packet.");
}
@@ -551,7 +539,7 @@ Packet &Packet::setSrcPort(uint16_t srcPort) {
}
Packet &Packet::setDstPort(uint16_t dstPort) {
- if (hicn_packet_set_dst_port((hicn_header_t *)packet_start_, dstPort) < 0) {
+ if (hicn_packet_set_dst_port(packet_start_, dstPort) < 0) {
throw errors::RuntimeException(
"Error setting destination port in the packet.");
}
@@ -562,7 +550,7 @@ Packet &Packet::setDstPort(uint16_t dstPort) {
uint16_t Packet::getSrcPort() const {
uint16_t port = 0;
- if (hicn_packet_get_src_port((hicn_header_t *)packet_start_, &port) < 0) {
+ if (hicn_packet_get_src_port(packet_start_, &port) < 0) {
throw errors::RuntimeException("Error reading source port in the packet.");
}
@@ -572,7 +560,7 @@ uint16_t Packet::getSrcPort() const {
uint16_t Packet::getDstPort() const {
uint16_t port = 0;
- if (hicn_packet_get_dst_port((hicn_header_t *)packet_start_, &port) < 0) {
+ if (hicn_packet_get_dst_port(packet_start_, &port) < 0) {
throw errors::RuntimeException(
"Error reading destination port in the packet.");
}
@@ -581,7 +569,7 @@ uint16_t Packet::getDstPort() const {
}
Packet &Packet::setTTL(uint8_t hops) {
- if (hicn_packet_set_hoplimit((hicn_header_t *)packet_start_, hops) < 0) {
+ if (hicn_packet_set_hoplimit(packet_start_, hops) < 0) {
throw errors::RuntimeException("Error setting TTL.");
}
@@ -590,7 +578,7 @@ Packet &Packet::setTTL(uint8_t hops) {
uint8_t Packet::getTTL() const {
uint8_t hops = 0;
- if (hicn_packet_get_hoplimit((hicn_header_t *)packet_start_, &hops) < 0) {
+ if (hicn_packet_get_hoplimit(packet_start_, &hops) < 0) {
throw errors::RuntimeException("Error reading TTL.");
}
@@ -603,27 +591,21 @@ void Packet::separateHeaderPayload() {
}
int signature_size = 0;
-
if (_is_ah(format_)) {
signature_size = (uint32_t)getSignatureSize();
}
auto header_size = getHeaderSizeFromFormat(format_, signature_size);
auto payload_length = packet_->length() - header_size;
- if (!payload_length) {
- return;
- }
packet_->trimEnd(packet_->length());
- if (payload_length) {
- auto payload = packet_->cloneOne();
- payload_head_ = payload.get();
- payload_head_->advance(header_size);
- payload_head_->append(payload_length);
- packet_->prependChain(std::move(payload));
- packet_->append(header_size);
- }
+ auto payload = packet_->cloneOne();
+ payload_head_ = payload.get();
+ payload_head_->advance(header_size);
+ payload_head_->append(payload_length);
+ packet_->prependChain(std::move(payload));
+ packet_->append(header_size);
}
} // end namespace core
diff --git a/libtransport/src/hicn/transport/core/packet.h b/libtransport/src/hicn/transport/core/packet.h
index 78dbeae07..16191dd7f 100644
--- a/libtransport/src/hicn/transport/core/packet.h
+++ b/libtransport/src/hicn/transport/core/packet.h
@@ -101,8 +101,6 @@ class Packet : public std::enable_shared_from_this<Packet> {
const std::shared_ptr<utils::MemBuf> data();
- const uint8_t *start() const;
-
virtual const Name &getName() const = 0;
virtual Name &getWritableName() = 0;
@@ -123,7 +121,7 @@ class Packet : public std::enable_shared_from_this<Packet> {
Packet &appendHeader(const uint8_t *buffer, std::size_t length);
- utils::Array<uint8_t> getPayload() const;
+ std::unique_ptr<utils::MemBuf> getPayload() const;
Packet &updateLength(std::size_t length = 0);
@@ -190,7 +188,7 @@ class Packet : public std::enable_shared_from_this<Packet> {
protected:
Name name_;
MemBufPtr packet_;
- uint8_t *packet_start_;
+ hicn_header_t *packet_start_;
utils::MemBuf *header_head_;
utils::MemBuf *payload_head_;
mutable Format format_;
diff --git a/libtransport/src/hicn/transport/http/server_acceptor.cc b/libtransport/src/hicn/transport/http/server_acceptor.cc
index 615fa80d8..486b04c57 100644
--- a/libtransport/src/hicn/transport/http/server_acceptor.cc
+++ b/libtransport/src/hicn/transport/http/server_acceptor.cc
@@ -85,9 +85,9 @@ void HTTPServerAcceptor::listen(bool async) {
void HTTPServerAcceptor::processIncomingInterest(ProducerSocket &p,
Interest &interest) {
// Temporary solution. With
- utils::Array<uint8_t> payload = interest.getPayload();
+ auto payload = interest.getPayload();
- int request_id = utils::hash::fnv32_buf(payload.data(), payload.length());
+ int request_id = utils::hash::fnv32_buf(payload->data(), payload->length());
if (publishers_.find(request_id) != publishers_.end()) {
if (publishers_[request_id]) {
@@ -98,8 +98,8 @@ void HTTPServerAcceptor::processIncomingInterest(ProducerSocket &p,
publishers_[request_id] =
std::make_shared<HTTPServerPublisher>(interest.getName());
- callback_(publishers_[request_id], (uint8_t *)payload.data(),
- payload.length(), request_id);
+ callback_(publishers_[request_id], (uint8_t *)payload->data(),
+ payload->length(), request_id);
}
std::map<int, std::shared_ptr<HTTPServerPublisher>>
diff --git a/libtransport/src/hicn/transport/interfaces/full_duplex_socket.cc b/libtransport/src/hicn/transport/interfaces/full_duplex_socket.cc
index 3bb51e72e..2e180cf34 100644
--- a/libtransport/src/hicn/transport/interfaces/full_duplex_socket.cc
+++ b/libtransport/src/hicn/transport/interfaces/full_duplex_socket.cc
@@ -261,7 +261,7 @@ void AsyncFullDuplexSocket::signalProductionToSubscribers(
// Todo consider using preallocated pool of membufs
auto _payload = utils::MemBuf::create(sizeof(ActionMessage));
_payload->append(sizeof(ActionMessage));
- auto payload = const_cast<uint8_t *>(interest->getPayload().data());
+ auto payload = interest->getPayload()->writableData();
ActionMessage *produce_notification =
reinterpret_cast<ActionMessage *>(payload);
@@ -294,7 +294,7 @@ AsyncFullDuplexSocket::decodeSynchronizationMessage(
const core::Interest &interest) {
auto mesg = interest.getPayload();
const MessageHeader *header =
- reinterpret_cast<const MessageHeader *>(mesg.data());
+ reinterpret_cast<const MessageHeader *>(mesg->data());
switch (header->msg_type) {
case MessageType::ACTION: {
@@ -350,14 +350,12 @@ AsyncFullDuplexSocket::decodeSynchronizationMessage(
// We saved one round trip :)
auto buffer = ContentBuffer();
- const uint8_t *data = mesg.data() + sizeof(PayloadMessage);
- buffer->assign(data, data + mesg.length() - sizeof(PayloadMessage));
+ const uint8_t *data = mesg->data() + sizeof(PayloadMessage);
+ buffer->assign(data, data + mesg->length() - sizeof(PayloadMessage));
read_callback_->readBufferAvailable(std::move(buffer));
return createAck();
}
- default: {
- return std::shared_ptr<core::ContentObject>(nullptr);
- }
+ default: { return std::shared_ptr<core::ContentObject>(nullptr); }
}
return std::shared_ptr<core::ContentObject>(nullptr);
@@ -366,7 +364,7 @@ AsyncFullDuplexSocket::decodeSynchronizationMessage(
void AsyncFullDuplexSocket::onControlInterest(ProducerSocket &s,
const core::Interest &i) {
auto payload = i.getPayload();
- if (payload.length()) {
+ if (payload->length()) {
// Try to decode payload and see if starting an async pull operation
auto response = decodeSynchronizationMessage(i);
if (response) {
@@ -412,14 +410,14 @@ void AsyncFullDuplexSocket::OnConnectCallback::onContentObject(
// The ack message should contain the name to be used for notifying
// the production of the content to the other part
- if (content_object->getPayload().length() == 0) {
+ if (content_object->getPayload()->length() == 0) {
TRANSPORT_LOGW("Connection response message empty....");
return;
}
SubscriptionResponseMessage *response =
reinterpret_cast<SubscriptionResponseMessage *>(
- content_object->getPayload().writableData());
+ content_object->getPayload()->writableData());
if (response->response.header.msg_type == MessageType::RESPONSE) {
if (response->response.return_code == ReturnCode::OK) {
@@ -457,7 +455,7 @@ std::shared_ptr<core::ContentObject> AsyncFullDuplexSocket::createAck() {
auto response = std::make_shared<core::ContentObject>(name);
auto _payload = utils::MemBuf::create(sizeof(ActionMessage));
_payload->append(sizeof(ResponseMessage));
- auto payload = response->getPayload().data();
+ auto payload = response->getPayload()->data();
ResponseMessage *response_message = (ResponseMessage *)payload;
response_message->header.msg_type = MessageType::RESPONSE;
response_message->header.reserved[0] = 0;
diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
index 87cf27b75..00cc82543 100644
--- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
+++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
@@ -163,7 +163,7 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) {
void RTCProducerSocket::sendNack(const Interest &interest) {
nack_->setName(interest.getName());
- uint32_t *payload_ptr = (uint32_t *)nack_->getPayload().data();
+ uint32_t *payload_ptr = (uint32_t *)nack_->getPayload()->data();
*payload_ptr = currentSeg_;
*(++payload_ptr) = bytesProductionRate_;
diff --git a/libtransport/src/hicn/transport/protocols/indexing_manager.h b/libtransport/src/hicn/transport/protocols/indexing_manager.h
index c692695a3..b6b8bb4a6 100644
--- a/libtransport/src/hicn/transport/protocols/indexing_manager.h
+++ b/libtransport/src/hicn/transport/protocols/indexing_manager.h
@@ -40,6 +40,8 @@ class IndexManager {
*/
virtual uint32_t getNextSuffix() = 0;
+ virtual void setFirstSuffix(uint32_t suffix) = 0;
+
/**
* Retrive the next segment to be reassembled.
*/
@@ -72,52 +74,6 @@ class IndexVerificationManager : public IndexManager {
virtual bool onContentObject(const core::ContentObject &content_object) = 0;
};
-class ZeroIndexManager : public IndexVerificationManager {
- public:
- ZeroIndexManager() : reset_(true) {}
-
- TRANSPORT_ALWAYS_INLINE virtual void reset() override { reset_ = true; }
-
- /**
- * Retrieve from the manifest the next suffix to retrieve.
- */
- TRANSPORT_ALWAYS_INLINE virtual uint32_t getNextSuffix() override {
- uint32_t ret = reset_ ? 0 : IndexManager::invalid_index;
- reset_ = false;
- return ret;
- }
-
- /**
- * Retrive the next segment to be reassembled.
- */
- TRANSPORT_ALWAYS_INLINE virtual uint32_t getNextReassemblySegment() override {
- return IndexManager::invalid_index;
- }
-
- TRANSPORT_ALWAYS_INLINE virtual bool isFinalSuffixDiscovered() override {
- return false;
- }
-
- TRANSPORT_ALWAYS_INLINE virtual uint32_t getFinalSuffix() override {
- return IndexManager::invalid_index;
- }
-
- TRANSPORT_ALWAYS_INLINE bool onManifest(
- core::ContentObject::Ptr &&content_object) override {
- throw errors::UnexpectedManifestException();
- }
-
- TRANSPORT_ALWAYS_INLINE bool onContentObject(
- const core::ContentObject &content_object) override {
- throw errors::RuntimeException(
- "Called onContentObject on a ZeroIndexManager, which is not able to "
- "process packets.");
- }
-
- private:
- bool reset_;
-};
-
class IncrementalIndexManager : public IndexVerificationManager {
public:
IncrementalIndexManager(interface::ConsumerSocket *icn_socket)
@@ -135,7 +91,7 @@ class IncrementalIndexManager : public IndexVerificationManager {
TRANSPORT_ALWAYS_INLINE virtual void reset() override {
final_suffix_ = std::numeric_limits<uint32_t>::max();
- next_download_suffix_ = 0;
+ next_download_suffix_ = first_suffix_;
next_reassembly_suffix_ = 0;
}
@@ -147,6 +103,11 @@ class IncrementalIndexManager : public IndexVerificationManager {
: IndexManager::invalid_index;
}
+ TRANSPORT_ALWAYS_INLINE virtual void setFirstSuffix(
+ uint32_t suffix) override {
+ first_suffix_ = suffix;
+ }
+
/**
* Retrive the next segment to be reassembled.
*/
@@ -183,6 +144,7 @@ class IncrementalIndexManager : public IndexVerificationManager {
protected:
interface::ConsumerSocket *socket_;
uint32_t final_suffix_;
+ uint32_t first_suffix_;
uint32_t next_download_suffix_;
uint32_t next_reassembly_suffix_;
std::unique_ptr<VerificationManager> verification_manager_;
diff --git a/libtransport/src/hicn/transport/protocols/raaqm.cc b/libtransport/src/hicn/transport/protocols/raaqm.cc
index cd3a6cb85..be76f7c23 100644
--- a/libtransport/src/hicn/transport/protocols/raaqm.cc
+++ b/libtransport/src/hicn/transport/protocols/raaqm.cc
@@ -98,6 +98,11 @@ int RaaqmTransportProtocol::start() {
void RaaqmTransportProtocol::resume() { return TransportProtocol::resume(); }
void RaaqmTransportProtocol::reset() {
+ // Set first segment to retrieve
+ core::Name *name;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name);
+ index_manager_->setFirstSuffix(name->getSuffix());
+
// Reset reassembly component
BaseReassembly::reset();
@@ -333,17 +338,9 @@ void RaaqmTransportProtocol::onContentObject(
index_manager_->onManifest(std::move(content_object));
} else if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) {
- if (TRANSPORT_EXPECT_FALSE(incremental_suffix == 0)) {
- index_manager_ = incremental_index_manager_.get();
- }
-
onContentSegment(std::move(interest), std::move(content_object));
}
- if (TRANSPORT_EXPECT_FALSE(incremental_suffix == 0)) {
- BaseReassembly::index_ = index_manager_->getNextReassemblySegment();
- }
-
scheduleNextInterests();
}
diff --git a/libtransport/src/hicn/transport/protocols/reassembly.cc b/libtransport/src/hicn/transport/protocols/reassembly.cc
index 53180935c..899f701c7 100644
--- a/libtransport/src/hicn/transport/protocols/reassembly.cc
+++ b/libtransport/src/hicn/transport/protocols/reassembly.cc
@@ -25,12 +25,11 @@ namespace protocol {
BaseReassembly::BaseReassembly(interface::ConsumerSocket *icn_socket,
ContentReassembledCallback *content_callback)
: reassembly_consumer_socket_(icn_socket),
- zero_index_manager_(std::make_unique<ZeroIndexManager>()),
incremental_index_manager_(
std::make_unique<IncrementalIndexManager>(icn_socket)),
manifest_index_manager_(
std::make_unique<ManifestIndexManager>(icn_socket)),
- index_manager_(zero_index_manager_.get()),
+ index_manager_(incremental_index_manager_.get()),
index_(0) {
setContentCallback(content_callback);
}
@@ -54,14 +53,14 @@ void BaseReassembly::reassemble(ContentObject::Ptr &&content_object) {
}
void BaseReassembly::copyContent(const ContentObject &content_object) {
- utils::Array<> a = content_object.getPayload();
+ auto a = content_object.getPayload();
std::shared_ptr<std::vector<uint8_t>> content_buffer;
reassembly_consumer_socket_->getSocketOption(
interface::GeneralTransportOptions::APPLICATION_BUFFER, content_buffer);
- content_buffer->insert(content_buffer->end(), (uint8_t *)a.data(),
- (uint8_t *)a.data() + a.length());
+ content_buffer->insert(content_buffer->end(), (uint8_t *)a->data(),
+ (uint8_t *)a->data() + a->length());
bool download_completed =
index_manager_->getFinalSuffix() == content_object.getName().getSuffix();
@@ -74,6 +73,7 @@ void BaseReassembly::copyContent(const ContentObject &content_object) {
void BaseReassembly::reset() {
manifest_index_manager_->reset();
incremental_index_manager_->reset();
+ index_ = index_manager_->getNextReassemblySegment();
received_packets_.clear();
}
diff --git a/libtransport/src/hicn/transport/protocols/reassembly.h b/libtransport/src/hicn/transport/protocols/reassembly.h
index ef3e99fc5..9efddb773 100644
--- a/libtransport/src/hicn/transport/protocols/reassembly.h
+++ b/libtransport/src/hicn/transport/protocols/reassembly.h
@@ -57,7 +57,6 @@ class BaseReassembly : public Reassembly {
protected:
// The consumer socket
interface::ConsumerSocket *reassembly_consumer_socket_;
- std::unique_ptr<ZeroIndexManager> zero_index_manager_;
std::unique_ptr<IncrementalIndexManager> incremental_index_manager_;
std::unique_ptr<ManifestIndexManager> manifest_index_manager_;
IndexVerificationManager *index_manager_;
diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc
index c2323345f..48ba90f9e 100644
--- a/libtransport/src/hicn/transport/protocols/rtc.cc
+++ b/libtransport/src/hicn/transport/protocols/rtc.cc
@@ -186,8 +186,8 @@ void RTCTransportProtocol::updateDelayStats(
pathTable_[pathLabel]->insertRttSample(RTT);
// we collect OWD only for datapackets
- if (content_object.getPayload().length() != HICN_NACK_HEADER_SIZE) {
- uint64_t *senderTimeStamp = (uint64_t *)content_object.getPayload().data();
+ if (content_object.getPayload()->length() != HICN_NACK_HEADER_SIZE) {
+ uint64_t *senderTimeStamp = (uint64_t *)content_object.getPayload()->data();
int64_t OWD = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
@@ -474,7 +474,7 @@ void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) {
}
void RTCTransportProtocol::onNack(const ContentObject &content_object) {
- uint32_t *payload = (uint32_t *)content_object.getPayload().data();
+ uint32_t *payload = (uint32_t *)content_object.getPayload()->data();
uint32_t productionSeg = *payload;
uint32_t productionRate = *(++payload);
uint32_t nackSegment = content_object.getName().getSuffix();
@@ -531,7 +531,7 @@ void RTCTransportProtocol::onNack(const ContentObject &content_object) {
void RTCTransportProtocol::onContentObject(
Interest::Ptr &&interest, ContentObject::Ptr &&content_object) {
- uint32_t payload_size = (uint32_t)content_object->getPayload().length();
+ uint32_t payload_size = (uint32_t)content_object->getPayload()->length();
uint32_t segmentNumber = content_object->getName().getSuffix();
uint32_t pkt = segmentNumber & modMask_;
@@ -549,7 +549,7 @@ void RTCTransportProtocol::onContentObject(
avgPacketSize_ = (HICN_ESTIMATED_PACKET_SIZE * avgPacketSize_) +
((1 - HICN_ESTIMATED_PACKET_SIZE) *
- content_object->getPayload().length());
+ content_object->getPayload()->length());
if (inflightInterests_[pkt].retransmissions == 0) {
inflightInterestsCount_--;
@@ -570,10 +570,10 @@ void RTCTransportProtocol::onContentObject(
void RTCTransportProtocol::returnContentToApplication(
const ContentObject &content_object) {
// return content to the user
- Array a = content_object.getPayload();
+ auto a = content_object.getPayload();
- uint8_t *start = ((uint8_t *)a.data()) + HICN_TIMESTAMP_SIZE;
- unsigned size = (unsigned)(a.length() - HICN_TIMESTAMP_SIZE);
+ uint8_t *start = ((uint8_t *)a->data()) + HICN_TIMESTAMP_SIZE;
+ unsigned size = (unsigned)(a->length() - HICN_TIMESTAMP_SIZE);
// set offset between hICN and RTP packets
uint16_t rtp_seq = ntohs(*(((uint16_t *)start) + 1));