summaryrefslogtreecommitdiffstats
path: root/src/vnet/session/session_node.c
AgeCommit message (Collapse)AuthorFilesLines
2023-06-26udp: pass cless hdr to transport through bufferFlorin Coras1-4/+2
Type: improvement Signed-off-by: Florin Coras <fcoras@cisco.com> Signed-off-by: Filip Tehlar <ftehlar@cisco.com> Change-Id: I7177ada23e5a69ec8e362ec98b98010c3b44b3d7
2023-03-24session: fix session node switching to interrupt mode failded if no user eventsXiaoming Jiang1-2/+2
wrk->event_elts has 5 elements if no user events Type: fix Signed-off-by: Xiaoming Jiang <jiangxiaoming@outlook.com> Change-Id: Ib38fab422304efc470e20ccb7121442f05bf8bf3
2023-03-22session: add session statsFilip Tehlar1-5/+10
Type: feature Signed-off-by: Filip Tehlar <ftehlar@cisco.com> Change-Id: I02d9bb5292b32ffb1b2f05daccd8a7d5dba05125
2023-03-14session vcl: refactor builtin tx event for main txFlorin Coras1-3/+5
Rename unused SESSION_IO_EVT_BUILTIN_TX to SESSION_IO_EVT_TX_MAIN and leverage it for non-connected udp tx. Non-connected udp sessions are listeners and are therefore allocated on main thread. Consequently, whenever session queue node is not polling main, tx events generated by external applications might be missed or processed with some delay. To solve this, request that apps use SESSION_IO_EVT_TX_MAIN tx events as opposed to SESSION_IO_EVT_TX and send that to first worker as opposed to main. Type: fix Signed-off-by: Florin Coras <fcoras@cisco.com> Change-Id: I5df5ac3dc80c0f192b2eefb1d465e9deefe8786b
2023-03-14session: pre-alloc required dma batchesMarvin Liu1-0/+2
Specify the number of max_batches when applying for dma config. Skip this round when no batch available from vlib_dma_batch_new. Type: improvement Signed-off-by: Marvin Liu <yong.liu@intel.com> Change-Id: Ic6e0acf81ba4fc3ed33aea6ac6990ef841021c59
2023-02-16session: ignore zero length dgramsFlorin Coras1-1/+9
Type: fix Signed-off-by: Florin Coras <fcoras@cisco.com> Change-Id: I70596ffcf90fa4cd57092584cb7a454f44208943
2023-02-08session: accept lcl ip updates on cl sessionsFlorin Coras1-0/+2
Allow apps/vcl to provide updated local ips for dgrams. In particular, allow sessions bound to 0/0 to send data with valid local ips. Type: improvement Signed-off-by: Florin Coras <fcoras@cisco.com> Change-Id: I50a086b1c252731a32a15b6a181ad3dba0c687e0
2023-01-10session: avoid trying to send incomplete dgramFlorin Coras1-2/+3
Type: fix Signed-off-by: Florin Coras <fcoras@cisco.com> Signed-off-by: Steven Luong <sluong@cisco.com> Change-Id: Ibebe9e4ab7331c3ae66c9502e910368acaba51ec
2023-01-06session: avoid dgram dequeues with no transportFlorin Coras1-0/+5
Type: fix Signed-off-by: Florin Coras <fcoras@cisco.com> Change-Id: I1a3393b579caeb5dc25b60bc1b4a71706fb07051
2022-12-14vcl: enable gso for 'sendmsg' in LDP mode.Dou Chao1-0/+5
Some upon apps(e.g. Nginx-quic) package it's several protocol buffers into a struct msg which is a combination of gso_buffer and gso_size. but if HostStack regardless the gso_size to the buffer and split the buffer with default mss, that cause peer client failed on parsing the package. Type: improvement Signed-off-by: Dou Chao <chao.dou@intel.com> Change-Id: I805eb642be826038ba96d1b85dad8ec0c0f6c459 Signed-off-by: Dou Chao <chao.dou@intel.com>
2022-12-02session: move connects to first workerFlorin Coras1-58/+33
Type: improvement Signed-off-by: Florin Coras <fcoras@cisco.com> Change-Id: I035e3fdbb52eca010ad7b2c20ca2930cb1645978
2022-10-26session: add session event log for session stateSteven Luong1-3/+3
To aid sesipon debug, add session event log in SM debug to track the session state. Type: improvement Signed-off-by: Steven Luong <sluong@cisco.com> Change-Id: I6909cf969cd5b6a3ea5a06d08ae32c2f1d48f686
2022-10-20session: add session debug cliSteven Luong1-2/+2
- add session debug cli to enable fine control of which event logs are enable/disable with below syntax session debug {show | group <list> level <n>} list may be entered with a dash, "0-4" or it may be entered with a comma, "0,1,4" - fix compilation errors when SESSION_EVT is enable - change SESSION_EVT_FREE_HANDLER to use DEC_SESSION_ED instead of DEC_SESSION_ETD because the transport may already be free when the handler is called Type: improvement Signed-off-by: Steven Luong <sluong@cisco.com> Change-Id: Iab2989e0a847bb59002ef16494eebcc1d112b2ae
2022-09-15session: support dma optionMarvin Liu1-20/+151
add dma support to session, acclerate host-stack with dma Type: feature Signed-off-by: Marvin Liu <yong.liu@intel.com> Signed-off-by: Junfeng Wang <drenfong.wang@intel.com> Change-Id: I3d492921d69d9e3e0b34d33adc33fba3bde9e1cc
2022-04-22session: fix ctrl evt rpc elt pool reallocFlorin Coras1-5/+10
Type: fix Signed-off-by: Florin Coras <fcoras@cisco.com> Change-Id: I79bfe91e81983b619c61a32285d8e038c2654147
2022-04-04session: use session layer rpc for evts sent to mainFlorin Coras1-5/+9
Fix race with connects for iperf3 udp test. Type: fix Signed-off-by: Florin Coras <fcoras@cisco.com> Change-Id: Ief725b80047911e87ba24736dc0a60aa8bcdac50
2022-03-21session: linked list of events to be handled by mainFlorin Coras1-56/+101
Minimize amount of rpcs from first worker to main Type: improvement Signed-off-by: Florin Coras <fcoras@cisco.com> Change-Id: I3794ff028a17d18b7bff69ede2b62e1e2d45ae77
2022-03-03session: improve tx tracingFlorin Coras1-10/+10
Type: improvement Signed-off-by: Florin Coras <fcoras@cisco.com> Change-Id: I4fd7ae435514eb986543302c7e3e69e04acba8cf
2022-02-05session: track bytes dequeued in snd paramsFlorin Coras1-2/+4
Also reset send params flags before calling transports to avoid explicit resets in all transports. Type: improvement Signed-off-by: Florin Coras <fcoras@cisco.com> Change-Id: I1da7b3fab009728e7fee4199425ced933fa8a122
2022-01-22session: update time for list of subscribersFlorin Coras1-1/+11
Instead of constantly scanning all transport vfts for update time functions, build list at transport enable time. Type: improvement Signed-off-by: Florin Coras <fcoras@cisco.com> Change-Id: Id5c07cc03ee1fdd072ebbbd40119d1a440a5e3b1
2022-01-12session: pass tx buffers in bulk to transportsFlorin Coras1-7/+9
Type: improvement Signed-off-by: Florin Coras <fcoras@cisco.com> Change-Id: I1025cccd784f80b557847f69c3ea1ada5c9de60d
2022-01-10session: deschedule sessions with no data to sendFlorin Coras1-2/+13
This ensures the scheduler always tracks sessions that are descheduled, i.e., do not have events in the old io events list. When app retries to send, clear descheduled flag and potentially the pacer. Consequently, transports no longer need to reset the pacer when sessions are rescheduled after a long app tx pause. This also fixes a tcp bug whereby the pacer was reset too often when snd_una was equal to snd_nxt as there was no way to distinguish betwen app tx breaks and congestion. Type: improvement Signed-off-by: Florin Coras <fcoras@cisco.com> Change-Id: Id3cc6c98cd76299e15030e504380dcf3c04c5189
2021-11-26session: accept reply improvementsFlorin Coras1-26/+25
- Always check session ownership - Improve test for main thread rpc Type: improvement Signed-off-by: Florin Coras <fcoras@cisco.com> Change-Id: I5fa60f7f5de199af0966987f9ce9a4cc8180cf98
2021-11-15vcl: add DSCP support in VCLFilip Tehlar1-0/+1
Type: feature Signed-off-by: Filip Tehlar <ftehlar@cisco.com> Change-Id: I835675267c997b5dc92a0aaccdb58648bc786bb9
2021-10-06session: fix severity infoFilip Tehlar1-11/+10
Type: fix Signed-off-by: Filip Tehlar <ftehlar@cisco.com> Change-Id: I6548274f8c0ae2a183b1d221cb195de445c2819f
2021-09-09vlib: fix vlib_buffer_enqueue_to_next() overflowBenoît Ganne1-3/+3
vlib_buffer_enqueue_to_next() requires to allow overflow of up to 63 elements of 'buffer' and 'nexts' array. - add helper to compute the minimum size - fix occurences in session and async crypto Type: fix Change-Id: If8d7eebc5bf9beba71ba194aec0f79b8eb6d5843 Signed-off-by: Benoît Ganne <bganne@cisco.com>
2021-09-01udp: fill rmt info if session is connection lessliuyacan1-1/+1
The app may send packet to different remote via sendto() with same socket (without calling connect()). In such scenario, remote address need to be filled in. Type: fix Signed-off-by: liuyacan <liuyacan@corp.netease.com> Change-Id: I49deb308668c05442f237e97acc9bfa239782482
2021-08-30session: fix prefetch out of struct bound on ArmTianyu Li1-1/+1
CLIB_PREFETCH (s->tx_fifo, 2 * CLIB_CACHE_LINE_BYTES, LOAD); sizeof(svm_fifo_t) is 128 bytes Note on 64B cacheline size Arm machine, CLIB_CACHE_LINE_BYTES 128 CLIB_CACHE_PREFETCH_BYTES 6 above CLIB_PREFETCH () macro will be expand to __builtin_prefetch(s->tx_fifo) __builtin_prefetch(s->tx_fifo + 64) __builtin_prefetch(s->tx_fifo + 128) << prefetch out of range __builtin_prefetch(s->tx_fifo + 192) << the same here Solution: Change to CLIB_PREFETCH (s->tx_fifo, sizeof (*(s->tx_fifo)), LOAD); Type: fix Signed-off-by: Tianyu Li <tianyu.li@arm.com> Reviewed-by: Lijian Zhang <lijian.zhang@arm.com> Change-Id: I745cbce3dbe5afcab53c39189d18392f569df5aa
2021-08-30tcp: Allow accepting session to send custom packetliuyacan1-1/+9
Tcp may want to send acks in established, but the app has not called accept() yet. Type: improvement Signed-off-by: liuyacan <liuyacan@corp.netease.com> Change-Id: I43b8cd386e533ca95c8ec260a0a1f695ea140358
2021-07-21session: fix unlisten rpc barrier releaseFlorin Coras1-4/+5
Type: fix Signed-off-by: Florin Coras <fcoras@cisco.com> Change-Id: I9301fbbcd611033b4b6ad5313edbc66840f5bb3a
2021-06-29session: free ctrl event data on connect rpcFlorin Coras1-0/+1
Type: fix Signed-off-by: Florin Coras <fcoras@cisco.com> Change-Id: I56c4682aef59ed0e69073f9001341c425e65bd48
2021-06-22session: avoid reordering unlisten and connect msgFlorin Coras1-6/+33
Type: fix Signed-off-by: Florin Coras <fcoras@cisco.com> Change-Id: Ibe32f7965f8cf457c39845713b029c8a4647ee55
2021-06-18session: improve ct locking with multiple workersFlorin Coras1-1/+1
Type: improvement Signed-off-by: Florin Coras <fcoras@cisco.com> Change-Id: Id91c3be57d49745cb3db6c768a8d5d14133f899e
2021-05-21session: improve main thread connects rpcFlorin Coras1-17/+25
Avoid grabbing the worker barrier if there's no work to be done. Type: improvement Signed-off-by: Florin Coras <fcoras@cisco.com> Change-Id: Ice3de5df41cd1752aba3419ad2e2dd82f30e9bfb
2021-05-19session: cleanup event llist usageFlorin Coras1-17/+17
Type: refactor Signed-off-by: Florin Coras <fcoras@cisco.com> Change-Id: I750c856ac81d951e8c0e62c710e0f35a0c80d6f9
2021-05-18session: poll main thread if pending connectsFlorin Coras1-48/+71
Type: improvement Signed-off-by: Florin Coras <fcoras@cisco.com> Change-Id: Ie8a15c50531f3ccd5f91dbc0779e4d9c0d146844
2021-05-18session: only handle old ctrl events per dispatchFlorin Coras1-6/+12
Avoids dispatching ctrl events generated while handling the current pending list. Type: improvement Signed-off-by: Florin Coras <fcoras@cisco.com> Change-Id: Ibeaf901ba4cf58a68fbd88e5ec3c23f6c2f6f145
2021-05-18session: move tx-buffers to tx ctxFlorin Coras1-13/+11
Type: refactor Signed-off-by: Florin Coras <fcoras@cisco.com> Change-Id: I10ec410fb7f3acb47128dda23510162dc13b20d0
2021-05-16session: rpc for connects to mainFlorin Coras1-5/+81
Type: improvement Signed-off-by: Florin Coras <fcoras@cisco.com> Change-Id: Ifa47e1500e5cfb3c717f87b1d21131b9531c9005
2021-05-12session: support half-close connectionliuyacan1-0/+25
Some app(e.g. Envoy) may call shutdown() instead of close() when draining connection. Type: improvement Signed-off-by: liuyacan <liuyacan@corp.netease.com> Change-Id: I9543b9ca3caa87b10b134fd1fc4019124e41e4d2
2021-05-01vlib: refactor trajectory trace debug featureBenoît Ganne1-5/+0
trajectory trace has been broken for a while because we used to save the buffer trajectory in a vector pointed to in opaque2. This does not work well when opaque2 is copied (eg. because of a clone) as 2 buffers end up sharing the same vector. This dedicates a full cacheline in the buffer metadata instead when trajectory is compiled in. No dynamic allocation, no sharing, no tears. Type: refactor Change-Id: I6a028ca1b48d38f393a36979e5e452c2dd48ad3f Signed-off-by: Benoît Ganne <bganne@cisco.com>
2021-04-27session: fix mq ext config cleanupFlorin Coras1-2/+1
Type: fix Signed-off-by: Florin Coras <fcoras@cisco.com> Change-Id: I18eb1a71ac22017f75f835a23a57b023b46524ce
2021-04-22vcl session: refactor passing of crypto contextFlorin Coras1-11/+0
Pass tls/quic crypto context using extended config instead of bloating conect/listen messages. Type: refactor Signed-off-by: Florin Coras <fcoras@cisco.com> Change-Id: I0bc637ae310e6c31ef1e16847501dcb81453ee94
2021-04-22vcl session: extended connect/listen configurationFlorin Coras1-1/+35
Type: feature Signed-off-by: Florin Coras <fcoras@cisco.com> Change-Id: Ic8d9386fef37ffd3446aaeb93a96ee6d60633831
2021-04-14session tcp vcl: api to update connection attributesFlorin Coras1-0/+48
Type: feature Signed-off-by: Florin Coras <fcoras@cisco.com> Change-Id: Ifdd6024daf044751895bb8d2deabad41d3a80c92
2021-04-07session: fix coverity warningFlorin Coras1-5/+5
Type: fix Signed-off-by: Florin Coras <fcoras@cisco.com> Change-Id: Id2c06f49bcae1a26a2d16d58f9da7a4d3175bf3c
2021-04-06session: basic support for interrupt modeFlorin Coras1-0/+117
Experimental support for session layer interrupt mode. When enabled (use-private-rx-mqs must be set) session queue node switches to interrupt state when lightly loaded, i.e., no events and less than 1 vector/dispatch. Because transport protocols require a periodic time update, when in interrupt state the session queue node workers register a timerfd with the unix-epoll-input node that when triggered signals, i.e., wakes up, the queue node. Under light load, the timer is set to trigger every 1ms whereas if no session is allocated, the worker moves to idle state and the timeout is set to 100ms. Type: feature Signed-off-by: Florin Coras <fcoras@cisco.com> Change-Id: I905b00777fbc025faf9c4074fce4c516cd139387
2021-04-05tcp: time infra improvementsFlorin Coras1-2/+1
Type: improvement Signed-off-by: Florin Coras <fcoras@cisco.com> Change-Id: I998c0686f9f7dc556dda8b28e23bbed127d0aafc
2021-03-29svm session vcl: per app rx message queuesFlorin Coras1-25/+25
Add option to use per app private segments for app to vpp message queues, as opposed to exposing internal message queues segment. When so configured, internal message queues are still polled by the session queue node but external app message queues are handled by a new input node (appsl-rx-mqs-input) that runs in interrupt state. Signaling of the node, when mqs receive new messages, is done through eventfds epolled by worker epoll input nodes. Type: feature Signed-off-by: Florin Coras <fcoras@cisco.com> Change-Id: Iffe8ce5a9944a56a14e6d0f492a850cb9e392d16
2021-03-26vlib: introduce vlib_get_main_by_index(), vlib_get_n_threads()Damjan Marion1-1/+1
Type: improvement Change-Id: If3da7d4338470912f37ff1794620418d928fb77f Signed-off-by: Damjan Marion <damarion@cisco.com>
ef='#n1462'>1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627 1628 1629 1630 1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646 1647 1648 1649 1650 1651 1652 1653 1654 1655 1656 1657 1658 1659 1660 1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671 1672 1673 1674 1675 1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705 1706 1707 1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718 1719 1720 1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734 1735 1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749 1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 1760 1761 1762 1763 1764 1765 1766 1767 1768 1769 1770 1771 1772 1773 1774 1775 1776 1777
/*
 * Copyright (c) 2017-2019 Cisco and/or its affiliates.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <vnet/session/application.h>
#include <vnet/session/application_interface.h>
#include <vnet/session/application_namespace.h>
#include <vnet/session/application_local.h>
#include <vnet/session/session.h>

static app_main_t app_main;

#define app_interface_check_thread_and_barrier(_fn, _arg)		\
  if (PREDICT_FALSE (!vlib_thread_is_main_w_barrier ()))		\
    {									\
      vlib_rpc_call_main_thread (_fn, (u8 *) _arg, sizeof(*_arg));	\
      return 0;								\
    }

static app_listener_t *
app_listener_alloc (application_t * app)
{
  app_listener_t *app_listener;
  pool_get (app->listeners, app_listener);
  clib_memset (app_listener, 0, sizeof (*app_listener));
  app_listener->al_index = app_listener - app->listeners;
  app_listener->app_index = app->app_index;
  app_listener->session_index = SESSION_INVALID_INDEX;
  app_listener->local_index = SESSION_INVALID_INDEX;
  app_listener->ls_handle = SESSION_INVALID_HANDLE;
  return app_listener;
}

app_listener_t *
app_listener_get (application_t * app, u32 app_listener_index)
{
  return pool_elt_at_index (app->listeners, app_listener_index);
}

static void
app_listener_free (application_t * app, app_listener_t * app_listener)
{
  clib_bitmap_free (app_listener->workers);
  if (CLIB_DEBUG)
    clib_memset (app_listener, 0xfa, sizeof (*app_listener));
  pool_put (app->listeners, app_listener);
}

session_handle_t
app_listener_handle (app_listener_t * al)
{
  return al->ls_handle;
}

app_listener_t *
app_listener_get_w_session (session_t * ls)
{
  application_t *app;

  app = application_get_if_valid (ls->app_index);
  if (!app)
    return 0;
  return app_listener_get (app, ls->al_index);
}

session_handle_t
app_listen_session_handle (session_t * ls)
{
  app_listener_t *al;
  al = app_listener_get_w_session (ls);
  if (!al)
    return listen_session_get_handle (ls);
  return al->ls_handle;
}

app_listener_t *
app_listener_get_w_handle (session_handle_t handle)
{
  session_t *ls;
  ls = session_get_from_handle_if_valid (handle);
  if (!ls)
    return 0;
  return app_listener_get_w_session (ls);
}

app_listener_t *
app_listener_lookup (application_t * app, session_endpoint_cfg_t * sep_ext)
{
  u32 table_index, fib_proto;
  session_endpoint_t *sep;
  session_handle_t handle;
  session_t *ls;

  sep = (session_endpoint_t *) sep_ext;
  if (application_has_local_scope (app) && session_endpoint_is_local (sep))
    {
      table_index = application_local_session_table (app);
      handle = session_lookup_endpoint_listener (table_index, sep, 1);
      if (handle != SESSION_INVALID_HANDLE)
	{
	  ls = listen_session_get_from_handle (handle);
	  return app_listener_get_w_session (ls);
	}
    }

  fib_proto = session_endpoint_fib_proto (sep);
  table_index = session_lookup_get_index_for_fib (fib_proto, sep->fib_index);
  handle = session_lookup_endpoint_listener (table_index, sep, 1);
  if (handle != SESSION_INVALID_HANDLE)
    {
      ls = listen_session_get_from_handle (handle);
      return app_listener_get_w_session ((session_t *) ls);
    }

  return 0;
}

int
app_listener_alloc_and_init (application_t * app,
			     session_endpoint_cfg_t * sep,
			     app_listener_t ** listener)
{
  app_listener_t *app_listener;
  transport_connection_t *tc;
  u32 al_index, table_index;
  session_handle_t lh;
  session_type_t st;
  session_t *ls = 0;
  int rv;

  app_listener = app_listener_alloc (app);
  al_index = app_listener->al_index;
  st = session_type_from_proto_and_ip (sep->transport_proto, sep->is_ip4);

  /*
   * Add session endpoint to local session table. Only binds to "inaddr_any"
   * (i.e., zero address) are added to local scope table.
   */
  if (application_has_local_scope (app)
      && session_endpoint_is_local ((session_endpoint_t *) sep))
    {
      session_type_t local_st;

      local_st = session_type_from_proto_and_ip (TRANSPORT_PROTO_NONE,
						 sep->is_ip4);
      ls = listen_session_alloc (0, local_st);
      ls->app_index = app->app_index;
      ls->app_wrk_index = sep->app_wrk_index;
      lh = session_handle (ls);

      if ((rv = session_listen (ls, sep)))
	{
	  ls = session_get_from_handle (lh);
	  session_free (ls);
	  return rv;
	}

      ls = session_get_from_handle (lh);
      app_listener = app_listener_get (app, al_index);
      app_listener->local_index = ls->session_index;
      app_listener->ls_handle = lh;
      ls->al_index = al_index;

      table_index = application_local_session_table (app);
      session_lookup_add_session_endpoint (table_index,
					   (session_endpoint_t *) sep, lh);
    }

  if (application_has_global_scope (app))
    {
      /*
       * Start listening on local endpoint for requested transport and scope.
       * Creates a stream session with state LISTENING to be used in session
       * lookups, prior to establishing connection. Requests transport to
       * build it's own specific listening connection.
       */
      ls = listen_session_alloc (0, st);
      ls->app_index = app->app_index;
      ls->app_wrk_index = sep->app_wrk_index;

      /* Listen pool can be reallocated if the transport is
       * recursive (tls) */
      lh = listen_session_get_handle (ls);

      if ((rv = session_listen (ls, sep)))
	{
	  ls = listen_session_get_from_handle (lh);
	  session_free (ls);
	  return rv;
	}
      ls = listen_session_get_from_handle (lh);
      app_listener = app_listener_get (app, al_index);
      app_listener->session_index = ls->session_index;
      app_listener->ls_handle = lh;
      ls->al_index = al_index;

      /* Add to the global lookup table after transport was initialized.
       * Lookup table needs to be populated only now because sessions
       * with cut-through transport are are added to app local tables that
       * are not related to network fibs, i.e., cannot be added as
       * connections */
      tc = session_get_transport (ls);
      if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
	{
	  fib_protocol_t fib_proto;
	  fib_proto = session_endpoint_fib_proto ((session_endpoint_t *) sep);
	  table_index = session_lookup_get_index_for_fib (fib_proto,
							  sep->fib_index);
	  ASSERT (table_index != SESSION_TABLE_INVALID_INDEX);
	  session_lookup_add_session_endpoint (table_index,
					       (session_endpoint_t *) sep,
					       lh);
	}
    }

  if (!ls)
    {
      app_listener_free (app, app_listener);
      return -1;
    }

  *listener = app_listener;
  return 0;
}

