From c365689250216861fd7727203ee6ba1049ad5778 Mon Sep 17 00:00:00 2001 From: Mauro Sardara Date: Wed, 3 Apr 2019 10:03:56 +0200 Subject: [HICN-155] Consumer socket allows now to read N bytes from the network, where N is defined by the application. Change-Id: Ib20309b40e43e4c0db09b9b484e18cd2e3ebf581 Signed-off-by: Mauro Sardara --- utils/src/hiperf.cc | 211 +++++++++++++++++++++++++++++++++-------------- utils/src/ping_server.cc | 115 ++++++++++++++------------ 2 files changed, 211 insertions(+), 115 deletions(-) (limited to 'utils') diff --git a/utils/src/hiperf.cc b/utils/src/hiperf.cc index 7594d1f94..ddc79d520 100644 --- a/utils/src/hiperf.cc +++ b/utils/src/hiperf.cc @@ -16,14 +16,15 @@ #include #include #include +#include #ifndef _WIN32 #include #endif #include -#include #include #include +#include #ifdef __linux__ #include @@ -45,6 +46,9 @@ namespace interface { using CryptoSuite = utils::CryptoSuite; using Identity = utils::Identity; +/** + * Container for command line configuration for hiperf client. + */ struct ClientConfiguration { ClientConfiguration() : name("b001::abcd", 0), @@ -54,7 +58,7 @@ struct ClientConfiguration { window(-1), virtual_download(true), producer_certificate("/tmp/rsa_certificate.pem"), - receive_buffer(std::make_shared>()), + receive_buffer(nullptr), download_size(0), report_interval_milliseconds_(1000), rtc_(false), @@ -67,7 +71,7 @@ struct ClientConfiguration { double window; bool virtual_download; std::string producer_certificate; - std::shared_ptr> receive_buffer; + std::shared_ptr receive_buffer; std::size_t download_size; std::uint32_t report_interval_milliseconds_; TransportProtocolAlgorithms transport_protocol_; @@ -75,6 +79,9 @@ struct ClientConfiguration { bool test_mode_; }; +/** + * Class for handling the production rate for the RTC producer. + */ class Rate { public: Rate() : rate_kbps_(0) {} @@ -110,6 +117,9 @@ class Rate { float rate_kbps_; }; +/** + * Container for command line configuration for hiperf server. + */ struct ServerConfiguration { ServerConfiguration() : name("b001::abcd/64"), @@ -147,10 +157,23 @@ struct ServerConfiguration { std::size_t payload_size_; }; +/** + * Forward declaration of client Read callbacks. + */ +class RTCCallback; +class Callback; + +/** + * Hiperf client class: configure and setup an hicn consumer following the + * ClientConfiguration. + */ class HIperfClient { typedef std::chrono::time_point Time; typedef std::chrono::microseconds TimeDuration; + friend class RTCCallback; + friend class Callback; + public: HIperfClient(const ClientConfiguration &conf) : configuration_(conf), @@ -158,75 +181,55 @@ class HIperfClient { old_bytes_value_(0), signals_(io_service_, SIGINT), expected_seg_(0), - lost_packets_(std::unordered_set()) {} - - void processPayload(ConsumerSocket &c, std::size_t bytes_transferred, - const std::error_code &ec) { - Time t2 = std::chrono::steady_clock::now(); - TimeDuration dt = - std::chrono::duration_cast(t2 - t_download_); - long usec = (long)dt.count(); - - std::cout << "Content retrieved. Size: " << bytes_transferred << " [Bytes]" - << std::endl; - - std::cerr << "Elapsed Time: " << usec / 1000000.0 << " seconds -- " - << (bytes_transferred * 8) * 1.0 / usec * 1.0 << " [Mbps]" - << std::endl; - - io_service_.stop(); - } - - void processPayloadRtc(ConsumerSocket &c, std::size_t bytes_transferred, - const std::error_code &ec) { - configuration_.receive_buffer->clear(); - } + lost_packets_(std::unordered_set()), + rtc_callback_(configuration_.rtc_ ? new RTCCallback(*this) : nullptr), + callback_(configuration_.rtc_ ? nullptr : new Callback(*this)) {} void checkReceivedRtcContent(ConsumerSocket &c, - const ContentObject &contentObject) { - if(!configuration_.test_mode_) - return; + const ContentObject &contentObject) { + if (!configuration_.test_mode_) return; uint32_t receivedSeg = contentObject.getName().getSuffix(); auto payload = contentObject.getPayload(); - if((uint32_t)payload->length() == 8){ //8 is the size of the NACK payload + if ((uint32_t)payload->length() == 8) { // 8 is the size of the NACK + // payload uint32_t *payloadPtr = (uint32_t *)payload->data(); uint32_t productionSeg = *(payloadPtr); uint32_t productionRate = *(++payloadPtr); - if(productionRate == 0){ - std::cout << "[STOP] producer is not producing content" - << std::endl; + if (productionRate == 0) { + std::cout << "[STOP] producer is not producing content" << std::endl; return; } - if(receivedSeg < productionSeg){ - std::cout << "[OUT OF SYNCH] received NACK for " << receivedSeg << - ". Next expected packet " << productionSeg + 1 << std::endl; + if (receivedSeg < productionSeg) { + std::cout << "[OUT OF SYNCH] received NACK for " << receivedSeg + << ". Next expected packet " << productionSeg + 1 + << std::endl; expected_seg_ = productionSeg; - } else if(receivedSeg > productionSeg){ - std::cout << "[WINDOW TO LARGE] received NACK for " << receivedSeg << - ". Next expected packet " << productionSeg << std::endl; + } else if (receivedSeg > productionSeg) { + std::cout << "[WINDOW TO LARGE] received NACK for " << receivedSeg + << ". Next expected packet " << productionSeg << std::endl; } return; } - if(receivedSeg > expected_seg_){ - for(uint32_t i = expected_seg_; i < receivedSeg; i++){ + if (receivedSeg > expected_seg_) { + for (uint32_t i = expected_seg_; i < receivedSeg; i++) { std::cout << "[LOSS] lost packet " << i << std::endl; lost_packets_.insert(i); } expected_seg_ = receivedSeg + 1; return; - }else if (receivedSeg < expected_seg_){ + } else if (receivedSeg < expected_seg_) { auto it = lost_packets_.find(receivedSeg); - if(it != lost_packets_.end()){ + if (it != lost_packets_.end()) { std::cout << "[RECOVER] recovered packet " << receivedSeg << std::endl; lost_packets_.erase(it); - }else{ - std::cout << "[OUT OF ORDER] recevied " << receivedSeg << " expedted " << - expected_seg_ << std::endl; + } else { + std::cout << "[OUT OF ORDER] recevied " << receivedSeg << " expedted " + << expected_seg_ << std::endl; } return; } @@ -379,28 +382,22 @@ class HIperfClient { if (!configuration_.rtc_) { ret = consumer_socket_->setSocketOption( - ConsumerCallbacksOptions::CONTENT_RETRIEVED, - (ConsumerContentCallback)std::bind( - &HIperfClient::processPayload, this, std::placeholders::_1, - std::placeholders::_2, std::placeholders::_3)); + ConsumerCallbacksOptions::READ_CALLBACK, callback_); } else { ret = consumer_socket_->setSocketOption( - ConsumerCallbacksOptions::CONTENT_RETRIEVED, - (ConsumerContentCallback)std::bind( - &HIperfClient::processPayloadRtc, this, std::placeholders::_1, - std::placeholders::_2, std::placeholders::_3)); + ConsumerCallbacksOptions::READ_CALLBACK, rtc_callback_); } if (ret == SOCKET_OPTION_NOT_SET) { return ERROR_SETUP; } - if(configuration_.rtc_){ + if (configuration_.rtc_) { ret = consumer_socket_->setSocketOption( ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT, (ConsumerContentObjectCallback)std::bind( &HIperfClient::checkReceivedRtcContent, this, - std::placeholders::_1, std::placeholders::_2)); + std::placeholders::_1, std::placeholders::_2)); if (ret == SOCKET_OPTION_NOT_SET) { return ERROR_SETUP; } @@ -437,14 +434,103 @@ class HIperfClient { }); t_download_ = t_stats_ = std::chrono::steady_clock::now(); - consumer_socket_->asyncConsume(configuration_.name, - configuration_.receive_buffer); + consumer_socket_->asyncConsume(configuration_.name); io_service_.run(); return ERROR_SUCCESS; } private: + class RTCCallback : public ConsumerSocket::ReadCallback { + static constexpr std::size_t mtu = 1500; + + public: + RTCCallback(HIperfClient &hiperf_client) : client_(hiperf_client) { + client_.configuration_.receive_buffer = utils::MemBuf::create(mtu); + } + + bool isBufferMovable() noexcept override { return false; } + + void getReadBuffer(uint8_t **application_buffer, + size_t *max_length) override { + *application_buffer = + client_.configuration_.receive_buffer->writableData(); + *max_length = mtu; + } + + void readDataAvailable(std::size_t length) noexcept override { + // Do nothing + return; + } + + size_t maxBufferSize() const override { return mtu; } + + void readError(const std::error_code ec) noexcept override { + std::cerr << "Error while reading from RTC socket" << std::endl; + } + + void readSuccess(std::size_t total_size) noexcept override { + std::cout << "Data successfully read" << std::endl; + } + + private: + HIperfClient &client_; + }; + + class Callback : public ConsumerSocket::ReadCallback { + static constexpr std::size_t read_size = 16 * 1024; + + public: + Callback(HIperfClient &hiperf_client) : client_(hiperf_client) {} + + bool isBufferMovable() noexcept override { return true; } + + void getReadBuffer(uint8_t **application_buffer, + size_t *max_length) override { + // Not used + } + + void readDataAvailable(std::size_t length) noexcept override { + // Do nothing + return; + } + + void readBufferAvailable( + std::unique_ptr &&buffer) noexcept override { + if (client_.configuration_.receive_buffer) { + client_.configuration_.receive_buffer->prependChain(std::move(buffer)); + } else { + client_.configuration_.receive_buffer = std::move(buffer); + } + } + + size_t maxBufferSize() const override { return read_size; } + + void readError(const std::error_code ec) noexcept override { + std::cerr << "Error " << ec.message() << " while reading from socket" + << std::endl; + } + + void readSuccess(std::size_t total_size) noexcept override { + Time t2 = std::chrono::steady_clock::now(); + TimeDuration dt = + std::chrono::duration_cast(t2 - client_.t_download_); + long usec = (long)dt.count(); + + std::cout << "Content retrieved. Size: " << total_size << " [Bytes]" + << std::endl; + + std::cerr << "Elapsed Time: " << usec / 1000000.0 << " seconds -- " + << (total_size * 8) * 1.0 / usec * 1.0 << " [Mbps]" + << std::endl; + + client_.io_service_.stop(); + } + + private: + HIperfClient &client_; + }; + ClientConfiguration configuration_; Time t_stats_; Time t_download_; @@ -455,8 +541,14 @@ class HIperfClient { std::unique_ptr consumer_socket_; uint32_t expected_seg_; std::unordered_set lost_packets_; + RTCCallback *rtc_callback_; + Callback *callback_; }; +/** + * Hiperf server class: configure and setup an hicn producer following the + * ServerConfiguration. + */ class HIperfServer { const std::size_t log2_content_object_buffer_size = 8; @@ -800,7 +892,6 @@ void usage() { "receiving the correct data. This is an RTC specific option, to be " "used with the -R (default false)" << std::endl; - } int main(int argc, char *argv[]) { @@ -899,7 +990,7 @@ int main(int argc, char *argv[]) { options = 1; break; } - case 't':{ + case 't': { client_configuration.test_mode_ = true; options = 1; break; diff --git a/utils/src/ping_server.cc b/utils/src/ping_server.cc index d6614303a..049ab3ac5 100644 --- a/utils/src/ping_server.cc +++ b/utils/src/ping_server.cc @@ -19,6 +19,7 @@ #else #include #endif +#include #include #include @@ -43,16 +44,22 @@ utils::Identity setProducerIdentity(std::string keystore_name, class CallbackContainer { const std::size_t log2_content_object_buffer_size = 12; -public: + public: CallbackContainer(const Name &prefix, uint32_t object_size, bool verbose, bool dump, bool quite, bool flags, bool reset, uint8_t ttl, utils::Identity *identity, bool sign, uint32_t lifetime) : buffer_(object_size, 'X'), content_objects_((std::uint32_t)(1 << log2_content_object_buffer_size)), mask_((std::uint16_t)(1 << log2_content_object_buffer_size) - 1), - content_objects_index_(0), verbose_(verbose), dump_(dump), - quite_(quite), flags_(flags), reset_(reset), ttl_(ttl), - identity_(identity), sign_(sign) { + content_objects_index_(0), + verbose_(verbose), + dump_(dump), + quite_(quite), + flags_(flags), + reset_(reset), + ttl_(ttl), + identity_(identity), + sign_(sign) { core::Packet::Format format; if (prefix.getAddressFamily() == AF_INET) { @@ -114,7 +121,7 @@ public: content_object->setAck(); } else if (interest.testAck()) { content_object->setAck(); - } // here I may need to handle the FIN flag; + } // here I may need to handle the FIN flag; } else if (reset_) { content_object->setRst(); } @@ -136,8 +143,7 @@ public: std::cout << "-----------------------" << std::endl; } - if (!quite_) - std::cout << std::endl; + if (!quite_) std::cout << std::endl; if (sign_) { identity_->getSigner().sign(*content_object); @@ -147,7 +153,7 @@ public: } } -private: + private: std::string buffer_; std::vector> content_objects_; std::uint16_t mask_; @@ -222,51 +228,51 @@ int main(int argc, char **argv) { while ((opt = getopt(argc, argv, "s:n:t:l:qfrVDHk:p:")) != -1) { #endif switch (opt) { - case 's': - object_size = std::stoi(optarg); - break; - case 'n': - name_prefix = optarg; - break; - case 't': - ttl = (uint8_t)std::stoi(optarg); - break; - case 'l': - data_lifetime = std::stoi(optarg); - break; - case 'V': - verbose = true; - break; - case 'D': - dump = true; - break; - case 'q': - verbose = false; - dump = false; - quite = true; - break; + case 's': + object_size = std::stoi(optarg); + break; + case 'n': + name_prefix = optarg; + break; + case 't': + ttl = (uint8_t)std::stoi(optarg); + break; + case 'l': + data_lifetime = std::stoi(optarg); + break; + case 'V': + verbose = true; + break; + case 'D': + dump = true; + break; + case 'q': + verbose = false; + dump = false; + quite = true; + break; #ifndef _WIN32 - case 'd': - daemon = true; - break; + case 'd': + daemon = true; + break; #endif - case 'f': - flags = true; - break; - case 'r': - reset = true; - break; - case 'k': - keystore_path = optarg; - sign = true; - break; - case 'p': - keystore_password = optarg; - break; - case 'H': - default: - help(); - exit(EXIT_FAILURE); + case 'f': + flags = true; + break; + case 'r': + reset = true; + break; + case 'k': + keystore_path = optarg; + sign = true; + break; + case 'p': + keystore_password = optarg; + break; + case 'H': + default: + help(); + exit(EXIT_FAILURE); } } @@ -282,8 +288,7 @@ int main(int argc, char **argv) { std::string ip_address = tokenizer.nextToken(); Name n(ip_address); - if (object_size > 1350) - object_size = 1350; + if (object_size > 1350) object_size = 1350; CallbackContainer *stubs; utils::Identity identity = setProducerIdentity( @@ -327,9 +332,9 @@ int main(int argc, char **argv) { return 0; } -} // namespace interface +} // namespace interface -} // end namespace transport +} // end namespace transport int main(int argc, char **argv) { return transport::interface::main(argc, argv); -- cgit 1.2.3-korg