ring: return free space when enqueuing
Author: Bruce Richardson <bruce.richardson@intel.com>
Wed, 29 Mar 2017 15:21:24 +0000 (16:21 +0100)
Committer: Thomas Monjalon <thomas.monjalon@6wind.com>
Wed, 29 Mar 2017 20:32:04 +0000 (22:32 +0200)
Add an extra parameter to the ring enqueue burst/bulk functions so that
those functions can optionally return the amount of free space in the
ring. This information can be used by applications in a number of ways,
for instance, with single-producer queues, it provides a max
enqueue size which is guaranteed to work. It can also be used to
implement watermark functionality in apps, replacing the older
functionality with a more flexible version, which enables apps to
implement multiple watermark thresholds, rather than just one.

Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
Reviewed-by: Yuanhan Liu <yuanhan.liu@linux.intel.com>
Acked-by: Olivier Matz <olivier.matz@6wind.com>
28 files changed:
doc/guides/rel_notes/release_17_05.rst
doc/guides/sample_app_ug/server_node_efd.rst
drivers/crypto/armv8/rte_armv8_pmd.c
drivers/crypto/kasumi/rte_kasumi_pmd.c
drivers/crypto/snow3g/rte_snow3g_pmd.c
drivers/crypto/zuc/rte_zuc_pmd.c
drivers/net/ring/rte_eth_ring.c
examples/distributor/main.c
examples/load_balancer/runtime.c
examples/multi_process/client_server_mp/mp_server/main.c
examples/packet_ordering/main.c
examples/performance-thread/l3fwd-thread/main.c
examples/qos_sched/app_thread.c
examples/server_node_efd/server/main.c
lib/librte_hash/rte_cuckoo_hash.c
lib/librte_mempool/rte_mempool_ring.c
lib/librte_pdump/rte_pdump.c
lib/librte_port/rte_port_ras.c
lib/librte_port/rte_port_ring.c
lib/librte_ring/rte_ring.h
test/test-pipeline/pipeline_hash.c
test/test-pipeline/runtime.c
test/test/test_link_bonding_mode4.c
test/test/test_pmd_ring_perf.c
test/test/test_ring.c
test/test/test_ring_perf.c
test/test/test_table_ports.c
test/test/virtual_pmd.c

index 6da26123f8bdb8a46787d3c2ff07fb3de1511849..b361a98717bd900a954d2b661b36ab0a65802889 100644 (file)
@@ -137,6 +137,9 @@ API Changes
   * removed the build-time setting ``CONFIG_RTE_RING_PAUSE_REP_COUNT``
   * removed the function ``rte_ring_set_water_mark`` as part of a general
     removal of watermarks support in the library.
+  * added an extra parameter to the burst/bulk enqueue functions to
+    return the number of free spaces in the ring after enqueue. This can
+    be used by an application to implement its own watermark functionality.
   * changed the return value of the enqueue and dequeue bulk functions to
     match that of the burst equivalents. In all cases, ring functions which
     operate on multiple packets now return the number of elements enqueued
index e3a63c81637f59ea1910a4411b536e59c5a89c35..c2a5f20a15906ccbe2c9a536fd432521a46bc583 100644 (file)
@@ -286,7 +286,7 @@ repeated infinitely.
 
         cl = &nodes[node];
         if (rte_ring_enqueue_bulk(cl->rx_q, (void **)cl_rx_buf[node].buffer,
-                cl_rx_buf[node].count) != cl_rx_buf[node].count){
+                cl_rx_buf[node].count, NULL) != cl_rx_buf[node].count){
             for (j = 0; j < cl_rx_buf[node].count; j++)
                 rte_pktmbuf_free(cl_rx_buf[node].buffer[j]);
             cl->stats.rx_drop += cl_rx_buf[node].count;
index d2b88a3e89bfd0426a5d9dfadbef9cdd07cb9428..37ecd7babf8286af1dfff07218b9cc706735c455 100644 (file)
@@ -739,13 +739,15 @@ armv8_crypto_pmd_enqueue_burst(void *queue_pair, struct rte_crypto_op **ops,
                        goto enqueue_err;
        }
 
-       retval = rte_ring_enqueue_burst(qp->processed_ops, (void *)ops, i);
+       retval = rte_ring_enqueue_burst(qp->processed_ops, (void *)ops, i,
+                       NULL);
        qp->stats.enqueued_count += retval;
 
        return retval;
 
 enqueue_err:
-       retval = rte_ring_enqueue_burst(qp->processed_ops, (void *)ops, i);
+       retval = rte_ring_enqueue_burst(qp->processed_ops, (void *)ops, i,
+                       NULL);
        if (ops[i] != NULL)
                ops[i]->status = RTE_CRYPTO_OP_STATUS_INVALID_ARGS;
 
index 234921e0ef1f5578b2a44b09e8337db5667d1c4b..1dd05cb84a1b5c33b2a3d35caa635eb411e28503 100644 (file)
@@ -359,7 +359,7 @@ process_ops(struct rte_crypto_op **ops, struct kasumi_session *session,
        }
 
        enqueued_ops = rte_ring_enqueue_burst(qp->processed_ops,
-                               (void **)ops, processed_ops);
+                               (void **)ops, processed_ops, NULL);
        qp->qp_stats.enqueued_count += enqueued_ops;
        *accumulated_enqueued_ops += enqueued_ops;
 
@@ -410,7 +410,7 @@ process_op_bit(struct rte_crypto_op *op, struct kasumi_session *session,
        }
 
        enqueued_op = rte_ring_enqueue_burst(qp->processed_ops, (void **)&op,
-                               processed_op);
+                               processed_op, NULL);
        qp->qp_stats.enqueued_count += enqueued_op;
        *accumulated_enqueued_ops += enqueued_op;
 
index ca97271b86609e6de1b7349e4c31a6fbf336d7f1..01c4e1ca4b44005ff041aaf725f82a55e7a83037 100644 (file)
@@ -363,7 +363,7 @@ process_ops(struct rte_crypto_op **ops, struct snow3g_session *session,
        }
 
        enqueued_ops = rte_ring_enqueue_burst(qp->processed_ops,
-                       (void **)ops, processed_ops);
+                       (void **)ops, processed_ops, NULL);
        qp->qp_stats.enqueued_count += enqueued_ops;
        *accumulated_enqueued_ops += enqueued_ops;
 