void
app_listener_cleanup (app_listener_t * al)
{
  application_t *app = application_get (al->app_index);
  session_t *ls;

  if (al->session_index != SESSION_INVALID_INDEX)
    {
      ls = session_get (al->session_index, 0);
      session_stop_listen (ls);
      listen_session_free (ls);
    }
  if (al->local_index != SESSION_INVALID_INDEX)
    {
      session_endpoint_t sep = SESSION_ENDPOINT_NULL;
      u32 table_index;

      table_index = application_local_session_table (app);
      ls = listen_session_get (al->local_index);
      ct_session_endpoint (ls, &sep);
      session_lookup_del_session_endpoint (table_index, &sep);
      session_stop_listen (ls);
      listen_session_free (ls);
    }
  app_listener_free (app, al);
}

static app_worker_t *
app_listener_select_worker (application_t * app, app_listener_t * al)
{
  u32 wrk_index;

  app = application_get (al->app_index);
  wrk_index = clib_bitmap_next_set (al->workers, al->accept_rotor + 1);
  if (wrk_index == ~0)
    wrk_index = clib_bitmap_first_set (al->workers);

  ASSERT (wrk_index != ~0);
  al->accept_rotor = wrk_index;
  return application_get_worker (app, wrk_index);
}

session_t *
app_listener_get_session (app_listener_t * al)
{
  if (al->session_index == SESSION_INVALID_INDEX)
    return 0;

  return listen_session_get (al->session_index);
}

