From 6de3219c058877b87c5637096cd3f2025655ba39 Mon Sep 17 00:00:00 2001 From: Nikhil Rao Date: Mon, 2 Jul 2018 14:41:12 +0530 Subject: [PATCH] eventdev: move Rx adapter to separate function Create a separate function that handles eth receive and enqueue to event buffer. This function will also be called for interrupt driven receive queues. Signed-off-by: Nikhil Rao Acked-by: Jerin Jacob --- .../rte_event_eth_rx_adapter.c | 67 +++++++++++++------ 1 file changed, 47 insertions(+), 20 deletions(-) diff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.c b/lib/librte_eventdev/rte_event_eth_rx_adapter.c index 926f83ad2a..8fe037f16b 100644 --- a/lib/librte_eventdev/rte_event_eth_rx_adapter.c +++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.c @@ -616,6 +616,45 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter, } } +/* Enqueue packets from to event buffer */ +static inline uint32_t +rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter, + uint16_t port_id, + uint16_t queue_id, + uint32_t rx_count, + uint32_t max_rx) +{ + struct rte_mbuf *mbufs[BATCH_SIZE]; + struct rte_eth_event_enqueue_buffer *buf = + &rx_adapter->event_enqueue_buffer; + struct rte_event_eth_rx_adapter_stats *stats = + &rx_adapter->stats; + uint16_t n; + uint32_t nb_rx = 0; + + /* Don't do a batch dequeue from the rx queue if there isn't + * enough space in the enqueue buffer. + */ + while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) { + if (buf->count >= BATCH_SIZE) + rxa_flush_event_buffer(rx_adapter); + + stats->rx_poll_count++; + n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE); + if (unlikely(!n)) + break; + rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n); + nb_rx += n; + if (rx_count + nb_rx > max_rx) + break; + } + + if (buf->count >= BATCH_SIZE) + rxa_flush_event_buffer(rx_adapter); + + return nb_rx; +} + /* * Polls receive queues added to the event adapter and enqueues received * packets to the event device. @@ -633,17 +672,16 @@ static inline void rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter) { uint32_t num_queue; - uint16_t n; uint32_t nb_rx = 0; - struct rte_mbuf *mbufs[BATCH_SIZE]; struct rte_eth_event_enqueue_buffer *buf; uint32_t wrr_pos; uint32_t max_nb_rx; + struct rte_event_eth_rx_adapter_stats *stats; wrr_pos = rx_adapter->wrr_pos; max_nb_rx = rx_adapter->max_nb_rx; buf = &rx_adapter->event_enqueue_buffer; - struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats; + stats = &rx_adapter->stats; /* Iterate through a WRR sequence */ for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) { @@ -658,32 +696,21 @@ rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter) rxa_flush_event_buffer(rx_adapter); if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) { rx_adapter->wrr_pos = wrr_pos; - return; + break; } - stats->rx_poll_count++; - n = rte_eth_rx_burst(d, qid, mbufs, BATCH_SIZE); - - if (n) { - stats->rx_packets += n; - /* The check before rte_eth_rx_burst() ensures that - * all n mbufs can be buffered - */ - rxa_buffer_mbufs(rx_adapter, d, qid, mbufs, n); - nb_rx += n; - if (nb_rx > max_nb_rx) { - rx_adapter->wrr_pos = + nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx); + if (nb_rx > max_nb_rx) { + rx_adapter->wrr_pos = (wrr_pos + 1) % rx_adapter->wrr_len; - break; - } + break; } if (++wrr_pos == rx_adapter->wrr_len) wrr_pos = 0; } - if (buf->count >= BATCH_SIZE) - rxa_flush_event_buffer(rx_adapter); + stats->rx_packets += nb_rx; } static int -- 2.20.1