diff options
Diffstat (limited to 'libtransport/src/core')
27 files changed, 2979 insertions, 443 deletions
diff --git a/libtransport/src/core/CMakeLists.txt b/libtransport/src/core/CMakeLists.txt index 5c8ab9270..4e3ac10ec 100644 --- a/libtransport/src/core/CMakeLists.txt +++ b/libtransport/src/core/CMakeLists.txt @@ -21,55 +21,27 @@ list(APPEND HEADER_FILES ${CMAKE_CURRENT_SOURCE_DIR}/manifest_format.h ${CMAKE_CURRENT_SOURCE_DIR}/pending_interest.h ${CMAKE_CURRENT_SOURCE_DIR}/portal.h - ${CMAKE_CURRENT_SOURCE_DIR}/connector.h - ${CMAKE_CURRENT_SOURCE_DIR}/tcp_socket_connector.h - ${CMAKE_CURRENT_SOURCE_DIR}/udp_socket_connector.h - ${CMAKE_CURRENT_SOURCE_DIR}/forwarder_interface.h - ${CMAKE_CURRENT_SOURCE_DIR}/hicn_forwarder_interface.h - ${CMAKE_CURRENT_SOURCE_DIR}/vpp_forwarder_interface.h - ${CMAKE_CURRENT_SOURCE_DIR}/memif_connector.h + ${CMAKE_CURRENT_SOURCE_DIR}/errors.h + ${CMAKE_CURRENT_SOURCE_DIR}/global_configuration.h + ${CMAKE_CURRENT_SOURCE_DIR}/local_connector.h + ${CMAKE_CURRENT_SOURCE_DIR}/rs.h ) list(APPEND SOURCE_FILES ${CMAKE_CURRENT_SOURCE_DIR}/content_object.cc ${CMAKE_CURRENT_SOURCE_DIR}/interest.cc + ${CMAKE_CURRENT_SOURCE_DIR}/errors.cc ${CMAKE_CURRENT_SOURCE_DIR}/packet.cc ${CMAKE_CURRENT_SOURCE_DIR}/name.cc ${CMAKE_CURRENT_SOURCE_DIR}/prefix.cc - ${CMAKE_CURRENT_SOURCE_DIR}/tcp_socket_connector.cc - ${CMAKE_CURRENT_SOURCE_DIR}/udp_socket_connector.cc - ${CMAKE_CURRENT_SOURCE_DIR}/hicn_forwarder_interface.cc ${CMAKE_CURRENT_SOURCE_DIR}/manifest_format_fixed.cc - ${CMAKE_CURRENT_SOURCE_DIR}/connector.cc + ${CMAKE_CURRENT_SOURCE_DIR}/portal.cc + ${CMAKE_CURRENT_SOURCE_DIR}/global_configuration.cc + ${CMAKE_CURRENT_SOURCE_DIR}/io_module.cc + ${CMAKE_CURRENT_SOURCE_DIR}/local_connector.cc + ${CMAKE_CURRENT_SOURCE_DIR}/fec.cc + ${CMAKE_CURRENT_SOURCE_DIR}/rs.cc ) -if ("${CMAKE_SYSTEM_NAME}" STREQUAL "Linux") - if (BUILD_WITH_VPP OR BUILD_HICNPLUGIN) - list(APPEND HEADER_FILES - ${CMAKE_CURRENT_SOURCE_DIR}/vpp_forwarder_interface.h - ${CMAKE_CURRENT_SOURCE_DIR}/memif_connector.h - ${CMAKE_CURRENT_SOURCE_DIR}/hicn_vapi.h - ${CMAKE_CURRENT_SOURCE_DIR}/memif_vapi.h - ) - - list(APPEND SOURCE_FILES - ${CMAKE_CURRENT_SOURCE_DIR}/vpp_forwarder_interface.cc - ${CMAKE_CURRENT_SOURCE_DIR}/memif_connector.cc - ${CMAKE_CURRENT_SOURCE_DIR}/hicn_vapi.c - ${CMAKE_CURRENT_SOURCE_DIR}/memif_vapi.c - ) - endif() - - list(APPEND HEADER_FILES - ${CMAKE_CURRENT_SOURCE_DIR}/raw_socket_connector.h - ${CMAKE_CURRENT_SOURCE_DIR}/raw_socket_interface.h - ) - - list(APPEND SOURCE_FILES - ${CMAKE_CURRENT_SOURCE_DIR}/raw_socket_connector.cc - ${CMAKE_CURRENT_SOURCE_DIR}/raw_socket_interface.cc - ) -endif() - set(SOURCE_FILES ${SOURCE_FILES} PARENT_SCOPE) set(HEADER_FILES ${HEADER_FILES} PARENT_SCOPE)
\ No newline at end of file diff --git a/libtransport/src/core/content_object.cc b/libtransport/src/core/content_object.cc index f5cccf404..0c68ef559 100644 --- a/libtransport/src/core/content_object.cc +++ b/libtransport/src/core/content_object.cc @@ -32,8 +32,9 @@ namespace transport { namespace core { -ContentObject::ContentObject(const Name &name, Packet::Format format) - : Packet(format) { +ContentObject::ContentObject(const Name &name, Packet::Format format, + std::size_t additional_header_size) + : Packet(format, additional_header_size) { if (TRANSPORT_EXPECT_FALSE( hicn_data_set_name(format, packet_start_, &name.name_) < 0)) { throw errors::RuntimeException("Error filling the packet name."); @@ -47,41 +48,32 @@ ContentObject::ContentObject(const Name &name, Packet::Format format) } #ifdef __ANDROID__ -ContentObject::ContentObject(hicn_format_t format) - : ContentObject(Name("0::0|0"), format) {} +ContentObject::ContentObject(hicn_format_t format, + std::size_t additional_header_size) + : ContentObject(Name("0::0|0"), format, additional_header_size) {} #else -ContentObject::ContentObject(hicn_format_t format) - : ContentObject(Packet::base_name, format) {} +ContentObject::ContentObject(hicn_format_t format, + std::size_t additional_header_size) + : ContentObject(Packet::base_name, format, additional_header_size) {} #endif ContentObject::ContentObject(const Name &name, hicn_format_t format, + std::size_t additional_header_size, const uint8_t *payload, std::size_t size) - : ContentObject(name, format) { + : ContentObject(name, format, additional_header_size) { appendPayload(payload, size); } -ContentObject::ContentObject(const uint8_t *buffer, std::size_t size) - : Packet(buffer, size) { - if (hicn_data_get_name(format_, packet_start_, name_.getStructReference()) < - 0) { - throw errors::RuntimeException("Error getting name from content object."); - } +ContentObject::ContentObject(ContentObject &&other) : Packet(std::move(other)) { + name_ = std::move(other.name_); } -ContentObject::ContentObject(MemBufPtr &&buffer) : Packet(std::move(buffer)) { - if (hicn_data_get_name(format_, packet_start_, name_.getStructReference()) < - 0) { - throw errors::RuntimeException("Error getting name from content object."); - } +ContentObject::ContentObject(const ContentObject &other) : Packet(other) { + name_ = other.name_; } -ContentObject::ContentObject(ContentObject &&other) : Packet(std::move(other)) { - name_ = std::move(other.name_); - - if (hicn_data_get_name(format_, packet_start_, name_.getStructReference()) < - 0) { - throw errors::MalformedPacketException(); - } +ContentObject &ContentObject::operator=(const ContentObject &other) { + return (ContentObject &)Packet::operator=(other); } ContentObject::~ContentObject() {} @@ -132,10 +124,11 @@ uint32_t ContentObject::getPathLabel() const { "Error retrieving the path label from content object"); } - return path_label; + return ntohl(path_label); } ContentObject &ContentObject::setPathLabel(uint32_t path_label) { + path_label = htonl(path_label); if (hicn_data_set_path_label((hicn_header_t *)packet_start_, path_label) < 0) { throw errors::RuntimeException( diff --git a/libtransport/src/core/errors.cc b/libtransport/src/core/errors.cc new file mode 100644 index 000000000..82647a60b --- /dev/null +++ b/libtransport/src/core/errors.cc @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <core/errors.h> + +namespace transport { +namespace core { + +const std::error_category& core_category() { + static core_category_impl instance; + + return instance; +} + +const char* core_category_impl::name() const throw() { + return "transport::protocol::error"; +} + +std::string core_category_impl::message(int ev) const { + switch (static_cast<core_error>(ev)) { + case core_error::success: { + return "Success"; + } + case core_error::configuration_parse_failed: { + return "Error parsing configuration."; + } + case core_error::configuration_not_applied: { + return "Configuration was not applied due to wrong parameters."; + } + default: { + return "Unknown core error"; + } + } +} + +} // namespace core +} // namespace transport
\ No newline at end of file diff --git a/libtransport/src/core/errors.h b/libtransport/src/core/errors.h new file mode 100644 index 000000000..a46f1dbcd --- /dev/null +++ b/libtransport/src/core/errors.h @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <string> +#include <system_error> + +namespace transport { +namespace core { + +/** + * @brief Get the default server error category. + * @return The default server error category instance. + * + * @warning The first call to this function is thread-safe only starting with + * C++11. + */ +const std::error_category& core_category(); + +/** + * The list of errors. + */ +enum class core_error { + success = 0, + configuration_parse_failed, + configuration_not_applied +}; + +/** + * @brief Create an error_code instance for the given error. + * @param error The error. + * @return The error_code instance. + */ +inline std::error_code make_error_code(core_error error) { + return std::error_code(static_cast<int>(error), core_category()); +} + +/** + * @brief Create an error_condition instance for the given error. + * @param error The error. + * @return The error_condition instance. + */ +inline std::error_condition make_error_condition(core_error error) { + return std::error_condition(static_cast<int>(error), core_category()); +} + +/** + * @brief A server error category. + */ +class core_category_impl : public std::error_category { + public: + /** + * @brief Get the name of the category. + * @return The name of the category. + */ + virtual const char* name() const throw(); + + /** + * @brief Get the error message for a given error. + * @param ev The error numeric value. + * @return The message associated to the error. + */ + virtual std::string message(int ev) const; +}; +} // namespace core +} // namespace transport + +namespace std { +// namespace system { +template <> +struct is_error_code_enum<::transport::core::core_error> + : public std::true_type {}; +// } // namespace system +} // namespace std
\ No newline at end of file diff --git a/libtransport/src/core/facade.h b/libtransport/src/core/facade.h index 04f643f63..199081271 100644 --- a/libtransport/src/core/facade.h +++ b/libtransport/src/core/facade.h @@ -15,36 +15,14 @@ #pragma once -#include <core/forwarder_interface.h> -#include <core/hicn_forwarder_interface.h> #include <core/manifest_format_fixed.h> #include <core/manifest_inline.h> #include <core/portal.h> -#ifdef __linux__ -#ifndef __ANDROID__ -#include <core/raw_socket_interface.h> -#ifdef __vpp__ -#include <core/vpp_forwarder_interface.h> -#endif -#endif -#endif - namespace transport { namespace core { -using HicnForwarderPortal = Portal<HicnForwarderInterface>; - -#ifdef __linux__ -#ifndef __ANDROID__ -using RawSocketPortal = Portal<RawSocketInterface>; -#endif -#ifdef __vpp__ -using VPPForwarderPortal = Portal<VPPForwarderInterface>; -#endif -#endif - using ContentObjectManifest = core::ManifestInline<ContentObject, Fixed>; using InterestManifest = core::ManifestInline<Interest, Fixed>; diff --git a/libtransport/src/core/fec.cc b/libtransport/src/core/fec.cc new file mode 100644 index 000000000..134198b9e --- /dev/null +++ b/libtransport/src/core/fec.cc @@ -0,0 +1,878 @@ +/* + * fec.c -- forward error correction based on Vandermonde matrices + * 980624 + * (C) 1997-98 Luigi Rizzo (luigi@iet.unipi.it) + * + * Portions derived from code by Phil Karn (karn@ka9q.ampr.org), + * Robert Morelos-Zaragoza (robert@spectra.eng.hawaii.edu) and Hari + * Thirumoorthy (harit@spectra.eng.hawaii.edu), Aug 1995 + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, + * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A + * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS + * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, + * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR + * TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT + * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY + * OF SUCH DAMAGE. + */ + +/* + * The following parameter defines how many bits are used for + * field elements. The code supports any value from 2 to 16 + * but fastest operation is achieved with 8 bit elements + * This is the only parameter you may want to change. + */ +#ifndef GF_BITS +#define GF_BITS 8 /* code over GF(2**GF_BITS) - change to suit */ +#endif + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <hicn/transport/portability/platform.h> +#include "fec.h" + +/** + * XXX This disable a warning raising only in some platforms. + * TODO Check if this warning is a mistake or it is a real bug: + * https://gcc.gnu.org/bugzilla/show_bug.cgi?id=83404 + * https://gcc.gnu.org/bugzilla//show_bug.cgi?id=88059 + */ +#ifndef __clang__ +#pragma GCC diagnostic ignored "-Wstringop-overflow" +#endif + +/* + * compatibility stuff + */ +#ifdef MSDOS /* but also for others, e.g. sun... */ +#define NEED_BCOPY +#define bcmp(a,b,n) memcmp(a,b,n) +#endif + +#ifdef ANDROID +#define bcmp(a,b,n) memcmp(a,b,n) +#endif + +#ifdef NEED_BCOPY +#define bcopy(s, d, siz) memcpy((d), (s), (siz)) +#define bzero(d, siz) memset((d), '\0', (siz)) +#endif + +/* + * stuff used for testing purposes only + */ + +#ifdef TEST +#define DEB(x) +#define DDB(x) x +#define DEBUG 0 /* minimal debugging */ +#ifdef MSDOS +#include <time.h> +struct timeval { + unsigned long ticks; +}; +#define gettimeofday(x, dummy) { (x)->ticks = clock() ; } +#define DIFF_T(a,b) (1+ 1000000*(a.ticks - b.ticks) / CLOCKS_PER_SEC ) +typedef unsigned long u_long ; +typedef unsigned short u_short ; +#else /* typically, unix systems */ +#include <sys/time.h> +#define DIFF_T(a,b) \ + (1+ 1000000*(a.tv_sec - b.tv_sec) + (a.tv_usec - b.tv_usec) ) +#endif + +#define TICK(t) \ + {struct timeval x ; \ + gettimeofday(&x, NULL) ; \ + t = x.tv_usec + 1000000* (x.tv_sec & 0xff ) ; \ + } +#define TOCK(t) \ + { u_long t1 ; TICK(t1) ; \ + if (t1 < t) t = 256000000 + t1 - t ; \ + else t = t1 - t ; \ + if (t == 0) t = 1 ;} + +u_long ticks[10]; /* vars for timekeeping */ +#else +#define DEB(x) +#define DDB(x) +#define TICK(x) +#define TOCK(x) +#endif /* TEST */ + +/* + * You should not need to change anything beyond this point. + * The first part of the file implements linear algebra in GF. + * + * gf is the type used to store an element of the Galois Field. + * Must constain at least GF_BITS bits. + * + * Note: unsigned char will work up to GF(256) but int seems to run + * faster on the Pentium. We use int whenever have to deal with an + * index, since they are generally faster. + */ +#if (GF_BITS < 2 && GF_BITS >16) +#error "GF_BITS must be 2 .. 16" +#endif + + +#define GF_SIZE ((1 << GF_BITS) - 1) /* powers of \alpha */ + +/* + * Primitive polynomials - see Lin & Costello, Appendix A, + * and Lee & Messerschmitt, p. 453. + */ +static const char *allPp[] = { /* GF_BITS polynomial */ + NULL, /* 0 no code */ + NULL, /* 1 no code */ + "111", /* 2 1+x+x^2 */ + "1101", /* 3 1+x+x^3 */ + "11001", /* 4 1+x+x^4 */ + "101001", /* 5 1+x^2+x^5 */ + "1100001", /* 6 1+x+x^6 */ + "10010001", /* 7 1 + x^3 + x^7 */ + "101110001", /* 8 1+x^2+x^3+x^4+x^8 */ + "1000100001", /* 9 1+x^4+x^9 */ + "10010000001", /* 10 1+x^3+x^10 */ + "101000000001", /* 11 1+x^2+x^11 */ + "1100101000001", /* 12 1+x+x^4+x^6+x^12 */ + "11011000000001", /* 13 1+x+x^3+x^4+x^13 */ + "110000100010001", /* 14 1+x+x^6+x^10+x^14 */ + "1100000000000001", /* 15 1+x+x^15 */ + "11010000000010001" /* 16 1+x+x^3+x^12+x^16 */ +}; + + +/* + * To speed up computations, we have tables for logarithm, exponent + * and inverse of a number. If GF_BITS <= 8, we use a table for + * multiplication as well (it takes 64K, no big deal even on a PDA, + * especially because it can be pre-initialized an put into a ROM!), + * otherwhise we use a table of logarithms. + * In any case the macro gf_mul(x,y) takes care of multiplications. + */ + +static gf gf_exp[2*GF_SIZE]; /* index->poly form conversion table */ +static int gf_log[GF_SIZE + 1]; /* Poly->index form conversion table */ +static gf inverse[GF_SIZE+1]; /* inverse of field elem. */ + /* inv[\alpha**i]=\alpha**(GF_SIZE-i-1) */ + +/* + * modnn(x) computes x % GF_SIZE, where GF_SIZE is 2**GF_BITS - 1, + * without a slow divide. + */ +static inline gf +modnn(int x) +{ + while (x >= GF_SIZE) { + x -= GF_SIZE; + x = (x >> GF_BITS) + (x & GF_SIZE); + } + return x; +} + +#define SWAP(a,b,t) {t tmp; tmp=a; a=b; b=tmp;} + +/* + * gf_mul(x,y) multiplies two numbers. If GF_BITS<=8, it is much + * faster to use a multiplication table. + * + * USE_GF_MULC, GF_MULC0(c) and GF_ADDMULC(x) can be used when multiplying + * many numbers by the same constant. In this case the first + * call sets the constant, and others perform the multiplications. + * A value related to the multiplication is held in a local variable + * declared with USE_GF_MULC . See usage in addmul1(). + */ +#if (GF_BITS <= 8) +static gf gf_mul_table[GF_SIZE + 1][GF_SIZE + 1]; + +#define gf_mul(x,y) gf_mul_table[x][y] + +#define USE_GF_MULC gf * __gf_mulc_ +#define GF_MULC0(c) __gf_mulc_ = gf_mul_table[c] +#define GF_ADDMULC(dst, x) dst ^= __gf_mulc_[x] + +static void +init_mul_table() +{ + int i, j; + for (i=0; i< GF_SIZE+1; i++) + for (j=0; j< GF_SIZE+1; j++) + gf_mul_table[i][j] = gf_exp[modnn(gf_log[i] + gf_log[j]) ] ; + + for (j=0; j< GF_SIZE+1; j++) + gf_mul_table[0][j] = gf_mul_table[j][0] = 0; +} +#else /* GF_BITS > 8 */ +static inline gf +gf_mul(x,y) +{ + if ( (x) == 0 || (y)==0 ) return 0; + + return gf_exp[gf_log[x] + gf_log[y] ] ; +} +#define init_mul_table() + +#define USE_GF_MULC register gf * __gf_mulc_ +#define GF_MULC0(c) __gf_mulc_ = &gf_exp[ gf_log[c] ] +#define GF_ADDMULC(dst, x) { if (x) dst ^= __gf_mulc_[ gf_log[x] ] ; } +#endif + +/* + * Generate GF(2**m) from the irreducible polynomial p(X) in p[0]..p[m] + * Lookup tables: + * index->polynomial form gf_exp[] contains j= \alpha^i; + * polynomial form -> index form gf_log[ j = \alpha^i ] = i + * \alpha=x is the primitive element of GF(2^m) + * + * For efficiency, gf_exp[] has size 2*GF_SIZE, so that a simple + * multiplication of two numbers can be resolved without calling modnn + */ + +/* + * i use malloc so many times, it is easier to put checks all in + * one place. + */ +static void * +my_malloc(int sz, const char *err_string) +{ + void *p = malloc( sz ); + if (p == NULL) { + fprintf(stderr, "-- malloc failure allocating %s\n", err_string); + exit(1) ; + } + return p ; +} + +#define NEW_GF_MATRIX(rows, cols) \ + (gf *)my_malloc(rows * cols * sizeof(gf), " ## __LINE__ ## " ) + +/* + * initialize the data structures used for computations in GF. + */ +static void +generate_gf(void) +{ + int i; + gf mask; + const char *Pp = allPp[GF_BITS] ; + + mask = 1; /* x ** 0 = 1 */ + gf_exp[GF_BITS] = 0; /* will be updated at the end of the 1st loop */ + /* + * first, generate the (polynomial representation of) powers of \alpha, + * which are stored in gf_exp[i] = \alpha ** i . + * At the same time build gf_log[gf_exp[i]] = i . + * The first GF_BITS powers are simply bits shifted to the left. + */ + for (i = 0; i < GF_BITS; i++, mask <<= 1 ) { + gf_exp[i] = mask; + gf_log[gf_exp[i]] = i; + /* + * If Pp[i] == 1 then \alpha ** i occurs in poly-repr + * gf_exp[GF_BITS] = \alpha ** GF_BITS + */ + if ( Pp[i] == '1' ) + gf_exp[GF_BITS] ^= mask; + } + /* + * now gf_exp[GF_BITS] = \alpha ** GF_BITS is complete, so can als + * compute its inverse. + */ + gf_log[gf_exp[GF_BITS]] = GF_BITS; + /* + * Poly-repr of \alpha ** (i+1) is given by poly-repr of + * \alpha ** i shifted left one-bit and accounting for any + * \alpha ** GF_BITS term that may occur when poly-repr of + * \alpha ** i is shifted. + */ + mask = 1 << (GF_BITS - 1 ) ; + for (i = GF_BITS + 1; i < GF_SIZE; i++) { + if (gf_exp[i - 1] >= mask) + gf_exp[i] = gf_exp[GF_BITS] ^ ((gf_exp[i - 1] ^ mask) << 1); + else + gf_exp[i] = gf_exp[i - 1] << 1; + gf_log[gf_exp[i]] = i; + } + /* + * log(0) is not defined, so use a special value + */ + gf_log[0] = GF_SIZE ; + /* set the extended gf_exp values for fast multiply */ + for (i = 0 ; i < GF_SIZE ; i++) + gf_exp[i + GF_SIZE] = gf_exp[i] ; + + /* + * again special cases. 0 has no inverse. This used to + * be initialized to GF_SIZE, but it should make no difference + * since noone is supposed to read from here. + */ + inverse[0] = 0 ; + inverse[1] = 1; + for (i=2; i<=GF_SIZE; i++) + inverse[i] = gf_exp[GF_SIZE-gf_log[i]]; +} + +/* + * Various linear algebra operations that i use often. + */ + +/* + * addmul() computes dst[] = dst[] + c * src[] + * This is used often, so better optimize it! Currently the loop is + * unrolled 16 times, a good value for 486 and pentium-class machines. + * The case c=0 is also optimized, whereas c=1 is not. These + * calls are unfrequent in my typical apps so I did not bother. + * + * Note that gcc on + */ +#define addmul(dst, src, c, sz) \ + if (c != 0) addmul1(dst, src, c, sz) + +#define UNROLL 16 /* 1, 4, 8, 16 */ +static void +addmul1(gf *dst1, gf *src1, gf c, int sz) +{ + USE_GF_MULC ; + gf *dst = dst1, *src = src1 ; + gf *lim = &dst[sz - UNROLL + 1] ; + + GF_MULC0(c) ; + +#if (UNROLL > 1) /* unrolling by 8/16 is quite effective on the pentium */ + for (; dst < lim ; dst += UNROLL, src += UNROLL ) { + GF_ADDMULC( dst[0] , src[0] ); + GF_ADDMULC( dst[1] , src[1] ); + GF_ADDMULC( dst[2] , src[2] ); + GF_ADDMULC( dst[3] , src[3] ); +#if (UNROLL > 4) + GF_ADDMULC( dst[4] , src[4] ); + GF_ADDMULC( dst[5] , src[5] ); + GF_ADDMULC( dst[6] , src[6] ); + GF_ADDMULC( dst[7] , src[7] ); +#endif +#if (UNROLL > 8) + GF_ADDMULC( dst[8] , src[8] ); + GF_ADDMULC( dst[9] , src[9] ); + GF_ADDMULC( dst[10] , src[10] ); + GF_ADDMULC( dst[11] , src[11] ); + GF_ADDMULC( dst[12] , src[12] ); + GF_ADDMULC( dst[13] , src[13] ); + GF_ADDMULC( dst[14] , src[14] ); + GF_ADDMULC( dst[15] , src[15] ); +#endif + } +#endif + lim += UNROLL - 1 ; + for (; dst < lim; dst++, src++ ) /* final components */ + GF_ADDMULC( *dst , *src ); +} + +/* + * computes C = AB where A is n*k, B is k*m, C is n*m + */ +static void +matmul(gf *a, gf *b, gf *c, int n, int k, int m) +{ + int row, col, i ; + + for (row = 0; row < n ; row++) { + for (col = 0; col < m ; col++) { + gf *pa = &a[ row * k ]; + gf *pb = &b[ col ]; + gf acc = 0 ; + for (i = 0; i < k ; i++, pa++, pb += m ) + acc ^= gf_mul( *pa, *pb ) ; + c[ row * m + col ] = acc ; + } + } +} + +#ifdef DEBUGG +/* + * returns 1 if the square matrix is identiy + * (only for test) + */ +static int +is_identity(gf *m, int k) +{ + int row, col ; + for (row=0; row<k; row++) + for (col=0; col<k; col++) + if ( (row==col && *m != 1) || + (row!=col && *m != 0) ) + return 0 ; + else + m++ ; + return 1 ; +} +#endif /* debug */ + +/* + * invert_mat() takes a matrix and produces its inverse + * k is the size of the matrix. + * (Gauss-Jordan, adapted from Numerical Recipes in C) + * Return non-zero if singular. + */ +DEB( int pivloops=0; int pivswaps=0 ; /* diagnostic */) +static int +invert_mat(gf *src, int k) +{ + gf c, *p ; + int irow, icol, row, col, i, ix ; + + int error = 1 ; + int *indxc = (int*)my_malloc(k*sizeof(int), "indxc"); + int *indxr = (int*)my_malloc(k*sizeof(int), "indxr"); + int *ipiv = (int*)my_malloc(k*sizeof(int), "ipiv"); + gf *id_row = NEW_GF_MATRIX(1, k); + gf *temp_row = NEW_GF_MATRIX(1, k); + + bzero(id_row, k*sizeof(gf)); + DEB( pivloops=0; pivswaps=0 ; /* diagnostic */ ) + /* + * ipiv marks elements already used as pivots. + */ + for (i = 0; i < k ; i++) + ipiv[i] = 0 ; + + for (col = 0; col < k ; col++) { + gf *pivot_row ; + /* + * Zeroing column 'col', look for a non-zero element. + * First try on the diagonal, if it fails, look elsewhere. + */ + irow = icol = -1 ; + if (ipiv[col] != 1 && src[col*k + col] != 0) { + irow = col ; + icol = col ; + goto found_piv ; + } + for (row = 0 ; row < k ; row++) { + if (ipiv[row] != 1) { + for (ix = 0 ; ix < k ; ix++) { + DEB( pivloops++ ; ) + if (ipiv[ix] == 0) { + if (src[row*k + ix] != 0) { + irow = row ; + icol = ix ; + goto found_piv ; + } + } else if (ipiv[ix] > 1) { + fprintf(stderr, "singular matrix\n"); + goto fail ; + } + } + } + } + if (icol == -1) { + fprintf(stderr, "XXX pivot not found!\n"); + goto fail ; + } +found_piv: + ++(ipiv[icol]) ; + /* + * swap rows irow and icol, so afterwards the diagonal + * element will be correct. Rarely done, not worth + * optimizing. + */ + if (irow != icol) { + for (ix = 0 ; ix < k ; ix++ ) { + SWAP( src[irow*k + ix], src[icol*k + ix], gf) ; + } + } + indxr[col] = irow ; + indxc[col] = icol ; + pivot_row = &src[icol*k] ; + c = pivot_row[icol] ; + if (c == 0) { + fprintf(stderr, "singular matrix 2\n"); + goto fail ; + } + if (c != 1 ) { /* otherwhise this is a NOP */ + /* + * this is done often , but optimizing is not so + * fruitful, at least in the obvious ways (unrolling) + */ + DEB( pivswaps++ ; ) + c = inverse[ c ] ; + pivot_row[icol] = 1 ; + for (ix = 0 ; ix < k ; ix++ ) + pivot_row[ix] = gf_mul(c, pivot_row[ix] ); + } + /* + * from all rows, remove multiples of the selected row + * to zero the relevant entry (in fact, the entry is not zero + * because we know it must be zero). + * (Here, if we know that the pivot_row is the identity, + * we can optimize the addmul). + */ + id_row[icol] = 1; + if (bcmp(pivot_row, id_row, k*sizeof(gf)) != 0) { + for (p = src, ix = 0 ; ix < k ; ix++, p += k ) { + if (ix != icol) { + c = p[icol] ; + p[icol] = 0 ; + addmul(p, pivot_row, c, k ); + } + } + } + id_row[icol] = 0; + } /* done all columns */ + for (col = k-1 ; col >= 0 ; col-- ) { + if (indxr[col] <0 || indxr[col] >= k) + fprintf(stderr, "AARGH, indxr[col] %d\n", indxr[col]); + else if (indxc[col] <0 || indxc[col] >= k) + fprintf(stderr, "AARGH, indxc[col] %d\n", indxc[col]); + else + if (indxr[col] != indxc[col] ) { + for (row = 0 ; row < k ; row++ ) { + SWAP( src[row*k + indxr[col]], src[row*k + indxc[col]], gf) ; + } + } + } + error = 0 ; +fail: + free(indxc); + free(indxr); + free(ipiv); + free(id_row); + free(temp_row); + return error ; +} + +/* + * fast code for inverting a vandermonde matrix. + * XXX NOTE: It assumes that the matrix + * is not singular and _IS_ a vandermonde matrix. Only uses + * the second column of the matrix, containing the p_i's. + * + * Algorithm borrowed from "Numerical recipes in C" -- sec.2.8, but + * largely revised for my purposes. + * p = coefficients of the matrix (p_i) + * q = values of the polynomial (known) + */ + +int +invert_vdm(gf *src, int k) +{ + int i, j, row, col ; + gf *b, *c, *p; + gf t, xx ; + + if (k == 1) /* degenerate case, matrix must be p^0 = 1 */ + return 0 ; + /* + * c holds the coefficient of P(x) = Prod (x - p_i), i=0..k-1 + * b holds the coefficient for the matrix inversion + */ + c = NEW_GF_MATRIX(1, k); + b = NEW_GF_MATRIX(1, k); + + p = NEW_GF_MATRIX(1, k); + + for ( j=1, i = 0 ; i < k ; i++, j+=k ) { + c[i] = 0 ; + p[i] = src[j] ; /* p[i] */ + } + /* + * construct coeffs. recursively. We know c[k] = 1 (implicit) + * and start P_0 = x - p_0, then at each stage multiply by + * x - p_i generating P_i = x P_{i-1} - p_i P_{i-1} + * After k steps we are done. + */ + c[k-1] = p[0] ; /* really -p(0), but x = -x in GF(2^m) */ + for (i = 1 ; i < k ; i++ ) { + gf p_i = p[i] ; /* see above comment */ + for (j = k-1 - ( i - 1 ) ; j < k-1 ; j++ ) + c[j] ^= gf_mul( p_i, c[j+1] ) ; + c[k-1] ^= p_i ; + } + + for (row = 0 ; row < k ; row++ ) { + /* + * synthetic division etc. + */ + xx = p[row] ; + t = 1 ; + b[k-1] = 1 ; /* this is in fact c[k] */ + for (i = k-2 ; i >= 0 ; i-- ) { + b[i] = c[i+1] ^ gf_mul(xx, b[i+1]) ; + t = gf_mul(xx, t) ^ b[i] ; + } + for (col = 0 ; col < k ; col++ ) + src[col*k + row] = gf_mul(inverse[t], b[col] ); + } + free(c) ; + free(b) ; + free(p) ; + return 0 ; +} + +static int fec_initialized = 0 ; +static void +init_fec() +{ + TICK(ticks[0]); + generate_gf(); + TOCK(ticks[0]); + DDB(fprintf(stderr, "generate_gf took %ldus\n", ticks[0]);) + TICK(ticks[0]); + init_mul_table(); + TOCK(ticks[0]); + DDB(fprintf(stderr, "init_mul_table took %ldus\n", ticks[0]);) + fec_initialized = 1 ; +} + +/* + * This section contains the proper FEC encoding/decoding routines. + * The encoding matrix is computed starting with a Vandermonde matrix, + * and then transforming it into a systematic matrix. + */ + +#define FEC_MAGIC 0xFECC0DEC + +void +fec_free(struct fec_parms *p) +{ + if (p==NULL || + p->magic != ( ( (FEC_MAGIC ^ p->k) ^ p->n) ^ (unsigned long)(p->enc_matrix)) ) { + fprintf(stderr, "bad parameters to fec_free\n"); + return ; + } + free(p->enc_matrix); + free(p); +} + +/* + * create a new encoder, returning a descriptor. This contains k,n and + * the encoding matrix. + */ +struct fec_parms * +fec_new(int k, int n) +{ + int row, col ; + gf *p, *tmp_m ; + + struct fec_parms *retval ; + + if (fec_initialized == 0) + init_fec(); + + if (k > GF_SIZE + 1 || n > GF_SIZE + 1 || k > n ) { + fprintf(stderr, "Invalid parameters k %d n %d GF_SIZE %d\n", + k, n, GF_SIZE ); + return NULL ; + } + retval = (struct fec_parms *)my_malloc(sizeof(struct fec_parms), "new_code"); + retval->k = k ; + retval->n = n ; + retval->enc_matrix = NEW_GF_MATRIX(n, k); + retval->magic = ( ( FEC_MAGIC ^ k) ^ n) ^ (unsigned long)(retval->enc_matrix) ; + tmp_m = NEW_GF_MATRIX(n, k); + /* + * fill the matrix with powers of field elements, starting from 0. + * The first row is special, cannot be computed with exp. table. + */ + tmp_m[0] = 1 ; + for (col = 1; col < k ; col++) + tmp_m[col] = 0 ; + for (p = tmp_m + k, row = 0; row < n-1 ; row++, p += k) { + for ( col = 0 ; col < k ; col ++ ) + p[col] = gf_exp[modnn(row*col)]; + } + + /* + * quick code to build systematic matrix: invert the top + * k*k vandermonde matrix, multiply right the bottom n-k rows + * by the inverse, and construct the identity matrix at the top. + */ + TICK(ticks[3]); + invert_vdm(tmp_m, k); /* much faster than invert_mat */ + matmul(tmp_m + k*k, tmp_m, retval->enc_matrix + k*k, n - k, k, k); + /* + * the upper matrix is I so do not bother with a slow multiply + */ + bzero(retval->enc_matrix, k*k*sizeof(gf) ); + for (p = retval->enc_matrix, col = 0 ; col < k ; col++, p += k+1 ) + *p = 1 ; + free(tmp_m); + TOCK(ticks[3]); + + DDB(fprintf(stderr, "--- %ld us to build encoding matrix\n", + ticks[3]);) + DEB(pr_matrix(retval->enc_matrix, n, k, "encoding_matrix");) + return retval ; +} + +/* + * fec_encode accepts as input pointers to n data packets of size sz, + * and produces as output a packet pointed to by fec, computed + * with index "index". + */ +void +fec_encode(struct fec_parms *code, gf *src[], gf *fec, int index, int sz) +{ + int i, k = code->k ; + gf *p ; + + if (GF_BITS > 8) + sz /= 2 ; + + if (index < k) + bcopy(src[index], fec, sz*sizeof(gf) ) ; + else if (index < code->n) { + p = &(code->enc_matrix[index*k] ); + bzero(fec, sz*sizeof(gf)); + for (i = 0; i < k ; i++) + addmul(fec, src[i], p[i], sz ) ; + } else + fprintf(stderr, "Invalid index %d (max %d)\n", + index, code->n - 1 ); +} + +/* + * shuffle move src packets in their position + */ +static int +shuffle(gf *pkt[], int index[], int k) +{ + int i; + + for ( i = 0 ; i < k ; ) { + if (index[i] >= k || index[i] == i) + i++ ; + else { + /* + * put pkt in the right position (first check for conflicts). + */ + int c = index[i] ; + + if (index[c] == c) { + DEB(fprintf(stderr, "\nshuffle, error at %d\n", i);) + return 1 ; + } + SWAP(index[i], index[c], int) ; + SWAP(pkt[i], pkt[c], gf *) ; + } + } + DEB( /* just test that it works... */ + for ( i = 0 ; i < k ; i++ ) { + if (index[i] < k && index[i] != i) { + fprintf(stderr, "shuffle: after\n"); + for (i=0; i<k ; i++) fprintf(stderr, "%3d ", index[i]); + fprintf(stderr, "\n"); + return 1 ; + } + } + ) + return 0 ; +} + +/* + * build_decode_matrix constructs the encoding matrix given the + * indexes. The matrix must be already allocated as + * a vector of k*k elements, in row-major order + */ +static gf * +build_decode_matrix(struct fec_parms *code, gf *pkt[], int index[]) +{ + int i , k = code->k ; + gf *p, *matrix = NEW_GF_MATRIX(k, k); + + TICK(ticks[9]); + for (i = 0, p = matrix ; i < k ; i++, p += k ) { +#if 1 /* this is simply an optimization, not very useful indeed */ + if (index[i] < k) { + bzero(p, k*sizeof(gf) ); + p[i] = 1 ; + } else +#endif + if (index[i] < code->n ) + bcopy( &(code->enc_matrix[index[i]*k]), p, k*sizeof(gf) ); + else { + fprintf(stderr, "decode: invalid index %d (max %d)\n", + index[i], code->n - 1 ); + free(matrix) ; + return NULL ; + } + } + TICK(ticks[9]); + if (invert_mat(matrix, k)) { + free(matrix); + matrix = NULL ; + } + TOCK(ticks[9]); + return matrix ; +} + +/* + * fec_decode receives as input a vector of packets, the indexes of + * packets, and produces the correct vector as output. + * + * Input: + * code: pointer to code descriptor + * pkt: pointers to received packets. They are modified + * to store the output packets (in place) + * index: pointer to packet indexes (modified) + * sz: size of each packet + */ +int +fec_decode(struct fec_parms *code, gf *pkt[], int index[], int sz) +{ + gf *m_dec ; + gf **new_pkt ; + int row, col , k = code->k ; + + if (GF_BITS > 8) + sz /= 2 ; + + if (shuffle(pkt, index, k)) /* error if true */ + return 1 ; + m_dec = build_decode_matrix(code, pkt, index); + + if (m_dec == NULL) + return 1 ; /* error */ + /* + * do the actual decoding + */ + new_pkt = (gf**)my_malloc (k * sizeof (gf * ), "new pkt pointers" ); + for (row = 0 ; row < k ; row++ ) { + if (index[row] >= k) { + new_pkt[row] = (gf*) my_malloc (sz * sizeof (gf), "new pkt buffer" ); + bzero(new_pkt[row], sz * sizeof(gf) ) ; + for (col = 0 ; col < k ; col++ ) + addmul(new_pkt[row], pkt[col], m_dec[row*k + col], sz) ; + } + } + /* + * move pkts to their final destination + */ + for (row = 0 ; row < k ; row++ ) { + if (index[row] >= k) { + bcopy(new_pkt[row], pkt[row], sz*sizeof(gf)); + free(new_pkt[row]); + } + } + free(new_pkt); + free(m_dec); + + return 0; +} diff --git a/libtransport/src/core/fec.h b/libtransport/src/core/fec.h new file mode 100644 index 000000000..8234057a7 --- /dev/null +++ b/libtransport/src/core/fec.h @@ -0,0 +1,65 @@ +/* + * fec.c -- forward error correction based on Vandermonde matrices + * 980614 + * (C) 1997-98 Luigi Rizzo (luigi@iet.unipi.it) + * + * Portions derived from code by Phil Karn (karn@ka9q.ampr.org), + * Robert Morelos-Zaragoza (robert@spectra.eng.hawaii.edu) and Hari + * Thirumoorthy (harit@spectra.eng.hawaii.edu), Aug 1995 + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, + * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A + * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS + * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, + * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR + * TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT + * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY + * OF SUCH DAMAGE. + */ + +/* + * The following parameter defines how many bits are used for + * field elements. The code supports any value from 2 to 16 + * but fastest operation is achieved with 8 bit elements + * This is the only parameter you may want to change. + */ +#ifndef GF_BITS +#define GF_BITS 8 /* code over GF(2**GF_BITS) - change to suit */ +#endif + +#if (GF_BITS <= 8) +typedef unsigned char gf; +#else +typedef unsigned short gf; +#endif + +#define GF_SIZE ((1 << GF_BITS) - 1) /* powers of \alpha */ + +struct fec_parms { + unsigned long magic ; + int k, n ; /* parameters of the code */ + gf *enc_matrix ; +}; + +void fec_free(struct fec_parms *p) ; +struct fec_parms *fec_new(int k, int n) ; + +void fec_encode(struct fec_parms *code, gf *src[], gf *fec, int index, int sz); +int fec_decode(struct fec_parms *code, gf *pkt[], int index[], int sz); + +/* end of file */ diff --git a/libtransport/src/core/global_configuration.cc b/libtransport/src/core/global_configuration.cc new file mode 100644 index 000000000..e0b6c040a --- /dev/null +++ b/libtransport/src/core/global_configuration.cc @@ -0,0 +1,173 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <core/global_configuration.h> +#include <hicn/transport/core/connector.h> +#include <hicn/transport/utils/log.h> + +#include <libconfig.h++> +#include <map> + +namespace transport { +namespace core { + +GlobalConfiguration::GlobalConfiguration() {} + +bool GlobalConfiguration::parseTransportConfig(const std::string& path) { + using namespace libconfig; + Config cfg; + + try { + cfg.readFile(path.c_str()); + } catch (const FileIOException& fioex) { + TRANSPORT_LOGE("I/O error while reading file."); + return false; + } catch (const ParseException& pex) { + TRANSPORT_LOGE("Parse error at %s:%d - %s", pex.getFile(), pex.getLine(), + pex.getError()); + return false; + } + + Setting& root = cfg.getRoot(); + + /** + * Iterate over sections. Best thing to do here would be to have other + * components of the program registering a callback here, to parse their + * section of the configuration file. + */ + for (auto section = root.begin(); section != root.end(); section++) { + std::string name = section->getName(); + std::error_code ec; + TRANSPORT_LOGD("Parsing Section: %s", name.c_str()); + + auto it = configuration_parsers_.find(name); + if (it != configuration_parsers_.end() && !it->second.first) { + TRANSPORT_LOGD("Found valid configuration parser"); + it->second.second(*section, ec); + it->second.first = true; + } + } + + return true; +} + +void GlobalConfiguration::parseConfiguration(const std::string& path) { + // Check if an environment variable with the configuration path exists. COnf + // variable comes first. + std::unique_lock<std::mutex> lck(cp_mtx_); + if (const char* env_c = std::getenv(GlobalConfiguration::conf_file)) { + parseTransportConfig(env_c); + } else if (!path.empty()) { + conf_file_path_ = path; + parseTransportConfig(conf_file_path_); + } else { + TRANSPORT_LOGD( + "Called parseConfiguration but no configuration file was provided."); + } +} + +void GlobalConfiguration::registerConfigurationSetter( + const std::string& key, const SetCallback& set_callback) { + std::unique_lock<std::mutex> lck(cp_mtx_); + if (configuration_setters_.find(key) != configuration_setters_.end()) { + TRANSPORT_LOGW( + "Trying to register configuration setter %s twice. Ignoring second " + "registration attempt.", + key.c_str()); + } else { + configuration_setters_.emplace(key, set_callback); + } +} + +void GlobalConfiguration::registerConfigurationGetter( + const std::string& key, const GetCallback& get_callback) { + std::unique_lock<std::mutex> lck(cp_mtx_); + if (configuration_getters_.find(key) != configuration_getters_.end()) { + TRANSPORT_LOGW( + "Trying to register configuration getter %s twice. Ignoring second " + "registration attempt.", + key.c_str()); + } else { + configuration_getters_.emplace(key, get_callback); + } +} + +void GlobalConfiguration::registerConfigurationParser( + const std::string& key, const ParserCallback& parser) { + std::unique_lock<std::mutex> lck(cp_mtx_); + if (configuration_parsers_.find(key) != configuration_parsers_.end()) { + TRANSPORT_LOGW( + "Trying to register configuration key %s twice. Ignoring second " + "registration attempt.", + key.c_str()); + } else { + configuration_parsers_.emplace(key, std::make_pair(false, parser)); + + // Trigger a parsing of the configuration. + if (!conf_file_path_.empty()) { + parseTransportConfig(conf_file_path_); + } + } +} + +void GlobalConfiguration::unregisterConfigurationParser( + const std::string& key) { + std::unique_lock<std::mutex> lck(cp_mtx_); + auto it = configuration_parsers_.find(key); + if (it != configuration_parsers_.end()) { + configuration_parsers_.erase(it); + } +} + +void GlobalConfiguration::unregisterConfigurationSetter( + const std::string& key) { + std::unique_lock<std::mutex> lck(cp_mtx_); + auto it = configuration_setters_.find(key); + if (it != configuration_setters_.end()) { + configuration_setters_.erase(it); + } +} + +void GlobalConfiguration::unregisterConfigurationGetter( + const std::string& key) { + std::unique_lock<std::mutex> lck(cp_mtx_); + auto it = configuration_getters_.find(key); + if (it != configuration_getters_.end()) { + configuration_getters_.erase(it); + } +} + +void GlobalConfiguration::getConfiguration( + interface::global_config::ConfigurationObject& configuration_object, + std::error_code& ec) { + auto it = configuration_getters_.find(configuration_object.getKey()); + + if (it != configuration_getters_.end()) { + it->second(configuration_object, ec); + } +} + +void GlobalConfiguration::setConfiguration( + const interface::global_config::ConfigurationObject& configuration_object, + std::error_code& ec) { + auto it = configuration_setters_.find(configuration_object.getKey()); + + if (it != configuration_setters_.end()) { + it->second(configuration_object, ec); + } +} + +} // namespace core +} // namespace transport
\ No newline at end of file diff --git a/libtransport/src/core/global_configuration.h b/libtransport/src/core/global_configuration.h new file mode 100644 index 000000000..dcc8d94e3 --- /dev/null +++ b/libtransport/src/core/global_configuration.h @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/interfaces/global_conf_interface.h> +#include <hicn/transport/utils/singleton.h> + +#include <functional> +#include <map> +#include <memory> +#include <mutex> +#include <system_error> + +namespace libconfig { +class Setting; +} + +namespace transport { +namespace core { + +/** + * Class holding workflow for global configuration. + * This class does not contains the actual configuration, which is rather stored + * inside the modules to be configured. This class contains the handlers to call + * for getting/setting the configurations and to parse the corresponding + * sections of the configuration file. Each class register 3 callbacks: one to + * parse conf section and 2 to set/get the configuration through programming + * interface. + */ +class GlobalConfiguration : public utils::Singleton<GlobalConfiguration> { + static const constexpr char *conf_file = "TRANSPORT_CONFIG"; + friend class utils::Singleton<GlobalConfiguration>; + + public: + /** + * This callback will be called by GlobalConfiguration in + * + */ + using ParserCallback = std::function<void(const libconfig::Setting &config, + std::error_code &ec)>; + using GetCallback = + std::function<void(interface::global_config::ConfigurationObject &object, + std::error_code &ec)>; + + using SetCallback = std::function<void( + const interface::global_config::ConfigurationObject &object, + std::error_code &ec)>; + + ~GlobalConfiguration() = default; + + public: + void parseConfiguration(const std::string &path); + + void registerConfigurationParser(const std::string &key, + const ParserCallback &parser); + + void registerConfigurationSetter(const std::string &key, + const SetCallback &set_callback); + void registerConfigurationGetter(const std::string &key, + const GetCallback &get_callback); + + void unregisterConfigurationParser(const std::string &key); + + void unregisterConfigurationSetter(const std::string &key); + + void unregisterConfigurationGetter(const std::string &key); + + void getConfiguration( + interface::global_config::ConfigurationObject &configuration_object, + std::error_code &ec); + void setConfiguration( + const interface::global_config::ConfigurationObject &configuration_object, + std::error_code &ec); + + private: + GlobalConfiguration(); + std::string conf_file_path_; + bool parseTransportConfig(const std::string &path); + + private: + std::mutex cp_mtx_; + using ParserPair = std::pair<bool, ParserCallback>; + std::map<std::string, ParserPair> configuration_parsers_; + std::map<std::string, GetCallback> configuration_getters_; + std::map<std::string, SetCallback> configuration_setters_; +}; + +} // namespace core +} // namespace transport
\ No newline at end of file diff --git a/libtransport/src/core/interest.cc b/libtransport/src/core/interest.cc index 9ee662615..06cbe9f81 100644 --- a/libtransport/src/core/interest.cc +++ b/libtransport/src/core/interest.cc @@ -31,8 +31,9 @@ namespace transport { namespace core { -Interest::Interest(const Name &interest_name, Packet::Format format) - : Packet(format) { +Interest::Interest(const Name &interest_name, Packet::Format format, + std::size_t additional_header_size) + : Packet(format, additional_header_size) { if (hicn_interest_set_name(format_, packet_start_, interest_name.getConstStructReference()) < 0) { throw errors::MalformedPacketException(); @@ -45,20 +46,14 @@ Interest::Interest(const Name &interest_name, Packet::Format format) } #ifdef __ANDROID__ -Interest::Interest(hicn_format_t format) : Interest(Name("0::0|0"), format) {} +Interest::Interest(hicn_format_t format, std::size_t additional_header_size) + : Interest(Name("0::0|0"), format, additional_header_size) {} #else -Interest::Interest(hicn_format_t format) : Interest(base_name, format) {} +Interest::Interest(hicn_format_t format, std::size_t additional_header_size) + : Interest(base_name, format, additional_header_size) {} #endif -Interest::Interest(const uint8_t *buffer, std::size_t size) - : Packet(buffer, size) { - if (hicn_interest_get_name(format_, packet_start_, - name_.getStructReference()) < 0) { - throw errors::MalformedPacketException(); - } -} - -Interest::Interest(MemBufPtr &&buffer) : Packet(std::move(buffer)) { +Interest::Interest(MemBuf &&buffer) : Packet(std::move(buffer)) { if (hicn_interest_get_name(format_, packet_start_, name_.getStructReference()) < 0) { throw errors::MalformedPacketException(); @@ -70,6 +65,14 @@ Interest::Interest(Interest &&other_interest) name_ = std::move(other_interest.name_); } +Interest::Interest(const Interest &other_interest) : Packet(other_interest) { + name_ = other_interest.name_; +} + +Interest &Interest::operator=(const Interest &other) { + return (Interest &)Packet::operator=(other); +} + Interest::~Interest() {} const Name &Interest::getName() const { @@ -152,6 +155,59 @@ void Interest::resetForHash() { } } +bool Interest::hasManifest() { + return (getPayloadType() == PayloadType::MANIFEST); +} + +void Interest::appendSuffix(std::uint32_t suffix) { + if (TRANSPORT_EXPECT_FALSE(suffix_set_.empty())) { + setPayloadType(PayloadType::MANIFEST); + } + + suffix_set_.emplace(suffix); +} + +void Interest::encodeSuffixes() { + if (!hasManifest()) { + return; + } + + // We assume interest does not hold signature for the moment. + auto int_manifest_header = + (InterestManifestHeader *)(writableData() + headerSize()); + int_manifest_header->n_suffixes = suffix_set_.size(); + std::size_t additional_length = + int_manifest_header->n_suffixes * sizeof(uint32_t); + + uint32_t *suffix = (uint32_t *)(int_manifest_header + 1); + for (auto it = suffix_set_.begin(); it != suffix_set_.end(); it++, suffix++) { + *suffix = *it; + } + + updateLength(additional_length); +} + +uint32_t *Interest::firstSuffix() { + if (!hasManifest()) { + return nullptr; + } + + auto ret = (InterestManifestHeader *)(writableData() + headerSize()); + ret += 1; + + return (uint32_t *)ret; +} + +uint32_t Interest::numberOfSuffixes() { + if (!hasManifest()) { + return 0; + } + + auto header = (InterestManifestHeader *)(writableData() + headerSize()); + + return header->n_suffixes; +} + } // end namespace core } // end namespace transport diff --git a/libtransport/src/core/io_module.cc b/libtransport/src/core/io_module.cc new file mode 100644 index 000000000..fef0c1504 --- /dev/null +++ b/libtransport/src/core/io_module.cc @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <dlfcn.h> +#include <hicn/transport/core/io_module.h> +#include <hicn/transport/utils/log.h> + +#ifdef ANDROID +#include <io_modules/udp/hicn_forwarder_module.h> +#endif + +#include <deque> + +namespace transport { +namespace core { + +IoModule::~IoModule() {} + +IoModule *IoModule::load(const char *module_name) { +#ifdef ANDROID + return new HicnForwarderModule(); +#else + void *handle = 0; + IoModule *module = 0; + IoModule *(*creator)(void) = 0; + const char *error = 0; + + // open module + handle = dlopen(module_name, RTLD_NOW); + if (!handle) { + if ((error = dlerror()) != 0) { + TRANSPORT_LOGE("%s", error); + } + return 0; + } + + // link factory method + creator = (IoModule * (*)(void)) dlsym(handle, "create_module"); + if (!creator) { + if ((error = dlerror()) != 0) { + TRANSPORT_LOGE("%s", error); + return 0; + } + } + + // create object and return it + module = (*creator)(); + module->handle_ = handle; + + return module; +#endif +} + +bool IoModule::unload(IoModule *module) { + if (!module) { + return false; + } + +#ifdef ANDROID + delete module; +#else + // destroy object and close module + void *handle = module->handle_; + delete module; + dlclose(handle); +#endif + + return true; +} + +} // namespace core +} // namespace transport
\ No newline at end of file diff --git a/libtransport/src/core/local_connector.cc b/libtransport/src/core/local_connector.cc new file mode 100644 index 000000000..f0e36a3d7 --- /dev/null +++ b/libtransport/src/core/local_connector.cc @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2017-2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <core/local_connector.h> +#include <hicn/transport/core/content_object.h> +#include <hicn/transport/core/interest.h> +#include <hicn/transport/errors/not_implemented_exception.h> +#include <hicn/transport/utils/log.h> + +#include <asio/io_service.hpp> + +namespace transport { +namespace core { + +LocalConnector::~LocalConnector() {} + +void LocalConnector::close() { state_ = State::CLOSED; } + +void LocalConnector::send(Packet &packet) { + if (!isConnected()) { + return; + } + + TRANSPORT_LOGD("Sending packet to local socket."); + io_service_.get().post([this, p{packet.shared_from_this()}]() mutable { + receive_callback_(this, *p, std::make_error_code(std::errc(0))); + }); +} + +void LocalConnector::send(const uint8_t *packet, std::size_t len) { + throw errors::NotImplementedException(); +} + +} // namespace core +} // namespace transport
\ No newline at end of file diff --git a/libtransport/src/core/local_connector.h b/libtransport/src/core/local_connector.h new file mode 100644 index 000000000..b0daa4f53 --- /dev/null +++ b/libtransport/src/core/local_connector.h @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2017-2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/core/connector.h> +#include <hicn/transport/core/global_object_pool.h> +#include <hicn/transport/utils/move_wrapper.h> +#include <hicn/transport/utils/shared_ptr_utils.h> +#include <io_modules/forwarder/errors.h> + +#ifndef ASIO_STANDALONE +#define ASIO_STANDALONE +#endif +#include <asio/io_service.hpp> + +namespace transport { +namespace core { + +class LocalConnector : public Connector { + public: + template <typename ReceiveCallback, typename SentCallback, typename OnClose, + typename OnReconnect> + LocalConnector(asio::io_service &io_service, + ReceiveCallback &&receive_callback, SentCallback &&packet_sent, + OnClose &&close_callback, OnReconnect &&on_reconnect) + : Connector(receive_callback, packet_sent, close_callback, on_reconnect), + io_service_(io_service), + io_service_work_(io_service_.get()) { + state_ = State::CONNECTED; + } + + ~LocalConnector() override; + + void send(Packet &packet) override; + + void send(const uint8_t *packet, std::size_t len) override; + + void close() override; + + auto shared_from_this() { return utils::shared_from(this); } + + private: + std::reference_wrapper<asio::io_service> io_service_; + asio::io_service::work io_service_work_; + std::string name_; +}; + +} // namespace core +} // namespace transport diff --git a/libtransport/src/core/manifest.h b/libtransport/src/core/manifest.h index eadfed752..9b25ebd67 100644 --- a/libtransport/src/core/manifest.h +++ b/libtransport/src/core/manifest.h @@ -15,11 +15,10 @@ #pragma once +#include <core/manifest_format.h> #include <hicn/transport/core/content_object.h> #include <hicn/transport/core/name.h> -#include <core/manifest_format.h> - #include <set> namespace transport { @@ -36,18 +35,20 @@ class Manifest : public Base { "Base must inherit from packet!"); public: + // core::ContentObjectManifest::Ptr + using Encoder = typename FormatTraits::Encoder; using Decoder = typename FormatTraits::Decoder; Manifest(std::size_t signature_size = 0) - : Base(HF_INET6_TCP_AH), + : Base(HF_INET6_TCP_AH, signature_size), encoder_(*this, signature_size), decoder_(*this) { Base::setPayloadType(PayloadType::MANIFEST); } Manifest(const core::Name &name, std::size_t signature_size = 0) - : Base(name, HF_INET6_TCP_AH), + : Base(name, HF_INET6_TCP_AH, signature_size), encoder_(*this, signature_size), decoder_(*this) { Base::setPayloadType(PayloadType::MANIFEST); @@ -55,7 +56,9 @@ class Manifest : public Base { template <typename T> Manifest(T &&base) - : Base(std::forward<T &&>(base)), encoder_(*this), decoder_(*this) { + : Base(std::forward<T &&>(base)), + encoder_(*this, 0, false), + decoder_(*this) { Base::setPayloadType(PayloadType::MANIFEST); } @@ -96,13 +99,13 @@ class Manifest : public Base { return *this; } - Manifest &setHashAlgorithm(utils::CryptoHashType hash_algorithm) { + Manifest &setHashAlgorithm(auth::CryptoHashType hash_algorithm) { hash_algorithm_ = hash_algorithm; encoder_.setHashAlgorithm(hash_algorithm_); return *this; } - utils::CryptoHashType getHashAlgorithm() { return hash_algorithm_; } + auth::CryptoHashType getHashAlgorithm() { return hash_algorithm_; } ManifestType getManifestType() const { return manifest_type_; } @@ -138,7 +141,7 @@ class Manifest : public Base { protected: ManifestType manifest_type_; - utils::CryptoHashType hash_algorithm_; + auth::CryptoHashType hash_algorithm_; bool is_last_; Encoder encoder_; diff --git a/libtransport/src/core/manifest_format.h b/libtransport/src/core/manifest_format.h index 36d23f99b..b759942cb 100644 --- a/libtransport/src/core/manifest_format.h +++ b/libtransport/src/core/manifest_format.h @@ -15,8 +15,8 @@ #pragma once +#include <hicn/transport/auth/crypto_hasher.h> #include <hicn/transport/core/name.h> -#include <hicn/transport/security/crypto_hasher.h> #include <cinttypes> #include <type_traits> @@ -63,8 +63,10 @@ template <typename T> struct format_traits { using Encoder = typename T::Encoder; using Decoder = typename T::Decoder; + using Hash = typename T::Hash; using HashType = typename T::HashType; - using HashList = typename T::HashList; + using Suffix = typename T::Suffix; + using SuffixList = typename T::SuffixList; }; class Packet; @@ -86,7 +88,7 @@ class ManifestEncoder { return static_cast<Implementation &>(*this).setManifestTypeImpl(type); } - ManifestEncoder &setHashAlgorithm(utils::CryptoHashType hash) { + ManifestEncoder &setHashAlgorithm(auth::CryptoHashType hash) { return static_cast<Implementation &>(*this).setHashAlgorithmImpl(hash); } @@ -160,7 +162,7 @@ class ManifestDecoder { return static_cast<const Implementation &>(*this).getManifestTypeImpl(); } - utils::CryptoHashType getHashAlgorithm() const { + auth::CryptoHashType getHashAlgorithm() const { return static_cast<const Implementation &>(*this).getHashAlgorithmImpl(); } diff --git a/libtransport/src/core/manifest_format_fixed.cc b/libtransport/src/core/manifest_format_fixed.cc index ca80c38b1..55280b460 100644 --- a/libtransport/src/core/manifest_format_fixed.cc +++ b/libtransport/src/core/manifest_format_fixed.cc @@ -13,49 +13,50 @@ * limitations under the License. */ +#include <core/manifest_format_fixed.h> #include <hicn/transport/core/packet.h> #include <hicn/transport/utils/literals.h> -#include <core/manifest_format_fixed.h> - namespace transport { namespace core { // TODO use preallocated pool of membufs FixedManifestEncoder::FixedManifestEncoder(Packet &packet, - std::size_t signature_size) + std::size_t signature_size, + bool clear) : packet_(packet), - max_size_(Packet::default_mtu - packet_.headerSize() - signature_size), - manifest_( - utils::MemBuf::create(Packet::default_mtu - packet_.headerSize())), - manifest_header_( - reinterpret_cast<ManifestHeader *>(manifest_->writableData())), - manifest_entries_(reinterpret_cast<ManifestEntry *>( - manifest_->writableData() + sizeof(ManifestHeader))), + max_size_(Packet::default_mtu - packet_.headerSize()), + manifest_header_(reinterpret_cast<ManifestHeader *>( + packet_.writableData() + packet_.headerSize())), + manifest_entries_( + reinterpret_cast<ManifestEntry *>(manifest_header_ + 1)), current_entry_(0), signature_size_(signature_size) { - *manifest_header_ = {0}; + if (clear) { + *manifest_header_ = {0}; + } } FixedManifestEncoder::~FixedManifestEncoder() {} FixedManifestEncoder &FixedManifestEncoder::encodeImpl() { - manifest_->append(sizeof(ManifestHeader) + - manifest_header_->number_of_entries * - sizeof(ManifestEntry)); - packet_.appendPayload(std::move(manifest_)); + packet_.append(sizeof(ManifestHeader) + + manifest_header_->number_of_entries * sizeof(ManifestEntry)); + packet_.updateLength(); return *this; } FixedManifestEncoder &FixedManifestEncoder::clearImpl() { - manifest_ = utils::MemBuf::create(Packet::default_mtu - packet_.headerSize() - - signature_size_); + packet_.trimEnd(sizeof(ManifestHeader) + + manifest_header_->number_of_entries * sizeof(ManifestEntry)); + current_entry_ = 0; + *manifest_header_ = {0}; return *this; } FixedManifestEncoder &FixedManifestEncoder::setHashAlgorithmImpl( - utils::CryptoHashType algorithm) { + auth::CryptoHashType algorithm) { manifest_header_->hash_algorithm = static_cast<uint8_t>(algorithm); return *this; } @@ -83,7 +84,7 @@ FixedManifestEncoder &FixedManifestEncoder::setBaseNameImpl( } FixedManifestEncoder &FixedManifestEncoder::addSuffixAndHashImpl( - uint32_t suffix, const utils::CryptoHash &hash) { + uint32_t suffix, const auth::CryptoHash &hash) { auto _hash = hash.getDigest<std::uint8_t>(); addSuffixHashBytes(suffix, _hash.data(), _hash.length()); return *this; @@ -170,8 +171,8 @@ ManifestType FixedManifestDecoder::getManifestTypeImpl() const { return static_cast<ManifestType>(manifest_header_->manifest_type); } -utils::CryptoHashType FixedManifestDecoder::getHashAlgorithmImpl() const { - return static_cast<utils::CryptoHashType>(manifest_header_->hash_algorithm); +auth::CryptoHashType FixedManifestDecoder::getHashAlgorithmImpl() const { + return static_cast<auth::CryptoHashType>(manifest_header_->hash_algorithm); } NextSegmentCalculationStrategy diff --git a/libtransport/src/core/manifest_format_fixed.h b/libtransport/src/core/manifest_format_fixed.h index 1d7cd7d32..56ad4ef6d 100644 --- a/libtransport/src/core/manifest_format_fixed.h +++ b/libtransport/src/core/manifest_format_fixed.h @@ -15,9 +15,8 @@ #pragma once -#include <hicn/transport/core/packet.h> - #include <core/manifest_format.h> +#include <hicn/transport/core/packet.h> #include <string> @@ -53,8 +52,10 @@ class Packet; struct Fixed { using Encoder = FixedManifestEncoder; using Decoder = FixedManifestDecoder; - using HashType = utils::CryptoHash; - using SuffixList = std::list<std::pair<std::uint32_t, std::uint8_t *>>; + using Hash = auth::CryptoHash; + using HashType = auth::CryptoHashType; + using Suffix = uint32_t; + using SuffixList = std::list<std::pair<uint32_t, uint8_t *>>; }; struct Flags { @@ -84,7 +85,8 @@ static const constexpr std::uint8_t manifest_version = 1; class FixedManifestEncoder : public ManifestEncoder<FixedManifestEncoder> { public: - FixedManifestEncoder(Packet &packet, std::size_t signature_size = 0); + FixedManifestEncoder(Packet &packet, std::size_t signature_size = 0, + bool clear = true); ~FixedManifestEncoder(); @@ -94,7 +96,7 @@ class FixedManifestEncoder : public ManifestEncoder<FixedManifestEncoder> { FixedManifestEncoder &setManifestTypeImpl(ManifestType manifest_type); - FixedManifestEncoder &setHashAlgorithmImpl(utils::CryptoHashType algorithm); + FixedManifestEncoder &setHashAlgorithmImpl(Fixed::HashType algorithm); FixedManifestEncoder &setNextSegmentCalculationStrategyImpl( NextSegmentCalculationStrategy strategy); @@ -102,7 +104,7 @@ class FixedManifestEncoder : public ManifestEncoder<FixedManifestEncoder> { FixedManifestEncoder &setBaseNameImpl(const core::Name &base_name); FixedManifestEncoder &addSuffixAndHashImpl(uint32_t suffix, - const utils::CryptoHash &hash); + const Fixed::Hash &hash); FixedManifestEncoder &setIsFinalManifestImpl(bool is_last); @@ -125,7 +127,6 @@ class FixedManifestEncoder : public ManifestEncoder<FixedManifestEncoder> { Packet &packet_; std::size_t max_size_; - std::unique_ptr<utils::MemBuf> manifest_; ManifestHeader *manifest_header_; ManifestEntry *manifest_entries_; std::size_t current_entry_; @@ -144,7 +145,7 @@ class FixedManifestDecoder : public ManifestDecoder<FixedManifestDecoder> { ManifestType getManifestTypeImpl() const; - utils::CryptoHashType getHashAlgorithmImpl() const; + Fixed::HashType getHashAlgorithmImpl() const; NextSegmentCalculationStrategy getNextSegmentCalculationStrategyImpl() const; diff --git a/libtransport/src/core/manifest_inline.h b/libtransport/src/core/manifest_inline.h index dedf82b45..fcb1d214f 100644 --- a/libtransport/src/core/manifest_inline.h +++ b/libtransport/src/core/manifest_inline.h @@ -30,8 +30,12 @@ class ManifestInline : public Manifest<Base, FormatTraits, ManifestInline<Base, FormatTraits>> { using ManifestBase = Manifest<Base, FormatTraits, ManifestInline<Base, FormatTraits>>; + + using Hash = typename FormatTraits::Hash; using HashType = typename FormatTraits::HashType; + using Suffix = typename FormatTraits::Suffix; using SuffixList = typename FormatTraits::SuffixList; + using HashEntry = std::pair<auth::CryptoHashType, std::vector<uint8_t>>; public: ManifestInline() : ManifestBase() {} @@ -44,7 +48,7 @@ class ManifestInline static TRANSPORT_ALWAYS_INLINE ManifestInline *createManifest( const core::Name &manifest_name, ManifestVersion version, - ManifestType type, utils::CryptoHashType algorithm, bool is_last, + ManifestType type, auth::CryptoHashType algorithm, bool is_last, const Name &base_name, NextSegmentCalculationStrategy strategy, std::size_t signature_size) { auto manifest = new ManifestInline(manifest_name, signature_size); @@ -84,7 +88,7 @@ class ManifestInline const Name &getBaseName() { return base_name_; } - ManifestInline &addSuffixHash(uint32_t suffix, const HashType &hash) { + ManifestInline &addSuffixHash(Suffix suffix, const Hash &hash) { ManifestBase::encoder_.addSuffixAndHash(suffix, hash); return *this; } @@ -104,12 +108,35 @@ class ManifestInline return next_segment_strategy_; } + // Convert several manifests into a single map from suffixes to packet hashes. + // All manifests must have been decoded beforehand. + static std::unordered_map<Suffix, HashEntry> getSuffixMap( + const std::vector<ManifestInline *> &manifests) { + std::unordered_map<Suffix, HashEntry> suffix_map; + + for (auto manifest_ptr : manifests) { + HashType hash_algorithm = manifest_ptr->getHashAlgorithm(); + SuffixList suffix_list = manifest_ptr->getSuffixList(); + + for (auto it = suffix_list.begin(); it != suffix_list.end(); ++it) { + std::vector<uint8_t> hash( + it->second, it->second + auth::hash_size_map[hash_algorithm]); + suffix_map[it->first] = {hash_algorithm, hash}; + } + } + + return suffix_map; + } + static std::unordered_map<Suffix, HashEntry> getSuffixMap( + ManifestInline *manifest) { + return getSuffixMap(std::vector<ManifestInline *>{manifest}); + } + private: core::Name base_name_; NextSegmentCalculationStrategy next_segment_strategy_; SuffixList suffix_hash_map_; }; -} // end namespace core - -} // end namespace transport
\ No newline at end of file +} // namespace core +} // namespace transport diff --git a/libtransport/src/core/name.cc b/libtransport/src/core/name.cc index 811e93b87..795c8a697 100644 --- a/libtransport/src/core/name.cc +++ b/libtransport/src/core/name.cc @@ -13,14 +13,13 @@ * limitations under the License. */ +#include <core/manifest_format.h> #include <hicn/transport/core/name.h> #include <hicn/transport/errors/errors.h> #include <hicn/transport/errors/tokenizer_exception.h> #include <hicn/transport/utils/hash.h> #include <hicn/transport/utils/string_tokenizer.h> -#include <core/manifest_format.h> - namespace transport { namespace core { @@ -98,7 +97,12 @@ bool Name::operator!=(const Name &name) const { } Name::operator bool() const { - return bool(hicn_name_empty((hicn_name_t *)&name_)); + auto ret = isValid(); + return ret; +} + +bool Name::isValid() const { + return bool(!hicn_name_empty((hicn_name_t *)&name_)); } bool Name::equals(const Name &name, bool consider_segment) const { diff --git a/libtransport/src/core/packet.cc b/libtransport/src/core/packet.cc index cd2c5aa69..6f237729a 100644 --- a/libtransport/src/core/packet.cc +++ b/libtransport/src/core/packet.cc @@ -31,59 +31,94 @@ namespace core { const core::Name Packet::base_name("0::0|0"); -Packet::Packet(Format format) - : packet_(utils::MemBuf::create(getHeaderSizeFromFormat(format, 256)) - .release()), - packet_start_(reinterpret_cast<hicn_header_t *>(packet_->writableData())), - header_head_(packet_.get()), - payload_head_(nullptr), - format_(format) { - if (hicn_packet_init_header(format, packet_start_) < 0) { - throw errors::RuntimeException("Unexpected error initializing the packet."); - } - - packet_->append(getHeaderSizeFromFormat(format_)); -} - -Packet::Packet(MemBufPtr &&buffer) - : packet_(std::move(buffer)), - packet_start_(reinterpret_cast<hicn_header_t *>(packet_->writableData())), - header_head_(packet_.get()), - payload_head_(nullptr), - format_(getFormatFromBuffer(packet_->writableData(), packet_->length())) { -} - -Packet::Packet(const uint8_t *buffer, std::size_t size) - : Packet(MemBufPtr(utils::MemBuf::copyBuffer(buffer, size).release())) {} +Packet::Packet(Format format, std::size_t additional_header_size) + : utils::MemBuf(utils::MemBuf(CREATE, 2048)), + packet_start_(reinterpret_cast<hicn_header_t *>(writableData())), + header_offset_(0), + format_(format), + payload_type_(PayloadType::UNSPECIFIED) { + setFormat(format_, additional_header_size); +} + +Packet::Packet(MemBuf &&buffer) + : utils::MemBuf(std::move(buffer)), + packet_start_(reinterpret_cast<hicn_header_t *>(writableData())), + header_offset_(0), + format_(getFormatFromBuffer(data(), length())), + payload_type_(PayloadType::UNSPECIFIED) {} + +Packet::Packet(CopyBufferOp, const uint8_t *buffer, std::size_t size) + : utils::MemBuf(COPY_BUFFER, buffer, size), + packet_start_(reinterpret_cast<hicn_header_t *>(writableData())), + header_offset_(0), + format_(getFormatFromBuffer(data(), length())), + payload_type_(PayloadType::UNSPECIFIED) {} + +Packet::Packet(WrapBufferOp, uint8_t *buffer, std::size_t length, + std::size_t size) + : utils::MemBuf(WRAP_BUFFER, buffer, length, size), + packet_start_(reinterpret_cast<hicn_header_t *>(writableData())), + header_offset_(0), + format_(getFormatFromBuffer(this->data(), this->length())), + payload_type_(PayloadType::UNSPECIFIED) {} + +Packet::Packet(CreateOp, uint8_t *buffer, std::size_t length, std::size_t size, + Format format, std::size_t additional_header_size) + : utils::MemBuf(WRAP_BUFFER, buffer, length, size), + packet_start_(reinterpret_cast<hicn_header_t *>(writableData())), + header_offset_(0), + format_(format), + payload_type_(PayloadType::UNSPECIFIED) { + clear(); + setFormat(format_, additional_header_size); +} + +Packet::Packet(const Packet &other) + : utils::MemBuf(other), + packet_start_(reinterpret_cast<hicn_header_t *>(writableData())), + header_offset_(other.header_offset_), + format_(other.format_), + payload_type_(PayloadType::UNSPECIFIED) {} Packet::Packet(Packet &&other) - : packet_(std::move(other.packet_)), + : utils::MemBuf(std::move(other)), packet_start_(other.packet_start_), - header_head_(other.header_head_), - payload_head_(other.payload_head_), - format_(other.format_) { + header_offset_(other.header_offset_), + format_(other.format_), + payload_type_(PayloadType::UNSPECIFIED) { other.packet_start_ = nullptr; - other.header_head_ = nullptr; - other.payload_head_ = nullptr; other.format_ = HF_UNSPEC; + other.header_offset_ = 0; } Packet::~Packet() {} +Packet &Packet::operator=(const Packet &other) { + if (this != &other) { + *this = other; + packet_start_ = reinterpret_cast<hicn_header_t *>(writableData()); + } + + return *this; +} + std::size_t Packet::getHeaderSizeFromBuffer(Format format, const uint8_t *buffer) { size_t header_length; + if (hicn_packet_get_header_length(format, (hicn_header_t *)buffer, &header_length) < 0) { throw errors::MalformedPacketException(); } + return header_length; } bool Packet::isInterest(const uint8_t *buffer) { bool is_interest = false; - if (TRANSPORT_EXPECT_FALSE(hicn_packet_test_ece((const hicn_header_t *)buffer, + if (TRANSPORT_EXPECT_FALSE(hicn_packet_test_ece(HF_INET6_TCP, + (const hicn_header_t *)buffer, &is_interest) < 0)) { throw errors::RuntimeException( "Impossible to retrieve ece flag from packet"); @@ -92,6 +127,25 @@ bool Packet::isInterest(const uint8_t *buffer) { return !is_interest; } +void Packet::setFormat(Packet::Format format, + std::size_t additional_header_size) { + format_ = format; + if (hicn_packet_init_header(format_, packet_start_) < 0) { + throw errors::RuntimeException("Unexpected error initializing the packet."); + } + + auto header_size = getHeaderSizeFromFormat(format_); + assert(header_size <= tailroom()); + append(header_size); + + assert(additional_header_size <= tailroom()); + append(additional_header_size); + + header_offset_ = length(); +} + +bool Packet::isInterest() { return Packet::isInterest(data()); } + std::size_t Packet::getPayloadSizeFromBuffer(Format format, const uint8_t *buffer) { std::size_t payload_length; @@ -105,67 +159,58 @@ std::size_t Packet::getPayloadSizeFromBuffer(Format format, } std::size_t Packet::payloadSize() const { - return getPayloadSizeFromBuffer(format_, - reinterpret_cast<uint8_t *>(packet_start_)); + std::size_t ret = 0; + + if (length()) { + ret = getPayloadSizeFromBuffer(format_, + reinterpret_cast<uint8_t *>(packet_start_)); + } + + return ret; } std::size_t Packet::headerSize() const { - return getHeaderSizeFromBuffer(format_, - reinterpret_cast<uint8_t *>(packet_start_)); + if (header_offset_ == 0 && length()) { + const_cast<Packet *>(this)->header_offset_ = getHeaderSizeFromBuffer( + format_, reinterpret_cast<uint8_t *>(packet_start_)); + } + + return header_offset_; } Packet &Packet::appendPayload(std::unique_ptr<utils::MemBuf> &&payload) { - separateHeaderPayload(); - - if (!payload_head_) { - payload_head_ = payload.get(); - } - - header_head_->prependChain(std::move(payload)); + prependChain(std::move(payload)); updateLength(); return *this; } Packet &Packet::appendPayload(const uint8_t *buffer, std::size_t length) { - return appendPayload(utils::MemBuf::copyBuffer(buffer, length)); -} - -Packet &Packet::appendHeader(std::unique_ptr<utils::MemBuf> &&header) { - separateHeaderPayload(); + prependPayload(&buffer, &length); - if (!payload_head_) { - header_head_->prependChain(std::move(header)); - } else { - payload_head_->prependChain(std::move(header)); + if (length) { + appendPayload(utils::MemBuf::copyBuffer(buffer, length)); } updateLength(); return *this; } -Packet &Packet::appendHeader(const uint8_t *buffer, std::size_t length) { - return appendHeader(utils::MemBuf::copyBuffer(buffer, length)); -} - std::unique_ptr<utils::MemBuf> Packet::getPayload() const { - const_cast<Packet *>(this)->separateHeaderPayload(); - - // Hopefully the payload is contiguous - if (TRANSPORT_EXPECT_FALSE(payload_head_ && - payload_head_->next() != header_head_)) { - payload_head_->gather(payloadSize()); - } - - return payload_head_->cloneOne(); + auto ret = clone(); + ret->trimStart(headerSize()); + return ret; } Packet &Packet::updateLength(std::size_t length) { std::size_t total_length = length; - for (utils::MemBuf *current = payload_head_; - current && current != header_head_; current = current->next()) { + const utils::MemBuf *current = this; + do { total_length += current->length(); - } + current = current->next(); + } while (current != this); + + total_length -= headerSize(); if (hicn_packet_set_payload_length(format_, packet_start_, total_length) < 0) { @@ -176,13 +221,16 @@ Packet &Packet::updateLength(std::size_t length) { } PayloadType Packet::getPayloadType() const { - hicn_payload_type_t ret = HPT_UNSPEC; + if (payload_type_ == PayloadType::UNSPECIFIED) { + hicn_payload_type_t ret; + if (hicn_packet_get_payload_type(packet_start_, &ret) < 0) { + throw errors::RuntimeException("Impossible to retrieve payload type."); + } - if (hicn_packet_get_payload_type(packet_start_, &ret) < 0) { - throw errors::RuntimeException("Impossible to retrieve payload type."); + payload_type_ = (PayloadType)ret; } - return PayloadType(ret); + return payload_type_; } Packet &Packet::setPayloadType(PayloadType payload_type) { @@ -191,60 +239,76 @@ Packet &Packet::setPayloadType(PayloadType payload_type) { throw errors::RuntimeException("Error setting payload type of the packet."); } + payload_type_ = payload_type; + return *this; } Packet::Format Packet::getFormat() const { - if (format_ == HF_UNSPEC) { + /** + * We check packet start because after a movement it will result in a nullptr + */ + if (format_ == HF_UNSPEC && length()) { if (hicn_packet_get_format(packet_start_, &format_) < 0) { - throw errors::MalformedPacketException(); + TRANSPORT_LOGE("Unexpected packet format."); } } return format_; } -const std::shared_ptr<utils::MemBuf> Packet::acquireMemBufReference() const { - return packet_; +std::shared_ptr<utils::MemBuf> Packet::acquireMemBufReference() { + return std::static_pointer_cast<utils::MemBuf>(shared_from_this()); } void Packet::dump() const { - const_cast<Packet *>(this)->separateHeaderPayload(); - TRANSPORT_LOGI("HEADER -- Length: %zu", headerSize()); - hicn_packet_dump((uint8_t *)header_head_->data(), headerSize()); - TRANSPORT_LOGI("PAYLOAD -- Length: %zu", payloadSize()); - for (utils::MemBuf *current = payload_head_; - current && current != header_head_; current = current->next()) { + + const utils::MemBuf *current = this; + do { TRANSPORT_LOGI("MemBuf Length: %zu", current->length()); - hicn_packet_dump((uint8_t *)current->data(), current->length()); - } + dump((uint8_t *)current->data(), current->length()); + current = current->next(); + } while (current != this); +} + +void Packet::dump(uint8_t *buffer, std::size_t length) { + hicn_packet_dump(buffer, length); } void Packet::setSignatureSize(std::size_t size_bytes) { + if (!authenticationHeader()) { + throw errors::RuntimeException("Packet without Authentication Header."); + } + int ret = hicn_packet_set_signature_size(format_, packet_start_, size_bytes); if (ret < 0) { - throw errors::RuntimeException("Packet without Authentication Header."); + throw errors::RuntimeException("Error setting signature size."); } - - packet_->append(size_bytes); - updateLength(); } uint8_t *Packet::getSignature() const { + if (!authenticationHeader()) { + throw errors::RuntimeException("Packet without Authentication Header."); + } + uint8_t *signature; int ret = hicn_packet_get_signature(format_, packet_start_, &signature); if (ret < 0) { - throw errors::RuntimeException("Packet without Authentication Header."); + throw errors::RuntimeException("Error getting signature."); } return signature; } void Packet::setSignatureTimestamp(const uint64_t ×tamp) { + if (!authenticationHeader()) { + throw errors::RuntimeException("Packet without Authentication Header."); + } + int ret = hicn_packet_set_signature_timestamp(format_, packet_start_, timestamp); @@ -254,6 +318,10 @@ void Packet::setSignatureTimestamp(const uint64_t ×tamp) { } uint64_t Packet::getSignatureTimestamp() const { + if (!authenticationHeader()) { + throw errors::RuntimeException("Packet without Authentication Header."); + } + uint64_t return_value; int ret = hicn_packet_get_signature_timestamp(format_, packet_start_, &return_value); @@ -266,7 +334,11 @@ uint64_t Packet::getSignatureTimestamp() const { } void Packet::setValidationAlgorithm( - const utils::CryptoSuite &validation_algorithm) { + const auth::CryptoSuite &validation_algorithm) { + if (!authenticationHeader()) { + throw errors::RuntimeException("Packet without Authentication Header."); + } + int ret = hicn_packet_set_validation_algorithm(format_, packet_start_, uint8_t(validation_algorithm)); @@ -275,7 +347,11 @@ void Packet::setValidationAlgorithm( } } -utils::CryptoSuite Packet::getValidationAlgorithm() const { +auth::CryptoSuite Packet::getValidationAlgorithm() const { + if (!authenticationHeader()) { + throw errors::RuntimeException("Packet without Authentication Header."); + } + uint8_t return_value; int ret = hicn_packet_get_validation_algorithm(format_, packet_start_, &return_value); @@ -284,10 +360,14 @@ utils::CryptoSuite Packet::getValidationAlgorithm() const { throw errors::RuntimeException("Error getting the validation algorithm."); } - return utils::CryptoSuite(return_value); + return auth::CryptoSuite(return_value); } -void Packet::setKeyId(const utils::KeyId &key_id) { +void Packet::setKeyId(const auth::KeyId &key_id) { + if (!authenticationHeader()) { + throw errors::RuntimeException("Packet without Authentication Header."); + } + int ret = hicn_packet_set_key_id(format_, packet_start_, key_id.first); if (ret < 0) { @@ -295,8 +375,12 @@ void Packet::setKeyId(const utils::KeyId &key_id) { } } -utils::KeyId Packet::getKeyId() const { - utils::KeyId return_value; +auth::KeyId Packet::getKeyId() const { + if (!authenticationHeader()) { + throw errors::RuntimeException("Packet without Authentication Header."); + } + + auth::KeyId return_value; int ret = hicn_packet_get_key_id(format_, packet_start_, &return_value.first, &return_value.second); @@ -307,8 +391,8 @@ utils::KeyId Packet::getKeyId() const { return return_value; } -utils::CryptoHash Packet::computeDigest(utils::CryptoHashType algorithm) const { - utils::CryptoHasher hasher(static_cast<utils::CryptoHashType>(algorithm)); +auth::CryptoHash Packet::computeDigest(auth::CryptoHashType algorithm) const { + auth::CryptoHasher hasher(static_cast<auth::CryptoHashType>(algorithm)); hasher.init(); // Copy IP+TCP/ICMP header before zeroing them @@ -318,11 +402,11 @@ utils::CryptoHash Packet::computeDigest(utils::CryptoHashType algorithm) const { const_cast<Packet *>(this)->resetForHash(); - auto current = header_head_; + const utils::MemBuf *current = this; do { hasher.updateBytes(current->data(), current->length()); current = current->next(); - } while (current != header_head_); + } while (current != this); hicn_packet_copy_header(format_, &header_copy, packet_start_, false); @@ -330,15 +414,33 @@ utils::CryptoHash Packet::computeDigest(utils::CryptoHashType algorithm) const { } bool Packet::checkIntegrity() const { - if (hicn_packet_check_integrity(format_, packet_start_) < 0) { + uint16_t partial_csum = + csum(data() + HICN_V6_TCP_HDRLEN, length() - HICN_V6_TCP_HDRLEN, 0); + + for (const utils::MemBuf *current = next(); current != this; + current = current->next()) { + partial_csum = csum(current->data(), current->length(), ~partial_csum); + } + + if (hicn_packet_check_integrity_no_payload(format_, packet_start_, + partial_csum) < 0) { return false; } return true; } +void Packet::prependPayload(const uint8_t **buffer, std::size_t *size) { + auto last = prev(); + auto to_copy = std::min(*size, last->tailroom()); + std::memcpy(last->writableTail(), *buffer, to_copy); + last->append(to_copy); + *size -= to_copy; + *buffer += to_copy; +} + Packet &Packet::setSyn() { - if (hicn_packet_set_syn(packet_start_) < 0) { + if (hicn_packet_set_syn(format_, packet_start_) < 0) { throw errors::RuntimeException("Error setting syn bit in the packet."); } @@ -346,7 +448,7 @@ Packet &Packet::setSyn() { } Packet &Packet::resetSyn() { - if (hicn_packet_reset_syn(packet_start_) < 0) { + if (hicn_packet_reset_syn(format_, packet_start_) < 0) { throw errors::RuntimeException("Error resetting syn bit in the packet."); } @@ -355,7 +457,7 @@ Packet &Packet::resetSyn() { bool Packet::testSyn() const { bool res = false; - if (hicn_packet_test_syn(packet_start_, &res) < 0) { + if (hicn_packet_test_syn(format_, packet_start_, &res) < 0) { throw errors::RuntimeException("Error testing syn bit in the packet."); } @@ -363,7 +465,7 @@ bool Packet::testSyn() const { } Packet &Packet::setAck() { - if (hicn_packet_set_ack(packet_start_) < 0) { + if (hicn_packet_set_ack(format_, packet_start_) < 0) { throw errors::RuntimeException("Error setting ack bit in the packet."); } @@ -371,7 +473,7 @@ Packet &Packet::setAck() { } Packet &Packet::resetAck() { - if (hicn_packet_reset_ack(packet_start_) < 0) { + if (hicn_packet_reset_ack(format_, packet_start_) < 0) { throw errors::RuntimeException("Error resetting ack bit in the packet."); } @@ -380,7 +482,7 @@ Packet &Packet::resetAck() { bool Packet::testAck() const { bool res = false; - if (hicn_packet_test_ack(packet_start_, &res) < 0) { + if (hicn_packet_test_ack(format_, packet_start_, &res) < 0) { throw errors::RuntimeException("Error testing ack bit in the packet."); } @@ -388,7 +490,7 @@ bool Packet::testAck() const { } Packet &Packet::setRst() { - if (hicn_packet_set_rst(packet_start_) < 0) { + if (hicn_packet_set_rst(format_, packet_start_) < 0) { throw errors::RuntimeException("Error setting rst bit in the packet."); } @@ -396,7 +498,7 @@ Packet &Packet::setRst() { } Packet &Packet::resetRst() { - if (hicn_packet_reset_rst(packet_start_) < 0) { + if (hicn_packet_reset_rst(format_, packet_start_) < 0) { throw errors::RuntimeException("Error resetting rst bit in the packet."); } @@ -405,7 +507,7 @@ Packet &Packet::resetRst() { bool Packet::testRst() const { bool res = false; - if (hicn_packet_test_rst(packet_start_, &res) < 0) { + if (hicn_packet_test_rst(format_, packet_start_, &res) < 0) { throw errors::RuntimeException("Error testing rst bit in the packet."); } @@ -413,7 +515,7 @@ bool Packet::testRst() const { } Packet &Packet::setFin() { - if (hicn_packet_set_fin(packet_start_) < 0) { + if (hicn_packet_set_fin(format_, packet_start_) < 0) { throw errors::RuntimeException("Error setting fin bit in the packet."); } @@ -421,7 +523,7 @@ Packet &Packet::setFin() { } Packet &Packet::resetFin() { - if (hicn_packet_reset_fin(packet_start_) < 0) { + if (hicn_packet_reset_fin(format_, packet_start_) < 0) { throw errors::RuntimeException("Error resetting fin bit in the packet."); } @@ -430,7 +532,7 @@ Packet &Packet::resetFin() { bool Packet::testFin() const { bool res = false; - if (hicn_packet_test_fin(packet_start_, &res) < 0) { + if (hicn_packet_test_fin(format_, packet_start_, &res) < 0) { throw errors::RuntimeException("Error testing fin bit in the packet."); } @@ -447,24 +549,29 @@ Packet &Packet::resetFlags() { } std::string Packet::printFlags() const { - std::string flags = ""; + std::string flags; + if (testSyn()) { flags += "S"; } + if (testAck()) { flags += "A"; } + if (testRst()) { flags += "R"; } + if (testFin()) { flags += "F"; } + return flags; } Packet &Packet::setSrcPort(uint16_t srcPort) { - if (hicn_packet_set_src_port(packet_start_, srcPort) < 0) { + if (hicn_packet_set_src_port(format_, packet_start_, srcPort) < 0) { throw errors::RuntimeException("Error setting source port in the packet."); } @@ -472,7 +579,7 @@ Packet &Packet::setSrcPort(uint16_t srcPort) { } Packet &Packet::setDstPort(uint16_t dstPort) { - if (hicn_packet_set_dst_port(packet_start_, dstPort) < 0) { + if (hicn_packet_set_dst_port(format_, packet_start_, dstPort) < 0) { throw errors::RuntimeException( "Error setting destination port in the packet."); } @@ -483,7 +590,7 @@ Packet &Packet::setDstPort(uint16_t dstPort) { uint16_t Packet::getSrcPort() const { uint16_t port = 0; - if (hicn_packet_get_src_port(packet_start_, &port) < 0) { + if (hicn_packet_get_src_port(format_, packet_start_, &port) < 0) { throw errors::RuntimeException("Error reading source port in the packet."); } @@ -493,7 +600,7 @@ uint16_t Packet::getSrcPort() const { uint16_t Packet::getDstPort() const { uint16_t port = 0; - if (hicn_packet_get_dst_port(packet_start_, &port) < 0) { + if (hicn_packet_get_dst_port(format_, packet_start_, &port) < 0) { throw errors::RuntimeException( "Error reading destination port in the packet."); } @@ -518,37 +625,6 @@ uint8_t Packet::getTTL() const { return hops; } -void Packet::separateHeaderPayload() { - if (payload_head_) { - return; - } - - int signature_size = 0; - if (_is_ah(format_)) { - signature_size = (uint32_t)getSignatureSize(); - } - - auto header_size = getHeaderSizeFromFormat(format_, signature_size); - auto payload_length = packet_->length() - header_size; - - packet_->trimEnd(packet_->length()); - - auto payload = packet_->cloneOne(); - payload_head_ = payload.get(); - payload_head_->advance(header_size); - payload_head_->append(payload_length); - packet_->prependChain(std::move(payload)); - packet_->append(header_size); -} - -void Packet::resetPayload() { - if (packet_->isChained()) { - packet_->separateChain(packet_->next(), packet_->prev()); - payload_head_ = nullptr; - updateLength(); - } -} - } // end namespace core } // end namespace transport diff --git a/libtransport/src/core/pending_interest.h b/libtransport/src/core/pending_interest.h index aeff78ea2..ca6411ddf 100644 --- a/libtransport/src/core/pending_interest.h +++ b/libtransport/src/core/pending_interest.h @@ -21,7 +21,6 @@ #include <hicn/transport/core/name.h> #include <hicn/transport/interfaces/portal.h> #include <hicn/transport/portability/portability.h> - #include <utils/deadline_timer.h> #include <asio/steady_timer.hpp> @@ -34,24 +33,21 @@ class HicnForwarderInterface; class VPPForwarderInterface; class RawSocketInterface; -template <typename ForwarderInt> class Portal; using OnContentObjectCallback = interface::Portal::OnContentObjectCallback; using OnInterestTimeoutCallback = interface::Portal::OnInterestTimeoutCallback; class PendingInterest { - friend class Portal<HicnForwarderInterface>; - friend class Portal<VPPForwarderInterface>; - friend class Portal<RawSocketInterface>; + friend class Portal; public: using Ptr = utils::ObjectPool<PendingInterest>::Ptr; - PendingInterest() - : interest_(nullptr, nullptr), - timer_(), - on_content_object_callback_(), - on_interest_timeout_callback_() {} + // PendingInterest() + // : interest_(nullptr, nullptr), + // timer_(), + // on_content_object_callback_(), + // on_interest_timeout_callback_() {} PendingInterest(Interest::Ptr &&interest, std::unique_ptr<asio::steady_timer> &&timer) diff --git a/libtransport/src/core/portal.cc b/libtransport/src/core/portal.cc new file mode 100644 index 000000000..d1d26c5b7 --- /dev/null +++ b/libtransport/src/core/portal.cc @@ -0,0 +1,147 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <core/errors.h> +#include <core/global_configuration.h> +#include <core/portal.h> +#include <hicn/transport/interfaces/global_conf_interface.h> +#include <hicn/transport/portability/platform.h> +#include <hicn/transport/utils/file.h> + +#include <libconfig.h++> + +using namespace transport::interface::global_config; + +namespace transport { +namespace core { + +#ifdef ANDROID +static const constexpr char default_module[] = ""; +#elif defined(MACINTOSH) +static const constexpr char default_module[] = "hicnlight_module.dylib"; +#elif defined(LINUX) +static const constexpr char default_module[] = "hicnlight_module.so"; +#endif + +IoModuleConfiguration Portal::conf_; +std::string Portal::io_module_path_ = defaultIoModule(); + +std::string Portal::defaultIoModule() { + using namespace std::placeholders; + GlobalConfiguration::getInstance().registerConfigurationParser( + io_module_section, + std::bind(&Portal::parseIoModuleConfiguration, _1, _2)); + GlobalConfiguration::getInstance().registerConfigurationGetter( + io_module_section, std::bind(&Portal::getModuleConfiguration, _1, _2)); + GlobalConfiguration::getInstance().registerConfigurationSetter( + io_module_section, std::bind(&Portal::setModuleConfiguration, _1, _2)); + + // return default + conf_.name = default_module; + return default_module; +} + +void Portal::getModuleConfiguration(ConfigurationObject& object, + std::error_code& ec) { + assert(object.getKey() == io_module_section); + + auto conf = dynamic_cast<const IoModuleConfiguration&>(object); + conf = conf_; + ec = std::error_code(); +} + +std::string getIoModulePath(const std::string& name, + const std::vector<std::string>& paths, + std::error_code& ec) { +#ifdef LINUX + std::string extension = ".so"; +#elif defined(MACINTOSH) + std::string extension = ".dylib"; +#else +#error "Platform not supported."; +#endif + + std::string complete_path = name; + + if (name.empty()) { + ec = make_error_code(core_error::configuration_parse_failed); + return ""; + } + + complete_path += extension; + + for (auto& p : paths) { + if (p.at(0) != '/') { + TRANSPORT_LOGW("Path %s is not an absolute path. Ignoring it.", + p.c_str()); + continue; + } + + if (utils::File::exists(p + "/" + complete_path)) { + complete_path = p + "/" + complete_path; + break; + } + } + + return complete_path; +} + +void Portal::setModuleConfiguration(const ConfigurationObject& object, + std::error_code& ec) { + assert(object.getKey() == io_module_section); + + const IoModuleConfiguration& conf = + dynamic_cast<const IoModuleConfiguration&>(object); + auto path = getIoModulePath(conf.name, conf.search_path, ec); + if (!ec) { + conf_ = conf; + io_module_path_ = path; + } +} + +void Portal::parseIoModuleConfiguration(const libconfig::Setting& io_config, + std::error_code& ec) { + using namespace libconfig; + // path property: the list of paths where to look for the module. + std::vector<std::string> paths; + std::string name; + + if (io_config.exists("path")) { + // get path where looking for modules + const Setting& path_list = io_config.lookup("path"); + auto count = path_list.getLength(); + + for (int i = 0; i < count; i++) { + paths.emplace_back(path_list[i].c_str()); + } + } + + if (io_config.exists("name")) { + io_config.lookupValue("name", name); + } else { + ec = make_error_code(core_error::configuration_parse_failed); + return; + } + + auto path = getIoModulePath(name, paths, ec); + if (!ec) { + conf_.name = name; + conf_.search_path = paths; + io_module_path_ = path; + } +} + +} // namespace core +} // namespace transport
\ No newline at end of file diff --git a/libtransport/src/core/portal.h b/libtransport/src/core/portal.h index b63eab3af..59254cf7b 100644 --- a/libtransport/src/core/portal.h +++ b/libtransport/src/core/portal.h @@ -15,24 +15,20 @@ #pragma once -#include <core/forwarder_interface.h> #include <core/pending_interest.h> -#include <core/udp_socket_connector.h> #include <hicn/transport/config.h> #include <hicn/transport/core/content_object.h> #include <hicn/transport/core/interest.h> +#include <hicn/transport/core/io_module.h> #include <hicn/transport/core/name.h> #include <hicn/transport/core/prefix.h> #include <hicn/transport/errors/errors.h> +#include <hicn/transport/interfaces/global_conf_interface.h> #include <hicn/transport/interfaces/portal.h> #include <hicn/transport/portability/portability.h> #include <hicn/transport/utils/fixed_block_allocator.h> #include <hicn/transport/utils/log.h> -#ifdef __vpp__ -#include <core/memif_connector.h> -#endif - #include <asio.hpp> #include <asio/steady_timer.hpp> #include <future> @@ -40,17 +36,19 @@ #include <queue> #include <unordered_map> +namespace libconfig { +class Setting; +} + namespace transport { namespace core { namespace portal_details { -static constexpr uint32_t pool_size = 2048; +static constexpr uint32_t pit_size = 1024; class HandlerMemory { #ifdef __vpp__ - static constexpr std::size_t memory_size = 1024 * 1024; - public: HandlerMemory() {} @@ -58,12 +56,11 @@ class HandlerMemory { HandlerMemory &operator=(const HandlerMemory &) = delete; TRANSPORT_ALWAYS_INLINE void *allocate(std::size_t size) { - return utils::FixedBlockAllocator<128, 4096>::getInstance() - ->allocateBlock(); + return utils::FixedBlockAllocator<128, 8192>::getInstance().allocateBlock(); } TRANSPORT_ALWAYS_INLINE void deallocate(void *pointer) { - utils::FixedBlockAllocator<128, 4096>::getInstance()->deallocateBlock( + utils::FixedBlockAllocator<128, 8192>::getInstance().deallocateBlock( pointer); } #else @@ -159,33 +156,16 @@ class Pool { public: Pool(asio::io_service &io_service) : io_service_(io_service) { increasePendingInterestPool(); - increaseInterestPool(); - increaseContentObjectPool(); } TRANSPORT_ALWAYS_INLINE void increasePendingInterestPool() { // Create pool of pending interests to reuse - for (uint32_t i = 0; i < pool_size; i++) { + for (uint32_t i = 0; i < pit_size; i++) { pending_interests_pool_.add(new PendingInterest( Interest::Ptr(nullptr), std::make_unique<asio::steady_timer>(io_service_))); } } - - TRANSPORT_ALWAYS_INLINE void increaseInterestPool() { - // Create pool of interests to reuse - for (uint32_t i = 0; i < pool_size; i++) { - interest_pool_.add(new Interest()); - } - } - - TRANSPORT_ALWAYS_INLINE void increaseContentObjectPool() { - // Create pool of content object to reuse - for (uint32_t i = 0; i < pool_size; i++) { - content_object_pool_.add(new ContentObject()); - } - } - PendingInterest::Ptr getPendingInterest() { auto res = pending_interests_pool_.get(); while (TRANSPORT_EXPECT_FALSE(!res.first)) { @@ -196,35 +176,15 @@ class Pool { return std::move(res.second); } - TRANSPORT_ALWAYS_INLINE ContentObject::Ptr getContentObject() { - auto res = content_object_pool_.get(); - while (TRANSPORT_EXPECT_FALSE(!res.first)) { - increaseContentObjectPool(); - res = content_object_pool_.get(); - } - - return std::move(res.second); - } - - TRANSPORT_ALWAYS_INLINE Interest::Ptr getInterest() { - auto res = interest_pool_.get(); - while (TRANSPORT_EXPECT_FALSE(!res.first)) { - increaseInterestPool(); - res = interest_pool_.get(); - } - - return std::move(res.second); - } - private: utils::ObjectPool<PendingInterest> pending_interests_pool_; - utils::ObjectPool<ContentObject> content_object_pool_; - utils::ObjectPool<Interest> interest_pool_; asio::io_service &io_service_; }; } // namespace portal_details +class PortalConfiguration; + using PendingInterestHashTable = std::unordered_map<uint32_t, PendingInterest::Ptr>; @@ -250,32 +210,32 @@ using interface::BindConfig; * The portal class is not thread safe, appropriate locking is required by the * users of this class. */ -template <typename ForwarderInt> -class Portal { - static_assert( - std::is_base_of<ForwarderInterface<ForwarderInt, - typename ForwarderInt::ConnectorType>, - ForwarderInt>::value, - "ForwarderInt must inherit from ForwarderInterface!"); +class Portal { public: using ConsumerCallback = interface::Portal::ConsumerCallback; using ProducerCallback = interface::Portal::ProducerCallback; + friend class PortalConfiguration; + Portal() : Portal(internal_io_service_) {} Portal(asio::io_service &io_service) - : io_service_(io_service), + : io_module_(nullptr, [](IoModule *module) { IoModule::unload(module); }), + io_service_(io_service), packet_pool_(io_service), app_name_("libtransport_application"), consumer_callback_(nullptr), producer_callback_(nullptr), - connector_(std::bind(&Portal::processIncomingMessages, this, - std::placeholders::_1), - std::bind(&Portal::setLocalRoutes, this), io_service_, - app_name_), - forwarder_interface_(connector_) {} - + is_consumer_(false) { + /** + * This workaroung allows to initialize memory for packet buffers *before* + * any static variables that may be initialized in the io_modules. In this + * way static variables in modules will be destroyed before the packet + * memory. + */ + PacketManager<>::getInstance(); + } /** * Set the consumer callback. * @@ -304,7 +264,7 @@ class Portal { */ TRANSPORT_ALWAYS_INLINE void setOutputInterface( const std::string &output_interface) { - forwarder_interface_.setOutputInterface(output_interface); + io_module_->setOutputInterface(output_interface); } /** @@ -314,8 +274,19 @@ class Portal { * is a consumer or a producer. */ TRANSPORT_ALWAYS_INLINE void connect(bool is_consumer = true) { - pending_interest_hash_table_.reserve(portal_details::pool_size); - forwarder_interface_.connect(is_consumer); + if (!io_module_) { + pending_interest_hash_table_.reserve(portal_details::pit_size); + io_module_.reset(IoModule::load(io_module_path_.c_str())); + assert(io_module_); + + io_module_->init(std::bind(&Portal::processIncomingMessages, this, + std::placeholders::_1, std::placeholders::_2, + std::placeholders::_3), + std::bind(&Portal::setLocalRoutes, this), io_service_, + app_name_); + io_module_->connect(is_consumer); + is_consumer_ = is_consumer; + } } /** @@ -324,13 +295,19 @@ class Portal { ~Portal() { killConnection(); } /** + * Compute name hash + */ + TRANSPORT_ALWAYS_INLINE uint32_t getHash(const Name &name) { + return name.getHash32() + name.getSuffix(); + } + + /** * Check if there is already a pending interest for a given name. * * @param name - The interest name. */ TRANSPORT_ALWAYS_INLINE bool interestIsPending(const Name &name) { - auto it = - pending_interest_hash_table_.find(name.getHash32() + name.getSuffix()); + auto it = pending_interest_hash_table_.find(getHash(name)); if (it != pending_interest_hash_table_.end()) { return true; } @@ -357,31 +334,46 @@ class Portal { OnContentObjectCallback &&on_content_object_callback = UNSET_CALLBACK, OnInterestTimeoutCallback &&on_interest_timeout_callback = UNSET_CALLBACK) { - uint32_t hash = - interest->getName().getHash32() + interest->getName().getSuffix(); // Send it - forwarder_interface_.send(*interest); - - auto pending_interest = packet_pool_.getPendingInterest(); - pending_interest->setInterest(std::move(interest)); - pending_interest->setOnContentObjectCallback( - std::move(on_content_object_callback)); - pending_interest->setOnTimeoutCallback( - std::move(on_interest_timeout_callback)); - pending_interest->startCountdown(portal_details::makeCustomAllocatorHandler( - async_callback_memory_, std::bind(&Portal<ForwarderInt>::timerHandler, - this, std::placeholders::_1, hash))); + interest->encodeSuffixes(); + io_module_->send(*interest); + + uint32_t initial_hash = interest->getName().getHash32(); + auto hash = initial_hash + interest->getName().getSuffix(); + uint32_t *suffix = interest->firstSuffix(); + auto n_suffixes = interest->numberOfSuffixes(); + uint32_t counter = 0; + // Set timers + do { + auto pending_interest = packet_pool_.getPendingInterest(); + pending_interest->setInterest(std::move(interest)); + pending_interest->setOnContentObjectCallback( + std::move(on_content_object_callback)); + pending_interest->setOnTimeoutCallback( + std::move(on_interest_timeout_callback)); + + pending_interest->startCountdown( + portal_details::makeCustomAllocatorHandler( + async_callback_memory_, std::bind(&Portal::timerHandler, this, + std::placeholders::_1, hash))); + + auto it = pending_interest_hash_table_.find(hash); + if (it != pending_interest_hash_table_.end()) { + it->second->cancelTimer(); - auto it = pending_interest_hash_table_.find(hash); - if (it != pending_interest_hash_table_.end()) { - it->second->cancelTimer(); + // Get reference to interest packet in order to have it destroyed. + auto _int = it->second->getInterest(); + it->second = std::move(pending_interest); + } else { + pending_interest_hash_table_[hash] = std::move(pending_interest); + } - // Get reference to interest packet in order to have it destroyed. - auto _int = it->second->getInterest(); - it->second = std::move(pending_interest); - } else { - pending_interest_hash_table_[hash] = std::move(pending_interest); - } + if (suffix) { + hash = initial_hash + *suffix; + suffix++; + } + + } while (counter++ < n_suffixes); } /** @@ -423,9 +415,9 @@ class Portal { * @param config - The configuration for the local forwarder binding. */ TRANSPORT_ALWAYS_INLINE void bind(const BindConfig &config) { - forwarder_interface_.setContentStoreSize(config.csReserved()); + assert(io_module_); + io_module_->setContentStoreSize(config.csReserved()); served_namespaces_.push_back(config.prefix()); - setLocalRoutes(); } @@ -460,7 +452,7 @@ class Portal { */ TRANSPORT_ALWAYS_INLINE void sendContentObject( ContentObject &content_object) { - forwarder_interface_.send(content_object); + io_module_->send(content_object); } /** @@ -482,7 +474,7 @@ class Portal { * Disconnect the transport from the local forwarder. */ TRANSPORT_ALWAYS_INLINE void killConnection() { - forwarder_interface_.closeConnection(); + io_module_->closeConnection(); } /** @@ -497,6 +489,17 @@ class Portal { } /** + * Remove one pending interest. + */ + TRANSPORT_ALWAYS_INLINE void clearOne(const Name &name) { + if (!io_service_.stopped()) { + io_service_.dispatch(std::bind(&Portal::doClearOne, this, name)); + } else { + doClearOne(name); + } + } + + /** * Get a reference to the io_service object. */ TRANSPORT_ALWAYS_INLINE asio::io_service &getIoService() { @@ -508,8 +511,8 @@ class Portal { */ TRANSPORT_ALWAYS_INLINE void registerRoute(Prefix &prefix) { served_namespaces_.push_back(prefix); - if (connector_.isConnected()) { - forwarder_interface_.registerRoute(prefix); + if (io_module_->isConnected()) { + io_module_->registerRoute(prefix); } } @@ -530,36 +533,49 @@ class Portal { } /** + * Remove one pending interest. + */ + TRANSPORT_ALWAYS_INLINE void doClearOne(const Name &name) { + auto it = pending_interest_hash_table_.find(getHash(name)); + + if (it != pending_interest_hash_table_.end()) { + it->second->cancelTimer(); + + // Get interest packet from pending interest and do nothing with it. It + // will get destroyed as it goes out of scope. + auto _int = it->second->getInterest(); + + pending_interest_hash_table_.erase(it); + } + } + + /** * Callback called by the underlying connector upon reception of a packet from * the local forwarder. * * @param packet_buffer - The bytes of the packet. */ TRANSPORT_ALWAYS_INLINE void processIncomingMessages( - Packet::MemBufPtr &&packet_buffer) { + Connector *c, utils::MemBuf &buffer, const std::error_code &ec) { bool is_stopped = io_service_.stopped(); if (TRANSPORT_EXPECT_FALSE(is_stopped)) { return; } - if (TRANSPORT_EXPECT_FALSE( - ForwarderInt::isControlMessage(packet_buffer->data()))) { - processControlMessage(std::move(packet_buffer)); + if (TRANSPORT_EXPECT_FALSE(io_module_->isControlMessage(buffer.data()))) { + processControlMessage(buffer); return; } - Packet::Format format = Packet::getFormatFromBuffer( - packet_buffer->data(), packet_buffer->length()); + // The buffer is a base class for an interest or a content object + Packet &packet_buffer = static_cast<Packet &>(buffer); + auto format = packet_buffer.getFormat(); if (TRANSPORT_EXPECT_TRUE(_is_tcp(format))) { - if (!Packet::isInterest(packet_buffer->data())) { - auto content_object = packet_pool_.getContentObject(); - content_object->replace(std::move(packet_buffer)); - processContentObject(std::move(content_object)); + if (is_consumer_) { + processContentObject(static_cast<ContentObject &>(packet_buffer)); } else { - auto interest = packet_pool_.getInterest(); - interest->replace(std::move(packet_buffer)); - processInterest(std::move(interest)); + processInterest(static_cast<Interest &>(packet_buffer)); } } else { TRANSPORT_LOGE("Received not supported packet. Ignoring it."); @@ -573,16 +589,16 @@ class Portal { */ TRANSPORT_ALWAYS_INLINE void setLocalRoutes() { for (auto &prefix : served_namespaces_) { - if (connector_.isConnected()) { - forwarder_interface_.registerRoute(prefix); + if (io_module_->isConnected()) { + io_module_->registerRoute(prefix); } } } - TRANSPORT_ALWAYS_INLINE void processInterest(Interest::Ptr &&interest) { + TRANSPORT_ALWAYS_INLINE void processInterest(Interest &interest) { // Interest for a producer if (TRANSPORT_EXPECT_TRUE(producer_callback_ != nullptr)) { - producer_callback_->onInterest(std::move(interest)); + producer_callback_->onInterest(interest); } } @@ -595,24 +611,27 @@ class Portal { * @param content_object - The data packet */ TRANSPORT_ALWAYS_INLINE void processContentObject( - ContentObject::Ptr &&content_object) { - uint32_t hash = content_object->getName().getHash32() + - content_object->getName().getSuffix(); + ContentObject &content_object) { + TRANSPORT_LOGD("processContentObject %s", + content_object.getName().toString().c_str()); + uint32_t hash = getHash(content_object.getName()); auto it = pending_interest_hash_table_.find(hash); if (it != pending_interest_hash_table_.end()) { + TRANSPORT_LOGD("Found pending interest."); + PendingInterest::Ptr interest_ptr = std::move(it->second); pending_interest_hash_table_.erase(it); interest_ptr->cancelTimer(); auto _int = interest_ptr->getInterest(); if (interest_ptr->getOnDataCallback() != UNSET_CALLBACK) { - interest_ptr->on_content_object_callback_(std::move(_int), - std::move(content_object)); + interest_ptr->on_content_object_callback_(*_int, content_object); } else if (consumer_callback_) { - consumer_callback_->onContentObject(std::move(_int), - std::move(content_object)); + consumer_callback_->onContentObject(*_int, content_object); } + } else { + TRANSPORT_LOGD("No interest pending for received content object."); } } @@ -622,12 +641,13 @@ class Portal { * them. */ TRANSPORT_ALWAYS_INLINE void processControlMessage( - Packet::MemBufPtr &&packet_buffer) { - forwarder_interface_.processControlMessageReply(std::move(packet_buffer)); + utils::MemBuf &packet_buffer) { + io_module_->processControlMessageReply(packet_buffer); } private: portal_details::HandlerMemory async_callback_memory_; + std::unique_ptr<IoModule, void (*)(IoModule *)> io_module_; asio::io_service &io_service_; asio::io_service internal_io_service_; @@ -641,8 +661,19 @@ class Portal { ConsumerCallback *consumer_callback_; ProducerCallback *producer_callback_; - typename ForwarderInt::ConnectorType connector_; - ForwarderInt forwarder_interface_; + bool is_consumer_; + + private: + static std::string defaultIoModule(); + static void parseIoModuleConfiguration(const libconfig::Setting &io_config, + std::error_code &ec); + static void getModuleConfiguration( + interface::global_config::ConfigurationObject &conf, std::error_code &ec); + static void setModuleConfiguration( + const interface::global_config::ConfigurationObject &conf, + std::error_code &ec); + static interface::global_config::IoModuleConfiguration conf_; + static std::string io_module_path_; }; } // namespace core diff --git a/libtransport/src/core/prefix.cc b/libtransport/src/core/prefix.cc index eea4aeb8b..1e2b2ed9d 100644 --- a/libtransport/src/core/prefix.cc +++ b/libtransport/src/core/prefix.cc @@ -25,12 +25,12 @@ extern "C" { #include <hicn/transport/portability/win_portability.h> #endif +#include <openssl/rand.h> + #include <cstring> #include <memory> #include <random> -#include <openssl/rand.h> - namespace transport { namespace core { @@ -99,7 +99,7 @@ void Prefix::buildPrefix(std::string &prefix, uint16_t prefix_length, ip_prefix_.family = family; } -std::unique_ptr<Sockaddr> Prefix::toSockaddr() { +std::unique_ptr<Sockaddr> Prefix::toSockaddr() const { Sockaddr *ret = nullptr; switch (ip_prefix_.family) { @@ -120,14 +120,14 @@ std::unique_ptr<Sockaddr> Prefix::toSockaddr() { return std::unique_ptr<Sockaddr>(ret); } -uint16_t Prefix::getPrefixLength() { return ip_prefix_.len; } +uint16_t Prefix::getPrefixLength() const { return ip_prefix_.len; } Prefix &Prefix::setPrefixLength(uint16_t prefix_length) { ip_prefix_.len = prefix_length; return *this; } -int Prefix::getAddressFamily() { return ip_prefix_.family; } +int Prefix::getAddressFamily() const { return ip_prefix_.family; } Prefix &Prefix::setAddressFamily(int address_family) { ip_prefix_.family = address_family; @@ -226,7 +226,7 @@ Name Prefix::getRandomName() const { ip_prefix_.len; size_t size = (size_t)ceil((float)addr_len / 8.0); - uint8_t *buffer = (uint8_t *) malloc(sizeof(uint8_t) * size); + uint8_t *buffer = (uint8_t *)malloc(sizeof(uint8_t) * size); RAND_bytes(buffer, size); @@ -332,7 +332,7 @@ bool Prefix::checkPrefixLengthAndAddressFamily(uint16_t prefix_length, return true; } -ip_prefix_t &Prefix::toIpPrefixStruct() { return ip_prefix_; } +const ip_prefix_t &Prefix::toIpPrefixStruct() const { return ip_prefix_; } } // namespace core diff --git a/libtransport/src/core/rs.cc b/libtransport/src/core/rs.cc new file mode 100644 index 000000000..44b5852e5 --- /dev/null +++ b/libtransport/src/core/rs.cc @@ -0,0 +1,365 @@ + +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <core/fec.h> +#include <core/rs.h> +#include <hicn/transport/core/global_object_pool.h> +#include <hicn/transport/utils/log.h> + +#include <cassert> + +namespace transport { +namespace core { +namespace fec { + +BlockCode::BlockCode(uint32_t k, uint32_t n, struct fec_parms *code) + : Packets(), + k_(k), + n_(n), + code_(code), + max_buffer_size_(0), + current_block_size_(0), + to_decode_(false) { + sorted_index_.reserve(n); +} + +bool BlockCode::addRepairSymbol(const fec::buffer &packet, uint32_t i) { + // Get index + to_decode_ = true; + TRANSPORT_LOGD("adding symbol of size %zu", packet->length()); + return addSymbol(packet, i, packet->length() - sizeof(fec_header)); +} + +bool BlockCode::addSourceSymbol(const fec::buffer &packet, uint32_t i) { + return addSymbol(packet, i, packet->length()); +} + +bool BlockCode::addSymbol(const fec::buffer &packet, uint32_t i, + std::size_t size) { + if (size > max_buffer_size_) { + max_buffer_size_ = size; + } + + operator[](current_block_size_++) = std::make_pair(i, packet); + + if (current_block_size_ >= k_) { + if (to_decode_) { + decode(); + } else { + encode(); + } + + clear(); + return false; + } + + return true; +} + +void BlockCode::encode() { + gf *data[n_]; + std::uint16_t old_values[k_]; + uint32_t base = operator[](0).first; + + // Set packet length in first 2 bytes + for (uint32_t i = 0; i < k_; i++) { + auto &packet = operator[](i).second; + + TRANSPORT_LOGD("Current buffer size: %zu", packet->length()); + + auto ret = packet->ensureCapacityAndFillUnused(max_buffer_size_, 0); + if (TRANSPORT_EXPECT_FALSE(ret == false)) { + throw errors::RuntimeException( + "Provided packet is not suitable to be used as FEC source packet. " + "Aborting."); + } + + // Buffers should hold 2 bytes before the starting pointer, in order to be + // able to set the length for the encoding operation + packet->prepend(2); + uint16_t *length = reinterpret_cast<uint16_t *>(packet->writableData()); + + old_values[i] = *length; + *length = htons(packet->length() - LEN_SIZE_BYTES); + + data[i] = packet->writableData(); + } + + // Finish to fill source block with the buffers to hold the repair symbols + for (uint32_t i = k_; i < n_; i++) { + // For the moment we get a packet from the pool here.. later we'll need to + // require a packet from the caller with a callback. + auto packet = PacketManager<>::getInstance().getMemBuf(); + packet->append(max_buffer_size_ + sizeof(fec_header) + LEN_SIZE_BYTES); + fec_header *fh = reinterpret_cast<fec_header *>(packet->writableData()); + + fh->setSeqNumberBase(base); + fh->setNFecSymbols(n_ - k_); + fh->setEncodedSymbolId(i); + fh->setSourceBlockLen(n_); + + packet->trimStart(sizeof(fec_header)); + + data[i] = packet->writableData(); + operator[](i) = std::make_pair(i, std::move(packet)); + } + + // Generate repair symbols and put them in corresponding buffers + TRANSPORT_LOGD("Calling encode with max_buffer_size_ = %zu", + max_buffer_size_); + for (uint32_t i = k_; i < n_; i++) { + fec_encode(code_, data, data[i], i, max_buffer_size_ + LEN_SIZE_BYTES); + } + + // Restore original content of buffer space used to store the length + for (uint32_t i = 0; i < k_; i++) { + auto &packet = operator[](i).second; + uint16_t *length = reinterpret_cast<uint16_t *>(packet->writableData()); + *length = old_values[i]; + packet->trimStart(2); + } + + // Re-include header in repair packets + for (uint32_t i = k_; i < n_; i++) { + auto &packet = operator[](i).second; + TRANSPORT_LOGD("Produced repair symbol of size = %zu", packet->length()); + packet->prepend(sizeof(fec_header)); + } +} + +void BlockCode::decode() { + gf *data[k_]; + uint32_t index[k_]; + + for (uint32_t i = 0; i < k_; i++) { + auto &packet = operator[](i).second; + index[i] = operator[](i).first; + sorted_index_[i] = index[i]; + + if (index[i] < k_) { + TRANSPORT_LOGD("DECODE SOURCE - index %u - Current buffer size: %zu", + index[i], packet->length()); + // This is a source packet. We need to prepend the length and fill + // additional space to 0 + + // Buffers should hold 2 bytes before the starting pointer, in order to be + // able to set the length for the encoding operation + packet->prepend(LEN_SIZE_BYTES); + packet->ensureCapacityAndFillUnused(max_buffer_size_, 0); + uint16_t *length = reinterpret_cast<uint16_t *>(packet->writableData()); + + *length = htons(packet->length() - LEN_SIZE_BYTES); + } else { + TRANSPORT_LOGD("DECODE SYMBOL - index %u - Current buffer size: %zu", + index[i], packet->length()); + packet->trimStart(sizeof(fec_header)); + } + + data[i] = packet->writableData(); + } + + // We decode the source block + TRANSPORT_LOGD("Calling decode with max_buffer_size_ = %zu", + max_buffer_size_); + fec_decode(code_, data, reinterpret_cast<int *>(index), max_buffer_size_); + + // Find the index in the block for recovered packets + for (uint32_t i = 0; i < k_; i++) { + if (index[i] != i) { + for (uint32_t j = 0; j < k_; j++) + if (sorted_index_[j] == uint32_t(index[i])) { + sorted_index_[j] = i; + } + } + } + + // Reorder block by index with in-place sorting + for (uint32_t i = 0; i < k_; i++) { + for (uint32_t j = sorted_index_[i]; j != i; j = sorted_index_[i]) { + std::swap(sorted_index_[j], sorted_index_[i]); + std::swap(operator[](j), operator[](i)); + } + } + + // Adjust length according to the one written in the source packet + for (uint32_t i = 0; i < k_; i++) { + auto &packet = operator[](i).second; + uint16_t *length = reinterpret_cast<uint16_t *>(packet->writableData()); + packet->trimStart(2); + packet->setLength(ntohs(*length)); + } +} + +void BlockCode::clear() { + current_block_size_ = 0; + max_buffer_size_ = 0; + sorted_index_.clear(); + to_decode_ = false; +} + +void rs::MatrixDeleter::operator()(struct fec_parms *params) { + fec_free(params); +} + +rs::Codes rs::createCodes() { + Codes ret; + + ret.emplace(std::make_pair(1, 3), Matrix(fec_new(1, 3), MatrixDeleter())); + ret.emplace(std::make_pair(6, 10), Matrix(fec_new(6, 10), MatrixDeleter())); + ret.emplace(std::make_pair(8, 32), Matrix(fec_new(8, 32), MatrixDeleter())); + ret.emplace(std::make_pair(10, 30), Matrix(fec_new(10, 30), MatrixDeleter())); + ret.emplace(std::make_pair(16, 24), Matrix(fec_new(16, 24), MatrixDeleter())); + ret.emplace(std::make_pair(10, 40), Matrix(fec_new(10, 40), MatrixDeleter())); + ret.emplace(std::make_pair(10, 60), Matrix(fec_new(10, 60), MatrixDeleter())); + ret.emplace(std::make_pair(10, 90), Matrix(fec_new(10, 90), MatrixDeleter())); + + return ret; +} + +rs::Codes rs::codes_ = createCodes(); + +rs::rs(uint32_t k, uint32_t n) : k_(k), n_(n) {} + +void rs::setFECCallback(const PacketsReady &callback) { + fec_callback_ = callback; +} + +encoder::encoder(uint32_t k, uint32_t n) + : rs(k, n), + current_code_(codes_[std::make_pair(k, n)].get()), + source_block_(k_, n_, current_code_) {} + +void encoder::consume(const fec::buffer &packet, uint32_t index) { + if (!source_block_.addSourceSymbol(packet, index)) { + std::vector<buffer> repair_packets; + for (uint32_t i = k_; i < n_; i++) { + repair_packets.emplace_back(std::move(source_block_[i].second)); + } + fec_callback_(repair_packets); + } +} + +decoder::decoder(uint32_t k, uint32_t n) : rs(k, n) {} + +void decoder::recoverPackets(SourceBlocks::iterator &src_block_it) { + TRANSPORT_LOGD("recoverPackets for %u", k_); + auto &src_block = src_block_it->second; + std::vector<buffer> source_packets(k_); + for (uint32_t i = 0; i < src_block.getK(); i++) { + source_packets[i] = std::move(src_block[i].second); + } + + fec_callback_(source_packets); + processed_source_blocks_.emplace(src_block_it->first); + + auto it = parked_packets_.find(src_block_it->first); + if (it != parked_packets_.end()) { + parked_packets_.erase(it); + } + + src_blocks_.erase(src_block_it); +} + +void decoder::consume(const fec::buffer &packet, uint32_t index) { + // Normalize index + auto i = index % n_; + + // Get base + uint32_t base = index - i; + + TRANSPORT_LOGD( + "Decoder consume called for source symbol. BASE = %u, index = %u and i = " + "%u", + base, index, i); + + // check if a source block already exist for this symbol. If it does not + // exist, we lazily park this packet until we receive a repair symbol for the + // same block. This is done for 2 reason: + // 1) If we receive all the source packets of a block, we do not need to + // recover anything. + // 2) Sender may change n and k at any moment, so we construct the source + // block based on the (n, k) values written in the fec header. This is + // actually not used right now, since we use fixed value of n and k passed + // at construction time, but it paves the ground for a more dynamic + // protocol that may come in the future. + auto it = src_blocks_.find(base); + if (it != src_blocks_.end()) { + auto ret = it->second.addSourceSymbol(packet, i); + if (!ret) { + recoverPackets(it); + } + } else { + TRANSPORT_LOGD("Adding to parked source packets"); + auto ret = parked_packets_.emplace( + base, std::vector<std::pair<buffer, uint32_t> >()); + ret.first->second.emplace_back(packet, i); + } +} + +void decoder::consume(const fec::buffer &packet) { + // Repair symbol! Get index and base source block. + fec_header *h = reinterpret_cast<fec_header *>(packet->writableData()); + auto i = h->getEncodedSymbolId(); + auto base = h->getSeqNumberBase(); + auto n = h->getSourceBlockLen(); + auto k = n - h->getNFecSymbols(); + + TRANSPORT_LOGD( + "Decoder consume called for repair symbol. BASE = %u, index = %u and i = " + "%u", + base, base + i, i); + + // check if a source block already exist for this symbol + auto it = src_blocks_.find(base); + if (it == src_blocks_.end()) { + // Create new source block + auto code_it = codes_.find(std::make_pair(k, n)); + if (code_it == codes_.end()) { + TRANSPORT_LOGE("Code for k = %u and n = %u does not exist.", k_, n_); + return; + } + + auto emplace_result = + src_blocks_.emplace(base, BlockCode(k, n, code_it->second.get())); + it = emplace_result.first; + + // Check in the parked packets and insert any packet that is part of this + // source block + + auto it2 = parked_packets_.find(base); + if (it2 != parked_packets_.end()) { + for (auto &packet_index : it2->second) { + auto ret = + it->second.addSourceSymbol(packet_index.first, packet_index.second); + if (!ret) { + recoverPackets(it); + // Finish to delete packets in same source block that were + // eventually not used + return; + } + } + } + } + + auto ret = it->second.addRepairSymbol(packet, i); + if (!ret) { + recoverPackets(it); + } +} + +} // namespace fec +} // namespace core +} // namespace transport diff --git a/libtransport/src/core/rs.h b/libtransport/src/core/rs.h new file mode 100644 index 000000000..d630bd233 --- /dev/null +++ b/libtransport/src/core/rs.h @@ -0,0 +1,338 @@ + +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <arpa/inet.h> +#include <hicn/transport/utils/membuf.h> +#include <protocols/fec_base.h> + +#include <array> +#include <cstdint> +#include <map> +#include <unordered_set> +#include <vector> + +namespace transport { +namespace core { + +namespace fec { + +static const constexpr uint16_t MAX_SOURCE_BLOCK_SIZE = 128; + +using buffer = typename utils::MemBuf::Ptr; +/** + * We use a std::array in place of std::vector to avoid to allocate a new vector + * in the heap every time we build a new source block, which would be bad if + * the decoder has to allocate several source blocks for many concurrent bases. + * std::array allows to be constructed in place, saving the allocation at the + * price os knowing in advance its size. + */ +using Packets = std::array<std::pair<uint32_t, buffer>, MAX_SOURCE_BLOCK_SIZE>; + +/** + * FEC Header, prepended to symbol packets. + */ +struct fec_header { + /** + * The base source packet seq_number this FES symbol refers to + */ + uint32_t seq_number; + + /** + * The index of the symbol inside the source block, between k and n - 1 + */ + uint8_t encoded_symbol_id; + + /** + * Total length of source block (n) + */ + uint8_t source_block_len; + + /** + * Total number of symbols (n - k) + */ + uint8_t n_fec_symbols; + + /** + * Align header to 64 bits + */ + uint8_t padding; + + void setSeqNumberBase(uint32_t suffix) { seq_number = htonl(suffix); } + uint32_t getSeqNumberBase() { return ntohl(seq_number); } + void setEncodedSymbolId(uint8_t esi) { encoded_symbol_id = esi; } + uint8_t getEncodedSymbolId() { return encoded_symbol_id; } + void setSourceBlockLen(uint8_t k) { source_block_len = k; } + uint8_t getSourceBlockLen() { return source_block_len; } + void setNFecSymbols(uint8_t n_r) { n_fec_symbols = n_r; } + uint8_t getNFecSymbols() { return n_fec_symbols; } +}; + +/** + * This class models the source block itself. + */ +class BlockCode : public Packets { + /** + * For variable length packet we need to prepend to the padded payload the + * real length of the packet. This is *not* sent over the network. + */ + static constexpr std::size_t LEN_SIZE_BYTES = 2; + + public: + BlockCode(uint32_t k, uint32_t n, struct fec_parms *code); + + /** + * Add a repair symbol to the dource block. + */ + bool addRepairSymbol(const fec::buffer &packet, uint32_t i); + + /** + * Add a source symbol to the source block. + */ + bool addSourceSymbol(const fec::buffer &packet, uint32_t i); + + /** + * Get current length of source block. + */ + std::size_t length() { return current_block_size_; } + + /** + * Get N + */ + uint32_t getN() { return n_; } + + /** + * Get K + */ + uint32_t getK() { return k_; } + + /** + * Clear source block + */ + void clear(); + + private: + /** + * Add symbol to source block + **/ + bool addSymbol(const fec::buffer &packet, uint32_t i, std::size_t size); + + /** + * Starting from k source symbols, get the n - k repair symbols + */ + void encode(); + + /** + * Starting from k symbols (mixed repair and source), get k source symbols. + * NOTE: It does not make sense to retrieve the k source symbols using the + * very same k source symbols. With the current implementation that case can + * never happen. + */ + void decode(); + + private: + uint32_t k_; + uint32_t n_; + struct fec_parms *code_; + std::size_t max_buffer_size_; + std::size_t current_block_size_; + std::vector<uint32_t> sorted_index_; + bool to_decode_; +}; + +/** + * This class contains common parameters between the fec encoder and decoder. + * In particular it contains: + * - The callback to be called when symbols are encoded / decoded + * - The reference to the static reed-solomon parameters, allocated at program + * startup + * - N and K. Ideally they are useful only for the encoder (the decoder can + * retrieve them from the FEC header). However right now we assume sender and + * receiver agreed on the parameters k and n to use. We will introduce a control + * message later to negotiate them, so that decoder cah dynamically change them + * during the download. + */ +class rs { + /** + * Deleter for static preallocated reed-solomon parameters. + */ + struct MatrixDeleter { + void operator()(struct fec_parms *params); + }; + + /** + * unique_ptr to reed-solomon parameters, with custom deleter to call fec_free + * at the end of the program + */ + using Matrix = std::unique_ptr<struct fec_parms, MatrixDeleter>; + + /** + * Key to retrieve static preallocated reed-solomon parameters. It is pair of + * k and n + */ + using Code = std::pair<std::uint32_t /* k */, std::uint32_t /* n */>; + + /** + * Custom hash function for (k, n) pair. + */ + struct CodeHasher { + std::size_t operator()(const transport::core::fec::rs::Code &code) const { + uint64_t ret = uint64_t(code.first) << 32 | uint64_t(code.second); + return std::hash<uint64_t>{}(ret); + } + }; + + protected: + /** + * Callback to be called after the encode or the decode operations. In the + * former case it will contain the symbols, while in the latter the sources. + */ + using PacketsReady = std::function<void(std::vector<buffer> &)>; + + /** + * The sequence number base. + */ + using SNBase = std::uint32_t; + + /** + * The map of source blocks, used at the decoder side. For the encoding + * operation we can use one source block only, since packet are produced in + * order. + */ + using SourceBlocks = std::unordered_map<SNBase, BlockCode>; + + /** + * Map (k, n) -> reed-solomon parameter + */ + using Codes = std::unordered_map<Code, Matrix, CodeHasher>; + + public: + rs(uint32_t k, uint32_t n); + ~rs() = default; + + /** + * Set callback to call after packet encoding / decoding + */ + void setFECCallback(const PacketsReady &callback); + + virtual void clear() { processed_source_blocks_.clear(); } + + private: + /** + * Create reed-solomon codes at program startup. + */ + static Codes createCodes(); + + protected: + bool processed(SNBase seq_base) { + return processed_source_blocks_.find(seq_base) != + processed_source_blocks_.end(); + } + + std::uint32_t k_; + std::uint32_t n_; + PacketsReady fec_callback_; + + /** + * Keep track of processed source blocks + */ + std::unordered_set<SNBase> processed_source_blocks_; + + static Codes codes_; +}; + +/** + * The reed-solomon encoder. It is feeded with source symbols and it provide + * repair-symbols through the fec_callback_ + */ +class encoder : public rs { + public: + encoder(uint32_t k, uint32_t n); + /** + * Always consume source symbols. + */ + void consume(const fec::buffer &packet, uint32_t index); + + void clear() override { + rs::clear(); + source_block_.clear(); + } + + private: + struct fec_parms *current_code_; + /** + * The source block. As soon as it is filled with k source symbols, the + * encoder calls the callback fec_callback_ and the resets the block 0, ready + * to accept another batch of k source symbols. + */ + BlockCode source_block_; +}; + +/** + * The reed-solomon encoder. It is feeded with source/repair symbols and it + * provides the original source symbols through the fec_callback_ + */ +class decoder : public rs { + public: + decoder(uint32_t k, uint32_t n); + + /** + * Consume source symbol + */ + void consume(const fec::buffer &packet, uint32_t i); + + /** + * Consume repair symbol + */ + void consume(const fec::buffer &packet); + + /** + * Clear decoder to reuse + */ + void clear() override { + rs::clear(); + src_blocks_.clear(); + parked_packets_.clear(); + } + + private: + void recoverPackets(SourceBlocks::iterator &src_block_it); + + private: + /** + * Map of source blocks. We use a map because we may receive symbols belonging + * to diffreent source blocks at the same time, so we need to be able to + * decode many source symbols at the same time. + */ + SourceBlocks src_blocks_; + + /** + * Unordered Map of source symbols for which we did not receive any repair + * symbol in the same source block. Notably this happens when: + * + * - We receive the source symbols first and the repair symbols after + * - We received only source symbols for a given block. In that case it does + * not make any sense to build the source block, since we received all the + * source packet of the block. + */ + std::unordered_map<uint32_t, std::vector<std::pair<buffer, uint32_t>>> + parked_packets_; +}; + +} // namespace fec + +} // namespace core + +} // namespace transport diff --git a/libtransport/src/core/tcp_socket_connector.h b/libtransport/src/core/tcp_socket_connector.h index c57123e9f..9dbd250d1 100644 --- a/libtransport/src/core/tcp_socket_connector.h +++ b/libtransport/src/core/tcp_socket_connector.h @@ -15,12 +15,11 @@ #pragma once +#include <core/connector.h> #include <hicn/transport/config.h> #include <hicn/transport/core/name.h> #include <hicn/transport/utils/branch_prediction.h> -#include <core/connector.h> - #include <asio.hpp> #include <asio/steady_timer.hpp> #include <deque> |