session_t *
app_listener_get_local_session (app_listener_t * al)
{
  if (al->local_index == SESSION_INVALID_INDEX)
    return 0;
  return listen_session_get (al->local_index);
}

static app_worker_map_t *
app_worker_map_alloc (application_t * app)
{
  app_worker_map_t *map;
  pool_get (app->worker_maps, map);
  clib_memset (map, 0, sizeof (*map));
  return map;
}

static u32
app_worker_map_index (application_t * app, app_worker_map_t * map)
{
  return (map - app->worker_maps);
}

static void
app_worker_map_free (application_t * app, app_worker_map_t * map)
{
  pool_put (app->worker_maps, map);
}

static app_worker_map_t *
app_worker_map_get (application_t * app, u32 map_index)
{
  if (pool_is_free_index (app->worker_maps, map_index))
    return 0;
  return pool_elt_at_index (app->worker_maps, map_index);
}

static const u8 *
app_get_name (application_t * app)
{
  return app->name;
}

u32
application_session_table (application_t * app, u8 fib_proto)
{
  app_namespace_t *app_ns;
  app_ns = app_namespace_get (app->ns_index);
  if (!application_has_global_scope (app))
    return APP_INVALID_INDEX;
  if (fib_proto == FIB_PROTOCOL_IP4)
    return session_lookup_get_index_for_fib (fib_proto,
					     app_ns->ip4_fib_index);
  else
    return session_lookup_get_index_for_fib (fib_proto,
					     app_ns->ip6_fib_index);
}