@@ -414,7 +414,7 @@ process_op_bit(struct rte_crypto_op *op, struct snow3g_session *session,
        }
 
        enqueued_op = rte_ring_enqueue_burst(qp->processed_ops,
-                       (void **)&op, processed_op);
+                       (void **)&op, processed_op, NULL);
        qp->qp_stats.enqueued_count += enqueued_op;
        *accumulated_enqueued_ops += enqueued_op;
 
index 6f9c06a0bcee5efa78b7ca4fc3b8882b9717ac07..5e2dbf56c052be8cd0491ab67a762fa6773f90ca 100644 (file)
@@ -339,7 +339,7 @@ process_ops(struct rte_crypto_op **ops, struct zuc_session *session,
        }
 
        enqueued_ops = rte_ring_enqueue_burst(qp->processed_ops,
-                       (void **)ops, processed_ops);
+                       (void **)ops, processed_ops, NULL);
        qp->qp_stats.enqueued_count += enqueued_ops;
        *accumulated_enqueued_ops += enqueued_ops;
 
index 6f9cc1a6f5ada315296521769219787ef4997a8d..adbf478291cb7b8e05b565c60fe814adad70f12b 100644 (file)
@@ -102,7 +102,7 @@ eth_ring_tx(void *q, struct rte_mbuf **bufs, uint16_t nb_bufs)
        void **ptrs = (void *)&bufs[0];
        struct ring_queue *r = q;
        const uint16_t nb_tx = (uint16_t)rte_ring_enqueue_burst(r->rng,
-                       ptrs, nb_bufs);
+                       ptrs, nb_bufs, NULL);
        if (r->rng->flags & RING_F_SP_ENQ) {
                r->tx_pkts.cnt += nb_tx;
                r->err_pkts.cnt += nb_bufs - nb_tx;
index 6aa8755ba73468245f5f8307440739f318bca7de..982b4fd7ea695b6bb0095cc3b486521840e92d97 100644 (file)
@@ -257,7 +257,7 @@ lcore_rx(struct lcore_params *p)
 
                struct rte_ring *tx_ring = p->dist_tx_ring;
                uint16_t sent = rte_ring_enqueue_burst(tx_ring,
-                               (void *)bufs, nb_ret);
+                               (void *)bufs, nb_ret, NULL);
 #else
                uint16_t nb_ret = nb_rx;
                /*
@@ -268,7 +268,7 @@ lcore_rx(struct lcore_params *p)
                /* struct rte_ring *out_ring = p->dist_tx_ring; */
 
                uint16_t sent = rte_ring_enqueue_burst(out_ring,
-                               (void *)bufs, nb_ret);
+                               (void *)bufs, nb_ret, NULL);
 #endif
 
                app_stats.rx.enqueued_pkts += sent;
@@ -350,7 +350,7 @@ lcore_distributor(struct lcore_params *p)
                        app_stats.dist.ret_pkts += nb_ret;
 
                        uint16_t sent = rte_ring_enqueue_burst(out_r,
-                                       (void *)bufs, nb_ret);
+                                       (void *)bufs, nb_ret, NULL);
                        app_stats.dist.sent_pkts += sent;
                        if (unlikely(sent < nb_ret)) {
                                app_stats.dist.enqdrop_pkts += nb_ret - sent;
index 82b10bc20d60dda5693ddc0582d4b3ef336e7076..1645994fcbb4e019e4a20eaf264f4f16e77caf43 100644 (file)
@@ -144,7 +144,8 @@ app_lcore_io_rx_buffer_to_send (
        ret = rte_ring_sp_enqueue_bulk(
                lp->rx.rings[worker],
                (void **) lp->rx.mbuf_out[worker].array,
-               bsz);
+               bsz,
+               NULL);
 
        if (unlikely(ret == 0)) {
                uint32_t k;
@@ -310,7 +311,8 @@ app_lcore_io_rx_flush(struct app_lcore_params_io *lp, uint32_t n_workers)
                ret = rte_ring_sp_enqueue_bulk(
                        lp->rx.rings[worker],
                        (void **) lp->rx.mbuf_out[worker].array,
-                       lp->rx.mbuf_out[worker].n_mbufs);
+                       lp->rx.mbuf_out[worker].n_mbufs,
+                       NULL);
 
                if (unlikely(ret == 0)) {
                        uint32_t k;
@@ -553,7 +555,8 @@ app_lcore_worker(
                        ret = rte_ring_sp_enqueue_bulk(
                                lp->rings_out[port],
                                (void **) lp->mbuf_out[port].array,
-                               bsz_wr);
+                               bsz_wr,
+                               NULL);
 
 #if APP_STATS
                        lp->rings_out_iters[port] ++;
@@ -605,7 +608,8 @@ app_lcore_worker_flush(struct app_lcore_params_worker *lp)
                ret = rte_ring_sp_enqueue_bulk(
                        lp->rings_out[port],
                        (void **) lp->mbuf_out[port].array,
-                       lp->mbuf_out[port].n_mbufs);
+                       lp->mbuf_out[port].n_mbufs,
+                       NULL);
 
                if (unlikely(ret == 0)) {
                        uint32_t k;
index 19c95b2339d52483f31c4f4318f8cc39d046e19d..c2b0261d194d8f70b04bf8e3ea689f002fa68124 100644 (file)
@@ -227,7 +227,7 @@ flush_rx_queue(uint16_t client)
 
        cl = &clients[client];
        if (rte_ring_enqueue_bulk(cl->rx_q, (void **)cl_rx_buf[client].buffer,
-                       cl_rx_buf[client].count) == 0){
+                       cl_rx_buf[client].count, NULL) == 0){
                for (j = 0; j < cl_rx_buf[client].count; j++)
                        rte_pktmbuf_free(cl_rx_buf[client].buffer[j]);
                cl->stats.rx_drop += cl_rx_buf[client].count;
index a448039d3d9addb36f8ce257a476f668432b22e3..569b6dadb0ba8e2c242ca3d7157971215afcf152 100644 (file)
@@ -421,8 +421,8 @@ rx_thread(struct rte_ring *ring_out)
                                        pkts[i++]->seqn = seqn++;
 
                                /* enqueue to rx_to_workers ring */
-                               ret = rte_ring_enqueue_burst(ring_out, (void *) pkts,
-                                                               nb_rx_pkts);
+                               ret = rte_ring_enqueue_burst(ring_out,
+                                               (void *)pkts, nb_rx_pkts, NULL);
                                app_stats.rx.enqueue_pkts += ret;
                                if (unlikely(ret < nb_rx_pkts)) {
                                        app_stats.rx.enqueue_failed_pkts +=
@@ -473,7 +473,8 @@ worker_thread(void *args_ptr)
                        burst_buffer[i++]->port ^= xor_val;
 
                /* enqueue the modified mbufs to workers_to_tx ring */
-               ret = rte_ring_enqueue_burst(ring_out, (void *)burst_buffer, burst_size);
+               ret = rte_ring_enqueue_burst(ring_out, (void *)burst_buffer,
+                               burst_size, NULL);
                __sync_fetch_and_add(&app_stats.wkr.enqueue_pkts, ret);
                if (unlikely(ret < burst_size)) {
                        /* Return the mbufs to their respective pool, dropping packets */
index bf92582246614e5bd0932971ad9015f86a8c6259..b4c0df1881b86ecbf529abe2fda640e7470672ff 100644 (file)
@@ -2213,7 +2213,7 @@ lthread_rx(void *dummy)
                                ret = rte_ring_sp_enqueue_burst(
                                                rx_conf->ring[worker_id],
                                                (void **) pkts_burst,
-                                               nb_rx);
+                                               nb_rx, NULL);
 
                                new_len = old_len + ret;
 
@@ -2453,7 +2453,7 @@ pthread_rx(void *dummy)
                        SET_CPU_BUSY(rx_conf, CPU_PROCESS);
                        worker_id = (worker_id + 1) % rx_conf->n_ring;
                        n = rte_ring_sp_enqueue_burst(rx_conf->ring[worker_id],
-                                       (void **)pkts_burst, nb_rx);
+                                       (void **)pkts_burst, nb_rx, NULL);
 
                        if (unlikely(n != nb_rx)) {
                                uint32_t k;
index dab459449e20f49dcb291b47b66309bf3cf9c395..0c81a152c93a3898bd30c2e7faf92d40acd376df 100644 (file)
@@ -107,7 +107,7 @@ app_rx_thread(struct thread_conf **confs)
                        }
 
                        if (unlikely(rte_ring_sp_enqueue_bulk(conf->rx_ring,
-                                       (void **)rx_mbufs, nb_rx) == 0)) {
+                                       (void **)rx_mbufs, nb_rx, NULL) == 0)) {
                                for(i = 0; i < nb_rx; i++) {
                                        rte_pktmbuf_free(rx_mbufs[i]);
 
@@ -231,7 +231,7 @@ app_worker_thread(struct thread_conf **confs)
                                        burst_conf.qos_dequeue);
                if (likely(nb_pkt > 0))
                        while (rte_ring_sp_enqueue_bulk(conf->tx_ring,
-                                       (void **)mbufs, nb_pkt) == 0)
+                                       (void **)mbufs, nb_pkt, NULL) == 0)
                                ; /* empty body */
 
                conf_idx++;
