diff options
author | Mauro Sardara <msardara@cisco.com> | 2020-03-10 15:50:18 +0100 |
---|---|---|
committer | Mauro Sardara <msardara@cisco.com> | 2020-03-10 15:57:31 +0100 |
commit | 23657bc8a770734a74f73f6d07075130a366ef00 (patch) | |
tree | e5c00a85ceca00db9d8926824f7f59de67536972 | |
parent | 01d76a603fba7db5cfb83a1fbc3b369f2e4a4823 (diff) |
[HICN-544] Do not block reading incoming messages in memif connector.
Signed-off-by: Mauro Sardara <msardara@cisco.com>
Change-Id: I844dfa64a977c9c41bfc103bb110c274802b1839
Signed-off-by: Mauro Sardara <msardara@cisco.com>
-rw-r--r-- | libtransport/src/core/memif_connector.cc | 12 | ||||
-rw-r--r-- | libtransport/src/core/memif_connector.h | 2 | ||||
-rw-r--r-- | libtransport/src/core/portal.h | 16 | ||||
-rw-r--r-- | libtransport/src/protocols/byte_stream_reassembly.cc | 5 | ||||
-rw-r--r-- | utils/src/hiperf.cc | 27 |
5 files changed, 40 insertions, 22 deletions
diff --git a/libtransport/src/core/memif_connector.cc b/libtransport/src/core/memif_connector.cc index 179db63e4..49f262ec8 100644 --- a/libtransport/src/core/memif_connector.cc +++ b/libtransport/src/core/memif_connector.cc @@ -305,11 +305,13 @@ void MemifConnector::sendCallback(const std::error_code &ec) { } } -void MemifConnector::processInputBuffer() { +void MemifConnector::processInputBuffer(std::uint16_t total_packets) { Packet::MemBufPtr ptr; - while (input_buffer_.pop(ptr)) { - receive_callback_(std::move(ptr)); + for (; total_packets > 0; total_packets--) { + if (input_buffer_.pop(ptr)) { + receive_callback_(std::move(ptr)); + } } } @@ -339,6 +341,7 @@ int MemifConnector::onInterrupt(memif_conn_handle_t conn, void *private_ctx, memif_connection_t *c = connector->memif_connection_.get(); int err = MEMIF_ERR_SUCCESS, ret_val; + uint16_t total_packets = 0; uint16_t rx; do { @@ -386,11 +389,12 @@ int MemifConnector::onInterrupt(memif_conn_handle_t conn, void *private_ctx, } c->rx_buf_num -= rx; + total_packets += rx; } while (ret_val == MEMIF_ERR_NOBUF); connector->io_service_.post( - std::bind(&MemifConnector::processInputBuffer, connector)); + std::bind(&MemifConnector::processInputBuffer, connector, total_packets)); return 0; diff --git a/libtransport/src/core/memif_connector.h b/libtransport/src/core/memif_connector.h index aafef1e56..693efd14c 100644 --- a/libtransport/src/core/memif_connector.h +++ b/libtransport/src/core/memif_connector.h @@ -96,7 +96,7 @@ class MemifConnector : public Connector { void sendCallback(const std::error_code &ec); - void processInputBuffer(); + void processInputBuffer(std::uint16_t total_packets); private: static utils::EpollEventReactor main_event_reactor_; diff --git a/libtransport/src/core/portal.h b/libtransport/src/core/portal.h index 34dfbd826..05715543a 100644 --- a/libtransport/src/core/portal.h +++ b/libtransport/src/core/portal.h @@ -24,6 +24,7 @@ #include <hicn/transport/interfaces/portal.h> #include <hicn/transport/portability/portability.h> #include <hicn/transport/utils/log.h> +#include <hicn/transport/utils/fixed_block_allocator.h> #include <core/forwarder_interface.h> #include <core/pending_interest.h> @@ -52,21 +53,20 @@ class HandlerMemory { static constexpr std::size_t memory_size = 1024 * 1024; public: - HandlerMemory() : index_(0) {} + HandlerMemory() {} HandlerMemory(const HandlerMemory &) = delete; HandlerMemory &operator=(const HandlerMemory &) = delete; TRANSPORT_ALWAYS_INLINE void *allocate(std::size_t size) { - return &storage_[index_++ % memory_size]; + return utils::FixedBlockAllocator<128, 4096>::getInstance() + ->allocateBlock(); } - TRANSPORT_ALWAYS_INLINE void deallocate(void *pointer) {} - - private: - // Storage space used for handler-based custom memory allocation. - typename std::aligned_storage<128>::type storage_[memory_size]; - uint32_t index_; + TRANSPORT_ALWAYS_INLINE void deallocate(void *pointer) { + utils::FixedBlockAllocator<128, 4096>::getInstance()->deallocateBlock( + pointer); + } #else public: HandlerMemory() {} diff --git a/libtransport/src/protocols/byte_stream_reassembly.cc b/libtransport/src/protocols/byte_stream_reassembly.cc index e15498bb1..12631637e 100644 --- a/libtransport/src/protocols/byte_stream_reassembly.cc +++ b/libtransport/src/protocols/byte_stream_reassembly.cc @@ -39,7 +39,7 @@ ByteStreamReassembly::ByteStreamReassembly( void ByteStreamReassembly::reassemble( std::unique_ptr<ContentObjectManifest> &&manifest) { - if (TRANSPORT_EXPECT_TRUE(manifest != nullptr)) { + if (TRANSPORT_EXPECT_TRUE(manifest != nullptr) && read_buffer_->capacity()) { received_packets_.emplace( std::make_pair(manifest->getName().getSuffix(), nullptr)); assembleContent(); @@ -47,7 +47,8 @@ void ByteStreamReassembly::reassemble( } void ByteStreamReassembly::reassemble(ContentObject::Ptr &&content_object) { - if (TRANSPORT_EXPECT_TRUE(content_object != nullptr)) { + if (TRANSPORT_EXPECT_TRUE(content_object != nullptr) && + read_buffer_->capacity()) { received_packets_.emplace(std::make_pair( content_object->getName().getSuffix(), std::move(content_object))); assembleContent(); diff --git a/utils/src/hiperf.cc b/utils/src/hiperf.cc index 2b78a02b9..eabd12c86 100644 --- a/utils/src/hiperf.cc +++ b/utils/src/hiperf.cc @@ -72,6 +72,7 @@ struct ClientConfiguration { producer_certificate(""), passphrase(""), receive_buffer(nullptr), + receive_buffer_size_(128 * 1024), download_size(0), report_interval_milliseconds_(1000), transport_protocol_(CBR), @@ -92,6 +93,7 @@ struct ClientConfiguration { std::string producer_certificate; std::string passphrase; std::shared_ptr<utils::MemBuf> receive_buffer; + std::size_t receive_buffer_size_; std::size_t download_size; std::uint32_t report_interval_milliseconds_; TransportProtocolAlgorithms transport_protocol_; @@ -562,11 +564,10 @@ class HIperfClient { }; class Callback : public ConsumerSocket::ReadCallback { - static constexpr std::size_t read_size = 128 * 1024; - public: Callback(HIperfClient &hiperf_client) : client_(hiperf_client) { - client_.configuration_.receive_buffer = utils::MemBuf::create(read_size); + client_.configuration_.receive_buffer = + utils::MemBuf::create(client_.configuration_.receive_buffer_size_); } bool isBufferMovable() noexcept override { return false; } @@ -575,7 +576,7 @@ class HIperfClient { size_t *max_length) override { *application_buffer = client_.configuration_.receive_buffer->writableData(); - *max_length = read_size; + *max_length = client_.configuration_.receive_buffer_size_; } void readDataAvailable(std::size_t length) noexcept override {} @@ -583,7 +584,9 @@ class HIperfClient { void readBufferAvailable( std::unique_ptr<utils::MemBuf> &&buffer) noexcept override {} - size_t maxBufferSize() const override { return read_size; } + size_t maxBufferSize() const override { + return client_.configuration_.receive_buffer_size_; + } void readError(const std::error_code ec) noexcept override { std::cerr << "Error " << ec.message() << " while reading from socket" @@ -740,6 +743,7 @@ class HIperfServer { } void virtualProcessInterest(ProducerSocket &p, const Interest &interest) { + // std::cout << "Received interest " << interest.getName() << std::endl; content_objects_[content_objects_index_ & mask_]->setName( interest.getName()); producer_socket_->produce( @@ -1147,6 +1151,10 @@ void usage() { << std::endl; std::cerr << "-L\t<interest lifetime>\t\t" << "Set interest lifetime." << std::endl; + std::cerr << "-M\t<input_buffer_size>\t\t" + << "Size of consumer input buffer. If 0, reassembly of packets " + "will be disabled." + << std::endl; std::cerr << "-W\t<window_size>\t\t\t" << "Use a fixed congestion window " "for retrieving the data." @@ -1200,7 +1208,7 @@ int main(int argc, char *argv[]) { int opt; #ifndef _WIN32 while ((opt = getopt(argc, argv, - "DSCf:b:d:W:RMc:vA:s:rmlK:k:y:p:hi:xE:P:B:ItL:")) != + "DSCf:b:d:W:RM:c:vA:s:rmlK:k:y:p:hi:xE:P:B:ItL:")) != -1) { switch (opt) { // Common @@ -1214,7 +1222,7 @@ int main(int argc, char *argv[]) { } #else while ((opt = getopt(argc, argv, - "SCf:b:d:W:RMc:vA:s:rmlK:k:y:p:hi:xB:E:P:tL:")) != -1) { + "SCf:b:d:W:RM:c:vA:s:rmlK:k:y:p:hi:xB:E:P:tL:")) != -1) { switch (opt) { #endif case 'f': { @@ -1260,6 +1268,11 @@ int main(int argc, char *argv[]) { options = 1; break; } + case 'M': { + client_configuration.receive_buffer_size_ = std::stoull(optarg); + options = 1; + break; + } #ifdef SECURE_HICNTRANSPORT case 'P': { client_configuration.producer_prefix_ = Prefix(optarg); |