/*
 * Copyright (c) 2021 Cisco and/or its affiliates.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/**
 * VCL Locked Sessions (VLS) is a wrapper that synchronizes access to VCL APIs
 * which are, by construction, not thread safe. To this end, VLS uses
 * configuration and heuristics to detect how applications use sessions in
 * an attempt to optimize the locking strategy. The modes of operation
 * currently supported are the following:
 *
 * 1) per-process workers
 *
 *                +----------+       +----------+
 *                |          |       |          |
 *                | process0 |       | process1 |
 *                |          |       |          |
 *                +-----+----+       +-----+----+
 *                      |                  |
 *                      |                  |
 *                +-----+----+       +-----+----+
 *                |          |       |          |
 *                | vls_wrk0 |       | vls_wrk1 |
 *                |          |       |          |
 *                +-----+----+       +-----+----+
 *                      |                  |
 *                      |                  |
 *                +-----+----+       +-----+----+
 *                |          |       |          |
 *                | vcl_wrk0 |       | vcl_wrk1 |
 *                |          |       |          |
 *                +----------+       +----------+
 *
 * 2) per-thread workers               3) single-worker multi-thread
 *
 *     +---------+   +---------+           +---------+   +---------+
 *     |         |   |         |           |         |   |         |
 *     | thread0 |   | thread1 |           | thread0 |   | thread1 |
 *     |         |   |         |           |         |   |         |
 *     +--------++   +-+-------+           +--------++   +-+-------+
 *              |      |                            |      |
 *              |      |                            |      |
 *            +-+------+-+                        +-+------+-+
 *            |          |                        |          |
 *            | vls_wrk0 |                        | vls_wrk0 |
 *            |          |                        |          |
 *            +-+------+-+                        +----+-----+
 *              |      |                               |
 *              |      |                               |
 *     +--------+-+  +-+--------+                 +----+-----+
 *     |          |  |          |                 |          |
 *     | vcl_wrk0 |  | vcl_wrk1 |                 | vcl_wrk0 |
 *     |          |  |          |                 |          |
 *     +----------+  +----------+                 +----------+
 *
 * 1) per-process workers: intercept fork calls and assume all child
 *    processes are new workers that must be registered with vcl. VLS
 *    sessions are cloned and shared between workers. Only shared sessions
 *    are locked on use and thereby only one process can interact with
 *    them at a time (explicit sharing).
 *
 * 2) per-thread workers: each newly detected pthread is assumed to be a new
 *    worker and is registered with vcl. Enabled via configuration.
 *    When a thread tries to access a session it does not own, a clone and
 *    share rpc request is sent to the owning thread via vcl and vpp.
 *    Consequently, a vls session can map to multiple vcl sessions, one per
 *    vcl worker. VLS sessions are locked on use (implicit sharing).
 *
 * 3) single-worker multi-thread: vls does not make any assumptions about
 *    application threads and therefore implements an aggressive locking
 *    strategy that limits access to underlying vcl resources based on the
 *    type of interaction and locks vls sessions on use (implicit sharing).
 *    An illustrative sketch of the resulting lock-on-use discipline follows
 *    the session lookup helpers below.
 */

#include <vcl/vcl_locked.h>
#include <vcl/vcl_private.h>

typedef struct vls_shared_data_
{
  clib_spinlock_t lock;	    /**< shared data lock */
  u32 owner_wrk_index;	    /**< vcl wrk that owns session */
  u32 *workers_subscribed;  /**< vec of wrks subscribed to session */
  clib_bitmap_t *listeners; /**< bitmap of wrks actively listening */
} vls_shared_data_t;

typedef struct vcl_locked_session_
{
  clib_spinlock_t lock;	   /**< vls lock when in use */
  u32 session_index;	   /**< vcl session index */
  u32 vcl_wrk_index;	   /**< vcl worker index */
  u32 vls_index;	   /**< index in vls pool */
  u32 shared_data_index;   /**< shared data index if any */
  u32 owner_vcl_wrk_index; /**< vcl wrk of the vls wrk at alloc */
  uword *vcl_wrk_index_to_session_index; /**< map vcl wrk to session */
} vcl_locked_session_t;

typedef struct vls_worker_
{
  clib_rwlock_t sh_to_vlsh_table_lock; /**< ht rwlock with mt workers */
  vcl_locked_session_t *vls_pool;      /**< pool of vls session */
  uword *sh_to_vlsh_table;	       /**< map from vcl sh to vls sh */
  u32 *pending_vcl_wrk_cleanup;	       /**< child vcl wrks to cleanup */
  u32 vcl_wrk_index;		       /**< if 1:1 map vls to vcl wrk */
} vls_worker_t;

typedef struct vls_local_
{
  int vls_wrk_index;		      /**< vls wrk index, 1 per process */
  volatile int vls_mt_n_threads;      /**< number of threads detected */
  clib_rwlock_t vls_pool_lock;	      /**< per process/wrk vls pool locks */
  pthread_mutex_t vls_mt_mq_mlock;    /**< vcl mq lock */
  pthread_mutex_t vls_mt_spool_mlock; /**< vcl select or pool lock */
  volatile u8 select_mp_check;	      /**< flag set if select checks done */
} vls_process_local_t;

static vls_process_local_t vls_local;
static vls_process_local_t *vlsl = &vls_local;

typedef struct vls_main_
{
  vls_worker_t *workers;	       /**< pool of vls workers */
  vls_shared_data_t *shared_data_pool; /**< inter proc pool of shared data */
  clib_rwlock_t shared_data_lock;      /**< shared data pool lock */
  clib_spinlock_t worker_rpc_lock;     /**< lock for inter-worker rpcs */
} vls_main_t;

vls_main_t *vlsm;

typedef enum
{
  VLS_RPC_STATE_INIT,
  VLS_RPC_STATE_SUCCESS,
  VLS_RPC_STATE_SESSION_NOT_EXIST,
} vls_rpc_state_e;

typedef enum vls_rpc_msg_type_
{
  VLS_RPC_CLONE_AND_SHARE,
  VLS_RPC_SESS_CLEANUP,
} vls_rpc_msg_type_e;

typedef struct vls_rpc_msg_
{
  u8 type;
  u8 data[0];
} vls_rpc_msg_t;

typedef struct vls_clone_and_share_msg_
{
  u32 vls_index;	    /**< vls to be shared */
  u32 session_index;	    /**< vcl session to be shared */
  u32 origin_vls_wrk;	    /**< vls worker that initiated the rpc */
  u32 origin_vls_index;	    /**< vls session of the originator */
  u32 origin_vcl_wrk;	    /**< vcl worker that initiated the rpc */
  u32 origin_session_index; /**< vcl session of the originator */
} vls_clone_and_share_msg_t;

typedef struct vls_sess_cleanup_msg_
{
  u32 session_index;  /**< vcl session to be cleaned */
  u32 origin_vcl_wrk; /**< worker that initiated the rpc */
} vls_sess_cleanup_msg_t;

void vls_send_session_cleanup_rpc (vcl_worker_t * wrk, u32 dst_wrk_index,
				   u32 dst_session_index);
void vls_send_clone_and_share_rpc (vcl_worker_t *wrk, u32 origin_vls_index,
				   u32 session_index, u32 vls_wrk_index,
				   u32 dst_wrk_index, u32 dst_vls_index,
				   u32 dst_session_index);
static void vls_cleanup_forked_child (vcl_worker_t *wrk,
				      vcl_worker_t *child_wrk);
static void vls_handle_pending_wrk_cleanup (void);

static inline u32
vls_get_worker_index (void)
{
  return vlsl->vls_wrk_index;
}

static u32
vls_shared_data_alloc (void)
{
  vls_shared_data_t *vls_shd;
  u32 shd_index;

  clib_rwlock_writer_lock (&vlsm->shared_data_lock);
  pool_get_zero (vlsm->shared_data_pool, vls_shd);
  clib_spinlock_init (&vls_shd->lock);
  shd_index = vls_shd - vlsm->shared_data_pool;
  clib_rwlock_writer_unlock (&vlsm->shared_data_lock);

  return shd_index;
}

static u32
vls_shared_data_index (vls_shared_data_t * vls_shd)
{
  return vls_shd - vlsm->shared_data_pool;
}

vls_shared_data_t *
vls_shared_data_get (u32 shd_index)
{
  if (pool_is_free_index (vlsm->shared_data_pool, shd_index))
    return 0;
  return pool_elt_at_index (vlsm->shared_data_pool, shd_index);
}

static void
vls_shared_data_free (u32 shd_index)
{
  vls_shared_data_t *vls_shd;

  clib_rwlock_writer_lock (&vlsm->shared_data_lock);
  vls_shd = vls_shared_data_get (shd_index);
  clib_spinlock_free (&vls_shd->lock);
  clib_bitmap_free (vls_shd->listeners);
  vec_free (vls_shd->workers_subscribed);
  pool_put (vlsm->shared_data_pool, vls_shd);
  clib_rwlock_writer_unlock (&vlsm->shared_data_lock);
}

static inline void
vls_shared_data_pool_rlock (void)
{
  clib_rwlock_reader_lock (&vlsm->shared_data_lock);
}

static inline void
vls_shared_data_pool_runlock (void)
{
  clib_rwlock_reader_unlock (&vlsm->shared_data_lock);
}

static inline void
vls_mt_pool_rlock (void)
{
  if (vlsl->vls_mt_n_threads > 1)
    clib_rwlock_reader_lock (&vlsl->vls_pool_lock);
}

static inline void
vls_mt_pool_runlock (void)
{
  if (vlsl->vls_mt_n_threads > 1)
    clib_rwlock_reader_unlock (&vlsl->vls_pool_lock);
}

static inline void
vls_mt_pool_wlock (void)
{
  if (vlsl->vls_mt_n_threads > 1)
    clib_rwlock_writer_lock (&vlsl->vls_pool_lock);
}

static inline void
vls_mt_pool_wunlock (void)
{
  if (vlsl->vls_mt_n_threads > 1)
    clib_rwlock_writer_unlock (&vlsl->vls_pool_lock);
}

typedef enum
{
  VLS_MT_OP_READ,
  VLS_MT_OP_WRITE,
  VLS_MT_OP_SPOOL,
  VLS_MT_OP_XPOLL,
} vls_mt_ops_t;

typedef enum
{
  VLS_MT_LOCK_MQ = 1 << 0,
  VLS_MT_LOCK_SPOOL = 1 << 1
} vls_mt_lock_type_t;

static void
vls_mt_add (void)
{
  vlsl->vls_mt_n_threads += 1;

  /* If multi-thread workers are supported, for each new thread register a
   * new vcl worker with vpp.
   * Otherwise, all threads use the same vcl worker, so update the vcl
   * worker's thread local worker index variable */
  if (vls_mt_wrk_supported ())
    {
      if (vppcom_worker_register () != VPPCOM_OK)
	VERR ("failed to register worker");
    }
  else
    vcl_set_worker_index (vlsl->vls_wrk_index);
}

static inline void
vls_mt_mq_lock (void)
{
  pthread_mutex_lock (&vlsl->vls_mt_mq_mlock);
}

static inline void
vls_mt_mq_unlock (void)
{
  pthread_mutex_unlock (&vlsl->vls_mt_mq_mlock);
}

static inline void
vls_mt_spool_lock (void)
{
  pthread_mutex_lock (&vlsl->vls_mt_spool_mlock);
}

static inline void
vls_mt_create_unlock (void)
{
  pthread_mutex_unlock (&vlsl->vls_mt_spool_mlock);
}

static void
vls_mt_locks_init (void)
{
  pthread_mutex_init (&vlsl->vls_mt_mq_mlock, NULL);
  pthread_mutex_init (&vlsl->vls_mt_spool_mlock, NULL);
}

u8
vls_is_shared (vcl_locked_session_t * vls)
{
  return (vls->shared_data_index != ~0);
}

static inline void
vls_lock (vcl_locked_session_t * vls)
{
  if ((vlsl->vls_mt_n_threads > 1) || vls_is_shared (vls))
    clib_spinlock_lock (&vls->lock);
}

static inline void
vls_unlock (vcl_locked_session_t * vls)
{
  if ((vlsl->vls_mt_n_threads > 1) || vls_is_shared (vls))
    clib_spinlock_unlock (&vls->lock);
}

static inline vcl_session_handle_t
vls_to_sh (vcl_locked_session_t * vls)
{
  return vcl_session_handle_from_index (vls->session_index);
}

static inline vcl_session_handle_t
vls_to_sh_tu (vcl_locked_session_t * vls)
{
  vcl_session_handle_t sh;
  sh = vls_to_sh (vls);
  vls_mt_pool_runlock ();
  return sh;
}

static vls_worker_t *
vls_worker_get_current (void)
{
  return pool_elt_at_index (vlsm->workers, vls_get_worker_index ());
}

static void
vls_worker_alloc (void)
{
  vls_worker_t *wrk;

  pool_get_zero (vlsm->workers, wrk);
  if (vls_mt_wrk_supported ())
    clib_rwlock_init (&wrk->sh_to_vlsh_table_lock);
  wrk->vcl_wrk_index = vcl_get_worker_index ();
  vec_validate (wrk->pending_vcl_wrk_cleanup, 16);
  vec_reset_length (wrk->pending_vcl_wrk_cleanup);
}

static void
vls_worker_free (vls_worker_t * wrk)
{
  hash_free (wrk->sh_to_vlsh_table);
  if (vls_mt_wrk_supported ())
    clib_rwlock_free (&wrk->sh_to_vlsh_table_lock);
  pool_free (wrk->vls_pool);
  pool_put (vlsm->workers, wrk);
}

static vls_worker_t *
vls_worker_get (u32 wrk_index)
{
  if (pool_is_free_index (vlsm->workers, wrk_index))
    return 0;
  return pool_elt_at_index (vlsm->workers, wrk_index);
}

static void
vls_sh_to_vlsh_table_add (vls_worker_t *wrk, vcl_session_handle_t sh,
			  u32 vlsh)
{
  if (vls_mt_wrk_supported ())
    clib_rwlock_writer_lock (&wrk->sh_to_vlsh_table_lock);
  hash_set (wrk->sh_to_vlsh_table, sh, vlsh);
  if (vls_mt_wrk_supported ())
    clib_rwlock_writer_unlock (&wrk->sh_to_vlsh_table_lock);
}

static void
vls_sh_to_vlsh_table_del (vls_worker_t *wrk, vcl_session_handle_t sh)
{
  if (vls_mt_wrk_supported ())
    clib_rwlock_writer_lock (&wrk->sh_to_vlsh_table_lock);
  hash_unset (wrk->sh_to_vlsh_table, sh);
  if (vls_mt_wrk_supported ())
    clib_rwlock_writer_unlock (&wrk->sh_to_vlsh_table_lock);
}

static uword *
vls_sh_to_vlsh_table_get (vls_worker_t *wrk, vcl_session_handle_t sh)
{
  if (vls_mt_wrk_supported ())
    clib_rwlock_reader_lock (&wrk->sh_to_vlsh_table_lock);
  uword *vlshp = hash_get (wrk->sh_to_vlsh_table, sh);
  if (vls_mt_wrk_supported ())
    clib_rwlock_reader_unlock (&wrk->sh_to_vlsh_table_lock);
  return vlshp;
}

static vls_handle_t
vls_alloc (vcl_session_handle_t sh)
{
  vls_worker_t *wrk = vls_worker_get_current ();
  vcl_locked_session_t *vls;

  vls_mt_pool_wlock ();

  pool_get_zero (wrk->vls_pool, vls);
  vls->session_index = vppcom_session_index (sh);
  vls->vcl_wrk_index = vppcom_session_worker (sh);
  vls->vls_index = vls - wrk->vls_pool;
  vls->shared_data_index = ~0;
  vls_sh_to_vlsh_table_add (wrk, sh, vls->vls_index);
  if (vls_mt_wrk_supported ())
    {
      hash_set (vls->vcl_wrk_index_to_session_index, vls->vcl_wrk_index,
		vls->session_index);
      vls->owner_vcl_wrk_index = vls->vcl_wrk_index;
    }
  clib_spinlock_init (&vls->lock);

  vls_mt_pool_wunlock ();
  return vls->vls_index;
}

static vcl_locked_session_t *
vls_get (vls_handle_t vlsh)
{
  vls_worker_t *wrk = vls_worker_get_current ();
  if (pool_is_free_index (wrk->vls_pool, vlsh))
    return 0;
  return pool_elt_at_index (wrk->vls_pool, vlsh);
}

static void
vls_free (vcl_locked_session_t * vls)
{
  vls_worker_t *wrk = vls_worker_get_current ();

  ASSERT (vls != 0);
  vls_sh_to_vlsh_table_del (
    wrk, vcl_session_handle_from_index (vls->session_index));
  clib_spinlock_free (&vls->lock);
  pool_put (wrk->vls_pool, vls);
}

static vcl_locked_session_t *
vls_get_and_lock (vls_handle_t vlsh)
{
  vls_worker_t *wrk = vls_worker_get_current ();
  vcl_locked_session_t *vls;

  /* look up the vls session in the current worker's pool and take its
   * per-session lock */
  if (pool_is_free_index (wrk->vls_pool, vlsh))
    return 0;
  vls = pool_elt_at_index (wrk->vls_pool, vlsh);
  vls_lock (vls);
  return vls;
}
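
/*
 * Illustrative sketch only, not part of the original file: how the locking
 * helpers above are typically combined by a vls_* wrapper before calling
 * into the non-thread-safe vppcom_* API. The wrapper name is hypothetical
 * and the actual vcl call is elided.
 */
static inline int
vls_sketch_locked_op (vls_handle_t vlsh)
{
  vcl_locked_session_t *vls;
  int rv;

  vls_mt_pool_rlock ();		/* no-op unless multiple threads detected */
  if (!(vls = vls_get_and_lock (vlsh)))
    {
      vls_mt_pool_runlock ();
      return VPPCOM_EBADFD;
    }
  /* ... call vppcom_* on vls_to_sh (vls) here ... */
  rv = VPPCOM_OK;
  vls_unlock (vls);		/* no-op unless shared or multi-threaded */
  vls_mt_pool_runlock ();
  return rv;
}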
/*
 *------------------------------------------------------------------
 * json_format.c
 *
 * Copyright (c) 2015 Cisco and/or its affiliates.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *------------------------------------------------------------------

*/
#include <inttypes.h>
#include <vat/json_format.h>
#include <vnet/ip/ip.h>
#include <vppinfra/vec.h>

#define VAT_TAB_WIDTH               2

typedef struct vat_print_ctx_s
{
  FILE *ofp;
  u32 indent;
} vat_print_ctx_t;

/* Format an IP4 address. */
static u8 *
vat_json_format_ip4_address (u8 * s, va_list * args)
{
  u8 *a = va_arg (*args, u8 *);
  return format (s, "%d.%d.%d.%d", a[0], a[1], a[2], a[3]);
}

/* Format an IP6 address. */
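/* The longest run of zero 16-bit groups is compressed to "::", e.g.
 * 2001:db8:0:0:0:0:0:1 is printed as "2001:db8::1"; a lone zero group is
 * printed as "0". */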
static u8 *
vat_json_format_ip6_address (u8 * s, va_list * args)
{
  ip6_address_t *a = va_arg (*args, ip6_address_t *);
  u32 i, i_max_n_zero, max_n_zeros, i_first_zero, n_zeros, last_double_colon;

  i_max_n_zero = ARRAY_LEN (a->as_u16);
  max_n_zeros = 0;
  i_first_zero = i_max_n_zero;
  n_zeros = 0;
  for (i = 0; i < ARRAY_LEN (a->as_u16); i++)
    {
      u32 is_zero = a->as_u16[i] == 0;
      if (is_zero && i_first_zero >= ARRAY_LEN (a->as_u16))
	{
	  i_first_zero = i;
	  n_zeros = 0;
	}
      n_zeros += is_zero;
      if ((!is_zero && n_zeros > max_n_zeros)
	  || (i + 1 >= ARRAY_LEN (a->as_u16) && n_zeros > max_n_zeros))
	{
	  i_max_n_zero = i_first_zero;
	  max_n_zeros = n_zeros;
	  i_first_zero = ARRAY_LEN (a->as_u16);
	  n_zeros = 0;
	}
    }

  last_double_colon = 0;
  for (i = 0; i < ARRAY_LEN (a->as_u16); i++)
    {
      if (i == i_max_n_zero && max_n_zeros > 1)
	{
	  s = format (s, "::");
	  i += max_n_zeros - 1;
	  last_double_colon = 1;
	}
      else
	{
	  s = format (s, "%s%x",
		      (last_double_colon || i == 0) ? "" : ":",
		      clib_net_to_host_u16 (a->as_u16[i]));
	  last_double_colon = 0;
	}
    }

  return s;
}

static void
vat_json_indent_print (vat_print_ctx_t * ctx)
{
  int i;
  for (i = 0; i < ctx->indent * VAT_TAB_WIDTH; i++)
    {
      fformat (ctx->ofp, " ");
    }
}

static void
vat_json_indent_line (vat_print_ctx_t * ctx, char *fmt, ...)
{
  va_list va;

  vat_json_indent_print (ctx);
  va_start (va, fmt);
  va_fformat (ctx->ofp, fmt, &va);
  va_end (va);
}

static u8
is_num_only (vat_json_node_t * p)
{
  vat_json_node_t *elem;
  vec_foreach (elem, p)
  {
    if (VAT_JSON_INT != elem->type && VAT_JSON_UINT != elem->type)
      {
	return 0;
      }
  }
  return 1;
}

static void
vat_json_print_internal (vat_print_ctx_t * ctx, vat_json_node_t * node)
{
#define P(fmt,...) fformat(ctx->ofp, fmt, ##__VA_ARGS__)
#define PL(fmt,...) fformat(ctx->ofp, fmt"\n", ##__VA_ARGS__)
#define PPL(fmt,...) vat_json_indent_line(ctx, fmt"\n", ##__VA_ARGS__)
#define PP(fmt,...) vat_json_indent_line(ctx, fmt, ##__VA_ARGS__)
#define INCR (ctx->indent++)
#define DECR (ctx->indent--)

  vat_json_pair_t *pair;
  u32 i, count;
  vat_json_node_t *elem;
  u8 num_only = 0;

  if (!node)
    {
      return;
    }

  switch (node->type)
    {
    case VAT_JSON_OBJECT:
      count = vec_len (node->pairs);
      if (count >= 1)
	{
	  PL ("{");
	  INCR;
	  for (i = 0; i < count; i++)
	    {
	      pair = &node->pairs[i];
	      PP ("\"%s\": ", pair->name);
	      vat_json_print_internal (ctx, &pair->value);
	      if (i < count - 1)
		{
		  P (",");
		}
	      PL ();
	    }
	  DECR;
	  PP ("}");
	}
      else
	{
	  P ("{}");
	}
      break;
    case VAT_JSON_ARRAY:
      num_only = is_num_only (node->array);
      count = vec_len (node->array);
      if (count >= 1)
	{
	  if (num_only)
	    P ("[");
	  else
	    PL ("[ ");
	  INCR;
	  for (i = 0; i < count; i++)
	    {
	      elem = &node->array[i];
	      if (!num_only)
		{
		  vat_json_indent_print (ctx);
		}
	      vat_json_print_internal (ctx, elem);
	      if (i < count - 1)
		{
		  if (num_only)
		    {
		      P (", ");
		    }
		  else
		    {
		      P (",");
		    }
		}
	      if (!num_only)
		PL ();
	    }
	  DECR;
	  if (!num_only)
	    PP ("]");
	  else
	    P ("]");
	}
      else
	{
	  P ("[]");
	}
      break;
    case VAT_JSON_INT:
      P ("%d", node->sint);
      break;
    case VAT_JSON_UINT:
      P ("%" PRIu64, node->uint);
      break;
    case VAT_JSON_REAL:
      P ("%f", node->real);
      break;
    case VAT_JSON_STRING:
      P ("\"%s\"", node->string);
      break;
    case VAT_JSON_IPV4:
      P ("\"%U\"", vat_json_format_ip4_address, &node->ip4);
      break;
    case VAT_JSON_IPV6:
      P ("\"%U\"", vat_json_format_ip6_address, &node->ip6);
      break;
    default:
      break;
    }
#undef PPL
#undef PP
#undef PL
#undef P
}

void
vat_json_print (FILE * ofp, vat_json_node_t * node)
{
  vat_print_ctx_t ctx;
  clib_memset (&ctx, 0, sizeof ctx);
  ctx.indent = 0;
  ctx.ofp = ofp;
  fformat (ofp, "\n");
  vat_json_print_internal (&ctx, node);
  fformat (ofp, "\n");
}
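
/*
 * Hedged usage sketch, not part of the original file: build a small object
 * with the node helpers declared in vat/json_format.h and print it. The
 * example function name is hypothetical; the helper names follow their use
 * elsewhere in vat.
 */
static inline void
vat_json_print_usage_example (void)
{
  vat_json_node_t node;

  vat_json_init_object (&node);
  vat_json_object_add_uint (&node, "retval", 0);
  vat_json_object_add_string_copy (&node, "name", (const u8 *) "example");

  /* emits the object to stdout, one pair per line, indented by
   * VAT_TAB_WIDTH, then frees the tree */
  vat_json_print (stdout, &node);
  vat_json_free (&node);
}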

void
vat_json_free (vat_json_node_t * node)
{
  int i = 0;

  if (NULL == node)
    {
      return;
    }
  switch (node->