index 3eb7fac8d1ecc62a8a3d5721878892025f3bcf4d..597b4c250d5c6d2c2ae9b6b604cb36b01a3d7a04 100644 (file)
@@ -247,7 +247,7 @@ flush_rx_queue(uint16_t node)
 
        cl = &nodes[node];
        if (rte_ring_enqueue_bulk(cl->rx_q, (void **)cl_rx_buf[node].buffer,
-                       cl_rx_buf[node].count) != cl_rx_buf[node].count){
+                       cl_rx_buf[node].count, NULL) != cl_rx_buf[node].count){
                for (j = 0; j < cl_rx_buf[node].count; j++)
                        rte_pktmbuf_free(cl_rx_buf[node].buffer[j]);
                cl->stats.rx_drop += cl_rx_buf[node].count;
index 51db006a9613c1a6770444a9dce6d379294808ad..6552199f773498280a2260ddf014470193c6ebc6 100644 (file)
@@ -808,7 +808,7 @@ remove_entry(const struct rte_hash *h, struct rte_hash_bucket *bkt, unsigned i)
                        /* Need to enqueue the free slots in global ring. */
                        n_slots = rte_ring_mp_enqueue_burst(h->free_slots,
                                                cached_free_slots->objs,
-                                               LCORE_CACHE_SIZE);
+                                               LCORE_CACHE_SIZE, NULL);
                        cached_free_slots->len -= n_slots;
                }
                /* Put index of new free slot in cache. */
index 409b86054f0da6a9a9b945f565af919d4a3e776d..9b8fd2bd4f0841985d07ede92d85dbd797607a6c 100644 (file)
@@ -43,7 +43,7 @@ common_ring_mp_enqueue(struct rte_mempool *mp, void * const *obj_table,
                unsigned n)
 {
        return rte_ring_mp_enqueue_bulk(mp->pool_data,
-                       obj_table, n) == 0 ? -ENOBUFS : 0;
+                       obj_table, n, NULL) == 0 ? -ENOBUFS : 0;
 }
 
 static int
@@ -51,7 +51,7 @@ common_ring_sp_enqueue(struct rte_mempool *mp, void * const *obj_table,
                unsigned n)
 {
        return rte_ring_sp_enqueue_bulk(mp->pool_data,
-                       obj_table, n) == 0 ? -ENOBUFS : 0;
+                       obj_table, n, NULL) == 0 ? -ENOBUFS : 0;
 }
 
 static int
index cc0b5b19db2fb448d285928b0a6154dd503cfd7e..b599d65d62de133dec050ac59162a47ef2165395 100644 (file)
@@ -197,7 +197,7 @@ pdump_copy(struct rte_mbuf **pkts, uint16_t nb_pkts, void *user_params)
                        dup_bufs[d_pkts++] = p;
        }
 
-       ring_enq = rte_ring_enqueue_burst(ring, (void *)dup_bufs, d_pkts);
+       ring_enq = rte_ring_enqueue_burst(ring, (void *)dup_bufs, d_pkts, NULL);
        if (unlikely(ring_enq < d_pkts)) {
                RTE_LOG(DEBUG, PDUMP,
                        "only %d of packets enqueued to ring\n", ring_enq);
index c4bb5081331a25d6787a3dc7c9d9c0c066b8aaef..4de0945eae4fb6e4c0ef310722be009a3304daf5 100644 (file)
@@ -167,7 +167,7 @@ send_burst(struct rte_port_ring_writer_ras *p)
        uint32_t nb_tx;
 
        nb_tx = rte_ring_sp_enqueue_burst(p->ring, (void **)p->tx_buf,
-                       p->tx_buf_count);
+                       p->tx_buf_count, NULL);
 
        RTE_PORT_RING_WRITER_RAS_STATS_PKTS_DROP_ADD(p, p->tx_buf_count - nb_tx);
        for ( ; nb_tx < p->tx_buf_count; nb_tx++)
