ring: support configurable element size
[dpdk.git] / lib / librte_ring / rte_ring_elem.h
1 /* SPDX-License-Identifier: BSD-3-Clause
2  *
3  * Copyright (c) 2019 Arm Limited
4  * Copyright (c) 2010-2017 Intel Corporation
5  * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
6  * All rights reserved.
7  * Derived from FreeBSD's bufring.h
8  * Used as BSD-3 Licensed with permission from Kip Macy.
9  */
10
11 #ifndef _RTE_RING_ELEM_H_
12 #define _RTE_RING_ELEM_H_
13
14 /**
15  * @file
16  * RTE Ring with user defined element size
17  */
18
19 #ifdef __cplusplus
20 extern "C" {
21 #endif
22
23 #include <stdio.h>
24 #include <stdint.h>
25 #include <string.h>
26 #include <sys/queue.h>
27 #include <errno.h>
28 #include <rte_common.h>
29 #include <rte_config.h>
30 #include <rte_memory.h>
31 #include <rte_lcore.h>
32 #include <rte_atomic.h>
33 #include <rte_branch_prediction.h>
34 #include <rte_memzone.h>
35 #include <rte_pause.h>
36
37 #include "rte_ring.h"
38
39 /**
40  * @warning
41  * @b EXPERIMENTAL: this API may change without prior notice
42  *
43  * Calculate the memory size needed for a ring with given element size
44  *
45  * This function returns the number of bytes needed for a ring, given
46  * the number of elements in it and the size of the element. This value
47  * is the sum of the size of the structure rte_ring and the size of the
48  * memory needed for storing the elements. The value is aligned to a cache
49  * line size.
50  *
51  * @param esize
52  *   The size of ring element, in bytes. It must be a multiple of 4.
53  * @param count
54  *   The number of elements in the ring (must be a power of 2).
55  * @return
56  *   - The memory size needed for the ring on success.
57  *   - -EINVAL - esize is not a multiple of 4 or count provided is not a
58  *               power of 2.
59  */
60 __rte_experimental
61 ssize_t rte_ring_get_memsize_elem(unsigned int esize, unsigned int count);
62
63 /**
64  * @warning
65  * @b EXPERIMENTAL: this API may change without prior notice
66  *
67  * Create a new ring named *name* that stores elements with given size.
68  *
69  * This function uses ``memzone_reserve()`` to allocate memory. Then it
70  * calls rte_ring_init() to initialize an empty ring.
71  *
72  * The new ring size is set to *count*, which must be a power of
73  * two. Water marking is disabled by default. The real usable ring size
74  * is *count-1* instead of *count* to differentiate a free ring from an
75  * empty ring.
76  *
77  * The ring is added in RTE_TAILQ_RING list.
78  *
79  * @param name
80  *   The name of the ring.
81  * @param esize
82  *   The size of ring element, in bytes. It must be a multiple of 4.
83  * @param count
84  *   The number of elements in the ring (must be a power of 2).
85  * @param socket_id
86  *   The *socket_id* argument is the socket identifier in case of
87  *   NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
88  *   constraint for the reserved zone.
89  * @param flags
90  *   An OR of the following:
91  *    - RING_F_SP_ENQ: If this flag is set, the default behavior when
92  *      using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
93  *      is "single-producer". Otherwise, it is "multi-producers".
94  *    - RING_F_SC_DEQ: If this flag is set, the default behavior when
95  *      using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
96  *      is "single-consumer". Otherwise, it is "multi-consumers".
97  * @return
98  *   On success, the pointer to the new allocated ring. NULL on error with
99  *    rte_errno set appropriately. Possible errno values include:
100  *    - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure
101  *    - E_RTE_SECONDARY - function was called from a secondary process instance
102  *    - EINVAL - esize is not a multiple of 4 or count provided is not a
103  *               power of 2.
104  *    - ENOSPC - the maximum number of memzones has already been allocated
105  *    - EEXIST - a memzone with the same name already exists
106  *    - ENOMEM - no appropriate memory area found in which to create memzone
107  */
108 __rte_experimental
109 struct rte_ring *rte_ring_create_elem(const char *name, unsigned int esize,
110                         unsigned int count, int socket_id, unsigned int flags);
111
112 static __rte_always_inline void
113 enqueue_elems_32(struct rte_ring *r, const uint32_t size, uint32_t idx,
114                 const void *obj_table, uint32_t n)
115 {
116         unsigned int i;
117         uint32_t *ring = (uint32_t *)&r[1];
118         const uint32_t *obj = (const uint32_t *)obj_table;
119         if (likely(idx + n < size)) {
120                 for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
121                         ring[idx] = obj[i];
122                         ring[idx + 1] = obj[i + 1];
123                         ring[idx + 2] = obj[i + 2];
124                         ring[idx + 3] = obj[i + 3];
125                         ring[idx + 4] = obj[i + 4];
126                         ring[idx + 5] = obj[i + 5];
127                         ring[idx + 6] = obj[i + 6];
128                         ring[idx + 7] = obj[i + 7];
129                 }
130                 switch (n & 0x7) {
131                 case 7:
132                         ring[idx++] = obj[i++]; /* fallthrough */
133                 case 6:
134                         ring[idx++] = obj[i++]; /* fallthrough */
135                 case 5:
136                         ring[idx++] = obj[i++]; /* fallthrough */
137                 case 4:
138                         ring[idx++] = obj[i++]; /* fallthrough */
139                 case 3:
140                         ring[idx++] = obj[i++]; /* fallthrough */
141                 case 2:
142                         ring[idx++] = obj[i++]; /* fallthrough */
143                 case 1:
144                         ring[idx++] = obj[i++]; /* fallthrough */
145                 }
146         } else {
147                 for (i = 0; idx < size; i++, idx++)
148                         ring[idx] = obj[i];
149                 /* Start at the beginning */
150                 for (idx = 0; i < n; i++, idx++)
151                         ring[idx] = obj[i];
152         }
153 }
154
155 static __rte_always_inline void
156 enqueue_elems_64(struct rte_ring *r, uint32_t prod_head,
157                 const void *obj_table, uint32_t n)
158 {
159         unsigned int i;
160         const uint32_t size = r->size;
161         uint32_t idx = prod_head & r->mask;
162         uint64_t *ring = (uint64_t *)&r[1];
163         const uint64_t *obj = (const uint64_t *)obj_table;
164         if (likely(idx + n < size)) {
165                 for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
166                         ring[idx] = obj[i];
167                         ring[idx + 1] = obj[i + 1];
168                         ring[idx + 2] = obj[i + 2];
169                         ring[idx + 3] = obj[i + 3];
170                 }
171                 switch (n & 0x3) {
172                 case 3:
173                         ring[idx++] = obj[i++]; /* fallthrough */
174                 case 2:
175                         ring[idx++] = obj[i++]; /* fallthrough */
176                 case 1:
177                         ring[idx++] = obj[i++];
178                 }
179         } else {
180                 for (i = 0; idx < size; i++, idx++)
181                         ring[idx] = obj[i];
182                 /* Start at the beginning */
183                 for (idx = 0; i < n; i++, idx++)
184                         ring[idx] = obj[i];
185         }
186 }
187
188 static __rte_always_inline void
189 enqueue_elems_128(struct rte_ring *r, uint32_t prod_head,
190                 const void *obj_table, uint32_t n)
191 {
192         unsigned int i;
193         const uint32_t size = r->size;
194         uint32_t idx = prod_head & r->mask;
195         rte_int128_t *ring = (rte_int128_t *)&r[1];
196         const rte_int128_t *obj = (const rte_int128_t *)obj_table;
197         if (likely(idx + n < size)) {
198                 for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
199                         memcpy((void *)(ring + idx),
200                                 (const void *)(obj + i), 32);
201                 switch (n & 0x1) {
202                 case 1:
203                         memcpy((void *)(ring + idx),
204                                 (const void *)(obj + i), 16);
205                 }
206         } else {
207                 for (i = 0; idx < size; i++, idx++)
208                         memcpy((void *)(ring + idx),
209                                 (const void *)(obj + i), 16);
210                 /* Start at the beginning */
211                 for (idx = 0; i < n; i++, idx++)
212                         memcpy((void *)(ring + idx),
213                                 (const void *)(obj + i), 16);
214         }
215 }
216
217 /* the actual enqueue of elements on the ring.
218  * Placed here since identical code needed in both
219  * single and multi producer enqueue functions.
220  */
221 static __rte_always_inline void
222 enqueue_elems(struct rte_ring *r, uint32_t prod_head, const void *obj_table,
223                 uint32_t esize, uint32_t num)
224 {
225         /* 8B and 16B copies implemented individually to retain
226          * the current performance.
227          */
228         if (esize == 8)
229                 enqueue_elems_64(r, prod_head, obj_table, num);
230         else if (esize == 16)
231                 enqueue_elems_128(r, prod_head, obj_table, num);
232         else {
233                 uint32_t idx, scale, nr_idx, nr_num, nr_size;
234
235                 /* Normalize to uint32_t */
236                 scale = esize / sizeof(uint32_t);
237                 nr_num = num * scale;
238                 idx = prod_head & r->mask;
239                 nr_idx = idx * scale;
240                 nr_size = r->size * scale;
241                 enqueue_elems_32(r, nr_size, nr_idx, obj_table, nr_num);
242         }
243 }
244
245 static __rte_always_inline void
246 dequeue_elems_32(struct rte_ring *r, const uint32_t size, uint32_t idx,
247                 void *obj_table, uint32_t n)
248 {
249         unsigned int i;
250         uint32_t *ring = (uint32_t *)&r[1];
251         uint32_t *obj = (uint32_t *)obj_table;
252         if (likely(idx + n < size)) {
253                 for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
254                         obj[i] = ring[idx];
255                         obj[i + 1] = ring[idx + 1];
256                         obj[i + 2] = ring[idx + 2];
257                         obj[i + 3] = ring[idx + 3];
258                         obj[i + 4] = ring[idx + 4];
259                         obj[i + 5] = ring[idx + 5];
260                         obj[i + 6] = ring[idx + 6];
261                         obj[i + 7] = ring[idx + 7];
262                 }
263                 switch (n & 0x7) {
264                 case 7:
265                         obj[i++] = ring[idx++]; /* fallthrough */
266                 case 6:
267                         obj[i++] = ring[idx++]; /* fallthrough */
268                 case 5:
269                         obj[i++] = ring[idx++]; /* fallthrough */
270                 case 4:
271                         obj[i++] = ring[idx++]; /* fallthrough */
272                 case 3:
273                         obj[i++] = ring[idx++]; /* fallthrough */
274                 case 2:
275                         obj[i++] = ring[idx++]; /* fallthrough */
276                 case 1:
277                         obj[i++] = ring[idx++]; /* fallthrough */
278                 }
279         } else {
280                 for (i = 0; idx < size; i++, idx++)
281                         obj[i] = ring[idx];
282                 /* Start at the beginning */
283                 for (idx = 0; i < n; i++, idx++)
284                         obj[i] = ring[idx];
285         }
286 }
287
288 static __rte_always_inline void
289 dequeue_elems_64(struct rte_ring *r, uint32_t prod_head,
290                 void *obj_table, uint32_t n)
291 {
292         unsigned int i;
293         const uint32_t size = r->size;
294         uint32_t idx = prod_head & r->mask;
295         uint64_t *ring = (uint64_t *)&r[1];
296         uint64_t *obj = (uint64_t *)obj_table;
297         if (likely(idx + n < size)) {
298                 for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
299                         obj[i] = ring[idx];
300                         obj[i + 1] = ring[idx + 1];
301                         obj[i + 2] = ring[idx + 2];
302                         obj[i + 3] = ring[idx + 3];
303                 }
304                 switch (n & 0x3) {
305                 case 3:
306                         obj[i++] = ring[idx++]; /* fallthrough */
307                 case 2:
308                         obj[i++] = ring[idx++]; /* fallthrough */
309                 case 1:
310                         obj[i++] = ring[idx++]; /* fallthrough */
311                 }
312         } else {
313                 for (i = 0; idx < size; i++, idx++)
314                         obj[i] = ring[idx];
315                 /* Start at the beginning */
316                 for (idx = 0; i < n; i++, idx++)
317                         obj[i] = ring[idx];
318         }
319 }
320
321 static __rte_always_inline void
322 dequeue_elems_128(struct rte_ring *r, uint32_t prod_head,
323                 void *obj_table, uint32_t n)
324 {
325         unsigned int i;
326         const uint32_t size = r->size;
327         uint32_t idx = prod_head & r->mask;
328         rte_int128_t *ring = (rte_int128_t *)&r[1];
329         rte_int128_t *obj = (rte_int128_t *)obj_table;
330         if (likely(idx + n < size)) {
331                 for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
332                         memcpy((void *)(obj + i), (void *)(ring + idx), 32);
333                 switch (n & 0x1) {
334                 case 1:
335                         memcpy((void *)(obj + i), (void *)(ring + idx), 16);
336                 }
337         } else {
338                 for (i = 0; idx < size; i++, idx++)
339                         memcpy((void *)(obj + i), (void *)(ring + idx), 16);
340                 /* Start at the beginning */
341                 for (idx = 0; i < n; i++, idx++)
342                         memcpy((void *)(obj + i), (void *)(ring + idx), 16);
343         }
344 }
345
346 /* the actual dequeue of elements from the ring.
347  * Placed here since identical code needed in both
348  * single and multi producer enqueue functions.
349  */
350 static __rte_always_inline void
351 dequeue_elems(struct rte_ring *r, uint32_t cons_head, void *obj_table,
352                 uint32_t esize, uint32_t num)
353 {
354         /* 8B and 16B copies implemented individually to retain
355          * the current performance.
356          */
357         if (esize == 8)
358                 dequeue_elems_64(r, cons_head, obj_table, num);
359         else if (esize == 16)
360                 dequeue_elems_128(r, cons_head, obj_table, num);
361         else {
362                 uint32_t idx, scale, nr_idx, nr_num, nr_size;
363
364                 /* Normalize to uint32_t */
365                 scale = esize / sizeof(uint32_t);
366                 nr_num = num * scale;
367                 idx = cons_head & r->mask;
368                 nr_idx = idx * scale;
369                 nr_size = r->size * scale;
370                 dequeue_elems_32(r, nr_size, nr_idx, obj_table, nr_num);
371         }
372 }
373
374 /* Between load and load. there might be cpu reorder in weak model
375  * (powerpc/arm).
376  * There are 2 choices for the users
377  * 1.use rmb() memory barrier
378  * 2.use one-direction load_acquire/store_release barrier,defined by
379  * CONFIG_RTE_USE_C11_MEM_MODEL=y
380  * It depends on performance test results.
381  * By default, move common functions to rte_ring_generic.h
382  */
383 #ifdef RTE_USE_C11_MEM_MODEL
384 #include "rte_ring_c11_mem.h"
385 #else
386 #include "rte_ring_generic.h"
387 #endif
388
389 /**
390  * @internal Enqueue several objects on the ring
391  *
392  * @param r
393  *   A pointer to the ring structure.
394  * @param obj_table
395  *   A pointer to a table of void * pointers (objects).
396  * @param esize
397  *   The size of ring element, in bytes. It must be a multiple of 4.
398  *   This must be the same value used while creating the ring. Otherwise
399  *   the results are undefined.
400  * @param n
401  *   The number of objects to add in the ring from the obj_table.
402  * @param behavior
403  *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
404  *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
405  * @param is_sp
406  *   Indicates whether to use single producer or multi-producer head update
407  * @param free_space
408  *   returns the amount of space after the enqueue operation has finished
409  * @return
410  *   Actual number of objects enqueued.
411  *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
412  */
413 static __rte_always_inline unsigned int
414 __rte_ring_do_enqueue_elem(struct rte_ring *r, const void *obj_table,
415                 unsigned int esize, unsigned int n,
416                 enum rte_ring_queue_behavior behavior, unsigned int is_sp,
417                 unsigned int *free_space)
418 {
419         uint32_t prod_head, prod_next;
420         uint32_t free_entries;
421
422         n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
423                         &prod_head, &prod_next, &free_entries);
424         if (n == 0)
425                 goto end;
426
427         enqueue_elems(r, prod_head, obj_table, esize, n);
428
429         update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
430 end:
431         if (free_space != NULL)
432                 *free_space = free_entries - n;
433         return n;
434 }
435
436 /**
437  * @internal Dequeue several objects from the ring
438  *
439  * @param r
440  *   A pointer to the ring structure.
441  * @param obj_table
442  *   A pointer to a table of void * pointers (objects).
443  * @param esize
444  *   The size of ring element, in bytes. It must be a multiple of 4.
445  *   This must be the same value used while creating the ring. Otherwise
446  *   the results are undefined.
447  * @param n
448  *   The number of objects to pull from the ring.
449  * @param behavior
450  *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
451  *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
452  * @param is_sc
453  *   Indicates whether to use single consumer or multi-consumer head update
454  * @param available
455  *   returns the number of remaining ring entries after the dequeue has finished
456  * @return
457  *   - Actual number of objects dequeued.
458  *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
459  */
460 static __rte_always_inline unsigned int
461 __rte_ring_do_dequeue_elem(struct rte_ring *r, void *obj_table,
462                 unsigned int esize, unsigned int n,
463                 enum rte_ring_queue_behavior behavior, unsigned int is_sc,
464                 unsigned int *available)
465 {
466         uint32_t cons_head, cons_next;
467         uint32_t entries;
468
469         n = __rte_ring_move_cons_head(r, (int)is_sc, n, behavior,
470                         &cons_head, &cons_next, &entries);
471         if (n == 0)
472                 goto end;
473
474         dequeue_elems(r, cons_head, obj_table, esize, n);
475
476         update_tail(&r->cons, cons_head, cons_next, is_sc, 0);
477
478 end:
479         if (available != NULL)
480                 *available = entries - n;
481         return n;
482 }
483
484 /**
485  * Enqueue several objects on the ring (multi-producers safe).
486  *
487  * This function uses a "compare and set" instruction to move the
488  * producer index atomically.
489  *
490  * @param r
491  *   A pointer to the ring structure.
492  * @param obj_table
493  *   A pointer to a table of void * pointers (objects).
494  * @param esize
495  *   The size of ring element, in bytes. It must be a multiple of 4.
496  *   This must be the same value used while creating the ring. Otherwise
497  *   the results are undefined.
498  * @param n
499  *   The number of objects to add in the ring from the obj_table.
500  * @param free_space
501  *   if non-NULL, returns the amount of space in the ring after the
502  *   enqueue operation has finished.
503  * @return
504  *   The number of objects enqueued, either 0 or n
505  */
506 static __rte_always_inline unsigned int
507 rte_ring_mp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
508                 unsigned int esize, unsigned int n, unsigned int *free_space)
509 {
510         return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
511                         RTE_RING_QUEUE_FIXED, __IS_MP, free_space);
512 }
513
514 /**
515  * Enqueue several objects on a ring
516  *
517  * @warning This API is NOT multi-producers safe
518  *
519  * @param r
520  *   A pointer to the ring structure.
521  * @param obj_table
522  *   A pointer to a table of void * pointers (objects).
523  * @param esize
524  *   The size of ring element, in bytes. It must be a multiple of 4.
525  *   This must be the same value used while creating the ring. Otherwise
526  *   the results are undefined.
527  * @param n
528  *   The number of objects to add in the ring from the obj_table.
529  * @param free_space
530  *   if non-NULL, returns the amount of space in the ring after the
531  *   enqueue operation has finished.
532  * @return
533  *   The number of objects enqueued, either 0 or n
534  */
535 static __rte_always_inline unsigned int
536 rte_ring_sp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
537                 unsigned int esize, unsigned int n, unsigned int *free_space)
538 {
539         return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
540                         RTE_RING_QUEUE_FIXED, __IS_SP, free_space);
541 }
542
543 /**
544  * Enqueue several objects on a ring.
545  *
546  * This function calls the multi-producer or the single-producer
547  * version depending on the default behavior that was specified at
548  * ring creation time (see flags).
549  *
550  * @param r
551  *   A pointer to the ring structure.
552  * @param obj_table
553  *   A pointer to a table of void * pointers (objects).
554  * @param esize
555  *   The size of ring element, in bytes. It must be a multiple of 4.
556  *   This must be the same value used while creating the ring. Otherwise
557  *   the results are undefined.
558  * @param n
559  *   The number of objects to add in the ring from the obj_table.
560  * @param free_space
561  *   if non-NULL, returns the amount of space in the ring after the
562  *   enqueue operation has finished.
563  * @return
564  *   The number of objects enqueued, either 0 or n
565  */
566 static __rte_always_inline unsigned int
567 rte_ring_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
568                 unsigned int esize, unsigned int n, unsigned int *free_space)
569 {
570         return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
571                         RTE_RING_QUEUE_FIXED, r->prod.single, free_space);
572 }
573
574 /**
575  * Enqueue one object on a ring (multi-producers safe).
576  *
577  * This function uses a "compare and set" instruction to move the
578  * producer index atomically.
579  *
580  * @param r
581  *   A pointer to the ring structure.
582  * @param obj
583  *   A pointer to the object to be added.
584  * @param esize
585  *   The size of ring element, in bytes. It must be a multiple of 4.
586  *   This must be the same value used while creating the ring. Otherwise
587  *   the results are undefined.
588  * @return
589  *   - 0: Success; objects enqueued.
590  *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
591  */
592 static __rte_always_inline int
593 rte_ring_mp_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize)
594 {
595         return rte_ring_mp_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 :
596                                                                 -ENOBUFS;
597 }
598
599 /**
600  * Enqueue one object on a ring
601  *
602  * @warning This API is NOT multi-producers safe
603  *
604  * @param r
605  *   A pointer to the ring structure.
606  * @param obj
607  *   A pointer to the object to be added.
608  * @param esize
609  *   The size of ring element, in bytes. It must be a multiple of 4.
610  *   This must be the same value used while creating the ring. Otherwise
611  *   the results are undefined.
612  * @return
613  *   - 0: Success; objects enqueued.
614  *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
615  */
616 static __rte_always_inline int
617 rte_ring_sp_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize)
618 {
619         return rte_ring_sp_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 :
620                                                                 -ENOBUFS;
621 }
622
623 /**
624  * Enqueue one object on a ring.
625  *
626  * This function calls the multi-producer or the single-producer
627  * version, depending on the default behaviour that was specified at
628  * ring creation time (see flags).
629  *
630  * @param r
631  *   A pointer to the ring structure.
632  * @param obj
633  *   A pointer to the object to be added.
634  * @param esize
635  *   The size of ring element, in bytes. It must be a multiple of 4.
636  *   This must be the same value used while creating the ring. Otherwise
637  *   the results are undefined.
638  * @return
639  *   - 0: Success; objects enqueued.
640  *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
641  */
642 static __rte_always_inline int
643 rte_ring_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize)
644 {
645         return rte_ring_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 :
646                                                                 -ENOBUFS;
647 }
648
649 /**
650  * Dequeue several objects from a ring (multi-consumers safe).
651  *
652  * This function uses a "compare and set" instruction to move the
653  * consumer index atomically.
654  *
655  * @param r
656  *   A pointer to the ring structure.
657  * @param obj_table
658  *   A pointer to a table of void * pointers (objects) that will be filled.
659  * @param esize
660  *   The size of ring element, in bytes. It must be a multiple of 4.
661  *   This must be the same value used while creating the ring. Otherwise
662  *   the results are undefined.
663  * @param n
664  *   The number of objects to dequeue from the ring to the obj_table.
665  * @param available
666  *   If non-NULL, returns the number of remaining ring entries after the
667  *   dequeue has finished.
668  * @return
669  *   The number of objects dequeued, either 0 or n
670  */
671 static __rte_always_inline unsigned int
672 rte_ring_mc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
673                 unsigned int esize, unsigned int n, unsigned int *available)
674 {
675         return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
676                                 RTE_RING_QUEUE_FIXED, __IS_MC, available);
677 }
678
679 /**
680  * Dequeue several objects from a ring (NOT multi-consumers safe).
681  *
682  * @param r
683  *   A pointer to the ring structure.
684  * @param obj_table
685  *   A pointer to a table of void * pointers (objects) that will be filled.
686  * @param esize
687  *   The size of ring element, in bytes. It must be a multiple of 4.
688  *   This must be the same value used while creating the ring. Otherwise
689  *   the results are undefined.
690  * @param n
691  *   The number of objects to dequeue from the ring to the obj_table,
692  *   must be strictly positive.
693  * @param available
694  *   If non-NULL, returns the number of remaining ring entries after the
695  *   dequeue has finished.
696  * @return
697  *   The number of objects dequeued, either 0 or n
698  */
699 static __rte_always_inline unsigned int
700 rte_ring_sc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
701                 unsigned int esize, unsigned int n, unsigned int *available)
702 {
703         return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
704                         RTE_RING_QUEUE_FIXED, __IS_SC, available);
705 }
706
707 /**
708  * Dequeue several objects from a ring.
709  *
710  * This function calls the multi-consumers or the single-consumer
711  * version, depending on the default behaviour that was specified at
712  * ring creation time (see flags).
713  *
714  * @param r
715  *   A pointer to the ring structure.
716  * @param obj_table
717  *   A pointer to a table of void * pointers (objects) that will be filled.
718  * @param esize
719  *   The size of ring element, in bytes. It must be a multiple of 4.
720  *   This must be the same value used while creating the ring. Otherwise
721  *   the results are undefined.
722  * @param n
723  *   The number of objects to dequeue from the ring to the obj_table.
724  * @param available
725  *   If non-NULL, returns the number of remaining ring entries after the
726  *   dequeue has finished.
727  * @return
728  *   The number of objects dequeued, either 0 or n
729  */
730 static __rte_always_inline unsigned int
731 rte_ring_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
732                 unsigned int esize, unsigned int n, unsigned int *available)
733 {
734         return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
735                         RTE_RING_QUEUE_FIXED, r->cons.single, available);
736 }
737
738 /**
739  * Dequeue one object from a ring (multi-consumers safe).
740  *
741  * This function uses a "compare and set" instruction to move the
742  * consumer index atomically.
743  *
744  * @param r
745  *   A pointer to the ring structure.
746  * @param obj_p
747  *   A pointer to a void * pointer (object) that will be filled.
748  * @param esize
749  *   The size of ring element, in bytes. It must be a multiple of 4.
750  *   This must be the same value used while creating the ring. Otherwise
751  *   the results are undefined.
752  * @return
753  *   - 0: Success; objects dequeued.
754  *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
755  *     dequeued.
756  */
757 static __rte_always_inline int
758 rte_ring_mc_dequeue_elem(struct rte_ring *r, void *obj_p,
759                                 unsigned int esize)
760 {
761         return rte_ring_mc_dequeue_bulk_elem(r, obj_p, esize, 1, NULL)  ? 0 :
762                                                                 -ENOENT;
763 }
764
765 /**
766  * Dequeue one object from a ring (NOT multi-consumers safe).
767  *
768  * @param r
769  *   A pointer to the ring structure.
770  * @param obj_p
771  *   A pointer to a void * pointer (object) that will be filled.
772  * @param esize
773  *   The size of ring element, in bytes. It must be a multiple of 4.
774  *   This must be the same value used while creating the ring. Otherwise
775  *   the results are undefined.
776  * @return
777  *   - 0: Success; objects dequeued.
778  *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
779  *     dequeued.
780  */
781 static __rte_always_inline int
782 rte_ring_sc_dequeue_elem(struct rte_ring *r, void *obj_p,
783                                 unsigned int esize)
784 {
785         return rte_ring_sc_dequeue_bulk_elem(r, obj_p, esize, 1, NULL) ? 0 :
786                                                                 -ENOENT;
787 }
788
789 /**
790  * Dequeue one object from a ring.
791  *
792  * This function calls the multi-consumers or the single-consumer
793  * version depending on the default behaviour that was specified at
794  * ring creation time (see flags).
795  *
796  * @param r
797  *   A pointer to the ring structure.
798  * @param obj_p
799  *   A pointer to a void * pointer (object) that will be filled.
800  * @param esize
801  *   The size of ring element, in bytes. It must be a multiple of 4.
802  *   This must be the same value used while creating the ring. Otherwise
803  *   the results are undefined.
804  * @return
805  *   - 0: Success, objects dequeued.
806  *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
807  *     dequeued.
808  */
809 static __rte_always_inline int
810 rte_ring_dequeue_elem(struct rte_ring *r, void *obj_p, unsigned int esize)
811 {
812         return rte_ring_dequeue_bulk_elem(r, obj_p, esize, 1, NULL) ? 0 :
813                                                                 -ENOENT;
814 }
815
816 /**
817  * Enqueue several objects on the ring (multi-producers safe).
818  *
819  * This function uses a "compare and set" instruction to move the
820  * producer index atomically.
821  *
822  * @param r
823  *   A pointer to the ring structure.
824  * @param obj_table
825  *   A pointer to a table of void * pointers (objects).
826  * @param esize
827  *   The size of ring element, in bytes. It must be a multiple of 4.
828  *   This must be the same value used while creating the ring. Otherwise
829  *   the results are undefined.
830  * @param n
831  *   The number of objects to add in the ring from the obj_table.
832  * @param free_space
833  *   if non-NULL, returns the amount of space in the ring after the
834  *   enqueue operation has finished.
835  * @return
836  *   - n: Actual number of objects enqueued.
837  */
838 static __rte_always_inline unsigned
839 rte_ring_mp_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
840                 unsigned int esize, unsigned int n, unsigned int *free_space)
841 {
842         return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
843                         RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space);
844 }
845
846 /**
847  * Enqueue several objects on a ring
848  *
849  * @warning This API is NOT multi-producers safe
850  *
851  * @param r
852  *   A pointer to the ring structure.
853  * @param obj_table
854  *   A pointer to a table of void * pointers (objects).
855  * @param esize
856  *   The size of ring element, in bytes. It must be a multiple of 4.
857  *   This must be the same value used while creating the ring. Otherwise
858  *   the results are undefined.
859  * @param n
860  *   The number of objects to add in the ring from the obj_table.
861  * @param free_space
862  *   if non-NULL, returns the amount of space in the ring after the
863  *   enqueue operation has finished.
864  * @return
865  *   - n: Actual number of objects enqueued.
866  */
867 static __rte_always_inline unsigned
868 rte_ring_sp_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
869                 unsigned int esize, unsigned int n, unsigned int *free_space)
870 {
871         return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
872                         RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space);
873 }
874
875 /**
876  * Enqueue several objects on a ring.
877  *
878  * This function calls the multi-producer or the single-producer
879  * version depending on the default behavior that was specified at
880  * ring creation time (see flags).
881  *
882  * @param r
883  *   A pointer to the ring structure.
884  * @param obj_table
885  *   A pointer to a table of void * pointers (objects).
886  * @param esize
887  *   The size of ring element, in bytes. It must be a multiple of 4.
888  *   This must be the same value used while creating the ring. Otherwise
889  *   the results are undefined.
890  * @param n
891  *   The number of objects to add in the ring from the obj_table.
892  * @param free_space
893  *   if non-NULL, returns the amount of space in the ring after the
894  *   enqueue operation has finished.
895  * @return
896  *   - n: Actual number of objects enqueued.
897  */
898 static __rte_always_inline unsigned
899 rte_ring_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
900                 unsigned int esize, unsigned int n, unsigned int *free_space)
901 {
902         return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
903                         RTE_RING_QUEUE_VARIABLE, r->prod.single, free_space);
904 }
905
906 /**
907  * Dequeue several objects from a ring (multi-consumers safe). When the request
908  * objects are more than the available objects, only dequeue the actual number
909  * of objects
910  *
911  * This function uses a "compare and set" instruction to move the
912  * consumer index atomically.
913  *
914  * @param r
915  *   A pointer to the ring structure.
916  * @param obj_table
917  *   A pointer to a table of void * pointers (objects) that will be filled.
918  * @param esize
919  *   The size of ring element, in bytes. It must be a multiple of 4.
920  *   This must be the same value used while creating the ring. Otherwise
921  *   the results are undefined.
922  * @param n
923  *   The number of objects to dequeue from the ring to the obj_table.
924  * @param available
925  *   If non-NULL, returns the number of remaining ring entries after the
926  *   dequeue has finished.
927  * @return
928  *   - n: Actual number of objects dequeued, 0 if ring is empty
929  */
930 static __rte_always_inline unsigned
931 rte_ring_mc_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
932                 unsigned int esize, unsigned int n, unsigned int *available)
933 {
934         return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
935                         RTE_RING_QUEUE_VARIABLE, __IS_MC, available);
936 }
937
938 /**
939  * Dequeue several objects from a ring (NOT multi-consumers safe).When the
940  * request objects are more than the available objects, only dequeue the
941  * actual number of objects
942  *
943  * @param r
944  *   A pointer to the ring structure.
945  * @param obj_table
946  *   A pointer to a table of void * pointers (objects) that will be filled.
947  * @param esize
948  *   The size of ring element, in bytes. It must be a multiple of 4.
949  *   This must be the same value used while creating the ring. Otherwise
950  *   the results are undefined.
951  * @param n
952  *   The number of objects to dequeue from the ring to the obj_table.
953  * @param available
954  *   If non-NULL, returns the number of remaining ring entries after the
955  *   dequeue has finished.
956  * @return
957  *   - n: Actual number of objects dequeued, 0 if ring is empty
958  */
959 static __rte_always_inline unsigned
960 rte_ring_sc_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
961                 unsigned int esize, unsigned int n, unsigned int *available)
962 {
963         return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
964                         RTE_RING_QUEUE_VARIABLE, __IS_SC, available);
965 }
966
967 /**
968  * Dequeue multiple objects from a ring up to a maximum number.
969  *
970  * This function calls the multi-consumers or the single-consumer
971  * version, depending on the default behaviour that was specified at
972  * ring creation time (see flags).
973  *
974  * @param r
975  *   A pointer to the ring structure.
976  * @param obj_table
977  *   A pointer to a table of void * pointers (objects) that will be filled.
978  * @param esize
979  *   The size of ring element, in bytes. It must be a multiple of 4.
980  *   This must be the same value used while creating the ring. Otherwise
981  *   the results are undefined.
982  * @param n
983  *   The number of objects to dequeue from the ring to the obj_table.
984  * @param available
985  *   If non-NULL, returns the number of remaining ring entries after the
986  *   dequeue has finished.
987  * @return
988  *   - Number of objects dequeued
989  */
990 static __rte_always_inline unsigned int
991 rte_ring_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
992                 unsigned int esize, unsigned int n, unsigned int *available)
993 {
994         return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
995                                 RTE_RING_QUEUE_VARIABLE,
996                                 r->cons.single, available);
997 }
998
999 #ifdef __cplusplus
1000 }
1001 #endif
1002
1003 #endif /* _RTE_RING_ELEM_H_ */