aboutsummaryrefslogtreecommitdiffstats
path: root/src/vnet/ip
diff options
context:
space:
mode:
authorKlement Sekera <ksekera@cisco.com>2022-01-25 17:32:38 +0000
committerOle Trøan <otroan@employees.org>2022-05-10 16:01:25 +0000
commitbb912f2e25b5205f0705c4b8a5bd325aed078754 (patch)
tree7f42883d94cd15173c004958bb620d04086ce755 /src/vnet/ip
parente63a2d44d16774a88763c5f6368a3f7210c64ddc (diff)
ip: reassembly: add documentation
Type: docs Signed-off-by: Klement Sekera <ksekera@cisco.com> Change-Id: I23008cde47d8b7a531346eab02902e2ced18742a
Diffstat (limited to 'src/vnet/ip')
-rw-r--r--src/vnet/ip/reass/reassembly.rst221
1 files changed, 221 insertions, 0 deletions
diff --git a/src/vnet/ip/reass/reassembly.rst b/src/vnet/ip/reass/reassembly.rst
new file mode 100644
index 00000000000..d6861ed8a05
--- /dev/null
+++ b/src/vnet/ip/reass/reassembly.rst
@@ -0,0 +1,221 @@
+.. _reassembly:
+
+IP Reassembly
+=============
+
+Some VPP functions need access to whole packet and/or stream
+classification based on L4 headers. Reassembly functionality allows
+both former and latter.
+
+Full reassembly vs shallow (virtual) reassembly
+-----------------------------------------------
+
+There are two kinds of reassembly available in VPP:
+
+1. Full reassembly changes a stream of packet fragments into one
+packet containing all data reassembled with fragment bits cleared
+and fragment header stripped (in case of ip6). Note that resulting
+packet may come out of reassembly as a buffer chain. Because it's
+impractical to parse headers which are split over multiple vnet
+buffers, vnet_buffer_chain_linearize() is called after reassembly so
+that L2/L3/L4 headers can be found in first buffer. Full reassembly
+is costly and shouldn't be used unless necessary. Full reassembly is by
+default enabled for both ipv4 and ipv6 traffic for "forus" traffic
+- that is packets aimed at VPP addresses. This can be disabled via API
+if desired, in which case "forus" fragments are dropped.
+
+2. Shallow (virtual) reassembly allows various classifying and/or
+translating features to work with fragments without having to
+understand fragmentation. It works by extracting L4 data and adding
+them to vnet_buffer for each packet/fragment passing through SVR
+nodes. This operation is performed for both fragments and regular
+packets, allowing consuming code to treat all packets in the same way. SVR
+caches incoming packet fragments (buffers) until first fragment is
+seen. Then it extracts L4 data from that first fragment, fills it for
+any cached fragments and transmits them in the same order as they were
+received. From that point on, any other passing fragments get L4 data
+populated in vnet_buffer based on reassembly context.
+
+Multi-worker behaviour
+^^^^^^^^^^^^^^^^^^^^^^
+
+Both reassembly types deal with fragments arriving on different workers
+via handoff mechanism. All reassembly contexts are stored in pools.
+Bihash mapping 5-tuple key to a value containing pool index and thread
+index is used for lookups. When a lookup finds an existing reassembly on
+a different thread, it hands off the fragment to that thread. If lookup
+fails, a new reassembly context is created and current worker becomes
+owner of that context. Further fragments received on other worker
+threads are then handed off to the owner worker thread.
+
+Full reassembly also remembers thread index where first fragment (as in
+fragment with fragment offset 0) was seen and uses handoff mechanism to
+send the reassembled packet out on that thread even if pool owner is
+a different thread. This then requires an additional handoff to free
+reassembly context as only pool owner can do that in a thread-safe way.
+
+Limits
+^^^^^^
+
+Because reassembly could be an attack vector, there is a configurable
+limit on the number of concurrent reassemblies and also maximum
+fragments per packet.
+
+Custom applications
+^^^^^^^^^^^^^^^^^^^
+
+Both reassembly features can be used by custom applications which
+are not part of VPP source tree. Be it patches or 3rd party plugins,
+they can build their own graph paths by using "-custom*" versions of
+nodes. Reassembly then reads next_index and error_next_index for each
+buffer from vnet_buffer, allowing custom application to steer
+both reassembled packets and any packets which are considered an error
+in a way the custom application requires.
+
+Full reassembly
+---------------
+
+Configuration
+^^^^^^^^^^^^^
+
+Configuration is via API (``ip_reassembly_enable_disable``) or CLI:
+
+``set interface reassembly <interface-name> [on|off|ip4|ip6]``
+
+here ``on`` means both ip4 and ip6.
+
+A show command is provided to see reassembly contexts:
+
+For ip4:
+
+``show ip4-full-reassembly [details]``
+
+For ip6:
+
+``show ip6-full-reassembly [details]``
+
+Global full reassembly parameters can be modified using API
+``ip_reassembly_set`` and retrieved using ``ip_reassembly_get``.
+
+Defaults
+""""""""
+
+For default values, see #defines in
+
+`ip4_full_reass.c <__REPOSITORY_URL__/src/vnet/ip/reass/ip4_full_reass.c>`_
+
+========================================= ==========================================
+#define description
+----------------------------------------- ------------------------------------------
+IP4_REASS_TIMEOUT_DEFAULT_MS timeout in milliseconds
+IP4_REASS_EXPIRE_WALK_INTERVAL_DEFAULT_MS interval between reaping expired sessions
+IP4_REASS_MAX_REASSEMBLIES_DEFAULT maximum number of concurrent reassemblies
+IP4_REASS_MAX_REASSEMBLY_LENGTH_DEFAULT maximum number of fragments per reassembly
+========================================= ==========================================
+
+and
+
+`ip6_full_reass.c <__REPOSITORY_URL__/src/vnet/ip/reass/ip6_full_reass.c>`_
+
+========================================= ==========================================
+#define description
+----------------------------------------- ------------------------------------------
+IP6_REASS_TIMEOUT_DEFAULT_MS timeout in milliseconds
+IP6_REASS_EXPIRE_WALK_INTERVAL_DEFAULT_MS interval between reaping expired sessions
+IP6_REASS_MAX_REASSEMBLIES_DEFAULT maximum number of concurrent reassemblies
+IP6_REASS_MAX_REASSEMBLY_LENGTH_DEFAULT maximum number of fragments per reassembly
+========================================= ==========================================
+
+Finished/expired contexts
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Reassembly contexts are freed either when reassembly is finished - when
+all data has been received or in case of timeout. There is a process
+walking all reassemblies, freeing any expired ones.
+
+Shallow (virtual) reassembly
+----------------------------
+
+Configuration
+^^^^^^^^^^^^^
+
+Configuration is via API (``ip_reassembly_enable_disable``) only as
+there is no value in turning SVR on by hand without a feature consuming
+buffer metadata. SVR is designed to be turned on by a feature requiring
+it in a programmatic way.
+
+A show command is provided to see reassembly contexts:
+
+For ip4:
+
+``show ip4-sv-reassembly [details]``
+
+For ip6:
+
+``show ip6-sv-reassembly [details]``
+
+Global shallow reassembly parameters can be modified using API
+``ip_reassembly_set`` and retrieved using ``ip_reassembly_get``.
+
+Defaults
+""""""""
+
+For default values, see #defines in
+
+`ip4_sv_reass.c <__REPOSITORY_URL__/src/vnet/ip/reass/ip4_sv_reass.c>`_
+
+============================================ ==========================================
+#define description
+-------------------------------------------- ------------------------------------------
+IP4_SV_REASS_TIMEOUT_DEFAULT_MS timeout in milliseconds
+IP4_SV_REASS_EXPIRE_WALK_INTERVAL_DEFAULT_MS interval between reaping expired sessions
+IP4_SV_REASS_MAX_REASSEMBLIES_DEFAULT maximum number of concurrent reassemblies
+IP4_SV_REASS_MAX_REASSEMBLY_LENGTH_DEFAULT maximum number of fragments per reassembly
+============================================ ==========================================
+
+and
+
+`ip6_sv_reass.c <__REPOSITORY_URL__/src/vnet/ip/reass/ip6_sv_reass.c>`_
+
+============================================ ==========================================
+#define description
+-------------------------------------------- ------------------------------------------
+IP6_SV_REASS_TIMEOUT_DEFAULT_MS timeout in milliseconds
+IP6_SV_REASS_EXPIRE_WALK_INTERVAL_DEFAULT_MS interval between reaping expired sessions
+IP6_SV_REASS_MAX_REASSEMBLIES_DEFAULT maximum number of concurrent reassemblies
+IP6_SV_REASS_MAX_REASSEMBLY_LENGTH_DEFAULT maximum number of fragments per reassembly
+============================================ ==========================================
+
+Expiring contexts
+^^^^^^^^^^^^^^^^^
+
+There is no way of knowing when a reassembly is finished without
+performing (an almost) full reassembly, so contexts in SVR cannot be
+freed in the same way as in full reassembly. Instead a different
+approach is taken. Least recently used (LRU) list is maintained where
+reassembly contexts are ordered based on last update. The oldest
+context is then freed whenever SVR hits limit on number of concurrent
+reassembly contexts. There is also a process reaping expired sessions
+similar to the one in full reassembly.
+
+Truncated packets
+^^^^^^^^^^^^^^^^^
+
+When SVR detects that a packet has been truncated in a way where L4
+headers are not available, it will mark it as such in vnet_buffer,
+allowing downstream features to handle such packets as they deem fit.
+
+Fast path/slow path
+^^^^^^^^^^^^^^^^^^^
+
+SVR is implemented in a fast path/slow path manner. By default, it assumes
+that any passing traffic doesn't contain fragments, processing buffers
+in a dual-loop. If it sees a fragment, it then jumps to single-loop
+processing.
+
+Feature enabled by other features/reference counting
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+SVR feature is enabled by some other features, like NAT, when those
+features are enabled. For this to work, it implements a reference
+counted API for enabling/disabling SVR.
='n572' href='#n572'>572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967
/*
* Copyright (c) 2017-2019 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <vnet/vnet.h>
#include <vlibmemory/api.h>
#include <vnet/session/application.h>
#include <vnet/session/application_interface.h>
#include <hs_apps/proxy.h>
#include <vnet/tcp/tcp.h>

proxy_main_t proxy_main;

#define TCP_MSS 1460

/* Arguments marshalled over an RPC to the main thread for an outbound
 * connect (see proxy_call_main_thread / proxy_cb_fn). */