index 0df1bcf0b5314789453ed8bc7ecd75e5d4571b34..c5dbe07e7a26e39200a452d557ff71195c1ce467 100644 (file)
@@ -241,7 +241,7 @@ send_burst(struct rte_port_ring_writer *p)
        uint32_t nb_tx;
 
        nb_tx = rte_ring_sp_enqueue_burst(p->ring, (void **)p->tx_buf,
-                       p->tx_buf_count);
+                       p->tx_buf_count, NULL);
 
        RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(p, p->tx_buf_count - nb_tx);
        for ( ; nb_tx < p->tx_buf_count; nb_tx++)
@@ -256,7 +256,7 @@ send_burst_mp(struct rte_port_ring_writer *p)
        uint32_t nb_tx;
 
        nb_tx = rte_ring_mp_enqueue_burst(p->ring, (void **)p->tx_buf,
-                       p->tx_buf_count);
+                       p->tx_buf_count, NULL);
 
        RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(p, p->tx_buf_count - nb_tx);
        for ( ; nb_tx < p->tx_buf_count; nb_tx++)
@@ -318,11 +318,11 @@ rte_port_ring_writer_tx_bulk_internal(void *port,
 
                RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(p, n_pkts);
                if (is_multi)
-                       n_pkts_ok = rte_ring_mp_enqueue_burst(p->ring, (void **)pkts,
-                               n_pkts);
+                       n_pkts_ok = rte_ring_mp_enqueue_burst(p->ring,
+                                       (void **)pkts, n_pkts, NULL);
                else
-                       n_pkts_ok = rte_ring_sp_enqueue_burst(p->ring, (void **)pkts,
-                               n_pkts);
+                       n_pkts_ok = rte_ring_sp_enqueue_burst(p->ring,
+                                       (void **)pkts, n_pkts, NULL);
 
                RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(p, n_pkts - n_pkts_ok);
                for ( ; n_pkts_ok < n_pkts; n_pkts_ok++) {
@@ -517,7 +517,7 @@ send_burst_nodrop(struct rte_port_ring_writer_nodrop *p)
        uint32_t nb_tx = 0, i;
 
        nb_tx = rte_ring_sp_enqueue_burst(p->ring, (void **)p->tx_buf,
-                               p->tx_buf_count);
+                               p->tx_buf_count, NULL);
 
        /* We sent all the packets in a first try */
        if (nb_tx >= p->tx_buf_count) {
@@ -527,7 +527,8 @@ send_burst_nodrop(struct rte_port_ring_writer_nodrop *p)
 
        for (i = 0; i < p->n_retries; i++) {
                nb_tx += rte_ring_sp_enqueue_burst(p->ring,
-                               (void **) (p->tx_buf + nb_tx), p->tx_buf_count - nb_tx);
+                               (void **) (p->tx_buf + nb_tx),
+                               p->tx_buf_count - nb_tx, NULL);
 
                /* We sent all the packets in more than one try */
                if (nb_tx >= p->tx_buf_count) {
@@ -550,7 +551,7 @@ send_burst_mp_nodrop(struct rte_port_ring_writer_nodrop *p)
        uint32_t nb_tx = 0, i;
 
        nb_tx = rte_ring_mp_enqueue_burst(p->ring, (void **)p->tx_buf,
-                               p->tx_buf_count);
+                               p->tx_buf_count, NULL);
 
        /* We sent all the packets in a first try */
        if (nb_tx >= p->tx_buf_count) {
@@ -560,7 +561,8 @@ send_burst_mp_nodrop(struct rte_port_ring_writer_nodrop *p)
 
        for (i = 0; i < p->n_retries; i++) {
                nb_tx += rte_ring_mp_enqueue_burst(p->ring,
-                               (void **) (p->tx_buf + nb_tx), p->tx_buf_count - nb_tx);
+                               (void **) (p->tx_buf + nb_tx),
+                               p->tx_buf_count - nb_tx, NULL);
 
                /* We sent all the packets in more than one try */
                if (nb_tx >= p->tx_buf_count) {
@@ -633,10 +635,12 @@ rte_port_ring_writer_nodrop_tx_bulk_internal(void *port,
                RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(p, n_pkts);
                if (is_multi)
                        n_pkts_ok =
-                               rte_ring_mp_enqueue_burst(p->ring, (void **)pkts, n_pkts);
+                               rte_ring_mp_enqueue_burst(p->ring,
+                                               (void **)pkts, n_pkts, NULL);
                else
                        n_pkts_ok =
-                               rte_ring_sp_enqueue_burst(p->ring, (void **)pkts, n_pkts);
+                               rte_ring_sp_enqueue_burst(p->ring,
+                                               (void **)pkts, n_pkts, NULL);
 
                if (n_pkts_ok >= n_pkts)
                        return 0;
index 34b438c1c1be3224be6c7023c081260a9f840695..439698be66b5c5def13c6ee6ec7360a2c9ae5243 100644 (file)
@@ -354,20 +354,16 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r);
  */
 static inline unsigned int __attribute__((always_inline))
 __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
-                        unsigned n, enum rte_ring_queue_behavior behavior)
+                        unsigned int n, enum rte_ring_queue_behavior behavior,
+                        unsigned int *free_space)
 {
        uint32_t prod_head, prod_next;
        uint32_t cons_tail, free_entries;
-       const unsigned max = n;
+       const unsigned int max = n;
        int success;
        unsigned int i;
        uint32_t mask = r->mask;
 
-       /* Avoid the unnecessary cmpset operation below, which is also
-        * potentially harmful when n equals 0. */
-       if (n == 0)
-               return 0;
-
        /* move prod.head atomically */
        do {
                /* Reset n to the initial burst count */
@@ -382,16 +378,12 @@ __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
                free_entries = (mask + cons_tail - prod_head);
 
                /* check that we have enough room in ring */
-               if (unlikely(n > free_entries)) {
-                       if (behavior == RTE_RING_QUEUE_FIXED)
-                               return 0;
-                       else {
-                               /* No free entry available */
-                               if (unlikely(free_entries == 0))
-                                       return 0;
-                               n = free_entries;
-                       }
-               }
+               if (unlikely(n > free_entries))
+                       n = (behavior == RTE_RING_QUEUE_FIXED) ?
+                                       0 : free_entries;
+
+               if (n == 0)
+                       goto end;
 
                prod_next = prod_head + n;
                success = rte_atomic32_cmpset(&r->prod.head, prod_head,
@@ -410,6 +402,9 @@ __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
                rte_pause();
 
        r->prod.tail = prod_next;
+end:
+       if (free_space != NULL)
+               *free_space = free_entries - n;
        return n;
 }
 
@@ -431,7 +426,8 @@ __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
  */
 static inline unsigned int __attribute__((always_inline))
 __rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
-                        unsigned n, enum rte_ring_queue_behavior behavior)
+                        unsigned int n, enum rte_ring_queue_behavior behavior,
+                        unsigned int *free_space)
 {
        uint32_t prod_head, cons_tail;
        uint32_t prod_next, free_entries;
@@ -447,16 +443,12 @@ __rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
        free_entries = mask + cons_tail - prod_head;
 
        /* check that we have enough room in ring */
-       if (unlikely(n > free_entries)) {
-               if (behavior == RTE_RING_QUEUE_FIXED)
-                       return 0;
-               else {
-                       /* No free entry available */
-                       if (unlikely(free_entries == 0))
-                               return 0;
-                       n = free_entries;
-               }
-       }
+       if (unlikely(n > free_entries))
+               n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : free_entries;
+
+       if (n == 0)
+               goto end;
+
 
        prod_next = prod_head + n;
        r->prod.head = prod_next;
@@ -466,6 +458,9 @@ __rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
        rte_smp_wmb();
 
        r->prod.tail = prod_next;
+end:
+       if (free_space != NULL)
+               *free_space = free_entries - n;
        return n;
 }
 
@@ -620,14 +615,18 @@ __rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
  *   A pointer to a table of void * pointers (objects).
  * @param n
  *   The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ *   if non-NULL, returns the amount of space in the ring after the
+ *   enqueue operation has finished.
  * @return
  *   The number of objects enqueued, either 0 or n
  */
 static inline unsigned int __attribute__((always_inline))
 rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
-                        unsigned n)
+                        unsigned int n, unsigned int *free_space)
 {
-       return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
+       return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
+                       free_space);
 }
 
 /**
@@ -639,14 +638,18 @@ rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
  *   A pointer to a table of void * pointers (objects).
  * @param n
  *   The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ *   if non-NULL, returns the amount of space in the ring after the
+ *   enqueue operation has finished.
  * @return
  *   The number of objects enqueued, either 0 or n
  */
 static inline unsigned int __attribute__((always_inline))
 rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
