From 21e7392fca2c430018cf387bb3e368ea4c665446 Mon Sep 17 00:00:00 2001 From: Konstantin Ananyev <konstantin.ananyev@intel.com> Date: Fri, 3 Mar 2017 18:40:23 +0000 Subject: Rewrite accept() code-path and make l4fwd not to close() on FIN immediatelly. Changes in public API: - removes tle_tcp_stream_synreqs() and tle_tcp_reject() - adds tle_tcp_stream_update_cfg Allocates and fills new stream when final ACK for 3-way handshake is received. Changes in l4fwd sample application: prevents l4fwd to call close() on error event immediately: first try to recv/send remaining data. Change-Id: I8c5b9d365353084083731a4ce582197a8268688f Signed-off-by: Konstantin Ananyev <konstantin.ananyev@intel.com> --- lib/libtle_l4p/stream_table.h | 4 +- lib/libtle_l4p/syncookie.h | 9 +- lib/libtle_l4p/tcp_ctl.h | 7 + lib/libtle_l4p/tcp_rxtx.c | 500 +++++++++++++++++++----------------------- lib/libtle_l4p/tcp_stream.c | 85 +++++++ lib/libtle_l4p/tle_event.h | 5 + lib/libtle_l4p/tle_tcp.h | 104 ++++----- 7 files changed, 361 insertions(+), 353 deletions(-) (limited to 'lib') diff --git a/lib/libtle_l4p/stream_table.h b/lib/libtle_l4p/stream_table.h index 8ad1103..29f1f63 100644 --- a/lib/libtle_l4p/stream_table.h +++ b/lib/libtle_l4p/stream_table.h @@ -110,13 +110,13 @@ stbl_add_entry(struct stbl *st, const union pkt_info *pi) } static inline struct stbl_entry * -stbl_add_pkt(struct stbl *st, const union pkt_info *pi, const void *pkt) +stbl_add_stream(struct stbl *st, const union pkt_info *pi, const void *s) { struct stbl_entry *se; se = stbl_add_entry(st, pi); if (se != NULL) - se->data = (void *)((uintptr_t)pkt | STE_PKT); + se->data = (void *)(uintptr_t)s; return se; } diff --git a/lib/libtle_l4p/syncookie.h b/lib/libtle_l4p/syncookie.h index 276d45a..ad70b7d 100644 --- a/lib/libtle_l4p/syncookie.h +++ b/lib/libtle_l4p/syncookie.h @@ -156,13 +156,8 @@ sync_get_opts(struct syn_opts *so, uintptr_t p, uint32_t len) static inline void sync_fill_tcb(struct tcb *tcb, const union seg_info *si, - const struct rte_mbuf *mb) + const struct syn_opts *so) { - const struct tcp_hdr *th; - - th = rte_pktmbuf_mtod_offset(mb, const struct tcp_hdr *, - mb->l2_len + mb->l3_len); - tcb->rcv.nxt = si->seq; tcb->rcv.irs = si->seq - 1; @@ -174,7 +169,7 @@ sync_fill_tcb(struct tcb *tcb, const union seg_info *si, tcb->snd.wu.wl1 = si->seq; tcb->snd.wu.wl2 = si->ack; - get_syn_opts(&tcb->so, (uintptr_t)(th + 1), mb->l4_len - sizeof(*th)); + tcb->so = *so; tcb->snd.wscale = tcb->so.wscale; tcb->snd.mss = tcb->so.mss; diff --git a/lib/libtle_l4p/tcp_ctl.h b/lib/libtle_l4p/tcp_ctl.h index dcb9c3e..95c2bbc 100644 --- a/lib/libtle_l4p/tcp_ctl.h +++ b/lib/libtle_l4p/tcp_ctl.h @@ -41,6 +41,13 @@ tcp_stream_up(struct tle_tcp_stream *s) rwl_up(&s->tx.use); } +/* calculate RCV.WND value based on size of stream receive buffer */ +static inline uint32_t +calc_rx_wnd(const struct tle_tcp_stream *s, uint32_t scale) +{ + return s->rx.q->prod.mask << scale; +} + /* empty stream's receive queue */ static void empty_rq(struct tle_tcp_stream *s) diff --git a/lib/libtle_l4p/tcp_rxtx.c b/lib/libtle_l4p/tcp_rxtx.c index 4e43730..6085814 100644 --- a/lib/libtle_l4p/tcp_rxtx.c +++ b/lib/libtle_l4p/tcp_rxtx.c @@ -173,7 +173,7 @@ fill_tcph(struct tcp_hdr *l4h, const struct tcb *tcb, union l4_ports port, l4h->dst_port = port.src; wnd = (flags & TCP_FLAG_SYN) ? - RTE_MAX(TCP4_MIN_MSS, tcb->so.mss) : + RTE_MIN(tcb->rcv.wnd, (uint32_t)UINT16_MAX) : tcb->rcv.wnd >> tcb->rcv.wscale; /* ??? use sse shuffle to hton all remaining 16 bytes at once. ??? */ @@ -760,68 +760,24 @@ rx_check_seqack(struct tcb *tcb, uint32_t seq, uint32_t ack, uint32_t len, } static inline int -restore_syn_pkt(const union pkt_info *pi, const union seg_info *si, - uint32_t ts, struct rte_mbuf *mb) +restore_syn_opt(struct syn_opts *so, const union pkt_info *pi, + const union seg_info *si, uint32_t ts, const struct rte_mbuf *mb) { int32_t rc; uint32_t len; - struct tcp_hdr *th; - struct syn_opts so; + const struct tcp_hdr *th; /* check that ACK, etc fields are what we expected. */ rc = sync_check_ack(pi, si->seq, si->ack - 1, ts); if (rc < 0) return rc; - so.mss = rc; + so->mss = rc; - th = rte_pktmbuf_mtod_offset(mb, struct tcp_hdr *, + th = rte_pktmbuf_mtod_offset(mb, const struct tcp_hdr *, mb->l2_len + mb->l3_len); len = mb->l4_len - sizeof(*th); - sync_get_opts(&so, (uintptr_t)(th + 1), len); - - /* reconstruct SYN options, extend header size if necessary */ - if (len < TCP_TX_OPT_LEN_MAX) { - len = TCP_TX_OPT_LEN_MAX - len; - th->data_off = TCP_TX_OPT_LEN_MAX / TCP_DATA_ALIGN << - TCP_DATA_OFFSET; - mb->pkt_len += len; - mb->data_len += len; - mb->l4_len += len; - } - - fill_syn_opts(th + 1, &so); - return 0; -} - -static inline int -rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st, - const union pkt_info *pi, const union seg_info *si, - uint32_t ts, struct rte_mbuf *mb) -{ - int32_t rc; - struct stbl_entry *se; - - if (pi->tf.flags != TCP_FLAG_ACK || rx_check_stream(s, pi) != 0) - return -EINVAL; - - /* ACK for new connection request. */ - - rc = restore_syn_pkt(pi, si, ts, mb); - if (rc < 0) - return rc; - - se = stbl_add_pkt(st, pi, mb); - if (se == NULL) - return -ENOBUFS; - - /* put new connection requests into stream listen queue */ - if (rte_ring_enqueue_burst(s->rx.q, - (void * const *)&se, 1) != 1) { - stbl_del_pkt(st, se, pi); - return -ENOBUFS; - } - + sync_get_opts(so, (uintptr_t)(th + 1), len); return 0; } @@ -848,6 +804,151 @@ stream_term(struct tle_tcp_stream *s) s->err.cb.func(s->err.cb.data, &s->s); } +static inline int +stream_fill_dest(struct tle_tcp_stream *s) +{ + int32_t rc; + const void *da; + + if (s->s.type == TLE_V4) + da = &s->s.ipv4.addr.src; + else + da = &s->s.ipv6.addr.src; + + rc = stream_get_dest(&s->s, da, &s->tx.dst); + return (rc < 0) ? rc : 0; +} + +/* + * helper function, prepares a new accept stream. + */ +static inline int +accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st, + struct tle_tcp_stream *cs, const struct syn_opts *so, + uint32_t tms, const union pkt_info *pi, const union seg_info *si) +{ + int32_t rc; + uint32_t rtt; + + /* some TX still pending for that stream. */ + if (TCP_STREAM_TX_PENDING(cs)) + return -EAGAIN; + + /* setup L4 ports and L3 addresses fields. */ + cs->s.port.raw = pi->port.raw; + cs->s.pmsk.raw = UINT32_MAX; + + if (pi->tf.type == TLE_V4) { + cs->s.ipv4.addr = pi->addr4; + cs->s.ipv4.mask.src = INADDR_NONE; + cs->s.ipv4.mask.dst = INADDR_NONE; + } else if (pi->tf.type == TLE_V6) { + cs->s.ipv6.addr = *pi->addr6; + rte_memcpy(&cs->s.ipv6.mask.src, &tle_ipv6_none, + sizeof(cs->s.ipv6.mask.src)); + rte_memcpy(&cs->s.ipv6.mask.dst, &tle_ipv6_none, + sizeof(cs->s.ipv6.mask.dst)); + } + + /* setup TCB */ + sync_fill_tcb(&cs->tcb, si, so); + cs->tcb.rcv.wnd = calc_rx_wnd(cs, cs->tcb.rcv.wscale); + + /* + * estimate the rto + * for now rtt is calculated based on the tcp TMS option, + * later add real-time one + */ + if (cs->tcb.so.ts.ecr) { + rtt = tms - cs->tcb.so.ts.ecr; + rto_estimate(&cs->tcb, rtt); + } else + cs->tcb.snd.rto = TCP_RTO_DEFAULT; + + /* copy streams type. */ + cs->s.type = ps->s.type; + + /* retrive and cache destination information. */ + rc = stream_fill_dest(cs); + if (rc != 0) + return rc; + + /* update snd.mss with SMSS value */ + cs->tcb.snd.mss = calc_smss(cs->tcb.snd.mss, &cs->tx.dst); + + /* setup congestion variables */ + cs->tcb.snd.cwnd = initial_cwnd(cs->tcb.snd.mss); + cs->tcb.snd.ssthresh = cs->tcb.snd.wnd; + + cs->tcb.state = TCP_ST_ESTABLISHED; + + /* add stream to the table */ + cs->ste = stbl_add_stream(st, pi, cs); + if (cs->ste == NULL) + return -ENOBUFS; + + cs->tcb.uop |= TCP_OP_ACCEPT; + tcp_stream_up(cs); + return 0; +} + + +/* + * ACK for new connection request arrived. + * Check that the packet meets all conditions and try to open a new stream. + * returns: + * < 0 - invalid packet + * == 0 - packet is valid and new stream was opened for it. + * > 0 - packet is valid, but failed to open new stream. + */ +static inline int +rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st, + const union pkt_info *pi, const union seg_info *si, + uint32_t tms, struct rte_mbuf *mb, struct tle_tcp_stream **csp) +{ + int32_t rc; + struct tle_ctx *ctx; + struct tle_stream *ts; + struct tle_tcp_stream *cs; + struct syn_opts so; + + *csp = NULL; + + if (pi->tf.flags != TCP_FLAG_ACK || rx_check_stream(s, pi) != 0) + return -EINVAL; + + rc = restore_syn_opt(&so, pi, si, tms, mb); + if (rc < 0) + return rc; + + ctx = s->s.ctx; + + /* allocate new stream */ + ts = get_stream(ctx); + cs = TCP_STREAM(ts); + if (ts == NULL) + return ENFILE; + + /* prepare stream to handle new connection */ + if (accept_prep_stream(s, st, cs, &so, tms, pi, si) == 0) { + + /* put new stream in the accept queue */ + if (rte_ring_enqueue_burst(s->rx.q, + (void * const *)&ts, 1) == 1) { + *csp = cs; + return 0; + } + + /* cleanup on failure */ + tcp_stream_down(cs); + stbl_del_pkt(st, cs->ste, pi); + cs->ste = NULL; + } + + tcp_stream_reset(ctx, cs); + return ENOBUFS; +} + static inline int data_pkt_adjust(const struct tcb *tcb, struct rte_mbuf *mb, uint32_t hlen, uint32_t *seqn, uint32_t *plen) @@ -1591,14 +1692,35 @@ rx_stream(struct tle_tcp_stream *s, uint32_t ts, return num - k; } +static inline uint32_t +rx_new_stream(struct tle_tcp_stream *s, uint32_t ts, + const union pkt_info *pi, const union seg_info si[], + struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[], + uint32_t num) +{ + uint32_t i; + + if (rwl_acquire(&s->rx.use) > 0) { + i = rx_stream(s, ts, pi, si, mb, rp, rc, num); + rwl_release(&s->rx.use); + return i; + } + + for (i = 0; i != num; i++) { + rc[i] = ENOENT; + rp[i] = mb[i]; + } + return 0; +} + static inline uint32_t rx_postsyn(struct tle_dev *dev, struct stbl *st, uint32_t type, uint32_t ts, const union pkt_info pi[], const union seg_info si[], struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[], uint32_t num) { - struct tle_tcp_stream *s; - uint32_t i, k, state; + struct tle_tcp_stream *cs, *s; + uint32_t i, k, n, state; int32_t ret; s = rx_obtain_stream(dev, st, &pi[0], type); @@ -1616,25 +1738,51 @@ rx_postsyn(struct tle_dev *dev, struct stbl *st, uint32_t type, uint32_t ts, if (state == TCP_ST_LISTEN) { /* one connection per flow */ - ret = EINVAL; - for (i = 0; i != num && ret != 0; i++) { - ret = rx_ack_listen(s, st, pi, &si[i], ts, mb[i]); - if (ret != 0) { - rc[k] = -ret; - rp[k] = mb[i]; - k++; - } - } - /* duplicate SYN requests */ - for (; i != num; i++, k++) { - rc[k] = EINVAL; + cs = NULL; + ret = -EINVAL; + for (i = 0; i != num; i++) { + + ret = rx_ack_listen(s, st, pi, &si[i], ts, mb[i], &cs); + + /* valid packet encountered */ + if (ret >= 0) + break; + + /* invalid packet, keep trying to find a proper one */ + rc[k] = -ret; rp[k] = mb[i]; + k++; } - if (k != num && s->rx.ev != NULL) - tle_event_raise(s->rx.ev); - else if (s->rx.cb.func != NULL && rte_ring_count(s->rx.q) == 1) - s->rx.cb.func(s->rx.cb.data, &s->s); + /* packet is valid, but we are out of streams to serve it */ + if (ret > 0) { + for (; i != num; i++, k++) { + rc[k] = ret; + rp[k] = mb[i]; + } + /* new stream is accepted */ + } else if (ret == 0) { + + /* inform listen stream about new connections */ + if (s->rx.ev != NULL) + tle_event_raise(s->rx.ev); + else if (s->rx.cb.func != NULL && + rte_ring_count(s->rx.q) == 1) + s->rx.cb.func(s->rx.cb.data, &s->s); + + /* if there is no data, drop current packet */ + if (PKT_L4_PLEN(mb[i]) == 0) { + rc[k] = ENODATA; + rp[k++] = mb[i++]; + } + + /* process remaining packets for that stream */ + if (num != i) { + n = rx_new_stream(cs, ts, pi + i, si + i, + mb + i, rp + k, rc + k, num - i); + k += num - n - i; + } + } } else { i = rx_stream(s, ts, pi, si, mb, rp, rc, num); @@ -1761,23 +1909,17 @@ tle_tcp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], } uint16_t -tle_tcp_stream_synreqs(struct tle_stream *ts, struct tle_syn_req rq[], +tle_tcp_stream_accept(struct tle_stream *ts, struct tle_stream *rs[], uint32_t num) { - uint32_t i, n; + uint32_t n; struct tle_tcp_stream *s; - struct stbl_entry *se[num]; s = TCP_STREAM(ts); - n = rte_ring_mc_dequeue_burst(s->rx.q, (void **)se, num); + n = rte_ring_mc_dequeue_burst(s->rx.q, (void **)rs, num); if (n == 0) return 0; - for (i = 0; i != n; i++) { - rq[i].pkt = stbl_get_pkt(se[i]); - rq[i].opaque = se[i]; - } - /* * if we still have packets to read, * then rearm stream RX event. @@ -1791,206 +1933,6 @@ tle_tcp_stream_synreqs(struct tle_stream *ts, struct tle_syn_req rq[], return n; } -static inline int -stream_fill_dest(struct tle_tcp_stream *s) -{ - int32_t rc; - const void *da; - - if (s->s.type == TLE_V4) - da = &s->s.ipv4.addr.src; - else - da = &s->s.ipv6.addr.src; - - rc = stream_get_dest(&s->s, da, &s->tx.dst); - return (rc < 0) ? rc : 0; -} - -/* - * helper function, prepares an accepted stream. - */ -static int -accept_fill_stream(struct tle_tcp_stream *ps, struct tle_tcp_stream *cs, - const struct tle_tcp_accept_param *prm, uint32_t tms, - const union pkt_info *pi, const union seg_info *si) -{ - int32_t rc; - uint32_t rtt; - - /* some TX still pending for that stream. */ - if (TCP_STREAM_TX_PENDING(cs)) - return -EAGAIN; - - /* setup L4 ports and L3 addresses fields. */ - cs->s.port.raw = pi->port.raw; - cs->s.pmsk.raw = UINT32_MAX; - - if (pi->tf.type == TLE_V4) { - cs->s.ipv4.addr = pi->addr4; - cs->s.ipv4.mask.src = INADDR_NONE; - cs->s.ipv4.mask.dst = INADDR_NONE; - } else if (pi->tf.type == TLE_V6) { - cs->s.ipv6.addr = *pi->addr6; - rte_memcpy(&cs->s.ipv6.mask.src, &tle_ipv6_none, - sizeof(cs->s.ipv6.mask.src)); - rte_memcpy(&cs->s.ipv6.mask.dst, &tle_ipv6_none, - sizeof(cs->s.ipv6.mask.dst)); - } - - /* setup TCB */ - sync_fill_tcb(&cs->tcb, si, prm->syn.pkt); - cs->tcb.rcv.wnd = cs->rx.q->prod.mask << cs->tcb.rcv.wscale; - - /* setup stream notification menchanism */ - cs->rx.ev = prm->cfg.recv_ev; - cs->rx.cb = prm->cfg.recv_cb; - cs->tx.ev = prm->cfg.send_ev; - cs->tx.cb = prm->cfg.send_cb; - cs->err.ev = prm->cfg.err_ev; - cs->err.cb = prm->cfg.err_cb; - - /* store other params */ - cs->tcb.snd.nb_retm = (prm->cfg.nb_retries != 0) ? prm->cfg.nb_retries : - TLE_TCP_DEFAULT_RETRIES; - - /* - * estimate the rto - * for now rtt is calculated based on the tcp TMS option, - * later add real-time one - */ - if (cs->tcb.so.ts.ecr) { - rtt = tms - cs->tcb.so.ts.ecr; - rto_estimate(&cs->tcb, rtt); - } else - cs->tcb.snd.rto = TCP_RTO_DEFAULT; - - tcp_stream_up(cs); - - /* copy streams type. */ - cs->s.type = ps->s.type; - - /* retrive and cache destination information. */ - rc = stream_fill_dest(cs); - if (rc != 0) - return rc; - - /* update snd.mss with SMSS value */ - cs->tcb.snd.mss = calc_smss(cs->tcb.snd.mss, &cs->tx.dst); - - /* setup congestion variables */ - cs->tcb.snd.cwnd = initial_cwnd(cs->tcb.snd.mss); - cs->tcb.snd.ssthresh = cs->tcb.snd.wnd; - - cs->tcb.state = TCP_ST_ESTABLISHED; - cs->tcb.uop |= TCP_OP_ACCEPT; - - /* add stream to the table */ - cs->ste = prm->syn.opaque; - rte_smp_wmb(); - cs->ste->data = cs; - return 0; -} - -/* - * !!! - * Right now new stream rcv.wnd is set to zero. - * That simplifies handling of new connection establishment - * (as no data segments could be received), - * but has to be addressed. - * possible ways: - * - send ack after accept creates new stream with new rcv.wnd value. - * the problem with that approach that single ack is not delivered - * reliably (could be lost), plus might slowdown connection establishment - * (extra packet per connection, that client has to wait for). - * - allocate new stream at ACK recieve stage. - * As a drawback - whole new stream allocation/connection establishment - * will be done in BE. - * !!! - */ -int -tle_tcp_stream_accept(struct tle_stream *ts, - const struct tle_tcp_accept_param prm[], struct tle_stream *rs[], - uint32_t num) -{ - struct tle_tcp_stream *cs, *s; - struct tle_ctx *ctx; - uint32_t i, j, n, tms; - int32_t rc; - union pkt_info pi[num]; - union seg_info si[num]; - - tms = tcp_get_tms(); - s = TCP_STREAM(ts); - - for (i = 0; i != num; i++) - get_pkt_info(prm[i].syn.pkt, &pi[i], &si[i]); - - /* mark stream as not closable */ - if (rwl_acquire(&s->rx.use) < 0) - return -EINVAL; - - ctx = s->s.ctx; - n = get_streams(ctx, rs, num); - - rc = 0; - for (i = 0; i != n; i++) { - - /* prepare new stream */ - cs = TCP_STREAM(rs[i]); - rc = accept_fill_stream(s, cs, prm + i, tms, pi + i, si + i); - if (rc != 0) - break; - } - - rwl_release(&s->rx.use); - - /* free 'SYN' mbufs. */ - for (j = 0; j != i; j++) - rte_pktmbuf_free(prm[j].syn.pkt); - - /* close failed stream, put unused streams back to the free list. */ - if (rc != 0) { - tle_tcp_stream_close(rs[i]); - for (j = i + 1; j != n; j++) { - cs = TCP_STREAM(rs[j]); - put_stream(ctx, rs[j], TCP_STREAM_TX_PENDING(cs)); - } - rte_errno = -rc; - - /* not enough streams are available */ - } else if (n != num) - rte_errno = ENFILE; - - return i; -} - -/* - * !!! implement a proper one, or delete !!! - * need to make sure no race conditions with add/lookup stream table. - */ -void -tle_tcp_reject(struct tle_stream *s, const struct tle_syn_req rq[], - uint32_t num) -{ - uint32_t i; - struct rte_mbuf *mb; - struct stbl *st; - union pkt_info pi; - union seg_info si; - - st = CTX_TCP_STLB(s->ctx); - - for (i = 0; i != num; i++) { - mb = rq[i].pkt; - get_pkt_info(mb, &pi, &si); - if (pi.tf.type < TLE_VNUM) - stbl_del_pkt_lock(st, rq[i].opaque, &pi); - - /* !!! send RST pkt to the peer !!! */ - rte_pktmbuf_free(mb); - } -} - uint16_t tle_tcp_tx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], uint16_t num) { @@ -2121,7 +2063,7 @@ tx_syn(struct tle_tcp_stream *s, const struct sockaddr *addr) s->tcb.rcv.mss = s->tcb.so.mss; s->tcb.rcv.wscale = TCP_WSCALE_DEFAULT; - s->tcb.rcv.wnd = s->rx.q->prod.mask << s->tcb.rcv.wscale; + s->tcb.rcv.wnd = calc_rx_wnd(s, s->tcb.rcv.wscale); s->tcb.rcv.ts = 0; /* add the stream in stream table */ diff --git a/lib/libtle_l4p/tcp_stream.c b/lib/libtle_l4p/tcp_stream.c index 67ed66b..f06b2ed 100644 --- a/lib/libtle_l4p/tcp_stream.c +++ b/lib/libtle_l4p/tcp_stream.c @@ -511,6 +511,7 @@ tle_tcp_stream_listen(struct tle_stream *ts) TCP_ST_LISTEN); if (rc != 0) { s->tcb.uop |= TCP_OP_LISTEN; + s->tcb.rcv.wnd = calc_rx_wnd(s, TCP_WSCALE_DEFAULT); rc = 0; } else rc = -EDEADLK; @@ -520,3 +521,87 @@ tle_tcp_stream_listen(struct tle_stream *ts) rwl_release(&s->rx.use); return rc; } + +/* + * helper function, updates stream config + */ +static inline int +stream_update_cfg(struct tle_stream *ts,struct tle_tcp_stream_cfg *prm) +{ + int32_t rc1, rc2; + struct tle_tcp_stream *s; + + s = TCP_STREAM(ts); + + rc1 = rwl_try_acquire(&s->rx.use); + rc2 = rwl_try_acquire(&s->tx.use); + + if (rc1 < 0 || rc2 < 0 || (s->tcb.uop & TCP_OP_CLOSE) != 0) { + rwl_release(&s->tx.use); + rwl_release(&s->rx.use); + return -EINVAL; + } + + /* setup stream notification menchanism */ + s->rx.ev = prm->recv_ev; + s->tx.ev = prm->send_ev; + s->err.ev = prm->err_ev; + + s->rx.cb.data = prm->recv_cb.data; + s->tx.cb.data = prm->send_cb.data; + s->err.cb.data = prm->err_cb.data; + + rte_smp_wmb(); + + s->rx.cb.func = prm->recv_cb.func; + s->tx.cb.func = prm->send_cb.func; + s->err.cb.func = prm->err_cb.func; + + /* store other params */ + s->tcb.snd.nb_retm = (prm->nb_retries != 0) ? prm->nb_retries : + TLE_TCP_DEFAULT_RETRIES; + + /* invoke async notifications, if any */ + if (rte_ring_count(s->rx.q) != 0) { + if (s->rx.ev != NULL) + tle_event_raise(s->rx.ev); + else if (s->rx.cb.func != NULL) + s->rx.cb.func(s->rx.cb.data, &s->s); + } + if (rte_ring_free_count(s->tx.q) != 0) { + if (s->tx.ev != NULL) + tle_event_raise(s->tx.ev); + else if (s->tx.cb.func != NULL) + s->tx.cb.func(s->tx.cb.data, &s->s); + } + if (s->tcb.state == TCP_ST_CLOSE_WAIT || + s->tcb.state == TCP_ST_CLOSED) { + if (s->err.ev != NULL) + tle_event_raise(s->err.ev); + else if (s->err.cb.func != NULL) + s->err.cb.func(s->err.cb.data, &s->s); + } + + rwl_release(&s->tx.use); + rwl_release(&s->rx.use); + + return 0; +} + +uint32_t +tle_tcp_stream_update_cfg(struct tle_stream *ts[], + struct tle_tcp_stream_cfg prm[], uint32_t num) +{ + int32_t rc; + uint32_t i; + + for (i = 0; i != num; i++) { + rc = stream_update_cfg(ts[i], &prm[i]); + if (rc != 0) { + rte_errno = -rc; + break; + } + } + + return i; +} diff --git a/lib/libtle_l4p/tle_event.h b/lib/libtle_l4p/tle_event.h index b19954a..d730345 100644 --- a/lib/libtle_l4p/tle_event.h +++ b/lib/libtle_l4p/tle_event.h @@ -106,6 +106,11 @@ struct tle_event *tle_event_alloc(struct tle_evq *evq, const void *data); */ void tle_event_free(struct tle_event *ev); +static inline enum tle_ev_state +tle_event_state(const struct tle_event *ev) +{ + return ev->state; +} /** * move event from DOWN to UP state. diff --git a/lib/libtle_l4p/tle_tcp.h b/lib/libtle_l4p/tle_tcp.h index e6eb336..ec89746 100644 --- a/lib/libtle_l4p/tle_tcp.h +++ b/lib/libtle_l4p/tle_tcp.h @@ -148,40 +148,19 @@ int tle_tcp_stream_connect(struct tle_stream *s, const struct sockaddr *addr); * <stream open happens here> * tle_tcp_stream_listen(stream_to_listen); * <wait for read event/callback on that stream> - * n = tle_tcp_synreqs(stream_to_listen, syn_reqs, sizeof(syn_reqs)); - * for (i = 0, k = 0; i != n; i++) { - * rc = <decide should connection from that endpoint be allowed>; - * if (rc == 0) { - * //proceed with connection establishment - * k++; - * accept_param[k].syn = syn_reqs[i]; - * <fill rest of accept_param fields for k-th connection> - * } else { - * //reject connection requests from that endpoint - * rej_reqs[i - k] = syn_reqs[i]; - * } + * n = tle_tcp_accept(stream_to_listen, accepted_streams, + * sizeof(accepted_streams)); + * for (i = 0, i != n; i++) { + * //prepare tle_tcp_stream_cfg for newly accepted streams + * ... + * } + * k = tle_tcp_stream_update_cfg(rs, prm, n); + * if (n != k) { + * //handle error + * ... * } - * - * //reject n - k connection requests - * tle_tcp_reject(stream_to_listen, rej_reqs, n - k); - * - * //accept k new connections - * rc = tle_tcp_accept(stream_to_listen, accept_param, new_con_streams, k); - * <handle errors> */ -struct tle_syn_req { - struct rte_mbuf *pkt; - /*< mbuf with incoming connection request. */ - void *opaque; /*< tldk related opaque pointer. */ -}; - -struct tle_tcp_accept_param { - struct tle_syn_req syn; /*< mbuf with incoming SYN request. */ - struct tle_tcp_stream_cfg cfg; /*< stream configure options. */ -}; - - /** * Set stream into the listen state (passive opener), i.e. make stream ready * to accept new connections. @@ -198,28 +177,41 @@ struct tle_tcp_accept_param { int tle_tcp_stream_listen(struct tle_stream *s); /** - * return up to *num* mbufs with SYN requests that were received + * return up to *num* streams from the queue of pending connections * for given TCP endpoint. - * Note that the stream has to be in listen state. - * For each returned mbuf: - * data_off set to the start of the packet - * l2_len, l3_len, l4_len are setup properly - * (so user can still extract L2/L3/L4 header info if needed) - * packet_type RTE_PTYPE_L2/L3/L4 bits are setup properly. - * L3/L4 checksum is verified. * @param s - * TCP stream to receive packets from. - * @param rq - * An array of tle_syn_req structures that contains - * at least *num* elements in it. + * TCP stream in listen state. + * @param rs + * An array of pointers to the newily accepted streams. + * Each such new stream represents a new connection to the given TCP endpoint. + * Newly accepted stream should be in connected state and ready to use + * by other FE API routines (send/recv/close/etc.). * @param num - * Number of elements in the *pkt* array. + * Number of elements in the *rs* array. * @return - * number of of entries filled inside *pkt* array. + * number of entries filled inside *rs* array. */ -uint16_t tle_tcp_stream_synreqs(struct tle_stream *s, struct tle_syn_req rq[], +uint16_t tle_tcp_stream_accept(struct tle_stream *s, struct tle_stream *rs[], uint32_t num); +/** + * updates configuration (associated events, callbacks, stream parameters) + * for the given streams. + * @param ts + * An array of pointers to the streams to update. + * @param prm + * An array of parameters to update for the given streams. + * @param num + * Number of elements in the *ts* and *prm* arrays. + * @return + * number of streams successfully updated. + * In case of error, error code set in rte_errno. + * Possible rte_errno errors include: + * - EINVAL - invalid parameter passed to function + */ +uint32_t tle_tcp_stream_update_cfg(struct tle_stream *ts[], + struct tle_tcp_stream_cfg prm[], uint32_t num); + /** * Accept connection requests for the given stream. * Note that the stream has to be in listen state. @@ -241,27 +233,9 @@ uint16_t tle_tcp_stream_synreqs(struct tle_stream *s, struct tle_syn_req rq[], * - EINVAL - invalid parameter passed to function * - ENFILE - no more streams are avaialble to open. */ -int tle_tcp_stream_accept(struct tle_stream *s, - const struct tle_tcp_accept_param prm[], struct tle_stream *rs[], - uint32_t num); - -/** - * Reject connection requests for the given stream. - * Note that the stream has to be in listen state. - * For each new connection a new stream will be open. - * @param s - * TCP listen stream. - * @param rq - * An array of tle_syn_req structures that contains - * at least *num* elements in it. - * @param num - * Number of elements in the *pkt* array. - */ -void tle_tcp_reject(struct tle_stream *s, const struct tle_syn_req rq[], - uint32_t num); /** - * return up to *num* mbufs that was received for given TCP stream. + * Return up to *num* mbufs that was received for given TCP stream. * Note that the stream has to be in connected state. * Data ordering is preserved. * For each returned mbuf: -- cgit