path: root/src/vlib/node.c
2019-11-04  vlib: fix for vlib_node_add_next_with_slot  (Christian Hopps; 1 file changed, +10/-1)

vlib_node_add_next_with_slot was not cleaning up the old next-node references to the given slot when replacing it with a new next node. This mostly worked, until one tried to set the slot to a previously (but not currently) used next node for that slot.

Type: fix
Signed-off-by: Christian Hopps <chopps@labn.net>
Change-Id: I7ee607625da874e320158b80f12ddc16e377f8e9
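For context, a minimal sketch of the call this commit fixes (the node names and slot value are hypothetical):

    /* Pin a fixed next-slot from "my-node" to "ip4-lookup" (hypothetical names) */
    u32 node_index = vlib_get_node_by_name (vm, (u8 *) "my-node")->index;
    u32 next_index = vlib_get_node_by_name (vm, (u8 *) "ip4-lookup")->index;
    /* Before this fix, re-pointing a slot that previously referenced a
       different next node could leave a stale slot mapping behind. */
    vlib_node_add_next_with_slot (vm, node_index, next_index, 0 /* slot */);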
2019-10-07  vlib: move thread barrier around mod of global node next data  (Christian E. Hopps; 1 file changed, +5/-6)

The old code modified the node next array prior to obtaining the thread barrier. Then it updated the runtime node data, and upon barrier release caused reforking of each worker thread. The reforking clones the main thread nodes and reconstructs the runtime node structure. This cloning is not 100% "deep" in the sense that the node next array is shared (i.e., only the pointer is copied). So, prior to the barrier being obtained, the node's next array was being changed while workers were actively using it (bad).

Treating the node next array as read-only in the workers and sharing it is a decent optimization, so instead of trying to fix that, just move the barrier a little earlier in the process to protect the node next array as well.

This was tripping an assert in next-frame ownership change by way of the ip4-arp node. The assert verifies that the node's next array length is equal to the runtime next node count. The race above was lost, and the node next array data was updated in the main thread while the arp code was still executing in a worker. This was being hit when many arp requests were being sent from both ends of a tunnel, during which the add-next-node function was called, which often led to an assert because the next node array was out of sync with the runtime next node count.

- PS#2 update - move barrier sync to just above the code that modifies state.

Ticket: VPP-1783
Type: fix
Signed-off-by: Christian E. Hopps <chopps@chopps.org>
Change-Id: I868784e28f994ee0922aaaae11c4894a3f4f1fe7
Signed-off-by: Christian E. Hopps <chopps@chopps.org>
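The barrier pattern in question, as a minimal sketch using the standard vlib worker-barrier API (the mutation shown is a placeholder):

    vlib_worker_thread_barrier_sync (vm);	/* stop all workers */

    /* ... safe to modify the node next array / runtime node data here ... */

    vlib_worker_thread_barrier_release (vm);	/* workers re-fork and resume */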
2019-07-23  vlib: address vlib_error_t scaling issue  (Dave Barach; 1 file changed, +1/-1)

Encoding the vpp node index into the vlib_error_t as a 10-bit quantity limits us to 1K graph nodes. Unfortunately, a few nodes need 6-bit per-node error codes. Only a very few nodes have so many counters. It turns out that there are about 2K total error counters in the system, which is (approximately) the maximum error heap index.

The current (index, code) encoding limits the number of interfaces to around 250, since each interface has two associated graph nodes and we have about 500 "normal, interior" graph nodes.

This patch adds an error-index to node-index map, so we can store error heap indices directly in the vlib_buffer_t.

Type: refactor
Change-Id: I28101cad3d8750819e27b8785fc0cf71ff54f79a
Signed-off-by: Dave Barach <dave@barachs.net>
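For reference, the usual way a node dispatch function tags a buffer with an error counter; with this patch the stored value is an error heap index rather than a packed (node, code) pair. The error enum value is hypothetical; node is the vlib_node_runtime_t passed to the dispatch function:

    /* node->errors[] maps per-node error codes to error heap indices */
    vlib_buffer_t *b0 = vlib_get_buffer (vm, bi0);
    b0->error = node->errors[MY_NODE_ERROR_DROP];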
2019-05-29  Break out the broom for some cleanup work  (Dave Barach; 1 file changed, +28/-0)

Maintain the MAINTAINERS file: removed src/plugins/*.am listings, added a couple of plugins.

Add:

    vlib_process_create (vlib_main_t *vm, char *name,
                         vlib_node_function_t *f,
                         u32 log2_n_stack_bytes);

    /** @brief Create a vlib process
     *  @param vm &vlib_global_main
     *  @param f the process node function
     *  @param log2_n_stack_bytes size of the process stack, defaults to 16K
     *  @return newly-created node index
     *  @warning call only on the main thread. Barrier sync required.
     */

This function makes it easy to spin up periodic processes when features are enabled for the first time. That coding pattern is highly recommended.

Update the emacs-lisp plugin generator to use vlib_process_create instead of generating static periodic process nodes.

Change-Id: Icda33e93b9034779d3a3e228cd1110af14b058a5
Signed-off-by: Dave Barach <dave@barachs.net>
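A minimal sketch of the recommended pattern: a periodic process created on first feature enable. The process function, name, and period are hypothetical:

    static uword
    my_periodic_process (vlib_main_t * vm, vlib_node_runtime_t * rt,
                         vlib_frame_t * f)
    {
      while (1)
        {
          /* Wake up once per second (hypothetical period) */
          vlib_process_wait_for_event_or_clock (vm, 1.0);
          vlib_process_get_events (vm, 0 /* event data not used */);
          /* ... periodic work ... */
        }
      return 0;
    }

    /* On first feature enable, from the main thread: */
    u32 process_node_index = vlib_process_create
      (vm, "my-periodic-process", my_periodic_process,
       14 /* log2_n_stack_bytes: 16K stack */);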
2019-03-26  node: vector must be checked with vec_header  (Kingwel Xie; 1 file changed, +1/-1)

See register_node(): the node name might be a vector.

Change-Id: I883ec51c1fa9aa4da4ba6cba415a39bb6a4331e1
Signed-off-by: Kingwel Xie <kingwel.xie@ericsson.com>
2019-03-10  Perf tune get_frame_size_info  (Dave Barach; 1 file changed, +3/-0)

It turns out that for scalar sizes 0..24, frames are always the same size. That range includes all current use-cases - and then some - so get rid of the hash table. Old code preserved under #ifdef VLIB_SUPPORTS_ARBITRARY_SCALAR_SIZES.

Change-Id: Ic005c7143c9639f77d1a0fadd2fc0e90dccb68c1
Signed-off-by: Dave Barach <dbarach@cisco.com>
2018-11-30  Metadata / opaque formatting belongs in vpp  (Dave Barach; 1 file changed, +1/-0)

VPP graph dispatch trace record description:

     0                   1                   2                   3
     0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
    | Major Version | Minor Version |    NStrings   |   ProtoHint   |
    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
    |                   Buffer index (big endian)                   |
    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
    + VPP graph node name ...              ...      |  NULL octet   |
    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
    | Buffer Metadata ...                  ...      |  NULL octet   |
    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
    | Buffer Opaque ...                    ...      |  NULL octet   |
    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
    | Buffer Opaque 2 ...                  ...      |  NULL octet   |
    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
    | VPP ASCII packet trace (if NStrings > 4)      |  NULL octet   |
    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
    | Packet data (up to 16K)                                       |
    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

Graph dispatch records comprise a version stamp, an indication of how many NULL-terminated strings will follow the record header, and a protocol hint. The buffer index allows downstream consumers of these data to easily filter/track single packets as they traverse the forwarding graph. FWIW, the 32-bit buffer index is stored in big-endian format.

As of this writing, major version = 1, minor version = 0. NStrings will be either 4 or 5. Here is the current set of protocol hints:

    typedef enum
    {
      VLIB_NODE_PROTO_HINT_NONE = 0,
      VLIB_NODE_PROTO_HINT_ETHERNET,
      VLIB_NODE_PROTO_HINT_IP4,
      VLIB_NODE_PROTO_HINT_IP6,
      VLIB_NODE_PROTO_HINT_TCP,
      VLIB_NODE_PROTO_HINT_UDP,
      VLIB_NODE_N_PROTO_HINTS,
    } vlib_node_proto_hint_t;

Example: VLIB_NODE_PROTO_HINT_IP6 means that the first octet of packet data SHOULD be 0x60, and should begin an ipv6 packet header.

Change-Id: Idf310bad80cc0e4207394c80f18db5f77c378741
Signed-off-by: Dave Barach <dave@barachs.net>
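A minimal reader sketch for the fixed part of the record, per the layout above; the struct name is hypothetical, and "record" is assumed to point at a complete record:

    /* Hypothetical decoder for the fixed 8-octet record header */
    typedef struct
    {
      u8 major_version;         /* = 1 as of this writing */
      u8 minor_version;         /* = 0 as of this writing */
      u8 nstrings;              /* number of NULL-terminated strings: 4 or 5 */
      u8 protohint;             /* vlib_node_proto_hint_t */
      u32 buffer_index;         /* big endian on the wire */
    } dispatch_trace_hdr_t;

    dispatch_trace_hdr_t *h = (dispatch_trace_hdr_t *) record;
    u32 buffer_index = clib_net_to_host_u32 (h->buffer_index);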
2018-11-13  vlib: rename vlib_frame_args(...) to vlib_frame_scalar_args(...)  (Damjan Marion; 1 file changed, +1/-1)

Typically we have scalar_size == 0, so it doesn't matter, but vlib_frame_args was providing a pointer to scalar frame data, not vector data. To avoid future confusion, the function is renamed to vlib_frame_scalar_args(...).

Change-Id: I48b75523b46d487feea24f3f3cb10c528dde516f
Signed-off-by: Damjan Marion <damarion@cisco.com>
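The distinction, in a short sketch (my_scalar_t is a hypothetical per-frame scalar type):

    /* Vector data: the buffer indices handed to the node */
    u32 *from = vlib_frame_vector_args (frame);

    /* Scalar data: the fixed per-frame argument block (when scalar_size != 0).
       This is what the old, misleadingly named vlib_frame_args() returned. */
    my_scalar_t *args = vlib_frame_scalar_args (frame);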
2018-10-23  c11 safe string handling support  (Dave Barach; 1 file changed, +3/-3)

Change-Id: Ied34720ca5a6e6e717eea4e86003e854031b6eab
Signed-off-by: Dave Barach <dave@barachs.net>
2018-08-28  vlib: add 'show node' and 'set node function' CLI  (Damjan Marion; 1 file changed, +1/-0)

Change-Id: I084d7c9e34329f10b5fe45e0b157c4defe0f2811
Signed-off-by: Damjan Marion <damarion@cisco.com>
2018-06-13  Stat segment / client: "show run" works now  (Dave Barach; 1 file changed, +19/-11)

Seems to have minimal-to-zero performance consequences. Data appears accurate: results match the debug CLI output. Checked at low rates, and at 27 MPPS sprayed across two worker threads.

Change-Id: I09ede5150b88a91547feeee448a2854997613004
Signed-off-by: Dave Barach <dave@barachs.net>
2018-05-29  Add VLIB_NODE_FN() macro to simplify multiversioning of node functions  (Damjan Marion; 1 file changed, +20/-0)

Change-Id: Ibab5e27277f618ceb2d543b9d6a1a5f191e7d1db
Signed-off-by: Damjan Marion <damarion@cisco.com>
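Usage pattern, as a minimal sketch (node name hypothetical; a matching VLIB_REGISTER_NODE (my_node) registration is assumed to exist). The macro lets one node function body be compiled per CPU microarchitecture variant, with the best one selected at runtime:

    VLIB_NODE_FN (my_node) (vlib_main_t * vm, vlib_node_runtime_t * node,
                            vlib_frame_t * frame)
    {
      /* dispatch body; compiled once per enabled march variant */
      return frame->n_vectors;
    }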
2018-01-09  api: refactor vlibmemory  (Florin Coras; 1 file changed, +54/-0)

- separate client/server code for both memory and socket apis
- separate memory api code from generic vlib api code
- move unix_shared_memory_fifo to svm and rename to svm_fifo_t
- overall declutter

Change-Id: I90cdd98ff74d0787d58825b914b0f1eafcfa4dc2
Signed-off-by: Florin Coras <fcoras@cisco.com>
2017-07-13  VPP-895 multi-thread: fix vpp crash on show runtime  (Igor Mikhailov (imichail); 1 file changed, +29/-0)

In the multi-threaded model (e.g. 1 main and 1 worker thread), after an ethernet interface is deleted (e.g. a vhost-user interface), the 'show runtime' command produces garbled output and sometimes leads to a vpp crash. The reason is that vlib_node_rename() frees and reallocates the node's 'n->name' vector, but the change is not propagated into the copies of the node on worker threads.

Change-Id: Ibf22422913b7f2df22f70f3b2fe8dafd34c1dd06
Signed-off-by: Igor Mikhailov (imichail) <imichail@cisco.com>
(cherry picked from commit 02989064e4c26a4940a5292ba6c47023e6dd3131)
2017-06-30  VPP debug image with worker threads hit assert on adding IP route with traffic (VPP-892)  (Neale Ranns; 1 file changed, +20/-0)

When stacking DPOs, the VLIB graph is also updated to add the edge between the nodes, if this edge does not yet exist. This addition should be done with the workers stopped.

Change-Id: I327e4d7d26f0b23eb280f17e4619ff2093ff7940
Signed-off-by: Neale Ranns <nranns@cisco.com>
(cherry picked from commit c02bd03ddf5eec9e9c79811360685f13e4ba8ee1)
2017-06-09  Implement sack based tcp loss recovery (RFC 6675)  (Florin Coras; 1 file changed, +1/-0)

- refactor existing congestion control code (RFC 6582/5681). Handling of ack feedback now consists of: ack parsing, cc event detection, event handling, congestion control update
- extend sack scoreboard to support sack based retransmissions
- basic implementation of Eifel detection algorithm (RFC 3522) for detecting spurious retransmissions
- actually initialize the per-thread frame freelist hash tables
- increase worker stack size to 2mb
- fix session queue node out-of-buffer handling
- ensure that the local buffer cache vec_len matches reality
- avoid 2x spurious event requeues when short of buffers
- count out-of-buffer events
- make the builtin server thread-safe
- fix bihash template threading issue: need to paint -1 across uninitialized working_copy_length vector elements (via rebase from master)

Change-Id: I646cb9f1add9a67d08f4a87badbcb117980ebfc4
Signed-off-by: Florin Coras <fcoras@cisco.com>
Signed-off-by: Dave Barach <dbarach@cisco.com>
2017-04-06  Use thread local storage for thread index  (Damjan Marion; 1 file changed, +1/-1)

This patch deprecates stack-based thread identification and also removes the requirement that thread stacks are adjacent. Finally, possibly annoying for some folks, it renames all occurrences of cpu_index and cpu_number to thread index. Using the word "cpu" is misleading here, as a thread can be migrated to a different CPU; it is also not related to the Linux cpu index.

Change-Id: I68cdaf661e701d2336fc953dcb9978d10a70f7c1
Signed-off-by: Damjan Marion <damarion@cisco.com>
2017-03-16  vlib: make runtime_data thread-local  (Damjan Marion; 1 file changed, +1/-3)

Change-Id: I4aa3e7e42fb81211de1aed07dc7befee87a1e18b
Signed-off-by: Damjan Marion <damarion@cisco.com>
2016-12-28  Reorganize source tree to use single autotools instance  (Damjan Marion; 1 file changed, +631/-0)

Change-Id: I7b51f88292e057c6443b12224486f2d0c9f8ae23
Signed-off-by: Damjan Marion <damarion@cisco.com>
/*
 *------------------------------------------------------------------
 * Copyright (c) 2018 Cisco and/or its affiliates.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *------------------------------------------------------------------
 */
#include <signal.h>

#include <vlib/vlib.h>
#include <vlibapi/api.h>
#include <vlibmemory/api.h>
#include <vlibmemory/memory_api.h>

#include <vlibmemory/vl_memory_msg_enum.h>	/* enumerate all vlib messages */

#define vl_typedefs		/* define message structures */
#include <vlibmemory/vl_memory_api_h.h>
#undef vl_typedefs

/* instantiate all the print functions we know about */
#define vl_print(handle, ...) vlib_cli_output (handle, __VA_ARGS__)
#define vl_printfun
#include <vlibmemory/vl_memory_api_h.h>
#undef vl_printfun

/* instantiate all the endian swap functions we know about */
#define vl_endianfun
#include <vlibmemory/vl_memory_api_h.h>
#undef vl_endianfun

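/*
 * Cached pointers to the "cursize" field of each known API input queue:
 * element 0 covers the primary shared-memory region, followed by one
 * element per pairwise / private region. Rebuilt lazily by
 * memclnt_queue_callback () when the set of regions changes.
 */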
volatile int **vl_api_queue_cursizes;

static void
memclnt_queue_callback (vlib_main_t * vm)
{
  int i;
  api_main_t *am = vlibapi_get_main ();
  int have_pending_rpcs;

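  /* (Re)build the cursize pointer cache if the set of regions has changed */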
  if (PREDICT_FALSE (vec_len (vl_api_queue_cursizes) !=
		     1 + vec_len (am->vlib_private_rps)))
    {
      vl_shmem_hdr_t *shmem_hdr = am->shmem_hdr;
      svm_queue_t *q;

      if (shmem_hdr == 0)
	return;

      q = shmem_hdr->vl_input_queue;
      if (q == 0)
	return;

      vec_add1 (vl_api_queue_cursizes, &q->cursize);

      for (i = 0; i < vec_len (am->vlib_private_rps); i++)
	{
	  svm_region_t *vlib_rp = am->vlib_private_rps[i];

	  shmem_hdr = (void *) vlib_rp->user_ctx;
	  q = shmem_hdr->vl_input_queue;
	  vec_add1 (vl_api_queue_cursizes, &q->cursize);
	}
    }

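  /* If any queue shows a non-zero cursize, wake the API process node */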
  for (i = 0; i < vec_len (vl_api_queue_cursizes); i++)
    {
      if (*vl_api_queue_cursizes[i])
	{
	  vm->queue_signal_pending = 1;
	  vm->api_queue_nonempty = 1;
	  vlib_process_signal_event (vm, vl_api_clnt_node.index,
				     /* event_type */ QUEUE_SIGNAL_EVENT,
				     /* event_data */ 0);
	  break;
	}
    }

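  /* Pending RPC requests also require the API process node to run */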
  clib_spinlock_lock_if_init (&vm->pending_rpc_lock);
  have_pending_rpcs = vec_len (vm->pending_rpc_requests) > 0;
  clib_spinlock_unlock_if_init (&vm->pending_rpc_lock);

  if (have_pending_rpcs)
    {
      vm->queue_signal_pending = 1;
      vm->api_queue_nonempty = 1;
      vlib_process_signal_event (vm, vl_api_clnt_node.index,
				 /* event_type */ QUEUE_SIGNAL_EVENT,
				 /* event_data */ 0);
    }
}

/*
 * vl_api_memclnt_create_internal
 */
u32
vl_api_memclnt_create_internal (char *name, svm_queue_t * q)
{
  vl_api_registration_t **regpp;
  vl_api_registration_t *regp;
  void *oldheap;
  api_main_t *am = vlibapi_get_main ();

  ASSERT (vlib_get_thread_index () == 0);
  pool_get (am->vl_clients, regpp);


  oldheap = vl_msg_push_heap ();
  *regpp = clib_mem_alloc (sizeof (vl_api_registration_t));

  regp = *regpp;
  clib_memset (regp, 0, sizeof (*regp));
  regp->registration_type = REGISTRATION_TYPE_SHMEM;
  regp->vl_api_registration_pool_index = regpp - am->vl_clients;
  regp->vlib_rp = am->vlib_rp;
  regp->shmem_hdr = am->shmem_hdr;

  regp->vl_input_queue = q;
  regp->name = format (0, "%s%c", name, 0);

  vl_msg_pop_heap (oldheap);
  return vl_msg_api_handle_from_index_and_epoch
    (regp->vl_api_registration_pool_index,
     am->shmem_hdr->application_restarts);
}
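
/*
 * Usage sketch (hypothetical): given an input queue "q" living on the
 * API heap, an in-process client can register itself and obtain an API
 * handle encoding its registration pool index and the current
 * application-restart epoch:
 *
 *    u32 api_handle = vl_api_memclnt_create_internal ("my-client", q);
 */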

/*
 * vl_api_memclnt_create_t_handler
 */
void
vl_api_memclnt_create_t_handler (vl_api_memclnt_create_t * mp)
{
  vl_api_registration_t **regpp;
  vl_api_registration_t *regp;
  vl_api_memclnt_create_reply_t *rp;
  svm_queue_t *q;
  int rv = 0;
  void *oldheap;
  api_main_t *am = vlibapi_get_main ();
  u8 *msg_table;

  /*
   * This is tortured. Maintain a vlib-address-space private
   * pool of client registrations. We use the shared-memory virtual
   * address of the client structure as a handle, to allow direct
   * manipulation of context quota variables from the client library.
   *
   * This scheme causes trouble w/ API message trace replay, since
   * some random VA from clib_mem_alloc() certainly won't
   * occur in the Linux sim. The (very) few places
   * that care need to use the pool index.
   *
   * Putting the registration object(s) into a pool in shared memory and
   * using the pool index as a handle seems like a great idea.
   * Unfortunately, each and every reference to that pool would need
   * to be protected by a mutex:
   *
   *     Client                      VLIB
   *     ------                      ----
   *     convert pool index to
   *     pointer.
   *     <deschedule>
   *                                 expand pool
   *                                 <deschedule>
   *     kaboom!
   */

  pool_get (am->vl_clients, regpp);

  oldheap = vl_msg_push_heap ();
  *regpp = clib_mem_alloc (sizeof (vl_api_registration_t));

  regp = *regpp;
  clib_memset (regp, 0, sizeof (*regp));
  regp->registration_type = REGISTRATION_TYPE_SHMEM;
  regp->vl_api_registration_pool_index = regpp - am->vl_clients;
  regp->vlib_rp = am->vlib_rp;
  regp->shmem_hdr = am->shmem_hdr;
  regp->clib_file_index = am->shmem_hdr->clib_file_index;

  q = regp->vl_input_queue = (svm_queue_t *) (uword) mp->input_queue;
  VL_MSG_API_SVM_QUEUE_UNPOISON (q);

  regp->name = format (0, "%s", mp->name);
  vec_add1 (regp->name, 0);
  regp->keepalive = true;

  if (am->serialized_message_table_in_shmem == 0)
    am->serialized_message_table_in_shmem =
      vl_api_serialize_message_table (am, 0);

  if (am->vlib_rp != am->vlib_primary_rp)
    msg_table = vl_api_serialize_message_table (am, 0);
  else
    msg_table = am->serialized_message_table_in_shmem;

  vl_msg_pop_heap (oldheap);

  rp = vl_msg_api_alloc (sizeof (*rp));
  rp->_vl_msg_id = ntohs (VL_API_MEMCLNT_CREATE_REPLY);
  rp->handle = (uword) regp;
  rp->index = vl_msg_api_handle_from_index_and_epoch
    (regp->vl_api_registration_pool_index,
     am->shmem_hdr->application_restarts);
  rp->context = mp->context;
  rp->response = ntohl (rv);
  rp->message_table = pointer_to_uword (msg_table);

  vl_msg_api_send_shmem (q, (u8 *) & rp);
}

void
vl_api_memclnt_create_v2_t_handler (vl_api_memclnt_create_v2_t *mp)
{
  vl_api_registration_t **regpp;
  vl_api_registration_t *regp;
  vl_api_memclnt_create_v2_reply_t *rp;
  svm_queue_t *q;
  int rv = 0;
  void *oldheap;
  api_main_t *am = vlibapi_get_main ();
  u8 *msg_table;

  /*
   * This is tortured. Maintain a vlib-address-space private
   * pool of client registrations. We use the shared-memory virtual
   * address of the client structure as a handle, to allow direct
   * manipulation of context quota variables from the client library.
   *
   * This scheme causes trouble w/ API message trace replay, since
   * some random VA from clib_mem_alloc() certainly won't
   * occur in the Linux sim. The (very) few places
   * that care need to use the pool index.
   *
   * Putting the registration object(s) into a pool in shared memory and
   * using the pool index as a handle seems like a great idea.
   * Unfortunately, each and every reference to that pool would need
   * to be protected by a mutex:
   *
   *     Client                      VLIB
   *     ------                      ----
   *     convert pool index to
   *     pointer.
   *     <deschedule>
   *                                 expand pool
   *                                 <deschedule>
   *     kaboom!
   */

  pool_get (am->vl_clients, regpp);

  oldheap = vl_msg_push_heap ();
  *regpp = clib_mem_alloc (sizeof (vl_api_registration_t));

  regp = *regpp;
  clib_memset (regp, 0, sizeof (*regp));
  regp->registration_type = REGISTRATION_TYPE_SHMEM;
  regp->vl_api_registration_pool_index = regpp - am->vl_clients;
  regp->vlib_rp = am->vlib_rp;
  regp->shmem_hdr = am->shmem_hdr;
  regp->clib_file_index = am->shmem_hdr->clib_file_index;

  q = regp->vl_input_queue = (svm_queue_t *) (uword) mp->input_queue;
  VL_MSG_API_SVM_QUEUE_UNPOISON (q);

  regp->name = format (0, "%s", mp->name);
  vec_add1 (regp->name, 0);
  regp->keepalive = mp->keepalive;

  if (am->serialized_message_table_in_shmem == 0)
    am->serialized_message_table_in_shmem =
      vl_api_serialize_message_table (am, 0);

  if (am->vlib_rp != am->vlib_primary_rp)
    msg_table = vl_api_serialize_message_table (am, 0);
  else
    msg_table = am->serialized_message_table_in_shmem;

  vl_msg_pop_heap (oldheap);

  rp = vl_msg_api_alloc (sizeof (*rp));
  rp->_vl_msg_id = ntohs (VL_API_MEMCLNT_CREATE_V2_REPLY);
  rp->handle = (uword) regp;
  rp->index = vl_msg_api_handle_from_index_and_epoch (
    regp->vl_api_registration_pool_index, am->shmem_hdr->application_restarts);
  rp->context = mp->context;
  rp->response = ntohl (rv);
  rp->message_table = pointer_to_uword (msg_table);

  vl_msg_api_send_shmem (q, (u8 *) &rp);
}

void
vl_api_call_reaper_functions (u32 client_index)
{
  clib_error_t *error = 0;
  _vl_msg_api_function_list_elt_t *i;

  i = vlibapi_get_main ()->reaper_function_registrations;
  while (i)
    {
      error = i->f (client_index);
      if (error)
	clib_error_report (error);
      i = i->next_init_function;
    }
}

/*
 * vl_api_memclnt_delete_t_handler
 */
void
vl_api_memclnt_delete_t_handler (vl_api_memclnt_delete_t * mp)
{
  vl_api_registration_t **regpp;
  vl_api_registration_t *regp;
  vl_api_memclnt_delete_reply_t *rp;
  void *oldheap;
  api_main_t *am = vlibapi_get_main ();
  u32 handle, client_index, epoch;

  handle = mp->index;

  vl_api_call_reaper_functions (handle);

  epoch = vl_msg_api_handle_get_epoch (handle);
  client_index = vl_msg_api_handle_get_index (handle);

  if (epoch != (am->shmem_hdr->application_restarts & VL_API_EPOCH_MASK))
    {
      clib_warning
	("Stale clnt delete index %d old epoch %d cur epoch %d",
	 client_index, epoch,
	 (am->shmem_hdr->application_restarts & VL_API_EPOCH_MASK));
      return;
    }

  regpp = pool_elt_at_index (am->vl_clients, client_index);

  if (!pool_is_free (am->vl_clients, regpp))
    {
      int i;
      regp = *regpp;
      int private_registration = 0;

      /* Send reply unless client asked us to do the cleanup */
      if (!mp->do_cleanup)
	{
	  /*
	   * Note: the API message handling path will set am->vlib_rp
	   * as appropriate for pairwise / private memory segments
	   */
	  rp = vl_msg_api_alloc (sizeof (*rp));
	  rp->_vl_msg_id = ntohs (VL_API_MEMCLNT_DELETE_REPLY);
	  rp->handle = mp->handle;
	  rp->response = 1;

	  vl_msg_api_send_shmem (regp->vl_input_queue, (u8 *) & rp);
	  if (client_index != regp->vl_api_registration_pool_index)
	    {
	      clib_warning ("mismatch client_index %d pool_index %d",
			    client_index,
			    regp->vl_api_registration_pool_index);
	      vl_msg_api_free (rp);
	      return;
	    }
	}

      /* No dangling references, please */
      *regpp = 0;

      /* For horizontal scaling, add a hash table... */
      for (i = 0; i < vec_len (am->vlib_private_rps); i++)
	{
	  /* Is this a pairwise / private API segment? */
	  if (am->vlib_private_rps[i] == am->vlib_rp)
	    {
	      /* Note: account for the memfd header page */
	      uword virtual_base = am->vlib_rp->virtual_base - MMAP_PAGESIZE;
	      uword virtual_size = am->vlib_rp->virtual_size + MMAP_PAGESIZE;

	      /*
	       * Kill the registration pool element before we make
	       * the index vanish forever
	       */
	      pool_put_index (am->vl_clients,
			      regp->vl_api_registration_pool_index);

	      vec_delete (am->vlib_private_rps, 1, i);
	      /* Kill it, accounting for the memfd header page */
	      if (munmap ((void *) virtual_base, virtual_size) < 0)
		clib_unix_warning ("munmap");
	      /* Reset the queue-length-address cache */
	      vec_reset_length (vl_api_queue_cursizes);
	      private_registration = 1;
	      break;
	    }
	}

      if (private_registration == 0)
	{
	  pool_put_index (am->vl_clients,
			  regp->vl_api_registration_pool_index);
	  oldheap = vl_msg_push_heap ();
	  if (mp->do_cleanup)
	    svm_queue_free (regp->vl_input_queue);
	  vec_free (regp->name);
	  /* Poison the old registration */
	  clib_memset (regp, 0xF1, sizeof (*regp));
	  clib_mem_free (regp);
	  vl_msg_pop_heap (oldheap);
	  /*
	   * These messages must be freed manually, since they're set up
	   * as "bounce" messages. In the private_registration == 1 case,
	   * we kill the shared-memory segment which contains the message
	   * with munmap.
	   */
	  vl_msg_api_free (mp);
	}
    }
  else
    {
      clib_warning ("unknown client ID %d", mp->index);
    }
}

/**
 * client answered a ping, stave off the grim reaper...
 */
void
  vl_api_memclnt_keepalive_reply_t_handler
  (vl_api_memclnt_keepalive_reply_t * mp)
{
  vl_api_registration_t *regp;
  vlib_main_t *vm = vlib_get_main ();

  regp = vl_api_client_index_to_registration (mp->context);
  if (regp)
    {
      regp->last_heard = vlib_time_now (vm);
      regp->unanswered_pings = 0;
    }
  else
    clib_warning ("BUG: anonymous memclnt_keepalive_reply");
}

/**
 * We can send ourselves these messages if someone uses the
 * builtin binary api test tool...
 */
static void
vl_api_memclnt_keepalive_t_handler (vl_api_memclnt_keepalive_t * mp)
{
  vl_api_memclnt_keepalive_reply_t *rmp;
  api_main_t *am;
  vl_shmem_hdr_t *shmem_hdr;

  am = vlibapi_get_main ();
  shmem_hdr = am->shmem_hdr;

  rmp = vl_msg_api_alloc_as_if_client (sizeof (*rmp));
  clib_memset (rmp, 0, sizeof (*rmp));
  rmp->_vl_msg_id = ntohs (VL_API_MEMCLNT_KEEPALIVE_REPLY);
  rmp->context = mp->context;
  vl_msg_api_send_shmem (shmem_hdr->vl_input_queue, (u8 *) & rmp);
}

/*
 * To avoid filling the API trace buffer with boring messages,
 * don't trace memclnt_keepalive[_reply] msgs
 */

#define foreach_vlib_api_msg                                                  \
  _ (MEMCLNT_CREATE, memclnt_create, 0)                                       \
  _ (MEMCLNT_CREATE_V2, memclnt_create_v2, 0)                                 \
  _ (MEMCLNT_DELETE, memclnt_delete, 0)                                       \
  _ (MEMCLNT_KEEPALIVE, memclnt_keepalive, 0)                                 \
  _ (MEMCLNT_KEEPALIVE_REPLY, memclnt_keepalive_reply, 0)

/*
 * memory_api_init
 */
int
vl_mem_api_init (const char *region_name)
{
  int rv;
  api_main_t *am = vlibapi_get_main ();
  vl_msg_api_msg_config_t cfg;
  vl_msg_api_msg_config_t *c = &cfg;
  vl_shmem_hdr_t *shm;
  vlib_main_t *vm = vlib_get_main ();

  clib_memset (c, 0, sizeof (*c));

  if ((rv = vl_map_shmem (region_name, 1 /* is_vlib */ )) < 0)
    return rv;

#define _(N,n,t) do {                                            \
    c->id = VL_API_##N;                                         \
    c->name = #n;                                               \
    c->handler = vl_api_##n##_t_handler;                        \
    c->cleanup = vl_noop_handler;                               \
    c->endian = vl_api_##n##_t_endian;                          \
    c->print = vl_api_##n##_t_print;                            \
    c->size = sizeof(vl_api_##n##_t);                           \
    c->traced = t; /* trace, so these msgs print */             \
    c->replay = 0; /* don't replay client create/delete msgs */ \
    c->message_bounce = 0; /* don't bounce this message */	\
    vl_msg_api_config(c);} while (0);

  foreach_vlib_api_msg;
#undef _

#define vl_msg_name_crc_list
#include <vlibmemory/memclnt.api.h>
#undef vl_msg_name_crc_list

#define _(id, n, crc) vl_msg_api_add_msg_name_crc (am, #n "_" #crc, id);
  foreach_vl_msg_name_crc_memclnt;
#undef _

  /*
   * special-case freeing of memclnt_delete messages, so we can
   * simply munmap pairwise / private API segments...
   */
  am->msg_data[VL_API_MEMCLNT_DELETE].bounce = 1;
  vl_api_set_msg_thread_safe (am, VL_API_MEMCLNT_KEEPALIVE_REPLY, 1);
  vl_api_set_msg_thread_safe (am, VL_API_MEMCLNT_KEEPALIVE, 1);

  vlib_set_queue_signal_callback (vm, memclnt_queue_callback);

  shm = am->shmem_hdr;
  ASSERT (shm && shm->vl_input_queue);

  /* Make a note so we can always find the primary region easily */
  am->vlib_primary_rp = am->vlib_rp;

  return 0;
}

clib_error_t *
map_api_segment_init (vlib_main_t * vm)
{
  api_main_t *am = vlibapi_get_main ();
  int rv;

  if ((rv = vl_mem_api_init (am->region_name)) < 0)
    {
      return clib_error_return (0, "vl_mem_api_init (%s) failed",
				am->region_name);
    }
  return 0;
}

static void
send_memclnt_keepalive (vl_api_registration_t * regp, f64 now)
{
  vl_api_memclnt_keepalive_t *mp;
  svm_queue_t *q;
  api_main_t *am = vlibapi_get_main ();

  q = regp->vl_input_queue;

  /*
   * If the queue head is moving, assume that the client is processing
   * messages and skip the ping. This heuristic may fail if the queue
   * is in the same position as last time, net of wrapping, in which
   * case the client will receive a keepalive.
   */
  if (regp->last_queue_head != q->head)
    {
      regp->last_heard = now;
      regp->unanswered_pings = 0;
      regp->last_queue_head = q->head;
      return;
    }

  /*
   * push/pop shared memory segment, so this routine
   * will work with "normal" as well as "private segment"
   * memory clients...
   */

  mp = vl_mem_api_alloc_as_if_client_w_reg (regp, sizeof (*mp));
  clib_memset (mp, 0, sizeof (*mp));
  mp->_vl_msg_id = clib_host_to_net_u16 (VL_API_MEMCLNT_KEEPALIVE);
  mp->context = mp->client_index =
    vl_msg_api_handle_from_index_and_epoch
    (regp->vl_api_registration_pool_index,
     am->shmem_hdr->application_restarts);

  regp->unanswered_pings++;

  /* Failure-to-send due to a stuffed queue is absolutely expected */
  if (svm_queue_add (q, (u8 *) & mp, 1 /* nowait */ ))
    vl_msg_api_free_w_region (regp->vlib_rp, mp);
}

static void
vl_mem_send_client_keepalive_w_reg (api_main_t * am, f64 now,
				    vl_api_registration_t ** regpp,
				    u32 ** dead_indices,
				    u32 ** confused_indices)
{
  vl_api_registration_t *regp = *regpp;
  if (regp)
    {
      /* If we haven't heard from this client recently... */
      if (regp->last_heard < (now - 10.0))
	{
	  if (regp->unanswered_pings == 2)
	    {
	      svm_queue_t *q;
	      q = regp->vl_input_queue;
	      if (kill (q->consumer_pid, 0) >= 0)
		{
		  clib_warning ("REAPER: lazy binary API client '%s'",
				regp->name);
		  regp->unanswered_pings = 0;
		  regp->last_heard = now;
		}
	      else
		{
		  clib_warning ("REAPER: binary API client '%s' died",
				regp->name);
		  vec_add1 (*dead_indices, regpp - am->vl_clients);
		}
	    }
	  else
	    send_memclnt_keepalive (regp, now);
	}
      else
	regp->unanswered_pings = 0;
    }
  else
    {
      clib_warning ("NULL client registration index %d",
		    regpp - am->vl_clients);
      vec_add1 (*confused_indices, regpp - am->vl_clients);
    }
}

void
vl_mem_api_dead_client_scan (api_main_t * am, vl_shmem_hdr_t * shm, f64 now)
{
  vl_api_registration_t **regpp;
  static u32 *dead_indices;
  static u32 *confused_indices;

  vec_reset_length (dead_indices);
  vec_reset_length (confused_indices);

  /* *INDENT-OFF* */
  pool_foreach (regpp, am->vl_clients)  {
      if (!(*regpp)->keepalive)
	continue;
      vl_mem_send_client_keepalive_w_reg (am, now, regpp, &dead_indices,
					  &confused_indices);
  }
  /* *INDENT-ON* */

  /* This should "never happen," but if it does, fix it... */
  if (PREDICT_FALSE (vec_len (confused_indices) > 0))
    {
      int i;
      for (i = 0; i < vec_len (confused_indices); i++)
	{
	  pool_put_index (am->vl_clients, confused_indices[i]);
	}
    }

  if (PREDICT_FALSE (vec_len (dead_indices) > 0))
    {
      int i;
      void *oldheap;

      /* Allow the application to clean up its registrations */
      for (i = 0; i < vec_len (dead_indices); i++)
	{
	  regpp = pool_elt_at_index (am->vl_clients, dead_indices[i]);
	  if (regpp)
	    {
	      u32 handle;

	      handle = vl_msg_api_handle_from_index_and_epoch
		(dead_indices[i], shm->application_restarts);
	      vl_api_call_reaper_functions (handle);
	    }
	}

      oldheap = vl_msg_push_heap ();

      for (i = 0; i < vec_len (dead_indices); i++)
	{
	  regpp = pool_elt_at_index (am->vl_clients, dead_indices[i]);
	  if (regpp)
	    {
	      /* Is this a pairwise SVM segment? */
	      if ((*regpp)->vlib_rp != am->vlib_rp)
		{
		  int i;
		  svm_region_t *dead_rp = (*regpp)->vlib_rp;
		  /* Note: account for the memfd header page */
		  uword virtual_base = dead_rp->virtual_base - MMAP_PAGESIZE;
		  uword virtual_size = dead_rp->virtual_size + MMAP_PAGESIZE;

		  /* For horizontal scaling, add a hash table... */
		  for (i = 0; i < vec_len (am->vlib_private_rps); i++)
		    if (am->vlib_private_rps[i] == dead_rp)
		      {
			vec_delete (am->vlib_private_rps, 1, i);
			goto found;
		      }
		  svm_pop_heap (oldheap);
		  clib_warning ("private rp %llx AWOL", dead_rp);
		  oldheap = svm_push_data_heap (am->vlib_rp);

		found:
		  /* Kill it, accounting for the memfd header page */
		  svm_pop_heap (oldheap);
		  if (munmap ((void *) virtual_base, virtual_size) < 0)
		    clib_unix_warning ("munmap");
		  /* Reset the queue-length-address cache */
		  vec_reset_length (vl_api_queue_cursizes);
		  oldheap = svm_push_data_heap (am->vlib_rp);
		}
	      else
		{
		  /* Poison the old registration */
		  clib_memset (*regpp, 0xF3, sizeof (**regpp));
		  clib_mem_free (*regpp);
		}
	      /* no dangling references, please */
	      *regpp = 0;
	    }
	  else
	    {
	      svm_pop_heap (oldheap);
	      clib_warning ("Duplicate free, client index %d",
			    regpp - am->vl_clients);
	      oldheap = svm_push_data_heap (am->vlib_rp);
	    }
	}

      svm_client_scan_this_region_nolock (am->vlib_rp);

      vl_msg_pop_heap (oldheap);
      for (i = 0; i < vec_len (dead_indices); i++)
	pool_put_index (am->vl_clients, dead_indices[i]);
    }
}

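/* Optional fuzzing hook: when set, called with (msg id, msg) just before
   each message handler runs; see vl_mem_api_handler_with_vm_node () */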
void (*vl_mem_api_fuzz_hook) (u16, void *);

/* This is only to be called from a vlib/vnet app */
static void
vl_mem_api_handler_with_vm_node (api_main_t *am, svm_region_t *vlib_rp,
				 void *the_msg, vlib_main_t *vm,
				 vlib_node_runtime_t *node, u8 is_private)
{
  u16 id = clib_net_to_host_u16 (*((u16 *) the_msg));
  vl_api_msg_data_t *m = vl_api_get_msg_data (am, id);
  u8 *(*handler) (void *, void *, void *);
  u8 *(*print_fp) (void *, void *);
  svm_region_t *old_vlib_rp;
  void *save_shmem_hdr;
  int is_mp_safe = 1;

  if (PREDICT_FALSE (am->elog_trace_api_messages))
    {
      ELOG_TYPE_DECLARE (e) = {
	.format = "api-msg: %s",
	.format_args = "T4",
      };
      struct
      {
	u32 c;
      } * ed;
      ed = ELOG_DATA (am->elog_main, e);
      if (m && m->name)
	ed->c = elog_string (am->elog_main, (char *) m->name);
      else
	ed->c = elog_string (am->elog_main, "BOGUS");
    }

  if (m && m->handler)
    {
      handler = (void *) m->handler;

      if (PREDICT_FALSE (am->rx_trace && am->rx_trace->enabled))
	vl_msg_api_trace (am, am->rx_trace, the_msg);

      if (PREDICT_FALSE (am->msg_print_flag))
	{
	  fformat (stdout, "[%d]: %s\n", id, m->name);
	  print_fp = (void *) am->msg_data[id].print_handler;
	  if (print_fp == 0)
	    {
	      fformat (stdout, "  [no registered print fn for msg %d]\n", id);
	    }
	  else
	    {
	      (*print_fp) (the_msg, vm);
	    }
	}
      is_mp_safe = am->msg_data[id].is_mp_safe;

      if (!is_mp_safe)
	{
	  vl_msg_api_barrier_trace_context (am->msg_data[id].name);
	  vl_msg_api_barrier_sync ();
	}
      if (is_private)
	{
	  old_vlib_rp = am->vlib_rp;
	  save_shmem_hdr = am->shmem_hdr;
	  am->vlib_rp = vlib_rp;
	  am->shmem_hdr = (void *) vlib_rp->user_ctx;
	}

      if (PREDICT_FALSE (vl_mem_api_fuzz_hook != 0))
	(*vl_mem_api_fuzz_hook) (id, the_msg);

      if (m->is_autoendian)
	{
	  void (*endian_fp) (void *);
	  endian_fp = am->msg_data[id].endian_handler;
	  (*endian_fp) (the_msg);
	}
      if (PREDICT_FALSE (vec_len (am->perf_counter_cbs) != 0))
	clib_call_callbacks (am->perf_counter_cbs, am, id, 0 /* before */);

      (*handler) (the_msg, vm, node);

      if (PREDICT_FALSE (vec_len (am->perf_counter_cbs) != 0))
	clib_call_callbacks (am->perf_counter_cbs, am, id, 1 /* after */);
      if (is_private)
	{
	  am->vlib_rp = old_vlib_rp;
	  am->shmem_hdr = save_shmem_hdr;
	}
      if (!is_mp_safe)
	vl_msg_api_barrier_release ();
    }
  else
    {
      clib_warning ("no handler for msg id %d", id);
    }

  /*
   * Special-case, so we can e.g. bounce messages off the vnet
   * main thread without copying them...
   */
  if (!m || !m->bounce)
    {
      if (is_private)
	{
	  old_vlib_rp = am->vlib_rp;
	  save_shmem_hdr = am->shmem_hdr;
	  am->vlib_rp = vlib_rp;
	  am->shmem_hdr = (void *) vlib_rp->user_ctx;
	}
      vl_msg_api_free (the_msg);
      if (is_private)
	{
	  am->vlib_rp = old_vlib_rp;
	  am->shmem_hdr = save_shmem_hdr;
	}
    }

  if (PREDICT_FALSE (am->elog_trace_api_messages))
    {
      ELOG_TYPE_DECLARE (e) = { .format = "api-msg-done(%s): %s",
				.format_args = "t4T4",
				.n_enum_strings = 2,
				.enum_strings = {
				  "barrier",
				  "mp-safe",
				} };

      struct
      {
	u32 barrier;
	u32 c;
      } * ed;
      ed = ELOG_DATA (am->elog_main, e);
      if (m && m->name)
	ed->c = elog_string (am->elog_main, (char *) m->name);
      else
	ed->c = elog_string (am->elog_main, "BOGUS");
      ed->barrier = is_mp_safe;
    }
}

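/* Dequeue and dispatch at most one message from the region's input queue.
   Returns 0 if a message was handled, -1 if the queue was empty. */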
static inline int
void_mem_api_handle_msg_i (api_main_t * am, svm_region_t * vlib_rp,
			   vlib_main_t * vm, vlib_node_runtime_t * node,
			   u8 is_private)
{
  svm_queue_t *q;
  uword mp;

  q = ((vl_shmem_hdr_t *) (void *) vlib_rp->user_ctx)->vl_input_queue;

  if (!svm_queue_sub2 (q, (u8 *) & mp))
    {
      VL_MSG_API_UNPOISON ((void *) mp);
      vl_mem_api_handler_with_vm_node (am, vlib_rp, (void *) mp, vm, node,
				       is_private);
      return 0;
    }
  return -1;
}

int
vl_mem_api_handle_msg_main (vlib_main_t * vm, vlib_node_runtime_t * node)
{
  api_main_t *am = vlibapi_get_main ();
  return void_mem_api_handle_msg_i (am, am->vlib_rp, vm, node,
				    0 /* is_private */ );
}

int
vl_mem_api_handle_rpc (vlib_main_t * vm, vlib_node_runtime_t * node)
{
  api_main_t *am = vlibapi_get_main ();
  int i;
  uword *tmp, mp;

  /*
   * Swap pending and processing vectors, then process the RPCs
   * Avoid deadlock conditions by construction.
   */
  clib_spinlock_lock_if_init (&vm->pending_rpc_lock);
  tmp = vm->processing_rpc_requests;
  vec_reset_length (tmp);
  vm->processing_rpc_requests = vm->pending_rpc_requests;
  vm->pending_rpc_requests = tmp;
  clib_spinlock_unlock_if_init (&vm->pending_rpc_lock);

  /*
   * RPCs are used to reflect function calls to thread 0
   * when the underlying code is not thread-safe.
   *
   * Grabbing the thread barrier across a set of RPCs
   * greatly increases efficiency, and avoids
   * running afoul of the barrier sync holddown timer.
   * The barrier sync code supports recursive locking.
   *
   * We really need to rewrite RPC-based code...
   */
  if (PREDICT_TRUE (vec_len (vm->processing_rpc_requests)))
    {
      vl_msg_api_barrier_sync ();
      for (i = 0; i < vec_len (vm->processing_rpc_requests); i++)
	{
	  mp = vm->processing_rpc_requests[i];
	  vl_mem_api_handler_with_vm_node (am, am->vlib_rp, (void *) mp, vm,
					   node, 0 /* is_private */);
	}
      vl_msg_api_barrier_release ();
    }

  return 0;
}

int
vl_mem_api_handle_msg_private (vlib_main_t * vm, vlib_node_runtime_t * node,
			       u32 reg_index)
{
  api_main_t *am = vlibapi_get_main ();
  return void_mem_api_handle_msg_i (am, am->vlib_private_rps[reg_index], vm,
				    node, 1 /* is_private */ );
}

vl_api_registration_t *
vl_mem_api_client_index_to_registration (u32 handle)
{
  vl_api_registration_t **regpp;
  vl_api_registration_t *regp;
  api_main_t *am = vlibapi_get_main ();
  vl_shmem_hdr_t *shmem_hdr;
  u32 index;

  index = vl_msg_api_handle_get_index (handle);
  regpp = am->vl_clients + index;

  if (pool_is_free (am->vl_clients, regpp))
    {
      vl_msg_api_increment_missing_client_counter ();
      return 0;
    }
  regp = *regpp;

  shmem_hdr = (vl_shmem_hdr_t *) regp->shmem_hdr;
  if (!vl_msg_api_handle_is_valid (handle, shmem_hdr->application_restarts))
    {
      vl_msg_api_increment_missing_client_counter ();
      return 0;
    }

  return (regp);
}

svm_queue_t *
vl_api_client_index_to_input_queue (u32 index)
{
  vl_api_registration_t *regp;
  api_main_t *am = vlibapi_get_main ();

  /* Special case: vlib trying to send itself a message */
  if (index == (u32) ~ 0)
    return (am->shmem_hdr->vl_input_queue);

  regp = vl_mem_api_client_index_to_registration (index);
  if (!regp)
    return 0;
  return (regp->vl_input_queue);
}

static clib_error_t *
setup_memclnt_exit (vlib_main_t * vm)
{
  atexit (vl_unmap_shmem);
  return 0;
}

VLIB_INIT_FUNCTION (setup_memclnt_exit);

u8 *
format_api_message_rings (u8 * s, va_list * args)
{
  api_main_t *am = va_arg (*args, api_main_t *);
  vl_shmem_hdr_t *shmem_hdr = va_arg (*args, vl_shmem_hdr_t *);
  int main_segment = va_arg (*args, int);
  ring_alloc_t *ap;
  int i;

  if (shmem_hdr == 0)
    return format (s, "%8s %8s %8s %8s %8s\n",
		   "Owner", "Size", "Nitems", "Hits", "Misses");

  ap = shmem_hdr->vl_rings;

  for (i = 0; i < vec_len (shmem_hdr->vl_rings); i++)
    {
      s = format (s, "%8s %8d %8d %8d %8d\n",
		  "vlib", ap->size, ap->nitems, ap->hits, ap->misses);
      ap++;
    }

  ap = shmem_hdr->client_rings;

  for (i = 0; i < vec_len (shmem_hdr->client_rings); i++)
    {
      s = format (s, "%8s %8d %8d %8d %8d\n",
		  "clnt", ap->size, ap->nitems, ap->hits, ap->misses);
      ap++;
    }

  if (main_segment)
    {
      s = format (s, "%d ring miss fallback allocations\n", am->ring_misses);
      s = format
	(s,
	 "%d application restarts, %d reclaimed msgs, %d garbage collects\n",
	 shmem_hdr->application_restarts, shmem_hdr->restart_reclaims,
	 shmem_hdr->garbage_collects);
    }
  return s;
}

static clib_error_t *
vl_api_ring_command (vlib_main_t * vm,
		     unformat_input_t * input, vlib_cli_command_t * cli_cmd)
{
  int i;
  vl_shmem_hdr_t *shmem_hdr;
  api_main_t *am = vlibapi_get_main ();

  /* First, dump the primary region rings.. */

  if (am->vlib_primary_rp == 0 || am->vlib_primary_rp->user_ctx == 0)
    {
      vlib_cli_output (vm, "Shared memory segment not initialized...\n");
      return 0;
    }

  shmem_hdr = (void *) am->vlib_primary_rp->user_ctx;

  vlib_cli_output (vm, "Main API segment rings:");

  vlib_cli_output (vm, "%U", format_api_message_rings, am,
		   0 /* print header */ , 0 /* notused */ );

  vlib_cli_output (vm, "%U", format_api_message_rings, am,
		   shmem_hdr, 1 /* main segment */ );

  for (i = 0; i < vec_len (am->vlib_private_rps); i++)
    {
      svm_region_t *vlib_rp = am->vlib_private_rps[i];
      shmem_hdr = (void *) vlib_rp->user_ctx;
      vl_api_registration_t **regpp;
      vl_api_registration_t *regp = 0;

      /* For horizontal scaling, add a hash table... */
      /* *INDENT-OFF* */
      pool_foreach (regpp, am->vl_clients)
       {
        regp = *regpp;
        if (regp && regp->vlib_rp == vlib_rp)
          {
            vlib_cli_output (vm, "%s segment rings:", regp->name);
            goto found;
          }
      }
      vlib_cli_output (vm, "regp %llx not found?", regp);
      continue;
      /* *INDENT-ON* */
    found:
      vlib_cli_output (vm, "%U", format_api_message_rings, am,
		       0 /* print header */ , 0 /* notused */ );
      vlib_cli_output (vm, "%U", format_api_message_rings, am,
		       shmem_hdr, 0 /* main segment */ );
    }

  return 0;
}

/*?
 * Display binary api message allocation ring statistics
?*/
/* *INDENT-OFF* */
VLIB_CLI_COMMAND (cli_show_api_ring_command, static) =
{
  .path = "show api ring-stats",
  .short_help = "Message ring statistics",
  .function = vl_api_ring_command,
};
/* *INDENT-ON* */

clib_error_t *
vlibmemory_init (vlib_main_t * vm)
{
  api_main_t *am = vlibapi_get_main ();
  svm_map_region_args_t _a, *a = &_a;
  u8 *remove_path1, *remove_path2;
  void vlibsocket_reference (void);

  vlibsocket_reference ();

  /*
   * By popular request / to avoid support fires, remove any old api segment
   * files Right Here.
   */
  if (am->root_path == 0)
    {
      remove_path1 = format (0, "/dev/shm/global_vm%c", 0);
      remove_path2 = format (0, "/dev/shm/vpe-api%c", 0);
    }
  else
    {
      remove_path1 = format (0, "/dev/shm/%s-global_vm%c", am->root_path, 0);
      remove_path2 = format (0, "/dev/shm/%s-vpe-api%c", am->root_path, 0);
    }

  (void) unlink ((char *) remove_path1);
  (void) unlink ((char *) remove_path2);

  vec_free (remove_path1);
  vec_free (remove_path2);

  clib_memset (a, 0, sizeof (*a));
  a->root_path = am->root_path;
  a->name = SVM_GLOBAL_REGION_NAME;
  a->baseva = (am->global_baseva != 0) ?
    am->global_baseva : svm_get_global_region_base_va ();
  a->size = (am->global_size != 0) ? am->global_size : SVM_GLOBAL_REGION_SIZE;
  a->flags = SVM_FLAGS_NODATA;
  a->uid = am->api_uid;
  a->gid = am->api_gid;
  a->pvt_heap_size =
    (am->global_pvt_heap_size !=
     0) ? am->global_pvt_heap_size : SVM_PVT_MHEAP_SIZE;

  svm_region_init_args (a);

  return 0;
}

void
vl_set_memory_region_name (const char *name)
{
  api_main_t *am = vlibapi_get_main ();
  am->region_name = name;
}

/*
 * fd.io coding-style-patch-verification: ON
 *
 * Local Variables:
 * eval: (c-set-style "gnu")
 * End:
 */