u32
application_local_session_table (application_t * app)
{
  app_namespace_t *app_ns;
  if (!application_has_local_scope (app))
    return APP_INVALID_INDEX;
  app_ns = app_namespace_get (app->ns_index);
  return app_ns->local_table_index;
}

/**
 * Returns app name for app-index
 */
const u8 *
application_name_from_index (u32 app_index)
{
  application_t *app = application_get (app_index);
  if (!app)
    return 0;
  return app_get_name (app);
}

static void
application_api_table_add (u32 app_index, u32 api_client_index)
{
  if (api_client_index != APP_INVALID_INDEX)
    hash_set (app_main.app_by_api_client_index, api_client_index, app_index);
}

static void
application_api_table_del (u32 api_client_index)
{
  hash_unset (app_main.app_by_api_client_index, api_client_index);
}

static void
application_name_table_add (application_t * app)
{
  hash_set_mem (app_main.app_by_name, app->name, app->app_index);
}

static void
application_name_table_del (application_t * app)
{
  hash_unset_mem (app_main.app_by_name, app->name);
}

application_t *
application_lookup (u32 api_client_index)
{
  uword *p;
  p = hash_get (app_main.app_by_api_client_index, api_client_index);
  if (p)
    return application_get_if_valid (p[0]);

  return 0;
}

application_t *
application_lookup_name (const u8 * name)
{
  uword *p;
  p = hash_get_mem (app_main.app_by_name, name);
  if (p)
    return application_get (p[0]);

  return 0;
}

static application_t *
application_alloc (void)
{
  application_t *app;
  pool_get (app_main.app_pool, app);
  clib_memset (app, 0, sizeof (*app));
  app->app_index = app - app_main.app_pool;
  return app;
}

application_t *
application_get (u32 app_index)
{
  if (app_index == APP_INVALID_INDEX)
    return 0;
  return pool_elt_at_index (app_main.app_pool, app_index);
}

application_t *
application_get_if_valid (u32 app_index)
{
  if (pool_is_free_index (app_main.app_pool, app_index))
    return 0;

  return pool_elt_at_index (app_main.app_pool, app_index);
}

static void
application_verify_cb_fns (session_cb_vft_t * cb_fns)
{
  if (cb_fns->session_accept_callback == 0)
    clib_warning ("No accept callback function provided");
  if (cb_fns->session_connected_callback == 0)
    clib_warning ("No session connected callback function provided");
  if (cb_fns->session_disconnect_callback == 0)
    clib_warning ("No session disconnect callback function provided");
  if (cb_fns->session_reset_callback == 0)
    clib_warning ("No session reset callback function provided");
}

/**
 * Check app config for given segment type
 *
 * Returns 1 on success and 0 otherwise
 */
static u8
application_verify_cfg (ssvm_segment_type_t st)
{
  u8 is_valid;
  if (st == SSVM_SEGMENT_MEMFD)
    {
      is_valid = (session_main_get_evt_q_segment () != 0);
      if (!is_valid)
	clib_warning ("memfd seg: vpp's event qs IN binary api svm region");
      return is_valid;
    }
  else if (st == SSVM_SEGMENT_SHM)
    {
      is_valid = (session_main_get_evt_q_segment () == 0);
      if (!is_valid)
	clib_warning ("shm seg: vpp's event qs NOT IN binary api svm region");
      return is_valid;
    }
  else
    return 1;
}

static int
application_alloc_and_init (app_init_args_t * a)
{
  ssvm_segment_type_t seg_type = SSVM_SEGMENT_MEMFD;
  segment_manager_props_t *props;
  application_t *app;
  u64 *options;

  app = application_alloc ();
  options = a->options;
  /*
   * Make sure we support the requested configuration
   */
  if (options[APP_OPTIONS_FLAGS] & APP_OPTIONS_FLAGS_IS_BUILTIN)
    seg_type = SSVM_SEGMENT_PRIVATE;

  if ((options[APP_OPTIONS_FLAGS] & APP_OPTIONS_FLAGS_EVT_MQ_USE_EVENTFD)
      && seg_type != SSVM_SEGMENT_MEMFD)
    {
      clib_warning ("mq eventfds can only be used if socket transport is "
		    "used for binary api");
      return VNET_API_ERROR_APP_UNSUPPORTED_CFG;
    }

  if (!application_verify_cfg (seg_type))
    return VNET_API_ERROR_APP_UNSUPPORTED_CFG;

  if (options[APP_OPTIONS_PREALLOC_FIFO_PAIRS]
      && options[APP_OPTIONS_PREALLOC_FIFO_HDRS])
    return VNET_API_ERROR_APP_UNSUPPORTED_CFG;

  /* Check that the obvious things are properly set up */
  application_verify_cb_fns (a->session_cb_vft);

  app->flags = options[APP_OPTIONS_FLAGS];
  app->cb_fns = *a->session_cb_vft;
  app->ns_index = options[APP_OPTIONS_NAMESPACE];
  app->proxied_transports = options[APP_OPTIONS_PROXY_TRANSPORT];
  app->name = vec_dup (a->name);

  /* If no scope enabled, default to global */
  if (!application_has_global_scope (app)
      && !application_has_local_scope (app))
    app->flags |= APP_OPTIONS_FLAGS_USE_GLOBAL_SCOPE;

  props = application_segment_manager_properties (app);
  segment_manager_props_init (props);
  props->segment_size = options[APP_OPTIONS_SEGMENT_SIZE];
  props->prealloc_fifos = options[APP_OPTIONS_PREALLOC_FIFO_PAIRS];
  props->prealloc_fifo_hdrs = options[APP_OPTIONS_PREALLOC_FIFO_HDRS];
  if (options[APP_OPTIONS_ADD_SEGMENT_SIZE])
    {
      props->add_segment_size = options[APP_OPTIONS_ADD_SEGMENT_SIZE];
      props->add_segment = 1;
    }
  if (options[APP_OPTIONS_RX_FIFO_SIZE])
    props->rx_fifo_size = options[APP_OPTIONS_RX_FIFO_SIZE];
  if (options[APP_OPTIONS_TX_FIFO_SIZE])
    props->tx_fifo_size = options[APP_OPTIONS_TX_FIFO_SIZE];
  if (options[APP_OPTIONS_EVT_QUEUE_SIZE])
    props->evt_q_size = options[APP_OPTIONS_EVT_QUEUE_SIZE];
  if (options[APP_OPTIONS_FLAGS] & APP_OPTIONS_FLAGS_EVT_MQ_USE_EVENTFD)
    props->use_mq_eventfd = 1;
  if (options[APP_OPTIONS_TLS_ENGINE])
    app->tls_engine = options[APP_OPTIONS_TLS_ENGINE];
  if (options[APP_OPTIONS_MAX_FIFO_SIZE])
    props->max_fifo_size = options[APP_OPTIONS_MAX_FIFO_SIZE];
  if (options[APP_OPTIONS_HIGH_WATERMARK])
    props->high_watermark = options[APP_OPTIONS_HIGH_WATERMARK];
  if (options[APP_OPTIONS_LOW_WATERMARK])
    props->low_watermark = options[APP_OPTIONS_LOW_WATERMARK];
  if (options[APP_OPTIONS_PCT_FIRST_ALLOC])
    props->pct_first_alloc = options[APP_OPTIONS_PCT_FIRST_ALLOC];
  props->segment_type = seg_type;

  /* Add app to lookup by api_client_index table */
  if (!application_is_builtin (app))
    application_api_table_add (app->app_index, a->api_client_index);
  else
    application_name_table_add (app);

  a->app_index = app->app_index;

  APP_DBG ("New app name: %v api index: %u index %u", app->name,
	   a->api_client_index, app->app_index);

  return 0;
}

