aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/hicn/transport/interfaces/socket_producer.h
diff options
context:
space:
mode:
authorMauro Sardara <msardara@cisco.com>2019-02-04 11:06:18 +0100
committerAlberto Compagno <acompagn+fdio@cisco.com>2019-03-05 09:56:19 +0000
commit6d7704c1b497341fd6dd3c27e3f64d0db062ccc2 (patch)
tree668c6820653cd84da8474d330d2807a8765f96b5 /libtransport/src/hicn/transport/interfaces/socket_producer.h
parentca66305af16e2f8d8f271218ea71f132e6c21916 (diff)
[HICN-11] Rework on transport protocols improving components modularity
Change-Id: I6683ec5b494238dc93591c103d25275e89b9f267 Signed-off-by: Mauro Sardara <msardara@cisco.com>
Diffstat (limited to 'libtransport/src/hicn/transport/interfaces/socket_producer.h')
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_producer.h585
1 files changed, 454 insertions, 131 deletions
diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.h b/libtransport/src/hicn/transport/interfaces/socket_producer.h
index d51464b77..4f38fb30e 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_producer.h
+++ b/libtransport/src/hicn/transport/interfaces/socket_producer.h
@@ -18,7 +18,6 @@
#include <hicn/transport/interfaces/socket.h>
#include <hicn/transport/utils/content_store.h>
#include <hicn/transport/utils/event_thread.h>
-#include <hicn/transport/utils/sharable_vector.h>
#include <atomic>
#include <cmath>
@@ -26,8 +25,6 @@
#include <queue>
#include <thread>
-#define PUSH_API 1
-
#define REGISTRATION_NOT_ATTEMPTED 0
#define REGISTRATION_SUCCESS 1
#define REGISTRATION_FAILURE 2
@@ -56,8 +53,7 @@ class ProducerSocket : public Socket<BasePortal>,
void asyncProduce(const Name &suffix, const uint8_t *buf, size_t buffer_size);
- void asyncProduce(const Name &suffix,
- utils::SharableVector<uint8_t> &&output_buffer);
+ void asyncProduce(const Name &suffix, ContentBuffer &&output_buffer);
void asyncProduce(ContentObject &content_object);
@@ -75,125 +71,459 @@ class ProducerSocket : public Socket<BasePortal>,
onInterest(*interest);
};
- int setSocketOption(int socket_option_key,
- uint32_t socket_option_value) override;
-
- int setSocketOption(int socket_option_key,
- double socket_option_value) override;
-
- int setSocketOption(int socket_option_key, bool socket_option_value) override;
-
- int setSocketOption(int socket_option_key, Name socket_option_value) override;
-
- int setSocketOption(int socket_option_key,
- std::list<Prefix> socket_option_value) override;
-
- int setSocketOption(
+ TRANSPORT_ALWAYS_INLINE int setSocketOption(int socket_option_key,
+ uint32_t socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::DATA_PACKET_SIZE:
+ if (socket_option_value < default_values::max_content_object_size &&
+ socket_option_value > 0) {
+ data_packet_size_ = socket_option_value;
+ break;
+ }
+
+ case GeneralTransportOptions::INPUT_BUFFER_SIZE:
+ if (socket_option_value >= 1) {
+ input_buffer_capacity_ = socket_option_value;
+ break;
+ }
+
+ case GeneralTransportOptions::OUTPUT_BUFFER_SIZE:
+ output_buffer_.setLimit(socket_option_value);
+ break;
+
+ case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME:
+ content_object_expiry_time_ = socket_option_value;
+ break;
+
+ case GeneralTransportOptions::SIGNATURE_TYPE:
+ if (socket_option_value == SOCKET_OPTION_DEFAULT) {
+ signature_type_ = SHA_256;
+ } else {
+ signature_type_ = socket_option_value;
+ }
+
+ if (signature_type_ == SHA_256 || signature_type_ == RSA_256) {
+ signature_size_ = 32;
+ }
+
+ break;
+
+ case ProducerCallbacksOptions::INTEREST_INPUT:
+ if (socket_option_value == VOID_HANDLER) {
+ on_interest_input_ = VOID_HANDLER;
+ break;
+ }
+
+ case ProducerCallbacksOptions::INTEREST_DROP:
+ if (socket_option_value == VOID_HANDLER) {
+ on_interest_dropped_input_buffer_ = VOID_HANDLER;
+ break;
+ }
+
+ case ProducerCallbacksOptions::INTEREST_PASS:
+ if (socket_option_value == VOID_HANDLER) {
+ on_interest_inserted_input_buffer_ = VOID_HANDLER;
+ break;
+ }
+
+ case ProducerCallbacksOptions::CACHE_HIT:
+ if (socket_option_value == VOID_HANDLER) {
+ on_interest_satisfied_output_buffer_ = VOID_HANDLER;
+ break;
+ }
+
+ case ProducerCallbacksOptions::CACHE_MISS:
+ if (socket_option_value == VOID_HANDLER) {
+ on_interest_process_ = VOID_HANDLER;
+ break;
+ }
+
+ case ProducerCallbacksOptions::NEW_CONTENT_OBJECT:
+ if (socket_option_value == VOID_HANDLER) {
+ on_new_segment_ = VOID_HANDLER;
+ break;
+ }
+
+ case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN:
+ if (socket_option_value == VOID_HANDLER) {
+ on_content_object_to_sign_ = VOID_HANDLER;
+ break;
+ }
+
+ case ProducerCallbacksOptions::CONTENT_OBJECT_READY:
+ if (socket_option_value == VOID_HANDLER) {
+ on_content_object_in_output_buffer_ = VOID_HANDLER;
+ break;
+ }
+
+ case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT:
+ if (socket_option_value == VOID_HANDLER) {
+ on_content_object_output_ = VOID_HANDLER;
+ break;
+ }
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ }
+
+ TRANSPORT_ALWAYS_INLINE int setSocketOption(int socket_option_key,
+ bool socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::MAKE_MANIFEST:
+ making_manifest_ = socket_option_value;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ }
+
+ TRANSPORT_ALWAYS_INLINE int setSocketOption(int socket_option_key,
+ Name *socket_option_value) {
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ TRANSPORT_ALWAYS_INLINE int setSocketOption(
+ int socket_option_key, std::list<Prefix> socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::NETWORK_NAME:
+ served_namespaces_ = socket_option_value;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ }
+
+ TRANSPORT_ALWAYS_INLINE int setSocketOption(
int socket_option_key,
- ProducerContentObjectCallback socket_option_value) override;
-
- int setSocketOption(int socket_option_key,
- ProducerInterestCallback socket_option_value) override;
-
- int setSocketOption(
+ ProducerContentObjectCallback socket_option_value) {
+ switch (socket_option_key) {
+ case ProducerCallbacksOptions::NEW_CONTENT_OBJECT:
+ on_new_segment_ = socket_option_value;
+ break;
+
+ case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN:
+ on_content_object_to_sign_ = socket_option_value;
+ break;
+
+ case ProducerCallbacksOptions::CONTENT_OBJECT_READY:
+ on_content_object_in_output_buffer_ = socket_option_value;
+ break;
+
+ case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT:
+ on_content_object_output_ = socket_option_value;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ }
+
+ TRANSPORT_ALWAYS_INLINE int setSocketOption(
+ int socket_option_key, ProducerInterestCallback socket_option_value) {
+ switch (socket_option_key) {
+ case ProducerCallbacksOptions::INTEREST_INPUT:
+ on_interest_input_ = socket_option_value;
+ break;
+
+ case ProducerCallbacksOptions::INTEREST_DROP:
+ on_interest_dropped_input_buffer_ = socket_option_value;
+ break;
+
+ case ProducerCallbacksOptions::INTEREST_PASS:
+ on_interest_inserted_input_buffer_ = socket_option_value;
+ break;
+
+ case ProducerCallbacksOptions::CACHE_HIT:
+ on_interest_satisfied_output_buffer_ = socket_option_value;
+ break;
+
+ case ProducerCallbacksOptions::CACHE_MISS:
+ on_interest_process_ = socket_option_value;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ }
+
+ TRANSPORT_ALWAYS_INLINE int setSocketOption(
+ int socket_option_key, ProducerContentCallback socket_option_value) {
+ switch (socket_option_key) {
+ case ProducerCallbacksOptions::CONTENT_PRODUCED:
+ on_content_produced_ = socket_option_value;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ }
+
+ TRANSPORT_ALWAYS_INLINE int setSocketOption(
+ int socket_option_key, HashAlgorithm socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::HASH_ALGORITHM:
+ hash_algorithm_ = socket_option_value;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ }
+
+ TRANSPORT_ALWAYS_INLINE int setSocketOption(
+ int socket_option_key, utils::CryptoSuite socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::CRYPTO_SUITE:
+ crypto_suite_ = socket_option_value;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ }
+
+ TRANSPORT_ALWAYS_INLINE int setSocketOption(
int socket_option_key,
- ConsumerContentObjectVerificationCallback socket_option_value) override;
-
- int setSocketOption(
+ const std::shared_ptr<utils::Identity> &socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::IDENTITY:
+ identity_ = socket_option_value;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ }
+
+ TRANSPORT_ALWAYS_INLINE int setSocketOption(
+ int socket_option_key, const std::string &socket_option_value) {
+ switch (socket_option_key) {
+ case DataLinkOptions::OUTPUT_INTERFACE:
+ output_interface_ = socket_option_value;
+ portal_->setOutputInterface(output_interface_);
+ break;
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ ;
+ }
+
+ TRANSPORT_ALWAYS_INLINE int getSocketOption(int socket_option_key,
+ uint32_t &socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::INPUT_BUFFER_SIZE:
+ socket_option_value = input_buffer_capacity_;
+ break;
+
+ case GeneralTransportOptions::OUTPUT_BUFFER_SIZE:
+ socket_option_value = output_buffer_.getLimit();
+ break;
+
+ case GeneralTransportOptions::DATA_PACKET_SIZE:
+ socket_option_value = data_packet_size_;
+ break;
+
+ case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME:
+ socket_option_value = content_object_expiry_time_;
+ break;
+
+ case GeneralTransportOptions::SIGNATURE_TYPE:
+ socket_option_value = signature_type_;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_GET;
+ }
+
+ TRANSPORT_ALWAYS_INLINE int getSocketOption(int socket_option_key,
+ bool &socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::MAKE_MANIFEST:
+ socket_option_value = making_manifest_;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ }
+
+ TRANSPORT_ALWAYS_INLINE int getSocketOption(
+ int socket_option_key, std::list<Prefix> &socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::NETWORK_NAME:
+ socket_option_value = served_namespaces_;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ }
+
+ TRANSPORT_ALWAYS_INLINE int getSocketOption(
int socket_option_key,
- ConsumerContentObjectCallback socket_option_value) override;
-
- int setSocketOption(int socket_option_key,
- ConsumerInterestCallback socket_option_value) override;
-
- int setSocketOption(int socket_option_key,
- ConsumerContentCallback socket_option_value) override;
-
- int setSocketOption(int socket_option_key,
- ConsumerManifestCallback socket_option_value) override;
-
- int setSocketOption(int socket_option_key, IcnObserver *obs) override;
-
- int setSocketOption(int socket_option_key,
- HashAlgorithm socket_option_value) override;
-
- int setSocketOption(int socket_option_key,
- utils::CryptoSuite socket_option_value) override;
-
- int setSocketOption(int socket_option_key,
- const utils::Identity &socket_option_value) override;
-
- int setSocketOption(int socket_option_key,
- const std::string &socket_option_value) override;
-
- int setSocketOption(int socket_option_key,
- ConsumerTimerCallback socket_option_value) override;
-
- int setSocketOption(int socket_option_key,
- ProducerContentCallback socket_option_value) override;
-
- int getSocketOption(int socket_option_key,
- uint32_t &socket_option_value) override;
-
- int getSocketOption(int socket_option_key,
- double &socket_option_value) override;
-
- int getSocketOption(int socket_option_key,
- bool &socket_option_value) override;
-
- int getSocketOption(int socket_option_key,
- Name &socket_option_value) override;
-
- int getSocketOption(int socket_option_key,
- std::list<Prefix> &socket_option_value) override;
-
- int getSocketOption(
- int socket_option_key,
- ProducerContentObjectCallback &socket_option_value) override;
-
- int getSocketOption(int socket_option_key,
- ProducerInterestCallback &socket_option_value) override;
-
- int getSocketOption(
+ ProducerContentObjectCallback **socket_option_value) {
+ switch (socket_option_key) {
+ case ProducerCallbacksOptions::NEW_CONTENT_OBJECT:
+ *socket_option_value = &on_new_segment_;
+ break;
+
+ case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN:
+ *socket_option_value = &on_content_object_to_sign_;
+ break;
+
+ case ProducerCallbacksOptions::CONTENT_OBJECT_READY:
+ *socket_option_value = &on_content_object_in_output_buffer_;
+ break;
+
+ case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT:
+ *socket_option_value = &on_content_object_output_;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ }
+
+ TRANSPORT_ALWAYS_INLINE int getSocketOption(
+ int socket_option_key, ProducerContentCallback **socket_option_value) {
+ switch (socket_option_key) {
+ case ProducerCallbacksOptions::CONTENT_PRODUCED:
+ *socket_option_value = &on_content_produced_;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ }
+
+ TRANSPORT_ALWAYS_INLINE int getSocketOption(
+ int socket_option_key, ProducerInterestCallback **socket_option_value) {
+ switch (socket_option_key) {
+ case ProducerCallbacksOptions::INTEREST_INPUT:
+ *socket_option_value = &on_interest_input_;
+ break;
+
+ case ProducerCallbacksOptions::INTEREST_DROP:
+ *socket_option_value = &on_interest_dropped_input_buffer_;
+ break;
+
+ case ProducerCallbacksOptions::INTEREST_PASS:
+ *socket_option_value = &on_interest_inserted_input_buffer_;
+ break;
+
+ case CACHE_HIT:
+ *socket_option_value = &on_interest_satisfied_output_buffer_;
+ break;
+
+ case CACHE_MISS:
+ *socket_option_value = &on_interest_process_;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ }
+
+ TRANSPORT_ALWAYS_INLINE int getSocketOption(
+ int socket_option_key, std::shared_ptr<Portal> &socket_option_value) {
+ switch (socket_option_key) {
+ case PORTAL:
+ socket_option_value = portal_;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ ;
+ }
+
+ return SOCKET_OPTION_GET;
+ }
+
+ TRANSPORT_ALWAYS_INLINE int getSocketOption(
+ int socket_option_key, HashAlgorithm &socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::HASH_ALGORITHM:
+ socket_option_value = hash_algorithm_;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ }
+
+ TRANSPORT_ALWAYS_INLINE int getSocketOption(
+ int socket_option_key, utils::CryptoSuite &socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::HASH_ALGORITHM:
+ socket_option_value = crypto_suite_;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ }
+
+ TRANSPORT_ALWAYS_INLINE int getSocketOption(
int socket_option_key,
- ConsumerContentObjectVerificationCallback &socket_option_value) override;
-
- int getSocketOption(
- int socket_option_key,
- ConsumerContentObjectCallback &socket_option_value) override;
-
- int getSocketOption(int socket_option_key,
- ConsumerInterestCallback &socket_option_value) override;
-
- int getSocketOption(int socket_option_key,
- ConsumerContentCallback &socket_option_value) override;
-
- int getSocketOption(int socket_option_key,
- ConsumerManifestCallback &socket_option_value) override;
-
- int getSocketOption(int socket_option_key,
- std::shared_ptr<Portal> &socket_option_value) override;
-
- int getSocketOption(int socket_option_key,
- IcnObserver **socket_option_value) override;
-
- int getSocketOption(int socket_option_key,
- HashAlgorithm &socket_option_value) override;
-
- int getSocketOption(int socket_option_key,
- utils::CryptoSuite &socket_option_value) override;
-
- int getSocketOption(int socket_option_key,
- utils::Identity &socket_option_value) override;
-
- int getSocketOption(int socket_option_key,
- std::string &socket_option_value) override;
-
- int getSocketOption(int socket_option_key,
- ProducerContentCallback &socket_option_value) override;
-
- int getSocketOption(int socket_option_key,
- ConsumerTimerCallback &socket_option_value) override;
+ std::shared_ptr<utils::Identity> &socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::IDENTITY:
+ if (identity_) {
+ socket_option_value = identity_;
+ break;
+ }
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ }
+
+ TRANSPORT_ALWAYS_INLINE int getSocketOption(
+ int socket_option_key, std::string &socket_option_value) {
+ switch (socket_option_key) {
+ case DataLinkOptions::OUTPUT_INTERFACE:
+ socket_option_value = output_interface_;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ }
protected:
asio::io_service internal_io_service_;
@@ -206,8 +536,6 @@ class ProducerSocket : public Socket<BasePortal>,
// buffers
utils::ContentStore output_buffer_;
- std::unique_ptr<utils::Identity> identity_;
-
private:
utils::EventThread async_thread_;
@@ -215,7 +543,8 @@ class ProducerSocket : public Socket<BasePortal>,
bool making_manifest_;
- // map for storing sequence numbers for several calls of the publish function
+ // map for storing sequence numbers for several calls of the publish
+ // function
std::unordered_map<Name, std::unordered_map<int, uint32_t>> seq_number_map_;
int signature_type_;
@@ -223,8 +552,7 @@ class ProducerSocket : public Socket<BasePortal>,
HashAlgorithm hash_algorithm_;
utils::CryptoSuite crypto_suite_;
- // std::unique_ptr<utils::Identity> identity_;
- // utils::Signer& signer_;
+ std::shared_ptr<utils::Identity> identity_;
// buffers
@@ -232,11 +560,6 @@ class ProducerSocket : public Socket<BasePortal>,
std::atomic_size_t input_buffer_capacity_;
std::atomic_size_t input_buffer_size_;
-#ifndef PUSH_API
- std::mutex pending_interests_mtx_;
- std::unordered_map<Name, std::shared_ptr<const Interest>> pending_interests_;
-#endif
-
// threads
std::thread listening_thread_;
std::thread processing_thread_;
@@ -268,4 +591,4 @@ class ProducerSocket : public Socket<BasePortal>,
} // namespace interface
-} // end namespace transport
+} // namespace transport