From: Bruce Richardson Date: Wed, 29 Mar 2017 15:21:24 +0000 (+0100) Subject: ring: return free space when enqueuing X-Git-Tag: spdx-start~4367 X-Git-Url: http://git.droids-corp.org/?a=commitdiff_plain;h=14fbffb0aac971fd96718d292701645779597a7a;p=dpdk.git ring: return free space when enqueuing Add an extra parameter to the ring enqueue burst/bulk functions so that those functions can optionally return the amount of free space in the ring. This information can be used by applications in a number of ways, for instance, with single-producer queues, it provides a max enqueue size which is guaranteed to work. It can also be used to implement watermark functionality in apps, replacing the older functionality with a more flexible version, which enables apps to implement multiple watermark thresholds, rather than just one. Signed-off-by: Bruce Richardson Reviewed-by: Yuanhan Liu Acked-by: Olivier Matz --- diff --git a/doc/guides/rel_notes/release_17_05.rst b/doc/guides/rel_notes/release_17_05.rst index 6da26123f8..b361a98717 100644 --- a/doc/guides/rel_notes/release_17_05.rst +++ b/doc/guides/rel_notes/release_17_05.rst @@ -137,6 +137,9 @@ API Changes * removed the build-time setting ``CONFIG_RTE_RING_PAUSE_REP_COUNT`` * removed the function ``rte_ring_set_water_mark`` as part of a general removal of watermarks support in the library. + * added an extra parameter to the burst/bulk enqueue functions to + return the number of free spaces in the ring after enqueue. This can + be used by an application to implement its own watermark functionality. * changed the return value of the enqueue and dequeue bulk functions to match that of the burst equivalents. In all cases, ring functions which operate on multiple packets now return the number of elements enqueued diff --git a/doc/guides/sample_app_ug/server_node_efd.rst b/doc/guides/sample_app_ug/server_node_efd.rst index e3a63c8163..c2a5f20a15 100644 --- a/doc/guides/sample_app_ug/server_node_efd.rst +++ b/doc/guides/sample_app_ug/server_node_efd.rst @@ -286,7 +286,7 @@ repeated infinitely. cl = &nodes[node]; if (rte_ring_enqueue_bulk(cl->rx_q, (void **)cl_rx_buf[node].buffer, - cl_rx_buf[node].count) != cl_rx_buf[node].count){ + cl_rx_buf[node].count, NULL) != cl_rx_buf[node].count){ for (j = 0; j < cl_rx_buf[node].count; j++) rte_pktmbuf_free(cl_rx_buf[node].buffer[j]); cl->stats.rx_drop += cl_rx_buf[node].count; diff --git a/drivers/crypto/armv8/rte_armv8_pmd.c b/drivers/crypto/armv8/rte_armv8_pmd.c index d2b88a3e89..37ecd7babf 100644 --- a/drivers/crypto/armv8/rte_armv8_pmd.c +++ b/drivers/crypto/armv8/rte_armv8_pmd.c @@ -739,13 +739,15 @@ armv8_crypto_pmd_enqueue_burst(void *queue_pair, struct rte_crypto_op **ops, goto enqueue_err; } - retval = rte_ring_enqueue_burst(qp->processed_ops, (void *)ops, i); + retval = rte_ring_enqueue_burst(qp->processed_ops, (void *)ops, i, + NULL); qp->stats.enqueued_count += retval; return retval; enqueue_err: - retval = rte_ring_enqueue_burst(qp->processed_ops, (void *)ops, i); + retval = rte_ring_enqueue_burst(qp->processed_ops, (void *)ops, i, + NULL); if (ops[i] != NULL) ops[i]->status = RTE_CRYPTO_OP_STATUS_INVALID_ARGS; diff --git a/drivers/crypto/kasumi/rte_kasumi_pmd.c b/drivers/crypto/kasumi/rte_kasumi_pmd.c index 234921e0ef..1dd05cb84a 100644 --- a/drivers/crypto/kasumi/rte_kasumi_pmd.c +++ b/drivers/crypto/kasumi/rte_kasumi_pmd.c @@ -359,7 +359,7 @@ process_ops(struct rte_crypto_op **ops, struct kasumi_session *session, } enqueued_ops = rte_ring_enqueue_burst(qp->processed_ops, - (void **)ops, processed_ops); + (void **)ops, processed_ops, NULL); qp->qp_stats.enqueued_count += enqueued_ops; *accumulated_enqueued_ops += enqueued_ops; @@ -410,7 +410,7 @@ process_op_bit(struct rte_crypto_op *op, struct kasumi_session *session, } enqueued_op = rte_ring_enqueue_burst(qp->processed_ops, (void **)&op, - processed_op); + processed_op, NULL); qp->qp_stats.enqueued_count += enqueued_op; *accumulated_enqueued_ops += enqueued_op; diff --git a/drivers/crypto/snow3g/rte_snow3g_pmd.c b/drivers/crypto/snow3g/rte_snow3g_pmd.c index ca97271b86..01c4e1ca4b 100644 --- a/drivers/crypto/snow3g/rte_snow3g_pmd.c +++ b/drivers/crypto/snow3g/rte_snow3g_pmd.c @@ -363,7 +363,7 @@ process_ops(struct rte_crypto_op **ops, struct snow3g_session *session, } enqueued_ops = rte_ring_enqueue_burst(qp->processed_ops, - (void **)ops, processed_ops); + (void **)ops, processed_ops, NULL); qp->qp_stats.enqueued_count += enqueued_ops; *accumulated_enqueued_ops += enqueued_ops; @@ -414,7 +414,7 @@ process_op_bit(struct rte_crypto_op *op, struct snow3g_session *session, } enqueued_op = rte_ring_enqueue_burst(qp->processed_ops, - (void **)&op, processed_op); + (void **)&op, processed_op, NULL); qp->qp_stats.enqueued_count += enqueued_op; *accumulated_enqueued_ops += enqueued_op; diff --git a/drivers/crypto/zuc/rte_zuc_pmd.c b/drivers/crypto/zuc/rte_zuc_pmd.c index 6f9c06a0bc..5e2dbf56c0 100644 --- a/drivers/crypto/zuc/rte_zuc_pmd.c +++ b/drivers/crypto/zuc/rte_zuc_pmd.c @@ -339,7 +339,7 @@ process_ops(struct rte_crypto_op **ops, struct zuc_session *session, } enqueued_ops = rte_ring_enqueue_burst(qp->processed_ops, - (void **)ops, processed_ops); + (void **)ops, processed_ops, NULL); qp->qp_stats.enqueued_count += enqueued_ops; *accumulated_enqueued_ops += enqueued_ops; diff --git a/drivers/net/ring/rte_eth_ring.c b/drivers/net/ring/rte_eth_ring.c index 6f9cc1a6f5..adbf478291 100644 --- a/drivers/net/ring/rte_eth_ring.c +++ b/drivers/net/ring/rte_eth_ring.c @@ -102,7 +102,7 @@ eth_ring_tx(void *q, struct rte_mbuf **bufs, uint16_t nb_bufs) void **ptrs = (void *)&bufs[0]; struct ring_queue *r = q; const uint16_t nb_tx = (uint16_t)rte_ring_enqueue_burst(r->rng, - ptrs, nb_bufs); + ptrs, nb_bufs, NULL); if (r->rng->flags & RING_F_SP_ENQ) { r->tx_pkts.cnt += nb_tx; r->err_pkts.cnt += nb_bufs - nb_tx; diff --git a/examples/distributor/main.c b/examples/distributor/main.c index 6aa8755ba7..982b4fd7ea 100644 --- a/examples/distributor/main.c +++ b/examples/distributor/main.c @@ -257,7 +257,7 @@ lcore_rx(struct lcore_params *p) struct rte_ring *tx_ring = p->dist_tx_ring; uint16_t sent = rte_ring_enqueue_burst(tx_ring, - (void *)bufs, nb_ret); + (void *)bufs, nb_ret, NULL); #else uint16_t nb_ret = nb_rx; /* @@ -268,7 +268,7 @@ lcore_rx(struct lcore_params *p) /* struct rte_ring *out_ring = p->dist_tx_ring; */ uint16_t sent = rte_ring_enqueue_burst(out_ring, - (void *)bufs, nb_ret); + (void *)bufs, nb_ret, NULL); #endif app_stats.rx.enqueued_pkts += sent; @@ -350,7 +350,7 @@ lcore_distributor(struct lcore_params *p) app_stats.dist.ret_pkts += nb_ret; uint16_t sent = rte_ring_enqueue_burst(out_r, - (void *)bufs, nb_ret); + (void *)bufs, nb_ret, NULL); app_stats.dist.sent_pkts += sent; if (unlikely(sent < nb_ret)) { app_stats.dist.enqdrop_pkts += nb_ret - sent; diff --git a/examples/load_balancer/runtime.c b/examples/load_balancer/runtime.c index 82b10bc20d..1645994fcb 100644 --- a/examples/load_balancer/runtime.c +++ b/examples/load_balancer/runtime.c @@ -144,7 +144,8 @@ app_lcore_io_rx_buffer_to_send ( ret = rte_ring_sp_enqueue_bulk( lp->rx.rings[worker], (void **) lp->rx.mbuf_out[worker].array, - bsz); + bsz, + NULL); if (unlikely(ret == 0)) { uint32_t k; @@ -310,7 +311,8 @@ app_lcore_io_rx_flush(struct app_lcore_params_io *lp, uint32_t n_workers) ret = rte_ring_sp_enqueue_bulk( lp->rx.rings[worker], (void **) lp->rx.mbuf_out[worker].array, - lp->rx.mbuf_out[worker].n_mbufs); + lp->rx.mbuf_out[worker].n_mbufs, + NULL); if (unlikely(ret == 0)) { uint32_t k; @@ -553,7 +555,8 @@ app_lcore_worker( ret = rte_ring_sp_enqueue_bulk( lp->rings_out[port], (void **) lp->mbuf_out[port].array, - bsz_wr); + bsz_wr, + NULL); #if APP_STATS lp->rings_out_iters[port] ++; @@ -605,7 +608,8 @@ app_lcore_worker_flush(struct app_lcore_params_worker *lp) ret = rte_ring_sp_enqueue_bulk( lp->rings_out[port], (void **) lp->mbuf_out[port].array, - lp->mbuf_out[port].n_mbufs); + lp->mbuf_out[port].n_mbufs, + NULL); if (unlikely(ret == 0)) { uint32_t k; diff --git a/examples/multi_process/client_server_mp/mp_server/main.c b/examples/multi_process/client_server_mp/mp_server/main.c index 19c95b2339..c2b0261d19 100644 --- a/examples/multi_process/client_server_mp/mp_server/main.c +++ b/examples/multi_process/client_server_mp/mp_server/main.c @@ -227,7 +227,7 @@ flush_rx_queue(uint16_t client) cl = &clients[client]; if (rte_ring_enqueue_bulk(cl->rx_q, (void **)cl_rx_buf[client].buffer, - cl_rx_buf[client].count) == 0){ + cl_rx_buf[client].count, NULL) == 0){ for (j = 0; j < cl_rx_buf[client].count; j++) rte_pktmbuf_free(cl_rx_buf[client].buffer[j]); cl->stats.rx_drop += cl_rx_buf[client].count; diff --git a/examples/packet_ordering/main.c b/examples/packet_ordering/main.c index a448039d3d..569b6dadb0 100644 --- a/examples/packet_ordering/main.c +++ b/examples/packet_ordering/main.c @@ -421,8 +421,8 @@ rx_thread(struct rte_ring *ring_out) pkts[i++]->seqn = seqn++; /* enqueue to rx_to_workers ring */ - ret = rte_ring_enqueue_burst(ring_out, (void *) pkts, - nb_rx_pkts); + ret = rte_ring_enqueue_burst(ring_out, + (void *)pkts, nb_rx_pkts, NULL); app_stats.rx.enqueue_pkts += ret; if (unlikely(ret < nb_rx_pkts)) { app_stats.rx.enqueue_failed_pkts += @@ -473,7 +473,8 @@ worker_thread(void *args_ptr) burst_buffer[i++]->port ^= xor_val; /* enqueue the modified mbufs to workers_to_tx ring */ - ret = rte_ring_enqueue_burst(ring_out, (void *)burst_buffer, burst_size); + ret = rte_ring_enqueue_burst(ring_out, (void *)burst_buffer, + burst_size, NULL); __sync_fetch_and_add(&app_stats.wkr.enqueue_pkts, ret); if (unlikely(ret < burst_size)) { /* Return the mbufs to their respective pool, dropping packets */ diff --git a/examples/performance-thread/l3fwd-thread/main.c b/examples/performance-thread/l3fwd-thread/main.c index bf92582246..b4c0df1881 100644 --- a/examples/performance-thread/l3fwd-thread/main.c +++ b/examples/performance-thread/l3fwd-thread/main.c @@ -2213,7 +2213,7 @@ lthread_rx(void *dummy) ret = rte_ring_sp_enqueue_burst( rx_conf->ring[worker_id], (void **) pkts_burst, - nb_rx); + nb_rx, NULL); new_len = old_len + ret; @@ -2453,7 +2453,7 @@ pthread_rx(void *dummy) SET_CPU_BUSY(rx_conf, CPU_PROCESS); worker_id = (worker_id + 1) % rx_conf->n_ring; n = rte_ring_sp_enqueue_burst(rx_conf->ring[worker_id], - (void **)pkts_burst, nb_rx); + (void **)pkts_burst, nb_rx, NULL); if (unlikely(n != nb_rx)) { uint32_t k; diff --git a/examples/qos_sched/app_thread.c b/examples/qos_sched/app_thread.c index dab459449e..0c81a152c9 100644 --- a/examples/qos_sched/app_thread.c +++ b/examples/qos_sched/app_thread.c @@ -107,7 +107,7 @@ app_rx_thread(struct thread_conf **confs) } if (unlikely(rte_ring_sp_enqueue_bulk(conf->rx_ring, - (void **)rx_mbufs, nb_rx) == 0)) { + (void **)rx_mbufs, nb_rx, NULL) == 0)) { for(i = 0; i < nb_rx; i++) { rte_pktmbuf_free(rx_mbufs[i]); @@ -231,7 +231,7 @@ app_worker_thread(struct thread_conf **confs) burst_conf.qos_dequeue); if (likely(nb_pkt > 0)) while (rte_ring_sp_enqueue_bulk(conf->tx_ring, - (void **)mbufs, nb_pkt) == 0) + (void **)mbufs, nb_pkt, NULL) == 0) ; /* empty body */ conf_idx++; diff --git a/examples/server_node_efd/server/main.c b/examples/server_node_efd/server/main.c index 3eb7fac8d1..597b4c250d 100644 --- a/examples/server_node_efd/server/main.c +++ b/examples/server_node_efd/server/main.c @@ -247,7 +247,7 @@ flush_rx_queue(uint16_t node) cl = &nodes[node]; if (rte_ring_enqueue_bulk(cl->rx_q, (void **)cl_rx_buf[node].buffer, - cl_rx_buf[node].count) != cl_rx_buf[node].count){ + cl_rx_buf[node].count, NULL) != cl_rx_buf[node].count){ for (j = 0; j < cl_rx_buf[node].count; j++) rte_pktmbuf_free(cl_rx_buf[node].buffer[j]); cl->stats.rx_drop += cl_rx_buf[node].count; diff --git a/lib/librte_hash/rte_cuckoo_hash.c b/lib/librte_hash/rte_cuckoo_hash.c index 51db006a96..6552199f77 100644 --- a/lib/librte_hash/rte_cuckoo_hash.c +++ b/lib/librte_hash/rte_cuckoo_hash.c @@ -808,7 +808,7 @@ remove_entry(const struct rte_hash *h, struct rte_hash_bucket *bkt, unsigned i) /* Need to enqueue the free slots in global ring. */ n_slots = rte_ring_mp_enqueue_burst(h->free_slots, cached_free_slots->objs, - LCORE_CACHE_SIZE); + LCORE_CACHE_SIZE, NULL); cached_free_slots->len -= n_slots; } /* Put index of new free slot in cache. */ diff --git a/lib/librte_mempool/rte_mempool_ring.c b/lib/librte_mempool/rte_mempool_ring.c index 409b86054f..9b8fd2bd4f 100644 --- a/lib/librte_mempool/rte_mempool_ring.c +++ b/lib/librte_mempool/rte_mempool_ring.c @@ -43,7 +43,7 @@ common_ring_mp_enqueue(struct rte_mempool *mp, void * const *obj_table, unsigned n) { return rte_ring_mp_enqueue_bulk(mp->pool_data, - obj_table, n) == 0 ? -ENOBUFS : 0; + obj_table, n, NULL) == 0 ? -ENOBUFS : 0; } static int @@ -51,7 +51,7 @@ common_ring_sp_enqueue(struct rte_mempool *mp, void * const *obj_table, unsigned n) { return rte_ring_sp_enqueue_bulk(mp->pool_data, - obj_table, n) == 0 ? -ENOBUFS : 0; + obj_table, n, NULL) == 0 ? -ENOBUFS : 0; } static int diff --git a/lib/librte_pdump/rte_pdump.c b/lib/librte_pdump/rte_pdump.c index cc0b5b19db..b599d65d62 100644 --- a/lib/librte_pdump/rte_pdump.c +++ b/lib/librte_pdump/rte_pdump.c @@ -197,7 +197,7 @@ pdump_copy(struct rte_mbuf **pkts, uint16_t nb_pkts, void *user_params) dup_bufs[d_pkts++] = p; } - ring_enq = rte_ring_enqueue_burst(ring, (void *)dup_bufs, d_pkts); + ring_enq = rte_ring_enqueue_burst(ring, (void *)dup_bufs, d_pkts, NULL); if (unlikely(ring_enq < d_pkts)) { RTE_LOG(DEBUG, PDUMP, "only %d of packets enqueued to ring\n", ring_enq); diff --git a/lib/librte_port/rte_port_ras.c b/lib/librte_port/rte_port_ras.c index c4bb508133..4de0945eae 100644 --- a/lib/librte_port/rte_port_ras.c +++ b/lib/librte_port/rte_port_ras.c @@ -167,7 +167,7 @@ send_burst(struct rte_port_ring_writer_ras *p) uint32_t nb_tx; nb_tx = rte_ring_sp_enqueue_burst(p->ring, (void **)p->tx_buf, - p->tx_buf_count); + p->tx_buf_count, NULL); RTE_PORT_RING_WRITER_RAS_STATS_PKTS_DROP_ADD(p, p->tx_buf_count - nb_tx); for ( ; nb_tx < p->tx_buf_count; nb_tx++) diff --git a/lib/librte_port/rte_port_ring.c b/lib/librte_port/rte_port_ring.c index 0df1bcf0b5..c5dbe07e7a 100644 --- a/lib/librte_port/rte_port_ring.c +++ b/lib/librte_port/rte_port_ring.c @@ -241,7 +241,7 @@ send_burst(struct rte_port_ring_writer *p) uint32_t nb_tx; nb_tx = rte_ring_sp_enqueue_burst(p->ring, (void **)p->tx_buf, - p->tx_buf_count); + p->tx_buf_count, NULL); RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(p, p->tx_buf_count - nb_tx); for ( ; nb_tx < p->tx_buf_count; nb_tx++) @@ -256,7 +256,7 @@ send_burst_mp(struct rte_port_ring_writer *p) uint32_t nb_tx; nb_tx = rte_ring_mp_enqueue_burst(p->ring, (void **)p->tx_buf, - p->tx_buf_count); + p->tx_buf_count, NULL); RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(p, p->tx_buf_count - nb_tx); for ( ; nb_tx < p->tx_buf_count; nb_tx++) @@ -318,11 +318,11 @@ rte_port_ring_writer_tx_bulk_internal(void *port, RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(p, n_pkts); if (is_multi) - n_pkts_ok = rte_ring_mp_enqueue_burst(p->ring, (void **)pkts, - n_pkts); + n_pkts_ok = rte_ring_mp_enqueue_burst(p->ring, + (void **)pkts, n_pkts, NULL); else - n_pkts_ok = rte_ring_sp_enqueue_burst(p->ring, (void **)pkts, - n_pkts); + n_pkts_ok = rte_ring_sp_enqueue_burst(p->ring, + (void **)pkts, n_pkts, NULL); RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(p, n_pkts - n_pkts_ok); for ( ; n_pkts_ok < n_pkts; n_pkts_ok++) { @@ -517,7 +517,7 @@ send_burst_nodrop(struct rte_port_ring_writer_nodrop *p) uint32_t nb_tx = 0, i; nb_tx = rte_ring_sp_enqueue_burst(p->ring, (void **)p->tx_buf, - p->tx_buf_count); + p->tx_buf_count, NULL); /* We sent all the packets in a first try */ if (nb_tx >= p->tx_buf_count) { @@ -527,7 +527,8 @@ send_burst_nodrop(struct rte_port_ring_writer_nodrop *p) for (i = 0; i < p->n_retries; i++) { nb_tx += rte_ring_sp_enqueue_burst(p->ring, - (void **) (p->tx_buf + nb_tx), p->tx_buf_count - nb_tx); + (void **) (p->tx_buf + nb_tx), + p->tx_buf_count - nb_tx, NULL); /* We sent all the packets in more than one try */ if (nb_tx >= p->tx_buf_count) { @@ -550,7 +551,7 @@ send_burst_mp_nodrop(struct rte_port_ring_writer_nodrop *p) uint32_t nb_tx = 0, i; nb_tx = rte_ring_mp_enqueue_burst(p->ring, (void **)p->tx_buf, - p->tx_buf_count); + p->tx_buf_count, NULL); /* We sent all the packets in a first try */ if (nb_tx >= p->tx_buf_count) { @@ -560,7 +561,8 @@ send_burst_mp_nodrop(struct rte_port_ring_writer_nodrop *p) for (i = 0; i < p->n_retries; i++) { nb_tx += rte_ring_mp_enqueue_burst(p->ring, - (void **) (p->tx_buf + nb_tx), p->tx_buf_count - nb_tx); + (void **) (p->tx_buf + nb_tx), + p->tx_buf_count - nb_tx, NULL); /* We sent all the packets in more than one try */ if (nb_tx >= p->tx_buf_count) { @@ -633,10 +635,12 @@ rte_port_ring_writer_nodrop_tx_bulk_internal(void *port, RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(p, n_pkts); if (is_multi) n_pkts_ok = - rte_ring_mp_enqueue_burst(p->ring, (void **)pkts, n_pkts); + rte_ring_mp_enqueue_burst(p->ring, + (void **)pkts, n_pkts, NULL); else n_pkts_ok = - rte_ring_sp_enqueue_burst(p->ring, (void **)pkts, n_pkts); + rte_ring_sp_enqueue_burst(p->ring, + (void **)pkts, n_pkts, NULL); if (n_pkts_ok >= n_pkts) return 0; diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h index 34b438c1c1..439698be66 100644 --- a/lib/librte_ring/rte_ring.h +++ b/lib/librte_ring/rte_ring.h @@ -354,20 +354,16 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r); */ static inline unsigned int __attribute__((always_inline)) __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table, - unsigned n, enum rte_ring_queue_behavior behavior) + unsigned int n, enum rte_ring_queue_behavior behavior, + unsigned int *free_space) { uint32_t prod_head, prod_next; uint32_t cons_tail, free_entries; - const unsigned max = n; + const unsigned int max = n; int success; unsigned int i; uint32_t mask = r->mask; - /* Avoid the unnecessary cmpset operation below, which is also - * potentially harmful when n equals 0. */ - if (n == 0) - return 0; - /* move prod.head atomically */ do { /* Reset n to the initial burst count */ @@ -382,16 +378,12 @@ __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table, free_entries = (mask + cons_tail - prod_head); /* check that we have enough room in ring */ - if (unlikely(n > free_entries)) { - if (behavior == RTE_RING_QUEUE_FIXED) - return 0; - else { - /* No free entry available */ - if (unlikely(free_entries == 0)) - return 0; - n = free_entries; - } - } + if (unlikely(n > free_entries)) + n = (behavior == RTE_RING_QUEUE_FIXED) ? + 0 : free_entries; + + if (n == 0) + goto end; prod_next = prod_head + n; success = rte_atomic32_cmpset(&r->prod.head, prod_head, @@ -410,6 +402,9 @@ __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table, rte_pause(); r->prod.tail = prod_next; +end: + if (free_space != NULL) + *free_space = free_entries - n; return n; } @@ -431,7 +426,8 @@ __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table, */ static inline unsigned int __attribute__((always_inline)) __rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table, - unsigned n, enum rte_ring_queue_behavior behavior) + unsigned int n, enum rte_ring_queue_behavior behavior, + unsigned int *free_space) { uint32_t prod_head, cons_tail; uint32_t prod_next, free_entries; @@ -447,16 +443,12 @@ __rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table, free_entries = mask + cons_tail - prod_head; /* check that we have enough room in ring */ - if (unlikely(n > free_entries)) { - if (behavior == RTE_RING_QUEUE_FIXED) - return 0; - else { - /* No free entry available */ - if (unlikely(free_entries == 0)) - return 0; - n = free_entries; - } - } + if (unlikely(n > free_entries)) + n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : free_entries; + + if (n == 0) + goto end; + prod_next = prod_head + n; r->prod.head = prod_next; @@ -466,6 +458,9 @@ __rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table, rte_smp_wmb(); r->prod.tail = prod_next; +end: + if (free_space != NULL) + *free_space = free_entries - n; return n; } @@ -620,14 +615,18 @@ __rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table, * A pointer to a table of void * pointers (objects). * @param n * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. * @return * The number of objects enqueued, either 0 or n */ static inline unsigned int __attribute__((always_inline)) rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, - unsigned n) + unsigned int n, unsigned int *free_space) { - return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED); + return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED, + free_space); } /** @@ -639,14 +638,18 @@ rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, * A pointer to a table of void * pointers (objects). * @param n * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. * @return * The number of objects enqueued, either 0 or n */ static inline unsigned int __attribute__((always_inline)) rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, - unsigned n) + unsigned int n, unsigned int *free_space) { - return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED); + return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED, + free_space); } /** @@ -662,17 +665,20 @@ rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, * A pointer to a table of void * pointers (objects). * @param n * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. * @return * The number of objects enqueued, either 0 or n */ static inline unsigned int __attribute__((always_inline)) rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table, - unsigned n) + unsigned int n, unsigned int *free_space) { if (r->prod.single) - return rte_ring_sp_enqueue_bulk(r, obj_table, n); + return rte_ring_sp_enqueue_bulk(r, obj_table, n, free_space); else - return rte_ring_mp_enqueue_bulk(r, obj_table, n); + return rte_ring_mp_enqueue_bulk(r, obj_table, n, free_space); } /** @@ -692,7 +698,7 @@ rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table, static inline int __attribute__((always_inline)) rte_ring_mp_enqueue(struct rte_ring *r, void *obj) { - return rte_ring_mp_enqueue_bulk(r, &obj, 1) ? 0 : -ENOBUFS; + return rte_ring_mp_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS; } /** @@ -709,7 +715,7 @@ rte_ring_mp_enqueue(struct rte_ring *r, void *obj) static inline int __attribute__((always_inline)) rte_ring_sp_enqueue(struct rte_ring *r, void *obj) { - return rte_ring_sp_enqueue_bulk(r, &obj, 1) ? 0 : -ENOBUFS; + return rte_ring_sp_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS; } /** @@ -730,7 +736,7 @@ rte_ring_sp_enqueue(struct rte_ring *r, void *obj) static inline int __attribute__((always_inline)) rte_ring_enqueue(struct rte_ring *r, void *obj) { - return rte_ring_enqueue_bulk(r, &obj, 1) ? 0 : -ENOBUFS; + return rte_ring_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS; } /** @@ -971,14 +977,18 @@ struct rte_ring *rte_ring_lookup(const char *name); * A pointer to a table of void * pointers (objects). * @param n * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. * @return * - n: Actual number of objects enqueued. */ static inline unsigned __attribute__((always_inline)) rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table, - unsigned n) + unsigned int n, unsigned int *free_space) { - return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE); + return __rte_ring_mp_do_enqueue(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, free_space); } /** @@ -990,14 +1000,18 @@ rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table, * A pointer to a table of void * pointers (objects). * @param n * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. * @return * - n: Actual number of objects enqueued. */ static inline unsigned __attribute__((always_inline)) rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table, - unsigned n) + unsigned int n, unsigned int *free_space) { - return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE); + return __rte_ring_sp_do_enqueue(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, free_space); } /** @@ -1013,17 +1027,20 @@ rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table, * A pointer to a table of void * pointers (objects). * @param n * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. * @return * - n: Actual number of objects enqueued. */ static inline unsigned __attribute__((always_inline)) rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table, - unsigned n) + unsigned int n, unsigned int *free_space) { if (r->prod.single) - return rte_ring_sp_enqueue_burst(r, obj_table, n); + return rte_ring_sp_enqueue_burst(r, obj_table, n, free_space); else - return rte_ring_mp_enqueue_burst(r, obj_table, n); + return rte_ring_mp_enqueue_burst(r, obj_table, n, free_space); } /** diff --git a/test/test-pipeline/pipeline_hash.c b/test/test-pipeline/pipeline_hash.c index 1ac0aa8414..0c6e04f788 100644 --- a/test/test-pipeline/pipeline_hash.c +++ b/test/test-pipeline/pipeline_hash.c @@ -546,7 +546,8 @@ app_main_loop_rx_metadata(void) { ret = rte_ring_sp_enqueue_bulk( app.rings_rx[i], (void **) app.mbuf_rx.array, - n_mbufs); + n_mbufs, + NULL); } while (ret == 0); } } diff --git a/test/test-pipeline/runtime.c b/test/test-pipeline/runtime.c index 4e20669298..c06ff54cd3 100644 --- a/test/test-pipeline/runtime.c +++ b/test/test-pipeline/runtime.c @@ -97,7 +97,7 @@ app_main_loop_rx(void) { ret = rte_ring_sp_enqueue_bulk( app.rings_rx[i], (void **) app.mbuf_rx.array, - n_mbufs); + n_mbufs, NULL); } while (ret == 0); } } @@ -130,7 +130,8 @@ app_main_loop_worker(void) { ret = rte_ring_sp_enqueue_bulk( app.rings_tx[i ^ 1], (void **) worker_mbuf->array, - app.burst_size_worker_write); + app.burst_size_worker_write, + NULL); } while (ret == 0); } } diff --git a/test/test/test_link_bonding_mode4.c b/test/test/test_link_bonding_mode4.c index 53caa3e980..8df28b4f15 100644 --- a/test/test/test_link_bonding_mode4.c +++ b/test/test/test_link_bonding_mode4.c @@ -206,7 +206,8 @@ slave_get_pkts(struct slave_conf *slave, struct rte_mbuf **buf, uint16_t size) static int slave_put_pkts(struct slave_conf *slave, struct rte_mbuf **buf, uint16_t size) { - return rte_ring_enqueue_burst(slave->rx_queue, (void **)buf, size); + return rte_ring_enqueue_burst(slave->rx_queue, (void **)buf, + size, NULL); } static uint16_t diff --git a/test/test/test_pmd_ring_perf.c b/test/test/test_pmd_ring_perf.c index af011f7db5..045a7f255a 100644 --- a/test/test/test_pmd_ring_perf.c +++ b/test/test/test_pmd_ring_perf.c @@ -98,7 +98,7 @@ test_single_enqueue_dequeue(void) const uint64_t sc_start = rte_rdtsc_precise(); rte_compiler_barrier(); for (i = 0; i < iterations; i++) { - rte_ring_enqueue_bulk(r, &burst, 1); + rte_ring_enqueue_bulk(r, &burst, 1, NULL); rte_ring_dequeue_bulk(r, &burst, 1); } const uint64_t sc_end = rte_rdtsc_precise(); @@ -131,7 +131,8 @@ test_bulk_enqueue_dequeue(void) for (sz = 0; sz < sizeof(bulk_sizes)/sizeof(bulk_sizes[0]); sz++) { const uint64_t sc_start = rte_rdtsc(); for (i = 0; i < iterations; i++) { - rte_ring_sp_enqueue_bulk(r, (void *)burst, bulk_sizes[sz]); + rte_ring_sp_enqueue_bulk(r, (void *)burst, + bulk_sizes[sz], NULL); rte_ring_sc_dequeue_bulk(r, (void *)burst, bulk_sizes[sz]); } const uint64_t sc_end = rte_rdtsc(); diff --git a/test/test/test_ring.c b/test/test/test_ring.c index 112433b838..b0ca88b65b 100644 --- a/test/test/test_ring.c +++ b/test/test/test_ring.c @@ -117,11 +117,12 @@ test_ring_basic_full_empty(void * const src[], void *dst[]) rand = RTE_MAX(rte_rand() % RING_SIZE, 1UL); printf("%s: iteration %u, random shift: %u;\n", __func__, i, rand); - TEST_RING_VERIFY(rte_ring_enqueue_bulk(r, src, rand) != 0); + TEST_RING_VERIFY(rte_ring_enqueue_bulk(r, src, rand, + NULL) != 0); TEST_RING_VERIFY(rte_ring_dequeue_bulk(r, dst, rand) == rand); /* fill the ring */ - TEST_RING_VERIFY(rte_ring_enqueue_bulk(r, src, rsz) != 0); + TEST_RING_VERIFY(rte_ring_enqueue_bulk(r, src, rsz, NULL) != 0); TEST_RING_VERIFY(0 == rte_ring_free_count(r)); TEST_RING_VERIFY(rsz == rte_ring_count(r)); TEST_RING_VERIFY(rte_ring_full(r)); @@ -167,19 +168,19 @@ test_ring_basic(void) cur_dst = dst; printf("enqueue 1 obj\n"); - ret = rte_ring_sp_enqueue_bulk(r, cur_src, 1); + ret = rte_ring_sp_enqueue_bulk(r, cur_src, 1, NULL); cur_src += 1; if (ret == 0) goto fail; printf("enqueue 2 objs\n"); - ret = rte_ring_sp_enqueue_bulk(r, cur_src, 2); + ret = rte_ring_sp_enqueue_bulk(r, cur_src, 2, NULL); cur_src += 2; if (ret == 0) goto fail; printf("enqueue MAX_BULK objs\n"); - ret = rte_ring_sp_enqueue_bulk(r, cur_src, MAX_BULK); + ret = rte_ring_sp_enqueue_bulk(r, cur_src, MAX_BULK, NULL); cur_src += MAX_BULK; if (ret == 0) goto fail; @@ -213,19 +214,19 @@ test_ring_basic(void) cur_dst = dst; printf("enqueue 1 obj\n"); - ret = rte_ring_mp_enqueue_bulk(r, cur_src, 1); + ret = rte_ring_mp_enqueue_bulk(r, cur_src, 1, NULL); cur_src += 1; if (ret == 0) goto fail; printf("enqueue 2 objs\n"); - ret = rte_ring_mp_enqueue_bulk(r, cur_src, 2); + ret = rte_ring_mp_enqueue_bulk(r, cur_src, 2, NULL); cur_src += 2; if (ret == 0) goto fail; printf("enqueue MAX_BULK objs\n"); - ret = rte_ring_mp_enqueue_bulk(r, cur_src, MAX_BULK); + ret = rte_ring_mp_enqueue_bulk(r, cur_src, MAX_BULK, NULL); cur_src += MAX_BULK; if (ret == 0) goto fail; @@ -260,7 +261,7 @@ test_ring_basic(void) printf("fill and empty the ring\n"); for (i = 0; itx_queue, (void **)bufs, - nb_pkts); + nb_pkts, NULL); /* increment opacket count */ dev_private->eth_stats.opackets += nb_pkts; @@ -496,7 +496,7 @@ virtual_ethdev_add_mbufs_to_rx_queue(uint8_t port_id, vrtl_eth_dev->data->dev_private; return rte_ring_enqueue_burst(dev_private->rx_queue, (void **)pkt_burst, - burst_length); + burst_length, NULL); } int