static void
application_free (application_t * app)
{
  app_worker_map_t *wrk_map;
  app_worker_t *app_wrk;

  /*
   * The app event queue allocated in first segment is cleared with
   * the segment manager. No need to explicitly free it.
   */
  APP_DBG ("Delete app name %v index: %d", app->name, app->app_index);

  if (application_is_proxy (app))
    application_remove_proxy (app);

  /*
   * Free workers
   */

  /* *INDENT-OFF* */
  pool_flush (wrk_map, app->worker_maps, ({
    app_wrk = app_worker_get (wrk_map->wrk_index);
    app_worker_free (app_wrk);
  }));
  /* *INDENT-ON* */
  pool_free (app->worker_maps);

  /*
   * Cleanup remaining state
   */
  if (application_is_builtin (app))
    application_name_table_del (app);
  vec_free (app->name);
  pool_put (app_main.app_pool, app);
}

static void
application_detach_process (application_t * app, u32 api_client_index)
{
  vnet_app_worker_add_del_args_t _args = { 0 }, *args = &_args;
  app_worker_map_t *wrk_map;
  u32 *wrks = 0, *wrk_index;
  app_worker_t *app_wrk;

  if (api_client_index == ~0)
    {
      application_free (app);
      return;
    }

  APP_DBG ("Detaching for app %v index %u api client index %u", app->name,
	   app->app_index, api_client_index);

  /* *INDENT-OFF* */
  pool_foreach (wrk_map, app->worker_maps)  {
    app_wrk = app_worker_get (wrk_map->wrk_index);
    if (app_wrk->api_client_index == api_client_index)
      vec_add1 (wrks, app_wrk->wrk_index);
  }
  /* *INDENT-ON* */

  if (!vec_len (wrks))
    {
      clib_warning ("no workers for app %u api_index %u", app->app_index,
		    api_client_index);
      return;
    }

  args->app_index = app->app_index;
  args->api_client_index = api_client_index;
  vec_foreach (wrk_index, wrks)
  {
    app_wrk = app_worker_get (wrk_index[0]);
    args->wrk_map_index = app_wrk->wrk_map_index;
    args->is_add = 0;
    vnet_app_worker_add_del (args);
  }
  vec_free (wrks);
}

app_worker_t *
application_get_worker (application_t * app, u32 wrk_map_index)
{
  app_worker_map_t *map;
  map = app_worker_map_get (app, wrk_map_index);
  if (!map)
    return 0;
  return app_worker_get (map->wrk_index);
}

app_worker_t *
application_get_default_worker (application_t * app)
{
  return application_get_worker (app, 0);
}

u32
application_n_workers (application_t * app)
{
  return pool_elts (app->worker_maps);
}

app_worker_t *
application_listener_select_worker (session_t * ls)
{
  application_t *app;
  app_listener_t *al;

  app = application_get (ls->app_index);
  al = app_listener_get (app, ls->al_index);
  return app_listener_select_worker (app, al);
}

int
application_alloc_worker_and_init (application_t * app, app_worker_t ** wrk)
{
  app_worker_map_t *wrk_map;
  app_worker_t *app_wrk;
  segment_manager_t *sm;
  int rv;

  app_wrk = app_worker_alloc (app);
  wrk_map = app_worker_map_alloc (app);
  wrk_map->wrk_index = app_wrk->wrk_index;
  app_wrk->wrk_map_index = app_worker_map_index (app, wrk_map);

  /*
   * Setup first segment manager
   */
  sm = segment_manager_alloc ();
  sm->app_wrk_index = app_wrk->wrk_index;

  if ((rv = segment_manager_init_first (sm)))
    {
      app_worker_free (app_wrk);
      return rv;
    }
  sm->first_is_protected = 1;

  /*
   * Setup app worker
   */
  app_wrk->first_segment_manager = segment_manager_index (sm);
  app_wrk->listeners_table = hash_create (0, sizeof (u64));
  app_wrk->event_queue = segment_manager_event_queue (sm);
  app_wrk->app_is_builtin = application_is_builtin (app);

  *wrk = app_wrk;

  return 0;
}

int
vnet_app_worker_add_del (vnet_app_worker_add_del_args_t * a)
{
  fifo_segment_t *fs;
  app_worker_map_t *wrk_map;
  app_worker_t *app_wrk;
  segment_manager_t *sm;
  application_t *app;
  int rv;

  app = application_get (a->app_index);
  if (!app)
    return VNET_API_ERROR_INVALID_VALUE;

  if (a->is_add)
    {
      if ((rv = application_alloc_worker_and_init (app, &app_wrk)))
	return rv;

      /* Map worker api index to the app */
      app_wrk->api_client_index = a->api_client_index;
      application_api_table_add (app->app_index, a->api_client_index);

      sm = segment_manager_get (app_wrk->first_segment_manager);
      fs = segment_manager_get_segment_w_lock (sm, 0);
      a->segment = &fs->ssvm;
      a->segment_handle = segment_manager_segment_handle (sm, fs);
      segment_manager_segment_reader_unlock (sm);
      a->evt_q = app_wrk->event_queue;
      a->wrk_map_index = app_wrk->wrk_map_index;
    }
  else
    {
      wrk_map = app_worker_map_get (app, a->wrk_map_index);
      if (!wrk_map)
	return VNET_API_ERROR_INVALID_VALUE;

      app_wrk = app_worker_get (wrk_map->wrk_index);
      if (!app_wrk)
	return VNET_API_ERROR_INVALID_VALUE;

      application_api_table_del (app_wrk->api_client_index);
      app_worker_free (app_wrk);
      app_worker_map_free (app, wrk_map);
      if (application_n_workers (app) == 0)
	application_free (app);
    }
  return 0;
}

static int
app_validate_namespace (u8 * namespace_id, u64 secret, u32 * app_ns_index)
{
  app_namespace_t *app_ns;
  if (vec_len (namespace_id) == 0)
    {
      /* Use default namespace */
      *app_ns_index = 0;
      return 0;
    }

  *app_ns_index = app_namespace_index_from_id (namespace_id);
  if (*app_ns_index == APP_NAMESPACE_INVALID_INDEX)
    return VNET_API_ERROR_APP_INVALID_NS;
  app_ns = app_namespace_get (*app_ns_index);
  if (!app_ns)
    return VNET_API_ERROR_APP_INVALID_NS;
  if (app_ns->ns_secret != secret)
    return VNET_API_ERROR_APP_WRONG_NS_SECRET;
  return 0;
}

static u8 *
app_name_from_api_index (u32 api_client_index)
{
  vl_api_registration_t *regp;
  regp = vl_api_client_index_to_registration (api_client_index);
  if (regp)
    return format (0, "%s", regp->name);

  clib_warning ("api client index %u does not have an api registration!",
		api_client_index);
  return format (0, "unknown");
}

/**
 * Attach application to vpp
 *
 * Allocates a vpp app, i.e., a structure that keeps back pointers
 * to external app and a segment manager for shared memory fifo based
 * communication with the external app.
 */
int
vnet_application_attach (vnet_app_attach_args_t * a)
{
  fifo_segment_t *fs;
  application_t *app = 0;
  app_worker_t *app_wrk;
  segment_manager_t *sm;
  u32 app_ns_index = 0;
  u8 *app_name = 0;
  u64 secret;
  int rv;

  if (a->api_client_index != APP_INVALID_INDEX)
    app = application_lookup (a->api_client_index);
  else if (a->name)
    app = application_lookup_name (a->name);
  else
    return VNET_API_ERROR_INVALID_VALUE;

  if (app)
    return VNET_API_ERROR_APP_ALREADY_ATTACHED;

  /* Socket api sets the name and validates namespace prior to attach */
  if (!a->use_sock_api)
    {
      if (a->api_client_index != APP_INVALID_INDEX)
	{
	  app_name = app_name_from_api_index (a->api_client_index);
	  a->name = app_name;
	}

      secret = a->options[APP_OPTIONS_NAMESPACE_SECRET];
      if ((rv = app_validate_namespace (a->namespace_id, secret,
					&app_ns_index)))
	return rv;
      a->options[APP_OPTIONS_NAMESPACE] = app_ns_index;
    }

  if ((rv = application_alloc_and_init ((app_init_args_t *) a)))
    return rv;

  app = application_get (a->app_index);
  if ((rv = application_alloc_worker_and_init (app, &app_wrk)))
    return rv;

  a->app_evt_q = app_wrk->event_queue;
  app_wrk->api_client_index = a->api_client_index;
  sm = segment_manager_get (app_wrk->first_segment_manager);
  fs = segment_manager_get_segment_w_lock (sm, 0);

  if (application_is_proxy (app))
    {
      application_setup_proxy (app);
      /* The segment manager pool is reallocated because a new listener
       * is added. Re-grab segment manager to avoid dangling reference */
      sm = segment_manager_get (app_wrk->first_segment_manager);
    }

  ASSERT (vec_len (fs->ssvm.name) <= 128);
  a->segment = &fs->ssvm;
  a->segment_handle = segment_manager_segment_handle (sm, fs);

  segment_manager_segment_reader_unlock (sm);
  vec_free (app_name);
  return 0;
}

