diff options
Diffstat (limited to 'libtransport/src/protocols/fec')
-rw-r--r-- | libtransport/src/protocols/fec/CMakeLists.txt | 36 | ||||
-rw-r--r-- | libtransport/src/protocols/fec/fec.cc | 838 | ||||
-rw-r--r-- | libtransport/src/protocols/fec/fec.h | 65 | ||||
-rw-r--r-- | libtransport/src/protocols/fec/fec_info.h | 62 | ||||
-rw-r--r-- | libtransport/src/protocols/fec/rely.cc | 205 | ||||
-rw-r--r-- | libtransport/src/protocols/fec/rely.h | 191 | ||||
-rw-r--r-- | libtransport/src/protocols/fec/rs.cc | 418 | ||||
-rw-r--r-- | libtransport/src/protocols/fec/rs.h | 409 |
8 files changed, 2224 insertions, 0 deletions
diff --git a/libtransport/src/protocols/fec/CMakeLists.txt b/libtransport/src/protocols/fec/CMakeLists.txt new file mode 100644 index 000000000..6d61ae043 --- /dev/null +++ b/libtransport/src/protocols/fec/CMakeLists.txt @@ -0,0 +1,36 @@ +# Copyright (c) 2017-2019 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +list(APPEND HEADER_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/fec.h + ${CMAKE_CURRENT_SOURCE_DIR}/rs.h + ${CMAKE_CURRENT_SOURCE_DIR}/fec_info.h +) + +list(APPEND SOURCE_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/fec.cc + ${CMAKE_CURRENT_SOURCE_DIR}/rs.cc +) + +if (ENABLE_RELY) + list(APPEND HEADER_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/rely.h + ) + + list(APPEND SOURCE_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/rely.cc + ) +endif() + +set(SOURCE_FILES ${SOURCE_FILES} PARENT_SCOPE) +set(HEADER_FILES ${HEADER_FILES} PARENT_SCOPE) diff --git a/libtransport/src/protocols/fec/fec.cc b/libtransport/src/protocols/fec/fec.cc new file mode 100644 index 000000000..16a04cb98 --- /dev/null +++ b/libtransport/src/protocols/fec/fec.cc @@ -0,0 +1,838 @@ +/* + * fec.c -- forward error correction based on Vandermonde matrices + * 980624 + * (C) 1997-98 Luigi Rizzo (luigi@iet.unipi.it) + * + * Portions derived from code by Phil Karn (karn@ka9q.ampr.org), + * Robert Morelos-Zaragoza (robert@spectra.eng.hawaii.edu) and Hari + * Thirumoorthy (harit@spectra.eng.hawaii.edu), Aug 1995 + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, + * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A + * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS + * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, + * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR + * TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT + * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY + * OF SUCH DAMAGE. + */ + +/* + * The following parameter defines how many bits are used for + * field elements. The code supports any value from 2 to 16 + * but fastest operation is achieved with 8 bit elements + * This is the only parameter you may want to change. + */ +#ifndef GF_BITS +#define GF_BITS 8 /* code over GF(2**GF_BITS) - change to suit */ +#endif + +#include "fec.h" + +#include <hicn/transport/portability/platform.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +/** + * XXX This disable a warning raising only in some platforms. + * TODO Check if this warning is a mistake or it is a real bug: + * https://gcc.gnu.org/bugzilla/show_bug.cgi?id=83404 + * https://gcc.gnu.org/bugzilla//show_bug.cgi?id=88059 + */ +#ifndef __clang__ +#pragma GCC diagnostic ignored "-Wstringop-overflow" +#endif + +/* + * compatibility stuff + */ +#ifdef MSDOS /* but also for others, e.g. sun... */ +#define NEED_BCOPY +#define bcmp(a, b, n) memcmp(a, b, n) +#endif + +#ifdef ANDROID +#define bcmp(a, b, n) memcmp(a, b, n) +#endif + +#ifdef NEED_BCOPY +#define bcopy(s, d, siz) memcpy((d), (s), (siz)) +#define bzero(d, siz) memset((d), '\0', (siz)) +#endif + +/* + * stuff used for testing purposes only + */ + +#ifdef TEST +#define DEB(x) +#define DDB(x) x +#define DEBUG 0 /* minimal debugging */ +#ifdef MSDOS +#include <time.h> +struct timeval { + unsigned long ticks; +}; +#define gettimeofday(x, dummy) \ + { (x)->ticks = clock(); } +#define DIFF_T(a, b) (1 + 1000000 * (a.ticks - b.ticks) / CLOCKS_PER_SEC) +typedef unsigned long u_long; +typedef unsigned short u_short; +#else /* typically, unix systems */ +#include <sys/time.h> +#define DIFF_T(a, b) \ + (1 + 1000000 * (a.tv_sec - b.tv_sec) + (a.tv_usec - b.tv_usec)) +#endif + +#define TICK(t) \ + { \ + struct timeval x; \ + gettimeofday(&x, NULL); \ + t = x.tv_usec + 1000000 * (x.tv_sec & 0xff); \ + } +#define TOCK(t) \ + { \ + u_long t1; \ + TICK(t1); \ + if (t1 < t) \ + t = 256000000 + t1 - t; \ + else \ + t = t1 - t; \ + if (t == 0) t = 1; \ + } + +u_long ticks[10]; /* vars for timekeeping */ +#else +#define DEB(x) +#define DDB(x) +#define TICK(x) +#define TOCK(x) +#endif /* TEST */ + +/* + * You should not need to change anything beyond this point. + * The first part of the file implements linear algebra in GF. + * + * gf is the type used to store an element of the Galois Field. + * Must constain at least GF_BITS bits. + * + * Note: unsigned char will work up to GF(256) but int seems to run + * faster on the Pentium. We use int whenever have to deal with an + * index, since they are generally faster. + */ +#if (GF_BITS < 2 && GF_BITS > 16) +#error "GF_BITS must be 2 .. 16" +#endif + +#define GF_SIZE ((1 << GF_BITS) - 1) /* powers of \alpha */ + +/* + * Primitive polynomials - see Lin & Costello, Appendix A, + * and Lee & Messerschmitt, p. 453. + */ +static const char *allPp[] = { + /* GF_BITS polynomial */ + NULL, /* 0 no code */ + NULL, /* 1 no code */ + "111", /* 2 1+x+x^2 */ + "1101", /* 3 1+x+x^3 */ + "11001", /* 4 1+x+x^4 */ + "101001", /* 5 1+x^2+x^5 */ + "1100001", /* 6 1+x+x^6 */ + "10010001", /* 7 1 + x^3 + x^7 */ + "101110001", /* 8 1+x^2+x^3+x^4+x^8 */ + "1000100001", /* 9 1+x^4+x^9 */ + "10010000001", /* 10 1+x^3+x^10 */ + "101000000001", /* 11 1+x^2+x^11 */ + "1100101000001", /* 12 1+x+x^4+x^6+x^12 */ + "11011000000001", /* 13 1+x+x^3+x^4+x^13 */ + "110000100010001", /* 14 1+x+x^6+x^10+x^14 */ + "1100000000000001", /* 15 1+x+x^15 */ + "11010000000010001" /* 16 1+x+x^3+x^12+x^16 */ +}; + +/* + * To speed up computations, we have tables for logarithm, exponent + * and inverse of a number. If GF_BITS <= 8, we use a table for + * multiplication as well (it takes 64K, no big deal even on a PDA, + * especially because it can be pre-initialized an put into a ROM!), + * otherwhise we use a table of logarithms. + * In any case the macro gf_mul(x,y) takes care of multiplications. + */ + +static gf gf_exp[2 * GF_SIZE]; /* index->poly form conversion table */ +static int gf_log[GF_SIZE + 1]; /* Poly->index form conversion table */ +static gf inverse[GF_SIZE + 1]; /* inverse of field elem. */ + /* inv[\alpha**i]=\alpha**(GF_SIZE-i-1) */ + +/* + * modnn(x) computes x % GF_SIZE, where GF_SIZE is 2**GF_BITS - 1, + * without a slow divide. + */ +static inline gf modnn(int x) { + while (x >= GF_SIZE) { + x -= GF_SIZE; + x = (x >> GF_BITS) + (x & GF_SIZE); + } + return x; +} + +#define SWAP(a, b, t) \ + { \ + t tmp; \ + tmp = a; \ + a = b; \ + b = tmp; \ + } + +/* + * gf_mul(x,y) multiplies two numbers. If GF_BITS<=8, it is much + * faster to use a multiplication table. + * + * USE_GF_MULC, GF_MULC0(c) and GF_ADDMULC(x) can be used when multiplying + * many numbers by the same constant. In this case the first + * call sets the constant, and others perform the multiplications. + * A value related to the multiplication is held in a local variable + * declared with USE_GF_MULC . See usage in addmul1(). + */ +#if (GF_BITS <= 8) +static gf gf_mul_table[GF_SIZE + 1][GF_SIZE + 1]; + +#define gf_mul(x, y) gf_mul_table[x][y] + +#define USE_GF_MULC gf *__gf_mulc_ +#define GF_MULC0(c) __gf_mulc_ = gf_mul_table[c] +#define GF_ADDMULC(dst, x) dst ^= __gf_mulc_[x] + +static void init_mul_table() { + int i, j; + for (i = 0; i < GF_SIZE + 1; i++) + for (j = 0; j < GF_SIZE + 1; j++) + gf_mul_table[i][j] = gf_exp[modnn(gf_log[i] + gf_log[j])]; + + for (j = 0; j < GF_SIZE + 1; j++) gf_mul_table[0][j] = gf_mul_table[j][0] = 0; +} +#else /* GF_BITS > 8 */ +static inline gf gf_mul(x, y) { + if ((x) == 0 || (y) == 0) return 0; + + return gf_exp[gf_log[x] + gf_log[y]]; +} +#define init_mul_table() + +#define USE_GF_MULC register gf *__gf_mulc_ +#define GF_MULC0(c) __gf_mulc_ = &gf_exp[gf_log[c]] +#define GF_ADDMULC(dst, x) \ + { \ + if (x) dst ^= __gf_mulc_[gf_log[x]]; \ + } +#endif + +/* + * Generate GF(2**m) from the irreducible polynomial p(X) in p[0]..p[m] + * Lookup tables: + * index->polynomial form gf_exp[] contains j= \alpha^i; + * polynomial form -> index form gf_log[ j = \alpha^i ] = i + * \alpha=x is the primitive element of GF(2^m) + * + * For efficiency, gf_exp[] has size 2*GF_SIZE, so that a simple + * multiplication of two numbers can be resolved without calling modnn + */ + +/* + * i use malloc so many times, it is easier to put checks all in + * one place. + */ +static void *my_malloc(int sz, const char *err_string) { + void *p = malloc(sz); + if (p == NULL) { + fprintf(stderr, "-- malloc failure allocating %s\n", err_string); + exit(1); + } + return p; +} + +#define NEW_GF_MATRIX(rows, cols) \ + (gf *)my_malloc(rows *cols * sizeof(gf), " ## __LINE__ ## ") + +/* + * initialize the data structures used for computations in GF. + */ +static void generate_gf(void) { + int i; + gf mask; + const char *Pp = allPp[GF_BITS]; + + mask = 1; /* x ** 0 = 1 */ + gf_exp[GF_BITS] = 0; /* will be updated at the end of the 1st loop */ + /* + * first, generate the (polynomial representation of) powers of \alpha, + * which are stored in gf_exp[i] = \alpha ** i . + * At the same time build gf_log[gf_exp[i]] = i . + * The first GF_BITS powers are simply bits shifted to the left. + */ + for (i = 0; i < GF_BITS; i++, mask <<= 1) { + gf_exp[i] = mask; + gf_log[gf_exp[i]] = i; + /* + * If Pp[i] == 1 then \alpha ** i occurs in poly-repr + * gf_exp[GF_BITS] = \alpha ** GF_BITS + */ + if (Pp[i] == '1') gf_exp[GF_BITS] ^= mask; + } + /* + * now gf_exp[GF_BITS] = \alpha ** GF_BITS is complete, so can als + * compute its inverse. + */ + gf_log[gf_exp[GF_BITS]] = GF_BITS; + /* + * Poly-repr of \alpha ** (i+1) is given by poly-repr of + * \alpha ** i shifted left one-bit and accounting for any + * \alpha ** GF_BITS term that may occur when poly-repr of + * \alpha ** i is shifted. + */ + mask = 1 << (GF_BITS - 1); + for (i = GF_BITS + 1; i < GF_SIZE; i++) { + if (gf_exp[i - 1] >= mask) + gf_exp[i] = gf_exp[GF_BITS] ^ ((gf_exp[i - 1] ^ mask) << 1); + else + gf_exp[i] = gf_exp[i - 1] << 1; + gf_log[gf_exp[i]] = i; + } + /* + * log(0) is not defined, so use a special value + */ + gf_log[0] = GF_SIZE; + /* set the extended gf_exp values for fast multiply */ + for (i = 0; i < GF_SIZE; i++) gf_exp[i + GF_SIZE] = gf_exp[i]; + + /* + * again special cases. 0 has no inverse. This used to + * be initialized to GF_SIZE, but it should make no difference + * since noone is supposed to read from here. + */ + inverse[0] = 0; + inverse[1] = 1; + for (i = 2; i <= GF_SIZE; i++) inverse[i] = gf_exp[GF_SIZE - gf_log[i]]; +} + +/* + * Various linear algebra operations that i use often. + */ + +/* + * addmul() computes dst[] = dst[] + c * src[] + * This is used often, so better optimize it! Currently the loop is + * unrolled 16 times, a good value for 486 and pentium-class machines. + * The case c=0 is also optimized, whereas c=1 is not. These + * calls are unfrequent in my typical apps so I did not bother. + * + * Note that gcc on + */ +#define addmul(dst, src, c, sz) \ + if (c != 0) addmul1(dst, src, c, sz) + +#define UNROLL 16 /* 1, 4, 8, 16 */ +static void addmul1(gf *dst1, gf *src1, gf c, int sz) { + USE_GF_MULC; + gf *dst = dst1, *src = src1; + gf *lim = &dst[sz - UNROLL + 1]; + + GF_MULC0(c); + +#if (UNROLL > 1) /* unrolling by 8/16 is quite effective on the pentium */ + for (; dst < lim; dst += UNROLL, src += UNROLL) { + GF_ADDMULC(dst[0], src[0]); + GF_ADDMULC(dst[1], src[1]); + GF_ADDMULC(dst[2], src[2]); + GF_ADDMULC(dst[3], src[3]); +#if (UNROLL > 4) + GF_ADDMULC(dst[4], src[4]); + GF_ADDMULC(dst[5], src[5]); + GF_ADDMULC(dst[6], src[6]); + GF_ADDMULC(dst[7], src[7]); +#endif +#if (UNROLL > 8) + GF_ADDMULC(dst[8], src[8]); + GF_ADDMULC(dst[9], src[9]); + GF_ADDMULC(dst[10], src[10]); + GF_ADDMULC(dst[11], src[11]); + GF_ADDMULC(dst[12], src[12]); + GF_ADDMULC(dst[13], src[13]); + GF_ADDMULC(dst[14], src[14]); + GF_ADDMULC(dst[15], src[15]); +#endif + } +#endif + lim += UNROLL - 1; + for (; dst < lim; dst++, src++) /* final components */ + GF_ADDMULC(*dst, *src); +} + +/* + * computes C = AB where A is n*k, B is k*m, C is n*m + */ +static void matmul(gf *a, gf *b, gf *c, int n, int k, int m) { + int row, col, i; + + for (row = 0; row < n; row++) { + for (col = 0; col < m; col++) { + gf *pa = &a[row * k]; + gf *pb = &b[col]; + gf acc = 0; + for (i = 0; i < k; i++, pa++, pb += m) acc ^= gf_mul(*pa, *pb); + c[row * m + col] = acc; + } + } +} + +#ifdef DEBUGG +/* + * returns 1 if the square matrix is identiy + * (only for test) + */ +static int is_identity(gf *m, int k) { + int row, col; + for (row = 0; row < k; row++) + for (col = 0; col < k; col++) + if ((row == col && *m != 1) || (row != col && *m != 0)) + return 0; + else + m++; + return 1; +} +#endif /* debug */ + +/* + * invert_mat() takes a matrix and produces its inverse + * k is the size of the matrix. + * (Gauss-Jordan, adapted from Numerical Recipes in C) + * Return non-zero if singular. + */ +DEB(int pivloops = 0; int pivswaps = 0; /* diagnostic */) +static int invert_mat(gf *src, int k) { + gf c, *p; + int irow, icol, row, col, i, ix; + + int error = 1; + int *indxc = (int *)my_malloc(k * sizeof(int), "indxc"); + int *indxr = (int *)my_malloc(k * sizeof(int), "indxr"); + int *ipiv = (int *)my_malloc(k * sizeof(int), "ipiv"); + gf *id_row = NEW_GF_MATRIX(1, k); + gf *temp_row = NEW_GF_MATRIX(1, k); + + bzero(id_row, k * sizeof(gf)); + DEB(pivloops = 0; pivswaps = 0; /* diagnostic */) + /* + * ipiv marks elements already used as pivots. + */ + for (i = 0; i < k; i++) ipiv[i] = 0; + + for (col = 0; col < k; col++) { + gf *pivot_row; + /* + * Zeroing column 'col', look for a non-zero element. + * First try on the diagonal, if it fails, look elsewhere. + */ + irow = icol = -1; + if (ipiv[col] != 1 && src[col * k + col] != 0) { + irow = col; + icol = col; + goto found_piv; + } + for (row = 0; row < k; row++) { + if (ipiv[row] != 1) { + for (ix = 0; ix < k; ix++) { + DEB(pivloops++;) + if (ipiv[ix] == 0) { + if (src[row * k + ix] != 0) { + irow = row; + icol = ix; + goto found_piv; + } + } else if (ipiv[ix] > 1) { + fprintf(stderr, "singular matrix\n"); + goto fail; + } + } + } + } + if (icol == -1) { + fprintf(stderr, "XXX pivot not found!\n"); + goto fail; + } + found_piv: + ++(ipiv[icol]); + /* + * swap rows irow and icol, so afterwards the diagonal + * element will be correct. Rarely done, not worth + * optimizing. + */ + if (irow != icol) { + for (ix = 0; ix < k; ix++) { + SWAP(src[irow * k + ix], src[icol * k + ix], gf); + } + } + indxr[col] = irow; + indxc[col] = icol; + pivot_row = &src[icol * k]; + c = pivot_row[icol]; + if (c == 0) { + fprintf(stderr, "singular matrix 2\n"); + goto fail; + } + if (c != 1) { /* otherwhise this is a NOP */ + /* + * this is done often , but optimizing is not so + * fruitful, at least in the obvious ways (unrolling) + */ + DEB(pivswaps++;) + c = inverse[c]; + pivot_row[icol] = 1; + for (ix = 0; ix < k; ix++) pivot_row[ix] = gf_mul(c, pivot_row[ix]); + } + /* + * from all rows, remove multiples of the selected row + * to zero the relevant entry (in fact, the entry is not zero + * because we know it must be zero). + * (Here, if we know that the pivot_row is the identity, + * we can optimize the addmul). + */ + id_row[icol] = 1; + if (bcmp(pivot_row, id_row, k * sizeof(gf)) != 0) { + for (p = src, ix = 0; ix < k; ix++, p += k) { + if (ix != icol) { + c = p[icol]; + p[icol] = 0; + addmul(p, pivot_row, c, k); + } + } + } + id_row[icol] = 0; + } /* done all columns */ + for (col = k - 1; col >= 0; col--) { + if (indxr[col] < 0 || indxr[col] >= k) + fprintf(stderr, "AARGH, indxr[col] %d\n", indxr[col]); + else if (indxc[col] < 0 || indxc[col] >= k) + fprintf(stderr, "AARGH, indxc[col] %d\n", indxc[col]); + else if (indxr[col] != indxc[col]) { + for (row = 0; row < k; row++) { + SWAP(src[row * k + indxr[col]], src[row * k + indxc[col]], gf); + } + } + } + error = 0; +fail: + free(indxc); + free(indxr); + free(ipiv); + free(id_row); + free(temp_row); + return error; +} + +/* + * fast code for inverting a vandermonde matrix. + * XXX NOTE: It assumes that the matrix + * is not singular and _IS_ a vandermonde matrix. Only uses + * the second column of the matrix, containing the p_i's. + * + * Algorithm borrowed from "Numerical recipes in C" -- sec.2.8, but + * largely revised for my purposes. + * p = coefficients of the matrix (p_i) + * q = values of the polynomial (known) + */ + +int invert_vdm(gf *src, int k) { + int i, j, row, col; + gf *b, *c, *p; + gf t, xx; + + if (k == 1) /* degenerate case, matrix must be p^0 = 1 */ + return 0; + /* + * c holds the coefficient of P(x) = Prod (x - p_i), i=0..k-1 + * b holds the coefficient for the matrix inversion + */ + c = NEW_GF_MATRIX(1, k); + b = NEW_GF_MATRIX(1, k); + + p = NEW_GF_MATRIX(1, k); + + for (j = 1, i = 0; i < k; i++, j += k) { + c[i] = 0; + p[i] = src[j]; /* p[i] */ + } + /* + * construct coeffs. recursively. We know c[k] = 1 (implicit) + * and start P_0 = x - p_0, then at each stage multiply by + * x - p_i generating P_i = x P_{i-1} - p_i P_{i-1} + * After k steps we are done. + */ + c[k - 1] = p[0]; /* really -p(0), but x = -x in GF(2^m) */ + for (i = 1; i < k; i++) { + gf p_i = p[i]; /* see above comment */ + for (j = k - 1 - (i - 1); j < k - 1; j++) c[j] ^= gf_mul(p_i, c[j + 1]); + c[k - 1] ^= p_i; + } + + for (row = 0; row < k; row++) { + /* + * synthetic division etc. + */ + xx = p[row]; + t = 1; + b[k - 1] = 1; /* this is in fact c[k] */ + for (i = k - 2; i >= 0; i--) { + b[i] = c[i + 1] ^ gf_mul(xx, b[i + 1]); + t = gf_mul(xx, t) ^ b[i]; + } + for (col = 0; col < k; col++) + src[col * k + row] = gf_mul(inverse[t], b[col]); + } + free(c); + free(b); + free(p); + return 0; +} + +static int fec_initialized = 0; +static void init_fec() { + TICK(ticks[0]); + generate_gf(); + TOCK(ticks[0]); + DDB(fprintf(stderr, "generate_gf took %ldus\n", ticks[0]);) + TICK(ticks[0]); + init_mul_table(); + TOCK(ticks[0]); + DDB(fprintf(stderr, "init_mul_table took %ldus\n", ticks[0]);) + fec_initialized = 1; +} + +/* + * This section contains the proper FEC encoding/decoding routines. + * The encoding matrix is computed starting with a Vandermonde matrix, + * and then transforming it into a systematic matrix. + */ + +#define FEC_MAGIC 0xFECC0DEC + +void fec_free(struct fec_parms *p) { + if (p == NULL || p->magic != (((FEC_MAGIC ^ p->k) ^ p->n) ^ + (unsigned long)(p->enc_matrix))) { + fprintf(stderr, "bad parameters to fec_free\n"); + return; + } + free(p->enc_matrix); + free(p); +} + +/* + * create a new encoder, returning a descriptor. This contains k,n and + * the encoding matrix. + */ +struct fec_parms *fec_new(int k, int n) { + int row, col; + gf *p, *tmp_m; + + struct fec_parms *retval; + + if (fec_initialized == 0) init_fec(); + + if (k > GF_SIZE + 1 || n > GF_SIZE + 1 || k > n) { + fprintf(stderr, "Invalid parameters k %d n %d GF_SIZE %d\n", k, n, GF_SIZE); + return NULL; + } + retval = (struct fec_parms *)my_malloc(sizeof(struct fec_parms), "new_code"); + retval->k = k; + retval->n = n; + retval->enc_matrix = NEW_GF_MATRIX(n, k); + retval->magic = ((FEC_MAGIC ^ k) ^ n) ^ (unsigned long)(retval->enc_matrix); + tmp_m = NEW_GF_MATRIX(n, k); + /* + * fill the matrix with powers of field elements, starting from 0. + * The first row is special, cannot be computed with exp. table. + */ + tmp_m[0] = 1; + for (col = 1; col < k; col++) tmp_m[col] = 0; + for (p = tmp_m + k, row = 0; row < n - 1; row++, p += k) { + for (col = 0; col < k; col++) p[col] = gf_exp[modnn(row * col)]; + } + + /* + * quick code to build systematic matrix: invert the top + * k*k vandermonde matrix, multiply right the bottom n-k rows + * by the inverse, and construct the identity matrix at the top. + */ + TICK(ticks[3]); + invert_vdm(tmp_m, k); /* much faster than invert_mat */ + matmul(tmp_m + k * k, tmp_m, retval->enc_matrix + k * k, n - k, k, k); + /* + * the upper matrix is I so do not bother with a slow multiply + */ + bzero(retval->enc_matrix, k * k * sizeof(gf)); + for (p = retval->enc_matrix, col = 0; col < k; col++, p += k + 1) *p = 1; + free(tmp_m); + TOCK(ticks[3]); + + DDB(fprintf(stderr, "--- %ld us to build encoding matrix\n", ticks[3]);) + DEB(pr_matrix(retval->enc_matrix, n, k, "encoding_matrix");) + return retval; +} + +/* + * fec_encode accepts as input pointers to n data packets of size sz, + * and produces as output a packet pointed to by fec, computed + * with index "index". + */ +void fec_encode(struct fec_parms *code, gf *src[], gf *fec, int index, int sz) { + int i, k = code->k; + gf *p; + + if (GF_BITS > 8) sz /= 2; + + if (index < k) + bcopy(src[index], fec, sz * sizeof(gf)); + else if (index < code->n) { + p = &(code->enc_matrix[index * k]); + bzero(fec, sz * sizeof(gf)); + for (i = 0; i < k; i++) addmul(fec, src[i], p[i], sz); + } else + fprintf(stderr, "Invalid index %d (max %d)\n", index, code->n - 1); +} + +/* + * shuffle move src packets in their position + */ +static int shuffle(gf *pkt[], int index[], int k) { + int i; + + for (i = 0; i < k;) { + if (index[i] >= k || index[i] == i) + i++; + else { + /* + * put pkt in the right position (first check for conflicts). + */ + int c = index[i]; + + if (index[c] == c) { + DEB(fprintf(stderr, "\nshuffle, error at %d\n", i);) + return 1; + } + SWAP(index[i], index[c], int); + SWAP(pkt[i], pkt[c], gf *); + } + } + DEB(/* just test that it works... */ + for (i = 0; i < k; i++) { + if (index[i] < k && index[i] != i) { + fprintf(stderr, "shuffle: after\n"); + for (i = 0; i < k; i++) fprintf(stderr, "%3d ", index[i]); + fprintf(stderr, "\n"); + return 1; + } + }) + return 0; +} + +/* + * build_decode_matrix constructs the encoding matrix given the + * indexes. The matrix must be already allocated as + * a vector of k*k elements, in row-major order + */ +static gf *build_decode_matrix(struct fec_parms *code, gf *pkt[], int index[]) { + int i, k = code->k; + gf *p, *matrix = NEW_GF_MATRIX(k, k); + + TICK(ticks[9]); + for (i = 0, p = matrix; i < k; i++, p += k) { +#if 1 /* this is simply an optimization, not very useful indeed */ + if (index[i] < k) { + bzero(p, k * sizeof(gf)); + p[i] = 1; + } else +#endif + if (index[i] < code->n) + bcopy(&(code->enc_matrix[index[i] * k]), p, k * sizeof(gf)); + else { + fprintf(stderr, "decode: invalid index %d (max %d)\n", index[i], + code->n - 1); + free(matrix); + return NULL; + } + } + TICK(ticks[9]); + if (invert_mat(matrix, k)) { + free(matrix); + matrix = NULL; + } + TOCK(ticks[9]); + return matrix; +} + +/* + * fec_decode receives as input a vector of packets, the indexes of + * packets, and produces the correct vector as output. + * + * Input: + * code: pointer to code descriptor + * pkt: pointers to received packets. They are modified + * to store the output packets (in place) + * index: pointer to packet indexes (modified) + * sz: size of each packet + */ +int fec_decode(struct fec_parms *code, gf *pkt[], int index[], int sz) { + gf *m_dec; + gf **new_pkt; + int row, col, k = code->k; + + if (GF_BITS > 8) sz /= 2; + + if (shuffle(pkt, index, k)) /* error if true */ + return 1; + m_dec = build_decode_matrix(code, pkt, index); + + if (m_dec == NULL) return 1; /* error */ + /* + * do the actual decoding + */ + new_pkt = (gf **)my_malloc(k * sizeof(gf *), "new pkt pointers"); + for (row = 0; row < k; row++) { + if (index[row] >= k) { + new_pkt[row] = (gf *)my_malloc(sz * sizeof(gf), "new pkt buffer"); + bzero(new_pkt[row], sz * sizeof(gf)); + for (col = 0; col < k; col++) + addmul(new_pkt[row], pkt[col], m_dec[row * k + col], sz); + } + } + /* + * move pkts to their final destination + */ + for (row = 0; row < k; row++) { + if (index[row] >= k) { + bcopy(new_pkt[row], pkt[row], sz * sizeof(gf)); + free(new_pkt[row]); + } + } + free(new_pkt); + free(m_dec); + + return 0; +} diff --git a/libtransport/src/protocols/fec/fec.h b/libtransport/src/protocols/fec/fec.h new file mode 100644 index 000000000..7710bb7af --- /dev/null +++ b/libtransport/src/protocols/fec/fec.h @@ -0,0 +1,65 @@ +/* + * fec.c -- forward error correction based on Vandermonde matrices + * 980614 + * (C) 1997-98 Luigi Rizzo (luigi@iet.unipi.it) + * + * Portions derived from code by Phil Karn (karn@ka9q.ampr.org), + * Robert Morelos-Zaragoza (robert@spectra.eng.hawaii.edu) and Hari + * Thirumoorthy (harit@spectra.eng.hawaii.edu), Aug 1995 + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, + * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A + * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS + * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, + * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR + * TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT + * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY + * OF SUCH DAMAGE. + */ + +/* + * The following parameter defines how many bits are used for + * field elements. The code supports any value from 2 to 16 + * but fastest operation is achieved with 8 bit elements + * This is the only parameter you may want to change. + */ +#ifndef GF_BITS +#define GF_BITS 8 /* code over GF(2**GF_BITS) - change to suit */ +#endif + +#if (GF_BITS <= 8) +typedef unsigned char gf; +#else +typedef unsigned short gf; +#endif + +#define GF_SIZE ((1 << GF_BITS) - 1) /* powers of \alpha */ + +struct fec_parms { + unsigned long magic; + int k, n; /* parameters of the code */ + gf *enc_matrix; +}; + +void fec_free(struct fec_parms *p); +struct fec_parms *fec_new(int k, int n); + +void fec_encode(struct fec_parms *code, gf *src[], gf *fec, int index, int sz); +int fec_decode(struct fec_parms *code, gf *pkt[], int index[], int sz); + +/* end of file */ diff --git a/libtransport/src/protocols/fec/fec_info.h b/libtransport/src/protocols/fec/fec_info.h new file mode 100644 index 000000000..bdfc4d3af --- /dev/null +++ b/libtransport/src/protocols/fec/fec_info.h @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/errors/not_implemented_exception.h> + +namespace transport { +namespace protocol { + +namespace fec { + +template <typename T> +struct FecInfo { + static bool isFec() { throw errors::NotImplementedException(); } + static uint32_t nextSymbol(uint32_t index) { + throw errors::NotImplementedException(); + } + static uint32_t nextSource(uint32_t index) { + throw errors::NotImplementedException(); + } +}; + +template <uint32_t K, uint32_t N> +struct Code {}; + +template <uint32_t K, uint32_t N> +struct FecInfo<Code<K, N>> { + static bool isFec(uint32_t index) { return (index % N) >= K; } + + static uint32_t nextSymbol(uint32_t index) { + if (isFec(index)) { + return index; + } + + return index + (K - (index % N)); + } + + static uint32_t nextSource(uint32_t index) { + if (!isFec(index)) { + return index; + } + + return index + (N - (index % N)); + } +}; + +} // namespace fec +} // namespace protocol +} // namespace transport
\ No newline at end of file diff --git a/libtransport/src/protocols/fec/rely.cc b/libtransport/src/protocols/fec/rely.cc new file mode 100644 index 000000000..7a30a62e2 --- /dev/null +++ b/libtransport/src/protocols/fec/rely.cc @@ -0,0 +1,205 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <glog/logging.h> +#include <hicn/transport/core/global_object_pool.h> +#include <protocols/fec/rely.h> + +#include <rely/packet.hpp> + +namespace transport { +namespace protocol { +namespace fec { + +RelyEncoder::RelyEncoder(uint32_t k, uint32_t n, uint32_t seq_offset) + : RelyBase(k, n) { + configure(kmtu, ktimeout, kmax_stream_size); + set_repair_trigger(k_, n_ - k_, n_ - k_); +} + +void RelyEncoder::onPacketProduced(core::ContentObject &content_object, + uint32_t offset) { + // Get pointer to payload, leaving space to insert FEC header. + // TODO Check if this additional header is really needed. + auto data = content_object.writableData() + offset - sizeof(fec_header); + auto length = content_object.length() - offset + sizeof(fec_header); + + // Check packet length does not exceed maximum length supported by the + // encoder (otherwise segmentation would take place). + assert(length < max_packet_bytes()); + DLOG_IF(INFO, VLOG_IS_ON(4)) + << "Encoding packet of length " << length - sizeof(fec_header); + + // Get the suffix. With rely we need to write it in the fec_header in order to + // be able to recognize the seq number upon recovery. + auto suffix = content_object.getName().getSuffix(); + DLOG_IF(INFO, VLOG_IS_ON(4)) << "Producing packet " << suffix + << " (index == " << current_index_ << ")"; + + // Consume payload. Add fec_header in front before feeding payload to encoder, + // and copy original content of packet + fec_header *h = reinterpret_cast<fec_header *>(data); + fec_header copy = *h; + h->setSeqNumberBase(suffix); + auto packets = consume(data, length, getCurrentTime()); + assert(packets == 1); + + // Update packet counter + current_index_ += packets; + + // Restore original packet content and increment data pointer to the correct + // position + *h = copy; + data += sizeof(fec_header); + + // Check position of this packet inside N size block + auto i = current_index_ % n_; + + // encoder will produce a source packet + if (i <= k_) { + // Rely modifies the payload of the packet. We replace the packet with the + // one returned by rely. + // TODO Optimize it by copying only the RELY header + + // Be sure encoder can produce + assert(can_produce()); + + // Check new payload size and make sure it fits in packet buffer + auto new_payload_size = produce_bytes(); + int difference = new_payload_size - length; + + assert(difference > 0); + assert(content_object.ensureCapacity(difference)); + + // Update length + DLOG_IF(INFO, VLOG_IS_ON(4)) << "The packet length will be incremented by " + << difference + sizeof(fec_header); + content_object.append(difference + sizeof(fec_header)); + content_object.updateLength(); + + // Make sure we got a source packet, otherwise we would put a repair symbol + // in a source packet + assert(rely::packet_is_systematic(produce_data())); + + // Copy rely packet replacing old source packet. + std::memcpy(data, produce_data(), new_payload_size); + + // Advance the encoder to next symbol. + produce_next(); + } + +#if 0 + if (i == k_) { + // Ensure repair are generated after k source packets + flush_repair(); + } +#endif + + // Here we should produce all the repair packets + while (can_produce()) { + // The current index MUST be k_, because we enforce n - k repair to be + // produced after k sources + assert(current_index_ == k_); + + buffer packet; + if (!buffer_callback_) { + // If no callback is installed, let's allocate a buffer from global pool + packet = core::PacketManager<>::getInstance().getMemBuf(); + packet->append(produce_bytes()); + } else { + // Otherwise let's ask a buffer to the caller. + packet = buffer_callback_(produce_bytes()); + } + + DLOG_IF(INFO, VLOG_IS_ON(4)) + << "Producing symbol of size " << produce_bytes(); + + // Copy symbol to packet buffer + std::memcpy(packet->writableData(), produce_data(), produce_bytes()); + + // Push symbol in repair_packets + packets_.emplace_back(0, std::move(packet)); + + // Advance the encoder + produce_next(); + } + + // Print number of unprotected symbols + DLOG_IF(INFO, VLOG_IS_ON(4)) + << "Number of unprotected symbols: " << unprotected_symbols(); + + // If we have generated repair symbols, let's notify caller via the installed + // callback + if (packets_.size()) { + assert(packets_.size() == n_ - k_); + fec_callback_(packets_); + packets_.clear(); + current_index_ = 0; + } +} + +RelyDecoder::RelyDecoder(uint32_t k, uint32_t n, uint32_t seq_offset) + : RelyBase(k, n, seq_offset) { + configure(kmtu, ktimeout, kmax_stream_size); +} + +void RelyDecoder::onDataPacket(core::ContentObject &content_object, + uint32_t offset) { + // Adjust pointers to point to packet payload + auto data = content_object.writableData() + offset; + auto size = content_object.length() - offset; + + // Pass payload to decoder + consume(data, size, getCurrentTime()); + + // Drain decoder if possible + while (can_produce()) { + // Get size of decoded packet + auto size = produce_bytes(); + + // Get buffer to copy packet in + auto packet = core::PacketManager<>::getInstance().getMemBuf(); + + // Copy buffer + packet->append(size); + std::memcpy(packet->writableData(), produce_data(), size); + + // Read seq number + fec_header *h = reinterpret_cast<fec_header *>(packet->writableData()); + uint32_t index = h->getSeqNumberBase(); + + DLOG_IF(INFO, VLOG_IS_ON(4)) + << "The index written in the packet is " << index; + + // Remove FEC header + packet->trimStart(sizeof(fec_header)); + + // Save packet in buffer + packets_.emplace_back(index, std::move(packet)); + + // Advance to next packet + produce_next(); + } + + // If we produced packets, lets notify the caller via the callback + if (packets_.size() > 0) { + fec_callback_(packets_); + packets_.clear(); + } +} + +} // namespace fec +} // namespace protocol +} // namespace transport
\ No newline at end of file diff --git a/libtransport/src/protocols/fec/rely.h b/libtransport/src/protocols/fec/rely.h new file mode 100644 index 000000000..bfbdb30bc --- /dev/null +++ b/libtransport/src/protocols/fec/rely.h @@ -0,0 +1,191 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/utils/chrono_typedefs.h> +#include <hicn/transport/utils/membuf.h> +#include <protocols/fec/fec_info.h> +#include <protocols/fec_base.h> + +#include <rely/decoder.hpp> +#include <rely/encoder.hpp> + +#define RELY_DEBUG 0 + +namespace transport { +namespace protocol { +namespace fec { + +/** + * @brief Table of used codes. + */ +#define foreach_rely_fec_type \ + _(Rely, 1, 3) \ + _(Rely, 2, 3) \ + _(Rely, 4, 5) \ + _(Rely, 4, 6) \ + _(Rely, 4, 7) \ + _(Rely, 6, 10) \ + _(Rely, 8, 10) \ + _(Rely, 8, 11) \ + _(Rely, 8, 12) \ + _(Rely, 8, 14) \ + _(Rely, 8, 16) \ + _(Rely, 8, 32) \ + _(Rely, 10, 30) \ + _(Rely, 10, 40) \ + _(Rely, 10, 90) \ + _(Rely, 16, 21) \ + _(Rely, 16, 23) \ + _(Rely, 16, 24) \ + _(Rely, 16, 27) \ + _(Rely, 17, 21) \ + _(Rely, 17, 34) \ + _(Rely, 32, 41) \ + _(Rely, 32, 46) \ + _(Rely, 32, 54) \ + _(Rely, 34, 42) \ + _(Rely, 35, 70) \ + _(Rely, 52, 62) + +/** + * @brief Base class to store common fields. + */ +class RelyBase : public virtual FECBase { + protected: + static const constexpr size_t kmax_stream_size = 125U; + static const constexpr size_t kmtu = 1500U; + static const constexpr size_t ktimeout = 100U; + /** + * @brief FEC Header, added to each packet to get sequence number upon + * decoding operations. It may be removed once we know the meaning of the + * fields in the rely header. + */ + struct fec_header { + uint32_t seq_number; + + void setSeqNumberBase(uint32_t suffix) { seq_number = htonl(suffix); } + uint32_t getSeqNumberBase() { return ntohl(seq_number); } + }; + + /** + * @brief Construct a new Rely Base object. + * + * @param k The number of source symbol needed to generate n - k repair + * symbols + * @param n The sum of source packets and repair packets in a `block` + * @param seq_offset offset to use if production suffixes starts from an index + * != 0 + */ + RelyBase(uint32_t k, uint32_t n, uint32_t seq_offset = 0) + : k_(k), + n_(n), + seq_offset_(seq_offset % n_), + current_index_(seq_offset) +#if RELY_DEBUG + , + time_(0) +#endif + { + } + + /** + * @brief Get the current time in milliseconds + * + * @return int64_t Current time in milliseconds + */ + int64_t getCurrentTime() { + // Get the current time +#if RELY_DEBUG + return time_++; +#else + auto _time = utils::SteadyClock::now().time_since_epoch(); + auto time = std::chrono::duration_cast<utils::Milliseconds>(_time).count(); + return time; +#endif + } + + protected: + uint32_t k_; + uint32_t n_; + std::uint32_t seq_offset_; + /** + * @brief Vector of packets to be passed to caller callbacks. For encoder it + * will contain the repair packets, for decoder the recovered sources. + */ + std::vector<std::pair<uint32_t, buffer>> packets_; + + /** + * @brief Current index to be used for local packet count. + * + */ + uint32_t current_index_; +#if RELY_DEBUG + uint32_t time_; +#endif +}; + +/** + * @brief The Rely Encoder implementation. + * + */ +class RelyEncoder : private RelyBase, + private rely::encoder, + public ProducerFEC { + public: + RelyEncoder(uint32_t k, uint32_t n, uint32_t seq_offset = 0); + /** + * Producers will call this function when they produce a data packet. + */ + void onPacketProduced(core::ContentObject &content_object, + uint32_t offset) override; + + /** + * @brief Get the fec header size, if added to source packets + */ + std::size_t getFecHeaderSize() override { + return header_bytes() + sizeof(fec_header) + 4; + } + + void reset() override {} +}; + +class RelyDecoder : private RelyBase, + private rely::decoder, + public ConsumerFEC { + public: + RelyDecoder(uint32_t k, uint32_t n, uint32_t seq_offset = 0); + + /** + * Consumers will call this function when they receive a data packet + */ + void onDataPacket(core::ContentObject &content_object, + uint32_t offset) override; + + /** + * @brief Get the fec header size, if added to source packets + */ + std::size_t getFecHeaderSize() override { + return header_bytes() + sizeof(fec_header); + } + + void reset() override {} +}; + +} // namespace fec + +} // namespace protocol +} // namespace transport
\ No newline at end of file diff --git a/libtransport/src/protocols/fec/rs.cc b/libtransport/src/protocols/fec/rs.cc new file mode 100644 index 000000000..2c23d515d --- /dev/null +++ b/libtransport/src/protocols/fec/rs.cc @@ -0,0 +1,418 @@ + +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <glog/logging.h> +#include <hicn/transport/core/global_object_pool.h> +#include <protocols/fec/fec.h> +#include <protocols/fec/rs.h> + +#include <cassert> + +namespace transport { +namespace protocol { +namespace fec { + +BlockCode::BlockCode(uint32_t k, uint32_t n, uint32_t seq_offset, + struct fec_parms *code, rs ¶ms) + : Packets(), + k_(k), + n_(n), + seq_offset_(seq_offset), + code_(code), + max_buffer_size_(0), + current_block_size_(0), + to_decode_(false), + params_(params) { + sorted_index_.reserve(n); + UNUSED(seq_offset_); +} + +bool BlockCode::addRepairSymbol(const fec::buffer &packet, uint32_t i, + uint32_t offset) { + // Get index + to_decode_ = true; + DLOG_IF(INFO, VLOG_IS_ON(4)) << "Adding symbol of size " << packet->length(); + return addSymbol(packet, i, offset, + packet->length() - sizeof(fec_header) - offset); +} + +bool BlockCode::addSourceSymbol(const fec::buffer &packet, uint32_t i, + uint32_t offset) { + DLOG_IF(INFO, VLOG_IS_ON(4)) << "Adding source symbol of size " + << packet->length() << ", offset " << offset; + return addSymbol(packet, i, offset, packet->length() - offset); +} + +bool BlockCode::addSymbol(const fec::buffer &packet, uint32_t i, + uint32_t offset, std::size_t size) { + if (size > max_buffer_size_) { + max_buffer_size_ = size; + } + + operator[](current_block_size_++) = std::make_tuple(i, packet, offset); + + if (current_block_size_ >= k_) { + if (to_decode_) { + decode(); + } else { + encode(); + } + + clear(); + return false; + } + + return true; +} + +void BlockCode::encode() { + gf *data[n_]; + uint32_t base = std::get<0>(operator[](0)); + + // Set packet length in first 2 bytes + for (uint32_t i = 0; i < k_; i++) { + auto &packet = std::get<1>(operator[](i)); + auto offset = std::get<2>(operator[](i)); + + auto ret = + packet->ensureCapacityAndFillUnused(max_buffer_size_ + offset, 0); + if (TRANSPORT_EXPECT_FALSE(ret == false)) { + throw errors::RuntimeException( + "Provided packet is not suitable to be used as FEC source packet. " + "Aborting."); + } + + // Buffers should hold 2 *after* the padding, in order to be + // able to set the length for the encoding operation. + // packet->trimStart(offset); + uint16_t *length = reinterpret_cast<uint16_t *>(packet->writableData() + + max_buffer_size_ + offset); + auto buffer_length = packet->length() - offset; + *length = htons(buffer_length); + + DLOG_IF(INFO, VLOG_IS_ON(4)) << "Current buffer size: " << packet->length(); + + data[i] = packet->writableData() + offset; + } + + // Finish to fill source block with the buffers to hold the repair symbols + auto length = max_buffer_size_ + sizeof(fec_header) + LEN_SIZE_BYTES; + for (uint32_t i = k_; i < n_; i++) { + buffer packet; + if (!params_.buffer_callback_) { + // If no callback is installed, let's allocate a buffer from global pool + packet = core::PacketManager<>::getInstance().getMemBuf(); + packet->append(length); + } else { + // Otherwise let's ask a buffer to the caller. + packet = params_.buffer_callback_(length); + } + + fec_header *fh = reinterpret_cast<fec_header *>(packet->writableData()); + + fh->setSeqNumberBase(base); + fh->setNFecSymbols(n_ - k_); + fh->setEncodedSymbolId(i); + fh->setSourceBlockLen(n_); + + packet->trimStart(sizeof(fec_header)); + + DLOG_IF(INFO, VLOG_IS_ON(4)) << "Current symbol size: " << packet->length(); + + data[i] = packet->writableData(); + operator[](i) = std::make_tuple(i, std::move(packet), uint32_t(0)); + } + + // Generate repair symbols and put them in corresponding buffers + DLOG_IF(INFO, VLOG_IS_ON(4)) + << "Calling encode with max_buffer_size_ = " << max_buffer_size_; + for (uint32_t i = k_; i < n_; i++) { + fec_encode(code_, data, data[i], i, max_buffer_size_ + LEN_SIZE_BYTES); + } + + // Re-include header in repair packets + for (uint32_t i = k_; i < n_; i++) { + auto &packet = std::get<1>(operator[](i)); + packet->prepend(sizeof(fec_header)); + DLOG_IF(INFO, VLOG_IS_ON(4)) + << "Produced repair symbol of size = " << packet->length(); + } +} + +void BlockCode::decode() { + gf *data[k_]; + uint32_t index[k_]; + + for (uint32_t i = 0; i < k_; i++) { + auto &packet = std::get<1>(operator[](i)); + index[i] = std::get<0>(operator[](i)); + auto offset = std::get<2>(operator[](i)); + sorted_index_[i] = index[i]; + + if (index[i] < k_) { + DLOG_IF(INFO, VLOG_IS_ON(4)) + << "DECODE SOURCE - index " << index[i] + << " - Current buffer size: " << packet->length(); + // This is a source packet. We need to fill + // additional space to 0 and append the length + + // Buffers should hold 2 bytes at the end, in order to be + // able to set the length for the encoding operation + packet->trimStart(offset); + packet->ensureCapacityAndFillUnused(max_buffer_size_, 0); + uint16_t *length = reinterpret_cast<uint16_t *>( + packet->writableData() + max_buffer_size_ - LEN_SIZE_BYTES); + + *length = htons(packet->length()); + } else { + DLOG_IF(INFO, VLOG_IS_ON(4)) + << "DECODE SYMBOL - index " << index[i] + << " - Current buffer size: " << packet->length(); + packet->trimStart(sizeof(fec_header) + offset); + } + + data[i] = packet->writableData(); + } + + // We decode the source block + DLOG_IF(INFO, VLOG_IS_ON(4)) + << "Calling decode with max_buffer_size_ = " << max_buffer_size_; + fec_decode(code_, data, reinterpret_cast<int *>(index), max_buffer_size_); + + // Find the index in the block for recovered packets + for (uint32_t i = 0; i < k_; i++) { + if (index[i] != i) { + for (uint32_t j = 0; j < k_; j++) + if (sorted_index_[j] == uint32_t(index[i])) { + sorted_index_[j] = i; + } + } + } + + // Reorder block by index with in-place sorting + for (uint32_t i = 0; i < k_; i++) { + for (uint32_t j = sorted_index_[i]; j != i; j = sorted_index_[i]) { + std::swap(sorted_index_[j], sorted_index_[i]); + std::swap(operator[](j), operator[](i)); + } + } + + // Adjust length according to the one written in the source packet + for (uint32_t i = 0; i < k_; i++) { + auto &packet = std::get<1>(operator[](i)); + uint16_t *length = reinterpret_cast<uint16_t *>( + packet->writableData() + max_buffer_size_ - LEN_SIZE_BYTES); + packet->setLength(ntohs(*length)); + } +} + +void BlockCode::clear() { + current_block_size_ = 0; + max_buffer_size_ = 0; + sorted_index_.clear(); + to_decode_ = false; +} + +void rs::MatrixDeleter::operator()(struct fec_parms *params) { + fec_free(params); +} + +rs::Codes rs::createCodes() { + Codes ret; + +#define _(name, k, n) \ + ret.emplace(std::make_pair(k, n), Matrix(fec_new(k, n), MatrixDeleter())); + foreach_rs_fec_type +#undef _ + + return ret; +} + +rs::Codes rs::codes_ = createCodes(); + +rs::rs(uint32_t k, uint32_t n, uint32_t seq_offset) + : k_(k), n_(n), seq_offset_(seq_offset % n) {} + +RSEncoder::RSEncoder(uint32_t k, uint32_t n, uint32_t seq_offset) + : rs(k, n, seq_offset), + current_code_(codes_[std::make_pair(k, n)].get()), + source_block_(k_, n_, seq_offset_, current_code_, *this) {} + +void RSEncoder::consume(const fec::buffer &packet, uint32_t index, + uint32_t offset) { + if (!source_block_.addSourceSymbol(packet, index, offset)) { + std::vector<std::pair<uint32_t, buffer>> repair_packets; + for (uint32_t i = k_; i < n_; i++) { + repair_packets.emplace_back(std::move(std::get<0>(source_block_[i])), + std::move(std::get<1>(source_block_[i]))); + } + + fec_callback_(repair_packets); + } +} + +void RSEncoder::onPacketProduced(core::ContentObject &content_object, + uint32_t offset) { + consume(content_object.shared_from_this(), + content_object.getName().getSuffix(), offset); +} + +RSDecoder::RSDecoder(uint32_t k, uint32_t n, uint32_t seq_offset) + : rs(k, n, seq_offset) {} + +void RSDecoder::recoverPackets(SourceBlocks::iterator &src_block_it) { + DLOG_IF(INFO, VLOG_IS_ON(4)) << "recoverPackets for " << k_; + auto &src_block = src_block_it->second; + std::vector<std::pair<uint32_t, buffer>> source_packets(k_); + for (uint32_t i = 0; i < src_block.getK(); i++) { + source_packets[i] = std::make_pair(src_block_it->first + i, + std::move(std::get<1>(src_block[i]))); + } + + setProcessed(src_block_it->first); + + fec_callback_(source_packets); + processed_source_blocks_.emplace(src_block_it->first); + + auto it = parked_packets_.find(src_block_it->first); + if (it != parked_packets_.end()) { + parked_packets_.erase(it); + } + + src_blocks_.erase(src_block_it); +} + +void RSDecoder::consumeSource(const fec::buffer &packet, uint32_t index, + uint32_t offset) { + // Normalize index + assert(index >= seq_offset_); + auto i = (index - seq_offset_) % n_; + + // Get base + uint32_t base = index - i; + + if (processed(base)) { + return; + } + + DLOG_IF(INFO, VLOG_IS_ON(4)) + << "Decoder consume called for source symbol. BASE = " << base + << ", index = " << index << " and i = " << i; + + // check if a source block already exist for this symbol. If it does not + // exist, we lazily park this packet until we receive a repair symbol for the + // same block. This is done for 2 reason: + // 1) If we receive all the source packets of a block, we do not need to + // recover anything. + // 2) Sender may change n and k at any moment, so we construct the source + // block based on the (n, k) values written in the fec header. This is + // actually not used right now, since we use fixed value of n and k passed + // at construction time, but it paves the ground for a more dynamic + // protocol that may come in the future. + auto it = src_blocks_.find(base); + if (it != src_blocks_.end()) { + auto ret = it->second.addSourceSymbol(packet, i, offset); + if (!ret) { + recoverPackets(it); + } + } else { + DLOG_IF(INFO, VLOG_IS_ON(4)) << "Adding to parked source packets"; + auto ret = parked_packets_.emplace( + base, std::vector<std::pair<buffer, uint32_t>>()); + ret.first->second.emplace_back(packet, i); + + if (ret.first->second.size() >= k_) { + setProcessed(ret.first->first); + parked_packets_.erase(ret.first); + } + } +} + +void RSDecoder::consumeRepair(const fec::buffer &packet, uint32_t offset) { + // Repair symbol! Get index and base source block. + fec_header *h = + reinterpret_cast<fec_header *>(packet->writableData() + offset); + auto i = h->getEncodedSymbolId(); + auto base = h->getSeqNumberBase(); + auto n = h->getSourceBlockLen(); + auto k = n - h->getNFecSymbols(); + + if (processed(base)) { + return; + } + + DLOG_IF(INFO, VLOG_IS_ON(4)) + << "Decoder consume called for repair symbol. BASE = " << base + << ", index = " << base + i << " and i = " << i << ". K=" << k + << ", N=" << n; + + // check if a source block already exist for this symbol + auto it = src_blocks_.find(base); + if (it == src_blocks_.end()) { + // Create new source block + auto code_it = codes_.find(std::make_pair(k, n)); + if (code_it == codes_.end()) { + LOG(ERROR) << "Code for k = " << k << " and n = " << n + << " does not exist."; + return; + } + + auto emplace_result = src_blocks_.emplace( + base, BlockCode(k, n, seq_offset_, code_it->second.get(), *this)); + it = emplace_result.first; + + // Check in the parked packets and insert any packet that is part of this + // source block + + auto it2 = parked_packets_.find(base); + if (it2 != parked_packets_.end()) { + for (auto &packet_index : it2->second) { + auto ret = it->second.addSourceSymbol(packet_index.first, + packet_index.second, offset); + if (!ret) { + recoverPackets(it); + // Finish to delete packets in same source block that were + // eventually not used + return; + } + } + } + } + + auto ret = it->second.addRepairSymbol(packet, i, offset); + if (!ret) { + recoverPackets(it); + } +} + +void RSDecoder::onDataPacket(core::ContentObject &content_object, + uint32_t offset) { + DLOG_IF(INFO, VLOG_IS_ON(4)) + << "Calling fec for data packet " << content_object.getName() + << ". Offset: " << offset; + + auto suffix = content_object.getName().getSuffix(); + + if (isSymbol(suffix)) { + consumeRepair(content_object.shared_from_this(), offset); + } else { + consumeSource(content_object.shared_from_this(), suffix, offset); + } +} + +} // namespace fec +} // namespace protocol +} // namespace transport diff --git a/libtransport/src/protocols/fec/rs.h b/libtransport/src/protocols/fec/rs.h new file mode 100644 index 000000000..e159ad9f7 --- /dev/null +++ b/libtransport/src/protocols/fec/rs.h @@ -0,0 +1,409 @@ + +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <arpa/inet.h> +#include <hicn/transport/portability/c_portability.h> +#include <hicn/transport/utils/membuf.h> +#include <protocols/fec/fec_info.h> +#include <protocols/fec_base.h> + +#include <array> +#include <cstdint> +#include <map> +#include <unordered_set> +#include <vector> + +namespace transport { +namespace protocol { + +namespace fec { + +#define foreach_rs_fec_type \ + _(RS, 1, 3) \ + _(RS, 4, 5) \ + _(RS, 4, 6) \ + _(RS, 4, 7) \ + _(RS, 6, 10) \ + _(RS, 8, 10) \ + _(RS, 8, 11) \ + _(RS, 8, 12) \ + _(RS, 8, 14) \ + _(RS, 8, 16) \ + _(RS, 8, 32) \ + _(RS, 10, 30) \ + _(RS, 10, 40) \ + _(RS, 10, 60) \ + _(RS, 10, 90) \ + _(RS, 16, 18) \ + _(RS, 16, 21) \ + _(RS, 16, 23) \ + _(RS, 16, 24) \ + _(RS, 16, 27) \ + _(RS, 17, 21) \ + _(RS, 17, 34) \ + _(RS, 32, 36) \ + _(RS, 32, 41) \ + _(RS, 32, 46) \ + _(RS, 32, 54) \ + _(RS, 34, 42) \ + _(RS, 35, 70) \ + _(RS, 52, 62) + +static const constexpr uint16_t MAX_SOURCE_BLOCK_SIZE = 128; + +/** + * We use a std::array in place of std::vector to avoid to allocate a new vector + * in the heap every time we build a new source block, which would be bad if + * the decoder has to allocate several source blocks for many concurrent bases. + * std::array allows to be constructed in place, saving the allocation at the + * price os knowing in advance its size. + */ +using Packets = std::array<std::tuple</* index */ uint32_t, /* buffer */ buffer, + uint32_t /* offset */>, + MAX_SOURCE_BLOCK_SIZE>; + +/** + * FEC Header, prepended to symbol packets. + */ +struct fec_header { + /** + * The base source packet seq_number this FES symbol refers to + */ + uint32_t seq_number; + + /** + * The index of the symbol inside the source block, between k and n - 1 + */ + uint8_t encoded_symbol_id; + + /** + * Total length of source block (n) + */ + uint8_t source_block_len; + + /** + * Total number of symbols (n - k) + */ + uint8_t n_fec_symbols; + + /** + * Align header to 64 bits + */ + uint8_t padding; + + void setSeqNumberBase(uint32_t suffix) { seq_number = htonl(suffix); } + uint32_t getSeqNumberBase() { return ntohl(seq_number); } + void setEncodedSymbolId(uint8_t esi) { encoded_symbol_id = esi; } + uint8_t getEncodedSymbolId() { return encoded_symbol_id; } + void setSourceBlockLen(uint8_t k) { source_block_len = k; } + uint8_t getSourceBlockLen() { return source_block_len; } + void setNFecSymbols(uint8_t n_r) { n_fec_symbols = n_r; } + uint8_t getNFecSymbols() { return n_fec_symbols; } +}; + +class rs; + +/** + * This class models the source block itself. + */ +class BlockCode : public Packets { + /** + * For variable length packet we need to prepend to the padded payload the + * real length of the packet. This is *not* sent over the network. + */ + static constexpr std::size_t LEN_SIZE_BYTES = 2; + + public: + BlockCode(uint32_t k, uint32_t n, uint32_t seq_offset, struct fec_parms *code, + rs ¶ms); + + /** + * Add a repair symbol to the dource block. + */ + bool addRepairSymbol(const fec::buffer &packet, uint32_t i, + uint32_t offset = 0); + + /** + * Add a source symbol to the source block. + */ + bool addSourceSymbol(const fec::buffer &packet, uint32_t i, + uint32_t offset = 0); + + /** + * Get current length of source block. + */ + std::size_t length() { return current_block_size_; } + + /** + * Get N + */ + uint32_t getN() { return n_; } + + /** + * Get K + */ + uint32_t getK() { return k_; } + + /** + * Clear source block + */ + void clear(); + + private: + /** + * Add symbol to source block + **/ + bool addSymbol(const fec::buffer &packet, uint32_t i, uint32_t offset, + std::size_t size); + + /** + * Starting from k source symbols, get the n - k repair symbols + */ + void encode(); + + /** + * Starting from k symbols (mixed repair and source), get k source symbols. + * NOTE: It does not make sense to retrieve the k source symbols using the + * very same k source symbols. With the current implementation that case can + * never happen. + */ + void decode(); + + private: + uint32_t k_; + uint32_t n_; + uint32_t seq_offset_; + struct fec_parms *code_; + std::size_t max_buffer_size_; + std::size_t current_block_size_; + std::vector<uint32_t> sorted_index_; + bool to_decode_; + rs ¶ms_; +}; + +/** + * This class contains common parameters between the fec encoder and decoder. + * In particular it contains: + * - The callback to be called when symbols are encoded / decoded + * - The reference to the static reed-solomon parameters, allocated at program + * startup + * - N and K. Ideally they are useful only for the encoder (the decoder can + * retrieve them from the FEC header). However right now we assume sender and + * receiver agreed on the parameters k and n to use. We will introduce a control + * message later to negotiate them, so that decoder cah dynamically change them + * during the download. + */ +class rs : public virtual FECBase { + friend class BlockCode; + + /** + * Deleter for static preallocated reed-solomon parameters. + */ + struct MatrixDeleter { + void operator()(struct fec_parms *params); + }; + + /** + * unique_ptr to reed-solomon parameters, with custom deleter to call fec_free + * at the end of the program + */ + using Matrix = std::unique_ptr<struct fec_parms, MatrixDeleter>; + + /** + * Key to retrieve static preallocated reed-solomon parameters. It is pair of + * k and n + */ + using Code = std::pair<std::uint32_t /* k */, std::uint32_t /* n */>; + + /** + * Custom hash function for (k, n) pair. + */ + struct CodeHasher { + std::size_t operator()(const Code &code) const { + uint64_t ret = uint64_t(code.first) << 32 | uint64_t(code.second); + return std::hash<uint64_t>{}(ret); + } + }; + + protected: + /** + * Callback to be called after the encode or the decode operations. In the + * former case it will contain the symbols, while in the latter the sources. + */ + using PacketsReady = std::function<void(std::vector<buffer> &)>; + + /** + * The sequence number base. + */ + using SNBase = std::uint32_t; + + /** + * The map of source blocks, used at the decoder side. For the encoding + * operation we can use one source block only, since packet are produced in + * order. + */ + using SourceBlocks = std::unordered_map<SNBase, BlockCode>; + + /** + * Map (k, n) -> reed-solomon parameter + */ + using Codes = std::unordered_map<Code, Matrix, CodeHasher>; + + public: + rs(uint32_t k, uint32_t n, uint32_t seq_offset = 0); + ~rs() = default; + + virtual void clear() { processed_source_blocks_.clear(); } + + bool isSymbol(uint32_t index) { return ((index - seq_offset_) % n_) >= k_; } + + private: + /** + * Create reed-solomon codes at program startup. + */ + static Codes createCodes(); + + protected: + bool processed(SNBase seq_base) { + return processed_source_blocks_.find(seq_base) != + processed_source_blocks_.end(); + } + + void setProcessed(SNBase seq_base) { + processed_source_blocks_.emplace(seq_base); + } + + std::uint32_t k_; + std::uint32_t n_; + std::uint32_t seq_offset_; + + /** + * Keep track of processed source blocks + */ + std::unordered_set<SNBase> processed_source_blocks_; + + static Codes codes_; +}; + +/** + * The reed-solomon encoder. It is feeded with source symbols and it provide + * repair-symbols through the fec_callback_ + */ +class RSEncoder : public rs, public ProducerFEC { + public: + RSEncoder(uint32_t k, uint32_t n, uint32_t seq_offset = 0); + /** + * Always consume source symbols. + */ + void consume(const fec::buffer &packet, uint32_t index, uint32_t offset = 0); + + void onPacketProduced(core::ContentObject &content_object, + uint32_t offset) override; + + /** + * @brief Get the fec header size, if added to source packets + */ + std::size_t getFecHeaderSize() override { return 0; } + + void clear() override { + rs::clear(); + source_block_.clear(); + } + + void reset() override { clear(); } + + private: + struct fec_parms *current_code_; + /** + * The source block. As soon as it is filled with k source symbols, the + * encoder calls the callback fec_callback_ and the resets the block 0, ready + * to accept another batch of k source symbols. + */ + BlockCode source_block_; +}; + +/** + * The reed-solomon encoder. It is feeded with source/repair symbols and it + * provides the original source symbols through the fec_callback_ + */ +class RSDecoder : public rs, public ConsumerFEC { + public: + RSDecoder(uint32_t k, uint32_t n, uint32_t seq_offset = 0); + + /** + * Consume source symbol + */ + void consumeSource(const fec::buffer &packet, uint32_t i, + uint32_t offset = 0); + + /** + * Consume repair symbol + */ + void consumeRepair(const fec::buffer &packet, uint32_t offset = 0); + + /** + * Consumers will call this function when they receive a data packet + */ + void onDataPacket(core::ContentObject &content_object, + uint32_t offset) override; + + /** + * @brief Get the fec header size, if added to source packets + */ + std::size_t getFecHeaderSize() override { return 0; } + + /** + * Clear decoder to reuse + */ + void clear() override { + rs::clear(); + src_blocks_.clear(); + parked_packets_.clear(); + } + + void reset() override { clear(); } + + private: + void recoverPackets(SourceBlocks::iterator &src_block_it); + + private: + /** + * Map of source blocks. We use a map because we may receive symbols belonging + * to diffreent source blocks at the same time, so we need to be able to + * decode many source symbols at the same time. + */ + SourceBlocks src_blocks_; + + /** + * Unordered Map of source symbols for which we did not receive any repair + * symbol in the same source block. Notably this happens when: + * + * - We receive the source symbols first and the repair symbols after + * - We received only source symbols for a given block. In that case it does + * not make any sense to build the source block, since we received all the + * source packet of the block. + */ + std::unordered_map<uint32_t, std::vector<std::pair<buffer, uint32_t>>> + parked_packets_; +}; + +} // namespace fec + +} // namespace protocol + +} // namespace transport |