summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc24
-rw-r--r--libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h4
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_producer.h8
-rw-r--r--pom.xml4
-rw-r--r--utils/src/hiperf.cc201
5 files changed, 186 insertions, 55 deletions
diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
index c07ca7989..5a432a99f 100644
--- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
+++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
@@ -74,22 +74,22 @@ RTCProducerSocket::RTCProducerSocket()
RTCProducerSocket::~RTCProducerSocket() {}
-void RTCProducerSocket::registerName(Prefix &producer_namespace) {
+void RTCProducerSocket::registerPrefix(const Prefix &producer_namespace) {
ProducerSocket::registerPrefix(producer_namespace);
flowName_ = producer_namespace.getName();
-
- if (flowName_.getType() == HNT_CONTIGUOUS_V4 ||
- flowName_.getType() == HNT_IOV_V4) {
- headerSize_ = sizeof(hicn_v6_hdr_t::ip);
- } else if (flowName_.getType() == HNT_CONTIGUOUS_V6 ||
- flowName_.getType() == HNT_IOV_V6) {
- headerSize_ = sizeof(hicn_v4_hdr_t::ip);
- } else {
- throw errors::RuntimeException("Unknown name format.");
+ auto family = flowName_.getAddressFamily();
+
+ switch (family) {
+ case AF_INET6:
+ headerSize_ = Packet::getHeaderSizeFromFormat(HF_INET6_TCP);
+ break;
+ case AF_INET:
+ headerSize_ = Packet::getHeaderSizeFromFormat(HF_INET_TCP);
+ break;
+ default:
+ throw errors::RuntimeException("Unknown name format.");
}
-
- headerSize_ += TCP_HEADER_SIZE;
}
void RTCProducerSocket::updateStats(uint32_t packet_size) {
diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h
index f1bcaa9e8..bc54be4bb 100644
--- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h
+++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h
@@ -33,9 +33,9 @@ class RTCProducerSocket : public ProducerSocket {
~RTCProducerSocket();
- void registerName(Prefix &producer_namespace);
+ void registerPrefix(const Prefix &producer_namespace) override;
- void produce(const uint8_t *buffer, size_t buffer_size);
+ void produce(const uint8_t *buffer, size_t buffer_size) override;
void onInterest(Interest::Ptr &&interest) override;
diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.h b/libtransport/src/hicn/transport/interfaces/socket_producer.h
index d3738dc59..6ba5671cc 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_producer.h
+++ b/libtransport/src/hicn/transport/interfaces/socket_producer.h
@@ -51,13 +51,19 @@ class ProducerSocket : public Socket<BasePortal>,
void produce(ContentObject &content_object);
+ virtual void produce(const uint8_t *buffer, size_t buffer_size) {
+ // This API is meant to be used just with the RTC producer.
+ // Here it cannot be used since no name for the content is specified.
+ throw errors::NotImplementedException();
+ }
+
void asyncProduce(const Name &suffix, const uint8_t *buf, size_t buffer_size);
void asyncProduce(const Name &suffix, ContentBuffer &&output_buffer);
void asyncProduce(ContentObject &content_object);
- void registerPrefix(const Prefix &producer_namespace);
+ virtual void registerPrefix(const Prefix &producer_namespace);
void serveForever();
diff --git a/pom.xml b/pom.xml
index 63d26cc80..796728a28 100644
--- a/pom.xml
+++ b/pom.xml
@@ -12,7 +12,7 @@
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
---><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<!-- --><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>io.fd.hicn.common</groupId>
@@ -40,4 +40,4 @@
<module>hicn-plugin</module>
<module>libtransport</module>
</modules>
-</project> \ No newline at end of file
+</project> --> \ No newline at end of file
diff --git a/utils/src/hiperf.cc b/utils/src/hiperf.cc
index 860e7a000..e10907ccc 100644
--- a/utils/src/hiperf.cc
+++ b/utils/src/hiperf.cc
@@ -13,6 +13,7 @@
* limitations under the License.
*/
+#include <hicn/transport/interfaces/rtc_socket_producer.h>
#include <hicn/transport/interfaces/socket_consumer.h>
#include <hicn/transport/interfaces/socket_producer.h>
#ifndef _WIN32
@@ -71,6 +72,41 @@ struct ClientConfiguration {
bool rtc_;
};
+class Rate {
+ public:
+ Rate() : rate_kbps_(0) {}
+
+ Rate(const std::string &rate) {
+ std::size_t found = rate.find("kbps");
+ if (found != std::string::npos) {
+ rate_kbps_ = std::stof(rate.substr(0, found));
+ } else {
+ throw std::runtime_error("Format " + rate + " not correct");
+ }
+ }
+
+ Rate(const Rate &other) : rate_kbps_(other.rate_kbps_) {}
+
+ Rate &operator=(const std::string &rate) {
+ std::size_t found = rate.find("kbps");
+ if (found != std::string::npos) {
+ rate_kbps_ = std::stof(rate.substr(0, found));
+ } else {
+ throw std::runtime_error("Format " + rate + " not correct");
+ }
+
+ return *this;
+ }
+
+ std::chrono::microseconds getMicrosecondsForPacket(std::size_t packet_size) {
+ return std::chrono::microseconds(
+ packet_size * long(std::round(1000.0 * 8.0 / rate_kbps_)));
+ }
+
+ private:
+ float rate_kbps_;
+};
+
struct ServerConfiguration {
ServerConfiguration()
: name("b001::abcd/64"),
@@ -84,7 +120,10 @@ struct ServerConfiguration {
hash_algorithm(HashAlgorithm::SHA_256),
keystore_name("/tmp/rsa_crypto_material.p12"),
keystore_password("cisco"),
- multiphase_produce_(false) {}
+ multiphase_produce_(false),
+ rtc_(false),
+ production_rate_(std::string("2048kbps")),
+ payload_size_(1400) {}
Prefix name;
bool virtual_producer;
@@ -98,6 +137,9 @@ struct ServerConfiguration {
std::string keystore_name;
std::string keystore_password;
bool multiphase_produce_;
+ bool rtc_;
+ Rate production_rate_;
+ std::size_t payload_size_;
};
class HIperfClient {
@@ -338,10 +380,11 @@ class HIperfServer {
HIperfServer(ServerConfiguration &conf)
: configuration_(conf),
signals_(io_service_, SIGINT),
+ rtc_timer_(io_service_),
content_objects_((std::uint16_t)(1 << log2_content_object_buffer_size)),
content_objects_index_(0),
mask_((std::uint16_t)(1 << log2_content_object_buffer_size) - 1) {
- std::string buffer(1440, 'X');
+ std::string buffer(configuration_.payload_size_, 'X');
std::cout << "Producing contents under name " << conf.name.getName()
<< std::endl;
@@ -357,16 +400,6 @@ class HIperfServer {
void processInterest(ProducerSocket &p, const Interest &interest) {
content_objects_[content_objects_index_ & mask_]->setName(
interest.getName());
-
- // if (final_chunk_number_ > 0 && interest.getName().getSuffix() == 0) {
- // auto name = interest.getName();
- // manifest_ = std::make_shared<ContentObjectManifest>(name);
- // // manifest_->setFinalChunkNumber(final_chunk_number_);
- // manifest_->encode();
- // p.produce(*manifest_);
- // return;
- // }
-
producer_socket_->produce(
*content_objects_[content_objects_index_++ & mask_]);
}
@@ -414,7 +447,11 @@ class HIperfServer {
int setup() {
int ret;
- producer_socket_ = std::make_unique<ProducerSocket>();
+ if (configuration_.rtc_) {
+ producer_socket_ = std::make_unique<RTCProducerSocket>();
+ } else {
+ producer_socket_ = std::make_unique<ProducerSocket>();
+ }
if (configuration_.sign) {
auto identity = setProducerIdentity(configuration_.keystore_name,
@@ -431,6 +468,13 @@ class HIperfServer {
producer_socket_->registerPrefix(configuration_.name);
producer_socket_->connect();
+ if (configuration_.rtc_) {
+ std::cout << "Running RTC producer: all other options (with the "
+ "exception of the bitrate) will be ignored."
+ << std::endl;
+ return ERROR_SUCCESS;
+ }
+
if (!configuration_.virtual_producer) {
if (producer_socket_->setSocketOption(
GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME,
@@ -485,6 +529,20 @@ class HIperfServer {
return ERROR_SUCCESS;
}
+ void sendRTCContentObjectCallback(std::error_code ec) {
+ if (!ec) {
+ auto payload =
+ content_objects_[content_objects_index_++ & mask_]->getPayload();
+ producer_socket_->produce(payload.data(), payload.length());
+ rtc_timer_.expires_from_now(
+ configuration_.production_rate_.getMicrosecondsForPacket(
+ configuration_.payload_size_));
+ rtc_timer_.async_wait(
+ std::bind(&HIperfServer::sendRTCContentObjectCallback, this,
+ std::placeholders::_1));
+ }
+ }
+
int run() {
std::cerr << "Starting to serve consumers" << std::endl;
@@ -494,6 +552,15 @@ class HIperfServer {
io_service_.stop();
});
+ if (configuration_.rtc_) {
+ rtc_timer_.expires_from_now(
+ configuration_.production_rate_.getMicrosecondsForPacket(
+ configuration_.payload_size_));
+ rtc_timer_.async_wait(
+ std::bind(&HIperfServer::sendRTCContentObjectCallback, this,
+ std::placeholders::_1));
+ }
+
io_service_.run();
return ERROR_SUCCESS;
@@ -503,6 +570,7 @@ class HIperfServer {
ServerConfiguration configuration_;
asio::io_service io_service_;
asio::signal_set signals_;
+ asio::steady_timer rtc_timer_;
std::vector<std::shared_ptr<ContentObject>> content_objects_;
std::uint16_t content_objects_index_;
std::uint16_t mask_;
@@ -519,10 +587,17 @@ void usage() {
#ifndef _WIN32
std::cerr << "-D\t\t\t\t\t"
<< "Run as a daemon" << std::endl;
+ std::cerr << "-R\t\t\t\t\t"
+ << "Run RTC protocol (client or server)" << std::endl;
+ std::cerr << "-f\t<filename>\t\t\t"
+ << "Log file" << std::endl;
#endif
std::cerr << std::endl;
std::cerr << "Server specific:" << std::endl;
- std::cerr << "-s\t<content_size>\t\t\tSize of the content to publish"
+ std::cerr << "-A\t<download_size>\t\t\tSize of the content to publish. This "
+ "is not the size of the packet (see -s for it)."
+ << std::endl;
+ std::cerr << "-s\t<packet_size>\t\t\tSize of the payload of each data packet."
<< std::endl;
std::cerr << "-r\t\t\t\t\t"
<< "Produce real content of content_size bytes" << std::endl;
@@ -542,6 +617,15 @@ void usage() {
<< std::endl;
std::cerr << "-p\t<password>\t\t\t"
<< "Password for p12 keystore" << std::endl;
+ std::cerr << "-x\t\t\t\t\t"
+ << "Produce a content of <download_size>, then after downloading "
+ "it produce a new content of"
+ << std::endl
+ << "\t\t\t\t\t<download_size> without resetting "
+ "the suffix to 0."
+ << std::endl;
+ std::cerr << "-B\t<bitrate>\t\t\t"
+ << "Bitrate for RTC producer, to be used with the -R option." << std::endl;
std::cerr << std::endl;
std::cerr << "Client specific:" << std::endl;
std::cerr << "-b\t<beta_parameter>\t\t"
@@ -561,6 +645,9 @@ void usage() {
"to be used for verifying the "
"origin of the packets received"
<< std::endl;
+ std::cerr << "-i\t<stats_interval>\t\t"
+ << "Show the statistics every <stats_interval> milliseconds."
+ << std::endl;
std::cout << "-v\t\t\t\t\t"
<< "Enable verification of received data" << std::endl;
}
@@ -588,84 +675,109 @@ int main(int argc, char *argv[]) {
int opt;
#ifndef _WIN32
- while ((opt = getopt(argc, argv, "DSCf:b:d:W:RMc:vs:rmlk:y:p:hi:x")) != -1) {
+ while ((opt = getopt(argc, argv, "DSCf:b:d:W:RMc:vA:s:rmlk:y:p:hi:xB:")) !=
+ -1) {
switch (opt) {
// Common
- case 'D':
+ case 'D': {
daemon = true;
break;
+ }
#else
- while ((opt = getopt(argc, argv, "SCf:b:d:W:RMc:vs:rmlk:y:p:hi:x")) != -1) {
+ while ((opt = getopt(argc, argv, "SCf:b:d:W:RMc:vA:s:rmlk:y:p:hi:xB:")) !=
+ -1) {
switch (opt) {
#endif
- case 'f':
+ case 'f': {
log_file = optarg;
break;
+ }
+ case 'R': {
+ client_configuration.rtc_ = true;
+ server_configuration.rtc_ = true;
+ break;
+ }
// Server or Client
- case 'S':
+ case 'S': {
role -= 1;
break;
- case 'C':
+ }
+ case 'C': {
role += 1;
break;
+ }
// Client specifc
- case 'b':
+ case 'b': {
client_configuration.beta = std::stod(optarg);
options = 1;
break;
- case 'd':
+ }
+ case 'd': {
client_configuration.drop_factor = std::stod(optarg);
options = 1;
break;
- case 'W':
+ }
+ case 'W': {
client_configuration.window = std::stod(optarg);
options = 1;
break;
- case 'M':
+ }
+ case 'M': {
client_configuration.virtual_download = false;
options = 1;
break;
- case 'c':
+ }
+ case 'c': {
client_configuration.producer_certificate = std::string(optarg);
options = 1;
break;
- case 'v':
+ }
+ case 'v': {
client_configuration.verify = true;
options = 1;
break;
- case 'i':
+ }
+ case 'i': {
client_configuration.report_interval_milliseconds_ = std::stoul(optarg);
options = 1;
break;
- case 'R':
- client_configuration.rtc_ = true;
- break;
+ }
// Server specific
- case 's':
+ case 'A': {
server_configuration.download_size = std::stoul(optarg);
options = -1;
break;
- case 'r':
+ }
+ case 's': {
+ server_configuration.payload_size_ = std::stoul(optarg);
+ options = -1;
+ break;
+ }
+ case 'r': {
server_configuration.virtual_producer = false;
options = -1;
break;
- case 'm':
+ }
+ case 'm': {
server_configuration.manifest = true;
options = -1;
break;
- case 'l':
+ }
+ case 'l': {
server_configuration.live_production = true;
options = -1;
break;
- case 'k':
+ }
+ case 'k': {
server_configuration.keystore_name = std::string(optarg);
server_configuration.sign = true;
options = -1;
break;
- case 'y':
+ }
+ case 'y': {
if (strncasecmp(optarg, "sha256", 6) == 0) {
server_configuration.hash_algorithm = HashAlgorithm::SHA_256;
} else if (strncasecmp(optarg, "sha512", 6) == 0) {
@@ -678,14 +790,27 @@ int main(int argc, char *argv[]) {
}
options = -1;
break;
- case 'p':
+ }
+ case 'p': {
server_configuration.keystore_password = std::string(optarg);
options = -1;
break;
- case 'x':
+ }
+ case 'x': {
server_configuration.multiphase_produce_ = true;
options = -1;
break;
+ }
+ case 'B': {
+ auto str = std::string(optarg);
+ std::transform(str.begin(), str.end(), str.begin(), ::tolower);
+ std::cout << "---------------------------------------------------------"
+ "---------------------->"
+ << str << std::endl;
+ server_configuration.production_rate_ = str;
+ options = -1;
+ break;
+ }
case 'h':
default:
usage();