/**
 * Detach application from vpp
 */
int
vnet_application_detach (vnet_app_detach_args_t * a)
{
  application_t *app;

  app = application_get_if_valid (a->app_index);
  if (!app)
    {
      clib_warning ("app not attached");
      return VNET_API_ERROR_APPLICATION_NOT_ATTACHED;
    }

  app_interface_check_thread_and_barrier (vnet_application_detach, a);
  application_detach_process (app, a->api_client_index);
  return 0;
}


static u8
session_endpoint_in_ns (session_endpoint_t * sep)
{
  u8 is_lep = session_endpoint_is_local (sep);
  if (!is_lep && sep->sw_if_index != ENDPOINT_INVALID_INDEX
      && !ip_interface_has_address (sep->sw_if_index, &sep->ip, sep->is_ip4))
    {
      clib_warning ("sw_if_index %u not configured with ip %U",
		    sep->sw_if_index, format_ip46_address, &sep->ip,
		    sep->is_ip4);
      return 0;
    }
  return (is_lep || ip_is_local (sep->fib_index, &sep->ip, sep->is_ip4));
}

static void
session_endpoint_update_for_app (session_endpoint_cfg_t * sep,
				 application_t * app, u8 is_connect)
{
  app_namespace_t *app_ns;
  u32 ns_index, fib_index;

  ns_index = app->ns_index;

  /* App is a transport proto, so fetch the calling app's ns */
  if (app->flags & APP_OPTIONS_FLAGS_IS_TRANSPORT_APP)
    ns_index = sep->ns_index;

  app_ns = app_namespace_get (ns_index);
  if (!app_ns)
    return;

  /* Ask transport and network to bind to/connect using local interface
   * that "supports" app's namespace. This will fix our local connection
   * endpoint.
   */

  /* If in default namespace and user requested a fib index use it */
  if (ns_index == 0 && sep->fib_index != ENDPOINT_INVALID_INDEX)
    fib_index = sep->fib_index;
  else
    fib_index = sep->is_ip4 ? app_ns->ip4_fib_index : app_ns->ip6_fib_index;
  sep->peer.fib_index = fib_index;
  sep->fib_index = fib_index;

  if (!is_connect)
    {
      sep->sw_if_index = app_ns->sw_if_index;
    }
  else
    {
      if (app_ns->sw_if_index != APP_NAMESPACE_INVALID_INDEX
	  && sep->peer.sw_if_index != ENDPOINT_INVALID_INDEX
	  && sep->peer.sw_if_index != app_ns->sw_if_index)
	clib_warning ("Local sw_if_index different from app ns sw_if_index");

      sep->peer.sw_if_index = app_ns->sw_if_index;
    }
}

int
vnet_listen (vnet_listen_args_t * a)
{
  app_listener_t *app_listener;
  app_worker_t *app_wrk;
  application_t *app;
  int rv;

  ASSERT (vlib_thread_is_main_w_barrier ());

  app = application_get_if_valid (a->app_index);
  if (!app)
    return SESSION_E_NOAPP;

  app_wrk = application_get_worker (app, a->wrk_map_index);
  if (!app_wrk)
    return SESSION_E_INVALID_APPWRK;

  a->sep_ext.app_wrk_index = app_wrk->wrk_index;

  session_endpoint_update_for_app (&a->sep_ext, app, 0 /* is_connect */ );
  if (!session_endpoint_in_ns (&a->sep))
    return SESSION_E_INVALID_NS;

  /*
   * Check if we already have an app listener
   */
  app_listener = app_listener_lookup (app, &a->sep_ext);
  if (app_listener)
    {
      if (app_listener->app_index != app->app_index)
	return SESSION_E_ALREADY_LISTENING;
      if ((rv = app_worker_start_listen (app_wrk, app_listener)))
	return rv;
      a->handle = app_listener_handle (app_listener);
      return 0;
    }

  /*
   * Create new app listener
   */
  if ((rv = app_listener_alloc_and_init (app, &a->sep_ext, &app_listener)))
    return rv;

  if ((rv = app_worker_start_listen (app_wrk, app_listener)))
    {
      app_listener_cleanup (app_listener);
      return rv;
    }

  a->handle = app_listener_handle (app_listener);
  return 0;
}

int
vnet_connect (vnet_connect_args_t * a)
{
  app_worker_t *client_wrk;
  application_t *client;

  ASSERT (vlib_thread_is_main_w_barrier ());

  if (session_endpoint_is_zero (&a->sep))
    return SESSION_E_INVALID_RMT_IP;

  client = application_get (a->app_index);
  session_endpoint_update_for_app (&a->sep_ext, client, 1 /* is_connect */ );
  client_wrk = application_get_worker (client, a->wrk_map_index);

  /*
   * First check the local scope for locally attached destinations.
   * If we have local scope, we pass *all* connects through it since we may
   * have special policy rules even for non-local destinations, think proxy.
   */
  if (application_has_local_scope (client))
    {
      int rv;

      a->sep_ext.original_tp = a->sep_ext.transport_proto;
      a->sep_ext.transport_proto = TRANSPORT_PROTO_NONE;
      rv = app_worker_connect_session (client_wrk, &a->sep, a->api_context);
      if (rv <= 0)
	return rv;
      a->sep_ext.transport_proto = a->sep_ext.original_tp;
    }
  /*
   * Not connecting to a local server, propagate to transport
   */
  return app_worker_connect_session (client_wrk, &a->sep, a->api_context);
}

int
vnet_unlisten (vnet_unlisten_args_t * a)
{
  app_worker_t *app_wrk;
  app_listener_t *al;
  application_t *app;

  ASSERT (vlib_thread_is_main_w_barrier ());

  if (!(app = application_get_if_valid (a->app_index)))
    return SESSION_E_NOAPP;

  if (!(al = app_listener_get_w_handle (a->handle)))
    return SESSION_E_NOLISTEN;

  if (al->app_index != app->app_index)
    {
      clib_warning ("app doesn't own handle %llu!", a->handle);
      return SESSION_E_OWNER;
    }

  app_wrk = application_get_worker (app, a->wrk_map_index);
  if (!app_wrk)
    {
      clib_warning ("no app %u worker %u", app->app_index, a->wrk_map_index);
      return SESSION_E_INVALID_APPWRK;
    }

  return app_worker_stop_listen (app_wrk, al);
}

int
vnet_disconnect_session (vnet_disconnect_args_t * a)
{
  app_worker_t *app_wrk;
  session_t *s;

  s = session_get_from_handle_if_valid (a->handle);
  if (!s)
    return SESSION_E_NOSESSION;

  app_wrk = app_worker_get (s->app_wrk_index);
  if (app_wrk->app_index != a->app_index)
    return SESSION_E_OWNER;

  /* We're peeking into another's thread pool. Make sure */
  ASSERT (s->session_index == session_index_from_handle (a->handle));

  session_close (s);
  return 0;
}

int
application_change_listener_owner (session_t * s, app_worker_t * app_wrk)
{
  app_worker_t *old_wrk = app_worker_get (s->app_wrk_index);
  app_listener_t *app_listener;
  application_t *app;
  int rv;

  if (!old_wrk)
    return SESSION_E_INVALID_APPWRK;

  hash_unset (old_wrk->listeners_table, listen_session_get_handle (s));
  if (session_transport_service_type (s) == TRANSPORT_SERVICE_CL
      && s->rx_fifo)
    segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);

  app = application_get (old_wrk->app_index);
  if (!app)
    return SESSION_E_NOAPP;

  app_listener = app_listener_get (app, s->al_index);

  /* Only remove from lb for now */
  app_listener->workers = clib_bitmap_set (app_listener->workers,
					   old_wrk->wrk_map_index, 0);

  if ((rv = app_worker_start_listen (app_wrk, app_listener)))
    return rv;

  s->app_wrk_index = app_wrk->wrk_index;

  return 0;
}

int
application_is_proxy (application_t * app)
{
  return (app->flags & APP_OPTIONS_FLAGS_IS_PROXY);
}

int
application_is_builtin (application_t * app)
{
  return (app->flags & APP_OPTIONS_FLAGS_IS_BUILTIN);
}

int
application_is_builtin_proxy (application_t * app)
{
  return (application_is_proxy (app) && application_is_builtin (app));
}

u8
application_has_local_scope (application_t * app)
{
  return app->flags & APP_OPTIONS_FLAGS_USE_LOCAL_SCOPE;
}

u8
application_has_global_scope (application_t * app)
{
  return app->flags & APP_OPTIONS_FLAGS_USE_GLOBAL_SCOPE;
}

