aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc71
-rw-r--r--libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h10
-rw-r--r--libtransport/src/hicn/transport/protocols/rtc.cc73
-rw-r--r--libtransport/src/hicn/transport/protocols/rtc.h16
-rw-r--r--utils/src/hiperf.cc418
5 files changed, 368 insertions, 220 deletions
diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
index 00cc82543..67fcc83e3 100644
--- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
+++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
@@ -24,6 +24,11 @@
#define INIT_PACKET_PRODUCTION_RATE 100 // pps random value (almost 1Mbps)
#define STATS_INTERVAL_DURATION 500 // ms
#define INTEREST_LIFETIME_REDUCTION_FACTOR 0.8
+#define INACTIVE_TIME 100 // ms opus generates ~50 packets per seocnd, one
+ // every
+// 20ms. to be safe we use 20ms*5 as timer for an
+// inactive socket
+#define MILLI_IN_A_SEC 1000 // ms in a second
// NACK HEADER
// +-----------------------------------------+
@@ -46,11 +51,14 @@ RTCProducerSocket::RTCProducerSocket(asio::io_service &io_service)
producedPackets_(0),
bytesProductionRate_(0),
packetsProductionRate_(INIT_PACKET_PRODUCTION_RATE),
- perSecondFactor_(1000 / STATS_INTERVAL_DURATION) {
+ perSecondFactor_(MILLI_IN_A_SEC / STATS_INTERVAL_DURATION),
+ active_(false) {
auto nack_payload = utils::MemBuf::create(NACK_HEADER_SIZE);
nack_payload->append(NACK_HEADER_SIZE);
nack_->appendPayload(std::move(nack_payload));
- lastStats_ = std::chrono::steady_clock::now();
+ lastStats_ = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
srand((unsigned int)time(NULL));
prodLabel_ = ((rand() % 255) << 24UL);
}
@@ -63,11 +71,14 @@ RTCProducerSocket::RTCProducerSocket()
producedPackets_(0),
bytesProductionRate_(0),
packetsProductionRate_(INIT_PACKET_PRODUCTION_RATE),
- perSecondFactor_(1000 / STATS_INTERVAL_DURATION) {
+ perSecondFactor_(MILLI_IN_A_SEC / STATS_INTERVAL_DURATION),
+ active_(false) {
auto nack_payload = utils::MemBuf::create(NACK_HEADER_SIZE);
nack_payload->append(NACK_HEADER_SIZE);
nack_->appendPayload(std::move(nack_payload));
- lastStats_ = std::chrono::steady_clock::now();
+ lastStats_ = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
srand((unsigned int)time(NULL));
prodLabel_ = ((rand() % 255) << 24UL);
}
@@ -92,16 +103,15 @@ void RTCProducerSocket::registerPrefix(const Prefix &producer_namespace) {
}
}
-void RTCProducerSocket::updateStats(uint32_t packet_size) {
+void RTCProducerSocket::updateStats(uint32_t packet_size, uint64_t now) {
producedBytes_ += packet_size;
producedPackets_++;
- std::chrono::steady_clock::duration duration =
- std::chrono::steady_clock::now() - lastStats_;
- if (std::chrono::duration_cast<std::chrono::milliseconds>(duration).count() >=
- STATS_INTERVAL_DURATION) {
- lastStats_ = std::chrono::steady_clock::now();
+ uint64_t duration = now - lastStats_;
+ if (duration >= STATS_INTERVAL_DURATION) {
+ lastStats_ = now;
bytesProductionRate_ = producedBytes_ * perSecondFactor_;
packetsProductionRate_ = producedPackets_ * perSecondFactor_;
+ if (packetsProductionRate_.load() == 0) packetsProductionRate_ = 1;
producedBytes_ = 0;
producedPackets_ = 0;
}
@@ -117,17 +127,20 @@ void RTCProducerSocket::produce(const uint8_t *buf, size_t buffer_size) {
return;
}
- updateStats((uint32_t)(buffer_size + headerSize_ + TIMESTAMP_LEN));
+ active_ = true;
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
- ContentObject content_object(flowName_.setSuffix(currentSeg_));
+ lastProduced_ = now;
+
+ updateStats((uint32_t)(buffer_size + headerSize_ + TIMESTAMP_LEN), now);
- uint64_t timestamp = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::system_clock::now().time_since_epoch())
- .count();
+ ContentObject content_object(flowName_.setSuffix(currentSeg_));
auto payload = utils::MemBuf::create(buffer_size + TIMESTAMP_LEN);
- memcpy(payload->writableData(), &timestamp, TIMESTAMP_LEN);
+ memcpy(payload->writableData(), &now, TIMESTAMP_LEN);
memcpy(payload->writableData() + TIMESTAMP_LEN, buf, buffer_size);
payload->append(buffer_size + TIMESTAMP_LEN);
content_object.appendPayload(std::move(payload));
@@ -149,23 +162,41 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) {
on_interest_input_(*this, *interest);
}
- // packetsProductionRate_ is modified by another thread in updateStats
- // this should be safe since I just read here.
+ if (active_.load()) {
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ if (now - lastProduced_.load() >= INACTIVE_TIME) {
+ active_ = false;
+ }
+ }
+
+ if (TRANSPORT_EXPECT_FALSE(!active_.load())) {
+ sendNack(*interest);
+ return;
+ }
+
max_gap = (uint32_t)floor(
(double)((double)((double)lifetime * INTEREST_LIFETIME_REDUCTION_FACTOR /
1000.0) *
- (double)packetsProductionRate_));
+ (double)packetsProductionRate_.load()));
if (interestSeg < currentSeg_ || interestSeg > (max_gap + currentSeg_)) {
sendNack(*interest);
}
+ // else drop packet
}
void RTCProducerSocket::sendNack(const Interest &interest) {
nack_->setName(interest.getName());
uint32_t *payload_ptr = (uint32_t *)nack_->getPayload()->data();
*payload_ptr = currentSeg_;
- *(++payload_ptr) = bytesProductionRate_;
+
+ if (active_.load()) {
+ *(++payload_ptr) = bytesProductionRate_;
+ } else {
+ *(++payload_ptr) = 0;
+ }
nack_->setLifetime(0);
nack_->setPathLabel(prodLabel_);
diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h
index bc54be4bb..cb09ef991 100644
--- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h
+++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h
@@ -18,6 +18,7 @@
#include <hicn/transport/interfaces/socket_producer.h>
#include <hicn/transport/utils/content_store.h>
+#include <atomic>
#include <map>
#include <mutex>
@@ -41,7 +42,7 @@ class RTCProducerSocket : public ProducerSocket {
private:
void sendNack(const Interest &interest);
- void updateStats(uint32_t packet_size);
+ void updateStats(uint32_t packet_size, uint64_t now);
// std::map<uint32_t, uint64_t> pendingInterests_;
uint32_t currentSeg_;
@@ -53,9 +54,12 @@ class RTCProducerSocket : public ProducerSocket {
uint32_t producedBytes_;
uint32_t producedPackets_;
uint32_t bytesProductionRate_;
- uint32_t packetsProductionRate_;
+ std::atomic<uint32_t> packetsProductionRate_;
uint32_t perSecondFactor_;
- std::chrono::steady_clock::time_point lastStats_;
+ uint64_t lastStats_;
+ // std::chrono::steady_clock::time_point lastProduced_;
+ std::atomic<uint64_t> lastProduced_;
+ std::atomic<bool> active_;
};
} // namespace interface
diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc
index a993d596b..98abbe35b 100644
--- a/libtransport/src/hicn/transport/protocols/rtc.cc
+++ b/libtransport/src/hicn/transport/protocols/rtc.cc
@@ -38,6 +38,8 @@ RTCTransportProtocol::RTCTransportProtocol(
inflightInterests_(1 << default_values::log_2_default_buffer_size),
modMask_((1 << default_values::log_2_default_buffer_size) - 1) {
icnet_socket->getSocketOption(PORTAL, portal_);
+ nack_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
+ nack_timer_used_ = false;
reset();
}
@@ -211,8 +213,6 @@ void RTCTransportProtocol::updateStats(uint32_t round_duration) {
auto it = pathTable_.find(producerPathLabel_);
if (it == pathTable_.end()) return;
- // double maxAvgRTT = it->second->getAverageRtt();
- // double minRTT = it->second->getMinRtt();
minRtt_ = it->second->getMinRtt();
queuingDelay_ = it->second->getQueuingDealy();
@@ -222,22 +222,6 @@ void RTCTransportProtocol::updateStats(uint32_t round_duration) {
it->second->roundEnd();
}
- // this is inefficient but the window is supposed to be small, so it
- // probably makes sense to leave it like this
- // if(minRTT == 0)
- // minRTT = 1;
-
- // minRTTwin_[roundCounter_ % MIN_RTT_WIN] = minRTT;
- // minRtt_ = minRTT;
- // for (int i = 0; i < MIN_RTT_WIN; i++)
- // if(minRtt_ > minRTTwin_[i])
- // minRtt_ = minRTTwin_[i];
-
- // roundCounter_++;
-
- // std::cout << "min RTT " << minRtt_ << " queuing " << queuingDelay_ <<
- // std::endl;
-
if (sentInterest_ != 0 && currentState_ == HICN_RTC_NORMAL_STATE) {
double lossRate = (double)((double)packetLost_ / (double)sentInterest_);
lossRate_ = lossRate_ * HICN_ESTIMATED_LOSSES_ALPHA +
@@ -473,23 +457,45 @@ void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) {
scheduleNextInterests();
}
+bool RTCTransportProtocol::checkIfProducerIsActive(
+ const ContentObject &content_object) {
+ uint32_t *payload = (uint32_t *)content_object.getPayload()->data();
+ uint32_t productionSeg = *payload;
+ uint32_t productionRate = *(++payload);
+
+ if (productionRate == 0) {
+ // the producer socket is not active
+ // in this case we consider only the first nack
+ if (nack_timer_used_) return false;
+
+ nack_timer_used_ = true;
+ // actualSegment_ should be the one in the nack, which will the next in
+ // production
+ actualSegment_ = productionSeg;
+ // all the rest (win size should not change)
+ // we wait a bit before pull the socket again
+ nack_timer_->expires_from_now(std::chrono::milliseconds(500));
+ nack_timer_->async_wait([this](std::error_code ec) {
+ if (ec) return;
+ nack_timer_used_ = false;
+ scheduleNextInterests();
+ });
+ return false;
+ }
+
+ return true;
+}
+
void RTCTransportProtocol::onNack(const ContentObject &content_object) {
uint32_t *payload = (uint32_t *)content_object.getPayload()->data();
uint32_t productionSeg = *payload;
uint32_t productionRate = *(++payload);
uint32_t nackSegment = content_object.getName().getSuffix();
+ gotNack_ = true;
// we synch the estimated production rate with the actual one
estimatedBw_ = (double)productionRate;
- // if(inflightInterests_[segmentNumber %
- // default_values::default_buffer_size].retransmissions != 0){ ignore nacks
- // for retransmissions
- // return;
- //}
-
- gotNack_ = true;
-
if (productionSeg > nackSegment) {
// we are asking for stuff produced in the past
actualSegment_ = max(productionSeg + 1, actualSegment_);
@@ -535,14 +541,21 @@ void RTCTransportProtocol::onContentObject(
uint32_t payload_size = (uint32_t)payload->length();
uint32_t segmentNumber = content_object->getName().getSuffix();
uint32_t pkt = segmentNumber & modMask_;
+ bool schedule_next_interest = true;
if (payload_size == HICN_NACK_HEADER_SIZE) {
- // Nacks always come form the producer, so we set the producerePathLabel_;
+ // Nacks always come form the producer, so we set the producerPathLabel_;
producerPathLabel_ = content_object->getPathLabel();
if (inflightInterests_[pkt].retransmissions == 0) {
+ // discard nacks for rtx packets
inflightInterestsCount_--;
- onNack(*content_object);
+ schedule_next_interest = checkIfProducerIsActive(*content_object);
+ // if checkIfProducerIsActive returns true, we did all we need to do
+ // inside that function, no need to call onNack
+ if (!schedule_next_interest) onNack(*content_object);
updateDelayStats(*content_object);
+ } else {
+ schedule_next_interest = checkIfProducerIsActive(*content_object);
}
} else {
@@ -564,7 +577,9 @@ void RTCTransportProtocol::onContentObject(
increaseWindow();
}
- scheduleNextInterests();
+ if (schedule_next_interest) {
+ scheduleNextInterests();
+ }
}
void RTCTransportProtocol::returnContentToApplication(
diff --git a/libtransport/src/hicn/transport/protocols/rtc.h b/libtransport/src/hicn/transport/protocols/rtc.h
index 58c143988..0bb9d9b2e 100644
--- a/libtransport/src/hicn/transport/protocols/rtc.h
+++ b/libtransport/src/hicn/transport/protocols/rtc.h
@@ -121,6 +121,10 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly {
void scheduleNextInterests() override;
void scheduleAppNackRtx(std::vector<uint32_t> &nacks);
void onTimeout(Interest::Ptr &&interest) override;
+ // checkIfProducerIsActive: return true if we need to schedule an interest
+ // immediatly after, false otherwise (this happens when the producer socket
+ // is not active)
+ bool checkIfProducerIsActive(const ContentObject &content_object);
void onNack(const ContentObject &content_object);
void onContentObject(Interest::Ptr &&interest,
ContentObject::Ptr &&content_object) override;
@@ -142,18 +146,11 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly {
// controller var
std::chrono::steady_clock::time_point lastRoundBegin_;
- // bool allPacketsInSync_;
- // unsigned numberOfRoundsInSync_;
- // unsigned numberOfCatchUpRounds_;
- // bool catchUpPhase_;
unsigned currentState_;
- // uint32_t inProduction_;
-
// cwin var
uint32_t currentCWin_;
uint32_t maxCWin_;
- // uint32_t previousCWin_;
// names/packets var
uint32_t actualSegment_;
@@ -167,6 +164,10 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly {
// application for pakets for which we already got a
// past NACK by the producer these packet are too old,
// they will never be retrived
+ bool nack_timer_used_;
+ std::unique_ptr<asio::steady_timer> nack_timer_; // timer used to schedule
+ // a nack retransmission in
+ // of inactive prod socket
uint32_t modMask_;
@@ -185,7 +186,6 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly {
// vector
std::unordered_map<uint32_t, std::shared_ptr<RTCDataPath>> pathTable_;
uint32_t roundCounter_;
- // std::vector<uint64_t> minRTTwin_;
uint64_t minRtt_;
// CC var
diff --git a/utils/src/hiperf.cc b/utils/src/hiperf.cc
index 34cb79b3f..425b91bca 100644
--- a/utils/src/hiperf.cc
+++ b/utils/src/hiperf.cc
@@ -46,11 +46,17 @@ using Identity = utils::Identity;
struct ClientConfiguration {
ClientConfiguration()
- : name("b001::abcd", 0), verify(false), beta(-1.f), drop_factor(-1.f),
- window(-1), virtual_download(true),
+ : name("b001::abcd", 0),
+ verify(false),
+ beta(-1.f),
+ drop_factor(-1.f),
+ window(-1),
+ virtual_download(true),
producer_certificate("/tmp/rsa_certificate.pem"),
receive_buffer(std::make_shared<std::vector<uint8_t>>()),
- download_size(0), report_interval_milliseconds_(1000), rtc_(false) {}
+ download_size(0),
+ report_interval_milliseconds_(1000),
+ rtc_(false) {}
Name name;
bool verify;
@@ -67,7 +73,7 @@ struct ClientConfiguration {
};
class Rate {
-public:
+ public:
Rate() : rate_kbps_(0) {}
Rate(const std::string &rate) {
@@ -97,19 +103,28 @@ public:
packet_size * long(std::round(1000.0 * 8.0 / rate_kbps_)));
}
-private:
+ private:
float rate_kbps_;
};
struct ServerConfiguration {
ServerConfiguration()
- : name("b001::abcd/64"), virtual_producer(true), manifest(false),
- live_production(false), sign(false), content_lifetime(600000000_U32),
- content_object_size(1440), download_size(20 * 1024 * 1024),
+ : name("b001::abcd/64"),
+ virtual_producer(true),
+ manifest(false),
+ live_production(false),
+ sign(false),
+ content_lifetime(600000000_U32),
+ content_object_size(1440),
+ download_size(20 * 1024 * 1024),
hash_algorithm(HashAlgorithm::SHA_256),
keystore_name("/tmp/rsa_crypto_material.p12"),
- keystore_password("cisco"), multiphase_produce_(false), rtc_(false),
- production_rate_(std::string("2048kbps")), payload_size_(1400) {}
+ keystore_password("cisco"),
+ multiphase_produce_(false),
+ rtc_(false),
+ interactive_(false),
+ production_rate_(std::string("2048kbps")),
+ payload_size_(1400) {}
Prefix name;
bool virtual_producer;
@@ -124,6 +139,7 @@ struct ServerConfiguration {
std::string keystore_password;
bool multiphase_produce_;
bool rtc_;
+ bool interactive_;
Rate production_rate_;
std::size_t payload_size_;
};
@@ -132,10 +148,12 @@ class HIperfClient {
typedef std::chrono::time_point<std::chrono::steady_clock> Time;
typedef std::chrono::microseconds TimeDuration;
-public:
+ public:
HIperfClient(const ClientConfiguration &conf)
- : configuration_(conf), total_duration_milliseconds_(0),
- old_bytes_value_(0), signals_(io_service_, SIGINT) {}
+ : configuration_(conf),
+ total_duration_milliseconds_(0),
+ old_bytes_value_(0),
+ signals_(io_service_, SIGINT) {}
void processPayload(ConsumerSocket &c, std::size_t bytes_transferred,
const std::error_code &ec) {
@@ -154,6 +172,11 @@ public:
io_service_.stop();
}
+ void processPayloadRtc(ConsumerSocket &c, std::size_t bytes_transferred,
+ const std::error_code &ec) {
+ configuration_.receive_buffer->clear();
+ }
+
bool verifyData(ConsumerSocket &c, const ContentObject &contentObject) {
if (contentObject.getPayloadType() == PayloadType::CONTENT_OBJECT) {
std::cout << "VERIFY CONTENT" << std::endl;
@@ -298,11 +321,19 @@ public:
return ERROR_SETUP;
}
- ret = consumer_socket_->setSocketOption(
- ConsumerCallbacksOptions::CONTENT_RETRIEVED,
- (ConsumerContentCallback)std::bind(
- &HIperfClient::processPayload, this, std::placeholders::_1,
- std::placeholders::_2, std::placeholders::_3));
+ if (!configuration_.rtc_) {
+ ret = consumer_socket_->setSocketOption(
+ ConsumerCallbacksOptions::CONTENT_RETRIEVED,
+ (ConsumerContentCallback)std::bind(
+ &HIperfClient::processPayload, this, std::placeholders::_1,
+ std::placeholders::_2, std::placeholders::_3));
+ } else {
+ ret = consumer_socket_->setSocketOption(
+ ConsumerCallbacksOptions::CONTENT_RETRIEVED,
+ (ConsumerContentCallback)std::bind(
+ &HIperfClient::processPayloadRtc, this, std::placeholders::_1,
+ std::placeholders::_2, std::placeholders::_3));
+ }
if (ret == SOCKET_OPTION_NOT_SET) {
return ERROR_SETUP;
@@ -346,7 +377,7 @@ public:
return ERROR_SUCCESS;
}
-private:
+ private:
ClientConfiguration configuration_;
Time t_stats_;
Time t_download_;
@@ -360,13 +391,19 @@ private:
class HIperfServer {
const std::size_t log2_content_object_buffer_size = 8;
-public:
+ public:
HIperfServer(ServerConfiguration &conf)
- : configuration_(conf), signals_(io_service_, SIGINT),
+ : configuration_(conf),
+ signals_(io_service_, SIGINT),
rtc_timer_(io_service_),
content_objects_((std::uint16_t)(1 << log2_content_object_buffer_size)),
content_objects_index_(0),
- mask_((std::uint16_t)(1 << log2_content_object_buffer_size) - 1) {
+ mask_((std::uint16_t)(1 << log2_content_object_buffer_size) - 1),
+#ifndef _WIN32
+ input_(io_service_, ::dup(STDIN_FILENO)),
+ rtc_running_(false)
+#endif
+ {
std::string buffer(configuration_.payload_size_, 'X');
std::cout << "Producing contents under name " << conf.name.getName()
<< std::endl;
@@ -414,10 +451,9 @@ public:
<< std::endl;
}
- std::shared_ptr<utils::Identity>
- setProducerIdentity(std::string &keystore_name,
- std::string &keystore_password,
- HashAlgorithm &hash_algorithm) {
+ std::shared_ptr<utils::Identity> setProducerIdentity(
+ std::string &keystore_name, std::string &keystore_password,
+ HashAlgorithm &hash_algorithm) {
if (access(keystore_name.c_str(), F_OK) != -1) {
return std::make_shared<utils::Identity>(keystore_name, keystore_password,
hash_algorithm);
@@ -527,6 +563,36 @@ public:
}
}
+#ifndef _WIN32
+ void handleInput(const std::error_code &error, std::size_t length) {
+ if (error) {
+ producer_socket_->stop();
+ io_service_.stop();
+ }
+
+ if (rtc_running_) {
+ std::cout << "stop real time content production" << std::endl;
+ rtc_running_ = false;
+ rtc_timer_.cancel();
+ } else {
+ std::cout << "start real time content production" << std::endl;
+ rtc_running_ = true;
+ rtc_timer_.expires_from_now(
+ configuration_.production_rate_.getMicrosecondsForPacket(
+ configuration_.payload_size_));
+ rtc_timer_.async_wait(
+ std::bind(&HIperfServer::sendRTCContentObjectCallback, this,
+ std::placeholders::_1));
+ }
+
+ input_buffer_.consume(length); // Remove newline from input.
+ asio::async_read_until(
+ input_, input_buffer_, '\n',
+ std::bind(&HIperfServer::handleInput, this, std::placeholders::_1,
+ std::placeholders::_2));
+ }
+#endif
+
int run() {
std::cerr << "Starting to serve consumers" << std::endl;
@@ -537,12 +603,29 @@ public:
});
if (configuration_.rtc_) {
+#ifndef _WIN32
+ if (configuration_.interactive_) {
+ asio::async_read_until(
+ input_, input_buffer_, '\n',
+ std::bind(&HIperfServer::handleInput, this, std::placeholders::_1,
+ std::placeholders::_2));
+ } else {
+ rtc_running_ = true;
+ rtc_timer_.expires_from_now(
+ configuration_.production_rate_.getMicrosecondsForPacket(
+ configuration_.payload_size_));
+ rtc_timer_.async_wait(
+ std::bind(&HIperfServer::sendRTCContentObjectCallback, this,
+ std::placeholders::_1));
+ }
+#else
rtc_timer_.expires_from_now(
configuration_.production_rate_.getMicrosecondsForPacket(
configuration_.payload_size_));
rtc_timer_.async_wait(
std::bind(&HIperfServer::sendRTCContentObjectCallback, this,
std::placeholders::_1));
+#endif
}
io_service_.run();
@@ -550,7 +633,7 @@ public:
return ERROR_SUCCESS;
}
-private:
+ private:
ServerConfiguration configuration_;
asio::io_service io_service_;
asio::signal_set signals_;
@@ -559,6 +642,11 @@ private:
std::uint16_t content_objects_index_;
std::uint16_t mask_;
std::unique_ptr<ProducerSocket> producer_socket_;
+#ifndef _WIN32
+ asio::posix::stream_descriptor input_;
+ asio::streambuf input_buffer_;
+ bool rtc_running_;
+#endif
};
void usage() {
@@ -607,10 +695,16 @@ void usage() {
<< std::endl;
std::cerr << " <download_size> without "
"resetting the suffix to 0"
- << std::endl;
- std::cerr << "-B <bitrate> = bitrate for RTC "
+ << std::endl;
+ std::cerr << "-B <bitrate> = bitrate for RTC "
"producer, to be used with the -R option"
<< std::endl;
+#ifndef _WIN32
+ std::cerr << "-I = interactive mode,"
+ "start/stop real time content production "
+ "by pressing return. To be used with the -R option"
+ << std::endl;
+#endif
std::cerr << std::endl;
std::cerr << "Client specific:" << std::endl;
std::cerr << "-b <beta_parameter> = RAAQM beta parameter"
@@ -657,146 +751,150 @@ int main(int argc, char *argv[]) {
int opt;
#ifndef _WIN32
- while ((opt = getopt(argc, argv, "DSCf:b:d:W:RMc:vA:s:rmlk:y:p:hi:xB:")) !=
+ while ((opt = getopt(argc, argv, "DSCf:b:d:W:RMc:vA:s:rmlk:y:p:hi:xB:I")) !=
-1) {
switch (opt) {
- // Common
- case 'D': {
- daemon = true;
- break;
- }
+ // Common
+ case 'D': {
+ daemon = true;
+ break;
+ }
+ case 'I': {
+ server_configuration.interactive_ = true;
+ break;
+ }
#else
while ((opt = getopt(argc, argv, "SCf:b:d:W:RMc:vA:s:rmlk:y:p:hi:xB:")) !=
-1) {
switch (opt) {
#endif
- case 'f': {
- log_file = optarg;
- break;
- }
- case 'R': {
- client_configuration.rtc_ = true;
- server_configuration.rtc_ = true;
- break;
- }
+ case 'f': {
+ log_file = optarg;
+ break;
+ }
+ case 'R': {
+ client_configuration.rtc_ = true;
+ server_configuration.rtc_ = true;
+ break;
+ }
- // Server or Client
- case 'S': {
- role -= 1;
- break;
- }
- case 'C': {
- role += 1;
- break;
- }
+ // Server or Client
+ case 'S': {
+ role -= 1;
+ break;
+ }
+ case 'C': {
+ role += 1;
+ break;
+ }
- // Client specifc
- case 'b': {
- client_configuration.beta = std::stod(optarg);
- options = 1;
- break;
- }
- case 'd': {
- client_configuration.drop_factor = std::stod(optarg);
- options = 1;
- break;
- }
- case 'W': {
- client_configuration.window = std::stod(optarg);
- options = 1;
- break;
- }
- case 'M': {
- client_configuration.virtual_download = false;
- options = 1;
- break;
- }
- case 'c': {
- client_configuration.producer_certificate = std::string(optarg);
- options = 1;
- break;
- }
- case 'v': {
- client_configuration.verify = true;
- options = 1;
- break;
- }
- case 'i': {
- client_configuration.report_interval_milliseconds_ = std::stoul(optarg);
- options = 1;
- break;
- }
+ // Client specifc
+ case 'b': {
+ client_configuration.beta = std::stod(optarg);
+ options = 1;
+ break;
+ }
+ case 'd': {
+ client_configuration.drop_factor = std::stod(optarg);
+ options = 1;
+ break;
+ }
+ case 'W': {
+ client_configuration.window = std::stod(optarg);
+ options = 1;
+ break;
+ }
+ case 'M': {
+ client_configuration.virtual_download = false;
+ options = 1;
+ break;
+ }
+ case 'c': {
+ client_configuration.producer_certificate = std::string(optarg);
+ options = 1;
+ break;
+ }
+ case 'v': {
+ client_configuration.verify = true;
+ options = 1;
+ break;
+ }
+ case 'i': {
+ client_configuration.report_interval_milliseconds_ = std::stoul(optarg);
+ options = 1;
+ break;
+ }
- // Server specific
- case 'A': {
- server_configuration.download_size = std::stoul(optarg);
- options = -1;
- break;
- }
- case 's': {
- server_configuration.payload_size_ = std::stoul(optarg);
- options = -1;
- break;
- }
- case 'r': {
- server_configuration.virtual_producer = false;
- options = -1;
- break;
- }
- case 'm': {
- server_configuration.manifest = true;
- options = -1;
- break;
- }
- case 'l': {
- server_configuration.live_production = true;
- options = -1;
- break;
- }
- case 'k': {
- server_configuration.keystore_name = std::string(optarg);
- server_configuration.sign = true;
- options = -1;
- break;
- }
- case 'y': {
- if (strncasecmp(optarg, "sha256", 6) == 0) {
- server_configuration.hash_algorithm = HashAlgorithm::SHA_256;
- } else if (strncasecmp(optarg, "sha512", 6) == 0) {
- server_configuration.hash_algorithm = HashAlgorithm::SHA_512;
- } else if (strncasecmp(optarg, "crc32", 5) == 0) {
- server_configuration.hash_algorithm = HashAlgorithm::CRC32C;
- } else {
- std::cerr << "Ignored unknown hash algorithm. Using SHA 256."
- << std::endl;
+ // Server specific
+ case 'A': {
+ server_configuration.download_size = std::stoul(optarg);
+ options = -1;
+ break;
}
- options = -1;
- break;
- }
- case 'p': {
- server_configuration.keystore_password = std::string(optarg);
- options = -1;
- break;
- }
- case 'x': {
- server_configuration.multiphase_produce_ = true;
- options = -1;
- break;
- }
- case 'B': {
- auto str = std::string(optarg);
- std::transform(str.begin(), str.end(), str.begin(), ::tolower);
- std::cout << "---------------------------------------------------------"
- "---------------------->"
- << str << std::endl;
- server_configuration.production_rate_ = str;
- options = -1;
- break;
- }
- case 'h':
- default:
- usage();
- return EXIT_FAILURE;
+ case 's': {
+ server_configuration.payload_size_ = std::stoul(optarg);
+ options = -1;
+ break;
+ }
+ case 'r': {
+ server_configuration.virtual_producer = false;
+ options = -1;
+ break;
+ }
+ case 'm': {
+ server_configuration.manifest = true;
+ options = -1;
+ break;
+ }
+ case 'l': {
+ server_configuration.live_production = true;
+ options = -1;
+ break;
+ }
+ case 'k': {
+ server_configuration.keystore_name = std::string(optarg);
+ server_configuration.sign = true;
+ options = -1;
+ break;
+ }
+ case 'y': {
+ if (strncasecmp(optarg, "sha256", 6) == 0) {
+ server_configuration.hash_algorithm = HashAlgorithm::SHA_256;
+ } else if (strncasecmp(optarg, "sha512", 6) == 0) {
+ server_configuration.hash_algorithm = HashAlgorithm::SHA_512;
+ } else if (strncasecmp(optarg, "crc32", 5) == 0) {
+ server_configuration.hash_algorithm = HashAlgorithm::CRC32C;
+ } else {
+ std::cerr << "Ignored unknown hash algorithm. Using SHA 256."
+ << std::endl;
+ }
+ options = -1;
+ break;
+ }
+ case 'p': {
+ server_configuration.keystore_password = std::string(optarg);
+ options = -1;
+ break;
+ }
+ case 'x': {
+ server_configuration.multiphase_produce_ = true;
+ options = -1;
+ break;
+ }
+ case 'B': {
+ auto str = std::string(optarg);
+ std::transform(str.begin(), str.end(), str.begin(), ::tolower);
+ std::cout << "---------------------------------------------------------"
+ "---------------------->"
+ << str << std::endl;
+ server_configuration.production_rate_ = str;
+ options = -1;
+ break;
+ }
+ case 'h':
+ default:
+ usage();
+ return EXIT_FAILURE;
}
}
@@ -875,9 +973,9 @@ int main(int argc, char *argv[]) {
return 0;
}
-} // end namespace interface
+} // end namespace interface
-} // end namespace transport
+} // end namespace transport
int main(int argc, char *argv[]) {
return transport::interface::main(argc, argv);