ring: introduce RTS ring mode
[dpdk.git] / lib / librte_ring / rte_ring_elem.h
1 /* SPDX-License-Identifier: BSD-3-Clause
2  *
3  * Copyright (c) 2019 Arm Limited
4  * Copyright (c) 2010-2017 Intel Corporation
5  * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
6  * All rights reserved.
7  * Derived from FreeBSD's bufring.h
8  * Used as BSD-3 Licensed with permission from Kip Macy.
9  */
10
11 #ifndef _RTE_RING_ELEM_H_
12 #define _RTE_RING_ELEM_H_
13
14 /**
15  * @file
16  * RTE Ring with user defined element size
17  */
18
19 #ifdef __cplusplus
20 extern "C" {
21 #endif
22
23 #include <rte_ring_core.h>
24
25 /**
26  * @warning
27  * @b EXPERIMENTAL: this API may change without prior notice
28  *
29  * Calculate the memory size needed for a ring with given element size
30  *
31  * This function returns the number of bytes needed for a ring, given
32  * the number of elements in it and the size of the element. This value
33  * is the sum of the size of the structure rte_ring and the size of the
34  * memory needed for storing the elements. The value is aligned to a cache
35  * line size.
36  *
37  * @param esize
38  *   The size of ring element, in bytes. It must be a multiple of 4.
39  * @param count
40  *   The number of elements in the ring (must be a power of 2).
41  * @return
42  *   - The memory size needed for the ring on success.
43  *   - -EINVAL - esize is not a multiple of 4 or count provided is not a
44  *               power of 2.
45  */
46 __rte_experimental
47 ssize_t rte_ring_get_memsize_elem(unsigned int esize, unsigned int count);
48
49 /**
50  * @warning
51  * @b EXPERIMENTAL: this API may change without prior notice
52  *
53  * Create a new ring named *name* that stores elements with given size.
54  *
55  * This function uses ``memzone_reserve()`` to allocate memory. Then it
56  * calls rte_ring_init() to initialize an empty ring.
57  *
58  * The new ring size is set to *count*, which must be a power of
59  * two. Water marking is disabled by default. The real usable ring size
60  * is *count-1* instead of *count* to differentiate a free ring from an
61  * empty ring.
62  *
63  * The ring is added in RTE_TAILQ_RING list.
64  *
65  * @param name
66  *   The name of the ring.
67  * @param esize
68  *   The size of ring element, in bytes. It must be a multiple of 4.
69  * @param count
70  *   The number of elements in the ring (must be a power of 2).
71  * @param socket_id
72  *   The *socket_id* argument is the socket identifier in case of
73  *   NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
74  *   constraint for the reserved zone.
75  * @param flags
76  *   An OR of the following:
77  *   - One of mutually exclusive flags that define producer behavior:
78  *      - RING_F_SP_ENQ: If this flag is set, the default behavior when
79  *        using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
80  *        is "single-producer".
81  *      - RING_F_MP_RTS_ENQ: If this flag is set, the default behavior when
82  *        using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
83  *        is "multi-producer RTS mode".
84  *     If none of these flags is set, then default "multi-producer"
85  *     behavior is selected.
86  *   - One of mutually exclusive flags that define consumer behavior:
87  *      - RING_F_SC_DEQ: If this flag is set, the default behavior when
88  *        using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
89  *        is "single-consumer". Otherwise, it is "multi-consumers".
90  *      - RING_F_MC_RTS_DEQ: If this flag is set, the default behavior when
91  *        using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
92  *        is "multi-consumer RTS mode".
93  *     If none of these flags is set, then default "multi-consumer"
94  *     behavior is selected.
95  * @return
96  *   On success, the pointer to the new allocated ring. NULL on error with
97  *    rte_errno set appropriately. Possible errno values include:
98  *    - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure
99  *    - E_RTE_SECONDARY - function was called from a secondary process instance
100  *    - EINVAL - esize is not a multiple of 4 or count provided is not a
101  *               power of 2.
102  *    - ENOSPC - the maximum number of memzones has already been allocated
103  *    - EEXIST - a memzone with the same name already exists
104  *    - ENOMEM - no appropriate memory area found in which to create memzone
105  */
106 __rte_experimental
107 struct rte_ring *rte_ring_create_elem(const char *name, unsigned int esize,
108                         unsigned int count, int socket_id, unsigned int flags);
109
110 static __rte_always_inline void
111 __rte_ring_enqueue_elems_32(struct rte_ring *r, const uint32_t size,
112                 uint32_t idx, const void *obj_table, uint32_t n)
113 {
114         unsigned int i;
115         uint32_t *ring = (uint32_t *)&r[1];
116         const uint32_t *obj = (const uint32_t *)obj_table;
117         if (likely(idx + n < size)) {
118                 for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
119                         ring[idx] = obj[i];
120                         ring[idx + 1] = obj[i + 1];
121                         ring[idx + 2] = obj[i + 2];
122                         ring[idx + 3] = obj[i + 3];
123                         ring[idx + 4] = obj[i + 4];
124                         ring[idx + 5] = obj[i + 5];
125                         ring[idx + 6] = obj[i + 6];
126                         ring[idx + 7] = obj[i + 7];
127                 }
128                 switch (n & 0x7) {
129                 case 7:
130                         ring[idx++] = obj[i++]; /* fallthrough */
131                 case 6:
132                         ring[idx++] = obj[i++]; /* fallthrough */
133                 case 5:
134                         ring[idx++] = obj[i++]; /* fallthrough */
135                 case 4:
136                         ring[idx++] = obj[i++]; /* fallthrough */
137                 case 3:
138                         ring[idx++] = obj[i++]; /* fallthrough */
139                 case 2:
140                         ring[idx++] = obj[i++]; /* fallthrough */
141                 case 1:
142                         ring[idx++] = obj[i++]; /* fallthrough */
143                 }
144         } else {
145                 for (i = 0; idx < size; i++, idx++)
146                         ring[idx] = obj[i];
147                 /* Start at the beginning */
148                 for (idx = 0; i < n; i++, idx++)
149                         ring[idx] = obj[i];
150         }
151 }
152
153 static __rte_always_inline void
154 __rte_ring_enqueue_elems_64(struct rte_ring *r, uint32_t prod_head,
155                 const void *obj_table, uint32_t n)
156 {
157         unsigned int i;
158         const uint32_t size = r->size;
159         uint32_t idx = prod_head & r->mask;
160         uint64_t *ring = (uint64_t *)&r[1];
161         const unaligned_uint64_t *obj = (const unaligned_uint64_t *)obj_table;
162         if (likely(idx + n < size)) {
163                 for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
164                         ring[idx] = obj[i];
165                         ring[idx + 1] = obj[i + 1];
166                         ring[idx + 2] = obj[i + 2];
167                         ring[idx + 3] = obj[i + 3];
168                 }
169                 switch (n & 0x3) {
170                 case 3:
171                         ring[idx++] = obj[i++]; /* fallthrough */
172                 case 2:
173                         ring[idx++] = obj[i++]; /* fallthrough */
174                 case 1:
175                         ring[idx++] = obj[i++];
176                 }
177         } else {
178                 for (i = 0; idx < size; i++, idx++)
179                         ring[idx] = obj[i];
180                 /* Start at the beginning */
181                 for (idx = 0; i < n; i++, idx++)
182                         ring[idx] = obj[i];
183         }
184 }
185
186 static __rte_always_inline void
187 __rte_ring_enqueue_elems_128(struct rte_ring *r, uint32_t prod_head,
188                 const void *obj_table, uint32_t n)
189 {
190         unsigned int i;
191         const uint32_t size = r->size;
192         uint32_t idx = prod_head & r->mask;
193         rte_int128_t *ring = (rte_int128_t *)&r[1];
194         const rte_int128_t *obj = (const rte_int128_t *)obj_table;
195         if (likely(idx + n < size)) {
196                 for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
197                         memcpy((void *)(ring + idx),
198                                 (const void *)(obj + i), 32);
199                 switch (n & 0x1) {
200                 case 1:
201                         memcpy((void *)(ring + idx),
202                                 (const void *)(obj + i), 16);
203                 }
204         } else {
205                 for (i = 0; idx < size; i++, idx++)
206                         memcpy((void *)(ring + idx),
207                                 (const void *)(obj + i), 16);
208                 /* Start at the beginning */
209                 for (idx = 0; i < n; i++, idx++)
210                         memcpy((void *)(ring + idx),
211                                 (const void *)(obj + i), 16);
212         }
213 }
214
215 /* the actual enqueue of elements on the ring.
216  * Placed here since identical code needed in both
217  * single and multi producer enqueue functions.
218  */
219 static __rte_always_inline void
220 __rte_ring_enqueue_elems(struct rte_ring *r, uint32_t prod_head,
221                 const void *obj_table, uint32_t esize, uint32_t num)
222 {
223         /* 8B and 16B copies implemented individually to retain
224          * the current performance.
225          */
226         if (esize == 8)
227                 __rte_ring_enqueue_elems_64(r, prod_head, obj_table, num);
228         else if (esize == 16)
229                 __rte_ring_enqueue_elems_128(r, prod_head, obj_table, num);
230         else {
231                 uint32_t idx, scale, nr_idx, nr_num, nr_size;
232
233                 /* Normalize to uint32_t */
234                 scale = esize / sizeof(uint32_t);
235                 nr_num = num * scale;
236                 idx = prod_head & r->mask;
237                 nr_idx = idx * scale;
238                 nr_size = r->size * scale;
239                 __rte_ring_enqueue_elems_32(r, nr_size, nr_idx,
240                                 obj_table, nr_num);
241         }
242 }
243
244 static __rte_always_inline void
245 __rte_ring_dequeue_elems_32(struct rte_ring *r, const uint32_t size,
246                 uint32_t idx, void *obj_table, uint32_t n)
247 {
248         unsigned int i;
249         uint32_t *ring = (uint32_t *)&r[1];
250         uint32_t *obj = (uint32_t *)obj_table;
251         if (likely(idx + n < size)) {
252                 for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
253                         obj[i] = ring[idx];
254                         obj[i + 1] = ring[idx + 1];
255                         obj[i + 2] = ring[idx + 2];
256                         obj[i + 3] = ring[idx + 3];
257                         obj[i + 4] = ring[idx + 4];
258                         obj[i + 5] = ring[idx + 5];
259                         obj[i + 6] = ring[idx + 6];
260                         obj[i + 7] = ring[idx + 7];
261                 }
262                 switch (n & 0x7) {
263                 case 7:
264                         obj[i++] = ring[idx++]; /* fallthrough */
265                 case 6:
266                         obj[i++] = ring[idx++]; /* fallthrough */
267                 case 5:
268                         obj[i++] = ring[idx++]; /* fallthrough */
269                 case 4:
270                         obj[i++] = ring[idx++]; /* fallthrough */
271                 case 3:
272                         obj[i++] = ring[idx++]; /* fallthrough */
273                 case 2:
274                         obj[i++] = ring[idx++]; /* fallthrough */
275                 case 1:
276                         obj[i++] = ring[idx++]; /* fallthrough */
277                 }
278         } else {
279                 for (i = 0; idx < size; i++, idx++)
280                         obj[i] = ring[idx];
281                 /* Start at the beginning */
282                 for (idx = 0; i < n; i++, idx++)
283                         obj[i] = ring[idx];
284         }
285 }
286
287 static __rte_always_inline void
288 __rte_ring_dequeue_elems_64(struct rte_ring *r, uint32_t prod_head,
289                 void *obj_table, uint32_t n)
290 {
291         unsigned int i;
292         const uint32_t size = r->size;
293         uint32_t idx = prod_head & r->mask;
294         uint64_t *ring = (uint64_t *)&r[1];
295         unaligned_uint64_t *obj = (unaligned_uint64_t *)obj_table;
296         if (likely(idx + n < size)) {
297                 for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
298                         obj[i] = ring[idx];
299                         obj[i + 1] = ring[idx + 1];
300                         obj[i + 2] = ring[idx + 2];
301                         obj[i + 3] = ring[idx + 3];
302                 }
303                 switch (n & 0x3) {
304                 case 3:
305                         obj[i++] = ring[idx++]; /* fallthrough */
306                 case 2:
307                         obj[i++] = ring[idx++]; /* fallthrough */
308                 case 1:
309                         obj[i++] = ring[idx++]; /* fallthrough */
310                 }
311         } else {
312                 for (i = 0; idx < size; i++, idx++)
313                         obj[i] = ring[idx];
314                 /* Start at the beginning */
315                 for (idx = 0; i < n; i++, idx++)
316                         obj[i] = ring[idx];
317         }
318 }
319
320 static __rte_always_inline void
321 __rte_ring_dequeue_elems_128(struct rte_ring *r, uint32_t prod_head,
322                 void *obj_table, uint32_t n)
323 {
324         unsigned int i;
325         const uint32_t size = r->size;
326         uint32_t idx = prod_head & r->mask;
327         rte_int128_t *ring = (rte_int128_t *)&r[1];
328         rte_int128_t *obj = (rte_int128_t *)obj_table;
329         if (likely(idx + n < size)) {
330                 for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
331                         memcpy((void *)(obj + i), (void *)(ring + idx), 32);
332                 switch (n & 0x1) {
333                 case 1:
334                         memcpy((void *)(obj + i), (void *)(ring + idx), 16);
335                 }
336         } else {
337                 for (i = 0; idx < size; i++, idx++)
338                         memcpy((void *)(obj + i), (void *)(ring + idx), 16);
339                 /* Start at the beginning */
340                 for (idx = 0; i < n; i++, idx++)
341                         memcpy((void *)(obj + i), (void *)(ring + idx), 16);
342         }
343 }
344
345 /* the actual dequeue of elements from the ring.
346  * Placed here since identical code needed in both
347  * single and multi producer enqueue functions.
348  */
349 static __rte_always_inline void
350 __rte_ring_dequeue_elems(struct rte_ring *r, uint32_t cons_head,
351                 void *obj_table, uint32_t esize, uint32_t num)
352 {
353         /* 8B and 16B copies implemented individually to retain
354          * the current performance.
355          */
356         if (esize == 8)
357                 __rte_ring_dequeue_elems_64(r, cons_head, obj_table, num);
358         else if (esize == 16)
359                 __rte_ring_dequeue_elems_128(r, cons_head, obj_table, num);
360         else {
361                 uint32_t idx, scale, nr_idx, nr_num, nr_size;
362
363                 /* Normalize to uint32_t */
364                 scale = esize / sizeof(uint32_t);
365                 nr_num = num * scale;
366                 idx = cons_head & r->mask;
367                 nr_idx = idx * scale;
368                 nr_size = r->size * scale;
369                 __rte_ring_dequeue_elems_32(r, nr_size, nr_idx,
370                                 obj_table, nr_num);
371         }
372 }
373
374 /* Between load and load. there might be cpu reorder in weak model
375  * (powerpc/arm).
376  * There are 2 choices for the users
377  * 1.use rmb() memory barrier
378  * 2.use one-direction load_acquire/store_release barrier,defined by
379  * CONFIG_RTE_USE_C11_MEM_MODEL=y
380  * It depends on performance test results.
381  * By default, move common functions to rte_ring_generic.h
382  */
383 #ifdef RTE_USE_C11_MEM_MODEL
384 #include "rte_ring_c11_mem.h"
385 #else
386 #include "rte_ring_generic.h"
387 #endif
388
389 /**
390  * @internal Enqueue several objects on the ring
391  *
392  * @param r
393  *   A pointer to the ring structure.
394  * @param obj_table
395  *   A pointer to a table of objects.
396  * @param esize
397  *   The size of ring element, in bytes. It must be a multiple of 4.
398  *   This must be the same value used while creating the ring. Otherwise
399  *   the results are undefined.
400  * @param n
401  *   The number of objects to add in the ring from the obj_table.
402  * @param behavior
403  *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
404  *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
405  * @param is_sp
406  *   Indicates whether to use single producer or multi-producer head update
407  * @param free_space
408  *   returns the amount of space after the enqueue operation has finished
409  * @return
410  *   Actual number of objects enqueued.
411  *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
412  */
413 static __rte_always_inline unsigned int
414 __rte_ring_do_enqueue_elem(struct rte_ring *r, const void *obj_table,
415                 unsigned int esize, unsigned int n,
416                 enum rte_ring_queue_behavior behavior, unsigned int is_sp,
417                 unsigned int *free_space)
418 {
419         uint32_t prod_head, prod_next;
420         uint32_t free_entries;
421
422         n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
423                         &prod_head, &prod_next, &free_entries);
424         if (n == 0)
425                 goto end;
426
427         __rte_ring_enqueue_elems(r, prod_head, obj_table, esize, n);
428
429         update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
430 end:
431         if (free_space != NULL)
432                 *free_space = free_entries - n;
433         return n;
434 }
435
436 /**
437  * @internal Dequeue several objects from the ring
438  *
439  * @param r
440  *   A pointer to the ring structure.
441  * @param obj_table
442  *   A pointer to a table of objects.
443  * @param esize
444  *   The size of ring element, in bytes. It must be a multiple of 4.
445  *   This must be the same value used while creating the ring. Otherwise
446  *   the results are undefined.
447  * @param n
448  *   The number of objects to pull from the ring.
449  * @param behavior
450  *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
451  *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
452  * @param is_sc
453  *   Indicates whether to use single consumer or multi-consumer head update
454  * @param available
455  *   returns the number of remaining ring entries after the dequeue has finished
456  * @return
457  *   - Actual number of objects dequeued.
458  *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
459  */
460 static __rte_always_inline unsigned int
461 __rte_ring_do_dequeue_elem(struct rte_ring *r, void *obj_table,
462                 unsigned int esize, unsigned int n,
463                 enum rte_ring_queue_behavior behavior, unsigned int is_sc,
464                 unsigned int *available)
465 {
466         uint32_t cons_head, cons_next;
467         uint32_t entries;
468
469         n = __rte_ring_move_cons_head(r, (int)is_sc, n, behavior,
470                         &cons_head, &cons_next, &entries);
471         if (n == 0)
472                 goto end;
473
474         __rte_ring_dequeue_elems(r, cons_head, obj_table, esize, n);
475
476         update_tail(&r->cons, cons_head, cons_next, is_sc, 0);
477
478 end:
479         if (available != NULL)
480                 *available = entries - n;
481         return n;
482 }
483
484 /**
485  * Enqueue several objects on the ring (multi-producers safe).
486  *
487  * This function uses a "compare and set" instruction to move the
488  * producer index atomically.
489  *
490  * @param r
491  *   A pointer to the ring structure.
492  * @param obj_table
493  *   A pointer to a table of objects.
494  * @param esize
495  *   The size of ring element, in bytes. It must be a multiple of 4.
496  *   This must be the same value used while creating the ring. Otherwise
497  *   the results are undefined.
498  * @param n
499  *   The number of objects to add in the ring from the obj_table.
500  * @param free_space
501  *   if non-NULL, returns the amount of space in the ring after the
502  *   enqueue operation has finished.
503  * @return
504  *   The number of objects enqueued, either 0 or n
505  */
506 static __rte_always_inline unsigned int
507 rte_ring_mp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
508                 unsigned int esize, unsigned int n, unsigned int *free_space)
509 {
510         return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
511                         RTE_RING_QUEUE_FIXED, RTE_RING_SYNC_MT, free_space);
512 }
513
514 /**
515  * Enqueue several objects on a ring
516  *
517  * @warning This API is NOT multi-producers safe
518  *
519  * @param r
520  *   A pointer to the ring structure.
521  * @param obj_table
522  *   A pointer to a table of objects.
523  * @param esize
524  *   The size of ring element, in bytes. It must be a multiple of 4.
525  *   This must be the same value used while creating the ring. Otherwise
526  *   the results are undefined.
527  * @param n
528  *   The number of objects to add in the ring from the obj_table.
529  * @param free_space
530  *   if non-NULL, returns the amount of space in the ring after the
531  *   enqueue operation has finished.
532  * @return
533  *   The number of objects enqueued, either 0 or n
534  */
535 static __rte_always_inline unsigned int
536 rte_ring_sp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
537                 unsigned int esize, unsigned int n, unsigned int *free_space)
538 {
539         return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
540                         RTE_RING_QUEUE_FIXED, RTE_RING_SYNC_ST, free_space);
541 }
542
543 #ifdef ALLOW_EXPERIMENTAL_API
544 #include <rte_ring_rts.h>
545 #endif
546
547 /**
548  * Enqueue several objects on a ring.
549  *
550  * This function calls the multi-producer or the single-producer
551  * version depending on the default behavior that was specified at
552  * ring creation time (see flags).
553  *
554  * @param r
555  *   A pointer to the ring structure.
556  * @param obj_table
557  *   A pointer to a table of objects.
558  * @param esize
559  *   The size of ring element, in bytes. It must be a multiple of 4.
560  *   This must be the same value used while creating the ring. Otherwise
561  *   the results are undefined.
562  * @param n
563  *   The number of objects to add in the ring from the obj_table.
564  * @param free_space
565  *   if non-NULL, returns the amount of space in the ring after the
566  *   enqueue operation has finished.
567  * @return
568  *   The number of objects enqueued, either 0 or n
569  */
570 static __rte_always_inline unsigned int
571 rte_ring_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
572                 unsigned int esize, unsigned int n, unsigned int *free_space)
573 {
574         return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
575                         RTE_RING_QUEUE_FIXED, r->prod.sync_type, free_space);
576
577         switch (r->prod.sync_type) {
578         case RTE_RING_SYNC_MT:
579                 return rte_ring_mp_enqueue_bulk_elem(r, obj_table, esize, n,
580                         free_space);
581         case RTE_RING_SYNC_ST:
582                 return rte_ring_sp_enqueue_bulk_elem(r, obj_table, esize, n,
583                         free_space);
584 #ifdef ALLOW_EXPERIMENTAL_API
585         case RTE_RING_SYNC_MT_RTS:
586                 return rte_ring_mp_rts_enqueue_bulk_elem(r, obj_table, esize, n,
587                         free_space);
588 #endif
589         }
590
591         /* valid ring should never reach this point */
592         RTE_ASSERT(0);
593         if (free_space != NULL)
594                 *free_space = 0;
595         return 0;
596 }
597
598 /**
599  * Enqueue one object on a ring (multi-producers safe).
600  *
601  * This function uses a "compare and set" instruction to move the
602  * producer index atomically.
603  *
604  * @param r
605  *   A pointer to the ring structure.
606  * @param obj
607  *   A pointer to the object to be added.
608  * @param esize
609  *   The size of ring element, in bytes. It must be a multiple of 4.
610  *   This must be the same value used while creating the ring. Otherwise
611  *   the results are undefined.
612  * @return
613  *   - 0: Success; objects enqueued.
614  *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
615  */
616 static __rte_always_inline int
617 rte_ring_mp_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize)
618 {
619         return rte_ring_mp_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 :
620                                                                 -ENOBUFS;
621 }
622
623 /**
624  * Enqueue one object on a ring
625  *
626  * @warning This API is NOT multi-producers safe
627  *
628  * @param r
629  *   A pointer to the ring structure.
630  * @param obj
631  *   A pointer to the object to be added.
632  * @param esize
633  *   The size of ring element, in bytes. It must be a multiple of 4.
634  *   This must be the same value used while creating the ring. Otherwise
635  *   the results are undefined.
636  * @return
637  *   - 0: Success; objects enqueued.
638  *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
639  */
640 static __rte_always_inline int
641 rte_ring_sp_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize)
642 {
643         return rte_ring_sp_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 :
644                                                                 -ENOBUFS;
645 }
646
647 /**
648  * Enqueue one object on a ring.
649  *
650  * This function calls the multi-producer or the single-producer
651  * version, depending on the default behaviour that was specified at
652  * ring creation time (see flags).
653  *
654  * @param r
655  *   A pointer to the ring structure.
656  * @param obj
657  *   A pointer to the object to be added.
658  * @param esize
659  *   The size of ring element, in bytes. It must be a multiple of 4.
660  *   This must be the same value used while creating the ring. Otherwise
661  *   the results are undefined.
662  * @return
663  *   - 0: Success; objects enqueued.
664  *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
665  */
666 static __rte_always_inline int
667 rte_ring_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize)
668 {
669         return rte_ring_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 :
670                                                                 -ENOBUFS;
671 }
672
673 /**
674  * Dequeue several objects from a ring (multi-consumers safe).
675  *
676  * This function uses a "compare and set" instruction to move the
677  * consumer index atomically.
678  *
679  * @param r
680  *   A pointer to the ring structure.
681  * @param obj_table
682  *   A pointer to a table of objects that will be filled.
683  * @param esize
684  *   The size of ring element, in bytes. It must be a multiple of 4.
685  *   This must be the same value used while creating the ring. Otherwise
686  *   the results are undefined.
687  * @param n
688  *   The number of objects to dequeue from the ring to the obj_table.
689  * @param available
690  *   If non-NULL, returns the number of remaining ring entries after the
691  *   dequeue has finished.
692  * @return
693  *   The number of objects dequeued, either 0 or n
694  */
695 static __rte_always_inline unsigned int
696 rte_ring_mc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
697                 unsigned int esize, unsigned int n, unsigned int *available)
698 {
699         return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
700                         RTE_RING_QUEUE_FIXED, RTE_RING_SYNC_MT, available);
701 }
702
703 /**
704  * Dequeue several objects from a ring (NOT multi-consumers safe).
705  *
706  * @param r
707  *   A pointer to the ring structure.
708  * @param obj_table
709  *   A pointer to a table of objects that will be filled.
710  * @param esize
711  *   The size of ring element, in bytes. It must be a multiple of 4.
712  *   This must be the same value used while creating the ring. Otherwise
713  *   the results are undefined.
714  * @param n
715  *   The number of objects to dequeue from the ring to the obj_table,
716  *   must be strictly positive.
717  * @param available
718  *   If non-NULL, returns the number of remaining ring entries after the
719  *   dequeue has finished.
720  * @return
721  *   The number of objects dequeued, either 0 or n
722  */
723 static __rte_always_inline unsigned int
724 rte_ring_sc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
725                 unsigned int esize, unsigned int n, unsigned int *available)
726 {
727         return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
728                         RTE_RING_QUEUE_FIXED, RTE_RING_SYNC_ST, available);
729 }
730
731 /**
732  * Dequeue several objects from a ring.
733  *
734  * This function calls the multi-consumers or the single-consumer
735  * version, depending on the default behaviour that was specified at
736  * ring creation time (see flags).
737  *
738  * @param r
739  *   A pointer to the ring structure.
740  * @param obj_table
741  *   A pointer to a table of objects that will be filled.
742  * @param esize
743  *   The size of ring element, in bytes. It must be a multiple of 4.
744  *   This must be the same value used while creating the ring. Otherwise
745  *   the results are undefined.
746  * @param n
747  *   The number of objects to dequeue from the ring to the obj_table.
748  * @param available
749  *   If non-NULL, returns the number of remaining ring entries after the
750  *   dequeue has finished.
751  * @return
752  *   The number of objects dequeued, either 0 or n
753  */
754 static __rte_always_inline unsigned int
755 rte_ring_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
756                 unsigned int esize, unsigned int n, unsigned int *available)
757 {
758         switch (r->cons.sync_type) {
759         case RTE_RING_SYNC_MT:
760                 return rte_ring_mc_dequeue_bulk_elem(r, obj_table, esize, n,
761                         available);
762         case RTE_RING_SYNC_ST:
763                 return rte_ring_sc_dequeue_bulk_elem(r, obj_table, esize, n,
764                         available);
765 #ifdef ALLOW_EXPERIMENTAL_API
766         case RTE_RING_SYNC_MT_RTS:
767                 return rte_ring_mc_rts_dequeue_bulk_elem(r, obj_table, esize,
768                         n, available);
769 #endif
770         }
771
772         /* valid ring should never reach this point */
773         RTE_ASSERT(0);
774         if (available != NULL)
775                 *available = 0;
776         return 0;
777 }
778
779 /**
780  * Dequeue one object from a ring (multi-consumers safe).
781  *
782  * This function uses a "compare and set" instruction to move the
783  * consumer index atomically.
784  *
785  * @param r
786  *   A pointer to the ring structure.
787  * @param obj_p
788  *   A pointer to the object that will be filled.
789  * @param esize
790  *   The size of ring element, in bytes. It must be a multiple of 4.
791  *   This must be the same value used while creating the ring. Otherwise
792  *   the results are undefined.
793  * @return
794  *   - 0: Success; objects dequeued.
795  *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
796  *     dequeued.
797  */
798 static __rte_always_inline int
799 rte_ring_mc_dequeue_elem(struct rte_ring *r, void *obj_p,
800                                 unsigned int esize)
801 {
802         return rte_ring_mc_dequeue_bulk_elem(r, obj_p, esize, 1, NULL)  ? 0 :
803                                                                 -ENOENT;
804 }
805
806 /**
807  * Dequeue one object from a ring (NOT multi-consumers safe).
808  *
809  * @param r
810  *   A pointer to the ring structure.
811  * @param obj_p
812  *   A pointer to the object that will be filled.
813  * @param esize
814  *   The size of ring element, in bytes. It must be a multiple of 4.
815  *   This must be the same value used while creating the ring. Otherwise
816  *   the results are undefined.
817  * @return
818  *   - 0: Success; objects dequeued.
819  *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
820  *     dequeued.
821  */
822 static __rte_always_inline int
823 rte_ring_sc_dequeue_elem(struct rte_ring *r, void *obj_p,
824                                 unsigned int esize)
825 {
826         return rte_ring_sc_dequeue_bulk_elem(r, obj_p, esize, 1, NULL) ? 0 :
827                                                                 -ENOENT;
828 }
829
830 /**
831  * Dequeue one object from a ring.
832  *
833  * This function calls the multi-consumers or the single-consumer
834  * version depending on the default behaviour that was specified at
835  * ring creation time (see flags).
836  *
837  * @param r
838  *   A pointer to the ring structure.
839  * @param obj_p
840  *   A pointer to the object that will be filled.
841  * @param esize
842  *   The size of ring element, in bytes. It must be a multiple of 4.
843  *   This must be the same value used while creating the ring. Otherwise
844  *   the results are undefined.
845  * @return
846  *   - 0: Success, objects dequeued.
847  *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
848  *     dequeued.
849  */
850 static __rte_always_inline int
851 rte_ring_dequeue_elem(struct rte_ring *r, void *obj_p, unsigned int esize)
852 {
853         return rte_ring_dequeue_bulk_elem(r, obj_p, esize, 1, NULL) ? 0 :
854                                                                 -ENOENT;
855 }
856
857 /**
858  * Enqueue several objects on the ring (multi-producers safe).
859  *
860  * This function uses a "compare and set" instruction to move the
861  * producer index atomically.
862  *
863  * @param r
864  *   A pointer to the ring structure.
865  * @param obj_table
866  *   A pointer to a table of objects.
867  * @param esize
868  *   The size of ring element, in bytes. It must be a multiple of 4.
869  *   This must be the same value used while creating the ring. Otherwise
870  *   the results are undefined.
871  * @param n
872  *   The number of objects to add in the ring from the obj_table.
873  * @param free_space
874  *   if non-NULL, returns the amount of space in the ring after the
875  *   enqueue operation has finished.
876  * @return
877  *   - n: Actual number of objects enqueued.
878  */
879 static __rte_always_inline unsigned
880 rte_ring_mp_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
881                 unsigned int esize, unsigned int n, unsigned int *free_space)
882 {
883         return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
884                         RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_MT, free_space);
885 }
886
887 /**
888  * Enqueue several objects on a ring
889  *
890  * @warning This API is NOT multi-producers safe
891  *
892  * @param r
893  *   A pointer to the ring structure.
894  * @param obj_table
895  *   A pointer to a table of objects.
896  * @param esize
897  *   The size of ring element, in bytes. It must be a multiple of 4.
898  *   This must be the same value used while creating the ring. Otherwise
899  *   the results are undefined.
900  * @param n
901  *   The number of objects to add in the ring from the obj_table.
902  * @param free_space
903  *   if non-NULL, returns the amount of space in the ring after the
904  *   enqueue operation has finished.
905  * @return
906  *   - n: Actual number of objects enqueued.
907  */
908 static __rte_always_inline unsigned
909 rte_ring_sp_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
910                 unsigned int esize, unsigned int n, unsigned int *free_space)
911 {
912         return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
913                         RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_ST, free_space);
914 }
915
916 /**
917  * Enqueue several objects on a ring.
918  *
919  * This function calls the multi-producer or the single-producer
920  * version depending on the default behavior that was specified at
921  * ring creation time (see flags).
922  *
923  * @param r
924  *   A pointer to the ring structure.
925  * @param obj_table
926  *   A pointer to a table of objects.
927  * @param esize
928  *   The size of ring element, in bytes. It must be a multiple of 4.
929  *   This must be the same value used while creating the ring. Otherwise
930  *   the results are undefined.
931  * @param n
932  *   The number of objects to add in the ring from the obj_table.
933  * @param free_space
934  *   if non-NULL, returns the amount of space in the ring after the
935  *   enqueue operation has finished.
936  * @return
937  *   - n: Actual number of objects enqueued.
938  */
939 static __rte_always_inline unsigned
940 rte_ring_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
941                 unsigned int esize, unsigned int n, unsigned int *free_space)
942 {
943         switch (r->prod.sync_type) {
944         case RTE_RING_SYNC_MT:
945                 return rte_ring_mp_enqueue_burst_elem(r, obj_table, esize, n,
946                         free_space);
947         case RTE_RING_SYNC_ST:
948                 return rte_ring_sp_enqueue_burst_elem(r, obj_table, esize, n,
949                         free_space);
950 #ifdef ALLOW_EXPERIMENTAL_API
951         case RTE_RING_SYNC_MT_RTS:
952                 return rte_ring_mp_rts_enqueue_burst_elem(r, obj_table, esize,
953                         n, free_space);
954 #endif
955         }
956
957         /* valid ring should never reach this point */
958         RTE_ASSERT(0);
959         if (free_space != NULL)
960                 *free_space = 0;
961         return 0;
962 }
963
964 /**
965  * Dequeue several objects from a ring (multi-consumers safe). When the request
966  * objects are more than the available objects, only dequeue the actual number
967  * of objects
968  *
969  * This function uses a "compare and set" instruction to move the
970  * consumer index atomically.
971  *
972  * @param r
973  *   A pointer to the ring structure.
974  * @param obj_table
975  *   A pointer to a table of objects that will be filled.
976  * @param esize
977  *   The size of ring element, in bytes. It must be a multiple of 4.
978  *   This must be the same value used while creating the ring. Otherwise
979  *   the results are undefined.
980  * @param n
981  *   The number of objects to dequeue from the ring to the obj_table.
982  * @param available
983  *   If non-NULL, returns the number of remaining ring entries after the
984  *   dequeue has finished.
985  * @return
986  *   - n: Actual number of objects dequeued, 0 if ring is empty
987  */
988 static __rte_always_inline unsigned
989 rte_ring_mc_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
990                 unsigned int esize, unsigned int n, unsigned int *available)
991 {
992         return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
993                         RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_MT, available);
994 }
995
996 /**
997  * Dequeue several objects from a ring (NOT multi-consumers safe).When the
998  * request objects are more than the available objects, only dequeue the
999  * actual number of objects
1000  *
1001  * @param r
1002  *   A pointer to the ring structure.
1003  * @param obj_table
1004  *   A pointer to a table of objects that will be filled.
1005  * @param esize
1006  *   The size of ring element, in bytes. It must be a multiple of 4.
1007  *   This must be the same value used while creating the ring. Otherwise
1008  *   the results are undefined.
1009  * @param n
1010  *   The number of objects to dequeue from the ring to the obj_table.
1011  * @param available
1012  *   If non-NULL, returns the number of remaining ring entries after the
1013  *   dequeue has finished.
1014  * @return
1015  *   - n: Actual number of objects dequeued, 0 if ring is empty
1016  */
1017 static __rte_always_inline unsigned
1018 rte_ring_sc_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
1019                 unsigned int esize, unsigned int n, unsigned int *available)
1020 {
1021         return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
1022                         RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_ST, available);
1023 }
1024
1025 /**
1026  * Dequeue multiple objects from a ring up to a maximum number.
1027  *
1028  * This function calls the multi-consumers or the single-consumer
1029  * version, depending on the default behaviour that was specified at
1030  * ring creation time (see flags).
1031  *
1032  * @param r
1033  *   A pointer to the ring structure.
1034  * @param obj_table
1035  *   A pointer to a table of objects that will be filled.
1036  * @param esize
1037  *   The size of ring element, in bytes. It must be a multiple of 4.
1038  *   This must be the same value used while creating the ring. Otherwise
1039  *   the results are undefined.
1040  * @param n
1041  *   The number of objects to dequeue from the ring to the obj_table.
1042  * @param available
1043  *   If non-NULL, returns the number of remaining ring entries after the
1044  *   dequeue has finished.
1045  * @return
1046  *   - Number of objects dequeued
1047  */
1048 static __rte_always_inline unsigned int
1049 rte_ring_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
1050                 unsigned int esize, unsigned int n, unsigned int *available)
1051 {
1052         switch (r->cons.sync_type) {
1053         case RTE_RING_SYNC_MT:
1054                 return rte_ring_mc_dequeue_burst_elem(r, obj_table, esize, n,
1055                         available);
1056         case RTE_RING_SYNC_ST:
1057                 return rte_ring_sc_dequeue_burst_elem(r, obj_table, esize, n,
1058                         available);
1059 #ifdef ALLOW_EXPERIMENTAL_API
1060         case RTE_RING_SYNC_MT_RTS:
1061                 return rte_ring_mc_rts_dequeue_burst_elem(r, obj_table, esize,
1062                         n, available);
1063 #endif
1064         }
1065
1066         /* valid ring should never reach this point */
1067         RTE_ASSERT(0);
1068         if (available != NULL)
1069                 *available = 0;
1070         return 0;
1071 }
1072
1073 #include <rte_ring.h>
1074
1075 #ifdef __cplusplus
1076 }
1077 #endif
1078
1079 #endif /* _RTE_RING_ELEM_H_ */