aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--cmake/Modules/Packager.cmake49
-rw-r--r--cmake/Modules/ServiceScript.cmake42
-rw-r--r--ctrl/libhicnctrl/includes/hicn/ctrl/api.h106
-rwxr-xr-xctrl/libhicnctrl/includes/hicn/ctrl/commands.h2
-rw-r--r--ctrl/libhicnctrl/src/CMakeLists.txt1
-rw-r--r--ctrl/libhicnctrl/src/api.c908
-rw-r--r--hicn-light/CMakeLists.txt10
-rw-r--r--hicn-light/src/hicn/command_line/controller/hicnLightControl_main.c1
-rw-r--r--hicn-light/src/hicn/config/CMakeLists.txt2
-rw-r--r--hicn-light/src/hicn/config/configuration.c82
-rw-r--r--hicn-light/src/hicn/config/configurationListeners.c4
-rw-r--r--hicn-light/src/hicn/config/controlAddConnection.c22
-rw-r--r--hicn-light/src/hicn/config/controlAddListener.c55
-rw-r--r--hicn-light/src/hicn/config/controlListConnections.c1
-rw-r--r--hicn-light/src/hicn/config/controlRemove.c7
-rw-r--r--hicn-light/src/hicn/config/controlRemoveListener.c115
-rw-r--r--hicn-light/src/hicn/config/controlRemoveListener.h31
-rw-r--r--hicn-light/src/hicn/config/symbolicNameTable.c38
-rw-r--r--hicn-light/src/hicn/core/connection.c1
-rw-r--r--hicn-light/src/hicn/io/hicnListener.c202
-rw-r--r--hicn-light/src/hicn/io/ioOperations.c2
-rw-r--r--hicn-light/src/hicn/io/listenerSet.c14
-rw-r--r--hicn-light/src/hicn/io/listenerSet.h16
-rw-r--r--hicn-light/src/hicn/io/streamConnection.c1
-rw-r--r--hicn-light/src/hicn/io/tcpListener.c11
-rw-r--r--hicn-light/src/hicn/io/udpListener.c3
-rw-r--r--hicn-light/src/hicn/processor/messageProcessor.c18
-rw-r--r--hicn-light/src/hicn/socket/ops_linux.c25
-rw-r--r--hicn-light/src/hicn/utils/commands.h29
-rw-r--r--hicn-light/src/hicn/utils/utils.c1
-rw-r--r--hicn-plugin/src/data_pcslookup_node.c52
-rw-r--r--hicn-plugin/src/pcs.h28
-rw-r--r--lib/src/name.c2
-rw-r--r--lib/src/util/ip_address.c11
-rw-r--r--libtransport/src/hicn/transport/CMakeLists.txt2
-rw-r--r--libtransport/src/hicn/transport/config.h.in4
-rw-r--r--libtransport/src/hicn/transport/core/portal.h12
-rw-r--r--libtransport/src/hicn/transport/interfaces/CMakeLists.txt1
-rw-r--r--libtransport/src/hicn/transport/interfaces/callbacks.cc26
-rw-r--r--libtransport/src/hicn/transport/interfaces/callbacks.h4
-rw-r--r--libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc203
-rw-r--r--libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h2
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket.h2
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_consumer.cc717
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_consumer.h673
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_options_default_values.h2
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_options_keys.h31
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_producer.cc659
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_producer.h508
-rw-r--r--libtransport/src/hicn/transport/protocols/protocol.cc15
-rw-r--r--libtransport/src/hicn/transport/protocols/protocol.h6
-rw-r--r--libtransport/src/hicn/transport/protocols/raaqm.cc40
-rw-r--r--libtransport/src/hicn/transport/protocols/rtc.cc316
-rw-r--r--libtransport/src/hicn/transport/protocols/rtc.h5
-rw-r--r--libtransport/src/hicn/transport/utils/content_store.cc7
-rw-r--r--libtransport/src/hicn/transport/utils/content_store.h5
-rw-r--r--utils/src/hiperf.cc17
57 files changed, 3194 insertions, 1955 deletions
diff --git a/cmake/Modules/Packager.cmake b/cmake/Modules/Packager.cmake
index 898e40bcb..6f77fd3a8 100644
--- a/cmake/Modules/Packager.cmake
+++ b/cmake/Modules/Packager.cmake
@@ -33,10 +33,35 @@ function(get_next_version VERSION NEXT_VERSION)
math(EXPR major "${major} + 1")
endif()
- set(minor "0${minor}")
+ if (minor LESS 10)
+ set(minor "0${minor}")
+ endif()
+
set(${NEXT_VERSION} "${major}.${minor}" PARENT_SCOPE)
endfunction()
+macro(extract_version)
+ # Extract version from git
+ execute_process(
+ COMMAND git describe --long --match v*
+ WORKING_DIRECTORY ${PROJECT_SOURCE_DIR}
+ OUTPUT_VARIABLE VER
+ OUTPUT_STRIP_TRAILING_WHITESPACE
+ )
+
+ if (NOT VER)
+ set(VER "v1.0-0-gcafe")
+ endif()
+
+ message(STATUS "Git describe output: ${VER}")
+
+ string(REGEX REPLACE "v([0-9]+).([0-9]+)-([0-9]+)-(g[0-9a-f]+)" "\\1;\\2;\\3;\\4" VER ${VER})
+ list(GET VER 0 VERSION_MAJOR)
+ list(GET VER 1 VERSION_MINOR)
+ list(GET VER 2 VERSION_REVISION)
+ list(GET VER 3 COMMIT_NAME)
+endmacro(extract_version)
+
macro(make_packages)
if ("${CMAKE_SYSTEM_NAME}" STREQUAL "Linux")
# parse /etc/os-release
@@ -49,23 +74,17 @@ macro(make_packages)
set(OS_${_name} ${_value})
endforeach()
- # extract version from git
- execute_process(
- COMMAND git describe --long --match v*
- WORKING_DIRECTORY ${PROJECT_SOURCE_DIR}
- OUTPUT_VARIABLE VER
- OUTPUT_STRIP_TRAILING_WHITESPACE
- )
+ extract_version()
- if (NOT VER)
- set(VER "v1.0-1-gcafe")
- endif()
+ message(STATUS "Version major: ${VERSION_MAJOR}")
+ message(STATUS "Version minor: ${VERSION_MINOR}")
+ message(STATUS "Revision: ${VERSION_REVISION}")
+ message(STATUS "Commit hash: ${COMMIT_NAME}")
- string(REGEX REPLACE "v(.*)-([0-9]+)-(g[0-9a-f]+)" "\\1;\\2;\\3" VER ${VER})
- list(GET VER 0 tag)
+ set(tag "${VERSION_MAJOR}.${VERSION_MINOR}")
string(REPLACE "-" "~" tag ${tag})
- list(GET VER 1 commit_num)
- list(GET VER 2 commit_name)
+ set(commit_num ${VERSION_REVISION})
+ set(commit_name ${COMMIT_NAME})
if (NOT DEFINED ENV{BUILD_NUMBER})
set(bld "b1")
diff --git a/cmake/Modules/ServiceScript.cmake b/cmake/Modules/ServiceScript.cmake
new file mode 100644
index 000000000..110aa816b
--- /dev/null
+++ b/cmake/Modules/ServiceScript.cmake
@@ -0,0 +1,42 @@
+# Copyright (c) 2019 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+########################################
+#
+# Find the LongBow libraries and includes
+# This module sets:
+# SYSTEMD_FOUND: True if Systemd was found
+# SYSTEMD_SERVICES_INSTALL_DIR: The Systemd install directory
+
+set(SYSTEMD_SERVICE_FOLDER "/lib/systemd/system")
+
+macro(install_service_script script)
+cmake_parse_arguments(ARG
+ ""
+ "COMPONENT"
+ ""
+ ${ARGN}
+)
+
+ # Install service file only if
+ # 1) We are on a linux system
+ # 2) The installation prefix is /usr
+
+ if (NOT ARG_COMPONENT)
+ set(ARG_COMPONENT hicn)
+ endif()
+
+ if (${CMAKE_SYSTEM_NAME} STREQUAL "Linux" AND ${CMAKE_INSTALL_PREFIX} STREQUAL "/usr")
+ install (FILES ${script} DESTINATION ${SYSTEMD_SERVICE_FOLDER} COMPONENT ${ARG_COMPONENT})
+ endif()
+endmacro(install_service_script) \ No newline at end of file
diff --git a/ctrl/libhicnctrl/includes/hicn/ctrl/api.h b/ctrl/libhicnctrl/includes/hicn/ctrl/api.h
index a0ee828b9..07c98514d 100644
--- a/ctrl/libhicnctrl/includes/hicn/ctrl/api.h
+++ b/ctrl/libhicnctrl/includes/hicn/ctrl/api.h
@@ -73,11 +73,6 @@
#define HICN_DEFAULT_PORT 9695
-#define LIBHICNCTRL_SUCCESS 0
-#define LIBHICNCTRL_FAILURE -1
-#define LIBHICNCTRL_NOT_IMPLEMENTED -99
-#define LIBHICNCTRL_IS_ERROR(x) (x < 0)
-
/* Helper for avoiding warnings about type-punning */
#define UNION_CAST(x, destType) \
(((union {__typeof__(x) a; destType b;})x).b)
@@ -219,11 +214,11 @@ hc_ ## TYPE ## _find(hc_data_t * data, const hc_ ## TYPE ## _t * element, \
foreach_type(hc_ ## TYPE ## _t, x, data) { \
if (hc_ ## TYPE ## _cmp(x, element) >= 0) { \
*found = x; \
- return LIBHICNCTRL_SUCCESS; \
+ return 0; \
} \
}; \
*found = NULL; /* this is optional */ \
- return LIBHICNCTRL_SUCCESS; \
+ return 0; \
}
/******************************************************************************
@@ -236,113 +231,103 @@ hc_ ## TYPE ## _find(hc_data_t * data, const hc_ ## TYPE ## _t * element, \
/**
* \brief Holds the state of an hICN control socket
*/
-typedef struct {
- char * url;
- int fd;
- u32 seq;
-
- /* Partial receive buffer */
- u8 buf[RECV_BUFLEN];
- size_t roff; /**< Read offset */
- size_t woff; /**< Write offset */
-
- /*
- * Because received messages are potentially unbounded in size, we might not
- * guarantee that we can store a full packet before processing it. We must
- * implement a very simple state machine remembering the current parsing
- * status in order to partially process the packet.
- */
- size_t remaining;
- u32 send_id;
- u32 send_seq;
- u32 recv_seq;
-} hc_sock_t;
+typedef struct hc_sock_s hc_sock_t;
/**
* \brief Create an hICN control socket using the specified URL.
* \param [in] url - The URL to connect to.
* \return an hICN control socket
*/
-hc_sock_t *
-hc_sock_create_url(const char * url);
+hc_sock_t * hc_sock_create_url(const char * url);
/**
* \brief Create an hICN control socket using the default connection type.
* \return an hICN control socket
*/
-hc_sock_t *
-hc_sock_create(void);
+hc_sock_t * hc_sock_create(void);
/**
* \brief Frees an hICN control socket
+ * \param [in] s - hICN control socket
*/
-void
-hc_sock_free(hc_sock_t *s);
+void hc_sock_free(hc_sock_t * s);
+
+/**
+ * \brief Returns the next available sequence number to use for requests to the
+ * API.
+ * \param [in] s - hICN control socket
+ */
+int hc_sock_get_next_seq(hc_sock_t * s);
/**
* \brief Sets the socket as non-blocking
+ * \param [in] s - hICN control socket
* \return Error code
*/
-int
-hc_sock_set_nonblocking(hc_sock_t *s);
+int hc_sock_set_nonblocking(hc_sock_t * s);
+
+/**
+ * \brief Return the file descriptor associated to the hICN contorl sock
+ * \param [in] s - hICN control socket
+ * \return The file descriptor (positive value), or a negative integer in case
+ * of error
+ */
+int hc_sock_get_fd(hc_sock_t * s);
/**
* \brief Connect the socket
* \return Error code
*/
int
-hc_sock_connect(hc_sock_t *s);
+hc_sock_connect(hc_sock_t * s);
/**
* \brief Return the offset and size of available buffer space
- * \param [in] sock - hICN control socket
+ * \param [in] s - hICN control socket
* \param [out] buffer - Offset in buffer
* \param [out] size - Remaining size
* \return Error code
*/
-int
-hc_sock_get_available(hc_sock_t * s, u8 ** buffer, size_t * size);
+int hc_sock_get_available(hc_sock_t * s, u8 ** buffer, size_t * size);
/**
* \brief Write/read iexchance on the control socket (internal helper function)
- * \param [in] sock - hICN control socket
+ * \param [in] s - hICN control socket
* \param [in] msg - Message to send
* \param [in] msglen - Length of the message to send
* \return Error code
*/
-int
-hc_sock_send(hc_sock_t * s, hc_msg_t * msg, size_t msglen);
+int hc_sock_send(hc_sock_t * s, hc_msg_t * msg, size_t msglen, int seq);
/**
* \brief Helper for reading socket contents
- * \param [in] sock - hICN control socket
- * \param [in] data - Result data buffer
- * \param [in] parse - Parse function to convert remote types into lib native
- * types, or NULL not to perform any translation.
+ * \param [in] s - hICN control socket
* \return Error code
*/
-int
-hc_sock_recv(hc_sock_t * s, hc_data_t * data);
+int hc_sock_recv(hc_sock_t * s);
/**
* \brief Processing data received by socket
- * \param [in] sock - hICN control socket
- * \param [in] data - Result data buffer
+ * \param [in] s - hICN control socket
* \param [in] parse - Parse function to convert remote types into lib native
* types, or NULL not to perform any translation.
* \return Error code
*/
-int
-hc_sock_process(hc_sock_t * s, hc_data_t * data,
- int (*parse)(const u8 * src, u8 * dst));
+int hc_sock_process(hc_sock_t * s, hc_data_t ** data);
+
+/**
+ * \brief Callback used in async mode when data is available on the socket
+ * \param [in] s - hICN control socket
+ * \return Error code
+ */
+int hc_sock_callback(hc_sock_t * s, hc_data_t ** data);
/**
* \brief Reset the state of the sock (eg. to handle a reconnecton)
- * \param [in] sock - hICN control socket
+ * \param [in] s - hICN control socket
* \return Error code
*/
-int
-hc_sock_reset(hc_sock_t * s);
+int hc_sock_reset(hc_sock_t * s);
/******************************************************************************
* Command-specific structures and functions
@@ -471,7 +456,7 @@ typedef struct {
int hc_listener_create(hc_sock_t * s, hc_listener_t * listener);
/* listener_found might eventually be allocated, and needs to be freed */
-int hc_listener_get(hc_sock_t *s, hc_listener_t * listener,
+int hc_listener_get(hc_sock_t * s, hc_listener_t * listener,
hc_listener_t ** listener_found);
int hc_listener_delete(hc_sock_t * s, hc_listener_t * listener);
int hc_listener_list(hc_sock_t * s, hc_data_t ** pdata);
@@ -518,7 +503,7 @@ typedef struct {
int hc_connection_create(hc_sock_t * s, hc_connection_t * connection);
/* connection_found will be allocated, and must be freed */
-int hc_connection_get(hc_sock_t *s, hc_connection_t * connection,
+int hc_connection_get(hc_sock_t * s, hc_connection_t * connection,
hc_connection_t ** connection_found);
int hc_connection_update_by_id(hc_sock_t * s, int hc_connection_id,
hc_connection_t * connection);
@@ -579,13 +564,14 @@ int hc_face_create(hc_sock_t * s, hc_face_t * face);
int hc_face_get(hc_sock_t * s, hc_face_t * face, hc_face_t ** face_found);
int hc_face_delete(hc_sock_t * s, hc_face_t * face);
int hc_face_list(hc_sock_t * s, hc_data_t ** pdata);
+int hc_face_list_async(hc_sock_t * s); //, hc_data_t ** pdata);
#define foreach_face(VAR, data) foreach_type(hc_face_t, VAR, data)
#define MAX_FACE_ID 255
#define MAXSZ_FACE_ID_ 3
#define MAXSZ_FACE_ID MAXSZ_FACE_ID_ + NULLTERM
-#define MAXSZ_FACE_NAME_ NAMELEN
+#define MAXSZ_FACE_NAME_ NAME_LEN
#define MAXSZ_FACE_NAME MAXSZ_FACE_NAME_ + NULLTERM
#define MAXSZ_HC_FACE_ MAXSZ_FACE_ID_ + MAXSZ_FACE_NAME_ + MAXSZ_FACE_ + 5
diff --git a/ctrl/libhicnctrl/includes/hicn/ctrl/commands.h b/ctrl/libhicnctrl/includes/hicn/ctrl/commands.h
index 4209c6eb6..75f05988f 100755
--- a/ctrl/libhicnctrl/includes/hicn/ctrl/commands.h
+++ b/ctrl/libhicnctrl/includes/hicn/ctrl/commands.h
@@ -58,6 +58,7 @@ typedef enum {
ADD_ROUTE,
LIST_ROUTES,
REMOVE_CONNECTION,
+ REMOVE_LISTENER,
REMOVE_ROUTE,
CACHE_STORE,
CACHE_SERVE,
@@ -77,7 +78,6 @@ typedef enum {
REMOVE_POLICY,
UPDATE_CONNECTION,
#endif /* WITH_POLICY */
- REMOVE_LISTENER,
LAST_COMMAND_VALUE
} command_id;
diff --git a/ctrl/libhicnctrl/src/CMakeLists.txt b/ctrl/libhicnctrl/src/CMakeLists.txt
index 7b4413d55..4708595e0 100644
--- a/ctrl/libhicnctrl/src/CMakeLists.txt
+++ b/ctrl/libhicnctrl/src/CMakeLists.txt
@@ -24,6 +24,7 @@ set(HEADER_FILES
set(UTIL_HEADER_FILES
face.h
util/log.h
+ util/map.h
)
set(SOURCE_FILES
diff --git a/ctrl/libhicnctrl/src/api.c b/ctrl/libhicnctrl/src/api.c
index 769c96076..aa1ec8edd 100644
--- a/ctrl/libhicnctrl/src/api.c
+++ b/ctrl/libhicnctrl/src/api.c
@@ -20,6 +20,7 @@
#include <assert.h> // assert
#include <math.h> // log2
+#include <stdbool.h>
#include <stdio.h> // snprintf
#include <string.h> // memmove, strcasecmp
#include <sys/socket.h> // socket
@@ -30,10 +31,83 @@
#include <hicn/ctrl/commands.h>
#include <hicn/util/token.h>
#include "util/log.h"
+#include "util/map.h"
#include <strings.h>
#define PORT 9695
+/*
+ * Internal state associated to a pending request
+ */
+typedef struct {
+ int seq;
+ hc_data_t * data;
+ /* Information used to process results */
+ int size_in;
+ int (*parse)(const u8 * src, u8 * dst);
+} hc_sock_request_t;
+
+/**
+ * Messages to the forwarder might be multiplexed thanks to the seqNum fields in
+ * the header_control_message structure. The forwarder simply answers back the
+ * original sequence number. We maintain a map of such sequence number to
+ * outgoing queries so that replied can be demultiplexed and treated
+ * appropriately.
+ */
+TYPEDEF_MAP_H(hc_sock_map, int, hc_sock_request_t *);
+TYPEDEF_MAP(hc_sock_map, int, hc_sock_request_t *, int_cmp, int_snprintf, generic_snprintf);
+
+struct hc_sock_s {
+ char * url;
+ int fd;
+
+ /* Partial receive buffer */
+ u8 buf[RECV_BUFLEN];
+ size_t roff; /**< Read offset */
+ size_t woff; /**< Write offset */
+
+ /*
+ * Because received messages are potentially unbounded in size, we might not
+ * guarantee that we can store a full packet before processing it. We must
+ * implement a very simple state machine remembering the current parsing
+ * status in order to partially process the packet.
+ */
+ size_t remaining;
+ u32 send_id;
+
+ /* Next sequence number to be used for requests */
+ int seq;
+
+ /* Request being parsed (NULL if none) */
+ hc_sock_request_t * cur_request;
+
+ bool async;
+ hc_sock_map_t * map;
+};
+
+
+hc_sock_request_t *
+hc_sock_request_create(int seq, hc_data_t * data, HC_PARSE parse)
+{
+ assert(seq >= 0);
+ assert(data);
+
+ hc_sock_request_t * request = malloc(sizeof(hc_sock_request_t));
+ if (!request)
+ return NULL;
+ request->seq = seq;
+ request->data = data;
+ request->parse = parse;
+ return request;
+}
+
+void
+hc_sock_request_free(hc_sock_request_t * request)
+{
+ free(request);
+}
+
+
#if 0
#ifdef __APPLE__
#define RANDBYTE() (u8)(arc4random() & 0xFF)
@@ -239,7 +313,7 @@ hc_data_create(size_t in_element_size, size_t out_element_size)
data->in_element_size = in_element_size;
data->out_element_size = out_element_size;
data->size = 0;
- data->complete = 0;
+ data->complete = false;
data->command_id = 0; // TODO this could also be a busy mark in the socket
/* No callback needed in blocking code for instance */
data->complete_cb = NULL;
@@ -274,21 +348,21 @@ hc_data_ensure_available(hc_data_t * data, size_t count)
data->max_size_log = new_size_log;
data->buffer = realloc(data->buffer, (1 << new_size_log) * data->out_element_size);
if (!data->buffer)
- return LIBHICNCTRL_FAILURE;
+ return -1;
}
- return LIBHICNCTRL_SUCCESS;
+ return 0;
}
int
hc_data_push_many(hc_data_t * data, const void * elements, size_t count)
{
if (hc_data_ensure_available(data, count) < 0)
- return LIBHICNCTRL_FAILURE;
+ return -1;
memcpy(data->buffer + data->size * data->out_element_size, elements,
count * data->out_element_size);
data->size += count;
- return LIBHICNCTRL_SUCCESS;
+ return 0;
}
int
@@ -316,7 +390,7 @@ hc_data_set_callback(hc_data_t * data, data_callback_t cb, void * cb_data)
{
data->complete_cb = cb;
data->complete_cb_data = cb_data;
- return LIBHICNCTRL_SUCCESS;
+ return 0;
}
int
@@ -325,14 +399,14 @@ hc_data_set_complete(hc_data_t * data)
data->complete = true;
if (data->complete_cb)
return data->complete_cb(data, data->complete_cb_data);
- return LIBHICNCTRL_SUCCESS;
+ return 0;
}
int
hc_data_reset(hc_data_t * data)
{
data->size = 0;
- return LIBHICNCTRL_SUCCESS;
+ return 0;
}
/******************************************************************************
@@ -377,10 +451,10 @@ hc_sock_parse_url(const char * url, struct sockaddr * sa)
break;
}
default:
- return LIBHICNCTRL_FAILURE;
+ return -1;
}
- return LIBHICNCTRL_SUCCESS;
+ return 0;
}
hc_sock_t *
@@ -399,9 +473,20 @@ hc_sock_create_url(const char * url)
if (hc_sock_reset(s) < 0)
goto ERR_RESET;
+ s->seq = 0;
+ s->cur_request = NULL;
+
+ s->map = hc_sock_map_create();
+ if (!s->map)
+ goto ERR_MAP;
+
return s;
+ //hc_sock_map_free(s->map);
+ERR_MAP:
ERR_RESET:
+ if (s->url)
+ free(s->url);
close(s->fd);
ERR_SOCKET:
free(s);
@@ -418,6 +503,18 @@ hc_sock_create(void)
void
hc_sock_free(hc_sock_t * s)
{
+ hc_sock_request_t ** request_array = NULL;
+ int n = hc_sock_map_get_value_array(s->map, &request_array);
+ if (n < 0) {
+ ERROR("Could not retrieve pending request array for freeing up resources");
+ } else {
+ for (unsigned i = 0; i < n; i++) {
+ hc_sock_request_t * request = request_array[i];
+ hc_sock_request_free(request);
+ }
+ free(request_array);
+ }
+ hc_sock_map_free(s->map);
if (s->url)
free(s->url);
close(s->fd);
@@ -425,13 +522,25 @@ hc_sock_free(hc_sock_t * s)
}
int
-hc_sock_set_nonblocking(hc_sock_t *s)
+hc_sock_get_next_seq(hc_sock_t * s)
+{
+ return s->seq++;
+}
+
+int
+hc_sock_set_nonblocking(hc_sock_t * s)
{
return (fcntl(s->fd, F_SETFL, fcntl(s->fd, F_GETFL) | O_NONBLOCK) < 0);
}
int
-hc_sock_connect(hc_sock_t *s)
+hc_sock_get_fd(hc_sock_t * s)
+{
+ return s->fd;
+}
+
+int
+hc_sock_connect(hc_sock_t * s)
{
struct sockaddr_storage ss = { 0 };
@@ -444,17 +553,18 @@ hc_sock_connect(hc_sock_t *s)
if (connect(s->fd, (struct sockaddr *)&ss, size) < 0) //sizeof(struct sockaddr)) < 0)
goto ERR_CONNECT;
- return LIBHICNCTRL_SUCCESS;
+ return 0;
ERR_CONNECT:
ERR_PARSE:
- return LIBHICNCTRL_FAILURE;
+ return -1;
}
int
-hc_sock_send(hc_sock_t * s, hc_msg_t * msg, size_t msglen)
+hc_sock_send(hc_sock_t * s, hc_msg_t * msg, size_t msglen, int seq)
{
int rc;
+ msg->hdr.seqNum = seq;
rc = send(s->fd, msg, msglen, 0);
if (rc < 0) {
perror("hc_sock_send");
@@ -469,11 +579,11 @@ hc_sock_get_available(hc_sock_t * s, u8 ** buffer, size_t * size)
*buffer = s->buf + s->woff;
*size = RECV_BUFLEN - s->woff;
- return LIBHICNCTRL_SUCCESS;
+ return 0;
}
int
-hc_sock_recv(hc_sock_t * s, hc_data_t * data)
+hc_sock_recv(hc_sock_t * s)
{
int rc;
@@ -485,24 +595,24 @@ hc_sock_recv(hc_sock_t * s, hc_data_t * data)
rc = recv(s->fd, s->buf + s->woff, RECV_BUFLEN - s->woff, 0);
if (rc == 0) {
- return LIBHICNCTRL_FAILURE;
/* Connection has been closed */
- // XXX
+ return 0;
}
if (rc < 0) {
+ /*
+ * Let's not return 0 which currently means the socket has been closed
+ */
+ if (errno == EWOULDBLOCK)
+ return -1;
perror("hc_sock_recv");
- /* Error occurred */
- // XXX check for EWOULDBLOCK;
- // XXX
- return LIBHICNCTRL_FAILURE;
+ return -1;
}
s->woff += rc;
- return LIBHICNCTRL_SUCCESS;
+ return rc;
}
int
-hc_sock_process(hc_sock_t * s, hc_data_t * data,
- int (*parse)(const u8 * src, u8 * dst))
+hc_sock_process(hc_sock_t * s, hc_data_t ** data)
{
int err = 0;
@@ -511,7 +621,7 @@ hc_sock_process(hc_sock_t * s, hc_data_t * data,
while(available > 0) {
- if (s->remaining == 0) {
+ if (!s->cur_request) { // No message being parsed, alternatively (remaining == 0)
hc_msg_t * msg = (hc_msg_t*)(s->buf + s->roff);
/* We expect a message header */
@@ -519,74 +629,82 @@ hc_sock_process(hc_sock_t * s, hc_data_t * data,
break;
/* Sanity checks (might instead raise warnings) */
- // TODO: sync check ?
assert((msg->hdr.messageType == RESPONSE_LIGHT) ||
(msg->hdr.messageType == ACK_LIGHT) ||
(msg->hdr.messageType == NACK_LIGHT));
- //assert(msg->hdr.commandID == data->command_id); // FIXME
- assert(msg->hdr.seqNum == s->recv_seq++);
+ hc_sock_request_t * request = NULL;
+ if (hc_sock_map_get(s->map, msg->hdr.seqNum, &request) < 0) {
+ ERROR("[hc_sock_process] Error searching for matching request");
+ return -1;
+ }
+ if (!request) {
+ ERROR("[hc_sock_process] No request matching received sequence number");
+ return -1;
+ }
s->remaining = msg->hdr.length;
if (s->remaining == 0) {
- /*
- * The protocol expects all sequence number to be reset after
- * each transaction. We reset before running the callback in
- * case it triggers new exchanges.
- */
- s->send_seq = HICN_CTRL_SEND_SEQ_INIT;
- s->recv_seq = HICN_CTRL_RECV_SEQ_INIT;
-
- // TODO : check before even sending ?
- /* Complete message without payload */
- // TODO : is this correct ? no error code ?
- hc_data_set_complete(data);
+ if (data) {
+ *data = request->data;
+// } else {
+// free(request->data);
+ }
+ hc_data_set_complete(request->data);
+ hc_sock_request_free(request);
+ } else {
+ /* We only remember it if there is still data to parse */
+ s->cur_request = request;
}
available -= sizeof(hc_msg_header_t);
s->roff += sizeof(hc_msg_header_t);
} else {
/* We expect the complete payload, or at least a chunk of it */
- size_t num_chunks = available / data->in_element_size;
+ size_t num_chunks = available / s->cur_request->data->in_element_size;
if (num_chunks == 0)
break;
if (num_chunks > s->remaining)
num_chunks = s->remaining;
- if (!parse) {
- hc_data_push_many(data, s->buf + s->roff, num_chunks);
+ if (!s->cur_request->parse) {
+ /* If we don't need to parse results, then we can directly push
+ * all of them into the result data structure */
+ hc_data_push_many(s->cur_request->data, s->buf + s->roff, num_chunks);
} else {
int rc;
- rc = hc_data_ensure_available(data, num_chunks);
+ rc = hc_data_ensure_available(s->cur_request->data, num_chunks);
if (rc < 0)
- return LIBHICNCTRL_FAILURE;
+ return -1;
for (int i = 0; i < num_chunks; i++) {
- u8 * dst = hc_data_get_next(data);
+ u8 * dst = hc_data_get_next(s->cur_request->data);
if (!dst)
- return LIBHICNCTRL_FAILURE;
+ return -1;
- rc = parse(s->buf + s->roff + i * data->in_element_size, dst);
+ rc = s->cur_request->parse(s->buf + s->roff + i * s->cur_request->data->in_element_size, dst);
if (rc < 0)
err = -1; /* FIXME we let the loop complete (?) */
- data->size++;
+ s->cur_request->data->size++;
}
}
-
s->remaining -= num_chunks;
+ available -= num_chunks * s->cur_request->data->in_element_size;
+ s->roff += num_chunks * s->cur_request->data->in_element_size;
if (s->remaining == 0) {
- /*
- * The protocol expects all sequence number to be reset after
- * each transaction. We reset before running the callback in
- * case it triggers new exchanges.
- */
- s->send_seq = HICN_CTRL_SEND_SEQ_INIT;
- s->recv_seq = HICN_CTRL_RECV_SEQ_INIT;
-
- hc_data_set_complete(data);
+ if (hc_sock_map_remove(s->map, s->cur_request->seq, NULL) < 0) {
+ ERROR("[hc_sock_process] Error removing request from map");
+ return -1;
+ }
+ if (data) {
+ *data = s->cur_request->data;
+// } else {
+// free(s->cur_request->data);
+ }
+ hc_data_set_complete(s->cur_request->data);
+ hc_sock_request_free(s->cur_request);
+ s->cur_request = NULL;
}
- available -= num_chunks * data->in_element_size;
- s->roff += num_chunks * data->in_element_size;
}
}
@@ -605,13 +723,48 @@ hc_sock_process(hc_sock_t * s, hc_data_t * data,
}
int
+hc_sock_callback(hc_sock_t * s, hc_data_t ** data)
+{
+ *data = NULL;
+
+ for (;;) {
+ int n = hc_sock_recv(s);
+ if (n == 0) {
+ DEBUG("EOF");
+ goto ERR_EOF;
+ }
+ if (n < 0) {
+ switch(errno) {
+ case ECONNRESET:
+ case ENODEV:
+ /* Forwarder restarted */
+ WARN("Forwarder likely restarted: not (yet) implemented");
+ goto ERR_EOF;
+ case EWOULDBLOCK:
+ //DEBUG("Would block... stop reading from socket");
+ goto END;
+ default:
+ perror("hc_sock_recv");
+ goto ERR_EOF;
+ }
+ }
+ if (hc_sock_process(s, data) < 0) {
+ return -1;
+ }
+ }
+END:
+ return 0;
+
+ERR_EOF:
+ return -1;
+}
+
+int
hc_sock_reset(hc_sock_t * s)
{
s->roff = s->woff = 0;
- s->send_seq = HICN_CTRL_SEND_SEQ_INIT;
- s->recv_seq = HICN_CTRL_RECV_SEQ_INIT;
s->remaining = 0;
- return LIBHICNCTRL_SUCCESS;
+ return 0;
}
/******************************************************************************
@@ -630,8 +783,11 @@ typedef struct {
int
hc_execute_command(hc_sock_t * s, hc_msg_t * msg, size_t msg_len,
- hc_command_params_t * params, hc_data_t ** pdata)
+ hc_command_params_t * params, hc_data_t ** pdata, bool async)
{
+ if (async)
+ assert(!pdata);
+
/* Sanity check */
switch(params->cmd) {
case ACTION_CREATE:
@@ -655,35 +811,70 @@ hc_execute_command(hc_sock_t * s, hc_msg_t * msg, size_t msg_len,
assert(params->parse == NULL);
break;
default:
- return LIBHICNCTRL_FAILURE;
+ return -1;
}
- hc_sock_reset(s);
+ //hc_sock_reset(s);
/* XXX data will at least store the result (complete) */
hc_data_t * data = hc_data_create(params->size_in, params->size_out);
- if (!data)
+ if (!data) {
+ ERROR("[hc_execute_command] Could not create data storage");
goto ERR_DATA;
+ }
- if (hc_sock_send(s, msg, msg_len) < 0)
+ int seq = hc_sock_get_next_seq(s);
+ if (seq < 0) {
+ ERROR("[hc_execute_command] Could not get next sequence number");
+ goto ERR_SEQ;
+ }
+
+ /* Create state used to process the request */
+ hc_sock_request_t * request = NULL;
+ request = hc_sock_request_create(seq, data, params->parse);
+ if (!request) {
+ ERROR("[hc_execute_command] Could not create request state");
+ goto ERR_REQUEST;
+ }
+
+ /* Add state to map */
+ if (hc_sock_map_add(s->map, seq, request) < 0) {
+ ERROR("[hc_execute_command] Error adding request state to map");
+ goto ERR_MAP;
+ }
+
+ if (hc_sock_send(s, msg, msg_len, seq) < 0) {
+ ERROR("[hc_execute_command] Error sending message");
goto ERR_PROCESS;
+ }
+
+ if (async)
+ return 0;
+
while(!data->complete) {
- if (hc_sock_recv(s, data) < 0)
- break;
- if (hc_sock_process(s, data, params->parse) < 0) {
+ /*
+ * As the socket is non blocking it might happen that we need to read
+ * several times before success... shall we alternate between blocking
+ * and non-blocking mode ?
+ */
+ if (hc_sock_recv(s) < 0)
+ continue; //break;
+ if (hc_sock_process(s, pdata) < 0) {
+ ERROR("[hc_execute_command] Error processing socket results");
goto ERR_PROCESS;
}
}
- if (pdata)
- *pdata = data;
-
- return LIBHICNCTRL_SUCCESS;
+ return 0;
ERR_PROCESS:
+ERR_MAP:
+ hc_sock_request_free(request);
+ERR_REQUEST:
+ERR_SEQ:
free(data);
ERR_DATA:
- return LIBHICNCTRL_FAILURE;
+ return -1;
}
/*----------------------------------------------------------------------------*
@@ -693,13 +884,13 @@ ERR_DATA:
/* LISTENER CREATE */
int
-hc_listener_create(hc_sock_t * s, hc_listener_t * listener)
+_hc_listener_create(hc_sock_t * s, hc_listener_t * listener, bool async)
{
if (!IS_VALID_FAMILY(listener->family))
- return LIBHICNCTRL_FAILURE;
+ return -1;
if (!IS_VALID_CONNECTION_TYPE(listener->type))
- return LIBHICNCTRL_FAILURE;
+ return -1;
struct {
header_control_message hdr;
@@ -709,7 +900,7 @@ hc_listener_create(hc_sock_t * s, hc_listener_t * listener)
.messageType = REQUEST_LIGHT,
.commandID = ADD_LISTENER,
.length = 1,
- .seqNum = s->send_seq,
+ .seqNum = 0,
},
.payload = {
.address = {
@@ -733,31 +924,43 @@ hc_listener_create(hc_sock_t * s, hc_listener_t * listener)
.parse = NULL,
};
- return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), &params, NULL);
+ return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), &params, NULL, async);
+}
+
+int
+hc_listener_create(hc_sock_t * s, hc_listener_t * listener)
+{
+ return _hc_listener_create(s, listener, false);
+}
+
+int
+hc_listener_create_async(hc_sock_t * s, hc_listener_t * listener)
+{
+ return _hc_listener_create(s, listener, true);
}
/* LISTENER GET */
int
-hc_listener_get(hc_sock_t *s, hc_listener_t * listener,
+hc_listener_get(hc_sock_t * s, hc_listener_t * listener,
hc_listener_t ** listener_found)
{
hc_data_t * listeners;
hc_listener_t * found;
if (hc_listener_list(s, &listeners) < 0)
- return LIBHICNCTRL_FAILURE;
+ return -1;
/* Test */
if (hc_listener_find(listeners, listener, &found) < 0) {
hc_data_free(listeners);
- return LIBHICNCTRL_FAILURE;
+ return -1;
}
if (found) {
*listener_found = malloc(sizeof(hc_listener_t));
if (!*listener_found)
- return LIBHICNCTRL_FAILURE;
+ return -1;
**listener_found = *found;
} else {
*listener_found = NULL;
@@ -765,14 +968,14 @@ hc_listener_get(hc_sock_t *s, hc_listener_t * listener,
hc_data_free(listeners);
- return LIBHICNCTRL_SUCCESS;
+ return 0;
}
/* LISTENER DELETE */
int
-hc_listener_delete(hc_sock_t * s, hc_listener_t * listener)
+_hc_listener_delete(hc_sock_t * s, hc_listener_t * listener, bool async)
{
struct {
header_control_message hdr;
@@ -782,24 +985,20 @@ hc_listener_delete(hc_sock_t * s, hc_listener_t * listener)
.messageType = REQUEST_LIGHT,
.commandID = REMOVE_LISTENER,
.length = 1,
- .seqNum = s->send_seq,
+ .seqNum = 0,
},
};
if (listener->id) {
- printf("Delete by ID\n");
snprintf(msg.payload.symbolicOrListenerid, NAME_LEN, "%d", listener->id);
} else if (*listener->name) {
- printf("Delete by name %s\n", listener->name);
snprintf(msg.payload.symbolicOrListenerid, NAME_LEN, "%s", listener->name);
} else {
- printf("Delete after search\n");
hc_listener_t * listener_found;
if (hc_listener_get(s, listener, &listener_found) < 0)
- return LIBHICNCTRL_FAILURE;
+ return -1;
if (!listener_found)
- return LIBHICNCTRL_FAILURE;
- printf("Delete listener ID=%d\n", listener_found->id);
+ return -1;
snprintf(msg.payload.symbolicOrListenerid, NAME_LEN, "%d", listener_found->id);
free(listener_found);
}
@@ -812,13 +1011,26 @@ hc_listener_delete(hc_sock_t * s, hc_listener_t * listener)
.parse = NULL,
};
- return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), &params, NULL);
+ return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), &params, NULL, async);
}
+int
+hc_listener_delete(hc_sock_t * s, hc_listener_t * listener)
+{
+ return _hc_listener_delete(s, listener, false);
+}
+
+int
+hc_listener_delete_async(hc_sock_t * s, hc_listener_t * listener)
+{
+ return _hc_listener_delete(s, listener, true);
+}
+
+
/* LISTENER LIST */
int
-hc_listener_list(hc_sock_t * s, hc_data_t ** pdata)
+_hc_listener_list(hc_sock_t * s, hc_data_t ** pdata, bool async)
{
struct {
header_control_message hdr;
@@ -827,7 +1039,7 @@ hc_listener_list(hc_sock_t * s, hc_data_t ** pdata)
.messageType = REQUEST_LIGHT,
.commandID = LIST_LISTENERS,
.length = 0,
- .seqNum = s->send_seq,
+ .seqNum = 0,
},
};
@@ -839,7 +1051,19 @@ hc_listener_list(hc_sock_t * s, hc_data_t ** pdata)
.parse = (HC_PARSE)hc_listener_parse,
};
- return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), &params, pdata);
+ return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), &params, pdata, async);
+}
+
+int
+hc_listener_list(hc_sock_t * s, hc_data_t ** pdata)
+{
+ return _hc_listener_list(s, pdata, false);
+}
+
+int
+hc_listener_list_async(hc_sock_t * s, hc_data_t ** pdata)
+{
+ return _hc_listener_list(s, pdata, true);
}
/* LISTENER VALIDATE */
@@ -848,12 +1072,12 @@ int
hc_listener_validate(const hc_listener_t * listener)
{
if (!IS_VALID_FAMILY(listener->family))
- return LIBHICNCTRL_FAILURE;
+ return -1;
if (!IS_VALID_CONNECTION_TYPE(listener->type))
- return LIBHICNCTRL_FAILURE;
+ return -1;
- return LIBHICNCTRL_SUCCESS;
+ return 0;
}
/* LISTENER CMP */
@@ -866,8 +1090,8 @@ hc_listener_cmp(const hc_listener_t * l1, const hc_listener_t * l2)
(strncmp(l1->interface_name, l2->interface_name, INTERFACE_LEN) == 0) &&
(ip_address_cmp(&l1->local_addr, &l2->local_addr, l1->family) == 0) &&
(l1->local_port == l2->local_port))
- ? LIBHICNCTRL_SUCCESS
- : LIBHICNCTRL_FAILURE;
+ ? 0
+ : -1;
}
/* LISTENER PARSE */
@@ -878,18 +1102,18 @@ hc_listener_parse(void * in, hc_listener_t * listener)
list_listeners_command * cmd = (list_listeners_command *)in;
if (!IS_VALID_LIST_LISTENERS_TYPE(cmd->encapType))
- return LIBHICNCTRL_FAILURE;
+ return -1;
hc_connection_type_t type = map_from_encap_type[cmd->encapType];
if (type == CONNECTION_TYPE_UNDEFINED)
- return LIBHICNCTRL_FAILURE;
+ return -1;
if (!IS_VALID_ADDR_TYPE(cmd->addressType))
- return LIBHICNCTRL_FAILURE;
+ return -1;
int family = map_from_addr_type[cmd->addressType];
if (!IS_VALID_FAMILY(family))
- return LIBHICNCTRL_FAILURE;
+ return -1;
*listener = (hc_listener_t) {
.id = cmd->connid,
@@ -900,7 +1124,7 @@ hc_listener_parse(void * in, hc_listener_t * listener)
};
snprintf(listener->name, NAME_LEN, "%s", cmd->listenerName);
snprintf(listener->interface_name, INTERFACE_LEN, "%s", cmd->interfaceName);
- return LIBHICNCTRL_SUCCESS;
+ return 0;
}
GENERATE_FIND(listener)
@@ -931,10 +1155,10 @@ hc_listener_snprintf(char * s, size_t size, hc_listener_t * listener)
/* CONNECTION CREATE */
int
-hc_connection_create(hc_sock_t * s, hc_connection_t * connection)
+_hc_connection_create(hc_sock_t * s, hc_connection_t * connection, bool async)
{
if (hc_connection_validate(connection) < 0)
- return LIBHICNCTRL_FAILURE;
+ return -1;
struct {
header_control_message hdr;
@@ -944,7 +1168,7 @@ hc_connection_create(hc_sock_t * s, hc_connection_t * connection)
.messageType = REQUEST_LIGHT,
.commandID = ADD_CONNECTION,
.length = 1,
- .seqNum = s->send_seq,
+ .seqNum = 0,
},
.payload = {
/* we use IPv6 which is the longest address */
@@ -970,31 +1194,43 @@ hc_connection_create(hc_sock_t * s, hc_connection_t * connection)
.parse = NULL,
};
- return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), &params, NULL);
+ return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), &params, NULL, async);
+}
+
+int
+hc_connection_create(hc_sock_t * s, hc_connection_t * connection)
+{
+ return _hc_connection_create(s, connection, false);
+}
+
+int
+hc_connection_create_async(hc_sock_t * s, hc_connection_t * connection)
+{
+ return _hc_connection_create(s, connection, true);
}
/* CONNECTION GET */
int
-hc_connection_get(hc_sock_t *s, hc_connection_t * connection,
+hc_connection_get(hc_sock_t * s, hc_connection_t * connection,
hc_connection_t ** connection_found)
{
hc_data_t * connections;
hc_connection_t * found;
if (hc_connection_list(s, &connections) < 0)
- return LIBHICNCTRL_FAILURE;
+ return -1;
/* Test */
if (hc_connection_find(connections, connection, &found) < 0) {
hc_data_free(connections);
- return LIBHICNCTRL_FAILURE;
+ return -1;
}
if (found) {
*connection_found = malloc(sizeof(hc_connection_t));
if (!*connection_found)
- return LIBHICNCTRL_FAILURE;
+ return -1;
**connection_found = *found;
} else {
*connection_found = NULL;
@@ -1002,14 +1238,14 @@ hc_connection_get(hc_sock_t *s, hc_connection_t * connection,
hc_data_free(connections);
- return LIBHICNCTRL_SUCCESS;
+ return 0;
}
/* CONNECTION DELETE */
int
-hc_connection_delete(hc_sock_t * s, hc_connection_t * connection)
+_hc_connection_delete(hc_sock_t * s, hc_connection_t * connection, bool async)
{
struct {
header_control_message hdr;
@@ -1019,24 +1255,20 @@ hc_connection_delete(hc_sock_t * s, hc_connection_t * connection)
.messageType = REQUEST_LIGHT,
.commandID = REMOVE_CONNECTION,
.length = 1,
- .seqNum = s->send_seq,
+ .seqNum = 0,
},
};
if (connection->id) {
- printf("Delete by ID\n");
snprintf(msg.payload.symbolicOrConnid, NAME_LEN, "%d", connection->id);
} else if (*connection->name) {
- printf("Delete by name %s\n", connection->name);
snprintf(msg.payload.symbolicOrConnid, NAME_LEN, "%s", connection->name);
} else {
- printf("Delete after search\n");
hc_connection_t * connection_found;
if (hc_connection_get(s, connection, &connection_found) < 0)
- return LIBHICNCTRL_FAILURE;
+ return -1;
if (!connection_found)
- return LIBHICNCTRL_FAILURE;
- printf("Delete connection ID=%d\n", connection_found->id);
+ return -1;
snprintf(msg.payload.symbolicOrConnid, NAME_LEN, "%d", connection_found->id);
free(connection_found);
}
@@ -1049,14 +1281,25 @@ hc_connection_delete(hc_sock_t * s, hc_connection_t * connection)
.parse = NULL,
};
- return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), &params, NULL);
+ return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), &params, NULL, async);
+}
+
+int
+hc_connection_delete(hc_sock_t * s, hc_connection_t * connection)
+{
+ return _hc_connection_delete(s, connection, false);
}
+int
+hc_connection_delete_async(hc_sock_t * s, hc_connection_t * connection)
+{
+ return _hc_connection_delete(s, connection, true);
+}
/* CONNECTION LIST */
int
-hc_connection_list(hc_sock_t * s, hc_data_t ** pdata)
+_hc_connection_list(hc_sock_t * s, hc_data_t ** pdata, bool async)
{
struct {
header_control_message hdr;
@@ -1065,7 +1308,7 @@ hc_connection_list(hc_sock_t * s, hc_data_t ** pdata)
.messageType = REQUEST_LIGHT,
.commandID = LIST_CONNECTIONS,
.length = 0,
- .seqNum = s->send_seq,
+ .seqNum = 0,
},
};
@@ -1077,7 +1320,19 @@ hc_connection_list(hc_sock_t * s, hc_data_t ** pdata)
.parse = (HC_PARSE)hc_connection_parse,
};
- return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), &params, pdata);
+ return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), &params, pdata, async);
+}
+
+int
+hc_connection_list(hc_sock_t * s, hc_data_t ** pdata)
+{
+ return _hc_connection_list(s, pdata, false);
+}
+
+int
+hc_connection_list_async(hc_sock_t * s, hc_data_t ** pdata)
+{
+ return _hc_connection_list(s, pdata, true);
}
/* CONNECTION VALIDATE */
@@ -1086,14 +1341,14 @@ int
hc_connection_validate(const hc_connection_t * connection)
{
if (!IS_VALID_FAMILY(connection->family))
- return LIBHICNCTRL_FAILURE;
+ return -1;
if (!IS_VALID_CONNECTION_TYPE(connection->type))
- return LIBHICNCTRL_FAILURE;
+ return -1;
/* TODO assert both local and remote have the right family */
- return LIBHICNCTRL_SUCCESS;
+ return 0;
}
/* CONNECTION CMP */
@@ -1111,8 +1366,8 @@ int hc_connection_cmp(const hc_connection_t * c1, const hc_connection_t * c2)
(c1->local_port == c2->local_port) &&
(ip_address_cmp(&c1->remote_addr, &c2->remote_addr, c1->family) == 0) &&
(c1->remote_port == c2->remote_port))
- ? LIBHICNCTRL_SUCCESS
- : LIBHICNCTRL_FAILURE;
+ ? 0
+ : -1;
}
/* CONNECTION PARSE */
@@ -1123,25 +1378,25 @@ hc_connection_parse(void * in, hc_connection_t * connection)
list_connections_command * cmd = (list_connections_command *)in;
if (!IS_VALID_LIST_CONNECTIONS_TYPE(cmd->connectionData.connectionType))
- return LIBHICNCTRL_FAILURE;
+ return -1;
hc_connection_type_t type = map_from_list_connections_type[cmd->connectionData.connectionType];
if (type == CONNECTION_TYPE_UNDEFINED)
- return LIBHICNCTRL_FAILURE;
+ return -1;
if (!IS_VALID_LIST_CONNECTIONS_STATE(cmd->state))
- return LIBHICNCTRL_FAILURE;
+ return -1;
hc_connection_state_t state = map_from_list_connections_state[cmd->state];
if (state == HC_CONNECTION_STATE_UNDEFINED)
- return LIBHICNCTRL_FAILURE;
+ return -1;
if (!IS_VALID_ADDR_TYPE(cmd->connectionData.ipType))
- return LIBHICNCTRL_FAILURE;
+ return -1;
int family = map_from_addr_type[cmd->connectionData.ipType];
if (!IS_VALID_FAMILY(family))
- return LIBHICNCTRL_FAILURE;
+ return -1;
*connection = (hc_connection_t) {
.id = cmd->connid,
@@ -1159,7 +1414,7 @@ hc_connection_parse(void * in, hc_connection_t * connection)
};
snprintf(connection->name, NAME_LEN, "%s", cmd->connectionData.symbolic);
snprintf(connection->interface_name, INTERFACE_LEN, "%s", cmd->interfaceName);
- return LIBHICNCTRL_SUCCESS;
+ return 0;
}
GENERATE_FIND(connection)
@@ -1196,8 +1451,8 @@ hc_connection_snprintf(char * s, size_t size, const hc_connection_t * connection
/* CONNECTION SET ADMIN STATE */
int
-hc_connection_set_admin_state(hc_sock_t * s, const char * conn_id_or_name,
- face_state_t state)
+_hc_connection_set_admin_state(hc_sock_t * s, const char * conn_id_or_name,
+ face_state_t state, bool async)
{
struct {
header_control_message hdr;
@@ -1207,7 +1462,7 @@ hc_connection_set_admin_state(hc_sock_t * s, const char * conn_id_or_name,
.messageType = REQUEST_LIGHT,
.commandID = CONNECTION_SET_ADMIN_STATE,
.length = 1,
- .seqNum = s->send_seq,
+ .seqNum = 0,
},
.payload = {
.admin_state = state,
@@ -1223,7 +1478,21 @@ hc_connection_set_admin_state(hc_sock_t * s, const char * conn_id_or_name,
.parse = NULL,
};
- return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), &params, NULL);
+ return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), &params, NULL, async);
+}
+
+int
+hc_connection_set_admin_state(hc_sock_t * s, const char * conn_id_or_name,
+ face_state_t state)
+{
+ return _hc_connection_set_admin_state(s, conn_id_or_name, state, false);
+}
+
+int
+hc_connection_set_admin_state_async(hc_sock_t * s, const char * conn_id_or_name,
+ face_state_t state)
+{
+ return _hc_connection_set_admin_state(s, conn_id_or_name, state, true);
}
/*----------------------------------------------------------------------------*
@@ -1233,10 +1502,10 @@ hc_connection_set_admin_state(hc_sock_t * s, const char * conn_id_or_name,
/* ROUTE CREATE */
int
-hc_route_create(hc_sock_t * s, hc_route_t * route)
+_hc_route_create(hc_sock_t * s, hc_route_t * route, bool async)
{
if (!IS_VALID_FAMILY(route->family))
- return LIBHICNCTRL_FAILURE;
+ return -1;
struct {
header_control_message hdr;
@@ -1246,7 +1515,7 @@ hc_route_create(hc_sock_t * s, hc_route_t * route)
.messageType = REQUEST_LIGHT,
.commandID = ADD_ROUTE,
.length = 1,
- .seqNum = s->send_seq,
+ .seqNum = 0,
},
.payload = {
/* we use IPv6 which is the longest address */
@@ -1271,16 +1540,28 @@ hc_route_create(hc_sock_t * s, hc_route_t * route)
.parse = NULL,
};
- return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), &params, NULL);
+ return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), &params, NULL, async);
+}
+
+int
+hc_route_create(hc_sock_t * s, hc_route_t * route)
+{
+ return _hc_route_create(s, route, false);
+}
+
+int
+hc_route_create_async(hc_sock_t * s, hc_route_t * route)
+{
+ return _hc_route_create(s, route, true);
}
/* ROUTE DELETE */
int
-hc_route_delete(hc_sock_t * s, hc_route_t * route)
+_hc_route_delete(hc_sock_t * s, hc_route_t * route, bool async)
{
if (!IS_VALID_FAMILY(route->family))
- return LIBHICNCTRL_FAILURE;
+ return -1;
struct {
header_control_message hdr;
@@ -1290,7 +1571,7 @@ hc_route_delete(hc_sock_t * s, hc_route_t * route)
.messageType = REQUEST_LIGHT,
.commandID = REMOVE_ROUTE,
.length = 1,
- .seqNum = s->send_seq,
+ .seqNum = 0,
},
.payload = {
/* we use IPv6 which is the longest address */
@@ -1308,13 +1589,25 @@ hc_route_delete(hc_sock_t * s, hc_route_t * route)
.parse = NULL,
};
- return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), &params, NULL);
+ return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), &params, NULL, async);
+}
+
+int
+hc_route_delete(hc_sock_t * s, hc_route_t * route)
+{
+ return _hc_route_delete(s, route, false);
+}
+
+int
+hc_route_delete_async(hc_sock_t * s, hc_route_t * route)
+{
+ return _hc_route_delete(s, route, true);
}
/* ROUTE LIST */
int
-hc_route_list(hc_sock_t * s, hc_data_t ** pdata)
+_hc_route_list(hc_sock_t * s, hc_data_t ** pdata, bool async)
{
struct {
header_control_message hdr;
@@ -1323,7 +1616,7 @@ hc_route_list(hc_sock_t * s, hc_data_t ** pdata)
.messageType = REQUEST_LIGHT,
.commandID = LIST_ROUTES,
.length = 0,
- .seqNum = s->send_seq,
+ .seqNum = 0,
},
};
@@ -1335,7 +1628,19 @@ hc_route_list(hc_sock_t * s, hc_data_t ** pdata)
.parse = (HC_PARSE)hc_route_parse,
};
- return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), &params, pdata);
+ return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), &params, pdata, async);
+}
+
+int
+hc_route_list(hc_sock_t * s, hc_data_t ** pdata)
+{
+ return _hc_route_list(s, pdata, false);
+}
+
+int
+hc_route_list_async(hc_sock_t * s, hc_data_t ** pdata)
+{
+ return _hc_route_list(s, pdata, true);
}
/* ROUTE PARSE */
@@ -1346,11 +1651,11 @@ hc_route_parse(void * in, hc_route_t * route)
list_routes_command * cmd = (list_routes_command *) in;
if (!IS_VALID_ADDR_TYPE(cmd->addressType))
- return LIBHICNCTRL_FAILURE;
+ return -1;
int family = map_from_addr_type[cmd->addressType];
if (!IS_VALID_FAMILY(family))
- return LIBHICNCTRL_FAILURE;
+ return -1;
*route = (hc_route_t) {
.face_id = cmd->connid,
@@ -1359,7 +1664,7 @@ hc_route_parse(void * in, hc_route_t * route)
.len = cmd->len,
.cost = cmd->cost,
};
- return LIBHICNCTRL_SUCCESS;
+ return 0;
}
/* ROUTE SNPRINTF */
@@ -1416,9 +1721,9 @@ hc_face_to_listener(const hc_face_t * face, hc_listener_t * listener)
case FACE_TYPE_UDP_LISTENER:
break;
default:
- return LIBHICNCTRL_FAILURE;
+ return -1;
}
- return LIBHICNCTRL_FAILURE; /* XXX Not implemented */
+ return -1; /* XXX Not implemented */
}
/* LISTENER -> FACE */
@@ -1426,7 +1731,7 @@ hc_face_to_listener(const hc_face_t * face, hc_listener_t * listener)
int
hc_listener_to_face(const hc_listener_t * listener, hc_face_t * face)
{
- return LIBHICNCTRL_FAILURE; /* XXX Not implemented */
+ return -1; /* XXX Not implemented */
}
/* FACE -> CONNECTION */
@@ -1501,13 +1806,13 @@ hc_face_to_connection(const hc_face_t * face, hc_connection_t * connection, bool
f->netdevice.name);
break;
default:
- return LIBHICNCTRL_FAILURE;
+ return -1;
}
snprintf(connection->interface_name, INTERFACE_LEN, "%s",
f->netdevice.name);
- return LIBHICNCTRL_SUCCESS;
+ return 0;
}
/* CONNECTION -> FACE */
@@ -1570,14 +1875,14 @@ hc_connection_to_face(const hc_connection_t * connection, hc_face_t * face)
};
break;
default:
- return LIBHICNCTRL_FAILURE;
+ return -1;
}
face->face.netdevice.name[0] = '\0';
face->face.netdevice.index = 0;
snprintf(face->name, NAME_LEN, "%s", connection->name);
snprintf(face->face.netdevice.name, INTERFACE_LEN, "%s", connection->interface_name);
netdevice_update_index(&face->face.netdevice);
- return LIBHICNCTRL_SUCCESS;
+ return 0;
}
/* CONNECTION -> LISTENER */
@@ -1594,7 +1899,7 @@ hc_connection_to_local_listener(const hc_connection_t * connection, hc_listener_
};
snprintf(listener->name, NAME_LEN, "lst%u", RANDBYTE()); // generate name
snprintf(listener->interface_name, INTERFACE_LEN, "%s", connection->interface_name);
- return LIBHICNCTRL_SUCCESS;
+ return 0;
}
/* FACE CREATE */
@@ -1615,18 +1920,18 @@ hc_face_create(hc_sock_t * s, hc_face_t * face)
case FACE_TYPE_UDP:
if (hc_face_to_connection(face, &connection, true) < 0) {
ERROR("[hc_face_create] Could not convert face to connection.");
- return LIBHICNCTRL_FAILURE;
+ return -1;
}
/* Ensure we have a corresponding local listener */
if (hc_connection_to_local_listener(&connection, &listener) < 0) {
ERROR("[hc_face_create] Could not convert face to local listener.");
- return LIBHICNCTRL_FAILURE;
+ return -1;
}
if (hc_listener_get(s, &listener, &listener_found) < 0) {
ERROR("[hc_face_create] Could not retrieve listener");
- return LIBHICNCTRL_FAILURE;
+ return -1;
}
if (!listener_found) {
@@ -1634,7 +1939,7 @@ hc_face_create(hc_sock_t * s, hc_face_t * face)
if (hc_listener_create(s, &listener) < 0) {
ERROR("[hc_face_create] Could not create listener.");
free(listener_found);
- return LIBHICNCTRL_FAILURE;
+ return -1;
}
} else {
free(listener_found);
@@ -1643,7 +1948,7 @@ hc_face_create(hc_sock_t * s, hc_face_t * face)
/* Create corresponding connection */
if (hc_connection_create(s, &connection) < 0) {
ERROR("[hc_face_create] Could not create connection.");
- return LIBHICNCTRL_FAILURE;
+ return -1;
}
/*
@@ -1652,12 +1957,12 @@ hc_face_create(hc_sock_t * s, hc_face_t * face)
*/
if (hc_connection_get(s, &connection, &connection_found) < 0) {
ERROR("[hc_face_create] Could not retrieve connection");
- return LIBHICNCTRL_FAILURE;
+ return -1;
}
if (!connection_found) {
ERROR("[hc_face_create] Could not find newly created connection.");
- return LIBHICNCTRL_FAILURE;
+ return -1;
}
face->id = connection_found->id;
@@ -1670,21 +1975,21 @@ hc_face_create(hc_sock_t * s, hc_face_t * face)
case FACE_TYPE_UDP_LISTENER:
if (hc_face_to_listener(face, &listener) < 0) {
ERROR("Could not convert face to listener.");
- return LIBHICNCTRL_FAILURE;
+ return -1;
}
if (hc_listener_create(s, &listener) < 0) {
ERROR("[hc_face_create] Could not create listener.");
- return LIBHICNCTRL_FAILURE;
+ return -1;
}
- return LIBHICNCTRL_FAILURE;
+ return -1;
break;
default:
ERROR("[hc_face_create] Unknwon face type.");
- return LIBHICNCTRL_FAILURE;
+ return -1;
};
- return LIBHICNCTRL_SUCCESS;
+ return 0;
}
int
@@ -1702,12 +2007,12 @@ hc_face_get(hc_sock_t * s, hc_face_t * face, hc_face_t ** face_found)
case FACE_TYPE_TCP:
case FACE_TYPE_UDP:
if (hc_face_to_connection(face, &connection, false) < 0)
- return LIBHICNCTRL_FAILURE;
+ return -1;
if (hc_connection_get(s, &connection, &connection_found) < 0)
- return LIBHICNCTRL_FAILURE;
+ return -1;
if (!connection_found) {
*face_found = NULL;
- return LIBHICNCTRL_SUCCESS;
+ return 0;
}
*face_found = malloc(sizeof(face_t));
hc_connection_to_face(connection_found, *face_found);
@@ -1718,12 +2023,12 @@ hc_face_get(hc_sock_t * s, hc_face_t * face, hc_face_t ** face_found)
case FACE_TYPE_TCP_LISTENER:
case FACE_TYPE_UDP_LISTENER:
if (hc_face_to_listener(face, &listener) < 0)
- return LIBHICNCTRL_FAILURE;
+ return -1;
if (hc_listener_get(s, &listener, &listener_found) < 0)
- return LIBHICNCTRL_FAILURE;
+ return -1;
if (!listener_found) {
*face_found = NULL;
- return LIBHICNCTRL_SUCCESS;
+ return 0;
}
*face_found = malloc(sizeof(face_t));
hc_listener_to_face(listener_found, *face_found);
@@ -1731,10 +2036,10 @@ hc_face_get(hc_sock_t * s, hc_face_t * face, hc_face_t ** face_found)
break;
default:
- return LIBHICNCTRL_FAILURE;
+ return -1;
}
- return LIBHICNCTRL_SUCCESS;
+ return 0;
}
@@ -1747,7 +2052,7 @@ hc_face_delete(hc_sock_t * s, hc_face_t * face)
hc_connection_t connection;
if (hc_face_to_connection(face, &connection, false) < 0) {
ERROR("[hc_face_delete] Could not convert face to connection.");
- return LIBHICNCTRL_FAILURE;
+ return -1;
}
return hc_connection_delete(s, &connection);
}
@@ -1762,7 +2067,7 @@ hc_face_list(hc_sock_t * s, hc_data_t ** pdata)
if (hc_connection_list(s, &connection_data) < 0) {
ERROR("[hc_face_list] Could not list connections.");
- return LIBHICNCTRL_FAILURE;
+ return -1;
}
hc_data_t * face_data = hc_data_create(sizeof(hc_connection_t), sizeof(hc_face_t));
@@ -1776,11 +2081,55 @@ hc_face_list(hc_sock_t * s, hc_data_t ** pdata)
*pdata = face_data;
hc_data_free(connection_data);
- return LIBHICNCTRL_SUCCESS;
+ return 0;
ERR:
hc_data_free(connection_data);
- return LIBHICNCTRL_FAILURE;
+ return -1;
+}
+
+int
+hc_connection_parse_to_face(void * in, hc_face_t * face)
+{
+ hc_connection_t connection;
+
+ if (hc_connection_parse(in, &connection) < 0) {
+ ERROR("[hc_connection_parse_to_face] Could not parse connection");
+ return -1;
+ }
+
+ if (hc_connection_to_face(&connection, face) < 0) {
+ ERROR("[hc_connection_parse_to_face] Could not convert connection to face.");
+ return -1;
+ }
+
+ return 0;
+}
+
+
+int
+hc_face_list_async(hc_sock_t * s) //, hc_data_t ** pdata)
+{
+ struct {
+ header_control_message hdr;
+ } msg = {
+ .hdr = {
+ .messageType = REQUEST_LIGHT,
+ .commandID = LIST_CONNECTIONS,
+ .length = 0,
+ .seqNum = 0,
+ },
+ };
+
+ hc_command_params_t params = {
+ .cmd = ACTION_LIST,
+ .cmd_id = LIST_CONNECTIONS,
+ .size_in = sizeof(list_connections_command),
+ .size_out = sizeof(hc_face_t),
+ .parse = (HC_PARSE)hc_connection_parse_to_face,
+ };
+
+ return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), &params, NULL, true);
}
/* /!\ Please update constants in header file upon changes */
@@ -1824,7 +2173,7 @@ hc_face_snprintf(char * s, size_t size, hc_face_t * face)
return rc;
break;
default:
- return LIBHICNCTRL_FAILURE;
+ return -1;
}
// [#ID NAME] TYPE LOCAL_URL REMOTE_URL STATE/ADMIN_STATE (TAGS)
@@ -1852,7 +2201,7 @@ hc_face_snprintf(char * s, size_t size, hc_face_t * face)
face_state_str[face->face.state],
face_state_str[face->face.admin_state]);
#endif /* WITH_POLICY */
- return LIBHICNCTRL_SUCCESS;
+ return 0;
}
int
@@ -1866,10 +2215,11 @@ hc_face_set_admin_state(hc_sock_t * s, const char * conn_id_or_name, // XXX wron
* Punting
*----------------------------------------------------------------------------*/
-int hc_punting_create(hc_sock_t * s, hc_punting_t * punting)
+int
+_hc_punting_create(hc_sock_t * s, hc_punting_t * punting, bool async)
{
if (hc_punting_validate(punting) < 0)
- return LIBHICNCTRL_FAILURE;
+ return -1;
struct {
header_control_message hdr;
@@ -1879,7 +2229,7 @@ int hc_punting_create(hc_sock_t * s, hc_punting_t * punting)
.messageType = REQUEST_LIGHT,
.commandID = ADD_PUNTING,
.length = 1,
- .seqNum = s->send_seq,
+ .seqNum = 0,
},
.payload = {
/* we use IPv6 which is the longest address */
@@ -1898,37 +2248,54 @@ int hc_punting_create(hc_sock_t * s, hc_punting_t * punting)
.parse = NULL,
};
- return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), &params, NULL);
+ return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), &params, NULL, async);
+}
+
+int
+hc_punting_create(hc_sock_t * s, hc_punting_t * punting)
+{
+ return _hc_punting_create(s, punting, false);
+}
+
+int
+hc_punting_create_async(hc_sock_t * s, hc_punting_t * punting)
+{
+ return _hc_punting_create(s, punting, true);
}
int hc_punting_get(hc_sock_t * s, hc_punting_t * punting, hc_punting_t ** punting_found)
{
- return LIBHICNCTRL_NOT_IMPLEMENTED;
+ ERROR("hc_punting_get not (yet) implemented.");
+ return -1;
}
int hc_punting_delete(hc_sock_t * s, hc_punting_t * punting)
{
- return LIBHICNCTRL_NOT_IMPLEMENTED;
+ ERROR("hc_punting_delete not (yet) implemented.");
+ return -1;
}
int hc_punting_list(hc_sock_t * s, hc_data_t ** pdata)
{
- return LIBHICNCTRL_NOT_IMPLEMENTED;
+ ERROR("hc_punting_list not (yet) implemented.");
+ return -1;
}
int hc_punting_validate(const hc_punting_t * punting)
{
if (!IS_VALID_FAMILY(punting->family))
- return LIBHICNCTRL_FAILURE;
+ return -1;
/*
* We might use the zero value to add punting on all faces but this is not
* (yet) implemented
*/
- if (punting->face_id == 0)
- return LIBHICNCTRL_NOT_IMPLEMENTED;
+ if (punting->face_id == 0) {
+ ERROR("Punting on all faces is not (yet) implemented.");
+ return -1;
+ }
- return LIBHICNCTRL_SUCCESS;
+ return 0;
}
int hc_punting_cmp(const hc_punting_t * p1, const hc_punting_t * p2)
@@ -1937,18 +2304,20 @@ int hc_punting_cmp(const hc_punting_t * p1, const hc_punting_t * p2)
(p1->family == p2->family) &&
(ip_address_cmp(&p1->prefix, &p2->prefix, p1->family) == 0) &&
(p1->prefix_len == p2->prefix_len))
- ? LIBHICNCTRL_SUCCESS
- : LIBHICNCTRL_FAILURE;
+ ? 0
+ : -1;
}
int hc_punting_parse(void * in, hc_punting_t * punting)
{
- return LIBHICNCTRL_NOT_IMPLEMENTED;
+ ERROR("hc_punting_parse not (yet) implemented.");
+ return -1;
}
int hc_punting_snprintf(char * s, size_t size, hc_punting_t * punting)
{
- return LIBHICNCTRL_NOT_IMPLEMENTED;
+ ERROR("hc_punting_snprintf not (yet) implemented.");
+ return -1;
}
@@ -1957,7 +2326,7 @@ int hc_punting_snprintf(char * s, size_t size, hc_punting_t * punting)
*----------------------------------------------------------------------------*/
int
-hc_cache_set_store(hc_sock_t * s, int enabled)
+_hc_cache_set_store(hc_sock_t * s, int enabled, bool async)
{
struct {
header_control_message hdr;
@@ -1967,7 +2336,7 @@ hc_cache_set_store(hc_sock_t * s, int enabled)
.messageType = REQUEST_LIGHT,
.commandID = CACHE_STORE,
.length = 1,
- .seqNum = s->send_seq,
+ .seqNum = 0,
},
.payload = {
.activate = enabled,
@@ -1982,11 +2351,23 @@ hc_cache_set_store(hc_sock_t * s, int enabled)
.parse = NULL,
};
- return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), &params, NULL);
+ return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), &params, NULL, async);
}
int
-hc_cache_set_serve(hc_sock_t * s, int enabled)
+hc_cache_set_store(hc_sock_t * s, int enabled)
+{
+ return _hc_cache_set_store(s, enabled, false);
+}
+
+int
+hc_cache_set_store_async(hc_sock_t * s, int enabled)
+{
+ return _hc_cache_set_store(s, enabled, true);
+}
+
+int
+_hc_cache_set_serve(hc_sock_t * s, int enabled, bool async)
{
struct {
header_control_message hdr;
@@ -1996,7 +2377,7 @@ hc_cache_set_serve(hc_sock_t * s, int enabled)
.messageType = REQUEST_LIGHT,
.commandID = CACHE_SERVE,
.length = 1,
- .seqNum = s->send_seq,
+ .seqNum = 0,
},
.payload = {
.activate = enabled,
@@ -2011,9 +2392,20 @@ hc_cache_set_serve(hc_sock_t * s, int enabled)
.parse = NULL,
};
- return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), &params, NULL);
+ return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), &params, NULL, async);
}
+int
+hc_cache_set_serve(hc_sock_t * s, int enabled)
+{
+ return _hc_cache_set_serve(s, enabled, false);
+}
+
+int
+hc_cache_set_serve_async(hc_sock_t * s, int enabled)
+{
+ return _hc_cache_set_serve(s, enabled, true);
+}
/*----------------------------------------------------------------------------*
* Strategy
@@ -2023,7 +2415,7 @@ hc_cache_set_serve(hc_sock_t * s, int enabled)
int
hc_strategy_set(hc_sock_t * s /* XXX */)
{
- return LIBHICNCTRL_SUCCESS;
+ return 0;
}
/* How to retrieve that from the forwarder ? */
@@ -2042,12 +2434,12 @@ hc_strategy_list(hc_sock_t * s, hc_data_t ** data)
for (unsigned i = 0; i < ARRAY_SIZE(strategies); i++) {
hc_strategy_t * strategy = (hc_strategy_t*)hc_data_get_next(*data);
if (!strategy)
- return LIBHICNCTRL_FAILURE;
+ return -1;
snprintf(strategy->name, MAXSZ_HC_STRATEGY, "%s", strategies[i]);
(*data)->size++;
}
- return LIBHICNCTRL_SUCCESS;
+ return 0;
}
/* /!\ Please update constants in header file upon changes */
@@ -2065,7 +2457,7 @@ hc_strategy_snprintf(char * s, size_t size, hc_strategy_t * strategy)
int
hc_wldr_set(hc_sock_t * s /* XXX */)
{
- return LIBHICNCTRL_SUCCESS;
+ return 0;
}
/*----------------------------------------------------------------------------*
@@ -2075,25 +2467,25 @@ hc_wldr_set(hc_sock_t * s /* XXX */)
int
hc_mapme_set(hc_sock_t * s, int enabled)
{
- return LIBHICNCTRL_SUCCESS;
+ return 0;
}
int
hc_mapme_set_discovery(hc_sock_t * s, int enabled)
{
- return LIBHICNCTRL_SUCCESS;
+ return 0;
}
int
hc_mapme_set_timescale(hc_sock_t * s, double timescale)
{
- return LIBHICNCTRL_SUCCESS;
+ return 0;
}
int
hc_mapme_set_retx(hc_sock_t * s, double timescale)
{
- return LIBHICNCTRL_SUCCESS;
+ return 0;
}
/*----------------------------------------------------------------------------*
@@ -2105,10 +2497,10 @@ hc_mapme_set_retx(hc_sock_t * s, double timescale)
/* POLICY CREATE */
int
-hc_policy_create(hc_sock_t * s, hc_policy_t * policy)
+_hc_policy_create(hc_sock_t * s, hc_policy_t * policy, bool async)
{
if (!IS_VALID_FAMILY(policy->family))
- return LIBHICNCTRL_FAILURE;
+ return -1;
struct {
header_control_message hdr;
@@ -2118,7 +2510,7 @@ hc_policy_create(hc_sock_t * s, hc_policy_t * policy)
.messageType = REQUEST_LIGHT,
.commandID = ADD_POLICY,
.length = 1,
- .seqNum = s->send_seq,
+ .seqNum = 0,
},
.payload = {
/* we use IPv6 which is the longest address */
@@ -2137,16 +2529,28 @@ hc_policy_create(hc_sock_t * s, hc_policy_t * policy)
.parse = NULL,
};
- return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), &params, NULL);
+ return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), &params, NULL, async);
+}
+
+int
+hc_policy_create(hc_sock_t * s, hc_policy_t * policy)
+{
+ return _hc_policy_create(s, policy, false);
+}
+
+int
+hc_policy_create_async(hc_sock_t * s, hc_policy_t * policy)
+{
+ return _hc_policy_create(s, policy, true);
}
/* POLICY DELETE */
int
-hc_policy_delete(hc_sock_t * s, hc_policy_t * policy)
+_hc_policy_delete(hc_sock_t * s, hc_policy_t * policy, bool async)
{
if (!IS_VALID_FAMILY(policy->family))
- return LIBHICNCTRL_FAILURE;
+ return -1;
struct {
header_control_message hdr;
@@ -2156,7 +2560,7 @@ hc_policy_delete(hc_sock_t * s, hc_policy_t * policy)
.messageType = REQUEST_LIGHT,
.commandID = REMOVE_POLICY,
.length = 1,
- .seqNum = s->send_seq,
+ .seqNum = 0,
},
.payload = {
/* we use IPv6 which is the longest address */
@@ -2174,13 +2578,25 @@ hc_policy_delete(hc_sock_t * s, hc_policy_t * policy)
.parse = NULL,
};
- return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), &params, NULL);
+ return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), &params, NULL, async);
+}
+
+int
+hc_policy_delete(hc_sock_t * s, hc_policy_t * policy)
+{
+ return _hc_policy_delete(s, policy, false);
+}
+
+int
+hc_policy_delete_async(hc_sock_t * s, hc_policy_t * policy)
+{
+ return _hc_policy_delete(s, policy, true);
}
/* POLICY LIST */
int
-hc_policy_list(hc_sock_t * s, hc_data_t ** pdata)
+_hc_policy_list(hc_sock_t * s, hc_data_t ** pdata, bool async)
{
struct {
header_control_message hdr;
@@ -2189,7 +2605,7 @@ hc_policy_list(hc_sock_t * s, hc_data_t ** pdata)
.messageType = REQUEST_LIGHT,
.commandID = LIST_POLICIES,
.length = 0,
- .seqNum = s->send_seq,
+ .seqNum = 0,
},
};
@@ -2201,7 +2617,19 @@ hc_policy_list(hc_sock_t * s, hc_data_t ** pdata)
.parse = (HC_PARSE)hc_policy_parse,
};
- return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), &params, pdata);
+ return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), &params, pdata, async);
+}
+
+int
+hc_policy_list(hc_sock_t * s, hc_data_t ** pdata)
+{
+ return _hc_policy_list(s, pdata, false);
+}
+
+int
+hc_policy_list_async(hc_sock_t * s, hc_data_t ** pdata)
+{
+ return _hc_policy_list(s, pdata, true);
}
/* POLICY PARSE */
@@ -2212,11 +2640,11 @@ hc_policy_parse(void * in, hc_policy_t * policy)
list_policies_command * cmd = (list_policies_command *) in;
if (!IS_VALID_ADDR_TYPE(cmd->addressType))
- return LIBHICNCTRL_FAILURE;
+ return -1;
int family = map_from_addr_type[cmd->addressType];
if (!IS_VALID_FAMILY(family))
- return LIBHICNCTRL_FAILURE;
+ return -1;
*policy = (hc_policy_t) {
.family = family,
@@ -2224,7 +2652,7 @@ hc_policy_parse(void * in, hc_policy_t * policy)
.len = cmd->len,
.policy = cmd->policy,
};
- return LIBHICNCTRL_SUCCESS;
+ return 0;
}
/* POLICY SNPRINTF */
@@ -2233,7 +2661,7 @@ hc_policy_parse(void * in, hc_policy_t * policy)
int
hc_policy_snprintf(char * s, size_t size, hc_policy_t * policy)
{
- return LIBHICNCTRL_SUCCESS;
+ return 0;
}
#endif /* WITH_POLICY */
diff --git a/hicn-light/CMakeLists.txt b/hicn-light/CMakeLists.txt
index 1e5805d68..a632380fc 100644
--- a/hicn-light/CMakeLists.txt
+++ b/hicn-light/CMakeLists.txt
@@ -39,7 +39,6 @@ if(NOT WIN32)
else ()
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} /wd4996")
endif ()
- set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -g")
if(${CMAKE_SYSTEM_NAME} STREQUAL "Android")
message("############ Detected cross compile for $ENV{CMAKE_SYSTEM_NAME}")
@@ -72,7 +71,7 @@ else()
endif ()
endif()
-include( Packaging )
+include(Packaging)
find_package(Threads REQUIRED)
@@ -108,3 +107,10 @@ if("${CMAKE_CXX_COMPILER_ID}" STREQUAL "AppleClang")
endif()
add_subdirectory(src/hicn)
+
+# Install service file in linux systems
+include(ServiceScript)
+install_service_script(
+ ${CMAKE_CURRENT_SOURCE_DIR}/config/hicn-light.service
+ COMPONENT ${HICN_LIGHT}
+) \ No newline at end of file
diff --git a/hicn-light/src/hicn/command_line/controller/hicnLightControl_main.c b/hicn-light/src/hicn/command_line/controller/hicnLightControl_main.c
index f704d237e..ab0e0e6d8 100644
--- a/hicn-light/src/hicn/command_line/controller/hicnLightControl_main.c
+++ b/hicn-light/src/hicn/command_line/controller/hicnLightControl_main.c
@@ -63,6 +63,7 @@ static int payloadLengthController[LAST_COMMAND_VALUE] = {
sizeof(add_route_command),
sizeof(list_routes_command), // needed when get response from FWD
sizeof(remove_connection_command),
+ sizeof(remove_listener_command),
sizeof(remove_route_command),
sizeof(cache_store_command),
sizeof(cache_serve_command),
diff --git a/hicn-light/src/hicn/config/CMakeLists.txt b/hicn-light/src/hicn/config/CMakeLists.txt
index b1e475aee..45f36e8ff 100644
--- a/hicn-light/src/hicn/config/CMakeLists.txt
+++ b/hicn-light/src/hicn/config/CMakeLists.txt
@@ -35,6 +35,7 @@ list(APPEND HEADER_FILES
${CMAKE_CURRENT_SOURCE_DIR}/controlListPolicies.h
${CMAKE_CURRENT_SOURCE_DIR}/controlQuit.h
${CMAKE_CURRENT_SOURCE_DIR}/controlRemove.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/controlRemoveListener.h
${CMAKE_CURRENT_SOURCE_DIR}/controlRemoveConnection.h
${CMAKE_CURRENT_SOURCE_DIR}/controlRemoveRoute.h
${CMAKE_CURRENT_SOURCE_DIR}/controlRemovePolicy.h
@@ -78,6 +79,7 @@ list(APPEND SOURCE_FILES
${CMAKE_CURRENT_SOURCE_DIR}/controlListPolicies.c
${CMAKE_CURRENT_SOURCE_DIR}/controlQuit.c
${CMAKE_CURRENT_SOURCE_DIR}/controlRemove.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/controlRemoveListener.c
${CMAKE_CURRENT_SOURCE_DIR}/controlRemoveConnection.c
${CMAKE_CURRENT_SOURCE_DIR}/controlRemoveRoute.c
${CMAKE_CURRENT_SOURCE_DIR}/controlRemovePolicy.c
diff --git a/hicn-light/src/hicn/config/configuration.c b/hicn-light/src/hicn/config/configuration.c
index b14ea551a..2b63a32a0 100644
--- a/hicn-light/src/hicn/config/configuration.c
+++ b/hicn-light/src/hicn/config/configuration.c
@@ -346,7 +346,6 @@ static void configuration_SendResponse(Configuration *config, struct iovec *msg,
if (conn == NULL) {
return;
}
-
connection_SendIOVBuffer(conn, msg, 2);
}
@@ -450,6 +449,10 @@ struct iovec *configuration_ProcessCreateTunnel(Configuration *config,
/* Hook: new connection created through the control protocol */
forwarder_onConnectionEvent(config->forwarder, conn, CONNECTION_EVENT_UPDATE);
#endif /* WITH_MAPME */
+ if (source)
+ addressDestroy(&source);
+ if (destination)
+ addressDestroy(&destination);
success = true;
#else
@@ -472,6 +475,70 @@ ERR:
return utils_CreateNack(header, control, sizeof(add_connection_command));
}
+struct iovec *configuration_ProcessRemoveListener(Configuration *config,
+ struct iovec *request,
+ unsigned ingressId) {
+ header_control_message *header = request[0].iov_base;
+ remove_listener_command *control = request[1].iov_base;
+
+ bool success = false;
+
+ const char *symbolicOrListenerid = control->symbolicOrListenerid;
+ int listenerId = -1;
+ ListenerSet *listenerSet = forwarder_GetListenerSet(config->forwarder);
+ if (utils_IsNumber(symbolicOrListenerid)) {
+ // case for connid as input
+ listenerId = (unsigned)strtold(symbolicOrListenerid, NULL);
+ } else {
+ listenerId = listenerSet_FindIdByListenerName(listenerSet, symbolicOrListenerid);
+ }
+
+ if (listenerId >= 0) {
+
+ ConnectionTable *connTable = forwarder_GetConnectionTable(config->forwarder);
+ ListenerOps *listenerOps = listenerSet_FindById(listenerSet, listenerId);
+ if (listenerOps) {
+ ConnectionList *connectionList =connectionTable_GetEntries(connTable);
+ for (size_t i =0; i < connectionList_Length(connectionList); i++) {
+ Connection *connection = connectionList_Get(connectionList, i);
+ const AddressPair *addressPair = connection_GetAddressPair(connection);
+ const Address *address = addressPair_GetLocal(addressPair);
+ if (addressEquals(listenerOps->getListenAddress(listenerOps),address)) {
+ // case for connid as input
+ unsigned connid = connection_GetConnectionId(connection);
+ // remove connection from the FIB
+ forwarder_RemoveConnectionIdFromRoutes(config->forwarder, connid);
+ // remove connection
+ connectionTable_RemoveById(connTable, connid);
+ const char *symbolicConnection = symbolicNameTable_GetNameByIndex(config->symbolicNameTable,connid);
+ symbolicNameTable_Remove(config->symbolicNameTable, symbolicConnection);
+ }
+ }
+ // remove listener
+ listenerSet_RemoveById(listenerSet, listenerId);
+ success = true;
+ } else {
+ logger_Log(forwarder_GetLogger(config->forwarder), LoggerFacility_IO,
+ PARCLogLevel_Error, __func__,
+ "Listener Id not found, check list listeners");
+ }
+ }
+
+ // generate ACK/NACK
+ struct iovec *response;
+
+ if (success) { // ACK
+ response =
+ utils_CreateAck(header, control, sizeof(remove_listener_command));
+ } else { // NACK
+ response =
+ utils_CreateNack(header, control, sizeof(remove_connection_command));
+ }
+
+ return response;
+}
+
+
/**
* Add an IP-based tunnel.
*
@@ -493,7 +560,6 @@ struct iovec *configuration_ProcessRemoveTunnel(Configuration *config,
const char *symbolicOrConnid = control->symbolicOrConnid;
ConnectionTable *table = forwarder_GetConnectionTable(config->forwarder);
-
if (strcmp(symbolicOrConnid, "SELF") == 0) {
forwarder_RemoveConnectionIdFromRoutes(config->forwarder, ingressId);
connectionTable_RemoveById(table, ingressId);
@@ -515,6 +581,9 @@ struct iovec *configuration_ProcessRemoveTunnel(Configuration *config,
forwarder_RemoveConnectionIdFromRoutes(config->forwarder, connid);
// remove connection
connectionTable_RemoveById(table, connid);
+ // remove connection from symbolicNameTable
+ const char *symbolicConnection = symbolicNameTable_GetNameByIndex(config->symbolicNameTable,connid);
+ symbolicNameTable_Remove(config->symbolicNameTable, symbolicConnection);
#ifdef WITH_MAPME
/* Hook: new connection created through the control protocol */
@@ -568,6 +637,8 @@ struct iovec *configuration_ProcessRemoveTunnel(Configuration *config,
}
}
+
+
// generate ACK/NACK
struct iovec *response;
@@ -612,8 +683,8 @@ struct iovec *configuration_ProcessConnectionList(Configuration *config,
list_connections_command *listConnectionsCommand =
(list_connections_command *)(payloadResponse +
(i * sizeof(list_connections_command)));
-
// set structure fields
+
listConnectionsCommand->connid = connection_GetConnectionId(original);
const char *connectionName = symbolicNameTable_GetNameByIndex(config->symbolicNameTable, connection_GetConnectionId(original));
@@ -1202,7 +1273,6 @@ struct iovec *configuration_DispatchCommand(Configuration *config,
struct iovec *control,
unsigned ingressId) {
struct iovec *response = NULL;
-
switch (command) {
case ADD_LISTENER:
response = configurationListeners_Add(config, control, ingressId);
@@ -1229,6 +1299,10 @@ struct iovec *configuration_DispatchCommand(Configuration *config,
response = configuration_ProcessRemoveTunnel(config, control, ingressId);
break;
+ case REMOVE_LISTENER:
+ response = configuration_ProcessRemoveListener(config, control, ingressId);
+ break;
+
case REMOVE_ROUTE:
response = configuration_ProcessUnregisterHicnPrefix(config, control);
break;
diff --git a/hicn-light/src/hicn/config/configurationListeners.c b/hicn-light/src/hicn/config/configurationListeners.c
index 86d8a215a..c321007e2 100644
--- a/hicn-light/src/hicn/config/configurationListeners.c
+++ b/hicn-light/src/hicn/config/configurationListeners.c
@@ -317,6 +317,7 @@ static bool _setupTcpListenerOnInet6Light(Forwarder *forwarder, char *listenerNa
* Create a new IPV6/UDP listener.
*
* @param [in,out] forwarder The hicn-light forwarder instance
+ * @param [in] listenerName The name of the listener
* @param [in] addr6 The ipv6 address in network byte order
* @param [in] port The port number in network byte order
* @param [in] interfaceName The name of the interface to bind the socket
@@ -615,7 +616,8 @@ void configurationListeners_SetutpLocalIPv4(const Configuration *config,
Forwarder *forwarder = configuration_GetForwarder(config);
in_addr_t addr = inet_addr("127.0.0.1");
uint16_t network_byte_order_port = htons(port);
- char listenerNameUdp[16] = "lo_udp";
+
+ char listenerNameUdp[16] = "lo_udp";
char listenerNameTcp[16] = "lo_tcp";
char *loopback_interface = "lo";
_setupUdpListenerOnInet(forwarder, listenerNameUdp,(ipv4_addr_t *)&(addr),
diff --git a/hicn-light/src/hicn/config/controlAddConnection.c b/hicn-light/src/hicn/config/controlAddConnection.c
index e09b61b37..eaa680bde 100644
--- a/hicn-light/src/hicn/config/controlAddConnection.c
+++ b/hicn-light/src/hicn/config/controlAddConnection.c
@@ -42,11 +42,13 @@ static CommandReturn _controlAddConnection_Execute(CommandParser *parser,
// ===================================================
+#ifdef __linux__
static CommandReturn _controlAddConnection_HicnHelpExecute(
CommandParser *parser, CommandOps *ops, PARCList *args);
static CommandReturn _controlAddConnection_HicnExecute(CommandParser *parser,
CommandOps *ops,
PARCList *args);
+#endif
static CommandReturn _controlAddConnection_UdpHelpExecute(CommandParser *parser,
CommandOps *ops,
@@ -65,11 +67,15 @@ static CommandReturn _controlAddConnection_TcpExecute(CommandParser *parser,
// ===================================================
static const char *_commandAddConnection = "add connection";
+#ifdef __linux__
static const char *_commandAddConnectionHicn = "add connection hicn";
+#endif
static const char *_commandAddConnectionUdp = "add connection udp";
static const char *_commandAddConnectionTcp = "add connection tcp";
static const char *_commandAddConnectionHelp = "help add connection";
+#ifdef __linux__
static const char *_commandAddConnectionHicnHelp = "help add connection hicn";
+#endif
static const char *_commandAddConnectionUdpHelp = "help add connection udp";
static const char *_commandAddConnectionTcpHelp = "help add connection tcp";
@@ -89,11 +95,13 @@ CommandOps *controlAddConnection_HelpCreate(ControlState *state) {
// ===================================================
+#ifdef __linux__
static CommandOps *_controlAddConnection_HicnCreate(ControlState *state) {
return commandOps_Create(state, _commandAddConnectionHicn, NULL,
_controlAddConnection_HicnExecute,
commandOps_Destroy);
}
+#endif
static CommandOps *_controlAddConnection_UdpCreate(ControlState *state) {
return commandOps_Create(state, _commandAddConnectionUdp, NULL,
@@ -108,12 +116,13 @@ static CommandOps *_controlAddConnection_TcpCreate(ControlState *state) {
}
// ===================================================
-
+#ifdef __linux__
static CommandOps *_controlAddConnection_HicnHelpCreate(ControlState *state) {
return commandOps_Create(state, _commandAddConnectionHicnHelp, NULL,
_controlAddConnection_HicnHelpExecute,
commandOps_Destroy);
}
+#endif
static CommandOps *_controlAddConnection_UdpHelpCreate(ControlState *state) {
return commandOps_Create(state, _commandAddConnectionUdpHelp, NULL,
@@ -133,7 +142,9 @@ static CommandReturn _controlAddConnection_HelpExecute(CommandParser *parser,
CommandOps *ops,
PARCList *args) {
printf("Available commands:\n");
+#ifdef __linux__
printf(" %s\n", _commandAddConnectionHicn);
+#endif
printf(" %s\n", _commandAddConnectionUdp);
printf(" %s\n", _commandAddConnectionTcp);
printf("\n");
@@ -142,14 +153,17 @@ static CommandReturn _controlAddConnection_HelpExecute(CommandParser *parser,
static void _controlAddConnection_Init(CommandParser *parser, CommandOps *ops) {
ControlState *state = ops->closure;
+#ifdef __linux__
controlState_RegisterCommand(state,
_controlAddConnection_HicnHelpCreate(state));
+#endif
controlState_RegisterCommand(state,
_controlAddConnection_UdpHelpCreate(state));
controlState_RegisterCommand(state,
_controlAddConnection_TcpHelpCreate(state));
-
+#ifdef __linux__
controlState_RegisterCommand(state, _controlAddConnection_HicnCreate(state));
+#endif
controlState_RegisterCommand(state, _controlAddConnection_UdpCreate(state));
controlState_RegisterCommand(state, _controlAddConnection_TcpCreate(state));
}
@@ -255,7 +269,9 @@ static CommandReturn _controlAddConnection_IpHelp(CommandParser *parser,
CommandOps *ops,
PARCList *args,
const char *protocol) {
+#ifdef __linux__
printf("add connection hicn <symbolic> <remote_ip> <local_ip>\n");
+#endif
printf(
"add connection udp <symbolic> <remote_ip> <port> <local_ip> <port>\n");
printf(
@@ -268,6 +284,7 @@ static CommandReturn _controlAddConnection_IpHelp(CommandParser *parser,
return CommandReturn_Success;
}
+#ifdef __linux__
static CommandReturn _controlAddConnection_HicnHelpExecute(
CommandParser *parser, CommandOps *ops, PARCList *args) {
_controlAddConnection_IpHelp(parser, ops, args, "hicn");
@@ -303,6 +320,7 @@ static CommandReturn _controlAddConnection_HicnExecute(CommandParser *parser,
return _controlAddConnection_CreateTunnel(
parser, ops, local_ip, port, remote_ip, port, HICN_CONN, symbolic);
}
+#endif
static CommandReturn _controlAddConnection_UdpHelpExecute(CommandParser *parser,
CommandOps *ops,
diff --git a/hicn-light/src/hicn/config/controlAddListener.c b/hicn-light/src/hicn/config/controlAddListener.c
index c9253425a..cfd061131 100644
--- a/hicn-light/src/hicn/config/controlAddListener.c
+++ b/hicn-light/src/hicn/config/controlAddListener.c
@@ -58,27 +58,26 @@ static const int _indexProtocol = 2;
static const int _indexSymbolic = 3;
static const int _indexAddress = 4;
static const int _indexPort = 5;
-#ifdef __linux__
static const int _indexInterfaceName = 6;
-#endif
static CommandReturn _controlAddListener_HelpExecute(CommandParser *parser,
CommandOps *ops,
PARCList *args) {
printf("commands:\n");
- printf(" add listener hicn <symbolic> <localAddress> \n");
#ifdef __linux__
+ printf(" add listener hicn <symbolic> <localAddress> \n");
+#endif
printf(" add listener udp <symbolic> <localAddress> <port> <interface>\n");
printf(" add listener tcp <symbolic> <localAddress> <port> <interface>\n");
-#else
- printf(" add listener udp <symbolic> <localAddress> <port>\n");
- printf(" add listener tcp <symbolic> <localAddress> <port>\n");
-#endif
printf("\n");
printf(
" symbolic: User defined name for listener, must start with "
"alpha and be alphanum\n");
+#ifdef __linux__
printf(" protocol: hicn | udp\n");
+#else
+ printf(" protocol: udp\n");
+#endif
printf(
" localAddress: IPv4 or IPv6 address (or prefix protocol = hicn) "
"assigend to the local interface\n");
@@ -88,23 +87,18 @@ static CommandReturn _controlAddListener_HelpExecute(CommandParser *parser,
printf("\n");
printf("Notes:\n");
printf(" The symblic name must be unique or the source will reject it.\n");
+#ifdef __linux__
printf(
- " If protocol = hinc: the address 0::0 indicates the main listern, "
+ " If protocol = hicn: the address 0::0 indicates the main listern, "
"for which we can set punting rules.\n");
+#endif
return CommandReturn_Success;
}
-#ifdef __linux__
-static CommandReturn _CreateListener(CommandParser *parser, CommandOps *ops,
- const char *symbolic, const char *addr,
- const char *port, const char *interfaceName, listener_mode mode,
- connection_type type) {
-#else
static CommandReturn _CreateListener(CommandParser *parser, CommandOps *ops,
const char *symbolic, const char *addr,
- const char *port, listener_mode mode,
+ const char *port, char *interfaceName, listener_mode mode,
connection_type type) {
-#endif
ControlState *state = ops->closure;
// allocate command payload
@@ -126,9 +120,7 @@ static CommandReturn _CreateListener(CommandParser *parser, CommandOps *ops,
}
// Fill remaining payload fields
-#ifdef __linux__
memcpy(addListenerCommand->interfaceName, interfaceName, 16);
-#endif
addListenerCommand->listenerMode = mode;
addListenerCommand->connectionType = type;
addListenerCommand->port = htons((uint16_t)atoi(port));
@@ -149,11 +141,7 @@ static CommandReturn _CreateListener(CommandParser *parser, CommandOps *ops,
static CommandReturn _controlAddListener_Execute(CommandParser *parser,
CommandOps *ops,
PARCList *args) {
-#ifdef __linux__
if (parcList_Size(args) != 5 && parcList_Size(args) != 7) {
-#else
- if (parcList_Size(args) != 5 && parcList_Size(args) != 6) {
-#endif
_controlAddListener_HelpExecute(parser, ops, args);
return CommandReturn_Failure;
}
@@ -169,45 +157,26 @@ static CommandReturn _controlAddListener_Execute(CommandParser *parser,
return result;
}
- const char *host = parcList_GetAtIndex(args, _indexAddress);
-#ifdef __linux__
- const char *interfaceName = parcList_GetAtIndex(args, _indexInterfaceName);
-#endif
const char *protocol = parcList_GetAtIndex(args, _indexProtocol);
-
+ const char *host = parcList_GetAtIndex(args, _indexAddress);
+ char *interfaceName = parcList_GetAtIndex(args, _indexInterfaceName);
if ((strcasecmp("hicn", protocol) == 0)) {
const char *port =
"1234"; // this is a random port number that will be ignored
// here we discard the prefix len if it exists, since we don't use it in
// code but we let libhicn to find the right ip address.
-#ifdef __linux__
return _CreateListener(parser, ops, symbolic, host, port, "hicn", HICN_MODE,
HICN_CONN);
-#else
- return _CreateListener(parser, ops, symbolic, host, port, HICN_MODE,
- HICN_CONN);
-#endif
}
-
const char *port = parcList_GetAtIndex(args, _indexPort);
if ((strcasecmp("udp", protocol) == 0)) {
-#ifdef __linux__
return _CreateListener(parser, ops, symbolic, host, port, interfaceName, IP_MODE,
UDP_CONN);
-#else
- return _CreateListener(parser, ops, symbolic, host, port, IP_MODE,
- UDP_CONN);
-#endif
} else if ((strcasecmp("tcp", protocol) == 0)) {
-#ifdef __linux__
return _CreateListener(parser, ops, symbolic, host, port, interfaceName, IP_MODE,
TCP_CONN);
-#else
- return _CreateListener(parser, ops, symbolic, host, port, IP_MODE,
- TCP_CONN);
-#endif
} else {
_controlAddListener_HelpExecute(parser, ops, args);
return CommandReturn_Failure;
diff --git a/hicn-light/src/hicn/config/controlListConnections.c b/hicn-light/src/hicn/config/controlListConnections.c
index dbd9707ca..0e8e2c30b 100644
--- a/hicn-light/src/hicn/config/controlListConnections.c
+++ b/hicn-light/src/hicn/config/controlListConnections.c
@@ -120,6 +120,7 @@ static CommandReturn _controlListConnections_Execute(CommandParser *parser,
#endif /* WITH_POLICY */
// Process/Print payload
+
for (int i = 0; i < receivedHeader->length; i++) {
list_connections_command *listConnectionsCommand =
(list_connections_command *)(receivedPayload +
diff --git a/hicn-light/src/hicn/config/controlRemove.c b/hicn-light/src/hicn/config/controlRemove.c
index af833dc8b..ef0c15934 100644
--- a/hicn-light/src/hicn/config/controlRemove.c
+++ b/hicn-light/src/hicn/config/controlRemove.c
@@ -27,6 +27,7 @@
#include <parc/algol/parc_Memory.h>
#include <hicn/config/controlRemove.h>
+#include <hicn/config/controlRemoveListener.h>
#include <hicn/config/controlRemoveConnection.h>
#include <hicn/config/controlRemovePunting.h>
#include <hicn/config/controlRemoveRoute.h>
@@ -62,6 +63,7 @@ static CommandReturn _controlRemove_HelpExecute(CommandParser *parser,
CommandOps *ops,
PARCList *args) {
CommandOps *ops_remove_connection = controlRemoveConnection_Create(NULL);
+ CommandOps *ops_remove_listener = controlRemoveListener_Create(NULL);
CommandOps *ops_remove_route = controlRemoveRoute_Create(NULL);
CommandOps *ops_remove_punting = controlRemovePunting_Create(NULL);
#ifdef WITH_POLICY
@@ -70,6 +72,7 @@ static CommandReturn _controlRemove_HelpExecute(CommandParser *parser,
printf("Available commands:\n");
printf(" %s\n", ops_remove_connection->command);
+ printf(" %s\n", ops_remove_listener->command);
printf(" %s\n", ops_remove_route->command);
printf(" %s\n", ops_remove_punting->command);
#ifdef WITH_POLICY
@@ -78,6 +81,7 @@ static CommandReturn _controlRemove_HelpExecute(CommandParser *parser,
printf("\n");
commandOps_Destroy(&ops_remove_connection);
+ commandOps_Destroy(&ops_remove_listener);
commandOps_Destroy(&ops_remove_route);
commandOps_Destroy(&ops_remove_punting);
#ifdef WITH_POLICY
@@ -90,8 +94,11 @@ static void _controlRemove_Init(CommandParser *parser, CommandOps *ops) {
ControlState *state = ops->closure;
controlState_RegisterCommand(state,
controlRemoveConnection_HelpCreate(state));
+ controlState_RegisterCommand(state,
+ controlRemoveListener_HelpCreate(state));
controlState_RegisterCommand(state, controlRemoveRoute_HelpCreate(state));
controlState_RegisterCommand(state, controlRemoveConnection_Create(state));
+ controlState_RegisterCommand(state, controlRemoveListener_Create(state));
controlState_RegisterCommand(state, controlRemoveRoute_Create(state));
controlState_RegisterCommand(state, controlRemovePunting_Create(state));
controlState_RegisterCommand(state, controlRemovePunting_HelpCreate(state));
diff --git a/hicn-light/src/hicn/config/controlRemoveListener.c b/hicn-light/src/hicn/config/controlRemoveListener.c
new file mode 100644
index 000000000..50581a8d9
--- /dev/null
+++ b/hicn-light/src/hicn/config/controlRemoveListener.c
@@ -0,0 +1,115 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/hicn-light/config.h>
+
+#include <ctype.h>
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+
+#include <parc/assert/parc_Assert.h>
+
+#include <parc/algol/parc_Memory.h>
+#include <parc/algol/parc_Network.h>
+#include <hicn/utils/address.h>
+
+#include <hicn/config/controlRemoveListener.h>
+
+#include <hicn/utils/commands.h>
+#include <hicn/utils/utils.h>
+
+static CommandReturn _controlRemoveListener_Execute(CommandParser *parser,
+ CommandOps *ops,
+ PARCList *args);
+static CommandReturn _controlRemoveListener_HelpExecute(CommandParser *parser,
+ CommandOps *ops,
+ PARCList *args);
+
+// ===================================================
+
+static const char *_commandRemoveListener = "remove listener";
+static const char *_commandRemoveListenerHelp = "help remove listener";
+
+// ====================================================
+
+CommandOps *controlRemoveListener_Create(ControlState *state) {
+ return commandOps_Create(state, _commandRemoveListener, NULL,
+ _controlRemoveListener_Execute,
+ commandOps_Destroy);
+}
+
+CommandOps *controlRemoveListener_HelpCreate(ControlState *state) {
+ return commandOps_Create(state, _commandRemoveListenerHelp, NULL,
+ _controlRemoveListener_HelpExecute,
+ commandOps_Destroy);
+}
+
+// ====================================================
+
+static CommandReturn _controlRemoveListener_HelpExecute(CommandParser *parser,
+ CommandOps *ops,
+ PARCList *args) {
+ printf("command:\n");
+ printf(" remove listener <symbolic|id>\n");
+ return CommandReturn_Success;
+}
+
+static CommandReturn _controlRemoveListener_Execute(CommandParser *parser,
+ CommandOps *ops,
+ PARCList *args) {
+ ControlState *state = ops->closure;
+
+ if (parcList_Size(args) != 3) {
+ _controlRemoveListener_HelpExecute(parser, ops, args);
+ return false;
+ }
+
+ if ((strcmp(parcList_GetAtIndex(args, 0), "remove") != 0) ||
+ (strcmp(parcList_GetAtIndex(args, 1), "listener") != 0)) {
+ _controlRemoveListener_HelpExecute(parser, ops, args);
+ return false;
+ }
+
+ const char *listenerId = parcList_GetAtIndex(args, 2);
+
+if (!utils_ValidateSymbolicName(listenerId) &&
+ !utils_IsNumber(listenerId)) {
+ printf(
+ "ERROR: Invalid symbolic or listenerId:\nsymbolic name must begin with an "
+ "alpha followed by alphanum;\nlistenerId must be an integer\n");
+ return CommandReturn_Failure;
+ }
+
+ // allocate command payload
+ remove_listener_command *removeListenerCommand =
+ parcMemory_AllocateAndClear(sizeof(remove_listener_command));
+ // fill payload
+ //removeListenerCommand->listenerId = atoi(listenerId);
+ strncpy(removeListenerCommand->symbolicOrListenerid, listenerId, strlen(listenerId));
+
+ // send message and receive response
+ struct iovec *response =
+ utils_SendRequest(state, REMOVE_LISTENER, removeListenerCommand,
+ sizeof(remove_listener_command));
+
+ if (!response) { // get NULL pointer
+ return CommandReturn_Failure;
+ }
+
+ parcMemory_Deallocate(&response); // free iovec pointer
+ return CommandReturn_Success;
+}
diff --git a/hicn-light/src/hicn/config/controlRemoveListener.h b/hicn-light/src/hicn/config/controlRemoveListener.h
new file mode 100644
index 000000000..794d1e1a9
--- /dev/null
+++ b/hicn-light/src/hicn/config/controlRemoveListener.h
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file control_RemoveConnection.h
+ * @brief Remove a connection from the connection table
+ *
+ * Implements the "remove connection" and "help remove connection" nodes of the
+ * CLI tree
+ *
+ */
+
+#ifndef Control_RemoveListener_h
+#define Control_RemoveListener_h
+
+#include <hicn/config/controlState.h>
+CommandOps *controlRemoveListener_Create(ControlState *state);
+CommandOps *controlRemoveListener_HelpCreate(ControlState *state);
+#endif // Control_RemoveListener_h
diff --git a/hicn-light/src/hicn/config/symbolicNameTable.c b/hicn-light/src/hicn/config/symbolicNameTable.c
index 746c4e647..723039fae 100644
--- a/hicn-light/src/hicn/config/symbolicNameTable.c
+++ b/hicn-light/src/hicn/config/symbolicNameTable.c
@@ -127,19 +127,36 @@ bool symbolicNameTable_Add(SymbolicNameTable *table, const char *symbolicName,
parcAssertTrue(connid < UINT32_MAX, "Parameter connid must be less than %u",
UINT32_MAX);
- char *key = _createKey(symbolicName);
+ char *key1 = _createKey(symbolicName);
- uint32_t *value = parcMemory_Allocate(sizeof(uint32_t));
- *value = connid;
+ uint32_t *value1 = parcMemory_Allocate(sizeof(uint32_t));
+ *value1 = connid;
- bool success = parcHashCodeTable_Add(table->symbolicNameTable, key, value);
- success = parcHashCodeTable_Add(table->indexToNameTable, value, key);
- if (!success) {
- parcMemory_Deallocate((void **)&key);
- parcMemory_Deallocate((void **)&value);
- }
+ bool success = parcHashCodeTable_Add(table->symbolicNameTable, key1, value1);
+ if (!success)
+ goto ERR_NAME;
+
+ char *key2 = _createKey(symbolicName);
+
+ uint32_t *value2 = parcMemory_Allocate(sizeof(uint32_t));
+ *value2 = connid;
+ success = parcHashCodeTable_Add(table->indexToNameTable, value2, key2);
+ if (!success)
+ goto ERR_INDEX;
+
+ goto END;
+
+ERR_INDEX:
+ parcMemory_Deallocate((void **)&key2);
+ parcMemory_Deallocate((void **)&value2);
+ parcHashCodeTable_Del(table->symbolicNameTable, key1);
+ERR_NAME:
+ parcMemory_Deallocate((void **)&key1);
+ parcMemory_Deallocate((void **)&value1);
+END:
return success;
+
}
unsigned symbolicNameTable_Get(SymbolicNameTable *table,
@@ -152,9 +169,8 @@ unsigned symbolicNameTable_Get(SymbolicNameTable *table,
char *key = _createKey(symbolicName);
uint32_t *value = parcHashCodeTable_Get(table->symbolicNameTable, key);
- if (value) {
+ if (value)
connid = *value;
- }
parcMemory_Deallocate((void **)&key);
return connid;
diff --git a/hicn-light/src/hicn/core/connection.c b/hicn-light/src/hicn/core/connection.c
index 821da884d..fb43d4a1e 100644
--- a/hicn-light/src/hicn/core/connection.c
+++ b/hicn-light/src/hicn/core/connection.c
@@ -32,6 +32,7 @@
#endif /* WITH_POLICY */
struct connection {
+
const AddressPair *addressPair;
IoOperations *ops;
diff --git a/hicn-light/src/hicn/io/hicnListener.c b/hicn-light/src/hicn/io/hicnListener.c
index 995347d6a..a60c4dd12 100644
--- a/hicn-light/src/hicn/io/hicnListener.c
+++ b/hicn-light/src/hicn/io/hicnListener.c
@@ -82,6 +82,12 @@ static EncapType _getEncapType(const ListenerOps *ops);
static int _getSocket(const ListenerOps *ops);
static unsigned _createNewConnection(ListenerOps *listener, int fd, const AddressPair *pair);
static const Connection * _lookupConnection(ListenerOps * listener, const AddressPair *pair);
+static Message *_readMessage(ListenerOps * listener, int fd, uint8_t *msgBuffer);
+static void _hicnListener_readcb(int fd, PARCEventType what, void *listener_void);
+static Address *_createAddressFromPacket(uint8_t *msgBuffer);
+static void _handleProbeMessage(ListenerOps * listener, uint8_t *msgBuffer);
+static void _handleWldrNotification(ListenerOps *listener, uint8_t *msgBuffer);
+static void _readFrameToDiscard(HicnListener *hicn, int fd);
static ListenerOps _hicnTemplate = {
.context = NULL,
@@ -96,8 +102,6 @@ static ListenerOps _hicnTemplate = {
.lookupConnection = &_lookupConnection,
};
-static void _hicnListener_readcb(int fd, PARCEventType what, void *hicnVoid);
-
static bool _isEmptyAddressIPv6(Address *address) {
struct sockaddr_in6 *addr6 =
parcMemory_AllocateAndClear(sizeof(struct sockaddr_in6));
@@ -118,6 +122,100 @@ static bool _isEmptyAddressIPv6(Address *address) {
return res;
}
+static Message *_readMessage(ListenerOps * listener, int fd, uint8_t *msgBuffer) {
+ HicnListener * hicn = (HicnListener*)listener->context;
+ Message *message = NULL;
+
+ ssize_t readLength = read(fd, msgBuffer, MTU_SIZE);
+
+ if (readLength < 0) {
+ printf("read failed %d: (%d) %s\n", fd, errno, strerror(errno));
+ return message;
+ }
+
+ size_t packetLength = messageHandler_GetTotalPacketLength(msgBuffer);
+
+ if (readLength != packetLength) {
+ parcMemory_Deallocate((void **)&msgBuffer);
+ return message;
+ }
+
+ if (messageHandler_IsTCP(msgBuffer)) {
+ MessagePacketType pktType;
+ unsigned connid = 0;
+ if (messageHandler_IsData(msgBuffer)) {
+ pktType = MessagePacketType_ContentObject;
+ if (hicn->connection_id == -1) {
+ parcMemory_Deallocate((void **)&msgBuffer);
+ return message;
+ } else {
+ connid = hicn->connection_id;
+ }
+ } else if (messageHandler_IsInterest(msgBuffer)) {
+ // notice that the connections for the interest (the one that we create at
+ // run time) uses as a local address 0::0, so the main tun
+ pktType = MessagePacketType_Interest;
+ Address *packetAddr = _createAddressFromPacket(msgBuffer);
+
+ AddressPair *pair_find = addressPair_Create(packetAddr, /* dummy */ hicn->localAddress);
+ const Connection *conn = _lookupConnection(listener, pair_find);
+ addressPair_Release(&pair_find);
+ if (conn == NULL) {
+ AddressPair *pair = addressPair_Create(hicn->localAddress, packetAddr);
+ connid = _createNewConnection(listener, fd, pair);
+ addressPair_Release(&pair);
+ } else {
+ connid = connection_GetConnectionId(conn);
+ }
+ addressDestroy(&packetAddr);
+ } else {
+ printf("Got a packet that is not a data nor an interest, drop it!\n");
+ parcMemory_Deallocate((void **)&msgBuffer);
+ return message;
+ }
+
+ message = message_CreateFromByteArray(connid, msgBuffer, pktType,
+ forwarder_GetTicks(hicn->forwarder),
+ forwarder_GetLogger(hicn->forwarder));
+ if (message == NULL) {
+ parcMemory_Deallocate((void **)&msgBuffer);
+ }
+ } else if (messageHandler_IsWldrNotification(msgBuffer)) {
+ _handleWldrNotification(listener, msgBuffer);
+ } else if (messageHandler_IsLoadBalancerProbe(msgBuffer)) {
+ _handleProbeMessage(listener, msgBuffer);
+ } else {
+ messageHandler_handleHooks(hicn->forwarder, msgBuffer, listener, fd, NULL);
+ parcMemory_Deallocate((void **)&msgBuffer);
+ }
+
+ return message;
+}
+
+static void _receivePacket(ListenerOps * listener, int fd) {
+ HicnListener * hicn = (HicnListener*)listener->context;
+ Message *msg = NULL;
+ uint8_t *msgBuffer = parcMemory_AllocateAndClear(MTU_SIZE);
+ msg = _readMessage(listener, fd, msgBuffer);
+
+ if (msg) {
+ forwarder_Receive(hicn->forwarder, msg);
+ }
+}
+
+static void _hicnListener_readcb(int fd, PARCEventType what, void *listener_void) {
+ ListenerOps * listener = (ListenerOps *)listener_void;
+ HicnListener *hicn = (HicnListener *)listener->context;
+
+ if (hicn->inetFamily == IPv4 || hicn->inetFamily == IPv6) {
+ if (what & PARCEventType_Read) {
+ _receivePacket(listener, fd);
+ }
+ } else {
+ _readFrameToDiscard(hicn, fd);
+ }
+}
+
static bool _isEmptyAddressIPv4(Address *address) {
bool res = false;
@@ -133,13 +231,8 @@ ListenerOps *hicnListener_CreateInet(Forwarder *forwarder, char *symbolic,
hicn->forwarder = forwarder;
hicn->listenerName = parcMemory_StringDuplicate(symbolic, strlen(symbolic));
- hicn->logger = logger_Acquire(forwarder_GetLogger(forwarder));
-
- hicn->logger = logger_Acquire(forwarder_GetLogger(forwarder));
hicn->conn_id = forwarder_GetNextConnectionId(forwarder);
- hicn->localAddress = addressCopy(address);
-
hicn->inetFamily = IPv4;
hicn->connection_id = -1;
@@ -322,7 +415,7 @@ bool _hicnListener_BindInet6(ListenerOps *ops, const Address *remoteAddress) {
if (logger_IsLoggable(hicn->logger, LoggerFacility_IO,
PARCLogLevel_Debug)) {
logger_Log(hicn->logger, LoggerFacility_IO, PARCLogLevel_Debug, __func__,
- "hicn_bild failed %d %s", res, hicn_socket_strerror(res));
+ "hicn_bind failed %d %s", res, hicn_socket_strerror(res));
}
} else {
result = true;
@@ -354,7 +447,7 @@ bool _hicnListener_BindInet(ListenerOps *ops, const Address *remoteAddress) {
if (logger_IsLoggable(hicn->logger, LoggerFacility_IO,
PARCLogLevel_Debug)) {
logger_Log(hicn->logger, LoggerFacility_IO, PARCLogLevel_Debug, __func__,
- "hicn_bild failed %d %s", res, hicn_socket_strerror(res));
+ "hicn_bind failed %d %s", res, hicn_socket_strerror(res));
}
} else {
result = true;
@@ -438,6 +531,7 @@ static const char *_getListenerName(const ListenerOps *ops) {
HicnListener *hicn = (HicnListener *)ops->context;
return hicn->listenerName;
}
+
static const char *_getInterfaceName(const ListenerOps *ops) {
const char *interfaceName = "";
return interfaceName;
@@ -601,96 +695,6 @@ static void _handleWldrNotification(ListenerOps *listener, uint8_t *msgBuffer) {
}
-static Message *_readMessage(ListenerOps * listener, int fd, uint8_t *msgBuffer) {
- HicnListener * hicn = (HicnListener*)listener->context;
- Message *message = NULL;
-
- ssize_t readLength = read(fd, msgBuffer, MTU_SIZE);
-
- if (readLength < 0) {
- printf("read failed %d: (%d) %s\n", fd, errno, strerror(errno));
- return message;
- }
-
- size_t packetLength = messageHandler_GetTotalPacketLength(msgBuffer);
- if (readLength != packetLength) {
- parcMemory_Deallocate((void **)&msgBuffer);
- return message;
- }
- if (messageHandler_IsTCP(msgBuffer)) {
- MessagePacketType pktType;
- unsigned connid = 0;
- if (messageHandler_IsData(msgBuffer)) {
- pktType = MessagePacketType_ContentObject;
- if (hicn->connection_id == -1) {
- parcMemory_Deallocate((void **)&msgBuffer);
- return message;
- } else {
- connid = hicn->connection_id;
- }
- } else if (messageHandler_IsInterest(msgBuffer)) {
- // notice that the connections for the interest (the one that we create at
- // run time) uses as a local address 0::0, so the main tun
- pktType = MessagePacketType_Interest;
- Address *packetAddr = _createAddressFromPacket(msgBuffer);
- AddressPair *pair_find = addressPair_Create(packetAddr, /* dummy */ hicn->localAddress);
- const Connection *conn = _lookupConnection(listener, pair_find);
- addressPair_Release(&pair_find);
- if (conn == NULL) {
- AddressPair *pair = addressPair_Create(hicn->localAddress, packetAddr);
- connid = _createNewConnection(listener, fd, pair);
- addressPair_Release(&pair);
- } else {
- connid = connection_GetConnectionId(conn);
- }
- addressDestroy(&packetAddr);
- } else {
- printf("Got a packet that is not a data nor an interest, drop it!\n");
- parcMemory_Deallocate((void **)&msgBuffer);
- return message;
- }
-
- message = message_CreateFromByteArray(connid, msgBuffer, pktType,
- forwarder_GetTicks(hicn->forwarder),
- forwarder_GetLogger(hicn->forwarder));
- if (message == NULL) {
- parcMemory_Deallocate((void **)&msgBuffer);
- }
- } else if (messageHandler_IsWldrNotification(msgBuffer)) {
- _handleWldrNotification(listener, msgBuffer);
- } else if (messageHandler_IsLoadBalancerProbe(msgBuffer)) {
- _handleProbeMessage(listener, msgBuffer);
- } else {
- messageHandler_handleHooks(hicn->forwarder, msgBuffer, listener, fd, NULL);
- parcMemory_Deallocate((void **)&msgBuffer);
- }
-
- return message;
-}
-
-static void _receivePacket(ListenerOps * listener, int fd) {
- HicnListener * hicn = (HicnListener*)listener->context;
- Message *msg = NULL;
- uint8_t *msgBuffer = parcMemory_AllocateAndClear(MTU_SIZE);
- msg = _readMessage(listener, fd, msgBuffer);
-
- if (msg) {
- forwarder_Receive(hicn->forwarder, msg);
- }
-}
-
-static void _hicnListener_readcb(int fd, PARCEventType what, void *listener_void) {
- ListenerOps * listener = (ListenerOps *)listener_void;
- HicnListener *hicn = (HicnListener *)listener->context;
-
- if (hicn->inetFamily == IPv4 || hicn->inetFamily == IPv6) {
- if (what & PARCEventType_Read) {
- _receivePacket(listener, fd);
- }
- } else {
- _readFrameToDiscard(hicn, fd);
- }
-}
diff --git a/hicn-light/src/hicn/io/ioOperations.c b/hicn-light/src/hicn/io/ioOperations.c
index 31e37a461..b2d346ed8 100644
--- a/hicn-light/src/hicn/io/ioOperations.c
+++ b/hicn-light/src/hicn/io/ioOperations.c
@@ -41,6 +41,8 @@ const AddressPair *ioOperations_GetAddressPair(const IoOperations *ops) {
return ops->getAddressPair(ops);
}
+
+
bool ioOperations_IsUp(const IoOperations *ops) { return ops->isUp(ops); }
bool ioOperations_IsLocal(const IoOperations *ops) { return ops->isLocal(ops); }
diff --git a/hicn-light/src/hicn/io/listenerSet.c b/hicn-light/src/hicn/io/listenerSet.c
index 3e44973d7..45dbe887a 100644
--- a/hicn-light/src/hicn/io/listenerSet.c
+++ b/hicn-light/src/hicn/io/listenerSet.c
@@ -166,3 +166,17 @@ int listenerSet_FindIdByListenerName(const ListenerSet *set, const char *listene
return index;
}
+
+void listenerSet_RemoveById(const ListenerSet *set, unsigned id) {
+ parcAssertNotNull(set, "Parameter set must be non-null");
+
+ for (size_t i = 0; i < parcArrayList_Size(set->listOfListeners);
+ i++) {
+ ListenerOps *ops = parcArrayList_Get(set->listOfListeners, i);
+ parcAssertNotNull(ops, "Got null listener ops at index %zu", i);
+ if (ops->getInterfaceIndex(ops) == id) {
+ parcArrayList_RemoveAtIndex(set->listOfListeners, i);
+ break;
+ }
+ }
+}
diff --git a/hicn-light/src/hicn/io/listenerSet.h b/hicn-light/src/hicn/io/listenerSet.h
index c8937fa02..5779d2af4 100644
--- a/hicn-light/src/hicn/io/listenerSet.h
+++ b/hicn-light/src/hicn/io/listenerSet.h
@@ -135,7 +135,6 @@ ListenerOps *listenerSet_Get(const ListenerSet *set, size_t index);
ListenerOps *listenerSet_Find(const ListenerSet *set, EncapType encapType,
const Address *localAddress);
-
/**
* Looks up a listener by its id
*
@@ -153,6 +152,7 @@ ListenerOps *listenerSet_Find(const ListenerSet *set, EncapType encapType,
* @endcode
*/
ListenerOps *listenerSet_FindById(const ListenerSet *set, unsigned id);
+
/**
* Looks up a listener by its id
*
@@ -171,4 +171,18 @@ ListenerOps *listenerSet_FindById(const ListenerSet *set, unsigned id);
*/
int listenerSet_FindIdByListenerName(const ListenerSet *set, const char *listenerName);
+/**
+ * Remove up a listener by its id
+ *
+ * <#Paragraphs Of Explanation#>
+ *
+ * @param [in] set An allocated listener set
+ * @param [in] id of the listener
+ *
+ * Example:
+ * @code
+ *
+ * @endcode
+ */
+void listenerSet_RemoveById(const ListenerSet *set, unsigned id);
#endif
diff --git a/hicn-light/src/hicn/io/streamConnection.c b/hicn-light/src/hicn/io/streamConnection.c
index 224f129f7..08ff728d6 100644
--- a/hicn-light/src/hicn/io/streamConnection.c
+++ b/hicn-light/src/hicn/io/streamConnection.c
@@ -589,6 +589,7 @@ static void _conn_readcb(PARCEventQueue *event, PARCEventType type,
// kind of packets
while (parcEventBuffer_GetLength(input) >= sizeof(header_control_message) &&
parcEventBuffer_GetLength(input) >= stream->nextMessageLength) {
+
if ((command = _isACommand(input)) != LAST_COMMAND_VALUE) {
struct iovec *rx;
// Get message from the stream and set the stream->nextMessageLength
diff --git a/hicn-light/src/hicn/io/tcpListener.c b/hicn-light/src/hicn/io/tcpListener.c
index 4464edf28..e2b80c215 100644
--- a/hicn-light/src/hicn/io/tcpListener.c
+++ b/hicn-light/src/hicn/io/tcpListener.c
@@ -31,7 +31,6 @@
typedef struct tcp_listener {
char *listenerName;
-
Forwarder *forwarder;
Logger *logger;
@@ -49,10 +48,15 @@ typedef struct tcp_listener {
static void _tcpListener_Destroy(_TcpListener **listenerPtr);
static void _tcpListener_OpsDestroy(ListenerOps **listenerOpsPtr);
+
static const char *_tcpListener_ListenerName(const ListenerOps *ops);
+
static unsigned _tcpListener_OpsGetInterfaceIndex(const ListenerOps *ops);
+
static const Address *_tcpListener_OpsGetListenAddress(const ListenerOps *ops);
+
static const char *_tcpListener_InterfaceName(const ListenerOps *ops);
+
static EncapType _tcpListener_OpsGetEncapType(const ListenerOps *ops);
static ListenerOps _tcpTemplate = {
@@ -71,6 +75,7 @@ static void _tcpListener_Listen(int, struct sockaddr *, int socklen,
ListenerOps *tcpListener_CreateInet6(Forwarder *forwarder, char *listenerName,
struct sockaddr_in6 sin6, char *interfaceName) {
+
_TcpListener *tcp = parcMemory_AllocateAndClear(sizeof(_TcpListener));
parcAssertNotNull(tcp, "parcMemory_AllocateAndClear(%zu) returned NULL",
sizeof(_TcpListener));
@@ -123,7 +128,10 @@ ListenerOps *tcpListener_CreateInet(Forwarder *forwarder, char *listenerName,
sizeof(_TcpListener));
tcp->forwarder = forwarder;
+ tcp->listenerName = parcMemory_StringDuplicate(listenerName, strlen(listenerName));
tcp->logger = logger_Acquire(forwarder_GetLogger(forwarder));
+ tcp->interfaceName = parcMemory_StringDuplicate(interfaceName, strlen(interfaceName));
+
tcp->listener = dispatcher_CreateListener(
forwarder_GetDispatcher(forwarder), _tcpListener_Listen, (void *)tcp, -1,
(struct sockaddr *)&sin, sizeof(sin));
@@ -175,7 +183,6 @@ static void _tcpListener_Destroy(_TcpListener **listenerPtr) {
parcMemory_Deallocate((void **)&tcp->listenerName);
parcMemory_Deallocate((void **)&tcp->interfaceName);
-
logger_Release(&tcp->logger);
dispatcher_DestroyListener(forwarder_GetDispatcher(tcp->forwarder),
&tcp->listener);
diff --git a/hicn-light/src/hicn/io/udpListener.c b/hicn-light/src/hicn/io/udpListener.c
index 050ca104c..f43756a11 100644
--- a/hicn-light/src/hicn/io/udpListener.c
+++ b/hicn-light/src/hicn/io/udpListener.c
@@ -40,8 +40,8 @@
#define IPv6 6
struct udp_listener {
- Forwarder *forwarder;
char *listenerName;
+ Forwarder *forwarder;
Logger *logger;
PARCEvent *udp_event;
@@ -76,6 +76,7 @@ static ListenerOps udpTemplate = {
.getInterfaceName = &_getInterfaceName,
};
+
static void _readcb(int fd, PARCEventType what, void * listener_void);
#ifdef __ANDROID__
diff --git a/hicn-light/src/hicn/processor/messageProcessor.c b/hicn-light/src/hicn/processor/messageProcessor.c
index 3ca9264b8..6598b9035 100644
--- a/hicn-light/src/hicn/processor/messageProcessor.c
+++ b/hicn-light/src/hicn/processor/messageProcessor.c
@@ -131,16 +131,18 @@ static void messageProcessor_ForwardToInterfaceId(MessageProcessor *processor,
static void
messageProcessor_Tick(int fd, PARCEventType type, void *user_data)
{
- MessageProcessor *processor = (MessageProcessor*)user_data;
- uint64_t now = (uint64_t)forwarder_GetTicks(processor->forwarder);
+ MessageProcessor *processor = (MessageProcessor*)user_data;
+ uint64_t now = (uint64_t)forwarder_GetTicks(processor->forwarder);
- /* Loop over FIB entries to compute statistics from counters */
- FibEntryList *fibList = forwarder_GetFibEntries(processor->forwarder);
+ /* Loop over FIB entries to compute statistics from counters */
+ FibEntryList *fibList = forwarder_GetFibEntries(processor->forwarder);
- for (size_t i = 0; i < fibEntryList_Length(fibList); i++) {
- FibEntry *entry = (FibEntry *)fibEntryList_Get(fibList, i);
- fibEntry_UpdateStats(entry, now);
- }
+ for (size_t i = 0; i < fibEntryList_Length(fibList); i++) {
+ FibEntry *entry = (FibEntry *)fibEntryList_Get(fibList, i);
+ fibEntry_UpdateStats(entry, now);
+ }
+
+ fibEntryList_Destroy(&fibList);
}
#endif /* WITH_POLICY */
diff --git a/hicn-light/src/hicn/socket/ops_linux.c b/hicn-light/src/hicn/socket/ops_linux.c
index f81ceff37..af41f400f 100644
--- a/hicn-light/src/hicn/socket/ops_linux.c
+++ b/hicn-light/src/hicn/socket/ops_linux.c
@@ -574,21 +574,17 @@ int _nl_get_ip_addr(uint32_t interface_id, uint8_t address_family,
if (address_family == AF_INET6) {
if ((payload->ifa_index == interface_id) &&
(payload->ifa_prefixlen < IPV6_ADDR_LEN * 8)) {
- printf("got ip address\n");
memcpy(prefix->address.buffer, RTA_DATA(payload + 1), IPV6_ADDR_LEN);
prefix->family = AF_INET6;
prefix->len = IPV6_ADDR_LEN_BITS;
- printf("returning %d\n", HICN_SOCKET_ERROR_NONE);
return HICN_SOCKET_ERROR_NONE;
}
} else if (address_family == AF_INET) {
if ((payload->ifa_index == interface_id) &&
(payload->ifa_prefixlen < IPV4_ADDR_LEN * 8)) {
- printf("got ip address\n");
memcpy(prefix->address.buffer, RTA_DATA(payload + 1), IPV4_ADDR_LEN);
prefix->family = AF_INET;
prefix->len = IPV4_ADDR_LEN_BITS;
- printf("returning %d\n", HICN_SOCKET_ERROR_NONE);
return HICN_SOCKET_ERROR_NONE;
}
} else {
@@ -600,7 +596,6 @@ ERR_NL:
ERR_RECV:
ERR_SEND:
ERR_SOCKET:
- printf("error getting ip address\n");
return HICN_SOCKET_ERROR_UNSPEC;
}
@@ -624,7 +619,7 @@ int _nl_set_ip_addr(uint32_t interface_id, ip_prefix_t *prefix) {
.payload.ifa_index = interface_id};
/* Set attributes = length/type/value */
- struct rtattr ifa_address = {RTA_LENGTH(ip_prefix_len(prefix)),
+ struct rtattr ifa_address = {RTA_LENGTH(ip_address_len(&prefix->address, prefix->family)),
IFA_ADDRESS};
struct iovec iov[] = {
{&msg, sizeof(msg)},
@@ -992,13 +987,13 @@ int _nl_del_lo_route(const ip_prefix_t *prefix) {
/* Set attribute = length/type/value */
uint32_t one = 1;
- struct rtattr a_dst = {RTA_LENGTH(ip_prefix_len(prefix)), RTA_DST};
+ struct rtattr a_dst = {RTA_LENGTH(ip_address_len(&prefix->address, prefix->family)), RTA_DST};
struct rtattr a_ifid_lo = {RTA_LENGTH(sizeof(uint32_t)), RTA_OIF};
struct iovec iov[] = {
{&msg, sizeof(msg)},
/* Ip address */
{&a_dst, sizeof(a_dst)},
- {(void *)&prefix->address.buffer, ip_prefix_len(prefix)},
+ {(void *)&prefix->address.buffer, ip_address_len(&prefix->address, prefix->family)},
/* Interface id */
{&a_ifid_lo, sizeof(a_ifid_lo)},
{&one, sizeof(one)}};
@@ -1154,7 +1149,7 @@ int _nl_add_neigh_proxy(const ip_prefix_t *prefix,
};
/* Message attributes = length/type/value */
- struct rtattr a_dst = {RTA_LENGTH(ip_prefix_len(prefix)), NDA_DST};
+ struct rtattr a_dst = {RTA_LENGTH(ip_address_len(&prefix->address, prefix->family)), NDA_DST};
/* Iovec describing the packets */
struct iovec iov[] = {
@@ -1231,7 +1226,7 @@ int _nl_add_in_route_table(const ip_prefix_t *prefix,
};
/* Message attributes = length/type/value */
- struct rtattr a_dst = {RTA_LENGTH(ip_prefix_len(prefix)), RTA_DST};
+ struct rtattr a_dst = {RTA_LENGTH(ip_address_len(&prefix->address, prefix->family)), RTA_DST};
struct rtattr a_oif = {RTA_LENGTH(sizeof(uint32_t)), RTA_OIF};
/* Iovec describing the packets */
@@ -1239,7 +1234,7 @@ int _nl_add_in_route_table(const ip_prefix_t *prefix,
{&msg, sizeof(msg)},
/* Destination prefix / ip address */
{&a_dst, sizeof(a_dst)},
- {(void *)&prefix->address.buffer, ip_prefix_len(prefix)},
+ {(void *)&prefix->address.buffer, ip_address_len(&prefix->address, prefix->family)},
/* Output interface */
{&a_oif, sizeof(a_oif)},
{(void *)&interface_id, sizeof(uint32_t)},
@@ -1333,7 +1328,7 @@ int _nl_add_prio_rule(const ip_prefix_t *prefix, uint8_t address_family,
if (prefix) {
/* Message attributes = length/type/value */
- struct rtattr a_src = {RTA_LENGTH(ip_prefix_len(prefix)), FRA_SRC};
+ struct rtattr a_src = {RTA_LENGTH(ip_address_len(&prefix->address, prefix->family)), FRA_SRC};
struct rtattr a_prio = {RTA_LENGTH(sizeof(uint32_t)), FRA_PRIORITY};
/* Iovec describing the packets */
@@ -1341,7 +1336,7 @@ int _nl_add_prio_rule(const ip_prefix_t *prefix, uint8_t address_family,
{&msg, sizeof(msg)},
/* Source prefix / prefix */
{&a_src, sizeof(a_src)},
- {(void *)&prefix->address.buffer, ip_prefix_len(prefix)},
+ {(void *)&prefix->address.buffer, ip_address_len(&prefix->address, prefix->family)},
/* Priority */
{&a_prio, sizeof(a_prio)},
{(void *)&priority, sizeof(uint32_t)},
@@ -1434,7 +1429,7 @@ int _nl_del_prio_rule(const ip_prefix_t *prefix, uint8_t address_family,
/* Message attributes = length/type/value */
if (prefix) {
- struct rtattr a_src = {RTA_LENGTH(ip_prefix_len(prefix)), FRA_SRC};
+ struct rtattr a_src = {RTA_LENGTH(ip_address_len(&prefix->address, prefix->family)), FRA_SRC};
struct rtattr a_prio = {RTA_LENGTH(sizeof(uint32_t)), FRA_PRIORITY};
/* Iovec describing the packets */
@@ -1442,7 +1437,7 @@ int _nl_del_prio_rule(const ip_prefix_t *prefix, uint8_t address_family,
{&msg, sizeof(msg)},
/* Source prefix / prefix */
{&a_src, sizeof(a_src)},
- {(void *)&prefix->address.buffer, ip_prefix_len(prefix)},
+ {(void *)&prefix->address.buffer, ip_address_len(&prefix->address, prefix->family)},
/* Priority */
{&a_prio, sizeof(a_prio)},
{(void *)&priority, sizeof(uint32_t)},
diff --git a/hicn-light/src/hicn/utils/commands.h b/hicn-light/src/hicn/utils/commands.h
index 834da6259..1a313de6a 100644
--- a/hicn-light/src/hicn/utils/commands.h
+++ b/hicn-light/src/hicn/utils/commands.h
@@ -58,6 +58,7 @@ typedef enum {
ADD_ROUTE,
LIST_ROUTES,
REMOVE_CONNECTION,
+ REMOVE_LISTENER,
REMOVE_ROUTE,
CACHE_STORE,
CACHE_SERVE,
@@ -124,7 +125,7 @@ typedef struct {
uint8_t connectionType;
} add_listener_command;
-// SIZE=40
+// SIZE=56
//========== [01] ADD CONNECTION ==========
@@ -167,11 +168,11 @@ typedef struct {
uint32_t connid;
uint8_t state;
uint8_t admin_state;
- char connectionName[16];
char interfaceName[16];
+ char connectionName[16];
} list_connections_command;
-// SIZE=64
+// SIZE=80
//========== [03] ADD ROUTE ==========
@@ -198,14 +199,18 @@ typedef struct {
// SIZE=24
//========== [05] REMOVE CONNECTION ==========
-
typedef struct {
char symbolicOrConnid[16];
} remove_connection_command;
+//========== [06] REMOVE LISTENER ==========
+typedef struct {
+ char symbolicOrListenerid[16];
+} remove_listener_command;
+
// SIZE=16
-//========== [06] REMOVE ROUTE ==========
+//========== [07] REMOVE ROUTE ==========
typedef struct {
char symbolicOrConnid[16];
@@ -216,7 +221,7 @@ typedef struct {
// SIZE=36
-//========== [07] CACHE STORE ==========
+//========== [08] CACHE STORE ==========
typedef struct {
uint8_t activate;
@@ -224,7 +229,7 @@ typedef struct {
// SIZE=1
-//========== [08] CACHE SERVE ==========
+//========== [09] CACHE SERVE ==========
typedef struct {
uint8_t activate;
@@ -232,15 +237,13 @@ typedef struct {
// SIZE=1
-//========== [09] SET STRATEGY ==========
+//========== [10] SET STRATEGY ==========
typedef enum {
SET_STRATEGY_LOADBALANCER,
SET_STRATEGY_RANDOM,
SET_STRATEGY_RANDOM_PER_DASH_SEGMENT,
SET_STRATEGY_LOADBALANCER_WITH_DELAY,
- SET_STRATEGY_LOADBALANCER_BY_RATE,
- SET_STRATEGY_LOADBALANCER_BEST_ROUTE,
LAST_STRATEGY_VALUE
} strategy_type;
@@ -285,7 +288,7 @@ typedef struct {
uint8_t encapType;
} list_listeners_command;
-// SIZE=24
+// SIZE=56
//========== [14] MAPME ==========
@@ -353,9 +356,11 @@ static inline int payloadLengthDaemon(command_id id) {
case ADD_ROUTE:
return sizeof(add_route_command);
case LIST_ROUTES:
- return 0; // list routes: payload always 0
+ return 0; // list rout`es: payload always 0
case REMOVE_CONNECTION:
return sizeof(remove_connection_command);
+ case REMOVE_LISTENER:
+ return sizeof(remove_listener_command);
case REMOVE_ROUTE:
return sizeof(remove_route_command);
case CACHE_STORE:
diff --git a/hicn-light/src/hicn/utils/utils.c b/hicn-light/src/hicn/utils/utils.c
index 61ff9a904..93a3efd81 100644
--- a/hicn-light/src/hicn/utils/utils.c
+++ b/hicn-light/src/hicn/utils/utils.c
@@ -90,7 +90,6 @@ struct iovec *utils_CreateNack(header_control_message *header, void *payload,
parcMemory_AllocateAndClear(sizeof(struct iovec) * 2);
header->messageType = NACK_LIGHT;
-
response[0].iov_base = header;
response[0].iov_len = sizeof(header_control_message);
response[1].iov_base = payload;
diff --git a/hicn-plugin/src/data_pcslookup_node.c b/hicn-plugin/src/data_pcslookup_node.c
index fdf855e57..1ae36202f 100644
--- a/hicn-plugin/src/data_pcslookup_node.c
+++ b/hicn-plugin/src/data_pcslookup_node.c
@@ -14,12 +14,12 @@
*/
#include "data_pcslookup.h"
+#include "infra.h"
#include "mgmt.h"
#include "parser.h"
-#include "infra.h"
+#include "state.h"
#include "strategy.h"
#include "strategy_dpo_manager.h"
-#include "state.h"
/* Stats string values */
static char *hicn_data_pcslookup_error_strings[] = {
@@ -37,10 +37,9 @@ vlib_node_registration_t hicn_data_pcslookup_node;
* hICN node for handling data. It performs a lookup in the PIT.
*/
static uword
-hicn_data_pcslookup_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
- vlib_frame_t * frame)
+hicn_data_pcslookup_node_fn (vlib_main_t * vm,
+ vlib_node_runtime_t * node, vlib_frame_t * frame)
{
-
u32 n_left_from, *from, *to_next;
hicn_data_pcslookup_next_t next_index;
hicn_data_pcslookup_runtime_t *rt;
@@ -87,7 +86,8 @@ hicn_data_pcslookup_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
{
vlib_buffer_t *b1;
b1 = vlib_get_buffer (vm, from[1]);
- //Prefetch two cache lines-- 128 byte-- so that we load the hicn_buffer_t as well
+ // Prefetch two cache lines-- 128 byte-- so that we load the
+ // hicn_buffer_t as well
CLIB_PREFETCH (b1, 2 * CLIB_CACHE_LINE_BYTES, STORE);
CLIB_PREFETCH (b1->data, CLIB_CACHE_LINE_BYTES, LOAD);
}
@@ -99,10 +99,9 @@ hicn_data_pcslookup_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
to_next += 1;
n_left_to_next -= 1;
-
b0 = vlib_get_buffer (vm, bi0);
hb0 = hicn_get_buffer (b0);
- next0 = HICN_DATA_PCSLOOKUP_NEXT_ERROR_DROP;
+ next0 = HICN_DATA_PCSLOOKUP_NEXT_ERROR_DROP;
/* Incr packet counter */
stats.pkts_processed += 1;
@@ -111,26 +110,24 @@ hicn_data_pcslookup_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
nameptr = (u8 *) (&name);
if (PREDICT_TRUE (ret0 == HICN_ERROR_NONE &&
- hicn_hashtb_fullhash (nameptr, namelen,
- &name_hash) == HICN_ERROR_NONE))
+ hicn_hashtb_fullhash (nameptr, namelen,
+ &name_hash) ==
+ HICN_ERROR_NONE))
{
int res =
hicn_hashtb_lookup_node (rt->pitcs->pcs_table, nameptr,
namelen, name_hash,
- !(hb0->flags &
- HICN_BUFFER_FLAGS_FACE_IS_APP)
- /* take lock */ ,
+ 1 /*is_data. Do not take lock if hit CS */ ,
&node_id0, &dpo_ctx_id0, &vft_id0,
- &is_cs0,
- &hash_entry_id, &bucket_id,
+ &is_cs0, &hash_entry_id, &bucket_id,
&bucket_is_overflown);
stats.pkts_data_count += 1;
#if HICN_FEATURE_CS
- if ((res == HICN_ERROR_HASHTB_HASH_NOT_FOUND
- || (res == HICN_ERROR_NONE && is_cs0))
- && ((hb0->flags & HICN_BUFFER_FLAGS_FACE_IS_APP)))
+ if ((res == HICN_ERROR_HASHTB_HASH_NOT_FOUND ||
+ (res == HICN_ERROR_NONE && is_cs0)) &&
+ ((hb0->flags & HICN_BUFFER_FLAGS_FACE_IS_APP)))
{
next0 = HICN_DATA_PCSLOOKUP_NEXT_STORE_DATA;
}
@@ -169,9 +166,8 @@ hicn_data_pcslookup_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
* Fix in case of a wrong speculation. Needed to
* clone the data in the right frame
*/
- vlib_validate_buffer_enqueue_x1 (vm, node, next_index,
- to_next, n_left_to_next,
- bi0, next0);
+ vlib_validate_buffer_enqueue_x1 (vm, node, next_index, to_next,
+ n_left_to_next, bi0, next0);
/* Maybe trace */
if (PREDICT_FALSE ((node->flags & VLIB_NODE_FLAG_TRACE) &&
@@ -217,24 +213,22 @@ hicn_data_pcslookup_format_trace (u8 * s, va_list * args)
return (s);
}
-
/*
* Node registration for the data forwarder node
*/
/* *INDENT-OFF* */
-VLIB_REGISTER_NODE (hicn_data_pcslookup_node) =
+VLIB_REGISTER_NODE(hicn_data_pcslookup_node) =
{
.function = hicn_data_pcslookup_node_fn,
.name = "hicn-data-pcslookup",
- .vector_size = sizeof (u32),
- .runtime_data_bytes = sizeof (hicn_data_pcslookup_runtime_t),
+ .vector_size = sizeof(u32),
+ .runtime_data_bytes = sizeof(hicn_data_pcslookup_runtime_t),
.format_trace = hicn_data_pcslookup_format_trace,
- .type = VLIB_NODE_TYPE_INTERNAL,
- .n_errors = ARRAY_LEN (hicn_data_pcslookup_error_strings),
+ .type = VLIB_NODE_TYPE_INTERNAL,
+ .n_errors = ARRAY_LEN(hicn_data_pcslookup_error_strings),
.error_strings = hicn_data_pcslookup_error_strings,
.n_next_nodes = HICN_DATA_PCSLOOKUP_N_NEXT,
- .next_nodes =
- {
+ .next_nodes = {
[HICN_DATA_PCSLOOKUP_NEXT_V4_LOOKUP] = "ip4-lookup",
[HICN_DATA_PCSLOOKUP_NEXT_V6_LOOKUP] = "ip6-lookup",
[HICN_DATA_PCSLOOKUP_NEXT_STORE_DATA] = "hicn-data-push",
diff --git a/hicn-plugin/src/pcs.h b/hicn-plugin/src/pcs.h
index c7e8a4b59..d9c48954e 100644
--- a/hicn-plugin/src/pcs.h
+++ b/hicn-plugin/src/pcs.h
@@ -499,8 +499,16 @@ hicn_pcs_cs_update (vlib_main_t * vm, hicn_pit_cs_t * pitcs,
policy_vft->hicn_cs_delete_get (pitcs, policy_state,
&node, &pcs_entry, &hash_entry);
- hicn_pcs_cs_delete (vm, pitcs, &pcs_entry, &node, hash_entry, NULL,
- NULL);
+ /*
+ * We don't have to decrease the lock (therefore we cannot
+ * use hicn_pcs_cs_delete function)
+ */
+ policy_vft->hicn_cs_dequeue (pitcs, node, pcs_entry, policy_state);
+
+ hicn_cs_delete_trimmed (pitcs, &pcs_entry, hash_entry, &node, vm);
+
+ /* Update the global CS counter */
+ pitcs->pcs_cs_count--;
}
}
else
@@ -541,14 +549,14 @@ hicn_pcs_cs_delete (vlib_main_t * vm, hicn_pit_cs_t * pitcs,
}
/* A data could have been inserted in the CS through a push. In this case locks == 0 */
- if (hash_entry->locks == 0 || hash_entry->locks == 1)
+ hash_entry->locks--;
+ if (hash_entry->locks == 0)
{
hicn_pcs_delete_internal
(pitcs, pcs_entryp, hash_entry, nodep, vm, dpo_vft, hicn_dpo_id);
}
else
{
- hash_entry->locks--;
hash_entry->he_flags |= HICN_HASH_ENTRY_FLAG_DELETED;
}
}
@@ -603,8 +611,16 @@ hicn_pcs_cs_insert (vlib_main_t * vm, hicn_pit_cs_t * pitcs,
policy_vft->hicn_cs_delete_get (pitcs, policy_state,
&node, &pcs_entry, &hash_entry);
- hicn_pcs_cs_delete (vm, pitcs, &pcs_entry, &node, hash_entry, NULL,
- NULL);
+ /*
+ * We don't have to decrease the lock (therefore we cannot
+ * use hicn_pcs_cs_delete function)
+ */
+ policy_vft->hicn_cs_dequeue (pitcs, node, pcs_entry, policy_state);
+
+ hicn_cs_delete_trimmed (pitcs, &pcs_entry, hash_entry, &node, vm);
+
+ /* Update the global CS counter */
+ pitcs->pcs_cs_count--;
}
}
return ret;
diff --git a/lib/src/name.c b/lib/src/name.c
index a19971d49..d5ee1d520 100644
--- a/lib/src/name.c
+++ b/lib/src/name.c
@@ -74,7 +74,7 @@ hicn_name_create (const char *ip_address, u32 id, hicn_name_t * name)
}
int
-hicn_name_create_from_prefix (const ip_prefix_t * prefix, u32 id,
+hicn_name_create_from_ip_prefix (const ip_prefix_t * prefix, u32 id,
hicn_name_t * name)
{
switch (prefix->family)
diff --git a/lib/src/util/ip_address.c b/lib/src/util/ip_address.c
index 2cf2aaef3..7afd3e2a4 100644
--- a/lib/src/util/ip_address.c
+++ b/lib/src/util/ip_address.c
@@ -190,9 +190,9 @@ ip_prefix_pton (const char *ip_address_str, ip_prefix_t * ip_prefix)
char *addr = strdup (ip_address_str);
p = strchr (addr, '/');
- if (!p)
- ip_prefix->len = 0; // until we get the ip address family
- else {
+ if (!p) {
+ ip_prefix->len = ~0; // until we get the ip address family
+ } else {
ip_prefix->len = strtoul (p + 1, &eptr, 10);
*p = 0;
}
@@ -202,11 +202,15 @@ ip_prefix_pton (const char *ip_address_str, ip_prefix_t * ip_prefix)
switch (ip_prefix->family)
{
case AF_INET6:
+ if (ip_prefix->len == (u8)~0)
+ ip_prefix->len = IPV6_ADDR_LEN_BITS;
if (ip_prefix->len > IPV6_ADDR_LEN_BITS)
goto ERR;
pton_fd = inet_pton (AF_INET6, addr, &ip_prefix->address.buffer);
break;
case AF_INET:
+ if (ip_prefix->len == (u8)~0)
+ ip_prefix->len = IPV4_ADDR_LEN_BITS;
if (ip_prefix->len > IPV4_ADDR_LEN_BITS)
goto ERR;
pton_fd = inet_pton (AF_INET, addr, &ip_prefix->address.buffer);
@@ -220,6 +224,7 @@ ip_prefix_pton (const char *ip_address_str, ip_prefix_t * ip_prefix)
if (pton_fd <= 0)
goto ERR;
+ free(addr);
return 1;
ERR:
free (addr);
diff --git a/libtransport/src/hicn/transport/CMakeLists.txt b/libtransport/src/hicn/transport/CMakeLists.txt
index 6e0ae5b88..22acdcb7f 100644
--- a/libtransport/src/hicn/transport/CMakeLists.txt
+++ b/libtransport/src/hicn/transport/CMakeLists.txt
@@ -25,6 +25,8 @@ add_subdirectory(portability)
add_subdirectory(protocols)
add_subdirectory(utils)
+include(Packager)
+extract_version()
configure_file("config.h.in" "config.h" @ONLY)
install(
FILES ${CMAKE_CURRENT_BINARY_DIR}/config.h
diff --git a/libtransport/src/hicn/transport/config.h.in b/libtransport/src/hicn/transport/config.h.in
index 7d47c2b3f..ef47affda 100644
--- a/libtransport/src/hicn/transport/config.h.in
+++ b/libtransport/src/hicn/transport/config.h.in
@@ -17,6 +17,10 @@
#cmakedefine TRANSPORT_HAVE_PTHREAD 1
+#define HICNTRANSPORT_VERSION_MAJOR "@VERSION_MAJOR@"
+#define HICNTRANSPORT_VERSION_MINOR "@VERSION_MINOR@"
+#define HICNTRANSPORT_VERSION_REVISION "@VERSION_REVISION@"
+
#ifndef ASIO_STANDALONE
#cmakedefine ASIO_STANDALONE
#endif
diff --git a/libtransport/src/hicn/transport/core/portal.h b/libtransport/src/hicn/transport/core/portal.h
index 3ea37c938..5796308b4 100644
--- a/libtransport/src/hicn/transport/core/portal.h
+++ b/libtransport/src/hicn/transport/core/portal.h
@@ -417,8 +417,14 @@ class Portal {
pending_interest->startCountdown(portal_details::makeCustomAllocatorHandler(
async_callback_memory_, std::bind(&Portal<ForwarderInt>::timerHandler,
this, std::placeholders::_1, hash)));
- pending_interest_hash_table_.emplace(
- std::make_pair(hash, std::move(pending_interest)));
+
+ auto it = pending_interest_hash_table_.find(hash);
+ if(it != pending_interest_hash_table_.end()){
+ it->second->cancelTimer();
+ it->second = std::move(pending_interest);
+ }else{
+ pending_interest_hash_table_[hash] = std::move(pending_interest);
+ }
}
/**
@@ -644,7 +650,7 @@ class Portal {
std::move(interest_ptr->getInterest()), std::move(content_object));
}
} else {
- TRANSPORT_LOGW("No pending interests for current content (%s)",
+ TRANSPORT_LOGD("No pending interests for current content (%s)",
content_object->getName().toString().c_str());
}
}
diff --git a/libtransport/src/hicn/transport/interfaces/CMakeLists.txt b/libtransport/src/hicn/transport/interfaces/CMakeLists.txt
index a5cca78a6..0c2c73623 100644
--- a/libtransport/src/hicn/transport/interfaces/CMakeLists.txt
+++ b/libtransport/src/hicn/transport/interfaces/CMakeLists.txt
@@ -28,6 +28,7 @@ list(APPEND SOURCE_FILES
${CMAKE_CURRENT_SOURCE_DIR}/rtc_socket_producer.cc
${CMAKE_CURRENT_SOURCE_DIR}/socket_producer.cc
${CMAKE_CURRENT_SOURCE_DIR}/socket_consumer.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/callbacks.cc
)
set(SOURCE_FILES ${SOURCE_FILES} PARENT_SCOPE)
diff --git a/libtransport/src/hicn/transport/interfaces/callbacks.cc b/libtransport/src/hicn/transport/interfaces/callbacks.cc
new file mode 100644
index 000000000..2574e7720
--- /dev/null
+++ b/libtransport/src/hicn/transport/interfaces/callbacks.cc
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "callbacks.h"
+
+namespace transport {
+
+namespace interface {
+
+std::nullptr_t VOID_HANDLER = nullptr;
+
+} // namespace interface
+
+} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/hicn/transport/interfaces/callbacks.h b/libtransport/src/hicn/transport/interfaces/callbacks.h
index 24f47eb75..7194cca42 100644
--- a/libtransport/src/hicn/transport/interfaces/callbacks.h
+++ b/libtransport/src/hicn/transport/interfaces/callbacks.h
@@ -18,6 +18,8 @@
#include <functional>
#include <system_error>
+#include <hicn/transport/core/facade.h>
+
namespace utils {
class MemBuf;
}
@@ -105,6 +107,8 @@ using ProducerContentObjectCallback =
using ProducerInterestCallback =
std::function<void(ProducerSocket &, core::Interest &)>;
+extern std::nullptr_t VOID_HANDLER;
+
} // namespace interface
} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
index 740f9f77c..c1a45ebb7 100644
--- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
+++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
@@ -13,6 +13,7 @@
* limitations under the License.
*/
+#include <hicn/transport/interfaces/callbacks.h>
#include <hicn/transport/interfaces/rtc_socket_producer.h>
#include <stdlib.h>
#include <time.h>
@@ -31,9 +32,10 @@
#define HICN_MAX_DATA_SEQ 0xefffffff
-//slow production rate param
-#define MIN_PRODUCTION_RATE 10 // in pacekts per sec. this value is computed
- // through experiments
+// slow production rate param
+#define MIN_PRODUCTION_RATE \
+ 10 // in pacekts per sec. this value is computed
+ // through experiments
#define LIFETIME_FRACTION 0.5
// NACK HEADER
@@ -63,13 +65,13 @@ RTCProducerSocket::RTCProducerSocket(asio::io_service &io_service)
bytesProductionRate_(INIT_PACKET_PRODUCTION_RATE * 1400),
packetsProductionRate_(INIT_PACKET_PRODUCTION_RATE),
perSecondFactor_(MILLI_IN_A_SEC / STATS_INTERVAL_DURATION),
- timer_on_(false){
+ timer_on_(false) {
srand((unsigned int)time(NULL));
prodLabel_ = ((rand() % 255) << 24UL);
- interests_cache_timer_ = std::make_unique<asio::steady_timer>(
- this->getIoService());
- round_timer_ = std::make_unique<asio::steady_timer>(
- this->getIoService());
+ interests_cache_timer_ =
+ std::make_unique<asio::steady_timer>(this->getIoService());
+ round_timer_ = std::make_unique<asio::steady_timer>(this->getIoService());
+ setSocketOption(GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 10000U);
scheduleRoundTimer();
}
@@ -81,13 +83,13 @@ RTCProducerSocket::RTCProducerSocket()
bytesProductionRate_(INIT_PACKET_PRODUCTION_RATE * 1400),
packetsProductionRate_(INIT_PACKET_PRODUCTION_RATE),
perSecondFactor_(MILLI_IN_A_SEC / STATS_INTERVAL_DURATION),
- timer_on_(false){
+ timer_on_(false) {
srand((unsigned int)time(NULL));
prodLabel_ = ((rand() % 255) << 24UL);
- interests_cache_timer_ = std::make_unique<asio::steady_timer>(
- this->getIoService());
- round_timer_ = std::make_unique<asio::steady_timer>(
- this->getIoService());
+ interests_cache_timer_ =
+ std::make_unique<asio::steady_timer>(this->getIoService());
+ round_timer_ = std::make_unique<asio::steady_timer>(this->getIoService());
+ setSocketOption(GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 10000U);
scheduleRoundTimer();
}
@@ -111,13 +113,13 @@ void RTCProducerSocket::registerPrefix(const Prefix &producer_namespace) {
}
}
-void RTCProducerSocket::scheduleRoundTimer(){
- round_timer_->expires_from_now(
- std::chrono::milliseconds(STATS_INTERVAL_DURATION));
- round_timer_->async_wait([this](std::error_code ec) {
- if (ec) return;
- updateStats();
- });
+void RTCProducerSocket::scheduleRoundTimer() {
+ round_timer_->expires_from_now(
+ std::chrono::milliseconds(STATS_INTERVAL_DURATION));
+ round_timer_->async_wait([this](std::error_code ec) {
+ if (ec) return;
+ updateStats();
+ });
}
void RTCProducerSocket::updateStats() {
@@ -148,111 +150,136 @@ void RTCProducerSocket::produce(std::unique_ptr<utils::MemBuf> &&buffer) {
producedBytes_ += (uint32_t)(buffer_size + headerSize_ + TIMESTAMP_LEN);
producedPackets_++;
- ContentObject content_object(flowName_.setSuffix(currentSeg_));
-
+ auto content_object =
+ std::make_shared<ContentObject>(flowName_.setSuffix(currentSeg_.load()));
auto payload = utils::MemBuf::create(TIMESTAMP_LEN);
memcpy(payload->writableData(), &now, TIMESTAMP_LEN);
payload->append(TIMESTAMP_LEN);
payload->prependChain(std::move(buffer));
- content_object.appendPayload(std::move(payload));
+ content_object->appendPayload(std::move(payload));
+
+ content_object->setLifetime(500); // XXX this should be set by the APP
- content_object.setLifetime(500); // XXX this should be set by the APP
+ content_object->setPathLabel(prodLabel_);
- content_object.setPathLabel(prodLabel_);
+ output_buffer_.insert(std::static_pointer_cast<ContentObject>(
+ content_object->shared_from_this()));
- if (on_content_object_output_ != VOID_HANDLER) {
- on_content_object_output_(*this, content_object);
+ if (on_content_object_in_output_buffer_) {
+ on_content_object_in_output_buffer_(*this, *content_object);
}
- portal_->sendContentObject(content_object);
+ portal_->sendContentObject(*content_object);
- //remove interests from the interest cache if it exists
- if(!seqs_map_.empty()){
+ if (on_content_object_output_) {
+ on_content_object_output_(*this, *content_object);
+ }
- utils::SpinLock::Acquire locked(interests_cache_lock_);
+ uint32_t old_curr = currentSeg_.load();
+ currentSeg_ = (currentSeg_.load() + 1) % HICN_MAX_DATA_SEQ;
- auto it_seqs = seqs_map_.find(currentSeg_);
- if(it_seqs != seqs_map_.end()){
- auto range = timers_map_.equal_range(it_seqs->second);
- for(auto it_timers = range.first; it_timers != range.second; it_timers++){
- if(it_timers->second == it_seqs->first){
- timers_map_.erase(it_timers);
- break;
- }
- }
- seqs_map_.erase(it_seqs);
+ // remove interests from the interest cache if it exists
+ // this generates nacks that will tell to the consumer
+ // that a new data packet was produced
+ if (!seqs_map_.empty()) {
+ utils::SpinLock::Acquire locked(interests_cache_lock_);
+ for (auto it = seqs_map_.begin(); it != seqs_map_.end(); it++) {
+ if (it->first != old_curr) sendNack(it->first);
}
+ seqs_map_.clear();
+ timers_map_.clear();
}
-
- currentSeg_ = (currentSeg_ + 1) % HICN_MAX_DATA_SEQ;
}
void RTCProducerSocket::onInterest(Interest::Ptr &&interest) {
uint32_t interestSeg = interest->getName().getSuffix();
uint32_t lifetime = interest->getLifetime();
- if (on_interest_input_ != VOID_HANDLER) {
+ if (on_interest_input_) {
on_interest_input_(*this, *interest);
}
uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
- if(interestSeg > HICN_MAX_DATA_SEQ){
+ if (interestSeg > HICN_MAX_DATA_SEQ) {
sendNack(interestSeg);
return;
}
+ const std::shared_ptr<ContentObject> content_object =
+ output_buffer_.find(*interest);
+
+ if (content_object) {
+ if (on_interest_satisfied_output_buffer_) {
+ on_interest_satisfied_output_buffer_(*this, *interest);
+ }
+
+ if (on_content_object_output_) {
+ on_content_object_output_(*this, *content_object);
+ }
+
+ portal_->sendContentObject(*content_object);
+ return;
+ } else {
+ if (on_interest_process_) {
+ on_interest_process_(*this, *interest);
+ }
+ }
+
// if the production rate is less than MIN_PRODUCTION_RATE we put the
// interest in a queue, otherwise we handle it in the usual way
- if(packetsProductionRate_.load() < MIN_PRODUCTION_RATE && interestSeg >= currentSeg_){
-
+ if (packetsProductionRate_.load() < MIN_PRODUCTION_RATE &&
+ interestSeg >= currentSeg_.load()) {
utils::SpinLock::Acquire locked(interests_cache_lock_);
uint64_t next_timer = ~0;
- if(!timers_map_.empty()){
+ if (!timers_map_.empty()) {
next_timer = timers_map_.begin()->first;
}
uint64_t expiration = now + (lifetime * LIFETIME_FRACTION);
- //check if the seq number exists already
+ // check if the seq number exists already
auto it_seqs = seqs_map_.find(interestSeg);
- if(it_seqs != seqs_map_.end()){
- //the seq already exists
- if(expiration < it_seqs->second){
+ if (it_seqs != seqs_map_.end()) {
+ // the seq already exists
+ if (expiration < it_seqs->second) {
// we need to update the timer becasue we got a smaller one
// 1) remove the entry from the multimap
// 2) update this entry
auto range = timers_map_.equal_range(it_seqs->second);
- for(auto it_timers = range.first; it_timers != range.second; it_timers++){
- if(it_timers->second == it_seqs->first){
+ for (auto it_timers = range.first; it_timers != range.second;
+ it_timers++) {
+ if (it_timers->second == it_seqs->first) {
timers_map_.erase(it_timers);
break;
}
}
- timers_map_.insert(std::pair<uint64_t,uint32_t>(expiration, interestSeg));
+ timers_map_.insert(
+ std::pair<uint64_t, uint32_t>(expiration, interestSeg));
it_seqs->second = expiration;
- }else{
- //nothing to do here
- return;
+ } else {
+ // nothing to do here
+ return;
}
- }else{
+ } else {
// add the new seq
- timers_map_.insert(std::pair<uint64_t,uint32_t>(expiration, interestSeg));
- seqs_map_.insert(std::pair<uint32_t,uint64_t>(interestSeg, expiration));
+ timers_map_.insert(
+ std::pair<uint64_t, uint32_t>(expiration, interestSeg));
+ seqs_map_.insert(std::pair<uint32_t, uint64_t>(interestSeg, expiration));
}
- //here we have at least one interest in the queue, we need to start or
- //update the timer
- if(!timer_on_){
- //set timeout
+ // here we have at least one interest in the queue, we need to start or
+ // update the timer
+ if (!timer_on_) {
+ // set timeout
timer_on_ = true;
scheduleCacheTimer(timers_map_.begin()->first - now);
} else {
- //re-schedule the timer because a new interest will expires sooner
- if(next_timer > timers_map_.begin()->first){
+ // re-schedule the timer because a new interest will expires sooner
+ if (next_timer > timers_map_.begin()->first) {
interests_cache_timer_->cancel();
scheduleCacheTimer(timers_map_.begin()->first - now);
}
@@ -265,44 +292,44 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) {
1000.0) *
(double)packetsProductionRate_.load()));
- if (interestSeg < currentSeg_ || interestSeg > (max_gap + currentSeg_)) {
+ if (interestSeg < currentSeg_.load() ||
+ interestSeg > (max_gap + currentSeg_.load())) {
sendNack(interestSeg);
}
// else drop packet
}
-void RTCProducerSocket::scheduleCacheTimer(uint64_t wait){
- interests_cache_timer_->expires_from_now(
- std::chrono::milliseconds(wait));
+void RTCProducerSocket::scheduleCacheTimer(uint64_t wait) {
+ interests_cache_timer_->expires_from_now(std::chrono::milliseconds(wait));
interests_cache_timer_->async_wait([this](std::error_code ec) {
- if (ec) return;
- interestCacheTimer();
- });
+ if (ec) return;
+ interestCacheTimer();
+ });
}
-void RTCProducerSocket::interestCacheTimer(){
+void RTCProducerSocket::interestCacheTimer() {
uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
utils::SpinLock::Acquire locked(interests_cache_lock_);
- for(auto it_timers = timers_map_.begin(); it_timers != timers_map_.end();){
+ for (auto it_timers = timers_map_.begin(); it_timers != timers_map_.end();) {
uint64_t expire = it_timers->first;
- if(expire <= now){
+ if (expire <= now) {
uint32_t seq = it_timers->second;
sendNack(seq);
- //remove the interest from the other map
+ // remove the interest from the other map
seqs_map_.erase(seq);
it_timers = timers_map_.erase(it_timers);
- }else{
- //stop, we are done!
+ } else {
+ // stop, we are done!
break;
}
}
- if(timers_map_.empty()){
+ if (timers_map_.empty()) {
timer_on_ = false;
- }else{
+ } else {
timer_on_ = true;
scheduleCacheTimer(timers_map_.begin()->first - now);
}
@@ -317,14 +344,14 @@ void RTCProducerSocket::sendNack(uint32_t sequence) {
nack.setName(flowName_.setSuffix(sequence));
uint32_t *payload_ptr = (uint32_t *)nack.getPayload()->data();
- *payload_ptr = currentSeg_;
+ *payload_ptr = currentSeg_.load();
*(++payload_ptr) = bytesProductionRate_.load();
nack.setLifetime(0);
nack.setPathLabel(prodLabel_);
- if (on_content_object_output_ != VOID_HANDLER) {
+ if (on_content_object_output_) {
on_content_object_output_(*this, nack);
}
diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h
index a2540ceef..37ba88d8a 100644
--- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h
+++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h
@@ -47,7 +47,7 @@ class RTCProducerSocket : public ProducerSocket {
void scheduleRoundTimer();
void interestCacheTimer();
- uint32_t currentSeg_;
+ std::atomic<uint32_t> currentSeg_;
uint32_t prodLabel_;
uint16_t headerSize_;
Name flowName_;
diff --git a/libtransport/src/hicn/transport/interfaces/socket.h b/libtransport/src/hicn/transport/interfaces/socket.h
index 90f6a3ef6..f0194880a 100644
--- a/libtransport/src/hicn/transport/interfaces/socket.h
+++ b/libtransport/src/hicn/transport/interfaces/socket.h
@@ -27,8 +27,6 @@
#define SOCKET_OPTION_NOT_SET 3
#define SOCKET_OPTION_DEFAULT 12345
-#define VOID_HANDLER 0
-
namespace transport {
namespace interface {
diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc
index e1afd2161..14cd27b6b 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc
+++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc
@@ -45,7 +45,6 @@ ConsumerSocket::ConsumerSocket(int protocol, asio::io_service &io_service)
rate_estimation_alpha_(default_values::rate_alpha),
rate_estimation_observer_(nullptr),
rate_estimation_choice_(0),
- is_async_(false),
verifier_(std::make_shared<utils::Verifier>()),
verify_signature_(false),
on_interest_output_(VOID_HANDLER),
@@ -58,8 +57,8 @@ ConsumerSocket::ConsumerSocket(int protocol, asio::io_service &io_service)
stats_summary_(VOID_HANDLER),
read_callback_(nullptr),
virtual_download_(false),
- rtt_stats_(false),
- timer_interval_milliseconds_(0) {
+ timer_interval_milliseconds_(0),
+ guard_raaqm_params_() {
switch (protocol) {
case TransportProtocolAlgorithms::CBR:
transport_protocol_ = std::make_unique<CbrTransportProtocol>(this);
@@ -88,7 +87,6 @@ int ConsumerSocket::consume(const Name &name) {
network_name_ = name;
network_name_.setSuffix(0);
- is_async_ = false;
transport_protocol_->start();
@@ -100,7 +98,6 @@ int ConsumerSocket::asyncConsume(const Name &name) {
async_downloader_.add([this, name]() {
network_name_ = std::move(name);
network_name_.setSuffix(0);
- is_async_ = true;
transport_protocol_->start();
});
}
@@ -108,20 +105,6 @@ int ConsumerSocket::asyncConsume(const Name &name) {
return CONSUMER_RUNNING;
}
-void ConsumerSocket::asyncSendInterest(Interest::Ptr &&interest,
- Portal::ConsumerCallback *callback) {
- if (!async_downloader_.stopped()) {
- // TODO Workaround, to be fixed!
- auto i = interest.release();
- async_downloader_.add([this, i, callback]() mutable {
- Interest::Ptr _interest(i);
- portal_->setConsumerCallback(callback);
- portal_->sendInterest(std::move(_interest));
- portal_->runEventsLoop();
- });
- }
-}
-
void ConsumerSocket::stop() {
if (transport_protocol_->isRunning()) {
transport_protocol_->stop();
@@ -138,6 +121,702 @@ asio::io_service &ConsumerSocket::getIoService() {
return portal_->getIoService();
}
+// If the thread calling lambda_func is not the same of io_service, this
+// function reschedule the function on it
+template <typename Lambda, typename arg2>
+int ConsumerSocket::rescheduleOnIOService(int socket_option_key,
+ arg2 socket_option_value,
+ Lambda lambda_func) {
+ // To enforce type check
+ std::function<int(int, arg2)> func = lambda_func;
+ int result = SOCKET_OPTION_SET;
+ if (transport_protocol_->isRunning()) {
+ std::mutex mtx;
+ /* Condition variable for the wait */
+ std::condition_variable cv;
+ bool done = false;
+ io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx,
+ &result, &done, &func]() {
+ std::unique_lock<std::mutex> lck(mtx);
+ done = true;
+ result = func(socket_option_key, socket_option_value);
+ });
+ std::unique_lock<std::mutex> lck(mtx);
+ if (!done) {
+ cv.wait(lck);
+ }
+ } else {
+ result = func(socket_option_key, socket_option_value);
+ }
+
+ return result;
+}
+
+int ConsumerSocket::setSocketOption(int socket_option_key,
+ ReadCallback *socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in case
+ // setSocketOption is called while the io_service is running.
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key, ReadCallback *socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::READ_CALLBACK:
+ read_callback_ = socket_option_value;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ });
+}
+
+int ConsumerSocket::getSocketOption(int socket_option_key,
+ ReadCallback **socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in case
+ // setSocketOption is called while the io_service is running.
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key, ReadCallback **socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::READ_CALLBACK:
+ *socket_option_value = read_callback_;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ });
+}
+
+int ConsumerSocket::setSocketOption(int socket_option_key,
+ double socket_option_value) {
+ utils::SpinLock::Acquire locked(guard_raaqm_params_);
+ switch (socket_option_key) {
+ case MIN_WINDOW_SIZE:
+ min_window_size_ = socket_option_value;
+ break;
+
+ case MAX_WINDOW_SIZE:
+ max_window_size_ = socket_option_value;
+ break;
+
+ case CURRENT_WINDOW_SIZE:
+ current_window_size_ = socket_option_value;
+ break;
+
+ case GAMMA_VALUE:
+ gamma_ = socket_option_value;
+ break;
+
+ case BETA_VALUE:
+ beta_ = socket_option_value;
+ break;
+
+ case DROP_FACTOR:
+ drop_factor_ = socket_option_value;
+ break;
+
+ case MINIMUM_DROP_PROBABILITY:
+ minimum_drop_probability_ = socket_option_value;
+ break;
+
+ case RATE_ESTIMATION_ALPHA:
+ if (socket_option_value >= 0 && socket_option_value < 1) {
+ rate_estimation_alpha_ = socket_option_value;
+ } else {
+ rate_estimation_alpha_ = default_values::alpha;
+ }
+ break;
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+}
+
+int ConsumerSocket::setSocketOption(int socket_option_key,
+ uint32_t socket_option_value) {
+ utils::SpinLock::Acquire locked(guard_raaqm_params_);
+ switch (socket_option_key) {
+ case GeneralTransportOptions::MAX_INTEREST_RETX:
+ max_retransmissions_ = socket_option_value;
+ break;
+
+ case GeneralTransportOptions::INTEREST_LIFETIME:
+ interest_lifetime_ = socket_option_value;
+ break;
+
+ case RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER:
+ if (socket_option_value > 0) {
+ rate_estimation_batching_parameter_ = socket_option_value;
+ } else {
+ rate_estimation_batching_parameter_ = default_values::batch;
+ }
+ break;
+
+ case RateEstimationOptions::RATE_ESTIMATION_CHOICE:
+ if (socket_option_value > 0) {
+ rate_estimation_choice_ = socket_option_value;
+ } else {
+ rate_estimation_choice_ = default_values::rate_choice;
+ }
+ break;
+
+ case GeneralTransportOptions::STATS_INTERVAL:
+ timer_interval_milliseconds_ = socket_option_value;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+}
+
+int ConsumerSocket::setSocketOption(int socket_option_key,
+ std::nullptr_t socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in case
+ // setSocketOption is called while the io_service is running.
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key, std::nullptr_t socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION:
+ if (socket_option_value == VOID_HANDLER) {
+ on_interest_retransmission_ = VOID_HANDLER;
+ break;
+ }
+
+ case ConsumerCallbacksOptions::INTEREST_EXPIRED:
+ if (socket_option_value == VOID_HANDLER) {
+ on_interest_timeout_ = VOID_HANDLER;
+ break;
+ }
+
+ case ConsumerCallbacksOptions::INTEREST_SATISFIED:
+ if (socket_option_value == VOID_HANDLER) {
+ on_interest_satisfied_ = VOID_HANDLER;
+ break;
+ }
+
+ case ConsumerCallbacksOptions::INTEREST_OUTPUT:
+ if (socket_option_value == VOID_HANDLER) {
+ on_interest_output_ = VOID_HANDLER;
+ break;
+ }
+
+ case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT:
+ if (socket_option_value == VOID_HANDLER) {
+ on_content_object_input_ = VOID_HANDLER;
+ break;
+ }
+
+ case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY:
+ if (socket_option_value == VOID_HANDLER) {
+ on_content_object_verification_ = VOID_HANDLER;
+ break;
+ }
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ });
+}
+
+int ConsumerSocket::setSocketOption(int socket_option_key,
+ bool socket_option_value) {
+ int result = SOCKET_OPTION_NOT_SET;
+ if (!transport_protocol_->isRunning()) {
+ switch (socket_option_key) {
+ case OtherOptions::VIRTUAL_DOWNLOAD:
+ virtual_download_ = socket_option_value;
+ result = SOCKET_OPTION_SET;
+ break;
+
+ case GeneralTransportOptions::VERIFY_SIGNATURE:
+ verify_signature_ = socket_option_value;
+ result = SOCKET_OPTION_SET;
+ break;
+
+ default:
+ return result;
+ }
+ }
+ return result;
+}
+
+int ConsumerSocket::setSocketOption(
+ int socket_option_key, ConsumerContentObjectCallback socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in case
+ // setSocketOption is called while the io_service is running.
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ConsumerContentObjectCallback socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT:
+ on_content_object_input_ = socket_option_value;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ });
+}
+
+int ConsumerSocket::setSocketOption(
+ int socket_option_key,
+ ConsumerContentObjectVerificationCallback socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in case
+ // setSocketOption is called while the io_service is running.
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ConsumerContentObjectVerificationCallback socket_option_value)
+ -> int {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY:
+ on_content_object_verification_ = socket_option_value;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ });
+}
+
+int ConsumerSocket::setSocketOption(
+ int socket_option_key, ConsumerInterestCallback socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in case
+ // setSocketOption is called while the io_service is running.
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ConsumerInterestCallback socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION:
+ on_interest_retransmission_ = socket_option_value;
+ break;
+
+ case ConsumerCallbacksOptions::INTEREST_OUTPUT:
+ on_interest_output_ = socket_option_value;
+ break;
+
+ case ConsumerCallbacksOptions::INTEREST_EXPIRED:
+ on_interest_timeout_ = socket_option_value;
+ break;
+
+ case ConsumerCallbacksOptions::INTEREST_SATISFIED:
+ on_interest_satisfied_ = socket_option_value;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ });
+}
+
+int ConsumerSocket::setSocketOption(
+ int socket_option_key, ConsumerManifestCallback socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in case
+ // setSocketOption is called while the io_service is running.
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ConsumerManifestCallback socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::MANIFEST_INPUT:
+ on_manifest_ = socket_option_value;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ });
+}
+
+int ConsumerSocket::setSocketOption(int socket_option_key,
+ IcnObserver *socket_option_value) {
+ utils::SpinLock::Acquire locked(guard_raaqm_params_);
+ switch (socket_option_key) {
+ case RateEstimationOptions::RATE_ESTIMATION_OBSERVER:
+ rate_estimation_observer_ = socket_option_value;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+}
+
+int ConsumerSocket::setSocketOption(
+ int socket_option_key,
+ const std::shared_ptr<utils::Verifier> &socket_option_value) {
+ int result = SOCKET_OPTION_NOT_SET;
+ if (!transport_protocol_->isRunning()) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::VERIFIER:
+ verifier_ = socket_option_value;
+ result = SOCKET_OPTION_SET;
+ break;
+ default:
+ return result;
+ }
+ }
+
+ return result;
+}
+
+int ConsumerSocket::setSocketOption(int socket_option_key,
+ const std::string &socket_option_value) {
+ int result = SOCKET_OPTION_NOT_SET;
+ if (!transport_protocol_->isRunning()) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::CERTIFICATE:
+ key_id_ = verifier_->addKeyFromCertificate(socket_option_value);
+
+ if (key_id_ != nullptr) {
+ result = SOCKET_OPTION_SET;
+ }
+ break;
+
+ case DataLinkOptions::OUTPUT_INTERFACE:
+ output_interface_ = socket_option_value;
+ portal_->setOutputInterface(output_interface_);
+ result = SOCKET_OPTION_SET;
+ break;
+
+ default:
+ return result;
+ }
+ }
+ return result;
+}
+
+int ConsumerSocket::setSocketOption(int socket_option_key,
+ ConsumerTimerCallback socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in case
+ // setSocketOption is called while the io_service is running.
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ConsumerTimerCallback socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::STATS_SUMMARY:
+ stats_summary_ = socket_option_value;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ });
+}
+
+int ConsumerSocket::getSocketOption(int socket_option_key,
+ double &socket_option_value) {
+ utils::SpinLock::Acquire locked(guard_raaqm_params_);
+ switch (socket_option_key) {
+ case GeneralTransportOptions::MIN_WINDOW_SIZE:
+ socket_option_value = min_window_size_;
+ break;
+
+ case GeneralTransportOptions::MAX_WINDOW_SIZE:
+ socket_option_value = max_window_size_;
+ break;
+
+ case GeneralTransportOptions::CURRENT_WINDOW_SIZE:
+ socket_option_value = current_window_size_;
+ break;
+
+ // RAAQM parameters
+
+ case RaaqmTransportOptions::GAMMA_VALUE:
+ socket_option_value = gamma_;
+ break;
+
+ case RaaqmTransportOptions::BETA_VALUE:
+ socket_option_value = beta_;
+ break;
+
+ case RaaqmTransportOptions::DROP_FACTOR:
+ socket_option_value = drop_factor_;
+ break;
+
+ case RaaqmTransportOptions::MINIMUM_DROP_PROBABILITY:
+ socket_option_value = minimum_drop_probability_;
+ break;
+
+ case RateEstimationOptions::RATE_ESTIMATION_ALPHA:
+ socket_option_value = rate_estimation_alpha_;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+}
+
+int ConsumerSocket::getSocketOption(int socket_option_key,
+ uint32_t &socket_option_value) {
+ utils::SpinLock::Acquire locked(guard_raaqm_params_);
+ switch (socket_option_key) {
+ case GeneralTransportOptions::MAX_INTEREST_RETX:
+ socket_option_value = max_retransmissions_;
+ break;
+
+ case GeneralTransportOptions::INTEREST_LIFETIME:
+ socket_option_value = interest_lifetime_;
+ break;
+
+ case RaaqmTransportOptions::SAMPLE_NUMBER:
+ socket_option_value = sample_number_;
+ break;
+
+ case RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER:
+ socket_option_value = rate_estimation_batching_parameter_;
+ break;
+
+ case RateEstimationOptions::RATE_ESTIMATION_CHOICE:
+ socket_option_value = rate_estimation_choice_;
+ break;
+
+ case GeneralTransportOptions::STATS_INTERVAL:
+ socket_option_value = timer_interval_milliseconds_;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+}
+
+int ConsumerSocket::getSocketOption(int socket_option_key,
+ bool &socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::RUNNING:
+ socket_option_value = transport_protocol_->isRunning();
+ break;
+
+ case OtherOptions::VIRTUAL_DOWNLOAD:
+ socket_option_value = virtual_download_;
+ break;
+
+ case GeneralTransportOptions::VERIFY_SIGNATURE:
+ socket_option_value = verify_signature_;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+}
+
+int ConsumerSocket::getSocketOption(int socket_option_key,
+ Name **socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::NETWORK_NAME:
+ *socket_option_value = &network_name_;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+}
+
+int ConsumerSocket::getSocketOption(
+ int socket_option_key,
+ ConsumerContentObjectCallback **socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in case
+ // setSocketOption is called while the io_service is running.
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ConsumerContentObjectCallback **socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT:
+ *socket_option_value = &on_content_object_input_;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ });
+}
+
+int ConsumerSocket::getSocketOption(
+ int socket_option_key,
+ ConsumerContentObjectVerificationCallback **socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in case
+ // setSocketOption is called while the io_service is running.
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ConsumerContentObjectVerificationCallback **socket_option_value)
+ -> int {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY:
+ *socket_option_value = &on_content_object_verification_;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ });
+}
+
+int ConsumerSocket::getSocketOption(
+ int socket_option_key, ConsumerInterestCallback **socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in case
+ // setSocketOption is called while the io_service is running.
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ConsumerInterestCallback **socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION:
+ *socket_option_value = &on_interest_retransmission_;
+ break;
+
+ case ConsumerCallbacksOptions::INTEREST_OUTPUT:
+ *socket_option_value = &on_interest_output_;
+ break;
+
+ case ConsumerCallbacksOptions::INTEREST_EXPIRED:
+ *socket_option_value = &on_interest_timeout_;
+ break;
+
+ case ConsumerCallbacksOptions::INTEREST_SATISFIED:
+ *socket_option_value = &on_interest_satisfied_;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ });
+}
+
+int ConsumerSocket::getSocketOption(
+ int socket_option_key, ConsumerManifestCallback **socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in case
+ // setSocketOption is called while the io_service is running.
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ConsumerManifestCallback **socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::MANIFEST_INPUT:
+ *socket_option_value = &on_manifest_;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ });
+}
+
+int ConsumerSocket::getSocketOption(
+ int socket_option_key, std::shared_ptr<Portal> &socket_option_value) {
+ switch (socket_option_key) {
+ case PORTAL:
+ socket_option_value = portal_;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+}
+
+int ConsumerSocket::getSocketOption(int socket_option_key,
+ IcnObserver **socket_option_value) {
+ utils::SpinLock::Acquire locked(guard_raaqm_params_);
+ switch (socket_option_key) {
+ case RateEstimationOptions::RATE_ESTIMATION_OBSERVER:
+ *socket_option_value = (rate_estimation_observer_);
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+}
+
+int ConsumerSocket::getSocketOption(
+ int socket_option_key,
+ std::shared_ptr<utils::Verifier> &socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::VERIFIER:
+ socket_option_value = verifier_;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+}
+
+int ConsumerSocket::getSocketOption(int socket_option_key,
+ std::string &socket_option_value) {
+ switch (socket_option_key) {
+ case DataLinkOptions::OUTPUT_INTERFACE:
+ socket_option_value = output_interface_;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+}
+
+int ConsumerSocket::getSocketOption(
+ int socket_option_key, ConsumerTimerCallback **socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in case
+ // setSocketOption is called while the io_service is running.
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ConsumerTimerCallback **socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::STATS_SUMMARY:
+ *socket_option_value = &stats_summary_;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ });
+}
+
} // namespace interface
} // end namespace transport \ No newline at end of file
diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.h b/libtransport/src/hicn/transport/interfaces/socket_consumer.h
index 8f7a9718c..e3620b269 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_consumer.h
+++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.h
@@ -179,20 +179,6 @@ class ConsumerSocket : public BaseSocket {
int asyncConsume(const Name &name);
/**
- * Send an interest asynchronously in another thread, which is the same used
- * for asyncConsume.
- *
- * @param interest - An Interest::Ptr to the interest. Notice that the
- * application looses the ownership of the interest, which is transferred to
- * the library itself.
- * @param callback - A ConsumerCallback containing the events to be trigger in
- * case of timeout or content reception.
- *
- */
- void asyncSendInterest(Interest::Ptr &&interest,
- Portal::ConsumerCallback *callback);
-
- /**
* Stops the consumer socket. If several downloads are queued (using
* asyncConsume), this call stops just the current one.
*/
@@ -211,595 +197,94 @@ class ConsumerSocket : public BaseSocket {
*/
asio::io_service &getIoService() override;
- TRANSPORT_ALWAYS_INLINE int setSocketOption(
- int socket_option_key, ReadCallback *socket_option_value) {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::READ_CALLBACK:
- read_callback_ = socket_option_value;
- break;
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- }
-
- TRANSPORT_ALWAYS_INLINE int getSocketOption(
- int socket_option_key, ReadCallback **socket_option_value) {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::READ_CALLBACK:
- *socket_option_value = read_callback_;
- break;
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- }
-
- TRANSPORT_ALWAYS_INLINE int setSocketOption(int socket_option_key,
- double socket_option_value) {
- switch (socket_option_key) {
- case MIN_WINDOW_SIZE:
- min_window_size_ = socket_option_value;
- break;
-
- case MAX_WINDOW_SIZE:
- max_window_size_ = socket_option_value;
- break;
-
- case CURRENT_WINDOW_SIZE:
- current_window_size_ = socket_option_value;
- break;
-
- case GAMMA_VALUE:
- gamma_ = socket_option_value;
- break;
-
- case BETA_VALUE:
- beta_ = socket_option_value;
- break;
-
- case DROP_FACTOR:
- drop_factor_ = socket_option_value;
- break;
-
- case MINIMUM_DROP_PROBABILITY:
- minimum_drop_probability_ = socket_option_value;
- break;
-
- case RATE_ESTIMATION_ALPHA:
- if (socket_option_value >= 0 && socket_option_value < 1) {
- rate_estimation_alpha_ = socket_option_value;
- } else {
- rate_estimation_alpha_ = default_values::alpha;
- }
- break;
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- }
-
- TRANSPORT_ALWAYS_INLINE int setSocketOption(int socket_option_key,
- uint32_t socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::INPUT_BUFFER_SIZE:
- input_buffer_size_ = socket_option_value;
- break;
-
- case GeneralTransportOptions::OUTPUT_BUFFER_SIZE:
- output_buffer_size_ = socket_option_value;
- break;
-
- case GeneralTransportOptions::MAX_INTEREST_RETX:
- max_retransmissions_ = socket_option_value;
- break;
-
- case GeneralTransportOptions::INTEREST_LIFETIME:
- interest_lifetime_ = socket_option_value;
- break;
-
- case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION:
- if (socket_option_value == VOID_HANDLER) {
- on_interest_retransmission_ = VOID_HANDLER;
- break;
- }
-
- case ConsumerCallbacksOptions::INTEREST_EXPIRED:
- if (socket_option_value == VOID_HANDLER) {
- on_interest_timeout_ = VOID_HANDLER;
- break;
- }
-
- case ConsumerCallbacksOptions::INTEREST_SATISFIED:
- if (socket_option_value == VOID_HANDLER) {
- on_interest_satisfied_ = VOID_HANDLER;
- break;
- }
-
- case ConsumerCallbacksOptions::INTEREST_OUTPUT:
- if (socket_option_value == VOID_HANDLER) {
- on_interest_output_ = VOID_HANDLER;
- break;
- }
-
- case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT:
- if (socket_option_value == VOID_HANDLER) {
- on_content_object_input_ = VOID_HANDLER;
- break;
- }
-
- case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY:
- if (socket_option_value == VOID_HANDLER) {
- on_content_object_verification_ = VOID_HANDLER;
- break;
- }
-
- case RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER:
- if (socket_option_value > 0) {
- rate_estimation_batching_parameter_ = socket_option_value;
- } else {
- rate_estimation_batching_parameter_ = default_values::batch;
- }
- break;
-
- case RateEstimationOptions::RATE_ESTIMATION_CHOICE:
- if (socket_option_value > 0) {
- rate_estimation_choice_ = socket_option_value;
- } else {
- rate_estimation_choice_ = default_values::rate_choice;
- }
- break;
-
- case GeneralTransportOptions::STATS_INTERVAL:
- timer_interval_milliseconds_ = socket_option_value;
- break;
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- }
-
- TRANSPORT_ALWAYS_INLINE int setSocketOption(int socket_option_key,
- bool socket_option_value) {
- switch (socket_option_key) {
- case OtherOptions::VIRTUAL_DOWNLOAD:
- virtual_download_ = socket_option_value;
- break;
-
- case RaaqmTransportOptions::RTT_STATS:
- rtt_stats_ = socket_option_value;
- break;
-
- case GeneralTransportOptions::VERIFY_SIGNATURE:
- verify_signature_ = socket_option_value;
- break;
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- }
-
- TRANSPORT_ALWAYS_INLINE int setSocketOption(int socket_option_key,
- Name *socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::NETWORK_NAME:
- network_name_ = *socket_option_value;
- break;
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- }
-
- TRANSPORT_ALWAYS_INLINE int setSocketOption(
- int socket_option_key,
- ConsumerContentObjectCallback socket_option_value) {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT:
- on_content_object_input_ = socket_option_value;
- break;
+ virtual int setSocketOption(int socket_option_key,
+ ReadCallback *socket_option_value);
+
+ virtual int getSocketOption(int socket_option_key,
+ ReadCallback **socket_option_value);
- default:
- return SOCKET_OPTION_NOT_SET;
- }
+ virtual int setSocketOption(int socket_option_key,
+ double socket_option_value);
- return SOCKET_OPTION_SET;
- }
+ virtual int setSocketOption(int socket_option_key,
+ uint32_t socket_option_value);
- TRANSPORT_ALWAYS_INLINE int setSocketOption(
+ virtual int setSocketOption(int socket_option_key,
+ std::nullptr_t socket_option_value);
+
+ virtual int setSocketOption(int socket_option_key, bool socket_option_value);
+
+ virtual int setSocketOption(
+ int socket_option_key, ConsumerContentObjectCallback socket_option_value);
+
+ virtual int setSocketOption(
int socket_option_key,
- ConsumerContentObjectVerificationCallback socket_option_value) {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY:
- on_content_object_verification_ = socket_option_value;
- break;
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- }
-
- TRANSPORT_ALWAYS_INLINE int setSocketOption(
- int socket_option_key, ConsumerInterestCallback socket_option_value) {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION:
- on_interest_retransmission_ = socket_option_value;
- break;
-
- case ConsumerCallbacksOptions::INTEREST_OUTPUT:
- on_interest_output_ = socket_option_value;
- break;
-
- case ConsumerCallbacksOptions::INTEREST_EXPIRED:
- on_interest_timeout_ = socket_option_value;
- break;
-
- case ConsumerCallbacksOptions::INTEREST_SATISFIED:
- on_interest_satisfied_ = socket_option_value;
- break;
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- }
-
- TRANSPORT_ALWAYS_INLINE int setSocketOption(
- int socket_option_key, ConsumerManifestCallback socket_option_value) {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::MANIFEST_INPUT:
- on_manifest_ = socket_option_value;
- break;
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- }
-
- TRANSPORT_ALWAYS_INLINE int setSocketOption(
- int socket_option_key, IcnObserver *socket_option_value) {
- switch (socket_option_key) {
- case RateEstimationOptions::RATE_ESTIMATION_OBSERVER:
- rate_estimation_observer_ = socket_option_value;
- break;
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- }
-
- TRANSPORT_ALWAYS_INLINE int setSocketOption(
+ ConsumerContentObjectVerificationCallback socket_option_value);
+
+ virtual int setSocketOption(int socket_option_key,
+ ConsumerInterestCallback socket_option_value);
+
+ virtual int setSocketOption(int socket_option_key,
+ ConsumerManifestCallback socket_option_value);
+
+ virtual int setSocketOption(int socket_option_key,
+ IcnObserver *socket_option_value);
+
+ virtual int setSocketOption(
int socket_option_key,
- const std::shared_ptr<utils::Verifier> &socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::VERIFIER:
- verifier_ = socket_option_value;
- break;
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- }
-
- TRANSPORT_ALWAYS_INLINE int setSocketOption(
- int socket_option_key, const std::string &socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::CERTIFICATE:
- key_id_ = verifier_->addKeyFromCertificate(socket_option_value);
-
- if (key_id_ != nullptr) {
- break;
- }
-
- case DataLinkOptions::OUTPUT_INTERFACE:
- output_interface_ = socket_option_value;
- portal_->setOutputInterface(output_interface_);
- break;
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- }
-
- TRANSPORT_ALWAYS_INLINE int setSocketOption(
- int socket_option_key, ConsumerTimerCallback socket_option_value) {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::STATS_SUMMARY:
- stats_summary_ = socket_option_value;
- break;
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- }
-
- TRANSPORT_ALWAYS_INLINE int getSocketOption(int socket_option_key,
- double &socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::MIN_WINDOW_SIZE:
- socket_option_value = min_window_size_;
- break;
-
- case GeneralTransportOptions::MAX_WINDOW_SIZE:
- socket_option_value = max_window_size_;
- break;
-
- case GeneralTransportOptions::CURRENT_WINDOW_SIZE:
- socket_option_value = current_window_size_;
- break;
-
- // RAAQM parameters
-
- case RaaqmTransportOptions::GAMMA_VALUE:
- socket_option_value = gamma_;
- break;
-
- case RaaqmTransportOptions::BETA_VALUE:
- socket_option_value = beta_;
- break;
-
- case RaaqmTransportOptions::DROP_FACTOR:
- socket_option_value = drop_factor_;
- break;
-
- case RaaqmTransportOptions::MINIMUM_DROP_PROBABILITY:
- socket_option_value = minimum_drop_probability_;
- break;
-
- case RateEstimationOptions::RATE_ESTIMATION_ALPHA:
- socket_option_value = rate_estimation_alpha_;
- break;
-
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- }
-
- TRANSPORT_ALWAYS_INLINE int getSocketOption(int socket_option_key,
- uint32_t &socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::INPUT_BUFFER_SIZE:
- socket_option_value = (uint32_t)input_buffer_size_;
- break;
-
- case GeneralTransportOptions::OUTPUT_BUFFER_SIZE:
- socket_option_value = (uint32_t)output_buffer_size_;
- break;
-
- case GeneralTransportOptions::MAX_INTEREST_RETX:
- socket_option_value = max_retransmissions_;
- break;
-
- case GeneralTransportOptions::INTEREST_LIFETIME:
- socket_option_value = interest_lifetime_;
- break;
-
- case RaaqmTransportOptions::SAMPLE_NUMBER:
- socket_option_value = sample_number_;
- break;
-
- case RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER:
- socket_option_value = rate_estimation_batching_parameter_;
- break;
-
- case RateEstimationOptions::RATE_ESTIMATION_CHOICE:
- socket_option_value = rate_estimation_choice_;
- break;
-
- case GeneralTransportOptions::STATS_INTERVAL:
- socket_option_value = timer_interval_milliseconds_;
- break;
-
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- }
-
- TRANSPORT_ALWAYS_INLINE int getSocketOption(int socket_option_key,
- bool &socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::ASYNC_MODE:
- socket_option_value = is_async_;
- break;
+ const std::shared_ptr<utils::Verifier> &socket_option_value);
- case GeneralTransportOptions::RUNNING:
- socket_option_value = transport_protocol_->isRunning();
- break;
+ virtual int setSocketOption(int socket_option_key,
+ const std::string &socket_option_value);
- case OtherOptions::VIRTUAL_DOWNLOAD:
- socket_option_value = virtual_download_;
- break;
-
- case RaaqmTransportOptions::RTT_STATS:
- socket_option_value = rtt_stats_;
- break;
+ virtual int setSocketOption(int socket_option_key,
+ ConsumerTimerCallback socket_option_value);
- case GeneralTransportOptions::VERIFY_SIGNATURE:
- socket_option_value = verify_signature_;
- break;
+ virtual int getSocketOption(int socket_option_key,
+ double &socket_option_value);
- default:
- return SOCKET_OPTION_NOT_GET;
- }
+ virtual int getSocketOption(int socket_option_key,
+ uint32_t &socket_option_value);
- return SOCKET_OPTION_GET;
- }
-
- TRANSPORT_ALWAYS_INLINE int getSocketOption(int socket_option_key,
- Name **socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::NETWORK_NAME:
- *socket_option_value = &network_name_;
- break;
-
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- }
-
- TRANSPORT_ALWAYS_INLINE int getSocketOption(
- int socket_option_key,
- ConsumerContentObjectCallback **socket_option_value) {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT:
- *socket_option_value = &on_content_object_input_;
- break;
+ virtual int getSocketOption(int socket_option_key, bool &socket_option_value);
- default:
- return SOCKET_OPTION_NOT_GET;
- }
+ virtual int getSocketOption(int socket_option_key,
+ Name **socket_option_value);
- return SOCKET_OPTION_GET;
- }
+ virtual int getSocketOption(
+ int socket_option_key,
+ ConsumerContentObjectCallback **socket_option_value);
- TRANSPORT_ALWAYS_INLINE int getSocketOption(
+ virtual int getSocketOption(
int socket_option_key,
- ConsumerContentObjectVerificationCallback **socket_option_value) {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY:
- *socket_option_value = &on_content_object_verification_;
- break;
-
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- }
-
- TRANSPORT_ALWAYS_INLINE int getSocketOption(
- int socket_option_key, ConsumerInterestCallback **socket_option_value) {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION:
- *socket_option_value = &on_interest_retransmission_;
- break;
-
- case ConsumerCallbacksOptions::INTEREST_OUTPUT:
- *socket_option_value = &on_interest_output_;
- break;
-
- case ConsumerCallbacksOptions::INTEREST_EXPIRED:
- *socket_option_value = &on_interest_timeout_;
- break;
-
- case ConsumerCallbacksOptions::INTEREST_SATISFIED:
- *socket_option_value = &on_interest_satisfied_;
- break;
-
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- }
-
- TRANSPORT_ALWAYS_INLINE int getSocketOption(
- int socket_option_key, ConsumerManifestCallback **socket_option_value) {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::MANIFEST_INPUT:
- *socket_option_value = &on_manifest_;
- break;
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- }
-
- TRANSPORT_ALWAYS_INLINE int getSocketOption(
- int socket_option_key, std::shared_ptr<Portal> &socket_option_value) {
- switch (socket_option_key) {
- case PORTAL:
- socket_option_value = portal_;
- break;
-
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- }
-
- TRANSPORT_ALWAYS_INLINE int getSocketOption(
- int socket_option_key, IcnObserver **socket_option_value) {
- switch (socket_option_key) {
- case RateEstimationOptions::RATE_ESTIMATION_OBSERVER:
- *socket_option_value = (rate_estimation_observer_);
- break;
-
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- }
-
- TRANSPORT_ALWAYS_INLINE int getSocketOption(
+ ConsumerContentObjectVerificationCallback **socket_option_value);
+
+ virtual int getSocketOption(int socket_option_key,
+ ConsumerInterestCallback **socket_option_value);
+
+ virtual int getSocketOption(int socket_option_key,
+ ConsumerManifestCallback **socket_option_value);
+
+ virtual int getSocketOption(int socket_option_key,
+ std::shared_ptr<Portal> &socket_option_value);
+
+ virtual int getSocketOption(int socket_option_key,
+ IcnObserver **socket_option_value);
+
+ virtual int getSocketOption(
int socket_option_key,
- std::shared_ptr<utils::Verifier> &socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::VERIFIER:
- socket_option_value = verifier_;
- break;
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- }
-
- TRANSPORT_ALWAYS_INLINE int getSocketOption(
- int socket_option_key, std::string &socket_option_value) {
- switch (socket_option_key) {
- case DataLinkOptions::OUTPUT_INTERFACE:
- socket_option_value = output_interface_;
- break;
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- }
-
- TRANSPORT_ALWAYS_INLINE int getSocketOption(
- int socket_option_key, ConsumerTimerCallback **socket_option_value) {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::STATS_SUMMARY:
- *socket_option_value = &stats_summary_;
- break;
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- }
+ std::shared_ptr<utils::Verifier> &socket_option_value);
+
+ virtual int getSocketOption(int socket_option_key,
+ std::string &socket_option_value);
+
+ virtual int getSocketOption(int socket_option_key,
+ ConsumerTimerCallback **socket_option_value);
+
+ protected:
+ template <typename Lambda, typename arg2>
+ int rescheduleOnIOService(int socket_option_key, arg2 socket_option_value,
+ Lambda lambda_func);
private:
asio::io_service internal_io_service_;
@@ -808,6 +293,9 @@ class ConsumerSocket : public BaseSocket {
std::shared_ptr<Portal> portal_;
utils::EventThread async_downloader_;
+ // No need to protect from multiple accesses in the async consumer
+ // The parameter is accessible only with a getSocketOption and
+ // set from the consume
Name network_name_;
int interest_lifetime_;
@@ -816,8 +304,6 @@ class ConsumerSocket : public BaseSocket {
double max_window_size_;
double current_window_size_;
uint32_t max_retransmissions_;
- size_t output_buffer_size_;
- size_t input_buffer_size_;
// RAAQM Parameters
double minimum_drop_probability_;
@@ -832,12 +318,10 @@ class ConsumerSocket : public BaseSocket {
int rate_estimation_batching_parameter_;
int rate_estimation_choice_;
- bool is_async_;
-
// Verification parameters
std::shared_ptr<utils::Verifier> verifier_;
PARCKeyId *key_id_;
- bool verify_signature_;
+ std::atomic_bool verify_signature_;
ConsumerInterestCallback on_interest_retransmission_;
ConsumerInterestCallback on_interest_output_;
@@ -853,12 +337,13 @@ class ConsumerSocket : public BaseSocket {
// Virtual download for traffic generator
bool virtual_download_;
- bool rtt_stats_;
uint32_t timer_interval_milliseconds_;
// Transport protocol
std::unique_ptr<TransportProtocol> transport_protocol_;
+
+ utils::SpinLock guard_raaqm_params_;
};
} // namespace interface
diff --git a/libtransport/src/hicn/transport/interfaces/socket_options_default_values.h b/libtransport/src/hicn/transport/interfaces/socket_options_default_values.h
index 13029e83a..bcf103b8c 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_options_default_values.h
+++ b/libtransport/src/hicn/transport/interfaces/socket_options_default_values.h
@@ -31,8 +31,6 @@ static constexpr uint32_t content_object_expiry_time =
never_expire_time; // milliseconds -> 50 seconds
static constexpr uint32_t content_object_packet_size =
1500; // The ethernet MTU
-static constexpr uint32_t producer_socket_input_buffer_size =
- 150000; // Interests
static constexpr uint32_t producer_socket_output_buffer_size =
150000; // Content Object
static constexpr uint32_t log_2_default_buffer_size = 12;
diff --git a/libtransport/src/hicn/transport/interfaces/socket_options_keys.h b/libtransport/src/hicn/transport/interfaces/socket_options_keys.h
index c21108186..e14f0f412 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_options_keys.h
+++ b/libtransport/src/hicn/transport/interfaces/socket_options_keys.h
@@ -35,22 +35,21 @@ typedef enum {
INTEREST_LIFETIME = 107,
CONTENT_OBJECT_EXPIRY_TIME = 108,
KEY_LOCATOR = 110,
- SIGNATURE_TYPE = 111,
- MIN_WINDOW_SIZE = 112,
- MAX_WINDOW_SIZE = 113,
- CURRENT_WINDOW_SIZE = 114,
- ASYNC_MODE = 115,
- MAKE_MANIFEST = 116,
- PORTAL = 117,
- RUNNING = 118,
- APPLICATION_BUFFER = 119,
- HASH_ALGORITHM = 120,
- CRYPTO_SUITE = 121,
- IDENTITY = 122,
- VERIFIER = 123,
- CERTIFICATE = 124,
- VERIFY_SIGNATURE = 125,
- STATS_INTERVAL = 126
+ MIN_WINDOW_SIZE = 111,
+ MAX_WINDOW_SIZE = 112,
+ CURRENT_WINDOW_SIZE = 113,
+ ASYNC_MODE = 114,
+ MAKE_MANIFEST = 115,
+ PORTAL = 116,
+ RUNNING = 117,
+ APPLICATION_BUFFER = 118,
+ HASH_ALGORITHM = 119,
+ CRYPTO_SUITE = 120,
+ IDENTITY = 121,
+ VERIFIER = 122,
+ CERTIFICATE = 123,
+ VERIFY_SIGNATURE = 124,
+ STATS_INTERVAL = 125
} GeneralTransportOptions;
typedef enum {
diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.cc b/libtransport/src/hicn/transport/interfaces/socket_producer.cc
index 9ca004c41..f90197490 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_producer.cc
+++ b/libtransport/src/hicn/transport/interfaces/socket_producer.cc
@@ -37,10 +37,7 @@ ProducerSocket::ProducerSocket(asio::io_service &io_service)
output_buffer_(default_values::producer_socket_output_buffer_size),
registration_status_(REGISTRATION_NOT_ATTEMPTED),
making_manifest_(false),
- signature_type_(SHA_256),
hash_algorithm_(HashAlgorithm::SHA_256),
- input_buffer_capacity_(default_values::producer_socket_input_buffer_size),
- input_buffer_size_(0),
on_interest_input_(VOID_HANDLER),
on_interest_dropped_input_buffer_(VOID_HANDLER),
on_interest_inserted_input_buffer_(VOID_HANDLER),
@@ -98,22 +95,30 @@ void ProducerSocket::listen() {
void ProducerSocket::passContentObjectToCallbacks(
const std::shared_ptr<ContentObject> &content_object) {
if (content_object) {
- if (on_new_segment_ != VOID_HANDLER) {
- on_new_segment_(*this, *content_object);
+ if (on_new_segment_) {
+ io_service_.dispatch([this, content_object]() {
+ on_new_segment_(*this, *content_object);
+ });
}
- if (on_content_object_to_sign_ != VOID_HANDLER) {
- on_content_object_to_sign_(*this, *content_object);
+ if (on_content_object_to_sign_) {
+ io_service_.dispatch([this, content_object]() {
+ on_content_object_to_sign_(*this, *content_object);
+ });
}
- if (on_content_object_in_output_buffer_ != VOID_HANDLER) {
- on_content_object_in_output_buffer_(*this, *content_object);
+ if (on_content_object_in_output_buffer_) {
+ io_service_.dispatch([this, content_object]() {
+ on_content_object_in_output_buffer_(*this, *content_object);
+ });
}
output_buffer_.insert(content_object);
- if (on_content_object_output_ != VOID_HANDLER) {
- on_content_object_output_(*this, *content_object);
+ if (on_content_object_output_) {
+ io_service_.dispatch([this, content_object]() {
+ on_content_object_output_(*this, *content_object);
+ });
}
portal_->sendContentObject(*content_object);
@@ -121,15 +126,19 @@ void ProducerSocket::passContentObjectToCallbacks(
}
void ProducerSocket::produce(ContentObject &content_object) {
- if (on_content_object_in_output_buffer_ != VOID_HANDLER) {
- on_content_object_in_output_buffer_(*this, content_object);
+ if (on_content_object_in_output_buffer_) {
+ io_service_.dispatch([this, &content_object]() {
+ on_content_object_in_output_buffer_(*this, content_object);
+ });
}
output_buffer_.insert(std::static_pointer_cast<ContentObject>(
content_object.shared_from_this()));
- if (on_content_object_output_ != VOID_HANDLER) {
- on_content_object_output_(*this, content_object);
+ if (on_content_object_output_) {
+ io_service_.dispatch([this, &content_object]() {
+ on_content_object_output_(*this, content_object);
+ });
}
portal_->sendContentObject(content_object);
@@ -142,6 +151,15 @@ uint32_t ProducerSocket::produce(Name content_name,
return 0;
}
+ // Copy the atomic variable to ensure always the same value during the a
+ // production
+ std::size_t data_packet_size = data_packet_size_;
+ uint32_t content_object_expiry_time = content_object_expiry_time_;
+ HashAlgorithm algo = hash_algorithm_;
+ bool making_manifest = making_manifest_;
+ std::shared_ptr<utils::Identity> identity;
+ getSocketOption(GeneralTransportOptions::IDENTITY, identity);
+
auto buffer_size = buffer->length();
const std::size_t hash_size = 32;
@@ -162,7 +180,7 @@ uint32_t ProducerSocket::produce(Name content_name,
std::unique_ptr<utils::CryptoHash> zero_hash;
// TODO Manifest may still be used for indexing
- if (making_manifest_ && !identity_) {
+ if (making_manifest && !identity) {
throw errors::RuntimeException(
"Making manifests without setting producer identity. Aborting.");
}
@@ -182,18 +200,18 @@ uint32_t ProducerSocket::produce(Name content_name,
}
format = hf_format;
- if (making_manifest_) {
+ if (making_manifest) {
format = hf_format;
manifest_header_size = core::Packet::getHeaderSizeFromFormat(
- hf_format_ah, identity_->getSignatureLength());
- } else if (identity_) {
+ hf_format_ah, identity->getSignatureLength());
+ } else if (identity) {
format = hf_format_ah;
- signature_length = identity_->getSignatureLength();
+ signature_length = identity->getSignatureLength();
}
header_size = core::Packet::getHeaderSizeFromFormat(format, signature_length);
- free_space_for_content = data_packet_size_ - header_size;
+ free_space_for_content = data_packet_size - header_size;
uint32_t number_of_segments =
uint32_t(std::ceil(double(buffer_size) / double(free_space_for_content)));
@@ -204,9 +222,9 @@ uint32_t ProducerSocket::produce(Name content_name,
// TODO allocate space for all the headers
- if (making_manifest_) {
+ if (making_manifest) {
auto segment_in_manifest = static_cast<float>(
- std::floor(double(data_packet_size_ - manifest_header_size -
+ std::floor(double(data_packet_size - manifest_header_size -
ContentObjectManifest::getManifestHeaderSize()) /
(4.0 + 32.0)) -
1.0);
@@ -219,8 +237,8 @@ uint32_t ProducerSocket::produce(Name content_name,
core::ManifestVersion::VERSION_1, core::ManifestType::INLINE_MANIFEST,
hash_algorithm_, is_last_manifest, content_name,
core::NextSegmentCalculationStrategy::INCREMENTAL,
- identity_->getSignatureLength()));
- manifest->setLifetime(content_object_expiry_time_);
+ identity->getSignatureLength()));
+ manifest->setLifetime(content_object_expiry_time);
if (is_last) {
manifest->setFinalBlockNumber(final_block_number);
@@ -231,21 +249,21 @@ uint32_t ProducerSocket::produce(Name content_name,
uint8_t hash[hash_size];
std::memset(hash, 0, hash_size);
zero_hash = std::make_unique<utils::CryptoHash>(
- hash, hash_size, static_cast<utils::CryptoHashType>(hash_algorithm_));
+ hash, hash_size, static_cast<utils::CryptoHashType>(algo));
}
for (unsigned int packaged_segments = 0;
packaged_segments < number_of_segments; packaged_segments++) {
- if (making_manifest_) {
+ if (making_manifest) {
if (manifest->estimateManifestSize(2) >
- data_packet_size_ - manifest_header_size) {
+ data_packet_size - manifest_header_size) {
// Add next manifest
manifest->addSuffixHash(current_segment, *zero_hash);
// Send the current manifest
manifest->encode();
- identity_->getSigner().sign(*manifest);
+ identity->getSigner().sign(*manifest);
passContentObjectToCallbacks(manifest);
@@ -258,8 +276,8 @@ uint32_t ProducerSocket::produce(Name content_name,
core::ManifestType::INLINE_MANIFEST, hash_algorithm_,
is_last_manifest, content_name,
core::NextSegmentCalculationStrategy::INCREMENTAL,
- identity_->getSignatureLength()));
- manifest->setLifetime(content_object_expiry_time_);
+ identity->getSignatureLength()));
+ manifest->setLifetime(content_object_expiry_time);
if (is_last) {
manifest->setFinalBlockNumber(final_block_number);
} else {
@@ -271,7 +289,7 @@ uint32_t ProducerSocket::produce(Name content_name,
auto content_object = std::make_shared<ContentObject>(
content_name.setSuffix(current_segment), format);
- content_object->setLifetime(content_object_expiry_time_);
+ content_object->setLifetime(content_object_expiry_time);
auto b = buffer->cloneOne();
b->trimStart(free_space_for_content * packaged_segments);
@@ -280,7 +298,7 @@ uint32_t ProducerSocket::produce(Name content_name,
b->append(buffer_size - bytes_segmented);
bytes_segmented += (int)(buffer_size - bytes_segmented);
- if (is_last && making_manifest_) {
+ if (is_last && making_manifest) {
is_last_manifest = true;
} else if (is_last) {
content_object->setRst();
@@ -293,19 +311,19 @@ uint32_t ProducerSocket::produce(Name content_name,
content_object->appendPayload(std::move(b));
- if (making_manifest_) {
+ if (making_manifest) {
using namespace std::chrono_literals;
utils::CryptoHash hash = content_object->computeDigest(hash_algorithm_);
manifest->addSuffixHash(current_segment, hash);
- } else if (identity_) {
- identity_->getSigner().sign(*content_object);
+ } else if (identity) {
+ identity->getSigner().sign(*content_object);
}
current_segment++;
passContentObjectToCallbacks(content_object);
}
- if (making_manifest_) {
+ if (making_manifest) {
if (is_last_manifest) {
manifest->setFinalManifest(is_last_manifest);
}
@@ -315,13 +333,15 @@ uint32_t ProducerSocket::produce(Name content_name,
}
manifest->encode();
- identity_->getSigner().sign(*manifest);
+ identity->getSigner().sign(*manifest);
passContentObjectToCallbacks(manifest);
}
- if (on_content_produced_ != VOID_HANDLER) {
- on_content_produced_(*this, std::make_error_code(std::errc(0)),
- buffer_size);
+ if (on_content_produced_) {
+ io_service_.dispatch([this, buffer_size]() {
+ on_content_produced_(*this, std::make_error_code(std::errc(0)),
+ buffer_size);
+ });
}
return current_segment - start_offset;
@@ -347,7 +367,7 @@ void ProducerSocket::asyncProduce(const Name &suffix, const uint8_t *buf,
}
void ProducerSocket::onInterest(Interest &interest) {
- if (on_interest_input_ != VOID_HANDLER) {
+ if (on_interest_input_) {
on_interest_input_(*this, interest);
}
@@ -355,22 +375,571 @@ void ProducerSocket::onInterest(Interest &interest) {
output_buffer_.find(interest);
if (content_object) {
- if (on_interest_satisfied_output_buffer_ != VOID_HANDLER) {
+ if (on_interest_satisfied_output_buffer_) {
on_interest_satisfied_output_buffer_(*this, interest);
}
- if (on_content_object_output_ != VOID_HANDLER) {
+ if (on_content_object_output_) {
on_content_object_output_(*this, *content_object);
}
portal_->sendContentObject(*content_object);
} else {
- if (on_interest_process_ != VOID_HANDLER) {
+ if (on_interest_process_) {
on_interest_process_(*this, interest);
}
}
}
+// If the thread calling lambda_func is not the same of io_service, this
+// function reschedule the function on it
+template <typename Lambda, typename arg2>
+int ProducerSocket::rescheduleOnIOService(int socket_option_key,
+ arg2 socket_option_value,
+ Lambda lambda_func) {
+ // To enforce type check
+ std::function<int(int, arg2)> func = lambda_func;
+ int result = SOCKET_OPTION_SET;
+ if (listening_thread_.joinable() &&
+ std::this_thread::get_id() != listening_thread_.get_id()) {
+ std::mutex mtx;
+ /* Condition variable for the wait */
+ std::condition_variable cv;
+ bool done = false;
+ io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx,
+ &result, &done, &func]() {
+ std::unique_lock<std::mutex> lck(mtx);
+ done = true;
+ result = func(socket_option_key, socket_option_value);
+ });
+ std::unique_lock<std::mutex> lck(mtx);
+ if (!done) {
+ cv.wait(lck);
+ }
+ } else {
+ result = func(socket_option_key, socket_option_value);
+ }
+
+ return result;
+}
+
+// If the thread calling lambda_func is not the same of io_service, this
+// function reschedule the function on it
+template <typename Lambda, typename arg2>
+int ProducerSocket::rescheduleOnIOServiceWithReference(
+ int socket_option_key, arg2 &socket_option_value, Lambda lambda_func) {
+ // To enforce type check
+ std::function<int(int, arg2 &)> func = lambda_func;
+ int result = SOCKET_OPTION_SET;
+ if (listening_thread_.joinable() &&
+ std::this_thread::get_id() != this->listening_thread_.get_id()) {
+ std::mutex mtx;
+ /* Condition variable for the wait */
+ std::condition_variable cv;
+ std::unique_lock<std::mutex> lck(mtx);
+ bool done = false;
+ io_service_.dispatch([this, &socket_option_key, &socket_option_value, &mtx,
+ &cv, &result, &done, &func]() {
+ std::unique_lock<std::mutex> lck(mtx);
+ done = true;
+ result = func(socket_option_key, socket_option_value);
+
+ if (!done) {
+ cv.wait(lck);
+ }
+ });
+ } else {
+ result = func(socket_option_key, socket_option_value);
+ }
+
+ return result;
+}
+
+int ProducerSocket::setSocketOption(int socket_option_key,
+ uint32_t socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::DATA_PACKET_SIZE:
+ if (socket_option_value < default_values::max_content_object_size &&
+ socket_option_value > 0) {
+ data_packet_size_ = socket_option_value;
+ break;
+ }
+
+ case GeneralTransportOptions::OUTPUT_BUFFER_SIZE:
+ output_buffer_.setLimit(socket_option_value);
+ break;
+
+ case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME:
+ content_object_expiry_time_ = socket_option_value;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+}
+
+int ProducerSocket::setSocketOption(int socket_option_key,
+ std::nullptr_t socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in case
+ // setSocketOption is called while the io_service is running.
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ProducerContentObjectCallback socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ProducerCallbacksOptions::INTEREST_INPUT:
+ if (socket_option_value == VOID_HANDLER) {
+ on_interest_input_ = VOID_HANDLER;
+ break;
+ }
+
+ case ProducerCallbacksOptions::INTEREST_DROP:
+ if (socket_option_value == VOID_HANDLER) {
+ on_interest_dropped_input_buffer_ = VOID_HANDLER;
+ break;
+ }
+
+ case ProducerCallbacksOptions::INTEREST_PASS:
+ if (socket_option_value == VOID_HANDLER) {
+ on_interest_inserted_input_buffer_ = VOID_HANDLER;
+ break;
+ }
+
+ case ProducerCallbacksOptions::CACHE_HIT:
+ if (socket_option_value == VOID_HANDLER) {
+ on_interest_satisfied_output_buffer_ = VOID_HANDLER;
+ break;
+ }
+
+ case ProducerCallbacksOptions::CACHE_MISS:
+ if (socket_option_value == VOID_HANDLER) {
+ on_interest_process_ = VOID_HANDLER;
+ break;
+ }
+
+ case ProducerCallbacksOptions::NEW_CONTENT_OBJECT:
+ if (socket_option_value == VOID_HANDLER) {
+ on_new_segment_ = VOID_HANDLER;
+ break;
+ }
+
+ case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN:
+ if (socket_option_value == VOID_HANDLER) {
+ on_content_object_to_sign_ = VOID_HANDLER;
+ break;
+ }
+
+ case ProducerCallbacksOptions::CONTENT_OBJECT_READY:
+ if (socket_option_value == VOID_HANDLER) {
+ on_content_object_in_output_buffer_ = VOID_HANDLER;
+ break;
+ }
+
+ case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT:
+ if (socket_option_value == VOID_HANDLER) {
+ on_content_object_output_ = VOID_HANDLER;
+ break;
+ }
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ });
+}
+
+int ProducerSocket::setSocketOption(int socket_option_key,
+ bool socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::MAKE_MANIFEST:
+ making_manifest_ = socket_option_value;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+}
+
+int ProducerSocket::setSocketOption(int socket_option_key,
+ Name *socket_option_value) {
+ return SOCKET_OPTION_NOT_SET;
+}
+
+int ProducerSocket::setSocketOption(int socket_option_key,
+ std::list<Prefix> socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::NETWORK_NAME:
+ served_namespaces_ = socket_option_value;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+}
+
+int ProducerSocket::setSocketOption(
+ int socket_option_key, ProducerContentObjectCallback socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in case
+ // setSocketOption is called while the io_service is running.
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ProducerContentObjectCallback socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ProducerCallbacksOptions::NEW_CONTENT_OBJECT:
+ on_new_segment_ = socket_option_value;
+ break;
+
+ case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN:
+ on_content_object_to_sign_ = socket_option_value;
+ break;
+
+ case ProducerCallbacksOptions::CONTENT_OBJECT_READY:
+ on_content_object_in_output_buffer_ = socket_option_value;
+ break;
+
+ case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT:
+ on_content_object_output_ = socket_option_value;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ });
+}
+
+int ProducerSocket::setSocketOption(
+ int socket_option_key, ProducerInterestCallback socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in case
+ // setSocketOption is called while the io_service is running.
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ProducerInterestCallback socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ProducerCallbacksOptions::INTEREST_INPUT:
+ on_interest_input_ = socket_option_value;
+ break;
+
+ case ProducerCallbacksOptions::INTEREST_DROP:
+ on_interest_dropped_input_buffer_ = socket_option_value;
+ break;
+
+ case ProducerCallbacksOptions::INTEREST_PASS:
+ on_interest_inserted_input_buffer_ = socket_option_value;
+ break;
+
+ case ProducerCallbacksOptions::CACHE_HIT:
+ on_interest_satisfied_output_buffer_ = socket_option_value;
+ break;
+
+ case ProducerCallbacksOptions::CACHE_MISS:
+ on_interest_process_ = socket_option_value;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ });
+}
+
+int ProducerSocket::setSocketOption(
+ int socket_option_key, ProducerContentCallback socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in case
+ // setSocketOption is called while the io_service is running.
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ProducerContentCallback socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ProducerCallbacksOptions::CONTENT_PRODUCED:
+ on_content_produced_ = socket_option_value;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ });
+}
+
+int ProducerSocket::setSocketOption(int socket_option_key,
+ HashAlgorithm socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::HASH_ALGORITHM:
+ hash_algorithm_ = socket_option_value;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+}
+
+int ProducerSocket::setSocketOption(int socket_option_key,
+ utils::CryptoSuite socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::CRYPTO_SUITE:
+ crypto_suite_ = socket_option_value;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+}
+
+int ProducerSocket::setSocketOption(
+ int socket_option_key,
+ const std::shared_ptr<utils::Identity> &socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::IDENTITY: {
+ utils::SpinLock::Acquire locked(identity_lock_);
+ identity_.reset();
+ identity_ = socket_option_value;
+ } break;
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+}
+
+int ProducerSocket::setSocketOption(int socket_option_key,
+ const std::string &socket_option_value) {
+ switch (socket_option_key) {
+ case DataLinkOptions::OUTPUT_INTERFACE:
+ output_interface_ = socket_option_value;
+ portal_->setOutputInterface(output_interface_);
+ break;
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+}
+
+int ProducerSocket::getSocketOption(int socket_option_key,
+ uint32_t &socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::OUTPUT_BUFFER_SIZE:
+ socket_option_value = (uint32_t)output_buffer_.getLimit();
+ break;
+
+ case GeneralTransportOptions::DATA_PACKET_SIZE:
+ socket_option_value = (uint32_t)data_packet_size_;
+ break;
+
+ case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME:
+ socket_option_value = content_object_expiry_time_;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_GET;
+}
+
+int ProducerSocket::getSocketOption(int socket_option_key,
+ bool &socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::MAKE_MANIFEST:
+ socket_option_value = making_manifest_;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+}
+
+int ProducerSocket::getSocketOption(int socket_option_key,
+ std::list<Prefix> &socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::NETWORK_NAME:
+ socket_option_value = served_namespaces_;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+}
+
+int ProducerSocket::getSocketOption(
+ int socket_option_key,
+ ProducerContentObjectCallback **socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in case
+ // setSocketOption is called while the io_service is running.
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ProducerContentObjectCallback **socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ProducerCallbacksOptions::NEW_CONTENT_OBJECT:
+ *socket_option_value = &on_new_segment_;
+ break;
+
+ case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN:
+ *socket_option_value = &on_content_object_to_sign_;
+ break;
+
+ case ProducerCallbacksOptions::CONTENT_OBJECT_READY:
+ *socket_option_value = &on_content_object_in_output_buffer_;
+ break;
+
+ case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT:
+ *socket_option_value = &on_content_object_output_;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ });
+}
+
+int ProducerSocket::getSocketOption(
+ int socket_option_key, ProducerContentCallback **socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in case
+ // setSocketOption is called while the io_service is running.
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ProducerContentCallback **socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ProducerCallbacksOptions::CONTENT_PRODUCED:
+ *socket_option_value = &on_content_produced_;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ });
+}
+
+int ProducerSocket::getSocketOption(
+ int socket_option_key, ProducerInterestCallback **socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in case
+ // setSocketOption is called while the io_service is running.
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ProducerInterestCallback **socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ProducerCallbacksOptions::INTEREST_INPUT:
+ *socket_option_value = &on_interest_input_;
+ break;
+
+ case ProducerCallbacksOptions::INTEREST_DROP:
+ *socket_option_value = &on_interest_dropped_input_buffer_;
+ break;
+
+ case ProducerCallbacksOptions::INTEREST_PASS:
+ *socket_option_value = &on_interest_inserted_input_buffer_;
+ break;
+
+ case CACHE_HIT:
+ *socket_option_value = &on_interest_satisfied_output_buffer_;
+ break;
+
+ case CACHE_MISS:
+ *socket_option_value = &on_interest_process_;
+ break;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ });
+}
+
+int ProducerSocket::getSocketOption(
+ int socket_option_key, std::shared_ptr<Portal> &socket_option_value) {
+ switch (socket_option_key) {
+ case PORTAL:
+ socket_option_value = portal_;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ ;
+ }
+
+ return SOCKET_OPTION_GET;
+}
+
+int ProducerSocket::getSocketOption(int socket_option_key,
+ HashAlgorithm &socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::HASH_ALGORITHM:
+ socket_option_value = hash_algorithm_;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+}
+
+int ProducerSocket::getSocketOption(int socket_option_key,
+ utils::CryptoSuite &socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::HASH_ALGORITHM:
+ socket_option_value = crypto_suite_;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+}
+
+int ProducerSocket::getSocketOption(
+ int socket_option_key,
+ std::shared_ptr<utils::Identity> &socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::IDENTITY: {
+ utils::SpinLock::Acquire locked(identity_lock_);
+ socket_option_value = identity_;
+ } break;
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+}
+
+int ProducerSocket::getSocketOption(int socket_option_key,
+ std::string &socket_option_value) {
+ switch (socket_option_key) {
+ case DataLinkOptions::OUTPUT_INTERFACE:
+ socket_option_value = output_interface_;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+}
+
asio::io_service &ProducerSocket::getIoService() { return io_service_; }
} // namespace interface
diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.h b/libtransport/src/hicn/transport/interfaces/socket_producer.h
index 18adbf4a7..5c617d761 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_producer.h
+++ b/libtransport/src/hicn/transport/interfaces/socket_producer.h
@@ -91,500 +91,116 @@ class ProducerSocket : public Socket<BasePortal>,
onInterest(*interest);
};
- TRANSPORT_ALWAYS_INLINE int setSocketOption(int socket_option_key,
- uint32_t socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::DATA_PACKET_SIZE:
- if (socket_option_value < default_values::max_content_object_size &&
- socket_option_value > 0) {
- data_packet_size_ = socket_option_value;
- break;
- }
-
- case GeneralTransportOptions::INPUT_BUFFER_SIZE:
- if (socket_option_value >= 1) {
- input_buffer_capacity_ = socket_option_value;
- break;
- }
-
- case GeneralTransportOptions::OUTPUT_BUFFER_SIZE:
- output_buffer_.setLimit(socket_option_value);
- break;
-
- case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME:
- content_object_expiry_time_ = socket_option_value;
- break;
-
- case GeneralTransportOptions::SIGNATURE_TYPE:
- if (socket_option_value == SOCKET_OPTION_DEFAULT) {
- signature_type_ = SHA_256;
- } else {
- signature_type_ = socket_option_value;
- }
-
- if (signature_type_ == SHA_256 || signature_type_ == RSA_256) {
- signature_size_ = 32;
- }
-
- break;
-
- case ProducerCallbacksOptions::INTEREST_INPUT:
- if (socket_option_value == VOID_HANDLER) {
- on_interest_input_ = VOID_HANDLER;
- break;
- }
-
- case ProducerCallbacksOptions::INTEREST_DROP:
- if (socket_option_value == VOID_HANDLER) {
- on_interest_dropped_input_buffer_ = VOID_HANDLER;
- break;
- }
-
- case ProducerCallbacksOptions::INTEREST_PASS:
- if (socket_option_value == VOID_HANDLER) {
- on_interest_inserted_input_buffer_ = VOID_HANDLER;
- break;
- }
-
- case ProducerCallbacksOptions::CACHE_HIT:
- if (socket_option_value == VOID_HANDLER) {
- on_interest_satisfied_output_buffer_ = VOID_HANDLER;
- break;
- }
-
- case ProducerCallbacksOptions::CACHE_MISS:
- if (socket_option_value == VOID_HANDLER) {
- on_interest_process_ = VOID_HANDLER;
- break;
- }
-
- case ProducerCallbacksOptions::NEW_CONTENT_OBJECT:
- if (socket_option_value == VOID_HANDLER) {
- on_new_segment_ = VOID_HANDLER;
- break;
- }
-
- case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN:
- if (socket_option_value == VOID_HANDLER) {
- on_content_object_to_sign_ = VOID_HANDLER;
- break;
- }
-
- case ProducerCallbacksOptions::CONTENT_OBJECT_READY:
- if (socket_option_value == VOID_HANDLER) {
- on_content_object_in_output_buffer_ = VOID_HANDLER;
- break;
- }
-
- case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT:
- if (socket_option_value == VOID_HANDLER) {
- on_content_object_output_ = VOID_HANDLER;
- break;
- }
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- }
-
- TRANSPORT_ALWAYS_INLINE int setSocketOption(int socket_option_key,
- bool socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::MAKE_MANIFEST:
- making_manifest_ = socket_option_value;
- break;
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- }
-
- TRANSPORT_ALWAYS_INLINE int setSocketOption(int socket_option_key,
- Name *socket_option_value) {
- return SOCKET_OPTION_NOT_SET;
- }
-
- TRANSPORT_ALWAYS_INLINE int setSocketOption(
- int socket_option_key, std::list<Prefix> socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::NETWORK_NAME:
- served_namespaces_ = socket_option_value;
- break;
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- }
+ virtual int setSocketOption(int socket_option_key,
+ uint32_t socket_option_value);
- TRANSPORT_ALWAYS_INLINE int setSocketOption(
- int socket_option_key,
- ProducerContentObjectCallback socket_option_value) {
- switch (socket_option_key) {
- case ProducerCallbacksOptions::NEW_CONTENT_OBJECT:
- on_new_segment_ = socket_option_value;
- break;
-
- case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN:
- on_content_object_to_sign_ = socket_option_value;
- break;
-
- case ProducerCallbacksOptions::CONTENT_OBJECT_READY:
- on_content_object_in_output_buffer_ = socket_option_value;
- break;
+ virtual int setSocketOption(int socket_option_key,
+ std::nullptr_t socket_option_value);
- case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT:
- on_content_object_output_ = socket_option_value;
- break;
+ virtual int setSocketOption(int socket_option_key, bool socket_option_value);
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- }
+ virtual int setSocketOption(int socket_option_key, Name *socket_option_value);
- TRANSPORT_ALWAYS_INLINE int setSocketOption(
- int socket_option_key, ProducerInterestCallback socket_option_value) {
- switch (socket_option_key) {
- case ProducerCallbacksOptions::INTEREST_INPUT:
- on_interest_input_ = socket_option_value;
- break;
+ virtual int setSocketOption(int socket_option_key,
+ std::list<Prefix> socket_option_value);
- case ProducerCallbacksOptions::INTEREST_DROP:
- on_interest_dropped_input_buffer_ = socket_option_value;
- break;
+ virtual int setSocketOption(
+ int socket_option_key, ProducerContentObjectCallback socket_option_value);
- case ProducerCallbacksOptions::INTEREST_PASS:
- on_interest_inserted_input_buffer_ = socket_option_value;
- break;
+ virtual int setSocketOption(int socket_option_key,
+ ProducerInterestCallback socket_option_value);
- case ProducerCallbacksOptions::CACHE_HIT:
- on_interest_satisfied_output_buffer_ = socket_option_value;
- break;
-
- case ProducerCallbacksOptions::CACHE_MISS:
- on_interest_process_ = socket_option_value;
- break;
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- }
+ virtual int setSocketOption(int socket_option_key,
+ ProducerContentCallback socket_option_value);
- TRANSPORT_ALWAYS_INLINE int setSocketOption(
- int socket_option_key, ProducerContentCallback socket_option_value) {
- switch (socket_option_key) {
- case ProducerCallbacksOptions::CONTENT_PRODUCED:
- on_content_produced_ = socket_option_value;
- break;
+ virtual int setSocketOption(int socket_option_key,
+ HashAlgorithm socket_option_value);
- default:
- return SOCKET_OPTION_NOT_SET;
- }
+ virtual int setSocketOption(int socket_option_key,
+ utils::CryptoSuite socket_option_value);
- return SOCKET_OPTION_SET;
- }
-
- TRANSPORT_ALWAYS_INLINE int setSocketOption(
- int socket_option_key, HashAlgorithm socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::HASH_ALGORITHM:
- hash_algorithm_ = socket_option_value;
- break;
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- }
-
- TRANSPORT_ALWAYS_INLINE int setSocketOption(
- int socket_option_key, utils::CryptoSuite socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::CRYPTO_SUITE:
- crypto_suite_ = socket_option_value;
- break;
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- }
-
- TRANSPORT_ALWAYS_INLINE int setSocketOption(
+ virtual int setSocketOption(
int socket_option_key,
- const std::shared_ptr<utils::Identity> &socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::IDENTITY:
- identity_ = socket_option_value;
- break;
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- }
-
- TRANSPORT_ALWAYS_INLINE int setSocketOption(
- int socket_option_key, const std::string &socket_option_value) {
- switch (socket_option_key) {
- case DataLinkOptions::OUTPUT_INTERFACE:
- output_interface_ = socket_option_value;
- portal_->setOutputInterface(output_interface_);
- break;
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- ;
- }
+ const std::shared_ptr<utils::Identity> &socket_option_value);
- TRANSPORT_ALWAYS_INLINE int getSocketOption(int socket_option_key,
- uint32_t &socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::INPUT_BUFFER_SIZE:
- socket_option_value = (uint32_t)input_buffer_capacity_;
- break;
+ virtual int setSocketOption(int socket_option_key,
+ const std::string &socket_option_value);
- case GeneralTransportOptions::OUTPUT_BUFFER_SIZE:
- socket_option_value = (uint32_t)output_buffer_.getLimit();
- break;
+ virtual int getSocketOption(int socket_option_key,
+ uint32_t &socket_option_value);
- case GeneralTransportOptions::DATA_PACKET_SIZE:
- socket_option_value = (uint32_t)data_packet_size_;
- break;
+ virtual int getSocketOption(int socket_option_key, bool &socket_option_value);
- case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME:
- socket_option_value = content_object_expiry_time_;
- break;
-
- case GeneralTransportOptions::SIGNATURE_TYPE:
- socket_option_value = signature_type_;
- break;
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_GET;
- }
-
- TRANSPORT_ALWAYS_INLINE int getSocketOption(int socket_option_key,
- bool &socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::MAKE_MANIFEST:
- socket_option_value = making_manifest_;
- break;
-
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- }
+ virtual int getSocketOption(int socket_option_key,
+ std::list<Prefix> &socket_option_value);
- TRANSPORT_ALWAYS_INLINE int getSocketOption(
- int socket_option_key, std::list<Prefix> &socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::NETWORK_NAME:
- socket_option_value = served_namespaces_;
- break;
-
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- }
-
- TRANSPORT_ALWAYS_INLINE int getSocketOption(
+ virtual int getSocketOption(
int socket_option_key,
- ProducerContentObjectCallback **socket_option_value) {
- switch (socket_option_key) {
- case ProducerCallbacksOptions::NEW_CONTENT_OBJECT:
- *socket_option_value = &on_new_segment_;
- break;
-
- case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN:
- *socket_option_value = &on_content_object_to_sign_;
- break;
-
- case ProducerCallbacksOptions::CONTENT_OBJECT_READY:
- *socket_option_value = &on_content_object_in_output_buffer_;
- break;
-
- case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT:
- *socket_option_value = &on_content_object_output_;
- break;
-
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- }
-
- TRANSPORT_ALWAYS_INLINE int getSocketOption(
- int socket_option_key, ProducerContentCallback **socket_option_value) {
- switch (socket_option_key) {
- case ProducerCallbacksOptions::CONTENT_PRODUCED:
- *socket_option_value = &on_content_produced_;
- break;
-
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- }
-
- TRANSPORT_ALWAYS_INLINE int getSocketOption(
- int socket_option_key, ProducerInterestCallback **socket_option_value) {
- switch (socket_option_key) {
- case ProducerCallbacksOptions::INTEREST_INPUT:
- *socket_option_value = &on_interest_input_;
- break;
-
- case ProducerCallbacksOptions::INTEREST_DROP:
- *socket_option_value = &on_interest_dropped_input_buffer_;
- break;
-
- case ProducerCallbacksOptions::INTEREST_PASS:
- *socket_option_value = &on_interest_inserted_input_buffer_;
- break;
+ ProducerContentObjectCallback **socket_option_value);
- case CACHE_HIT:
- *socket_option_value = &on_interest_satisfied_output_buffer_;
- break;
+ virtual int getSocketOption(int socket_option_key,
+ ProducerContentCallback **socket_option_value);
- case CACHE_MISS:
- *socket_option_value = &on_interest_process_;
- break;
+ virtual int getSocketOption(int socket_option_key,
+ ProducerInterestCallback **socket_option_value);
- default:
- return SOCKET_OPTION_NOT_GET;
- }
+ virtual int getSocketOption(int socket_option_key,
+ std::shared_ptr<Portal> &socket_option_value);
- return SOCKET_OPTION_GET;
- }
+ virtual int getSocketOption(int socket_option_key,
+ HashAlgorithm &socket_option_value);
- TRANSPORT_ALWAYS_INLINE int getSocketOption(
- int socket_option_key, std::shared_ptr<Portal> &socket_option_value) {
- switch (socket_option_key) {
- case PORTAL:
- socket_option_value = portal_;
- break;
- default:
- return SOCKET_OPTION_NOT_GET;
- ;
- }
-
- return SOCKET_OPTION_GET;
- }
+ virtual int getSocketOption(int socket_option_key,
+ utils::CryptoSuite &socket_option_value);
- TRANSPORT_ALWAYS_INLINE int getSocketOption(
- int socket_option_key, HashAlgorithm &socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::HASH_ALGORITHM:
- socket_option_value = hash_algorithm_;
- break;
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- }
+ virtual int getSocketOption(
+ int socket_option_key,
+ std::shared_ptr<utils::Identity> &socket_option_value);
- TRANSPORT_ALWAYS_INLINE int getSocketOption(
- int socket_option_key, utils::CryptoSuite &socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::HASH_ALGORITHM:
- socket_option_value = crypto_suite_;
- break;
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- }
+ virtual int getSocketOption(int socket_option_key,
+ std::string &socket_option_value);
- TRANSPORT_ALWAYS_INLINE int getSocketOption(
- int socket_option_key,
- std::shared_ptr<utils::Identity> &socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::IDENTITY:
- if (identity_) {
- socket_option_value = identity_;
- break;
- }
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- }
+ template <typename Lambda, typename arg2>
+ int rescheduleOnIOService(int socket_option_key, arg2 socket_option_value,
+ Lambda lambda_func);
- TRANSPORT_ALWAYS_INLINE int getSocketOption(
- int socket_option_key, std::string &socket_option_value) {
- switch (socket_option_key) {
- case DataLinkOptions::OUTPUT_INTERFACE:
- socket_option_value = output_interface_;
- break;
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- }
+ template <typename Lambda, typename arg2>
+ int rescheduleOnIOServiceWithReference(int socket_option_key,
+ arg2 &socket_option_value,
+ Lambda lambda_func);
- private:
+ protected:
// Threads
std::thread listening_thread_;
- protected:
asio::io_service internal_io_service_;
asio::io_service &io_service_;
std::shared_ptr<Portal> portal_;
- std::size_t data_packet_size_;
- std::list<Prefix> served_namespaces_;
- uint32_t content_object_expiry_time_;
+ std::atomic<size_t> data_packet_size_;
+ std::list<Prefix>
+ served_namespaces_; // No need to be threadsafe, this is always modified
+ // by the application thread
+ std::atomic<uint32_t> content_object_expiry_time_;
// buffers
+ // ContentStore is thread-safe
utils::ContentStore output_buffer_;
- private:
utils::EventThread async_thread_;
int registration_status_;
- bool making_manifest_;
+ std::atomic<bool> making_manifest_;
// map for storing sequence numbers for several calls of the publish
// function
std::unordered_map<Name, std::unordered_map<int, uint32_t>> seq_number_map_;
- int signature_type_;
- int signature_size_;
-
- HashAlgorithm hash_algorithm_;
- utils::CryptoSuite crypto_suite_;
+ std::atomic<HashAlgorithm> hash_algorithm_;
+ std::atomic<utils::CryptoSuite> crypto_suite_;
+ utils::SpinLock identity_lock_;
std::shared_ptr<utils::Identity> identity_;
- // buffers
-
- std::queue<std::shared_ptr<const Interest>> input_buffer_;
- std::atomic_size_t input_buffer_capacity_;
- std::atomic_size_t input_buffer_size_;
-
// callbacks
- protected:
ProducerInterestCallback on_interest_input_;
ProducerInterestCallback on_interest_dropped_input_buffer_;
ProducerInterestCallback on_interest_inserted_input_buffer_;
diff --git a/libtransport/src/hicn/transport/protocols/protocol.cc b/libtransport/src/hicn/transport/protocols/protocol.cc
index 9caa2eca7..8da9529d6 100644
--- a/libtransport/src/hicn/transport/protocols/protocol.cc
+++ b/libtransport/src/hicn/transport/protocols/protocol.cc
@@ -23,23 +23,28 @@ namespace protocol {
using namespace interface;
TransportProtocol::TransportProtocol(interface::ConsumerSocket *icn_socket)
- : socket_(icn_socket), is_running_(false) {
+ : socket_(icn_socket), is_running_(false), is_first_(false) {
socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal_);
}
int TransportProtocol::start() {
- // If the protocol is already running, return
+ // If the protocol is already running, return otherwise set as running
if (is_running_) return -1;
- // Set the protocol as running
- is_running_ = true;
-
// Reset the protocol state machine
reset();
+ // Set it is the first time we schedule an interest
+ is_first_ = true;
+
// Schedule next interests
scheduleNextInterests();
+ is_first_ = false;
+
+ // Set the protocol as running
+ is_running_ = true;
+
// Start Event loop
portal_->runEventsLoop();
diff --git a/libtransport/src/hicn/transport/protocols/protocol.h b/libtransport/src/hicn/transport/protocols/protocol.h
index 88889bb8c..e4821b6a0 100644
--- a/libtransport/src/hicn/transport/protocols/protocol.h
+++ b/libtransport/src/hicn/transport/protocols/protocol.h
@@ -15,6 +15,8 @@
#pragma once
+#include <atomic>
+
#include <hicn/transport/interfaces/socket.h>
#include <hicn/transport/protocols/packet_manager.h>
#include <hicn/transport/protocols/statistics.h>
@@ -60,7 +62,9 @@ class TransportProtocol : public interface::BasePortal::ConsumerCallback,
protected:
interface::ConsumerSocket *socket_;
std::shared_ptr<interface::BasePortal> portal_;
- volatile bool is_running_;
+ std::atomic<bool> is_running_;
+ // True if it si the first time we schedule an interest
+ std::atomic<bool> is_first_;
TransportStatistics stats_;
};
diff --git a/libtransport/src/hicn/transport/protocols/raaqm.cc b/libtransport/src/hicn/transport/protocols/raaqm.cc
index 574693c51..c816158f9 100644
--- a/libtransport/src/hicn/transport/protocols/raaqm.cc
+++ b/libtransport/src/hicn/transport/protocols/raaqm.cc
@@ -318,17 +318,17 @@ void RaaqmTransportProtocol::onContentObject(
}
// Call application-defined callbacks
- ConsumerContentObjectCallback *callback_content_object = nullptr;
+ ConsumerContentObjectCallback *callback_content_object = VOID_HANDLER;
socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT,
&callback_content_object);
- if (*callback_content_object != VOID_HANDLER) {
+ if (*callback_content_object) {
(*callback_content_object)(*socket_, *content_object);
}
- ConsumerInterestCallback *callback_interest = nullptr;
+ ConsumerInterestCallback *callback_interest = VOID_HANDLER;
socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_SATISFIED,
&callback_interest);
- if (*callback_content_object != VOID_HANDLER) {
+ if (*callback_content_object) {
(*callback_interest)(*socket_, *interest);
}
@@ -369,10 +369,10 @@ void RaaqmTransportProtocol::onContentSegment(
reassemble(std::move(content_object));
} else if (TRANSPORT_EXPECT_FALSE(incremental_suffix ==
index_manager_->getFinalSuffix())) {
- interface::ConsumerSocket::ReadCallback *on_payload = nullptr;
+ interface::ConsumerSocket::ReadCallback *on_payload = VOID_HANDLER;
socket_->getSocketOption(READ_CALLBACK, &on_payload);
- if (on_payload != nullptr) {
+ if (on_payload) {
on_payload->readSuccess(stats_.getBytesRecv());
}
}
@@ -404,10 +404,10 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) {
return;
}
- ConsumerInterestCallback *callback = nullptr;
+ ConsumerInterestCallback *callback = VOID_HANDLER;
socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_EXPIRED,
&callback);
- if (*callback != VOID_HANDLER) {
+ if (*callback) {
(*callback)(*socket_, *interest);
}
@@ -420,17 +420,17 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) {
max_rtx)) {
stats_.updateRetxCount(1);
- callback = nullptr;
+ callback = VOID_HANDLER;
socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION,
&callback);
- if (*callback != VOID_HANDLER) {
+ if (*callback) {
(*callback)(*socket_, *interest);
}
- callback = nullptr;
+ callback = VOID_HANDLER;
socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT,
&callback);
- if ((*callback) != VOID_HANDLER) {
+ if (*callback) {
(*callback)(*socket_, *interest);
}
@@ -450,7 +450,7 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) {
}
void RaaqmTransportProtocol::scheduleNextInterests() {
- if (TRANSPORT_EXPECT_FALSE(!is_running_)) {
+ if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) {
return;
}
@@ -490,14 +490,14 @@ void RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) {
interest_lifetime);
interest->setLifetime(interest_lifetime);
- ConsumerInterestCallback *callback = nullptr;
+ ConsumerInterestCallback *callback = VOID_HANDLER;
socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT,
&callback);
- if (*callback != VOID_HANDLER) {
+ if (*callback) {
callback->operator()(*socket_, *interest);
}
- if (TRANSPORT_EXPECT_FALSE(!is_running_)) {
+ if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) {
return;
}
@@ -516,10 +516,10 @@ void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) {
}
void RaaqmTransportProtocol::onContentReassembled(std::error_code ec) {
- interface::ConsumerSocket::ReadCallback *on_payload = nullptr;
+ interface::ConsumerSocket::ReadCallback *on_payload = VOID_HANDLER;
socket_->getSocketOption(READ_CALLBACK, &on_payload);
- if (on_payload == nullptr) {
+ if (on_payload) {
throw errors::RuntimeException(
"The read callback must be installed in the transport before "
"starting "
@@ -581,10 +581,10 @@ void RaaqmTransportProtocol::updateStats(uint32_t suffix, uint64_t rtt,
stats_.updateAverageWindowSize(current_window_size_);
// Call statistics callback
- ConsumerTimerCallback *stats_callback = nullptr;
+ ConsumerTimerCallback *stats_callback = VOID_HANDLER;
socket_->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY,
&stats_callback);
- if (*stats_callback != VOID_HANDLER) {
+ if (*stats_callback) {
auto dt = std::chrono::duration_cast<utils::Milliseconds>(now - t0_);
uint32_t timer_interval_milliseconds = 0;
diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc
index accd98495..e6134f767 100644
--- a/libtransport/src/hicn/transport/protocols/rtc.cc
+++ b/libtransport/src/hicn/transport/protocols/rtc.cc
@@ -87,11 +87,14 @@ void RTCTransportProtocol::reset() {
interestRetransmissions_.clear();
lastSegNacked_ = 0;
lastReceived_ = 0;
+ lastReceivedTime_ = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
highestReceived_ = 0;
firstSequenceInRound_ = 0;
rtx_timer_used_ = false;
- for(int i = 0; i < (1 << default_values::log_2_default_buffer_size); i++){
+ for (int i = 0; i < (1 << default_values::log_2_default_buffer_size); i++) {
inflightInterests_[i] = {0};
}
@@ -188,14 +191,13 @@ void RTCTransportProtocol::updateDelayStats(
pathTable_[pathLabel]->insertOwdSample(OWD);
pathTable_[pathLabel]->computeInterArrivalGap(segmentNumber);
- }else{
+ } else {
pathTable_[pathLabel]->receivedNack();
}
}
void RTCTransportProtocol::updateStats(uint32_t round_duration) {
- if(pathTable_.empty())
- return;
+ if (pathTable_.empty()) return;
if (receivedBytes_ != 0) {
double bytesPerSec =
@@ -210,68 +212,70 @@ void RTCTransportProtocol::updateStats(uint32_t round_duration) {
for (auto it = pathTable_.begin(); it != pathTable_.end(); it++) {
it->second->roundEnd();
- if(it->second->isActive()){
- if(it->second->getMinRtt() < minRtt){
+ if (it->second->isActive()) {
+ if (it->second->getMinRtt() < minRtt) {
minRtt = it->second->getMinRtt();
producerPathLabels_[0] = it->first;
}
- if(it->second->getMinRtt() > maxRtt){
+ if (it->second->getMinRtt() > maxRtt) {
maxRtt = it->second->getMinRtt();
producerPathLabels_[1] = it->first;
}
}
}
- if(pathTable_.find(producerPathLabels_[0]) == pathTable_.end() ||
+ if (pathTable_.find(producerPathLabels_[0]) == pathTable_.end() ||
pathTable_.find(producerPathLabels_[1]) == pathTable_.end())
- return; //this should not happen
+ return; // this should not happen
- //as a queuing delay we keep the lowest one among the two paths
- //if one path is congested the forwarder should decide to do not
- //use it soon so it does not make sens to inform the application
- //that maybe we have a problem
- if(pathTable_[producerPathLabels_[0]]->getQueuingDealy() <
+ // as a queuing delay we keep the lowest one among the two paths
+ // if one path is congested the forwarder should decide to do not
+ // use it so it does not make sense to inform the application
+ // that maybe we have a problem
+ if (pathTable_[producerPathLabels_[0]]->getQueuingDealy() <
pathTable_[producerPathLabels_[1]]->getQueuingDealy())
- queuingDelay_ = pathTable_[producerPathLabels_[0]]->getQueuingDealy();
+ queuingDelay_ = pathTable_[producerPathLabels_[0]]->getQueuingDealy();
else
- queuingDelay_ = pathTable_[producerPathLabels_[1]]->getQueuingDealy();
+ queuingDelay_ = pathTable_[producerPathLabels_[1]]->getQueuingDealy();
if (sentInterest_ != 0 && currentState_ == HICN_RTC_NORMAL_STATE) {
- uint32_t numberTheoricallyReceivedPackets_ = highestReceived_ - firstSequenceInRound_;
+ uint32_t numberTheoricallyReceivedPackets_ =
+ highestReceived_ - firstSequenceInRound_;
double lossRate = 0;
- if(numberTheoricallyReceivedPackets_ != 0)
- lossRate = (double)((double)(packetLost_ - lossRecovered_) / (double)numberTheoricallyReceivedPackets_);
+ if (numberTheoricallyReceivedPackets_ != 0)
+ lossRate = (double)((double)(packetLost_ - lossRecovered_) /
+ (double)numberTheoricallyReceivedPackets_);
- if(lossRate < 0)
- lossRate = 0;
+ if (lossRate < 0) lossRate = 0;
- if(initied){
+ if (initied) {
lossRate_ = lossRate_ * HICN_ESTIMATED_LOSSES_ALPHA +
- (lossRate * (1 - HICN_ESTIMATED_LOSSES_ALPHA));
- }else {
- lossRate_ =lossRate;
+ (lossRate * (1 - HICN_ESTIMATED_LOSSES_ALPHA));
+ } else {
+ lossRate_ = lossRate;
initied = true;
}
}
if (avgPacketSize_ == 0) avgPacketSize_ = HICN_INIT_PACKET_SIZE;
- //for the BDP we use the max rtt, so that we calibrate the window on the
- //RTT of the slowest path. In this way we are sure that the window will
- //never be too small
+ // for the BDP we use the max rtt, so that we calibrate the window on the
+ // RTT of the slowest path. In this way we are sure that the window will
+ // never be too small
uint32_t BDP = (uint32_t)ceil(
- (estimatedBw_ * (double)((double) pathTable_[producerPathLabels_[1]]->getMinRtt() /
- (double)HICN_MILLI_IN_A_SEC) *
- HICN_BANDWIDTH_SLACK_FACTOR) /
- avgPacketSize_);
+ (estimatedBw_ *
+ (double)((double)pathTable_[producerPathLabels_[1]]->getMinRtt() /
+ (double)HICN_MILLI_IN_A_SEC) *
+ HICN_BANDWIDTH_SLACK_FACTOR) /
+ avgPacketSize_);
uint32_t BW = (uint32_t)ceil(estimatedBw_);
computeMaxWindow(BW, BDP);
ConsumerTimerCallback *stats_callback = nullptr;
socket_->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY,
&stats_callback);
- if (*stats_callback != VOID_HANDLER) {
- //Send the stats to the app
+ if (*stats_callback) {
+ // Send the stats to the app
stats_.updateQueuingDelay(queuingDelay_);
stats_.updateLossRatio(lossRate_);
(*stats_callback)(*socket_, stats_);
@@ -331,8 +335,8 @@ void RTCTransportProtocol::computeMaxWindow(uint32_t productionRate,
// currentState = RTC_NORMAL_STATE
if (BDPWin != 0) {
- maxCWin_ =
- (uint32_t)ceil((double)BDPWin + ((double)BDPWin / 10.0)); // BDP + 10%
+ maxCWin_ = (uint32_t)ceil((double)BDPWin +
+ (((double)BDPWin * 30.0) / 100.0)); // BDP + 30%
} else {
maxCWin_ = min(maxWaintingInterest, maxCWin_);
}
@@ -377,22 +381,22 @@ void RTCTransportProtocol::increaseWindow() {
}
}
-void RTCTransportProtocol::probeRtt(){
+void RTCTransportProtocol::probeRtt() {
time_sent_probe_ = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
Name *interest_name = nullptr;
socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
- &interest_name);
- //get a random numbe in the probe seq range
+ &interest_name);
+ // get a random numbe in the probe seq range
std::default_random_engine eng((std::random_device())());
- std::uniform_int_distribution<uint32_t> idis(
- HICN_MIN_PROBE_SEQ, HICN_MAX_PROBE_SEQ);
+ std::uniform_int_distribution<uint32_t> idis(HICN_MIN_PROBE_SEQ,
+ HICN_MAX_PROBE_SEQ);
probe_seq_number_ = idis(eng);
interest_name->setSuffix(probe_seq_number_);
- //we considere the probe as a rtx so that we do not incresea inFlightInt
+ // we considere the probe as a rtx so that we do not incresea inFlightInt
received_probe_ = false;
sendInterest(interest_name, true);
@@ -403,7 +407,6 @@ void RTCTransportProtocol::probeRtt(){
});
}
-
void RTCTransportProtocol::sendInterest(Name *interest_name, bool rtx) {
auto interest = getPacket();
interest->setName(*interest_name);
@@ -418,11 +421,11 @@ void RTCTransportProtocol::sendInterest(Name *interest_name, bool rtx) {
socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT,
&on_interest_output);
- if (*on_interest_output != VOID_HANDLER) {
+ if (*on_interest_output) {
(*on_interest_output)(*socket_, *interest);
}
- if (TRANSPORT_EXPECT_FALSE(!is_running_)) {
+ if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) {
return;
}
@@ -437,20 +440,20 @@ void RTCTransportProtocol::sendInterest(Name *interest_name, bool rtx) {
void RTCTransportProtocol::scheduleNextInterests() {
checkRound();
- if (!is_running_) return;
+ if (!is_running_ && !is_first_) return;
while (inflightInterestsCount_ < currentCWin_) {
Name *interest_name = nullptr;
socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
&interest_name);
- interest_name->setSuffix(actualSegment_);
+ interest_name->setSuffix(actualSegment_);
// if the producer socket is not stated (does not reply even with nacks)
// we keep asking for something without marking anything as lost (see
// timeout). In this way when the producer socket will start the
- //consumer socket will not miss any packet
- if(TRANSPORT_EXPECT_FALSE(!firstPckReceived_)){
+ // consumer socket will not miss any packet
+ if (TRANSPORT_EXPECT_FALSE(!firstPckReceived_)) {
uint32_t pkt = actualSegment_ & modMask_;
inflightInterests_[pkt].state = sent_;
inflightInterests_[pkt].sequence = actualSegment_;
@@ -474,9 +477,9 @@ void RTCTransportProtocol::scheduleNextInterests() {
continue;
}
- //same if the packet is lost
+ // same if the packet is lost
if (inflightInterests_[pkt].state == lost_ &&
- inflightInterests_[pkt].sequence == actualSegment_){
+ inflightInterests_[pkt].sequence == actualSegment_) {
actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ;
continue;
}
@@ -486,7 +489,7 @@ void RTCTransportProtocol::scheduleNextInterests() {
std::chrono::steady_clock::now().time_since_epoch())
.count();
- //here the packet can be in any state except for lost or recevied
+ // here the packet can be in any state except for lost or recevied
inflightInterests_[pkt].state = sent_;
inflightInterests_[pkt].sequence = actualSegment_;
actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ;
@@ -503,30 +506,40 @@ void RTCTransportProtocol::addRetransmissions(uint32_t val) {
void RTCTransportProtocol::addRetransmissions(uint32_t start, uint32_t stop) {
uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ bool new_rtx = false;
for (uint32_t i = start; i < stop; i++) {
auto it = interestRetransmissions_.find(i);
if (it == interestRetransmissions_.end()) {
- if (lastSegNacked_ <= i) {
+ uint32_t pkt = i & modMask_;
+ if (lastSegNacked_ <= i && inflightInterests_[pkt].state != received_) {
// it must be larger than the last past nack received
packetLost_++;
interestRetransmissions_[i] = 0;
uint32_t pkt = i & modMask_;
- //we reset the transmission time setting to now, so that rtx will
- //happne in one RTT on waint one inter arrival gap
+ // we reset the transmission time setting to now, so that rtx will
+ // happne in one RTT on waint one inter arrival gap
inflightInterests_[pkt].transmissionTime = now;
+ new_rtx = true;
}
} // if the retransmission is already there the rtx timer will
// take care of it
}
- if(!rtx_timer_used_)
+ // in case a new rtx is added to the map we need to run checkRtx()
+ if (new_rtx) {
+ if (rtx_timer_used_) {
+ // if a timer is pending we need to delete it
+ rtx_timer_->cancel();
+ rtx_timer_used_ = false;
+ }
checkRtx();
+ }
}
-void RTCTransportProtocol::retransmit() {
+uint64_t RTCTransportProtocol::retransmit() {
auto it = interestRetransmissions_.begin();
// cut len to max HICN_MAX_RTX_SIZE
@@ -537,6 +550,10 @@ void RTCTransportProtocol::retransmit() {
}
it = interestRetransmissions_.begin();
+ uint64_t smallest_timeout = ULONG_MAX;
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
while (it != interestRetransmissions_.end()) {
uint32_t pkt = it->first & modMask_;
@@ -560,65 +577,75 @@ void RTCTransportProtocol::retransmit() {
continue;
}
-
- uint64_t sent_time = inflightInterests_[pkt].transmissionTime;
- uint64_t rtx_time = sent_time;
-
- if(it->second == 0){
- if(pathTable_.find(producerPathLabels_[0]) != pathTable_.end() &&
- pathTable_.find(producerPathLabels_[1]) != pathTable_.end()){
- //first rtx: wait RTTmax - RTTmin + gap
- rtx_time = sent_time + pathTable_[producerPathLabels_[1]]->getMinRtt() -
- pathTable_[producerPathLabels_[0]]->getMinRtt() +
- pathTable_[producerPathLabels_[1]]->getInterArrivalGap();
+ uint64_t rtx_time = now;
+
+ if (it->second == 0) {
+ // first rtx
+ if (producerPathLabels_[0] != producerPathLabels_[1]) {
+ // multipath
+ if (pathTable_.find(producerPathLabels_[0]) != pathTable_.end() &&
+ pathTable_.find(producerPathLabels_[1]) != pathTable_.end() &&
+ (pathTable_[producerPathLabels_[0]]->getInterArrivalGap() <
+ HICN_MIN_INTER_ARRIVAL_GAP)) {
+ rtx_time = lastReceivedTime_ +
+ (pathTable_[producerPathLabels_[1]]->getMinRtt() -
+ pathTable_[producerPathLabels_[0]]->getMinRtt()) +
+ pathTable_[producerPathLabels_[0]]->getInterArrivalGap();
+ } // else low rate producer, send it immediatly
+ } else {
+ // single path
+ if (pathTable_.find(producerPathLabels_[0]) != pathTable_.end() &&
+ (pathTable_[producerPathLabels_[0]]->getInterArrivalGap() <
+ HICN_MIN_INTER_ARRIVAL_GAP)) {
+ rtx_time = lastReceivedTime_ +
+ pathTable_[producerPathLabels_[0]]->getInterArrivalGap();
+ } // else low rate producer send immediatly
}
- }else{
- if(pathTable_.find(producerPathLabels_[0]) != pathTable_.end()){
- //second+ rtx: waint min rtt
+ } else {
+ // second or plus rtx, wait for the min rtt
+ if (pathTable_.find(producerPathLabels_[0]) != pathTable_.end()) {
+ uint64_t sent_time = inflightInterests_[pkt].transmissionTime;
rtx_time = sent_time + pathTable_[producerPathLabels_[0]]->getMinRtt();
- }
+ } // if we don't have info we send it immediatly
}
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
-
- if(now >= rtx_time){
+ if (now >= rtx_time) {
inflightInterests_[pkt].transmissionTime = now;
it->second++;
Name *interest_name = nullptr;
socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
- &interest_name);
+ &interest_name);
interest_name->setSuffix(it->first);
sendInterest(interest_name, true);
+ } else if (rtx_time < smallest_timeout) {
+ smallest_timeout = rtx_time;
}
++it;
}
+ return smallest_timeout;
}
void RTCTransportProtocol::checkRtx() {
- if(interestRetransmissions_.empty()){
+ if (interestRetransmissions_.empty()) {
rtx_timer_used_ = false;
return;
}
- //we use the packet intearriva time on the fastest path
- //even if this stats should be the same on both
- auto pathStats = pathTable_.find(producerPathLabels_[0]);
+ uint64_t next_timeout = retransmit();
uint64_t wait = 1;
- if(pathStats != pathTable_.end()){
- wait = floor(pathStats->second->getInterArrivalGap() / 2.0);
- if(wait < 1)
- wait = 1;
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ if (next_timeout != ULONG_MAX && now < next_timeout) {
+ wait = next_timeout - now;
}
-
rtx_timer_used_ = true;
- retransmit();
rtx_timer_->expires_from_now(std::chrono::milliseconds(wait));
rtx_timer_->async_wait([this](std::error_code ec) {
if (ec) return;
+ rtx_timer_used_ = false;
checkRtx();
});
}
@@ -628,14 +655,14 @@ void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) {
uint32_t segmentNumber = interest->getName().getSuffix();
- if(segmentNumber >= HICN_MIN_PROBE_SEQ){
+ if (segmentNumber >= HICN_MIN_PROBE_SEQ) {
// this is a timeout on a probe, do nothing
return;
}
uint32_t pkt = segmentNumber & modMask_;
- if(TRANSPORT_EXPECT_FALSE(!firstPckReceived_)){
+ if (TRANSPORT_EXPECT_FALSE(!firstPckReceived_)) {
inflightInterestsCount_--;
// we do nothing, and we keep asking the same stuff over
// and over until we get at least a packet
@@ -670,7 +697,8 @@ void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) {
scheduleNextInterests();
}
-bool RTCTransportProtocol::onNack(const ContentObject &content_object, bool rtx) {
+bool RTCTransportProtocol::onNack(const ContentObject &content_object,
+ bool rtx) {
uint32_t *payload = (uint32_t *)content_object.getPayload()->data();
uint32_t productionSeg = *payload;
uint32_t productionRate = *(++payload);
@@ -678,7 +706,13 @@ bool RTCTransportProtocol::onNack(const ContentObject &content_object, bool rtx)
bool old_nack = false;
- if(!rtx){
+ // if we did not received anything between lastReceived_ + 1 and productionSeg
+ // most likelly some packets got lost
+ if (lastReceived_ != 0) {
+ addRetransmissions(lastReceived_ + 1, productionSeg);
+ }
+
+ if (!rtx) {
gotNack_ = true;
// we synch the estimated production rate with the actual one
estimatedBw_ = (double)productionRate;
@@ -688,7 +722,7 @@ bool RTCTransportProtocol::onNack(const ContentObject &content_object, bool rtx)
// we are asking for stuff produced in the past
actualSegment_ = max(productionSeg, actualSegment_) % HICN_MIN_PROBE_SEQ;
- if(!rtx) {
+ if (!rtx) {
if (currentState_ == HICN_RTC_NORMAL_STATE) {
currentState_ = HICN_RTC_SYNC_STATE;
}
@@ -697,23 +731,13 @@ bool RTCTransportProtocol::onNack(const ContentObject &content_object, bool rtx)
increaseWindow();
}
- //we need to remove the rtx for packets with seq number
- //< productionSeg
- for(auto it = interestRetransmissions_.begin(); it !=
- interestRetransmissions_.end();){
- if(it->first < productionSeg)
- it = interestRetransmissions_.erase(it);
- else
- ++it;
- }
-
lastSegNacked_ = productionSeg;
old_nack = true;
} else if (productionSeg < nackSegment) {
actualSegment_ = productionSeg % HICN_MIN_PROBE_SEQ;
- if(!rtx){
+ if (!rtx) {
// we are asking stuff in the future
gotFutureNack_++;
computeMaxWindow(productionRate, 0);
@@ -724,8 +748,8 @@ bool RTCTransportProtocol::onNack(const ContentObject &content_object, bool rtx)
}
}
} else {
- //we are asking the right thing, but the producer is slow
- //keep doing the same until the packet is produced
+ // we are asking the right thing, but the producer is slow
+ // keep doing the same until the packet is produced
actualSegment_ = productionSeg % HICN_MIN_PROBE_SEQ;
}
@@ -734,7 +758,6 @@ bool RTCTransportProtocol::onNack(const ContentObject &content_object, bool rtx)
void RTCTransportProtocol::onContentObject(
Interest::Ptr &&interest, ContentObject::Ptr &&content_object) {
-
// as soon as we get a packet firstPckReceived_ will never be false
firstPckReceived_ = true;
@@ -746,24 +769,25 @@ void RTCTransportProtocol::onContentObject(
ConsumerContentObjectCallback *callback_content_object = nullptr;
socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT,
&callback_content_object);
- if (*callback_content_object != VOID_HANDLER) {
+ if (*callback_content_object) {
(*callback_content_object)(*socket_, *content_object);
}
- if(segmentNumber >= HICN_MIN_PROBE_SEQ){
- if(segmentNumber == probe_seq_number_ && !received_probe_){
+ if (segmentNumber >= HICN_MIN_PROBE_SEQ) {
+ if (segmentNumber == probe_seq_number_ && !received_probe_) {
received_probe_ = true;
uint32_t pathLabel = content_object->getPathLabel();
- if (pathTable_.find(pathLabel) == pathTable_.end()){
- //if this path does not exists we cannot create a new one so drop
+ if (pathTable_.find(pathLabel) == pathTable_.end()) {
+ // if this path does not exists we cannot create a new one so drop
return;
}
- //this is the expected probe, update the RTT and drop the packet
+ // this is the expected probe, update the RTT and drop the packet
uint64_t RTT = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count() - time_sent_probe_;
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count() -
+ time_sent_probe_;
pathTable_[pathLabel]->insertRttSample(RTT);
pathTable_[pathLabel]->receivedNack();
@@ -779,27 +803,33 @@ void RTCTransportProtocol::onContentObject(
bool old_nack = false;
if (interestRetransmissions_.find(segmentNumber) ==
- interestRetransmissions_.end()){
- //this is not a retransmitted packet
+ interestRetransmissions_.end()) {
+ // this is not a retransmitted packet
old_nack = onNack(*content_object, false);
updateDelayStats(*content_object);
} else {
old_nack = onNack(*content_object, true);
}
- //the nacked_ state is used only to avoid to decrease inflightInterestsCount_
- //multiple times. In fact, every time that we receive an event related to an
- //interest (timeout, nacked, content) we cange the state. In this way we are
- //sure that we do not decrease twice the counter
- if(old_nack)
+ // the nacked_ state is used only to avoid to decrease
+ // inflightInterestsCount_ multiple times. In fact, every time that we
+ // receive an event related to an interest (timeout, nacked, content) we
+ // cange the state. In this way we are sure that we do not decrease twice the
+ // counter
+ if (old_nack) {
inflightInterests_[pkt].state = lost_;
- else
+ interestRetransmissions_.erase(segmentNumber);
+ } else {
inflightInterests_[pkt].state = nacked_;
+ }
} else {
avgPacketSize_ = (HICN_ESTIMATED_PACKET_SIZE * avgPacketSize_) +
((1 - HICN_ESTIMATED_PACKET_SIZE) * payload->length());
+ receivedBytes_ += (uint32_t)(content_object->headerSize() +
+ content_object->payloadSize());
+
if (inflightInterests_[pkt].state == sent_) {
inflightInterestsCount_--; // packet sent without timeouts
}
@@ -807,33 +837,33 @@ void RTCTransportProtocol::onContentObject(
if (inflightInterests_[pkt].state == sent_ &&
interestRetransmissions_.find(segmentNumber) ==
interestRetransmissions_.end()) {
- // we count only non retransmitted data in order to take into accunt only
- // the transmition rate of the producer
- receivedBytes_ += (uint32_t)(content_object->headerSize() +
- content_object->payloadSize());
+ // delay stats are computed only for non retransmitted data
updateDelayStats(*content_object);
+ }
- addRetransmissions(lastReceived_ + 1, segmentNumber);
- if(segmentNumber > highestReceived_)
- highestReceived_ = segmentNumber;
- // lastReceived_ is updated only for data packets received without RTX
+ addRetransmissions(lastReceived_ + 1, segmentNumber);
+ if (segmentNumber > highestReceived_) {
+ highestReceived_ = segmentNumber;
+ }
+ if (segmentNumber > lastReceived_) {
lastReceived_ = segmentNumber;
+ lastReceivedTime_ =
+ std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
}
-
receivedData_++;
inflightInterests_[pkt].state = received_;
+ auto it = interestRetransmissions_.find(segmentNumber);
+ if (it != interestRetransmissions_.end()) lossRecovered_++;
+
+ interestRetransmissions_.erase(segmentNumber);
+
reassemble(std::move(content_object));
increaseWindow();
}
- // in any case we remove the packet from the rtx list
- auto it = interestRetransmissions_.find(segmentNumber);
- if(it != interestRetransmissions_.end())
- lossRecovered_ ++;
-
- interestRetransmissions_.erase(segmentNumber);
-
scheduleNextInterests();
}
diff --git a/libtransport/src/hicn/transport/protocols/rtc.h b/libtransport/src/hicn/transport/protocols/rtc.h
index 509f11361..7927e3969 100644
--- a/libtransport/src/hicn/transport/protocols/rtc.h
+++ b/libtransport/src/hicn/transport/protocols/rtc.h
@@ -49,6 +49,7 @@
#define HICN_MAX_RTX_SIZE 1024
#define HICN_MAX_RTX_MAX_AGE 10000
#define HICN_MIN_RTT_WIN 30 // rounds
+#define HICN_MIN_INTER_ARRIVAL_GAP 100 //ms
// cwin
#define HICN_INITIAL_CWIN 1 // packets
@@ -124,7 +125,7 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly {
void scheduleNextInterests() override;
void addRetransmissions(uint32_t val);
void addRetransmissions(uint32_t start, uint32_t stop);
- void retransmit();
+ uint64_t retransmit();
void checkRtx();
void probeRtt();
void onTimeout(Interest::Ptr &&interest) override;
@@ -159,6 +160,8 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly {
//for samething that is older than this value.
uint32_t lastReceived_; //segment of the last content object received
//indicates the base of the window on the client
+ uint64_t lastReceivedTime_; //time at which we recevied the
+ //lastReceived_ packet
//rtt probes
//the RTC transport tends to overestimate the RTT
diff --git a/libtransport/src/hicn/transport/utils/content_store.cc b/libtransport/src/hicn/transport/utils/content_store.cc
index 1e6b9fcea..8e3435507 100644
--- a/libtransport/src/hicn/transport/utils/content_store.cc
+++ b/libtransport/src/hicn/transport/utils/content_store.cc
@@ -85,12 +85,17 @@ void ContentStore::erase(const Name &exact_name) {
}
void ContentStore::setLimit(size_t max_packets) {
+ utils::SpinLock::Acquire locked(cs_mutex_);
max_content_store_size_ = max_packets;
}
-std::size_t ContentStore::getLimit() const { return max_content_store_size_; }
+std::size_t ContentStore::getLimit() const {
+ utils::SpinLock::Acquire locked(cs_mutex_);
+ return max_content_store_size_;
+}
std::size_t ContentStore::size() const {
+ utils::SpinLock::Acquire locked(cs_mutex_);
return content_store_hash_table_.size();
}
diff --git a/libtransport/src/hicn/transport/utils/content_store.h b/libtransport/src/hicn/transport/utils/content_store.h
index a89403a01..f7dc41835 100644
--- a/libtransport/src/hicn/transport/utils/content_store.h
+++ b/libtransport/src/hicn/transport/utils/content_store.h
@@ -68,8 +68,9 @@ class ContentStore {
ContentStoreHashTable content_store_hash_table_;
FIFOList fifo_list_;
std::shared_ptr<ContentObject> empty_reference_;
- std::size_t max_content_store_size_;
- utils::SpinLock cs_mutex_;
+ // Must be atomic
+ std::atomic_size_t max_content_store_size_;
+ mutable utils::SpinLock cs_mutex_;
};
} // end namespace utils \ No newline at end of file
diff --git a/utils/src/hiperf.cc b/utils/src/hiperf.cc
index ccb22779b..94e5d998d 100644
--- a/utils/src/hiperf.cc
+++ b/utils/src/hiperf.cc
@@ -47,7 +47,6 @@ namespace interface {
#define MIN_PROBE_SEQ 0xefffffff
-
using CryptoSuite = utils::CryptoSuite;
using Identity = utils::Identity;
@@ -217,8 +216,8 @@ class HIperfClient {
expected_seg_ = productionSeg;
} else if (receivedSeg > productionSeg && receivedSeg < MIN_PROBE_SEQ) {
std::cout << "[WINDOW TO LARGE] received NACK for " << receivedSeg
- << ". Next expected packet " << productionSeg << std::endl;
- } else if (receivedSeg >= MIN_PROBE_SEQ){
+ << ". Next expected packet " << productionSeg << std::endl;
+ } else if (receivedSeg >= MIN_PROBE_SEQ) {
std::cout << "[PROBE] probe number = " << receivedSeg << std::endl;
}
return;
@@ -259,9 +258,7 @@ class HIperfClient {
void handleTimerExpiration(ConsumerSocket &c,
const protocol::TransportStatistics &stats) {
-
- if (configuration_.rtc_)
- return;
+ if (configuration_.rtc_) return;
const char separator = ' ';
const int width = 20;
@@ -327,7 +324,9 @@ class HIperfClient {
}
consumer_socket_ = std::make_unique<ConsumerSocket>(transport_protocol);
- consumer_socket_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, configuration_.interest_lifetime_);
+ consumer_socket_->setSocketOption(
+ GeneralTransportOptions::INTEREST_LIFETIME,
+ configuration_.interest_lifetime_);
#if defined(DEBUG) && defined(__linux__)
std::shared_ptr<transport::BasePortal> portal;
@@ -944,8 +943,8 @@ int main(int argc, char *argv[]) {
int opt;
#ifndef _WIN32
- while ((opt = getopt(argc, argv, "DSCf:b:d:W:RMc:vA:s:rmlk:y:p:hi:xB:ItL:")) !=
- -1) {
+ while ((opt = getopt(argc, argv,
+ "DSCf:b:d:W:RMc:vA:s:rmlk:y:p:hi:xB:ItL:")) != -1) {
switch (opt) {
// Common
case 'D': {