From e151ee29d02d7802fab9e32b50ced54fd8d64160 Mon Sep 17 00:00:00 2001 From: Remy Horton Date: Tue, 30 May 2017 10:53:25 +0100 Subject: Add l4fwd RXTX mode This mode allows for transactions where the request and response are of different payload sizes Change-Id: I0744159f0618c9241e576a4af1c02765bbf1dd9f Signed-off-by: Remy Horton --- examples/l4fwd/README | 9 +++-- examples/l4fwd/common.h | 96 ++++++++++++++++++++++++++++++++++++++++++++- examples/l4fwd/netbe.h | 11 +++++- examples/l4fwd/parse.c | 18 ++++++++- examples/l4fwd/tcp.h | 15 ++++++- examples/l4fwd/udp.h | 2 +- lib/libtle_l4p/tcp_stream.c | 12 ++++++ lib/libtle_l4p/tle_tcp.h | 10 +++++ 8 files changed, 162 insertions(+), 11 deletions(-) diff --git a/examples/l4fwd/README b/examples/l4fwd/README index a232537..a7ae56a 100644 --- a/examples/l4fwd/README +++ b/examples/l4fwd/README @@ -55,10 +55,10 @@ FE configuration record format: - lcore=,op=<"rx|tx|echo|fwd">,\ + lcore=,op=<"rx|tx|echo|fwd|rxtx">,\ laddr=,lport=,raddr=,rport=,\ [txlen=,fwladdr=,fwlport=,fwraddr=,fwrport=,\ - belcore=] + belcore=,rxlen=] lcore - EAL lcore to manage that stream(s) in the FE. It is an mandatory option. @@ -70,12 +70,15 @@ "echo" - mimic recvfrom(..., &addr);sendto(..., &addr); on that stream. "fwd" - forward packets between streams. + "rxtx" - Receive/reply transactions on stream. It is an mandatory option. laddr - local address for the stream to open. It is an mandatory option. lport - local port for the stream to open. It is an mandatory option. raddr - remote address for the stream to open. It is an mandatory option. rport - remote port for the stream to open. It is an mandatory option. - txlen - data length sending in each packet (mandatory for "tx" mode only). + txlen - data length sending in each packet. + (mandatory for "tx" & "rxtx" modes only). + rxlen - Expected response length (mandatory for "rxtx" mode only). fwladdr - local address for the forwarding stream(s) to open (mandatory for "fwd" mode only). fwlport - local port for the forwarding stream(s) to open diff --git a/examples/l4fwd/common.h b/examples/l4fwd/common.h index 8d757f3..ae4f266 100644 --- a/examples/l4fwd/common.h +++ b/examples/l4fwd/common.h @@ -620,9 +620,78 @@ netbe_lcore(void) } static inline int -netfe_rx_process(__rte_unused uint32_t lcore, struct netfe_stream *fes) +netfe_rxtx_get_mss(struct netfe_stream *fes) +{ + switch (fes->proto) { + case TLE_PROTO_TCP: + return tle_tcp_stream_get_mss(fes->s); + + case TLE_PROTO_UDP: + /* The UDP code doesn't have MSS discovery, so have to + * assume arbitary MTU. Going to use default mbuf + * data space as TLDK uses this internally as a + * maximum segment size. + */ + return RTE_MBUF_DEFAULT_DATAROOM - TLE_DST_MAX_HDR; + default: + NETFE_TRACE("%s(%u): Unhandled MSS query (family=%i)\n", + __func__, lcore, fes->proto, fes->family); + return -EINVAL; + } +} + +static inline int +netfe_rxtx_dispatch_reply(uint32_t lcore, struct netfe_stream *fes) + +{ + struct pkt_buf *pb; + int32_t sid; + int32_t cnt_mtu_pkts; + int32_t cnt_all_pkts; + int32_t idx_pkt; + int32_t len_tail; + int32_t mtu; + + pb = &fes->pbuf; + sid = rte_lcore_to_socket_id(lcore) + 1; + mtu = netfe_rxtx_get_mss(fes); + + cnt_mtu_pkts = (fes->txlen / mtu); + cnt_all_pkts = cnt_mtu_pkts; + len_tail = fes->txlen - (mtu * cnt_mtu_pkts); + + if (len_tail > 0) + cnt_all_pkts++; + + if (pb->num + cnt_all_pkts >= RTE_DIM(pb->pkt)) { + NETFE_TRACE("%s(%u): Insufficent space for outbound burst\n", + __func__, lcore); + return -ENOMEM; + } + if (rte_pktmbuf_alloc_bulk(mpool[sid], &pb->pkt[pb->num], cnt_all_pkts) + != 0) { + NETFE_TRACE("%s(%u): rte_pktmbuf_alloc_bulk() failed\n", + __func__, lcore); + return -ENOMEM; + } + + /* Full MTU packets */ + for (idx_pkt = 0; idx_pkt < cnt_mtu_pkts; idx_pkt++) { + rte_pktmbuf_append(pb->pkt[pb->num++], mtu); + } + + /* Last non-MTU packet, if any */ + if (len_tail > 0) + rte_pktmbuf_append(pb->pkt[pb->num++], len_tail); + + return 0; +} + +static inline int +netfe_rx_process(uint32_t lcore, struct netfe_stream *fes) { uint32_t k, n; + uint64_t count_bytes; n = fes->pbuf.num; k = RTE_DIM(fes->pbuf.pkt) - n; @@ -647,9 +716,32 @@ netfe_rx_process(__rte_unused uint32_t lcore, struct netfe_stream *fes) /* free all received mbufs. */ if (fes->op == RXONLY) fes->stat.rxb += pkt_buf_empty(&fes->pbuf); + else if (fes->op == RXTX) { + /* RXTX mode. Count incoming bytes then discard. + * If receive threshold (rxlen) exceeded, send out a packet. + */ + count_bytes = pkt_buf_empty(&fes->pbuf); + fes->stat.rxb += count_bytes; + fes->rx_run_len += count_bytes; + if (fes->rx_run_len >= fes->rxlen) { + /* Idle Rx as buffer needed for Tx */ + tle_event_idle(fes->rxev); + fes->stat.rxev[TLE_SEV_IDLE]++; + + /* Discard surplus bytes. For now pipelining of + * requests is not supported. + */ + fes->rx_run_len = 0; + netfe_rxtx_dispatch_reply(lcore, fes); + + /* Kick off a Tx event */ + tle_event_active(fes->txev, TLE_SEV_UP); + fes->stat.txev[TLE_SEV_UP]++; + } + } /* mark stream as writable */ else if (k == RTE_DIM(fes->pbuf.pkt)) { - if (fes->op == RXTX) { + if (fes->op == ECHO) { tle_event_active(fes->txev, TLE_SEV_UP); fes->stat.txev[TLE_SEV_UP]++; } else if (fes->op == FWD) { diff --git a/examples/l4fwd/netbe.h b/examples/l4fwd/netbe.h index 80d1c28..134ce3d 100644 --- a/examples/l4fwd/netbe.h +++ b/examples/l4fwd/netbe.h @@ -44,6 +44,8 @@ #include #include +#define TLE_DEFAULT_MSS 536 + #define MAX_PKT_BURST 0x20 /* Used to allocate the memory for hash key. */ @@ -161,6 +163,7 @@ enum { RXONLY, TXONLY, RXTX, + ECHO, FWD, }; @@ -175,7 +178,8 @@ struct netfe_stream_prm { uint32_t belcore; uint16_t line; uint16_t op; - uint16_t txlen; /* valid/used only for TXONLY op. */ + uint32_t txlen; /* valid/used only for TXONLY op. */ + uint32_t rxlen; /* Used by RXTX */ struct netfe_sprm sprm; struct netfe_sprm fprm; /* valid/used only for FWD op. */ }; @@ -194,7 +198,10 @@ struct netfe_stream { uint16_t op; uint16_t proto; uint16_t family; - uint16_t txlen; + uint32_t txlen; + uint32_t rxlen; + uint16_t reply_count; + uint32_t rx_run_len; uint16_t posterr; /* # of time error event handling was postponed */ struct { uint64_t rxp; diff --git a/examples/l4fwd/parse.c b/examples/l4fwd/parse.c index 4850312..158b2cb 100644 --- a/examples/l4fwd/parse.c +++ b/examples/l4fwd/parse.c @@ -27,7 +27,8 @@ static const struct { } name2feop[] = { { .name = "rx", .op = RXONLY,}, { .name = "tx", .op = TXONLY,}, - { .name = "echo", .op = RXTX,}, + { .name = "echo", .op = ECHO,}, + { .name = "rxtx", .op = RXTX,}, { .name = "fwd", .op = FWD,}, }; @@ -520,6 +521,7 @@ parse_netfe_arg(struct netfe_stream_prm *sp, const char *arg) "fwraddr", "fwrport", "belcore", + "rxlen", }; static const arg_handler_t hndl[] = { @@ -535,6 +537,7 @@ parse_netfe_arg(struct netfe_stream_prm *sp, const char *arg) parse_ip_val, parse_uint_val, parse_uint_val, + parse_uint_val, }; union parse_val val[RTE_DIM(hndl)]; @@ -553,6 +556,7 @@ parse_netfe_arg(struct netfe_stream_prm *sp, const char *arg) pv2saddr(&sp->fprm.local_addr, val + 7, val + 8); pv2saddr(&sp->fprm.remote_addr, val + 9, val + 10); sp->belcore = val[11].u64; + sp->rxlen = val[12].u64; return 0; } @@ -631,6 +635,18 @@ check_netfe_arg(const struct netfe_stream_prm *sp) format_feop(sp->op)); return -EINVAL; } + } else if (sp->op == RXTX) { + /* RXTX: Check tx pkt size */ + if (sp->txlen == 0) { + RTE_LOG(ERR, USER1, "invalid arg at line %u: " + "txlen cannot be zero.\n", sp->line); + return -EINVAL; + } + if (sp->rxlen == 0) { + RTE_LOG(ERR, USER1, "invalid arg at line %u: " + "rxlen cannot be zero.\n", sp->line); + return -EINVAL; + } } return 0; diff --git a/examples/l4fwd/tcp.h b/examples/l4fwd/tcp.h index e4aadb5..701de9b 100644 --- a/examples/l4fwd/tcp.h +++ b/examples/l4fwd/tcp.h @@ -199,6 +199,9 @@ netfe_lcore_init_tcp(const struct netfe_lcore_prm *prm) } else if (prm->stream[i].op == TXONLY) { fes->txlen = prm->stream[i].txlen; fes->raddr = prm->stream[i].sprm.remote_addr; + } else if (prm->stream[i].op == RXTX) { + fes->txlen = prm->stream[i].txlen; + fes->rxlen = prm->stream[i].rxlen; } if (becfg.server == 1) { @@ -348,6 +351,7 @@ netfe_new_conn_tcp(struct netfe_lcore *fe, uint32_t lcore, ts->proto = fes->proto; ts->family = fes->family; ts->txlen = fes->txlen; + ts->rxlen = fes->rxlen; tle_event_active(ts->erev, TLE_SEV_DOWN); if (fes->op == TXONLY || fes->op == FWD) { @@ -516,7 +520,12 @@ netfe_rxtx_process_tcp(__rte_unused uint32_t lcore, struct netfe_stream *fes) if (k == 0) return 0; - if (n == RTE_DIM(fes->pbuf.pkt)) { + /* Mark stream for reading if: + * ECHO: Buffer full + * RXTX: All outbound packets successfully dispatched + */ + if ((fes->op == ECHO && n == RTE_DIM(fes->pbuf.pkt)) || + (fes->op == RXTX && n - k == 0)) { /* mark stream as readable */ tle_event_active(fes->rxev, TLE_SEV_UP); fes->stat.rxev[TLE_SEV_UP]++; @@ -608,12 +617,14 @@ netfe_lcore_tcp(void) rc = 0; - if (fs[j]->op == RXTX) + if (fs[j]->op == ECHO) rc = netfe_rxtx_process_tcp(lcore, fs[j]); else if (fs[j]->op == FWD) rc = netfe_fwd_tcp(lcore, fs[j]); else if (fs[j]->op == TXONLY) rc = netfe_tx_process_tcp(lcore, fs[j]); + else if (fs[j]->op == RXTX) + rc = netfe_rxtx_process_tcp(lcore, fs[j]); /* we are ok to close the stream */ if (rc == 0 && fs[j]->posterr != 0) diff --git a/examples/l4fwd/udp.h b/examples/l4fwd/udp.h index cdec6a5..c079e9c 100644 --- a/examples/l4fwd/udp.h +++ b/examples/l4fwd/udp.h @@ -510,7 +510,7 @@ netfe_lcore_udp(void) NETFE_TRACE("%s(%u): tle_evq_get(txevq=%p) returns %u\n", __func__, lcore, fe->txeq, n); for (j = 0; j != n; j++) { - if (fs[j]->op == RXTX) + if (fs[j]->op == ECHO) netfe_rxtx_process_udp(lcore, fs[j]); else if (fs[j]->op == FWD) netfe_fwd_udp(lcore, fs[j]); diff --git a/lib/libtle_l4p/tcp_stream.c b/lib/libtle_l4p/tcp_stream.c index af65967..99791d0 100644 --- a/lib/libtle_l4p/tcp_stream.c +++ b/lib/libtle_l4p/tcp_stream.c @@ -604,3 +604,15 @@ tle_tcp_stream_update_cfg(struct tle_stream *ts[], return i; } + +int +tle_tcp_stream_get_mss(const struct tle_stream * stream) +{ + struct tle_tcp_stream *tcp; + + if (stream == NULL) + return -EINVAL; + + tcp = TCP_STREAM(stream); + return tcp->tcb.snd.mss; +} diff --git a/lib/libtle_l4p/tle_tcp.h b/lib/libtle_l4p/tle_tcp.h index ec89746..9086658 100644 --- a/lib/libtle_l4p/tle_tcp.h +++ b/lib/libtle_l4p/tle_tcp.h @@ -362,6 +362,16 @@ uint16_t tle_tcp_tx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], */ int tle_tcp_process(struct tle_ctx *ctx, uint32_t num); +/** + * Get current TCP maximum segment size + * @param stream + * Stream to get MSS from. + * @return + * Maximum segment size in bytes, if successful. + * Negative on failure. + */ +int tle_tcp_stream_get_mss(const struct tle_stream * const stream); + #ifdef __cplusplus } #endif -- cgit 1.2.3-korg