typedef struct
{
  session_endpoint_cfg_t sep;	/* endpoint to connect to */
  u32 app_index;		/* active-open application index */
  u32 api_context;		/* proxy session pool index */
} proxy_connect_args_t;

/* RPC handler executed on the main thread: unpack the marshalled
 * connect arguments and issue the actual connect. */
static void
proxy_cb_fn (void *data, u32 data_len)
{
  proxy_connect_args_t *args = (proxy_connect_args_t *) data;
  vnet_connect_args_t ca;

  clib_memset (&ca, 0, sizeof (ca));
  ca.api_context = args->api_context;
  ca.app_index = args->app_index;
  clib_memcpy (&ca.sep_ext, &args->sep, sizeof (args->sep));
  vnet_connect (&ca);
  /* the connect may have carried an extended config; release it */
  if (ca.sep_ext.ext_cfg)
    clib_mem_free (ca.sep_ext.ext_cfg);
}

/* Connects must be issued from the main thread: do so directly when
 * already there, otherwise marshal the arguments into an RPC. */
static void
proxy_call_main_thread (vnet_connect_args_t * a)
{
  if (vlib_get_thread_index () != 0)
    {
      proxy_connect_args_t pa;

      /* flatten the endpoint + context for transport to main */
      pa.api_context = a->api_context;
      pa.app_index = a->app_index;
      clib_memcpy (&pa.sep, &a->sep_ext, sizeof (a->sep_ext));
      vl_api_rpc_call_main_thread (proxy_cb_fn, (u8 *) & pa, sizeof (pa));
      return;
    }

  vnet_connect (a);
  if (a->sep_ext.ext_cfg)
    clib_mem_free (a->sep_ext.ext_cfg);
}

