summaryrefslogtreecommitdiffstats
path: root/utils/src/hiperf.cc
diff options
context:
space:
mode:
Diffstat (limited to 'utils/src/hiperf.cc')
-rw-r--r--utils/src/hiperf.cc635
1 files changed, 422 insertions, 213 deletions
diff --git a/utils/src/hiperf.cc b/utils/src/hiperf.cc
index f4764e096..203c2acb9 100644
--- a/utils/src/hiperf.cc
+++ b/utils/src/hiperf.cc
@@ -16,19 +16,16 @@
#include <hicn/transport/config.h>
#include <hicn/transport/core/content_object.h>
#include <hicn/transport/core/interest.h>
-#include <hicn/transport/interfaces/rtc_socket_producer.h>
+#include <hicn/transport/interfaces/global_conf_interface.h>
+#include <hicn/transport/interfaces/p2psecure_socket_consumer.h>
+#include <hicn/transport/interfaces/p2psecure_socket_producer.h>
#include <hicn/transport/interfaces/socket_consumer.h>
#include <hicn/transport/interfaces/socket_producer.h>
-#include <hicn/transport/security/identity.h>
-#include <hicn/transport/security/signer.h>
+#include <hicn/transport/auth/identity.h>
+#include <hicn/transport/auth/signer.h>
#include <hicn/transport/utils/chrono_typedefs.h>
#include <hicn/transport/utils/literals.h>
-#ifdef SECURE_HICNTRANSPORT
-#include <hicn/transport/interfaces/p2psecure_socket_consumer.h>
-#include <hicn/transport/interfaces/p2psecure_socket_producer.h>
-#endif
-
#ifndef _WIN32
#include <hicn/transport/utils/daemonizator.h>
#endif
@@ -37,6 +34,7 @@
#include <cmath>
#include <fstream>
#include <iomanip>
+#include <sstream>
#include <unordered_set>
#ifdef __linux__
@@ -50,7 +48,6 @@
#endif
namespace transport {
-
namespace interface {
#ifndef ERROR_SUCCESS
@@ -59,13 +56,50 @@ namespace interface {
#define ERROR_SETUP -5
#define MIN_PROBE_SEQ 0xefffffff
+struct packet_t {
+ uint64_t timestamp;
+ uint32_t size;
+};
+
+inline uint64_t _ntohll(const uint64_t *input) {
+ uint64_t return_val;
+ uint8_t *tmp = (uint8_t *)&return_val;
+
+ tmp[0] = *input >> 56;
+ tmp[1] = *input >> 48;
+ tmp[2] = *input >> 40;
+ tmp[3] = *input >> 32;
+ tmp[4] = *input >> 24;
+ tmp[5] = *input >> 16;
+ tmp[6] = *input >> 8;
+ tmp[7] = *input >> 0;
+
+ return return_val;
+}
+
+inline uint64_t _htonll(const uint64_t *input) { return (_ntohll(input)); }
+
+struct nack_packet_t {
+ uint64_t timestamp;
+ uint32_t prod_rate;
+ uint32_t prod_seg;
+
+ inline uint64_t getTimestamp() const { return _ntohll(&timestamp); }
+ inline void setTimestamp(uint64_t time) { timestamp = _htonll(&time); }
+
+ inline uint32_t getProductionRate() const { return ntohl(prod_rate); }
+ inline void setProductionRate(uint32_t rate) { prod_rate = htonl(rate); }
+
+ inline uint32_t getProductionSegement() const { return ntohl(prod_seg); }
+ inline void setProductionSegement(uint32_t seg) { prod_seg = htonl(seg); }
+};
+
/**
* Container for command line configuration for hiperf client.
*/
struct ClientConfiguration {
ClientConfiguration()
: name("b001::abcd", 0),
- verify(false),
beta(-1.f),
drop_factor(-1.f),
window(-1),
@@ -78,15 +112,11 @@ struct ClientConfiguration {
transport_protocol_(CBR),
rtc_(false),
test_mode_(false),
-#ifdef SECURE_HICNTRANSPORT
secure_(false),
-#endif
producer_prefix_(),
- interest_lifetime_(500) {
- }
+ interest_lifetime_(500) {}
Name name;
- bool verify;
double beta;
double drop_factor;
double window;
@@ -99,9 +129,7 @@ struct ClientConfiguration {
TransportProtocolAlgorithms transport_protocol_;
bool rtc_;
bool test_mode_;
-#ifdef SECURE_HICNTRANSPORT
bool secure_;
-#endif
Prefix producer_prefix_;
uint32_t interest_lifetime_;
};
@@ -156,21 +184,19 @@ struct ServerConfiguration {
sign(false),
content_lifetime(600000000_U32),
download_size(20 * 1024 * 1024),
- hash_algorithm(utils::CryptoHashType::SHA_256),
+ hash_algorithm(auth::CryptoHashType::SHA_256),
keystore_name(""),
passphrase(""),
keystore_password("cisco"),
multiphase_produce_(false),
rtc_(false),
interactive_(false),
+ trace_based_(false),
+ trace_index_(0),
+ trace_file_(nullptr),
production_rate_(std::string("2048kbps")),
- payload_size_(1400)
-#ifdef SECURE_HICNTRANSPORT
- ,
- secure_(false)
-#endif
- {
- }
+ payload_size_(1400),
+ secure_(false) {}
Prefix name;
bool virtual_producer;
@@ -179,18 +205,20 @@ struct ServerConfiguration {
bool sign;
std::uint32_t content_lifetime;
std::uint32_t download_size;
- utils::CryptoHashType hash_algorithm;
+ auth::CryptoHashType hash_algorithm;
std::string keystore_name;
std::string passphrase;
std::string keystore_password;
bool multiphase_produce_;
bool rtc_;
bool interactive_;
+ bool trace_based_;
+ std::uint32_t trace_index_;
+ char *trace_file_;
Rate production_rate_;
std::size_t payload_size_;
-#ifdef SECURE_HICNTRANSPORT
bool secure_;
-#endif
+ std::vector<struct packet_t> trace_;
};
/**
@@ -217,18 +245,26 @@ class HIperfClient {
: configuration_(conf),
total_duration_milliseconds_(0),
old_bytes_value_(0),
+ old_interest_tx_value_(0),
+ old_fec_interest_tx_value_(0),
+ old_fec_data_rx_value_(0),
+ old_lost_data_value_(0),
+ old_bytes_recovered_value_(0),
+ old_retx_value_(0),
+ old_sent_int_value_(0),
+ old_received_nacks_value_(0),
+ avg_data_delay_(0),
+ delay_sample_(0),
+ received_bytes_(0),
+ received_data_pkt_(0),
signals_(io_service_),
expected_seg_(0),
lost_packets_(std::unordered_set<uint32_t>()),
- rtc_callback_(configuration_.rtc_ ? new RTCCallback(*this) : nullptr),
- callback_(configuration_.rtc_ ? nullptr : new Callback(*this)),
- key_callback_(configuration_.rtc_ ? nullptr : new KeyCallback(*this)) {}
-
- ~HIperfClient() {
- delete callback_;
- delete key_callback_;
- delete rtc_callback_;
- }
+ rtc_callback_(*this),
+ callback_(*this),
+ key_callback_(*this) {}
+
+ ~HIperfClient() {}
void checkReceivedRtcContent(ConsumerSocket &c,
const ContentObject &contentObject) {
@@ -237,11 +273,15 @@ class HIperfClient {
uint32_t receivedSeg = contentObject.getName().getSuffix();
auto payload = contentObject.getPayload();
- if ((uint32_t)payload->length() == 8) { // 8 is the size of the NACK
- // payload
- uint32_t *payloadPtr = (uint32_t *)payload->data();
- uint32_t productionSeg = *(payloadPtr);
- uint32_t productionRate = *(++payloadPtr);
+ if ((uint32_t)payload->length() == 16) { // 16 is the size of the NACK
+ struct nack_packet_t *nack_pkt =
+ (struct nack_packet_t *)contentObject.getPayload()->data();
+ uint32_t productionSeg = nack_pkt->getProductionSegement();
+ uint32_t productionRate = nack_pkt->getProductionRate();
+
+ // uint32_t *payloadPtr = (uint32_t *)payload->data();
+ // uint32_t productionSeg = *(payloadPtr);
+ // uint32_t productionRate = *(++payloadPtr);
if (productionRate == 0) {
std::cout << "[STOP] producer is not producing content" << std::endl;
@@ -254,7 +294,7 @@ class HIperfClient {
<< std::endl;
expected_seg_ = productionSeg;
} else if (receivedSeg > productionSeg && receivedSeg < MIN_PROBE_SEQ) {
- std::cout << "[WINDOW TO LARGE] received NACK for " << receivedSeg
+ std::cout << "[WINDOW TOO LARGE] received NACK for " << receivedSeg
<< ". Next expected packet " << productionSeg << std::endl;
} else if (receivedSeg >= MIN_PROBE_SEQ) {
std::cout << "[PROBE] probe number = " << receivedSeg << std::endl;
@@ -262,7 +302,24 @@ class HIperfClient {
return;
}
- if (receivedSeg > expected_seg_) {
+ received_bytes_ += (payload->length() - 12);
+ received_data_pkt_++;
+
+ // collecting delay stats. Just for performance testing
+ // XXX we should probably get the transport header (12) somewhere
+ uint64_t *senderTimeStamp = (uint64_t *)(payload->data() + 12);
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::system_clock::now().time_since_epoch())
+ .count();
+ double new_delay = (double)(now - *senderTimeStamp);
+ if (*senderTimeStamp > now)
+ new_delay = -1 * (double)(*senderTimeStamp - now);
+
+ delay_sample_++;
+ avg_data_delay_ =
+ avg_data_delay_ + (new_delay - avg_data_delay_) / delay_sample_;
+
+ if (receivedSeg > expected_seg_ && expected_seg_ != 0) {
for (uint32_t i = expected_seg_; i < receivedSeg; i++) {
std::cout << "[LOSS] lost packet " << i << std::endl;
lost_packets_.insert(i);
@@ -283,24 +340,12 @@ class HIperfClient {
expected_seg_ = receivedSeg + 1;
}
- bool verifyData(ConsumerSocket &c, const ContentObject &contentObject) {
- if (contentObject.getPayloadType() == PayloadType::CONTENT_OBJECT) {
- std::cout << "VERIFY CONTENT" << std::endl;
- } else if (contentObject.getPayloadType() == PayloadType::MANIFEST) {
- std::cout << "VERIFY MANIFEST" << std::endl;
- }
-
- return true;
- }
-
void processLeavingInterest(ConsumerSocket &c, const Interest &interest) {}
void handleTimerExpiration(ConsumerSocket &c,
const TransportStatistics &stats) {
- if (configuration_.rtc_) return;
-
const char separator = ' ';
- const int width = 20;
+ const int width = 15;
utils::TimePoint t2 = utils::SteadyClock::now();
auto exact_duration =
@@ -323,28 +368,125 @@ class HIperfClient {
std::stringstream window;
window << stats.getAverageWindowSize() << std::setfill(separator)
- << "[Interest]";
+ << "[Int]";
std::stringstream avg_rtt;
- avg_rtt << stats.getAverageRtt() << std::setfill(separator) << "[us]";
-
- std::cout << std::left << std::setw(width) << "Interval";
- std::cout << std::left << std::setw(width) << "Transfer";
- std::cout << std::left << std::setw(width) << "Bandwidth";
- std::cout << std::left << std::setw(width) << "Retr";
- std::cout << std::left << std::setw(width) << "Cwnd";
- std::cout << std::left << std::setw(width) << "AvgRtt" << std::endl;
-
- std::cout << std::left << std::setw(width) << interval.str();
- std::cout << std::left << std::setw(width) << bytes_transferred.str();
- std::cout << std::left << std::setw(width) << bandwidth.str();
- std::cout << std::left << std::setw(width) << stats.getRetxCount();
- std::cout << std::left << std::setw(width) << window.str();
- std::cout << std::left << std::setw(width) << avg_rtt.str() << std::endl;
- std::cout << std::endl;
+ avg_rtt << stats.getAverageRtt() << std::setfill(separator) << "[ms]";
+ if (configuration_.rtc_) {
+ // we get rtc stats more often, thus we need ms in the interval
+ std::stringstream interval_ms;
+ interval_ms << total_duration_milliseconds_ << "-"
+ << total_duration_milliseconds_ + exact_duration.count();
+
+ std::stringstream lost_data;
+ lost_data << stats.getLostData() - old_lost_data_value_
+ << std::setfill(separator) << "[pkt]";
+
+ std::stringstream bytes_recovered_data;
+ bytes_recovered_data << stats.getBytesRecoveredData() -
+ old_bytes_recovered_value_
+ << std::setfill(separator) << "[pkt]";
+
+ std::stringstream data_delay;
+ data_delay << avg_data_delay_ << std::setfill(separator) << "[ms]";
+
+ std::stringstream received_data_pkt;
+ received_data_pkt << received_data_pkt_ << std::setfill(separator)
+ << "[pkt]";
+
+ std::stringstream goodput;
+ goodput << (received_bytes_ * 8.0) / (exact_duration.count()) / 1000.0
+ << std::setfill(separator) << "[Mbps]";
+
+ std::stringstream loss_rate;
+ loss_rate << std::fixed << std::setprecision(2)
+ << stats.getLossRatio() * 100.0 << std::setfill(separator)
+ << "[%]";
+
+ std::stringstream retx_sent;
+ retx_sent << stats.getRetxCount() - old_retx_value_
+ << std::setfill(separator) << "[pkt]";
+
+ std::stringstream interest_sent;
+ interest_sent << stats.getInterestTx() - old_sent_int_value_
+ << std::setfill(separator) << "[pkt]";
+
+ std::stringstream nacks;
+ nacks << stats.getReceivedNacks() - old_received_nacks_value_
+ << std::setfill(separator) << "[pkt]";
+
+ // statistics not yet available in the transport
+ // std::stringstream interest_fec_tx;
+ // interest_fec_tx << stats.getInterestFecTxCount() -
+ // old_fec_interest_tx_value_ << std::setfill(separator) << "[pkt]";
+ // std::stringstream bytes_fec_recv;
+ // bytes_fec_recv << stats.getBytesFecRecv() - old_fec_data_rx_value_
+ // << std::setfill(separator) << "[bytes]";
+ std::cout << std::left << std::setw(width) << "Interval";
+ std::cout << std::left << std::setw(width) << "RecvData";
+ std::cout << std::left << std::setw(width) << "Bandwidth";
+ std::cout << std::left << std::setw(width) << "Goodput";
+ std::cout << std::left << std::setw(width) << "LossRate";
+ std::cout << std::left << std::setw(width) << "Retr";
+ std::cout << std::left << std::setw(width) << "InterestSent";
+ std::cout << std::left << std::setw(width) << "ReceivedNacks";
+ std::cout << std::left << std::setw(width) << "SyncWnd";
+ std::cout << std::left << std::setw(width) << "MinRtt";
+ std::cout << std::left << std::setw(width) << "LostData";
+ std::cout << std::left << std::setw(width) << "RecoveredData";
+ std::cout << std::left << std::setw(width) << "State";
+ std::cout << std::left << std::setw(width) << "DataDelay" << std::endl;
+
+ std::cout << std::left << std::setw(width) << interval_ms.str();
+ std::cout << std::left << std::setw(width) << received_data_pkt.str();
+ std::cout << std::left << std::setw(width) << bandwidth.str();
+ std::cout << std::left << std::setw(width) << goodput.str();
+ std::cout << std::left << std::setw(width) << loss_rate.str();
+ std::cout << std::left << std::setw(width) << retx_sent.str();
+ std::cout << std::left << std::setw(width) << interest_sent.str();
+ std::cout << std::left << std::setw(width) << nacks.str();
+ std::cout << std::left << std::setw(width) << window.str();
+ std::cout << std::left << std::setw(width) << avg_rtt.str();
+ std::cout << std::left << std::setw(width) << lost_data.str();
+ std::cout << std::left << std::setw(width) << bytes_recovered_data.str();
+ std::cout << std::left << std::setw(width) << stats.getCCStatus();
+ std::cout << std::left << std::setw(width) << data_delay.str();
+ std::cout << std::endl;
+
+ // statistics not yet available in the transport
+ // std::cout << std::left << std::setw(width) << interest_fec_tx.str();
+ // std::cout << std::left << std::setw(width) << bytes_fec_recv.str();
+ } else {
+ std::cout << std::left << std::setw(width) << "Interval";
+ std::cout << std::left << std::setw(width) << "Transfer";
+ std::cout << std::left << std::setw(width) << "Bandwidth";
+ std::cout << std::left << std::setw(width) << "Retr";
+ std::cout << std::left << std::setw(width) << "Cwnd";
+ std::cout << std::left << std::setw(width) << "AvgRtt" << std::endl;
+
+ std::cout << std::left << std::setw(width) << interval.str();
+ std::cout << std::left << std::setw(width) << bytes_transferred.str();
+ std::cout << std::left << std::setw(width) << bandwidth.str();
+ std::cout << std::left << std::setw(width) << stats.getRetxCount();
+ std::cout << std::left << std::setw(width) << window.str();
+ std::cout << std::left << std::setw(width) << avg_rtt.str() << std::endl;
+ std::cout << std::endl;
+ }
total_duration_milliseconds_ += (uint32_t)exact_duration.count();
old_bytes_value_ = stats.getBytesRecv();
+ old_lost_data_value_ = stats.getLostData();
+ old_bytes_recovered_value_ = stats.getBytesRecoveredData();
+ old_fec_interest_tx_value_ = stats.getInterestFecTxCount();
+ old_fec_data_rx_value_ = stats.getBytesFecRecv();
+ old_retx_value_ = stats.getRetxCount();
+ old_sent_int_value_ = stats.getInterestTx();
+ old_received_nacks_value_ = stats.getReceivedNacks();
+ delay_sample_ = 0;
+ avg_data_delay_ = 0;
+ received_bytes_ = 0;
+ received_data_pkt_ = 0;
+
t_stats_ = utils::SteadyClock::now();
}
@@ -359,7 +501,6 @@ class HIperfClient {
configuration_.transport_protocol_ = CBR;
}
-#ifdef SECURE_HICNTRANSPORT
if (configuration_.secure_) {
consumer_socket_ = std::make_shared<P2PSecureConsumerSocket>(
RAAQM, configuration_.transport_protocol_);
@@ -373,12 +514,9 @@ class HIperfClient {
secure_consumer_socket.registerPrefix(configuration_.producer_prefix_);
}
} else {
-#endif
consumer_socket_ =
std::make_shared<ConsumerSocket>(configuration_.transport_protocol_);
-#ifdef SECURE_HICNTRANSPORT
}
-#endif
consumer_socket_->setSocketOption(
GeneralTransportOptions::INTEREST_LIFETIME,
@@ -421,34 +559,21 @@ class HIperfClient {
}
}
- if (configuration_.verify) {
- std::shared_ptr<utils::Verifier> verifier =
- std::make_shared<utils::Verifier>();
- PARCKeyId *key_id_;
-
- if (!configuration_.producer_certificate.empty()) {
- key_id_ = verifier->addKeyFromCertificate(
- configuration_.producer_certificate);
- if (key_id_ == nullptr) return ERROR_SETUP;
- }
-
- if (!configuration_.passphrase.empty()) {
- key_id_ = verifier->addKeyFromPassphrase(
- configuration_.passphrase, utils::CryptoSuite::HMAC_SHA256);
- if (key_id_ == nullptr) return ERROR_SETUP;
- }
-
+ if (!configuration_.producer_certificate.empty()) {
+ std::shared_ptr<auth::Verifier> verifier =
+ std::make_shared<auth::AsymmetricVerifier>(
+ configuration_.producer_certificate);
if (consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER,
- verifier) ==
- SOCKET_OPTION_NOT_SET) {
+ verifier) == SOCKET_OPTION_NOT_SET)
return ERROR_SETUP;
- }
}
- if (consumer_socket_->setSocketOption(
- GeneralTransportOptions::VERIFY_SIGNATURE, configuration_.verify) ==
- SOCKET_OPTION_NOT_SET) {
- return ERROR_SETUP;
+ if (!configuration_.passphrase.empty()) {
+ std::shared_ptr<auth::Verifier> verifier =
+ std::make_shared<auth::SymmetricVerifier>(configuration_.passphrase);
+ if (consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER,
+ verifier) == SOCKET_OPTION_NOT_SET)
+ return ERROR_SETUP;
}
ret = consumer_socket_->setSocketOption(
@@ -462,16 +587,11 @@ class HIperfClient {
}
if (!configuration_.rtc_) {
- /* key_callback_->setConsumer(consumer_socket_); */
- /* consumer_socket_->setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK,
- * key_callback_); */
- /* consumer_socket_->setSocketOption(GeneralTransportOptions::KEY_CONTENT,
- * true); */
ret = consumer_socket_->setSocketOption(
- ConsumerCallbacksOptions::READ_CALLBACK, callback_);
+ ConsumerCallbacksOptions::READ_CALLBACK, &callback_);
} else {
ret = consumer_socket_->setSocketOption(
- ConsumerCallbacksOptions::READ_CALLBACK, rtc_callback_);
+ ConsumerCallbacksOptions::READ_CALLBACK, &rtc_callback_);
}
if (ret == SOCKET_OPTION_NOT_SET) {
@@ -489,6 +609,13 @@ class HIperfClient {
}
}
+ if (configuration_.rtc_) {
+ std::shared_ptr<TransportStatistics> transport_stats;
+ consumer_socket_->getSocketOption(
+ OtherOptions::STATISTICS, (TransportStatistics **)&transport_stats);
+ transport_stats->setAlpha(0.0);
+ }
+
ret = consumer_socket_->setSocketOption(
ConsumerCallbacksOptions::STATS_SUMMARY,
(ConsumerTimerCallback)std::bind(&HIperfClient::handleTimerExpiration,
@@ -515,15 +642,15 @@ class HIperfClient {
std::cout << "Starting download of " << configuration_.name << std::endl;
signals_.add(SIGINT);
- signals_.async_wait([this](const std::error_code &, const int &) {
- consumer_socket_->stop();
- io_service_.stop();
- });
+ signals_.async_wait(
+ [this](const std::error_code &, const int &) { io_service_.stop(); });
t_download_ = t_stats_ = std::chrono::steady_clock::now();
consumer_socket_->asyncConsume(configuration_.name);
io_service_.run();
+ consumer_socket_->stop();
+
return ERROR_SUCCESS;
}
@@ -648,36 +775,6 @@ class HIperfClient {
std::cout << "Key size: " << total_size << " bytes" << std::endl;
}
- void readKey() {
- std::shared_ptr<utils::Verifier> verifier =
- std::make_shared<utils::Verifier>();
- verifier->addKeyFromPassphrase(*key_, utils::CryptoSuite::HMAC_SHA256);
-
- if (consumer_socket_) {
- consumer_socket_->setSocketOption(GeneralTransportOptions::KEY_CONTENT,
- false);
- consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER,
- verifier);
- } else {
- std::cout << "Consumer socket not set" << std::endl;
- return;
- }
-
- if (validateKey()) {
- std::cout << "Key has been authenticated" << std::endl;
- } else {
- std::cout << "Key could not be authenticated" << std::endl;
- return;
- }
-
- if (consumer_socket_->verifyKeyPackets()) {
- std::cout << "Signatures of key packets are valid" << std::endl;
- } else {
- std::cout << "Signatures of key packets are not valid" << std::endl;
- return;
- }
- }
-
void setConsumer(std::shared_ptr<ConsumerSocket> consumer_socket) {
consumer_socket_ = consumer_socket;
}
@@ -693,14 +790,31 @@ class HIperfClient {
Time t_download_;
uint32_t total_duration_milliseconds_;
uint64_t old_bytes_value_;
+ uint64_t old_interest_tx_value_;
+ uint64_t old_fec_interest_tx_value_;
+ uint64_t old_fec_data_rx_value_;
+ uint64_t old_lost_data_value_;
+ uint64_t old_bytes_recovered_value_;
+ uint32_t old_retx_value_;
+ uint32_t old_sent_int_value_;
+ uint32_t old_received_nacks_value_;
+
+ // IMPORTANT: to be used only for performance testing, when consumer and
+ // producer are synchronized. Used for rtc only at the moment
+ double avg_data_delay_;
+ uint32_t delay_sample_;
+
+ uint32_t received_bytes_;
+ uint32_t received_data_pkt_;
+
asio::io_service io_service_;
asio::signal_set signals_;
- std::shared_ptr<ConsumerSocket> consumer_socket_;
uint32_t expected_seg_;
std::unordered_set<uint32_t> lost_packets_;
- RTCCallback *rtc_callback_;
- Callback *callback_;
- KeyCallback *key_callback_;
+ RTCCallback rtc_callback_;
+ Callback callback_;
+ KeyCallback key_callback_;
+ std::shared_ptr<ConsumerSocket> consumer_socket_;
}; // namespace interface
/**
@@ -723,11 +837,11 @@ class HIperfServer {
#ifndef _WIN32
ptr_last_segment_(&last_segment_),
input_(io_service_),
- rtc_running_(false)
+ rtc_running_(false),
#else
- ptr_last_segment_(&last_segment_)
+ ptr_last_segment_(&last_segment_),
#endif
- {
+ flow_name_(configuration_.name.getName()) {
std::string buffer(configuration_.payload_size_, 'X');
std::cout << "Producing contents under name " << conf.name.getName()
<< std::endl;
@@ -738,9 +852,9 @@ class HIperfServer {
#endif
for (int i = 0; i < (1 << log2_content_object_buffer_size); i++) {
- content_objects_[i] = std::make_shared<ContentObject>(
- conf.name.getName(), HF_INET6_TCP, (const uint8_t *)buffer.data(),
- buffer.size());
+ content_objects_[i] = ContentObject::Ptr(
+ new ContentObject(conf.name.getName(), HF_INET6_TCP, 0,
+ (const uint8_t *)buffer.data(), buffer.size()));
content_objects_[i]->setLifetime(
default_values::content_object_expiry_time);
}
@@ -799,15 +913,16 @@ class HIperfServer {
produceContentAsync(p, interest.getName(), suffix);
}
- void produceContent(ProducerSocket &p, Name content_name, uint32_t suffix) {
+ void produceContent(ProducerSocket &p, const Name &content_name,
+ uint32_t suffix) {
auto b = utils::MemBuf::create(configuration_.download_size);
std::memset(b->writableData(), '?', configuration_.download_size);
b->append(configuration_.download_size);
uint32_t total;
utils::TimePoint t0 = utils::SteadyClock::now();
- total = p.produce(content_name, std::move(b),
- !configuration_.multiphase_produce_, suffix);
+ total = p.produceStream(content_name, std::move(b),
+ !configuration_.multiphase_produce_, suffix);
utils::TimePoint t1 = utils::SteadyClock::now();
std::cout
@@ -822,11 +937,6 @@ class HIperfServer {
auto b = utils::MemBuf::create(configuration_.download_size);
std::memset(b->writableData(), '?', configuration_.download_size);
b->append(configuration_.download_size);
- /* std::string passphrase = "hunter2"; */
- /* auto b = utils::MemBuf::create(passphrase.length() + 1); */
- /* std::memcpy(b->writableData(), passphrase.c_str(), passphrase.length() +
- * 1); */
- /* b->append(passphrase.length() + 1); */
p.asyncProduce(content_name, std::move(b),
!configuration_.multiphase_produce_, suffix,
@@ -845,23 +955,22 @@ class HIperfServer {
std::placeholders::_1, std::placeholders::_2));
}
- std::shared_ptr<utils::Identity> getProducerIdentity(
- std::string &keystore_name, std::string &keystore_password,
- utils::CryptoHashType &hash_algorithm) {
- if (access(keystore_name.c_str(), F_OK) != -1) {
- return std::make_shared<utils::Identity>(keystore_name, keystore_password,
- hash_algorithm);
- } else {
- return std::make_shared<utils::Identity>(keystore_name, keystore_password,
- utils::CryptoSuite::RSA_SHA256,
- 1024, 365, "producer-test");
+ std::shared_ptr<auth::Identity> getProducerIdentity(
+ std::string &keystore_path, std::string &keystore_pwd,
+ auth::CryptoHashType &hash_type) {
+ if (access(keystore_path.c_str(), F_OK) != -1) {
+ return std::make_shared<auth::Identity>(keystore_path, keystore_pwd,
+ hash_type);
}
+ return std::make_shared<auth::Identity>(keystore_path, keystore_pwd,
+ auth::CryptoSuite::RSA_SHA256, 1024,
+ 365, "producer-test");
}
int setup() {
int ret;
+ int production_protocol;
-#ifdef SECURE_HICNTRANSPORT
if (configuration_.secure_) {
auto identity = getProducerIdentity(configuration_.keystore_name,
configuration_.keystore_password,
@@ -869,22 +978,21 @@ class HIperfServer {
producer_socket_ = std::make_unique<P2PSecureProducerSocket>(
configuration_.rtc_, identity);
} else {
-#endif
- if (configuration_.rtc_) {
- producer_socket_ = std::make_unique<RTCProducerSocket>();
+ if (!configuration_.rtc_) {
+ production_protocol = ProductionProtocolAlgorithms::BYTE_STREAM;
} else {
- producer_socket_ = std::make_unique<ProducerSocket>();
+ production_protocol = ProductionProtocolAlgorithms::RTC_PROD;
}
-#ifdef SECURE_HICNTRANSPORT
+
+ producer_socket_ = std::make_unique<ProducerSocket>(production_protocol);
}
-#endif
if (configuration_.sign) {
- std::shared_ptr<utils::Signer> signer;
+ std::shared_ptr<auth::Signer> signer;
if (!configuration_.passphrase.empty()) {
- signer = std::make_shared<utils::Signer>(
- configuration_.passphrase, utils::CryptoSuite::HMAC_SHA256);
+ signer = std::make_shared<auth::SymmetricSigner>(
+ auth::CryptoSuite::HMAC_SHA256, configuration_.passphrase);
} else if (!configuration_.keystore_name.empty()) {
auto identity = getProducerIdentity(configuration_.keystore_name,
configuration_.keystore_password,
@@ -901,8 +1009,7 @@ class HIperfServer {
}
uint32_t rtc_header_size = 0;
- if(configuration_.rtc_)
- rtc_header_size = 8;
+ if (configuration_.rtc_) rtc_header_size = 12;
producer_socket_->setSocketOption(
GeneralTransportOptions::DATA_PACKET_SIZE,
(uint32_t)(
@@ -987,8 +1094,61 @@ class HIperfServer {
this, std::placeholders::_1));
auto payload =
content_objects_[content_objects_index_++ & mask_]->getPayload();
- producer_socket_->produce(
- payload->data(), payload->length() < 1400 ? payload->length() : 1400);
+
+ // this is used to compute the data packet delay
+ // Used only for performance evaluation
+ // It requires clock synchronization between producer and consumer
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::system_clock::now().time_since_epoch())
+ .count();
+
+ std::memcpy(payload->writableData(), &now, sizeof(uint64_t));
+
+ producer_socket_->produceDatagram(
+ flow_name_, payload->data(),
+ payload->length() < 1400 ? payload->length() : 1400);
+ }
+
+ void sendRTCContentObjectCallbackWithTrace(std::error_code ec) {
+ if (ec) return;
+
+ auto payload =
+ content_objects_[content_objects_index_++ & mask_]->getPayload();
+
+ uint32_t packet_len =
+ configuration_.trace_[configuration_.trace_index_].size;
+
+ // this is used to compute the data packet delay
+ // used only for performance evaluation
+ // it requires clock synchronization between producer and consumer
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::system_clock::now().time_since_epoch())
+ .count();
+
+ std::memcpy(payload->writableData(), &now, sizeof(uint64_t));
+
+ if (packet_len > payload->length()) packet_len = payload->length();
+ if (packet_len > 1400) packet_len = 1400;
+
+ producer_socket_->produceDatagram(flow_name_, payload->data(), packet_len);
+
+ uint32_t next_index = configuration_.trace_index_ + 1;
+ uint64_t schedule_next;
+ if (next_index < configuration_.trace_.size()) {
+ schedule_next =
+ configuration_.trace_[next_index].timestamp -
+ configuration_.trace_[configuration_.trace_index_].timestamp;
+ } else {
+ // here we need to loop, schedule in a random time
+ schedule_next = 1000;
+ }
+
+ configuration_.trace_index_ =
+ (configuration_.trace_index_ + 1) % configuration_.trace_.size();
+ rtc_timer_.expires_from_now(std::chrono::microseconds(schedule_next));
+ rtc_timer_.async_wait(
+ std::bind(&HIperfServer::sendRTCContentObjectCallbackWithTrace, this,
+ std::placeholders::_1));
}
#ifndef _WIN32
@@ -1021,6 +1181,21 @@ class HIperfServer {
}
#endif
+ int parseTraceFile() {
+ std::ifstream trace(configuration_.trace_file_);
+ if (trace.fail()) {
+ return -1;
+ }
+ std::string line;
+ while (std::getline(trace, line)) {
+ std::istringstream iss(line);
+ struct packet_t packet;
+ iss >> packet.timestamp >> packet.size;
+ configuration_.trace_.push_back(packet);
+ }
+ return 0;
+ }
+
int run() {
std::cerr << "Starting to serve consumers" << std::endl;
@@ -1038,6 +1213,21 @@ class HIperfServer {
input_, input_buffer_, '\n',
std::bind(&HIperfServer::handleInput, this, std::placeholders::_1,
std::placeholders::_2));
+ } else if (configuration_.trace_based_) {
+ std::cout << "trace-based mode enabled" << std::endl;
+ if (configuration_.trace_file_ == nullptr) {
+ std::cout << "cannot find the trace file" << std::endl;
+ return ERROR_SETUP;
+ }
+ if (parseTraceFile() < 0) {
+ std::cout << "cannot parse the trace file" << std::endl;
+ return ERROR_SETUP;
+ }
+ rtc_running_ = true;
+ rtc_timer_.expires_from_now(std::chrono::milliseconds(1));
+ rtc_timer_.async_wait(
+ std::bind(&HIperfServer::sendRTCContentObjectCallbackWithTrace,
+ this, std::placeholders::_1));
} else {
rtc_running_ = true;
rtc_timer_.expires_from_now(
@@ -1078,6 +1268,7 @@ class HIperfServer {
asio::posix::stream_descriptor input_;
asio::streambuf input_buffer_;
bool rtc_running_;
+ core::Name flow_name_;
#endif
}; // namespace interface
@@ -1095,6 +1286,8 @@ void usage() {
<< "Run RTC protocol (client or server)" << std::endl;
std::cerr << "-f\t<filename>\t\t\t"
<< "Log file" << std::endl;
+ std::cerr << "-z\t<io_module>\t\t\t"
+ << "IO module to use. Default: hicnlight_module" << std::endl;
#endif
std::cerr << std::endl;
std::cerr << "SERVER SPECIFIC:" << std::endl;
@@ -1140,14 +1333,19 @@ void usage() {
"Interactive mode, start/stop real time content production "
"by pressing return. To be used with the -R option"
<< std::endl;
-#ifdef SECURE_HICNTRANSPORT
+ std::cerr
+ << "-T\t<filename>\t\t\t"
+ "Trace based mode, hiperf takes as input a file with a trace. "
+ "Each line of the file indicates the timestamp and the size of "
+ "the packet to generate. To be used with the -R option. -B and -I "
+ "will be ignored."
+ << std::endl;
std::cerr << "-E\t\t\t\t\t"
<< "Enable encrypted communication. Requires the path to a p12 "
"file containing the "
"crypto material used for the TLS handshake"
<< std::endl;
#endif
-#endif
std::cerr << std::endl;
std::cerr << "CLIENT SPECIFIC:" << std::endl;
std::cerr << "-b\t<beta_parameter>\t\t"
@@ -1169,26 +1367,21 @@ void usage() {
std::cerr << "-i\t<stats_interval>\t\t"
<< "Show the statistics every <stats_interval> milliseconds."
<< std::endl;
- std::cerr << "-v\t\t\t\t\t"
- << "Enable verification of received data" << std::endl;
std::cerr << "-c\t<certificate_path>\t\t"
<< "Path of the producer certificate to be used for verifying the "
- "origin of the packets received. Must be used with -v."
+ "origin of the packets received."
<< std::endl;
std::cerr << "-k\t<passphrase>\t\t\t"
<< "String from which is derived the symmetric key used by the "
- "producer to sign packets and by the consumer to verify them. "
- "Must be used with -v."
+ "producer to sign packets and by the consumer to verify them."
<< std::endl;
std::cerr << "-t\t\t\t\t\t"
"Test mode, check if the client is receiving the "
"correct data. This is an RTC specific option, to be "
"used with the -R (default false)"
<< std::endl;
-#ifdef SECURE_HICNTRANSPORT
std::cerr << "-P\t\t\t\t\t"
<< "Prefix of the producer where to do the handshake" << std::endl;
-#endif
}
int main(int argc, char *argv[]) {
@@ -1205,6 +1398,9 @@ int main(int argc, char *argv[]) {
int options = 0;
char *log_file = nullptr;
+ interface::global_config::IoModuleConfiguration config;
+ std::string conf_file;
+ config.name = "hicnlight_module";
// Consumer
ClientConfiguration client_configuration;
@@ -1214,9 +1410,9 @@ int main(int argc, char *argv[]) {
int opt;
#ifndef _WIN32
- while ((opt = getopt(argc, argv,
- "DSCf:b:d:W:RM:c:vA:s:rmlK:k:y:p:hi:xE:P:B:ItL:")) !=
- -1) {
+ while ((opt = getopt(
+ argc, argv,
+ "DSCf:b:d:W:RM:c:vA:s:rmlK:k:y:p:hi:xE:P:B:ItL:z:T:F:")) != -1) {
switch (opt) {
// Common
case 'D': {
@@ -1225,11 +1421,19 @@ int main(int argc, char *argv[]) {
}
case 'I': {
server_configuration.interactive_ = true;
+ server_configuration.trace_based_ = false;
+ break;
+ }
+ case 'T': {
+ server_configuration.interactive_ = false;
+ server_configuration.trace_based_ = true;
+ server_configuration.trace_file_ = optarg;
break;
}
#else
while ((opt = getopt(argc, argv,
- "SCf:b:d:W:RM:c:vA:s:rmlK:k:y:p:hi:xB:E:P:tL:")) != -1) {
+ "SCf:b:d:W:RM:c:vA:s:rmlK:k:y:p:hi:xB:E:P:tL:z:F:")) !=
+ -1) {
switch (opt) {
#endif
case 'f': {
@@ -1241,6 +1445,14 @@ int main(int argc, char *argv[]) {
server_configuration.rtc_ = true;
break;
}
+ case 'z': {
+ config.name = optarg;
+ break;
+ }
+ case 'F': {
+ conf_file = optarg;
+ break;
+ }
// Server or Client
case 'S': {
@@ -1255,7 +1467,6 @@ int main(int argc, char *argv[]) {
server_configuration.passphrase = std::string(optarg);
client_configuration.passphrase = std::string(optarg);
server_configuration.sign = true;
- options = -1;
break;
}
@@ -1280,23 +1491,16 @@ int main(int argc, char *argv[]) {
options = 1;
break;
}
-#ifdef SECURE_HICNTRANSPORT
case 'P': {
client_configuration.producer_prefix_ = Prefix(optarg);
client_configuration.secure_ = true;
break;
}
-#endif
case 'c': {
client_configuration.producer_certificate = std::string(optarg);
options = 1;
break;
}
- case 'v': {
- client_configuration.verify = true;
- options = 1;
- break;
- }
case 'i': {
client_configuration.report_interval_milliseconds_ = std::stoul(optarg);
options = 1;
@@ -1346,11 +1550,11 @@ int main(int argc, char *argv[]) {
}
case 'y': {
if (strncasecmp(optarg, "sha256", 6) == 0) {
- server_configuration.hash_algorithm = utils::CryptoHashType::SHA_256;
+ server_configuration.hash_algorithm = auth::CryptoHashType::SHA_256;
} else if (strncasecmp(optarg, "sha512", 6) == 0) {
- server_configuration.hash_algorithm = utils::CryptoHashType::SHA_512;
+ server_configuration.hash_algorithm = auth::CryptoHashType::SHA_512;
} else if (strncasecmp(optarg, "crc32", 5) == 0) {
- server_configuration.hash_algorithm = utils::CryptoHashType::CRC32C;
+ server_configuration.hash_algorithm = auth::CryptoHashType::CRC32C;
} else {
std::cerr << "Ignored unknown hash algorithm. Using SHA 256."
<< std::endl;
@@ -1375,13 +1579,11 @@ int main(int argc, char *argv[]) {
options = -1;
break;
}
-#ifdef SECURE_HICNTRANSPORT
case 'E': {
server_configuration.keystore_name = std::string(optarg);
server_configuration.secure_ = true;
break;
}
-#endif
case 'h':
default:
usage();
@@ -1395,7 +1597,6 @@ int main(int argc, char *argv[]) {
<< std::endl;
usage();
return EXIT_FAILURE;
-
} else if (options < 0 && role > 0) {
std::cerr << "Server options cannot be used when using the "
"software in client mode"
@@ -1443,6 +1644,14 @@ int main(int argc, char *argv[]) {
}
#endif
+ /**
+ * IO module configuration
+ */
+ config.set();
+
+ // Parse config file
+ transport::interface::global_config::parseConfigurationFile(conf_file);
+
if (role > 0) {
HIperfClient c(client_configuration);
if (c.setup() != ERROR_SETUP) {
@@ -1461,11 +1670,11 @@ int main(int argc, char *argv[]) {
#ifdef _WIN32
WSACleanup();
#endif
+
return 0;
}
} // end namespace interface
-
} // end namespace transport
int main(int argc, char *argv[]) {