port: new Tx burst implementation of ring writer
authorMaciej Gajdzica <maciejx.t.gajdzica@intel.com>
Thu, 21 May 2015 12:28:42 +0000 (14:28 +0200)
committerThomas Monjalon <thomas.monjalon@6wind.com>
Thu, 18 Jun 2015 14:35:55 +0000 (16:35 +0200)
New implementation sends burst without copying data to internal buffer
if it is possible. It is similar to tx_bulk function in ethdev_writer
port.

Signed-off-by: Maciej Gajdzica <maciejx.t.gajdzica@intel.com>
Acked-by: Cristian Dumitrescu <cristian.dumitrescu@intel.com>
lib/librte_port/rte_port_ring.c

index fa3d77b..ba1fdcb 100644 (file)
@@ -101,6 +101,7 @@ struct rte_port_ring_writer {
        struct rte_ring *ring;
        uint32_t tx_burst_sz;
        uint32_t tx_buf_count;
+       uint64_t bsz_mask;
 };
 
 static void *
@@ -130,6 +131,7 @@ rte_port_ring_writer_create(void *params, int socket_id)
        port->ring = conf->ring;
        port->tx_burst_sz = conf->tx_burst_sz;
        port->tx_buf_count = 0;
+       port->bsz_mask = 1LLU << (conf->tx_burst_sz - 1);
 
        return port;
 }
@@ -165,18 +167,27 @@ rte_port_ring_writer_tx_bulk(void *port,
                struct rte_mbuf **pkts,
                uint64_t pkts_mask)
 {
-       struct rte_port_ring_writer *p = (struct rte_port_ring_writer *) port;
+       struct rte_port_ring_writer *p =
+               (struct rte_port_ring_writer *) port;
+
+       uint32_t bsz_mask = p->bsz_mask;
+       uint32_t tx_buf_count = p->tx_buf_count;
+       uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
+                       ((pkts_mask & bsz_mask) ^ bsz_mask);
 
-       if ((pkts_mask & (pkts_mask + 1)) == 0) {
+       if (expr == 0) {
                uint64_t n_pkts = __builtin_popcountll(pkts_mask);
-               uint32_t i;
+               uint32_t n_pkts_ok;
+
+               if (tx_buf_count)
+                       send_burst(p);
 
-               for (i = 0; i < n_pkts; i++) {
-                       struct rte_mbuf *pkt = pkts[i];
+               n_pkts_ok = rte_ring_sp_enqueue_burst(p->ring, (void **)pkts, n_pkts);
 
-                       p->tx_buf[p->tx_buf_count++] = pkt;
-                       if (p->tx_buf_count >= p->tx_burst_sz)
-                               send_burst(p);
+               for ( ; n_pkts_ok < n_pkts; n_pkts_ok++) {
+                       struct rte_mbuf *pkt = pkts[n_pkts_ok];
+
+                       rte_pktmbuf_free(pkt);
                }
        } else {
                for ( ; pkts_mask; ) {
@@ -184,11 +195,13 @@ rte_port_ring_writer_tx_bulk(void *port,
                        uint64_t pkt_mask = 1LLU << pkt_index;
                        struct rte_mbuf *pkt = pkts[pkt_index];
 
-                       p->tx_buf[p->tx_buf_count++] = pkt;
-                       if (p->tx_buf_count >= p->tx_burst_sz)
-                               send_burst(p);
+                       p->tx_buf[tx_buf_count++] = pkt;
                        pkts_mask &= ~pkt_mask;
                }
+
+               p->tx_buf_count = tx_buf_count;
+               if (tx_buf_count >= p->tx_burst_sz)
+                       send_burst(p);
        }
 
        return 0;