diff options
Diffstat (limited to 'src/plugins/http/http.c')
-rw-r--r-- | src/plugins/http/http.c | 481 |
1 files changed, 436 insertions, 45 deletions
diff --git a/src/plugins/http/http.c b/src/plugins/http/http.c index 279a46a7fc6..752ca47a691 100644 --- a/src/plugins/http/http.c +++ b/src/plugins/http/http.c @@ -20,6 +20,15 @@ static http_main_t http_main; #define HTTP_FIFO_THRESH (16 << 10) +#define CONTENT_LEN_STR "Content-Length: " + +/* HTTP state machine result */ +typedef enum http_sm_result_t_ +{ + HTTP_SM_STOP = 0, + HTTP_SM_CONTINUE = 1, + HTTP_SM_ERROR = -1, +} http_sm_result_t; const char *http_status_code_str[] = { #define _(c, s, str) str, @@ -147,14 +156,14 @@ http_ts_accept_callback (session_t *ts) hc_index = http_conn_alloc_w_thread (ts->thread_index); hc = http_conn_get_w_thread (hc_index, ts->thread_index); clib_memcpy_fast (hc, lhc, sizeof (*lhc)); - hc->c_thread_index = vlib_get_thread_index (); + hc->c_thread_index = ts->thread_index; hc->h_hc_index = hc_index; hc->h_tc_session_handle = session_handle (ts); hc->c_flags |= TRANSPORT_CONNECTION_F_NO_LOOKUP; hc->state = HTTP_CONN_STATE_ESTABLISHED; - hc->req_state = HTTP_REQ_STATE_WAIT_METHOD; + hc->http_state = HTTP_STATE_WAIT_METHOD; ts->session_state = SESSION_STATE_READY; ts->opaque = hc_index; @@ -163,7 +172,6 @@ http_ts_accept_callback (session_t *ts) * Alloc session and initialize */ as = session_alloc (hc->c_thread_index); - as->session_state = SESSION_STATE_CREATED; hc->c_s_index = as->session_index; as->app_wrk_index = hc->h_pa_wrk_index; @@ -212,10 +220,66 @@ http_ts_accept_callback (session_t *ts) } static int -http_ts_connected_callback (u32 http_app_index, u32 hc_index, session_t *ts, +http_ts_connected_callback (u32 http_app_index, u32 ho_hc_index, session_t *ts, session_error_t err) { - clib_warning ("not supported"); + u32 new_hc_index; + session_t *as; + http_conn_t *hc, *ho_hc; + app_worker_t *app_wrk; + int rv; + + if (err) + { + clib_warning ("ERROR: %d", err); + return 0; + } + + new_hc_index = http_conn_alloc_w_thread (ts->thread_index); + hc = http_conn_get_w_thread (new_hc_index, ts->thread_index); + ho_hc = http_conn_get_w_thread (ho_hc_index, 0); + + ASSERT (ho_hc->state == HTTP_CONN_STATE_CONNECTING); + + clib_memcpy_fast (hc, ho_hc, sizeof (*hc)); + + hc->c_thread_index = ts->thread_index; + hc->h_tc_session_handle = session_handle (ts); + hc->c_c_index = new_hc_index; + hc->c_flags |= TRANSPORT_CONNECTION_F_NO_LOOKUP; + hc->state = HTTP_CONN_STATE_ESTABLISHED; + hc->http_state = HTTP_STATE_WAIT_APP; + + ts->session_state = SESSION_STATE_READY; + ts->opaque = new_hc_index; + + /* allocate app session and initialize */ + + as = session_alloc (hc->c_thread_index); + hc->c_s_index = as->session_index; + as->connection_index = hc->c_c_index; + as->app_wrk_index = hc->h_pa_wrk_index; + as->session_state = SESSION_STATE_READY; + as->session_type = session_type_from_proto_and_ip ( + TRANSPORT_PROTO_HTTP, session_type_is_ip4 (ts->session_type)); + + app_wrk = app_worker_get (hc->h_pa_wrk_index); + if (!app_wrk) + { + clib_warning ("no app worker"); + return -1; + } + + if ((rv = app_worker_init_connected (app_wrk, as))) + { + HTTP_DBG (1, "failed to allocate fifos"); + session_free (as); + return rv; + } + app_worker_connect_notify (app_wrk, as, err, hc->h_pa_app_api_ctx); + hc->h_pa_session_handle = session_handle (as); + http_conn_timer_start (hc); + return 0; } @@ -243,7 +307,7 @@ http_ts_reset_callback (session_t *ts) hc->state = HTTP_CONN_STATE_CLOSED; http_buffer_free (&hc->tx_buf); - hc->req_state = HTTP_REQ_STATE_WAIT_METHOD; + hc->http_state = HTTP_STATE_WAIT_METHOD; session_transport_reset_notify (&hc->connection); http_disconnect_transport (hc); @@ -269,6 +333,10 @@ static const char *http_response_template = "HTTP/1.1 200 OK\r\n" "Content-Type: %s\r\n" "Content-Length: %lu\r\n\r\n"; +static const char *http_request_template = "GET %s HTTP/1.1\r\n" + "User-Agent: VPP HTTP client\r\n" + "Accept: */*\r\n"; + static u32 send_data (http_conn_t *hc, u8 *data, u32 length, u32 offset) { @@ -309,7 +377,7 @@ send_error (http_conn_t *hc, http_status_code_t ec) } static int -read_request (http_conn_t *hc) +read_http_message (http_conn_t *hc) { u32 max_deq, cursize; session_t *ts; @@ -337,7 +405,7 @@ static int v_find_index (u8 *vec, u32 offset, char *str) { int start_index = offset; - u32 slen = (u32) strnlen_s_inline (str, 8); + u32 slen = (u32) strnlen_s_inline (str, 16); u32 vlen = vec_len (vec); ASSERT (slen > 0); @@ -357,8 +425,8 @@ v_find_index (u8 *vec, u32 offset, char *str) /** * waiting for request method from peer - parse request method and data */ -static int -state_wait_method (http_conn_t *hc, transport_send_params_t *sp) +static http_sm_result_t +state_srv_wait_method (http_conn_t *hc, transport_send_params_t *sp) { http_status_code_t ec; app_worker_t *app_wrk; @@ -368,11 +436,11 @@ state_wait_method (http_conn_t *hc, transport_send_params_t *sp) u32 len; u8 *buf; - rv = read_request (hc); + rv = read_http_message (hc); /* Nothing yet, wait for data or timer expire */ if (rv) - return 0; + return HTTP_SM_STOP; if (vec_len (hc->rx_buf) < 8) { @@ -425,16 +493,16 @@ state_wait_method (http_conn_t *hc, transport_send_params_t *sp) /* This should not happen as we only handle 1 request per session, * and fifo is allocated, but going forward we should consider * rescheduling */ - return -1; + return HTTP_SM_ERROR; } vec_free (hc->rx_buf); - hc->req_state = HTTP_REQ_STATE_WAIT_APP; + hc->http_state = HTTP_STATE_WAIT_APP; app_wrk = app_worker_get_if_valid (as->app_wrk_index); app_worker_lock_and_send_event (app_wrk, as, SESSION_IO_EVT_RX); - return 0; + return HTTP_SM_STOP; error: @@ -442,14 +510,14 @@ error: session_transport_closing_notify (&hc->connection); http_disconnect_transport (hc); - return -1; + return HTTP_SM_ERROR; } /** * waiting for data from app */ -static int -state_wait_app (http_conn_t *hc, transport_send_params_t *sp) +static http_sm_result_t +state_srv_wait_app (http_conn_t *hc, transport_send_params_t *sp) { http_main_t *hm = &http_main; http_status_code_t ec; @@ -509,26 +577,25 @@ state_wait_app (http_conn_t *hc, transport_send_params_t *sp) vec_free (header); /* Start sending the actual data */ - hc->req_state = HTTP_REQ_STATE_SEND_MORE_DATA; + hc->http_state = HTTP_STATE_IO_MORE_DATA; ASSERT (sp->max_burst_size >= offset); sp->max_burst_size -= offset; - return 1; + return HTTP_SM_CONTINUE; error: send_error (hc, ec); - hc->req_state = HTTP_REQ_STATE_WAIT_METHOD; + hc->http_state = HTTP_STATE_WAIT_METHOD; session_transport_closing_notify (&hc->connection); http_disconnect_transport (hc); - /* stop state machine processing */ - return 0; + return HTTP_SM_STOP; } -static int -state_send_more_data (http_conn_t *hc, transport_send_params_t *sp) +static http_sm_result_t +state_srv_send_more_data (http_conn_t *hc, transport_send_params_t *sp) { u32 max_send = 64 << 10, n_segs; http_buffer_t *hb = &hc->tx_buf; @@ -569,52 +636,331 @@ state_send_more_data (http_conn_t *hc, transport_send_params_t *sp) if (sent && svm_fifo_set_event (ts->tx_fifo)) session_send_io_evt_to_thread (ts->tx_fifo, SESSION_IO_EVT_TX_FLUSH); - /* Finished transaction, back to HTTP_REQ_STATE_WAIT_METHOD */ - hc->req_state = HTTP_REQ_STATE_WAIT_METHOD; + /* Finished transaction, back to HTTP_STATE_WAIT_METHOD */ + hc->http_state = HTTP_STATE_WAIT_METHOD; http_buffer_free (&hc->tx_buf); } + return HTTP_SM_STOP; +} + +static int +parse_http_header (http_conn_t *hc, int *content_length) +{ + unformat_input_t input; + int i, len; + u8 *line; + + if ((i = v_find_index (hc->rx_buf, hc->rx_buf_offset, "200 OK") < 0)) + { + clib_warning ("bad response code"); + return -1; + } + + i = v_find_index (hc->rx_buf, hc->rx_buf_offset, CONTENT_LEN_STR); + if (i < 0) + { + clib_warning ("cannot find '%s' in the header!", CONTENT_LEN_STR); + return -1; + } + + hc->rx_buf_offset = i; + + i = v_find_index (hc->rx_buf, hc->rx_buf_offset, "\n"); + if (i < 0) + { + clib_warning ("end of line missing; incomplete data"); + return -1; + } + + len = i - hc->rx_buf_offset; + line = vec_new (u8, len); + clib_memcpy (line, hc->rx_buf + hc->rx_buf_offset, len); + + unformat_init_vector (&input, line); + if (!unformat (&input, CONTENT_LEN_STR "%d", content_length)) + { + clib_warning ("failed to unformat content length!"); + return -1; + } + unformat_free (&input); + + /* skip rest of the header */ + hc->rx_buf_offset += len; + i = v_find_index (hc->rx_buf, hc->rx_buf_offset, "<html>"); + if (i < 0) + { + clib_warning ("<html> tag not found"); + return -1; + } + hc->rx_buf_offset = i; + return 0; } -typedef int (*http_sm_handler) (http_conn_t *, transport_send_params_t *sp); +static int +state_cln_wait_method (http_conn_t *hc, transport_send_params_t *sp) +{ + session_t *as; + http_msg_t msg; + app_worker_t *app_wrk; + int rv, content_length; + + rv = read_http_message (hc); + if (rv) + return HTTP_SM_STOP; + + msg.type = HTTP_MSG_REPLY; + msg.content_type = HTTP_CONTENT_TEXT_HTML; + msg.code = HTTP_STATUS_OK; + msg.data.type = HTTP_MSG_DATA_INLINE; + msg.data.len = 0; + + rv = parse_http_header (hc, &content_length); + if (rv) + { + clib_warning ("failed to parse http reply"); + session_transport_closing_notify (&hc->connection); + http_disconnect_transport (hc); + return -1; + } + + msg.data.len = content_length; + u32 dlen = vec_len (hc->rx_buf) - hc->rx_buf_offset; + as = session_get_from_handle (hc->h_pa_session_handle); + svm_fifo_seg_t segs[2] = { { (u8 *) &msg, sizeof (msg) }, + { &hc->rx_buf[hc->rx_buf_offset], dlen } }; + + rv = svm_fifo_enqueue_segments (as->rx_fifo, segs, 2, 0 /* allow partial */); + if (rv < 0) + { + clib_warning ("error enqueue"); + return HTTP_SM_ERROR; + } + hc->rx_buf_offset += dlen; + hc->http_state = HTTP_STATE_IO_MORE_DATA; + hc->to_recv = content_length - dlen; + + if (hc->rx_buf_offset == vec_len (hc->rx_buf)) + { + vec_reset_length (hc->rx_buf); + hc->rx_buf_offset = 0; + } + + if (hc->to_recv == 0) + { + hc->rx_buf_offset = 0; + vec_reset_length (hc->rx_buf); + hc->http_state = HTTP_STATE_WAIT_APP; + } + + app_wrk = app_worker_get_if_valid (as->app_wrk_index); + app_worker_lock_and_send_event (app_wrk, as, SESSION_IO_EVT_RX); + return HTTP_SM_STOP; +} + +static int +cln_drain_rx_buf (http_conn_t *hc, session_t *ts, session_t *as) +{ + app_worker_t *app_wrk; + u32 max_enq, n_enq, dlen = vec_len (hc->rx_buf) - hc->rx_buf_offset; + int rv; + + max_enq = svm_fifo_max_enqueue (as->rx_fifo); + n_enq = clib_min (max_enq, dlen); + rv = svm_fifo_enqueue (as->rx_fifo, n_enq, &hc->rx_buf[hc->rx_buf_offset]); + if (rv < 0) + { + clib_warning ("enqueue failed"); + return -1; + } + + hc->rx_buf_offset += rv; + + if (hc->rx_buf_offset >= vec_len (hc->rx_buf)) + { + vec_reset_length (hc->rx_buf); + hc->rx_buf_offset = 0; + } + + app_wrk = app_worker_get_if_valid (as->app_wrk_index); + ASSERT (app_wrk); + + app_worker_lock_and_send_event (app_wrk, as, SESSION_IO_EVT_RX); + return 1; +} + +static http_sm_result_t +state_cln_recv_more_data (http_conn_t *hc, transport_send_params_t *sp) +{ + session_t *as; + u32 max_deq; + session_t *ts; + int n_read, rv; + + as = session_get_from_handle (hc->h_pa_session_handle); + ts = session_get_from_handle (hc->h_tc_session_handle); + + u32 dlen = vec_len (hc->rx_buf) - hc->rx_buf_offset; + if (dlen) + { + rv = cln_drain_rx_buf (hc, ts, as); + if (rv < 0) + { + clib_warning ("drain rx error!"); + return HTTP_SM_ERROR; + } + goto maybe_reschedule; + } + + if (hc->to_recv == 0) + { + ASSERT (vec_len (hc->rx_buf) == 0); + ASSERT (hc->rx_buf_offset == 0); + hc->http_state = HTTP_STATE_WAIT_APP; + return HTTP_SM_STOP; + } + + max_deq = svm_fifo_max_dequeue (ts->rx_fifo); + if (max_deq == 0) + return HTTP_SM_STOP; + + ASSERT (vec_len (hc->rx_buf) == 0); + ASSERT (hc->rx_buf_offset == 0); + + vec_validate (hc->rx_buf, max_deq - 1); + n_read = svm_fifo_dequeue (ts->rx_fifo, max_deq, hc->rx_buf); + ASSERT (n_read == max_deq); + + if (svm_fifo_is_empty (ts->rx_fifo)) + svm_fifo_unset_event (ts->rx_fifo); + + hc->to_recv -= n_read; + vec_set_len (hc->rx_buf, max_deq); + +maybe_reschedule: + if (hc->rx_buf_offset < vec_len (hc->rx_buf) || + svm_fifo_max_dequeue_cons (ts->rx_fifo)) + { + if (svm_fifo_set_event (ts->rx_fifo)) + session_send_io_evt_to_thread (ts->rx_fifo, SESSION_IO_EVT_BUILTIN_RX); + } + return HTTP_SM_CONTINUE; +} + +static http_sm_result_t +state_cln_wait_app (http_conn_t *hc, transport_send_params_t *sp) +{ + session_t *as; + http_msg_t msg; + http_status_code_t ec; + u8 *buf = 0, *request; + u32 offset; + int rv; + + as = session_get_from_handle (hc->h_pa_session_handle); + rv = svm_fifo_dequeue (as->tx_fifo, sizeof (msg), (u8 *) &msg); + ASSERT (rv == sizeof (msg)); + if (msg.type != HTTP_MSG_REQUEST || msg.data.type > HTTP_MSG_DATA_PTR) + { + clib_warning ("unexpected msg type from app %u", msg.type); + ec = HTTP_STATUS_INTERNAL_ERROR; + goto error; + } + + vec_validate (buf, msg.data.len - 1); + rv = svm_fifo_dequeue (as->tx_fifo, msg.data.len, buf); + ASSERT (rv == msg.data.len); -static http_sm_handler req_state_funcs[HTTP_REQ_N_STATES] = { + request = format (0, http_request_template, buf); + offset = send_data (hc, request, vec_len (request), 0); + if (offset != vec_len (request)) + { + clib_warning ("sending request failed!"); + ec = HTTP_STATUS_INTERNAL_ERROR; + goto error; + } + + hc->http_state = HTTP_STATE_WAIT_METHOD; + + vec_free (buf); + vec_free (request); + + return HTTP_SM_CONTINUE; + +error: + send_error (hc, ec); + session_transport_closing_notify (&hc->connection); + http_disconnect_transport (hc); + return HTTP_SM_STOP; +} + +typedef http_sm_result_t (*http_sm_handler) (http_conn_t *, + transport_send_params_t *sp); + +static http_sm_handler srv_state_funcs[HTTP_N_STATES] = { /* Waiting for GET, POST, etc. */ - state_wait_method, + state_srv_wait_method, /* Wait for data from app */ - state_wait_app, + state_srv_wait_app, /* Send more data */ - state_send_more_data, + state_srv_send_more_data, +}; + +static http_sm_handler cln_state_funcs[HTTP_N_STATES] = { + /* wait for reply */ + state_cln_wait_method, + /* wait for data from app */ + state_cln_wait_app, + /* receive more data */ + state_cln_recv_more_data, }; static void http_req_run_state_machine (http_conn_t *hc, transport_send_params_t *sp) { - int rv; - + http_sm_result_t res; + http_sm_handler *state_fn = + hc->is_client ? cln_state_funcs : srv_state_funcs; do { - rv = req_state_funcs[hc->req_state](hc, sp); - if (rv < 0) + res = state_fn[hc->http_state](hc, sp); + if (res == HTTP_SM_ERROR) return; } - while (rv); + while (res == HTTP_SM_CONTINUE); /* Reset the session expiration timer */ http_conn_timer_update (hc); } static int -http_ts_rx_callback (session_t *ts) +http_ts_server_rx_callback (session_t *ts, http_conn_t *hc) { - http_conn_t *hc; + if (hc->http_state != HTTP_STATE_WAIT_METHOD) + { + clib_warning ("tcp data in req state %u", hc->http_state); + return 0; + } - hc = http_conn_get_w_thread (ts->opaque, ts->thread_index); + http_req_run_state_machine (hc, 0); - if (hc->req_state != HTTP_REQ_STATE_WAIT_METHOD) + if (hc->state == HTTP_CONN_STATE_TRANSPORT_CLOSED) { - clib_warning ("tcp data in req state %u", hc->req_state); + if (!svm_fifo_max_dequeue_cons (ts->rx_fifo)) + session_transport_closing_notify (&hc->connection); + } + return 0; +} + +static int +http_ts_client_rx_callback (session_t *ts, http_conn_t *hc) +{ + if (hc->http_state != HTTP_STATE_WAIT_METHOD && + hc->http_state != HTTP_STATE_IO_MORE_DATA) + { + clib_warning ("http in unexpected state %d (ts %d)", hc->http_state, + ts->session_index); return 0; } @@ -628,6 +974,17 @@ http_ts_rx_callback (session_t *ts) return 0; } +static int +http_ts_rx_callback (session_t *ts) +{ + http_conn_t *hc; + + hc = http_conn_get_w_thread (ts->opaque, ts->thread_index); + if (hc->is_client) + return http_ts_client_rx_callback (ts, hc); + return http_ts_server_rx_callback (ts, hc); +} + int http_ts_builtin_tx_callback (session_t *ts) { @@ -738,7 +1095,34 @@ http_transport_enable (vlib_main_t *vm, u8 is_en) static int http_transport_connect (transport_endpoint_cfg_t *tep) { - return -1; + vnet_connect_args_t _cargs, *cargs = &_cargs; + http_main_t *hm = &http_main; + session_endpoint_cfg_t *sep = (session_endpoint_cfg_t *) tep; + application_t *app; + http_conn_t *hc; + int error; + u32 hc_index; + app_worker_t *app_wrk = app_worker_get (sep->app_wrk_index); + + clib_memset (cargs, 0, sizeof (*cargs)); + clib_memcpy (&cargs->sep_ext, sep, sizeof (session_endpoint_cfg_t)); + cargs->sep.transport_proto = TRANSPORT_PROTO_TCP; + cargs->app_index = hm->app_index; + app = application_get (app_wrk->app_index); + cargs->sep_ext.ns_index = app->ns_index; + + hc_index = http_conn_alloc_w_thread (0 /* ts->thread_index */); + hc = http_conn_get_w_thread (hc_index, 0); + hc->h_pa_wrk_index = sep->app_wrk_index; + hc->h_pa_app_api_ctx = sep->opaque; + hc->is_client = 1; + hc->state = HTTP_CONN_STATE_CONNECTING; + cargs->api_context = hc_index; + + if ((error = vnet_connect (cargs))) + return error; + + return 0; } static u32 @@ -819,6 +1203,13 @@ http_transport_close (u32 hc_index, u32 thread_index) HTTP_DBG (1, "App disconnecting %x", hc_index); hc = http_conn_get_w_thread (hc_index, thread_index); + if (hc->state == HTTP_CONN_STATE_CONNECTING) + { + hc->state = HTTP_CONN_STATE_APP_CLOSED; + http_disconnect_transport (hc); + return; + } + as = session_get_from_handle (hc->h_pa_session_handle); /* Nothing more to send, confirm close */ @@ -856,10 +1247,10 @@ http_app_tx_callback (void *session, transport_send_params_t *sp) http_conn_t *hc; hc = http_conn_get_w_thread (as->connection_index, as->thread_index); - if (hc->req_state < HTTP_REQ_STATE_WAIT_APP) + if (hc->http_state < HTTP_STATE_WAIT_APP) { if (hc->state != HTTP_CONN_STATE_CLOSED) - clib_warning ("app data req state %u session state %u", hc->req_state, + clib_warning ("app data req state %u session state %u", hc->http_state, hc->state); svm_fifo_dequeue_drop_all (as->tx_fifo); return 0; |