diff options
Diffstat (limited to 'libtransport/src')
19 files changed, 232 insertions, 92 deletions
diff --git a/libtransport/src/hicn/transport/CMakeLists.txt b/libtransport/src/hicn/transport/CMakeLists.txt index 5fe101c18..6e0ae5b88 100644 --- a/libtransport/src/hicn/transport/CMakeLists.txt +++ b/libtransport/src/hicn/transport/CMakeLists.txt @@ -54,11 +54,11 @@ else () set(CMAKE_SHARED_LINKER_FLAGS "/NODEFAULTLIB:\"MSVCRTD\"" ) endif () endif () -if (ANDROID_API) +if (${CMAKE_SYSTEM_NAME} STREQUAL "Android") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -stdlib=libc++ -isystem -lm") endif() -if (ANDROID_API) +if (${CMAKE_SYSTEM_NAME} STREQUAL "Android") build_library(${LIBTRANSPORT} STATIC SOURCES ${SOURCE_FILES} ${HEADER_FILES} diff --git a/libtransport/src/hicn/transport/core/content_object.cc b/libtransport/src/hicn/transport/core/content_object.cc index 83b545c05..d05239372 100644 --- a/libtransport/src/hicn/transport/core/content_object.cc +++ b/libtransport/src/hicn/transport/core/content_object.cc @@ -22,6 +22,7 @@ extern "C" { TRANSPORT_CLANG_DISABLE_WARNING("-Wextern-c-compat") #endif #include <hicn/hicn.h> +#include <hicn/util/ip_address.h> } #include <cstring> @@ -153,7 +154,7 @@ ContentObject &ContentObject::setPathLabel(uint32_t path_label) { return *this; } -void ContentObject::setLocator(const ip_address_t &ip_address) { +void ContentObject::setLocator(const ip_prefix_t &ip_address) { if (hicn_data_set_locator(format_, packet_start_, &ip_address) < 0) { throw errors::RuntimeException("Error setting content object locator"); } @@ -161,8 +162,8 @@ void ContentObject::setLocator(const ip_address_t &ip_address) { return; } -ip_address_t ContentObject::getLocator() const { - ip_address_t ip; +ip_prefix_t ContentObject::getLocator() const { + ip_prefix_t ip; if (hicn_data_get_locator(format_, packet_start_, &ip) < 0) { throw errors::RuntimeException("Error getting content object locator."); diff --git a/libtransport/src/hicn/transport/core/content_object.h b/libtransport/src/hicn/transport/core/content_object.h index 5af548fe4..ef5144c23 100644 --- a/libtransport/src/hicn/transport/core/content_object.h +++ b/libtransport/src/hicn/transport/core/content_object.h @@ -60,9 +60,9 @@ class ContentObject : public Packet { ContentObject &setPathLabel(uint32_t path_label); - void setLocator(const ip_address_t &ip_address) override; + void setLocator(const ip_prefix_t &ip_address) override; - ip_address_t getLocator() const override; + ip_prefix_t getLocator() const override; void setLifetime(uint32_t lifetime) override; diff --git a/libtransport/src/hicn/transport/core/facade.h b/libtransport/src/hicn/transport/core/facade.h index c28c84671..27e738e62 100644 --- a/libtransport/src/hicn/transport/core/facade.h +++ b/libtransport/src/hicn/transport/core/facade.h @@ -37,7 +37,7 @@ namespace core { using HicnForwarderPortal = Portal<HicnForwarderInterface>; #ifdef __linux__ -#ifndef __ANDROID_API__ +#ifndef __ANDROID__ using RawSocketPortal = Portal<RawSocketInterface>; #endif #ifdef __vpp__ diff --git a/libtransport/src/hicn/transport/core/forwarder_interface.h b/libtransport/src/hicn/transport/core/forwarder_interface.h index 8fefba8ad..a89ed8a3c 100644 --- a/libtransport/src/hicn/transport/core/forwarder_interface.h +++ b/libtransport/src/hicn/transport/core/forwarder_interface.h @@ -50,7 +50,9 @@ class ForwarderInterface { output_interface_(""), content_store_reserved_(standard_cs_reserved) { inet_address_.family = AF_INET; + inet_address_.len = IPV4_ADDR_LEN; inet6_address_.family = AF_INET6; + inet6_address_.len = IPV6_ADDR_LEN; } public: @@ -133,8 +135,8 @@ class ForwarderInterface { protected: ConnectorType &connector_; - ip_address_t inet_address_; - ip_address_t inet6_address_; + ip_prefix_t inet_address_; + ip_prefix_t inet6_address_; uint16_t mtu_; std::string output_interface_; uint32_t content_store_reserved_; diff --git a/libtransport/src/hicn/transport/core/hicn_binary_api.c b/libtransport/src/hicn/transport/core/hicn_binary_api.c index 3868c0a14..8fde516fd 100644 --- a/libtransport/src/hicn/transport/core/hicn_binary_api.c +++ b/libtransport/src/hicn/transport/core/hicn_binary_api.c @@ -92,12 +92,12 @@ int hicn_binary_api_register_prod_app( CONTEXT_SAVE(context_store, api, mp) - mp->len = (u8)input_params->prefix->prefix_len; + mp->len = (u8)input_params->prefix->len; mp->swif = clib_host_to_net_u32(input_params->swif); mp->cs_reserved = clib_host_to_net_u32(input_params->cs_reserved); - mp->prefix[0] = clib_host_to_net_u64(input_params->prefix->as_u64[0]); - mp->prefix[1] = clib_host_to_net_u64(input_params->prefix->as_u64[1]); + mp->prefix[0] = clib_host_to_net_u64(input_params->prefix->address.as_u64[0]); + mp->prefix[1] = clib_host_to_net_u64(input_params->prefix->address.as_u64[1]); return vpp_binary_api_send_request_wait_reply(api->vpp_api, mp); } @@ -112,8 +112,8 @@ static void vl_api_hicn_api_register_prod_app_reply_t_handler( vpp_binary_api_set_ret_value(binary_api->vpp_api, clib_net_to_host_u32(mp->retval)); params->cs_reserved = mp->cs_reserved; - params->prod_addr->as_u64[0] = mp->prod_addr[0]; - params->prod_addr->as_u64[1] = mp->prod_addr[1]; + params->prod_addr->address.as_u64[0] = mp->prod_addr[0]; + params->prod_addr->address.as_u64[1] = mp->prod_addr[1]; params->face_id = clib_net_to_host_u32(mp->faceid); vpp_binary_api_unlock_waiting_thread(binary_api->vpp_api); @@ -147,9 +147,9 @@ static void vl_api_hicn_api_register_cons_app_reply_t_handler( vpp_binary_api_set_ret_value(binary_api->vpp_api, clib_net_to_host_u32(mp->retval)); - params->src4->as_ip46.ip4.as_u32 = clib_net_to_host_u32(mp->src_addr4); - params->src6->as_u64[0] = clib_net_to_host_u64(mp->src_addr6[0]); - params->src6->as_u64[1] = clib_net_to_host_u64(mp->src_addr6[1]); + params->src4->address.v4.as_u32 = clib_net_to_host_u32(mp->src_addr4); + params->src6->address.as_u64[0] = clib_net_to_host_u64(mp->src_addr6[0]); + params->src6->address.as_u64[1] = clib_net_to_host_u64(mp->src_addr6[1]); params->face_id = clib_host_to_net_u32(mp->faceid); vpp_binary_api_unlock_waiting_thread(binary_api->vpp_api); @@ -166,9 +166,9 @@ int hicn_binary_api_register_route( CONTEXT_SAVE(context_store, api, mp) - mp->prefix[0] = input_params->prefix->as_u64[0]; - mp->prefix[1] = input_params->prefix->as_u64[1]; - mp->len = input_params->prefix->prefix_len; + mp->prefix[0] = input_params->prefix->address.as_u64[0]; + mp->prefix[1] = input_params->prefix->address.as_u64[1]; + mp->len = input_params->prefix->len; mp->face_ids[0] = input_params->face_id; mp->n_faces = 1; diff --git a/libtransport/src/hicn/transport/core/hicn_binary_api.h b/libtransport/src/hicn/transport/core/hicn_binary_api.h index 410ffb96c..b09b5f4a7 100644 --- a/libtransport/src/hicn/transport/core/hicn_binary_api.h +++ b/libtransport/src/hicn/transport/core/hicn_binary_api.h @@ -16,6 +16,7 @@ #pragma once #include <hicn/transport/config.h> +#include <hicn/util/ip_address.h> #ifdef __vpp__ @@ -27,10 +28,8 @@ extern "C" { #include "stdint.h" -typedef struct ip_address ip_address_t; - typedef struct { - ip_address_t* prefix; + ip_prefix_t* prefix; uint32_t swif; uint32_t cs_reserved; } hicn_producer_input_params; @@ -41,18 +40,18 @@ typedef struct { typedef struct { uint32_t cs_reserved; - ip_address_t* prod_addr; + ip_prefix_t* prod_addr; uint32_t face_id; } hicn_producer_output_params; typedef struct { - ip_address_t* src4; - ip_address_t* src6; + ip_prefix_t* src4; + ip_prefix_t* src6; uint32_t face_id; } hicn_consumer_output_params; typedef struct { - ip_address_t* prefix; + ip_prefix_t* prefix; uint32_t face_id; } hicn_producer_set_route_params; @@ -75,4 +74,4 @@ char* hicn_binary_api_get_error_string(int ret_val); } #endif -#endif // __vpp__
\ No newline at end of file +#endif // __vpp__ diff --git a/libtransport/src/hicn/transport/core/interest.cc b/libtransport/src/hicn/transport/core/interest.cc index 60ab10967..6465053de 100644 --- a/libtransport/src/hicn/transport/core/interest.cc +++ b/libtransport/src/hicn/transport/core/interest.cc @@ -119,7 +119,7 @@ void Interest::setName(Name &&name) { } } -void Interest::setLocator(const ip_address_t &ip_address) { +void Interest::setLocator(const ip_prefix_t &ip_address) { if (hicn_interest_set_locator(format_, packet_start_, &ip_address) < 0) { throw errors::RuntimeException("Error setting interest locator."); } @@ -127,8 +127,8 @@ void Interest::setLocator(const ip_address_t &ip_address) { return; } -ip_address_t Interest::getLocator() const { - ip_address_t ip; +ip_prefix_t Interest::getLocator() const { + ip_prefix_t ip; if (hicn_interest_get_locator(format_, packet_start_, &ip) < 0) { throw errors::RuntimeException("Error getting interest locator."); @@ -163,4 +163,4 @@ void Interest::resetForHash() { } // end namespace core -} // end namespace transport
\ No newline at end of file +} // end namespace transport diff --git a/libtransport/src/hicn/transport/core/interest.h b/libtransport/src/hicn/transport/core/interest.h index 48c833a73..12fd597e5 100644 --- a/libtransport/src/hicn/transport/core/interest.h +++ b/libtransport/src/hicn/transport/core/interest.h @@ -55,9 +55,9 @@ class Interest void setName(Name &&name) override; - void setLocator(const ip_address_t &ip_address) override; + void setLocator(const ip_prefix_t &ip_address) override; - ip_address_t getLocator() const override; + ip_prefix_t getLocator() const override; void setLifetime(uint32_t lifetime) override; diff --git a/libtransport/src/hicn/transport/core/name.cc b/libtransport/src/hicn/transport/core/name.cc index 0621eeeb5..46ef98948 100644 --- a/libtransport/src/hicn/transport/core/name.cc +++ b/libtransport/src/hicn/transport/core/name.cc @@ -169,11 +169,11 @@ std::shared_ptr<Sockaddr> Name::getAddress() const { return std::shared_ptr<Sockaddr>(ret); } -ip_address_t Name::toIpAddress() const { - ip_address_t ret; +ip_prefix_t Name::toIpAddress() const { + ip_prefix_t ret; std::memset(&ret, 0, sizeof(ret)); - if (hicn_name_to_ip_address(&name_, &ret) < 0) { + if (hicn_name_to_ip_prefix(&name_, &ret) < 0) { throw errors::InvalidIpAddressException(); } diff --git a/libtransport/src/hicn/transport/core/name.h b/libtransport/src/hicn/transport/core/name.h index 061371be5..35625ddd1 100644 --- a/libtransport/src/hicn/transport/core/name.h +++ b/libtransport/src/hicn/transport/core/name.h @@ -93,7 +93,7 @@ class Name { Name &setSuffix(uint32_t seq_number); - ip_address_t toIpAddress() const; + ip_prefix_t toIpAddress() const; void copyToDestination(uint8_t *destination, bool include_suffix = false) const; diff --git a/libtransport/src/hicn/transport/core/packet.h b/libtransport/src/hicn/transport/core/packet.h index 4ec93205a..825c4c9dd 100644 --- a/libtransport/src/hicn/transport/core/packet.h +++ b/libtransport/src/hicn/transport/core/packet.h @@ -133,9 +133,9 @@ class Packet : public std::enable_shared_from_this<Packet> { void dump() const; - virtual void setLocator(const ip_address_t &locator) = 0; + virtual void setLocator(const ip_prefix_t &locator) = 0; - virtual ip_address_t getLocator() const = 0; + virtual ip_prefix_t getLocator() const = 0; void setSignatureTimestamp(const uint64_t ×tamp); diff --git a/libtransport/src/hicn/transport/core/prefix.cc b/libtransport/src/hicn/transport/core/prefix.cc index 74d1466ac..6b87ccd1f 100644 --- a/libtransport/src/hicn/transport/core/prefix.cc +++ b/libtransport/src/hicn/transport/core/prefix.cc @@ -33,7 +33,7 @@ namespace transport { namespace core { -Prefix::Prefix() { std::memset(&ip_address_, 0, sizeof(ip_address_t)); } +Prefix::Prefix() { std::memset(&ip_address_, 0, sizeof(ip_prefix_t)); } Prefix::Prefix(const char *prefix) : Prefix(std::string(prefix)) {} @@ -67,7 +67,7 @@ Prefix::Prefix(const core::Name &content_name, uint16_t prefix_length) { } ip_address_ = content_name.toIpAddress(); - ip_address_.prefix_len = prefix_length; + ip_address_.len = prefix_length; ip_address_.family = family; } @@ -77,13 +77,13 @@ void Prefix::buildPrefix(std::string &prefix, uint16_t prefix_length, throw errors::InvalidIpAddressException(); } - int ret = inet_pton(family, prefix.c_str(), ip_address_.buffer); + int ret = inet_pton(family, prefix.c_str(), ip_address_.address.buffer); if (ret != 1) { throw errors::InvalidIpAddressException(); } - ip_address_.prefix_len = prefix_length; + ip_address_.len = prefix_length; ip_address_.family = family; } @@ -101,17 +101,17 @@ std::unique_ptr<Sockaddr> Prefix::toSockaddr() { throw errors::InvalidIpAddressException(); } - if (hicn_ip_to_sockaddr_address(&ip_address_, ret) < 0) { + if (ip_prefix_to_sockaddr(&ip_address_, ret) < 0) { throw errors::InvalidIpAddressException(); } return std::unique_ptr<Sockaddr>(ret); } -uint16_t Prefix::getPrefixLength() { return ip_address_.prefix_len; } +uint16_t Prefix::getPrefixLength() { return ip_address_.len; } Prefix &Prefix::setPrefixLength(uint16_t prefix_length) { - ip_address_.prefix_len = prefix_length; + ip_address_.len = prefix_length; return *this; } @@ -123,17 +123,17 @@ Prefix &Prefix::setAddressFamily(int address_family) { } std::string Prefix::getNetwork() const { - if (!checkPrefixLengthAndAddressFamily(ip_address_.prefix_len, + if (!checkPrefixLengthAndAddressFamily(ip_address_.len, ip_address_.family)) { throw errors::InvalidIpAddressException(); } std::size_t size = - ip_address_.family == AF_INET ? INET_ADDRSTRLEN : INET6_ADDRSTRLEN; + ip_address_.family == 4 + AF_INET ? INET_ADDRSTRLEN : INET6_ADDRSTRLEN; std::string network(size, 0); - if (hicn_ip_ntop(&ip_address_, (char *)network.c_str(), size) < 0) { + if (ip_prefix_ntop_short(&ip_address_, (char *)network.c_str(), size) < 0) { throw errors::RuntimeException( "Impossible to retrieve network from ip address."); } @@ -147,7 +147,7 @@ Name Prefix::getName() const { } Prefix &Prefix::setNetwork(std::string &network) { - if (!inet_pton(AF_INET6, network.c_str(), ip_address_.buffer)) { + if (!inet_pton(AF_INET6, network.c_str(), ip_address_.address.buffer)) { throw errors::RuntimeException("The network name is not valid."); } @@ -163,10 +163,10 @@ Name Prefix::makeRandomName() const { 0, std::numeric_limits<uint32_t>::max()); uint64_t random_number = idis(eng); - uint32_t hash_size_bits = IPV6_ADDR_LEN_BITS - ip_address_.prefix_len; + uint32_t hash_size_bits = IPV6_ADDR_LEN_BITS - ip_address_.len; uint64_t ip_address[2]; - memcpy(ip_address, ip_address_.buffer, sizeof(uint64_t)); - memcpy(ip_address + 1, ip_address_.buffer + 8, sizeof(uint64_t)); + memcpy(ip_address, ip_address_.address.buffer, sizeof(uint64_t)); + memcpy(ip_address + 1, ip_address_.address.buffer + 8, sizeof(uint64_t)); std::string network(IPV6_ADDR_LEN * 3, 0); // Let's do the magic ;) @@ -208,7 +208,7 @@ bool Prefix::checkPrefixLengthAndAddressFamily(uint16_t prefix_length, return true; } -ip_address_t &Prefix::toIpAddressStruct() { return ip_address_; } +ip_prefix_t &Prefix::toIpAddressStruct() { return ip_address_; } } // namespace core diff --git a/libtransport/src/hicn/transport/core/prefix.h b/libtransport/src/hicn/transport/core/prefix.h index b68c6bdf6..022e2bec2 100644 --- a/libtransport/src/hicn/transport/core/prefix.h +++ b/libtransport/src/hicn/transport/core/prefix.h @@ -52,7 +52,7 @@ class Prefix { Name makeRandomName() const; - ip_address_t &toIpAddressStruct(); + ip_prefix_t &toIpAddressStruct(); private: static bool checkPrefixLengthAndAddressFamily(uint16_t prefix_length, @@ -60,9 +60,9 @@ class Prefix { void buildPrefix(std::string &prefix, uint16_t prefix_length, int family); - ip_address_t ip_address_; + ip_prefix_t ip_address_; }; } // end namespace core -} // end namespace transport
\ No newline at end of file +} // end namespace transport diff --git a/libtransport/src/hicn/transport/core/raw_socket_interface.cc b/libtransport/src/hicn/transport/core/raw_socket_interface.cc index 4cf7b2ca6..ef365fce7 100644 --- a/libtransport/src/hicn/transport/core/raw_socket_interface.cc +++ b/libtransport/src/hicn/transport/core/raw_socket_interface.cc @@ -45,7 +45,7 @@ void RawSocketInterface::connect(bool is_consumer) { utils::retrieveInterfaceAddress(output_interface_, &address); inet6_address_.family = address.sin6_family; - std::memcpy(inet6_address_.buffer, &address.sin6_addr, + std::memcpy(inet6_address_.address.buffer, &address.sin6_addr, sizeof(address.sin6_addr)); connector_.connect(output_interface_, remote_mac_address_); } diff --git a/libtransport/src/hicn/transport/core/vpp_forwarder_interface.cc b/libtransport/src/hicn/transport/core/vpp_forwarder_interface.cc index 61c5dfc7f..c8a4f9c88 100644 --- a/libtransport/src/hicn/transport/core/vpp_forwarder_interface.cc +++ b/libtransport/src/hicn/transport/core/vpp_forwarder_interface.cc @@ -87,8 +87,8 @@ uint32_t VPPForwarderInterface::getMemifConfiguration() { void VPPForwarderInterface::consumerConnection() { hicn_consumer_input_params input = {0}; hicn_consumer_output_params output = {0}; - ip_address_t ip4_address; - ip_address_t ip6_address; + ip_prefix_t ip4_address; + ip_prefix_t ip6_address; output.src4 = &ip4_address; output.src6 = &ip6_address; @@ -103,12 +103,12 @@ void VPPForwarderInterface::consumerConnection() { } inet_address_.family = AF_INET; - inet_address_.prefix_len = output.src4->prefix_len; - std::memcpy(inet_address_.buffer, output.src4->buffer, IPV6_ADDR_LEN); + inet_address_.len = output.src4->len; + std::memcpy(inet_address_.address.buffer, output.src4->address.buffer, IPV6_ADDR_LEN); inet6_address_.family = AF_INET6; - inet6_address_.prefix_len = output.src6->prefix_len; - std::memcpy(inet6_address_.buffer, output.src6->buffer, IPV6_ADDR_LEN); + inet6_address_.len = output.src6->len; + std::memcpy(inet6_address_.address.buffer, output.src6->address.buffer, IPV6_ADDR_LEN); } void VPPForwarderInterface::producerConnection() { @@ -144,7 +144,7 @@ void VPPForwarderInterface::registerRoute(Prefix &prefix) { auto &addr = prefix.toIpAddressStruct(); // Same ip address for input and outurt params - ip_address_t ip_address; + ip_prefix_t ip_address; if (face_id_ == uint32_t(~0)) { hicn_producer_input_params input; @@ -160,10 +160,10 @@ void VPPForwarderInterface::registerRoute(Prefix &prefix) { // memif_id, since this function should be called after the // memif creation. input.swif = sw_if_index_; - input.prefix->as_u64[0] = addr.as_u64[0]; - input.prefix->as_u64[1] = addr.as_u64[1]; + input.prefix->address.as_u64[0] = addr.address.as_u64[0]; + input.prefix->address.as_u64[1] = addr.address.as_u64[1]; input.prefix->family = addr.family == AF_INET6 ? AF_INET6 : AF_INET; - input.prefix->prefix_len = addr.prefix_len; + input.prefix->len = addr.len; input.cs_reserved = content_store_reserved_; int ret = hicn_binary_api_register_prod_app( @@ -174,25 +174,25 @@ void VPPForwarderInterface::registerRoute(Prefix &prefix) { } if (addr.family == AF_INET6) { - inet6_address_.prefix_len = output.prod_addr->prefix_len; - inet6_address_.as_u64[0] = output.prod_addr->as_u64[0]; - inet6_address_.as_u64[1] = output.prod_addr->as_u64[1]; + inet6_address_.len = output.prod_addr->len; + inet6_address_.address.as_u64[0] = output.prod_addr->address.as_u64[0]; + inet6_address_.address.as_u64[1] = output.prod_addr->address.as_u64[1]; } else { - inet_address_.prefix_len = output.prod_addr->prefix_len; + inet_address_.len = output.prod_addr->len; // The ipv4 is written in the last 4 bytes of the ipv6 address, so we need // to copy from the byte 12 - inet_address_.as_u64[0] = output.prod_addr->as_u64[0]; - inet_address_.as_u64[1] = output.prod_addr->as_u64[1]; + inet_address_.address.as_u64[0] = output.prod_addr->address.as_u64[0]; + inet_address_.address.as_u64[1] = output.prod_addr->address.as_u64[1]; } face_id_ = output.face_id; } else { hicn_producer_set_route_params params; params.prefix = &ip_address; - params.prefix->as_u64[0] = addr.as_u64[0]; - params.prefix->as_u64[1] = addr.as_u64[1]; + params.prefix->address.as_u64[0] = addr.address.as_u64[0]; + params.prefix->address.as_u64[1] = addr.address.as_u64[1]; params.prefix->family = addr.family == AF_INET6 ? AF_INET6 : AF_INET; - params.prefix->prefix_len = addr.prefix_len; + params.prefix->len = addr.len; params.face_id = face_id_; int ret = hicn_binary_api_register_route(VPPForwarderInterface::hicn_api_, @@ -225,4 +225,4 @@ void VPPForwarderInterface::closeConnection() { } // namespace transport -#endif
\ No newline at end of file +#endif diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc index d1e89efdc..5667b0640 100644 --- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc +++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc @@ -31,6 +31,11 @@ #define HICN_MAX_DATA_SEQ 0xefffffff +//slow production rate param +#define MIN_PRODUCTION_RATE 8000 // in bytes per sec. this value is computed + // through experiments +#define LIFETIME_FRACTION 0.5 + // NACK HEADER // +-----------------------------------------+ // | 4 bytes: current segment in production | @@ -58,12 +63,15 @@ RTCProducerSocket::RTCProducerSocket(asio::io_service &io_service) bytesProductionRate_(INIT_PACKET_PRODUCTION_RATE * 1400), packetsProductionRate_(INIT_PACKET_PRODUCTION_RATE), perSecondFactor_(MILLI_IN_A_SEC / STATS_INTERVAL_DURATION), + timer_on_(false), active_(false) { lastStats_ = std::chrono::duration_cast<std::chrono::milliseconds>( std::chrono::steady_clock::now().time_since_epoch()) .count(); srand((unsigned int)time(NULL)); prodLabel_ = ((rand() % 255) << 24UL); + interests_cache_timer_ = std::make_unique<asio::steady_timer>( + this->getIoService()); } RTCProducerSocket::RTCProducerSocket() @@ -74,12 +82,15 @@ RTCProducerSocket::RTCProducerSocket() bytesProductionRate_(INIT_PACKET_PRODUCTION_RATE * 1400), packetsProductionRate_(INIT_PACKET_PRODUCTION_RATE), perSecondFactor_(MILLI_IN_A_SEC / STATS_INTERVAL_DURATION), + timer_on_(false), active_(false) { lastStats_ = std::chrono::duration_cast<std::chrono::milliseconds>( std::chrono::steady_clock::now().time_since_epoch()) .count(); srand((unsigned int)time(NULL)); prodLabel_ = ((rand() % 255) << 24UL); + interests_cache_timer_ = std::make_unique<asio::steady_timer>( + this->getIoService()); } RTCProducerSocket::~RTCProducerSocket() {} @@ -159,6 +170,24 @@ void RTCProducerSocket::produce(std::unique_ptr<utils::MemBuf> &&buffer) { portal_->sendContentObject(content_object); + //remove interests from the interest cache if it exists + if(!seqs_map_.empty()){ + + utils::SpinLock::Acquire locked(interests_cache_lock_); + + auto it_seqs = seqs_map_.find(currentSeg_); + if(it_seqs != seqs_map_.end()){ + auto range = timers_map_.equal_range(it_seqs->second); + for(auto it_timers = range.first; it_timers != range.second; it_timers++){ + if(it_timers->second == it_seqs->first){ + timers_map_.erase(it_timers); + break; + } + } + seqs_map_.erase(it_seqs); + } + } + currentSeg_ = (currentSeg_ + 1) % HICN_MAX_DATA_SEQ; } @@ -170,14 +199,15 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) { on_interest_input_(*this, *interest); } + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + bool isActive; { utils::SpinLock::Acquire locked(lock_); isActive = active_; if (isActive) { - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); if ((now - lastProduced_) > INACTIVE_TIME) { // socket is inactive active_ = false; @@ -186,13 +216,68 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) { } } - if (TRANSPORT_EXPECT_FALSE(!isActive)) { - sendNack(*interest, false); + if(interestSeg > HICN_MAX_DATA_SEQ){ + sendNack(interestSeg, isActive); return; } - if(interestSeg > HICN_MAX_DATA_SEQ){ - sendNack(*interest, isActive); + // if the production rate is less than MIN_PRODUCTION_RATE we put the + // interest in a queue, otherwise we handle it in the usual way + if(bytesProductionRate_ < MIN_PRODUCTION_RATE && interestSeg > currentSeg_){ + + utils::SpinLock::Acquire locked(interests_cache_lock_); + + uint64_t next_timer = ~0; + if(!timers_map_.empty()){ + next_timer = timers_map_.begin()->first; + } + + uint64_t expiration = now + (lifetime * LIFETIME_FRACTION); + //check if the seq number exists already + auto it_seqs = seqs_map_.find(interestSeg); + if(it_seqs != seqs_map_.end()){ + //the seq already exists + if(expiration < it_seqs->second){ + // we need to update the timer becasue we got a smaller one + // 1) remove the entry from the multimap + // 2) update this entry + auto range = timers_map_.equal_range(it_seqs->second); + for(auto it_timers = range.first; it_timers != range.second; it_timers++){ + if(it_timers->second == it_seqs->first){ + timers_map_.erase(it_timers); + break; + } + } + timers_map_.insert(std::pair<uint64_t,uint32_t>(expiration, interestSeg)); + it_seqs->second = expiration; + }else{ + //nothing to do here + return; + } + }else{ + // add the new seq + timers_map_.insert(std::pair<uint64_t,uint32_t>(expiration, interestSeg)); + seqs_map_.insert(std::pair<uint32_t,uint64_t>(interestSeg, expiration)); + } + + //here we have at least one interest in the queue, we need to start or + //update the timer + if(!timer_on_){ + //set timeout + timer_on_ = true; + scheduleTimer(timers_map_.begin()->first - now); + } else { + //re-schedule the timer because a new interest will expires sooner + if(next_timer > timers_map_.begin()->first){ + interests_cache_timer_->cancel(); + scheduleTimer(timers_map_.begin()->first - now); + } + } + return; + } + + if (TRANSPORT_EXPECT_FALSE(!isActive)) { + sendNack(interestSeg, false); return; } @@ -202,18 +287,55 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) { (double)packetsProductionRate_.load())); if (interestSeg < currentSeg_ || interestSeg > (max_gap + currentSeg_)) { - sendNack(*interest, true); + sendNack(interestSeg, true); } // else drop packet } -void RTCProducerSocket::sendNack(const Interest &interest, bool isActive) { +void RTCProducerSocket::scheduleTimer(uint64_t wait){ + interests_cache_timer_->expires_from_now( + std::chrono::milliseconds(wait)); + interests_cache_timer_->async_wait([this](std::error_code ec) { + if (ec) return; + interestCacheTimer(); + }); +} + +void RTCProducerSocket::interestCacheTimer(){ + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + + utils::SpinLock::Acquire locked(interests_cache_lock_); + + for(auto it_timers = timers_map_.begin(); it_timers != timers_map_.end();){ + uint64_t expire = it_timers->first; + if(expire <= now){ + uint32_t seq = it_timers->second; + sendNack(seq, active_); + //remove the interest from the other map + seqs_map_.erase(seq); + it_timers = timers_map_.erase(it_timers); + }else{ + //stop, we are done! + break; + } + } + if(timers_map_.empty()){ + timer_on_ = false; + }else{ + timer_on_ = true; + scheduleTimer(timers_map_.begin()->first - now); + } +} + +void RTCProducerSocket::sendNack(uint32_t sequence, bool isActive) { auto nack_payload = utils::MemBuf::create(NACK_HEADER_SIZE); nack_payload->append(NACK_HEADER_SIZE); ContentObject nack; nack.appendPayload(std::move(nack_payload)); - nack.setName(interest.getName()); + nack.setName(flowName_.setSuffix(sequence)); uint32_t *payload_ptr = (uint32_t *)nack.getPayload()->data(); *payload_ptr = currentSeg_; diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h index 5b9a23dd7..aa67f1a29 100644 --- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h +++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h @@ -41,8 +41,10 @@ class RTCProducerSocket : public ProducerSocket { void onInterest(Interest::Ptr &&interest) override; private: - void sendNack(const Interest &interest, bool isActive); + void sendNack(uint32_t sequence, bool isActive); void updateStats(uint32_t packet_size, uint64_t now); + void scheduleTimer(uint64_t wait); + void interestCacheTimer(); uint32_t currentSeg_; uint32_t prodLabel_; @@ -55,6 +57,20 @@ class RTCProducerSocket : public ProducerSocket { uint32_t perSecondFactor_; uint64_t lastStats_; + // cache for the received interests + // this map maps the expiration time of an interest to + // its sequence number. the map is sorted by timeouts + // the same timeout may be used for multiple sequence numbers + // but for each sequence number we store only the smallest + // expiry time. In this way the mapping from seqs_map_ to + // timers_map_ is unique + std::multimap<uint64_t,uint32_t> timers_map_; + // this map does the opposite, this map is not ordered + std::unordered_map<uint32_t,uint64_t> seqs_map_; + bool timer_on_; + std::unique_ptr<asio::steady_timer> interests_cache_timer_; + utils::SpinLock interests_cache_lock_; + uint64_t lastProduced_; bool active_; utils::SpinLock lock_; diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc index b3a00c58d..4104d8883 100644 --- a/libtransport/src/hicn/transport/protocols/rtc.cc +++ b/libtransport/src/hicn/transport/protocols/rtc.cc @@ -744,7 +744,7 @@ bool RTCTransportProtocol::onNack(const ContentObject &content_object, bool rtx) old_nack = true; } else if (productionSeg < nackSegment) { - actualSegment_ = (productionSeg + 1) % HICN_MIN_PROBE_SEQ; + actualSegment_ = productionSeg % HICN_MIN_PROBE_SEQ; if(!rtx){ // we are asking stuff in the future |