diff options
Diffstat (limited to 'utils/src')
-rw-r--r-- | utils/src/hiperf.cc | 201 |
1 files changed, 163 insertions, 38 deletions
diff --git a/utils/src/hiperf.cc b/utils/src/hiperf.cc index 860e7a000..e10907ccc 100644 --- a/utils/src/hiperf.cc +++ b/utils/src/hiperf.cc @@ -13,6 +13,7 @@ * limitations under the License. */ +#include <hicn/transport/interfaces/rtc_socket_producer.h> #include <hicn/transport/interfaces/socket_consumer.h> #include <hicn/transport/interfaces/socket_producer.h> #ifndef _WIN32 @@ -71,6 +72,41 @@ struct ClientConfiguration { bool rtc_; }; +class Rate { + public: + Rate() : rate_kbps_(0) {} + + Rate(const std::string &rate) { + std::size_t found = rate.find("kbps"); + if (found != std::string::npos) { + rate_kbps_ = std::stof(rate.substr(0, found)); + } else { + throw std::runtime_error("Format " + rate + " not correct"); + } + } + + Rate(const Rate &other) : rate_kbps_(other.rate_kbps_) {} + + Rate &operator=(const std::string &rate) { + std::size_t found = rate.find("kbps"); + if (found != std::string::npos) { + rate_kbps_ = std::stof(rate.substr(0, found)); + } else { + throw std::runtime_error("Format " + rate + " not correct"); + } + + return *this; + } + + std::chrono::microseconds getMicrosecondsForPacket(std::size_t packet_size) { + return std::chrono::microseconds( + packet_size * long(std::round(1000.0 * 8.0 / rate_kbps_))); + } + + private: + float rate_kbps_; +}; + struct ServerConfiguration { ServerConfiguration() : name("b001::abcd/64"), @@ -84,7 +120,10 @@ struct ServerConfiguration { hash_algorithm(HashAlgorithm::SHA_256), keystore_name("/tmp/rsa_crypto_material.p12"), keystore_password("cisco"), - multiphase_produce_(false) {} + multiphase_produce_(false), + rtc_(false), + production_rate_(std::string("2048kbps")), + payload_size_(1400) {} Prefix name; bool virtual_producer; @@ -98,6 +137,9 @@ struct ServerConfiguration { std::string keystore_name; std::string keystore_password; bool multiphase_produce_; + bool rtc_; + Rate production_rate_; + std::size_t payload_size_; }; class HIperfClient { @@ -338,10 +380,11 @@ class HIperfServer { HIperfServer(ServerConfiguration &conf) : configuration_(conf), signals_(io_service_, SIGINT), + rtc_timer_(io_service_), content_objects_((std::uint16_t)(1 << log2_content_object_buffer_size)), content_objects_index_(0), mask_((std::uint16_t)(1 << log2_content_object_buffer_size) - 1) { - std::string buffer(1440, 'X'); + std::string buffer(configuration_.payload_size_, 'X'); std::cout << "Producing contents under name " << conf.name.getName() << std::endl; @@ -357,16 +400,6 @@ class HIperfServer { void processInterest(ProducerSocket &p, const Interest &interest) { content_objects_[content_objects_index_ & mask_]->setName( interest.getName()); - - // if (final_chunk_number_ > 0 && interest.getName().getSuffix() == 0) { - // auto name = interest.getName(); - // manifest_ = std::make_shared<ContentObjectManifest>(name); - // // manifest_->setFinalChunkNumber(final_chunk_number_); - // manifest_->encode(); - // p.produce(*manifest_); - // return; - // } - producer_socket_->produce( *content_objects_[content_objects_index_++ & mask_]); } @@ -414,7 +447,11 @@ class HIperfServer { int setup() { int ret; - producer_socket_ = std::make_unique<ProducerSocket>(); + if (configuration_.rtc_) { + producer_socket_ = std::make_unique<RTCProducerSocket>(); + } else { + producer_socket_ = std::make_unique<ProducerSocket>(); + } if (configuration_.sign) { auto identity = setProducerIdentity(configuration_.keystore_name, @@ -431,6 +468,13 @@ class HIperfServer { producer_socket_->registerPrefix(configuration_.name); producer_socket_->connect(); + if (configuration_.rtc_) { + std::cout << "Running RTC producer: all other options (with the " + "exception of the bitrate) will be ignored." + << std::endl; + return ERROR_SUCCESS; + } + if (!configuration_.virtual_producer) { if (producer_socket_->setSocketOption( GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME, @@ -485,6 +529,20 @@ class HIperfServer { return ERROR_SUCCESS; } + void sendRTCContentObjectCallback(std::error_code ec) { + if (!ec) { + auto payload = + content_objects_[content_objects_index_++ & mask_]->getPayload(); + producer_socket_->produce(payload.data(), payload.length()); + rtc_timer_.expires_from_now( + configuration_.production_rate_.getMicrosecondsForPacket( + configuration_.payload_size_)); + rtc_timer_.async_wait( + std::bind(&HIperfServer::sendRTCContentObjectCallback, this, + std::placeholders::_1)); + } + } + int run() { std::cerr << "Starting to serve consumers" << std::endl; @@ -494,6 +552,15 @@ class HIperfServer { io_service_.stop(); }); + if (configuration_.rtc_) { + rtc_timer_.expires_from_now( + configuration_.production_rate_.getMicrosecondsForPacket( + configuration_.payload_size_)); + rtc_timer_.async_wait( + std::bind(&HIperfServer::sendRTCContentObjectCallback, this, + std::placeholders::_1)); + } + io_service_.run(); return ERROR_SUCCESS; @@ -503,6 +570,7 @@ class HIperfServer { ServerConfiguration configuration_; asio::io_service io_service_; asio::signal_set signals_; + asio::steady_timer rtc_timer_; std::vector<std::shared_ptr<ContentObject>> content_objects_; std::uint16_t content_objects_index_; std::uint16_t mask_; @@ -519,10 +587,17 @@ void usage() { #ifndef _WIN32 std::cerr << "-D\t\t\t\t\t" << "Run as a daemon" << std::endl; + std::cerr << "-R\t\t\t\t\t" + << "Run RTC protocol (client or server)" << std::endl; + std::cerr << "-f\t<filename>\t\t\t" + << "Log file" << std::endl; #endif std::cerr << std::endl; std::cerr << "Server specific:" << std::endl; - std::cerr << "-s\t<content_size>\t\t\tSize of the content to publish" + std::cerr << "-A\t<download_size>\t\t\tSize of the content to publish. This " + "is not the size of the packet (see -s for it)." + << std::endl; + std::cerr << "-s\t<packet_size>\t\t\tSize of the payload of each data packet." << std::endl; std::cerr << "-r\t\t\t\t\t" << "Produce real content of content_size bytes" << std::endl; @@ -542,6 +617,15 @@ void usage() { << std::endl; std::cerr << "-p\t<password>\t\t\t" << "Password for p12 keystore" << std::endl; + std::cerr << "-x\t\t\t\t\t" + << "Produce a content of <download_size>, then after downloading " + "it produce a new content of" + << std::endl + << "\t\t\t\t\t<download_size> without resetting " + "the suffix to 0." + << std::endl; + std::cerr << "-B\t<bitrate>\t\t\t" + << "Bitrate for RTC producer, to be used with the -R option." << std::endl; std::cerr << std::endl; std::cerr << "Client specific:" << std::endl; std::cerr << "-b\t<beta_parameter>\t\t" @@ -561,6 +645,9 @@ void usage() { "to be used for verifying the " "origin of the packets received" << std::endl; + std::cerr << "-i\t<stats_interval>\t\t" + << "Show the statistics every <stats_interval> milliseconds." + << std::endl; std::cout << "-v\t\t\t\t\t" << "Enable verification of received data" << std::endl; } @@ -588,84 +675,109 @@ int main(int argc, char *argv[]) { int opt; #ifndef _WIN32 - while ((opt = getopt(argc, argv, "DSCf:b:d:W:RMc:vs:rmlk:y:p:hi:x")) != -1) { + while ((opt = getopt(argc, argv, "DSCf:b:d:W:RMc:vA:s:rmlk:y:p:hi:xB:")) != + -1) { switch (opt) { // Common - case 'D': + case 'D': { daemon = true; break; + } #else - while ((opt = getopt(argc, argv, "SCf:b:d:W:RMc:vs:rmlk:y:p:hi:x")) != -1) { + while ((opt = getopt(argc, argv, "SCf:b:d:W:RMc:vA:s:rmlk:y:p:hi:xB:")) != + -1) { switch (opt) { #endif - case 'f': + case 'f': { log_file = optarg; break; + } + case 'R': { + client_configuration.rtc_ = true; + server_configuration.rtc_ = true; + break; + } // Server or Client - case 'S': + case 'S': { role -= 1; break; - case 'C': + } + case 'C': { role += 1; break; + } // Client specifc - case 'b': + case 'b': { client_configuration.beta = std::stod(optarg); options = 1; break; - case 'd': + } + case 'd': { client_configuration.drop_factor = std::stod(optarg); options = 1; break; - case 'W': + } + case 'W': { client_configuration.window = std::stod(optarg); options = 1; break; - case 'M': + } + case 'M': { client_configuration.virtual_download = false; options = 1; break; - case 'c': + } + case 'c': { client_configuration.producer_certificate = std::string(optarg); options = 1; break; - case 'v': + } + case 'v': { client_configuration.verify = true; options = 1; break; - case 'i': + } + case 'i': { client_configuration.report_interval_milliseconds_ = std::stoul(optarg); options = 1; break; - case 'R': - client_configuration.rtc_ = true; - break; + } // Server specific - case 's': + case 'A': { server_configuration.download_size = std::stoul(optarg); options = -1; break; - case 'r': + } + case 's': { + server_configuration.payload_size_ = std::stoul(optarg); + options = -1; + break; + } + case 'r': { server_configuration.virtual_producer = false; options = -1; break; - case 'm': + } + case 'm': { server_configuration.manifest = true; options = -1; break; - case 'l': + } + case 'l': { server_configuration.live_production = true; options = -1; break; - case 'k': + } + case 'k': { server_configuration.keystore_name = std::string(optarg); server_configuration.sign = true; options = -1; break; - case 'y': + } + case 'y': { if (strncasecmp(optarg, "sha256", 6) == 0) { server_configuration.hash_algorithm = HashAlgorithm::SHA_256; } else if (strncasecmp(optarg, "sha512", 6) == 0) { @@ -678,14 +790,27 @@ int main(int argc, char *argv[]) { } options = -1; break; - case 'p': + } + case 'p': { server_configuration.keystore_password = std::string(optarg); options = -1; break; - case 'x': + } + case 'x': { server_configuration.multiphase_produce_ = true; options = -1; break; + } + case 'B': { + auto str = std::string(optarg); + std::transform(str.begin(), str.end(), str.begin(), ::tolower); + std::cout << "---------------------------------------------------------" + "---------------------->" + << str << std::endl; + server_configuration.production_rate_ = str; + options = -1; + break; + } case 'h': default: usage(); |