From b1ea30e5639adb4290df2bfb493729cc0d5f3b70 Mon Sep 17 00:00:00 2001 From: Filip Tehlar Date: Mon, 30 Oct 2023 08:21:36 +0100 Subject: http: unify client/server state machines Type: improvement Change-Id: I57a816fbed8b681dec201edc8d5950a34a555a2b Signed-off-by: Filip Tehlar --- src/plugins/hs_apps/http_client_cli.c | 15 +- src/plugins/http/http.c | 656 +++++++++++++++++----------------- src/plugins/http/http.h | 21 +- 3 files changed, 363 insertions(+), 329 deletions(-) diff --git a/src/plugins/hs_apps/http_client_cli.c b/src/plugins/hs_apps/http_client_cli.c index 113bc2ef750..f44d4e1bcd1 100644 --- a/src/plugins/hs_apps/http_client_cli.c +++ b/src/plugins/hs_apps/http_client_cli.c @@ -19,6 +19,14 @@ #include #include +#define HCC_DEBUG 0 + +#if HCC_DEBUG +#define HCC_DBG(_fmt, _args...) clib_warning (_fmt, ##_args) +#else +#define HCC_DBG(_fmt, _args...) +#endif + typedef struct { CLIB_CACHE_LINE_ALIGN_MARK (cacheline0); @@ -122,6 +130,8 @@ hcc_ts_connected_callback (u32 app_index, u32 hc_index, session_t *as, http_msg_t msg; int rv; + HCC_DBG ("hc_index: %d", hc_index); + if (err) { clib_warning ("connected error: hc_index(%d): %U", hc_index, @@ -207,7 +217,7 @@ hcc_ts_rx_callback (session_t *ts) return 0; } - if (!hs->to_recv) + if (hs->to_recv == 0) { rv = svm_fifo_dequeue (ts->rx_fifo, sizeof (msg), (u8 *) &msg); ASSERT (rv == sizeof (msg)); @@ -229,7 +239,7 @@ hcc_ts_rx_callback (session_t *ts) rv = svm_fifo_dequeue (ts->rx_fifo, n_deq, hcm->http_response + curr); if (rv < 0) { - clib_warning ("app dequeue failed"); + clib_warning ("app dequeue(n=%d) failed; rv = %d", n_deq, rv); return -1; } @@ -239,6 +249,7 @@ hcc_ts_rx_callback (session_t *ts) vec_set_len (hcm->http_response, curr + n_deq); ASSERT (hs->to_recv >= rv); hs->to_recv -= rv; + HCC_DBG ("app rcvd %d, remains %d", rv, hs->to_recv); if (hs->to_recv == 0) { diff --git a/src/plugins/http/http.c b/src/plugins/http/http.c index d404e734d48..4797308666e 100644 --- a/src/plugins/http/http.c +++ b/src/plugins/http/http.c @@ -47,6 +47,42 @@ const http_buffer_type_t msg_to_buf_type[] = { [HTTP_MSG_DATA_PTR] = HTTP_BUFFER_PTR, }; +u8 * +format_http_state (u8 *s, va_list *va) +{ + http_state_t state = va_arg (*va, http_state_t); + + switch (state) + { + case HTTP_STATE_IDLE: + return format (s, "idle"); + case HTTP_STATE_WAIT_APP_METHOD: + return format (s, "wait app method"); + case HTTP_STATE_WAIT_SERVER_REPLY: + return format (s, "wait server reply"); + case HTTP_STATE_CLIENT_IO_MORE_DATA: + return format (s, "client io more data"); + case HTTP_STATE_WAIT_CLIENT_METHOD: + return format (s, "wait client method"); + case HTTP_STATE_WAIT_APP_REPLY: + return format (s, "wait app reply"); + case HTTP_STATE_APP_IO_MORE_DATA: + return format (s, "app io more data"); + default: + break; + } + return format (s, "unknown"); +} + +static inline void +http_state_change (http_conn_t *hc, http_state_t state) +{ + HTTP_DBG (1, "changing http state %U -> %U", format_http_state, + hc->http_state, format_http_state, state); + ASSERT (hc->http_state != state); + hc->http_state = state; +} + static inline http_worker_t * http_worker_get (u32 thread_index) { @@ -164,7 +200,7 @@ http_ts_accept_callback (session_t *ts) hc->c_flags |= TRANSPORT_CONNECTION_F_NO_LOOKUP; hc->state = HTTP_CONN_STATE_ESTABLISHED; - hc->http_state = HTTP_STATE_WAIT_METHOD; + http_state_change (hc, HTTP_STATE_WAIT_CLIENT_METHOD); ts->session_state = SESSION_STATE_READY; ts->opaque = hc_index; @@ -249,7 +285,7 @@ http_ts_connected_callback (u32 http_app_index, u32 ho_hc_index, session_t *ts, hc->c_c_index = new_hc_index; hc->c_flags |= TRANSPORT_CONNECTION_F_NO_LOOKUP; hc->state = HTTP_CONN_STATE_ESTABLISHED; - hc->http_state = HTTP_STATE_WAIT_APP; + http_state_change (hc, HTTP_STATE_WAIT_APP_METHOD); ts->session_state = SESSION_STATE_READY; ts->opaque = new_hc_index; @@ -265,6 +301,9 @@ http_ts_connected_callback (u32 http_app_index, u32 ho_hc_index, session_t *ts, as->session_type = session_type_from_proto_and_ip ( TRANSPORT_PROTO_HTTP, session_type_is_ip4 (ts->session_type)); + HTTP_DBG (1, "half-open hc index %d, hc index %d", ho_hc_index, + new_hc_index); + app_wrk = app_worker_get (hc->h_pa_wrk_index); if (!app_wrk) { @@ -309,7 +348,7 @@ http_ts_reset_callback (session_t *ts) hc->state = HTTP_CONN_STATE_CLOSED; http_buffer_free (&hc->tx_buf); - hc->http_state = HTTP_STATE_WAIT_METHOD; + http_state_change (hc, HTTP_STATE_WAIT_CLIENT_METHOD); session_transport_reset_notify (&hc->connection); http_disconnect_transport (hc); @@ -342,7 +381,7 @@ static const char *http_request_template = "GET %s HTTP/1.1\r\n" "Accept: */*\r\n"; static u32 -send_data (http_conn_t *hc, u8 *data, u32 length, u32 offset) +http_send_data (http_conn_t *hc, u8 *data, u32 length, u32 offset) { const u32 max_burst = 64 << 10; session_t *ts; @@ -364,7 +403,7 @@ send_data (http_conn_t *hc, u8 *data, u32 length, u32 offset) } static void -send_error (http_conn_t *hc, http_status_code_t ec) +http_send_error (http_conn_t *hc, http_status_code_t ec) { http_main_t *hm = &http_main; u8 *data; @@ -376,12 +415,12 @@ send_error (http_conn_t *hc, http_status_code_t ec) now = clib_timebase_now (&hm->timebase); data = format (0, http_error_template, http_status_code_str[ec], format_clib_timebase_time, now); - send_data (hc, data, vec_len (data), 0); + http_send_data (hc, data, vec_len (data), 0); vec_free (data); } static int -read_http_message (http_conn_t *hc) +http_read_message (http_conn_t *hc) { u32 max_deq, cursize; session_t *ts; @@ -426,11 +465,148 @@ v_find_index (u8 *vec, u32 offset, char *str) return -1; } -/** - * waiting for request method from peer - parse request method and data - */ +static int +http_parse_header (http_conn_t *hc, int *content_length) +{ + unformat_input_t input; + int i, len; + u8 *line; + + i = v_find_index (hc->rx_buf, hc->rx_buf_offset, CONTENT_LEN_STR); + if (i < 0) + { + clib_warning ("cannot find '%s' in the header!", CONTENT_LEN_STR); + return -1; + } + + hc->rx_buf_offset = i; + + i = v_find_index (hc->rx_buf, hc->rx_buf_offset, "\n"); + if (i < 0) + { + clib_warning ("end of line missing; incomplete data"); + return -1; + } + + len = i - hc->rx_buf_offset; + line = vec_new (u8, len); + clib_memcpy (line, hc->rx_buf + hc->rx_buf_offset, len); + + unformat_init_vector (&input, line); + if (!unformat (&input, CONTENT_LEN_STR "%d", content_length)) + { + clib_warning ("failed to unformat content length!"); + return -1; + } + unformat_free (&input); + + /* skip rest of the header */ + hc->rx_buf_offset += len; + i = v_find_index (hc->rx_buf, hc->rx_buf_offset, ""); + if (i < 0) + { + clib_warning (" tag not found"); + return -1; + } + hc->rx_buf_offset = i; + + return 0; +} + static http_sm_result_t -state_srv_wait_method (http_conn_t *hc, transport_send_params_t *sp) +http_state_wait_server_reply (http_conn_t *hc, transport_send_params_t *sp) +{ + int i, rv, content_length; + http_msg_t msg = {}; + app_worker_t *app_wrk; + session_t *as; + http_status_code_t ec; + + rv = http_read_message (hc); + + /* Nothing yet, wait for data or timer expire */ + if (rv) + return HTTP_SM_STOP; + + if (vec_len (hc->rx_buf) < 8) + { + ec = HTTP_STATUS_BAD_REQUEST; + goto error; + } + + if ((i = v_find_index (hc->rx_buf, 0, "200 OK")) >= 0) + { + msg.type = HTTP_MSG_REPLY; + msg.content_type = HTTP_CONTENT_TEXT_HTML; + msg.code = HTTP_STATUS_OK; + msg.data.type = HTTP_MSG_DATA_INLINE; + msg.data.len = 0; + + rv = http_parse_header (hc, &content_length); + if (rv) + { + clib_warning ("failed to parse http reply"); + session_transport_closing_notify (&hc->connection); + http_disconnect_transport (hc); + return -1; + } + msg.data.len = content_length; + u32 dlen = vec_len (hc->rx_buf) - hc->rx_buf_offset; + as = session_get_from_handle (hc->h_pa_session_handle); + svm_fifo_seg_t segs[2] = { { (u8 *) &msg, sizeof (msg) }, + { &hc->rx_buf[hc->rx_buf_offset], dlen } }; + + rv = svm_fifo_enqueue_segments (as->rx_fifo, segs, 2, + 0 /* allow partial */); + if (rv < 0) + { + clib_warning ("error enqueue"); + return HTTP_SM_ERROR; + } + + hc->rx_buf_offset += dlen; + hc->to_recv = content_length - dlen; + + if (hc->rx_buf_offset == vec_len (hc->rx_buf)) + { + vec_reset_length (hc->rx_buf); + hc->rx_buf_offset = 0; + } + + if (hc->to_recv == 0) + { + hc->rx_buf_offset = 0; + vec_reset_length (hc->rx_buf); + http_state_change (hc, HTTP_STATE_WAIT_CLIENT_METHOD); + } + else + { + http_state_change (hc, HTTP_STATE_CLIENT_IO_MORE_DATA); + } + + app_wrk = app_worker_get_if_valid (as->app_wrk_index); + app_worker_rx_notify (app_wrk, as); + return HTTP_SM_STOP; + } + else + { + HTTP_DBG (0, "Unknown http method %v", hc->rx_buf); + ec = HTTP_STATUS_METHOD_NOT_ALLOWED; + goto error; + } + return HTTP_SM_STOP; + +error: + + http_send_error (hc, ec); + session_transport_closing_notify (&hc->connection); + http_disconnect_transport (hc); + + return HTTP_SM_ERROR; +} + +static http_sm_result_t +http_state_wait_client_method (http_conn_t *hc, transport_send_params_t *sp) { http_status_code_t ec; app_worker_t *app_wrk; @@ -440,7 +616,7 @@ state_srv_wait_method (http_conn_t *hc, transport_send_params_t *sp) u32 len; u8 *buf; - rv = read_http_message (hc); + rv = http_read_message (hc); /* Nothing yet, wait for data or timer expire */ if (rv) @@ -464,6 +640,7 @@ state_srv_wait_method (http_conn_t *hc, transport_send_params_t *sp) goto error; } + HTTP_DBG (0, "GET method %v", hc->rx_buf); len = i - hc->rx_buf_offset - 1; } else if ((i = v_find_index (hc->rx_buf, 0, "POST ")) >= 0) @@ -471,10 +648,11 @@ state_srv_wait_method (http_conn_t *hc, transport_send_params_t *sp) hc->method = HTTP_REQ_POST; hc->rx_buf_offset = i + 6; len = vec_len (hc->rx_buf) - hc->rx_buf_offset - 1; + HTTP_DBG (0, "POST method %v", hc->rx_buf); } else { - HTTP_DBG (0, "Unknown http method"); + HTTP_DBG (0, "Unknown http method %v", hc->rx_buf); ec = HTTP_STATUS_METHOD_NOT_ALLOWED; goto error; } @@ -501,7 +679,7 @@ state_srv_wait_method (http_conn_t *hc, transport_send_params_t *sp) } vec_free (hc->rx_buf); - hc->http_state = HTTP_STATE_WAIT_APP; + http_state_change (hc, HTTP_STATE_WAIT_APP_REPLY); app_wrk = app_worker_get_if_valid (as->app_wrk_index); if (app_wrk) @@ -511,26 +689,23 @@ state_srv_wait_method (http_conn_t *hc, transport_send_params_t *sp) error: - send_error (hc, ec); + http_send_error (hc, ec); session_transport_closing_notify (&hc->connection); http_disconnect_transport (hc); return HTTP_SM_ERROR; } -/** - * waiting for data from app - */ static http_sm_result_t -state_srv_wait_app (http_conn_t *hc, transport_send_params_t *sp) +http_state_wait_app_reply (http_conn_t *hc, transport_send_params_t *sp) { http_main_t *hm = &http_main; - http_status_code_t ec; - http_msg_t msg; - session_t *as; u8 *header; u32 offset; f64 now; + session_t *as; + http_status_code_t sc; + http_msg_t msg; int rv; as = session_get_from_handle (hc->h_pa_session_handle); @@ -538,21 +713,17 @@ state_srv_wait_app (http_conn_t *hc, transport_send_params_t *sp) rv = svm_fifo_dequeue (as->tx_fifo, sizeof (msg), (u8 *) &msg); ASSERT (rv == sizeof (msg)); - if (msg.type != HTTP_MSG_REPLY || msg.data.type > HTTP_MSG_DATA_PTR) + if (msg.data.type > HTTP_MSG_DATA_PTR) { - clib_warning ("unexpected msg type from app %u", msg.type); - ec = HTTP_STATUS_INTERNAL_ERROR; + clib_warning ("no data"); + sc = HTTP_STATUS_INTERNAL_ERROR; goto error; } - ec = msg.code; - - switch (msg.code) + if (msg.type != HTTP_MSG_REPLY) { - case HTTP_STATUS_OK: - case HTTP_STATUS_MOVED: - break; - default: + clib_warning ("unexpected message type %d", msg.type); + sc = HTTP_STATUS_INTERNAL_ERROR; goto error; } @@ -588,221 +759,111 @@ state_srv_wait_app (http_conn_t *hc, transport_send_params_t *sp) /* Location: http(s)://new-place already queued up as data */ break; default: - goto error; + return HTTP_SM_ERROR; } - offset = send_data (hc, header, vec_len (header), 0); + offset = http_send_data (hc, header, vec_len (header), 0); if (offset != vec_len (header)) { clib_warning ("couldn't send response header!"); - ec = HTTP_STATUS_INTERNAL_ERROR; + sc = HTTP_STATUS_INTERNAL_ERROR; goto error; } vec_free (header); /* Start sending the actual data */ - hc->http_state = HTTP_STATE_IO_MORE_DATA; + http_state_change (hc, HTTP_STATE_APP_IO_MORE_DATA); ASSERT (sp->max_burst_size >= offset); sp->max_burst_size -= offset; - return HTTP_SM_CONTINUE; error: - - send_error (hc, ec); - hc->http_state = HTTP_STATE_WAIT_METHOD; + clib_warning ("unexpected msg type from app %u", msg.type); + http_send_error (hc, sc); + http_state_change (hc, HTTP_STATE_WAIT_CLIENT_METHOD); session_transport_closing_notify (&hc->connection); http_disconnect_transport (hc); - return HTTP_SM_STOP; } static http_sm_result_t -state_srv_send_more_data (http_conn_t *hc, transport_send_params_t *sp) +http_state_wait_app_method (http_conn_t *hc, transport_send_params_t *sp) { - u32 max_send = 64 << 10, n_segs; - http_buffer_t *hb = &hc->tx_buf; - svm_fifo_seg_t *seg; - session_t *ts; - int sent = 0; - - max_send = clib_min (max_send, sp->max_burst_size); - ts = session_get_from_handle (hc->h_tc_session_handle); - if ((seg = http_buffer_get_segs (hb, max_send, &n_segs))) - sent = svm_fifo_enqueue_segments (ts->tx_fifo, seg, n_segs, - 1 /* allow partial */); - - if (sent > 0) - { - /* Ask scheduler to notify app of deq event if needed */ - sp->bytes_dequeued += http_buffer_drain (hb, sent); - sp->max_burst_size -= sent; - } - - /* Not finished sending all data */ - if (!http_buffer_is_drained (hb)) - { - if (sent && svm_fifo_set_event (ts->tx_fifo)) - session_send_io_evt_to_thread (ts->tx_fifo, SESSION_IO_EVT_TX); - - if (svm_fifo_max_enqueue (ts->tx_fifo) < HTTP_FIFO_THRESH) - { - /* Deschedule http session and wait for deq notification if - * underlying ts tx fifo almost full */ - svm_fifo_add_want_deq_ntf (ts->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF); - transport_connection_deschedule (&hc->connection); - sp->flags |= TRANSPORT_SND_F_DESCHED; - } - } - else - { - if (sent && svm_fifo_set_event (ts->tx_fifo)) - session_send_io_evt_to_thread (ts->tx_fifo, SESSION_IO_EVT_TX_FLUSH); - - /* Finished transaction, back to HTTP_STATE_WAIT_METHOD */ - hc->http_state = HTTP_STATE_WAIT_METHOD; - http_buffer_free (&hc->tx_buf); - } - - return HTTP_SM_STOP; -} - -static int -parse_http_header (http_conn_t *hc, int *content_length) -{ - unformat_input_t input; - int i, len; - u8 *line; - - if ((i = v_find_index (hc->rx_buf, hc->rx_buf_offset, "200 OK") < 0)) - { - clib_warning ("bad response code"); - return -1; - } - - i = v_find_index (hc->rx_buf, hc->rx_buf_offset, CONTENT_LEN_STR); - if (i < 0) - { - clib_warning ("cannot find '%s' in the header!", CONTENT_LEN_STR); - return -1; - } - - hc->rx_buf_offset = i; + http_status_code_t sc; + http_msg_t msg; + session_t *as; + u8 *buf = 0, *request; + u32 offset; + int rv; - i = v_find_index (hc->rx_buf, hc->rx_buf_offset, "\n"); - if (i < 0) - { - clib_warning ("end of line missing; incomplete data"); - return -1; - } + as = session_get_from_handle (hc->h_pa_session_handle); - len = i - hc->rx_buf_offset; - line = vec_new (u8, len); - clib_memcpy (line, hc->rx_buf + hc->rx_buf_offset, len); + rv = svm_fifo_dequeue (as->tx_fifo, sizeof (msg), (u8 *) &msg); + ASSERT (rv == sizeof (msg)); - unformat_init_vector (&input, line); - if (!unformat (&input, CONTENT_LEN_STR "%d", content_length)) + if (msg.data.type > HTTP_MSG_DATA_PTR) { - clib_warning ("failed to unformat content length!"); - return -1; + clib_warning ("no data"); + sc = HTTP_STATUS_INTERNAL_ERROR; + goto error; } - unformat_free (&input); - /* skip rest of the header */ - hc->rx_buf_offset += len; - i = v_find_index (hc->rx_buf, hc->rx_buf_offset, ""); - if (i < 0) + if (msg.type != HTTP_MSG_REQUEST) { - clib_warning (" tag not found"); - return -1; + clib_warning ("unexpected message type %d", msg.type); + sc = HTTP_STATUS_INTERNAL_ERROR; + goto error; } - hc->rx_buf_offset = i; - return 0; -} - -static int -state_cln_wait_method (http_conn_t *hc, transport_send_params_t *sp) -{ - session_t *as; - http_msg_t msg; - app_worker_t *app_wrk; - int rv, content_length; - - rv = read_http_message (hc); - if (rv) - return HTTP_SM_STOP; + sc = msg.code; - msg.type = HTTP_MSG_REPLY; - msg.content_type = HTTP_CONTENT_TEXT_HTML; - msg.code = HTTP_STATUS_OK; - msg.data.type = HTTP_MSG_DATA_INLINE; - msg.data.len = 0; - - rv = parse_http_header (hc, &content_length); - if (rv) - { - clib_warning ("failed to parse http reply"); - session_transport_closing_notify (&hc->connection); - http_disconnect_transport (hc); - return -1; - } - - msg.data.len = content_length; - u32 dlen = vec_len (hc->rx_buf) - hc->rx_buf_offset; - as = session_get_from_handle (hc->h_pa_session_handle); - svm_fifo_seg_t segs[2] = { { (u8 *) &msg, sizeof (msg) }, - { &hc->rx_buf[hc->rx_buf_offset], dlen } }; + vec_validate (buf, msg.data.len - 1); + rv = svm_fifo_dequeue (as->tx_fifo, msg.data.len, buf); + ASSERT (rv == msg.data.len); - rv = svm_fifo_enqueue_segments (as->rx_fifo, segs, 2, 0 /* allow partial */); - if (rv < 0) + request = format (0, http_request_template, buf); + offset = http_send_data (hc, request, vec_len (request), 0); + if (offset != vec_len (request)) { - clib_warning ("error enqueue"); - return HTTP_SM_ERROR; + clib_warning ("sending request failed!"); + sc = HTTP_STATUS_INTERNAL_ERROR; + goto error; } - hc->rx_buf_offset += dlen; - hc->http_state = HTTP_STATE_IO_MORE_DATA; - hc->to_recv = content_length - dlen; - if (hc->rx_buf_offset == vec_len (hc->rx_buf)) - { - vec_reset_length (hc->rx_buf); - hc->rx_buf_offset = 0; - } + http_state_change (hc, HTTP_STATE_WAIT_SERVER_REPLY); - if (hc->to_recv == 0) - { - hc->rx_buf_offset = 0; - vec_reset_length (hc->rx_buf); - hc->http_state = HTTP_STATE_WAIT_APP; - } + vec_free (buf); + vec_free (request); - app_wrk = app_worker_get_if_valid (as->app_wrk_index); - if (app_wrk) - app_worker_rx_notify (app_wrk, as); + return HTTP_SM_CONTINUE; +error: + clib_warning ("unexpected msg type from app %u", msg.type); + http_send_error (hc, sc); + session_transport_closing_notify (&hc->connection); + http_disconnect_transport (hc); return HTTP_SM_STOP; } -static int -cln_drain_rx_buf (http_conn_t *hc, session_t *ts, session_t *as) +static void +http_app_enqueue (http_conn_t *hc, session_t *as) { app_worker_t *app_wrk; - u32 max_enq, n_enq, dlen = vec_len (hc->rx_buf) - hc->rx_buf_offset; + u32 dlen, max_enq, n_enq; int rv; + dlen = vec_len (hc->rx_buf) - hc->rx_buf_offset; + if (!dlen) + return; + max_enq = svm_fifo_max_enqueue (as->rx_fifo); n_enq = clib_min (max_enq, dlen); rv = svm_fifo_enqueue (as->rx_fifo, n_enq, &hc->rx_buf[hc->rx_buf_offset]); if (rv < 0) - { - clib_warning ("enqueue failed"); - return -1; - } + return; hc->rx_buf_offset += rv; - if (hc->rx_buf_offset >= vec_len (hc->rx_buf)) { vec_reset_length (hc->rx_buf); @@ -811,149 +872,124 @@ cln_drain_rx_buf (http_conn_t *hc, session_t *ts, session_t *as) app_wrk = app_worker_get_if_valid (as->app_wrk_index); ASSERT (app_wrk); - app_worker_rx_notify (app_wrk, as); - return 1; } static http_sm_result_t -state_cln_recv_more_data (http_conn_t *hc, transport_send_params_t *sp) +http_state_client_io_more_data (http_conn_t *hc, transport_send_params_t *sp) { - session_t *as; + session_t *as, *ts; u32 max_deq; - session_t *ts; - int n_read, rv; + int n_read; as = session_get_from_handle (hc->h_pa_session_handle); ts = session_get_from_handle (hc->h_tc_session_handle); - u32 dlen = vec_len (hc->rx_buf) - hc->rx_buf_offset; - if (dlen) - { - rv = cln_drain_rx_buf (hc, ts, as); - if (rv < 0) - { - clib_warning ("drain rx error!"); - return HTTP_SM_ERROR; - } - goto maybe_reschedule; - } + http_app_enqueue (hc, as); if (hc->to_recv == 0) { - ASSERT (vec_len (hc->rx_buf) == 0); - ASSERT (hc->rx_buf_offset == 0); - hc->http_state = HTTP_STATE_WAIT_APP; + http_state_change (hc, HTTP_STATE_WAIT_CLIENT_METHOD); return HTTP_SM_STOP; } max_deq = svm_fifo_max_dequeue (ts->rx_fifo); - if (max_deq == 0) - return HTTP_SM_STOP; - - ASSERT (vec_len (hc->rx_buf) == 0); - ASSERT (hc->rx_buf_offset == 0); + if (max_deq > 0) + { + vec_validate (hc->rx_buf, max_deq - 1); + n_read = svm_fifo_dequeue (ts->rx_fifo, max_deq, hc->rx_buf); + ASSERT (n_read == max_deq); - vec_validate (hc->rx_buf, max_deq - 1); - n_read = svm_fifo_dequeue (ts->rx_fifo, max_deq, hc->rx_buf); - ASSERT (n_read == max_deq); + if (svm_fifo_is_empty (ts->rx_fifo)) + svm_fifo_unset_event (ts->rx_fifo); - if (svm_fifo_is_empty (ts->rx_fifo)) - svm_fifo_unset_event (ts->rx_fifo); - - hc->to_recv -= n_read; - vec_set_len (hc->rx_buf, max_deq); + hc->to_recv -= n_read; + vec_set_len (hc->rx_buf, n_read); + } -maybe_reschedule: if (hc->rx_buf_offset < vec_len (hc->rx_buf) || svm_fifo_max_dequeue_cons (ts->rx_fifo)) { - /* TODO is the flag really needed? */ - if (svm_fifo_set_event (ts->rx_fifo)) session_enqueue_notify (ts); } return HTTP_SM_CONTINUE; } static http_sm_result_t -state_cln_wait_app (http_conn_t *hc, transport_send_params_t *sp) +http_state_app_io_more_data (http_conn_t *hc, transport_send_params_t *sp) { - session_t *as; - http_msg_t msg; - http_status_code_t ec; - u8 *buf = 0, *request; - u32 offset; - int rv; - - as = session_get_from_handle (hc->h_pa_session_handle); - rv = svm_fifo_dequeue (as->tx_fifo, sizeof (msg), (u8 *) &msg); - ASSERT (rv == sizeof (msg)); - if (msg.type != HTTP_MSG_REQUEST || msg.data.type > HTTP_MSG_DATA_PTR) - { - clib_warning ("unexpected msg type from app %u", msg.type); - ec = HTTP_STATUS_INTERNAL_ERROR; - goto error; - } + u32 max_send = 64 << 10, n_segs; + http_buffer_t *hb = &hc->tx_buf; + svm_fifo_seg_t *seg; + session_t *ts; + int sent = 0; - vec_validate (buf, msg.data.len - 1); - rv = svm_fifo_dequeue (as->tx_fifo, msg.data.len, buf); - ASSERT (rv == msg.data.len); + max_send = clib_min (max_send, sp->max_burst_size); + ts = session_get_from_handle (hc->h_tc_session_handle); + if ((seg = http_buffer_get_segs (hb, max_send, &n_segs))) + sent = svm_fifo_enqueue_segments (ts->tx_fifo, seg, n_segs, + 1 /* allow partial */); - request = format (0, http_request_template, buf); - offset = send_data (hc, request, vec_len (request), 0); - if (offset != vec_len (request)) + if (sent > 0) { - clib_warning ("sending request failed!"); - ec = HTTP_STATUS_INTERNAL_ERROR; - goto error; + /* Ask scheduler to notify app of deq event if needed */ + sp->bytes_dequeued += http_buffer_drain (hb, sent); + sp->max_burst_size -= sent; } - hc->http_state = HTTP_STATE_WAIT_METHOD; + /* Not finished sending all data */ + if (!http_buffer_is_drained (hb)) + { + if (sent && svm_fifo_set_event (ts->tx_fifo)) + session_send_io_evt_to_thread (ts->tx_fifo, SESSION_IO_EVT_TX); - vec_free (buf); - vec_free (request); + if (svm_fifo_max_enqueue (ts->tx_fifo) < HTTP_FIFO_THRESH) + { + /* Deschedule http session and wait for deq notification if + * underlying ts tx fifo almost full */ + svm_fifo_add_want_deq_ntf (ts->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF); + transport_connection_deschedule (&hc->connection); + sp->flags |= TRANSPORT_SND_F_DESCHED; + } + } + else + { + if (sent && svm_fifo_set_event (ts->tx_fifo)) + session_send_io_evt_to_thread (ts->tx_fifo, SESSION_IO_EVT_TX_FLUSH); - return HTTP_SM_CONTINUE; + /* Finished transaction, back to HTTP_STATE_WAIT_METHOD */ + http_state_change (hc, HTTP_STATE_WAIT_CLIENT_METHOD); + http_buffer_free (&hc->tx_buf); + } -error: - send_error (hc, ec); - session_transport_closing_notify (&hc->connection); - http_disconnect_transport (hc); return HTTP_SM_STOP; } typedef http_sm_result_t (*http_sm_handler) (http_conn_t *, transport_send_params_t *sp); -static http_sm_handler srv_state_funcs[HTTP_N_STATES] = { - /* Waiting for GET, POST, etc. */ - state_srv_wait_method, - /* Wait for data from app */ - state_srv_wait_app, - /* Send more data */ - state_srv_send_more_data, -}; - -static http_sm_handler cln_state_funcs[HTTP_N_STATES] = { - /* wait for reply */ - state_cln_wait_method, - /* wait for data from app */ - state_cln_wait_app, - /* receive more data */ - state_cln_recv_more_data, +static http_sm_handler state_funcs[HTTP_N_STATES] = { + 0, /* idle state */ + http_state_wait_app_method, + http_state_wait_client_method, + http_state_wait_server_reply, + http_state_wait_app_reply, + http_state_client_io_more_data, + http_state_app_io_more_data, }; static void http_req_run_state_machine (http_conn_t *hc, transport_send_params_t *sp) { http_sm_result_t res; - http_sm_handler *state_fn = - hc->is_client ? cln_state_funcs : srv_state_funcs; do { - res = state_fn[hc->http_state](hc, sp); + res = state_funcs[hc->http_state](hc, sp); if (res == HTTP_SM_ERROR) - return; + { + HTTP_DBG (1, "error in state machine %d", res); + return; + } } while (res == HTTP_SM_CONTINUE); @@ -962,33 +998,15 @@ http_req_run_state_machine (http_conn_t *hc, transport_send_params_t *sp) } static int -http_ts_server_rx_callback (session_t *ts, http_conn_t *hc) +http_ts_rx_callback (session_t *ts) { - if (hc->http_state != HTTP_STATE_WAIT_METHOD) - { - clib_warning ("tcp data in req state %u", hc->http_state); - return 0; - } - - http_req_run_state_machine (hc, 0); - - if (hc->state == HTTP_CONN_STATE_TRANSPORT_CLOSED) - { - if (!svm_fifo_max_dequeue_cons (ts->rx_fifo)) - session_transport_closing_notify (&hc->connection); - } - return 0; -} + http_conn_t *hc; -static int -http_ts_client_rx_callback (session_t *ts, http_conn_t *hc) -{ - if (hc->http_state != HTTP_STATE_WAIT_METHOD && - hc->http_state != HTTP_STATE_IO_MORE_DATA) + hc = http_conn_get_w_thread (ts->opaque, ts->thread_index); + if (!hc) { - clib_warning ("http in unexpected state %d (ts %d)", hc->http_state, - ts->session_index); - return 0; + clib_warning ("http connection not found (ts %d)", ts->opaque); + return -1; } http_req_run_state_machine (hc, 0); @@ -1001,17 +1019,6 @@ http_ts_client_rx_callback (session_t *ts, http_conn_t *hc) return 0; } -static int -http_ts_rx_callback (session_t *ts) -{ - http_conn_t *hc; - - hc = http_conn_get_w_thread (ts->opaque, ts->thread_index); - if (hc->is_client) - return http_ts_client_rx_callback (ts, hc); - return http_ts_server_rx_callback (ts, hc); -} - int http_ts_builtin_tx_callback (session_t *ts) { @@ -1142,10 +1149,11 @@ http_transport_connect (transport_endpoint_cfg_t *tep) hc = http_conn_get_w_thread (hc_index, 0); hc->h_pa_wrk_index = sep->app_wrk_index; hc->h_pa_app_api_ctx = sep->opaque; - hc->is_client = 1; hc->state = HTTP_CONN_STATE_CONNECTING; cargs->api_context = hc_index; + HTTP_DBG (1, "hc ho_index %x", hc_index); + if ((error = vnet_connect (cargs))) return error; @@ -1273,12 +1281,14 @@ http_app_tx_callback (void *session, transport_send_params_t *sp) u32 max_burst_sz, sent; http_conn_t *hc; + HTTP_DBG (1, "app session conn index %x", as->connection_index); + hc = http_conn_get_w_thread (as->connection_index, as->thread_index); - if (hc->http_state < HTTP_STATE_WAIT_APP) + if (!http_state_is_tx_valid (hc)) { if (hc->state != HTTP_CONN_STATE_CLOSED) - clib_warning ("app data req state %u session state %u", hc->http_state, - hc->state); + clib_warning ("app data req state '%U' session state %u", + format_http_state, hc->http_state, hc->state); svm_fifo_dequeue_drop_all (as->tx_fifo); return 0; } diff --git a/src/plugins/http/http.h b/src/plugins/http/http.h index 50203dcd2f9..dbae5ac4611 100644 --- a/src/plugins/http/http.h +++ b/src/plugins/http/http.h @@ -61,9 +61,13 @@ typedef enum http_conn_state_ typedef enum http_state_ { - HTTP_STATE_WAIT_METHOD, - HTTP_STATE_WAIT_APP, - HTTP_STATE_IO_MORE_DATA, + HTTP_STATE_IDLE = 0, + HTTP_STATE_WAIT_APP_METHOD, + HTTP_STATE_WAIT_CLIENT_METHOD, + HTTP_STATE_WAIT_SERVER_REPLY, + HTTP_STATE_WAIT_APP_REPLY, + HTTP_STATE_CLIENT_IO_MORE_DATA, + HTTP_STATE_APP_IO_MORE_DATA, HTTP_N_STATES, } http_state_t; @@ -232,7 +236,6 @@ typedef struct http_tc_ u8 *rx_buf; u32 rx_buf_offset; http_buffer_t tx_buf; - u8 is_client; u32 to_recv; u32 bytes_dequeued; } http_conn_t; @@ -263,6 +266,16 @@ typedef struct http_main_ u32 fifo_size; } http_main_t; +static inline int +http_state_is_tx_valid (http_conn_t *hc) +{ + http_state_t state = hc->http_state; + return (state == HTTP_STATE_APP_IO_MORE_DATA || + state == HTTP_STATE_CLIENT_IO_MORE_DATA || + state == HTTP_STATE_WAIT_APP_REPLY || + state == HTTP_STATE_WAIT_APP_METHOD); +} + #endif /* SRC_PLUGINS_HTTP_HTTP_H_ */ /* -- cgit 1.2.3-korg