Add an extra parameter to the ring enqueue burst/bulk functions so that
those functions can optionally return the amount of free space remaining
in the ring. Applications can use this information in a number of ways;
for instance, with single-producer queues it provides a maximum enqueue
size that is guaranteed to succeed. It can also be used to implement
watermark functionality in applications, replacing the older library
support with a more flexible scheme that allows multiple watermark
thresholds rather than just one.
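
As an illustration (a sketch only, not part of this patch: the threshold,
the helper name and the policy are invented), an application-level
watermark check built on the new parameter could look like:

    /* Only the rte_ring_enqueue_burst() signature comes from this
     * patch; HIGH_WATERMARK is a hypothetical app-chosen threshold. */
    #include <stdbool.h>
    #include <rte_ring.h>

    #define HIGH_WATERMARK 64 /* minimum free slots before back-off */

    /* Enqueue a burst; report whether the caller should back off. */
    static bool
    enqueue_with_backpressure(struct rte_ring *r, void **objs,
            unsigned int n)
    {
        unsigned int free_space;

        rte_ring_enqueue_burst(r, objs, n, &free_space);
        return free_space < HIGH_WATERMARK;
    }
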
Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
Reviewed-by: Yuanhan Liu <yuanhan.liu@linux.intel.com>
Acked-by: Olivier Matz <olivier.matz@6wind.com>
28 files changed:
* removed the build-time setting ``CONFIG_RTE_RING_PAUSE_REP_COUNT``
* removed the function ``rte_ring_set_water_mark`` as part of a general
removal of watermarks support in the library.
+ * added an extra parameter to the burst/bulk enqueue functions to
+ return the number of free spaces in the ring after enqueue. This can
+ be used by an application to implement its own watermark functionality.
* changed the return value of the enqueue and dequeue bulk functions to
match that of the burst equivalents. In all cases, ring functions which
operate on multiple packets now return the number of elements enqueued
or dequeued, as appropriate.
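
To illustrate the return-value change (a before/after sketch; the drop
counter is invented), callers that previously checked for 0 meaning
success must now treat 0 as "nothing enqueued":

    static unsigned int drops; /* hypothetical app counter */

    static void
    send_or_drop(struct rte_ring *r, void **objs, unsigned int n)
    {
        /* Old convention: rte_ring_enqueue_bulk() returned 0 on
         * success and -ENOBUFS on failure. New convention: it
         * returns the number of objects enqueued, either n or 0. */
        if (rte_ring_enqueue_bulk(r, objs, n, NULL) == 0)
            drops += n;
    }
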
cl = &nodes[node];
if (rte_ring_enqueue_bulk(cl->rx_q, (void **)cl_rx_buf[node].buffer,
- cl_rx_buf[node].count) != cl_rx_buf[node].count){
+ cl_rx_buf[node].count, NULL) != cl_rx_buf[node].count){
for (j = 0; j < cl_rx_buf[node].count; j++)
rte_pktmbuf_free(cl_rx_buf[node].buffer[j]);
cl->stats.rx_drop += cl_rx_buf[node].count;
- retval = rte_ring_enqueue_burst(qp->processed_ops, (void *)ops, i);
+ retval = rte_ring_enqueue_burst(qp->processed_ops, (void *)ops, i,
+ NULL);
qp->stats.enqueued_count += retval;
return retval;
enqueue_err:
- retval = rte_ring_enqueue_burst(qp->processed_ops, (void *)ops, i);
+ retval = rte_ring_enqueue_burst(qp->processed_ops, (void *)ops, i,
+ NULL);
if (ops[i] != NULL)
ops[i]->status = RTE_CRYPTO_OP_STATUS_INVALID_ARGS;
}
enqueued_ops = rte_ring_enqueue_burst(qp->processed_ops,
- (void **)ops, processed_ops);
+ (void **)ops, processed_ops, NULL);
qp->qp_stats.enqueued_count += enqueued_ops;
*accumulated_enqueued_ops += enqueued_ops;
}
enqueued_op = rte_ring_enqueue_burst(qp->processed_ops, (void **)&op,
- processed_op);
+ processed_op, NULL);
qp->qp_stats.enqueued_count += enqueued_op;
*accumulated_enqueued_ops += enqueued_op;
}
enqueued_ops = rte_ring_enqueue_burst(qp->processed_ops,
- (void **)ops, processed_ops);
+ (void **)ops, processed_ops, NULL);
qp->qp_stats.enqueued_count += enqueued_ops;
*accumulated_enqueued_ops += enqueued_ops;
}
enqueued_op = rte_ring_enqueue_burst(qp->processed_ops,
- (void **)&op, processed_op);
+ (void **)&op, processed_op, NULL);
qp->qp_stats.enqueued_count += enqueued_op;
*accumulated_enqueued_ops += enqueued_op;
}
enqueued_ops = rte_ring_enqueue_burst(qp->processed_ops,
- (void **)ops, processed_ops);
+ (void **)ops, processed_ops, NULL);
qp->qp_stats.enqueued_count += enqueued_ops;
*accumulated_enqueued_ops += enqueued_ops;
void **ptrs = (void *)&bufs[0];
struct ring_queue *r = q;
const uint16_t nb_tx = (uint16_t)rte_ring_enqueue_burst(r->rng,
- ptrs, nb_bufs);
+ ptrs, nb_bufs, NULL);
if (r->rng->flags & RING_F_SP_ENQ) {
r->tx_pkts.cnt += nb_tx;
r->err_pkts.cnt += nb_bufs - nb_tx;
struct rte_ring *tx_ring = p->dist_tx_ring;
uint16_t sent = rte_ring_enqueue_burst(tx_ring,
- (void *)bufs, nb_ret);
+ (void *)bufs, nb_ret, NULL);
#else
uint16_t nb_ret = nb_rx;
/*
/* struct rte_ring *out_ring = p->dist_tx_ring; */
uint16_t sent = rte_ring_enqueue_burst(out_ring,
- (void *)bufs, nb_ret);
+ (void *)bufs, nb_ret, NULL);
#endif
app_stats.rx.enqueued_pkts += sent;
app_stats.dist.ret_pkts += nb_ret;
uint16_t sent = rte_ring_enqueue_burst(out_r,
- (void *)bufs, nb_ret);
+ (void *)bufs, nb_ret, NULL);
app_stats.dist.sent_pkts += sent;
if (unlikely(sent < nb_ret)) {
app_stats.dist.enqdrop_pkts += nb_ret - sent;
ret = rte_ring_sp_enqueue_bulk(
lp->rx.rings[worker],
(void **) lp->rx.mbuf_out[worker].array,
- lp->rx.mbuf_out[worker].n_mbufs);
+ lp->rx.mbuf_out[worker].n_mbufs,
+ NULL);
if (unlikely(ret == 0)) {
uint32_t k;
ret = rte_ring_sp_enqueue_bulk(
lp->rx.rings[worker],
(void **) lp->rx.mbuf_out[worker].array,
- lp->rx.mbuf_out[worker].n_mbufs);
+ lp->rx.mbuf_out[worker].n_mbufs,
+ NULL);
if (unlikely(ret == 0)) {
uint32_t k;
ret = rte_ring_sp_enqueue_bulk(
lp->rings_out[port],
(void **) lp->mbuf_out[port].array,
- lp->mbuf_out[port].n_mbufs);
+ lp->mbuf_out[port].n_mbufs,
+ NULL);
#if APP_STATS
lp->rings_out_iters[port] ++;
ret = rte_ring_sp_enqueue_bulk(
lp->rings_out[port],
(void **) lp->mbuf_out[port].array,
- lp->mbuf_out[port].n_mbufs);
+ lp->mbuf_out[port].n_mbufs,
+ NULL);
if (unlikely(ret == 0)) {
uint32_t k;
cl = &clients[client];
if (rte_ring_enqueue_bulk(cl->rx_q, (void **)cl_rx_buf[client].buffer,
- cl_rx_buf[client].count) == 0){
+ cl_rx_buf[client].count, NULL) == 0){
for (j = 0; j < cl_rx_buf[client].count; j++)
rte_pktmbuf_free(cl_rx_buf[client].buffer[j]);
cl->stats.rx_drop += cl_rx_buf[client].count;
pkts[i++]->seqn = seqn++;
/* enqueue to rx_to_workers ring */
- ret = rte_ring_enqueue_burst(ring_out, (void *) pkts,
- nb_rx_pkts);
+ ret = rte_ring_enqueue_burst(ring_out,
+ (void *)pkts, nb_rx_pkts, NULL);
app_stats.rx.enqueue_pkts += ret;
if (unlikely(ret < nb_rx_pkts)) {
app_stats.rx.enqueue_failed_pkts +=
burst_buffer[i++]->port ^= xor_val;
/* enqueue the modified mbufs to workers_to_tx ring */
- ret = rte_ring_enqueue_burst(ring_out, (void *)burst_buffer, burst_size);
+ ret = rte_ring_enqueue_burst(ring_out, (void *)burst_buffer,
+ burst_size, NULL);
__sync_fetch_and_add(&app_stats.wkr.enqueue_pkts, ret);
if (unlikely(ret < burst_size)) {
/* Return the mbufs to their respective pool, dropping packets */
ret = rte_ring_sp_enqueue_burst(
rx_conf->ring[worker_id],
(void **) pkts_burst,
- nb_rx);
+ nb_rx, NULL);
SET_CPU_BUSY(rx_conf, CPU_PROCESS);
worker_id = (worker_id + 1) % rx_conf->n_ring;
n = rte_ring_sp_enqueue_burst(rx_conf->ring[worker_id],
- (void **)pkts_burst, nb_rx);
+ (void **)pkts_burst, nb_rx, NULL);
if (unlikely(n != nb_rx)) {
uint32_t k;
}
if (unlikely(rte_ring_sp_enqueue_bulk(conf->rx_ring,
- (void **)rx_mbufs, nb_rx) == 0)) {
+ (void **)rx_mbufs, nb_rx, NULL) == 0)) {
for(i = 0; i < nb_rx; i++) {
rte_pktmbuf_free(rx_mbufs[i]);
burst_conf.qos_dequeue);
if (likely(nb_pkt > 0))
while (rte_ring_sp_enqueue_bulk(conf->tx_ring,
- (void **)mbufs, nb_pkt) == 0)
+ (void **)mbufs, nb_pkt, NULL) == 0)
; /* empty body */
conf_idx++;
cl = &nodes[node];
if (rte_ring_enqueue_bulk(cl->rx_q, (void **)cl_rx_buf[node].buffer,
- cl_rx_buf[node].count) != cl_rx_buf[node].count){
+ cl_rx_buf[node].count, NULL) != cl_rx_buf[node].count){
for (j = 0; j < cl_rx_buf[node].count; j++)
rte_pktmbuf_free(cl_rx_buf[node].buffer[j]);
cl->stats.rx_drop += cl_rx_buf[node].count;
/* Need to enqueue the free slots in global ring. */
n_slots = rte_ring_mp_enqueue_burst(h->free_slots,
cached_free_slots->objs,
- LCORE_CACHE_SIZE);
+ LCORE_CACHE_SIZE, NULL);
cached_free_slots->len -= n_slots;
}
/* Put index of new free slot in cache. */
unsigned n)
{
return rte_ring_mp_enqueue_bulk(mp->pool_data,
- obj_table, n) == 0 ? -ENOBUFS : 0;
+ obj_table, n, NULL) == 0 ? -ENOBUFS : 0;
unsigned n)
{
return rte_ring_sp_enqueue_bulk(mp->pool_data,
- obj_table, n) == 0 ? -ENOBUFS : 0;
+ obj_table, n, NULL) == 0 ? -ENOBUFS : 0;
dup_bufs[d_pkts++] = p;
}
- ring_enq = rte_ring_enqueue_burst(ring, (void *)dup_bufs, d_pkts);
+ ring_enq = rte_ring_enqueue_burst(ring, (void *)dup_bufs, d_pkts, NULL);
if (unlikely(ring_enq < d_pkts)) {
RTE_LOG(DEBUG, PDUMP,
"only %d of packets enqueued to ring\n", ring_enq);
uint32_t nb_tx;
nb_tx = rte_ring_sp_enqueue_burst(p->ring, (void **)p->tx_buf,
- p->tx_buf_count);
+ p->tx_buf_count, NULL);
RTE_PORT_RING_WRITER_RAS_STATS_PKTS_DROP_ADD(p, p->tx_buf_count - nb_tx);
for ( ; nb_tx < p->tx_buf_count; nb_tx++)
uint32_t nb_tx;
nb_tx = rte_ring_sp_enqueue_burst(p->ring, (void **)p->tx_buf,
- p->tx_buf_count);
+ p->tx_buf_count, NULL);
RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(p, p->tx_buf_count - nb_tx);
for ( ; nb_tx < p->tx_buf_count; nb_tx++)
uint32_t nb_tx;
nb_tx = rte_ring_mp_enqueue_burst(p->ring, (void **)p->tx_buf,
- p->tx_buf_count);
+ p->tx_buf_count, NULL);
RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(p, p->tx_buf_count - nb_tx);
for ( ; nb_tx < p->tx_buf_count; nb_tx++)
RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(p, n_pkts);
if (is_multi)
- n_pkts_ok = rte_ring_mp_enqueue_burst(p->ring, (void **)pkts,
- n_pkts);
+ n_pkts_ok = rte_ring_mp_enqueue_burst(p->ring,
+ (void **)pkts, n_pkts, NULL);
- n_pkts_ok = rte_ring_sp_enqueue_burst(p->ring, (void **)pkts,
- n_pkts);
+ n_pkts_ok = rte_ring_sp_enqueue_burst(p->ring,
+ (void **)pkts, n_pkts, NULL);
RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(p, n_pkts - n_pkts_ok);
for ( ; n_pkts_ok < n_pkts; n_pkts_ok++) {
uint32_t nb_tx = 0, i;
nb_tx = rte_ring_sp_enqueue_burst(p->ring, (void **)p->tx_buf,
- p->tx_buf_count);
+ p->tx_buf_count, NULL);
/* We sent all the packets in a first try */
if (nb_tx >= p->tx_buf_count) {
for (i = 0; i < p->n_retries; i++) {
nb_tx += rte_ring_sp_enqueue_burst(p->ring,
- (void **) (p->tx_buf + nb_tx), p->tx_buf_count - nb_tx);
+ (void **) (p->tx_buf + nb_tx),
+ p->tx_buf_count - nb_tx, NULL);
/* We sent all the packets in more than one try */
if (nb_tx >= p->tx_buf_count) {
uint32_t nb_tx = 0, i;
nb_tx = rte_ring_mp_enqueue_burst(p->ring, (void **)p->tx_buf,
- p->tx_buf_count);
+ p->tx_buf_count, NULL);
/* We sent all the packets in a first try */
if (nb_tx >= p->tx_buf_count) {
for (i = 0; i < p->n_retries; i++) {
nb_tx += rte_ring_mp_enqueue_burst(p->ring,
- (void **) (p->tx_buf + nb_tx), p->tx_buf_count - nb_tx);
+ (void **) (p->tx_buf + nb_tx),
+ p->tx_buf_count - nb_tx, NULL);
/* We sent all the packets in more than one try */
if (nb_tx >= p->tx_buf_count) {
RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(p, n_pkts);
if (is_multi)
n_pkts_ok =
- rte_ring_mp_enqueue_burst(p->ring, (void **)pkts, n_pkts);
+ rte_ring_mp_enqueue_burst(p->ring,
+ (void **)pkts, n_pkts, NULL);
- rte_ring_sp_enqueue_burst(p->ring, (void **)pkts, n_pkts);
+ rte_ring_sp_enqueue_burst(p->ring,
+ (void **)pkts, n_pkts, NULL);
if (n_pkts_ok >= n_pkts)
return 0;
*/
static inline unsigned int __attribute__((always_inline))
__rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
- unsigned n, enum rte_ring_queue_behavior behavior)
+ unsigned int n, enum rte_ring_queue_behavior behavior,
+ unsigned int *free_space)
{
uint32_t prod_head, prod_next;
uint32_t cons_tail, free_entries;
- const unsigned max = n;
+ const unsigned int max = n;
int success;
unsigned int i;
uint32_t mask = r->mask;
- /* Avoid the unnecessary cmpset operation below, which is also
- * potentially harmful when n equals 0. */
- if (n == 0)
- return 0;
-
/* move prod.head atomically */
do {
/* Reset n to the initial burst count */
free_entries = (mask + cons_tail - prod_head);
/* check that we have enough room in ring */
- if (unlikely(n > free_entries)) {
- if (behavior == RTE_RING_QUEUE_FIXED)
- return 0;
- else {
- /* No free entry available */
- if (unlikely(free_entries == 0))
- return 0;
- n = free_entries;
- }
- }
+ if (unlikely(n > free_entries))
+ n = (behavior == RTE_RING_QUEUE_FIXED) ?
+ 0 : free_entries;
+
+ if (n == 0)
+ goto end;
prod_next = prod_head + n;
success = rte_atomic32_cmpset(&r->prod.head, prod_head,
rte_pause();
r->prod.tail = prod_next;
+end:
+ if (free_space != NULL)
+ *free_space = free_entries - n;
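
To make the free-space arithmetic above concrete, here is a worked
example with invented values (the free-running unsigned indices make
the subtraction safe even across wrap-around):

    #include <stdint.h>
    #include <stdio.h>

    int main(void)
    {
        const uint32_t mask = 15; /* ring size 16 */
        uint32_t cons_tail = 4, prod_head = 10, n = 3;

        uint32_t free_entries = mask + cons_tail - prod_head; /* 9 */
        uint32_t free_space = free_entries - n;               /* 6 */

        printf("free before: %u, after enqueue of %u objs: %u\n",
               free_entries, n, free_space);
        return 0;
    }
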
*/
static inline unsigned int __attribute__((always_inline))
__rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
- unsigned n, enum rte_ring_queue_behavior behavior)
+ unsigned int n, enum rte_ring_queue_behavior behavior,
+ unsigned int *free_space)
{
uint32_t prod_head, cons_tail;
uint32_t prod_next, free_entries;
free_entries = mask + cons_tail - prod_head;
/* check that we have enough room in ring */
- if (unlikely(n > free_entries)) {
- if (behavior == RTE_RING_QUEUE_FIXED)
- return 0;
- else {
- /* No free entry available */
- if (unlikely(free_entries == 0))
- return 0;
- n = free_entries;
- }
- }
+ if (unlikely(n > free_entries))
+ n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : free_entries;
+
+ if (n == 0)
+ goto end;
+
prod_next = prod_head + n;
r->prod.head = prod_next;
rte_smp_wmb();
r->prod.tail = prod_next;
+end:
+ if (free_space != NULL)
+ *free_space = free_entries - n;
* A pointer to a table of void * pointers (objects).
* @param n
* The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ * if non-NULL, returns the amount of space in the ring after the
+ * enqueue operation has finished.
* @return
* The number of objects enqueued, either 0 or n
*/
static inline unsigned int __attribute__((always_inline))
rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
+ unsigned int n, unsigned int *free_space)
- return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
+ return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
+ free_space);
* A pointer to a table of void * pointers (objects).
* @param n
* The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ * if non-NULL, returns the amount of space in the ring after the
+ * enqueue operation has finished.
* @return
* The number of objects enqueued, either 0 or n
*/
static inline unsigned int __attribute__((always_inline))
rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
+ unsigned int n, unsigned int *free_space)
- return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
+ return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
+ free_space);
* A pointer to a table of void * pointers (objects).
* @param n
* The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ * if non-NULL, returns the amount of space in the ring after the
+ * enqueue operation has finished.
* @return
* The number of objects enqueued, either 0 or n
*/
static inline unsigned int __attribute__((always_inline))
rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
+ unsigned int n, unsigned int *free_space)
- return rte_ring_sp_enqueue_bulk(r, obj_table, n);
+ return rte_ring_sp_enqueue_bulk(r, obj_table, n, free_space);
- return rte_ring_mp_enqueue_bulk(r, obj_table, n);
+ return rte_ring_mp_enqueue_bulk(r, obj_table, n, free_space);
static inline int __attribute__((always_inline))
rte_ring_mp_enqueue(struct rte_ring *r, void *obj)
{
- return rte_ring_mp_enqueue_bulk(r, &obj, 1) ? 0 : -ENOBUFS;
+ return rte_ring_mp_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS;
static inline int __attribute__((always_inline))
rte_ring_sp_enqueue(struct rte_ring *r, void *obj)
{
- return rte_ring_sp_enqueue_bulk(r, &obj, 1) ? 0 : -ENOBUFS;
+ return rte_ring_sp_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS;
static inline int __attribute__((always_inline))
rte_ring_enqueue(struct rte_ring *r, void *obj)
{
- return rte_ring_enqueue_bulk(r, &obj, 1) ? 0 : -ENOBUFS;
+ return rte_ring_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS;
* A pointer to a table of void * pointers (objects).
* @param n
* The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ * if non-NULL, returns the amount of space in the ring after the
+ * enqueue operation has finished.
* @return
* - n: Actual number of objects enqueued.
*/
static inline unsigned __attribute__((always_inline))
rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
+ unsigned int n, unsigned int *free_space)
- return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
+ return __rte_ring_mp_do_enqueue(r, obj_table, n,
+ RTE_RING_QUEUE_VARIABLE, free_space);
* A pointer to a table of void * pointers (objects).
* @param n
* The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ * if non-NULL, returns the amount of space in the ring after the
+ * enqueue operation has finished.
* @return
* - n: Actual number of objects enqueued.
*/
static inline unsigned __attribute__((always_inline))
rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
+ unsigned int n, unsigned int *free_space)
- return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
+ return __rte_ring_sp_do_enqueue(r, obj_table, n,
+ RTE_RING_QUEUE_VARIABLE, free_space);
* A pointer to a table of void * pointers (objects).
* @param n
* The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ * if non-NULL, returns the amount of space in the ring after the
+ * enqueue operation has finished.
* @return
* - n: Actual number of objects enqueued.
*/
static inline unsigned __attribute__((always_inline))
rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
+ unsigned int n, unsigned int *free_space)
- return rte_ring_sp_enqueue_burst(r, obj_table, n);
+ return rte_ring_sp_enqueue_burst(r, obj_table, n, free_space);
- return rte_ring_mp_enqueue_burst(r, obj_table, n);
+ return rte_ring_mp_enqueue_burst(r, obj_table, n, free_space);
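
A usage sketch for the extended burst API (assumptions: a single
producer and no error handling; rte_pause() is available because the
ring code itself uses it):

    #include <rte_ring.h>

    /* With a single producer, the free_space value returned by one
     * call is a size guaranteed to fit on the next call, since only
     * the consumer can change it in the meantime. */
    static void
    sp_send_all(struct rte_ring *r, void **objs, unsigned int total)
    {
        unsigned int done = 0, free_space = 0;

        while (done < total) {
            done += rte_ring_sp_enqueue_burst(r, &objs[done],
                    total - done, &free_space);
            if (free_space == 0)
                rte_pause(); /* ring full: wait for the consumer */
        }
    }
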
ret = rte_ring_sp_enqueue_bulk(
app.rings_rx[i],
(void **) app.mbuf_rx.array,
- app.burst_size_rx_write);
+ app.burst_size_rx_write,
+ NULL);
ret = rte_ring_sp_enqueue_bulk(
app.rings_rx[i],
(void **) app.mbuf_rx.array,
- app.burst_size_rx_write);
+ app.burst_size_rx_write,
+ NULL);
ret = rte_ring_sp_enqueue_bulk(
app.rings_tx[i ^ 1],
(void **) worker_mbuf->array,
- app.burst_size_worker_write);
+ app.burst_size_worker_write,
+ NULL);
static int
slave_put_pkts(struct slave_conf *slave, struct rte_mbuf **buf, uint16_t size)
{
- return rte_ring_enqueue_burst(slave->rx_queue, (void **)buf, size);
+ return rte_ring_enqueue_burst(slave->rx_queue, (void **)buf,
+ size, NULL);
const uint64_t sc_start = rte_rdtsc_precise();
rte_compiler_barrier();
for (i = 0; i < iterations; i++) {
- rte_ring_enqueue_bulk(r, &burst, 1);
+ rte_ring_enqueue_bulk(r, &burst, 1, NULL);
rte_ring_dequeue_bulk(r, &burst, 1);
}
const uint64_t sc_end = rte_rdtsc_precise();
for (sz = 0; sz < sizeof(bulk_sizes)/sizeof(bulk_sizes[0]); sz++) {
const uint64_t sc_start = rte_rdtsc();
for (i = 0; i < iterations; i++) {
- rte_ring_sp_enqueue_bulk(r, (void *)burst, bulk_sizes[sz]);
+ rte_ring_sp_enqueue_bulk(r, (void *)burst,
+ bulk_sizes[sz], NULL);
rte_ring_sc_dequeue_bulk(r, (void *)burst, bulk_sizes[sz]);
}
const uint64_t sc_end = rte_rdtsc();
rand = RTE_MAX(rte_rand() % RING_SIZE, 1UL);
printf("%s: iteration %u, random shift: %u;\n",
__func__, i, rand);
- TEST_RING_VERIFY(rte_ring_enqueue_bulk(r, src, rand) != 0);
+ TEST_RING_VERIFY(rte_ring_enqueue_bulk(r, src, rand,
+ NULL) != 0);
TEST_RING_VERIFY(rte_ring_dequeue_bulk(r, dst, rand) == rand);
/* fill the ring */
- TEST_RING_VERIFY(rte_ring_enqueue_bulk(r, src, rsz) != 0);
+ TEST_RING_VERIFY(rte_ring_enqueue_bulk(r, src, rsz, NULL) != 0);
TEST_RING_VERIFY(0 == rte_ring_free_count(r));
TEST_RING_VERIFY(rsz == rte_ring_count(r));
TEST_RING_VERIFY(rte_ring_full(r));
cur_dst = dst;
printf("enqueue 1 obj\n");
- ret = rte_ring_sp_enqueue_bulk(r, cur_src, 1);
+ ret = rte_ring_sp_enqueue_bulk(r, cur_src, 1, NULL);
cur_src += 1;
if (ret == 0)
goto fail;
printf("enqueue 2 objs\n");
- ret = rte_ring_sp_enqueue_bulk(r, cur_src, 2);
+ ret = rte_ring_sp_enqueue_bulk(r, cur_src, 2, NULL);
cur_src += 2;
if (ret == 0)
goto fail;
printf("enqueue MAX_BULK objs\n");
- ret = rte_ring_sp_enqueue_bulk(r, cur_src, MAX_BULK);
+ ret = rte_ring_sp_enqueue_bulk(r, cur_src, MAX_BULK, NULL);
cur_src += MAX_BULK;
if (ret == 0)
goto fail;
cur_dst = dst;
printf("enqueue 1 obj\n");
- ret = rte_ring_mp_enqueue_bulk(r, cur_src, 1);
+ ret = rte_ring_mp_enqueue_bulk(r, cur_src, 1, NULL);
cur_src += 1;
if (ret == 0)
goto fail;
printf("enqueue 2 objs\n");
- ret = rte_ring_mp_enqueue_bulk(r, cur_src, 2);
+ ret = rte_ring_mp_enqueue_bulk(r, cur_src, 2, NULL);
cur_src += 2;
if (ret == 0)
goto fail;
printf("enqueue MAX_BULK objs\n");
- ret = rte_ring_mp_enqueue_bulk(r, cur_src, MAX_BULK);
+ ret = rte_ring_mp_enqueue_bulk(r, cur_src, MAX_BULK, NULL);
cur_src += MAX_BULK;
if (ret == 0)
goto fail;
printf("fill and empty the ring\n");
for (i = 0; i<RING_SIZE/MAX_BULK; i++) {
printf("fill and empty the ring\n");
for (i = 0; i<RING_SIZE/MAX_BULK; i++) {
- ret = rte_ring_mp_enqueue_bulk(r, cur_src, MAX_BULK);
+ ret = rte_ring_mp_enqueue_bulk(r, cur_src, MAX_BULK, NULL);
cur_src += MAX_BULK;
if (ret == 0)
goto fail;
cur_src = src;
cur_dst = dst;
- ret = rte_ring_enqueue_bulk(r, cur_src, num_elems);
+ ret = rte_ring_enqueue_bulk(r, cur_src, num_elems, NULL);
cur_src += num_elems;
if (ret == 0) {
printf("Cannot enqueue\n");
goto fail;
}
- ret = rte_ring_enqueue_bulk(r, cur_src, num_elems);
+ ret = rte_ring_enqueue_bulk(r, cur_src, num_elems, NULL);
cur_src += num_elems;
if (ret == 0) {
printf("Cannot enqueue\n");
printf("Test SP & SC basic functions \n");
printf("enqueue 1 obj\n");
printf("Test SP & SC basic functions \n");
printf("enqueue 1 obj\n");
- ret = rte_ring_sp_enqueue_burst(r, cur_src, 1);
+ ret = rte_ring_sp_enqueue_burst(r, cur_src, 1, NULL);
cur_src += 1;
if ((ret & RTE_RING_SZ_MASK) != 1)
goto fail;
printf("enqueue 2 objs\n");
- ret = rte_ring_sp_enqueue_burst(r, cur_src, 2);
+ ret = rte_ring_sp_enqueue_burst(r, cur_src, 2, NULL);
cur_src += 2;
if ((ret & RTE_RING_SZ_MASK) != 2)
goto fail;
printf("enqueue MAX_BULK objs\n");
- ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK) ;
+ ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK, NULL);
cur_src += MAX_BULK;
if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
goto fail;
printf("Test enqueue without enough memory space \n");
for (i = 0; i< (RING_SIZE/MAX_BULK - 1); i++) {
printf("Test enqueue without enough memory space \n");
for (i = 0; i< (RING_SIZE/MAX_BULK - 1); i++) {
- ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK);
+ ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK, NULL);
cur_src += MAX_BULK;
if ((ret & RTE_RING_SZ_MASK) != MAX_BULK) {
goto fail;
}
printf("Enqueue 2 objects, free entries = MAX_BULK - 2 \n");
- ret = rte_ring_sp_enqueue_burst(r, cur_src, 2);
+ ret = rte_ring_sp_enqueue_burst(r, cur_src, 2, NULL);
cur_src += 2;
if ((ret & RTE_RING_SZ_MASK) != 2)
goto fail;
printf("Enqueue the remaining entries = MAX_BULK - 2 \n");
/* Always one free entry left */
- ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK);
+ ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK, NULL);
cur_src += MAX_BULK - 3;
if ((ret & RTE_RING_SZ_MASK) != MAX_BULK - 3)
goto fail;
goto fail;
printf("Test enqueue for a full entry \n");
- ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK);
+ ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK, NULL);
if ((ret & RTE_RING_SZ_MASK) != 0)
goto fail;
printf("Test MP & MC basic functions \n");
printf("enqueue 1 obj\n");
printf("Test MP & MC basic functions \n");
printf("enqueue 1 obj\n");
- ret = rte_ring_mp_enqueue_burst(r, cur_src, 1);
+ ret = rte_ring_mp_enqueue_burst(r, cur_src, 1, NULL);
cur_src += 1;
if ((ret & RTE_RING_SZ_MASK) != 1)
goto fail;
printf("enqueue 2 objs\n");
- ret = rte_ring_mp_enqueue_burst(r, cur_src, 2);
+ ret = rte_ring_mp_enqueue_burst(r, cur_src, 2, NULL);
cur_src += 2;
if ((ret & RTE_RING_SZ_MASK) != 2)
goto fail;
printf("enqueue MAX_BULK objs\n");
- ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK);
+ ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK, NULL);
cur_src += MAX_BULK;
if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
goto fail;
printf("fill and empty the ring\n");
for (i = 0; i<RING_SIZE/MAX_BULK; i++) {
printf("fill and empty the ring\n");
for (i = 0; i<RING_SIZE/MAX_BULK; i++) {
- ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK);
+ ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK, NULL);
cur_src += MAX_BULK;
if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
goto fail;
printf("Test enqueue without enough memory space \n");
for (i = 0; i<RING_SIZE/MAX_BULK - 1; i++) {
printf("Test enqueue without enough memory space \n");
for (i = 0; i<RING_SIZE/MAX_BULK - 1; i++) {
- ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK);
+ ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK, NULL);
cur_src += MAX_BULK;
if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
goto fail;
}
/* Available memory space for the exact MAX_BULK objects */
- ret = rte_ring_mp_enqueue_burst(r, cur_src, 2);
+ ret = rte_ring_mp_enqueue_burst(r, cur_src, 2, NULL);
cur_src += 2;
if ((ret & RTE_RING_SZ_MASK) != 2)
goto fail;
- ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK);
+ ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK, NULL);
cur_src += MAX_BULK - 3;
if ((ret & RTE_RING_SZ_MASK) != MAX_BULK - 3)
goto fail;
printf("Covering rte_ring_enqueue_burst functions \n");
printf("Covering rte_ring_enqueue_burst functions \n");
- ret = rte_ring_enqueue_burst(r, cur_src, 2);
+ ret = rte_ring_enqueue_burst(r, cur_src, 2, NULL);
cur_src += 2;
if ((ret & RTE_RING_SZ_MASK) != 2)
goto fail;
}
/* Covering the ring burst operation */
- ret = rte_ring_enqueue_burst(rp, obj, 2);
+ ret = rte_ring_enqueue_burst(rp, obj, 2, NULL);
if ((ret & RTE_RING_SZ_MASK) != 2) {
printf("test_ring_basic_ex: rte_ring_enqueue_burst fails \n");
goto fail_test;
const uint64_t sp_start = rte_rdtsc();
for (i = 0; i < iterations; i++)
- while (rte_ring_sp_enqueue_bulk(r, burst, size) == 0)
+ while (rte_ring_sp_enqueue_bulk(r, burst, size, NULL) == 0)
rte_pause();
const uint64_t sp_end = rte_rdtsc();
const uint64_t mp_start = rte_rdtsc();
for (i = 0; i < iterations; i++)
- while (rte_ring_mp_enqueue_bulk(r, burst, size) == 0)
+ while (rte_ring_mp_enqueue_bulk(r, burst, size, NULL) == 0)
rte_pause();
const uint64_t mp_end = rte_rdtsc();
for (sz = 0; sz < sizeof(bulk_sizes)/sizeof(bulk_sizes[0]); sz++) {
const uint64_t sc_start = rte_rdtsc();
for (i = 0; i < iterations; i++) {
- rte_ring_sp_enqueue_burst(r, burst, bulk_sizes[sz]);
+ rte_ring_sp_enqueue_burst(r, burst,
+ bulk_sizes[sz], NULL);
rte_ring_sc_dequeue_burst(r, burst, bulk_sizes[sz]);
}
const uint64_t sc_end = rte_rdtsc();
const uint64_t mc_start = rte_rdtsc();
for (i = 0; i < iterations; i++) {
- rte_ring_mp_enqueue_burst(r, burst, bulk_sizes[sz]);
+ rte_ring_mp_enqueue_burst(r, burst,
+ bulk_sizes[sz], NULL);
rte_ring_mc_dequeue_burst(r, burst, bulk_sizes[sz]);
}
const uint64_t mc_end = rte_rdtsc();
for (sz = 0; sz < sizeof(bulk_sizes)/sizeof(bulk_sizes[0]); sz++) {
const uint64_t sc_start = rte_rdtsc();
for (i = 0; i < iterations; i++) {
- rte_ring_sp_enqueue_bulk(r, burst, bulk_sizes[sz]);
+ rte_ring_sp_enqueue_bulk(r, burst,
+ bulk_sizes[sz], NULL);
rte_ring_sc_dequeue_bulk(r, burst, bulk_sizes[sz]);
}
const uint64_t sc_end = rte_rdtsc();
const uint64_t mc_start = rte_rdtsc();
for (i = 0; i < iterations; i++) {
- rte_ring_mp_enqueue_bulk(r, burst, bulk_sizes[sz]);
+ rte_ring_mp_enqueue_bulk(r, burst,
+ bulk_sizes[sz], NULL);
rte_ring_mc_dequeue_bulk(r, burst, bulk_sizes[sz]);
}
const uint64_t mc_end = rte_rdtsc();
mbuf[0] = (void *)rte_pktmbuf_alloc(pool);
expected_pkts = rte_ring_sp_enqueue_burst(port_ring_reader_params.ring,
- (void * const *) mbuf, 1);
+ (void * const *) mbuf, 1, NULL);
received_pkts = rte_port_ring_reader_ops.f_rx(port, res_mbuf, 1);
if (received_pkts < expected_pkts)
mbuf[i] = rte_pktmbuf_alloc(pool);
expected_pkts = rte_ring_sp_enqueue_burst(port_ring_reader_params.ring,
- (void * const *) mbuf, RTE_PORT_IN_BURST_SIZE_MAX);
+ (void * const *) mbuf, RTE_PORT_IN_BURST_SIZE_MAX, NULL);
received_pkts = rte_port_ring_reader_ops.f_rx(port, res_mbuf,
RTE_PORT_IN_BURST_SIZE_MAX);
nb_pkts = 0;
else
nb_pkts = rte_ring_enqueue_burst(dev_private->tx_queue, (void **)bufs,
- nb_pkts);
+ nb_pkts, NULL);
/* increment opacket count */
dev_private->eth_stats.opackets += nb_pkts;
vrtl_eth_dev->data->dev_private;
return rte_ring_enqueue_burst(dev_private->rx_queue, (void **)pkt_burst,
- burst_length);
+ burst_length, NULL);