-                        unsigned n)
+                        unsigned int n, unsigned int *free_space)
 {
-       return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
+       return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
+                       free_space);
 }
 
 /**
@@ -662,17 +665,20 @@ rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
  *   A pointer to a table of void * pointers (objects).
  * @param n
  *   The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ *   if non-NULL, returns the amount of space in the ring after the
+ *   enqueue operation has finished.
  * @return
  *   The number of objects enqueued, either 0 or n
  */
 static inline unsigned int __attribute__((always_inline))
 rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
-                     unsigned n)
+                     unsigned int n, unsigned int *free_space)
 {
        if (r->prod.single)
-               return rte_ring_sp_enqueue_bulk(r, obj_table, n);
+               return rte_ring_sp_enqueue_bulk(r, obj_table, n, free_space);
        else
-               return rte_ring_mp_enqueue_bulk(r, obj_table, n);
+               return rte_ring_mp_enqueue_bulk(r, obj_table, n, free_space);
 }
 
 /**
@@ -692,7 +698,7 @@ rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
 static inline int __attribute__((always_inline))
 rte_ring_mp_enqueue(struct rte_ring *r, void *obj)
 {
-       return rte_ring_mp_enqueue_bulk(r, &obj, 1) ? 0 : -ENOBUFS;
+       return rte_ring_mp_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS;
 }
 
 /**
@@ -709,7 +715,7 @@ rte_ring_mp_enqueue(struct rte_ring *r, void *obj)
 static inline int __attribute__((always_inline))
 rte_ring_sp_enqueue(struct rte_ring *r, void *obj)
 {
-       return rte_ring_sp_enqueue_bulk(r, &obj, 1) ? 0 : -ENOBUFS;
+       return rte_ring_sp_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS;
 }
 
 /**
@@ -730,7 +736,7 @@ rte_ring_sp_enqueue(struct rte_ring *r, void *obj)
 static inline int __attribute__((always_inline))
 rte_ring_enqueue(struct rte_ring *r, void *obj)
 {
-       return rte_ring_enqueue_bulk(r, &obj, 1) ? 0 : -ENOBUFS;
+       return rte_ring_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS;
 }
 
 /**
@@ -971,14 +977,18 @@ struct rte_ring *rte_ring_lookup(const char *name);
  *   A pointer to a table of void * pointers (objects).
  * @param n
  *   The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ *   if non-NULL, returns the amount of space in the ring after the
+ *   enqueue operation has finished.
  * @return
  *   - n: Actual number of objects enqueued.
  */
 static inline unsigned __attribute__((always_inline))
 rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
-                        unsigned n)
+                        unsigned int n, unsigned int *free_space)
 {
-       return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
+       return __rte_ring_mp_do_enqueue(r, obj_table, n,
+                       RTE_RING_QUEUE_VARIABLE, free_space);
 }
 
 /**
@@ -990,14 +1000,18 @@ rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
  *   A pointer to a table of void * pointers (objects).
  * @param n
  *   The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ *   if non-NULL, returns the amount of space in the ring after the
+ *   enqueue operation has finished.
  * @return
  *   - n: Actual number of objects enqueued.
  */
 static inline unsigned __attribute__((always_inline))
 rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
-                        unsigned n)
+                        unsigned int n, unsigned int *free_space)
 {
-       return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
+       return __rte_ring_sp_do_enqueue(r, obj_table, n,
+                       RTE_RING_QUEUE_VARIABLE, free_space);
 }
 
 /**
@@ -1013,17 +1027,20 @@ rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
  *   A pointer to a table of void * pointers (objects).
  * @param n
  *   The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ *   if non-NULL, returns the amount of space in the ring after the
+ *   enqueue operation has finished.
  * @return
  *   - n: Actual number of objects enqueued.
  */
 static inline unsigned __attribute__((always_inline))
 rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
