summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMauro Sardara <msardara@cisco.com>2020-03-10 15:50:18 +0100
committerMauro Sardara <msardara@cisco.com>2020-03-10 15:57:31 +0100
commit23657bc8a770734a74f73f6d07075130a366ef00 (patch)
treee5c00a85ceca00db9d8926824f7f59de67536972
parent01d76a603fba7db5cfb83a1fbc3b369f2e4a4823 (diff)
[HICN-544] Do not block reading incoming messages in memif connector.
Signed-off-by: Mauro Sardara <msardara@cisco.com> Change-Id: I844dfa64a977c9c41bfc103bb110c274802b1839 Signed-off-by: Mauro Sardara <msardara@cisco.com>
-rw-r--r--libtransport/src/core/memif_connector.cc12
-rw-r--r--libtransport/src/core/memif_connector.h2
-rw-r--r--libtransport/src/core/portal.h16
-rw-r--r--libtransport/src/protocols/byte_stream_reassembly.cc5
-rw-r--r--utils/src/hiperf.cc27
5 files changed, 40 insertions, 22 deletions
diff --git a/libtransport/src/core/memif_connector.cc b/libtransport/src/core/memif_connector.cc
index 179db63e4..49f262ec8 100644
--- a/libtransport/src/core/memif_connector.cc
+++ b/libtransport/src/core/memif_connector.cc
@@ -305,11 +305,13 @@ void MemifConnector::sendCallback(const std::error_code &ec) {
}
}
-void MemifConnector::processInputBuffer() {
+void MemifConnector::processInputBuffer(std::uint16_t total_packets) {
Packet::MemBufPtr ptr;
- while (input_buffer_.pop(ptr)) {
- receive_callback_(std::move(ptr));
+ for (; total_packets > 0; total_packets--) {
+ if (input_buffer_.pop(ptr)) {
+ receive_callback_(std::move(ptr));
+ }
}
}
@@ -339,6 +341,7 @@ int MemifConnector::onInterrupt(memif_conn_handle_t conn, void *private_ctx,
memif_connection_t *c = connector->memif_connection_.get();
int err = MEMIF_ERR_SUCCESS, ret_val;
+ uint16_t total_packets = 0;
uint16_t rx;
do {
@@ -386,11 +389,12 @@ int MemifConnector::onInterrupt(memif_conn_handle_t conn, void *private_ctx,
}
c->rx_buf_num -= rx;
+ total_packets += rx;
} while (ret_val == MEMIF_ERR_NOBUF);
connector->io_service_.post(
- std::bind(&MemifConnector::processInputBuffer, connector));
+ std::bind(&MemifConnector::processInputBuffer, connector, total_packets));
return 0;
diff --git a/libtransport/src/core/memif_connector.h b/libtransport/src/core/memif_connector.h
index aafef1e56..693efd14c 100644
--- a/libtransport/src/core/memif_connector.h
+++ b/libtransport/src/core/memif_connector.h
@@ -96,7 +96,7 @@ class MemifConnector : public Connector {
void sendCallback(const std::error_code &ec);
- void processInputBuffer();
+ void processInputBuffer(std::uint16_t total_packets);
private:
static utils::EpollEventReactor main_event_reactor_;
diff --git a/libtransport/src/core/portal.h b/libtransport/src/core/portal.h
index 34dfbd826..05715543a 100644
--- a/libtransport/src/core/portal.h
+++ b/libtransport/src/core/portal.h
@@ -24,6 +24,7 @@
#include <hicn/transport/interfaces/portal.h>
#include <hicn/transport/portability/portability.h>
#include <hicn/transport/utils/log.h>
+#include <hicn/transport/utils/fixed_block_allocator.h>
#include <core/forwarder_interface.h>
#include <core/pending_interest.h>
@@ -52,21 +53,20 @@ class HandlerMemory {
static constexpr std::size_t memory_size = 1024 * 1024;
public:
- HandlerMemory() : index_(0) {}
+ HandlerMemory() {}
HandlerMemory(const HandlerMemory &) = delete;
HandlerMemory &operator=(const HandlerMemory &) = delete;
TRANSPORT_ALWAYS_INLINE void *allocate(std::size_t size) {
- return &storage_[index_++ % memory_size];
+ return utils::FixedBlockAllocator<128, 4096>::getInstance()
+ ->allocateBlock();
}
- TRANSPORT_ALWAYS_INLINE void deallocate(void *pointer) {}
-
- private:
- // Storage space used for handler-based custom memory allocation.
- typename std::aligned_storage<128>::type storage_[memory_size];
- uint32_t index_;
+ TRANSPORT_ALWAYS_INLINE void deallocate(void *pointer) {
+ utils::FixedBlockAllocator<128, 4096>::getInstance()->deallocateBlock(
+ pointer);
+ }
#else
public:
HandlerMemory() {}
diff --git a/libtransport/src/protocols/byte_stream_reassembly.cc b/libtransport/src/protocols/byte_stream_reassembly.cc
index e15498bb1..12631637e 100644
--- a/libtransport/src/protocols/byte_stream_reassembly.cc
+++ b/libtransport/src/protocols/byte_stream_reassembly.cc
@@ -39,7 +39,7 @@ ByteStreamReassembly::ByteStreamReassembly(
void ByteStreamReassembly::reassemble(
std::unique_ptr<ContentObjectManifest> &&manifest) {
- if (TRANSPORT_EXPECT_TRUE(manifest != nullptr)) {
+ if (TRANSPORT_EXPECT_TRUE(manifest != nullptr) && read_buffer_->capacity()) {
received_packets_.emplace(
std::make_pair(manifest->getName().getSuffix(), nullptr));
assembleContent();
@@ -47,7 +47,8 @@ void ByteStreamReassembly::reassemble(
}
void ByteStreamReassembly::reassemble(ContentObject::Ptr &&content_object) {
- if (TRANSPORT_EXPECT_TRUE(content_object != nullptr)) {
+ if (TRANSPORT_EXPECT_TRUE(content_object != nullptr) &&
+ read_buffer_->capacity()) {
received_packets_.emplace(std::make_pair(
content_object->getName().getSuffix(), std::move(content_object)));
assembleContent();
diff --git a/utils/src/hiperf.cc b/utils/src/hiperf.cc
index 2b78a02b9..eabd12c86 100644
--- a/utils/src/hiperf.cc
+++ b/utils/src/hiperf.cc
@@ -72,6 +72,7 @@ struct ClientConfiguration {
producer_certificate(""),
passphrase(""),
receive_buffer(nullptr),
+ receive_buffer_size_(128 * 1024),
download_size(0),
report_interval_milliseconds_(1000),
transport_protocol_(CBR),
@@ -92,6 +93,7 @@ struct ClientConfiguration {
std::string producer_certificate;
std::string passphrase;
std::shared_ptr<utils::MemBuf> receive_buffer;
+ std::size_t receive_buffer_size_;
std::size_t download_size;
std::uint32_t report_interval_milliseconds_;
TransportProtocolAlgorithms transport_protocol_;
@@ -562,11 +564,10 @@ class HIperfClient {
};
class Callback : public ConsumerSocket::ReadCallback {
- static constexpr std::size_t read_size = 128 * 1024;
-
public:
Callback(HIperfClient &hiperf_client) : client_(hiperf_client) {
- client_.configuration_.receive_buffer = utils::MemBuf::create(read_size);
+ client_.configuration_.receive_buffer =
+ utils::MemBuf::create(client_.configuration_.receive_buffer_size_);
}
bool isBufferMovable() noexcept override { return false; }
@@ -575,7 +576,7 @@ class HIperfClient {
size_t *max_length) override {
*application_buffer =
client_.configuration_.receive_buffer->writableData();
- *max_length = read_size;
+ *max_length = client_.configuration_.receive_buffer_size_;
}
void readDataAvailable(std::size_t length) noexcept override {}
@@ -583,7 +584,9 @@ class HIperfClient {
void readBufferAvailable(
std::unique_ptr<utils::MemBuf> &&buffer) noexcept override {}
- size_t maxBufferSize() const override { return read_size; }
+ size_t maxBufferSize() const override {
+ return client_.configuration_.receive_buffer_size_;
+ }
void readError(const std::error_code ec) noexcept override {
std::cerr << "Error " << ec.message() << " while reading from socket"
@@ -740,6 +743,7 @@ class HIperfServer {
}
void virtualProcessInterest(ProducerSocket &p, const Interest &interest) {
+ // std::cout << "Received interest " << interest.getName() << std::endl;
content_objects_[content_objects_index_ & mask_]->setName(
interest.getName());
producer_socket_->produce(
@@ -1147,6 +1151,10 @@ void usage() {
<< std::endl;
std::cerr << "-L\t<interest lifetime>\t\t"
<< "Set interest lifetime." << std::endl;
+ std::cerr << "-M\t<input_buffer_size>\t\t"
+ << "Size of consumer input buffer. If 0, reassembly of packets "
+ "will be disabled."
+ << std::endl;
std::cerr << "-W\t<window_size>\t\t\t"
<< "Use a fixed congestion window "
"for retrieving the data."
@@ -1200,7 +1208,7 @@ int main(int argc, char *argv[]) {
int opt;
#ifndef _WIN32
while ((opt = getopt(argc, argv,
- "DSCf:b:d:W:RMc:vA:s:rmlK:k:y:p:hi:xE:P:B:ItL:")) !=
+ "DSCf:b:d:W:RM:c:vA:s:rmlK:k:y:p:hi:xE:P:B:ItL:")) !=
-1) {
switch (opt) {
// Common
@@ -1214,7 +1222,7 @@ int main(int argc, char *argv[]) {
}
#else
while ((opt = getopt(argc, argv,
- "SCf:b:d:W:RMc:vA:s:rmlK:k:y:p:hi:xB:E:P:tL:")) != -1) {
+ "SCf:b:d:W:RM:c:vA:s:rmlK:k:y:p:hi:xB:E:P:tL:")) != -1) {
switch (opt) {
#endif
case 'f': {
@@ -1260,6 +1268,11 @@ int main(int argc, char *argv[]) {
options = 1;
break;
}
+ case 'M': {
+ client_configuration.receive_buffer_size_ = std::stoull(optarg);
+ options = 1;
+ break;
+ }
#ifdef SECURE_HICNTRANSPORT
case 'P': {
client_configuration.producer_prefix_ = Prefix(optarg);