summaryrefslogtreecommitdiffstats
path: root/utils
diff options
context:
space:
mode:
authorMauro Sardara <msardara@cisco.com>2019-04-03 10:03:56 +0200
committerMauro Sardara <msardara@cisco.com>2019-04-15 11:37:30 +0200
commitc365689250216861fd7727203ee6ba1049ad5778 (patch)
tree97f1f3d1a6cb7314f1292d97be6d8e8e06cc998b /utils
parentd8ce6d98a2a726393655bd71eb81b8ef5222d6ba (diff)
[HICN-155] Consumer socket allows now to read N bytes from the network, where N is defined by the application.
Change-Id: Ib20309b40e43e4c0db09b9b484e18cd2e3ebf581 Signed-off-by: Mauro Sardara <msardara@cisco.com>
Diffstat (limited to 'utils')
-rw-r--r--utils/src/hiperf.cc211
-rw-r--r--utils/src/ping_server.cc115
2 files changed, 211 insertions, 115 deletions
diff --git a/utils/src/hiperf.cc b/utils/src/hiperf.cc
index 7594d1f94..ddc79d520 100644
--- a/utils/src/hiperf.cc
+++ b/utils/src/hiperf.cc
@@ -16,14 +16,15 @@
#include <hicn/transport/interfaces/rtc_socket_producer.h>
#include <hicn/transport/interfaces/socket_consumer.h>
#include <hicn/transport/interfaces/socket_producer.h>
+#include <hicn/transport/utils/identity.h>
#ifndef _WIN32
#include <hicn/transport/utils/daemonizator.h>
#endif
#include <hicn/transport/utils/literals.h>
-#include <unordered_set>
#include <fstream>
#include <iomanip>
+#include <unordered_set>
#ifdef __linux__
#include <mcheck.h>
@@ -45,6 +46,9 @@ namespace interface {
using CryptoSuite = utils::CryptoSuite;
using Identity = utils::Identity;
+/**
+ * Container for command line configuration for hiperf client.
+ */
struct ClientConfiguration {
ClientConfiguration()
: name("b001::abcd", 0),
@@ -54,7 +58,7 @@ struct ClientConfiguration {
window(-1),
virtual_download(true),
producer_certificate("/tmp/rsa_certificate.pem"),
- receive_buffer(std::make_shared<std::vector<uint8_t>>()),
+ receive_buffer(nullptr),
download_size(0),
report_interval_milliseconds_(1000),
rtc_(false),
@@ -67,7 +71,7 @@ struct ClientConfiguration {
double window;
bool virtual_download;
std::string producer_certificate;
- std::shared_ptr<std::vector<uint8_t>> receive_buffer;
+ std::shared_ptr<utils::MemBuf> receive_buffer;
std::size_t download_size;
std::uint32_t report_interval_milliseconds_;
TransportProtocolAlgorithms transport_protocol_;
@@ -75,6 +79,9 @@ struct ClientConfiguration {
bool test_mode_;
};
+/**
+ * Class for handling the production rate for the RTC producer.
+ */
class Rate {
public:
Rate() : rate_kbps_(0) {}
@@ -110,6 +117,9 @@ class Rate {
float rate_kbps_;
};
+/**
+ * Container for command line configuration for hiperf server.
+ */
struct ServerConfiguration {
ServerConfiguration()
: name("b001::abcd/64"),
@@ -147,10 +157,23 @@ struct ServerConfiguration {
std::size_t payload_size_;
};
+/**
+ * Forward declaration of client Read callbacks.
+ */
+class RTCCallback;
+class Callback;
+
+/**
+ * Hiperf client class: configure and setup an hicn consumer following the
+ * ClientConfiguration.
+ */
class HIperfClient {
typedef std::chrono::time_point<std::chrono::steady_clock> Time;
typedef std::chrono::microseconds TimeDuration;
+ friend class RTCCallback;
+ friend class Callback;
+
public:
HIperfClient(const ClientConfiguration &conf)
: configuration_(conf),
@@ -158,75 +181,55 @@ class HIperfClient {
old_bytes_value_(0),
signals_(io_service_, SIGINT),
expected_seg_(0),
- lost_packets_(std::unordered_set<uint32_t>()) {}
-
- void processPayload(ConsumerSocket &c, std::size_t bytes_transferred,
- const std::error_code &ec) {
- Time t2 = std::chrono::steady_clock::now();
- TimeDuration dt =
- std::chrono::duration_cast<TimeDuration>(t2 - t_download_);
- long usec = (long)dt.count();
-
- std::cout << "Content retrieved. Size: " << bytes_transferred << " [Bytes]"
- << std::endl;
-
- std::cerr << "Elapsed Time: " << usec / 1000000.0 << " seconds -- "
- << (bytes_transferred * 8) * 1.0 / usec * 1.0 << " [Mbps]"
- << std::endl;
-
- io_service_.stop();
- }
-
- void processPayloadRtc(ConsumerSocket &c, std::size_t bytes_transferred,
- const std::error_code &ec) {
- configuration_.receive_buffer->clear();
- }
+ lost_packets_(std::unordered_set<uint32_t>()),
+ rtc_callback_(configuration_.rtc_ ? new RTCCallback(*this) : nullptr),
+ callback_(configuration_.rtc_ ? nullptr : new Callback(*this)) {}
void checkReceivedRtcContent(ConsumerSocket &c,
- const ContentObject &contentObject) {
- if(!configuration_.test_mode_)
- return;
+ const ContentObject &contentObject) {
+ if (!configuration_.test_mode_) return;
uint32_t receivedSeg = contentObject.getName().getSuffix();
auto payload = contentObject.getPayload();
- if((uint32_t)payload->length() == 8){ //8 is the size of the NACK payload
+ if ((uint32_t)payload->length() == 8) { // 8 is the size of the NACK
+ // payload
uint32_t *payloadPtr = (uint32_t *)payload->data();
uint32_t productionSeg = *(payloadPtr);
uint32_t productionRate = *(++payloadPtr);
- if(productionRate == 0){
- std::cout << "[STOP] producer is not producing content"
- << std::endl;
+ if (productionRate == 0) {
+ std::cout << "[STOP] producer is not producing content" << std::endl;
return;
}
- if(receivedSeg < productionSeg){
- std::cout << "[OUT OF SYNCH] received NACK for " << receivedSeg <<
- ". Next expected packet " << productionSeg + 1 << std::endl;
+ if (receivedSeg < productionSeg) {
+ std::cout << "[OUT OF SYNCH] received NACK for " << receivedSeg
+ << ". Next expected packet " << productionSeg + 1
+ << std::endl;
expected_seg_ = productionSeg;
- } else if(receivedSeg > productionSeg){
- std::cout << "[WINDOW TO LARGE] received NACK for " << receivedSeg <<
- ". Next expected packet " << productionSeg << std::endl;
+ } else if (receivedSeg > productionSeg) {
+ std::cout << "[WINDOW TO LARGE] received NACK for " << receivedSeg
+ << ". Next expected packet " << productionSeg << std::endl;
}
return;
}
- if(receivedSeg > expected_seg_){
- for(uint32_t i = expected_seg_; i < receivedSeg; i++){
+ if (receivedSeg > expected_seg_) {
+ for (uint32_t i = expected_seg_; i < receivedSeg; i++) {
std::cout << "[LOSS] lost packet " << i << std::endl;
lost_packets_.insert(i);
}
expected_seg_ = receivedSeg + 1;
return;
- }else if (receivedSeg < expected_seg_){
+ } else if (receivedSeg < expected_seg_) {
auto it = lost_packets_.find(receivedSeg);
- if(it != lost_packets_.end()){
+ if (it != lost_packets_.end()) {
std::cout << "[RECOVER] recovered packet " << receivedSeg << std::endl;
lost_packets_.erase(it);
- }else{
- std::cout << "[OUT OF ORDER] recevied " << receivedSeg << " expedted " <<
- expected_seg_ << std::endl;
+ } else {
+ std::cout << "[OUT OF ORDER] recevied " << receivedSeg << " expedted "
+ << expected_seg_ << std::endl;
}
return;
}
@@ -379,28 +382,22 @@ class HIperfClient {
if (!configuration_.rtc_) {
ret = consumer_socket_->setSocketOption(
- ConsumerCallbacksOptions::CONTENT_RETRIEVED,
- (ConsumerContentCallback)std::bind(
- &HIperfClient::processPayload, this, std::placeholders::_1,
- std::placeholders::_2, std::placeholders::_3));
+ ConsumerCallbacksOptions::READ_CALLBACK, callback_);
} else {
ret = consumer_socket_->setSocketOption(
- ConsumerCallbacksOptions::CONTENT_RETRIEVED,
- (ConsumerContentCallback)std::bind(
- &HIperfClient::processPayloadRtc, this, std::placeholders::_1,
- std::placeholders::_2, std::placeholders::_3));
+ ConsumerCallbacksOptions::READ_CALLBACK, rtc_callback_);
}
if (ret == SOCKET_OPTION_NOT_SET) {
return ERROR_SETUP;
}
- if(configuration_.rtc_){
+ if (configuration_.rtc_) {
ret = consumer_socket_->setSocketOption(
ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT,
(ConsumerContentObjectCallback)std::bind(
&HIperfClient::checkReceivedRtcContent, this,
- std::placeholders::_1, std::placeholders::_2));
+ std::placeholders::_1, std::placeholders::_2));
if (ret == SOCKET_OPTION_NOT_SET) {
return ERROR_SETUP;
}
@@ -437,14 +434,103 @@ class HIperfClient {
});
t_download_ = t_stats_ = std::chrono::steady_clock::now();
- consumer_socket_->asyncConsume(configuration_.name,
- configuration_.receive_buffer);
+ consumer_socket_->asyncConsume(configuration_.name);
io_service_.run();
return ERROR_SUCCESS;
}
private:
+ class RTCCallback : public ConsumerSocket::ReadCallback {
+ static constexpr std::size_t mtu = 1500;
+
+ public:
+ RTCCallback(HIperfClient &hiperf_client) : client_(hiperf_client) {
+ client_.configuration_.receive_buffer = utils::MemBuf::create(mtu);
+ }
+
+ bool isBufferMovable() noexcept override { return false; }
+
+ void getReadBuffer(uint8_t **application_buffer,
+ size_t *max_length) override {
+ *application_buffer =
+ client_.configuration_.receive_buffer->writableData();
+ *max_length = mtu;
+ }
+
+ void readDataAvailable(std::size_t length) noexcept override {
+ // Do nothing
+ return;
+ }
+
+ size_t maxBufferSize() const override { return mtu; }
+
+ void readError(const std::error_code ec) noexcept override {
+ std::cerr << "Error while reading from RTC socket" << std::endl;
+ }
+
+ void readSuccess(std::size_t total_size) noexcept override {
+ std::cout << "Data successfully read" << std::endl;
+ }
+
+ private:
+ HIperfClient &client_;
+ };
+
+ class Callback : public ConsumerSocket::ReadCallback {
+ static constexpr std::size_t read_size = 16 * 1024;
+
+ public:
+ Callback(HIperfClient &hiperf_client) : client_(hiperf_client) {}
+
+ bool isBufferMovable() noexcept override { return true; }
+
+ void getReadBuffer(uint8_t **application_buffer,
+ size_t *max_length) override {
+ // Not used
+ }
+
+ void readDataAvailable(std::size_t length) noexcept override {
+ // Do nothing
+ return;
+ }
+
+ void readBufferAvailable(
+ std::unique_ptr<utils::MemBuf> &&buffer) noexcept override {
+ if (client_.configuration_.receive_buffer) {
+ client_.configuration_.receive_buffer->prependChain(std::move(buffer));
+ } else {
+ client_.configuration_.receive_buffer = std::move(buffer);
+ }
+ }
+
+ size_t maxBufferSize() const override { return read_size; }
+
+ void readError(const std::error_code ec) noexcept override {
+ std::cerr << "Error " << ec.message() << " while reading from socket"
+ << std::endl;
+ }
+
+ void readSuccess(std::size_t total_size) noexcept override {
+ Time t2 = std::chrono::steady_clock::now();
+ TimeDuration dt =
+ std::chrono::duration_cast<TimeDuration>(t2 - client_.t_download_);
+ long usec = (long)dt.count();
+
+ std::cout << "Content retrieved. Size: " << total_size << " [Bytes]"
+ << std::endl;
+
+ std::cerr << "Elapsed Time: " << usec / 1000000.0 << " seconds -- "
+ << (total_size * 8) * 1.0 / usec * 1.0 << " [Mbps]"
+ << std::endl;
+
+ client_.io_service_.stop();
+ }
+
+ private:
+ HIperfClient &client_;
+ };
+
ClientConfiguration configuration_;
Time t_stats_;
Time t_download_;
@@ -455,8 +541,14 @@ class HIperfClient {
std::unique_ptr<ConsumerSocket> consumer_socket_;
uint32_t expected_seg_;
std::unordered_set<uint32_t> lost_packets_;
+ RTCCallback *rtc_callback_;
+ Callback *callback_;
};
+/**
+ * Hiperf server class: configure and setup an hicn producer following the
+ * ServerConfiguration.
+ */
class HIperfServer {
const std::size_t log2_content_object_buffer_size = 8;
@@ -800,7 +892,6 @@ void usage() {
"receiving the correct data. This is an RTC specific option, to be "
"used with the -R (default false)"
<< std::endl;
-
}
int main(int argc, char *argv[]) {
@@ -899,7 +990,7 @@ int main(int argc, char *argv[]) {
options = 1;
break;
}
- case 't':{
+ case 't': {
client_configuration.test_mode_ = true;
options = 1;
break;
diff --git a/utils/src/ping_server.cc b/utils/src/ping_server.cc
index d6614303a..049ab3ac5 100644
--- a/utils/src/ping_server.cc
+++ b/utils/src/ping_server.cc
@@ -19,6 +19,7 @@
#else
#include <openssl/applink.c>
#endif
+#include <hicn/transport/utils/identity.h>
#include <hicn/transport/utils/signer.h>
#include <hicn/transport/utils/string_tokenizer.h>
@@ -43,16 +44,22 @@ utils::Identity setProducerIdentity(std::string keystore_name,
class CallbackContainer {
const std::size_t log2_content_object_buffer_size = 12;
-public:
+ public:
CallbackContainer(const Name &prefix, uint32_t object_size, bool verbose,
bool dump, bool quite, bool flags, bool reset, uint8_t ttl,
utils::Identity *identity, bool sign, uint32_t lifetime)
: buffer_(object_size, 'X'),
content_objects_((std::uint32_t)(1 << log2_content_object_buffer_size)),
mask_((std::uint16_t)(1 << log2_content_object_buffer_size) - 1),
- content_objects_index_(0), verbose_(verbose), dump_(dump),
- quite_(quite), flags_(flags), reset_(reset), ttl_(ttl),
- identity_(identity), sign_(sign) {
+ content_objects_index_(0),
+ verbose_(verbose),
+ dump_(dump),
+ quite_(quite),
+ flags_(flags),
+ reset_(reset),
+ ttl_(ttl),
+ identity_(identity),
+ sign_(sign) {
core::Packet::Format format;
if (prefix.getAddressFamily() == AF_INET) {
@@ -114,7 +121,7 @@ public:
content_object->setAck();
} else if (interest.testAck()) {
content_object->setAck();
- } // here I may need to handle the FIN flag;
+ } // here I may need to handle the FIN flag;
} else if (reset_) {
content_object->setRst();
}
@@ -136,8 +143,7 @@ public:
std::cout << "-----------------------" << std::endl;
}
- if (!quite_)
- std::cout << std::endl;
+ if (!quite_) std::cout << std::endl;
if (sign_) {
identity_->getSigner().sign(*content_object);
@@ -147,7 +153,7 @@ public:
}
}
-private:
+ private:
std::string buffer_;
std::vector<std::shared_ptr<ContentObject>> content_objects_;
std::uint16_t mask_;
@@ -222,51 +228,51 @@ int main(int argc, char **argv) {
while ((opt = getopt(argc, argv, "s:n:t:l:qfrVDHk:p:")) != -1) {
#endif
switch (opt) {
- case 's':
- object_size = std::stoi(optarg);
- break;
- case 'n':
- name_prefix = optarg;
- break;
- case 't':
- ttl = (uint8_t)std::stoi(optarg);
- break;
- case 'l':
- data_lifetime = std::stoi(optarg);
- break;
- case 'V':
- verbose = true;
- break;
- case 'D':
- dump = true;
- break;
- case 'q':
- verbose = false;
- dump = false;
- quite = true;
- break;
+ case 's':
+ object_size = std::stoi(optarg);
+ break;
+ case 'n':
+ name_prefix = optarg;
+ break;
+ case 't':
+ ttl = (uint8_t)std::stoi(optarg);
+ break;
+ case 'l':
+ data_lifetime = std::stoi(optarg);
+ break;
+ case 'V':
+ verbose = true;
+ break;
+ case 'D':
+ dump = true;
+ break;
+ case 'q':
+ verbose = false;
+ dump = false;
+ quite = true;
+ break;
#ifndef _WIN32
- case 'd':
- daemon = true;
- break;
+ case 'd':
+ daemon = true;
+ break;
#endif
- case 'f':
- flags = true;
- break;
- case 'r':
- reset = true;
- break;
- case 'k':
- keystore_path = optarg;
- sign = true;
- break;
- case 'p':
- keystore_password = optarg;
- break;
- case 'H':
- default:
- help();
- exit(EXIT_FAILURE);
+ case 'f':
+ flags = true;
+ break;
+ case 'r':
+ reset = true;
+ break;
+ case 'k':
+ keystore_path = optarg;
+ sign = true;
+ break;
+ case 'p':
+ keystore_password = optarg;
+ break;
+ case 'H':
+ default:
+ help();
+ exit(EXIT_FAILURE);
}
}
@@ -282,8 +288,7 @@ int main(int argc, char **argv) {
std::string ip_address = tokenizer.nextToken();
Name n(ip_address);
- if (object_size > 1350)
- object_size = 1350;
+ if (object_size > 1350) object_size = 1350;
CallbackContainer *stubs;
utils::Identity identity = setProducerIdentity(
@@ -327,9 +332,9 @@ int main(int argc, char **argv) {
return 0;
}
-} // namespace interface
+} // namespace interface
-} // end namespace transport
+} // end namespace transport
int main(int argc, char **argv) {
return transport::interface::main(argc, argv);