-                     unsigned n)
+                     unsigned int n, unsigned int *free_space)
 {
        if (r->prod.single)
-               return rte_ring_sp_enqueue_burst(r, obj_table, n);
+               return rte_ring_sp_enqueue_burst(r, obj_table, n, free_space);
        else
-               return rte_ring_mp_enqueue_burst(r, obj_table, n);
+               return rte_ring_mp_enqueue_burst(r, obj_table, n, free_space);
 }
 
 /**
index 1ac0aa84145dbcbe9c8a8149be359b52518206e1..0c6e04f7883ab078a0d90b8ebd510e3bf02df413 100644 (file)
@@ -546,7 +546,8 @@ app_main_loop_rx_metadata(void) {
                        ret = rte_ring_sp_enqueue_bulk(
                                app.rings_rx[i],
                                (void **) app.mbuf_rx.array,
-                               n_mbufs);
+                               n_mbufs,
+                               NULL);
                } while (ret == 0);
        }
 }
index 4e20669298df8f3963a23431ab8a679fa85b17a7..c06ff54cd30baa4e82ba9bc8875e978884ec801a 100644 (file)
@@ -97,7 +97,7 @@ app_main_loop_rx(void) {
                        ret = rte_ring_sp_enqueue_bulk(
                                app.rings_rx[i],
                                (void **) app.mbuf_rx.array,
-                               n_mbufs);
+                               n_mbufs, NULL);
                } while (ret == 0);
        }
 }
@@ -130,7 +130,8 @@ app_main_loop_worker(void) {
                        ret = rte_ring_sp_enqueue_bulk(
                                app.rings_tx[i ^ 1],
                                (void **) worker_mbuf->array,
-                               app.burst_size_worker_write);
+                               app.burst_size_worker_write,
+                               NULL);
                } while (ret == 0);
        }
 }
index 53caa3e980877c90c344fab65dc58e5cadcd61dc..8df28b4f152abf5be9bf1a5677ca78f8fdd7b41b 100644 (file)
@@ -206,7 +206,8 @@ slave_get_pkts(struct slave_conf *slave, struct rte_mbuf **buf, uint16_t size)
 static int
 slave_put_pkts(struct slave_conf *slave, struct rte_mbuf **buf, uint16_t size)
 {
-       return rte_ring_enqueue_burst(slave->rx_queue, (void **)buf, size);
+       return rte_ring_enqueue_burst(slave->rx_queue, (void **)buf,
+                       size, NULL);
 }
 
 static uint16_t
index af011f7db5c415e0a340ba7836f31de3327f64dc..045a7f255af86ed8efbf676dabff479e360ffcb6 100644 (file)
@@ -98,7 +98,7 @@ test_single_enqueue_dequeue(void)
        const uint64_t sc_start = rte_rdtsc_precise();
        rte_compiler_barrier();
        for (i = 0; i < iterations; i++) {
-               rte_ring_enqueue_bulk(r, &burst, 1);
+               rte_ring_enqueue_bulk(r, &burst, 1, NULL);
                rte_ring_dequeue_bulk(r, &burst, 1);
        }
        const uint64_t sc_end = rte_rdtsc_precise();
@@ -131,7 +131,8 @@ test_bulk_enqueue_dequeue(void)
        for (sz = 0; sz < sizeof(bulk_sizes)/sizeof(bulk_sizes[0]); sz++) {
                const uint64_t sc_start = rte_rdtsc();
                for (i = 0; i < iterations; i++) {
-                       rte_ring_sp_enqueue_bulk(r, (void *)burst, bulk_sizes[sz]);
+                       rte_ring_sp_enqueue_bulk(r, (void *)burst,
+                                       bulk_sizes[sz], NULL);
                        rte_ring_sc_dequeue_bulk(r, (void *)burst, bulk_sizes[sz]);
                }
                const uint64_t sc_end = rte_rdtsc();
index 112433b838d2571108103953fc9963ededa66fa7..b0ca88b65b92c71bbf87290aaad600a1a2da4112 100644 (file)
@@ -117,11 +117,12 @@ test_ring_basic_full_empty(void * const src[], void *dst[])
                rand = RTE_MAX(rte_rand() % RING_SIZE, 1UL);
                printf("%s: iteration %u, random shift: %u;\n",
                    __func__, i, rand);
-               TEST_RING_VERIFY(rte_ring_enqueue_bulk(r, src, rand) != 0);
+               TEST_RING_VERIFY(rte_ring_enqueue_bulk(r, src, rand,
+                               NULL) != 0);
                TEST_RING_VERIFY(rte_ring_dequeue_bulk(r, dst, rand) == rand);
 
                /* fill the ring */
-               TEST_RING_VERIFY(rte_ring_enqueue_bulk(r, src, rsz) != 0);
+               TEST_RING_VERIFY(rte_ring_enqueue_bulk(r, src, rsz, NULL) != 0);
                TEST_RING_VERIFY(0 == rte_ring_free_count(r));
                TEST_RING_VERIFY(rsz == rte_ring_count(r));
                TEST_RING_VERIFY(rte_ring_full(r));
@@ -167,19 +168,19 @@ test_ring_basic(void)
        cur_dst = dst;
 
        printf("enqueue 1 obj\n");
-       ret = rte_ring_sp_enqueue_bulk(r, cur_src, 1);
+       ret = rte_ring_sp_enqueue_bulk(r, cur_src, 1, NULL);
        cur_src += 1;
        if (ret == 0)
                goto fail;
 
        printf("enqueue 2 objs\n");
-       ret = rte_ring_sp_enqueue_bulk(r, cur_src, 2);
+       ret = rte_ring_sp_enqueue_bulk(r, cur_src, 2, NULL);
        cur_src += 2;
        if (ret == 0)
                goto fail;
 
        printf("enqueue MAX_BULK objs\n");
-       ret = rte_ring_sp_enqueue_bulk(r, cur_src, MAX_BULK);
+       ret = rte_ring_sp_enqueue_bulk(r, cur_src, MAX_BULK, NULL);
        cur_src += MAX_BULK;
        if (ret == 0)
                goto fail;
@@ -213,19 +214,19 @@ test_ring_basic(void)
        cur_dst = dst;
 
        printf("enqueue 1 obj\n");
