aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/implementation/socket_consumer.h
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/implementation/socket_consumer.h')
-rw-r--r--libtransport/src/implementation/socket_consumer.h413
1 files changed, 193 insertions, 220 deletions
diff --git a/libtransport/src/implementation/socket_consumer.h b/libtransport/src/implementation/socket_consumer.h
index 5e5073956..3117e21e7 100644
--- a/libtransport/src/implementation/socket_consumer.h
+++ b/libtransport/src/implementation/socket_consumer.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -13,15 +13,18 @@
* limitations under the License.
*/
+#pragma once
+
+#include <hicn/transport/auth/verifier.h>
#include <hicn/transport/interfaces/socket_consumer.h>
#include <hicn/transport/interfaces/socket_options_default_values.h>
#include <hicn/transport/interfaces/statistics.h>
-#include <hicn/transport/security/verifier.h>
#include <hicn/transport/utils/event_thread.h>
+#include <implementation/socket.h>
#include <protocols/cbr.h>
-#include <protocols/protocol.h>
#include <protocols/raaqm.h>
-#include <protocols/rtc.h>
+#include <protocols/rtc/rtc.h>
+#include <protocols/transport_protocol.h>
namespace transport {
namespace implementation {
@@ -30,13 +33,13 @@ using namespace core;
using namespace interface;
using ReadCallback = interface::ConsumerSocket::ReadCallback;
-class ConsumerSocket : public Socket<BasePortal> {
+class ConsumerSocket : public Socket {
private:
ConsumerSocket(interface::ConsumerSocket *consumer, int protocol,
- std::shared_ptr<Portal> &&portal)
- : consumer_interface_(consumer),
- portal_(portal),
- async_downloader_(),
+ std::shared_ptr<core::Portal> &&portal)
+ : Socket(std::move(portal)),
+ consumer_interface_(consumer),
+ packet_format_(default_values::packet_format),
interest_lifetime_(default_values::interest_lifetime),
min_window_size_(default_values::min_window_size),
max_window_size_(default_values::max_window_size),
@@ -54,51 +57,53 @@ class ConsumerSocket : public Socket<BasePortal> {
rate_estimation_observer_(nullptr),
rate_estimation_batching_parameter_(default_values::batch),
rate_estimation_choice_(0),
- is_async_(false),
- verifier_(std::make_shared<utils::Verifier>()),
+ manifest_factor_relevant_(default_values::manifest_factor_relevant),
+ manifest_factor_alert_(default_values::manifest_factor_alert),
+ verifier_(std::make_shared<auth::VoidVerifier>()),
verify_signature_(false),
- key_content_(false),
+ reset_window_(false),
on_interest_output_(VOID_HANDLER),
on_interest_timeout_(VOID_HANDLER),
on_interest_satisfied_(VOID_HANDLER),
on_content_object_input_(VOID_HANDLER),
- on_content_object_verification_(VOID_HANDLER),
stats_summary_(VOID_HANDLER),
+ on_fwd_strategy_(VOID_HANDLER),
+ on_rec_strategy_(VOID_HANDLER),
read_callback_(nullptr),
timer_interval_milliseconds_(0),
+ recovery_strategy_(RtcTransportRecoveryStrategies::RTX_ONLY),
+ aggregated_data_(false),
+ content_sharing_mode_(false),
+ aggregated_interests_(false),
guard_raaqm_params_() {
switch (protocol) {
case TransportProtocolAlgorithms::CBR:
transport_protocol_ =
- std::make_unique<protocol::CbrTransportProtocol>(this);
+ std::make_shared<protocol::CbrTransportProtocol>(this);
break;
case TransportProtocolAlgorithms::RTC:
transport_protocol_ =
- std::make_unique<protocol::RTCTransportProtocol>(this);
+ std::make_shared<protocol::rtc::RTCTransportProtocol>(this);
break;
case TransportProtocolAlgorithms::RAAQM:
default:
transport_protocol_ =
- std::make_unique<protocol::RaaqmTransportProtocol>(this);
+ std::make_shared<protocol::RaaqmTransportProtocol>(this);
break;
}
}
public:
ConsumerSocket(interface::ConsumerSocket *consumer, int protocol)
- : ConsumerSocket(consumer, protocol, std::make_shared<Portal>()) {}
+ : ConsumerSocket(consumer, protocol, core::Portal::createShared()) {}
ConsumerSocket(interface::ConsumerSocket *consumer, int protocol,
- asio::io_service &io_service)
- : ConsumerSocket(consumer, protocol,
- std::make_shared<Portal>(io_service)) {
+ ::utils::EventThread &worker)
+ : ConsumerSocket(consumer, protocol, core::Portal::createShared(worker)) {
is_async_ = true;
}
- ~ConsumerSocket() {
- stop();
- async_downloader_.stop();
- }
+ ~ConsumerSocket() { stop(); }
interface::ConsumerSocket *getInterface() {
return consumer_interface_;
@@ -122,23 +127,9 @@ class ConsumerSocket : public Socket<BasePortal> {
transport_protocol_->start();
- return is_async_ ? CONSUMER_RUNNING : CONSUMER_FINISHED;
- }
-
- virtual int asyncConsume(const Name &name) {
- if (!async_downloader_.stopped()) {
- async_downloader_.add([this, name]() {
- network_name_ = std::move(name);
- network_name_.setSuffix(0);
- transport_protocol_->start();
- });
- }
-
return CONSUMER_RUNNING;
}
- bool verifyKeyPackets() { return transport_protocol_->verifyKeyPackets(); }
-
void stop() {
if (transport_protocol_->isRunning()) {
transport_protocol_->stop();
@@ -151,7 +142,8 @@ class ConsumerSocket : public Socket<BasePortal> {
}
}
- asio::io_service &getIoService() { return portal_->getIoService(); }
+ using Socket::getSocketOption;
+ using Socket::setSocketOption;
virtual int setSocketOption(int socket_option_key,
ReadCallback *socket_option_value) {
@@ -269,6 +261,23 @@ class ConsumerSocket : public Socket<BasePortal> {
timer_interval_milliseconds_ = socket_option_value;
break;
+ case RtcTransportOptions::RECOVERY_STRATEGY:
+ recovery_strategy_ =
+ (RtcTransportRecoveryStrategies)socket_option_value;
+ break;
+
+ case MANIFEST_FACTOR_RELEVANT:
+ manifest_factor_relevant_ = socket_option_value;
+ break;
+
+ case MANIFEST_FACTOR_ALERT:
+ manifest_factor_alert_ = socket_option_value;
+ break;
+
+ case GeneralTransportOptions::PACKET_FORMAT:
+ packet_format_ = socket_option_value;
+ break;
+
default:
return SOCKET_OPTION_NOT_SET;
}
@@ -314,13 +323,6 @@ class ConsumerSocket : public Socket<BasePortal> {
on_content_object_input_ = VOID_HANDLER;
break;
}
-
- case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY:
- if (socket_option_value == VOID_HANDLER) {
- on_content_object_verification_ = VOID_HANDLER;
- break;
- }
-
default:
return SOCKET_OPTION_NOT_SET;
}
@@ -333,13 +335,23 @@ class ConsumerSocket : public Socket<BasePortal> {
int result = SOCKET_OPTION_NOT_SET;
if (!transport_protocol_->isRunning()) {
switch (socket_option_key) {
- case GeneralTransportOptions::VERIFY_SIGNATURE:
- verify_signature_ = socket_option_value;
+ case RaaqmTransportOptions::PER_SESSION_CWINDOW_RESET:
+ reset_window_ = socket_option_value;
+ result = SOCKET_OPTION_SET;
+ break;
+
+ case RtcTransportOptions::AGGREGATED_DATA:
+ aggregated_data_ = socket_option_value;
result = SOCKET_OPTION_SET;
break;
- case GeneralTransportOptions::KEY_CONTENT:
- key_content_ = socket_option_value;
+ case RtcTransportOptions::CONTENT_SHARING_MODE:
+ content_sharing_mode_ = socket_option_value;
+ result = SOCKET_OPTION_SET;
+ break;
+
+ case RtcTransportOptions::AGGREGATED_INTERESTS:
+ aggregated_interests_ = socket_option_value;
result = SOCKET_OPTION_SET;
break;
@@ -371,29 +383,6 @@ class ConsumerSocket : public Socket<BasePortal> {
});
}
- int setSocketOption(
- int socket_option_key,
- ConsumerContentObjectVerificationCallback socket_option_value) {
- // Reschedule the function on the io_service to avoid race condition in
- // case setSocketOption is called while the io_service is running.
- return rescheduleOnIOService(
- socket_option_key, socket_option_value,
- [this](int socket_option_key,
- ConsumerContentObjectVerificationCallback socket_option_value)
- -> int {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY:
- on_content_object_verification_ = socket_option_value;
- break;
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- });
- }
-
int setSocketOption(int socket_option_key,
ConsumerInterestCallback socket_option_value) {
// Reschedule the function on the io_service to avoid race condition in
@@ -427,51 +416,6 @@ class ConsumerSocket : public Socket<BasePortal> {
});
}
- int setSocketOption(
- int socket_option_key,
- ConsumerContentObjectVerificationFailedCallback socket_option_value) {
- return rescheduleOnIOService(
- socket_option_key, socket_option_value,
- [this](
- int socket_option_key,
- ConsumerContentObjectVerificationFailedCallback socket_option_value)
- -> int {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::VERIFICATION_FAILED:
- verification_failed_callback_ = socket_option_value;
- break;
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- });
- }
-
- // int setSocketOption(
- // int socket_option_key,
- // ConsumerContentObjectVerificationFailedCallback socket_option_value) {
- // return rescheduleOnIOService(
- // socket_option_key, socket_option_value,
- // [this](
- // int socket_option_key,
- // ConsumerContentObjectVerificationFailedCallback
- // socket_option_value)
- // -> int {
- // switch (socket_option_key) {
- // case ConsumerCallbacksOptions::VERIFICATION_FAILED:
- // verification_failed_callback_ = socket_option_value;
- // break;
-
- // default:
- // return SOCKET_OPTION_NOT_SET;
- // }
-
- // return SOCKET_OPTION_SET;
- // });
- // }
-
int setSocketOption(int socket_option_key, IcnObserver *socket_option_value) {
utils::SpinLock::Acquire locked(guard_raaqm_params_);
switch (socket_option_key) {
@@ -488,45 +432,49 @@ class ConsumerSocket : public Socket<BasePortal> {
int setSocketOption(
int socket_option_key,
- const std::shared_ptr<utils::Verifier> &socket_option_value) {
- int result = SOCKET_OPTION_NOT_SET;
+ const std::shared_ptr<auth::Signer> &socket_option_value) {
+ if (!transport_protocol_->isRunning()) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::SIGNER:
+ signer_.reset();
+ signer_ = socket_option_value;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+ }
+ return SOCKET_OPTION_SET;
+ }
+
+ int setSocketOption(
+ int socket_option_key,
+ const std::shared_ptr<auth::Verifier> &socket_option_value) {
if (!transport_protocol_->isRunning()) {
switch (socket_option_key) {
case GeneralTransportOptions::VERIFIER:
verifier_.reset();
verifier_ = socket_option_value;
- result = SOCKET_OPTION_SET;
break;
default:
- return result;
+ return SOCKET_OPTION_NOT_SET;
}
}
-
- return result;
+ return SOCKET_OPTION_SET;
}
int setSocketOption(int socket_option_key,
const std::string &socket_option_value) {
int result = SOCKET_OPTION_NOT_SET;
- if (!transport_protocol_->isRunning()) {
- switch (socket_option_key) {
- case GeneralTransportOptions::CERTIFICATE:
- key_id_ = verifier_->addKeyFromCertificate(socket_option_value);
-
- if (key_id_ != nullptr) {
- result = SOCKET_OPTION_SET;
- }
- break;
-
- case DataLinkOptions::OUTPUT_INTERFACE:
+ switch (socket_option_key) {
+ case DataLinkOptions::OUTPUT_INTERFACE:
+ if (!transport_protocol_->isRunning()) {
output_interface_ = socket_option_value;
portal_->setOutputInterface(output_interface_);
result = SOCKET_OPTION_SET;
- break;
-
- default:
- return result;
- }
+ }
+ break;
+ default:
+ return result;
}
return result;
}
@@ -552,6 +500,29 @@ class ConsumerSocket : public Socket<BasePortal> {
});
}
+ int setSocketOption(int socket_option_key,
+ StrategyCallback socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in
+ // case setSocketOption is called while the io_service is running.
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ StrategyCallback socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::FWD_STRATEGY_CHANGE:
+ on_fwd_strategy_ = socket_option_value;
+ break;
+ case ConsumerCallbacksOptions::REC_STRATEGY_CHANGE:
+ on_rec_strategy_ = socket_option_value;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ });
+ }
+
int getSocketOption(int socket_option_key, double &socket_option_value) {
utils::SpinLock::Acquire locked(guard_raaqm_params_);
switch (socket_option_key) {
@@ -623,6 +594,22 @@ class ConsumerSocket : public Socket<BasePortal> {
socket_option_value = timer_interval_milliseconds_;
break;
+ case RtcTransportOptions::RECOVERY_STRATEGY:
+ socket_option_value = recovery_strategy_;
+ break;
+
+ case GeneralTransportOptions::MANIFEST_FACTOR_RELEVANT:
+ socket_option_value = manifest_factor_relevant_;
+ break;
+
+ case GeneralTransportOptions::MANIFEST_FACTOR_ALERT:
+ socket_option_value = manifest_factor_alert_;
+ break;
+
+ case GeneralTransportOptions::PACKET_FORMAT:
+ socket_option_value = packet_format_;
+ break;
+
default:
return SOCKET_OPTION_NOT_GET;
}
@@ -636,16 +623,24 @@ class ConsumerSocket : public Socket<BasePortal> {
socket_option_value = transport_protocol_->isRunning();
break;
- case GeneralTransportOptions::VERIFY_SIGNATURE:
- socket_option_value = verify_signature_;
+ case GeneralTransportOptions::ASYNC_MODE:
+ socket_option_value = is_async_;
break;
- case GeneralTransportOptions::KEY_CONTENT:
- socket_option_value = key_content_;
+ case RaaqmTransportOptions::PER_SESSION_CWINDOW_RESET:
+ socket_option_value = reset_window_;
break;
- case GeneralTransportOptions::ASYNC_MODE:
- socket_option_value = is_async_;
+ case RtcTransportOptions::AGGREGATED_DATA:
+ socket_option_value = aggregated_data_;
+ break;
+
+ case RtcTransportOptions::CONTENT_SHARING_MODE:
+ socket_option_value = content_sharing_mode_;
+ break;
+
+ case RtcTransportOptions::AGGREGATED_INTERESTS:
+ socket_option_value = aggregated_interests_;
break;
default:
@@ -689,29 +684,6 @@ class ConsumerSocket : public Socket<BasePortal> {
});
}
- int getSocketOption(
- int socket_option_key,
- ConsumerContentObjectVerificationCallback **socket_option_value) {
- // Reschedule the function on the io_service to avoid race condition in
- // case setSocketOption is called while the io_service is running.
- return rescheduleOnIOService(
- socket_option_key, socket_option_value,
- [this](int socket_option_key,
- ConsumerContentObjectVerificationCallback **socket_option_value)
- -> int {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY:
- *socket_option_value = &on_content_object_verification_;
- break;
-
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- });
- }
-
int getSocketOption(int socket_option_key,
ConsumerInterestCallback **socket_option_value) {
// Reschedule the function on the io_service to avoid race condition in
@@ -745,33 +717,12 @@ class ConsumerSocket : public Socket<BasePortal> {
});
}
- int getSocketOption(
- int socket_option_key,
- ConsumerContentObjectVerificationFailedCallback **socket_option_value) {
- // Reschedule the function on the io_service to avoid race condition in
- // case setSocketOption is called while the io_service is running.
- return rescheduleOnIOService(
- socket_option_key, socket_option_value,
- [this](int socket_option_key,
- ConsumerContentObjectVerificationFailedCallback *
- *socket_option_value) -> int {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::VERIFICATION_FAILED:
- *socket_option_value = &verification_failed_callback_;
- break;
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- });
- }
-
int getSocketOption(int socket_option_key,
- std::shared_ptr<Portal> &socket_option_value) {
+ IcnObserver **socket_option_value) {
+ utils::SpinLock::Acquire locked(guard_raaqm_params_);
switch (socket_option_key) {
- case PORTAL:
- socket_option_value = portal_;
+ case RateEstimationOptions::RATE_ESTIMATION_OBSERVER:
+ *socket_option_value = (rate_estimation_observer_);
break;
default:
@@ -782,22 +733,19 @@ class ConsumerSocket : public Socket<BasePortal> {
}
int getSocketOption(int socket_option_key,
- IcnObserver **socket_option_value) {
- utils::SpinLock::Acquire locked(guard_raaqm_params_);
+ std::shared_ptr<auth::Signer> &socket_option_value) {
switch (socket_option_key) {
- case RateEstimationOptions::RATE_ESTIMATION_OBSERVER:
- *socket_option_value = (rate_estimation_observer_);
- break;
+ case GeneralTransportOptions::SIGNER:
+ socket_option_value = signer_;
+ return SOCKET_OPTION_GET;
default:
return SOCKET_OPTION_NOT_GET;
}
-
- return SOCKET_OPTION_GET;
}
int getSocketOption(int socket_option_key,
- std::shared_ptr<utils::Verifier> &socket_option_value) {
+ std::shared_ptr<auth::Verifier> &socket_option_value) {
switch (socket_option_key) {
case GeneralTransportOptions::VERIFIER:
socket_option_value = verifier_;
@@ -805,7 +753,6 @@ class ConsumerSocket : public Socket<BasePortal> {
default:
return SOCKET_OPTION_NOT_GET;
}
-
return SOCKET_OPTION_GET;
}
@@ -817,7 +764,6 @@ class ConsumerSocket : public Socket<BasePortal> {
default:
return SOCKET_OPTION_NOT_GET;
}
-
return SOCKET_OPTION_GET;
}
@@ -854,6 +800,29 @@ class ConsumerSocket : public Socket<BasePortal> {
});
}
+ int getSocketOption(int socket_option_key,
+ StrategyCallback **socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in
+ // case setSocketOption is called while the io_service is running.
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ StrategyCallback **socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::FWD_STRATEGY_CHANGE:
+ *socket_option_value = &on_fwd_strategy_;
+ break;
+ case ConsumerCallbacksOptions::REC_STRATEGY_CHANGE:
+ *socket_option_value = &on_rec_strategy_;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ });
+ }
+
protected:
template <typename Lambda, typename arg2>
int rescheduleOnIOService(int socket_option_key, arg2 socket_option_value,
@@ -861,14 +830,14 @@ class ConsumerSocket : public Socket<BasePortal> {
// To enforce type check
std::function<int(int, arg2)> func = lambda_func;
int result = SOCKET_OPTION_SET;
- if (transport_protocol_->isRunning()) {
+ if (transport_protocol_ && transport_protocol_->isRunning()) {
std::mutex mtx;
/* Condition variable for the wait */
std::condition_variable cv;
bool done = false;
- portal_->getIoService().dispatch([&socket_option_key,
- &socket_option_value, &mtx, &cv,
- &result, &done, &func]() {
+ portal_->getThread().tryRunHandlerNow([&socket_option_key,
+ &socket_option_value, &mtx, &cv,
+ &result, &done, &func]() {
std::unique_lock<std::mutex> lck(mtx);
done = true;
result = func(socket_option_key, socket_option_value);
@@ -888,14 +857,12 @@ class ConsumerSocket : public Socket<BasePortal> {
protected:
interface::ConsumerSocket *consumer_interface_;
- std::shared_ptr<Portal> portal_;
- utils::EventThread async_downloader_;
-
// No need to protect from multiple accesses in the async consumer
// The parameter is accessible only with a getSocketOption and
// set from the consume
Name network_name_;
+ hicn_packet_format_t packet_format_;
int interest_lifetime_;
double min_window_size_;
@@ -916,36 +883,42 @@ class ConsumerSocket : public Socket<BasePortal> {
int rate_estimation_batching_parameter_;
int rate_estimation_choice_;
- bool is_async_;
-
// Verification parameters
- std::shared_ptr<utils::Verifier> verifier_;
- PARCKeyId *key_id_;
+ uint32_t manifest_factor_relevant_;
+ uint32_t manifest_factor_alert_;
+ std::shared_ptr<auth::Verifier> verifier_;
+ transport::auth::KeyId *key_id_;
std::atomic_bool verify_signature_;
- bool key_content_;
+ bool reset_window_;
ConsumerInterestCallback on_interest_retransmission_;
ConsumerInterestCallback on_interest_output_;
ConsumerInterestCallback on_interest_timeout_;
ConsumerInterestCallback on_interest_satisfied_;
ConsumerContentObjectCallback on_content_object_input_;
- ConsumerContentObjectVerificationCallback on_content_object_verification_;
ConsumerTimerCallback stats_summary_;
- ConsumerContentObjectVerificationFailedCallback verification_failed_callback_;
+ StrategyCallback on_fwd_strategy_;
+ StrategyCallback on_rec_strategy_;
ReadCallback *read_callback_;
uint32_t timer_interval_milliseconds_;
// Transport protocol
- std::unique_ptr<protocol::TransportProtocol> transport_protocol_;
+ std::shared_ptr<protocol::TransportProtocol> transport_protocol_;
// Statistic
TransportStatistics stats_;
+ // RTC protocol
+ RtcTransportRecoveryStrategies recovery_strategy_;
+ bool aggregated_data_;
+ bool content_sharing_mode_;
+ bool aggregated_interests_;
+
utils::SpinLock guard_raaqm_params_;
std::string output_interface_;
};
} // namespace implementation
-} // namespace transport \ No newline at end of file
+} // namespace transport