/* Allocate a zeroed proxy session from the global pool, recording its
 * own pool index inside the element. */
static proxy_session_t *
proxy_session_alloc (void)
{
  proxy_main_t *pm = &proxy_main;
  proxy_session_t *new_ps;

  pool_get_zero (pm->sessions, new_ps);
  new_ps->ps_index = new_ps - pm->sessions;
  return new_ps;
}

/* Look up a proxy session by pool index; the index must be valid. */
static inline proxy_session_t *
proxy_session_get (u32 ps_index)
{
  return pool_elt_at_index (proxy_main.sessions, ps_index);
}

/* Like proxy_session_get but returns 0 for a freed/stale index. */
static inline proxy_session_t *
proxy_session_get_if_valid (u32 ps_index)
{
  proxy_main_t *pm = &proxy_main;

  if (!pool_is_free_index (pm->sessions, ps_index))
    return pool_elt_at_index (pm->sessions, ps_index);
  return 0;
}

/* Return a proxy session to the pool. In debug images, poison the
 * memory first so use-after-free is easier to spot. */
static void
proxy_session_free (proxy_session_t *ps)
{
  proxy_main_t *pm = &proxy_main;

  if (CLIB_DEBUG > 0)
    clib_memset (ps, 0xFE, sizeof (*ps));
  pool_put (pm->sessions, ps);
}

/* RPC target: free a proxy session and its fifos on the passive-open
 * owner thread, under the sessions lock. */
static int
proxy_session_postponed_free_rpc (void *arg)
{
  proxy_main_t *pm = &proxy_main;
  uword session_index = pointer_to_uword (arg);
  proxy_session_t *ps;

  clib_spinlock_lock_if_init (&pm->sessions_lock);

  ps = proxy_session_get (session_index);
  segment_manager_dealloc_fifos (ps->server_rx_fifo, ps->server_tx_fifo);
  proxy_session_free (ps);

  clib_spinlock_unlock_if_init (&pm->sessions_lock);

  return 0;
}

/* Schedule proxy session cleanup on the passive-open thread, which is
 * the only one allowed to free the shared fifos/state safely. */
static void
proxy_session_postponed_free (proxy_session_t *ps)
{
  session_send_rpc_evt_to_thread (ps->po_thread_index,
				  proxy_session_postponed_free_rpc,
				  uword_to_pointer (ps->ps_index, void *));
}

/*
 * Initiate disconnect of both sides of a proxied connection, starting
 * from whichever side (passive or active open) notified us. The
 * ao_disconnected/po_disconnected flags record which sides have been
 * disconnected so the other side's callbacks don't disconnect twice.
 */
static void
proxy_try_close_session (session_t * s, int is_active_open)
{
  proxy_main_t *pm = &proxy_main;
  proxy_session_t *ps = 0;
  vnet_disconnect_args_t _a, *a = &_a;

  clib_spinlock_lock_if_init (&pm->sessions_lock);

  ps = proxy_session_get (s->opaque);

  if (is_active_open)
    {
      a->handle = ps->vpp_active_open_handle;
      a->app_index = pm->active_open_app_index;
      vnet_disconnect_session (a);
      ps->ao_disconnected = 1;

      /* Also take down the passive-open side if it is still up */
      if (!ps->po_disconnected)
	{
	  ASSERT (ps->vpp_server_handle != SESSION_INVALID_HANDLE);
	  a->handle = ps->vpp_server_handle;
	  a->app_index = pm->server_app_index;
	  vnet_disconnect_session (a);
	  ps->po_disconnected = 1;
	}
    }
  else
    {
      a->handle = ps->vpp_server_handle;
      a->app_index = pm->server_app_index;
      vnet_disconnect_session (a);
      ps->po_disconnected = 1;

      if (!ps->ao_disconnected && !ps->active_open_establishing)
	{
	  /* Proxy session closed before active open */
	  if (ps->vpp_active_open_handle != SESSION_INVALID_HANDLE)
	    {
	      a->handle = ps->vpp_active_open_handle;
	      a->app_index = pm->active_open_app_index;
	      vnet_disconnect_session (a);
	    }
	  ps->ao_disconnected = 1;
	}
    }
  clib_spinlock_unlock_if_init (&pm->sessions_lock);
}

/*
 * Called on session cleanup for either side: clear the corresponding
 * handle and free the proxy session once the other side is gone too.
 * The two sides share fifos, so the active-open cleanup may need to
 * postpone freeing them to the passive-open owner thread.
 */
static void
proxy_try_delete_session (session_t * s, u8 is_active_open)
{
  proxy_main_t *pm = &proxy_main;
  proxy_session_t *ps = 0;

  clib_spinlock_lock_if_init (&pm->sessions_lock);

  ps = proxy_session_get (s->opaque);

  if (is_active_open)
    {
      ps->vpp_active_open_handle = SESSION_INVALID_HANDLE;

      /* Revert master thread index change on connect notification */
      ps->server_rx_fifo->master_thread_index = ps->po_thread_index;

      /* Passive open already cleaned up */
      if (ps->vpp_server_handle == SESSION_INVALID_HANDLE)
	{
	  ASSERT (s->rx_fifo->refcnt == 1);

	  /* The two sides of the proxy on different threads */
	  if (ps->po_thread_index != s->thread_index)
	    {
	      /* This is not the right thread to delete the fifos */
	      s->rx_fifo = 0;
	      s->tx_fifo = 0;
	      proxy_session_postponed_free (ps);
	    }
	  else
	    proxy_session_free (ps);
	}
    }
  else
    {
      ps->vpp_server_handle = SESSION_INVALID_HANDLE;

      /* If a connect is still in flight, its callbacks finish cleanup */
      if (ps->vpp_active_open_handle == SESSION_INVALID_HANDLE)
	{
	  if (!ps->active_open_establishing)
	    proxy_session_free (ps);
	}
    }
  clib_spinlock_unlock_if_init (&pm->sessions_lock);
}

