aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLuca Muscariello <muscariello@ieee.org>2022-04-22 17:55:01 +0200
committerMauro Sardara <msardara@cisco.com>2022-04-26 15:30:21 +0200
commita1ac96f497719b897793ac14b287cb8d840651c1 (patch)
tree12c608fe352c21d944b0340ce8d3f0be0fb23b11
parent1ac07d842a3a6ce0fb7fa4039241c8ec1a71419b (diff)
HICN-722: Updates on transport, RTC, manifest usage for RTC, infra.
Co-authored-by: Mauro Sardara <msardara@cisco.com> Co-authored-by: Jordan Augé <jordan.auge+fdio@cisco.com> Co-authored-by: Michele Papalini <micpapal@cisco.com> Co-authored-by: Angelo Mantellini <manangel@cisco.com> Co-authored-by: Jacques Samain <jsamain@cisco.com> Co-authored-by: Olivier Roques <oroques+fdio@cisco.com> Co-authored-by: Enrico Loparco <eloparco@cisco.com> Co-authored-by: Giulio Grassi <gigrassi@cisco.com> manifest: optimize manifest processing manifest: add FEC parameters to manifests manifest: refactor verification process manifest: report auth alerts in hiperf instead of aborting manifest: remove FEC buffer callback in consumer manifest: refactor and enable manifests by default manifest: update manifest header with transport parameters manifest: batch interests for first manifest from RTC producer manifest: refactor processing of RTC manifests manifest: update manifest-related socket options of consumers manifest: update unit tests for manifests manifest: pack manifest headers manifest: verify FEC packets auth: add consumer socket option to set max unverified delay manifest: process manifests after full FEC decoding manifest: manage forward jumps in RTC verifier fec: remove useless fec codes rs: add new code rate rs: add new code rate rs: add new code rate rs: add new code rate libtransport: increase internal packet cache size remove internal cisco info in cmake manifest: add option to set manifest capacity data_input_node.c: add information about adj_index[VLIB_RX] on received data packetsi sysrepo plugin: update build Change-Id: I0cf64d91bd0a1b7cad4eeaa9871f58f5f10434af Signed-off-by: Mauro Sardara <msardara@cisco.com> Signed-off-by: Luca Muscariello <muscariello@ieee.org>
-rw-r--r--.cz.toml2
-rw-r--r--.gitignore2
-rw-r--r--CMakeLists.txt4
-rw-r--r--Dockerfile.dev52
-rw-r--r--apps/hiperf/src/client.cc30
-rw-r--r--apps/hiperf/src/common.h8
-rw-r--r--apps/hiperf/src/main.cc43
m---------cmake0
-rw-r--r--ctrl/libhicnctrl/src/api.c7
-rw-r--r--ctrl/libhicnctrl/src/api_private.h1
-rw-r--r--ctrl/sysrepo-plugins/cmake/Modules/Packaging.cmake9
-rw-r--r--ctrl/sysrepo-plugins/hicn-plugin/CMakeLists.txt2
-rw-r--r--extras/router-plugin/cmake/Modules/Packaging.cmake9
-rw-r--r--hicn-light/src/hicn/base/bitmap.h9
-rw-r--r--hicn-light/src/hicn/base/pool.c2
-rw-r--r--hicn-light/src/hicn/test/test-bitmap.cc11
-rw-r--r--hicn-light/src/hicn/test/test-ctrl.cc32
-rw-r--r--hicn-plugin/cmake/packaging.cmake9
-rw-r--r--libtransport/cmake/packaging.cmake9
-rw-r--r--libtransport/includes/hicn/transport/auth/crypto_hash.h4
-rw-r--r--libtransport/includes/hicn/transport/auth/signer.h8
-rw-r--r--libtransport/includes/hicn/transport/auth/verifier.h16
-rw-r--r--libtransport/includes/hicn/transport/core/packet.h4
-rw-r--r--libtransport/includes/hicn/transport/interfaces/socket_options_default_values.h5
-rw-r--r--libtransport/includes/hicn/transport/interfaces/socket_options_keys.h3
-rw-r--r--libtransport/src/auth/crypto_hash.cc40
-rw-r--r--libtransport/src/auth/signer.cc53
-rw-r--r--libtransport/src/auth/verifier.cc49
-rw-r--r--libtransport/src/core/manifest_format.h5
-rw-r--r--libtransport/src/core/manifest_format_fixed.cc8
-rw-r--r--libtransport/src/core/manifest_format_fixed.h6
-rw-r--r--libtransport/src/core/packet.cc13
-rw-r--r--libtransport/src/implementation/socket_consumer.h35
-rw-r--r--libtransport/src/protocols/fec/fec.cc2
-rw-r--r--libtransport/src/protocols/prod_protocol_rtc.cc96
-rw-r--r--libtransport/src/protocols/prod_protocol_rtc.h7
-rw-r--r--libtransport/src/protocols/rtc/probe_handler.cc37
-rw-r--r--libtransport/src/protocols/rtc/probe_handler.h8
-rw-r--r--libtransport/src/protocols/rtc/rtc.cc159
-rw-r--r--libtransport/src/protocols/rtc/rtc.h3
-rw-r--r--libtransport/src/protocols/rtc/rtc_consts.h7
-rw-r--r--libtransport/src/protocols/rtc/rtc_forwarding_strategy.cc42
-rw-r--r--libtransport/src/protocols/rtc/rtc_forwarding_strategy.h4
-rw-r--r--libtransport/src/protocols/rtc/rtc_ldr.cc71
-rw-r--r--libtransport/src/protocols/rtc/rtc_ldr.h19
-rw-r--r--libtransport/src/protocols/rtc/rtc_recovery_strategy.cc81
-rw-r--r--libtransport/src/protocols/rtc/rtc_recovery_strategy.h31
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_delay.cc36
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_delay.h3
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_fec_only.cc44
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_fec_only.h3
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_low_rate.cc11
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_low_rate.h3
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_recovery_off.cc8
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_recovery_off.h3
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_rtx_only.cc9
-rw-r--r--libtransport/src/protocols/rtc/rtc_rs_rtx_only.h3
-rw-r--r--libtransport/src/protocols/rtc/rtc_state.cc138
-rw-r--r--libtransport/src/protocols/rtc/rtc_state.h37
-rw-r--r--libtransport/src/protocols/rtc/rtc_verifier.cc249
-rw-r--r--libtransport/src/protocols/rtc/rtc_verifier.h96
-rw-r--r--libtransport/src/protocols/transport_protocol.cc6
-rw-r--r--libtransport/src/test/test_auth.cc24
-rw-r--r--libtransport/src/test/test_core_manifest.cc2
-rw-r--r--libtransport/src/utils/max_filter.h6
-rw-r--r--libtransport/src/utils/min_filter.h6
-rw-r--r--tests/.env1
-rw-r--r--tests/1-node.yml26
-rw-r--r--tests/2-nodes-hicn-light.yml2
-rw-r--r--tests/2-nodes.yml1
-rw-r--r--tests/Makefile35
-rwxr-xr-xtests/config.sh232
-rw-r--r--tests/functional-tests/hicn-light-control.robot29
-rw-r--r--tests/resources/libraries/robot/common.robot2
74 files changed, 1370 insertions, 702 deletions
diff --git a/.cz.toml b/.cz.toml
index 2cbf9452e..1f9d82df6 100644
--- a/.cz.toml
+++ b/.cz.toml
@@ -1,5 +1,5 @@
[tool]
[tool.commitizen]
name = "cz_conventional_commits"
-version = "3.9.1"
+version = "3.11.3"
tag_format = "v$version"
diff --git a/.gitignore b/.gitignore
index 685e356c6..9099d8d0a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,5 +1,5 @@
build*
-.ccls-cache/
+.cache/
compile_commands.json
sonar*
.scannerwork*
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 1ba276c41..27afca29b 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -54,6 +54,10 @@ if(EXISTS "${PROJECT_SOURCE_DIR}/internal/cmake/Modules")
set(INTERNAL_ENVIRONMENT True)
endif()
include("${CMAKE_CURRENT_SOURCE_DIR}/versions.cmake")
+list(GET VPP_DEFAULT_VERSION 0 VPP_DEFAULT_VERSION)
+set(PREFIX_VERSION "${VPP_DEFAULT_VERSION}")
+string(REPLACE "-" ";" PREFIX_VERSION ${PREFIX_VERSION})
+list (GET PREFIX_VERSION 0 PREFIX_VERSION)
include(CommonSetup)
diff --git a/Dockerfile.dev b/Dockerfile.dev
index bcd75746f..cfb75f1b4 100644
--- a/Dockerfile.dev
+++ b/Dockerfile.dev
@@ -1,53 +1,17 @@
-FROM ubuntu:focal
-
+FROM dockerhub.cisco.com/icn-docker/hicn-base-devel-focal:x86_64
WORKDIR /hicn-build
-RUN apt-get update
-
-# Do not prompt
-ENV DEBIAN_FRONTEND=noninteractive
-
-# Prevent vpp to set sysctl
-ENV VPP_INSTALL_SKIP_SYSCTL=1
-
-# Add packagecloud repo
-RUN apt-get install -y git curl
-
# Get versions from versions.cmake
-ARG VERSIONS_PATH=/tmp/versions.cmake
-COPY versions.cmake ${VERSIONS_PATH}
-
-RUN export VPP_VERSION=$(cat ${VERSIONS_PATH} | grep VPP_DEFAULT_VERSION | cut -d ' ' -f 2 | tr -d '"' | grep -Po '\d\d.\d\d') && echo ${VPP_VERSION}
+ARG VERSION_PATH=/tmp/versions.cmake
+COPY versions.cmake ${VERSION_PATH}
+ARG INSTALL_VPP_SCRIPT=/tmp/install-vpp.sh
+COPY scripts/install-vpp.sh ${INSTALL_VPP_SCRIPT}
-# Install devel packages
-RUN apt update
-SHELL ["/bin/bash", "-c"]
-RUN export VPP_VERSION=$(cat ${VERSIONS_PATH} | grep VPP_DEFAULT_VERSION | cut -d ' ' -f 2 | tr -d '"' | grep -Po '\d\d.\d\d') && \
- curl -s https://packagecloud.io/install/repositories/fdio/${VPP_VERSION//./}/script.deb.sh | bash && \
- apt-get install -y \
- cmake \
- ninja-build \
- unzip \
- libconfig-dev \
- python3-ply \
- libconfig++-dev \
- build-essential \
- vpp-dev \
- libvppinfra-dev \
- vpp-plugin-core \
- vpp \
- libvppinfra \
- libevent-dev \
+RUN VERSION_PATH=${VERSION_PATH} bash -x ${INSTALL_VPP_SCRIPT}
+RUN apt update && apt-get install -y \
libssl-dev \
- make \
- sudo \
- libcurl4-openssl-dev \
iproute2 \
iperf3 \
iputils-ping \
tcpdump \
- gdb \
- libasio-dev --no-install-recommends
-
-# Switch back to dialog for any ad-hoc use of apt-get
-ENV DEBIAN_FRONTEND=
+ gdb --no-install-recommends
diff --git a/apps/hiperf/src/client.cc b/apps/hiperf/src/client.cc
index 319fa82ab..ba36cd20e 100644
--- a/apps/hiperf/src/client.cc
+++ b/apps/hiperf/src/client.cc
@@ -696,8 +696,11 @@ class HIperfClient::Impl : ForwarderInterface::ICallback {
configuration_.interest_lifetime_);
consumer_socket_->setSocketOption(
- GeneralTransportOptions::MAX_UNVERIFIED_TIME,
- configuration_.unverified_delay_);
+ GeneralTransportOptions::UNVERIFIED_INTERVAL,
+ configuration_.unverified_interval_);
+
+ consumer_socket_->setSocketOption(GeneralTransportOptions::UNVERIFIED_RATIO,
+ configuration_.unverified_ratio_);
if (consumer_socket_->setSocketOption(
GeneralTransportOptions::PACKET_FORMAT,
@@ -715,6 +718,20 @@ class HIperfClient::Impl : ForwarderInterface::ICallback {
std::cout << "Signal SIGUSR1!" << std::endl;
mtrace();
});
+
+ ret = consumer_socket_->setSocketOption(
+ ConsumerCallbacksOptions::FWD_STRATEGY_CHANGE,
+ [this](notification::Strategy strategy) {
+ std::cout << "Forwarder strategy callback" << std::endl;
+ });
+ if (ret == SOCKET_OPTION_NOT_SET) return ERROR_SETUP;
+
+ ret = consumer_socket_->setSocketOption(
+ ConsumerCallbacksOptions::REC_STRATEGY_CHANGE,
+ [this](notification::Strategy strategy) {
+ std::cout << "Recovery strategy callback" << std::endl;
+ });
+ if (ret == SOCKET_OPTION_NOT_SET) return ERROR_SETUP;
#endif
if (consumer_socket_->setSocketOption(CURRENT_WINDOW_SIZE,
@@ -856,15 +873,6 @@ class HIperfClient::Impl : ForwarderInterface::ICallback {
}
if (configuration_.rtc_) {
- ret = consumer_socket_->setSocketOption(GeneralTransportOptions::FEC_TYPE,
- configuration_.fec_type_);
-
- if (ret == SOCKET_OPTION_NOT_SET) {
- return ERROR_SETUP;
- }
- }
-
- if (configuration_.rtc_) {
std::shared_ptr<TransportStatistics> transport_stats;
consumer_socket_->getSocketOption(
OtherOptions::STATISTICS, (TransportStatistics **)&transport_stats);
diff --git a/apps/hiperf/src/common.h b/apps/hiperf/src/common.h
index 13c9dcc1d..3a90f3732 100644
--- a/apps/hiperf/src/common.h
+++ b/apps/hiperf/src/common.h
@@ -180,13 +180,13 @@ struct ClientConfiguration {
secure_(false),
producer_prefix_(),
interest_lifetime_(500),
- unverified_delay_(2000),
+ unverified_interval_(10000),
+ unverified_ratio_(0.2),
relay_name_("c001::abcd/64"),
output_stream_mode_(false),
port_(0),
recovery_strategy_(4),
aggregated_data_(false),
- fec_type_(""),
packet_format_(default_values::packet_format),
print_headers_(true),
nb_iterations_(std::numeric_limits<decltype(nb_iterations_)>::max()) {}
@@ -208,13 +208,13 @@ struct ClientConfiguration {
bool secure_;
Prefix producer_prefix_;
uint32_t interest_lifetime_;
- uint32_t unverified_delay_;
+ uint32_t unverified_interval_;
+ double unverified_ratio_;
Prefix relay_name_;
bool output_stream_mode_;
uint16_t port_;
uint32_t recovery_strategy_;
bool aggregated_data_;
- std::string fec_type_;
Packet::Format packet_format_;
bool print_headers_;
std::uint32_t nb_iterations_;
diff --git a/apps/hiperf/src/main.cc b/apps/hiperf/src/main.cc
index b69392de8..9c0f0a140 100644
--- a/apps/hiperf/src/main.cc
+++ b/apps/hiperf/src/main.cc
@@ -146,7 +146,7 @@ void usage() {
std::cerr << "-g\t<port>\t\t\t\t"
<< "Output stream to localhost at the specified port" << std::endl;
std::cerr << "-e\t<strategy>\t\t\t"
- << "Enance the network with a realiability strategy. Options 1:"
+ << "Enhance the network with a reliability strategy. Options 1:"
<< " unreliable, 2: rtx only, 3: fec only, "
<< "4: delay based, 5: low rate, 6: low rate and best path "
<< "7: low rate and replication, 8: low rate and best"
@@ -187,9 +187,10 @@ int main(int argc, char *argv[]) {
int opt;
#ifndef _WIN32
+ // Please keep in alphabetical order.
while ((opt = getopt(argc, argv,
- "DSCf:b:d:W:RM:c:vA:s:rm:lK:k:y:p:hi:xE:P:B:ItL:z:T:F:j:"
- "g:G:e:awHn:X:u:")) != -1) {
+ "A:B:CDE:F:G:HIK:L:M:P:RST:U:W:X:ab:c:d:e:f:g:hi:j:k:lm:"
+ "n:p:rs:tu:vwxy:z:")) != -1) {
switch (opt) {
// Common
case 'D': {
@@ -222,11 +223,10 @@ int main(int argc, char *argv[]) {
break;
}
#else
- while (
- (opt = getopt(
- argc, argv,
- "SCf:b:d:W:RM:c:vA:s:rm:lK:k:y:p:hi:xB:E:P:tL:z:F:j:e:awHn:X:u:")) !=
- -1) {
+ // Please keep in alphabetical order.
+ while ((opt = getopt(argc, argv,
+ "A:B:CE:F:HK:L:M:P:RSU:W:X:ab:c:d:e:f:hi:j:k:lm:n:p:rs:"
+ "tu:vwxy:z:")) != -1) {
switch (opt) {
#endif
case 'f': {
@@ -243,16 +243,16 @@ int main(int argc, char *argv[]) {
server_configuration.aggregated_data_ = true;
break;
}
- case 'X': {
- client_configuration.fec_type_ = std::string(optarg);
- server_configuration.fec_type_ = std::string(optarg);
- break;
- }
case 'w': {
client_configuration.packet_format_ = Packet::Format::HF_INET6_UDP;
server_configuration.packet_format_ = Packet::Format::HF_INET6_UDP;
break;
}
+ case 'k': {
+ server_configuration.passphrase = std::string(optarg);
+ client_configuration.passphrase = std::string(optarg);
+ break;
+ }
case 'z': {
config.name = optarg;
break;
@@ -271,11 +271,6 @@ int main(int argc, char *argv[]) {
role += 1;
break;
}
- case 'k': {
- server_configuration.passphrase = std::string(optarg);
- client_configuration.passphrase = std::string(optarg);
- break;
- }
// Client specifc
case 'b': {
@@ -324,7 +319,12 @@ int main(int argc, char *argv[]) {
break;
}
case 'u': {
- client_configuration.unverified_delay_ = std::stoul(optarg);
+ client_configuration.unverified_interval_ = std::stoul(optarg);
+ options = 1;
+ break;
+ }
+ case 'U': {
+ client_configuration.unverified_ratio_ = std::stod(optarg);
options = 1;
break;
}
@@ -419,6 +419,11 @@ int main(int argc, char *argv[]) {
options = 1;
break;
}
+ case 'X': {
+ server_configuration.fec_type_ = std::string(optarg);
+ options = -1;
+ break;
+ }
case 'h':
default:
usage();
diff --git a/cmake b/cmake
-Subproject 5fc3324dd897a75aa1fca4290a8ec02a8f19386
+Subproject 64155d4e52b4f8763667230ce489c675b0e47bf
diff --git a/ctrl/libhicnctrl/src/api.c b/ctrl/libhicnctrl/src/api.c
index 4156ceff9..472e07bc4 100644
--- a/ctrl/libhicnctrl/src/api.c
+++ b/ctrl/libhicnctrl/src/api.c
@@ -689,7 +689,12 @@ int hc_route_validate(const hc_route_t *route) {
ERROR("[hc_route_validate] Invalid connection id");
return -1;
}
- if (!IS_VALID_NAME(route->name) && !IS_VALID_STR_ID(route->name)) {
+ if (route->name[0] == '\0') {
+ if (!IS_VALID_FACE_ID(route->face_id)) {
+ ERROR("[hc_route_validate] Invalid face_id");
+ return -1;
+ }
+ } else if (!IS_VALID_NAME(route->name) && !IS_VALID_STR_ID(route->name)) {
ERROR("[hc_route_validate] Invalid name specified");
return -1;
}
diff --git a/ctrl/libhicnctrl/src/api_private.h b/ctrl/libhicnctrl/src/api_private.h
index 11cb2e00e..65b175810 100644
--- a/ctrl/libhicnctrl/src/api_private.h
+++ b/ctrl/libhicnctrl/src/api_private.h
@@ -73,6 +73,7 @@ static inline bool IS_VALID_STR_ID(const char *name) {
#define IS_VALID_TYPE(x) IS_VALID_ENUM_TYPE(FACE_TYPE, x)
#define IS_VALID_ADDR_TYPE(x) ((x >= ADDR_INET) && (x <= ADDR_UNIX))
+#define IS_VALID_FACE_ID(x) ((x) != INVALID_FACE_ID)
#define IS_VALID_ID(x) (1)
#define IS_VALID_POLICY(x) (1)
diff --git a/ctrl/sysrepo-plugins/cmake/Modules/Packaging.cmake b/ctrl/sysrepo-plugins/cmake/Modules/Packaging.cmake
index cba9adf1f..19be58beb 100644
--- a/ctrl/sysrepo-plugins/cmake/Modules/Packaging.cmake
+++ b/ctrl/sysrepo-plugins/cmake/Modules/Packaging.cmake
@@ -15,22 +15,17 @@
# Packages section
######################
-##############################################################
-# Get VPP version
-##############################################################
-list(GET VPP_DEFAULT_VERSION 0 VPP_VERSION)
-
set(hicn-sysrepo-plugin_DESCRIPTION
"A Plugin to enable hICN VPP in sysrepo."
CACHE STRING "Description for deb/rpm package."
)
set(hicn-sysrepo-plugin_DEB_DEPENDENCIES
- "hicn-plugin (= ${VPP_VERSION}-release), sysrepo (>= 1.0)"
+ "hicn-plugin (= ${PREFIX_VERSION}-release), sysrepo (>= 1.0)"
CACHE STRING "Dependencies for deb/rpm package."
)
set(hicn-sysrepo-plugin_RPM_DEPENDENCIES
- "hicn-plugin = ${VPP_VERSION}-release, sysrepo >= 1.0"
+ "hicn-plugin = ${PREFIX_VERSION}-release, sysrepo >= 1.0"
CACHE STRING "Dependencies for deb/rpm package."
)
diff --git a/ctrl/sysrepo-plugins/hicn-plugin/CMakeLists.txt b/ctrl/sysrepo-plugins/hicn-plugin/CMakeLists.txt
index 9204e2766..3638f5456 100644
--- a/ctrl/sysrepo-plugins/hicn-plugin/CMakeLists.txt
+++ b/ctrl/sysrepo-plugins/hicn-plugin/CMakeLists.txt
@@ -26,7 +26,7 @@ set(SOURCE_FILES
plugin/hicn_plugin.c
)
-list(APPEND SYSREPO_PLUGIN_INCLUDE_DIRS
+list(APPEND SYSREPO_PLUGIN_INCLUDE_DIRS PRIVATE
${VPP_INCLUDE_DIRS}
${HICNPLUGIN_INCLUDE_DIRS}
${SYSREPO_INCLUDE_DIRS}
diff --git a/extras/router-plugin/cmake/Modules/Packaging.cmake b/extras/router-plugin/cmake/Modules/Packaging.cmake
index 15a4f68b8..972f804a6 100644
--- a/extras/router-plugin/cmake/Modules/Packaging.cmake
+++ b/extras/router-plugin/cmake/Modules/Packaging.cmake
@@ -15,22 +15,17 @@
# Packages section
######################
-##############################################################
-# Get VPP version
-##############################################################
-list(GET VPP_DEFAULT_VERSION 0 VPP_VERSION)
-
set(${HICN_EXTRA_PLUGIN}_DESCRIPTION
"A extra plugin to VPP."
CACHE STRING "Description for deb/rpm package."
)
set(${HICN_EXTRA_PLUGIN}_DEB_DEPENDENCIES
- "vpp (= ${VPP_VERSION}-release), vpp-plugin-core (= ${VPP_VERSION}-release)"
+ "vpp (= ${PREFIX_VERSION}-release), vpp-plugin-core (= ${PREFIX_VERSION}-release)"
CACHE STRING "Dependencies for deb/rpm package."
)
set(${HICN_EXTRA_PLUGIN}_RPM_DEPENDENCIES
- "vpp = ${VPP_VERSION}-release, vpp-plugins = ${VPP_VERSION}-release"
+ "vpp = ${PREFIX_VERSION}-release, vpp-plugins = ${PREFIX_VERSION}-release"
CACHE STRING "Dependencies for deb/rpm package."
)
diff --git a/hicn-light/src/hicn/base/bitmap.h b/hicn-light/src/hicn/base/bitmap.h
index ed3fac2fd..060fd5be0 100644
--- a/hicn-light/src/hicn/base/bitmap.h
+++ b/hicn-light/src/hicn/base/bitmap.h
@@ -114,12 +114,15 @@ static inline int bitmap_get(const bitmap_t* bitmap, off_t i) {
*
* @return bool
*/
-static inline int _bitmap_set(bitmap_t** bitmap, off_t i) {
- if (bitmap_ensure_pos(bitmap, i) < 0) return -1;
+static inline int _bitmap_set(bitmap_t** bitmap_ptr, off_t i) {
+ if (bitmap_ensure_pos(bitmap_ptr, i) < 0) return -1;
+
+ bitmap_t* bitmap = *bitmap_ptr;
size_t offset = i / BITMAP_WIDTH(bitmap);
size_t pos = i % BITMAP_WIDTH(bitmap);
size_t shift = BITMAP_WIDTH(bitmap) - pos - 1;
- (*bitmap)[offset] |= (bitmap_t)1 << shift;
+
+ bitmap[offset] |= (bitmap_t)1 << shift;
return 0;
}
diff --git a/hicn-light/src/hicn/base/pool.c b/hicn-light/src/hicn/base/pool.c
index ba2a14c5f..e5fb7d6ac 100644
--- a/hicn-light/src/hicn/base/pool.c
+++ b/hicn-light/src/hicn/base/pool.c
@@ -60,7 +60,7 @@ void _pool_init(void** pool_ptr, size_t elt_size, size_t init_size,
ph->free_indices = free_indices;
/* Free bitmap */
- uint_fast32_t* fb = ph->free_bitmap;
+ bitmap_t* fb = ph->free_bitmap;
bitmap_init(fb, init_size, max_size);
bitmap_set_to(fb, init_size);
ph->free_bitmap = fb;
diff --git a/hicn-light/src/hicn/test/test-bitmap.cc b/hicn-light/src/hicn/test/test-bitmap.cc
index f1bf1ae5a..f9cd4024f 100644
--- a/hicn-light/src/hicn/test/test-bitmap.cc
+++ b/hicn-light/src/hicn/test/test-bitmap.cc
@@ -103,6 +103,17 @@ TEST_F(BitmapTest, BitmapSet) {
EXPECT_FALSE(bitmap_is_set(bitmap, 19));
EXPECT_TRUE(bitmap_is_unset(bitmap, 19));
+ // Test edge cases (i.e. start and end of block)
+ off_t start_position = 0;
+ bitmap_set(bitmap, start_position);
+ EXPECT_TRUE(bitmap_is_set(bitmap, start_position));
+ EXPECT_FALSE(bitmap_is_unset(bitmap, start_position));
+
+ off_t end_position = BITMAP_WIDTH(bitmap) - 1;
+ bitmap_set(bitmap, end_position);
+ EXPECT_TRUE(bitmap_is_set(bitmap, end_position));
+ EXPECT_FALSE(bitmap_is_unset(bitmap, end_position));
+
bitmap_free(bitmap);
}
diff --git a/hicn-light/src/hicn/test/test-ctrl.cc b/hicn-light/src/hicn/test/test-ctrl.cc
index 77b16a8af..e24b47f27 100644
--- a/hicn-light/src/hicn/test/test-ctrl.cc
+++ b/hicn-light/src/hicn/test/test-ctrl.cc
@@ -161,4 +161,36 @@ TEST_F(CtrlTest, AddRouteInvalidCost) {
result = hc_route_create_conf(s_, &command_.object.route);
success = hc_result_get_success(s_, result);
EXPECT_FALSE(success);
+}
+
+TEST_F(CtrlTest, RouteNameOrID) {
+ hc_route_t route = {
+ .face_id = (face_id_t)INVALID_FACE_ID,
+ .family = AF_INET6,
+ .remote_addr = IPV6_LOOPBACK,
+ .len = 64,
+ .cost = 1,
+ };
+
+ // At least one between name (symbolic or ID) and face_id
+ // should be set to make the route valid
+
+ // Valid name (symbolic)
+ snprintf(route.name, SYMBOLIC_NAME_LEN, "%s", "test");
+ EXPECT_EQ(hc_route_validate(&route), 0);
+
+ // Valid name (ID)
+ snprintf(route.name, SYMBOLIC_NAME_LEN, "%s", "conn0");
+ EXPECT_EQ(hc_route_validate(&route), 0);
+
+ // Valid face_id
+ route.face_id = 1;
+ snprintf(route.name, SYMBOLIC_NAME_LEN, "%s", "");
+ EXPECT_EQ(hc_route_validate(&route), 0);
+
+ // Invalid name stating with number
+ // (face_id is only checked if empty name)
+ route.face_id = 1;
+ snprintf(route.name, SYMBOLIC_NAME_LEN, "%s", "1test");
+ EXPECT_EQ(hc_route_validate(&route), -1);
} \ No newline at end of file
diff --git a/hicn-plugin/cmake/packaging.cmake b/hicn-plugin/cmake/packaging.cmake
index 276fc0b95..3e1f89131 100644
--- a/hicn-plugin/cmake/packaging.cmake
+++ b/hicn-plugin/cmake/packaging.cmake
@@ -15,11 +15,6 @@
# Packages section
######################
-##############################################################
-# Get VPP version
-##############################################################
-list(GET VPP_DEFAULT_VERSION 0 VPP_VERSION)
-
set(${HICN_PLUGIN}_DESCRIPTION
"A high-performance Hybrid ICN forwarder as a plugin to VPP."
CACHE STRING "Description for deb/rpm package."
@@ -54,12 +49,12 @@ fi;
")
set(${HICN_PLUGIN}_DEB_DEPENDENCIES
- "${LIBHICN_COMPONENT} (= stable_version), vpp (>= ${VPP_VERSION}), vpp-plugin-core (>= ${VPP_VERSION})"
+ "${LIBHICN_COMPONENT} (= stable_version), vpp (>= ${PREFIX_VERSION}), vpp-plugin-core (>= ${PREFIX_VERSION})"
CACHE STRING "Dependencies for deb/rpm package."
)
set(${HICN_PLUGIN}-dev_DEB_DEPENDENCIES
- "${LIBHICN_COMPONENT}-dev (= stable_version), vpp-dev (>= ${VPP_VERSION}), libvppinfra-dev (>= ${VPP_VERSION})"
+ "${LIBHICN_COMPONENT}-dev (= stable_version), vpp-dev (>= ${PREFIX_VERSION}), libvppinfra-dev (>= ${PREFIX_VERSION})"
CACHE STRING "Dependencies for deb/rpm package."
)
diff --git a/libtransport/cmake/packaging.cmake b/libtransport/cmake/packaging.cmake
index f7f0c27e3..afdf3b371 100644
--- a/libtransport/cmake/packaging.cmake
+++ b/libtransport/cmake/packaging.cmake
@@ -17,11 +17,6 @@
# Packages section
######################
-##############################################################
-# Get VPP version
-##############################################################
-list(GET VPP_DEFAULT_VERSION 0 VPP_VERSION)
-
set(${LIBTRANSPORT_COMPONENT}_DESCRIPTION
"Libhicn-transport provides transport services and \
socket API for applications willing to communicate \
@@ -62,12 +57,12 @@ set(${LIBTRANSPORT_COMPONENT}-dev_RPM_DEPENDENCIES
)
set(${LIBTRANSPORT_COMPONENT}-io-modules_DEB_DEPENDENCIES
- "${LIBTRANSPORT_COMPONENT} (= stable_version), vpp (>= ${VPP_VERSION}), hicn-plugin (= stable_version)"
+ "${LIBTRANSPORT_COMPONENT} (= stable_version), vpp (>= ${PREFIX_VERSION}), hicn-plugin (= stable_version)"
CACHE STRING "Dependencies for deb/rpm package."
)
set(${LIBTRANSPORT_COMPONENT}-io-modules_RPM_DEPENDENCIES
- "${LIBTRANSPORT_COMPONENT} = stable_version, vpp >= ${VPP_VERSION}, hicn-plugin = stable_version"
+ "${LIBTRANSPORT_COMPONENT} = stable_version, vpp >= ${PREFIX_VERSION}, hicn-plugin = stable_version"
CACHE STRING "Dependencies for deb/rpm package."
)
diff --git a/libtransport/includes/hicn/transport/auth/crypto_hash.h b/libtransport/includes/hicn/transport/auth/crypto_hash.h
index 3c734fee3..29ea27114 100644
--- a/libtransport/includes/hicn/transport/auth/crypto_hash.h
+++ b/libtransport/includes/hicn/transport/auth/crypto_hash.h
@@ -62,7 +62,7 @@ class CryptoHash {
void computeDigest(const utils::MemBuf *buffer);
// Return the computed hash
- std::vector<uint8_t> getDigest() const;
+ const utils::MemBuf::Ptr &getDigest() const;
// Return the computed hash as a string
std::string getStringDigest() const;
@@ -94,7 +94,7 @@ class CryptoHash {
private:
CryptoHashType digest_type_;
- std::vector<uint8_t> digest_;
+ utils::MemBuf::Ptr digest_;
std::size_t digest_size_;
};
diff --git a/libtransport/includes/hicn/transport/auth/signer.h b/libtransport/includes/hicn/transport/auth/signer.h
index 5a7598991..e1b3cae5c 100644
--- a/libtransport/includes/hicn/transport/auth/signer.h
+++ b/libtransport/includes/hicn/transport/auth/signer.h
@@ -46,9 +46,9 @@ class Signer {
virtual void signBuffer(const utils::MemBuf *buffer);
// Return the signature.
- std::vector<uint8_t> getSignature() const;
+ const utils::MemBuf::Ptr &getSignature() const;
- // Return the signature as a string
+ // Return the signature as a string.
std::string getStringSignature() const;
// Return the signature size in bytes.
@@ -65,12 +65,12 @@ class Signer {
// Return the hash algorithm associated to the signer.
CryptoHashType getHashType() const;
- // Print signature to stdout
+ // Print signature to stdout.
void display();
protected:
CryptoSuite suite_;
- std::vector<uint8_t> signature_;
+ utils::MemBuf::Ptr signature_;
std::size_t signature_len_;
std::shared_ptr<EVP_PKEY> key_;
CryptoHash key_id_;
diff --git a/libtransport/includes/hicn/transport/auth/verifier.h b/libtransport/includes/hicn/transport/auth/verifier.h
index 677a1efe4..c89138339 100644
--- a/libtransport/includes/hicn/transport/auth/verifier.h
+++ b/libtransport/includes/hicn/transport/auth/verifier.h
@@ -55,10 +55,10 @@ class Verifier {
// Verify a single packet or buffer.
virtual bool verifyPacket(PacketPtr packet);
virtual bool verifyBuffer(const std::vector<uint8_t> &buffer,
- const std::vector<uint8_t> &signature,
+ const utils::MemBuf::Ptr &signature,
CryptoHashType hash_type) = 0;
virtual bool verifyBuffer(const utils::MemBuf *buffer,
- const std::vector<uint8_t> &signature,
+ const utils::MemBuf::Ptr &signature,
CryptoHashType hash_type) = 0;
// Verify a batch of packets. Return a mapping from packet suffixes to their
@@ -111,10 +111,10 @@ class VoidVerifier : public Verifier {
public:
bool verifyPacket(PacketPtr packet) override;
bool verifyBuffer(const std::vector<uint8_t> &buffer,
- const std::vector<uint8_t> &signature,
+ const utils::MemBuf::Ptr &signature,
CryptoHashType hash_type) override;
bool verifyBuffer(const utils::MemBuf *buffer,
- const std::vector<uint8_t> &signature,
+ const utils::MemBuf::Ptr &signature,
CryptoHashType hash_type) override;
PolicyMap verifyPackets(const std::vector<PacketPtr> &packets) override;
@@ -144,10 +144,10 @@ class AsymmetricVerifier : public Verifier {
void useCertificate(std::shared_ptr<X509> cert);
bool verifyBuffer(const std::vector<uint8_t> &buffer,
- const std::vector<uint8_t> &signature,
+ const utils::MemBuf::Ptr &signature,
CryptoHashType hash_type) override;
bool verifyBuffer(const utils::MemBuf *buffer,
- const std::vector<uint8_t> &signature,
+ const utils::MemBuf::Ptr &signature,
CryptoHashType hash_type) override;
private:
@@ -167,10 +167,10 @@ class SymmetricVerifier : public Verifier {
void setPassphrase(const std::string &passphrase);
bool verifyBuffer(const std::vector<uint8_t> &buffer,
- const std::vector<uint8_t> &signature,
+ const utils::MemBuf::Ptr &signature,
CryptoHashType hash_type) override;
bool verifyBuffer(const utils::MemBuf *buffer,
- const std::vector<uint8_t> &signature,
+ const utils::MemBuf::Ptr &signature,
CryptoHashType hash_type) override;
protected:
diff --git a/libtransport/includes/hicn/transport/core/packet.h b/libtransport/includes/hicn/transport/core/packet.h
index 059430f1d..c1671d439 100644
--- a/libtransport/includes/hicn/transport/core/packet.h
+++ b/libtransport/includes/hicn/transport/core/packet.h
@@ -151,13 +151,13 @@ class Packet : public utils::MemBuf,
// Authentication Header methods
bool hasAH() const;
- std::vector<uint8_t> getSignature() const;
+ utils::MemBuf::Ptr getSignature() const;
std::size_t getSignatureFieldSize() const;
std::size_t getSignatureSize() const;
uint64_t getSignatureTimestamp() const;
auth::KeyId getKeyId() const;
auth::CryptoSuite getValidationAlgorithm() const;
- void setSignature(const std::vector<uint8_t> &signature);
+ void setSignature(const utils::MemBuf::Ptr &signature);
void setSignatureFieldSize(std::size_t size);
void setSignatureSize(std::size_t size);
void setSignatureTimestamp(const uint64_t &timestamp_ms);
diff --git a/libtransport/includes/hicn/transport/interfaces/socket_options_default_values.h b/libtransport/includes/hicn/transport/interfaces/socket_options_default_values.h
index 04454852b..0e19ae629 100644
--- a/libtransport/includes/hicn/transport/interfaces/socket_options_default_values.h
+++ b/libtransport/includes/hicn/transport/interfaces/socket_options_default_values.h
@@ -53,8 +53,9 @@ static constexpr uint32_t key_locator_size = 60; // bytes
static constexpr uint32_t limit_guard = 80; // bytes
static constexpr uint32_t digest_size = 34; // bytes
static constexpr uint32_t max_out_of_order_segments = 3; // content object
-static constexpr uint32_t max_unverified_delay = 2001; // milliseconds
-static constexpr uint32_t manifest_capacity = 30;
+static constexpr uint32_t unverified_interval = 60000; // milliseconds
+static constexpr double unverified_ratio = 0.2;
+static constexpr uint32_t manifest_capacity = 20;
// RAAQM
static constexpr int sample_number = 30;
diff --git a/libtransport/includes/hicn/transport/interfaces/socket_options_keys.h b/libtransport/includes/hicn/transport/interfaces/socket_options_keys.h
index 90f218770..6cba50d8b 100644
--- a/libtransport/includes/hicn/transport/interfaces/socket_options_keys.h
+++ b/libtransport/includes/hicn/transport/interfaces/socket_options_keys.h
@@ -58,7 +58,6 @@ typedef enum {
INTEREST_LIFETIME = 107,
CONTENT_OBJECT_EXPIRY_TIME = 108,
MAX_SEGMENT_SIZE = 109,
- MAX_UNVERIFIED_TIME = 110,
MIN_WINDOW_SIZE = 111,
MAX_WINDOW_SIZE = 112,
CURRENT_WINDOW_SIZE = 113,
@@ -70,6 +69,8 @@ typedef enum {
HASH_ALGORITHM = 119,
SIGNER = 121,
VERIFIER = 122,
+ UNVERIFIED_INTERVAL = 123,
+ UNVERIFIED_RATIO = 124,
STATS_INTERVAL = 125,
SUFFIX_STRATEGY = 126,
PACKET_FORMAT = 127,
diff --git a/libtransport/src/auth/crypto_hash.cc b/libtransport/src/auth/crypto_hash.cc
index f60f46051..08be47aff 100644
--- a/libtransport/src/auth/crypto_hash.cc
+++ b/libtransport/src/auth/crypto_hash.cc
@@ -14,6 +14,9 @@
*/
#include <hicn/transport/auth/crypto_hash.h>
+#include <hicn/transport/core/global_object_pool.h>
+
+#include "glog/logging.h"
namespace transport {
namespace auth {
@@ -27,18 +30,20 @@ CryptoHash::CryptoHash(const CryptoHash &other)
CryptoHash::CryptoHash(CryptoHash &&other)
: digest_type_(std::move(other.digest_type_)),
- digest_(other.digest_),
- digest_size_(other.digest_size_) {
- other.reset();
-}
+ digest_(std::move(other.digest_)),
+ digest_size_(other.digest_size_) {}
-CryptoHash::CryptoHash(CryptoHashType hash_type) { setType(hash_type); }
+CryptoHash::CryptoHash(CryptoHashType hash_type)
+ : digest_(core::PacketManager<>::getInstance().getMemBuf()) {
+ setType(hash_type);
+}
CryptoHash::CryptoHash(const uint8_t *hash, size_t size,
CryptoHashType hash_type)
: digest_type_(hash_type), digest_size_(size) {
- digest_.resize(size);
- memcpy(digest_.data(), hash, size);
+ digest_ = core::PacketManager<>::getInstance().getMemBuf();
+ digest_->append(size);
+ memcpy(digest_->writableData(), hash, size);
}
CryptoHash::CryptoHash(const std::vector<uint8_t> &hash,
@@ -55,7 +60,7 @@ CryptoHash &CryptoHash::operator=(const CryptoHash &other) {
}
bool CryptoHash::operator==(const CryptoHash &other) const {
- return (digest_type_ == other.digest_type_ && digest_ == other.digest_);
+ return (digest_type_ == other.digest_type_ && *digest_ == *other.digest_);
}
void CryptoHash::computeDigest(const uint8_t *buffer, size_t len) {
@@ -65,8 +70,8 @@ void CryptoHash::computeDigest(const uint8_t *buffer, size_t len) {
throw errors::RuntimeException("Unknown hash type");
}
- EVP_Digest(buffer, len, digest_.data(), (unsigned int *)&digest_size_,
- (*hash_evp)(), nullptr);
+ EVP_Digest(buffer, len, digest_->writableData(),
+ (unsigned int *)&digest_size_, (*hash_evp)(), nullptr);
}
void CryptoHash::computeDigest(const std::vector<uint8_t> &buffer) {
@@ -95,7 +100,7 @@ void CryptoHash::computeDigest(const utils::MemBuf *buffer) {
p = p->next();
} while (p != buffer);
- if (EVP_DigestFinal_ex(mcdtx, digest_.data(),
+ if (EVP_DigestFinal_ex(mcdtx, digest_->writableData(),
(unsigned int *)&digest_size_) != 1) {
throw errors::RuntimeException("Digest computation failed");
}
@@ -103,15 +108,16 @@ void CryptoHash::computeDigest(const utils::MemBuf *buffer) {
EVP_MD_CTX_free(mcdtx);
}
-std::vector<uint8_t> CryptoHash::getDigest() const { return digest_; }
+const utils::MemBuf::Ptr &CryptoHash::getDigest() const { return digest_; }
std::string CryptoHash::getStringDigest() const {
std::stringstream string_digest;
string_digest << std::hex << std::setfill('0');
- for (auto byte : digest_) {
- string_digest << std::hex << std::setw(2) << static_cast<int>(byte);
+ for (size_t i = 0; i < digest_size_; ++i) {
+ string_digest << std::hex << std::setw(2)
+ << static_cast<int>(digest_->data()[i]);
}
return string_digest.str();
@@ -122,10 +128,10 @@ CryptoHashType CryptoHash::getType() const { return digest_type_; }
size_t CryptoHash::getSize() const { return digest_size_; }
void CryptoHash::setType(CryptoHashType hash_type) {
- reset();
digest_type_ = hash_type;
digest_size_ = CryptoHash::getSize(hash_type);
- digest_.resize(digest_size_);
+ DCHECK(digest_size_ <= digest_->tailroom());
+ digest_->setLength(digest_size_);
}
void CryptoHash::display() {
@@ -152,8 +158,8 @@ void CryptoHash::display() {
void CryptoHash::reset() {
digest_type_ = CryptoHashType::UNKNOWN;
- digest_.clear();
digest_size_ = 0;
+ digest_->setLength(0);
}
CryptoHashEVP CryptoHash::getEVP(CryptoHashType hash_type) {
diff --git a/libtransport/src/auth/signer.cc b/libtransport/src/auth/signer.cc
index e74e2f1b8..918e271f5 100644
--- a/libtransport/src/auth/signer.cc
+++ b/libtransport/src/auth/signer.cc
@@ -17,6 +17,8 @@
#include <hicn/transport/auth/signer.h>
#include <hicn/transport/utils/chrono_typedefs.h>
+#include "hicn/transport/core/global_object_pool.h"
+
namespace transport {
namespace auth {
@@ -24,7 +26,10 @@ namespace auth {
// Base Signer
// ---------------------------------------------------------
Signer::Signer()
- : suite_(CryptoSuite::UNKNOWN), signature_len_(0), key_(nullptr) {}
+ : suite_(CryptoSuite::UNKNOWN),
+ signature_(core::PacketManager<>::getInstance().getMemBuf()),
+ signature_len_(0),
+ key_(nullptr) {}
Signer::~Signer() {}
@@ -51,8 +56,8 @@ void Signer::signPacket(PacketPtr packet) {
packet->setValidationAlgorithm(suite_);
// Set key ID
- std::vector<uint8_t> key_id = key_id_.getDigest();
- packet->setKeyId({key_id.data(), key_id.size()});
+ const utils::MemBuf::Ptr &key_id = key_id_.getDigest();
+ packet->setKeyId({key_id->writableData(), key_id->length()});
// Reset fields to compute the packet hash
packet->resetForHash();
@@ -93,14 +98,16 @@ void Signer::signBuffer(const std::vector<uint8_t> &buffer) {
throw errors::RuntimeException("Digest computation failed");
}
- signature_.resize(signature_len_);
+ DCHECK(signature_len_ <= signature_->tailroom());
+ signature_->setLength(signature_len_);
- if (EVP_DigestSignFinal(mdctx.get(), signature_.data(), &signature_len_) !=
- 1) {
+ if (EVP_DigestSignFinal(mdctx.get(), signature_->writableData(),
+ &signature_len_) != 1) {
throw errors::RuntimeException("Digest computation failed");
}
- signature_.resize(signature_len_);
+ DCHECK(signature_len_ <= signature_->tailroom());
+ signature_->setLength(signature_len_);
}
void Signer::signBuffer(const utils::MemBuf *buffer) {
@@ -135,24 +142,27 @@ void Signer::signBuffer(const utils::MemBuf *buffer) {
throw errors::RuntimeException("Digest computation failed");
}
- signature_.resize(signature_len_);
+ DCHECK(signature_len_ <= signature_->tailroom());
+ signature_->setLength(signature_len_);
- if (EVP_DigestSignFinal(mdctx.get(), signature_.data(), &signature_len_) !=
- 1) {
+ if (EVP_DigestSignFinal(mdctx.get(), signature_->writableData(),
+ &signature_len_) != 1) {
throw errors::RuntimeException("Digest computation failed");
}
- signature_.resize(signature_len_);
+ DCHECK(signature_len_ <= signature_->tailroom());
+ signature_->setLength(signature_len_);
}
-std::vector<uint8_t> Signer::getSignature() const { return signature_; }
+const utils::MemBuf::Ptr &Signer::getSignature() const { return signature_; }
std::string Signer::getStringSignature() const {
std::stringstream string_sig;
string_sig << std::hex << std::setfill('0');
- for (auto byte : signature_) {
- string_sig << std::hex << std::setw(2) << static_cast<int>(byte);
+ for (size_t i = 0; i < signature_len_; ++i) {
+ string_sig << std::hex << std::setw(2)
+ << static_cast<int>(signature_->data()[i]);
}
return string_sig.str();
@@ -193,12 +203,14 @@ void VoidSigner::signBuffer(const utils::MemBuf *buffer) {}
// ---------------------------------------------------------
AsymmetricSigner::AsymmetricSigner(CryptoSuite suite,
std::shared_ptr<EVP_PKEY> key,
- std::shared_ptr<EVP_PKEY> pub_key) {
+ std::shared_ptr<EVP_PKEY> pub_key)
+ : Signer() {
setKey(suite, key, pub_key);
}
AsymmetricSigner::AsymmetricSigner(std::string keystore_path,
- std::string password) {
+ std::string password)
+ : Signer() {
FILE *p12file = fopen(keystore_path.c_str(), "r");
if (p12file == nullptr) {
@@ -230,7 +242,8 @@ void AsymmetricSigner::setKey(CryptoSuite suite, std::shared_ptr<EVP_PKEY> key,
suite_ = suite;
key_ = key;
signature_len_ = EVP_PKEY_size(key.get());
- signature_.resize(signature_len_);
+ DCHECK(signature_len_ <= signature_->tailroom());
+ signature_->setLength(signature_len_);
std::vector<uint8_t> pbk(i2d_PublicKey(pub_key.get(), nullptr));
uint8_t *pbk_ptr = pbk.data();
@@ -254,7 +267,8 @@ size_t AsymmetricSigner::getSignatureFieldSize() const {
// Symmetric Signer
// ---------------------------------------------------------
SymmetricSigner::SymmetricSigner(CryptoSuite suite,
- const std::string &passphrase) {
+ const std::string &passphrase)
+ : Signer() {
suite_ = suite;
key_ = std::shared_ptr<EVP_PKEY>(
EVP_PKEY_new_raw_private_key(EVP_PKEY_HMAC, nullptr,
@@ -270,7 +284,8 @@ SymmetricSigner::SymmetricSigner(CryptoSuite suite,
}
signature_len_ = EVP_MD_size((*hash_evp)());
- signature_.resize(signature_len_);
+ DCHECK(signature_len_ <= signature_->tailroom());
+ signature_->setLength(signature_len_);
key_id_.computeDigest((uint8_t *)passphrase.c_str(), passphrase.size());
}
diff --git a/libtransport/src/auth/verifier.cc b/libtransport/src/auth/verifier.cc
index 0c35437f3..5d5f01711 100644
--- a/libtransport/src/auth/verifier.cc
+++ b/libtransport/src/auth/verifier.cc
@@ -14,8 +14,11 @@
*/
#include <hicn/transport/auth/verifier.h>
+#include <hicn/transport/core/global_object_pool.h>
#include <protocols/errors.h>
+#include "glog/logging.h"
+
namespace transport {
namespace auth {
@@ -49,8 +52,10 @@ bool Verifier::verifyPacket(PacketPtr packet) {
hicn_packet_copy_header(format, packet->packet_start_, &header_copy, false);
// Retrieve packet signature
- std::vector<uint8_t> signature_raw = packet->getSignature();
- signature_raw.resize(packet->getSignatureSize());
+ utils::MemBuf::Ptr signature_raw = packet->getSignature();
+ std::size_t signature_len = packet->getSignatureSize();
+ DCHECK(signature_len <= signature_raw->tailroom());
+ signature_raw->setLength(signature_len);
// Reset fields that are not used to compute signature
packet->resetForHash();
@@ -62,7 +67,7 @@ bool Verifier::verifyPacket(PacketPtr packet) {
// Restore header
hicn_packet_copy_header(format, &header_copy, packet->packet_start_, false);
packet->setSignature(signature_raw);
- packet->setSignatureSize(signature_raw.size());
+ packet->setSignatureSize(signature_raw->length());
return valid_packet;
}
@@ -165,13 +170,13 @@ void Verifier::callVerificationFailedCallback(Suffix suffix,
bool VoidVerifier::verifyPacket(PacketPtr packet) { return true; }
bool VoidVerifier::verifyBuffer(const std::vector<uint8_t> &buffer,
- const std::vector<uint8_t> &signature,
+ const utils::MemBuf::Ptr &signature,
CryptoHashType hash_type) {
return true;
}
bool VoidVerifier::verifyBuffer(const utils::MemBuf *buffer,
- const std::vector<uint8_t> &signature,
+ const utils::MemBuf::Ptr &signature,
CryptoHashType hash_type) {
return true;
}
@@ -232,7 +237,7 @@ void AsymmetricVerifier::useCertificate(std::shared_ptr<X509> cert) {
}
bool AsymmetricVerifier::verifyBuffer(const std::vector<uint8_t> &buffer,
- const std::vector<uint8_t> &signature,
+ const utils::MemBuf::Ptr &signature,
CryptoHashType hash_type) {
CryptoHashEVP hash_evp = CryptoHash::getEVP(hash_type);
@@ -255,12 +260,12 @@ bool AsymmetricVerifier::verifyBuffer(const std::vector<uint8_t> &buffer,
throw errors::RuntimeException("Digest update failed");
}
- return EVP_DigestVerifyFinal(mdctx.get(), signature.data(),
- signature.size()) == 1;
+ return EVP_DigestVerifyFinal(mdctx.get(), signature->data(),
+ signature->length()) == 1;
}
bool AsymmetricVerifier::verifyBuffer(const utils::MemBuf *buffer,
- const std::vector<uint8_t> &signature,
+ const utils::MemBuf::Ptr &signature,
CryptoHashType hash_type) {
CryptoHashEVP hash_evp = CryptoHash::getEVP(hash_type);
@@ -288,8 +293,8 @@ bool AsymmetricVerifier::verifyBuffer(const utils::MemBuf *buffer,
p = p->next();
} while (p != buffer);
- return EVP_DigestVerifyFinal(mdctx.get(), signature.data(),
- signature.size()) == 1;
+ return EVP_DigestVerifyFinal(mdctx.get(), signature->data(),
+ signature->length()) == 1;
}
// ---------------------------------------------------------
@@ -309,7 +314,7 @@ void SymmetricVerifier::setPassphrase(const std::string &passphrase) {
}
bool SymmetricVerifier::verifyBuffer(const std::vector<uint8_t> &buffer,
- const std::vector<uint8_t> &signature,
+ const utils::MemBuf::Ptr &signature,
CryptoHashType hash_type) {
CryptoHashEVP hash_evp = CryptoHash::getEVP(hash_type);
@@ -317,7 +322,9 @@ bool SymmetricVerifier::verifyBuffer(const std::vector<uint8_t> &buffer,
throw errors::RuntimeException("Unknown hash type");
}
- std::vector<uint8_t> signature_bis(signature.size());
+ const utils::MemBuf::Ptr &signature_bis =
+ core::PacketManager<>::getInstance().getMemBuf();
+ signature_bis->append(signature->length());
size_t signature_bis_len;
std::shared_ptr<EVP_MD_CTX> mdctx(EVP_MD_CTX_create(), EVP_MD_CTX_free);
@@ -334,16 +341,17 @@ bool SymmetricVerifier::verifyBuffer(const std::vector<uint8_t> &buffer,
throw errors::RuntimeException("Digest update failed");
}
- if (EVP_DigestSignFinal(mdctx.get(), signature_bis.data(),
+ if (EVP_DigestSignFinal(mdctx.get(), signature_bis->writableData(),
&signature_bis_len) != 1) {
throw errors::RuntimeException("Digest computation failed");
}
- return signature == signature_bis && signature.size() == signature_bis_len;
+ return signature->length() == signature_bis_len &&
+ *signature == *signature_bis;
}
bool SymmetricVerifier::verifyBuffer(const utils::MemBuf *buffer,
- const std::vector<uint8_t> &signature,
+ const utils::MemBuf::Ptr &signature,
CryptoHashType hash_type) {
CryptoHashEVP hash_evp = CryptoHash::getEVP(hash_type);
@@ -352,7 +360,9 @@ bool SymmetricVerifier::verifyBuffer(const utils::MemBuf *buffer,
}
const utils::MemBuf *p = buffer;
- std::vector<uint8_t> signature_bis(signature.size());
+ const utils::MemBuf::Ptr &signature_bis =
+ core::PacketManager<>::getInstance().getMemBuf();
+ signature_bis->append(signature->length());
size_t signature_bis_len;
std::shared_ptr<EVP_MD_CTX> mdctx(EVP_MD_CTX_create(), EVP_MD_CTX_free);
@@ -373,12 +383,13 @@ bool SymmetricVerifier::verifyBuffer(const utils::MemBuf *buffer,
p = p->next();
} while (p != buffer);
- if (EVP_DigestSignFinal(mdctx.get(), signature_bis.data(),
+ if (EVP_DigestSignFinal(mdctx.get(), signature_bis->writableData(),
&signature_bis_len) != 1) {
throw errors::RuntimeException("Digest computation failed");
}
- return signature == signature_bis && signature.size() == signature_bis_len;
+ return signature->length() == signature_bis_len &&
+ *signature == *signature_bis;
}
} // namespace auth
diff --git a/libtransport/src/core/manifest_format.h b/libtransport/src/core/manifest_format.h
index 38f26067e..caee210cd 100644
--- a/libtransport/src/core/manifest_format.h
+++ b/libtransport/src/core/manifest_format.h
@@ -18,6 +18,7 @@
#include <hicn/transport/auth/crypto_hash.h>
#include <hicn/transport/core/name.h>
#include <hicn/transport/interfaces/socket_options_keys.h>
+#include <protocols/fec_utils.h>
#include <cinttypes>
#include <type_traits>
@@ -41,11 +42,11 @@ struct ParamsRTC {
std::uint64_t timestamp;
std::uint32_t prod_rate;
std::uint32_t prod_seg;
- std::uint32_t support_fec;
+ protocol::fec::FECType fec_type;
bool operator==(const ParamsRTC &other) const {
return (timestamp == other.timestamp && prod_rate == other.prod_rate &&
- prod_seg == other.prod_seg && support_fec == other.support_fec);
+ prod_seg == other.prod_seg && fec_type == other.fec_type);
}
};
diff --git a/libtransport/src/core/manifest_format_fixed.cc b/libtransport/src/core/manifest_format_fixed.cc
index 4c8a5e031..428d6ad12 100644
--- a/libtransport/src/core/manifest_format_fixed.cc
+++ b/libtransport/src/core/manifest_format_fixed.cc
@@ -154,22 +154,20 @@ FixedManifestEncoder &FixedManifestEncoder::setParamsRTCImpl(
.timestamp = params.timestamp,
.prod_rate = params.prod_rate,
.prod_seg = params.prod_seg,
- .support_fec = params.support_fec,
+ .fec_type = static_cast<uint32_t>(params.fec_type),
};
return *this;
}
FixedManifestEncoder &FixedManifestEncoder::addSuffixAndHashImpl(
uint32_t suffix, const auth::CryptoHash &hash) {
- std::vector<uint8_t> _hash = hash.getDigest();
-
manifest_entries_.push_back(ManifestEntry{
.suffix = htonl(suffix),
.hash = {0},
});
std::memcpy(reinterpret_cast<uint8_t *>(manifest_entries_.back().hash),
- _hash.data(), _hash.size());
+ hash.getDigest()->data(), hash.getSize());
if (TRANSPORT_EXPECT_FALSE(estimateSerializedLengthImpl() > max_size_)) {
throw errors::RuntimeException("Manifest size exceeded the packet MTU!");
@@ -301,7 +299,7 @@ ParamsRTC FixedManifestDecoder::getParamsRTCImpl() const {
.timestamp = params_rtc_->timestamp,
.prod_rate = params_rtc_->prod_rate,
.prod_seg = params_rtc_->prod_seg,
- .support_fec = params_rtc_->support_fec,
+ .fec_type = static_cast<protocol::fec::FECType>(params_rtc_->fec_type),
};
}
diff --git a/libtransport/src/core/manifest_format_fixed.h b/libtransport/src/core/manifest_format_fixed.h
index ade4bf02c..5fd2a673d 100644
--- a/libtransport/src/core/manifest_format_fixed.h
+++ b/libtransport/src/core/manifest_format_fixed.h
@@ -65,9 +65,7 @@ namespace core {
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | Current Segment |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-// |F| |
-// + Reserved for future parameters +
-// | |
+// | FEC Type |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// Manifest Entry:
@@ -137,7 +135,7 @@ struct __attribute__((__packed__)) TransportParamsRTC {
std::uint64_t timestamp;
std::uint32_t prod_rate;
std::uint32_t prod_seg;
- std::uint32_t support_fec;
+ std::uint32_t fec_type;
};
static_assert(sizeof(TransportParamsRTC) == MANIFEST_PARAMS_RTC_SIZE);
diff --git a/libtransport/src/core/packet.cc b/libtransport/src/core/packet.cc
index df27444af..0c08246af 100644
--- a/libtransport/src/core/packet.cc
+++ b/libtransport/src/core/packet.cc
@@ -15,6 +15,7 @@
#include <glog/logging.h>
#include <hicn/transport/auth/crypto_hash.h>
+#include <hicn/transport/core/global_object_pool.h>
#include <hicn/transport/core/packet.h>
#include <hicn/transport/errors/malformed_packet_exception.h>
#include <hicn/transport/utils/hash.h>
@@ -481,7 +482,7 @@ uint8_t Packet::getTTL() const {
bool Packet::hasAH() const { return _is_ah(format_); }
-std::vector<uint8_t> Packet::getSignature() const {
+utils::MemBuf::Ptr Packet::getSignature() const {
if (!hasAH()) {
throw errors::RuntimeException("Packet without Authentication Header.");
}
@@ -493,7 +494,11 @@ std::vector<uint8_t> Packet::getSignature() const {
throw errors::RuntimeException("Error getting signature.");
}
- return std::vector<uint8_t>(signature, signature + getSignatureFieldSize());
+ utils::MemBuf::Ptr membuf = PacketManager<>::getInstance().getMemBuf();
+ membuf->append(getSignatureFieldSize());
+ memcpy(membuf->writableData(), signature, getSignatureFieldSize());
+
+ return membuf;
}
std::size_t Packet::getSignatureFieldSize() const {
@@ -570,7 +575,7 @@ auth::CryptoSuite Packet::getValidationAlgorithm() const {
return auth::CryptoSuite(return_value);
}
-void Packet::setSignature(const std::vector<uint8_t> &signature) {
+void Packet::setSignature(const utils::MemBuf::Ptr &signature) {
if (!hasAH()) {
throw errors::RuntimeException("Packet without Authentication Header.");
}
@@ -580,7 +585,7 @@ void Packet::setSignature(const std::vector<uint8_t> &signature) {
if (ret < 0) {
throw errors::RuntimeException("Error getting signature.");
}
- memcpy(signature_field, signature.data(), signature.size());
+ memcpy(signature_field, signature->data(), signature->length());
}
void Packet::setSignatureFieldSize(std::size_t size) {
diff --git a/libtransport/src/implementation/socket_consumer.h b/libtransport/src/implementation/socket_consumer.h
index ebdac7f93..33e70888f 100644
--- a/libtransport/src/implementation/socket_consumer.h
+++ b/libtransport/src/implementation/socket_consumer.h
@@ -56,7 +56,8 @@ class ConsumerSocket : public Socket {
rate_estimation_observer_(nullptr),
rate_estimation_batching_parameter_(default_values::batch),
rate_estimation_choice_(0),
- max_unverified_delay_(default_values::max_unverified_delay),
+ unverified_interval_(default_values::unverified_interval),
+ unverified_ratio_(default_values::unverified_ratio),
verifier_(std::make_shared<auth::VoidVerifier>()),
verify_signature_(false),
reset_window_(false),
@@ -71,7 +72,6 @@ class ConsumerSocket : public Socket {
timer_interval_milliseconds_(0),
recovery_strategy_(RtcTransportRecoveryStrategies::RTX_ONLY),
aggregated_data_(false),
- fec_setting_(""),
guard_raaqm_params_() {
switch (protocol) {
case TransportProtocolAlgorithms::CBR:
@@ -197,6 +197,10 @@ class ConsumerSocket : public Socket {
current_window_size_ = socket_option_value;
break;
+ case UNVERIFIED_RATIO:
+ unverified_ratio_ = socket_option_value;
+ break;
+
case GAMMA_VALUE:
gamma_ = socket_option_value;
break;
@@ -238,8 +242,8 @@ class ConsumerSocket : public Socket {
interest_lifetime_ = socket_option_value;
break;
- case GeneralTransportOptions::MAX_UNVERIFIED_TIME:
- max_unverified_delay_ = socket_option_value;
+ case GeneralTransportOptions::UNVERIFIED_INTERVAL:
+ unverified_interval_ = socket_option_value;
break;
case RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER:
@@ -437,11 +441,6 @@ class ConsumerSocket : public Socket {
result = SOCKET_OPTION_SET;
}
break;
- case GeneralTransportOptions::FEC_TYPE:
- fec_setting_ = socket_option_value;
- result = SOCKET_OPTION_SET;
- break;
-
default:
return result;
}
@@ -507,6 +506,10 @@ class ConsumerSocket : public Socket {
socket_option_value = current_window_size_;
break;
+ case GeneralTransportOptions::UNVERIFIED_RATIO:
+ socket_option_value = unverified_ratio_;
+ break;
+
// RAAQM parameters
case RaaqmTransportOptions::GAMMA_VALUE:
@@ -547,8 +550,8 @@ class ConsumerSocket : public Socket {
socket_option_value = interest_lifetime_;
break;
- case GeneralTransportOptions::MAX_UNVERIFIED_TIME:
- socket_option_value = max_unverified_delay_;
+ case GeneralTransportOptions::UNVERIFIED_INTERVAL:
+ socket_option_value = unverified_interval_;
break;
case RaaqmTransportOptions::SAMPLE_NUMBER:
@@ -702,13 +705,9 @@ class ConsumerSocket : public Socket {
case DataLinkOptions::OUTPUT_INTERFACE:
socket_option_value = output_interface_;
break;
- case GeneralTransportOptions::FEC_TYPE:
- socket_option_value = fec_setting_;
- break;
default:
return SOCKET_OPTION_NOT_GET;
}
-
return SOCKET_OPTION_GET;
}
@@ -828,7 +827,8 @@ class ConsumerSocket : public Socket {
int rate_estimation_choice_;
// Verification parameters
- int max_unverified_delay_;
+ uint32_t unverified_interval_;
+ double unverified_ratio_;
std::shared_ptr<auth::Verifier> verifier_;
transport::auth::KeyId *key_id_;
std::atomic_bool verify_signature_;
@@ -857,9 +857,6 @@ class ConsumerSocket : public Socket {
RtcTransportRecoveryStrategies recovery_strategy_;
bool aggregated_data_;
- // FEC setting
- std::string fec_setting_;
-
utils::SpinLock guard_raaqm_params_;
std::string output_interface_;
};
diff --git a/libtransport/src/protocols/fec/fec.cc b/libtransport/src/protocols/fec/fec.cc
index 912e7a40f..5881d4d92 100644
--- a/libtransport/src/protocols/fec/fec.cc
+++ b/libtransport/src/protocols/fec/fec.cc
@@ -719,4 +719,4 @@ int fec_decode(struct fec_parms *code, gf *pkt[], int index[], int sz) {
}
free(m_dec);
return 0;
-}
+} \ No newline at end of file
diff --git a/libtransport/src/protocols/prod_protocol_rtc.cc b/libtransport/src/protocols/prod_protocol_rtc.cc
index 242abd30d..e49f58167 100644
--- a/libtransport/src/protocols/prod_protocol_rtc.cc
+++ b/libtransport/src/protocols/prod_protocol_rtc.cc
@@ -33,13 +33,13 @@ RTCProductionProtocol::RTCProductionProtocol(
implementation::ProducerSocket *icn_socket)
: ProductionProtocol(icn_socket),
current_seg_(1),
+ prev_produced_bytes_(0),
+ prev_produced_packets_(0),
produced_bytes_(0),
produced_packets_(0),
- produced_fec_packets_(0),
- max_packet_production_(1),
- bytes_production_rate_(0),
+ max_packet_production_(UINT32_MAX),
+ bytes_production_rate_(UINT32_MAX),
packets_production_rate_(0),
- fec_packets_production_rate_(0),
last_produced_data_ts_(0),
last_round_(utils::SteadyTime::nowMs().count()),
allow_delayed_nacks_(false),
@@ -116,32 +116,47 @@ void RTCProductionProtocol::scheduleRoundTimer() {
auto sp = self.lock();
if (sp && sp->isRunning()) {
- sp->updateStats();
+ sp->updateStats(true);
}
});
}
-void RTCProductionProtocol::updateStats() {
+void RTCProductionProtocol::updateStats(bool new_round) {
uint64_t now = utils::SteadyTime::nowMs().count();
uint64_t duration = now - last_round_;
- if (duration == 0) duration = 1;
+ if (!new_round) {
+ duration += rtc::PRODUCER_STATS_INTERVAL;
+ } else {
+ prev_produced_bytes_ = 0;
+ prev_produced_packets_ = 0;
+ }
+
double per_second = rtc::MILLI_IN_A_SEC / duration;
uint32_t prev_packets_production_rate = packets_production_rate_;
- bytes_production_rate_ = ceil((double)produced_bytes_ * per_second);
- packets_production_rate_ = ceil((double)produced_packets_ * per_second);
- fec_packets_production_rate_ =
- ceil((double)produced_fec_packets_ * per_second);
+ // bytes_production_rate_ does not take into account FEC!!! this is because
+ // each client requests a differen amount of FEC packet so the client itself
+ // increase the production rate in the right way
+ bytes_production_rate_ =
+ ceil((double)(produced_bytes_ + prev_produced_bytes_) * per_second);
+ packets_production_rate_ =
+ ceil((double)(produced_packets_ + prev_produced_packets_) * per_second);
- DLOG_IF(INFO, VLOG_IS_ON(3))
- << "Updating production rate: produced_bytes_ = " << produced_bytes_
- << " bps = " << bytes_production_rate_;
+ // add fec packets looking at the fec code. we don't use directly the number
+ // of fec packets produced in 1 round because it may happen that different
+ // numbers of blocks are generated during the rounds and this creates
+ // inconsistencies in the estimation of the production rate
+ uint32_t k = fec::FECUtils::getSourceSymbols(fec_type_);
+ uint32_t n = fec::FECUtils::getBlockSymbols(fec_type_);
+
+ packets_production_rate_ +=
+ ceil((double)packets_production_rate_ / (double)k) * (n - k);
// update the production rate as soon as it increases by 10% with respect to
// the last round
max_packet_production_ =
- produced_packets_ + ceil((double)produced_packets_ * 0.1);
+ produced_packets_ + ceil((double)produced_packets_ * 0.10);
if (max_packet_production_ < rtc::WIN_MIN)
max_packet_production_ = rtc::WIN_MIN;
@@ -158,11 +173,14 @@ void RTCProductionProtocol::updateStats() {
sendNacksForPendingInterests();
}
- produced_bytes_ = 0;
- produced_packets_ = 0;
- produced_fec_packets_ = 0;
- last_round_ = now;
- scheduleRoundTimer();
+ if (new_round) {
+ prev_produced_bytes_ = produced_bytes_;
+ prev_produced_packets_ = produced_packets_;
+ produced_bytes_ = 0;
+ produced_packets_ = 0;
+ last_round_ = now;
+ scheduleRoundTimer();
+ }
}
uint32_t RTCProductionProtocol::produceStream(
@@ -387,7 +405,7 @@ RTCProductionProtocol::createManifest(const Name &content_name) const {
.timestamp = now,
.prod_rate = bytes_production_rate_,
.prod_seg = current_seg_,
- .support_fec = false,
+ .fec_type = fec_type_,
});
return manifest;
@@ -434,16 +452,13 @@ void RTCProductionProtocol::producePktInternal(
produced_bytes_ +=
content_object->headerSize() + content_object->payloadSize();
produced_packets_++;
- } else {
- produced_fec_packets_++;
}
if (!data_aggregation_ && produced_packets_ >= max_packet_production_) {
// in this case all the pending interests may be used to accomodate the
// sudden increase in the production rate. calling the updateStats we will
// notify all the clients
- round_timer_->cancel();
- updateStats();
+ updateStats(false);
}
DLOG_IF(INFO, VLOG_IS_ON(3))
@@ -616,7 +631,7 @@ void RTCProductionProtocol::processInterest(uint32_t interest_seg,
// if the production rate 0 use delayed nacks
if (allow_delayed_nacks_ && interest_seg >= current_seg_) {
- uint64_t next_timer = ~0;
+ uint64_t next_timer = UINT64_MAX;
if (!timers_map_.empty()) {
next_timer = timers_map_.begin()->first;
}
@@ -652,8 +667,7 @@ void RTCProductionProtocol::processInterest(uint32_t interest_seg,
(double)((double)((double)lifetime *
rtc::INTEREST_LIFETIME_REDUCTION_FACTOR /
rtc::MILLI_IN_A_SEC) *
- (double)(packets_production_rate_ +
- fec_packets_production_rate_)));
+ (double)(packets_production_rate_)));
if (interest_seg < current_seg_ || interest_seg > (max_gap + current_seg_)) {
sendNack(interest_seg);
@@ -723,20 +737,30 @@ void RTCProductionProtocol::addToInterestQueue(uint32_t interest_seg,
void RTCProductionProtocol::sendNacksForPendingInterests() {
std::unordered_set<uint32_t> to_remove;
- uint32_t packet_gap = 100000; // set it to a high value (100sec)
- if (packets_production_rate_ != 0)
- packet_gap = ceil(rtc::MILLI_IN_A_SEC / (double)packets_production_rate_);
+ uint32_t pps = ceil((double)(packets_production_rate_)*rtc::
+ INTEREST_LIFETIME_REDUCTION_FACTOR);
uint64_t now = utils::SteadyTime::nowMs().count();
-
for (auto it = seqs_map_.begin(); it != seqs_map_.end(); it++) {
- if (it->first > current_seg_) {
- uint64_t production_time =
- ((it->first - current_seg_) * packet_gap) + now;
- if (production_time >= it->second) {
+ if (it->first > current_seg_ && it->second > now) {
+ double exp_time_in_sec =
+ (double)(it->second - now) / (double)rtc::MILLI_IN_A_SEC;
+ uint32_t packets_prod_before_expire = ceil((double)pps * exp_time_in_sec);
+
+ if (it->first > (current_seg_ + packets_prod_before_expire)) {
sendNack(it->first);
to_remove.insert(it->first);
}
+ } else if (TRANSPORT_EXPECT_FALSE(it->first < current_seg_ ||
+ it->second <= now)) {
+ // this branch should never be execcuted
+ // first condition: the packet was already prdocued and we have and old
+ // interest pending. send a nack to notify the consumer if needed. the
+ // case it->first = current_seg_ is not handled because
+ // the interest will be satified by the next data packet.
+ // second condition: the interest is expired.
+ sendNack(it->first);
+ to_remove.insert(it->first);
}
}
diff --git a/libtransport/src/protocols/prod_protocol_rtc.h b/libtransport/src/protocols/prod_protocol_rtc.h
index 7f50a2505..c0424a39c 100644
--- a/libtransport/src/protocols/prod_protocol_rtc.h
+++ b/libtransport/src/protocols/prod_protocol_rtc.h
@@ -77,7 +77,7 @@ class RTCProductionProtocol : public ProductionProtocol {
const Name &name) const;
// stats
- void updateStats();
+ void updateStats(bool new_round);
void scheduleRoundTimer();
// pending intersts functions
@@ -108,16 +108,17 @@ class RTCProductionProtocol : public ProductionProtocol {
uint32_t prod_label_; // path label of the producer
uint32_t cache_label_; // path label for content from the producer cache
+ uint32_t prev_produced_bytes_; // XXX clearly explain all these new vars
+ uint32_t prev_produced_packets_;
+
uint32_t produced_bytes_; // bytes produced in the last round
uint32_t produced_packets_; // packet produed in the last round
- uint32_t produced_fec_packets_; // fec packets produced last round
uint32_t max_packet_production_; // never exceed this number of packets
// without update stats
uint32_t bytes_production_rate_; // bytes per sec
uint32_t packets_production_rate_; // pps
- uint32_t fec_packets_production_rate_; // pps
uint64_t last_produced_data_ts_; // ms
diff --git a/libtransport/src/protocols/rtc/probe_handler.cc b/libtransport/src/protocols/rtc/probe_handler.cc
index abb234757..6a84914ab 100644
--- a/libtransport/src/protocols/rtc/probe_handler.cc
+++ b/libtransport/src/protocols/rtc/probe_handler.cc
@@ -36,11 +36,18 @@ ProbeHandler::ProbeHandler(SendProbeCallback &&send_callback,
ProbeHandler::~ProbeHandler() {}
-uint64_t ProbeHandler::getRtt(uint32_t seq) {
+uint64_t ProbeHandler::getRtt(uint32_t seq, bool is_valid) {
auto it = pending_probes_.find(seq);
if (it == pending_probes_.end()) return 0;
+ if (!is_valid) {
+ // delete the probe anyway
+ pending_probes_.erase(it);
+ valid_batch_ = false;
+ return 0;
+ }
+
uint64_t now = utils::SteadyTime::nowMs().count();
uint64_t rtt = now - it->second;
if (rtt < 1) rtt = 1;
@@ -52,6 +59,7 @@ uint64_t ProbeHandler::getRtt(uint32_t seq) {
}
double ProbeHandler::getProbeLossRate() {
+ if (!valid_batch_) return 1.0;
return 1.0 - ((double)recv_probes_ / (double)sent_probes_);
}
@@ -71,11 +79,26 @@ void ProbeHandler::stopProbes() {
max_probes_ = 0;
sent_probes_ = 0;
recv_probes_ = 0;
+ valid_batch_ = true;
probe_timer_->cancel();
}
void ProbeHandler::sendProbes() {
if (probe_interval_ == 0) return;
+
+ std::weak_ptr<ProbeHandler> self(shared_from_this());
+ probe_timer_->expires_from_now(std::chrono::microseconds(probe_interval_));
+ probe_timer_->async_wait([self](const std::error_code &ec) {
+ if (ec) return;
+ auto s = self.lock();
+ if (s) {
+ s->generateProbe();
+ }
+ });
+}
+
+void ProbeHandler::generateProbe() {
+ if (probe_interval_ == 0) return;
if (max_probes_ != 0 && sent_probes_ >= max_probes_) return;
uint64_t now = utils::SteadyTime::nowMs().count();
@@ -97,17 +120,7 @@ void ProbeHandler::sendProbes() {
}
}
- if (probe_interval_ == 0) return;
-
- std::weak_ptr<ProbeHandler> self(shared_from_this());
- probe_timer_->expires_from_now(std::chrono::microseconds(probe_interval_));
- probe_timer_->async_wait([self](const std::error_code &ec) {
- if (ec) return;
- auto s = self.lock();
- if (s) {
- s->sendProbes();
- }
- });
+ sendProbes();
}
ProbeType ProbeHandler::getProbeType(uint32_t seq) {
diff --git a/libtransport/src/protocols/rtc/probe_handler.h b/libtransport/src/protocols/rtc/probe_handler.h
index 2de908176..d989194d4 100644
--- a/libtransport/src/protocols/rtc/probe_handler.h
+++ b/libtransport/src/protocols/rtc/probe_handler.h
@@ -42,7 +42,7 @@ class ProbeHandler : public std::enable_shared_from_this<ProbeHandler> {
~ProbeHandler();
// If the function returns 0 the probe is not valid.
- uint64_t getRtt(uint32_t seq);
+ uint64_t getRtt(uint32_t seq, bool is_valid);
// this function may return a residual loss rate higher than the real one if
// we don't wait enough time for the probes to come back
@@ -63,11 +63,17 @@ class ProbeHandler : public std::enable_shared_from_this<ProbeHandler> {
static ProbeType getProbeType(uint32_t seq);
private:
+ void generateProbe();
+
uint32_t probe_interval_; // us
uint32_t max_probes_; // packets
uint32_t sent_probes_; // packets
uint32_t recv_probes_; // packets
+ bool valid_batch_; // if at least one probe in a batch is considered not
+ // valid (e.g. prod rate == ~0) the full batch is invalid.
+ // the bool is set to true when sendProbe is called
+
std::unique_ptr<asio::steady_timer> probe_timer_;
// Map from packet suffixes to timestamp
diff --git a/libtransport/src/protocols/rtc/rtc.cc b/libtransport/src/protocols/rtc/rtc.cc
index df6522471..d2682edfa 100644
--- a/libtransport/src/protocols/rtc/rtc.cc
+++ b/libtransport/src/protocols/rtc/rtc.cc
@@ -63,16 +63,25 @@ std::size_t RTCTransportProtocol::transportHeaderLength() {
// private
void RTCTransportProtocol::initParams() {
TransportProtocol::reset();
- fwd_strategy_.setCallback(on_fwd_strategy_);
-
std::weak_ptr<RTCTransportProtocol> self = shared_from_this();
+ fwd_strategy_.setCallback([self](notification::Strategy strategy) {
+ auto ptr = self.lock();
+ if (ptr && ptr->isRunning()) {
+ if (*ptr->on_fwd_strategy_) (*ptr->on_fwd_strategy_)(strategy);
+ }
+ });
+
std::shared_ptr<auth::Verifier> verifier;
socket_->getSocketOption(GeneralTransportOptions::VERIFIER, verifier);
- uint32_t max_unverified_delay;
- socket_->getSocketOption(GeneralTransportOptions::MAX_UNVERIFIED_TIME,
- max_unverified_delay);
+ uint32_t unverified_interval;
+ socket_->getSocketOption(GeneralTransportOptions::UNVERIFIED_INTERVAL,
+ unverified_interval);
+
+ double unverified_ratio;
+ socket_->getSocketOption(GeneralTransportOptions::UNVERIFIED_RATIO,
+ unverified_ratio);
rc_ = std::make_shared<RTCRateControlCongestionDetection>();
ldr_ = std::make_shared<RTCLossDetectionAndRecovery>(
@@ -84,8 +93,15 @@ void RTCTransportProtocol::initParams() {
ptr->sendRtxInterest(seq);
}
},
- on_rec_strategy_);
- verifier_ = std::make_shared<RTCVerifier>(verifier, max_unverified_delay);
+ [self](notification::Strategy strategy) {
+ auto ptr = self.lock();
+ if (ptr && ptr->isRunning()) {
+ if (*ptr->on_rec_strategy_) (*ptr->on_rec_strategy_)(strategy);
+ }
+ });
+
+ verifier_ = std::make_shared<RTCVerifier>(verifier, unverified_interval,
+ unverified_ratio);
state_ = std::make_shared<RTCState>(
indexer_verifier_.get(),
@@ -102,7 +118,6 @@ void RTCTransportProtocol::initParams() {
}
},
portal_->getThread().getIoService());
- state_->initParams();
rc_->setState(state_);
rc_->turnOnRateControl();
@@ -153,21 +168,8 @@ void RTCTransportProtocol::initParams() {
socket_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
RTC_INTEREST_LIFETIME);
- // FEC
- using namespace std::placeholders;
- enableFEC(std::bind(&RTCTransportProtocol::onFecPackets, this, _1),
- /* We leave the buffer allocation to the fec decoder */
- fec::FECBase::BufferRequested(0));
-
- if (fec_decoder_) {
- indexer_verifier_->enableFec(fec_type_);
- indexer_verifier_->setNFec(0);
- ldr_->setFecParams(fec::FECUtils::getBlockSymbols(fec_type_),
- fec::FECUtils::getSourceSymbols(fec_type_));
- fec_decoder_->setIOService(portal_->getThread().getIoService());
- } else {
- indexer_verifier_->disableFec();
- }
+ // init state params
+ state_->initParams();
}
// private
@@ -223,29 +225,26 @@ void RTCTransportProtocol::newRound() {
uint32_t received_nacks = state->getReceivedNacksInRound();
uint32_t received_fec = state->getReceivedFecPackets();
- bool in_sync = (ptr->current_state_ == SyncState::in_sync);
- ptr->ldr_->onNewRound(in_sync);
- ptr->state_->onNewRound((double)ROUND_LEN, in_sync);
- ptr->rc_->onNewRound((double)ROUND_LEN);
-
// update sync state if needed
+ double cache_rate = state->getPacketFromCacheRatio();
+ uint32_t round_without_nacks = state->getRoundsWithoutNacks();
+
if (ptr->current_state_ == SyncState::in_sync) {
- double cache_rate = state->getPacketFromCacheRatio();
if (cache_rate > MAX_DATA_FROM_CACHE) {
ptr->current_state_ = SyncState::catch_up;
}
} else {
- double target_rate = state->getProducerRate() * PRODUCTION_RATE_FRACTION;
- double received_rate =
- state->getReceivedRate() + state->getRecoveredFecRate();
- uint32_t round_without_nacks = state->getRoundsWithoutNacks();
- double cache_ratio = state->getPacketFromCacheRatio();
if (round_without_nacks >= ROUNDS_IN_SYNC_BEFORE_SWITCH &&
- received_rate >= target_rate && cache_ratio < MAX_DATA_FROM_CACHE) {
+ cache_rate < MAX_DATA_FROM_CACHE) {
ptr->current_state_ = SyncState::in_sync;
}
}
+ bool in_sync = (ptr->current_state_ == SyncState::in_sync);
+ ptr->ldr_->onNewRound(in_sync);
+ ptr->state_->onNewRound((double)ROUND_LEN, in_sync);
+ ptr->rc_->onNewRound((double)ROUND_LEN);
+
DLOG_IF(INFO, VLOG_IS_ON(3))
<< "Calling updateSyncWindow in newRound function";
ptr->updateSyncWindow();
@@ -335,7 +334,7 @@ void RTCTransportProtocol::updateSyncWindow() {
// if some of the info are not available do not update the current win
if (prod_rate != 0.0 && rtt != 0.0 && packet_size != 0.0) {
current_sync_win_ = (uint32_t)ceil(prod_rate * rtt / packet_size);
- uint32_t buffer = PRODUCER_BUFFER_MS;
+ uint32_t buffer = PRODUCER_BUFFER_MS + ((double)state_->getMinRTT() / 2.0);
current_sync_win_ +=
ceil(prod_rate * (buffer / MILLI_IN_A_SEC) / packet_size);
@@ -360,15 +359,6 @@ void RTCTransportProtocol::updateSyncWindow() {
scheduleNextInterests();
}
-void RTCTransportProtocol::decreaseSyncWindow() {
- // called on future nack
- // we have a new sample of the production rate, so update max win first
- computeMaxSyncWindow();
- current_sync_win_--;
- current_sync_win_ = std::max(current_sync_win_, WIN_MIN);
- scheduleNextInterests();
-}
-
void RTCTransportProtocol::sendRtxInterest(uint32_t seq) {
if (!isRunning() && !is_first_) return;
@@ -468,7 +458,6 @@ void RTCTransportProtocol::scheduleNextInterests() {
auto ptr = self.lock();
if (ptr && ptr->isRunning()) {
if (!ptr->scheduler_timer_on_) return;
-
ptr->scheduler_timer_on_ = false;
ptr->scheduleNextInterests();
}
@@ -688,8 +677,6 @@ void RTCTransportProtocol::onNack(const ContentObject &content_object) {
// switch to catch up state and increase the window
// this is true only if the packet is not an RTX
if (!is_rtx) current_state_ = SyncState::catch_up;
-
- updateSyncWindow();
} else {
// if production_seg == nack_segment we consider this a future nack, since
// production_seg is not yet created. this may happen in case of low
@@ -702,23 +689,50 @@ void RTCTransportProtocol::onNack(const ContentObject &content_object) {
// the client is asking for content in the future
// switch to in sync state and decrease the window
current_state_ = SyncState::in_sync;
- decreaseSyncWindow();
}
+ updateSyncWindow();
}
void RTCTransportProtocol::onProbe(const ContentObject &content_object) {
- bool valid = state_->onProbePacketReceived(content_object);
- if (!valid) return;
+ uint32_t suffix = content_object.getName().getSuffix();
+ ParamsRTC params = RTCState::getProbeParams(content_object);
+
+ if (ProbeHandler::getProbeType(suffix) == ProbeType::INIT) {
+ fec::FECType fec_type = params.fec_type;
+
+ if (fec_type != fec::FECType::UNKNOWN && !fec_decoder_) {
+ // Update FEC type
+ fec_type_ = fec_type;
+
+ // Enable FEC
+ enableFEC(std::bind(&RTCTransportProtocol::onFecPackets, this,
+ std::placeholders::_1),
+ fec::FECBase::BufferRequested(0));
+
+ // Update FEC parameters
+ indexer_verifier_->enableFec(fec_type);
+ indexer_verifier_->setNFec(0);
+ ldr_->setFecParams(fec::FECUtils::getBlockSymbols(fec_type),
+ fec::FECUtils::getSourceSymbols(fec_type));
+ fec_decoder_->setIOService(portal_->getThread().getIoService());
+ } else if (fec_type == fec::FECType::UNKNOWN) {
+ indexer_verifier_->disableFec();
+ }
+ }
- uint32_t production_seg = RTCState::getProbeParams(content_object).prod_seg;
+ if (!state_->onProbePacketReceived(content_object)) return;
- // As for the nacks set next_segment
+ // As for NACKs, set next_segment
DLOG_IF(INFO, VLOG_IS_ON(3))
<< "on probe next seg = " << indexer_verifier_->checkNextSuffix()
- << ", jump to " << production_seg;
- indexer_verifier_->jumpToIndex(production_seg);
-
- ldr_->onProbePacketReceived(content_object);
+ << ", jump to " << params.prod_seg;
+ indexer_verifier_->jumpToIndex(params.prod_seg);
+
+ bool loss_detected = ldr_->onProbePacketReceived(content_object);
+ // we are not out of sync here but we are starting to download content from
+ // the cache, maybe beacuse the production rate increased suddenly. for this
+ // reason we put the state to catch up to increase the window
+ if (loss_detected) current_state_ = SyncState::catch_up;
updateSyncWindow();
}
@@ -822,6 +836,8 @@ void RTCTransportProtocol::onContentObjectReceived(
// The packet is considered received, return early
onDataPacketReceived(*content_ptr, compute_stats);
+ // this is a rtx but we may need to feed it in the decoder
+ decodePacket(content_object, is_manifest);
return;
}
@@ -846,18 +862,8 @@ void RTCTransportProtocol::onContentObjectReceived(
state_->dataToBeReceived(segment_number);
}
- // Send packet to FEC decoder
- if (fec_decoder_) {
- DLOG_IF(INFO, VLOG_IS_ON(4))
- << "Send packet " << segment_number << " to FEC decoder";
-
- uint32_t offset = is_manifest
- ? content_object.headerSize()
- : content_object.headerSize() + rtc::DATA_HEADER_SIZE;
- uint32_t metadata = static_cast<uint32_t>(content_object.getPayloadType());
-
- fec_decoder_->onDataPacket(content_object, offset, metadata);
- }
+ // send packet to the decoder
+ decodePacket(content_object, is_manifest);
// We can return early if FEC
if (is_fec) {
@@ -947,6 +953,21 @@ void RTCTransportProtocol::sendStatsToApp(
}
}
+void RTCTransportProtocol::decodePacket(ContentObject &content_object,
+ bool is_manifest) {
+ if (!fec_decoder_) return;
+
+ DLOG_IF(INFO, VLOG_IS_ON(4))
+ << "Send packet " << content_object.getName() << " to FEC decoder";
+
+ uint32_t offset = is_manifest
+ ? content_object.headerSize()
+ : content_object.headerSize() + rtc::DATA_HEADER_SIZE;
+ uint32_t metadata = static_cast<uint32_t>(content_object.getPayloadType());
+
+ fec_decoder_->onDataPacket(content_object, offset, metadata);
+}
+
void RTCTransportProtocol::onFecPackets(fec::BufferArray &packets) {
Packet::Format format;
socket_->getSocketOption(interface::GeneralTransportOptions::PACKET_FORMAT,
diff --git a/libtransport/src/protocols/rtc/rtc.h b/libtransport/src/protocols/rtc/rtc.h
index 37706eb1c..3763f33c7 100644
--- a/libtransport/src/protocols/rtc/rtc.h
+++ b/libtransport/src/protocols/rtc/rtc.h
@@ -65,7 +65,6 @@ class RTCTransportProtocol : public TransportProtocol {
// window functions
void computeMaxSyncWindow();
void updateSyncWindow();
- void decreaseSyncWindow();
// packet functions
void sendRtxInterest(uint32_t seq);
@@ -89,6 +88,8 @@ class RTCTransportProtocol : public TransportProtocol {
uint32_t received_nacks, uint32_t received_fec);
// FEC functions
+ // send the received content object to the decoder
+ void decodePacket(ContentObject &content_object, bool is_manifest);
void onFecPackets(fec::BufferArray &packets);
// Utils
diff --git a/libtransport/src/protocols/rtc/rtc_consts.h b/libtransport/src/protocols/rtc/rtc_consts.h
index 03efd8e84..96e39d07e 100644
--- a/libtransport/src/protocols/rtc/rtc_consts.h
+++ b/libtransport/src/protocols/rtc/rtc_consts.h
@@ -34,7 +34,7 @@ const double INTEREST_LIFETIME_REDUCTION_FACTOR = 0.8;
// increasing this number we increase the time that an
// interest will wait for the data packet to be produced
// at the producer socket
-const uint32_t PRODUCER_BUFFER_MS = 200; // ms
+const uint32_t PRODUCER_BUFFER_MS = 300; // ms
// interest scheduler
// const uint32_t MAX_INTERESTS_IN_BATCH = 5;
@@ -72,7 +72,8 @@ const uint32_t INIT_RTT_PROBES = 40; // number of probes to init RTT
// to get an answer. we wait 100ms between each try
const uint32_t INIT_RTT_PROBE_RESTART = 100; // ms
// once we get the first probe we wait at most 60ms for the others
-const uint32_t INIT_RTT_PROBE_WAIT = 30; // ms
+const uint32_t INIT_RTT_PROBE_WAIT =
+ ((INIT_RTT_PROBES * INIT_RTT_PROBE_INTERVAL) / 1000) * 2; // ms
// we reuires at least 5 probes to be recevied
const uint32_t INIT_RTT_MIN_PROBES_TO_RECV = 5; // ms
const uint32_t MAX_PENDING_PROBES = 10;
@@ -81,7 +82,7 @@ const uint32_t MAX_PENDING_PROBES = 10;
const double MAX_QUEUING_DELAY = 50.0; // ms
// data from cache
-const double MAX_DATA_FROM_CACHE = 0.25; // 25%
+const double MAX_DATA_FROM_CACHE = 0.10; // 10%
// window const
const uint32_t INITIAL_WIN = 5; // pkts
diff --git a/libtransport/src/protocols/rtc/rtc_forwarding_strategy.cc b/libtransport/src/protocols/rtc/rtc_forwarding_strategy.cc
index 9503eed3e..c6bc751e6 100644
--- a/libtransport/src/protocols/rtc/rtc_forwarding_strategy.cc
+++ b/libtransport/src/protocols/rtc/rtc_forwarding_strategy.cc
@@ -35,8 +35,9 @@ RTCForwardingStrategy::RTCForwardingStrategy()
RTCForwardingStrategy::~RTCForwardingStrategy() {}
-void RTCForwardingStrategy::setCallback(interface::StrategyCallback* callback) {
- callback_ = callback;
+void RTCForwardingStrategy::setCallback(
+ interface::StrategyCallback&& callback) {
+ callback_ = std::move(callback);
}
void RTCForwardingStrategy::initFwdStrategy(
@@ -55,27 +56,24 @@ void RTCForwardingStrategy::initFwdStrategy(
}
void RTCForwardingStrategy::checkStrategy() {
- if (*callback_) {
- strategy_t used_strategy = selected_strategy_;
- if (used_strategy == BOTH) used_strategy = current_strategy_;
- assert(used_strategy == BEST_PATH || used_strategy == REPLICATION ||
- used_strategy == NONE);
-
- notification::ForwardingStrategy strategy =
- notification::ForwardingStrategy::NONE;
- switch (used_strategy) {
- case BEST_PATH:
- strategy = notification::ForwardingStrategy::BEST_PATH;
- break;
- case REPLICATION:
- strategy = notification::ForwardingStrategy::REPLICATION;
- break;
- default:
- break;
- }
-
- (*callback_)(strategy);
+ strategy_t used_strategy = selected_strategy_;
+ if (used_strategy == BOTH) used_strategy = current_strategy_;
+ assert(used_strategy == BEST_PATH || used_strategy == REPLICATION ||
+ used_strategy == NONE);
+
+ notification::ForwardingStrategy strategy =
+ notification::ForwardingStrategy::NONE;
+ switch (used_strategy) {
+ case BEST_PATH:
+ strategy = notification::ForwardingStrategy::BEST_PATH;
+ break;
+ case REPLICATION:
+ strategy = notification::ForwardingStrategy::REPLICATION;
+ break;
+ default:
+ break;
}
+ callback_(strategy);
if (!init_) return;
diff --git a/libtransport/src/protocols/rtc/rtc_forwarding_strategy.h b/libtransport/src/protocols/rtc/rtc_forwarding_strategy.h
index 821b28051..9825877fd 100644
--- a/libtransport/src/protocols/rtc/rtc_forwarding_strategy.h
+++ b/libtransport/src/protocols/rtc/rtc_forwarding_strategy.h
@@ -44,7 +44,7 @@ class RTCForwardingStrategy {
strategy_t strategy);
void checkStrategy();
- void setCallback(interface::StrategyCallback* callback);
+ void setCallback(interface::StrategyCallback&& callback);
private:
void checkStrategyBestPath();
@@ -68,7 +68,7 @@ class RTCForwardingStrategy {
core::Prefix prefix_;
std::shared_ptr<core::Portal> portal_;
RTCState* state_;
- interface::StrategyCallback* callback_;
+ interface::StrategyCallback callback_;
};
} // namespace rtc
diff --git a/libtransport/src/protocols/rtc/rtc_ldr.cc b/libtransport/src/protocols/rtc/rtc_ldr.cc
index 1ca1cf48d..abf6cda2c 100644
--- a/libtransport/src/protocols/rtc/rtc_ldr.cc
+++ b/libtransport/src/protocols/rtc/rtc_ldr.cc
@@ -36,17 +36,17 @@ RTCLossDetectionAndRecovery::RTCLossDetectionAndRecovery(
Indexer *indexer, asio::io_service &io_service,
interface::RtcTransportRecoveryStrategies type,
RecoveryStrategy::SendRtxCallback &&callback,
- interface::StrategyCallback *external_callback) {
+ interface::StrategyCallback &&external_callback) {
rs_type_ = type;
if (type == interface::RtcTransportRecoveryStrategies::RECOVERY_OFF) {
rs_ = std::make_shared<RecoveryStrategyRecoveryOff>(
- indexer, std::move(callback), io_service, external_callback);
+ indexer, std::move(callback), io_service, std::move(external_callback));
} else if (type == interface::RtcTransportRecoveryStrategies::DELAY_BASED) {
rs_ = std::make_shared<RecoveryStrategyDelayBased>(
- indexer, std::move(callback), io_service, external_callback);
+ indexer, std::move(callback), io_service, std::move(external_callback));
} else if (type == interface::RtcTransportRecoveryStrategies::FEC_ONLY) {
rs_ = std::make_shared<RecoveryStrategyFecOnly>(
- indexer, std::move(callback), io_service, external_callback);
+ indexer, std::move(callback), io_service, std::move(external_callback));
} else if (type == interface::RtcTransportRecoveryStrategies::LOW_RATE ||
type == interface::RtcTransportRecoveryStrategies::
LOW_RATE_AND_BESTPATH ||
@@ -55,12 +55,12 @@ RTCLossDetectionAndRecovery::RTCLossDetectionAndRecovery(
type == interface::RtcTransportRecoveryStrategies::
LOW_RATE_AND_ALL_FWD_STRATEGIES) {
rs_ = std::make_shared<RecoveryStrategyLowRate>(
- indexer, std::move(callback), io_service, external_callback);
+ indexer, std::move(callback), io_service, std::move(external_callback));
} else {
// default
rs_type_ = interface::RtcTransportRecoveryStrategies::RTX_ONLY;
rs_ = std::make_shared<RecoveryStrategyRtxOnly>(
- indexer, std::move(callback), io_service, external_callback);
+ indexer, std::move(callback), io_service, std::move(external_callback));
}
}
@@ -97,19 +97,21 @@ void RTCLossDetectionAndRecovery::onNewRound(bool in_sync) {
rs_->onNewRound(in_sync);
}
-void RTCLossDetectionAndRecovery::onTimeout(uint32_t seq, bool lost) {
+bool RTCLossDetectionAndRecovery::onTimeout(uint32_t seq, bool lost) {
if (!lost) {
- detectLoss(seq, seq + 1);
+ return detectLoss(seq, seq + 1, false);
} else {
rs_->onLostTimeout(seq);
}
+ return false;
}
-void RTCLossDetectionAndRecovery::onPacketRecoveredFec(uint32_t seq) {
+bool RTCLossDetectionAndRecovery::onPacketRecoveredFec(uint32_t seq) {
rs_->receivedPacket(seq);
+ return false;
}
-void RTCLossDetectionAndRecovery::onDataPacketReceived(
+bool RTCLossDetectionAndRecovery::onDataPacketReceived(
const core::ContentObject &content_object) {
uint32_t seq = content_object.getName().getSuffix();
bool is_rtx = rs_->isRtx(seq);
@@ -118,10 +120,13 @@ void RTCLossDetectionAndRecovery::onDataPacketReceived(
<< "received data. add from "
<< rs_->getState()->getHighestSeqReceivedInOrder() + 1 << " to " << seq;
if (!is_rtx)
- detectLoss(rs_->getState()->getHighestSeqReceivedInOrder() + 1, seq);
+ return detectLoss(rs_->getState()->getHighestSeqReceivedInOrder() + 1, seq,
+ false);
+
+ return false;
}
-void RTCLossDetectionAndRecovery::onNackPacketReceived(
+bool RTCLossDetectionAndRecovery::onNackPacketReceived(
const core::ContentObject &nack) {
struct nack_packet_t *nack_pkt =
(struct nack_packet_t *)nack.getPayload()->data();
@@ -140,11 +145,18 @@ void RTCLossDetectionAndRecovery::onNackPacketReceived(
<< "received nack. add from "
<< rs_->getState()->getHighestSeqReceivedInOrder() + 1 << " to "
<< production_seq;
- detectLoss(rs_->getState()->getHighestSeqReceivedInOrder() + 1,
- production_seq);
+
+ // if it is a future nack store it in the list set of nacked seq
+ if (production_seq <= seq) rs_->receivedFutureNack(seq);
+
+ // call the detectLoss function using the probe flag = true. in fact the
+ // losses detected using nacks are the same as the one detected using probes,
+ // we should not increase the loss counter
+ return detectLoss(rs_->getState()->getHighestSeqReceivedInOrder() + 1,
+ production_seq, true);
}
-void RTCLossDetectionAndRecovery::onProbePacketReceived(
+bool RTCLossDetectionAndRecovery::onProbePacketReceived(
const core::ContentObject &probe) {
// we don't log the reception of a probe packet for the sentinel timer because
// probes are not taken into account into the sync window. we use them as
@@ -157,12 +169,13 @@ void RTCLossDetectionAndRecovery::onProbePacketReceived(
<< rs_->getState()->getHighestSeqReceivedInOrder() + 1 << " to "
<< production_seq;
- detectLoss(rs_->getState()->getHighestSeqReceivedInOrder() + 1,
- production_seq);
+ return detectLoss(rs_->getState()->getHighestSeqReceivedInOrder() + 1,
+ production_seq, true);
}
-void RTCLossDetectionAndRecovery::detectLoss(uint32_t start, uint32_t stop) {
- if (start >= stop) return;
+bool RTCLossDetectionAndRecovery::detectLoss(uint32_t start, uint32_t stop,
+ bool recv_probe) {
+ if (start >= stop) return false;
// skip nacked packets
if (start <= rs_->getState()->getLastSeqNacked()) {
@@ -174,13 +187,31 @@ void RTCLossDetectionAndRecovery::detectLoss(uint32_t start, uint32_t stop) {
start = rs_->getState()->getHighestSeqReceivedInOrder() + 1;
}
+ bool loss_detected = false;
for (uint32_t seq = start; seq < stop; seq++) {
if (rs_->getState()->getPacketState(seq) == PacketState::UNKNOWN) {
if (rs_->lossDetected(seq)) {
- rs_->getState()->onLossDetected(seq);
+ loss_detected = true;
+ if ((recv_probe || rs_->wasNacked(seq)) && !rs_->isFecOn()) {
+ // these losses were detected using a probe and fec is off.
+ // in this case most likelly the procotol is about to go out of sync
+ // and the packets are not really lost (e.g. increase in prod rate).
+ // for this reason we do not
+ // count the losses in the stats. Instead we do the following
+ // 1. send RTX for the packets in case they were really lost
+ // 2. return to the RTC protocol that a loss was detected using a
+ // probe. the protocol will switch to catch_up mode to increase the
+ // size of the window
+ rs_->requestPossibleLostPacket(seq);
+ } else {
+ // if fec is on we don't need to mask pontetial losses, so increase
+ // the loss rate
+ rs_->notifyNewLossDetedcted(seq);
+ }
}
}
}
+ return loss_detected;
}
} // namespace rtc
diff --git a/libtransport/src/protocols/rtc/rtc_ldr.h b/libtransport/src/protocols/rtc/rtc_ldr.h
index e7f8ce5db..7f683eaa6 100644
--- a/libtransport/src/protocols/rtc/rtc_ldr.h
+++ b/libtransport/src/protocols/rtc/rtc_ldr.h
@@ -36,7 +36,7 @@ class RTCLossDetectionAndRecovery
RTCLossDetectionAndRecovery(Indexer *indexer, asio::io_service &io_service,
interface::RtcTransportRecoveryStrategies type,
RecoveryStrategy::SendRtxCallback &&callback,
- interface::StrategyCallback *external_callback);
+ interface::StrategyCallback &&external_callback);
~RTCLossDetectionAndRecovery();
@@ -47,17 +47,19 @@ class RTCLossDetectionAndRecovery
void setFecParams(uint32_t n, uint32_t k) { rs_->setFecParams(n, k); }
- void turnOnRecovery() { rs_->tunrOnRecovery(); }
+ void turnOnRecovery() { rs_->turnOnRecovery(); }
bool isRtxOn() { return rs_->isRtxOn(); }
void changeRecoveryStrategy(interface::RtcTransportRecoveryStrategies type);
void onNewRound(bool in_sync);
- void onTimeout(uint32_t seq, bool lost);
- void onPacketRecoveredFec(uint32_t seq);
- void onDataPacketReceived(const core::ContentObject &content_object);
- void onNackPacketReceived(const core::ContentObject &nack);
- void onProbePacketReceived(const core::ContentObject &probe);
+
+ // the following functions return true if a loss is detected, false otherwise
+ bool onTimeout(uint32_t seq, bool lost);
+ bool onPacketRecoveredFec(uint32_t seq);
+ bool onDataPacketReceived(const core::ContentObject &content_object);
+ bool onNackPacketReceived(const core::ContentObject &nack);
+ bool onProbePacketReceived(const core::ContentObject &probe);
void clear() { rs_->clear(); }
@@ -67,7 +69,8 @@ class RTCLossDetectionAndRecovery
}
private:
- void detectLoss(uint32_t start, uint32_t stop);
+ // returns true if a loss is detected, false otherwise
+ bool detectLoss(uint32_t start, uint32_t stop, bool recv_probe);
interface::RtcTransportRecoveryStrategies rs_type_;
std::shared_ptr<RecoveryStrategy> rs_;
diff --git a/libtransport/src/protocols/rtc/rtc_recovery_strategy.cc b/libtransport/src/protocols/rtc/rtc_recovery_strategy.cc
index 888105eab..66ae5086c 100644
--- a/libtransport/src/protocols/rtc/rtc_recovery_strategy.cc
+++ b/libtransport/src/protocols/rtc/rtc_recovery_strategy.cc
@@ -29,20 +29,22 @@ using namespace transport::interface;
RecoveryStrategy::RecoveryStrategy(
Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service,
- bool use_rtx, bool use_fec, interface::StrategyCallback *external_callback)
+ bool use_rtx, bool use_fec, interface::StrategyCallback &&external_callback)
: recovery_on_(false),
+ rtx_during_fec_(0),
next_rtx_timer_(MAX_TIMER_RTX),
send_rtx_callback_(std::move(callback)),
indexer_(indexer),
round_id_(0),
last_fec_used_(0),
- callback_(external_callback) {
+ callback_(std::move(external_callback)) {
setRtxFec(use_rtx, use_fec);
timer_ = std::make_unique<asio::steady_timer>(io_service);
}
RecoveryStrategy::RecoveryStrategy(RecoveryStrategy &&rs)
- : rtx_state_(std::move(rs.rtx_state_)),
+ : rtx_during_fec_(0),
+ rtx_state_(std::move(rs.rtx_state_)),
rtx_timers_(std::move(rs.rtx_timers_)),
recover_with_fec_(std::move(rs.recover_with_fec_)),
timer_(std::move(rs.timer_)),
@@ -55,7 +57,7 @@ RecoveryStrategy::RecoveryStrategy(RecoveryStrategy &&rs)
rc_(std::move(rs.rc_)),
round_id_(std::move(rs.round_id_)),
last_fec_used_(std::move(rs.last_fec_used_)),
- callback_(rs.callback_) {
+ callback_(std::move(rs.callback_)) {
setFecParams(n_, k_);
}
@@ -68,7 +70,7 @@ void RecoveryStrategy::setFecParams(uint32_t n, uint32_t k) {
// XXX for the moment we go in steps of 5% loss rate.
// max loss rate = 95%
for (uint32_t loss_rate = 5; loss_rate < 100; loss_rate += 5) {
- double dec_loss_rate = (double)loss_rate / 100.0;
+ double dec_loss_rate = (double)(loss_rate + 5) / 100.0;
double exp_losses = (double)k_ * dec_loss_rate;
uint32_t fec_to_ask = ceil(exp_losses / (1 - dec_loss_rate));
@@ -87,18 +89,39 @@ bool RecoveryStrategy::lossDetected(uint32_t seq) {
return false;
}
- auto it = recover_with_fec_.find(seq);
- if (it != recover_with_fec_.end()) {
+ auto it_fec = recover_with_fec_.find(seq);
+ if (it_fec != recover_with_fec_.end()) {
// this packet is already in list of packets to recover with fec
// this list contians also fec packets that will not be recovered with rtx
return false;
}
- // new loss detected, recover it according to the strategy
- newPacketLoss(seq);
+ auto it_nack = nacked_seq_.find(seq);
+ if (it_nack != nacked_seq_.end()) {
+ // this packet was nacked so we do not use it to determine the loss rate
+ return false;
+ }
+
return true;
}
+void RecoveryStrategy::notifyNewLossDetedcted(uint32_t seq) {
+ // new loss detected
+ // first record the loss. second do what is needed to recover it
+ state_->onLossDetected(seq);
+ newPacketLoss(seq);
+}
+
+void RecoveryStrategy::requestPossibleLostPacket(uint32_t seq) {
+ // these are packets for which we send a RTX but we do not increase the loss
+ // counter beacuse we don't know if they are lost or not
+ addNewRtx(seq, false);
+}
+
+void RecoveryStrategy::receivedFutureNack(uint32_t seq) {
+ nacked_seq_.insert(seq);
+}
+
void RecoveryStrategy::clear() {
rtx_state_.clear();
rtx_timers_.clear();
@@ -236,6 +259,9 @@ void RecoveryStrategy::retransmit() {
DLOG_IF(INFO, VLOG_IS_ON(3))
<< "send rtx for sequence " << seq << ", next send in "
<< (rtx_it->second.next_send_ - now);
+
+ // if fec is on increase the number of RTX send during fec
+ if (fec_on_) rtx_during_fec_++;
send_rtx_callback_(seq);
sent_counter++;
}
@@ -306,7 +332,7 @@ void RecoveryStrategy::deleteRtx(uint32_t seq) {
}
// fec functions
-uint32_t RecoveryStrategy::computeFecPacketsToAsk(bool in_sync) {
+uint32_t RecoveryStrategy::computeFecPacketsToAsk() {
double loss_rate = state_->getMaxLossRate() * 100; // use loss rate in %
if (loss_rate > 95) loss_rate = 95; // max loss rate
@@ -365,21 +391,25 @@ uint32_t RecoveryStrategy::computeFecPacketsToAsk(bool in_sync) {
void RecoveryStrategy::setRtxFec(std::optional<bool> rtx_on,
std::optional<bool> fec_on) {
if (rtx_on) rtx_on_ = *rtx_on;
- if (fec_on) fec_on_ = *fec_on;
+ if (fec_on) {
+ if (fec_on_ == false && (*fec_on) == true) { // turn on fec
+ // reset the number of RTX sent during fec
+ rtx_during_fec_ = 0;
+ }
+ fec_on_ = *fec_on;
+ }
- if (*callback_) {
- notification::RecoveryStrategy strategy =
- notification::RecoveryStrategy::RECOVERY_OFF;
+ notification::RecoveryStrategy strategy =
+ notification::RecoveryStrategy::RECOVERY_OFF;
- if (rtx_on_ && fec_on_)
- strategy = notification::RecoveryStrategy::RTX_AND_FEC;
- else if (rtx_on_)
- strategy = notification::RecoveryStrategy::RTX_ONLY;
- else if (fec_on_)
- strategy = notification::RecoveryStrategy::FEC_ONLY;
+ if (rtx_on_ && fec_on_)
+ strategy = notification::RecoveryStrategy::RTX_AND_FEC;
+ else if (rtx_on_)
+ strategy = notification::RecoveryStrategy::RTX_ONLY;
+ else if (fec_on_)
+ strategy = notification::RecoveryStrategy::FEC_ONLY;
- (*callback_)(strategy);
- }
+ callback_(strategy);
}
// common functions
@@ -392,6 +422,12 @@ void RecoveryStrategy::removePacketState(uint32_t seq) {
return;
}
+ auto it_nack = nacked_seq_.find(seq);
+ if (it_nack != nacked_seq_.end()) {
+ nacked_seq_.erase(it_nack);
+ return;
+ }
+
deleteRtx(seq);
}
@@ -406,7 +442,6 @@ void RecoveryStrategy::reduceFec() {
uint32_t bin = ceil(loss_rate / 5.0) - 1;
if (fec_per_loss_rate_[bin].fec_to_ask > fec_to_ask) {
fec_per_loss_rate_[bin].fec_to_ask--;
- // std::cout << "reduce fec to ask for bin " << bin << std::endl;
}
}
}
diff --git a/libtransport/src/protocols/rtc/rtc_recovery_strategy.h b/libtransport/src/protocols/rtc/rtc_recovery_strategy.h
index 9ffc69a1b..482aedc9d 100644
--- a/libtransport/src/protocols/rtc/rtc_recovery_strategy.h
+++ b/libtransport/src/protocols/rtc/rtc_recovery_strategy.h
@@ -44,7 +44,7 @@ class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> {
RecoveryStrategy(Indexer *indexer, SendRtxCallback &&callback,
asio::io_service &io_service, bool use_rtx, bool use_fec,
- interface::StrategyCallback *external_callback);
+ interface::StrategyCallback &&external_callback);
RecoveryStrategy(RecoveryStrategy &&rs);
@@ -56,8 +56,6 @@ class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> {
void setRateControl(RTCRateControl *rateControl) { rc_ = rateControl; }
void setFecParams(uint32_t n, uint32_t k);
- void tunrOnRecovery() { recovery_on_ = true; }
-
bool isRtx(uint32_t seq) {
if (rtx_state_.find(seq) != rtx_state_.end()) return true;
return false;
@@ -68,12 +66,22 @@ class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> {
return false;
}
+ bool wasNacked(uint32_t seq) {
+ if (nacked_seq_.find(seq) != nacked_seq_.end()) return true;
+ return false;
+ }
+
bool isRtxOn() { return rtx_on_; }
+ bool isFecOn() { return fec_on_; }
RTCState *getState() { return state_; }
bool lossDetected(uint32_t seq);
+ void notifyNewLossDetedcted(uint32_t seq);
+ void requestPossibleLostPacket(uint32_t seq);
+ void receivedFutureNack(uint32_t seq);
void clear();
+ virtual void turnOnRecovery() = 0;
virtual void onNewRound(bool in_sync) = 0;
virtual void newPacketLoss(uint32_t seq) = 0;
virtual void receivedPacket(uint32_t seq) = 0;
@@ -96,7 +104,7 @@ class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> {
void deleteRtx(uint32_t seq);
// fec functions
- uint32_t computeFecPacketsToAsk(bool in_sync);
+ uint32_t computeFecPacketsToAsk();
// common functons
void removePacketState(uint32_t seq);
@@ -105,6 +113,12 @@ class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> {
bool rtx_on_;
bool fec_on_;
+ // number of RTX sent after fec turned on
+ // this is used to take into account jitter and out of order packets
+ // if we detect losses but we do not sent any RTX it means that the holes in
+ // the sequence are caused by the jitter
+ uint32_t rtx_during_fec_;
+
// this map keeps track of the retransmitted interest, ordered from the oldest
// to the newest one. the state contains the timer of the first send of the
// interest (from pendingIntetests_), the timer of the next send (key of the
@@ -117,6 +131,13 @@ class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> {
// lost packets that will be recovered with fec
std::unordered_set<uint32_t> recover_with_fec_;
+ // packet for which we recived a future nack
+ // in case we detect a loss for a nacked packet we send an RTX but we do not
+ // increase the loss counter. this is done because it may happen that the
+ // producer rate checkes over time and in flight interest may be satified by
+ // data packet after the reception of nacks
+ std::unordered_set<uint32_t> nacked_seq_;
+
// rtx vars
std::unique_ptr<asio::steady_timer> timer_;
uint64_t next_rtx_timer_;
@@ -144,7 +165,7 @@ class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> {
uint32_t round_id_; // number of rounds
uint32_t last_fec_used_;
std::vector<fec_state_> fec_per_loss_rate_;
- interface::StrategyCallback *callback_;
+ interface::StrategyCallback callback_;
};
} // end namespace rtc
diff --git a/libtransport/src/protocols/rtc/rtc_rs_delay.cc b/libtransport/src/protocols/rtc/rtc_rs_delay.cc
index e2c60ca77..4be751ec9 100644
--- a/libtransport/src/protocols/rtc/rtc_rs_delay.cc
+++ b/libtransport/src/protocols/rtc/rtc_rs_delay.cc
@@ -25,9 +25,9 @@ namespace rtc {
RecoveryStrategyDelayBased::RecoveryStrategyDelayBased(
Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service,
- interface::StrategyCallback *external_callback)
+ interface::StrategyCallback &&external_callback)
: RecoveryStrategy(indexer, std::move(callback), io_service, true, false,
- external_callback), // start with rtx
+ std::move(external_callback)), // start with rtx
congestion_state_(false),
probing_state_(false),
switch_rounds_(0) {}
@@ -37,22 +37,40 @@ RecoveryStrategyDelayBased::RecoveryStrategyDelayBased(RecoveryStrategy &&rs)
setRtxFec(true, false);
// we have to re-init congestion and
// probing
+ switch_rounds_ = 0;
congestion_state_ = false;
probing_state_ = false;
}
RecoveryStrategyDelayBased::~RecoveryStrategyDelayBased() {}
+void RecoveryStrategyDelayBased::turnOnRecovery() {
+ recovery_on_ = true;
+ uint64_t rtt = state_->getMinRTT();
+ uint32_t fec_to_ask = computeFecPacketsToAsk();
+ if (rtt > 80 && fec_to_ask != 0) {
+ // we need to start FEC (see fec only strategy for more details)
+ setRtxFec(true, true);
+ rtx_during_fec_ = 1; // avoid to stop fec
+ indexer_->setNFec(fec_to_ask);
+ } else {
+ // use RTX
+ setRtxFec(true, false);
+ switch_rounds_ = 0;
+ }
+}
+
void RecoveryStrategyDelayBased::softSwitchToFec(uint32_t fec_to_ask) {
if (fec_to_ask == 0) {
setRtxFec(true, false);
switch_rounds_ = 0;
} else {
switch_rounds_++;
- if (switch_rounds_ >= 5) {
+ if (switch_rounds_ >= ((RTC_INTEREST_LIFETIME / ROUND_LEN) * 2) &&
+ rtx_during_fec_ != 0) { // go to fec only if it is needed (RTX are on)
setRtxFec(false, true);
} else {
- setRtxFec({}, true);
+ setRtxFec(true, true);
}
}
}
@@ -76,9 +94,13 @@ void RecoveryStrategyDelayBased::onNewRound(bool in_sync) {
// switch from rtx to fec or keep use fec. Notice that if some rtx are
// waiting to be scheduled, they will be sent normally, but no new rtx will
// be created If the loss rate is 0 keep to use RTX.
- uint32_t fec_to_ask = computeFecPacketsToAsk(in_sync);
+ uint32_t fec_to_ask = computeFecPacketsToAsk();
softSwitchToFec(fec_to_ask);
- indexer_->setNFec(fec_to_ask);
+ if (rtx_during_fec_ == 0) // if we do not send any RTX the losses
+ // registered may be due to jitter
+ indexer_->setNFec(0);
+ else
+ indexer_->setNFec(fec_to_ask);
return;
}
@@ -112,7 +134,7 @@ void RecoveryStrategyDelayBased::probing() {
// for the moment ask for all fec and exit the probing phase
probing_state_ = false;
setRtxFec(false, true);
- indexer_->setNFec(computeFecPacketsToAsk(true));
+ indexer_->setNFec(computeFecPacketsToAsk());
}
} // end namespace rtc
diff --git a/libtransport/src/protocols/rtc/rtc_rs_delay.h b/libtransport/src/protocols/rtc/rtc_rs_delay.h
index 0dd199965..5ca90f4cb 100644
--- a/libtransport/src/protocols/rtc/rtc_rs_delay.h
+++ b/libtransport/src/protocols/rtc/rtc_rs_delay.h
@@ -26,12 +26,13 @@ class RecoveryStrategyDelayBased : public RecoveryStrategy {
public:
RecoveryStrategyDelayBased(Indexer *indexer, SendRtxCallback &&callback,
asio::io_service &io_service,
- interface::StrategyCallback *external_callback);
+ interface::StrategyCallback &&external_callback);
RecoveryStrategyDelayBased(RecoveryStrategy &&rs);
~RecoveryStrategyDelayBased();
+ void turnOnRecovery();
void onNewRound(bool in_sync);
void newPacketLoss(uint32_t seq);
void receivedPacket(uint32_t seq);
diff --git a/libtransport/src/protocols/rtc/rtc_rs_fec_only.cc b/libtransport/src/protocols/rtc/rtc_rs_fec_only.cc
index 36d8e39f0..c44212bda 100644
--- a/libtransport/src/protocols/rtc/rtc_rs_fec_only.cc
+++ b/libtransport/src/protocols/rtc/rtc_rs_fec_only.cc
@@ -25,22 +25,40 @@ namespace rtc {
RecoveryStrategyFecOnly::RecoveryStrategyFecOnly(
Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service,
- interface::StrategyCallback *external_callback)
- : RecoveryStrategy(indexer, std::move(callback), io_service, false, true,
- external_callback),
+ interface::StrategyCallback &&external_callback)
+ : RecoveryStrategy(indexer, std::move(callback), io_service, true, false,
+ std::move(external_callback)),
congestion_state_(false),
probing_state_(false),
switch_rounds_(0) {}
RecoveryStrategyFecOnly::RecoveryStrategyFecOnly(RecoveryStrategy &&rs)
: RecoveryStrategy(std::move(rs)) {
- setRtxFec(false, true);
+ setRtxFec(true, false);
+ switch_rounds_ = 0;
congestion_state_ = false;
probing_state_ = false;
}
RecoveryStrategyFecOnly::~RecoveryStrategyFecOnly() {}
+void RecoveryStrategyFecOnly::turnOnRecovery() {
+ recovery_on_ = true;
+ // init strategy
+ uint32_t fec_to_ask = computeFecPacketsToAsk();
+ if (fec_to_ask > 0) {
+ // the probing phase detected a lossy link. we immedialty start the fec and
+ // we disable the check to prevent to send fec packets before RTX. in fact
+ // here we know that we have losses and it is not a problem of OOO packets
+ setRtxFec(true, true);
+ rtx_during_fec_ = 1; // avoid to stop fec
+ indexer_->setNFec(fec_to_ask);
+ } else {
+ // keep only RTX on
+ setRtxFec(true, true);
+ }
+}
+
void RecoveryStrategyFecOnly::onNewRound(bool in_sync) {
if (!recovery_on_) {
indexer_->setNFec(0);
@@ -66,7 +84,7 @@ void RecoveryStrategyFecOnly::onNewRound(bool in_sync) {
if (probing_state_) {
probing();
} else {
- uint32_t fec_to_ask = computeFecPacketsToAsk(in_sync);
+ uint32_t fec_to_ask = computeFecPacketsToAsk();
// If fec_to_ask == 0 we use rtx even if in these strategy we use only fec.
// In this way the first packet loss that triggers the usage of fec can be
// recovered using rtx, otherwise it will always be lost
@@ -75,13 +93,19 @@ void RecoveryStrategyFecOnly::onNewRound(bool in_sync) {
switch_rounds_ = 0;
} else {
switch_rounds_++;
- if (switch_rounds_ >= 5) {
+ if (switch_rounds_ >= ((RTC_INTEREST_LIFETIME / ROUND_LEN) * 2) &&
+ rtx_during_fec_ !=
+ 0) { // go to fec only if it is needed (RTX are on)
setRtxFec(false, true);
} else {
- setRtxFec({}, true);
+ setRtxFec(true, true); // keep both
}
}
- indexer_->setNFec(fec_to_ask);
+ if (rtx_during_fec_ == 0) // if we do not send any RTX the losses
+ // registered may be due to jitter
+ indexer_->setNFec(0);
+ else
+ indexer_->setNFec(fec_to_ask);
}
}
@@ -92,7 +116,7 @@ void RecoveryStrategyFecOnly::newPacketLoss(uint32_t seq) {
if (!state_->isPending(seq) && !indexer_->isFec(seq)) {
addNewRtx(seq, true);
} else {
- // if not pending add rtc
+ // if not pending add to list to recover with fec
recover_with_fec_.insert(seq);
state_->onPossibleLossWithNoRtx(seq);
}
@@ -107,7 +131,7 @@ void RecoveryStrategyFecOnly::probing() {
// TODO
// for the moment ask for all fec and exit the probing phase
probing_state_ = false;
- uint32_t fec_to_ask = computeFecPacketsToAsk(true);
+ uint32_t fec_to_ask = computeFecPacketsToAsk();
indexer_->setNFec(fec_to_ask);
}
diff --git a/libtransport/src/protocols/rtc/rtc_rs_fec_only.h b/libtransport/src/protocols/rtc/rtc_rs_fec_only.h
index 37b505d35..1ab78b842 100644
--- a/libtransport/src/protocols/rtc/rtc_rs_fec_only.h
+++ b/libtransport/src/protocols/rtc/rtc_rs_fec_only.h
@@ -26,12 +26,13 @@ class RecoveryStrategyFecOnly : public RecoveryStrategy {
public:
RecoveryStrategyFecOnly(Indexer *indexer, SendRtxCallback &&callback,
asio::io_service &io_service,
- interface::StrategyCallback *external_callback);
+ interface::StrategyCallback &&external_callback);
RecoveryStrategyFecOnly(RecoveryStrategy &&rs);
~RecoveryStrategyFecOnly();
+ void turnOnRecovery();
void onNewRound(bool in_sync);
void newPacketLoss(uint32_t seq);
void receivedPacket(uint32_t seq);
diff --git a/libtransport/src/protocols/rtc/rtc_rs_low_rate.cc b/libtransport/src/protocols/rtc/rtc_rs_low_rate.cc
index bd153d209..48dd3e34f 100644
--- a/libtransport/src/protocols/rtc/rtc_rs_low_rate.cc
+++ b/libtransport/src/protocols/rtc/rtc_rs_low_rate.cc
@@ -25,9 +25,9 @@ namespace rtc {
RecoveryStrategyLowRate::RecoveryStrategyLowRate(
Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service,
- interface::StrategyCallback *external_callback)
+ interface::StrategyCallback &&external_callback)
: RecoveryStrategy(indexer, std::move(callback), io_service, false, true,
- external_callback), // start with fec
+ std::move(external_callback)), // start with fec
fec_consecutive_rounds_((MILLI_IN_A_SEC / ROUND_LEN) * 5), // 5 sec
rtx_allowed_consecutive_rounds_(0) {
initSwitchVector();
@@ -66,7 +66,7 @@ void RecoveryStrategyLowRate::setRecoveryParameters(bool use_rtx, bool use_fec,
}
void RecoveryStrategyLowRate::selectRecoveryStrategy(bool in_sync) {
- uint32_t fec_to_ask = computeFecPacketsToAsk(in_sync);
+ uint32_t fec_to_ask = computeFecPacketsToAsk();
if (fec_to_ask == 0) {
// fec is off, turn on RTX immediatly to avoid packet losses
setRecoveryParameters(true, false, 0);
@@ -128,6 +128,11 @@ void RecoveryStrategyLowRate::selectRecoveryStrategy(bool in_sync) {
}
}
+void RecoveryStrategyLowRate::turnOnRecovery() {
+ recovery_on_ = 1;
+ // the stategy will be init in the new round function
+}
+
void RecoveryStrategyLowRate::onNewRound(bool in_sync) {
if (!recovery_on_) {
// disable fec so that no extra packet will be sent
diff --git a/libtransport/src/protocols/rtc/rtc_rs_low_rate.h b/libtransport/src/protocols/rtc/rtc_rs_low_rate.h
index f0c7bd0d5..d66b197e2 100644
--- a/libtransport/src/protocols/rtc/rtc_rs_low_rate.h
+++ b/libtransport/src/protocols/rtc/rtc_rs_low_rate.h
@@ -34,12 +34,13 @@ class RecoveryStrategyLowRate : public RecoveryStrategy {
public:
RecoveryStrategyLowRate(Indexer *indexer, SendRtxCallback &&callback,
asio::io_service &io_service,
- interface::StrategyCallback *external_callback);
+ interface::StrategyCallback &&external_callback);
RecoveryStrategyLowRate(RecoveryStrategy &&rs);
~RecoveryStrategyLowRate();
+ void turnOnRecovery();
void onNewRound(bool in_sync);
void newPacketLoss(uint32_t seq);
void receivedPacket(uint32_t seq);
diff --git a/libtransport/src/protocols/rtc/rtc_rs_recovery_off.cc b/libtransport/src/protocols/rtc/rtc_rs_recovery_off.cc
index 499e978f1..16b14eff6 100644
--- a/libtransport/src/protocols/rtc/rtc_rs_recovery_off.cc
+++ b/libtransport/src/protocols/rtc/rtc_rs_recovery_off.cc
@@ -25,9 +25,9 @@ namespace rtc {
RecoveryStrategyRecoveryOff::RecoveryStrategyRecoveryOff(
Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service,
- interface::StrategyCallback *external_callback)
+ interface::StrategyCallback &&external_callback)
: RecoveryStrategy(indexer, std::move(callback), io_service, false, false,
- external_callback) {}
+ std::move(external_callback)) {}
RecoveryStrategyRecoveryOff::RecoveryStrategyRecoveryOff(RecoveryStrategy &&rs)
: RecoveryStrategy(std::move(rs)) {
@@ -36,6 +36,10 @@ RecoveryStrategyRecoveryOff::RecoveryStrategyRecoveryOff(RecoveryStrategy &&rs)
RecoveryStrategyRecoveryOff::~RecoveryStrategyRecoveryOff() {}
+void RecoveryStrategyRecoveryOff::turnOnRecovery() {
+ // nothing to do
+ return;
+}
void RecoveryStrategyRecoveryOff::onNewRound(bool in_sync) {
// nothing to do
return;
diff --git a/libtransport/src/protocols/rtc/rtc_rs_recovery_off.h b/libtransport/src/protocols/rtc/rtc_rs_recovery_off.h
index 98cd1e6a5..3a9e71e7d 100644
--- a/libtransport/src/protocols/rtc/rtc_rs_recovery_off.h
+++ b/libtransport/src/protocols/rtc/rtc_rs_recovery_off.h
@@ -26,12 +26,13 @@ class RecoveryStrategyRecoveryOff : public RecoveryStrategy {
public:
RecoveryStrategyRecoveryOff(Indexer *indexer, SendRtxCallback &&callback,
asio::io_service &io_service,
- interface::StrategyCallback *external_callback);
+ interface::StrategyCallback &&external_callback);
RecoveryStrategyRecoveryOff(RecoveryStrategy &&rs);
~RecoveryStrategyRecoveryOff();
+ void turnOnRecovery();
void onNewRound(bool in_sync);
void newPacketLoss(uint32_t seq);
void receivedPacket(uint32_t seq);
diff --git a/libtransport/src/protocols/rtc/rtc_rs_rtx_only.cc b/libtransport/src/protocols/rtc/rtc_rs_rtx_only.cc
index c1ae9b53d..8e5db5439 100644
--- a/libtransport/src/protocols/rtc/rtc_rs_rtx_only.cc
+++ b/libtransport/src/protocols/rtc/rtc_rs_rtx_only.cc
@@ -25,9 +25,9 @@ namespace rtc {
RecoveryStrategyRtxOnly::RecoveryStrategyRtxOnly(
Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service,
- interface::StrategyCallback *external_callback)
+ interface::StrategyCallback &&external_callback)
: RecoveryStrategy(indexer, std::move(callback), io_service, true, false,
- external_callback) {}
+ std::move(external_callback)) {}
RecoveryStrategyRtxOnly::RecoveryStrategyRtxOnly(RecoveryStrategy &&rs)
: RecoveryStrategy(std::move(rs)) {
@@ -36,6 +36,11 @@ RecoveryStrategyRtxOnly::RecoveryStrategyRtxOnly(RecoveryStrategy &&rs)
RecoveryStrategyRtxOnly::~RecoveryStrategyRtxOnly() {}
+void RecoveryStrategyRtxOnly::turnOnRecovery() {
+ recovery_on_ = true;
+ setRtxFec(true, false);
+}
+
void RecoveryStrategyRtxOnly::onNewRound(bool in_sync) {
// nothing to do
return;
diff --git a/libtransport/src/protocols/rtc/rtc_rs_rtx_only.h b/libtransport/src/protocols/rtc/rtc_rs_rtx_only.h
index 7ae909454..e90e5ba13 100644
--- a/libtransport/src/protocols/rtc/rtc_rs_rtx_only.h
+++ b/libtransport/src/protocols/rtc/rtc_rs_rtx_only.h
@@ -26,12 +26,13 @@ class RecoveryStrategyRtxOnly : public RecoveryStrategy {
public:
RecoveryStrategyRtxOnly(Indexer *indexer, SendRtxCallback &&callback,
asio::io_service &io_service,
- interface::StrategyCallback *external_callback);
+ interface::StrategyCallback &&external_callback);
RecoveryStrategyRtxOnly(RecoveryStrategy &&rs);
~RecoveryStrategyRtxOnly();
+ void turnOnRecovery();
void onNewRound(bool in_sync);
void newPacketLoss(uint32_t seq);
void receivedPacket(uint32_t seq);
diff --git a/libtransport/src/protocols/rtc/rtc_state.cc b/libtransport/src/protocols/rtc/rtc_state.cc
index 6a21531f8..5b3b5e4c3 100644
--- a/libtransport/src/protocols/rtc/rtc_state.cc
+++ b/libtransport/src/protocols/rtc/rtc_state.cc
@@ -56,7 +56,6 @@ void RTCState::initParams() {
last_seq_nacked_ = 0;
loss_rate_ = 0.0;
avg_loss_rate_ = -1.0;
- max_loss_rate_ = 0.0;
last_round_loss_rate_ = 0.0;
// loss rate per sec
@@ -85,14 +84,13 @@ void RTCState::initParams() {
fec_recovered_rate_ = 0.0;
// nack counter
- nack_on_last_round_ = false;
+ past_nack_on_last_round_ = false;
received_nacks_last_round_ = 0;
// packets counter
received_packets_last_round_ = 0;
received_data_last_round_ = 0;
received_data_from_cache_ = 0;
- data_from_cache_rate_ = 0;
sent_interests_last_round_ = 0;
sent_rtx_last_round_ = 0;
@@ -103,7 +101,7 @@ void RTCState::initParams() {
last_production_seq_ = 0;
producer_is_active_ = false;
- last_prod_update_ = 0;
+ last_prod_update_seq_ = 0;
// paths stats
path_table_.clear();
@@ -180,7 +178,9 @@ void RTCState::onLossDetected(uint32_t seq) {
PacketState state = getPacketState(seq);
// if the packet is already marked with a state, do nothing
- if (state == PacketState::UNKNOWN) {
+ // to be considered lost the packet must be pending
+ if (state == PacketState::UNKNOWN &&
+ pending_interests_.find(seq) != pending_interests_.end()) {
packets_lost_++;
addToPacketCache(seq, PacketState::LOST);
}
@@ -225,8 +225,8 @@ void RTCState::onDataPacketReceived(const core::ContentObject &content_object,
core::ParamsRTC params = RTCState::getDataParams(content_object);
- if (last_prod_update_ < params.timestamp) {
- last_prod_update_ = params.timestamp;
+ if (last_prod_update_seq_ < seq) {
+ last_prod_update_seq_ = seq;
production_rate_ = (double)params.prod_rate;
}
@@ -267,14 +267,12 @@ void RTCState::onNackPacketReceived(const core::ContentObject &nack,
uint32_t seq = nack.getName().getSuffix();
struct nack_packet_t *nack_pkt =
(struct nack_packet_t *)nack.getPayload()->data();
- uint64_t production_time = nack_pkt->getTimestamp();
uint32_t production_seq = nack_pkt->getProductionSegment();
uint32_t production_rate = nack_pkt->getProductionRate();
if (TRANSPORT_EXPECT_FALSE(main_path_ == nullptr) ||
- last_prod_update_ < production_time) {
+ last_prod_update_seq_ < production_seq) {
// update production rate
- last_prod_update_ = production_time;
last_production_seq_ = production_seq;
production_rate_ = (double)production_rate;
}
@@ -282,7 +280,6 @@ void RTCState::onNackPacketReceived(const core::ContentObject &nack,
if (compute_stats) {
// this is not an RTX
updatePathStats(nack, true);
- nack_on_last_round_ = true;
}
// for statistics pourpose we log all nacks, also the one received for
@@ -297,6 +294,7 @@ void RTCState::onNackPacketReceived(const core::ContentObject &nack,
if (last_seq_nacked_ < seq) last_seq_nacked_ = seq;
DLOG_IF(INFO, VLOG_IS_ON(3))
<< "lost packet " << seq << " beacuse of a past nack";
+ if (compute_stats) past_nack_on_last_round_ = true;
onPacketLost(seq);
} else if (seq > production_seq) {
// future nack
@@ -317,29 +315,15 @@ void RTCState::onNackPacketReceived(const core::ContentObject &nack,
}
}
- // the producer is responding
- // we consider it active only if the production rate is not 0
- // or the production sequence number is not 1
- if (production_rate_ != 0 || production_seq != 1) {
- producer_is_active_ = true;
- }
-
received_packets_last_round_++;
}
void RTCState::onPacketLost(uint32_t seq) {
-#if 0
- DLOG_IF(INFO, VLOG_IS_ON(3)) << "packet " << seq << " is lost";
- auto it = pending_interests_.find(seq);
- if (it != pending_interests_.end()) {
- // this packet was never retransmitted so it does
- // not appear in the loss count
- packets_lost_++;
- }
-#endif
if (!indexer_->isFec(seq)) {
PacketState state = getPacketState(seq);
- if (state == PacketState::LOST || state == PacketState::UNKNOWN) {
+ if (state == PacketState::LOST ||
+ (state == PacketState::UNKNOWN &&
+ pending_interests_.find(seq) != pending_interests_.end())) {
definitely_lost_pkt_++;
DLOG_IF(INFO, VLOG_IS_ON(4)) << "packet " << seq << " is lost";
}
@@ -350,7 +334,12 @@ void RTCState::onPacketLost(uint32_t seq) {
void RTCState::onPacketRecoveredRtx(uint32_t seq) {
packets_sent_to_app_++;
if (seq > highest_seq_received_) highest_seq_received_ = seq;
- losses_recovered_++;
+
+ // increase the recovered packet counter only if the packet was marked as LOST
+ // before.
+ PacketState state = getPacketState(seq);
+ if (state == PacketState::LOST) losses_recovered_++;
+
addRecvOrLost(seq, PacketState::RECEIVED);
}
@@ -371,19 +360,30 @@ void RTCState::onPacketRecoveredFec(uint32_t seq, uint32_t size) {
// adding header to the count
recovered_bytes_with_fec_ += 60; // XXX get header size some where
- if (getPacketState(seq) == PacketState::UNKNOWN)
- onLossDetected(seq); // the pkt was lost but didn't account for it yet
+ // the packet could be not marked as lost yet. onLossDetected checks if add in
+ // the packet in the lost count or not
+ onLossDetected(seq);
addRecvOrLost(seq, PacketState::RECEIVED);
}
bool RTCState::onProbePacketReceived(const core::ContentObject &probe) {
uint32_t seq = probe.getName().getSuffix();
+ core::ParamsRTC params = RTCState::getProbeParams(probe);
+
+ bool is_valid = true;
+ uint32_t max = UINT32_MAX;
+ if (params.prod_rate == max) is_valid = false;
uint64_t rtt;
- rtt = probe_handler_->getRtt(seq);
+ rtt = probe_handler_->getRtt(seq, is_valid);
if (rtt == 0) return false; // this is not a valid probe
+ if (!is_valid) return false; // not a valid probe
+
+ // if we are here the producer is active
+ producer_is_active_ = true;
+
// Like for data and nacks update the path stats. Here the RTT is computed
// by the probe handler. Both probes for rtt and bw are good to estimate
// info on the path.
@@ -406,24 +406,14 @@ bool RTCState::onProbePacketReceived(const core::ContentObject &probe) {
uint64_t now = utils::SteadyTime::nowMs().count();
- core::ParamsRTC params = RTCState::getProbeParams(probe);
-
int64_t OWD = now - params.timestamp;
path->insertOwdSample(OWD);
- if (last_prod_update_ < params.timestamp) {
+ if (last_prod_update_seq_ < params.prod_seg) {
last_production_seq_ = params.prod_seg;
- last_prod_update_ = params.timestamp;
production_rate_ = (double)params.prod_rate;
}
- // the producer is responding
- // we consider it active only if the production rate is not 0
- // or the production sequence numner is not 1
- if (production_rate_ != 0 || params.prod_seg != 1) {
- producer_is_active_ = true;
- }
-
// check for init RTT. if received_probes_ is equal to 0 schedule a timer to
// wait for the INIT_RTT_PROBES. in this way if some probes get lost we don't
// wait forever
@@ -453,12 +443,12 @@ bool RTCState::onProbePacketReceived(const core::ContentObject &probe) {
void RTCState::onJumpForward(uint32_t next_seq) {
for (uint32_t seq = highest_seq_received_in_order_ + 1; seq < next_seq;
seq++) {
- auto it = pending_interests_.find(seq);
PacketState packet_state = getPacketState(seq);
- if (it == pending_interests_.end() &&
- packet_state != PacketState::RECEIVED &&
+ if (packet_state != PacketState::RECEIVED &&
packet_state != PacketState::DEFINITELY_LOST) {
- onLossDetected(seq);
+ // here we considere the packet as definitely lost whitout increase the
+ // lost packet counter because this loss is not due to the network
+ // condition but the transport wants to skip the packet
onPacketLost(seq);
}
}
@@ -491,29 +481,6 @@ void RTCState::onNewRound(double round_len, bool in_sync) {
fec_recovered_rate_ =
(fec_recovered_rate_ * 0.8) + (0.2 * fec_recovered_bytes_per_sec);
-#if 0
- // search for an active path. There should be only one active path (meaning a
- // path that leads to the producer socket -no cache- and from which we are
- // currently getting data packets) at any time. However it may happen that
- // there are mulitple active paths in case of mobility (the old path will
- // remain active for a short ammount of time). The main path is selected as
- // the active path from where the consumer received the latest data packet
-
- uint64_t last_packet_ts = 0;
- main_path_ = nullptr;
-
- for (auto it = path_table_.begin(); it != path_table_.end(); it++) {
- it->second->roundEnd();
- if (it->second->isActive()) {
- uint64_t ts = it->second->getLastPacketTS();
- if (ts > last_packet_ts) {
- last_packet_ts = ts;
- main_path_ = it->second;
- }
- }
- }
-#endif
-
// search for an active path. Is it possible to have multiple path that are
// used at the same time. We use as reference path the one from where we gets
// more packets. This means that the path should have better lantecy or less
@@ -544,7 +511,7 @@ void RTCState::onNewRound(double round_len, bool in_sync) {
updateLossRate(in_sync);
// handle nacks
- if (!nack_on_last_round_ && received_bytes_ > 0) {
+ if (!past_nack_on_last_round_ && received_bytes_ > 0) {
rounds_without_nacks_++;
} else {
rounds_without_nacks_ = 0;
@@ -561,14 +528,6 @@ void RTCState::onNewRound(double round_len, bool in_sync) {
}
}
- // compute cache/producer ratio
- if (received_data_last_round_ != 0) {
- double new_rate =
- (double)received_data_from_cache_ / (double)received_data_last_round_;
- data_from_cache_rate_ = data_from_cache_rate_ * MOVING_AVG_ALPHA +
- (new_rate * (1 - MOVING_AVG_ALPHA));
- }
-
// reset counters
received_bytes_ = 0;
received_fec_bytes_ = 0;
@@ -578,7 +537,7 @@ void RTCState::onNewRound(double round_len, bool in_sync) {
losses_recovered_ = 0;
first_seq_in_round_ = highest_seq_received_;
- nack_on_last_round_ = false;
+ past_nack_on_last_round_ = false;
received_nacks_last_round_ = 0;
received_packets_last_round_ = 0;
@@ -700,6 +659,7 @@ void RTCState::updateLossRate(bool in_sync) {
expected_packets_ +=
((highest_seq_received_ - first_seq_in_round_) - fec_packets);
} else {
+ expected_packets_ = 0;
packets_sent_to_app_ = 0;
}
@@ -715,7 +675,6 @@ void RTCState::updateLossRate(bool in_sync) {
(double)((double)(lost_per_sec_) / (double)total_expected_packets_);
loss_history_.pushBack(per_sec_loss_rate_);
- max_loss_rate_ = getMaxLoss();
if (in_sync && expected_packets_ != 0) {
// compute residual loss rate
@@ -778,7 +737,9 @@ void RTCState::addRecvOrLost(uint32_t seq, PacketState state) {
// however we may need to increse the number or lost packets
// XXX: in case we want to use rtx to recover fec packets,
// this may prevent to detect a packet loss and no rtx will be sent
- onLossDetected(i);
+ if (TRANSPORT_EXPECT_TRUE(i >= first_interest_sent_seq_)) {
+ onLossDetected(i);
+ }
} else {
// this is a data packet and we need to get it
break;
@@ -806,8 +767,9 @@ void RTCState::setInitRttTimer(uint32_t wait) {
}
void RTCState::checkInitRttTimer() {
- if (received_probes_ < INIT_RTT_MIN_PROBES_TO_RECV) {
- // we didn't received enough probes, restart
+ if (received_probes_ < INIT_RTT_MIN_PROBES_TO_RECV ||
+ probe_handler_->getProbeLossRate() == 1.0) {
+ // we didn't received enough probes or they were not valid, restart
received_probes_ = 0;
probe_handler_->setSuffixRange(MIN_INIT_PROBE_SEQ, MAX_INIT_PROBE_SEQ);
probe_handler_->setProbes(INIT_RTT_PROBE_INTERVAL, INIT_RTT_PROBES);
@@ -819,7 +781,6 @@ void RTCState::checkInitRttTimer() {
init_rtt_ = true;
main_path_->roundEnd();
loss_history_.pushBack(probe_handler_->getProbeLossRate());
- max_loss_rate_ = getMaxLoss();
probe_handler_->setSuffixRange(MIN_RTT_PROBE_SEQ, MAX_RTT_PROBE_SEQ);
probe_handler_->setProbes(RTT_PROBE_INTERVAL, 0);
@@ -829,17 +790,12 @@ void RTCState::checkInitRttTimer() {
double prod_rate = getProducerRate();
double rtt = (double)getMinRTT() / MILLI_IN_A_SEC;
double packet_size = getAveragePacketSize();
- uint32_t pkt_in_rtt_ = std::floor(((prod_rate / packet_size) * rtt) * 0.8);
+ uint32_t pkt_in_rtt_ = std::floor(((prod_rate / packet_size) * rtt));
last_seq_nacked_ = last_production_seq_ + pkt_in_rtt_;
discovered_rtt_callback_();
}
-double RTCState::getMaxLoss() {
- if (loss_history_.size() != 0) return loss_history_.begin();
- return 0;
-}
-
core::ParamsRTC RTCState::getProbeParams(const core::ContentObject &probe) {
uint32_t seq = probe.getName().getSuffix();
core::ParamsRTC params;
diff --git a/libtransport/src/protocols/rtc/rtc_state.h b/libtransport/src/protocols/rtc/rtc_state.h
index 8bf48ccc2..4bd2f76a0 100644
--- a/libtransport/src/protocols/rtc/rtc_state.h
+++ b/libtransport/src/protocols/rtc/rtc_state.h
@@ -162,7 +162,11 @@ class RTCState : public std::enable_shared_from_this<RTCState> {
double getPerRoundLossRate() const { return loss_rate_; }
double getPerSecondLossRate() const { return per_sec_loss_rate_; }
double getAvgLossRate() const { return avg_loss_rate_; }
- double getMaxLossRate() const { return max_loss_rate_; }
+ double getMaxLossRate() const {
+ if (loss_history_.size() != 0) return loss_history_.begin();
+ return 0;
+ }
+
double getLastRoundLossRate() const { return last_round_loss_rate_; }
double getResidualLossRate() const { return residual_loss_rate_; }
@@ -177,8 +181,6 @@ class RTCState : public std::enable_shared_from_this<RTCState> {
return highest_seq_received_in_order_;
}
- double getMaxLoss();
-
// fec packets
uint32_t getReceivedFecPackets() const { return received_fec_pkt_; }
uint32_t getPendingFecPackets() const { return pending_fec_pkt_; }
@@ -213,7 +215,13 @@ class RTCState : public std::enable_shared_from_this<RTCState> {
bool isProducerActive() const { return producer_is_active_; }
// packets from cache
- double getPacketFromCacheRatio() const { return data_from_cache_rate_; }
+ // this should be called at the end of a round beacuse otherwise we may have
+ // not enough packets to get a good stat
+ double getPacketFromCacheRatio() const {
+ if (received_data_last_round_ == 0) return 0;
+ return (double)received_data_from_cache_ /
+ (double)received_data_last_round_;
+ }
PendingInterestsMap::iterator getPendingInterestsMapBegin() {
return pending_interests_.begin();
@@ -286,7 +294,6 @@ class RTCState : public std::enable_shared_from_this<RTCState> {
uint32_t last_seq_nacked_; // segment for which we got an oldNack
double loss_rate_;
double avg_loss_rate_;
- double max_loss_rate_;
double last_round_loss_rate_;
utils::MaxFilter<double> loss_history_;
@@ -314,17 +321,16 @@ class RTCState : public std::enable_shared_from_this<RTCState> {
double fec_recovered_rate_; // rate recovered using fec
// nack counters
- // the bool takes tracks only about the valid nacks (no rtx) and it is used to
- // switch between the states. Instead received_nacks_last_round_ logs all the
- // nacks for statistics
- bool nack_on_last_round_;
+ // the bool takes tracks only about the valid past nacks (no rtx) and it is
+ // used to switch between the states. Instead received_nacks_last_round_ logs
+ // all the nacks for statistics
+ bool past_nack_on_last_round_;
uint32_t received_nacks_last_round_;
// packets counters
uint32_t received_packets_last_round_;
uint32_t received_data_last_round_;
uint32_t received_data_from_cache_;
- double data_from_cache_rate_;
uint32_t sent_interests_last_round_;
uint32_t sent_rtx_last_round_;
@@ -344,10 +350,13 @@ class RTCState : public std::enable_shared_from_this<RTCState> {
// producer state
bool
producer_is_active_; // the prodcuer is active if we receive some packets
- uint32_t
- last_production_seq_; // last production seq received by the producer
- uint64_t last_prod_update_; // timestamp of the last packets used to update
- // stats from the producer
+ uint32_t last_production_seq_; // last production seq received by the
+ // producer used to init the sync protcol
+ uint32_t last_prod_update_seq_; // seq number of the last packet used to
+ // update the update from the producer.
+ // assumption: the highest seq number carries
+ // the most up to date info. in case of
+ // probes we look at the produced seq number
// paths stats
std::unordered_map<uint32_t, std::shared_ptr<RTCDataPath>> path_table_;
diff --git a/libtransport/src/protocols/rtc/rtc_verifier.cc b/libtransport/src/protocols/rtc/rtc_verifier.cc
index 29968dd02..7b6330a1f 100644
--- a/libtransport/src/protocols/rtc/rtc_verifier.cc
+++ b/libtransport/src/protocols/rtc/rtc_verifier.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Copyright (c) 2017-2022 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -22,8 +22,11 @@ namespace protocol {
namespace rtc {
RTCVerifier::RTCVerifier(std::shared_ptr<auth::Verifier> verifier,
- uint32_t max_unverified_delay)
- : verifier_(verifier), max_unverified_delay_(max_unverified_delay) {}
+ uint32_t max_unverified_interval,
+ double max_unverified_ratio)
+ : verifier_(verifier),
+ max_unverified_interval_(max_unverified_interval),
+ max_unverified_ratio_(max_unverified_ratio) {}
void RTCVerifier::setState(std::shared_ptr<RTCState> rtc_state) {
rtc_state_ = rtc_state;
@@ -33,15 +36,20 @@ void RTCVerifier::setVerifier(std::shared_ptr<auth::Verifier> verifier) {
verifier_ = verifier;
}
-void RTCVerifier::setMaxUnverifiedDelay(uint32_t max_unverified_delay) {
- max_unverified_delay_ = max_unverified_delay;
+void RTCVerifier::setMaxUnverifiedInterval(uint32_t max_unverified_interval) {
+ max_unverified_interval_ = max_unverified_interval;
+}
+
+void RTCVerifier::setMaxUnverifiedRatio(double max_unverified_ratio) {
+ max_unverified_ratio_ = max_unverified_ratio;
}
auth::VerificationPolicy RTCVerifier::verify(
core::ContentObject &content_object, bool is_fec) {
- uint32_t suffix = content_object.getName().getSuffix();
- core::PayloadType payload_type = content_object.getPayloadType();
+ auth::Suffix suffix = content_object.getName().getSuffix();
+ auth::VerificationPolicy default_policy = auth::VerificationPolicy::ABORT;
+ core::PayloadType payload_type = content_object.getPayloadType();
bool is_probe = ProbeHandler::getProbeType(suffix) != ProbeType::NOT_PROBE;
bool is_nack = !is_probe && content_object.payloadSize() == NACK_HEADER_SIZE;
bool is_manifest = !is_probe && !is_nack && !is_fec &&
@@ -55,29 +63,31 @@ auth::VerificationPolicy RTCVerifier::verify(
if (is_data) return verifyData(content_object);
if (is_manifest) return verifyManifest(content_object);
- auth::VerificationPolicy policy = auth::VerificationPolicy::ABORT;
- verifier_->callVerificationFailedCallback(suffix, policy);
- return policy;
+ verifier_->callVerificationFailedCallback(suffix, default_policy);
+ return default_policy;
}
auth::VerificationPolicy RTCVerifier::verifyProbe(
core::ContentObject &content_object) {
- switch (ProbeHandler::getProbeType(content_object.getName().getSuffix())) {
- case ProbeType::INIT: {
- auth::VerificationPolicy policy = verifyManifest(content_object);
- if (policy != auth::VerificationPolicy::ACCEPT) {
- return policy;
+ auth::Suffix suffix = content_object.getName().getSuffix();
+ auth::VerificationPolicy policy = auth::VerificationPolicy::ABORT;
+
+ switch (ProbeHandler::getProbeType(suffix)) {
+ case ProbeType::INIT:
+ policy = verifyManifest(content_object);
+ if (policy == auth::VerificationPolicy::ACCEPT) {
+ policy = processManifest(content_object);
}
- return processManifest(content_object);
- }
+ break;
case ProbeType::RTT:
- return verifyNack(content_object);
+ policy = verifyNack(content_object);
+ break;
default:
- auth::VerificationPolicy policy = auth::VerificationPolicy::ABORT;
- verifier_->callVerificationFailedCallback(
- content_object.getName().getSuffix(), policy);
- return policy;
+ verifier_->callVerificationFailedCallback(suffix, policy);
+ break;
}
+
+ return policy;
}
auth::VerificationPolicy RTCVerifier::verifyNack(
@@ -92,28 +102,30 @@ auth::VerificationPolicy RTCVerifier::verifyFec(
auth::VerificationPolicy RTCVerifier::verifyData(
core::ContentObject &content_object) {
- uint32_t suffix = content_object.getName().getSuffix();
-
if (_is_ah(content_object.getFormat())) {
return verifier_->verifyPackets(&content_object);
}
- unverified_bytes_[suffix] =
- content_object.headerSize() + content_object.payloadSize();
- unverified_packets_[suffix] =
- content_object.computeDigest(manifest_hash_algo_);
+ auth::Suffix suffix = content_object.getName().getSuffix();
+ auth::VerificationPolicy policy = auth::VerificationPolicy::ABORT;
+ Timestamp now = utils::SteadyTime::nowMs().count();
+
+ // Flush old packets
+ Timestamp oldest = flush_packets(now);
- // An alert is raised when too much packets remain unverified
- if (getTotalUnverified() > max_unverified_bytes_) {
- unverified_bytes_.clear();
- unverified_packets_.clear();
+ // Add packet to map of unverified packets
+ packets_unverif_.add(
+ {.suffix = suffix, .timestamp = now, .size = content_object.length()},
+ content_object.computeDigest(manifest_hash_algo_));
- auth::VerificationPolicy policy = auth::VerificationPolicy::ABORT;
- verifier_->callVerificationFailedCallback(suffix, policy);
- return policy;
+ // Check that the ratio of unverified packets stays below the limit
+ if (now - oldest < max_unverified_interval_ ||
+ getBufferRatio() < max_unverified_ratio_) {
+ policy = auth::VerificationPolicy::ACCEPT;
}
- return auth::VerificationPolicy::ACCEPT;
+ verifier_->callVerificationFailedCallback(suffix, policy);
+ return policy;
}
auth::VerificationPolicy RTCVerifier::verifyManifest(
@@ -123,8 +135,10 @@ auth::VerificationPolicy RTCVerifier::verifyManifest(
auth::VerificationPolicy RTCVerifier::processManifest(
core::ContentObject &content_object) {
- uint32_t suffix = content_object.getName().getSuffix();
+ auth::Suffix suffix = content_object.getName().getSuffix();
+ auth::VerificationPolicy accept_policy = auth::VerificationPolicy::ACCEPT;
+ // Decode manifest
core::ContentObjectManifest manifest(content_object);
manifest.decode();
@@ -133,65 +147,62 @@ auth::VerificationPolicy RTCVerifier::processManifest(
last_manifest_ = suffix;
}
- // Extract parameters
+ // Extract hash algorithm and hashes
manifest_hash_algo_ = manifest.getHashAlgorithm();
- core::ParamsRTC params = manifest.getParamsRTC();
-
- if (params.prod_rate > 0) {
- max_unverified_bytes_ = static_cast<uint64_t>(
- (max_unverified_delay_ / 1000.0) * params.prod_rate);
- }
-
- if (max_unverified_bytes_ == 0 || !rtc_state_) {
- auth::VerificationPolicy policy = auth::VerificationPolicy::ABORT;
- verifier_->callVerificationFailedCallback(suffix, policy);
- return policy;
- }
-
- // Extract hashes
auth::Verifier::SuffixMap suffix_map =
core::ContentObjectManifest::getSuffixMap(&manifest);
// Return early if the manifest is empty
if (suffix_map.empty()) {
- return auth::VerificationPolicy::ACCEPT;
+ verifier_->callVerificationFailedCallback(suffix, accept_policy);
+ return accept_policy;
}
- // Remove lost packets from digest map
+ // Add hashes to map of all manifest hashes
manifest_digests_.insert(suffix_map.begin(), suffix_map.end());
+
+ // Remove discarded and definitely lost packets from digest map
for (auto it = manifest_digests_.begin(); it != manifest_digests_.end();) {
+ auto it_erased = packets_unverif_erased_.find(it->first);
+
+ if (it_erased != packets_unverif_erased_.end()) {
+ packets_unverif_erased_.erase(it_erased);
+ it = manifest_digests_.erase(it);
+ continue;
+ }
+
if (rtc_state_->getPacketState(it->first) == PacketState::DEFINITELY_LOST) {
- unverified_packets_.erase(it->first);
- unverified_bytes_.erase(it->first);
it = manifest_digests_.erase(it);
- } else {
- ++it;
+ continue;
}
+
+ ++it;
}
// Verify packets
auth::Verifier::PolicyMap policies =
- verifier_->verifyHashes(unverified_packets_, manifest_digests_);
+ verifier_->verifyHashes(packets_unverif_.suffixMap(), manifest_digests_);
- for (const auto &policy : policies) {
- switch (policy.second) {
+ for (const auto &p : policies) {
+ switch (p.second) {
case auth::VerificationPolicy::ACCEPT: {
- manifest_digests_.erase(policy.first);
- unverified_packets_.erase(policy.first);
- unverified_bytes_.erase(policy.first);
+ auto packet_unverif_it = packets_unverif_.packetIt(p.first);
+ Packet packet_verif = *packet_unverif_it;
+ packets_unverif_.remove(packet_unverif_it);
+ packets_verif_.add(packet_verif);
+ manifest_digests_.erase(p.first);
break;
}
case auth::VerificationPolicy::UNKNOWN:
break;
case auth::VerificationPolicy::DROP:
case auth::VerificationPolicy::ABORT:
- auth::VerificationPolicy p = policy.second;
- verifier_->callVerificationFailedCallback(policy.first, p);
- return p;
+ return p.second;
}
}
- return auth::VerificationPolicy::ACCEPT;
+ verifier_->callVerificationFailedCallback(suffix, accept_policy);
+ return accept_policy;
}
void RTCVerifier::onDataRecoveredFec(uint32_t suffix) {
@@ -203,35 +214,101 @@ void RTCVerifier::onJumpForward(uint32_t next_suffix) {
return;
}
- // When we jump forward in the suffix sequence, we remove packets that
- // probably won't be verified. Those packets have a suffix in the range
- // [last_manifest_ + 1, next_suffix[.
- for (auto it = unverified_packets_.begin();
- it != unverified_packets_.end();) {
- if (it->first > last_manifest_) {
- unverified_bytes_.erase(it->first);
- it = unverified_packets_.erase(it);
- } else {
- ++it;
+ // When we jump forward in the suffix sequence, we remove packets that won't
+ // be verified. Those packets have a suffix in the range [last_manifest_ + 1,
+ // next_suffix[.
+ for (auth::Suffix suffix = last_manifest_ + 1; suffix < next_suffix;
+ ++suffix) {
+ auto packet_it = packets_unverif_.packetIt(suffix);
+ if (packet_it != packets_unverif_.set().end()) {
+ packets_unverif_.remove(packet_it);
}
}
}
-uint32_t RTCVerifier::getTotalUnverified() const {
- uint32_t total = 0;
+double RTCVerifier::getBufferRatio() const {
+ size_t total = packets_verif_.size() + packets_unverif_.size();
+ double total_unverified = static_cast<double>(packets_unverif_.size());
+ return total ? total_unverified / total : 0.0;
+}
+
+RTCVerifier::Timestamp RTCVerifier::flush_packets(Timestamp now) {
+ Timestamp oldest_verified = packets_verif_.set().empty()
+ ? now
+ : packets_verif_.set().begin()->timestamp;
+ Timestamp oldest_unverified = packets_unverif_.set().empty()
+ ? now
+ : packets_unverif_.set().begin()->timestamp;
+
+ // Prune verified packets older than the unverified interval
+ for (auto it = packets_verif_.set().begin();
+ it != packets_verif_.set().end();) {
+ if (now - it->timestamp < max_unverified_interval_) {
+ break;
+ }
+ it = packets_verif_.remove(it);
+ }
- for (auto bytes : unverified_bytes_) {
- if (bytes.second > UINT32_MAX - total) {
- total = UINT32_MAX;
+ // Prune unverified packets older than the unverified interval
+ for (auto it = packets_unverif_.set().begin();
+ it != packets_unverif_.set().end();) {
+ if (now - it->timestamp < max_unverified_interval_) {
break;
}
- total += bytes.second;
+ packets_unverif_erased_.insert(it->suffix);
+ it = packets_unverif_.remove(it);
+ }
+
+ return std::min(oldest_verified, oldest_unverified);
+}
+
+std::pair<RTCVerifier::PacketSet::iterator, bool> RTCVerifier::Packets::add(
+ const Packet &packet) {
+ auto inserted = packets_.insert(packet);
+ size_ += inserted.second ? packet.size : 0;
+ return inserted;
+}
+
+RTCVerifier::PacketSet::iterator RTCVerifier::Packets::remove(
+ PacketSet::iterator packet_it) {
+ size_ -= packet_it->size;
+ return packets_.erase(packet_it);
+}
+
+const std::set<RTCVerifier::Packet> &RTCVerifier::Packets::set() const {
+ return packets_;
+};
+
+size_t RTCVerifier::Packets::size() const { return size_; };
+
+std::pair<RTCVerifier::PacketSet::iterator, bool>
+RTCVerifier::PacketsUnverif::add(const Packet &packet,
+ const auth::CryptoHash &digest) {
+ auto inserted = add(packet);
+ if (inserted.second) {
+ packets_map_[packet.suffix] = inserted.first;
+ digests_map_[packet.suffix] = digest;
}
+ return inserted;
+}
- return total;
+RTCVerifier::PacketSet::iterator RTCVerifier::PacketsUnverif::remove(
+ PacketSet::iterator packet_it) {
+ size_ -= packet_it->size;
+ packets_map_.erase(packet_it->suffix);
+ digests_map_.erase(packet_it->suffix);
+ return packets_.erase(packet_it);
}
-uint32_t RTCVerifier::getMaxUnverified() const { return max_unverified_bytes_; }
+RTCVerifier::PacketSet::iterator RTCVerifier::PacketsUnverif::packetIt(
+ auth::Suffix suffix) {
+ return packets_map_.at(suffix);
+};
+
+const auth::Verifier::SuffixMap &RTCVerifier::PacketsUnverif::suffixMap()
+ const {
+ return digests_map_;
+}
} // end namespace rtc
} // end namespace protocol
diff --git a/libtransport/src/protocols/rtc/rtc_verifier.h b/libtransport/src/protocols/rtc/rtc_verifier.h
index 596bd8536..098984057 100644
--- a/libtransport/src/protocols/rtc/rtc_verifier.h
+++ b/libtransport/src/protocols/rtc/rtc_verifier.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Copyright (c) 2017-2022 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -26,8 +26,9 @@ namespace rtc {
class RTCVerifier {
public:
- RTCVerifier(std::shared_ptr<auth::Verifier> verifier,
- uint32_t max_unverified_delay);
+ explicit RTCVerifier(std::shared_ptr<auth::Verifier> verifier,
+ uint32_t max_unverified_interval,
+ double max_unverified_ratio);
virtual ~RTCVerifier() = default;
@@ -35,13 +36,9 @@ class RTCVerifier {
void setVerifier(std::shared_ptr<auth::Verifier> verifier);
- void setMaxUnverifiedDelay(uint32_t max_unverified_delay);
+ void setMaxUnverifiedInterval(uint32_t max_unverified_interval);
- void onDataRecoveredFec(uint32_t suffix);
- void onJumpForward(uint32_t next_suffix);
-
- uint32_t getTotalUnverified() const;
- uint32_t getMaxUnverified() const;
+ void setMaxUnverifiedRatio(double max_unverified_ratio);
auth::VerificationPolicy verify(core::ContentObject &content_object,
bool is_fec = false);
@@ -53,27 +50,84 @@ class RTCVerifier {
auth::VerificationPolicy processManifest(core::ContentObject &content_object);
+ void onDataRecoveredFec(uint32_t suffix);
+ void onJumpForward(uint32_t next_suffix);
+
+ double getBufferRatio() const;
+
protected:
+ struct Packet;
+ using Timestamp = uint64_t;
+ using PacketSet = std::set<Packet>;
+
+ struct Packet {
+ auth::Suffix suffix;
+ Timestamp timestamp;
+ size_t size;
+
+ bool operator==(const Packet &b) const {
+ return timestamp == b.timestamp && suffix == b.suffix;
+ }
+ bool operator<(const Packet &b) const {
+ return timestamp == b.timestamp ? suffix < b.suffix
+ : timestamp < b.timestamp;
+ }
+ };
+
+ class Packets {
+ public:
+ virtual std::pair<PacketSet::iterator, bool> add(const Packet &packet);
+ virtual PacketSet::iterator remove(PacketSet::iterator packet_it);
+ const PacketSet &set() const;
+ size_t size() const;
+
+ protected:
+ PacketSet packets_;
+ size_t size_;
+ };
+
+ class PacketsVerif : public Packets {};
+
+ class PacketsUnverif : public Packets {
+ public:
+ using Packets::add;
+ std::pair<PacketSet::iterator, bool> add(const Packet &packet,
+ const auth::CryptoHash &digest);
+ PacketSet::iterator remove(PacketSet::iterator packet_it) override;
+ PacketSet::iterator packetIt(auth::Suffix suffix);
+ const auth::Verifier::SuffixMap &suffixMap() const;
+
+ private:
+ std::unordered_map<auth::Suffix, PacketSet::iterator> packets_map_;
+ auth::Verifier::SuffixMap digests_map_;
+ };
+
// The RTC state.
std::shared_ptr<RTCState> rtc_state_;
// The verifier instance.
std::shared_ptr<auth::Verifier> verifier_;
+ // Window to consider when verifying packets.
+ uint32_t max_unverified_interval_;
+ // Ratio of unverified packets over which an alert is triggered.
+ double max_unverified_ratio_;
+ // The suffix of the last processed manifest.
+ auth::Suffix last_manifest_;
// Hash algorithm used by manifests.
auth::CryptoHashType manifest_hash_algo_;
- // The last manifest processed.
- auth::Suffix last_manifest_;
- // Hold digests extracted from all manifests received.
+ // Digests extracted from all manifests received.
auth::Verifier::SuffixMap manifest_digests_;
- // Hold hashes of all content objects received before they are verified.
- auth::Verifier::SuffixMap unverified_packets_;
- // Hold number of unverified bytes.
- std::unordered_map<auth::Suffix, uint32_t> unverified_bytes_;
- // Maximum delay (in ms) for an unverified byte to become verifed.
- uint32_t max_unverified_delay_;
- // Maximum number of unverified bytes before aborting the connection.
- uint64_t max_unverified_bytes_;
+ // Verified packets with timestamp >= now - max_unverified_interval_.
+ PacketsVerif packets_verif_;
+ // Unverified packets with timestamp >= now - max_unverified_interval_.
+ PacketsUnverif packets_unverif_;
+ // Unverified erased packets with timestamp < now - max_unverified_interval_.
+ std::unordered_set<auth::Suffix> packets_unverif_erased_;
+
+ // Flushes all packets with timestamp < now - max_unverified_interval_.
+ // Returns the timestamp of the oldest packet, verified or not.
+ Timestamp flush_packets(Timestamp now);
};
-} // end namespace rtc
+} // namespace rtc
} // namespace protocol
} // namespace transport
diff --git a/libtransport/src/protocols/transport_protocol.cc b/libtransport/src/protocols/transport_protocol.cc
index f1e49ec0b..a73b9fb7b 100644
--- a/libtransport/src/protocols/transport_protocol.cc
+++ b/libtransport/src/protocols/transport_protocol.cc
@@ -80,12 +80,6 @@ int TransportProtocol::start() {
socket_->getSocketOption(GeneralTransportOptions::ASYNC_MODE, is_async_);
- std::string fec_type_str = "";
- socket_->getSocketOption(GeneralTransportOptions::FEC_TYPE, fec_type_str);
- if (fec_type_str != "") {
- fec_type_ = fec::FECUtils::fecTypeFromString(fec_type_str.c_str());
- }
-
// Set it is the first time we schedule an interest
is_first_ = true;
diff --git a/libtransport/src/test/test_auth.cc b/libtransport/src/test/test_auth.cc
index d7fd55433..0c47dd958 100644
--- a/libtransport/src/test/test_auth.cc
+++ b/libtransport/src/test/test_auth.cc
@@ -117,13 +117,11 @@ TEST_F(AuthTest, AsymmetricBufferRSA) {
std::vector<uint8_t> buffer(payload.begin(), payload.end());
signer->signBuffer(buffer);
- std::vector<uint8_t> sig = signer->getSignature();
+ utils::MemBuf::Ptr sig = signer->getSignature();
std::shared_ptr<AsymmetricVerifier> verif =
std::make_shared<AsymmetricVerifier>(pubKey);
- bool res = verif->verifyBuffer(
- buffer, std::vector<uint8_t>(sig.data(), sig.data() + sig.size()),
- CryptoHashType::SHA256);
+ bool res = verif->verifyBuffer(buffer, sig, CryptoHashType::SHA256);
EXPECT_EQ(res, true);
}
@@ -157,13 +155,11 @@ TEST_F(AuthTest, AsymmetricBufferDSA) {
std::vector<uint8_t> buffer(payload.begin(), payload.end());
signer->signBuffer(buffer);
- std::vector<uint8_t> sig = signer->getSignature();
+ utils::MemBuf::Ptr sig = signer->getSignature();
std::shared_ptr<AsymmetricVerifier> verif =
std::make_shared<AsymmetricVerifier>(pubKey);
- bool res = verif->verifyBuffer(
- buffer, std::vector<uint8_t>(sig.data(), sig.data() + sig.size()),
- CryptoHashType::SHA256);
+ bool res = verif->verifyBuffer(buffer, sig, CryptoHashType::SHA256);
EXPECT_EQ(res, true);
}
@@ -233,13 +229,11 @@ TEST_F(AuthTest, AsymmetricBufferECDSA) {
std::vector<uint8_t> buffer(payload.begin(), payload.end());
signer->signBuffer(buffer);
- std::vector<uint8_t> sig = signer->getSignature();
+ utils::MemBuf::Ptr sig = signer->getSignature();
std::shared_ptr<AsymmetricVerifier> verif =
std::make_shared<AsymmetricVerifier>(pubKey);
- bool res = verif->verifyBuffer(
- buffer, std::vector<uint8_t>(sig.data(), sig.data() + sig.size()),
- CryptoHashType::SHA256);
+ bool res = verif->verifyBuffer(buffer, sig, CryptoHashType::SHA256);
EXPECT_EQ(res, true);
} // namespace auth
@@ -290,11 +284,9 @@ TEST_F(AuthTest, HMACbuffer) {
std::string payload = "bonjour";
std::vector<uint8_t> buffer(payload.begin(), payload.end());
signer->signBuffer(buffer);
- std::vector<uint8_t> sig = signer->getSignature();
+ utils::MemBuf::Ptr sig = signer->getSignature();
SymmetricVerifier hmac(PASSPHRASE);
- bool res = hmac.verifyBuffer(
- buffer, std::vector<uint8_t>(sig.data(), sig.data() + sig.size()),
- CryptoHashType::SHA256);
+ bool res = hmac.verifyBuffer(buffer, sig, CryptoHashType::SHA256);
EXPECT_EQ(res, true);
}
diff --git a/libtransport/src/test/test_core_manifest.cc b/libtransport/src/test/test_core_manifest.cc
index 23fd5e342..b998ce96b 100644
--- a/libtransport/src/test/test_core_manifest.cc
+++ b/libtransport/src/test/test_core_manifest.cc
@@ -173,7 +173,7 @@ TEST_F(ManifestTest, SetParamsRTC) {
.timestamp = 1,
.prod_rate = 2,
.prod_seg = 3,
- .support_fec = 1,
+ .fec_type = protocol::fec::FECType::UNKNOWN,
};
manifest1_.setParamsRTC(params);
diff --git a/libtransport/src/utils/max_filter.h b/libtransport/src/utils/max_filter.h
index 7a2c6aace..db1a1a565 100644
--- a/libtransport/src/utils/max_filter.h
+++ b/libtransport/src/utils/max_filter.h
@@ -28,7 +28,7 @@ class MaxFilter {
public:
MaxFilter(std::size_t size) : size_(size) {}
- std::size_t size() { return by_arrival_.size(); }
+ std::size_t size() const { return by_arrival_.size(); }
template <typename R>
void pushBack(R&& value) {
@@ -45,9 +45,9 @@ class MaxFilter {
by_order_.clear();
}
- const T& begin() { return *by_order_.crbegin(); }
+ const T& begin() const { return *by_order_.crbegin(); }
- const T& rBegin() { return *by_order_.rbegin(); }
+ const T& rBegin() const { return *by_order_.rbegin(); }
private:
std::multiset<T> by_order_;
diff --git a/libtransport/src/utils/min_filter.h b/libtransport/src/utils/min_filter.h
index 4c7ae81f1..4a3882601 100644
--- a/libtransport/src/utils/min_filter.h
+++ b/libtransport/src/utils/min_filter.h
@@ -28,7 +28,7 @@ class MinFilter {
public:
MinFilter(std::size_t size) : size_(size) {}
- std::size_t size() { return by_arrival_.size(); }
+ std::size_t size() const { return by_arrival_.size(); }
template <typename R>
void pushBack(R&& value) {
@@ -45,9 +45,9 @@ class MinFilter {
by_order_.clear();
}
- const T& begin() { return *by_order_.cbegin(); }
+ const T& begin() const { return *by_order_.cbegin(); }
- const T& rBegin() { return *by_order_.crbegin(); }
+ const T& rBegin() const { return *by_order_.crbegin(); }
private:
std::multiset<T> by_order_;
diff --git a/tests/.env b/tests/.env
index 35c861bda..1d40e4dea 100644
--- a/tests/.env
+++ b/tests/.env
@@ -1,4 +1,5 @@
# Topologies
+TOPOLOGY_1_NODE=1-node
TOPOLOGY_2_NODES=2-nodes
# Container names
diff --git a/tests/1-node.yml b/tests/1-node.yml
new file mode 100644
index 000000000..5cd8bd46c
--- /dev/null
+++ b/tests/1-node.yml
@@ -0,0 +1,26 @@
+version: "3"
+services:
+ client:
+ build:
+ context: ..
+ dockerfile: ${DOCKERFILE}
+ args:
+ BASE_IMAGE: ${BASE_IMAGE}
+ image: hicn-base
+ privileged: true
+ container_name: forwarder
+ working_dir: /workspace
+ volumes:
+ - ..:/workspace
+ entrypoint: [/bin/bash, -ex, -c]
+ command:
+ - |
+ if [ -d /workspace/build-dev ]; then
+ ninja -C /workspace/build-dev install
+ fi
+
+ sudo ip addr add 192.168.1.1/24 dev eth0
+ sudo hicn-light-daemon \
+ --daemon --log-file /tmp/lite_client.log
+
+ tail -f /dev/null
diff --git a/tests/2-nodes-hicn-light.yml b/tests/2-nodes-hicn-light.yml
index e62a6c705..318a5ccbd 100644
--- a/tests/2-nodes-hicn-light.yml
+++ b/tests/2-nodes-hicn-light.yml
@@ -25,7 +25,7 @@ services:
sudo hicn-light-daemon \
--daemon \
--log-file /tmp/lite_client.log \
- --config /tmp/hicn-light.conf --capacity 0
+ --config /tmp/hicn-light.conf
tail -f /dev/null
diff --git a/tests/2-nodes.yml b/tests/2-nodes.yml
index ce87e5876..72dcc2298 100644
--- a/tests/2-nodes.yml
+++ b/tests/2-nodes.yml
@@ -6,6 +6,7 @@ services:
dockerfile: ${DOCKERFILE}
args:
BASE_IMAGE: ${BASE_IMAGE}
+ image: hicn-base
privileged: true
stdin_open: true
hostname: client
diff --git a/tests/Makefile b/tests/Makefile
new file mode 100644
index 000000000..6e85a717e
--- /dev/null
+++ b/tests/Makefile
@@ -0,0 +1,35 @@
+# Use when building for the fist time,
+# then `make test` forces a rebuild if local changes
+build:
+ BASE_IMAGE=hicn DOCKERFILE=Dockerfile.dev BUILD_SOFTWARE=1 \
+ docker-compose -f build.yml up --force-recreate --remove-orphans
+
+# Rebuild from scratch (to avoid cmake cache)
+rebuild:
+ BASE_IMAGE=hicn DOCKERFILE=Dockerfile.dev BUILD_SOFTWARE=1 \
+ REBUILD=1 \
+ docker-compose -f build.yml up --force-recreate --remove-orphans
+
+# Force base image creation from scratch
+rebase:
+ DOCKERFILE=Dockerfile.dev \
+ docker-compose -f build.yml build --pull --no-cache
+
+# If local changes, hicn is re-built
+test:
+ docker-compose -f 1-node.yml up --force-recreate --remove-orphans -d
+
+log:
+ docker exec forwarder tail -f -n +1 /tmp/lite_client.log
+
+shell:
+ docker exec -it forwarder bash
+
+down:
+ docker-compose -f 1-node.yml down
+
+functional:
+ sleep 1 # Wait for the forwarder to be ready
+ bash config.sh test_listeners
+ bash config.sh test_connections
+ bash config.sh test_routes \ No newline at end of file
diff --git a/tests/config.sh b/tests/config.sh
index bcd27e0b9..d504b1c97 100755
--- a/tests/config.sh
+++ b/tests/config.sh
@@ -118,7 +118,6 @@ function conf_exists() {
[[ "${configurations[*]}" =~ ${1} ]] && return 0 || return 1
}
-
# test-name client/server link-model
function setchannel() {
if ! conf_exists "${1}"; then
@@ -184,8 +183,16 @@ function setup() {
error "Error: topology does not exist."
fi
+ if [[ "${topology}" == "1-node" && "${conf}" == "None" ]]; then
+ docker-compose -f "${topology}".yml build
+ docker-compose -f "${topology}".yml up --remove-orphans --force-recreate -d
+
+ sleep 1
+ return
+ fi
+
if ! conf_exists "${conf}"; then
- error "Error: topology does not exist."
+ error "Error: topology conf does not exist."
fi
docker-compose -f "${topology}".yml -f "${topology}-${conf}".yml build
@@ -219,6 +226,11 @@ function stop() {
error "Error: topology does not exist."
fi
+ if [[ "${topology}" == "1-node" && "${conf}" == "None" ]]; then
+ docker-compose -f "${topology}".yml down || true
+ return
+ fi
+
if ! conf_exists "${conf}"; then
error "Error: tect configuration does not exist."
fi
@@ -240,6 +252,216 @@ function runtest() {
echo "${@}" | sudo -i
}
+################################################################
+# Test commands (hicn-light-control)
+################################################################
+INTERFACE="eth0"
+ADDRESS="192.168.1.1"
+LISTENER_NAME="udp0"
+LISTENER_NAME_2="udp1"
+CONN_NAME="conn0"
+CONN_NAME_2="conn1"
+PREFIX="b001::/16"
+COST=1
+
+#---------------------------------------------------------------
+# Helpers
+#---------------------------------------------------------------
+function exec_command() {
+ command=$1
+
+ output=$(docker exec forwarder hicn-light-control $command 2>&1)
+ echo "$output"
+}
+
+function assert_cmd_success() {
+ command=$1
+ output=$(exec_command "${command}")
+
+ if [[ -z "$output" ]]; then
+ echo "OK"
+ else
+ echo "FAILED"
+ exit 0
+ fi
+}
+
+function assert_cmd_failure() {
+ command=$1
+ output=$(exec_command "${command}")
+
+ if [[ ! -z "$output" ]]; then
+ echo "OK"
+ else
+ echo "FAILED"
+ exit 0
+ fi
+}
+
+#---------------------------------------------------------------
+# Tests for listeners, connections, routes
+#---------------------------------------------------------------
+function test_listeners() {
+ echo -n "Add listeners: "
+ command="add listener udp $LISTENER_NAME $ADDRESS 9695 $INTERFACE"
+ _=$(exec_command "${command}")
+ command="add listener udp $LISTENER_NAME_2 127.0.0.1 12345 $INTERFACE"
+ assert_cmd_success "${command}"
+
+ echo -n "List listeners: "
+ command="list listener"
+ output=$(exec_command "${command}")
+
+ if [[ "${output}" =~ "udp0 inet4://192.168.1.1:9695" &&
+ "${output}" =~ "udp1 inet4://127.0.0.1:12345" &&
+ "${output}" =~ "interface=lo" &&
+ "${output}" =~ "interface=$INTERFACE" &&
+ ! "${output}" =~ "ERROR" ]]; then
+ echo "OK"
+ else
+ echo "FAILED"
+ echo $output
+ exit 0
+ fi
+
+ echo -n "Remove listener using symbolic: "
+ command="remove listener $LISTENER_NAME"
+ assert_cmd_success "${command}"
+
+ echo -n "Remove listener using ID: "
+ command="remove listener 2"
+ assert_cmd_success "${command}"
+
+ echo -n "Remove non-existing listener using symbolic: "
+ command="remove listener $LISTENER_NAME_2"
+ assert_cmd_failure "${command}"
+
+ echo -n "Remove non-existing listener using ID: "
+ command="remove listener 5"
+ assert_cmd_failure "${command}"
+
+ echo -n "Add duplicated listener (same symbolic): "
+ command="add listener udp $LISTENER_NAME $ADDRESS 9695 $INTERFACE"
+ _=$(exec_command "${command}")
+ command="add listener udp $LISTENER_NAME 127.0.0.1 12345 $INTERFACE"
+ assert_cmd_failure "${command}"
+
+ echo -n "Add duplicated listener (same endpoints): "
+ command="add listener udp $LISTENER_NAME $ADDRESS 9695 $INTERFACE"
+ _=$(exec_command "${command}")
+ command="add listener udp $LISTENER_NAME_2 $ADDRESS 9695 $INTERFACE"
+ assert_cmd_failure "${command}"
+}
+
+function test_connections() {
+ echo -n "Add connections: "
+ command="add listener udp $LISTENER_NAME $ADDRESS 9695 $INTERFACE"
+ _=$(exec_command "${command}")
+ command="add connection udp $CONN_NAME $ADDRESS 9695 $ADDRESS 9695 $INTERFACE"
+ _=$(exec_command "${command}")
+ command="add connection udp $CONN_NAME_2 $ADDRESS 12345 $ADDRESS 9695 $INTERFACE"
+ assert_cmd_success "${command}"
+
+ echo -n "List connections: "
+ command="list connection"
+ output=$(exec_command "${command}")
+
+ if [[ "${output}" =~ "inet4://192.168.1.1:12345" &&
+ "${output}" =~ "inet4://192.168.1.1:9695" &&
+ "${output}" =~ "conn0" && "${output}" =~ "conn1" &&
+ ! "${output}" =~ "ERROR" ]]; then
+ echo "OK"
+ else
+ echo "FAILED"
+ echo $output
+ exit 0
+ fi
+
+ echo -n "Remove connection using symbolic: "
+ command="remove connection $CONN_NAME"
+ assert_cmd_success "${command}"
+
+ echo -n "Remove connection using ID: "
+ command="remove connection 2"
+ assert_cmd_success "${command}"
+
+ echo -n "Remove non-existing connection using symbolic: "
+ command="remove connection $CONN_NAME"
+ assert_cmd_failure "${command}"
+
+ echo -n "Remove non-existing connection using ID: "
+ command="remove connection 5"
+ assert_cmd_failure "${command}"
+
+ echo -n "Add duplicated connection (same symbolic): "
+ command="add connection udp $CONN_NAME $ADDRESS 9695 $ADDRESS 9695 $INTERFACE"
+ _=$(exec_command "${command}")
+ command="add connection udp $CONN_NAME $ADDRESS 9695 $ADDRESS 12345 $INTERFACE"
+ assert_cmd_failure "${command}"
+
+ # This case is allowed, success code is returned and symbolic is not updated
+ echo -n "Add duplicated connection (different symbolic, same endpoints): "
+ command="add connection udp $CONN_NAME_2 $ADDRESS 9695 $ADDRESS 9695 $INTERFACE"
+ assert_cmd_success "${command}"
+}
+
+function test_routes() {
+ echo -n "Add route: "
+ command="add listener udp $LISTENER_NAME $ADDRESS 9695 $INTERFACE"
+ _=$(exec_command "${command}")
+ command="add connection udp $CONN_NAME $ADDRESS 9695 $ADDRESS 9695 $INTERFACE"
+ _=$(exec_command "${command}")
+ command="add route $CONN_NAME $PREFIX $COST"
+ assert_cmd_success "${command}"
+
+ echo -n "List routes: "
+ command="list route"
+ output=$(exec_command "${command}")
+
+ if [[ "${output}" =~ "b001::" && "${output}" =~ "16" &&
+ ! "${output}" =~ "ERROR" ]]; then
+ echo "OK"
+ else
+ echo "FAILED"
+ echo "$output"
+ exit 0
+ fi
+
+ echo -n "Remove route using symbolic: "
+ command="remove route $CONN_NAME $PREFIX"
+ assert_cmd_success "${command}"
+
+ echo -n "Remove route using ID: "
+ command="add route $CONN_NAME $PREFIX $COST"
+ _=$(exec_command "${command}")
+ command="remove route 1 $PREFIX"
+ assert_cmd_success "${command}"
+
+ echo -n "Remove non-existing route using symbolic: "
+ command="remove connection $CONN_NAME $PREFIX"
+ assert_cmd_failure "${command}"
+
+ echo -n "Remove non-existing route using ID: "
+ command="remove route 5 $PREFIX"
+ assert_cmd_failure "${command}"
+}
+
+declare -A ctrl_tests=(
+ ["listeners"]="test_listeners"
+ ["connections"]="test_connections"
+ ["routes"]="test_routes"
+)
+
+function ctrl() {
+ type=$1
+ if [[ ! -v "ctrl_tests[${type}]" ]]; then
+ error "Error: hicn-light-contrl test does not exist."
+ exit 1
+ fi
+
+ ${ctrl_tests[${type}]}
+}
+
while (("${#}")); do
case "$1" in
'build')
@@ -281,9 +503,13 @@ while (("${#}")); do
runtest "${@:2}"
break
;;
- *)
+ 'ctrl')
+ ctrl "${2}"
break
;;
+ *)
+ exit 1
+ ;;
esac
done
diff --git a/tests/functional-tests/hicn-light-control.robot b/tests/functional-tests/hicn-light-control.robot
new file mode 100644
index 000000000..e29fc51d6
--- /dev/null
+++ b/tests/functional-tests/hicn-light-control.robot
@@ -0,0 +1,29 @@
+*** Settings ***
+Resource resources/libraries/robot/common.robot
+Test Setup Run Keywords
+... Build Topology 1-node AND
+... Check Environment
+Test Teardown Run Keywords
+... Destroy Topology
+
+*** Test Cases ***
+Listeners
+ Log to console Test listeners
+ ${result} = Run Process ${EXECDIR}/config.sh ctrl listeners
+ Log Many stdout: ${result.stdout}
+ Should Be Equal As Integers ${result.rc} 0
+ Should Not Contain ${result.stdout} FAILED
+
+Connections
+ Log to console Test connections
+ ${result} = Run Process ${EXECDIR}/config.sh ctrl connections
+ Log Many stdout: ${result.stdout}
+ Should Be Equal As Integers ${result.rc} 0
+ Should Not Contain ${result.stdout} FAILED
+
+Routes
+ Log to console Test routes
+ ${result} = Run Process ${EXECDIR}/config.sh ctrl routes
+ Log Many stdout: ${result.stdout}
+ Should Be Equal As Integers ${result.rc} 0
+ Should Not Contain ${result.stdout} FAILED
diff --git a/tests/resources/libraries/robot/common.robot b/tests/resources/libraries/robot/common.robot
index c1e3f20a4..7550db788 100644
--- a/tests/resources/libraries/robot/common.robot
+++ b/tests/resources/libraries/robot/common.robot
@@ -13,6 +13,7 @@ Build Topology
${result_setup} = Run Process ${EXECDIR}/config.sh build setup ${TEST_TOPOLOGY} ${TEST_CONFIGURATION}
Log to console Done
Log Many stdout: ${result_setup.stdout} stderr: ${result_setup.stderr}
+ Should Be Equal As Integers ${result_setup.rc} 0
Check Environment
${result} = Run Process docker ps
@@ -21,3 +22,4 @@ Check Environment
Destroy Topology
${result_teardown} = Run Process ${EXECDIR}/config.sh stopall
Log Many stdout: ${result_teardown.stdout} stderr: ${result_teardown.stderr}
+ Should Be Equal As Integers ${result_teardown.rc} 0