aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorFilip Tehlar <ftehlar@cisco.com>2023-10-30 08:21:36 +0100
committerFlorin Coras <florin.coras@gmail.com>2024-01-02 12:07:04 +0000
commitb1ea30e5639adb4290df2bfb493729cc0d5f3b70 (patch)
treea8baeea895dddc2482523698086bb08582854ec5 /src
parentb7e66f4a30bbde62595615f72a386383d931eeae (diff)
http: unify client/server state machines
Type: improvement Change-Id: I57a816fbed8b681dec201edc8d5950a34a555a2b Signed-off-by: Filip Tehlar <ftehlar@cisco.com>
Diffstat (limited to 'src')
-rw-r--r--src/plugins/hs_apps/http_client_cli.c15
-rw-r--r--src/plugins/http/http.c656
-rw-r--r--src/plugins/http/http.h21
3 files changed, 363 insertions, 329 deletions
diff --git a/src/plugins/hs_apps/http_client_cli.c b/src/plugins/hs_apps/http_client_cli.c
index 113bc2ef750..f44d4e1bcd1 100644
--- a/src/plugins/hs_apps/http_client_cli.c
+++ b/src/plugins/hs_apps/http_client_cli.c
@@ -19,6 +19,14 @@
#include <http/http.h>
#include <hs_apps/http_cli.h>
+#define HCC_DEBUG 0
+
+#if HCC_DEBUG
+#define HCC_DBG(_fmt, _args...) clib_warning (_fmt, ##_args)
+#else
+#define HCC_DBG(_fmt, _args...)
+#endif
+
typedef struct
{
CLIB_CACHE_LINE_ALIGN_MARK (cacheline0);
@@ -122,6 +130,8 @@ hcc_ts_connected_callback (u32 app_index, u32 hc_index, session_t *as,
http_msg_t msg;
int rv;
+ HCC_DBG ("hc_index: %d", hc_index);
+
if (err)
{
clib_warning ("connected error: hc_index(%d): %U", hc_index,
@@ -207,7 +217,7 @@ hcc_ts_rx_callback (session_t *ts)
return 0;
}
- if (!hs->to_recv)
+ if (hs->to_recv == 0)
{
rv = svm_fifo_dequeue (ts->rx_fifo, sizeof (msg), (u8 *) &msg);
ASSERT (rv == sizeof (msg));
@@ -229,7 +239,7 @@ hcc_ts_rx_callback (session_t *ts)
rv = svm_fifo_dequeue (ts->rx_fifo, n_deq, hcm->http_response + curr);
if (rv < 0)
{
- clib_warning ("app dequeue failed");
+ clib_warning ("app dequeue(n=%d) failed; rv = %d", n_deq, rv);
return -1;
}
@@ -239,6 +249,7 @@ hcc_ts_rx_callback (session_t *ts)
vec_set_len (hcm->http_response, curr + n_deq);
ASSERT (hs->to_recv >= rv);
hs->to_recv -= rv;
+ HCC_DBG ("app rcvd %d, remains %d", rv, hs->to_recv);
if (hs->to_recv == 0)
{
diff --git a/src/plugins/http/http.c b/src/plugins/http/http.c
index d404e734d48..4797308666e 100644
--- a/src/plugins/http/http.c
+++ b/src/plugins/http/http.c
@@ -47,6 +47,42 @@ const http_buffer_type_t msg_to_buf_type[] = {
[HTTP_MSG_DATA_PTR] = HTTP_BUFFER_PTR,
};
+u8 *
+format_http_state (u8 *s, va_list *va)
+{
+ http_state_t state = va_arg (*va, http_state_t);
+
+ switch (state)
+ {
+ case HTTP_STATE_IDLE:
+ return format (s, "idle");
+ case HTTP_STATE_WAIT_APP_METHOD:
+ return format (s, "wait app method");
+ case HTTP_STATE_WAIT_SERVER_REPLY:
+ return format (s, "wait server reply");
+ case HTTP_STATE_CLIENT_IO_MORE_DATA:
+ return format (s, "client io more data");
+ case HTTP_STATE_WAIT_CLIENT_METHOD:
+ return format (s, "wait client method");
+ case HTTP_STATE_WAIT_APP_REPLY:
+ return format (s, "wait app reply");
+ case HTTP_STATE_APP_IO_MORE_DATA:
+ return format (s, "app io more data");
+ default:
+ break;
+ }
+ return format (s, "unknown");
+}
+
+static inline void
+http_state_change (http_conn_t *hc, http_state_t state)
+{
+ HTTP_DBG (1, "changing http state %U -> %U", format_http_state,
+ hc->http_state, format_http_state, state);
+ ASSERT (hc->http_state != state);
+ hc->http_state = state;
+}
+
static inline http_worker_t *
http_worker_get (u32 thread_index)
{
@@ -164,7 +200,7 @@ http_ts_accept_callback (session_t *ts)
hc->c_flags |= TRANSPORT_CONNECTION_F_NO_LOOKUP;
hc->state = HTTP_CONN_STATE_ESTABLISHED;
- hc->http_state = HTTP_STATE_WAIT_METHOD;
+ http_state_change (hc, HTTP_STATE_WAIT_CLIENT_METHOD);
ts->session_state = SESSION_STATE_READY;
ts->opaque = hc_index;
@@ -249,7 +285,7 @@ http_ts_connected_callback (u32 http_app_index, u32 ho_hc_index, session_t *ts,
hc->c_c_index = new_hc_index;
hc->c_flags |= TRANSPORT_CONNECTION_F_NO_LOOKUP;
hc->state = HTTP_CONN_STATE_ESTABLISHED;
- hc->http_state = HTTP_STATE_WAIT_APP;
+ http_state_change (hc, HTTP_STATE_WAIT_APP_METHOD);
ts->session_state = SESSION_STATE_READY;
ts->opaque = new_hc_index;
@@ -265,6 +301,9 @@ http_ts_connected_callback (u32 http_app_index, u32 ho_hc_index, session_t *ts,
as->session_type = session_type_from_proto_and_ip (
TRANSPORT_PROTO_HTTP, session_type_is_ip4 (ts->session_type));
+ HTTP_DBG (1, "half-open hc index %d, hc index %d", ho_hc_index,
+ new_hc_index);
+
app_wrk = app_worker_get (hc->h_pa_wrk_index);
if (!app_wrk)
{
@@ -309,7 +348,7 @@ http_ts_reset_callback (session_t *ts)
hc->state = HTTP_CONN_STATE_CLOSED;
http_buffer_free (&hc->tx_buf);
- hc->http_state = HTTP_STATE_WAIT_METHOD;
+ http_state_change (hc, HTTP_STATE_WAIT_CLIENT_METHOD);
session_transport_reset_notify (&hc->connection);
http_disconnect_transport (hc);
@@ -342,7 +381,7 @@ static const char *http_request_template = "GET %s HTTP/1.1\r\n"
"Accept: */*\r\n";
static u32
-send_data (http_conn_t *hc, u8 *data, u32 length, u32 offset)
+http_send_data (http_conn_t *hc, u8 *data, u32 length, u32 offset)
{
const u32 max_burst = 64 << 10;
session_t *ts;
@@ -364,7 +403,7 @@ send_data (http_conn_t *hc, u8 *data, u32 length, u32 offset)
}
static void
-send_error (http_conn_t *hc, http_status_code_t ec)
+http_send_error (http_conn_t *hc, http_status_code_t ec)
{
http_main_t *hm = &http_main;
u8 *data;
@@ -376,12 +415,12 @@ send_error (http_conn_t *hc, http_status_code_t ec)
now = clib_timebase_now (&hm->timebase);
data = format (0, http_error_template, http_status_code_str[ec],
format_clib_timebase_time, now);
- send_data (hc, data, vec_len (data), 0);
+ http_send_data (hc, data, vec_len (data), 0);
vec_free (data);
}
static int
-read_http_message (http_conn_t *hc)
+http_read_message (http_conn_t *hc)
{
u32 max_deq, cursize;
session_t *ts;
@@ -426,11 +465,148 @@ v_find_index (u8 *vec, u32 offset, char *str)
return -1;
}
-/**
- * waiting for request method from peer - parse request method and data
- */
+static int
+http_parse_header (http_conn_t *hc, int *content_length)
+{
+ unformat_input_t input;
+ int i, len;
+ u8 *line;
+
+ i = v_find_index (hc->rx_buf, hc->rx_buf_offset, CONTENT_LEN_STR);
+ if (i < 0)
+ {
+ clib_warning ("cannot find '%s' in the header!", CONTENT_LEN_STR);
+ return -1;
+ }
+
+ hc->rx_buf_offset = i;
+
+ i = v_find_index (hc->rx_buf, hc->rx_buf_offset, "\n");
+ if (i < 0)
+ {
+ clib_warning ("end of line missing; incomplete data");
+ return -1;
+ }
+
+ len = i - hc->rx_buf_offset;
+ line = vec_new (u8, len);
+ clib_memcpy (line, hc->rx_buf + hc->rx_buf_offset, len);
+
+ unformat_init_vector (&input, line);
+ if (!unformat (&input, CONTENT_LEN_STR "%d", content_length))
+ {
+ clib_warning ("failed to unformat content length!");
+ return -1;
+ }
+ unformat_free (&input);
+
+ /* skip rest of the header */
+ hc->rx_buf_offset += len;
+ i = v_find_index (hc->rx_buf, hc->rx_buf_offset, "<html>");
+ if (i < 0)
+ {
+ clib_warning ("<html> tag not found");
+ return -1;
+ }
+ hc->rx_buf_offset = i;
+
+ return 0;
+}
+
static http_sm_result_t
-state_srv_wait_method (http_conn_t *hc, transport_send_params_t *sp)
+http_state_wait_server_reply (http_conn_t *hc, transport_send_params_t *sp)
+{
+ int i, rv, content_length;
+ http_msg_t msg = {};
+ app_worker_t *app_wrk;
+ session_t *as;
+ http_status_code_t ec;
+
+ rv = http_read_message (hc);
+
+ /* Nothing yet, wait for data or timer expire */
+ if (rv)
+ return HTTP_SM_STOP;
+
+ if (vec_len (hc->rx_buf) < 8)
+ {
+ ec = HTTP_STATUS_BAD_REQUEST;
+ goto error;
+ }
+
+ if ((i = v_find_index (hc->rx_buf, 0, "200 OK")) >= 0)
+ {
+ msg.type = HTTP_MSG_REPLY;
+ msg.content_type = HTTP_CONTENT_TEXT_HTML;
+ msg.code = HTTP_STATUS_OK;
+ msg.data.type = HTTP_MSG_DATA_INLINE;
+ msg.data.len = 0;
+
+ rv = http_parse_header (hc, &content_length);
+ if (rv)
+ {
+ clib_warning ("failed to parse http reply");
+ session_transport_closing_notify (&hc->connection);
+ http_disconnect_transport (hc);
+ return -1;
+ }
+ msg.data.len = content_length;
+ u32 dlen = vec_len (hc->rx_buf) - hc->rx_buf_offset;
+ as = session_get_from_handle (hc->h_pa_session_handle);
+ svm_fifo_seg_t segs[2] = { { (u8 *) &msg, sizeof (msg) },
+ { &hc->rx_buf[hc->rx_buf_offset], dlen } };
+
+ rv = svm_fifo_enqueue_segments (as->rx_fifo, segs, 2,
+ 0 /* allow partial */);
+ if (rv < 0)
+ {
+ clib_warning ("error enqueue");
+ return HTTP_SM_ERROR;
+ }
+
+ hc->rx_buf_offset += dlen;
+ hc->to_recv = content_length - dlen;
+
+ if (hc->rx_buf_offset == vec_len (hc->rx_buf))
+ {
+ vec_reset_length (hc->rx_buf);
+ hc->rx_buf_offset = 0;
+ }
+
+ if (hc->to_recv == 0)
+ {
+ hc->rx_buf_offset = 0;
+ vec_reset_length (hc->rx_buf);
+ http_state_change (hc, HTTP_STATE_WAIT_CLIENT_METHOD);
+ }
+ else
+ {
+ http_state_change (hc, HTTP_STATE_CLIENT_IO_MORE_DATA);
+ }
+
+ app_wrk = app_worker_get_if_valid (as->app_wrk_index);
+ app_worker_rx_notify (app_wrk, as);
+ return HTTP_SM_STOP;
+ }
+ else
+ {
+ HTTP_DBG (0, "Unknown http method %v", hc->rx_buf);
+ ec = HTTP_STATUS_METHOD_NOT_ALLOWED;
+ goto error;
+ }
+ return HTTP_SM_STOP;
+
+error:
+
+ http_send_error (hc, ec);
+ session_transport_closing_notify (&hc->connection);
+ http_disconnect_transport (hc);
+
+ return HTTP_SM_ERROR;
+}
+
+static http_sm_result_t
+http_state_wait_client_method (http_conn_t *hc, transport_send_params_t *sp)
{
http_status_code_t ec;
app_worker_t *app_wrk;
@@ -440,7 +616,7 @@ state_srv_wait_method (http_conn_t *hc, transport_send_params_t *sp)
u32 len;
u8 *buf;
- rv = read_http_message (hc);
+ rv = http_read_message (hc);
/* Nothing yet, wait for data or timer expire */
if (rv)
@@ -464,6 +640,7 @@ state_srv_wait_method (http_conn_t *hc, transport_send_params_t *sp)
goto error;
}
+ HTTP_DBG (0, "GET method %v", hc->rx_buf);
len = i - hc->rx_buf_offset - 1;
}
else if ((i = v_find_index (hc->rx_buf, 0, "POST ")) >= 0)
@@ -471,10 +648,11 @@ state_srv_wait_method (http_conn_t *hc, transport_send_params_t *sp)
hc->method = HTTP_REQ_POST;
hc->rx_buf_offset = i + 6;
len = vec_len (hc->rx_buf) - hc->rx_buf_offset - 1;
+ HTTP_DBG (0, "POST method %v", hc->rx_buf);
}
else
{
- HTTP_DBG (0, "Unknown http method");
+ HTTP_DBG (0, "Unknown http method %v", hc->rx_buf);
ec = HTTP_STATUS_METHOD_NOT_ALLOWED;
goto error;
}
@@ -501,7 +679,7 @@ state_srv_wait_method (http_conn_t *hc, transport_send_params_t *sp)
}
vec_free (hc->rx_buf);
- hc->http_state = HTTP_STATE_WAIT_APP;
+ http_state_change (hc, HTTP_STATE_WAIT_APP_REPLY);
app_wrk = app_worker_get_if_valid (as->app_wrk_index);
if (app_wrk)
@@ -511,26 +689,23 @@ state_srv_wait_method (http_conn_t *hc, transport_send_params_t *sp)
error:
- send_error (hc, ec);
+ http_send_error (hc, ec);
session_transport_closing_notify (&hc->connection);
http_disconnect_transport (hc);
return HTTP_SM_ERROR;
}
-/**
- * waiting for data from app
- */
static http_sm_result_t
-state_srv_wait_app (http_conn_t *hc, transport_send_params_t *sp)
+http_state_wait_app_reply (http_conn_t *hc, transport_send_params_t *sp)
{
http_main_t *hm = &http_main;
- http_status_code_t ec;
- http_msg_t msg;
- session_t *as;
u8 *header;
u32 offset;
f64 now;
+ session_t *as;
+ http_status_code_t sc;
+ http_msg_t msg;
int rv;
as = session_get_from_handle (hc->h_pa_session_handle);
@@ -538,21 +713,17 @@ state_srv_wait_app (http_conn_t *hc, transport_send_params_t *sp)
rv = svm_fifo_dequeue (as->tx_fifo, sizeof (msg), (u8 *) &msg);
ASSERT (rv == sizeof (msg));
- if (msg.type != HTTP_MSG_REPLY || msg.data.type > HTTP_MSG_DATA_PTR)
+ if (msg.data.type > HTTP_MSG_DATA_PTR)
{
- clib_warning ("unexpected msg type from app %u", msg.type);
- ec = HTTP_STATUS_INTERNAL_ERROR;
+ clib_warning ("no data");
+ sc = HTTP_STATUS_INTERNAL_ERROR;
goto error;
}
- ec = msg.code;
-
- switch (msg.code)
+ if (msg.type != HTTP_MSG_REPLY)
{
- case HTTP_STATUS_OK:
- case HTTP_STATUS_MOVED:
- break;
- default:
+ clib_warning ("unexpected message type %d", msg.type);
+ sc = HTTP_STATUS_INTERNAL_ERROR;
goto error;
}
@@ -588,221 +759,111 @@ state_srv_wait_app (http_conn_t *hc, transport_send_params_t *sp)
/* Location: http(s)://new-place already queued up as data */
break;
default:
- goto error;
+ return HTTP_SM_ERROR;
}
- offset = send_data (hc, header, vec_len (header), 0);
+ offset = http_send_data (hc, header, vec_len (header), 0);
if (offset != vec_len (header))
{
clib_warning ("couldn't send response header!");
- ec = HTTP_STATUS_INTERNAL_ERROR;
+ sc = HTTP_STATUS_INTERNAL_ERROR;
goto error;
}
vec_free (header);
/* Start sending the actual data */
- hc->http_state = HTTP_STATE_IO_MORE_DATA;
+ http_state_change (hc, HTTP_STATE_APP_IO_MORE_DATA);
ASSERT (sp->max_burst_size >= offset);
sp->max_burst_size -= offset;
-
return HTTP_SM_CONTINUE;
error:
-
- send_error (hc, ec);
- hc->http_state = HTTP_STATE_WAIT_METHOD;
+ clib_warning ("unexpected msg type from app %u", msg.type);
+ http_send_error (hc, sc);
+ http_state_change (hc, HTTP_STATE_WAIT_CLIENT_METHOD);
session_transport_closing_notify (&hc->connection);
http_disconnect_transport (hc);
-
return HTTP_SM_STOP;
}
static http_sm_result_t
-state_srv_send_more_data (http_conn_t *hc, transport_send_params_t *sp)
+http_state_wait_app_method (http_conn_t *hc, transport_send_params_t *sp)
{
- u32 max_send = 64 << 10, n_segs;
- http_buffer_t *hb = &hc->tx_buf;
- svm_fifo_seg_t *seg;
- session_t *ts;
- int sent = 0;
-
- max_send = clib_min (max_send, sp->max_burst_size);
- ts = session_get_from_handle (hc->h_tc_session_handle);
- if ((seg = http_buffer_get_segs (hb, max_send, &n_segs)))
- sent = svm_fifo_enqueue_segments (ts->tx_fifo, seg, n_segs,
- 1 /* allow partial */);
-
- if (sent > 0)
- {
- /* Ask scheduler to notify app of deq event if needed */
- sp->bytes_dequeued += http_buffer_drain (hb, sent);
- sp->max_burst_size -= sent;
- }
-
- /* Not finished sending all data */
- if (!http_buffer_is_drained (hb))
- {
- if (sent && svm_fifo_set_event (ts->tx_fifo))
- session_send_io_evt_to_thread (ts->tx_fifo, SESSION_IO_EVT_TX);
-
- if (svm_fifo_max_enqueue (ts->tx_fifo) < HTTP_FIFO_THRESH)
- {
- /* Deschedule http session and wait for deq notification if
- * underlying ts tx fifo almost full */
- svm_fifo_add_want_deq_ntf (ts->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
- transport_connection_deschedule (&hc->connection);
- sp->flags |= TRANSPORT_SND_F_DESCHED;
- }
- }
- else
- {
- if (sent && svm_fifo_set_event (ts->tx_fifo))
- session_send_io_evt_to_thread (ts->tx_fifo, SESSION_IO_EVT_TX_FLUSH);
-
- /* Finished transaction, back to HTTP_STATE_WAIT_METHOD */
- hc->http_state = HTTP_STATE_WAIT_METHOD;
- http_buffer_free (&hc->tx_buf);
- }
-
- return HTTP_SM_STOP;
-}
-
-static int
-parse_http_header (http_conn_t *hc, int *content_length)
-{
- unformat_input_t input;
- int i, len;
- u8 *line;
-
- if ((i = v_find_index (hc->rx_buf, hc->rx_buf_offset, "200 OK") < 0))
- {
- clib_warning ("bad response code");
- return -1;
- }
-
- i = v_find_index (hc->rx_buf, hc->rx_buf_offset, CONTENT_LEN_STR);
- if (i < 0)
- {
- clib_warning ("cannot find '%s' in the header!", CONTENT_LEN_STR);
- return -1;
- }
-
- hc->rx_buf_offset = i;
+ http_status_code_t sc;
+ http_msg_t msg;
+ session_t *as;
+ u8 *buf = 0, *request;
+ u32 offset;
+ int rv;
- i = v_find_index (hc->rx_buf, hc->rx_buf_offset, "\n");
- if (i < 0)
- {
- clib_warning ("end of line missing; incomplete data");
- return -1;
- }
+ as = session_get_from_handle (hc->h_pa_session_handle);
- len = i - hc->rx_buf_offset;
- line = vec_new (u8, len);
- clib_memcpy (line, hc->rx_buf + hc->rx_buf_offset, len);
+ rv = svm_fifo_dequeue (as->tx_fifo, sizeof (msg), (u8 *) &msg);
+ ASSERT (rv == sizeof (msg));
- unformat_init_vector (&input, line);
- if (!unformat (&input, CONTENT_LEN_STR "%d", content_length))
+ if (msg.data.type > HTTP_MSG_DATA_PTR)
{
- clib_warning ("failed to unformat content length!");
- return -1;
+ clib_warning ("no data");
+ sc = HTTP_STATUS_INTERNAL_ERROR;
+ goto error;
}
- unformat_free (&input);
- /* skip rest of the header */
- hc->rx_buf_offset += len;
- i = v_find_index (hc->rx_buf, hc->rx_buf_offset, "<html>");
- if (i < 0)
+ if (msg.type != HTTP_MSG_REQUEST)
{
- clib_warning ("<html> tag not found");
- return -1;
+ clib_warning ("unexpected message type %d", msg.type);
+ sc = HTTP_STATUS_INTERNAL_ERROR;
+ goto error;
}
- hc->rx_buf_offset = i;
- return 0;
-}
-
-static int
-state_cln_wait_method (http_conn_t *hc, transport_send_params_t *sp)
-{
- session_t *as;
- http_msg_t msg;
- app_worker_t *app_wrk;
- int rv, content_length;
-
- rv = read_http_message (hc);
- if (rv)
- return HTTP_SM_STOP;
+ sc = msg.code;
- msg.type = HTTP_MSG_REPLY;
- msg.content_type = HTTP_CONTENT_TEXT_HTML;
- msg.code = HTTP_STATUS_OK;
- msg.data.type = HTTP_MSG_DATA_INLINE;
- msg.data.len = 0;
-
- rv = parse_http_header (hc, &content_length);
- if (rv)
- {
- clib_warning ("failed to parse http reply");
- session_transport_closing_notify (&hc->connection);
- http_disconnect_transport (hc);
- return -1;
- }
-
- msg.data.len = content_length;
- u32 dlen = vec_len (hc->rx_buf) - hc->rx_buf_offset;
- as = session_get_from_handle (hc->h_pa_session_handle);
- svm_fifo_seg_t segs[2] = { { (u8 *) &msg, sizeof (msg) },
- { &hc->rx_buf[hc->rx_buf_offset], dlen } };
+ vec_validate (buf, msg.data.len - 1);
+ rv = svm_fifo_dequeue (as->tx_fifo, msg.data.len, buf);
+ ASSERT (rv == msg.data.len);
- rv = svm_fifo_enqueue_segments (as->rx_fifo, segs, 2, 0 /* allow partial */);
- if (rv < 0)
+ request = format (0, http_request_template, buf);
+ offset = http_send_data (hc, request, vec_len (request), 0);
+ if (offset != vec_len (request))
{
- clib_warning ("error enqueue");
- return HTTP_SM_ERROR;
+ clib_warning ("sending request failed!");
+ sc = HTTP_STATUS_INTERNAL_ERROR;
+ goto error;
}
- hc->rx_buf_offset += dlen;
- hc->http_state = HTTP_STATE_IO_MORE_DATA;
- hc->to_recv = content_length - dlen;
- if (hc->rx_buf_offset == vec_len (hc->rx_buf))
- {
- vec_reset_length (hc->rx_buf);
- hc->rx_buf_offset = 0;
- }
+ http_state_change (hc, HTTP_STATE_WAIT_SERVER_REPLY);
- if (hc->to_recv == 0)
- {
- hc->rx_buf_offset = 0;
- vec_reset_length (hc->rx_buf);
- hc->http_state = HTTP_STATE_WAIT_APP;
- }
+ vec_free (buf);
+ vec_free (request);
- app_wrk = app_worker_get_if_valid (as->app_wrk_index);
- if (app_wrk)
- app_worker_rx_notify (app_wrk, as);
+ return HTTP_SM_CONTINUE;
+error:
+ clib_warning ("unexpected msg type from app %u", msg.type);
+ http_send_error (hc, sc);
+ session_transport_closing_notify (&hc->connection);
+ http_disconnect_transport (hc);
return HTTP_SM_STOP;
}
-static int
-cln_drain_rx_buf (http_conn_t *hc, session_t *ts, session_t *as)
+static void
+http_app_enqueue (http_conn_t *hc, session_t *as)
{
app_worker_t *app_wrk;
- u32 max_enq, n_enq, dlen = vec_len (hc->rx_buf) - hc->rx_buf_offset;
+ u32 dlen, max_enq, n_enq;
int rv;
+ dlen = vec_len (hc->rx_buf) - hc->rx_buf_offset;
+ if (!dlen)
+ return;
+
max_enq = svm_fifo_max_enqueue (as->rx_fifo);
n_enq = clib_min (max_enq, dlen);
rv = svm_fifo_enqueue (as->rx_fifo, n_enq, &hc->rx_buf[hc->rx_buf_offset]);
if (rv < 0)
- {
- clib_warning ("enqueue failed");
- return -1;
- }
+ return;
hc->rx_buf_offset += rv;
-
if (hc->rx_buf_offset >= vec_len (hc->rx_buf))
{
vec_reset_length (hc->rx_buf);
@@ -811,149 +872,124 @@ cln_drain_rx_buf (http_conn_t *hc, session_t *ts, session_t *as)
app_wrk = app_worker_get_if_valid (as->app_wrk_index);
ASSERT (app_wrk);
-
app_worker_rx_notify (app_wrk, as);
- return 1;
}
static http_sm_result_t
-state_cln_recv_more_data (http_conn_t *hc, transport_send_params_t *sp)
+http_state_client_io_more_data (http_conn_t *hc, transport_send_params_t *sp)
{
- session_t *as;
+ session_t *as, *ts;
u32 max_deq;
- session_t *ts;
- int n_read, rv;
+ int n_read;
as = session_get_from_handle (hc->h_pa_session_handle);
ts = session_get_from_handle (hc->h_tc_session_handle);
- u32 dlen = vec_len (hc->rx_buf) - hc->rx_buf_offset;
- if (dlen)
- {
- rv = cln_drain_rx_buf (hc, ts, as);
- if (rv < 0)
- {
- clib_warning ("drain rx error!");
- return HTTP_SM_ERROR;
- }
- goto maybe_reschedule;
- }
+ http_app_enqueue (hc, as);
if (hc->to_recv == 0)
{
- ASSERT (vec_len (hc->rx_buf) == 0);
- ASSERT (hc->rx_buf_offset == 0);
- hc->http_state = HTTP_STATE_WAIT_APP;
+ http_state_change (hc, HTTP_STATE_WAIT_CLIENT_METHOD);
return HTTP_SM_STOP;
}
max_deq = svm_fifo_max_dequeue (ts->rx_fifo);
- if (max_deq == 0)
- return HTTP_SM_STOP;
-
- ASSERT (vec_len (hc->rx_buf) == 0);
- ASSERT (hc->rx_buf_offset == 0);
+ if (max_deq > 0)
+ {
+ vec_validate (hc->rx_buf, max_deq - 1);
+ n_read = svm_fifo_dequeue (ts->rx_fifo, max_deq, hc->rx_buf);
+ ASSERT (n_read == max_deq);
- vec_validate (hc->rx_buf, max_deq - 1);
- n_read = svm_fifo_dequeue (ts->rx_fifo, max_deq, hc->rx_buf);
- ASSERT (n_read == max_deq);
+ if (svm_fifo_is_empty (ts->rx_fifo))
+ svm_fifo_unset_event (ts->rx_fifo);
- if (svm_fifo_is_empty (ts->rx_fifo))
- svm_fifo_unset_event (ts->rx_fifo);
-
- hc->to_recv -= n_read;
- vec_set_len (hc->rx_buf, max_deq);
+ hc->to_recv -= n_read;
+ vec_set_len (hc->rx_buf, n_read);
+ }
-maybe_reschedule:
if (hc->rx_buf_offset < vec_len (hc->rx_buf) ||
svm_fifo_max_dequeue_cons (ts->rx_fifo))
{
- /* TODO is the flag really needed? */
- if (svm_fifo_set_event (ts->rx_fifo))
session_enqueue_notify (ts);
}
return HTTP_SM_CONTINUE;
}
static http_sm_result_t
-state_cln_wait_app (http_conn_t *hc, transport_send_params_t *sp)
+http_state_app_io_more_data (http_conn_t *hc, transport_send_params_t *sp)
{
- session_t *as;
- http_msg_t msg;
- http_status_code_t ec;
- u8 *buf = 0, *request;
- u32 offset;
- int rv;
-
- as = session_get_from_handle (hc->h_pa_session_handle);
- rv = svm_fifo_dequeue (as->tx_fifo, sizeof (msg), (u8 *) &msg);
- ASSERT (rv == sizeof (msg));
- if (msg.type != HTTP_MSG_REQUEST || msg.data.type > HTTP_MSG_DATA_PTR)
- {
- clib_warning ("unexpected msg type from app %u", msg.type);
- ec = HTTP_STATUS_INTERNAL_ERROR;
- goto error;
- }
+ u32 max_send = 64 << 10, n_segs;
+ http_buffer_t *hb = &hc->tx_buf;
+ svm_fifo_seg_t *seg;
+ session_t *ts;
+ int sent = 0;
- vec_validate (buf, msg.data.len - 1);
- rv = svm_fifo_dequeue (as->tx_fifo, msg.data.len, buf);
- ASSERT (rv == msg.data.len);
+ max_send = clib_min (max_send, sp->max_burst_size);
+ ts = session_get_from_handle (hc->h_tc_session_handle);
+ if ((seg = http_buffer_get_segs (hb, max_send, &n_segs)))
+ sent = svm_fifo_enqueue_segments (ts->tx_fifo, seg, n_segs,
+ 1 /* allow partial */);
- request = format (0, http_request_template, buf);
- offset = send_data (hc, request, vec_len (request), 0);
- if (offset != vec_len (request))
+ if (sent > 0)
{
- clib_warning ("sending request failed!");
- ec = HTTP_STATUS_INTERNAL_ERROR;
- goto error;
+ /* Ask scheduler to notify app of deq event if needed */
+ sp->bytes_dequeued += http_buffer_drain (hb, sent);
+ sp->max_burst_size -= sent;
}
- hc->http_state = HTTP_STATE_WAIT_METHOD;
+ /* Not finished sending all data */
+ if (!http_buffer_is_drained (hb))
+ {
+ if (sent && svm_fifo_set_event (ts->tx_fifo))
+ session_send_io_evt_to_thread (ts->tx_fifo, SESSION_IO_EVT_TX);
- vec_free (buf);
- vec_free (request);
+ if (svm_fifo_max_enqueue (ts->tx_fifo) < HTTP_FIFO_THRESH)
+ {
+ /* Deschedule http session and wait for deq notification if
+ * underlying ts tx fifo almost full */
+ svm_fifo_add_want_deq_ntf (ts->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
+ transport_connection_deschedule (&hc->connection);
+ sp->flags |= TRANSPORT_SND_F_DESCHED;
+ }
+ }
+ else
+ {
+ if (sent && svm_fifo_set_event (ts->tx_fifo))
+ session_send_io_evt_to_thread (ts->tx_fifo, SESSION_IO_EVT_TX_FLUSH);
- return HTTP_SM_CONTINUE;
+ /* Finished transaction, back to HTTP_STATE_WAIT_METHOD */
+ http_state_change (hc, HTTP_STATE_WAIT_CLIENT_METHOD);
+ http_buffer_free (&hc->tx_buf);
+ }
-error:
- send_error (hc, ec);
- session_transport_closing_notify (&hc->connection);
- http_disconnect_transport (hc);
return HTTP_SM_STOP;
}
typedef http_sm_result_t (*http_sm_handler) (http_conn_t *,
transport_send_params_t *sp);
-static http_sm_handler srv_state_funcs[HTTP_N_STATES] = {
- /* Waiting for GET, POST, etc. */
- state_srv_wait_method,
- /* Wait for data from app */
- state_srv_wait_app,
- /* Send more data */
- state_srv_send_more_data,
-};
-
-static http_sm_handler cln_state_funcs[HTTP_N_STATES] = {
- /* wait for reply */
- state_cln_wait_method,
- /* wait for data from app */
- state_cln_wait_app,
- /* receive more data */
- state_cln_recv_more_data,
+static http_sm_handler state_funcs[HTTP_N_STATES] = {
+ 0, /* idle state */
+ http_state_wait_app_method,
+ http_state_wait_client_method,
+ http_state_wait_server_reply,
+ http_state_wait_app_reply,
+ http_state_client_io_more_data,
+ http_state_app_io_more_data,
};
static void
http_req_run_state_machine (http_conn_t *hc, transport_send_params_t *sp)
{
http_sm_result_t res;
- http_sm_handler *state_fn =
- hc->is_client ? cln_state_funcs : srv_state_funcs;
do
{
- res = state_fn[hc->http_state](hc, sp);
+ res = state_funcs[hc->http_state](hc, sp);
if (res == HTTP_SM_ERROR)
- return;
+ {
+ HTTP_DBG (1, "error in state machine %d", res);
+ return;
+ }
}
while (res == HTTP_SM_CONTINUE);
@@ -962,33 +998,15 @@ http_req_run_state_machine (http_conn_t *hc, transport_send_params_t *sp)
}
static int
-http_ts_server_rx_callback (session_t *ts, http_conn_t *hc)
+http_ts_rx_callback (session_t *ts)
{
- if (hc->http_state != HTTP_STATE_WAIT_METHOD)
- {
- clib_warning ("tcp data in req state %u", hc->http_state);
- return 0;
- }
-
- http_req_run_state_machine (hc, 0);
-
- if (hc->state == HTTP_CONN_STATE_TRANSPORT_CLOSED)
- {
- if (!svm_fifo_max_dequeue_cons (ts->rx_fifo))
- session_transport_closing_notify (&hc->connection);
- }
- return 0;
-}
+ http_conn_t *hc;
-static int
-http_ts_client_rx_callback (session_t *ts, http_conn_t *hc)
-{
- if (hc->http_state != HTTP_STATE_WAIT_METHOD &&
- hc->http_state != HTTP_STATE_IO_MORE_DATA)
+ hc = http_conn_get_w_thread (ts->opaque, ts->thread_index);
+ if (!hc)
{
- clib_warning ("http in unexpected state %d (ts %d)", hc->http_state,
- ts->session_index);
- return 0;
+ clib_warning ("http connection not found (ts %d)", ts->opaque);
+ return -1;
}
http_req_run_state_machine (hc, 0);
@@ -1001,17 +1019,6 @@ http_ts_client_rx_callback (session_t *ts, http_conn_t *hc)
return 0;
}
-static int
-http_ts_rx_callback (session_t *ts)
-{
- http_conn_t *hc;
-
- hc = http_conn_get_w_thread (ts->opaque, ts->thread_index);
- if (hc->is_client)
- return http_ts_client_rx_callback (ts, hc);
- return http_ts_server_rx_callback (ts, hc);
-}
-
int
http_ts_builtin_tx_callback (session_t *ts)
{
@@ -1142,10 +1149,11 @@ http_transport_connect (transport_endpoint_cfg_t *tep)
hc = http_conn_get_w_thread (hc_index, 0);
hc->h_pa_wrk_index = sep->app_wrk_index;
hc->h_pa_app_api_ctx = sep->opaque;
- hc->is_client = 1;
hc->state = HTTP_CONN_STATE_CONNECTING;
cargs->api_context = hc_index;
+ HTTP_DBG (1, "hc ho_index %x", hc_index);
+
if ((error = vnet_connect (cargs)))
return error;
@@ -1273,12 +1281,14 @@ http_app_tx_callback (void *session, transport_send_params_t *sp)
u32 max_burst_sz, sent;
http_conn_t *hc;
+ HTTP_DBG (1, "app session conn index %x", as->connection_index);
+
hc = http_conn_get_w_thread (as->connection_index, as->thread_index);
- if (hc->http_state < HTTP_STATE_WAIT_APP)
+ if (!http_state_is_tx_valid (hc))
{
if (hc->state != HTTP_CONN_STATE_CLOSED)
- clib_warning ("app data req state %u session state %u", hc->http_state,
- hc->state);
+ clib_warning ("app data req state '%U' session state %u",
+ format_http_state, hc->http_state, hc->state);
svm_fifo_dequeue_drop_all (as->tx_fifo);
return 0;
}
diff --git a/src/plugins/http/http.h b/src/plugins/http/http.h
index 50203dcd2f9..dbae5ac4611 100644
--- a/src/plugins/http/http.h
+++ b/src/plugins/http/http.h
@@ -61,9 +61,13 @@ typedef enum http_conn_state_
typedef enum http_state_
{
- HTTP_STATE_WAIT_METHOD,
- HTTP_STATE_WAIT_APP,
- HTTP_STATE_IO_MORE_DATA,
+ HTTP_STATE_IDLE = 0,
+ HTTP_STATE_WAIT_APP_METHOD,
+ HTTP_STATE_WAIT_CLIENT_METHOD,
+ HTTP_STATE_WAIT_SERVER_REPLY,
+ HTTP_STATE_WAIT_APP_REPLY,
+ HTTP_STATE_CLIENT_IO_MORE_DATA,
+ HTTP_STATE_APP_IO_MORE_DATA,
HTTP_N_STATES,
} http_state_t;
@@ -232,7 +236,6 @@ typedef struct http_tc_
u8 *rx_buf;
u32 rx_buf_offset;
http_buffer_t tx_buf;
- u8 is_client;
u32 to_recv;
u32 bytes_dequeued;
} http_conn_t;
@@ -263,6 +266,16 @@ typedef struct http_main_
u32 fifo_size;
} http_main_t;
+static inline int
+http_state_is_tx_valid (http_conn_t *hc)
+{
+ http_state_t state = hc->http_state;
+ return (state == HTTP_STATE_APP_IO_MORE_DATA ||
+ state == HTTP_STATE_CLIENT_IO_MORE_DATA ||
+ state == HTTP_STATE_WAIT_APP_REPLY ||
+ state == HTTP_STATE_WAIT_APP_METHOD);
+}
+
#endif /* SRC_PLUGINS_HTTP_HTTP_H_ */
/*