3976757edc0b3d07047bc64a5daaf82cbf0388b4
[dpdk.git] / lib / librte_ring / rte_ring_elem.h
1 /* SPDX-License-Identifier: BSD-3-Clause
2  *
3  * Copyright (c) 2019 Arm Limited
4  * Copyright (c) 2010-2017 Intel Corporation
5  * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
6  * All rights reserved.
7  * Derived from FreeBSD's bufring.h
8  * Used as BSD-3 Licensed with permission from Kip Macy.
9  */
10
11 #ifndef _RTE_RING_ELEM_H_
12 #define _RTE_RING_ELEM_H_
13
14 /**
15  * @file
16  * RTE Ring with user defined element size
17  */
18
19 #ifdef __cplusplus
20 extern "C" {
21 #endif
22
23 #include <stdio.h>
24 #include <stdint.h>
25 #include <string.h>
26 #include <sys/queue.h>
27 #include <errno.h>
28 #include <rte_common.h>
29 #include <rte_config.h>
30 #include <rte_memory.h>
31 #include <rte_lcore.h>
32 #include <rte_atomic.h>
33 #include <rte_branch_prediction.h>
34 #include <rte_memzone.h>
35 #include <rte_pause.h>
36
37 #include "rte_ring.h"
38
39 /**
40  * @warning
41  * @b EXPERIMENTAL: this API may change without prior notice
42  *
43  * Calculate the memory size needed for a ring with given element size
44  *
45  * This function returns the number of bytes needed for a ring, given
46  * the number of elements in it and the size of the element. This value
47  * is the sum of the size of the structure rte_ring and the size of the
48  * memory needed for storing the elements. The value is aligned to a cache
49  * line size.
50  *
51  * @param esize
52  *   The size of ring element, in bytes. It must be a multiple of 4.
53  * @param count
54  *   The number of elements in the ring (must be a power of 2).
55  * @return
56  *   - The memory size needed for the ring on success.
57  *   - -EINVAL - esize is not a multiple of 4 or count provided is not a
58  *               power of 2.
59  */
60 __rte_experimental
61 ssize_t rte_ring_get_memsize_elem(unsigned int esize, unsigned int count);
62
63 /**
64  * @warning
65  * @b EXPERIMENTAL: this API may change without prior notice
66  *
67  * Create a new ring named *name* that stores elements with given size.
68  *
69  * This function uses ``memzone_reserve()`` to allocate memory. Then it
70  * calls rte_ring_init() to initialize an empty ring.
71  *
72  * The new ring size is set to *count*, which must be a power of
73  * two. Water marking is disabled by default. The real usable ring size
74  * is *count-1* instead of *count* to differentiate a free ring from an
75  * empty ring.
76  *
77  * The ring is added in RTE_TAILQ_RING list.
78  *
79  * @param name
80  *   The name of the ring.
81  * @param esize
82  *   The size of ring element, in bytes. It must be a multiple of 4.
83  * @param count
84  *   The number of elements in the ring (must be a power of 2).
85  * @param socket_id
86  *   The *socket_id* argument is the socket identifier in case of
87  *   NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
88  *   constraint for the reserved zone.
89  * @param flags
90  *   An OR of the following:
91  *    - RING_F_SP_ENQ: If this flag is set, the default behavior when
92  *      using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
93  *      is "single-producer". Otherwise, it is "multi-producers".
94  *    - RING_F_SC_DEQ: If this flag is set, the default behavior when
95  *      using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
96  *      is "single-consumer". Otherwise, it is "multi-consumers".
97  * @return
98  *   On success, the pointer to the new allocated ring. NULL on error with
99  *    rte_errno set appropriately. Possible errno values include:
100  *    - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure
101  *    - E_RTE_SECONDARY - function was called from a secondary process instance
102  *    - EINVAL - esize is not a multiple of 4 or count provided is not a
103  *               power of 2.
104  *    - ENOSPC - the maximum number of memzones has already been allocated
105  *    - EEXIST - a memzone with the same name already exists
106  *    - ENOMEM - no appropriate memory area found in which to create memzone
107  */
108 __rte_experimental
109 struct rte_ring *rte_ring_create_elem(const char *name, unsigned int esize,
110                         unsigned int count, int socket_id, unsigned int flags);
111
112 static __rte_always_inline void
113 __rte_ring_enqueue_elems_32(struct rte_ring *r, const uint32_t size,
114                 uint32_t idx, const void *obj_table, uint32_t n)
115 {
116         unsigned int i;
117         uint32_t *ring = (uint32_t *)&r[1];
118         const uint32_t *obj = (const uint32_t *)obj_table;
119         if (likely(idx + n < size)) {
120                 for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
121                         ring[idx] = obj[i];
122                         ring[idx + 1] = obj[i + 1];
123                         ring[idx + 2] = obj[i + 2];
124                         ring[idx + 3] = obj[i + 3];
125                         ring[idx + 4] = obj[i + 4];
126                         ring[idx + 5] = obj[i + 5];
127                         ring[idx + 6] = obj[i + 6];
128                         ring[idx + 7] = obj[i + 7];
129                 }
130                 switch (n & 0x7) {
131                 case 7:
132                         ring[idx++] = obj[i++]; /* fallthrough */
133                 case 6:
134                         ring[idx++] = obj[i++]; /* fallthrough */
135                 case 5:
136                         ring[idx++] = obj[i++]; /* fallthrough */
137                 case 4:
138                         ring[idx++] = obj[i++]; /* fallthrough */
139                 case 3:
140                         ring[idx++] = obj[i++]; /* fallthrough */
141                 case 2:
142                         ring[idx++] = obj[i++]; /* fallthrough */
143                 case 1:
144                         ring[idx++] = obj[i++]; /* fallthrough */
145                 }
146         } else {
147                 for (i = 0; idx < size; i++, idx++)
148                         ring[idx] = obj[i];
149                 /* Start at the beginning */
150                 for (idx = 0; i < n; i++, idx++)
151                         ring[idx] = obj[i];
152         }
153 }
154
155 static __rte_always_inline void
156 __rte_ring_enqueue_elems_64(struct rte_ring *r, uint32_t prod_head,
157                 const void *obj_table, uint32_t n)
158 {
159         unsigned int i;
160         const uint32_t size = r->size;
161         uint32_t idx = prod_head & r->mask;
162         uint64_t *ring = (uint64_t *)&r[1];
163         const uint64_t *obj = (const uint64_t *)obj_table;
164         if (likely(idx + n < size)) {
165                 for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
166                         ring[idx] = obj[i];
167                         ring[idx + 1] = obj[i + 1];
168                         ring[idx + 2] = obj[i + 2];
169                         ring[idx + 3] = obj[i + 3];
170                 }
171                 switch (n & 0x3) {
172                 case 3:
173                         ring[idx++] = obj[i++]; /* fallthrough */
174                 case 2:
175                         ring[idx++] = obj[i++]; /* fallthrough */
176                 case 1:
177                         ring[idx++] = obj[i++];
178                 }
179         } else {
180                 for (i = 0; idx < size; i++, idx++)
181                         ring[idx] = obj[i];
182                 /* Start at the beginning */
183                 for (idx = 0; i < n; i++, idx++)
184                         ring[idx] = obj[i];
185         }
186 }
187
188 static __rte_always_inline void
189 __rte_ring_enqueue_elems_128(struct rte_ring *r, uint32_t prod_head,
190                 const void *obj_table, uint32_t n)
191 {
192         unsigned int i;
193         const uint32_t size = r->size;
194         uint32_t idx = prod_head & r->mask;
195         rte_int128_t *ring = (rte_int128_t *)&r[1];
196         const rte_int128_t *obj = (const rte_int128_t *)obj_table;
197         if (likely(idx + n < size)) {
198                 for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
199                         memcpy((void *)(ring + idx),
200                                 (const void *)(obj + i), 32);
201                 switch (n & 0x1) {
202                 case 1:
203                         memcpy((void *)(ring + idx),
204                                 (const void *)(obj + i), 16);
205                 }
206         } else {
207                 for (i = 0; idx < size; i++, idx++)
208                         memcpy((void *)(ring + idx),
209                                 (const void *)(obj + i), 16);
210                 /* Start at the beginning */
211                 for (idx = 0; i < n; i++, idx++)
212                         memcpy((void *)(ring + idx),
213                                 (const void *)(obj + i), 16);
214         }
215 }
216
217 /* the actual enqueue of elements on the ring.
218  * Placed here since identical code needed in both
219  * single and multi producer enqueue functions.
220  */
221 static __rte_always_inline void
222 __rte_ring_enqueue_elems(struct rte_ring *r, uint32_t prod_head,
223                 const void *obj_table, uint32_t esize, uint32_t num)
224 {
225         /* 8B and 16B copies implemented individually to retain
226          * the current performance.
227          */
228         if (esize == 8)
229                 __rte_ring_enqueue_elems_64(r, prod_head, obj_table, num);
230         else if (esize == 16)
231                 __rte_ring_enqueue_elems_128(r, prod_head, obj_table, num);
232         else {
233                 uint32_t idx, scale, nr_idx, nr_num, nr_size;
234
235                 /* Normalize to uint32_t */
236                 scale = esize / sizeof(uint32_t);
237                 nr_num = num * scale;
238                 idx = prod_head & r->mask;
239                 nr_idx = idx * scale;
240                 nr_size = r->size * scale;
241                 __rte_ring_enqueue_elems_32(r, nr_size, nr_idx,
242                                 obj_table, nr_num);
243         }
244 }
245
246 static __rte_always_inline void
247 __rte_ring_dequeue_elems_32(struct rte_ring *r, const uint32_t size,
248                 uint32_t idx, void *obj_table, uint32_t n)
249 {
250         unsigned int i;
251         uint32_t *ring = (uint32_t *)&r[1];
252         uint32_t *obj = (uint32_t *)obj_table;
253         if (likely(idx + n < size)) {
254                 for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
255                         obj[i] = ring[idx];
256                         obj[i + 1] = ring[idx + 1];
257                         obj[i + 2] = ring[idx + 2];
258                         obj[i + 3] = ring[idx + 3];
259                         obj[i + 4] = ring[idx + 4];
260                         obj[i + 5] = ring[idx + 5];
261                         obj[i + 6] = ring[idx + 6];
262                         obj[i + 7] = ring[idx + 7];
263                 }
264                 switch (n & 0x7) {
265                 case 7:
266                         obj[i++] = ring[idx++]; /* fallthrough */
267                 case 6:
268                         obj[i++] = ring[idx++]; /* fallthrough */
269                 case 5:
270                         obj[i++] = ring[idx++]; /* fallthrough */
271                 case 4:
272                         obj[i++] = ring[idx++]; /* fallthrough */
273                 case 3:
274                         obj[i++] = ring[idx++]; /* fallthrough */
275                 case 2:
276                         obj[i++] = ring[idx++]; /* fallthrough */
277                 case 1:
278                         obj[i++] = ring[idx++]; /* fallthrough */
279                 }
280         } else {
281                 for (i = 0; idx < size; i++, idx++)
282                         obj[i] = ring[idx];
283                 /* Start at the beginning */
284                 for (idx = 0; i < n; i++, idx++)
285                         obj[i] = ring[idx];
286         }
287 }
288
289 static __rte_always_inline void
290 __rte_ring_dequeue_elems_64(struct rte_ring *r, uint32_t prod_head,
291                 void *obj_table, uint32_t n)
292 {
293         unsigned int i;
294         const uint32_t size = r->size;
295         uint32_t idx = prod_head & r->mask;
296         uint64_t *ring = (uint64_t *)&r[1];
297         uint64_t *obj = (uint64_t *)obj_table;
298         if (likely(idx + n < size)) {
299                 for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
300                         obj[i] = ring[idx];
301                         obj[i + 1] = ring[idx + 1];
302                         obj[i + 2] = ring[idx + 2];
303                         obj[i + 3] = ring[idx + 3];
304                 }
305                 switch (n & 0x3) {
306                 case 3:
307                         obj[i++] = ring[idx++]; /* fallthrough */
308                 case 2:
309                         obj[i++] = ring[idx++]; /* fallthrough */
310                 case 1:
311                         obj[i++] = ring[idx++]; /* fallthrough */
312                 }
313         } else {
314                 for (i = 0; idx < size; i++, idx++)
315                         obj[i] = ring[idx];
316                 /* Start at the beginning */
317                 for (idx = 0; i < n; i++, idx++)
318                         obj[i] = ring[idx];
319         }
320 }
321
322 static __rte_always_inline void
323 __rte_ring_dequeue_elems_128(struct rte_ring *r, uint32_t prod_head,
324                 void *obj_table, uint32_t n)
325 {
326         unsigned int i;
327         const uint32_t size = r->size;
328         uint32_t idx = prod_head & r->mask;
329         rte_int128_t *ring = (rte_int128_t *)&r[1];
330         rte_int128_t *obj = (rte_int128_t *)obj_table;
331         if (likely(idx + n < size)) {
332                 for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
333                         memcpy((void *)(obj + i), (void *)(ring + idx), 32);
334                 switch (n & 0x1) {
335                 case 1:
336                         memcpy((void *)(obj + i), (void *)(ring + idx), 16);
337                 }
338         } else {
339                 for (i = 0; idx < size; i++, idx++)
340                         memcpy((void *)(obj + i), (void *)(ring + idx), 16);
341                 /* Start at the beginning */
342                 for (idx = 0; i < n; i++, idx++)
343                         memcpy((void *)(obj + i), (void *)(ring + idx), 16);
344         }
345 }
346
347 /* the actual dequeue of elements from the ring.
348  * Placed here since identical code needed in both
349  * single and multi producer enqueue functions.
350  */
351 static __rte_always_inline void
352 __rte_ring_dequeue_elems(struct rte_ring *r, uint32_t cons_head,
353                 void *obj_table, uint32_t esize, uint32_t num)
354 {
355         /* 8B and 16B copies implemented individually to retain
356          * the current performance.
357          */
358         if (esize == 8)
359                 __rte_ring_dequeue_elems_64(r, cons_head, obj_table, num);
360         else if (esize == 16)
361                 __rte_ring_dequeue_elems_128(r, cons_head, obj_table, num);
362         else {
363                 uint32_t idx, scale, nr_idx, nr_num, nr_size;
364
365                 /* Normalize to uint32_t */
366                 scale = esize / sizeof(uint32_t);
367                 nr_num = num * scale;
368                 idx = cons_head & r->mask;
369                 nr_idx = idx * scale;
370                 nr_size = r->size * scale;
371                 __rte_ring_dequeue_elems_32(r, nr_size, nr_idx,
372                                 obj_table, nr_num);
373         }
374 }
375
376 /* Between load and load. there might be cpu reorder in weak model
377  * (powerpc/arm).
378  * There are 2 choices for the users
379  * 1.use rmb() memory barrier
380  * 2.use one-direction load_acquire/store_release barrier,defined by
381  * CONFIG_RTE_USE_C11_MEM_MODEL=y
382  * It depends on performance test results.
383  * By default, move common functions to rte_ring_generic.h
384  */
385 #ifdef RTE_USE_C11_MEM_MODEL
386 #include "rte_ring_c11_mem.h"
387 #else
388 #include "rte_ring_generic.h"
389 #endif
390
391 /**
392  * @internal Enqueue several objects on the ring
393  *
394  * @param r
395  *   A pointer to the ring structure.
396  * @param obj_table
397  *   A pointer to a table of objects.
398  * @param esize
399  *   The size of ring element, in bytes. It must be a multiple of 4.
400  *   This must be the same value used while creating the ring. Otherwise
401  *   the results are undefined.
402  * @param n
403  *   The number of objects to add in the ring from the obj_table.
404  * @param behavior
405  *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
406  *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
407  * @param is_sp
408  *   Indicates whether to use single producer or multi-producer head update
409  * @param free_space
410  *   returns the amount of space after the enqueue operation has finished
411  * @return
412  *   Actual number of objects enqueued.
413  *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
414  */
415 static __rte_always_inline unsigned int
416 __rte_ring_do_enqueue_elem(struct rte_ring *r, const void *obj_table,
417                 unsigned int esize, unsigned int n,
418                 enum rte_ring_queue_behavior behavior, unsigned int is_sp,
419                 unsigned int *free_space)
420 {
421         uint32_t prod_head, prod_next;
422         uint32_t free_entries;
423
424         n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
425                         &prod_head, &prod_next, &free_entries);
426         if (n == 0)
427                 goto end;
428
429         __rte_ring_enqueue_elems(r, prod_head, obj_table, esize, n);
430
431         update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
432 end:
433         if (free_space != NULL)
434                 *free_space = free_entries - n;
435         return n;
436 }
437
438 /**
439  * @internal Dequeue several objects from the ring
440  *
441  * @param r
442  *   A pointer to the ring structure.
443  * @param obj_table
444  *   A pointer to a table of objects.
445  * @param esize
446  *   The size of ring element, in bytes. It must be a multiple of 4.
447  *   This must be the same value used while creating the ring. Otherwise
448  *   the results are undefined.
449  * @param n
450  *   The number of objects to pull from the ring.
451  * @param behavior
452  *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
453  *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
454  * @param is_sc
455  *   Indicates whether to use single consumer or multi-consumer head update
456  * @param available
457  *   returns the number of remaining ring entries after the dequeue has finished
458  * @return
459  *   - Actual number of objects dequeued.
460  *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
461  */
462 static __rte_always_inline unsigned int
463 __rte_ring_do_dequeue_elem(struct rte_ring *r, void *obj_table,
464                 unsigned int esize, unsigned int n,
465                 enum rte_ring_queue_behavior behavior, unsigned int is_sc,
466                 unsigned int *available)
467 {
468         uint32_t cons_head, cons_next;
469         uint32_t entries;
470
471         n = __rte_ring_move_cons_head(r, (int)is_sc, n, behavior,
472                         &cons_head, &cons_next, &entries);
473         if (n == 0)
474                 goto end;
475
476         __rte_ring_dequeue_elems(r, cons_head, obj_table, esize, n);
477
478         update_tail(&r->cons, cons_head, cons_next, is_sc, 0);
479
480 end:
481         if (available != NULL)
482                 *available = entries - n;
483         return n;
484 }
485
486 /**
487  * Enqueue several objects on the ring (multi-producers safe).
488  *
489  * This function uses a "compare and set" instruction to move the
490  * producer index atomically.
491  *
492  * @param r
493  *   A pointer to the ring structure.
494  * @param obj_table
495  *   A pointer to a table of objects.
496  * @param esize
497  *   The size of ring element, in bytes. It must be a multiple of 4.
498  *   This must be the same value used while creating the ring. Otherwise
499  *   the results are undefined.
500  * @param n
501  *   The number of objects to add in the ring from the obj_table.
502  * @param free_space
503  *   if non-NULL, returns the amount of space in the ring after the
504  *   enqueue operation has finished.
505  * @return
506  *   The number of objects enqueued, either 0 or n
507  */
508 static __rte_always_inline unsigned int
509 rte_ring_mp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
510                 unsigned int esize, unsigned int n, unsigned int *free_space)
511 {
512         return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
513                         RTE_RING_QUEUE_FIXED, __IS_MP, free_space);
514 }
515
516 /**
517  * Enqueue several objects on a ring
518  *
519  * @warning This API is NOT multi-producers safe
520  *
521  * @param r
522  *   A pointer to the ring structure.
523  * @param obj_table
524  *   A pointer to a table of objects.
525  * @param esize
526  *   The size of ring element, in bytes. It must be a multiple of 4.
527  *   This must be the same value used while creating the ring. Otherwise
528  *   the results are undefined.
529  * @param n
530  *   The number of objects to add in the ring from the obj_table.
531  * @param free_space
532  *   if non-NULL, returns the amount of space in the ring after the
533  *   enqueue operation has finished.
534  * @return
535  *   The number of objects enqueued, either 0 or n
536  */
537 static __rte_always_inline unsigned int
538 rte_ring_sp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
539                 unsigned int esize, unsigned int n, unsigned int *free_space)
540 {
541         return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
542                         RTE_RING_QUEUE_FIXED, __IS_SP, free_space);
543 }
544
545 /**
546  * Enqueue several objects on a ring.
547  *
548  * This function calls the multi-producer or the single-producer
549  * version depending on the default behavior that was specified at
550  * ring creation time (see flags).
551  *
552  * @param r
553  *   A pointer to the ring structure.
554  * @param obj_table
555  *   A pointer to a table of objects.
556  * @param esize
557  *   The size of ring element, in bytes. It must be a multiple of 4.
558  *   This must be the same value used while creating the ring. Otherwise
559  *   the results are undefined.
560  * @param n
561  *   The number of objects to add in the ring from the obj_table.
562  * @param free_space
563  *   if non-NULL, returns the amount of space in the ring after the
564  *   enqueue operation has finished.
565  * @return
566  *   The number of objects enqueued, either 0 or n
567  */
568 static __rte_always_inline unsigned int
569 rte_ring_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
570                 unsigned int esize, unsigned int n, unsigned int *free_space)
571 {
572         return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
573                         RTE_RING_QUEUE_FIXED, r->prod.single, free_space);
574 }
575
576 /**
577  * Enqueue one object on a ring (multi-producers safe).
578  *
579  * This function uses a "compare and set" instruction to move the
580  * producer index atomically.
581  *
582  * @param r
583  *   A pointer to the ring structure.
584  * @param obj
585  *   A pointer to the object to be added.
586  * @param esize
587  *   The size of ring element, in bytes. It must be a multiple of 4.
588  *   This must be the same value used while creating the ring. Otherwise
589  *   the results are undefined.
590  * @return
591  *   - 0: Success; objects enqueued.
592  *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
593  */
594 static __rte_always_inline int
595 rte_ring_mp_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize)
596 {
597         return rte_ring_mp_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 :
598                                                                 -ENOBUFS;
599 }
600
601 /**
602  * Enqueue one object on a ring
603  *
604  * @warning This API is NOT multi-producers safe
605  *
606  * @param r
607  *   A pointer to the ring structure.
608  * @param obj
609  *   A pointer to the object to be added.
610  * @param esize
611  *   The size of ring element, in bytes. It must be a multiple of 4.
612  *   This must be the same value used while creating the ring. Otherwise
613  *   the results are undefined.
614  * @return
615  *   - 0: Success; objects enqueued.
616  *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
617  */
618 static __rte_always_inline int
619 rte_ring_sp_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize)
620 {
621         return rte_ring_sp_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 :
622                                                                 -ENOBUFS;
623 }
624
625 /**
626  * Enqueue one object on a ring.
627  *
628  * This function calls the multi-producer or the single-producer
629  * version, depending on the default behaviour that was specified at
630  * ring creation time (see flags).
631  *
632  * @param r
633  *   A pointer to the ring structure.
634  * @param obj
635  *   A pointer to the object to be added.
636  * @param esize
637  *   The size of ring element, in bytes. It must be a multiple of 4.
638  *   This must be the same value used while creating the ring. Otherwise
639  *   the results are undefined.
640  * @return
641  *   - 0: Success; objects enqueued.
642  *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
643  */
644 static __rte_always_inline int
645 rte_ring_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize)
646 {
647         return rte_ring_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 :
648                                                                 -ENOBUFS;
649 }
650
651 /**
652  * Dequeue several objects from a ring (multi-consumers safe).
653  *
654  * This function uses a "compare and set" instruction to move the
655  * consumer index atomically.
656  *
657  * @param r
658  *   A pointer to the ring structure.
659  * @param obj_table
660  *   A pointer to a table of objects that will be filled.
661  * @param esize
662  *   The size of ring element, in bytes. It must be a multiple of 4.
663  *   This must be the same value used while creating the ring. Otherwise
664  *   the results are undefined.
665  * @param n
666  *   The number of objects to dequeue from the ring to the obj_table.
667  * @param available
668  *   If non-NULL, returns the number of remaining ring entries after the
669  *   dequeue has finished.
670  * @return
671  *   The number of objects dequeued, either 0 or n
672  */
673 static __rte_always_inline unsigned int
674 rte_ring_mc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
675                 unsigned int esize, unsigned int n, unsigned int *available)
676 {
677         return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
678                                 RTE_RING_QUEUE_FIXED, __IS_MC, available);
679 }
680
681 /**
682  * Dequeue several objects from a ring (NOT multi-consumers safe).
683  *
684  * @param r
685  *   A pointer to the ring structure.
686  * @param obj_table
687  *   A pointer to a table of objects that will be filled.
688  * @param esize
689  *   The size of ring element, in bytes. It must be a multiple of 4.
690  *   This must be the same value used while creating the ring. Otherwise
691  *   the results are undefined.
692  * @param n
693  *   The number of objects to dequeue from the ring to the obj_table,
694  *   must be strictly positive.
695  * @param available
696  *   If non-NULL, returns the number of remaining ring entries after the
697  *   dequeue has finished.
698  * @return
699  *   The number of objects dequeued, either 0 or n
700  */
701 static __rte_always_inline unsigned int
702 rte_ring_sc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
703                 unsigned int esize, unsigned int n, unsigned int *available)
704 {
705         return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
706                         RTE_RING_QUEUE_FIXED, __IS_SC, available);
707 }
708
709 /**
710  * Dequeue several objects from a ring.
711  *
712  * This function calls the multi-consumers or the single-consumer
713  * version, depending on the default behaviour that was specified at
714  * ring creation time (see flags).
715  *
716  * @param r
717  *   A pointer to the ring structure.
718  * @param obj_table
719  *   A pointer to a table of objects that will be filled.
720  * @param esize
721  *   The size of ring element, in bytes. It must be a multiple of 4.
722  *   This must be the same value used while creating the ring. Otherwise
723  *   the results are undefined.
724  * @param n
725  *   The number of objects to dequeue from the ring to the obj_table.
726  * @param available
727  *   If non-NULL, returns the number of remaining ring entries after the
728  *   dequeue has finished.
729  * @return
730  *   The number of objects dequeued, either 0 or n
731  */
732 static __rte_always_inline unsigned int
733 rte_ring_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
734                 unsigned int esize, unsigned int n, unsigned int *available)
735 {
736         return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
737                         RTE_RING_QUEUE_FIXED, r->cons.single, available);
738 }
739
740 /**
741  * Dequeue one object from a ring (multi-consumers safe).
742  *
743  * This function uses a "compare and set" instruction to move the
744  * consumer index atomically.
745  *
746  * @param r
747  *   A pointer to the ring structure.
748  * @param obj_p
749  *   A pointer to the object that will be filled.
750  * @param esize
751  *   The size of ring element, in bytes. It must be a multiple of 4.
752  *   This must be the same value used while creating the ring. Otherwise
753  *   the results are undefined.
754  * @return
755  *   - 0: Success; objects dequeued.
756  *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
757  *     dequeued.
758  */
759 static __rte_always_inline int
760 rte_ring_mc_dequeue_elem(struct rte_ring *r, void *obj_p,
761                                 unsigned int esize)
762 {
763         return rte_ring_mc_dequeue_bulk_elem(r, obj_p, esize, 1, NULL)  ? 0 :
764                                                                 -ENOENT;
765 }
766
767 /**
768  * Dequeue one object from a ring (NOT multi-consumers safe).
769  *
770  * @param r
771  *   A pointer to the ring structure.
772  * @param obj_p
773  *   A pointer to the object that will be filled.
774  * @param esize
775  *   The size of ring element, in bytes. It must be a multiple of 4.
776  *   This must be the same value used while creating the ring. Otherwise
777  *   the results are undefined.
778  * @return
779  *   - 0: Success; objects dequeued.
780  *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
781  *     dequeued.
782  */
783 static __rte_always_inline int
784 rte_ring_sc_dequeue_elem(struct rte_ring *r, void *obj_p,
785                                 unsigned int esize)
786 {
787         return rte_ring_sc_dequeue_bulk_elem(r, obj_p, esize, 1, NULL) ? 0 :
788                                                                 -ENOENT;
789 }
790
791 /**
792  * Dequeue one object from a ring.
793  *
794  * This function calls the multi-consumers or the single-consumer
795  * version depending on the default behaviour that was specified at
796  * ring creation time (see flags).
797  *
798  * @param r
799  *   A pointer to the ring structure.
800  * @param obj_p
801  *   A pointer to the object that will be filled.
802  * @param esize
803  *   The size of ring element, in bytes. It must be a multiple of 4.
804  *   This must be the same value used while creating the ring. Otherwise
805  *   the results are undefined.
806  * @return
807  *   - 0: Success, objects dequeued.
808  *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
809  *     dequeued.
810  */
811 static __rte_always_inline int
812 rte_ring_dequeue_elem(struct rte_ring *r, void *obj_p, unsigned int esize)
813 {
814         return rte_ring_dequeue_bulk_elem(r, obj_p, esize, 1, NULL) ? 0 :
815                                                                 -ENOENT;
816 }
817
818 /**
819  * Enqueue several objects on the ring (multi-producers safe).
820  *
821  * This function uses a "compare and set" instruction to move the
822  * producer index atomically.
823  *
824  * @param r
825  *   A pointer to the ring structure.
826  * @param obj_table
827  *   A pointer to a table of objects.
828  * @param esize
829  *   The size of ring element, in bytes. It must be a multiple of 4.
830  *   This must be the same value used while creating the ring. Otherwise
831  *   the results are undefined.
832  * @param n
833  *   The number of objects to add in the ring from the obj_table.
834  * @param free_space
835  *   if non-NULL, returns the amount of space in the ring after the
836  *   enqueue operation has finished.
837  * @return
838  *   - n: Actual number of objects enqueued.
839  */
840 static __rte_always_inline unsigned
841 rte_ring_mp_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
842                 unsigned int esize, unsigned int n, unsigned int *free_space)
843 {
844         return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
845                         RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space);
846 }
847
848 /**
849  * Enqueue several objects on a ring
850  *
851  * @warning This API is NOT multi-producers safe
852  *
853  * @param r
854  *   A pointer to the ring structure.
855  * @param obj_table
856  *   A pointer to a table of objects.
857  * @param esize
858  *   The size of ring element, in bytes. It must be a multiple of 4.
859  *   This must be the same value used while creating the ring. Otherwise
860  *   the results are undefined.
861  * @param n
862  *   The number of objects to add in the ring from the obj_table.
863  * @param free_space
864  *   if non-NULL, returns the amount of space in the ring after the
865  *   enqueue operation has finished.
866  * @return
867  *   - n: Actual number of objects enqueued.
868  */
869 static __rte_always_inline unsigned
870 rte_ring_sp_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
871                 unsigned int esize, unsigned int n, unsigned int *free_space)
872 {
873         return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
874                         RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space);
875 }
876
877 /**
878  * Enqueue several objects on a ring.
879  *
880  * This function calls the multi-producer or the single-producer
881  * version depending on the default behavior that was specified at
882  * ring creation time (see flags).
883  *
884  * @param r
885  *   A pointer to the ring structure.
886  * @param obj_table
887  *   A pointer to a table of objects.
888  * @param esize
889  *   The size of ring element, in bytes. It must be a multiple of 4.
890  *   This must be the same value used while creating the ring. Otherwise
891  *   the results are undefined.
892  * @param n
893  *   The number of objects to add in the ring from the obj_table.
894  * @param free_space
895  *   if non-NULL, returns the amount of space in the ring after the
896  *   enqueue operation has finished.
897  * @return
898  *   - n: Actual number of objects enqueued.
899  */
900 static __rte_always_inline unsigned
901 rte_ring_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
902                 unsigned int esize, unsigned int n, unsigned int *free_space)
903 {
904         return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
905                         RTE_RING_QUEUE_VARIABLE, r->prod.single, free_space);
906 }
907
908 /**
909  * Dequeue several objects from a ring (multi-consumers safe). When the request
910  * objects are more than the available objects, only dequeue the actual number
911  * of objects
912  *
913  * This function uses a "compare and set" instruction to move the
914  * consumer index atomically.
915  *
916  * @param r
917  *   A pointer to the ring structure.
918  * @param obj_table
919  *   A pointer to a table of objects that will be filled.
920  * @param esize
921  *   The size of ring element, in bytes. It must be a multiple of 4.
922  *   This must be the same value used while creating the ring. Otherwise
923  *   the results are undefined.
924  * @param n
925  *   The number of objects to dequeue from the ring to the obj_table.
926  * @param available
927  *   If non-NULL, returns the number of remaining ring entries after the
928  *   dequeue has finished.
929  * @return
930  *   - n: Actual number of objects dequeued, 0 if ring is empty
931  */
932 static __rte_always_inline unsigned
933 rte_ring_mc_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
934                 unsigned int esize, unsigned int n, unsigned int *available)
935 {
936         return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
937                         RTE_RING_QUEUE_VARIABLE, __IS_MC, available);
938 }
939
940 /**
941  * Dequeue several objects from a ring (NOT multi-consumers safe).When the
942  * request objects are more than the available objects, only dequeue the
943  * actual number of objects
944  *
945  * @param r
946  *   A pointer to the ring structure.
947  * @param obj_table
948  *   A pointer to a table of objects that will be filled.
949  * @param esize
950  *   The size of ring element, in bytes. It must be a multiple of 4.
951  *   This must be the same value used while creating the ring. Otherwise
952  *   the results are undefined.
953  * @param n
954  *   The number of objects to dequeue from the ring to the obj_table.
955  * @param available
956  *   If non-NULL, returns the number of remaining ring entries after the
957  *   dequeue has finished.
958  * @return
959  *   - n: Actual number of objects dequeued, 0 if ring is empty
960  */
961 static __rte_always_inline unsigned
962 rte_ring_sc_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
963                 unsigned int esize, unsigned int n, unsigned int *available)
964 {
965         return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
966                         RTE_RING_QUEUE_VARIABLE, __IS_SC, available);
967 }
968
969 /**
970  * Dequeue multiple objects from a ring up to a maximum number.
971  *
972  * This function calls the multi-consumers or the single-consumer
973  * version, depending on the default behaviour that was specified at
974  * ring creation time (see flags).
975  *
976  * @param r
977  *   A pointer to the ring structure.
978  * @param obj_table
979  *   A pointer to a table of objects that will be filled.
980  * @param esize
981  *   The size of ring element, in bytes. It must be a multiple of 4.
982  *   This must be the same value used while creating the ring. Otherwise
983  *   the results are undefined.
984  * @param n
985  *   The number of objects to dequeue from the ring to the obj_table.
986  * @param available
987  *   If non-NULL, returns the number of remaining ring entries after the
988  *   dequeue has finished.
989  * @return
990  *   - Number of objects dequeued
991  */
992 static __rte_always_inline unsigned int
993 rte_ring_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
994                 unsigned int esize, unsigned int n, unsigned int *available)
995 {
996         return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
997                                 RTE_RING_QUEUE_VARIABLE,
998                                 r->cons.single, available);
999 }
1000
1001 #ifdef __cplusplus
1002 }
1003 #endif
1004
1005 #endif /* _RTE_RING_ELEM_H_ */