/*
 * Grow or shrink a session fifo based on the segment's overall memory
 * pressure and how full the fifo itself is; registered as the
 * fifo_tuning_callback for both proxy apps.
 *
 * On enqueue: with segment headroom and a filling fifo, grow by the
 * in-use amount, capped at the segment manager's max fifo size.
 * On dequeue: under segment pressure or with a mostly idle fifo,
 * shrink by (part of) the dequeued byte count, never below 4kB.
 */
static int
common_fifo_tuning_callback (session_t * s, svm_fifo_t * f,
			     session_ft_action_t act, u32 bytes)
{
  proxy_main_t *pm = &proxy_main;

  segment_manager_t *sm = segment_manager_get (f->segment_manager);
  fifo_segment_t *fs = segment_manager_get_segment (sm, f->segment_index);

  u8 seg_usage = fifo_segment_get_mem_usage (fs);
  u32 fifo_in_use = svm_fifo_max_dequeue_prod (f);
  u32 fifo_size = svm_fifo_size (f);
  /* 64-bit intermediate: fifo_in_use * 100 overflows u32 once fifos
   * grow past ~42MB (max_fifo_size defaults to 128MB in this file) */
  u8 fifo_usage = (u64) fifo_in_use * 100 / fifo_size;
  /* u32, not u8: resize amounts are fifo-sized, and a u8 would
   * silently truncate them modulo 256 */
  u32 update_size = 0;

  ASSERT (act < SESSION_FT_ACTION_N_ACTIONS);

  if (act == SESSION_FT_ACTION_ENQUEUED)
    {
      if (seg_usage < pm->low_watermark && fifo_usage > 50)
	update_size = fifo_in_use;
      else if (seg_usage < pm->high_watermark && fifo_usage > 80)
	update_size = fifo_in_use;

      update_size = clib_min (update_size, sm->max_fifo_size - fifo_size);
      if (update_size)
	svm_fifo_set_size (f, fifo_size + update_size);
    }
  else				/* dequeued */
    {
      if (seg_usage > pm->high_watermark || fifo_usage < 20)
	update_size = bytes;
      else if (seg_usage > pm->low_watermark && fifo_usage < 50)
	update_size = (bytes / 2);

      ASSERT (fifo_size >= 4096);
      update_size = clib_min (update_size, fifo_size - 4096);
      if (update_size)
	svm_fifo_set_size (f, fifo_size - update_size);
    }

  return 0;
}

/* New connection accepted on the proxy listener: allocate proxy state,
 * link it to the session via s->opaque and mark the session ready. */
static int
proxy_accept_callback (session_t * s)
{
  proxy_main_t *pm = &proxy_main;
  proxy_session_t *new_ps;

  clib_spinlock_lock_if_init (&pm->sessions_lock);

  new_ps = proxy_session_alloc ();
  new_ps->vpp_server_handle = session_handle (s);
  new_ps->vpp_active_open_handle = SESSION_INVALID_HANDLE;
  new_ps->po_thread_index = s->thread_index;
  s->opaque = new_ps->ps_index;

  clib_spinlock_unlock_if_init (&pm->sessions_lock);

  s->session_state = SESSION_STATE_READY;

  return 0;
}

/* Session-layer disconnect notification for the passive-open side. */
static void
proxy_disconnect_callback (session_t * s)
{
  proxy_try_close_session (s, 0 /* is_active_open */ );
}

/* Transport reset on the passive-open side; handled like a disconnect. */
static void
proxy_reset_callback (session_t * s)
{
  proxy_try_close_session (s, 0 /* is_active_open */ );
}

/* The server app never initiates connects (the active-open app does),
 * so a connected notification here is unexpected; flag and fail. */
static int
proxy_connected_callback (u32 app_index, u32 api_context,
			  session_t * s, session_error_t err)
{
  clib_warning ("called...");
  return -1;
}

/* New-segment notifications are not acted upon by the proxy. */
static int
proxy_add_segment_callback (u32 client_index, u64 segment_handle)
{
  return 0;
}

/* Only TLS endpoints need a crypto extended config in this app. */
static int
proxy_transport_needs_crypto (transport_proto_t proto)
{
  return proto == TRANSPORT_PROTO_TLS;
}

/*
 * builtin_app_rx_callback for the passive-open (server) side.
 *
 * Fast path: the active-open session already exists and shares this
 * session's rx fifo as its tx fifo, so only a TX event needs to be
 * sent to the active-open thread. Slow path: first data on a fresh
 * session triggers the outbound connect via the main thread.
 */
