From 7254a7d407c62d705ef410052825be74a6bd1b4e Mon Sep 17 00:00:00 2001 From: Mauro Sardara Date: Thu, 14 Mar 2019 19:46:43 +0100 Subject: [HICN-116] Added RTC producer option to hiperf. Change-Id: I665b7dd3c8eae222d62057bc3387daf6c73df1f8 Signed-off-by: Mauro Sardara --- .../transport/interfaces/rtc_socket_producer.cc | 24 +-- .../transport/interfaces/rtc_socket_producer.h | 4 +- .../hicn/transport/interfaces/socket_producer.h | 8 +- pom.xml | 4 +- utils/src/hiperf.cc | 201 +++++++++++++++++---- 5 files changed, 186 insertions(+), 55 deletions(-) diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc index c07ca7989..5a432a99f 100644 --- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc +++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc @@ -74,22 +74,22 @@ RTCProducerSocket::RTCProducerSocket() RTCProducerSocket::~RTCProducerSocket() {} -void RTCProducerSocket::registerName(Prefix &producer_namespace) { +void RTCProducerSocket::registerPrefix(const Prefix &producer_namespace) { ProducerSocket::registerPrefix(producer_namespace); flowName_ = producer_namespace.getName(); - - if (flowName_.getType() == HNT_CONTIGUOUS_V4 || - flowName_.getType() == HNT_IOV_V4) { - headerSize_ = sizeof(hicn_v6_hdr_t::ip); - } else if (flowName_.getType() == HNT_CONTIGUOUS_V6 || - flowName_.getType() == HNT_IOV_V6) { - headerSize_ = sizeof(hicn_v4_hdr_t::ip); - } else { - throw errors::RuntimeException("Unknown name format."); + auto family = flowName_.getAddressFamily(); + + switch (family) { + case AF_INET6: + headerSize_ = Packet::getHeaderSizeFromFormat(HF_INET6_TCP); + break; + case AF_INET: + headerSize_ = Packet::getHeaderSizeFromFormat(HF_INET_TCP); + break; + default: + throw errors::RuntimeException("Unknown name format."); } - - headerSize_ += TCP_HEADER_SIZE; } void RTCProducerSocket::updateStats(uint32_t packet_size) { diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h index f1bcaa9e8..bc54be4bb 100644 --- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h +++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h @@ -33,9 +33,9 @@ class RTCProducerSocket : public ProducerSocket { ~RTCProducerSocket(); - void registerName(Prefix &producer_namespace); + void registerPrefix(const Prefix &producer_namespace) override; - void produce(const uint8_t *buffer, size_t buffer_size); + void produce(const uint8_t *buffer, size_t buffer_size) override; void onInterest(Interest::Ptr &&interest) override; diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.h b/libtransport/src/hicn/transport/interfaces/socket_producer.h index d3738dc59..6ba5671cc 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_producer.h +++ b/libtransport/src/hicn/transport/interfaces/socket_producer.h @@ -51,13 +51,19 @@ class ProducerSocket : public Socket, void produce(ContentObject &content_object); + virtual void produce(const uint8_t *buffer, size_t buffer_size) { + // This API is meant to be used just with the RTC producer. + // Here it cannot be used since no name for the content is specified. + throw errors::NotImplementedException(); + } + void asyncProduce(const Name &suffix, const uint8_t *buf, size_t buffer_size); void asyncProduce(const Name &suffix, ContentBuffer &&output_buffer); void asyncProduce(ContentObject &content_object); - void registerPrefix(const Prefix &producer_namespace); + virtual void registerPrefix(const Prefix &producer_namespace); void serveForever(); diff --git a/pom.xml b/pom.xml index 63d26cc80..796728a28 100644 --- a/pom.xml +++ b/pom.xml @@ -12,7 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. ---> + io.fd.hicn.common @@ -40,4 +40,4 @@ hicn-plugin libtransport - \ No newline at end of file + --> \ No newline at end of file diff --git a/utils/src/hiperf.cc b/utils/src/hiperf.cc index 860e7a000..e10907ccc 100644 --- a/utils/src/hiperf.cc +++ b/utils/src/hiperf.cc @@ -13,6 +13,7 @@ * limitations under the License. */ +#include #include #include #ifndef _WIN32 @@ -71,6 +72,41 @@ struct ClientConfiguration { bool rtc_; }; +class Rate { + public: + Rate() : rate_kbps_(0) {} + + Rate(const std::string &rate) { + std::size_t found = rate.find("kbps"); + if (found != std::string::npos) { + rate_kbps_ = std::stof(rate.substr(0, found)); + } else { + throw std::runtime_error("Format " + rate + " not correct"); + } + } + + Rate(const Rate &other) : rate_kbps_(other.rate_kbps_) {} + + Rate &operator=(const std::string &rate) { + std::size_t found = rate.find("kbps"); + if (found != std::string::npos) { + rate_kbps_ = std::stof(rate.substr(0, found)); + } else { + throw std::runtime_error("Format " + rate + " not correct"); + } + + return *this; + } + + std::chrono::microseconds getMicrosecondsForPacket(std::size_t packet_size) { + return std::chrono::microseconds( + packet_size * long(std::round(1000.0 * 8.0 / rate_kbps_))); + } + + private: + float rate_kbps_; +}; + struct ServerConfiguration { ServerConfiguration() : name("b001::abcd/64"), @@ -84,7 +120,10 @@ struct ServerConfiguration { hash_algorithm(HashAlgorithm::SHA_256), keystore_name("/tmp/rsa_crypto_material.p12"), keystore_password("cisco"), - multiphase_produce_(false) {} + multiphase_produce_(false), + rtc_(false), + production_rate_(std::string("2048kbps")), + payload_size_(1400) {} Prefix name; bool virtual_producer; @@ -98,6 +137,9 @@ struct ServerConfiguration { std::string keystore_name; std::string keystore_password; bool multiphase_produce_; + bool rtc_; + Rate production_rate_; + std::size_t payload_size_; }; class HIperfClient { @@ -338,10 +380,11 @@ class HIperfServer { HIperfServer(ServerConfiguration &conf) : configuration_(conf), signals_(io_service_, SIGINT), + rtc_timer_(io_service_), content_objects_((std::uint16_t)(1 << log2_content_object_buffer_size)), content_objects_index_(0), mask_((std::uint16_t)(1 << log2_content_object_buffer_size) - 1) { - std::string buffer(1440, 'X'); + std::string buffer(configuration_.payload_size_, 'X'); std::cout << "Producing contents under name " << conf.name.getName() << std::endl; @@ -357,16 +400,6 @@ class HIperfServer { void processInterest(ProducerSocket &p, const Interest &interest) { content_objects_[content_objects_index_ & mask_]->setName( interest.getName()); - - // if (final_chunk_number_ > 0 && interest.getName().getSuffix() == 0) { - // auto name = interest.getName(); - // manifest_ = std::make_shared(name); - // // manifest_->setFinalChunkNumber(final_chunk_number_); - // manifest_->encode(); - // p.produce(*manifest_); - // return; - // } - producer_socket_->produce( *content_objects_[content_objects_index_++ & mask_]); } @@ -414,7 +447,11 @@ class HIperfServer { int setup() { int ret; - producer_socket_ = std::make_unique(); + if (configuration_.rtc_) { + producer_socket_ = std::make_unique(); + } else { + producer_socket_ = std::make_unique(); + } if (configuration_.sign) { auto identity = setProducerIdentity(configuration_.keystore_name, @@ -431,6 +468,13 @@ class HIperfServer { producer_socket_->registerPrefix(configuration_.name); producer_socket_->connect(); + if (configuration_.rtc_) { + std::cout << "Running RTC producer: all other options (with the " + "exception of the bitrate) will be ignored." + << std::endl; + return ERROR_SUCCESS; + } + if (!configuration_.virtual_producer) { if (producer_socket_->setSocketOption( GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME, @@ -485,6 +529,20 @@ class HIperfServer { return ERROR_SUCCESS; } + void sendRTCContentObjectCallback(std::error_code ec) { + if (!ec) { + auto payload = + content_objects_[content_objects_index_++ & mask_]->getPayload(); + producer_socket_->produce(payload.data(), payload.length()); + rtc_timer_.expires_from_now( + configuration_.production_rate_.getMicrosecondsForPacket( + configuration_.payload_size_)); + rtc_timer_.async_wait( + std::bind(&HIperfServer::sendRTCContentObjectCallback, this, + std::placeholders::_1)); + } + } + int run() { std::cerr << "Starting to serve consumers" << std::endl; @@ -494,6 +552,15 @@ class HIperfServer { io_service_.stop(); }); + if (configuration_.rtc_) { + rtc_timer_.expires_from_now( + configuration_.production_rate_.getMicrosecondsForPacket( + configuration_.payload_size_)); + rtc_timer_.async_wait( + std::bind(&HIperfServer::sendRTCContentObjectCallback, this, + std::placeholders::_1)); + } + io_service_.run(); return ERROR_SUCCESS; @@ -503,6 +570,7 @@ class HIperfServer { ServerConfiguration configuration_; asio::io_service io_service_; asio::signal_set signals_; + asio::steady_timer rtc_timer_; std::vector> content_objects_; std::uint16_t content_objects_index_; std::uint16_t mask_; @@ -519,10 +587,17 @@ void usage() { #ifndef _WIN32 std::cerr << "-D\t\t\t\t\t" << "Run as a daemon" << std::endl; + std::cerr << "-R\t\t\t\t\t" + << "Run RTC protocol (client or server)" << std::endl; + std::cerr << "-f\t\t\t\t" + << "Log file" << std::endl; #endif std::cerr << std::endl; std::cerr << "Server specific:" << std::endl; - std::cerr << "-s\t\t\t\tSize of the content to publish" + std::cerr << "-A\t\t\t\tSize of the content to publish. This " + "is not the size of the packet (see -s for it)." + << std::endl; + std::cerr << "-s\t\t\t\tSize of the payload of each data packet." << std::endl; std::cerr << "-r\t\t\t\t\t" << "Produce real content of content_size bytes" << std::endl; @@ -542,6 +617,15 @@ void usage() { << std::endl; std::cerr << "-p\t\t\t\t" << "Password for p12 keystore" << std::endl; + std::cerr << "-x\t\t\t\t\t" + << "Produce a content of , then after downloading " + "it produce a new content of" + << std::endl + << "\t\t\t\t\t without resetting " + "the suffix to 0." + << std::endl; + std::cerr << "-B\t\t\t\t" + << "Bitrate for RTC producer, to be used with the -R option." << std::endl; std::cerr << std::endl; std::cerr << "Client specific:" << std::endl; std::cerr << "-b\t\t\t" @@ -561,6 +645,9 @@ void usage() { "to be used for verifying the " "origin of the packets received" << std::endl; + std::cerr << "-i\t\t\t" + << "Show the statistics every milliseconds." + << std::endl; std::cout << "-v\t\t\t\t\t" << "Enable verification of received data" << std::endl; } @@ -588,84 +675,109 @@ int main(int argc, char *argv[]) { int opt; #ifndef _WIN32 - while ((opt = getopt(argc, argv, "DSCf:b:d:W:RMc:vs:rmlk:y:p:hi:x")) != -1) { + while ((opt = getopt(argc, argv, "DSCf:b:d:W:RMc:vA:s:rmlk:y:p:hi:xB:")) != + -1) { switch (opt) { // Common - case 'D': + case 'D': { daemon = true; break; + } #else - while ((opt = getopt(argc, argv, "SCf:b:d:W:RMc:vs:rmlk:y:p:hi:x")) != -1) { + while ((opt = getopt(argc, argv, "SCf:b:d:W:RMc:vA:s:rmlk:y:p:hi:xB:")) != + -1) { switch (opt) { #endif - case 'f': + case 'f': { log_file = optarg; break; + } + case 'R': { + client_configuration.rtc_ = true; + server_configuration.rtc_ = true; + break; + } // Server or Client - case 'S': + case 'S': { role -= 1; break; - case 'C': + } + case 'C': { role += 1; break; + } // Client specifc - case 'b': + case 'b': { client_configuration.beta = std::stod(optarg); options = 1; break; - case 'd': + } + case 'd': { client_configuration.drop_factor = std::stod(optarg); options = 1; break; - case 'W': + } + case 'W': { client_configuration.window = std::stod(optarg); options = 1; break; - case 'M': + } + case 'M': { client_configuration.virtual_download = false; options = 1; break; - case 'c': + } + case 'c': { client_configuration.producer_certificate = std::string(optarg); options = 1; break; - case 'v': + } + case 'v': { client_configuration.verify = true; options = 1; break; - case 'i': + } + case 'i': { client_configuration.report_interval_milliseconds_ = std::stoul(optarg); options = 1; break; - case 'R': - client_configuration.rtc_ = true; - break; + } // Server specific - case 's': + case 'A': { server_configuration.download_size = std::stoul(optarg); options = -1; break; - case 'r': + } + case 's': { + server_configuration.payload_size_ = std::stoul(optarg); + options = -1; + break; + } + case 'r': { server_configuration.virtual_producer = false; options = -1; break; - case 'm': + } + case 'm': { server_configuration.manifest = true; options = -1; break; - case 'l': + } + case 'l': { server_configuration.live_production = true; options = -1; break; - case 'k': + } + case 'k': { server_configuration.keystore_name = std::string(optarg); server_configuration.sign = true; options = -1; break; - case 'y': + } + case 'y': { if (strncasecmp(optarg, "sha256", 6) == 0) { server_configuration.hash_algorithm = HashAlgorithm::SHA_256; } else if (strncasecmp(optarg, "sha512", 6) == 0) { @@ -678,14 +790,27 @@ int main(int argc, char *argv[]) { } options = -1; break; - case 'p': + } + case 'p': { server_configuration.keystore_password = std::string(optarg); options = -1; break; - case 'x': + } + case 'x': { server_configuration.multiphase_produce_ = true; options = -1; break; + } + case 'B': { + auto str = std::string(optarg); + std::transform(str.begin(), str.end(), str.begin(), ::tolower); + std::cout << "---------------------------------------------------------" + "---------------------->" + << str << std::endl; + server_configuration.production_rate_ = str; + options = -1; + break; + } case 'h': default: usage(); -- cgit 1.2.3-korg