static clib_error_t *
application_start_stop_proxy_fib_proto (application_t * app, u8 fib_proto,
					u8 transport_proto, u8 is_start)
{
  app_namespace_t *app_ns = app_namespace_get (app->ns_index);
  u8 is_ip4 = (fib_proto == FIB_PROTOCOL_IP4);
  session_endpoint_cfg_t sep = SESSION_ENDPOINT_CFG_NULL;
  transport_connection_t *tc;
  app_worker_t *app_wrk;
  app_listener_t *al;
  session_t *s;
  u32 flags;

  /* TODO decide if we want proxy to be enabled for all workers */
  app_wrk = application_get_default_worker (app);
  if (is_start)
    {
      s = app_worker_first_listener (app_wrk, fib_proto, transport_proto);
      if (!s)
	{
	  sep.is_ip4 = is_ip4;
	  sep.fib_index = app_namespace_get_fib_index (app_ns, fib_proto);
	  sep.sw_if_index = app_ns->sw_if_index;
	  sep.transport_proto = transport_proto;
	  sep.app_wrk_index = app_wrk->wrk_index;	/* only default */

	  /* force global scope listener */
	  flags = app->flags;
	  app->flags &= ~APP_OPTIONS_FLAGS_USE_LOCAL_SCOPE;
	  app_listener_alloc_and_init (app, &sep, &al);
	  app->flags = flags;

	  app_worker_start_listen (app_wrk, al);
	  s = listen_session_get (al->session_index);
	  s->flags |= SESSION_F_PROXY;
	}
    }
  else
    {
      s = app_worker_proxy_listener (app_wrk, fib_proto, transport_proto);
      ASSERT (s);
    }

  tc = listen_session_get_transport (s);

  if (!ip_is_zero (&tc->lcl_ip, 1))
    {
      u32 sti;
      sep.is_ip4 = is_ip4;
      sep.fib_index = app_namespace_get_fib_index (app_ns, fib_proto);
      sep.transport_proto = transport_proto;
      sep.port = 0;
      sti = session_lookup_get_index_for_fib (fib_proto, sep.fib_index);
      if (is_start)
	session_lookup_add_session_endpoint (sti,
					     (session_endpoint_t *) & sep,
					     s->session_index);
      else
	session_lookup_del_session_endpoint (sti,
					     (session_endpoint_t *) & sep);
    }

  return 0;
}

static void
application_start_stop_proxy_local_scope (application_t * app,
					  u8 transport_proto, u8 is_start)
{
  session_endpoint_t sep = SESSION_ENDPOINT_NULL;
  app_namespace_t *app_ns;
  app_ns = app_namespace_get (app->ns_index);
  sep.is_ip4 = 1;
  sep.transport_proto = transport_proto;
  sep.port = 0;

  if (is_start)
    {
      session_lookup_add_session_endpoint (app_ns->local_table_index, &sep,
					   app->app_index);
      sep.is_ip4 = 0;
      session_lookup_add_session_endpoint (app_ns->local_table_index, &sep,
					   app->app_index);
    }
  else
    {
      session_lookup_del_session_endpoint (app_ns->local_table_index, &sep);
      sep.is_ip4 = 0;
      session_lookup_del_session_endpoint (app_ns->local_table_index, &sep);
    }
}

void
application_start_stop_proxy (application_t * app,
			      transport_proto_t transport_proto, u8 is_start)
{
  if (application_has_local_scope (app))
    application_start_stop_proxy_local_scope (app, transport_proto, is_start);

  if (application_has_global_scope (app))
    {
      application_start_stop_proxy_fib_proto (app, FIB_PROTOCOL_IP4,
					      transport_proto, is_start);
      application_start_stop_proxy_fib_proto (app, FIB_PROTOCOL_IP6,
					      transport_proto, is_start);
    }
}

void
application_setup_proxy (application_t * app)
{
  u16 transports = app->proxied_transports;
  transport_proto_t tp;

  ASSERT (application_is_proxy (app));

  /* *INDENT-OFF* */
  transport_proto_foreach (tp, ({
    if (transports & (1 << tp))
      application_start_stop_proxy (app, tp, 1);
  }));
  /* *INDENT-ON* */
}

void
application_remove_proxy (application_t * app)
{
  u16 transports = app->proxied_transports;
  transport_proto_t tp;

  ASSERT (application_is_proxy (app));

  /* *INDENT-OFF* */
  transport_proto_foreach (tp, ({
    if (transports & (1 << tp))
      application_start_stop_proxy (app, tp, 0);
  }));
  /* *INDENT-ON* */
}

segment_manager_props_t *
application_segment_manager_properties (application_t * app)
{
  return &app->sm_properties;
}

segment_manager_props_t *
application_get_segment_manager_properties (u32 app_index)
{
  application_t *app = application_get (app_index);
  return &app->sm_properties;
}

static void
application_format_listeners (application_t * app, int verbose)
{
  vlib_main_t *vm = vlib_get_main ();
  app_worker_map_t *wrk_map;
  app_worker_t *app_wrk;
  u32 sm_index;
  u64 handle;

  if (!app)
    {
      vlib_cli_output (vm, "%U", format_app_worker_listener, 0 /* header */ ,
		       0, 0, verbose);
      return;
    }

  /* *INDENT-OFF* */
  pool_foreach (wrk_map, app->worker_maps)  {
    app_wrk = app_worker_get (wrk_map->wrk_index);
    if (hash_elts (app_wrk->listeners_table) == 0)
      continue;
    hash_foreach (handle, sm_index, app_wrk->listeners_table, ({
      vlib_cli_output (vm, "%U", format_app_worker_listener, app_wrk,
                       handle, sm_index, verbose);
    }));
  }
  /* *INDENT-ON* */
}

static void
application_format_connects (application_t * app, int verbose)
{
  app_worker_map_t *wrk_map;
  app_worker_t *app_wrk;

  if (!app)
    {
      app_worker_format_connects (0, verbose);
      return;
    }

  /* *INDENT-OFF* */
  pool_foreach (wrk_map, app->worker_maps)  {
    app_wrk = app_worker_get (wrk_map->wrk_index);
    app_worker_format_connects (app_wrk, verbose);
  }
  /* *INDENT-ON* */
}

u8 *
format_cert_key_pair (u8 * s, va_list * args)
{
  app_cert_key_pair_t *ckpair = va_arg (*args, app_cert_key_pair_t *);
  int key_len = 0, cert_len = 0;
  cert_len = vec_len (ckpair->cert);
  key_len = vec_len (ckpair->key);
  if (ckpair->cert_key_index == 0)
    s = format (s, "DEFAULT (cert:%d, key:%d)", cert_len, key_len);
  else
    s = format (s, "%d (cert:%d, key:%d)", ckpair->cert_key_index,
		cert_len, key_len);
  return s;
}

u8 *
format_crypto_engine (u8 * s, va_list * args)
{
  u32 engine = va_arg (*args, u32);
  switch (engine)
    {
    case CRYPTO_ENGINE_NONE:
      return format (s, "none");
    case CRYPTO_ENGINE_MBEDTLS:
      return format (s, "mbedtls");
    case CRYPTO_ENGINE_OPENSSL:
      return format (s, "openssl");
    case CRYPTO_ENGINE_PICOTLS:
      return format (s, "picotls");
    case CRYPTO_ENGINE_VPP:
      return format (s, "vpp");
    default:
      return format (s, "unknown engine");
    }
  return s;
}

uword
unformat_crypto_engine (unformat_input_t * input, va_list * args)
{
  u8 *a = va_arg (*args, u8 *);
  if (unformat (input, "mbedtls"))
    *a = CRYPTO_ENGINE_MBEDTLS;
  else if (unformat (input, "openssl"))
    *a = CRYPTO_ENGINE_OPENSSL;
  else if (unformat (input, "picotls"))
    *a = CRYPTO_ENGINE_PICOTLS;
  else if (unformat (input, "vpp"))
    *a = CRYPTO_ENGINE_VPP;
  else
    return 0;
  return 1;
}

u8 *
format_crypto_context (u8 * s, va_list * args)
{
  crypto_context_t *crctx = va_arg (*args, crypto_context_t *);
  s = format (s, "[0x%x][sub%d,ckpair%x]", crctx->ctx_index,
	      crctx->n_subscribers, crctx->ckpair_index);
  s = format (s, "[%U]", format_crypto_engine, crctx->crypto_engine);
  return s;
}

u8 *
format_application (u8 * s, va_list * args)
{
  application_t *app = va_arg (*args, application_t *);
  CLIB_UNUSED (int verbose) = va_arg (*args, int);
  segment_manager_props_t *props;
  const u8 *app_ns_name, *app_name;
  app_worker_map_t *wrk_map;
  app_worker_t *app_wrk;

  if (app == 0)
    {
      if (!verbose)
	s = format (s, "%-10s%-20s%-40s", "Index", "Name", "Namespace");
      return s;
    }

  app_name = app_get_name (app);
  app_ns_name = app_namespace_id_from_index (app->ns_index);
  props = application_segment_manager_properties (app);
  if (!verbose)
    {
      s = format (s, "%-10u%-20v%-40v", app->app_index, app_name,
		  app_ns_name);
      return s;
    }

  s = format (s, "app-name %v app-index %u ns-index %u seg-size %U\n",
	      app_name, app->app_index, app->ns_index,
	      format_memory_size, props->add_segment_size);
  s = format (s, "rx-fifo-size %U tx-fifo-size %U workers:\n",
	      format_memory_size, props->rx_fifo_size,
	      format_memory_size, props->tx_fifo_size);

  /* *INDENT-OFF* */
  pool_foreach (wrk_map, app->worker_maps)  {
      app_wrk = app_worker_get (wrk_map->wrk_index);
      s = format (s, "%U", format_app_worker, app_wrk);
  }
  /* *INDENT-ON* */

  return s;
}