static int
proxy_rx_callback (session_t * s)
{
  proxy_main_t *pm = &proxy_main;
  u32 thread_index = vlib_get_thread_index ();
  svm_fifo_t *ao_tx_fifo;
  proxy_session_t *ps;

  ASSERT (s->thread_index == thread_index);

  clib_spinlock_lock_if_init (&pm->sessions_lock);

  ps = proxy_session_get (s->opaque);

  if (PREDICT_TRUE (ps->vpp_active_open_handle != SESSION_INVALID_HANDLE))
    {
      clib_spinlock_unlock_if_init (&pm->sessions_lock);

      /* Our rx fifo doubles as the active-open side's tx fifo */
      ao_tx_fifo = s->rx_fifo;

      /*
       * Send event for active open tx fifo
       */
      if (svm_fifo_set_event (ao_tx_fifo))
	{
	  u32 ao_thread_index = ao_tx_fifo->master_thread_index;
	  u32 ao_session_index = ao_tx_fifo->shr->master_session_index;
	  if (session_send_io_evt_to_thread_custom (&ao_session_index,
						    ao_thread_index,
						    SESSION_IO_EVT_TX))
	    clib_warning ("failed to enqueue tx evt");
	}

      /* Fifo nearly full: ask for a dequeue notification to resume */
      if (svm_fifo_max_enqueue (ao_tx_fifo) <= TCP_MSS)
	svm_fifo_add_want_deq_ntf (ao_tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
    }
  else
    {
      vnet_connect_args_t _a, *a = &_a;
      svm_fifo_t *tx_fifo, *rx_fifo;
      u32 max_dequeue, ps_index;
      int actual_transfer __attribute__ ((unused));

      rx_fifo = s->rx_fifo;
      tx_fifo = s->tx_fifo;

      ASSERT (rx_fifo->master_thread_index == thread_index);
      ASSERT (tx_fifo->master_thread_index == thread_index);

      max_dequeue = svm_fifo_max_dequeue_cons (s->rx_fifo);

      /* Nothing actually readable yet; wait for more data */
      if (PREDICT_FALSE (max_dequeue == 0))
	{
	  clib_spinlock_unlock_if_init (&pm->sessions_lock);
	  return 0;
	}

      /* Peek (don't dequeue) the initial data into the staging buffer */
      max_dequeue = clib_min (pm->rcv_buffer_size, max_dequeue);
      actual_transfer = svm_fifo_peek (rx_fifo, 0 /* relative_offset */ ,
				       max_dequeue, pm->rx_buf[thread_index]);

      /* $$$ your message in this space: parse url, etc. */

      clib_memset (a, 0, sizeof (*a));

      /* Stash fifos so the active open can cross-connect to them */
      ps->server_rx_fifo = rx_fifo;
      ps->server_tx_fifo = tx_fifo;
      ps->active_open_establishing = 1;
      ps_index = ps->ps_index;

      clib_spinlock_unlock_if_init (&pm->sessions_lock);

      clib_memcpy (&a->sep_ext, &pm->client_sep, sizeof (pm->client_sep));
      a->api_context = ps_index;
      a->app_index = pm->active_open_app_index;

      if (proxy_transport_needs_crypto (a->sep.transport_proto))
	{
	  session_endpoint_alloc_ext_cfg (&a->sep_ext,
					  TRANSPORT_ENDPT_EXT_CFG_CRYPTO);
	  a->sep_ext.ext_cfg->crypto.ckpair_index = pm->ckpair_index;
	}

      proxy_call_main_thread (a);
    }

  return 0;
}

/* RPC target: send a bare ack on the active-open session so the
 * transport advertises its updated receive window. TCP only. */
static void
proxy_force_ack (void *handlep)
{
  transport_connection_t *tc;
  session_t *ao_s;

  ao_s = session_get_from_handle (pointer_to_uword (handlep));
  if (session_get_transport_proto (ao_s) != TRANSPORT_PROTO_TCP)
    return;
  tc = session_get_transport (ao_s);
  tcp_send_ack ((tcp_connection_t *) tc);
}

/*
 * builtin_app_tx_callback for the passive-open side: space opened up
 * in the proxy tx fifo, so force an ack on the active-open side (on
 * the thread owning its fifo) to reopen the peer's send window.
 */
static int
proxy_tx_callback (session_t * proxy_s)
{
  proxy_main_t *pm = &proxy_main;
  proxy_session_t *ps;
  u32 min_free;

  /* Wait until at least 1/8 of the fifo (capped at 128kB) is free */
  min_free = clib_min (svm_fifo_size (proxy_s->tx_fifo) >> 3, 128 << 10);
  if (svm_fifo_max_enqueue (proxy_s->tx_fifo) < min_free)
    {
      svm_fifo_add_want_deq_ntf (proxy_s->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
      return 0;
    }

  clib_spinlock_lock_if_init (&pm->sessions_lock);

  ps = proxy_session_get (proxy_s->opaque);

  if (ps->vpp_active_open_handle == SESSION_INVALID_HANDLE)
    goto unlock;

  /* Force ack on active open side to update rcv wnd. Make sure it's done on
   * the right thread */
  void *arg = uword_to_pointer (ps->vpp_active_open_handle, void *);
  session_send_rpc_evt_to_thread (ps->server_rx_fifo->master_thread_index,
				  proxy_force_ack, arg);

unlock:
  clib_spinlock_unlock_if_init (&pm->sessions_lock);

  return 0;
}

/* Session cleanup for the passive-open side; only act once the session
 * itself (not just the transport) is reclaimed. */
static void
proxy_cleanup_callback (session_t * s, session_cleanup_ntf_t ntf)
{
  if (ntf == SESSION_CLEANUP_TRANSPORT)
    return;

  proxy_try_delete_session (s, 0 /* is_active_open */ );
}

/* Session-layer callback table for the proxy server (passive-open) app */
static session_cb_vft_t proxy_session_cb_vft = {
  .session_accept_callback = proxy_accept_callback,
  .session_disconnect_callback = proxy_disconnect_callback,
  .session_connected_callback = proxy_connected_callback,
  .add_segment_callback = proxy_add_segment_callback,
  .builtin_app_rx_callback = proxy_rx_callback,
  .builtin_app_tx_callback = proxy_tx_callback,
  .session_reset_callback = proxy_reset_callback,
  .session_cleanup_callback = proxy_cleanup_callback,
  .fifo_tuning_callback = common_fifo_tuning_callback,
};

/*
 * proxy_alloc_session_fifos hook for the active-open side: instead of
 * allocating fresh fifos, cross-connect to the server session's fifos
 * (server rx becomes active-open tx and vice versa) and bump their
 * refcounts so they outlive either single session.
 */
static int
active_open_alloc_session_fifos (session_t *s)
{
  proxy_main_t *pm = &proxy_main;
  svm_fifo_t *rxf, *txf;
  proxy_session_t *ps;

  clib_spinlock_lock_if_init (&pm->sessions_lock);

  ps = proxy_session_get (s->opaque);

  txf = ps->server_rx_fifo;
  rxf = ps->server_tx_fifo;

  /*
   * Reset the active-open tx-fifo master indices so the active-open session
   * will receive data, etc.
   */
  txf->shr->master_session_index = s->session_index;
  txf->master_thread_index = s->thread_index;

  /*
   * Account for the active-open session's use of the fifos
   * so they won't disappear until the last session which uses
   * them disappears
   */
  rxf->refcnt++;
  txf->refcnt++;

  clib_spinlock_unlock_if_init (&pm->sessions_lock);

  s->rx_fifo = rxf;
  s->tx_fifo = txf;

  return 0;
}

/*
 * Outbound connect completed (or failed). Link the active-open session
 * to its proxy state; if the passive-open side closed in the meantime,
 * return -1 so the session layer tears this session down again.
 */
static int
active_open_connected_callback (u32 app_index, u32 opaque,
				session_t * s, session_error_t err)
{
  proxy_main_t *pm = &proxy_main;
  proxy_session_t *ps;
  u8 thread_index = vlib_get_thread_index ();

  /*
   * Setup proxy session handle.
   */
  clib_spinlock_lock_if_init (&pm->sessions_lock);

  /* opaque carries the proxy session pool index set at connect time */
  ps = proxy_session_get (opaque);

  /* Connection failed */
  if (err)
    {
      vnet_disconnect_args_t _a, *a = &_a;

      /* Take down the passive-open side too */
      a->handle = ps->vpp_server_handle;
      a->app_index = pm->server_app_index;
      vnet_disconnect_session (a);
      ps->po_disconnected = 1;
    }
  else
    {
      ps->vpp_active_open_handle = session_handle (s);
      ps->active_open_establishing = 0;
    }

  /* Passive open session was already closed! */
  if (ps->po_disconnected)
    {
      /* Setup everything for the cleanup notification */
      ps->ao_disconnected = 1;
      clib_spinlock_unlock_if_init (&pm->sessions_lock);
      return -1;
    }

  s->opaque = opaque;

  clib_spinlock_unlock_if_init (&pm->sessions_lock);

  /*
   * Send event for active open tx fifo
   */
  ASSERT (s->thread_index == thread_index);
  if (svm_fifo_set_event (s->tx_fifo))
    session_send_io_evt_to_thread (s->tx_fifo, SESSION_IO_EVT_TX);

  return 0;
}

/* Transport reset on the active-open side; handled like a disconnect. */
static void
active_open_reset_callback (session_t * s)
{
  proxy_try_close_session (s, 1 /* is_active_open */ );
}

/* The active-open app never accepts sessions; nothing to do. */
static int
active_open_create_callback (session_t * s)
{
  return 0;
}

/* Session-layer disconnect notification for the active-open side. */
static void
active_open_disconnect_callback (session_t * s)
{
  proxy_try_close_session (s, 1 /* is_active_open */ );
}

/*
 * builtin_app_rx_callback for the active-open side: data from the
 * destination landed in our rx fifo, which is also the proxy session's
 * tx fifo, so just notify the proxy side's owning thread.
 */
static int
active_open_rx_callback (session_t * s)
{
  svm_fifo_t *proxy_tx_fifo;

  proxy_tx_fifo = s->rx_fifo;

  /*
   * Send event for server tx fifo
   */
  if (svm_fifo_set_event (proxy_tx_fifo))
    {
      u8 thread_index = proxy_tx_fifo->master_thread_index;
      u32 session_index = proxy_tx_fifo->shr->master_session_index;
      return session_send_io_evt_to_thread_custom (&session_index,
						   thread_index,
						   SESSION_IO_EVT_TX);
    }

  /* Fifo nearly full: request a dequeue notification to resume rx */
  if (svm_fifo_max_enqueue (proxy_tx_fifo) <= TCP_MSS)
    svm_fifo_add_want_deq_ntf (proxy_tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);

  return 0;
}

/*
 * builtin_app_tx_callback for the active-open side: space freed up in
 * the active-open tx fifo; once enough room is available, force an ack
 * on the passive-open side so its peer's receive window reopens.
 */
static int
active_open_tx_callback (session_t * ao_s)
{
  proxy_main_t *pm = &proxy_main;
  transport_connection_t *tc;
  proxy_session_t *ps;
  session_t *proxy_s;
  u32 min_free;

  /* Wait until at least 1/8 of the fifo (capped at 128kB) is free */
  min_free = clib_min (svm_fifo_size (ao_s->tx_fifo) >> 3, 128 << 10);
  if (svm_fifo_max_enqueue (ao_s->tx_fifo) < min_free)
    {
      svm_fifo_add_want_deq_ntf (ao_s->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
      return 0;
    }

  clib_spinlock_lock_if_init (&pm->sessions_lock);

  ps = proxy_session_get_if_valid (ao_s->opaque);
  if (!ps)
    goto unlock;

  /* use SESSION_INVALID_HANDLE rather than bare ~0, consistent with
   * every other handle check in this file */
  if (ps->vpp_server_handle == SESSION_INVALID_HANDLE)
    goto unlock;

  proxy_s = session_get_from_handle (ps->vpp_server_handle);

  /* Force ack on proxy side to update rcv wnd */
  tc = session_get_transport (proxy_s);
  tcp_send_ack ((tcp_connection_t *) tc);

unlock:
  clib_spinlock_unlock_if_init (&pm->sessions_lock);

  return 0;
}

/* Session cleanup for the active-open side; only act once the session
 * itself (not just the transport) is reclaimed. */
static void
active_open_cleanup_callback (session_t * s, session_cleanup_ntf_t ntf)
{
  if (ntf == SESSION_CLEANUP_TRANSPORT)
    return;

  proxy_try_delete_session (s, 1 /* is_active_open */ );
}

/* Session-layer callback table for the active-open (client-facing) app */
/* *INDENT-OFF* */
static session_cb_vft_t active_open_clients = {
  .session_reset_callback = active_open_reset_callback,
  .session_connected_callback = active_open_connected_callback,
  .session_accept_callback = active_open_create_callback,
  .session_disconnect_callback = active_open_disconnect_callback,
  .session_cleanup_callback = active_open_cleanup_callback,
  .builtin_app_rx_callback = active_open_rx_callback,
  .builtin_app_tx_callback = active_open_tx_callback,
  .fifo_tuning_callback = common_fifo_tuning_callback,
  .proxy_alloc_session_fifos = active_open_alloc_session_fifos,
};
/* *INDENT-ON* */

static int
proxy_server_attach ()
{
  proxy_main_t *pm = &proxy_main;
  u64 options[APP_OPTIONS_N_OPTIONS];
  vnet_app_attach_args_t _a, *a = &_a;

  clib_memset (a, 0, sizeof (*a));
  clib_memset (options, 0, sizeof (options));

  a->name = format (0, "proxy-server");
  a->api_client_index = pm->server_client_index;
  a->session_cb_vft = &proxy_session_cb_vft;
  a->options = options;
  a->options[APP_OPTIONS_SEGMENT_SIZE] = pm->segment_size;
  a->options[APP_OPTIONS_ADD_SEGMENT_SIZE] = pm->segment_size;
  a->options[APP_OPTIONS_RX_FIFO_SIZE] = pm->fifo_size;
  a->options[APP_OPTIONS_TX_FIFO_SIZE] = pm->fifo_size;
  a->options[APP_OPTIONS_MAX_FIFO_SIZE] = pm->max_fifo_size;
  a->options[APP_OPTIONS_HIGH_WATERMARK] = (u64) pm->high_watermark;
  a->options[APP_OPTIONS_LOW_WATERMARK] = (u64) pm->low_watermark;
  a->options[APP_OPTIONS_PRIVATE_SEGMENT_COUNT] = pm->private_segment_count;
  a->options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] =
    pm->prealloc_fifos ? pm->prealloc_fifos : 0;

  a->options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN;

  if (vnet_application_attach (a))
    {
      clib_warning ("failed to attach server");
      return -1;
    }
  pm->server_app_index = a->app_index;

  vec_free (a->name);
  return 0;
}

/*
 * Attach the active-open (client-facing) app to the session layer.
 * The IS_PROXY flag enables fifo cross-connecting with the server app.
 * Returns 0 on success, -1 on attach failure.
 */
static int
active_open_attach (void)
{
  proxy_main_t *pm = &proxy_main;
  vnet_app_attach_args_t _a, *a = &_a;
  u64 options[APP_OPTIONS_N_OPTIONS];

  clib_memset (a, 0, sizeof (*a));
  clib_memset (options, 0, sizeof (options));

  a->api_client_index = pm->active_open_client_index;
  a->session_cb_vft = &active_open_clients;
  a->name = format (0, "proxy-active-open");

  options[APP_OPTIONS_ACCEPT_COOKIE] = 0x12345678;
  options[APP_OPTIONS_SEGMENT_SIZE] = 512 << 20;
  options[APP_OPTIONS_RX_FIFO_SIZE] = pm->fifo_size;
  options[APP_OPTIONS_TX_FIFO_SIZE] = pm->fifo_size;
  options[APP_OPTIONS_MAX_FIFO_SIZE] = pm->max_fifo_size;
  options[APP_OPTIONS_HIGH_WATERMARK] = (u64) pm->high_watermark;
  options[APP_OPTIONS_LOW_WATERMARK] = (u64) pm->low_watermark;
  options[APP_OPTIONS_PRIVATE_SEGMENT_COUNT] = pm->private_segment_count;
  /* 0 when preallocation is disabled */
  options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] = pm->prealloc_fifos;
  options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN
    | APP_OPTIONS_FLAGS_IS_PROXY;

  a->options = options;

  if (vnet_application_attach (a))
    {
      vec_free (a->name);	/* fix: name vector was leaked on failure */
      return -1;
    }

  pm->active_open_app_index = a->app_index;

  vec_free (a->name);

  return 0;
}

/*
 * Start listening on the configured server endpoint. TLS endpoints get
 * a crypto extended config (cert/key pair index) which is freed again
 * once the listen call has consumed it.
 * Returns 0 on success, a session error otherwise.
 */
static int
proxy_server_listen ()
{
  proxy_main_t *pm = &proxy_main;
  vnet_listen_args_t _a, *a = &_a;
  int rv;

  clib_memset (a, 0, sizeof (*a));

  a->app_index = pm->server_app_index;
  clib_memcpy (&a->sep_ext, &pm->server_sep, sizeof (pm->server_sep));
  if (proxy_transport_needs_crypto (a->sep.transport_proto))
    {
      session_endpoint_alloc_ext_cfg (&a->sep_ext,
				      TRANSPORT_ENDPT_EXT_CFG_CRYPTO);
      a->sep_ext.ext_cfg->crypto.ckpair_index = pm->ckpair_index;
    }

  rv = vnet_listen (a);
  if (a->sep_ext.ext_cfg)
    clib_mem_free (a->sep_ext.ext_cfg);

  return rv;
}

/**
 * Register the built-in test RSA certificate/key pair with the app layer
 * and remember its index for later use by proxy_server_listen().
 *
 * Uses the test_srv_crt_rsa / test_srv_key_rsa blobs shipped with the
 * example apps.
 */
static void
proxy_server_add_ckpair (void)
{
  vnet_app_add_cert_key_pair_args_t _ck_pair, *ck_pair = &_ck_pair;
  proxy_main_t *pm = &proxy_main;

  clib_memset (ck_pair, 0, sizeof (*ck_pair));
  ck_pair->cert = (u8 *) test_srv_crt_rsa;
  ck_pair->key = (u8 *) test_srv_key_rsa;
  ck_pair->cert_len = test_srv_crt_rsa_len;
  ck_pair->key_len = test_srv_key_rsa_len;
  /* Fix: the return value was previously ignored, so a failed add would
   * store a meaningless index. Warn and keep the old index instead. */
  if (vnet_app_add_cert_key_pair (ck_pair))
    {
      clib_warning ("failed to add test certificate/key pair");
      return;
    }

  pm->ckpair_index = ck_pair->index;
}

/**
 * Instantiate the proxy: per-thread rx buffers, test cert/key pair, the
 * listening server app and the active-open client app.
 *
 * @param vm  vlib main (unused here, kept for the caller's convention)
 * @return 0 on success, -1 if any attach/listen step fails.
 */
static int
proxy_server_create (vlib_main_t * vm)
{
  vlib_thread_main_t *vtm = vlib_get_thread_main ();
  proxy_main_t *pm = &proxy_main;
  u32 n_threads, thread;

  /* One rx buffer per thread: main thread plus each worker. */
  n_threads = 1 + vtm->n_threads;
  vec_validate (pm->rx_buf, n_threads - 1);
  for (thread = 0; thread < n_threads; thread++)
    vec_validate (pm->rx_buf[thread], pm->rcv_buffer_size);

  proxy_server_add_ckpair ();

  if (proxy_server_attach ())
    {
      clib_warning ("failed to attach server app");
      return -1;
    }

  if (proxy_server_listen ())
    {
      clib_warning ("failed to start listening");
      return -1;
    }

  if (active_open_attach ())
    {
      clib_warning ("failed to attach active open app");
      return -1;
    }

  return 0;
}

/**
 * CLI handler for "test proxy server".
 *
 * Resets proxy_main configuration to defaults, parses any options from the
 * command line, enables the session layer and creates the proxy apps via
 * proxy_server_create().
 *
 * @return 0 on success or a clib error describing the failure.
 */
static clib_error_t *
proxy_server_create_command_fn (vlib_main_t * vm, unformat_input_t * input,
				vlib_cli_command_t * cmd)
{
  unformat_input_t _line_input, *line_input = &_line_input;
  char *default_server_uri = "tcp://0.0.0.0/23";
  char *default_client_uri = "tcp://6.0.2.2/23";
  u8 *server_uri = 0, *client_uri = 0;
  proxy_main_t *pm = &proxy_main;
  clib_error_t *error = 0;
  int rv, tmp32;
  u64 tmp64;

  /* Defaults; each may be overridden by an option parsed below. */
  pm->fifo_size = 64 << 10;
  pm->max_fifo_size = 128 << 20;
  pm->high_watermark = 80;
  pm->low_watermark = 50;
  pm->rcv_buffer_size = 1024;
  pm->prealloc_fifos = 0;
  pm->private_segment_count = 0;
  pm->segment_size = 512 << 20;

  /* NOTE(review): presumably guards session state shared between workers;
   * only initialized when workers exist — confirm against proxy_main_t. */
  if (vlib_num_workers ())
    clib_spinlock_init (&pm->sessions_lock);

  /* No arguments at all: return without creating anything. */
  if (!unformat_user (input, unformat_line_input, line_input))
    return 0;

  while (unformat_check_input (line_input) != UNFORMAT_END_OF_INPUT)
    {
      if (unformat (line_input, "fifo-size %U", unformat_memory_size,
		    &pm->fifo_size))
	;
      else if (unformat (line_input, "max-fifo-size %U", unformat_memory_size,
			 &pm->max_fifo_size))
	;
      /* Watermarks are parsed as int then narrowed to u8 (percentages). */
      else if (unformat (line_input, "high-watermark %d", &tmp32))
	pm->high_watermark = (u8) tmp32;
      else if (unformat (line_input, "low-watermark %d", &tmp32))
	pm->low_watermark = (u8) tmp32;
      else if (unformat (line_input, "rcv-buf-size %d", &pm->rcv_buffer_size))
	;
      else if (unformat (line_input, "prealloc-fifos %d", &pm->prealloc_fifos))
	;
      else if (unformat (line_input, "private-segment-count %d",
			 &pm->private_segment_count))
	;
      else if (unformat (line_input, "private-segment-size %U",
			 unformat_memory_size, &tmp64))
	{
	  pm->segment_size = tmp64;
	}
      /* URIs come back as non-terminated vectors; append a NUL so they can
       * be handed to C-string parsers below. */
      else if (unformat (line_input, "server-uri %s", &server_uri))
	vec_add1 (server_uri, 0);
      else if (unformat (line_input, "client-uri %s", &client_uri))
	vec_add1 (client_uri, 0);
      else
	{
	  error = clib_error_return (0, "unknown input `%U'",
				     format_unformat_error, line_input);
	  goto done;
	}
    }

  /* Fall back to the defaults when either URI was not given. */
  if (!server_uri)
    {
      clib_warning ("No server-uri provided, Using default: %s",
		    default_server_uri);
      server_uri = format (0, "%s%c", default_server_uri, 0);
    }
  if (!client_uri)
    {
      clib_warning ("No client-uri provided, Using default: %s",
		    default_client_uri);
      client_uri = format (0, "%s%c", default_client_uri, 0);
    }

  if (parse_uri ((char *) server_uri, &pm->server_sep))
    {
      error = clib_error_return (0, "Invalid server uri %v", server_uri);
      goto done;
    }
  if (parse_uri ((char *) client_uri, &pm->client_sep))
    {
      error = clib_error_return (0, "Invalid client uri %v", client_uri);
      goto done;
    }

  /* The session layer must be enabled before apps can attach. */
  vnet_session_enable_disable (vm, 1 /* turn on session and transport */ );

  rv = proxy_server_create (vm);
  switch (rv)
    {
    case 0:
      break;
    default:
      error = clib_error_return (0, "server_create returned %d", rv);
    }

done:
  unformat_free (line_input);
  vec_free (client_uri);
  vec_free (server_uri);
  return error;
}

/* CLI registration for "test proxy server". Note: the short_help literals
 * are concatenated without separating spaces, so adjacent "][" pairs render
 * run together in the help output. */
VLIB_CLI_COMMAND (proxy_create_command, static) =
{
  .path = "test proxy server",
  .short_help = "test proxy server [server-uri <tcp://ip/port>]"
      "[client-uri <tcp://ip/port>][fifo-size <nn>[k|m]]"
      "[max-fifo-size <nn>[k|m]][high-watermark <nn>]"
      "[low-watermark <nn>][rcv-buf-size <nn>][prealloc-fifos <nn>]"
      "[private-segment-size <mem>][private-segment-count <nn>]",
  .function = proxy_server_create_command_fn,
};

/**
 * Init-time setup for the proxy example.
 *
 * Marks both app client indices as unset (~0); they are assigned when the
 * respective apps attach.
 *
 * @return always 0 (no error).
 */
clib_error_t *
proxy_main_init (vlib_main_t * vm)
{
  proxy_main.server_client_index = ~0;
  proxy_main.active_open_client_index = ~0;

  return 0;
}

/* Register proxy_main_init to run during VPP initialization. */
VLIB_INIT_FUNCTION (proxy_main_init);

/*
* fd.io coding-style-patch-verification: ON
*
* Local Variables:
* eval: (c-set-style "gnu")
* End:
*/