-       ret = rte_ring_mp_enqueue_bulk(r, cur_src, 1);
+       ret = rte_ring_mp_enqueue_bulk(r, cur_src, 1, NULL);
        cur_src += 1;
        if (ret == 0)
                goto fail;
 
        printf("enqueue 2 objs\n");
-       ret = rte_ring_mp_enqueue_bulk(r, cur_src, 2);
+       ret = rte_ring_mp_enqueue_bulk(r, cur_src, 2, NULL);
        cur_src += 2;
        if (ret == 0)
                goto fail;
 
        printf("enqueue MAX_BULK objs\n");
-       ret = rte_ring_mp_enqueue_bulk(r, cur_src, MAX_BULK);
+       ret = rte_ring_mp_enqueue_bulk(r, cur_src, MAX_BULK, NULL);
        cur_src += MAX_BULK;
        if (ret == 0)
                goto fail;
@@ -260,7 +261,7 @@ test_ring_basic(void)
 
        printf("fill and empty the ring\n");
        for (i = 0; i<RING_SIZE/MAX_BULK; i++) {
-               ret = rte_ring_mp_enqueue_bulk(r, cur_src, MAX_BULK);
+               ret = rte_ring_mp_enqueue_bulk(r, cur_src, MAX_BULK, NULL);
                cur_src += MAX_BULK;
                if (ret == 0)
                        goto fail;
@@ -290,13 +291,13 @@ test_ring_basic(void)
        cur_src = src;
        cur_dst = dst;
 
-       ret = rte_ring_enqueue_bulk(r, cur_src, num_elems);
+       ret = rte_ring_enqueue_bulk(r, cur_src, num_elems, NULL);
        cur_src += num_elems;
        if (ret == 0) {
                printf("Cannot enqueue\n");
                goto fail;
        }
-       ret = rte_ring_enqueue_bulk(r, cur_src, num_elems);
+       ret = rte_ring_enqueue_bulk(r, cur_src, num_elems, NULL);
        cur_src += num_elems;
        if (ret == 0) {
                printf("Cannot enqueue\n");
@@ -371,19 +372,19 @@ test_ring_burst_basic(void)
 
        printf("Test SP & SC basic functions \n");
        printf("enqueue 1 obj\n");
-       ret = rte_ring_sp_enqueue_burst(r, cur_src, 1);
+       ret = rte_ring_sp_enqueue_burst(r, cur_src, 1, NULL);
        cur_src += 1;
        if ((ret & RTE_RING_SZ_MASK) != 1)
                goto fail;
 
        printf("enqueue 2 objs\n");
-       ret = rte_ring_sp_enqueue_burst(r, cur_src, 2);
+       ret = rte_ring_sp_enqueue_burst(r, cur_src, 2, NULL);
        cur_src += 2;
        if ((ret & RTE_RING_SZ_MASK) != 2)
                goto fail;
 
        printf("enqueue MAX_BULK objs\n");
-       ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK);
+       ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK, NULL);
        cur_src += MAX_BULK;
        if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
                goto fail;
@@ -419,7 +420,7 @@ test_ring_burst_basic(void)
 
        printf("Test enqueue without enough memory space \n");
        for (i = 0; i< (RING_SIZE/MAX_BULK - 1); i++) {
-               ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK);
+               ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK, NULL);
                cur_src += MAX_BULK;
                if ((ret & RTE_RING_SZ_MASK) != MAX_BULK) {
                        goto fail;
@@ -427,14 +428,14 @@ test_ring_burst_basic(void)
        }
 
        printf("Enqueue 2 objects, free entries = MAX_BULK - 2  \n");
-       ret = rte_ring_sp_enqueue_burst(r, cur_src, 2);
+       ret = rte_ring_sp_enqueue_burst(r, cur_src, 2, NULL);
        cur_src += 2;
        if ((ret & RTE_RING_SZ_MASK) != 2)
                goto fail;
 
        printf("Enqueue the remaining entries = MAX_BULK - 2  \n");
        /* Always one free entry left */
-       ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK);
+       ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK, NULL);
        cur_src += MAX_BULK - 3;
        if ((ret & RTE_RING_SZ_MASK) != MAX_BULK - 3)
                goto fail;
@@ -444,7 +445,7 @@ test_ring_burst_basic(void)
                goto fail;
 
        printf("Test enqueue for a full entry  \n");
-       ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK);
+       ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK, NULL);
        if ((ret & RTE_RING_SZ_MASK) != 0)
                goto fail;
 
@@ -486,19 +487,19 @@ test_ring_burst_basic(void)
        printf("Test MP & MC basic functions \n");
 
        printf("enqueue 1 obj\n");
-       ret = rte_ring_mp_enqueue_burst(r, cur_src, 1);
+       ret = rte_ring_mp_enqueue_burst(r, cur_src, 1, NULL);
        cur_src += 1;
        if ((ret & RTE_RING_SZ_MASK) != 1)
                goto fail;
 
        printf("enqueue 2 objs\n");
-       ret = rte_ring_mp_enqueue_burst(r, cur_src, 2);
+       ret = rte_ring_mp_enqueue_burst(r, cur_src, 2, NULL);
        cur_src += 2;
        if ((ret & RTE_RING_SZ_MASK) != 2)
                goto fail;
 
        printf("enqueue MAX_BULK objs\n");
-       ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK);
+       ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK, NULL);
        cur_src += MAX_BULK;
        if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
                goto fail;
@@ -534,7 +535,7 @@ test_ring_burst_basic(void)
 
        printf("fill and empty the ring\n");
        for (i = 0; i<RING_SIZE/MAX_BULK; i++) {
-               ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK);
+               ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK, NULL);
                cur_src += MAX_BULK;
                if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
                        goto fail;
@@ -557,19 +558,19 @@ test_ring_burst_basic(void)
 
        printf("Test enqueue without enough memory space \n");
        for (i = 0; i<RING_SIZE/MAX_BULK - 1; i++) {
-               ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK);
+               ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK, NULL);
                cur_src += MAX_BULK;
                if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
                        goto fail;
        }
 
        /* Available memory space for the exact MAX_BULK objects */
-       ret = rte_ring_mp_enqueue_burst(r, cur_src, 2);
+       ret = rte_ring_mp_enqueue_burst(r, cur_src, 2, NULL);
        cur_src += 2;
        if ((ret & RTE_RING_SZ_MASK) != 2)
                goto fail;
 
-       ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK);
+       ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK, NULL);
        cur_src += MAX_BULK - 3;
        if ((ret & RTE_RING_SZ_MASK) != MAX_BULK - 3)
                goto fail;
