summaryrefslogtreecommitdiffstats
path: root/utils/src/hiperf.cc
diff options
context:
space:
mode:
Diffstat (limited to 'utils/src/hiperf.cc')
-rw-r--r--utils/src/hiperf.cc211
1 files changed, 151 insertions, 60 deletions
diff --git a/utils/src/hiperf.cc b/utils/src/hiperf.cc
index 7594d1f94..ddc79d520 100644
--- a/utils/src/hiperf.cc
+++ b/utils/src/hiperf.cc
@@ -16,14 +16,15 @@
#include <hicn/transport/interfaces/rtc_socket_producer.h>
#include <hicn/transport/interfaces/socket_consumer.h>
#include <hicn/transport/interfaces/socket_producer.h>
+#include <hicn/transport/utils/identity.h>
#ifndef _WIN32
#include <hicn/transport/utils/daemonizator.h>
#endif
#include <hicn/transport/utils/literals.h>
-#include <unordered_set>
#include <fstream>
#include <iomanip>
+#include <unordered_set>
#ifdef __linux__
#include <mcheck.h>
@@ -45,6 +46,9 @@ namespace interface {
using CryptoSuite = utils::CryptoSuite;
using Identity = utils::Identity;
+/**
+ * Container for command line configuration for hiperf client.
+ */
struct ClientConfiguration {
ClientConfiguration()
: name("b001::abcd", 0),
@@ -54,7 +58,7 @@ struct ClientConfiguration {
window(-1),
virtual_download(true),
producer_certificate("/tmp/rsa_certificate.pem"),
- receive_buffer(std::make_shared<std::vector<uint8_t>>()),
+ receive_buffer(nullptr),
download_size(0),
report_interval_milliseconds_(1000),
rtc_(false),
@@ -67,7 +71,7 @@ struct ClientConfiguration {
double window;
bool virtual_download;
std::string producer_certificate;
- std::shared_ptr<std::vector<uint8_t>> receive_buffer;
+ std::shared_ptr<utils::MemBuf> receive_buffer;
std::size_t download_size;
std::uint32_t report_interval_milliseconds_;
TransportProtocolAlgorithms transport_protocol_;
@@ -75,6 +79,9 @@ struct ClientConfiguration {
bool test_mode_;
};
+/**
+ * Class for handling the production rate for the RTC producer.
+ */
class Rate {
public:
Rate() : rate_kbps_(0) {}
@@ -110,6 +117,9 @@ class Rate {
float rate_kbps_;
};
+/**
+ * Container for command line configuration for hiperf server.
+ */
struct ServerConfiguration {
ServerConfiguration()
: name("b001::abcd/64"),
@@ -147,10 +157,23 @@ struct ServerConfiguration {
std::size_t payload_size_;
};
+/**
+ * Forward declaration of client Read callbacks.
+ */
+class RTCCallback;
+class Callback;
+
+/**
+ * Hiperf client class: configure and setup an hicn consumer following the
+ * ClientConfiguration.
+ */
class HIperfClient {
typedef std::chrono::time_point<std::chrono::steady_clock> Time;
typedef std::chrono::microseconds TimeDuration;
+ friend class RTCCallback;
+ friend class Callback;
+
public:
HIperfClient(const ClientConfiguration &conf)
: configuration_(conf),
@@ -158,75 +181,55 @@ class HIperfClient {
old_bytes_value_(0),
signals_(io_service_, SIGINT),
expected_seg_(0),
- lost_packets_(std::unordered_set<uint32_t>()) {}
-
- void processPayload(ConsumerSocket &c, std::size_t bytes_transferred,
- const std::error_code &ec) {
- Time t2 = std::chrono::steady_clock::now();
- TimeDuration dt =
- std::chrono::duration_cast<TimeDuration>(t2 - t_download_);
- long usec = (long)dt.count();
-
- std::cout << "Content retrieved. Size: " << bytes_transferred << " [Bytes]"
- << std::endl;
-
- std::cerr << "Elapsed Time: " << usec / 1000000.0 << " seconds -- "
- << (bytes_transferred * 8) * 1.0 / usec * 1.0 << " [Mbps]"
- << std::endl;
-
- io_service_.stop();
- }
-
- void processPayloadRtc(ConsumerSocket &c, std::size_t bytes_transferred,
- const std::error_code &ec) {
- configuration_.receive_buffer->clear();
- }
+ lost_packets_(std::unordered_set<uint32_t>()),
+ rtc_callback_(configuration_.rtc_ ? new RTCCallback(*this) : nullptr),
+ callback_(configuration_.rtc_ ? nullptr : new Callback(*this)) {}
void checkReceivedRtcContent(ConsumerSocket &c,
- const ContentObject &contentObject) {
- if(!configuration_.test_mode_)
- return;
+ const ContentObject &contentObject) {
+ if (!configuration_.test_mode_) return;
uint32_t receivedSeg = contentObject.getName().getSuffix();
auto payload = contentObject.getPayload();
- if((uint32_t)payload->length() == 8){ //8 is the size of the NACK payload
+ if ((uint32_t)payload->length() == 8) { // 8 is the size of the NACK
+ // payload
uint32_t *payloadPtr = (uint32_t *)payload->data();
uint32_t productionSeg = *(payloadPtr);
uint32_t productionRate = *(++payloadPtr);
- if(productionRate == 0){
- std::cout << "[STOP] producer is not producing content"
- << std::endl;
+ if (productionRate == 0) {
+ std::cout << "[STOP] producer is not producing content" << std::endl;
return;
}
- if(receivedSeg < productionSeg){
- std::cout << "[OUT OF SYNCH] received NACK for " << receivedSeg <<
- ". Next expected packet " << productionSeg + 1 << std::endl;
+ if (receivedSeg < productionSeg) {
+ std::cout << "[OUT OF SYNCH] received NACK for " << receivedSeg
+ << ". Next expected packet " << productionSeg + 1
+ << std::endl;
expected_seg_ = productionSeg;
- } else if(receivedSeg > productionSeg){
- std::cout << "[WINDOW TO LARGE] received NACK for " << receivedSeg <<
- ". Next expected packet " << productionSeg << std::endl;
+ } else if (receivedSeg > productionSeg) {
+ std::cout << "[WINDOW TO LARGE] received NACK for " << receivedSeg
+ << ". Next expected packet " << productionSeg << std::endl;
}
return;
}
- if(receivedSeg > expected_seg_){
- for(uint32_t i = expected_seg_; i < receivedSeg; i++){
+ if (receivedSeg > expected_seg_) {
+ for (uint32_t i = expected_seg_; i < receivedSeg; i++) {
std::cout << "[LOSS] lost packet " << i << std::endl;
lost_packets_.insert(i);
}
expected_seg_ = receivedSeg + 1;
return;
- }else if (receivedSeg < expected_seg_){
+ } else if (receivedSeg < expected_seg_) {
auto it = lost_packets_.find(receivedSeg);
- if(it != lost_packets_.end()){
+ if (it != lost_packets_.end()) {
std::cout << "[RECOVER] recovered packet " << receivedSeg << std::endl;
lost_packets_.erase(it);
- }else{
- std::cout << "[OUT OF ORDER] recevied " << receivedSeg << " expedted " <<
- expected_seg_ << std::endl;
+ } else {
+ std::cout << "[OUT OF ORDER] recevied " << receivedSeg << " expedted "
+ << expected_seg_ << std::endl;
}
return;
}
@@ -379,28 +382,22 @@ class HIperfClient {
if (!configuration_.rtc_) {
ret = consumer_socket_->setSocketOption(
- ConsumerCallbacksOptions::CONTENT_RETRIEVED,
- (ConsumerContentCallback)std::bind(
- &HIperfClient::processPayload, this, std::placeholders::_1,
- std::placeholders::_2, std::placeholders::_3));
+ ConsumerCallbacksOptions::READ_CALLBACK, callback_);
} else {
ret = consumer_socket_->setSocketOption(
- ConsumerCallbacksOptions::CONTENT_RETRIEVED,
- (ConsumerContentCallback)std::bind(
- &HIperfClient::processPayloadRtc, this, std::placeholders::_1,
- std::placeholders::_2, std::placeholders::_3));
+ ConsumerCallbacksOptions::READ_CALLBACK, rtc_callback_);
}
if (ret == SOCKET_OPTION_NOT_SET) {
return ERROR_SETUP;
}
- if(configuration_.rtc_){
+ if (configuration_.rtc_) {
ret = consumer_socket_->setSocketOption(
ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT,
(ConsumerContentObjectCallback)std::bind(
&HIperfClient::checkReceivedRtcContent, this,
- std::placeholders::_1, std::placeholders::_2));
+ std::placeholders::_1, std::placeholders::_2));
if (ret == SOCKET_OPTION_NOT_SET) {
return ERROR_SETUP;
}
@@ -437,14 +434,103 @@ class HIperfClient {
});
t_download_ = t_stats_ = std::chrono::steady_clock::now();
- consumer_socket_->asyncConsume(configuration_.name,
- configuration_.receive_buffer);
+ consumer_socket_->asyncConsume(configuration_.name);
io_service_.run();
return ERROR_SUCCESS;
}
private:
+ class RTCCallback : public ConsumerSocket::ReadCallback {
+ static constexpr std::size_t mtu = 1500;
+
+ public:
+ RTCCallback(HIperfClient &hiperf_client) : client_(hiperf_client) {
+ client_.configuration_.receive_buffer = utils::MemBuf::create(mtu);
+ }
+
+ bool isBufferMovable() noexcept override { return false; }
+
+ void getReadBuffer(uint8_t **application_buffer,
+ size_t *max_length) override {
+ *application_buffer =
+ client_.configuration_.receive_buffer->writableData();
+ *max_length = mtu;
+ }
+
+ void readDataAvailable(std::size_t length) noexcept override {
+ // Do nothing
+ return;
+ }
+
+ size_t maxBufferSize() const override { return mtu; }
+
+ void readError(const std::error_code ec) noexcept override {
+ std::cerr << "Error while reading from RTC socket" << std::endl;
+ }
+
+ void readSuccess(std::size_t total_size) noexcept override {
+ std::cout << "Data successfully read" << std::endl;
+ }
+
+ private:
+ HIperfClient &client_;
+ };
+
+ class Callback : public ConsumerSocket::ReadCallback {
+ static constexpr std::size_t read_size = 16 * 1024;
+
+ public:
+ Callback(HIperfClient &hiperf_client) : client_(hiperf_client) {}
+
+ bool isBufferMovable() noexcept override { return true; }
+
+ void getReadBuffer(uint8_t **application_buffer,
+ size_t *max_length) override {
+ // Not used
+ }
+
+ void readDataAvailable(std::size_t length) noexcept override {
+ // Do nothing
+ return;
+ }
+
+ void readBufferAvailable(
+ std::unique_ptr<utils::MemBuf> &&buffer) noexcept override {
+ if (client_.configuration_.receive_buffer) {
+ client_.configuration_.receive_buffer->prependChain(std::move(buffer));
+ } else {
+ client_.configuration_.receive_buffer = std::move(buffer);
+ }
+ }
+
+ size_t maxBufferSize() const override { return read_size; }
+
+ void readError(const std::error_code ec) noexcept override {
+ std::cerr << "Error " << ec.message() << " while reading from socket"
+ << std::endl;
+ }
+
+ void readSuccess(std::size_t total_size) noexcept override {
+ Time t2 = std::chrono::steady_clock::now();
+ TimeDuration dt =
+ std::chrono::duration_cast<TimeDuration>(t2 - client_.t_download_);
+ long usec = (long)dt.count();
+
+ std::cout << "Content retrieved. Size: " << total_size << " [Bytes]"
+ << std::endl;
+
+ std::cerr << "Elapsed Time: " << usec / 1000000.0 << " seconds -- "
+ << (total_size * 8) * 1.0 / usec * 1.0 << " [Mbps]"
+ << std::endl;
+
+ client_.io_service_.stop();
+ }
+
+ private:
+ HIperfClient &client_;
+ };
+
ClientConfiguration configuration_;
Time t_stats_;
Time t_download_;
@@ -455,8 +541,14 @@ class HIperfClient {
std::unique_ptr<ConsumerSocket> consumer_socket_;
uint32_t expected_seg_;
std::unordered_set<uint32_t> lost_packets_;
+ RTCCallback *rtc_callback_;
+ Callback *callback_;
};
+/**
+ * Hiperf server class: configure and setup an hicn producer following the
+ * ServerConfiguration.
+ */
class HIperfServer {
const std::size_t log2_content_object_buffer_size = 8;
@@ -800,7 +892,6 @@ void usage() {
"receiving the correct data. This is an RTC specific option, to be "
"used with the -R (default false)"
<< std::endl;
-
}
int main(int argc, char *argv[]) {
@@ -899,7 +990,7 @@ int main(int argc, char *argv[]) {
options = 1;
break;
}
- case 't':{
+ case 't': {
client_configuration.test_mode_ = true;
options = 1;
break;