diff options
57 files changed, 3194 insertions, 1955 deletions
diff --git a/cmake/Modules/Packager.cmake b/cmake/Modules/Packager.cmake index 898e40bcb..6f77fd3a8 100644 --- a/cmake/Modules/Packager.cmake +++ b/cmake/Modules/Packager.cmake @@ -33,10 +33,35 @@ function(get_next_version VERSION NEXT_VERSION) math(EXPR major "${major} + 1") endif() - set(minor "0${minor}") + if (minor LESS 10) + set(minor "0${minor}") + endif() + set(${NEXT_VERSION} "${major}.${minor}" PARENT_SCOPE) endfunction() +macro(extract_version) + # Extract version from git + execute_process( + COMMAND git describe --long --match v* + WORKING_DIRECTORY ${PROJECT_SOURCE_DIR} + OUTPUT_VARIABLE VER + OUTPUT_STRIP_TRAILING_WHITESPACE + ) + + if (NOT VER) + set(VER "v1.0-0-gcafe") + endif() + + message(STATUS "Git describe output: ${VER}") + + string(REGEX REPLACE "v([0-9]+).([0-9]+)-([0-9]+)-(g[0-9a-f]+)" "\\1;\\2;\\3;\\4" VER ${VER}) + list(GET VER 0 VERSION_MAJOR) + list(GET VER 1 VERSION_MINOR) + list(GET VER 2 VERSION_REVISION) + list(GET VER 3 COMMIT_NAME) +endmacro(extract_version) + macro(make_packages) if ("${CMAKE_SYSTEM_NAME}" STREQUAL "Linux") # parse /etc/os-release @@ -49,23 +74,17 @@ macro(make_packages) set(OS_${_name} ${_value}) endforeach() - # extract version from git - execute_process( - COMMAND git describe --long --match v* - WORKING_DIRECTORY ${PROJECT_SOURCE_DIR} - OUTPUT_VARIABLE VER - OUTPUT_STRIP_TRAILING_WHITESPACE - ) + extract_version() - if (NOT VER) - set(VER "v1.0-1-gcafe") - endif() + message(STATUS "Version major: ${VERSION_MAJOR}") + message(STATUS "Version minor: ${VERSION_MINOR}") + message(STATUS "Revision: ${VERSION_REVISION}") + message(STATUS "Commit hash: ${COMMIT_NAME}") - string(REGEX REPLACE "v(.*)-([0-9]+)-(g[0-9a-f]+)" "\\1;\\2;\\3" VER ${VER}) - list(GET VER 0 tag) + set(tag "${VERSION_MAJOR}.${VERSION_MINOR}") string(REPLACE "-" "~" tag ${tag}) - list(GET VER 1 commit_num) - list(GET VER 2 commit_name) + set(commit_num ${VERSION_REVISION}) + set(commit_name ${COMMIT_NAME}) if (NOT DEFINED ENV{BUILD_NUMBER}) set(bld "b1") diff --git a/cmake/Modules/ServiceScript.cmake b/cmake/Modules/ServiceScript.cmake new file mode 100644 index 000000000..110aa816b --- /dev/null +++ b/cmake/Modules/ServiceScript.cmake @@ -0,0 +1,42 @@ +# Copyright (c) 2019 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +######################################## +# +# Find the LongBow libraries and includes +# This module sets: +# SYSTEMD_FOUND: True if Systemd was found +# SYSTEMD_SERVICES_INSTALL_DIR: The Systemd install directory + +set(SYSTEMD_SERVICE_FOLDER "/lib/systemd/system") + +macro(install_service_script script) +cmake_parse_arguments(ARG + "" + "COMPONENT" + "" + ${ARGN} +) + + # Install service file only if + # 1) We are on a linux system + # 2) The installation prefix is /usr + + if (NOT ARG_COMPONENT) + set(ARG_COMPONENT hicn) + endif() + + if (${CMAKE_SYSTEM_NAME} STREQUAL "Linux" AND ${CMAKE_INSTALL_PREFIX} STREQUAL "/usr") + install (FILES ${script} DESTINATION ${SYSTEMD_SERVICE_FOLDER} COMPONENT ${ARG_COMPONENT}) + endif() +endmacro(install_service_script)
\ No newline at end of file diff --git a/ctrl/libhicnctrl/includes/hicn/ctrl/api.h b/ctrl/libhicnctrl/includes/hicn/ctrl/api.h index a0ee828b9..07c98514d 100644 --- a/ctrl/libhicnctrl/includes/hicn/ctrl/api.h +++ b/ctrl/libhicnctrl/includes/hicn/ctrl/api.h @@ -73,11 +73,6 @@ #define HICN_DEFAULT_PORT 9695 -#define LIBHICNCTRL_SUCCESS 0 -#define LIBHICNCTRL_FAILURE -1 -#define LIBHICNCTRL_NOT_IMPLEMENTED -99 -#define LIBHICNCTRL_IS_ERROR(x) (x < 0) - /* Helper for avoiding warnings about type-punning */ #define UNION_CAST(x, destType) \ (((union {__typeof__(x) a; destType b;})x).b) @@ -219,11 +214,11 @@ hc_ ## TYPE ## _find(hc_data_t * data, const hc_ ## TYPE ## _t * element, \ foreach_type(hc_ ## TYPE ## _t, x, data) { \ if (hc_ ## TYPE ## _cmp(x, element) >= 0) { \ *found = x; \ - return LIBHICNCTRL_SUCCESS; \ + return 0; \ } \ }; \ *found = NULL; /* this is optional */ \ - return LIBHICNCTRL_SUCCESS; \ + return 0; \ } /****************************************************************************** @@ -236,113 +231,103 @@ hc_ ## TYPE ## _find(hc_data_t * data, const hc_ ## TYPE ## _t * element, \ /** * \brief Holds the state of an hICN control socket */ -typedef struct { - char * url; - int fd; - u32 seq; - - /* Partial receive buffer */ - u8 buf[RECV_BUFLEN]; - size_t roff; /**< Read offset */ - size_t woff; /**< Write offset */ - - /* - * Because received messages are potentially unbounded in size, we might not - * guarantee that we can store a full packet before processing it. We must - * implement a very simple state machine remembering the current parsing - * status in order to partially process the packet. - */ - size_t remaining; - u32 send_id; - u32 send_seq; - u32 recv_seq; -} hc_sock_t; +typedef struct hc_sock_s hc_sock_t; /** * \brief Create an hICN control socket using the specified URL. * \param [in] url - The URL to connect to. * \return an hICN control socket */ -hc_sock_t * -hc_sock_create_url(const char * url); +hc_sock_t * hc_sock_create_url(const char * url); /** * \brief Create an hICN control socket using the default connection type. * \return an hICN control socket */ -hc_sock_t * -hc_sock_create(void); +hc_sock_t * hc_sock_create(void); /** * \brief Frees an hICN control socket + * \param [in] s - hICN control socket */ -void -hc_sock_free(hc_sock_t *s); +void hc_sock_free(hc_sock_t * s); + +/** + * \brief Returns the next available sequence number to use for requests to the + * API. + * \param [in] s - hICN control socket + */ +int hc_sock_get_next_seq(hc_sock_t * s); /** * \brief Sets the socket as non-blocking + * \param [in] s - hICN control socket * \return Error code */ -int -hc_sock_set_nonblocking(hc_sock_t *s); +int hc_sock_set_nonblocking(hc_sock_t * s); + +/** + * \brief Return the file descriptor associated to the hICN contorl sock + * \param [in] s - hICN control socket + * \return The file descriptor (positive value), or a negative integer in case + * of error + */ +int hc_sock_get_fd(hc_sock_t * s); /** * \brief Connect the socket * \return Error code */ int -hc_sock_connect(hc_sock_t *s); +hc_sock_connect(hc_sock_t * s); /** * \brief Return the offset and size of available buffer space - * \param [in] sock - hICN control socket + * \param [in] s - hICN control socket * \param [out] buffer - Offset in buffer * \param [out] size - Remaining size * \return Error code */ -int -hc_sock_get_available(hc_sock_t * s, u8 ** buffer, size_t * size); +int hc_sock_get_available(hc_sock_t * s, u8 ** buffer, size_t * size); /** * \brief Write/read iexchance on the control socket (internal helper function) - * \param [in] sock - hICN control socket + * \param [in] s - hICN control socket * \param [in] msg - Message to send * \param [in] msglen - Length of the message to send * \return Error code */ -int -hc_sock_send(hc_sock_t * s, hc_msg_t * msg, size_t msglen); +int hc_sock_send(hc_sock_t * s, hc_msg_t * msg, size_t msglen, int seq); /** * \brief Helper for reading socket contents - * \param [in] sock - hICN control socket - * \param [in] data - Result data buffer - * \param [in] parse - Parse function to convert remote types into lib native - * types, or NULL not to perform any translation. + * \param [in] s - hICN control socket * \return Error code */ -int -hc_sock_recv(hc_sock_t * s, hc_data_t * data); +int hc_sock_recv(hc_sock_t * s); /** * \brief Processing data received by socket - * \param [in] sock - hICN control socket - * \param [in] data - Result data buffer + * \param [in] s - hICN control socket * \param [in] parse - Parse function to convert remote types into lib native * types, or NULL not to perform any translation. * \return Error code */ -int -hc_sock_process(hc_sock_t * s, hc_data_t * data, - int (*parse)(const u8 * src, u8 * dst)); +int hc_sock_process(hc_sock_t * s, hc_data_t ** data); + +/** + * \brief Callback used in async mode when data is available on the socket + * \param [in] s - hICN control socket + * \return Error code + */ +int hc_sock_callback(hc_sock_t * s, hc_data_t ** data); /** * \brief Reset the state of the sock (eg. to handle a reconnecton) - * \param [in] sock - hICN control socket + * \param [in] s - hICN control socket * \return Error code */ -int -hc_sock_reset(hc_sock_t * s); +int hc_sock_reset(hc_sock_t * s); /****************************************************************************** * Command-specific structures and functions @@ -471,7 +456,7 @@ typedef struct { int hc_listener_create(hc_sock_t * s, hc_listener_t * listener); /* listener_found might eventually be allocated, and needs to be freed */ -int hc_listener_get(hc_sock_t *s, hc_listener_t * listener, +int hc_listener_get(hc_sock_t * s, hc_listener_t * listener, hc_listener_t ** listener_found); int hc_listener_delete(hc_sock_t * s, hc_listener_t * listener); int hc_listener_list(hc_sock_t * s, hc_data_t ** pdata); @@ -518,7 +503,7 @@ typedef struct { int hc_connection_create(hc_sock_t * s, hc_connection_t * connection); /* connection_found will be allocated, and must be freed */ -int hc_connection_get(hc_sock_t *s, hc_connection_t * connection, +int hc_connection_get(hc_sock_t * s, hc_connection_t * connection, hc_connection_t ** connection_found); int hc_connection_update_by_id(hc_sock_t * s, int hc_connection_id, hc_connection_t * connection); @@ -579,13 +564,14 @@ int hc_face_create(hc_sock_t * s, hc_face_t * face); int hc_face_get(hc_sock_t * s, hc_face_t * face, hc_face_t ** face_found); int hc_face_delete(hc_sock_t * s, hc_face_t * face); int hc_face_list(hc_sock_t * s, hc_data_t ** pdata); +int hc_face_list_async(hc_sock_t * s); //, hc_data_t ** pdata); #define foreach_face(VAR, data) foreach_type(hc_face_t, VAR, data) #define MAX_FACE_ID 255 #define MAXSZ_FACE_ID_ 3 #define MAXSZ_FACE_ID MAXSZ_FACE_ID_ + NULLTERM -#define MAXSZ_FACE_NAME_ NAMELEN +#define MAXSZ_FACE_NAME_ NAME_LEN #define MAXSZ_FACE_NAME MAXSZ_FACE_NAME_ + NULLTERM #define MAXSZ_HC_FACE_ MAXSZ_FACE_ID_ + MAXSZ_FACE_NAME_ + MAXSZ_FACE_ + 5 diff --git a/ctrl/libhicnctrl/includes/hicn/ctrl/commands.h b/ctrl/libhicnctrl/includes/hicn/ctrl/commands.h index 4209c6eb6..75f05988f 100755 --- a/ctrl/libhicnctrl/includes/hicn/ctrl/commands.h +++ b/ctrl/libhicnctrl/includes/hicn/ctrl/commands.h @@ -58,6 +58,7 @@ typedef enum { ADD_ROUTE, LIST_ROUTES, REMOVE_CONNECTION, + REMOVE_LISTENER, REMOVE_ROUTE, CACHE_STORE, CACHE_SERVE, @@ -77,7 +78,6 @@ typedef enum { REMOVE_POLICY, UPDATE_CONNECTION, #endif /* WITH_POLICY */ - REMOVE_LISTENER, LAST_COMMAND_VALUE } command_id; diff --git a/ctrl/libhicnctrl/src/CMakeLists.txt b/ctrl/libhicnctrl/src/CMakeLists.txt index 7b4413d55..4708595e0 100644 --- a/ctrl/libhicnctrl/src/CMakeLists.txt +++ b/ctrl/libhicnctrl/src/CMakeLists.txt @@ -24,6 +24,7 @@ set(HEADER_FILES set(UTIL_HEADER_FILES face.h util/log.h + util/map.h ) set(SOURCE_FILES diff --git a/ctrl/libhicnctrl/src/api.c b/ctrl/libhicnctrl/src/api.c index 769c96076..aa1ec8edd 100644 --- a/ctrl/libhicnctrl/src/api.c +++ b/ctrl/libhicnctrl/src/api.c @@ -20,6 +20,7 @@ #include <assert.h> // assert #include <math.h> // log2 +#include <stdbool.h> #include <stdio.h> // snprintf #include <string.h> // memmove, strcasecmp #include <sys/socket.h> // socket @@ -30,10 +31,83 @@ #include <hicn/ctrl/commands.h> #include <hicn/util/token.h> #include "util/log.h" +#include "util/map.h" #include <strings.h> #define PORT 9695 +/* + * Internal state associated to a pending request + */ +typedef struct { + int seq; + hc_data_t * data; + /* Information used to process results */ + int size_in; + int (*parse)(const u8 * src, u8 * dst); +} hc_sock_request_t; + +/** + * Messages to the forwarder might be multiplexed thanks to the seqNum fields in + * the header_control_message structure. The forwarder simply answers back the + * original sequence number. We maintain a map of such sequence number to + * outgoing queries so that replied can be demultiplexed and treated + * appropriately. + */ +TYPEDEF_MAP_H(hc_sock_map, int, hc_sock_request_t *); +TYPEDEF_MAP(hc_sock_map, int, hc_sock_request_t *, int_cmp, int_snprintf, generic_snprintf); + +struct hc_sock_s { + char * url; + int fd; + + /* Partial receive buffer */ + u8 buf[RECV_BUFLEN]; + size_t roff; /**< Read offset */ + size_t woff; /**< Write offset */ + + /* + * Because received messages are potentially unbounded in size, we might not + * guarantee that we can store a full packet before processing it. We must + * implement a very simple state machine remembering the current parsing + * status in order to partially process the packet. + */ + size_t remaining; + u32 send_id; + + /* Next sequence number to be used for requests */ + int seq; + + /* Request being parsed (NULL if none) */ + hc_sock_request_t * cur_request; + + bool async; + hc_sock_map_t * map; +}; + + +hc_sock_request_t * +hc_sock_request_create(int seq, hc_data_t * data, HC_PARSE parse) +{ + assert(seq >= 0); + assert(data); + + hc_sock_request_t * request = malloc(sizeof(hc_sock_request_t)); + if (!request) + return NULL; + request->seq = seq; + request->data = data; + request->parse = parse; + return request; +} + +void +hc_sock_request_free(hc_sock_request_t * request) +{ + free(request); +} + + #if 0 #ifdef __APPLE__ #define RANDBYTE() (u8)(arc4random() & 0xFF) @@ -239,7 +313,7 @@ hc_data_create(size_t in_element_size, size_t out_element_size) data->in_element_size = in_element_size; data->out_element_size = out_element_size; data->size = 0; - data->complete = 0; + data->complete = false; data->command_id = 0; // TODO this could also be a busy mark in the socket /* No callback needed in blocking code for instance */ data->complete_cb = NULL; @@ -274,21 +348,21 @@ hc_data_ensure_available(hc_data_t * data, size_t count) data->max_size_log = new_size_log; data->buffer = realloc(data->buffer, (1 << new_size_log) * data->out_element_size); if (!data->buffer) - return LIBHICNCTRL_FAILURE; + return -1; } - return LIBHICNCTRL_SUCCESS; + return 0; } int hc_data_push_many(hc_data_t * data, const void * elements, size_t count) { if (hc_data_ensure_available(data, count) < 0) - return LIBHICNCTRL_FAILURE; + return -1; memcpy(data->buffer + data->size * data->out_element_size, elements, count * data->out_element_size); data->size += count; - return LIBHICNCTRL_SUCCESS; + return 0; } int @@ -316,7 +390,7 @@ hc_data_set_callback(hc_data_t * data, data_callback_t cb, void * cb_data) { data->complete_cb = cb; data->complete_cb_data = cb_data; - return LIBHICNCTRL_SUCCESS; + return 0; } int @@ -325,14 +399,14 @@ hc_data_set_complete(hc_data_t * data) data->complete = true; if (data->complete_cb) return data->complete_cb(data, data->complete_cb_data); - return LIBHICNCTRL_SUCCESS; + return 0; } int hc_data_reset(hc_data_t * data) { data->size = 0; - return LIBHICNCTRL_SUCCESS; + return 0; } /****************************************************************************** @@ -377,10 +451,10 @@ hc_sock_parse_url(const char * url, struct sockaddr * sa) break; } default: - return LIBHICNCTRL_FAILURE; + return -1; } - return LIBHICNCTRL_SUCCESS; + return 0; } hc_sock_t * @@ -399,9 +473,20 @@ hc_sock_create_url(const char * url) if (hc_sock_reset(s) < 0) goto ERR_RESET; + s->seq = 0; + s->cur_request = NULL; + + s->map = hc_sock_map_create(); + if (!s->map) + goto ERR_MAP; + return s; + //hc_sock_map_free(s->map); +ERR_MAP: ERR_RESET: + if (s->url) + free(s->url); close(s->fd); ERR_SOCKET: free(s); @@ -418,6 +503,18 @@ hc_sock_create(void) void hc_sock_free(hc_sock_t * s) { + hc_sock_request_t ** request_array = NULL; + int n = hc_sock_map_get_value_array(s->map, &request_array); + if (n < 0) { + ERROR("Could not retrieve pending request array for freeing up resources"); + } else { + for (unsigned i = 0; i < n; i++) { + hc_sock_request_t * request = request_array[i]; + hc_sock_request_free(request); + } + free(request_array); + } + hc_sock_map_free(s->map); if (s->url) free(s->url); close(s->fd); @@ -425,13 +522,25 @@ hc_sock_free(hc_sock_t * s) } int -hc_sock_set_nonblocking(hc_sock_t *s) +hc_sock_get_next_seq(hc_sock_t * s) +{ + return s->seq++; +} + +int +hc_sock_set_nonblocking(hc_sock_t * s) { return (fcntl(s->fd, F_SETFL, fcntl(s->fd, F_GETFL) | O_NONBLOCK) < 0); } int -hc_sock_connect(hc_sock_t *s) +hc_sock_get_fd(hc_sock_t * s) +{ + return s->fd; +} + +int +hc_sock_connect(hc_sock_t * s) { struct sockaddr_storage ss = { 0 }; @@ -444,17 +553,18 @@ hc_sock_connect(hc_sock_t *s) if (connect(s->fd, (struct sockaddr *)&ss, size) < 0) //sizeof(struct sockaddr)) < 0) goto ERR_CONNECT; - return LIBHICNCTRL_SUCCESS; + return 0; ERR_CONNECT: ERR_PARSE: - return LIBHICNCTRL_FAILURE; + return -1; } int -hc_sock_send(hc_sock_t * s, hc_msg_t * msg, size_t msglen) +hc_sock_send(hc_sock_t * s, hc_msg_t * msg, size_t msglen, int seq) { int rc; + msg->hdr.seqNum = seq; rc = send(s->fd, msg, msglen, 0); if (rc < 0) { perror("hc_sock_send"); @@ -469,11 +579,11 @@ hc_sock_get_available(hc_sock_t * s, u8 ** buffer, size_t * size) *buffer = s->buf + s->woff; *size = RECV_BUFLEN - s->woff; - return LIBHICNCTRL_SUCCESS; + return 0; } int -hc_sock_recv(hc_sock_t * s, hc_data_t * data) +hc_sock_recv(hc_sock_t * s) { int rc; @@ -485,24 +595,24 @@ hc_sock_recv(hc_sock_t * s, hc_data_t * data) rc = recv(s->fd, s->buf + s->woff, RECV_BUFLEN - s->woff, 0); if (rc == 0) { - return LIBHICNCTRL_FAILURE; /* Connection has been closed */ - // XXX + return 0; } if (rc < 0) { + /* + * Let's not return 0 which currently means the socket has been closed + */ + if (errno == EWOULDBLOCK) + return -1; perror("hc_sock_recv"); - /* Error occurred */ - // XXX check for EWOULDBLOCK; - // XXX - return LIBHICNCTRL_FAILURE; + return -1; } s->woff += rc; - return LIBHICNCTRL_SUCCESS; + return rc; } int -hc_sock_process(hc_sock_t * s, hc_data_t * data, - int (*parse)(const u8 * src, u8 * dst)) +hc_sock_process(hc_sock_t * s, hc_data_t ** data) { int err = 0; @@ -511,7 +621,7 @@ hc_sock_process(hc_sock_t * s, hc_data_t * data, while(available > 0) { - if (s->remaining == 0) { + if (!s->cur_request) { // No message being parsed, alternatively (remaining == 0) hc_msg_t * msg = (hc_msg_t*)(s->buf + s->roff); /* We expect a message header */ @@ -519,74 +629,82 @@ hc_sock_process(hc_sock_t * s, hc_data_t * data, break; /* Sanity checks (might instead raise warnings) */ - // TODO: sync check ? assert((msg->hdr.messageType == RESPONSE_LIGHT) || (msg->hdr.messageType == ACK_LIGHT) || (msg->hdr.messageType == NACK_LIGHT)); - //assert(msg->hdr.commandID == data->command_id); // FIXME - assert(msg->hdr.seqNum == s->recv_seq++); + hc_sock_request_t * request = NULL; + if (hc_sock_map_get(s->map, msg->hdr.seqNum, &request) < 0) { + ERROR("[hc_sock_process] Error searching for matching request"); + return -1; + } + if (!request) { + ERROR("[hc_sock_process] No request matching received sequence number"); + return -1; + } s->remaining = msg->hdr.length; if (s->remaining == 0) { - /* - * The protocol expects all sequence number to be reset after - * each transaction. We reset before running the callback in - * case it triggers new exchanges. - */ - s->send_seq = HICN_CTRL_SEND_SEQ_INIT; - s->recv_seq = HICN_CTRL_RECV_SEQ_INIT; - - // TODO : check before even sending ? - /* Complete message without payload */ - // TODO : is this correct ? no error code ? - hc_data_set_complete(data); + if (data) { + *data = request->data; +// } else { +// free(request->data); + } + hc_data_set_complete(request->data); + hc_sock_request_free(request); + } else { + /* We only remember it if there is still data to parse */ + s->cur_request = request; } available -= sizeof(hc_msg_header_t); s->roff += sizeof(hc_msg_header_t); } else { /* We expect the complete payload, or at least a chunk of it */ - size_t num_chunks = available / data->in_element_size; + size_t num_chunks = available / s->cur_request->data->in_element_size; if (num_chunks == 0) break; if (num_chunks > s->remaining) num_chunks = s->remaining; - if (!parse) { - hc_data_push_many(data, s->buf + s->roff, num_chunks); + if (!s->cur_request->parse) { + /* If we don't need to parse results, then we can directly push + * all of them into the result data structure */ + hc_data_push_many(s->cur_request->data, s->buf + s->roff, num_chunks); } else { int rc; - rc = hc_data_ensure_available(data, num_chunks); + rc = hc_data_ensure_available(s->cur_request->data, num_chunks); if (rc < 0) - return LIBHICNCTRL_FAILURE; + return -1; for (int i = 0; i < num_chunks; i++) { - u8 * dst = hc_data_get_next(data); + u8 * dst = hc_data_get_next(s->cur_request->data); if (!dst) - return LIBHICNCTRL_FAILURE; + return -1; - rc = parse(s->buf + s->roff + i * data->in_element_size, dst); + rc = s->cur_request->parse(s->buf + s->roff + i * s->cur_request->data->in_element_size, dst); if (rc < 0) err = -1; /* FIXME we let the loop complete (?) */ - data->size++; + s->cur_request->data->size++; } } - s->remaining -= num_chunks; + available -= num_chunks * s->cur_request->data->in_element_size; + s->roff += num_chunks * s->cur_request->data->in_element_size; if (s->remaining == 0) { - /* - * The protocol expects all sequence number to be reset after - * each transaction. We reset before running the callback in - * case it triggers new exchanges. - */ - s->send_seq = HICN_CTRL_SEND_SEQ_INIT; - s->recv_seq = HICN_CTRL_RECV_SEQ_INIT; - - hc_data_set_complete(data); + if (hc_sock_map_remove(s->map, s->cur_request->seq, NULL) < 0) { + ERROR("[hc_sock_process] Error removing request from map"); + return -1; + } + if (data) { + *data = s->cur_request->data; +// } else { +// free(s->cur_request->data); + } + hc_data_set_complete(s->cur_request->data); + hc_sock_request_free(s->cur_request); + s->cur_request = NULL; } - available -= num_chunks * data->in_element_size; - s->roff += num_chunks * data->in_element_size; } } @@ -605,13 +723,48 @@ hc_sock_process(hc_sock_t * s, hc_data_t * data, } int +hc_sock_callback(hc_sock_t * s, hc_data_t ** data) +{ + *data = NULL; + + for (;;) { + int n = hc_sock_recv(s); + if (n == 0) { + DEBUG("EOF"); + goto ERR_EOF; + } + if (n < 0) { + switch(errno) { + case ECONNRESET: + case ENODEV: + /* Forwarder restarted */ + WARN("Forwarder likely restarted: not (yet) implemented"); + goto ERR_EOF; + case EWOULDBLOCK: + //DEBUG("Would block... stop reading from socket"); + goto END; + default: + perror("hc_sock_recv"); + goto ERR_EOF; + } + } + if (hc_sock_process(s, data) < 0) { + return -1; + } + } +END: + return 0; + +ERR_EOF: + return -1; +} + +int hc_sock_reset(hc_sock_t * s) { s->roff = s->woff = 0; - s->send_seq = HICN_CTRL_SEND_SEQ_INIT; - s->recv_seq = HICN_CTRL_RECV_SEQ_INIT; s->remaining = 0; - return LIBHICNCTRL_SUCCESS; + return 0; } /****************************************************************************** @@ -630,8 +783,11 @@ typedef struct { int hc_execute_command(hc_sock_t * s, hc_msg_t * msg, size_t msg_len, - hc_command_params_t * params, hc_data_t ** pdata) + hc_command_params_t * params, hc_data_t ** pdata, bool async) { + if (async) + assert(!pdata); + /* Sanity check */ switch(params->cmd) { case ACTION_CREATE: @@ -655,35 +811,70 @@ hc_execute_command(hc_sock_t * s, hc_msg_t * msg, size_t msg_len, assert(params->parse == NULL); break; default: - return LIBHICNCTRL_FAILURE; + return -1; } - hc_sock_reset(s); + //hc_sock_reset(s); /* XXX data will at least store the result (complete) */ hc_data_t * data = hc_data_create(params->size_in, params->size_out); - if (!data) + if (!data) { + ERROR("[hc_execute_command] Could not create data storage"); goto ERR_DATA; + } - if (hc_sock_send(s, msg, msg_len) < 0) + int seq = hc_sock_get_next_seq(s); + if (seq < 0) { + ERROR("[hc_execute_command] Could not get next sequence number"); + goto ERR_SEQ; + } + + /* Create state used to process the request */ + hc_sock_request_t * request = NULL; + request = hc_sock_request_create(seq, data, params->parse); + if (!request) { + ERROR("[hc_execute_command] Could not create request state"); + goto ERR_REQUEST; + } + + /* Add state to map */ + if (hc_sock_map_add(s->map, seq, request) < 0) { + ERROR("[hc_execute_command] Error adding request state to map"); + goto ERR_MAP; + } + + if (hc_sock_send(s, msg, msg_len, seq) < 0) { + ERROR("[hc_execute_command] Error sending message"); goto ERR_PROCESS; + } + + if (async) + return 0; + while(!data->complete) { - if (hc_sock_recv(s, data) < 0) - break; - if (hc_sock_process(s, data, params->parse) < 0) { + /* + * As the socket is non blocking it might happen that we need to read + * several times before success... shall we alternate between blocking + * and non-blocking mode ? + */ + if (hc_sock_recv(s) < 0) + continue; //break; + if (hc_sock_process(s, pdata) < 0) { + ERROR("[hc_execute_command] Error processing socket results"); goto ERR_PROCESS; } } - if (pdata) - *pdata = data; - - return LIBHICNCTRL_SUCCESS; + return 0; ERR_PROCESS: +ERR_MAP: + hc_sock_request_free(request); +ERR_REQUEST: +ERR_SEQ: free(data); ERR_DATA: - return LIBHICNCTRL_FAILURE; + return -1; } /*----------------------------------------------------------------------------* @@ -693,13 +884,13 @@ ERR_DATA: /* LISTENER CREATE */ int -hc_listener_create(hc_sock_t * s, hc_listener_t * listener) +_hc_listener_create(hc_sock_t * s, hc_listener_t * listener, bool async) { if (!IS_VALID_FAMILY(listener->family)) - return LIBHICNCTRL_FAILURE; + return -1; if (!IS_VALID_CONNECTION_TYPE(listener->type)) - return LIBHICNCTRL_FAILURE; + return -1; struct { header_control_message hdr; @@ -709,7 +900,7 @@ hc_listener_create(hc_sock_t * s, hc_listener_t * listener) .messageType = REQUEST_LIGHT, .commandID = ADD_LISTENER, .length = 1, - .seqNum = s->send_seq, + .seqNum = 0, }, .payload = { .address = { @@ -733,31 +924,43 @@ hc_listener_create(hc_sock_t * s, hc_listener_t * listener) .parse = NULL, }; - return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), ¶ms, NULL); + return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), ¶ms, NULL, async); +} + +int +hc_listener_create(hc_sock_t * s, hc_listener_t * listener) +{ + return _hc_listener_create(s, listener, false); +} + +int +hc_listener_create_async(hc_sock_t * s, hc_listener_t * listener) +{ + return _hc_listener_create(s, listener, true); } /* LISTENER GET */ int -hc_listener_get(hc_sock_t *s, hc_listener_t * listener, +hc_listener_get(hc_sock_t * s, hc_listener_t * listener, hc_listener_t ** listener_found) { hc_data_t * listeners; hc_listener_t * found; if (hc_listener_list(s, &listeners) < 0) - return LIBHICNCTRL_FAILURE; + return -1; /* Test */ if (hc_listener_find(listeners, listener, &found) < 0) { hc_data_free(listeners); - return LIBHICNCTRL_FAILURE; + return -1; } if (found) { *listener_found = malloc(sizeof(hc_listener_t)); if (!*listener_found) - return LIBHICNCTRL_FAILURE; + return -1; **listener_found = *found; } else { *listener_found = NULL; @@ -765,14 +968,14 @@ hc_listener_get(hc_sock_t *s, hc_listener_t * listener, hc_data_free(listeners); - return LIBHICNCTRL_SUCCESS; + return 0; } /* LISTENER DELETE */ int -hc_listener_delete(hc_sock_t * s, hc_listener_t * listener) +_hc_listener_delete(hc_sock_t * s, hc_listener_t * listener, bool async) { struct { header_control_message hdr; @@ -782,24 +985,20 @@ hc_listener_delete(hc_sock_t * s, hc_listener_t * listener) .messageType = REQUEST_LIGHT, .commandID = REMOVE_LISTENER, .length = 1, - .seqNum = s->send_seq, + .seqNum = 0, }, }; if (listener->id) { - printf("Delete by ID\n"); snprintf(msg.payload.symbolicOrListenerid, NAME_LEN, "%d", listener->id); } else if (*listener->name) { - printf("Delete by name %s\n", listener->name); snprintf(msg.payload.symbolicOrListenerid, NAME_LEN, "%s", listener->name); } else { - printf("Delete after search\n"); hc_listener_t * listener_found; if (hc_listener_get(s, listener, &listener_found) < 0) - return LIBHICNCTRL_FAILURE; + return -1; if (!listener_found) - return LIBHICNCTRL_FAILURE; - printf("Delete listener ID=%d\n", listener_found->id); + return -1; snprintf(msg.payload.symbolicOrListenerid, NAME_LEN, "%d", listener_found->id); free(listener_found); } @@ -812,13 +1011,26 @@ hc_listener_delete(hc_sock_t * s, hc_listener_t * listener) .parse = NULL, }; - return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), ¶ms, NULL); + return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), ¶ms, NULL, async); } +int +hc_listener_delete(hc_sock_t * s, hc_listener_t * listener) +{ + return _hc_listener_delete(s, listener, false); +} + +int +hc_listener_delete_async(hc_sock_t * s, hc_listener_t * listener) +{ + return _hc_listener_delete(s, listener, true); +} + + /* LISTENER LIST */ int -hc_listener_list(hc_sock_t * s, hc_data_t ** pdata) +_hc_listener_list(hc_sock_t * s, hc_data_t ** pdata, bool async) { struct { header_control_message hdr; @@ -827,7 +1039,7 @@ hc_listener_list(hc_sock_t * s, hc_data_t ** pdata) .messageType = REQUEST_LIGHT, .commandID = LIST_LISTENERS, .length = 0, - .seqNum = s->send_seq, + .seqNum = 0, }, }; @@ -839,7 +1051,19 @@ hc_listener_list(hc_sock_t * s, hc_data_t ** pdata) .parse = (HC_PARSE)hc_listener_parse, }; - return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), ¶ms, pdata); + return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), ¶ms, pdata, async); +} + +int +hc_listener_list(hc_sock_t * s, hc_data_t ** pdata) +{ + return _hc_listener_list(s, pdata, false); +} + +int +hc_listener_list_async(hc_sock_t * s, hc_data_t ** pdata) +{ + return _hc_listener_list(s, pdata, true); } /* LISTENER VALIDATE */ @@ -848,12 +1072,12 @@ int hc_listener_validate(const hc_listener_t * listener) { if (!IS_VALID_FAMILY(listener->family)) - return LIBHICNCTRL_FAILURE; + return -1; if (!IS_VALID_CONNECTION_TYPE(listener->type)) - return LIBHICNCTRL_FAILURE; + return -1; - return LIBHICNCTRL_SUCCESS; + return 0; } /* LISTENER CMP */ @@ -866,8 +1090,8 @@ hc_listener_cmp(const hc_listener_t * l1, const hc_listener_t * l2) (strncmp(l1->interface_name, l2->interface_name, INTERFACE_LEN) == 0) && (ip_address_cmp(&l1->local_addr, &l2->local_addr, l1->family) == 0) && (l1->local_port == l2->local_port)) - ? LIBHICNCTRL_SUCCESS - : LIBHICNCTRL_FAILURE; + ? 0 + : -1; } /* LISTENER PARSE */ @@ -878,18 +1102,18 @@ hc_listener_parse(void * in, hc_listener_t * listener) list_listeners_command * cmd = (list_listeners_command *)in; if (!IS_VALID_LIST_LISTENERS_TYPE(cmd->encapType)) - return LIBHICNCTRL_FAILURE; + return -1; hc_connection_type_t type = map_from_encap_type[cmd->encapType]; if (type == CONNECTION_TYPE_UNDEFINED) - return LIBHICNCTRL_FAILURE; + return -1; if (!IS_VALID_ADDR_TYPE(cmd->addressType)) - return LIBHICNCTRL_FAILURE; + return -1; int family = map_from_addr_type[cmd->addressType]; if (!IS_VALID_FAMILY(family)) - return LIBHICNCTRL_FAILURE; + return -1; *listener = (hc_listener_t) { .id = cmd->connid, @@ -900,7 +1124,7 @@ hc_listener_parse(void * in, hc_listener_t * listener) }; snprintf(listener->name, NAME_LEN, "%s", cmd->listenerName); snprintf(listener->interface_name, INTERFACE_LEN, "%s", cmd->interfaceName); - return LIBHICNCTRL_SUCCESS; + return 0; } GENERATE_FIND(listener) @@ -931,10 +1155,10 @@ hc_listener_snprintf(char * s, size_t size, hc_listener_t * listener) /* CONNECTION CREATE */ int -hc_connection_create(hc_sock_t * s, hc_connection_t * connection) +_hc_connection_create(hc_sock_t * s, hc_connection_t * connection, bool async) { if (hc_connection_validate(connection) < 0) - return LIBHICNCTRL_FAILURE; + return -1; struct { header_control_message hdr; @@ -944,7 +1168,7 @@ hc_connection_create(hc_sock_t * s, hc_connection_t * connection) .messageType = REQUEST_LIGHT, .commandID = ADD_CONNECTION, .length = 1, - .seqNum = s->send_seq, + .seqNum = 0, }, .payload = { /* we use IPv6 which is the longest address */ @@ -970,31 +1194,43 @@ hc_connection_create(hc_sock_t * s, hc_connection_t * connection) .parse = NULL, }; - return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), ¶ms, NULL); + return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), ¶ms, NULL, async); +} + +int +hc_connection_create(hc_sock_t * s, hc_connection_t * connection) +{ + return _hc_connection_create(s, connection, false); +} + +int +hc_connection_create_async(hc_sock_t * s, hc_connection_t * connection) +{ + return _hc_connection_create(s, connection, true); } /* CONNECTION GET */ int -hc_connection_get(hc_sock_t *s, hc_connection_t * connection, +hc_connection_get(hc_sock_t * s, hc_connection_t * connection, hc_connection_t ** connection_found) { hc_data_t * connections; hc_connection_t * found; if (hc_connection_list(s, &connections) < 0) - return LIBHICNCTRL_FAILURE; + return -1; /* Test */ if (hc_connection_find(connections, connection, &found) < 0) { hc_data_free(connections); - return LIBHICNCTRL_FAILURE; + return -1; } if (found) { *connection_found = malloc(sizeof(hc_connection_t)); if (!*connection_found) - return LIBHICNCTRL_FAILURE; + return -1; **connection_found = *found; } else { *connection_found = NULL; @@ -1002,14 +1238,14 @@ hc_connection_get(hc_sock_t *s, hc_connection_t * connection, hc_data_free(connections); - return LIBHICNCTRL_SUCCESS; + return 0; } /* CONNECTION DELETE */ int -hc_connection_delete(hc_sock_t * s, hc_connection_t * connection) +_hc_connection_delete(hc_sock_t * s, hc_connection_t * connection, bool async) { struct { header_control_message hdr; @@ -1019,24 +1255,20 @@ hc_connection_delete(hc_sock_t * s, hc_connection_t * connection) .messageType = REQUEST_LIGHT, .commandID = REMOVE_CONNECTION, .length = 1, - .seqNum = s->send_seq, + .seqNum = 0, }, }; if (connection->id) { - printf("Delete by ID\n"); snprintf(msg.payload.symbolicOrConnid, NAME_LEN, "%d", connection->id); } else if (*connection->name) { - printf("Delete by name %s\n", connection->name); snprintf(msg.payload.symbolicOrConnid, NAME_LEN, "%s", connection->name); } else { - printf("Delete after search\n"); hc_connection_t * connection_found; if (hc_connection_get(s, connection, &connection_found) < 0) - return LIBHICNCTRL_FAILURE; + return -1; if (!connection_found) - return LIBHICNCTRL_FAILURE; - printf("Delete connection ID=%d\n", connection_found->id); + return -1; snprintf(msg.payload.symbolicOrConnid, NAME_LEN, "%d", connection_found->id); free(connection_found); } @@ -1049,14 +1281,25 @@ hc_connection_delete(hc_sock_t * s, hc_connection_t * connection) .parse = NULL, }; - return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), ¶ms, NULL); + return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), ¶ms, NULL, async); +} + +int +hc_connection_delete(hc_sock_t * s, hc_connection_t * connection) +{ + return _hc_connection_delete(s, connection, false); } +int +hc_connection_delete_async(hc_sock_t * s, hc_connection_t * connection) +{ + return _hc_connection_delete(s, connection, true); +} /* CONNECTION LIST */ int -hc_connection_list(hc_sock_t * s, hc_data_t ** pdata) +_hc_connection_list(hc_sock_t * s, hc_data_t ** pdata, bool async) { struct { header_control_message hdr; @@ -1065,7 +1308,7 @@ hc_connection_list(hc_sock_t * s, hc_data_t ** pdata) .messageType = REQUEST_LIGHT, .commandID = LIST_CONNECTIONS, .length = 0, - .seqNum = s->send_seq, + .seqNum = 0, }, }; @@ -1077,7 +1320,19 @@ hc_connection_list(hc_sock_t * s, hc_data_t ** pdata) .parse = (HC_PARSE)hc_connection_parse, }; - return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), ¶ms, pdata); + return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), ¶ms, pdata, async); +} + +int +hc_connection_list(hc_sock_t * s, hc_data_t ** pdata) +{ + return _hc_connection_list(s, pdata, false); +} + +int +hc_connection_list_async(hc_sock_t * s, hc_data_t ** pdata) +{ + return _hc_connection_list(s, pdata, true); } /* CONNECTION VALIDATE */ @@ -1086,14 +1341,14 @@ int hc_connection_validate(const hc_connection_t * connection) { if (!IS_VALID_FAMILY(connection->family)) - return LIBHICNCTRL_FAILURE; + return -1; if (!IS_VALID_CONNECTION_TYPE(connection->type)) - return LIBHICNCTRL_FAILURE; + return -1; /* TODO assert both local and remote have the right family */ - return LIBHICNCTRL_SUCCESS; + return 0; } /* CONNECTION CMP */ @@ -1111,8 +1366,8 @@ int hc_connection_cmp(const hc_connection_t * c1, const hc_connection_t * c2) (c1->local_port == c2->local_port) && (ip_address_cmp(&c1->remote_addr, &c2->remote_addr, c1->family) == 0) && (c1->remote_port == c2->remote_port)) - ? LIBHICNCTRL_SUCCESS - : LIBHICNCTRL_FAILURE; + ? 0 + : -1; } /* CONNECTION PARSE */ @@ -1123,25 +1378,25 @@ hc_connection_parse(void * in, hc_connection_t * connection) list_connections_command * cmd = (list_connections_command *)in; if (!IS_VALID_LIST_CONNECTIONS_TYPE(cmd->connectionData.connectionType)) - return LIBHICNCTRL_FAILURE; + return -1; hc_connection_type_t type = map_from_list_connections_type[cmd->connectionData.connectionType]; if (type == CONNECTION_TYPE_UNDEFINED) - return LIBHICNCTRL_FAILURE; + return -1; if (!IS_VALID_LIST_CONNECTIONS_STATE(cmd->state)) - return LIBHICNCTRL_FAILURE; + return -1; hc_connection_state_t state = map_from_list_connections_state[cmd->state]; if (state == HC_CONNECTION_STATE_UNDEFINED) - return LIBHICNCTRL_FAILURE; + return -1; if (!IS_VALID_ADDR_TYPE(cmd->connectionData.ipType)) - return LIBHICNCTRL_FAILURE; + return -1; int family = map_from_addr_type[cmd->connectionData.ipType]; if (!IS_VALID_FAMILY(family)) - return LIBHICNCTRL_FAILURE; + return -1; *connection = (hc_connection_t) { .id = cmd->connid, @@ -1159,7 +1414,7 @@ hc_connection_parse(void * in, hc_connection_t * connection) }; snprintf(connection->name, NAME_LEN, "%s", cmd->connectionData.symbolic); snprintf(connection->interface_name, INTERFACE_LEN, "%s", cmd->interfaceName); - return LIBHICNCTRL_SUCCESS; + return 0; } GENERATE_FIND(connection) @@ -1196,8 +1451,8 @@ hc_connection_snprintf(char * s, size_t size, const hc_connection_t * connection /* CONNECTION SET ADMIN STATE */ int -hc_connection_set_admin_state(hc_sock_t * s, const char * conn_id_or_name, - face_state_t state) +_hc_connection_set_admin_state(hc_sock_t * s, const char * conn_id_or_name, + face_state_t state, bool async) { struct { header_control_message hdr; @@ -1207,7 +1462,7 @@ hc_connection_set_admin_state(hc_sock_t * s, const char * conn_id_or_name, .messageType = REQUEST_LIGHT, .commandID = CONNECTION_SET_ADMIN_STATE, .length = 1, - .seqNum = s->send_seq, + .seqNum = 0, }, .payload = { .admin_state = state, @@ -1223,7 +1478,21 @@ hc_connection_set_admin_state(hc_sock_t * s, const char * conn_id_or_name, .parse = NULL, }; - return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), ¶ms, NULL); + return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), ¶ms, NULL, async); +} + +int +hc_connection_set_admin_state(hc_sock_t * s, const char * conn_id_or_name, + face_state_t state) +{ + return _hc_connection_set_admin_state(s, conn_id_or_name, state, false); +} + +int +hc_connection_set_admin_state_async(hc_sock_t * s, const char * conn_id_or_name, + face_state_t state) +{ + return _hc_connection_set_admin_state(s, conn_id_or_name, state, true); } /*----------------------------------------------------------------------------* @@ -1233,10 +1502,10 @@ hc_connection_set_admin_state(hc_sock_t * s, const char * conn_id_or_name, /* ROUTE CREATE */ int -hc_route_create(hc_sock_t * s, hc_route_t * route) +_hc_route_create(hc_sock_t * s, hc_route_t * route, bool async) { if (!IS_VALID_FAMILY(route->family)) - return LIBHICNCTRL_FAILURE; + return -1; struct { header_control_message hdr; @@ -1246,7 +1515,7 @@ hc_route_create(hc_sock_t * s, hc_route_t * route) .messageType = REQUEST_LIGHT, .commandID = ADD_ROUTE, .length = 1, - .seqNum = s->send_seq, + .seqNum = 0, }, .payload = { /* we use IPv6 which is the longest address */ @@ -1271,16 +1540,28 @@ hc_route_create(hc_sock_t * s, hc_route_t * route) .parse = NULL, }; - return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), ¶ms, NULL); + return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), ¶ms, NULL, async); +} + +int +hc_route_create(hc_sock_t * s, hc_route_t * route) +{ + return _hc_route_create(s, route, false); +} + +int +hc_route_create_async(hc_sock_t * s, hc_route_t * route) +{ + return _hc_route_create(s, route, true); } /* ROUTE DELETE */ int -hc_route_delete(hc_sock_t * s, hc_route_t * route) +_hc_route_delete(hc_sock_t * s, hc_route_t * route, bool async) { if (!IS_VALID_FAMILY(route->family)) - return LIBHICNCTRL_FAILURE; + return -1; struct { header_control_message hdr; @@ -1290,7 +1571,7 @@ hc_route_delete(hc_sock_t * s, hc_route_t * route) .messageType = REQUEST_LIGHT, .commandID = REMOVE_ROUTE, .length = 1, - .seqNum = s->send_seq, + .seqNum = 0, }, .payload = { /* we use IPv6 which is the longest address */ @@ -1308,13 +1589,25 @@ hc_route_delete(hc_sock_t * s, hc_route_t * route) .parse = NULL, }; - return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), ¶ms, NULL); + return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), ¶ms, NULL, async); +} + +int +hc_route_delete(hc_sock_t * s, hc_route_t * route) +{ + return _hc_route_delete(s, route, false); +} + +int +hc_route_delete_async(hc_sock_t * s, hc_route_t * route) +{ + return _hc_route_delete(s, route, true); } /* ROUTE LIST */ int -hc_route_list(hc_sock_t * s, hc_data_t ** pdata) +_hc_route_list(hc_sock_t * s, hc_data_t ** pdata, bool async) { struct { header_control_message hdr; @@ -1323,7 +1616,7 @@ hc_route_list(hc_sock_t * s, hc_data_t ** pdata) .messageType = REQUEST_LIGHT, .commandID = LIST_ROUTES, .length = 0, - .seqNum = s->send_seq, + .seqNum = 0, }, }; @@ -1335,7 +1628,19 @@ hc_route_list(hc_sock_t * s, hc_data_t ** pdata) .parse = (HC_PARSE)hc_route_parse, }; - return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), ¶ms, pdata); + return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), ¶ms, pdata, async); +} + +int +hc_route_list(hc_sock_t * s, hc_data_t ** pdata) +{ + return _hc_route_list(s, pdata, false); +} + +int +hc_route_list_async(hc_sock_t * s, hc_data_t ** pdata) +{ + return _hc_route_list(s, pdata, true); } /* ROUTE PARSE */ @@ -1346,11 +1651,11 @@ hc_route_parse(void * in, hc_route_t * route) list_routes_command * cmd = (list_routes_command *) in; if (!IS_VALID_ADDR_TYPE(cmd->addressType)) - return LIBHICNCTRL_FAILURE; + return -1; int family = map_from_addr_type[cmd->addressType]; if (!IS_VALID_FAMILY(family)) - return LIBHICNCTRL_FAILURE; + return -1; *route = (hc_route_t) { .face_id = cmd->connid, @@ -1359,7 +1664,7 @@ hc_route_parse(void * in, hc_route_t * route) .len = cmd->len, .cost = cmd->cost, }; - return LIBHICNCTRL_SUCCESS; + return 0; } /* ROUTE SNPRINTF */ @@ -1416,9 +1721,9 @@ hc_face_to_listener(const hc_face_t * face, hc_listener_t * listener) case FACE_TYPE_UDP_LISTENER: break; default: - return LIBHICNCTRL_FAILURE; + return -1; } - return LIBHICNCTRL_FAILURE; /* XXX Not implemented */ + return -1; /* XXX Not implemented */ } /* LISTENER -> FACE */ @@ -1426,7 +1731,7 @@ hc_face_to_listener(const hc_face_t * face, hc_listener_t * listener) int hc_listener_to_face(const hc_listener_t * listener, hc_face_t * face) { - return LIBHICNCTRL_FAILURE; /* XXX Not implemented */ + return -1; /* XXX Not implemented */ } /* FACE -> CONNECTION */ @@ -1501,13 +1806,13 @@ hc_face_to_connection(const hc_face_t * face, hc_connection_t * connection, bool f->netdevice.name); break; default: - return LIBHICNCTRL_FAILURE; + return -1; } snprintf(connection->interface_name, INTERFACE_LEN, "%s", f->netdevice.name); - return LIBHICNCTRL_SUCCESS; + return 0; } /* CONNECTION -> FACE */ @@ -1570,14 +1875,14 @@ hc_connection_to_face(const hc_connection_t * connection, hc_face_t * face) }; break; default: - return LIBHICNCTRL_FAILURE; + return -1; } face->face.netdevice.name[0] = '\0'; face->face.netdevice.index = 0; snprintf(face->name, NAME_LEN, "%s", connection->name); snprintf(face->face.netdevice.name, INTERFACE_LEN, "%s", connection->interface_name); netdevice_update_index(&face->face.netdevice); - return LIBHICNCTRL_SUCCESS; + return 0; } /* CONNECTION -> LISTENER */ @@ -1594,7 +1899,7 @@ hc_connection_to_local_listener(const hc_connection_t * connection, hc_listener_ }; snprintf(listener->name, NAME_LEN, "lst%u", RANDBYTE()); // generate name snprintf(listener->interface_name, INTERFACE_LEN, "%s", connection->interface_name); - return LIBHICNCTRL_SUCCESS; + return 0; } /* FACE CREATE */ @@ -1615,18 +1920,18 @@ hc_face_create(hc_sock_t * s, hc_face_t * face) case FACE_TYPE_UDP: if (hc_face_to_connection(face, &connection, true) < 0) { ERROR("[hc_face_create] Could not convert face to connection."); - return LIBHICNCTRL_FAILURE; + return -1; } /* Ensure we have a corresponding local listener */ if (hc_connection_to_local_listener(&connection, &listener) < 0) { ERROR("[hc_face_create] Could not convert face to local listener."); - return LIBHICNCTRL_FAILURE; + return -1; } if (hc_listener_get(s, &listener, &listener_found) < 0) { ERROR("[hc_face_create] Could not retrieve listener"); - return LIBHICNCTRL_FAILURE; + return -1; } if (!listener_found) { @@ -1634,7 +1939,7 @@ hc_face_create(hc_sock_t * s, hc_face_t * face) if (hc_listener_create(s, &listener) < 0) { ERROR("[hc_face_create] Could not create listener."); free(listener_found); - return LIBHICNCTRL_FAILURE; + return -1; } } else { free(listener_found); @@ -1643,7 +1948,7 @@ hc_face_create(hc_sock_t * s, hc_face_t * face) /* Create corresponding connection */ if (hc_connection_create(s, &connection) < 0) { ERROR("[hc_face_create] Could not create connection."); - return LIBHICNCTRL_FAILURE; + return -1; } /* @@ -1652,12 +1957,12 @@ hc_face_create(hc_sock_t * s, hc_face_t * face) */ if (hc_connection_get(s, &connection, &connection_found) < 0) { ERROR("[hc_face_create] Could not retrieve connection"); - return LIBHICNCTRL_FAILURE; + return -1; } if (!connection_found) { ERROR("[hc_face_create] Could not find newly created connection."); - return LIBHICNCTRL_FAILURE; + return -1; } face->id = connection_found->id; @@ -1670,21 +1975,21 @@ hc_face_create(hc_sock_t * s, hc_face_t * face) case FACE_TYPE_UDP_LISTENER: if (hc_face_to_listener(face, &listener) < 0) { ERROR("Could not convert face to listener."); - return LIBHICNCTRL_FAILURE; + return -1; } if (hc_listener_create(s, &listener) < 0) { ERROR("[hc_face_create] Could not create listener."); - return LIBHICNCTRL_FAILURE; + return -1; } - return LIBHICNCTRL_FAILURE; + return -1; break; default: ERROR("[hc_face_create] Unknwon face type."); - return LIBHICNCTRL_FAILURE; + return -1; }; - return LIBHICNCTRL_SUCCESS; + return 0; } int @@ -1702,12 +2007,12 @@ hc_face_get(hc_sock_t * s, hc_face_t * face, hc_face_t ** face_found) case FACE_TYPE_TCP: case FACE_TYPE_UDP: if (hc_face_to_connection(face, &connection, false) < 0) - return LIBHICNCTRL_FAILURE; + return -1; if (hc_connection_get(s, &connection, &connection_found) < 0) - return LIBHICNCTRL_FAILURE; + return -1; if (!connection_found) { *face_found = NULL; - return LIBHICNCTRL_SUCCESS; + return 0; } *face_found = malloc(sizeof(face_t)); hc_connection_to_face(connection_found, *face_found); @@ -1718,12 +2023,12 @@ hc_face_get(hc_sock_t * s, hc_face_t * face, hc_face_t ** face_found) case FACE_TYPE_TCP_LISTENER: case FACE_TYPE_UDP_LISTENER: if (hc_face_to_listener(face, &listener) < 0) - return LIBHICNCTRL_FAILURE; + return -1; if (hc_listener_get(s, &listener, &listener_found) < 0) - return LIBHICNCTRL_FAILURE; + return -1; if (!listener_found) { *face_found = NULL; - return LIBHICNCTRL_SUCCESS; + return 0; } *face_found = malloc(sizeof(face_t)); hc_listener_to_face(listener_found, *face_found); @@ -1731,10 +2036,10 @@ hc_face_get(hc_sock_t * s, hc_face_t * face, hc_face_t ** face_found) break; default: - return LIBHICNCTRL_FAILURE; + return -1; } - return LIBHICNCTRL_SUCCESS; + return 0; } @@ -1747,7 +2052,7 @@ hc_face_delete(hc_sock_t * s, hc_face_t * face) hc_connection_t connection; if (hc_face_to_connection(face, &connection, false) < 0) { ERROR("[hc_face_delete] Could not convert face to connection."); - return LIBHICNCTRL_FAILURE; + return -1; } return hc_connection_delete(s, &connection); } @@ -1762,7 +2067,7 @@ hc_face_list(hc_sock_t * s, hc_data_t ** pdata) if (hc_connection_list(s, &connection_data) < 0) { ERROR("[hc_face_list] Could not list connections."); - return LIBHICNCTRL_FAILURE; + return -1; } hc_data_t * face_data = hc_data_create(sizeof(hc_connection_t), sizeof(hc_face_t)); @@ -1776,11 +2081,55 @@ hc_face_list(hc_sock_t * s, hc_data_t ** pdata) *pdata = face_data; hc_data_free(connection_data); - return LIBHICNCTRL_SUCCESS; + return 0; ERR: hc_data_free(connection_data); - return LIBHICNCTRL_FAILURE; + return -1; +} + +int +hc_connection_parse_to_face(void * in, hc_face_t * face) +{ + hc_connection_t connection; + + if (hc_connection_parse(in, &connection) < 0) { + ERROR("[hc_connection_parse_to_face] Could not parse connection"); + return -1; + } + + if (hc_connection_to_face(&connection, face) < 0) { + ERROR("[hc_connection_parse_to_face] Could not convert connection to face."); + return -1; + } + + return 0; +} + + +int +hc_face_list_async(hc_sock_t * s) //, hc_data_t ** pdata) +{ + struct { + header_control_message hdr; + } msg = { + .hdr = { + .messageType = REQUEST_LIGHT, + .commandID = LIST_CONNECTIONS, + .length = 0, + .seqNum = 0, + }, + }; + + hc_command_params_t params = { + .cmd = ACTION_LIST, + .cmd_id = LIST_CONNECTIONS, + .size_in = sizeof(list_connections_command), + .size_out = sizeof(hc_face_t), + .parse = (HC_PARSE)hc_connection_parse_to_face, + }; + + return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), ¶ms, NULL, true); } /* /!\ Please update constants in header file upon changes */ @@ -1824,7 +2173,7 @@ hc_face_snprintf(char * s, size_t size, hc_face_t * face) return rc; break; default: - return LIBHICNCTRL_FAILURE; + return -1; } // [#ID NAME] TYPE LOCAL_URL REMOTE_URL STATE/ADMIN_STATE (TAGS) @@ -1852,7 +2201,7 @@ hc_face_snprintf(char * s, size_t size, hc_face_t * face) face_state_str[face->face.state], face_state_str[face->face.admin_state]); #endif /* WITH_POLICY */ - return LIBHICNCTRL_SUCCESS; + return 0; } int @@ -1866,10 +2215,11 @@ hc_face_set_admin_state(hc_sock_t * s, const char * conn_id_or_name, // XXX wron * Punting *----------------------------------------------------------------------------*/ -int hc_punting_create(hc_sock_t * s, hc_punting_t * punting) +int +_hc_punting_create(hc_sock_t * s, hc_punting_t * punting, bool async) { if (hc_punting_validate(punting) < 0) - return LIBHICNCTRL_FAILURE; + return -1; struct { header_control_message hdr; @@ -1879,7 +2229,7 @@ int hc_punting_create(hc_sock_t * s, hc_punting_t * punting) .messageType = REQUEST_LIGHT, .commandID = ADD_PUNTING, .length = 1, - .seqNum = s->send_seq, + .seqNum = 0, }, .payload = { /* we use IPv6 which is the longest address */ @@ -1898,37 +2248,54 @@ int hc_punting_create(hc_sock_t * s, hc_punting_t * punting) .parse = NULL, }; - return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), ¶ms, NULL); + return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), ¶ms, NULL, async); +} + +int +hc_punting_create(hc_sock_t * s, hc_punting_t * punting) +{ + return _hc_punting_create(s, punting, false); +} + +int +hc_punting_create_async(hc_sock_t * s, hc_punting_t * punting) +{ + return _hc_punting_create(s, punting, true); } int hc_punting_get(hc_sock_t * s, hc_punting_t * punting, hc_punting_t ** punting_found) { - return LIBHICNCTRL_NOT_IMPLEMENTED; + ERROR("hc_punting_get not (yet) implemented."); + return -1; } int hc_punting_delete(hc_sock_t * s, hc_punting_t * punting) { - return LIBHICNCTRL_NOT_IMPLEMENTED; + ERROR("hc_punting_delete not (yet) implemented."); + return -1; } int hc_punting_list(hc_sock_t * s, hc_data_t ** pdata) { - return LIBHICNCTRL_NOT_IMPLEMENTED; + ERROR("hc_punting_list not (yet) implemented."); + return -1; } int hc_punting_validate(const hc_punting_t * punting) { if (!IS_VALID_FAMILY(punting->family)) - return LIBHICNCTRL_FAILURE; + return -1; /* * We might use the zero value to add punting on all faces but this is not * (yet) implemented */ - if (punting->face_id == 0) - return LIBHICNCTRL_NOT_IMPLEMENTED; + if (punting->face_id == 0) { + ERROR("Punting on all faces is not (yet) implemented."); + return -1; + } - return LIBHICNCTRL_SUCCESS; + return 0; } int hc_punting_cmp(const hc_punting_t * p1, const hc_punting_t * p2) @@ -1937,18 +2304,20 @@ int hc_punting_cmp(const hc_punting_t * p1, const hc_punting_t * p2) (p1->family == p2->family) && (ip_address_cmp(&p1->prefix, &p2->prefix, p1->family) == 0) && (p1->prefix_len == p2->prefix_len)) - ? LIBHICNCTRL_SUCCESS - : LIBHICNCTRL_FAILURE; + ? 0 + : -1; } int hc_punting_parse(void * in, hc_punting_t * punting) { - return LIBHICNCTRL_NOT_IMPLEMENTED; + ERROR("hc_punting_parse not (yet) implemented."); + return -1; } int hc_punting_snprintf(char * s, size_t size, hc_punting_t * punting) { - return LIBHICNCTRL_NOT_IMPLEMENTED; + ERROR("hc_punting_snprintf not (yet) implemented."); + return -1; } @@ -1957,7 +2326,7 @@ int hc_punting_snprintf(char * s, size_t size, hc_punting_t * punting) *----------------------------------------------------------------------------*/ int -hc_cache_set_store(hc_sock_t * s, int enabled) +_hc_cache_set_store(hc_sock_t * s, int enabled, bool async) { struct { header_control_message hdr; @@ -1967,7 +2336,7 @@ hc_cache_set_store(hc_sock_t * s, int enabled) .messageType = REQUEST_LIGHT, .commandID = CACHE_STORE, .length = 1, - .seqNum = s->send_seq, + .seqNum = 0, }, .payload = { .activate = enabled, @@ -1982,11 +2351,23 @@ hc_cache_set_store(hc_sock_t * s, int enabled) .parse = NULL, }; - return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), ¶ms, NULL); + return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), ¶ms, NULL, async); } int -hc_cache_set_serve(hc_sock_t * s, int enabled) +hc_cache_set_store(hc_sock_t * s, int enabled) +{ + return _hc_cache_set_store(s, enabled, false); +} + +int +hc_cache_set_store_async(hc_sock_t * s, int enabled) +{ + return _hc_cache_set_store(s, enabled, true); +} + +int +_hc_cache_set_serve(hc_sock_t * s, int enabled, bool async) { struct { header_control_message hdr; @@ -1996,7 +2377,7 @@ hc_cache_set_serve(hc_sock_t * s, int enabled) .messageType = REQUEST_LIGHT, .commandID = CACHE_SERVE, .length = 1, - .seqNum = s->send_seq, + .seqNum = 0, }, .payload = { .activate = enabled, @@ -2011,9 +2392,20 @@ hc_cache_set_serve(hc_sock_t * s, int enabled) .parse = NULL, }; - return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), ¶ms, NULL); + return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), ¶ms, NULL, async); } +int +hc_cache_set_serve(hc_sock_t * s, int enabled) +{ + return _hc_cache_set_serve(s, enabled, false); +} + +int +hc_cache_set_serve_async(hc_sock_t * s, int enabled) +{ + return _hc_cache_set_serve(s, enabled, true); +} /*----------------------------------------------------------------------------* * Strategy @@ -2023,7 +2415,7 @@ hc_cache_set_serve(hc_sock_t * s, int enabled) int hc_strategy_set(hc_sock_t * s /* XXX */) { - return LIBHICNCTRL_SUCCESS; + return 0; } /* How to retrieve that from the forwarder ? */ @@ -2042,12 +2434,12 @@ hc_strategy_list(hc_sock_t * s, hc_data_t ** data) for (unsigned i = 0; i < ARRAY_SIZE(strategies); i++) { hc_strategy_t * strategy = (hc_strategy_t*)hc_data_get_next(*data); if (!strategy) - return LIBHICNCTRL_FAILURE; + return -1; snprintf(strategy->name, MAXSZ_HC_STRATEGY, "%s", strategies[i]); (*data)->size++; } - return LIBHICNCTRL_SUCCESS; + return 0; } /* /!\ Please update constants in header file upon changes */ @@ -2065,7 +2457,7 @@ hc_strategy_snprintf(char * s, size_t size, hc_strategy_t * strategy) int hc_wldr_set(hc_sock_t * s /* XXX */) { - return LIBHICNCTRL_SUCCESS; + return 0; } /*----------------------------------------------------------------------------* @@ -2075,25 +2467,25 @@ hc_wldr_set(hc_sock_t * s /* XXX */) int hc_mapme_set(hc_sock_t * s, int enabled) { - return LIBHICNCTRL_SUCCESS; + return 0; } int hc_mapme_set_discovery(hc_sock_t * s, int enabled) { - return LIBHICNCTRL_SUCCESS; + return 0; } int hc_mapme_set_timescale(hc_sock_t * s, double timescale) { - return LIBHICNCTRL_SUCCESS; + return 0; } int hc_mapme_set_retx(hc_sock_t * s, double timescale) { - return LIBHICNCTRL_SUCCESS; + return 0; } /*----------------------------------------------------------------------------* @@ -2105,10 +2497,10 @@ hc_mapme_set_retx(hc_sock_t * s, double timescale) /* POLICY CREATE */ int -hc_policy_create(hc_sock_t * s, hc_policy_t * policy) +_hc_policy_create(hc_sock_t * s, hc_policy_t * policy, bool async) { if (!IS_VALID_FAMILY(policy->family)) - return LIBHICNCTRL_FAILURE; + return -1; struct { header_control_message hdr; @@ -2118,7 +2510,7 @@ hc_policy_create(hc_sock_t * s, hc_policy_t * policy) .messageType = REQUEST_LIGHT, .commandID = ADD_POLICY, .length = 1, - .seqNum = s->send_seq, + .seqNum = 0, }, .payload = { /* we use IPv6 which is the longest address */ @@ -2137,16 +2529,28 @@ hc_policy_create(hc_sock_t * s, hc_policy_t * policy) .parse = NULL, }; - return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), ¶ms, NULL); + return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), ¶ms, NULL, async); +} + +int +hc_policy_create(hc_sock_t * s, hc_policy_t * policy) +{ + return _hc_policy_create(s, policy, false); +} + +int +hc_policy_create_async(hc_sock_t * s, hc_policy_t * policy) +{ + return _hc_policy_create(s, policy, true); } /* POLICY DELETE */ int -hc_policy_delete(hc_sock_t * s, hc_policy_t * policy) +_hc_policy_delete(hc_sock_t * s, hc_policy_t * policy, bool async) { if (!IS_VALID_FAMILY(policy->family)) - return LIBHICNCTRL_FAILURE; + return -1; struct { header_control_message hdr; @@ -2156,7 +2560,7 @@ hc_policy_delete(hc_sock_t * s, hc_policy_t * policy) .messageType = REQUEST_LIGHT, .commandID = REMOVE_POLICY, .length = 1, - .seqNum = s->send_seq, + .seqNum = 0, }, .payload = { /* we use IPv6 which is the longest address */ @@ -2174,13 +2578,25 @@ hc_policy_delete(hc_sock_t * s, hc_policy_t * policy) .parse = NULL, }; - return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), ¶ms, NULL); + return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), ¶ms, NULL, async); +} + +int +hc_policy_delete(hc_sock_t * s, hc_policy_t * policy) +{ + return _hc_policy_delete(s, policy, false); +} + +int +hc_policy_delete_async(hc_sock_t * s, hc_policy_t * policy) +{ + return _hc_policy_delete(s, policy, true); } /* POLICY LIST */ int -hc_policy_list(hc_sock_t * s, hc_data_t ** pdata) +_hc_policy_list(hc_sock_t * s, hc_data_t ** pdata, bool async) { struct { header_control_message hdr; @@ -2189,7 +2605,7 @@ hc_policy_list(hc_sock_t * s, hc_data_t ** pdata) .messageType = REQUEST_LIGHT, .commandID = LIST_POLICIES, .length = 0, - .seqNum = s->send_seq, + .seqNum = 0, }, }; @@ -2201,7 +2617,19 @@ hc_policy_list(hc_sock_t * s, hc_data_t ** pdata) .parse = (HC_PARSE)hc_policy_parse, }; - return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), ¶ms, pdata); + return hc_execute_command(s, (hc_msg_t*)&msg, sizeof(msg), ¶ms, pdata, async); +} + +int +hc_policy_list(hc_sock_t * s, hc_data_t ** pdata) +{ + return _hc_policy_list(s, pdata, false); +} + +int +hc_policy_list_async(hc_sock_t * s, hc_data_t ** pdata) +{ + return _hc_policy_list(s, pdata, true); } /* POLICY PARSE */ @@ -2212,11 +2640,11 @@ hc_policy_parse(void * in, hc_policy_t * policy) list_policies_command * cmd = (list_policies_command *) in; if (!IS_VALID_ADDR_TYPE(cmd->addressType)) - return LIBHICNCTRL_FAILURE; + return -1; int family = map_from_addr_type[cmd->addressType]; if (!IS_VALID_FAMILY(family)) - return LIBHICNCTRL_FAILURE; + return -1; *policy = (hc_policy_t) { .family = family, @@ -2224,7 +2652,7 @@ hc_policy_parse(void * in, hc_policy_t * policy) .len = cmd->len, .policy = cmd->policy, }; - return LIBHICNCTRL_SUCCESS; + return 0; } /* POLICY SNPRINTF */ @@ -2233,7 +2661,7 @@ hc_policy_parse(void * in, hc_policy_t * policy) int hc_policy_snprintf(char * s, size_t size, hc_policy_t * policy) { - return LIBHICNCTRL_SUCCESS; + return 0; } #endif /* WITH_POLICY */ diff --git a/hicn-light/CMakeLists.txt b/hicn-light/CMakeLists.txt index 1e5805d68..a632380fc 100644 --- a/hicn-light/CMakeLists.txt +++ b/hicn-light/CMakeLists.txt @@ -39,7 +39,6 @@ if(NOT WIN32) else () set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} /wd4996") endif () - set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -g") if(${CMAKE_SYSTEM_NAME} STREQUAL "Android") message("############ Detected cross compile for $ENV{CMAKE_SYSTEM_NAME}") @@ -72,7 +71,7 @@ else() endif () endif() -include( Packaging ) +include(Packaging) find_package(Threads REQUIRED) @@ -108,3 +107,10 @@ if("${CMAKE_CXX_COMPILER_ID}" STREQUAL "AppleClang") endif() add_subdirectory(src/hicn) + +# Install service file in linux systems +include(ServiceScript) +install_service_script( + ${CMAKE_CURRENT_SOURCE_DIR}/config/hicn-light.service + COMPONENT ${HICN_LIGHT} +)
\ No newline at end of file diff --git a/hicn-light/src/hicn/command_line/controller/hicnLightControl_main.c b/hicn-light/src/hicn/command_line/controller/hicnLightControl_main.c index f704d237e..ab0e0e6d8 100644 --- a/hicn-light/src/hicn/command_line/controller/hicnLightControl_main.c +++ b/hicn-light/src/hicn/command_line/controller/hicnLightControl_main.c @@ -63,6 +63,7 @@ static int payloadLengthController[LAST_COMMAND_VALUE] = { sizeof(add_route_command), sizeof(list_routes_command), // needed when get response from FWD sizeof(remove_connection_command), + sizeof(remove_listener_command), sizeof(remove_route_command), sizeof(cache_store_command), sizeof(cache_serve_command), diff --git a/hicn-light/src/hicn/config/CMakeLists.txt b/hicn-light/src/hicn/config/CMakeLists.txt index b1e475aee..45f36e8ff 100644 --- a/hicn-light/src/hicn/config/CMakeLists.txt +++ b/hicn-light/src/hicn/config/CMakeLists.txt @@ -35,6 +35,7 @@ list(APPEND HEADER_FILES ${CMAKE_CURRENT_SOURCE_DIR}/controlListPolicies.h ${CMAKE_CURRENT_SOURCE_DIR}/controlQuit.h ${CMAKE_CURRENT_SOURCE_DIR}/controlRemove.h + ${CMAKE_CURRENT_SOURCE_DIR}/controlRemoveListener.h ${CMAKE_CURRENT_SOURCE_DIR}/controlRemoveConnection.h ${CMAKE_CURRENT_SOURCE_DIR}/controlRemoveRoute.h ${CMAKE_CURRENT_SOURCE_DIR}/controlRemovePolicy.h @@ -78,6 +79,7 @@ list(APPEND SOURCE_FILES ${CMAKE_CURRENT_SOURCE_DIR}/controlListPolicies.c ${CMAKE_CURRENT_SOURCE_DIR}/controlQuit.c ${CMAKE_CURRENT_SOURCE_DIR}/controlRemove.c + ${CMAKE_CURRENT_SOURCE_DIR}/controlRemoveListener.c ${CMAKE_CURRENT_SOURCE_DIR}/controlRemoveConnection.c ${CMAKE_CURRENT_SOURCE_DIR}/controlRemoveRoute.c ${CMAKE_CURRENT_SOURCE_DIR}/controlRemovePolicy.c diff --git a/hicn-light/src/hicn/config/configuration.c b/hicn-light/src/hicn/config/configuration.c index b14ea551a..2b63a32a0 100644 --- a/hicn-light/src/hicn/config/configuration.c +++ b/hicn-light/src/hicn/config/configuration.c @@ -346,7 +346,6 @@ static void configuration_SendResponse(Configuration *config, struct iovec *msg, if (conn == NULL) { return; } - connection_SendIOVBuffer(conn, msg, 2); } @@ -450,6 +449,10 @@ struct iovec *configuration_ProcessCreateTunnel(Configuration *config, /* Hook: new connection created through the control protocol */ forwarder_onConnectionEvent(config->forwarder, conn, CONNECTION_EVENT_UPDATE); #endif /* WITH_MAPME */ + if (source) + addressDestroy(&source); + if (destination) + addressDestroy(&destination); success = true; #else @@ -472,6 +475,70 @@ ERR: return utils_CreateNack(header, control, sizeof(add_connection_command)); } +struct iovec *configuration_ProcessRemoveListener(Configuration *config, + struct iovec *request, + unsigned ingressId) { + header_control_message *header = request[0].iov_base; + remove_listener_command *control = request[1].iov_base; + + bool success = false; + + const char *symbolicOrListenerid = control->symbolicOrListenerid; + int listenerId = -1; + ListenerSet *listenerSet = forwarder_GetListenerSet(config->forwarder); + if (utils_IsNumber(symbolicOrListenerid)) { + // case for connid as input + listenerId = (unsigned)strtold(symbolicOrListenerid, NULL); + } else { + listenerId = listenerSet_FindIdByListenerName(listenerSet, symbolicOrListenerid); + } + + if (listenerId >= 0) { + + ConnectionTable *connTable = forwarder_GetConnectionTable(config->forwarder); + ListenerOps *listenerOps = listenerSet_FindById(listenerSet, listenerId); + if (listenerOps) { + ConnectionList *connectionList =connectionTable_GetEntries(connTable); + for (size_t i =0; i < connectionList_Length(connectionList); i++) { + Connection *connection = connectionList_Get(connectionList, i); + const AddressPair *addressPair = connection_GetAddressPair(connection); + const Address *address = addressPair_GetLocal(addressPair); + if (addressEquals(listenerOps->getListenAddress(listenerOps),address)) { + // case for connid as input + unsigned connid = connection_GetConnectionId(connection); + // remove connection from the FIB + forwarder_RemoveConnectionIdFromRoutes(config->forwarder, connid); + // remove connection + connectionTable_RemoveById(connTable, connid); + const char *symbolicConnection = symbolicNameTable_GetNameByIndex(config->symbolicNameTable,connid); + symbolicNameTable_Remove(config->symbolicNameTable, symbolicConnection); + } + } + // remove listener + listenerSet_RemoveById(listenerSet, listenerId); + success = true; + } else { + logger_Log(forwarder_GetLogger(config->forwarder), LoggerFacility_IO, + PARCLogLevel_Error, __func__, + "Listener Id not found, check list listeners"); + } + } + + // generate ACK/NACK + struct iovec *response; + + if (success) { // ACK + response = + utils_CreateAck(header, control, sizeof(remove_listener_command)); + } else { // NACK + response = + utils_CreateNack(header, control, sizeof(remove_connection_command)); + } + + return response; +} + + /** * Add an IP-based tunnel. * @@ -493,7 +560,6 @@ struct iovec *configuration_ProcessRemoveTunnel(Configuration *config, const char *symbolicOrConnid = control->symbolicOrConnid; ConnectionTable *table = forwarder_GetConnectionTable(config->forwarder); - if (strcmp(symbolicOrConnid, "SELF") == 0) { forwarder_RemoveConnectionIdFromRoutes(config->forwarder, ingressId); connectionTable_RemoveById(table, ingressId); @@ -515,6 +581,9 @@ struct iovec *configuration_ProcessRemoveTunnel(Configuration *config, forwarder_RemoveConnectionIdFromRoutes(config->forwarder, connid); // remove connection connectionTable_RemoveById(table, connid); + // remove connection from symbolicNameTable + const char *symbolicConnection = symbolicNameTable_GetNameByIndex(config->symbolicNameTable,connid); + symbolicNameTable_Remove(config->symbolicNameTable, symbolicConnection); #ifdef WITH_MAPME /* Hook: new connection created through the control protocol */ @@ -568,6 +637,8 @@ struct iovec *configuration_ProcessRemoveTunnel(Configuration *config, } } + + // generate ACK/NACK struct iovec *response; @@ -612,8 +683,8 @@ struct iovec *configuration_ProcessConnectionList(Configuration *config, list_connections_command *listConnectionsCommand = (list_connections_command *)(payloadResponse + (i * sizeof(list_connections_command))); - // set structure fields + listConnectionsCommand->connid = connection_GetConnectionId(original); const char *connectionName = symbolicNameTable_GetNameByIndex(config->symbolicNameTable, connection_GetConnectionId(original)); @@ -1202,7 +1273,6 @@ struct iovec *configuration_DispatchCommand(Configuration *config, struct iovec *control, unsigned ingressId) { struct iovec *response = NULL; - switch (command) { case ADD_LISTENER: response = configurationListeners_Add(config, control, ingressId); @@ -1229,6 +1299,10 @@ struct iovec *configuration_DispatchCommand(Configuration *config, response = configuration_ProcessRemoveTunnel(config, control, ingressId); break; + case REMOVE_LISTENER: + response = configuration_ProcessRemoveListener(config, control, ingressId); + break; + case REMOVE_ROUTE: response = configuration_ProcessUnregisterHicnPrefix(config, control); break; diff --git a/hicn-light/src/hicn/config/configurationListeners.c b/hicn-light/src/hicn/config/configurationListeners.c index 86d8a215a..c321007e2 100644 --- a/hicn-light/src/hicn/config/configurationListeners.c +++ b/hicn-light/src/hicn/config/configurationListeners.c @@ -317,6 +317,7 @@ static bool _setupTcpListenerOnInet6Light(Forwarder *forwarder, char *listenerNa * Create a new IPV6/UDP listener. * * @param [in,out] forwarder The hicn-light forwarder instance + * @param [in] listenerName The name of the listener * @param [in] addr6 The ipv6 address in network byte order * @param [in] port The port number in network byte order * @param [in] interfaceName The name of the interface to bind the socket @@ -615,7 +616,8 @@ void configurationListeners_SetutpLocalIPv4(const Configuration *config, Forwarder *forwarder = configuration_GetForwarder(config); in_addr_t addr = inet_addr("127.0.0.1"); uint16_t network_byte_order_port = htons(port); - char listenerNameUdp[16] = "lo_udp"; + + char listenerNameUdp[16] = "lo_udp"; char listenerNameTcp[16] = "lo_tcp"; char *loopback_interface = "lo"; _setupUdpListenerOnInet(forwarder, listenerNameUdp,(ipv4_addr_t *)&(addr), diff --git a/hicn-light/src/hicn/config/controlAddConnection.c b/hicn-light/src/hicn/config/controlAddConnection.c index e09b61b37..eaa680bde 100644 --- a/hicn-light/src/hicn/config/controlAddConnection.c +++ b/hicn-light/src/hicn/config/controlAddConnection.c @@ -42,11 +42,13 @@ static CommandReturn _controlAddConnection_Execute(CommandParser *parser, // =================================================== +#ifdef __linux__ static CommandReturn _controlAddConnection_HicnHelpExecute( CommandParser *parser, CommandOps *ops, PARCList *args); static CommandReturn _controlAddConnection_HicnExecute(CommandParser *parser, CommandOps *ops, PARCList *args); +#endif static CommandReturn _controlAddConnection_UdpHelpExecute(CommandParser *parser, CommandOps *ops, @@ -65,11 +67,15 @@ static CommandReturn _controlAddConnection_TcpExecute(CommandParser *parser, // =================================================== static const char *_commandAddConnection = "add connection"; +#ifdef __linux__ static const char *_commandAddConnectionHicn = "add connection hicn"; +#endif static const char *_commandAddConnectionUdp = "add connection udp"; static const char *_commandAddConnectionTcp = "add connection tcp"; static const char *_commandAddConnectionHelp = "help add connection"; +#ifdef __linux__ static const char *_commandAddConnectionHicnHelp = "help add connection hicn"; +#endif static const char *_commandAddConnectionUdpHelp = "help add connection udp"; static const char *_commandAddConnectionTcpHelp = "help add connection tcp"; @@ -89,11 +95,13 @@ CommandOps *controlAddConnection_HelpCreate(ControlState *state) { // =================================================== +#ifdef __linux__ static CommandOps *_controlAddConnection_HicnCreate(ControlState *state) { return commandOps_Create(state, _commandAddConnectionHicn, NULL, _controlAddConnection_HicnExecute, commandOps_Destroy); } +#endif static CommandOps *_controlAddConnection_UdpCreate(ControlState *state) { return commandOps_Create(state, _commandAddConnectionUdp, NULL, @@ -108,12 +116,13 @@ static CommandOps *_controlAddConnection_TcpCreate(ControlState *state) { } // =================================================== - +#ifdef __linux__ static CommandOps *_controlAddConnection_HicnHelpCreate(ControlState *state) { return commandOps_Create(state, _commandAddConnectionHicnHelp, NULL, _controlAddConnection_HicnHelpExecute, commandOps_Destroy); } +#endif static CommandOps *_controlAddConnection_UdpHelpCreate(ControlState *state) { return commandOps_Create(state, _commandAddConnectionUdpHelp, NULL, @@ -133,7 +142,9 @@ static CommandReturn _controlAddConnection_HelpExecute(CommandParser *parser, CommandOps *ops, PARCList *args) { printf("Available commands:\n"); +#ifdef __linux__ printf(" %s\n", _commandAddConnectionHicn); +#endif printf(" %s\n", _commandAddConnectionUdp); printf(" %s\n", _commandAddConnectionTcp); printf("\n"); @@ -142,14 +153,17 @@ static CommandReturn _controlAddConnection_HelpExecute(CommandParser *parser, static void _controlAddConnection_Init(CommandParser *parser, CommandOps *ops) { ControlState *state = ops->closure; +#ifdef __linux__ controlState_RegisterCommand(state, _controlAddConnection_HicnHelpCreate(state)); +#endif controlState_RegisterCommand(state, _controlAddConnection_UdpHelpCreate(state)); controlState_RegisterCommand(state, _controlAddConnection_TcpHelpCreate(state)); - +#ifdef __linux__ controlState_RegisterCommand(state, _controlAddConnection_HicnCreate(state)); +#endif controlState_RegisterCommand(state, _controlAddConnection_UdpCreate(state)); controlState_RegisterCommand(state, _controlAddConnection_TcpCreate(state)); } @@ -255,7 +269,9 @@ static CommandReturn _controlAddConnection_IpHelp(CommandParser *parser, CommandOps *ops, PARCList *args, const char *protocol) { +#ifdef __linux__ printf("add connection hicn <symbolic> <remote_ip> <local_ip>\n"); +#endif printf( "add connection udp <symbolic> <remote_ip> <port> <local_ip> <port>\n"); printf( @@ -268,6 +284,7 @@ static CommandReturn _controlAddConnection_IpHelp(CommandParser *parser, return CommandReturn_Success; } +#ifdef __linux__ static CommandReturn _controlAddConnection_HicnHelpExecute( CommandParser *parser, CommandOps *ops, PARCList *args) { _controlAddConnection_IpHelp(parser, ops, args, "hicn"); @@ -303,6 +320,7 @@ static CommandReturn _controlAddConnection_HicnExecute(CommandParser *parser, return _controlAddConnection_CreateTunnel( parser, ops, local_ip, port, remote_ip, port, HICN_CONN, symbolic); } +#endif static CommandReturn _controlAddConnection_UdpHelpExecute(CommandParser *parser, CommandOps *ops, diff --git a/hicn-light/src/hicn/config/controlAddListener.c b/hicn-light/src/hicn/config/controlAddListener.c index c9253425a..cfd061131 100644 --- a/hicn-light/src/hicn/config/controlAddListener.c +++ b/hicn-light/src/hicn/config/controlAddListener.c @@ -58,27 +58,26 @@ static const int _indexProtocol = 2; static const int _indexSymbolic = 3; static const int _indexAddress = 4; static const int _indexPort = 5; -#ifdef __linux__ static const int _indexInterfaceName = 6; -#endif static CommandReturn _controlAddListener_HelpExecute(CommandParser *parser, CommandOps *ops, PARCList *args) { printf("commands:\n"); - printf(" add listener hicn <symbolic> <localAddress> \n"); #ifdef __linux__ + printf(" add listener hicn <symbolic> <localAddress> \n"); +#endif printf(" add listener udp <symbolic> <localAddress> <port> <interface>\n"); printf(" add listener tcp <symbolic> <localAddress> <port> <interface>\n"); -#else - printf(" add listener udp <symbolic> <localAddress> <port>\n"); - printf(" add listener tcp <symbolic> <localAddress> <port>\n"); -#endif printf("\n"); printf( " symbolic: User defined name for listener, must start with " "alpha and be alphanum\n"); +#ifdef __linux__ printf(" protocol: hicn | udp\n"); +#else + printf(" protocol: udp\n"); +#endif printf( " localAddress: IPv4 or IPv6 address (or prefix protocol = hicn) " "assigend to the local interface\n"); @@ -88,23 +87,18 @@ static CommandReturn _controlAddListener_HelpExecute(CommandParser *parser, printf("\n"); printf("Notes:\n"); printf(" The symblic name must be unique or the source will reject it.\n"); +#ifdef __linux__ printf( - " If protocol = hinc: the address 0::0 indicates the main listern, " + " If protocol = hicn: the address 0::0 indicates the main listern, " "for which we can set punting rules.\n"); +#endif return CommandReturn_Success; } -#ifdef __linux__ -static CommandReturn _CreateListener(CommandParser *parser, CommandOps *ops, - const char *symbolic, const char *addr, - const char *port, const char *interfaceName, listener_mode mode, - connection_type type) { -#else static CommandReturn _CreateListener(CommandParser *parser, CommandOps *ops, const char *symbolic, const char *addr, - const char *port, listener_mode mode, + const char *port, char *interfaceName, listener_mode mode, connection_type type) { -#endif ControlState *state = ops->closure; // allocate command payload @@ -126,9 +120,7 @@ static CommandReturn _CreateListener(CommandParser *parser, CommandOps *ops, } // Fill remaining payload fields -#ifdef __linux__ memcpy(addListenerCommand->interfaceName, interfaceName, 16); -#endif addListenerCommand->listenerMode = mode; addListenerCommand->connectionType = type; addListenerCommand->port = htons((uint16_t)atoi(port)); @@ -149,11 +141,7 @@ static CommandReturn _CreateListener(CommandParser *parser, CommandOps *ops, static CommandReturn _controlAddListener_Execute(CommandParser *parser, CommandOps *ops, PARCList *args) { -#ifdef __linux__ if (parcList_Size(args) != 5 && parcList_Size(args) != 7) { -#else - if (parcList_Size(args) != 5 && parcList_Size(args) != 6) { -#endif _controlAddListener_HelpExecute(parser, ops, args); return CommandReturn_Failure; } @@ -169,45 +157,26 @@ static CommandReturn _controlAddListener_Execute(CommandParser *parser, return result; } - const char *host = parcList_GetAtIndex(args, _indexAddress); -#ifdef __linux__ - const char *interfaceName = parcList_GetAtIndex(args, _indexInterfaceName); -#endif const char *protocol = parcList_GetAtIndex(args, _indexProtocol); - + const char *host = parcList_GetAtIndex(args, _indexAddress); + char *interfaceName = parcList_GetAtIndex(args, _indexInterfaceName); if ((strcasecmp("hicn", protocol) == 0)) { const char *port = "1234"; // this is a random port number that will be ignored // here we discard the prefix len if it exists, since we don't use it in // code but we let libhicn to find the right ip address. -#ifdef __linux__ return _CreateListener(parser, ops, symbolic, host, port, "hicn", HICN_MODE, HICN_CONN); -#else - return _CreateListener(parser, ops, symbolic, host, port, HICN_MODE, - HICN_CONN); -#endif } - const char *port = parcList_GetAtIndex(args, _indexPort); if ((strcasecmp("udp", protocol) == 0)) { -#ifdef __linux__ return _CreateListener(parser, ops, symbolic, host, port, interfaceName, IP_MODE, UDP_CONN); -#else - return _CreateListener(parser, ops, symbolic, host, port, IP_MODE, - UDP_CONN); -#endif } else if ((strcasecmp("tcp", protocol) == 0)) { -#ifdef __linux__ return _CreateListener(parser, ops, symbolic, host, port, interfaceName, IP_MODE, TCP_CONN); -#else - return _CreateListener(parser, ops, symbolic, host, port, IP_MODE, - TCP_CONN); -#endif } else { _controlAddListener_HelpExecute(parser, ops, args); return CommandReturn_Failure; diff --git a/hicn-light/src/hicn/config/controlListConnections.c b/hicn-light/src/hicn/config/controlListConnections.c index dbd9707ca..0e8e2c30b 100644 --- a/hicn-light/src/hicn/config/controlListConnections.c +++ b/hicn-light/src/hicn/config/controlListConnections.c @@ -120,6 +120,7 @@ static CommandReturn _controlListConnections_Execute(CommandParser *parser, #endif /* WITH_POLICY */ // Process/Print payload + for (int i = 0; i < receivedHeader->length; i++) { list_connections_command *listConnectionsCommand = (list_connections_command *)(receivedPayload + diff --git a/hicn-light/src/hicn/config/controlRemove.c b/hicn-light/src/hicn/config/controlRemove.c index af833dc8b..ef0c15934 100644 --- a/hicn-light/src/hicn/config/controlRemove.c +++ b/hicn-light/src/hicn/config/controlRemove.c @@ -27,6 +27,7 @@ #include <parc/algol/parc_Memory.h> #include <hicn/config/controlRemove.h> +#include <hicn/config/controlRemoveListener.h> #include <hicn/config/controlRemoveConnection.h> #include <hicn/config/controlRemovePunting.h> #include <hicn/config/controlRemoveRoute.h> @@ -62,6 +63,7 @@ static CommandReturn _controlRemove_HelpExecute(CommandParser *parser, CommandOps *ops, PARCList *args) { CommandOps *ops_remove_connection = controlRemoveConnection_Create(NULL); + CommandOps *ops_remove_listener = controlRemoveListener_Create(NULL); CommandOps *ops_remove_route = controlRemoveRoute_Create(NULL); CommandOps *ops_remove_punting = controlRemovePunting_Create(NULL); #ifdef WITH_POLICY @@ -70,6 +72,7 @@ static CommandReturn _controlRemove_HelpExecute(CommandParser *parser, printf("Available commands:\n"); printf(" %s\n", ops_remove_connection->command); + printf(" %s\n", ops_remove_listener->command); printf(" %s\n", ops_remove_route->command); printf(" %s\n", ops_remove_punting->command); #ifdef WITH_POLICY @@ -78,6 +81,7 @@ static CommandReturn _controlRemove_HelpExecute(CommandParser *parser, printf("\n"); commandOps_Destroy(&ops_remove_connection); + commandOps_Destroy(&ops_remove_listener); commandOps_Destroy(&ops_remove_route); commandOps_Destroy(&ops_remove_punting); #ifdef WITH_POLICY @@ -90,8 +94,11 @@ static void _controlRemove_Init(CommandParser *parser, CommandOps *ops) { ControlState *state = ops->closure; controlState_RegisterCommand(state, controlRemoveConnection_HelpCreate(state)); + controlState_RegisterCommand(state, + controlRemoveListener_HelpCreate(state)); controlState_RegisterCommand(state, controlRemoveRoute_HelpCreate(state)); controlState_RegisterCommand(state, controlRemoveConnection_Create(state)); + controlState_RegisterCommand(state, controlRemoveListener_Create(state)); controlState_RegisterCommand(state, controlRemoveRoute_Create(state)); controlState_RegisterCommand(state, controlRemovePunting_Create(state)); controlState_RegisterCommand(state, controlRemovePunting_HelpCreate(state)); diff --git a/hicn-light/src/hicn/config/controlRemoveListener.c b/hicn-light/src/hicn/config/controlRemoveListener.c new file mode 100644 index 000000000..50581a8d9 --- /dev/null +++ b/hicn-light/src/hicn/config/controlRemoveListener.c @@ -0,0 +1,115 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/hicn-light/config.h> + +#include <ctype.h> +#include <stdbool.h> +#include <stdint.h> +#include <stdio.h> +#include <stdlib.h> + +#include <parc/assert/parc_Assert.h> + +#include <parc/algol/parc_Memory.h> +#include <parc/algol/parc_Network.h> +#include <hicn/utils/address.h> + +#include <hicn/config/controlRemoveListener.h> + +#include <hicn/utils/commands.h> +#include <hicn/utils/utils.h> + +static CommandReturn _controlRemoveListener_Execute(CommandParser *parser, + CommandOps *ops, + PARCList *args); +static CommandReturn _controlRemoveListener_HelpExecute(CommandParser *parser, + CommandOps *ops, + PARCList *args); + +// =================================================== + +static const char *_commandRemoveListener = "remove listener"; +static const char *_commandRemoveListenerHelp = "help remove listener"; + +// ==================================================== + +CommandOps *controlRemoveListener_Create(ControlState *state) { + return commandOps_Create(state, _commandRemoveListener, NULL, + _controlRemoveListener_Execute, + commandOps_Destroy); +} + +CommandOps *controlRemoveListener_HelpCreate(ControlState *state) { + return commandOps_Create(state, _commandRemoveListenerHelp, NULL, + _controlRemoveListener_HelpExecute, + commandOps_Destroy); +} + +// ==================================================== + +static CommandReturn _controlRemoveListener_HelpExecute(CommandParser *parser, + CommandOps *ops, + PARCList *args) { + printf("command:\n"); + printf(" remove listener <symbolic|id>\n"); + return CommandReturn_Success; +} + +static CommandReturn _controlRemoveListener_Execute(CommandParser *parser, + CommandOps *ops, + PARCList *args) { + ControlState *state = ops->closure; + + if (parcList_Size(args) != 3) { + _controlRemoveListener_HelpExecute(parser, ops, args); + return false; + } + + if ((strcmp(parcList_GetAtIndex(args, 0), "remove") != 0) || + (strcmp(parcList_GetAtIndex(args, 1), "listener") != 0)) { + _controlRemoveListener_HelpExecute(parser, ops, args); + return false; + } + + const char *listenerId = parcList_GetAtIndex(args, 2); + +if (!utils_ValidateSymbolicName(listenerId) && + !utils_IsNumber(listenerId)) { + printf( + "ERROR: Invalid symbolic or listenerId:\nsymbolic name must begin with an " + "alpha followed by alphanum;\nlistenerId must be an integer\n"); + return CommandReturn_Failure; + } + + // allocate command payload + remove_listener_command *removeListenerCommand = + parcMemory_AllocateAndClear(sizeof(remove_listener_command)); + // fill payload + //removeListenerCommand->listenerId = atoi(listenerId); + strncpy(removeListenerCommand->symbolicOrListenerid, listenerId, strlen(listenerId)); + + // send message and receive response + struct iovec *response = + utils_SendRequest(state, REMOVE_LISTENER, removeListenerCommand, + sizeof(remove_listener_command)); + + if (!response) { // get NULL pointer + return CommandReturn_Failure; + } + + parcMemory_Deallocate(&response); // free iovec pointer + return CommandReturn_Success; +} diff --git a/hicn-light/src/hicn/config/controlRemoveListener.h b/hicn-light/src/hicn/config/controlRemoveListener.h new file mode 100644 index 000000000..794d1e1a9 --- /dev/null +++ b/hicn-light/src/hicn/config/controlRemoveListener.h @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file control_RemoveConnection.h + * @brief Remove a connection from the connection table + * + * Implements the "remove connection" and "help remove connection" nodes of the + * CLI tree + * + */ + +#ifndef Control_RemoveListener_h +#define Control_RemoveListener_h + +#include <hicn/config/controlState.h> +CommandOps *controlRemoveListener_Create(ControlState *state); +CommandOps *controlRemoveListener_HelpCreate(ControlState *state); +#endif // Control_RemoveListener_h diff --git a/hicn-light/src/hicn/config/symbolicNameTable.c b/hicn-light/src/hicn/config/symbolicNameTable.c index 746c4e647..723039fae 100644 --- a/hicn-light/src/hicn/config/symbolicNameTable.c +++ b/hicn-light/src/hicn/config/symbolicNameTable.c @@ -127,19 +127,36 @@ bool symbolicNameTable_Add(SymbolicNameTable *table, const char *symbolicName, parcAssertTrue(connid < UINT32_MAX, "Parameter connid must be less than %u", UINT32_MAX); - char *key = _createKey(symbolicName); + char *key1 = _createKey(symbolicName); - uint32_t *value = parcMemory_Allocate(sizeof(uint32_t)); - *value = connid; + uint32_t *value1 = parcMemory_Allocate(sizeof(uint32_t)); + *value1 = connid; - bool success = parcHashCodeTable_Add(table->symbolicNameTable, key, value); - success = parcHashCodeTable_Add(table->indexToNameTable, value, key); - if (!success) { - parcMemory_Deallocate((void **)&key); - parcMemory_Deallocate((void **)&value); - } + bool success = parcHashCodeTable_Add(table->symbolicNameTable, key1, value1); + if (!success) + goto ERR_NAME; + + char *key2 = _createKey(symbolicName); + + uint32_t *value2 = parcMemory_Allocate(sizeof(uint32_t)); + *value2 = connid; + success = parcHashCodeTable_Add(table->indexToNameTable, value2, key2); + if (!success) + goto ERR_INDEX; + + goto END; + +ERR_INDEX: + parcMemory_Deallocate((void **)&key2); + parcMemory_Deallocate((void **)&value2); + parcHashCodeTable_Del(table->symbolicNameTable, key1); +ERR_NAME: + parcMemory_Deallocate((void **)&key1); + parcMemory_Deallocate((void **)&value1); +END: return success; + } unsigned symbolicNameTable_Get(SymbolicNameTable *table, @@ -152,9 +169,8 @@ unsigned symbolicNameTable_Get(SymbolicNameTable *table, char *key = _createKey(symbolicName); uint32_t *value = parcHashCodeTable_Get(table->symbolicNameTable, key); - if (value) { + if (value) connid = *value; - } parcMemory_Deallocate((void **)&key); return connid; diff --git a/hicn-light/src/hicn/core/connection.c b/hicn-light/src/hicn/core/connection.c index 821da884d..fb43d4a1e 100644 --- a/hicn-light/src/hicn/core/connection.c +++ b/hicn-light/src/hicn/core/connection.c @@ -32,6 +32,7 @@ #endif /* WITH_POLICY */ struct connection { + const AddressPair *addressPair; IoOperations *ops; diff --git a/hicn-light/src/hicn/io/hicnListener.c b/hicn-light/src/hicn/io/hicnListener.c index 995347d6a..a60c4dd12 100644 --- a/hicn-light/src/hicn/io/hicnListener.c +++ b/hicn-light/src/hicn/io/hicnListener.c @@ -82,6 +82,12 @@ static EncapType _getEncapType(const ListenerOps *ops); static int _getSocket(const ListenerOps *ops); static unsigned _createNewConnection(ListenerOps *listener, int fd, const AddressPair *pair); static const Connection * _lookupConnection(ListenerOps * listener, const AddressPair *pair); +static Message *_readMessage(ListenerOps * listener, int fd, uint8_t *msgBuffer); +static void _hicnListener_readcb(int fd, PARCEventType what, void *listener_void); +static Address *_createAddressFromPacket(uint8_t *msgBuffer); +static void _handleProbeMessage(ListenerOps * listener, uint8_t *msgBuffer); +static void _handleWldrNotification(ListenerOps *listener, uint8_t *msgBuffer); +static void _readFrameToDiscard(HicnListener *hicn, int fd); static ListenerOps _hicnTemplate = { .context = NULL, @@ -96,8 +102,6 @@ static ListenerOps _hicnTemplate = { .lookupConnection = &_lookupConnection, }; -static void _hicnListener_readcb(int fd, PARCEventType what, void *hicnVoid); - static bool _isEmptyAddressIPv6(Address *address) { struct sockaddr_in6 *addr6 = parcMemory_AllocateAndClear(sizeof(struct sockaddr_in6)); @@ -118,6 +122,100 @@ static bool _isEmptyAddressIPv6(Address *address) { return res; } +static Message *_readMessage(ListenerOps * listener, int fd, uint8_t *msgBuffer) { + HicnListener * hicn = (HicnListener*)listener->context; + Message *message = NULL; + + ssize_t readLength = read(fd, msgBuffer, MTU_SIZE); + + if (readLength < 0) { + printf("read failed %d: (%d) %s\n", fd, errno, strerror(errno)); + return message; + } + + size_t packetLength = messageHandler_GetTotalPacketLength(msgBuffer); + + if (readLength != packetLength) { + parcMemory_Deallocate((void **)&msgBuffer); + return message; + } + + if (messageHandler_IsTCP(msgBuffer)) { + MessagePacketType pktType; + unsigned connid = 0; + if (messageHandler_IsData(msgBuffer)) { + pktType = MessagePacketType_ContentObject; + if (hicn->connection_id == -1) { + parcMemory_Deallocate((void **)&msgBuffer); + return message; + } else { + connid = hicn->connection_id; + } + } else if (messageHandler_IsInterest(msgBuffer)) { + // notice that the connections for the interest (the one that we create at + // run time) uses as a local address 0::0, so the main tun + pktType = MessagePacketType_Interest; + Address *packetAddr = _createAddressFromPacket(msgBuffer); + + AddressPair *pair_find = addressPair_Create(packetAddr, /* dummy */ hicn->localAddress); + const Connection *conn = _lookupConnection(listener, pair_find); + addressPair_Release(&pair_find); + if (conn == NULL) { + AddressPair *pair = addressPair_Create(hicn->localAddress, packetAddr); + connid = _createNewConnection(listener, fd, pair); + addressPair_Release(&pair); + } else { + connid = connection_GetConnectionId(conn); + } + addressDestroy(&packetAddr); + } else { + printf("Got a packet that is not a data nor an interest, drop it!\n"); + parcMemory_Deallocate((void **)&msgBuffer); + return message; + } + + message = message_CreateFromByteArray(connid, msgBuffer, pktType, + forwarder_GetTicks(hicn->forwarder), + forwarder_GetLogger(hicn->forwarder)); + if (message == NULL) { + parcMemory_Deallocate((void **)&msgBuffer); + } + } else if (messageHandler_IsWldrNotification(msgBuffer)) { + _handleWldrNotification(listener, msgBuffer); + } else if (messageHandler_IsLoadBalancerProbe(msgBuffer)) { + _handleProbeMessage(listener, msgBuffer); + } else { + messageHandler_handleHooks(hicn->forwarder, msgBuffer, listener, fd, NULL); + parcMemory_Deallocate((void **)&msgBuffer); + } + + return message; +} + +static void _receivePacket(ListenerOps * listener, int fd) { + HicnListener * hicn = (HicnListener*)listener->context; + Message *msg = NULL; + uint8_t *msgBuffer = parcMemory_AllocateAndClear(MTU_SIZE); + msg = _readMessage(listener, fd, msgBuffer); + + if (msg) { + forwarder_Receive(hicn->forwarder, msg); + } +} + +static void _hicnListener_readcb(int fd, PARCEventType what, void *listener_void) { + ListenerOps * listener = (ListenerOps *)listener_void; + HicnListener *hicn = (HicnListener *)listener->context; + + if (hicn->inetFamily == IPv4 || hicn->inetFamily == IPv6) { + if (what & PARCEventType_Read) { + _receivePacket(listener, fd); + } + } else { + _readFrameToDiscard(hicn, fd); + } +} + static bool _isEmptyAddressIPv4(Address *address) { bool res = false; @@ -133,13 +231,8 @@ ListenerOps *hicnListener_CreateInet(Forwarder *forwarder, char *symbolic, hicn->forwarder = forwarder; hicn->listenerName = parcMemory_StringDuplicate(symbolic, strlen(symbolic)); - hicn->logger = logger_Acquire(forwarder_GetLogger(forwarder)); - - hicn->logger = logger_Acquire(forwarder_GetLogger(forwarder)); hicn->conn_id = forwarder_GetNextConnectionId(forwarder); - hicn->localAddress = addressCopy(address); - hicn->inetFamily = IPv4; hicn->connection_id = -1; @@ -322,7 +415,7 @@ bool _hicnListener_BindInet6(ListenerOps *ops, const Address *remoteAddress) { if (logger_IsLoggable(hicn->logger, LoggerFacility_IO, PARCLogLevel_Debug)) { logger_Log(hicn->logger, LoggerFacility_IO, PARCLogLevel_Debug, __func__, - "hicn_bild failed %d %s", res, hicn_socket_strerror(res)); + "hicn_bind failed %d %s", res, hicn_socket_strerror(res)); } } else { result = true; @@ -354,7 +447,7 @@ bool _hicnListener_BindInet(ListenerOps *ops, const Address *remoteAddress) { if (logger_IsLoggable(hicn->logger, LoggerFacility_IO, PARCLogLevel_Debug)) { logger_Log(hicn->logger, LoggerFacility_IO, PARCLogLevel_Debug, __func__, - "hicn_bild failed %d %s", res, hicn_socket_strerror(res)); + "hicn_bind failed %d %s", res, hicn_socket_strerror(res)); } } else { result = true; @@ -438,6 +531,7 @@ static const char *_getListenerName(const ListenerOps *ops) { HicnListener *hicn = (HicnListener *)ops->context; return hicn->listenerName; } + static const char *_getInterfaceName(const ListenerOps *ops) { const char *interfaceName = ""; return interfaceName; @@ -601,96 +695,6 @@ static void _handleWldrNotification(ListenerOps *listener, uint8_t *msgBuffer) { } -static Message *_readMessage(ListenerOps * listener, int fd, uint8_t *msgBuffer) { - HicnListener * hicn = (HicnListener*)listener->context; - Message *message = NULL; - - ssize_t readLength = read(fd, msgBuffer, MTU_SIZE); - - if (readLength < 0) { - printf("read failed %d: (%d) %s\n", fd, errno, strerror(errno)); - return message; - } - - size_t packetLength = messageHandler_GetTotalPacketLength(msgBuffer); - if (readLength != packetLength) { - parcMemory_Deallocate((void **)&msgBuffer); - return message; - } - if (messageHandler_IsTCP(msgBuffer)) { - MessagePacketType pktType; - unsigned connid = 0; - if (messageHandler_IsData(msgBuffer)) { - pktType = MessagePacketType_ContentObject; - if (hicn->connection_id == -1) { - parcMemory_Deallocate((void **)&msgBuffer); - return message; - } else { - connid = hicn->connection_id; - } - } else if (messageHandler_IsInterest(msgBuffer)) { - // notice that the connections for the interest (the one that we create at - // run time) uses as a local address 0::0, so the main tun - pktType = MessagePacketType_Interest; - Address *packetAddr = _createAddressFromPacket(msgBuffer); - AddressPair *pair_find = addressPair_Create(packetAddr, /* dummy */ hicn->localAddress); - const Connection *conn = _lookupConnection(listener, pair_find); - addressPair_Release(&pair_find); - if (conn == NULL) { - AddressPair *pair = addressPair_Create(hicn->localAddress, packetAddr); - connid = _createNewConnection(listener, fd, pair); - addressPair_Release(&pair); - } else { - connid = connection_GetConnectionId(conn); - } - addressDestroy(&packetAddr); - } else { - printf("Got a packet that is not a data nor an interest, drop it!\n"); - parcMemory_Deallocate((void **)&msgBuffer); - return message; - } - - message = message_CreateFromByteArray(connid, msgBuffer, pktType, - forwarder_GetTicks(hicn->forwarder), - forwarder_GetLogger(hicn->forwarder)); - if (message == NULL) { - parcMemory_Deallocate((void **)&msgBuffer); - } - } else if (messageHandler_IsWldrNotification(msgBuffer)) { - _handleWldrNotification(listener, msgBuffer); - } else if (messageHandler_IsLoadBalancerProbe(msgBuffer)) { - _handleProbeMessage(listener, msgBuffer); - } else { - messageHandler_handleHooks(hicn->forwarder, msgBuffer, listener, fd, NULL); - parcMemory_Deallocate((void **)&msgBuffer); - } - - return message; -} - -static void _receivePacket(ListenerOps * listener, int fd) { - HicnListener * hicn = (HicnListener*)listener->context; - Message *msg = NULL; - uint8_t *msgBuffer = parcMemory_AllocateAndClear(MTU_SIZE); - msg = _readMessage(listener, fd, msgBuffer); - - if (msg) { - forwarder_Receive(hicn->forwarder, msg); - } -} - -static void _hicnListener_readcb(int fd, PARCEventType what, void *listener_void) { - ListenerOps * listener = (ListenerOps *)listener_void; - HicnListener *hicn = (HicnListener *)listener->context; - - if (hicn->inetFamily == IPv4 || hicn->inetFamily == IPv6) { - if (what & PARCEventType_Read) { - _receivePacket(listener, fd); - } - } else { - _readFrameToDiscard(hicn, fd); - } -} diff --git a/hicn-light/src/hicn/io/ioOperations.c b/hicn-light/src/hicn/io/ioOperations.c index 31e37a461..b2d346ed8 100644 --- a/hicn-light/src/hicn/io/ioOperations.c +++ b/hicn-light/src/hicn/io/ioOperations.c @@ -41,6 +41,8 @@ const AddressPair *ioOperations_GetAddressPair(const IoOperations *ops) { return ops->getAddressPair(ops); } + + bool ioOperations_IsUp(const IoOperations *ops) { return ops->isUp(ops); } bool ioOperations_IsLocal(const IoOperations *ops) { return ops->isLocal(ops); } diff --git a/hicn-light/src/hicn/io/listenerSet.c b/hicn-light/src/hicn/io/listenerSet.c index 3e44973d7..45dbe887a 100644 --- a/hicn-light/src/hicn/io/listenerSet.c +++ b/hicn-light/src/hicn/io/listenerSet.c @@ -166,3 +166,17 @@ int listenerSet_FindIdByListenerName(const ListenerSet *set, const char *listene return index; } + +void listenerSet_RemoveById(const ListenerSet *set, unsigned id) { + parcAssertNotNull(set, "Parameter set must be non-null"); + + for (size_t i = 0; i < parcArrayList_Size(set->listOfListeners); + i++) { + ListenerOps *ops = parcArrayList_Get(set->listOfListeners, i); + parcAssertNotNull(ops, "Got null listener ops at index %zu", i); + if (ops->getInterfaceIndex(ops) == id) { + parcArrayList_RemoveAtIndex(set->listOfListeners, i); + break; + } + } +} diff --git a/hicn-light/src/hicn/io/listenerSet.h b/hicn-light/src/hicn/io/listenerSet.h index c8937fa02..5779d2af4 100644 --- a/hicn-light/src/hicn/io/listenerSet.h +++ b/hicn-light/src/hicn/io/listenerSet.h @@ -135,7 +135,6 @@ ListenerOps *listenerSet_Get(const ListenerSet *set, size_t index); ListenerOps *listenerSet_Find(const ListenerSet *set, EncapType encapType, const Address *localAddress); - /** * Looks up a listener by its id * @@ -153,6 +152,7 @@ ListenerOps *listenerSet_Find(const ListenerSet *set, EncapType encapType, * @endcode */ ListenerOps *listenerSet_FindById(const ListenerSet *set, unsigned id); + /** * Looks up a listener by its id * @@ -171,4 +171,18 @@ ListenerOps *listenerSet_FindById(const ListenerSet *set, unsigned id); */ int listenerSet_FindIdByListenerName(const ListenerSet *set, const char *listenerName); +/** + * Remove up a listener by its id + * + * <#Paragraphs Of Explanation#> + * + * @param [in] set An allocated listener set + * @param [in] id of the listener + * + * Example: + * @code + * + * @endcode + */ +void listenerSet_RemoveById(const ListenerSet *set, unsigned id); #endif diff --git a/hicn-light/src/hicn/io/streamConnection.c b/hicn-light/src/hicn/io/streamConnection.c index 224f129f7..08ff728d6 100644 --- a/hicn-light/src/hicn/io/streamConnection.c +++ b/hicn-light/src/hicn/io/streamConnection.c @@ -589,6 +589,7 @@ static void _conn_readcb(PARCEventQueue *event, PARCEventType type, // kind of packets while (parcEventBuffer_GetLength(input) >= sizeof(header_control_message) && parcEventBuffer_GetLength(input) >= stream->nextMessageLength) { + if ((command = _isACommand(input)) != LAST_COMMAND_VALUE) { struct iovec *rx; // Get message from the stream and set the stream->nextMessageLength diff --git a/hicn-light/src/hicn/io/tcpListener.c b/hicn-light/src/hicn/io/tcpListener.c index 4464edf28..e2b80c215 100644 --- a/hicn-light/src/hicn/io/tcpListener.c +++ b/hicn-light/src/hicn/io/tcpListener.c @@ -31,7 +31,6 @@ typedef struct tcp_listener { char *listenerName; - Forwarder *forwarder; Logger *logger; @@ -49,10 +48,15 @@ typedef struct tcp_listener { static void _tcpListener_Destroy(_TcpListener **listenerPtr); static void _tcpListener_OpsDestroy(ListenerOps **listenerOpsPtr); + static const char *_tcpListener_ListenerName(const ListenerOps *ops); + static unsigned _tcpListener_OpsGetInterfaceIndex(const ListenerOps *ops); + static const Address *_tcpListener_OpsGetListenAddress(const ListenerOps *ops); + static const char *_tcpListener_InterfaceName(const ListenerOps *ops); + static EncapType _tcpListener_OpsGetEncapType(const ListenerOps *ops); static ListenerOps _tcpTemplate = { @@ -71,6 +75,7 @@ static void _tcpListener_Listen(int, struct sockaddr *, int socklen, ListenerOps *tcpListener_CreateInet6(Forwarder *forwarder, char *listenerName, struct sockaddr_in6 sin6, char *interfaceName) { + _TcpListener *tcp = parcMemory_AllocateAndClear(sizeof(_TcpListener)); parcAssertNotNull(tcp, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(_TcpListener)); @@ -123,7 +128,10 @@ ListenerOps *tcpListener_CreateInet(Forwarder *forwarder, char *listenerName, sizeof(_TcpListener)); tcp->forwarder = forwarder; + tcp->listenerName = parcMemory_StringDuplicate(listenerName, strlen(listenerName)); tcp->logger = logger_Acquire(forwarder_GetLogger(forwarder)); + tcp->interfaceName = parcMemory_StringDuplicate(interfaceName, strlen(interfaceName)); + tcp->listener = dispatcher_CreateListener( forwarder_GetDispatcher(forwarder), _tcpListener_Listen, (void *)tcp, -1, (struct sockaddr *)&sin, sizeof(sin)); @@ -175,7 +183,6 @@ static void _tcpListener_Destroy(_TcpListener **listenerPtr) { parcMemory_Deallocate((void **)&tcp->listenerName); parcMemory_Deallocate((void **)&tcp->interfaceName); - logger_Release(&tcp->logger); dispatcher_DestroyListener(forwarder_GetDispatcher(tcp->forwarder), &tcp->listener); diff --git a/hicn-light/src/hicn/io/udpListener.c b/hicn-light/src/hicn/io/udpListener.c index 050ca104c..f43756a11 100644 --- a/hicn-light/src/hicn/io/udpListener.c +++ b/hicn-light/src/hicn/io/udpListener.c @@ -40,8 +40,8 @@ #define IPv6 6 struct udp_listener { - Forwarder *forwarder; char *listenerName; + Forwarder *forwarder; Logger *logger; PARCEvent *udp_event; @@ -76,6 +76,7 @@ static ListenerOps udpTemplate = { .getInterfaceName = &_getInterfaceName, }; + static void _readcb(int fd, PARCEventType what, void * listener_void); #ifdef __ANDROID__ diff --git a/hicn-light/src/hicn/processor/messageProcessor.c b/hicn-light/src/hicn/processor/messageProcessor.c index 3ca9264b8..6598b9035 100644 --- a/hicn-light/src/hicn/processor/messageProcessor.c +++ b/hicn-light/src/hicn/processor/messageProcessor.c @@ -131,16 +131,18 @@ static void messageProcessor_ForwardToInterfaceId(MessageProcessor *processor, static void messageProcessor_Tick(int fd, PARCEventType type, void *user_data) { - MessageProcessor *processor = (MessageProcessor*)user_data; - uint64_t now = (uint64_t)forwarder_GetTicks(processor->forwarder); + MessageProcessor *processor = (MessageProcessor*)user_data; + uint64_t now = (uint64_t)forwarder_GetTicks(processor->forwarder); - /* Loop over FIB entries to compute statistics from counters */ - FibEntryList *fibList = forwarder_GetFibEntries(processor->forwarder); + /* Loop over FIB entries to compute statistics from counters */ + FibEntryList *fibList = forwarder_GetFibEntries(processor->forwarder); - for (size_t i = 0; i < fibEntryList_Length(fibList); i++) { - FibEntry *entry = (FibEntry *)fibEntryList_Get(fibList, i); - fibEntry_UpdateStats(entry, now); - } + for (size_t i = 0; i < fibEntryList_Length(fibList); i++) { + FibEntry *entry = (FibEntry *)fibEntryList_Get(fibList, i); + fibEntry_UpdateStats(entry, now); + } + + fibEntryList_Destroy(&fibList); } #endif /* WITH_POLICY */ diff --git a/hicn-light/src/hicn/socket/ops_linux.c b/hicn-light/src/hicn/socket/ops_linux.c index f81ceff37..af41f400f 100644 --- a/hicn-light/src/hicn/socket/ops_linux.c +++ b/hicn-light/src/hicn/socket/ops_linux.c @@ -574,21 +574,17 @@ int _nl_get_ip_addr(uint32_t interface_id, uint8_t address_family, if (address_family == AF_INET6) { if ((payload->ifa_index == interface_id) && (payload->ifa_prefixlen < IPV6_ADDR_LEN * 8)) { - printf("got ip address\n"); memcpy(prefix->address.buffer, RTA_DATA(payload + 1), IPV6_ADDR_LEN); prefix->family = AF_INET6; prefix->len = IPV6_ADDR_LEN_BITS; - printf("returning %d\n", HICN_SOCKET_ERROR_NONE); return HICN_SOCKET_ERROR_NONE; } } else if (address_family == AF_INET) { if ((payload->ifa_index == interface_id) && (payload->ifa_prefixlen < IPV4_ADDR_LEN * 8)) { - printf("got ip address\n"); memcpy(prefix->address.buffer, RTA_DATA(payload + 1), IPV4_ADDR_LEN); prefix->family = AF_INET; prefix->len = IPV4_ADDR_LEN_BITS; - printf("returning %d\n", HICN_SOCKET_ERROR_NONE); return HICN_SOCKET_ERROR_NONE; } } else { @@ -600,7 +596,6 @@ ERR_NL: ERR_RECV: ERR_SEND: ERR_SOCKET: - printf("error getting ip address\n"); return HICN_SOCKET_ERROR_UNSPEC; } @@ -624,7 +619,7 @@ int _nl_set_ip_addr(uint32_t interface_id, ip_prefix_t *prefix) { .payload.ifa_index = interface_id}; /* Set attributes = length/type/value */ - struct rtattr ifa_address = {RTA_LENGTH(ip_prefix_len(prefix)), + struct rtattr ifa_address = {RTA_LENGTH(ip_address_len(&prefix->address, prefix->family)), IFA_ADDRESS}; struct iovec iov[] = { {&msg, sizeof(msg)}, @@ -992,13 +987,13 @@ int _nl_del_lo_route(const ip_prefix_t *prefix) { /* Set attribute = length/type/value */ uint32_t one = 1; - struct rtattr a_dst = {RTA_LENGTH(ip_prefix_len(prefix)), RTA_DST}; + struct rtattr a_dst = {RTA_LENGTH(ip_address_len(&prefix->address, prefix->family)), RTA_DST}; struct rtattr a_ifid_lo = {RTA_LENGTH(sizeof(uint32_t)), RTA_OIF}; struct iovec iov[] = { {&msg, sizeof(msg)}, /* Ip address */ {&a_dst, sizeof(a_dst)}, - {(void *)&prefix->address.buffer, ip_prefix_len(prefix)}, + {(void *)&prefix->address.buffer, ip_address_len(&prefix->address, prefix->family)}, /* Interface id */ {&a_ifid_lo, sizeof(a_ifid_lo)}, {&one, sizeof(one)}}; @@ -1154,7 +1149,7 @@ int _nl_add_neigh_proxy(const ip_prefix_t *prefix, }; /* Message attributes = length/type/value */ - struct rtattr a_dst = {RTA_LENGTH(ip_prefix_len(prefix)), NDA_DST}; + struct rtattr a_dst = {RTA_LENGTH(ip_address_len(&prefix->address, prefix->family)), NDA_DST}; /* Iovec describing the packets */ struct iovec iov[] = { @@ -1231,7 +1226,7 @@ int _nl_add_in_route_table(const ip_prefix_t *prefix, }; /* Message attributes = length/type/value */ - struct rtattr a_dst = {RTA_LENGTH(ip_prefix_len(prefix)), RTA_DST}; + struct rtattr a_dst = {RTA_LENGTH(ip_address_len(&prefix->address, prefix->family)), RTA_DST}; struct rtattr a_oif = {RTA_LENGTH(sizeof(uint32_t)), RTA_OIF}; /* Iovec describing the packets */ @@ -1239,7 +1234,7 @@ int _nl_add_in_route_table(const ip_prefix_t *prefix, {&msg, sizeof(msg)}, /* Destination prefix / ip address */ {&a_dst, sizeof(a_dst)}, - {(void *)&prefix->address.buffer, ip_prefix_len(prefix)}, + {(void *)&prefix->address.buffer, ip_address_len(&prefix->address, prefix->family)}, /* Output interface */ {&a_oif, sizeof(a_oif)}, {(void *)&interface_id, sizeof(uint32_t)}, @@ -1333,7 +1328,7 @@ int _nl_add_prio_rule(const ip_prefix_t *prefix, uint8_t address_family, if (prefix) { /* Message attributes = length/type/value */ - struct rtattr a_src = {RTA_LENGTH(ip_prefix_len(prefix)), FRA_SRC}; + struct rtattr a_src = {RTA_LENGTH(ip_address_len(&prefix->address, prefix->family)), FRA_SRC}; struct rtattr a_prio = {RTA_LENGTH(sizeof(uint32_t)), FRA_PRIORITY}; /* Iovec describing the packets */ @@ -1341,7 +1336,7 @@ int _nl_add_prio_rule(const ip_prefix_t *prefix, uint8_t address_family, {&msg, sizeof(msg)}, /* Source prefix / prefix */ {&a_src, sizeof(a_src)}, - {(void *)&prefix->address.buffer, ip_prefix_len(prefix)}, + {(void *)&prefix->address.buffer, ip_address_len(&prefix->address, prefix->family)}, /* Priority */ {&a_prio, sizeof(a_prio)}, {(void *)&priority, sizeof(uint32_t)}, @@ -1434,7 +1429,7 @@ int _nl_del_prio_rule(const ip_prefix_t *prefix, uint8_t address_family, /* Message attributes = length/type/value */ if (prefix) { - struct rtattr a_src = {RTA_LENGTH(ip_prefix_len(prefix)), FRA_SRC}; + struct rtattr a_src = {RTA_LENGTH(ip_address_len(&prefix->address, prefix->family)), FRA_SRC}; struct rtattr a_prio = {RTA_LENGTH(sizeof(uint32_t)), FRA_PRIORITY}; /* Iovec describing the packets */ @@ -1442,7 +1437,7 @@ int _nl_del_prio_rule(const ip_prefix_t *prefix, uint8_t address_family, {&msg, sizeof(msg)}, /* Source prefix / prefix */ {&a_src, sizeof(a_src)}, - {(void *)&prefix->address.buffer, ip_prefix_len(prefix)}, + {(void *)&prefix->address.buffer, ip_address_len(&prefix->address, prefix->family)}, /* Priority */ {&a_prio, sizeof(a_prio)}, {(void *)&priority, sizeof(uint32_t)}, diff --git a/hicn-light/src/hicn/utils/commands.h b/hicn-light/src/hicn/utils/commands.h index 834da6259..1a313de6a 100644 --- a/hicn-light/src/hicn/utils/commands.h +++ b/hicn-light/src/hicn/utils/commands.h @@ -58,6 +58,7 @@ typedef enum { ADD_ROUTE, LIST_ROUTES, REMOVE_CONNECTION, + REMOVE_LISTENER, REMOVE_ROUTE, CACHE_STORE, CACHE_SERVE, @@ -124,7 +125,7 @@ typedef struct { uint8_t connectionType; } add_listener_command; -// SIZE=40 +// SIZE=56 //========== [01] ADD CONNECTION ========== @@ -167,11 +168,11 @@ typedef struct { uint32_t connid; uint8_t state; uint8_t admin_state; - char connectionName[16]; char interfaceName[16]; + char connectionName[16]; } list_connections_command; -// SIZE=64 +// SIZE=80 //========== [03] ADD ROUTE ========== @@ -198,14 +199,18 @@ typedef struct { // SIZE=24 //========== [05] REMOVE CONNECTION ========== - typedef struct { char symbolicOrConnid[16]; } remove_connection_command; +//========== [06] REMOVE LISTENER ========== +typedef struct { + char symbolicOrListenerid[16]; +} remove_listener_command; + // SIZE=16 -//========== [06] REMOVE ROUTE ========== +//========== [07] REMOVE ROUTE ========== typedef struct { char symbolicOrConnid[16]; @@ -216,7 +221,7 @@ typedef struct { // SIZE=36 -//========== [07] CACHE STORE ========== +//========== [08] CACHE STORE ========== typedef struct { uint8_t activate; @@ -224,7 +229,7 @@ typedef struct { // SIZE=1 -//========== [08] CACHE SERVE ========== +//========== [09] CACHE SERVE ========== typedef struct { uint8_t activate; @@ -232,15 +237,13 @@ typedef struct { // SIZE=1 -//========== [09] SET STRATEGY ========== +//========== [10] SET STRATEGY ========== typedef enum { SET_STRATEGY_LOADBALANCER, SET_STRATEGY_RANDOM, SET_STRATEGY_RANDOM_PER_DASH_SEGMENT, SET_STRATEGY_LOADBALANCER_WITH_DELAY, - SET_STRATEGY_LOADBALANCER_BY_RATE, - SET_STRATEGY_LOADBALANCER_BEST_ROUTE, LAST_STRATEGY_VALUE } strategy_type; @@ -285,7 +288,7 @@ typedef struct { uint8_t encapType; } list_listeners_command; -// SIZE=24 +// SIZE=56 //========== [14] MAPME ========== @@ -353,9 +356,11 @@ static inline int payloadLengthDaemon(command_id id) { case ADD_ROUTE: return sizeof(add_route_command); case LIST_ROUTES: - return 0; // list routes: payload always 0 + return 0; // list rout`es: payload always 0 case REMOVE_CONNECTION: return sizeof(remove_connection_command); + case REMOVE_LISTENER: + return sizeof(remove_listener_command); case REMOVE_ROUTE: return sizeof(remove_route_command); case CACHE_STORE: diff --git a/hicn-light/src/hicn/utils/utils.c b/hicn-light/src/hicn/utils/utils.c index 61ff9a904..93a3efd81 100644 --- a/hicn-light/src/hicn/utils/utils.c +++ b/hicn-light/src/hicn/utils/utils.c @@ -90,7 +90,6 @@ struct iovec *utils_CreateNack(header_control_message *header, void *payload, parcMemory_AllocateAndClear(sizeof(struct iovec) * 2); header->messageType = NACK_LIGHT; - response[0].iov_base = header; response[0].iov_len = sizeof(header_control_message); response[1].iov_base = payload; diff --git a/hicn-plugin/src/data_pcslookup_node.c b/hicn-plugin/src/data_pcslookup_node.c index fdf855e57..1ae36202f 100644 --- a/hicn-plugin/src/data_pcslookup_node.c +++ b/hicn-plugin/src/data_pcslookup_node.c @@ -14,12 +14,12 @@ */ #include "data_pcslookup.h" +#include "infra.h" #include "mgmt.h" #include "parser.h" -#include "infra.h" +#include "state.h" #include "strategy.h" #include "strategy_dpo_manager.h" -#include "state.h" /* Stats string values */ static char *hicn_data_pcslookup_error_strings[] = { @@ -37,10 +37,9 @@ vlib_node_registration_t hicn_data_pcslookup_node; * hICN node for handling data. It performs a lookup in the PIT. */ static uword -hicn_data_pcslookup_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, - vlib_frame_t * frame) +hicn_data_pcslookup_node_fn (vlib_main_t * vm, + vlib_node_runtime_t * node, vlib_frame_t * frame) { - u32 n_left_from, *from, *to_next; hicn_data_pcslookup_next_t next_index; hicn_data_pcslookup_runtime_t *rt; @@ -87,7 +86,8 @@ hicn_data_pcslookup_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, { vlib_buffer_t *b1; b1 = vlib_get_buffer (vm, from[1]); - //Prefetch two cache lines-- 128 byte-- so that we load the hicn_buffer_t as well + // Prefetch two cache lines-- 128 byte-- so that we load the + // hicn_buffer_t as well CLIB_PREFETCH (b1, 2 * CLIB_CACHE_LINE_BYTES, STORE); CLIB_PREFETCH (b1->data, CLIB_CACHE_LINE_BYTES, LOAD); } @@ -99,10 +99,9 @@ hicn_data_pcslookup_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, to_next += 1; n_left_to_next -= 1; - b0 = vlib_get_buffer (vm, bi0); hb0 = hicn_get_buffer (b0); - next0 = HICN_DATA_PCSLOOKUP_NEXT_ERROR_DROP; + next0 = HICN_DATA_PCSLOOKUP_NEXT_ERROR_DROP; /* Incr packet counter */ stats.pkts_processed += 1; @@ -111,26 +110,24 @@ hicn_data_pcslookup_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, nameptr = (u8 *) (&name); if (PREDICT_TRUE (ret0 == HICN_ERROR_NONE && - hicn_hashtb_fullhash (nameptr, namelen, - &name_hash) == HICN_ERROR_NONE)) + hicn_hashtb_fullhash (nameptr, namelen, + &name_hash) == + HICN_ERROR_NONE)) { int res = hicn_hashtb_lookup_node (rt->pitcs->pcs_table, nameptr, namelen, name_hash, - !(hb0->flags & - HICN_BUFFER_FLAGS_FACE_IS_APP) - /* take lock */ , + 1 /*is_data. Do not take lock if hit CS */ , &node_id0, &dpo_ctx_id0, &vft_id0, - &is_cs0, - &hash_entry_id, &bucket_id, + &is_cs0, &hash_entry_id, &bucket_id, &bucket_is_overflown); stats.pkts_data_count += 1; #if HICN_FEATURE_CS - if ((res == HICN_ERROR_HASHTB_HASH_NOT_FOUND - || (res == HICN_ERROR_NONE && is_cs0)) - && ((hb0->flags & HICN_BUFFER_FLAGS_FACE_IS_APP))) + if ((res == HICN_ERROR_HASHTB_HASH_NOT_FOUND || + (res == HICN_ERROR_NONE && is_cs0)) && + ((hb0->flags & HICN_BUFFER_FLAGS_FACE_IS_APP))) { next0 = HICN_DATA_PCSLOOKUP_NEXT_STORE_DATA; } @@ -169,9 +166,8 @@ hicn_data_pcslookup_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, * Fix in case of a wrong speculation. Needed to * clone the data in the right frame */ - vlib_validate_buffer_enqueue_x1 (vm, node, next_index, - to_next, n_left_to_next, - bi0, next0); + vlib_validate_buffer_enqueue_x1 (vm, node, next_index, to_next, + n_left_to_next, bi0, next0); /* Maybe trace */ if (PREDICT_FALSE ((node->flags & VLIB_NODE_FLAG_TRACE) && @@ -217,24 +213,22 @@ hicn_data_pcslookup_format_trace (u8 * s, va_list * args) return (s); } - /* * Node registration for the data forwarder node */ /* *INDENT-OFF* */ -VLIB_REGISTER_NODE (hicn_data_pcslookup_node) = +VLIB_REGISTER_NODE(hicn_data_pcslookup_node) = { .function = hicn_data_pcslookup_node_fn, .name = "hicn-data-pcslookup", - .vector_size = sizeof (u32), - .runtime_data_bytes = sizeof (hicn_data_pcslookup_runtime_t), + .vector_size = sizeof(u32), + .runtime_data_bytes = sizeof(hicn_data_pcslookup_runtime_t), .format_trace = hicn_data_pcslookup_format_trace, - .type = VLIB_NODE_TYPE_INTERNAL, - .n_errors = ARRAY_LEN (hicn_data_pcslookup_error_strings), + .type = VLIB_NODE_TYPE_INTERNAL, + .n_errors = ARRAY_LEN(hicn_data_pcslookup_error_strings), .error_strings = hicn_data_pcslookup_error_strings, .n_next_nodes = HICN_DATA_PCSLOOKUP_N_NEXT, - .next_nodes = - { + .next_nodes = { [HICN_DATA_PCSLOOKUP_NEXT_V4_LOOKUP] = "ip4-lookup", [HICN_DATA_PCSLOOKUP_NEXT_V6_LOOKUP] = "ip6-lookup", [HICN_DATA_PCSLOOKUP_NEXT_STORE_DATA] = "hicn-data-push", diff --git a/hicn-plugin/src/pcs.h b/hicn-plugin/src/pcs.h index c7e8a4b59..d9c48954e 100644 --- a/hicn-plugin/src/pcs.h +++ b/hicn-plugin/src/pcs.h @@ -499,8 +499,16 @@ hicn_pcs_cs_update (vlib_main_t * vm, hicn_pit_cs_t * pitcs, policy_vft->hicn_cs_delete_get (pitcs, policy_state, &node, &pcs_entry, &hash_entry); - hicn_pcs_cs_delete (vm, pitcs, &pcs_entry, &node, hash_entry, NULL, - NULL); + /* + * We don't have to decrease the lock (therefore we cannot + * use hicn_pcs_cs_delete function) + */ + policy_vft->hicn_cs_dequeue (pitcs, node, pcs_entry, policy_state); + + hicn_cs_delete_trimmed (pitcs, &pcs_entry, hash_entry, &node, vm); + + /* Update the global CS counter */ + pitcs->pcs_cs_count--; } } else @@ -541,14 +549,14 @@ hicn_pcs_cs_delete (vlib_main_t * vm, hicn_pit_cs_t * pitcs, } /* A data could have been inserted in the CS through a push. In this case locks == 0 */ - if (hash_entry->locks == 0 || hash_entry->locks == 1) + hash_entry->locks--; + if (hash_entry->locks == 0) { hicn_pcs_delete_internal (pitcs, pcs_entryp, hash_entry, nodep, vm, dpo_vft, hicn_dpo_id); } else { - hash_entry->locks--; hash_entry->he_flags |= HICN_HASH_ENTRY_FLAG_DELETED; } } @@ -603,8 +611,16 @@ hicn_pcs_cs_insert (vlib_main_t * vm, hicn_pit_cs_t * pitcs, policy_vft->hicn_cs_delete_get (pitcs, policy_state, &node, &pcs_entry, &hash_entry); - hicn_pcs_cs_delete (vm, pitcs, &pcs_entry, &node, hash_entry, NULL, - NULL); + /* + * We don't have to decrease the lock (therefore we cannot + * use hicn_pcs_cs_delete function) + */ + policy_vft->hicn_cs_dequeue (pitcs, node, pcs_entry, policy_state); + + hicn_cs_delete_trimmed (pitcs, &pcs_entry, hash_entry, &node, vm); + + /* Update the global CS counter */ + pitcs->pcs_cs_count--; } } return ret; diff --git a/lib/src/name.c b/lib/src/name.c index a19971d49..d5ee1d520 100644 --- a/lib/src/name.c +++ b/lib/src/name.c @@ -74,7 +74,7 @@ hicn_name_create (const char *ip_address, u32 id, hicn_name_t * name) } int -hicn_name_create_from_prefix (const ip_prefix_t * prefix, u32 id, +hicn_name_create_from_ip_prefix (const ip_prefix_t * prefix, u32 id, hicn_name_t * name) { switch (prefix->family) diff --git a/lib/src/util/ip_address.c b/lib/src/util/ip_address.c index 2cf2aaef3..7afd3e2a4 100644 --- a/lib/src/util/ip_address.c +++ b/lib/src/util/ip_address.c @@ -190,9 +190,9 @@ ip_prefix_pton (const char *ip_address_str, ip_prefix_t * ip_prefix) char *addr = strdup (ip_address_str); p = strchr (addr, '/'); - if (!p) - ip_prefix->len = 0; // until we get the ip address family - else { + if (!p) { + ip_prefix->len = ~0; // until we get the ip address family + } else { ip_prefix->len = strtoul (p + 1, &eptr, 10); *p = 0; } @@ -202,11 +202,15 @@ ip_prefix_pton (const char *ip_address_str, ip_prefix_t * ip_prefix) switch (ip_prefix->family) { case AF_INET6: + if (ip_prefix->len == (u8)~0) + ip_prefix->len = IPV6_ADDR_LEN_BITS; if (ip_prefix->len > IPV6_ADDR_LEN_BITS) goto ERR; pton_fd = inet_pton (AF_INET6, addr, &ip_prefix->address.buffer); break; case AF_INET: + if (ip_prefix->len == (u8)~0) + ip_prefix->len = IPV4_ADDR_LEN_BITS; if (ip_prefix->len > IPV4_ADDR_LEN_BITS) goto ERR; pton_fd = inet_pton (AF_INET, addr, &ip_prefix->address.buffer); @@ -220,6 +224,7 @@ ip_prefix_pton (const char *ip_address_str, ip_prefix_t * ip_prefix) if (pton_fd <= 0) goto ERR; + free(addr); return 1; ERR: free (addr); diff --git a/libtransport/src/hicn/transport/CMakeLists.txt b/libtransport/src/hicn/transport/CMakeLists.txt index 6e0ae5b88..22acdcb7f 100644 --- a/libtransport/src/hicn/transport/CMakeLists.txt +++ b/libtransport/src/hicn/transport/CMakeLists.txt @@ -25,6 +25,8 @@ add_subdirectory(portability) add_subdirectory(protocols) add_subdirectory(utils) +include(Packager) +extract_version() configure_file("config.h.in" "config.h" @ONLY) install( FILES ${CMAKE_CURRENT_BINARY_DIR}/config.h diff --git a/libtransport/src/hicn/transport/config.h.in b/libtransport/src/hicn/transport/config.h.in index 7d47c2b3f..ef47affda 100644 --- a/libtransport/src/hicn/transport/config.h.in +++ b/libtransport/src/hicn/transport/config.h.in @@ -17,6 +17,10 @@ #cmakedefine TRANSPORT_HAVE_PTHREAD 1 +#define HICNTRANSPORT_VERSION_MAJOR "@VERSION_MAJOR@" +#define HICNTRANSPORT_VERSION_MINOR "@VERSION_MINOR@" +#define HICNTRANSPORT_VERSION_REVISION "@VERSION_REVISION@" + #ifndef ASIO_STANDALONE #cmakedefine ASIO_STANDALONE #endif diff --git a/libtransport/src/hicn/transport/core/portal.h b/libtransport/src/hicn/transport/core/portal.h index 3ea37c938..5796308b4 100644 --- a/libtransport/src/hicn/transport/core/portal.h +++ b/libtransport/src/hicn/transport/core/portal.h @@ -417,8 +417,14 @@ class Portal { pending_interest->startCountdown(portal_details::makeCustomAllocatorHandler( async_callback_memory_, std::bind(&Portal<ForwarderInt>::timerHandler, this, std::placeholders::_1, hash))); - pending_interest_hash_table_.emplace( - std::make_pair(hash, std::move(pending_interest))); + + auto it = pending_interest_hash_table_.find(hash); + if(it != pending_interest_hash_table_.end()){ + it->second->cancelTimer(); + it->second = std::move(pending_interest); + }else{ + pending_interest_hash_table_[hash] = std::move(pending_interest); + } } /** @@ -644,7 +650,7 @@ class Portal { std::move(interest_ptr->getInterest()), std::move(content_object)); } } else { - TRANSPORT_LOGW("No pending interests for current content (%s)", + TRANSPORT_LOGD("No pending interests for current content (%s)", content_object->getName().toString().c_str()); } } diff --git a/libtransport/src/hicn/transport/interfaces/CMakeLists.txt b/libtransport/src/hicn/transport/interfaces/CMakeLists.txt index a5cca78a6..0c2c73623 100644 --- a/libtransport/src/hicn/transport/interfaces/CMakeLists.txt +++ b/libtransport/src/hicn/transport/interfaces/CMakeLists.txt @@ -28,6 +28,7 @@ list(APPEND SOURCE_FILES ${CMAKE_CURRENT_SOURCE_DIR}/rtc_socket_producer.cc ${CMAKE_CURRENT_SOURCE_DIR}/socket_producer.cc ${CMAKE_CURRENT_SOURCE_DIR}/socket_consumer.cc + ${CMAKE_CURRENT_SOURCE_DIR}/callbacks.cc ) set(SOURCE_FILES ${SOURCE_FILES} PARENT_SCOPE) diff --git a/libtransport/src/hicn/transport/interfaces/callbacks.cc b/libtransport/src/hicn/transport/interfaces/callbacks.cc new file mode 100644 index 000000000..2574e7720 --- /dev/null +++ b/libtransport/src/hicn/transport/interfaces/callbacks.cc @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "callbacks.h" + +namespace transport { + +namespace interface { + +std::nullptr_t VOID_HANDLER = nullptr; + +} // namespace interface + +} // namespace transport
\ No newline at end of file diff --git a/libtransport/src/hicn/transport/interfaces/callbacks.h b/libtransport/src/hicn/transport/interfaces/callbacks.h index 24f47eb75..7194cca42 100644 --- a/libtransport/src/hicn/transport/interfaces/callbacks.h +++ b/libtransport/src/hicn/transport/interfaces/callbacks.h @@ -18,6 +18,8 @@ #include <functional> #include <system_error> +#include <hicn/transport/core/facade.h> + namespace utils { class MemBuf; } @@ -105,6 +107,8 @@ using ProducerContentObjectCallback = using ProducerInterestCallback = std::function<void(ProducerSocket &, core::Interest &)>; +extern std::nullptr_t VOID_HANDLER; + } // namespace interface } // namespace transport
\ No newline at end of file diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc index 740f9f77c..c1a45ebb7 100644 --- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc +++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc @@ -13,6 +13,7 @@ * limitations under the License. */ +#include <hicn/transport/interfaces/callbacks.h> #include <hicn/transport/interfaces/rtc_socket_producer.h> #include <stdlib.h> #include <time.h> @@ -31,9 +32,10 @@ #define HICN_MAX_DATA_SEQ 0xefffffff -//slow production rate param -#define MIN_PRODUCTION_RATE 10 // in pacekts per sec. this value is computed - // through experiments +// slow production rate param +#define MIN_PRODUCTION_RATE \ + 10 // in pacekts per sec. this value is computed + // through experiments #define LIFETIME_FRACTION 0.5 // NACK HEADER @@ -63,13 +65,13 @@ RTCProducerSocket::RTCProducerSocket(asio::io_service &io_service) bytesProductionRate_(INIT_PACKET_PRODUCTION_RATE * 1400), packetsProductionRate_(INIT_PACKET_PRODUCTION_RATE), perSecondFactor_(MILLI_IN_A_SEC / STATS_INTERVAL_DURATION), - timer_on_(false){ + timer_on_(false) { srand((unsigned int)time(NULL)); prodLabel_ = ((rand() % 255) << 24UL); - interests_cache_timer_ = std::make_unique<asio::steady_timer>( - this->getIoService()); - round_timer_ = std::make_unique<asio::steady_timer>( - this->getIoService()); + interests_cache_timer_ = + std::make_unique<asio::steady_timer>(this->getIoService()); + round_timer_ = std::make_unique<asio::steady_timer>(this->getIoService()); + setSocketOption(GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 10000U); scheduleRoundTimer(); } @@ -81,13 +83,13 @@ RTCProducerSocket::RTCProducerSocket() bytesProductionRate_(INIT_PACKET_PRODUCTION_RATE * 1400), packetsProductionRate_(INIT_PACKET_PRODUCTION_RATE), perSecondFactor_(MILLI_IN_A_SEC / STATS_INTERVAL_DURATION), - timer_on_(false){ + timer_on_(false) { srand((unsigned int)time(NULL)); prodLabel_ = ((rand() % 255) << 24UL); - interests_cache_timer_ = std::make_unique<asio::steady_timer>( - this->getIoService()); - round_timer_ = std::make_unique<asio::steady_timer>( - this->getIoService()); + interests_cache_timer_ = + std::make_unique<asio::steady_timer>(this->getIoService()); + round_timer_ = std::make_unique<asio::steady_timer>(this->getIoService()); + setSocketOption(GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 10000U); scheduleRoundTimer(); } @@ -111,13 +113,13 @@ void RTCProducerSocket::registerPrefix(const Prefix &producer_namespace) { } } -void RTCProducerSocket::scheduleRoundTimer(){ - round_timer_->expires_from_now( - std::chrono::milliseconds(STATS_INTERVAL_DURATION)); - round_timer_->async_wait([this](std::error_code ec) { - if (ec) return; - updateStats(); - }); +void RTCProducerSocket::scheduleRoundTimer() { + round_timer_->expires_from_now( + std::chrono::milliseconds(STATS_INTERVAL_DURATION)); + round_timer_->async_wait([this](std::error_code ec) { + if (ec) return; + updateStats(); + }); } void RTCProducerSocket::updateStats() { @@ -148,111 +150,136 @@ void RTCProducerSocket::produce(std::unique_ptr<utils::MemBuf> &&buffer) { producedBytes_ += (uint32_t)(buffer_size + headerSize_ + TIMESTAMP_LEN); producedPackets_++; - ContentObject content_object(flowName_.setSuffix(currentSeg_)); - + auto content_object = + std::make_shared<ContentObject>(flowName_.setSuffix(currentSeg_.load())); auto payload = utils::MemBuf::create(TIMESTAMP_LEN); memcpy(payload->writableData(), &now, TIMESTAMP_LEN); payload->append(TIMESTAMP_LEN); payload->prependChain(std::move(buffer)); - content_object.appendPayload(std::move(payload)); + content_object->appendPayload(std::move(payload)); + + content_object->setLifetime(500); // XXX this should be set by the APP - content_object.setLifetime(500); // XXX this should be set by the APP + content_object->setPathLabel(prodLabel_); - content_object.setPathLabel(prodLabel_); + output_buffer_.insert(std::static_pointer_cast<ContentObject>( + content_object->shared_from_this())); - if (on_content_object_output_ != VOID_HANDLER) { - on_content_object_output_(*this, content_object); + if (on_content_object_in_output_buffer_) { + on_content_object_in_output_buffer_(*this, *content_object); } - portal_->sendContentObject(content_object); + portal_->sendContentObject(*content_object); - //remove interests from the interest cache if it exists - if(!seqs_map_.empty()){ + if (on_content_object_output_) { + on_content_object_output_(*this, *content_object); + } - utils::SpinLock::Acquire locked(interests_cache_lock_); + uint32_t old_curr = currentSeg_.load(); + currentSeg_ = (currentSeg_.load() + 1) % HICN_MAX_DATA_SEQ; - auto it_seqs = seqs_map_.find(currentSeg_); - if(it_seqs != seqs_map_.end()){ - auto range = timers_map_.equal_range(it_seqs->second); - for(auto it_timers = range.first; it_timers != range.second; it_timers++){ - if(it_timers->second == it_seqs->first){ - timers_map_.erase(it_timers); - break; - } - } - seqs_map_.erase(it_seqs); + // remove interests from the interest cache if it exists + // this generates nacks that will tell to the consumer + // that a new data packet was produced + if (!seqs_map_.empty()) { + utils::SpinLock::Acquire locked(interests_cache_lock_); + for (auto it = seqs_map_.begin(); it != seqs_map_.end(); it++) { + if (it->first != old_curr) sendNack(it->first); } + seqs_map_.clear(); + timers_map_.clear(); } - - currentSeg_ = (currentSeg_ + 1) % HICN_MAX_DATA_SEQ; } void RTCProducerSocket::onInterest(Interest::Ptr &&interest) { uint32_t interestSeg = interest->getName().getSuffix(); uint32_t lifetime = interest->getLifetime(); - if (on_interest_input_ != VOID_HANDLER) { + if (on_interest_input_) { on_interest_input_(*this, *interest); } uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + std::chrono::steady_clock::now().time_since_epoch()) + .count(); - if(interestSeg > HICN_MAX_DATA_SEQ){ + if (interestSeg > HICN_MAX_DATA_SEQ) { sendNack(interestSeg); return; } + const std::shared_ptr<ContentObject> content_object = + output_buffer_.find(*interest); + + if (content_object) { + if (on_interest_satisfied_output_buffer_) { + on_interest_satisfied_output_buffer_(*this, *interest); + } + + if (on_content_object_output_) { + on_content_object_output_(*this, *content_object); + } + + portal_->sendContentObject(*content_object); + return; + } else { + if (on_interest_process_) { + on_interest_process_(*this, *interest); + } + } + // if the production rate is less than MIN_PRODUCTION_RATE we put the // interest in a queue, otherwise we handle it in the usual way - if(packetsProductionRate_.load() < MIN_PRODUCTION_RATE && interestSeg >= currentSeg_){ - + if (packetsProductionRate_.load() < MIN_PRODUCTION_RATE && + interestSeg >= currentSeg_.load()) { utils::SpinLock::Acquire locked(interests_cache_lock_); uint64_t next_timer = ~0; - if(!timers_map_.empty()){ + if (!timers_map_.empty()) { next_timer = timers_map_.begin()->first; } uint64_t expiration = now + (lifetime * LIFETIME_FRACTION); - //check if the seq number exists already + // check if the seq number exists already auto it_seqs = seqs_map_.find(interestSeg); - if(it_seqs != seqs_map_.end()){ - //the seq already exists - if(expiration < it_seqs->second){ + if (it_seqs != seqs_map_.end()) { + // the seq already exists + if (expiration < it_seqs->second) { // we need to update the timer becasue we got a smaller one // 1) remove the entry from the multimap // 2) update this entry auto range = timers_map_.equal_range(it_seqs->second); - for(auto it_timers = range.first; it_timers != range.second; it_timers++){ - if(it_timers->second == it_seqs->first){ + for (auto it_timers = range.first; it_timers != range.second; + it_timers++) { + if (it_timers->second == it_seqs->first) { timers_map_.erase(it_timers); break; } } - timers_map_.insert(std::pair<uint64_t,uint32_t>(expiration, interestSeg)); + timers_map_.insert( + std::pair<uint64_t, uint32_t>(expiration, interestSeg)); it_seqs->second = expiration; - }else{ - //nothing to do here - return; + } else { + // nothing to do here + return; } - }else{ + } else { // add the new seq - timers_map_.insert(std::pair<uint64_t,uint32_t>(expiration, interestSeg)); - seqs_map_.insert(std::pair<uint32_t,uint64_t>(interestSeg, expiration)); + timers_map_.insert( + std::pair<uint64_t, uint32_t>(expiration, interestSeg)); + seqs_map_.insert(std::pair<uint32_t, uint64_t>(interestSeg, expiration)); } - //here we have at least one interest in the queue, we need to start or - //update the timer - if(!timer_on_){ - //set timeout + // here we have at least one interest in the queue, we need to start or + // update the timer + if (!timer_on_) { + // set timeout timer_on_ = true; scheduleCacheTimer(timers_map_.begin()->first - now); } else { - //re-schedule the timer because a new interest will expires sooner - if(next_timer > timers_map_.begin()->first){ + // re-schedule the timer because a new interest will expires sooner + if (next_timer > timers_map_.begin()->first) { interests_cache_timer_->cancel(); scheduleCacheTimer(timers_map_.begin()->first - now); } @@ -265,44 +292,44 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) { 1000.0) * (double)packetsProductionRate_.load())); - if (interestSeg < currentSeg_ || interestSeg > (max_gap + currentSeg_)) { + if (interestSeg < currentSeg_.load() || + interestSeg > (max_gap + currentSeg_.load())) { sendNack(interestSeg); } // else drop packet } -void RTCProducerSocket::scheduleCacheTimer(uint64_t wait){ - interests_cache_timer_->expires_from_now( - std::chrono::milliseconds(wait)); +void RTCProducerSocket::scheduleCacheTimer(uint64_t wait) { + interests_cache_timer_->expires_from_now(std::chrono::milliseconds(wait)); interests_cache_timer_->async_wait([this](std::error_code ec) { - if (ec) return; - interestCacheTimer(); - }); + if (ec) return; + interestCacheTimer(); + }); } -void RTCProducerSocket::interestCacheTimer(){ +void RTCProducerSocket::interestCacheTimer() { uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + std::chrono::steady_clock::now().time_since_epoch()) + .count(); utils::SpinLock::Acquire locked(interests_cache_lock_); - for(auto it_timers = timers_map_.begin(); it_timers != timers_map_.end();){ + for (auto it_timers = timers_map_.begin(); it_timers != timers_map_.end();) { uint64_t expire = it_timers->first; - if(expire <= now){ + if (expire <= now) { uint32_t seq = it_timers->second; sendNack(seq); - //remove the interest from the other map + // remove the interest from the other map seqs_map_.erase(seq); it_timers = timers_map_.erase(it_timers); - }else{ - //stop, we are done! + } else { + // stop, we are done! break; } } - if(timers_map_.empty()){ + if (timers_map_.empty()) { timer_on_ = false; - }else{ + } else { timer_on_ = true; scheduleCacheTimer(timers_map_.begin()->first - now); } @@ -317,14 +344,14 @@ void RTCProducerSocket::sendNack(uint32_t sequence) { nack.setName(flowName_.setSuffix(sequence)); uint32_t *payload_ptr = (uint32_t *)nack.getPayload()->data(); - *payload_ptr = currentSeg_; + *payload_ptr = currentSeg_.load(); *(++payload_ptr) = bytesProductionRate_.load(); nack.setLifetime(0); nack.setPathLabel(prodLabel_); - if (on_content_object_output_ != VOID_HANDLER) { + if (on_content_object_output_) { on_content_object_output_(*this, nack); } diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h index a2540ceef..37ba88d8a 100644 --- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h +++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h @@ -47,7 +47,7 @@ class RTCProducerSocket : public ProducerSocket { void scheduleRoundTimer(); void interestCacheTimer(); - uint32_t currentSeg_; + std::atomic<uint32_t> currentSeg_; uint32_t prodLabel_; uint16_t headerSize_; Name flowName_; diff --git a/libtransport/src/hicn/transport/interfaces/socket.h b/libtransport/src/hicn/transport/interfaces/socket.h index 90f6a3ef6..f0194880a 100644 --- a/libtransport/src/hicn/transport/interfaces/socket.h +++ b/libtransport/src/hicn/transport/interfaces/socket.h @@ -27,8 +27,6 @@ #define SOCKET_OPTION_NOT_SET 3 #define SOCKET_OPTION_DEFAULT 12345 -#define VOID_HANDLER 0 - namespace transport { namespace interface { diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc index e1afd2161..14cd27b6b 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc +++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc @@ -45,7 +45,6 @@ ConsumerSocket::ConsumerSocket(int protocol, asio::io_service &io_service) rate_estimation_alpha_(default_values::rate_alpha), rate_estimation_observer_(nullptr), rate_estimation_choice_(0), - is_async_(false), verifier_(std::make_shared<utils::Verifier>()), verify_signature_(false), on_interest_output_(VOID_HANDLER), @@ -58,8 +57,8 @@ ConsumerSocket::ConsumerSocket(int protocol, asio::io_service &io_service) stats_summary_(VOID_HANDLER), read_callback_(nullptr), virtual_download_(false), - rtt_stats_(false), - timer_interval_milliseconds_(0) { + timer_interval_milliseconds_(0), + guard_raaqm_params_() { switch (protocol) { case TransportProtocolAlgorithms::CBR: transport_protocol_ = std::make_unique<CbrTransportProtocol>(this); @@ -88,7 +87,6 @@ int ConsumerSocket::consume(const Name &name) { network_name_ = name; network_name_.setSuffix(0); - is_async_ = false; transport_protocol_->start(); @@ -100,7 +98,6 @@ int ConsumerSocket::asyncConsume(const Name &name) { async_downloader_.add([this, name]() { network_name_ = std::move(name); network_name_.setSuffix(0); - is_async_ = true; transport_protocol_->start(); }); } @@ -108,20 +105,6 @@ int ConsumerSocket::asyncConsume(const Name &name) { return CONSUMER_RUNNING; } -void ConsumerSocket::asyncSendInterest(Interest::Ptr &&interest, - Portal::ConsumerCallback *callback) { - if (!async_downloader_.stopped()) { - // TODO Workaround, to be fixed! - auto i = interest.release(); - async_downloader_.add([this, i, callback]() mutable { - Interest::Ptr _interest(i); - portal_->setConsumerCallback(callback); - portal_->sendInterest(std::move(_interest)); - portal_->runEventsLoop(); - }); - } -} - void ConsumerSocket::stop() { if (transport_protocol_->isRunning()) { transport_protocol_->stop(); @@ -138,6 +121,702 @@ asio::io_service &ConsumerSocket::getIoService() { return portal_->getIoService(); } +// If the thread calling lambda_func is not the same of io_service, this +// function reschedule the function on it +template <typename Lambda, typename arg2> +int ConsumerSocket::rescheduleOnIOService(int socket_option_key, + arg2 socket_option_value, + Lambda lambda_func) { + // To enforce type check + std::function<int(int, arg2)> func = lambda_func; + int result = SOCKET_OPTION_SET; + if (transport_protocol_->isRunning()) { + std::mutex mtx; + /* Condition variable for the wait */ + std::condition_variable cv; + bool done = false; + io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx, + &result, &done, &func]() { + std::unique_lock<std::mutex> lck(mtx); + done = true; + result = func(socket_option_key, socket_option_value); + }); + std::unique_lock<std::mutex> lck(mtx); + if (!done) { + cv.wait(lck); + } + } else { + result = func(socket_option_key, socket_option_value); + } + + return result; +} + +int ConsumerSocket::setSocketOption(int socket_option_key, + ReadCallback *socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, ReadCallback *socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::READ_CALLBACK: + read_callback_ = socket_option_value; + break; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); +} + +int ConsumerSocket::getSocketOption(int socket_option_key, + ReadCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, ReadCallback **socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::READ_CALLBACK: + *socket_option_value = read_callback_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); +} + +int ConsumerSocket::setSocketOption(int socket_option_key, + double socket_option_value) { + utils::SpinLock::Acquire locked(guard_raaqm_params_); + switch (socket_option_key) { + case MIN_WINDOW_SIZE: + min_window_size_ = socket_option_value; + break; + + case MAX_WINDOW_SIZE: + max_window_size_ = socket_option_value; + break; + + case CURRENT_WINDOW_SIZE: + current_window_size_ = socket_option_value; + break; + + case GAMMA_VALUE: + gamma_ = socket_option_value; + break; + + case BETA_VALUE: + beta_ = socket_option_value; + break; + + case DROP_FACTOR: + drop_factor_ = socket_option_value; + break; + + case MINIMUM_DROP_PROBABILITY: + minimum_drop_probability_ = socket_option_value; + break; + + case RATE_ESTIMATION_ALPHA: + if (socket_option_value >= 0 && socket_option_value < 1) { + rate_estimation_alpha_ = socket_option_value; + } else { + rate_estimation_alpha_ = default_values::alpha; + } + break; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; +} + +int ConsumerSocket::setSocketOption(int socket_option_key, + uint32_t socket_option_value) { + utils::SpinLock::Acquire locked(guard_raaqm_params_); + switch (socket_option_key) { + case GeneralTransportOptions::MAX_INTEREST_RETX: + max_retransmissions_ = socket_option_value; + break; + + case GeneralTransportOptions::INTEREST_LIFETIME: + interest_lifetime_ = socket_option_value; + break; + + case RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER: + if (socket_option_value > 0) { + rate_estimation_batching_parameter_ = socket_option_value; + } else { + rate_estimation_batching_parameter_ = default_values::batch; + } + break; + + case RateEstimationOptions::RATE_ESTIMATION_CHOICE: + if (socket_option_value > 0) { + rate_estimation_choice_ = socket_option_value; + } else { + rate_estimation_choice_ = default_values::rate_choice; + } + break; + + case GeneralTransportOptions::STATS_INTERVAL: + timer_interval_milliseconds_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; +} + +int ConsumerSocket::setSocketOption(int socket_option_key, + std::nullptr_t socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, std::nullptr_t socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION: + if (socket_option_value == VOID_HANDLER) { + on_interest_retransmission_ = VOID_HANDLER; + break; + } + + case ConsumerCallbacksOptions::INTEREST_EXPIRED: + if (socket_option_value == VOID_HANDLER) { + on_interest_timeout_ = VOID_HANDLER; + break; + } + + case ConsumerCallbacksOptions::INTEREST_SATISFIED: + if (socket_option_value == VOID_HANDLER) { + on_interest_satisfied_ = VOID_HANDLER; + break; + } + + case ConsumerCallbacksOptions::INTEREST_OUTPUT: + if (socket_option_value == VOID_HANDLER) { + on_interest_output_ = VOID_HANDLER; + break; + } + + case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT: + if (socket_option_value == VOID_HANDLER) { + on_content_object_input_ = VOID_HANDLER; + break; + } + + case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY: + if (socket_option_value == VOID_HANDLER) { + on_content_object_verification_ = VOID_HANDLER; + break; + } + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); +} + +int ConsumerSocket::setSocketOption(int socket_option_key, + bool socket_option_value) { + int result = SOCKET_OPTION_NOT_SET; + if (!transport_protocol_->isRunning()) { + switch (socket_option_key) { + case OtherOptions::VIRTUAL_DOWNLOAD: + virtual_download_ = socket_option_value; + result = SOCKET_OPTION_SET; + break; + + case GeneralTransportOptions::VERIFY_SIGNATURE: + verify_signature_ = socket_option_value; + result = SOCKET_OPTION_SET; + break; + + default: + return result; + } + } + return result; +} + +int ConsumerSocket::setSocketOption( + int socket_option_key, ConsumerContentObjectCallback socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ConsumerContentObjectCallback socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT: + on_content_object_input_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); +} + +int ConsumerSocket::setSocketOption( + int socket_option_key, + ConsumerContentObjectVerificationCallback socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ConsumerContentObjectVerificationCallback socket_option_value) + -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY: + on_content_object_verification_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); +} + +int ConsumerSocket::setSocketOption( + int socket_option_key, ConsumerInterestCallback socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ConsumerInterestCallback socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION: + on_interest_retransmission_ = socket_option_value; + break; + + case ConsumerCallbacksOptions::INTEREST_OUTPUT: + on_interest_output_ = socket_option_value; + break; + + case ConsumerCallbacksOptions::INTEREST_EXPIRED: + on_interest_timeout_ = socket_option_value; + break; + + case ConsumerCallbacksOptions::INTEREST_SATISFIED: + on_interest_satisfied_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); +} + +int ConsumerSocket::setSocketOption( + int socket_option_key, ConsumerManifestCallback socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ConsumerManifestCallback socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::MANIFEST_INPUT: + on_manifest_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); +} + +int ConsumerSocket::setSocketOption(int socket_option_key, + IcnObserver *socket_option_value) { + utils::SpinLock::Acquire locked(guard_raaqm_params_); + switch (socket_option_key) { + case RateEstimationOptions::RATE_ESTIMATION_OBSERVER: + rate_estimation_observer_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; +} + +int ConsumerSocket::setSocketOption( + int socket_option_key, + const std::shared_ptr<utils::Verifier> &socket_option_value) { + int result = SOCKET_OPTION_NOT_SET; + if (!transport_protocol_->isRunning()) { + switch (socket_option_key) { + case GeneralTransportOptions::VERIFIER: + verifier_ = socket_option_value; + result = SOCKET_OPTION_SET; + break; + default: + return result; + } + } + + return result; +} + +int ConsumerSocket::setSocketOption(int socket_option_key, + const std::string &socket_option_value) { + int result = SOCKET_OPTION_NOT_SET; + if (!transport_protocol_->isRunning()) { + switch (socket_option_key) { + case GeneralTransportOptions::CERTIFICATE: + key_id_ = verifier_->addKeyFromCertificate(socket_option_value); + + if (key_id_ != nullptr) { + result = SOCKET_OPTION_SET; + } + break; + + case DataLinkOptions::OUTPUT_INTERFACE: + output_interface_ = socket_option_value; + portal_->setOutputInterface(output_interface_); + result = SOCKET_OPTION_SET; + break; + + default: + return result; + } + } + return result; +} + +int ConsumerSocket::setSocketOption(int socket_option_key, + ConsumerTimerCallback socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ConsumerTimerCallback socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::STATS_SUMMARY: + stats_summary_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); +} + +int ConsumerSocket::getSocketOption(int socket_option_key, + double &socket_option_value) { + utils::SpinLock::Acquire locked(guard_raaqm_params_); + switch (socket_option_key) { + case GeneralTransportOptions::MIN_WINDOW_SIZE: + socket_option_value = min_window_size_; + break; + + case GeneralTransportOptions::MAX_WINDOW_SIZE: + socket_option_value = max_window_size_; + break; + + case GeneralTransportOptions::CURRENT_WINDOW_SIZE: + socket_option_value = current_window_size_; + break; + + // RAAQM parameters + + case RaaqmTransportOptions::GAMMA_VALUE: + socket_option_value = gamma_; + break; + + case RaaqmTransportOptions::BETA_VALUE: + socket_option_value = beta_; + break; + + case RaaqmTransportOptions::DROP_FACTOR: + socket_option_value = drop_factor_; + break; + + case RaaqmTransportOptions::MINIMUM_DROP_PROBABILITY: + socket_option_value = minimum_drop_probability_; + break; + + case RateEstimationOptions::RATE_ESTIMATION_ALPHA: + socket_option_value = rate_estimation_alpha_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + +int ConsumerSocket::getSocketOption(int socket_option_key, + uint32_t &socket_option_value) { + utils::SpinLock::Acquire locked(guard_raaqm_params_); + switch (socket_option_key) { + case GeneralTransportOptions::MAX_INTEREST_RETX: + socket_option_value = max_retransmissions_; + break; + + case GeneralTransportOptions::INTEREST_LIFETIME: + socket_option_value = interest_lifetime_; + break; + + case RaaqmTransportOptions::SAMPLE_NUMBER: + socket_option_value = sample_number_; + break; + + case RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER: + socket_option_value = rate_estimation_batching_parameter_; + break; + + case RateEstimationOptions::RATE_ESTIMATION_CHOICE: + socket_option_value = rate_estimation_choice_; + break; + + case GeneralTransportOptions::STATS_INTERVAL: + socket_option_value = timer_interval_milliseconds_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + +int ConsumerSocket::getSocketOption(int socket_option_key, + bool &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::RUNNING: + socket_option_value = transport_protocol_->isRunning(); + break; + + case OtherOptions::VIRTUAL_DOWNLOAD: + socket_option_value = virtual_download_; + break; + + case GeneralTransportOptions::VERIFY_SIGNATURE: + socket_option_value = verify_signature_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + +int ConsumerSocket::getSocketOption(int socket_option_key, + Name **socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::NETWORK_NAME: + *socket_option_value = &network_name_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + +int ConsumerSocket::getSocketOption( + int socket_option_key, + ConsumerContentObjectCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ConsumerContentObjectCallback **socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT: + *socket_option_value = &on_content_object_input_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); +} + +int ConsumerSocket::getSocketOption( + int socket_option_key, + ConsumerContentObjectVerificationCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ConsumerContentObjectVerificationCallback **socket_option_value) + -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY: + *socket_option_value = &on_content_object_verification_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); +} + +int ConsumerSocket::getSocketOption( + int socket_option_key, ConsumerInterestCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ConsumerInterestCallback **socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION: + *socket_option_value = &on_interest_retransmission_; + break; + + case ConsumerCallbacksOptions::INTEREST_OUTPUT: + *socket_option_value = &on_interest_output_; + break; + + case ConsumerCallbacksOptions::INTEREST_EXPIRED: + *socket_option_value = &on_interest_timeout_; + break; + + case ConsumerCallbacksOptions::INTEREST_SATISFIED: + *socket_option_value = &on_interest_satisfied_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); +} + +int ConsumerSocket::getSocketOption( + int socket_option_key, ConsumerManifestCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ConsumerManifestCallback **socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::MANIFEST_INPUT: + *socket_option_value = &on_manifest_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); +} + +int ConsumerSocket::getSocketOption( + int socket_option_key, std::shared_ptr<Portal> &socket_option_value) { + switch (socket_option_key) { + case PORTAL: + socket_option_value = portal_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + +int ConsumerSocket::getSocketOption(int socket_option_key, + IcnObserver **socket_option_value) { + utils::SpinLock::Acquire locked(guard_raaqm_params_); + switch (socket_option_key) { + case RateEstimationOptions::RATE_ESTIMATION_OBSERVER: + *socket_option_value = (rate_estimation_observer_); + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + +int ConsumerSocket::getSocketOption( + int socket_option_key, + std::shared_ptr<utils::Verifier> &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::VERIFIER: + socket_option_value = verifier_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + +int ConsumerSocket::getSocketOption(int socket_option_key, + std::string &socket_option_value) { + switch (socket_option_key) { + case DataLinkOptions::OUTPUT_INTERFACE: + socket_option_value = output_interface_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + +int ConsumerSocket::getSocketOption( + int socket_option_key, ConsumerTimerCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ConsumerTimerCallback **socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::STATS_SUMMARY: + *socket_option_value = &stats_summary_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); +} + } // namespace interface } // end namespace transport
\ No newline at end of file diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.h b/libtransport/src/hicn/transport/interfaces/socket_consumer.h index 8f7a9718c..e3620b269 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_consumer.h +++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.h @@ -179,20 +179,6 @@ class ConsumerSocket : public BaseSocket { int asyncConsume(const Name &name); /** - * Send an interest asynchronously in another thread, which is the same used - * for asyncConsume. - * - * @param interest - An Interest::Ptr to the interest. Notice that the - * application looses the ownership of the interest, which is transferred to - * the library itself. - * @param callback - A ConsumerCallback containing the events to be trigger in - * case of timeout or content reception. - * - */ - void asyncSendInterest(Interest::Ptr &&interest, - Portal::ConsumerCallback *callback); - - /** * Stops the consumer socket. If several downloads are queued (using * asyncConsume), this call stops just the current one. */ @@ -211,595 +197,94 @@ class ConsumerSocket : public BaseSocket { */ asio::io_service &getIoService() override; - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, ReadCallback *socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::READ_CALLBACK: - read_callback_ = socket_option_value; - break; - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, ReadCallback **socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::READ_CALLBACK: - *socket_option_value = read_callback_; - break; - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption(int socket_option_key, - double socket_option_value) { - switch (socket_option_key) { - case MIN_WINDOW_SIZE: - min_window_size_ = socket_option_value; - break; - - case MAX_WINDOW_SIZE: - max_window_size_ = socket_option_value; - break; - - case CURRENT_WINDOW_SIZE: - current_window_size_ = socket_option_value; - break; - - case GAMMA_VALUE: - gamma_ = socket_option_value; - break; - - case BETA_VALUE: - beta_ = socket_option_value; - break; - - case DROP_FACTOR: - drop_factor_ = socket_option_value; - break; - - case MINIMUM_DROP_PROBABILITY: - minimum_drop_probability_ = socket_option_value; - break; - - case RATE_ESTIMATION_ALPHA: - if (socket_option_value >= 0 && socket_option_value < 1) { - rate_estimation_alpha_ = socket_option_value; - } else { - rate_estimation_alpha_ = default_values::alpha; - } - break; - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption(int socket_option_key, - uint32_t socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::INPUT_BUFFER_SIZE: - input_buffer_size_ = socket_option_value; - break; - - case GeneralTransportOptions::OUTPUT_BUFFER_SIZE: - output_buffer_size_ = socket_option_value; - break; - - case GeneralTransportOptions::MAX_INTEREST_RETX: - max_retransmissions_ = socket_option_value; - break; - - case GeneralTransportOptions::INTEREST_LIFETIME: - interest_lifetime_ = socket_option_value; - break; - - case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION: - if (socket_option_value == VOID_HANDLER) { - on_interest_retransmission_ = VOID_HANDLER; - break; - } - - case ConsumerCallbacksOptions::INTEREST_EXPIRED: - if (socket_option_value == VOID_HANDLER) { - on_interest_timeout_ = VOID_HANDLER; - break; - } - - case ConsumerCallbacksOptions::INTEREST_SATISFIED: - if (socket_option_value == VOID_HANDLER) { - on_interest_satisfied_ = VOID_HANDLER; - break; - } - - case ConsumerCallbacksOptions::INTEREST_OUTPUT: - if (socket_option_value == VOID_HANDLER) { - on_interest_output_ = VOID_HANDLER; - break; - } - - case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT: - if (socket_option_value == VOID_HANDLER) { - on_content_object_input_ = VOID_HANDLER; - break; - } - - case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY: - if (socket_option_value == VOID_HANDLER) { - on_content_object_verification_ = VOID_HANDLER; - break; - } - - case RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER: - if (socket_option_value > 0) { - rate_estimation_batching_parameter_ = socket_option_value; - } else { - rate_estimation_batching_parameter_ = default_values::batch; - } - break; - - case RateEstimationOptions::RATE_ESTIMATION_CHOICE: - if (socket_option_value > 0) { - rate_estimation_choice_ = socket_option_value; - } else { - rate_estimation_choice_ = default_values::rate_choice; - } - break; - - case GeneralTransportOptions::STATS_INTERVAL: - timer_interval_milliseconds_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption(int socket_option_key, - bool socket_option_value) { - switch (socket_option_key) { - case OtherOptions::VIRTUAL_DOWNLOAD: - virtual_download_ = socket_option_value; - break; - - case RaaqmTransportOptions::RTT_STATS: - rtt_stats_ = socket_option_value; - break; - - case GeneralTransportOptions::VERIFY_SIGNATURE: - verify_signature_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption(int socket_option_key, - Name *socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::NETWORK_NAME: - network_name_ = *socket_option_value; - break; - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, - ConsumerContentObjectCallback socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT: - on_content_object_input_ = socket_option_value; - break; + virtual int setSocketOption(int socket_option_key, + ReadCallback *socket_option_value); + + virtual int getSocketOption(int socket_option_key, + ReadCallback **socket_option_value); - default: - return SOCKET_OPTION_NOT_SET; - } + virtual int setSocketOption(int socket_option_key, + double socket_option_value); - return SOCKET_OPTION_SET; - } + virtual int setSocketOption(int socket_option_key, + uint32_t socket_option_value); - TRANSPORT_ALWAYS_INLINE int setSocketOption( + virtual int setSocketOption(int socket_option_key, + std::nullptr_t socket_option_value); + + virtual int setSocketOption(int socket_option_key, bool socket_option_value); + + virtual int setSocketOption( + int socket_option_key, ConsumerContentObjectCallback socket_option_value); + + virtual int setSocketOption( int socket_option_key, - ConsumerContentObjectVerificationCallback socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY: - on_content_object_verification_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, ConsumerInterestCallback socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION: - on_interest_retransmission_ = socket_option_value; - break; - - case ConsumerCallbacksOptions::INTEREST_OUTPUT: - on_interest_output_ = socket_option_value; - break; - - case ConsumerCallbacksOptions::INTEREST_EXPIRED: - on_interest_timeout_ = socket_option_value; - break; - - case ConsumerCallbacksOptions::INTEREST_SATISFIED: - on_interest_satisfied_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, ConsumerManifestCallback socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::MANIFEST_INPUT: - on_manifest_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, IcnObserver *socket_option_value) { - switch (socket_option_key) { - case RateEstimationOptions::RATE_ESTIMATION_OBSERVER: - rate_estimation_observer_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption( + ConsumerContentObjectVerificationCallback socket_option_value); + + virtual int setSocketOption(int socket_option_key, + ConsumerInterestCallback socket_option_value); + + virtual int setSocketOption(int socket_option_key, + ConsumerManifestCallback socket_option_value); + + virtual int setSocketOption(int socket_option_key, + IcnObserver *socket_option_value); + + virtual int setSocketOption( int socket_option_key, - const std::shared_ptr<utils::Verifier> &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::VERIFIER: - verifier_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, const std::string &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::CERTIFICATE: - key_id_ = verifier_->addKeyFromCertificate(socket_option_value); - - if (key_id_ != nullptr) { - break; - } - - case DataLinkOptions::OUTPUT_INTERFACE: - output_interface_ = socket_option_value; - portal_->setOutputInterface(output_interface_); - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, ConsumerTimerCallback socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::STATS_SUMMARY: - stats_summary_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption(int socket_option_key, - double &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::MIN_WINDOW_SIZE: - socket_option_value = min_window_size_; - break; - - case GeneralTransportOptions::MAX_WINDOW_SIZE: - socket_option_value = max_window_size_; - break; - - case GeneralTransportOptions::CURRENT_WINDOW_SIZE: - socket_option_value = current_window_size_; - break; - - // RAAQM parameters - - case RaaqmTransportOptions::GAMMA_VALUE: - socket_option_value = gamma_; - break; - - case RaaqmTransportOptions::BETA_VALUE: - socket_option_value = beta_; - break; - - case RaaqmTransportOptions::DROP_FACTOR: - socket_option_value = drop_factor_; - break; - - case RaaqmTransportOptions::MINIMUM_DROP_PROBABILITY: - socket_option_value = minimum_drop_probability_; - break; - - case RateEstimationOptions::RATE_ESTIMATION_ALPHA: - socket_option_value = rate_estimation_alpha_; - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption(int socket_option_key, - uint32_t &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::INPUT_BUFFER_SIZE: - socket_option_value = (uint32_t)input_buffer_size_; - break; - - case GeneralTransportOptions::OUTPUT_BUFFER_SIZE: - socket_option_value = (uint32_t)output_buffer_size_; - break; - - case GeneralTransportOptions::MAX_INTEREST_RETX: - socket_option_value = max_retransmissions_; - break; - - case GeneralTransportOptions::INTEREST_LIFETIME: - socket_option_value = interest_lifetime_; - break; - - case RaaqmTransportOptions::SAMPLE_NUMBER: - socket_option_value = sample_number_; - break; - - case RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER: - socket_option_value = rate_estimation_batching_parameter_; - break; - - case RateEstimationOptions::RATE_ESTIMATION_CHOICE: - socket_option_value = rate_estimation_choice_; - break; - - case GeneralTransportOptions::STATS_INTERVAL: - socket_option_value = timer_interval_milliseconds_; - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption(int socket_option_key, - bool &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::ASYNC_MODE: - socket_option_value = is_async_; - break; + const std::shared_ptr<utils::Verifier> &socket_option_value); - case GeneralTransportOptions::RUNNING: - socket_option_value = transport_protocol_->isRunning(); - break; + virtual int setSocketOption(int socket_option_key, + const std::string &socket_option_value); - case OtherOptions::VIRTUAL_DOWNLOAD: - socket_option_value = virtual_download_; - break; - - case RaaqmTransportOptions::RTT_STATS: - socket_option_value = rtt_stats_; - break; + virtual int setSocketOption(int socket_option_key, + ConsumerTimerCallback socket_option_value); - case GeneralTransportOptions::VERIFY_SIGNATURE: - socket_option_value = verify_signature_; - break; + virtual int getSocketOption(int socket_option_key, + double &socket_option_value); - default: - return SOCKET_OPTION_NOT_GET; - } + virtual int getSocketOption(int socket_option_key, + uint32_t &socket_option_value); - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption(int socket_option_key, - Name **socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::NETWORK_NAME: - *socket_option_value = &network_name_; - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, - ConsumerContentObjectCallback **socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT: - *socket_option_value = &on_content_object_input_; - break; + virtual int getSocketOption(int socket_option_key, bool &socket_option_value); - default: - return SOCKET_OPTION_NOT_GET; - } + virtual int getSocketOption(int socket_option_key, + Name **socket_option_value); - return SOCKET_OPTION_GET; - } + virtual int getSocketOption( + int socket_option_key, + ConsumerContentObjectCallback **socket_option_value); - TRANSPORT_ALWAYS_INLINE int getSocketOption( + virtual int getSocketOption( int socket_option_key, - ConsumerContentObjectVerificationCallback **socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY: - *socket_option_value = &on_content_object_verification_; - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, ConsumerInterestCallback **socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION: - *socket_option_value = &on_interest_retransmission_; - break; - - case ConsumerCallbacksOptions::INTEREST_OUTPUT: - *socket_option_value = &on_interest_output_; - break; - - case ConsumerCallbacksOptions::INTEREST_EXPIRED: - *socket_option_value = &on_interest_timeout_; - break; - - case ConsumerCallbacksOptions::INTEREST_SATISFIED: - *socket_option_value = &on_interest_satisfied_; - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, ConsumerManifestCallback **socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::MANIFEST_INPUT: - *socket_option_value = &on_manifest_; - break; - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, std::shared_ptr<Portal> &socket_option_value) { - switch (socket_option_key) { - case PORTAL: - socket_option_value = portal_; - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, IcnObserver **socket_option_value) { - switch (socket_option_key) { - case RateEstimationOptions::RATE_ESTIMATION_OBSERVER: - *socket_option_value = (rate_estimation_observer_); - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption( + ConsumerContentObjectVerificationCallback **socket_option_value); + + virtual int getSocketOption(int socket_option_key, + ConsumerInterestCallback **socket_option_value); + + virtual int getSocketOption(int socket_option_key, + ConsumerManifestCallback **socket_option_value); + + virtual int getSocketOption(int socket_option_key, + std::shared_ptr<Portal> &socket_option_value); + + virtual int getSocketOption(int socket_option_key, + IcnObserver **socket_option_value); + + virtual int getSocketOption( int socket_option_key, - std::shared_ptr<utils::Verifier> &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::VERIFIER: - socket_option_value = verifier_; - break; - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, std::string &socket_option_value) { - switch (socket_option_key) { - case DataLinkOptions::OUTPUT_INTERFACE: - socket_option_value = output_interface_; - break; - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, ConsumerTimerCallback **socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::STATS_SUMMARY: - *socket_option_value = &stats_summary_; - break; - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } + std::shared_ptr<utils::Verifier> &socket_option_value); + + virtual int getSocketOption(int socket_option_key, + std::string &socket_option_value); + + virtual int getSocketOption(int socket_option_key, + ConsumerTimerCallback **socket_option_value); + + protected: + template <typename Lambda, typename arg2> + int rescheduleOnIOService(int socket_option_key, arg2 socket_option_value, + Lambda lambda_func); private: asio::io_service internal_io_service_; @@ -808,6 +293,9 @@ class ConsumerSocket : public BaseSocket { std::shared_ptr<Portal> portal_; utils::EventThread async_downloader_; + // No need to protect from multiple accesses in the async consumer + // The parameter is accessible only with a getSocketOption and + // set from the consume Name network_name_; int interest_lifetime_; @@ -816,8 +304,6 @@ class ConsumerSocket : public BaseSocket { double max_window_size_; double current_window_size_; uint32_t max_retransmissions_; - size_t output_buffer_size_; - size_t input_buffer_size_; // RAAQM Parameters double minimum_drop_probability_; @@ -832,12 +318,10 @@ class ConsumerSocket : public BaseSocket { int rate_estimation_batching_parameter_; int rate_estimation_choice_; - bool is_async_; - // Verification parameters std::shared_ptr<utils::Verifier> verifier_; PARCKeyId *key_id_; - bool verify_signature_; + std::atomic_bool verify_signature_; ConsumerInterestCallback on_interest_retransmission_; ConsumerInterestCallback on_interest_output_; @@ -853,12 +337,13 @@ class ConsumerSocket : public BaseSocket { // Virtual download for traffic generator bool virtual_download_; - bool rtt_stats_; uint32_t timer_interval_milliseconds_; // Transport protocol std::unique_ptr<TransportProtocol> transport_protocol_; + + utils::SpinLock guard_raaqm_params_; }; } // namespace interface diff --git a/libtransport/src/hicn/transport/interfaces/socket_options_default_values.h b/libtransport/src/hicn/transport/interfaces/socket_options_default_values.h index 13029e83a..bcf103b8c 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_options_default_values.h +++ b/libtransport/src/hicn/transport/interfaces/socket_options_default_values.h @@ -31,8 +31,6 @@ static constexpr uint32_t content_object_expiry_time = never_expire_time; // milliseconds -> 50 seconds static constexpr uint32_t content_object_packet_size = 1500; // The ethernet MTU -static constexpr uint32_t producer_socket_input_buffer_size = - 150000; // Interests static constexpr uint32_t producer_socket_output_buffer_size = 150000; // Content Object static constexpr uint32_t log_2_default_buffer_size = 12; diff --git a/libtransport/src/hicn/transport/interfaces/socket_options_keys.h b/libtransport/src/hicn/transport/interfaces/socket_options_keys.h index c21108186..e14f0f412 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_options_keys.h +++ b/libtransport/src/hicn/transport/interfaces/socket_options_keys.h @@ -35,22 +35,21 @@ typedef enum { INTEREST_LIFETIME = 107, CONTENT_OBJECT_EXPIRY_TIME = 108, KEY_LOCATOR = 110, - SIGNATURE_TYPE = 111, - MIN_WINDOW_SIZE = 112, - MAX_WINDOW_SIZE = 113, - CURRENT_WINDOW_SIZE = 114, - ASYNC_MODE = 115, - MAKE_MANIFEST = 116, - PORTAL = 117, - RUNNING = 118, - APPLICATION_BUFFER = 119, - HASH_ALGORITHM = 120, - CRYPTO_SUITE = 121, - IDENTITY = 122, - VERIFIER = 123, - CERTIFICATE = 124, - VERIFY_SIGNATURE = 125, - STATS_INTERVAL = 126 + MIN_WINDOW_SIZE = 111, + MAX_WINDOW_SIZE = 112, + CURRENT_WINDOW_SIZE = 113, + ASYNC_MODE = 114, + MAKE_MANIFEST = 115, + PORTAL = 116, + RUNNING = 117, + APPLICATION_BUFFER = 118, + HASH_ALGORITHM = 119, + CRYPTO_SUITE = 120, + IDENTITY = 121, + VERIFIER = 122, + CERTIFICATE = 123, + VERIFY_SIGNATURE = 124, + STATS_INTERVAL = 125 } GeneralTransportOptions; typedef enum { diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.cc b/libtransport/src/hicn/transport/interfaces/socket_producer.cc index 9ca004c41..f90197490 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_producer.cc +++ b/libtransport/src/hicn/transport/interfaces/socket_producer.cc @@ -37,10 +37,7 @@ ProducerSocket::ProducerSocket(asio::io_service &io_service) output_buffer_(default_values::producer_socket_output_buffer_size), registration_status_(REGISTRATION_NOT_ATTEMPTED), making_manifest_(false), - signature_type_(SHA_256), hash_algorithm_(HashAlgorithm::SHA_256), - input_buffer_capacity_(default_values::producer_socket_input_buffer_size), - input_buffer_size_(0), on_interest_input_(VOID_HANDLER), on_interest_dropped_input_buffer_(VOID_HANDLER), on_interest_inserted_input_buffer_(VOID_HANDLER), @@ -98,22 +95,30 @@ void ProducerSocket::listen() { void ProducerSocket::passContentObjectToCallbacks( const std::shared_ptr<ContentObject> &content_object) { if (content_object) { - if (on_new_segment_ != VOID_HANDLER) { - on_new_segment_(*this, *content_object); + if (on_new_segment_) { + io_service_.dispatch([this, content_object]() { + on_new_segment_(*this, *content_object); + }); } - if (on_content_object_to_sign_ != VOID_HANDLER) { - on_content_object_to_sign_(*this, *content_object); + if (on_content_object_to_sign_) { + io_service_.dispatch([this, content_object]() { + on_content_object_to_sign_(*this, *content_object); + }); } - if (on_content_object_in_output_buffer_ != VOID_HANDLER) { - on_content_object_in_output_buffer_(*this, *content_object); + if (on_content_object_in_output_buffer_) { + io_service_.dispatch([this, content_object]() { + on_content_object_in_output_buffer_(*this, *content_object); + }); } output_buffer_.insert(content_object); - if (on_content_object_output_ != VOID_HANDLER) { - on_content_object_output_(*this, *content_object); + if (on_content_object_output_) { + io_service_.dispatch([this, content_object]() { + on_content_object_output_(*this, *content_object); + }); } portal_->sendContentObject(*content_object); @@ -121,15 +126,19 @@ void ProducerSocket::passContentObjectToCallbacks( } void ProducerSocket::produce(ContentObject &content_object) { - if (on_content_object_in_output_buffer_ != VOID_HANDLER) { - on_content_object_in_output_buffer_(*this, content_object); + if (on_content_object_in_output_buffer_) { + io_service_.dispatch([this, &content_object]() { + on_content_object_in_output_buffer_(*this, content_object); + }); } output_buffer_.insert(std::static_pointer_cast<ContentObject>( content_object.shared_from_this())); - if (on_content_object_output_ != VOID_HANDLER) { - on_content_object_output_(*this, content_object); + if (on_content_object_output_) { + io_service_.dispatch([this, &content_object]() { + on_content_object_output_(*this, content_object); + }); } portal_->sendContentObject(content_object); @@ -142,6 +151,15 @@ uint32_t ProducerSocket::produce(Name content_name, return 0; } + // Copy the atomic variable to ensure always the same value during the a + // production + std::size_t data_packet_size = data_packet_size_; + uint32_t content_object_expiry_time = content_object_expiry_time_; + HashAlgorithm algo = hash_algorithm_; + bool making_manifest = making_manifest_; + std::shared_ptr<utils::Identity> identity; + getSocketOption(GeneralTransportOptions::IDENTITY, identity); + auto buffer_size = buffer->length(); const std::size_t hash_size = 32; @@ -162,7 +180,7 @@ uint32_t ProducerSocket::produce(Name content_name, std::unique_ptr<utils::CryptoHash> zero_hash; // TODO Manifest may still be used for indexing - if (making_manifest_ && !identity_) { + if (making_manifest && !identity) { throw errors::RuntimeException( "Making manifests without setting producer identity. Aborting."); } @@ -182,18 +200,18 @@ uint32_t ProducerSocket::produce(Name content_name, } format = hf_format; - if (making_manifest_) { + if (making_manifest) { format = hf_format; manifest_header_size = core::Packet::getHeaderSizeFromFormat( - hf_format_ah, identity_->getSignatureLength()); - } else if (identity_) { + hf_format_ah, identity->getSignatureLength()); + } else if (identity) { format = hf_format_ah; - signature_length = identity_->getSignatureLength(); + signature_length = identity->getSignatureLength(); } header_size = core::Packet::getHeaderSizeFromFormat(format, signature_length); - free_space_for_content = data_packet_size_ - header_size; + free_space_for_content = data_packet_size - header_size; uint32_t number_of_segments = uint32_t(std::ceil(double(buffer_size) / double(free_space_for_content))); @@ -204,9 +222,9 @@ uint32_t ProducerSocket::produce(Name content_name, // TODO allocate space for all the headers - if (making_manifest_) { + if (making_manifest) { auto segment_in_manifest = static_cast<float>( - std::floor(double(data_packet_size_ - manifest_header_size - + std::floor(double(data_packet_size - manifest_header_size - ContentObjectManifest::getManifestHeaderSize()) / (4.0 + 32.0)) - 1.0); @@ -219,8 +237,8 @@ uint32_t ProducerSocket::produce(Name content_name, core::ManifestVersion::VERSION_1, core::ManifestType::INLINE_MANIFEST, hash_algorithm_, is_last_manifest, content_name, core::NextSegmentCalculationStrategy::INCREMENTAL, - identity_->getSignatureLength())); - manifest->setLifetime(content_object_expiry_time_); + identity->getSignatureLength())); + manifest->setLifetime(content_object_expiry_time); if (is_last) { manifest->setFinalBlockNumber(final_block_number); @@ -231,21 +249,21 @@ uint32_t ProducerSocket::produce(Name content_name, uint8_t hash[hash_size]; std::memset(hash, 0, hash_size); zero_hash = std::make_unique<utils::CryptoHash>( - hash, hash_size, static_cast<utils::CryptoHashType>(hash_algorithm_)); + hash, hash_size, static_cast<utils::CryptoHashType>(algo)); } for (unsigned int packaged_segments = 0; packaged_segments < number_of_segments; packaged_segments++) { - if (making_manifest_) { + if (making_manifest) { if (manifest->estimateManifestSize(2) > - data_packet_size_ - manifest_header_size) { + data_packet_size - manifest_header_size) { // Add next manifest manifest->addSuffixHash(current_segment, *zero_hash); // Send the current manifest manifest->encode(); - identity_->getSigner().sign(*manifest); + identity->getSigner().sign(*manifest); passContentObjectToCallbacks(manifest); @@ -258,8 +276,8 @@ uint32_t ProducerSocket::produce(Name content_name, core::ManifestType::INLINE_MANIFEST, hash_algorithm_, is_last_manifest, content_name, core::NextSegmentCalculationStrategy::INCREMENTAL, - identity_->getSignatureLength())); - manifest->setLifetime(content_object_expiry_time_); + identity->getSignatureLength())); + manifest->setLifetime(content_object_expiry_time); if (is_last) { manifest->setFinalBlockNumber(final_block_number); } else { @@ -271,7 +289,7 @@ uint32_t ProducerSocket::produce(Name content_name, auto content_object = std::make_shared<ContentObject>( content_name.setSuffix(current_segment), format); - content_object->setLifetime(content_object_expiry_time_); + content_object->setLifetime(content_object_expiry_time); auto b = buffer->cloneOne(); b->trimStart(free_space_for_content * packaged_segments); @@ -280,7 +298,7 @@ uint32_t ProducerSocket::produce(Name content_name, b->append(buffer_size - bytes_segmented); bytes_segmented += (int)(buffer_size - bytes_segmented); - if (is_last && making_manifest_) { + if (is_last && making_manifest) { is_last_manifest = true; } else if (is_last) { content_object->setRst(); @@ -293,19 +311,19 @@ uint32_t ProducerSocket::produce(Name content_name, content_object->appendPayload(std::move(b)); - if (making_manifest_) { + if (making_manifest) { using namespace std::chrono_literals; utils::CryptoHash hash = content_object->computeDigest(hash_algorithm_); manifest->addSuffixHash(current_segment, hash); - } else if (identity_) { - identity_->getSigner().sign(*content_object); + } else if (identity) { + identity->getSigner().sign(*content_object); } current_segment++; passContentObjectToCallbacks(content_object); } - if (making_manifest_) { + if (making_manifest) { if (is_last_manifest) { manifest->setFinalManifest(is_last_manifest); } @@ -315,13 +333,15 @@ uint32_t ProducerSocket::produce(Name content_name, } manifest->encode(); - identity_->getSigner().sign(*manifest); + identity->getSigner().sign(*manifest); passContentObjectToCallbacks(manifest); } - if (on_content_produced_ != VOID_HANDLER) { - on_content_produced_(*this, std::make_error_code(std::errc(0)), - buffer_size); + if (on_content_produced_) { + io_service_.dispatch([this, buffer_size]() { + on_content_produced_(*this, std::make_error_code(std::errc(0)), + buffer_size); + }); } return current_segment - start_offset; @@ -347,7 +367,7 @@ void ProducerSocket::asyncProduce(const Name &suffix, const uint8_t *buf, } void ProducerSocket::onInterest(Interest &interest) { - if (on_interest_input_ != VOID_HANDLER) { + if (on_interest_input_) { on_interest_input_(*this, interest); } @@ -355,22 +375,571 @@ void ProducerSocket::onInterest(Interest &interest) { output_buffer_.find(interest); if (content_object) { - if (on_interest_satisfied_output_buffer_ != VOID_HANDLER) { + if (on_interest_satisfied_output_buffer_) { on_interest_satisfied_output_buffer_(*this, interest); } - if (on_content_object_output_ != VOID_HANDLER) { + if (on_content_object_output_) { on_content_object_output_(*this, *content_object); } portal_->sendContentObject(*content_object); } else { - if (on_interest_process_ != VOID_HANDLER) { + if (on_interest_process_) { on_interest_process_(*this, interest); } } } +// If the thread calling lambda_func is not the same of io_service, this +// function reschedule the function on it +template <typename Lambda, typename arg2> +int ProducerSocket::rescheduleOnIOService(int socket_option_key, + arg2 socket_option_value, + Lambda lambda_func) { + // To enforce type check + std::function<int(int, arg2)> func = lambda_func; + int result = SOCKET_OPTION_SET; + if (listening_thread_.joinable() && + std::this_thread::get_id() != listening_thread_.get_id()) { + std::mutex mtx; + /* Condition variable for the wait */ + std::condition_variable cv; + bool done = false; + io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx, + &result, &done, &func]() { + std::unique_lock<std::mutex> lck(mtx); + done = true; + result = func(socket_option_key, socket_option_value); + }); + std::unique_lock<std::mutex> lck(mtx); + if (!done) { + cv.wait(lck); + } + } else { + result = func(socket_option_key, socket_option_value); + } + + return result; +} + +// If the thread calling lambda_func is not the same of io_service, this +// function reschedule the function on it +template <typename Lambda, typename arg2> +int ProducerSocket::rescheduleOnIOServiceWithReference( + int socket_option_key, arg2 &socket_option_value, Lambda lambda_func) { + // To enforce type check + std::function<int(int, arg2 &)> func = lambda_func; + int result = SOCKET_OPTION_SET; + if (listening_thread_.joinable() && + std::this_thread::get_id() != this->listening_thread_.get_id()) { + std::mutex mtx; + /* Condition variable for the wait */ + std::condition_variable cv; + std::unique_lock<std::mutex> lck(mtx); + bool done = false; + io_service_.dispatch([this, &socket_option_key, &socket_option_value, &mtx, + &cv, &result, &done, &func]() { + std::unique_lock<std::mutex> lck(mtx); + done = true; + result = func(socket_option_key, socket_option_value); + + if (!done) { + cv.wait(lck); + } + }); + } else { + result = func(socket_option_key, socket_option_value); + } + + return result; +} + +int ProducerSocket::setSocketOption(int socket_option_key, + uint32_t socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::DATA_PACKET_SIZE: + if (socket_option_value < default_values::max_content_object_size && + socket_option_value > 0) { + data_packet_size_ = socket_option_value; + break; + } + + case GeneralTransportOptions::OUTPUT_BUFFER_SIZE: + output_buffer_.setLimit(socket_option_value); + break; + + case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME: + content_object_expiry_time_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; +} + +int ProducerSocket::setSocketOption(int socket_option_key, + std::nullptr_t socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerContentObjectCallback socket_option_value) -> int { + switch (socket_option_key) { + case ProducerCallbacksOptions::INTEREST_INPUT: + if (socket_option_value == VOID_HANDLER) { + on_interest_input_ = VOID_HANDLER; + break; + } + + case ProducerCallbacksOptions::INTEREST_DROP: + if (socket_option_value == VOID_HANDLER) { + on_interest_dropped_input_buffer_ = VOID_HANDLER; + break; + } + + case ProducerCallbacksOptions::INTEREST_PASS: + if (socket_option_value == VOID_HANDLER) { + on_interest_inserted_input_buffer_ = VOID_HANDLER; + break; + } + + case ProducerCallbacksOptions::CACHE_HIT: + if (socket_option_value == VOID_HANDLER) { + on_interest_satisfied_output_buffer_ = VOID_HANDLER; + break; + } + + case ProducerCallbacksOptions::CACHE_MISS: + if (socket_option_value == VOID_HANDLER) { + on_interest_process_ = VOID_HANDLER; + break; + } + + case ProducerCallbacksOptions::NEW_CONTENT_OBJECT: + if (socket_option_value == VOID_HANDLER) { + on_new_segment_ = VOID_HANDLER; + break; + } + + case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN: + if (socket_option_value == VOID_HANDLER) { + on_content_object_to_sign_ = VOID_HANDLER; + break; + } + + case ProducerCallbacksOptions::CONTENT_OBJECT_READY: + if (socket_option_value == VOID_HANDLER) { + on_content_object_in_output_buffer_ = VOID_HANDLER; + break; + } + + case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT: + if (socket_option_value == VOID_HANDLER) { + on_content_object_output_ = VOID_HANDLER; + break; + } + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); +} + +int ProducerSocket::setSocketOption(int socket_option_key, + bool socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::MAKE_MANIFEST: + making_manifest_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; +} + +int ProducerSocket::setSocketOption(int socket_option_key, + Name *socket_option_value) { + return SOCKET_OPTION_NOT_SET; +} + +int ProducerSocket::setSocketOption(int socket_option_key, + std::list<Prefix> socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::NETWORK_NAME: + served_namespaces_ = socket_option_value; + break; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; +} + +int ProducerSocket::setSocketOption( + int socket_option_key, ProducerContentObjectCallback socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerContentObjectCallback socket_option_value) -> int { + switch (socket_option_key) { + case ProducerCallbacksOptions::NEW_CONTENT_OBJECT: + on_new_segment_ = socket_option_value; + break; + + case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN: + on_content_object_to_sign_ = socket_option_value; + break; + + case ProducerCallbacksOptions::CONTENT_OBJECT_READY: + on_content_object_in_output_buffer_ = socket_option_value; + break; + + case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT: + on_content_object_output_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); +} + +int ProducerSocket::setSocketOption( + int socket_option_key, ProducerInterestCallback socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerInterestCallback socket_option_value) -> int { + switch (socket_option_key) { + case ProducerCallbacksOptions::INTEREST_INPUT: + on_interest_input_ = socket_option_value; + break; + + case ProducerCallbacksOptions::INTEREST_DROP: + on_interest_dropped_input_buffer_ = socket_option_value; + break; + + case ProducerCallbacksOptions::INTEREST_PASS: + on_interest_inserted_input_buffer_ = socket_option_value; + break; + + case ProducerCallbacksOptions::CACHE_HIT: + on_interest_satisfied_output_buffer_ = socket_option_value; + break; + + case ProducerCallbacksOptions::CACHE_MISS: + on_interest_process_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); +} + +int ProducerSocket::setSocketOption( + int socket_option_key, ProducerContentCallback socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerContentCallback socket_option_value) -> int { + switch (socket_option_key) { + case ProducerCallbacksOptions::CONTENT_PRODUCED: + on_content_produced_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); +} + +int ProducerSocket::setSocketOption(int socket_option_key, + HashAlgorithm socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::HASH_ALGORITHM: + hash_algorithm_ = socket_option_value; + break; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; +} + +int ProducerSocket::setSocketOption(int socket_option_key, + utils::CryptoSuite socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::CRYPTO_SUITE: + crypto_suite_ = socket_option_value; + break; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; +} + +int ProducerSocket::setSocketOption( + int socket_option_key, + const std::shared_ptr<utils::Identity> &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::IDENTITY: { + utils::SpinLock::Acquire locked(identity_lock_); + identity_.reset(); + identity_ = socket_option_value; + } break; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; +} + +int ProducerSocket::setSocketOption(int socket_option_key, + const std::string &socket_option_value) { + switch (socket_option_key) { + case DataLinkOptions::OUTPUT_INTERFACE: + output_interface_ = socket_option_value; + portal_->setOutputInterface(output_interface_); + break; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; +} + +int ProducerSocket::getSocketOption(int socket_option_key, + uint32_t &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::OUTPUT_BUFFER_SIZE: + socket_option_value = (uint32_t)output_buffer_.getLimit(); + break; + + case GeneralTransportOptions::DATA_PACKET_SIZE: + socket_option_value = (uint32_t)data_packet_size_; + break; + + case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME: + socket_option_value = content_object_expiry_time_; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_GET; +} + +int ProducerSocket::getSocketOption(int socket_option_key, + bool &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::MAKE_MANIFEST: + socket_option_value = making_manifest_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + +int ProducerSocket::getSocketOption(int socket_option_key, + std::list<Prefix> &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::NETWORK_NAME: + socket_option_value = served_namespaces_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + +int ProducerSocket::getSocketOption( + int socket_option_key, + ProducerContentObjectCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerContentObjectCallback **socket_option_value) -> int { + switch (socket_option_key) { + case ProducerCallbacksOptions::NEW_CONTENT_OBJECT: + *socket_option_value = &on_new_segment_; + break; + + case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN: + *socket_option_value = &on_content_object_to_sign_; + break; + + case ProducerCallbacksOptions::CONTENT_OBJECT_READY: + *socket_option_value = &on_content_object_in_output_buffer_; + break; + + case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT: + *socket_option_value = &on_content_object_output_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); +} + +int ProducerSocket::getSocketOption( + int socket_option_key, ProducerContentCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerContentCallback **socket_option_value) -> int { + switch (socket_option_key) { + case ProducerCallbacksOptions::CONTENT_PRODUCED: + *socket_option_value = &on_content_produced_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); +} + +int ProducerSocket::getSocketOption( + int socket_option_key, ProducerInterestCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerInterestCallback **socket_option_value) -> int { + switch (socket_option_key) { + case ProducerCallbacksOptions::INTEREST_INPUT: + *socket_option_value = &on_interest_input_; + break; + + case ProducerCallbacksOptions::INTEREST_DROP: + *socket_option_value = &on_interest_dropped_input_buffer_; + break; + + case ProducerCallbacksOptions::INTEREST_PASS: + *socket_option_value = &on_interest_inserted_input_buffer_; + break; + + case CACHE_HIT: + *socket_option_value = &on_interest_satisfied_output_buffer_; + break; + + case CACHE_MISS: + *socket_option_value = &on_interest_process_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); +} + +int ProducerSocket::getSocketOption( + int socket_option_key, std::shared_ptr<Portal> &socket_option_value) { + switch (socket_option_key) { + case PORTAL: + socket_option_value = portal_; + break; + default: + return SOCKET_OPTION_NOT_GET; + ; + } + + return SOCKET_OPTION_GET; +} + +int ProducerSocket::getSocketOption(int socket_option_key, + HashAlgorithm &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::HASH_ALGORITHM: + socket_option_value = hash_algorithm_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + +int ProducerSocket::getSocketOption(int socket_option_key, + utils::CryptoSuite &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::HASH_ALGORITHM: + socket_option_value = crypto_suite_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + +int ProducerSocket::getSocketOption( + int socket_option_key, + std::shared_ptr<utils::Identity> &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::IDENTITY: { + utils::SpinLock::Acquire locked(identity_lock_); + socket_option_value = identity_; + } break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + +int ProducerSocket::getSocketOption(int socket_option_key, + std::string &socket_option_value) { + switch (socket_option_key) { + case DataLinkOptions::OUTPUT_INTERFACE: + socket_option_value = output_interface_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + asio::io_service &ProducerSocket::getIoService() { return io_service_; } } // namespace interface diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.h b/libtransport/src/hicn/transport/interfaces/socket_producer.h index 18adbf4a7..5c617d761 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_producer.h +++ b/libtransport/src/hicn/transport/interfaces/socket_producer.h @@ -91,500 +91,116 @@ class ProducerSocket : public Socket<BasePortal>, onInterest(*interest); }; - TRANSPORT_ALWAYS_INLINE int setSocketOption(int socket_option_key, - uint32_t socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::DATA_PACKET_SIZE: - if (socket_option_value < default_values::max_content_object_size && - socket_option_value > 0) { - data_packet_size_ = socket_option_value; - break; - } - - case GeneralTransportOptions::INPUT_BUFFER_SIZE: - if (socket_option_value >= 1) { - input_buffer_capacity_ = socket_option_value; - break; - } - - case GeneralTransportOptions::OUTPUT_BUFFER_SIZE: - output_buffer_.setLimit(socket_option_value); - break; - - case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME: - content_object_expiry_time_ = socket_option_value; - break; - - case GeneralTransportOptions::SIGNATURE_TYPE: - if (socket_option_value == SOCKET_OPTION_DEFAULT) { - signature_type_ = SHA_256; - } else { - signature_type_ = socket_option_value; - } - - if (signature_type_ == SHA_256 || signature_type_ == RSA_256) { - signature_size_ = 32; - } - - break; - - case ProducerCallbacksOptions::INTEREST_INPUT: - if (socket_option_value == VOID_HANDLER) { - on_interest_input_ = VOID_HANDLER; - break; - } - - case ProducerCallbacksOptions::INTEREST_DROP: - if (socket_option_value == VOID_HANDLER) { - on_interest_dropped_input_buffer_ = VOID_HANDLER; - break; - } - - case ProducerCallbacksOptions::INTEREST_PASS: - if (socket_option_value == VOID_HANDLER) { - on_interest_inserted_input_buffer_ = VOID_HANDLER; - break; - } - - case ProducerCallbacksOptions::CACHE_HIT: - if (socket_option_value == VOID_HANDLER) { - on_interest_satisfied_output_buffer_ = VOID_HANDLER; - break; - } - - case ProducerCallbacksOptions::CACHE_MISS: - if (socket_option_value == VOID_HANDLER) { - on_interest_process_ = VOID_HANDLER; - break; - } - - case ProducerCallbacksOptions::NEW_CONTENT_OBJECT: - if (socket_option_value == VOID_HANDLER) { - on_new_segment_ = VOID_HANDLER; - break; - } - - case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN: - if (socket_option_value == VOID_HANDLER) { - on_content_object_to_sign_ = VOID_HANDLER; - break; - } - - case ProducerCallbacksOptions::CONTENT_OBJECT_READY: - if (socket_option_value == VOID_HANDLER) { - on_content_object_in_output_buffer_ = VOID_HANDLER; - break; - } - - case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT: - if (socket_option_value == VOID_HANDLER) { - on_content_object_output_ = VOID_HANDLER; - break; - } - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption(int socket_option_key, - bool socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::MAKE_MANIFEST: - making_manifest_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption(int socket_option_key, - Name *socket_option_value) { - return SOCKET_OPTION_NOT_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, std::list<Prefix> socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::NETWORK_NAME: - served_namespaces_ = socket_option_value; - break; - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } + virtual int setSocketOption(int socket_option_key, + uint32_t socket_option_value); - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, - ProducerContentObjectCallback socket_option_value) { - switch (socket_option_key) { - case ProducerCallbacksOptions::NEW_CONTENT_OBJECT: - on_new_segment_ = socket_option_value; - break; - - case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN: - on_content_object_to_sign_ = socket_option_value; - break; - - case ProducerCallbacksOptions::CONTENT_OBJECT_READY: - on_content_object_in_output_buffer_ = socket_option_value; - break; + virtual int setSocketOption(int socket_option_key, + std::nullptr_t socket_option_value); - case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT: - on_content_object_output_ = socket_option_value; - break; + virtual int setSocketOption(int socket_option_key, bool socket_option_value); - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } + virtual int setSocketOption(int socket_option_key, Name *socket_option_value); - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, ProducerInterestCallback socket_option_value) { - switch (socket_option_key) { - case ProducerCallbacksOptions::INTEREST_INPUT: - on_interest_input_ = socket_option_value; - break; + virtual int setSocketOption(int socket_option_key, + std::list<Prefix> socket_option_value); - case ProducerCallbacksOptions::INTEREST_DROP: - on_interest_dropped_input_buffer_ = socket_option_value; - break; + virtual int setSocketOption( + int socket_option_key, ProducerContentObjectCallback socket_option_value); - case ProducerCallbacksOptions::INTEREST_PASS: - on_interest_inserted_input_buffer_ = socket_option_value; - break; + virtual int setSocketOption(int socket_option_key, + ProducerInterestCallback socket_option_value); - case ProducerCallbacksOptions::CACHE_HIT: - on_interest_satisfied_output_buffer_ = socket_option_value; - break; - - case ProducerCallbacksOptions::CACHE_MISS: - on_interest_process_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } + virtual int setSocketOption(int socket_option_key, + ProducerContentCallback socket_option_value); - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, ProducerContentCallback socket_option_value) { - switch (socket_option_key) { - case ProducerCallbacksOptions::CONTENT_PRODUCED: - on_content_produced_ = socket_option_value; - break; + virtual int setSocketOption(int socket_option_key, + HashAlgorithm socket_option_value); - default: - return SOCKET_OPTION_NOT_SET; - } + virtual int setSocketOption(int socket_option_key, + utils::CryptoSuite socket_option_value); - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, HashAlgorithm socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::HASH_ALGORITHM: - hash_algorithm_ = socket_option_value; - break; - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, utils::CryptoSuite socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::CRYPTO_SUITE: - crypto_suite_ = socket_option_value; - break; - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption( + virtual int setSocketOption( int socket_option_key, - const std::shared_ptr<utils::Identity> &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::IDENTITY: - identity_ = socket_option_value; - break; - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, const std::string &socket_option_value) { - switch (socket_option_key) { - case DataLinkOptions::OUTPUT_INTERFACE: - output_interface_ = socket_option_value; - portal_->setOutputInterface(output_interface_); - break; - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - ; - } + const std::shared_ptr<utils::Identity> &socket_option_value); - TRANSPORT_ALWAYS_INLINE int getSocketOption(int socket_option_key, - uint32_t &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::INPUT_BUFFER_SIZE: - socket_option_value = (uint32_t)input_buffer_capacity_; - break; + virtual int setSocketOption(int socket_option_key, + const std::string &socket_option_value); - case GeneralTransportOptions::OUTPUT_BUFFER_SIZE: - socket_option_value = (uint32_t)output_buffer_.getLimit(); - break; + virtual int getSocketOption(int socket_option_key, + uint32_t &socket_option_value); - case GeneralTransportOptions::DATA_PACKET_SIZE: - socket_option_value = (uint32_t)data_packet_size_; - break; + virtual int getSocketOption(int socket_option_key, bool &socket_option_value); - case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME: - socket_option_value = content_object_expiry_time_; - break; - - case GeneralTransportOptions::SIGNATURE_TYPE: - socket_option_value = signature_type_; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption(int socket_option_key, - bool &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::MAKE_MANIFEST: - socket_option_value = making_manifest_; - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } + virtual int getSocketOption(int socket_option_key, + std::list<Prefix> &socket_option_value); - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, std::list<Prefix> &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::NETWORK_NAME: - socket_option_value = served_namespaces_; - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption( + virtual int getSocketOption( int socket_option_key, - ProducerContentObjectCallback **socket_option_value) { - switch (socket_option_key) { - case ProducerCallbacksOptions::NEW_CONTENT_OBJECT: - *socket_option_value = &on_new_segment_; - break; - - case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN: - *socket_option_value = &on_content_object_to_sign_; - break; - - case ProducerCallbacksOptions::CONTENT_OBJECT_READY: - *socket_option_value = &on_content_object_in_output_buffer_; - break; - - case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT: - *socket_option_value = &on_content_object_output_; - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, ProducerContentCallback **socket_option_value) { - switch (socket_option_key) { - case ProducerCallbacksOptions::CONTENT_PRODUCED: - *socket_option_value = &on_content_produced_; - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, ProducerInterestCallback **socket_option_value) { - switch (socket_option_key) { - case ProducerCallbacksOptions::INTEREST_INPUT: - *socket_option_value = &on_interest_input_; - break; - - case ProducerCallbacksOptions::INTEREST_DROP: - *socket_option_value = &on_interest_dropped_input_buffer_; - break; - - case ProducerCallbacksOptions::INTEREST_PASS: - *socket_option_value = &on_interest_inserted_input_buffer_; - break; + ProducerContentObjectCallback **socket_option_value); - case CACHE_HIT: - *socket_option_value = &on_interest_satisfied_output_buffer_; - break; + virtual int getSocketOption(int socket_option_key, + ProducerContentCallback **socket_option_value); - case CACHE_MISS: - *socket_option_value = &on_interest_process_; - break; + virtual int getSocketOption(int socket_option_key, + ProducerInterestCallback **socket_option_value); - default: - return SOCKET_OPTION_NOT_GET; - } + virtual int getSocketOption(int socket_option_key, + std::shared_ptr<Portal> &socket_option_value); - return SOCKET_OPTION_GET; - } + virtual int getSocketOption(int socket_option_key, + HashAlgorithm &socket_option_value); - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, std::shared_ptr<Portal> &socket_option_value) { - switch (socket_option_key) { - case PORTAL: - socket_option_value = portal_; - break; - default: - return SOCKET_OPTION_NOT_GET; - ; - } - - return SOCKET_OPTION_GET; - } + virtual int getSocketOption(int socket_option_key, + utils::CryptoSuite &socket_option_value); - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, HashAlgorithm &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::HASH_ALGORITHM: - socket_option_value = hash_algorithm_; - break; - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } + virtual int getSocketOption( + int socket_option_key, + std::shared_ptr<utils::Identity> &socket_option_value); - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, utils::CryptoSuite &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::HASH_ALGORITHM: - socket_option_value = crypto_suite_; - break; - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } + virtual int getSocketOption(int socket_option_key, + std::string &socket_option_value); - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, - std::shared_ptr<utils::Identity> &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::IDENTITY: - if (identity_) { - socket_option_value = identity_; - break; - } - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } + template <typename Lambda, typename arg2> + int rescheduleOnIOService(int socket_option_key, arg2 socket_option_value, + Lambda lambda_func); - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, std::string &socket_option_value) { - switch (socket_option_key) { - case DataLinkOptions::OUTPUT_INTERFACE: - socket_option_value = output_interface_; - break; - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } + template <typename Lambda, typename arg2> + int rescheduleOnIOServiceWithReference(int socket_option_key, + arg2 &socket_option_value, + Lambda lambda_func); - private: + protected: // Threads std::thread listening_thread_; - protected: asio::io_service internal_io_service_; asio::io_service &io_service_; std::shared_ptr<Portal> portal_; - std::size_t data_packet_size_; - std::list<Prefix> served_namespaces_; - uint32_t content_object_expiry_time_; + std::atomic<size_t> data_packet_size_; + std::list<Prefix> + served_namespaces_; // No need to be threadsafe, this is always modified + // by the application thread + std::atomic<uint32_t> content_object_expiry_time_; // buffers + // ContentStore is thread-safe utils::ContentStore output_buffer_; - private: utils::EventThread async_thread_; int registration_status_; - bool making_manifest_; + std::atomic<bool> making_manifest_; // map for storing sequence numbers for several calls of the publish // function std::unordered_map<Name, std::unordered_map<int, uint32_t>> seq_number_map_; - int signature_type_; - int signature_size_; - - HashAlgorithm hash_algorithm_; - utils::CryptoSuite crypto_suite_; + std::atomic<HashAlgorithm> hash_algorithm_; + std::atomic<utils::CryptoSuite> crypto_suite_; + utils::SpinLock identity_lock_; std::shared_ptr<utils::Identity> identity_; - // buffers - - std::queue<std::shared_ptr<const Interest>> input_buffer_; - std::atomic_size_t input_buffer_capacity_; - std::atomic_size_t input_buffer_size_; - // callbacks - protected: ProducerInterestCallback on_interest_input_; ProducerInterestCallback on_interest_dropped_input_buffer_; ProducerInterestCallback on_interest_inserted_input_buffer_; diff --git a/libtransport/src/hicn/transport/protocols/protocol.cc b/libtransport/src/hicn/transport/protocols/protocol.cc index 9caa2eca7..8da9529d6 100644 --- a/libtransport/src/hicn/transport/protocols/protocol.cc +++ b/libtransport/src/hicn/transport/protocols/protocol.cc @@ -23,23 +23,28 @@ namespace protocol { using namespace interface; TransportProtocol::TransportProtocol(interface::ConsumerSocket *icn_socket) - : socket_(icn_socket), is_running_(false) { + : socket_(icn_socket), is_running_(false), is_first_(false) { socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal_); } int TransportProtocol::start() { - // If the protocol is already running, return + // If the protocol is already running, return otherwise set as running if (is_running_) return -1; - // Set the protocol as running - is_running_ = true; - // Reset the protocol state machine reset(); + // Set it is the first time we schedule an interest + is_first_ = true; + // Schedule next interests scheduleNextInterests(); + is_first_ = false; + + // Set the protocol as running + is_running_ = true; + // Start Event loop portal_->runEventsLoop(); diff --git a/libtransport/src/hicn/transport/protocols/protocol.h b/libtransport/src/hicn/transport/protocols/protocol.h index 88889bb8c..e4821b6a0 100644 --- a/libtransport/src/hicn/transport/protocols/protocol.h +++ b/libtransport/src/hicn/transport/protocols/protocol.h @@ -15,6 +15,8 @@ #pragma once +#include <atomic> + #include <hicn/transport/interfaces/socket.h> #include <hicn/transport/protocols/packet_manager.h> #include <hicn/transport/protocols/statistics.h> @@ -60,7 +62,9 @@ class TransportProtocol : public interface::BasePortal::ConsumerCallback, protected: interface::ConsumerSocket *socket_; std::shared_ptr<interface::BasePortal> portal_; - volatile bool is_running_; + std::atomic<bool> is_running_; + // True if it si the first time we schedule an interest + std::atomic<bool> is_first_; TransportStatistics stats_; }; diff --git a/libtransport/src/hicn/transport/protocols/raaqm.cc b/libtransport/src/hicn/transport/protocols/raaqm.cc index 574693c51..c816158f9 100644 --- a/libtransport/src/hicn/transport/protocols/raaqm.cc +++ b/libtransport/src/hicn/transport/protocols/raaqm.cc @@ -318,17 +318,17 @@ void RaaqmTransportProtocol::onContentObject( } // Call application-defined callbacks - ConsumerContentObjectCallback *callback_content_object = nullptr; + ConsumerContentObjectCallback *callback_content_object = VOID_HANDLER; socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT, &callback_content_object); - if (*callback_content_object != VOID_HANDLER) { + if (*callback_content_object) { (*callback_content_object)(*socket_, *content_object); } - ConsumerInterestCallback *callback_interest = nullptr; + ConsumerInterestCallback *callback_interest = VOID_HANDLER; socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_SATISFIED, &callback_interest); - if (*callback_content_object != VOID_HANDLER) { + if (*callback_content_object) { (*callback_interest)(*socket_, *interest); } @@ -369,10 +369,10 @@ void RaaqmTransportProtocol::onContentSegment( reassemble(std::move(content_object)); } else if (TRANSPORT_EXPECT_FALSE(incremental_suffix == index_manager_->getFinalSuffix())) { - interface::ConsumerSocket::ReadCallback *on_payload = nullptr; + interface::ConsumerSocket::ReadCallback *on_payload = VOID_HANDLER; socket_->getSocketOption(READ_CALLBACK, &on_payload); - if (on_payload != nullptr) { + if (on_payload) { on_payload->readSuccess(stats_.getBytesRecv()); } } @@ -404,10 +404,10 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) { return; } - ConsumerInterestCallback *callback = nullptr; + ConsumerInterestCallback *callback = VOID_HANDLER; socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_EXPIRED, &callback); - if (*callback != VOID_HANDLER) { + if (*callback) { (*callback)(*socket_, *interest); } @@ -420,17 +420,17 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) { max_rtx)) { stats_.updateRetxCount(1); - callback = nullptr; + callback = VOID_HANDLER; socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION, &callback); - if (*callback != VOID_HANDLER) { + if (*callback) { (*callback)(*socket_, *interest); } - callback = nullptr; + callback = VOID_HANDLER; socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, &callback); - if ((*callback) != VOID_HANDLER) { + if (*callback) { (*callback)(*socket_, *interest); } @@ -450,7 +450,7 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) { } void RaaqmTransportProtocol::scheduleNextInterests() { - if (TRANSPORT_EXPECT_FALSE(!is_running_)) { + if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) { return; } @@ -490,14 +490,14 @@ void RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) { interest_lifetime); interest->setLifetime(interest_lifetime); - ConsumerInterestCallback *callback = nullptr; + ConsumerInterestCallback *callback = VOID_HANDLER; socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, &callback); - if (*callback != VOID_HANDLER) { + if (*callback) { callback->operator()(*socket_, *interest); } - if (TRANSPORT_EXPECT_FALSE(!is_running_)) { + if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) { return; } @@ -516,10 +516,10 @@ void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) { } void RaaqmTransportProtocol::onContentReassembled(std::error_code ec) { - interface::ConsumerSocket::ReadCallback *on_payload = nullptr; + interface::ConsumerSocket::ReadCallback *on_payload = VOID_HANDLER; socket_->getSocketOption(READ_CALLBACK, &on_payload); - if (on_payload == nullptr) { + if (on_payload) { throw errors::RuntimeException( "The read callback must be installed in the transport before " "starting " @@ -581,10 +581,10 @@ void RaaqmTransportProtocol::updateStats(uint32_t suffix, uint64_t rtt, stats_.updateAverageWindowSize(current_window_size_); // Call statistics callback - ConsumerTimerCallback *stats_callback = nullptr; + ConsumerTimerCallback *stats_callback = VOID_HANDLER; socket_->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY, &stats_callback); - if (*stats_callback != VOID_HANDLER) { + if (*stats_callback) { auto dt = std::chrono::duration_cast<utils::Milliseconds>(now - t0_); uint32_t timer_interval_milliseconds = 0; diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc index accd98495..e6134f767 100644 --- a/libtransport/src/hicn/transport/protocols/rtc.cc +++ b/libtransport/src/hicn/transport/protocols/rtc.cc @@ -87,11 +87,14 @@ void RTCTransportProtocol::reset() { interestRetransmissions_.clear(); lastSegNacked_ = 0; lastReceived_ = 0; + lastReceivedTime_ = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); highestReceived_ = 0; firstSequenceInRound_ = 0; rtx_timer_used_ = false; - for(int i = 0; i < (1 << default_values::log_2_default_buffer_size); i++){ + for (int i = 0; i < (1 << default_values::log_2_default_buffer_size); i++) { inflightInterests_[i] = {0}; } @@ -188,14 +191,13 @@ void RTCTransportProtocol::updateDelayStats( pathTable_[pathLabel]->insertOwdSample(OWD); pathTable_[pathLabel]->computeInterArrivalGap(segmentNumber); - }else{ + } else { pathTable_[pathLabel]->receivedNack(); } } void RTCTransportProtocol::updateStats(uint32_t round_duration) { - if(pathTable_.empty()) - return; + if (pathTable_.empty()) return; if (receivedBytes_ != 0) { double bytesPerSec = @@ -210,68 +212,70 @@ void RTCTransportProtocol::updateStats(uint32_t round_duration) { for (auto it = pathTable_.begin(); it != pathTable_.end(); it++) { it->second->roundEnd(); - if(it->second->isActive()){ - if(it->second->getMinRtt() < minRtt){ + if (it->second->isActive()) { + if (it->second->getMinRtt() < minRtt) { minRtt = it->second->getMinRtt(); producerPathLabels_[0] = it->first; } - if(it->second->getMinRtt() > maxRtt){ + if (it->second->getMinRtt() > maxRtt) { maxRtt = it->second->getMinRtt(); producerPathLabels_[1] = it->first; } } } - if(pathTable_.find(producerPathLabels_[0]) == pathTable_.end() || + if (pathTable_.find(producerPathLabels_[0]) == pathTable_.end() || pathTable_.find(producerPathLabels_[1]) == pathTable_.end()) - return; //this should not happen + return; // this should not happen - //as a queuing delay we keep the lowest one among the two paths - //if one path is congested the forwarder should decide to do not - //use it soon so it does not make sens to inform the application - //that maybe we have a problem - if(pathTable_[producerPathLabels_[0]]->getQueuingDealy() < + // as a queuing delay we keep the lowest one among the two paths + // if one path is congested the forwarder should decide to do not + // use it so it does not make sense to inform the application + // that maybe we have a problem + if (pathTable_[producerPathLabels_[0]]->getQueuingDealy() < pathTable_[producerPathLabels_[1]]->getQueuingDealy()) - queuingDelay_ = pathTable_[producerPathLabels_[0]]->getQueuingDealy(); + queuingDelay_ = pathTable_[producerPathLabels_[0]]->getQueuingDealy(); else - queuingDelay_ = pathTable_[producerPathLabels_[1]]->getQueuingDealy(); + queuingDelay_ = pathTable_[producerPathLabels_[1]]->getQueuingDealy(); if (sentInterest_ != 0 && currentState_ == HICN_RTC_NORMAL_STATE) { - uint32_t numberTheoricallyReceivedPackets_ = highestReceived_ - firstSequenceInRound_; + uint32_t numberTheoricallyReceivedPackets_ = + highestReceived_ - firstSequenceInRound_; double lossRate = 0; - if(numberTheoricallyReceivedPackets_ != 0) - lossRate = (double)((double)(packetLost_ - lossRecovered_) / (double)numberTheoricallyReceivedPackets_); + if (numberTheoricallyReceivedPackets_ != 0) + lossRate = (double)((double)(packetLost_ - lossRecovered_) / + (double)numberTheoricallyReceivedPackets_); - if(lossRate < 0) - lossRate = 0; + if (lossRate < 0) lossRate = 0; - if(initied){ + if (initied) { lossRate_ = lossRate_ * HICN_ESTIMATED_LOSSES_ALPHA + - (lossRate * (1 - HICN_ESTIMATED_LOSSES_ALPHA)); - }else { - lossRate_ =lossRate; + (lossRate * (1 - HICN_ESTIMATED_LOSSES_ALPHA)); + } else { + lossRate_ = lossRate; initied = true; } } if (avgPacketSize_ == 0) avgPacketSize_ = HICN_INIT_PACKET_SIZE; - //for the BDP we use the max rtt, so that we calibrate the window on the - //RTT of the slowest path. In this way we are sure that the window will - //never be too small + // for the BDP we use the max rtt, so that we calibrate the window on the + // RTT of the slowest path. In this way we are sure that the window will + // never be too small uint32_t BDP = (uint32_t)ceil( - (estimatedBw_ * (double)((double) pathTable_[producerPathLabels_[1]]->getMinRtt() / - (double)HICN_MILLI_IN_A_SEC) * - HICN_BANDWIDTH_SLACK_FACTOR) / - avgPacketSize_); + (estimatedBw_ * + (double)((double)pathTable_[producerPathLabels_[1]]->getMinRtt() / + (double)HICN_MILLI_IN_A_SEC) * + HICN_BANDWIDTH_SLACK_FACTOR) / + avgPacketSize_); uint32_t BW = (uint32_t)ceil(estimatedBw_); computeMaxWindow(BW, BDP); ConsumerTimerCallback *stats_callback = nullptr; socket_->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY, &stats_callback); - if (*stats_callback != VOID_HANDLER) { - //Send the stats to the app + if (*stats_callback) { + // Send the stats to the app stats_.updateQueuingDelay(queuingDelay_); stats_.updateLossRatio(lossRate_); (*stats_callback)(*socket_, stats_); @@ -331,8 +335,8 @@ void RTCTransportProtocol::computeMaxWindow(uint32_t productionRate, // currentState = RTC_NORMAL_STATE if (BDPWin != 0) { - maxCWin_ = - (uint32_t)ceil((double)BDPWin + ((double)BDPWin / 10.0)); // BDP + 10% + maxCWin_ = (uint32_t)ceil((double)BDPWin + + (((double)BDPWin * 30.0) / 100.0)); // BDP + 30% } else { maxCWin_ = min(maxWaintingInterest, maxCWin_); } @@ -377,22 +381,22 @@ void RTCTransportProtocol::increaseWindow() { } } -void RTCTransportProtocol::probeRtt(){ +void RTCTransportProtocol::probeRtt() { time_sent_probe_ = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + std::chrono::steady_clock::now().time_since_epoch()) + .count(); Name *interest_name = nullptr; socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, - &interest_name); - //get a random numbe in the probe seq range + &interest_name); + // get a random numbe in the probe seq range std::default_random_engine eng((std::random_device())()); - std::uniform_int_distribution<uint32_t> idis( - HICN_MIN_PROBE_SEQ, HICN_MAX_PROBE_SEQ); + std::uniform_int_distribution<uint32_t> idis(HICN_MIN_PROBE_SEQ, + HICN_MAX_PROBE_SEQ); probe_seq_number_ = idis(eng); interest_name->setSuffix(probe_seq_number_); - //we considere the probe as a rtx so that we do not incresea inFlightInt + // we considere the probe as a rtx so that we do not incresea inFlightInt received_probe_ = false; sendInterest(interest_name, true); @@ -403,7 +407,6 @@ void RTCTransportProtocol::probeRtt(){ }); } - void RTCTransportProtocol::sendInterest(Name *interest_name, bool rtx) { auto interest = getPacket(); interest->setName(*interest_name); @@ -418,11 +421,11 @@ void RTCTransportProtocol::sendInterest(Name *interest_name, bool rtx) { socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, &on_interest_output); - if (*on_interest_output != VOID_HANDLER) { + if (*on_interest_output) { (*on_interest_output)(*socket_, *interest); } - if (TRANSPORT_EXPECT_FALSE(!is_running_)) { + if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) { return; } @@ -437,20 +440,20 @@ void RTCTransportProtocol::sendInterest(Name *interest_name, bool rtx) { void RTCTransportProtocol::scheduleNextInterests() { checkRound(); - if (!is_running_) return; + if (!is_running_ && !is_first_) return; while (inflightInterestsCount_ < currentCWin_) { Name *interest_name = nullptr; socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &interest_name); - interest_name->setSuffix(actualSegment_); + interest_name->setSuffix(actualSegment_); // if the producer socket is not stated (does not reply even with nacks) // we keep asking for something without marking anything as lost (see // timeout). In this way when the producer socket will start the - //consumer socket will not miss any packet - if(TRANSPORT_EXPECT_FALSE(!firstPckReceived_)){ + // consumer socket will not miss any packet + if (TRANSPORT_EXPECT_FALSE(!firstPckReceived_)) { uint32_t pkt = actualSegment_ & modMask_; inflightInterests_[pkt].state = sent_; inflightInterests_[pkt].sequence = actualSegment_; @@ -474,9 +477,9 @@ void RTCTransportProtocol::scheduleNextInterests() { continue; } - //same if the packet is lost + // same if the packet is lost if (inflightInterests_[pkt].state == lost_ && - inflightInterests_[pkt].sequence == actualSegment_){ + inflightInterests_[pkt].sequence == actualSegment_) { actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ; continue; } @@ -486,7 +489,7 @@ void RTCTransportProtocol::scheduleNextInterests() { std::chrono::steady_clock::now().time_since_epoch()) .count(); - //here the packet can be in any state except for lost or recevied + // here the packet can be in any state except for lost or recevied inflightInterests_[pkt].state = sent_; inflightInterests_[pkt].sequence = actualSegment_; actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ; @@ -503,30 +506,40 @@ void RTCTransportProtocol::addRetransmissions(uint32_t val) { void RTCTransportProtocol::addRetransmissions(uint32_t start, uint32_t stop) { uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + bool new_rtx = false; for (uint32_t i = start; i < stop; i++) { auto it = interestRetransmissions_.find(i); if (it == interestRetransmissions_.end()) { - if (lastSegNacked_ <= i) { + uint32_t pkt = i & modMask_; + if (lastSegNacked_ <= i && inflightInterests_[pkt].state != received_) { // it must be larger than the last past nack received packetLost_++; interestRetransmissions_[i] = 0; uint32_t pkt = i & modMask_; - //we reset the transmission time setting to now, so that rtx will - //happne in one RTT on waint one inter arrival gap + // we reset the transmission time setting to now, so that rtx will + // happne in one RTT on waint one inter arrival gap inflightInterests_[pkt].transmissionTime = now; + new_rtx = true; } } // if the retransmission is already there the rtx timer will // take care of it } - if(!rtx_timer_used_) + // in case a new rtx is added to the map we need to run checkRtx() + if (new_rtx) { + if (rtx_timer_used_) { + // if a timer is pending we need to delete it + rtx_timer_->cancel(); + rtx_timer_used_ = false; + } checkRtx(); + } } -void RTCTransportProtocol::retransmit() { +uint64_t RTCTransportProtocol::retransmit() { auto it = interestRetransmissions_.begin(); // cut len to max HICN_MAX_RTX_SIZE @@ -537,6 +550,10 @@ void RTCTransportProtocol::retransmit() { } it = interestRetransmissions_.begin(); + uint64_t smallest_timeout = ULONG_MAX; + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); while (it != interestRetransmissions_.end()) { uint32_t pkt = it->first & modMask_; @@ -560,65 +577,75 @@ void RTCTransportProtocol::retransmit() { continue; } - - uint64_t sent_time = inflightInterests_[pkt].transmissionTime; - uint64_t rtx_time = sent_time; - - if(it->second == 0){ - if(pathTable_.find(producerPathLabels_[0]) != pathTable_.end() && - pathTable_.find(producerPathLabels_[1]) != pathTable_.end()){ - //first rtx: wait RTTmax - RTTmin + gap - rtx_time = sent_time + pathTable_[producerPathLabels_[1]]->getMinRtt() - - pathTable_[producerPathLabels_[0]]->getMinRtt() + - pathTable_[producerPathLabels_[1]]->getInterArrivalGap(); + uint64_t rtx_time = now; + + if (it->second == 0) { + // first rtx + if (producerPathLabels_[0] != producerPathLabels_[1]) { + // multipath + if (pathTable_.find(producerPathLabels_[0]) != pathTable_.end() && + pathTable_.find(producerPathLabels_[1]) != pathTable_.end() && + (pathTable_[producerPathLabels_[0]]->getInterArrivalGap() < + HICN_MIN_INTER_ARRIVAL_GAP)) { + rtx_time = lastReceivedTime_ + + (pathTable_[producerPathLabels_[1]]->getMinRtt() - + pathTable_[producerPathLabels_[0]]->getMinRtt()) + + pathTable_[producerPathLabels_[0]]->getInterArrivalGap(); + } // else low rate producer, send it immediatly + } else { + // single path + if (pathTable_.find(producerPathLabels_[0]) != pathTable_.end() && + (pathTable_[producerPathLabels_[0]]->getInterArrivalGap() < + HICN_MIN_INTER_ARRIVAL_GAP)) { + rtx_time = lastReceivedTime_ + + pathTable_[producerPathLabels_[0]]->getInterArrivalGap(); + } // else low rate producer send immediatly } - }else{ - if(pathTable_.find(producerPathLabels_[0]) != pathTable_.end()){ - //second+ rtx: waint min rtt + } else { + // second or plus rtx, wait for the min rtt + if (pathTable_.find(producerPathLabels_[0]) != pathTable_.end()) { + uint64_t sent_time = inflightInterests_[pkt].transmissionTime; rtx_time = sent_time + pathTable_[producerPathLabels_[0]]->getMinRtt(); - } + } // if we don't have info we send it immediatly } - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); - - if(now >= rtx_time){ + if (now >= rtx_time) { inflightInterests_[pkt].transmissionTime = now; it->second++; Name *interest_name = nullptr; socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, - &interest_name); + &interest_name); interest_name->setSuffix(it->first); sendInterest(interest_name, true); + } else if (rtx_time < smallest_timeout) { + smallest_timeout = rtx_time; } ++it; } + return smallest_timeout; } void RTCTransportProtocol::checkRtx() { - if(interestRetransmissions_.empty()){ + if (interestRetransmissions_.empty()) { rtx_timer_used_ = false; return; } - //we use the packet intearriva time on the fastest path - //even if this stats should be the same on both - auto pathStats = pathTable_.find(producerPathLabels_[0]); + uint64_t next_timeout = retransmit(); uint64_t wait = 1; - if(pathStats != pathTable_.end()){ - wait = floor(pathStats->second->getInterArrivalGap() / 2.0); - if(wait < 1) - wait = 1; + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + if (next_timeout != ULONG_MAX && now < next_timeout) { + wait = next_timeout - now; } - rtx_timer_used_ = true; - retransmit(); rtx_timer_->expires_from_now(std::chrono::milliseconds(wait)); rtx_timer_->async_wait([this](std::error_code ec) { if (ec) return; + rtx_timer_used_ = false; checkRtx(); }); } @@ -628,14 +655,14 @@ void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) { uint32_t segmentNumber = interest->getName().getSuffix(); - if(segmentNumber >= HICN_MIN_PROBE_SEQ){ + if (segmentNumber >= HICN_MIN_PROBE_SEQ) { // this is a timeout on a probe, do nothing return; } uint32_t pkt = segmentNumber & modMask_; - if(TRANSPORT_EXPECT_FALSE(!firstPckReceived_)){ + if (TRANSPORT_EXPECT_FALSE(!firstPckReceived_)) { inflightInterestsCount_--; // we do nothing, and we keep asking the same stuff over // and over until we get at least a packet @@ -670,7 +697,8 @@ void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) { scheduleNextInterests(); } -bool RTCTransportProtocol::onNack(const ContentObject &content_object, bool rtx) { +bool RTCTransportProtocol::onNack(const ContentObject &content_object, + bool rtx) { uint32_t *payload = (uint32_t *)content_object.getPayload()->data(); uint32_t productionSeg = *payload; uint32_t productionRate = *(++payload); @@ -678,7 +706,13 @@ bool RTCTransportProtocol::onNack(const ContentObject &content_object, bool rtx) bool old_nack = false; - if(!rtx){ + // if we did not received anything between lastReceived_ + 1 and productionSeg + // most likelly some packets got lost + if (lastReceived_ != 0) { + addRetransmissions(lastReceived_ + 1, productionSeg); + } + + if (!rtx) { gotNack_ = true; // we synch the estimated production rate with the actual one estimatedBw_ = (double)productionRate; @@ -688,7 +722,7 @@ bool RTCTransportProtocol::onNack(const ContentObject &content_object, bool rtx) // we are asking for stuff produced in the past actualSegment_ = max(productionSeg, actualSegment_) % HICN_MIN_PROBE_SEQ; - if(!rtx) { + if (!rtx) { if (currentState_ == HICN_RTC_NORMAL_STATE) { currentState_ = HICN_RTC_SYNC_STATE; } @@ -697,23 +731,13 @@ bool RTCTransportProtocol::onNack(const ContentObject &content_object, bool rtx) increaseWindow(); } - //we need to remove the rtx for packets with seq number - //< productionSeg - for(auto it = interestRetransmissions_.begin(); it != - interestRetransmissions_.end();){ - if(it->first < productionSeg) - it = interestRetransmissions_.erase(it); - else - ++it; - } - lastSegNacked_ = productionSeg; old_nack = true; } else if (productionSeg < nackSegment) { actualSegment_ = productionSeg % HICN_MIN_PROBE_SEQ; - if(!rtx){ + if (!rtx) { // we are asking stuff in the future gotFutureNack_++; computeMaxWindow(productionRate, 0); @@ -724,8 +748,8 @@ bool RTCTransportProtocol::onNack(const ContentObject &content_object, bool rtx) } } } else { - //we are asking the right thing, but the producer is slow - //keep doing the same until the packet is produced + // we are asking the right thing, but the producer is slow + // keep doing the same until the packet is produced actualSegment_ = productionSeg % HICN_MIN_PROBE_SEQ; } @@ -734,7 +758,6 @@ bool RTCTransportProtocol::onNack(const ContentObject &content_object, bool rtx) void RTCTransportProtocol::onContentObject( Interest::Ptr &&interest, ContentObject::Ptr &&content_object) { - // as soon as we get a packet firstPckReceived_ will never be false firstPckReceived_ = true; @@ -746,24 +769,25 @@ void RTCTransportProtocol::onContentObject( ConsumerContentObjectCallback *callback_content_object = nullptr; socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT, &callback_content_object); - if (*callback_content_object != VOID_HANDLER) { + if (*callback_content_object) { (*callback_content_object)(*socket_, *content_object); } - if(segmentNumber >= HICN_MIN_PROBE_SEQ){ - if(segmentNumber == probe_seq_number_ && !received_probe_){ + if (segmentNumber >= HICN_MIN_PROBE_SEQ) { + if (segmentNumber == probe_seq_number_ && !received_probe_) { received_probe_ = true; uint32_t pathLabel = content_object->getPathLabel(); - if (pathTable_.find(pathLabel) == pathTable_.end()){ - //if this path does not exists we cannot create a new one so drop + if (pathTable_.find(pathLabel) == pathTable_.end()) { + // if this path does not exists we cannot create a new one so drop return; } - //this is the expected probe, update the RTT and drop the packet + // this is the expected probe, update the RTT and drop the packet uint64_t RTT = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count() - time_sent_probe_; + std::chrono::steady_clock::now().time_since_epoch()) + .count() - + time_sent_probe_; pathTable_[pathLabel]->insertRttSample(RTT); pathTable_[pathLabel]->receivedNack(); @@ -779,27 +803,33 @@ void RTCTransportProtocol::onContentObject( bool old_nack = false; if (interestRetransmissions_.find(segmentNumber) == - interestRetransmissions_.end()){ - //this is not a retransmitted packet + interestRetransmissions_.end()) { + // this is not a retransmitted packet old_nack = onNack(*content_object, false); updateDelayStats(*content_object); } else { old_nack = onNack(*content_object, true); } - //the nacked_ state is used only to avoid to decrease inflightInterestsCount_ - //multiple times. In fact, every time that we receive an event related to an - //interest (timeout, nacked, content) we cange the state. In this way we are - //sure that we do not decrease twice the counter - if(old_nack) + // the nacked_ state is used only to avoid to decrease + // inflightInterestsCount_ multiple times. In fact, every time that we + // receive an event related to an interest (timeout, nacked, content) we + // cange the state. In this way we are sure that we do not decrease twice the + // counter + if (old_nack) { inflightInterests_[pkt].state = lost_; - else + interestRetransmissions_.erase(segmentNumber); + } else { inflightInterests_[pkt].state = nacked_; + } } else { avgPacketSize_ = (HICN_ESTIMATED_PACKET_SIZE * avgPacketSize_) + ((1 - HICN_ESTIMATED_PACKET_SIZE) * payload->length()); + receivedBytes_ += (uint32_t)(content_object->headerSize() + + content_object->payloadSize()); + if (inflightInterests_[pkt].state == sent_) { inflightInterestsCount_--; // packet sent without timeouts } @@ -807,33 +837,33 @@ void RTCTransportProtocol::onContentObject( if (inflightInterests_[pkt].state == sent_ && interestRetransmissions_.find(segmentNumber) == interestRetransmissions_.end()) { - // we count only non retransmitted data in order to take into accunt only - // the transmition rate of the producer - receivedBytes_ += (uint32_t)(content_object->headerSize() + - content_object->payloadSize()); + // delay stats are computed only for non retransmitted data updateDelayStats(*content_object); + } - addRetransmissions(lastReceived_ + 1, segmentNumber); - if(segmentNumber > highestReceived_) - highestReceived_ = segmentNumber; - // lastReceived_ is updated only for data packets received without RTX + addRetransmissions(lastReceived_ + 1, segmentNumber); + if (segmentNumber > highestReceived_) { + highestReceived_ = segmentNumber; + } + if (segmentNumber > lastReceived_) { lastReceived_ = segmentNumber; + lastReceivedTime_ = + std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); } - receivedData_++; inflightInterests_[pkt].state = received_; + auto it = interestRetransmissions_.find(segmentNumber); + if (it != interestRetransmissions_.end()) lossRecovered_++; + + interestRetransmissions_.erase(segmentNumber); + reassemble(std::move(content_object)); increaseWindow(); } - // in any case we remove the packet from the rtx list - auto it = interestRetransmissions_.find(segmentNumber); - if(it != interestRetransmissions_.end()) - lossRecovered_ ++; - - interestRetransmissions_.erase(segmentNumber); - scheduleNextInterests(); } diff --git a/libtransport/src/hicn/transport/protocols/rtc.h b/libtransport/src/hicn/transport/protocols/rtc.h index 509f11361..7927e3969 100644 --- a/libtransport/src/hicn/transport/protocols/rtc.h +++ b/libtransport/src/hicn/transport/protocols/rtc.h @@ -49,6 +49,7 @@ #define HICN_MAX_RTX_SIZE 1024 #define HICN_MAX_RTX_MAX_AGE 10000 #define HICN_MIN_RTT_WIN 30 // rounds +#define HICN_MIN_INTER_ARRIVAL_GAP 100 //ms // cwin #define HICN_INITIAL_CWIN 1 // packets @@ -124,7 +125,7 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly { void scheduleNextInterests() override; void addRetransmissions(uint32_t val); void addRetransmissions(uint32_t start, uint32_t stop); - void retransmit(); + uint64_t retransmit(); void checkRtx(); void probeRtt(); void onTimeout(Interest::Ptr &&interest) override; @@ -159,6 +160,8 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly { //for samething that is older than this value. uint32_t lastReceived_; //segment of the last content object received //indicates the base of the window on the client + uint64_t lastReceivedTime_; //time at which we recevied the + //lastReceived_ packet //rtt probes //the RTC transport tends to overestimate the RTT diff --git a/libtransport/src/hicn/transport/utils/content_store.cc b/libtransport/src/hicn/transport/utils/content_store.cc index 1e6b9fcea..8e3435507 100644 --- a/libtransport/src/hicn/transport/utils/content_store.cc +++ b/libtransport/src/hicn/transport/utils/content_store.cc @@ -85,12 +85,17 @@ void ContentStore::erase(const Name &exact_name) { } void ContentStore::setLimit(size_t max_packets) { + utils::SpinLock::Acquire locked(cs_mutex_); max_content_store_size_ = max_packets; } -std::size_t ContentStore::getLimit() const { return max_content_store_size_; } +std::size_t ContentStore::getLimit() const { + utils::SpinLock::Acquire locked(cs_mutex_); + return max_content_store_size_; +} std::size_t ContentStore::size() const { + utils::SpinLock::Acquire locked(cs_mutex_); return content_store_hash_table_.size(); } diff --git a/libtransport/src/hicn/transport/utils/content_store.h b/libtransport/src/hicn/transport/utils/content_store.h index a89403a01..f7dc41835 100644 --- a/libtransport/src/hicn/transport/utils/content_store.h +++ b/libtransport/src/hicn/transport/utils/content_store.h @@ -68,8 +68,9 @@ class ContentStore { ContentStoreHashTable content_store_hash_table_; FIFOList fifo_list_; std::shared_ptr<ContentObject> empty_reference_; - std::size_t max_content_store_size_; - utils::SpinLock cs_mutex_; + // Must be atomic + std::atomic_size_t max_content_store_size_; + mutable utils::SpinLock cs_mutex_; }; } // end namespace utils
\ No newline at end of file diff --git a/utils/src/hiperf.cc b/utils/src/hiperf.cc index ccb22779b..94e5d998d 100644 --- a/utils/src/hiperf.cc +++ b/utils/src/hiperf.cc @@ -47,7 +47,6 @@ namespace interface { #define MIN_PROBE_SEQ 0xefffffff - using CryptoSuite = utils::CryptoSuite; using Identity = utils::Identity; @@ -217,8 +216,8 @@ class HIperfClient { expected_seg_ = productionSeg; } else if (receivedSeg > productionSeg && receivedSeg < MIN_PROBE_SEQ) { std::cout << "[WINDOW TO LARGE] received NACK for " << receivedSeg - << ". Next expected packet " << productionSeg << std::endl; - } else if (receivedSeg >= MIN_PROBE_SEQ){ + << ". Next expected packet " << productionSeg << std::endl; + } else if (receivedSeg >= MIN_PROBE_SEQ) { std::cout << "[PROBE] probe number = " << receivedSeg << std::endl; } return; @@ -259,9 +258,7 @@ class HIperfClient { void handleTimerExpiration(ConsumerSocket &c, const protocol::TransportStatistics &stats) { - - if (configuration_.rtc_) - return; + if (configuration_.rtc_) return; const char separator = ' '; const int width = 20; @@ -327,7 +324,9 @@ class HIperfClient { } consumer_socket_ = std::make_unique<ConsumerSocket>(transport_protocol); - consumer_socket_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, configuration_.interest_lifetime_); + consumer_socket_->setSocketOption( + GeneralTransportOptions::INTEREST_LIFETIME, + configuration_.interest_lifetime_); #if defined(DEBUG) && defined(__linux__) std::shared_ptr<transport::BasePortal> portal; @@ -944,8 +943,8 @@ int main(int argc, char *argv[]) { int opt; #ifndef _WIN32 - while ((opt = getopt(argc, argv, "DSCf:b:d:W:RMc:vA:s:rmlk:y:p:hi:xB:ItL:")) != - -1) { + while ((opt = getopt(argc, argv, + "DSCf:b:d:W:RMc:vA:s:rmlk:y:p:hi:xB:ItL:")) != -1) { switch (opt) { // Common case 'D': { |