summaryrefslogtreecommitdiffstats
path: root/libtransport/src/implementation/socket_producer.h
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/implementation/socket_producer.h')
-rw-r--r--libtransport/src/implementation/socket_producer.h207
1 files changed, 134 insertions, 73 deletions
diff --git a/libtransport/src/implementation/socket_producer.h b/libtransport/src/implementation/socket_producer.h
index 9daf79b9d..37151d497 100644
--- a/libtransport/src/implementation/socket_producer.h
+++ b/libtransport/src/implementation/socket_producer.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -40,6 +40,7 @@ namespace implementation {
using namespace core;
using namespace interface;
+using ProducerCallback = interface::ProducerSocket::Callback;
class ProducerSocket : public Socket {
private:
@@ -48,11 +49,14 @@ class ProducerSocket : public Socket {
: Socket(std::move(portal)),
producer_interface_(producer_socket),
data_packet_size_(default_values::content_object_packet_size),
+ max_segment_size_(default_values::content_object_packet_size),
content_object_expiry_time_(default_values::content_object_expiry_time),
- async_thread_(),
- making_manifest_(false),
+ making_manifest_(default_values::manifest_capacity),
hash_algorithm_(auth::CryptoHashType::SHA256),
- suffix_strategy_(core::NextSegmentCalculationStrategy::INCREMENTAL),
+ signer_(std::make_shared<auth::VoidSigner>()),
+ suffix_strategy_(std::make_shared<utils::IncrementalSuffixStrategy>(0)),
+ aggregated_data_(false),
+ fec_setting_(""),
on_interest_input_(VOID_HANDLER),
on_interest_dropped_input_buffer_(VOID_HANDLER),
on_interest_inserted_input_buffer_(VOID_HANDLER),
@@ -63,29 +67,30 @@ class ProducerSocket : public Socket {
on_content_object_in_output_buffer_(VOID_HANDLER),
on_content_object_output_(VOID_HANDLER),
on_content_object_evicted_from_output_buffer_(VOID_HANDLER),
- on_content_produced_(VOID_HANDLER) {
+ on_content_produced_(VOID_HANDLER),
+ application_callback_(nullptr) {
switch (protocol) {
case ProductionProtocolAlgorithms::RTC_PROD:
production_protocol_ =
- std::make_unique<protocol::RTCProductionProtocol>(this);
+ std::make_shared<protocol::RTCProductionProtocol>(this);
break;
case ProductionProtocolAlgorithms::BYTE_STREAM:
default:
production_protocol_ =
- std::make_unique<protocol::ByteStreamProductionProtocol>(this);
+ std::make_shared<protocol::ByteStreamProductionProtocol>(this);
break;
}
}
public:
ProducerSocket(interface::ProducerSocket *producer, int protocol)
- : ProducerSocket(producer, protocol, std::make_shared<core::Portal>()) {}
+ : ProducerSocket(producer, protocol, core::Portal::createShared()) {
+ is_async_ = true;
+ }
ProducerSocket(interface::ProducerSocket *producer, int protocol,
- asio::io_service &io_service)
- : ProducerSocket(producer, protocol,
- std::make_shared<core::Portal>(io_service)) {
- is_async_ = true;
+ ::utils::EventThread &worker)
+ : ProducerSocket(producer, protocol, core::Portal::createShared(worker)) {
}
virtual ~ProducerSocket() {}
@@ -98,31 +103,9 @@ class ProducerSocket : public Socket {
producer_interface_ = producer_socket;
}
- void connect() override {
- portal_->connect(false);
- production_protocol_->start();
- }
-
- bool isRunning() override { return !production_protocol_->isRunning(); };
+ void connect() override { portal_->connect(false); }
- virtual void asyncProduce(Name content_name,
- std::unique_ptr<utils::MemBuf> &&buffer,
- bool is_last, uint32_t offset,
- uint32_t **last_segment = nullptr) {
- if (!async_thread_.stopped()) {
- auto a = buffer.release();
- async_thread_.add([this, content_name, a, is_last, offset,
- last_segment]() {
- auto buf = std::unique_ptr<utils::MemBuf>(a);
- if (last_segment != NULL) {
- **last_segment = offset + produceStream(content_name, std::move(buf),
- is_last, offset);
- } else {
- produceStream(content_name, std::move(buf), is_last, offset);
- }
- });
- }
- }
+ bool isRunning() override { return production_protocol_->isRunning(); };
virtual uint32_t produceStream(const Name &content_name,
std::unique_ptr<utils::MemBuf> &&buffer,
@@ -156,12 +139,38 @@ class ProducerSocket : public Socket {
production_protocol_->produce(content_object);
}
+ void sendMapme() { production_protocol_->sendMapme(); }
+
void registerPrefix(const Prefix &producer_namespace) {
- production_protocol_->registerNamespaceWithNetwork(producer_namespace);
+ portal_->registerRoute(producer_namespace);
}
+ void start() { production_protocol_->start(); }
void stop() { production_protocol_->stop(); }
+ using Socket::getSocketOption;
+ using Socket::setSocketOption;
+
+ virtual int setSocketOption(int socket_option_key,
+ ProducerCallback *socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in
+ // case setSocketOption is called while the io_service is running.
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ProducerCallback *socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ProducerCallbacksOptions::PRODUCER_CALLBACK:
+ application_callback_ = socket_option_value;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ });
+ }
+
virtual int setSocketOption(int socket_option_key,
uint32_t socket_option_value) {
switch (socket_option_key) {
@@ -172,6 +181,17 @@ class ProducerSocket : public Socket {
}
break;
+ case GeneralTransportOptions::MAKE_MANIFEST:
+ making_manifest_ = socket_option_value;
+ break;
+
+ case GeneralTransportOptions::MAX_SEGMENT_SIZE:
+ if (socket_option_value <= default_values::max_content_object_size &&
+ socket_option_value > 0) {
+ max_segment_size_ = socket_option_value;
+ }
+ break;
+
case GeneralTransportOptions::OUTPUT_BUFFER_SIZE:
production_protocol_->setOutputBufferSize(socket_option_value);
break;
@@ -260,8 +280,8 @@ class ProducerSocket : public Socket {
virtual int setSocketOption(int socket_option_key, bool socket_option_value) {
switch (socket_option_key) {
- case GeneralTransportOptions::MAKE_MANIFEST:
- making_manifest_ = socket_option_value;
+ case RtcTransportOptions::AGGREGATED_DATA:
+ aggregated_data_ = socket_option_value;
break;
default:
@@ -385,7 +405,7 @@ class ProducerSocket : public Socket {
virtual int setSocketOption(
int socket_option_key,
- core::NextSegmentCalculationStrategy socket_option_value) {
+ const std::shared_ptr<utils::SuffixStrategy> &socket_option_value) {
switch (socket_option_key) {
case GeneralTransportOptions::SUFFIX_STRATEGY:
suffix_strategy_ = socket_option_value;
@@ -413,9 +433,33 @@ class ProducerSocket : public Socket {
return SOCKET_OPTION_SET;
}
+ int getSocketOption(int socket_option_key,
+ ProducerCallback **socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in
+ // case setSocketOption is called while the io_service is running.
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ProducerCallback **socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ProducerCallbacksOptions::PRODUCER_CALLBACK:
+ *socket_option_value = application_callback_;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ });
+ }
+
virtual int getSocketOption(int socket_option_key,
uint32_t &socket_option_value) {
switch (socket_option_key) {
+ case GeneralTransportOptions::MAKE_MANIFEST:
+ socket_option_value = making_manifest_;
+ break;
+
case GeneralTransportOptions::OUTPUT_BUFFER_SIZE:
socket_option_value = production_protocol_->getOutputBufferSize();
break;
@@ -424,6 +468,10 @@ class ProducerSocket : public Socket {
socket_option_value = (uint32_t)data_packet_size_;
break;
+ case GeneralTransportOptions::MAX_SEGMENT_SIZE:
+ socket_option_value = (uint32_t)max_segment_size_;
+ break;
+
case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME:
socket_option_value = content_object_expiry_time_;
break;
@@ -438,14 +486,14 @@ class ProducerSocket : public Socket {
virtual int getSocketOption(int socket_option_key,
bool &socket_option_value) {
switch (socket_option_key) {
- case GeneralTransportOptions::MAKE_MANIFEST:
- socket_option_value = making_manifest_;
- break;
-
case GeneralTransportOptions::ASYNC_MODE:
socket_option_value = is_async_;
break;
+ case RtcTransportOptions::AGGREGATED_DATA:
+ socket_option_value = aggregated_data_;
+ break;
+
default:
return SOCKET_OPTION_NOT_GET;
}
@@ -547,21 +595,6 @@ class ProducerSocket : public Socket {
});
}
- virtual int getSocketOption(
- int socket_option_key,
- std::shared_ptr<core::Portal> &socket_option_value) {
- switch (socket_option_key) {
- case PORTAL:
- socket_option_value = portal_;
- break;
- default:
- return SOCKET_OPTION_NOT_GET;
- ;
- }
-
- return SOCKET_OPTION_GET;
- }
-
virtual int getSocketOption(int socket_option_key,
auth::CryptoHashType &socket_option_value) {
switch (socket_option_key) {
@@ -577,7 +610,7 @@ class ProducerSocket : public Socket {
virtual int getSocketOption(
int socket_option_key,
- core::NextSegmentCalculationStrategy &socket_option_value) {
+ std::shared_ptr<utils::SuffixStrategy> &socket_option_value) {
switch (socket_option_key) {
case GeneralTransportOptions::SUFFIX_STRATEGY:
socket_option_value = suffix_strategy_;
@@ -603,9 +636,31 @@ class ProducerSocket : public Socket {
return SOCKET_OPTION_GET;
}
+ int getSocketOption(int socket_option_key, std::string &socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::FEC_TYPE:
+ socket_option_value = fec_setting_;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ }
+
virtual int setSocketOption(int socket_option_key,
const std::string &socket_option_value) {
- return SOCKET_OPTION_NOT_SET;
+ int result = SOCKET_OPTION_NOT_SET;
+ switch (socket_option_key) {
+ case GeneralTransportOptions::FEC_TYPE:
+ fec_setting_ = socket_option_value;
+ result = SOCKET_OPTION_SET;
+ break;
+
+ default:
+ return result;
+ }
+ return result;
}
// If the thread calling lambda_func is not the same of io_service, this
@@ -623,9 +678,9 @@ class ProducerSocket : public Socket {
std::condition_variable cv;
bool done = false;
- portal_->getIoService().dispatch([&socket_option_key,
- &socket_option_value, &mtx, &cv,
- &result, &done, &func]() {
+ portal_->getThread().tryRunHandlerNow([&socket_option_key,
+ &socket_option_value, &mtx, &cv,
+ &result, &done, &func]() {
std::unique_lock<std::mutex> lck(mtx);
done = true;
result = func(socket_option_key, socket_option_value);
@@ -655,9 +710,9 @@ class ProducerSocket : public Socket {
/* Condition variable for the wait */
std::condition_variable cv;
bool done = false;
- portal_->getIoService().dispatch([&socket_option_key,
- &socket_option_value, &mtx, &cv,
- &result, &done, &func]() {
+ portal_->getThread().tryRunHandlerNow([&socket_option_key,
+ &socket_option_value, &mtx, &cv,
+ &result, &done, &func]() {
std::unique_lock<std::mutex> lck(mtx);
done = true;
result = func(socket_option_key, socket_option_value);
@@ -677,20 +732,24 @@ class ProducerSocket : public Socket {
// Threads
protected:
interface::ProducerSocket *producer_interface_;
- asio::io_service io_service_;
std::atomic<size_t> data_packet_size_;
+ std::atomic<size_t> max_segment_size_;
std::atomic<uint32_t> content_object_expiry_time_;
- utils::EventThread async_thread_;
-
- std::atomic<bool> making_manifest_;
+ std::atomic<uint32_t> making_manifest_;
std::atomic<auth::CryptoHashType> hash_algorithm_;
std::atomic<auth::CryptoSuite> crypto_suite_;
utils::SpinLock signer_lock_;
std::shared_ptr<auth::Signer> signer_;
- core::NextSegmentCalculationStrategy suffix_strategy_;
+ std::shared_ptr<utils::SuffixStrategy> suffix_strategy_;
+
+ std::shared_ptr<protocol::ProductionProtocol> production_protocol_;
- std::unique_ptr<protocol::ProductionProtocol> production_protocol_;
+ // RTC transport
+ bool aggregated_data_;
+
+ // FEC setting
+ std::string fec_setting_;
// callbacks
ProducerInterestCallback on_interest_input_;
@@ -706,6 +765,8 @@ class ProducerSocket : public Socket {
ProducerContentObjectCallback on_content_object_evicted_from_output_buffer_;
ProducerContentCallback on_content_produced_;
+
+ ProducerCallback *application_callback_;
};
} // namespace implementation