void
application_format_all_listeners (vlib_main_t * vm, int verbose)
{
  application_t *app;

  if (!pool_elts (app_main.app_pool))
    {
      vlib_cli_output (vm, "No active server bindings");
      return;
    }

  application_format_listeners (0, verbose);

  /* *INDENT-OFF* */
  pool_foreach (app, app_main.app_pool)  {
    application_format_listeners (app, verbose);
  }
  /* *INDENT-ON* */
}

void
application_format_all_clients (vlib_main_t * vm, int verbose)
{
  application_t *app;

  if (!pool_elts (app_main.app_pool))
    {
      vlib_cli_output (vm, "No active apps");
      return;
    }

  application_format_connects (0, verbose);

  /* *INDENT-OFF* */
  pool_foreach (app, app_main.app_pool)  {
    application_format_connects (app, verbose);
  }
  /* *INDENT-ON* */
}

static clib_error_t *
show_certificate_command_fn (vlib_main_t * vm, unformat_input_t * input,
			     vlib_cli_command_t * cmd)
{
  app_cert_key_pair_t *ckpair;
  session_cli_return_if_not_enabled ();

  /* *INDENT-OFF* */
  pool_foreach (ckpair, app_main.cert_key_pair_store)  {
    vlib_cli_output (vm, "%U", format_cert_key_pair, ckpair);
  }
  /* *INDENT-ON* */
  return 0;
}

static inline void
appliction_format_app_mq (vlib_main_t * vm, application_t * app)
{
  app_worker_map_t *map;
  app_worker_t *wrk;
  /* *INDENT-OFF* */
  pool_foreach (map, app->worker_maps)  {
    wrk = app_worker_get (map->wrk_index);
    vlib_cli_output (vm, "[A%d][%d]%U", app->app_index,
		     map->wrk_index, format_svm_msg_q,
		     wrk->event_queue);
  }
  /* *INDENT-ON* */
}

static clib_error_t *
appliction_format_all_app_mq (vlib_main_t * vm)
{
  application_t *app;
  int i, n_threads;

  n_threads = vlib_get_n_threads ();

  for (i = 0; i < n_threads; i++)
    {
      vlib_cli_output (vm, "[Ctrl%d]%U", i, format_svm_msg_q,
		       session_main_get_vpp_event_queue (i));
    }

  /* *INDENT-OFF* */
  pool_foreach (app, app_main.app_pool)  {
      appliction_format_app_mq (vm, app);
  }
  /* *INDENT-ON* */
  return 0;
}

static clib_error_t *
show_app_command_fn (vlib_main_t * vm, unformat_input_t * input,
		     vlib_cli_command_t * cmd)
{
  int do_server = 0, do_client = 0, do_mq = 0;
  application_t *app;
  u32 app_index = ~0;
  int verbose = 0;

  session_cli_return_if_not_enabled ();

  while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
    {
      if (unformat (input, "server"))
	do_server = 1;
      else if (unformat (input, "client"))
	do_client = 1;
      else if (unformat (input, "mq"))
	do_mq = 1;
      else if (unformat (input, "%u", &app_index))
	;
      else if (unformat (input, "verbose"))
	verbose = 1;
      else
	return clib_error_return (0, "unknown input `%U'",
				  format_unformat_error, input);
    }

  if (do_mq && app_index != ~0)
    {
      app = application_get_if_valid (app_index);
      if (!app)
	return clib_error_return (0, "No app with index %u", app_index);

      appliction_format_app_mq (vm, app);
      return 0;
    }

  if (do_mq)
    {
      appliction_format_all_app_mq (vm);
      return 0;
    }

  if (do_server)
    {
      application_format_all_listeners (vm, verbose);
      return 0;
    }

  if (do_client)
    {
      application_format_all_clients (vm, verbose);
      return 0;
    }

  if (app_index != ~0)
    {
      app = application_get_if_valid (app_index);
      if (!app)
	return clib_error_return (0, "No app with index %u", app_index);

      vlib_cli_output (vm, "%U", format_application, app, /* verbose */ 1);
      return 0;
    }

  /* Print app related info */
  if (!do_server && !do_client)
    {
      vlib_cli_output (vm, "%U", format_application, 0, 0);
      /* *INDENT-OFF* */
      pool_foreach (app, app_main.app_pool)  {
	vlib_cli_output (vm, "%U", format_application, app, 0);
      }
      /* *INDENT-ON* */
    }

  return 0;
}

/* Certificate store */

static app_cert_key_pair_t *
app_cert_key_pair_alloc ()
{
  app_cert_key_pair_t *ckpair;
  pool_get (app_main.cert_key_pair_store, ckpair);
  clib_memset (ckpair, 0, sizeof (*ckpair));
  ckpair->cert_key_index = ckpair - app_main.cert_key_pair_store;
  return ckpair;
}

app_cert_key_pair_t *
app_cert_key_pair_get_if_valid (u32 index)
{
  if (pool_is_free_index (app_main.cert_key_pair_store, index))
    return 0;
  return app_cert_key_pair_get (index);
}

app_cert_key_pair_t *
app_cert_key_pair_get (u32 index)
{
  return pool_elt_at_index (app_main.cert_key_pair_store, index);
}

app_cert_key_pair_t *
app_cert_key_pair_get_default ()
{
  /* To maintain legacy bapi */
  return app_cert_key_pair_get (0);
}

int
vnet_app_add_cert_key_pair (vnet_app_add_cert_key_pair_args_t * a)
{
  app_cert_key_pair_t *ckpair = app_cert_key_pair_alloc ();
  vec_validate (ckpair->cert, a->cert_len - 1);
  clib_memcpy_fast (ckpair->cert, a->cert, a->cert_len);
  vec_validate (ckpair->key, a->key_len - 1);
  clib_memcpy_fast (ckpair->key, a->key, a->key_len);
  a->index = ckpair->cert_key_index;
  return 0;
}

int
vnet_app_add_cert_key_interest (u32 index, u32 app_index)
{
  app_cert_key_pair_t *ckpair;
  if (!(ckpair = app_cert_key_pair_get_if_valid (index)))
    return -1;
  if (vec_search (ckpair->app_interests, app_index) != ~0)
    vec_add1 (ckpair->app_interests, app_index);
  return 0;
}

int
vnet_app_del_cert_key_pair (u32 index)
{
  app_cert_key_pair_t *ckpair;
  application_t *app;
  u32 *app_index;

  if (!(ckpair = app_cert_key_pair_get_if_valid (index)))
    return (VNET_API_ERROR_INVALID_VALUE);

  vec_foreach (app_index, ckpair->app_interests)
  {
    if ((app = application_get_if_valid (*app_index))
	&& app->cb_fns.app_cert_key_pair_delete_callback)
      app->cb_fns.app_cert_key_pair_delete_callback (ckpair);
  }

  vec_free (ckpair->cert);
  vec_free (ckpair->key);
  pool_put (app_main.cert_key_pair_store, ckpair);
  return 0;
}

clib_error_t *
application_init (vlib_main_t * vm)
{
  /* Index 0 was originally used by legacy apis, maintain as invalid */
  (void) app_cert_key_pair_alloc ();
  app_main.last_crypto_engine = CRYPTO_ENGINE_LAST;
  app_main.app_by_name = hash_create_vec (0, sizeof (u8), sizeof (uword));
  return 0;
}

/* *INDENT-OFF* */
VLIB_INIT_FUNCTION (application_init);

VLIB_CLI_COMMAND (show_app_command, static) =
{
  .path = "show app",
  .short_help = "show app [app_id] [server|client] [mq] [verbose]",
  .function = show_app_command_fn,
};

VLIB_CLI_COMMAND (show_certificate_command, static) =
{
  .path = "show app certificate",
  .short_help = "list app certs and keys present in store",
  .function = show_certificate_command_fn,
};
/* *INDENT-ON* */

crypto_engine_type_t
app_crypto_engine_type_add (void)
{
  return (++app_main.last_crypto_engine);
}

u8
app_crypto_engine_n_types (void)
{
  return (app_main.last_crypto_engine + 1);
}

/*
 * fd.io coding-style-patch-verification: ON
 *
 * Local Variables:
 * eval: (c-set-style "gnu")
 * End:
 */