@@ -607,7 +608,7 @@ test_ring_burst_basic(void)
 
        printf("Covering rte_ring_enqueue_burst functions \n");
 
-       ret = rte_ring_enqueue_burst(r, cur_src, 2);
+       ret = rte_ring_enqueue_burst(r, cur_src, 2, NULL);
        cur_src += 2;
        if ((ret & RTE_RING_SZ_MASK) != 2)
                goto fail;
@@ -746,7 +747,7 @@ test_ring_basic_ex(void)
        }
 
        /* Covering the ring burst operation */
-       ret = rte_ring_enqueue_burst(rp, obj, 2);
+       ret = rte_ring_enqueue_burst(rp, obj, 2, NULL);
        if ((ret & RTE_RING_SZ_MASK) != 2) {
                printf("test_ring_basic_ex: rte_ring_enqueue_burst fails \n");
                goto fail_test;
index 8ccbdeff75c6c346bc2a64f4065b7677a3e9a2b8..f95a8e9a4e0a7c4c65f2b3e629e79838528c7d4d 100644 (file)
@@ -195,13 +195,13 @@ enqueue_bulk(void *p)
 
        const uint64_t sp_start = rte_rdtsc();
        for (i = 0; i < iterations; i++)
-               while (rte_ring_sp_enqueue_bulk(r, burst, size) == 0)
+               while (rte_ring_sp_enqueue_bulk(r, burst, size, NULL) == 0)
                        rte_pause();
        const uint64_t sp_end = rte_rdtsc();
 
        const uint64_t mp_start = rte_rdtsc();
        for (i = 0; i < iterations; i++)
-               while (rte_ring_mp_enqueue_bulk(r, burst, size) == 0)
+               while (rte_ring_mp_enqueue_bulk(r, burst, size, NULL) == 0)
                        rte_pause();
        const uint64_t mp_end = rte_rdtsc();
 
@@ -323,14 +323,16 @@ test_burst_enqueue_dequeue(void)
        for (sz = 0; sz < sizeof(bulk_sizes)/sizeof(bulk_sizes[0]); sz++) {
                const uint64_t sc_start = rte_rdtsc();
                for (i = 0; i < iterations; i++) {
-                       rte_ring_sp_enqueue_burst(r, burst, bulk_sizes[sz]);
+                       rte_ring_sp_enqueue_burst(r, burst,
+                                       bulk_sizes[sz], NULL);
                        rte_ring_sc_dequeue_burst(r, burst, bulk_sizes[sz]);
                }
                const uint64_t sc_end = rte_rdtsc();
 
                const uint64_t mc_start = rte_rdtsc();
                for (i = 0; i < iterations; i++) {
-                       rte_ring_mp_enqueue_burst(r, burst, bulk_sizes[sz]);
+                       rte_ring_mp_enqueue_burst(r, burst,
+                                       bulk_sizes[sz], NULL);
                        rte_ring_mc_dequeue_burst(r, burst, bulk_sizes[sz]);
                }
                const uint64_t mc_end = rte_rdtsc();
@@ -357,14 +359,16 @@ test_bulk_enqueue_dequeue(void)
        for (sz = 0; sz < sizeof(bulk_sizes)/sizeof(bulk_sizes[0]); sz++) {
                const uint64_t sc_start = rte_rdtsc();
                for (i = 0; i < iterations; i++) {
-                       rte_ring_sp_enqueue_bulk(r, burst, bulk_sizes[sz]);
+                       rte_ring_sp_enqueue_bulk(r, burst,
+                                       bulk_sizes[sz], NULL);
                        rte_ring_sc_dequeue_bulk(r, burst, bulk_sizes[sz]);
                }
                const uint64_t sc_end = rte_rdtsc();
 
                const uint64_t mc_start = rte_rdtsc();
                for (i = 0; i < iterations; i++) {
-                       rte_ring_mp_enqueue_bulk(r, burst, bulk_sizes[sz]);
+                       rte_ring_mp_enqueue_bulk(r, burst,
+                                       bulk_sizes[sz], NULL);
                        rte_ring_mc_dequeue_bulk(r, burst, bulk_sizes[sz]);
                }
                const uint64_t mc_end = rte_rdtsc();
index 25323677891323b0365d7108cbf4f1e747a3e2a8..395f4f352ed1dab3778323018ecd2eaf130379d5 100644 (file)
@@ -80,7 +80,7 @@ test_port_ring_reader(void)
        mbuf[0] = (void *)rte_pktmbuf_alloc(pool);
 
        expected_pkts = rte_ring_sp_enqueue_burst(port_ring_reader_params.ring,
-               mbuf, 1);
+               mbuf, 1, NULL);
        received_pkts = rte_port_ring_reader_ops.f_rx(port, res_mbuf, 1);
 
        if (received_pkts < expected_pkts)
@@ -93,7 +93,7 @@ test_port_ring_reader(void)
                mbuf[i] = rte_pktmbuf_alloc(pool);
 
        expected_pkts = rte_ring_sp_enqueue_burst(port_ring_reader_params.ring,
-               (void * const *) mbuf, RTE_PORT_IN_BURST_SIZE_MAX);
+               (void * const *) mbuf, RTE_PORT_IN_BURST_SIZE_MAX, NULL);
        received_pkts = rte_port_ring_reader_ops.f_rx(port, res_mbuf,
                RTE_PORT_IN_BURST_SIZE_MAX);
 
index 6e4dcd8f9656de4d823dd028b52755f1e12f49b7..39e070c453be6c73009d52209e94832c9622a3cb 100644 (file)
@@ -380,7 +380,7 @@ virtual_ethdev_tx_burst_success(void *queue, struct rte_mbuf **bufs,
                nb_pkts = 0;
        else
                nb_pkts = rte_ring_enqueue_burst(dev_private->tx_queue, (void **)bufs,
-                               nb_pkts);
+                               nb_pkts, NULL);
 
        /* increment opacket count */
        dev_private->eth_stats.opackets += nb_pkts;
@@ -496,7 +496,7 @@ virtual_ethdev_add_mbufs_to_rx_queue(uint8_t port_id,
                        vrtl_eth_dev->data->dev_private;
 
        return rte_ring_enqueue_burst(dev_private->rx_queue, (void **)pkt_burst,
-                       burst_length);
+                       burst_length, NULL);
 }
 
 int